from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC # from selenium.webdriver.chrome.service import Service # Usado apenas para Linux/Docker from selenium.webdriver.chrome.options import Options import time import os import csv import unicodedata import pyodbc from datetime import datetime, timedelta import json # Configuração da data USE_MANUAL_DATE = False # False = usar dia anterior automaticamente; True = usar MANUAL_DATE_STR MANUAL_DATE_STR = "20112025" # Formato DDMMAAAA, usado quando USE_MANUAL_DATE=True # Configuração de intervalo de datas (execução dia por dia) USE_DATE_RANGE = False # True = usar intervalo de datas; False = usar USE_MANUAL_DATE DATE_RANGE_START = "15/10/2025" # Formato DD/MM/YYYY DATE_RANGE_END = "19/10/2025" # Formato DD/MM/YYYY STATE_FILE = "date_range_state.json" # Arquivo para rastrear progresso RANGE_COMPLETED = False # Flag para indicar que o intervalo foi completado def _parse_date_ddmmyyyy(date_str: str) -> datetime: """Converte string DD/MM/YYYY para datetime.""" try: return datetime.strptime(date_str.strip(), "%d/%m/%Y") except Exception: raise ValueError(f"Formato de data inválido: {date_str}. Use DD/MM/YYYY") def _get_next_date_in_range() -> str: """Retorna a próxima data do intervalo no formato DDMMAAAA. Rastreia o progresso em um arquivo JSON. Quando chega ao fim, para de executar.""" global RANGE_COMPLETED try: start_dt = _parse_date_ddmmyyyy(DATE_RANGE_START) end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END) except ValueError as e: print(f"Erro ao parsear datas: {e}") raise # Carregar estado anterior current_date = start_dt if os.path.exists(STATE_FILE): try: with open(STATE_FILE, 'r') as f: state = json.load(f) # Verificar se o intervalo já foi completado if state.get('completed', False): print(f"Intervalo de datas já foi completado!") print(f"Última data processada: {state.get('last_date')}") print(f"Para reiniciar, delete o arquivo '{STATE_FILE}'") RANGE_COMPLETED = True return None # Sinal para parar last_date_str = state.get('last_date') if last_date_str: last_date = datetime.strptime(last_date_str, "%d%m%Y") # Se a última data foi antes do fim, incrementar um dia if last_date < end_dt: current_date = last_date + timedelta(days=1) else: # Chegou ao fim RANGE_COMPLETED = True return None # Sinal para parar except Exception as e: print(f"Erro ao ler arquivo de estado: {e}. Iniciando do início.") current_date = start_dt print(f"Data a processar: {current_date.strftime('%d/%m/%Y')}") return current_date.strftime("%d%m%Y") def _save_date_state(date_str: str) -> None: """Salva o estado da data processada com sucesso. Deve ser chamado apenas após a conclusão bem-sucedida. Se for a data final, marca como completo.""" try: end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END) current_dt = datetime.strptime(date_str, "%d%m%Y") state = {'last_date': date_str} # Se chegou na data final, marcar como completo if current_dt >= end_dt: state['completed'] = True print(f"Intervalo de datas COMPLETO! Última data: {date_str}") with open(STATE_FILE, 'w') as f: json.dump(state, f) print(f"Estado salvo: {date_str}") except Exception as e: print(f"Aviso: não foi possível salvar estado: {e}") def resolve_data(use_manual: bool, manual_str: str): """Retorna a data no formato DDMMAAAA. Se USE_DATE_RANGE=True, usa o intervalo de datas (uma por execução). Se use_manual=True, valida e usa manual_str; caso contrário, usa o dia anterior. Retorna None se o intervalo de datas foi completado.""" if USE_DATE_RANGE: return _get_next_date_in_range() if use_manual: s = ''.join(ch for ch in str(manual_str) if ch.isdigit()) if len(s) == 8: return s else: print("Data manual inválida; usando dia anterior automaticamente.") return (datetime.now() - timedelta(days=1)).strftime("%d%m%Y") def choose_sql_driver() -> str: try: available = pyodbc.drivers() except Exception: available = [] # Prefer 18, depois 17, depois o primeiro da lista que contenha 'SQL Server' for candidate in ['ODBC Driver 18 for SQL Server', 'ODBC Driver 17 for SQL Server', 'SQL Server']: if candidate in available: return candidate for d in available: if 'SQL Server' in d: return d # Fallback (pode falhar na conexão se realmente não houver driver) return 'ODBC Driver 17 for SQL Server' def main(): # Configurar opções do Chrome para ambiente Kubernetes/Docker chrome_options = Options() # CONFIGURAÇÃO: Defina como False para ver o navegador (útil para debug no Windows) USE_HEADLESS = True # True = modo headless (sem interface), False = com interface # Configurações essenciais para rodar em Docker/Kubernetes (sem interface gráfica) if USE_HEADLESS: chrome_options.add_argument('--headless=new') # Novo modo headless (mais estável) chrome_options.add_argument('--no-sandbox') # Necessário para rodar como root chrome_options.add_argument('--disable-dev-shm-usage') # Evita problemas de memória compartilhada # Configurações adicionais recomendadas chrome_options.add_argument('--disable-software-rasterizer') chrome_options.add_argument('--disable-extensions') chrome_options.add_argument('--disable-blink-features=AutomationControlled') # Evita detecção de automação # Configurações de janela chrome_options.add_argument('--window-size=1920,1080') chrome_options.add_argument('--start-maximized') # Desabilitar notificações e popups chrome_options.add_argument('--disable-notifications') chrome_options.add_experimental_option('excludeSwitches', ['enable-logging', 'enable-automation']) chrome_options.add_experimental_option('useAutomationExtension', False) # Definir pasta de download # Tenta usar Desktop, se não existir usa /tmp (para Kubernetes/Docker) desktop_dir = os.path.join(os.path.expanduser("~"), "Desktop") if os.path.exists(desktop_dir): download_dir = desktop_dir else: # Em ambiente Kubernetes/Docker, usar /tmp ou criar diretório download_dir = "/tmp/downloads" os.makedirs(download_dir, exist_ok=True) print(f"Usando diretório de download: {download_dir}") prefs = { "download.default_directory": download_dir, "download.prompt_for_download": False, "download.directory_upgrade": True, "safebrowsing.enabled": True, "profile.default_content_setting_values.notifications": 2 # Desabilitar notificações } chrome_options.add_experimental_option("prefs", prefs) # Usar o binário do Chromium instalado no sistema (para Kubernetes/Docker) # Comentar a linha abaixo se estiver rodando localmente no Windows # chrome_options.binary_location = '/usr/bin/chromium' # Inicializar o driver do Chrome print("Iniciando o navegador...") print(f"Modo headless: {USE_HEADLESS}") try: # Criar o service apontando para o chromedriver do sistema (Kubernetes/Docker) # Comentar a linha abaixo se estiver rodando localmente no Windows # from selenium.webdriver.chrome.service import Service # service = Service('/usr/bin/chromedriver') # driver = webdriver.Chrome(service=service, options=chrome_options) # Para rodar localmente no Windows, use: driver = webdriver.Chrome(options=chrome_options) print("Navegador iniciado com sucesso!") except Exception as e: print(f"Erro ao iniciar o navegador: {e}") print("\nPossíveis soluções:") print("1. Verifique se o Chrome está instalado") print("2. Verifique se o ChromeDriver está instalado e atualizado") print("3. Execute: pip install --upgrade selenium") print("4. Tente desabilitar o modo headless (USE_HEADLESS = False)") raise try: # Acessar a URL url = "https://cp10269.retaguarda.grupoboticario.com.br/app/#/" print(f"Acessando {url}...") driver.get(url) # Aguardar o campo de input estar presente e visível print("Aguardando o campo de login aparecer...") wait = WebDriverWait(driver, 10) input_field = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-usuario-input-field"]')) ) # Digitar "daniel.medeiros" no campo de login print("Digitando 'daniel.medeiros' no campo de login...") input_field.clear() input_field.send_keys("daniel.medeiros") # Aguardar o campo de senha aparecer print("Aguardando o campo de senha aparecer...") password_field = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-senha-input-field"]')) ) # Digitar a senha print("Digitando a senha...") password_field.clear() password_field.send_keys("@ginseng") # Clicar no botão de entrar com retentativa se houver erro do reCAPTCHA print("Clicando no botão Entrar...") max_login_attempts = 3 for attempt in range(1, max_login_attempts + 1): try: entrar_button = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]')) ) # usar JS click para maior robustez driver.execute_script("arguments[0].click();", entrar_button) except Exception: try: entrar_button = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]')) ) entrar_button.click() except Exception: pass # Verificar se o menu "Venda" apareceu como sinal de login bem-sucedido try: WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]')) ) print("Login confirmado.") break except Exception: # Verificar mensagem de erro no corpo da página try: body_text = driver.execute_script("return dment.body ? document.body.innerText : ''") or "" except Exception: body_text = "" if 'grecaptcha.execute is not a function' in body_text.lower() or 'this.grecaptcha.execute is not a function' in body_text.lower(): print("Aviso do reCAPTCHA detectado; tentando clicar Entrar novamente...") else: print(f"Login ainda não confirmado (tentativa {attempt}/{max_login_attempts}). Tentando novamente...") time.sleep(2) if attempt == max_login_attempts: raise Exception("Falha ao efetuar login após múltiplas tentativas.") # Clicar em "Venda" no menu lateral print("Clicando em 'Venda'...") venda_menu = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]')) ) venda_menu.click() time.sleep(1) # Clicar em "Relatórios" print("Clicando em 'Relatórios'...") relatorios_menu = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-relatorios"]')) ) relatorios_menu.click() time.sleep(1) # Clicar em "Vendas por Hora" print("Clicando em 'Vendas por Hora'...") vendas_hora_menu = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-vendas-por-hora"]')) ) vendas_hora_menu.click() # Aguardar a página de relatório carregar print("Aguardando a página de relatório carregar...") time.sleep(3) # Resolver a data a ser utilizada (dia anterior ou manual), formato DDMMAAAA data_formatada = resolve_data(USE_MANUAL_DATE, MANUAL_DATE_STR) # Se retornar None, significa que o intervalo de datas foi completado if data_formatada is None: print("Intervalo de datas completado. Encerrando...") return print(f"Data a ser preenchida: {data_formatada}") # Clicar no canto inferior direito da página usando JavaScript print("Clicando no canto inferior direito da página...") driver.execute_script(""" var width = window.innerWidth; var height = window.innerHeight; var element = document.elementFromPoint(width - 50, height - 50); if (element) { element.click(); } """) time.sleep(0.5) # Apertar CTRL + K print("Apertando CTRL + K...") from selenium.webdriver.common.action_chains import ActionChains actions = ActionChains(driver) actions.key_down(Keys.CONTROL).send_keys('k').key_up(Keys.CONTROL).perform() time.sleep(1) # Apertar TAB 3 vezes print("Apertando TAB 3 vezes...") for i in range(3): actions.send_keys(Keys.TAB).perform() time.sleep(0.3) # Digitar a data do dia anterior print(f"Digitando a data inicial: {data_formatada}") actions.send_keys(data_formatada).perform() time.sleep(0.5) # Digitar a mesma data novamente (data final) print(f"Digitando a data final: {data_formatada}") actions.send_keys(data_formatada).perform() time.sleep(2) # Pressionar ENTER ou ESC para fechar possíveis popups/calendários print("Pressionando ESC para fechar calendários...") actions.send_keys(Keys.ESCAPE).perform() time.sleep(1) # Rolar a página para baixo para carregar mais elementos print("Rolando a página para baixo para carregar elementos...") driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") time.sleep(2) # Rolar para cima driver.execute_script("window.scrollTo(0, 0);") time.sleep(1) # Rolar para o meio driver.execute_script("window.scrollTo(0, document.body.scrollHeight / 2);") time.sleep(2) # Trocar para o iframe que contém o formulário print("\nMudando para o iframe do formulário...") wait.until(EC.frame_to_be_available_and_switch_to_it((By.TAG_NAME, 'iframe'))) # Confirmar que estamos no iframe correto procurando um campo conhecido wait.until(EC.presence_of_element_located((By.ID, 'dataInicial'))) print("Dentro do iframe do formulário.") # Tentar clicar no radio 'Produto' por várias estratégias print("Procurando o radio 'Produto'...") radio_xpath = "//input[@type='radio' and @name='filtro.formato' and (@id='filtro_produto' or translate(@value,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz')='produto')]" try: radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, radio_xpath))) except Exception: # Fallback: procurar label com texto 'Produto' e um input dentro radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, "//label[normalize-space()[contains(., 'Produto')]]//input[@type='radio']"))) # Garantir visibilidade e clicar driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", radio_produto) time.sleep(0.5) driver.execute_script("arguments[0].click();", radio_produto) print("Radio 'Produto' selecionado.") time.sleep(0.5) # Selecionar formato CSV (label dentro de #divArquivo) print("Selecionando formato CSV...") formato_csv_label = wait.until(EC.element_to_be_clickable((By.XPATH, "//*[@id='divArquivo']/div[2]/label"))) driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", formato_csv_label) time.sleep(0.3) driver.execute_script("arguments[0].click();", formato_csv_label) time.sleep(0.5) # Antes de gerar, capturar arquivos existentes na pasta de download before_files = set(f for f in os.listdir(download_dir) if not f.endswith('.crdownload')) # Clicar no botão Gerar print("Clicando no botão Gerar...") botao_gerar = wait.until(EC.element_to_be_clickable((By.ID, 'gerar'))) driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", botao_gerar) time.sleep(0.3) driver.execute_script("arguments[0].click();", botao_gerar) # Voltar para o conteúdo principal driver.switch_to.default_content() # Aguardar download finalizar e tratar/renomear com a data usada print("Aguardando download finalizar para tratar e renomear...") end_time = time.time() + 180 # até 3 minutos downloaded_file = None while time.time() < end_time: # Esperar terminar downloads parciais (.crdownload ou arquivos temporários do Chrome) current_dir_files = os.listdir(download_dir) if any(name.endswith('.crdownload') for name in current_dir_files): time.sleep(1) continue # Filtrar arquivos válidos: # - Não termina com .crdownload # - Não começa com . (arquivos temporários/ocultos do Chrome) # - Não contém .com.google.Chrome (arquivos temporários específicos) def is_valid_file(filename): if filename.endswith('.crdownload'): return False if filename.startswith('.'): return False if '.com.google.Chrome' in filename: return False return True current_files = set(f for f in current_dir_files if is_valid_file(f)) new_files = [f for f in current_files - before_files] if new_files: # Escolher o mais recente candidate = max((os.path.join(download_dir, f) for f in new_files), key=os.path.getmtime) # Verificar se o arquivo realmente existe e tem tamanho > 0 if os.path.exists(candidate) and os.path.getsize(candidate) > 0: downloaded_file = candidate break time.sleep(1) def _normalize_label(s: str) -> str: if s is None: return "" s = str(s) s = unicodedata.normalize('NFKD', s) s = ''.join(c for c in s if not unicodedata.combining(c)) s = s.lower() # remover caracteres comuns for ch in ['%', '(', ')', '/', '-', '_', '.', ':']: s = s.replace(ch, ' ') s = ' '.join(s.split()) return s if downloaded_file: print(f"Arquivo baixado detectado: {downloaded_file}") # Preparar nome final (garantindo nome nico) final_name = f"vendas_por_hora_{data_formatada}.csv" final_path = os.path.join(download_dir, final_name) if os.path.exists(final_path): base_no_ext, ext = os.path.splitext(final_name) counter = 1 while os.path.exists(os.path.join(download_dir, f"{base_no_ext}({counter}){ext}")): counter += 1 final_path = os.path.join(download_dir, f"{base_no_ext}({counter}){ext}") # Processar CSV removendo colunas e transformando campos cols_para_remover = { _normalize_label(n) for n in [ 'Receita líquida', 'Receita liquida', 'Participação Receita (%)', 'Participacao Receita (%)', 'Maior receita', 'Maior Receita', 'Participação Boletos (%)', 'Participacao Boletos (%)' ] } try: print("Abrindo arquivo CSV para leitura...") # Detectar delimitador e ler with open(downloaded_file, 'r', encoding='utf-8-sig', errors='ignore') as f: print("Detectando delimitador do CSV...") sample = f.read(4096) f.seek(0) try: dialect = csv.Sniffer().sniff(sample, delimiters=';,\t|') print(f"Delimitador detectado: '{dialect.delimiter}'") except Exception as e: print(f"Erro ao detectar delimitador, usando ';' por padrão: {e}") class Simple(csv.Dialect): delimiter = ';' quotechar = '"' escapechar = None doublequote = True skipinitialspace = False lineterminator = '\n' quoting = csv.QUOTE_MINIMAL dialect = Simple print("Lendo cabeçalhos do CSV...") reader = csv.DictReader(f, dialect=dialect) original_headers = reader.fieldnames or [] print(f"Cabeçalhos encontrados: {original_headers}") # Construir novos headers, removendo colunas e substituindo Produto new_headers = [] produto_idx = None for h in original_headers: nh = _normalize_label(h) if nh in cols_para_remover: continue if nh == _normalize_label('Produto'): produto_idx = len(new_headers) new_headers.extend(['SKU', 'Descricao']) elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'): new_headers.append('PRECO_MEDIO') else: new_headers.append(h) # Se Produto não existir, garante as colunas novas ao final if 'SKU' not in new_headers: new_headers.extend(['SKU', 'Descricao']) print(f"Novos cabeçalhos processados: {new_headers}") print("Processando linhas do CSV...") rows_out = [] row_count = 0 for row in reader: row_count += 1 if row_count % 100 == 0: print(f"Processadas {row_count} linhas...") new_row = {} for h in original_headers: nh = _normalize_label(h) if nh in cols_para_remover: continue if nh == _normalize_label('Produto'): val = row.get(h, '') or '' parts = [p.strip() for p in val.split(' - ', 1)] sku = parts[0] if parts else '' desc = parts[1] if len(parts) > 1 else val new_row['SKU'] = sku new_row['Descricao'] = desc elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'): val = row.get(h, '') new_row['PRECO_MEDIO'] = val else: val = row.get(h, '') if nh == _normalize_label('Loja'): val = (val or '').split(' - ', 1)[0].strip() new_row[h] = val # Garantir chaves SKU/Descricao existam new_row.setdefault('SKU', '') new_row.setdefault('Descricao', '') new_row.setdefault('PRECO_MEDIO', '') rows_out.append(new_row) print(f"Total de linhas processadas: {row_count}") print(f"Total de linhas válidas: {len(rows_out)}") # Inserir os dados tratados no banco de dados print("Preparando para inserir dados no banco...") def _find_header(headers, targets): targets_norm = { _normalize_label(t) for t in targets } for hh in headers: if _normalize_label(hh) in targets_norm: return hh return None def _parse_number(val): s = str(val or '').strip() if s == '': return 0 # converter formato pt-BR para ponto flutuante s = s.replace('.', '').replace(',', '.') try: num = float(s) # se for inteiro, retorna int return int(num) if abs(num - int(num)) < 1e-9 else num except Exception: return 0 # Mapear colunas de interesse pdv_header = _find_header(new_headers, ['Loja', 'PDV']) vendas_header = _find_header(new_headers, ['Vendas', 'Quantidade', 'Qtd', 'Qtd. Vendida', 'Qtd Vendida', 'Qtde Vendida', 'Qtde. Vendida', 'Quantidade Vendida', 'Qtde']) preco_medio_header = _find_header(new_headers, ['PRECO_MEDIO', 'Preço Médio', 'Preco Medio']) if not pdv_header: raise Exception("Não foi possível identificar a coluna de PDV/Loja no CSV tratado.") if not vendas_header: print("Aviso: coluna de Vendas não encontrada; valores serão inseridos como 0.") if not preco_medio_header: print("Aviso: coluna de Preço Médio não encontrada; valores serão inseridos como 0.00.") # Conectar ao SQL Server print("Escolhendo driver SQL Server...") driver_name = choose_sql_driver() print(f"Driver selecionado: {driver_name}") connection_string = ( f'DRIVER={{{driver_name}}};' f'SERVER=10.77.77.10;' f'DATABASE=GINSENG;' f'UID=supginseng;' f'PWD=Ginseng@;' f'PORT=1433;' f'TrustServerCertificate=yes;' f'Encrypt=yes' ) # Converter data para formato aceito pelo banco (date) data_db = datetime.strptime(data_formatada, '%d%m%Y').date() print(f"Data para inserção no banco: {data_db}") print("Conectando ao banco de dados...") inserted = 0 with pyodbc.connect(connection_string) as conn: print("Conexão estabelecida com sucesso!") conn.autocommit = False cur = conn.cursor() # Apagar dados existentes para a data print(f"Deletando dados existentes para a data {data_db}...") cur.execute("DELETE FROM [GINSENG].[dbo].[rgb_sales_selenium] WHERE [Data] = ?", data_db) print("Dados antigos deletados.") # Inserir linhas print("Preparando inserção de dados...") insert_sql = ( "INSERT INTO [GINSENG].[dbo].[rgb_sales_selenium] ([Data],[PDV],[SKU],[DESCRICAO],[VENDAS],[PRECO_MEDIO]) " "VALUES (?,?,?,?,?,?)" ) batch = [] batch_count = 0 for r in rows_out: pdv_val = r.get(pdv_header, '') try: pdv_val = int(str(pdv_val).strip().split()[0]) if str(pdv_val).strip() else None except Exception: pdv_val = None sku_val = r.get('SKU', '') desc_val = r.get('Descricao', '') vendas_val = _parse_number(r.get(vendas_header, '')) if vendas_header else 0 preco_medio_val = _parse_number(r.get(preco_medio_header, '')) if preco_medio_header else 0.00 if pdv_val is None and r.get('Loja'): try: pdv_val = int(str(r.get('Loja')).strip().split()[0]) except Exception: pdv_val = None batch.append((data_db, pdv_val, sku_val, desc_val, vendas_val, preco_medio_val)) if len(batch) >= 1000: batch_count += 1 print(f"Inserindo lote {batch_count} ({len(batch)} registros)...") cur.executemany(insert_sql, batch) inserted += len(batch) batch = [] if batch: batch_count += 1 print(f"Inserindo lote final {batch_count} ({len(batch)} registros)...") cur.executemany(insert_sql, batch) inserted += len(batch) print("Fazendo commit das alterações...") conn.commit() print("Commit realizado com sucesso!") print(f"Dados inseridos no banco: {inserted} registros para a data {data_db}.") # Remover arquivo original print(f"Removendo arquivo temporário: {downloaded_file}") try: os.remove(downloaded_file) print("Arquivo removido com sucesso.") except Exception as e: print(f"Aviso: não foi possível remover o arquivo: {e}") # Salvar estado apenas após sucesso completo if USE_DATE_RANGE: print("Salvando estado do processamento...") _save_date_state(data_formatada) print("Processamento concluído com sucesso!") except Exception as e: print(f"Falha ao tratar o CSV: {e}") import traceback traceback.print_exc() else: print("Não foi possível detectar o arquivo baixado dentro do tempo limite.") # Pausa breve antes de encerrar time.sleep(2) print("Fechando o navegador...") except Exception as e: print(f"Erro durante a execução: {e}") finally: # Fechar o navegador driver.quit() print("Script finalizado.") def run_with_retry(max_retries=3, retry_delay=120): """Executa main() com retentativas em caso de erro de timeout/conexão/arquivo.""" for attempt in range(1, max_retries + 1): try: main() return True # Sucesso except Exception as e: error_str = str(e).lower() # Verificar se é erro de timeout/conexão/arquivo temporário que vale retry is_retryable = any(keyword in error_str for keyword in [ 'read timed out', 'timed out', 'timeout', 'connectionpool', 'connection refused', 'connection reset', 'httpconnectionpool', 'no such file or directory', '.com.google.chrome', 'filenotfounderror' ]) if is_retryable and attempt < max_retries: print(f"\n{'=' * 60}") print(f"Erro de conexão/timeout detectado (tentativa {attempt}/{max_retries})") print(f"Erro: {e}") print(f"Aguardando {retry_delay} segundos antes de tentar novamente...") print(f"{'=' * 60}\n") time.sleep(retry_delay) else: if attempt == max_retries and is_retryable: print(f"\n{'=' * 60}") print(f"Erro persistente após {max_retries} tentativas.") print(f"Último erro: {e}") print(f"{'=' * 60}\n") raise return False if __name__ == "__main__": if USE_DATE_RANGE: print("=" * 60) print("MODO LOOP ATIVADO - O script executará continuamente") print(f"Intervalo: {DATE_RANGE_START} até {DATE_RANGE_END}") print("=" * 60) while not RANGE_COMPLETED: try: run_with_retry(max_retries=3, retry_delay=120) if RANGE_COMPLETED: break print("\n" + "=" * 60) print("Execução concluída. Aguardando 10 segundos antes da próxima execução...") print("=" * 60 + "\n") time.sleep(10) # Aguarda 10 segundos antes da próxima execução except Exception as e: print(f"\nErro na execução: {e}") print("Tentando novamente em 10 segundos...") time.sleep(10) print("\n" + "=" * 60) print("Intervalo de datas completado. Script finalizado.") print("=" * 60) else: run_with_retry(max_retries=3, retry_delay=120)