G-Scripts/extracao_vendashora_rgb.py
daniel.rodrigues e250eb3cf6 first commit
2026-02-20 09:00:12 -03:00

773 lines
34 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# from selenium.webdriver.chrome.service import Service # Usado apenas para Linux/Docker
from selenium.webdriver.chrome.options import Options
import time
import os
import csv
import unicodedata
import pyodbc
from datetime import datetime, timedelta
import json
# Configuração da data
USE_MANUAL_DATE = False # False = usar dia anterior automaticamente; True = usar MANUAL_DATE_STR
MANUAL_DATE_STR = "20112025" # Formato DDMMAAAA, usado quando USE_MANUAL_DATE=True
# Configuração de intervalo de datas (execução dia por dia)
USE_DATE_RANGE = False # True = usar intervalo de datas; False = usar USE_MANUAL_DATE
DATE_RANGE_START = "15/10/2025" # Formato DD/MM/YYYY
DATE_RANGE_END = "19/10/2025" # Formato DD/MM/YYYY
STATE_FILE = "date_range_state.json" # Arquivo para rastrear progresso
RANGE_COMPLETED = False # Flag para indicar que o intervalo foi completado
def _parse_date_ddmmyyyy(date_str: str) -> datetime:
"""Converte string DD/MM/YYYY para datetime."""
try:
return datetime.strptime(date_str.strip(), "%d/%m/%Y")
except Exception:
raise ValueError(f"Formato de data inválido: {date_str}. Use DD/MM/YYYY")
def _get_next_date_in_range() -> str:
"""Retorna a próxima data do intervalo no formato DDMMAAAA.
Rastreia o progresso em um arquivo JSON.
Quando chega ao fim, para de executar."""
global RANGE_COMPLETED
try:
start_dt = _parse_date_ddmmyyyy(DATE_RANGE_START)
end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END)
except ValueError as e:
print(f"Erro ao parsear datas: {e}")
raise
# Carregar estado anterior
current_date = start_dt
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, 'r') as f:
state = json.load(f)
# Verificar se o intervalo já foi completado
if state.get('completed', False):
print(f"Intervalo de datas já foi completado!")
print(f"Última data processada: {state.get('last_date')}")
print(f"Para reiniciar, delete o arquivo '{STATE_FILE}'")
RANGE_COMPLETED = True
return None # Sinal para parar
last_date_str = state.get('last_date')
if last_date_str:
last_date = datetime.strptime(last_date_str, "%d%m%Y")
# Se a última data foi antes do fim, incrementar um dia
if last_date < end_dt:
current_date = last_date + timedelta(days=1)
else:
# Chegou ao fim
RANGE_COMPLETED = True
return None # Sinal para parar
except Exception as e:
print(f"Erro ao ler arquivo de estado: {e}. Iniciando do início.")
current_date = start_dt
print(f"Data a processar: {current_date.strftime('%d/%m/%Y')}")
return current_date.strftime("%d%m%Y")
def _save_date_state(date_str: str) -> None:
"""Salva o estado da data processada com sucesso.
Deve ser chamado apenas após a conclusão bem-sucedida.
Se for a data final, marca como completo."""
try:
end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END)
current_dt = datetime.strptime(date_str, "%d%m%Y")
state = {'last_date': date_str}
# Se chegou na data final, marcar como completo
if current_dt >= end_dt:
state['completed'] = True
print(f"Intervalo de datas COMPLETO! Última data: {date_str}")
with open(STATE_FILE, 'w') as f:
json.dump(state, f)
print(f"Estado salvo: {date_str}")
except Exception as e:
print(f"Aviso: não foi possível salvar estado: {e}")
def resolve_data(use_manual: bool, manual_str: str):
"""Retorna a data no formato DDMMAAAA.
Se USE_DATE_RANGE=True, usa o intervalo de datas (uma por execução).
Se use_manual=True, valida e usa manual_str; caso contrário, usa o dia anterior.
Retorna None se o intervalo de datas foi completado."""
if USE_DATE_RANGE:
return _get_next_date_in_range()
if use_manual:
s = ''.join(ch for ch in str(manual_str) if ch.isdigit())
if len(s) == 8:
return s
else:
print("Data manual inválida; usando dia anterior automaticamente.")
return (datetime.now() - timedelta(days=1)).strftime("%d%m%Y")
def choose_sql_driver() -> str:
try:
available = pyodbc.drivers()
except Exception:
available = []
# Prefer 18, depois 17, depois o primeiro da lista que contenha 'SQL Server'
for candidate in ['ODBC Driver 18 for SQL Server', 'ODBC Driver 17 for SQL Server', 'SQL Server']:
if candidate in available:
return candidate
for d in available:
if 'SQL Server' in d:
return d
# Fallback (pode falhar na conexão se realmente não houver driver)
return 'ODBC Driver 17 for SQL Server'
def main():
# Configurar opções do Chrome para ambiente Kubernetes/Docker
chrome_options = Options()
# CONFIGURAÇÃO: Defina como False para ver o navegador (útil para debug no Windows)
USE_HEADLESS = True # True = modo headless (sem interface), False = com interface
# Configurações essenciais para rodar em Docker/Kubernetes (sem interface gráfica)
if USE_HEADLESS:
chrome_options.add_argument('--headless=new') # Novo modo headless (mais estável)
chrome_options.add_argument('--no-sandbox') # Necessário para rodar como root
chrome_options.add_argument('--disable-dev-shm-usage') # Evita problemas de memória compartilhada
# Configurações adicionais recomendadas
chrome_options.add_argument('--disable-software-rasterizer')
chrome_options.add_argument('--disable-extensions')
chrome_options.add_argument('--disable-blink-features=AutomationControlled') # Evita detecção de automação
# Configurações de janela
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument('--start-maximized')
# Desabilitar notificações e popups
chrome_options.add_argument('--disable-notifications')
chrome_options.add_experimental_option('excludeSwitches', ['enable-logging', 'enable-automation'])
chrome_options.add_experimental_option('useAutomationExtension', False)
# Definir pasta de download
# Tenta usar Desktop, se não existir usa /tmp (para Kubernetes/Docker)
desktop_dir = os.path.join(os.path.expanduser("~"), "Desktop")
if os.path.exists(desktop_dir):
download_dir = desktop_dir
else:
# Em ambiente Kubernetes/Docker, usar /tmp ou criar diretório
download_dir = "/tmp/downloads"
os.makedirs(download_dir, exist_ok=True)
print(f"Usando diretório de download: {download_dir}")
prefs = {
"download.default_directory": download_dir,
"download.prompt_for_download": False,
"download.directory_upgrade": True,
"safebrowsing.enabled": True,
"profile.default_content_setting_values.notifications": 2 # Desabilitar notificações
}
chrome_options.add_experimental_option("prefs", prefs)
# Usar o binário do Chromium instalado no sistema (para Kubernetes/Docker)
# Comentar a linha abaixo se estiver rodando localmente no Windows
# chrome_options.binary_location = '/usr/bin/chromium'
# Inicializar o driver do Chrome
print("Iniciando o navegador...")
print(f"Modo headless: {USE_HEADLESS}")
try:
# Criar o service apontando para o chromedriver do sistema (Kubernetes/Docker)
# Comentar a linha abaixo se estiver rodando localmente no Windows
# from selenium.webdriver.chrome.service import Service
# service = Service('/usr/bin/chromedriver')
# driver = webdriver.Chrome(service=service, options=chrome_options)
# Para rodar localmente no Windows, use:
driver = webdriver.Chrome(options=chrome_options)
print("Navegador iniciado com sucesso!")
except Exception as e:
print(f"Erro ao iniciar o navegador: {e}")
print("\nPossíveis soluções:")
print("1. Verifique se o Chrome está instalado")
print("2. Verifique se o ChromeDriver está instalado e atualizado")
print("3. Execute: pip install --upgrade selenium")
print("4. Tente desabilitar o modo headless (USE_HEADLESS = False)")
raise
try:
# Acessar a URL
url = "https://cp10269.retaguarda.grupoboticario.com.br/app/#/"
print(f"Acessando {url}...")
driver.get(url)
# Aguardar o campo de input estar presente e visível
print("Aguardando o campo de login aparecer...")
wait = WebDriverWait(driver, 10)
input_field = wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-usuario-input-field"]'))
)
# Digitar "daniel.medeiros" no campo de login
print("Digitando 'daniel.medeiros' no campo de login...")
input_field.clear()
input_field.send_keys("daniel.medeiros")
# Aguardar o campo de senha aparecer
print("Aguardando o campo de senha aparecer...")
password_field = wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-senha-input-field"]'))
)
# Digitar a senha
print("Digitando a senha...")
password_field.clear()
password_field.send_keys("@ginseng")
# Clicar no botão de entrar com retentativa se houver erro do reCAPTCHA
print("Clicando no botão Entrar...")
max_login_attempts = 3
for attempt in range(1, max_login_attempts + 1):
try:
entrar_button = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]'))
)
# usar JS click para maior robustez
driver.execute_script("arguments[0].click();", entrar_button)
except Exception:
try:
entrar_button = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]'))
)
entrar_button.click()
except Exception:
pass
# Verificar se o menu "Venda" apareceu como sinal de login bem-sucedido
try:
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]'))
)
print("Login confirmado.")
break
except Exception:
# Verificar mensagem de erro no corpo da página
try:
body_text = driver.execute_script("return dment.body ? document.body.innerText : ''") or ""
except Exception:
body_text = ""
if 'grecaptcha.execute is not a function' in body_text.lower() or 'this.grecaptcha.execute is not a function' in body_text.lower():
print("Aviso do reCAPTCHA detectado; tentando clicar Entrar novamente...")
else:
print(f"Login ainda não confirmado (tentativa {attempt}/{max_login_attempts}). Tentando novamente...")
time.sleep(2)
if attempt == max_login_attempts:
raise Exception("Falha ao efetuar login após múltiplas tentativas.")
# Clicar em "Venda" no menu lateral
print("Clicando em 'Venda'...")
venda_menu = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]'))
)
venda_menu.click()
time.sleep(1)
# Clicar em "Relatórios"
print("Clicando em 'Relatórios'...")
relatorios_menu = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-relatorios"]'))
)
relatorios_menu.click()
time.sleep(1)
# Clicar em "Vendas por Hora"
print("Clicando em 'Vendas por Hora'...")
vendas_hora_menu = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-vendas-por-hora"]'))
)
vendas_hora_menu.click()
# Aguardar a página de relatório carregar
print("Aguardando a página de relatório carregar...")
time.sleep(3)
# Resolver a data a ser utilizada (dia anterior ou manual), formato DDMMAAAA
data_formatada = resolve_data(USE_MANUAL_DATE, MANUAL_DATE_STR)
# Se retornar None, significa que o intervalo de datas foi completado
if data_formatada is None:
print("Intervalo de datas completado. Encerrando...")
return
print(f"Data a ser preenchida: {data_formatada}")
# Clicar no canto inferior direito da página usando JavaScript
print("Clicando no canto inferior direito da página...")
driver.execute_script("""
var width = window.innerWidth;
var height = window.innerHeight;
var element = document.elementFromPoint(width - 50, height - 50);
if (element) {
element.click();
}
""")
time.sleep(0.5)
# Apertar CTRL + K
print("Apertando CTRL + K...")
from selenium.webdriver.common.action_chains import ActionChains
actions = ActionChains(driver)
actions.key_down(Keys.CONTROL).send_keys('k').key_up(Keys.CONTROL).perform()
# Aguardar a página carregar completamente
print("Aguardando 10 segundos para a página carregar...")
time.sleep(10)
# Apertar TAB 3 vezes
print("Apertando TAB 3 vezes...")
for i in range(3):
actions.send_keys(Keys.TAB).perform()
time.sleep(0.3)
# Digitar a data do dia anterior
print(f"Digitando a data inicial: {data_formatada}")
actions.send_keys(data_formatada).perform()
time.sleep(0.5)
# Digitar a mesma data novamente (data final)
print(f"Digitando a data final: {data_formatada}")
actions.send_keys(data_formatada).perform()
time.sleep(2)
# Pressionar ENTER ou ESC para fechar possíveis popups/calendários
print("Pressionando ESC para fechar calendários...")
actions.send_keys(Keys.ESCAPE).perform()
time.sleep(1)
# Rolar a página para baixo para carregar mais elementos
print("Rolando a página para baixo para carregar elementos...")
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
# Rolar para cima
driver.execute_script("window.scrollTo(0, 0);")
time.sleep(1)
# Rolar para o meio
driver.execute_script("window.scrollTo(0, document.body.scrollHeight / 2);")
time.sleep(2)
# Trocar para o iframe que contém o formulário
print("\nMudando para o iframe do formulário...")
wait.until(EC.frame_to_be_available_and_switch_to_it((By.TAG_NAME, 'iframe')))
# Confirmar que estamos no iframe correto procurando um campo conhecido
wait.until(EC.presence_of_element_located((By.ID, 'dataInicial')))
print("Dentro do iframe do formulário.")
# Tentar clicar no radio 'Produto' por várias estratégias
print("Procurando o radio 'Produto'...")
radio_xpath = "//input[@type='radio' and @name='filtro.formato' and (@id='filtro_produto' or translate(@value,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz')='produto')]"
try:
radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, radio_xpath)))
except Exception:
# Fallback: procurar label com texto 'Produto' e um input dentro
radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, "//label[normalize-space()[contains(., 'Produto')]]//input[@type='radio']")))
# Garantir visibilidade e clicar
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", radio_produto)
time.sleep(0.5)
driver.execute_script("arguments[0].click();", radio_produto)
print("Radio 'Produto' selecionado.")
time.sleep(0.5)
# Selecionar formato CSV (label dentro de #divArquivo)
print("Selecionando formato CSV...")
formato_csv_label = wait.until(EC.element_to_be_clickable((By.XPATH, "//*[@id='divArquivo']/div[2]/label")))
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", formato_csv_label)
time.sleep(0.3)
driver.execute_script("arguments[0].click();", formato_csv_label)
time.sleep(0.5)
# Antes de gerar, capturar arquivos existentes na pasta de download
before_files = set(f for f in os.listdir(download_dir) if not f.endswith('.crdownload'))
# Clicar no botão Gerar
print("Clicando no botão Gerar...")
botao_gerar = wait.until(EC.element_to_be_clickable((By.ID, 'gerar')))
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", botao_gerar)
time.sleep(0.3)
driver.execute_script("arguments[0].click();", botao_gerar)
# Voltar para o conteúdo principal
driver.switch_to.default_content()
# Aguardar download finalizar e tratar/renomear com a data usada
print("Aguardando download finalizar para tratar e renomear...")
end_time = time.time() + 180 # até 3 minutos
downloaded_file = None
while time.time() < end_time:
# Esperar terminar downloads parciais (.crdownload ou arquivos temporários do Chrome)
current_dir_files = os.listdir(download_dir)
if any(name.endswith('.crdownload') for name in current_dir_files):
time.sleep(1)
continue
# Filtrar arquivos válidos:
# - Não termina com .crdownload
# - Não começa com . (arquivos temporários/ocultos do Chrome)
# - Não contém .com.google.Chrome (arquivos temporários específicos)
def is_valid_file(filename):
if filename.endswith('.crdownload'):
return False
if filename.startswith('.'):
return False
if '.com.google.Chrome' in filename:
return False
return True
current_files = set(f for f in current_dir_files if is_valid_file(f))
new_files = [f for f in current_files - before_files]
if new_files:
# Escolher o mais recente
candidate = max((os.path.join(download_dir, f) for f in new_files), key=os.path.getmtime)
# Verificar se o arquivo realmente existe e tem tamanho > 0
if os.path.exists(candidate) and os.path.getsize(candidate) > 0:
downloaded_file = candidate
break
time.sleep(1)
def _normalize_label(s: str) -> str:
if s is None:
return ""
s = str(s)
s = unicodedata.normalize('NFKD', s)
s = ''.join(c for c in s if not unicodedata.combining(c))
s = s.lower()
# remover caracteres comuns
for ch in ['%', '(', ')', '/', '-', '_', '.', ':']:
s = s.replace(ch, ' ')
s = ' '.join(s.split())
return s
if downloaded_file:
print(f"Arquivo baixado detectado: {downloaded_file}")
# Preparar nome final (garantindo nome nico)
final_name = f"vendas_por_hora_{data_formatada}.csv"
final_path = os.path.join(download_dir, final_name)
if os.path.exists(final_path):
base_no_ext, ext = os.path.splitext(final_name)
counter = 1
while os.path.exists(os.path.join(download_dir, f"{base_no_ext}({counter}){ext}")):
counter += 1
final_path = os.path.join(download_dir, f"{base_no_ext}({counter}){ext}")
# Processar CSV removendo colunas e transformando campos
cols_para_remover = {
_normalize_label(n) for n in [
'Receita líquida', 'Receita liquida',
'Participação Receita (%)', 'Participacao Receita (%)',
'Maior receita', 'Maior Receita',
'Participação Boletos (%)', 'Participacao Boletos (%)'
]
}
try:
print("Abrindo arquivo CSV para leitura...")
# Detectar delimitador e ler
with open(downloaded_file, 'r', encoding='utf-8-sig', errors='ignore') as f:
print("Detectando delimitador do CSV...")
sample = f.read(4096)
f.seek(0)
try:
dialect = csv.Sniffer().sniff(sample, delimiters=';,\t|')
print(f"Delimitador detectado: '{dialect.delimiter}'")
except Exception as e:
print(f"Erro ao detectar delimitador, usando ';' por padrão: {e}")
class Simple(csv.Dialect):
delimiter = ';'
quotechar = '"'
escapechar = None
doublequote = True
skipinitialspace = False
lineterminator = '\n'
quoting = csv.QUOTE_MINIMAL
dialect = Simple
print("Lendo cabeçalhos do CSV...")
reader = csv.DictReader(f, dialect=dialect)
original_headers = reader.fieldnames or []
print(f"Cabeçalhos encontrados: {original_headers}")
# Construir novos headers, removendo colunas e substituindo Produto
new_headers = []
produto_idx = None
for h in original_headers:
nh = _normalize_label(h)
if nh in cols_para_remover:
continue
if nh == _normalize_label('Produto'):
produto_idx = len(new_headers)
new_headers.extend(['SKU', 'Descricao'])
elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'):
new_headers.append('PRECO_MEDIO')
else:
new_headers.append(h)
# Se Produto não existir, garante as colunas novas ao final
if 'SKU' not in new_headers:
new_headers.extend(['SKU', 'Descricao'])
print(f"Novos cabeçalhos processados: {new_headers}")
print("Processando linhas do CSV...")
rows_out = []
row_count = 0
for row in reader:
row_count += 1
if row_count % 100 == 0:
print(f"Processadas {row_count} linhas...")
new_row = {}
for h in original_headers:
nh = _normalize_label(h)
if nh in cols_para_remover:
continue
if nh == _normalize_label('Produto'):
val = row.get(h, '') or ''
parts = [p.strip() for p in val.split(' - ', 1)]
sku = parts[0] if parts else ''
desc = parts[1] if len(parts) > 1 else val
new_row['SKU'] = sku
new_row['Descricao'] = desc
elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'):
val = row.get(h, '')
new_row['PRECO_MEDIO'] = val
else:
val = row.get(h, '')
if nh == _normalize_label('Loja'):
val = (val or '').split(' - ', 1)[0].strip()
new_row[h] = val
# Garantir chaves SKU/Descricao existam
new_row.setdefault('SKU', '')
new_row.setdefault('Descricao', '')
new_row.setdefault('PRECO_MEDIO', '')
rows_out.append(new_row)
print(f"Total de linhas processadas: {row_count}")
print(f"Total de linhas válidas: {len(rows_out)}")
# Inserir os dados tratados no banco de dados
print("Preparando para inserir dados no banco...")
def _find_header(headers, targets):
targets_norm = { _normalize_label(t) for t in targets }
for hh in headers:
if _normalize_label(hh) in targets_norm:
return hh
return None
def _parse_number(val):
s = str(val or '').strip()
if s == '':
return 0
# converter formato pt-BR para ponto flutuante
s = s.replace('.', '').replace(',', '.')
try:
num = float(s)
# se for inteiro, retorna int
return int(num) if abs(num - int(num)) < 1e-9 else num
except Exception:
return 0
# Mapear colunas de interesse
pdv_header = _find_header(new_headers, ['Loja', 'PDV'])
vendas_header = _find_header(new_headers, ['Vendas', 'Quantidade', 'Qtd', 'Qtd. Vendida', 'Qtd Vendida', 'Qtde Vendida', 'Qtde. Vendida', 'Quantidade Vendida', 'Qtde'])
preco_medio_header = _find_header(new_headers, ['PRECO_MEDIO', 'Preço Médio', 'Preco Medio'])
if not pdv_header:
raise Exception("Não foi possível identificar a coluna de PDV/Loja no CSV tratado.")
if not vendas_header:
print("Aviso: coluna de Vendas não encontrada; valores serão inseridos como 0.")
if not preco_medio_header:
print("Aviso: coluna de Preço Médio não encontrada; valores serão inseridos como 0.00.")
# Conectar ao SQL Server
print("Escolhendo driver SQL Server...")
driver_name = choose_sql_driver()
print(f"Driver selecionado: {driver_name}")
connection_string = (
f'DRIVER={{{driver_name}}};'
f'SERVER=10.77.77.10;'
f'DATABASE=GINSENG;'
f'UID=supginseng;'
f'PWD=Ginseng@;'
f'PORT=1433;'
f'TrustServerCertificate=yes;'
f'Encrypt=yes'
)
# Converter data para formato aceito pelo banco (date)
data_db = datetime.strptime(data_formatada, '%d%m%Y').date()
print(f"Data para inserção no banco: {data_db}")
print("Conectando ao banco de dados...")
inserted = 0
with pyodbc.connect(connection_string) as conn:
print("Conexão estabelecida com sucesso!")
conn.autocommit = False
cur = conn.cursor()
# Apagar dados existentes para a data
print(f"Deletando dados existentes para a data {data_db}...")
cur.execute("DELETE FROM [GINSENG].[dbo].[rgb_sales_selenium] WHERE [Data] = ?", data_db)
print("Dados antigos deletados.")
# Inserir linhas
print("Preparando inserção de dados...")
insert_sql = (
"INSERT INTO [GINSENG].[dbo].[rgb_sales_selenium] ([Data],[PDV],[SKU],[DESCRICAO],[VENDAS],[PRECO_MEDIO]) "
"VALUES (?,?,?,?,?,?)"
)
batch = []
batch_count = 0
for r in rows_out:
pdv_val = r.get(pdv_header, '')
try:
pdv_val = int(str(pdv_val).strip().split()[0]) if str(pdv_val).strip() else None
except Exception:
pdv_val = None
sku_val = r.get('SKU', '')
desc_val = r.get('Descricao', '')
vendas_val = _parse_number(r.get(vendas_header, '')) if vendas_header else 0
preco_medio_val = _parse_number(r.get(preco_medio_header, '')) if preco_medio_header else 0.00
if pdv_val is None and r.get('Loja'):
try:
pdv_val = int(str(r.get('Loja')).strip().split()[0])
except Exception:
pdv_val = None
batch.append((data_db, pdv_val, sku_val, desc_val, vendas_val, preco_medio_val))
if len(batch) >= 1000:
batch_count += 1
print(f"Inserindo lote {batch_count} ({len(batch)} registros)...")
cur.executemany(insert_sql, batch)
inserted += len(batch)
batch = []
if batch:
batch_count += 1
print(f"Inserindo lote final {batch_count} ({len(batch)} registros)...")
cur.executemany(insert_sql, batch)
inserted += len(batch)
print("Fazendo commit das alterações...")
conn.commit()
print("Commit realizado com sucesso!")
print(f"Dados inseridos no banco: {inserted} registros para a data {data_db}.")
# Remover arquivo original
print(f"Removendo arquivo temporário: {downloaded_file}")
try:
os.remove(downloaded_file)
print("Arquivo removido com sucesso.")
except Exception as e:
print(f"Aviso: não foi possível remover o arquivo: {e}")
# Salvar estado apenas após sucesso completo
if USE_DATE_RANGE:
print("Salvando estado do processamento...")
_save_date_state(data_formatada)
print("Processamento concluído com sucesso!")
except Exception as e:
print(f"Falha ao tratar o CSV: {e}")
import traceback
traceback.print_exc()
else:
print("Não foi possível detectar o arquivo baixado dentro do tempo limite.")
# Pausa breve antes de encerrar
time.sleep(2)
print("Fechando o navegador...")
except Exception as e:
print(f"Erro durante a execução: {e}")
finally:
# Fechar o navegador
driver.quit()
print("Script finalizado.")
def run_with_retry(max_retries=3, retry_delay=120):
"""Executa main() com retentativas em caso de erro de timeout/conexão/arquivo."""
for attempt in range(1, max_retries + 1):
try:
main()
return True # Sucesso
except Exception as e:
error_str = str(e).lower()
# Verificar se é erro de timeout/conexão/arquivo temporário que vale retry
is_retryable = any(keyword in error_str for keyword in [
'read timed out',
'timed out',
'timeout',
'connectionpool',
'connection refused',
'connection reset',
'httpconnectionpool',
'no such file or directory',
'.com.google.chrome',
'filenotfounderror'
])
if is_retryable and attempt < max_retries:
print(f"\n{'=' * 60}")
print(f"Erro de conexão/timeout detectado (tentativa {attempt}/{max_retries})")
print(f"Erro: {e}")
print(f"Aguardando {retry_delay} segundos antes de tentar novamente...")
print(f"{'=' * 60}\n")
time.sleep(retry_delay)
else:
if attempt == max_retries and is_retryable:
print(f"\n{'=' * 60}")
print(f"Erro persistente após {max_retries} tentativas.")
print(f"Último erro: {e}")
print(f"{'=' * 60}\n")
raise
return False
if __name__ == "__main__":
if USE_DATE_RANGE:
print("=" * 60)
print("MODO LOOP ATIVADO - O script executará continuamente")
print(f"Intervalo: {DATE_RANGE_START} até {DATE_RANGE_END}")
print("=" * 60)
while not RANGE_COMPLETED:
try:
run_with_retry(max_retries=3, retry_delay=120)
if RANGE_COMPLETED:
break
print("\n" + "=" * 60)
print("Execução concluída. Aguardando 10 segundos antes da próxima execução...")
print("=" * 60 + "\n")
time.sleep(10) # Aguarda 10 segundos antes da próxima execução
except Exception as e:
print(f"\nErro na execução: {e}")
print("Tentando novamente em 10 segundos...")
time.sleep(10)
print("\n" + "=" * 60)
print("Intervalo de datas completado. Script finalizado.")
print("=" * 60)
else:
run_with_retry(max_retries=3, retry_delay=120)