att pedidos

This commit is contained in:
Andrey Cunha 2026-03-04 23:54:25 -03:00
parent 2d09ab55fd
commit 5ddfda737a

786
installments_reader.py Normal file
View File

@ -0,0 +1,786 @@
import base64
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import os
import re
import time
from dataclasses import dataclass
from datetime import date
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode
import requests
# Internal Ginseng API that issues the extranet bearer token.
TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens"
# Boticario extranet endpoint listing the stores the logged-in user can access.
STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores"
# Credit-portal BFF endpoint that pages through franchisee installments.
INSTALLMENTS_URL = (
    "https://bff-credit-container-portal-apigw.produto-financeiro.grupoboticario.digital/"
    "v1/franchisee/installments"
)
def _jwt_payload(jwt_token: str) -> Dict[str, Any]:
parts = jwt_token.split(".")
if len(parts) != 3:
return {}
payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4)
raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
return json.loads(raw.decode("utf-8"))
@dataclass
class TokenCache:
    """Cached bearer token together with its JWT expiry (epoch seconds)."""

    bearer: Optional[str] = None
    exp_epoch: int = 0

    def valid(self, skew_seconds: int = 30) -> bool:
        """True while a token is held and not within skew_seconds of expiry."""
        if not self.bearer:
            return False
        return time.time() < (self.exp_epoch - skew_seconds)
class Auth:
    """Fetches and caches the extranet bearer token for one HTTP session.

    When the EXTRANET_BEARER environment variable is set it overrides the
    token API entirely and no caching is performed.
    """

    def __init__(self, session: requests.Session):
        self.s = session
        self.cache = TokenCache()
        self.override_bearer = os.getenv("EXTRANET_BEARER")

    def get_bearer(self, force_refresh: bool = False) -> str:
        """Return a bearer token, refreshing it via the token API when needed."""
        if self.override_bearer:
            return self.override_bearer
        if self.cache.valid() and not force_refresh:
            return self.cache.bearer  # type: ignore[return-value]
        resp = self.s.get(TOKENS_URL, timeout=30)
        resp.raise_for_status()
        body = resp.json()
        if not body.get("success"):
            raise RuntimeError(f"Token API retornou success=false: {body}")
        bearer = body["data"][0]["token"]
        # Strip an optional "Bearer " prefix before decoding the JWT claims.
        jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer
        exp = int(_jwt_payload(jwt).get("exp") or 0)
        self.cache = TokenCache(bearer=bearer, exp_epoch=exp)
        return bearer

    def invalidate(self) -> None:
        """Drop the cached token so the next get_bearer() hits the token API."""
        self.cache = TokenCache()
def _headers(auth: Auth, cookie_header: Optional[str]) -> Dict[str, str]:
    """Build the request headers the extranet BFF expects.

    The Cookie header is only attached when a cookie string was supplied.
    """
    base = {
        "Authorization": auth.get_bearer(),
        "Accept": "application/json, text/plain, */*",
        "Origin": "https://extranet.grupoboticario.com.br",
        "Referer": "https://extranet.grupoboticario.com.br/",
        "User-Agent": "Mozilla/5.0",
    }
    if not cookie_header:
        return base
    return {**base, "Cookie": cookie_header}
def get_installments_page(
    session: requests.Session,
    auth: Auth,
    start_date: Optional[str],
    end_date: Optional[str],
    installment_change: Optional[str],
    mediator_code: Optional[int],
    page: int,
    installment_group_code: Optional[str] = None,
    cookie_header: Optional[str] = None,
) -> Dict[str, Any]:
    """Fetch one page of installments from the BFF.

    Two query modes:
      * by group: pass installment_group_code (period filters are ignored);
      * by period: start_date, end_date and mediator_code are all required,
        optionally narrowed by installment_change.

    Retries 401 (forcing a token refresh) and transient 429/5xx responses
    with capped exponential backoff, up to 8 attempts. Raises RuntimeError
    with diagnostic context on 403, on persistently transient statuses and
    on 400; other HTTP errors surface via raise_for_status().
    """
    params: Dict[str, Any] = {"page": page}
    if installment_group_code:
        # Group lookup: the group code alone identifies the order.
        params["installmentGroupCode"] = installment_group_code
    else:
        if not start_date or not end_date or mediator_code is None:
            raise ValueError("Para consulta por periodo, informe start/end/mediator_code.")
        params["endInstallmentChangeDate"] = end_date
        params["startInstallmentChangeDate"] = start_date
        params["mediatorCode"] = mediator_code
        if installment_change:
            params["installmentChange"] = installment_change
    r = None
    max_attempts = 8
    transient_status = {429, 500, 502, 503, 504}
    for attempt in range(max_attempts):
        r = session.get(
            INSTALLMENTS_URL,
            headers=_headers(auth, cookie_header=cookie_header),
            params=params,
            timeout=60,
        )
        if r.status_code == 401:
            # Expired/invalid token: drop the cache and force a refresh.
            print(
                f"[warn] 401 no installments (tentativa {attempt + 1}/{max_attempts}), "
                "renovando token..."
            )
            auth.invalidate()
            auth.get_bearer(force_refresh=True)
            time.sleep(min(3, attempt + 1))
            continue
        if r.status_code == 403:
            # Authorization problem: retrying will not help; handled below.
            break
        if r.status_code in transient_status:
            # Backoff: 2^attempt seconds, exponent capped at 5, wait capped at 30s.
            wait_s = min(30, 2 ** min(5, attempt))
            print(
                f"[warn] erro temporario {r.status_code} no installments "
                f"(tentativa {attempt + 1}/{max_attempts}), aguardando {wait_s}s..."
            )
            time.sleep(wait_s)
            continue
        break
    # max_attempts >= 1, so the loop body ran at least once and assigned r.
    assert r is not None
    if r.status_code == 403:
        # Surface the token's store claims to help diagnose permission gaps.
        # NOTE(review): assumes the bearer contains a space ("Bearer <jwt>");
        # a bare token would raise IndexError on the [1] access — confirm.
        payload = _jwt_payload(auth.get_bearer().split(" ", 1)[1])
        stores_claim = payload.get("stores")
        raise RuntimeError(
            "403 Forbidden no installments. Possiveis causas: mediatorCode sem permissao no token "
            f"ou falta de Cookie CloudFront. stores no token={stores_claim}. "
            f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
        )
    if r.status_code in transient_status:
        raise RuntimeError(
            f"Falha temporaria persistente ({r.status_code}) no installments apos {max_attempts} tentativas. "
            f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
        )
    if r.status_code == 400:
        raise RuntimeError(
            f"400 Bad Request em installments. "
            f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
        )
    r.raise_for_status()
    return r.json()
def get_store_codes(session: requests.Session, auth: Auth, cookie_header: Optional[str]) -> list[int]:
    """Return the sorted, de-duplicated mediator (store) codes for the logged user."""
    resp = session.get(STORES_URL, headers=_headers(auth, cookie_header=cookie_header), timeout=30)
    resp.raise_for_status()
    records = resp.json().get("data") or []
    codes = {int(rec.get("code")) for rec in records if rec.get("code") is not None}
    return sorted(codes)
def load_watermark(path: str) -> Dict[str, str]:
    """Read the per-store watermark JSON file.

    Any problem (missing file, unreadable JSON, non-dict payload) yields an
    empty dict so the caller falls back to the default start date.
    """
    try:
        if not os.path.exists(path):
            return {}
        with open(path, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except Exception:
        return {}
    if not isinstance(parsed, dict):
        return {}
    return {str(key): str(val) for key, val in parsed.items()}
def save_watermark(path: str, data: Dict[str, str]) -> None:
    """Persist the watermark map as pretty-printed JSON, atomically.

    Writes to a sibling .tmp file first and then renames over the target so a
    crash mid-write never leaves a truncated watermark behind.
    """
    temp_path = f"{path}.tmp"
    with open(temp_path, "w", encoding="utf-8") as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2, sort_keys=True)
    os.replace(temp_path, path)
def _normalize_sql_conn(conn: str) -> str:
s = (conn or "").strip().rstrip(";")
if not s:
return s
if re.search(r"(?i)\bencrypt\s*=", s):
s = re.sub(r"(?i)\bencrypt\s*=\s*[^;]+", "Encrypt=no", s)
else:
s += ";Encrypt=no"
if not re.search(r"(?i)\btrustservercertificate\s*=", s):
s += ";TrustServerCertificate=yes"
return s + ";"
def _parse_iso_date(value: Optional[str]):
if not value:
return None
try:
return date.fromisoformat(str(value)[:10])
except Exception:
return None
def _to_float(value: Any) -> Optional[float]:
if value is None:
return None
try:
return float(value)
except Exception:
return None
def _dedupe_installments(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
seen = set()
out: List[Dict[str, Any]] = []
for item in items:
code = str(item.get("installmentCode") or "").strip()
key = code or (
str(item.get("installmentGroupCode") or ""),
str(item.get("installmentNumber") or ""),
str(item.get("borrowerCode") or ""),
str(item.get("dueDate") or ""),
)
if key in seen:
continue
seen.add(key)
out.append(item)
return out
def _ensure_doc_tables(cur) -> None:
    """Create dbo.DocPedidos and dbo.DocPedidosParcelas when missing.

    Idempotent: each CREATE is guarded by an OBJECT_ID existence check, so it
    is safe to run before every upsert batch. `cur` is a pyodbc cursor.
    """
    # Header table: one row per installment group (pedido), keyed by the
    # unique InstallmentGroupCode; totals are denormalized from the items.
    cur.execute(
        """
        IF OBJECT_ID('dbo.DocPedidos', 'U') IS NULL
        BEGIN
            CREATE TABLE dbo.DocPedidos (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                InstallmentGroupCode VARCHAR(40) NOT NULL UNIQUE,
                MediatorCode VARCHAR(20) NULL,
                MediatorGroupCode VARCHAR(20) NULL,
                IssueDate DATE NULL,
                CreatedAt DATE NULL,
                TotalItens INT NOT NULL DEFAULT 0,
                ValorTotalOriginal DECIMAL(18,2) NULL,
                ValorTotalFee DECIMAL(18,2) NULL,
                PayloadJson NVARCHAR(MAX) NULL,
                CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
                AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            );
        END
        """
    )
    # Detail table: one row per installment, unique by InstallmentCode and
    # linked to its pedido through the DocPedidoId foreign key.
    cur.execute(
        """
        IF OBJECT_ID('dbo.DocPedidosParcelas', 'U') IS NULL
        BEGIN
            CREATE TABLE dbo.DocPedidosParcelas (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                DocPedidoId INT NOT NULL,
                InstallmentCode VARCHAR(40) NOT NULL UNIQUE,
                InstallmentGroupCode VARCHAR(40) NOT NULL,
                InstallmentNumber INT NULL,
                BorrowerCode VARCHAR(40) NULL,
                DueDate DATE NULL,
                IssueDate DATE NULL,
                CreatedAt DATE NULL,
                PaymentType VARCHAR(40) NULL,
                OriginalAmount DECIMAL(18,2) NULL,
                TotalFee DECIMAL(18,2) NULL,
                PayloadJson NVARCHAR(MAX) NULL,
                CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
                AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
                CONSTRAINT FK_DocPedidosParcelas_DocPedidos FOREIGN KEY (DocPedidoId) REFERENCES dbo.DocPedidos(Id)
            );
        END
        """
    )
def upsert_doc_pedidos_sqlserver(
    rows: List[Dict[str, Any]],
    connection_string: str,
    mediator_code_log: Optional[int] = None,
) -> Dict[str, int]:
    """Upsert pedido headers and their parcelas into SQL Server.

    Each row is expected to carry installmentGroupCode, an optional
    mediatorCode, the raw API response (rawResponse) and the list of
    installment dicts (installments) — TODO confirm against the caller in
    main(). Runs as a single transaction: commit at the end, rollback on any
    error. Returns counts: {"pedidos": n, "parcelas": m}.

    Raises RuntimeError when pyodbc is not installed.
    """
    try:
        import pyodbc  # type: ignore
    except Exception as e:
        raise RuntimeError("pyodbc nao encontrado. Instale com: pip install pyodbc") from e
    if not rows:
        # Nothing to write; skip opening a connection entirely.
        return {"pedidos": 0, "parcelas": 0}
    cn = pyodbc.connect(_normalize_sql_conn(connection_string), timeout=30)
    cn.autocommit = False
    cur = cn.cursor()
    try:
        # Per-statement timeout (seconds); best-effort, ignored if unsupported.
        cn.timeout = int(os.getenv("SQL_STATEMENT_TIMEOUT_SEC", "120"))
    except Exception:
        pass
    pedidos_count = 0
    parcelas_count = 0
    try:
        _ensure_doc_tables(cur)
        total_rows = len(rows)
        if mediator_code_log is not None:
            print(f"[sql-inicio] loja={mediator_code_log} pedidos_para_upsert={total_rows}")
        for idx, row in enumerate(rows, start=1):
            group_code = str(row.get("installmentGroupCode") or "").strip()
            if not group_code:
                # No usable key: skip silently.
                continue
            if mediator_code_log is not None:
                print(f"[sql-progresso] loja={mediator_code_log} pedido={idx}/{total_rows} group={group_code}")
            installments = row.get("installments") or []
            # Header fields are taken from the first installment of the group.
            first = installments[0] if installments else {}
            mediator_code = str(row.get("mediatorCode") or first.get("mediatorCode") or "").strip() or None
            mediator_group_code = str(first.get("mediatorGroupCode") or "").strip() or None
            issue_date = _parse_iso_date(first.get("issueDate"))
            created_at = _parse_iso_date(first.get("createdAt"))
            total_itens = int(len(installments))
            # Missing/unparseable amounts count as 0.0 in the totals.
            valor_total_original = sum(_to_float(x.get("originalAmount")) or 0.0 for x in installments)
            valor_total_fee = sum(_to_float(x.get("totalFee")) or 0.0 for x in installments)
            payload_json = json.dumps(row.get("rawResponse"), ensure_ascii=False)
            # Upsert the header: UPDATE when the group already exists, else INSERT.
            cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code)
            found = cur.fetchone()
            if found:
                doc_id = int(found[0])
                cur.execute(
                    """
                    UPDATE dbo.DocPedidos
                    SET MediatorCode=?, MediatorGroupCode=?, IssueDate=?, CreatedAt=?, TotalItens=?,
                        ValorTotalOriginal=?, ValorTotalFee=?, PayloadJson=?, AtualizadoEm=SYSUTCDATETIME()
                    WHERE Id=?
                    """,
                    mediator_code,
                    mediator_group_code,
                    issue_date,
                    created_at,
                    total_itens,
                    float(valor_total_original),
                    float(valor_total_fee),
                    payload_json,
                    doc_id,
                )
            else:
                cur.execute(
                    """
                    INSERT INTO dbo.DocPedidos (
                        InstallmentGroupCode, MediatorCode, MediatorGroupCode, IssueDate, CreatedAt,
                        TotalItens, ValorTotalOriginal, ValorTotalFee, PayloadJson
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    group_code,
                    mediator_code,
                    mediator_group_code,
                    issue_date,
                    created_at,
                    total_itens,
                    float(valor_total_original),
                    float(valor_total_fee),
                    payload_json,
                )
            # Re-select to get the row Id regardless of which branch ran above.
            cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code)
            got = cur.fetchone()
            if not got:
                continue
            doc_id = int(got[0])
            pedidos_count += 1
            # Upsert each parcela keyed by InstallmentCode; remember the codes
            # seen so stale rows can be pruned afterwards.
            current_codes: List[str] = []
            for item in installments:
                installment_code = str(item.get("installmentCode") or "").strip()
                if not installment_code:
                    continue
                current_codes.append(installment_code)
                cur.execute("SELECT Id FROM dbo.DocPedidosParcelas WHERE InstallmentCode = ?", installment_code)
                par_found = cur.fetchone()
                if par_found:
                    cur.execute(
                        """
                        UPDATE dbo.DocPedidosParcelas
                        SET DocPedidoId=?, InstallmentGroupCode=?, InstallmentNumber=?, BorrowerCode=?, DueDate=?,
                            IssueDate=?, CreatedAt=?, PaymentType=?, OriginalAmount=?, TotalFee=?, PayloadJson=?,
                            AtualizadoEm=SYSUTCDATETIME()
                        WHERE InstallmentCode=?
                        """,
                        doc_id,
                        group_code,
                        int(item.get("installmentNumber")) if item.get("installmentNumber") is not None else None,
                        str(item.get("borrowerCode") or "")[:40] or None,
                        _parse_iso_date(item.get("dueDate")),
                        _parse_iso_date(item.get("issueDate")),
                        _parse_iso_date(item.get("createdAt")),
                        str(item.get("paymentType") or "")[:40] or None,
                        _to_float(item.get("originalAmount")),
                        _to_float(item.get("totalFee")),
                        json.dumps(item, ensure_ascii=False),
                        installment_code,
                    )
                else:
                    cur.execute(
                        """
                        INSERT INTO dbo.DocPedidosParcelas (
                            DocPedidoId, InstallmentCode, InstallmentGroupCode, InstallmentNumber, BorrowerCode,
                            DueDate, IssueDate, CreatedAt, PaymentType, OriginalAmount, TotalFee, PayloadJson
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        doc_id,
                        installment_code,
                        group_code,
                        int(item.get("installmentNumber")) if item.get("installmentNumber") is not None else None,
                        str(item.get("borrowerCode") or "")[:40] or None,
                        _parse_iso_date(item.get("dueDate")),
                        _parse_iso_date(item.get("issueDate")),
                        _parse_iso_date(item.get("createdAt")),
                        str(item.get("paymentType") or "")[:40] or None,
                        _to_float(item.get("originalAmount")),
                        _to_float(item.get("totalFee")),
                        json.dumps(item, ensure_ascii=False),
                    )
                parcelas_count += 1
            # Prune parcelas of this pedido that are no longer in the payload.
            # NOTE(review): when current_codes is empty nothing is pruned —
            # presumably intentional (avoid wiping on an empty payload); confirm.
            if current_codes:
                placeholders = ",".join("?" for _ in current_codes)
                cur.execute(
                    f"""
                    DELETE FROM dbo.DocPedidosParcelas
                    WHERE DocPedidoId = ? AND InstallmentCode NOT IN ({placeholders})
                    """,
                    doc_id,
                    *current_codes,
                )
        cn.commit()
        if mediator_code_log is not None:
            print(
                f"[sql-fim] loja={mediator_code_log} pedidos_upsert={pedidos_count} "
                f"parcelas_upsert={parcelas_count}"
            )
        return {"pedidos": pedidos_count, "parcelas": parcelas_count}
    except Exception:
        cn.rollback()
        raise
    finally:
        cur.close()
        cn.close()
def main() -> None:
    """Extract installments for every accessible store and persist them.

    Behavior is driven entirely by environment variables (date window, worker
    counts, incremental watermark, JSON/SQL output toggles). Stores are
    processed by a thread pool; per store, a period query lists installment
    group codes, each group is then re-fetched in full by a second pool, and
    the aggregated results are upserted into SQL Server and/or dumped to
    per-store JSON files.
    """
    # ----- configuration from environment -----
    today = date.today()
    default_start = date(today.year, 1, 1).isoformat()  # Jan 1st of the current year
    default_end = today.isoformat()
    start_date_env = os.getenv("START_INSTALLMENT_CHANGE_DATE")
    # A fixed start date overrides the per-store watermark when set.
    start_date_fixed = (start_date_env or "").strip() or None
    end_date = os.getenv("END_INSTALLMENT_CHANGE_DATE", default_end)
    installment_change = os.getenv("INSTALLMENT_CHANGE", "CRIACAO").strip() or "CRIACAO"
    first_page = int(os.getenv("PAGE_START", "1"))
    max_pages_per_query = int(os.getenv("MAX_PAGES_PER_QUERY", "10000"))
    # Organized default: one store at a time (keeps logs from interleaving).
    store_workers = int(os.getenv("STORE_WORKERS", "1"))
    group_workers = int(os.getenv("GROUP_WORKERS", "4"))
    cookie_header = os.getenv("EXTRANET_COOKIE")
    target_mediator_env = os.getenv("TARGET_MEDIATOR_CODE", "").strip()
    target_mediator = int(target_mediator_env) if target_mediator_env else None
    resume_from_env = os.getenv("RESUME_FROM_MEDIATOR_CODE", "").strip()
    resume_from_mediator = int(resume_from_env) if resume_from_env else None
    log_group_codes = os.getenv("LOG_GROUP_CODES", "1") == "1"
    output_dir = os.getenv("OUTPUT_DIR", "installments_por_loja")
    save_json = os.getenv("SAVE_JSON", "0") == "1"
    write_sql = os.getenv("WRITE_SQL", "1") == "1"
    watermark_file = os.getenv("WATERMARK_FILE", "installments_watermark.json")
    incremental_mode = os.getenv("INCREMENTAL_MODE", "1") == "1"
    # SECURITY NOTE(review): hardcoded server address, user and password in the
    # fallback connection string — these credentials should come from the
    # environment or a secret store, not source control.
    sql_conn = os.getenv(
        "SQLSERVER_CONN",
        (
            "DRIVER={ODBC Driver 17 for SQL Server};"
            "SERVER=10.77.77.10;"
            "DATABASE=GINSENG;"
            "UID=andrey;"
            "PWD=88253332;"
            "TrustServerCertificate=yes;"
        ),
    )
    # trust_env=False ignores system proxy/netrc settings for all requests.
    session = requests.Session()
    session.trust_env = False
    auth = Auth(session)
    # ----- store selection -----
    stores = get_store_codes(session, auth, cookie_header)
    if target_mediator is not None:
        stores = [target_mediator]
        print(f"[info] consulta focada na loja {target_mediator}")
    elif resume_from_mediator is not None:
        stores = [s for s in stores if s >= resume_from_mediator]
        print(f"[info] retomando a partir da loja {resume_from_mediator} (restantes={len(stores)})")
    stores = sorted(stores)
    if save_json:
        os.makedirs(output_dir, exist_ok=True)
    # Watermark maps store code (str) -> last successfully processed end date.
    watermark = load_watermark(watermark_file) if incremental_mode else {}
    if incremental_mode:
        print(f"[info] incremental ativo com watermark em {watermark_file}")
    else:
        print("[info] incremental desativado")
    # Per-outcome accumulators, filled as store futures complete.
    authorized: list[int] = []
    unauthorized: list[int] = []
    skipped_bad_request: list[int] = []
    failed: Dict[int, str] = {}

    def process_store(mediator_code: int) -> Dict[str, Any]:
        # Runs in a worker thread; returns a status dict consumed below.
        store_start_date = start_date_fixed
        if not store_start_date:
            if incremental_mode:
                store_start_date = watermark.get(str(mediator_code), default_start)
            else:
                store_start_date = default_start
        # Each worker gets its own session/auth — requests.Session is not
        # guaranteed thread-safe for concurrent use.
        local_session = requests.Session()
        local_session.trust_env = False
        local_auth = Auth(local_session)
        try:
            print(
                f"[consulta] loja={mediator_code} periodo={store_start_date}..{end_date} pagina_inicial={first_page}"
            )
            page = first_page
            all_installments: List[Dict[str, Any]] = []
            total_api = 0
            total_pages = 1
            # Page through the period query, collecting all installments.
            while True:
                try:
                    body = get_installments_page(
                        session=local_session,
                        auth=local_auth,
                        start_date=store_start_date,
                        end_date=end_date,
                        installment_change=installment_change,
                        mediator_code=mediator_code,
                        page=page,
                        cookie_header=cookie_header,
                    )
                except Exception as e:
                    msg = str(e)
                    # Some stores reject the installmentChange filter; retry
                    # the same page without it before giving up.
                    if "400 Bad Request" in msg and installment_change:
                        print(
                            f"[fallback] loja={mediator_code} 400 com installmentChange={installment_change}; "
                            "tentando sem installmentChange"
                        )
                        body = get_installments_page(
                            session=local_session,
                            auth=local_auth,
                            start_date=store_start_date,
                            end_date=end_date,
                            installment_change=None,
                            mediator_code=mediator_code,
                            page=page,
                            cookie_header=cookie_header,
                        )
                    else:
                        raise
                installments_page = (((body.get("data") or {}).get("installments")) or [])
                all_installments.extend(installments_page)
                pagination = ((body.get("data") or {}).get("pagination") or {})
                total_api = int(pagination.get("total") or len(all_installments))
                limit = int(pagination.get("limit") or len(installments_page) or 1)
                # Ceil(total/limit); fall back to the current page when the
                # API reports no total.
                total_pages = max(1, (total_api + limit - 1) // limit) if total_api else page
                print(
                    f"[pagina-loja] loja={mediator_code} pagina={page}/{total_pages} "
                    f"itens_pagina={len(installments_page)}"
                )
                if not installments_page:
                    break
                if page >= total_pages:
                    break
                if page - first_page + 1 >= max_pages_per_query:
                    print(
                        f"[stop] loja={mediator_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido"
                    )
                    break
                page += 1
            installments = _dedupe_installments(all_installments)
            # Unique, non-empty group codes across all collected installments.
            group_codes = sorted(
                {
                    str(item.get("installmentGroupCode")).strip()
                    for item in installments
                    if item.get("installmentGroupCode") is not None
                    and str(item.get("installmentGroupCode")).strip()
                }
            )
            print(
                f"[resultado-loja] loja={mediator_code} pedidos_encontrados={total_api} "
                f"itens_total_agregados={len(installments)} grupos_unicos={len(group_codes)}"
            )
            if log_group_codes:
                for gc in group_codes:
                    print(f"[grupo] loja={mediator_code} installmentGroupCode={gc}")
            pedidos: Dict[str, Any] = {}

            def fetch_group(group_code: str) -> Dict[str, Any]:
                # Re-fetch one group in full (group mode ignores the period
                # filters); never raises — failures are reported via ok=False.
                try:
                    group_session = requests.Session()
                    group_session.trust_env = False
                    group_auth = Auth(group_session)
                    group_page = first_page
                    all_group_installments: List[Dict[str, Any]] = []
                    group_total = 0
                    group_total_pages = 1
                    while True:
                        group_body = get_installments_page(
                            session=group_session,
                            auth=group_auth,
                            start_date=None,
                            end_date=None,
                            installment_change=None,
                            mediator_code=None,
                            page=group_page,
                            installment_group_code=group_code,
                            cookie_header=cookie_header,
                        )
                        group_page_items = (((group_body.get("data") or {}).get("installments")) or [])
                        all_group_installments.extend(group_page_items)
                        group_pagination = ((group_body.get("data") or {}).get("pagination") or {})
                        group_total = int(group_pagination.get("total") or len(all_group_installments))
                        group_limit = int(group_pagination.get("limit") or len(group_page_items) or 1)
                        group_total_pages = (
                            max(1, (group_total + group_limit - 1) // group_limit) if group_total else group_page
                        )
                        print(
                            f"[grupo-pagina] loja={mediator_code} installmentGroupCode={group_code} "
                            f"pagina={group_page}/{group_total_pages} itens_pagina={len(group_page_items)}"
                        )
                        if not group_page_items:
                            break
                        if group_page >= group_total_pages:
                            break
                        if group_page - first_page + 1 >= max_pages_per_query:
                            print(
                                f"[stop] grupo={group_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido"
                            )
                            break
                        group_page += 1
                    group_installments = _dedupe_installments(all_group_installments)
                    count = len(group_installments)
                    print(f"[grupo-ok] loja={mediator_code} installmentGroupCode={group_code} itens={count}")
                    # Synthetic single-page response aggregating all pages.
                    group_body_agg = {
                        "data": {
                            "installments": group_installments,
                            "pagination": {"limit": count, "total": group_total or count},
                        },
                        "status": 200,
                        "message": "Success",
                    }
                    return {
                        "pedidoNumero": group_code,
                        "consulta": {
                            "installmentGroupCode": group_code,
                            "pageStart": first_page,
                            "pagesFetched": group_total_pages,
                        },
                        "ok": True,
                        "totalItens": count,
                        "detalhes": group_body_agg,
                    }
                except Exception as e:
                    print(f"[grupo-erro] loja={mediator_code} installmentGroupCode={group_code} erro={e}")
                    return {
                        "pedidoNumero": group_code,
                        "consulta": {
                            "installmentGroupCode": group_code,
                            "pageStart": first_page,
                        },
                        "ok": False,
                        "error": str(e),
                    }

            # Fan out the per-group fetches across the group pool.
            if group_codes:
                with ThreadPoolExecutor(max_workers=max(1, group_workers)) as group_pool:
                    group_futures = {group_pool.submit(fetch_group, gc): gc for gc in group_codes}
                    for fut in as_completed(group_futures):
                        gc = group_futures[fut]
                        pedidos[gc] = fut.result()
            store_out = {
                "queryWindow": {
                    "startInstallmentChangeDate": store_start_date,
                    "endInstallmentChangeDate": end_date,
                    "installmentChange": installment_change,
                    "pageStart": first_page,
                    "pagesFetched": total_pages,
                },
                "mediatorCode": mediator_code,
                "pedidosEncontradosPagina": len(group_codes),
                "pedidos": pedidos,
            }
            if write_sql:
                # Only successfully fetched groups are sent to SQL Server.
                sql_rows: List[Dict[str, Any]] = []
                for gc, pedido_payload in pedidos.items():
                    if not pedido_payload.get("ok"):
                        continue
                    raw_response = pedido_payload.get("detalhes") or {}
                    sql_rows.append(
                        {
                            "mediatorCode": mediator_code,
                            "installmentGroupCode": gc,
                            "rawResponse": raw_response,
                            "installments": (((raw_response.get("data") or {}).get("installments")) or []),
                        }
                    )
                stats = upsert_doc_pedidos_sqlserver(
                    sql_rows,
                    sql_conn,
                    mediator_code_log=mediator_code,
                )
                store_out["sqlUpsert"] = stats
                print(
                    f"[sql] loja={mediator_code} pedidos_upsert={stats.get('pedidos')} "
                    f"parcelas_upsert={stats.get('parcelas')}"
                )
            if save_json:
                store_file = os.path.join(output_dir, f"installments_loja_{mediator_code}.json")
                with open(store_file, "w", encoding="utf-8") as f:
                    json.dump(store_out, f, ensure_ascii=False, indent=2)
                print(f"[ok] loja={mediator_code} json_salvo={store_file}")
            return {"status": "authorized", "mediatorCode": mediator_code, "endDate": end_date}
        except Exception as e:
            # Classify known failure modes by matching the RuntimeError
            # messages raised in get_installments_page.
            msg = str(e)
            if "403 Forbidden no installments" in msg:
                return {"status": "unauthorized", "mediatorCode": mediator_code}
            if "400 Bad Request em installments" in msg and (
                "Combina" in msg or "invalid" in msg.lower()
            ):
                print(f"[skip-400] loja={mediator_code} filtro nao aplicavel para essa loja")
                return {"status": "skipped_bad_request", "mediatorCode": mediator_code}
            else:
                print(f"[falha] loja={mediator_code} erro={msg}")
                return {"status": "failed", "mediatorCode": mediator_code, "error": msg}

    # ----- fan out stores and collect outcomes -----
    with ThreadPoolExecutor(max_workers=max(1, store_workers)) as store_pool:
        store_futures = {store_pool.submit(process_store, mediator): mediator for mediator in stores}
        for fut in as_completed(store_futures):
            try:
                result = fut.result()
                mediator_code = int(result["mediatorCode"])
                status = result["status"]
                if status == "authorized":
                    authorized.append(mediator_code)
                    if incremental_mode:
                        # Advance and persist the watermark after each
                        # successful store so a crash resumes from here.
                        watermark[str(mediator_code)] = str(result["endDate"])
                        save_watermark(watermark_file, watermark)
                    print(f"[loja-fim] loja={mediator_code} status=ok")
                elif status == "unauthorized":
                    unauthorized.append(mediator_code)
                    print(f"[loja-fim] loja={mediator_code} status=nao_autorizada")
                elif status == "skipped_bad_request":
                    skipped_bad_request.append(mediator_code)
                    print(f"[loja-fim] loja={mediator_code} status=skip_400")
                else:
                    failed[mediator_code] = str(result.get("error") or "erro desconhecido")
                    print(f"[loja-fim] loja={mediator_code} status=falha")
            except Exception as e:
                mediator_code = int(store_futures[fut])
                failed[mediator_code] = f"erro no worker: {e}"
                print(f"[falha] loja={mediator_code} erro no worker: {e}")
    print(
        "[resumo] "
        f"lojas_total={len(stores)} autorizadas={len(authorized)} "
        f"nao_autorizadas={len(unauthorized)} skip_400={len(skipped_bad_request)} falhas={len(failed)} "
        f"store_workers={store_workers} group_workers={group_workers} "
        f"pasta_saida={output_dir}"
    )
    if failed:
        first_code = sorted(failed.keys())[0]
        print(f"[falha-exemplo] loja={first_code} erro={failed[first_code]}")
# Script entry point: run the full per-store extraction when executed directly.
if __name__ == "__main__":
    main()