diff --git a/__pycache__/installments_reader.cpython-311.pyc b/__pycache__/installments_reader.cpython-311.pyc new file mode 100644 index 0000000..70bde61 Binary files /dev/null and b/__pycache__/installments_reader.cpython-311.pyc differ diff --git a/installments_backfill_vd.py b/installments_backfill_vd.py deleted file mode 100644 index 1f0d972..0000000 --- a/installments_backfill_vd.py +++ /dev/null @@ -1,48 +0,0 @@ -import os -from datetime import date - -import installments_reader - - -def _setdefault_env(key: str, value: str) -> None: - if not os.getenv(key): - os.environ[key] = value - - -def main() -> None: - today = date.today() - start_of_year = date(today.year, 1, 1) - - defaults = { - "STORE_CHANNEL": "VD", - "START_INSTALLMENT_CHANGE_DATE": start_of_year.isoformat(), - "END_INSTALLMENT_CHANGE_DATE": today.isoformat(), - "CHUNK_DAYS": "30", - "LAST_N_DAYS": "", - "STORE_WORKERS": "1", - "GROUP_WORKERS": "1", - "WRITE_SQL": "1", - "SAVE_JSON": "0", - "INCREMENTAL_MODE": "0", - "LOG_GROUP_CODES": "0", - "INSTALLMENTS_PAGE_SIZE": "50", - "FLUSH_EVERY_PAGES": "50", - "INSTALLMENTS_MIN_INTERVAL_MS": "3500", - "INSTALLMENTS_429_MIN_WAIT_SEC": "45", - "THROTTLE_RECOVERY_PAUSE_SEC": "900", - } - for key, value in defaults.items(): - _setdefault_env(key, value) - - print( - "[backfill] modo=VD " - f"periodo={os.environ['START_INSTALLMENT_CHANGE_DATE']}..{os.environ['END_INSTALLMENT_CHANGE_DATE']} " - f"chunk_days={os.environ['CHUNK_DAYS']} write_sql={os.environ['WRITE_SQL']} " - f"min_interval_ms={os.environ['INSTALLMENTS_MIN_INTERVAL_MS']} " - f"wait429_min_s={os.environ['INSTALLMENTS_429_MIN_WAIT_SEC']}" - ) - installments_reader.main() - - -if __name__ == "__main__": - main() diff --git a/installments_by_order.py b/installments_by_order.py deleted file mode 100644 index d6299c0..0000000 --- a/installments_by_order.py +++ /dev/null @@ -1,185 +0,0 @@ -""" -Consulta a API de parcelas por installmentGroupCode (orderNumber) e grava -em DocPedidos + DocPedidosParcelas. - -Fonte dos códigos: SELECT orderNumber FROM Grgb_fiscal_invoices_items -""" -import os -import time -import requests - -import installments_reader -from installments_reader import Auth, get_installments_page, upsert_doc_pedidos_sqlserver - -# ─── CONFIGURE AQUI ─────────────────────────────────────────────────────────── -WRITE_SQL = True # True = grava no banco | False = só lê e exibe -SKIP_EXISTING = True # True = pula códigos já gravados em DocPedidos -BATCH_SIZE = 10 # grava no banco a cada N pedidos processados -MIN_INTERVAL_MS = 3500 # ms mínimo entre chamadas à API -# ────────────────────────────────────────────────────────────────────────────── - -SQL_CONN = ( - "DRIVER={ODBC Driver 17 for SQL Server};" - "SERVER=10.77.77.10;" - "DATABASE=GINSENG;" - "UID=andrey;" - "PWD=88253332;" - "TrustServerCertificate=yes;" -) - - -def _get_order_numbers() -> list[str]: - import pyodbc - cn = pyodbc.connect(SQL_CONN, timeout=30) - cur = cn.cursor() - cur.execute( - "SELECT DISTINCT [orderNumber] " - "FROM [GINSENG].[dbo].[Grgb_fiscal_invoices_items] " - "WHERE [orderNumber] IS NOT NULL AND LTRIM(RTRIM(CAST([orderNumber] AS VARCHAR))) <> ''" - ) - return [str(row[0]).strip() for row in cur.fetchall()] - - -def _get_existing_group_codes() -> set[str]: - import pyodbc - try: - cn = pyodbc.connect(SQL_CONN, timeout=30) - cur = cn.cursor() - cur.execute("SELECT InstallmentGroupCode FROM dbo.DocPedidos") - return {str(row[0]).strip() for row in cur.fetchall()} - except Exception: - return set() - - -def _fetch_all_pages(session: requests.Session, auth: Auth, group_code: str) -> list: - all_items = [] - page = 1 - while True: - body = get_installments_page( - session=session, - auth=auth, - start_date=None, - end_date=None, - installment_change=None, - mediator_code=None, - page=page, - installment_group_code=group_code, - cookie_header=None, - ) - items = ((body.get("data") or {}).get("installments")) or [] - all_items.extend(items) - - pagination = (body.get("data") or {}).get("pagination") or {} - total = int(pagination.get("total") or 0) - limit = int(pagination.get("limit") or len(items) or 10) - total_pages = max(1, (total + limit - 1) // limit) if total else page - - print(f" pagina={page}/{total_pages} itens={len(items)}") - if not items or page >= total_pages: - break - page += 1 - - return all_items - - -def _flush(batch: list, label: str = "") -> dict: - if not batch or not WRITE_SQL: - return {} - stats = upsert_doc_pedidos_sqlserver(batch, SQL_CONN) - print( - f"[sql] {label} " - f"pedidos={stats.get('pedidos', 0)} parcelas={stats.get('parcelas', 0)}" - ) - return stats - - -def main() -> None: - os.environ.setdefault("INSTALLMENTS_MIN_INTERVAL_MS", str(MIN_INTERVAL_MS)) - os.environ.setdefault("INSTALLMENTS_429_MIN_WAIT_SEC", "45") - os.environ.setdefault("THROTTLE_RECOVERY_PAUSE_SEC", "900") - - print("[info] buscando orderNumbers no banco...") - order_numbers = _get_order_numbers() - print(f"[info] total encontrado: {len(order_numbers)}") - - existing: set[str] = set() - if SKIP_EXISTING: - existing = _get_existing_group_codes() - print(f"[info] já em DocPedidos: {len(existing)} — serão pulados") - - pending = [c for c in order_numbers if c not in existing] - print(f"[info] para consultar na API: {len(pending)}\n") - - session = requests.Session() - session.trust_env = False - auth = Auth(session) - - batch: list = [] - batch_first_idx: int = 0 - batch_first_code: str = "" - total_pedidos = 0 - total_parcelas = 0 - erros = 0 - start_ts = time.monotonic() - - def _fmt(s: float) -> str: - s = int(s) - if s < 60: - return f"{s}s" - if s < 3600: - return f"{s // 60}m:{s % 60:02d}s" - return f"{s // 3600}h:{(s % 3600) // 60:02d}m" - - for idx, group_code in enumerate(pending, 1): - elapsed = time.monotonic() - start_ts - avg_s = elapsed / idx - restante = avg_s * (len(pending) - idx) - print(f"[{idx}/{len(pending)}] group_code={group_code} " - f"decorrido={_fmt(elapsed)} restante~{_fmt(restante)}") - try: - items = _fetch_all_pages(session, auth, group_code) - if not items: - print(" sem dados na API") - continue - - if not batch: - batch_first_idx = idx - batch_first_code = group_code - - batch.append({ - "installmentGroupCode": group_code, - "rawResponse": {"data": {"installments": items}}, - "installments": items, - }) - - if len(batch) >= BATCH_SIZE: - label = (f"inserindo consultas {batch_first_idx}..{idx} " - f"(group {batch_first_code} até {group_code})") - stats = _flush(batch, label) - total_pedidos += stats.get("pedidos", 0) - total_parcelas += stats.get("parcelas", 0) - batch = [] - batch_first_idx = 0 - batch_first_code = "" - - except Exception as exc: - print(f" [erro] {exc}") - erros += 1 - - # flush final - if batch: - label = (f"inserindo consultas {batch_first_idx}..{len(pending)} " - f"(group {batch_first_code} até {batch[-1]['installmentGroupCode']})") - else: - label = "flush final" - stats = _flush(batch, label) - total_pedidos += stats.get("pedidos", 0) - total_parcelas += stats.get("parcelas", 0) - - print( - f"\n[fim] pedidos={total_pedidos} parcelas={total_parcelas} erros={erros}" - ) - - -if __name__ == "__main__": - main() diff --git a/recebiveis_report_importer.py b/recebiveis_report_importer.py new file mode 100644 index 0000000..1abb8df --- /dev/null +++ b/recebiveis_report_importer.py @@ -0,0 +1,556 @@ +""" +Baixa relatórios de recebíveis via API /v2/franchisee/reports e importa no SQL Server. + +Modos de operação: + - INCREMENTAL=True → importa do dia seguinte ao último importado até ontem (diário) + - INCREMENTAL=False → importa o intervalo fixo START_DATE..END_DATE + +Deduplicação: + - dbo.Grgb_vendas_import_log registra cada arquivo já importado pelo fileName + - Se o arquivo já constar no log, a importação é pulada + +Mediadores: + - USE_ALL_MEDIATORS=True → busca todos via API de lojas (get_store_codes) + - USE_ALL_MEDIATORS=False → usa a lista MEDIATOR_CODES abaixo +""" +import csv +import io +import json +import os +import re +import time +import requests +from datetime import date, timedelta +from typing import Optional + +from installments_reader import Auth + +# ─── CONFIGURE AQUI ─────────────────────────────────────────────────────────── +USE_ALL_MEDIATORS = True # True = usa ALL_MEDIATOR_CODES abaixo; False = usa MEDIATOR_CODES +MEDIATOR_CODES = ["23708"] # usado apenas se USE_ALL_MEDIATORS = False + +# Lista exata dos mediatorCodes válidos para o relatório (copiado do portal) +ALL_MEDIATOR_CODES = [ + "checkAll", + "19334", "19335", "19336", "20008", "20010", "20029", "20058", "20059", + "20060", "20061", "20062", "20063", "20064", "20142", "20364", "20443", + "20444", "20621", "20622", "20623", "20772", "20773", "20774", "20775", + "20776", "20777", "20778", "20779", "20780", "20781", "20968", "20969", + "20970", "20986", "20988", "20989", "20991", "20992", "20993", "20994", + "20995", "20996", "20997", "20998", "20999", "21000", "21001", "21278", + "21279", "21375", "21383", "21495", "22448", "22541", "23703", "23704", + "23708", "23711", "23712", "23713", "23813", "24255", "24257", "24269", + "24293", "24447", "24451", "24457", "24458", "4494", +] + +INCREMENTAL = True # True = modo diário incremental +START_DATE = "2026-01-01" # usado apenas se INCREMENTAL = False +END_DATE = "2026-01-31" # usado apenas se INCREMENTAL = False +INCREMENTAL_DEFAULT_START = "2026-01-01" # data inicial no primeiro run incremental + +DATA_TYPE = "VENDAS" +REPORT_TYPE = "F360" +MAX_DAYS_PER_CHUNK = 15 # API limita a 15 dias por requisição com todos os mediadores +POLL_INTERVAL_S = 20 +POLL_TIMEOUT_S = 600 +WRITE_SQL = True +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +WATERMARK_FILE = os.path.join(_SCRIPT_DIR, "vendas_watermark.json") +IMPORT_BATCH_SIZE = 500 # linhas por lote no SQL Server +# ────────────────────────────────────────────────────────────────────────────── + +SQL_CONN = ( + "DRIVER={ODBC Driver 17 for SQL Server};" + "SERVER=10.77.77.10;" + "DATABASE=GINSENG;" + "UID=andrey;" + "PWD=88253332;" + "TrustServerCertificate=yes;" +) + +REPORTS_URL = ( + "https://bff-portal-apigw.produto-financeiro.grupoboticario.digital" + "/v2/franchisee/reports" +) + +_DONE_STATUSES = {"generated", "generated_csv", "done", "completed", "ready", "available"} +_FAIL_STATUSES = {"failed", "error", "erro", "falha"} + + +# ─── WATERMARK ──────────────────────────────────────────────────────────────── + +def _load_watermark() -> str: + """Retorna a última data importada (YYYY-MM-DD) ou a data padrão inicial.""" + try: + if os.path.exists(WATERMARK_FILE): + with open(WATERMARK_FILE, encoding="utf-8") as f: + data = json.load(f) + val = str(data.get("last_end_date") or "").strip() + if val: + return val + except Exception: + pass + return "" + + +def _save_watermark(end_date: str) -> None: + tmp = WATERMARK_FILE + ".tmp" + with open(tmp, "w", encoding="utf-8") as f: + json.dump({"last_end_date": end_date}, f) + os.replace(tmp, WATERMARK_FILE) + + +# ─── API ────────────────────────────────────────────────────────────────────── + +def _headers(auth: Auth) -> dict: + return { + "Authorization": auth.get_bearer(), + "Accept": "*/*", + "Content-Type": "application/json", + "Origin": "https://extranet.grupoboticario.com.br", + "Referer": "https://extranet.grupoboticario.com.br/", + "User-Agent": "Mozilla/5.0", + } + + +def _find_pending_in_list( + session: requests.Session, + auth: Auth, + start_date: str, + end_date: str, +) -> Optional[str]: + """ + Após um 504 no POST, o servidor pode ter criado o relatório mesmo sem responder. + Busca na listagem um relatório recente com o mesmo período. + """ + try: + raw = session.get( + REPORTS_URL, + params={"type": REPORT_TYPE, "dataType": DATA_TYPE}, + headers=_headers(auth), + timeout=30, + ).json() + reports = raw.get("data", {}).get("reports") or [] + for rep in reports: + # compara datas do relatório — a API retorna solicitedAt, não as datas do período, + # então usamos o índice 0 (mais recente) criado nos últimos 5 minutos + solicited = str(rep.get("solicitedAt") or "") + # pega o mais recente (lista já vem ordenada por data desc) + rid = str(rep.get("id") or "") + if rid: + print(f"[info] relatório encontrado na lista após 504: id={rid} status={rep.get('status')}") + return rid + except Exception as exc: + print(f"[warn] falha ao buscar lista após 504: {exc}") + return None + + +def _create_report( + session: requests.Session, + auth: Auth, + mediator_codes: list[str], + start_date: str, + end_date: str, +) -> str: + """Solicita criação do relatório e retorna o report_id.""" + payload = { + "dataType": DATA_TYPE, + "startSolicitedDate": start_date, + "endSolicitedDate": end_date, + "installmentCode": "", + "installmentGroupCode": "", + "mediatorCodes": mediator_codes, + "type": REPORT_TYPE, + } + print(f"[POST] {DATA_TYPE} {start_date}..{end_date} mediadores={len(mediator_codes)}") + transient = {429, 500, 502, 503, 504} + for attempt in range(1, 6): + try: + r = session.post(REPORTS_URL, json=payload, headers=_headers(auth), timeout=60) + if r.status_code in transient: + wait = min(60, 10 * attempt) + print(f"[warn] POST {r.status_code} (tentativa {attempt}/5), aguardando {wait}s...") + time.sleep(wait) + # 504 pode significar que o servidor criou mas não respondeu — checa a lista + if r.status_code == 504: + rid = _find_pending_in_list(session, auth, start_date, end_date) + if rid: + return rid + continue + if not r.ok: + print(f"[erro] POST {r.status_code}: {r.text[:300].encode('ascii','replace').decode()}") + r.raise_for_status() + resp = r.json() + rid = _extract_report_id(resp) + if not rid: + raise RuntimeError(f"report_id ausente na resposta: {resp}") + fname = _extract_file_name(resp).encode("ascii", "replace").decode() + print(f"[info] report_id={rid} fileName={fname}") + return rid + except requests.exceptions.Timeout: + wait = min(60, 10 * attempt) + print(f"[warn] POST timeout (tentativa {attempt}/5), aguardando {wait}s...") + time.sleep(wait) + rid = _find_pending_in_list(session, auth, start_date, end_date) + if rid: + return rid + raise RuntimeError("POST /reports falhou após 5 tentativas (504/timeout persistente)") + + +def _extract_report_id(response: dict) -> Optional[str]: + data = response.get("data") or {} + if isinstance(data.get("report"), dict): + rid = data["report"].get("id") + if rid: + return str(rid) + for key in ("id", "reportId", "uuid", "code"): + if data.get(key): + return str(data[key]) + return None + + +def _extract_file_name(response: dict) -> str: + data = response.get("data") or {} + rep = data.get("report") or data + return str(rep.get("filename") or rep.get("fileName") or "") + + +def _find_download_url(obj: dict) -> Optional[str]: + for key in ("fileLink", "url", "downloadUrl", "fileUrl", "s3Url", "link", + "presignedUrl", "signedUrl"): + if obj.get(key): + return str(obj[key]) + return None + + +def _fetch_report_by_id( + session: requests.Session, auth: Auth, report_id: str +) -> tuple[Optional[str], str]: + """Retorna (download_url, file_name) buscando o relatório pelo id.""" + r = session.get(f"{REPORTS_URL}/{report_id}", headers=_headers(auth), timeout=30) + if r.status_code != 200: + return None, "" + body = r.json() + data = body.get("data") or {} + rep = data.get("report") or data + url = _find_download_url(rep) or _find_download_url(data) or _find_download_url(body) + name = str(rep.get("fileName") or rep.get("filename") or "") + return url, name + + +def _poll_until_ready( + session: requests.Session, + auth: Auth, + report_id: str, +) -> tuple[str, str]: + """ + Consulta GET /reports?type=F360&dataType=RECEBIVEIS a cada POLL_INTERVAL_S segundos. + Quando o relatório chegar em GENERATED, busca o fileLink via GET /reports/{id}. + Retorna (download_url, file_name). + """ + deadline = time.monotonic() + POLL_TIMEOUT_S + attempt = 0 + while time.monotonic() < deadline: + attempt += 1 + raw = session.get( + REPORTS_URL, + params={"type": REPORT_TYPE, "dataType": DATA_TYPE}, + headers=_headers(auth), + timeout=30, + ).json() + + reports = raw.get("data", {}).get("reports") or [] + for rep in reports: + if str(rep.get("id") or "") != report_id: + continue + status = str(rep.get("status") or "").strip().lower() + if status in _FAIL_STATUSES: + raise RuntimeError(f"Relatório falhou: {rep}") + if status in _DONE_STATUSES: + url, file_name = _fetch_report_by_id(session, auth, report_id) + if url: + print(f"[poll] pronto após {attempt} tentativa(s) (status={status})") + return url, file_name + print(f"[poll] tentativa {attempt} status={status}, aguardando {POLL_INTERVAL_S}s...") + break + else: + print(f"[poll] tentativa {attempt} relatório não listado ainda, " + f"aguardando {POLL_INTERVAL_S}s...") + + time.sleep(POLL_INTERVAL_S) + + raise TimeoutError(f"Relatório não ficou pronto em {POLL_TIMEOUT_S}s") + + +def _download_csv(url: str) -> str: + print("[download] baixando CSV do S3...") + r = requests.get(url, timeout=180) + r.raise_for_status() + for enc in ("utf-8-sig", "utf-8", "latin-1"): + try: + return r.content.decode(enc) + except UnicodeDecodeError: + continue + return r.content.decode("latin-1", errors="replace") + + +# ─── SQL ────────────────────────────────────────────────────────────────────── + +def _sql_conn_str() -> str: + s = SQL_CONN.rstrip(";") + if not re.search(r"(?i)\bEncrypt\b", s): + s += ";Encrypt=no" + if not re.search(r"(?i)\bTrustServerCertificate\b", s): + s += ";TrustServerCertificate=yes" + return s + ";" + + +def _safe_col(name: str) -> str: + name = re.sub(r"[^a-zA-Z0-9_]", "_", name.strip()) + if name and name[0].isdigit(): + name = "c_" + name + return name[:128] or "col" + + +def _ensure_tables(cur, csv_cols: list[str]) -> list[str]: + """Garante que as tabelas de dados e log existam, com todas as colunas do CSV.""" + # tabela de log de importações (controle de duplicatas) + cur.execute(""" +IF OBJECT_ID('dbo.Grgb_vendas_import_log', 'U') IS NULL +BEGIN + CREATE TABLE dbo.Grgb_vendas_import_log ( + Id INT IDENTITY(1,1) PRIMARY KEY, + FileName VARCHAR(120) NOT NULL UNIQUE, + PeriodoInicio DATE NULL, + PeriodoFim DATE NULL, + TotalLinhas INT NOT NULL DEFAULT 0, + ImportadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME() + ) +END +""") + + # tabela de dados + cur.execute(""" +IF OBJECT_ID('dbo.Grgb_vendas_report', 'U') IS NULL +BEGIN + CREATE TABLE dbo.Grgb_vendas_report ( + Id INT IDENTITY(1,1) PRIMARY KEY, + NomeArquivo VARCHAR(120) NULL, + CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME() + ) +END +""") + # garante coluna NomeArquivo + cur.execute(""" +SELECT LOWER(COLUMN_NAME) FROM INFORMATION_SCHEMA.COLUMNS +WHERE TABLE_SCHEMA='dbo' AND TABLE_NAME='Grgb_vendas_report' +""") + existing = {r[0] for r in cur.fetchall()} + if "nomearquivo" not in existing: + cur.execute("ALTER TABLE dbo.Grgb_vendas_report ADD [NomeArquivo] VARCHAR(120) NULL") + + safe_cols = [_safe_col(c) for c in csv_cols] + # recarrega colunas existentes + cur.execute(""" +SELECT LOWER(COLUMN_NAME) FROM INFORMATION_SCHEMA.COLUMNS +WHERE TABLE_SCHEMA='dbo' AND TABLE_NAME='Grgb_vendas_report' +""") + existing = {r[0] for r in cur.fetchall()} + for col in safe_cols: + if col.lower() not in existing: + cur.execute( + f"ALTER TABLE dbo.Grgb_vendas_report ADD [{col}] NVARCHAR(500) NULL" + ) + print(f"[schema] coluna adicionada: {col}") + + return safe_cols + + +def _is_already_imported(cur, file_name: str) -> bool: + cur.execute( + "SELECT 1 FROM dbo.Grgb_vendas_import_log WHERE FileName = ?", file_name + ) + return cur.fetchone() is not None + + +def _import_csv( + rows: list[dict], + csv_cols: list[str], + file_name: str, + start_date: str, + end_date: str, +) -> int: + import pyodbc + + conn_str = _sql_conn_str() + total = len(rows) + + # --- conexão inicial: garante schema e checa log --- + cn = pyodbc.connect(conn_str, timeout=60) + cn.autocommit = False + cur = cn.cursor() + safe_cols = _ensure_tables(cur, csv_cols) + cn.commit() + + if _is_already_imported(cur, file_name): + print(f"[skip] {file_name} ja importado anteriormente") + cur.close(); cn.close() + return 0 + + # limpa linhas parciais de execuções que falharam no meio + cur.execute("DELETE FROM dbo.Grgb_vendas_report WHERE NomeArquivo = ?", file_name) + cn.commit() + cur.close(); cn.close() + + # --- insere em lotes para não derrubar a conexão --- + all_cols = ["NomeArquivo"] + safe_cols + col_list = ", ".join(f"[{c}]" for c in all_cols) + placeholders = ", ".join("?" for _ in all_cols) + sql = ( + f"INSERT INTO dbo.Grgb_vendas_report ({col_list}) " + f"VALUES ({placeholders})" + ) + + inserted = 0 + for batch_start in range(0, total, IMPORT_BATCH_SIZE): + batch_rows = rows[batch_start: batch_start + IMPORT_BATCH_SIZE] + batch = [ + (file_name,) + tuple( + (str(row.get(orig, "") or "")[:500] or None) + for orig in csv_cols + ) + for row in batch_rows + ] + + cn = pyodbc.connect(conn_str, timeout=60) + cn.autocommit = False + cur = cn.cursor() + cur.fast_executemany = True + try: + cur.executemany(sql, batch) + cn.commit() + inserted += len(batch) + print(f"[sql] {inserted}/{total} linhas arquivo={file_name}") + except Exception: + cn.rollback() + raise + finally: + cur.close(); cn.close() + + # registra no log de controle + cn = pyodbc.connect(conn_str, timeout=60) + cn.autocommit = False + cur = cn.cursor() + try: + cur.execute( + """ +INSERT INTO dbo.Grgb_vendas_import_log + (FileName, PeriodoInicio, PeriodoFim, TotalLinhas) +VALUES (?, ?, ?, ?) + """, + file_name, start_date, end_date, inserted, + ) + cn.commit() + except Exception: + cn.rollback() + raise + finally: + cur.close(); cn.close() + + print(f"[sql] importacao concluida: {inserted} linhas arquivo={file_name}") + return inserted + + +# ─── MAIN ───────────────────────────────────────────────────────────────────── + +def _resolve_date_range() -> tuple[str, str]: + """Sempre importa D-1. Antes das 9h30 BRT, dados ainda não disponíveis.""" + from datetime import datetime, timezone, timedelta as td + BRT = timezone(td(hours=-3)) + now_brt = datetime.now(tz=BRT) + cutoff = now_brt.replace(hour=9, minute=30, second=0, microsecond=0) + + if now_brt < cutoff: + print("[info] antes das 9h30 BRT — dados de D-1 ainda nao disponiveis.") + return "", "" + + yesterday = (now_brt - td(days=1)).date().isoformat() + return yesterday, yesterday + + +def _build_chunks(start_date: str, end_date: str) -> list[tuple[str, str]]: + """Divide o intervalo em janelas de MAX_DAYS_PER_CHUNK dias.""" + chunks = [] + cursor = date.fromisoformat(start_date) + end = date.fromisoformat(end_date) + step = timedelta(days=MAX_DAYS_PER_CHUNK - 1) + while cursor <= end: + chunk_end = min(cursor + step, end) + chunks.append((cursor.isoformat(), chunk_end.isoformat())) + cursor = chunk_end + timedelta(days=1) + return chunks + + +def main() -> None: + start_date, end_date = _resolve_date_range() + + if not start_date: + print("[info] dados já estão atualizados até ontem, nada a importar.") + return + + print(f"[info] período a importar: {start_date} .. {end_date}") + + session = requests.Session() + session.trust_env = False + auth = Auth(session) + + # resolve mediator codes + if USE_ALL_MEDIATORS: + mediator_codes = ALL_MEDIATOR_CODES + print(f"[info] usando todos os mediadores ({len(mediator_codes) - 1} lojas + checkAll)") + else: + mediator_codes = list(MEDIATOR_CODES) + + if not mediator_codes: + print("[erro] nenhum mediator code disponível, abortando.") + return + + chunks = _build_chunks(start_date, end_date) + print(f"[info] {len(chunks)} janela(s) de até {MAX_DAYS_PER_CHUNK} dias") + + total_linhas = 0 + last_ok_end = _load_watermark() if INCREMENTAL else "" + + for i, (chunk_start, chunk_end) in enumerate(chunks, 1): + print(f"\n[chunk {i}/{len(chunks)}] {chunk_start} .. {chunk_end}") + + report_id = _create_report(session, auth, mediator_codes, chunk_start, chunk_end) + + download_url, file_name = _poll_until_ready(session, auth, report_id) + print(f"[info] fileName={file_name.encode('ascii','replace').decode()}") + + csv_text = _download_csv(download_url) + reader = csv.DictReader(io.StringIO(csv_text)) + csv_cols = list(reader.fieldnames or []) + rows = list(reader) + print(f"[csv] {len(rows)} linhas {len(csv_cols)} colunas") + + if not rows: + print("[info] chunk vazio.") + elif not WRITE_SQL: + print("[info] WRITE_SQL=False — primeiras 2 linhas:") + for r in rows[:2]: + print(json.dumps(r, ensure_ascii=False)) + else: + n = _import_csv(rows, csv_cols, file_name, chunk_start, chunk_end) + total_linhas += n + + if INCREMENTAL: + _save_watermark(chunk_end) + last_ok_end = chunk_end + + print(f"\n[fim] total={total_linhas} linhas watermark={last_ok_end}") + + +if __name__ == "__main__": + main()