import requests from itertools import product import pyodbc import time # =============================== # CONFIGURAÇÕES # =============================== TOKEN_API_URL = "https://api.grupoginseng.com.br/api/tokens" PDVS_API_URL = "https://api.grupoginseng.com.br/pdvs" ORDERS_URL = ( "https://mar-api-gateway-front.demanda-abastecimento.grupoboticario.digital/" "orders-bff/api/tracking/v5/orders" ) SUMMARY_URL = ( "https://mar-api-gateway-front.demanda-abastecimento.grupoboticario.digital/" "orders-bff/api/customer-orders/summary/" ) ITEMS_URL = ( "https://mar-api-gateway-front.demanda-abastecimento.grupoboticario.digital/" "orders-bff/api/customer-orders/v2/items/" ) PARAMS_BASE = { "sort": "orderid,desc" } # =============================== # CONFIGURAÇÕES DO BANCO DE DADOS # =============================== DB_DRIVER = "ODBC Driver 17 for SQL Server" DB_CONNECTION_STRING = ( f'DRIVER={{{DB_DRIVER}}};' 'SERVER=10.77.77.10;' 'DATABASE=GINSENG;' 'UID=supginseng;' 'PWD=Iphone2513@;' 'PORT=1433;' 'TrustServerCertificate=yes;' 'Encrypt=yes' ) # =============================== # CONFIGURAÇÕES DE EXECUÇÃO # =============================== REQUEST_DELAY = 0.5 # Delay em segundos entre requisições para evitar rate limiting TOKEN_REFRESH_INTERVAL = 50 # Renovar token a cada N lojas processadas FAILED_STORES_MAX_RETRIES = 3 # Número máximo de tentativas para lojas que falharam FAILED_STORES_RETRY_DELAY = 180 # Tempo de espera (em segundos) antes de tentar novamente lojas que falharam # =============================== # FUNÇÕES AUXILIARES # =============================== def get_new_token(): """ Obtém um novo token da API Retorna o token ou None em caso de erro """ try: print("\n[TOKEN] Obtendo novo token...") token_response = requests.get(TOKEN_API_URL, timeout=30) token_response.raise_for_status() token = token_response.json()["data"][0]["token"] print("[TOKEN] Token renovado com sucesso!") return token except Exception as e: print(f"[TOKEN] Erro ao obter token: {e}") return None def wrap(text, width=30): """Mantém o texto sem quebras para Excel""" if not text: return "" return str(text) def format_money(value): """ Formata valor monetário evitando notação científica """ try: return f"{float(value):,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") except Exception: return "" def to_decimal(value): """ Converte valor para decimal (float) para inserção no banco """ if not value or value == "": return None try: # Remove formatação de moeda brasileira se houver if isinstance(value, str): value = value.replace(".", "").replace(",", ".") return float(value) except Exception: return None def to_int(value): """ Converte valor para inteiro para inserção no banco """ if not value or value == "": return None try: return int(value) except Exception: return None def to_date(value): """ Converte string de data para formato aceito pelo SQL Server Formato esperado: dd/mm/yyyy """ if not value or value == "" or value == "-": return None try: # Se já vier no formato dd/mm/yyyy, converte para datetime from datetime import datetime if isinstance(value, str) and "/" in value: parts = value.split("/") if len(parts) == 3: # Converte dd/mm/yyyy para yyyy-mm-dd return f"{parts[2]}-{parts[1]}-{parts[0]}" return value except Exception: return None def expand_sell_orders(sell_orders_list): """ Garante: - 1 sellOrder por coluna - duplica linhas quando houver mais de um sellOrder por marca """ sell_map = { "BOT": [], "EUD": [], "QDB": [] } for item in sell_orders_list: bu = item.get("businessUnit") orders = item.get("sellOrders", []) if bu in sell_map: sell_map[bu] = orders or [""] # garante pelo menos 1 linha for k in sell_map: if not sell_map[k]: sell_map[k] = [""] return list(product( sell_map["BOT"], sell_map["EUD"], sell_map["QDB"] )) def process_store_with_retry(store, headers, max_retries=3, retry_delay=5, token_refresh_callback=None): """ Processa uma loja com tentativas de retry em caso de erro Retorna tupla (lista de dados da loja, needs_token_refresh) needs_token_refresh indica se o token precisa ser renovado """ needs_token_refresh = False for attempt in range(max_retries): # IMPORTANTE: Limpar dados a cada tentativa para evitar duplicação # caso uma tentativa anterior tenha processado pedidos parcialmente antes de falhar store_data = [] try: # Primeira chamada para obter totalElements e calcular total de páginas params = PARAMS_BASE.copy() params["page"] = 0 initial_response = requests.get( ORDERS_URL, headers=headers, params=params, timeout=30 ) if initial_response.status_code != 200: print(f" [ERRO] Status code: {initial_response.status_code}") # Se for erro de autenticação, sinaliza para renovar token if initial_response.status_code in [401, 403]: print(f" [ERRO] Token expirado ou inválido!") needs_token_refresh = True # Se tiver callback para renovar token, usa if token_refresh_callback and attempt < max_retries - 1: new_token = token_refresh_callback() if new_token: headers["authorization"] = new_token headers["x-authorization"] = new_token needs_token_refresh = False print(f" [TOKEN] Tentando novamente com novo token...") time.sleep(retry_delay) continue if attempt < max_retries - 1: print(f" Tentativa {attempt + 1}/{max_retries} falhou. Aguardando {retry_delay}s antes de tentar novamente...") time.sleep(retry_delay) continue else: print(f" Erro ao buscar informações da loja {store} após {max_retries} tentativas") return [], needs_token_refresh initial_data = initial_response.json() total_elements = initial_data.get("totalElements", 0) total_pages = initial_data.get("totalPages", 1) page_size = initial_data.get("size", 25) print(f" Total de pedidos: {total_elements}") print(f" Total de páginas disponíveis: {total_pages}") print(f" Tamanho da página: {page_size}") # Buscar apenas página 0 pages_to_fetch = [0] print(f" Buscando páginas: {pages_to_fetch}") for page in pages_to_fetch: print(f" Buscando página {page}...") # Delay entre requisições para evitar rate limiting time.sleep(REQUEST_DELAY) params = PARAMS_BASE.copy() params["page"] = page response = requests.get( ORDERS_URL, headers=headers, params=params, timeout=30 ) if response.status_code != 200: print(f" [ERRO] Erro ao buscar página {page} da loja {store} - Status: {response.status_code}") # Se for erro de autenticação na página if response.status_code in [401, 403]: print(f" [ERRO] Token expirado durante busca de página!") needs_token_refresh = True continue orders = response.json().get("content", []) for order in orders: order_id = order.get("orderId") print(f" Processando pedido {order_id}...") # Delay entre requisições time.sleep(REQUEST_DELAY) summary_response = requests.get( f"{SUMMARY_URL}{order_id}", headers=headers, timeout=30 ) total_items = "" total_skus = "" total_value = "" percent_txt = [] sell_combinations = [("", "", "")] if summary_response.status_code == 200: summary = summary_response.json() order_summary = summary.get("orderSummary", {}) totals = summary.get("orderTotals", {}) sell_combinations = expand_sell_orders( order_summary.get("order", {}).get("sellOrdersList", []) ) total_items = totals.get("totalItems", "") total_skus = totals.get("totalSKUs", "") total_value = format_money(totals.get("totalOrderValue", "")) for p in totals.get("totalPerBrand", []): percent_txt.append(f"{p['businessUnit']}: {p['percentage']}%") # Buscar itens do pedido time.sleep(REQUEST_DELAY) items_response = requests.get( f"{ITEMS_URL}{order_id}", headers=headers, params={"page": 0, "sort": "business_unit,asc"}, timeout=30 ) items_list = [] if items_response.status_code == 200: items_data = items_response.json() items_list = items_data.get("content", []) # Se não houver itens, adiciona uma linha com os dados do pedido if not items_list: for sell_bot, sell_eud, sell_qdb in sell_combinations: store_data.append({ "Loja": store, "Pedido": order_id, "Data": order.get("orderDate"), "Tipo": order.get("orderType"), "Status": order.get("status"), "Valor (R$)": format_money(order.get("orderValue", 0)), "PDV": wrap(order.get("pdv")), "Observação": wrap(order.get("observation")), "Sell BOT": sell_bot, "Sell EUD": sell_eud, "Sell QDB": sell_qdb, "Itens": total_items, "SKUs": total_skus, "Valor Total": total_value, "% Atendido": wrap("\n".join(percent_txt)), # Dados do item "Item BU": "", "SKU": "", "Descrição Item": "", "Status Item": "", "Preço Unit.": "", "Qtd Solicitada": "", "Qtd Aceita": "", "Qtd Atendida": "", "Qtd Faturada": "", "% Item": "", "NF Número": "", "NF Data MAR": "", "Data Entrega": "", "Código Atendimento": "", "Motivo Recusa": "" }) else: # Adiciona uma linha para cada item do pedido for item in items_list: qty = item.get("quantity", {}) for sell_bot, sell_eud, sell_qdb in sell_combinations: store_data.append({ "Loja": store, "Pedido": order_id, "Data": order.get("orderDate"), "Tipo": order.get("orderType"), "Status": order.get("status"), "Valor (R$)": format_money(order.get("orderValue", 0)), "PDV": wrap(order.get("pdv")), "Observação": wrap(order.get("observation")), "Sell BOT": sell_bot, "Sell EUD": sell_eud, "Sell QDB": sell_qdb, "Itens": total_items, "SKUs": total_skus, "Valor Total": total_value, "% Atendido": wrap("\n".join(percent_txt)), # Dados do item "Item BU": item.get("bussinessUnit", ""), "SKU": item.get("sku", ""), "Descrição Item": wrap(item.get("description", "")), "Status Item": item.get("status", ""), "Preço Unit.": format_money(item.get("price", "")), "Qtd Solicitada": qty.get("requested", ""), "Qtd Aceita": qty.get("accepted", ""), "Qtd Atendida": qty.get("attended", ""), "Qtd Faturada": qty.get("invoiced", ""), "% Item": qty.get("percentage", ""), "NF Número": item.get("nfNumber", ""), "NF Data MAR": item.get("nfNumberMarDate", ""), "Data Entrega": item.get("deliveryDate", ""), "Código Atendimento": item.get("fullfilmentCode", ""), "Motivo Recusa": wrap(item.get("refusalReason", "")) }) # Se chegou aqui, processou com sucesso return store_data, needs_token_refresh except Exception as e: if attempt < max_retries - 1: print(f" [ERRO] Erro na tentativa {attempt + 1}/{max_retries}: {e}") print(f" Aguardando {retry_delay}s antes de tentar novamente...") time.sleep(retry_delay) else: print(f" [ERRO] Erro ao processar loja {store} após {max_retries} tentativas: {e}") return [], needs_token_refresh return [], needs_token_refresh # =============================== # 1) OBTER TOKEN # =============================== token_response = requests.get(TOKEN_API_URL, timeout=30) token_response.raise_for_status() TOKEN = token_response.json()["data"][0]["token"] print("Token obtido com sucesso") # =============================== # 2) OBTER LISTA DE LOJAS (PDVs) # =============================== print("Buscando lista de lojas...") pdvs_response = requests.get(PDVS_API_URL, timeout=30) pdvs_response.raise_for_status() pdvs_data = pdvs_response.json() STORES = [str(pdv) for pdv in pdvs_data["data"]["pdvs"]] print(f"Total de lojas encontradas: {len(STORES)}") print(f"Lojas: {', '.join(STORES[:10])}{'...' if len(STORES) > 10 else ''}") # =============================== # 3) HEADERS BASE # =============================== BASE_HEADERS = { "accept": "*/*", "accept-language": "pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7", "authorization": TOKEN, "x-authorization": TOKEN, "content-type": "application/json", "origin": "https://extranet.grupoboticario.com.br", "referer": "https://extranet.grupoboticario.com.br/", "user-agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/143.0.0.0 Safari/537.36" ), "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "cross-site", "sec-ch-ua": "\"Google Chrome\";v=\"143\", \"Chromium\";v=\"143\", \"Not A(Brand\";v=\"24\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"", "x-user-id": "163165", "x-username": "daniel.rodrigue" } # =============================== # 4) LOOP POR LOJA # =============================== def process_stores(stores_to_process, all_data, base_headers, token, is_retry=False): """ Processa uma lista de lojas e retorna os dados coletados e as lojas que falharam. Se is_retry=True, remove dados existentes da loja antes de adicionar novos (evita duplicação). """ failed = [] processed_count = 0 for store in stores_to_process: print(f"\nProcessando loja {store}... ({processed_count + 1}/{len(stores_to_process)})") # Renovar token periodicamente a cada N lojas if processed_count > 0 and processed_count % TOKEN_REFRESH_INTERVAL == 0: print(f"\n[TOKEN] Renovação preventiva após {processed_count} lojas...") new_token = get_new_token() if new_token: token = new_token base_headers["authorization"] = token base_headers["x-authorization"] = token headers = base_headers.copy() headers["storecode"] = store # Processar loja com retry e callback para renovar token store_data, needs_refresh = process_store_with_retry( store, headers, max_retries=3, retry_delay=5, token_refresh_callback=get_new_token ) # Se precisou renovar token, atualiza os headers base if needs_refresh: new_token = get_new_token() if new_token: token = new_token base_headers["authorization"] = token base_headers["x-authorization"] = token # Se é retry e a loja foi processada com sucesso, remove dados antigos dessa loja # para evitar duplicação (caso tenha coletado dados parciais antes de falhar) if is_retry and len(store_data) > 0: old_count = len(all_data) all_data[:] = [record for record in all_data if record["Loja"] != store] removed = old_count - len(all_data) if removed > 0: print(f" [RETRY] Removidos {removed} registros antigos da loja {store} para evitar duplicação") # Adicionar dados da loja ao total all_data.extend(store_data) processed_count += 1 if len(store_data) == 0: failed.append(store) print(f"Loja {store} processada: {len(store_data)} registros") return failed, token all_data = [] failed_stores = [] # Estatísticas para relatório final stats = { "total_lojas": len(STORES), "lojas_sucesso_primeira": 0, "lojas_falha_inicial": [], "lojas_recuperadas_retry": [], "lojas_falha_final": [], "registros_inseridos": 0, "registros_deletados": 0, "duplicatas_removidas": 0, "pedidos_unicos": 0 } # Primeira passagem: processar todas as lojas failed_stores, TOKEN = process_stores(STORES, all_data, BASE_HEADERS, TOKEN) # Guardar lojas que falharam na primeira tentativa stats["lojas_falha_inicial"] = failed_stores.copy() stats["lojas_sucesso_primeira"] = len(STORES) - len(failed_stores) # Retry das lojas que falharam retry_round = 0 while failed_stores and retry_round < FAILED_STORES_MAX_RETRIES: retry_round += 1 print(f"\n{'=' * 60}") print(f"[RETRY {retry_round}/{FAILED_STORES_MAX_RETRIES}] {len(failed_stores)} lojas falharam: {', '.join(failed_stores[:20])}{'...' if len(failed_stores) > 20 else ''}") print(f"Aguardando {FAILED_STORES_RETRY_DELAY} segundos antes de tentar novamente...") print(f"{'=' * 60}") time.sleep(FAILED_STORES_RETRY_DELAY) # Renovar token antes de tentar novamente print(f"\n[TOKEN] Renovando token antes do retry...") new_token = get_new_token() if new_token: TOKEN = new_token BASE_HEADERS["authorization"] = TOKEN BASE_HEADERS["x-authorization"] = TOKEN # Guardar lojas antes do retry para comparar depois lojas_antes_retry = set(failed_stores) # Tentar processar as lojas que falharam (is_retry=True para evitar duplicação) stores_to_retry = failed_stores.copy() failed_stores, TOKEN = process_stores(stores_to_retry, all_data, BASE_HEADERS, TOKEN, is_retry=True) # Identificar lojas que foram recuperadas neste retry lojas_recuperadas = lojas_antes_retry - set(failed_stores) stats["lojas_recuperadas_retry"].extend(lojas_recuperadas) if not failed_stores: print(f"\n[SUCESSO] Todas as lojas foram processadas com sucesso no retry {retry_round}!") # Guardar lojas que falharam definitivamente stats["lojas_falha_final"] = failed_stores.copy() # Resumo final das lojas que ainda falharam if failed_stores: print(f"\n{'=' * 60}") print(f"[AVISO FINAL] {len(failed_stores)} lojas falharam após {FAILED_STORES_MAX_RETRIES} tentativas de retry:") print(f" {', '.join(failed_stores)}") print(f"{'=' * 60}") # =============================== # 5) SALVAR NO BANCO DE DADOS # =============================== if all_data: # Remover duplicatas em all_data antes de inserir # Usa uma chave composta: Loja + Pedido + SKU print(f"\nVerificando duplicatas em all_data...") seen = set() unique_data = [] duplicates_removed = 0 for record in all_data: # Criar chave única baseada nos campos que identificam um registro key = ( record.get("Loja", ""), record.get("Pedido", ""), record.get("SKU", "") ) if key not in seen: seen.add(key) unique_data.append(record) else: duplicates_removed += 1 if duplicates_removed > 0: print(f"[AVISO] Removidas {duplicates_removed} linhas duplicadas de all_data") all_data = unique_data stats["duplicatas_removidas"] = duplicates_removed print(f"Total de registros após deduplicação: {len(all_data)}") print(f"\nConectando ao banco de dados...") try: # Conectar ao banco de dados conn = pyodbc.connect(DB_CONNECTION_STRING) cursor = conn.cursor() print(f"Conexão estabelecida com sucesso!") # Obter lista única de pedidos para deletar unique_orders = list(set([record["Pedido"] for record in all_data])) stats["pedidos_unicos"] = len(unique_orders) print(f"Total de pedidos únicos a processar: {len(unique_orders)}") # Deletar registros existentes em lotes de 500 print(f"Deletando registros existentes dos pedidos...") batch_size = 500 deleted_count = 0 for i in range(0, len(unique_orders), batch_size): batch = unique_orders[i:i + batch_size] placeholders = ','.join(['?' for _ in batch]) delete_query = f"DELETE FROM [GINSENG].[dbo].[extrato_pedidos_mar] WHERE [Pedido] IN ({placeholders})" cursor.execute(delete_query, batch) deleted_count += cursor.rowcount conn.commit() print(f" Deletados {deleted_count} registros (lote {i//batch_size + 1}/{(len(unique_orders)-1)//batch_size + 1})...") stats["registros_deletados"] = deleted_count print(f"Total de registros deletados: {deleted_count}") # Query de inserção print(f"\nInserindo {len(all_data)} novos registros...") insert_query = """ INSERT INTO [GINSENG].[dbo].[extrato_pedidos_mar] ([Loja], [Pedido], [Data], [Tipo], [Status], [Valor], [PDV], [Observacao], [SellBOT], [SellEUD], [SellQDB], [Itens], [SKUs], [ValorTotal], [PercentualAtendido], [ItemBU], [SKU], [DescricaoItem], [StatusItem], [PrecoUnitario], [QtdSolicitada], [QtdAceita], [QtdAtendida], [QtdFaturada], [PercentualItem], [NFNumero], [NFDataMAR], [DataEntrega], [CodigoAtendimento], [MotivoRecusa]) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ # Inserir dados em lote records_inserted = 0 for record in all_data: cursor.execute(insert_query, record["Loja"], # varchar record["Pedido"], # varchar to_date(record["Data"]), # date record["Tipo"], # varchar record["Status"], # nvarchar to_decimal(record["Valor (R$)"]), # decimal record["PDV"] if record["PDV"] else None, # varchar record["Observação"] if record["Observação"] else None, # varchar record["Sell BOT"] if record["Sell BOT"] else None, # varchar record["Sell EUD"] if record["Sell EUD"] else None, # varchar record["Sell QDB"] if record["Sell QDB"] else None, # varchar to_int(record["Itens"]), # int str(record["SKUs"]) if record["SKUs"] else None, # varchar (não int!) to_decimal(record["Valor Total"]), # decimal record["% Atendido"] if record["% Atendido"] else None, # varchar record["Item BU"] if record["Item BU"] else None, # varchar record["SKU"] if record["SKU"] else None, # varchar record["Descrição Item"] if record["Descrição Item"] else None, # nvarchar record["Status Item"] if record["Status Item"] else None, # varchar to_decimal(record["Preço Unit."]), # decimal to_int(record["Qtd Solicitada"]), # int to_int(record["Qtd Aceita"]), # int to_int(record["Qtd Atendida"]), # int to_int(record["Qtd Faturada"]), # int to_int(record["% Item"]), # int record["NF Número"] if record["NF Número"] else None, # varchar to_date(record["NF Data MAR"]), # date to_date(record["Data Entrega"]), # date record["Código Atendimento"] if record["Código Atendimento"] else None, # varchar record["Motivo Recusa"] if record["Motivo Recusa"] else None # varchar ) records_inserted += 1 # Commit a cada 100 registros para melhor performance if records_inserted % 100 == 0: conn.commit() print(f" {records_inserted}/{len(all_data)} registros inseridos...") # Commit final conn.commit() stats["registros_inseridos"] = records_inserted print(f"\n✓ Dados salvos com sucesso no banco de dados!") print(f"Total de registros inseridos: {records_inserted}") # Fechar conexão cursor.close() conn.close() except Exception as e: print(f"\n✗ Erro ao salvar no banco de dados: {e}") else: print("\nNenhum dado encontrado para salvar.") # =============================== # 6) RELATÓRIO FINAL # =============================== print("\n") print("=" * 70) print(" RELATÓRIO FINAL DE EXECUÇÃO") print("=" * 70) print("\n📊 ESTATÍSTICAS DE LOJAS:") print(f" Total de lojas processadas: {stats['total_lojas']}") print(f" Lojas com sucesso na 1ª tentativa: {stats['lojas_sucesso_primeira']}") print(f" Lojas que falharam inicialmente: {len(stats['lojas_falha_inicial'])}") print(f" Lojas recuperadas após retry: {len(stats['lojas_recuperadas_retry'])}") print(f" Lojas que falharam definitivamente: {len(stats['lojas_falha_final'])}") if stats['lojas_recuperadas_retry']: print(f"\n Lojas recuperadas no retry:") for loja in stats['lojas_recuperadas_retry']: print(f" - {loja}") if stats['lojas_falha_final']: print(f"\n Lojas com falha definitiva:") for loja in stats['lojas_falha_final']: print(f" - {loja}") print("\n📦 ESTATÍSTICAS DE DADOS:") print(f" Pedidos únicos processados: {stats['pedidos_unicos']}") print(f" Duplicatas removidas (memória): {stats['duplicatas_removidas']}") print(f" Registros deletados do banco: {stats['registros_deletados']}") print(f" Registros inseridos no banco: {stats['registros_inseridos']}") # Calcular taxa de sucesso taxa_sucesso = ((stats['total_lojas'] - len(stats['lojas_falha_final'])) / stats['total_lojas'] * 100) if stats['total_lojas'] > 0 else 0 print("\n📈 RESUMO:") print(f" Taxa de sucesso: {taxa_sucesso:.1f}%") if len(stats['lojas_falha_final']) == 0: print(f" Status: ✓ SUCESSO TOTAL") elif len(stats['lojas_falha_final']) < stats['total_lojas']: print(f" Status: ⚠ SUCESSO PARCIAL") else: print(f" Status: ✗ FALHA TOTAL") print("\n" + "=" * 70) print(" FIM DO RELATÓRIO") print("=" * 70)