import base64 from concurrent.futures import ThreadPoolExecutor, as_completed import json import os import re import time from dataclasses import dataclass from datetime import date, timedelta from typing import Any, Dict, List, Optional from urllib.parse import urlencode import requests TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens" STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores" INSTALLMENTS_URL = ( "https://bff-credit-container-portal-apigw.produto-financeiro.grupoboticario.digital/" "v1/franchisee/installments" ) def _jwt_payload(jwt_token: str) -> Dict[str, Any]: parts = jwt_token.split(".") if len(parts) != 3: return {} payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4) raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8")) return json.loads(raw.decode("utf-8")) @dataclass class TokenCache: bearer: Optional[str] = None exp_epoch: int = 0 def valid(self, skew_seconds: int = 30) -> bool: return bool(self.bearer) and (time.time() < (self.exp_epoch - skew_seconds)) class Auth: def __init__(self, session: requests.Session): self.s = session self.cache = TokenCache() self.override_bearer = os.getenv("EXTRANET_BEARER") def get_bearer(self, force_refresh: bool = False) -> str: if self.override_bearer: return self.override_bearer if (not force_refresh) and self.cache.valid(): return self.cache.bearer # type: ignore[return-value] r = self.s.get(TOKENS_URL, timeout=30) r.raise_for_status() body = r.json() if not body.get("success"): raise RuntimeError(f"Token API retornou success=false: {body}") bearer = body["data"][0]["token"] jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer exp = int(_jwt_payload(jwt).get("exp") or 0) self.cache = TokenCache(bearer=bearer, exp_epoch=exp) return bearer def invalidate(self) -> None: self.cache = TokenCache() def _headers(auth: Auth, cookie_header: Optional[str]) -> Dict[str, str]: h = { "Authorization": auth.get_bearer(), "Accept": "application/json, text/plain, */*", "Origin": "https://extranet.grupoboticario.com.br", "Referer": "https://extranet.grupoboticario.com.br/", "User-Agent": "Mozilla/5.0", } if cookie_header: h["Cookie"] = cookie_header return h def get_installments_page( session: requests.Session, auth: Auth, start_date: Optional[str], end_date: Optional[str], installment_change: Optional[str], mediator_code: Optional[int], page: int, installment_group_code: Optional[str] = None, cookie_header: Optional[str] = None, ) -> Dict[str, Any]: params: Dict[str, Any] = {"page": page} if installment_group_code: params["installmentGroupCode"] = installment_group_code else: if not start_date or not end_date or mediator_code is None: raise ValueError("Para consulta por periodo, informe start/end/mediator_code.") params["endInstallmentChangeDate"] = end_date params["startInstallmentChangeDate"] = start_date params["mediatorCode"] = mediator_code if installment_change: params["installmentChange"] = installment_change r = None max_attempts = 8 transient_status = {429, 500, 502, 503, 504} for attempt in range(max_attempts): r = session.get( INSTALLMENTS_URL, headers=_headers(auth, cookie_header=cookie_header), params=params, timeout=60, ) if r.status_code == 401: print( f"[warn] 401 no installments (tentativa {attempt + 1}/{max_attempts}), " "renovando token..." ) auth.invalidate() auth.get_bearer(force_refresh=True) time.sleep(min(3, attempt + 1)) continue if r.status_code == 403: break if r.status_code in transient_status: wait_s = min(30, 2 ** min(5, attempt)) print( f"[warn] erro temporario {r.status_code} no installments " f"(tentativa {attempt + 1}/{max_attempts}), aguardando {wait_s}s..." ) time.sleep(wait_s) continue break assert r is not None if r.status_code == 403: payload = _jwt_payload(auth.get_bearer().split(" ", 1)[1]) stores_claim = payload.get("stores") raise RuntimeError( "403 Forbidden no installments. Possiveis causas: mediatorCode sem permissao no token " f"ou falta de Cookie CloudFront. stores no token={stores_claim}. " f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" ) if r.status_code in transient_status: raise RuntimeError( f"Falha temporaria persistente ({r.status_code}) no installments apos {max_attempts} tentativas. " f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" ) if r.status_code == 400: raise RuntimeError( f"400 Bad Request em installments. " f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" ) r.raise_for_status() return r.json() def get_store_codes(session: requests.Session, auth: Auth, cookie_header: Optional[str]) -> list[int]: r = session.get(STORES_URL, headers=_headers(auth, cookie_header=cookie_header), timeout=30) r.raise_for_status() data = r.json().get("data") or [] out = sorted({int(x.get("code")) for x in data if x.get("code") is not None}) return out def load_watermark(path: str) -> Dict[str, str]: try: if not os.path.exists(path): return {} with open(path, "r", encoding="utf-8") as f: raw = json.load(f) if isinstance(raw, dict): return {str(k): str(v) for k, v in raw.items()} return {} except Exception: return {} def save_watermark(path: str, data: Dict[str, str]) -> None: tmp = f"{path}.tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True) os.replace(tmp, path) def _normalize_sql_conn(conn: str) -> str: s = (conn or "").strip().rstrip(";") if not s: return s if re.search(r"(?i)\bencrypt\s*=", s): s = re.sub(r"(?i)\bencrypt\s*=\s*[^;]+", "Encrypt=no", s) else: s += ";Encrypt=no" if not re.search(r"(?i)\btrustservercertificate\s*=", s): s += ";TrustServerCertificate=yes" return s + ";" def _parse_iso_date(value: Optional[str]): if not value: return None try: return date.fromisoformat(str(value)[:10]) except Exception: return None def _to_float(value: Any) -> Optional[float]: if value is None: return None try: return float(value) except Exception: return None def _dedupe_installments(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: seen = set() out: List[Dict[str, Any]] = [] for item in items: code = str(item.get("installmentCode") or "").strip() key = code or ( str(item.get("installmentGroupCode") or ""), str(item.get("installmentNumber") or ""), str(item.get("borrowerCode") or ""), str(item.get("dueDate") or ""), ) if key in seen: continue seen.add(key) out.append(item) return out def _ensure_doc_tables(cur) -> None: cur.execute( """ IF OBJECT_ID('dbo.DocPedidos', 'U') IS NULL BEGIN CREATE TABLE dbo.DocPedidos ( Id INT IDENTITY(1,1) PRIMARY KEY, InstallmentGroupCode VARCHAR(40) NOT NULL UNIQUE, MediatorCode VARCHAR(20) NULL, MediatorGroupCode VARCHAR(20) NULL, IssueDate DATE NULL, CreatedAt DATE NULL, TotalItens INT NOT NULL DEFAULT 0, ValorTotalOriginal DECIMAL(18,2) NULL, ValorTotalFee DECIMAL(18,2) NULL, PayloadJson NVARCHAR(MAX) NULL, CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(), AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME() ); END """ ) cur.execute( """ IF OBJECT_ID('dbo.DocPedidosParcelas', 'U') IS NULL BEGIN CREATE TABLE dbo.DocPedidosParcelas ( Id INT IDENTITY(1,1) PRIMARY KEY, DocPedidoId INT NOT NULL, InstallmentCode VARCHAR(40) NOT NULL UNIQUE, InstallmentGroupCode VARCHAR(40) NOT NULL, InstallmentNumber INT NULL, BorrowerCode VARCHAR(40) NULL, DueDate DATE NULL, IssueDate DATE NULL, CreatedAt DATE NULL, PaymentType VARCHAR(40) NULL, OriginalAmount DECIMAL(18,2) NULL, TotalFee DECIMAL(18,2) NULL, PayloadJson NVARCHAR(MAX) NULL, CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(), AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(), CONSTRAINT FK_DocPedidosParcelas_DocPedidos FOREIGN KEY (DocPedidoId) REFERENCES dbo.DocPedidos(Id) ); END """ ) def upsert_doc_pedidos_sqlserver( rows: List[Dict[str, Any]], connection_string: str, mediator_code_log: Optional[int] = None, ) -> Dict[str, int]: try: import pyodbc # type: ignore except Exception as e: raise RuntimeError("pyodbc nao encontrado. Instale com: pip install pyodbc") from e if not rows: return {"pedidos": 0, "parcelas": 0} cn = pyodbc.connect(_normalize_sql_conn(connection_string), timeout=30) cn.autocommit = False cur = cn.cursor() try: cn.timeout = int(os.getenv("SQL_STATEMENT_TIMEOUT_SEC", "120")) except Exception: pass pedidos_count = 0 parcelas_count = 0 try: _ensure_doc_tables(cur) total_rows = len(rows) if mediator_code_log is not None: print(f"[sql-inicio] loja={mediator_code_log} pedidos_para_upsert={total_rows}") for idx, row in enumerate(rows, start=1): group_code = str(row.get("installmentGroupCode") or "").strip() if not group_code: continue if mediator_code_log is not None: print(f"[sql-progresso] loja={mediator_code_log} pedido={idx}/{total_rows} group={group_code}") installments = row.get("installments") or [] first = installments[0] if installments else {} mediator_code = str(row.get("mediatorCode") or first.get("mediatorCode") or "").strip() or None mediator_group_code = str(first.get("mediatorGroupCode") or "").strip() or None issue_date = _parse_iso_date(first.get("issueDate")) created_at = _parse_iso_date(first.get("createdAt")) total_itens = int(len(installments)) valor_total_original = sum(_to_float(x.get("originalAmount")) or 0.0 for x in installments) valor_total_fee = sum(_to_float(x.get("totalFee")) or 0.0 for x in installments) payload_json = json.dumps(row.get("rawResponse"), ensure_ascii=False) cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code) found = cur.fetchone() if found: doc_id = int(found[0]) cur.execute( """ UPDATE dbo.DocPedidos SET MediatorCode=?, MediatorGroupCode=?, IssueDate=?, CreatedAt=?, TotalItens=?, ValorTotalOriginal=?, ValorTotalFee=?, PayloadJson=?, AtualizadoEm=SYSUTCDATETIME() WHERE Id=? """, mediator_code, mediator_group_code, issue_date, created_at, total_itens, float(valor_total_original), float(valor_total_fee), payload_json, doc_id, ) else: cur.execute( """ INSERT INTO dbo.DocPedidos ( InstallmentGroupCode, MediatorCode, MediatorGroupCode, IssueDate, CreatedAt, TotalItens, ValorTotalOriginal, ValorTotalFee, PayloadJson ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, group_code, mediator_code, mediator_group_code, issue_date, created_at, total_itens, float(valor_total_original), float(valor_total_fee), payload_json, ) cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code) got = cur.fetchone() if not got: continue doc_id = int(got[0]) pedidos_count += 1 current_codes: List[str] = [] for item in installments: installment_code = str(item.get("installmentCode") or "").strip() if not installment_code: continue current_codes.append(installment_code) cur.execute("SELECT Id FROM dbo.DocPedidosParcelas WHERE InstallmentCode = ?", installment_code) par_found = cur.fetchone() if par_found: cur.execute( """ UPDATE dbo.DocPedidosParcelas SET DocPedidoId=?, InstallmentGroupCode=?, InstallmentNumber=?, BorrowerCode=?, DueDate=?, IssueDate=?, CreatedAt=?, PaymentType=?, OriginalAmount=?, TotalFee=?, PayloadJson=?, AtualizadoEm=SYSUTCDATETIME() WHERE InstallmentCode=? """, doc_id, group_code, int(item.get("installmentNumber")) if item.get("installmentNumber") is not None else None, str(item.get("borrowerCode") or "")[:40] or None, _parse_iso_date(item.get("dueDate")), _parse_iso_date(item.get("issueDate")), _parse_iso_date(item.get("createdAt")), str(item.get("paymentType") or "")[:40] or None, _to_float(item.get("originalAmount")), _to_float(item.get("totalFee")), json.dumps(item, ensure_ascii=False), installment_code, ) else: cur.execute( """ INSERT INTO dbo.DocPedidosParcelas ( DocPedidoId, InstallmentCode, InstallmentGroupCode, InstallmentNumber, BorrowerCode, DueDate, IssueDate, CreatedAt, PaymentType, OriginalAmount, TotalFee, PayloadJson ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, doc_id, installment_code, group_code, int(item.get("installmentNumber")) if item.get("installmentNumber") is not None else None, str(item.get("borrowerCode") or "")[:40] or None, _parse_iso_date(item.get("dueDate")), _parse_iso_date(item.get("issueDate")), _parse_iso_date(item.get("createdAt")), str(item.get("paymentType") or "")[:40] or None, _to_float(item.get("originalAmount")), _to_float(item.get("totalFee")), json.dumps(item, ensure_ascii=False), ) parcelas_count += 1 if current_codes: placeholders = ",".join("?" for _ in current_codes) cur.execute( f""" DELETE FROM dbo.DocPedidosParcelas WHERE DocPedidoId = ? AND InstallmentCode NOT IN ({placeholders}) """, doc_id, *current_codes, ) cn.commit() if mediator_code_log is not None: print( f"[sql-fim] loja={mediator_code_log} pedidos_upsert={pedidos_count} " f"parcelas_upsert={parcelas_count}" ) return {"pedidos": pedidos_count, "parcelas": parcelas_count} except Exception: cn.rollback() raise finally: cur.close() cn.close() def main() -> None: today = date.today() last_days_env = os.getenv("LAST_N_DAYS", "5").strip() last_n_days = int(last_days_env) if last_days_env else None rolling_start = (today - timedelta(days=last_n_days)).isoformat() if last_n_days is not None else None default_start = date(today.year, 1, 1).isoformat() default_end = today.isoformat() start_date_env = os.getenv("START_INSTALLMENT_CHANGE_DATE") start_date_fixed = (start_date_env or "").strip() or None end_date = os.getenv("END_INSTALLMENT_CHANGE_DATE", default_end) installment_change = os.getenv("INSTALLMENT_CHANGE", "CRIACAO").strip() or "CRIACAO" first_page = int(os.getenv("PAGE_START", "1")) max_pages_per_query = int(os.getenv("MAX_PAGES_PER_QUERY", "10000")) # Padrao organizado: uma loja por vez (logs nao ficam intercalados). store_workers = int(os.getenv("STORE_WORKERS", "1")) group_workers = int(os.getenv("GROUP_WORKERS", "4")) cookie_header = os.getenv("EXTRANET_COOKIE") target_mediator_env = os.getenv("TARGET_MEDIATOR_CODE", "").strip() target_mediator = int(target_mediator_env) if target_mediator_env else None resume_from_env = os.getenv("RESUME_FROM_MEDIATOR_CODE", "").strip() resume_from_mediator = int(resume_from_env) if resume_from_env else None log_group_codes = os.getenv("LOG_GROUP_CODES", "1") == "1" output_dir = os.getenv("OUTPUT_DIR", "installments_por_loja") save_json = os.getenv("SAVE_JSON", "0") == "1" write_sql = os.getenv("WRITE_SQL", "1") == "1" watermark_file = os.getenv("WATERMARK_FILE", "installments_watermark.json") incremental_mode = os.getenv("INCREMENTAL_MODE", "1") == "1" sql_conn = os.getenv( "SQLSERVER_CONN", ( "DRIVER={ODBC Driver 17 for SQL Server};" "SERVER=10.77.77.10;" "DATABASE=GINSENG;" "UID=andrey;" "PWD=88253332;" "TrustServerCertificate=yes;" ), ) session = requests.Session() session.trust_env = False auth = Auth(session) stores = get_store_codes(session, auth, cookie_header) if target_mediator is not None: stores = [target_mediator] print(f"[info] consulta focada na loja {target_mediator}") elif resume_from_mediator is not None: stores = [s for s in stores if s >= resume_from_mediator] print(f"[info] retomando a partir da loja {resume_from_mediator} (restantes={len(stores)})") stores = sorted(stores) if save_json: os.makedirs(output_dir, exist_ok=True) watermark = load_watermark(watermark_file) if incremental_mode else {} if incremental_mode: print(f"[info] incremental ativo com watermark em {watermark_file}") else: print("[info] incremental desativado") if last_n_days is not None: print(f"[info] janela movel ativa: ultimos {last_n_days} dias ({rolling_start}..{end_date})") authorized: list[int] = [] unauthorized: list[int] = [] skipped_bad_request: list[int] = [] failed: Dict[int, str] = {} def process_store(mediator_code: int) -> Dict[str, Any]: if rolling_start: store_start_date = rolling_start else: store_start_date = start_date_fixed if not store_start_date: if incremental_mode: store_start_date = watermark.get(str(mediator_code), default_start) else: store_start_date = default_start local_session = requests.Session() local_session.trust_env = False local_auth = Auth(local_session) store_bearer = local_auth.get_bearer() try: print( f"[consulta] loja={mediator_code} periodo={store_start_date}..{end_date} pagina_inicial={first_page}" ) page = first_page all_installments: List[Dict[str, Any]] = [] total_api = 0 total_pages = 1 while True: try: body = get_installments_page( session=local_session, auth=local_auth, start_date=store_start_date, end_date=end_date, installment_change=installment_change, mediator_code=mediator_code, page=page, cookie_header=cookie_header, ) except Exception as e: msg = str(e) if "400 Bad Request" in msg and installment_change: print( f"[fallback] loja={mediator_code} 400 com installmentChange={installment_change}; " "tentando sem installmentChange" ) body = get_installments_page( session=local_session, auth=local_auth, start_date=store_start_date, end_date=end_date, installment_change=None, mediator_code=mediator_code, page=page, cookie_header=cookie_header, ) else: raise installments_page = (((body.get("data") or {}).get("installments")) or []) all_installments.extend(installments_page) pagination = ((body.get("data") or {}).get("pagination") or {}) total_api = int(pagination.get("total") or len(all_installments)) limit = int(pagination.get("limit") or len(installments_page) or 1) total_pages = max(1, (total_api + limit - 1) // limit) if total_api else page print( f"[pagina-loja] loja={mediator_code} pagina={page}/{total_pages} " f"itens_pagina={len(installments_page)}" ) if not installments_page: break if page >= total_pages: break if page - first_page + 1 >= max_pages_per_query: print( f"[stop] loja={mediator_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido" ) break page += 1 installments = _dedupe_installments(all_installments) group_codes = sorted( { str(item.get("installmentGroupCode")).strip() for item in installments if item.get("installmentGroupCode") is not None and str(item.get("installmentGroupCode")).strip() } ) print( f"[resultado-loja] loja={mediator_code} pedidos_encontrados={total_api} " f"itens_total_agregados={len(installments)} grupos_unicos={len(group_codes)}" ) if log_group_codes: for gc in group_codes: print(f"[grupo] loja={mediator_code} installmentGroupCode={gc}") pedidos: Dict[str, Any] = {} def fetch_group(group_code: str) -> Dict[str, Any]: try: group_session = requests.Session() group_session.trust_env = False group_auth = Auth(group_session) # Reutiliza o token da loja para evitar nova ida ao endpoint /api/tokens por grupo. group_auth.override_bearer = store_bearer group_page = first_page all_group_installments: List[Dict[str, Any]] = [] group_total = 0 group_total_pages = 1 while True: group_body = get_installments_page( session=group_session, auth=group_auth, start_date=None, end_date=None, installment_change=None, mediator_code=None, page=group_page, installment_group_code=group_code, cookie_header=cookie_header, ) group_page_items = (((group_body.get("data") or {}).get("installments")) or []) all_group_installments.extend(group_page_items) group_pagination = ((group_body.get("data") or {}).get("pagination") or {}) group_total = int(group_pagination.get("total") or len(all_group_installments)) group_limit = int(group_pagination.get("limit") or len(group_page_items) or 1) group_total_pages = ( max(1, (group_total + group_limit - 1) // group_limit) if group_total else group_page ) print( f"[grupo-pagina] loja={mediator_code} installmentGroupCode={group_code} " f"pagina={group_page}/{group_total_pages} itens_pagina={len(group_page_items)}" ) if not group_page_items: break if group_page >= group_total_pages: break if group_page - first_page + 1 >= max_pages_per_query: print( f"[stop] grupo={group_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido" ) break group_page += 1 group_installments = _dedupe_installments(all_group_installments) count = len(group_installments) print(f"[grupo-ok] loja={mediator_code} installmentGroupCode={group_code} itens={count}") group_body_agg = { "data": { "installments": group_installments, "pagination": {"limit": count, "total": group_total or count}, }, "status": 200, "message": "Success", } return { "pedidoNumero": group_code, "consulta": { "installmentGroupCode": group_code, "pageStart": first_page, "pagesFetched": group_total_pages, }, "ok": True, "totalItens": count, "detalhes": group_body_agg, } except Exception as e: print(f"[grupo-erro] loja={mediator_code} installmentGroupCode={group_code} erro={e}") return { "pedidoNumero": group_code, "consulta": { "installmentGroupCode": group_code, "pageStart": first_page, }, "ok": False, "error": str(e), } if group_codes: with ThreadPoolExecutor(max_workers=max(1, group_workers)) as group_pool: group_futures = {group_pool.submit(fetch_group, gc): gc for gc in group_codes} for fut in as_completed(group_futures): gc = group_futures[fut] pedidos[gc] = fut.result() store_out = { "queryWindow": { "startInstallmentChangeDate": store_start_date, "endInstallmentChangeDate": end_date, "installmentChange": installment_change, "pageStart": first_page, "pagesFetched": total_pages, }, "mediatorCode": mediator_code, "pedidosEncontradosPagina": len(group_codes), "pedidos": pedidos, } if write_sql: sql_rows: List[Dict[str, Any]] = [] for gc, pedido_payload in pedidos.items(): if not pedido_payload.get("ok"): continue raw_response = pedido_payload.get("detalhes") or {} sql_rows.append( { "mediatorCode": mediator_code, "installmentGroupCode": gc, "rawResponse": raw_response, "installments": (((raw_response.get("data") or {}).get("installments")) or []), } ) stats = upsert_doc_pedidos_sqlserver( sql_rows, sql_conn, mediator_code_log=mediator_code, ) store_out["sqlUpsert"] = stats print( f"[sql] loja={mediator_code} pedidos_upsert={stats.get('pedidos')} " f"parcelas_upsert={stats.get('parcelas')}" ) if save_json: store_file = os.path.join(output_dir, f"installments_loja_{mediator_code}.json") with open(store_file, "w", encoding="utf-8") as f: json.dump(store_out, f, ensure_ascii=False, indent=2) print(f"[ok] loja={mediator_code} json_salvo={store_file}") return {"status": "authorized", "mediatorCode": mediator_code, "endDate": end_date} except Exception as e: msg = str(e) if "403 Forbidden no installments" in msg: return {"status": "unauthorized", "mediatorCode": mediator_code} if "400 Bad Request em installments" in msg and ( "Combina" in msg or "invalid" in msg.lower() ): print(f"[skip-400] loja={mediator_code} filtro nao aplicavel para essa loja") return {"status": "skipped_bad_request", "mediatorCode": mediator_code} else: print(f"[falha] loja={mediator_code} erro={msg}") return {"status": "failed", "mediatorCode": mediator_code, "error": msg} with ThreadPoolExecutor(max_workers=max(1, store_workers)) as store_pool: store_futures = {store_pool.submit(process_store, mediator): mediator for mediator in stores} for fut in as_completed(store_futures): try: result = fut.result() mediator_code = int(result["mediatorCode"]) status = result["status"] if status == "authorized": authorized.append(mediator_code) if incremental_mode: watermark[str(mediator_code)] = str(result["endDate"]) save_watermark(watermark_file, watermark) print(f"[loja-fim] loja={mediator_code} status=ok") elif status == "unauthorized": unauthorized.append(mediator_code) print(f"[loja-fim] loja={mediator_code} status=nao_autorizada") elif status == "skipped_bad_request": skipped_bad_request.append(mediator_code) print(f"[loja-fim] loja={mediator_code} status=skip_400") else: failed[mediator_code] = str(result.get("error") or "erro desconhecido") print(f"[loja-fim] loja={mediator_code} status=falha") except Exception as e: mediator_code = int(store_futures[fut]) failed[mediator_code] = f"erro no worker: {e}" print(f"[falha] loja={mediator_code} erro no worker: {e}") print( "[resumo] " f"lojas_total={len(stores)} autorizadas={len(authorized)} " f"nao_autorizadas={len(unauthorized)} skip_400={len(skipped_bad_request)} falhas={len(failed)} " f"store_workers={store_workers} group_workers={group_workers} " f"pasta_saida={output_dir}" ) if failed: first_code = sorted(failed.keys())[0] print(f"[falha-exemplo] loja={first_code} erro={failed[first_code]}") if __name__ == "__main__": main()