From 94129053a4f7fa999089a4ca0265156009eb74a1 Mon Sep 17 00:00:00 2001 From: "daniel.rodrigues" Date: Tue, 4 Nov 2025 09:32:25 -0300 Subject: [PATCH] att --- extacao_vendashora_rgb.py | 617 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 617 insertions(+) create mode 100644 extacao_vendashora_rgb.py diff --git a/extacao_vendashora_rgb.py b/extacao_vendashora_rgb.py new file mode 100644 index 0000000..3d0c17f --- /dev/null +++ b/extacao_vendashora_rgb.py @@ -0,0 +1,617 @@ +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.chrome.options import Options +import time +import os +import csv +import unicodedata +import pyodbc + +from datetime import datetime, timedelta +import json + +# Configuração da data +USE_MANUAL_DATE = False # False = usar dia anterior automaticamente; True = usar MANUAL_DATE_STR +MANUAL_DATE_STR = "15102025" # Formato DDMMAAAA, usado quando USE_MANUAL_DATE=True + +# Configuração de intervalo de datas (execução dia por dia) +USE_DATE_RANGE = False # True = usar intervalo de datas; False = usar USE_MANUAL_DATE +DATE_RANGE_START = "15/10/2025" # Formato DD/MM/YYYY +DATE_RANGE_END = "19/10/2025" # Formato DD/MM/YYYY +STATE_FILE = "date_range_state.json" # Arquivo para rastrear progresso +RANGE_COMPLETED = False # Flag para indicar que o intervalo foi completado + +def _parse_date_ddmmyyyy(date_str: str) -> datetime: + """Converte string DD/MM/YYYY para datetime.""" + try: + return datetime.strptime(date_str.strip(), "%d/%m/%Y") + except Exception: + raise ValueError(f"Formato de data inválido: {date_str}. Use DD/MM/YYYY") + +def _get_next_date_in_range() -> str: + """Retorna a próxima data do intervalo no formato DDMMAAAA. + Rastreia o progresso em um arquivo JSON. + Quando chega ao fim, para de executar.""" + global RANGE_COMPLETED + + try: + start_dt = _parse_date_ddmmyyyy(DATE_RANGE_START) + end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END) + except ValueError as e: + print(f"Erro ao parsear datas: {e}") + raise + + # Carregar estado anterior + current_date = start_dt + if os.path.exists(STATE_FILE): + try: + with open(STATE_FILE, 'r') as f: + state = json.load(f) + # Verificar se o intervalo já foi completado + if state.get('completed', False): + print(f"Intervalo de datas já foi completado!") + print(f"Última data processada: {state.get('last_date')}") + print(f"Para reiniciar, delete o arquivo '{STATE_FILE}'") + RANGE_COMPLETED = True + return None # Sinal para parar + + last_date_str = state.get('last_date') + if last_date_str: + last_date = datetime.strptime(last_date_str, "%d%m%Y") + # Se a última data foi antes do fim, incrementar um dia + if last_date < end_dt: + current_date = last_date + timedelta(days=1) + else: + # Chegou ao fim + RANGE_COMPLETED = True + return None # Sinal para parar + except Exception as e: + print(f"Erro ao ler arquivo de estado: {e}. Iniciando do início.") + current_date = start_dt + + print(f"Data a processar: {current_date.strftime('%d/%m/%Y')}") + return current_date.strftime("%d%m%Y") + +def _save_date_state(date_str: str) -> None: + """Salva o estado da data processada com sucesso. + Deve ser chamado apenas após a conclusão bem-sucedida. + Se for a data final, marca como completo.""" + try: + end_dt = _parse_date_ddmmyyyy(DATE_RANGE_END) + current_dt = datetime.strptime(date_str, "%d%m%Y") + + state = {'last_date': date_str} + # Se chegou na data final, marcar como completo + if current_dt >= end_dt: + state['completed'] = True + print(f"Intervalo de datas COMPLETO! Última data: {date_str}") + + with open(STATE_FILE, 'w') as f: + json.dump(state, f) + print(f"Estado salvo: {date_str}") + except Exception as e: + print(f"Aviso: não foi possível salvar estado: {e}") + +def resolve_data(use_manual: bool, manual_str: str): + """Retorna a data no formato DDMMAAAA. + Se USE_DATE_RANGE=True, usa o intervalo de datas (uma por execução). + Se use_manual=True, valida e usa manual_str; caso contrário, usa o dia anterior. + Retorna None se o intervalo de datas foi completado.""" + if USE_DATE_RANGE: + return _get_next_date_in_range() + + if use_manual: + s = ''.join(ch for ch in str(manual_str) if ch.isdigit()) + if len(s) == 8: + return s + else: + print("Data manual inválida; usando dia anterior automaticamente.") + return (datetime.now() - timedelta(days=1)).strftime("%d%m%Y") + +def choose_sql_driver() -> str: + try: + available = pyodbc.drivers() + except Exception: + available = [] + # Prefer 18, depois 17, depois o primeiro da lista que contenha 'SQL Server' + for candidate in ['ODBC Driver 18 for SQL Server', 'ODBC Driver 17 for SQL Server', 'SQL Server']: + if candidate in available: + return candidate + for d in available: + if 'SQL Server' in d: + return d + # Fallback (pode falhar na conexão se realmente não houver driver) + return 'ODBC Driver 17 for SQL Server' + + +def main(): + # Configurar opções do Chrome + chrome_options = Options() + # Remova o comentário abaixo se quiser executar em modo headless (sem interface gráfica) + chrome_options.add_argument('--headless') + + # Definir pasta de download para a Área de Trabalho + download_dir = os.path.join(os.path.expanduser("~"), "Desktop") + prefs = { + "download.default_directory": download_dir, + "download.prompt_for_download": False, + "download.directory_upgrade": True, + "safebrowsing.enabled": True + } + chrome_options.add_experimental_option("prefs", prefs) + + # Inicializar o driver do Chrome + print("Iniciando o navegador...") + driver = webdriver.Chrome(options=chrome_options) + + try: + # Acessar a URL + url = "https://cp10269.retaguarda.grupoboticario.com.br/app/#/" + print(f"Acessando {url}...") + driver.get(url) + + # Aguardar o campo de input estar presente e visível + print("Aguardando o campo de login aparecer...") + wait = WebDriverWait(driver, 10) + input_field = wait.until( + EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-usuario-input-field"]')) + ) + + # Digitar "daniel.medeiros" no campo de login + print("Digitando 'daniel.medeiros' no campo de login...") + input_field.clear() + input_field.send_keys("daniel.medeiros") + + # Aguardar o campo de senha aparecer + print("Aguardando o campo de senha aparecer...") + password_field = wait.until( + EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="login-senha-input-field"]')) + ) + + # Digitar a senha + print("Digitando a senha...") + password_field.clear() + password_field.send_keys("@ginseng") + + # Clicar no botão de entrar com retentativa se houver erro do reCAPTCHA + print("Clicando no botão Entrar...") + max_login_attempts = 3 + for attempt in range(1, max_login_attempts + 1): + try: + entrar_button = wait.until( + EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]')) + ) + # usar JS click para maior robustez + driver.execute_script("arguments[0].click();", entrar_button) + except Exception: + try: + entrar_button = wait.until( + EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="login-entrar-button"]')) + ) + entrar_button.click() + except Exception: + pass + + # Verificar se o menu "Venda" apareceu como sinal de login bem-sucedido + try: + WebDriverWait(driver, 10).until( + EC.presence_of_element_located((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]')) + ) + print("Login confirmado.") + break + except Exception: + # Verificar mensagem de erro no corpo da página + try: + body_text = driver.execute_script("return dment.body ? document.body.innerText : ''") or "" + except Exception: + body_text = "" + if 'grecaptcha.execute is not a function' in body_text.lower() or 'this.grecaptcha.execute is not a function' in body_text.lower(): + print("Aviso do reCAPTCHA detectado; tentando clicar Entrar novamente...") + else: + print(f"Login ainda não confirmado (tentativa {attempt}/{max_login_attempts}). Tentando novamente...") + time.sleep(2) + if attempt == max_login_attempts: + raise Exception("Falha ao efetuar login após múltiplas tentativas.") + + # Clicar em "Venda" no menu lateral + print("Clicando em 'Venda'...") + venda_menu = wait.until( + EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-venda"]')) + ) + venda_menu.click() + time.sleep(1) + + # Clicar em "Relatórios" + print("Clicando em 'Relatórios'...") + relatorios_menu = wait.until( + EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-relatorios"]')) + ) + relatorios_menu.click() + time.sleep(1) + + # Clicar em "Vendas por Hora" + print("Clicando em 'Vendas por Hora'...") + vendas_hora_menu = wait.until( + EC.element_to_be_clickable((By.CSS_SELECTOR, '[data-cy="sidemenu-item-vendas-por-hora"]')) + ) + vendas_hora_menu.click() + + # Aguardar a página de relatório carregar + print("Aguardando a página de relatório carregar...") + time.sleep(3) + + # Resolver a data a ser utilizada (dia anterior ou manual), formato DDMMAAAA + data_formatada = resolve_data(USE_MANUAL_DATE, MANUAL_DATE_STR) + + # Se retornar None, significa que o intervalo de datas foi completado + if data_formatada is None: + print("Intervalo de datas completado. Encerrando...") + return + + print(f"Data a ser preenchida: {data_formatada}") + + # Clicar no canto inferior direito da página usando JavaScript + print("Clicando no canto inferior direito da página...") + driver.execute_script(""" + var width = window.innerWidth; + var height = window.innerHeight; + var element = document.elementFromPoint(width - 50, height - 50); + if (element) { + element.click(); + } + """) + time.sleep(0.5) + + # Apertar CTRL + K + print("Apertando CTRL + K...") + from selenium.webdriver.common.action_chains import ActionChains + actions = ActionChains(driver) + actions.key_down(Keys.CONTROL).send_keys('k').key_up(Keys.CONTROL).perform() + time.sleep(1) + + # Apertar TAB 3 vezes + print("Apertando TAB 3 vezes...") + for i in range(3): + actions.send_keys(Keys.TAB).perform() + time.sleep(0.3) + + # Digitar a data do dia anterior + print(f"Digitando a data inicial: {data_formatada}") + actions.send_keys(data_formatada).perform() + time.sleep(0.5) + + # Digitar a mesma data novamente (data final) + print(f"Digitando a data final: {data_formatada}") + actions.send_keys(data_formatada).perform() + time.sleep(2) + + # Pressionar ENTER ou ESC para fechar possíveis popups/calendários + print("Pressionando ESC para fechar calendários...") + actions.send_keys(Keys.ESCAPE).perform() + time.sleep(1) + + # Rolar a página para baixo para carregar mais elementos + print("Rolando a página para baixo para carregar elementos...") + driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") + time.sleep(2) + + # Rolar para cima + driver.execute_script("window.scrollTo(0, 0);") + time.sleep(1) + + # Rolar para o meio + driver.execute_script("window.scrollTo(0, document.body.scrollHeight / 2);") + time.sleep(2) + + # Trocar para o iframe que contém o formulário + print("\nMudando para o iframe do formulário...") + wait.until(EC.frame_to_be_available_and_switch_to_it((By.TAG_NAME, 'iframe'))) + + # Confirmar que estamos no iframe correto procurando um campo conhecido + wait.until(EC.presence_of_element_located((By.ID, 'dataInicial'))) + print("Dentro do iframe do formulário.") + + # Tentar clicar no radio 'Produto' por várias estratégias + print("Procurando o radio 'Produto'...") + radio_xpath = "//input[@type='radio' and @name='filtro.formato' and (@id='filtro_produto' or translate(@value,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz')='produto')]" + try: + radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, radio_xpath))) + except Exception: + # Fallback: procurar label com texto 'Produto' e um input dentro + radio_produto = wait.until(EC.element_to_be_clickable((By.XPATH, "//label[normalize-space()[contains(., 'Produto')]]//input[@type='radio']"))) + + # Garantir visibilidade e clicar + driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", radio_produto) + time.sleep(0.5) + driver.execute_script("arguments[0].click();", radio_produto) + print("Radio 'Produto' selecionado.") + time.sleep(0.5) + + # Selecionar formato CSV (label dentro de #divArquivo) + print("Selecionando formato CSV...") + formato_csv_label = wait.until(EC.element_to_be_clickable((By.XPATH, "//*[@id='divArquivo']/div[2]/label"))) + driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", formato_csv_label) + time.sleep(0.3) + driver.execute_script("arguments[0].click();", formato_csv_label) + time.sleep(0.5) + + # Antes de gerar, capturar arquivos existentes na pasta de download + before_files = set(f for f in os.listdir(download_dir) if not f.endswith('.crdownload')) + + # Clicar no botão Gerar + print("Clicando no botão Gerar...") + botao_gerar = wait.until(EC.element_to_be_clickable((By.ID, 'gerar'))) + driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", botao_gerar) + time.sleep(0.3) + driver.execute_script("arguments[0].click();", botao_gerar) + + # Voltar para o conteúdo principal + driver.switch_to.default_content() + + # Aguardar download finalizar e tratar/renomear com a data usada + print("Aguardando download finalizar para tratar e renomear...") + end_time = time.time() + 180 # até 3 minutos + downloaded_file = None + while time.time() < end_time: + # Esperar terminar downloads parciais + if any(name.endswith('.crdownload') for name in os.listdir(download_dir)): + time.sleep(1) + continue + current_files = set(f for f in os.listdir(download_dir) if not f.endswith('.crdownload')) + new_files = [f for f in current_files - before_files] + if new_files: + # Escolher o mais recente + candidate = max((os.path.join(download_dir, f) for f in new_files), key=os.path.getmtime) + downloaded_file = candidate + break + time.sleep(1) + + def _normalize_label(s: str) -> str: + if s is None: + return "" + s = str(s) + s = unicodedata.normalize('NFKD', s) + s = ''.join(c for c in s if not unicodedata.combining(c)) + s = s.lower() + # remover caracteres comuns + for ch in ['%', '(', ')', '/', '-', '_', '.', ':']: + s = s.replace(ch, ' ') + s = ' '.join(s.split()) + return s + + if downloaded_file: + print(f"Arquivo baixado detectado: {downloaded_file}") + # Preparar nome final (garantindo nome nico) + final_name = f"vendas_por_hora_{data_formatada}.csv" + final_path = os.path.join(download_dir, final_name) + if os.path.exists(final_path): + base_no_ext, ext = os.path.splitext(final_name) + counter = 1 + while os.path.exists(os.path.join(download_dir, f"{base_no_ext}({counter}){ext}")): + counter += 1 + final_path = os.path.join(download_dir, f"{base_no_ext}({counter}){ext}") + # Processar CSV removendo colunas e transformando campos + cols_para_remover = { + _normalize_label(n) for n in [ + 'Receita líquida', 'Receita liquida', + 'Participação Receita (%)', 'Participacao Receita (%)', + 'Maior receita', 'Maior Receita', + 'Participação Boletos (%)', 'Participacao Boletos (%)' + ] + } + try: + # Detectar delimitador e ler + with open(downloaded_file, 'r', encoding='utf-8-sig', errors='ignore') as f: + sample = f.read(4096) + f.seek(0) + try: + dialect = csv.Sniffer().sniff(sample, delimiters=';,\t|') + except Exception: + class Simple(csv.Dialect): + delimiter = ';' + quotechar = '"' + escapechar = None + doublequote = True + skipinitialspace = False + lineterminator = '\n' + quoting = csv.QUOTE_MINIMAL + dialect = Simple + reader = csv.DictReader(f, dialect=dialect) + original_headers = reader.fieldnames or [] + + # Construir novos headers, removendo colunas e substituindo Produto + new_headers = [] + produto_idx = None + for h in original_headers: + nh = _normalize_label(h) + if nh in cols_para_remover: + continue + if nh == _normalize_label('Produto'): + produto_idx = len(new_headers) + new_headers.extend(['SKU', 'Descricao']) + elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'): + new_headers.append('PRECO_MEDIO') + else: + new_headers.append(h) + + # Se Produto não existir, garante as colunas novas ao final + if 'SKU' not in new_headers: + new_headers.extend(['SKU', 'Descricao']) + + rows_out = [] + for row in reader: + new_row = {} + for h in original_headers: + nh = _normalize_label(h) + if nh in cols_para_remover: + continue + if nh == _normalize_label('Produto'): + val = row.get(h, '') or '' + parts = [p.strip() for p in val.split(' - ', 1)] + sku = parts[0] if parts else '' + desc = parts[1] if len(parts) > 1 else val + new_row['SKU'] = sku + new_row['Descricao'] = desc + elif nh == _normalize_label('Preço Médio') or nh == _normalize_label('Preco Medio'): + val = row.get(h, '') + new_row['PRECO_MEDIO'] = val + else: + val = row.get(h, '') + if nh == _normalize_label('Loja'): + val = (val or '').split(' - ', 1)[0].strip() + new_row[h] = val + # Garantir chaves SKU/Descricao existam + new_row.setdefault('SKU', '') + new_row.setdefault('Descricao', '') + new_row.setdefault('PRECO_MEDIO', '') + rows_out.append(new_row) + + # Inserir os dados tratados no banco de dados + def _find_header(headers, targets): + targets_norm = { _normalize_label(t) for t in targets } + for hh in headers: + if _normalize_label(hh) in targets_norm: + return hh + return None + + def _parse_number(val): + s = str(val or '').strip() + if s == '': + return 0 + # converter formato pt-BR para ponto flutuante + s = s.replace('.', '').replace(',', '.') + try: + num = float(s) + # se for inteiro, retorna int + return int(num) if abs(num - int(num)) < 1e-9 else num + except Exception: + return 0 + + # Mapear colunas de interesse + pdv_header = _find_header(new_headers, ['Loja', 'PDV']) + vendas_header = _find_header(new_headers, ['Vendas', 'Quantidade', 'Qtd', 'Qtd. Vendida', 'Qtd Vendida', 'Qtde Vendida', 'Qtde. Vendida', 'Quantidade Vendida', 'Qtde']) + preco_medio_header = _find_header(new_headers, ['PRECO_MEDIO', 'Preço Médio', 'Preco Medio']) + + if not pdv_header: + raise Exception("Não foi possível identificar a coluna de PDV/Loja no CSV tratado.") + if not vendas_header: + print("Aviso: coluna de Vendas não encontrada; valores serão inseridos como 0.") + if not preco_medio_header: + print("Aviso: coluna de Preço Médio não encontrada; valores serão inseridos como 0.00.") + + # Conectar ao SQL Server + driver_name = choose_sql_driver() + try: + available_drivers = pyodbc.drivers() + except Exception: + available_drivers = [] + connection_string = ( + f'DRIVER={{{driver_name}}};' + f'SERVER=10.77.77.10;' + f'DATABASE=GINSENG;' + f'UID=supginseng;' + f'PWD=Iphone2513@;' + f'PORT=1433;' + f'TrustServerCertificate=yes;' + f'Encrypt=yes' + ) + + # Converter data para formato aceito pelo banco (date) + data_db = datetime.strptime(data_formatada, '%d%m%Y').date() + + inserted = 0 + with pyodbc.connect(connection_string) as conn: + conn.autocommit = False + cur = conn.cursor() + # Apagar dados existentes para a data + cur.execute("DELETE FROM [GINSENG].[dbo].[rgb_sales_selenium] WHERE [Data] = ?", data_db) + + # Inserir linhas + insert_sql = ( + "INSERT INTO [GINSENG].[dbo].[rgb_sales_selenium] ([Data],[PDV],[SKU],[DESCRICAO],[VENDAS],[PRECO_MEDIO]) " + "VALUES (?,?,?,?,?,?)" + ) + batch = [] + for r in rows_out: + pdv_val = r.get(pdv_header, '') + try: + pdv_val = int(str(pdv_val).strip().split()[0]) if str(pdv_val).strip() else None + except Exception: + pdv_val = None + sku_val = r.get('SKU', '') + desc_val = r.get('Descricao', '') + vendas_val = _parse_number(r.get(vendas_header, '')) if vendas_header else 0 + preco_medio_val = _parse_number(r.get(preco_medio_header, '')) if preco_medio_header else 0.00 + if pdv_val is None and r.get('Loja'): + try: + pdv_val = int(str(r.get('Loja')).strip().split()[0]) + except Exception: + pdv_val = None + batch.append((data_db, pdv_val, sku_val, desc_val, vendas_val, preco_medio_val)) + if len(batch) >= 1000: + cur.executemany(insert_sql, batch) + inserted += len(batch) + batch = [] + if batch: + cur.executemany(insert_sql, batch) + inserted += len(batch) + conn.commit() + + print(f"Dados inseridos no banco: {inserted} registros para a data {data_db}.") + # Remover arquivo original + try: + os.remove(downloaded_file) + except Exception: + pass + + # Salvar estado apenas após sucesso completo + if USE_DATE_RANGE: + _save_date_state(data_formatada) + except Exception as e: + print(f"Falha ao tratar o CSV: {e}") + else: + print("Não foi possível detectar o arquivo baixado dentro do tempo limite.") + + # Pausa breve antes de encerrar + time.sleep(2) + + print("Fechando o navegador...") + + except Exception as e: + print(f"Erro durante a execução: {e}") + + finally: + # Fechar o navegador + driver.quit() + print("Script finalizado.") + +if __name__ == "__main__": + if USE_DATE_RANGE: + print("=" * 60) + print("MODO LOOP ATIVADO - O script executará continuamente") + print(f"Intervalo: {DATE_RANGE_START} até {DATE_RANGE_END}") + print("=" * 60) + while not RANGE_COMPLETED: + try: + main() + if RANGE_COMPLETED: + break + print("\n" + "=" * 60) + print("Execução concluída. Aguardando 10 segundos antes da próxima execução...") + print("=" * 60 + "\n") + time.sleep(10) # Aguarda 10 segundos antes da próxima execução + except Exception as e: + print(f"\nErro na execução: {e}") + print("Tentando novamente em 10 segundos...") + time.sleep(10) + + print("\n" + "=" * 60) + print("Intervalo de datas completado. Script finalizado.") + print("=" * 60) + else: + main() +