G-Scripts/Pedidos_mar.py
daniel.rodrigues 7facb9b1bd att
2026-01-09 16:12:22 -03:00

611 lines
23 KiB
Python

import requests
from itertools import product
import pyodbc
import time
# ===============================
# CONFIGURAÇÕES
# ===============================
TOKEN_API_URL = "https://api.grupoginseng.com.br/api/tokens"
PDVS_API_URL = "https://api.grupoginseng.com.br/pdvs"
ORDERS_URL = (
"https://mar-api-gateway-front.demanda-abastecimento.grupoboticario.digital/"
"orders-bff/api/tracking/v5/orders"
)
SUMMARY_URL = (
"https://mar-api-gateway-front.demanda-abastecimento.grupoboticario.digital/"
"orders-bff/api/customer-orders/summary/"
)
ITEMS_URL = (
"https://mar-api-gateway-front.demanda-abastecimento.grupoboticario.digital/"
"orders-bff/api/customer-orders/v2/items/"
)
PARAMS_BASE = {
"sort": "orderid,desc"
}
# ===============================
# CONFIGURAÇÕES DO BANCO DE DADOS
# ===============================
DB_DRIVER = "ODBC Driver 17 for SQL Server"
DB_CONNECTION_STRING = (
f'DRIVER={{{DB_DRIVER}}};'
'SERVER=10.77.77.10;'
'DATABASE=GINSENG;'
'UID=supginseng;'
'PWD=Iphone2513@;'
'PORT=1433;'
'TrustServerCertificate=yes;'
'Encrypt=yes'
)
# ===============================
# CONFIGURAÇÕES DE EXECUÇÃO
# ===============================
REQUEST_DELAY = 0.5 # Delay em segundos entre requisições para evitar rate limiting
TOKEN_REFRESH_INTERVAL = 50 # Renovar token a cada N lojas processadas
# ===============================
# FUNÇÕES AUXILIARES
# ===============================
def get_new_token():
"""
Obtém um novo token da API
Retorna o token ou None em caso de erro
"""
try:
print("\n[TOKEN] Obtendo novo token...")
token_response = requests.get(TOKEN_API_URL, timeout=30)
token_response.raise_for_status()
token = token_response.json()["data"][0]["token"]
print("[TOKEN] Token renovado com sucesso!")
return token
except Exception as e:
print(f"[TOKEN] Erro ao obter token: {e}")
return None
def wrap(text, width=30):
"""Mantém o texto sem quebras para Excel"""
if not text:
return ""
return str(text)
def format_money(value):
"""
Formata valor monetário evitando notação científica
"""
try:
return f"{float(value):,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
except Exception:
return ""
def to_decimal(value):
"""
Converte valor para decimal (float) para inserção no banco
"""
if not value or value == "":
return None
try:
# Remove formatação de moeda brasileira se houver
if isinstance(value, str):
value = value.replace(".", "").replace(",", ".")
return float(value)
except Exception:
return None
def to_int(value):
"""
Converte valor para inteiro para inserção no banco
"""
if not value or value == "":
return None
try:
return int(value)
except Exception:
return None
def to_date(value):
"""
Converte string de data para formato aceito pelo SQL Server
Formato esperado: dd/mm/yyyy
"""
if not value or value == "" or value == "-":
return None
try:
# Se já vier no formato dd/mm/yyyy, converte para datetime
from datetime import datetime
if isinstance(value, str) and "/" in value:
parts = value.split("/")
if len(parts) == 3:
# Converte dd/mm/yyyy para yyyy-mm-dd
return f"{parts[2]}-{parts[1]}-{parts[0]}"
return value
except Exception:
return None
def expand_sell_orders(sell_orders_list):
"""
Garante:
- 1 sellOrder por coluna
- duplica linhas quando houver mais de um sellOrder por marca
"""
sell_map = {
"BOT": [],
"EUD": [],
"QDB": []
}
for item in sell_orders_list:
bu = item.get("businessUnit")
orders = item.get("sellOrders", [])
if bu in sell_map:
sell_map[bu] = orders or [""]
# garante pelo menos 1 linha
for k in sell_map:
if not sell_map[k]:
sell_map[k] = [""]
return list(product(
sell_map["BOT"],
sell_map["EUD"],
sell_map["QDB"]
))
def process_store_with_retry(store, headers, max_retries=3, retry_delay=5, token_refresh_callback=None):
"""
Processa uma loja com tentativas de retry em caso de erro
Retorna tupla (lista de dados da loja, needs_token_refresh)
needs_token_refresh indica se o token precisa ser renovado
"""
store_data = []
needs_token_refresh = False
for attempt in range(max_retries):
try:
# Primeira chamada para obter totalElements e calcular total de páginas
params = PARAMS_BASE.copy()
params["page"] = 0
initial_response = requests.get(
ORDERS_URL,
headers=headers,
params=params,
timeout=30
)
if initial_response.status_code != 200:
print(f" [ERRO] Status code: {initial_response.status_code}")
# Se for erro de autenticação, sinaliza para renovar token
if initial_response.status_code in [401, 403]:
print(f" [ERRO] Token expirado ou inválido!")
needs_token_refresh = True
# Se tiver callback para renovar token, usa
if token_refresh_callback and attempt < max_retries - 1:
new_token = token_refresh_callback()
if new_token:
headers["authorization"] = new_token
headers["x-authorization"] = new_token
needs_token_refresh = False
print(f" [TOKEN] Tentando novamente com novo token...")
time.sleep(retry_delay)
continue
if attempt < max_retries - 1:
print(f" Tentativa {attempt + 1}/{max_retries} falhou. Aguardando {retry_delay}s antes de tentar novamente...")
time.sleep(retry_delay)
continue
else:
print(f" Erro ao buscar informações da loja {store} após {max_retries} tentativas")
return [], needs_token_refresh
initial_data = initial_response.json()
total_elements = initial_data.get("totalElements", 0)
total_pages = initial_data.get("totalPages", 1)
page_size = initial_data.get("size", 25)
print(f" Total de pedidos: {total_elements}")
print(f" Total de páginas disponíveis: {total_pages}")
print(f" Tamanho da página: {page_size}")
# Buscar apenas página 0
pages_to_fetch = [0]
print(f" Buscando páginas: {pages_to_fetch}")
for page in pages_to_fetch:
print(f" Buscando página {page}...")
# Delay entre requisições para evitar rate limiting
time.sleep(REQUEST_DELAY)
params = PARAMS_BASE.copy()
params["page"] = page
response = requests.get(
ORDERS_URL,
headers=headers,
params=params,
timeout=30
)
if response.status_code != 200:
print(f" [ERRO] Erro ao buscar página {page} da loja {store} - Status: {response.status_code}")
# Se for erro de autenticação na página
if response.status_code in [401, 403]:
print(f" [ERRO] Token expirado durante busca de página!")
needs_token_refresh = True
continue
orders = response.json().get("content", [])
for order in orders:
order_id = order.get("orderId")
print(f" Processando pedido {order_id}...")
# Delay entre requisições
time.sleep(REQUEST_DELAY)
summary_response = requests.get(
f"{SUMMARY_URL}{order_id}",
headers=headers,
timeout=30
)
total_items = ""
total_skus = ""
total_value = ""
percent_txt = []
sell_combinations = [("", "", "")]
if summary_response.status_code == 200:
summary = summary_response.json()
order_summary = summary.get("orderSummary", {})
totals = summary.get("orderTotals", {})
sell_combinations = expand_sell_orders(
order_summary.get("order", {}).get("sellOrdersList", [])
)
total_items = totals.get("totalItems", "")
total_skus = totals.get("totalSKUs", "")
total_value = format_money(totals.get("totalOrderValue", ""))
for p in totals.get("totalPerBrand", []):
percent_txt.append(f"{p['businessUnit']}: {p['percentage']}%")
# Buscar itens do pedido
time.sleep(REQUEST_DELAY)
items_response = requests.get(
f"{ITEMS_URL}{order_id}",
headers=headers,
params={"page": 0, "sort": "business_unit,asc"},
timeout=30
)
items_list = []
if items_response.status_code == 200:
items_data = items_response.json()
items_list = items_data.get("content", [])
# Se não houver itens, adiciona uma linha com os dados do pedido
if not items_list:
for sell_bot, sell_eud, sell_qdb in sell_combinations:
store_data.append({
"Loja": store,
"Pedido": order_id,
"Data": order.get("orderDate"),
"Tipo": order.get("orderType"),
"Status": order.get("status"),
"Valor (R$)": format_money(order.get("orderValue", 0)),
"PDV": wrap(order.get("pdv")),
"Observação": wrap(order.get("observation")),
"Sell BOT": sell_bot,
"Sell EUD": sell_eud,
"Sell QDB": sell_qdb,
"Itens": total_items,
"SKUs": total_skus,
"Valor Total": total_value,
"% Atendido": wrap("\n".join(percent_txt)),
# Dados do item
"Item BU": "",
"SKU": "",
"Descrição Item": "",
"Status Item": "",
"Preço Unit.": "",
"Qtd Solicitada": "",
"Qtd Aceita": "",
"Qtd Atendida": "",
"Qtd Faturada": "",
"% Item": "",
"NF Número": "",
"NF Data MAR": "",
"Data Entrega": "",
"Código Atendimento": "",
"Motivo Recusa": ""
})
else:
# Adiciona uma linha para cada item do pedido
for item in items_list:
qty = item.get("quantity", {})
for sell_bot, sell_eud, sell_qdb in sell_combinations:
store_data.append({
"Loja": store,
"Pedido": order_id,
"Data": order.get("orderDate"),
"Tipo": order.get("orderType"),
"Status": order.get("status"),
"Valor (R$)": format_money(order.get("orderValue", 0)),
"PDV": wrap(order.get("pdv")),
"Observação": wrap(order.get("observation")),
"Sell BOT": sell_bot,
"Sell EUD": sell_eud,
"Sell QDB": sell_qdb,
"Itens": total_items,
"SKUs": total_skus,
"Valor Total": total_value,
"% Atendido": wrap("\n".join(percent_txt)),
# Dados do item
"Item BU": item.get("bussinessUnit", ""),
"SKU": item.get("sku", ""),
"Descrição Item": wrap(item.get("description", "")),
"Status Item": item.get("status", ""),
"Preço Unit.": format_money(item.get("price", "")),
"Qtd Solicitada": qty.get("requested", ""),
"Qtd Aceita": qty.get("accepted", ""),
"Qtd Atendida": qty.get("attended", ""),
"Qtd Faturada": qty.get("invoiced", ""),
"% Item": qty.get("percentage", ""),
"NF Número": item.get("nfNumber", ""),
"NF Data MAR": item.get("nfNumberMarDate", ""),
"Data Entrega": item.get("deliveryDate", ""),
"Código Atendimento": item.get("fullfilmentCode", ""),
"Motivo Recusa": wrap(item.get("refusalReason", ""))
})
# Se chegou aqui, processou com sucesso
return store_data, needs_token_refresh
except Exception as e:
if attempt < max_retries - 1:
print(f" [ERRO] Erro na tentativa {attempt + 1}/{max_retries}: {e}")
print(f" Aguardando {retry_delay}s antes de tentar novamente...")
time.sleep(retry_delay)
else:
print(f" [ERRO] Erro ao processar loja {store} após {max_retries} tentativas: {e}")
return [], needs_token_refresh
return [], needs_token_refresh
# ===============================
# 1) OBTER TOKEN
# ===============================
token_response = requests.get(TOKEN_API_URL, timeout=30)
token_response.raise_for_status()
TOKEN = token_response.json()["data"][0]["token"]
print("Token obtido com sucesso")
# ===============================
# 2) OBTER LISTA DE LOJAS (PDVs)
# ===============================
print("Buscando lista de lojas...")
pdvs_response = requests.get(PDVS_API_URL, timeout=30)
pdvs_response.raise_for_status()
pdvs_data = pdvs_response.json()
STORES = [str(pdv) for pdv in pdvs_data["data"]["pdvs"]]
print(f"Total de lojas encontradas: {len(STORES)}")
print(f"Lojas: {', '.join(STORES[:10])}{'...' if len(STORES) > 10 else ''}")
# ===============================
# 3) HEADERS BASE
# ===============================
BASE_HEADERS = {
"accept": "*/*",
"accept-language": "pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7",
"authorization": TOKEN,
"x-authorization": TOKEN,
"content-type": "application/json",
"origin": "https://extranet.grupoboticario.com.br",
"referer": "https://extranet.grupoboticario.com.br/",
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/143.0.0.0 Safari/537.36"
),
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "cross-site",
"sec-ch-ua": "\"Google Chrome\";v=\"143\", \"Chromium\";v=\"143\", \"Not A(Brand\";v=\"24\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"x-user-id": "163165",
"x-username": "daniel.rodrigue"
}
# ===============================
# 4) LOOP POR LOJA
# ===============================
all_data = []
stores_processed = 0
failed_stores = []
for store in STORES:
print(f"\nProcessando loja {store}... ({stores_processed + 1}/{len(STORES)})")
# Renovar token periodicamente a cada N lojas
if stores_processed > 0 and stores_processed % TOKEN_REFRESH_INTERVAL == 0:
print(f"\n[TOKEN] Renovação preventiva após {stores_processed} lojas...")
new_token = get_new_token()
if new_token:
TOKEN = new_token
BASE_HEADERS["authorization"] = TOKEN
BASE_HEADERS["x-authorization"] = TOKEN
headers = BASE_HEADERS.copy()
headers["storecode"] = store
# Processar loja com retry e callback para renovar token
store_data, needs_refresh = process_store_with_retry(
store,
headers,
max_retries=3,
retry_delay=5,
token_refresh_callback=get_new_token
)
# Se precisou renovar token, atualiza os headers base
if needs_refresh:
new_token = get_new_token()
if new_token:
TOKEN = new_token
BASE_HEADERS["authorization"] = TOKEN
BASE_HEADERS["x-authorization"] = TOKEN
# Adicionar dados da loja ao total
all_data.extend(store_data)
stores_processed += 1
if len(store_data) == 0:
failed_stores.append(store)
print(f"Loja {store} processada: {len(store_data)} registros")
# Resumo das lojas que falharam
if failed_stores:
print(f"\n[AVISO] {len(failed_stores)} lojas falharam: {', '.join(failed_stores[:20])}{'...' if len(failed_stores) > 20 else ''}")
# ===============================
# 5) SALVAR NO BANCO DE DADOS
# ===============================
if all_data:
print(f"\nConectando ao banco de dados...")
try:
# Conectar ao banco de dados
conn = pyodbc.connect(DB_CONNECTION_STRING)
cursor = conn.cursor()
print(f"Conexão estabelecida com sucesso!")
# Obter lista única de pedidos para deletar
unique_orders = list(set([record["Pedido"] for record in all_data]))
print(f"Total de pedidos únicos a processar: {len(unique_orders)}")
# Deletar registros existentes em lotes de 500
print(f"Deletando registros existentes dos pedidos...")
batch_size = 500
deleted_count = 0
for i in range(0, len(unique_orders), batch_size):
batch = unique_orders[i:i + batch_size]
placeholders = ','.join(['?' for _ in batch])
delete_query = f"DELETE FROM [GINSENG].[dbo].[extrato_pedidos_mar] WHERE [Pedido] IN ({placeholders})"
cursor.execute(delete_query, batch)
deleted_count += cursor.rowcount
conn.commit()
print(f" Deletados {deleted_count} registros (lote {i//batch_size + 1}/{(len(unique_orders)-1)//batch_size + 1})...")
print(f"Total de registros deletados: {deleted_count}")
# Query de inserção
print(f"\nInserindo {len(all_data)} novos registros...")
insert_query = """
INSERT INTO [GINSENG].[dbo].[extrato_pedidos_mar]
([Loja], [Pedido], [Data], [Tipo], [Status], [Valor], [PDV], [Observacao],
[SellBOT], [SellEUD], [SellQDB], [Itens], [SKUs], [ValorTotal], [PercentualAtendido],
[ItemBU], [SKU], [DescricaoItem], [StatusItem], [PrecoUnitario],
[QtdSolicitada], [QtdAceita], [QtdAtendida], [QtdFaturada], [PercentualItem],
[NFNumero], [NFDataMAR], [DataEntrega], [CodigoAtendimento], [MotivoRecusa])
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
# Inserir dados em lote
records_inserted = 0
for record in all_data:
cursor.execute(insert_query,
record["Loja"], # varchar
record["Pedido"], # varchar
to_date(record["Data"]), # date
record["Tipo"], # varchar
record["Status"], # nvarchar
to_decimal(record["Valor (R$)"]), # decimal
record["PDV"] if record["PDV"] else None, # varchar
record["Observação"] if record["Observação"] else None, # varchar
record["Sell BOT"] if record["Sell BOT"] else None, # varchar
record["Sell EUD"] if record["Sell EUD"] else None, # varchar
record["Sell QDB"] if record["Sell QDB"] else None, # varchar
to_int(record["Itens"]), # int
str(record["SKUs"]) if record["SKUs"] else None, # varchar (não int!)
to_decimal(record["Valor Total"]), # decimal
record["% Atendido"] if record["% Atendido"] else None, # varchar
record["Item BU"] if record["Item BU"] else None, # varchar
record["SKU"] if record["SKU"] else None, # varchar
record["Descrição Item"] if record["Descrição Item"] else None, # nvarchar
record["Status Item"] if record["Status Item"] else None, # varchar
to_decimal(record["Preço Unit."]), # decimal
to_int(record["Qtd Solicitada"]), # int
to_int(record["Qtd Aceita"]), # int
to_int(record["Qtd Atendida"]), # int
to_int(record["Qtd Faturada"]), # int
to_int(record["% Item"]), # int
record["NF Número"] if record["NF Número"] else None, # varchar
to_date(record["NF Data MAR"]), # date
to_date(record["Data Entrega"]), # date
record["Código Atendimento"] if record["Código Atendimento"] else None, # varchar
record["Motivo Recusa"] if record["Motivo Recusa"] else None # varchar
)
records_inserted += 1
# Commit a cada 100 registros para melhor performance
if records_inserted % 100 == 0:
conn.commit()
print(f" {records_inserted}/{len(all_data)} registros inseridos...")
# Commit final
conn.commit()
print(f"\n✓ Dados salvos com sucesso no banco de dados!")
print(f"Total de registros inseridos: {records_inserted}")
# Fechar conexão
cursor.close()
conn.close()
except Exception as e:
print(f"\n✗ Erro ao salvar no banco de dados: {e}")
else:
print("\nNenhum dado encontrado para salvar.")