diff --git a/__pycache__/debit_notes_list_test.cpython-313.pyc b/__pycache__/debit_notes_list_test.cpython-313.pyc new file mode 100644 index 0000000..b2d4117 Binary files /dev/null and b/__pycache__/debit_notes_list_test.cpython-313.pyc differ diff --git a/__pycache__/debit_notes_pdf_reader.cpython-313.pyc b/__pycache__/debit_notes_pdf_reader.cpython-313.pyc index 85a36a2..f073ef2 100644 Binary files a/__pycache__/debit_notes_pdf_reader.cpython-313.pyc and b/__pycache__/debit_notes_pdf_reader.cpython-313.pyc differ diff --git a/debit_notes_pdf_reader.py b/debit_notes_pdf_reader.py index 964d486..38a2b6e 100644 --- a/debit_notes_pdf_reader.py +++ b/debit_notes_pdf_reader.py @@ -2,9 +2,10 @@ import base64 import json import re import time +from contextlib import redirect_stderr from dataclasses import dataclass from datetime import datetime -from io import BytesIO +from io import BytesIO, StringIO from typing import Any, Dict, List, Optional import requests @@ -18,6 +19,7 @@ TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens" STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores" DEBIT_NOTES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/debit-notes/documents-list" HANDLE_IMAGES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/handle-images" +FRANCHISES_LIST_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/franchises/list/franchise" class UnauthorizedTokenError(RuntimeError): @@ -104,6 +106,39 @@ def get_franchise_ids(session: requests.Session, auth: Auth) -> List[str]: return out +def get_franchise_ids_from_sf(session: requests.Session, auth: Auth, cp_id: int) -> List[str]: + r = None + for attempt in range(4): + r = session.get(FRANCHISES_LIST_URL, headers=_headers(auth, content_type=False), timeout=60) + if r.status_code not in (401, 403): + break + print(f"[warn] token invalido ao listar franchises do sf (tentativa {attempt + 1}/4), renovando token...") + auth.invalidate() + auth.get_bearer(force_refresh=True) + time.sleep(min(3, attempt + 1)) + + assert r is not None + r.raise_for_status() + body = r.json() + if isinstance(body, list): + items = body + elif isinstance(body, dict) and isinstance(body.get("data"), list): + items = body.get("data") + else: + raise RuntimeError("Resposta inesperada em /v1/franchises/list/franchise") + + out: List[str] = [] + seen = set() + for item in items: + if str(item.get("cpId") or "") != str(cp_id): + continue + code = str(item.get("sapCode") or "").strip() + if code and code not in seen: + seen.add(code) + out.append(code) + return out + + def get_debit_notes_page( session: requests.Session, auth: Auth, @@ -212,6 +247,14 @@ def extract_pdf_text(pdf_bytes: bytes) -> str: return "\n".join(text).strip() +def extract_pdf_text_with_diagnostics(pdf_bytes: bytes) -> tuple[str, List[str]]: + buf = StringIO() + with redirect_stderr(buf): + text = extract_pdf_text(pdf_bytes) + lines = [ln.strip() for ln in buf.getvalue().splitlines() if ln.strip()] + return text, lines + + def parse_money_br(value: Optional[str]) -> Optional[float]: if not value: return None @@ -252,6 +295,7 @@ def numero_parcela_from_text(value: Optional[str], fallback: int) -> int: def parse_pdf_fields(text: str) -> Dict[str, Any]: m_cliente = re.search(r"Cliente:\s*(\d+)", text, flags=re.IGNORECASE) + m_cnpj = re.search(r"CNPJ\s*:\s*([0-9.\-\/]+)", text, flags=re.IGNORECASE) m_nota = re.search(r"NOTA\s+DE\s+D[ÉE]BITO\s*:\s*([A-Z0-9-]+)", text, flags=re.IGNORECASE) m_emissao = re.search( r"Data\s+(?:de\s+)?emiss[aã]o\s*:\s*(\d{2}\.\d{2}\.\d{4})", @@ -302,6 +346,7 @@ def parse_pdf_fields(text: str) -> Dict[str, Any]: return { "cliente": m_cliente.group(1) if m_cliente else None, + "cnpj": m_cnpj.group(1).strip() if m_cnpj else None, "notaDebito": m_nota.group(1) if m_nota else None, "dataEmissao": m_emissao.group(1) if m_emissao else None, "valorTotalDebito": m_total.group(1) if m_total else None, @@ -327,8 +372,29 @@ def upsert_rows_sqlserver(rows: List[Dict[str, Any]], connection_string: str) -> cur = cn.cursor() docs = 0 pars = 0 + has_denominacao_col = False + has_cnpj_col = False try: + cur.execute( + """ +SELECT 1 +FROM sys.columns +WHERE object_id = OBJECT_ID('dbo.TrfDocumento') + AND name = 'Denominacao' + """ + ) + has_denominacao_col = cur.fetchone() is not None + cur.execute( + """ +SELECT 1 +FROM sys.columns +WHERE object_id = OBJECT_ID('dbo.TrfDocumento') + AND name = 'CNPJ' + """ + ) + has_cnpj_col = cur.fetchone() is not None + for row in rows: id_externo = row.get("id") if id_externo is None: @@ -339,42 +405,163 @@ def upsert_rows_sqlserver(rows: List[Dict[str, Any]], connection_string: str) -> found = cur.fetchone() if found: doc_id = int(found[0]) - cur.execute( - """ + if has_denominacao_col and has_cnpj_col: + cur.execute( + """ +UPDATE dbo.TrfDocumento +SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?, + EmissaoNF=?, NotaFiscal=?, Denominacao=?, CNPJ=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME() +WHERE id=? + """, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + str(row.get("denominacao") or "")[:255] or None, + str(row.get("cnpj") or "")[:20] or None, + row.get("valorTotalDebitoNum"), + 0.0, + doc_id, + ) + elif has_denominacao_col: + cur.execute( + """ +UPDATE dbo.TrfDocumento +SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?, + EmissaoNF=?, NotaFiscal=?, Denominacao=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME() +WHERE id=? + """, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + str(row.get("denominacao") or "")[:255] or None, + row.get("valorTotalDebitoNum"), + 0.0, + doc_id, + ) + elif has_cnpj_col: + cur.execute( + """ +UPDATE dbo.TrfDocumento +SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?, + EmissaoNF=?, NotaFiscal=?, CNPJ=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME() +WHERE id=? + """, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + str(row.get("cnpj") or "")[:20] or None, + row.get("valorTotalDebitoNum"), + 0.0, + doc_id, + ) + else: + cur.execute( + """ UPDATE dbo.TrfDocumento SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?, EmissaoNF=?, NotaFiscal=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME() WHERE id=? - """, - int(id_externo), - str(row.get("franchiseId") or "")[:20] or None, - str(row.get("imageName") or "")[:150] or None, - parse_date_br(row.get("dataEmissao")), - parse_date_br(row.get("dataEmissao")), - str(row.get("notaDebito") or "")[:40] or None, - row.get("valorTotalDebitoNum"), - 0.0, - doc_id, - ) + """, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + row.get("valorTotalDebitoNum"), + 0.0, + doc_id, + ) else: - cur.execute( - """ + if has_denominacao_col and has_cnpj_col: + cur.execute( + """ +INSERT INTO dbo.TrfDocumento ( + UUID, IdExterno, FranchiseId, ImageName, EmissionDate, + EmissaoNF, NotaFiscal, Denominacao, CNPJ, ValorNF, Encargos +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + uuid, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + str(row.get("denominacao") or "")[:255] or None, + str(row.get("cnpj") or "")[:20] or None, + row.get("valorTotalDebitoNum"), + 0.0, + ) + elif has_denominacao_col: + cur.execute( + """ +INSERT INTO dbo.TrfDocumento ( + UUID, IdExterno, FranchiseId, ImageName, EmissionDate, + EmissaoNF, NotaFiscal, Denominacao, ValorNF, Encargos +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + uuid, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + str(row.get("denominacao") or "")[:255] or None, + row.get("valorTotalDebitoNum"), + 0.0, + ) + elif has_cnpj_col: + cur.execute( + """ +INSERT INTO dbo.TrfDocumento ( + UUID, IdExterno, FranchiseId, ImageName, EmissionDate, + EmissaoNF, NotaFiscal, CNPJ, ValorNF, Encargos +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + uuid, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + str(row.get("cnpj") or "")[:20] or None, + row.get("valorTotalDebitoNum"), + 0.0, + ) + else: + cur.execute( + """ INSERT INTO dbo.TrfDocumento ( UUID, IdExterno, FranchiseId, ImageName, EmissionDate, EmissaoNF, NotaFiscal, ValorNF, Encargos ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - uuid, - int(id_externo), - str(row.get("franchiseId") or "")[:20] or None, - str(row.get("imageName") or "")[:150] or None, - parse_date_br(row.get("dataEmissao")), - parse_date_br(row.get("dataEmissao")), - str(row.get("notaDebito") or "")[:40] or None, - row.get("valorTotalDebitoNum"), - 0.0, - ) + """, + uuid, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + row.get("valorTotalDebitoNum"), + 0.0, + ) cur.execute("SELECT id FROM dbo.TrfDocumento WHERE UUID = ?", uuid) got = cur.fetchone() if not got: @@ -450,8 +637,10 @@ def main() -> None: # Fluxo fixo: leitura da API -> extracao do PDF em memoria -> upsert SQL. CP_ID = 10269 TAKE = 25 - MAX_PAGINAS_RECENTES = 50 + # Modo diario: varre apenas as paginas mais recentes por franquia. + MAX_PAGINAS_RECENTES = 5 MAX_PAGINAS_SEM_NOVIDADE = 5 + SKIP_EXISTENTE_MESMO_IMAGENAME = True DOCUMENT_TYPE = "NDEB" SQL_CONN = ( "DRIVER={ODBC Driver 17 for SQL Server};" @@ -466,147 +655,186 @@ def main() -> None: s = requests.Session() s.trust_env = False a = Auth(s) - f = get_franchise_ids(s, a) + f = get_franchise_ids_from_sf(s, a, CP_ID) return s, a, f - session, auth, franchises = _new_client() + session, auth, all_franchises = _new_client() + target_franchises = list(all_franchises) total_docs_upsert = 0 total_parcs_upsert = 0 - skip = 0 # para testes, pular os primeiros 900 registros (36 paginas) e ir direto para os mais recentes. Ajustar para 0 para rodar do inicio. O endpoint suporta skip alto, mas pode ser mais lento. O ideal é rodar periodicamente com skip=0 para pegar os novos registros. - skip_inicial = skip - total = None - pagina = 0 - total_paginas = None - relogins = 0 - max_relogins = 20 - paginas_sem_novidade = 0 - while True: - try: - page = get_debit_notes_page(session, auth, CP_ID, skip, TAKE, franchises) - except UnauthorizedTokenError as e: - relogins += 1 - if relogins > max_relogins: - raise RuntimeError( - f"Falha apos {max_relogins} relogins. Ultimo erro: {e}" - ) from e - print( - f"[relogin] 401 persistente em skip={skip}. " - f"Refazendo sessao/token ({relogins}/{max_relogins})..." - ) - session, auth, franchises = _new_client() - time.sleep(2) - continue - if total is None: + font_warning_files: List[str] = [] + + for idx_fr, franchise_code in enumerate(target_franchises, start=1): + print(f"[franchise] iniciando {idx_fr}/{len(target_franchises)} franchiseId={franchise_code}") + skip = 0 + skip_inicial = skip + total = None + pagina = 0 + total_paginas = None + relogins = 0 + max_relogins = 20 + paginas_sem_novidade = 0 + docs_upsert_fr = 0 + parcs_upsert_fr = 0 + + while True: try: - total = int(page.get("documentsTotal") or 0) - except Exception: - total = 0 - total_paginas = (total + TAKE - 1) // TAKE if total > 0 else None - if total_paginas: - print(f"[info] total_registros={total} total_paginas={total_paginas} take={TAKE}") - else: - print(f"[info] total_registros={total} take={TAKE}") - - docs = page.get("documentsList") or [] - if not docs: - break - - pagina += 1 - pagina_global = (skip // TAKE) + 1 - total_paginas_restantes = None - if total is not None and total > 0: - restantes = max(0, total - skip_inicial) - total_paginas_restantes = (restantes + TAKE - 1) // TAKE if restantes > 0 else 0 - if total_paginas: - if total_paginas_restantes is not None: + page = get_debit_notes_page(session, auth, CP_ID, skip, TAKE, [franchise_code]) + except UnauthorizedTokenError as e: + relogins += 1 + if relogins > max_relogins: + raise RuntimeError( + f"Falha apos {max_relogins} relogins. Ultimo erro: {e}" + ) from e print( - f"[page] baixando pagina_execucao={pagina}/{total_paginas_restantes} " - f"pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})" + f"[relogin] 401 persistente em skip={skip} franchiseId={franchise_code}. " + f"Refazendo sessao/token ({relogins}/{max_relogins})..." ) + session, auth, _ = _new_client() + time.sleep(2) + continue + if total is None: + try: + total = int(page.get("documentsTotal") or 0) + except Exception: + total = 0 + total_paginas = (total + TAKE - 1) // TAKE if total > 0 else None + if total_paginas: + print( + f"[info] franchiseId={franchise_code} total_registros={total} " + f"total_paginas={total_paginas} take={TAKE}" + ) + else: + print(f"[info] franchiseId={franchise_code} total_registros={total} take={TAKE}") + + docs = page.get("documentsList") or [] + if not docs: + break + + pagina += 1 + pagina_global = (skip // TAKE) + 1 + total_paginas_restantes = None + if total is not None and total > 0: + restantes = max(0, total - skip_inicial) + total_paginas_restantes = (restantes + TAKE - 1) // TAKE if restantes > 0 else 0 + if total_paginas: + if total_paginas_restantes is not None: + print( + f"[page] franchiseId={franchise_code} " + f"baixando pagina_execucao={pagina}/{total_paginas_restantes} " + f"pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})" + ) + else: + print( + f"[page] franchiseId={franchise_code} " + f"baixando pagina_execucao={pagina} pagina_global={pagina_global}/{total_paginas} " + f"(itens={len(docs)})" + ) else: - print(f"[page] baixando pagina_execucao={pagina} pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})") - else: - print(f"[page] baixando pagina_execucao={pagina} pagina_global={pagina_global} (itens={len(docs)})") - - doc_ids_page: List[int] = [] - for d in docs: - try: - doc_ids_page.append(int(d.get("id"))) - except Exception: - continue - existing_map = get_existing_docs_map_sqlserver(doc_ids_page, SQL_CONN) - - page_rows: List[Dict[str, Any]] = [] - skipped_existing = 0 - for d in docs: - doc_id = int(d.get("id")) - franchise_id = str(d.get("franchiseId") or "").strip() - image_name = str(d.get("imageName") or "").strip() - existing_image_name = existing_map.get(doc_id) - if doc_id in existing_map and (existing_image_name or "") == image_name: - skipped_existing += 1 - print(f"[skip] {doc_id} ja existe no SQL com mesmo imageName") - continue - try: - dl_url = get_download_url( - session, - auth, - DOCUMENT_TYPE, - franchise_id, - doc_id, # para esse endpoint usa "id" do documento - image_name, + print( + f"[page] franchiseId={franchise_code} " + f"baixando pagina_execucao={pagina} pagina_global={pagina_global} (itens={len(docs)})" ) - pdf_bytes = download_pdf_bytes(session, dl_url) - txt = extract_pdf_text(pdf_bytes) - parsed = parse_pdf_fields(txt) - page_rows.append( - { - "id": doc_id, - "franchiseId": franchise_id, - "imageName": image_name, - **parsed, - } - ) - print(f"[ok] {doc_id} -> {image_name}") - except Exception as e: - print(f"[erro] {doc_id} -> {e}") - novos_na_pagina = len(page_rows) - if page_rows: - stats_page = upsert_rows_sqlserver(page_rows, SQL_CONN) - total_docs_upsert += int(stats_page.get("documentos") or 0) - total_parcs_upsert += int(stats_page.get("parcelas") or 0) + doc_ids_page: List[int] = [] + for d in docs: + try: + doc_ids_page.append(int(d.get("id"))) + except Exception: + continue + existing_map = get_existing_docs_map_sqlserver(doc_ids_page, SQL_CONN) + + page_rows: List[Dict[str, Any]] = [] + skipped_existing = 0 + for d in docs: + doc_id = int(d.get("id")) + franchise_id = str(d.get("franchiseId") or "").strip() + image_name = str(d.get("imageName") or "").strip() + existing_image_name = existing_map.get(doc_id) + if SKIP_EXISTENTE_MESMO_IMAGENAME and doc_id in existing_map and (existing_image_name or "") == image_name: + skipped_existing += 1 + print(f"[skip] {doc_id} ja existe no SQL com mesmo imageName") + continue + try: + dl_url = get_download_url( + session, + auth, + DOCUMENT_TYPE, + franchise_id, + doc_id, # para esse endpoint usa "id" do documento + image_name, + ) + pdf_bytes = download_pdf_bytes(session, dl_url) + txt, diag_lines = extract_pdf_text_with_diagnostics(pdf_bytes) + has_font_warn = any("FontBBox" in ln for ln in diag_lines) + if has_font_warn: + font_warning_files.append(image_name) + print(f"[warn-font] {doc_id} -> {image_name}") + parsed = parse_pdf_fields(txt) + page_rows.append( + { + "id": doc_id, + "franchiseId": franchise_id, + "imageName": image_name, + **parsed, + } + ) + print(f"[ok] {doc_id} -> {image_name}") + except Exception as e: + print(f"[erro] {doc_id} -> {e}") + + novos_na_pagina = len(page_rows) + if page_rows: + stats_page = upsert_rows_sqlserver(page_rows, SQL_CONN) + docs_page = int(stats_page.get("documentos") or 0) + parcs_page = int(stats_page.get("parcelas") or 0) + total_docs_upsert += docs_page + total_parcs_upsert += parcs_page + docs_upsert_fr += docs_page + parcs_upsert_fr += parcs_page + print( + f"[sql] franchiseId={franchise_code} pagina={pagina} docs_upsert={docs_page} " + f"parc_upsert={parcs_page} acumulado_fr_docs={docs_upsert_fr} " + f"acumulado_fr_parc={parcs_upsert_fr}" + ) + + if novos_na_pagina == 0: + paginas_sem_novidade += 1 + else: + paginas_sem_novidade = 0 + print( - f"[sql] pagina={pagina} docs_upsert={stats_page['documentos']} " - f"parc_upsert={stats_page['parcelas']} acumulado_docs={total_docs_upsert} " - f"acumulado_parc={total_parcs_upsert}" + f"[page] franchiseId={franchise_code} pagina={pagina} novos={novos_na_pagina} " + f"skip_sql={skipped_existing} sem_novidade={paginas_sem_novidade}/{MAX_PAGINAS_SEM_NOVIDADE}" ) - if novos_na_pagina == 0: - paginas_sem_novidade += 1 - else: - paginas_sem_novidade = 0 + skip += TAKE + print( + f"[page] franchiseId={franchise_code} concluida {pagina} " + f"acumulado_fr_docs={docs_upsert_fr} total={total}" + ) + if pagina >= MAX_PAGINAS_RECENTES: + print(f"[stop] franchiseId={franchise_code} limite diario atingido: {MAX_PAGINAS_RECENTES} paginas recentes") + break + if paginas_sem_novidade >= MAX_PAGINAS_SEM_NOVIDADE: + print(f"[stop] franchiseId={franchise_code} sem novidades por {MAX_PAGINAS_SEM_NOVIDADE} paginas consecutivas") + break + if total and skip >= total: + break print( - f"[page] pagina={pagina} novos={novos_na_pagina} skip_sql={skipped_existing} " - f"sem_novidade={paginas_sem_novidade}/{MAX_PAGINAS_SEM_NOVIDADE}" + f"[franchise] concluida franchiseId={franchise_code} " + f"docs_upsert={docs_upsert_fr} parcelas_upsert={parcs_upsert_fr}" ) - skip += TAKE - print(f"[page] concluida {pagina} acumulado_docs={total_docs_upsert} total={total}") - if pagina >= MAX_PAGINAS_RECENTES: - print(f"[stop] limite diario atingido: {MAX_PAGINAS_RECENTES} paginas recentes") - break - if paginas_sem_novidade >= MAX_PAGINAS_SEM_NOVIDADE: - print(f"[stop] sem novidades por {MAX_PAGINAS_SEM_NOVIDADE} paginas consecutivas") - break - if total and skip >= total: - break + if font_warning_files: + uniq_font_warn = sorted(set(font_warning_files)) + print(f"[warn-font] total_arquivos_com_warning={len(uniq_font_warn)}") + for name in uniq_font_warn: + print(f"[warn-font] arquivo={name}") print(f"SQL upsert final -> documentos={total_docs_upsert} parcelas={total_parcs_upsert}") if __name__ == "__main__": main() - -