337 lines
11 KiB
Python
337 lines
11 KiB
Python
import requests
|
|
import pyodbc
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
|
|
# ==============================
|
|
# Conexão com o banco de dados
|
|
# ==============================
|
|
|
|
DB_CONNECTION_STRING = (
|
|
'DRIVER={ODBC Driver 18 for SQL Server};'
|
|
'SERVER=10.77.77.10;'
|
|
'DATABASE=GINSENG;'
|
|
'UID=supginseng;'
|
|
'PWD=Iphone2513@;'
|
|
'PORT=1433;'
|
|
'TrustServerCertificate=yes'
|
|
)
|
|
|
|
# Configurações de paralelismo
|
|
MAX_WORKERS = 5 # Número de requisições paralelas
|
|
PAGE_SIZE = 50 # Itens por página
|
|
MAX_RETRIES = 5 # Máximo de tentativas por página (dentro do paralelo)
|
|
RETRY_DELAY = 5 # Segundos entre tentativas
|
|
FINAL_RETRY_DELAY = 30 # Segundos entre rodadas de retry final
|
|
|
|
# Lock para print thread-safe
|
|
print_lock = threading.Lock()
|
|
|
|
# Estatísticas globais
|
|
stats = {
|
|
"pages_downloaded": 0,
|
|
"pages_failed": 0,
|
|
"items_downloaded": 0,
|
|
"items_inserted": 0,
|
|
"errors": 0
|
|
}
|
|
stats_lock = threading.Lock()
|
|
|
|
# ==============================
|
|
# 1) Buscar token da API
|
|
# ==============================
|
|
|
|
def get_token():
|
|
url = "https://api.grupoginseng.com.br/api/rgb_token"
|
|
response = requests.get(url)
|
|
|
|
if response.status_code != 200:
|
|
raise Exception(f"Erro ao buscar token: {response.status_code} {response.text}")
|
|
|
|
data = response.json()
|
|
token = data["data"][0]["token"]
|
|
return token
|
|
|
|
|
|
# ==============================
|
|
# 2) Consultar produtos
|
|
# ==============================
|
|
|
|
def insert_items_to_db(cursor, items):
|
|
"""Insere uma lista de itens no banco de dados, deletando SKUs existentes antes"""
|
|
|
|
# Query para deletar SKU existente
|
|
delete_query = "DELETE FROM [dbo].[rgb_product] WHERE [sku] = ?"
|
|
|
|
insert_query = """
|
|
INSERT INTO [dbo].[rgb_product] (
|
|
[sku], [marketId], [tacticId], [strategicId], [brand], [internalCode],
|
|
[description], [discontinued], [purpose], [discountAllowed], [maxDiscount],
|
|
[createdAt], [discontinuedDate], [updatedAt], [ncmId], [cest],
|
|
[grossWeight], [netWeight], [purchaseBlocked]
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
"""
|
|
|
|
inserted = 0
|
|
deleted = 0
|
|
for item in items:
|
|
try:
|
|
sku = item.get("id")
|
|
|
|
# 1. Deletar SKU se já existir
|
|
cursor.execute(delete_query, sku)
|
|
if cursor.rowcount > 0:
|
|
deleted += cursor.rowcount
|
|
|
|
# 2. Inserir o novo registro
|
|
cursor.execute(insert_query,
|
|
sku,
|
|
item.get("marketId"),
|
|
item.get("tacticId"),
|
|
item.get("strategicId"),
|
|
item.get("brand"),
|
|
item.get("internalCode"),
|
|
item.get("description"),
|
|
item.get("discontinued"),
|
|
item.get("purpose"),
|
|
item.get("discountAllowed"),
|
|
item.get("maxDiscount"),
|
|
item.get("createdAt"),
|
|
item.get("discontinuedDate"),
|
|
item.get("updatedAt"),
|
|
item.get("ncmId"),
|
|
item.get("cest"),
|
|
item.get("grossWeight"),
|
|
item.get("netWeight"),
|
|
item.get("purchaseBlocked")
|
|
)
|
|
inserted += 1
|
|
except Exception as e:
|
|
with stats_lock:
|
|
stats["errors"] += 1
|
|
if stats["errors"] <= 5:
|
|
print(f" ✗ Erro ao inserir SKU {item.get('id')}: {e}")
|
|
|
|
# Atualizar estatística de deletados
|
|
with stats_lock:
|
|
stats["skus_updated"] = stats.get("skus_updated", 0) + deleted
|
|
|
|
return inserted
|
|
|
|
|
|
def fetch_and_insert_page(token, start, total_pages, cursor, db_lock):
|
|
"""Busca uma página específica com retry e insere no banco imediatamente"""
|
|
url = "https://api.grupoboticario.com.br/global/v1/franchising/gb-stores-data/product/products"
|
|
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
# Data de ontem para filtrar produtos atualizados
|
|
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
|
|
|
|
params = {
|
|
"count": PAGE_SIZE,
|
|
"start": start,
|
|
"product.updatedAt": yesterday
|
|
}
|
|
|
|
page_num = (start // PAGE_SIZE) + 1
|
|
|
|
for attempt in range(1, MAX_RETRIES + 1):
|
|
try:
|
|
response = requests.get(url, headers=headers, params=params, timeout=60)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
items = data.get("items", [])
|
|
|
|
# Inserir no banco imediatamente (com lock para thread-safety)
|
|
with db_lock:
|
|
inserted = insert_items_to_db(cursor, items)
|
|
cursor.connection.commit()
|
|
|
|
with stats_lock:
|
|
stats["pages_downloaded"] += 1
|
|
stats["items_downloaded"] += len(items)
|
|
stats["items_inserted"] += inserted
|
|
|
|
with print_lock:
|
|
print(f" ✓ Página {page_num}/{total_pages}: {len(items)} baixados, {inserted} inseridos (Total: {stats['items_inserted']})")
|
|
|
|
return {"start": start, "success": True}
|
|
else:
|
|
with print_lock:
|
|
print(f" ✗ Página {page_num} (start={start}): Erro {response.status_code} - Tentativa {attempt}/{MAX_RETRIES}")
|
|
|
|
except Exception as e:
|
|
with print_lock:
|
|
print(f" ✗ Página {page_num} (start={start}): {str(e)[:50]} - Tentativa {attempt}/{MAX_RETRIES}")
|
|
|
|
if attempt < MAX_RETRIES:
|
|
time.sleep(RETRY_DELAY)
|
|
|
|
with stats_lock:
|
|
stats["pages_failed"] += 1
|
|
|
|
return {"start": start, "success": False}
|
|
|
|
|
|
def get_total_products(token, date_filter):
|
|
"""Faz uma requisição inicial para descobrir o total de produtos"""
|
|
url = "https://api.grupoboticario.com.br/global/v1/franchising/gb-stores-data/product/products"
|
|
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
params = {
|
|
"count": 1,
|
|
"start": 0,
|
|
"product.updatedAt": date_filter
|
|
}
|
|
|
|
response = requests.get(url, headers=headers, params=params, timeout=60)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return data.get("total", 0)
|
|
|
|
return None
|
|
|
|
|
|
def fetch_and_insert_all_products(token):
|
|
"""Consulta todos os produtos com paginação paralela e insere no banco imediatamente"""
|
|
|
|
# Calcular data de ontem (dia anterior)
|
|
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
|
|
|
|
print(f"Consultando produtos e inserindo no banco...")
|
|
print(f" Filtro de data: {yesterday} (produtos atualizados ontem)")
|
|
|
|
# Reset stats
|
|
global stats
|
|
stats = {
|
|
"pages_downloaded": 0,
|
|
"pages_failed": 0,
|
|
"items_downloaded": 0,
|
|
"items_inserted": 0,
|
|
"errors": 0,
|
|
"skus_updated": 0 # SKUs que já existiam e foram atualizados
|
|
}
|
|
|
|
# 1. Descobrir o total de produtos
|
|
print(" Descobrindo total de produtos...")
|
|
total = get_total_products(token, yesterday)
|
|
|
|
if total is None:
|
|
print(" ✗ Erro ao descobrir total de produtos")
|
|
return None
|
|
|
|
print(f" Total de produtos na API: {total}")
|
|
|
|
if total == 0:
|
|
print(f" ⚠ Nenhum produto atualizado em {yesterday}")
|
|
return stats
|
|
|
|
# 2. Calcular páginas necessárias
|
|
total_pages = (total + PAGE_SIZE - 1) // PAGE_SIZE
|
|
starts = [i * PAGE_SIZE for i in range(total_pages)]
|
|
|
|
print(f" Total de páginas: {total_pages}")
|
|
print(f" Requisições paralelas: {MAX_WORKERS}")
|
|
|
|
# 3. Preparar banco de dados
|
|
print(" Preparando banco de dados...")
|
|
conn = pyodbc.connect(DB_CONNECTION_STRING)
|
|
cursor = conn.cursor()
|
|
|
|
print(f" Estratégia: Deletar SKU existente antes de inserir (upsert)")
|
|
print(f" Iniciando download e inserção...\n")
|
|
|
|
# Lock para acesso ao banco (uma conexão compartilhada)
|
|
db_lock = threading.Lock()
|
|
|
|
# 4. Buscar páginas em paralelo e inserir imediatamente
|
|
failed_pages = []
|
|
|
|
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
|
futures = {executor.submit(fetch_and_insert_page, token, start, total_pages, cursor, db_lock): start for start in starts}
|
|
|
|
for future in as_completed(futures):
|
|
result = future.result()
|
|
|
|
if not result["success"]:
|
|
failed_pages.append(result["start"])
|
|
|
|
# 5. Verificar páginas que falharam - continua tentando até todas serem bem-sucedidas
|
|
retry_round = 0
|
|
while failed_pages:
|
|
retry_round += 1
|
|
print(f"\n ⚠ Rodada de retry #{retry_round}: {len(failed_pages)} páginas falharam")
|
|
print(f" Páginas: {failed_pages[:10]}{'...' if len(failed_pages) > 10 else ''}")
|
|
print(f" Aguardando {FINAL_RETRY_DELAY} segundos antes de tentar novamente...")
|
|
time.sleep(FINAL_RETRY_DELAY)
|
|
|
|
still_failed = []
|
|
for start in failed_pages:
|
|
result = fetch_and_insert_page(token, start, total_pages, cursor, db_lock)
|
|
if not result["success"]:
|
|
still_failed.append(start)
|
|
|
|
failed_pages = still_failed
|
|
|
|
if failed_pages:
|
|
print(f" Ainda restam {len(failed_pages)} páginas com falha. Tentando novamente...")
|
|
|
|
# Fechar conexão
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
print(f"\n Download e inserção concluídos!")
|
|
|
|
return stats
|
|
|
|
|
|
# ==============================
|
|
# EXECUTAR
|
|
# ==============================
|
|
|
|
if __name__ == "__main__":
|
|
print("="*60)
|
|
print("RGB PRODUCTS - Consulta de Produtos")
|
|
print("="*60)
|
|
|
|
# 1. Buscar token
|
|
print("\n[1/2] Buscando token...")
|
|
token = get_token()
|
|
print(f"✓ Token obtido com sucesso!")
|
|
print(f" Token: {token[:50]}...")
|
|
|
|
# 2. Consultar produtos e inserir no banco (paralelo)
|
|
print("\n[2/2] Baixando e inserindo produtos...")
|
|
result = fetch_and_insert_all_products(token)
|
|
|
|
if result:
|
|
print(f"\n{'='*60}")
|
|
print("RESUMO FINAL")
|
|
print(f"{'='*60}")
|
|
print(f" Páginas baixadas: {result['pages_downloaded']}")
|
|
print(f" Páginas com falha: {result['pages_failed']}")
|
|
print(f" Itens baixados: {result['items_downloaded']}")
|
|
print(f" Itens inseridos: {result['items_inserted']}")
|
|
print(f" SKUs atualizados: {result.get('skus_updated', 0)} (já existiam)")
|
|
print(f" Erros de inserção: {result['errors']}")
|
|
print(f"{'='*60}")
|
|
|
|
if result['pages_failed'] == 0 and result['errors'] == 0:
|
|
print("✓ SUCESSO TOTAL!")
|
|
else:
|
|
print("⚠ Concluído com alguns problemas")
|
|
else:
|
|
print("\n✗ Não foi possível obter os produtos.")
|