trm_rem_saggezza/trf_registroerro.py
2026-05-22 14:24:01 -03:00

247 lines
7.4 KiB
Python

import json
from typing import Any, Dict, Optional, Set
import trf
FRANCHISES_LIST_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/franchises/list/franchise"
def _collect_store_codes(payload: Any) -> list[str]:
keys = ("sapCode", "code", "franchiseId", "franchiseCode", "mediatorCode", "storeCode")
out: list[str] = []
seen = set()
def _push(v: Any) -> None:
if v is None:
return
s = str(v).strip()
if not s:
return
if s not in seen:
seen.add(s)
out.append(s)
def _walk(obj: Any) -> None:
if isinstance(obj, dict):
for k in keys:
if k in obj:
_push(obj.get(k))
for v in obj.values():
_walk(v)
return
if isinstance(obj, list):
for item in obj:
_walk(item)
_walk(payload)
return out
class ClientLoja(trf.Client):
def __init__(self, store_code: str):
super().__init__()
self.store_code = str(store_code).strip()
def get_franchises(self, only_channels: Optional[Set[str]] = None):
r = self.s.get(FRANCHISES_LIST_URL, headers=self._headers_json(), timeout=30)
if r.status_code in (401, 403):
self.auth.invalidate()
r = self.s.get(FRANCHISES_LIST_URL, headers=self._headers_json(), timeout=30)
r.raise_for_status()
body = r.json()
source = body.get("data") if isinstance(body, dict) and "data" in body else body
lojas = _collect_store_codes(source)
if not lojas:
raise RuntimeError(
"Endpoint de franquias nao retornou codigos de loja validos. "
f"Resposta resumida: {str(body)[:500]}"
)
if self.store_code not in lojas:
raise RuntimeError(
f"Loja {self.store_code} nao encontrada/sem permissao no usuario. "
f"Lojas disponiveis: {', '.join(lojas[:30])}{'...' if len(lojas) > 30 else ''}"
)
return [self.store_code]
def sincronizar_paginas_sqlserver_loja(
connection_string: str,
store_code: str,
cp_id: int = 10269,
document_type: str = "EFAT",
limit: int = 100,
start_offset: int = 0,
only_channels: Optional[Set[str]] = None,
commit_cada_paginas: int = 1,
) -> Dict[str, Any]:
cli = ClientLoja(store_code)
offset = start_offset
paginas = 0
docs_persistidos = 0
total = None
with trf.SqlServerSink(connection_string) as sink:
sink.ensure_schema()
while True:
pagina = cli.processar_pagina(
cp_id=cp_id,
document_type=document_type,
offset=offset,
limit=limit,
only_channels=only_channels,
)
if total is None:
total = int(pagina.get("total") or 0)
itens = pagina.get("items") or []
persistidos_pag, novos_pag = sink.persist_items(itens)
docs_persistidos += persistidos_pag
paginas += 1
if paginas % commit_cada_paginas == 0:
sink.cn.commit()
print(
f"[sync][loja={store_code}] offset={offset} count={len(itens)} "
f"novos_pag={novos_pag} persistidos={docs_persistidos} total={total}"
)
if not pagina.get("hasNext"):
break
offset += limit
sink.cn.commit()
return {
"store_code": store_code,
"total": total,
"paginas_processadas": paginas,
"documentos_persistidos": docs_persistidos,
"offset_final": offset,
}
def sincronizar_incremental_sqlserver_loja(
connection_string: str,
store_code: str,
cp_id: int = 10269,
document_type: str = "EFAT",
limit: int = 100,
only_channels: Optional[Set[str]] = None,
max_paginas_sem_novidade: int = 3,
max_paginas: int = 20,
) -> Dict[str, Any]:
cli = ClientLoja(store_code)
offset = 0
paginas = 0
docs_persistidos = 0
docs_novos = 0
sem_novidade = 0
total = None
with trf.SqlServerSink(connection_string) as sink:
sink.ensure_schema()
while True:
pagina = cli.processar_pagina(
cp_id=cp_id,
document_type=document_type,
offset=offset,
limit=limit,
only_channels=only_channels,
)
if total is None:
total = int(pagina.get("total") or 0)
itens = pagina.get("items") or []
persistidos_pag, novos_pag = sink.persist_items(itens)
sink.cn.commit()
docs_persistidos += persistidos_pag
docs_novos += novos_pag
paginas += 1
sem_novidade = 0 if novos_pag > 0 else (sem_novidade + 1)
print(
f"[inc][loja={store_code}] offset={offset} count={len(itens)} "
f"novos_pag={novos_pag} sem_novidade={sem_novidade}/{max_paginas_sem_novidade} "
f"total_novos={docs_novos}"
)
if sem_novidade >= max_paginas_sem_novidade:
break
if paginas >= max_paginas:
break
if not pagina.get("hasNext"):
break
offset += limit
return {
"store_code": store_code,
"total": total,
"paginas_processadas": paginas,
"documentos_persistidos": docs_persistidos,
"documentos_novos": docs_novos,
"offset_final": offset,
"parada_por_sem_novidade": sem_novidade >= max_paginas_sem_novidade,
}
if __name__ == "__main__":
RUN_MODE = "full" # edite: full | incremental | json
if RUN_MODE not in ("full", "incremental", "json"):
raise RuntimeError("RUN_MODE invalido. Use: full, incremental ou json.")
TARGET_STORE_CODE = "24430" # edite: codigo da loja, ex: 20997
store_code = str(TARGET_STORE_CODE).strip()
if not store_code:
raise RuntimeError("Preencha TARGET_STORE_CODE no codigo.")
print(f"[info] consulta focada na loja {store_code}")
if RUN_MODE in ("full", "incremental"):
SQLSERVER_CONN = (
"DRIVER={ODBC Driver 17 for SQL Server};"
"SERVER=10.77.77.10;"
"DATABASE=GINSENG;"
"UID=andrey;"
"PWD=88253332;"
"TrustServerCertificate=yes;"
)
if RUN_MODE == "full":
resultado = sincronizar_paginas_sqlserver_loja(
connection_string=SQLSERVER_CONN,
store_code=store_code,
cp_id=10269,
document_type="EFAT",
limit=100,
start_offset=0,
only_channels=None,
commit_cada_paginas=1,
)
else:
resultado = sincronizar_incremental_sqlserver_loja(
connection_string=SQLSERVER_CONN,
store_code=store_code,
cp_id=10269,
document_type="EFAT",
limit=100,
only_channels=None,
max_paginas_sem_novidade=5,
max_paginas=15,
)
else:
c = ClientLoja(store_code)
resultado = c.processar_pagina(
cp_id=10269,
document_type="EFAT",
offset=0,
limit=25,
only_channels=None,
)
print(json.dumps(resultado, ensure_ascii=False, indent=2))