This commit is contained in:
Cunha 2026-05-22 14:24:01 -03:00
parent b57fae393e
commit bbb9a16986
7 changed files with 770 additions and 182 deletions

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,48 @@
import os
from datetime import date
import installments_reader
def _setdefault_env(key: str, value: str) -> None:
    """Assign ``value`` to environment variable ``key`` unless it is already non-empty.

    Unlike ``os.environ.setdefault``, a variable that exists but holds an
    empty string is treated as unset and gets overwritten.
    """
    current = os.environ.get(key)
    if current:
        return
    os.environ[key] = value
def main() -> None:
    """Run a year-to-date installments backfill via ``installments_reader.main``.

    Seeds a set of default environment variables (each one only when it is
    unset or empty, see ``_setdefault_env``), prints a one-line summary of the
    effective configuration, and then hands control to the reader.
    """
    today = date.today()
    start_of_year = date(today.year, 1, 1)
    defaults = {
        "STORE_CHANNEL": "VD",
        "START_INSTALLMENT_CHANGE_DATE": start_of_year.isoformat(),
        "END_INSTALLMENT_CHANGE_DATE": today.isoformat(),
        "CHUNK_DAYS": "30",
        # Empty on purpose: an explicit empty value is what gets exported here.
        "LAST_N_DAYS": "",
        "STORE_WORKERS": "1",
        "GROUP_WORKERS": "1",
        "WRITE_SQL": "1",
        "SAVE_JSON": "0",
        "INCREMENTAL_MODE": "0",
        "LOG_GROUP_CODES": "0",
        "INSTALLMENTS_PAGE_SIZE": "50",
        "FLUSH_EVERY_PAGES": "50",
        "INSTALLMENTS_MIN_INTERVAL_MS": "3500",
        "INSTALLMENTS_429_MIN_WAIT_SEC": "45",
        "THROTTLE_RECOVERY_PAUSE_SEC": "900",
    }
    for key, value in defaults.items():
        _setdefault_env(key, value)

    env = os.environ
    # Report the channel actually in effect: STORE_CHANNEL may have been set by
    # the caller, in which case a hard-coded "VD" in the banner would be wrong.
    print(
        f"[backfill] modo={env['STORE_CHANNEL']} "
        f"periodo={env['START_INSTALLMENT_CHANGE_DATE']}..{env['END_INSTALLMENT_CHANGE_DATE']} "
        f"chunk_days={env['CHUNK_DAYS']} write_sql={env['WRITE_SQL']} "
        f"min_interval_ms={env['INSTALLMENTS_MIN_INTERVAL_MS']} "
        f"wait429_min_s={env['INSTALLMENTS_429_MIN_WAIT_SEC']}"
    )
    installments_reader.main()


if __name__ == "__main__":
    main()

185
installments_by_order.py Normal file
View File

@ -0,0 +1,185 @@
"""
Consulta a API de parcelas por installmentGroupCode (orderNumber) e grava
em DocPedidos + DocPedidosParcelas.
Fonte dos códigos: SELECT orderNumber FROM Grgb_fiscal_invoices_items
"""
import os
import time
import requests
import installments_reader
from installments_reader import Auth, get_installments_page, upsert_doc_pedidos_sqlserver
# ─── CONFIGURE HERE ───────────────────────────────────────────────────────────
WRITE_SQL = True  # True = write to the database | False = only read and display
SKIP_EXISTING = True  # True = skip codes already stored in DocPedidos
BATCH_SIZE = 10  # write to the database every N processed orders
MIN_INTERVAL_MS = 3500  # minimum ms between API calls
# ──────────────────────────────────────────────────────────────────────────────

# SECURITY: the fallback below embeds database credentials in source control.
# Prefer supplying the full ODBC connection string through the
# INSTALLMENTS_SQL_CONN environment variable; the literal is kept only so
# existing deployments keep working and should be removed (and the password
# rotated) once the variable is configured everywhere.
SQL_CONN = os.getenv("INSTALLMENTS_SQL_CONN") or (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=10.77.77.10;"
    "DATABASE=GINSENG;"
    "UID=andrey;"
    "PWD=88253332;"
    "TrustServerCertificate=yes;"
)
def _get_order_numbers() -> list[str]:
    """Return the distinct, non-blank ``orderNumber`` values from the invoice items table.

    Each value is converted to ``str`` and stripped. The connection is opened
    with ``SQL_CONN`` and is always closed before returning, including when the
    query raises.

    Raises:
        pyodbc.Error: if connecting or running the query fails.
    """
    import pyodbc

    cn = pyodbc.connect(SQL_CONN, timeout=30)
    try:
        cur = cn.cursor()
        cur.execute(
            "SELECT DISTINCT [orderNumber] "
            "FROM [GINSENG].[dbo].[Grgb_fiscal_invoices_items] "
            "WHERE [orderNumber] IS NOT NULL AND LTRIM(RTRIM(CAST([orderNumber] AS VARCHAR))) <> ''"
        )
        return [str(row[0]).strip() for row in cur.fetchall()]
    finally:
        # Explicit close: the connection was previously left open until
        # garbage collection.
        cn.close()
def _get_existing_group_codes() -> set[str]:
    """Return the ``InstallmentGroupCode`` values already stored in ``dbo.DocPedidos``.

    This lookup is best-effort: on any database error a warning is printed and
    an empty set is returned, which means no code will be skipped by the
    caller. NULL codes are ignored so they do not show up as the string
    ``"None"``.
    """
    import pyodbc

    try:
        cn = pyodbc.connect(SQL_CONN, timeout=30)
        try:
            cur = cn.cursor()
            cur.execute("SELECT InstallmentGroupCode FROM dbo.DocPedidos")
            return {
                str(row[0]).strip()
                for row in cur.fetchall()
                if row[0] is not None
            }
        finally:
            # Always release the connection, even if the query fails.
            cn.close()
    except Exception as exc:
        # Keep the best-effort contract, but make the failure visible: silently
        # returning an empty set would make every order be re-fetched from the
        # API without any hint as to why.
        print(f"[warn] nao foi possivel ler DocPedidos ({exc}); nenhum codigo sera pulado")
        return set()
def _fetch_all_pages(session: requests.Session, auth: Auth, group_code: str) -> list:
    """Collect the installments of every page returned for ``group_code``.

    Pages are requested starting at 1 through ``get_installments_page`` with
    only ``installment_group_code`` set as a filter. Paging stops when a page
    comes back empty or when the page number reaches the page count derived
    from the response's ``pagination`` block (``total`` / ``limit``).

    Returns:
        All items found under ``data.installments`` across the fetched pages.
    """
    collected: list = []
    current_page = 1
    finished = False
    while not finished:
        response = get_installments_page(
            session=session,
            auth=auth,
            start_date=None,
            end_date=None,
            installment_change=None,
            mediator_code=None,
            page=current_page,
            installment_group_code=group_code,
            cookie_header=None,
        )
        data = response.get("data") or {}
        page_items = data.get("installments") or []
        collected += page_items

        meta = data.get("pagination") or {}
        total = int(meta.get("total") or 0)
        # Page size falls back to the size of this page, then to 10.
        limit = int(meta.get("limit") or len(page_items) or 10)
        if total:
            total_pages = max(1, (total + limit - 1) // limit)
        else:
            # Without a reported total, treat the current page as the last one.
            total_pages = current_page
        print(f" pagina={current_page}/{total_pages} itens={len(page_items)}")

        if page_items and current_page < total_pages:
            current_page += 1
        else:
            finished = True
    return collected
def _flush(batch: list, label: str = "") -> dict:
    """Persist ``batch`` through ``upsert_doc_pedidos_sqlserver`` and log the counts.

    Returns the stats dict produced by the upsert, or an empty dict when the
    batch is empty or ``WRITE_SQL`` is disabled (nothing is written then).
    """
    if batch and WRITE_SQL:
        result = upsert_doc_pedidos_sqlserver(batch, SQL_CONN)
        pedidos = result.get('pedidos', 0)
        parcelas = result.get('parcelas', 0)
        print(f"[sql] {label} pedidos={pedidos} parcelas={parcelas}")
        return result
    return {}
def main() -> None:
    """Fetch installments for every pending order number and store them in batches.

    Steps:
      1. Seed the pacing/throttle environment variables when they are not set.
      2. Load the order numbers from the database and, when ``SKIP_EXISTING``
         is on, drop the ones already present in ``DocPedidos``.
      3. Query the API for each remaining code and flush to SQL every
         ``BATCH_SIZE`` orders that returned data, plus once at the end.

    An error while fetching or flushing one code is printed and counted, and
    the loop moves on to the next code. If a flush raises, the batch is not
    cleared, so its rows are submitted again on the next flush. An error in
    the final flush propagates to the caller.
    """
    os.environ.setdefault("INSTALLMENTS_MIN_INTERVAL_MS", str(MIN_INTERVAL_MS))
    os.environ.setdefault("INSTALLMENTS_429_MIN_WAIT_SEC", "45")
    os.environ.setdefault("THROTTLE_RECOVERY_PAUSE_SEC", "900")

    print("[info] buscando orderNumbers no banco...")
    order_numbers = _get_order_numbers()
    print(f"[info] total encontrado: {len(order_numbers)}")

    existing: set[str] = set()
    if SKIP_EXISTING:
        existing = _get_existing_group_codes()
        print(f"[info] já em DocPedidos: {len(existing)} — serão pulados")
    pending = [c for c in order_numbers if c not in existing]
    print(f"[info] para consultar na API: {len(pending)}\n")

    def _fmt(s: float) -> str:
        """Format a duration in seconds as ``Ns``, ``Mm:SSs`` or ``Hh:MMm``."""
        s = int(s)
        if s < 60:
            return f"{s}s"
        if s < 3600:
            return f"{s // 60}m:{s % 60:02d}s"
        return f"{s // 3600}h:{(s % 3600) // 60:02d}m"

    batch: list = []
    batch_first_idx: int = 0
    batch_first_code: str = ""
    total_pedidos = 0
    total_parcelas = 0
    erros = 0
    start_ts = time.monotonic()

    # The context manager closes the HTTP session (and its pooled connections)
    # when the run ends, whether normally or through an exception.
    with requests.Session() as session:
        session.trust_env = False
        auth = Auth(session)

        for idx, group_code in enumerate(pending, 1):
            elapsed = time.monotonic() - start_ts
            # ETA is based on the items already completed (idx - 1) and covers
            # everything still to do, including the current item. Before the
            # first item finishes there is nothing to extrapolate from.
            done = idx - 1
            if done:
                restante_txt = _fmt(elapsed / done * (len(pending) - done))
            else:
                restante_txt = "?"
            print(f"[{idx}/{len(pending)}] group_code={group_code} "
                  f"decorrido={_fmt(elapsed)} restante~{restante_txt}")
            try:
                items = _fetch_all_pages(session, auth, group_code)
                if not items:
                    print(" sem dados na API")
                    continue
                if not batch:
                    # Remember where this batch started, for the flush label.
                    batch_first_idx = idx
                    batch_first_code = group_code
                batch.append({
                    "installmentGroupCode": group_code,
                    "rawResponse": {"data": {"installments": items}},
                    "installments": items,
                })
                if len(batch) >= BATCH_SIZE:
                    label = (f"inserindo consultas {batch_first_idx}..{idx} "
                             f"(group {batch_first_code} até {group_code})")
                    stats = _flush(batch, label)
                    total_pedidos += stats.get("pedidos", 0)
                    total_parcelas += stats.get("parcelas", 0)
                    batch = []
                    batch_first_idx = 0
                    batch_first_code = ""
            except Exception as exc:
                print(f" [erro] {exc}")
                erros += 1

    # Final flush of whatever did not fill a complete batch.
    if batch:
        label = (f"inserindo consultas {batch_first_idx}..{len(pending)} "
                 f"(group {batch_first_code} até {batch[-1]['installmentGroupCode']})")
    else:
        label = "flush final"
    stats = _flush(batch, label)
    total_pedidos += stats.get("pedidos", 0)
    total_parcelas += stats.get("parcelas", 0)
    print(
        f"\n[fim] pedidos={total_pedidos} parcelas={total_parcelas} erros={erros}"
    )


if __name__ == "__main__":
    main()

View File

@ -3,6 +3,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
import json import json
import os import os
import re import re
import threading
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from datetime import date, timedelta from datetime import date, timedelta
@ -14,10 +15,27 @@ import requests
TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens" TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens"
STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores" STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores"
INSTALLMENTS_URL = ( INSTALLMENTS_URL = (
"https://bff-credit-container-portal-apigw.produto-financeiro.grupoboticario.digital/" "https://bff-credit-container-portal-apigw.prd.produto-financeiro.app.grupoboticario.com.br/"
"v1/franchisee/installments" "v1/franchisee/installments"
) )
# Guards every read/write of _LAST_INSTALLMENTS_REQUEST_TS.
_RATE_LIMIT_LOCK = threading.Lock()
# time.monotonic() reference used to space out installments requests.
_LAST_INSTALLMENTS_REQUEST_TS = 0.0


def _set_global_backoff(wait_s: float) -> None:
    """Move the shared request timestamp forward so pacing waits about ``wait_s``.

    The stored value is ``now + wait_s - min_interval`` because
    ``_apply_installments_pacing`` waits until the timestamp plus the minimum
    interval (``INSTALLMENTS_MIN_INTERVAL_MS``) has passed. The timestamp is
    only ever pushed forward, never pulled back. An unparsable interval is
    treated as zero.
    """
    global _LAST_INSTALLMENTS_REQUEST_TS
    raw_interval = os.getenv("INSTALLMENTS_MIN_INTERVAL_MS", "0").strip()
    try:
        interval_s = int(raw_interval or "0") / 1000.0
    except Exception:
        interval_s = 0.0
    with _RATE_LIMIT_LOCK:
        candidate = time.monotonic() + wait_s - interval_s
        # Keep whichever deadline is later; an earlier candidate is ignored.
        _LAST_INSTALLMENTS_REQUEST_TS = max(_LAST_INSTALLMENTS_REQUEST_TS, candidate)
def _jwt_payload(jwt_token: str) -> Dict[str, Any]: def _jwt_payload(jwt_token: str) -> Dict[str, Any]:
parts = jwt_token.split(".") parts = jwt_token.split(".")
@ -65,12 +83,40 @@ class Auth:
self.cache = TokenCache() self.cache = TokenCache()
def _apply_installments_pacing() -> None:
    """Block until the minimum interval since the last installments request has passed.

    The interval comes from ``INSTALLMENTS_MIN_INTERVAL_MS``; a missing,
    unparsable, zero or negative value disables pacing and the function
    returns immediately. Otherwise it sleeps in slices of at most one second,
    re-checking the shared timestamp under the lock each time, and records the
    current time as the new reference once the wait is over.
    """
    global _LAST_INSTALLMENTS_REQUEST_TS

    raw_interval = os.getenv("INSTALLMENTS_MIN_INTERVAL_MS", "0").strip()
    try:
        interval_ms = int(raw_interval or "0")
    except Exception:
        interval_ms = 0
    if interval_ms <= 0:
        return

    interval_s = float(interval_ms) / 1000.0
    while True:
        with _RATE_LIMIT_LOCK:
            current = time.monotonic()
            remaining = (_LAST_INSTALLMENTS_REQUEST_TS + interval_s) - current
            if remaining <= 0:
                # Slot acquired: this call becomes the new reference point.
                _LAST_INSTALLMENTS_REQUEST_TS = current
                return
        # Sleep outside the lock, capped at 1s so a timestamp changed by
        # another caller is noticed on the next check.
        time.sleep(remaining if remaining < 1.0 else 1.0)
def _headers(auth: Auth, cookie_header: Optional[str]) -> Dict[str, str]: def _headers(auth: Auth, cookie_header: Optional[str]) -> Dict[str, str]:
h = { h = {
"Authorization": auth.get_bearer(), "Authorization": auth.get_bearer(),
"Accept": "application/json, text/plain, */*", "Accept": "*/*",
"Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8,en-US;q=0.7",
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"Content-Type": "application/json",
"Origin": "https://extranet.grupoboticario.com.br", "Origin": "https://extranet.grupoboticario.com.br",
"Referer": "https://extranet.grupoboticario.com.br/", "Referer": "https://extranet.grupoboticario.com.br/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": "Mozilla/5.0", "User-Agent": "Mozilla/5.0",
} }
if cookie_header: if cookie_header:
@ -88,8 +134,11 @@ def get_installments_page(
page: int, page: int,
installment_group_code: Optional[str] = None, installment_group_code: Optional[str] = None,
cookie_header: Optional[str] = None, cookie_header: Optional[str] = None,
page_size: Optional[int] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
params: Dict[str, Any] = {"page": page} params: Dict[str, Any] = {"page": page}
if page_size and page_size > 0:
params["limit"] = page_size
if installment_group_code: if installment_group_code:
params["installmentGroupCode"] = installment_group_code params["installmentGroupCode"] = installment_group_code
else: else:
@ -105,6 +154,7 @@ def get_installments_page(
max_attempts = 8 max_attempts = 8
transient_status = {429, 500, 502, 503, 504} transient_status = {429, 500, 502, 503, 504}
for attempt in range(max_attempts): for attempt in range(max_attempts):
_apply_installments_pacing()
r = session.get( r = session.get(
INSTALLMENTS_URL, INSTALLMENTS_URL,
headers=_headers(auth, cookie_header=cookie_header), headers=_headers(auth, cookie_header=cookie_header),
@ -124,6 +174,21 @@ def get_installments_page(
break break
if r.status_code in transient_status: if r.status_code in transient_status:
wait_s = min(30, 2 ** min(5, attempt)) wait_s = min(30, 2 ** min(5, attempt))
if r.status_code == 429:
min_429_wait_raw = os.getenv("INSTALLMENTS_429_MIN_WAIT_SEC", "3").strip()
try:
min_429_wait = float(min_429_wait_raw or "3")
except Exception:
min_429_wait = 3.0
retry_after = r.headers.get("Retry-After") or r.headers.get("retry-after")
if retry_after:
try:
min_429_wait = max(min_429_wait, float(retry_after))
print(f"[info] Retry-After da API: {retry_after}s")
except Exception:
pass
wait_s = max(wait_s, min_429_wait)
_set_global_backoff(wait_s)
print( print(
f"[warn] erro temporario {r.status_code} no installments " f"[warn] erro temporario {r.status_code} no installments "
f"(tentativa {attempt + 1}/{max_attempts}), aguardando {wait_s}s..." f"(tentativa {attempt + 1}/{max_attempts}), aguardando {wait_s}s..."
@ -142,6 +207,13 @@ def get_installments_page(
f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
) )
if r.status_code in transient_status: if r.status_code in transient_status:
if r.status_code == 429:
recovery_s = float(os.getenv("THROTTLE_RECOVERY_PAUSE_SEC", "900"))
print(
f"[throttle-recovery] 429 persistente apos {max_attempts} tentativas. "
f"Aplicando cooldown global de {int(recovery_s)}s para proxima loja..."
)
_set_global_backoff(recovery_s)
raise RuntimeError( raise RuntimeError(
f"Falha temporaria persistente ({r.status_code}) no installments apos {max_attempts} tentativas. " f"Falha temporaria persistente ({r.status_code}) no installments apos {max_attempts} tentativas. "
f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
@ -151,14 +223,31 @@ def get_installments_page(
f"400 Bad Request em installments. " f"400 Bad Request em installments. "
f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}" f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
) )
if r.status_code == 412:
raise RuntimeError(
"412 Precondition Failed em installments. "
"A API recusou a pre-condicao da requisicao (normalmente cabecalhos/sessao). "
f"url={INSTALLMENTS_URL}?{urlencode(params)} body={r.text[:500]}"
)
r.raise_for_status() r.raise_for_status()
return r.json() return r.json()
def get_store_codes(session: requests.Session, auth: Auth, cookie_header: Optional[str]) -> list[int]: def get_store_codes(
session: requests.Session,
auth: Auth,
cookie_header: Optional[str],
allowed_channels: Optional[set[str]] = None,
) -> list[int]:
r = session.get(STORES_URL, headers=_headers(auth, cookie_header=cookie_header), timeout=30) r = session.get(STORES_URL, headers=_headers(auth, cookie_header=cookie_header), timeout=30)
r.raise_for_status() r.raise_for_status()
data = r.json().get("data") or [] data = r.json().get("data") or []
if allowed_channels:
data = [
x
for x in data
if str(x.get("channel") or "").strip().casefold() in allowed_channels
]
out = sorted({int(x.get("code")) for x in data if x.get("code") is not None}) out = sorted({int(x.get("code")) for x in data if x.get("code") is not None})
return out return out
@ -453,21 +542,37 @@ WHERE DocPedidoId = ? AND InstallmentCode NOT IN ({placeholders})
def main() -> None: def main() -> None:
today = date.today() today = date.today()
last_days_env = os.getenv("LAST_N_DAYS", "5").strip()
last_n_days = int(last_days_env) if last_days_env else None
rolling_start = (today - timedelta(days=last_n_days)).isoformat() if last_n_days is not None else None
default_start = date(today.year, 1, 1).isoformat()
default_end = today.isoformat()
start_date_env = os.getenv("START_INSTALLMENT_CHANGE_DATE") start_date_env = os.getenv("START_INSTALLMENT_CHANGE_DATE")
start_date_fixed = (start_date_env or "").strip() or None start_date_fixed = (start_date_env or "").strip() or None
last_days_env = os.getenv("LAST_N_DAYS", "5").strip()
last_n_days = int(last_days_env) if last_days_env else None
rolling_start = None
if (not start_date_fixed) and (last_n_days is not None):
rolling_start = (today - timedelta(days=last_n_days)).isoformat()
chunk_days_env = os.getenv("CHUNK_DAYS", "0").strip()
chunk_days = int(chunk_days_env) if chunk_days_env else 0
if chunk_days < 0:
chunk_days = 0
default_start = date(today.year, 1, 1).isoformat()
default_end = today.isoformat()
end_date = os.getenv("END_INSTALLMENT_CHANGE_DATE", default_end) end_date = os.getenv("END_INSTALLMENT_CHANGE_DATE", default_end)
installment_change = os.getenv("INSTALLMENT_CHANGE", "CRIACAO").strip() or "CRIACAO" installment_change = os.getenv("INSTALLMENT_CHANGE", "CRIACAO").strip() or "CRIACAO"
first_page = int(os.getenv("PAGE_START", "1")) first_page = int(os.getenv("PAGE_START", "1"))
max_pages_per_query = int(os.getenv("MAX_PAGES_PER_QUERY", "10000")) max_pages_per_query = int(os.getenv("MAX_PAGES_PER_QUERY", "10000"))
page_size_env = os.getenv("INSTALLMENTS_PAGE_SIZE", "").strip()
page_size = int(page_size_env) if page_size_env else None
flush_every_pages = int(os.getenv("FLUSH_EVERY_PAGES", "50"))
# Padrao organizado: uma loja por vez (logs nao ficam intercalados). # Padrao organizado: uma loja por vez (logs nao ficam intercalados).
store_workers = int(os.getenv("STORE_WORKERS", "1")) store_workers = int(os.getenv("STORE_WORKERS", "1"))
group_workers = int(os.getenv("GROUP_WORKERS", "4"))
cookie_header = os.getenv("EXTRANET_COOKIE") cookie_header = os.getenv("EXTRANET_COOKIE")
store_channel_env = os.getenv("STORE_CHANNEL", "VD").strip()
allowed_channels = {
c.strip().casefold()
for c in re.split(r"[,\s;]+", store_channel_env)
if c.strip()
}
if "all" in allowed_channels or "*" in allowed_channels:
allowed_channels = set()
target_mediator_env = os.getenv("TARGET_MEDIATOR_CODE", "").strip() target_mediator_env = os.getenv("TARGET_MEDIATOR_CODE", "").strip()
target_mediator = int(target_mediator_env) if target_mediator_env else None target_mediator = int(target_mediator_env) if target_mediator_env else None
resume_from_env = os.getenv("RESUME_FROM_MEDIATOR_CODE", "").strip() resume_from_env = os.getenv("RESUME_FROM_MEDIATOR_CODE", "").strip()
@ -494,7 +599,15 @@ def main() -> None:
session.trust_env = False session.trust_env = False
auth = Auth(session) auth = Auth(session)
stores = get_store_codes(session, auth, cookie_header) stores = get_store_codes(
session,
auth,
cookie_header,
allowed_channels=allowed_channels or None,
)
if allowed_channels:
channels_txt = ",".join(sorted(c.upper() for c in allowed_channels))
print(f"[info] filtro de canal ativo: STORE_CHANNEL={channels_txt} (lojas={len(stores)})")
if target_mediator is not None: if target_mediator is not None:
stores = [target_mediator] stores = [target_mediator]
print(f"[info] consulta focada na loja {target_mediator}") print(f"[info] consulta focada na loja {target_mediator}")
@ -509,8 +622,10 @@ def main() -> None:
print(f"[info] incremental ativo com watermark em {watermark_file}") print(f"[info] incremental ativo com watermark em {watermark_file}")
else: else:
print("[info] incremental desativado") print("[info] incremental desativado")
if last_n_days is not None: if rolling_start is not None and last_n_days is not None:
print(f"[info] janela movel ativa: ultimos {last_n_days} dias ({rolling_start}..{end_date})") print(f"[info] janela movel ativa: ultimos {last_n_days} dias ({rolling_start}..{end_date})")
if chunk_days > 0:
print(f"[info] fatiamento de periodo ativo: CHUNK_DAYS={chunk_days}")
authorized: list[int] = [] authorized: list[int] = []
unauthorized: list[int] = [] unauthorized: list[int] = []
@ -531,27 +646,97 @@ def main() -> None:
local_session = requests.Session() local_session = requests.Session()
local_session.trust_env = False local_session.trust_env = False
local_auth = Auth(local_session) local_auth = Auth(local_session)
store_bearer = local_auth.get_bearer()
try: try:
print( print(
f"[consulta] loja={mediator_code} periodo={store_start_date}..{end_date} pagina_inicial={first_page}" f"[consulta] loja={mediator_code} periodo={store_start_date}..{end_date} pagina_inicial={first_page}"
) )
page = first_page store_start_obj = _parse_iso_date(store_start_date)
all_installments: List[Dict[str, Any]] = [] store_end_obj = _parse_iso_date(end_date)
if not store_start_obj or not store_end_obj:
raise RuntimeError(
f"Periodo invalido para consulta. start={store_start_date} end={end_date}"
)
if store_start_obj > store_end_obj:
raise RuntimeError(
f"Periodo invalido para consulta. start={store_start_date} end={end_date}"
)
windows: List[tuple[str, str]] = []
if chunk_days > 0:
cursor = store_start_obj
step_days = max(1, chunk_days)
while cursor <= store_end_obj:
win_end = min(store_end_obj, cursor + timedelta(days=step_days - 1))
windows.append((cursor.isoformat(), win_end.isoformat()))
cursor = win_end + timedelta(days=1)
else:
windows = [(store_start_obj.isoformat(), store_end_obj.isoformat())]
all_group_codes_seen: set = set()
total_api = 0 total_api = 0
total_pages = 1 pages_fetched_total = 0
total_sql_pedidos = 0
total_sql_parcelas = 0
def _flush(buffer: List[Dict[str, Any]], label: str) -> None:
"""Agrupa itens do scan por installmentGroupCode e persiste no banco."""
nonlocal total_sql_pedidos, total_sql_parcelas
if not buffer:
return
groups: Dict[str, List[Dict[str, Any]]] = {}
for item in _dedupe_installments(buffer):
gc = str(item.get("installmentGroupCode") or "").strip()
if not gc or gc in all_group_codes_seen:
continue
groups.setdefault(gc, []).append(item)
if not groups:
return
all_group_codes_seen.update(groups.keys())
if log_group_codes:
for gc in groups:
print(f"[grupo] loja={mediator_code} {label} installmentGroupCode={gc}")
print(f"[flush] loja={mediator_code} {label} grupos_novos={len(groups)}")
if write_sql:
sql_rows: List[Dict[str, Any]] = []
for gc, items in groups.items():
sql_rows.append({
"mediatorCode": mediator_code,
"installmentGroupCode": gc,
"rawResponse": {"data": {"installments": items}},
"installments": items,
})
if sql_rows:
stats = upsert_doc_pedidos_sqlserver(sql_rows, sql_conn, mediator_code_log=mediator_code)
total_sql_pedidos += stats.get("pedidos", 0)
total_sql_parcelas += stats.get("parcelas", 0)
print(
f"[sql-flush] loja={mediator_code} {label} "
f"pedidos_upsert={stats.get('pedidos')} parcelas_upsert={stats.get('parcelas')}"
)
for window_idx, (window_start, window_end) in enumerate(windows, start=1):
print(
f"[consulta-janela] loja={mediator_code} janela={window_idx}/{len(windows)} "
f"periodo={window_start}..{window_end}"
)
page = first_page
window_total = 0
window_total_pages = 1
window_pages_fetched = 0
flush_buffer: List[Dict[str, Any]] = []
while True: while True:
try: try:
body = get_installments_page( body = get_installments_page(
session=local_session, session=local_session,
auth=local_auth, auth=local_auth,
start_date=store_start_date, start_date=window_start,
end_date=end_date, end_date=window_end,
installment_change=installment_change, installment_change=installment_change,
mediator_code=mediator_code, mediator_code=mediator_code,
page=page, page=page,
cookie_header=cookie_header, cookie_header=cookie_header,
page_size=page_size,
) )
except Exception as e: except Exception as e:
msg = str(e) msg = str(e)
@ -563,141 +748,56 @@ def main() -> None:
body = get_installments_page( body = get_installments_page(
session=local_session, session=local_session,
auth=local_auth, auth=local_auth,
start_date=store_start_date, start_date=window_start,
end_date=end_date, end_date=window_end,
installment_change=None, installment_change=None,
mediator_code=mediator_code, mediator_code=mediator_code,
page=page, page=page,
cookie_header=cookie_header, cookie_header=cookie_header,
page_size=page_size,
) )
else: else:
raise raise
installments_page = (((body.get("data") or {}).get("installments")) or []) installments_page = (((body.get("data") or {}).get("installments")) or [])
all_installments.extend(installments_page) flush_buffer.extend(installments_page)
pagination = ((body.get("data") or {}).get("pagination") or {}) pagination = ((body.get("data") or {}).get("pagination") or {})
total_api = int(pagination.get("total") or len(all_installments)) window_total = int(pagination.get("total") or 0)
limit = int(pagination.get("limit") or len(installments_page) or 1) limit = int(pagination.get("limit") or len(installments_page) or 1)
total_pages = max(1, (total_api + limit - 1) // limit) if total_api else page window_total_pages = max(1, (window_total + limit - 1) // limit) if window_total else page
window_pages_fetched += 1
print( print(
f"[pagina-loja] loja={mediator_code} pagina={page}/{total_pages} " f"[pagina-loja] loja={mediator_code} janela={window_idx}/{len(windows)} "
f"itens_pagina={len(installments_page)}" f"pagina={page}/{window_total_pages} itens_pagina={len(installments_page)}"
) )
if flush_every_pages > 0 and window_pages_fetched % flush_every_pages == 0:
_flush(flush_buffer, f"janela={window_idx}/{len(windows)} pagina={page}")
flush_buffer = []
if not installments_page: if not installments_page:
break break
if page >= total_pages: if page >= window_total_pages:
break break
if page - first_page + 1 >= max_pages_per_query: if page - first_page + 1 >= max_pages_per_query:
print( print(
f"[stop] loja={mediator_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido" f"[stop] loja={mediator_code} janela={window_idx}/{len(windows)} "
f"limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido"
) )
break break
page += 1 page += 1
installments = _dedupe_installments(all_installments) # Flush do restante da janela que não atingiu o threshold
group_codes = sorted( _flush(flush_buffer, f"janela={window_idx}/{len(windows)}")
{
str(item.get("installmentGroupCode")).strip() total_api += window_total
for item in installments pages_fetched_total += window_pages_fetched
if item.get("installmentGroupCode") is not None print(
and str(item.get("installmentGroupCode")).strip() f"[resultado-janela] loja={mediator_code} janela={window_idx}/{len(windows)} "
} f"grupos_acumulados={len(all_group_codes_seen)}"
) )
print( print(
f"[resultado-loja] loja={mediator_code} pedidos_encontrados={total_api} " f"[resultado-loja] loja={mediator_code} pedidos_encontrados={total_api} "
f"itens_total_agregados={len(installments)} grupos_unicos={len(group_codes)}" f"grupos_unicos={len(all_group_codes_seen)}"
) )
if log_group_codes:
for gc in group_codes:
print(f"[grupo] loja={mediator_code} installmentGroupCode={gc}")
pedidos: Dict[str, Any] = {}
def fetch_group(group_code: str) -> Dict[str, Any]:
try:
group_session = requests.Session()
group_session.trust_env = False
group_auth = Auth(group_session)
# Reutiliza o token da loja para evitar nova ida ao endpoint /api/tokens por grupo.
group_auth.override_bearer = store_bearer
group_page = first_page
all_group_installments: List[Dict[str, Any]] = []
group_total = 0
group_total_pages = 1
while True:
group_body = get_installments_page(
session=group_session,
auth=group_auth,
start_date=None,
end_date=None,
installment_change=None,
mediator_code=None,
page=group_page,
installment_group_code=group_code,
cookie_header=cookie_header,
)
group_page_items = (((group_body.get("data") or {}).get("installments")) or [])
all_group_installments.extend(group_page_items)
group_pagination = ((group_body.get("data") or {}).get("pagination") or {})
group_total = int(group_pagination.get("total") or len(all_group_installments))
group_limit = int(group_pagination.get("limit") or len(group_page_items) or 1)
group_total_pages = (
max(1, (group_total + group_limit - 1) // group_limit) if group_total else group_page
)
print(
f"[grupo-pagina] loja={mediator_code} installmentGroupCode={group_code} "
f"pagina={group_page}/{group_total_pages} itens_pagina={len(group_page_items)}"
)
if not group_page_items:
break
if group_page >= group_total_pages:
break
if group_page - first_page + 1 >= max_pages_per_query:
print(
f"[stop] grupo={group_code} limite MAX_PAGES_PER_QUERY={max_pages_per_query} atingido"
)
break
group_page += 1
group_installments = _dedupe_installments(all_group_installments)
count = len(group_installments)
print(f"[grupo-ok] loja={mediator_code} installmentGroupCode={group_code} itens={count}")
group_body_agg = {
"data": {
"installments": group_installments,
"pagination": {"limit": count, "total": group_total or count},
},
"status": 200,
"message": "Success",
}
return {
"pedidoNumero": group_code,
"consulta": {
"installmentGroupCode": group_code,
"pageStart": first_page,
"pagesFetched": group_total_pages,
},
"ok": True,
"totalItens": count,
"detalhes": group_body_agg,
}
except Exception as e:
print(f"[grupo-erro] loja={mediator_code} installmentGroupCode={group_code} erro={e}")
return {
"pedidoNumero": group_code,
"consulta": {
"installmentGroupCode": group_code,
"pageStart": first_page,
},
"ok": False,
"error": str(e),
}
if group_codes:
with ThreadPoolExecutor(max_workers=max(1, group_workers)) as group_pool:
group_futures = {group_pool.submit(fetch_group, gc): gc for gc in group_codes}
for fut in as_completed(group_futures):
gc = group_futures[fut]
pedidos[gc] = fut.result()
store_out = { store_out = {
"queryWindow": { "queryWindow": {
@ -705,35 +805,19 @@ def main() -> None:
"endInstallmentChangeDate": end_date, "endInstallmentChangeDate": end_date,
"installmentChange": installment_change, "installmentChange": installment_change,
"pageStart": first_page, "pageStart": first_page,
"pagesFetched": total_pages, "pagesFetched": pages_fetched_total,
"windowsFetched": len(windows),
"chunkDays": chunk_days if chunk_days > 0 else None,
}, },
"mediatorCode": mediator_code, "mediatorCode": mediator_code,
"pedidosEncontradosPagina": len(group_codes), "pedidosEncontradosPagina": len(all_group_codes_seen),
"pedidos": pedidos, "grupos_unicos": len(all_group_codes_seen),
} }
if write_sql: if write_sql:
sql_rows: List[Dict[str, Any]] = [] store_out["sqlUpsert"] = {"pedidos": total_sql_pedidos, "parcelas": total_sql_parcelas}
for gc, pedido_payload in pedidos.items():
if not pedido_payload.get("ok"):
continue
raw_response = pedido_payload.get("detalhes") or {}
sql_rows.append(
{
"mediatorCode": mediator_code,
"installmentGroupCode": gc,
"rawResponse": raw_response,
"installments": (((raw_response.get("data") or {}).get("installments")) or []),
}
)
stats = upsert_doc_pedidos_sqlserver(
sql_rows,
sql_conn,
mediator_code_log=mediator_code,
)
store_out["sqlUpsert"] = stats
print( print(
f"[sql] loja={mediator_code} pedidos_upsert={stats.get('pedidos')} " f"[sql] loja={mediator_code} pedidos_upsert={total_sql_pedidos} "
f"parcelas_upsert={stats.get('parcelas')}" f"parcelas_upsert={total_sql_parcelas}"
) )
if save_json: if save_json:
store_file = os.path.join(output_dir, f"installments_loja_{mediator_code}.json") store_file = os.path.join(output_dir, f"installments_loja_{mediator_code}.json")
@ -781,6 +865,31 @@ def main() -> None:
failed[mediator_code] = f"erro no worker: {e}" failed[mediator_code] = f"erro no worker: {e}"
print(f"[falha] loja={mediator_code} erro no worker: {e}") print(f"[falha] loja={mediator_code} erro no worker: {e}")
retry_wait_sec = int(os.getenv("RETRY_FAILED_WAIT_SEC", "90"))
if failed and retry_wait_sec > 0:
retry_candidates = sorted(
k for k, v in failed.items() if "429" in str(v) or "temporaria" in str(v).lower()
)
if retry_candidates:
print(
f"[retry] {len(retry_candidates)} loja(s) falharam por throttle. "
f"Aguardando {retry_wait_sec}s antes de retentar sequencialmente..."
)
time.sleep(retry_wait_sec)
for mc in retry_candidates:
result = process_store(mc)
status = result["status"]
if status == "authorized":
authorized.append(mc)
failed.pop(mc, None)
if incremental_mode:
watermark[str(mc)] = str(result["endDate"])
save_watermark(watermark_file, watermark)
print(f"[retry-ok] loja={mc}")
else:
failed[mc] = str(result.get("error") or "erro desconhecido")
print(f"[retry-falha] loja={mc} erro={failed[mc]}")
print( print(
"[resumo] " "[resumo] "
f"lojas_total={len(stores)} autorizadas={len(authorized)} " f"lojas_total={len(stores)} autorizadas={len(authorized)} "

246
trf_registroerro.py Normal file
View File

@ -0,0 +1,246 @@
import json
from typing import Any, Dict, Optional, Set
import trf
FRANCHISES_LIST_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/franchises/list/franchise"
def _collect_store_codes(payload: Any) -> list[str]:
    """Gather store-code-like values from an arbitrarily nested payload.

    Dicts and lists are traversed recursively. In every dict, the values under
    the known code keys are taken (in the fixed key order below) before its
    values are descended into. Each value is converted to ``str`` and
    stripped; ``None`` and blank results are dropped.

    Returns:
        The unique codes, in the order they were first encountered.
    """
    code_keys = ("sapCode", "code", "franchiseId", "franchiseCode", "mediatorCode", "storeCode")

    def _candidates(node: Any):
        # Yield raw candidate values depth-first, in encounter order.
        if isinstance(node, dict):
            for name in code_keys:
                if name in node:
                    yield node.get(name)
            for child in node.values():
                yield from _candidates(child)
        elif isinstance(node, list):
            for child in node:
                yield from _candidates(child)

    # A dict serves as an insertion-ordered set of the normalized codes.
    ordered: dict = {}
    for raw in _candidates(payload):
        if raw is None:
            continue
        text = str(raw).strip()
        if text:
            ordered.setdefault(text, None)
    return list(ordered)
class ClientLoja(trf.Client):
    """``trf.Client`` variant restricted to a single store code."""

    def __init__(self, store_code: str):
        """Initialise the base client and keep the stripped store code."""
        super().__init__()
        self.store_code = str(store_code).strip()

    def get_franchises(self, only_channels: Optional[Set[str]] = None):
        """Return ``[self.store_code]`` after checking the store is listed for the user.

        The franchise list endpoint is queried once, and once more after
        invalidating the auth when the first answer is 401/403.
        ``only_channels`` is accepted but not used by this implementation.

        Raises:
            RuntimeError: if no store codes can be extracted from the
                response, or if ``self.store_code`` is not among them.
        """
        # NOTE(review): self.s, self.auth and self._headers_json are presumably
        # provided by trf.Client — confirm against that class.
        def _request():
            return self.s.get(FRANCHISES_LIST_URL, headers=self._headers_json(), timeout=30)

        response = _request()
        if response.status_code in (401, 403):
            self.auth.invalidate()
            response = _request()
        response.raise_for_status()

        body = response.json()
        # Some responses wrap the list in a "data" envelope; unwrap when present.
        if isinstance(body, dict) and "data" in body:
            source = body.get("data")
        else:
            source = body

        available = _collect_store_codes(source)
        if not available:
            raise RuntimeError(
                "Endpoint de franquias nao retornou codigos de loja validos. "
                f"Resposta resumida: {str(body)[:500]}"
            )
        if self.store_code in available:
            return [self.store_code]
        raise RuntimeError(
            f"Loja {self.store_code} nao encontrada/sem permissao no usuario. "
            f"Lojas disponiveis: {', '.join(available[:30])}{'...' if len(available) > 30 else ''}"
        )
def sincronizar_paginas_sqlserver_loja(
    connection_string: str,
    store_code: str,
    cp_id: int = 10269,
    document_type: str = "EFAT",
    limit: int = 100,
    start_offset: int = 0,
    only_channels: Optional[Set[str]] = None,
    commit_cada_paginas: int = 1,
) -> Dict[str, Any]:
    """Fetch every page for one store and persist it through ``trf.SqlServerSink``.

    Pages are requested with ``ClientLoja.processar_pagina`` starting at
    ``start_offset`` and advancing by ``limit`` until a page reports no
    ``hasNext``. Each page's ``items`` are handed to ``sink.persist_items``.

    Args:
        connection_string: Passed unchanged to ``trf.SqlServerSink``.
        store_code: Store the client is pinned to.
        cp_id: Forwarded to ``processar_pagina``.
        document_type: Forwarded to ``processar_pagina``.
        limit: Page size; also the offset increment. Must be >= 1.
        start_offset: Offset of the first page requested.
        only_channels: Forwarded to ``processar_pagina``.
        commit_cada_paginas: Commit after every N pages. Must be >= 1.

    Returns:
        Summary dict with ``store_code``, ``total`` (from the first page),
        ``paginas_processadas``, ``documentos_persistidos`` and
        ``offset_final`` (offset of the last page requested).

    Raises:
        ValueError: If ``limit`` or ``commit_cada_paginas`` is below 1.
    """
    # Fail fast, before any network or database work. A non-positive limit
    # never advances the offset (endless refetch of the same page), and a
    # zero commit interval would raise ZeroDivisionError only after the first
    # page had already been persisted.
    if limit < 1:
        raise ValueError(f"limit deve ser >= 1 (recebido: {limit}).")
    if commit_cada_paginas < 1:
        raise ValueError(
            f"commit_cada_paginas deve ser >= 1 (recebido: {commit_cada_paginas})."
        )

    cli = ClientLoja(store_code)
    offset = start_offset
    paginas = 0
    docs_persistidos = 0
    total = None

    with trf.SqlServerSink(connection_string) as sink:
        sink.ensure_schema()
        while True:
            pagina = cli.processar_pagina(
                cp_id=cp_id,
                document_type=document_type,
                offset=offset,
                limit=limit,
                only_channels=only_channels,
            )
            # The total is taken from the first page only.
            if total is None:
                total = int(pagina.get("total") or 0)
            itens = pagina.get("items") or []
            persistidos_pag, novos_pag = sink.persist_items(itens)
            docs_persistidos += persistidos_pag
            paginas += 1
            if paginas % commit_cada_paginas == 0:
                sink.cn.commit()
            # Progress is logged for every page, independent of commit cadence.
            print(
                f"[sync][loja={store_code}] offset={offset} count={len(itens)} "
                f"novos_pag={novos_pag} persistidos={docs_persistidos} total={total}"
            )
            if not pagina.get("hasNext"):
                break
            offset += limit
        # Flush pages persisted since the last periodic commit.
        sink.cn.commit()

    return {
        "store_code": store_code,
        "total": total,
        "paginas_processadas": paginas,
        "documentos_persistidos": docs_persistidos,
        "offset_final": offset,
    }
def sincronizar_incremental_sqlserver_loja(
    connection_string: str,
    store_code: str,
    cp_id: int = 10269,
    document_type: str = "EFAT",
    limit: int = 100,
    only_channels: Optional[Set[str]] = None,
    max_paginas_sem_novidade: int = 3,
    max_paginas: int = 20,
) -> Dict[str, Any]:
    """Persist pages for one store from offset 0 until a stop condition is met.

    Every page is committed right after it is persisted. The loop ends when
    any of these holds, checked in this order:

    * ``max_paginas_sem_novidade`` consecutive pages produced no new documents;
    * ``max_paginas`` pages have been processed;
    * the page reports no ``hasNext``.

    Returns:
        Summary dict with ``store_code``, ``total`` (from the first page),
        ``paginas_processadas``, ``documentos_persistidos``,
        ``documentos_novos``, ``offset_final`` and
        ``parada_por_sem_novidade``.
    """
    client = ClientLoja(store_code)
    # Arguments that stay the same for every page request.
    fixed_args = {
        "cp_id": cp_id,
        "document_type": document_type,
        "limit": limit,
        "only_channels": only_channels,
    }

    total = None
    offset = 0
    paginas = 0
    docs_persistidos = 0
    docs_novos = 0
    sem_novidade = 0  # consecutive pages that produced nothing new

    with trf.SqlServerSink(connection_string) as sink:
        sink.ensure_schema()
        while True:
            pagina = client.processar_pagina(offset=offset, **fixed_args)
            if total is None:
                total = int(pagina.get("total") or 0)

            itens = pagina.get("items") or []
            persistidos_pag, novos_pag = sink.persist_items(itens)
            sink.cn.commit()

            docs_persistidos += persistidos_pag
            docs_novos += novos_pag
            paginas += 1
            if novos_pag > 0:
                sem_novidade = 0
            else:
                sem_novidade += 1

            print(
                f"[inc][loja={store_code}] offset={offset} count={len(itens)} "
                f"novos_pag={novos_pag} sem_novidade={sem_novidade}/{max_paginas_sem_novidade} "
                f"total_novos={docs_novos}"
            )

            if (
                sem_novidade >= max_paginas_sem_novidade
                or paginas >= max_paginas
                or not pagina.get("hasNext")
            ):
                break
            offset += limit

    parou_sem_novidade = sem_novidade >= max_paginas_sem_novidade
    return {
        "store_code": store_code,
        "total": total,
        "paginas_processadas": paginas,
        "documentos_persistidos": docs_persistidos,
        "documentos_novos": docs_novos,
        "offset_final": offset,
        "parada_por_sem_novidade": parou_sem_novidade,
    }
if __name__ == "__main__":
    # Script entry point: runs a full sync, an incremental sync, or a single
    # JSON page fetch for one store, then prints the result as JSON.
    import os  # local import: only the script entry point reads the environment

    RUN_MODE = "full"  # edit: full | incremental | json
    if RUN_MODE not in ("full", "incremental", "json"):
        raise RuntimeError("RUN_MODE invalido. Use: full, incremental ou json.")

    TARGET_STORE_CODE = "24430"  # edit: store code, e.g. 20997
    store_code = str(TARGET_STORE_CODE).strip()
    if not store_code:
        raise RuntimeError("Preencha TARGET_STORE_CODE no codigo.")
    print(f"[info] consulta focada na loja {store_code}")

    if RUN_MODE in ("full", "incremental"):
        # Credentials must never live in source control. Either supply a
        # complete ODBC connection string in SQLSERVER_CONN, or just the
        # login in SQLSERVER_UID / SQLSERVER_PWD.
        # NOTE(review): a password was previously committed in this block;
        # it should be rotated.
        SQLSERVER_CONN = os.getenv("SQLSERVER_CONN", "").strip()
        if not SQLSERVER_CONN:
            sql_uid = os.getenv("SQLSERVER_UID", "").strip()
            sql_pwd = os.getenv("SQLSERVER_PWD", "")
            if not sql_uid or not sql_pwd:
                raise RuntimeError(
                    "Credenciais do SQL Server ausentes. Defina SQLSERVER_CONN "
                    "ou SQLSERVER_UID e SQLSERVER_PWD no ambiente."
                )
            # Values containing ';' or braces need ODBC escaping; use
            # SQLSERVER_CONN for those instead.
            SQLSERVER_CONN = (
                "DRIVER={ODBC Driver 17 for SQL Server};"
                "SERVER=10.77.77.10;"
                "DATABASE=GINSENG;"
                f"UID={sql_uid};"
                f"PWD={sql_pwd};"
                "TrustServerCertificate=yes;"
            )

        if RUN_MODE == "full":
            resultado = sincronizar_paginas_sqlserver_loja(
                connection_string=SQLSERVER_CONN,
                store_code=store_code,
                cp_id=10269,
                document_type="EFAT",
                limit=100,
                start_offset=0,
                only_channels=None,
                commit_cada_paginas=1,
            )
        else:
            resultado = sincronizar_incremental_sqlserver_loja(
                connection_string=SQLSERVER_CONN,
                store_code=store_code,
                cp_id=10269,
                document_type="EFAT",
                limit=100,
                only_channels=None,
                max_paginas_sem_novidade=5,
                max_paginas=15,
            )
    else:
        # json mode: fetch one page and print it, with no database involved.
        c = ClientLoja(store_code)
        resultado = c.processar_pagina(
            cp_id=10269,
            document_type="EFAT",
            offset=0,
            limit=25,
            only_channels=None,
        )

    print(json.dumps(resultado, ensure_ascii=False, indent=2))