trm_rem_saggezza/recebiveis_report_importer.py
2026-06-23 16:33:53 -03:00

702 lines
25 KiB
Python

"""
Download F360 reports via the /v2/franchisee/reports API and import them into SQL Server.

Which report is requested is controlled by REPORT_TYPE (currently "F360") and DATA_TYPE
(currently "VENDAS"); rows are written to dbo.Grgb_vendas_report.

Operating modes:
- INCREMENTAL=True  -> imports only D-1 (yesterday, in BRT / UTC-3) and does nothing
  before 09:30 BRT. The watermark file is written after each processed chunk but is not
  read to choose the date range.
- INCREMENTAL=False -> imports the fixed range START_DATE..END_DATE.
The range is split into windows of at most MAX_DAYS_PER_CHUNK days (one report each).

Deduplication:
- dbo.Grgb_vendas_import_log records every imported file (fileName and period).
- A file is skipped when its fileName is already logged, or when its period overlaps
  (even partially) any logged period.

Mediators:
- USE_ALL_MEDIATORS=True  -> uses the hard-coded ALL_MEDIATOR_CODES list
- USE_ALL_MEDIATORS=False -> uses the MEDIATOR_CODES list
"""
import csv
import io
import json
import os
import re
import time
import requests
from datetime import date, timedelta
from typing import Optional
from installments_reader import Auth
# ─── CONFIGURE AQUI ───────────────────────────────────────────────────────────
USE_ALL_MEDIATORS = True # True = usa ALL_MEDIATOR_CODES abaixo; False = usa MEDIATOR_CODES
MEDIATOR_CODES = ["23708"] # usado apenas se USE_ALL_MEDIATORS = False
# Lista exata dos mediatorCodes válidos para o relatório (copiado do portal)
ALL_MEDIATOR_CODES = [
"checkAll",
"19334", "19335", "19336", "20008", "20010", "20029", "20058", "20059",
"20060", "20061", "20062", "20063", "20064", "20142", "20364", "20443",
"20444", "20621", "20622", "20623", "20772", "20773", "20774", "20775",
"20776", "20777", "20778", "20779", "20780", "20781", "20968", "20969",
"20970", "20986", "20988", "20989", "20991", "20992", "20993", "20994",
"20995", "20996", "20997", "20998", "20999", "21000", "21001", "21278",
"21279", "21375", "21383", "21495", "22448", "22541", "23703", "23704",
"23708", "23711", "23712", "23713", "23813", "24255", "24257", "24269",
"24293", "24447", "24451", "24457", "24458", "4494",
]
INCREMENTAL = True # True = modo diário incremental
START_DATE = "2025-12-01" # usado apenas se INCREMENTAL = False
END_DATE = "2026-06-22" # usado apenas se INCREMENTAL = False
INCREMENTAL_DEFAULT_START = "2026-01-01" # data inicial no primeiro run incremental
DATA_TYPE = "VENDAS"
REPORT_TYPE = "F360"
MAX_DAYS_PER_CHUNK = 15 # API limita a 15 dias por requisição com todos os mediadores
POLL_INTERVAL_S = 20
POLL_TIMEOUT_S = 600
WRITE_SQL = True
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
WATERMARK_FILE = os.path.join(_SCRIPT_DIR, "vendas_watermark.json")
IMPORT_BATCH_SIZE = 500 # linhas por lote no SQL Server
ENRICH_PAYMENT_TYPES = False # True = consulta API de parcelas para preencher paymentType
# Nome da coluna em Grgb_vendas_report que contém o installmentGroupCode.
# Verifique rodando: SELECT TOP 1 * FROM dbo.Grgb_vendas_report
INSTALLMENT_GROUP_COL = "Numero_Pedido"
ENRICH_RATE_LIMIT_S = 0.3 # pausa entre chamadas à API de parcelas
# ──────────────────────────────────────────────────────────────────────────────
SQL_CONN = (
"DRIVER={ODBC Driver 17 for SQL Server};"
"SERVER=10.77.77.10;"
"DATABASE=GINSENG;"
"UID=andrey;"
"PWD=88253332;"
"TrustServerCertificate=yes;"
)
REPORTS_URL = (
"https://bff-portal-apigw.produto-financeiro.grupoboticario.digital"
"/v2/franchisee/reports"
)
INSTALLMENTS_URL = (
"https://bff-credit-container-portal-apigw.prd.produto-financeiro.app.grupoboticario.com.br"
"/v1/franchisee/installments"
)
_DONE_STATUSES = {"generated", "generated_csv", "done", "completed", "ready", "available"}
_FAIL_STATUSES = {"failed", "error", "erro", "falha"}
# ─── WATERMARK ────────────────────────────────────────────────────────────────
def _load_watermark() -> str:
"""Retorna a última data importada (YYYY-MM-DD) ou a data padrão inicial."""
try:
if os.path.exists(WATERMARK_FILE):
with open(WATERMARK_FILE, encoding="utf-8") as f:
data = json.load(f)
val = str(data.get("last_end_date") or "").strip()
if val:
return val
except Exception:
pass
return ""
def _save_watermark(end_date: str) -> None:
tmp = WATERMARK_FILE + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump({"last_end_date": end_date}, f)
os.replace(tmp, WATERMARK_FILE)
# ─── API ──────────────────────────────────────────────────────────────────────
def _headers(auth: Auth) -> dict:
    """
    Build the HTTP headers sent to the portal APIs.

    Combines the bearer token from ``auth.get_bearer()`` with JSON content headers and the
    extranet portal's Origin/Referer and a browser User-Agent (presumably so the gateway
    treats the call like the web app's; TODO confirm whether it actually checks them).
    """
    token = auth.get_bearer()
    pairs = [
        ("Authorization", token),
        ("Accept", "*/*"),
        ("Content-Type", "application/json"),
        ("Origin", "https://extranet.grupoboticario.com.br"),
        ("Referer", "https://extranet.grupoboticario.com.br/"),
        ("User-Agent", "Mozilla/5.0"),
    ]
    return dict(pairs)
def _find_pending_in_list(
    session: requests.Session,
    auth: Auth,
    start_date: str,
    end_date: str,
    max_age_s: float = 300.0,
) -> Optional[str]:
    """
    Try to recover a report the server may have created despite a 504/timeout on POST.

    Blindly taking a listed report is unsafe. It could be another chunk's report, whose
    file the import log would then skip as a duplicate, silently losing this period.
    A report from the listing is accepted only when all of these hold:
      * it has an id and its status is not in _FAIL_STATUSES;
      * if it echoes its period (keys ``startSolicitedDate``/``endSolicitedDate``, assumed
        to mirror the POST payload; TODO confirm), the period equals start_date..end_date;
        otherwise it must still be in progress (status not in _DONE_STATUSES), since a
        report this run already consumed is finished;
      * its ``solicitedAt`` is a timezone-aware ISO-8601 timestamp within ``max_age_s``
        seconds of now (in either direction, tolerating clock skew).
    Anything unverifiable is rejected, so the caller simply retries the POST.

    Returns:
        The matching report id, or None (no match, or the listing request failed).
    """
    from datetime import datetime, timezone

    try:
        raw = session.get(
            REPORTS_URL,
            params={"type": REPORT_TYPE, "dataType": DATA_TYPE},
            headers=_headers(auth),
            timeout=30,
        ).json()
    except (requests.RequestException, ValueError) as exc:
        print(f"[warn] falha ao buscar lista após 504: {exc}")
        return None

    data = raw.get("data") if isinstance(raw, dict) else None
    reports = (data.get("reports") if isinstance(data, dict) else None) or []
    now = datetime.now(timezone.utc)
    for rep in reports:
        if not isinstance(rep, dict):
            continue
        rid = str(rep.get("id") or "")
        status = str(rep.get("status") or "").strip().lower()
        if not rid or status in _FAIL_STATUSES:
            continue
        rep_start = str(rep.get("startSolicitedDate") or "")[:10]
        rep_end = str(rep.get("endSolicitedDate") or "")[:10]
        if rep_start or rep_end:
            if (rep_start, rep_end) != (start_date, end_date):
                continue
        elif status in _DONE_STATUSES:
            continue
        stamp = str(rep.get("solicitedAt") or "").strip().replace("Z", "+00:00")
        try:
            solicited = datetime.fromisoformat(stamp)
        except ValueError:
            continue
        if solicited.tzinfo is None:
            # A naive timestamp cannot be compared safely with "now".
            continue
        if abs((now - solicited).total_seconds()) > max_age_s:
            continue
        print(f"[info] relatório encontrado na lista após 504: id={rid} status={rep.get('status')}")
        return rid
    return None
def _create_report(
    session: requests.Session,
    auth: Auth,
    mediator_codes: list[str],
    start_date: str,
    end_date: str,
) -> str:
    """
    Ask the API to generate a report for ``mediator_codes`` over start_date..end_date.

    Makes up to 5 attempts. HTTP 429/500/502/503/504 and request timeouts are retried
    after min(60, 10 * attempt) seconds. After a 504 or a timeout, the report listing is
    also checked via _find_pending_in_list, because the server may have created the
    report without answering. Any other non-OK status raises through raise_for_status().

    Returns:
        The report id.
    Raises:
        RuntimeError: the response carries no report id, or all 5 attempts failed.
    """
    payload = {
        "dataType": DATA_TYPE,
        "startSolicitedDate": start_date,
        "endSolicitedDate": end_date,
        "installmentCode": "",
        "installmentGroupCode": "",
        "mediatorCodes": mediator_codes,
        "type": REPORT_TYPE,
    }
    print(f"[POST] {DATA_TYPE} {start_date}..{end_date} mediadores={len(mediator_codes)}")
    retryable = {429, 500, 502, 503, 504}
    for attempt in range(1, 6):
        backoff = min(60, 10 * attempt)
        try:
            resp = session.post(REPORTS_URL, json=payload, headers=_headers(auth), timeout=60)
            code = resp.status_code
            if code in retryable:
                print(f"[warn] POST {code} (tentativa {attempt}/5), aguardando {backoff}s...")
                time.sleep(backoff)
                # A 504 may mean the report was created but the gateway stopped waiting.
                recovered = (
                    _find_pending_in_list(session, auth, start_date, end_date)
                    if code == 504
                    else None
                )
                if recovered:
                    return recovered
                continue
            if not resp.ok:
                snippet = resp.text[:300].encode("ascii", "replace").decode()
                print(f"[erro] POST {code}: {snippet}")
                resp.raise_for_status()
            body = resp.json()
            report_id = _extract_report_id(body)
            if not report_id:
                raise RuntimeError(f"report_id ausente na resposta: {body}")
            printable_name = _extract_file_name(body).encode("ascii", "replace").decode()
            print(f"[info] report_id={report_id} fileName={printable_name}")
            return report_id
        except requests.exceptions.Timeout:
            print(f"[warn] POST timeout (tentativa {attempt}/5), aguardando {backoff}s...")
            time.sleep(backoff)
            recovered = _find_pending_in_list(session, auth, start_date, end_date)
            if recovered:
                return recovered
    raise RuntimeError("POST /reports falhou após 5 tentativas (504/timeout persistente)")
def _extract_report_id(response: dict) -> Optional[str]:
data = response.get("data") or {}
if isinstance(data.get("report"), dict):
rid = data["report"].get("id")
if rid:
return str(rid)
for key in ("id", "reportId", "uuid", "code"):
if data.get(key):
return str(data[key])
return None
def _extract_file_name(response: dict) -> str:
data = response.get("data") or {}
rep = data.get("report") or data
return str(rep.get("filename") or rep.get("fileName") or "")
def _find_download_url(obj: dict) -> Optional[str]:
for key in ("fileLink", "url", "downloadUrl", "fileUrl", "s3Url", "link",
"presignedUrl", "signedUrl"):
if obj.get(key):
return str(obj[key])
return None
def _fetch_report_by_id(
    session: requests.Session, auth: Auth, report_id: str
) -> tuple[Optional[str], str]:
    """
    Fetch GET /reports/{report_id} and return (download_url, file_name).

    The download URL is searched in ``data.report`` (or ``data``), then ``data``, then the
    whole body. The name comes from ``fileName``/``filename`` of ``data.report`` (or
    ``data``). A non-200 response yields (None, "").
    """
    resp = session.get(f"{REPORTS_URL}/{report_id}", headers=_headers(auth), timeout=30)
    if resp.status_code == 200:
        body = resp.json()
        data = body.get("data") or {}
        report = data.get("report") or data
        url = next(
            (found for found in map(_find_download_url, (report, data, body)) if found),
            None,
        )
        name = str(report.get("fileName") or report.get("filename") or "")
        return url, name
    return None, ""
def _poll_until_ready(
    session: requests.Session,
    auth: Auth,
    report_id: str,
) -> tuple[str, str]:
    """
    Poll the report listing until ``report_id`` is ready, then resolve its download link.

    Every POLL_INTERVAL_S seconds this calls GET /reports?type=REPORT_TYPE&dataType=DATA_TYPE
    and looks for ``report_id``. Once the report's status is in _DONE_STATUSES, it calls
    GET /reports/{id} to obtain the download URL and file name.

    Transient failures (network errors, or non-JSON bodies such as gateway error pages)
    are logged and retried on the next tick instead of aborting the whole run.

    Returns:
        (download_url, file_name). file_name may be "" if the API omits it.
    Raises:
        RuntimeError: the report reached a status in _FAIL_STATUSES.
        TimeoutError: the report was not ready within POLL_TIMEOUT_S seconds.
    """
    deadline = time.monotonic() + POLL_TIMEOUT_S
    attempt = 0
    while time.monotonic() < deadline:
        attempt += 1
        try:
            raw = session.get(
                REPORTS_URL,
                params={"type": REPORT_TYPE, "dataType": DATA_TYPE},
                headers=_headers(auth),
                timeout=30,
            ).json()
        except (requests.RequestException, ValueError) as exc:
            print(f"[poll] tentativa {attempt} falhou ({exc}), aguardando {POLL_INTERVAL_S}s...")
            time.sleep(POLL_INTERVAL_S)
            continue
        data = raw.get("data") if isinstance(raw, dict) else None
        reports = (data.get("reports") if isinstance(data, dict) else None) or []
        for rep in reports:
            if not isinstance(rep, dict) or str(rep.get("id") or "") != report_id:
                continue
            status = str(rep.get("status") or "").strip().lower()
            if status in _FAIL_STATUSES:
                raise RuntimeError(f"Relatório falhou: {rep}")
            if status in _DONE_STATUSES:
                try:
                    url, file_name = _fetch_report_by_id(session, auth, report_id)
                except (requests.RequestException, ValueError) as exc:
                    print(f"[poll] falha ao obter link do relatório {report_id}: {exc}")
                    url, file_name = None, ""
                if url:
                    print(f"[poll] pronto após {attempt} tentativa(s) (status={status})")
                    return url, file_name
            print(f"[poll] tentativa {attempt} status={status}, aguardando {POLL_INTERVAL_S}s...")
            break
        else:
            print(f"[poll] tentativa {attempt} relatório não listado ainda, "
                  f"aguardando {POLL_INTERVAL_S}s...")
        time.sleep(POLL_INTERVAL_S)
    raise TimeoutError(f"Relatório não ficou pronto em {POLL_TIMEOUT_S}s")
def _download_csv(url: str) -> str:
    """
    Download the generated CSV from ``url`` (presumably a pre-signed S3 link) and decode it.

    Decoding tries UTF-8, stripping a leading BOM if present, and falls back to Latin-1.
    The previous extra "utf-8" attempt and final errors="replace" decode were unreachable:
    "utf-8-sig" already accepts BOM-less UTF-8, and Latin-1 maps every byte to a character.

    Raises:
        requests.HTTPError: on a non-2xx download response.
    """
    print("[download] baixando CSV do S3...")
    r = requests.get(url, timeout=180)
    r.raise_for_status()
    try:
        return r.content.decode("utf-8-sig")
    except UnicodeDecodeError:
        # Latin-1 decoding cannot fail, so this is the final fallback.
        return r.content.decode("latin-1")
# ─── SQL ──────────────────────────────────────────────────────────────────────
def _sql_conn_str() -> str:
    """
    Return SQL_CONN normalized for pyodbc.

    Trailing ';' characters are removed, ``Encrypt=no`` and ``TrustServerCertificate=yes``
    are appended when those keys are absent (case-insensitive check), and the result
    always ends with a single ';'.
    """
    conn = SQL_CONN.rstrip(";")
    defaults = (
        (r"(?i)\bEncrypt\b", ";Encrypt=no"),
        (r"(?i)\bTrustServerCertificate\b", ";TrustServerCertificate=yes"),
    )
    for pattern, suffix in defaults:
        if re.search(pattern, conn) is None:
            conn += suffix
    return conn + ";"
def _safe_col(name: str) -> str:
name = re.sub(r"[^a-zA-Z0-9_]", "_", name.strip())
if name and name[0].isdigit():
name = "c_" + name
return name[:128] or "col"
def _ensure_tables(cur, csv_cols: list[str]) -> list[str]:
    """
    Make sure the log and data tables exist and that the data table has one column per CSV header.

    Returns the SQL column names for ``csv_cols``, aligned by position (the caller pairs
    them by index when inserting). Names come from _safe_col. When two headers map to
    the same name (compared case-insensitively, like the LOWER(COLUMN_NAME) check below),
    or a header maps to the reserved ``Id`` / ``NomeArquivo`` columns, a numeric suffix
    (_2, _3, ...) is appended. This prevents the ALTER/INSERT from naming a column twice
    or targeting the IDENTITY column. New columns are created as NVARCHAR(500) NULL.

    This function only executes DDL on ``cur``; it does not commit.
    """
    # import log table (duplicate control)
    cur.execute("""
IF OBJECT_ID('dbo.Grgb_vendas_import_log', 'U') IS NULL
BEGIN
CREATE TABLE dbo.Grgb_vendas_import_log (
Id INT IDENTITY(1,1) PRIMARY KEY,
FileName VARCHAR(120) NOT NULL UNIQUE,
PeriodoInicio DATE NULL,
PeriodoFim DATE NULL,
TotalLinhas INT NOT NULL DEFAULT 0,
ImportadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
)
END
""")
    # data table
    cur.execute("""
IF OBJECT_ID('dbo.Grgb_vendas_report', 'U') IS NULL
BEGIN
CREATE TABLE dbo.Grgb_vendas_report (
Id INT IDENTITY(1,1) PRIMARY KEY,
NomeArquivo VARCHAR(120) NULL,
CriadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
)
END
""")
    cur.execute("""
SELECT LOWER(COLUMN_NAME) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA='dbo' AND TABLE_NAME='Grgb_vendas_report'
""")
    existing = {r[0] for r in cur.fetchall()}
    # ensure the NomeArquivo column (older tables may predate it)
    if "nomearquivo" not in existing:
        cur.execute("ALTER TABLE dbo.Grgb_vendas_report ADD [NomeArquivo] VARCHAR(120) NULL")
        existing.add("nomearquivo")

    # Map each header to a unique name; "id" and "nomearquivo" are always written by us.
    used = {"id", "nomearquivo"}
    safe_cols: list[str] = []
    for header in csv_cols:
        base = _safe_col(header)
        col = base
        n = 2
        while col.lower() in used:
            suffix = f"_{n}"
            col = base[:128 - len(suffix)] + suffix
            n += 1
        used.add(col.lower())
        safe_cols.append(col)

    for col in safe_cols:
        if col.lower() not in existing:
            cur.execute(
                f"ALTER TABLE dbo.Grgb_vendas_report ADD [{col}] NVARCHAR(500) NULL"
            )
            existing.add(col.lower())
            print(f"[schema] coluna adicionada: {col}")
    return safe_cols
def _is_already_imported(cur, file_name: str, start_date: str = "", end_date: str = "") -> bool:
cur.execute(
"SELECT 1 FROM dbo.Grgb_vendas_import_log WHERE FileName = ?", file_name
)
if cur.fetchone() is not None:
return True
# bloqueia se o período já está coberto por outro arquivo
if start_date and end_date:
cur.execute(
"SELECT 1 FROM dbo.Grgb_vendas_import_log "
"WHERE PeriodoInicio <= ? AND PeriodoFim >= ?",
end_date, start_date,
)
if cur.fetchone() is not None:
return True
return False
def _import_csv(
    rows: list[dict],
    csv_cols: list[str],
    file_name: str,
    start_date: str,
    end_date: str,
) -> int:
    """
    Load parsed CSV rows into dbo.Grgb_vendas_report and record the file in the import log.

    Steps:
      1. ensure tables/columns exist (_ensure_tables);
      2. return 0 if the file name or an overlapping period is already logged;
      3. delete rows left for this file by an earlier run that failed mid-import;
      4. insert rows in batches of IMPORT_BATCH_SIZE, each on a fresh connection with its
         own commit;
      5. insert the log entry (file name, period, inserted row count).
    Each value is stringified and truncated to 500 characters; empty strings become NULL.

    An empty ``file_name`` is replaced by "<DATA_TYPE>_<start>_<end>.csv". Otherwise it
    would be logged as "" and every later nameless file would look "already imported".
    Every connection is closed even when a step raises.

    Returns:
        Number of rows inserted (0 when skipped).
    """
    import pyodbc
    from contextlib import closing

    if not file_name:
        file_name = f"{DATA_TYPE}_{start_date}_{end_date}.csv"
        print(f"[warn] fileName vazio; usando nome derivado do periodo: {file_name}")

    conn_str = _sql_conn_str()
    total = len(rows)

    def _connect():
        """Open a connection with autocommit disabled."""
        cn = pyodbc.connect(conn_str, timeout=60)
        cn.autocommit = False
        return cn

    # --- setup connection: schema + duplicate check + cleanup of partial rows ---
    with closing(_connect()) as cn, closing(cn.cursor()) as cur:
        safe_cols = _ensure_tables(cur, csv_cols)
        cn.commit()
        if _is_already_imported(cur, file_name, start_date, end_date):
            print(f"[skip] {file_name} periodo {start_date}..{end_date} ja importado anteriormente")
            return 0
        # remove partial rows left by runs that failed midway
        cur.execute("DELETE FROM dbo.Grgb_vendas_report WHERE NomeArquivo = ?", file_name)
        cn.commit()

    # --- insert in batches, one short-lived connection per batch ---
    all_cols = ["NomeArquivo"] + safe_cols
    col_list = ", ".join(f"[{c}]" for c in all_cols)
    placeholders = ", ".join("?" for _ in all_cols)
    sql = (
        f"INSERT INTO dbo.Grgb_vendas_report ({col_list}) "
        f"VALUES ({placeholders})"
    )
    inserted = 0
    for batch_start in range(0, total, IMPORT_BATCH_SIZE):
        batch = [
            (file_name,) + tuple(
                (str(row.get(orig, "") or "")[:500] or None)
                for orig in csv_cols
            )
            for row in rows[batch_start: batch_start + IMPORT_BATCH_SIZE]
        ]
        with closing(_connect()) as cn, closing(cn.cursor()) as cur:
            cur.fast_executemany = True
            try:
                cur.executemany(sql, batch)
                cn.commit()
            except Exception:
                cn.rollback()
                raise
        inserted += len(batch)
        print(f"[sql] {inserted}/{total} linhas arquivo={file_name}")

    # --- record the file in the control log ---
    with closing(_connect()) as cn, closing(cn.cursor()) as cur:
        try:
            cur.execute(
                """
INSERT INTO dbo.Grgb_vendas_import_log
(FileName, PeriodoInicio, PeriodoFim, TotalLinhas)
VALUES (?, ?, ?, ?)
""",
                file_name, start_date, end_date, inserted,
            )
            cn.commit()
        except Exception:
            cn.rollback()
            raise
    print(f"[sql] importacao concluida: {inserted} linhas arquivo={file_name}")
    return inserted
# ─── ENRIQUECIMENTO paymentType ───────────────────────────────────────────────
def _ensure_paytype_table(cur) -> None:
cur.execute("""
IF OBJECT_ID('dbo.Grgb_installment_paytype', 'U') IS NULL
BEGIN
CREATE TABLE dbo.Grgb_installment_paytype (
InstallmentGroupCode VARCHAR(50) NOT NULL PRIMARY KEY,
PaymentType VARCHAR(80) NULL,
ConsultadoEm DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
)
END
""")
def _fetch_pending_group_codes(cur, col_name: str, file_name: str) -> list[str]:
"""Retorna installmentGroupCodes do arquivo atual que ainda não têm paymentType."""
cur.execute(f"""
SELECT DISTINCT [{col_name}]
FROM dbo.Grgb_vendas_report
WHERE NomeArquivo = ?
AND [{col_name}] IS NOT NULL
AND [{col_name}] <> ''
AND [{col_name}] NOT IN (
SELECT InstallmentGroupCode FROM dbo.Grgb_installment_paytype
)
""", file_name)
return [str(r[0]) for r in cur.fetchall()]
def _query_payment_type(
    session: requests.Session,
    auth: Auth,
    group_code: str,
) -> Optional[str]:
    """
    Look up the paymentType of one installment group via the installments API.

    Requests page 1 for ``group_code`` and returns the first installment's paymentType as
    a string ("" if that installment has none). Returns None when the response is not OK,
    when no installments are listed, or when any exception occurs. Errors are printed and
    never propagated.
    """
    try:
        resp = session.get(
            INSTALLMENTS_URL,
            params={"installmentGroupCode": group_code, "page": 1},
            headers=_headers(auth),
            timeout=30,
        )
        if not resp.ok:
            print(f"[enrich] {resp.status_code} para groupCode={group_code}")
            return None
        body = resp.json()
        found = body.get("data", {}).get("installments") or []
        if not found:
            return None
        first = found[0]
        return str(first.get("paymentType") or "")
    except Exception as exc:
        print(f"[enrich] erro ao consultar groupCode={group_code}: {exc}")
        return None
def enrich_payment_types(session: requests.Session, auth: Auth, file_name: str) -> None:
    """
    Fill dbo.Grgb_installment_paytype for the installment group codes of one imported file.

    Reads the distinct, non-empty values of INSTALLMENT_GROUP_COL from the rows of
    ``file_name`` in dbo.Grgb_vendas_report that are not yet in the paytype table. For
    each code it queries the installments API (pausing ENRICH_RATE_LIMIT_S between calls)
    and inserts the result. A None from _query_payment_type (API error or no installments)
    is stored as NULL, which also keeps that code from being queried again later.

    Only logs and returns if INSTALLMENT_GROUP_COL is not a column of
    dbo.Grgb_vendas_report. Every connection is closed even when a step raises; a failed
    insert is rolled back and re-raised.
    """
    import pyodbc
    from contextlib import closing

    conn_str = _sql_conn_str()
    col_name = INSTALLMENT_GROUP_COL

    def _connect():
        """Open a connection with autocommit disabled."""
        cn = pyodbc.connect(conn_str, timeout=60)
        cn.autocommit = False
        return cn

    # setup: create the paytype table, validate the configured column, list pending codes
    with closing(_connect()) as cn, closing(cn.cursor()) as cur:
        _ensure_paytype_table(cur)
        cn.commit()
        cur.execute("""
SELECT LOWER(COLUMN_NAME) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA='dbo' AND TABLE_NAME='Grgb_vendas_report'
""")
        existing_cols = {r[0] for r in cur.fetchall()}
        if col_name.lower() not in existing_cols:
            print(
                f"[enrich] coluna '{col_name}' nao encontrada em Grgb_vendas_report. "
                f"Ajuste INSTALLMENT_GROUP_COL. Colunas disponiveis: {sorted(existing_cols)}"
            )
            return
        pending = _fetch_pending_group_codes(cur, col_name, file_name)

    total = len(pending)
    print(f"[enrich] {total} installmentGroupCode(s) para consultar")
    if not total:
        return
    for i, group_code in enumerate(pending, 1):
        payment_type = _query_payment_type(session, auth, group_code)
        # One short-lived connection per insert, mirroring the per-batch connections used
        # by the CSV import (presumably to survive dropped connections).
        with closing(_connect()) as cn, closing(cn.cursor()) as cur:
            try:
                cur.execute(
                    "INSERT INTO dbo.Grgb_installment_paytype (InstallmentGroupCode, PaymentType) "
                    "VALUES (?, ?)",
                    group_code, payment_type,
                )
                cn.commit()
            except Exception:
                cn.rollback()
                raise
        if i % 50 == 0 or i == total:
            print(f"[enrich] {i}/{total} ultimo={group_code} paymentType={payment_type}")
        if i < total:
            # rate limit between API calls; no need to wait after the last one
            time.sleep(ENRICH_RATE_LIMIT_S)
    print(f"[enrich] concluido: {total} registros gravados em Grgb_installment_paytype")
# ─── MAIN ─────────────────────────────────────────────────────────────────────
def _resolve_date_range() -> tuple[str, str]:
"""Sempre importa D-1. Antes das 9h30 BRT, dados ainda não disponíveis."""
from datetime import datetime, timezone, timedelta as td
BRT = timezone(td(hours=-3))
now_brt = datetime.now(tz=BRT)
cutoff = now_brt.replace(hour=9, minute=30, second=0, microsecond=0)
if now_brt < cutoff:
print("[info] antes das 9h30 BRT — dados de D-1 ainda nao disponiveis.")
return "", ""
yesterday = (now_brt - td(days=1)).date().isoformat()
return yesterday, yesterday
def _build_chunks(start_date: str, end_date: str) -> list[tuple[str, str]]:
"""Divide o intervalo em janelas de MAX_DAYS_PER_CHUNK dias."""
chunks = []
cursor = date.fromisoformat(start_date)
end = date.fromisoformat(end_date)
step = timedelta(days=MAX_DAYS_PER_CHUNK - 1)
while cursor <= end:
chunk_end = min(cursor + step, end)
chunks.append((cursor.isoformat(), chunk_end.isoformat()))
cursor = chunk_end + timedelta(days=1)
return chunks
def main() -> None:
    """
    Run the import: resolve the period, then request, poll, download and load one report per chunk.

    - INCREMENTAL: the period comes from _resolve_date_range(). An empty result means
      there is nothing to import right now.
    - Otherwise the fixed START_DATE..END_DATE range is used.
    The period is split by _build_chunks(). For each chunk, a report is created, polled
    until ready, downloaded, parsed as CSV and (if WRITE_SQL) imported, optionally
    followed by paymentType enrichment. In incremental mode the watermark is advanced
    only for chunks processed with WRITE_SQL enabled, never during a dry run. The HTTP
    session is closed at the end.
    """
    if INCREMENTAL:
        start_date, end_date = _resolve_date_range()
        if not start_date:
            # _resolve_date_range returns ("", "") before D-1 data is available.
            print("[info] nada a importar neste momento.")
            return
    else:
        start_date, end_date = START_DATE, END_DATE
    print(f"[info] período a importar: {start_date} .. {end_date}")
    total_linhas = 0
    last_ok_end = ""
    with requests.Session() as session:
        # do not pick up proxy/.netrc settings from the environment
        session.trust_env = False
        auth = Auth(session)
        # resolve mediator codes
        if USE_ALL_MEDIATORS:
            mediator_codes = list(ALL_MEDIATOR_CODES)
            print(f"[info] usando todos os mediadores ({len(mediator_codes) - 1} lojas + checkAll)")
        else:
            mediator_codes = list(MEDIATOR_CODES)
        if not mediator_codes:
            print("[erro] nenhum mediator code disponível, abortando.")
            return
        chunks = _build_chunks(start_date, end_date)
        print(f"[info] {len(chunks)} janela(s) de até {MAX_DAYS_PER_CHUNK} dias")
        last_ok_end = _load_watermark() if INCREMENTAL else ""
        for i, (chunk_start, chunk_end) in enumerate(chunks, 1):
            print(f"\n[chunk {i}/{len(chunks)}] {chunk_start} .. {chunk_end}")
            report_id = _create_report(session, auth, mediator_codes, chunk_start, chunk_end)
            download_url, file_name = _poll_until_ready(session, auth, report_id)
            if not file_name:
                # An empty name would be logged as "" and make every later nameless
                # file look "already imported"; derive a stable one from the period.
                file_name = f"{DATA_TYPE}_{chunk_start}_{chunk_end}.csv"
            print(f"[info] fileName={file_name.encode('ascii','replace').decode()}")
            csv_text = _download_csv(download_url)
            reader = csv.DictReader(io.StringIO(csv_text))
            csv_cols = list(reader.fieldnames or [])
            rows = list(reader)
            print(f"[csv] {len(rows)} linhas {len(csv_cols)} colunas")
            if not rows:
                print("[info] chunk vazio.")
            elif not WRITE_SQL:
                print("[info] WRITE_SQL=False — primeiras 2 linhas:")
                for r in rows[:2]:
                    print(json.dumps(r, ensure_ascii=False))
            else:
                n = _import_csv(rows, csv_cols, file_name, chunk_start, chunk_end)
                total_linhas += n
                if ENRICH_PAYMENT_TYPES and n > 0:
                    print(f"[enrich] enriquecendo paymentType para arquivo {file_name}...")
                    enrich_payment_types(session, auth, file_name)
            if INCREMENTAL and WRITE_SQL:
                _save_watermark(chunk_end)
                last_ok_end = chunk_end
    print(f"\n[fim] total={total_linhas} linhas watermark={last_ok_end}")


if __name__ == "__main__":
    main()