This commit is contained in:
Andrey Cunh@ 2026-03-02 16:02:01 -03:00
parent 912466bde4
commit 2d09ab55fd
3 changed files with 377 additions and 149 deletions

Binary file not shown.

View File

@ -2,9 +2,10 @@ import base64
import json import json
import re import re
import time import time
from contextlib import redirect_stderr
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from io import BytesIO from io import BytesIO, StringIO
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import requests import requests
@ -18,6 +19,7 @@ TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens"
STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores" STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores"
DEBIT_NOTES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/debit-notes/documents-list" DEBIT_NOTES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/debit-notes/documents-list"
HANDLE_IMAGES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/handle-images" HANDLE_IMAGES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/handle-images"
FRANCHISES_LIST_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/franchises/list/franchise"
class UnauthorizedTokenError(RuntimeError): class UnauthorizedTokenError(RuntimeError):
@ -104,6 +106,39 @@ def get_franchise_ids(session: requests.Session, auth: Auth) -> List[str]:
return out return out
def get_franchise_ids_from_sf(session: requests.Session, auth: Auth, cp_id: int) -> List[str]:
    """Return the distinct franchise SAP codes registered in the SF fiscal API for *cp_id*.

    Queries FRANCHISES_LIST_URL, retrying up to 4 times on 401/403 by forcing a
    token refresh through *auth* with a short backoff. Any remaining HTTP error
    (including a persistent 401/403) is raised via raise_for_status(). The
    response body may be either a bare list or a ``{"data": [...]}`` envelope;
    any other shape is rejected.

    Raises:
        requests.HTTPError: on a non-auth HTTP failure or persistent 401/403.
        RuntimeError: when the response body has an unexpected shape.
    """
    response: Optional[requests.Response] = None
    for attempt in range(4):
        response = session.get(FRANCHISES_LIST_URL, headers=_headers(auth, content_type=False), timeout=60)
        if response.status_code not in (401, 403):
            break
        print(f"[warn] token invalido ao listar franchises do sf (tentativa {attempt + 1}/4), renovando token...")
        auth.invalidate()
        auth.get_bearer(force_refresh=True)
        time.sleep(min(3, attempt + 1))
    if response is None:
        # Defensive only: range(4) always executes at least once. An explicit
        # raise (instead of `assert`) survives `python -O`.
        raise RuntimeError("Nenhuma resposta obtida de /v1/franchises/list/franchise")
    response.raise_for_status()

    body = response.json()
    if isinstance(body, list):
        items = body
    elif isinstance(body, dict) and isinstance(body.get("data"), list):
        items = body["data"]
    else:
        raise RuntimeError("Resposta inesperada em /v1/franchises/list/franchise")

    # Keep only entries for the requested CP, deduplicating SAP codes while
    # preserving first-seen order.
    out: List[str] = []
    seen: set = set()
    for item in items:
        if not isinstance(item, dict):
            continue  # tolerate malformed entries instead of crashing on .get
        if str(item.get("cpId") or "") != str(cp_id):
            continue
        code = str(item.get("sapCode") or "").strip()
        if code and code not in seen:
            seen.add(code)
            out.append(code)
    return out
def get_debit_notes_page( def get_debit_notes_page(
session: requests.Session, session: requests.Session,
auth: Auth, auth: Auth,
@ -212,6 +247,14 @@ def extract_pdf_text(pdf_bytes: bytes) -> str:
return "\n".join(text).strip() return "\n".join(text).strip()
def extract_pdf_text_with_diagnostics(pdf_bytes: bytes) -> tuple[str, List[str]]:
    """Extract PDF text while capturing diagnostics the extractor writes to stderr.

    Returns a ``(text, diagnostics)`` pair, where *diagnostics* is the list of
    non-blank, stripped lines emitted on ``sys.stderr`` during extraction.

    NOTE(review): ``redirect_stderr`` intercepts only Python-level
    ``sys.stderr`` writes, not output written straight to file descriptor 2
    by C extensions — confirm the PDF backend warns via ``sys.stderr``.
    """
    captured = StringIO()
    with redirect_stderr(captured):
        extracted = extract_pdf_text(pdf_bytes)
    diagnostics: List[str] = []
    for raw in captured.getvalue().splitlines():
        stripped = raw.strip()
        if stripped:
            diagnostics.append(stripped)
    return extracted, diagnostics
def parse_money_br(value: Optional[str]) -> Optional[float]: def parse_money_br(value: Optional[str]) -> Optional[float]:
if not value: if not value:
return None return None
@ -252,6 +295,7 @@ def numero_parcela_from_text(value: Optional[str], fallback: int) -> int:
def parse_pdf_fields(text: str) -> Dict[str, Any]: def parse_pdf_fields(text: str) -> Dict[str, Any]:
m_cliente = re.search(r"Cliente:\s*(\d+)", text, flags=re.IGNORECASE) m_cliente = re.search(r"Cliente:\s*(\d+)", text, flags=re.IGNORECASE)
m_cnpj = re.search(r"CNPJ\s*:\s*([0-9.\-\/]+)", text, flags=re.IGNORECASE)
m_nota = re.search(r"NOTA\s+DE\s+D[ÉE]BITO\s*:\s*([A-Z0-9-]+)", text, flags=re.IGNORECASE) m_nota = re.search(r"NOTA\s+DE\s+D[ÉE]BITO\s*:\s*([A-Z0-9-]+)", text, flags=re.IGNORECASE)
m_emissao = re.search( m_emissao = re.search(
r"Data\s+(?:de\s+)?emiss[aã]o\s*:\s*(\d{2}\.\d{2}\.\d{4})", r"Data\s+(?:de\s+)?emiss[aã]o\s*:\s*(\d{2}\.\d{2}\.\d{4})",
@ -302,6 +346,7 @@ def parse_pdf_fields(text: str) -> Dict[str, Any]:
return { return {
"cliente": m_cliente.group(1) if m_cliente else None, "cliente": m_cliente.group(1) if m_cliente else None,
"cnpj": m_cnpj.group(1).strip() if m_cnpj else None,
"notaDebito": m_nota.group(1) if m_nota else None, "notaDebito": m_nota.group(1) if m_nota else None,
"dataEmissao": m_emissao.group(1) if m_emissao else None, "dataEmissao": m_emissao.group(1) if m_emissao else None,
"valorTotalDebito": m_total.group(1) if m_total else None, "valorTotalDebito": m_total.group(1) if m_total else None,
@ -327,8 +372,29 @@ def upsert_rows_sqlserver(rows: List[Dict[str, Any]], connection_string: str) ->
cur = cn.cursor() cur = cn.cursor()
docs = 0 docs = 0
pars = 0 pars = 0
has_denominacao_col = False
has_cnpj_col = False
try: try:
cur.execute(
"""
SELECT 1
FROM sys.columns
WHERE object_id = OBJECT_ID('dbo.TrfDocumento')
AND name = 'Denominacao'
"""
)
has_denominacao_col = cur.fetchone() is not None
cur.execute(
"""
SELECT 1
FROM sys.columns
WHERE object_id = OBJECT_ID('dbo.TrfDocumento')
AND name = 'CNPJ'
"""
)
has_cnpj_col = cur.fetchone() is not None
for row in rows: for row in rows:
id_externo = row.get("id") id_externo = row.get("id")
if id_externo is None: if id_externo is None:
@ -339,6 +405,65 @@ def upsert_rows_sqlserver(rows: List[Dict[str, Any]], connection_string: str) ->
found = cur.fetchone() found = cur.fetchone()
if found: if found:
doc_id = int(found[0]) doc_id = int(found[0])
if has_denominacao_col and has_cnpj_col:
cur.execute(
"""
UPDATE dbo.TrfDocumento
SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?,
EmissaoNF=?, NotaFiscal=?, Denominacao=?, CNPJ=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME()
WHERE id=?
""",
int(id_externo),
str(row.get("franchiseId") or "")[:20] or None,
str(row.get("imageName") or "")[:150] or None,
parse_date_br(row.get("dataEmissao")),
parse_date_br(row.get("dataEmissao")),
str(row.get("notaDebito") or "")[:40] or None,
str(row.get("denominacao") or "")[:255] or None,
str(row.get("cnpj") or "")[:20] or None,
row.get("valorTotalDebitoNum"),
0.0,
doc_id,
)
elif has_denominacao_col:
cur.execute(
"""
UPDATE dbo.TrfDocumento
SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?,
EmissaoNF=?, NotaFiscal=?, Denominacao=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME()
WHERE id=?
""",
int(id_externo),
str(row.get("franchiseId") or "")[:20] or None,
str(row.get("imageName") or "")[:150] or None,
parse_date_br(row.get("dataEmissao")),
parse_date_br(row.get("dataEmissao")),
str(row.get("notaDebito") or "")[:40] or None,
str(row.get("denominacao") or "")[:255] or None,
row.get("valorTotalDebitoNum"),
0.0,
doc_id,
)
elif has_cnpj_col:
cur.execute(
"""
UPDATE dbo.TrfDocumento
SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?,
EmissaoNF=?, NotaFiscal=?, CNPJ=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME()
WHERE id=?
""",
int(id_externo),
str(row.get("franchiseId") or "")[:20] or None,
str(row.get("imageName") or "")[:150] or None,
parse_date_br(row.get("dataEmissao")),
parse_date_br(row.get("dataEmissao")),
str(row.get("notaDebito") or "")[:40] or None,
str(row.get("cnpj") or "")[:20] or None,
row.get("valorTotalDebitoNum"),
0.0,
doc_id,
)
else:
cur.execute( cur.execute(
""" """
UPDATE dbo.TrfDocumento UPDATE dbo.TrfDocumento
@ -356,6 +481,68 @@ WHERE id=?
0.0, 0.0,
doc_id, doc_id,
) )
else:
if has_denominacao_col and has_cnpj_col:
cur.execute(
"""
INSERT INTO dbo.TrfDocumento (
UUID, IdExterno, FranchiseId, ImageName, EmissionDate,
EmissaoNF, NotaFiscal, Denominacao, CNPJ, ValorNF, Encargos
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
uuid,
int(id_externo),
str(row.get("franchiseId") or "")[:20] or None,
str(row.get("imageName") or "")[:150] or None,
parse_date_br(row.get("dataEmissao")),
parse_date_br(row.get("dataEmissao")),
str(row.get("notaDebito") or "")[:40] or None,
str(row.get("denominacao") or "")[:255] or None,
str(row.get("cnpj") or "")[:20] or None,
row.get("valorTotalDebitoNum"),
0.0,
)
elif has_denominacao_col:
cur.execute(
"""
INSERT INTO dbo.TrfDocumento (
UUID, IdExterno, FranchiseId, ImageName, EmissionDate,
EmissaoNF, NotaFiscal, Denominacao, ValorNF, Encargos
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
uuid,
int(id_externo),
str(row.get("franchiseId") or "")[:20] or None,
str(row.get("imageName") or "")[:150] or None,
parse_date_br(row.get("dataEmissao")),
parse_date_br(row.get("dataEmissao")),
str(row.get("notaDebito") or "")[:40] or None,
str(row.get("denominacao") or "")[:255] or None,
row.get("valorTotalDebitoNum"),
0.0,
)
elif has_cnpj_col:
cur.execute(
"""
INSERT INTO dbo.TrfDocumento (
UUID, IdExterno, FranchiseId, ImageName, EmissionDate,
EmissaoNF, NotaFiscal, CNPJ, ValorNF, Encargos
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
uuid,
int(id_externo),
str(row.get("franchiseId") or "")[:20] or None,
str(row.get("imageName") or "")[:150] or None,
parse_date_br(row.get("dataEmissao")),
parse_date_br(row.get("dataEmissao")),
str(row.get("notaDebito") or "")[:40] or None,
str(row.get("cnpj") or "")[:20] or None,
row.get("valorTotalDebitoNum"),
0.0,
)
else: else:
cur.execute( cur.execute(
""" """
@ -450,8 +637,10 @@ def main() -> None:
# Fluxo fixo: leitura da API -> extracao do PDF em memoria -> upsert SQL. # Fluxo fixo: leitura da API -> extracao do PDF em memoria -> upsert SQL.
CP_ID = 10269 CP_ID = 10269
TAKE = 25 TAKE = 25
MAX_PAGINAS_RECENTES = 50 # Modo diario: varre apenas as paginas mais recentes por franquia.
MAX_PAGINAS_RECENTES = 5
MAX_PAGINAS_SEM_NOVIDADE = 5 MAX_PAGINAS_SEM_NOVIDADE = 5
SKIP_EXISTENTE_MESMO_IMAGENAME = True
DOCUMENT_TYPE = "NDEB" DOCUMENT_TYPE = "NDEB"
SQL_CONN = ( SQL_CONN = (
"DRIVER={ODBC Driver 17 for SQL Server};" "DRIVER={ODBC Driver 17 for SQL Server};"
@ -466,13 +655,18 @@ def main() -> None:
s = requests.Session() s = requests.Session()
s.trust_env = False s.trust_env = False
a = Auth(s) a = Auth(s)
f = get_franchise_ids(s, a) f = get_franchise_ids_from_sf(s, a, CP_ID)
return s, a, f return s, a, f
session, auth, franchises = _new_client() session, auth, all_franchises = _new_client()
target_franchises = list(all_franchises)
total_docs_upsert = 0 total_docs_upsert = 0
total_parcs_upsert = 0 total_parcs_upsert = 0
skip = 0 # para testes, pular os primeiros 900 registros (36 paginas) e ir direto para os mais recentes. Ajustar para 0 para rodar do inicio. O endpoint suporta skip alto, mas pode ser mais lento. O ideal é rodar periodicamente com skip=0 para pegar os novos registros. font_warning_files: List[str] = []
for idx_fr, franchise_code in enumerate(target_franchises, start=1):
print(f"[franchise] iniciando {idx_fr}/{len(target_franchises)} franchiseId={franchise_code}")
skip = 0
skip_inicial = skip skip_inicial = skip
total = None total = None
pagina = 0 pagina = 0
@ -480,9 +674,12 @@ def main() -> None:
relogins = 0 relogins = 0
max_relogins = 20 max_relogins = 20
paginas_sem_novidade = 0 paginas_sem_novidade = 0
docs_upsert_fr = 0
parcs_upsert_fr = 0
while True: while True:
try: try:
page = get_debit_notes_page(session, auth, CP_ID, skip, TAKE, franchises) page = get_debit_notes_page(session, auth, CP_ID, skip, TAKE, [franchise_code])
except UnauthorizedTokenError as e: except UnauthorizedTokenError as e:
relogins += 1 relogins += 1
if relogins > max_relogins: if relogins > max_relogins:
@ -490,10 +687,10 @@ def main() -> None:
f"Falha apos {max_relogins} relogins. Ultimo erro: {e}" f"Falha apos {max_relogins} relogins. Ultimo erro: {e}"
) from e ) from e
print( print(
f"[relogin] 401 persistente em skip={skip}. " f"[relogin] 401 persistente em skip={skip} franchiseId={franchise_code}. "
f"Refazendo sessao/token ({relogins}/{max_relogins})..." f"Refazendo sessao/token ({relogins}/{max_relogins})..."
) )
session, auth, franchises = _new_client() session, auth, _ = _new_client()
time.sleep(2) time.sleep(2)
continue continue
if total is None: if total is None:
@ -503,9 +700,12 @@ def main() -> None:
total = 0 total = 0
total_paginas = (total + TAKE - 1) // TAKE if total > 0 else None total_paginas = (total + TAKE - 1) // TAKE if total > 0 else None
if total_paginas: if total_paginas:
print(f"[info] total_registros={total} total_paginas={total_paginas} take={TAKE}") print(
f"[info] franchiseId={franchise_code} total_registros={total} "
f"total_paginas={total_paginas} take={TAKE}"
)
else: else:
print(f"[info] total_registros={total} take={TAKE}") print(f"[info] franchiseId={franchise_code} total_registros={total} take={TAKE}")
docs = page.get("documentsList") or [] docs = page.get("documentsList") or []
if not docs: if not docs:
@ -520,13 +720,21 @@ def main() -> None:
if total_paginas: if total_paginas:
if total_paginas_restantes is not None: if total_paginas_restantes is not None:
print( print(
f"[page] baixando pagina_execucao={pagina}/{total_paginas_restantes} " f"[page] franchiseId={franchise_code} "
f"baixando pagina_execucao={pagina}/{total_paginas_restantes} "
f"pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})" f"pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})"
) )
else: else:
print(f"[page] baixando pagina_execucao={pagina} pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})") print(
f"[page] franchiseId={franchise_code} "
f"baixando pagina_execucao={pagina} pagina_global={pagina_global}/{total_paginas} "
f"(itens={len(docs)})"
)
else: else:
print(f"[page] baixando pagina_execucao={pagina} pagina_global={pagina_global} (itens={len(docs)})") print(
f"[page] franchiseId={franchise_code} "
f"baixando pagina_execucao={pagina} pagina_global={pagina_global} (itens={len(docs)})"
)
doc_ids_page: List[int] = [] doc_ids_page: List[int] = []
for d in docs: for d in docs:
@ -543,7 +751,7 @@ def main() -> None:
franchise_id = str(d.get("franchiseId") or "").strip() franchise_id = str(d.get("franchiseId") or "").strip()
image_name = str(d.get("imageName") or "").strip() image_name = str(d.get("imageName") or "").strip()
existing_image_name = existing_map.get(doc_id) existing_image_name = existing_map.get(doc_id)
if doc_id in existing_map and (existing_image_name or "") == image_name: if SKIP_EXISTENTE_MESMO_IMAGENAME and doc_id in existing_map and (existing_image_name or "") == image_name:
skipped_existing += 1 skipped_existing += 1
print(f"[skip] {doc_id} ja existe no SQL com mesmo imageName") print(f"[skip] {doc_id} ja existe no SQL com mesmo imageName")
continue continue
@ -557,7 +765,11 @@ def main() -> None:
image_name, image_name,
) )
pdf_bytes = download_pdf_bytes(session, dl_url) pdf_bytes = download_pdf_bytes(session, dl_url)
txt = extract_pdf_text(pdf_bytes) txt, diag_lines = extract_pdf_text_with_diagnostics(pdf_bytes)
has_font_warn = any("FontBBox" in ln for ln in diag_lines)
if has_font_warn:
font_warning_files.append(image_name)
print(f"[warn-font] {doc_id} -> {image_name}")
parsed = parse_pdf_fields(txt) parsed = parse_pdf_fields(txt)
page_rows.append( page_rows.append(
{ {
@ -574,12 +786,16 @@ def main() -> None:
novos_na_pagina = len(page_rows) novos_na_pagina = len(page_rows)
if page_rows: if page_rows:
stats_page = upsert_rows_sqlserver(page_rows, SQL_CONN) stats_page = upsert_rows_sqlserver(page_rows, SQL_CONN)
total_docs_upsert += int(stats_page.get("documentos") or 0) docs_page = int(stats_page.get("documentos") or 0)
total_parcs_upsert += int(stats_page.get("parcelas") or 0) parcs_page = int(stats_page.get("parcelas") or 0)
total_docs_upsert += docs_page
total_parcs_upsert += parcs_page
docs_upsert_fr += docs_page
parcs_upsert_fr += parcs_page
print( print(
f"[sql] pagina={pagina} docs_upsert={stats_page['documentos']} " f"[sql] franchiseId={franchise_code} pagina={pagina} docs_upsert={docs_page} "
f"parc_upsert={stats_page['parcelas']} acumulado_docs={total_docs_upsert} " f"parc_upsert={parcs_page} acumulado_fr_docs={docs_upsert_fr} "
f"acumulado_parc={total_parcs_upsert}" f"acumulado_fr_parc={parcs_upsert_fr}"
) )
if novos_na_pagina == 0: if novos_na_pagina == 0:
@ -588,25 +804,37 @@ def main() -> None:
paginas_sem_novidade = 0 paginas_sem_novidade = 0
print( print(
f"[page] pagina={pagina} novos={novos_na_pagina} skip_sql={skipped_existing} " f"[page] franchiseId={franchise_code} pagina={pagina} novos={novos_na_pagina} "
f"sem_novidade={paginas_sem_novidade}/{MAX_PAGINAS_SEM_NOVIDADE}" f"skip_sql={skipped_existing} sem_novidade={paginas_sem_novidade}/{MAX_PAGINAS_SEM_NOVIDADE}"
) )
skip += TAKE skip += TAKE
print(f"[page] concluida {pagina} acumulado_docs={total_docs_upsert} total={total}") print(
f"[page] franchiseId={franchise_code} concluida {pagina} "
f"acumulado_fr_docs={docs_upsert_fr} total={total}"
)
if pagina >= MAX_PAGINAS_RECENTES: if pagina >= MAX_PAGINAS_RECENTES:
print(f"[stop] limite diario atingido: {MAX_PAGINAS_RECENTES} paginas recentes") print(f"[stop] franchiseId={franchise_code} limite diario atingido: {MAX_PAGINAS_RECENTES} paginas recentes")
break break
if paginas_sem_novidade >= MAX_PAGINAS_SEM_NOVIDADE: if paginas_sem_novidade >= MAX_PAGINAS_SEM_NOVIDADE:
print(f"[stop] sem novidades por {MAX_PAGINAS_SEM_NOVIDADE} paginas consecutivas") print(f"[stop] franchiseId={franchise_code} sem novidades por {MAX_PAGINAS_SEM_NOVIDADE} paginas consecutivas")
break break
if total and skip >= total: if total and skip >= total:
break break
print(
f"[franchise] concluida franchiseId={franchise_code} "
f"docs_upsert={docs_upsert_fr} parcelas_upsert={parcs_upsert_fr}"
)
if font_warning_files:
uniq_font_warn = sorted(set(font_warning_files))
print(f"[warn-font] total_arquivos_com_warning={len(uniq_font_warn)}")
for name in uniq_font_warn:
print(f"[warn-font] arquivo={name}")
print(f"SQL upsert final -> documentos={total_docs_upsert} parcelas={total_parcs_upsert}") print(f"SQL upsert final -> documentos={total_docs_upsert} parcelas={total_parcs_upsert}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()