From 290219632d93e53f01b0e7c73016016605fb6a99 Mon Sep 17 00:00:00 2001 From: "daniel.rodrigues" Date: Fri, 31 Oct 2025 11:28:12 -0300 Subject: [PATCH] att --- rgb_token_client.py | 286 +++++++++----------------------------------- 1 file changed, 55 insertions(+), 231 deletions(-) diff --git a/rgb_token_client.py b/rgb_token_client.py index 8be1f4b..f04960b 100644 --- a/rgb_token_client.py +++ b/rgb_token_client.py @@ -1,14 +1,11 @@ import requests import json import pyodbc -import time -import threading from datetime import datetime, timedelta -from typing import Dict, Optional class RGBTokenClient: - """Cliente para obter token JWT da API do Grupo Boticário""" + """Cliente para obter token JWT da API do Grupo Boticário (execução única)""" def __init__(self): self.base_url = "https://api.grupoboticario.com.br/global/v2/jwt-token/token" @@ -28,21 +25,8 @@ class RGBTokenClient: 'Encrypt=yes' ) - # Controle de token e renovação - self.current_token = None - self.token_expires_at = None - self.renewal_threshold = 200 # Renovar quando restam 200 segundos - self.running = False - self.renewal_thread = None - def _get_available_sql_server_driver(self) -> str: - """ - Detecta automaticamente o driver SQL Server disponível no sistema - - Returns: - str: Nome do driver disponível - """ - # Lista de drivers SQL Server em ordem de preferência + """Detecta automaticamente o driver SQL Server disponível""" drivers_to_try = [ '{ODBC Driver 18 for SQL Server}', '{ODBC Driver 17 for SQL Server}', @@ -54,257 +38,97 @@ class RGBTokenClient: ] available_drivers = pyodbc.drivers() - print(f"Drivers ODBC disponíveis no sistema:") - for driver in available_drivers: - print(f" - {driver}") + print("Drivers ODBC disponíveis:") + for d in available_drivers: + print(f" - {d}") - # Procurar o melhor driver disponível - for preferred_driver in drivers_to_try: - driver_name = preferred_driver.strip('{}') - if driver_name in available_drivers: - print(f"? Usando driver: {preferred_driver}") - return preferred_driver + for preferred in drivers_to_try: + if preferred.strip("{}") in available_drivers: + print(f"✅ Usando driver: {preferred}") + return preferred - # Se nenhum driver SQL Server for encontrado, usar o primeiro disponível if available_drivers: - fallback_driver = f"{{{available_drivers[0]}}}" - print(f"?? Nenhum driver SQL Server encontrado. Tentando com: {fallback_driver}") - return fallback_driver + fallback = f"{{{available_drivers[0]}}}" + print(f"⚠️ Nenhum driver padrão encontrado. Usando: {fallback}") + return fallback - # Se não há drivers disponíveis - print("? Nenhum driver ODBC encontrado no sistema!") - raise Exception("Nenhum driver ODBC disponível. Instale o Microsoft ODBC Driver for SQL Server.") + raise Exception("Nenhum driver ODBC encontrado no sistema.") - def get_token(self) -> Optional[Dict]: - """ - Faz requisição para obter o token JWT - - Returns: - Dict com a resposta da API ou None em caso de erro'' - """ + def get_token(self): + """Obtém o token JWT da API""" try: - # Parâmetros da requisição - params = { - "grant_type": "client_credentials" - } + print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Solicitando novo token...") - # Body da requisição com as credenciais - data = { - "client_id": self.client_id, - "client_secret": self.client_secret - } - - # Headers - headers = { - "Content-Type": "application/x-www-form-urlencoded", - "Accept": "application/json" - } - - print(f"Fazendo requisição para: {self.base_url}") - print(f"Parâmetros: {params}") - - # Fazer a requisição POST response = requests.post( self.base_url, - params=params, - data=data, - headers=headers, + params={"grant_type": "client_credentials"}, + data={ + "client_id": self.client_id, + "client_secret": self.client_secret + }, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json" + }, timeout=30 ) print(f"Status Code: {response.status_code}") - # Verificar se a requisição foi bem-sucedida if response.status_code == 200: - token_data = response.json() - print("Token obtido com sucesso!") + data = response.json() + token = data.get("access_token") + expires_in = data.get("expires_in", 0) + expira = datetime.now() + timedelta(seconds=expires_in) - # Atualizar informações do token atual - if "access_token" in token_data: - self.current_token = token_data["access_token"] - - # Calcular quando o token expira - if "expires_in" in token_data: - expires_in = token_data["expires_in"] - self.token_expires_at = datetime.now() + timedelta(seconds=expires_in) - print(f"Token expira em: {self.token_expires_at.strftime('%Y-%m-%d %H:%M:%S')}") - - return token_data + print(f"✅ Token obtido com sucesso! Expira em: {expira}") + return token else: - print(f"Erro na requisição: {response.status_code}") + print(f"❌ Erro na requisição: {response.status_code}") print(f"Resposta: {response.text}") return None - except requests.exceptions.RequestException as e: - print(f"Erro na requisição: {e}") - return None - except json.JSONDecodeError as e: - print(f"Erro ao decodificar JSON: {e}") - return None except Exception as e: - print(f"Erro inesperado: {e}") + print(f"❌ Erro ao obter token: {e}") return None - def insert_token_to_database(self, token: str) -> bool: - """ - Atualiza o token no banco de dados SQL Server (ID = 1) - - Args: - token (str): Token a ser atualizado no banco - - Returns: - bool: True se atualização foi bem-sucedida, False caso contrário - """ + def save_token(self, token: str) -> bool: + """Atualiza o token no banco de dados""" try: - print(f"Conectando ao banco de dados...") + print("Conectando ao banco de dados...") + conn = pyodbc.connect(self.connection_string) + cursor = conn.cursor() - # Estabelecer conexão com o banco - connection = pyodbc.connect(self.connection_string) - cursor = connection.cursor() + query = "UPDATE dbo.rgb_token SET token = ?, updatedAt = GETDATE() WHERE id = 1" + cursor.execute(query, token) + conn.commit() - # Query de atualização para o ID 1 - update_query = "UPDATE dbo.rgb_token SET token = ?, updatedAt = GETDATE() WHERE id = 1" - - print(f"Atualizando token no banco de dados (ID = 1)...") - - # Executar a atualização - cursor.execute(update_query, token) - rows_affected = cursor.rowcount - connection.commit() - - if rows_affected > 0: - print(f"Token atualizado com sucesso no banco de dados! ({rows_affected} registro(s) afetado(s))") - else: - print("Nenhum registro foi atualizado. Verifique se existe um registro com ID = 1.") - - # Fechar conexões + rows = cursor.rowcount cursor.close() - connection.close() + conn.close() - return rows_affected > 0 - - except pyodbc.Error as e: - print(f"Erro de banco de dados: {e}") - return False - except Exception as e: - print(f"Erro inesperado ao atualizar no banco: {e}") - return False - - def get_token_and_save(self) -> bool: - """ - Obtém o token da API e salva no banco de dados - - Returns: - bool: True se operação foi bem-sucedida, False caso contrário - """ - print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Obtendo novo token...") - - token_response = self.get_token() - - if token_response and "access_token" in token_response: - access_token = token_response["access_token"] - - # Atualizar o token no banco de dados - success = self.insert_token_to_database(access_token) - - if success: - print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Token renovado e atualizado com sucesso!") + if rows > 0: + print(f"✅ Token atualizado com sucesso no banco ({rows} registro(s)).") return True else: - print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Falha ao atualizar token no banco.") + print("⚠️ Nenhum registro atualizado (verifique o ID = 1).") return False - else: - print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ? Falha ao obter token da API.") + + except Exception as e: + print(f"❌ Erro ao salvar token no banco: {e}") return False - def _renewal_worker(self): - """ - Worker thread que monitora e renova o token automaticamente - """ - while self.running: - try: - if self.token_expires_at: - # Calcular tempo restante até expiração - now = datetime.now() - time_until_expiry = (self.token_expires_at - now).total_seconds() - - # Se restam menos que o threshold, renovar o token - if time_until_expiry <= self.renewal_threshold: - print(f"\n[{now.strftime('%Y-%m-%d %H:%M:%S')}] ?? Token expira em {int(time_until_expiry)} segundos. Renovando...") - - success = self.get_token_and_save() - - if not success: - print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] ? Falha na renovação automática do token!") - else: - # Mostrar status a cada 60 segundos - if int(time_until_expiry) % 60 == 0: - print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] ?? Token válido por mais {int(time_until_expiry)} segundos") - - # Verificar a cada segundo - time.sleep(1) - - except Exception as e: - print(f"Erro no worker de renovação: {e}") - time.sleep(5) # Aguardar 5 segundos antes de tentar novamente - - def start_auto_renewal(self): - """ - Inicia o processo de renovação automática do token - """ - if not self.running: - self.running = True - self.renewal_thread = threading.Thread(target=self._renewal_worker, daemon=True) - self.renewal_thread.start() - print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ?? Sistema de renovação automática iniciado!") - - def stop_auto_renewal(self): - """ - Para o processo de renovação automática do token - """ - if self.running: - self.running = False - if self.renewal_thread: - self.renewal_thread.join(timeout=2) - print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ?? Sistema de renovação automática parado!") - def main(): - """Função principal com renovação automática do token""" client = RGBTokenClient() + token = client.get_token() - print("=== Cliente RGB Token com Renovação Automática ===") - print("Obtendo token JWT da API do Grupo Boticário...\n") - - # Obter o primeiro token - success = client.get_token_and_save() - - if success: - print(f"\n? Token inicial obtido e salvo com sucesso!") - print(f"Token atual: {client.current_token[:50] if client.current_token else 'N/A'}...") - - # Iniciar sistema de renovação automática - client.start_auto_renewal() - - try: - print(f"\n?? Sistema rodando... Pressione Ctrl+C para parar") - print(f"?? Configurações:") - print(f" - Renovação automática: {client.renewal_threshold} segundos antes da expiração") - print(f" - Próxima expiração: {client.token_expires_at.strftime('%Y-%m-%d %H:%M:%S') if client.token_expires_at else 'N/A'}") - print(f"\n" + "="*60) - - # Loop principal - manter o programa rodando - while True: - time.sleep(1) - - except KeyboardInterrupt: - print(f"\n\n?? Interrupção detectada. Parando sistema...") - client.stop_auto_renewal() - print("Sistema finalizado com sucesso!") - + if token: + client.save_token(token) + print("🎯 Execução finalizada com sucesso.") else: - print("\n? Falha ao obter o token inicial. Verifique as configurações.") + print("❌ Falha ao obter ou salvar o token.") if __name__ == "__main__": - main() \ No newline at end of file + main()