# my_job_queue_endpoint.py
"""FastAPI front end, served through Modal, for a spawn-and-poll job queue.

``POST /live_watch/submit`` spawns the ``rpa_run`` function of the Modal app
``rpa`` and returns the id of the resulting function call.
``GET /live_watch/result/{call_id}`` polls that call for its output.
``GET /scalar`` serves the Scalar API reference page for this app.
"""
import fastapi
from typing import List, Optional, Any

import modal
from .rpa_comm import LiveStreamProductWatchRequest, LiveStreamResult, ProductSession
from scalar_fastapi import get_scalar_api_reference

# Container image used by the Modal function that hosts the web app.
image = (
    modal.Image.debian_slim()
    .pip_install([
        "fastapi[standard]",
        "pydantic",
        "scalar-fastapi>=1.0.3",
    ])
)
app = modal.App("fastapi_rpa", image=image)

############################ Modal app setup ############################

web_app = fastapi.FastAPI()


@app.function()
@modal.asgi_app()
def fastapi_app():
    """Return the ASGI application that Modal should serve."""
    return web_app


@web_app.get("/scalar", include_in_schema=False)
async def scalar_html():
    """Serve the Scalar API reference built from this app's OpenAPI schema."""
    return get_scalar_api_reference(
        openapi_url=web_app.openapi_url,
        title=web_app.title,
    )


@web_app.post("/live_watch/submit")
async def submit_job_endpoint(data: LiveStreamProductWatchRequest) -> Optional[Any]:
    """Spawn the ``rpa_run`` Modal function with ``data`` and return its call id.

    Args:
        data: Request payload, forwarded unchanged to the spawned function.

    Returns:
        A dict ``{"call_id": <id>}``; the id is what the result endpoint expects.
    """
    process_job = modal.Function.from_name("rpa", "rpa_run")
    # Use the async variant so the Modal round-trip does not block the
    # event loop that is serving other requests.
    call = await process_job.spawn.aio(data)
    return {"call_id": call.object_id}


@web_app.get("/live_watch/result/{call_id}")
async def get_job_result_endpoint(call_id: str) -> Optional[LiveStreamResult]:
    """Poll a previously spawned call and return its result if available.

    Args:
        call_id: Identifier returned by the submit endpoint.

    Returns:
        The function call's result when it is ready. Otherwise an empty JSON
        response with status 202 when the poll times out (result not yet
        available), or status 404 when Modal reports the output as expired.
    """
    function_call = modal.FunctionCall.from_id(call_id)
    try:
        # timeout=0 makes this a non-waiting poll; awaited via ``.aio`` so the
        # event loop stays free during the network call.
        result = await function_call.get.aio(timeout=0)
    except modal.exception.OutputExpiredError:
        return fastapi.responses.JSONResponse(content="", status_code=404)
    except TimeoutError:
        return fastapi.responses.JSONResponse(content="", status_code=202)
    return result


# modal serve my_job_queue_endpoint.py