commit 8b72f58d028f659cca2517576b3ece7a9d11d570 Author: Andrey Cunh@ Date: Thu Feb 26 12:57:53 2026 -0300 att trf rem diff --git a/debit_notes_pdf_reader.py b/debit_notes_pdf_reader.py new file mode 100644 index 0000000..bb1a483 --- /dev/null +++ b/debit_notes_pdf_reader.py @@ -0,0 +1,596 @@ +import base64 +import json +import re +import time +from dataclasses import dataclass +from datetime import datetime +from io import BytesIO +from typing import Any, Dict, List, Optional + +import requests + +try: + import pdfplumber # type: ignore +except Exception: + pdfplumber = None + +TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens" +STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores" +DEBIT_NOTES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/debit-notes/documents-list" +HANDLE_IMAGES_URL = "https://sf-fiscal-api.grupoboticario.digital/v1/handle-images" + + +class UnauthorizedTokenError(RuntimeError): + pass + + +def _jwt_payload(jwt_token: str) -> Dict[str, Any]: + parts = jwt_token.split(".") + if len(parts) != 3: + return {} + payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4) + raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8")) + return json.loads(raw.decode("utf-8")) + + +@dataclass +class TokenCache: + bearer: Optional[str] = None + exp_epoch: int = 0 + + def valid(self, skew_seconds: int = 30) -> bool: + return bool(self.bearer) and (time.time() < (self.exp_epoch - skew_seconds)) + + +class Auth: + def __init__(self, session: requests.Session): + self.s = session + self.cache = TokenCache() + + def get_bearer(self, force_refresh: bool = False) -> str: + if (not force_refresh) and self.cache.valid(): + return self.cache.bearer # type: ignore[return-value] + + r = self.s.get(TOKENS_URL, timeout=30) + r.raise_for_status() + body = r.json() + if not body.get("success"): + raise RuntimeError(f"Token API retornou success=false: {body}") + + bearer = body["data"][0]["token"] + jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer + exp = int(_jwt_payload(jwt).get("exp") or 0) + self.cache = TokenCache(bearer=bearer, exp_epoch=exp) + return bearer + + def invalidate(self) -> None: + self.cache = TokenCache() + + +def _headers(auth: Auth, content_type: bool = True) -> Dict[str, str]: + h = { + "Authorization": auth.get_bearer(), + "Accept": "application/json, text/plain, */*", + "Origin": "https://extranet.grupoboticario.com.br", + "Referer": "https://extranet.grupoboticario.com.br/", + "User-Agent": "Mozilla/5.0", + } + if content_type: + h["Content-Type"] = "application/json" + return h + + +def get_franchise_ids(session: requests.Session, auth: Auth) -> List[str]: + r = None + for attempt in range(4): + r = session.get(STORES_URL, headers=_headers(auth, content_type=False), timeout=30) + if r.status_code not in (401, 403): + break + print(f"[warn] token invalido ao listar franquias (tentativa {attempt + 1}/4), renovando token...") + auth.invalidate() + auth.get_bearer(force_refresh=True) + time.sleep(min(3, attempt + 1)) + + assert r is not None + r.raise_for_status() + + out: List[str] = [] + seen = set() + for item in r.json().get("data", []): + code = str(item.get("code") or "").strip() + if code and code not in seen: + seen.add(code) + out.append(code) + return out + + +def get_debit_notes_page( + session: requests.Session, + auth: Auth, + cp_id: int, + skip: int, + take: int, + franchise_ids: List[str], +) -> Dict[str, Any]: + params = {"cpId": cp_id, "skip": skip, "take": take} + payload = 
{"franchiseId": franchise_ids} + r = None + max_attempts = 8 + transient_status = {429, 500, 502, 503, 504} + for attempt in range(max_attempts): + r = session.post(DEBIT_NOTES_URL, headers=_headers(auth), params=params, json=payload, timeout=90) + if r.status_code in (401, 403): + print( + f"[warn] token invalido ao buscar pagina skip={skip} " + f"(tentativa {attempt + 1}/{max_attempts}), renovando token..." + ) + auth.invalidate() + auth.get_bearer(force_refresh=True) + time.sleep(min(5, attempt + 1)) + continue + + if r.status_code in transient_status: + wait_s = min(30, 2 ** min(5, attempt)) + print( + f"[warn] erro temporario {r.status_code} em skip={skip} " + f"(tentativa {attempt + 1}/{max_attempts}), relogando e aguardando {wait_s}s..." + ) + auth.invalidate() + auth.get_bearer(force_refresh=True) + time.sleep(wait_s) + continue + + break + + assert r is not None + if r.status_code in (401, 403): + raise UnauthorizedTokenError( + f"401/403 persistente em documents-list skip={skip} body={r.text[:300]}" + ) + if r.status_code in transient_status: + raise RuntimeError( + f"Falha temporaria persistente ({r.status_code}) em documents-list skip={skip}. " + f"Body={r.text[:300]}" + ) + r.raise_for_status() + return r.json() + + +def get_download_url( + session: requests.Session, + auth: Auth, + document_type: str, + franchise_id: str, + document_id: int, + image_name: str, +) -> str: + url = f"{HANDLE_IMAGES_URL}/{document_type}/{franchise_id}/{document_id}/{image_name}/download" + r = None + for attempt in range(4): + r = session.get(url, headers=_headers(auth, content_type=False), timeout=60) + if r.status_code not in (401, 403): + break + print( + f"[warn] token invalido no handle-images doc={document_id} " + f"(tentativa {attempt + 1}/4), renovando token..." + ) + auth.invalidate() + auth.get_bearer(force_refresh=True) + time.sleep(min(3, attempt + 1)) + + assert r is not None + r.raise_for_status() + + txt = (r.text or "").strip() + if txt.startswith("http://") or txt.startswith("https://"): + return txt + try: + body = r.json() + if isinstance(body, dict) and isinstance(body.get("url"), str): + return body["url"] + except Exception: + pass + raise RuntimeError("Resposta de handle-images nao contem URL de download") + + +def download_pdf_bytes(session: requests.Session, url: str) -> bytes: + r = session.get(url, timeout=180) + r.raise_for_status() + return r.content + + +def extract_pdf_text(pdf_bytes: bytes) -> str: + if pdfplumber is None: + raise RuntimeError("pdfplumber nao instalado (pip install pdfplumber)") + + text = [] + with pdfplumber.open(BytesIO(pdf_bytes)) as pdf: + for page in pdf.pages: + page_text = page.extract_text(layout=True) or "" + if page_text: + text.append(page_text) + return "\n".join(text).strip() + + +def parse_money_br(value: Optional[str]) -> Optional[float]: + if not value: + return None + s = value.strip().replace(".", "").replace(",", ".") + try: + return float(s) + except Exception: + return None + + +def parse_date_br(value: Optional[str]): + if not value: + return None + try: + return datetime.strptime(value.strip(), "%d.%m.%Y").date() + except Exception: + return None + + +def numero_parcela_from_text(value: Optional[str], fallback: int) -> int: + s = (value or "").strip() + if "/" in s: + right = s.split("/", 1)[1].strip() + if right.isdigit(): + n = int(right) + # NumeroParcela em TrfParcela é int, mas semanticamente é contador curto. + # Evita valores absurdos vindos de identificadores longos. 
+ if 1 <= n <= 9999: + return n + return int(fallback) + m = re.search(r"(\d+)$", s) + if m: + n = int(m.group(1)) + if 1 <= n <= 9999: + return n + return int(fallback) + + +def parse_pdf_fields(text: str) -> Dict[str, Any]: + m_cliente = re.search(r"Cliente:\s*(\d+)", text, flags=re.IGNORECASE) + m_nota = re.search(r"NOTA\s+DE\s+D[ÉE]BITO\s*:\s*([A-Z0-9-]+)", text, flags=re.IGNORECASE) + m_emissao = re.search( + r"Data\s+(?:de\s+)?emiss[aã]o\s*:\s*(\d{2}\.\d{2}\.\d{4})", + text, + flags=re.IGNORECASE, + ) + m_total = re.search( + r"Valor\s+Total\s+de\s+.+?\s+(\d{1,3}(?:\.\d{3})*,\d{2})", + text, + flags=re.IGNORECASE, + ) + + parcelas: List[Dict[str, Any]] = [] + for num, venc, val in re.findall( + # Modelos suportados: + # - "269326604/1 25.03.2026 17,04" + # - "CTPT022609765408 09.02.2026 6,60" + r"([A-Z0-9/-]+)\s+(\d{2}\.\d{2}\.\d{4})\s+(\d{1,3}(?:\.\d{3})*,\d{2})", + text, + flags=re.IGNORECASE, + ): + parcelas.append( + { + "numero": num.strip(), + "vencimento": venc.strip(), + "valor": val.strip(), + "valor_num": parse_money_br(val), + } + ) + + denominacao = None + m_den = re.search( + r"Denomina[cç][aã]o\s+Valor\s+(.+?)\s+(\d{1,3}(?:\.\d{3})*,\d{2})", + text, + flags=re.IGNORECASE | re.DOTALL, + ) + if m_den: + denominacao = " ".join(m_den.group(1).split()).strip() + if not denominacao: + # Fallback para layout novo: pega a primeira linha de item da seção Denominação. + m_den2 = re.search( + r"Denomina[cç][aã]o\s+Valor\s+(.+?)\s+\d{1,3}(?:\.\d{3})*,\d{2}\s+Valor\s+Total\s+de\s+D[ÉE]bito", + text, + flags=re.IGNORECASE | re.DOTALL, + ) + if m_den2: + denominacao = " ".join(m_den2.group(1).split()).strip() + + return { + "cliente": m_cliente.group(1) if m_cliente else None, + "notaDebito": m_nota.group(1) if m_nota else None, + "dataEmissao": m_emissao.group(1) if m_emissao else None, + "valorTotalDebito": m_total.group(1) if m_total else None, + "valorTotalDebitoNum": parse_money_br(m_total.group(1) if m_total else None), + "denominacao": denominacao, + "parcelas": parcelas, + "parcelasCount": len(parcelas), + "parcelasSomaNum": sum(p["valor_num"] for p in parcelas if isinstance(p.get("valor_num"), (int, float))), + } + + +def upsert_rows_sqlserver(rows: List[Dict[str, Any]], connection_string: str) -> Dict[str, int]: + try: + import pyodbc # type: ignore + except Exception as e: + raise RuntimeError("pyodbc nao encontrado. Instale com: pip install pyodbc") from e + + if not rows: + return {"documentos": 0, "parcelas": 0} + + cn = pyodbc.connect(connection_string, timeout=30) + cn.autocommit = False + cur = cn.cursor() + docs = 0 + pars = 0 + + try: + for row in rows: + id_externo = row.get("id") + if id_externo is None: + continue + uuid = f"ND-{id_externo}" + + cur.execute("SELECT id FROM dbo.TrfDocumento WHERE UUID = ?", uuid) + found = cur.fetchone() + if found: + doc_id = int(found[0]) + cur.execute( + """ +UPDATE dbo.TrfDocumento +SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?, + EmissaoNF=?, NotaFiscal=?, ValorNF=?, Encargos=?, AtualizadoEm=SYSUTCDATETIME() +WHERE id=? 
+ """, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + row.get("valorTotalDebitoNum"), + 0.0, + doc_id, + ) + else: + cur.execute( + """ +INSERT INTO dbo.TrfDocumento ( + UUID, IdExterno, FranchiseId, ImageName, EmissionDate, + EmissaoNF, NotaFiscal, ValorNF, Encargos +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + uuid, + int(id_externo), + str(row.get("franchiseId") or "")[:20] or None, + str(row.get("imageName") or "")[:150] or None, + parse_date_br(row.get("dataEmissao")), + parse_date_br(row.get("dataEmissao")), + str(row.get("notaDebito") or "")[:40] or None, + row.get("valorTotalDebitoNum"), + 0.0, + ) + cur.execute("SELECT id FROM dbo.TrfDocumento WHERE UUID = ?", uuid) + got = cur.fetchone() + if not got: + continue + doc_id = int(got[0]) + + docs += 1 + cur.execute("DELETE FROM dbo.TrfParcela WHERE DocumentoId = ?", doc_id) + + for idx, p in enumerate(row.get("parcelas") or [], start=1): + dt = parse_date_br(p.get("vencimento")) + val = p.get("valor_num") + if dt is None or val is None: + continue + num_parc = numero_parcela_from_text(p.get("numero"), idx) + cur.execute( + """ +INSERT INTO dbo.TrfParcela (DocumentoId, NumeroParcela, DataVencimento, ValorParcela) +VALUES (?, ?, ?, ?) + """, + doc_id, + int(num_parc), + dt, + float(val), + ) + pars += 1 + + cn.commit() + return {"documentos": docs, "parcelas": pars} + except Exception: + cn.rollback() + raise + finally: + cur.close() + cn.close() + + +def get_existing_docs_map_sqlserver(doc_ids: List[int], connection_string: str) -> Dict[int, Optional[str]]: + try: + import pyodbc # type: ignore + except Exception as e: + raise RuntimeError("pyodbc nao encontrado. Instale com: pip install pyodbc") from e + + clean_ids = sorted({int(x) for x in doc_ids if x is not None}) + if not clean_ids: + return {} + + cn = pyodbc.connect(connection_string, timeout=30) + cur = cn.cursor() + out: Dict[int, Optional[str]] = {} + try: + chunk_size = 900 # evita limite de parametros do SQL Server + for i in range(0, len(clean_ids), chunk_size): + chunk = clean_ids[i : i + chunk_size] + placeholders = ",".join("?" for _ in chunk) + sql = f""" +SELECT IdExterno, ImageName +FROM dbo.TrfDocumento +WHERE IdExterno IN ({placeholders}) + """ + cur.execute(sql, *chunk) + for row in cur.fetchall(): + ext_id = int(row[0]) + img = str(row[1]).strip() if row[1] is not None else None + out[ext_id] = img + return out + finally: + cur.close() + cn.close() + + +def main() -> None: + # Fluxo fixo: leitura da API -> extracao do PDF em memoria -> upsert SQL. + CP_ID = 10269 + TAKE = 25 + MAX_PAGINAS_RECENTES = 15 + DOCUMENT_TYPE = "NDEB" + SQL_CONN = ( + "DRIVER={ODBC Driver 17 for SQL Server};" + "SERVER=10.77.77.10;" + "DATABASE=GINSENG;" + "UID=andrey;" + "PWD=88253332;" + "TrustServerCertificate=yes;" + ) + + def _new_client() -> tuple[requests.Session, Auth, List[str]]: + s = requests.Session() + s.trust_env = False + a = Auth(s) + f = get_franchise_ids(s, a) + return s, a, f + + session, auth, franchises = _new_client() + total_docs_upsert = 0 + total_parcs_upsert = 0 + skip = 0 # para testes, pular os primeiros 900 registros (36 paginas) e ir direto para os mais recentes. Ajustar para 0 para rodar do inicio. O endpoint suporta skip alto, mas pode ser mais lento. O ideal é rodar periodicamente com skip=0 para pegar os novos registros. 
+ skip_inicial = skip + total = None + pagina = 0 + total_paginas = None + relogins = 0 + max_relogins = 20 + while True: + try: + page = get_debit_notes_page(session, auth, CP_ID, skip, TAKE, franchises) + except UnauthorizedTokenError as e: + relogins += 1 + if relogins > max_relogins: + raise RuntimeError( + f"Falha apos {max_relogins} relogins. Ultimo erro: {e}" + ) from e + print( + f"[relogin] 401 persistente em skip={skip}. " + f"Refazendo sessao/token ({relogins}/{max_relogins})..." + ) + session, auth, franchises = _new_client() + time.sleep(2) + continue + if total is None: + try: + total = int(page.get("documentsTotal") or 0) + except Exception: + total = 0 + total_paginas = (total + TAKE - 1) // TAKE if total > 0 else None + if total_paginas: + print(f"[info] total_registros={total} total_paginas={total_paginas} take={TAKE}") + else: + print(f"[info] total_registros={total} take={TAKE}") + + docs = page.get("documentsList") or [] + if not docs: + break + + pagina += 1 + pagina_global = (skip // TAKE) + 1 + total_paginas_restantes = None + if total is not None and total > 0: + restantes = max(0, total - skip_inicial) + total_paginas_restantes = (restantes + TAKE - 1) // TAKE if restantes > 0 else 0 + if total_paginas: + if total_paginas_restantes is not None: + print( + f"[page] baixando pagina_execucao={pagina}/{total_paginas_restantes} " + f"pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})" + ) + else: + print(f"[page] baixando pagina_execucao={pagina} pagina_global={pagina_global}/{total_paginas} (itens={len(docs)})") + else: + print(f"[page] baixando pagina_execucao={pagina} pagina_global={pagina_global} (itens={len(docs)})") + + doc_ids_page: List[int] = [] + for d in docs: + try: + doc_ids_page.append(int(d.get("id"))) + except Exception: + continue + existing_map = get_existing_docs_map_sqlserver(doc_ids_page, SQL_CONN) + + page_rows: List[Dict[str, Any]] = [] + skipped_existing = 0 + for d in docs: + doc_id = int(d.get("id")) + franchise_id = str(d.get("franchiseId") or "").strip() + image_name = str(d.get("imageName") or "").strip() + existing_image_name = existing_map.get(doc_id) + if doc_id in existing_map and (existing_image_name or "") == image_name: + skipped_existing += 1 + print(f"[skip] {doc_id} ja existe no SQL com mesmo imageName") + continue + try: + dl_url = get_download_url( + session, + auth, + DOCUMENT_TYPE, + franchise_id, + doc_id, # para esse endpoint usa "id" do documento + image_name, + ) + pdf_bytes = download_pdf_bytes(session, dl_url) + txt = extract_pdf_text(pdf_bytes) + parsed = parse_pdf_fields(txt) + page_rows.append( + { + "id": doc_id, + "franchiseId": franchise_id, + "imageName": image_name, + **parsed, + } + ) + print(f"[ok] {doc_id} -> {image_name}") + except Exception as e: + print(f"[erro] {doc_id} -> {e}") + + if page_rows: + stats_page = upsert_rows_sqlserver(page_rows, SQL_CONN) + total_docs_upsert += int(stats_page.get("documentos") or 0) + total_parcs_upsert += int(stats_page.get("parcelas") or 0) + print( + f"[sql] pagina={pagina} docs_upsert={stats_page['documentos']} " + f"parc_upsert={stats_page['parcelas']} acumulado_docs={total_docs_upsert} " + f"acumulado_parc={total_parcs_upsert}" + ) + if skipped_existing: + print(f"[page] pagina={pagina} skip_sql={skipped_existing}") + + skip += TAKE + print(f"[page] concluida {pagina} acumulado_docs={total_docs_upsert} total={total}") + if pagina >= MAX_PAGINAS_RECENTES: + print(f"[stop] limite diario atingido: {MAX_PAGINAS_RECENTES} paginas recentes") + break + if total 
and skip >= total: + break + + print(f"SQL upsert final -> documentos={total_docs_upsert} parcelas={total_parcs_upsert}") + + +if __name__ == "__main__": + main() diff --git a/sagezza.py b/sagezza.py new file mode 100644 index 0000000..cbbc5ab --- /dev/null +++ b/sagezza.py @@ -0,0 +1,1144 @@ +import os +import re +import base64 +import json +import io +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock +from datetime import datetime +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urljoin, unquote +import xml.etree.ElementTree as ET +import unicodedata +from pathlib import Path + +import requests +import pdfplumber +from minio import Minio + + +BASE_URL = "https://sistema.sgztrade.com.br" +LOGIN_PATH = "/login" +DEFAULT_TIMEOUT = 30 +OWNCLOUD_WEBDAV_URL = "https://owncloud.hqssolucoes.com.br/public.php/webdav/" + +DEFAULT_SGZ_CREDENTIALS: List[Tuple[str, str]] = [ + ("20968@sgztrade.com.br", "sgz123"), + ("20969@sgztrade.com.br", "sgz123"), + ("20970@sgztrade.com.br", "sgz123"), + ("20986@sgztrade.com.br", "sgz123"), + ("20988@sgztrade.com.br", "sgz123"), + ("20989@sgztrade.com.br", "sgz123"), + ("20992@sgztrade.com.br", "sgz123"), + ("20993@sgztrade.com.br", "sgz123"), + ("20994@sgztrade.com.br", "sgz123"), + ("20995@sgztrade.com.br", "sgz123"), + ("20996@sgztrade.com.br", "sgz123"), + ("20997@sgztrade.com.br", "sgz123"), + ("20998@sgztrade.com.br", "sgz123"), + ("20999@sgztrade.com.br", "sgz123"), + ("21000@sgztrade.com.br", "sgz123"), + ("21001@sgztrade.com.br", "sgz123"), + ("21278@sgztrade.com.br", "sgz123"), + ("21495@sgztrade.com.br", "sgz123"), + ("21383@sgztrade.com.br", "sgz123"), + ("21375@sgztrade.com.br", "sgz123"), + ("20991@sgztrade.com.br", "sgz123"), + ("22541@sgztrade.com.br", "sgz123"), + ("23813@sgztrade.com.br", "sgz123@@"), + ("24257@sgztrade.com.br", "sgz123@@"), + ("24255@sgztrade.com.br", "sgz123@@"), + ("24293@sgztrade.com.br", "sgz123@@"), + ("24269@sgztrade.com.br", "sgz123@@"), + ("910173@sgztrade.com.br", "sgz123"), + ("910291@sgztrade.com.br", "sgz123"), + ("23711@sgztrade.com.br", "sgz123@@"), + ("23712@sgztrade.com.br", "sgz@@123"), + ("23708@sgztrade.com.br", "sgz123@@"), + ("23704@sgztrade.com.br", "sgz123@@"), + ("23703@sgztrade.com.br", "sgz123@@"), +] +MOCK_OWNCLOUD_SHARE_PASSWORD = "null" + +SQLSERVER_CONN = os.getenv( + "SQLSERVER_CONN", + ( + "DRIVER={ODBC Driver 17 for SQL Server};" + "SERVER=10.77.77.10;" + "DATABASE=GINSENG;" + "UID=andrey;" + "PWD=88253332;" + "Encrypt=no;" + "TrustServerCertificate=yes;" + ), +) + +MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "10.77.77.29:31200") +MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "admin") +MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "admin123") +MINIO_BUCKET = os.getenv("MINIO_BUCKET", "boletosvitrine") +MINIO_PREFIX = os.getenv("MINIO_PREFIX", "Boletos") +MINIO_SECURE = os.getenv("MINIO_SECURE", "false").strip().lower() in ("1", "true", "yes") +MAX_PARALLEL_REQUESTS = 5 +MAX_PARALLEL_PDFS_PER_USER = 1 +DOWNLOAD_RETRY_ATTEMPTS = 4 +DOWNLOAD_RETRY_BASE_SECONDS = 0.8 +SQL_RETRY_ATTEMPTS = 4 +SQL_RETRY_BASE_SECONDS = 0.8 +SQL_UPSERT_LOCK = Lock() + + +def _normalize_sql_conn(conn: str) -> str: + s = (conn or "").strip().rstrip(";") + if not s: + return s + # Força compatibilidade neste ambiente. 
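+    # Exemplo ilustrativo: "DRIVER={...};SERVER=srv;Encrypt=yes" vira
+    # "DRIVER={...};SERVER=srv;Encrypt=no;TrustServerCertificate=yes;".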
+ if re.search(r"(?i)\bencrypt\s*=", s): + s = re.sub(r"(?i)\bencrypt\s*=\s*[^;]+", "Encrypt=no", s) + else: + s += ";Encrypt=no" + if not re.search(r"(?i)\btrustservercertificate\s*=", s): + s += ";TrustServerCertificate=yes" + return s + ";" + + +def _extract_csrf_token(html: str) -> str: + """ + Extrai o valor do input hidden _token da página de login. + """ + m = re.search( + r']+name=["\']_token["\'][^>]+value=["\']([^"\']+)["\']', + html or "", + flags=re.IGNORECASE, + ) + if not m: + raise RuntimeError("Nao foi possivel extrair o token CSRF (_token) da pagina de login.") + return m.group(1) + + +@dataclass +class LoginResult: + ok: bool + status_code: int + final_url: str + + +@dataclass +class WebDavEntry: + href: str + name: str + is_dir: bool + size: Optional[int] = None + last_modified: Optional[str] = None + + +def _strip_accents(text: str) -> str: + return "".join( + c for c in unicodedata.normalize("NFKD", text or "") if not unicodedata.combining(c) + ) + + +def _norm_name(text: str) -> str: + return _strip_accents((text or "")).strip().lower() + + +def _normalize_spaces(text: str) -> str: + return re.sub(r"\s+", " ", (text or "")).strip() + + +def _first_group(pattern: str, text: str, flags: int = 0) -> Optional[str]: + m = re.search(pattern, text or "", flags=flags) + return m.group(1).strip() if m else None + + +def _extract_boleto_fields_from_text(text: str, source_name: str) -> Optional[Dict[str, Optional[str]]]: + t = text or "" + if not t: + return None + + linha_digitavel = _first_group( + r"(\d{5}\.\d{5}\s+\d{5}\.\d{6}\s+\d{5}\.\d{6}\s+\d\s+\d{14})", + t, + flags=re.IGNORECASE, + ) + if not linha_digitavel and "Recibo do Pagador" not in t: + return None + + beneficiario_nome = _first_group( + r"Benefici[aá]rio\s+(.+?)\s+CNPJ\s*:\s*[0-9./-]+", + t, + flags=re.IGNORECASE | re.DOTALL, + ) + cnpj_beneficiario = _first_group( + r"Benefici[aá]rio.+?CNPJ\s*:\s*([0-9./-]{14,18})", + t, + flags=re.IGNORECASE | re.DOTALL, + ) + pagador_nome = _first_group( + r"Pagador\s+(.+?)\s+CPF/CNPJ\s*:\s*[0-9./-]+", + t, + flags=re.IGNORECASE | re.DOTALL, + ) + cnpj_pagador = _first_group( + r"Pagador.+?CPF/CNPJ\s*:\s*([0-9./-]{14,18})", + t, + flags=re.IGNORECASE | re.DOTALL, + ) + vencimento = _first_group( + r"Vencimento\s+(\d{2}/\d{2}/\d{4})", + t, + flags=re.IGNORECASE, + ) + valor_documento = _first_group( + r"Valor do Documento.*?(\d{1,3}(?:\.\d{3})*,\d{2})", + t, + flags=re.IGNORECASE | re.DOTALL, + ) + agencia_codigo_beneficiario = _first_group( + r"Ag[êe]ncia\s*/\s*C[oó]digo Benefici[aá]rio\s+([0-9\-]+\s*/\s*[0-9\-]+)", + t, + flags=re.IGNORECASE, + ) + nosso_numero = _first_group( + r"Nosso N[uú]mero\s+.*?\n.*?(\d{2}/\d{6,}-\d)", + t, + flags=re.IGNORECASE, + ) or _first_group( + r"\b(\d{2}/\d{6,}-\d)\b", + t, + flags=re.IGNORECASE, + ) + + numero_documento = _first_group( + r"NF\s+(\d+)", + source_name, + flags=re.IGNORECASE, + ) or _first_group( + r"N[uú]mero do Documento.*?\n.*?(\d{4,})", + t, + flags=re.IGNORECASE, + ) + + return { + "beneficiario": _normalize_spaces(beneficiario_nome or ""), + "cnpj_beneficiario": (cnpj_beneficiario or "").strip(), + "pagador": _normalize_spaces(pagador_nome or ""), + "cnpj_pagador": (cnpj_pagador or "").strip(), + "numero_documento": (numero_documento or "").strip(), + "nosso_numero": (nosso_numero or "").strip(), + "vencimento": (vencimento or "").strip(), + "valor_documento": (valor_documento or "").strip(), + "linha_digitavel": (linha_digitavel or "").strip(), + "agencia_codigo_beneficiario": (agencia_codigo_beneficiario or 
"").strip(), + } + + +def extract_boleto_from_pdf_bytes( + pdf_bytes: bytes, + source_name: str, + source_path: str, +) -> Optional[Dict[str, Optional[str]]]: + text_parts: List[str] = [] + with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf: + for page in pdf.pages[:2]: + page_text = page.extract_text(layout=True) or "" + if page_text: + text_parts.append(page_text) + full_text = "\n".join(text_parts) + data = _extract_boleto_fields_from_text(full_text, source_name) + if not data: + return None + data["arquivo"] = source_path + return data + + +def _digits_only(v: Optional[str]) -> str: + return re.sub(r"\D+", "", v or "") + + +def _parse_money_br(v: Optional[str]) -> Optional[float]: + s = (v or "").strip() + if not s: + return None + s = s.replace(".", "").replace(",", ".") + try: + return float(s) + except Exception: + return None + + +def _parse_date_br(v: Optional[str]) -> Optional[datetime.date]: + s = (v or "").strip() + if not s: + return None + try: + return datetime.strptime(s, "%d/%m/%Y").date() + except Exception: + return None + + +def _extract_44_from_name(name: str) -> Optional[str]: + m = re.search(r"(? List[Tuple[str, str]]: + creds: List[Tuple[str, str]] = [] + seen = set() + for line in (raw or "").splitlines(): + row = (line or "").strip() + if not row: + continue + low = row.lower() + if "usuario" in low and "senha" in low: + continue + if row.startswith("#"): + continue + + parts = re.split(r"\s+", row) + if len(parts) < 2: + continue + email = parts[0].strip() + password = parts[1].strip() + if "@" not in email or not password: + continue + + key = (email.lower(), password) + if key in seen: + continue + seen.add(key) + creds.append((email, password)) + return creds + + +def load_sgz_credentials() -> List[Tuple[str, str]]: + """ + Carrega credenciais SGZ em lote nesta ordem: + 1) SGZ_CREDENTIALS_RAW (multilinha, formato: "email senha") + 2) SGZ_CREDENTIALS_FILE (se informado) + 3) credenciais padrao embutidas no codigo (DEFAULT_SGZ_CREDENTIALS) + 4) SGZ_EMAIL/SGZ_PASSWORD + """ + raw_env = os.getenv("SGZ_CREDENTIALS_RAW", "") + parsed_env = _parse_sgz_credentials_text(raw_env) + if parsed_env: + return parsed_env + + creds_file = os.getenv("SGZ_CREDENTIALS_FILE", "").strip() + if creds_file: + p = Path(creds_file) + if p.exists() and p.is_file(): + parsed_file = _parse_sgz_credentials_text( + p.read_text(encoding="utf-8", errors="ignore") + ) + if parsed_file: + return parsed_file + + if DEFAULT_SGZ_CREDENTIALS: + return list(DEFAULT_SGZ_CREDENTIALS) + + email = os.getenv("SGZ_EMAIL", "").strip() + password = os.getenv("SGZ_PASSWORD", "").strip() + if not email or not password: + raise RuntimeError( + "Defina SGZ_CREDENTIALS_RAW, SGZ_CREDENTIALS_FILE, " + "DEFAULT_SGZ_CREDENTIALS ou SGZ_EMAIL/SGZ_PASSWORD." 
+ ) + return [(email, password)] + + +def _extract_nf_from_name(name: str) -> Optional[str]: + m = re.search(r"\bNF\s*(\d+)\b", name or "", flags=re.IGNORECASE) + return m.group(1) if m else None + + +def _guess_cte_nfe_keys_from_xml_names(xml_names: List[str]) -> Dict[str, Optional[str]]: + cte_key: Optional[str] = None + nfe_key: Optional[str] = None + for name in xml_names: + key = _extract_44_from_name(name) + if not key: + continue + model = key[20:22] + if model == "57": + cte_key = cte_key or key + elif model == "55": + nfe_key = nfe_key or key + elif "_" in Path(name).stem: + cte_key = cte_key or key + return {"chave_cte": cte_key, "chave_nfe": nfe_key} + + +def criar_minio_client(): + client = Minio( + MINIO_ENDPOINT, + access_key=MINIO_ACCESS_KEY, + secret_key=MINIO_SECRET_KEY, + secure=MINIO_SECURE, + ) + if not client.bucket_exists(MINIO_BUCKET): + client.make_bucket(MINIO_BUCKET) + return client, MINIO_BUCKET, MINIO_PREFIX + + +def upload_bytes_minio(client: Minio, bucket: str, prefix: str, object_name: str, data: bytes, content_type: str) -> str: + object_name = object_name.lstrip("/") + if prefix: + object_name = f"{prefix.rstrip('/')}/{object_name}" + + client.put_object( + bucket, + object_name, + io.BytesIO(data), + length=len(data), + content_type=content_type, + ) + return object_name + + +def _safe_file_part(name: str) -> str: + return re.sub(r"[^A-Za-z0-9._-]+", "_", name or "") + + +def _fit(v: Optional[str], max_len: int) -> Optional[str]: + if v is None: + return None + s = str(v).strip() + if not s: + return None + return s[:max_len] + + +def ensure_boleto_cte_schema(cur) -> None: + cur.execute( + """ +IF OBJECT_ID('dbo.BoletoCte', 'U') IS NULL +BEGIN + CREATE TABLE dbo.BoletoCte ( + id BIGINT IDENTITY(1,1) PRIMARY KEY, + ArquivoLocal NVARCHAR(500) NOT NULL, + Beneficiario NVARCHAR(300) NULL, + CnpjBeneficiario VARCHAR(14) NULL, + Pagador NVARCHAR(300) NULL, + CnpjPagador VARCHAR(14) NULL, + NumeroDocumento VARCHAR(40) NULL, + NossoNumero VARCHAR(60) NULL, + Vencimento DATE NULL, + ValorDocumento DECIMAL(18,2) NULL, + LinhaDigitavel VARCHAR(80) NULL, + AgenciaCodigoBeneficiario VARCHAR(40) NULL, + ChaveCte CHAR(44) NULL, + ChaveNfe CHAR(44) NULL, + MinioBucket VARCHAR(100) NULL, + MinioObjectKey VARCHAR(500) NULL, + AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME() + ); +END; + """ + ) + cur.execute( + """ +IF NOT EXISTS ( + SELECT 1 FROM sys.indexes + WHERE name='UQ_BoletoCte_ArquivoLocal' + AND object_id=OBJECT_ID('dbo.BoletoCte') +) +BEGIN + CREATE UNIQUE INDEX UQ_BoletoCte_ArquivoLocal ON dbo.BoletoCte(ArquivoLocal); +END; + """ + ) + + +def upsert_boleto_cte(cur, row: Dict[str, Optional[str]]) -> None: + beneficiario = _fit(row.get("beneficiario"), 300) + cnpj_beneficiario = _fit(_digits_only(row.get("cnpj_beneficiario")) or None, 14) + pagador = _fit(row.get("pagador"), 300) + cnpj_pagador = _fit(_digits_only(row.get("cnpj_pagador")) or None, 14) + numero_documento = _fit(row.get("numero_documento"), 40) + nosso_numero = _fit(row.get("nosso_numero"), 60) + linha_digitavel = _fit(row.get("linha_digitavel"), 80) + ag_cod_benef = _fit(row.get("agencia_codigo_beneficiario"), 40) + chave_cte = _fit(row.get("chave_cte"), 44) + chave_nfe = _fit(row.get("chave_nfe"), 44) + minio_bucket = _fit(row.get("minio_bucket"), 100) + minio_object_key = _fit(row.get("minio_object_key"), 500) + arquivo_local = _fit(row.get("arquivo"), 500) + + cur.execute( + """ +SELECT id FROM dbo.BoletoCte WHERE ArquivoLocal = ? 
+ """, + arquivo_local, + ) + exists = cur.fetchone() + if exists: + cur.execute( + """ +UPDATE dbo.BoletoCte +SET Beneficiario=?, CnpjBeneficiario=?, Pagador=?, CnpjPagador=?, + NumeroDocumento=?, NossoNumero=?, Vencimento=?, ValorDocumento=?, + LinhaDigitavel=?, AgenciaCodigoBeneficiario=?, ChaveCte=?, ChaveNfe=?, + MinioBucket=?, MinioObjectKey=?, AtualizadoEm=SYSUTCDATETIME() +WHERE ArquivoLocal=? + """, + beneficiario, + cnpj_beneficiario, + pagador, + cnpj_pagador, + numero_documento, + nosso_numero, + _parse_date_br(row.get("vencimento")), + _parse_money_br(row.get("valor_documento")), + linha_digitavel, + ag_cod_benef, + chave_cte, + chave_nfe, + minio_bucket, + minio_object_key, + arquivo_local, + ) + return + + cur.execute( + """ +INSERT INTO dbo.BoletoCte ( + ArquivoLocal, Beneficiario, CnpjBeneficiario, Pagador, CnpjPagador, + NumeroDocumento, NossoNumero, Vencimento, ValorDocumento, LinhaDigitavel, + AgenciaCodigoBeneficiario, ChaveCte, ChaveNfe, MinioBucket, MinioObjectKey +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + arquivo_local, + beneficiario, + cnpj_beneficiario, + pagador, + cnpj_pagador, + numero_documento, + nosso_numero, + _parse_date_br(row.get("vencimento")), + _parse_money_br(row.get("valor_documento")), + linha_digitavel, + ag_cod_benef, + chave_cte, + chave_nfe, + minio_bucket, + minio_object_key, + ) + + +def process_boletos_to_minio_sql(boletos: List[Dict[str, Optional[str]]]) -> int: + if not boletos: + return 0 + + client, bucket, prefix = criar_minio_client() + prepared_rows: List[Dict[str, Optional[str]]] = [] + uploads = 0 + for b in boletos: + arquivo_str = (b.get("arquivo") or "").strip() + arquivo = Path(arquivo_str) if arquivo_str else None + + pdf_bytes = b.get("_pdf_bytes") # type: ignore[assignment] + if isinstance(pdf_bytes, str): + pdf_bytes = pdf_bytes.encode("utf-8", errors="ignore") + if not isinstance(pdf_bytes, (bytes, bytearray)): + continue + + base_name = arquivo.name if arquivo else Path(arquivo_str or "boleto.pdf").name + nf = _extract_nf_from_name(base_name) + if nf and b.get("chave_cte"): + obj = f"boleto/cte_{b.get('chave_cte')}_nf_{nf}_{_safe_file_part(base_name)}" + elif nf and b.get("chave_nfe"): + obj = f"boleto/nfe_{b.get('chave_nfe')}_nf_{nf}_{_safe_file_part(base_name)}" + elif b.get("chave_nfe"): + obj = f"boleto/nfe_{b.get('chave_nfe')}_{_safe_file_part(base_name)}" + elif nf: + obj = f"boleto/nf_{nf}_{_safe_file_part(base_name)}" + else: + obj = f"boleto/{_safe_file_part(base_name)}" + object_key = upload_bytes_minio( + client, + bucket, + prefix, + obj, + pdf_bytes, + "application/pdf", + ) + uploads += 1 + + b["minio_bucket"] = bucket + b["minio_object_key"] = object_key + row = dict(b) + row.pop("_pdf_bytes", None) + prepared_rows.append(row) + + try: + import pyodbc # type: ignore + except Exception: + print(f"pyodbc ausente. Upload MinIO OK={uploads}. 
Upsert SQL ignorado.") + return uploads + + for attempt in range(1, SQL_RETRY_ATTEMPTS + 1): + try: + with SQL_UPSERT_LOCK: + cn = pyodbc.connect(_normalize_sql_conn(SQLSERVER_CONN), timeout=30) + cn.autocommit = False + cur = cn.cursor() + ensure_boleto_cte_schema(cur) + + upserts = 0 + for row in prepared_rows: + upsert_boleto_cte(cur, row) + upserts += 1 + cn.commit() + cur.close() + cn.close() + print(f"Upload MinIO OK={uploads} | Upsert SQL OK={upserts}") + return upserts + except Exception as e: + msg = str(e) + is_deadlock = " 1205" in msg or "(1205)" in msg or "deadlock" in msg.lower() + if is_deadlock and attempt < SQL_RETRY_ATTEMPTS: + time.sleep(SQL_RETRY_BASE_SECONDS * (2 ** (attempt - 1))) + continue + print(f"SQL indisponivel ({e}). Upload MinIO OK={uploads}. Upsert SQL ignorado.") + return uploads + + +class SagezzaClient: + """ + Cliente HTTP para autenticação e navegação autenticada no sistema Saggezza. + Fluxo: + 1) GET /login para obter cookies + CSRF (_token) + 2) POST /login com email, password, _token e remember + 3) Reuso da mesma session para páginas internas + """ + + def __init__( + self, + base_url: str = BASE_URL, + timeout: int = DEFAULT_TIMEOUT, + session: Optional[requests.Session] = None, + ): + self.base_url = base_url.rstrip("/") + self.timeout = timeout + self.s = session or requests.Session() + # Evita proxy automático do ambiente (causa erro 10061 nesta máquina). + self.s.trust_env = False + self.s.headers.update( + { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.0.0 Safari/537.36" + ) + } + ) + + def _abs_url(self, path_or_url: str) -> str: + if path_or_url.startswith("http://") or path_or_url.startswith("https://"): + return path_or_url + return urljoin(f"{self.base_url}/", path_or_url.lstrip("/")) + + def _get_login_page(self) -> str: + login_url = self._abs_url(LOGIN_PATH) + r = self.s.get(login_url, timeout=self.timeout) + r.raise_for_status() + return r.text + + def login(self, email: str, password: str, remember: bool = True) -> LoginResult: + login_page = self._get_login_page() + csrf_token = _extract_csrf_token(login_page) + + payload = { + "_token": csrf_token, + "email": email, + "password": password, + "remember": "on" if remember else "", + } + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Origin": self.base_url, + "Referer": self._abs_url(LOGIN_PATH), + } + + r = self.s.post( + self._abs_url(LOGIN_PATH), + data=payload, + headers=headers, + timeout=self.timeout, + allow_redirects=True, + ) + + # Em sucesso, normalmente sai de /login após o redirect. + final_url = (r.url or "").rstrip("/") + login_url = self._abs_url(LOGIN_PATH).rstrip("/") + ok = (r.status_code == 200) and (final_url != login_url) + return LoginResult(ok=ok, status_code=r.status_code, final_url=r.url) + + def is_logged(self) -> bool: + """ + Heurística simples: sessão autenticada costuma ter cookie de sessão + URL inicial acessível. 
+ """ + if "jrd_session" not in self.s.cookies: + return False + r = self.s.get(self._abs_url("/"), timeout=self.timeout, allow_redirects=False) + return r.status_code in (200, 302) + + def get(self, path_or_url: str, **kwargs: Any) -> requests.Response: + timeout = kwargs.pop("timeout", self.timeout) + r = self.s.get(self._abs_url(path_or_url), timeout=timeout, **kwargs) + r.raise_for_status() + return r + + def post(self, path_or_url: str, **kwargs: Any) -> requests.Response: + timeout = kwargs.pop("timeout", self.timeout) + r = self.s.post(self._abs_url(path_or_url), timeout=timeout, **kwargs) + r.raise_for_status() + return r + + +class OwnCloudWebDavClient: + """ + Cliente WebDAV para listar pastas/arquivos no ownCloud via PROPFIND. + """ + + def __init__( + self, + webdav_url: str = OWNCLOUD_WEBDAV_URL, + timeout: int = DEFAULT_TIMEOUT, + session: Optional[requests.Session] = None, + auth_header: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + ): + self.webdav_url = webdav_url.rstrip("/") + "/" + self.timeout = timeout + self.s = session or requests.Session() + # Evita proxy automático do ambiente (causa erro 10061 nesta máquina). + self.s.trust_env = False + self.auth_header = auth_header + self.username = username + self.password = password + self.s.headers.update( + { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.0.0 Safari/537.36" + ) + } + ) + + def _target_url(self, relative_path: str = "") -> str: + relative_path = (relative_path or "").lstrip("/") + return urljoin(self.webdav_url, relative_path) + + @staticmethod + def _href_to_rel_path(href: str) -> str: + path = unquote((href or "").strip()) + marker = "/public.php/webdav/" + if marker in path: + path = path.split(marker, 1)[1] + return path.strip("/") + + def _build_headers(self, depth: int) -> Dict[str, str]: + headers = { + "Depth": str(depth), + "Content-Type": "application/xml; charset=UTF-8", + } + if self.auth_header: + headers["Authorization"] = self.auth_header + return headers + + def _auth_tuple(self): + if self.username is not None: + return (self.username, self.password or "") + return None + + def _auth_headers(self) -> Dict[str, str]: + return {"Authorization": self.auth_header} if self.auth_header else {} + + @staticmethod + def build_basic_auth(username: str, password: str) -> str: + raw = f"{username}:{password}" + b64 = base64.b64encode(raw.encode("utf-8")).decode("ascii") + return f"Basic {b64}" + + def _propfind_body(self) -> str: + return ( + '' + '' + "" + "" + "" + "" + "" + "" + "" + ) + + @staticmethod + def _parse_multistatus(xml_text: str) -> List[WebDavEntry]: + ns = {"d": "DAV:"} + root = ET.fromstring(xml_text) + entries: List[WebDavEntry] = [] + first = True + + for resp in root.findall("d:response", ns): + href = (resp.findtext("d:href", default="", namespaces=ns) or "").strip() + prop = resp.find("d:propstat/d:prop", ns) + if prop is None: + continue + + displayname = ( + prop.findtext("d:displayname", default="", namespaces=ns) or "" + ).strip() + resourcetype = prop.find("d:resourcetype", ns) + is_dir = resourcetype is not None and resourcetype.find("d:collection", ns) is not None + size_raw = (prop.findtext("d:getcontentlength", default="", namespaces=ns) or "").strip() + last_modified = ( + prop.findtext("d:getlastmodified", default="", namespaces=ns) or "" + ).strip() or None + + # Em PROPFIND Depth 1, o primeiro item costuma ser a propria pasta consultada. 
+ if first: + first = False + continue + + name = displayname or href.rstrip("/").split("/")[-1] + size = int(size_raw) if size_raw.isdigit() else None + entries.append( + WebDavEntry( + href=href, + name=name, + is_dir=is_dir, + size=size, + last_modified=last_modified, + ) + ) + return entries + + def list_entries(self, path: str = "", depth: int = 1) -> List[WebDavEntry]: + body = self._propfind_body() + headers = self._build_headers(depth=depth) + auth = self._auth_tuple() + r = self.s.request( + "PROPFIND", + self._target_url(path), + data=body.encode("utf-8"), + headers=headers, + auth=auth, + timeout=self.timeout, + ) + if r.status_code not in (207, 200): + if r.status_code == 401: + raise RuntimeError( + "PROPFIND 401 (nao autenticado no ownCloud). " + "Defina OWNCLOUD_AUTH_HEADER='Basic ...' " + "ou OWNCLOUD_USER/OWNCLOUD_PASS." + ) + raise RuntimeError( + f"PROPFIND falhou. status={r.status_code} body={r.text[:500]}" + ) + return self._parse_multistatus(r.text) + + def list_folders(self, path: str = "", depth: int = 1) -> List[WebDavEntry]: + return [x for x in self.list_entries(path=path, depth=depth) if x.is_dir] + + def download_file_bytes(self, remote_path: str) -> bytes: + last_exc: Optional[Exception] = None + for attempt in range(1, DOWNLOAD_RETRY_ATTEMPTS + 1): + try: + r = self.s.get( + self._target_url(remote_path), + timeout=self.timeout, + stream=True, + headers=self._auth_headers(), + auth=self._auth_tuple(), + ) + if r.status_code == 401: + # Algumas instancias exigem reafirmar auth em nova conexao. + r = self.s.get( + self._target_url(remote_path), + timeout=self.timeout, + stream=True, + headers=self._auth_headers(), + auth=self._auth_tuple(), + ) + r.raise_for_status() + chunks: List[bytes] = [] + for chunk in r.iter_content(chunk_size=1024 * 256): + if chunk: + chunks.append(chunk) + return b"".join(chunks) + except Exception as exc: + last_exc = exc + if attempt >= DOWNLOAD_RETRY_ATTEMPTS: + break + wait_s = DOWNLOAD_RETRY_BASE_SECONDS * (2 ** (attempt - 1)) + time.sleep(wait_s) + + raise RuntimeError( + f"Falha ao baixar arquivo remoto apos {DOWNLOAD_RETRY_ATTEMPTS} tentativas: {remote_path}" + ) from last_exc + + def list_files_recursive(self, remote_root: str) -> List[str]: + files: List[str] = [] + for entry in self.list_entries(path=remote_root, depth=1): + rel_remote = self._href_to_rel_path(entry.href) + if not rel_remote: + continue + if entry.is_dir: + files.extend(self.list_files_recursive(rel_remote)) + else: + files.append(rel_remote) + return files + + +def from_env() -> SagezzaClient: + """ + Constrói cliente e realiza login usando variáveis de ambiente: + - SGZ_EMAIL + - SGZ_PASSWORD + - SGZ_BASE_URL (opcional) + """ + email = os.getenv("SGZ_EMAIL", "").strip() + password = os.getenv("SGZ_PASSWORD", "").strip() + base_url = os.getenv("SGZ_BASE_URL", BASE_URL).strip() or BASE_URL + if not email or not password: + raise RuntimeError("Defina SGZ_EMAIL e SGZ_PASSWORD no ambiente.") + + client = SagezzaClient(base_url=base_url) + result = client.login(email=email, password=password, remember=True) + if not result.ok: + raise RuntimeError( + f"Falha no login Sagezza. 
status={result.status_code} final_url={result.final_url}" + ) + return client + + +def owncloud_from_env() -> OwnCloudWebDavClient: + """ + Cria cliente WebDAV lendo ambiente: + - OWNCLOUD_WEBDAV_URL (opcional) + - OWNCLOUD_AUTH_HEADER (opcional, ex: "Basic ...") + - OWNCLOUD_USER / OWNCLOUD_PASS (opcional) + - OWNCLOUD_SHARE_TOKEN / OWNCLOUD_SHARE_PASSWORD (opcional) + """ + webdav_url = os.getenv("OWNCLOUD_WEBDAV_URL", OWNCLOUD_WEBDAV_URL).strip() + auth_header = os.getenv("OWNCLOUD_AUTH_HEADER", "").strip() or None + username = os.getenv("OWNCLOUD_USER", "").strip() or None + password = os.getenv("OWNCLOUD_PASS", "").strip() or None + + # Para links publicos: user = share_token, pass = senha do link (ou "null" sem senha) + share_token = os.getenv("OWNCLOUD_SHARE_TOKEN", "").strip() + share_password = ( + os.getenv("OWNCLOUD_SHARE_PASSWORD", "").strip() or MOCK_OWNCLOUD_SHARE_PASSWORD + ) + if not auth_header and share_token: + auth_header = OwnCloudWebDavClient.build_basic_auth( + username=share_token, + password=share_password or "null", + ) + + return OwnCloudWebDavClient( + webdav_url=webdav_url, + auth_header=auth_header, + username=username, + password=password, + ) + + +def discover_owncloud_share_token(client: SagezzaClient) -> Optional[str]: + try: + html = client.get("/").text + except Exception: + return None + + m = re.search( + r"https?://owncloud\.hqssolucoes\.com\.br/index\.php/s/([A-Za-z0-9]+)", + html or "", + flags=re.IGNORECASE, + ) + if m: + return m.group(1) + + m = re.search(r"/index\.php/s/([A-Za-z0-9]+)", html or "", flags=re.IGNORECASE) + if m: + return m.group(1) + return None + + +def owncloud_from_share_token(share_token: str) -> OwnCloudWebDavClient: + webdav_url = os.getenv("OWNCLOUD_WEBDAV_URL", OWNCLOUD_WEBDAV_URL).strip() + share_password = ( + os.getenv("OWNCLOUD_SHARE_PASSWORD", "").strip() or MOCK_OWNCLOUD_SHARE_PASSWORD + ) + auth_header = OwnCloudWebDavClient.build_basic_auth( + username=share_token, + password=share_password or "null", + ) + return OwnCloudWebDavClient(webdav_url=webdav_url, auth_header=auth_header) + + +if __name__ == "__main__": + base_url = os.getenv("SGZ_BASE_URL", BASE_URL).strip() or BASE_URL + creds = load_sgz_credentials() + print(f"Usuarios SGZ para consulta: {len(creds)}") + + ok_logins: List[Tuple[str, SagezzaClient]] = [] + fail_logins: List[str] = [] + + def _try_login(email: str, password: str) -> Tuple[str, bool, str, Optional[SagezzaClient]]: + client_tmp = SagezzaClient(base_url=base_url) + try: + result = client_tmp.login(email=email, password=password, remember=True) + ok = result.ok and client_tmp.is_logged() + if ok: + return (email, True, f"status={result.status_code} | final_url={result.final_url}", client_tmp) + return (email, False, f"status={result.status_code} | final_url={result.final_url}", None) + except Exception as exc: + return (email, False, str(exc), None) + + with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as ex: + futures = { + ex.submit(_try_login, email, password): (email, password) + for email, password in creds + } + for fut in as_completed(futures): + email, password = futures[fut] + try: + r_email, ok, info, c_ok = fut.result() + if ok: + print(f"[OK] {r_email} | {info}") + if c_ok is not None: + ok_logins.append((email, c_ok)) + else: + print(f"[FALHA] {r_email} | {info}") + fail_logins.append(r_email) + except Exception as exc: + print(f"[ERRO] {email} | {exc}") + fail_logins.append(email) + + print( + f"Resumo logins SGZ: total={len(creds)} ok={len(ok_logins)} " + 
f"falha={len(fail_logins)}" + ) + if fail_logins: + print("Usuarios com falha:") + for email in fail_logins: + print(f"- {email}") + + if not ok_logins: + raise RuntimeError("Nenhum login SGZ valido na lista informada.") + + should_run_owncloud = os.getenv("SGZ_RUN_OWNCLOUD", "1").strip().lower() in ( + "1", + "true", + "yes", + "sim", + ) + if not should_run_owncloud: + print( + "Fluxo ownCloud desativado. Defina SGZ_RUN_OWNCLOUD=1 para " + "baixar/listar arquivos." + ) + raise SystemExit(0) + + has_owncloud_auth = True + if has_owncloud_auth: + ano_atual = str(datetime.now().year) + path_ano = os.getenv("OWNCLOUD_YEAR_PATH", "").strip() or ano_atual + alvo = os.getenv("OWNCLOUD_TARGET_FOLDER", "BOTICARIO").strip() or "BOTICARIO" + alvo_norm = _norm_name(alvo) + total_boletos = 0 + total_gravados = 0 + def _process_user(login_item: Tuple[str, SagezzaClient]) -> Tuple[str, int, int, Optional[str]]: + email_ativo, client_ativo = login_item + try: + token = discover_owncloud_share_token(client_ativo) + if not token: + return (email_ativo, 0, 0, "Token ownCloud nao encontrado no HTML do SGZ.") + ow = owncloud_from_share_token(token) + itens = ow.list_entries(path=path_ano, depth=1) + pasta_alvo = next( + (x for x in itens if x.is_dir and alvo_norm in _norm_name(unquote(x.name))), + None, + ) + if not pasta_alvo: + return (email_ativo, 0, 0, f"Pasta alvo nao encontrada dentro de '{path_ano}': {alvo}") + + path_alvo = unquote(pasta_alvo.href).replace("/public.php/webdav/", "").strip("/") + remote_files = ow.list_files_recursive(path_alvo) + xml_by_folder: Dict[str, List[str]] = {} + pdf_paths: List[str] = [] + for rp in remote_files: + low = rp.lower() + folder = str(Path(rp).parent).replace("\\", "/").strip("/") + if low.endswith(".xml"): + xml_by_folder.setdefault(folder, []).append(Path(rp).name) + elif low.endswith(".pdf"): + pdf_paths.append(rp) + + def _process_remote_pdf(rp: str) -> Optional[Dict[str, Optional[str]]]: + pdf_bytes = ow.download_file_bytes(rp) + row = extract_boleto_from_pdf_bytes( + pdf_bytes=pdf_bytes, + source_name=Path(rp).name, + source_path=rp, + ) + if not row: + return None + folder = str(Path(rp).parent).replace("\\", "/").strip("/") + keys = _guess_cte_nfe_keys_from_xml_names(xml_by_folder.get(folder, [])) + row["chave_cte"] = keys.get("chave_cte") + row["chave_nfe"] = keys.get("chave_nfe") + row["_pdf_bytes"] = pdf_bytes # type: ignore[index] + return row + + boletos: List[Dict[str, Optional[str]]] = [] + with ThreadPoolExecutor(max_workers=MAX_PARALLEL_PDFS_PER_USER) as ex_pdf: + futures_pdf = {ex_pdf.submit(_process_remote_pdf, rp): rp for rp in pdf_paths} + for fut_pdf in as_completed(futures_pdf): + rp = futures_pdf[fut_pdf] + try: + row = fut_pdf.result() + if row: + boletos.append(row) + except Exception as exc: + print(f"{email_ativo}: falha ao processar PDF remoto '{rp}': {exc}") + + gravados = process_boletos_to_minio_sql(boletos) if boletos else 0 + return (email_ativo, len(boletos), gravados, None) + except Exception as exc: + return (email_ativo, 0, 0, str(exc)) + + with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as ex_users: + futures_user = { + ex_users.submit(_process_user, login_item): login_item[0] for login_item in ok_logins + } + for fut_user in as_completed(futures_user): + email_ativo = futures_user[fut_user] + try: + email_done, n_boletos, n_gravados, err = fut_user.result() + if err: + print(f"Falha no processamento ownCloud para {email_done}: {err}") + continue + print(f"{email_done}: boletos extraidos (em memoria) = 
{n_boletos}") + print(f"{email_done}: enviados/upsert = {n_gravados}") + total_boletos += n_boletos + total_gravados += n_gravados + except Exception as exc: + print(f"Falha no processamento ownCloud para {email_ativo}: {exc}") + print( + f"Resumo ownCloud (todos logins): boletos_extraidos={total_boletos} " + f"enviados_upsert={total_gravados}" + ) + else: + print( + "OwnCloud nao executado: defina OWNCLOUD_AUTH_HEADER, " + "OWNCLOUD_USER/OWNCLOUD_PASS, ou OWNCLOUD_SHARE_TOKEN." + ) diff --git a/trf.py b/trf.py new file mode 100644 index 0000000..1774ac9 --- /dev/null +++ b/trf.py @@ -0,0 +1,1245 @@ +import requests +import pdfplumber +import base64 +import json +import time +import re +import unicodedata +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime +from io import BytesIO +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Set, Tuple + +TOKENS_URL = "https://api.grupoginseng.com.br/api/tokens" +STORES_URL = "https://api-extranet.grupoboticario.digital/api/person-logged/stores" +DOCS_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/reports/documents-list" +HANDLE_URL = "https://sf-relatorios-api.grupoboticario.digital/v1/handle-images" + + +# ----------------------------- +# TOKEN (com cache simples) +# ----------------------------- +def _jwt_payload(jwt_token: str) -> Dict[str, Any]: + parts = jwt_token.split(".") + if len(parts) != 3: + return {} + payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4) + raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8")) + return json.loads(raw.decode("utf-8")) + + +@dataclass +class TokenCache: + bearer: Optional[str] = None + exp_epoch: int = 0 + + def valid(self, skew_seconds: int = 30) -> bool: + return bool(self.bearer) and (time.time() < (self.exp_epoch - skew_seconds)) + + +class Auth: + def __init__(self, s: requests.Session): + self.s = s + self.cache = TokenCache() + + def get_bearer(self) -> str: + if self.cache.valid(): + return self.cache.bearer # type: ignore + + r = self.s.get(TOKENS_URL, timeout=30) + r.raise_for_status() + body = r.json() + + if not body.get("success"): + raise RuntimeError(f"Token API retornou success=false: {body}") + + bearer = body["data"][0]["token"] # já vem "Bearer ..." 
+        jwt = bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else bearer
+        exp = int(_jwt_payload(jwt).get("exp") or 0)
+
+        self.cache = TokenCache(bearer=bearer, exp_epoch=exp)
+        return bearer
+
+    def invalidate(self) -> None:
+        self.cache = TokenCache()
+
+
+# -----------------------------
+# PDF: download + leitura texto (fallback)
+# -----------------------------
+def baixar_pdf_bytes(url_pdf: str, session: requests.Session) -> BytesIO:
+    r = session.get(url_pdf, timeout=120)
+    r.raise_for_status()
+    return BytesIO(r.content)
+
+
+def ler_texto_pdf(pdf_bytes: BytesIO) -> str:
+    texto_total = ""
+    with pdfplumber.open(pdf_bytes) as pdf:
+        for pagina in pdf.pages:
+            texto = pagina.extract_text(layout=True) or ""
+            if texto:
+                texto_total += texto + "\n"
+    return texto_total
+
+
+# -----------------------------
+# EXTRAÇÃO: Remuneração Franquia
+# 1) tenta por TABELA (mais confiável)
+# 2) se falhar, cai no TEXTO (como antes)
+# -----------------------------
+def _norm(s: str) -> str:
+    base = _strip_accents(s or "")
+    return re.sub(r"\s+", " ", base).strip().lower()
+
+
+def _parse_money_pt(s: str) -> float:
+    s = (s or "").strip()
+    # "12.769,43" -> "12769.43"
+    if "," in s:
+        s = s.replace(".", "").replace(",", ".")
+    return float(s)
+
+
+def _money_from_text(s: str) -> Optional[float]:
+    if not s:
+        return None
+    m = re.search(r"\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2}", s)
+    if not m:
+        return None
+    try:
+        return _parse_money_pt(m.group(0))
+    except Exception:
+        return None
+
+
+def _float5_from_text(s: str) -> Optional[float]:
+    if not s:
+        return None
+    m = re.search(r"\d+(?:[.,])\d{5}", s)  # 1,00000 / 1.00000
+    if not m:
+        return None
+    x = m.group(0)
+    x = x.replace(".", "").replace(",", ".") if "," in x else x
+    try:
+        return float(x)
+    except Exception:
+        return None
+
+
+def _pick_first_number(s: str) -> Optional[str]:
+    if not s:
+        return None
+    chunks = re.findall(r"\d+", s)
+    if not chunks:
+        return None
+
+    # Em alguns PDFs a NF vem quebrada em linhas, ex: "12282" + "58".
+    # Se houver um sufixo curto após um bloco maior, recompõe.
+    if len(chunks) >= 2 and len(chunks[-2]) >= 3 and len(chunks[-1]) <= 3:
+        return chunks[-2] + chunks[-1]
+
+    for c in chunks:
+        if len(c) >= 3:
+            return c
+    return None
+
+
+def _extract_date_pt(s: str) -> Optional[str]:
+    if not s:
+        return None
+    s = re.sub(r"(?<=\d)\s+(?=\d)", "", s)
+    m = re.search(r"\d{2}\.\d{2}\.\d{4}", s)
+    return m.group(0) if m else None
+
+
+def _nota_from_link_text(s: str) -> Optional[str]:
+    if not s:
+        return None
+    # querystring da URL da NFS-e
+    m = re.search(r"(?:nrnfs|nnfse)\s*=\s*(\d+)", s, flags=re.IGNORECASE)
+    return m.group(1) if m else None
+
+
+def _nota_from_cell_text(s: Optional[str]) -> Optional[str]:
+    if not s:
+        return None
+    txt = " ".join(str(s).split())
+    # Evita confundir com valor monetario (ex: 1315.42) e aceita sufixo "-1".
+    m = re.search(r"(?<![\d.,])(\d{3,}(?:-\d{1,4})?)(?![.,]\d)", txt)
+    return m.group(1) if m else None
+
+
+def _nota_digits_len(v: Optional[str]) -> int:
+    if not v:
+        return 0
+    return len(re.sub(r"\D", "", str(v)))
+
+
+def _normalizar_nota_fiscal(v: Optional[str]) -> Optional[str]:
+    if not v:
+        return None
+    digits = re.sub(r"\D", "", str(v))
+    if not digits:
+        return None
+    # Regra de negocio: manter somente os ultimos 9 digitos.
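+    # Exemplo ilustrativo: "000123456789-1" -> digits "0001234567891" -> "234567891".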
+ return digits[-9:] if len(digits) > 9 else digits + + +def _nota_ou_dps(nota: Optional[str], nota_link: Optional[str], dps_num: Optional[str]) -> Optional[str]: + cands = [_normalizar_nota_fiscal(nota), _normalizar_nota_fiscal(nota_link), _normalizar_nota_fiscal(dps_num)] + valid = [c for c in cands if _nota_fiscal_valida(c)] + if valid: + # Se houver mais de uma opção, prefere a mais completa (mais dígitos). + return max(valid, key=_nota_digits_len) + for c in cands: + if c: + return c + return None + + +def _dps_num_from_text(s: str) -> Optional[str]: + if not s: + return None + m = re.search( + r"(?:dados\s*dps|dps).{0,80}?\bnum(?:ero|\.?)\s*[:=]\s*(\d{3,})\b", + s, + flags=re.IGNORECASE | re.DOTALL, + ) + return m.group(1) if m else None + +def _nota_fiscal_valida(nota: Optional[str]) -> bool: + if not nota: + return False + digits = re.sub(r"\D", "", str(nota)) + # Evita ano (ex.: "2026") e outros falsos positivos curtos. + return len(digits) >= 5 + + +def _extrair_parcelas(dp_text: str, emissao: Optional[str] = None) -> List[Dict[str, Any]]: + pares = re.findall( + r"\b(\d{2}\.\d{2}\.\d{4})\s+(\d{1,3}(?:\.\d{3})*,\d{2}|\d+\.\d{2})\b", + dp_text or "", + ) + parcelas: List[Dict[str, Any]] = [] + seen = set() + for data, v in pares: + try: + val = _parse_money_pt(v) + except Exception: + continue + k = (data, val) + if k not in seen: + seen.add(k) + parcelas.append({"data": data, "valor": val}) + + # Quando a coluna vem mesclada, pode capturar "emissao + desconto 0,00" como 1a parcela. + if emissao and len(parcelas) > 1: + parcelas = [ + p for p in parcelas + if not (p.get("data") == emissao and abs(float(p.get("valor") or 0.0)) < 0.00001) + ] + + return _ordenar_parcelas_por_data(parcelas) + + +def _date_from_iso(s: Optional[str]): + if not s: + return None + try: + return datetime.strptime(s, "%Y-%m-%d").date() + except Exception: + return None + + +def _date_from_br(s: Optional[str]): + if not s: + return None + try: + return datetime.strptime(s, "%d.%m.%Y").date() + except Exception: + return None + + +def _ordenar_parcelas_por_data(parcelas: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Ordena parcelas por data (dd.mm.aaaa) e, em empate, por valor. + Parcelas sem data válida ficam no final. 
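+    Ex.: 10.03.2026 ordena antes de 10.04.2026; itens sem data valida vao para o fim.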
+ """ + def _k(p: Dict[str, Any]): + dt = _date_from_br(p.get("data")) + val = p.get("valor") + val_num = float(val) if isinstance(val, (int, float)) else float("inf") + invalida = dt is None + return (invalida, dt or datetime.max.date(), val_num) + + return sorted(parcelas or [], key=_k) + + +def _isolar_bloco(texto: str, titulo_regex: str, proximos_titulos_regex: List[str]) -> Optional[str]: + t = " ".join((texto or "").split()) + m0 = re.search(titulo_regex, t, flags=re.IGNORECASE) + if not m0: + return None + after = t[m0.start():] + + end_pos = None + for rx in proximos_titulos_regex: + m = re.search(rx, after, flags=re.IGNORECASE) + if m: + end_pos = m.start() + break + + return after[:end_pos] if end_pos is not None else after + + +def extrair_remuneracao_franquia_por_tabela(pdf: pdfplumber.PDF) -> Optional[Dict[str, Any]]: + """ + Extrai APENAS a tabela do bloco: + "Notas Fiscais de Serviço de taxa de remuneração de Franquia" + + Regra para não confundir com "Venda de Produtos": + - o header TEM que ter "valor taxa" e "fat.conv" + """ + for page in pdf.pages: + tables = page.extract_tables() or [] + for tbl in tables: + if not tbl or len(tbl) < 2: + continue + + # procura um header que tenha as colunas específicas do bloco de remuneração + header_idx = None + header_join = "" + for i, row in enumerate(tbl[:8]): + header_join = " | ".join(_norm(c) for c in row if c) + # precisa ter essas chaves (venda não tem "valor taxa") + if ("valor taxa" in header_join) and ("fat" in header_join) and ("dados pagamento" in header_join): + header_idx = i + break + + if header_idx is None: + continue + + nota_link_tabela = _nota_from_link_text(_table_text(tbl)) + header = [_norm(c) for c in tbl[header_idx]] + + def idx_like(keys: List[str]) -> Optional[int]: + for j, h in enumerate(header): + for k in keys: + if k in h: + return j + return None + + i_emissao = idx_like(["emissao"]) + i_nota = idx_like(["nota fiscal"]) + i_valor_taxa = idx_like(["valor taxa"]) + i_valor_desc = idx_like(["valor desconto"]) + i_fat = idx_like(["fat.conv", "fat conv", "fat"]) + i_valor_nf = idx_like(["valor nf"]) + i_enc = idx_like(["encargos"]) + i_dp = idx_like(["dados pagamento"]) + + # essenciais do bloco + if i_emissao is None or i_nota is None or i_valor_taxa is None or i_valor_nf is None or i_dp is None: + continue + + # lê a primeira linha "real" de dados (normalmente só tem 1) + for row in tbl[header_idx + 1:]: + if not any((c or "").strip() for c in row): + continue + + emissao_cell = row[i_emissao] if i_emissao < len(row) else "" + nota_cell = row[i_nota] if i_nota < len(row) else "" + dp_cell = _cell(row[i_dp]) if i_dp < len(row) else "" + + emissao = _extract_date_pt(_cell(emissao_cell)) + nota = _nota_from_cell_text(_cell(nota_cell)) + nota_link = _nota_from_link_text(dp_cell) + dps_num = _dps_num_from_text(dp_cell) + nota_fiscal = _nota_ou_dps(nota, nota_link or nota_link_tabela, dps_num) + + if not emissao: + continue + + valor_taxa = _money_from_text(row[i_valor_taxa]) if i_valor_taxa < len(row) else None + valor_desconto = _money_from_text(row[i_valor_desc]) if i_valor_desc is not None and i_valor_desc < len(row) else None + fat_conv = _float5_from_text(row[i_fat]) if i_fat is not None and i_fat < len(row) else None + valor_nf = _money_from_text(row[i_valor_nf]) if i_valor_nf < len(row) else None + encargos = _money_from_text(row[i_enc]) if i_enc is not None and i_enc < len(row) else None + + # Dados pagamento: várias linhas data + valor + dp = dp_cell + parcelas = _extrair_parcelas(dp, 
+
+
+def _cell(c: Any) -> str:
+    return " ".join(str(c or "").split())
+
+
+def _table_text(tbl: List[List[Any]]) -> str:
+    return " | ".join(_norm(_cell(c)) for row in tbl for c in (row or []))
+
+
+def _strip_accents(s: str) -> str:
+    return "".join(ch for ch in unicodedata.normalize("NFD", s or "") if unicodedata.category(ch) != "Mn")
+
+
+def extrair_remuneracao_franquia_por_texto(texto: str) -> Dict[str, Any]:
+    """
+    Fallback (as done before): locates the section via regex and tries to
+    extract issue date, invoice number and installments. (The monetary values
+    may fail depending on the PDF.)
+    """
+    bloco = _isolar_bloco(
+        texto,
+        titulo_regex=r"Notas\s+Fiscais\s+de\s+Servi.{0,3}o\s+de\s+taxa\s+de\s+remunera.{0,5}o\s+de\s+Franquia",
+        proximos_titulos_regex=[
+            r"\bLink\s*:",
+            r"Notas\s+de\s+Despesas\s+de\s+Propaganda",
+            r"Outras\s+Notas\s+de\s+D[eé]bito",
+            r"Outras\s+Notas\s+de\s+Cr[eé]dito",
+        ],
+    )
+    if not bloco:
+        return {
+            "tipo_bloco": "TAXA_REMUNERACAO",
+            "emissao": None,
+            "nota_fiscal": None,
+            "valor_taxa": None,
+            "valor_desconto": None,
+            "fat_conv": None,
+            "valor_nf": None,
+            "encargos": None,
+            "parcelas": [],
+            "error": "Section not found",
+        }
+
+    b = " ".join(bloco.split())
+
+    m_em = re.search(r"\b(\d{2}\.\d{2}\.\d{4})\b", b)
+    if not m_em:
+        return {
+            "tipo_bloco": "TAXA_REMUNERACAO",
+            "emissao": None,
+            "nota_fiscal": None,
+            "valor_taxa": None,
+            "valor_desconto": None,
+            "fat_conv": None,
+            "valor_nf": None,
+            "encargos": None,
+            "parcelas": [],
+            "error": "Issue date + invoice number not found",
+        }
+
+    emissao = m_em.group(1)
+    m_nota = re.search(r"\bnota\s+fiscal\b.{0,220}?\b(\d{3,}(?:-\d{1,4})?)\b", b, flags=re.IGNORECASE)
+    nota = m_nota.group(1) if m_nota else None
+    if not nota:
+        m_nota2 = re.search(r"\b\d{2}\.\d{2}\.\d{4}\b\s+(\d{3,}(?:-\d{1,4})?)\b", b, flags=re.IGNORECASE)
+        nota = m_nota2.group(1) if m_nota2 else None
+    nota_link = _nota_from_link_text(texto)
+    dps_num = _dps_num_from_text(texto)
+    nota_fiscal = _nota_ou_dps(nota, nota_link, dps_num)
+
+    # installments come after "Dados Pagamento"
+    idx_dp = b.lower().find("dados pagamento")
+    dp = b[idx_dp:] if idx_dp != -1 else b
+
+    parcelas = _extrair_parcelas(dp, emissao=emissao)
+    m_taxa = re.search(r"valor\s+taxa.{0,80}", b, flags=re.IGNORECASE)
+    valor_taxa = _money_from_text(m_taxa.group(0)) if m_taxa else None
+    m_nf = re.search(r"valor\s+nf.{0,80}", b, flags=re.IGNORECASE)
+    valor_nf = _money_from_text(m_nf.group(0)) if m_nf else None
+    tem_valor_bloco = any(v is not None for v in (valor_taxa, valor_nf)) or bool(parcelas)
+    if (not _nota_fiscal_valida(nota_fiscal)) or (not tem_valor_bloco):
+        return {
+            "tipo_bloco": "TAXA_REMUNERACAO",
+            "emissao": None,
+            "nota_fiscal": None,
+            "valor_taxa": None,
+            "valor_desconto": None,
+            "fat_conv": None,
+            "valor_nf": None,
+            "encargos": None,
+            "parcelas": [],
+            "error": "TRF block without valid content",
+        }
+
+    # in the fallback the monetary values are intentionally kept as None
+    return {
+        "tipo_bloco": "TAXA_REMUNERACAO",
+        "emissao": emissao,
+        "nota_fiscal": nota_fiscal,
+        "valor_taxa": None,
+        "valor_desconto": None,
+        "fat_conv": None,
+        "valor_nf": None,
+        "encargos": None,
+        "parcelas": parcelas,
+        "error": None,
+    }
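+
+
+# Editor's note: minimal sketch of `_isolar_bloco` on a synthetic flattened
+# text, assuming the section titles used above. Documentation only; never
+# called by the pipeline.
+def _demo_isolar_bloco() -> None:
+    texto = (
+        "Notas Fiscais de Servico de taxa de remuneracao de Franquia "
+        "26.02.2026 123456789 Dados Pagamento 10.03.2026 1.000,00 "
+        "Outras Notas de Debito ..."
+    )
+    bloco = _isolar_bloco(
+        texto,
+        titulo_regex=r"Notas\s+Fiscais\s+de\s+Servi.{0,3}o\s+de\s+taxa",
+        proximos_titulos_regex=[r"Outras\s+Notas\s+de\s+D[eé]bito"],
+    )
+    # the block starts at its title and stops before the next section title
+    assert bloco is not None and "Dados Pagamento" in bloco and "Outras" not in bloco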
+
+
+def extrair_remuneracao_franquia(pdf_bytes: BytesIO) -> Dict[str, Any]:
+    """
+    Single entry point for the flow:
+    - tries the table first (best case)
+    - if that fails, falls back to text (as before)
+    """
+    # try the main block via table (best-case scenario)
+    with pdfplumber.open(pdf_bytes) as pdf:
+        via_tabela = extrair_remuneracao_franquia_por_tabela(pdf)
+        if via_tabela:
+            return via_tabela
+
+    # text fallback for the main block
+    pdf_bytes.seek(0)
+    texto = ler_texto_pdf(pdf_bytes)
+    return extrair_remuneracao_franquia_por_texto(texto)
+
+
+def _tem_emissao_nota(d: Optional[Dict[str, Any]]) -> bool:
+    if not d:
+        return False
+    return bool(d.get("emissao")) and bool(d.get("nota_fiscal"))
+
+
+def _dados_bloco(d: Dict[str, Any]) -> Dict[str, Any]:
+    return {
+        "emissao_nf": d.get("emissao"),
+        "nota_fiscal": d.get("nota_fiscal"),
+        "valor_taxa": d.get("valor_taxa"),
+        "valor_desconto": d.get("valor_desconto"),
+        "fat_conv": d.get("fat_conv"),
+        "valor_nf": d.get("valor_nf"),
+        "encargos": d.get("encargos"),
+        "parcelas": d.get("parcelas") or [],
+    }
+
+
+def extrair_remuneracao_franquia_detalhado(pdf_bytes: BytesIO) -> Dict[str, Any]:
+    principal: Optional[Dict[str, Any]] = None
+
+    pdf_bytes.seek(0)
+    with pdfplumber.open(pdf_bytes) as pdf:
+        via_tabela = extrair_remuneracao_franquia_por_tabela(pdf)
+        if via_tabela:
+            principal = via_tabela
+
+    if not principal:
+        pdf_bytes.seek(0)
+        texto = ler_texto_pdf(pdf_bytes)
+        principal = extrair_remuneracao_franquia_por_texto(texto)
+
+    principal_ok = _tem_emissao_nota(principal)
+    escolhido = principal
+    if not escolhido:
+        escolhido = {
+            "tipo_bloco": "TAXA_REMUNERACAO",
+            "emissao": None,
+            "nota_fiscal": None,
+            "valor_taxa": None,
+            "valor_desconto": None,
+            "fat_conv": None,
+            "valor_nf": None,
+            "encargos": None,
+            "parcelas": [],
+            "error": "Extraction failed",
+        }
+
+    warnings: List[str] = []
+    if principal and principal.get("error"):
+        warnings.append(str(principal.get("error")))
+
+    blocos: List[Dict[str, Any]] = []
+    if principal:
+        blocos.append({
+            "tipo": "TAXA_REMUNERACAO",
+            "encontrado": principal_ok,
+            "dados": _dados_bloco(principal) if principal_ok else None,
+            "erro": None if principal_ok else principal.get("error"),
+        })
+
+    return {
+        "bloco_principal": "TAXA_REMUNERACAO",
+        "status_principal": "ok" if principal_ok else "sem_dados",
+        "bloco_utilizado": escolhido.get("tipo_bloco"),
+        "resumo": {
+            "emissao_nf": escolhido.get("emissao"),
+            "nota_fiscal": escolhido.get("nota_fiscal"),
+            "valor_nf": escolhido.get("valor_nf"),
+            "encargos": escolhido.get("encargos"),
+        },
+        "blocos": blocos,
+        "warnings": warnings,
+        "escolhido": escolhido,  # compatibility with the current structure
+    }
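+
+
+# Editor's note: hedged end-to-end sketch of the extraction entry points above
+# against a local PDF. The file path is a hypothetical placeholder; nothing in
+# the pipeline calls this.
+def _demo_extrair_de_arquivo(caminho: str = "nota_debito.pdf") -> Dict[str, Any]:
+    with open(caminho, "rb") as fh:
+        pdf_bytes = BytesIO(fh.read())
+    detalhado = extrair_remuneracao_franquia_detalhado(pdf_bytes)
+    print(json.dumps(detalhado.get("resumo"), ensure_ascii=False, indent=2))
+    return detalhado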
"User-Agent": "Mozilla/5.0", + } + + def get_franchises(self, only_channels: Optional[Set[str]] = None) -> List[str]: + r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30) + if r.status_code in (401, 403): + self.auth.invalidate() + r = self.s.get(STORES_URL, headers=self._headers_json(), timeout=30) + r.raise_for_status() + + body = r.json() + seen = set() + codes = [] + + for item in body.get("data", []): + if only_channels is not None and item.get("channel") not in only_channels: + continue + code = item.get("code") + if code is None: + continue + code = str(code).strip() + if code and code not in seen: + seen.add(code) + codes.append(code) + + return codes + + def get_documents_page(self, cp_id: int, document_type: str, franchises: List[str], offset: int, limit: int): + params = {"cpId": cp_id, "documentType": document_type, "offset": offset, "limit": limit} + payload = {"franchises": franchises} + + r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60) + if r.status_code in (401, 403): + self.auth.invalidate() + r = self.s.post(DOCS_URL, headers=self._headers_json(), params=params, json=payload, timeout=60) + + r.raise_for_status() + return r.json() # {"total":..., "documents":[...]} + + def get_presigned_pdf_url(self, document_type: str, franchise_id: str, image_name: str) -> str: + url = f"{HANDLE_URL}/{document_type}/{franchise_id}/{image_name}/download" + + headers = { + "Authorization": self.auth.get_bearer(), + "Accept": "application/json", + "Origin": "https://extranet.grupoboticario.com.br", + "Referer": "https://extranet.grupoboticario.com.br/", + "User-Agent": "Mozilla/5.0", + } + + r = self.s.get(url, headers=headers, timeout=60) + if r.status_code in (401, 403): + self.auth.invalidate() + headers["Authorization"] = self.auth.get_bearer() + r = self.s.get(url, headers=headers, timeout=60) + + r.raise_for_status() + + # pode vir JSON com {"url": "..."} ou texto puro + try: + body = r.json() + if isinstance(body, dict) and "url" in body and isinstance(body["url"], str): + return body["url"] + except Exception: + pass + + return r.text.strip() + + @staticmethod + def _headers_with_auth(auth: Auth) -> Dict[str, str]: + return { + "Authorization": auth.get_bearer(), + "Accept": "application/json", + "Origin": "https://extranet.grupoboticario.com.br", + "Referer": "https://extranet.grupoboticario.com.br/", + "User-Agent": "Mozilla/5.0", + } + + def _get_presigned_pdf_url_with_client( + self, + session: requests.Session, + auth: Auth, + document_type: str, + franchise_id: str, + image_name: str, + ) -> str: + url = f"{HANDLE_URL}/{document_type}/{franchise_id}/{image_name}/download" + headers = self._headers_with_auth(auth) + + r = session.get(url, headers=headers, timeout=60) + if r.status_code in (401, 403): + auth.invalidate() + headers["Authorization"] = auth.get_bearer() + r = session.get(url, headers=headers, timeout=60) + r.raise_for_status() + + try: + body = r.json() + if isinstance(body, dict) and "url" in body and isinstance(body["url"], str): + return body["url"] + except Exception: + pass + return r.text.strip() + + def _get_worker_http(self) -> Tuple[requests.Session, Auth]: + session = getattr(self._worker_local, "session", None) + auth = getattr(self._worker_local, "auth", None) + if session is None or auth is None: + session = requests.Session() + auth = Auth(session) + self._worker_local.session = session + self._worker_local.auth = auth + return session, auth + + def _processar_documento(self, d: 
+
+    def _processar_documento(self, d: Dict[str, Any], document_type: str) -> Dict[str, Any]:
+        franchise_id = str(d.get("franchiseId") or "").strip()
+        image_name = str(d.get("imageName") or "").strip()
+
+        base_item = {
+            "id": d.get("id"),
+            "UUID": d.get("UUID"),
+            "franchiseId": franchise_id,
+            "imageName": image_name,
+            "emissionDate": d.get("emissionDate"),
+        }
+
+        if not franchise_id or not image_name:
+            return {**base_item, "error": "missing franchiseId/imageName"}
+
+        session, auth = self._get_worker_http()
+        try:
+            presigned = self._get_presigned_pdf_url_with_client(
+                session, auth, document_type, franchise_id, image_name
+            )
+            pdf_bytes = baixar_pdf_bytes(presigned, session)
+            extra_detalhado = extrair_remuneracao_franquia_detalhado(pdf_bytes)
+            extra = extra_detalhado.get("escolhido", {})
+            status_principal = extra_detalhado.get("status_principal")
+            resumo = extra_detalhado.get("resumo") or {}
+            emissao_nf = resumo.get("emissao_nf")
+            nota_fiscal = resumo.get("nota_fiscal")
+            bloco_ok = bool(status_principal == "ok" and emissao_nf and nota_fiscal)
+
+            return {
+                **base_item,
+                "bloco_principal": extra_detalhado.get("bloco_principal"),
+                "status_principal": status_principal,
+                "bloco_utilizado": extra_detalhado.get("bloco_utilizado"),
+                "resumo": resumo,
+                "blocos": extra_detalhado.get("blocos"),
+                "warnings": extra_detalhado.get("warnings"),
+                "tipo_bloco": extra.get("tipo_bloco"),
+                "emissao_nf": extra.get("emissao"),
+                "nota_fiscal": extra.get("nota_fiscal"),
+                "valor_taxa": extra.get("valor_taxa"),
+                "valor_desconto": extra.get("valor_desconto"),
+                "fat_conv": extra.get("fat_conv"),
+                "valor_nf": extra.get("valor_nf"),
+                "encargos": extra.get("encargos"),
+                "parcelas": extra.get("parcelas"),
+                "error": None if bloco_ok else "skip_sem_bloco_taxa_remuneracao",
+            }
+        except Exception as e:
+            return {**base_item, "error": f"falha_processar_pdf: {e}"}
+
+    def processar_pagina(
+        self,
+        cp_id: int = 10269,
+        document_type: str = "EFAT",
+        offset: int = 0,
+        limit: int = 25,
+        only_channels: Optional[Set[str]] = None,
+        max_workers: int = 6,
+    ) -> Dict[str, Any]:
+        franchises = self.get_franchises(only_channels=only_channels)
+
+        page = self.get_documents_page(cp_id, document_type, franchises, offset, limit)
+        docs = page.get("documents", [])
+        total = int(page.get("total") or 0)
+
+        out: List[Dict[str, Any]] = []
+        if docs:
+            workers = max(1, min(int(max_workers or 1), len(docs)))
+            progress_step = max(1, min(10, len(docs) // 10 if len(docs) > 10 else 1))
+            if workers == 1:
+                for i, d in enumerate(docs, start=1):
+                    out.append(self._processar_documento(d, document_type))
+                    if i % progress_step == 0 or i == len(docs):
+                        print(f"[proc] offset={offset} processed={i}/{len(docs)}")
+            else:
+                # keep results in submission order even though futures complete out of order
+                ordered: List[Optional[Dict[str, Any]]] = [None] * len(docs)
+                with ThreadPoolExecutor(max_workers=workers) as ex:
+                    futures = {
+                        ex.submit(self._processar_documento, d, document_type): i
+                        for i, d in enumerate(docs)
+                    }
+                    done = 0
+                    for fut in as_completed(futures):
+                        i = futures[fut]
+                        try:
+                            ordered[i] = fut.result()
+                        except Exception as e:
+                            ordered[i] = {
+                                "id": docs[i].get("id"),
+                                "UUID": docs[i].get("UUID"),
+                                "franchiseId": str(docs[i].get("franchiseId") or "").strip(),
+                                "imageName": str(docs[i].get("imageName") or "").strip(),
+                                "emissionDate": docs[i].get("emissionDate"),
+                                "error": f"falha_processar_pdf: {e}",
+                            }
+                        done += 1
+                        if done % progress_step == 0 or done == len(docs):
+                            print(f"[proc] offset={offset} processed={done}/{len(docs)}")
+                out = [x for x in ordered if x is not None]
+
+        return {
+            "total": total,
+            "offset": offset,
+            "limit": limit,
+            "count": len(docs),
+            "hasNext": (offset + limit) < total,
+            "items": out,
+        }
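+
+
+# Editor's note: minimal paging sketch over Client.processar_pagina, assuming
+# the defaults used in __main__ below. Illustrative only; never called by the
+# pipeline.
+def _demo_paginacao(limit: int = 25, max_paginas: int = 2) -> None:
+    cli = Client()
+    offset = 0
+    for _ in range(max_paginas):
+        pagina = cli.processar_pagina(offset=offset, limit=limit)
+        print(f"[dbg] offset={offset} count={pagina['count']} hasNext={pagina['hasNext']}")
+        if not pagina.get("hasNext"):
+            break
+        offset += limit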
"total": total, + "offset": offset, + "limit": limit, + "count": len(docs), + "hasNext": (offset + limit) < total, + "items": out + } + + +# ----------------------------- +# SQL SERVER: persistência +# ----------------------------- +class SqlServerSink: + def __init__(self, connection_string: str): + self.connection_string = connection_string + self.cn = None + self.cur = None + self.doc_table = "dbo.TrfRemDocumento" + self.parcela_table = "dbo.TrfRemParcela" + + def __enter__(self): + try: + import pyodbc # type: ignore + except Exception as e: + raise RuntimeError("pyodbc não encontrado. Instale com: pip install pyodbc") from e + + self.cn = pyodbc.connect(self.connection_string, timeout=30) + self.cn.autocommit = False + self.cur = self.cn.cursor() + return self + + def __exit__(self, exc_type, exc, tb): + if self.cur is not None: + try: + self.cur.close() + except Exception: + pass + if self.cn is not None: + try: + if exc_type is None: + self.cn.commit() + else: + self.cn.rollback() + except Exception: + pass + try: + self.cn.close() + except Exception: + pass + + def reconnect(self) -> None: + try: + if self.cur is not None: + self.cur.close() + except Exception: + pass + try: + if self.cn is not None: + self.cn.close() + except Exception: + pass + + import pyodbc # type: ignore + self.cn = pyodbc.connect(self.connection_string, timeout=30) + self.cn.autocommit = False + self.cur = self.cn.cursor() + + @staticmethod + def _is_comm_error(e: Exception) -> bool: + msg = str(e) + return ("08S01" in msg) or ("10054" in msg) or ("communication link failure" in msg.lower()) + + def ensure_schema(self) -> None: + assert self.cur is not None + self.cur.execute( + f""" +IF OBJECT_ID('{self.doc_table}', 'U') IS NULL +BEGIN + CREATE TABLE {self.doc_table} ( + id BIGINT IDENTITY(1,1) PRIMARY KEY, + UUID VARCHAR(80) NOT NULL UNIQUE, + IdExterno BIGINT NULL, + FranchiseId VARCHAR(20) NULL, + ImageName VARCHAR(150) NULL, + EmissionDate DATE NULL, + EmissaoNF DATE NULL, + NotaFiscal VARCHAR(40) NULL, + ValorNF DECIMAL(18,2) NULL, + Encargos DECIMAL(18,2) NULL, + AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME() + ); +END; + """ + ) + self.cur.execute( + f""" +IF OBJECT_ID('{self.parcela_table}', 'U') IS NULL +BEGIN + CREATE TABLE {self.parcela_table} ( + id BIGINT IDENTITY(1,1) PRIMARY KEY, + DocumentoId BIGINT NOT NULL, + NumeroParcela INT NOT NULL, + DataVencimento DATE NOT NULL, + ValorParcela DECIMAL(18,2) NOT NULL, + CONSTRAINT FK_TrfRemParcela_Documento + FOREIGN KEY (DocumentoId) REFERENCES {self.doc_table}(id), + CONSTRAINT UQ_TrfRemParcela UNIQUE (DocumentoId, NumeroParcela) + ); +END; + """ + ) + self.cur.execute( + f""" +IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemDocumento_NotaFiscal' AND object_id=OBJECT_ID('{self.doc_table}')) + CREATE INDEX IX_TrfRemDocumento_NotaFiscal ON {self.doc_table}(NotaFiscal); +IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemDocumento_UUID' AND object_id=OBJECT_ID('{self.doc_table}')) + CREATE UNIQUE INDEX IX_TrfRemDocumento_UUID ON {self.doc_table}(UUID); +IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemDocumento_Franchise_Emission' AND object_id=OBJECT_ID('{self.doc_table}')) + CREATE INDEX IX_TrfRemDocumento_Franchise_Emission ON {self.doc_table}(FranchiseId, EmissionDate); +IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name='IX_TrfRemParcela_DocumentoId_DataVenc' AND object_id=OBJECT_ID('{self.parcela_table}')) + CREATE INDEX IX_TrfRemParcela_DocumentoId_DataVenc ON {self.parcela_table}(DocumentoId, 
+
+    def upsert_documento(self, item: Dict[str, Any]) -> Tuple[int, bool]:
+        assert self.cur is not None
+        uuid = item.get("UUID")
+        if not uuid:
+            raise ValueError("item without UUID")
+
+        resumo = item.get("resumo") or {}
+        self.cur.execute(f"SELECT id FROM {self.doc_table} WHERE UUID = ?", uuid)
+        row = self.cur.fetchone()
+
+        if row:
+            documento_id = int(row[0])
+            self.cur.execute(
+                f"""
+UPDATE {self.doc_table}
+SET IdExterno=?, FranchiseId=?, ImageName=?, EmissionDate=?,
+    EmissaoNF=?, NotaFiscal=?, ValorNF=?, Encargos=?,
+    AtualizadoEm=SYSUTCDATETIME()
+WHERE id=?
+                """,
+                item.get("id"),
+                item.get("franchiseId"),
+                item.get("imageName"),
+                _date_from_iso(item.get("emissionDate")),
+                _date_from_br(resumo.get("emissao_nf")),
+                resumo.get("nota_fiscal"),
+                resumo.get("valor_nf"),
+                resumo.get("encargos"),
+                documento_id,
+            )
+            return documento_id, False
+
+        self.cur.execute(
+            f"""
+INSERT INTO {self.doc_table} (
+    UUID, IdExterno, FranchiseId, ImageName, EmissionDate,
+    EmissaoNF, NotaFiscal, ValorNF, Encargos
+)
+VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
+            """,
+            uuid,
+            item.get("id"),
+            item.get("franchiseId"),
+            item.get("imageName"),
+            _date_from_iso(item.get("emissionDate")),
+            _date_from_br(resumo.get("emissao_nf")),
+            resumo.get("nota_fiscal"),
+            resumo.get("valor_nf"),
+            resumo.get("encargos"),
+        )
+        self.cur.execute(f"SELECT id FROM {self.doc_table} WHERE UUID = ?", uuid)
+        new_id = self.cur.fetchone()
+        if not new_id:
+            raise RuntimeError("Failed to retrieve id after inserting TrfRemDocumento")
+        return int(new_id[0]), True
+
+    def replace_parcelas(self, documento_id: int, item: Dict[str, Any]) -> None:
+        assert self.cur is not None
+        self.cur.execute(f"DELETE FROM {self.parcela_table} WHERE DocumentoId = ?", documento_id)
+
+        parcelas = _ordenar_parcelas_por_data(item.get("parcelas") or [])
+        if not parcelas:
+            return
+
+        rows = []
+        for idx, p in enumerate(parcelas, start=1):
+            dt = _date_from_br(p.get("data"))
+            val = p.get("valor")
+            if dt is None or val is None:
+                continue
+            rows.append((documento_id, idx, dt, val))
+
+        if rows:
+            self.cur.fast_executemany = True
+            self.cur.executemany(
+                f"INSERT INTO {self.parcela_table} (DocumentoId, NumeroParcela, DataVencimento, ValorParcela) VALUES (?, ?, ?, ?)",
+                rows,
+            )
+
+    def persist_items(self, items: List[Dict[str, Any]]) -> Tuple[int, int]:
+        count = 0
+        novos = 0
+        for item in items:
+            err = item.get("error")
+            err_txt = str(err) if err is not None else ""
+            if err_txt:
+                continue
+            if not item.get("UUID"):
+                continue
+            resumo = item.get("resumo") or {}
+            if item.get("status_principal") != "ok":
+                continue
+            if not resumo.get("emissao_nf") or not resumo.get("nota_fiscal"):
+                continue
+            if not _nota_fiscal_valida(resumo.get("nota_fiscal")):
+                continue
+            if (resumo.get("valor_nf") is None) and (item.get("valor_taxa") is None) and not (item.get("parcelas") or []):
+                continue
+            try:
+                doc_id, is_new = self.upsert_documento(item)
+                self.replace_parcelas(doc_id, item)
+            except Exception as e:
+                if not self._is_comm_error(e):
+                    raise
+                # reconnect and retry the current item exactly once
+                self.reconnect()
+                doc_id, is_new = self.upsert_documento(item)
+                self.replace_parcelas(doc_id, item)
+            count += 1
+            if is_new:
+                novos += 1
+        return count, novos
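+
+
+# Editor's note: hedged sketch of the persistence contract enforced by
+# persist_items above -- only items with a UUID, status_principal == "ok", a
+# valid invoice number and at least one value/installment reach SQL Server.
+# The item below is fabricated for illustration and the connection string is a
+# placeholder; never called by the pipeline.
+def _demo_persistencia(connection_string: str) -> None:
+    item = {
+        "UUID": "demo-uuid-1",
+        "id": 1,
+        "franchiseId": "123",
+        "imageName": "demo.pdf",
+        "emissionDate": "2026-02-26",
+        "status_principal": "ok",
+        "resumo": {"emissao_nf": "26.02.2026", "nota_fiscal": "123456789", "valor_nf": 1000.0, "encargos": None},
+        "parcelas": [{"data": "10.03.2026", "valor": 1000.0}],
+        "error": None,
+    }
+    with SqlServerSink(connection_string) as sink:
+        sink.ensure_schema()
+        count, novos = sink.persist_items([item])
+        print(f"[dbg] persisted={count} new={novos}")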
+
+
+def sincronizar_paginas_sqlserver(
+    connection_string: str,
+    cp_id: int = 10269,
+    document_type: str = "EFAT",
+    limit: int = 100,
+    start_offset: int = 0,
+    only_channels: Optional[Set[str]] = None,
+    commit_cada_paginas: int = 1,
+) -> Dict[str, Any]:
+    cli = Client()
+    offset = start_offset
+    paginas = 0
+    docs_persistidos = 0
+    total = None
+
+    with SqlServerSink(connection_string) as sink:
+        sink.ensure_schema()
+        while True:
+            pagina = cli.processar_pagina(
+                cp_id=cp_id,
+                document_type=document_type,
+                offset=offset,
+                limit=limit,
+                only_channels=only_channels,
+            )
+            if total is None:
+                total = int(pagina.get("total") or 0)
+
+            itens = pagina.get("items") or []
+            persistidos_pag, novos_pag = sink.persist_items(itens)
+            docs_persistidos += persistidos_pag
+            paginas += 1
+
+            if paginas % commit_cada_paginas == 0:
+                sink.cn.commit()
+
+            print(f"[sync] offset={offset} count={len(itens)} new_in_page={novos_pag} persisted={docs_persistidos} total={total}")
+            if not pagina.get("hasNext"):
+                break
+            offset += limit
+
+        sink.cn.commit()
+
+    return {
+        "total": total,
+        "paginas_processadas": paginas,
+        "documentos_persistidos": docs_persistidos,
+        "offset_final": offset,
+    }
+
+
+def sincronizar_incremental_sqlserver(
+    connection_string: str,
+    cp_id: int = 10269,
+    document_type: str = "EFAT",
+    limit: int = 100,
+    only_channels: Optional[Set[str]] = None,
+    max_paginas_sem_novidade: int = 3,
+    max_paginas: int = 20,
+) -> Dict[str, Any]:
+    cli = Client()
+    offset = 0
+    paginas = 0
+    docs_persistidos = 0
+    docs_novos = 0
+    sem_novidade = 0
+    total = None
+
+    with SqlServerSink(connection_string) as sink:
+        sink.ensure_schema()
+        while True:
+            pagina = cli.processar_pagina(
+                cp_id=cp_id,
+                document_type=document_type,
+                offset=offset,
+                limit=limit,
+                only_channels=only_channels,
+            )
+            if total is None:
+                total = int(pagina.get("total") or 0)
+
+            itens = pagina.get("items") or []
+            persistidos_pag, novos_pag = sink.persist_items(itens)
+            sink.cn.commit()
+
+            docs_persistidos += persistidos_pag
+            docs_novos += novos_pag
+            paginas += 1
+            sem_novidade = 0 if novos_pag > 0 else (sem_novidade + 1)
+
+            print(
+                f"[inc] offset={offset} count={len(itens)} new_in_page={novos_pag} "
+                f"no_news={sem_novidade}/{max_paginas_sem_novidade} total_new={docs_novos}"
+            )
+
+            if sem_novidade >= max_paginas_sem_novidade:
+                break
+            if paginas >= max_paginas:
+                break
+            if not pagina.get("hasNext"):
+                break
+            offset += limit
+
+    return {
+        "total": total,
+        "paginas_processadas": paginas,
+        "documentos_persistidos": docs_persistidos,
+        "documentos_novos": docs_novos,
+        "offset_final": offset,
+        "parada_por_sem_novidade": sem_novidade >= max_paginas_sem_novidade,
+    }
+
+
+# -----------------------------
+# RUN
+# -----------------------------
+if __name__ == "__main__":
+    # Modes:
+    #   - "full": full paginated load
+    #   - "incremental": stops after X pages without new documents
+    #   - "json": just prints one page as JSON
+    RUN_MODE = "incremental"  # "full" | "incremental" | "json"
+
+    if RUN_MODE in ("full", "incremental"):
+        SQLSERVER_CONN = (
+            "DRIVER={ODBC Driver 17 for SQL Server};"
+            "SERVER=10.77.77.10;"
+            "DATABASE=GINSENG;"
+            "UID=andrey;"
+            "PWD=88253332;"
+            "TrustServerCertificate=yes;"
+        )
+        if RUN_MODE == "full":
+            resultado = sincronizar_paginas_sqlserver(
+                connection_string=SQLSERVER_CONN,
+                cp_id=10269,
+                document_type="EFAT",
+                limit=100,
+                start_offset=0,
+                only_channels=None,
+                commit_cada_paginas=1,
+            )
+        else:
+            resultado = sincronizar_incremental_sqlserver(
+                connection_string=SQLSERVER_CONN,
+                cp_id=10269,
+                document_type="EFAT",
+                limit=100,
+                only_channels=None,
+                max_paginas_sem_novidade=3,
+                max_paginas=20,
+            )
+    else:
+        c = Client()
+        resultado = c.processar_pagina(
+            cp_id=10269,
+            document_type="EFAT",
+            offset=0,
+            limit=25,
+            only_channels=None,
+        )
+
+    print(json.dumps(resultado, ensure_ascii=False, indent=2))
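+
+
+# Editor's note (illustrative, not part of the original commit): runtime
+# dependencies used by this script -- install with:
+#   pip install requests pdfplumber pyodbc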