#!/usr/bin/env python # coding: utf-8 # ## Notebook - ETL notas API Conexao # # New notebook # In[1]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # import requests # import json # import pandas as pd # from sqlalchemy import create_engine # from datetime import datetime, timedelta # import math # from bs4 import BeautifulSoup # from lxml import etree # import xmltodict # In[2]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # def gerar_datas_inicio_fim(data_ultimo_registro): # '''Gera uma lista de dicionários com as datas de início e fim que devem ser utilizadas como parâmetros # na API de buscar as chaves das notas fiscais # :param str data_ultimo_registro: Data da última emissão de nota fiscal presente no banco de dados, no formato 'YYYYMMDD'. # :return: Lista de dicionários contendo as datas de início e fim para cada intervalo de busca na API. # :rtype: list[dict] # ''' # max_data_bd = datetime.strptime(data_ultimo_registro, '%Y%m%d').date() # data_hoje = datetime.today().date() # data_amanha = datetime.today().date() + timedelta(days=1) # # Define a data de início para a busca, 10 dias antes da data do último registro # data_inicio = max_data_bd - timedelta(days=10) # qtd_dias_pendentes = (data_hoje - data_inicio).days # # A API limita a busca de chaves no intervalo de 10 dias, portanto isso faremos x iteracoes para abranger o período desejado # qtd_iteracoes = math.ceil(qtd_dias_pendentes / 10) # contador = 1 # lista_datas = [] # for i in range(qtd_iteracoes): # # Atuaiza a data de início para cada iteração # data_inicio += timedelta(days = contador) # # Variável que controla a data limite de busca # contador = 10 if i < qtd_iteracoes - 1 else (data_amanha - data_inicio).days # data_fim = data_inicio + timedelta(days = contador) # # extende a lista com um dict que contém a data inicial e data final de cada iteração # lista_datas.extend([{'inicio': data_inicio.strftime('%Y-%m-%d'), 'fim': data_fim.strftime('%Y-%m-%d')}]) # return lista_datas # In[3]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # sessao = requests.Session() # id_integracao = 'c7cd9625-4df7-4674-bfd9-109bac618b9c' # In[4]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # def autenticar(id_integracao): # '''Realiza a autenticação com o servidor. # Caso alguma requisição retorne 402, a função autenticar deve ser invocada novamente, gerando um novo token # id_integracao - uuid disponibilizado na seção "administrar" no site da conexão # Retorna o Bearer token que deve ser usado nas requisições''' # host = 'https://api.conexaonfe.com.br/v1/' # endpoint = 'autenticacao' # headers = {'id-integracao': id_integracao} # response = requests.get(host + endpoint, headers=headers) # print(response.status_code) # response_text = response.text # token = json.loads(response_text) # return token # In[5]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # def listar_chaves_nfe_recebidas(data_inicio, data_fim, token) -> list: # '''Lista de chaves das notas fiscais eletronicas recebidas no período informado. (max 10 dias) # data_inicio - data inicial (formato yyyy-mm-dd) # data_fim - data final (formato yyyy-mm-dd) # token - bearer token de autenticação # Retorna um json com a lista das chaves''' # host = 'https://api.conexaonfe.com.br/v1/' # endpoint = 'nfe/recebidas/por-data-emissao/' # url = host + endpoint + data_inicio + '/' + data_fim # headers = {'Authorization': f'Bearer {token}'} # response = requests.get(url, headers=headers) # if response.status_code == 404: # return [] # else: # print(response.status_code) # response_text = response.text # lista_chaves = json.loads(response_text) # return lista_chaves # In[6]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # def buscar_xml_por_chave(lista_chaves: list, token: str) -> list: # '''Buscar xml das notas através das chaves. # Só estarão disponíveis os xmls capturados pela Conexão. # lista_chaves - lista de chaves dos xmls # bearer token de autenticação # Retorna uma lista de dicionários com os xmls''' # host = 'https://api.conexaonfe.com.br/v1/' # endpoint = 'dfes/' # lista_dicts_xml = [] # for chave in lista_chaves: # url = host + endpoint + chave # headers = {'Authorization': 'Bearer ' + token} # print(url) # response = requests.get(url, headers=headers) # print(response.status_code) # response_text = response.text # lista_dicts_xml.append(response_text) # # lista_dicts_xml.append(json.loads(response_text)) # return lista_dicts_xml # In[7]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # def validateAttribute(funcao, valor_se_erro): # resultado = None # try: # resultado = funcao() # except AttributeError: # resultado = valor_se_erro # except Exception as e: # print(e) # return resultado # In[8]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # max_data = spark.sql("SELECT MAX(data_emissao) FROM DL_Ginseng.fato_notas_entrada") # max_data = max_data.collect()[0]['max(data_emissao)'].strftime('%Y%m%d') # In[9]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # max_data # In[10]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # lista_data = gerar_datas_inicio_fim(max_data) # lista_data # In[11]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # Consumindo a API ==========================================<<<<<<<<<<<<<<<<<<<<< ALTERAR <<<<<<<<<<<<<<<<<<<<<< # token = autenticar(id_integracao) # chaves = [] # In[12]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # for datas in lista_data: # response_api = listar_chaves_nfe_recebidas(datas['inicio'], datas['fim'], token['token']) # if len(response_api) > 0: # chaves.extend(response_api) # # Remover duplicatas usando set() e compreensão de lista # chaves_sem_duplicatas = [dict(t) for t in {tuple(d.items()) for d in chaves}] # chaves_request = [(item['chaveAcesso']) for item in chaves_sem_duplicatas] # len(chaves_request) # In[13]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # Query para retornar as chaves que já estão no banco ==========================================<<<<<<<<<<<<<<<<<<<<< ALTERAR <<<<<<<<<<<<<<<<<<<<<< # chaves_bd = spark.sql(f"SELECT chave FROM DL_Ginseng.fato_notas_entrada WHERE data_emissao BETWEEN '{lista_data[0]['inicio']}' AND '{lista_data[-1]['fim']}' AND UPPER(nome_emissor) LIKE '%CALAMO%'""") # In[14]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_chaves_request = pd.DataFrame(chaves_request, columns=['chave']) # In[15]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # Convertendo o DataFrame do PySpark para o DataFrame do Pandas # df_chaves_bd_pandas = chaves_bd.toPandas() # # Mesclando os DataFrames do Pandas # merged_df = pd.merge(df_chaves_bd_pandas, df_chaves_request, on='chave', how='outer', indicator=True) # In[16]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # Chaves que estão no banco de dados, mas não retornaram na consulta da API # merged_df[merged_df['_merge'] == 'left_only'] # In[17]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # Chaves que serão inseridas no banco # merged_df[merged_df['_merge'] == 'right_only'] # In[18]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # chaves_novas = merged_df[merged_df['_merge'] == 'right_only']['chave'].to_list() # In[19]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # lista_xml = buscar_xml_por_chave(chaves_novas, token['token']) # In[20]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # len(chaves_novas) == len(lista_xml) # In[21]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # lista_notas = [] # lista_erros = [] # lista_itens_notas = [] # lista_itens_erros = [] # qtd_sucesso = 0 # for nota in lista_xml: # # Verifique se a nota não está vazia # if not nota.strip(): # print("Nota vazia encontrada.") # lista_erros.append(["Nota vazia", nota]) # continue # # Parse do XML # try: # xml_tree = etree.fromstring(nota.encode('utf-8')) # except Exception as e: # print(e) # lista_erros.append([e, nota]) # continue # # O resto do código continua igual # # Parse do XML # try: # xml_tree = etree.fromstring(nota.encode('utf-8')) # except Exception as e: # print(e) # lista_erros.append([e, nota]) # continue # xml = BeautifulSoup(nota, 'xml') # def buscar_elementos_filhos(elemento, elemento_pai, xpath): # return [validateAttribute(lambda: item.find(elemento).text, None) for item in xpath.find_all(elemento_pai)] # def validateAttribute(funcao, valor_se_erro): # resultado = None # try: # resultado = funcao() # except AttributeError: # resultado = valor_se_erro # except Exception as e: # print(e) # return resultado # # definindo os campos do cabeçalho da nota # lista_notas.append( # { # 'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None), # 'cNF': validateAttribute(lambda:xml.ide.nNF.text, None), # 'serie': validateAttribute(lambda:xml.ide.serie.text, None), # 'data_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[0], None), # 'hora_emissao': validateAttribute(lambda:xml.ide.dhEmi.text.split('T')[1][:-6], None), # 'cnpj_emissor': validateAttribute(lambda:xml.emit.CNPJ.text, None), # 'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None), # 'cnpj_destinatario': validateAttribute(lambda:xml.dest.CNPJ.text, None), # 'valor_total_produtos': validateAttribute(lambda:xml.total.ICMSTot.vProd.text, None), # 'valor_ICMSST': validateAttribute(lambda:xml.total.ICMSTot.vST.text, None), # 'valor_FCPST': validateAttribute(lambda:xml.total.ICMSTot.vFCPST.text, None), # 'valor_frete': validateAttribute(lambda:xml.total.ICMSTot.vFrete.text, None), # 'valor_seguro': validateAttribute(lambda:xml.total.ICMSTot.vSeg.text, None), # 'valor_outras_despesas': validateAttribute(lambda:xml.total.ICMSTot.vOutro.text, None), # 'valor_II': validateAttribute(lambda:xml.total.ICMSTot.vII.text, None), ## validar ================= # 'valor_IPI': validateAttribute(lambda:xml.total.ICMSTot.vIPI.text, None), # 'valor_IPI_Devol': validateAttribute(lambda:xml.total.ICMSTot.vIPIDevol.text, None), # 'valor_servicos': validateAttribute(lambda: xml.total.ISSQNtot.vServ.text, 0), # 'valor_desconto': validateAttribute(lambda:xml.total.ICMSTot.vDesc.text, None), # 'valor_ICMS_desonerado': validateAttribute(lambda:xml.total.ICMSTot.vICMSDeson.text, None), # 'valor_liquido': validateAttribute(lambda:xml.total.ICMSTot.vNF.text, None), # 'tipo_pagamento_JSON': '', # implementar JSON com os tipods de pagamento ([{tPag, vPag}, {tPag, vPag}, {tPag, vPag}]) # 'numero_fatura': validateAttribute(lambda:xml.cobr.fat.nFat.text, None), # 'qtd_parcelas': validateAttribute(lambda: len(xml.cobr.find_all('dup')), None), # 'duplicatas_json': '', # desenvolver JSON com as duplicatas ([{nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}, {nDup, dVenc, vDup}]) # 'valor_ICMS': validateAttribute(lambda: xml.total.ICMSTot.vICMS.text, None), ## validar ================= # 'situacao': validateAttribute(lambda:xml.protNFe.infProt.xMotivo.text, None) # } # ) # # definindo os campos dos itens das notas # qtd_itens = len(validateAttribute(lambda: xml.infNFe.find_all('det'), 0)) # lista_itens_notas.append( # { # 'chave': validateAttribute(lambda: xml.protNFe.infProt.chNFe.text, None), # 'nome_emissor': validateAttribute(lambda:xml.emit.xNome.text, None), # 'n_item': validateAttribute(lambda: [item.get('nItem') for item in xml.infNFe.find_all('det')], 0), # 'cod_produto': validateAttribute(lambda: buscar_elementos_filhos('cProd', 'det', xml.infNFe), [None] * qtd_itens), # 'produto': validateAttribute(lambda: buscar_elementos_filhos('xProd', 'det', xml.infNFe), [None] * qtd_itens), # 'cEAN': validateAttribute(lambda: buscar_elementos_filhos('cEAN', 'det', xml.infNFe), [None] * qtd_itens), # 'NCM': validateAttribute(lambda: buscar_elementos_filhos('NCM', 'det', xml.infNFe), [None] * qtd_itens), # 'CEST': validateAttribute(lambda: buscar_elementos_filhos('CEST', 'det', xml.infNFe), [None] * qtd_itens), # 'CFOP': validateAttribute(lambda: buscar_elementos_filhos('CFOP', 'det', xml.infNFe), [None] * qtd_itens), # 'unidade_medida': validateAttribute(lambda: buscar_elementos_filhos('uCom', 'det', xml.infNFe), [None] * qtd_itens), # 'quantidade': validateAttribute(lambda: buscar_elementos_filhos('qCom', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_unitario': validateAttribute(lambda: buscar_elementos_filhos('vUnCom', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_total_produtos': validateAttribute(lambda: buscar_elementos_filhos('vProd', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_frete': validateAttribute(lambda: buscar_elementos_filhos('vFrete', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_seguro': validateAttribute(lambda: buscar_elementos_filhos('vSeg', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_desconto': validateAttribute(lambda: buscar_elementos_filhos('vDesc', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_outras_despesas': validateAttribute(lambda: buscar_elementos_filhos('vOutro', 'det', xml.infNFe), [None] * qtd_itens), # 'codigo_pedido': validateAttribute(lambda: buscar_elementos_filhos('xPed', 'det', xml.infNFe), [None] * qtd_itens), # 'cod_origem': validateAttribute(lambda: buscar_elementos_filhos('orig', 'det', xml.infNFe), [None] * qtd_itens), # 'CST': validateAttribute(lambda: buscar_elementos_filhos('CST', 'det', xml.infNFe), [None] * qtd_itens), # 'modalidade_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('modBC', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_BC_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vBC', 'det', xml.infNFe), [None] * qtd_itens), # 'aliquota_ICMS': validateAttribute(lambda: buscar_elementos_filhos('pICMS', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_ICMS': validateAttribute(lambda: buscar_elementos_filhos('vICMS', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_BC_FCP': validateAttribute(lambda: buscar_elementos_filhos('vBCFCP', 'det', xml.infNFe), [None] * qtd_itens), # 'aliquota_FCP': validateAttribute(lambda: buscar_elementos_filhos('pFCP', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_FCP': validateAttribute(lambda: buscar_elementos_filhos('vFCP', 'det', xml.infNFe), [None] * qtd_itens), # 'modalidade_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('modBCST' , 'det', xml.infNFe), [None] * qtd_itens), # 'aliquota_MVA_ST': validateAttribute(lambda: buscar_elementos_filhos('pMVAST', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_BC_ST': validateAttribute(lambda: buscar_elementos_filhos('vBCST', 'det', xml.infNFe), [None] * qtd_itens), # 'aliquota_ICMS_ST': validateAttribute(lambda: buscar_elementos_filhos('pICMSST', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_ICMSST': validateAttribute(lambda: buscar_elementos_filhos('vICMSST', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_BC_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vBCFCPST', 'det', xml.infNFe), [None] * qtd_itens), # 'aliquota_FCPST': validateAttribute(lambda: buscar_elementos_filhos('pFCPST', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_FCPST': validateAttribute(lambda: buscar_elementos_filhos('vFCPST', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_II': validateAttribute(lambda: buscar_elementos_filhos('vII', 'det', xml.infNFe), [None] * qtd_itens), # 'valor_IPI': validateAttribute(lambda: buscar_elementos_filhos('vIPI' , 'det', xml.infNFe), [None] * qtd_itens), # 'valor_ICMS_desonerado': validateAttribute(lambda: buscar_elementos_filhos('vICMSDeson', 'det', xml.infNFe), [None] * qtd_itens) # } # ) # qtd_sucesso += 1 # # if qtd_sucesso + len(lista_erros) == 100: # # break ### ENCERRAR ==== DEBUG # print(f'qtd com erro: {len(lista_erros)}') # print(f'qtd sucesso: {qtd_sucesso}') # In[22]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas = pd.DataFrame.from_dict(lista_notas) # df_notas_itens = pd.DataFrame.from_dict(lista_itens_notas) # In[23]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # len(df_notas) == len(df_notas_itens['chave'].unique()) # In[24]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas['cNF'] = df_notas['cNF'].str.zfill(9) # df_notas['serie'] = df_notas['serie'].str.zfill(3) # In[25]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas['valor_total_produtos'] = df_notas['valor_total_produtos'].astype(float) # df_notas['valor_ICMSST'] = df_notas['valor_ICMSST'].astype(float) # df_notas['valor_FCPST'] = df_notas['valor_FCPST'].astype(float) # df_notas['valor_frete'] = df_notas['valor_frete'].astype(float) # df_notas['valor_seguro'] = df_notas['valor_seguro'].astype(float) # df_notas['valor_outras_despesas'] = df_notas['valor_outras_despesas'].astype(float) # df_notas['valor_II'] = df_notas['valor_II'].astype(float) # df_notas['valor_IPI'] = df_notas['valor_IPI'].astype(float) # df_notas['valor_IPI_Devol'] = df_notas['valor_IPI_Devol'].astype(float) # df_notas['valor_servicos'] = df_notas['valor_servicos'].astype(float) # df_notas['valor_desconto'] = df_notas['valor_desconto'].astype(float) # df_notas['valor_ICMS_desonerado'] = df_notas['valor_ICMS_desonerado'].astype(float) # df_notas['valor_liquido'] = df_notas['valor_liquido'].astype(float) # df_notas['valor_ICMS'] = df_notas['valor_ICMS'].astype(float) # In[26]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas_calamo = df_notas[df_notas['nome_emissor'].str.upper().str.startswith('CALAMO')] # In[27]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas_calamo.iloc[0] # In[28]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # Converter o DataFrame pandas para DataFrame Spark # df_spark = spark.createDataFrame(df_notas_calamo) # from pyspark.sql.functions import to_date # from pyspark.sql.functions import col # # Converter a coluna 'data_emissao' para o tipo DateType # df_spark = df_spark.withColumn('data_emissao', to_date(df_spark['data_emissao'])) # # Converter a coluna 'qtd_parcelas' para o tipo LongType # df_spark = df_spark.withColumn('qtd_parcelas', col('qtd_parcelas').cast('long')) # # Salvar o DataFrame no formato Delta # df_spark.write.format("delta").mode("overwrite").saveAsTable("DL_Ginseng.fato_notas_entrada") # In[29]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # df = spark.sql("SELECT DISTINCT data_emissao FROM DL_Ginseng.fato_notas_entrada ORDER BY data_emissao") # df = spark.sql("SELECT COUNT(1) FROM DL_Ginseng.fato_notas_entrada") # display(df) # In[30]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # lista_explode = [ # 'n_item', # 'cod_produto', # 'produto', # 'cEAN', # 'NCM', # 'CEST', # 'CFOP', # 'unidade_medida', # 'quantidade', # 'valor_unitario', # 'valor_total_produtos', # 'valor_frete', # 'valor_seguro', # 'valor_desconto', # 'valor_outras_despesas', # 'codigo_pedido', # 'cod_origem', # 'CST', # 'modalidade_BC_ICMS', # 'valor_BC_ICMS', # 'aliquota_ICMS', # 'valor_ICMS', # 'valor_BC_FCP', # 'aliquota_FCP', # 'valor_FCP', # 'modalidade_BC_ST', # 'aliquota_MVA_ST', # 'valor_BC_ST', # 'aliquota_ICMS_ST', # 'valor_ICMSST', # 'valor_BC_FCPST', # 'aliquota_FCPST', # 'valor_FCPST', # 'valor_II', # 'valor_IPI', # 'valor_ICMS_desonerado' # ] # In[31]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas_itens.info() # In[32]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas_calamo_itens = df_notas_itens[df_notas_itens['nome_emissor'].str.upper().str.startswith('CALAMO')] # In[33]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas_calamo_itens_expandido = df_notas_calamo_itens.explode(lista_explode) # In[34]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas_calamo_itens_expandido.head() # In[35]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # preenchendo com zero à equerda # df_notas_calamo_itens_expandido['cod_produto'] = df_notas_calamo_itens_expandido['cod_produto'].str.zfill(5) # df_notas_calamo_itens_expandido['NCM'] = df_notas_calamo_itens_expandido['NCM'].str.zfill(8) # df_notas_calamo_itens_expandido['CEST'] = df_notas_calamo_itens_expandido['CEST'].str.zfill(7) # df_notas_calamo_itens_expandido['CFOP'] = df_notas_calamo_itens_expandido['CFOP'].str.zfill(4) # df_notas_calamo_itens_expandido['CST'] = df_notas_calamo_itens_expandido['CST'].str.zfill(2) # In[36]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # inteiros # df_notas_calamo_itens_expandido['quantidade'] = df_notas_calamo_itens_expandido['quantidade'].astype(float).astype(int) # # decimais # df_notas_calamo_itens_expandido['valor_unitario'] = df_notas_calamo_itens_expandido['valor_unitario'].astype(float) # df_notas_calamo_itens_expandido['valor_total_produtos'] = df_notas_calamo_itens_expandido['valor_total_produtos'].astype(float) # df_notas_calamo_itens_expandido['valor_frete'] = df_notas_calamo_itens_expandido['valor_frete'].astype(float) # df_notas_calamo_itens_expandido['valor_seguro'] = df_notas_calamo_itens_expandido['valor_seguro'].astype(float) # df_notas_calamo_itens_expandido['valor_desconto'] = df_notas_calamo_itens_expandido['valor_desconto'].astype(float) # df_notas_calamo_itens_expandido['valor_outras_despesas'] = df_notas_calamo_itens_expandido['valor_outras_despesas'].astype(float) # df_notas_calamo_itens_expandido['valor_BC_ICMS'] = df_notas_calamo_itens_expandido['valor_BC_ICMS'].astype(float) # df_notas_calamo_itens_expandido['valor_ICMS'] = df_notas_calamo_itens_expandido['valor_ICMS'].astype(float) # df_notas_calamo_itens_expandido['valor_BC_FCP'] = df_notas_calamo_itens_expandido['valor_BC_FCP'].astype(float) # df_notas_calamo_itens_expandido['valor_FCP'] = df_notas_calamo_itens_expandido['valor_FCP'].astype(float) # df_notas_calamo_itens_expandido['valor_BC_ST'] = df_notas_calamo_itens_expandido['valor_BC_ST'].astype(float) # df_notas_calamo_itens_expandido['valor_ICMSST'] = df_notas_calamo_itens_expandido['valor_ICMSST'].astype(float) # df_notas_calamo_itens_expandido['valor_BC_FCPST'] = df_notas_calamo_itens_expandido['valor_BC_FCPST'].astype(float) # df_notas_calamo_itens_expandido['valor_FCPST'] = df_notas_calamo_itens_expandido['valor_FCPST'].astype(float) # df_notas_calamo_itens_expandido['valor_II'] = df_notas_calamo_itens_expandido['valor_II'].astype(float) # df_notas_calamo_itens_expandido['valor_IPI'] = df_notas_calamo_itens_expandido['valor_IPI'].astype(float) # df_notas_calamo_itens_expandido['valor_ICMS_desonerado'] = df_notas_calamo_itens_expandido['valor_ICMS_desonerado'].astype(float) # # percentual # df_notas_calamo_itens_expandido['aliquota_ICMS'] = df_notas_calamo_itens_expandido['aliquota_ICMS'].astype(float) / 100 # df_notas_calamo_itens_expandido['aliquota_FCP'] = df_notas_calamo_itens_expandido['aliquota_FCP'].astype(float) / 100 # df_notas_calamo_itens_expandido['aliquota_MVA_ST'] = df_notas_calamo_itens_expandido['aliquota_MVA_ST'].astype(float) / 100 # df_notas_calamo_itens_expandido['aliquota_ICMS_ST'] = df_notas_calamo_itens_expandido['aliquota_ICMS_ST'].astype(float) / 100 # df_notas_calamo_itens_expandido['aliquota_FCPST'] = df_notas_calamo_itens_expandido['aliquota_FCPST'].astype(float) / 100 # In[37]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas_calamo_itens_expandido.info() # In[38]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # if not df_notas_calamo_itens_expandido.empty: # print(df_notas_calamo_itens_expandido.iloc[0]) # else: # print("O DataFrame está vazio.") # In[39]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # for i in range(100): # df_notas_calamo_itens_expandido.iloc[0] # In[40]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # len(df_notas_calamo_itens_expandido['chave'].unique()) # In[41]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # df_notas_calamo_itens_expandido.drop(['nome_emissor'], axis=1, inplace=True) # In[42]: # The command is not a standard IPython magic command. It is designed for use within Fabric notebooks only. # %%pyspark # # Conveter do DataFrame pandas para DataFrame Spark # df_spark_itens = spark.createDataFrame(df_notas_calamo_itens_expandido) # # Tipagem de dados # df_spark_itens = df_spark_itens.withColumn('n_item', col('n_item').cast('long')) # df_spark_itens = df_spark_itens.withColumn('quantidade', col('quantidade').cast('long')) # # Salvar o DataFrame no formato Delta # df_spark_itens.write.format("delta").mode("overwrite").saveAsTable("DL_Ginseng.fato_notas_entrada_itens")