# --- standard library ---
import base64
import io
import json
import os
import re
import time
import unicodedata
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import unquote, urljoin

# --- third party ---
import pdfplumber
import requests
from minio import Minio

# Sagezza (SGZ) web application: base URL, login route and default HTTP
# timeout (seconds, passed as the `timeout=` argument of requests calls).
BASE_URL = "https://sistema.sgztrade.com.br"
LOGIN_PATH = "/login"
DEFAULT_TIMEOUT = 30

# Public-share WebDAV endpoint of the ownCloud server that hosts the documents.
OWNCLOUD_WEBDAV_URL = "https://owncloud.hqssolucoes.com.br/public.php/webdav/"

# Built-in (email, password) pairs used by load_sgz_credentials() when neither
# SGZ_CREDENTIALS_RAW nor SGZ_CREDENTIALS_FILE yields any credentials.
# SECURITY NOTE(review): these are plaintext passwords committed to source
# control; consider moving them to SGZ_CREDENTIALS_FILE / a secret store.
DEFAULT_SGZ_CREDENTIALS: List[Tuple[str, str]] = [
    ("20968@sgztrade.com.br", "sgz123"),
    ("20969@sgztrade.com.br", "sgz123"),
    ("20970@sgztrade.com.br", "sgz123"),
    ("20986@sgztrade.com.br", "sgz123"),
    ("20988@sgztrade.com.br", "sgz123"),
    ("20989@sgztrade.com.br", "sgz123"),
    ("20992@sgztrade.com.br", "sgz123"),
    ("20993@sgztrade.com.br", "sgz123"),
    ("20994@sgztrade.com.br", "sgz123"),
    ("20995@sgztrade.com.br", "sgz123"),
    ("20996@sgztrade.com.br", "sgz123"),
    ("20997@sgztrade.com.br", "sgz123"),
    ("20998@sgztrade.com.br", "sgz123"),
    ("20999@sgztrade.com.br", "sgz123"),
    ("21000@sgztrade.com.br", "sgz123"),
    ("21001@sgztrade.com.br", "sgz123"),
    ("21278@sgztrade.com.br", "sgz123"),
    ("21495@sgztrade.com.br", "sgz123"),
    ("21383@sgztrade.com.br", "sgz123"),
    ("21375@sgztrade.com.br", "sgz123"),
    ("20991@sgztrade.com.br", "sgz123"),
    ("22541@sgztrade.com.br", "sgz123"),
    ("23813@sgztrade.com.br", "sgz123@@"),
    ("24257@sgztrade.com.br", "sgz123@@"),
    ("24255@sgztrade.com.br", "sgz123@@"),
    ("24293@sgztrade.com.br", "sgz123@@"),
    ("24269@sgztrade.com.br", "sgz123@@"),
    ("910173@sgztrade.com.br", "sgz123"),
    ("910291@sgztrade.com.br", "sgz123"),
    ("23711@sgztrade.com.br", "sgz123@@"),
    ("23712@sgztrade.com.br", "sgz@@123"),
    ("23708@sgztrade.com.br", "sgz123@@"),
    ("23704@sgztrade.com.br", "sgz123@@"),
    ("23703@sgztrade.com.br", "sgz123@@"),
]

# Fallback ownCloud public-link password; the literal string "null" is what is
# sent for share links that have no password.
MOCK_OWNCLOUD_SHARE_PASSWORD = "null"
SQLSERVER_CONN = os.getenv( "SQLSERVER_CONN", ( "DRIVER={ODBC Driver 17 for SQL Server};" "SERVER=10.77.77.10;" "DATABASE=GINSENG;" "UID=andrey;" "PWD=88253332;" "Encrypt=no;" "TrustServerCertificate=yes;" ), ) MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "10.77.77.29:31200") MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "admin") MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "admin123") MINIO_BUCKET = os.getenv("MINIO_BUCKET", "boletosvitrine") MINIO_PREFIX = os.getenv("MINIO_PREFIX", "Boletos") MINIO_SECURE = os.getenv("MINIO_SECURE", "false").strip().lower() in ("1", "true", "yes") MAX_PARALLEL_REQUESTS = 5 MAX_PARALLEL_PDFS_PER_USER = 1 DOWNLOAD_RETRY_ATTEMPTS = 4 DOWNLOAD_RETRY_BASE_SECONDS = 0.8 SQL_RETRY_ATTEMPTS = 4 SQL_RETRY_BASE_SECONDS = 0.8 SQL_UPSERT_LOCK = Lock() def _normalize_sql_conn(conn: str) -> str: s = (conn or "").strip().rstrip(";") if not s: return s # Força compatibilidade neste ambiente. if re.search(r"(?i)\bencrypt\s*=", s): s = re.sub(r"(?i)\bencrypt\s*=\s*[^;]+", "Encrypt=no", s) else: s += ";Encrypt=no" if not re.search(r"(?i)\btrustservercertificate\s*=", s): s += ";TrustServerCertificate=yes" return s + ";" def _extract_csrf_token(html: str) -> str: """ Extrai o valor do input hidden _token da página de login. 
""" m = re.search( r']+name=["\']_token["\'][^>]+value=["\']([^"\']+)["\']', html or "", flags=re.IGNORECASE, ) if not m: raise RuntimeError("Nao foi possivel extrair o token CSRF (_token) da pagina de login.") return m.group(1) @dataclass class LoginResult: ok: bool status_code: int final_url: str @dataclass class WebDavEntry: href: str name: str is_dir: bool size: Optional[int] = None last_modified: Optional[str] = None def _strip_accents(text: str) -> str: return "".join( c for c in unicodedata.normalize("NFKD", text or "") if not unicodedata.combining(c) ) def _norm_name(text: str) -> str: return _strip_accents((text or "")).strip().lower() def _normalize_spaces(text: str) -> str: return re.sub(r"\s+", " ", (text or "")).strip() def _first_group(pattern: str, text: str, flags: int = 0) -> Optional[str]: m = re.search(pattern, text or "", flags=flags) return m.group(1).strip() if m else None def _extract_boleto_fields_from_text(text: str, source_name: str) -> Optional[Dict[str, Optional[str]]]: t = text or "" if not t: return None linha_digitavel = _first_group( r"(\d{5}\.\d{5}\s+\d{5}\.\d{6}\s+\d{5}\.\d{6}\s+\d\s+\d{14})", t, flags=re.IGNORECASE, ) if not linha_digitavel and "Recibo do Pagador" not in t: return None beneficiario_nome = _first_group( r"Benefici[aá]rio\s+(.+?)\s+CNPJ\s*:\s*[0-9./-]+", t, flags=re.IGNORECASE | re.DOTALL, ) cnpj_beneficiario = _first_group( r"Benefici[aá]rio.+?CNPJ\s*:\s*([0-9./-]{14,18})", t, flags=re.IGNORECASE | re.DOTALL, ) pagador_nome = _first_group( r"Pagador\s+(.+?)\s+CPF/CNPJ\s*:\s*[0-9./-]+", t, flags=re.IGNORECASE | re.DOTALL, ) cnpj_pagador = _first_group( r"Pagador.+?CPF/CNPJ\s*:\s*([0-9./-]{14,18})", t, flags=re.IGNORECASE | re.DOTALL, ) vencimento = _first_group( r"Vencimento\s+(\d{2}/\d{2}/\d{4})", t, flags=re.IGNORECASE, ) valor_documento = _first_group( r"Valor do Documento.*?(\d{1,3}(?:\.\d{3})*,\d{2})", t, flags=re.IGNORECASE | re.DOTALL, ) agencia_codigo_beneficiario = _first_group( r"Ag[êe]ncia\s*/\s*C[oó]digo 
Benefici[aá]rio\s+([0-9\-]+\s*/\s*[0-9\-]+)", t, flags=re.IGNORECASE, ) nosso_numero = _first_group( r"Nosso N[uú]mero\s+.*?\n.*?(\d{2}/\d{6,}-\d)", t, flags=re.IGNORECASE, ) or _first_group( r"\b(\d{2}/\d{6,}-\d)\b", t, flags=re.IGNORECASE, ) numero_documento = _first_group( r"NF\s+(\d+)", source_name, flags=re.IGNORECASE, ) or _first_group( r"N[uú]mero do Documento.*?\n.*?(\d{4,})", t, flags=re.IGNORECASE, ) return { "beneficiario": _normalize_spaces(beneficiario_nome or ""), "cnpj_beneficiario": (cnpj_beneficiario or "").strip(), "pagador": _normalize_spaces(pagador_nome or ""), "cnpj_pagador": (cnpj_pagador or "").strip(), "numero_documento": (numero_documento or "").strip(), "nosso_numero": (nosso_numero or "").strip(), "vencimento": (vencimento or "").strip(), "valor_documento": (valor_documento or "").strip(), "linha_digitavel": (linha_digitavel or "").strip(), "agencia_codigo_beneficiario": (agencia_codigo_beneficiario or "").strip(), } def extract_boleto_from_pdf_bytes( pdf_bytes: bytes, source_name: str, source_path: str, ) -> Optional[Dict[str, Optional[str]]]: text_parts: List[str] = [] with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf: for page in pdf.pages[:2]: page_text = page.extract_text(layout=True) or "" if page_text: text_parts.append(page_text) full_text = "\n".join(text_parts) data = _extract_boleto_fields_from_text(full_text, source_name) if not data: return None data["arquivo"] = source_path return data def _digits_only(v: Optional[str]) -> str: return re.sub(r"\D+", "", v or "") def _parse_money_br(v: Optional[str]) -> Optional[float]: s = (v or "").strip() if not s: return None s = s.replace(".", "").replace(",", ".") try: return float(s) except Exception: return None def _parse_date_br(v: Optional[str]) -> Optional[datetime.date]: s = (v or "").strip() if not s: return None try: return datetime.strptime(s, "%d/%m/%Y").date() except Exception: return None def _extract_44_from_name(name: str) -> Optional[str]: m = re.search(r"(? 
List[Tuple[str, str]]: creds: List[Tuple[str, str]] = [] seen = set() for line in (raw or "").splitlines(): row = (line or "").strip() if not row: continue low = row.lower() if "usuario" in low and "senha" in low: continue if row.startswith("#"): continue parts = re.split(r"\s+", row) if len(parts) < 2: continue email = parts[0].strip() password = parts[1].strip() if "@" not in email or not password: continue key = (email.lower(), password) if key in seen: continue seen.add(key) creds.append((email, password)) return creds def load_sgz_credentials() -> List[Tuple[str, str]]: """ Carrega credenciais SGZ em lote nesta ordem: 1) SGZ_CREDENTIALS_RAW (multilinha, formato: "email senha") 2) SGZ_CREDENTIALS_FILE (se informado) 3) credenciais padrao embutidas no codigo (DEFAULT_SGZ_CREDENTIALS) 4) SGZ_EMAIL/SGZ_PASSWORD """ raw_env = os.getenv("SGZ_CREDENTIALS_RAW", "") parsed_env = _parse_sgz_credentials_text(raw_env) if parsed_env: return parsed_env creds_file = os.getenv("SGZ_CREDENTIALS_FILE", "").strip() if creds_file: p = Path(creds_file) if p.exists() and p.is_file(): parsed_file = _parse_sgz_credentials_text( p.read_text(encoding="utf-8", errors="ignore") ) if parsed_file: return parsed_file if DEFAULT_SGZ_CREDENTIALS: return list(DEFAULT_SGZ_CREDENTIALS) email = os.getenv("SGZ_EMAIL", "").strip() password = os.getenv("SGZ_PASSWORD", "").strip() if not email or not password: raise RuntimeError( "Defina SGZ_CREDENTIALS_RAW, SGZ_CREDENTIALS_FILE, " "DEFAULT_SGZ_CREDENTIALS ou SGZ_EMAIL/SGZ_PASSWORD." 
) return [(email, password)] def _extract_nf_from_name(name: str) -> Optional[str]: m = re.search(r"\bNF\s*(\d+)\b", name or "", flags=re.IGNORECASE) return m.group(1) if m else None def _guess_cte_nfe_keys_from_xml_names(xml_names: List[str]) -> Dict[str, Optional[str]]: cte_key: Optional[str] = None nfe_key: Optional[str] = None for name in xml_names: key = _extract_44_from_name(name) if not key: continue model = key[20:22] if model == "57": cte_key = cte_key or key elif model == "55": nfe_key = nfe_key or key elif "_" in Path(name).stem: cte_key = cte_key or key return {"chave_cte": cte_key, "chave_nfe": nfe_key} def criar_minio_client(): client = Minio( MINIO_ENDPOINT, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=MINIO_SECURE, ) if not client.bucket_exists(MINIO_BUCKET): client.make_bucket(MINIO_BUCKET) return client, MINIO_BUCKET, MINIO_PREFIX def upload_bytes_minio(client: Minio, bucket: str, prefix: str, object_name: str, data: bytes, content_type: str) -> str: object_name = object_name.lstrip("/") if prefix: object_name = f"{prefix.rstrip('/')}/{object_name}" client.put_object( bucket, object_name, io.BytesIO(data), length=len(data), content_type=content_type, ) return object_name def _safe_file_part(name: str) -> str: return re.sub(r"[^A-Za-z0-9._-]+", "_", name or "") def _fit(v: Optional[str], max_len: int) -> Optional[str]: if v is None: return None s = str(v).strip() if not s: return None return s[:max_len] def ensure_boleto_cte_schema(cur) -> None: cur.execute( """ IF OBJECT_ID('dbo.BoletoCte', 'U') IS NULL BEGIN CREATE TABLE dbo.BoletoCte ( id BIGINT IDENTITY(1,1) PRIMARY KEY, ArquivoLocal NVARCHAR(500) NOT NULL, Beneficiario NVARCHAR(300) NULL, CnpjBeneficiario VARCHAR(14) NULL, Pagador NVARCHAR(300) NULL, CnpjPagador VARCHAR(14) NULL, NumeroDocumento VARCHAR(40) NULL, NossoNumero VARCHAR(60) NULL, Vencimento DATE NULL, ValorDocumento DECIMAL(18,2) NULL, LinhaDigitavel VARCHAR(80) NULL, AgenciaCodigoBeneficiario VARCHAR(40) NULL, 
ChaveCte CHAR(44) NULL, ChaveNfe CHAR(44) NULL, MinioBucket VARCHAR(100) NULL, MinioObjectKey VARCHAR(500) NULL, AtualizadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME() ); END; """ ) cur.execute( """ IF NOT EXISTS ( SELECT 1 FROM sys.indexes WHERE name='UQ_BoletoCte_ArquivoLocal' AND object_id=OBJECT_ID('dbo.BoletoCte') ) BEGIN CREATE UNIQUE INDEX UQ_BoletoCte_ArquivoLocal ON dbo.BoletoCte(ArquivoLocal); END; """ ) def upsert_boleto_cte(cur, row: Dict[str, Optional[str]]) -> None: beneficiario = _fit(row.get("beneficiario"), 300) cnpj_beneficiario = _fit(_digits_only(row.get("cnpj_beneficiario")) or None, 14) pagador = _fit(row.get("pagador"), 300) cnpj_pagador = _fit(_digits_only(row.get("cnpj_pagador")) or None, 14) numero_documento = _fit(row.get("numero_documento"), 40) nosso_numero = _fit(row.get("nosso_numero"), 60) linha_digitavel = _fit(row.get("linha_digitavel"), 80) ag_cod_benef = _fit(row.get("agencia_codigo_beneficiario"), 40) chave_cte = _fit(row.get("chave_cte"), 44) chave_nfe = _fit(row.get("chave_nfe"), 44) minio_bucket = _fit(row.get("minio_bucket"), 100) minio_object_key = _fit(row.get("minio_object_key"), 500) arquivo_local = _fit(row.get("arquivo"), 500) cur.execute( """ SELECT id FROM dbo.BoletoCte WHERE ArquivoLocal = ? """, arquivo_local, ) exists = cur.fetchone() if exists: cur.execute( """ UPDATE dbo.BoletoCte SET Beneficiario=?, CnpjBeneficiario=?, Pagador=?, CnpjPagador=?, NumeroDocumento=?, NossoNumero=?, Vencimento=?, ValorDocumento=?, LinhaDigitavel=?, AgenciaCodigoBeneficiario=?, ChaveCte=?, ChaveNfe=?, MinioBucket=?, MinioObjectKey=?, AtualizadoEm=SYSUTCDATETIME() WHERE ArquivoLocal=? 
""", beneficiario, cnpj_beneficiario, pagador, cnpj_pagador, numero_documento, nosso_numero, _parse_date_br(row.get("vencimento")), _parse_money_br(row.get("valor_documento")), linha_digitavel, ag_cod_benef, chave_cte, chave_nfe, minio_bucket, minio_object_key, arquivo_local, ) return cur.execute( """ INSERT INTO dbo.BoletoCte ( ArquivoLocal, Beneficiario, CnpjBeneficiario, Pagador, CnpjPagador, NumeroDocumento, NossoNumero, Vencimento, ValorDocumento, LinhaDigitavel, AgenciaCodigoBeneficiario, ChaveCte, ChaveNfe, MinioBucket, MinioObjectKey ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, arquivo_local, beneficiario, cnpj_beneficiario, pagador, cnpj_pagador, numero_documento, nosso_numero, _parse_date_br(row.get("vencimento")), _parse_money_br(row.get("valor_documento")), linha_digitavel, ag_cod_benef, chave_cte, chave_nfe, minio_bucket, minio_object_key, ) def process_boletos_to_minio_sql(boletos: List[Dict[str, Optional[str]]]) -> int: if not boletos: return 0 client, bucket, prefix = criar_minio_client() prepared_rows: List[Dict[str, Optional[str]]] = [] uploads = 0 for b in boletos: arquivo_str = (b.get("arquivo") or "").strip() arquivo = Path(arquivo_str) if arquivo_str else None pdf_bytes = b.get("_pdf_bytes") # type: ignore[assignment] if isinstance(pdf_bytes, str): pdf_bytes = pdf_bytes.encode("utf-8", errors="ignore") if not isinstance(pdf_bytes, (bytes, bytearray)): continue base_name = arquivo.name if arquivo else Path(arquivo_str or "boleto.pdf").name nf = _extract_nf_from_name(base_name) if nf and b.get("chave_cte"): obj = f"boleto/cte_{b.get('chave_cte')}_nf_{nf}_{_safe_file_part(base_name)}" elif nf and b.get("chave_nfe"): obj = f"boleto/nfe_{b.get('chave_nfe')}_nf_{nf}_{_safe_file_part(base_name)}" elif b.get("chave_nfe"): obj = f"boleto/nfe_{b.get('chave_nfe')}_{_safe_file_part(base_name)}" elif nf: obj = f"boleto/nf_{nf}_{_safe_file_part(base_name)}" else: obj = f"boleto/{_safe_file_part(base_name)}" object_key = upload_bytes_minio( 
client, bucket, prefix, obj, pdf_bytes, "application/pdf", ) uploads += 1 b["minio_bucket"] = bucket b["minio_object_key"] = object_key row = dict(b) row.pop("_pdf_bytes", None) prepared_rows.append(row) try: import pyodbc # type: ignore except Exception: print(f"pyodbc ausente. Upload MinIO OK={uploads}. Upsert SQL ignorado.") return uploads for attempt in range(1, SQL_RETRY_ATTEMPTS + 1): try: with SQL_UPSERT_LOCK: cn = pyodbc.connect(_normalize_sql_conn(SQLSERVER_CONN), timeout=30) cn.autocommit = False cur = cn.cursor() ensure_boleto_cte_schema(cur) upserts = 0 for row in prepared_rows: upsert_boleto_cte(cur, row) upserts += 1 cn.commit() cur.close() cn.close() print(f"Upload MinIO OK={uploads} | Upsert SQL OK={upserts}") return upserts except Exception as e: msg = str(e) is_deadlock = " 1205" in msg or "(1205)" in msg or "deadlock" in msg.lower() if is_deadlock and attempt < SQL_RETRY_ATTEMPTS: time.sleep(SQL_RETRY_BASE_SECONDS * (2 ** (attempt - 1))) continue print(f"SQL indisponivel ({e}). Upload MinIO OK={uploads}. Upsert SQL ignorado.") return uploads class SagezzaClient: """ Cliente HTTP para autenticação e navegação autenticada no sistema Saggezza. Fluxo: 1) GET /login para obter cookies + CSRF (_token) 2) POST /login com email, password, _token e remember 3) Reuso da mesma session para páginas internas """ def __init__( self, base_url: str = BASE_URL, timeout: int = DEFAULT_TIMEOUT, session: Optional[requests.Session] = None, ): self.base_url = base_url.rstrip("/") self.timeout = timeout self.s = session or requests.Session() # Evita proxy automático do ambiente (causa erro 10061 nesta máquina). 
self.s.trust_env = False self.s.headers.update( { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/145.0.0.0 Safari/537.36" ) } ) def _abs_url(self, path_or_url: str) -> str: if path_or_url.startswith("http://") or path_or_url.startswith("https://"): return path_or_url return urljoin(f"{self.base_url}/", path_or_url.lstrip("/")) def _get_login_page(self) -> str: login_url = self._abs_url(LOGIN_PATH) r = self.s.get(login_url, timeout=self.timeout) r.raise_for_status() return r.text def login(self, email: str, password: str, remember: bool = True) -> LoginResult: login_page = self._get_login_page() csrf_token = _extract_csrf_token(login_page) payload = { "_token": csrf_token, "email": email, "password": password, "remember": "on" if remember else "", } headers = { "Content-Type": "application/x-www-form-urlencoded", "Origin": self.base_url, "Referer": self._abs_url(LOGIN_PATH), } r = self.s.post( self._abs_url(LOGIN_PATH), data=payload, headers=headers, timeout=self.timeout, allow_redirects=True, ) # Em sucesso, normalmente sai de /login após o redirect. final_url = (r.url or "").rstrip("/") login_url = self._abs_url(LOGIN_PATH).rstrip("/") ok = (r.status_code == 200) and (final_url != login_url) return LoginResult(ok=ok, status_code=r.status_code, final_url=r.url) def is_logged(self) -> bool: """ Heurística simples: sessão autenticada costuma ter cookie de sessão + URL inicial acessível. 
""" if "jrd_session" not in self.s.cookies: return False r = self.s.get(self._abs_url("/"), timeout=self.timeout, allow_redirects=False) return r.status_code in (200, 302) def get(self, path_or_url: str, **kwargs: Any) -> requests.Response: timeout = kwargs.pop("timeout", self.timeout) r = self.s.get(self._abs_url(path_or_url), timeout=timeout, **kwargs) r.raise_for_status() return r def post(self, path_or_url: str, **kwargs: Any) -> requests.Response: timeout = kwargs.pop("timeout", self.timeout) r = self.s.post(self._abs_url(path_or_url), timeout=timeout, **kwargs) r.raise_for_status() return r class OwnCloudWebDavClient: """ Cliente WebDAV para listar pastas/arquivos no ownCloud via PROPFIND. """ def __init__( self, webdav_url: str = OWNCLOUD_WEBDAV_URL, timeout: int = DEFAULT_TIMEOUT, session: Optional[requests.Session] = None, auth_header: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, ): self.webdav_url = webdav_url.rstrip("/") + "/" self.timeout = timeout self.s = session or requests.Session() # Evita proxy automático do ambiente (causa erro 10061 nesta máquina). 
self.s.trust_env = False self.auth_header = auth_header self.username = username self.password = password self.s.headers.update( { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/145.0.0.0 Safari/537.36" ) } ) def _target_url(self, relative_path: str = "") -> str: relative_path = (relative_path or "").lstrip("/") return urljoin(self.webdav_url, relative_path) @staticmethod def _href_to_rel_path(href: str) -> str: path = unquote((href or "").strip()) marker = "/public.php/webdav/" if marker in path: path = path.split(marker, 1)[1] return path.strip("/") def _build_headers(self, depth: int) -> Dict[str, str]: headers = { "Depth": str(depth), "Content-Type": "application/xml; charset=UTF-8", } if self.auth_header: headers["Authorization"] = self.auth_header return headers def _auth_tuple(self): if self.username is not None: return (self.username, self.password or "") return None def _auth_headers(self) -> Dict[str, str]: return {"Authorization": self.auth_header} if self.auth_header else {} @staticmethod def build_basic_auth(username: str, password: str) -> str: raw = f"{username}:{password}" b64 = base64.b64encode(raw.encode("utf-8")).decode("ascii") return f"Basic {b64}" def _propfind_body(self) -> str: return ( '' '' "" "" "" "" "" "" "" ) @staticmethod def _parse_multistatus(xml_text: str) -> List[WebDavEntry]: ns = {"d": "DAV:"} root = ET.fromstring(xml_text) entries: List[WebDavEntry] = [] first = True for resp in root.findall("d:response", ns): href = (resp.findtext("d:href", default="", namespaces=ns) or "").strip() prop = resp.find("d:propstat/d:prop", ns) if prop is None: continue displayname = ( prop.findtext("d:displayname", default="", namespaces=ns) or "" ).strip() resourcetype = prop.find("d:resourcetype", ns) is_dir = resourcetype is not None and resourcetype.find("d:collection", ns) is not None size_raw = (prop.findtext("d:getcontentlength", default="", namespaces=ns) or "").strip() 
last_modified = ( prop.findtext("d:getlastmodified", default="", namespaces=ns) or "" ).strip() or None # Em PROPFIND Depth 1, o primeiro item costuma ser a propria pasta consultada. if first: first = False continue name = displayname or href.rstrip("/").split("/")[-1] size = int(size_raw) if size_raw.isdigit() else None entries.append( WebDavEntry( href=href, name=name, is_dir=is_dir, size=size, last_modified=last_modified, ) ) return entries def list_entries(self, path: str = "", depth: int = 1) -> List[WebDavEntry]: body = self._propfind_body() headers = self._build_headers(depth=depth) auth = self._auth_tuple() r = self.s.request( "PROPFIND", self._target_url(path), data=body.encode("utf-8"), headers=headers, auth=auth, timeout=self.timeout, ) if r.status_code not in (207, 200): if r.status_code == 401: raise RuntimeError( "PROPFIND 401 (nao autenticado no ownCloud). " "Defina OWNCLOUD_AUTH_HEADER='Basic ...' " "ou OWNCLOUD_USER/OWNCLOUD_PASS." ) raise RuntimeError( f"PROPFIND falhou. status={r.status_code} body={r.text[:500]}" ) return self._parse_multistatus(r.text) def list_folders(self, path: str = "", depth: int = 1) -> List[WebDavEntry]: return [x for x in self.list_entries(path=path, depth=depth) if x.is_dir] def download_file_bytes(self, remote_path: str) -> bytes: last_exc: Optional[Exception] = None for attempt in range(1, DOWNLOAD_RETRY_ATTEMPTS + 1): try: r = self.s.get( self._target_url(remote_path), timeout=self.timeout, stream=True, headers=self._auth_headers(), auth=self._auth_tuple(), ) if r.status_code == 401: # Algumas instancias exigem reafirmar auth em nova conexao. 
r = self.s.get( self._target_url(remote_path), timeout=self.timeout, stream=True, headers=self._auth_headers(), auth=self._auth_tuple(), ) r.raise_for_status() chunks: List[bytes] = [] for chunk in r.iter_content(chunk_size=1024 * 256): if chunk: chunks.append(chunk) return b"".join(chunks) except Exception as exc: last_exc = exc if attempt >= DOWNLOAD_RETRY_ATTEMPTS: break wait_s = DOWNLOAD_RETRY_BASE_SECONDS * (2 ** (attempt - 1)) time.sleep(wait_s) raise RuntimeError( f"Falha ao baixar arquivo remoto apos {DOWNLOAD_RETRY_ATTEMPTS} tentativas: {remote_path}" ) from last_exc def list_files_recursive(self, remote_root: str) -> List[str]: files: List[str] = [] for entry in self.list_entries(path=remote_root, depth=1): rel_remote = self._href_to_rel_path(entry.href) if not rel_remote: continue if entry.is_dir: files.extend(self.list_files_recursive(rel_remote)) else: files.append(rel_remote) return files def from_env() -> SagezzaClient: """ Constrói cliente e realiza login usando variáveis de ambiente: - SGZ_EMAIL - SGZ_PASSWORD - SGZ_BASE_URL (opcional) """ email = os.getenv("SGZ_EMAIL", "").strip() password = os.getenv("SGZ_PASSWORD", "").strip() base_url = os.getenv("SGZ_BASE_URL", BASE_URL).strip() or BASE_URL if not email or not password: raise RuntimeError("Defina SGZ_EMAIL e SGZ_PASSWORD no ambiente.") client = SagezzaClient(base_url=base_url) result = client.login(email=email, password=password, remember=True) if not result.ok: raise RuntimeError( f"Falha no login Sagezza. 
status={result.status_code} final_url={result.final_url}" ) return client def owncloud_from_env() -> OwnCloudWebDavClient: """ Cria cliente WebDAV lendo ambiente: - OWNCLOUD_WEBDAV_URL (opcional) - OWNCLOUD_AUTH_HEADER (opcional, ex: "Basic ...") - OWNCLOUD_USER / OWNCLOUD_PASS (opcional) - OWNCLOUD_SHARE_TOKEN / OWNCLOUD_SHARE_PASSWORD (opcional) """ webdav_url = os.getenv("OWNCLOUD_WEBDAV_URL", OWNCLOUD_WEBDAV_URL).strip() auth_header = os.getenv("OWNCLOUD_AUTH_HEADER", "").strip() or None username = os.getenv("OWNCLOUD_USER", "").strip() or None password = os.getenv("OWNCLOUD_PASS", "").strip() or None # Para links publicos: user = share_token, pass = senha do link (ou "null" sem senha) share_token = os.getenv("OWNCLOUD_SHARE_TOKEN", "").strip() share_password = ( os.getenv("OWNCLOUD_SHARE_PASSWORD", "").strip() or MOCK_OWNCLOUD_SHARE_PASSWORD ) if not auth_header and share_token: auth_header = OwnCloudWebDavClient.build_basic_auth( username=share_token, password=share_password or "null", ) return OwnCloudWebDavClient( webdav_url=webdav_url, auth_header=auth_header, username=username, password=password, ) def discover_owncloud_share_token(client: SagezzaClient) -> Optional[str]: try: html = client.get("/").text except Exception: return None m = re.search( r"https?://owncloud\.hqssolucoes\.com\.br/index\.php/s/([A-Za-z0-9]+)", html or "", flags=re.IGNORECASE, ) if m: return m.group(1) m = re.search(r"/index\.php/s/([A-Za-z0-9]+)", html or "", flags=re.IGNORECASE) if m: return m.group(1) return None def owncloud_from_share_token(share_token: str) -> OwnCloudWebDavClient: webdav_url = os.getenv("OWNCLOUD_WEBDAV_URL", OWNCLOUD_WEBDAV_URL).strip() share_password = ( os.getenv("OWNCLOUD_SHARE_PASSWORD", "").strip() or MOCK_OWNCLOUD_SHARE_PASSWORD ) auth_header = OwnCloudWebDavClient.build_basic_auth( username=share_token, password=share_password or "null", ) return OwnCloudWebDavClient(webdav_url=webdav_url, auth_header=auth_header) if __name__ == "__main__": base_url 
= os.getenv("SGZ_BASE_URL", BASE_URL).strip() or BASE_URL creds = load_sgz_credentials() print(f"Usuarios SGZ para consulta: {len(creds)}") ok_logins: List[Tuple[str, SagezzaClient]] = [] fail_logins: List[str] = [] def _try_login(email: str, password: str) -> Tuple[str, bool, str, Optional[SagezzaClient]]: client_tmp = SagezzaClient(base_url=base_url) try: result = client_tmp.login(email=email, password=password, remember=True) ok = result.ok and client_tmp.is_logged() if ok: return (email, True, f"status={result.status_code} | final_url={result.final_url}", client_tmp) return (email, False, f"status={result.status_code} | final_url={result.final_url}", None) except Exception as exc: return (email, False, str(exc), None) with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as ex: futures = { ex.submit(_try_login, email, password): (email, password) for email, password in creds } for fut in as_completed(futures): email, password = futures[fut] try: r_email, ok, info, c_ok = fut.result() if ok: print(f"[OK] {r_email} | {info}") if c_ok is not None: ok_logins.append((email, c_ok)) else: print(f"[FALHA] {r_email} | {info}") fail_logins.append(r_email) except Exception as exc: print(f"[ERRO] {email} | {exc}") fail_logins.append(email) print( f"Resumo logins SGZ: total={len(creds)} ok={len(ok_logins)} " f"falha={len(fail_logins)}" ) if fail_logins: print("Usuarios com falha:") for email in fail_logins: print(f"- {email}") if not ok_logins: raise RuntimeError("Nenhum login SGZ valido na lista informada.") should_run_owncloud = os.getenv("SGZ_RUN_OWNCLOUD", "1").strip().lower() in ( "1", "true", "yes", "sim", ) if not should_run_owncloud: print( "Fluxo ownCloud desativado. Defina SGZ_RUN_OWNCLOUD=1 para " "baixar/listar arquivos." 
) raise SystemExit(0) has_owncloud_auth = True if has_owncloud_auth: ano_atual = str(datetime.now().year) path_ano = os.getenv("OWNCLOUD_YEAR_PATH", "").strip() or ano_atual alvo = os.getenv("OWNCLOUD_TARGET_FOLDER", "BOTICARIO").strip() or "BOTICARIO" alvo_norm = _norm_name(alvo) total_boletos = 0 total_gravados = 0 def _process_user(login_item: Tuple[str, SagezzaClient]) -> Tuple[str, int, int, Optional[str]]: email_ativo, client_ativo = login_item try: token = discover_owncloud_share_token(client_ativo) if not token: return (email_ativo, 0, 0, "Token ownCloud nao encontrado no HTML do SGZ.") ow = owncloud_from_share_token(token) itens = ow.list_entries(path=path_ano, depth=1) pasta_alvo = next( (x for x in itens if x.is_dir and alvo_norm in _norm_name(unquote(x.name))), None, ) if not pasta_alvo: return (email_ativo, 0, 0, f"Pasta alvo nao encontrada dentro de '{path_ano}': {alvo}") path_alvo = unquote(pasta_alvo.href).replace("/public.php/webdav/", "").strip("/") remote_files = ow.list_files_recursive(path_alvo) xml_by_folder: Dict[str, List[str]] = {} pdf_paths: List[str] = [] for rp in remote_files: low = rp.lower() folder = str(Path(rp).parent).replace("\\", "/").strip("/") if low.endswith(".xml"): xml_by_folder.setdefault(folder, []).append(Path(rp).name) elif low.endswith(".pdf"): pdf_paths.append(rp) def _process_remote_pdf(rp: str) -> Optional[Dict[str, Optional[str]]]: pdf_bytes = ow.download_file_bytes(rp) row = extract_boleto_from_pdf_bytes( pdf_bytes=pdf_bytes, source_name=Path(rp).name, source_path=rp, ) if not row: return None folder = str(Path(rp).parent).replace("\\", "/").strip("/") keys = _guess_cte_nfe_keys_from_xml_names(xml_by_folder.get(folder, [])) row["chave_cte"] = keys.get("chave_cte") row["chave_nfe"] = keys.get("chave_nfe") row["_pdf_bytes"] = pdf_bytes # type: ignore[index] return row boletos: List[Dict[str, Optional[str]]] = [] with ThreadPoolExecutor(max_workers=MAX_PARALLEL_PDFS_PER_USER) as ex_pdf: futures_pdf = 
{ex_pdf.submit(_process_remote_pdf, rp): rp for rp in pdf_paths} for fut_pdf in as_completed(futures_pdf): rp = futures_pdf[fut_pdf] try: row = fut_pdf.result() if row: boletos.append(row) except Exception as exc: print(f"{email_ativo}: falha ao processar PDF remoto '{rp}': {exc}") gravados = process_boletos_to_minio_sql(boletos) if boletos else 0 return (email_ativo, len(boletos), gravados, None) except Exception as exc: return (email_ativo, 0, 0, str(exc)) with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as ex_users: futures_user = { ex_users.submit(_process_user, login_item): login_item[0] for login_item in ok_logins } for fut_user in as_completed(futures_user): email_ativo = futures_user[fut_user] try: email_done, n_boletos, n_gravados, err = fut_user.result() if err: print(f"Falha no processamento ownCloud para {email_done}: {err}") continue print(f"{email_done}: boletos extraidos (em memoria) = {n_boletos}") print(f"{email_done}: enviados/upsert = {n_gravados}") total_boletos += n_boletos total_gravados += n_gravados except Exception as exc: print(f"Falha no processamento ownCloud para {email_ativo}: {exc}") print( f"Resumo ownCloud (todos logins): boletos_extraidos={total_boletos} " f"enviados_upsert={total_gravados}" ) else: print( "OwnCloud nao executado: defina OWNCLOUD_AUTH_HEADER, " "OWNCLOUD_USER/OWNCLOUD_PASS, ou OWNCLOUD_SHARE_TOKEN." )