"""Extract franchise remuneration data from Grupo Boticario invoice PDFs.

Pipeline: authenticate (cached bearer token) -> list franchise stores ->
page through the documents API -> download each PDF via a presigned URL ->
parse the "taxa de remuneracao de Franquia" block (table extraction first,
plain-text fallback, then any alternative block that has data) -> persist
the result into SQL Server (dbo.TrfDocumento / dbo.TrfParcela).
"""

import base64
import json
import os
import re
import time
import unicodedata
from dataclasses import dataclass
from datetime import datetime
from io import BytesIO
from typing import Any, Dict, List, Optional, Set, Tuple

import pdfplumber
import requests

TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens"
STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores"
DOCS_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/reports/documents-list"
HANDLE_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/handle-images"


# -----------------------------
# TOKEN (simple cache)
# -----------------------------
def _jwt_payload(jwt_token: str) -> Dict[str, Any]:
    """Decode a JWT payload without verifying the signature.

    Returns {} when the token does not have the three dot-separated parts.
    """
    parts = jwt_token.split(".")
    if len(parts) != 3:
        return {}
    # restore the base64 padding that JWT encoding strips
    payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4)
    raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
    return json.loads(raw.decode("utf-8"))


@dataclass
class TokenCache:
    # bearer: full "Bearer ..." header value; exp_epoch: JWT expiry (epoch secs)
    bearer: Optional[str] = None
    exp_epoch: int = 0

    def valid(self, skew_seconds: int = 30) -> bool:
        # reject tokens about to expire so in-flight requests do not 401
        return bool(self.bearer) and (time.time() < (self.exp_epoch - skew_seconds))


class Auth:
    """Fetches the API bearer token and caches it until near expiry."""

    def __init__(self, s: requests.Session):
        self.s = s
        self.cache = TokenCache()

    def get_bearer(self) -> str:
        """Return a valid "Bearer ..." string, refreshing it when expired."""
        if self.cache.valid():
            return self.cache.bearer  # type: ignore

        r = self.s.get(TOKENS_URL, timeout=30)
        r.raise_for_status()
        body = r.json()

        if not body.get("success"):
            raise RuntimeError(f"Token API retornou success=false: {body}")

        bearer = body["data"][0]["token"]  # already arrives as "Bearer ..."
        jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer
        exp = int(_jwt_payload(jwt).get("exp") or 0)

        self.cache = TokenCache(bearer=bearer, exp_epoch=exp)
        return bearer

    def invalidate(self) -> None:
        """Drop the cached token (used after a 401/403 response)."""
        self.cache = TokenCache()


# -----------------------------
# PDF: download + text reading (fallback)
# -----------------------------
def baixar_pdf_bytes(url_pdf: str, session: requests.Session) -> BytesIO:
    """Download a PDF and return it as an in-memory buffer."""
    r = session.get(url_pdf, timeout=120)
    r.raise_for_status()
    return BytesIO(r.content)


def ler_texto_pdf(pdf_bytes: BytesIO) -> str:
    """Extract layout-preserving text from every page of the PDF."""
    texto_total = ""
    with pdfplumber.open(pdf_bytes) as pdf:
        for pagina in pdf.pages:
            texto = pagina.extract_text(layout=True) or ""
            if texto:
                texto_total += texto + "\n"
    return texto_total


# -----------------------------
# EXTRACTION: franchise remuneration
# 1) try the TABLE (more reliable)
# 2) on failure, fall back to TEXT
# -----------------------------
def _norm(s: str) -> str:
    """Collapse whitespace and lowercase for header matching."""
    return re.sub(r"\s+", " ", (s or "")).strip().lower()


def _parse_money_pt(s: str) -> float:
    """Parse a pt-BR money string: "12.769,43" -> 12769.43."""
    s = (s or "").strip()
    if "," in s:
        s = s.replace(".", "").replace(",", ".")
    return float(s)


def _money_from_text(s: str) -> Optional[float]:
    """Find the first money value in *s*; None when absent or unparseable."""
    if not s:
        return None
    m = re.search(r"\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2}", s)
    if not m:
        return None
    try:
        return _parse_money_pt(m.group(0))
    except Exception:
        return None


def _float5_from_text(s: str) -> Optional[float]:
    """Find a 5-decimal factor (e.g. "1,00000" / "1.00000"); None when absent."""
    if not s:
        return None
    m = re.search(r"\d+(?:[.,])\d{5}", s)
    if not m:
        return None
    x = m.group(0)
    x = x.replace(".", "").replace(",", ".") if "," in x else x
    try:
        return float(x)
    except Exception:
        return None


def _pick_first_number(s: str) -> Optional[str]:
    """Return the first run of 3+ digits (invoice number) in *s*, if any."""
    if not s:
        return None
    m = re.search(r"\b\d{3,}\b", s)
    return m.group(0) if m else None


def _extract_date_pt(s: str) -> Optional[str]:
    """Find a dd.mm.yyyy date, tolerating digits split by layout spaces."""
    if not s:
        return None
    s = re.sub(r"(?<=\d)\s+(?=\d)", "", s)  # rejoin digits the layout split
    m = re.search(r"\d{2}\.\d{2}\.\d{4}", s)
    return m.group(0) if m else None


def _date_from_iso(s: Optional[str]):
    """Parse "YYYY-MM-DD" to a date; None on failure."""
    if not s:
        return None
    try:
        return datetime.strptime(s, "%Y-%m-%d").date()
    except Exception:
        return None


def _date_from_br(s: Optional[str]):
    """Parse "dd.mm.yyyy" to a date; None on failure."""
    if not s:
        return None
    try:
        return datetime.strptime(s, "%d.%m.%Y").date()
    except Exception:
        return None


def _isolar_bloco(texto: str, titulo_regex: str, proximos_titulos_regex: List[str]) -> Optional[str]:
    """Slice *texto* from the section title until the first following title."""
    t = " ".join((texto or "").split())
    m0 = re.search(titulo_regex, t, flags=re.IGNORECASE)
    if not m0:
        return None
    after = t[m0.start():]

    end_pos = None
    for rx in proximos_titulos_regex:
        m = re.search(rx, after, flags=re.IGNORECASE)
        if m:
            end_pos = m.start()
            break

    return after[:end_pos] if end_pos is not None else after


# "dd.mm.yyyy  <money>" pairs inside a "Dados Pagamento" cell/section.
_RX_PARCELA = re.compile(r"(\d{2}\.\d{2}\.\d{4})\s+(\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2})")


def _parse_parcelas(dp: str) -> List[Dict[str, Any]]:
    """Parse deduplicated installments ({"data", "valor"}) from payment text.

    Shared by the table and text extraction paths (previously three copies of
    the same loop; the \\b anchors of the text path were redundant).
    """
    parcelas: List[Dict[str, Any]] = []
    seen = set()
    for data, v in _RX_PARCELA.findall(dp or ""):
        try:
            val = _parse_money_pt(v)
        except Exception:
            continue
        k = (data, val)
        if k not in seen:
            seen.add(k)
            parcelas.append({"data": data, "valor": val})
    return parcelas


def _resultado_vazio(error: Optional[str]) -> Dict[str, Any]:
    """Empty TAXA_REMUNERACAO result carrying only an error message."""
    return {
        "tipo_bloco": "TAXA_REMUNERACAO",
        "emissao": None,
        "nota_fiscal": None,
        "valor_taxa": None,
        "valor_desconto": None,
        "fat_conv": None,
        "valor_nf": None,
        "encargos": None,
        "parcelas": [],
        "error": error,
    }


def extrair_remuneracao_franquia_por_tabela(pdf: pdfplumber.PDF) -> Optional[Dict[str, Any]]:
    """
    Extract ONLY the table of the block:
      "Notas Fiscais de Serviço de taxa de remuneração de Franquia"

    Rule to avoid confusing it with "Venda de Produtos":
      - the header MUST contain "valor taxa" and "fat.conv"
    """
    for page in pdf.pages:
        tables = page.extract_tables() or []
        for tbl in tables:
            if not tbl or len(tbl) < 2:
                continue

            # look for a header with the columns specific to the remuneration block
            header_idx = None
            header_join = ""
            for i, row in enumerate(tbl[:8]):
                header_join = " | ".join(_norm(c) for c in row if c)
                # must have these keys ("Venda" has no "valor taxa")
                if ("valor taxa" in header_join) and ("fat" in header_join) and ("dados pagamento" in header_join):
                    header_idx = i
                    break

            if header_idx is None:
                continue

            header = [_norm(c) for c in tbl[header_idx]]

            def idx_like(keys: List[str]) -> Optional[int]:
                # first header column containing any of *keys*
                for j, h in enumerate(header):
                    for k in keys:
                        if k in h:
                            return j
                return None

            i_emissao = idx_like(["emissão", "emissao"])
            i_nota = idx_like(["nota fiscal"])
            i_valor_taxa = idx_like(["valor taxa"])
            i_valor_desc = idx_like(["valor desconto"])
            i_fat = idx_like(["fat.conv", "fat conv", "fat"])
            i_valor_nf = idx_like(["valor nf"])
            i_enc = idx_like(["encargos"])
            i_dp = idx_like(["dados pagamento"])

            # columns essential to this block
            if i_emissao is None or i_nota is None or i_valor_taxa is None or i_valor_nf is None or i_dp is None:
                continue

            # read the first "real" data row (there is normally only one)
            for row in tbl[header_idx + 1:]:
                if not any((c or "").strip() for c in row):
                    continue

                emissao_cell = row[i_emissao] if i_emissao < len(row) else ""
                nota_cell = row[i_nota] if i_nota < len(row) else ""

                emissao = _extract_date_pt(_cell(emissao_cell))
                nota = _pick_first_number(nota_cell or "")

                if not emissao or not nota:
                    continue

                valor_taxa = _money_from_text(row[i_valor_taxa]) if i_valor_taxa < len(row) else None
                valor_desconto = _money_from_text(row[i_valor_desc]) if i_valor_desc is not None and i_valor_desc < len(row) else None
                fat_conv = _float5_from_text(row[i_fat]) if i_fat is not None and i_fat < len(row) else None
                valor_nf = _money_from_text(row[i_valor_nf]) if i_valor_nf < len(row) else None
                encargos = _money_from_text(row[i_enc]) if i_enc is not None and i_enc < len(row) else None

                # payment data: several "date value" lines in a single cell
                dp = row[i_dp] if i_dp < len(row) else ""
                parcelas = _parse_parcelas(dp)

                return {
                    "tipo_bloco": "TAXA_REMUNERACAO",
                    "emissao": emissao,
                    "nota_fiscal": nota,
                    "valor_taxa": valor_taxa,
                    "valor_desconto": valor_desconto,
                    "fat_conv": fat_conv,
                    "valor_nf": valor_nf,
                    "encargos": encargos,
                    "parcelas": parcelas,
                    "error": None,
                }

    return None


def _cell(c: Any) -> str:
    """Flatten a table cell (possibly None / multi-line) into one spaced string."""
    return " ".join(str(c or "").split())


def _table_text(tbl: List[List[Any]]) -> str:
    """All cells of a table, normalized and joined — used for block detection."""
    return " | ".join(_norm(_cell(c)) for row in tbl for c in (row or []))


def _strip_accents(s: str) -> str:
    """Remove combining accents so matching works on unaccented keys."""
    return "".join(ch for ch in unicodedata.normalize("NFD", s or "") if unicodedata.category(ch) != "Mn")


def _tipo_bloco_fallback(tbl: List[List[Any]]) -> str:
    """Classify which alternative block a table belongs to, by its text."""
    t = _strip_accents(_table_text(tbl))
    if "outras notas de debito" in t or "outras nfs servicos" in t:
        return "OUTRAS_NOTAS_DEBITO"
    if "outras notas de credito" in t:
        return "OUTRAS_NOTAS_CREDITO"
    if "despesas de propaganda" in t or "esforcos de marketing" in t:
        return "DESPESAS_MARKETING"
    if "devolucao da franquia" in t:
        return "DEVOLUCAO_FRANQUIA"
    if "venda de produtos" in t:
        return "VENDA_PRODUTOS"
    return "BLOCO_ALTERNATIVO"


def extrair_bloco_com_informacao_por_tabela(pdf: pdfplumber.PDF) -> Optional[Dict[str, Any]]:
    """
    General fallback: find the first block with emissao + nota,
    when TAXA_REMUNERACAO is empty.
    """
    for page in pdf.pages:
        tables = page.extract_tables() or []
        for tbl in tables:
            if not tbl or len(tbl) < 2:
                continue

            header_idx = None
            for i, row in enumerate(tbl[:10]):
                header_join = _strip_accents(" | ".join(_norm(_cell(c)) for c in (row or [])))
                has_emissao = "emissao" in header_join
                has_nota = (
                    ("nota fiscal" in header_join)
                    or ("nota debito" in header_join)
                    or ("nota credito" in header_join)
                    or ("n nota" in header_join)
                    or ("nota - duplicata" in header_join)
                )
                has_valor = ("valor nf" in header_join) or (re.search(r"\bvalor\b", header_join) is not None)
                if has_emissao and has_nota and has_valor:
                    header_idx = i
                    break

            if header_idx is None:
                continue

            header = [_strip_accents(_norm(_cell(c))) for c in tbl[header_idx]]

            def idx_like(keys: List[str]) -> Optional[int]:
                # first column matching any key (column order wins)
                for j, h in enumerate(header):
                    for k in keys:
                        if k in h:
                            return j
                return None

            def idx_like_prefer(keys: List[str]) -> Optional[int]:
                # first key that matches any column (key priority wins)
                for k in keys:
                    for j, h in enumerate(header):
                        if k in h:
                            return j
                return None

            i_emissao = idx_like(["emissao"])

            i_nota = idx_like_prefer([
                "n nota debito",
                "n nota credito",
                "nota fiscal",
                "nota debito",
                "nota credito",
                "nota - duplicata",
                "nota duplicata",
                "n nota",
            ])
            i_valor_nf = idx_like(["valor nf", "val. nf autorizado", "valor da nf"])
            i_valor = idx_like(["valor"])
            i_enc = idx_like(["encargos"])
            i_dp = idx_like(["dados pagamento"])

            if i_emissao is None or i_nota is None:
                continue

            for row in tbl[header_idx + 1:]:
                if not any(_cell(c) for c in row):
                    continue

                emissao_cell = row[i_emissao] if i_emissao < len(row) else ""
                nota_cell = row[i_nota] if i_nota < len(row) else ""

                emissao = _extract_date_pt(_cell(emissao_cell))
                nota = _pick_first_number(_cell(nota_cell))
                if not emissao or not nota:
                    continue

                valor_nf = None
                if i_valor_nf is not None and i_valor_nf < len(row):
                    valor_nf = _money_from_text(_cell(row[i_valor_nf]))
                if valor_nf is None and i_valor is not None and i_valor < len(row):
                    valor_nf = _money_from_text(_cell(row[i_valor]))

                encargos = _money_from_text(_cell(row[i_enc])) if i_enc is not None and i_enc < len(row) else None

                dp = _cell(row[i_dp]) if i_dp is not None and i_dp < len(row) else ""
                parcelas = _parse_parcelas(dp)

                return {
                    "tipo_bloco": _tipo_bloco_fallback(tbl),
                    "emissao": emissao,
                    "nota_fiscal": nota,
                    "valor_taxa": None,
                    "valor_desconto": None,
                    "fat_conv": None,
                    "valor_nf": valor_nf,
                    "encargos": encargos,
                    "parcelas": parcelas,
                    "error": "Taxa de remuneracao sem dados; usado bloco alternativo com informacao",
                }

    return None


def extrair_remuneracao_franquia_por_texto(texto: str) -> Dict[str, Any]:
    """
    Fallback: locate the section by regex and try to extract the emission
    date, invoice number and installments. (Monetary values may be missed
    depending on the PDF layout.)
    """
    bloco = _isolar_bloco(
        texto,
        titulo_regex=r"Notas\s+Fiscais\s+de\s+Servi.{0,3}o\s+de\s+taxa\s+de\s+remunera.{0,5}o\s+de\s+Franquia",
        proximos_titulos_regex=[
            r"\bLink\s*:",
            r"Notas\s+de\s+Despesas\s+de\s+Propaganda",
            r"Outras\s+Notas\s+de\s+D[eé]bito",
            r"Outras\s+Notas\s+de\s+Cr[eé]dito",
        ],
    )
    if not bloco:
        return _resultado_vazio("Seção não encontrada")

    b = " ".join(bloco.split())

    m = re.search(r"\b(\d{2}\.\d{2}\.\d{4})\s+(\d{4,})\b", b)
    if not m:
        return _resultado_vazio("Não achei emissão + nota")

    emissao = m.group(1)
    nota = m.group(2)

    # installments come after "Dados Pagamento"
    idx_dp = b.lower().find("dados pagamento")
    dp = b[idx_dp:] if idx_dp != -1 else b
    parcelas = _parse_parcelas(dp)

    # monetary values stay None in this fallback path
    resultado = _resultado_vazio(None)
    resultado.update({"emissao": emissao, "nota_fiscal": nota, "parcelas": parcelas})
    return resultado


def extrair_remuneracao_franquia(pdf_bytes: BytesIO) -> Dict[str, Any]:
    """
    Single entry point for the extraction flow:
      - try the table (best case)
      - otherwise fall back to text
      - otherwise any alternative block that has data
    """
    # try the main block via table (best scenario)
    with pdfplumber.open(pdf_bytes) as pdf:
        via_tabela = extrair_remuneracao_franquia_por_tabela(pdf)
        if via_tabela:
            return via_tabela

    # text fallback for the main block
    pdf_bytes.seek(0)
    texto = ler_texto_pdf(pdf_bytes)
    via_texto = extrair_remuneracao_franquia_por_texto(texto)
    if via_texto.get("emissao") and via_texto.get("nota_fiscal"):
        return via_texto

    # TAXA_REMUNERACAO empty: try any other block with data
    pdf_bytes.seek(0)
    with pdfplumber.open(pdf_bytes) as pdf:
        via_fallback = extrair_bloco_com_informacao_por_tabela(pdf)
        if via_fallback:
            return via_fallback

    return via_texto


def _tem_emissao_nota(d: Optional[Dict[str, Any]]) -> bool:
    """True when the result carries both emission date and invoice number."""
    if not d:
        return False
    return bool(d.get("emissao") and d.get("nota_fiscal"))


def _dados_bloco(d: Dict[str, Any]) -> Dict[str, Any]:
    """Project an extraction result into the 'dados' shape of the report."""
    return {
        "emissao_nf": d.get("emissao"),
        "nota_fiscal": d.get("nota_fiscal"),
        "valor_taxa": d.get("valor_taxa"),
        "valor_desconto": d.get("valor_desconto"),
        "fat_conv": d.get("fat_conv"),
        "valor_nf": d.get("valor_nf"),
        "encargos": d.get("encargos"),
        "parcelas": d.get("parcelas") or [],
    }


def extrair_remuneracao_franquia_detalhado(pdf_bytes: BytesIO) -> Dict[str, Any]:
    """Like extrair_remuneracao_franquia, but reports every block attempted,
    warnings, and which block ended up being used ('escolhido')."""
    principal: Optional[Dict[str, Any]] = None
    fallback: Optional[Dict[str, Any]] = None

    pdf_bytes.seek(0)
    with pdfplumber.open(pdf_bytes) as pdf:
        via_tabela = extrair_remuneracao_franquia_por_tabela(pdf)
        if via_tabela:
            principal = via_tabela

    if not principal:
        pdf_bytes.seek(0)
        texto = ler_texto_pdf(pdf_bytes)
        principal = extrair_remuneracao_franquia_por_texto(texto)

    principal_ok = _tem_emissao_nota(principal)
    if not principal_ok:
        pdf_bytes.seek(0)
        with pdfplumber.open(pdf_bytes) as pdf:
            fallback = extrair_bloco_com_informacao_por_tabela(pdf)

    escolhido = fallback if fallback else principal
    if not escolhido:
        escolhido = _resultado_vazio("Falha na extração")

    warnings: List[str] = []
    if not principal_ok and fallback:
        warnings.append("Taxa de remuneração sem dados; usado bloco alternativo")
    if principal and principal.get("error"):
        warnings.append(str(principal.get("error")))

    blocos: List[Dict[str, Any]] = []
    if principal:
        blocos.append({
            "tipo": "TAXA_REMUNERACAO",
            "encontrado": principal_ok,
            "dados": _dados_bloco(principal) if principal_ok else None,
            "erro": None if principal_ok else principal.get("error"),
        })
    if fallback:
        blocos.append({
            "tipo": fallback.get("tipo_bloco"),
            "encontrado": _tem_emissao_nota(fallback),
            "dados": _dados_bloco(fallback),
            "erro": fallback.get("error"),
        })

    return {
        "bloco_principal": "TAXA_REMUNERACAO",
        "status_principal": "ok" if principal_ok else "sem_dados",
        "bloco_utilizado": escolhido.get("tipo_bloco"),
        "resumo": {
            "emissao_nf": escolhido.get("emissao"),
            "nota_fiscal": escolhido.get("nota_fiscal"),
            "valor_nf": escolhido.get("valor_nf"),
            "encargos": escolhido.get("encargos"),
        },
        "blocos": blocos,
        "warnings": warnings,
        "escolhido": escolhido,  # compatibility with the current structure
    }


# -----------------------------
# MAIN CLIENT
# -----------------------------
class Client:
    """HTTP client for the stores/documents/handle-images APIs."""

    def __init__(self):
        self.s = requests.Session()
        self.auth = Auth(self.s)

    def _headers_json(self) -> Dict[str, str]:
        """JSON request headers with a fresh Authorization value."""
        return {
            "Authorization": self.auth.get_bearer(),
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Origin": "https://extranet.grupoboticario.com.br",
            "Referer": "https://extranet.grupoboticario.com.br/",
            "User-Agent": "Mozilla/5.0",
        }

    def get_franchises(self, only_channels: Optional[Set[str]] = None) -> List[str]:
        """Return deduplicated store codes, optionally filtered by channel."""
        r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30)
        if r.status_code in (401, 403):
            # token likely expired: invalidate and retry once
            self.auth.invalidate()
            r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30)
        r.raise_for_status()

        body = r.json()
        seen = set()
        codes = []

        for item in body.get("data", []):
            if only_channels is not None and item.get("channel") not in only_channels:
                continue
            code = item.get("code")
            if code is None:
                continue
            code = str(code).strip()
            if code and code not in seen:
                seen.add(code)
                codes.append(code)

        return codes

    def get_documents_page(self, cp_id: int, document_type: str, franchises: List[str], offset: int, limit: int):
        """Fetch one page of documents; returns {"total":..., "documents":[...]}."""
        params = {"cpId": cp_id, "documentType": document_type, "offset": offset, "limit": limit}
        payload = {"franchises": franchises}

        r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60)
        if r.status_code in (401, 403):
            self.auth.invalidate()
            r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60)

        r.raise_for_status()
        return r.json()

    def get_presigned_pdf_url(self, document_type: str, franchise_id: str, image_name: str) -> str:
        """Resolve the presigned download URL for one document image."""
        url = f"{HANDLE_URL}/{document_type}/{franchise_id}/{image_name}/download"

        headers = {
            "Authorization": self.auth.get_bearer(),
            "Accept": "application/json",
            "Origin": "https://extranet.grupoboticario.com.br",
            "Referer": "https://extranet.grupoboticario.com.br/",
            "User-Agent": "Mozilla/5.0",
        }

        r = self.s.get(url, headers=headers, timeout=60)
        if r.status_code in (401, 403):
            self.auth.invalidate()
            headers["Authorization"] = self.auth.get_bearer()
            r = self.s.get(url, headers=headers, timeout=60)

        r.raise_for_status()

        # may come as JSON {"url": "..."} or as plain text
        try:
            body = r.json()
            if isinstance(body, dict) and "url" in body and isinstance(body["url"], str):
                return body["url"]
        except Exception:
            pass

        return r.text.strip()

    def processar_pagina(
        self,
        cp_id: int = 10269,
        document_type: str = "EFAT",
        offset: int = 0,
        limit: int = 25,
        only_channels: Optional[Set[str]] = None,
    ) -> Dict[str, Any]:
        """Process one page: download + extract every document, never raising
        per item (failures become {"error": ...} entries)."""
        franchises = self.get_franchises(only_channels=only_channels)

        page = self.get_documents_page(cp_id, document_type, franchises, offset, limit)
        docs = page.get("documents", [])
        total = int(page.get("total") or 0)

        out = []
        for d in docs:
            franchise_id = str(d.get("franchiseId") or "").strip()
            image_name = str(d.get("imageName") or "").strip()

            base_item = {
                "id": d.get("id"),
                "UUID": d.get("UUID"),
                "franchiseId": franchise_id,
                "imageName": image_name,
                "emissionDate": d.get("emissionDate"),
            }

            if not franchise_id or not image_name:
                out.append({**base_item, "error": "sem franchiseId/imageName"})
                continue

            try:
                presigned = self.get_presigned_pdf_url(document_type, franchise_id, image_name)

                pdf_bytes = baixar_pdf_bytes(presigned, self.s)
                extra_detalhado = extrair_remuneracao_franquia_detalhado(pdf_bytes)
                extra = extra_detalhado.get("escolhido", {})

                out.append({
                    **base_item,
                    "bloco_principal": extra_detalhado.get("bloco_principal"),
                    "status_principal": extra_detalhado.get("status_principal"),
                    "bloco_utilizado": extra_detalhado.get("bloco_utilizado"),
                    "resumo": extra_detalhado.get("resumo"),
                    "blocos": extra_detalhado.get("blocos"),
                    "warnings": extra_detalhado.get("warnings"),
                    "tipo_bloco": extra.get("tipo_bloco"),
                    "emissao_nf": extra.get("emissao"),
                    "nota_fiscal": extra.get("nota_fiscal"),
                    "valor_taxa": extra.get("valor_taxa"),
                    "valor_desconto": extra.get("valor_desconto"),
                    "fat_conv": extra.get("fat_conv"),
                    "valor_nf": extra.get("valor_nf"),
                    "encargos": extra.get("encargos"),
                    "parcelas": extra.get("parcelas"),
                    "error": extra.get("error"),
                })

            except Exception as e:
                out.append({**base_item, "error": f"falha_processar_pdf: {e}"})

        return {
            "total": total,
            "offset": offset,
            "limit": limit,
            "count": len(docs),
            "hasNext": (offset + limit) < total,
            "items": out
        }


# -----------------------------
# SQL SERVER: persistence
# -----------------------------
class SqlServerSink:
    """Context-managed pyodbc sink: commits on clean exit, rolls back on error."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.cn = None
        self.cur = None

    def __enter__(self):
        try:
            import pyodbc  # type: ignore
        except Exception as e:
            raise RuntimeError("pyodbc não encontrado. Instale com: pip install pyodbc") from e

        self.cn = pyodbc.connect(self.connection_string, timeout=30)
        self.cn.autocommit = False
        self.cur = self.cn.cursor()
        return self

    def __exit__(self, exc_type, exc, tb):
        if self.cur is not None:
            try:
                self.cur.close()
            except Exception:
                pass
        if self.cn is not None:
            try:
                if exc_type is None:
                    self.cn.commit()
                else:
                    self.cn.rollback()
            except Exception:
                pass
            try:
                self.cn.close()
            except Exception:
                pass

    def reconnect(self) -> None:
        """Tear down and rebuild the connection (after a comm-link failure)."""
        try:
            if self.cur is not None:
                self.cur.close()
        except Exception:
            pass
        try:
            if self.cn is not None:
                self.cn.close()
        except Exception:
            pass

        import pyodbc  # type: ignore
        self.cn = pyodbc.connect(self.connection_string, timeout=30)
        self.cn.autocommit = False
        self.cur = self.cn.cursor()

    @staticmethod
    def _is_comm_error(e: Exception) -> bool:
        """Heuristic: ODBC communication-link failures that warrant a reconnect."""
        msg = str(e)
        return ("08S01" in msg) or ("10054" in msg) or ("communication link failure" in msg.lower())

    def ensure_schema(self) -> None:
        """Create tables/indexes if missing; also renames legacy id columns."""
        assert self.cur is not None
        self.cur.execute(
            """
IF OBJECT_ID('dbo.TrfDocumento', 'U') IS NULL
BEGIN
    CREATE TABLE dbo.TrfDocumento (
        id BIGINT IDENTITY(1,1) PRIMARY KEY,
        UUID VARCHAR(80) NOT NULL UNIQUE,
        IdExterno BIGINT NULL,
        FranchiseId VARCHAR(20) NULL,
        ImageName VARCHAR(150) NULL,
        EmissionDate DATE NULL,
        EmissaoNF DATE NULL,
        NotaFiscal VARCHAR(40) NULL,
        ValorNF DECIMAL(18,2) NULL,
        Encargos DECIMAL(18,2) NULL,
        AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
    );
END;
            """
        )
        self.cur.execute(
            """
IF OBJECT_ID('dbo.TrfParcela', 'U') IS NULL
BEGIN
    CREATE TABLE dbo.TrfParcela (
        id BIGINT IDENTITY(1,1) PRIMARY KEY,
        DocumentoId BIGINT NOT NULL,
        NumeroParcela INT NOT NULL,
        DataVencimento DATE NOT NULL,
        ValorParcela DECIMAL(18,2) NOT NULL,
        CONSTRAINT FK_TrfParcela_Documento
            FOREIGN KEY (DocumentoId) REFERENCES dbo.TrfDocumento(id),
        CONSTRAINT UQ_TrfParcela UNIQUE (DocumentoId, NumeroParcela)
    );
END;
            """
        )
        self.cur.execute(
            """
IF OBJECT_ID('dbo.TrfDocumento', 'U') IS NOT NULL
   AND COL_LENGTH('dbo.TrfDocumento', 'id') IS NULL
   AND COL_LENGTH('dbo.TrfDocumento', 'DocumentoId') IS NOT NULL
BEGIN
    EXEC sp_rename 'dbo.TrfDocumento.DocumentoId', 'id', 'COLUMN';
END;

IF OBJECT_ID('dbo.TrfParcela', 'U') IS NOT NULL
   AND COL_LENGTH('dbo.TrfParcela', 'id') IS NULL
   AND COL_LENGTH('dbo.TrfParcela', 'ParcelaId') IS NOT NULL
BEGIN
    EXEC sp_rename 'dbo.TrfParcela.ParcelaId', 'id', 'COLUMN';
END;
            """
        )
        self.cur.execute(
            """
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfDocumento_NotaFiscal' AND object_id=OBJECT_ID('dbo.TrfDocumento'))
    CREATE INDEX IX_TrfDocumento_NotaFiscal ON dbo.TrfDocumento(NotaFiscal);
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfDocumento_UUID' AND object_id=OBJECT_ID('dbo.TrfDocumento'))
    CREATE UNIQUE INDEX IX_TrfDocumento_UUID ON dbo.TrfDocumento(UUID);
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfDocumento_Franchise_Emission' AND object_id=OBJECT_ID('dbo.TrfDocumento'))
    CREATE INDEX IX_TrfDocumento_Franchise_Emission ON dbo.TrfDocumento(FranchiseId, EmissionDate);
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfParcela_DocumentoId_DataVenc' AND object_id=OBJECT_ID('dbo.TrfParcela'))
    CREATE INDEX IX_TrfParcela_DocumentoId_DataVenc ON dbo.TrfParcela(DocumentoId, DataVencimento);
            """
        )
        self.cn.commit()

    def upsert_documento(self, item: Dict[str, Any]) -> Tuple[int, bool]:
        """Insert or update one document by UUID.

        Returns (documento_id, is_new). Raises ValueError when UUID is missing.
        """
        assert self.cur is not None
        uuid = item.get("UUID")
        if not uuid:
            raise ValueError("item sem UUID")

        resumo = item.get("resumo") or {}
        self.cur.execute("SELECT id FROM dbo.TrfDocumento WHERE UUID = ?", uuid)
        row = self.cur.fetchone()

        if row:
            documento_id = int(row[0])
            self.cur.execute(
                """
UPDATE dbo.TrfDocumento
SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?,
    EmissaoNF=?, NotaFiscal=?, ValorNF=?, Encargos=?,
    AtualizadoEm=SYSUTCDATETIME()
WHERE id=?
                """,
                item.get("id"),
                item.get("franchiseId"),
                item.get("imageName"),
                _date_from_iso(item.get("emissionDate")),
                _date_from_br(resumo.get("emissao_nf")),
                resumo.get("nota_fiscal"),
                resumo.get("valor_nf"),
                resumo.get("encargos"),
                documento_id,
            )
            return documento_id, False

        self.cur.execute(
            """
INSERT INTO dbo.TrfDocumento (
    UUID, IdExterno, FranchiseId, ImageName, EmissionDate,
    EmissaoNF, NotaFiscal, ValorNF, Encargos
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
            """,
            uuid,
            item.get("id"),
            item.get("franchiseId"),
            item.get("imageName"),
            _date_from_iso(item.get("emissionDate")),
            _date_from_br(resumo.get("emissao_nf")),
            resumo.get("nota_fiscal"),
            resumo.get("valor_nf"),
            resumo.get("encargos"),
        )
        # re-select by the UNIQUE UUID to get the identity value just inserted
        self.cur.execute("SELECT id FROM dbo.TrfDocumento WHERE UUID = ?", uuid)
        new_id = self.cur.fetchone()
        if not new_id:
            raise RuntimeError("Falha ao recuperar id após inserir TrfDocumento")
        return int(new_id[0]), True

    def replace_parcelas(self, documento_id: int, item: Dict[str, Any]) -> None:
        """Delete and re-insert the document's installments (full replace)."""
        assert self.cur is not None
        self.cur.execute("DELETE FROM dbo.TrfParcela WHERE DocumentoId = ?", documento_id)

        parcelas = item.get("parcelas") or []
        if not parcelas:
            return

        rows = []
        for idx, p in enumerate(parcelas, start=1):
            dt = _date_from_br(p.get("data"))
            val = p.get("valor")
            if dt is None or val is None:
                continue  # skip unparseable installments rather than failing the doc
            rows.append((documento_id, idx, dt, val))

        if rows:
            self.cur.fast_executemany = True
            self.cur.executemany(
                "INSERT INTO dbo.TrfParcela (DocumentoId, NumeroParcela, DataVencimento, ValorParcela) VALUES (?, ?, ?, ?)",
                rows,
            )

    def persist_items(self, items: List[Dict[str, Any]]) -> Tuple[int, int]:
        """Persist a page of items, skipping PDF failures and UUID-less rows.

        Returns (persisted_count, new_count). On a communication-link error
        the sink reconnects and retries the current item exactly once.
        """
        count = 0
        novos = 0
        for item in items:
            err = item.get("error")
            err_txt = str(err) if err is not None else ""
            if err_txt.startswith("falha_processar_pdf"):
                continue
            if not item.get("UUID"):
                continue
            try:
                doc_id, is_new = self.upsert_documento(item)
                self.replace_parcelas(doc_id, item)
            except Exception as e:
                if not self._is_comm_error(e):
                    raise
                # reconnect and retry the current item once
                self.reconnect()
                doc_id, is_new = self.upsert_documento(item)
                self.replace_parcelas(doc_id, item)
            count += 1
            if is_new:
                novos += 1
        return count, novos


def sincronizar_paginas_sqlserver(
    connection_string: str,
    cp_id: int = 10269,
    document_type: str = "EFAT",
    limit: int = 100,
    start_offset: int = 0,
    only_channels: Optional[Set[str]] = None,
    commit_cada_paginas: int = 1,
) -> Dict[str, Any]:
    """Full sync: walk every page until hasNext is false, committing every
    *commit_cada_paginas* pages."""
    cli = Client()
    offset = start_offset
    paginas = 0
    docs_persistidos = 0
    total = None

    with SqlServerSink(connection_string) as sink:
        sink.ensure_schema()
        while True:
            pagina = cli.processar_pagina(
                cp_id=cp_id,
                document_type=document_type,
                offset=offset,
                limit=limit,
                only_channels=only_channels,
            )
            if total is None:
                total = int(pagina.get("total") or 0)

            itens = pagina.get("items") or []
            persistidos_pag, novos_pag = sink.persist_items(itens)
            docs_persistidos += persistidos_pag
            paginas += 1

            if paginas % commit_cada_paginas == 0:
                sink.cn.commit()

            print(f"[sync] offset={offset} count={len(itens)} novos_pag={novos_pag} persistidos={docs_persistidos} total={total}")
            if not pagina.get("hasNext"):
                break
            offset += limit

        sink.cn.commit()

    return {
        "total": total,
        "paginas_processadas": paginas,
        "documentos_persistidos": docs_persistidos,
        "offset_final": offset,
    }


def sincronizar_incremental_sqlserver(
    connection_string: str,
    cp_id: int = 10269,
    document_type: str = "EFAT",
    limit: int = 100,
    only_channels: Optional[Set[str]] = None,
    max_paginas_sem_novidade: int = 3,
    max_paginas: int = 20,
) -> Dict[str, Any]:
    """Incremental sync: stops after *max_paginas_sem_novidade* consecutive
    pages with no new documents, or *max_paginas* pages total."""
    cli = Client()
    offset = 0
    paginas = 0
    docs_persistidos = 0
    docs_novos = 0
    sem_novidade = 0
    total = None

    with SqlServerSink(connection_string) as sink:
        sink.ensure_schema()
        while True:
            pagina = cli.processar_pagina(
                cp_id=cp_id,
                document_type=document_type,
                offset=offset,
                limit=limit,
                only_channels=only_channels,
            )
            if total is None:
                total = int(pagina.get("total") or 0)

            itens = pagina.get("items") or []
            persistidos_pag, novos_pag = sink.persist_items(itens)
            sink.cn.commit()

            docs_persistidos += persistidos_pag
            docs_novos += novos_pag
            paginas += 1
            sem_novidade = 0 if novos_pag > 0 else (sem_novidade + 1)

            print(
                f"[inc] offset={offset} count={len(itens)} novos_pag={novos_pag} "
                f"sem_novidade={sem_novidade}/{max_paginas_sem_novidade} total_novos={docs_novos}"
            )

            if sem_novidade >= max_paginas_sem_novidade:
                break
            if paginas >= max_paginas:
                break
            if not pagina.get("hasNext"):
                break
            offset += limit

    return {
        "total": total,
        "paginas_processadas": paginas,
        "documentos_persistidos": docs_persistidos,
        "documentos_novos": docs_novos,
        "offset_final": offset,
        "parada_por_sem_novidade": sem_novidade >= max_paginas_sem_novidade,
    }


# -----------------------------
# RUN
# -----------------------------
if __name__ == "__main__":
    # Modes:
    # - "full": complete paginated load
    # - "incremental": stops after X pages with no new documents
    # - "json": just prints one page as JSON
    RUN_MODE = "full"  # "full" | "incremental" | "json"

    if RUN_MODE in ("full", "incremental"):
        # NOTE(security): credentials should come only from the environment;
        # the literal defaults below merely preserve the previous behaviour
        # and must be removed once the env vars are provisioned.
        SQLSERVER_CONN = (
            "DRIVER={ODBC Driver 17 for SQL Server};"
            f"SERVER={os.environ.get('TRF_DB_SERVER', '10.77.77.10')};"
            f"DATABASE={os.environ.get('TRF_DB_NAME', 'GINSENG')};"
            f"UID={os.environ.get('TRF_DB_USER', 'andrey')};"
            f"PWD={os.environ.get('TRF_DB_PASSWORD', '88253332')};"
            "TrustServerCertificate=yes;"
        )
        if RUN_MODE == "incremental":
            # BUGFIX: the branches were swapped — "incremental" previously ran
            # the FULL sync and vice versa.
            resultado = sincronizar_incremental_sqlserver(
                connection_string=SQLSERVER_CONN,
                cp_id=10269,
                document_type="EFAT",
                limit=100,
                only_channels=None,
                max_paginas_sem_novidade=3,
                max_paginas=20,
            )
        else:
            resultado = sincronizar_paginas_sqlserver(
                connection_string=SQLSERVER_CONN,
                cp_id=10269,
                document_type="EFAT",
                limit=100,
                start_offset=0,
                only_channels=None,
                commit_cada_paginas=1,
            )
    else:
        c = Client()
        resultado = c.processar_pagina(
            cp_id=10269,
            document_type="EFAT",
            offset=0,
            limit=25,
            only_channels=None,
        )

    print(json.dumps(resultado, ensure_ascii=False, indent=2))