Andrey Cunh@ bd1c1f3a08 att
2026-02-26 13:12:25 -03:00

1246 lines
44 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import pdfplumber
import base64
import json
import time
import re
import unicodedata
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from io import BytesIO
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Tuple
TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens"
STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores"
DOCS_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/reports/documents-list"
HANDLE_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/handle-images"
# -----------------------------
# TOKEN (com cache simples)
# -----------------------------
def _jwt_payload(jwt_token: str) -> Dict[str, Any]:
parts = jwt_token.split(".")
if len(parts) != 3:
return {}
payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4)
raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
return json.loads(raw.decode("utf-8"))
@dataclass
class TokenCache:
bearer: Optional[str] = None
exp_epoch: int = 0
def valid(self, skew_seconds: int = 30) -> bool:
return bool(self.bearer) and (time.time() < (self.exp_epoch - skew_seconds))
class Auth:
def __init__(self, s: requests.Session):
self.s = s
self.cache = TokenCache()
def get_bearer(self) -> str:
if self.cache.valid():
return self.cache.bearer # type: ignore
r = self.s.get(TOKENS_URL, timeout=30)
r.raise_for_status()
body = r.json()
if not body.get("success"):
raise RuntimeError(f"Token API retornou success=false: {body}")
bearer = body["data"][0]["token"] # jÃÆÃ†â€™Ãƒâ€šÃ¡ vem "Bearer ..."
jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer
exp = int(_jwt_payload(jwt).get("exp") or 0)
self.cache = TokenCache(bearer=bearer, exp_epoch=exp)
return bearer
def invalidate(self) -> None:
self.cache = TokenCache()
# -----------------------------
# PDF: download + leitura texto (fallback)
# -----------------------------
def baixar_pdf_bytes(url_pdf: str, session: requests.Session) -> BytesIO:
r = session.get(url_pdf, timeout=120)
r.raise_for_status()
return BytesIO(r.content)
def ler_texto_pdf(pdf_bytes: BytesIO) -> str:
texto_total = ""
with pdfplumber.open(pdf_bytes) as pdf:
for pagina in pdf.pages:
texto = pagina.extract_text(layout=True) or ""
if texto:
texto_total += texto + "\n"
return texto_total
# -----------------------------
# EXTRAÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã¡ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢O: RemuneraÃÆÃ†â€™Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€šÃ£o Franquia
# 1) tenta por TABELA (mais confiÃÆÃ†â€™Ãƒâ€šÃ¡vel)
# 2) se falhar, cai no TEXTO (como antes)
# -----------------------------
def _norm(s: str) -> str:
base = _strip_accents(s or "")
return re.sub(r"\s+", " ", base).strip().lower()
def _parse_money_pt(s: str) -> float:
s = (s or "").strip()
# "12.769,43" -> "12769.43"
if "," in s:
s = s.replace(".", "").replace(",", ".")
return float(s)
def _money_from_text(s: str) -> Optional[float]:
if not s:
return None
m = re.search(r"\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2}", s)
if not m:
return None
try:
return _parse_money_pt(m.group(0))
except Exception:
return None
def _float5_from_text(s: str) -> Optional[float]:
if not s:
return None
m = re.search(r"\d+(?:[.,])\d{5}", s) # 1,00000 / 1.00000
if not m:
return None
x = m.group(0)
x = x.replace(".", "").replace(",", ".") if "," in x else x
try:
return float(x)
except Exception:
return None
def _pick_first_number(s: str) -> Optional[str]:
if not s:
return None
chunks = re.findall(r"\d+", s)
if not chunks:
return None
# Em alguns PDFs a NF vem quebrada em linhas, ex: "12282" + "58".
# Se houver um sufixo curto após um bloco maior, recompõe.
if len(chunks) >= 2 and len(chunks[-2]) >= 3 and len(chunks[-1]) <= 3:
return chunks[-2] + chunks[-1]
for c in chunks:
if len(c) >= 3:
return c
return None
def _extract_date_pt(s: str) -> Optional[str]:
if not s:
return None
s = re.sub(r"(?<=\d)\s+(?=\d)", "", s)
m = re.search(r"\d{2}\.\d{2}\.\d{4}", s)
return m.group(0) if m else None
def _nota_from_link_text(s: str) -> Optional[str]:
if not s:
return None
# querystring da URL da NFS-e
m = re.search(r"(?:nrnfs|nnfse)\s*=\s*(\d+)", s, flags=re.IGNORECASE)
return m.group(1) if m else None
def _nota_from_cell_text(s: str) -> Optional[str]:
if not s:
return None
txt = " ".join(str(s).split())
# Evita confundir com valor monetario (ex: 1315.42) e aceita sufixo "-1".
m = re.search(r"(?<![\d])(\d{3,}(?:-\d{1,4})?)(?![.,]\d)", txt)
if not m:
return None
base = m.group(1)
# Em alguns PDFs a Nota Fiscal quebra em 2 linhas (ex: "2025000012854" + "90").
if "-" not in base:
mt = re.search(re.escape(base) + r"\s+(\d{1,4})(?!\d)", txt)
if mt:
return base + mt.group(1)
return base
def _nota_digits_len(v: Optional[str]) -> int:
if not v:
return 0
return len(re.sub(r"\D", "", str(v)))
def _normalizar_nota_fiscal(v: Optional[str]) -> Optional[str]:
if not v:
return None
digits = re.sub(r"\D", "", str(v))
if not digits:
return None
# Regra de negocio: manter somente os ultimos 9 digitos.
return digits[-9:] if len(digits) > 9 else digits
def _nota_ou_dps(nota: Optional[str], nota_link: Optional[str], dps_num: Optional[str]) -> Optional[str]:
cands = [_normalizar_nota_fiscal(nota), _normalizar_nota_fiscal(nota_link), _normalizar_nota_fiscal(dps_num)]
valid = [c for c in cands if _nota_fiscal_valida(c)]
if valid:
# Se houver mais de uma opção, prefere a mais completa (mais dígitos).
return max(valid, key=_nota_digits_len)
for c in cands:
if c:
return c
return None
def _dps_num_from_text(s: str) -> Optional[str]:
if not s:
return None
m = re.search(
r"(?:dados\s*dps|dps).{0,80}?\bnum(?:ero|\.?)\s*[:=]\s*(\d{3,})\b",
s,
flags=re.IGNORECASE | re.DOTALL,
)
return m.group(1) if m else None
def _nota_fiscal_valida(nota: Optional[str]) -> bool:
if not nota:
return False
digits = re.sub(r"\D", "", str(nota))
# Evita ano (ex.: "2026") e outros falsos positivos curtos.
return len(digits) >= 5
def _extrair_parcelas(dp_text: str, emissao: Optional[str] = None) -> List[Dict[str, Any]]:
pares = re.findall(
r"\b(\d{2}\.\d{2}\.\d{4})\s+(\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2})\b",
dp_text or "",
)
parcelas: List[Dict[str, Any]] = []
seen = set()
for data, v in pares:
try:
val = _parse_money_pt(v)
except Exception:
continue
k = (data, val)
if k not in seen:
seen.add(k)
parcelas.append({"data": data, "valor": val})
# Quando a coluna vem mesclada, pode capturar "emissao + desconto 0,00" como 1a parcela.
if emissao and len(parcelas) > 1:
parcelas = [
p for p in parcelas
if not (p.get("data") == emissao and abs(float(p.get("valor") or 0.0)) < 0.00001)
]
return _ordenar_parcelas_por_data(parcelas)
def _date_from_iso(s: Optional[str]):
if not s:
return None
try:
return datetime.strptime(s, "%Y-%m-%d").date()
except Exception:
return None
def _date_from_br(s: Optional[str]):
if not s:
return None
try:
return datetime.strptime(s, "%d.%m.%Y").date()
except Exception:
return None
def _ordenar_parcelas_por_data(parcelas: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Ordena parcelas por data (dd.mm.aaaa) e, em empate, por valor.
Parcelas sem data válida ficam no final.
"""
def _k(p: Dict[str, Any]):
dt = _date_from_br(p.get("data"))
val = p.get("valor")
val_num = float(val) if isinstance(val, (int, float)) else float("inf")
invalida = dt is None
return (invalida, dt or datetime.max.date(), val_num)
return sorted(parcelas or [], key=_k)
def _isolar_bloco(texto: str, titulo_regex: str, proximos_titulos_regex: List[str]) -> Optional[str]:
t = " ".join((texto or "").split())
m0 = re.search(titulo_regex, t, flags=re.IGNORECASE)
if not m0:
return None
after = t[m0.start():]
end_pos = None
for rx in proximos_titulos_regex:
m = re.search(rx, after, flags=re.IGNORECASE)
if m:
end_pos = m.start()
break
return after[:end_pos] if end_pos is not None else after
def extrair_remuneracao_franquia_por_tabela(pdf: pdfplumber.PDF) -> Optional[Dict[str, Any]]:
"""
Extrai APENAS a tabela do bloco:
"Notas Fiscais de ServiÃÆÃ†â€™Ãƒâ€šÃ§o de taxa de remuneraÃÆÃ†â€™Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€šÃ£o de Franquia"
Regra para nÃÆÃ†â€™Ãƒâ€šÃ£o confundir com "Venda de Produtos":
- o header TEM que ter "valor taxa" e "fat.conv"
"""
for page in pdf.pages:
tables = page.extract_tables() or []
for tbl in tables:
if not tbl or len(tbl) < 2:
continue
# procura um header que tenha as colunas especÃÆÃ†â€™Ãƒâ€šÃ­ficas do bloco de remuneraÃÆÃ†â€™Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€šÃ£o
header_idx = None
header_join = ""
for i, row in enumerate(tbl[:8]):
header_join = " | ".join(_norm(c) for c in row if c)
# precisa ter essas chaves (venda nÃÆÃ†â€™Ãƒâ€šÃ£o tem "valor taxa")
if ("valor taxa" in header_join) and ("fat" in header_join) and ("dados pagamento" in header_join):
header_idx = i
break
if header_idx is None:
continue
nota_link_tabela = _nota_from_link_text(_table_text(tbl))
header = [_norm(c) for c in tbl[header_idx]]
def idx_like(keys: List[str]) -> Optional[int]:
for j, h in enumerate(header):
for k in keys:
if k in h:
return j
return None
i_emissao = idx_like(["emissao"])
i_nota = idx_like(["nota fiscal"])
i_valor_taxa = idx_like(["valor taxa"])
i_valor_desc = idx_like(["valor desconto"])
i_fat = idx_like(["fat.conv", "fat conv", "fat"])
i_valor_nf = idx_like(["valor nf"])
i_enc = idx_like(["encargos"])
i_dp = idx_like(["dados pagamento"])
# essenciais do bloco
if i_emissao is None or i_nota is None or i_valor_taxa is None or i_valor_nf is None or i_dp is None:
continue
# lÃÆÃ†â€™Ãƒâ€šÃª a primeira linha "real" de dados (normalmente sÃÆÃ†â€™Ãƒâ€šÃ³ tem 1)
for row in tbl[header_idx + 1:]:
if not any((c or "").strip() for c in row):
continue
emissao_cell = row[i_emissao] if i_emissao < len(row) else ""
nota_cell = row[i_nota] if i_nota < len(row) else ""
dp_cell = _cell(row[i_dp]) if i_dp < len(row) else ""
emissao = _extract_date_pt(_cell(emissao_cell))
nota = _nota_from_cell_text(_cell(nota_cell))
nota_link = _nota_from_link_text(dp_cell)
dps_num = _dps_num_from_text(dp_cell)
nota_fiscal = _nota_ou_dps(nota, nota_link or nota_link_tabela, dps_num)
if not emissao:
continue
valor_taxa = _money_from_text(row[i_valor_taxa]) if i_valor_taxa < len(row) else None
valor_desconto = _money_from_text(row[i_valor_desc]) if i_valor_desc is not None and i_valor_desc < len(row) else None
fat_conv = _float5_from_text(row[i_fat]) if i_fat is not None and i_fat < len(row) else None
valor_nf = _money_from_text(row[i_valor_nf]) if i_valor_nf < len(row) else None
encargos = _money_from_text(row[i_enc]) if i_enc is not None and i_enc < len(row) else None
# Dados pagamento: vÃÆÃ†â€™Ãƒâ€šÃ¡rias linhas data + valor
dp = dp_cell
parcelas = _extrair_parcelas(dp, emissao=emissao)
tem_valor_bloco = any(v is not None for v in (valor_taxa, valor_nf, encargos)) or bool(parcelas)
# Só considera TRF quando o bloco tem preenchimento real.
if (not _nota_fiscal_valida(nota_fiscal)) or (not tem_valor_bloco):
continue
return {
"tipo_bloco": "TAXA_REMUNERACAO",
"emissao": emissao,
"nota_fiscal": nota_fiscal,
"valor_taxa": valor_taxa,
"valor_desconto": valor_desconto,
"fat_conv": fat_conv,
"valor_nf": valor_nf,
"encargos": encargos,
"parcelas": parcelas,
"error": None,
}
return None
def _cell(c: Any) -> str:
return " ".join(str(c or "").split())
def _table_text(tbl: List[List[Any]]) -> str:
return " | ".join(_norm(_cell(c)) for row in tbl for c in (row or []))
def _strip_accents(s: str) -> str:
return "".join(ch for ch in unicodedata.normalize("NFD", s or "") if unicodedata.category(ch) != "Mn")
def extrair_remuneracao_franquia_por_texto(texto: str) -> Dict[str, Any]:
"""
Fallback (como vocÃÆÃ†â€™Ãƒâ€šÃª jÃÆÃ†â€™Ãƒâ€šÃ¡ fazia): acha a seÃÆÃ†â€™Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€šÃ£o por regex e tenta extrair
emissÃÆÃ†â€™Ãƒâ€šÃ£o, nota e parcelas. (Os valores podem falhar dependendo do PDF.)
"""
bloco = _isolar_bloco(
texto,
titulo_regex=r"Notas\s+Fiscais\s+de\s+Servi.{0,3}o\s+de\s+taxa\s+de\s+remunera.{0,5}o\s+de\s+Franquia",
proximos_titulos_regex=[
r"\bLink\s*:",
r"Notas\s+de\s+Despesas\s+de\s+Propaganda",
r"Outras\s+Notas\s+de\s+D[eÃÆÃ†â€™Ãƒâ€šÃ©]bito",
r"Outras\s+Notas\s+de\s+Cr[eÃÆÃ†â€™Ãƒâ€šÃ©]dito",
],
)
if not bloco:
return {
"tipo_bloco": "TAXA_REMUNERACAO",
"emissao": None,
"nota_fiscal": None,
"valor_taxa": None,
"valor_desconto": None,
"fat_conv": None,
"valor_nf": None,
"encargos": None,
"parcelas": [],
"error": "SeÃÆÃ†â€™Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€šÃ£o nÃÆÃ†â€™Ãƒâ€šÃ£o encontrada",
}
b = " ".join(bloco.split())
m_em = re.search(r"\b(\d{2}\.\d{2}\.\d{4})\b", b)
if not m_em:
return {
"tipo_bloco": "TAXA_REMUNERACAO",
"emissao": None,
"nota_fiscal": None,
"valor_taxa": None,
"valor_desconto": None,
"fat_conv": None,
"valor_nf": None,
"encargos": None,
"parcelas": [],
"error": "NÃÆÃ†â€™Ãƒâ€šÃ£o achei emissÃÆÃ†â€™Ãƒâ€šÃ£o + nota",
}
emissao = m_em.group(1)
m_nota = re.search(r"\bnota\s+fiscal\b.{0,220}?\b(\d{3,}(?:-\d{1,4})?)\b", b, flags=re.IGNORECASE)
nota = m_nota.group(1) if m_nota else None
if not nota:
m_nota2 = re.search(r"\b\d{2}\.\d{2}\.\d{4}\b\s+(\d{3,}(?:-\d{1,4})?)\b", b, flags=re.IGNORECASE)
nota = m_nota2.group(1) if m_nota2 else None
nota_link = _nota_from_link_text(texto)
dps_num = _dps_num_from_text(texto)
nota_fiscal = _nota_ou_dps(nota, nota_link, dps_num)
# parcelas depois de "Dados Pagamento"
idx_dp = b.lower().find("dados pagamento")
dp = b[idx_dp:] if idx_dp != -1 else b
parcelas = _extrair_parcelas(dp, emissao=emissao)
valor_taxa = _money_from_text(re.search(r"valor\s+taxa.{0,80}", b, flags=re.IGNORECASE).group(0)) if re.search(r"valor\s+taxa.{0,80}", b, flags=re.IGNORECASE) else None
valor_nf = _money_from_text(re.search(r"valor\s+nf.{0,80}", b, flags=re.IGNORECASE).group(0)) if re.search(r"valor\s+nf.{0,80}", b, flags=re.IGNORECASE) else None
tem_valor_bloco = any(v is not None for v in (valor_taxa, valor_nf)) or bool(parcelas)
if (not _nota_fiscal_valida(nota_fiscal)) or (not tem_valor_bloco):
return {
"tipo_bloco": "TAXA_REMUNERACAO",
"emissao": None,
"nota_fiscal": None,
"valor_taxa": None,
"valor_desconto": None,
"fat_conv": None,
"valor_nf": None,
"encargos": None,
"parcelas": [],
"error": "Bloco TRF sem preenchimento valido",
}
# aqui vocÃÆÃ†â€™Ãƒâ€šÃª pode manter os valores como None no fallback
return {
"tipo_bloco": "TAXA_REMUNERACAO",
"emissao": emissao,
"nota_fiscal": nota_fiscal,
"valor_taxa": None,
"valor_desconto": None,
"fat_conv": None,
"valor_nf": None,
"encargos": None,
"parcelas": parcelas,
"error": None,
}
def extrair_remuneracao_franquia(pdf_bytes: BytesIO) -> Dict[str, Any]:
"""
Fun̮̤̮̣o ̮̼nica que voc̮̻ chama no seu fluxo:
- tenta tabela (melhor)
- se n̮̣o conseguir, cai no texto (como era)
"""
# tenta bloco principal por tabela (melhor cenÃÆÃ¡rio)
with pdfplumber.open(pdf_bytes) as pdf:
via_tabela = extrair_remuneracao_franquia_por_tabela(pdf)
if via_tabela:
return via_tabela
# fallback texto do bloco principal
pdf_bytes.seek(0)
texto = ler_texto_pdf(pdf_bytes)
via_texto = extrair_remuneracao_franquia_por_texto(texto)
if via_texto.get("emissao"):
return via_texto
return via_texto
def _tem_emissao_nota(d: Optional[Dict[str, Any]]) -> bool:
if not d:
return False
return bool(d.get("emissao")) and bool(d.get("nota_fiscal"))
def _dados_bloco(d: Dict[str, Any]) -> Dict[str, Any]:
return {
"emissao_nf": d.get("emissao"),
"nota_fiscal": d.get("nota_fiscal"),
"valor_taxa": d.get("valor_taxa"),
"valor_desconto": d.get("valor_desconto"),
"fat_conv": d.get("fat_conv"),
"valor_nf": d.get("valor_nf"),
"encargos": d.get("encargos"),
"parcelas": d.get("parcelas") or [],
}
def extrair_remuneracao_franquia_detalhado(pdf_bytes: BytesIO) -> Dict[str, Any]:
principal: Optional[Dict[str, Any]] = None
pdf_bytes.seek(0)
with pdfplumber.open(pdf_bytes) as pdf:
via_tabela = extrair_remuneracao_franquia_por_tabela(pdf)
if via_tabela:
principal = via_tabela
if not principal:
pdf_bytes.seek(0)
texto = ler_texto_pdf(pdf_bytes)
principal = extrair_remuneracao_franquia_por_texto(texto)
principal_ok = _tem_emissao_nota(principal)
escolhido = principal
if not escolhido:
escolhido = {
"tipo_bloco": "TAXA_REMUNERACAO",
"emissao": None,
"nota_fiscal": None,
"valor_taxa": None,
"valor_desconto": None,
"fat_conv": None,
"valor_nf": None,
"encargos": None,
"parcelas": [],
"error": "Falha na extração",
}
warnings: List[str] = []
if principal and principal.get("error"):
warnings.append(str(principal.get("error")))
blocos: List[Dict[str, Any]] = []
if principal:
blocos.append({
"tipo": "TAXA_REMUNERACAO",
"encontrado": principal_ok,
"dados": _dados_bloco(principal) if principal_ok else None,
"erro": None if principal_ok else principal.get("error"),
})
return {
"bloco_principal": "TAXA_REMUNERACAO",
"status_principal": "ok" if principal_ok else "sem_dados",
"bloco_utilizado": escolhido.get("tipo_bloco"),
"resumo": {
"emissao_nf": escolhido.get("emissao"),
"nota_fiscal": escolhido.get("nota_fiscal"),
"valor_nf": escolhido.get("valor_nf"),
"encargos": escolhido.get("encargos"),
},
"blocos": blocos,
"warnings": warnings,
"escolhido": escolhido, # compatibilidade com estrutura atual
}
# -----------------------------
# CLIENTE PRINCIPAL
# -----------------------------
class Client:
def __init__(self):
self.s = requests.Session()
self.auth = Auth(self.s)
self._worker_local = threading.local()
def _headers_json(self) -> Dict[str, str]:
return {
"Authorization": self.auth.get_bearer(),
"Accept": "application/json",
"Content-Type": "application/json",
"Origin": "https://extranet.grupoboticario.com.br",
"Referer": "https://extranet.grupoboticario.com.br/",
"User-Agent": "Mozilla/5.0",
}
def get_franchises(self, only_channels: Optional[Set[str]] = None) -> List[str]:
r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30)
if r.status_code in (401, 403):
self.auth.invalidate()
r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30)
r.raise_for_status()
body = r.json()
seen = set()
codes = []
for item in body.get("data", []):
if only_channels is not None and item.get("channel") not in only_channels:
continue
code = item.get("code")
if code is None:
continue
code = str(code).strip()
if code and code not in seen:
seen.add(code)
codes.append(code)
return codes
def get_documents_page(self, cp_id: int, document_type: str, franchises: List[str], offset: int, limit: int):
params = {"cpId": cp_id, "documentType": document_type, "offset": offset, "limit": limit}
payload = {"franchises": franchises}
r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60)
if r.status_code in (401, 403):
self.auth.invalidate()
r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60)
r.raise_for_status()
return r.json() # {"total":..., "documents":[...]}
def get_presigned_pdf_url(self, document_type: str, franchise_id: str, image_name: str) -> str:
url = f"{HANDLE_URL}/{document_type}/{franchise_id}/{image_name}/download"
headers = {
"Authorization": self.auth.get_bearer(),
"Accept": "application/json",
"Origin": "https://extranet.grupoboticario.com.br",
"Referer": "https://extranet.grupoboticario.com.br/",
"User-Agent": "Mozilla/5.0",
}
r = self.s.get(url, headers=headers, timeout=60)
if r.status_code in (401, 403):
self.auth.invalidate()
headers["Authorization"] = self.auth.get_bearer()
r = self.s.get(url, headers=headers, timeout=60)
r.raise_for_status()
# pode vir JSON com {"url": "..."} ou texto puro
try:
body = r.json()
if isinstance(body, dict) and "url" in body and isinstance(body["url"], str):
return body["url"]
except Exception:
pass
return r.text.strip()
@staticmethod
def _headers_with_auth(auth: Auth) -> Dict[str, str]:
return {
"Authorization": auth.get_bearer(),
"Accept": "application/json",
"Origin": "https://extranet.grupoboticario.com.br",
"Referer": "https://extranet.grupoboticario.com.br/",
"User-Agent": "Mozilla/5.0",
}
def _get_presigned_pdf_url_with_client(
self,
session: requests.Session,
auth: Auth,
document_type: str,
franchise_id: str,
image_name: str,
) -> str:
url = f"{HANDLE_URL}/{document_type}/{franchise_id}/{image_name}/download"
headers = self._headers_with_auth(auth)
r = session.get(url, headers=headers, timeout=60)
if r.status_code in (401, 403):
auth.invalidate()
headers["Authorization"] = auth.get_bearer()
r = session.get(url, headers=headers, timeout=60)
r.raise_for_status()
try:
body = r.json()
if isinstance(body, dict) and "url" in body and isinstance(body["url"], str):
return body["url"]
except Exception:
pass
return r.text.strip()
def _get_worker_http(self) -> Tuple[requests.Session, Auth]:
session = getattr(self._worker_local, "session", None)
auth = getattr(self._worker_local, "auth", None)
if session is None or auth is None:
session = requests.Session()
auth = Auth(session)
self._worker_local.session = session
self._worker_local.auth = auth
return session, auth
def _processar_documento(self, d: Dict[str, Any], document_type: str) -> Dict[str, Any]:
franchise_id = str(d.get("franchiseId") or "").strip()
image_name = str(d.get("imageName") or "").strip()
base_item = {
"id": d.get("id"),
"UUID": d.get("UUID"),
"franchiseId": franchise_id,
"imageName": image_name,
"emissionDate": d.get("emissionDate"),
}
if not franchise_id or not image_name:
return {**base_item, "error": "sem franchiseId/imageName"}
session, auth = self._get_worker_http()
try:
presigned = self._get_presigned_pdf_url_with_client(
session, auth, document_type, franchise_id, image_name
)
pdf_bytes = baixar_pdf_bytes(presigned, session)
extra_detalhado = extrair_remuneracao_franquia_detalhado(pdf_bytes)
extra = extra_detalhado.get("escolhido", {})
status_principal = extra_detalhado.get("status_principal")
resumo = extra_detalhado.get("resumo") or {}
emissao_nf = resumo.get("emissao_nf")
nota_fiscal = resumo.get("nota_fiscal")
bloco_ok = bool(status_principal == "ok" and emissao_nf and nota_fiscal)
return {
**base_item,
"bloco_principal": extra_detalhado.get("bloco_principal"),
"status_principal": status_principal,
"bloco_utilizado": extra_detalhado.get("bloco_utilizado"),
"resumo": resumo,
"blocos": extra_detalhado.get("blocos"),
"warnings": extra_detalhado.get("warnings"),
"tipo_bloco": extra.get("tipo_bloco"),
"emissao_nf": extra.get("emissao"),
"nota_fiscal": extra.get("nota_fiscal"),
"valor_taxa": extra.get("valor_taxa"),
"valor_desconto": extra.get("valor_desconto"),
"fat_conv": extra.get("fat_conv"),
"valor_nf": extra.get("valor_nf"),
"encargos": extra.get("encargos"),
"parcelas": extra.get("parcelas"),
"error": None if bloco_ok else "skip_sem_bloco_taxa_remuneracao",
}
except Exception as e:
return {**base_item, "error": f"falha_processar_pdf: {e}"}
def processar_pagina(
self,
cp_id: int = 10269,
document_type: str = "EFAT",
offset: int = 0,
limit: int = 25,
only_channels: Optional[Set[str]] = None,
max_workers: int = 6,
) -> Dict[str, Any]:
franchises = self.get_franchises(only_channels=only_channels)
page = self.get_documents_page(cp_id, document_type, franchises, offset, limit)
docs = page.get("documents", [])
total = int(page.get("total") or 0)
out: List[Dict[str, Any]] = []
if docs:
workers = max(1, min(int(max_workers or 1), len(docs)))
progress_step = max(1, min(10, len(docs) // 10 if len(docs) > 10 else 1))
if workers == 1:
out = []
for i, d in enumerate(docs, start=1):
out.append(self._processar_documento(d, document_type))
if i % progress_step == 0 or i == len(docs):
print(f"[proc] offset={offset} processados={i}/{len(docs)}")
else:
ordered: List[Optional[Dict[str, Any]]] = [None] * len(docs)
with ThreadPoolExecutor(max_workers=workers) as ex:
futures = {
ex.submit(self._processar_documento, d, document_type): i
for i, d in enumerate(docs)
}
done = 0
for fut in as_completed(futures):
i = futures[fut]
try:
ordered[i] = fut.result()
except Exception as e:
ordered[i] = {
"id": docs[i].get("id"),
"UUID": docs[i].get("UUID"),
"franchiseId": str(docs[i].get("franchiseId") or "").strip(),
"imageName": str(docs[i].get("imageName") or "").strip(),
"emissionDate": docs[i].get("emissionDate"),
"error": f"falha_processar_pdf: {e}",
}
done += 1
if done % progress_step == 0 or done == len(docs):
print(f"[proc] offset={offset} processados={done}/{len(docs)}")
out = [x for x in ordered if x is not None]
return {
"total": total,
"offset": offset,
"limit": limit,
"count": len(docs),
"hasNext": (offset + limit) < total,
"items": out
}
# -----------------------------
# SQL SERVER: persistência
# -----------------------------
class SqlServerSink:
def __init__(self, connection_string: str):
self.connection_string = connection_string
self.cn = None
self.cur = None
self.doc_table = "dbo.TrfRemDocumento"
self.parcela_table = "dbo.TrfRemParcela"
def __enter__(self):
try:
import pyodbc # type: ignore
except Exception as e:
raise RuntimeError("pyodbc não encontrado. Instale com: pip install pyodbc") from e
self.cn = pyodbc.connect(self.connection_string, timeout=30)
self.cn.autocommit = False
self.cur = self.cn.cursor()
return self
def __exit__(self, exc_type, exc, tb):
if self.cur is not None:
try:
self.cur.close()
except Exception:
pass
if self.cn is not None:
try:
if exc_type is None:
self.cn.commit()
else:
self.cn.rollback()
except Exception:
pass
try:
self.cn.close()
except Exception:
pass
def reconnect(self) -> None:
try:
if self.cur is not None:
self.cur.close()
except Exception:
pass
try:
if self.cn is not None:
self.cn.close()
except Exception:
pass
import pyodbc # type: ignore
self.cn = pyodbc.connect(self.connection_string, timeout=30)
self.cn.autocommit = False
self.cur = self.cn.cursor()
@staticmethod
def _is_comm_error(e: Exception) -> bool:
msg = str(e)
return ("08S01" in msg) or ("10054" in msg) or ("communication link failure" in msg.lower())
def ensure_schema(self) -> None:
assert self.cur is not None
self.cur.execute(
f"""
IF OBJECT_ID('{self.doc_table}', 'U') IS NULL
BEGIN
CREATE TABLE {self.doc_table} (
id BIGINT IDENTITY(1,1) PRIMARY KEY,
UUID VARCHAR(80) NOT NULL UNIQUE,
IdExterno BIGINT NULL,
FranchiseId VARCHAR(20) NULL,
ImageName VARCHAR(150) NULL,
EmissionDate DATE NULL,
EmissaoNF DATE NULL,
NotaFiscal VARCHAR(40) NULL,
ValorNF DECIMAL(18,2) NULL,
Encargos DECIMAL(18,2) NULL,
AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);
END;
"""
)
self.cur.execute(
f"""
IF OBJECT_ID('{self.parcela_table}', 'U') IS NULL
BEGIN
CREATE TABLE {self.parcela_table} (
id BIGINT IDENTITY(1,1) PRIMARY KEY,
DocumentoId BIGINT NOT NULL,
NumeroParcela INT NOT NULL,
DataVencimento DATE NOT NULL,
ValorParcela DECIMAL(18,2) NOT NULL,
CONSTRAINT FK_TrfRemParcela_Documento
FOREIGN KEY (DocumentoId) REFERENCES {self.doc_table}(id),
CONSTRAINT UQ_TrfRemParcela UNIQUE (DocumentoId, NumeroParcela)
);
END;
"""
)
self.cur.execute(
f"""
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemDocumento_NotaFiscal' AND object_id=OBJECT_ID('{self.doc_table}'))
CREATE INDEX IX_TrfRemDocumento_NotaFiscal ON {self.doc_table}(NotaFiscal);
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemDocumento_UUID' AND object_id=OBJECT_ID('{self.doc_table}'))
CREATE UNIQUE INDEX IX_TrfRemDocumento_UUID ON {self.doc_table}(UUID);
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemDocumento_Franchise_Emission' AND object_id=OBJECT_ID('{self.doc_table}'))
CREATE INDEX IX_TrfRemDocumento_Franchise_Emission ON {self.doc_table}(FranchiseId, EmissionDate);
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemParcela_DocumentoId_DataVenc' AND object_id=OBJECT_ID('{self.parcela_table}'))
CREATE INDEX IX_TrfRemParcela_DocumentoId_DataVenc ON {self.parcela_table}(DocumentoId, DataVencimento);
"""
)
self.cn.commit()
def upsert_documento(self, item: Dict[str, Any]) -> Tuple[int, bool]:
assert self.cur is not None
uuid = item.get("UUID")
if not uuid:
raise ValueError("item sem UUID")
resumo = item.get("resumo") or {}
self.cur.execute(f"SELECT id FROM {self.doc_table} WHERE UUID = ?", uuid)
row = self.cur.fetchone()
if row:
documento_id = int(row[0])
self.cur.execute(
f"""
UPDATE {self.doc_table}
SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?,
EmissaoNF=?, NotaFiscal=?, ValorNF=?, Encargos=?,
AtualizadoEm=SYSUTCDATETIME()
WHERE id=?
""",
item.get("id"),
item.get("franchiseId"),
item.get("imageName"),
_date_from_iso(item.get("emissionDate")),
_date_from_br(resumo.get("emissao_nf")),
resumo.get("nota_fiscal"),
resumo.get("valor_nf"),
resumo.get("encargos"),
documento_id,
)
return documento_id, False
self.cur.execute(
f"""
INSERT INTO {self.doc_table} (
UUID, IdExterno, FranchiseId, ImageName, EmissionDate,
EmissaoNF, NotaFiscal, ValorNF, Encargos
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
""",
uuid,
item.get("id"),
item.get("franchiseId"),
item.get("imageName"),
_date_from_iso(item.get("emissionDate")),
_date_from_br(resumo.get("emissao_nf")),
resumo.get("nota_fiscal"),
resumo.get("valor_nf"),
resumo.get("encargos"),
)
self.cur.execute(f"SELECT id FROM {self.doc_table} WHERE UUID = ?", uuid)
new_id = self.cur.fetchone()
if not new_id:
raise RuntimeError("Falha ao recuperar id após inserir TrfRemDocumento")
return int(new_id[0]), True
def replace_parcelas(self, documento_id: int, item: Dict[str, Any]) -> None:
assert self.cur is not None
self.cur.execute(f"DELETE FROM {self.parcela_table} WHERE DocumentoId = ?", documento_id)
parcelas = _ordenar_parcelas_por_data(item.get("parcelas") or [])
if not parcelas:
return
rows = []
for idx, p in enumerate(parcelas, start=1):
dt = _date_from_br(p.get("data"))
val = p.get("valor")
if dt is None or val is None:
continue
rows.append((documento_id, idx, dt, val))
if rows:
self.cur.fast_executemany = True
self.cur.executemany(
f"INSERT INTO {self.parcela_table} (DocumentoId, NumeroParcela, DataVencimento, ValorParcela) VALUES (?, ?, ?, ?)",
rows,
)
def persist_items(self, items: List[Dict[str, Any]]) -> Tuple[int, int]:
count = 0
novos = 0
for item in items:
err = item.get("error")
err_txt = str(err) if err is not None else ""
if err_txt:
continue
if not item.get("UUID"):
continue
resumo = item.get("resumo") or {}
if item.get("status_principal") != "ok":
continue
if not resumo.get("emissao_nf") or not resumo.get("nota_fiscal"):
continue
if not _nota_fiscal_valida(resumo.get("nota_fiscal")):
continue
if (resumo.get("valor_nf") is None) and (item.get("valor_taxa") is None) and not (item.get("parcelas") or []):
continue
try:
doc_id, is_new = self.upsert_documento(item)
self.replace_parcelas(doc_id, item)
except Exception as e:
if not self._is_comm_error(e):
raise
# reconecta e tenta uma única vez o item atual
self.reconnect()
doc_id, is_new = self.upsert_documento(item)
self.replace_parcelas(doc_id, item)
count += 1
if is_new:
novos += 1
return count, novos
def sincronizar_paginas_sqlserver(
connection_string: str,
cp_id: int = 10269,
document_type: str = "EFAT",
limit: int = 100,
start_offset: int = 0,
only_channels: Optional[Set[str]] = None,
commit_cada_paginas: int = 1,
) -> Dict[str, Any]:
cli = Client()
offset = start_offset
paginas = 0
docs_persistidos = 0
total = None
with SqlServerSink(connection_string) as sink:
sink.ensure_schema()
while True:
pagina = cli.processar_pagina(
cp_id=cp_id,
document_type=document_type,
offset=offset,
limit=limit,
only_channels=only_channels,
)
if total is None:
total = int(pagina.get("total") or 0)
itens = pagina.get("items") or []
persistidos_pag, novos_pag = sink.persist_items(itens)
docs_persistidos += persistidos_pag
paginas += 1
if paginas % commit_cada_paginas == 0:
sink.cn.commit()
print(f"[sync] offset={offset} count={len(itens)} novos_pag={novos_pag} persistidos={docs_persistidos} total={total}")
if not pagina.get("hasNext"):
break
offset += limit
sink.cn.commit()
return {
"total": total,
"paginas_processadas": paginas,
"documentos_persistidos": docs_persistidos,
"offset_final": offset,
}
def sincronizar_incremental_sqlserver(
connection_string: str,
cp_id: int = 10269,
document_type: str = "EFAT",
limit: int = 100,
only_channels: Optional[Set[str]] = None,
max_paginas_sem_novidade: int = 3,
max_paginas: int = 20,
) -> Dict[str, Any]:
cli = Client()
offset = 0
paginas = 0
docs_persistidos = 0
docs_novos = 0
sem_novidade = 0
total = None
with SqlServerSink(connection_string) as sink:
sink.ensure_schema()
while True:
pagina = cli.processar_pagina(
cp_id=cp_id,
document_type=document_type,
offset=offset,
limit=limit,
only_channels=only_channels,
)
if total is None:
total = int(pagina.get("total") or 0)
itens = pagina.get("items") or []
persistidos_pag, novos_pag = sink.persist_items(itens)
sink.cn.commit()
docs_persistidos += persistidos_pag
docs_novos += novos_pag
paginas += 1
sem_novidade = 0 if novos_pag > 0 else (sem_novidade + 1)
print(
f"[inc] offset={offset} count={len(itens)} novos_pag={novos_pag} "
f"sem_novidade={sem_novidade}/{max_paginas_sem_novidade} total_novos={docs_novos}"
)
if sem_novidade >= max_paginas_sem_novidade:
break
if paginas >= max_paginas:
break
if not pagina.get("hasNext"):
break
offset += limit
return {
"total": total,
"paginas_processadas": paginas,
"documentos_persistidos": docs_persistidos,
"documentos_novos": docs_novos,
"offset_final": offset,
"parada_por_sem_novidade": sem_novidade >= max_paginas_sem_novidade,
}
# -----------------------------
# RUN
# -----------------------------
if __name__ == "__main__":
# Modos:
# - "full": carga completa paginada
# - "incremental": para após X páginas sem novidades
# - "json": só imprime uma página em JSON
RUN_MODE = "incremental" # "full" | "incremental" | "json"
if RUN_MODE in ("full", "incremental"):
SQLSERVER_CONN = (
"DRIVER={ODBC Driver 17 for SQL Server};"
"SERVER=10.77.77.10;"
"DATABASE=GINSENG;"
"UID=andrey;"
"PWD=88253332;"
"TrustServerCertificate=yes;"
)
if RUN_MODE == "full":
resultado = sincronizar_paginas_sqlserver(
connection_string=SQLSERVER_CONN,
cp_id=10269,
document_type="EFAT",
limit=100,
start_offset=0,
only_channels=None,
commit_cada_paginas=1,
)
else:
resultado = sincronizar_incremental_sqlserver(
connection_string=SQLSERVER_CONN,
cp_id=10269,
document_type="EFAT",
limit=100,
only_channels=None,
max_paginas_sem_novidade=5,
max_paginas=15,
)
else:
c = Client()
resultado = c.processar_pagina(
cp_id=10269,
document_type="EFAT",
offset=0,
limit=25,
only_channels=None,
)
print(json.dumps(resultado, ensure_ascii=False, indent=2))