diff --git a/__pycache__/installments_backfill_vd.cpython-313.pyc b/__pycache__/installments_backfill_vd.cpython-313.pyc new file mode 100644 index 0000000..166d228 Binary files /dev/null and b/__pycache__/installments_backfill_vd.cpython-313.pyc differ diff --git a/__pycache__/installments_reader.cpython-313.pyc b/__pycache__/installments_reader.cpython-313.pyc index a973ba2..f2dc77c 100644 Binary files a/__pycache__/installments_reader.cpython-313.pyc and b/__pycache__/installments_reader.cpython-313.pyc differ diff --git a/__pycache__/trf.cpython-313.pyc b/__pycache__/trf.cpython-313.pyc new file mode 100644 index 0000000..5732a2f Binary files /dev/null and b/__pycache__/trf.cpython-313.pyc differ diff --git a/installments_backfill_vd.py b/installments_backfill_vd.py new file mode 100644 index 0000000..1f0d972 --- /dev/null +++ b/installments_backfill_vd.py @@ -0,0 +1,48 @@ +import os +from datetime import date + +import installments_reader + + +def _setdefault_env(key: str, value: str) -> None: + if not os.getenv(key): + os.environ[key] = value + + +def main() -> None: + today = date.today() + start_of_year = date(today.year, 1, 1) + + defaults = { + "STORE_CHANNEL": "VD", + "START_INSTALLMENT_CHANGE_DATE": start_of_year.isoformat(), + "END_INSTALLMENT_CHANGE_DATE": today.isoformat(), + "CHUNK_DAYS": "30", + "LAST_N_DAYS": "", + "STORE_WORKERS": "1", + "GROUP_WORKERS": "1", + "WRITE_SQL": "1", + "SAVE_JSON": "0", + "INCREMENTAL_MODE": "0", + "LOG_GROUP_CODES": "0", + "INSTALLMENTS_PAGE_SIZE": "50", + "FLUSH_EVERY_PAGES": "50", + "INSTALLMENTS_MIN_INTERVAL_MS": "3500", + "INSTALLMENTS_429_MIN_WAIT_SEC": "45", + "THROTTLE_RECOVERY_PAUSE_SEC": "900", + } + for key, value in defaults.items(): + _setdefault_env(key, value) + + print( + "[backfill] modo=VD " + f"periodo={os.environ['START_INSTALLMENT_CHANGE_DATE']}..{os.environ['END_INSTALLMENT_CHANGE_DATE']} " + f"chunk_days={os.environ['CHUNK_DAYS']} write_sql={os.environ['WRITE_SQL']} " + f"min_interval_ms={os.environ['INSTALLMENTS_MIN_INTERVAL_MS']} " + f"wait429_min_s={os.environ['INSTALLMENTS_429_MIN_WAIT_SEC']}" + ) + installments_reader.main() + + +if __name__ == "__main__": + main() diff --git a/installments_by_order.py b/installments_by_order.py new file mode 100644 index 0000000..d6299c0 --- /dev/null +++ b/installments_by_order.py @@ -0,0 +1,185 @@ +""" +Consulta a API de parcelas por installmentGroupCode (orderNumber) e grava +em DocPedidos + DocPedidosParcelas. + +Fonte dos códigos: SELECT orderNumber FROM Grgb_fiscal_invoices_items +""" +import os +import time +import requests + +import installments_reader +from installments_reader import Auth, get_installments_page, upsert_doc_pedidos_sqlserver + +# ─── CONFIGURE AQUI ─────────────────────────────────────────────────────────── +WRITE_SQL = True # True = grava no banco | False = só lê e exibe +SKIP_EXISTING = True # True = pula códigos já gravados em DocPedidos +BATCH_SIZE = 10 # grava no banco a cada N pedidos processados +MIN_INTERVAL_MS = 3500 # ms mínimo entre chamadas à API +# ────────────────────────────────────────────────────────────────────────────── + +SQL_CONN = ( + "DRIVER={ODBC Driver 17 for SQL Server};" + "SERVER=10.77.77.10;" + "DATABASE=GINSENG;" + "UID=andrey;" + "PWD=88253332;" + "TrustServerCertificate=yes;" +) + + +def _get_order_numbers() -> list[str]: + import pyodbc + cn = pyodbc.connect(SQL_CONN, timeout=30) + cur = cn.cursor() + cur.execute( + "SELECT DISTINCT [orderNumber] " + "FROM [GINSENG].[dbo].[Grgb_fiscal_invoices_items] " + "WHERE [orderNumber] IS NOT NULL AND LTRIM(RTRIM(CAST([orderNumber] AS VARCHAR))) <> ''" + ) + return [str(row[0]).strip() for row in cur.fetchall()] + + +def _get_existing_group_codes() -> set[str]: + import pyodbc + try: + cn = pyodbc.connect(SQL_CONN, timeout=30) + cur = cn.cursor() + cur.execute("SELECT InstallmentGroupCode FROM dbo.DocPedidos") + return {str(row[0]).strip() for row in cur.fetchall()} + except Exception: + return set() + + +def _fetch_all_pages(session: requests.Session, auth: Auth, group_code: str) -> list: + all_items = [] + page = 1 + while True: + body = get_installments_page( + session=session, + auth=auth, + start_date=None, + end_date=None, + installment_change=None, + mediator_code=None, + page=page, + installment_group_code=group_code, + cookie_header=None, + ) + items = ((body.get("data") or {}).get("installments")) or [] + all_items.extend(items) + + pagination = (body.get("data") or {}).get("pagination") or {} + total = int(pagination.get("total") or 0) + limit = int(pagination.get("limit") or len(items) or 10) + total_pages = max(1, (total + limit - 1) // limit) if total else page + + print(f" pagina={page}/{total_pages} itens={len(items)}") + if not items or page >= total_pages: + break + page += 1 + + return all_items + + +def _flush(batch: list, label: str = "") -> dict: + if not batch or not WRITE_SQL: + return {} + stats = upsert_doc_pedidos_sqlserver(batch, SQL_CONN) + print( + f"[sql] {label} " + f"pedidos={stats.get('pedidos', 0)} parcelas={stats.get('parcelas', 0)}" + ) + return stats + + +def main() -> None: + os.environ.setdefault("INSTALLMENTS_MIN_INTERVAL_MS", str(MIN_INTERVAL_MS)) + os.environ.setdefault("INSTALLMENTS_429_MIN_WAIT_SEC", "45") + os.environ.setdefault("THROTTLE_RECOVERY_PAUSE_SEC", "900") + + print("[info] buscando orderNumbers no banco...") + order_numbers = _get_order_numbers() + print(f"[info] total encontrado: {len(order_numbers)}") + + existing: set[str] = set() + if SKIP_EXISTING: + existing = _get_existing_group_codes() + print(f"[info] já em DocPedidos: {len(existing)} — serão pulados") + + pending = [c for c in order_numbers if c not in existing] + print(f"[info] para consultar na API: {len(pending)}\n") + + session = requests.Session() + session.trust_env = False + auth = Auth(session) + + batch: list = [] + batch_first_idx: int = 0 + batch_first_code: str = "" + total_pedidos = 0 + total_parcelas = 0 + erros = 0 + start_ts = time.monotonic() + + def _fmt(s: float) -> str: + s = int(s) + if s < 60: + return f"{s}s" + if s < 3600: + return f"{s // 60}m:{s % 60:02d}s" + return f"{s // 3600}h:{(s % 3600) // 60:02d}m" + + for idx, group_code in enumerate(pending, 1): + elapsed = time.monotonic() - start_ts + avg_s = elapsed / idx + restante = avg_s * (len(pending) - idx) + print(f"[{idx}/{len(pending)}] group_code={group_code} " + f"decorrido={_fmt(elapsed)} restante~{_fmt(restante)}") + try: + items = _fetch_all_pages(session, auth, group_code) + if not items: + print(" sem dados na API") + continue + + if not batch: + batch_first_idx = idx + batch_first_code = group_code + + batch.append({ + "installmentGroupCode": group_code, + "rawResponse": {"data": {"installments": items}}, + "installments": items, + }) + + if len(batch) >= BATCH_SIZE: + label = (f"inserindo consultas {batch_first_idx}..{idx} " + f"(group {batch_first_code} até {group_code})") + stats = _flush(batch, label) + total_pedidos += stats.get("pedidos", 0) + total_parcelas += stats.get("parcelas", 0) + batch = [] + batch_first_idx = 0 + batch_first_code = "" + + except Exception as exc: + print(f" [erro] {exc}") + erros += 1 + + # flush final + if batch: + label = (f"inserindo consultas {batch_first_idx}..{len(pending)} " + f"(group {batch_first_code} até {batch[-1]['installmentGroupCode']})") + else: + label = "flush final" + stats = _flush(batch, label) + total_pedidos += stats.get("pedidos", 0) + total_parcelas += stats.get("parcelas", 0) + + print( + f"\n[fim] pedidos={total_pedidos} parcelas={total_parcelas} erros={erros}" + ) + + +if __name__ == "__main__": + main() diff --git a/installments_reader.py b/installments_reader.py index 9547fe7..717bb97 100644 --- a/installments_reader.py +++ b/installments_reader.py @@ -3,6 +3,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed import json import os import re +import threading import time from dataclasses import dataclass from datetime import date, timedelta @@ -14,10 +15,27 @@ import requests TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens" STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores" INSTALLMENTS_URL = ( - "https://bff-credit-container-portal-apigw.produto-financeiro.grupoboticario.digital/" + "https://bff-credit-container-portal-apigw.prd.produto-financeiro.app.grupoboticario.com.br/" "v1/franchisee/installments" ) +_RATE_LIMIT_LOCK = threading.Lock() +_LAST_INSTALLMENTS_REQUEST_TS = 0.0 + + +def _set_global_backoff(wait_s: float) -> None: + """After a 429, push the shared timestamp forward so ALL threads pause.""" + global _LAST_INSTALLMENTS_REQUEST_TS + min_interval_ms_raw = os.getenv("INSTALLMENTS_MIN_INTERVAL_MS", "0").strip() + try: + min_interval_s = int(min_interval_ms_raw or "0") / 1000.0 + except Exception: + min_interval_s = 0.0 + with _RATE_LIMIT_LOCK: + target = time.monotonic() + wait_s - min_interval_s + if target > _LAST_INSTALLMENTS_REQUEST_TS: + _LAST_INSTALLMENTS_REQUEST_TS = target + def _jwt_payload(jwt_token: str) -> Dict[str, Any]: parts = jwt_token.split(".") @@ -65,12 +83,40 @@ class Auth: self.cache = TokenCache() +def _apply_installments_pacing() -> None: + min_interval_ms_raw = os.getenv("INSTALLMENTS_MIN_INTERVAL_MS", "0").strip() + try: + min_interval_ms = int(min_interval_ms_raw or "0") + except Exception: + min_interval_ms = 0 + if min_interval_ms <= 0: + return + + min_interval_s = float(min_interval_ms) / 1000.0 + global _LAST_INSTALLMENTS_REQUEST_TS + while True: + with _RATE_LIMIT_LOCK: + now = time.monotonic() + wait_s = (_LAST_INSTALLMENTS_REQUEST_TS + min_interval_s) - now + if wait_s <= 0: + _LAST_INSTALLMENTS_REQUEST_TS = now + return + time.sleep(min(wait_s, 1.0)) + + def _headers(auth: Auth, cookie_header: Optional[str]) -> Dict[str, str]: h = { "Authorization": auth.get_bearer(), - "Accept": "application/json, text/plain, */*", + "Accept": "*/*", + "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8,en-US;q=0.7", + "Cache-Control": "no-cache", + "Pragma": "no-cache", + "Content-Type": "application/json", "Origin": "https://extranet.grupoboticario.com.br", "Referer": "https://extranet.grupoboticario.com.br/", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "cross-site", "User-Agent": "Mozilla/5.0", } if cookie_header: @@ -88,8 +134,11 @@ def get_installments_page( page: int, installment_group_code: Optional[str] = None, cookie_header: Optional[str] = None, + page_size: Optional[int] = None, ) -> Dict[str, Any]: params: Dict[str, Any] = {"page": page} + if page_size and page_size > 0: + params["limit"] = page_size if installment_group_code: params["installmentGroupCode"] = installment_group_code else: @@ -105,6 +154,7 @@ def get_installments_page( max_attempts = 8 transient_status = {429, 500, 502, 503, 504} for attempt in range(max_attempts): + _apply_installments_pacing() r = session.get( INSTALLMENTS_URL, headers=_headers(auth, cookie_header=cookie_header), @@ -124,6 +174,21 @@ def get_installments_page( break if r.status_code in transient_status: wait_s = min(30, 2 ** min(5, attempt)) + if r.status_code == 429: + min_429_wait_raw = os.getenv("INSTALLMENTS_429_MIN_WAIT_SEC", "3").strip() + try: + min_429_wait = float(min_429_wait_raw or "3") + except Exception: + min_429_wait = 3.0 + retry_after = r.headers.get("Retry-After") or r.headers.get("retry-after") + if retry_after: + try: + min_429_wait = max(min_429_wait, float(retry_after)) + print(f"[info] Retry-After da API: {retry_after}s") + except Exception: + pass + wait_s = max(wait_s, min_429_wait) + _set_global_backoff(wait_s) print( f"[warn] erro temporario {r.status_code} no installments " f"(tentativa {attempt + 1}/{max_attempts}), aguardando {wait_s}s..." @@ -142,6 +207,13 @@ def get_installments_page( f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" ) if r.status_code in transient_status: + if r.status_code == 429: + recovery_s = float(os.getenv("THROTTLE_RECOVERY_PAUSE_SEC", "900")) + print( + f"[throttle-recovery] 429 persistente apos {max_attempts} tentativas. " + f"Aplicando cooldown global de {int(recovery_s)}s para proxima loja..." + ) + _set_global_backoff(recovery_s) raise RuntimeError( f"Falha temporaria persistente ({r.status_code}) no installments apos {max_attempts} tentativas. " f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" @@ -151,14 +223,31 @@ def get_installments_page( f"400 Bad Request em installments. " f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" ) + if r.status_code == 412: + raise RuntimeError( + "412 Precondition Failed em installments. " + "A API recusou a pre-condicao da requisicao (normalmente cabecalhos/sessao). " + f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" + ) r.raise_for_status() return r.json() -def get_store_codes(session: requests.Session, auth: Auth, cookie_header: Optional[str]) -> list[int]: +def get_store_codes( + session: requests.Session, + auth: Auth, + cookie_header: Optional[str], + allowed_channels: Optional[set[str]] = None, +) -> list[int]: r = session.get(STORES_URL, headers=_headers(auth, cookie_header=cookie_header), timeout=30) r.raise_for_status() data = r.json().get("data") or [] + if allowed_channels: + data = [ + x + for x in data + if str(x.get("channel") or "").strip().casefold() in allowed_channels + ] out = sorted({int(x.get("code")) for x in data if x.get("code") is not None}) return out @@ -453,21 +542,37 @@ WHERE DocPedidoId = ? AND InstallmentCode NOT IN ({placeholders}) def main() -> None: today = date.today() - last_days_env = os.getenv("LAST_N_DAYS", "5").strip() - last_n_days = int(last_days_env) if last_days_env else None - rolling_start = (today - timedelta(days=last_n_days)).isoformat() if last_n_days is not None else None - default_start = date(today.year, 1, 1).isoformat() - default_end = today.isoformat() start_date_env = os.getenv("START_INSTALLMENT_CHANGE_DATE") start_date_fixed = (start_date_env or "").strip() or None + last_days_env = os.getenv("LAST_N_DAYS", "5").strip() + last_n_days = int(last_days_env) if last_days_env else None + rolling_start = None + if (not start_date_fixed) and (last_n_days is not None): + rolling_start = (today - timedelta(days=last_n_days)).isoformat() + chunk_days_env = os.getenv("CHUNK_DAYS", "0").strip() + chunk_days = int(chunk_days_env) if chunk_days_env else 0 + if chunk_days < 0: + chunk_days = 0 + default_start = date(today.year, 1, 1).isoformat() + default_end = today.isoformat() end_date = os.getenv("END_INSTALLMENT_CHANGE_DATE", default_end) installment_change = os.getenv("INSTALLMENT_CHANGE", "CRIACAO").strip() or "CRIACAO" first_page = int(os.getenv("PAGE_START", "1")) max_pages_per_query = int(os.getenv("MAX_PAGES_PER_QUERY", "10000")) + page_size_env = os.getenv("INSTALLMENTS_PAGE_SIZE", "").strip() + page_size = int(page_size_env) if page_size_env else None + flush_every_pages = int(os.getenv("FLUSH_EVERY_PAGES", "50")) # Padrao organizado: uma loja por vez (logs nao ficam intercalados). store_workers = int(os.getenv("STORE_WORKERS", "1")) - group_workers = int(os.getenv("GROUP_WORKERS", "4")) cookie_header = os.getenv("EXTRANET_COOKIE") + store_channel_env = os.getenv("STORE_CHANNEL", "VD").strip() + allowed_channels = { + c.strip().casefold() + for c in re.split(r"[,\s;]+", store_channel_env) + if c.strip() + } + if "all" in allowed_channels or "*" in allowed_channels: + allowed_channels = set() target_mediator_env = os.getenv("TARGET_MEDIATOR_CODE", "").strip() target_mediator = int(target_mediator_env) if target_mediator_env else None resume_from_env = os.getenv("RESUME_FROM_MEDIATOR_CODE", "").strip() @@ -494,7 +599,15 @@ def main() -> None: session.trust_env = False auth = Auth(session) - stores = get_store_codes(session, auth, cookie_header) + stores = get_store_codes( + session, + auth, + cookie_header, + allowed_channels=allowed_channels or None, + ) + if allowed_channels: + channels_txt = ",".join(sorted(c.upper() for c in allowed_channels)) + print(f"[info] filtro de canal ativo: STORE_CHANNEL={channels_txt} (lojas={len(stores)})") if target_mediator is not None: stores = [target_mediator] print(f"[info] consulta focada na loja {target_mediator}") @@ -509,8 +622,10 @@ def main() -> None: print(f"[info] incremental ativo com watermark em {watermark_file}") else: print("[info] incremental desativado") - if last_n_days is not None: + if rolling_start is not None and last_n_days is not None: print(f"[info] janela movel ativa: ultimos {last_n_days} dias ({rolling_start}..{end_date})") + if chunk_days > 0: + print(f"[info] fatiamento de periodo ativo: CHUNK_DAYS={chunk_days}") authorized: list[int] = [] unauthorized: list[int] = [] @@ -531,173 +646,158 @@ def main() -> None: local_session = requests.Session() local_session.trust_env = False local_auth = Auth(local_session) - store_bearer = local_auth.get_bearer() - try: print( f"[consulta] loja={mediator_code} periodo={store_start_date}..{end_date} pagina_inicial={first_page}" ) - page = first_page - all_installments: List[Dict[str, Any]] = [] + store_start_obj = _parse_iso_date(store_start_date) + store_end_obj = _parse_iso_date(end_date) + if not store_start_obj or not store_end_obj: + raise RuntimeError( + f"Periodo invalido para consulta. start={store_start_date} end={end_date}" + ) + if store_start_obj > store_end_obj: + raise RuntimeError( + f"Periodo invalido para consulta. start={store_start_date} end={end_date}" + ) + + windows: List[tuple[str, str]] = [] + if chunk_days > 0: + cursor = store_start_obj + step_days = max(1, chunk_days) + while cursor <= store_end_obj: + win_end = min(store_end_obj, cursor + timedelta(days=step_days - 1)) + windows.append((cursor.isoformat(), win_end.isoformat())) + cursor = win_end + timedelta(days=1) + else: + windows = [(store_start_obj.isoformat(), store_end_obj.isoformat())] + + all_group_codes_seen: set = set() total_api = 0 - total_pages = 1 - while True: - try: - body = get_installments_page( - session=local_session, - auth=local_auth, - start_date=store_start_date, - end_date=end_date, - installment_change=installment_change, - mediator_code=mediator_code, - page=page, - cookie_header=cookie_header, - ) - except Exception as e: - msg = str(e) - if "400 Bad Request" in msg and installment_change: + pages_fetched_total = 0 + total_sql_pedidos = 0 + total_sql_parcelas = 0 + + def _flush(buffer: List[Dict[str, Any]], label: str) -> None: + """Agrupa itens do scan por installmentGroupCode e persiste no banco.""" + nonlocal total_sql_pedidos, total_sql_parcelas + if not buffer: + return + groups: Dict[str, List[Dict[str, Any]]] = {} + for item in _dedupe_installments(buffer): + gc = str(item.get("installmentGroupCode") or "").strip() + if not gc or gc in all_group_codes_seen: + continue + groups.setdefault(gc, []).append(item) + if not groups: + return + all_group_codes_seen.update(groups.keys()) + if log_group_codes: + for gc in groups: + print(f"[grupo] loja={mediator_code} {label} installmentGroupCode={gc}") + print(f"[flush] loja={mediator_code} {label} grupos_novos={len(groups)}") + if write_sql: + sql_rows: List[Dict[str, Any]] = [] + for gc, items in groups.items(): + sql_rows.append({ + "mediatorCode": mediator_code, + "installmentGroupCode": gc, + "rawResponse": {"data": {"installments": items}}, + "installments": items, + }) + if sql_rows: + stats = upsert_doc_pedidos_sqlserver(sql_rows, sql_conn, mediator_code_log=mediator_code) + total_sql_pedidos += stats.get("pedidos", 0) + total_sql_parcelas += stats.get("parcelas", 0) print( - f"[fallback] loja={mediator_code} 400 com installmentChange={installment_change}; " - "tentando sem installmentChange" + f"[sql-flush] loja={mediator_code} {label} " + f"pedidos_upsert={stats.get('pedidos')} parcelas_upsert={stats.get('parcelas')}" ) + + for window_idx, (window_start, window_end) in enumerate(windows, start=1): + print( + f"[consulta-janela] loja={mediator_code} janela={window_idx}/{len(windows)} " + f"periodo={window_start}..{window_end}" + ) + + page = first_page + window_total = 0 + window_total_pages = 1 + window_pages_fetched = 0 + flush_buffer: List[Dict[str, Any]] = [] + while True: + try: body = get_installments_page( session=local_session, auth=local_auth, - start_date=store_start_date, - end_date=end_date, - installment_change=None, + start_date=window_start, + end_date=window_end, + installment_change=installment_change, mediator_code=mediator_code, page=page, cookie_header=cookie_header, + page_size=page_size, ) - else: - raise - installments_page = (((body.get("data") or {}).get("installments")) or []) - all_installments.extend(installments_page) - pagination = ((body.get("data") or {}).get("pagination") or {}) - total_api = int(pagination.get("total") or len(all_installments)) - limit = int(pagination.get("limit") or len(installments_page) or 1) - total_pages = max(1, (total_api + limit - 1) // limit) if total_api else page - print( - f"[pagina-loja] loja={mediator_code} pagina={page}/{total_pages} " - f"itens_pagina={len(installments_page)}" - ) - if not installments_page: - break - if page >= total_pages: - break - if page - first_page + 1 >= max_pages_per_query: + except Exception as e: + msg = str(e) + if "400 Bad Request" in msg and installment_change: + print( + f"[fallback] loja={mediator_code} 400 com installmentChange={installment_change}; " + "tentando sem installmentChange" + ) + body = get_installments_page( + session=local_session, + auth=local_auth, + start_date=window_start, + end_date=window_end, + installment_change=None, + mediator_code=mediator_code, + page=page, + cookie_header=cookie_header, + page_size=page_size, + ) + else: + raise + installments_page = (((body.get("data") or {}).get("installments")) or []) + flush_buffer.extend(installments_page) + pagination = ((body.get("data") or {}).get("pagination") or {}) + window_total = int(pagination.get("total") or 0) + limit = int(pagination.get("limit") or len(installments_page) or 1) + window_total_pages = max(1, (window_total + limit - 1) // limit) if window_total else page + window_pages_fetched += 1 print( - f"[stop] loja={mediator_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido" + f"[pagina-loja] loja={mediator_code} janela={window_idx}/{len(windows)} " + f"pagina={page}/{window_total_pages} itens_pagina={len(installments_page)}" ) - break - page += 1 + if flush_every_pages > 0 and window_pages_fetched % flush_every_pages == 0: + _flush(flush_buffer, f"janela={window_idx}/{len(windows)} pagina={page}") + flush_buffer = [] + if not installments_page: + break + if page >= window_total_pages: + break + if page - first_page + 1 >= max_pages_per_query: + print( + f"[stop] loja={mediator_code} janela={window_idx}/{len(windows)} " + f"limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido" + ) + break + page += 1 + + # Flush do restante da janela que não atingiu o threshold + _flush(flush_buffer, f"janela={window_idx}/{len(windows)}") + + total_api += window_total + pages_fetched_total += window_pages_fetched + print( + f"[resultado-janela] loja={mediator_code} janela={window_idx}/{len(windows)} " + f"grupos_acumulados={len(all_group_codes_seen)}" + ) - installments = _dedupe_installments(all_installments) - group_codes = sorted( - { - str(item.get("installmentGroupCode")).strip() - for item in installments - if item.get("installmentGroupCode") is not None - and str(item.get("installmentGroupCode")).strip() - } - ) print( f"[resultado-loja] loja={mediator_code} pedidos_encontrados={total_api} " - f"itens_total_agregados={len(installments)} grupos_unicos={len(group_codes)}" + f"grupos_unicos={len(all_group_codes_seen)}" ) - if log_group_codes: - for gc in group_codes: - print(f"[grupo] loja={mediator_code} installmentGroupCode={gc}") - - pedidos: Dict[str, Any] = {} - - def fetch_group(group_code: str) -> Dict[str, Any]: - try: - group_session = requests.Session() - group_session.trust_env = False - group_auth = Auth(group_session) - # Reutiliza o token da loja para evitar nova ida ao endpoint /api/tokens por grupo. - group_auth.override_bearer = store_bearer - group_page = first_page - all_group_installments: List[Dict[str, Any]] = [] - group_total = 0 - group_total_pages = 1 - while True: - group_body = get_installments_page( - session=group_session, - auth=group_auth, - start_date=None, - end_date=None, - installment_change=None, - mediator_code=None, - page=group_page, - installment_group_code=group_code, - cookie_header=cookie_header, - ) - group_page_items = (((group_body.get("data") or {}).get("installments")) or []) - all_group_installments.extend(group_page_items) - group_pagination = ((group_body.get("data") or {}).get("pagination") or {}) - group_total = int(group_pagination.get("total") or len(all_group_installments)) - group_limit = int(group_pagination.get("limit") or len(group_page_items) or 1) - group_total_pages = ( - max(1, (group_total + group_limit - 1) // group_limit) if group_total else group_page - ) - print( - f"[grupo-pagina] loja={mediator_code} installmentGroupCode={group_code} " - f"pagina={group_page}/{group_total_pages} itens_pagina={len(group_page_items)}" - ) - if not group_page_items: - break - if group_page >= group_total_pages: - break - if group_page - first_page + 1 >= max_pages_per_query: - print( - f"[stop] grupo={group_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido" - ) - break - group_page += 1 - - group_installments = _dedupe_installments(all_group_installments) - count = len(group_installments) - print(f"[grupo-ok] loja={mediator_code} installmentGroupCode={group_code} itens={count}") - group_body_agg = { - "data": { - "installments": group_installments, - "pagination": {"limit": count, "total": group_total or count}, - }, - "status": 200, - "message": "Success", - } - return { - "pedidoNumero": group_code, - "consulta": { - "installmentGroupCode": group_code, - "pageStart": first_page, - "pagesFetched": group_total_pages, - }, - "ok": True, - "totalItens": count, - "detalhes": group_body_agg, - } - except Exception as e: - print(f"[grupo-erro] loja={mediator_code} installmentGroupCode={group_code} erro={e}") - return { - "pedidoNumero": group_code, - "consulta": { - "installmentGroupCode": group_code, - "pageStart": first_page, - }, - "ok": False, - "error": str(e), - } - - if group_codes: - with ThreadPoolExecutor(max_workers=max(1, group_workers)) as group_pool: - group_futures = {group_pool.submit(fetch_group, gc): gc for gc in group_codes} - for fut in as_completed(group_futures): - gc = group_futures[fut] - pedidos[gc] = fut.result() store_out = { "queryWindow": { @@ -705,35 +805,19 @@ def main() -> None: "endInstallmentChangeDate": end_date, "installmentChange": installment_change, "pageStart": first_page, - "pagesFetched": total_pages, + "pagesFetched": pages_fetched_total, + "windowsFetched": len(windows), + "chunkDays": chunk_days if chunk_days > 0 else None, }, "mediatorCode": mediator_code, - "pedidosEncontradosPagina": len(group_codes), - "pedidos": pedidos, + "pedidosEncontradosPagina": len(all_group_codes_seen), + "grupos_unicos": len(all_group_codes_seen), } if write_sql: - sql_rows: List[Dict[str, Any]] = [] - for gc, pedido_payload in pedidos.items(): - if not pedido_payload.get("ok"): - continue - raw_response = pedido_payload.get("detalhes") or {} - sql_rows.append( - { - "mediatorCode": mediator_code, - "installmentGroupCode": gc, - "rawResponse": raw_response, - "installments": (((raw_response.get("data") or {}).get("installments")) or []), - } - ) - stats = upsert_doc_pedidos_sqlserver( - sql_rows, - sql_conn, - mediator_code_log=mediator_code, - ) - store_out["sqlUpsert"] = stats + store_out["sqlUpsert"] = {"pedidos": total_sql_pedidos, "parcelas": total_sql_parcelas} print( - f"[sql] loja={mediator_code} pedidos_upsert={stats.get('pedidos')} " - f"parcelas_upsert={stats.get('parcelas')}" + f"[sql] loja={mediator_code} pedidos_upsert={total_sql_pedidos} " + f"parcelas_upsert={total_sql_parcelas}" ) if save_json: store_file = os.path.join(output_dir, f"installments_loja_{mediator_code}.json") @@ -781,6 +865,31 @@ def main() -> None: failed[mediator_code] = f"erro no worker: {e}" print(f"[falha] loja={mediator_code} erro no worker: {e}") + retry_wait_sec = int(os.getenv("RETRY_FAILED_WAIT_SEC", "90")) + if failed and retry_wait_sec > 0: + retry_candidates = sorted( + k for k, v in failed.items() if "429" in str(v) or "temporaria" in str(v).lower() + ) + if retry_candidates: + print( + f"[retry] {len(retry_candidates)} loja(s) falharam por throttle. " + f"Aguardando {retry_wait_sec}s antes de retentar sequencialmente..." + ) + time.sleep(retry_wait_sec) + for mc in retry_candidates: + result = process_store(mc) + status = result["status"] + if status == "authorized": + authorized.append(mc) + failed.pop(mc, None) + if incremental_mode: + watermark[str(mc)] = str(result["endDate"]) + save_watermark(watermark_file, watermark) + print(f"[retry-ok] loja={mc}") + else: + failed[mc] = str(result.get("error") or "erro desconhecido") + print(f"[retry-falha] loja={mc} erro={failed[mc]}") + print( "[resumo] " f"lojas_total={len(stores)} autorizadas={len(authorized)} " diff --git a/trf_registroerro.py b/trf_registroerro.py new file mode 100644 index 0000000..086b88c --- /dev/null +++ b/trf_registroerro.py @@ -0,0 +1,246 @@ +import json +from typing import Any, Dict, Optional, Set + +import trf + +FRANCHISES_LIST_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/franchises/list/franchise" + + +def _collect_store_codes(payload: Any) -> list[str]: + keys = ("sapCode", "code", "franchiseId", "franchiseCode", "mediatorCode", "storeCode") + out: list[str] = [] + seen = set() + + def _push(v: Any) -> None: + if v is None: + return + s = str(v).strip() + if not s: + return + if s not in seen: + seen.add(s) + out.append(s) + + def _walk(obj: Any) -> None: + if isinstance(obj, dict): + for k in keys: + if k in obj: + _push(obj.get(k)) + for v in obj.values(): + _walk(v) + return + if isinstance(obj, list): + for item in obj: + _walk(item) + + _walk(payload) + return out + + +class ClientLoja(trf.Client): + def __init__(self, store_code: str): + super().__init__() + self.store_code = str(store_code).strip() + + def get_franchises(self, only_channels: Optional[Set[str]] = None): + r = self.s.get(FRANCHISES_LIST_URL, headers=self._headers_json(), timeout=30) + if r.status_code in (401, 403): + self.auth.invalidate() + r = self.s.get(FRANCHISES_LIST_URL, headers=self._headers_json(), timeout=30) + r.raise_for_status() + + body = r.json() + source = body.get("data") if isinstance(body, dict) and "data" in body else body + lojas = _collect_store_codes(source) + if not lojas: + raise RuntimeError( + "Endpoint de franquias nao retornou codigos de loja validos. " + f"Resposta resumida: {str(body)[:500]}" + ) + + if self.store_code not in lojas: + raise RuntimeError( + f"Loja {self.store_code} nao encontrada/sem permissao no usuario. " + f"Lojas disponiveis: {', '.join(lojas[:30])}{'...' if len(lojas) > 30 else ''}" + ) + return [self.store_code] + + +def sincronizar_paginas_sqlserver_loja( + connection_string: str, + store_code: str, + cp_id: int = 10269, + document_type: str = "EFAT", + limit: int = 100, + start_offset: int = 0, + only_channels: Optional[Set[str]] = None, + commit_cada_paginas: int = 1, +) -> Dict[str, Any]: + cli = ClientLoja(store_code) + offset = start_offset + paginas = 0 + docs_persistidos = 0 + total = None + + with trf.SqlServerSink(connection_string) as sink: + sink.ensure_schema() + while True: + pagina = cli.processar_pagina( + cp_id=cp_id, + document_type=document_type, + offset=offset, + limit=limit, + only_channels=only_channels, + ) + if total is None: + total = int(pagina.get("total") or 0) + + itens = pagina.get("items") or [] + persistidos_pag, novos_pag = sink.persist_items(itens) + docs_persistidos += persistidos_pag + paginas += 1 + + if paginas % commit_cada_paginas == 0: + sink.cn.commit() + + print( + f"[sync][loja={store_code}] offset={offset} count={len(itens)} " + f"novos_pag={novos_pag} persistidos={docs_persistidos} total={total}" + ) + if not pagina.get("hasNext"): + break + offset += limit + + sink.cn.commit() + + return { + "store_code": store_code, + "total": total, + "paginas_processadas": paginas, + "documentos_persistidos": docs_persistidos, + "offset_final": offset, + } + + +def sincronizar_incremental_sqlserver_loja( + connection_string: str, + store_code: str, + cp_id: int = 10269, + document_type: str = "EFAT", + limit: int = 100, + only_channels: Optional[Set[str]] = None, + max_paginas_sem_novidade: int = 3, + max_paginas: int = 20, +) -> Dict[str, Any]: + cli = ClientLoja(store_code) + offset = 0 + paginas = 0 + docs_persistidos = 0 + docs_novos = 0 + sem_novidade = 0 + total = None + + with trf.SqlServerSink(connection_string) as sink: + sink.ensure_schema() + while True: + pagina = cli.processar_pagina( + cp_id=cp_id, + document_type=document_type, + offset=offset, + limit=limit, + only_channels=only_channels, + ) + if total is None: + total = int(pagina.get("total") or 0) + + itens = pagina.get("items") or [] + persistidos_pag, novos_pag = sink.persist_items(itens) + sink.cn.commit() + + docs_persistidos += persistidos_pag + docs_novos += novos_pag + paginas += 1 + sem_novidade = 0 if novos_pag > 0 else (sem_novidade + 1) + + print( + f"[inc][loja={store_code}] offset={offset} count={len(itens)} " + f"novos_pag={novos_pag} sem_novidade={sem_novidade}/{max_paginas_sem_novidade} " + f"total_novos={docs_novos}" + ) + + if sem_novidade >= max_paginas_sem_novidade: + break + if paginas >= max_paginas: + break + if not pagina.get("hasNext"): + break + offset += limit + + return { + "store_code": store_code, + "total": total, + "paginas_processadas": paginas, + "documentos_persistidos": docs_persistidos, + "documentos_novos": docs_novos, + "offset_final": offset, + "parada_por_sem_novidade": sem_novidade >= max_paginas_sem_novidade, + } + + + + +if __name__ == "__main__": + RUN_MODE = "full" # edite: full | incremental | json + if RUN_MODE not in ("full", "incremental", "json"): + raise RuntimeError("RUN_MODE invalido. Use: full, incremental ou json.") + + TARGET_STORE_CODE = "24430" # edite: codigo da loja, ex: 20997 + store_code = str(TARGET_STORE_CODE).strip() + if not store_code: + raise RuntimeError("Preencha TARGET_STORE_CODE no codigo.") + print(f"[info] consulta focada na loja {store_code}") + + if RUN_MODE in ("full", "incremental"): + SQLSERVER_CONN = ( + "DRIVER={ODBC Driver 17 for SQL Server};" + "SERVER=10.77.77.10;" + "DATABASE=GINSENG;" + "UID=andrey;" + "PWD=88253332;" + "TrustServerCertificate=yes;" + ) + + if RUN_MODE == "full": + resultado = sincronizar_paginas_sqlserver_loja( + connection_string=SQLSERVER_CONN, + store_code=store_code, + cp_id=10269, + document_type="EFAT", + limit=100, + start_offset=0, + only_channels=None, + commit_cada_paginas=1, + ) + else: + resultado = sincronizar_incremental_sqlserver_loja( + connection_string=SQLSERVER_CONN, + store_code=store_code, + cp_id=10269, + document_type="EFAT", + limit=100, + only_channels=None, + max_paginas_sem_novidade=5, + max_paginas=15, + ) + else: + c = ClientLoja(store_code) + resultado = c.processar_pagina( + cp_id=10269, + document_type="EFAT", + offset=0, + limit=25, + only_channels=None, + ) + + print(json.dumps(resultado, ensure_ascii=False, indent=2)) +