This commit is contained in:
daniel.rodrigues 2025-11-04 09:32:25 -03:00
parent 7e2fe4a785
commit 94129053a4

617
extacao_vendashora_rgb.py Normal file
View File

@ -0,0 +1,617 @@
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
import time
import os
import csv
import unicodedata
import pyodbc
from datetime import datetime, timedelta
import json
# Configuração da data
USE_MANUAL_DATE = False # False = usar dia anterior automaticamente; True = usar MANUAL_DATE_STR
MANUAL_DATE_STR = "15102025" # Formato DDMMAAAA, usado quando USE_MANUAL_DATE=True
# Configuração de intervalo de datas (execução dia por dia)
USE_DATE_RANGE = False # True = usar intervalo de datas; False = usar USE_MANUAL_DATE
DATE_RANGE_START = "15/10/2025" # Formato DD/MM/YYYY
DATE_RANGE_END = "19/10/2025" # Formato DD/MM/YYYY
STATE_FILE = "date_range_state.json" # Arquivo para rastrear progresso
RANGE_COMPLETED = False # Flag para indicar que o intervalo foi completado
def _parse_date_ddmmyyyy(date_str: str) -> datetime:
"""Converte string DD/MM/YYYY para datetime."""
try:
return datetime.strptime(date_str.strip(), "%d/%m/%Y")
except Exception:
raise ValueError(f"Formato de data inválido: {date_str}. Use DD/MM/YYYY")
def _get_next_date_in_range() -> str:
"""Retorna a próxima data do intervalo no formato DDMMAAAA.
Rastreia o progresso em um arquivo JSON.
Quando chega ao fim, para de executar."""
global RANGE_COMPLETED
try:
start_dt = _parse_date_ddmmyyyy(DATE_RANGE_START)
end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END)
except ValueError as e:
print(f"Erro ao parsear datas: {e}")
raise
# Carregar estado anterior
current_date = start_dt
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, 'r') as f:
state = json.load(f)
# Verificar se o intervalo já foi completado
if state.get('completed', False):
print(f"Intervalo de datas já foi completado!")
print(f"Última data processada: {state.get('last_date')}")
print(f"Para reiniciar, delete o arquivo '{STATE_FILE}'")
RANGE_COMPLETED = True
return None # Sinal para parar
last_date_str = state.get('last_date')
if last_date_str:
last_date = datetime.strptime(last_date_str, "%d%m%Y")
# Se a última data foi antes do fim, incrementar um dia
if last_date < end_dt:
current_date = last_date + timedelta(days=1)
else:
# Chegou ao fim
RANGE_COMPLETED = True
return None # Sinal para parar
except Exception as e:
print(f"Erro ao ler arquivo de estado: {e}. Iniciando do início.")
current_date = start_dt
print(f"Data a processar: {current_date.strftime('%d/%m/%Y')}")
return current_date.strftime("%d%m%Y")
def _save_date_state(date_str: str) -> None:
"""Salva o estado da data processada com sucesso.
Deve ser chamado apenas após a conclusão bem-sucedida.
Se for a data final, marca como completo."""
try:
end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END)
current_dt = datetime.strptime(date_str, "%d%m%Y")
state = {'last_date': date_str}
# Se chegou na data final, marcar como completo
if current_dt >= end_dt:
state['completed'] = True
print(f"Intervalo de datas COMPLETO! Última data: {date_str}")
with open(STATE_FILE, 'w') as f:
json.dump(state, f)
print(f"Estado salvo: {date_str}")
except Exception as e:
print(f"Aviso: não foi possível salvar estado: {e}")
def resolve_data(use_manual: bool, manual_str: str):
"""Retorna a data no formato DDMMAAAA.
Se USE_DATE_RANGE=True, usa o intervalo de datas (uma por execução).
Se use_manual=True, valida e usa manual_str; caso contrário, usa o dia anterior.
Retorna None se o intervalo de datas foi completado."""
if USE_DATE_RANGE:
return _get_next_date_in_range()
if use_manual:
s = ''.join(ch for ch in str(manual_str) if ch.isdigit())
if len(s) == 8:
return s
else:
print("Data manual inválida; usando dia anterior automaticamente.")
return (datetime.now() - timedelta(days=1)).strftime("%d%m%Y")
def choose_sql_driver() -> str:
try:
available = pyodbc.drivers()
except Exception:
available = []
# Prefer 18, depois 17, depois o primeiro da lista que contenha 'SQL Server'
for candidate in ['ODBC Driver 18 for SQL Server', 'ODBC Driver 17 for SQL Server', 'SQL Server']:
if candidate in available:
return candidate
for d in available:
if 'SQL Server' in d:
return d
# Fallback (pode falhar na conexão se realmente não houver driver)
return 'ODBC Driver 17 for SQL Server'
def main():
# Configurar opções do Chrome
chrome_options = Options()
# Remova o comentário abaixo se quiser executar em modo headless (sem interface gráfica)
chrome_options.add_argument('--headless')
# Definir pasta de download para a Área de Trabalho
download_dir = os.path.join(os.path.expanduser("~"), "Desktop")
prefs = {
"download.default_directory": download_dir,
"download.prompt_for_download": False,
"download.directory_upgrade": True,
"safebrowsing.enabled": True
}
chrome_options.add_experimental_option("prefs", prefs)
# Inicializar o driver do Chrome
print("Iniciando o navegador...")
driver = webdriver.Chrome(options=chrome_options)
try:
# Acessar a URL
url = "https://cp10269.retaguarda.grupoboticario.com.br/app/#/"
print(f"Acessando {url}...")
driver.get(url)
# Aguardar o campo de input estar presente e visível
print("Aguardando o campo de login aparecer...")
wait = WebDriverWait(driver, 10)
input_field = wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-usuario-input-field"]'))
)
# Digitar "daniel.medeiros" no campo de login
print("Digitando 'daniel.medeiros' no campo de login...")
input_field.clear()
input_field.send_keys("daniel.medeiros")
# Aguardar o campo de senha aparecer
print("Aguardando o campo de senha aparecer...")
password_field = wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-senha-input-field"]'))
)
# Digitar a senha
print("Digitando a senha...")
password_field.clear()
password_field.send_keys("@ginseng")
# Clicar no botão de entrar com retentativa se houver erro do reCAPTCHA
print("Clicando no botão Entrar...")
max_login_attempts = 3
for attempt in range(1, max_login_attempts + 1):
try:
entrar_button = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]'))
)
# usar JS click para maior robustez
driver.execute_script("arguments[0].click();", entrar_button)
except Exception:
try:
entrar_button = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]'))
)
entrar_button.click()
except Exception:
pass
# Verificar se o menu "Venda" apareceu como sinal de login bem-sucedido
try:
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]'))
)
print("Login confirmado.")
break
except Exception:
# Verificar mensagem de erro no corpo da página
try:
body_text = driver.execute_script("return dment.body ? document.body.innerText : ''") or ""
except Exception:
body_text = ""
if 'grecaptcha.execute is not a function' in body_text.lower() or 'this.grecaptcha.execute is not a function' in body_text.lower():
print("Aviso do reCAPTCHA detectado; tentando clicar Entrar novamente...")
else:
print(f"Login ainda não confirmado (tentativa {attempt}/{max_login_attempts}). Tentando novamente...")
time.sleep(2)
if attempt == max_login_attempts:
raise Exception("Falha ao efetuar login após múltiplas tentativas.")
# Clicar em "Venda" no menu lateral
print("Clicando em 'Venda'...")
venda_menu = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]'))
)
venda_menu.click()
time.sleep(1)
# Clicar em "Relatórios"
print("Clicando em 'Relatórios'...")
relatorios_menu = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-relatorios"]'))
)
relatorios_menu.click()
time.sleep(1)
# Clicar em "Vendas por Hora"
print("Clicando em 'Vendas por Hora'...")
vendas_hora_menu = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-vendas-por-hora"]'))
)
vendas_hora_menu.click()
# Aguardar a página de relatório carregar
print("Aguardando a página de relatório carregar...")
time.sleep(3)
# Resolver a data a ser utilizada (dia anterior ou manual), formato DDMMAAAA
data_formatada = resolve_data(USE_MANUAL_DATE, MANUAL_DATE_STR)
# Se retornar None, significa que o intervalo de datas foi completado
if data_formatada is None:
print("Intervalo de datas completado. Encerrando...")
return
print(f"Data a ser preenchida: {data_formatada}")
# Clicar no canto inferior direito da página usando JavaScript
print("Clicando no canto inferior direito da página...")
driver.execute_script("""
var width = window.innerWidth;
var height = window.innerHeight;
var element = document.elementFromPoint(width - 50, height - 50);
if (element) {
element.click();
}
""")
time.sleep(0.5)
# Apertar CTRL + K
print("Apertando CTRL + K...")
from selenium.webdriver.common.action_chains import ActionChains
actions = ActionChains(driver)
actions.key_down(Keys.CONTROL).send_keys('k').key_up(Keys.CONTROL).perform()
time.sleep(1)
# Apertar TAB 3 vezes
print("Apertando TAB 3 vezes...")
for i in range(3):
actions.send_keys(Keys.TAB).perform()
time.sleep(0.3)
# Digitar a data do dia anterior
print(f"Digitando a data inicial: {data_formatada}")
actions.send_keys(data_formatada).perform()
time.sleep(0.5)
# Digitar a mesma data novamente (data final)
print(f"Digitando a data final: {data_formatada}")
actions.send_keys(data_formatada).perform()
time.sleep(2)
# Pressionar ENTER ou ESC para fechar possíveis popups/calendários
print("Pressionando ESC para fechar calendários...")
actions.send_keys(Keys.ESCAPE).perform()
time.sleep(1)
# Rolar a página para baixo para carregar mais elementos
print("Rolando a página para baixo para carregar elementos...")
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
# Rolar para cima
driver.execute_script("window.scrollTo(0, 0);")
time.sleep(1)
# Rolar para o meio
driver.execute_script("window.scrollTo(0, document.body.scrollHeight / 2);")
time.sleep(2)
# Trocar para o iframe que contém o formulário
print("\nMudando para o iframe do formulário...")
wait.until(EC.frame_to_be_available_and_switch_to_it((By.TAG_NAME, 'iframe')))
# Confirmar que estamos no iframe correto procurando um campo conhecido
wait.until(EC.presence_of_element_located((By.ID, 'dataInicial')))
print("Dentro do iframe do formulário.")
# Tentar clicar no radio 'Produto' por várias estratégias
print("Procurando o radio 'Produto'...")
radio_xpath = "//input[@type='radio' and @name='filtro.formato' and (@id='filtro_produto' or translate(@value,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz')='produto')]"
try:
radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, radio_xpath)))
except Exception:
# Fallback: procurar label com texto 'Produto' e um input dentro
radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, "//label[normalize-space()[contains(., 'Produto')]]//input[@type='radio']")))
# Garantir visibilidade e clicar
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", radio_produto)
time.sleep(0.5)
driver.execute_script("arguments[0].click();", radio_produto)
print("Radio 'Produto' selecionado.")
time.sleep(0.5)
# Selecionar formato CSV (label dentro de #divArquivo)
print("Selecionando formato CSV...")
formato_csv_label = wait.until(EC.element_to_be_clickable((By.XPATH, "//*[@id='divArquivo']/div[2]/label")))
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", formato_csv_label)
time.sleep(0.3)
driver.execute_script("arguments[0].click();", formato_csv_label)
time.sleep(0.5)
# Antes de gerar, capturar arquivos existentes na pasta de download
before_files = set(f for f in os.listdir(download_dir) if not f.endswith('.crdownload'))
# Clicar no botão Gerar
print("Clicando no botão Gerar...")
botao_gerar = wait.until(EC.element_to_be_clickable((By.ID, 'gerar')))
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", botao_gerar)
time.sleep(0.3)
driver.execute_script("arguments[0].click();", botao_gerar)
# Voltar para o conteúdo principal
driver.switch_to.default_content()
# Aguardar download finalizar e tratar/renomear com a data usada
print("Aguardando download finalizar para tratar e renomear...")
end_time = time.time() + 180 # até 3 minutos
downloaded_file = None
while time.time() < end_time:
# Esperar terminar downloads parciais
if any(name.endswith('.crdownload') for name in os.listdir(download_dir)):
time.sleep(1)
continue
current_files = set(f for f in os.listdir(download_dir) if not f.endswith('.crdownload'))
new_files = [f for f in current_files - before_files]
if new_files:
# Escolher o mais recente
candidate = max((os.path.join(download_dir, f) for f in new_files), key=os.path.getmtime)
downloaded_file = candidate
break
time.sleep(1)
def _normalize_label(s: str) -> str:
if s is None:
return ""
s = str(s)
s = unicodedata.normalize('NFKD', s)
s = ''.join(c for c in s if not unicodedata.combining(c))
s = s.lower()
# remover caracteres comuns
for ch in ['%', '(', ')', '/', '-', '_', '.', ':']:
s = s.replace(ch, ' ')
s = ' '.join(s.split())
return s
if downloaded_file:
print(f"Arquivo baixado detectado: {downloaded_file}")
# Preparar nome final (garantindo nome nico)
final_name = f"vendas_por_hora_{data_formatada}.csv"
final_path = os.path.join(download_dir, final_name)
if os.path.exists(final_path):
base_no_ext, ext = os.path.splitext(final_name)
counter = 1
while os.path.exists(os.path.join(download_dir, f"{base_no_ext}({counter}){ext}")):
counter += 1
final_path = os.path.join(download_dir, f"{base_no_ext}({counter}){ext}")
# Processar CSV removendo colunas e transformando campos
cols_para_remover = {
_normalize_label(n) for n in [
'Receita líquida', 'Receita liquida',
'Participação Receita (%)', 'Participacao Receita (%)',
'Maior receita', 'Maior Receita',
'Participação Boletos (%)', 'Participacao Boletos (%)'
]
}
try:
# Detectar delimitador e ler
with open(downloaded_file, 'r', encoding='utf-8-sig', errors='ignore') as f:
sample = f.read(4096)
f.seek(0)
try:
dialect = csv.Sniffer().sniff(sample, delimiters=';,\t|')
except Exception:
class Simple(csv.Dialect):
delimiter = ';'
quotechar = '"'
escapechar = None
doublequote = True
skipinitialspace = False
lineterminator = '\n'
quoting = csv.QUOTE_MINIMAL
dialect = Simple
reader = csv.DictReader(f, dialect=dialect)
original_headers = reader.fieldnames or []
# Construir novos headers, removendo colunas e substituindo Produto
new_headers = []
produto_idx = None
for h in original_headers:
nh = _normalize_label(h)
if nh in cols_para_remover:
continue
if nh == _normalize_label('Produto'):
produto_idx = len(new_headers)
new_headers.extend(['SKU', 'Descricao'])
elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'):
new_headers.append('PRECO_MEDIO')
else:
new_headers.append(h)
# Se Produto não existir, garante as colunas novas ao final
if 'SKU' not in new_headers:
new_headers.extend(['SKU', 'Descricao'])
rows_out = []
for row in reader:
new_row = {}
for h in original_headers:
nh = _normalize_label(h)
if nh in cols_para_remover:
continue
if nh == _normalize_label('Produto'):
val = row.get(h, '') or ''
parts = [p.strip() for p in val.split(' - ', 1)]
sku = parts[0] if parts else ''
desc = parts[1] if len(parts) > 1 else val
new_row['SKU'] = sku
new_row['Descricao'] = desc
elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'):
val = row.get(h, '')
new_row['PRECO_MEDIO'] = val
else:
val = row.get(h, '')
if nh == _normalize_label('Loja'):
val = (val or '').split(' - ', 1)[0].strip()
new_row[h] = val
# Garantir chaves SKU/Descricao existam
new_row.setdefault('SKU', '')
new_row.setdefault('Descricao', '')
new_row.setdefault('PRECO_MEDIO', '')
rows_out.append(new_row)
# Inserir os dados tratados no banco de dados
def _find_header(headers, targets):
targets_norm = { _normalize_label(t) for t in targets }
for hh in headers:
if _normalize_label(hh) in targets_norm:
return hh
return None
def _parse_number(val):
s = str(val or '').strip()
if s == '':
return 0
# converter formato pt-BR para ponto flutuante
s = s.replace('.', '').replace(',', '.')
try:
num = float(s)
# se for inteiro, retorna int
return int(num) if abs(num - int(num)) < 1e-9 else num
except Exception:
return 0
# Mapear colunas de interesse
pdv_header = _find_header(new_headers, ['Loja', 'PDV'])
vendas_header = _find_header(new_headers, ['Vendas', 'Quantidade', 'Qtd', 'Qtd. Vendida', 'Qtd Vendida', 'Qtde Vendida', 'Qtde. Vendida', 'Quantidade Vendida', 'Qtde'])
preco_medio_header = _find_header(new_headers, ['PRECO_MEDIO', 'Preço Médio', 'Preco Medio'])
if not pdv_header:
raise Exception("Não foi possível identificar a coluna de PDV/Loja no CSV tratado.")
if not vendas_header:
print("Aviso: coluna de Vendas não encontrada; valores serão inseridos como 0.")
if not preco_medio_header:
print("Aviso: coluna de Preço Médio não encontrada; valores serão inseridos como 0.00.")
# Conectar ao SQL Server
driver_name = choose_sql_driver()
try:
available_drivers = pyodbc.drivers()
except Exception:
available_drivers = []
connection_string = (
f'DRIVER={{{driver_name}}};'
f'SERVER=10.77.77.10;'
f'DATABASE=GINSENG;'
f'UID=supginseng;'
f'PWD=Iphone2513@;'
f'PORT=1433;'
f'TrustServerCertificate=yes;'
f'Encrypt=yes'
)
# Converter data para formato aceito pelo banco (date)
data_db = datetime.strptime(data_formatada, '%d%m%Y').date()
inserted = 0
with pyodbc.connect(connection_string) as conn:
conn.autocommit = False
cur = conn.cursor()
# Apagar dados existentes para a data
cur.execute("DELETE FROM [GINSENG].[dbo].[rgb_sales_selenium] WHERE [Data] = ?", data_db)
# Inserir linhas
insert_sql = (
"INSERT INTO [GINSENG].[dbo].[rgb_sales_selenium] ([Data],[PDV],[SKU],[DESCRICAO],[VENDAS],[PRECO_MEDIO]) "
"VALUES (?,?,?,?,?,?)"
)
batch = []
for r in rows_out:
pdv_val = r.get(pdv_header, '')
try:
pdv_val = int(str(pdv_val).strip().split()[0]) if str(pdv_val).strip() else None
except Exception:
pdv_val = None
sku_val = r.get('SKU', '')
desc_val = r.get('Descricao', '')
vendas_val = _parse_number(r.get(vendas_header, '')) if vendas_header else 0
preco_medio_val = _parse_number(r.get(preco_medio_header, '')) if preco_medio_header else 0.00
if pdv_val is None and r.get('Loja'):
try:
pdv_val = int(str(r.get('Loja')).strip().split()[0])
except Exception:
pdv_val = None
batch.append((data_db, pdv_val, sku_val, desc_val, vendas_val, preco_medio_val))
if len(batch) >= 1000:
cur.executemany(insert_sql, batch)
inserted += len(batch)
batch = []
if batch:
cur.executemany(insert_sql, batch)
inserted += len(batch)
conn.commit()
print(f"Dados inseridos no banco: {inserted} registros para a data {data_db}.")
# Remover arquivo original
try:
os.remove(downloaded_file)
except Exception:
pass
# Salvar estado apenas após sucesso completo
if USE_DATE_RANGE:
_save_date_state(data_formatada)
except Exception as e:
print(f"Falha ao tratar o CSV: {e}")
else:
print("Não foi possível detectar o arquivo baixado dentro do tempo limite.")
# Pausa breve antes de encerrar
time.sleep(2)
print("Fechando o navegador...")
except Exception as e:
print(f"Erro durante a execução: {e}")
finally:
# Fechar o navegador
driver.quit()
print("Script finalizado.")
if __name__ == "__main__":
if USE_DATE_RANGE:
print("=" * 60)
print("MODO LOOP ATIVADO - O script executará continuamente")
print(f"Intervalo: {DATE_RANGE_START} até {DATE_RANGE_END}")
print("=" * 60)
while not RANGE_COMPLETED:
try:
main()
if RANGE_COMPLETED:
break
print("\n" + "=" * 60)
print("Execução concluída. Aguardando 10 segundos antes da próxima execução...")
print("=" * 60 + "\n")
time.sleep(10) # Aguarda 10 segundos antes da próxima execução
except Exception as e:
print(f"\nErro na execução: {e}")
print("Tentando novamente em 10 segundos...")
time.sleep(10)
print("\n" + "=" * 60)
print("Intervalo de datas completado. Script finalizado.")
print("=" * 60)
else:
main()