G-Scripts/rgb_sale_receipts.py
daniel.rodrigues 2fa715913e att
2026-01-23 14:20:44 -03:00

1427 lines
61 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Script para acessar a API do Grupo Boticário - Sale Receipts
Busca recibos de vendas por data específica
"""
import requests
import pyodbc
from datetime import datetime
from typing import Dict, List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
# ==============================
# Configurações de paralelismo
# ==============================
MAX_WORKERS = 5 # Número de requisições paralelas
PAGE_SIZE = 50 # Itens por página (padrão da API)
MAX_RETRIES = 5 # Máximo de tentativas por página
RETRY_DELAY = 5 # Segundos entre tentativas
FINAL_RETRY_DELAY = 30 # Segundos entre rodadas de retry final
# Lock para print thread-safe
print_lock = threading.Lock()
class BoticarioAPI:
"""Cliente para acessar a API do Grupo Boticário"""
def __init__(self, bearer_token: str):
"""
Inicializa o cliente da API
Args:
bearer_token: Token de autenticação Bearer
"""
self.base_url = "https://api.grupoboticario.com.br"
self.bearer_token = bearer_token
self.headers = {
"Authorization": f"Bearer {bearer_token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
self.token_lock = threading.Lock()
def refresh_token(self):
"""Renova o token de autenticação buscando do banco de dados"""
with self.token_lock:
print("\n🔄 Renovando token de autenticação...")
new_token = get_bearer_token_from_database()
if new_token:
self.bearer_token = new_token
self.headers["Authorization"] = f"Bearer {new_token}"
print("✅ Token renovado com sucesso!")
return True
else:
print("❌ Falha ao renovar token")
return False
def update_token(self, new_token: str):
"""Atualiza o token manualmente"""
with self.token_lock:
self.bearer_token = new_token
self.headers["Authorization"] = f"Bearer {new_token}"
def get_sale_receipts_page(self, sale_date: str, start: int = 0, silent: bool = False) -> Optional[Dict]:
"""
Busca uma página de recibos de vendas por data
Args:
sale_date: Data da venda no formato YYYY-MM-DD
start: Índice inicial para paginação (padrão: 0)
silent: Se True, não imprime logs (para uso em paralelo)
Returns:
Resposta da API ou None em caso de erro
Retorna {"token_expired": True} se o token expirou
"""
endpoint = "/global/v1/franchising/gb-stores-data/sale/receipts"
url = f"{self.base_url}{endpoint}"
params = {
"receipt.saleDate": sale_date,
"start": start
}
try:
if not silent:
print(f"Fazendo requisição para: {url}")
print(f"Parâmetros: {params}")
response = requests.get(
url=url,
headers=self.headers,
params=params,
timeout=60
)
if not silent:
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
return response.json()
elif response.status_code in [401, 403]:
# Token expirado ou inválido
if not silent:
print(f"⚠️ Token expirado ou inválido (HTTP {response.status_code})")
return {"token_expired": True}
else:
if not silent:
print(f"Erro na requisição: {response.status_code}")
print(f"Resposta: {response.text}")
return None
except requests.exceptions.RequestException as e:
if not silent:
print(f"Erro na requisição: {e}")
return None
def fetch_page_with_retry(self, sale_date: str, start: int, total_pages: int, page_num: int) -> Dict:
"""
Busca uma página específica com retry e retorna resultado
Args:
sale_date: Data da venda
start: Índice inicial
total_pages: Total de páginas (para log)
page_num: Número da página atual (para log)
Returns:
Dict com 'success', 'start', 'items', 'token_expired'
"""
for attempt in range(1, MAX_RETRIES + 1):
try:
result = self.get_sale_receipts_page(sale_date, start=start, silent=True)
# Verificar se token expirou
if result and result.get('token_expired'):
with print_lock:
print(f" ⚠️ Página {page_num} (start={start}): Token expirado!")
return {"success": False, "start": start, "items": [], "token_expired": True}
if result and result.get('items') is not None:
items = result.get('items', [])
with print_lock:
print(f" ✓ Página {page_num}/{total_pages}: {len(items)} recibos (start={start})")
return {"success": True, "start": start, "items": items, "token_expired": False}
else:
with print_lock:
print(f" ✗ Página {page_num} (start={start}): Erro - Tentativa {attempt}/{MAX_RETRIES}")
except Exception as e:
with print_lock:
print(f" ✗ Página {page_num} (start={start}): {str(e)[:50]} - Tentativa {attempt}/{MAX_RETRIES}")
if attempt < MAX_RETRIES:
time.sleep(RETRY_DELAY)
return {"success": False, "start": start, "items": [], "token_expired": False}
def get_all_sale_receipts(self, sale_date: str) -> Optional[Dict]:
"""
Busca TODOS os recibos de vendas por data usando requisições PARALELAS
Com renovação automática de token quando expirar
Args:
sale_date: Data da venda no formato YYYY-MM-DD
Returns:
Dicionário com todos os recibos consolidados ou None em caso de erro
"""
print(f"🔍 Iniciando busca paralela de recibos para {sale_date}")
# Primeira requisição para descobrir o total
first_page = self.get_sale_receipts_page(sale_date, start=0)
# Se token expirou na primeira requisição, renovar e tentar novamente
if first_page and first_page.get('token_expired'):
print("⚠️ Token expirado na primeira requisição. Renovando...")
if self.refresh_token():
first_page = self.get_sale_receipts_page(sale_date, start=0)
else:
print("❌ Falha ao renovar token")
return None
if not first_page:
print("❌ Erro na primeira requisição")
return None
# Extrair informações de paginação
total = first_page.get('total', 0)
first_items = first_page.get('items', [])
page_size = len(first_items) if first_items else PAGE_SIZE
print(f"📊 Informações iniciais:")
print(f" - Total de recibos: {total}")
print(f" - Itens por página: {page_size}")
# Se já temos todos os registros, retornar
if len(first_items) >= total:
print("✅ Todos os registros obtidos na primeira requisição")
return first_page
# Calcular páginas necessárias
total_pages = (total + page_size - 1) // page_size
starts = [i * page_size for i in range(total_pages)]
print(f" - Total de páginas: {total_pages}")
print(f" - Requisições paralelas: {MAX_WORKERS}")
print(f"\n📥 Iniciando download paralelo...")
# Coletar todos os items
all_items = []
failed_pages = []
token_expired_detected = False
# Buscar páginas em paralelo
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {
executor.submit(
self.fetch_page_with_retry,
sale_date,
start,
total_pages,
(start // page_size) + 1
): start for start in starts
}
for future in as_completed(futures):
result = future.result()
if result["success"]:
all_items.extend(result["items"])
else:
failed_pages.append(result["start"])
if result.get("token_expired"):
token_expired_detected = True
# Se detectou token expirado, renovar e reprocessar páginas que falharam
if token_expired_detected and failed_pages:
print("\n🔄 Token expirado detectado. Renovando token...")
if self.refresh_token():
print(f"✅ Token renovado. Reprocessando {len(failed_pages)} páginas...")
# Retry para páginas que falharam (com token renovado se necessário)
retry_round = 0
max_retry_rounds = 10 # Limite de rodadas de retry
while failed_pages and retry_round < max_retry_rounds:
retry_round += 1
print(f"\n ⚠ Rodada de retry #{retry_round}: {len(failed_pages)} páginas falharam")
print(f" Aguardando {FINAL_RETRY_DELAY} segundos...")
time.sleep(FINAL_RETRY_DELAY)
still_failed = []
token_expired_in_retry = False
for start in failed_pages:
page_num = (start // page_size) + 1
result = self.fetch_page_with_retry(sale_date, start, total_pages, page_num)
if result["success"]:
all_items.extend(result["items"])
else:
still_failed.append(start)
if result.get("token_expired"):
token_expired_in_retry = True
# Se token expirou durante retry, renovar
if token_expired_in_retry and still_failed:
print("\n🔄 Token expirou durante retry. Renovando...")
self.refresh_token()
failed_pages = still_failed
if failed_pages:
print(f" Ainda restam {len(failed_pages)} páginas com falha. Tentando novamente...")
# Criar resposta consolidada
consolidated_response = {
"start": 0,
"count": len(all_items),
"total": total,
"items": all_items
}
print(f"\n🎉 Download paralelo concluído!")
print(f" - Total de registros obtidos: {len(all_items)}")
print(f" - Total esperado: {total}")
print(f" - Sucesso: {'' if len(all_items) == total else '⚠️'}")
if failed_pages:
print(f" - ⚠️ Páginas não recuperadas: {len(failed_pages)}")
return consolidated_response
def connect_to_database(self):
"""
Conecta ao banco de dados SQL Server
Returns:
Conexão pyodbc ou None em caso de erro
"""
# Lista de drivers para tentar em ordem de preferência
drivers = [
'ODBC Driver 18 for SQL Server',
'ODBC Driver 17 for SQL Server',
'ODBC Driver 13 for SQL Server',
'ODBC Driver 11 for SQL Server',
'SQL Server Native Client 11.0',
'SQL Server Native Client 10.0',
'SQL Server'
]
print("🔍 Verificando drivers ODBC disponíveis...")
available_drivers = pyodbc.drivers()
print(f"Drivers encontrados: {available_drivers}")
for driver in drivers:
if driver in available_drivers:
print(f"🔄 Tentando conectar com driver: {driver}")
try:
connection_string = (
f'DRIVER={{{driver}}};'
'SERVER=10.77.77.10;'
'DATABASE=GINSENG;'
'UID=supginseng;'
'PWD=Iphone2513@;'
'PORT=1433;'
'TrustServerCertificate=yes;'
'Encrypt=yes'
)
conn = pyodbc.connect(connection_string)
print(f"✅ Conexão estabelecida com sucesso usando: {driver}")
return conn
except Exception as e:
print(f"❌ Falha com {driver}: {e}")
continue
# Se nenhum driver funcionou, tentar sem TrustServerCertificate e Encrypt
print("🔄 Tentando conexão sem SSL...")
for driver in drivers:
if driver in available_drivers:
try:
connection_string = (
f'DRIVER={{{driver}}};'
'SERVER=10.77.77.10;'
'DATABASE=GINSENG;'
'UID=supginseng;'
'PWD=Iphone2513@;'
'PORT=1433'
)
conn = pyodbc.connect(connection_string)
print(f"✅ Conexão estabelecida sem SSL usando: {driver}")
return conn
except Exception as e:
print(f"❌ Falha sem SSL com {driver}: {e}")
continue
print("❌ Não foi possível conectar com nenhum driver disponível")
print("💡 Sugestão: Instale o Microsoft ODBC Driver for SQL Server")
print(" Download: https://docs.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server")
return None
def check_and_install_odbc_driver(self):
"""
Verifica se há drivers ODBC disponíveis e fornece instruções de instalação
"""
print("\n🔧 DIAGNÓSTICO DE DRIVERS ODBC")
print("="*40)
available_drivers = pyodbc.drivers()
print(f"Drivers ODBC encontrados: {len(available_drivers)}")
if available_drivers:
for i, driver in enumerate(available_drivers, 1):
print(f" {i}. {driver}")
else:
print(" ❌ Nenhum driver ODBC encontrado!")
# Verificar se há drivers SQL Server específicos
sql_server_drivers = [d for d in available_drivers if 'SQL Server' in d]
if not sql_server_drivers:
print("\n❌ Nenhum driver SQL Server encontrado!")
print("\n💡 SOLUÇÕES:")
print("1. Instalar Microsoft ODBC Driver 18 for SQL Server:")
print(" https://go.microsoft.com/fwlink/?linkid=2249006")
print("\n2. Ou executar no PowerShell como Administrador:")
print(" winget install Microsoft.ODBCDriver.18.SQLServer")
print("\n3. Ou baixar manualmente:")
print(" https://docs.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server")
return False
else:
print(f"\n✅ Encontrados {len(sql_server_drivers)} driver(s) SQL Server:")
for driver in sql_server_drivers:
print(f"{driver}")
return True
def insert_receipts_to_database(self, receipts_data: Dict, skip_duplicates: bool = True) -> bool:
"""
Insere os dados dos recibos no banco de dados SQL Server
Args:
receipts_data: Dados dos recibos retornados pela API
skip_duplicates: Se True, pula registros duplicados; se False, tenta inserir todos
Returns:
True se inserção foi bem-sucedida, False caso contrário
"""
conn = self.connect_to_database()
if not conn:
return False
try:
cursor = conn.cursor()
# Detectar a data de venda dos dados
sale_date = self.get_sale_date_from_data(receipts_data)
if not sale_date:
print("❌ Não foi possível detectar a data de venda. Abortando inserção.")
return False
# Deletar dados existentes para essa data
print(f"\n🧹 LIMPEZA DE DADOS EXISTENTES")
print("="*40)
delete_success = self.delete_existing_data_by_date(sale_date)
if not delete_success:
print("❌ Falha ao deletar dados existentes. Abortando inserção.")
return False
# Obter lista de items
items = receipts_data.get('items', [])
total_records = 0
batch_size = 1000
print(f"\n🔄 INICIANDO INSERÇÃO DE NOVOS DADOS")
print("="*40)
print(f"📅 Data: {sale_date}")
print(f"📊 Recibos para processar: {len(items)}")
print(f"📦 Processando em lotes de {batch_size} registros")
if skip_duplicates:
print("⚠️ Modo: Pular registros duplicados")
else:
print("🔄 Modo: Inserir todos os registros")
# Processar em lotes usando inserção em massa
total_batches = (len(items) + batch_size - 1) // batch_size
for batch_num in range(total_batches):
start_idx = batch_num * batch_size
end_idx = min(start_idx + batch_size, len(items))
batch_items = items[start_idx:end_idx]
print(f"\n📦 Processando lote {batch_num + 1}/{total_batches} ({len(batch_items)} recibos)")
try:
# Preparar dados do lote para inserção em massa
batch_values = self._prepare_batch_data(batch_items)
if batch_values:
# Preparar SQL para inserção em massa
fields = [
# Campos do cupom (29 campos)
'cupomId', 'cupomReceiptSequence', 'cupomCashRegisterNumber', 'cupomStoreId', 'cupomCoo',
'cupomEmployeeId', 'cupomEmployeeName', 'cupomValue', 'cupomAdditionalValue', 'cupomDiscountValue',
'cupomItemsQuantity', 'cupomUnitsQuantity', 'cupomCancelledItemsQuantity', 'cupomCancelledItemsValue',
'cupomSaleType', 'cupomCancelledUnitsQuantity', 'cupomSaleDate', 'cupomInvoiceXMLStatus',
'cupomReceiptOpeningDateTime', 'cupomReceiptClosingDateTime', 'cupomEletronicKey', 'cupomSaleOrderId',
'cupomExternalId', 'cupomDiscountReason', 'cupomLoyaltyDiscountValue', 'cupomCancellingReason',
'cupomCancelledReceiptSequence', 'cupomChannel', 'cupomChannelDescription',
# Campos do item (27 campos)
'itemId', 'itemCancelled', 'itemProductId', 'itemSellerId', 'itemSellerName', 'itemQuantity',
'itemUnitValue', 'itemGrossValue', 'itemAdditionalValue', 'itemDiscountValue', 'itemTotalValue',
'itemTabelaA', 'itemNcm', 'itemNcmExcecao', 'itemNatureza', 'itemCfop', 'itemCsosn',
'itemCstICMS', 'itemAliquotaICMS', 'itemValorReducaoAliquotaICMS', 'itemValorICMSDesonerado',
'itemValorFecop', 'itemAliquotaFecop', 'itemCstPIS', 'itemAliquotaPIS', 'itemCstCOFINS', 'itemAliquotaCOFINS',
# Campos do payment (16 campos)
'paymentId', 'paymentMethodId', 'paymentMethodDescription', 'paymentValue', 'paymentChange',
'paymentInstallmentQuantity', 'paymentCheckIssuer', 'paymentCardAuthorization', 'paymentCardFlag',
'paymentCardFlagDescription', 'paymentCardModality', 'paymentRedeAdquirente', 'paymentNsu',
'paymentAuthorizationNsu', 'paymentNsuCancelling', 'paymentCardBinNumber',
# Campos do tef (12 campos)
'teftransId', 'teftransSequential', 'teftransPaymentMethodDescription', 'teftransValue',
'teftransCardModality', 'teftransCancellingModality', 'teftransCardType', 'teftranSitefNsu',
'teftransAuthorizerHostNsu', 'teftransAuthorizationCode', 'teftransInstallmentQuantity', 'teftransFirstIntallmentDate'
]
fields_str = ', '.join([f'[{field}]' for field in fields])
placeholders = ', '.join(['?'] * len(fields))
sql = f"""
INSERT INTO [GINSENG].[dbo].[rgb_sale_receipts] (
{fields_str}
) VALUES (
{placeholders}
)
"""
# Inserção em massa usando executemany
print(f" 🚀 Inserindo {len(batch_values)} registros em massa...")
cursor.executemany(sql, batch_values)
batch_inserted = len(batch_values)
total_records += batch_inserted
print(f" ✅ Lote {batch_num + 1} concluído: {batch_inserted} registros inseridos")
else:
print(f" ⚠️ Lote {batch_num + 1} vazio - nenhum registro para inserir")
except Exception as e:
print(f" ❌ Erro no lote {batch_num + 1}: {e}")
if not skip_duplicates:
# Se não está pulando duplicatas, continuar com próximo lote
continue
else:
raise e
# Commit do lote
conn.commit()
print(f"\n🎉 INSERÇÃO EM LOTES COMPLETA!")
print(f" 📊 Total de registros inseridos: {total_records}")
print(f" 📈 Total de recibos processados: {len(items)}")
print(f" <20> Total de lotes processados: {total_batches}")
print(f" ⚡ Inserção otimizada em lotes de {batch_size} registros")
return True
except Exception as e:
print(f"❌ Erro durante inserção no banco: {e}")
conn.rollback()
return False
finally:
conn.close()
print("🔌 Conexão com banco de dados fechada")
def delete_existing_data_by_date(self, sale_date: str) -> bool:
"""
Deleta todos os registros existentes para uma data específica
Args:
sale_date: Data da venda no formato YYYY-MM-DD
Returns:
True se deleção foi bem-sucedida, False caso contrário
"""
conn = self.connect_to_database()
if not conn:
return False
try:
cursor = conn.cursor()
# Primeiro, verificar quantos registros existem para essa data
count_sql = """
SELECT COUNT(*) FROM [GINSENG].[dbo].[rgb_sale_receipts]
WHERE [cupomSaleDate] = ?
"""
cursor.execute(count_sql, (sale_date,))
existing_count = cursor.fetchone()[0]
if existing_count > 0:
print(f"🗑️ Encontrados {existing_count} registros existentes para a data {sale_date}")
print(f"🔄 Deletando registros antigos...")
# Deletar registros existentes
delete_sql = """
DELETE FROM [GINSENG].[dbo].[rgb_sale_receipts]
WHERE [cupomSaleDate] = ?
"""
cursor.execute(delete_sql, (sale_date,))
deleted_count = cursor.rowcount
# Commit da deleção
conn.commit()
print(f"{deleted_count} registros deletados com sucesso!")
return True
else:
print(f" Nenhum registro existente encontrado para a data {sale_date}")
return True
except Exception as e:
print(f"❌ Erro ao deletar dados existentes: {e}")
conn.rollback()
return False
finally:
conn.close()
def get_sale_date_from_data(self, receipts_data: Dict) -> str:
"""
Extrai e valida a data de venda dos dados dos recibos
Args:
receipts_data: Dados dos recibos retornados pela API
Returns:
Data da venda no formato YYYY-MM-DD ou None se não encontrada
"""
items = receipts_data.get('items', [])
if not items:
print("⚠️ Nenhum recibo encontrado nos dados")
return None
# Pegar a data do primeiro recibo
first_receipt = items[0]
sale_date = first_receipt.get('saleDate')
if not sale_date:
print("⚠️ Não foi possível detectar a data de venda dos dados")
return None
# Validar se todos os recibos são da mesma data
different_dates = []
for i, receipt in enumerate(items):
receipt_date = receipt.get('saleDate')
if receipt_date != sale_date:
different_dates.append(f"Recibo {i+1}: {receipt_date}")
if different_dates:
print(f"⚠️ ATENÇÃO: Encontradas datas diferentes nos recibos!")
print(f" Data principal: {sale_date}")
for diff_date in different_dates[:5]: # Mostrar apenas os primeiros 5
print(f" {diff_date}")
if len(different_dates) > 5:
print(f" ... e mais {len(different_dates) - 5} recibos com datas diferentes")
print(f"🔄 Usando data principal: {sale_date}")
print(f"📅 Data de venda detectada: {sale_date}")
return sale_date
def _prepare_batch_data(self, receipts_batch: List[Dict]) -> List[List]:
"""
Prepara dados de um lote de recibos para inserção em massa
Args:
receipts_batch: Lista de recibos para processar
Returns:
Lista de listas com valores para inserção
"""
batch_values = []
for receipt in receipts_batch:
# Processar cada recibo e extrair todos os registros (linhas)
receipt_records = self._extract_receipt_records(receipt)
batch_values.extend(receipt_records)
return batch_values
def _extract_receipt_records(self, receipt: Dict) -> List[List]:
"""
Extrai todos os registros (linhas) de um recibo
Cada item vira uma linha separada
"""
records = []
# Dados do cupom (serão repetidos para cada item)
cupom_data = self._extract_cupom_data(receipt)
# Dados consolidados de payments e tef
consolidated_payment_data = self._consolidate_payments(receipt.get('payments', []))
consolidated_tef_data = self._consolidate_tef_transactions(receipt.get('tefTransactionItems', []))
# Processar items
items = receipt.get('items', [])
if not items:
# Se não há items, criar uma linha apenas com dados do cupom
record_values = self._build_record_values(cupom_data, None, consolidated_payment_data, consolidated_tef_data)
records.append(record_values)
else:
# Para cada item, criar uma linha
for item in items:
item_data = self._extract_item_data(item)
record_values = self._build_record_values(cupom_data, item_data, consolidated_payment_data, consolidated_tef_data)
records.append(record_values)
return records
def _extract_cupom_data(self, receipt: Dict) -> Dict:
"""Extrai dados do cupom"""
return {
'cupomId': receipt.get('id'),
'cupomReceiptSequence': receipt.get('receiptSequence'),
'cupomCashRegisterNumber': receipt.get('cashRegisterNumber'),
'cupomStoreId': receipt.get('storeId'),
'cupomCoo': receipt.get('coo'),
'cupomEmployeeId': receipt.get('employeeId'),
'cupomEmployeeName': receipt.get('employeeName'),
'cupomValue': receipt.get('value'),
'cupomAdditionalValue': receipt.get('additionalValue'),
'cupomDiscountValue': receipt.get('discountValue'),
'cupomItemsQuantity': receipt.get('itemsQuantity'),
'cupomUnitsQuantity': receipt.get('unitsQuantity'),
'cupomCancelledItemsQuantity': receipt.get('cancelledItemsQuantity'),
'cupomCancelledItemsValue': receipt.get('cancelledItemsValue'),
'cupomSaleType': receipt.get('saleType'),
'cupomCancelledUnitsQuantity': receipt.get('cancelledUnitsQuantity'),
'cupomSaleDate': receipt.get('saleDate'),
'cupomInvoiceXMLStatus': receipt.get('invoiceXMLStatus'),
'cupomReceiptOpeningDateTime': receipt.get('receiptOpeningDateTime'),
'cupomReceiptClosingDateTime': receipt.get('receiptClosingDateTime'),
'cupomEletronicKey': receipt.get('eletronicKey'),
'cupomSaleOrderId': receipt.get('saleOrderId'),
'cupomExternalId': receipt.get('externalId'),
'cupomDiscountReason': receipt.get('discountReason'),
'cupomLoyaltyDiscountValue': receipt.get('loyaltyDiscountValue'),
'cupomCancellingReason': receipt.get('cancellingReason'),
'cupomCancelledReceiptSequence': receipt.get('cancelledReceiptSequence'),
'cupomChannel': receipt.get('channel'),
'cupomChannelDescription': receipt.get('channelDescription')
}
def _extract_item_data(self, item: Dict) -> Dict:
"""Extrai dados do item"""
return {
'itemId': item.get('id'),
'itemCancelled': item.get('cancelled'),
'itemProductId': item.get('productId'),
'itemSellerId': item.get('sellerId'),
'itemSellerName': item.get('sellerName'),
'itemQuantity': item.get('quantity'),
'itemUnitValue': item.get('unitValue'),
'itemGrossValue': item.get('grossValue'),
'itemAdditionalValue': item.get('additionalValue'),
'itemDiscountValue': item.get('discountValue'),
'itemTotalValue': item.get('totalValue'),
'itemTabelaA': item.get('tabelaA'),
'itemNcm': item.get('ncm'),
'itemNcmExcecao': item.get('ncmExcecao'),
'itemNatureza': item.get('natureza'),
'itemCfop': item.get('cfop'),
'itemCsosn': item.get('csosn'),
'itemCstICMS': item.get('cstICMS'),
'itemAliquotaICMS': item.get('aliquotaICMS'),
'itemValorReducaoAliquotaICMS': item.get('valorReducaoAliquotaICMS'),
'itemValorICMSDesonerado': item.get('valorICMSDesonerado'),
'itemValorFecop': item.get('valorFecop'),
'itemAliquotaFecop': item.get('aliquotaFecop'),
'itemCstPIS': item.get('cstPIS'),
'itemAliquotaPIS': item.get('aliquotaPIS'),
'itemCstCOFINS': item.get('cstCOFINS'),
'itemAliquotaCOFINS': item.get('aliquotaCOFINS')
}
def _build_record_values(self, cupom_data: Dict, item_data: Dict, payment_data: Dict, tef_data: Dict) -> List:
"""
Constrói uma lista de valores na ordem correta para inserção
"""
values = []
# Campos do cupom (29 valores)
cupom_fields = [
'cupomId', 'cupomReceiptSequence', 'cupomCashRegisterNumber', 'cupomStoreId', 'cupomCoo',
'cupomEmployeeId', 'cupomEmployeeName', 'cupomValue', 'cupomAdditionalValue', 'cupomDiscountValue',
'cupomItemsQuantity', 'cupomUnitsQuantity', 'cupomCancelledItemsQuantity', 'cupomCancelledItemsValue',
'cupomSaleType', 'cupomCancelledUnitsQuantity', 'cupomSaleDate', 'cupomInvoiceXMLStatus',
'cupomReceiptOpeningDateTime', 'cupomReceiptClosingDateTime', 'cupomEletronicKey', 'cupomSaleOrderId',
'cupomExternalId', 'cupomDiscountReason', 'cupomLoyaltyDiscountValue', 'cupomCancellingReason',
'cupomCancelledReceiptSequence', 'cupomChannel', 'cupomChannelDescription'
]
for field in cupom_fields:
values.append(cupom_data.get(field) if cupom_data else None)
# Campos do item (27 valores)
item_fields = [
'itemId', 'itemCancelled', 'itemProductId', 'itemSellerId', 'itemSellerName', 'itemQuantity',
'itemUnitValue', 'itemGrossValue', 'itemAdditionalValue', 'itemDiscountValue', 'itemTotalValue',
'itemTabelaA', 'itemNcm', 'itemNcmExcecao', 'itemNatureza', 'itemCfop', 'itemCsosn',
'itemCstICMS', 'itemAliquotaICMS', 'itemValorReducaoAliquotaICMS', 'itemValorICMSDesonerado',
'itemValorFecop', 'itemAliquotaFecop', 'itemCstPIS', 'itemAliquotaPIS', 'itemCstCOFINS', 'itemAliquotaCOFINS'
]
for field in item_fields:
values.append(item_data.get(field) if item_data else None)
# Campos do pagamento (16 valores)
payment_fields = [
'paymentId', 'paymentMethodId', 'paymentMethodDescription', 'paymentValue', 'paymentChange',
'paymentInstallmentQuantity', 'paymentCheckIssuer', 'paymentCardAuthorization', 'paymentCardFlag',
'paymentCardFlagDescription', 'paymentCardModality', 'paymentRedeAdquirente', 'paymentNsu',
'paymentAuthorizationNsu', 'paymentNsuCancelling', 'paymentCardBinNumber'
]
for field in payment_fields:
values.append(payment_data.get(field) if payment_data else None)
# Campos do TEF (12 valores)
tef_fields = [
'teftransId', 'teftransSequential', 'teftransPaymentMethodDescription', 'teftransValue',
'teftransCardModality', 'teftransCancellingModality', 'teftransCardType', 'teftranSitefNsu',
'teftransAuthorizerHostNsu', 'teftransAuthorizationCode', 'teftransInstallmentQuantity', 'teftransFirstIntallmentDate'
]
for field in tef_fields:
values.append(tef_data.get(field) if tef_data else None)
return values
def _insert_receipt_data(self, cursor, receipt: Dict, skip_duplicates: bool = True) -> Dict:
"""
Insere dados de um recibo específico - UMA LINHA POR ITEM
Cada item gera uma linha separada com os dados do cupom repetidos
Args:
cursor: Cursor do banco de dados
receipt: Dados de um recibo específico
skip_duplicates: Se True, pula registros duplicados
Returns:
Dict com estatísticas: {'inserted': int, 'duplicates': int}
"""
records_inserted = 0
duplicates_skipped = 0
# Dados do cupom (receipt) - serão repetidos para cada item
cupom_data = {
'cupomId': receipt.get('id'),
'cupomReceiptSequence': receipt.get('receiptSequence'),
'cupomCashRegisterNumber': receipt.get('cashRegisterNumber'),
'cupomStoreId': receipt.get('storeId'),
'cupomCoo': receipt.get('coo'),
'cupomEmployeeId': receipt.get('employeeId'),
'cupomEmployeeName': receipt.get('employeeName'),
'cupomValue': receipt.get('value'),
'cupomAdditionalValue': receipt.get('additionalValue'),
'cupomDiscountValue': receipt.get('discountValue'),
'cupomItemsQuantity': receipt.get('itemsQuantity'),
'cupomUnitsQuantity': receipt.get('unitsQuantity'),
'cupomCancelledItemsQuantity': receipt.get('cancelledItemsQuantity'),
'cupomCancelledItemsValue': receipt.get('cancelledItemsValue'),
'cupomSaleType': receipt.get('saleType'),
'cupomCancelledUnitsQuantity': receipt.get('cancelledUnitsQuantity'),
'cupomSaleDate': receipt.get('saleDate'),
'cupomInvoiceXMLStatus': receipt.get('invoiceXMLStatus'),
'cupomReceiptOpeningDateTime': receipt.get('receiptOpeningDateTime'),
'cupomReceiptClosingDateTime': receipt.get('receiptClosingDateTime'),
'cupomEletronicKey': receipt.get('eletronicKey'),
'cupomSaleOrderId': receipt.get('saleOrderId'),
'cupomExternalId': receipt.get('externalId'),
'cupomDiscountReason': receipt.get('discountReason'),
'cupomLoyaltyDiscountValue': receipt.get('loyaltyDiscountValue'),
'cupomCancellingReason': receipt.get('cancellingReason'),
'cupomCancelledReceiptSequence': receipt.get('cancelledReceiptSequence'),
'cupomChannel': receipt.get('channel'),
'cupomChannelDescription': receipt.get('channelDescription')
}
# Preparar dados de payments e tef (serão repetidos para cada item)
payments = receipt.get('payments', [])
tef_transactions = receipt.get('tefTransactionItems', [])
# Preparar dados consolidados de payments (todos os payments em uma estrutura)
consolidated_payment_data = self._consolidate_payments(payments)
consolidated_tef_data = self._consolidate_tef_transactions(tef_transactions)
# Processar items - CADA ITEM = UMA LINHA
items = receipt.get('items', [])
if not items:
# Se não há items, inserir apenas dados do cupom (cupom vazio)
result = self._insert_single_record(
cursor, cupom_data, None, consolidated_payment_data, consolidated_tef_data, skip_duplicates
)
if result == 1:
records_inserted += 1
elif result == -1:
duplicates_skipped += 1
else:
# Para cada item, criar UMA LINHA com cupom + item + todos os payments + todos os tef
for item in items:
item_data = {
'itemId': item.get('id'),
'itemCancelled': item.get('cancelled'),
'itemProductId': item.get('productId'),
'itemSellerId': item.get('sellerId'),
'itemSellerName': item.get('sellerName'),
'itemQuantity': item.get('quantity'),
'itemUnitValue': item.get('unitValue'),
'itemGrossValue': item.get('grossValue'),
'itemAdditionalValue': item.get('additionalValue'),
'itemDiscountValue': item.get('discountValue'),
'itemTotalValue': item.get('totalValue'),
'itemTabelaA': item.get('tabelaA'),
'itemNcm': item.get('ncm'),
'itemNcmExcecao': item.get('ncmExcecao'),
'itemNatureza': item.get('natureza'),
'itemCfop': item.get('cfop'),
'itemCsosn': item.get('csosn'),
'itemCstICMS': item.get('cstICMS'),
'itemAliquotaICMS': item.get('aliquotaICMS'),
'itemValorReducaoAliquotaICMS': item.get('valorReducaoAliquotaICMS'),
'itemValorICMSDesonerado': item.get('valorICMSDesonerado'),
'itemValorFecop': item.get('valorFecop'),
'itemAliquotaFecop': item.get('aliquotaFecop'),
'itemCstPIS': item.get('cstPIS'),
'itemAliquotaPIS': item.get('aliquotaPIS'),
'itemCstCOFINS': item.get('cstCOFINS'),
'itemAliquotaCOFINS': item.get('aliquotaCOFINS')
}
# Inserir UMA linha por item (com dados do cupom, payments e tef repetidos)
result = self._insert_single_record(
cursor, cupom_data, item_data, consolidated_payment_data, consolidated_tef_data, skip_duplicates
)
if result == 1:
records_inserted += 1
elif result == -1:
duplicates_skipped += 1
return {
'inserted': records_inserted,
'duplicates': duplicates_skipped
}
def _consolidate_payments(self, payments: List[Dict]) -> Dict:
"""
Consolida múltiplos payments em uma estrutura única
Se houver múltiplos payments, pega o primeiro ou consolida conforme necessário
"""
if not payments:
return None
# Por enquanto, vamos pegar apenas o primeiro payment
# Você pode ajustar esta lógica conforme necessário
first_payment = payments[0]
return {
'paymentId': first_payment.get('id'),
'paymentMethodId': first_payment.get('paymentMethodId'),
'paymentMethodDescription': first_payment.get('paymentMethodDescription'),
'paymentValue': first_payment.get('value'),
'paymentChange': first_payment.get('change'),
'paymentInstallmentQuantity': first_payment.get('installmentQuantity'),
'paymentCheckIssuer': first_payment.get('checkIssuer'),
'paymentCardAuthorization': first_payment.get('cardAuthorization'),
'paymentCardFlag': first_payment.get('cardFlag'),
'paymentCardFlagDescription': first_payment.get('cardFlagDescription'),
'paymentCardModality': first_payment.get('cardModality'),
'paymentRedeAdquirente': first_payment.get('redeAdquirente'),
'paymentNsu': first_payment.get('nsu'),
'paymentAuthorizationNsu': first_payment.get('authorizationNsu'),
'paymentNsuCancelling': first_payment.get('nsuCancelling'),
'paymentCardBinNumber': first_payment.get('cardBinNumber')
}
def _consolidate_tef_transactions(self, tef_transactions: List[Dict]) -> Dict:
"""
Consolida múltiplas transações TEF em uma estrutura única
Se houver múltiplas transações, pega a primeira ou consolida conforme necessário
"""
if not tef_transactions:
return None
# Por enquanto, vamos pegar apenas a primeira transação TEF
# Você pode ajustar esta lógica conforme necessário
first_tef = tef_transactions[0]
return {
'teftransId': first_tef.get('id'),
'teftransSequential': first_tef.get('sequential'),
'teftransPaymentMethodDescription': first_tef.get('paymentMethodDescription'),
'teftransValue': first_tef.get('value'),
'teftransCardModality': first_tef.get('cardModality'),
'teftransCancellingModality': first_tef.get('cancellingModality'),
'teftransCardType': first_tef.get('cardType'),
'teftranSitefNsu': first_tef.get('sitefNsu'),
'teftransAuthorizerHostNsu': first_tef.get('authorizerHostNsu'),
'teftransAuthorizationCode': first_tef.get('authorizationCode'),
'teftransInstallmentQuantity': first_tef.get('installmentQuantity'),
'teftransFirstIntallmentDate': first_tef.get('firstIntallmentDate')
}
def _insert_single_record(self, cursor, cupom_data: Dict, item_data: Dict = None,
payment_data: Dict = None, tef_data: Dict = None, skip_duplicates: bool = True) -> int:
"""
Insere um único registro na tabela rgb_sale_receipts
Verifica se já existe antes de inserir para evitar duplicatas
Args:
cursor: Cursor do banco de dados
cupom_data: Dados do cupom
item_data: Dados do item (opcional)
payment_data: Dados do pagamento (opcional)
tef_data: Dados da transação TEF (opcional)
skip_duplicates: Se True, pula duplicatas; se False, tenta inserir
Returns:
1 se inserção foi bem-sucedida
-1 se registro já existe e foi pulado
0 se houve erro na inserção
"""
try:
# Verificar se o registro já existe (baseado na restrição UNIQUE) apenas se skip_duplicates=True
if skip_duplicates:
cupom_id = cupom_data.get('cupomId') if cupom_data else None
item_id = item_data.get('itemId') if item_data else None
if cupom_id and item_id:
# Verificar se já existe a combinação cupomId + itemId
check_sql = """
SELECT COUNT(*) FROM [GINSENG].[dbo].[rgb_sale_receipts]
WHERE [cupomId] = ? AND [itemId] = ?
"""
cursor.execute(check_sql, (cupom_id, item_id))
exists = cursor.fetchone()[0] > 0
if exists:
print(f"⚠️ Registro já existe: cupomId={cupom_id}, itemId={item_id} - Pulando...")
return -1 # Código para "duplicata pulada"
elif cupom_id:
# Se não há itemId, verificar apenas por cupomId (para cupons sem items)
check_sql = """
SELECT COUNT(*) FROM [GINSENG].[dbo].[rgb_sale_receipts]
WHERE [cupomId] = ? AND [itemId] IS NULL
"""
cursor.execute(check_sql, (cupom_id,))
exists = cursor.fetchone()[0] > 0
if exists:
print(f"⚠️ Cupom sem item já existe: cupomId={cupom_id} - Pulando...")
return -1 # Código para "duplicata pulada"
# Definir campos da tabela (baseado na estrutura fornecida)
fields = [
# Campos do cupom (29 campos)
'cupomId', 'cupomReceiptSequence', 'cupomCashRegisterNumber', 'cupomStoreId', 'cupomCoo',
'cupomEmployeeId', 'cupomEmployeeName', 'cupomValue', 'cupomAdditionalValue', 'cupomDiscountValue',
'cupomItemsQuantity', 'cupomUnitsQuantity', 'cupomCancelledItemsQuantity', 'cupomCancelledItemsValue',
'cupomSaleType', 'cupomCancelledUnitsQuantity', 'cupomSaleDate', 'cupomInvoiceXMLStatus',
'cupomReceiptOpeningDateTime', 'cupomReceiptClosingDateTime', 'cupomEletronicKey', 'cupomSaleOrderId',
'cupomExternalId', 'cupomDiscountReason', 'cupomLoyaltyDiscountValue', 'cupomCancellingReason',
'cupomCancelledReceiptSequence', 'cupomChannel', 'cupomChannelDescription',
# Campos do item (27 campos)
'itemId', 'itemCancelled', 'itemProductId', 'itemSellerId', 'itemSellerName', 'itemQuantity',
'itemUnitValue', 'itemGrossValue', 'itemAdditionalValue', 'itemDiscountValue', 'itemTotalValue',
'itemTabelaA', 'itemNcm', 'itemNcmExcecao', 'itemNatureza', 'itemCfop', 'itemCsosn',
'itemCstICMS', 'itemAliquotaICMS', 'itemValorReducaoAliquotaICMS', 'itemValorICMSDesonerado',
'itemValorFecop', 'itemAliquotaFecop', 'itemCstPIS', 'itemAliquotaPIS', 'itemCstCOFINS', 'itemAliquotaCOFINS',
# Campos do payment (16 campos)
'paymentId', 'paymentMethodId', 'paymentMethodDescription', 'paymentValue', 'paymentChange',
'paymentInstallmentQuantity', 'paymentCheckIssuer', 'paymentCardAuthorization', 'paymentCardFlag',
'paymentCardFlagDescription', 'paymentCardModality', 'paymentRedeAdquirente', 'paymentNsu',
'paymentAuthorizationNsu', 'paymentNsuCancelling', 'paymentCardBinNumber',
# Campos do tef (12 campos)
'teftransId', 'teftransSequential', 'teftransPaymentMethodDescription', 'teftransValue',
'teftransCardModality', 'teftransCancellingModality', 'teftransCardType', 'teftranSitefNsu',
'teftransAuthorizerHostNsu', 'teftransAuthorizationCode', 'teftransInstallmentQuantity', 'teftransFirstIntallmentDate'
]
# Total: 29 + 27 + 16 + 12 = 84 campos
# Nota: Não incluímos o campo 'id' pois pode ser auto-incremento
total_fields = len(fields)
# Montar o SQL dinamicamente
fields_str = ', '.join([f'[{field}]' for field in fields])
placeholders = ', '.join(['?'] * total_fields)
sql = f"""
INSERT INTO [GINSENG].[dbo].[rgb_sale_receipts] (
{fields_str}
) VALUES (
{placeholders}
)
"""
print(f"🔧 SQL preparado com {total_fields} campos e {total_fields} placeholders")
# Preparar valores para inserção na ordem exata dos campos
values = []
# Dados do cupom (29 valores)
cupom_fields = [
'cupomId', 'cupomReceiptSequence', 'cupomCashRegisterNumber', 'cupomStoreId', 'cupomCoo',
'cupomEmployeeId', 'cupomEmployeeName', 'cupomValue', 'cupomAdditionalValue', 'cupomDiscountValue',
'cupomItemsQuantity', 'cupomUnitsQuantity', 'cupomCancelledItemsQuantity', 'cupomCancelledItemsValue',
'cupomSaleType', 'cupomCancelledUnitsQuantity', 'cupomSaleDate', 'cupomInvoiceXMLStatus',
'cupomReceiptOpeningDateTime', 'cupomReceiptClosingDateTime', 'cupomEletronicKey', 'cupomSaleOrderId',
'cupomExternalId', 'cupomDiscountReason', 'cupomLoyaltyDiscountValue', 'cupomCancellingReason',
'cupomCancelledReceiptSequence', 'cupomChannel', 'cupomChannelDescription'
]
for field in cupom_fields:
values.append(cupom_data.get(field) if cupom_data else None)
# Dados do item (27 valores)
item_fields = [
'itemId', 'itemCancelled', 'itemProductId', 'itemSellerId', 'itemSellerName', 'itemQuantity',
'itemUnitValue', 'itemGrossValue', 'itemAdditionalValue', 'itemDiscountValue', 'itemTotalValue',
'itemTabelaA', 'itemNcm', 'itemNcmExcecao', 'itemNatureza', 'itemCfop', 'itemCsosn',
'itemCstICMS', 'itemAliquotaICMS', 'itemValorReducaoAliquotaICMS', 'itemValorICMSDesonerado',
'itemValorFecop', 'itemAliquotaFecop', 'itemCstPIS', 'itemAliquotaPIS', 'itemCstCOFINS', 'itemAliquotaCOFINS'
]
for field in item_fields:
values.append(item_data.get(field) if item_data else None)
# Dados do pagamento (16 valores)
payment_fields = [
'paymentId', 'paymentMethodId', 'paymentMethodDescription', 'paymentValue', 'paymentChange',
'paymentInstallmentQuantity', 'paymentCheckIssuer', 'paymentCardAuthorization', 'paymentCardFlag',
'paymentCardFlagDescription', 'paymentCardModality', 'paymentRedeAdquirente', 'paymentNsu',
'paymentAuthorizationNsu', 'paymentNsuCancelling', 'paymentCardBinNumber'
]
for field in payment_fields:
values.append(payment_data.get(field) if payment_data else None)
# Dados do TEF (12 valores)
tef_fields = [
'teftransId', 'teftransSequential', 'teftransPaymentMethodDescription', 'teftransValue',
'teftransCardModality', 'teftransCancellingModality', 'teftransCardType', 'teftranSitefNsu',
'teftransAuthorizerHostNsu', 'teftransAuthorizationCode', 'teftransInstallmentQuantity', 'teftransFirstIntallmentDate'
]
for field in tef_fields:
values.append(tef_data.get(field) if tef_data else None)
# Verificar se temos exatamente 84 valores
if len(values) != total_fields:
print(f"❌ ERRO: Esperado {total_fields} valores, mas temos {len(values)}")
return 0
print(f"✅ Preparados {len(values)} valores para inserção")
# Executar inserção
cursor.execute(sql, values)
return 1
except Exception as e:
print(f"❌ Erro ao inserir registro: {e}")
return 0
def get_bearer_token_from_database():
"""
Busca o token de autenticação do banco de dados
Returns:
str: Token de autenticação ou None em caso de erro
"""
# Lista de drivers para tentar em ordem de preferência
drivers = [
'ODBC Driver 18 for SQL Server',
'ODBC Driver 17 for SQL Server',
'ODBC Driver 13 for SQL Server',
'ODBC Driver 11 for SQL Server',
'SQL Server Native Client 11.0',
'SQL Server Native Client 10.0',
'SQL Server'
]
print("🔍 Buscando token de autenticação no banco de dados...")
available_drivers = pyodbc.drivers()
for driver in drivers:
if driver in available_drivers:
try:
connection_string = (
f'DRIVER={{{driver}}};'
'SERVER=10.77.77.10;'
'DATABASE=GINSENG;'
'UID=supginseng;'
'PWD=Iphone2513@;'
'PORT=1433;'
'TrustServerCertificate=yes;'
'Encrypt=yes'
)
conn = pyodbc.connect(connection_string)
cursor = conn.cursor()
# Buscar o token da tabela dbo.rgb_token com id = 1
query = "SELECT token FROM dbo.rgb_token WHERE id = 1"
cursor.execute(query)
result = cursor.fetchone()
conn.close()
if result:
print("✅ Token obtido do banco de dados com sucesso!")
return result[0]
else:
print("❌ Token não encontrado na tabela dbo.rgb_token com id = 1")
return None
except Exception as e:
print(f"❌ Erro ao buscar token com {driver}: {e}")
continue
# Se nenhum driver funcionou, tentar sem SSL
print("🔄 Tentando buscar token sem SSL...")
for driver in drivers:
if driver in available_drivers:
try:
connection_string = (
f'DRIVER={{{driver}}};'
'SERVER=10.77.77.10;'
'DATABASE=GINSENG;'
'UID=supginseng;'
'PWD=Iphone2513@;'
'PORT=1433'
)
conn = pyodbc.connect(connection_string)
cursor = conn.cursor()
# Buscar o token da tabela dbo.rgb_token com id = 1
query = "SELECT token FROM dbo.rgb_token WHERE id = 1"
cursor.execute(query)
result = cursor.fetchone()
conn.close()
if result:
print("✅ Token obtido do banco de dados com sucesso!")
return result[0]
else:
print("❌ Token não encontrado na tabela dbo.rgb_token com id = 1")
return None
except Exception as e:
print(f"❌ Erro ao buscar token sem SSL com {driver}: {e}")
continue
print("❌ Não foi possível conectar ao banco para buscar o token")
return None
def main():
"""Função principal do script"""
# Buscar token de autenticação do banco de dados
print("🔐 Obtendo token de autenticação...")
BEARER_TOKEN = get_bearer_token_from_database()
if not BEARER_TOKEN:
print("❌ Erro: Não foi possível obter o token de autenticação do banco de dados")
print("💡 Verifique se:")
print(" - A tabela dbo.rgb_token existe")
print(" - Existe um registro com id = 1")
print(" - A coluna 'token' contém um valor válido")
return
# ========================================
# CONFIGURAÇÃO DE DATA
# ========================================
# USE_DATE_RANGE:
# True = Processar intervalo de datas (START_DATE até END_DATE)
# False = Usar data única (ontem ou MANUAL_DATE)
USE_DATE_RANGE = False
# Intervalo de datas (usado quando USE_DATE_RANGE = True)
START_DATE = "2026-01-22"
END_DATE = "2026-01-22"
# USE_MANUAL_DATE:
# True = Usar MANUAL_DATE
# False = Usar data de ONTEM automaticamente
USE_MANUAL_DATE = False
MANUAL_DATE = "2025-09-25"
# Criar cliente da API
api_client = BoticarioAPI(BEARER_TOKEN)
# Verificar drivers ODBC antes de começar
has_drivers = api_client.check_and_install_odbc_driver()
if not has_drivers:
print("\n⚠️ Pulando inserção no banco de dados devido à falta de drivers ODBC")
print(" Execute as instruções acima para instalar o driver e execute o script novamente")
return
if USE_DATE_RANGE:
# ========================================
# MODO INTERVALO DE DATAS
# ========================================
from datetime import timedelta
start_dt = datetime.strptime(START_DATE, "%Y-%m-%d")
end_dt = datetime.strptime(END_DATE, "%Y-%m-%d")
total_days = (end_dt - start_dt).days + 1
print("\n" + "="*60)
print("🚀 SCRIPT DE ACESSO À API DO GRUPO BOTICÁRIO")
print("="*60)
print(f"📅 MODO: Intervalo de datas")
print(f"📅 De: {START_DATE} até {END_DATE}")
print(f"📅 Total de dias: {total_days}")
print("="*60)
# Estatísticas globais
total_stats = {
"dias_processados": 0,
"dias_com_erro": 0,
"total_recibos": 0,
"total_inseridos": 0
}
current_dt = start_dt
day_num = 0
while current_dt <= end_dt:
day_num += 1
SALE_DATE = current_dt.strftime("%Y-%m-%d")
print(f"\n{'='*60}")
print(f"📆 DIA {day_num}/{total_days}: {SALE_DATE}")
print(f"{'='*60}")
# Buscar TODOS os recibos (com paginação automática)
receipts_data = api_client.get_all_sale_receipts(SALE_DATE)
if receipts_data:
items = receipts_data.get('items', [])
total_stats["total_recibos"] += len(items)
print(f"{len(items)} recibos obtidos")
if len(items) > 0:
# Inserir dados no banco de dados
success = api_client.insert_receipts_to_database(receipts_data, skip_duplicates=False)
if success:
total_stats["dias_processados"] += 1
print(f"✅ Dados inseridos com sucesso!")
else:
total_stats["dias_com_erro"] += 1
print(f"❌ Erro ao inserir dados")
else:
total_stats["dias_processados"] += 1
print(f" Nenhum recibo para esta data")
else:
total_stats["dias_com_erro"] += 1
print(f"❌ Erro ao buscar dados da API")
# Próximo dia
current_dt += timedelta(days=1)
# Resumo final
print(f"\n{'='*60}")
print("🎉 PROCESSAMENTO DO INTERVALO CONCLUÍDO!")
print(f"{'='*60}")
print(f"📅 Período: {START_DATE} até {END_DATE}")
print(f"📊 Dias processados: {total_stats['dias_processados']}/{total_days}")
print(f"❌ Dias com erro: {total_stats['dias_com_erro']}")
print(f"📦 Total de recibos: {total_stats['total_recibos']}")
print(f"{'='*60}")
else:
# ========================================
# MODO DATA ÚNICA (comportamento original)
# ========================================
from datetime import timedelta
if USE_MANUAL_DATE:
SALE_DATE = MANUAL_DATE
print("📅 CONFIGURAÇÃO: Data manual ativada")
print(f"📅 Data selecionada: {SALE_DATE}")
else:
# Usar data de ONTEM
SALE_DATE = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
print("📅 CONFIGURAÇÃO: Data de ontem automática")
print(f"📅 Data selecionada: {SALE_DATE} (ontem)")
print("\n" + "="*60)
print("🚀 SCRIPT DE ACESSO À API DO GRUPO BOTICÁRIO")
print("="*60)
print(f"🎯 Buscando recibos de vendas para a data: {SALE_DATE}")
print("-" * 60)
# Buscar TODOS os recibos (com paginação automática)
receipts_data = api_client.get_all_sale_receipts(SALE_DATE)
if receipts_data:
print("\n✅ Dados obtidos com sucesso!")
if isinstance(receipts_data, dict):
print(f"\n📊 Resumo dos dados obtidos da API:")
print(f" - Start: {receipts_data.get('start', 'N/A')}")
print(f" - Count: {receipts_data.get('count', 'N/A')}")
print(f" - Total: {receipts_data.get('total', 'N/A')}")
items = receipts_data.get('items', [])
print(f" - Items obtidos: {len(items)}")
if receipts_data.get('total', 0) == len(items):
print(" - Status: ✅ Todos os registros obtidos da API")
else:
print(" - Status: ⚠️ Alguns registros podem estar faltando")
print(" - Destino: 🗄️ Direto para o banco de dados (sem arquivo local)")
# Inserir dados no banco de dados
print("\n" + "="*60)
print("🗄️ PROCESSAMENTO DO BANCO DE DADOS")
print("="*60)
print("🔄 Estratégia: Limpar dados existentes + Inserir novos dados")
print("📦 Inserção otimizada em lotes de 1000 registros")
success = api_client.insert_receipts_to_database(receipts_data, skip_duplicates=False)
if success:
print("\n🎉 PROCESSO COMPLETO!")
print("✅ Dados antigos removidos e novos dados inseridos com sucesso!")
else:
print("\n❌ PROCESSO FALHOU!")
print("❌ Falha durante o processamento do banco de dados")
else:
print("\n❌ Não foi possível obter os dados da API")
if __name__ == "__main__":
main()