trm_rem_saggezza/debit_notes_pdf_reader.py
Andrey Cunh@ 912466bde4 ATT
2026-02-26 13:17:11 -03:00

613 lines
21 KiB
Python

import base64
import json
import re
import time
from dataclasses import dataclass
from datetime import datetime
from io import BytesIO
from typing import Any, Dict, List, Optional
import requests
# pdfplumber is optional at import time; extract_pdf_text() raises a clear
# RuntimeError if it is missing instead of failing on module import.
try:
    import pdfplumber  # type: ignore
except Exception:
    pdfplumber = None

# API endpoints used by the sync flow.
TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens"  # issues the bearer token
STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores"  # franchises of the logged user
DEBIT_NOTES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/debit-notes/documents-list"
HANDLE_IMAGES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/handle-images"
class UnauthorizedTokenError(RuntimeError):
    """Raised when 401/403 persists after all token-refresh retries."""
    pass
def _jwt_payload(jwt_token: str) -> Dict[str, Any]:
    """Decode the payload (second segment) of a JWT without verifying it.

    Returns an empty dict for anything that does not look like a valid JWT
    (wrong segment count, bad base64, bad JSON) instead of raising, so the
    caller can treat "no expiry claim" uniformly as exp=0.
    """
    parts = jwt_token.split(".")
    if len(parts) != 3:
        return {}
    # Re-pad the urlsafe-base64 segment: JWTs strip trailing '='.
    payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4)
    try:
        raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
        return json.loads(raw.decode("utf-8"))
    except Exception:
        # Malformed token: stay consistent with the wrong-segment-count case
        # instead of leaking binascii/JSON errors to the caller.
        return {}
@dataclass
class TokenCache:
    """A cached bearer token together with its expiry instant."""

    bearer: Optional[str] = None  # full "Bearer ..." header value, or None when empty
    exp_epoch: int = 0  # JWT 'exp' claim in unix seconds; 0 means unknown/expired

    def valid(self, skew_seconds: int = 30) -> bool:
        """Return True while a token is cached and more than *skew_seconds* from expiry."""
        if not self.bearer:
            return False
        return time.time() < self.exp_epoch - skew_seconds
class Auth:
    """Fetches and caches the bearer token used by every API call."""

    def __init__(self, session: requests.Session):
        self.s = session  # shared HTTP session
        self.cache = TokenCache()  # last issued token + expiry

    def get_bearer(self, force_refresh: bool = False) -> str:
        """Return a valid bearer token, refreshing it from TOKENS_URL when needed.

        Raises RuntimeError when the token API reports success=false; HTTP
        errors propagate via raise_for_status().
        """
        if (not force_refresh) and self.cache.valid():
            return self.cache.bearer  # type: ignore[return-value]
        r = self.s.get(TOKENS_URL, timeout=30)
        r.raise_for_status()
        body = r.json()
        if not body.get("success"):
            raise RuntimeError(f"Token API retornou success=false: {body}")
        bearer = body["data"][0]["token"]
        # The API may return either "Bearer <jwt>" or the bare JWT; strip the
        # prefix (if present) before decoding the expiry claim.
        jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer
        exp = int(_jwt_payload(jwt).get("exp") or 0)
        self.cache = TokenCache(bearer=bearer, exp_epoch=exp)
        return bearer

    def invalidate(self) -> None:
        """Drop the cached token so the next get_bearer() fetches a fresh one."""
        self.cache = TokenCache()
def _headers(auth: "Auth", content_type: bool = True) -> Dict[str, str]:
    """Build the default request headers for the Boticario APIs.

    The bearer is obtained from *auth* (refreshing the cache if expired).
    ``Content-Type`` is only added for requests carrying a JSON body.
    """
    headers: Dict[str, str] = {}
    headers["Authorization"] = auth.get_bearer()
    headers["Accept"] = "application/json, text/plain, */*"
    headers["Origin"] = "https://extranet.grupoboticario.com.br"
    headers["Referer"] = "https://extranet.grupoboticario.com.br/"
    headers["User-Agent"] = "Mozilla/5.0"
    if content_type:
        headers["Content-Type"] = "application/json"
    return headers
def get_franchise_ids(session: requests.Session, auth: Auth) -> List[str]:
    """Return the deduplicated franchise codes visible to the logged-in user.

    Retries up to 4 times on 401/403 by forcing a token refresh; any other
    HTTP error is raised immediately after the loop via raise_for_status().
    """
    r = None
    for attempt in range(4):
        r = session.get(STORES_URL, headers=_headers(auth, content_type=False), timeout=30)
        if r.status_code not in (401, 403):
            break
        print(f"[warn] token invalido ao listar franquias (tentativa {attempt + 1}/4), renovando token...")
        auth.invalidate()
        auth.get_bearer(force_refresh=True)
        time.sleep(min(3, attempt + 1))  # linear backoff capped at 3s
    assert r is not None
    r.raise_for_status()
    out: List[str] = []
    seen = set()
    # Preserve API order while dropping duplicate codes.
    for item in r.json().get("data", []):
        code = str(item.get("code") or "").strip()
        if code and code not in seen:
            seen.add(code)
            out.append(code)
    return out
def get_debit_notes_page(
    session: requests.Session,
    auth: Auth,
    cp_id: int,
    skip: int,
    take: int,
    franchise_ids: List[str],
) -> Dict[str, Any]:
    """Fetch one page of the debit-notes document list.

    Retries up to 8 times: 401/403 triggers a token refresh with a short
    linear wait; 429/5xx triggers a refresh plus exponential backoff capped
    at 30s. When the failure persists past all attempts it raises
    UnauthorizedTokenError (auth) or RuntimeError (transient), so the caller
    can decide to rebuild the whole session.
    """
    params = {"cpId": cp_id, "skip": skip, "take": take}
    payload = {"franchiseId": franchise_ids}
    r = None
    max_attempts = 8
    transient_status = {429, 500, 502, 503, 504}
    for attempt in range(max_attempts):
        r = session.post(DEBIT_NOTES_URL, headers=_headers(auth), params=params, json=payload, timeout=90)
        if r.status_code in (401, 403):
            print(
                f"[warn] token invalido ao buscar pagina skip={skip} "
                f"(tentativa {attempt + 1}/{max_attempts}), renovando token..."
            )
            auth.invalidate()
            auth.get_bearer(force_refresh=True)
            time.sleep(min(5, attempt + 1))
            continue
        if r.status_code in transient_status:
            # Exponential backoff: 1, 2, 4, ... capped at 30 seconds.
            wait_s = min(30, 2 ** min(5, attempt))
            print(
                f"[warn] erro temporario {r.status_code} em skip={skip} "
                f"(tentativa {attempt + 1}/{max_attempts}), relogando e aguardando {wait_s}s..."
            )
            auth.invalidate()
            auth.get_bearer(force_refresh=True)
            time.sleep(wait_s)
            continue
        break
    assert r is not None
    if r.status_code in (401, 403):
        raise UnauthorizedTokenError(
            f"401/403 persistente em documents-list skip={skip} body={r.text[:300]}"
        )
    if r.status_code in transient_status:
        raise RuntimeError(
            f"Falha temporaria persistente ({r.status_code}) em documents-list skip={skip}. "
            f"Body={r.text[:300]}"
        )
    r.raise_for_status()
    return r.json()
def get_download_url(
    session: requests.Session,
    auth: Auth,
    document_type: str,
    franchise_id: str,
    document_id: int,
    image_name: str,
) -> str:
    """Resolve the pre-signed download URL for a document image.

    The handle-images endpoint may respond with either a plain-text URL or a
    JSON object containing a "url" key; both shapes are accepted. Retries up
    to 4 times on 401/403 with a token refresh. Raises RuntimeError when the
    response contains no usable URL.
    """
    url = f"{HANDLE_IMAGES_URL}/{document_type}/{franchise_id}/{document_id}/{image_name}/download"
    r = None
    for attempt in range(4):
        r = session.get(url, headers=_headers(auth, content_type=False), timeout=60)
        if r.status_code not in (401, 403):
            break
        print(
            f"[warn] token invalido no handle-images doc={document_id} "
            f"(tentativa {attempt + 1}/4), renovando token..."
        )
        auth.invalidate()
        auth.get_bearer(force_refresh=True)
        time.sleep(min(3, attempt + 1))  # linear backoff capped at 3s
    assert r is not None
    r.raise_for_status()
    # Shape 1: plain-text body holding the URL itself.
    txt = (r.text or "").strip()
    if txt.startswith("http://") or txt.startswith("https://"):
        return txt
    # Shape 2: JSON body {"url": "..."}.
    try:
        body = r.json()
        if isinstance(body, dict) and isinstance(body.get("url"), str):
            return body["url"]
    except Exception:
        pass
    raise RuntimeError("Resposta de handle-images nao contem URL de download")
def download_pdf_bytes(session: "requests.Session", url: str) -> bytes:
    """GET *url* (a pre-signed download link) and return the raw body bytes."""
    response = session.get(url, timeout=180)
    response.raise_for_status()
    return response.content
def extract_pdf_text(pdf_bytes: bytes) -> str:
    """Extract layout-preserving text from every page of an in-memory PDF.

    Raises RuntimeError when the optional pdfplumber dependency is absent.
    """
    if pdfplumber is None:
        raise RuntimeError("pdfplumber nao instalado (pip install pdfplumber)")
    pages: List[str] = []
    with pdfplumber.open(BytesIO(pdf_bytes)) as pdf:
        for page in pdf.pages:
            extracted = page.extract_text(layout=True) or ""
            if extracted:
                pages.append(extracted)
    return "\n".join(pages).strip()
def parse_money_br(value: Optional[str]) -> Optional[float]:
    """Convert a Brazilian-formatted amount (e.g. '1.234,56') to float.

    Thousands separators ('.') are dropped and the decimal comma becomes a
    dot in a single translate pass. Returns None for empty input or anything
    that still fails float().
    """
    if not value:
        return None
    normalized = value.strip().translate(str.maketrans({".": None, ",": "."}))
    try:
        return float(normalized)
    except Exception:
        return None
def parse_date_br(value: Optional[str]):
    """Parse a 'dd.mm.yyyy' string into a datetime.date, or None when unparseable."""
    if not value:
        return None
    try:
        parsed = datetime.strptime(value.strip(), "%d.%m.%Y")
    except Exception:
        return None
    return parsed.date()
def numero_parcela_from_text(value: Optional[str], fallback: int) -> int:
    """Derive an installment number from a document identifier string.

    'NNNN/K' yields K; otherwise the trailing digit run is used. Values
    outside 1..9999 fall back to *fallback*: NumeroParcela in TrfParcela is
    semantically a short counter, so long identifiers must not leak into it.
    """
    text = (value or "").strip()
    fallback = int(fallback)
    if "/" in text:
        _, _, suffix = text.partition("/")
        suffix = suffix.strip()
        if suffix.isdigit() and 1 <= int(suffix) <= 9999:
            return int(suffix)
        return fallback
    match = re.search(r"(\d+)$", text)
    if match and 1 <= (n := int(match.group(1))) <= 9999:
        return n
    return fallback
def parse_pdf_fields(text: str) -> Dict[str, Any]:
    """Extract the debit-note header fields and installment rows from PDF text.

    Monetary values are kept as Brazilian-formatted strings; *_num keys carry
    the float conversion (None when unparseable).
    """
    def _first(pattern: str) -> Optional[str]:
        # First capture group of *pattern* over the whole text, or None.
        m = re.search(pattern, text, flags=re.IGNORECASE)
        return m.group(1) if m else None

    cliente = _first(r"Cliente:\s*(\d+)")
    nota_debito = _first(r"NOTA\s+DE\s+D[ÉE]BITO\s*:\s*([A-Z0-9-]+)")
    data_emissao = _first(r"Data\s+(?:de\s+)?emiss[aã]o\s*:\s*(\d{2}\.\d{2}\.\d{4})")
    valor_total = _first(r"Valor\s+Total\s+de\s+.+?\s+(\d{1,3}(?:\.\d{3})*,\d{2})")

    # Supported installment row layouts:
    #   "269326604/1 25.03.2026 17,04"
    #   "CTPT022609765408 09.02.2026 6,60"
    parcelas: List[Dict[str, Any]] = [
        {
            "numero": num.strip(),
            "vencimento": venc.strip(),
            "valor": val.strip(),
            "valor_num": parse_money_br(val),
        }
        for num, venc, val in re.findall(
            r"([A-Z0-9/-]+)\s+(\d{2}\.\d{2}\.\d{4})\s+(\d{1,3}(?:\.\d{3})*,\d{2})",
            text,
            flags=re.IGNORECASE,
        )
    ]

    denominacao = None
    m_den = re.search(
        r"Denomina[cç][aã]o\s+Valor\s+(.+?)\s+(\d{1,3}(?:\.\d{3})*,\d{2})",
        text,
        flags=re.IGNORECASE | re.DOTALL,
    )
    if m_den:
        denominacao = " ".join(m_den.group(1).split()).strip()
    if not denominacao:
        # Fallback for the newer layout: first item line of the Denominação section.
        m_den2 = re.search(
            r"Denomina[cç][aã]o\s+Valor\s+(.+?)\s+\d{1,3}(?:\.\d{3})*,\d{2}\s+Valor\s+Total\s+de\s+D[ÉE]bito",
            text,
            flags=re.IGNORECASE | re.DOTALL,
        )
        if m_den2:
            denominacao = " ".join(m_den2.group(1).split()).strip()

    return {
        "cliente": cliente,
        "notaDebito": nota_debito,
        "dataEmissao": data_emissao,
        "valorTotalDebito": valor_total,
        "valorTotalDebitoNum": parse_money_br(valor_total),
        "denominacao": denominacao,
        "parcelas": parcelas,
        "parcelasCount": len(parcelas),
        "parcelasSomaNum": sum(p["valor_num"] for p in parcelas if isinstance(p.get("valor_num"), (int, float))),
    }
def upsert_rows_sqlserver(rows: List[Dict[str, Any]], connection_string: str) -> Dict[str, int]:
    """Upsert parsed documents and replace their installments in SQL Server.

    Each row is keyed by UUID "ND-<id>": existing dbo.TrfDocumento rows are
    updated, missing ones inserted, and dbo.TrfParcela rows for each touched
    document are deleted and re-inserted. The whole batch runs inside one
    transaction: committed at the end, rolled back on any exception.

    Returns {"documentos": <docs touched>, "parcelas": <installments written>}.
    Raises RuntimeError when pyodbc is not installed.
    """
    try:
        import pyodbc  # type: ignore
    except Exception as e:
        raise RuntimeError("pyodbc nao encontrado. Instale com: pip install pyodbc") from e
    if not rows:
        return {"documentos": 0, "parcelas": 0}
    cn = pyodbc.connect(connection_string, timeout=30)
    cn.autocommit = False  # explicit transaction; commit only after the whole batch
    cur = cn.cursor()
    docs = 0
    pars = 0
    try:
        for row in rows:
            id_externo = row.get("id")
            if id_externo is None:
                continue
            uuid = f"ND-{id_externo}"
            cur.execute("SELECT id FROM dbo.TrfDocumento WHERE UUID = ?", uuid)
            found = cur.fetchone()
            if found:
                doc_id = int(found[0])
                cur.execute(
                    """
                    UPDATE dbo.TrfDocumento
                    SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?,
                    EmissaoNF=?, NotaFiscal=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME()
                    WHERE id=?
                    """,
                    int(id_externo),
                    str(row.get("franchiseId") or "")[:20] or None,
                    str(row.get("imageName") or "")[:150] or None,
                    parse_date_br(row.get("dataEmissao")),
                    parse_date_br(row.get("dataEmissao")),
                    str(row.get("notaDebito") or "")[:40] or None,
                    row.get("valorTotalDebitoNum"),
                    0.0,
                    doc_id,
                )
            else:
                cur.execute(
                    """
                    INSERT INTO dbo.TrfDocumento (
                    UUID, IdExterno, FranchiseId, ImageName, EmissionDate,
                    EmissaoNF, NotaFiscal, ValorNF, Encargos
                    )
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    uuid,
                    int(id_externo),
                    str(row.get("franchiseId") or "")[:20] or None,
                    str(row.get("imageName") or "")[:150] or None,
                    parse_date_br(row.get("dataEmissao")),
                    parse_date_br(row.get("dataEmissao")),
                    str(row.get("notaDebito") or "")[:40] or None,
                    row.get("valorTotalDebitoNum"),
                    0.0,
                )
            # Re-read the surrogate id (needed for the INSERT branch; also
            # revalidates the UPDATE branch).
            cur.execute("SELECT id FROM dbo.TrfDocumento WHERE UUID = ?", uuid)
            got = cur.fetchone()
            if not got:
                continue
            doc_id = int(got[0])
            docs += 1
            # Replace-all strategy for installments of this document.
            cur.execute("DELETE FROM dbo.TrfParcela WHERE DocumentoId = ?", doc_id)
            for idx, p in enumerate(row.get("parcelas") or [], start=1):
                dt = parse_date_br(p.get("vencimento"))
                val = p.get("valor_num")
                if dt is None or val is None:
                    continue  # skip rows the PDF parser could not normalize
                num_parc = numero_parcela_from_text(p.get("numero"), idx)
                cur.execute(
                    """
                    INSERT INTO dbo.TrfParcela (DocumentoId, NumeroParcela, DataVencimento, ValorParcela)
                    VALUES (?, ?, ?, ?)
                    """,
                    doc_id,
                    int(num_parc),
                    dt,
                    float(val),
                )
                pars += 1
        cn.commit()
        return {"documentos": docs, "parcelas": pars}
    except Exception:
        cn.rollback()
        raise
    finally:
        cur.close()
        cn.close()
def get_existing_docs_map_sqlserver(doc_ids: List[int], connection_string: str) -> Dict[int, Optional[str]]:
    """Map external document ids to their stored ImageName (None when NULL).

    Ids missing from the returned dict do not exist in dbo.TrfDocumento.
    Queries are chunked so the IN (...) list stays below SQL Server's
    parameter limit. Raises RuntimeError when pyodbc is not installed.
    """
    try:
        import pyodbc  # type: ignore
    except Exception as e:
        raise RuntimeError("pyodbc nao encontrado. Instale com: pip install pyodbc") from e
    clean_ids = sorted({int(x) for x in doc_ids if x is not None})
    if not clean_ids:
        return {}
    cn = pyodbc.connect(connection_string, timeout=30)
    cur = cn.cursor()
    out: Dict[int, Optional[str]] = {}
    try:
        chunk_size = 900  # avoids SQL Server's parameter limit
        for i in range(0, len(clean_ids), chunk_size):
            chunk = clean_ids[i : i + chunk_size]
            placeholders = ",".join("?" for _ in chunk)
            sql = f"""
                SELECT IdExterno, ImageName
                FROM dbo.TrfDocumento
                WHERE IdExterno IN ({placeholders})
            """
            cur.execute(sql, *chunk)
            for row in cur.fetchall():
                ext_id = int(row[0])
                img = str(row[1]).strip() if row[1] is not None else None
                out[ext_id] = img
        return out
    finally:
        cur.close()
        cn.close()
def main() -> None:
    """Fixed flow: read the API -> extract PDF text in memory -> upsert to SQL.

    Pages through the debit-notes list, skipping documents already stored
    with the same imageName, and stops on the page cap, on consecutive pages
    with nothing new, or at the end of the listing.
    """
    CP_ID = 10269  # partner/company id sent to documents-list
    TAKE = 25  # page size
    MAX_PAGINAS_RECENTES = 50  # per-run cap of pages processed
    MAX_PAGINAS_SEM_NOVIDADE = 5  # stop after N consecutive pages with no new docs
    DOCUMENT_TYPE = "NDEB"  # debit-note image type for the handle-images endpoint
    # SECURITY NOTE(review): database credentials are hard-coded in source;
    # they should be moved to environment variables or a secrets store.
    SQL_CONN = (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        "SERVER=10.77.77.10;"
        "DATABASE=GINSENG;"
        "UID=andrey;"
        "PWD=88253332;"
        "TrustServerCertificate=yes;"
    )

    def _new_client() -> tuple[requests.Session, Auth, List[str]]:
        # Fresh session + auth + franchise list; used at startup and on relogin.
        s = requests.Session()
        s.trust_env = False  # ignore system proxy/env settings
        a = Auth(s)
        f = get_franchise_ids(s, a)
        return s, a, f

    session, auth, franchises = _new_client()
    total_docs_upsert = 0
    total_parcs_upsert = 0
    # For tests this can be raised (e.g. 900) to jump past old records; keep 0
    # for the periodic run that picks up new documents. High skip values are
    # supported by the endpoint but may be slower.
    skip = 0
    skip_inicial = skip
    total = None
    pagina = 0
    total_paginas = None
    relogins = 0
    max_relogins = 20
    paginas_sem_novidade = 0
    while True:
        try:
            page = get_debit_notes_page(session, auth, CP_ID, skip, TAKE, franchises)
        except UnauthorizedTokenError as e:
            # Persistent 401/403: rebuild session/token from scratch.
            relogins += 1
            if relogins > max_relogins:
                raise RuntimeError(
                    f"Falha apos {max_relogins} relogins. Ultimo erro: {e}"
                ) from e
            print(
                f"[relogin] 401 persistente em skip={skip}. "
                f"Refazendo sessao/token ({relogins}/{max_relogins})..."
            )
            session, auth, franchises = _new_client()
            time.sleep(2)
            continue
        if total is None:
            # First page: capture totals for progress reporting.
            try:
                total = int(page.get("documentsTotal") or 0)
            except Exception:
                total = 0
            total_paginas = (total + TAKE - 1) // TAKE if total > 0 else None
            if total_paginas:
                print(f"[info] total_registros={total} total_paginas={total_paginas} take={TAKE}")
            else:
                print(f"[info] total_registros={total} take={TAKE}")
        docs = page.get("documentsList") or []
        if not docs:
            break
        pagina += 1
        pagina_global = (skip // TAKE) + 1
        total_paginas_restantes = None
        if total is not None and total > 0:
            restantes = max(0, total - skip_inicial)
            total_paginas_restantes = (restantes + TAKE - 1) // TAKE if restantes > 0 else 0
        if total_paginas:
            if total_paginas_restantes is not None:
                print(
                    f"[page] baixando pagina_execucao={pagina}/{total_paginas_restantes} "
                    f"pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})"
                )
            else:
                print(f"[page] baixando pagina_execucao={pagina} pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})")
        else:
            print(f"[page] baixando pagina_execucao={pagina} pagina_global={pagina_global} (itens={len(docs)})")
        # Collect this page's document ids to check what already exists in SQL.
        doc_ids_page: List[int] = []
        for d in docs:
            try:
                doc_ids_page.append(int(d.get("id")))
            except Exception:
                continue
        existing_map = get_existing_docs_map_sqlserver(doc_ids_page, SQL_CONN)
        page_rows: List[Dict[str, Any]] = []
        skipped_existing = 0
        for d in docs:
            doc_id = int(d.get("id"))
            franchise_id = str(d.get("franchiseId") or "").strip()
            image_name = str(d.get("imageName") or "").strip()
            existing_image_name = existing_map.get(doc_id)
            # An unchanged imageName means the stored parse is still current.
            if doc_id in existing_map and (existing_image_name or "") == image_name:
                skipped_existing += 1
                print(f"[skip] {doc_id} ja existe no SQL com mesmo imageName")
                continue
            try:
                dl_url = get_download_url(
                    session,
                    auth,
                    DOCUMENT_TYPE,
                    franchise_id,
                    doc_id,  # this endpoint uses the document's "id"
                    image_name,
                )
                pdf_bytes = download_pdf_bytes(session, dl_url)
                txt = extract_pdf_text(pdf_bytes)
                parsed = parse_pdf_fields(txt)
                page_rows.append(
                    {
                        "id": doc_id,
                        "franchiseId": franchise_id,
                        "imageName": image_name,
                        **parsed,
                    }
                )
                print(f"[ok] {doc_id} -> {image_name}")
            except Exception as e:
                # Best effort: one bad document must not stop the page.
                print(f"[erro] {doc_id} -> {e}")
        novos_na_pagina = len(page_rows)
        if page_rows:
            stats_page = upsert_rows_sqlserver(page_rows, SQL_CONN)
            total_docs_upsert += int(stats_page.get("documentos") or 0)
            total_parcs_upsert += int(stats_page.get("parcelas") or 0)
            print(
                f"[sql] pagina={pagina} docs_upsert={stats_page['documentos']} "
                f"parc_upsert={stats_page['parcelas']} acumulado_docs={total_docs_upsert} "
                f"acumulado_parc={total_parcs_upsert}"
            )
        if novos_na_pagina == 0:
            paginas_sem_novidade += 1
        else:
            paginas_sem_novidade = 0
        print(
            f"[page] pagina={pagina} novos={novos_na_pagina} skip_sql={skipped_existing} "
            f"sem_novidade={paginas_sem_novidade}/{MAX_PAGINAS_SEM_NOVIDADE}"
        )
        skip += TAKE
        print(f"[page] concluida {pagina} acumulado_docs={total_docs_upsert} total={total}")
        if pagina >= MAX_PAGINAS_RECENTES:
            print(f"[stop] limite diario atingido: {MAX_PAGINAS_RECENTES} paginas recentes")
            break
        if paginas_sem_novidade >= MAX_PAGINAS_SEM_NOVIDADE:
            print(f"[stop] sem novidades por {MAX_PAGINAS_SEM_NOVIDADE} paginas consecutivas")
            break
        if total and skip >= total:
            break
    print(f"SQL upsert final -> documentos={total_docs_upsert} parcelas={total_parcs_upsert}")


if __name__ == "__main__":
    main()