Bulk Orcamento

This commit is contained in:
João Monezi 2025-04-03 10:45:22 -03:00
parent 806a4db7f9
commit 7332a9d773
8 changed files with 1736 additions and 0 deletions

View File

@ -0,0 +1,49 @@
#!/usr/bin/env python
# coding: utf-8
# ## Notebook - API_refresh_PBI_dataset
#
# New notebook
# In[2]:
pip install adal
# In[3]:
import adal
import requests
# Credenciais que serão utilizadas na atualização do dataset que esta no power BI
client_id = "e30706c6-bee0-4a2a-a208-420587d04d1c"
username = "nicolas.evilasio@grupoginseng.com.br"
password = "Ginseng@"
workspace_id = '8834c28f-adc5-424b-a337-3ee2ecec60c3'
dataset_id = '64e76869-2d76-4919-b471-815f7d8db031'
# Credencias de acesso do lado da microsoft
authority_url = 'https://login.windows.net/common'
resource_url = 'https://analysis.windows.net/powerbi/api'
url = r'https://api.powerbi.com/v1.0/myorg/groups/' + workspace_id + '/datasets/' + dataset_id + '/refreshes?$top=1'
# Context client, token e access_token para acessarmos o power bi da cyrela
context = adal.AuthenticationContext(authority=authority_url, validate_authority=True, api_version=None)
token = context.acquire_token_with_username_password(resource=resource_url, client_id=client_id, username=username, password=password)
access_token = token.get('accessToken')
print(access_token)
# Requisição
header = {'Authorization': f'Bearer {access_token}'}
r = requests.post(url=url, headers=header)
# Verificando o status da requisição
if r.status_code == 202:
print("Atualização iniciada com sucesso!")
else:
print(f"Erro ao iniciar a atualização: {r.status_code}")
print(f"Detalhes do erro: {r.text}") # Imprime os detalhes do erro retornado pela API

View File

@ -0,0 +1,732 @@
#!/usr/bin/env python
# coding: utf-8
# ## Notebook - ETL notas API Conexao
#
# New notebook
# In[1]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# import requests
# import json
# import pandas as pd
# from sqlalchemy import create_engine
# from datetime import datetime, timedelta
# import math
# from bs4 import BeautifulSoup
# from lxml import etree
# import xmltodict
# In[2]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def gerar_datas_inicio_fim(data_ultimo_registro):
# '''Gera uma lista de dicionários com as datas de início e fim que devem ser utilizadas como parâmetros
# na API de buscar as chaves das notas fiscais
# :param str data_ultimo_registro: Data da última emissão de nota fiscal presente no banco de dados, no formato 'YYYYMMDD'.
# :return: Lista de dicionários contendo as datas de início e fim para cada intervalo de busca na API.
# :rtype: list[dict]
# '''
# max_data_bd = datetime.strptime(data_ultimo_registro, '%Y%m%d').date()
# data_hoje = datetime.today().date()
# data_amanha = datetime.today().date() + timedelta(days=1)
# # Define a data de início para a busca, 10 dias antes da data do último registro
# data_inicio = max_data_bd - timedelta(days=10)
# qtd_dias_pendentes = (data_hoje - data_inicio).days
# # A API limita a busca de chaves no intervalo de 10 dias, portanto isso faremos x iteracoes para abranger o período desejado
# qtd_iteracoes = math.ceil(qtd_dias_pendentes / 10)
# contador = 1
# lista_datas = []
# for i in range(qtd_iteracoes):
# # Atuaiza a data de início para cada iteração
# data_inicio += timedelta(days = contador)
# # Variável que controla a data limite de busca
# contador = 10 if i < qtd_iteracoes - 1 else (data_amanha - data_inicio).days
# data_fim = data_inicio + timedelta(days = contador)
# # extende a lista com um dict que contém a data inicial e data final de cada iteração
# lista_datas.extend([{'inicio': data_inicio.strftime('%Y-%m-%d'), 'fim': data_fim.strftime('%Y-%m-%d')}])
# return lista_datas
# In[3]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # sessao = requests.Session()
# id_integracao = 'c7cd9625-4df7-4674-bfd9-109bac618b9c'
# In[4]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def autenticar(id_integracao):
# '''Realiza a autenticação com o servidor.
# Caso alguma requisição retorne 402, a função autenticar deve ser invocada novamente, gerando um novo token
# id_integracao - uuid disponibilizado na seção "administrar" no site da conexão
# Retorna o Bearer token que deve ser usado nas requisições'''
# host = 'https://api.conexaonfe.com.br/v1/'
# endpoint = 'autenticacao'
# headers = {'id-integracao': id_integracao}
# response = requests.get(host + endpoint, headers=headers)
# print(response.status_code)
# response_text = response.text
# token = json.loads(response_text)
# return token
# In[5]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def listar_chaves_nfe_recebidas(data_inicio, data_fim, token) -> list:
# '''Lista de chaves das notas fiscais eletronicas recebidas no período informado. (max 10 dias)
# data_inicio - data inicial (formato yyyy-mm-dd)
# data_fim - data final (formato yyyy-mm-dd)
# token - bearer token de autenticação
# Retorna um json com a lista das chaves'''
# host = 'https://api.conexaonfe.com.br/v1/'
# endpoint = 'nfe/recebidas/por-data-emissao/'
# url = host + endpoint + data_inicio + '/' + data_fim
# headers = {'Authorization': f'Bearer {token}'}
# response = requests.get(url, headers=headers)
# if response.status_code == 404:
# return []
# else:
# print(response.status_code)
# response_text = response.text
# lista_chaves = json.loads(response_text)
# return lista_chaves
# In[6]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def buscar_xml_por_chave(lista_chaves: list, token: str) -> list:
# '''Buscar xml das notas através das chaves.
# Só estarão disponíveis os xmls capturados pela Conexão.
# lista_chaves - lista de chaves dos xmls
# bearer token de autenticação
# Retorna uma lista de dicionários com os xmls'''
# host = 'https://api.conexaonfe.com.br/v1/'
# endpoint = 'dfes/'
# lista_dicts_xml = []
# for chave in lista_chaves:
# url = host + endpoint + chave
# headers = {'Authorization': 'Bearer ' + token}
# print(url)
# response = requests.get(url, headers=headers)
# print(response.status_code)
# response_text = response.text
# lista_dicts_xml.append(response_text)
# # lista_dicts_xml.append(json.loads(response_text))
# return lista_dicts_xml
# In[7]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def validateAttribute(funcao, valor_se_erro):
# resultado = None
# try:
# resultado = funcao()
# except AttributeError:
# resultado = valor_se_erro
# except Exception as e:
# print(e)
# return resultado
# In[8]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# max_data = spark.sql("SELECT MAX(data_emissao) FROM DL_Ginseng.fato_notas_entrada")
# max_data = max_data.collect()[0]['max(data_emissao)'].strftime('%Y%m%d')
# In[9]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# max_data
# In[10]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# lista_data = gerar_datas_inicio_fim(max_data)
# lista_data
# In[11]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Consumindo a API ==========================================<<<<<<<<<<<<<<<<<<<<< ALTERAR <<<<<<<<<<<<<<<<<<<<<<
# token = autenticar(id_integracao)
# chaves = []
# In[12]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# for datas in lista_data:
# response_api = listar_chaves_nfe_recebidas(datas['inicio'], datas['fim'], token['token'])
# if len(response_api) > 0:
# chaves.extend(response_api)
# # Remover duplicatas usando set() e compreensão de lista
# chaves_sem_duplicatas = [dict(t) for t in {tuple(d.items()) for d in chaves}]
# chaves_request = [(item['chaveAcesso']) for item in chaves_sem_duplicatas]
# len(chaves_request)
# In[13]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Query para retornar as chaves que já estão no banco ==========================================<<<<<<<<<<<<<<<<<<<<< ALTERAR <<<<<<<<<<<<<<<<<<<<<<
# chaves_bd = spark.sql(f"SELECT chave FROM DL_Ginseng.fato_notas_entrada WHERE data_emissao BETWEEN '{lista_data[0]['inicio']}' AND '{lista_data[-1]['fim']}' AND UPPER(nome_emissor) LIKE '%CALAMO%'""")
# In[14]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_chaves_request = pd.DataFrame(chaves_request, columns=['chave'])
# In[15]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Convertendo o DataFrame do PySpark para o DataFrame do Pandas
# df_chaves_bd_pandas = chaves_bd.toPandas()
# # Mesclando os DataFrames do Pandas
# merged_df = pd.merge(df_chaves_bd_pandas, df_chaves_request, on='chave', how='outer', indicator=True)
# In[16]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Chaves que estão no banco de dados, mas não retornaram na consulta da API
# merged_df[merged_df['_merge'] == 'left_only']
# In[17]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Chaves que serão inseridas no banco
# merged_df[merged_df['_merge'] == 'right_only']
# In[18]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# chaves_novas = merged_df[merged_df['_merge'] == 'right_only']['chave'].to_list()
# In[19]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# lista_xml = buscar_xml_por_chave(chaves_novas, token['token'])
# In[20]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# len(chaves_novas) == len(lista_xml)
# In[21]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# lista_notas = []
# lista_erros = []
# lista_itens_notas = []
# lista_itens_erros = []
# qtd_sucesso = 0
# for nota in lista_xml:
# # Verifique se a nota não está vazia
# if not nota.strip():
# print("Nota vazia encontrada.")
# lista_erros.append(["Nota vazia", nota])
# continue
# # Parse do XML
# try:
# xml_tree = etree.fromstring(nota.encode('utf-8'))
# except Exception as e:
# print(e)
# lista_erros.append([e, nota])
# continue
# # O resto do código continua igual
# # Parse do XML
# try:
# xml_tree = etree.fromstring(nota.encode('utf-8'))
# except Exception as e:
# print(e)
# lista_erros.append([e, nota])
# continue
# xml = BeautifulSoup(nota, 'xml')
# def buscar_elementos_filhos(elemento, elemento_pai, xpath):
# return [validateAttribute(lambda: item.find(elemento).text, None) for item in xpath.find_all(elemento_pai)]
# def validateAttribute(funcao, valor_se_erro):
# resultado = None
# try:
# resultado = funcao()
# except AttributeError:
# resultado = valor_se_erro
# except Exception as e:
# print(e)
# return resultado
# # definindo os campos do cabeçalho da nota
# lista_notas.append(
# {
# 'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None),
# 'cNF': validateAttribute(lambda:xml.ide.nNF.text, None),
# 'serie': validateAttribute(lambda:xml.ide.serie.text, None),
# 'data_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[0], None),
# 'hora_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[1][:-6], None),
# 'cnpj_emissor': validateAttribute(lambda:xml.emit.CNPJ.text, None),
# 'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None),
# 'cnpj_destinatario': validateAttribute(lambda:xml.dest.CNPJ.text, None),
# 'valor_total_produtos': validateAttribute(lambda:xml.total.ICMSTot.vProd.text, None),
# 'valor_ICMSST': validateAttribute(lambda:xml.total.ICMSTot.vST.text, None),
# 'valor_FCPST': validateAttribute(lambda:xml.total.ICMSTot.vFCPST.text, None),
# 'valor_frete': validateAttribute(lambda:xml.total.ICMSTot.vFrete.text, None),
# 'valor_seguro': validateAttribute(lambda:xml.total.ICMSTot.vSeg.text, None),
# 'valor_outras_despesas': validateAttribute(lambda:xml.total.ICMSTot.vOutro.text, None),
# 'valor_II': validateAttribute(lambda:xml.total.ICMSTot.vII.text, None), ## validar =================
# 'valor_IPI': validateAttribute(lambda:xml.total.ICMSTot.vIPI.text, None),
# 'valor_IPI_Devol': validateAttribute(lambda:xml.total.ICMSTot.vIPIDevol.text, None),
# 'valor_servicos': validateAttribute(lambda: xml.total.ISSQNtot.vServ.text, 0),
# 'valor_desconto': validateAttribute(lambda:xml.total.ICMSTot.vDesc.text, None),
# 'valor_ICMS_desonerado': validateAttribute(lambda:xml.total.ICMSTot.vICMSDeson.text, None),
# 'valor_liquido': validateAttribute(lambda:xml.total.ICMSTot.vNF.text, None),
# 'tipo_pagamento_JSON': '', # implementar JSON com os tipods de pagamento ([{tPag, vPag}, {tPag, vPag}, {tPag, vPag}])
# 'numero_fatura': validateAttribute(lambda:xml.cobr.fat.nFat.text, None),
# 'qtd_parcelas': validateAttribute(lambda: len(xml.cobr.find_all('dup')), None),
# 'duplicatas_json': '', # desenvolver JSON com as duplicatas ([{nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}])
# 'valor_ICMS': validateAttribute(lambda: xml.total.ICMSTot.vICMS.text, None), ## validar =================
# 'situacao': validateAttribute(lambda:xml.protNFe.infProt.xMotivo.text, None)
# }
# )
# # definindo os campos dos itens das notas
# qtd_itens = len(validateAttribute(lambda: xml.infNFe.find_all('det'), 0))
# lista_itens_notas.append(
# {
# 'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None),
# 'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None),
# 'n_item': validateAttribute(lambda: [item.get('nItem') for item in xml.infNFe.find_all('det')], 0),
# 'cod_produto': validateAttribute(lambda: buscar_elementos_filhos('cProd', 'det', xml.infNFe), [None] * qtd_itens),
# 'produto': validateAttribute(lambda: buscar_elementos_filhos('xProd', 'det', xml.infNFe), [None] * qtd_itens),
# 'cEAN': validateAttribute(lambda: buscar_elementos_filhos('cEAN', 'det', xml.infNFe), [None] * qtd_itens),
# 'NCM': validateAttribute(lambda: buscar_elementos_filhos('NCM', 'det', xml.infNFe), [None] * qtd_itens),
# 'CEST': validateAttribute(lambda: buscar_elementos_filhos('CEST', 'det', xml.infNFe), [None] * qtd_itens),
# 'CFOP': validateAttribute(lambda: buscar_elementos_filhos('CFOP', 'det', xml.infNFe), [None] * qtd_itens),
# 'unidade_medida': validateAttribute(lambda: buscar_elementos_filhos('uCom', 'det', xml.infNFe), [None] * qtd_itens),
# 'quantidade': validateAttribute(lambda: buscar_elementos_filhos('qCom', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_unitario': validateAttribute(lambda: buscar_elementos_filhos('vUnCom', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_total_produtos': validateAttribute(lambda: buscar_elementos_filhos('vProd', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_frete': validateAttribute(lambda: buscar_elementos_filhos('vFrete', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_seguro': validateAttribute(lambda: buscar_elementos_filhos('vSeg', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_desconto': validateAttribute(lambda: buscar_elementos_filhos('vDesc', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_outras_despesas': validateAttribute(lambda: buscar_elementos_filhos('vOutro', 'det', xml.infNFe), [None] * qtd_itens),
# 'codigo_pedido': validateAttribute(lambda: buscar_elementos_filhos('xPed', 'det', xml.infNFe), [None] * qtd_itens),
# 'cod_origem': validateAttribute(lambda: buscar_elementos_filhos('orig', 'det', xml.infNFe), [None] * qtd_itens),
# 'CST': validateAttribute(lambda: buscar_elementos_filhos('CST', 'det', xml.infNFe), [None] * qtd_itens),
# 'modalidade_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('modBC', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vBC', 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_ICMS': validateAttribute(lambda: buscar_elementos_filhos('pICMS', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vICMS', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_BC_FCP': validateAttribute(lambda: buscar_elementos_filhos('vBCFCP', 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_FCP': validateAttribute(lambda: buscar_elementos_filhos('pFCP', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_FCP': validateAttribute(lambda: buscar_elementos_filhos('vFCP', 'det', xml.infNFe), [None] * qtd_itens),
# 'modalidade_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('modBCST' , 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_MVA_ST': validateAttribute(lambda: buscar_elementos_filhos('pMVAST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('vBCST', 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_ICMS_ST': validateAttribute(lambda: buscar_elementos_filhos('pICMSST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_ICMSST': validateAttribute(lambda: buscar_elementos_filhos('vICMSST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_BC_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vBCFCPST', 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_FCPST': validateAttribute(lambda: buscar_elementos_filhos('pFCPST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vFCPST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_II': validateAttribute(lambda: buscar_elementos_filhos('vII', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_IPI': validateAttribute(lambda: buscar_elementos_filhos('vIPI' , 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_ICMS_desonerado': validateAttribute(lambda: buscar_elementos_filhos('vICMSDeson', 'det', xml.infNFe), [None] * qtd_itens)
# }
# )
# qtd_sucesso += 1
# # if qtd_sucesso + len(lista_erros) == 100:
# # break ### ENCERRAR ==== DEBUG
# print(f'qtd com erro: {len(lista_erros)}')
# print(f'qtd sucesso: {qtd_sucesso}')
# In[22]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas = pd.DataFrame.from_dict(lista_notas)
# df_notas_itens = pd.DataFrame.from_dict(lista_itens_notas)
# In[23]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# len(df_notas) == len(df_notas_itens['chave'].unique())
# In[24]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas['cNF'] = df_notas['cNF'].str.zfill(9)
# df_notas['serie'] = df_notas['serie'].str.zfill(3)
# In[25]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas['valor_total_produtos'] = df_notas['valor_total_produtos'].astype(float)
# df_notas['valor_ICMSST'] = df_notas['valor_ICMSST'].astype(float)
# df_notas['valor_FCPST'] = df_notas['valor_FCPST'].astype(float)
# df_notas['valor_frete'] = df_notas['valor_frete'].astype(float)
# df_notas['valor_seguro'] = df_notas['valor_seguro'].astype(float)
# df_notas['valor_outras_despesas'] = df_notas['valor_outras_despesas'].astype(float)
# df_notas['valor_II'] = df_notas['valor_II'].astype(float)
# df_notas['valor_IPI'] = df_notas['valor_IPI'].astype(float)
# df_notas['valor_IPI_Devol'] = df_notas['valor_IPI_Devol'].astype(float)
# df_notas['valor_servicos'] = df_notas['valor_servicos'].astype(float)
# df_notas['valor_desconto'] = df_notas['valor_desconto'].astype(float)
# df_notas['valor_ICMS_desonerado'] = df_notas['valor_ICMS_desonerado'].astype(float)
# df_notas['valor_liquido'] = df_notas['valor_liquido'].astype(float)
# df_notas['valor_ICMS'] = df_notas['valor_ICMS'].astype(float)
# In[26]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo = df_notas[df_notas['nome_emissor'].str.upper().str.startswith('CALAMO')]
# In[27]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo.iloc[0]
# In[28]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Converter o DataFrame pandas para DataFrame Spark
# df_spark = spark.createDataFrame(df_notas_calamo)
# from pyspark.sql.functions import to_date
# from pyspark.sql.functions import col
# # Converter a coluna 'data_emissao' para o tipo DateType
# df_spark = df_spark.withColumn('data_emissao', to_date(df_spark['data_emissao']))
# # Converter a coluna 'qtd_parcelas' para o tipo LongType
# df_spark = df_spark.withColumn('qtd_parcelas', col('qtd_parcelas').cast('long'))
# # Salvar o DataFrame no formato Delta
# df_spark.write.format("delta").mode("overwrite").saveAsTable("DL_Ginseng.fato_notas_entrada")
# In[29]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # df = spark.sql("SELECT DISTINCT data_emissao FROM DL_Ginseng.fato_notas_entrada ORDER BY data_emissao")
# df = spark.sql("SELECT COUNT(1) FROM DL_Ginseng.fato_notas_entrada")
# display(df)
# In[30]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# lista_explode = [
# 'n_item',
# 'cod_produto',
# 'produto',
# 'cEAN',
# 'NCM',
# 'CEST',
# 'CFOP',
# 'unidade_medida',
# 'quantidade',
# 'valor_unitario',
# 'valor_total_produtos',
# 'valor_frete',
# 'valor_seguro',
# 'valor_desconto',
# 'valor_outras_despesas',
# 'codigo_pedido',
# 'cod_origem',
# 'CST',
# 'modalidade_BC_ICMS',
# 'valor_BC_ICMS',
# 'aliquota_ICMS',
# 'valor_ICMS',
# 'valor_BC_FCP',
# 'aliquota_FCP',
# 'valor_FCP',
# 'modalidade_BC_ST',
# 'aliquota_MVA_ST',
# 'valor_BC_ST',
# 'aliquota_ICMS_ST',
# 'valor_ICMSST',
# 'valor_BC_FCPST',
# 'aliquota_FCPST',
# 'valor_FCPST',
# 'valor_II',
# 'valor_IPI',
# 'valor_ICMS_desonerado'
# ]
# In[31]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_itens.info()
# In[32]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens = df_notas_itens[df_notas_itens['nome_emissor'].str.upper().str.startswith('CALAMO')]
# In[33]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens_expandido = df_notas_calamo_itens.explode(lista_explode)
# In[34]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens_expandido.head()
# In[35]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # preenchendo com zero à equerda
# df_notas_calamo_itens_expandido['cod_produto'] = df_notas_calamo_itens_expandido['cod_produto'].str.zfill(5)
# df_notas_calamo_itens_expandido['NCM'] = df_notas_calamo_itens_expandido['NCM'].str.zfill(8)
# df_notas_calamo_itens_expandido['CEST'] = df_notas_calamo_itens_expandido['CEST'].str.zfill(7)
# df_notas_calamo_itens_expandido['CFOP'] = df_notas_calamo_itens_expandido['CFOP'].str.zfill(4)
# df_notas_calamo_itens_expandido['CST'] = df_notas_calamo_itens_expandido['CST'].str.zfill(2)
# In[36]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # inteiros
# df_notas_calamo_itens_expandido['quantidade'] = df_notas_calamo_itens_expandido['quantidade'].astype(float).astype(int)
# # decimais
# df_notas_calamo_itens_expandido['valor_unitario'] = df_notas_calamo_itens_expandido['valor_unitario'].astype(float)
# df_notas_calamo_itens_expandido['valor_total_produtos'] = df_notas_calamo_itens_expandido['valor_total_produtos'].astype(float)
# df_notas_calamo_itens_expandido['valor_frete'] = df_notas_calamo_itens_expandido['valor_frete'].astype(float)
# df_notas_calamo_itens_expandido['valor_seguro'] = df_notas_calamo_itens_expandido['valor_seguro'].astype(float)
# df_notas_calamo_itens_expandido['valor_desconto'] = df_notas_calamo_itens_expandido['valor_desconto'].astype(float)
# df_notas_calamo_itens_expandido['valor_outras_despesas'] = df_notas_calamo_itens_expandido['valor_outras_despesas'].astype(float)
# df_notas_calamo_itens_expandido['valor_BC_ICMS'] = df_notas_calamo_itens_expandido['valor_BC_ICMS'].astype(float)
# df_notas_calamo_itens_expandido['valor_ICMS'] = df_notas_calamo_itens_expandido['valor_ICMS'].astype(float)
# df_notas_calamo_itens_expandido['valor_BC_FCP'] = df_notas_calamo_itens_expandido['valor_BC_FCP'].astype(float)
# df_notas_calamo_itens_expandido['valor_FCP'] = df_notas_calamo_itens_expandido['valor_FCP'].astype(float)
# df_notas_calamo_itens_expandido['valor_BC_ST'] = df_notas_calamo_itens_expandido['valor_BC_ST'].astype(float)
# df_notas_calamo_itens_expandido['valor_ICMSST'] = df_notas_calamo_itens_expandido['valor_ICMSST'].astype(float)
# df_notas_calamo_itens_expandido['valor_BC_FCPST'] = df_notas_calamo_itens_expandido['valor_BC_FCPST'].astype(float)
# df_notas_calamo_itens_expandido['valor_FCPST'] = df_notas_calamo_itens_expandido['valor_FCPST'].astype(float)
# df_notas_calamo_itens_expandido['valor_II'] = df_notas_calamo_itens_expandido['valor_II'].astype(float)
# df_notas_calamo_itens_expandido['valor_IPI'] = df_notas_calamo_itens_expandido['valor_IPI'].astype(float)
# df_notas_calamo_itens_expandido['valor_ICMS_desonerado'] = df_notas_calamo_itens_expandido['valor_ICMS_desonerado'].astype(float)
# # percentual
# df_notas_calamo_itens_expandido['aliquota_ICMS'] = df_notas_calamo_itens_expandido['aliquota_ICMS'].astype(float) / 100
# df_notas_calamo_itens_expandido['aliquota_FCP'] = df_notas_calamo_itens_expandido['aliquota_FCP'].astype(float) / 100
# df_notas_calamo_itens_expandido['aliquota_MVA_ST'] = df_notas_calamo_itens_expandido['aliquota_MVA_ST'].astype(float) / 100
# df_notas_calamo_itens_expandido['aliquota_ICMS_ST'] = df_notas_calamo_itens_expandido['aliquota_ICMS_ST'].astype(float) / 100
# df_notas_calamo_itens_expandido['aliquota_FCPST'] = df_notas_calamo_itens_expandido['aliquota_FCPST'].astype(float) / 100
# In[37]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens_expandido.info()
# In[38]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# if not df_notas_calamo_itens_expandido.empty:
# print(df_notas_calamo_itens_expandido.iloc[0])
# else:
# print("O DataFrame está vazio.")
# In[39]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # for i in range(100):
# df_notas_calamo_itens_expandido.iloc[0]
# In[40]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# len(df_notas_calamo_itens_expandido['chave'].unique())
# In[41]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens_expandido.drop(['nome_emissor'], axis=1, inplace=True)
# In[42]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Conveter do DataFrame pandas para DataFrame Spark
# df_spark_itens = spark.createDataFrame(df_notas_calamo_itens_expandido)
# # Tipagem de dados
# df_spark_itens = df_spark_itens.withColumn('n_item', col('n_item').cast('long'))
# df_spark_itens = df_spark_itens.withColumn('quantidade', col('quantidade').cast('long'))
# # Salvar o DataFrame no formato Delta
# df_spark_itens.write.format("delta").mode("overwrite").saveAsTable("DL_Ginseng.fato_notas_entrada_itens")

View File

@ -0,0 +1,168 @@
import requests
import json
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta
import math
from bs4 import BeautifulSoup
from lxml import etree
import xmltodict
id_integracao = 'c7cd9625-4df7-4674-bfd9-109bac618b9c'
def gerar_datas_inicio_fim(data_ultimo_registro):
'''Gera uma lista de dicionários com as datas de início e fim que devem ser utilizadas como parâmetros
na API de buscar as chaves das notas fiscais
:param str data_ultimo_registro: Data da última emissão de nota fiscal presente no banco de dados, no formato 'YYYYMMDD'.
:return: Lista de dicionários contendo as datas de início e fim para cada intervalo de busca na API.
:rtype: list[dict]
'''
max_data_bd = datetime.strptime(data_ultimo_registro, '%Y%m%d').date()
data_hoje = datetime.today().date()
data_amanha = datetime.today().date() + timedelta(days=1)
# Define a data de início para a busca, 10 dias antes da data do último registro
data_inicio = max_data_bd - timedelta(days=10)
qtd_dias_pendentes = (data_hoje - data_inicio).days
# A API limita a busca de chaves no intervalo de 10 dias, portanto isso faremos x iteracoes para abranger o período desejado
qtd_iteracoes = math.ceil(qtd_dias_pendentes / 10)
contador = 1
lista_datas = []
for i in range(qtd_iteracoes):
# Atuaiza a data de início para cada iteração
data_inicio += timedelta(days = contador)
# Variável que controla a data limite de busca
contador = 10 if i < qtd_iteracoes - 1 else (data_amanha - data_inicio).days
data_fim = data_inicio + timedelta(days = contador)
# extende a lista com um dict que contém a data inicial e data final de cada iteração
lista_datas.extend([{'inicio': data_inicio.strftime('%Y-%m-%d'), 'fim': data_fim.strftime('%Y-%m-%d')}])
return lista_datas
def autenticar(id_integracao):
'''Realiza a autenticação com o servidor.
Caso alguma requisição retorne 402, a função autenticar deve ser invocada novamente, gerando um novo token
id_integracao - uuid disponibilizado na seção "administrar" no site da conexão
Retorna o Bearer token que deve ser usado nas requisições'''
host = 'https://api.conexaonfe.com.br/v1/'
endpoint = 'autenticacao'
headers = {'id-integracao': id_integracao}
response = requests.get(host + endpoint, headers=headers)
print(response.status_code)
response_text = response.text
token = json.loads(response_text)
return token
def listar_chaves_nfe_recebidas(data_inicio, data_fim, token) -> list:
'''Lista de chaves das notas fiscais eletronicas recebidas no período informado. (max 10 dias)
data_inicio - data inicial (formato yyyy-mm-dd)
data_fim - data final (formato yyyy-mm-dd)
token - bearer token de autenticação
Retorna um json com a lista das chaves'''
host = 'https://api.conexaonfe.com.br/v1/'
endpoint = 'nfe/recebidas/por-data-emissao/'
url = host + endpoint + data_inicio + '/' + data_fim
headers = {'Authorization': f'Bearer {token}'}
response = requests.get(url, headers=headers)
if response.status_code == 404:
return []
else:
print(response.status_code)
response_text = response.text
lista_chaves = json.loads(response_text)
return lista_chaves
import os
def buscar_xml_por_chave(lista_chaves: list, token: str) -> list:
'''Buscar xml das notas através das chaves e salvar localmente.
lista_chaves - lista de chaves dos xmls
token - bearer token de autenticação
Retorna uma lista de dicionários com os xmls'''
host = 'https://api.conexaonfe.com.br/v1/'
endpoint = 'dfes/'
pasta_destino = r'\\10.77.77.11\HubSupply\raw_ata\orcamento'
# Cria a pasta se não existir
os.makedirs(pasta_destino, exist_ok=True)
lista_dicts_xml = []
for chave in lista_chaves:
url = host + endpoint + chave
headers = {'Authorization': 'Bearer ' + token}
print(f"Buscando XML: {chave}")
response = requests.get(url, headers=headers)
print(f"Status: {response.status_code}")
if response.status_code == 200:
response_text = response.text
lista_dicts_xml.append(response_text)
# Salvar XML em arquivo local
caminho_arquivo = os.path.join(pasta_destino, f'{chave}.xml')
with open(caminho_arquivo, 'w', encoding='utf-8') as arquivo_xml:
arquivo_xml.write(response_text)
else:
print(f"Erro ao buscar chave {chave}: {response.status_code}")
return lista_dicts_xml
# lista_dicts_xml.append(json.loads(response_text))
def validateAttribute(funcao, valor_se_erro):
resultado = None
try:
resultado = funcao()
except AttributeError:
resultado = valor_se_erro
except Exception as e:
print(e)
return resultado
token = autenticar(id_integracao)
chaves = []
lista_data = gerar_datas_inicio_fim('20250101')
for datas in lista_data:
response_api = listar_chaves_nfe_recebidas(datas['inicio'], datas['fim'], token['token'])
if len(response_api) > 0:
chaves.extend(response_api)
# Remover duplicatas usando set() e compreensão de lista
chaves_sem_duplicatas = [dict(t) for t in {tuple(d.items()) for d in chaves}]
chaves_request = [(item['chaveAcesso']) for item in chaves_sem_duplicatas]
len(chaves_request)
lista_xml = buscar_xml_por_chave(chaves_request, token['token'])

View File

@ -0,0 +1,295 @@
import os
import requests
import json
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta
import math
from bs4 import BeautifulSoup
from lxml import etree
import xmltodict
import sqlite3
# Caminho para a pasta onde os XMLs estão armazenados
pasta_xmls = r'\\10.77.77.11\HubSupply\raw_data\orcamento'
# Lista para armazenar o conteúdo dos XMLs
lista_xml = []
print('Iterando XML')
# Itera sobre os arquivos na pasta
for nome_arquivo in os.listdir(pasta_xmls):
# Verifica se o arquivo tem a extensão .xml
if nome_arquivo.endswith('.xml'):
# Construa o caminho completo do arquivo
caminho_arquivo = os.path.join(pasta_xmls, nome_arquivo)
# Abre e lê o conteúdo do arquivo XML
with open(caminho_arquivo, 'r', encoding='utf-8') as arquivo:
conteudo_xml = arquivo.read()
lista_xml.append(conteudo_xml)
print('Finalizei iteracao')
# Verifique se os XMLs foram carregados
lista_notas = []
lista_erros = []
lista_itens_notas = []
lista_itens_erros = []
qtd_sucesso = 0
print('Transformando os xmls em um DF')
for nota in lista_xml:
# Verifique se a nota não está vazia
if not nota.strip():
print("Nota vazia encontrada.")
lista_erros.append(["Nota vazia", nota])
continue
# Parse do XML
try:
xml_tree = etree.fromstring(nota.encode('utf-8'))
except Exception as e:
print(e)
lista_erros.append([e, nota])
continue
# O resto do código continua igual
# Parse do XML
try:
xml_tree = etree.fromstring(nota.encode('utf-8'))
except Exception as e:
print(e)
lista_erros.append([e, nota])
continue
xml = BeautifulSoup(nota, 'xml')
def buscar_elementos_filhos(elemento, elemento_pai, xpath):
return [validateAttribute(lambda: item.find(elemento).text, None) for item in xpath.find_all(elemento_pai)]
def validateAttribute(funcao, valor_se_erro):
resultado = None
try:
resultado = funcao()
except AttributeError:
resultado = valor_se_erro
except Exception as e:
print(e)
return resultado
# definindo os campos do cabeçalho da nota
lista_notas.append(
{
'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None),
'cNF': validateAttribute(lambda:xml.ide.nNF.text, None),
'serie': validateAttribute(lambda:xml.ide.serie.text, None),
'data_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[0], None),
'hora_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[1][:-6], None),
'cnpj_emissor': validateAttribute(lambda:xml.emit.CNPJ.text, None),
'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None),
'cnpj_destinatario': validateAttribute(lambda:xml.dest.CNPJ.text, None),
'valor_total_produtos': validateAttribute(lambda:xml.total.ICMSTot.vProd.text, None),
'valor_ICMSST': validateAttribute(lambda:xml.total.ICMSTot.vST.text, None),
'valor_FCPST': validateAttribute(lambda:xml.total.ICMSTot.vFCPST.text, None),
'valor_frete': validateAttribute(lambda:xml.total.ICMSTot.vFrete.text, None),
'valor_seguro': validateAttribute(lambda:xml.total.ICMSTot.vSeg.text, None),
'valor_outras_despesas': validateAttribute(lambda:xml.total.ICMSTot.vOutro.text, None),
'valor_II': validateAttribute(lambda:xml.total.ICMSTot.vII.text, None), ## validar =================
'valor_IPI': validateAttribute(lambda:xml.total.ICMSTot.vIPI.text, None),
'valor_IPI_Devol': validateAttribute(lambda:xml.total.ICMSTot.vIPIDevol.text, None),
'valor_servicos': validateAttribute(lambda: xml.total.ISSQNtot.vServ.text, 0),
'valor_desconto': validateAttribute(lambda:xml.total.ICMSTot.vDesc.text, None),
'valor_ICMS_desonerado': validateAttribute(lambda:xml.total.ICMSTot.vICMSDeson.text, None),
'valor_liquido': validateAttribute(lambda:xml.total.ICMSTot.vNF.text, None),
'tipo_pagamento_JSON': '', # implementar JSON com os tipods de pagamento ([{tPag, vPag}, {tPag, vPag}, {tPag, vPag}])
'numero_fatura': validateAttribute(lambda:xml.cobr.fat.nFat.text, None),
'qtd_parcelas': validateAttribute(lambda: len(xml.cobr.find_all('dup')), None),
'duplicatas_json': '', # desenvolver JSON com as duplicatas ([{nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}])
'valor_ICMS': validateAttribute(lambda: xml.total.ICMSTot.vICMS.text, None), ## validar =================
'situacao': validateAttribute(lambda:xml.protNFe.infProt.xMotivo.text, None)
}
)
# definindo os campos dos itens das notas
qtd_itens = len(validateAttribute(lambda: xml.infNFe.find_all('det'), 0))
lista_itens_notas.append(
{
'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None),
'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None),
'n_item': validateAttribute(lambda: [item.get('nItem') for item in xml.infNFe.find_all('det')], 0),
'cod_produto': validateAttribute(lambda: buscar_elementos_filhos('cProd', 'det', xml.infNFe), [None] * qtd_itens),
'produto': validateAttribute(lambda: buscar_elementos_filhos('xProd', 'det', xml.infNFe), [None] * qtd_itens),
'cEAN': validateAttribute(lambda: buscar_elementos_filhos('cEAN', 'det', xml.infNFe), [None] * qtd_itens),
'NCM': validateAttribute(lambda: buscar_elementos_filhos('NCM', 'det', xml.infNFe), [None] * qtd_itens),
'CEST': validateAttribute(lambda: buscar_elementos_filhos('CEST', 'det', xml.infNFe), [None] * qtd_itens),
'CFOP': validateAttribute(lambda: buscar_elementos_filhos('CFOP', 'det', xml.infNFe), [None] * qtd_itens),
'unidade_medida': validateAttribute(lambda: buscar_elementos_filhos('uCom', 'det', xml.infNFe), [None] * qtd_itens),
'quantidade': validateAttribute(lambda: buscar_elementos_filhos('qCom', 'det', xml.infNFe), [None] * qtd_itens),
'valor_unitario': validateAttribute(lambda: buscar_elementos_filhos('vUnCom', 'det', xml.infNFe), [None] * qtd_itens),
'valor_total_produtos': validateAttribute(lambda: buscar_elementos_filhos('vProd', 'det', xml.infNFe), [None] * qtd_itens),
'valor_frete': validateAttribute(lambda: buscar_elementos_filhos('vFrete', 'det', xml.infNFe), [None] * qtd_itens),
'valor_seguro': validateAttribute(lambda: buscar_elementos_filhos('vSeg', 'det', xml.infNFe), [None] * qtd_itens),
'valor_desconto': validateAttribute(lambda: buscar_elementos_filhos('vDesc', 'det', xml.infNFe), [None] * qtd_itens),
'valor_outras_despesas': validateAttribute(lambda: buscar_elementos_filhos('vOutro', 'det', xml.infNFe), [None] * qtd_itens),
'codigo_pedido': validateAttribute(lambda: buscar_elementos_filhos('xPed', 'det', xml.infNFe), [None] * qtd_itens),
'cod_origem': validateAttribute(lambda: buscar_elementos_filhos('orig', 'det', xml.infNFe), [None] * qtd_itens),
'CST': validateAttribute(lambda: buscar_elementos_filhos('CST', 'det', xml.infNFe), [None] * qtd_itens),
'modalidade_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('modBC', 'det', xml.infNFe), [None] * qtd_itens),
'valor_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vBC', 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_ICMS': validateAttribute(lambda: buscar_elementos_filhos('pICMS', 'det', xml.infNFe), [None] * qtd_itens),
'valor_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vICMS', 'det', xml.infNFe), [None] * qtd_itens),
'valor_BC_FCP': validateAttribute(lambda: buscar_elementos_filhos('vBCFCP', 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_FCP': validateAttribute(lambda: buscar_elementos_filhos('pFCP', 'det', xml.infNFe), [None] * qtd_itens),
'valor_FCP': validateAttribute(lambda: buscar_elementos_filhos('vFCP', 'det', xml.infNFe), [None] * qtd_itens),
'modalidade_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('modBCST' , 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_MVA_ST': validateAttribute(lambda: buscar_elementos_filhos('pMVAST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('vBCST', 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_ICMS_ST': validateAttribute(lambda: buscar_elementos_filhos('pICMSST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_ICMSST': validateAttribute(lambda: buscar_elementos_filhos('vICMSST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_BC_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vBCFCPST', 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_FCPST': validateAttribute(lambda: buscar_elementos_filhos('pFCPST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vFCPST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_II': validateAttribute(lambda: buscar_elementos_filhos('vII', 'det', xml.infNFe), [None] * qtd_itens),
'valor_IPI': validateAttribute(lambda: buscar_elementos_filhos('vIPI' , 'det', xml.infNFe), [None] * qtd_itens),
'valor_ICMS_desonerado': validateAttribute(lambda: buscar_elementos_filhos('vICMSDeson', 'det', xml.infNFe), [None] * qtd_itens)
}
)
qtd_sucesso += 1
# if qtd_sucesso + len(lista_erros) == 100:
# break ### ENCERRAR ==== DEBUG
print('Transformando os xmls em um DF')
print(f'qtd com erro: {len(lista_erros)}')
print(f'qtd sucesso: {qtd_sucesso}')
print('Comecei a tipagem')
df_notas = pd.DataFrame.from_dict(lista_notas)
df_notas_itens = pd.DataFrame.from_dict(lista_itens_notas)
len(df_notas) == len(df_notas_itens['chave'].unique())
df_notas['cNF'] = df_notas['cNF'].str.zfill(9)
df_notas['serie'] = df_notas['serie'].str.zfill(3)
df_notas['valor_total_produtos'] = df_notas['valor_total_produtos'].astype(float)
df_notas['valor_ICMSST'] = df_notas['valor_ICMSST'].astype(float)
df_notas['valor_FCPST'] = df_notas['valor_FCPST'].astype(float)
df_notas['valor_frete'] = df_notas['valor_frete'].astype(float)
df_notas['valor_seguro'] = df_notas['valor_seguro'].astype(float)
df_notas['valor_outras_despesas'] = df_notas['valor_outras_despesas'].astype(float)
df_notas['valor_II'] = df_notas['valor_II'].astype(float)
df_notas['valor_IPI'] = df_notas['valor_IPI'].astype(float)
df_notas['valor_IPI_Devol'] = df_notas['valor_IPI_Devol'].astype(float)
df_notas['valor_servicos'] = df_notas['valor_servicos'].astype(float)
df_notas['valor_desconto'] = df_notas['valor_desconto'].astype(float)
df_notas['valor_ICMS_desonerado'] = df_notas['valor_ICMS_desonerado'].astype(float)
df_notas['valor_liquido'] = df_notas['valor_liquido'].astype(float)
df_notas['valor_ICMS'] = df_notas['valor_ICMS'].astype(float)
df_notas_calamo = df_notas[df_notas['nome_emissor'].str.upper().str.startswith('CALAMO')]
df_notas_calamo.iloc[0]
print(df_notas_calamo)
lista_explode = [
'n_item',
'cod_produto',
'produto',
'cEAN',
'NCM',
'CEST',
'CFOP',
'unidade_medida',
'quantidade',
'valor_unitario',
'valor_total_produtos',
'valor_frete',
'valor_seguro',
'valor_desconto',
'valor_outras_despesas',
'codigo_pedido',
'cod_origem',
'CST',
'modalidade_BC_ICMS',
'valor_BC_ICMS',
'aliquota_ICMS',
'valor_ICMS',
'valor_BC_FCP',
'aliquota_FCP',
'valor_FCP',
'modalidade_BC_ST',
'aliquota_MVA_ST',
'valor_BC_ST',
'aliquota_ICMS_ST',
'valor_ICMSST',
'valor_BC_FCPST',
'aliquota_FCPST',
'valor_FCPST',
'valor_II',
'valor_IPI',
'valor_ICMS_desonerado'
]
df_notas_calamo_itens = df_notas_itens[df_notas_itens['nome_emissor'].str.upper().str.startswith('CALAMO')]
df_notas_calamo_itens_expandido = df_notas_calamo_itens.explode(lista_explode)
df_notas_calamo_itens_expandido.head()
df_notas_calamo_itens_expandido['cod_produto'] = df_notas_calamo_itens_expandido['cod_produto'].str.zfill(5)
df_notas_calamo_itens_expandido['NCM'] = df_notas_calamo_itens_expandido['NCM'].str.zfill(8)
df_notas_calamo_itens_expandido['CEST'] = df_notas_calamo_itens_expandido['CEST'].str.zfill(7)
df_notas_calamo_itens_expandido['CFOP'] = df_notas_calamo_itens_expandido['CFOP'].str.zfill(4)
df_notas_calamo_itens_expandido['CST'] = df_notas_calamo_itens_expandido['CST'].str.zfill(2)
df_notas_calamo_itens_expandido['quantidade'] = df_notas_calamo_itens_expandido['quantidade'].astype(float).astype(int)
# decimais
df_notas_calamo_itens_expandido['valor_unitario'] = df_notas_calamo_itens_expandido['valor_unitario'].astype(float)
df_notas_calamo_itens_expandido['valor_total_produtos'] = df_notas_calamo_itens_expandido['valor_total_produtos'].astype(float)
df_notas_calamo_itens_expandido['valor_frete'] = df_notas_calamo_itens_expandido['valor_frete'].astype(float)
df_notas_calamo_itens_expandido['valor_seguro'] = df_notas_calamo_itens_expandido['valor_seguro'].astype(float)
df_notas_calamo_itens_expandido['valor_desconto'] = df_notas_calamo_itens_expandido['valor_desconto'].astype(float)
df_notas_calamo_itens_expandido['valor_outras_despesas'] = df_notas_calamo_itens_expandido['valor_outras_despesas'].astype(float)
df_notas_calamo_itens_expandido['valor_BC_ICMS'] = df_notas_calamo_itens_expandido['valor_BC_ICMS'].astype(float)
df_notas_calamo_itens_expandido['valor_ICMS'] = df_notas_calamo_itens_expandido['valor_ICMS'].astype(float)
df_notas_calamo_itens_expandido['valor_BC_FCP'] = df_notas_calamo_itens_expandido['valor_BC_FCP'].astype(float)
df_notas_calamo_itens_expandido['valor_FCP'] = df_notas_calamo_itens_expandido['valor_FCP'].astype(float)
df_notas_calamo_itens_expandido['valor_BC_ST'] = df_notas_calamo_itens_expandido['valor_BC_ST'].astype(float)
df_notas_calamo_itens_expandido['valor_ICMSST'] = df_notas_calamo_itens_expandido['valor_ICMSST'].astype(float)
df_notas_calamo_itens_expandido['valor_BC_FCPST'] = df_notas_calamo_itens_expandido['valor_BC_FCPST'].astype(float)
df_notas_calamo_itens_expandido['valor_FCPST'] = df_notas_calamo_itens_expandido['valor_FCPST'].astype(float)
df_notas_calamo_itens_expandido['valor_II'] = df_notas_calamo_itens_expandido['valor_II'].astype(float)
df_notas_calamo_itens_expandido['valor_IPI'] = df_notas_calamo_itens_expandido['valor_IPI'].astype(float)
df_notas_calamo_itens_expandido['valor_ICMS_desonerado'] = df_notas_calamo_itens_expandido['valor_ICMS_desonerado'].astype(float)
# percentual
df_notas_calamo_itens_expandido['aliquota_ICMS'] = df_notas_calamo_itens_expandido['aliquota_ICMS'].astype(float) / 100
df_notas_calamo_itens_expandido['aliquota_FCP'] = df_notas_calamo_itens_expandido['aliquota_FCP'].astype(float) / 100
df_notas_calamo_itens_expandido['aliquota_MVA_ST'] = df_notas_calamo_itens_expandido['aliquota_MVA_ST'].astype(float) / 100
df_notas_calamo_itens_expandido['aliquota_ICMS_ST'] = df_notas_calamo_itens_expandido['aliquota_ICMS_ST'].astype(float) / 100
df_notas_calamo_itens_expandido['aliquota_FCPST'] = df_notas_calamo_itens_expandido['aliquota_FCPST'].astype(float) / 100
print(df_notas_calamo_itens_expandido.iloc[0])
df_notas_calamo_itens_expandido.drop(['nome_emissor'], axis=1, inplace=True)
print(df_notas_calamo_itens_expandido)
print('Finalizei a tipagem')
conn = sqlite3.connect(r'\\10.77.77.11\HubSupply\db_data\DL_Ginseng.db')
print('Carregando para o Banco de Dados')
df_notas_calamo.to_sql('fato_notas_entrada', conn, if_exists='replace', index=False)
df_notas_calamo_itens_expandido.to_sql('fato_notas_entrada_itens', conn, if_exists='replace', index=False)
query1 = 'SELECT * FROM fato_notas_entrada'
query2 = 'SELECT * FROM fato_notas_entrada_itens'
# Lendo os dados de volta
df1_from_db = pd.read_sql(query1, conn)
df2_from_db = pd.read_sql(query2, conn)
print("Tabela 1 carregada do banco:")
print(df1_from_db)
print("\nTabela 2 carregada do banco:")
print(df2_from_db)
conn.close()

View File

@ -0,0 +1,27 @@
import pandas as pd
from sqlalchemy import create_engine
import sqlite3
PDV = pd.read_excel(r'S:/5.PROJETOS BI/DASHBOARDS/SUPRIMENTOS/Tabela Dimensão/PDVs.xlsx')
SKU = pd.read_excel(r'S:/5.PROJETOS BI/DASHBOARDS/SUPRIMENTOS/Tabela Dimensão/SKUS.xlsx')
conn = sqlite3.connect(r'\\10.77.77.11\HubSupply\db_data\DL_Ginseng.db')
SKU.to_sql('SKUS', conn, if_exists='replace', index=False)
PDV.to_sql('PDV', conn, if_exists='replace', index=False)
query1 = 'SELECT * FROM SKUS'
query2 = 'SELECT * FROM PDV'
df1_from_db = pd.read_sql(query1, conn)
df2_from_db = pd.read_sql(query2, conn)
print("Tabela 1 carregada do banco:")
print(df1_from_db)
print("\nTabela 2 carregada do banco:")
print(df2_from_db)
conn.close()

168
src/extracao_orcamento.py Normal file
View File

@ -0,0 +1,168 @@
import requests
import json
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta
import math
from bs4 import BeautifulSoup
from lxml import etree
import xmltodict
id_integracao = 'c7cd9625-4df7-4674-bfd9-109bac618b9c'
def gerar_datas_inicio_fim(data_ultimo_registro):
'''Gera uma lista de dicionários com as datas de início e fim que devem ser utilizadas como parâmetros
na API de buscar as chaves das notas fiscais
:param str data_ultimo_registro: Data da última emissão de nota fiscal presente no banco de dados, no formato 'YYYYMMDD'.
:return: Lista de dicionários contendo as datas de início e fim para cada intervalo de busca na API.
:rtype: list[dict]
'''
max_data_bd = datetime.strptime(data_ultimo_registro, '%Y%m%d').date()
data_hoje = datetime.today().date()
data_amanha = datetime.today().date() + timedelta(days=1)
# Define a data de início para a busca, 10 dias antes da data do último registro
data_inicio = max_data_bd - timedelta(days=10)
qtd_dias_pendentes = (data_hoje - data_inicio).days
# A API limita a busca de chaves no intervalo de 10 dias, portanto isso faremos x iteracoes para abranger o período desejado
qtd_iteracoes = math.ceil(qtd_dias_pendentes / 10)
contador = 1
lista_datas = []
for i in range(qtd_iteracoes):
# Atuaiza a data de início para cada iteração
data_inicio += timedelta(days = contador)
# Variável que controla a data limite de busca
contador = 10 if i < qtd_iteracoes - 1 else (data_amanha - data_inicio).days
data_fim = data_inicio + timedelta(days = contador)
# extende a lista com um dict que contém a data inicial e data final de cada iteração
lista_datas.extend([{'inicio': data_inicio.strftime('%Y-%m-%d'), 'fim': data_fim.strftime('%Y-%m-%d')}])
return lista_datas
def autenticar(id_integracao):
'''Realiza a autenticação com o servidor.
Caso alguma requisição retorne 402, a função autenticar deve ser invocada novamente, gerando um novo token
id_integracao - uuid disponibilizado na seção "administrar" no site da conexão
Retorna o Bearer token que deve ser usado nas requisições'''
host = 'https://api.conexaonfe.com.br/v1/'
endpoint = 'autenticacao'
headers = {'id-integracao': id_integracao}
response = requests.get(host + endpoint, headers=headers)
print(response.status_code)
response_text = response.text
token = json.loads(response_text)
return token
def listar_chaves_nfe_recebidas(data_inicio, data_fim, token) -> list:
'''Lista de chaves das notas fiscais eletronicas recebidas no período informado. (max 10 dias)
data_inicio - data inicial (formato yyyy-mm-dd)
data_fim - data final (formato yyyy-mm-dd)
token - bearer token de autenticação
Retorna um json com a lista das chaves'''
host = 'https://api.conexaonfe.com.br/v1/'
endpoint = 'nfe/recebidas/por-data-emissao/'
url = host + endpoint + data_inicio + '/' + data_fim
headers = {'Authorization': f'Bearer {token}'}
response = requests.get(url, headers=headers)
if response.status_code == 404:
return []
else:
print(response.status_code)
response_text = response.text
lista_chaves = json.loads(response_text)
return lista_chaves
import os
def buscar_xml_por_chave(lista_chaves: list, token: str) -> list:
'''Buscar xml das notas através das chaves e salvar localmente.
lista_chaves - lista de chaves dos xmls
token - bearer token de autenticação
Retorna uma lista de dicionários com os xmls'''
host = 'https://api.conexaonfe.com.br/v1/'
endpoint = 'dfes/'
pasta_destino = r'\\10.77.77.11\HubSupply\raw_ata\orcamento'
# Cria a pasta se não existir
os.makedirs(pasta_destino, exist_ok=True)
lista_dicts_xml = []
for chave in lista_chaves:
url = host + endpoint + chave
headers = {'Authorization': 'Bearer ' + token}
print(f"Buscando XML: {chave}")
response = requests.get(url, headers=headers)
print(f"Status: {response.status_code}")
if response.status_code == 200:
response_text = response.text
lista_dicts_xml.append(response_text)
# Salvar XML em arquivo local
caminho_arquivo = os.path.join(pasta_destino, f'{chave}.xml')
with open(caminho_arquivo, 'w', encoding='utf-8') as arquivo_xml:
arquivo_xml.write(response_text)
else:
print(f"Erro ao buscar chave {chave}: {response.status_code}")
return lista_dicts_xml
# lista_dicts_xml.append(json.loads(response_text))
def validateAttribute(funcao, valor_se_erro):
resultado = None
try:
resultado = funcao()
except AttributeError:
resultado = valor_se_erro
except Exception as e:
print(e)
return resultado
token = autenticar(id_integracao)
chaves = []
lista_data = gerar_datas_inicio_fim('20250101')
for datas in lista_data:
response_api = listar_chaves_nfe_recebidas(datas['inicio'], datas['fim'], token['token'])
if len(response_api) > 0:
chaves.extend(response_api)
# Remover duplicatas usando set() e compreensão de lista
chaves_sem_duplicatas = [dict(t) for t in {tuple(d.items()) for d in chaves}]
chaves_request = [(item['chaveAcesso']) for item in chaves_sem_duplicatas]
len(chaves_request)
lista_xml = buscar_xml_por_chave(chaves_request, token['token'])

297
src/tratamento_orcamento.py Normal file
View File

@ -0,0 +1,297 @@
import os
import requests
import json
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta
import math
from bs4 import BeautifulSoup
from lxml import etree
import xmltodict
import sqlite3
# Caminho para a pasta onde os XMLs estão armazenados
pasta_xmls = r'\\10.77.77.11\HubSupply\raw_data\orcamento'
# Lista para armazenar o conteúdo dos XMLs
lista_xml = []
print('Iterando XML')
# Itera sobre os arquivos na pasta
for nome_arquivo in os.listdir(pasta_xmls):
# Verifica se o arquivo tem a extensão .xml
if nome_arquivo.endswith('.xml'):
# Construa o caminho completo do arquivo
caminho_arquivo = os.path.join(pasta_xmls, nome_arquivo)
# Abre e lê o conteúdo do arquivo XML
with open(caminho_arquivo, 'r', encoding='utf-8') as arquivo:
conteudo_xml = arquivo.read()
lista_xml.append(conteudo_xml)
print('Finalizei iteracao')
# Verifique se os XMLs foram carregados
lista_notas = []
lista_erros = []
lista_itens_notas = []
lista_itens_erros = []
qtd_sucesso = 0
print('Transformando os xmls em um DF')
for nota in lista_xml:
# Verifique se a nota não está vazia
if not nota.strip():
print("Nota vazia encontrada.")
lista_erros.append(["Nota vazia", nota])
continue
# Parse do XML
try:
xml_tree = etree.fromstring(nota.encode('utf-8'))
except Exception as e:
print(e)
lista_erros.append([e, nota])
continue
# O resto do código continua igual
# Parse do XML
try:
xml_tree = etree.fromstring(nota.encode('utf-8'))
except Exception as e:
print(e)
lista_erros.append([e, nota])
continue
xml = BeautifulSoup(nota, 'xml')
def buscar_elementos_filhos(elemento, elemento_pai, xpath):
return [validateAttribute(lambda: item.find(elemento).text, None) for item in xpath.find_all(elemento_pai)]
def validateAttribute(funcao, valor_se_erro):
resultado = None
try:
resultado = funcao()
except AttributeError:
resultado = valor_se_erro
except Exception as e:
print(e)
return resultado
# definindo os campos do cabeçalho da nota
lista_notas.append(
{
'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None),
'cNF': validateAttribute(lambda:xml.ide.nNF.text, None),
'serie': validateAttribute(lambda:xml.ide.serie.text, None),
'data_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[0], None),
'hora_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[1][:-6], None),
'cnpj_emissor': validateAttribute(lambda:xml.emit.CNPJ.text, None),
'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None),
'cnpj_destinatario': validateAttribute(lambda:xml.dest.CNPJ.text, None),
'valor_total_produtos': validateAttribute(lambda:xml.total.ICMSTot.vProd.text, None),
'valor_ICMSST': validateAttribute(lambda:xml.total.ICMSTot.vST.text, None),
'valor_FCPST': validateAttribute(lambda:xml.total.ICMSTot.vFCPST.text, None),
'valor_frete': validateAttribute(lambda:xml.total.ICMSTot.vFrete.text, None),
'valor_seguro': validateAttribute(lambda:xml.total.ICMSTot.vSeg.text, None),
'valor_outras_despesas': validateAttribute(lambda:xml.total.ICMSTot.vOutro.text, None),
'valor_II': validateAttribute(lambda:xml.total.ICMSTot.vII.text, None), ## validar =================
'valor_IPI': validateAttribute(lambda:xml.total.ICMSTot.vIPI.text, None),
'valor_IPI_Devol': validateAttribute(lambda:xml.total.ICMSTot.vIPIDevol.text, None),
'valor_servicos': validateAttribute(lambda: xml.total.ISSQNtot.vServ.text, 0),
'valor_desconto': validateAttribute(lambda:xml.total.ICMSTot.vDesc.text, None),
'valor_ICMS_desonerado': validateAttribute(lambda:xml.total.ICMSTot.vICMSDeson.text, None),
'valor_liquido': validateAttribute(lambda:xml.total.ICMSTot.vNF.text, None),
'tipo_pagamento_JSON': '', # implementar JSON com os tipods de pagamento ([{tPag, vPag}, {tPag, vPag}, {tPag, vPag}])
'numero_fatura': validateAttribute(lambda:xml.cobr.fat.nFat.text, None),
'qtd_parcelas': validateAttribute(lambda: len(xml.cobr.find_all('dup')), None),
'duplicatas_json': '', # desenvolver JSON com as duplicatas ([{nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}])
'valor_ICMS': validateAttribute(lambda: xml.total.ICMSTot.vICMS.text, None), ## validar =================
'situacao': validateAttribute(lambda:xml.protNFe.infProt.xMotivo.text, None)
}
)
# definindo os campos dos itens das notas
qtd_itens = len(validateAttribute(lambda: xml.infNFe.find_all('det'), 0))
lista_itens_notas.append(
{
'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None),
'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None),
'n_item': validateAttribute(lambda: [item.get('nItem') for item in xml.infNFe.find_all('det')], 0),
'cod_produto': validateAttribute(lambda: buscar_elementos_filhos('cProd', 'det', xml.infNFe), [None] * qtd_itens),
'produto': validateAttribute(lambda: buscar_elementos_filhos('xProd', 'det', xml.infNFe), [None] * qtd_itens),
'cEAN': validateAttribute(lambda: buscar_elementos_filhos('cEAN', 'det', xml.infNFe), [None] * qtd_itens),
'NCM': validateAttribute(lambda: buscar_elementos_filhos('NCM', 'det', xml.infNFe), [None] * qtd_itens),
'CEST': validateAttribute(lambda: buscar_elementos_filhos('CEST', 'det', xml.infNFe), [None] * qtd_itens),
'CFOP': validateAttribute(lambda: buscar_elementos_filhos('CFOP', 'det', xml.infNFe), [None] * qtd_itens),
'unidade_medida': validateAttribute(lambda: buscar_elementos_filhos('uCom', 'det', xml.infNFe), [None] * qtd_itens),
'quantidade': validateAttribute(lambda: buscar_elementos_filhos('qCom', 'det', xml.infNFe), [None] * qtd_itens),
'valor_unitario': validateAttribute(lambda: buscar_elementos_filhos('vUnCom', 'det', xml.infNFe), [None] * qtd_itens),
'valor_total_produtos': validateAttribute(lambda: buscar_elementos_filhos('vProd', 'det', xml.infNFe), [None] * qtd_itens),
'valor_frete': validateAttribute(lambda: buscar_elementos_filhos('vFrete', 'det', xml.infNFe), [None] * qtd_itens),
'valor_seguro': validateAttribute(lambda: buscar_elementos_filhos('vSeg', 'det', xml.infNFe), [None] * qtd_itens),
'valor_desconto': validateAttribute(lambda: buscar_elementos_filhos('vDesc', 'det', xml.infNFe), [None] * qtd_itens),
'valor_outras_despesas': validateAttribute(lambda: buscar_elementos_filhos('vOutro', 'det', xml.infNFe), [None] * qtd_itens),
'codigo_pedido': validateAttribute(lambda: buscar_elementos_filhos('xPed', 'det', xml.infNFe), [None] * qtd_itens),
'cod_origem': validateAttribute(lambda: buscar_elementos_filhos('orig', 'det', xml.infNFe), [None] * qtd_itens),
'CST': validateAttribute(lambda: buscar_elementos_filhos('CST', 'det', xml.infNFe), [None] * qtd_itens),
'modalidade_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('modBC', 'det', xml.infNFe), [None] * qtd_itens),
'valor_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vBC', 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_ICMS': validateAttribute(lambda: buscar_elementos_filhos('pICMS', 'det', xml.infNFe), [None] * qtd_itens),
'valor_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vICMS', 'det', xml.infNFe), [None] * qtd_itens),
'valor_BC_FCP': validateAttribute(lambda: buscar_elementos_filhos('vBCFCP', 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_FCP': validateAttribute(lambda: buscar_elementos_filhos('pFCP', 'det', xml.infNFe), [None] * qtd_itens),
'valor_FCP': validateAttribute(lambda: buscar_elementos_filhos('vFCP', 'det', xml.infNFe), [None] * qtd_itens),
'modalidade_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('modBCST' , 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_MVA_ST': validateAttribute(lambda: buscar_elementos_filhos('pMVAST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('vBCST', 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_ICMS_ST': validateAttribute(lambda: buscar_elementos_filhos('pICMSST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_ICMSST': validateAttribute(lambda: buscar_elementos_filhos('vICMSST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_BC_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vBCFCPST', 'det', xml.infNFe), [None] * qtd_itens),
'aliquota_FCPST': validateAttribute(lambda: buscar_elementos_filhos('pFCPST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vFCPST', 'det', xml.infNFe), [None] * qtd_itens),
'valor_II': validateAttribute(lambda: buscar_elementos_filhos('vII', 'det', xml.infNFe), [None] * qtd_itens),
'valor_IPI': validateAttribute(lambda: buscar_elementos_filhos('vIPI' , 'det', xml.infNFe), [None] * qtd_itens),
'valor_ICMS_desonerado': validateAttribute(lambda: buscar_elementos_filhos('vICMSDeson', 'det', xml.infNFe), [None] * qtd_itens)
}
)
qtd_sucesso += 1
# if qtd_sucesso + len(lista_erros) == 100:
# break ### ENCERRAR ==== DEBUG
print('Transformando os xmls em um DF')
print(f'qtd com erro: {len(lista_erros)}')
print(f'qtd sucesso: {qtd_sucesso}')
print('Comecei a tipagem')
df_notas = pd.DataFrame.from_dict(lista_notas)
df_notas_itens = pd.DataFrame.from_dict(lista_itens_notas)
len(df_notas) == len(df_notas_itens['chave'].unique())
df_notas['cNF'] = df_notas['cNF'].str.zfill(9)
df_notas['serie'] = df_notas['serie'].str.zfill(3)
df_notas['valor_total_produtos'] = df_notas['valor_total_produtos'].astype(float)
df_notas['valor_ICMSST'] = df_notas['valor_ICMSST'].astype(float)
df_notas['valor_FCPST'] = df_notas['valor_FCPST'].astype(float)
df_notas['valor_frete'] = df_notas['valor_frete'].astype(float)
df_notas['valor_seguro'] = df_notas['valor_seguro'].astype(float)
df_notas['valor_outras_despesas'] = df_notas['valor_outras_despesas'].astype(float)
df_notas['valor_II'] = df_notas['valor_II'].astype(float)
df_notas['valor_IPI'] = df_notas['valor_IPI'].astype(float)
df_notas['valor_IPI_Devol'] = df_notas['valor_IPI_Devol'].astype(float)
df_notas['valor_servicos'] = df_notas['valor_servicos'].astype(float)
df_notas['valor_desconto'] = df_notas['valor_desconto'].astype(float)
df_notas['valor_ICMS_desonerado'] = df_notas['valor_ICMS_desonerado'].astype(float)
df_notas['valor_liquido'] = df_notas['valor_liquido'].astype(float)
df_notas['valor_ICMS'] = df_notas['valor_ICMS'].astype(float)
df_notas_calamo = df_notas[df_notas['nome_emissor'].str.upper().str.startswith('CALAMO')]
df_notas_calamo.iloc[0]
print(df_notas_calamo)
lista_explode = [
'n_item',
'cod_produto',
'produto',
'cEAN',
'NCM',
'CEST',
'CFOP',
'unidade_medida',
'quantidade',
'valor_unitario',
'valor_total_produtos',
'valor_frete',
'valor_seguro',
'valor_desconto',
'valor_outras_despesas',
'codigo_pedido',
'cod_origem',
'CST',
'modalidade_BC_ICMS',
'valor_BC_ICMS',
'aliquota_ICMS',
'valor_ICMS',
'valor_BC_FCP',
'aliquota_FCP',
'valor_FCP',
'modalidade_BC_ST',
'aliquota_MVA_ST',
'valor_BC_ST',
'aliquota_ICMS_ST',
'valor_ICMSST',
'valor_BC_FCPST',
'aliquota_FCPST',
'valor_FCPST',
'valor_II',
'valor_IPI',
'valor_ICMS_desonerado'
]
df_notas_calamo_itens = df_notas_itens[df_notas_itens['nome_emissor'].str.upper().str.startswith('CALAMO')]
df_notas_calamo_itens_expandido = df_notas_calamo_itens.explode(lista_explode)
df_notas_calamo_itens_expandido.head()
df_notas_calamo_itens_expandido['cod_produto'] = df_notas_calamo_itens_expandido['cod_produto'].str.zfill(5)
df_notas_calamo_itens_expandido['NCM'] = df_notas_calamo_itens_expandido['NCM'].str.zfill(8)
df_notas_calamo_itens_expandido['CEST'] = df_notas_calamo_itens_expandido['CEST'].str.zfill(7)
df_notas_calamo_itens_expandido['CFOP'] = df_notas_calamo_itens_expandido['CFOP'].str.zfill(4)
df_notas_calamo_itens_expandido['CST'] = df_notas_calamo_itens_expandido['CST'].str.zfill(2)
df_notas_calamo_itens_expandido['quantidade'] = df_notas_calamo_itens_expandido['quantidade'].astype(float).astype(int)
# decimais
df_notas_calamo_itens_expandido['valor_unitario'] = df_notas_calamo_itens_expandido['valor_unitario'].astype(float)
df_notas_calamo_itens_expandido['valor_total_produtos'] = df_notas_calamo_itens_expandido['valor_total_produtos'].astype(float)
df_notas_calamo_itens_expandido['valor_frete'] = df_notas_calamo_itens_expandido['valor_frete'].astype(float)
df_notas_calamo_itens_expandido['valor_seguro'] = df_notas_calamo_itens_expandido['valor_seguro'].astype(float)
df_notas_calamo_itens_expandido['valor_desconto'] = df_notas_calamo_itens_expandido['valor_desconto'].astype(float)
df_notas_calamo_itens_expandido['valor_outras_despesas'] = df_notas_calamo_itens_expandido['valor_outras_despesas'].astype(float)
df_notas_calamo_itens_expandido['valor_BC_ICMS'] = df_notas_calamo_itens_expandido['valor_BC_ICMS'].astype(float)
df_notas_calamo_itens_expandido['valor_ICMS'] = df_notas_calamo_itens_expandido['valor_ICMS'].astype(float)
df_notas_calamo_itens_expandido['valor_BC_FCP'] = df_notas_calamo_itens_expandido['valor_BC_FCP'].astype(float)
df_notas_calamo_itens_expandido['valor_FCP'] = df_notas_calamo_itens_expandido['valor_FCP'].astype(float)
df_notas_calamo_itens_expandido['valor_BC_ST'] = df_notas_calamo_itens_expandido['valor_BC_ST'].astype(float)
df_notas_calamo_itens_expandido['valor_ICMSST'] = df_notas_calamo_itens_expandido['valor_ICMSST'].astype(float)
df_notas_calamo_itens_expandido['valor_BC_FCPST'] = df_notas_calamo_itens_expandido['valor_BC_FCPST'].astype(float)
df_notas_calamo_itens_expandido['valor_FCPST'] = df_notas_calamo_itens_expandido['valor_FCPST'].astype(float)
df_notas_calamo_itens_expandido['valor_II'] = df_notas_calamo_itens_expandido['valor_II'].astype(float)
df_notas_calamo_itens_expandido['valor_IPI'] = df_notas_calamo_itens_expandido['valor_IPI'].astype(float)
df_notas_calamo_itens_expandido['valor_ICMS_desonerado'] = df_notas_calamo_itens_expandido['valor_ICMS_desonerado'].astype(float)
# percentual
df_notas_calamo_itens_expandido['aliquota_ICMS'] = df_notas_calamo_itens_expandido['aliquota_ICMS'].astype(float) / 100
df_notas_calamo_itens_expandido['aliquota_FCP'] = df_notas_calamo_itens_expandido['aliquota_FCP'].astype(float) / 100
df_notas_calamo_itens_expandido['aliquota_MVA_ST'] = df_notas_calamo_itens_expandido['aliquota_MVA_ST'].astype(float) / 100
df_notas_calamo_itens_expandido['aliquota_ICMS_ST'] = df_notas_calamo_itens_expandido['aliquota_ICMS_ST'].astype(float) / 100
df_notas_calamo_itens_expandido['aliquota_FCPST'] = df_notas_calamo_itens_expandido['aliquota_FCPST'].astype(float) / 100
print(df_notas_calamo_itens_expandido.iloc[0])
df_notas_calamo_itens_expandido.drop(['nome_emissor'], axis=1, inplace=True)
print(df_notas_calamo_itens_expandido)
print('Finalizei a tipagem')
# Carregamento
conn = sqlite3.connect(r'\\10.77.77.11\HubSupply\db_data\DL_Ginseng.db')
print('Carregando para o Banco de Dados')
df_notas_calamo.to_sql('fato_notas_entrada', conn, if_exists='replace', index=False)
df_notas_calamo_itens_expandido.to_sql('fato_notas_entrada_itens', conn, if_exists='replace', index=False)
query1 = 'SELECT * FROM fato_notas_entrada'
query2 = 'SELECT * FROM fato_notas_entrada_itens'
# Lendo os dados de volta
df1_from_db = pd.read_sql(query1, conn)
df2_from_db = pd.read_sql(query2, conn)
print("Tabela 1 carregada do banco:")
print(df1_from_db)
print("\nTabela 2 carregada do banco:")
print(df2_from_db)
conn.close()