773 lines
34 KiB
Python
773 lines
34 KiB
Python
from selenium import webdriver
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.common.keys import Keys
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
# from selenium.webdriver.chrome.service import Service # Usado apenas para Linux/Docker
|
||
from selenium.webdriver.chrome.options import Options
|
||
import time
|
||
import os
|
||
import csv
|
||
import unicodedata
|
||
import pyodbc
|
||
|
||
from datetime import datetime, timedelta
|
||
import json
|
||
|
||
# Configuração da data
|
||
USE_MANUAL_DATE = False # False = usar dia anterior automaticamente; True = usar MANUAL_DATE_STR
|
||
MANUAL_DATE_STR = "20112025" # Formato DDMMAAAA, usado quando USE_MANUAL_DATE=True
|
||
|
||
# Configuração de intervalo de datas (execução dia por dia)
|
||
USE_DATE_RANGE = False # True = usar intervalo de datas; False = usar USE_MANUAL_DATE
|
||
DATE_RANGE_START = "15/10/2025" # Formato DD/MM/YYYY
|
||
DATE_RANGE_END = "19/10/2025" # Formato DD/MM/YYYY
|
||
STATE_FILE = "date_range_state.json" # Arquivo para rastrear progresso
|
||
RANGE_COMPLETED = False # Flag para indicar que o intervalo foi completado
|
||
|
||
def _parse_date_ddmmyyyy(date_str: str) -> datetime:
|
||
"""Converte string DD/MM/YYYY para datetime."""
|
||
try:
|
||
return datetime.strptime(date_str.strip(), "%d/%m/%Y")
|
||
except Exception:
|
||
raise ValueError(f"Formato de data inválido: {date_str}. Use DD/MM/YYYY")
|
||
|
||
def _get_next_date_in_range() -> str:
|
||
"""Retorna a próxima data do intervalo no formato DDMMAAAA.
|
||
Rastreia o progresso em um arquivo JSON.
|
||
Quando chega ao fim, para de executar."""
|
||
global RANGE_COMPLETED
|
||
|
||
try:
|
||
start_dt = _parse_date_ddmmyyyy(DATE_RANGE_START)
|
||
end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END)
|
||
except ValueError as e:
|
||
print(f"Erro ao parsear datas: {e}")
|
||
raise
|
||
|
||
# Carregar estado anterior
|
||
current_date = start_dt
|
||
if os.path.exists(STATE_FILE):
|
||
try:
|
||
with open(STATE_FILE, 'r') as f:
|
||
state = json.load(f)
|
||
# Verificar se o intervalo já foi completado
|
||
if state.get('completed', False):
|
||
print(f"Intervalo de datas já foi completado!")
|
||
print(f"Última data processada: {state.get('last_date')}")
|
||
print(f"Para reiniciar, delete o arquivo '{STATE_FILE}'")
|
||
RANGE_COMPLETED = True
|
||
return None # Sinal para parar
|
||
|
||
last_date_str = state.get('last_date')
|
||
if last_date_str:
|
||
last_date = datetime.strptime(last_date_str, "%d%m%Y")
|
||
# Se a última data foi antes do fim, incrementar um dia
|
||
if last_date < end_dt:
|
||
current_date = last_date + timedelta(days=1)
|
||
else:
|
||
# Chegou ao fim
|
||
RANGE_COMPLETED = True
|
||
return None # Sinal para parar
|
||
except Exception as e:
|
||
print(f"Erro ao ler arquivo de estado: {e}. Iniciando do início.")
|
||
current_date = start_dt
|
||
|
||
print(f"Data a processar: {current_date.strftime('%d/%m/%Y')}")
|
||
return current_date.strftime("%d%m%Y")
|
||
|
||
def _save_date_state(date_str: str) -> None:
|
||
"""Salva o estado da data processada com sucesso.
|
||
Deve ser chamado apenas após a conclusão bem-sucedida.
|
||
Se for a data final, marca como completo."""
|
||
try:
|
||
end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END)
|
||
current_dt = datetime.strptime(date_str, "%d%m%Y")
|
||
|
||
state = {'last_date': date_str}
|
||
# Se chegou na data final, marcar como completo
|
||
if current_dt >= end_dt:
|
||
state['completed'] = True
|
||
print(f"Intervalo de datas COMPLETO! Última data: {date_str}")
|
||
|
||
with open(STATE_FILE, 'w') as f:
|
||
json.dump(state, f)
|
||
print(f"Estado salvo: {date_str}")
|
||
except Exception as e:
|
||
print(f"Aviso: não foi possível salvar estado: {e}")
|
||
|
||
def resolve_data(use_manual: bool, manual_str: str):
|
||
"""Retorna a data no formato DDMMAAAA.
|
||
Se USE_DATE_RANGE=True, usa o intervalo de datas (uma por execução).
|
||
Se use_manual=True, valida e usa manual_str; caso contrário, usa o dia anterior.
|
||
Retorna None se o intervalo de datas foi completado."""
|
||
if USE_DATE_RANGE:
|
||
return _get_next_date_in_range()
|
||
|
||
if use_manual:
|
||
s = ''.join(ch for ch in str(manual_str) if ch.isdigit())
|
||
if len(s) == 8:
|
||
return s
|
||
else:
|
||
print("Data manual inválida; usando dia anterior automaticamente.")
|
||
return (datetime.now() - timedelta(days=1)).strftime("%d%m%Y")
|
||
|
||
def choose_sql_driver() -> str:
|
||
try:
|
||
available = pyodbc.drivers()
|
||
except Exception:
|
||
available = []
|
||
# Prefer 18, depois 17, depois o primeiro da lista que contenha 'SQL Server'
|
||
for candidate in ['ODBC Driver 18 for SQL Server', 'ODBC Driver 17 for SQL Server', 'SQL Server']:
|
||
if candidate in available:
|
||
return candidate
|
||
for d in available:
|
||
if 'SQL Server' in d:
|
||
return d
|
||
# Fallback (pode falhar na conexão se realmente não houver driver)
|
||
return 'ODBC Driver 17 for SQL Server'
|
||
|
||
|
||
def main():
|
||
# Configurar opções do Chrome para ambiente Kubernetes/Docker
|
||
chrome_options = Options()
|
||
|
||
# CONFIGURAÇÃO: Defina como False para ver o navegador (útil para debug no Windows)
|
||
USE_HEADLESS = True # True = modo headless (sem interface), False = com interface
|
||
|
||
# Configurações essenciais para rodar em Docker/Kubernetes (sem interface gráfica)
|
||
if USE_HEADLESS:
|
||
chrome_options.add_argument('--headless=new') # Novo modo headless (mais estável)
|
||
|
||
chrome_options.add_argument('--no-sandbox') # Necessário para rodar como root
|
||
chrome_options.add_argument('--disable-dev-shm-usage') # Evita problemas de memória compartilhada
|
||
|
||
# Configurações adicionais recomendadas
|
||
chrome_options.add_argument('--disable-software-rasterizer')
|
||
chrome_options.add_argument('--disable-extensions')
|
||
chrome_options.add_argument('--disable-blink-features=AutomationControlled') # Evita detecção de automação
|
||
|
||
# Configurações de janela
|
||
chrome_options.add_argument('--window-size=1920,1080')
|
||
chrome_options.add_argument('--start-maximized')
|
||
|
||
# Desabilitar notificações e popups
|
||
chrome_options.add_argument('--disable-notifications')
|
||
chrome_options.add_experimental_option('excludeSwitches', ['enable-logging', 'enable-automation'])
|
||
chrome_options.add_experimental_option('useAutomationExtension', False)
|
||
|
||
# Definir pasta de download
|
||
# Tenta usar Desktop, se não existir usa /tmp (para Kubernetes/Docker)
|
||
desktop_dir = os.path.join(os.path.expanduser("~"), "Desktop")
|
||
if os.path.exists(desktop_dir):
|
||
download_dir = desktop_dir
|
||
else:
|
||
# Em ambiente Kubernetes/Docker, usar /tmp ou criar diretório
|
||
download_dir = "/tmp/downloads"
|
||
os.makedirs(download_dir, exist_ok=True)
|
||
print(f"Usando diretório de download: {download_dir}")
|
||
|
||
prefs = {
|
||
"download.default_directory": download_dir,
|
||
"download.prompt_for_download": False,
|
||
"download.directory_upgrade": True,
|
||
"safebrowsing.enabled": True,
|
||
"profile.default_content_setting_values.notifications": 2 # Desabilitar notificações
|
||
}
|
||
chrome_options.add_experimental_option("prefs", prefs)
|
||
|
||
# Usar o binário do Chromium instalado no sistema (para Kubernetes/Docker)
|
||
# Comentar a linha abaixo se estiver rodando localmente no Windows
|
||
# chrome_options.binary_location = '/usr/bin/chromium'
|
||
|
||
# Inicializar o driver do Chrome
|
||
print("Iniciando o navegador...")
|
||
print(f"Modo headless: {USE_HEADLESS}")
|
||
|
||
try:
|
||
# Criar o service apontando para o chromedriver do sistema (Kubernetes/Docker)
|
||
# Comentar a linha abaixo se estiver rodando localmente no Windows
|
||
# from selenium.webdriver.chrome.service import Service
|
||
# service = Service('/usr/bin/chromedriver')
|
||
# driver = webdriver.Chrome(service=service, options=chrome_options)
|
||
|
||
# Para rodar localmente no Windows, use:
|
||
driver = webdriver.Chrome(options=chrome_options)
|
||
print("Navegador iniciado com sucesso!")
|
||
except Exception as e:
|
||
print(f"Erro ao iniciar o navegador: {e}")
|
||
print("\nPossíveis soluções:")
|
||
print("1. Verifique se o Chrome está instalado")
|
||
print("2. Verifique se o ChromeDriver está instalado e atualizado")
|
||
print("3. Execute: pip install --upgrade selenium")
|
||
print("4. Tente desabilitar o modo headless (USE_HEADLESS = False)")
|
||
raise
|
||
|
||
try:
|
||
# Acessar a URL
|
||
url = "https://cp10269.retaguarda.grupoboticario.com.br/app/#/"
|
||
print(f"Acessando {url}...")
|
||
driver.get(url)
|
||
|
||
# Aguardar o campo de input estar presente e visível
|
||
print("Aguardando o campo de login aparecer...")
|
||
wait = WebDriverWait(driver, 10)
|
||
input_field = wait.until(
|
||
EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-usuario-input-field"]'))
|
||
)
|
||
|
||
# Digitar "daniel.medeiros" no campo de login
|
||
print("Digitando 'daniel.medeiros' no campo de login...")
|
||
input_field.clear()
|
||
input_field.send_keys("daniel.medeiros")
|
||
|
||
# Aguardar o campo de senha aparecer
|
||
print("Aguardando o campo de senha aparecer...")
|
||
password_field = wait.until(
|
||
EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-senha-input-field"]'))
|
||
)
|
||
|
||
# Digitar a senha
|
||
print("Digitando a senha...")
|
||
password_field.clear()
|
||
password_field.send_keys("@ginseng")
|
||
|
||
# Clicar no botão de entrar com retentativa se houver erro do reCAPTCHA
|
||
print("Clicando no botão Entrar...")
|
||
max_login_attempts = 3
|
||
for attempt in range(1, max_login_attempts + 1):
|
||
try:
|
||
entrar_button = wait.until(
|
||
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]'))
|
||
)
|
||
# usar JS click para maior robustez
|
||
driver.execute_script("arguments[0].click();", entrar_button)
|
||
except Exception:
|
||
try:
|
||
entrar_button = wait.until(
|
||
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]'))
|
||
)
|
||
entrar_button.click()
|
||
except Exception:
|
||
pass
|
||
|
||
# Verificar se o menu "Venda" apareceu como sinal de login bem-sucedido
|
||
try:
|
||
WebDriverWait(driver, 10).until(
|
||
EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]'))
|
||
)
|
||
print("Login confirmado.")
|
||
break
|
||
except Exception:
|
||
# Verificar mensagem de erro no corpo da página
|
||
try:
|
||
body_text = driver.execute_script("return dment.body ? document.body.innerText : ''") or ""
|
||
except Exception:
|
||
body_text = ""
|
||
if 'grecaptcha.execute is not a function' in body_text.lower() or 'this.grecaptcha.execute is not a function' in body_text.lower():
|
||
print("Aviso do reCAPTCHA detectado; tentando clicar Entrar novamente...")
|
||
else:
|
||
print(f"Login ainda não confirmado (tentativa {attempt}/{max_login_attempts}). Tentando novamente...")
|
||
time.sleep(2)
|
||
if attempt == max_login_attempts:
|
||
raise Exception("Falha ao efetuar login após múltiplas tentativas.")
|
||
|
||
# Clicar em "Venda" no menu lateral
|
||
print("Clicando em 'Venda'...")
|
||
venda_menu = wait.until(
|
||
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]'))
|
||
)
|
||
venda_menu.click()
|
||
time.sleep(1)
|
||
|
||
# Clicar em "Relatórios"
|
||
print("Clicando em 'Relatórios'...")
|
||
relatorios_menu = wait.until(
|
||
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-relatorios"]'))
|
||
)
|
||
relatorios_menu.click()
|
||
time.sleep(1)
|
||
|
||
# Clicar em "Vendas por Hora"
|
||
print("Clicando em 'Vendas por Hora'...")
|
||
vendas_hora_menu = wait.until(
|
||
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-vendas-por-hora"]'))
|
||
)
|
||
vendas_hora_menu.click()
|
||
|
||
# Aguardar a página de relatório carregar
|
||
print("Aguardando a página de relatório carregar...")
|
||
time.sleep(3)
|
||
|
||
# Resolver a data a ser utilizada (dia anterior ou manual), formato DDMMAAAA
|
||
data_formatada = resolve_data(USE_MANUAL_DATE, MANUAL_DATE_STR)
|
||
|
||
# Se retornar None, significa que o intervalo de datas foi completado
|
||
if data_formatada is None:
|
||
print("Intervalo de datas completado. Encerrando...")
|
||
return
|
||
|
||
print(f"Data a ser preenchida: {data_formatada}")
|
||
|
||
# Clicar no canto inferior direito da página usando JavaScript
|
||
print("Clicando no canto inferior direito da página...")
|
||
driver.execute_script("""
|
||
var width = window.innerWidth;
|
||
var height = window.innerHeight;
|
||
var element = document.elementFromPoint(width - 50, height - 50);
|
||
if (element) {
|
||
element.click();
|
||
}
|
||
""")
|
||
time.sleep(0.5)
|
||
|
||
# Apertar CTRL + K
|
||
print("Apertando CTRL + K...")
|
||
from selenium.webdriver.common.action_chains import ActionChains
|
||
actions = ActionChains(driver)
|
||
actions.key_down(Keys.CONTROL).send_keys('k').key_up(Keys.CONTROL).perform()
|
||
|
||
# Aguardar a página carregar completamente
|
||
print("Aguardando 10 segundos para a página carregar...")
|
||
time.sleep(10)
|
||
|
||
# Apertar TAB 3 vezes
|
||
print("Apertando TAB 3 vezes...")
|
||
for i in range(3):
|
||
actions.send_keys(Keys.TAB).perform()
|
||
time.sleep(0.3)
|
||
|
||
# Digitar a data do dia anterior
|
||
print(f"Digitando a data inicial: {data_formatada}")
|
||
actions.send_keys(data_formatada).perform()
|
||
time.sleep(0.5)
|
||
|
||
# Digitar a mesma data novamente (data final)
|
||
print(f"Digitando a data final: {data_formatada}")
|
||
actions.send_keys(data_formatada).perform()
|
||
time.sleep(2)
|
||
|
||
# Pressionar ENTER ou ESC para fechar possíveis popups/calendários
|
||
print("Pressionando ESC para fechar calendários...")
|
||
actions.send_keys(Keys.ESCAPE).perform()
|
||
time.sleep(1)
|
||
|
||
# Rolar a página para baixo para carregar mais elementos
|
||
print("Rolando a página para baixo para carregar elementos...")
|
||
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
|
||
time.sleep(2)
|
||
|
||
# Rolar para cima
|
||
driver.execute_script("window.scrollTo(0, 0);")
|
||
time.sleep(1)
|
||
|
||
# Rolar para o meio
|
||
driver.execute_script("window.scrollTo(0, document.body.scrollHeight / 2);")
|
||
time.sleep(2)
|
||
|
||
# Trocar para o iframe que contém o formulário
|
||
print("\nMudando para o iframe do formulário...")
|
||
wait.until(EC.frame_to_be_available_and_switch_to_it((By.TAG_NAME, 'iframe')))
|
||
|
||
# Confirmar que estamos no iframe correto procurando um campo conhecido
|
||
wait.until(EC.presence_of_element_located((By.ID, 'dataInicial')))
|
||
print("Dentro do iframe do formulário.")
|
||
|
||
# Tentar clicar no radio 'Produto' por várias estratégias
|
||
print("Procurando o radio 'Produto'...")
|
||
radio_xpath = "//input[@type='radio' and @name='filtro.formato' and (@id='filtro_produto' or translate(@value,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz')='produto')]"
|
||
try:
|
||
radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, radio_xpath)))
|
||
except Exception:
|
||
# Fallback: procurar label com texto 'Produto' e um input dentro
|
||
radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, "//label[normalize-space()[contains(., 'Produto')]]//input[@type='radio']")))
|
||
|
||
# Garantir visibilidade e clicar
|
||
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", radio_produto)
|
||
time.sleep(0.5)
|
||
driver.execute_script("arguments[0].click();", radio_produto)
|
||
print("Radio 'Produto' selecionado.")
|
||
time.sleep(0.5)
|
||
|
||
# Selecionar formato CSV (label dentro de #divArquivo)
|
||
print("Selecionando formato CSV...")
|
||
formato_csv_label = wait.until(EC.element_to_be_clickable((By.XPATH, "//*[@id='divArquivo']/div[2]/label")))
|
||
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", formato_csv_label)
|
||
time.sleep(0.3)
|
||
driver.execute_script("arguments[0].click();", formato_csv_label)
|
||
time.sleep(0.5)
|
||
|
||
# Antes de gerar, capturar arquivos existentes na pasta de download
|
||
before_files = set(f for f in os.listdir(download_dir) if not f.endswith('.crdownload'))
|
||
|
||
# Clicar no botão Gerar
|
||
print("Clicando no botão Gerar...")
|
||
botao_gerar = wait.until(EC.element_to_be_clickable((By.ID, 'gerar')))
|
||
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", botao_gerar)
|
||
time.sleep(0.3)
|
||
driver.execute_script("arguments[0].click();", botao_gerar)
|
||
|
||
# Voltar para o conteúdo principal
|
||
driver.switch_to.default_content()
|
||
|
||
# Aguardar download finalizar e tratar/renomear com a data usada
|
||
print("Aguardando download finalizar para tratar e renomear...")
|
||
end_time = time.time() + 180 # até 3 minutos
|
||
downloaded_file = None
|
||
while time.time() < end_time:
|
||
# Esperar terminar downloads parciais (.crdownload ou arquivos temporários do Chrome)
|
||
current_dir_files = os.listdir(download_dir)
|
||
if any(name.endswith('.crdownload') for name in current_dir_files):
|
||
time.sleep(1)
|
||
continue
|
||
|
||
# Filtrar arquivos válidos:
|
||
# - Não termina com .crdownload
|
||
# - Não começa com . (arquivos temporários/ocultos do Chrome)
|
||
# - Não contém .com.google.Chrome (arquivos temporários específicos)
|
||
def is_valid_file(filename):
|
||
if filename.endswith('.crdownload'):
|
||
return False
|
||
if filename.startswith('.'):
|
||
return False
|
||
if '.com.google.Chrome' in filename:
|
||
return False
|
||
return True
|
||
|
||
current_files = set(f for f in current_dir_files if is_valid_file(f))
|
||
new_files = [f for f in current_files - before_files]
|
||
|
||
if new_files:
|
||
# Escolher o mais recente
|
||
candidate = max((os.path.join(download_dir, f) for f in new_files), key=os.path.getmtime)
|
||
# Verificar se o arquivo realmente existe e tem tamanho > 0
|
||
if os.path.exists(candidate) and os.path.getsize(candidate) > 0:
|
||
downloaded_file = candidate
|
||
break
|
||
time.sleep(1)
|
||
|
||
def _normalize_label(s: str) -> str:
|
||
if s is None:
|
||
return ""
|
||
s = str(s)
|
||
s = unicodedata.normalize('NFKD', s)
|
||
s = ''.join(c for c in s if not unicodedata.combining(c))
|
||
s = s.lower()
|
||
# remover caracteres comuns
|
||
for ch in ['%', '(', ')', '/', '-', '_', '.', ':']:
|
||
s = s.replace(ch, ' ')
|
||
s = ' '.join(s.split())
|
||
return s
|
||
|
||
if downloaded_file:
|
||
print(f"Arquivo baixado detectado: {downloaded_file}")
|
||
# Preparar nome final (garantindo nome nico)
|
||
final_name = f"vendas_por_hora_{data_formatada}.csv"
|
||
final_path = os.path.join(download_dir, final_name)
|
||
if os.path.exists(final_path):
|
||
base_no_ext, ext = os.path.splitext(final_name)
|
||
counter = 1
|
||
while os.path.exists(os.path.join(download_dir, f"{base_no_ext}({counter}){ext}")):
|
||
counter += 1
|
||
final_path = os.path.join(download_dir, f"{base_no_ext}({counter}){ext}")
|
||
# Processar CSV removendo colunas e transformando campos
|
||
cols_para_remover = {
|
||
_normalize_label(n) for n in [
|
||
'Receita líquida', 'Receita liquida',
|
||
'Participação Receita (%)', 'Participacao Receita (%)',
|
||
'Maior receita', 'Maior Receita',
|
||
'Participação Boletos (%)', 'Participacao Boletos (%)'
|
||
]
|
||
}
|
||
try:
|
||
print("Abrindo arquivo CSV para leitura...")
|
||
# Detectar delimitador e ler
|
||
with open(downloaded_file, 'r', encoding='utf-8-sig', errors='ignore') as f:
|
||
print("Detectando delimitador do CSV...")
|
||
sample = f.read(4096)
|
||
f.seek(0)
|
||
try:
|
||
dialect = csv.Sniffer().sniff(sample, delimiters=';,\t|')
|
||
print(f"Delimitador detectado: '{dialect.delimiter}'")
|
||
except Exception as e:
|
||
print(f"Erro ao detectar delimitador, usando ';' por padrão: {e}")
|
||
class Simple(csv.Dialect):
|
||
delimiter = ';'
|
||
quotechar = '"'
|
||
escapechar = None
|
||
doublequote = True
|
||
skipinitialspace = False
|
||
lineterminator = '\n'
|
||
quoting = csv.QUOTE_MINIMAL
|
||
dialect = Simple
|
||
|
||
print("Lendo cabeçalhos do CSV...")
|
||
reader = csv.DictReader(f, dialect=dialect)
|
||
original_headers = reader.fieldnames or []
|
||
print(f"Cabeçalhos encontrados: {original_headers}")
|
||
|
||
# Construir novos headers, removendo colunas e substituindo Produto
|
||
new_headers = []
|
||
produto_idx = None
|
||
for h in original_headers:
|
||
nh = _normalize_label(h)
|
||
if nh in cols_para_remover:
|
||
continue
|
||
if nh == _normalize_label('Produto'):
|
||
produto_idx = len(new_headers)
|
||
new_headers.extend(['SKU', 'Descricao'])
|
||
elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'):
|
||
new_headers.append('PRECO_MEDIO')
|
||
else:
|
||
new_headers.append(h)
|
||
|
||
# Se Produto não existir, garante as colunas novas ao final
|
||
if 'SKU' not in new_headers:
|
||
new_headers.extend(['SKU', 'Descricao'])
|
||
|
||
print(f"Novos cabeçalhos processados: {new_headers}")
|
||
print("Processando linhas do CSV...")
|
||
|
||
rows_out = []
|
||
row_count = 0
|
||
for row in reader:
|
||
row_count += 1
|
||
if row_count % 100 == 0:
|
||
print(f"Processadas {row_count} linhas...")
|
||
new_row = {}
|
||
for h in original_headers:
|
||
nh = _normalize_label(h)
|
||
if nh in cols_para_remover:
|
||
continue
|
||
if nh == _normalize_label('Produto'):
|
||
val = row.get(h, '') or ''
|
||
parts = [p.strip() for p in val.split(' - ', 1)]
|
||
sku = parts[0] if parts else ''
|
||
desc = parts[1] if len(parts) > 1 else val
|
||
new_row['SKU'] = sku
|
||
new_row['Descricao'] = desc
|
||
elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'):
|
||
val = row.get(h, '')
|
||
new_row['PRECO_MEDIO'] = val
|
||
else:
|
||
val = row.get(h, '')
|
||
if nh == _normalize_label('Loja'):
|
||
val = (val or '').split(' - ', 1)[0].strip()
|
||
new_row[h] = val
|
||
# Garantir chaves SKU/Descricao existam
|
||
new_row.setdefault('SKU', '')
|
||
new_row.setdefault('Descricao', '')
|
||
new_row.setdefault('PRECO_MEDIO', '')
|
||
rows_out.append(new_row)
|
||
|
||
print(f"Total de linhas processadas: {row_count}")
|
||
print(f"Total de linhas válidas: {len(rows_out)}")
|
||
|
||
# Inserir os dados tratados no banco de dados
|
||
print("Preparando para inserir dados no banco...")
|
||
def _find_header(headers, targets):
|
||
targets_norm = { _normalize_label(t) for t in targets }
|
||
for hh in headers:
|
||
if _normalize_label(hh) in targets_norm:
|
||
return hh
|
||
return None
|
||
|
||
def _parse_number(val):
|
||
s = str(val or '').strip()
|
||
if s == '':
|
||
return 0
|
||
# converter formato pt-BR para ponto flutuante
|
||
s = s.replace('.', '').replace(',', '.')
|
||
try:
|
||
num = float(s)
|
||
# se for inteiro, retorna int
|
||
return int(num) if abs(num - int(num)) < 1e-9 else num
|
||
except Exception:
|
||
return 0
|
||
|
||
# Mapear colunas de interesse
|
||
pdv_header = _find_header(new_headers, ['Loja', 'PDV'])
|
||
vendas_header = _find_header(new_headers, ['Vendas', 'Quantidade', 'Qtd', 'Qtd. Vendida', 'Qtd Vendida', 'Qtde Vendida', 'Qtde. Vendida', 'Quantidade Vendida', 'Qtde'])
|
||
preco_medio_header = _find_header(new_headers, ['PRECO_MEDIO', 'Preço Médio', 'Preco Medio'])
|
||
|
||
if not pdv_header:
|
||
raise Exception("Não foi possível identificar a coluna de PDV/Loja no CSV tratado.")
|
||
if not vendas_header:
|
||
print("Aviso: coluna de Vendas não encontrada; valores serão inseridos como 0.")
|
||
if not preco_medio_header:
|
||
print("Aviso: coluna de Preço Médio não encontrada; valores serão inseridos como 0.00.")
|
||
|
||
# Conectar ao SQL Server
|
||
print("Escolhendo driver SQL Server...")
|
||
driver_name = choose_sql_driver()
|
||
print(f"Driver selecionado: {driver_name}")
|
||
|
||
connection_string = (
|
||
f'DRIVER={{{driver_name}}};'
|
||
f'SERVER=10.77.77.10;'
|
||
f'DATABASE=GINSENG;'
|
||
f'UID=supginseng;'
|
||
f'PWD=Ginseng@;'
|
||
f'PORT=1433;'
|
||
f'TrustServerCertificate=yes;'
|
||
f'Encrypt=yes'
|
||
)
|
||
|
||
# Converter data para formato aceito pelo banco (date)
|
||
data_db = datetime.strptime(data_formatada, '%d%m%Y').date()
|
||
print(f"Data para inserção no banco: {data_db}")
|
||
|
||
print("Conectando ao banco de dados...")
|
||
inserted = 0
|
||
with pyodbc.connect(connection_string) as conn:
|
||
print("Conexão estabelecida com sucesso!")
|
||
conn.autocommit = False
|
||
cur = conn.cursor()
|
||
|
||
# Apagar dados existentes para a data
|
||
print(f"Deletando dados existentes para a data {data_db}...")
|
||
cur.execute("DELETE FROM [GINSENG].[dbo].[rgb_sales_selenium] WHERE [Data] = ?", data_db)
|
||
print("Dados antigos deletados.")
|
||
|
||
# Inserir linhas
|
||
print("Preparando inserção de dados...")
|
||
insert_sql = (
|
||
"INSERT INTO [GINSENG].[dbo].[rgb_sales_selenium] ([Data],[PDV],[SKU],[DESCRICAO],[VENDAS],[PRECO_MEDIO]) "
|
||
"VALUES (?,?,?,?,?,?)"
|
||
)
|
||
batch = []
|
||
batch_count = 0
|
||
for r in rows_out:
|
||
pdv_val = r.get(pdv_header, '')
|
||
try:
|
||
pdv_val = int(str(pdv_val).strip().split()[0]) if str(pdv_val).strip() else None
|
||
except Exception:
|
||
pdv_val = None
|
||
sku_val = r.get('SKU', '')
|
||
desc_val = r.get('Descricao', '')
|
||
vendas_val = _parse_number(r.get(vendas_header, '')) if vendas_header else 0
|
||
preco_medio_val = _parse_number(r.get(preco_medio_header, '')) if preco_medio_header else 0.00
|
||
if pdv_val is None and r.get('Loja'):
|
||
try:
|
||
pdv_val = int(str(r.get('Loja')).strip().split()[0])
|
||
except Exception:
|
||
pdv_val = None
|
||
batch.append((data_db, pdv_val, sku_val, desc_val, vendas_val, preco_medio_val))
|
||
if len(batch) >= 1000:
|
||
batch_count += 1
|
||
print(f"Inserindo lote {batch_count} ({len(batch)} registros)...")
|
||
cur.executemany(insert_sql, batch)
|
||
inserted += len(batch)
|
||
batch = []
|
||
if batch:
|
||
batch_count += 1
|
||
print(f"Inserindo lote final {batch_count} ({len(batch)} registros)...")
|
||
cur.executemany(insert_sql, batch)
|
||
inserted += len(batch)
|
||
|
||
print("Fazendo commit das alterações...")
|
||
conn.commit()
|
||
print("Commit realizado com sucesso!")
|
||
|
||
print(f"Dados inseridos no banco: {inserted} registros para a data {data_db}.")
|
||
|
||
# Remover arquivo original
|
||
print(f"Removendo arquivo temporário: {downloaded_file}")
|
||
try:
|
||
os.remove(downloaded_file)
|
||
print("Arquivo removido com sucesso.")
|
||
except Exception as e:
|
||
print(f"Aviso: não foi possível remover o arquivo: {e}")
|
||
|
||
# Salvar estado apenas após sucesso completo
|
||
if USE_DATE_RANGE:
|
||
print("Salvando estado do processamento...")
|
||
_save_date_state(data_formatada)
|
||
|
||
print("Processamento concluído com sucesso!")
|
||
except Exception as e:
|
||
print(f"Falha ao tratar o CSV: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
else:
|
||
print("Não foi possível detectar o arquivo baixado dentro do tempo limite.")
|
||
|
||
# Pausa breve antes de encerrar
|
||
time.sleep(2)
|
||
|
||
print("Fechando o navegador...")
|
||
|
||
except Exception as e:
|
||
print(f"Erro durante a execução: {e}")
|
||
|
||
finally:
|
||
# Fechar o navegador
|
||
driver.quit()
|
||
print("Script finalizado.")
|
||
|
||
def run_with_retry(max_retries=3, retry_delay=120):
|
||
"""Executa main() com retentativas em caso de erro de timeout/conexão/arquivo."""
|
||
for attempt in range(1, max_retries + 1):
|
||
try:
|
||
main()
|
||
return True # Sucesso
|
||
except Exception as e:
|
||
error_str = str(e).lower()
|
||
# Verificar se é erro de timeout/conexão/arquivo temporário que vale retry
|
||
is_retryable = any(keyword in error_str for keyword in [
|
||
'read timed out',
|
||
'timed out',
|
||
'timeout',
|
||
'connectionpool',
|
||
'connection refused',
|
||
'connection reset',
|
||
'httpconnectionpool',
|
||
'no such file or directory',
|
||
'.com.google.chrome',
|
||
'filenotfounderror'
|
||
])
|
||
|
||
if is_retryable and attempt < max_retries:
|
||
print(f"\n{'=' * 60}")
|
||
print(f"Erro de conexão/timeout detectado (tentativa {attempt}/{max_retries})")
|
||
print(f"Erro: {e}")
|
||
print(f"Aguardando {retry_delay} segundos antes de tentar novamente...")
|
||
print(f"{'=' * 60}\n")
|
||
time.sleep(retry_delay)
|
||
else:
|
||
if attempt == max_retries and is_retryable:
|
||
print(f"\n{'=' * 60}")
|
||
print(f"Erro persistente após {max_retries} tentativas.")
|
||
print(f"Último erro: {e}")
|
||
print(f"{'=' * 60}\n")
|
||
raise
|
||
return False
|
||
|
||
|
||
if __name__ == "__main__":
|
||
if USE_DATE_RANGE:
|
||
print("=" * 60)
|
||
print("MODO LOOP ATIVADO - O script executará continuamente")
|
||
print(f"Intervalo: {DATE_RANGE_START} até {DATE_RANGE_END}")
|
||
print("=" * 60)
|
||
while not RANGE_COMPLETED:
|
||
try:
|
||
run_with_retry(max_retries=3, retry_delay=120)
|
||
if RANGE_COMPLETED:
|
||
break
|
||
print("\n" + "=" * 60)
|
||
print("Execução concluída. Aguardando 10 segundos antes da próxima execução...")
|
||
print("=" * 60 + "\n")
|
||
time.sleep(10) # Aguarda 10 segundos antes da próxima execução
|
||
except Exception as e:
|
||
print(f"\nErro na execução: {e}")
|
||
print("Tentando novamente em 10 segundos...")
|
||
time.sleep(10)
|
||
|
||
print("\n" + "=" * 60)
|
||
print("Intervalo de datas completado. Script finalizado.")
|
||
print("=" * 60)
|
||
else:
|
||
run_with_retry(max_retries=3, retry_delay=120)
|
||
|