# fluig/Lançamento de documentos/lançamentos/portalfornecedor_proxy_example.py
# 2026-04-15 14:40:55 -03:00
#
# 222 lines
# 6.7 KiB
# Python

from __future__ import annotations

import asyncio
import os
from typing import Any

import requests
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from requests_oauthlib import OAuth1
class PortalFornecedorSuccessResponse(BaseModel):
    """Response body returned by the ``enviar`` endpoint on success."""

    # Success flag; defaults to True for this model.
    success: bool = True
    # Human-readable outcome message.
    message: str
    # Identifier of the started process instance; empty string when none
    # could be extracted from the upstream response.
    processInstanceId: str = ""
    # Decoded upstream response body, passed through to the caller.
    content: dict[str, Any]
class PortalFornecedorErrorDetail(BaseModel):
    """Error payload model advertised for the endpoint's 400/401/500 responses."""

    # Optional human-readable error description.
    # NOTE(review): errors are raised via HTTPException(detail=...), which
    # FastAPI serializes as {"detail": ...}; confirm this model matches the
    # actual response shape seen by clients.
    message: str | None = None
def env(name: str, default: str = "") -> str:
    """Return the stripped value of an environment variable.

    Args:
        name: Name of the environment variable to read.
        default: Fallback used when the variable is unset *or* set to a
            blank/whitespace-only value.

    Returns:
        The non-empty, whitespace-stripped value.

    Raises:
        RuntimeError: If neither the variable nor the default yields a
            non-empty value.
    """
    # A variable that is defined but blank (common with .env templates) must
    # not shadow the default; otherwise optional settings would be reported
    # as "missing" even though a fallback was provided.
    value = os.getenv(name, "").strip() or default.strip()
    if not value:
        raise RuntimeError(f"Missing environment variable: {name}")
    return value
app = FastAPI(title="Portal Fornecedor Proxy")

# Allowed CORS origins come from a comma-separated environment variable
# (default "*"). Entries are stripped and blanks dropped so that values such
# as "https://a.example, https://b.example" match the browser's Origin header
# exactly; an unstripped " https://b.example" would never match.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("PORTAL_FORNECEDOR_CORS_ORIGINS", "*").split(",")
    if origin.strip()
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=False,
    allow_methods=["POST", "OPTIONS", "GET"],
    allow_headers=["*"],
)
@app.get("/health")
def health() -> dict[str, str]:
    """Health-check endpoint: always answers with a fixed ``ok`` status."""
    status_payload = {"status": "ok"}
    return status_payload
@app.post(
    "/api/public/portalfornecedor/enviar",
    response_model=PortalFornecedorSuccessResponse,
    responses={
        400: {"model": PortalFornecedorErrorDetail},
        401: {"model": PortalFornecedorErrorDetail},
        500: {"model": PortalFornecedorErrorDetail},
    },
)
async def enviar(
    arquivo: UploadFile = File(...),
    targetState: int = Form(5),
    comment: str = Form("Envio via portal fornecedor"),
    data_abertura: str = Form(...),
    emitido_por: str = Form(...),
    entidade_responsavel: str = Form(...),
    tipo_cadastro: str = Form(...),
    emailSolicitante: str = Form(""),
    cpf: str = Form(...),
    tipo_documento: str = Form(""),
    numero_documento: str = Form(...),
    valor: str = Form(""),
    autorizador_responsavel: str = Form(""),
    autorizadorResponsavel: str = Form(""),
    justificativa: str = Form(...),
) -> PortalFornecedorSuccessResponse:
    """Receive a multipart submission, store its attachment and start the process.

    Steps, in order: validate the upload and the authorizer field, upload the
    file bytes upstream, register the document, then POST the form fields to
    the process-start endpoint.

    The authorizer may be sent as either ``autorizador_responsavel`` or
    ``autorizadorResponsavel``; the snake_case field wins when both are set.

    Returns:
        A ``PortalFornecedorSuccessResponse`` carrying the upstream response
        body and the process instance id extracted from it (empty string if
        none is found).

    Raises:
        HTTPException: 400 when the file is empty or the authorizer is
            missing; the upstream status code (with the upstream body as
            detail) when any upstream call answers with an error status.
    """
    file_name = arquivo.filename or "anexo"
    file_mime = arquivo.content_type or "application/octet-stream"
    file_bytes = await arquivo.read()
    if not file_bytes:
        raise HTTPException(status_code=400, detail={"message": "Arquivo obrigatorio."})

    autorizador = (autorizador_responsavel or autorizadorResponsavel or "").strip()
    if not autorizador:
        raise HTTPException(
            status_code=400,
            detail={"message": "Campo obrigatorio nao informado: autorizador_responsavel."},
        )

    auth = build_auth()

    # ``requests`` is synchronous. Calling it directly inside this coroutine
    # would block the event loop (up to the 30 s timeout per call) and stall
    # every other in-flight request, so each upstream call runs in a worker
    # thread. Exceptions raised there (including HTTPException) propagate
    # through the await unchanged.
    await asyncio.to_thread(upload_binary, file_name, file_bytes, auth)
    document_id = await asyncio.to_thread(create_document, file_name, file_mime, auth)

    process_payload = {
        "targetState": targetState,
        "comment": comment,
        "formFields": {
            "data_abertura": data_abertura,
            "emitido_por": emitido_por,
            "entidade_responsavel": entidade_responsavel,
            "tipo_cadastro": tipo_cadastro,
            "emailSolicitante": emailSolicitante,
            "cpf": cpf,
            "tipo_documento": tipo_documento,
            "numero_documento": numero_documento,
            "valor": valor,
            "autorizador_responsavel": autorizador,
            "justificativa": justificativa,
            "anexo_documento_id": str(document_id),
            "anexo_documento_nome": file_name,
            "anexo_documento_mime": file_mime,
        },
    }

    response = await asyncio.to_thread(
        requests.post,
        process_start_endpoint(),
        json=process_payload,
        auth=auth,
        headers={"Accept": "application/json"},
        timeout=30,
    )
    if not response.ok:
        # Relay the upstream status and body so the caller sees the real cause.
        raise HTTPException(status_code=response.status_code, detail=safe_json(response))

    data = safe_json(response)
    return PortalFornecedorSuccessResponse(
        success=True,
        message="Solicitacao enviada com sucesso.",
        processInstanceId=extract_process_instance_id(data),
        content=data,
    )
def build_auth() -> OAuth1:
    """Build the OAuth1 (HMAC-SHA1) signer from the four credential env vars.

    Raises:
        RuntimeError: Propagated from ``env`` when a credential is missing.
    """
    # Dict literals evaluate in order, so the variables are read (and a
    # missing one reported) in the same sequence as the keyword arguments.
    credentials = {
        "client_key": env("PORTAL_FORNECEDOR_CLIENT_KEY"),
        "client_secret": env("PORTAL_FORNECEDOR_CLIENT_SECRET"),
        "resource_owner_key": env("PORTAL_FORNECEDOR_RESOURCE_OWNER_KEY"),
        "resource_owner_secret": env("PORTAL_FORNECEDOR_RESOURCE_OWNER_SECRET"),
    }
    return OAuth1(signature_method="HMAC-SHA1", **credentials)
def base_url() -> str:
    """Return the configured upstream base URL without trailing slashes."""
    configured = env("PORTAL_FORNECEDOR_BASE_URL")
    return configured.rstrip("/")
def process_start_endpoint() -> str:
    """Compose the URL used to start the configured process.

    The process id is read from ``PORTAL_FORNECEDOR_PROCESS_ID`` and defaults
    to ``FlowEssentials_LancamentodeDocumento``.
    """
    workflow_id = env("PORTAL_FORNECEDOR_PROCESS_ID", "FlowEssentials_LancamentodeDocumento")
    root = base_url()
    return f"{root}/process-management/api/v2/processes/{workflow_id}/start"
def upload_binary(file_name: str, file_bytes: bytes, auth: OAuth1) -> None:
    """POST the raw file bytes to the upstream content-file upload endpoint.

    Args:
        file_name: Name sent as the ``fileName`` query parameter.
        file_bytes: Raw file content sent as the request body.
        auth: OAuth1 signer applied to the request.

    Raises:
        HTTPException: With the upstream status code and body when the
            upstream answers with an error status.
    """
    upload_url = f"{base_url()}/api/public/2.0/contentfiles/upload/"
    request_headers = {
        "Content-Type": "application/octet-stream",
        "Accept": "application/json",
    }
    result = requests.post(
        upload_url,
        params={"fileName": file_name},
        data=file_bytes,
        auth=auth,
        headers=request_headers,
        timeout=30,
    )
    if result.ok:
        return
    raise HTTPException(status_code=result.status_code, detail=safe_json(result))
def create_document(file_name: str, mime_type: str, auth: OAuth1) -> str:
    """Register the previously uploaded file as a document and return its id.

    Args:
        file_name: Used as both the document description and the attachment
            file name.
        mime_type: Sent in the ``additionalComments`` field of the payload.
        auth: OAuth1 signer applied to the request.

    Returns:
        The document id reported by the upstream, as a string.

    Raises:
        HTTPException: With the upstream status code when the upstream
            answers with an error status, or 500 when the response carries no
            usable ``id``/``documentId``.
    """
    payload = {
        "companyId": env("PORTAL_FORNECEDOR_COMPANY_ID", "1"),
        "description": file_name,
        "parentId": int(env("PORTAL_FORNECEDOR_PARENT_FOLDER_ID", "10")),
        "immutable": True,
        "isPrivate": False,
        "downloadEnabled": True,
        "attachments": [{"fileName": file_name}],
        "additionalComments": mime_type,
    }
    response = requests.post(
        f"{base_url()}/api/public/ecm/document/createDocument",
        json=payload,
        auth=auth,
        headers={"Accept": "application/json"},
        timeout=30,
    )
    if not response.ok:
        raise HTTPException(status_code=response.status_code, detail=safe_json(response))

    data = safe_json(response)
    # "content" may be absent, null or a non-object value; treat all of those
    # as "no id returned" instead of failing with AttributeError on .get().
    content = data.get("content") if isinstance(data, dict) else None
    if not isinstance(content, dict):
        content = {}

    document_id = content.get("id") or content.get("documentId")
    if not document_id:
        raise HTTPException(
            status_code=500,
            detail={"message": "Fluig nao retornou documentId do anexo."},
        )
    return str(document_id)
def safe_json(response: requests.Response) -> Any:
    """Decode the response body as JSON, falling back to its raw text.

    Returns:
        The decoded JSON value, or ``{"message": response.text}`` when
        decoding fails for any reason.
    """
    try:
        decoded = response.json()
    except Exception:
        # Deliberate best-effort: any decoding failure is reported as text.
        return {"message": response.text}
    return decoded
def extract_process_instance_id(data: Any) -> str:
    """Pull the process instance id out of a decoded response body.

    Lookup order: top-level ``processInstanceId``; then, inside a dict-valued
    ``content``, ``processInstanceId``, ``processInstanceid`` and
    ``requestNumber``. The first truthy value wins and is returned as a
    string; an empty string is returned when nothing matches or ``data`` is
    not a dict.
    """
    if isinstance(data, dict):
        top_level = data.get("processInstanceId")
        if top_level:
            return str(top_level)

        nested = data.get("content")
        if isinstance(nested, dict):
            for key in ("processInstanceId", "processInstanceid", "requestNumber"):
                candidate = nested.get(key)
                if candidate:
                    return str(candidate)
    return ""