247 lines
7.4 KiB
Python
247 lines
7.4 KiB
Python
import json
|
|
from typing import Any, Dict, Optional, Set
|
|
|
|
import trf
|
|
|
|
FRANCHISES_LIST_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/franchises/list/franchise"
|
|
|
|
|
|
def _collect_store_codes(payload: Any) -> list[str]:
|
|
keys = ("sapCode", "code", "franchiseId", "franchiseCode", "mediatorCode", "storeCode")
|
|
out: list[str] = []
|
|
seen = set()
|
|
|
|
def _push(v: Any) -> None:
|
|
if v is None:
|
|
return
|
|
s = str(v).strip()
|
|
if not s:
|
|
return
|
|
if s not in seen:
|
|
seen.add(s)
|
|
out.append(s)
|
|
|
|
def _walk(obj: Any) -> None:
|
|
if isinstance(obj, dict):
|
|
for k in keys:
|
|
if k in obj:
|
|
_push(obj.get(k))
|
|
for v in obj.values():
|
|
_walk(v)
|
|
return
|
|
if isinstance(obj, list):
|
|
for item in obj:
|
|
_walk(item)
|
|
|
|
_walk(payload)
|
|
return out
|
|
|
|
|
|
class ClientLoja(trf.Client):
|
|
def __init__(self, store_code: str):
|
|
super().__init__()
|
|
self.store_code = str(store_code).strip()
|
|
|
|
def get_franchises(self, only_channels: Optional[Set[str]] = None):
|
|
r = self.s.get(FRANCHISES_LIST_URL, headers=self._headers_json(), timeout=30)
|
|
if r.status_code in (401, 403):
|
|
self.auth.invalidate()
|
|
r = self.s.get(FRANCHISES_LIST_URL, headers=self._headers_json(), timeout=30)
|
|
r.raise_for_status()
|
|
|
|
body = r.json()
|
|
source = body.get("data") if isinstance(body, dict) and "data" in body else body
|
|
lojas = _collect_store_codes(source)
|
|
if not lojas:
|
|
raise RuntimeError(
|
|
"Endpoint de franquias nao retornou codigos de loja validos. "
|
|
f"Resposta resumida: {str(body)[:500]}"
|
|
)
|
|
|
|
if self.store_code not in lojas:
|
|
raise RuntimeError(
|
|
f"Loja {self.store_code} nao encontrada/sem permissao no usuario. "
|
|
f"Lojas disponiveis: {', '.join(lojas[:30])}{'...' if len(lojas) > 30 else ''}"
|
|
)
|
|
return [self.store_code]
|
|
|
|
|
|
def sincronizar_paginas_sqlserver_loja(
|
|
connection_string: str,
|
|
store_code: str,
|
|
cp_id: int = 10269,
|
|
document_type: str = "EFAT",
|
|
limit: int = 100,
|
|
start_offset: int = 0,
|
|
only_channels: Optional[Set[str]] = None,
|
|
commit_cada_paginas: int = 1,
|
|
) -> Dict[str, Any]:
|
|
cli = ClientLoja(store_code)
|
|
offset = start_offset
|
|
paginas = 0
|
|
docs_persistidos = 0
|
|
total = None
|
|
|
|
with trf.SqlServerSink(connection_string) as sink:
|
|
sink.ensure_schema()
|
|
while True:
|
|
pagina = cli.processar_pagina(
|
|
cp_id=cp_id,
|
|
document_type=document_type,
|
|
offset=offset,
|
|
limit=limit,
|
|
only_channels=only_channels,
|
|
)
|
|
if total is None:
|
|
total = int(pagina.get("total") or 0)
|
|
|
|
itens = pagina.get("items") or []
|
|
persistidos_pag, novos_pag = sink.persist_items(itens)
|
|
docs_persistidos += persistidos_pag
|
|
paginas += 1
|
|
|
|
if paginas % commit_cada_paginas == 0:
|
|
sink.cn.commit()
|
|
|
|
print(
|
|
f"[sync][loja={store_code}] offset={offset} count={len(itens)} "
|
|
f"novos_pag={novos_pag} persistidos={docs_persistidos} total={total}"
|
|
)
|
|
if not pagina.get("hasNext"):
|
|
break
|
|
offset += limit
|
|
|
|
sink.cn.commit()
|
|
|
|
return {
|
|
"store_code": store_code,
|
|
"total": total,
|
|
"paginas_processadas": paginas,
|
|
"documentos_persistidos": docs_persistidos,
|
|
"offset_final": offset,
|
|
}
|
|
|
|
|
|
def sincronizar_incremental_sqlserver_loja(
|
|
connection_string: str,
|
|
store_code: str,
|
|
cp_id: int = 10269,
|
|
document_type: str = "EFAT",
|
|
limit: int = 100,
|
|
only_channels: Optional[Set[str]] = None,
|
|
max_paginas_sem_novidade: int = 3,
|
|
max_paginas: int = 20,
|
|
) -> Dict[str, Any]:
|
|
cli = ClientLoja(store_code)
|
|
offset = 0
|
|
paginas = 0
|
|
docs_persistidos = 0
|
|
docs_novos = 0
|
|
sem_novidade = 0
|
|
total = None
|
|
|
|
with trf.SqlServerSink(connection_string) as sink:
|
|
sink.ensure_schema()
|
|
while True:
|
|
pagina = cli.processar_pagina(
|
|
cp_id=cp_id,
|
|
document_type=document_type,
|
|
offset=offset,
|
|
limit=limit,
|
|
only_channels=only_channels,
|
|
)
|
|
if total is None:
|
|
total = int(pagina.get("total") or 0)
|
|
|
|
itens = pagina.get("items") or []
|
|
persistidos_pag, novos_pag = sink.persist_items(itens)
|
|
sink.cn.commit()
|
|
|
|
docs_persistidos += persistidos_pag
|
|
docs_novos += novos_pag
|
|
paginas += 1
|
|
sem_novidade = 0 if novos_pag > 0 else (sem_novidade + 1)
|
|
|
|
print(
|
|
f"[inc][loja={store_code}] offset={offset} count={len(itens)} "
|
|
f"novos_pag={novos_pag} sem_novidade={sem_novidade}/{max_paginas_sem_novidade} "
|
|
f"total_novos={docs_novos}"
|
|
)
|
|
|
|
if sem_novidade >= max_paginas_sem_novidade:
|
|
break
|
|
if paginas >= max_paginas:
|
|
break
|
|
if not pagina.get("hasNext"):
|
|
break
|
|
offset += limit
|
|
|
|
return {
|
|
"store_code": store_code,
|
|
"total": total,
|
|
"paginas_processadas": paginas,
|
|
"documentos_persistidos": docs_persistidos,
|
|
"documentos_novos": docs_novos,
|
|
"offset_final": offset,
|
|
"parada_por_sem_novidade": sem_novidade >= max_paginas_sem_novidade,
|
|
}
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
RUN_MODE = "full" # edite: full | incremental | json
|
|
if RUN_MODE not in ("full", "incremental", "json"):
|
|
raise RuntimeError("RUN_MODE invalido. Use: full, incremental ou json.")
|
|
|
|
TARGET_STORE_CODE = "24430" # edite: codigo da loja, ex: 20997
|
|
store_code = str(TARGET_STORE_CODE).strip()
|
|
if not store_code:
|
|
raise RuntimeError("Preencha TARGET_STORE_CODE no codigo.")
|
|
print(f"[info] consulta focada na loja {store_code}")
|
|
|
|
if RUN_MODE in ("full", "incremental"):
|
|
SQLSERVER_CONN = (
|
|
"DRIVER={ODBC Driver 17 for SQL Server};"
|
|
"SERVER=10.77.77.10;"
|
|
"DATABASE=GINSENG;"
|
|
"UID=andrey;"
|
|
"PWD=88253332;"
|
|
"TrustServerCertificate=yes;"
|
|
)
|
|
|
|
if RUN_MODE == "full":
|
|
resultado = sincronizar_paginas_sqlserver_loja(
|
|
connection_string=SQLSERVER_CONN,
|
|
store_code=store_code,
|
|
cp_id=10269,
|
|
document_type="EFAT",
|
|
limit=100,
|
|
start_offset=0,
|
|
only_channels=None,
|
|
commit_cada_paginas=1,
|
|
)
|
|
else:
|
|
resultado = sincronizar_incremental_sqlserver_loja(
|
|
connection_string=SQLSERVER_CONN,
|
|
store_code=store_code,
|
|
cp_id=10269,
|
|
document_type="EFAT",
|
|
limit=100,
|
|
only_channels=None,
|
|
max_paginas_sem_novidade=5,
|
|
max_paginas=15,
|
|
)
|
|
else:
|
|
c = ClientLoja(store_code)
|
|
resultado = c.processar_pagina(
|
|
cp_id=10269,
|
|
document_type="EFAT",
|
|
offset=0,
|
|
limit=25,
|
|
only_channels=None,
|
|
)
|
|
|
|
print(json.dumps(resultado, ensure_ascii=False, indent=2))
|
|
|