This commit is contained in:
daniel.rodrigues 2025-10-31 11:28:12 -03:00
parent fbf60fed57
commit 290219632d

View File

@ -1,14 +1,11 @@
import requests
import json
import pyodbc
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, Optional
class RGBTokenClient:
"""Cliente para obter token JWT da API do Grupo Boticário"""
"""Cliente para obter token JWT da API do Grupo Boticário (execução única)"""
def __init__(self):
self.base_url = "https://api.grupoboticario.com.br/global/v2/jwt-token/token"
@ -28,21 +25,8 @@ class RGBTokenClient:
'Encrypt=yes'
)
# Controle de token e renovação
self.current_token = None
self.token_expires_at = None
self.renewal_threshold = 200 # Renovar quando restam 200 segundos
self.running = False
self.renewal_thread = None
def _get_available_sql_server_driver(self) -> str:
"""
Detecta automaticamente o driver SQL Server disponível no sistema
Returns:
str: Nome do driver disponível
"""
# Lista de drivers SQL Server em ordem de preferência
"""Detecta automaticamente o driver SQL Server disponível"""
drivers_to_try = [
'{ODBC Driver 18 for SQL Server}',
'{ODBC Driver 17 for SQL Server}',
@ -54,257 +38,97 @@ class RGBTokenClient:
]
available_drivers = pyodbc.drivers()
print(f"Drivers ODBC disponíveis no sistema:")
for driver in available_drivers:
print(f" - {driver}")
print("Drivers ODBC disponíveis:")
for d in available_drivers:
print(f" - {d}")
# Procurar o melhor driver disponível
for preferred_driver in drivers_to_try:
driver_name = preferred_driver.strip('{}')
if driver_name in available_drivers:
print(f"? Usando driver: {preferred_driver}")
return preferred_driver
for preferred in drivers_to_try:
if preferred.strip("{}") in available_drivers:
print(f"✅ Usando driver: {preferred}")
return preferred
# Se nenhum driver SQL Server for encontrado, usar o primeiro disponível
if available_drivers:
fallback_driver = f"{{{available_drivers[0]}}}"
print(f"?? Nenhum driver SQL Server encontrado. Tentando com: {fallback_driver}")
return fallback_driver
fallback = f"{{{available_drivers[0]}}}"
print(f"⚠️ Nenhum driver padrão encontrado. Usando: {fallback}")
return fallback
# Se não há drivers disponíveis
print("? Nenhum driver ODBC encontrado no sistema!")
raise Exception("Nenhum driver ODBC disponível. Instale o Microsoft ODBC Driver for SQL Server.")
raise Exception("Nenhum driver ODBC encontrado no sistema.")
def get_token(self) -> Optional[Dict]:
"""
Faz requisição para obter o token JWT
Returns:
Dict com a resposta da API ou None em caso de erro''
"""
def get_token(self):
"""Obtém o token JWT da API"""
try:
# Parâmetros da requisição
params = {
"grant_type": "client_credentials"
}
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Solicitando novo token...")
# Body da requisição com as credenciais
data = {
"client_id": self.client_id,
"client_secret": self.client_secret
}
# Headers
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json"
}
print(f"Fazendo requisição para: {self.base_url}")
print(f"Parâmetros: {params}")
# Fazer a requisição POST
response = requests.post(
self.base_url,
params=params,
data=data,
headers=headers,
params={"grant_type": "client_credentials"},
data={
"client_id": self.client_id,
"client_secret": self.client_secret
},
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json"
},
timeout=30
)
print(f"Status Code: {response.status_code}")
# Verificar se a requisição foi bem-sucedida
if response.status_code == 200:
token_data = response.json()
print("Token obtido com sucesso!")
data = response.json()
token = data.get("access_token")
expires_in = data.get("expires_in", 0)
expira = datetime.now() + timedelta(seconds=expires_in)
# Atualizar informações do token atual
if "access_token" in token_data:
self.current_token = token_data["access_token"]
# Calcular quando o token expira
if "expires_in" in token_data:
expires_in = token_data["expires_in"]
self.token_expires_at = datetime.now() + timedelta(seconds=expires_in)
print(f"Token expira em: {self.token_expires_at.strftime('%Y-%m-%d %H:%M:%S')}")
return token_data
print(f"✅ Token obtido com sucesso! Expira em: {expira}")
return token
else:
print(f"Erro na requisição: {response.status_code}")
print(f"❌ Erro na requisição: {response.status_code}")
print(f"Resposta: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"Erro na requisição: {e}")
return None
except json.JSONDecodeError as e:
print(f"Erro ao decodificar JSON: {e}")
return None
except Exception as e:
print(f"Erro inesperado: {e}")
print(f"❌ Erro ao obter token: {e}")
return None
def insert_token_to_database(self, token: str) -> bool:
"""
Atualiza o token no banco de dados SQL Server (ID = 1)
Args:
token (str): Token a ser atualizado no banco
Returns:
bool: True se atualização foi bem-sucedida, False caso contrário
"""
def save_token(self, token: str) -> bool:
"""Atualiza o token no banco de dados"""
try:
print(f"Conectando ao banco de dados...")
print("Conectando ao banco de dados...")
conn = pyodbc.connect(self.connection_string)
cursor = conn.cursor()
# Estabelecer conexão com o banco
connection = pyodbc.connect(self.connection_string)
cursor = connection.cursor()
query = "UPDATE dbo.rgb_token SET token = ?, updatedAt = GETDATE() WHERE id = 1"
cursor.execute(query, token)
conn.commit()
# Query de atualização para o ID 1
update_query = "UPDATE dbo.rgb_token SET token = ?, updatedAt = GETDATE() WHERE id = 1"
print(f"Atualizando token no banco de dados (ID = 1)...")
# Executar a atualização
cursor.execute(update_query, token)
rows_affected = cursor.rowcount
connection.commit()
if rows_affected > 0:
print(f"Token atualizado com sucesso no banco de dados! ({rows_affected} registro(s) afetado(s))")
else:
print("Nenhum registro foi atualizado. Verifique se existe um registro com ID = 1.")
# Fechar conexões
rows = cursor.rowcount
cursor.close()
connection.close()
conn.close()
return rows_affected > 0
except pyodbc.Error as e:
print(f"Erro de banco de dados: {e}")
return False
except Exception as e:
print(f"Erro inesperado ao atualizar no banco: {e}")
return False
def get_token_and_save(self) -> bool:
"""
Obtém o token da API e salva no banco de dados
Returns:
bool: True se operação foi bem-sucedida, False caso contrário
"""
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Obtendo novo token...")
token_response = self.get_token()
if token_response and "access_token" in token_response:
access_token = token_response["access_token"]
# Atualizar o token no banco de dados
success = self.insert_token_to_database(access_token)
if success:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Token renovado e atualizado com sucesso!")
if rows > 0:
print(f"✅ Token atualizado com sucesso no banco ({rows} registro(s)).")
return True
else:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Falha ao atualizar token no banco.")
print("⚠️ Nenhum registro atualizado (verifique o ID = 1).")
return False
else:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Falha ao obter token da API.")
except Exception as e:
print(f"❌ Erro ao salvar token no banco: {e}")
return False
def _renewal_worker(self):
"""
Worker thread que monitora e renova o token automaticamente
"""
while self.running:
try:
if self.token_expires_at:
# Calcular tempo restante até expiração
now = datetime.now()
time_until_expiry = (self.token_expires_at - now).total_seconds()
# Se restam menos que o threshold, renovar o token
if time_until_expiry <= self.renewal_threshold:
print(f"\n[{now.strftime('%Y-%m-%d %H:%M:%S')}] ?? Token expira em {int(time_until_expiry)} segundos. Renovando...")
success = self.get_token_and_save()
if not success:
print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] ? Falha na renovação automática do token!")
else:
# Mostrar status a cada 60 segundos
if int(time_until_expiry) % 60 == 0:
print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] ?? Token válido por mais {int(time_until_expiry)} segundos")
# Verificar a cada segundo
time.sleep(1)
except Exception as e:
print(f"Erro no worker de renovação: {e}")
time.sleep(5) # Aguardar 5 segundos antes de tentar novamente
def start_auto_renewal(self):
"""
Inicia o processo de renovação automática do token
"""
if not self.running:
self.running = True
self.renewal_thread = threading.Thread(target=self._renewal_worker, daemon=True)
self.renewal_thread.start()
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ?? Sistema de renovação automática iniciado!")
def stop_auto_renewal(self):
"""
Para o processo de renovação automática do token
"""
if self.running:
self.running = False
if self.renewal_thread:
self.renewal_thread.join(timeout=2)
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ?? Sistema de renovação automática parado!")
def main():
"""Função principal com renovação automática do token"""
client = RGBTokenClient()
token = client.get_token()
print("=== Cliente RGB Token com Renovação Automática ===")
print("Obtendo token JWT da API do Grupo Boticário...\n")
# Obter o primeiro token
success = client.get_token_and_save()
if success:
print(f"\n? Token inicial obtido e salvo com sucesso!")
print(f"Token atual: {client.current_token[:50] if client.current_token else 'N/A'}...")
# Iniciar sistema de renovação automática
client.start_auto_renewal()
try:
print(f"\n?? Sistema rodando... Pressione Ctrl+C para parar")
print(f"?? Configurações:")
print(f" - Renovação automática: {client.renewal_threshold} segundos antes da expiração")
print(f" - Próxima expiração: {client.token_expires_at.strftime('%Y-%m-%d %H:%M:%S') if client.token_expires_at else 'N/A'}")
print(f"\n" + "="*60)
# Loop principal - manter o programa rodando
while True:
time.sleep(1)
except KeyboardInterrupt:
print(f"\n\n?? Interrupção detectada. Parando sistema...")
client.stop_auto_renewal()
print("Sistema finalizado com sucesso!")
if token:
client.save_token(token)
print("🎯 Execução finalizada com sucesso.")
else:
print("\n? Falha ao obter o token inicial. Verifique as configurações.")
print("❌ Falha ao obter ou salvar o token.")
if __name__ == "__main__":
main()
main()