commit 5abb7018476d2f5eeb0a6c7a5dea7abe367cc92b Author: daniel.rodrigues Date: Thu Oct 30 16:58:42 2025 -0300 Primeiro commit diff --git a/rgb_token_client.py b/rgb_token_client.py new file mode 100644 index 0000000..8be1f4b --- /dev/null +++ b/rgb_token_client.py @@ -0,0 +1,310 @@ +import requests +import json +import pyodbc +import time +import threading +from datetime import datetime, timedelta +from typing import Dict, Optional + + +class RGBTokenClient: + """Cliente para obter token JWT da API do Grupo Boticário""" + + def __init__(self): + self.base_url = "https://api.grupoboticario.com.br/global/v2/jwt-token/token" + self.client_id = "88ymKwAUNfu06sD85i0RiokCxWGSkFBkx9ytgI5y1ZKxX3OQ" + self.client_secret = "YDFz43qAzL6ApNIKVCxu3dAmS9GWOqJbcc2aPnFDkmEaBXexSpsHGfcItg56i2dE" + + # Configurações do banco de dados + self.driver = self._get_available_sql_server_driver() + self.connection_string = ( + f'DRIVER={self.driver};' + 'SERVER=10.77.77.10;' + 'DATABASE=GINSENG;' + 'UID=supginseng;' + 'PWD=Iphone2513@;' + 'PORT=1433;' + 'TrustServerCertificate=yes;' + 'Encrypt=yes' + ) + + # Controle de token e renovação + self.current_token = None + self.token_expires_at = None + self.renewal_threshold = 200 # Renovar quando restam 200 segundos + self.running = False + self.renewal_thread = None + + def _get_available_sql_server_driver(self) -> str: + """ + Detecta automaticamente o driver SQL Server disponível no sistema + + Returns: + str: Nome do driver disponível + """ + # Lista de drivers SQL Server em ordem de preferência + drivers_to_try = [ + '{ODBC Driver 18 for SQL Server}', + '{ODBC Driver 17 for SQL Server}', + '{ODBC Driver 13 for SQL Server}', + '{ODBC Driver 11 for SQL Server}', + '{SQL Server Native Client 11.0}', + '{SQL Server Native Client 10.0}', + '{SQL Server}' + ] + + available_drivers = pyodbc.drivers() + print(f"Drivers ODBC disponíveis no sistema:") + for driver in available_drivers: + print(f" - {driver}") + + # Procurar o melhor driver disponível + for preferred_driver in drivers_to_try: + driver_name = preferred_driver.strip('{}') + if driver_name in available_drivers: + print(f"? Usando driver: {preferred_driver}") + return preferred_driver + + # Se nenhum driver SQL Server for encontrado, usar o primeiro disponível + if available_drivers: + fallback_driver = f"{{{available_drivers[0]}}}" + print(f"?? Nenhum driver SQL Server encontrado. Tentando com: {fallback_driver}") + return fallback_driver + + # Se não há drivers disponíveis + print("? Nenhum driver ODBC encontrado no sistema!") + raise Exception("Nenhum driver ODBC disponível. Instale o Microsoft ODBC Driver for SQL Server.") + + def get_token(self) -> Optional[Dict]: + """ + Faz requisição para obter o token JWT + + Returns: + Dict com a resposta da API ou None em caso de erro'' + """ + try: + # Parâmetros da requisição + params = { + "grant_type": "client_credentials" + } + + # Body da requisição com as credenciais + data = { + "client_id": self.client_id, + "client_secret": self.client_secret + } + + # Headers + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json" + } + + print(f"Fazendo requisição para: {self.base_url}") + print(f"Parâmetros: {params}") + + # Fazer a requisição POST + response = requests.post( + self.base_url, + params=params, + data=data, + headers=headers, + timeout=30 + ) + + print(f"Status Code: {response.status_code}") + + # Verificar se a requisição foi bem-sucedida + if response.status_code == 200: + token_data = response.json() + print("Token obtido com sucesso!") + + # Atualizar informações do token atual + if "access_token" in token_data: + self.current_token = token_data["access_token"] + + # Calcular quando o token expira + if "expires_in" in token_data: + expires_in = token_data["expires_in"] + self.token_expires_at = datetime.now() + timedelta(seconds=expires_in) + print(f"Token expira em: {self.token_expires_at.strftime('%Y-%m-%d %H:%M:%S')}") + + return token_data + else: + print(f"Erro na requisição: {response.status_code}") + print(f"Resposta: {response.text}") + return None + + except requests.exceptions.RequestException as e: + print(f"Erro na requisição: {e}") + return None + except json.JSONDecodeError as e: + print(f"Erro ao decodificar JSON: {e}") + return None + except Exception as e: + print(f"Erro inesperado: {e}") + return None + + def insert_token_to_database(self, token: str) -> bool: + """ + Atualiza o token no banco de dados SQL Server (ID = 1) + + Args: + token (str): Token a ser atualizado no banco + + Returns: + bool: True se atualização foi bem-sucedida, False caso contrário + """ + try: + print(f"Conectando ao banco de dados...") + + # Estabelecer conexão com o banco + connection = pyodbc.connect(self.connection_string) + cursor = connection.cursor() + + # Query de atualização para o ID 1 + update_query = "UPDATE dbo.rgb_token SET token = ?, updatedAt = GETDATE() WHERE id = 1" + + print(f"Atualizando token no banco de dados (ID = 1)...") + + # Executar a atualização + cursor.execute(update_query, token) + rows_affected = cursor.rowcount + connection.commit() + + if rows_affected > 0: + print(f"Token atualizado com sucesso no banco de dados! ({rows_affected} registro(s) afetado(s))") + else: + print("Nenhum registro foi atualizado. Verifique se existe um registro com ID = 1.") + + # Fechar conexões + cursor.close() + connection.close() + + return rows_affected > 0 + + except pyodbc.Error as e: + print(f"Erro de banco de dados: {e}") + return False + except Exception as e: + print(f"Erro inesperado ao atualizar no banco: {e}") + return False + + def get_token_and_save(self) -> bool: + """ + Obtém o token da API e salva no banco de dados + + Returns: + bool: True se operação foi bem-sucedida, False caso contrário + """ + print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Obtendo novo token...") + + token_response = self.get_token() + + if token_response and "access_token" in token_response: + access_token = token_response["access_token"] + + # Atualizar o token no banco de dados + success = self.insert_token_to_database(access_token) + + if success: + print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Token renovado e atualizado com sucesso!") + return True + else: + print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Falha ao atualizar token no banco.") + return False + else: + print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Falha ao obter token da API.") + return False + + def _renewal_worker(self): + """ + Worker thread que monitora e renova o token automaticamente + """ + while self.running: + try: + if self.token_expires_at: + # Calcular tempo restante até expiração + now = datetime.now() + time_until_expiry = (self.token_expires_at - now).total_seconds() + + # Se restam menos que o threshold, renovar o token + if time_until_expiry <= self.renewal_threshold: + print(f"\n[{now.strftime('%Y-%m-%d %H:%M:%S')}] ?? Token expira em {int(time_until_expiry)} segundos. Renovando...") + + success = self.get_token_and_save() + + if not success: + print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] ? Falha na renovação automática do token!") + else: + # Mostrar status a cada 60 segundos + if int(time_until_expiry) % 60 == 0: + print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] ?? Token válido por mais {int(time_until_expiry)} segundos") + + # Verificar a cada segundo + time.sleep(1) + + except Exception as e: + print(f"Erro no worker de renovação: {e}") + time.sleep(5) # Aguardar 5 segundos antes de tentar novamente + + def start_auto_renewal(self): + """ + Inicia o processo de renovação automática do token + """ + if not self.running: + self.running = True + self.renewal_thread = threading.Thread(target=self._renewal_worker, daemon=True) + self.renewal_thread.start() + print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ?? Sistema de renovação automática iniciado!") + + def stop_auto_renewal(self): + """ + Para o processo de renovação automática do token + """ + if self.running: + self.running = False + if self.renewal_thread: + self.renewal_thread.join(timeout=2) + print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ?? Sistema de renovação automática parado!") + + +def main(): + """Função principal com renovação automática do token""" + client = RGBTokenClient() + + print("=== Cliente RGB Token com Renovação Automática ===") + print("Obtendo token JWT da API do Grupo Boticário...\n") + + # Obter o primeiro token + success = client.get_token_and_save() + + if success: + print(f"\n? Token inicial obtido e salvo com sucesso!") + print(f"Token atual: {client.current_token[:50] if client.current_token else 'N/A'}...") + + # Iniciar sistema de renovação automática + client.start_auto_renewal() + + try: + print(f"\n?? Sistema rodando... Pressione Ctrl+C para parar") + print(f"?? Configurações:") + print(f" - Renovação automática: {client.renewal_threshold} segundos antes da expiração") + print(f" - Próxima expiração: {client.token_expires_at.strftime('%Y-%m-%d %H:%M:%S') if client.token_expires_at else 'N/A'}") + print(f"\n" + "="*60) + + # Loop principal - manter o programa rodando + while True: + time.sleep(1) + + except KeyboardInterrupt: + print(f"\n\n?? Interrupção detectada. Parando sistema...") + client.stop_auto_renewal() + print("Sistema finalizado com sucesso!") + + else: + print("\n? Falha ao obter o token inicial. Verifique as configurações.") + + +if __name__ == "__main__": + main() \ No newline at end of file