G-Scripts/Grgb_sale_receipts.py
daniel.rodrigues 1b24655d64 att
2026-02-03 10:22:25 -03:00

574 lines
20 KiB
Python

import requests
import pyodbc
from datetime import datetime, timedelta
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
from threading import Thread, Event, Lock
# =====================================================
# CONFIGURAÇÕES DO BANCO DE DADOS SQL SERVER
# =====================================================
CONNECTION_STRING = (
"DRIVER={ODBC Driver 18 for SQL Server};"
"SERVER=10.77.77.10;"
"DATABASE=GINSENG;"
"UID=supginseng;"
"PWD=Iphone2513@;"
"PORT=1433;"
"TrustServerCertificate=yes"
)
# =====================================================
# CONFIGURAÇÕES DAS APIs
# =====================================================
API_TOKEN_URL = "https://api.grupoginseng.com.br/api/rgb_token"
API_VENDAS_URL = "https://api.grupoboticario.com.br/global/v1/franchising/gb-stores-data/sale/receipts"
# =====================================================
# CONFIGURAÇÕES DE PARALELISMO
# =====================================================
MAX_WORKERS = 3 # Número de requisições simultâneas (reduzido para evitar 429)
PAGE_SIZE = 50 # Itens por página
MAX_RETRIES = 5 # Tentativas em caso de falha
RETRY_DELAY = 3 # Segundos entre tentativas
RATE_LIMIT_DELAY = 5 # Segundos extras para erro 429 (Too Many Requests)
REQUEST_DELAY = 0.5 # Delay entre cada requisição (respeitar rate limit)
# =====================================================
# GERENCIADOR DE TOKEN (com renovação automática)
# =====================================================
class TokenManager:
"""Gerencia o token com renovação automática quando expira"""
def __init__(self):
self._token = None
self._lock = Lock()
self._ultimo_refresh = None
def _buscar_novo_token(self):
"""Busca um novo token da API"""
try:
response = requests.get(API_TOKEN_URL, timeout=30)
response.raise_for_status()
data = response.json()
if data.get('success') and data.get('data'):
return data['data'][0]['token']
except Exception as e:
print(f" ✗ Erro ao buscar token: {e}")
return None
def obter_token(self, forcar_refresh=False):
"""Obtém o token atual ou busca um novo se necessário"""
with self._lock:
if self._token is None or forcar_refresh:
print(" 🔑 Obtendo novo token..." if forcar_refresh else "", end="")
novo_token = self._buscar_novo_token()
if novo_token:
self._token = novo_token
self._ultimo_refresh = datetime.now()
if forcar_refresh:
print("")
return self._token
else:
return None
return self._token
def renovar_token(self):
"""Força a renovação do token"""
return self.obter_token(forcar_refresh=True)
# Instância global do gerenciador de token
token_manager = TokenManager()
def obter_token():
"""Obtém o token de autenticação da API"""
token = token_manager.obter_token()
if token:
print(f"✓ Token obtido com sucesso")
else:
print("✗ Erro: Não foi possível obter o token")
return token
def buscar_pagina(data_venda, start, count, tentativa=1):
"""Busca uma página específica da API com retry automático e renovação de token"""
token = token_manager.obter_token()
# Pequeno delay antes de cada requisição para respeitar rate limit
time.sleep(REQUEST_DELAY)
try:
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
params = {
'receipt.saleDate': data_venda,
'start': start,
'count': count
}
response = requests.get(API_VENDAS_URL, headers=headers, params=params, timeout=30)
# Verifica se o token expirou (401 Unauthorized)
if response.status_code == 401:
print(f" 🔄 Token expirado, renovando...")
token_manager.renovar_token()
if tentativa < MAX_RETRIES:
return buscar_pagina(data_venda, start, count, tentativa + 1)
# Erro 429 - Too Many Requests (rate limit)
if response.status_code == 429:
if tentativa < MAX_RETRIES:
wait_time = RATE_LIMIT_DELAY * tentativa # Aumenta espera progressivamente
print(f" ⏳ Rate limit (429), aguardando {wait_time}s...")
time.sleep(wait_time)
return buscar_pagina(data_venda, start, count, tentativa + 1)
response.raise_for_status()
data = response.json()
# Log de sucesso após retry
if tentativa > 1:
print(f" ✓ Página {start} OK (após {tentativa-1} retry)")
return {
'start': start,
'items': data.get('items', []),
'total': data.get('total', 0),
'success': True
}
except requests.exceptions.HTTPError as e:
# Se for erro de autenticação, tenta renovar o token
if hasattr(e, 'response') and e.response is not None:
status = e.response.status_code
if status in [401, 403]:
print(f" 🔄 Erro de autenticação, renovando token...")
token_manager.renovar_token()
if tentativa < MAX_RETRIES:
time.sleep(RETRY_DELAY)
return buscar_pagina(data_venda, start, count, tentativa + 1)
# Erro 429 - Too Many Requests
if status == 429:
if tentativa < MAX_RETRIES:
wait_time = RATE_LIMIT_DELAY * tentativa
print(f" ⏳ Rate limit (429), aguardando {wait_time}s...")
time.sleep(wait_time)
return buscar_pagina(data_venda, start, count, tentativa + 1)
if tentativa < MAX_RETRIES:
print(f" ⚠ Retry página {start}, tentativa {tentativa}/{MAX_RETRIES}...")
time.sleep(RETRY_DELAY)
return buscar_pagina(data_venda, start, count, tentativa + 1)
else:
print(f" ✗ FALHOU página {start} após {MAX_RETRIES} tentativas: {e}")
return {
'start': start,
'items': [],
'total': 0,
'success': False,
'error': str(e)
}
except Exception as e:
if tentativa < MAX_RETRIES:
print(f" ⚠ Retry página {start}, tentativa {tentativa}/{MAX_RETRIES}...")
time.sleep(RETRY_DELAY)
return buscar_pagina(data_venda, start, count, tentativa + 1)
else:
print(f" ✗ FALHOU página {start} após {MAX_RETRIES} tentativas: {e}")
return {
'start': start,
'items': [],
'total': 0,
'success': False,
'error': str(e)
}
def obter_total_registros(data_venda):
"""Faz uma requisição inicial para descobrir o total de registros"""
resultado = buscar_pagina(data_venda, 0, 1)
if resultado['success']:
return resultado['total']
return 0
def parse_datetime(dt_string):
"""Converte string de datetime para formato SQL Server"""
if not dt_string:
return None
try:
dt_string = dt_string.replace('-03:00', '').replace('-02:00', '')
dt = datetime.fromisoformat(dt_string)
return dt.strftime('%Y-%m-%d %H:%M:%S')
except:
return None
def inserir_venda(cursor, venda):
"""Insere uma venda na tabela Grgb_sales_receipts usando MERGE (upsert)"""
sql = """
MERGE INTO Grgb_sales_receipts AS target
USING (SELECT ? AS id) AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET
value = ?,
discount_value = ?,
invoice_xml_status = ?,
updated_at = GETDATE()
WHEN NOT MATCHED THEN
INSERT (
id, receipt_sequence, cash_register_number, store_id, coo,
employee_id, employee_name, value, additional_value, discount_value,
items_quantity, units_quantity, cancelled_items_quantity, cancelled_items_value,
sale_type, cancelled_units_quantity, sale_date, invoice_xml_status,
receipt_opening_datetime, receipt_closing_datetime, eletronic_key,
sale_order_id, external_id, discount_reason, loyalty_discount_value,
cancelling_reason, cancelled_receipt_sequence, channel, channel_description
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
sale_id = venda.get('id')
valores = (
sale_id,
venda.get('value'),
venda.get('discountValue'),
venda.get('invoiceXMLStatus'),
sale_id,
venda.get('receiptSequence'),
venda.get('cashRegisterNumber'),
venda.get('storeId'),
venda.get('coo'),
venda.get('employeeId'),
venda.get('employeeName'),
venda.get('value'),
venda.get('additionalValue'),
venda.get('discountValue'),
venda.get('itemsQuantity'),
venda.get('unitsQuantity'),
venda.get('cancelledItemsQuantity'),
venda.get('cancelledItemsValue'),
venda.get('saleType'),
venda.get('cancelledUnitsQuantity'),
venda.get('saleDate'),
venda.get('invoiceXMLStatus'),
parse_datetime(venda.get('receiptOpeningDateTime')),
parse_datetime(venda.get('receiptClosingDateTime')),
venda.get('eletronicKey'),
venda.get('saleOrderId'),
venda.get('externalId'),
venda.get('discountReason'),
venda.get('loyaltyDiscountValue'),
venda.get('cancellingReason'),
venda.get('cancelledReceiptSequence'),
venda.get('channel'),
venda.get('channelDescription')
)
cursor.execute(sql, valores)
return sale_id
def inserir_itens(cursor, sale_id, itens):
"""Insere os itens de uma venda na tabela Grgb_sales_receipts_itemsvenda usando MERGE"""
sql = """
MERGE INTO Grgb_sales_receipts_itemsvenda AS target
USING (SELECT ? AS id) AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET
quantity = ?,
total_value = ?,
cancelled = ?
WHEN NOT MATCHED THEN
INSERT (
id, sale_id, cancelled, product_id, seller_id, seller_name,
quantity, unit_value, gross_value, additional_value, discount_value,
total_value, tabela_a, ncm, ncm_excecao, natureza, cfop, csosn,
cst_icms, aliquota_icms, valor_reducao_aliquota_icms, valor_icms_desonerado,
valor_fecop, aliquota_fecop, cst_pis, aliquota_pis, cst_cofins, aliquota_cofins
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
for item in itens:
item_id = item.get('id')
valores = (
item_id,
item.get('quantity'),
item.get('totalValue'),
item.get('cancelled'),
item_id,
sale_id,
item.get('cancelled'),
item.get('productId'),
item.get('sellerId'),
item.get('sellerName'),
item.get('quantity'),
item.get('unitValue'),
item.get('grossValue'),
item.get('additionalValue'),
item.get('discountValue'),
item.get('totalValue'),
item.get('tabelaA'),
item.get('ncm'),
item.get('ncmExcecao'),
item.get('natureza'),
item.get('cfop'),
item.get('csosn'),
item.get('cstICMS'),
item.get('aliquotaICMS'),
item.get('valorReducaoAliquotaICMS'),
item.get('valorICMSDesonerado'),
item.get('valorFecop'),
item.get('aliquotaFecop'),
item.get('cstPIS'),
item.get('aliquotaPIS'),
item.get('cstCOFINS'),
item.get('aliquotaCOFINS')
)
cursor.execute(sql, valores)
def inserir_pagamentos(cursor, sale_id, pagamentos):
"""Insere os pagamentos de uma venda na tabela Grgb_sales_receipts_pagamentosvenda usando MERGE"""
sql = """
MERGE INTO Grgb_sales_receipts_pagamentosvenda AS target
USING (SELECT ? AS id) AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET
value = ?,
payment_method_description = ?
WHEN NOT MATCHED THEN
INSERT (
id, sale_id, payment_method_id, payment_method_description, value,
change_value, installment_quantity, check_issuer, card_authorization,
card_flag, card_flag_description, card_modality, rede_adquirente,
nsu, authorization_nsu, nsu_cancelling, card_bin_number
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
for pagamento in pagamentos:
pag_id = pagamento.get('id')
valores = (
pag_id,
pagamento.get('value'),
pagamento.get('paymentMethodDescription'),
pag_id,
sale_id,
pagamento.get('paymentMethodId'),
pagamento.get('paymentMethodDescription'),
pagamento.get('value'),
pagamento.get('change'),
pagamento.get('installmentQuantity'),
pagamento.get('checkIssuer'),
pagamento.get('cardAuthorization'),
pagamento.get('cardFlag'),
pagamento.get('cardFlagDescription'),
pagamento.get('cardModality'),
pagamento.get('redeAdquirente'),
pagamento.get('nsu'),
pagamento.get('authorizationNsu'),
pagamento.get('nsuCancelling'),
pagamento.get('cardBinNumber')
)
cursor.execute(sql, valores)
def worker_inserir_banco(fila, conn_string, stats, stop_event):
"""Worker que consome a fila e insere no banco"""
conn = pyodbc.connect(conn_string)
cursor = conn.cursor()
while not stop_event.is_set() or not fila.empty():
try:
vendas = fila.get(timeout=1)
except:
continue
for venda in vendas:
try:
sale_id = inserir_venda(cursor, venda)
stats['vendas'] += 1
itens = venda.get('items', [])
if itens:
inserir_itens(cursor, sale_id, itens)
stats['itens'] += len(itens)
pagamentos = venda.get('payments', [])
if pagamentos:
inserir_pagamentos(cursor, sale_id, pagamentos)
stats['pagamentos'] += len(pagamentos)
except Exception:
stats['erros'] += 1
# Commit a cada lote
conn.commit()
fila.task_done()
cursor.close()
conn.close()
def processar_dia(data_venda, fila, stats):
"""Processa um dia específico - busca da API e coloca na fila"""
# Descobrir total de registros para este dia
total = obter_total_registros(data_venda)
if total == 0:
print(f" {data_venda}: Nenhum registro")
return True # Sucesso, apenas não tem dados
print(f" {data_venda}: {total} registros, buscando...")
# Calcular páginas
paginas = list(range(0, total, PAGE_SIZE))
obtidas = 0
falhas = 0
# Buscar páginas em paralelo
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {
executor.submit(buscar_pagina, data_venda, start, PAGE_SIZE): start
for start in paginas
}
for future in as_completed(futures):
resultado = future.result()
if resultado['success'] and resultado['items']:
# Coloca na fila para o worker do banco processar
fila.put(resultado['items'])
obtidas += len(resultado['items'])
stats['api_obtidas'] += len(resultado['items'])
elif not resultado['success']:
falhas += 1
if falhas > 0:
print(f" {data_venda}: ⚠ {obtidas} vendas OK, {falhas} páginas falharam")
else:
print(f" {data_venda}: ✓ {obtidas} vendas enviadas para o banco")
return falhas == 0
def gerar_datas(data_inicio, data_fim):
"""Gera lista de datas entre início e fim"""
datas = []
atual = data_inicio
while atual <= data_fim:
datas.append(atual.strftime('%Y-%m-%d'))
atual += timedelta(days=1)
return datas
def processar_periodo(data_inicio_str, data_fim_str):
"""Processo principal - processa um período de datas"""
data_inicio = datetime.strptime(data_inicio_str, '%Y-%m-%d')
data_fim = datetime.strptime(data_fim_str, '%Y-%m-%d')
datas = gerar_datas(data_inicio, data_fim)
print("=" * 60)
print(f"IMPORTAÇÃO DE VENDAS")
print(f"Período: {data_inicio_str} até {data_fim_str} ({len(datas)} dias)")
print("=" * 60)
# 1. Obter token inicial
print("\n[1/3] Obtendo token de autenticação...")
token = obter_token()
if not token:
return False
# 2. Configurar pipeline
print("\n[2/3] Iniciando pipeline (API → Banco)...")
fila = Queue(maxsize=20) # Buffer de 20 lotes
stop_event = Event()
stats = {'vendas': 0, 'itens': 0, 'pagamentos': 0, 'erros': 0, 'api_obtidas': 0}
# Iniciar worker do banco
worker = Thread(target=worker_inserir_banco, args=(fila, CONNECTION_STRING, stats, stop_event))
worker.start()
# 3. Processar cada dia
print("\n[3/3] Processando dias...")
inicio = time.time()
dias_com_falha = []
try:
for i, data_venda in enumerate(datas, 1):
print(f"\n[{i}/{len(datas)}] ", end="")
sucesso = processar_dia(data_venda, fila, stats)
if not sucesso:
dias_com_falha.append(data_venda)
# Mostrar stats parciais a cada 5 dias
if i % 5 == 0:
print(f" 📊 Parcial: {stats['vendas']} vendas, {stats['itens']} itens, {stats['pagamentos']} pagamentos no banco")
except KeyboardInterrupt:
print("\n\n⚠ Interrompido pelo usuário!")
# Sinalizar fim e aguardar worker
print("\n\nFinalizando inserções pendentes...")
stop_event.set()
fila.join()
worker.join()
tempo_total = time.time() - inicio
# Resultado final
print("\n" + "=" * 60)
print("IMPORTAÇÃO CONCLUÍDA!")
print("=" * 60)
print(f" Período: {data_inicio_str} até {data_fim_str}")
print(f" Dias processados: {len(datas)}")
print(f" Vendas inseridas: {stats['vendas']}")
print(f" Itens inseridos: {stats['itens']}")
print(f" Pagamentos inseridos: {stats['pagamentos']}")
print(f" Erros de inserção: {stats['erros']}")
print(f" Tempo total: {tempo_total:.1f}s ({tempo_total/60:.1f} min)")
if dias_com_falha:
print(f"\n ⚠ Dias com falhas parciais: {', '.join(dias_com_falha)}")
print("=" * 60)
return True
if __name__ == "__main__":
# Configuração do período
DATA_INICIO = "2025-09-24"
DATA_FIM = datetime.now().strftime('%Y-%m-%d') # Hoje
# Permite passar datas como argumentos: python script.py 2025-09-24 2025-12-31
if len(sys.argv) >= 3:
DATA_INICIO = sys.argv[1]
DATA_FIM = sys.argv[2]
elif len(sys.argv) == 2:
# Se passar só uma data, processa só aquele dia
DATA_INICIO = sys.argv[1]
DATA_FIM = sys.argv[1]
processar_periodo(DATA_INICIO, DATA_FIM)