Andrey Cunha cb2c0f8e78 att
2026-02-14 23:37:32 -03:00

1191 lines
41 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import pdfplumber
import base64
import json
import time
import re
import unicodedata
from datetime import datetime
from io import BytesIO
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Tuple
TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens"
STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores"
DOCS_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/reports/documents-list"
HANDLE_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/handle-images"
# -----------------------------
# TOKEN (com cache simples)
# -----------------------------
def _jwt_payload(jwt_token: str) -> Dict[str, Any]:
parts = jwt_token.split(".")
if len(parts) != 3:
return {}
payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4)
raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
return json.loads(raw.decode("utf-8"))
@dataclass
class TokenCache:
    """Holds a bearer token together with its JWT expiry (epoch seconds)."""

    bearer: Optional[str] = None
    exp_epoch: int = 0

    def valid(self, skew_seconds: int = 30) -> bool:
        """True while a token is cached and not within *skew_seconds* of expiry."""
        if not self.bearer:
            return False
        return time.time() < self.exp_epoch - skew_seconds
class Auth:
    """Fetches the bearer token from the token API and caches it until expiry."""

    def __init__(self, s: requests.Session):
        self.s = s
        self.cache = TokenCache()

    def get_bearer(self) -> str:
        """Return a valid bearer token, refreshing it from the API when stale."""
        if self.cache.valid():
            return self.cache.bearer  # type: ignore
        resp = self.s.get(TOKENS_URL, timeout=30)
        resp.raise_for_status()
        body = resp.json()
        if not body.get("success"):
            raise RuntimeError(f"Token API retornou success=false: {body}")
        bearer = body["data"][0]["token"]  # already arrives as "Bearer ..."
        # strip the scheme so the JWT payload (and its exp claim) can be decoded
        if bearer.lower().startswith("bearer "):
            jwt = bearer.split(" ", 1)[1]
        else:
            jwt = bearer
        exp = int(_jwt_payload(jwt).get("exp") or 0)
        self.cache = TokenCache(bearer=bearer, exp_epoch=exp)
        return bearer

    def invalidate(self) -> None:
        """Drop the cached token so the next call re-authenticates."""
        self.cache = TokenCache()
# -----------------------------
# PDF: download + leitura texto (fallback)
# -----------------------------
def baixar_pdf_bytes(url_pdf: str, session: requests.Session) -> BytesIO:
    """Download a PDF and return its content wrapped in a BytesIO buffer."""
    resposta = session.get(url_pdf, timeout=120)
    resposta.raise_for_status()
    return BytesIO(resposta.content)
def ler_texto_pdf(pdf_bytes: BytesIO) -> str:
    """Extract layout-preserving text from every page, joined with newlines."""
    partes: List[str] = []
    with pdfplumber.open(pdf_bytes) as pdf:
        for pagina in pdf.pages:
            conteudo = pagina.extract_text(layout=True) or ""
            if conteudo:
                partes.append(conteudo + "\n")
    return "".join(partes)
# -----------------------------
# EXTRACTION: franchise remuneration fee ("Remuneração Franquia")
# 1) try the TABLE first (more reliable)
# 2) on failure, fall back to the TEXT parser (as before)
# -----------------------------
def _norm(s: str) -> str:
return re.sub(r"\s+", " ", (s or "")).strip().lower()
def _parse_money_pt(s: str) -> float:
s = (s or "").strip()
# "12.769,43" -> "12769.43"
if "," in s:
s = s.replace(".", "").replace(",", ".")
return float(s)
def _money_from_text(s: str) -> Optional[float]:
    """Find the first money-looking token in *s* and parse it; None when absent
    or unparseable. Accepts pt-BR ("1.234,56") and dotted ("1234.56") forms."""
    if not s:
        return None
    match = re.search(r"\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2}", s)
    if match is None:
        return None
    try:
        return _parse_money_pt(match.group(0))
    except Exception:
        return None
def _float5_from_text(s: str) -> Optional[float]:
if not s:
return None
m = re.search(r"\d+(?:[.,])\d{5}", s) # 1,00000 / 1.00000
if not m:
return None
x = m.group(0)
x = x.replace(".", "").replace(",", ".") if "," in x else x
try:
return float(x)
except Exception:
return None
def _pick_first_number(s: str) -> Optional[str]:
if not s:
return None
m = re.search(r"\b\d{3,}\b", s)
return m.group(0) if m else None
def _extract_date_pt(s: str) -> Optional[str]:
if not s:
return None
s = re.sub(r"(?<=\d)\s+(?=\d)", "", s)
m = re.search(r"\d{2}\.\d{2}\.\d{4}", s)
return m.group(0) if m else None
def _date_from_iso(s: Optional[str]):
if not s:
return None
try:
return datetime.strptime(s, "%Y-%m-%d").date()
except Exception:
return None
def _date_from_br(s: Optional[str]):
if not s:
return None
try:
return datetime.strptime(s, "%d.%m.%Y").date()
except Exception:
return None
def _isolar_bloco(texto: str, titulo_regex: str, proximos_titulos_regex: List[str]) -> Optional[str]:
t = " ".join((texto or "").split())
m0 = re.search(titulo_regex, t, flags=re.IGNORECASE)
if not m0:
return None
after = t[m0.start():]
end_pos = None
for rx in proximos_titulos_regex:
m = re.search(rx, after, flags=re.IGNORECASE)
if m:
end_pos = m.start()
break
return after[:end_pos] if end_pos is not None else after
def extrair_remuneracao_franquia_por_tabela(pdf: pdfplumber.PDF) -> Optional[Dict[str, Any]]:
    """
    Extract ONLY the table of the block
    "Notas Fiscais de Servico de taxa de remuneracao de Franquia".

    To avoid confusing it with the "Venda de Produtos" block, the header
    row must contain "valor taxa", "fat" and "dados pagamento".
    Returns the parsed dict for the first valid data row, or None when no
    matching table exists anywhere in the PDF.
    """
    for page in pdf.pages:
        tables = page.extract_tables() or []
        for tbl in tables:
            if not tbl or len(tbl) < 2:
                continue
            # scan the first rows for a header with the block-specific columns
            header_idx = None
            header_join = ""
            for i, row in enumerate(tbl[:8]):
                header_join = " | ".join(_norm(c) for c in row if c)
                # all three keys must be present ("Venda de Produtos" lacks "valor taxa")
                if ("valor taxa" in header_join) and ("fat" in header_join) and ("dados pagamento" in header_join):
                    header_idx = i
                    break
            if header_idx is None:
                continue
            header = [_norm(c) for c in tbl[header_idx]]

            # index of the first header column containing any of the given keys
            def idx_like(keys: List[str]) -> Optional[int]:
                for j, h in enumerate(header):
                    for k in keys:
                        if k in h:
                            return j
                return None

            i_emissao = idx_like(["emissÃÆÃ†â€™Ãƒâ€šÃ£o", "emissao"])
            i_nota = idx_like(["nota fiscal"])
            i_valor_taxa = idx_like(["valor taxa"])
            i_valor_desc = idx_like(["valor desconto"])
            i_fat = idx_like(["fat.conv", "fat conv", "fat"])
            i_valor_nf = idx_like(["valor nf"])
            i_enc = idx_like(["encargos"])
            i_dp = idx_like(["dados pagamento"])
            # columns that are essential for this block
            if i_emissao is None or i_nota is None or i_valor_taxa is None or i_valor_nf is None or i_dp is None:
                continue
            # read the first "real" data row (normally there is only one)
            for row in tbl[header_idx + 1:]:
                if not any((c or "").strip() for c in row):
                    continue
                emissao_cell = row[i_emissao] if i_emissao < len(row) else ""
                nota_cell = row[i_nota] if i_nota < len(row) else ""
                emissao = _extract_date_pt(_cell(emissao_cell))
                nota = _pick_first_number(nota_cell or "")
                if not emissao or not nota:
                    continue
                valor_taxa = _money_from_text(row[i_valor_taxa]) if i_valor_taxa < len(row) else None
                valor_desconto = _money_from_text(row[i_valor_desc]) if i_valor_desc is not None and i_valor_desc < len(row) else None
                fat_conv = _float5_from_text(row[i_fat]) if i_fat is not None and i_fat < len(row) else None
                valor_nf = _money_from_text(row[i_valor_nf]) if i_valor_nf < len(row) else None
                encargos = _money_from_text(row[i_enc]) if i_enc is not None and i_enc < len(row) else None
                # "Dados pagamento": one cell with several "date value" lines
                parcelas: List[Dict[str, Any]] = []
                dp = row[i_dp] if i_dp < len(row) else ""
                pares = re.findall(
                    r"(\d{2}\.\d{2}\.\d{4})\s+(\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2})",
                    dp or ""
                )
                seen = set()
                for data, v in pares:
                    try:
                        val = _parse_money_pt(v)
                    except Exception:
                        continue
                    k = (data, val)
                    if k not in seen:
                        seen.add(k)
                        parcelas.append({"data": data, "valor": val})
                return {
                    "tipo_bloco": "TAXA_REMUNERACAO",
                    "emissao": emissao,
                    "nota_fiscal": nota,
                    "valor_taxa": valor_taxa,
                    "valor_desconto": valor_desconto,
                    "fat_conv": fat_conv,
                    "valor_nf": valor_nf,
                    "encargos": encargos,
                    "parcelas": parcelas,
                    "error": None,
                }
    return None
def _cell(c: Any) -> str:
return " ".join(str(c or "").split())
def _table_text(tbl: List[List[Any]]) -> str:
    """Flatten an extracted table into one normalized, pipe-separated string."""
    celulas = (_norm(_cell(c)) for row in tbl for c in (row or []))
    return " | ".join(celulas)
def _strip_accents(s: str) -> str:
return "".join(ch for ch in unicodedata.normalize("NFD", s or "") if unicodedata.category(ch) != "Mn")
def _tipo_bloco_fallback(tbl: List[List[Any]]) -> str:
    """Classify a fallback table block by keywords in its accent-stripped text.

    Rules are checked in priority order; the generic label
    "BLOCO_ALTERNATIVO" is returned when nothing matches.
    """
    texto = _strip_accents(_table_text(tbl))
    regras = [
        (("outras notas de debito", "outras nfs servicos"), "OUTRAS_NOTAS_DEBITO"),
        (("outras notas de credito",), "OUTRAS_NOTAS_CREDITO"),
        (("despesas de propaganda", "esforcos de marketing"), "DESPESAS_MARKETING"),
        (("devolucao da franquia",), "DEVOLUCAO_FRANQUIA"),
        (("venda de produtos",), "VENDA_PRODUTOS"),
    ]
    for chaves, rotulo in regras:
        if any(chave in texto for chave in chaves):
            return rotulo
    return "BLOCO_ALTERNATIVO"
def extrair_bloco_com_informacao_por_tabela(pdf: pdfplumber.PDF) -> Optional[Dict[str, Any]]:
    """
    General fallback: find the first table block carrying an emission date
    plus an invoice/debit/credit note number. Used when the
    TAXA_REMUNERACAO block comes back empty.
    """
    for page in pdf.pages:
        tables = page.extract_tables() or []
        for tbl in tables:
            if not tbl or len(tbl) < 2:
                continue
            # find a header row with emission + some note column + some value column
            header_idx = None
            for i, row in enumerate(tbl[:10]):
                header_join = _strip_accents(" | ".join(_norm(_cell(c)) for c in (row or [])))
                has_emissao = "emissao" in header_join
                has_nota = (
                    ("nota fiscal" in header_join)
                    or ("nota debito" in header_join)
                    or ("nota credito" in header_join)
                    or ("n nota" in header_join)
                    or ("nota - duplicata" in header_join)
                )
                has_valor = ("valor nf" in header_join) or (re.search(r"\bvalor\b", header_join) is not None)
                if has_emissao and has_nota and has_valor:
                    header_idx = i
                    break
            if header_idx is None:
                continue
            header = [_strip_accents(_norm(_cell(c))) for c in tbl[header_idx]]

            # first header column containing any of the keys (header order wins)
            def idx_like(keys: List[str]) -> Optional[int]:
                for j, h in enumerate(header):
                    for k in keys:
                        if k in h:
                            return j
                return None

            # first key that matches any header column (key priority wins)
            def idx_like_prefer(keys: List[str]) -> Optional[int]:
                for k in keys:
                    for j, h in enumerate(header):
                        if k in h:
                            return j
                return None

            i_emissao = idx_like(["emissao"])
            i_nota = idx_like_prefer([
                "n nota debito",
                "n nota credito",
                "nota fiscal",
                "nota debito",
                "nota credito",
                "nota - duplicata",
                "nota duplicata",
                "n nota",
            ])
            i_valor_nf = idx_like(["valor nf", "val. nf autorizado", "valor da nf"])
            i_valor = idx_like(["valor"])
            i_enc = idx_like(["encargos"])
            i_dp = idx_like(["dados pagamento"])
            if i_emissao is None or i_nota is None:
                continue
            # take the first data row with a usable date + note number
            for row in tbl[header_idx + 1:]:
                if not any(_cell(c) for c in row):
                    continue
                emissao_cell = row[i_emissao] if i_emissao < len(row) else ""
                nota_cell = row[i_nota] if i_nota < len(row) else ""
                emissao = _extract_date_pt(_cell(emissao_cell))
                nota = _pick_first_number(_cell(nota_cell))
                if not emissao or not nota:
                    continue
                # prefer the dedicated "valor nf" column, then any generic "valor"
                valor_nf = None
                if i_valor_nf is not None and i_valor_nf < len(row):
                    valor_nf = _money_from_text(_cell(row[i_valor_nf]))
                if valor_nf is None and i_valor is not None and i_valor < len(row):
                    valor_nf = _money_from_text(_cell(row[i_valor]))
                encargos = _money_from_text(_cell(row[i_enc])) if i_enc is not None and i_enc < len(row) else None
                # "Dados pagamento": several "date value" pairs in one cell
                parcelas: List[Dict[str, Any]] = []
                dp = _cell(row[i_dp]) if i_dp is not None and i_dp < len(row) else ""
                pares = re.findall(
                    r"(\d{2}\.\d{2}\.\d{4})\s+(\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2})",
                    dp or ""
                )
                seen = set()
                for data, v in pares:
                    try:
                        val = _parse_money_pt(v)
                    except Exception:
                        continue
                    k = (data, val)
                    if k not in seen:
                        seen.add(k)
                        parcelas.append({"data": data, "valor": val})
                return {
                    "tipo_bloco": _tipo_bloco_fallback(tbl),
                    "emissao": emissao,
                    "nota_fiscal": nota,
                    "valor_taxa": None,
                    "valor_desconto": None,
                    "fat_conv": None,
                    "valor_nf": valor_nf,
                    "encargos": encargos,
                    "parcelas": parcelas,
                    "error": "Taxa de remuneracao sem dados; usado bloco alternativo com informacao",
                }
    return None
def extrair_remuneracao_franquia_por_texto(texto: str) -> Dict[str, Any]:
    """Text-based fallback: locate the franchise-remuneration section by regex
    and extract emission date, invoice number and payment installments.

    Monetary columns cannot always be recovered from flattened text, so they
    are returned as None here; ``error`` explains any failure.

    FIX: the error-message literals and the accent character classes in the
    section-boundary regexes were mojibake (double-encoded UTF-8); they are
    restored to the intended text ("Seção não encontrada", ``D[eé]bito`` …).
    """
    bloco = _isolar_bloco(
        texto,
        titulo_regex=r"Notas\s+Fiscais\s+de\s+Servi.{0,3}o\s+de\s+taxa\s+de\s+remunera.{0,5}o\s+de\s+Franquia",
        proximos_titulos_regex=[
            r"\bLink\s*:",
            r"Notas\s+de\s+Despesas\s+de\s+Propaganda",
            r"Outras\s+Notas\s+de\s+D[eé]bito",
            r"Outras\s+Notas\s+de\s+Cr[eé]dito",
        ],
    )

    def _vazio(erro: str) -> Dict[str, Any]:
        # empty result skeleton shared by both failure paths
        return {
            "tipo_bloco": "TAXA_REMUNERACAO",
            "emissao": None,
            "nota_fiscal": None,
            "valor_taxa": None,
            "valor_desconto": None,
            "fat_conv": None,
            "valor_nf": None,
            "encargos": None,
            "parcelas": [],
            "error": erro,
        }

    if not bloco:
        return _vazio("Seção não encontrada")
    b = " ".join(bloco.split())
    m = re.search(r"\b(\d{2}\.\d{2}\.\d{4})\s+(\d{4,})\b", b)
    if not m:
        return _vazio("Não achei emissão + nota")
    emissao = m.group(1)
    nota = m.group(2)
    # installments appear after the "Dados Pagamento" label
    idx_dp = b.lower().find("dados pagamento")
    dp = b[idx_dp:] if idx_dp != -1 else b
    pares = re.findall(
        r"\b(\d{2}\.\d{2}\.\d{4})\s+(\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2})\b",
        dp
    )
    parcelas: List[Dict[str, Any]] = []
    seen = set()
    for data, v in pares:
        try:
            val = _parse_money_pt(v)
        except Exception:
            continue
        k = (data, val)
        if k not in seen:
            seen.add(k)
            parcelas.append({"data": data, "valor": val})
    # monetary columns stay None in the text fallback
    return {
        "tipo_bloco": "TAXA_REMUNERACAO",
        "emissao": emissao,
        "nota_fiscal": nota,
        "valor_taxa": None,
        "valor_desconto": None,
        "fat_conv": None,
        "valor_nf": None,
        "encargos": None,
        "parcelas": parcelas,
        "error": None,
    }
def extrair_remuneracao_franquia(pdf_bytes: BytesIO) -> Dict[str, Any]:
    """
    Single entry point used by the main flow:
    - tries the table extractor first (most reliable)
    - falls back to text parsing (previous behavior)
    - finally tries any other table block that carries data
    """
    # try the main block via table (best-case scenario)
    with pdfplumber.open(pdf_bytes) as pdf:
        via_tabela = extrair_remuneracao_franquia_por_tabela(pdf)
        if via_tabela:
            return via_tabela
    # text fallback for the main block
    pdf_bytes.seek(0)
    texto = ler_texto_pdf(pdf_bytes)
    via_texto = extrair_remuneracao_franquia_por_texto(texto)
    if via_texto.get("emissao") and via_texto.get("nota_fiscal"):
        return via_texto
    # if TAXA_REMUNERACAO is empty, try any other block with data
    pdf_bytes.seek(0)
    with pdfplumber.open(pdf_bytes) as pdf:
        via_fallback = extrair_bloco_com_informacao_por_tabela(pdf)
        if via_fallback:
            return via_fallback
    return via_texto
def _tem_emissao_nota(d: Optional[Dict[str, Any]]) -> bool:
if not d:
return False
return bool(d.get("emissao") and d.get("nota_fiscal"))
def _dados_bloco(d: Dict[str, Any]) -> Dict[str, Any]:
return {
"emissao_nf": d.get("emissao"),
"nota_fiscal": d.get("nota_fiscal"),
"valor_taxa": d.get("valor_taxa"),
"valor_desconto": d.get("valor_desconto"),
"fat_conv": d.get("fat_conv"),
"valor_nf": d.get("valor_nf"),
"encargos": d.get("encargos"),
"parcelas": d.get("parcelas") or [],
}
def extrair_remuneracao_franquia_detalhado(pdf_bytes: BytesIO) -> Dict[str, Any]:
    """Run the full extraction pipeline and return a detailed report.

    Tries the main TAXA_REMUNERACAO block (table, then text); when it has no
    usable data, also tries the generic table fallback. The result carries
    the chosen block, per-block status and any warnings, plus the flat
    "escolhido" dict for backward compatibility.
    """
    principal: Optional[Dict[str, Any]] = None
    fallback: Optional[Dict[str, Any]] = None
    pdf_bytes.seek(0)
    with pdfplumber.open(pdf_bytes) as pdf:
        via_tabela = extrair_remuneracao_franquia_por_tabela(pdf)
        if via_tabela:
            principal = via_tabela
    if not principal:
        # table extraction failed: fall back to text for the main block
        pdf_bytes.seek(0)
        texto = ler_texto_pdf(pdf_bytes)
        principal = extrair_remuneracao_franquia_por_texto(texto)
    principal_ok = _tem_emissao_nota(principal)
    if not principal_ok:
        # main block empty: look for any other block with data
        pdf_bytes.seek(0)
        with pdfplumber.open(pdf_bytes) as pdf:
            fallback = extrair_bloco_com_informacao_por_tabela(pdf)
    escolhido = fallback if fallback else principal
    if not escolhido:
        escolhido = {
            "tipo_bloco": "TAXA_REMUNERACAO",
            "emissao": None,
            "nota_fiscal": None,
            "valor_taxa": None,
            "valor_desconto": None,
            "fat_conv": None,
            "valor_nf": None,
            "encargos": None,
            "parcelas": [],
            "error": "Falha na extração",
        }
    warnings: List[str] = []
    if not principal_ok and fallback:
        warnings.append("Taxa de remuneração sem dados; usado bloco alternativo")
    if principal and principal.get("error"):
        warnings.append(str(principal.get("error")))
    blocos: List[Dict[str, Any]] = []
    if principal:
        blocos.append({
            "tipo": "TAXA_REMUNERACAO",
            "encontrado": principal_ok,
            "dados": _dados_bloco(principal) if principal_ok else None,
            "erro": None if principal_ok else principal.get("error"),
        })
    if fallback:
        blocos.append({
            "tipo": fallback.get("tipo_bloco"),
            "encontrado": _tem_emissao_nota(fallback),
            "dados": _dados_bloco(fallback),
            "erro": fallback.get("error"),
        })
    return {
        "bloco_principal": "TAXA_REMUNERACAO",
        "status_principal": "ok" if principal_ok else "sem_dados",
        "bloco_utilizado": escolhido.get("tipo_bloco"),
        "resumo": {
            "emissao_nf": escolhido.get("emissao"),
            "nota_fiscal": escolhido.get("nota_fiscal"),
            "valor_nf": escolhido.get("valor_nf"),
            "encargos": escolhido.get("encargos"),
        },
        "blocos": blocos,
        "warnings": warnings,
        "escolhido": escolhido,  # compatibility with the previous flat structure
    }
# -----------------------------
# CLIENTE PRINCIPAL
# -----------------------------
class Client:
    """HTTP client tying together auth, store listing, document listing and
    per-document PDF download + extraction."""

    def __init__(self):
        self.s = requests.Session()
        self.auth = Auth(self.s)

    def _headers_json(self) -> Dict[str, str]:
        """Standard JSON headers; Origin/Referer mimic the extranet web app."""
        return {
            "Authorization": self.auth.get_bearer(),
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Origin": "https://extranet.grupoboticario.com.br",
            "Referer": "https://extranet.grupoboticario.com.br/",
            "User-Agent": "Mozilla/5.0",
        }

    def get_franchises(self, only_channels: Optional[Set[str]] = None) -> List[str]:
        """Return deduplicated store codes, optionally filtered by channel."""
        r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30)
        if r.status_code in (401, 403):
            # token probably expired: drop the cache and retry once
            self.auth.invalidate()
            r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30)
        r.raise_for_status()
        body = r.json()
        seen = set()
        codes = []
        for item in body.get("data", []):
            if only_channels is not None and item.get("channel") not in only_channels:
                continue
            code = item.get("code")
            if code is None:
                continue
            code = str(code).strip()
            if code and code not in seen:
                seen.add(code)
                codes.append(code)
        return codes

    def get_documents_page(self, cp_id: int, document_type: str, franchises: List[str], offset: int, limit: int):
        """Fetch one page of the documents listing for the given franchises."""
        params = {"cpId": cp_id, "documentType": document_type, "offset": offset, "limit": limit}
        payload = {"franchises": franchises}
        r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60)
        if r.status_code in (401, 403):
            # token probably expired: drop the cache and retry once
            self.auth.invalidate()
            r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60)
        r.raise_for_status()
        return r.json()  # {"total":..., "documents":[...]}

    def get_presigned_pdf_url(self, document_type: str, franchise_id: str, image_name: str) -> str:
        """Resolve a document image into a downloadable (presigned) PDF URL."""
        url = f"{HANDLE_URL}/{document_type}/{franchise_id}/{image_name}/download"
        headers = {
            "Authorization": self.auth.get_bearer(),
            "Accept": "application/json",
            "Origin": "https://extranet.grupoboticario.com.br",
            "Referer": "https://extranet.grupoboticario.com.br/",
            "User-Agent": "Mozilla/5.0",
        }
        r = self.s.get(url, headers=headers, timeout=60)
        if r.status_code in (401, 403):
            self.auth.invalidate()
            headers["Authorization"] = self.auth.get_bearer()
            r = self.s.get(url, headers=headers, timeout=60)
        r.raise_for_status()
        # response may be JSON {"url": "..."} or the raw URL as plain text
        try:
            body = r.json()
            if isinstance(body, dict) and "url" in body and isinstance(body["url"], str):
                return body["url"]
        except Exception:
            pass
        return r.text.strip()

    def processar_pagina(
        self,
        cp_id: int = 10269,
        document_type: str = "EFAT",
        offset: int = 0,
        limit: int = 25,
        only_channels: Optional[Set[str]] = None,
    ) -> Dict[str, Any]:
        """Fetch one page of documents, download each PDF and extract its data.

        Per-document failures are captured in the item's "error" field rather
        than aborting the page.
        """
        franchises = self.get_franchises(only_channels=only_channels)
        page = self.get_documents_page(cp_id, document_type, franchises, offset, limit)
        docs = page.get("documents", [])
        total = int(page.get("total") or 0)
        out = []
        for d in docs:
            franchise_id = str(d.get("franchiseId") or "").strip()
            image_name = str(d.get("imageName") or "").strip()
            base_item = {
                "id": d.get("id"),
                "UUID": d.get("UUID"),
                "franchiseId": franchise_id,
                "imageName": image_name,
                "emissionDate": d.get("emissionDate"),
            }
            if not franchise_id or not image_name:
                out.append({**base_item, "error": "sem franchiseId/imageName"})
                continue
            try:
                presigned = self.get_presigned_pdf_url(document_type, franchise_id, image_name)
                pdf_bytes = baixar_pdf_bytes(presigned, self.s)
                extra_detalhado = extrair_remuneracao_franquia_detalhado(pdf_bytes)
                extra = extra_detalhado.get("escolhido", {})
                out.append({
                    **base_item,
                    "bloco_principal": extra_detalhado.get("bloco_principal"),
                    "status_principal": extra_detalhado.get("status_principal"),
                    "bloco_utilizado": extra_detalhado.get("bloco_utilizado"),
                    "resumo": extra_detalhado.get("resumo"),
                    "blocos": extra_detalhado.get("blocos"),
                    "warnings": extra_detalhado.get("warnings"),
                    "tipo_bloco": extra.get("tipo_bloco"),
                    "emissao_nf": extra.get("emissao"),
                    "nota_fiscal": extra.get("nota_fiscal"),
                    "valor_taxa": extra.get("valor_taxa"),
                    "valor_desconto": extra.get("valor_desconto"),
                    "fat_conv": extra.get("fat_conv"),
                    "valor_nf": extra.get("valor_nf"),
                    "encargos": extra.get("encargos"),
                    "parcelas": extra.get("parcelas"),
                    "error": extra.get("error"),
                })
            except Exception as e:
                out.append({**base_item, "error": f"falha_processar_pdf: {e}"})
        return {
            "total": total,
            "offset": offset,
            "limit": limit,
            "count": len(docs),
            "hasNext": (offset + limit) < total,
            "items": out
        }
# -----------------------------
# SQL SERVER: persistência
# -----------------------------
class SqlServerSink:
    """Persists extracted documents and installments into SQL Server via pyodbc.

    Use as a context manager: commits on clean exit, rolls back on exception.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.cn = None   # pyodbc connection, opened in __enter__
        self.cur = None  # pyodbc cursor

    def __enter__(self):
        # imported lazily so the module loads without pyodbc installed
        try:
            import pyodbc  # type: ignore
        except Exception as e:
            raise RuntimeError("pyodbc não encontrado. Instale com: pip install pyodbc") from e
        self.cn = pyodbc.connect(self.connection_string, timeout=30)
        self.cn.autocommit = False
        self.cur = self.cn.cursor()
        return self

    def __exit__(self, exc_type, exc, tb):
        # close cursor, then commit (clean exit) or rollback (exception), then close
        if self.cur is not None:
            try:
                self.cur.close()
            except Exception:
                pass
        if self.cn is not None:
            try:
                if exc_type is None:
                    self.cn.commit()
                else:
                    self.cn.rollback()
            except Exception:
                pass
            try:
                self.cn.close()
            except Exception:
                pass

    def reconnect(self) -> None:
        """Tear down and recreate connection/cursor after a communication failure."""
        try:
            if self.cur is not None:
                self.cur.close()
        except Exception:
            pass
        try:
            if self.cn is not None:
                self.cn.close()
        except Exception:
            pass
        import pyodbc  # type: ignore
        self.cn = pyodbc.connect(self.connection_string, timeout=30)
        self.cn.autocommit = False
        self.cur = self.cn.cursor()

    @staticmethod
    def _is_comm_error(e: Exception) -> bool:
        # ODBC SQLSTATE 08S01 / winsock 10054 indicate a dropped connection
        msg = str(e)
        return ("08S01" in msg) or ("10054" in msg) or ("communication link failure" in msg.lower())

    def ensure_schema(self) -> None:
        """Create the tables/indexes if missing and apply legacy column renames."""
        assert self.cur is not None
        self.cur.execute(
            """
            IF OBJECT_ID('dbo.TrfDocumento', 'U') IS NULL
            BEGIN
                CREATE TABLE dbo.TrfDocumento (
                    id BIGINT IDENTITY(1,1) PRIMARY KEY,
                    UUID VARCHAR(80) NOT NULL UNIQUE,
                    IdExterno BIGINT NULL,
                    FranchiseId VARCHAR(20) NULL,
                    ImageName VARCHAR(150) NULL,
                    EmissionDate DATE NULL,
                    EmissaoNF DATE NULL,
                    NotaFiscal VARCHAR(40) NULL,
                    ValorNF DECIMAL(18,2) NULL,
                    Encargos DECIMAL(18,2) NULL,
                    AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
                );
            END;
            """
        )
        self.cur.execute(
            """
            IF OBJECT_ID('dbo.TrfParcela', 'U') IS NULL
            BEGIN
                CREATE TABLE dbo.TrfParcela (
                    id BIGINT IDENTITY(1,1) PRIMARY KEY,
                    DocumentoId BIGINT NOT NULL,
                    NumeroParcela INT NOT NULL,
                    DataVencimento DATE NOT NULL,
                    ValorParcela DECIMAL(18,2) NOT NULL,
                    CONSTRAINT FK_TrfParcela_Documento
                        FOREIGN KEY (DocumentoId) REFERENCES dbo.TrfDocumento(id),
                    CONSTRAINT UQ_TrfParcela UNIQUE (DocumentoId, NumeroParcela)
                );
            END;
            """
        )
        # migrate legacy column names (DocumentoId/ParcelaId -> id) when present
        self.cur.execute(
            """
            IF OBJECT_ID('dbo.TrfDocumento', 'U') IS NOT NULL
               AND COL_LENGTH('dbo.TrfDocumento', 'id') IS NULL
               AND COL_LENGTH('dbo.TrfDocumento', 'DocumentoId') IS NOT NULL
            BEGIN
                EXEC sp_rename 'dbo.TrfDocumento.DocumentoId', 'id', 'COLUMN';
            END;
            IF OBJECT_ID('dbo.TrfParcela', 'U') IS NOT NULL
               AND COL_LENGTH('dbo.TrfParcela', 'id') IS NULL
               AND COL_LENGTH('dbo.TrfParcela', 'ParcelaId') IS NOT NULL
            BEGIN
                EXEC sp_rename 'dbo.TrfParcela.ParcelaId', 'id', 'COLUMN';
            END;
            """
        )
        self.cur.execute(
            """
            IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfDocumento_NotaFiscal' AND object_id=OBJECT_ID('dbo.TrfDocumento'))
                CREATE INDEX IX_TrfDocumento_NotaFiscal ON dbo.TrfDocumento(NotaFiscal);
            IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfDocumento_UUID' AND object_id=OBJECT_ID('dbo.TrfDocumento'))
                CREATE UNIQUE INDEX IX_TrfDocumento_UUID ON dbo.TrfDocumento(UUID);
            IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfDocumento_Franchise_Emission' AND object_id=OBJECT_ID('dbo.TrfDocumento'))
                CREATE INDEX IX_TrfDocumento_Franchise_Emission ON dbo.TrfDocumento(FranchiseId, EmissionDate);
            IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfParcela_DocumentoId_DataVenc' AND object_id=OBJECT_ID('dbo.TrfParcela'))
                CREATE INDEX IX_TrfParcela_DocumentoId_DataVenc ON dbo.TrfParcela(DocumentoId, DataVencimento);
            """
        )
        self.cn.commit()

    def upsert_documento(self, item: Dict[str, Any]) -> Tuple[int, bool]:
        """Insert or update a TrfDocumento row keyed by UUID.

        Returns (documento_id, is_new). Raises ValueError when the item has
        no UUID.
        """
        assert self.cur is not None
        uuid = item.get("UUID")
        if not uuid:
            raise ValueError("item sem UUID")
        resumo = item.get("resumo") or {}
        self.cur.execute("SELECT id FROM dbo.TrfDocumento WHERE UUID = ?", uuid)
        row = self.cur.fetchone()
        if row:
            documento_id = int(row[0])
            self.cur.execute(
                """
                UPDATE dbo.TrfDocumento
                   SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?,
                       EmissaoNF=?, NotaFiscal=?, ValorNF=?, Encargos=?,
                       AtualizadoEm=SYSUTCDATETIME()
                 WHERE id=?
                """,
                item.get("id"),
                item.get("franchiseId"),
                item.get("imageName"),
                _date_from_iso(item.get("emissionDate")),
                _date_from_br(resumo.get("emissao_nf")),
                resumo.get("nota_fiscal"),
                resumo.get("valor_nf"),
                resumo.get("encargos"),
                documento_id,
            )
            return documento_id, False
        self.cur.execute(
            """
            INSERT INTO dbo.TrfDocumento (
                UUID, IdExterno, FranchiseId, ImageName, EmissionDate,
                EmissaoNF, NotaFiscal, ValorNF, Encargos
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
            """,
            uuid,
            item.get("id"),
            item.get("franchiseId"),
            item.get("imageName"),
            _date_from_iso(item.get("emissionDate")),
            _date_from_br(resumo.get("emissao_nf")),
            resumo.get("nota_fiscal"),
            resumo.get("valor_nf"),
            resumo.get("encargos"),
        )
        # re-select instead of SCOPE_IDENTITY to stay driver-agnostic
        self.cur.execute("SELECT id FROM dbo.TrfDocumento WHERE UUID = ?", uuid)
        new_id = self.cur.fetchone()
        if not new_id:
            raise RuntimeError("Falha ao recuperar id após inserir TrfDocumento")
        return int(new_id[0]), True

    def replace_parcelas(self, documento_id: int, item: Dict[str, Any]) -> None:
        """Replace all installments of a document with the freshly extracted ones."""
        assert self.cur is not None
        self.cur.execute("DELETE FROM dbo.TrfParcela WHERE DocumentoId = ?", documento_id)
        parcelas = item.get("parcelas") or []
        if not parcelas:
            return
        rows = []
        for idx, p in enumerate(parcelas, start=1):
            dt = _date_from_br(p.get("data"))
            val = p.get("valor")
            if dt is None or val is None:
                continue
            rows.append((documento_id, idx, dt, val))
        if rows:
            self.cur.fast_executemany = True
            self.cur.executemany(
                "INSERT INTO dbo.TrfParcela (DocumentoId, NumeroParcela, DataVencimento, ValorParcela) VALUES (?, ?, ?, ?)",
                rows,
            )

    def persist_items(self, items: List[Dict[str, Any]]) -> Tuple[int, int]:
        """Persist a page of items; returns (persisted_count, new_count).

        Items whose error starts with "falha_processar_pdf" or that lack a
        UUID are skipped. On a communication error the connection is rebuilt
        and the current item retried exactly once.
        """
        count = 0
        novos = 0
        for item in items:
            err = item.get("error")
            err_txt = str(err) if err is not None else ""
            if err_txt.startswith("falha_processar_pdf"):
                continue
            if not item.get("UUID"):
                continue
            try:
                doc_id, is_new = self.upsert_documento(item)
                self.replace_parcelas(doc_id, item)
            except Exception as e:
                if not self._is_comm_error(e):
                    raise
                # reconnect and retry the current item a single time
                self.reconnect()
                doc_id, is_new = self.upsert_documento(item)
                self.replace_parcelas(doc_id, item)
            count += 1
            if is_new:
                novos += 1
        return count, novos
def sincronizar_paginas_sqlserver(
    connection_string: str,
    cp_id: int = 10269,
    document_type: str = "EFAT",
    limit: int = 100,
    start_offset: int = 0,
    only_channels: Optional[Set[str]] = None,
    commit_cada_paginas: int = 1,
) -> Dict[str, Any]:
    """Full paginated load: walk every page from *start_offset* and persist it.

    Commits every *commit_cada_paginas* pages and once more at the end.
    Returns totals about the run.
    """
    cli = Client()
    offset = start_offset
    paginas = 0
    docs_persistidos = 0
    total = None
    with SqlServerSink(connection_string) as sink:
        sink.ensure_schema()
        while True:
            pagina = cli.processar_pagina(
                cp_id=cp_id,
                document_type=document_type,
                offset=offset,
                limit=limit,
                only_channels=only_channels,
            )
            if total is None:
                total = int(pagina.get("total") or 0)
            itens = pagina.get("items") or []
            persistidos_pag, novos_pag = sink.persist_items(itens)
            docs_persistidos += persistidos_pag
            paginas += 1
            if paginas % commit_cada_paginas == 0:
                sink.cn.commit()
            print(f"[sync] offset={offset} count={len(itens)} novos_pag={novos_pag} persistidos={docs_persistidos} total={total}")
            if not pagina.get("hasNext"):
                break
            offset += limit
        # final commit for any pages not covered by the periodic commit
        sink.cn.commit()
    return {
        "total": total,
        "paginas_processadas": paginas,
        "documentos_persistidos": docs_persistidos,
        "offset_final": offset,
    }
def sincronizar_incremental_sqlserver(
    connection_string: str,
    cp_id: int = 10269,
    document_type: str = "EFAT",
    limit: int = 100,
    only_channels: Optional[Set[str]] = None,
    max_paginas_sem_novidade: int = 3,
    max_paginas: int = 20,
) -> Dict[str, Any]:
    """Incremental load: starts at offset 0 and stops early.

    Stops after *max_paginas_sem_novidade* consecutive pages without new
    documents, after *max_paginas* pages, or when the listing is exhausted.
    Commits after every page.
    """
    cli = Client()
    offset = 0
    paginas = 0
    docs_persistidos = 0
    docs_novos = 0
    sem_novidade = 0
    total = None
    with SqlServerSink(connection_string) as sink:
        sink.ensure_schema()
        while True:
            pagina = cli.processar_pagina(
                cp_id=cp_id,
                document_type=document_type,
                offset=offset,
                limit=limit,
                only_channels=only_channels,
            )
            if total is None:
                total = int(pagina.get("total") or 0)
            itens = pagina.get("items") or []
            persistidos_pag, novos_pag = sink.persist_items(itens)
            sink.cn.commit()
            docs_persistidos += persistidos_pag
            docs_novos += novos_pag
            paginas += 1
            # reset the "no news" counter whenever a page yields new documents
            sem_novidade = 0 if novos_pag > 0 else (sem_novidade + 1)
            print(
                f"[inc] offset={offset} count={len(itens)} novos_pag={novos_pag} "
                f"sem_novidade={sem_novidade}/{max_paginas_sem_novidade} total_novos={docs_novos}"
            )
            if sem_novidade >= max_paginas_sem_novidade:
                break
            if paginas >= max_paginas:
                break
            if not pagina.get("hasNext"):
                break
            offset += limit
    return {
        "total": total,
        "paginas_processadas": paginas,
        "documentos_persistidos": docs_persistidos,
        "documentos_novos": docs_novos,
        "offset_final": offset,
        "parada_por_sem_novidade": sem_novidade >= max_paginas_sem_novidade,
    }
# -----------------------------
# RUN
# -----------------------------
if __name__ == "__main__":
    # Modes:
    # - "full": complete paginated load
    # - "incremental": stops after X pages with no new documents
    # - "json": just prints one page as JSON
    RUN_MODE = "full"  # "full" | "incremental" | "json"
    if RUN_MODE in ("full", "incremental"):
        # SECURITY NOTE(review): database credentials are hard-coded here;
        # move them to environment variables or a secrets store.
        SQLSERVER_CONN = (
            "DRIVER={ODBC Driver 17 for SQL Server};"
            "SERVER=10.77.77.10;"
            "DATABASE=GINSENG;"
            "UID=andrey;"
            "PWD=88253332;"
            "TrustServerCertificate=yes;"
        )
        # BUG FIX: the branches were swapped — "incremental" previously ran the
        # full paginated load and "full" ran the incremental sync.
        if RUN_MODE == "incremental":
            resultado = sincronizar_incremental_sqlserver(
                connection_string=SQLSERVER_CONN,
                cp_id=10269,
                document_type="EFAT",
                limit=100,
                only_channels=None,
                max_paginas_sem_novidade=3,
                max_paginas=20,
            )
        else:
            resultado = sincronizar_paginas_sqlserver(
                connection_string=SQLSERVER_CONN,
                cp_id=10269,
                document_type="EFAT",
                limit=100,
                start_offset=0,
                only_channels=None,
                commit_cada_paginas=1,
            )
    else:
        c = Client()
        resultado = c.processar_pagina(
            cp_id=10269,
            document_type="EFAT",
            offset=0,
            limit=25,
            only_channels=None,
        )
    print(json.dumps(resultado, ensure_ascii=False, indent=2))