import requests import pdfplumber import base64 import json import time import re import unicodedata import threading from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from io import BytesIO from dataclasses import dataclass from typing import Any, Dict, List, Optional, Set, Tuple TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens" STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores" DOCS_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/reports/documents-list" HANDLE_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/handle-images" # ----------------------------- # TOKEN (com cache simples) # ----------------------------- def _jwt_payload(jwt_token: str) -> Dict[str, Any]: parts = jwt_token.split(".") if len(parts) != 3: return {} payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4) raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8")) return json.loads(raw.decode("utf-8")) @dataclass class TokenCache: bearer: Optional[str] = None exp_epoch: int = 0 def valid(self, skew_seconds: int = 30) -> bool: return bool(self.bearer) and (time.time() < (self.exp_epoch - skew_seconds)) class Auth: def __init__(self, s: requests.Session): self.s = s self.cache = TokenCache() def get_bearer(self) -> str: if self.cache.valid(): return self.cache.bearer # type: ignore r = self.s.get(TOKENS_URL, timeout=30) r.raise_for_status() body = r.json() if not body.get("success"): raise RuntimeError(f"Token API retornou success=false: {body}") bearer = body["data"][0]["token"] # já vem "Bearer ..." jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer exp = int(_jwt_payload(jwt).get("exp") or 0) self.cache = TokenCache(bearer=bearer, exp_epoch=exp) return bearer def invalidate(self) -> None: self.cache = TokenCache() # ----------------------------- # PDF: download + leitura texto (fallback) # ----------------------------- def baixar_pdf_bytes(url_pdf: str, session: requests.Session) -> BytesIO: r = session.get(url_pdf, timeout=120) r.raise_for_status() return BytesIO(r.content) def ler_texto_pdf(pdf_bytes: BytesIO) -> str: texto_total = "" with pdfplumber.open(pdf_bytes) as pdf: for pagina in pdf.pages: texto = pagina.extract_text(layout=True) or "" if texto: texto_total += texto + "\n" return texto_total # ----------------------------- # EXTRAÇÃO: Remuneração Franquia # 1) tenta por TABELA (mais confiável) # 2) se falhar, cai no TEXTO (como antes) # ----------------------------- def _norm(s: str) -> str: base = _strip_accents(s or "") return re.sub(r"\s+", " ", base).strip().lower() def _parse_money_pt(s: str) -> float: s = (s or "").strip() # "12.769,43" -> "12769.43" if "," in s: s = s.replace(".", "").replace(",", ".") return float(s) def _money_from_text(s: str) -> Optional[float]: if not s: return None m = re.search(r"\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2}", s) if not m: return None try: return _parse_money_pt(m.group(0)) except Exception: return None def _float5_from_text(s: str) -> Optional[float]: if not s: return None m = re.search(r"\d+(?:[.,])\d{5}", s) # 1,00000 / 1.00000 if not m: return None x = m.group(0) x = x.replace(".", "").replace(",", ".") if "," in x else x try: return float(x) except Exception: return None def _pick_first_number(s: str) -> Optional[str]: if not s: return None chunks = re.findall(r"\d+", s) if not chunks: return None # Em alguns PDFs a NF vem quebrada em linhas, ex: "12282" + "58". # Se houver um sufixo curto após um bloco maior, recompõe. if len(chunks) >= 2 and len(chunks[-2]) >= 3 and len(chunks[-1]) <= 3: return chunks[-2] + chunks[-1] for c in chunks: if len(c) >= 3: return c return None def _extract_date_pt(s: str) -> Optional[str]: if not s: return None s = re.sub(r"(?<=\d)\s+(?=\d)", "", s) m = re.search(r"\d{2}\.\d{2}\.\d{4}", s) return m.group(0) if m else None def _nota_from_link_text(s: str) -> Optional[str]: if not s: return None # querystring da URL da NFS-e m = re.search(r"(?:nrnfs|nnfse)\s*=\s*(\d+)", s, flags=re.IGNORECASE) return m.group(1) if m else None def _nota_from_cell_text(s: str) -> Optional[str]: if not s: return None txt = " ".join(str(s).split()) # Evita confundir com valor monetario (ex: 1315.42) e aceita sufixo "-1". m = re.search(r"(? int: if not v: return 0 return len(re.sub(r"\D", "", str(v))) def _normalizar_nota_fiscal(v: Optional[str]) -> Optional[str]: if not v: return None digits = re.sub(r"\D", "", str(v)) if not digits: return None # Regra de negocio: manter somente os ultimos 9 digitos. return digits[-9:] if len(digits) > 9 else digits def _nota_ou_dps(nota: Optional[str], nota_link: Optional[str], dps_num: Optional[str]) -> Optional[str]: cands = [_normalizar_nota_fiscal(nota), _normalizar_nota_fiscal(nota_link), _normalizar_nota_fiscal(dps_num)] valid = [c for c in cands if _nota_fiscal_valida(c)] if valid: # Se houver mais de uma opção, prefere a mais completa (mais dígitos). return max(valid, key=_nota_digits_len) for c in cands: if c: return c return None def _dps_num_from_text(s: str) -> Optional[str]: if not s: return None m = re.search( r"(?:dados\s*dps|dps).{0,80}?\bnum(?:ero|\.?)\s*[:=]\s*(\d{3,})\b", s, flags=re.IGNORECASE | re.DOTALL, ) return m.group(1) if m else None def _nota_fiscal_valida(nota: Optional[str]) -> bool: if not nota: return False digits = re.sub(r"\D", "", str(nota)) # Evita ano (ex.: "2026") e outros falsos positivos curtos. return len(digits) >= 5 def _extrair_parcelas(dp_text: str, emissao: Optional[str] = None) -> List[Dict[str, Any]]: pares = re.findall( r"\b(\d{2}\.\d{2}\.\d{4})\s+(\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2})\b", dp_text or "", ) parcelas: List[Dict[str, Any]] = [] seen = set() for data, v in pares: try: val = _parse_money_pt(v) except Exception: continue k = (data, val) if k not in seen: seen.add(k) parcelas.append({"data": data, "valor": val}) # Quando a coluna vem mesclada, pode capturar "emissao + desconto 0,00" como 1a parcela. if emissao and len(parcelas) > 1: parcelas = [ p for p in parcelas if not (p.get("data") == emissao and abs(float(p.get("valor") or 0.0)) < 0.00001) ] return _ordenar_parcelas_por_data(parcelas) def _date_from_iso(s: Optional[str]): if not s: return None try: return datetime.strptime(s, "%Y-%m-%d").date() except Exception: return None def _date_from_br(s: Optional[str]): if not s: return None try: return datetime.strptime(s, "%d.%m.%Y").date() except Exception: return None def _ordenar_parcelas_por_data(parcelas: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Ordena parcelas por data (dd.mm.aaaa) e, em empate, por valor. Parcelas sem data válida ficam no final. """ def _k(p: Dict[str, Any]): dt = _date_from_br(p.get("data")) val = p.get("valor") val_num = float(val) if isinstance(val, (int, float)) else float("inf") invalida = dt is None return (invalida, dt or datetime.max.date(), val_num) return sorted(parcelas or [], key=_k) def _isolar_bloco(texto: str, titulo_regex: str, proximos_titulos_regex: List[str]) -> Optional[str]: t = " ".join((texto or "").split()) m0 = re.search(titulo_regex, t, flags=re.IGNORECASE) if not m0: return None after = t[m0.start():] end_pos = None for rx in proximos_titulos_regex: m = re.search(rx, after, flags=re.IGNORECASE) if m: end_pos = m.start() break return after[:end_pos] if end_pos is not None else after def extrair_remuneracao_franquia_por_tabela(pdf: pdfplumber.PDF) -> Optional[Dict[str, Any]]: """ Extrai APENAS a tabela do bloco: "Notas Fiscais de Serviço de taxa de remuneração de Franquia" Regra para não confundir com "Venda de Produtos": - o header TEM que ter "valor taxa" e "fat.conv" """ for page in pdf.pages: tables = page.extract_tables() or [] for tbl in tables: if not tbl or len(tbl) < 2: continue # procura um header que tenha as colunas específicas do bloco de remuneração header_idx = None header_join = "" for i, row in enumerate(tbl[:8]): header_join = " | ".join(_norm(c) for c in row if c) # precisa ter essas chaves (venda não tem "valor taxa") if ("valor taxa" in header_join) and ("fat" in header_join) and ("dados pagamento" in header_join): header_idx = i break if header_idx is None: continue nota_link_tabela = _nota_from_link_text(_table_text(tbl)) header = [_norm(c) for c in tbl[header_idx]] def idx_like(keys: List[str]) -> Optional[int]: for j, h in enumerate(header): for k in keys: if k in h: return j return None i_emissao = idx_like(["emissao"]) i_nota = idx_like(["nota fiscal"]) i_valor_taxa = idx_like(["valor taxa"]) i_valor_desc = idx_like(["valor desconto"]) i_fat = idx_like(["fat.conv", "fat conv", "fat"]) i_valor_nf = idx_like(["valor nf"]) i_enc = idx_like(["encargos"]) i_dp = idx_like(["dados pagamento"]) # essenciais do bloco if i_emissao is None or i_nota is None or i_valor_taxa is None or i_valor_nf is None or i_dp is None: continue # lê a primeira linha "real" de dados (normalmente só tem 1) for row in tbl[header_idx + 1:]: if not any((c or "").strip() for c in row): continue emissao_cell = row[i_emissao] if i_emissao < len(row) else "" nota_cell = row[i_nota] if i_nota < len(row) else "" dp_cell = _cell(row[i_dp]) if i_dp < len(row) else "" emissao = _extract_date_pt(_cell(emissao_cell)) nota = _nota_from_cell_text(_cell(nota_cell)) nota_link = _nota_from_link_text(dp_cell) dps_num = _dps_num_from_text(dp_cell) nota_fiscal = _nota_ou_dps(nota, nota_link or nota_link_tabela, dps_num) if not emissao: continue valor_taxa = _money_from_text(row[i_valor_taxa]) if i_valor_taxa < len(row) else None valor_desconto = _money_from_text(row[i_valor_desc]) if i_valor_desc is not None and i_valor_desc < len(row) else None fat_conv = _float5_from_text(row[i_fat]) if i_fat is not None and i_fat < len(row) else None valor_nf = _money_from_text(row[i_valor_nf]) if i_valor_nf < len(row) else None encargos = _money_from_text(row[i_enc]) if i_enc is not None and i_enc < len(row) else None # Dados pagamento: várias linhas data + valor dp = dp_cell parcelas = _extrair_parcelas(dp, emissao=emissao) tem_valor_bloco = any(v is not None for v in (valor_taxa, valor_nf, encargos)) or bool(parcelas) # Só considera TRF quando o bloco tem preenchimento real. if (not _nota_fiscal_valida(nota_fiscal)) or (not tem_valor_bloco): continue return { "tipo_bloco": "TAXA_REMUNERACAO", "emissao": emissao, "nota_fiscal": nota_fiscal, "valor_taxa": valor_taxa, "valor_desconto": valor_desconto, "fat_conv": fat_conv, "valor_nf": valor_nf, "encargos": encargos, "parcelas": parcelas, "error": None, } return None def _cell(c: Any) -> str: return " ".join(str(c or "").split()) def _table_text(tbl: List[List[Any]]) -> str: return " | ".join(_norm(_cell(c)) for row in tbl for c in (row or [])) def _strip_accents(s: str) -> str: return "".join(ch for ch in unicodedata.normalize("NFD", s or "") if unicodedata.category(ch) != "Mn") def extrair_remuneracao_franquia_por_texto(texto: str) -> Dict[str, Any]: """ Fallback (como você já fazia): acha a seção por regex e tenta extrair emissão, nota e parcelas. (Os valores podem falhar dependendo do PDF.) """ bloco = _isolar_bloco( texto, titulo_regex=r"Notas\s+Fiscais\s+de\s+Servi.{0,3}o\s+de\s+taxa\s+de\s+remunera.{0,5}o\s+de\s+Franquia", proximos_titulos_regex=[ r"\bLink\s*:", r"Notas\s+de\s+Despesas\s+de\s+Propaganda", r"Outras\s+Notas\s+de\s+D[eé]bito", r"Outras\s+Notas\s+de\s+Cr[eé]dito", ], ) if not bloco: return { "tipo_bloco": "TAXA_REMUNERACAO", "emissao": None, "nota_fiscal": None, "valor_taxa": None, "valor_desconto": None, "fat_conv": None, "valor_nf": None, "encargos": None, "parcelas": [], "error": "Seção não encontrada", } b = " ".join(bloco.split()) m_em = re.search(r"\b(\d{2}\.\d{2}\.\d{4})\b", b) if not m_em: return { "tipo_bloco": "TAXA_REMUNERACAO", "emissao": None, "nota_fiscal": None, "valor_taxa": None, "valor_desconto": None, "fat_conv": None, "valor_nf": None, "encargos": None, "parcelas": [], "error": "Não achei emissão + nota", } emissao = m_em.group(1) m_nota = re.search(r"\bnota\s+fiscal\b.{0,220}?\b(\d{3,}(?:-\d{1,4})?)\b", b, flags=re.IGNORECASE) nota = m_nota.group(1) if m_nota else None if not nota: m_nota2 = re.search(r"\b\d{2}\.\d{2}\.\d{4}\b\s+(\d{3,}(?:-\d{1,4})?)\b", b, flags=re.IGNORECASE) nota = m_nota2.group(1) if m_nota2 else None nota_link = _nota_from_link_text(texto) dps_num = _dps_num_from_text(texto) nota_fiscal = _nota_ou_dps(nota, nota_link, dps_num) # parcelas depois de "Dados Pagamento" idx_dp = b.lower().find("dados pagamento") dp = b[idx_dp:] if idx_dp != -1 else b parcelas = _extrair_parcelas(dp, emissao=emissao) valor_taxa = _money_from_text(re.search(r"valor\s+taxa.{0,80}", b, flags=re.IGNORECASE).group(0)) if re.search(r"valor\s+taxa.{0,80}", b, flags=re.IGNORECASE) else None valor_nf = _money_from_text(re.search(r"valor\s+nf.{0,80}", b, flags=re.IGNORECASE).group(0)) if re.search(r"valor\s+nf.{0,80}", b, flags=re.IGNORECASE) else None tem_valor_bloco = any(v is not None for v in (valor_taxa, valor_nf)) or bool(parcelas) if (not _nota_fiscal_valida(nota_fiscal)) or (not tem_valor_bloco): return { "tipo_bloco": "TAXA_REMUNERACAO", "emissao": None, "nota_fiscal": None, "valor_taxa": None, "valor_desconto": None, "fat_conv": None, "valor_nf": None, "encargos": None, "parcelas": [], "error": "Bloco TRF sem preenchimento valido", } # aqui você pode manter os valores como None no fallback return { "tipo_bloco": "TAXA_REMUNERACAO", "emissao": emissao, "nota_fiscal": nota_fiscal, "valor_taxa": None, "valor_desconto": None, "fat_conv": None, "valor_nf": None, "encargos": None, "parcelas": parcelas, "error": None, } def extrair_remuneracao_franquia(pdf_bytes: BytesIO) -> Dict[str, Any]: """ Função única que você chama no seu fluxo: - tenta tabela (melhor) - se não conseguir, cai no texto (como era) """ # tenta bloco principal por tabela (melhor cenário) with pdfplumber.open(pdf_bytes) as pdf: via_tabela = extrair_remuneracao_franquia_por_tabela(pdf) if via_tabela: return via_tabela # fallback texto do bloco principal pdf_bytes.seek(0) texto = ler_texto_pdf(pdf_bytes) via_texto = extrair_remuneracao_franquia_por_texto(texto) if via_texto.get("emissao"): return via_texto return via_texto def _tem_emissao_nota(d: Optional[Dict[str, Any]]) -> bool: if not d: return False return bool(d.get("emissao")) and bool(d.get("nota_fiscal")) def _dados_bloco(d: Dict[str, Any]) -> Dict[str, Any]: return { "emissao_nf": d.get("emissao"), "nota_fiscal": d.get("nota_fiscal"), "valor_taxa": d.get("valor_taxa"), "valor_desconto": d.get("valor_desconto"), "fat_conv": d.get("fat_conv"), "valor_nf": d.get("valor_nf"), "encargos": d.get("encargos"), "parcelas": d.get("parcelas") or [], } def extrair_remuneracao_franquia_detalhado(pdf_bytes: BytesIO) -> Dict[str, Any]: principal: Optional[Dict[str, Any]] = None pdf_bytes.seek(0) with pdfplumber.open(pdf_bytes) as pdf: via_tabela = extrair_remuneracao_franquia_por_tabela(pdf) if via_tabela: principal = via_tabela if not principal: pdf_bytes.seek(0) texto = ler_texto_pdf(pdf_bytes) principal = extrair_remuneracao_franquia_por_texto(texto) principal_ok = _tem_emissao_nota(principal) escolhido = principal if not escolhido: escolhido = { "tipo_bloco": "TAXA_REMUNERACAO", "emissao": None, "nota_fiscal": None, "valor_taxa": None, "valor_desconto": None, "fat_conv": None, "valor_nf": None, "encargos": None, "parcelas": [], "error": "Falha na extração", } warnings: List[str] = [] if principal and principal.get("error"): warnings.append(str(principal.get("error"))) blocos: List[Dict[str, Any]] = [] if principal: blocos.append({ "tipo": "TAXA_REMUNERACAO", "encontrado": principal_ok, "dados": _dados_bloco(principal) if principal_ok else None, "erro": None if principal_ok else principal.get("error"), }) return { "bloco_principal": "TAXA_REMUNERACAO", "status_principal": "ok" if principal_ok else "sem_dados", "bloco_utilizado": escolhido.get("tipo_bloco"), "resumo": { "emissao_nf": escolhido.get("emissao"), "nota_fiscal": escolhido.get("nota_fiscal"), "valor_nf": escolhido.get("valor_nf"), "encargos": escolhido.get("encargos"), }, "blocos": blocos, "warnings": warnings, "escolhido": escolhido, # compatibilidade com estrutura atual } # ----------------------------- # CLIENTE PRINCIPAL # ----------------------------- class Client: def __init__(self): self.s = requests.Session() self.auth = Auth(self.s) self._worker_local = threading.local() def _headers_json(self) -> Dict[str, str]: return { "Authorization": self.auth.get_bearer(), "Accept": "application/json", "Content-Type": "application/json", "Origin": "https://extranet.grupoboticario.com.br", "Referer": "https://extranet.grupoboticario.com.br/", "User-Agent": "Mozilla/5.0", } def get_franchises(self, only_channels: Optional[Set[str]] = None) -> List[str]: r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30) if r.status_code in (401, 403): self.auth.invalidate() r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30) r.raise_for_status() body = r.json() seen = set() codes = [] for item in body.get("data", []): if only_channels is not None and item.get("channel") not in only_channels: continue code = item.get("code") if code is None: continue code = str(code).strip() if code and code not in seen: seen.add(code) codes.append(code) return codes def get_documents_page(self, cp_id: int, document_type: str, franchises: List[str], offset: int, limit: int): params = {"cpId": cp_id, "documentType": document_type, "offset": offset, "limit": limit} payload = {"franchises": franchises} r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60) if r.status_code in (401, 403): self.auth.invalidate() r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60) r.raise_for_status() return r.json() # {"total":..., "documents":[...]} def get_presigned_pdf_url(self, document_type: str, franchise_id: str, image_name: str) -> str: url = f"{HANDLE_URL}/{document_type}/{franchise_id}/{image_name}/download" headers = { "Authorization": self.auth.get_bearer(), "Accept": "application/json", "Origin": "https://extranet.grupoboticario.com.br", "Referer": "https://extranet.grupoboticario.com.br/", "User-Agent": "Mozilla/5.0", } r = self.s.get(url, headers=headers, timeout=60) if r.status_code in (401, 403): self.auth.invalidate() headers["Authorization"] = self.auth.get_bearer() r = self.s.get(url, headers=headers, timeout=60) r.raise_for_status() # pode vir JSON com {"url": "..."} ou texto puro try: body = r.json() if isinstance(body, dict) and "url" in body and isinstance(body["url"], str): return body["url"] except Exception: pass return r.text.strip() @staticmethod def _headers_with_auth(auth: Auth) -> Dict[str, str]: return { "Authorization": auth.get_bearer(), "Accept": "application/json", "Origin": "https://extranet.grupoboticario.com.br", "Referer": "https://extranet.grupoboticario.com.br/", "User-Agent": "Mozilla/5.0", } def _get_presigned_pdf_url_with_client( self, session: requests.Session, auth: Auth, document_type: str, franchise_id: str, image_name: str, ) -> str: url = f"{HANDLE_URL}/{document_type}/{franchise_id}/{image_name}/download" headers = self._headers_with_auth(auth) r = session.get(url, headers=headers, timeout=60) if r.status_code in (401, 403): auth.invalidate() headers["Authorization"] = auth.get_bearer() r = session.get(url, headers=headers, timeout=60) r.raise_for_status() try: body = r.json() if isinstance(body, dict) and "url" in body and isinstance(body["url"], str): return body["url"] except Exception: pass return r.text.strip() def _get_worker_http(self) -> Tuple[requests.Session, Auth]: session = getattr(self._worker_local, "session", None) auth = getattr(self._worker_local, "auth", None) if session is None or auth is None: session = requests.Session() auth = Auth(session) self._worker_local.session = session self._worker_local.auth = auth return session, auth def _processar_documento(self, d: Dict[str, Any], document_type: str) -> Dict[str, Any]: franchise_id = str(d.get("franchiseId") or "").strip() image_name = str(d.get("imageName") or "").strip() base_item = { "id": d.get("id"), "UUID": d.get("UUID"), "franchiseId": franchise_id, "imageName": image_name, "emissionDate": d.get("emissionDate"), } if not franchise_id or not image_name: return {**base_item, "error": "sem franchiseId/imageName"} session, auth = self._get_worker_http() try: presigned = self._get_presigned_pdf_url_with_client( session, auth, document_type, franchise_id, image_name ) pdf_bytes = baixar_pdf_bytes(presigned, session) extra_detalhado = extrair_remuneracao_franquia_detalhado(pdf_bytes) extra = extra_detalhado.get("escolhido", {}) status_principal = extra_detalhado.get("status_principal") resumo = extra_detalhado.get("resumo") or {} emissao_nf = resumo.get("emissao_nf") nota_fiscal = resumo.get("nota_fiscal") bloco_ok = bool(status_principal == "ok" and emissao_nf and nota_fiscal) return { **base_item, "bloco_principal": extra_detalhado.get("bloco_principal"), "status_principal": status_principal, "bloco_utilizado": extra_detalhado.get("bloco_utilizado"), "resumo": resumo, "blocos": extra_detalhado.get("blocos"), "warnings": extra_detalhado.get("warnings"), "tipo_bloco": extra.get("tipo_bloco"), "emissao_nf": extra.get("emissao"), "nota_fiscal": extra.get("nota_fiscal"), "valor_taxa": extra.get("valor_taxa"), "valor_desconto": extra.get("valor_desconto"), "fat_conv": extra.get("fat_conv"), "valor_nf": extra.get("valor_nf"), "encargos": extra.get("encargos"), "parcelas": extra.get("parcelas"), "error": None if bloco_ok else "skip_sem_bloco_taxa_remuneracao", } except Exception as e: return {**base_item, "error": f"falha_processar_pdf: {e}"} def processar_pagina( self, cp_id: int = 10269, document_type: str = "EFAT", offset: int = 0, limit: int = 25, only_channels: Optional[Set[str]] = None, max_workers: int = 6, ) -> Dict[str, Any]: franchises = self.get_franchises(only_channels=only_channels) page = self.get_documents_page(cp_id, document_type, franchises, offset, limit) docs = page.get("documents", []) total = int(page.get("total") or 0) out: List[Dict[str, Any]] = [] if docs: workers = max(1, min(int(max_workers or 1), len(docs))) progress_step = max(1, min(10, len(docs) // 10 if len(docs) > 10 else 1)) if workers == 1: out = [] for i, d in enumerate(docs, start=1): out.append(self._processar_documento(d, document_type)) if i % progress_step == 0 or i == len(docs): print(f"[proc] offset={offset} processados={i}/{len(docs)}") else: ordered: List[Optional[Dict[str, Any]]] = [None] * len(docs) with ThreadPoolExecutor(max_workers=workers) as ex: futures = { ex.submit(self._processar_documento, d, document_type): i for i, d in enumerate(docs) } done = 0 for fut in as_completed(futures): i = futures[fut] try: ordered[i] = fut.result() except Exception as e: ordered[i] = { "id": docs[i].get("id"), "UUID": docs[i].get("UUID"), "franchiseId": str(docs[i].get("franchiseId") or "").strip(), "imageName": str(docs[i].get("imageName") or "").strip(), "emissionDate": docs[i].get("emissionDate"), "error": f"falha_processar_pdf: {e}", } done += 1 if done % progress_step == 0 or done == len(docs): print(f"[proc] offset={offset} processados={done}/{len(docs)}") out = [x for x in ordered if x is not None] return { "total": total, "offset": offset, "limit": limit, "count": len(docs), "hasNext": (offset + limit) < total, "items": out } # ----------------------------- # SQL SERVER: persistência # ----------------------------- class SqlServerSink: def __init__(self, connection_string: str): self.connection_string = connection_string self.cn = None self.cur = None self.doc_table = "dbo.TrfRemDocumento" self.parcela_table = "dbo.TrfRemParcela" def __enter__(self): try: import pyodbc # type: ignore except Exception as e: raise RuntimeError("pyodbc não encontrado. Instale com: pip install pyodbc") from e self.cn = pyodbc.connect(self.connection_string, timeout=30) self.cn.autocommit = False self.cur = self.cn.cursor() return self def __exit__(self, exc_type, exc, tb): if self.cur is not None: try: self.cur.close() except Exception: pass if self.cn is not None: try: if exc_type is None: self.cn.commit() else: self.cn.rollback() except Exception: pass try: self.cn.close() except Exception: pass def reconnect(self) -> None: try: if self.cur is not None: self.cur.close() except Exception: pass try: if self.cn is not None: self.cn.close() except Exception: pass import pyodbc # type: ignore self.cn = pyodbc.connect(self.connection_string, timeout=30) self.cn.autocommit = False self.cur = self.cn.cursor() @staticmethod def _is_comm_error(e: Exception) -> bool: msg = str(e) return ("08S01" in msg) or ("10054" in msg) or ("communication link failure" in msg.lower()) def ensure_schema(self) -> None: assert self.cur is not None self.cur.execute( f""" IF OBJECT_ID('{self.doc_table}', 'U') IS NULL BEGIN CREATE TABLE {self.doc_table} ( id BIGINT IDENTITY(1,1) PRIMARY KEY, UUID VARCHAR(80) NOT NULL UNIQUE, IdExterno BIGINT NULL, FranchiseId VARCHAR(20) NULL, ImageName VARCHAR(150) NULL, EmissionDate DATE NULL, EmissaoNF DATE NULL, NotaFiscal VARCHAR(40) NULL, ValorNF DECIMAL(18,2) NULL, Encargos DECIMAL(18,2) NULL, AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME() ); END; """ ) self.cur.execute( f""" IF OBJECT_ID('{self.parcela_table}', 'U') IS NULL BEGIN CREATE TABLE {self.parcela_table} ( id BIGINT IDENTITY(1,1) PRIMARY KEY, DocumentoId BIGINT NOT NULL, NumeroParcela INT NOT NULL, DataVencimento DATE NOT NULL, ValorParcela DECIMAL(18,2) NOT NULL, CONSTRAINT FK_TrfRemParcela_Documento FOREIGN KEY (DocumentoId) REFERENCES {self.doc_table}(id), CONSTRAINT UQ_TrfRemParcela UNIQUE (DocumentoId, NumeroParcela) ); END; """ ) self.cur.execute( f""" IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemDocumento_NotaFiscal' AND object_id=OBJECT_ID('{self.doc_table}')) CREATE INDEX IX_TrfRemDocumento_NotaFiscal ON {self.doc_table}(NotaFiscal); IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemDocumento_UUID' AND object_id=OBJECT_ID('{self.doc_table}')) CREATE UNIQUE INDEX IX_TrfRemDocumento_UUID ON {self.doc_table}(UUID); IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemDocumento_Franchise_Emission' AND object_id=OBJECT_ID('{self.doc_table}')) CREATE INDEX IX_TrfRemDocumento_Franchise_Emission ON {self.doc_table}(FranchiseId, EmissionDate); IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemParcela_DocumentoId_DataVenc' AND object_id=OBJECT_ID('{self.parcela_table}')) CREATE INDEX IX_TrfRemParcela_DocumentoId_DataVenc ON {self.parcela_table}(DocumentoId, DataVencimento); """ ) self.cn.commit() def upsert_documento(self, item: Dict[str, Any]) -> Tuple[int, bool]: assert self.cur is not None uuid = item.get("UUID") if not uuid: raise ValueError("item sem UUID") resumo = item.get("resumo") or {} self.cur.execute(f"SELECT id FROM {self.doc_table} WHERE UUID = ?", uuid) row = self.cur.fetchone() if row: documento_id = int(row[0]) self.cur.execute( f""" UPDATE {self.doc_table} SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?, EmissaoNF=?, NotaFiscal=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME() WHERE id=? """, item.get("id"), item.get("franchiseId"), item.get("imageName"), _date_from_iso(item.get("emissionDate")), _date_from_br(resumo.get("emissao_nf")), resumo.get("nota_fiscal"), resumo.get("valor_nf"), resumo.get("encargos"), documento_id, ) return documento_id, False self.cur.execute( f""" INSERT INTO {self.doc_table} ( UUID, IdExterno, FranchiseId, ImageName, EmissionDate, EmissaoNF, NotaFiscal, ValorNF, Encargos ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); """, uuid, item.get("id"), item.get("franchiseId"), item.get("imageName"), _date_from_iso(item.get("emissionDate")), _date_from_br(resumo.get("emissao_nf")), resumo.get("nota_fiscal"), resumo.get("valor_nf"), resumo.get("encargos"), ) self.cur.execute(f"SELECT id FROM {self.doc_table} WHERE UUID = ?", uuid) new_id = self.cur.fetchone() if not new_id: raise RuntimeError("Falha ao recuperar id após inserir TrfRemDocumento") return int(new_id[0]), True def replace_parcelas(self, documento_id: int, item: Dict[str, Any]) -> None: assert self.cur is not None self.cur.execute(f"DELETE FROM {self.parcela_table} WHERE DocumentoId = ?", documento_id) parcelas = _ordenar_parcelas_por_data(item.get("parcelas") or []) if not parcelas: return rows = [] for idx, p in enumerate(parcelas, start=1): dt = _date_from_br(p.get("data")) val = p.get("valor") if dt is None or val is None: continue rows.append((documento_id, idx, dt, val)) if rows: self.cur.fast_executemany = True self.cur.executemany( f"INSERT INTO {self.parcela_table} (DocumentoId, NumeroParcela, DataVencimento, ValorParcela) VALUES (?, ?, ?, ?)", rows, ) def persist_items(self, items: List[Dict[str, Any]]) -> Tuple[int, int]: count = 0 novos = 0 for item in items: err = item.get("error") err_txt = str(err) if err is not None else "" if err_txt: continue if not item.get("UUID"): continue resumo = item.get("resumo") or {} if item.get("status_principal") != "ok": continue if not resumo.get("emissao_nf") or not resumo.get("nota_fiscal"): continue if not _nota_fiscal_valida(resumo.get("nota_fiscal")): continue if (resumo.get("valor_nf") is None) and (item.get("valor_taxa") is None) and not (item.get("parcelas") or []): continue try: doc_id, is_new = self.upsert_documento(item) self.replace_parcelas(doc_id, item) except Exception as e: if not self._is_comm_error(e): raise # reconecta e tenta uma única vez o item atual self.reconnect() doc_id, is_new = self.upsert_documento(item) self.replace_parcelas(doc_id, item) count += 1 if is_new: novos += 1 return count, novos def sincronizar_paginas_sqlserver( connection_string: str, cp_id: int = 10269, document_type: str = "EFAT", limit: int = 100, start_offset: int = 0, only_channels: Optional[Set[str]] = None, commit_cada_paginas: int = 1, ) -> Dict[str, Any]: cli = Client() offset = start_offset paginas = 0 docs_persistidos = 0 total = None with SqlServerSink(connection_string) as sink: sink.ensure_schema() while True: pagina = cli.processar_pagina( cp_id=cp_id, document_type=document_type, offset=offset, limit=limit, only_channels=only_channels, ) if total is None: total = int(pagina.get("total") or 0) itens = pagina.get("items") or [] persistidos_pag, novos_pag = sink.persist_items(itens) docs_persistidos += persistidos_pag paginas += 1 if paginas % commit_cada_paginas == 0: sink.cn.commit() print(f"[sync] offset={offset} count={len(itens)} novos_pag={novos_pag} persistidos={docs_persistidos} total={total}") if not pagina.get("hasNext"): break offset += limit sink.cn.commit() return { "total": total, "paginas_processadas": paginas, "documentos_persistidos": docs_persistidos, "offset_final": offset, } def sincronizar_incremental_sqlserver( connection_string: str, cp_id: int = 10269, document_type: str = "EFAT", limit: int = 100, only_channels: Optional[Set[str]] = None, max_paginas_sem_novidade: int = 3, max_paginas: int = 20, ) -> Dict[str, Any]: cli = Client() offset = 0 paginas = 0 docs_persistidos = 0 docs_novos = 0 sem_novidade = 0 total = None with SqlServerSink(connection_string) as sink: sink.ensure_schema() while True: pagina = cli.processar_pagina( cp_id=cp_id, document_type=document_type, offset=offset, limit=limit, only_channels=only_channels, ) if total is None: total = int(pagina.get("total") or 0) itens = pagina.get("items") or [] persistidos_pag, novos_pag = sink.persist_items(itens) sink.cn.commit() docs_persistidos += persistidos_pag docs_novos += novos_pag paginas += 1 sem_novidade = 0 if novos_pag > 0 else (sem_novidade + 1) print( f"[inc] offset={offset} count={len(itens)} novos_pag={novos_pag} " f"sem_novidade={sem_novidade}/{max_paginas_sem_novidade} total_novos={docs_novos}" ) if sem_novidade >= max_paginas_sem_novidade: break if paginas >= max_paginas: break if not pagina.get("hasNext"): break offset += limit return { "total": total, "paginas_processadas": paginas, "documentos_persistidos": docs_persistidos, "documentos_novos": docs_novos, "offset_final": offset, "parada_por_sem_novidade": sem_novidade >= max_paginas_sem_novidade, } # ----------------------------- # RUN # ----------------------------- if __name__ == "__main__": # Modos: # - "full": carga completa paginada # - "incremental": para após X páginas sem novidades # - "json": só imprime uma página em JSON RUN_MODE = "incremental" # "full" | "incremental" | "json" if RUN_MODE in ("full", "incremental"): SQLSERVER_CONN = ( "DRIVER={ODBC Driver 17 for SQL Server};" "SERVER=10.77.77.10;" "DATABASE=GINSENG;" "UID=andrey;" "PWD=88253332;" "TrustServerCertificate=yes;" ) if RUN_MODE == "full": resultado = sincronizar_paginas_sqlserver( connection_string=SQLSERVER_CONN, cp_id=10269, document_type="EFAT", limit=100, start_offset=0, only_channels=None, commit_cada_paginas=1, ) else: resultado = sincronizar_incremental_sqlserver( connection_string=SQLSERVER_CONN, cp_id=10269, document_type="EFAT", limit=100, only_channels=None, max_paginas_sem_novidade=3, max_paginas=20, ) else: c = Client() resultado = c.processar_pagina( cp_id=10269, document_type="EFAT", offset=0, limit=25, only_channels=None, ) print(json.dumps(resultado, ensure_ascii=False, indent=2))