Ruptura_Projetada/Notebook - ETL notas API Conexao.py
2025-10-24 15:54:54 -03:00

733 lines
28 KiB
Python

#!/usr/bin/env python
# coding: utf-8
# ## Notebook - ETL notas API Conexao
#
# New notebook
# In[1]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# import requests
# import json
# import pandas as pd
# from sqlalchemy import create_engine
# from datetime import datetime, timedelta
# import math
# from bs4 import BeautifulSoup
# from lxml import etree
# import xmltodict
# In[2]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def gerar_datas_inicio_fim(data_ultimo_registro):
# '''Gera uma lista de dicionários com as datas de início e fim que devem ser utilizadas como parâmetros
# na API de buscar as chaves das notas fiscais
# :param str data_ultimo_registro: Data da última emissão de nota fiscal presente no banco de dados, no formato 'YYYYMMDD'.
# :return: Lista de dicionários contendo as datas de início e fim para cada intervalo de busca na API.
# :rtype: list[dict]
# '''
# max_data_bd = datetime.strptime(data_ultimo_registro, '%Y%m%d').date()
# data_hoje = datetime.today().date()
# data_amanha = datetime.today().date() + timedelta(days=1)
# # Define a data de início para a busca, 10 dias antes da data do último registro
# data_inicio = max_data_bd - timedelta(days=10)
# qtd_dias_pendentes = (data_hoje - data_inicio).days
# # A API limita a busca de chaves no intervalo de 10 dias, portanto isso faremos x iteracoes para abranger o período desejado
# qtd_iteracoes = math.ceil(qtd_dias_pendentes / 10)
# contador = 1
# lista_datas = []
# for i in range(qtd_iteracoes):
# # Atuaiza a data de início para cada iteração
# data_inicio += timedelta(days = contador)
# # Variável que controla a data limite de busca
# contador = 10 if i < qtd_iteracoes - 1 else (data_amanha - data_inicio).days
# data_fim = data_inicio + timedelta(days = contador)
# # extende a lista com um dict que contém a data inicial e data final de cada iteração
# lista_datas.extend([{'inicio': data_inicio.strftime('%Y-%m-%d'), 'fim': data_fim.strftime('%Y-%m-%d')}])
# return lista_datas
# In[3]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # sessao = requests.Session()
# id_integracao = 'c7cd9625-4df7-4674-bfd9-109bac618b9c'
# In[4]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def autenticar(id_integracao):
# '''Realiza a autenticação com o servidor.
# Caso alguma requisição retorne 402, a função autenticar deve ser invocada novamente, gerando um novo token
# id_integracao - uuid disponibilizado na seção "administrar" no site da conexão
# Retorna o Bearer token que deve ser usado nas requisições'''
# host = 'https://api.conexaonfe.com.br/v1/'
# endpoint = 'autenticacao'
# headers = {'id-integracao': id_integracao}
# response = requests.get(host + endpoint, headers=headers)
# print(response.status_code)
# response_text = response.text
# token = json.loads(response_text)
# return token
# In[5]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def listar_chaves_nfe_recebidas(data_inicio, data_fim, token) -> list:
# '''Lista de chaves das notas fiscais eletronicas recebidas no período informado. (max 10 dias)
# data_inicio - data inicial (formato yyyy-mm-dd)
# data_fim - data final (formato yyyy-mm-dd)
# token - bearer token de autenticação
# Retorna um json com a lista das chaves'''
# host = 'https://api.conexaonfe.com.br/v1/'
# endpoint = 'nfe/recebidas/por-data-emissao/'
# url = host + endpoint + data_inicio + '/' + data_fim
# headers = {'Authorization': f'Bearer {token}'}
# response = requests.get(url, headers=headers)
# if response.status_code == 404:
# return []
# else:
# print(response.status_code)
# response_text = response.text
# lista_chaves = json.loads(response_text)
# return lista_chaves
# In[6]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def buscar_xml_por_chave(lista_chaves: list, token: str) -> list:
# '''Buscar xml das notas através das chaves.
# Só estarão disponíveis os xmls capturados pela Conexão.
# lista_chaves - lista de chaves dos xmls
# bearer token de autenticação
# Retorna uma lista de dicionários com os xmls'''
# host = 'https://api.conexaonfe.com.br/v1/'
# endpoint = 'dfes/'
# lista_dicts_xml = []
# for chave in lista_chaves:
# url = host + endpoint + chave
# headers = {'Authorization': 'Bearer ' + token}
# print(url)
# response = requests.get(url, headers=headers)
# print(response.status_code)
# response_text = response.text
# lista_dicts_xml.append(response_text)
# # lista_dicts_xml.append(json.loads(response_text))
# return lista_dicts_xml
# In[7]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# def validateAttribute(funcao, valor_se_erro):
# resultado = None
# try:
# resultado = funcao()
# except AttributeError:
# resultado = valor_se_erro
# except Exception as e:
# print(e)
# return resultado
# In[8]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# max_data = spark.sql("SELECT MAX(data_emissao) FROM DL_Ginseng.fato_notas_entrada")
# max_data = max_data.collect()[0]['max(data_emissao)'].strftime('%Y%m%d')
# In[9]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# max_data
# In[10]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# lista_data = gerar_datas_inicio_fim(max_data)
# lista_data
# In[11]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Consumindo a API ==========================================<<<<<<<<<<<<<<<<<<<<< ALTERAR <<<<<<<<<<<<<<<<<<<<<<
# token = autenticar(id_integracao)
# chaves = []
# In[12]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# for datas in lista_data:
# response_api = listar_chaves_nfe_recebidas(datas['inicio'], datas['fim'], token['token'])
# if len(response_api) > 0:
# chaves.extend(response_api)
# # Remover duplicatas usando set() e compreensão de lista
# chaves_sem_duplicatas = [dict(t) for t in {tuple(d.items()) for d in chaves}]
# chaves_request = [(item['chaveAcesso']) for item in chaves_sem_duplicatas]
# len(chaves_request)
# In[13]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Query para retornar as chaves que já estão no banco ==========================================<<<<<<<<<<<<<<<<<<<<< ALTERAR <<<<<<<<<<<<<<<<<<<<<<
# chaves_bd = spark.sql(f"SELECT chave FROM DL_Ginseng.fato_notas_entrada WHERE data_emissao BETWEEN '{lista_data[0]['inicio']}' AND '{lista_data[-1]['fim']}' AND UPPER(nome_emissor) LIKE '%CALAMO%'""")
# In[14]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_chaves_request = pd.DataFrame(chaves_request, columns=['chave'])
# In[15]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Convertendo o DataFrame do PySpark para o DataFrame do Pandas
# df_chaves_bd_pandas = chaves_bd.toPandas()
# # Mesclando os DataFrames do Pandas
# merged_df = pd.merge(df_chaves_bd_pandas, df_chaves_request, on='chave', how='outer', indicator=True)
# In[16]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Chaves que estão no banco de dados, mas não retornaram na consulta da API
# merged_df[merged_df['_merge'] == 'left_only']
# In[17]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Chaves que serão inseridas no banco
# merged_df[merged_df['_merge'] == 'right_only']
# In[18]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# chaves_novas = merged_df[merged_df['_merge'] == 'right_only']['chave'].to_list()
# In[19]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# lista_xml = buscar_xml_por_chave(chaves_novas, token['token'])
# In[20]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# len(chaves_novas) == len(lista_xml)
# In[21]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# lista_notas = []
# lista_erros = []
# lista_itens_notas = []
# lista_itens_erros = []
# qtd_sucesso = 0
# for nota in lista_xml:
# # Verifique se a nota não está vazia
# if not nota.strip():
# print("Nota vazia encontrada.")
# lista_erros.append(["Nota vazia", nota])
# continue
# # Parse do XML
# try:
# xml_tree = etree.fromstring(nota.encode('utf-8'))
# except Exception as e:
# print(e)
# lista_erros.append([e, nota])
# continue
# # O resto do código continua igual
# # Parse do XML
# try:
# xml_tree = etree.fromstring(nota.encode('utf-8'))
# except Exception as e:
# print(e)
# lista_erros.append([e, nota])
# continue
# xml = BeautifulSoup(nota, 'xml')
# def buscar_elementos_filhos(elemento, elemento_pai, xpath):
# return [validateAttribute(lambda: item.find(elemento).text, None) for item in xpath.find_all(elemento_pai)]
# def validateAttribute(funcao, valor_se_erro):
# resultado = None
# try:
# resultado = funcao()
# except AttributeError:
# resultado = valor_se_erro
# except Exception as e:
# print(e)
# return resultado
# # definindo os campos do cabeçalho da nota
# lista_notas.append(
# {
# 'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None),
# 'cNF': validateAttribute(lambda:xml.ide.nNF.text, None),
# 'serie': validateAttribute(lambda:xml.ide.serie.text, None),
# 'data_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[0], None),
# 'hora_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[1][:-6], None),
# 'cnpj_emissor': validateAttribute(lambda:xml.emit.CNPJ.text, None),
# 'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None),
# 'cnpj_destinatario': validateAttribute(lambda:xml.dest.CNPJ.text, None),
# 'valor_total_produtos': validateAttribute(lambda:xml.total.ICMSTot.vProd.text, None),
# 'valor_ICMSST': validateAttribute(lambda:xml.total.ICMSTot.vST.text, None),
# 'valor_FCPST': validateAttribute(lambda:xml.total.ICMSTot.vFCPST.text, None),
# 'valor_frete': validateAttribute(lambda:xml.total.ICMSTot.vFrete.text, None),
# 'valor_seguro': validateAttribute(lambda:xml.total.ICMSTot.vSeg.text, None),
# 'valor_outras_despesas': validateAttribute(lambda:xml.total.ICMSTot.vOutro.text, None),
# 'valor_II': validateAttribute(lambda:xml.total.ICMSTot.vII.text, None), ## validar =================
# 'valor_IPI': validateAttribute(lambda:xml.total.ICMSTot.vIPI.text, None),
# 'valor_IPI_Devol': validateAttribute(lambda:xml.total.ICMSTot.vIPIDevol.text, None),
# 'valor_servicos': validateAttribute(lambda: xml.total.ISSQNtot.vServ.text, 0),
# 'valor_desconto': validateAttribute(lambda:xml.total.ICMSTot.vDesc.text, None),
# 'valor_ICMS_desonerado': validateAttribute(lambda:xml.total.ICMSTot.vICMSDeson.text, None),
# 'valor_liquido': validateAttribute(lambda:xml.total.ICMSTot.vNF.text, None),
# 'tipo_pagamento_JSON': '', # implementar JSON com os tipods de pagamento ([{tPag, vPag}, {tPag, vPag}, {tPag, vPag}])
# 'numero_fatura': validateAttribute(lambda:xml.cobr.fat.nFat.text, None),
# 'qtd_parcelas': validateAttribute(lambda: len(xml.cobr.find_all('dup')), None),
# 'duplicatas_json': '', # desenvolver JSON com as duplicatas ([{nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}])
# 'valor_ICMS': validateAttribute(lambda: xml.total.ICMSTot.vICMS.text, None), ## validar =================
# 'situacao': validateAttribute(lambda:xml.protNFe.infProt.xMotivo.text, None)
# }
# )
# # definindo os campos dos itens das notas
# qtd_itens = len(validateAttribute(lambda: xml.infNFe.find_all('det'), 0))
# lista_itens_notas.append(
# {
# 'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None),
# 'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None),
# 'n_item': validateAttribute(lambda: [item.get('nItem') for item in xml.infNFe.find_all('det')], 0),
# 'cod_produto': validateAttribute(lambda: buscar_elementos_filhos('cProd', 'det', xml.infNFe), [None] * qtd_itens),
# 'produto': validateAttribute(lambda: buscar_elementos_filhos('xProd', 'det', xml.infNFe), [None] * qtd_itens),
# 'cEAN': validateAttribute(lambda: buscar_elementos_filhos('cEAN', 'det', xml.infNFe), [None] * qtd_itens),
# 'NCM': validateAttribute(lambda: buscar_elementos_filhos('NCM', 'det', xml.infNFe), [None] * qtd_itens),
# 'CEST': validateAttribute(lambda: buscar_elementos_filhos('CEST', 'det', xml.infNFe), [None] * qtd_itens),
# 'CFOP': validateAttribute(lambda: buscar_elementos_filhos('CFOP', 'det', xml.infNFe), [None] * qtd_itens),
# 'unidade_medida': validateAttribute(lambda: buscar_elementos_filhos('uCom', 'det', xml.infNFe), [None] * qtd_itens),
# 'quantidade': validateAttribute(lambda: buscar_elementos_filhos('qCom', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_unitario': validateAttribute(lambda: buscar_elementos_filhos('vUnCom', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_total_produtos': validateAttribute(lambda: buscar_elementos_filhos('vProd', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_frete': validateAttribute(lambda: buscar_elementos_filhos('vFrete', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_seguro': validateAttribute(lambda: buscar_elementos_filhos('vSeg', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_desconto': validateAttribute(lambda: buscar_elementos_filhos('vDesc', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_outras_despesas': validateAttribute(lambda: buscar_elementos_filhos('vOutro', 'det', xml.infNFe), [None] * qtd_itens),
# 'codigo_pedido': validateAttribute(lambda: buscar_elementos_filhos('xPed', 'det', xml.infNFe), [None] * qtd_itens),
# 'cod_origem': validateAttribute(lambda: buscar_elementos_filhos('orig', 'det', xml.infNFe), [None] * qtd_itens),
# 'CST': validateAttribute(lambda: buscar_elementos_filhos('CST', 'det', xml.infNFe), [None] * qtd_itens),
# 'modalidade_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('modBC', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vBC', 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_ICMS': validateAttribute(lambda: buscar_elementos_filhos('pICMS', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vICMS', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_BC_FCP': validateAttribute(lambda: buscar_elementos_filhos('vBCFCP', 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_FCP': validateAttribute(lambda: buscar_elementos_filhos('pFCP', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_FCP': validateAttribute(lambda: buscar_elementos_filhos('vFCP', 'det', xml.infNFe), [None] * qtd_itens),
# 'modalidade_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('modBCST' , 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_MVA_ST': validateAttribute(lambda: buscar_elementos_filhos('pMVAST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('vBCST', 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_ICMS_ST': validateAttribute(lambda: buscar_elementos_filhos('pICMSST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_ICMSST': validateAttribute(lambda: buscar_elementos_filhos('vICMSST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_BC_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vBCFCPST', 'det', xml.infNFe), [None] * qtd_itens),
# 'aliquota_FCPST': validateAttribute(lambda: buscar_elementos_filhos('pFCPST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vFCPST', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_II': validateAttribute(lambda: buscar_elementos_filhos('vII', 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_IPI': validateAttribute(lambda: buscar_elementos_filhos('vIPI' , 'det', xml.infNFe), [None] * qtd_itens),
# 'valor_ICMS_desonerado': validateAttribute(lambda: buscar_elementos_filhos('vICMSDeson', 'det', xml.infNFe), [None] * qtd_itens)
# }
# )
# qtd_sucesso += 1
# # if qtd_sucesso + len(lista_erros) == 100:
# # break ### ENCERRAR ==== DEBUG
# print(f'qtd com erro: {len(lista_erros)}')
# print(f'qtd sucesso: {qtd_sucesso}')
# In[22]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas = pd.DataFrame.from_dict(lista_notas)
# df_notas_itens = pd.DataFrame.from_dict(lista_itens_notas)
# In[23]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# len(df_notas) == len(df_notas_itens['chave'].unique())
# In[24]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas['cNF'] = df_notas['cNF'].str.zfill(9)
# df_notas['serie'] = df_notas['serie'].str.zfill(3)
# In[25]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas['valor_total_produtos'] = df_notas['valor_total_produtos'].astype(float)
# df_notas['valor_ICMSST'] = df_notas['valor_ICMSST'].astype(float)
# df_notas['valor_FCPST'] = df_notas['valor_FCPST'].astype(float)
# df_notas['valor_frete'] = df_notas['valor_frete'].astype(float)
# df_notas['valor_seguro'] = df_notas['valor_seguro'].astype(float)
# df_notas['valor_outras_despesas'] = df_notas['valor_outras_despesas'].astype(float)
# df_notas['valor_II'] = df_notas['valor_II'].astype(float)
# df_notas['valor_IPI'] = df_notas['valor_IPI'].astype(float)
# df_notas['valor_IPI_Devol'] = df_notas['valor_IPI_Devol'].astype(float)
# df_notas['valor_servicos'] = df_notas['valor_servicos'].astype(float)
# df_notas['valor_desconto'] = df_notas['valor_desconto'].astype(float)
# df_notas['valor_ICMS_desonerado'] = df_notas['valor_ICMS_desonerado'].astype(float)
# df_notas['valor_liquido'] = df_notas['valor_liquido'].astype(float)
# df_notas['valor_ICMS'] = df_notas['valor_ICMS'].astype(float)
# In[26]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo = df_notas[df_notas['nome_emissor'].str.upper().str.startswith('CALAMO')]
# In[27]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo.iloc[0]
# In[28]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Converter o DataFrame pandas para DataFrame Spark
# df_spark = spark.createDataFrame(df_notas_calamo)
# from pyspark.sql.functions import to_date
# from pyspark.sql.functions import col
# # Converter a coluna 'data_emissao' para o tipo DateType
# df_spark = df_spark.withColumn('data_emissao', to_date(df_spark['data_emissao']))
# # Converter a coluna 'qtd_parcelas' para o tipo LongType
# df_spark = df_spark.withColumn('qtd_parcelas', col('qtd_parcelas').cast('long'))
# # Salvar o DataFrame no formato Delta
# df_spark.write.format("delta").mode("overwrite").saveAsTable("DL_Ginseng.fato_notas_entrada")
# In[29]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # df = spark.sql("SELECT DISTINCT data_emissao FROM DL_Ginseng.fato_notas_entrada ORDER BY data_emissao")
# df = spark.sql("SELECT COUNT(1) FROM DL_Ginseng.fato_notas_entrada")
# display(df)
# In[30]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# lista_explode = [
# 'n_item',
# 'cod_produto',
# 'produto',
# 'cEAN',
# 'NCM',
# 'CEST',
# 'CFOP',
# 'unidade_medida',
# 'quantidade',
# 'valor_unitario',
# 'valor_total_produtos',
# 'valor_frete',
# 'valor_seguro',
# 'valor_desconto',
# 'valor_outras_despesas',
# 'codigo_pedido',
# 'cod_origem',
# 'CST',
# 'modalidade_BC_ICMS',
# 'valor_BC_ICMS',
# 'aliquota_ICMS',
# 'valor_ICMS',
# 'valor_BC_FCP',
# 'aliquota_FCP',
# 'valor_FCP',
# 'modalidade_BC_ST',
# 'aliquota_MVA_ST',
# 'valor_BC_ST',
# 'aliquota_ICMS_ST',
# 'valor_ICMSST',
# 'valor_BC_FCPST',
# 'aliquota_FCPST',
# 'valor_FCPST',
# 'valor_II',
# 'valor_IPI',
# 'valor_ICMS_desonerado'
# ]
# In[31]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_itens.info()
# In[32]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens = df_notas_itens[df_notas_itens['nome_emissor'].str.upper().str.startswith('CALAMO')]
# In[33]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens_expandido = df_notas_calamo_itens.explode(lista_explode)
# In[34]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens_expandido.head()
# In[35]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # preenchendo com zero à equerda
# df_notas_calamo_itens_expandido['cod_produto'] = df_notas_calamo_itens_expandido['cod_produto'].str.zfill(5)
# df_notas_calamo_itens_expandido['NCM'] = df_notas_calamo_itens_expandido['NCM'].str.zfill(8)
# df_notas_calamo_itens_expandido['CEST'] = df_notas_calamo_itens_expandido['CEST'].str.zfill(7)
# df_notas_calamo_itens_expandido['CFOP'] = df_notas_calamo_itens_expandido['CFOP'].str.zfill(4)
# df_notas_calamo_itens_expandido['CST'] = df_notas_calamo_itens_expandido['CST'].str.zfill(2)
# In[36]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # inteiros
# df_notas_calamo_itens_expandido['quantidade'] = df_notas_calamo_itens_expandido['quantidade'].astype(float).astype(int)
# # decimais
# df_notas_calamo_itens_expandido['valor_unitario'] = df_notas_calamo_itens_expandido['valor_unitario'].astype(float)
# df_notas_calamo_itens_expandido['valor_total_produtos'] = df_notas_calamo_itens_expandido['valor_total_produtos'].astype(float)
# df_notas_calamo_itens_expandido['valor_frete'] = df_notas_calamo_itens_expandido['valor_frete'].astype(float)
# df_notas_calamo_itens_expandido['valor_seguro'] = df_notas_calamo_itens_expandido['valor_seguro'].astype(float)
# df_notas_calamo_itens_expandido['valor_desconto'] = df_notas_calamo_itens_expandido['valor_desconto'].astype(float)
# df_notas_calamo_itens_expandido['valor_outras_despesas'] = df_notas_calamo_itens_expandido['valor_outras_despesas'].astype(float)
# df_notas_calamo_itens_expandido['valor_BC_ICMS'] = df_notas_calamo_itens_expandido['valor_BC_ICMS'].astype(float)
# df_notas_calamo_itens_expandido['valor_ICMS'] = df_notas_calamo_itens_expandido['valor_ICMS'].astype(float)
# df_notas_calamo_itens_expandido['valor_BC_FCP'] = df_notas_calamo_itens_expandido['valor_BC_FCP'].astype(float)
# df_notas_calamo_itens_expandido['valor_FCP'] = df_notas_calamo_itens_expandido['valor_FCP'].astype(float)
# df_notas_calamo_itens_expandido['valor_BC_ST'] = df_notas_calamo_itens_expandido['valor_BC_ST'].astype(float)
# df_notas_calamo_itens_expandido['valor_ICMSST'] = df_notas_calamo_itens_expandido['valor_ICMSST'].astype(float)
# df_notas_calamo_itens_expandido['valor_BC_FCPST'] = df_notas_calamo_itens_expandido['valor_BC_FCPST'].astype(float)
# df_notas_calamo_itens_expandido['valor_FCPST'] = df_notas_calamo_itens_expandido['valor_FCPST'].astype(float)
# df_notas_calamo_itens_expandido['valor_II'] = df_notas_calamo_itens_expandido['valor_II'].astype(float)
# df_notas_calamo_itens_expandido['valor_IPI'] = df_notas_calamo_itens_expandido['valor_IPI'].astype(float)
# df_notas_calamo_itens_expandido['valor_ICMS_desonerado'] = df_notas_calamo_itens_expandido['valor_ICMS_desonerado'].astype(float)
# # percentual
# df_notas_calamo_itens_expandido['aliquota_ICMS'] = df_notas_calamo_itens_expandido['aliquota_ICMS'].astype(float) / 100
# df_notas_calamo_itens_expandido['aliquota_FCP'] = df_notas_calamo_itens_expandido['aliquota_FCP'].astype(float) / 100
# df_notas_calamo_itens_expandido['aliquota_MVA_ST'] = df_notas_calamo_itens_expandido['aliquota_MVA_ST'].astype(float) / 100
# df_notas_calamo_itens_expandido['aliquota_ICMS_ST'] = df_notas_calamo_itens_expandido['aliquota_ICMS_ST'].astype(float) / 100
# df_notas_calamo_itens_expandido['aliquota_FCPST'] = df_notas_calamo_itens_expandido['aliquota_FCPST'].astype(float) / 100
# In[37]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens_expandido.info()
# In[38]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# if not df_notas_calamo_itens_expandido.empty:
# print(df_notas_calamo_itens_expandido.iloc[0])
# else:
# print("O DataFrame está vazio.")
# In[39]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # for i in range(100):
# df_notas_calamo_itens_expandido.iloc[0]
# In[40]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# len(df_notas_calamo_itens_expandido['chave'].unique())
# In[41]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# df_notas_calamo_itens_expandido.drop(['nome_emissor'], axis=1, inplace=True)
# In[42]:
# The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only.
# %%pyspark
# # Conveter do DataFrame pandas para DataFrame Spark
# df_spark_itens = spark.createDataFrame(df_notas_calamo_itens_expandido)
# # Tipagem de dados
# df_spark_itens = df_spark_itens.withColumn('n_item', col('n_item').cast('long'))
# df_spark_itens = df_spark_itens.withColumn('quantidade', col('quantidade').cast('long'))
# # Salvar o DataFrame no formato Delta
# df_spark_itens.write.format("delta").mode("overwrite").saveAsTable("DL_Ginseng.fato_notas_entrada_itens")