"""Sync Grupo Boticario franchisee installments into SQL Server.

For each store (mediatorCode) listed by the extranet stores API, page through
the installments API over a date window and upsert the results, grouped by
installmentGroupCode, into dbo.DocPedidos / dbo.DocPedidosParcelas.
All behaviour is configured through environment variables read in ``main``.
"""

import base64
import json
import os
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode

import requests

TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens"
STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores"
INSTALLMENTS_URL = (
    "https://bff-credit-container-portal-apigw.prd.produto-financeiro.app.grupoboticario.com.br/"
    "v1/franchisee/installments"
)

# Pacing state shared by every thread that calls the installments endpoint.
_RATE_LIMIT_LOCK = threading.Lock()
# time.monotonic() value of the last request slot handed out. _set_global_backoff
# pushes it into the future so that all threads pause.
_LAST_INSTALLMENTS_REQUEST_TS = 0.0


def _min_interval_seconds() -> float:
    """Return INSTALLMENTS_MIN_INTERVAL_MS in seconds (0.0 when unset, invalid or negative)."""
    raw = os.getenv("INSTALLMENTS_MIN_INTERVAL_MS", "0").strip()
    try:
        interval_ms = int(raw or "0")
    except ValueError:
        return 0.0
    return max(0, interval_ms) / 1000.0


def _set_global_backoff(wait_s: float) -> None:
    """After a 429, push the shared timestamp forward so ALL threads pause ``wait_s`` seconds.

    ``_apply_installments_pacing`` releases the next request at
    ``_LAST_INSTALLMENTS_REQUEST_TS + min_interval``. The min interval is
    therefore subtracted here so the pause lasts ``wait_s``. The timestamp is
    only ever moved forward, never back.
    """
    global _LAST_INSTALLMENTS_REQUEST_TS
    min_interval_s = _min_interval_seconds()
    with _RATE_LIMIT_LOCK:
        target = time.monotonic() + wait_s - min_interval_s
        if target > _LAST_INSTALLMENTS_REQUEST_TS:
            _LAST_INSTALLMENTS_REQUEST_TS = target


def _apply_installments_pacing() -> None:
    """Block until this thread may send the next installments request.

    Enforces INSTALLMENTS_MIN_INTERVAL_MS between requests across all threads
    and honours any pause scheduled by ``_set_global_backoff``. The pause is
    honoured even when the min interval is 0 (the default). Otherwise 429
    back-offs and the throttle-recovery cooldown would have no effect.
    """
    global _LAST_INSTALLMENTS_REQUEST_TS
    min_interval_s = _min_interval_seconds()
    while True:
        with _RATE_LIMIT_LOCK:
            now = time.monotonic()
            wait_s = (_LAST_INSTALLMENTS_REQUEST_TS + min_interval_s) - now
            if wait_s <= 0:
                _LAST_INSTALLMENTS_REQUEST_TS = now
                return
        # Sleep in slices of at most 1s, then re-check: a backoff extended in the
        # meantime by another thread is picked up on the next iteration.
        time.sleep(min(wait_s, 1.0))


def _jwt_payload(jwt_token: str) -> Dict[str, Any]:
    """Decode (without verifying the signature) the payload of a JWT.

    Returns {} when the token does not have exactly three dot-separated parts.
    Raises on invalid base64/UTF-8/JSON in the payload segment.
    """
    parts = jwt_token.split(".")
    if len(parts) != 3:
        return {}
    payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4)  # restore stripped padding
    raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
    return json.loads(raw.decode("utf-8"))


def _strip_bearer_prefix(bearer: str) -> str:
    """Return the raw JWT from an Authorization value, with or without a 'Bearer ' prefix."""
    if bearer.lower().startswith("bearer "):
        return bearer.split(" ", 1)[1]
    return bearer


@dataclass
class TokenCache:
    """Cached Authorization value and the JWT ``exp`` claim (epoch seconds)."""

    bearer: Optional[str] = None
    exp_epoch: int = 0

    def valid(self, skew_seconds: int = 30) -> bool:
        """True when a bearer is cached and expires more than ``skew_seconds`` from now."""
        return bool(self.bearer) and (time.time() < (self.exp_epoch - skew_seconds))


class Auth:
    """Supply the Authorization header value, caching tokens fetched from TOKENS_URL.

    When the EXTRANET_BEARER environment variable is set, it is always returned
    as-is and never refreshed.
    """

    def __init__(self, session: requests.Session):
        self.s = session
        self.cache = TokenCache()
        self.override_bearer = os.getenv("EXTRANET_BEARER")

    def get_bearer(self, force_refresh: bool = False) -> str:
        """Return the Authorization value, fetching a new token if the cache is stale.

        Raises:
            requests.HTTPError: the token endpoint answered with an error status.
            RuntimeError: the token endpoint returned ``success`` false.
        """
        if self.override_bearer:
            return self.override_bearer
        if (not force_refresh) and self.cache.valid():
            return self.cache.bearer  # type: ignore[return-value]
        r = self.s.get(TOKENS_URL, timeout=30)
        r.raise_for_status()
        body = r.json()
        if not body.get("success"):
            raise RuntimeError(f"Token API retornou success=false: {body}")
        bearer = body["data"][0]["token"]
        # A token without an "exp" claim gets exp=0. The cache never considers it
        # valid, so it is re-fetched on every call.
        exp = int(_jwt_payload(_strip_bearer_prefix(bearer)).get("exp") or 0)
        self.cache = TokenCache(bearer=bearer, exp_epoch=exp)
        return bearer

    def invalidate(self) -> None:
        """Drop the cached token so the next get_bearer() fetches a fresh one."""
        self.cache = TokenCache()


def _headers(auth: Auth, cookie_header: Optional[str]) -> Dict[str, str]:
    """Build browser-like request headers carrying the current Authorization value.

    ``cookie_header`` (e.g. CloudFront cookies) is sent as ``Cookie`` when given.
    """
    h = {
        "Authorization": auth.get_bearer(),
        "Accept": "*/*",
        "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8,en-US;q=0.7",
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
        "Content-Type": "application/json",
        "Origin": "https://extranet.grupoboticario.com.br",
        "Referer": "https://extranet.grupoboticario.com.br/",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "cross-site",
        "User-Agent": "Mozilla/5.0",
    }
    if cookie_header:
        h["Cookie"] = cookie_header
    return h


def _min_429_wait(response: requests.Response) -> float:
    """Minimum wait after a 429: INSTALLMENTS_429_MIN_WAIT_SEC (default 3), raised to a numeric Retry-After."""
    raw = os.getenv("INSTALLMENTS_429_MIN_WAIT_SEC", "3").strip()
    try:
        min_wait = float(raw or "3")
    except ValueError:
        min_wait = 3.0
    # requests exposes headers as a case-insensitive mapping.
    retry_after = response.headers.get("Retry-After")
    if retry_after:
        try:
            min_wait = max(min_wait, float(retry_after))
            print(f"[info] Retry-After da API: {retry_after}s")
        except (TypeError, ValueError):
            pass  # e.g. an HTTP-date Retry-After: keep the configured minimum
    return min_wait


def _token_stores_claim(auth: Auth) -> Any:
    """Best-effort read of the 'stores' claim of the current token, for error messages only."""
    try:
        return _jwt_payload(_strip_bearer_prefix(auth.get_bearer())).get("stores")
    except Exception as e:  # diagnostics must never mask the real HTTP error
        return f"<indisponivel: {e}>"


def get_installments_page(
    session: requests.Session,
    auth: Auth,
    start_date: Optional[str],
    end_date: Optional[str],
    installment_change: Optional[str],
    mediator_code: Optional[int],
    page: int,
    installment_group_code: Optional[str] = None,
    cookie_header: Optional[str] = None,
    page_size: Optional[int] = None,
) -> Dict[str, Any]:
    """Fetch one page of installments and return the decoded JSON body.

    Queries either by ``installment_group_code`` or by period
    (``start_date``..``end_date`` for ``mediator_code``). The request is
    retried up to 8 times:
    - 401 refreshes the token.
    - 429 and 5xx back off; 429 also pauses all threads.

    Raises:
        ValueError: a period query without start/end/mediator_code.
        RuntimeError: 403, 400, 412, or a transient status that persisted through
            every attempt. The messages are matched by callers.
        requests.HTTPError: any other non-2xx status (e.g. persistent 401).
    """
    params: Dict[str, Any] = {"page": page}
    if page_size and page_size > 0:
        params["limit"] = page_size
    if installment_group_code:
        params["installmentGroupCode"] = installment_group_code
    else:
        if not start_date or not end_date or mediator_code is None:
            raise ValueError("Para consulta por periodo, informe start/end/mediator_code.")
        params["endInstallmentChangeDate"] = end_date
        params["startInstallmentChangeDate"] = start_date
        params["mediatorCode"] = mediator_code
        if installment_change:
            params["installmentChange"] = installment_change

    r = None
    max_attempts = 8
    transient_status = {429, 500, 502, 503, 504}
    for attempt in range(max_attempts):
        is_last_attempt = attempt + 1 >= max_attempts
        _apply_installments_pacing()
        r = session.get(
            INSTALLMENTS_URL,
            headers=_headers(auth, cookie_header=cookie_header),
            params=params,
            timeout=60,
        )
        if r.status_code == 401:
            print(
                f"[warn] 401 no installments (tentativa {attempt + 1}/{max_attempts}), "
                "renovando token..."
            )
            auth.invalidate()
            auth.get_bearer(force_refresh=True)
            time.sleep(min(3, attempt + 1))
            continue
        if r.status_code == 403:
            break
        if r.status_code in transient_status:
            wait_s = min(30, 2 ** min(5, attempt))
            if r.status_code == 429:
                wait_s = max(wait_s, _min_429_wait(r))
                _set_global_backoff(wait_s)
            if is_last_attempt:
                # No point sleeping: the persistent-failure handling below takes over.
                break
            print(
                f"[warn] erro temporario {r.status_code} no installments "
                f"(tentativa {attempt + 1}/{max_attempts}), aguardando {wait_s}s..."
            )
            time.sleep(wait_s)
            continue
        break

    assert r is not None  # max_attempts >= 1, so at least one request was made
    request_url = f"{INSTALLMENTS_URL}?{urlencode(params)}"

    if r.status_code == 403:
        raise RuntimeError(
            "403 Forbidden no installments. Possiveis causas: mediatorCode sem permissao no token "
            f"ou falta de Cookie CloudFront. stores no token={_token_stores_claim(auth)}. "
            f"url={request_url} body={r.text[:500]}"
        )
    if r.status_code in transient_status:
        if r.status_code == 429:
            recovery_s = float(os.getenv("THROTTLE_RECOVERY_PAUSE_SEC", "900"))
            print(
                f"[throttle-recovery] 429 persistente apos {max_attempts} tentativas. "
                f"Aplicando cooldown global de {int(recovery_s)}s para proxima loja..."
            )
            _set_global_backoff(recovery_s)
        raise RuntimeError(
            f"Falha temporaria persistente ({r.status_code}) no installments apos {max_attempts} tentativas. "
            f"url={request_url} body={r.text[:500]}"
        )
    if r.status_code == 400:
        raise RuntimeError(
            f"400 Bad Request em installments. "
            f"url={request_url} body={r.text[:500]}"
        )
    if r.status_code == 412:
        raise RuntimeError(
            "412 Precondition Failed em installments. "
            "A API recusou a pre-condicao da requisicao (normalmente cabecalhos/sessao). "
            f"url={request_url} body={r.text[:500]}"
        )
    r.raise_for_status()
    return r.json()


def get_store_codes(
    session: requests.Session,
    auth: Auth,
    cookie_header: Optional[str],
    allowed_channels: Optional[set[str]] = None,
) -> list[int]:
    """Return the sorted, de-duplicated store codes visible to the logged-in user.

    When ``allowed_channels`` is given (casefolded names), only stores whose
    ``channel`` matches one of them are kept.
    """
    r = session.get(STORES_URL, headers=_headers(auth, cookie_header=cookie_header), timeout=30)
    r.raise_for_status()
    data = r.json().get("data") or []
    if allowed_channels:
        data = [
            x
            for x in data
            if str(x.get("channel") or "").strip().casefold() in allowed_channels
        ]
    return sorted({int(x.get("code")) for x in data if x.get("code") is not None})


def load_watermark(path: str) -> Dict[str, str]:
    """Load the per-store watermark file ({mediatorCode: last processed end date}).

    Best effort: a missing, unreadable or malformed file yields {}. A warning is
    printed when the file exists but cannot be read or parsed.
    """
    if not os.path.exists(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, ValueError) as e:
        print(f"[warn] watermark ilegivel em {path}: {e}; ignorando")
        return {}
    if isinstance(raw, dict):
        return {str(k): str(v) for k, v in raw.items()}
    return {}


def save_watermark(path: str, data: Dict[str, str]) -> None:
    """Write the watermark JSON to a temp file, then os.replace() it over ``path``."""
    tmp = f"{path}.tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True)
    os.replace(tmp, path)


def _normalize_sql_conn(conn: str) -> str:
    """Force ``Encrypt=no``, add ``TrustServerCertificate=yes`` if absent, and end with ';'."""
    s = (conn or "").strip().rstrip(";")
    if not s:
        return s
    if re.search(r"(?i)\bencrypt\s*=", s):
        s = re.sub(r"(?i)\bencrypt\s*=\s*[^;]+", "Encrypt=no", s)
    else:
        s += ";Encrypt=no"
    if not re.search(r"(?i)\btrustservercertificate\s*=", s):
        s += ";TrustServerCertificate=yes"
    return s + ";"


def _parse_iso_date(value: Optional[str]):
    """Parse the first 10 characters of ``value`` as an ISO date; None when empty or invalid."""
    if not value:
        return None
    try:
        return date.fromisoformat(str(value)[:10])
    except ValueError:
        return None


def _to_float(value: Any) -> Optional[float]:
    """Convert ``value`` to float; None when it is None or not convertible."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _installment_key(item: Dict[str, Any]) -> Any:
    """Identity of an installment.

    Uses installmentCode when present. Otherwise falls back to a composite of
    (group, number, borrower, due date).
    """
    code = str(item.get("installmentCode") or "").strip()
    return code or (
        str(item.get("installmentGroupCode") or ""),
        str(item.get("installmentNumber") or ""),
        str(item.get("borrowerCode") or ""),
        str(item.get("dueDate") or ""),
    )


def _dedupe_installments(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop repeated installments (by ``_installment_key``), keeping the first occurrence."""
    seen = set()
    out: List[Dict[str, Any]] = []
    for item in items:
        key = _installment_key(item)
        if key in seen:
            continue
        seen.add(key)
        out.append(item)
    return out


def _ensure_doc_tables(cur) -> None:
    """Create dbo.DocPedidos and dbo.DocPedidosParcelas when they do not exist."""
    cur.execute(
        """
        IF OBJECT_ID('dbo.DocPedidos', 'U') IS NULL
        BEGIN
            CREATE TABLE dbo.DocPedidos (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                InstallmentGroupCode VARCHAR(40) NOT NULL UNIQUE,
                MediatorCode VARCHAR(20) NULL,
                MediatorGroupCode VARCHAR(20) NULL,
                IssueDate DATE NULL,
                CreatedAt DATE NULL,
                TotalItens INT NOT NULL DEFAULT 0,
                ValorTotalOriginal DECIMAL(18,2) NULL,
                ValorTotalFee DECIMAL(18,2) NULL,
                PayloadJson NVARCHAR(MAX) NULL,
                CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
                AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            );
        END
        """
    )
    cur.execute(
        """
        IF OBJECT_ID('dbo.DocPedidosParcelas', 'U') IS NULL
        BEGIN
            CREATE TABLE dbo.DocPedidosParcelas (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                DocPedidoId INT NOT NULL,
                InstallmentCode VARCHAR(40) NOT NULL UNIQUE,
                InstallmentGroupCode VARCHAR(40) NOT NULL,
                InstallmentNumber INT NULL,
                BorrowerCode VARCHAR(40) NULL,
                DueDate DATE NULL,
                IssueDate DATE NULL,
                CreatedAt DATE NULL,
                PaymentType VARCHAR(40) NULL,
                OriginalAmount DECIMAL(18,2) NULL,
                TotalFee DECIMAL(18,2) NULL,
                PayloadJson NVARCHAR(MAX) NULL,
                CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
                AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
                CONSTRAINT FK_DocPedidosParcelas_DocPedidos
                    FOREIGN KEY (DocPedidoId) REFERENCES dbo.DocPedidos(Id)
            );
        END
        """
    )


def _parcela_values(item: Dict[str, Any]) -> tuple:
    """Column values InstallmentNumber..PayloadJson, shared by the parcela UPDATE and INSERT."""
    number = item.get("installmentNumber")
    return (
        int(number) if number is not None else None,
        str(item.get("borrowerCode") or "")[:40] or None,
        _parse_iso_date(item.get("dueDate")),
        _parse_iso_date(item.get("issueDate")),
        _parse_iso_date(item.get("createdAt")),
        str(item.get("paymentType") or "")[:40] or None,
        _to_float(item.get("originalAmount")),
        _to_float(item.get("totalFee")),
        json.dumps(item, ensure_ascii=False),
    )


def _upsert_pedido(
    cur, group_code: str, row: Dict[str, Any], installments: List[Dict[str, Any]]
) -> Optional[int]:
    """Insert or update the dbo.DocPedidos row for ``group_code``; return its Id (None if not found after insert).

    Header fields come from the first installment; totals sum over all installments.
    """
    first = installments[0] if installments else {}
    values = (
        str(row.get("mediatorCode") or first.get("mediatorCode") or "").strip() or None,
        str(first.get("mediatorGroupCode") or "").strip() or None,
        _parse_iso_date(first.get("issueDate")),
        _parse_iso_date(first.get("createdAt")),
        len(installments),
        float(sum(_to_float(x.get("originalAmount")) or 0.0 for x in installments)),
        float(sum(_to_float(x.get("totalFee")) or 0.0 for x in installments)),
        json.dumps(row.get("rawResponse"), ensure_ascii=False),
    )
    cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code)
    found = cur.fetchone()
    if found:
        doc_id = int(found[0])
        cur.execute(
            """
            UPDATE dbo.DocPedidos
            SET MediatorCode=?, MediatorGroupCode=?, IssueDate=?, CreatedAt=?,
                TotalItens=?, ValorTotalOriginal=?, ValorTotalFee=?, PayloadJson=?,
                AtualizadoEm=SYSUTCDATETIME()
            WHERE Id=?
            """,
            *values,
            doc_id,
        )
        return doc_id

    cur.execute(
        """
        INSERT INTO dbo.DocPedidos (
            InstallmentGroupCode, MediatorCode, MediatorGroupCode, IssueDate, CreatedAt,
            TotalItens, ValorTotalOriginal, ValorTotalFee, PayloadJson
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        group_code,
        *values,
    )
    cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code)
    got = cur.fetchone()
    return int(got[0]) if got else None


def _upsert_rows(
    cur, rows: List[Dict[str, Any]], mediator_code_log: Optional[int]
) -> Dict[str, int]:
    """Upsert every row (one pedido plus its parcelas) on an open cursor, without committing.

    After upserting a pedido's parcelas, parcelas of that pedido whose
    InstallmentCode is not in the row are deleted.
    """
    pedidos_count = 0
    parcelas_count = 0
    _ensure_doc_tables(cur)
    total_rows = len(rows)
    if mediator_code_log is not None:
        print(f"[sql-inicio] loja={mediator_code_log} pedidos_para_upsert={total_rows}")

    for idx, row in enumerate(rows, start=1):
        group_code = str(row.get("installmentGroupCode") or "").strip()
        if not group_code:
            continue
        if mediator_code_log is not None:
            print(f"[sql-progresso] loja={mediator_code_log} pedido={idx}/{total_rows} group={group_code}")

        installments = row.get("installments") or []
        doc_id = _upsert_pedido(cur, group_code, row, installments)
        if doc_id is None:
            continue
        pedidos_count += 1

        current_codes: List[str] = []
        for item in installments:
            installment_code = str(item.get("installmentCode") or "").strip()
            if not installment_code:
                continue
            current_codes.append(installment_code)
            values = _parcela_values(item)
            cur.execute("SELECT Id FROM dbo.DocPedidosParcelas WHERE InstallmentCode = ?", installment_code)
            if cur.fetchone():
                cur.execute(
                    """
                    UPDATE dbo.DocPedidosParcelas
                    SET DocPedidoId=?, InstallmentGroupCode=?, InstallmentNumber=?, BorrowerCode=?,
                        DueDate=?, IssueDate=?, CreatedAt=?, PaymentType=?, OriginalAmount=?,
                        TotalFee=?, PayloadJson=?, AtualizadoEm=SYSUTCDATETIME()
                    WHERE InstallmentCode=?
                    """,
                    doc_id,
                    group_code,
                    *values,
                    installment_code,
                )
            else:
                cur.execute(
                    """
                    INSERT INTO dbo.DocPedidosParcelas (
                        DocPedidoId, InstallmentCode, InstallmentGroupCode, InstallmentNumber,
                        BorrowerCode, DueDate, IssueDate, CreatedAt, PaymentType,
                        OriginalAmount, TotalFee, PayloadJson
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    doc_id,
                    installment_code,
                    group_code,
                    *values,
                )
            parcelas_count += 1

        if current_codes:
            placeholders = ",".join("?" for _ in current_codes)
            cur.execute(
                f"""
                DELETE FROM dbo.DocPedidosParcelas
                WHERE DocPedidoId = ?
                  AND InstallmentCode NOT IN ({placeholders})
                """,
                doc_id,
                *current_codes,
            )

    return {"pedidos": pedidos_count, "parcelas": parcelas_count}


def upsert_doc_pedidos_sqlserver(
    rows: List[Dict[str, Any]],
    connection_string: str,
    mediator_code_log: Optional[int] = None,
) -> Dict[str, int]:
    """Upsert pedidos/parcelas into SQL Server in a single transaction.

    Each row needs ``installmentGroupCode``, ``installments`` (list of dicts),
    ``rawResponse`` and optionally ``mediatorCode``. Returns
    ``{"pedidos": n, "parcelas": m}``. Any error rolls the transaction back and
    is re-raised. Raises RuntimeError when pyodbc cannot be imported.
    """
    try:
        import pyodbc  # type: ignore
    except Exception as e:
        raise RuntimeError("pyodbc nao encontrado. Instale com: pip install pyodbc") from e

    if not rows:
        return {"pedidos": 0, "parcelas": 0}

    cn = pyodbc.connect(_normalize_sql_conn(connection_string), timeout=30)
    try:
        cn.autocommit = False
        # The connection's query timeout is applied to cursors when they are
        # created, so it must be set before cn.cursor().
        try:
            cn.timeout = int(os.getenv("SQL_STATEMENT_TIMEOUT_SEC", "120"))
        except Exception:
            pass  # best effort: run without a statement timeout
        cur = cn.cursor()
        try:
            counts = _upsert_rows(cur, rows, mediator_code_log)
            cn.commit()
        except Exception:
            cn.rollback()
            raise
        finally:
            cur.close()
    finally:
        cn.close()

    if mediator_code_log is not None:
        print(
            f"[sql-fim] loja={mediator_code_log} pedidos_upsert={counts['pedidos']} "
            f"parcelas_upsert={counts['parcelas']}"
        )
    return counts


def _build_windows(start: date, end: date, chunk_days: int) -> List[tuple[str, str]]:
    """Split the inclusive period [start, end] into consecutive ISO-date windows of ``chunk_days`` days.

    ``chunk_days <= 0`` yields a single window covering the whole period.
    """
    if chunk_days <= 0:
        return [(start.isoformat(), end.isoformat())]
    windows: List[tuple[str, str]] = []
    cursor = start
    while cursor <= end:
        win_end = min(end, cursor + timedelta(days=chunk_days - 1))
        windows.append((cursor.isoformat(), win_end.isoformat()))
        cursor = win_end + timedelta(days=1)
    return windows


def main() -> None:
    """Sync installments for every selected store, then print a run summary.

    Main environment variables (see the os.getenv calls below for all of them):
    - Period: START_INSTALLMENT_CHANGE_DATE, LAST_N_DAYS and
      END_INSTALLMENT_CHANGE_DATE select it; CHUNK_DAYS splits it into windows.
    - Stores: STORE_CHANNEL, TARGET_MEDIATOR_CODE and RESUME_FROM_MEDIATOR_CODE
      select them.
    - Outputs: WRITE_SQL and SAVE_JSON choose them.
    - Incremental runs: INCREMENTAL_MODE with WATERMARK_FILE keeps a per-store
      watermark, used as the start date when neither a fixed nor a rolling
      start applies.
    Stores that fail with a throttling/temporary error are retried once,
    sequentially, after RETRY_FAILED_WAIT_SEC seconds.
    """
    today = date.today()
    start_date_fixed = (os.getenv("START_INSTALLMENT_CHANGE_DATE") or "").strip() or None
    last_days_env = os.getenv("LAST_N_DAYS", "5").strip()
    last_n_days = int(last_days_env) if last_days_env else None
    rolling_start = None
    if (not start_date_fixed) and (last_n_days is not None):
        rolling_start = (today - timedelta(days=last_n_days)).isoformat()

    chunk_days_env = os.getenv("CHUNK_DAYS", "0").strip()
    chunk_days = max(0, int(chunk_days_env)) if chunk_days_env else 0

    default_start = date(today.year, 1, 1).isoformat()
    default_end = today.isoformat()
    end_date = os.getenv("END_INSTALLMENT_CHANGE_DATE", default_end)
    installment_change = os.getenv("INSTALLMENT_CHANGE", "CRIACAO").strip() or "CRIACAO"
    first_page = int(os.getenv("PAGE_START", "1"))
    max_pages_per_query = int(os.getenv("MAX_PAGES_PER_QUERY", "10000"))
    page_size_env = os.getenv("INSTALLMENTS_PAGE_SIZE", "").strip()
    page_size = int(page_size_env) if page_size_env else None
    flush_every_pages = int(os.getenv("FLUSH_EVERY_PAGES", "50"))
    # Default: one store at a time, so log lines are not interleaved.
    store_workers = int(os.getenv("STORE_WORKERS", "1"))
    cookie_header = os.getenv("EXTRANET_COOKIE")

    store_channel_env = os.getenv("STORE_CHANNEL", "VD").strip()
    allowed_channels = {
        c.strip().casefold() for c in re.split(r"[,\s;]+", store_channel_env) if c.strip()
    }
    if "all" in allowed_channels or "*" in allowed_channels:
        allowed_channels = set()

    target_mediator_env = os.getenv("TARGET_MEDIATOR_CODE", "").strip()
    target_mediator = int(target_mediator_env) if target_mediator_env else None
    resume_from_env = os.getenv("RESUME_FROM_MEDIATOR_CODE", "").strip()
    resume_from_mediator = int(resume_from_env) if resume_from_env else None
    log_group_codes = os.getenv("LOG_GROUP_CODES", "1") == "1"
    output_dir = os.getenv("OUTPUT_DIR", "installments_por_loja")
    save_json = os.getenv("SAVE_JSON", "0") == "1"
    write_sql = os.getenv("WRITE_SQL", "1") == "1"
    watermark_file = os.getenv("WATERMARK_FILE", "installments_watermark.json")
    incremental_mode = os.getenv("INCREMENTAL_MODE", "1") == "1"
    # NOTE(review): this fallback embeds database credentials in source code.
    # Prefer requiring SQLSERVER_CONN from the environment and rotating this password.
    sql_conn = os.getenv(
        "SQLSERVER_CONN",
        (
            "DRIVER={ODBC Driver 17 for SQL Server};"
            "SERVER=10.77.77.10;"
            "DATABASE=GINSENG;"
            "UID=andrey;"
            "PWD=88253332;"
            "TrustServerCertificate=yes;"
        ),
    )

    session = requests.Session()
    session.trust_env = False
    auth = Auth(session)
    stores = get_store_codes(
        session,
        auth,
        cookie_header,
        allowed_channels=allowed_channels or None,
    )
    if allowed_channels:
        channels_txt = ",".join(sorted(c.upper() for c in allowed_channels))
        print(f"[info] filtro de canal ativo: STORE_CHANNEL={channels_txt} (lojas={len(stores)})")

    if target_mediator is not None:
        stores = [target_mediator]
        print(f"[info] consulta focada na loja {target_mediator}")
    elif resume_from_mediator is not None:
        stores = [s for s in stores if s >= resume_from_mediator]
        print(f"[info] retomando a partir da loja {resume_from_mediator} (restantes={len(stores)})")
    stores = sorted(stores)

    if save_json:
        os.makedirs(output_dir, exist_ok=True)

    watermark = load_watermark(watermark_file) if incremental_mode else {}
    if incremental_mode:
        print(f"[info] incremental ativo com watermark em {watermark_file}")
    else:
        print("[info] incremental desativado")
    if rolling_start is not None and last_n_days is not None:
        print(f"[info] janela movel ativa: ultimos {last_n_days} dias ({rolling_start}..{end_date})")
    if chunk_days > 0:
        print(f"[info] fatiamento de periodo ativo: CHUNK_DAYS={chunk_days}")

    authorized: list[int] = []
    unauthorized: list[int] = []
    skipped_bad_request: list[int] = []
    failed: Dict[int, str] = {}
    status_labels = {
        "authorized": "ok",
        "unauthorized": "nao_autorizada",
        "skipped_bad_request": "skip_400",
    }

    def process_store(mediator_code: int) -> Dict[str, Any]:
        """Fetch and persist all installments of one store.

        Never raises: returns a dict whose ``status`` is one of authorized,
        unauthorized, skipped_bad_request or failed.
        """
        if rolling_start:
            store_start_date = rolling_start
        elif start_date_fixed:
            store_start_date = start_date_fixed
        elif incremental_mode:
            store_start_date = watermark.get(str(mediator_code), default_start)
        else:
            store_start_date = default_start

        local_session = requests.Session()
        local_session.trust_env = False
        local_auth = Auth(local_session)
        try:
            print(
                f"[consulta] loja={mediator_code} periodo={store_start_date}..{end_date} pagina_inicial={first_page}"
            )
            store_start_obj = _parse_iso_date(store_start_date)
            store_end_obj = _parse_iso_date(end_date)
            if not store_start_obj or not store_end_obj or store_start_obj > store_end_obj:
                raise RuntimeError(
                    f"Periodo invalido para consulta. start={store_start_date} end={end_date}"
                )
            windows = _build_windows(store_start_obj, store_end_obj, chunk_days)
            n_windows = len(windows)

            # installmentGroupCode -> {installment key -> item}: every installment
            # seen for this store. A group whose installments arrive across
            # several flushes is re-upserted whole instead of losing the later part.
            store_groups: Dict[str, Dict[Any, Dict[str, Any]]] = {}
            total_api = 0
            pages_fetched_total = 0
            total_sql_pedidos = 0
            total_sql_parcelas = 0
            # Once the API rejects installmentChange for this store, stop sending it.
            effective_change: Optional[str] = installment_change

            def fetch_page(window_start: str, window_end: str, page: int, change: Optional[str]) -> Dict[str, Any]:
                """One period query for this store."""
                return get_installments_page(
                    session=local_session,
                    auth=local_auth,
                    start_date=window_start,
                    end_date=window_end,
                    installment_change=change,
                    mediator_code=mediator_code,
                    page=page,
                    cookie_header=cookie_header,
                    page_size=page_size,
                )

            def _flush(buffer: List[Dict[str, Any]], label: str) -> None:
                """Merge buffered items by installmentGroupCode and persist every group that gained installments."""
                nonlocal total_sql_pedidos, total_sql_parcelas
                if not buffer:
                    return
                touched: Dict[str, bool] = {}  # gc -> first seen in this flush (insertion-ordered)
                for item in buffer:
                    gc = str(item.get("installmentGroupCode") or "").strip()
                    if not gc:
                        continue
                    is_new = gc not in store_groups
                    bucket = store_groups.setdefault(gc, {})
                    key = _installment_key(item)
                    if key in bucket:
                        continue  # duplicate installment: keep the first occurrence
                    bucket[key] = item
                    touched.setdefault(gc, is_new)
                if not touched:
                    return

                new_groups = [gc for gc, is_new in touched.items() if is_new]
                grown_groups = [gc for gc, is_new in touched.items() if not is_new]
                if log_group_codes:
                    for gc in new_groups:
                        print(f"[grupo] loja={mediator_code} {label} installmentGroupCode={gc}")
                print(f"[flush] loja={mediator_code} {label} grupos_novos={len(new_groups)}")
                if grown_groups:
                    print(f"[flush] loja={mediator_code} {label} grupos_complementados={len(grown_groups)}")

                if not write_sql:
                    return
                sql_rows: List[Dict[str, Any]] = []
                for gc in touched:
                    items = list(store_groups[gc].values())
                    sql_rows.append({
                        "mediatorCode": mediator_code,
                        "installmentGroupCode": gc,
                        "rawResponse": {"data": {"installments": items}},
                        "installments": items,
                    })
                stats = upsert_doc_pedidos_sqlserver(sql_rows, sql_conn, mediator_code_log=mediator_code)
                total_sql_pedidos += stats.get("pedidos", 0)
                total_sql_parcelas += stats.get("parcelas", 0)
                print(
                    f"[sql-flush] loja={mediator_code} {label} "
                    f"pedidos_upsert={stats.get('pedidos')} parcelas_upsert={stats.get('parcelas')}"
                )

            for window_idx, (window_start, window_end) in enumerate(windows, start=1):
                window_label = f"janela={window_idx}/{n_windows}"
                print(
                    f"[consulta-janela] loja={mediator_code} {window_label} "
                    f"periodo={window_start}..{window_end}"
                )
                page = first_page
                window_total = 0
                window_pages_fetched = 0
                flush_buffer: List[Dict[str, Any]] = []
                while True:
                    try:
                        body = fetch_page(window_start, window_end, page, effective_change)
                    except RuntimeError as e:
                        if "400 Bad Request" in str(e) and effective_change:
                            print(
                                f"[fallback] loja={mediator_code} 400 com installmentChange={effective_change}; "
                                "tentando sem installmentChange"
                            )
                            effective_change = None
                            body = fetch_page(window_start, window_end, page, None)
                        else:
                            raise

                    data = body.get("data") or {}
                    installments_page = data.get("installments") or []
                    flush_buffer.extend(installments_page)
                    pagination = data.get("pagination") or {}
                    window_total = int(pagination.get("total") or 0)
                    limit = int(pagination.get("limit") or len(installments_page) or 1)
                    window_total_pages = max(1, (window_total + limit - 1) // limit) if window_total else page
                    window_pages_fetched += 1
                    print(
                        f"[pagina-loja] loja={mediator_code} {window_label} "
                        f"pagina={page}/{window_total_pages} itens_pagina={len(installments_page)}"
                    )

                    if flush_every_pages > 0 and window_pages_fetched % flush_every_pages == 0:
                        _flush(flush_buffer, f"{window_label} pagina={page}")
                        flush_buffer = []

                    if not installments_page or page >= window_total_pages:
                        break
                    if page - first_page + 1 >= max_pages_per_query:
                        print(
                            f"[stop] loja={mediator_code} {window_label} "
                            f"limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido"
                        )
                        break
                    page += 1

                # Flush whatever is left of the window below the flush threshold.
                _flush(flush_buffer, window_label)
                total_api += window_total
                pages_fetched_total += window_pages_fetched
                print(
                    f"[resultado-janela] loja={mediator_code} {window_label} "
                    f"grupos_acumulados={len(store_groups)}"
                )

            print(
                f"[resultado-loja] loja={mediator_code} pedidos_encontrados={total_api} "
                f"grupos_unicos={len(store_groups)}"
            )
            store_out: Dict[str, Any] = {
                "queryWindow": {
                    "startInstallmentChangeDate": store_start_date,
                    "endInstallmentChangeDate": end_date,
                    "installmentChange": installment_change,
                    "pageStart": first_page,
                    "pagesFetched": pages_fetched_total,
                    "windowsFetched": n_windows,
                    "chunkDays": chunk_days if chunk_days > 0 else None,
                },
                "mediatorCode": mediator_code,
                "pedidosEncontradosPagina": len(store_groups),
                "grupos_unicos": len(store_groups),
            }
            if write_sql:
                store_out["sqlUpsert"] = {"pedidos": total_sql_pedidos, "parcelas": total_sql_parcelas}
                print(
                    f"[sql] loja={mediator_code} pedidos_upsert={total_sql_pedidos} "
                    f"parcelas_upsert={total_sql_parcelas}"
                )
            if save_json:
                store_file = os.path.join(output_dir, f"installments_loja_{mediator_code}.json")
                with open(store_file, "w", encoding="utf-8") as f:
                    json.dump(store_out, f, ensure_ascii=False, indent=2)
                print(f"[ok] loja={mediator_code} json_salvo={store_file}")
            return {"status": "authorized", "mediatorCode": mediator_code, "endDate": end_date}
        except Exception as e:
            msg = str(e)
            if "403 Forbidden no installments" in msg:
                return {"status": "unauthorized", "mediatorCode": mediator_code}
            if "400 Bad Request em installments" in msg and (
                "Combina" in msg or "invalid" in msg.lower()
            ):
                print(f"[skip-400] loja={mediator_code} filtro nao aplicavel para essa loja")
                return {"status": "skipped_bad_request", "mediatorCode": mediator_code}
            print(f"[falha] loja={mediator_code} erro={msg}")
            return {"status": "failed", "mediatorCode": mediator_code, "error": msg}
        finally:
            local_session.close()

    def record_result(mediator_code: int, result: Dict[str, Any]) -> str:
        """Record a process_store result in the run summary (and watermark); return its status."""
        status = result["status"]
        if status != "failed":
            failed.pop(mediator_code, None)  # a successful retry clears the earlier failure
        if status == "authorized":
            authorized.append(mediator_code)
            if incremental_mode:
                watermark[str(mediator_code)] = str(result["endDate"])
                save_watermark(watermark_file, watermark)
        elif status == "unauthorized":
            unauthorized.append(mediator_code)
        elif status == "skipped_bad_request":
            skipped_bad_request.append(mediator_code)
        else:
            failed[mediator_code] = str(result.get("error") or "erro desconhecido")
        return status

    with ThreadPoolExecutor(max_workers=max(1, store_workers)) as store_pool:
        store_futures = {store_pool.submit(process_store, mediator): mediator for mediator in stores}
        for fut in as_completed(store_futures):
            try:
                result = fut.result()
                mediator_code = int(result["mediatorCode"])
                status = record_result(mediator_code, result)
                print(f"[loja-fim] loja={mediator_code} status={status_labels.get(status, 'falha')}")
            except Exception as e:
                mediator_code = int(store_futures[fut])
                failed[mediator_code] = f"erro no worker: {e}"
                print(f"[falha] loja={mediator_code} erro no worker: {e}")

    retry_wait_sec = int(os.getenv("RETRY_FAILED_WAIT_SEC", "90"))
    if failed and retry_wait_sec > 0:
        retry_candidates = sorted(
            k for k, v in failed.items() if "429" in str(v) or "temporaria" in str(v).lower()
        )
        if retry_candidates:
            print(
                f"[retry] {len(retry_candidates)} loja(s) falharam por throttle. "
                f"Aguardando {retry_wait_sec}s antes de retentar sequencialmente..."
            )
            time.sleep(retry_wait_sec)
            for mc in retry_candidates:
                status = record_result(mc, process_store(mc))
                if status == "authorized":
                    print(f"[retry-ok] loja={mc}")
                elif status == "failed":
                    print(f"[retry-falha] loja={mc} erro={failed[mc]}")
                else:
                    print(f"[retry-fim] loja={mc} status={status_labels[status]}")

    print(
        "[resumo] "
        f"lojas_total={len(stores)} autorizadas={len(authorized)} "
        f"nao_autorizadas={len(unauthorized)} skip_400={len(skipped_bad_request)} falhas={len(failed)} "
        f"store_workers={store_workers} "
        f"pasta_saida={output_dir}"
    )
    if failed:
        first_code = sorted(failed.keys())[0]
        print(f"[falha-exemplo] loja={first_code} erro={failed[first_code]}")


if __name__ == "__main__":
    main()