""" Baixa relatórios de recebíveis via API /v2/franchisee/reports e importa no SQL Server. Modos de operação: - INCREMENTAL=True → importa do dia seguinte ao último importado até ontem (diário) - INCREMENTAL=False → importa o intervalo fixo START_DATE..END_DATE Deduplicação: - dbo.Grgb_vendas_import_log registra cada arquivo já importado pelo fileName - Se o arquivo já constar no log, a importação é pulada Mediadores: - USE_ALL_MEDIATORS=True → busca todos via API de lojas (get_store_codes) - USE_ALL_MEDIATORS=False → usa a lista MEDIATOR_CODES abaixo """ import csv import io import json import os import re import time import requests from datetime import date, timedelta from typing import Optional from installments_reader import Auth # ─── CONFIGURE AQUI ─────────────────────────────────────────────────────────── USE_ALL_MEDIATORS = True # True = usa ALL_MEDIATOR_CODES abaixo; False = usa MEDIATOR_CODES MEDIATOR_CODES = ["23708"] # usado apenas se USE_ALL_MEDIATORS = False # Lista exata dos mediatorCodes válidos para o relatório (copiado do portal) ALL_MEDIATOR_CODES = [ "checkAll", "19334", "19335", "19336", "20008", "20010", "20029", "20058", "20059", "20060", "20061", "20062", "20063", "20064", "20142", "20364", "20443", "20444", "20621", "20622", "20623", "20772", "20773", "20774", "20775", "20776", "20777", "20778", "20779", "20780", "20781", "20968", "20969", "20970", "20986", "20988", "20989", "20991", "20992", "20993", "20994", "20995", "20996", "20997", "20998", "20999", "21000", "21001", "21278", "21279", "21375", "21383", "21495", "22448", "22541", "23703", "23704", "23708", "23711", "23712", "23713", "23813", "24255", "24257", "24269", "24293", "24447", "24451", "24457", "24458", "4494", ] INCREMENTAL = True # True = modo diário incremental START_DATE = "2026-01-01" # usado apenas se INCREMENTAL = False END_DATE = "2026-01-31" # usado apenas se INCREMENTAL = False INCREMENTAL_DEFAULT_START = "2026-01-01" # data inicial no primeiro run incremental 
DATA_TYPE = "VENDAS"
REPORT_TYPE = "F360"
MAX_DAYS_PER_CHUNK = 15  # the API caps a request at 15 days when all mediators are sent
POLL_INTERVAL_S = 20
POLL_TIMEOUT_S = 600
WRITE_SQL = True
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
WATERMARK_FILE = os.path.join(_SCRIPT_DIR, "vendas_watermark.json")
IMPORT_BATCH_SIZE = 500  # rows per SQL Server batch
# ──────────────────────────────────────────────────────────────────────────────

# NOTE(review): database credentials are hard-coded in source control here.
# Consider loading the connection string from the environment or a secret
# store and rotating this password.
SQL_CONN = (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=10.77.77.10;"
    "DATABASE=GINSENG;"
    "UID=andrey;"
    "PWD=88253332;"
    "TrustServerCertificate=yes;"
)

REPORTS_URL = (
    "https://bff-portal-apigw.produto-financeiro.grupoboticario.digital"
    "/v2/franchisee/reports"
)

# Report statuses (compared lower-cased) that end the polling loop.
_DONE_STATUSES = {"generated", "generated_csv", "done", "completed", "ready", "available"}
_FAIL_STATUSES = {"failed", "error", "erro", "falha"}

# Columns the data table defines itself; a CSV header that sanitises to one of
# these would collide with it in the INSERT column list.
_RESERVED_COLS = frozenset({"id", "nomearquivo"})


def _ascii(text: object) -> str:
    """Return ``str(text)`` with every non-ASCII character replaced by ``?``."""
    return str(text).encode("ascii", "replace").decode()


# ─── WATERMARK ────────────────────────────────────────────────────────────────

def _load_watermark() -> str:
    """Return the last saved end date from WATERMARK_FILE.

    Returns:
        The stripped ``last_end_date`` value, or ``""`` when the file is
        missing, unreadable, not a JSON object, or holds no value.
    """
    try:
        with open(WATERMARK_FILE, encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: there is simply no watermark yet.
        return ""
    except (OSError, ValueError) as exc:
        # Best effort: a corrupt/unreadable watermark must not abort the run,
        # but it should be visible in the log (ValueError covers bad JSON).
        print(f"[warn] watermark ilegivel, ignorando: {_ascii(exc)}")
        return ""
    if not isinstance(data, dict):
        return ""
    return str(data.get("last_end_date") or "").strip()


def _save_watermark(end_date: str) -> None:
    """Persist *end_date* as ``last_end_date`` in WATERMARK_FILE.

    The JSON is written to a ``.tmp`` sibling and moved into place with
    ``os.replace`` so a crash mid-write cannot leave a truncated file.
    """
    tmp = WATERMARK_FILE + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump({"last_end_date": end_date}, f)
    os.replace(tmp, WATERMARK_FILE)


# ─── API ──────────────────────────────────────────────────────────────────────

def _headers(auth: Auth) -> dict:
    """Build the request headers; ``auth.get_bearer()`` supplies Authorization."""
    return {
        "Authorization": auth.get_bearer(),
        "Accept": "*/*",
        "Content-Type": "application/json",
        "Origin": "https://extranet.grupoboticario.com.br",
        "Referer": "https://extranet.grupoboticario.com.br/",
        "User-Agent": "Mozilla/5.0",
    }


def _find_pending_in_list(
    session: requests.Session,
    auth: Auth,
    start_date: str,
    end_date: str,
) -> Optional[str]:
    """Look for a report the server may have created despite a 504/timeout.

    After a 504 on the POST the server may have created the report without
    answering, so the report listing is queried and the id of the first
    listed report that has one is returned.

    NOTE(review): *start_date* and *end_date* are accepted but not used, and
    no recency check is performed: the first entry is trusted to be the
    report that was just requested (the original author assumed the listing
    is sorted newest-first). If that assumption is wrong, an unrelated
    report is returned. Tightening this requires knowing the format of the
    ``solicitedAt`` field, which is not visible here.

    Returns:
        The report id as a string, or ``None`` if nothing was found or the
        lookup failed.
    """
    try:
        raw = session.get(
            REPORTS_URL,
            params={"type": REPORT_TYPE, "dataType": DATA_TYPE},
            headers=_headers(auth),
            timeout=30,
        ).json()
        reports = raw.get("data", {}).get("reports") or []
        for rep in reports:
            rid = str(rep.get("id") or "")
            if rid:
                print(f"[info] relatório encontrado na lista após 504: id={rid} status={rep.get('status')}")
                return rid
    except Exception as exc:
        # Deliberately broad: this lookup is a best-effort recovery step and
        # must never mask the retry logic of the caller.
        print(f"[warn] falha ao buscar lista após 504: {exc}")
    return None


def _create_report(
    session: requests.Session,
    auth: Auth,
    mediator_codes: list[str],
    start_date: str,
    end_date: str,
) -> str:
    """Request creation of a report and return its id.

    The POST is attempted up to five times. Status codes 429/500/502/503/504
    and request timeouts are retried after a back-off of ``10 * attempt``
    seconds (capped at 60). After a 504 or a timeout the report listing is
    checked, because the server may have created the report anyway.

    Raises:
        requests.HTTPError: for a non-transient error status.
        RuntimeError: if the response carries no report id, or all attempts
            were exhausted.
    """
    payload = {
        "dataType": DATA_TYPE,
        "startSolicitedDate": start_date,
        "endSolicitedDate": end_date,
        "installmentCode": "",
        "installmentGroupCode": "",
        "mediatorCodes": mediator_codes,
        "type": REPORT_TYPE,
    }
    print(f"[POST] {DATA_TYPE} {start_date}..{end_date} mediadores={len(mediator_codes)}")

    transient = {429, 500, 502, 503, 504}
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        wait = min(60, 10 * attempt)
        try:
            r = session.post(REPORTS_URL, json=payload, headers=_headers(auth), timeout=60)
        except requests.exceptions.Timeout:
            print(f"[warn] POST timeout (tentativa {attempt}/{max_attempts}), aguardando {wait}s...")
            time.sleep(wait)
            rid = _find_pending_in_list(session, auth, start_date, end_date)
            if rid:
                return rid
            continue

        if r.status_code in transient:
            print(f"[warn] POST {r.status_code} (tentativa {attempt}/{max_attempts}), aguardando {wait}s...")
            time.sleep(wait)
            # A 504 may mean the server created the report but never
            # answered, so check the listing before posting again.
            if r.status_code == 504:
                rid = _find_pending_in_list(session, auth, start_date, end_date)
                if rid:
                    return rid
            continue

        if not r.ok:
            print(f"[erro] POST {r.status_code}: {_ascii(r.text[:300])}")
            r.raise_for_status()

        resp = r.json()
        rid = _extract_report_id(resp)
        if not rid:
            raise RuntimeError(f"report_id ausente na resposta: {resp}")
        fname = _ascii(_extract_file_name(resp))
        print(f"[info] report_id={rid} fileName={fname}")
        return rid

    raise RuntimeError("POST /reports falhou após 5 tentativas (504/timeout persistente)")


def _extract_report_id(response: dict) -> Optional[str]:
    """Return the report id found in a create-report response, or ``None``.

    ``data.report.id`` is preferred; otherwise the first truthy value among
    ``id``, ``reportId``, ``uuid`` and ``code`` directly under ``data``.
    """
    data = response.get("data") or {}
    if not isinstance(data, dict):
        return None
    report = data.get("report")
    if isinstance(report, dict) and report.get("id"):
        return str(report["id"])
    for key in ("id", "reportId", "uuid", "code"):
        if data.get(key):
            return str(data[key])
    return None


def _extract_file_name(response: dict) -> str:
    """Return ``filename``/``fileName`` from ``data.report`` (or ``data``), else ``""``."""
    data = response.get("data") or {}
    if not isinstance(data, dict):
        return ""
    rep = data.get("report") or data
    if not isinstance(rep, dict):
        return ""
    return str(rep.get("filename") or rep.get("fileName") or "")


def _find_download_url(obj: dict) -> Optional[str]:
    """Return the first truthy value among the known download-link keys of *obj*."""
    for key in ("fileLink", "url", "downloadUrl", "fileUrl",
                "s3Url", "link", "presignedUrl", "signedUrl"):
        if obj.get(key):
            return str(obj[key])
    return None


def _fetch_report_by_id(
    session: requests.Session, auth: Auth, report_id: str
) -> tuple[Optional[str], str]:
    """Fetch one report by id and return ``(download_url, file_name)``.

    Returns ``(None, "")`` when the endpoint does not answer 200. The URL is
    searched in the report object, then in ``data``, then in the whole body.
    """
    r = session.get(f"{REPORTS_URL}/{report_id}", headers=_headers(auth), timeout=30)
    if r.status_code != 200:
        return None, ""
    body = r.json()
    if not isinstance(body, dict):
        return None, ""
    data = body.get("data") or {}
    if not isinstance(data, dict):
        data = {}
    rep = data.get("report") or data
    if not isinstance(rep, dict):
        rep = data
    url = _find_download_url(rep) or _find_download_url(data) or _find_download_url(body)
    name = str(rep.get("fileName") or rep.get("filename") or "")
    return url, name


def _poll_until_ready(
    session: requests.Session,
    auth: Auth,
    report_id: str,
) -> tuple[str, str]:
    """Poll the report listing until *report_id* is ready.

    Queries ``GET /reports?type=REPORT_TYPE&dataType=DATA_TYPE`` every
    POLL_INTERVAL_S seconds. Once the report shows a status in
    _DONE_STATUSES, the download link is fetched via ``GET /reports/{id}``.
    A failed or non-JSON listing response is treated as transient and
    retried until the deadline, like the transient handling of the POST.

    Returns:
        ``(download_url, file_name)``.

    Raises:
        RuntimeError: if the report reaches a status in _FAIL_STATUSES.
        TimeoutError: if it is not ready within POLL_TIMEOUT_S seconds.
    """
    deadline = time.monotonic() + POLL_TIMEOUT_S
    params = {"type": REPORT_TYPE, "dataType": DATA_TYPE}
    attempt = 0
    while time.monotonic() < deadline:
        attempt += 1
        try:
            raw = session.get(
                REPORTS_URL, params=params, headers=_headers(auth), timeout=30
            ).json()
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers a non-JSON body (e.g. a gateway HTML page).
            print(f"[poll] tentativa {attempt} falha transitoria na listagem "
                  f"({_ascii(exc)}), aguardando {POLL_INTERVAL_S}s...")
            time.sleep(POLL_INTERVAL_S)
            continue

        data = raw.get("data") if isinstance(raw, dict) else None
        reports = (data or {}).get("reports") or []
        for rep in reports:
            if str(rep.get("id") or "") != report_id:
                continue
            status = str(rep.get("status") or "").strip().lower()
            if status in _FAIL_STATUSES:
                raise RuntimeError(f"Relatório falhou: {rep}")
            if status in _DONE_STATUSES:
                url, file_name = _fetch_report_by_id(session, auth, report_id)
                if url:
                    print(f"[poll] pronto após {attempt} tentativa(s) (status={status})")
                    return url, file_name
            # Not ready yet, or marked done but without a link so far.
            print(f"[poll] tentativa {attempt} status={status}, aguardando {POLL_INTERVAL_S}s...")
            break
        else:
            print(f"[poll] tentativa {attempt} relatório não listado ainda, "
                  f"aguardando {POLL_INTERVAL_S}s...")
        time.sleep(POLL_INTERVAL_S)

    raise TimeoutError(f"Relatório não ficou pronto em {POLL_TIMEOUT_S}s")


def _download_csv(url: str) -> str:
    """Download the CSV at *url* and return it decoded as text.

    UTF-8 is tried first (``utf-8-sig`` also strips a BOM and accepts plain
    UTF-8); anything that is not valid UTF-8 is decoded as latin-1, which
    accepts every byte sequence.

    Raises:
        requests.HTTPError: if the download answers with an error status.
    """
    print("[download] baixando CSV do S3...")
    r = requests.get(url, timeout=180)
    r.raise_for_status()
    try:
        return r.content.decode("utf-8-sig")
    except UnicodeDecodeError:
        return r.content.decode("latin-1")


# ─── SQL ──────────────────────────────────────────────────────────────────────

def _sql_conn_str() -> str:
    """Return SQL_CONN with ``Encrypt`` / ``TrustServerCertificate`` defaults.

    ``Encrypt=no`` and ``TrustServerCertificate=yes`` are appended only when
    the respective keyword is absent. The result always ends with ``;``.
    """
    s = SQL_CONN.rstrip(";")
    if not re.search(r"\bEncrypt\b", s, flags=re.IGNORECASE):
        s += ";Encrypt=no"
    if not re.search(r"\bTrustServerCertificate\b", s, flags=re.IGNORECASE):
        s += ";TrustServerCertificate=yes"
    return s + ";"


def _safe_col(name: str) -> str:
    """Turn a CSV header into a column identifier.

    Every character outside ``[a-zA-Z0-9_]`` becomes ``_``, a leading digit
    gets a ``c_`` prefix, the result is cut to 128 characters, and an empty
    result becomes ``"col"``.
    """
    name = re.sub(r"[^a-zA-Z0-9_]", "_", name.strip())
    if name and name[0].isdigit():
        name = "c_" + name
    return name[:128] or "col"


def _unique_safe_cols(csv_cols: list[str]) -> list[str]:
    """Sanitise *csv_cols* into column names that are unique case-insensitively.

    A name that collides with an earlier CSV column or with one of
    _RESERVED_COLS gets a numeric suffix (``_2``, ``_3``, ...). The output
    has the same length and order as *csv_cols*.
    """
    taken = set(_RESERVED_COLS)
    result = []
    for raw in csv_cols:
        base = _safe_col(raw)
        name = base
        suffix = 2
        while name.lower() in taken:
            tail = f"_{suffix}"
            name = base[: 128 - len(tail)] + tail
            suffix += 1
        taken.add(name.lower())
        result.append(name)
    return result


def _ensure_tables(cur, csv_cols: list[str]) -> list[str]:
    """Make sure the data and log tables exist with every CSV column.

    Creates ``dbo.Grgb_vendas_import_log`` and ``dbo.Grgb_vendas_report`` if
    missing, adds ``NomeArquivo`` to an older data table that lacks it, and
    adds one ``NVARCHAR(500)`` column per CSV header not present yet.

    Args:
        cur: an open database cursor; the caller commits.
        csv_cols: the CSV header names, in file order.

    Returns:
        The sanitised column names, aligned index-by-index with *csv_cols*.
    """
    # Import log table (duplicate control).
    cur.execute("""
        IF OBJECT_ID('dbo.Grgb_vendas_import_log', 'U') IS NULL
        BEGIN
            CREATE TABLE dbo.Grgb_vendas_import_log (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                FileName VARCHAR(120) NOT NULL UNIQUE,
                PeriodoInicio DATE NULL,
                PeriodoFim DATE NULL,
                TotalLinhas INT NOT NULL DEFAULT 0,
                ImportadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            )
        END
    """)
    # Data table.
    cur.execute("""
        IF OBJECT_ID('dbo.Grgb_vendas_report', 'U') IS NULL
        BEGIN
            CREATE TABLE dbo.Grgb_vendas_report (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                NomeArquivo VARCHAR(120) NULL,
                CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            )
        END
    """)
    cur.execute("""
        SELECT LOWER(COLUMN_NAME)
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA='dbo' AND TABLE_NAME='Grgb_vendas_report'
    """)
    existing = {r[0] for r in cur.fetchall()}

    # A table created by an older version may lack NomeArquivo.
    if "nomearquivo" not in existing:
        cur.execute("ALTER TABLE dbo.Grgb_vendas_report ADD [NomeArquivo] VARCHAR(120) NULL")
        existing.add("nomearquivo")

    safe_cols = _unique_safe_cols(csv_cols)
    for col in safe_cols:
        if col.lower() not in existing:
            cur.execute(
                f"ALTER TABLE dbo.Grgb_vendas_report ADD [{col}] NVARCHAR(500) NULL"
            )
            existing.add(col.lower())
            print(f"[schema] coluna adicionada: {col}")
    return safe_cols


def _is_already_imported(cur, file_name: str) -> bool:
    """Return True if *file_name* is present in dbo.Grgb_vendas_import_log."""
    cur.execute(
        "SELECT 1 FROM dbo.Grgb_vendas_import_log WHERE FileName = ?", file_name
    )
    return cur.fetchone() is not None


def _open_cursor(conn_str: str):
    """Open a pyodbc connection (autocommit off) and return ``(connection, cursor)``."""
    import pyodbc

    cn = pyodbc.connect(conn_str, timeout=60)
    cn.autocommit = False
    return cn, cn.cursor()


def _import_csv(
    rows: list[dict],
    csv_cols: list[str],
    file_name: str,
    start_date: str,
    end_date: str,
) -> int:
    """Insert *rows* into dbo.Grgb_vendas_report and log the file as imported.

    Steps: ensure the schema; skip if *file_name* is already logged; delete
    rows left behind by an earlier failed attempt for the same file; insert
    in batches of IMPORT_BATCH_SIZE, each on its own connection; finally
    write the log entry. Values are stored as text cut to 500 characters,
    with empty values stored as NULL.

    Args:
        rows: CSV rows as dicts keyed by the original header names.
        csv_cols: the original CSV header names.
        file_name: non-empty key used for deduplication and cleanup.
        start_date: period start, written to the log entry.
        end_date: period end, written to the log entry.

    Returns:
        The number of inserted rows, or 0 when the file was skipped.

    Raises:
        ValueError: if *file_name* is empty. An empty key would make
            unrelated files share one log entry and one DELETE scope.
    """
    if not file_name:
        raise ValueError("file_name vazio: impossivel deduplicar a importacao")

    conn_str = _sql_conn_str()
    total = len(rows)

    # --- first connection: ensure schema, check the log, clean partial rows ---
    cn, cur = _open_cursor(conn_str)
    try:
        safe_cols = _ensure_tables(cur, csv_cols)
        cn.commit()

        if _is_already_imported(cur, file_name):
            print(f"[skip] {file_name} ja importado anteriormente")
            return 0

        # Remove partial rows left by runs that failed midway.
        cur.execute("DELETE FROM dbo.Grgb_vendas_report WHERE NomeArquivo = ?", file_name)
        cn.commit()
    finally:
        cur.close()
        cn.close()

    # --- insert in batches, one connection per batch, so a long import does
    # --- not depend on a single long-lived connection
    all_cols = ["NomeArquivo"] + safe_cols
    col_list = ", ".join(f"[{c}]" for c in all_cols)
    placeholders = ", ".join("?" for _ in all_cols)
    sql = (
        f"INSERT INTO dbo.Grgb_vendas_report ({col_list}) "
        f"VALUES ({placeholders})"
    )

    inserted = 0
    for batch_start in range(0, total, IMPORT_BATCH_SIZE):
        batch = [
            (file_name,) + tuple(
                (str(row.get(orig, "") or "")[:500] or None) for orig in csv_cols
            )
            for row in rows[batch_start: batch_start + IMPORT_BATCH_SIZE]
        ]
        cn, cur = _open_cursor(conn_str)
        cur.fast_executemany = True
        try:
            cur.executemany(sql, batch)
            cn.commit()
            inserted += len(batch)
            print(f"[sql] {inserted}/{total} linhas arquivo={file_name}")
        except Exception:
            cn.rollback()
            raise
        finally:
            cur.close()
            cn.close()

    # --- record the file in the control log ---
    cn, cur = _open_cursor(conn_str)
    try:
        cur.execute(
            """
            INSERT INTO dbo.Grgb_vendas_import_log
                (FileName, PeriodoInicio, PeriodoFim, TotalLinhas)
            VALUES (?, ?, ?, ?)
            """,
            file_name, start_date, end_date, inserted,
        )
        cn.commit()
    except Exception:
        cn.rollback()
        raise
    finally:
        cur.close()
        cn.close()

    print(f"[sql] importacao concluida: {inserted} linhas arquivo={file_name}")
    return inserted


# ─── MAIN ─────────────────────────────────────────────────────────────────────

def _resolve_date_range(now: Optional["datetime"] = None) -> tuple[str, str]:
    """Return the ``(start, end)`` ISO dates to import: always D-1 in BRT.

    Before 09:30 BRT the data for D-1 is not available yet, so ``("", "")``
    is returned. BRT is modelled as a fixed UTC-3 offset.

    Args:
        now: timezone-aware instant to evaluate; defaults to the current
            time. Intended for tests.
    """
    from datetime import datetime, timezone

    brt = timezone(timedelta(hours=-3))
    now_brt = datetime.now(tz=brt) if now is None else now.astimezone(brt)
    cutoff = now_brt.replace(hour=9, minute=30, second=0, microsecond=0)
    if now_brt < cutoff:
        print("[info] antes das 9h30 BRT — dados de D-1 ainda nao disponiveis.")
        return "", ""
    yesterday = (now_brt - timedelta(days=1)).date().isoformat()
    return yesterday, yesterday


def _build_chunks(start_date: str, end_date: str) -> list[tuple[str, str]]:
    """Split ``start_date..end_date`` into windows of MAX_DAYS_PER_CHUNK days.

    Both bounds are inclusive ISO dates. An inverted range yields ``[]``.

    Raises:
        ValueError: if MAX_DAYS_PER_CHUNK is below 1 (the loop could not
            advance).
    """
    if MAX_DAYS_PER_CHUNK < 1:
        raise ValueError("MAX_DAYS_PER_CHUNK deve ser >= 1")
    chunks = []
    cursor = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    step = timedelta(days=MAX_DAYS_PER_CHUNK - 1)
    while cursor <= end:
        chunk_end = min(cursor + step, end)
        chunks.append((cursor.isoformat(), chunk_end.isoformat()))
        cursor = chunk_end + timedelta(days=1)
    return chunks


def main() -> None:
    """Run one import: resolve the range, then create, poll, download and load each chunk."""
    start_date, end_date = _resolve_date_range()
    if not start_date:
        # _resolve_date_range has already logged why there is no range.
        print("[info] nada a importar nesta execução.")
        return

    print(f"[info] período a importar: {start_date} .. {end_date}")

    session = requests.Session()
    session.trust_env = False
    auth = Auth(session)

    # Resolve mediator codes (copied so the module-level lists are never aliased).
    if USE_ALL_MEDIATORS:
        mediator_codes = list(ALL_MEDIATOR_CODES)
        print(f"[info] usando todos os mediadores ({len(mediator_codes) - 1} lojas + checkAll)")
    else:
        mediator_codes = list(MEDIATOR_CODES)

    if not mediator_codes:
        print("[erro] nenhum mediator code disponível, abortando.")
        return

    chunks = _build_chunks(start_date, end_date)
    print(f"[info] {len(chunks)} janela(s) de até {MAX_DAYS_PER_CHUNK} dias")

    total_linhas = 0
    last_ok_end = _load_watermark() if INCREMENTAL else ""

    for i, (chunk_start, chunk_end) in enumerate(chunks, 1):
        print(f"\n[chunk {i}/{len(chunks)}] {chunk_start} .. {chunk_end}")

        report_id = _create_report(session, auth, mediator_codes, chunk_start, chunk_end)
        download_url, file_name = _poll_until_ready(session, auth, report_id)
        if not file_name:
            # The file name is the deduplication key; without one, fall back
            # to a name that is unique for this report.
            file_name = f"{DATA_TYPE}_{chunk_start}_{chunk_end}_{report_id}"
            print("[warn] API nao retornou fileName; usando nome derivado do report_id")
        print(f"[info] fileName={_ascii(file_name)}")

        csv_text = _download_csv(download_url)
        reader = csv.DictReader(io.StringIO(csv_text))
        csv_cols = list(reader.fieldnames or [])
        rows = list(reader)
        print(f"[csv] {len(rows)} linhas {len(csv_cols)} colunas")

        if not rows:
            print("[info] chunk vazio.")
        elif not WRITE_SQL:
            print("[info] WRITE_SQL=False — primeiras 2 linhas:")
            for r in rows[:2]:
                print(json.dumps(r, ensure_ascii=False))
        else:
            total_linhas += _import_csv(rows, csv_cols, file_name, chunk_start, chunk_end)

        # NOTE(review): assumed to run for every processed chunk, including
        # empty ones and dry runs — confirm against the original layout.
        if INCREMENTAL:
            _save_watermark(chunk_end)
        last_ok_end = chunk_end

    print(f"\n[fim] total={total_linhas} linhas watermark={last_ok_end}")


if __name__ == "__main__":
    main()