G-Scripts/ruptura.py
daniel.rodrigues 0ac2401d76 att
2026-02-11 12:08:26 -03:00

603 lines
21 KiB
Python

import requests
import json
from datetime import datetime, timedelta
import time
import pyodbc
import pandas as pd
import os
from io import BytesIO
# ==============================
# 1) Buscar token dinâmico
# ==============================
def get_token():
url = "https://api.grupoginseng.com.br/api/tokens"
response = requests.get(url)
if response.status_code != 200:
raise Exception(f"Erro ao buscar token: {response.status_code} {response.text}")
data = response.json()
# Extrair o token dentro de data[0]["token"]
token = data["data"][0]["token"]
return token
# ==============================
# 2) Buscar cycles
# ==============================
def get_cycles(token):
url = "https://api-extranet.grupoboticario.digital/api/v2/cycles"
headers = {
"accept": "application/json, text/plain, */*",
"authorization": token,
"user-agent": "Mozilla/5.0"
}
response = requests.get(url, headers=headers)
return response.json()
# ==============================
# 3) Determinar ciclo atual formatado para cada marca
# ==============================
def ciclo_formatado_por_marca(cycles_json):
hoje = datetime.now()
ano = hoje.year
ignorar = [ "VD Multimarca"] # Marcas excluídas
MAPA_NOMES = {
"O Boticário": "BOT",
"O.U.I": "OUI",
"Quem Disse Berenice": "QDB",
"Eudora": "EUD"
}
resultado = {}
for brand in cycles_json["data"]:
nome_marca = brand["brandName"]
# Pula marcas indesejadas
if nome_marca in ignorar:
continue
ciclo_formatado = None
for cycle in brand["cycles"]:
start = datetime.fromisoformat(cycle["startDate"])
end = datetime.fromisoformat(cycle["endDate"])
if cycle["isCurrent"] or (start <= hoje <= end):
ciclo_num = f"{cycle['number']:02d}"
ciclo_formatado = int(f"{ano}{ciclo_num}")
break
nome_final = MAPA_NOMES.get(nome_marca, nome_marca) # usa abreviação
resultado[nome_final] = ciclo_formatado
return resultado
# ==============================
# 4) Fazer a requisição do EXPORT
# ==============================
def export_with_token(token, business_unit, cycle):
"""
Faz a requisição de exportação para uma marca específica.
Args:
token: Token de autenticação
business_unit: Unidade de negócio (BOT, OUI, QDB)
cycle: Ciclo formatado (ex: 202516)
Returns:
ID da requisição se sucesso, None caso contrário
"""
url = "https://mar-api-gateway-front.demanda-abastecimento.grupoboticario.digital/orders-bff/api/export/RUPTURE_INDICATOR_PAGE"
headers = {
"accept": "*/*",
"accept-language": "pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7",
"authorization": token,
"content-type": "application/json",
"origin": "https://extranet.grupoboticario.com.br",
"priority": "u=1, i",
"referer": "https://extranet.grupoboticario.com.br/",
"sec-ch-ua": "\"Chromium\";v=\"142\", \"Google Chrome\";v=\"142\", \"Not_A Brand\";v=\"99\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "cross-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"x-correlation-id": "bd6f2e2d-c7ef-48d0-a0f5-e40de830442b",
"x-user-id": "163165",
"x-username": "daniel.rodrigue"
}
payload = {
"storeCodes": [],
"cpId": 10269,
"fileType": "XLSX",
"userId": "163165",
"metadata": {
"storeCodes": [],
"userName": "Daniel Jose Medeiros Rodrigues",
"fileFormattedName": f"Indicadore de Ruptura {business_unit} {datetime.now().strftime('%d-%m-%Y')}.XLSX",
"create_at": datetime.now().isoformat() + "Z",
"exportType": "RUPTURE_INDICATOR_PAGE"
},
"filters": {
"cycles": [cycle],
"businessUnit": business_unit
},
"validateOnRequest": True
}
print(f"\n{'='*50}")
print(f"Exportando: {business_unit} - Ciclo {cycle}")
print(f"{'='*50}")
response = requests.post(url, headers=headers, json=payload)
print("Status:", response.status_code)
print("Resposta:")
print(response.text)
# Retornar o ID da requisição se a exportação foi criada
if response.status_code == 201:
response_data = response.json()
return response_data.get("id")
return None
# ==============================
# 5) Verificar status da exportação
# ==============================
def check_export_status(token, request_id, max_attempts=300, interval=5):
"""
Verifica o status da exportação periodicamente até que seja concluída.
Args:
token: Token de autenticação
request_id: ID da requisição de exportação
max_attempts: Número máximo de tentativas (padrão: 60 = 5 minutos)
interval: Intervalo entre verificações em segundos (padrão: 5)
Returns:
True se a exportação foi concluída com sucesso, False caso contrário
"""
url = "https://mar-api-gateway-front.demanda-abastecimento.grupoboticario.digital/orders-bff/api/export/RUPTURE_INDICATOR_PAGE/sidebar?userId=163165&viewed=true"
headers = {
"accept": "*/*",
"accept-language": "pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7",
"authorization": token,
"content-type": "application/json",
"origin": "https://extranet.grupoboticario.com.br",
"priority": "u=1, i",
"referer": "https://extranet.grupoboticario.com.br/",
"sec-ch-ua": "\"Chromium\";v=\"142\", \"Google Chrome\";v=\"142\", \"Not_A Brand\";v=\"99\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "cross-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"x-correlation-id": "788106b9-3985-44a5-aba6-1e0f2d1eff6c",
"x-user-id": "163165",
"x-username": "daniel.rodrigue"
}
print("\n" + "="*50)
print("Verificando status da exportação...")
print("="*50)
attempt = 0
while attempt < max_attempts:
attempt += 1
response = requests.get(url, headers=headers)
print(f"\n[Tentativa {attempt}/{max_attempts}] Status da verificação: {response.status_code}")
if response.status_code == 200:
data = response.json()
items = data.get("items", [])
# Procurar pelo requestId na lista
for item in items:
if item.get("requestId") == request_id:
status = item.get('status')
print(f"✓ Exportação encontrada!")
print(f" Request ID: {item.get('requestId')}")
print(f" Status: {status}")
print(f" Nome do arquivo: {item.get('fileName', 'N/A')}")
print(f" Mensagem: {item.get('message', 'N/A')}")
if status == 'SUCCESS':
print(f"\n{'='*50}")
print(f"✓✓ EXPORTAÇÃO CONCLUÍDA COM SUCESSO! ✓✓")
print(f"{'='*50}")
# Retornar o nome do arquivo junto com o sucesso
file_name = item.get('fileName', 'arquivo_exportado.csv')
return True, file_name
elif status == 'FAILED' or status == 'ERROR':
print(f"\n{'='*50}")
print(f"✗✗ EXPORTAÇÃO FALHOU! ✗✗")
print(f"{'='*50}")
return False, None
else:
print(f"⚠ Exportação ainda em processamento...")
print(f"Aguardando {interval} segundos para próxima verificação...")
time.sleep(interval)
break
else:
# Se não encontrou o item na lista
print(f"✗ Exportação com ID '{request_id}' não encontrada na lista.")
print(f"Total de itens na lista: {len(items)}")
print(f"Aguardando {interval} segundos para próxima verificação...")
time.sleep(interval)
else:
print(f"Erro ao verificar status: {response.text}")
print(f"Aguardando {interval} segundos para próxima verificação...")
time.sleep(interval)
print(f"\n{'='*50}")
print(f"✗ Tempo limite excedido após {max_attempts} tentativas.")
print(f"{'='*50}")
return False, None
# ==============================
# 6) Conectar ao banco de dados
# ==============================
def get_db_connection():
"""
Cria e retorna uma conexão com o banco de dados SQL Server.
Returns:
Conexão pyodbc
"""
conn = pyodbc.connect(
'DRIVER={ODBC Driver 18 for SQL Server};'
'SERVER=10.77.77.10;'
'DATABASE=GINSENG;'
'UID=supginseng;'
'PWD=Ginseng@;'
'PORT=1433;'
'TrustServerCertificate=yes'
)
return conn
# ==============================
# 7) Baixar arquivo e inserir no banco
# ==============================
def download_and_insert_to_db(token, request_id, file_name, business_unit, cycle):
"""
Faz o download do arquivo exportado e insere os dados no banco de dados.
Remove dados existentes para a mesma data, marca e ciclo antes de inserir.
Args:
token: Token de autenticação
request_id: ID da requisição de exportação
file_name: Nome do arquivo (usado apenas para referência)
business_unit: Unidade de negócio (BOT, OUI, QDB)
cycle: Ciclo formatado (ex: 202516)
Returns:
True se o download e inserção foram bem-sucedidos, False caso contrário
"""
url = f"https://mar-api-gateway-front.demanda-abastecimento.grupoboticario.digital/orders-bff/api/export/{request_id}/download?redirect=false"
headers = {
"accept": "*/*",
"accept-language": "pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7",
"authorization": token,
"origin": "https://extranet.grupoboticario.com.br",
"priority": "u=1, i",
"referer": "https://extranet.grupoboticario.com.br/",
"sec-ch-ua": "\"Chromium\";v=\"142\", \"Google Chrome\";v=\"142\", \"Not_A Brand\";v=\"99\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "cross-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"x-correlation-id": "788106b9-3985-44a5-aba6-1e0f2d1eff6c",
"x-user-id": "163165",
"x-username": "daniel.rodrigue"
}
print("\n" + "="*50)
print("Iniciando download e inserção no banco...")
print("="*50)
print(f"Request ID: {request_id}")
print(f"Arquivo: {file_name}")
try:
# Primeira requisição: obter a URL do S3
response = requests.get(url, headers=headers)
print(f"Status da requisição: {response.status_code}")
if response.status_code == 200:
# Parsear o JSON para obter a URL do arquivo
data = response.json()
file_url = data.get("fileUrl")
expire_in = data.get("expireIn")
if not file_url:
print(f"\n{'='*50}")
print(f"✗✗ URL DO ARQUIVO NÃO ENCONTRADA! ✗✗")
print(f"{'='*50}")
print(f"Resposta: {response.text}")
return False
print(f"URL do arquivo obtida com sucesso!")
print(f"Expira em: {expire_in}")
print(f"\nBaixando arquivo do S3...")
# Segunda requisição: baixar o arquivo do S3
file_response = requests.get(file_url)
print(f"Status do download: {file_response.status_code}")
if file_response.status_code == 200:
print(f"✓ Arquivo baixado com sucesso!")
print(f"\nLendo arquivo Excel...")
# Ler o arquivo Excel diretamente da memória
excel_data = BytesIO(file_response.content)
df = pd.read_excel(excel_data, engine='openpyxl')
print(f"✓ Arquivo lido com sucesso!")
print(f"Total de linhas: {len(df)}")
# Mapeamento das colunas do Excel para as colunas do banco
column_mapping = {
'Ciclo': 'Ciclo',
'Ponto de Venda': 'PontoDeVenda',
'Canal': 'Canal',
'SKU': 'SKU',
'Descrição': 'Descricao',
'Categoria': 'Categoria',
'Marca': 'Marca',
'Classe': 'Classe',
'Valor da Receita (R$)': 'ValorReceita',
'Valor da Ruptura (R$)': 'ValorRuptura',
'Percentual da Ruptura (%)': 'PercentualRuptura',
'Quantidade de Ruptura': 'QuantidadeRuptura',
'Macro Causa': 'MacroCausa',
'Origem Ruptura': 'OrigemRuptura'
}
# Renomear as colunas
df.rename(columns=column_mapping, inplace=True)
print(f"\nConectando ao banco de dados...")
conn = get_db_connection()
cursor = conn.cursor()
print(f"✓ Conectado ao banco de dados!")
# Calcular a data de ruptura (dia anterior)
dt_ruptura = (datetime.now() - timedelta(days=1)).date()
print(f"Data de ruptura: {dt_ruptura}")
# Deletar dados existentes para a mesma data, marca e ciclo
print(f"\nVerificando dados existentes para {business_unit} - Ciclo {cycle} em {dt_ruptura}...")
delete_query = """
DELETE FROM RupturaG
WHERE dt_ruptura = ? AND Marca = ? AND Ciclo = ?
"""
cursor.execute(delete_query, dt_ruptura, business_unit, cycle)
rows_deleted = cursor.rowcount
conn.commit()
if rows_deleted > 0:
print(f"{rows_deleted} registros anteriores removidos para {business_unit} - Ciclo {cycle} em {dt_ruptura}")
else:
print(f"✓ Nenhum registro anterior encontrado para {business_unit} - Ciclo {cycle} em {dt_ruptura}")
print(f"\nInserindo dados na tabela RupturaG...")
# Inserir os dados linha por linha
insert_query = """
INSERT INTO RupturaG (
Ciclo, PontoDeVenda, Canal, SKU, Descricao, Categoria,
Marca, Classe, ValorReceita, ValorRuptura, PercentualRuptura,
QuantidadeRuptura, MacroCausa, OrigemRuptura, dt_ruptura
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
rows_inserted = 0
for index, row in df.iterrows():
try:
cursor.execute(insert_query,
row['Ciclo'],
row['PontoDeVenda'],
row['Canal'],
row['SKU'],
row['Descricao'],
row['Categoria'],
row['Marca'],
row['Classe'],
row['ValorReceita'],
row['ValorRuptura'],
row['PercentualRuptura'],
row['QuantidadeRuptura'],
row['MacroCausa'],
row['OrigemRuptura'],
dt_ruptura
)
rows_inserted += 1
# Commit a cada 1000 linhas para melhor performance
if rows_inserted % 1000 == 0:
conn.commit()
print(f"{rows_inserted} linhas inseridas...")
except Exception as e:
print(f" ✗ Erro ao inserir linha {index}: {str(e)}")
continue
# Commit final
conn.commit()
cursor.close()
conn.close()
print(f"\n{'='*50}")
print(f"✓✓ DADOS INSERIDOS NO BANCO COM SUCESSO! ✓✓")
print(f"{'='*50}")
print(f"Total de linhas inseridas: {rows_inserted}/{len(df)}")
return True
else:
print(f"\n{'='*50}")
print(f"✗✗ ERRO AO BAIXAR ARQUIVO DO S3! ✗✗")
print(f"{'='*50}")
print(f"Status: {file_response.status_code}")
return False
else:
print(f"\n{'='*50}")
print(f"✗✗ ERRO AO OBTER URL DO ARQUIVO! ✗✗")
print(f"{'='*50}")
print(f"Resposta: {response.text}")
return False
except Exception as e:
print(f"\n{'='*50}")
print(f"✗✗ ERRO AO PROCESSAR ARQUIVO! ✗✗")
print(f"{'='*50}")
print(f"Erro: {str(e)}")
import traceback
traceback.print_exc()
return False
# ==============================
# 7) Processar exportação de uma marca
# ==============================
def processar_marca(token, business_unit, cycle):
"""
Processa a exportação completa de uma marca (exportar, verificar, baixar).
Args:
token: Token de autenticação
business_unit: Unidade de negócio (BOT, OUI, QDB)
cycle: Ciclo formatado (ex: 202516)
Returns:
True se todo o processo foi bem-sucedido, False caso contrário
"""
print(f"\n{'#'*300}")
print(f"# PROCESSANDO MARCA: {business_unit} - CICLO: {cycle}")
print(f"{'#'*300}")
# 1. Enviar requisição de exportação
request_id = export_with_token(token, business_unit, cycle)
if not request_id:
print(f"\n✗ Não foi possível obter o ID da requisição para {business_unit}.")
return False
print(f"\nID da requisição gerado: {request_id}")
# 2. Verificar o status da exportação
success, file_name = check_export_status(token, request_id)
if not success:
print(f"\n✗ Exportação não foi concluída com sucesso para {business_unit}.")
return False
if not file_name:
print(f"\n⚠ Exportação concluída, mas nome do arquivo não disponível para {business_unit}.")
return False
# 3. Fazer o download do arquivo e inserir no banco
download_success = download_and_insert_to_db(token, request_id, file_name, business_unit, cycle)
if download_success:
print(f"\n{'='*60}")
print(f"✓✓✓ MARCA {business_unit} PROCESSADA COM SUCESSO! ✓✓✓")
print(f"{'='*60}")
return True
else:
print(f"\n✗ Erro ao processar arquivo para {business_unit}.")
return False
# ==============================
# EXECUTAR
# ==============================
if __name__ == "__main__":
print("="*60)
print("INICIANDO PROCESSO DE EXPORTAÇÃO DE RUPTURAS")
print("="*60)
# 1. Buscar token
print("\n[1/3] Buscando token...")
token = get_token()
print("✓ Token obtido com sucesso!")
# 2. Buscar ciclos de todas as marcas
print("\n[2/3] Buscando ciclos das marcas...")
cycles_json = get_cycles(token)
ciclos_por_marca = ciclo_formatado_por_marca(cycles_json)
print("✓ Ciclos obtidos:")
for marca, ciclo in ciclos_por_marca.items():
print(f" - {marca}: {ciclo}")
# 3. Processar cada marca sequencialmente
print("\n[3/3] Processando exportações...")
resultados = {}
for business_unit, cycle in ciclos_por_marca.items():
if cycle is None:
print(f"\n⚠ Ciclo não encontrado para {business_unit}, pulando...")
resultados[business_unit] = False
continue
sucesso = processar_marca(token, business_unit, cycle)
resultados[business_unit] = sucesso
# Pequena pausa entre marcas para não sobrecarregar a API
if business_unit != list(ciclos_por_marca.keys())[-1]: # Se não for a última marca
print(f"\n{'~'*60}")
print("Aguardando 3 segundos antes de processar próxima marca...")
print(f"{'~'*60}")
time.sleep(3)
# Resumo final
print(f"\n{'='*60}")
print("RESUMO FINAL")
print(f"{'='*60}")
for marca, sucesso in resultados.items():
status = "✓ SUCESSO" if sucesso else "✗ FALHOU"
print(f"{marca}: {status}")
total_sucesso = sum(1 for s in resultados.values() if s)
total_marcas = len(resultados)
print(f"\nTotal: {total_sucesso}/{total_marcas} marcas processadas com sucesso")
print(f"{'='*60}")