import requests import json import pyodbc import time import threading from datetime import datetime, timedelta from typing import Dict, Optional class RGBTokenClient: """Cliente para obter token JWT da API do Grupo Boticário""" def __init__(self): self.base_url = "https://api.grupoboticario.com.br/global/v2/jwt-token/token" self.client_id = "88ymKwAUNfu06sD85i0RiokCxWGSkFBkx9ytgI5y1ZKxX3OQ" self.client_secret = "YDFz43qAzL6ApNIKVCxu3dAmS9GWOqJbcc2aPnFDkmEaBXexSpsHGfcItg56i2dE" # Configurações do banco de dados self.driver = self._get_available_sql_server_driver() self.connection_string = ( f'DRIVER={self.driver};' 'SERVER=10.77.77.10;' 'DATABASE=GINSENG;' 'UID=supginseng;' 'PWD=Iphone2513@;' 'PORT=1433;' 'TrustServerCertificate=yes;' 'Encrypt=yes' ) # Controle de token e renovação self.current_token = None self.token_expires_at = None self.renewal_threshold = 200 # Renovar quando restam 200 segundos self.running = False self.renewal_thread = None def _get_available_sql_server_driver(self) -> str: """ Detecta automaticamente o driver SQL Server disponível no sistema Returns: str: Nome do driver disponível """ # Lista de drivers SQL Server em ordem de preferência drivers_to_try = [ '{ODBC Driver 18 for SQL Server}', '{ODBC Driver 17 for SQL Server}', '{ODBC Driver 13 for SQL Server}', '{ODBC Driver 11 for SQL Server}', '{SQL Server Native Client 11.0}', '{SQL Server Native Client 10.0}', '{SQL Server}' ] available_drivers = pyodbc.drivers() print(f"Drivers ODBC disponíveis no sistema:") for driver in available_drivers: print(f" - {driver}") # Procurar o melhor driver disponível for preferred_driver in drivers_to_try: driver_name = preferred_driver.strip('{}') if driver_name in available_drivers: print(f"? Usando driver: {preferred_driver}") return preferred_driver # Se nenhum driver SQL Server for encontrado, usar o primeiro disponível if available_drivers: fallback_driver = f"{{{available_drivers[0]}}}" print(f"?? Nenhum driver SQL Server encontrado. Tentando com: {fallback_driver}") return fallback_driver # Se não há drivers disponíveis print("? Nenhum driver ODBC encontrado no sistema!") raise Exception("Nenhum driver ODBC disponível. Instale o Microsoft ODBC Driver for SQL Server.") def get_token(self) -> Optional[Dict]: """ Faz requisição para obter o token JWT Returns: Dict com a resposta da API ou None em caso de erro'' """ try: # Parâmetros da requisição params = { "grant_type": "client_credentials" } # Body da requisição com as credenciais data = { "client_id": self.client_id, "client_secret": self.client_secret } # Headers headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json" } print(f"Fazendo requisição para: {self.base_url}") print(f"Parâmetros: {params}") # Fazer a requisição POST response = requests.post( self.base_url, params=params, data=data, headers=headers, timeout=30 ) print(f"Status Code: {response.status_code}") # Verificar se a requisição foi bem-sucedida if response.status_code == 200: token_data = response.json() print("Token obtido com sucesso!") # Atualizar informações do token atual if "access_token" in token_data: self.current_token = token_data["access_token"] # Calcular quando o token expira if "expires_in" in token_data: expires_in = token_data["expires_in"] self.token_expires_at = datetime.now() + timedelta(seconds=expires_in) print(f"Token expira em: {self.token_expires_at.strftime('%Y-%m-%d %H:%M:%S')}") return token_data else: print(f"Erro na requisição: {response.status_code}") print(f"Resposta: {response.text}") return None except requests.exceptions.RequestException as e: print(f"Erro na requisição: {e}") return None except json.JSONDecodeError as e: print(f"Erro ao decodificar JSON: {e}") return None except Exception as e: print(f"Erro inesperado: {e}") return None def insert_token_to_database(self, token: str) -> bool: """ Atualiza o token no banco de dados SQL Server (ID = 1) Args: token (str): Token a ser atualizado no banco Returns: bool: True se atualização foi bem-sucedida, False caso contrário """ try: print(f"Conectando ao banco de dados...") # Estabelecer conexão com o banco connection = pyodbc.connect(self.connection_string) cursor = connection.cursor() # Query de atualização para o ID 1 update_query = "UPDATE dbo.rgb_token SET token = ?, updatedAt = GETDATE() WHERE id = 1" print(f"Atualizando token no banco de dados (ID = 1)...") # Executar a atualização cursor.execute(update_query, token) rows_affected = cursor.rowcount connection.commit() if rows_affected > 0: print(f"Token atualizado com sucesso no banco de dados! ({rows_affected} registro(s) afetado(s))") else: print("Nenhum registro foi atualizado. Verifique se existe um registro com ID = 1.") # Fechar conexões cursor.close() connection.close() return rows_affected > 0 except pyodbc.Error as e: print(f"Erro de banco de dados: {e}") return False except Exception as e: print(f"Erro inesperado ao atualizar no banco: {e}") return False def get_token_and_save(self) -> bool: """ Obtém o token da API e salva no banco de dados Returns: bool: True se operação foi bem-sucedida, False caso contrário """ print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Obtendo novo token...") token_response = self.get_token() if token_response and "access_token" in token_response: access_token = token_response["access_token"] # Atualizar o token no banco de dados success = self.insert_token_to_database(access_token) if success: print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Token renovado e atualizado com sucesso!") return True else: print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Falha ao atualizar token no banco.") return False else: print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Falha ao obter token da API.") return False def _renewal_worker(self): """ Worker thread que monitora e renova o token automaticamente """ while self.running: try: if self.token_expires_at: # Calcular tempo restante até expiração now = datetime.now() time_until_expiry = (self.token_expires_at - now).total_seconds() # Se restam menos que o threshold, renovar o token if time_until_expiry <= self.renewal_threshold: print(f"\n[{now.strftime('%Y-%m-%d %H:%M:%S')}] ?? Token expira em {int(time_until_expiry)} segundos. Renovando...") success = self.get_token_and_save() if not success: print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] ? Falha na renovação automática do token!") else: # Mostrar status a cada 60 segundos if int(time_until_expiry) % 60 == 0: print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] ?? Token válido por mais {int(time_until_expiry)} segundos") # Verificar a cada segundo time.sleep(1) except Exception as e: print(f"Erro no worker de renovação: {e}") time.sleep(5) # Aguardar 5 segundos antes de tentar novamente def start_auto_renewal(self): """ Inicia o processo de renovação automática do token """ if not self.running: self.running = True self.renewal_thread = threading.Thread(target=self._renewal_worker, daemon=True) self.renewal_thread.start() print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ?? Sistema de renovação automática iniciado!") def stop_auto_renewal(self): """ Para o processo de renovação automática do token """ if self.running: self.running = False if self.renewal_thread: self.renewal_thread.join(timeout=2) print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ?? Sistema de renovação automática parado!") def main(): """Função principal com renovação automática do token""" client = RGBTokenClient() print("=== Cliente RGB Token com Renovação Automática ===") print("Obtendo token JWT da API do Grupo Boticário...\n") # Obter o primeiro token success = client.get_token_and_save() if success: print(f"\n? Token inicial obtido e salvo com sucesso!") print(f"Token atual: {client.current_token[:50] if client.current_token else 'N/A'}...") # Iniciar sistema de renovação automática client.start_auto_renewal() try: print(f"\n?? Sistema rodando... Pressione Ctrl+C para parar") print(f"?? Configurações:") print(f" - Renovação automática: {client.renewal_threshold} segundos antes da expiração") print(f" - Próxima expiração: {client.token_expires_at.strftime('%Y-%m-%d %H:%M:%S') if client.token_expires_at else 'N/A'}") print(f"\n" + "="*60) # Loop principal - manter o programa rodando while True: time.sleep(1) except KeyboardInterrupt: print(f"\n\n?? Interrupção detectada. Parando sistema...") client.stop_auto_renewal() print("Sistema finalizado com sucesso!") else: print("\n? Falha ao obter o token inicial. Verifique as configurações.") if __name__ == "__main__": main()