215 lines
6.4 KiB
Python
215 lines
6.4 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
from typing import Any
|
|
|
|
import requests
|
|
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from requests_oauthlib import OAuth1
|
|
|
|
|
|
class PortalFornecedorSuccessResponse(BaseModel):
|
|
success: bool = True
|
|
message: str
|
|
processInstanceId: str = ""
|
|
content: dict[str, Any]
|
|
|
|
|
|
class PortalFornecedorErrorDetail(BaseModel):
|
|
message: str | None = None
|
|
|
|
|
|
def env(name: str, default: str = "") -> str:
|
|
value = os.getenv(name, default).strip()
|
|
if not value:
|
|
raise RuntimeError(f"Missing environment variable: {name}")
|
|
return value
|
|
|
|
|
|
app = FastAPI(title="Portal Fornecedor Proxy")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=os.getenv("PORTAL_FORNECEDOR_CORS_ORIGINS", "*").split(","),
|
|
allow_credentials=False,
|
|
allow_methods=["POST", "OPTIONS", "GET"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
@app.get("/health")
|
|
def health() -> dict[str, str]:
|
|
return {"status": "ok"}
|
|
|
|
|
|
@app.post(
|
|
"/api/public/portalfornecedor/enviar",
|
|
response_model=PortalFornecedorSuccessResponse,
|
|
responses={
|
|
400: {"model": PortalFornecedorErrorDetail},
|
|
401: {"model": PortalFornecedorErrorDetail},
|
|
500: {"model": PortalFornecedorErrorDetail},
|
|
},
|
|
)
|
|
async def enviar(
|
|
arquivo: UploadFile = File(...),
|
|
targetState: int = Form(5),
|
|
comment: str = Form("Envio via portal fornecedor"),
|
|
data_abertura: str = Form(...),
|
|
emitido_por: str = Form(...),
|
|
entidade_responsavel: str = Form(...),
|
|
tipo_cadastro: str = Form(...),
|
|
emailSolicitante: str = Form(""),
|
|
cpf: str = Form(...),
|
|
tipo_documento: str = Form(""),
|
|
numero_documento: str = Form(...),
|
|
valor: str = Form(""),
|
|
justificativa: str = Form(...),
|
|
) -> PortalFornecedorSuccessResponse:
|
|
file_name = arquivo.filename or "anexo"
|
|
file_mime = arquivo.content_type or "application/octet-stream"
|
|
file_bytes = await arquivo.read()
|
|
|
|
if not file_bytes:
|
|
raise HTTPException(status_code=400, detail={"message": "Arquivo obrigatorio."})
|
|
|
|
auth = build_auth()
|
|
upload_binary(file_name, file_bytes, auth)
|
|
document_id = create_document(file_name, file_mime, auth)
|
|
|
|
process_payload = {
|
|
"targetState": targetState,
|
|
"comment": comment,
|
|
"formFields": {
|
|
"data_abertura": data_abertura,
|
|
"emitido_por": emitido_por,
|
|
"entidade_responsavel": entidade_responsavel,
|
|
"tipo_cadastro": tipo_cadastro,
|
|
"emailSolicitante": emailSolicitante,
|
|
"cpf": cpf,
|
|
"tipo_documento": tipo_documento,
|
|
"numero_documento": numero_documento,
|
|
"valor": valor,
|
|
"justificativa": justificativa,
|
|
"anexo_documento_id": str(document_id),
|
|
"anexo_documento_nome": file_name,
|
|
"anexo_documento_mime": file_mime,
|
|
},
|
|
}
|
|
|
|
response = requests.post(
|
|
process_start_endpoint(),
|
|
json=process_payload,
|
|
auth=auth,
|
|
headers={"Accept": "application/json"},
|
|
timeout=30,
|
|
)
|
|
|
|
if not response.ok:
|
|
raise HTTPException(status_code=response.status_code, detail=safe_json(response))
|
|
|
|
data = safe_json(response)
|
|
|
|
return PortalFornecedorSuccessResponse(
|
|
success=True,
|
|
message="Solicitacao enviada com sucesso.",
|
|
processInstanceId=extract_process_instance_id(data),
|
|
content=data,
|
|
)
|
|
|
|
|
|
def build_auth() -> OAuth1:
|
|
return OAuth1(
|
|
client_key=env("PORTAL_FORNECEDOR_CLIENT_KEY"),
|
|
client_secret=env("PORTAL_FORNECEDOR_CLIENT_SECRET"),
|
|
resource_owner_key=env("PORTAL_FORNECEDOR_RESOURCE_OWNER_KEY"),
|
|
resource_owner_secret=env("PORTAL_FORNECEDOR_RESOURCE_OWNER_SECRET"),
|
|
signature_method="HMAC-SHA1",
|
|
)
|
|
|
|
|
|
def base_url() -> str:
|
|
return env("PORTAL_FORNECEDOR_BASE_URL").rstrip("/")
|
|
|
|
|
|
def process_start_endpoint() -> str:
|
|
process_id = env("PORTAL_FORNECEDOR_PROCESS_ID", "FlowEssentials_LancamentodeDocumento")
|
|
return f"{base_url()}/process-management/api/v2/processes/{process_id}/start"
|
|
|
|
|
|
def upload_binary(file_name: str, file_bytes: bytes, auth: OAuth1) -> None:
|
|
response = requests.post(
|
|
f"{base_url()}/api/public/2.0/contentfiles/upload/",
|
|
params={"fileName": file_name},
|
|
data=file_bytes,
|
|
auth=auth,
|
|
headers={
|
|
"Content-Type": "application/octet-stream",
|
|
"Accept": "application/json",
|
|
},
|
|
timeout=30,
|
|
)
|
|
|
|
if not response.ok:
|
|
raise HTTPException(status_code=response.status_code, detail=safe_json(response))
|
|
|
|
|
|
def create_document(file_name: str, mime_type: str, auth: OAuth1) -> str:
|
|
payload = {
|
|
"companyId": env("PORTAL_FORNECEDOR_COMPANY_ID", "1"),
|
|
"description": file_name,
|
|
"parentId": int(env("PORTAL_FORNECEDOR_PARENT_FOLDER_ID", "10")),
|
|
"immutable": True,
|
|
"isPrivate": False,
|
|
"downloadEnabled": True,
|
|
"attachments": [{"fileName": file_name}],
|
|
"additionalComments": mime_type,
|
|
}
|
|
|
|
response = requests.post(
|
|
f"{base_url()}/api/public/ecm/document/createDocument",
|
|
json=payload,
|
|
auth=auth,
|
|
headers={"Accept": "application/json"},
|
|
timeout=30,
|
|
)
|
|
|
|
if not response.ok:
|
|
raise HTTPException(status_code=response.status_code, detail=safe_json(response))
|
|
|
|
data = safe_json(response)
|
|
content = data.get("content", {}) if isinstance(data, dict) else {}
|
|
document_id = content.get("id") or content.get("documentId")
|
|
|
|
if not document_id:
|
|
raise HTTPException(status_code=500, detail={"message": "Fluig nao retornou documentId do anexo."})
|
|
|
|
return str(document_id)
|
|
|
|
|
|
def safe_json(response: requests.Response) -> Any:
|
|
try:
|
|
return response.json()
|
|
except Exception:
|
|
return {"message": response.text}
|
|
|
|
|
|
def extract_process_instance_id(data: Any) -> str:
|
|
if not isinstance(data, dict):
|
|
return ""
|
|
if data.get("processInstanceId"):
|
|
return str(data["processInstanceId"])
|
|
|
|
content = data.get("content")
|
|
if isinstance(content, dict):
|
|
if content.get("processInstanceId"):
|
|
return str(content["processInstanceId"])
|
|
if content.get("processInstanceid"):
|
|
return str(content["processInstanceid"])
|
|
if content.get("requestNumber"):
|
|
return str(content["requestNumber"])
|
|
|
|
return ""
|