trm_rem_saggezza/installments_reader.py
2026-05-22 14:24:01 -03:00

907 lines
37 KiB
Python

import base64
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import os
import re
import threading
import time
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode
import requests
# Endpoint queried by Auth.get_bearer to obtain the bearer token.
TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens"
# Endpoint queried by get_store_codes; its "data" entries carry "code" and "channel".
STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores"
# Paginated installments endpoint queried by get_installments_page.
INSTALLMENTS_URL = (
"https://bff-credit-container-portal-apigw.prd.produto-financeiro.app.grupoboticario.com.br/"
"v1/franchisee/installments"
)
# Guards every read/write of _LAST_INSTALLMENTS_REQUEST_TS.
_RATE_LIMIT_LOCK = threading.Lock()
# time.monotonic() reference used for request pacing; _set_global_backoff may
# push it into the future so that paced requests wait out a 429 backoff.
_LAST_INSTALLMENTS_REQUEST_TS = 0.0
def _set_global_backoff(wait_s: float) -> None:
    """Push the shared pacing timestamp forward so paced requests pause.

    The pacer allows a request once ``shared timestamp + min interval`` has
    passed, so the configured minimum interval
    (``INSTALLMENTS_MIN_INTERVAL_MS``) is subtracted here to make the pause
    end roughly ``wait_s`` seconds from now. The timestamp is only ever moved
    forward, never backwards.
    """
    global _LAST_INSTALLMENTS_REQUEST_TS

    configured_ms = os.getenv("INSTALLMENTS_MIN_INTERVAL_MS", "0").strip() or "0"
    try:
        pacing_s = int(configured_ms) / 1000.0
    except Exception:
        pacing_s = 0.0

    with _RATE_LIMIT_LOCK:
        resume_reference = time.monotonic() + wait_s - pacing_s
        _LAST_INSTALLMENTS_REQUEST_TS = max(
            _LAST_INSTALLMENTS_REQUEST_TS, resume_reference
        )
def _jwt_payload(jwt_token: str) -> Dict[str, Any]:
    """Decode the claims (middle) segment of a JWT without verifying it.

    Args:
        jwt_token: The raw token in ``header.payload.signature`` form, without
            any ``Bearer `` prefix.

    Returns:
        The decoded claims, or ``{}`` when the token does not have exactly
        three segments, the payload is not valid base64url/UTF-8/JSON, or the
        decoded JSON is not an object. Callers use ``.get(...)`` on the result,
        so a dict is always returned.
    """
    parts = jwt_token.split(".")
    if len(parts) != 3:
        return {}
    segment = parts[1]
    # JWT segments drop base64 padding; restore it before decoding.
    padded = segment + "=" * (-len(segment) % 4)
    try:
        raw = base64.urlsafe_b64decode(padded.encode("utf-8"))
        claims = json.loads(raw.decode("utf-8"))
    except ValueError:
        # binascii.Error, UnicodeDecodeError and JSONDecodeError are all
        # ValueError subclasses: treat any malformed payload as "no claims".
        return {}
    return claims if isinstance(claims, dict) else {}
@dataclass
class TokenCache:
    """Cached bearer token together with its expiry (epoch seconds)."""

    bearer: Optional[str] = None
    exp_epoch: int = 0

    def valid(self, skew_seconds: int = 30) -> bool:
        """Return True while a token is cached and not within the skew of expiry."""
        if not self.bearer:
            return False
        # Treat the token as expired slightly early to absorb clock drift.
        usable_until = self.exp_epoch - skew_seconds
        return time.time() < usable_until
class Auth:
    """Provides the bearer token, fetching it from TOKENS_URL and caching it.

    When the ``EXTRANET_BEARER`` environment variable is set, its value is
    always returned and the token API is never called.
    """

    def __init__(self, session: requests.Session):
        self.s = session
        self.cache = TokenCache()
        self.override_bearer = os.getenv("EXTRANET_BEARER")

    def get_bearer(self, force_refresh: bool = False) -> str:
        """Return the bearer string, refreshing it when forced or expired.

        Raises:
            RuntimeError: If the token API answers with a falsy ``success``.
        """
        if self.override_bearer:
            return self.override_bearer

        cache_usable = self.cache.valid() and not force_refresh
        if cache_usable:
            return self.cache.bearer  # type: ignore[return-value]

        response = self.s.get(TOKENS_URL, timeout=30)
        response.raise_for_status()
        payload = response.json()
        if not payload.get("success"):
            raise RuntimeError(f"Token API retornou success=false: {payload}")

        bearer = payload["data"][0]["token"]
        # The API value may or may not carry the "Bearer " prefix; the expiry
        # lives in the JWT itself.
        if bearer.lower().startswith("bearer "):
            jwt = bearer.split(" ", 1)[1]
        else:
            jwt = bearer
        expires_at = int(_jwt_payload(jwt).get("exp") or 0)
        self.cache = TokenCache(bearer=bearer, exp_epoch=expires_at)
        return bearer

    def invalidate(self) -> None:
        """Drop the cached token so the next call fetches a new one."""
        self.cache = TokenCache()
def _apply_installments_pacing() -> None:
    """Block until the shared minimum interval between requests has elapsed.

    The interval comes from ``INSTALLMENTS_MIN_INTERVAL_MS``; a missing,
    non-numeric or non-positive value disables pacing. The shared timestamp is
    claimed under the lock, while sleeping happens outside of it in slices of
    at most one second so a changed timestamp is noticed on the next check.
    """
    global _LAST_INSTALLMENTS_REQUEST_TS

    configured_ms = os.getenv("INSTALLMENTS_MIN_INTERVAL_MS", "0").strip() or "0"
    try:
        interval_ms = int(configured_ms)
    except Exception:
        interval_ms = 0
    if interval_ms <= 0:
        return
    interval_s = float(interval_ms) / 1000.0

    while True:
        with _RATE_LIMIT_LOCK:
            current = time.monotonic()
            remaining_s = (_LAST_INSTALLMENTS_REQUEST_TS + interval_s) - current
            if remaining_s <= 0:
                # Our turn: record this request as the latest one.
                _LAST_INSTALLMENTS_REQUEST_TS = current
                return
        time.sleep(min(remaining_s, 1.0))
def _headers(auth: Auth, cookie_header: Optional[str]) -> Dict[str, str]:
    """Build the request headers, adding ``Cookie`` only when one is given."""
    bearer = auth.get_bearer()
    headers = {
        "Authorization": bearer,
        "Accept": "*/*",
        "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8,en-US;q=0.7",
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
        "Content-Type": "application/json",
        "Origin": "https://extranet.grupoboticario.com.br",
        "Referer": "https://extranet.grupoboticario.com.br/",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "cross-site",
        "User-Agent": "Mozilla/5.0",
    }
    if not cookie_header:
        return headers
    headers["Cookie"] = cookie_header
    return headers
def get_installments_page(
    session: requests.Session,
    auth: Auth,
    start_date: Optional[str],
    end_date: Optional[str],
    installment_change: Optional[str],
    mediator_code: Optional[int],
    page: int,
    installment_group_code: Optional[str] = None,
    cookie_header: Optional[str] = None,
    page_size: Optional[int] = None,
) -> Dict[str, Any]:
    """Fetch one page of installments, retrying on 401 and transient errors.

    The query is either by ``installment_group_code`` or, when that is not
    given, by change-date period (``start_date``/``end_date``/``mediator_code``,
    optionally filtered by ``installment_change``).

    Up to 8 attempts are made. A 401 refreshes the token; 429/5xx responses
    back off exponentially (capped at 30s). For 429 the wait is at least
    ``INSTALLMENTS_429_MIN_WAIT_SEC`` (default 3) or the ``Retry-After``
    header, and it is also applied as a global backoff for all threads.

    Returns:
        The decoded JSON body of the successful response.

    Raises:
        ValueError: Period query without start/end/mediator_code.
        RuntimeError: On 403, 400, 412, or transient errors that persist after
            all attempts. The messages are matched by callers, so their
            prefixes must stay stable.
        requests.HTTPError: For any other error status.
    """
    params: Dict[str, Any] = {"page": page}
    if page_size and page_size > 0:
        params["limit"] = page_size
    if installment_group_code:
        params["installmentGroupCode"] = installment_group_code
    else:
        if not start_date or not end_date or mediator_code is None:
            raise ValueError("Para consulta por periodo, informe start/end/mediator_code.")
        params["endInstallmentChangeDate"] = end_date
        params["startInstallmentChangeDate"] = start_date
        params["mediatorCode"] = mediator_code
        if installment_change:
            params["installmentChange"] = installment_change

    r = None
    max_attempts = 8
    transient_status = {429, 500, 502, 503, 504}
    for attempt in range(max_attempts):
        _apply_installments_pacing()
        r = session.get(
            INSTALLMENTS_URL,
            headers=_headers(auth, cookie_header=cookie_header),
            params=params,
            timeout=60,
        )
        if r.status_code == 401:
            print(
                f"[warn] 401 no installments (tentativa {attempt + 1}/{max_attempts}), "
                "renovando token..."
            )
            auth.invalidate()
            auth.get_bearer(force_refresh=True)
            time.sleep(min(3, attempt + 1))
            continue
        if r.status_code == 403:
            # Not retryable: reported with diagnostics below.
            break
        if r.status_code in transient_status:
            wait_s = min(30, 2 ** min(5, attempt))
            if r.status_code == 429:
                min_429_wait_raw = os.getenv("INSTALLMENTS_429_MIN_WAIT_SEC", "3").strip()
                try:
                    min_429_wait = float(min_429_wait_raw or "3")
                except Exception:
                    min_429_wait = 3.0
                retry_after = r.headers.get("Retry-After") or r.headers.get("retry-after")
                if retry_after:
                    try:
                        min_429_wait = max(min_429_wait, float(retry_after))
                        print(f"[info] Retry-After da API: {retry_after}s")
                    except Exception:
                        # Retry-After may be an HTTP date; keep the configured floor.
                        pass
                wait_s = max(wait_s, min_429_wait)
                # Throttling affects every thread, so pause them all.
                _set_global_backoff(wait_s)
            print(
                f"[warn] erro temporario {r.status_code} no installments "
                f"(tentativa {attempt + 1}/{max_attempts}), aguardando {wait_s}s..."
            )
            time.sleep(wait_s)
            continue
        break

    # max_attempts is a positive constant, so at least one request was made.
    assert r is not None

    if r.status_code == 403:
        # Diagnostics only: nothing here may prevent the 403 error below from
        # being raised, because callers classify the store by its message.
        stores_claim = None
        try:
            bearer = auth.get_bearer()
            # The bearer may lack the "Bearer " prefix (e.g. an env override).
            jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer
            stores_claim = _jwt_payload(jwt).get("stores")
        except Exception:
            stores_claim = None
        raise RuntimeError(
            "403 Forbidden no installments. Possiveis causas: mediatorCode sem permissao no token "
            f"ou falta de Cookie CloudFront. stores no token={stores_claim}. "
            f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
        )
    if r.status_code in transient_status:
        if r.status_code == 429:
            try:
                recovery_s = float(os.getenv("THROTTLE_RECOVERY_PAUSE_SEC", "900"))
            except ValueError:
                # A bad setting must not hide the throttle error raised below.
                recovery_s = 900.0
            print(
                f"[throttle-recovery] 429 persistente apos {max_attempts} tentativas. "
                f"Aplicando cooldown global de {int(recovery_s)}s para proxima loja..."
            )
            _set_global_backoff(recovery_s)
        raise RuntimeError(
            f"Falha temporaria persistente ({r.status_code}) no installments apos {max_attempts} tentativas. "
            f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
        )
    if r.status_code == 400:
        raise RuntimeError(
            f"400 Bad Request em installments. "
            f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
        )
    if r.status_code == 412:
        raise RuntimeError(
            "412 Precondition Failed em installments. "
            "A API recusou a pre-condicao da requisicao (normalmente cabecalhos/sessao). "
            f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
        )
    r.raise_for_status()
    return r.json()
def get_store_codes(
    session: requests.Session,
    auth: Auth,
    cookie_header: Optional[str],
    allowed_channels: Optional[set[str]] = None,
) -> list[int]:
    """Return the sorted, unique store codes listed by STORES_URL.

    When ``allowed_channels`` is non-empty, only entries whose ``channel``
    (trimmed and case-folded) is in that set are kept. Entries without a
    ``code`` are ignored.
    """
    response = session.get(
        STORES_URL,
        headers=_headers(auth, cookie_header=cookie_header),
        timeout=30,
    )
    response.raise_for_status()
    entries = response.json().get("data") or []

    codes = set()
    for entry in entries:
        if allowed_channels:
            channel = str(entry.get("channel") or "").strip().casefold()
            if channel not in allowed_channels:
                continue
        raw_code = entry.get("code")
        if raw_code is not None:
            codes.add(int(raw_code))
    return sorted(codes)
def load_watermark(path: str) -> Dict[str, str]:
    """Read the watermark JSON file as a ``str -> str`` mapping.

    Any problem (missing file, unreadable or invalid JSON, a top-level value
    that is not an object) yields an empty mapping instead of an error.
    """
    loaded: Dict[str, str] = {}
    try:
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as handle:
                content = json.load(handle)
            if isinstance(content, dict):
                loaded = {str(key): str(value) for key, value in content.items()}
    except Exception:
        return {}
    return loaded
def save_watermark(path: str, data: Dict[str, str]) -> None:
    """Write the watermark mapping as JSON via a temp file plus rename.

    The content goes to ``<path>.tmp`` first and is then moved over ``path``
    with ``os.replace``, so ``path`` is never left half-written.
    """
    staging_path = f"{path}.tmp"
    with open(staging_path, "w", encoding="utf-8") as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2, sort_keys=True)
    os.replace(staging_path, path)
def _normalize_sql_conn(conn: str) -> str:
    """Normalise an ODBC connection string for this job.

    Any existing ``Encrypt=...`` setting is rewritten to ``Encrypt=no`` (or the
    setting is appended when absent), ``TrustServerCertificate=yes`` is
    appended unless the key is already present, and the result ends with a
    single ``;``. An empty or blank input is returned as an empty string.
    """
    base = (conn or "").strip().rstrip(";")
    if not base:
        return base

    appended: List[str] = []
    if re.search(r"(?i)\bencrypt\s*=", base):
        base = re.sub(r"(?i)\bencrypt\s*=\s*[^;]+", "Encrypt=no", base)
    else:
        appended.append("Encrypt=no")
    if re.search(r"(?i)\btrustservercertificate\s*=", base) is None:
        appended.append("TrustServerCertificate=yes")

    return ";".join([base, *appended]) + ";"
def _parse_iso_date(value: Optional[str]):
    """Parse the leading ``YYYY-MM-DD`` of ``value`` into a ``date``.

    Only the first 10 characters are considered, so ISO datetimes work too.
    Returns None for empty input or anything that does not parse.
    """
    if value:
        try:
            date_part = str(value)[:10]
            return date.fromisoformat(date_part)
        except Exception:
            pass
    return None
def _to_float(value: Any) -> Optional[float]:
    """Convert ``value`` to float, returning None when it is None or invalid."""
    converted: Optional[float] = None
    if value is not None:
        try:
            converted = float(value)
        except Exception:
            converted = None
    return converted
def _dedupe_installments(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop repeated installments, keeping the first occurrence in order.

    Items are identified by their trimmed ``installmentCode``; when that is
    empty, by the tuple (group code, number, borrower code, due date).
    """
    first_by_key: Dict[Any, Dict[str, Any]] = {}
    for entry in items:
        identity: Any = str(entry.get("installmentCode") or "").strip()
        if not identity:
            identity = (
                str(entry.get("installmentGroupCode") or ""),
                str(entry.get("installmentNumber") or ""),
                str(entry.get("borrowerCode") or ""),
                str(entry.get("dueDate") or ""),
            )
        # Dicts keep insertion order, so setdefault preserves the first hit.
        first_by_key.setdefault(identity, entry)
    return list(first_by_key.values())
def _ensure_doc_tables(cur) -> None:
    """Create dbo.DocPedidos and dbo.DocPedidosParcelas when they do not exist.

    Each statement is guarded by ``OBJECT_ID(...) IS NULL``, so calling this
    on every run is safe. ``cur`` only needs an ``execute(sql)`` method.
    """
    create_pedidos = """
IF OBJECT_ID('dbo.DocPedidos', 'U') IS NULL
BEGIN
CREATE TABLE dbo.DocPedidos (
Id INT IDENTITY(1,1) PRIMARY KEY,
InstallmentGroupCode VARCHAR(40) NOT NULL UNIQUE,
MediatorCode VARCHAR(20) NULL,
MediatorGroupCode VARCHAR(20) NULL,
IssueDate DATE NULL,
CreatedAt DATE NULL,
TotalItens INT NOT NULL DEFAULT 0,
ValorTotalOriginal DECIMAL(18,2) NULL,
ValorTotalFee DECIMAL(18,2) NULL,
PayloadJson NVARCHAR(MAX) NULL,
CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);
END
"""
    create_parcelas = """
IF OBJECT_ID('dbo.DocPedidosParcelas', 'U') IS NULL
BEGIN
CREATE TABLE dbo.DocPedidosParcelas (
Id INT IDENTITY(1,1) PRIMARY KEY,
DocPedidoId INT NOT NULL,
InstallmentCode VARCHAR(40) NOT NULL UNIQUE,
InstallmentGroupCode VARCHAR(40) NOT NULL,
InstallmentNumber INT NULL,
BorrowerCode VARCHAR(40) NULL,
DueDate DATE NULL,
IssueDate DATE NULL,
CreatedAt DATE NULL,
PaymentType VARCHAR(40) NULL,
OriginalAmount DECIMAL(18,2) NULL,
TotalFee DECIMAL(18,2) NULL,
PayloadJson NVARCHAR(MAX) NULL,
CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
CONSTRAINT FK_DocPedidosParcelas_DocPedidos FOREIGN KEY (DocPedidoId) REFERENCES dbo.DocPedidos(Id)
);
END
"""
    # Parent table first: the parcelas table declares a foreign key to it.
    for ddl in (create_pedidos, create_parcelas):
        cur.execute(ddl)
def upsert_doc_pedidos_sqlserver(
    rows: List[Dict[str, Any]],
    connection_string: str,
    mediator_code_log: Optional[int] = None,
) -> Dict[str, int]:
    """Upsert orders (DocPedidos) and their installments (DocPedidosParcelas).

    Every row describes one installment group: ``installmentGroupCode``,
    ``mediatorCode``, ``installments`` (list of installment dicts) and
    ``rawResponse`` (stored as JSON). For each group the order is inserted or
    updated, each installment with an ``installmentCode`` is inserted or
    updated, and installments stored for that order but absent from the
    current list are deleted. Everything runs in a single transaction.

    Args:
        rows: Groups to persist; rows without a group code are skipped.
        connection_string: ODBC connection string (normalised before use).
        mediator_code_log: When given, progress lines are printed for it.

    Returns:
        ``{"pedidos": <orders upserted>, "parcelas": <installments upserted>}``.

    Raises:
        RuntimeError: If ``pyodbc`` cannot be imported.
        Exception: Any database error, after the transaction is rolled back.
    """
    try:
        import pyodbc  # type: ignore
    except Exception as e:
        raise RuntimeError("pyodbc nao encontrado. Instale com: pip install pyodbc") from e
    if not rows:
        return {"pedidos": 0, "parcelas": 0}

    cn = pyodbc.connect(_normalize_sql_conn(connection_string), timeout=30)
    cur = None
    pedidos_count = 0
    parcelas_count = 0
    try:
        # Setup happens inside the try so the connection is closed even when
        # creating the cursor fails.
        cn.autocommit = False
        cur = cn.cursor()
        try:
            cn.timeout = int(os.getenv("SQL_STATEMENT_TIMEOUT_SEC", "120"))
        except Exception:
            # Best effort: keep the driver default when the timeout cannot be set.
            pass

        _ensure_doc_tables(cur)
        total_rows = len(rows)
        if mediator_code_log is not None:
            print(f"[sql-inicio] loja={mediator_code_log} pedidos_para_upsert={total_rows}")

        for idx, row in enumerate(rows, start=1):
            group_code = str(row.get("installmentGroupCode") or "").strip()
            if not group_code:
                continue
            if mediator_code_log is not None:
                print(f"[sql-progresso] loja={mediator_code_log} pedido={idx}/{total_rows} group={group_code}")

            installments = row.get("installments") or []
            # Order-level fields the row lacks are taken from the first installment.
            first = installments[0] if installments else {}
            mediator_code = str(row.get("mediatorCode") or first.get("mediatorCode") or "").strip() or None
            mediator_group_code = str(first.get("mediatorGroupCode") or "").strip() or None
            valor_total_original = sum(_to_float(x.get("originalAmount")) or 0.0 for x in installments)
            valor_total_fee = sum(_to_float(x.get("totalFee")) or 0.0 for x in installments)
            # Values shared by the UPDATE and INSERT of the order, in column order.
            pedido_values = (
                mediator_code,
                mediator_group_code,
                _parse_iso_date(first.get("issueDate")),
                _parse_iso_date(first.get("createdAt")),
                int(len(installments)),
                float(valor_total_original),
                float(valor_total_fee),
                json.dumps(row.get("rawResponse"), ensure_ascii=False),
            )

            cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code)
            found = cur.fetchone()
            if found:
                doc_id = int(found[0])
                cur.execute(
                    """
UPDATE dbo.DocPedidos
SET MediatorCode=?, MediatorGroupCode=?, IssueDate=?, CreatedAt=?, TotalItens=?,
ValorTotalOriginal=?, ValorTotalFee=?, PayloadJson=?, AtualizadoEm=SYSUTCDATETIME()
WHERE Id=?
""",
                    *pedido_values,
                    doc_id,
                )
            else:
                cur.execute(
                    """
INSERT INTO dbo.DocPedidos (
InstallmentGroupCode, MediatorCode, MediatorGroupCode, IssueDate, CreatedAt,
TotalItens, ValorTotalOriginal, ValorTotalFee, PayloadJson
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
                    group_code,
                    *pedido_values,
                )
                # Read back the identity value of the row just inserted.
                cur.execute("SELECT Id FROM dbo.DocPedidos WHERE InstallmentGroupCode = ?", group_code)
                got = cur.fetchone()
                if not got:
                    continue
                doc_id = int(got[0])
            pedidos_count += 1

            current_codes: List[str] = []
            for item in installments:
                installment_code = str(item.get("installmentCode") or "").strip()
                if not installment_code:
                    continue
                current_codes.append(installment_code)

                installment_number = item.get("installmentNumber")
                # Values shared by the UPDATE and INSERT of the installment,
                # from InstallmentNumber through PayloadJson.
                parcela_values = (
                    int(installment_number) if installment_number is not None else None,
                    str(item.get("borrowerCode") or "")[:40] or None,
                    _parse_iso_date(item.get("dueDate")),
                    _parse_iso_date(item.get("issueDate")),
                    _parse_iso_date(item.get("createdAt")),
                    str(item.get("paymentType") or "")[:40] or None,
                    _to_float(item.get("originalAmount")),
                    _to_float(item.get("totalFee")),
                    json.dumps(item, ensure_ascii=False),
                )

                cur.execute("SELECT Id FROM dbo.DocPedidosParcelas WHERE InstallmentCode = ?", installment_code)
                par_found = cur.fetchone()
                if par_found:
                    cur.execute(
                        """
UPDATE dbo.DocPedidosParcelas
SET DocPedidoId=?, InstallmentGroupCode=?, InstallmentNumber=?, BorrowerCode=?, DueDate=?,
IssueDate=?, CreatedAt=?, PaymentType=?, OriginalAmount=?, TotalFee=?, PayloadJson=?,
AtualizadoEm=SYSUTCDATETIME()
WHERE InstallmentCode=?
""",
                        doc_id,
                        group_code,
                        *parcela_values,
                        installment_code,
                    )
                else:
                    cur.execute(
                        """
INSERT INTO dbo.DocPedidosParcelas (
DocPedidoId, InstallmentCode, InstallmentGroupCode, InstallmentNumber, BorrowerCode,
DueDate, IssueDate, CreatedAt, PaymentType, OriginalAmount, TotalFee, PayloadJson
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
                        doc_id,
                        installment_code,
                        group_code,
                        *parcela_values,
                    )
                parcelas_count += 1

            if current_codes:
                # Remove installments stored for this order that are no longer
                # present. Only placeholders are interpolated; values are bound.
                placeholders = ",".join("?" for _ in current_codes)
                cur.execute(
                    f"""
DELETE FROM dbo.DocPedidosParcelas
WHERE DocPedidoId = ? AND InstallmentCode NOT IN ({placeholders})
""",
                    doc_id,
                    *current_codes,
                )

        cn.commit()
        if mediator_code_log is not None:
            print(
                f"[sql-fim] loja={mediator_code_log} pedidos_upsert={pedidos_count} "
                f"parcelas_upsert={parcelas_count}"
            )
        return {"pedidos": pedidos_count, "parcelas": parcelas_count}
    except Exception:
        cn.rollback()
        raise
    finally:
        if cur is not None:
            cur.close()
        cn.close()
def main() -> None:
    """Fetch installments for every store and persist them, per configuration.

    Configuration comes from environment variables, including:
    ``START_INSTALLMENT_CHANGE_DATE``, ``END_INSTALLMENT_CHANGE_DATE``,
    ``LAST_N_DAYS``, ``CHUNK_DAYS``, ``INSTALLMENT_CHANGE``, ``PAGE_START``,
    ``MAX_PAGES_PER_QUERY``, ``INSTALLMENTS_PAGE_SIZE``, ``FLUSH_EVERY_PAGES``,
    ``STORE_WORKERS``, ``EXTRANET_COOKIE``, ``STORE_CHANNEL``,
    ``TARGET_MEDIATOR_CODE``, ``RESUME_FROM_MEDIATOR_CODE``,
    ``LOG_GROUP_CODES``, ``OUTPUT_DIR``, ``SAVE_JSON``, ``WRITE_SQL``,
    ``WATERMARK_FILE``, ``INCREMENTAL_MODE``, ``SQLSERVER_CONN`` and
    ``RETRY_FAILED_WAIT_SEC``.

    Stores are processed through a thread pool; stores that failed because of
    throttling/transient errors are retried once, sequentially, after a pause.
    A summary line is printed at the end.
    """
    today = date.today()
    start_date_env = os.getenv("START_INSTALLMENT_CHANGE_DATE")
    start_date_fixed = (start_date_env or "").strip() or None
    last_days_env = os.getenv("LAST_N_DAYS", "5").strip()
    last_n_days = int(last_days_env) if last_days_env else None
    rolling_start = None
    # A fixed start date takes precedence over the rolling window.
    if (not start_date_fixed) and (last_n_days is not None):
        rolling_start = (today - timedelta(days=last_n_days)).isoformat()
    chunk_days_env = os.getenv("CHUNK_DAYS", "0").strip()
    chunk_days = int(chunk_days_env) if chunk_days_env else 0
    if chunk_days < 0:
        chunk_days = 0
    default_start = date(today.year, 1, 1).isoformat()
    default_end = today.isoformat()
    end_date = os.getenv("END_INSTALLMENT_CHANGE_DATE", default_end)
    installment_change = os.getenv("INSTALLMENT_CHANGE", "CRIACAO").strip() or "CRIACAO"
    first_page = int(os.getenv("PAGE_START", "1"))
    max_pages_per_query = int(os.getenv("MAX_PAGES_PER_QUERY", "10000"))
    page_size_env = os.getenv("INSTALLMENTS_PAGE_SIZE", "").strip()
    page_size = int(page_size_env) if page_size_env else None
    flush_every_pages = int(os.getenv("FLUSH_EVERY_PAGES", "50"))
    # Default is one store at a time so that log lines are not interleaved.
    store_workers = int(os.getenv("STORE_WORKERS", "1"))
    cookie_header = os.getenv("EXTRANET_COOKIE")
    store_channel_env = os.getenv("STORE_CHANNEL", "VD").strip()
    allowed_channels = {
        c.strip().casefold()
        for c in re.split(r"[,\s;]+", store_channel_env)
        if c.strip()
    }
    # "all" or "*" disables the channel filter.
    if "all" in allowed_channels or "*" in allowed_channels:
        allowed_channels = set()
    target_mediator_env = os.getenv("TARGET_MEDIATOR_CODE", "").strip()
    target_mediator = int(target_mediator_env) if target_mediator_env else None
    resume_from_env = os.getenv("RESUME_FROM_MEDIATOR_CODE", "").strip()
    resume_from_mediator = int(resume_from_env) if resume_from_env else None
    log_group_codes = os.getenv("LOG_GROUP_CODES", "1") == "1"
    output_dir = os.getenv("OUTPUT_DIR", "installments_por_loja")
    save_json = os.getenv("SAVE_JSON", "0") == "1"
    write_sql = os.getenv("WRITE_SQL", "1") == "1"
    watermark_file = os.getenv("WATERMARK_FILE", "installments_watermark.json")
    incremental_mode = os.getenv("INCREMENTAL_MODE", "1") == "1"
    # SECURITY NOTE(review): this fallback embeds database credentials in the
    # source. It is kept unchanged to preserve behavior; prefer requiring
    # SQLSERVER_CONN and rotating the exposed password.
    sql_conn = os.getenv(
        "SQLSERVER_CONN",
        (
            "DRIVER={ODBC Driver 17 for SQL Server};"
            "SERVER=10.77.77.10;"
            "DATABASE=GINSENG;"
            "UID=andrey;"
            "PWD=88253332;"
            "TrustServerCertificate=yes;"
        ),
    )

    session = requests.Session()
    session.trust_env = False
    auth = Auth(session)
    stores = get_store_codes(
        session,
        auth,
        cookie_header,
        allowed_channels=allowed_channels or None,
    )
    if allowed_channels:
        channels_txt = ",".join(sorted(c.upper() for c in allowed_channels))
        print(f"[info] filtro de canal ativo: STORE_CHANNEL={channels_txt} (lojas={len(stores)})")
    if target_mediator is not None:
        stores = [target_mediator]
        print(f"[info] consulta focada na loja {target_mediator}")
    elif resume_from_mediator is not None:
        stores = [s for s in stores if s >= resume_from_mediator]
        print(f"[info] retomando a partir da loja {resume_from_mediator} (restantes={len(stores)})")
    stores = sorted(stores)
    if save_json:
        os.makedirs(output_dir, exist_ok=True)
    watermark = load_watermark(watermark_file) if incremental_mode else {}
    if incremental_mode:
        print(f"[info] incremental ativo com watermark em {watermark_file}")
    else:
        print("[info] incremental desativado")
    if rolling_start is not None and last_n_days is not None:
        print(f"[info] janela movel ativa: ultimos {last_n_days} dias ({rolling_start}..{end_date})")
    if chunk_days > 0:
        print(f"[info] fatiamento de periodo ativo: CHUNK_DAYS={chunk_days}")

    authorized: list[int] = []
    unauthorized: list[int] = []
    skipped_bad_request: list[int] = []
    failed: Dict[int, str] = {}

    def process_store(mediator_code: int) -> Dict[str, Any]:
        """Fetch and persist one store; never raises, returns a status dict."""
        # Start date precedence: rolling window, fixed date, watermark, Jan 1st.
        if rolling_start:
            store_start_date = rolling_start
        else:
            store_start_date = start_date_fixed
            if not store_start_date:
                if incremental_mode:
                    store_start_date = watermark.get(str(mediator_code), default_start)
                else:
                    store_start_date = default_start
        # Each store gets its own session and token cache.
        local_session = requests.Session()
        local_session.trust_env = False
        local_auth = Auth(local_session)
        try:
            print(
                f"[consulta] loja={mediator_code} periodo={store_start_date}..{end_date} pagina_inicial={first_page}"
            )
            store_start_obj = _parse_iso_date(store_start_date)
            store_end_obj = _parse_iso_date(end_date)
            if not store_start_obj or not store_end_obj:
                raise RuntimeError(
                    f"Periodo invalido para consulta. start={store_start_date} end={end_date}"
                )
            if store_start_obj > store_end_obj:
                raise RuntimeError(
                    f"Periodo invalido para consulta. start={store_start_date} end={end_date}"
                )

            # Split the period into inclusive windows of CHUNK_DAYS days.
            windows: List[tuple[str, str]] = []
            if chunk_days > 0:
                cursor = store_start_obj
                step_days = max(1, chunk_days)
                while cursor <= store_end_obj:
                    win_end = min(store_end_obj, cursor + timedelta(days=step_days - 1))
                    windows.append((cursor.isoformat(), win_end.isoformat()))
                    cursor = win_end + timedelta(days=1)
            else:
                windows = [(store_start_obj.isoformat(), store_end_obj.isoformat())]

            all_group_codes_seen: set = set()
            total_api = 0
            pages_fetched_total = 0
            total_sql_pedidos = 0
            total_sql_parcelas = 0

            def _flush(buffer: List[Dict[str, Any]], label: str) -> None:
                """Group buffered items by installmentGroupCode and persist them."""
                nonlocal total_sql_pedidos, total_sql_parcelas
                if not buffer:
                    return
                groups: Dict[str, List[Dict[str, Any]]] = {}
                for item in _dedupe_installments(buffer):
                    gc = str(item.get("installmentGroupCode") or "").strip()
                    # NOTE(review): a group already flushed is skipped entirely,
                    # so installments of a group that straddles a flush boundary
                    # (or reappears in a later window) are not persisted.
                    # Confirm the API ordering makes this safe.
                    if not gc or gc in all_group_codes_seen:
                        continue
                    groups.setdefault(gc, []).append(item)
                if not groups:
                    return
                all_group_codes_seen.update(groups.keys())
                if log_group_codes:
                    for gc in groups:
                        print(f"[grupo] loja={mediator_code} {label} installmentGroupCode={gc}")
                print(f"[flush] loja={mediator_code} {label} grupos_novos={len(groups)}")
                if write_sql:
                    sql_rows: List[Dict[str, Any]] = []
                    for gc, items in groups.items():
                        sql_rows.append({
                            "mediatorCode": mediator_code,
                            "installmentGroupCode": gc,
                            "rawResponse": {"data": {"installments": items}},
                            "installments": items,
                        })
                    if sql_rows:
                        stats = upsert_doc_pedidos_sqlserver(sql_rows, sql_conn, mediator_code_log=mediator_code)
                        total_sql_pedidos += stats.get("pedidos", 0)
                        total_sql_parcelas += stats.get("parcelas", 0)
                        print(
                            f"[sql-flush] loja={mediator_code} {label} "
                            f"pedidos_upsert={stats.get('pedidos')} parcelas_upsert={stats.get('parcelas')}"
                        )

            for window_idx, (window_start, window_end) in enumerate(windows, start=1):
                print(
                    f"[consulta-janela] loja={mediator_code} janela={window_idx}/{len(windows)} "
                    f"periodo={window_start}..{window_end}"
                )
                page = first_page
                window_total = 0
                window_total_pages = 1
                window_pages_fetched = 0
                flush_buffer: List[Dict[str, Any]] = []
                while True:
                    try:
                        body = get_installments_page(
                            session=local_session,
                            auth=local_auth,
                            start_date=window_start,
                            end_date=window_end,
                            installment_change=installment_change,
                            mediator_code=mediator_code,
                            page=page,
                            cookie_header=cookie_header,
                            page_size=page_size,
                        )
                    except Exception as e:
                        msg = str(e)
                        # On a 400 with the change filter, retry once without it.
                        if "400 Bad Request" in msg and installment_change:
                            print(
                                f"[fallback] loja={mediator_code} 400 com installmentChange={installment_change}; "
                                "tentando sem installmentChange"
                            )
                            body = get_installments_page(
                                session=local_session,
                                auth=local_auth,
                                start_date=window_start,
                                end_date=window_end,
                                installment_change=None,
                                mediator_code=mediator_code,
                                page=page,
                                cookie_header=cookie_header,
                                page_size=page_size,
                            )
                        else:
                            raise
                    installments_page = (((body.get("data") or {}).get("installments")) or [])
                    flush_buffer.extend(installments_page)
                    pagination = ((body.get("data") or {}).get("pagination") or {})
                    window_total = int(pagination.get("total") or 0)
                    limit = int(pagination.get("limit") or len(installments_page) or 1)
                    # Without a reported total, treat the current page as the last.
                    window_total_pages = max(1, (window_total + limit - 1) // limit) if window_total else page
                    window_pages_fetched += 1
                    print(
                        f"[pagina-loja] loja={mediator_code} janela={window_idx}/{len(windows)} "
                        f"pagina={page}/{window_total_pages} itens_pagina={len(installments_page)}"
                    )
                    if flush_every_pages > 0 and window_pages_fetched % flush_every_pages == 0:
                        _flush(flush_buffer, f"janela={window_idx}/{len(windows)} pagina={page}")
                        flush_buffer = []
                    if not installments_page:
                        break
                    if page >= window_total_pages:
                        break
                    if page - first_page + 1 >= max_pages_per_query:
                        print(
                            f"[stop] loja={mediator_code} janela={window_idx}/{len(windows)} "
                            f"limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido"
                        )
                        break
                    page += 1
                # Flush whatever is left of the window below the flush threshold.
                _flush(flush_buffer, f"janela={window_idx}/{len(windows)}")
                total_api += window_total
                pages_fetched_total += window_pages_fetched
                print(
                    f"[resultado-janela] loja={mediator_code} janela={window_idx}/{len(windows)} "
                    f"grupos_acumulados={len(all_group_codes_seen)}"
                )

            print(
                f"[resultado-loja] loja={mediator_code} pedidos_encontrados={total_api} "
                f"grupos_unicos={len(all_group_codes_seen)}"
            )
            store_out = {
                "queryWindow": {
                    "startInstallmentChangeDate": store_start_date,
                    "endInstallmentChangeDate": end_date,
                    "installmentChange": installment_change,
                    "pageStart": first_page,
                    "pagesFetched": pages_fetched_total,
                    "windowsFetched": len(windows),
                    "chunkDays": chunk_days if chunk_days > 0 else None,
                },
                "mediatorCode": mediator_code,
                "pedidosEncontradosPagina": len(all_group_codes_seen),
                "grupos_unicos": len(all_group_codes_seen),
            }
            if write_sql:
                store_out["sqlUpsert"] = {"pedidos": total_sql_pedidos, "parcelas": total_sql_parcelas}
                print(
                    f"[sql] loja={mediator_code} pedidos_upsert={total_sql_pedidos} "
                    f"parcelas_upsert={total_sql_parcelas}"
                )
            if save_json:
                store_file = os.path.join(output_dir, f"installments_loja_{mediator_code}.json")
                with open(store_file, "w", encoding="utf-8") as f:
                    json.dump(store_out, f, ensure_ascii=False, indent=2)
                print(f"[ok] loja={mediator_code} json_salvo={store_file}")
            return {"status": "authorized", "mediatorCode": mediator_code, "endDate": end_date}
        except Exception as e:
            # Outcomes are classified by the error message text.
            msg = str(e)
            if "403 Forbidden no installments" in msg:
                return {"status": "unauthorized", "mediatorCode": mediator_code}
            if "400 Bad Request em installments" in msg and (
                "Combina" in msg or "invalid" in msg.lower()
            ):
                print(f"[skip-400] loja={mediator_code} filtro nao aplicavel para essa loja")
                return {"status": "skipped_bad_request", "mediatorCode": mediator_code}
            else:
                print(f"[falha] loja={mediator_code} erro={msg}")
                return {"status": "failed", "mediatorCode": mediator_code, "error": msg}

    with ThreadPoolExecutor(max_workers=max(1, store_workers)) as store_pool:
        store_futures = {store_pool.submit(process_store, mediator): mediator for mediator in stores}
        for fut in as_completed(store_futures):
            try:
                result = fut.result()
                mediator_code = int(result["mediatorCode"])
                status = result["status"]
                if status == "authorized":
                    authorized.append(mediator_code)
                    if incremental_mode:
                        # Persist after every store so an interrupted run keeps progress.
                        watermark[str(mediator_code)] = str(result["endDate"])
                        save_watermark(watermark_file, watermark)
                    print(f"[loja-fim] loja={mediator_code} status=ok")
                elif status == "unauthorized":
                    unauthorized.append(mediator_code)
                    print(f"[loja-fim] loja={mediator_code} status=nao_autorizada")
                elif status == "skipped_bad_request":
                    skipped_bad_request.append(mediator_code)
                    print(f"[loja-fim] loja={mediator_code} status=skip_400")
                else:
                    failed[mediator_code] = str(result.get("error") or "erro desconhecido")
                    print(f"[loja-fim] loja={mediator_code} status=falha")
            except Exception as e:
                mediator_code = int(store_futures[fut])
                failed[mediator_code] = f"erro no worker: {e}"
                print(f"[falha] loja={mediator_code} erro no worker: {e}")

    retry_wait_sec = int(os.getenv("RETRY_FAILED_WAIT_SEC", "90"))
    if failed and retry_wait_sec > 0:
        # Only throttle/transient failures are retried.
        retry_candidates = sorted(
            k for k, v in failed.items() if "429" in str(v) or "temporaria" in str(v).lower()
        )
        if retry_candidates:
            print(
                f"[retry] {len(retry_candidates)} loja(s) falharam por throttle. "
                f"Aguardando {retry_wait_sec}s antes de retentar sequencialmente..."
            )
            time.sleep(retry_wait_sec)
            for mc in retry_candidates:
                result = process_store(mc)
                status = result["status"]
                if status == "authorized":
                    authorized.append(mc)
                    failed.pop(mc, None)
                    if incremental_mode:
                        watermark[str(mc)] = str(result["endDate"])
                        save_watermark(watermark_file, watermark)
                    print(f"[retry-ok] loja={mc}")
                else:
                    failed[mc] = str(result.get("error") or "erro desconhecido")
                    print(f"[retry-falha] loja={mc} erro={failed[mc]}")

    # The summary used to reference an undefined ``group_workers`` name, which
    # raised NameError at the end of every run; that field has been dropped.
    print(
        "[resumo] "
        f"lojas_total={len(stores)} autorizadas={len(authorized)} "
        f"nao_autorizadas={len(unauthorized)} skip_400={len(skipped_bad_request)} falhas={len(failed)} "
        f"store_workers={store_workers} "
        f"pasta_saida={output_dir}"
    )
    if failed:
        first_code = sorted(failed.keys())[0]
        print(f"[falha-exemplo] loja={first_code} erro={failed[first_code]}")
# Run the job only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()