diff --git a/installments_reader.py b/installments_reader.py new file mode 100644 index 0000000..bed529b --- /dev/null +++ b/installments_reader.py @@ -0,0 +1,786 @@ +import base64 +from concurrent.futures import ThreadPoolExecutor, as_completed +import json +import os +import re +import time +from dataclasses import dataclass +from datetime import date +from typing import Any, Dict, List, Optional +from urllib.parse import urlencode + +import requests + +TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens" +STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores" +INSTALLMENTS_URL = ( + "https://bff-credit-container-portal-apigw.produto-financeiro.grupoboticario.digital/" + "v1/franchisee/installments" +) + + +def _jwt_payload(jwt_token: str) -> Dict[str, Any]: + parts = jwt_token.split(".") + if len(parts) != 3: + return {} + payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4) + raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8")) + return json.loads(raw.decode("utf-8")) + + +@dataclass +class TokenCache: + bearer: Optional[str] = None + exp_epoch: int = 0 + + def valid(self, skew_seconds: int = 30) -> bool: + return bool(self.bearer) and (time.time() < (self.exp_epoch - skew_seconds)) + + +class Auth: + def __init__(self, session: requests.Session): + self.s = session + self.cache = TokenCache() + self.override_bearer = os.getenv("EXTRANET_BEARER") + + def get_bearer(self, force_refresh: bool = False) -> str: + if self.override_bearer: + return self.override_bearer + if (not force_refresh) and self.cache.valid(): + return self.cache.bearer # type: ignore[return-value] + + r = self.s.get(TOKENS_URL, timeout=30) + r.raise_for_status() + body = r.json() + if not body.get("success"): + raise RuntimeError(f"Token API retornou success=false: {body}") + + bearer = body["data"][0]["token"] + jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer + exp = int(_jwt_payload(jwt).get("exp") or 0) + self.cache = TokenCache(bearer=bearer, exp_epoch=exp) + return bearer + + def invalidate(self) -> None: + self.cache = TokenCache() + + +def _headers(auth: Auth, cookie_header: Optional[str]) -> Dict[str, str]: + h = { + "Authorization": auth.get_bearer(), + "Accept": "application/json, text/plain, */*", + "Origin": "https://extranet.grupoboticario.com.br", + "Referer": "https://extranet.grupoboticario.com.br/", + "User-Agent": "Mozilla/5.0", + } + if cookie_header: + h["Cookie"] = cookie_header + return h + + +def get_installments_page( + session: requests.Session, + auth: Auth, + start_date: Optional[str], + end_date: Optional[str], + installment_change: Optional[str], + mediator_code: Optional[int], + page: int, + installment_group_code: Optional[str] = None, + cookie_header: Optional[str] = None, +) -> Dict[str, Any]: + params: Dict[str, Any] = {"page": page} + if installment_group_code: + params["installmentGroupCode"] = installment_group_code + else: + if not start_date or not end_date or mediator_code is None: + raise ValueError("Para consulta por periodo, informe start/end/mediator_code.") + params["endInstallmentChangeDate"] = end_date + params["startInstallmentChangeDate"] = start_date + params["mediatorCode"] = mediator_code + if installment_change: + params["installmentChange"] = installment_change + + r = None + max_attempts = 8 + transient_status = {429, 500, 502, 503, 504} + for attempt in range(max_attempts): + r = session.get( + INSTALLMENTS_URL, + headers=_headers(auth, cookie_header=cookie_header), + params=params, + timeout=60, + ) + if r.status_code == 401: + print( + f"[warn] 401 no installments (tentativa {attempt + 1}/{max_attempts}), " + "renovando token..." + ) + auth.invalidate() + auth.get_bearer(force_refresh=True) + time.sleep(min(3, attempt + 1)) + continue + if r.status_code == 403: + break + if r.status_code in transient_status: + wait_s = min(30, 2 ** min(5, attempt)) + print( + f"[warn] erro temporario {r.status_code} no installments " + f"(tentativa {attempt + 1}/{max_attempts}), aguardando {wait_s}s..." + ) + time.sleep(wait_s) + continue + break + + assert r is not None + if r.status_code == 403: + payload = _jwt_payload(auth.get_bearer().split(" ", 1)[1]) + stores_claim = payload.get("stores") + raise RuntimeError( + "403 Forbidden no installments. Possiveis causas: mediatorCode sem permissao no token " + f"ou falta de Cookie CloudFront. stores no token={stores_claim}. " + f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" + ) + if r.status_code in transient_status: + raise RuntimeError( + f"Falha temporaria persistente ({r.status_code}) no installments apos {max_attempts} tentativas. " + f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" + ) + if r.status_code == 400: + raise RuntimeError( + f"400 Bad Request em installments. " + f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" + ) + r.raise_for_status() + return r.json() + + +def get_store_codes(session: requests.Session, auth: Auth, cookie_header: Optional[str]) -> list[int]: + r = session.get(STORES_URL, headers=_headers(auth, cookie_header=cookie_header), timeout=30) + r.raise_for_status() + data = r.json().get("data") or [] + out = sorted({int(x.get("code")) for x in data if x.get("code") is not None}) + return out + + +def load_watermark(path: str) -> Dict[str, str]: + try: + if not os.path.exists(path): + return {} + with open(path, "r", encoding="utf-8") as f: + raw = json.load(f) + if isinstance(raw, dict): + return {str(k): str(v) for k, v in raw.items()} + return {} + except Exception: + return {} + + +def save_watermark(path: str, data: Dict[str, str]) -> None: + tmp = f"{path}.tmp" + with open(tmp, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True) + os.replace(tmp, path) + + +def _normalize_sql_conn(conn: str) -> str: + s = (conn or "").strip().rstrip(";") + if not s: + return s + if re.search(r"(?i)\bencrypt\s*=", s): + s = re.sub(r"(?i)\bencrypt\s*=\s*[^;]+", "Encrypt=no", s) + else: + s += ";Encrypt=no" + if not re.search(r"(?i)\btrustservercertificate\s*=", s): + s += ";TrustServerCertificate=yes" + return s + ";" + + +def _parse_iso_date(value: Optional[str]): + if not value: + return None + try: + return date.fromisoformat(str(value)[:10]) + except Exception: + return None + + +def _to_float(value: Any) -> Optional[float]: + if value is None: + return None + try: + return float(value) + except Exception: + return None + + +def _dedupe_installments(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + seen = set() + out: List[Dict[str, Any]] = [] + for item in items: + code = str(item.get("installmentCode") or "").strip() + key = code or ( + str(item.get("installmentGroupCode") or ""), + str(item.get("installmentNumber") or ""), + str(item.get("borrowerCode") or ""), + str(item.get("dueDate") or ""), + ) + if key in seen: + continue + seen.add(key) + out.append(item) + return out + + +def _ensure_doc_tables(cur) -> None: + cur.execute( + """ +IF OBJECT_ID('dbo.DocPedidos', 'U') IS NULL +BEGIN + CREATE TABLE dbo.DocPedidos ( + Id INT IDENTITY(1,1) PRIMARY KEY, + InstallmentGroupCode VARCHAR(40) NOT NULL UNIQUE, + MediatorCode VARCHAR(20) NULL, + MediatorGroupCode VARCHAR(20) NULL, + IssueDate DATE NULL, + CreatedAt DATE NULL, + TotalItens INT NOT NULL DEFAULT 0, + ValorTotalOriginal DECIMAL(18,2) NULL, + ValorTotalFee DECIMAL(18,2) NULL, + PayloadJson NVARCHAR(MAX) NULL, + CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(), + AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME() + ); +END + """ + ) + cur.execute( + """ +IF OBJECT_ID('dbo.DocPedidosParcelas', 'U') IS NULL +BEGIN + CREATE TABLE dbo.DocPedidosParcelas ( + Id INT IDENTITY(1,1) PRIMARY KEY, + DocPedidoId INT NOT NULL, + InstallmentCode VARCHAR(40) NOT NULL UNIQUE, + InstallmentGroupCode VARCHAR(40) NOT NULL, + InstallmentNumber INT NULL, + BorrowerCode VARCHAR(40) NULL, + DueDate DATE NULL, + IssueDate DATE NULL, + CreatedAt DATE NULL, + PaymentType VARCHAR(40) NULL, + OriginalAmount DECIMAL(18,2) NULL, + TotalFee DECIMAL(18,2) NULL, + PayloadJson NVARCHAR(MAX) NULL, + CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(), + AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(), + CONSTRAINT FK_DocPedidosParcelas_DocPedidos FOREIGN KEY (DocPedidoId) REFERENCES dbo.DocPedidos(Id) + ); +END + """ + ) + + +def upsert_doc_pedidos_sqlserver( + rows: List[Dict[str, Any]], + connection_string: str, + mediator_code_log: Optional[int] = None, +) -> Dict[str, int]: + try: + import pyodbc # type: ignore + except Exception as e: + raise RuntimeError("pyodbc nao encontrado. Instale com: pip install pyodbc") from e + + if not rows: + return {"pedidos": 0, "parcelas": 0} + + cn = pyodbc.connect(_normalize_sql_conn(connection_string), timeout=30) + cn.autocommit = False + cur = cn.cursor() + try: + cn.timeout = int(os.getenv("SQL_STATEMENT_TIMEOUT_SEC", "120")) + except Exception: + pass + pedidos_count = 0 + parcelas_count = 0 + try: + _ensure_doc_tables(cur) + total_rows = len(rows) + if mediator_code_log is not None: + print(f"[sql-inicio] loja={mediator_code_log} pedidos_para_upsert={total_rows}") + + for idx, row in enumerate(rows, start=1): + group_code = str(row.get("installmentGroupCode") or "").strip() + if not group_code: + continue + if mediator_code_log is not None: + print(f"[sql-progresso] loja={mediator_code_log} pedido={idx}/{total_rows} group={group_code}") + installments = row.get("installments") or [] + first = installments[0] if installments else {} + mediator_code = str(row.get("mediatorCode") or first.get("mediatorCode") or "").strip() or None + mediator_group_code = str(first.get("mediatorGroupCode") or "").strip() or None + issue_date = _parse_iso_date(first.get("issueDate")) + created_at = _parse_iso_date(first.get("createdAt")) + total_itens = int(len(installments)) + valor_total_original = sum(_to_float(x.get("originalAmount")) or 0.0 for x in installments) + valor_total_fee = sum(_to_float(x.get("totalFee")) or 0.0 for x in installments) + payload_json = json.dumps(row.get("rawResponse"), ensure_ascii=False) + + cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code) + found = cur.fetchone() + if found: + doc_id = int(found[0]) + cur.execute( + """ +UPDATE dbo.DocPedidos +SET MediatorCode=?, MediatorGroupCode=?, IssueDate=?, CreatedAt=?, TotalItens=?, + ValorTotalOriginal=?, ValorTotalFee=?, PayloadJson=?, AtualizadoEm=SYSUTCDATETIME() +WHERE Id=? + """, + mediator_code, + mediator_group_code, + issue_date, + created_at, + total_itens, + float(valor_total_original), + float(valor_total_fee), + payload_json, + doc_id, + ) + else: + cur.execute( + """ +INSERT INTO dbo.DocPedidos ( + InstallmentGroupCode, MediatorCode, MediatorGroupCode, IssueDate, CreatedAt, + TotalItens, ValorTotalOriginal, ValorTotalFee, PayloadJson +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + group_code, + mediator_code, + mediator_group_code, + issue_date, + created_at, + total_itens, + float(valor_total_original), + float(valor_total_fee), + payload_json, + ) + cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code) + got = cur.fetchone() + if not got: + continue + doc_id = int(got[0]) + pedidos_count += 1 + + current_codes: List[str] = [] + for item in installments: + installment_code = str(item.get("installmentCode") or "").strip() + if not installment_code: + continue + current_codes.append(installment_code) + cur.execute("SELECT Id FROM dbo.DocPedidosParcelas WHERE InstallmentCode = ?", installment_code) + par_found = cur.fetchone() + if par_found: + cur.execute( + """ +UPDATE dbo.DocPedidosParcelas +SET DocPedidoId=?, InstallmentGroupCode=?, InstallmentNumber=?, BorrowerCode=?, DueDate=?, + IssueDate=?, CreatedAt=?, PaymentType=?, OriginalAmount=?, TotalFee=?, PayloadJson=?, + AtualizadoEm=SYSUTCDATETIME() +WHERE InstallmentCode=? + """, + doc_id, + group_code, + int(item.get("installmentNumber")) if item.get("installmentNumber") is not None else None, + str(item.get("borrowerCode") or "")[:40] or None, + _parse_iso_date(item.get("dueDate")), + _parse_iso_date(item.get("issueDate")), + _parse_iso_date(item.get("createdAt")), + str(item.get("paymentType") or "")[:40] or None, + _to_float(item.get("originalAmount")), + _to_float(item.get("totalFee")), + json.dumps(item, ensure_ascii=False), + installment_code, + ) + else: + cur.execute( + """ +INSERT INTO dbo.DocPedidosParcelas ( + DocPedidoId, InstallmentCode, InstallmentGroupCode, InstallmentNumber, BorrowerCode, + DueDate, IssueDate, CreatedAt, PaymentType, OriginalAmount, TotalFee, PayloadJson +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + doc_id, + installment_code, + group_code, + int(item.get("installmentNumber")) if item.get("installmentNumber") is not None else None, + str(item.get("borrowerCode") or "")[:40] or None, + _parse_iso_date(item.get("dueDate")), + _parse_iso_date(item.get("issueDate")), + _parse_iso_date(item.get("createdAt")), + str(item.get("paymentType") or "")[:40] or None, + _to_float(item.get("originalAmount")), + _to_float(item.get("totalFee")), + json.dumps(item, ensure_ascii=False), + ) + parcelas_count += 1 + + if current_codes: + placeholders = ",".join("?" for _ in current_codes) + cur.execute( + f""" +DELETE FROM dbo.DocPedidosParcelas +WHERE DocPedidoId = ? AND InstallmentCode NOT IN ({placeholders}) + """, + doc_id, + *current_codes, + ) + + cn.commit() + if mediator_code_log is not None: + print( + f"[sql-fim] loja={mediator_code_log} pedidos_upsert={pedidos_count} " + f"parcelas_upsert={parcelas_count}" + ) + return {"pedidos": pedidos_count, "parcelas": parcelas_count} + except Exception: + cn.rollback() + raise + finally: + cur.close() + cn.close() + + +def main() -> None: + today = date.today() + default_start = date(today.year, 1, 1).isoformat() + default_end = today.isoformat() + start_date_env = os.getenv("START_INSTALLMENT_CHANGE_DATE") + start_date_fixed = (start_date_env or "").strip() or None + end_date = os.getenv("END_INSTALLMENT_CHANGE_DATE", default_end) + installment_change = os.getenv("INSTALLMENT_CHANGE", "CRIACAO").strip() or "CRIACAO" + first_page = int(os.getenv("PAGE_START", "1")) + max_pages_per_query = int(os.getenv("MAX_PAGES_PER_QUERY", "10000")) + # Padrao organizado: uma loja por vez (logs nao ficam intercalados). + store_workers = int(os.getenv("STORE_WORKERS", "1")) + group_workers = int(os.getenv("GROUP_WORKERS", "4")) + cookie_header = os.getenv("EXTRANET_COOKIE") + target_mediator_env = os.getenv("TARGET_MEDIATOR_CODE", "").strip() + target_mediator = int(target_mediator_env) if target_mediator_env else None + resume_from_env = os.getenv("RESUME_FROM_MEDIATOR_CODE", "").strip() + resume_from_mediator = int(resume_from_env) if resume_from_env else None + log_group_codes = os.getenv("LOG_GROUP_CODES", "1") == "1" + output_dir = os.getenv("OUTPUT_DIR", "installments_por_loja") + save_json = os.getenv("SAVE_JSON", "0") == "1" + write_sql = os.getenv("WRITE_SQL", "1") == "1" + watermark_file = os.getenv("WATERMARK_FILE", "installments_watermark.json") + incremental_mode = os.getenv("INCREMENTAL_MODE", "1") == "1" + sql_conn = os.getenv( + "SQLSERVER_CONN", + ( + "DRIVER={ODBC Driver 17 for SQL Server};" + "SERVER=10.77.77.10;" + "DATABASE=GINSENG;" + "UID=andrey;" + "PWD=88253332;" + "TrustServerCertificate=yes;" + ), + ) + + session = requests.Session() + session.trust_env = False + auth = Auth(session) + + stores = get_store_codes(session, auth, cookie_header) + if target_mediator is not None: + stores = [target_mediator] + print(f"[info] consulta focada na loja {target_mediator}") + elif resume_from_mediator is not None: + stores = [s for s in stores if s >= resume_from_mediator] + print(f"[info] retomando a partir da loja {resume_from_mediator} (restantes={len(stores)})") + stores = sorted(stores) + if save_json: + os.makedirs(output_dir, exist_ok=True) + watermark = load_watermark(watermark_file) if incremental_mode else {} + if incremental_mode: + print(f"[info] incremental ativo com watermark em {watermark_file}") + else: + print("[info] incremental desativado") + + authorized: list[int] = [] + unauthorized: list[int] = [] + skipped_bad_request: list[int] = [] + failed: Dict[int, str] = {} + + def process_store(mediator_code: int) -> Dict[str, Any]: + store_start_date = start_date_fixed + if not store_start_date: + if incremental_mode: + store_start_date = watermark.get(str(mediator_code), default_start) + else: + store_start_date = default_start + + local_session = requests.Session() + local_session.trust_env = False + local_auth = Auth(local_session) + + try: + print( + f"[consulta] loja={mediator_code} periodo={store_start_date}..{end_date} pagina_inicial={first_page}" + ) + page = first_page + all_installments: List[Dict[str, Any]] = [] + total_api = 0 + total_pages = 1 + while True: + try: + body = get_installments_page( + session=local_session, + auth=local_auth, + start_date=store_start_date, + end_date=end_date, + installment_change=installment_change, + mediator_code=mediator_code, + page=page, + cookie_header=cookie_header, + ) + except Exception as e: + msg = str(e) + if "400 Bad Request" in msg and installment_change: + print( + f"[fallback] loja={mediator_code} 400 com installmentChange={installment_change}; " + "tentando sem installmentChange" + ) + body = get_installments_page( + session=local_session, + auth=local_auth, + start_date=store_start_date, + end_date=end_date, + installment_change=None, + mediator_code=mediator_code, + page=page, + cookie_header=cookie_header, + ) + else: + raise + installments_page = (((body.get("data") or {}).get("installments")) or []) + all_installments.extend(installments_page) + pagination = ((body.get("data") or {}).get("pagination") or {}) + total_api = int(pagination.get("total") or len(all_installments)) + limit = int(pagination.get("limit") or len(installments_page) or 1) + total_pages = max(1, (total_api + limit - 1) // limit) if total_api else page + print( + f"[pagina-loja] loja={mediator_code} pagina={page}/{total_pages} " + f"itens_pagina={len(installments_page)}" + ) + if not installments_page: + break + if page >= total_pages: + break + if page - first_page + 1 >= max_pages_per_query: + print( + f"[stop] loja={mediator_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido" + ) + break + page += 1 + + installments = _dedupe_installments(all_installments) + group_codes = sorted( + { + str(item.get("installmentGroupCode")).strip() + for item in installments + if item.get("installmentGroupCode") is not None + and str(item.get("installmentGroupCode")).strip() + } + ) + print( + f"[resultado-loja] loja={mediator_code} pedidos_encontrados={total_api} " + f"itens_total_agregados={len(installments)} grupos_unicos={len(group_codes)}" + ) + if log_group_codes: + for gc in group_codes: + print(f"[grupo] loja={mediator_code} installmentGroupCode={gc}") + + pedidos: Dict[str, Any] = {} + + def fetch_group(group_code: str) -> Dict[str, Any]: + try: + group_session = requests.Session() + group_session.trust_env = False + group_auth = Auth(group_session) + group_page = first_page + all_group_installments: List[Dict[str, Any]] = [] + group_total = 0 + group_total_pages = 1 + while True: + group_body = get_installments_page( + session=group_session, + auth=group_auth, + start_date=None, + end_date=None, + installment_change=None, + mediator_code=None, + page=group_page, + installment_group_code=group_code, + cookie_header=cookie_header, + ) + group_page_items = (((group_body.get("data") or {}).get("installments")) or []) + all_group_installments.extend(group_page_items) + group_pagination = ((group_body.get("data") or {}).get("pagination") or {}) + group_total = int(group_pagination.get("total") or len(all_group_installments)) + group_limit = int(group_pagination.get("limit") or len(group_page_items) or 1) + group_total_pages = ( + max(1, (group_total + group_limit - 1) // group_limit) if group_total else group_page + ) + print( + f"[grupo-pagina] loja={mediator_code} installmentGroupCode={group_code} " + f"pagina={group_page}/{group_total_pages} itens_pagina={len(group_page_items)}" + ) + if not group_page_items: + break + if group_page >= group_total_pages: + break + if group_page - first_page + 1 >= max_pages_per_query: + print( + f"[stop] grupo={group_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido" + ) + break + group_page += 1 + + group_installments = _dedupe_installments(all_group_installments) + count = len(group_installments) + print(f"[grupo-ok] loja={mediator_code} installmentGroupCode={group_code} itens={count}") + group_body_agg = { + "data": { + "installments": group_installments, + "pagination": {"limit": count, "total": group_total or count}, + }, + "status": 200, + "message": "Success", + } + return { + "pedidoNumero": group_code, + "consulta": { + "installmentGroupCode": group_code, + "pageStart": first_page, + "pagesFetched": group_total_pages, + }, + "ok": True, + "totalItens": count, + "detalhes": group_body_agg, + } + except Exception as e: + print(f"[grupo-erro] loja={mediator_code} installmentGroupCode={group_code} erro={e}") + return { + "pedidoNumero": group_code, + "consulta": { + "installmentGroupCode": group_code, + "pageStart": first_page, + }, + "ok": False, + "error": str(e), + } + + if group_codes: + with ThreadPoolExecutor(max_workers=max(1, group_workers)) as group_pool: + group_futures = {group_pool.submit(fetch_group, gc): gc for gc in group_codes} + for fut in as_completed(group_futures): + gc = group_futures[fut] + pedidos[gc] = fut.result() + + store_out = { + "queryWindow": { + "startInstallmentChangeDate": store_start_date, + "endInstallmentChangeDate": end_date, + "installmentChange": installment_change, + "pageStart": first_page, + "pagesFetched": total_pages, + }, + "mediatorCode": mediator_code, + "pedidosEncontradosPagina": len(group_codes), + "pedidos": pedidos, + } + if write_sql: + sql_rows: List[Dict[str, Any]] = [] + for gc, pedido_payload in pedidos.items(): + if not pedido_payload.get("ok"): + continue + raw_response = pedido_payload.get("detalhes") or {} + sql_rows.append( + { + "mediatorCode": mediator_code, + "installmentGroupCode": gc, + "rawResponse": raw_response, + "installments": (((raw_response.get("data") or {}).get("installments")) or []), + } + ) + stats = upsert_doc_pedidos_sqlserver( + sql_rows, + sql_conn, + mediator_code_log=mediator_code, + ) + store_out["sqlUpsert"] = stats + print( + f"[sql] loja={mediator_code} pedidos_upsert={stats.get('pedidos')} " + f"parcelas_upsert={stats.get('parcelas')}" + ) + if save_json: + store_file = os.path.join(output_dir, f"installments_loja_{mediator_code}.json") + with open(store_file, "w", encoding="utf-8") as f: + json.dump(store_out, f, ensure_ascii=False, indent=2) + print(f"[ok] loja={mediator_code} json_salvo={store_file}") + return {"status": "authorized", "mediatorCode": mediator_code, "endDate": end_date} + except Exception as e: + msg = str(e) + if "403 Forbidden no installments" in msg: + return {"status": "unauthorized", "mediatorCode": mediator_code} + if "400 Bad Request em installments" in msg and ( + "Combina" in msg or "invalid" in msg.lower() + ): + print(f"[skip-400] loja={mediator_code} filtro nao aplicavel para essa loja") + return {"status": "skipped_bad_request", "mediatorCode": mediator_code} + else: + print(f"[falha] loja={mediator_code} erro={msg}") + return {"status": "failed", "mediatorCode": mediator_code, "error": msg} + + with ThreadPoolExecutor(max_workers=max(1, store_workers)) as store_pool: + store_futures = {store_pool.submit(process_store, mediator): mediator for mediator in stores} + for fut in as_completed(store_futures): + try: + result = fut.result() + mediator_code = int(result["mediatorCode"]) + status = result["status"] + if status == "authorized": + authorized.append(mediator_code) + if incremental_mode: + watermark[str(mediator_code)] = str(result["endDate"]) + save_watermark(watermark_file, watermark) + print(f"[loja-fim] loja={mediator_code} status=ok") + elif status == "unauthorized": + unauthorized.append(mediator_code) + print(f"[loja-fim] loja={mediator_code} status=nao_autorizada") + elif status == "skipped_bad_request": + skipped_bad_request.append(mediator_code) + print(f"[loja-fim] loja={mediator_code} status=skip_400") + else: + failed[mediator_code] = str(result.get("error") or "erro desconhecido") + print(f"[loja-fim] loja={mediator_code} status=falha") + except Exception as e: + mediator_code = int(store_futures[fut]) + failed[mediator_code] = f"erro no worker: {e}" + print(f"[falha] loja={mediator_code} erro no worker: {e}") + + print( + "[resumo] " + f"lojas_total={len(stores)} autorizadas={len(authorized)} " + f"nao_autorizadas={len(unauthorized)} skip_400={len(skipped_bad_request)} falhas={len(failed)} " + f"store_workers={store_workers} group_workers={group_workers} " + f"pasta_saida={output_dir}" + ) + if failed: + first_code = sorted(failed.keys())[0] + print(f"[falha-exemplo] loja={first_code} erro={failed[first_code]}") + + +if __name__ == "__main__": + main()