diff --git a/src/cluster/rpa.py b/src/cluster/rpa.py index 7125e53..1361161 100644 --- a/src/cluster/rpa.py +++ b/src/cluster/rpa.py @@ -273,6 +273,8 @@ with rpa_image.imports(): :return: Dictionary containing title, live_id, product sessions, total duration, cut commands, and error """ + logger.debug(f"input data: {data}") + webcast_id: str = data.live_id logger.info(f"Starting RPA run for webcast_id: {webcast_id}") result = LiveStreamResult(live_id=webcast_id) diff --git a/src/cluster/rpa_comm.py b/src/cluster/rpa_comm.py index 263e48c..037c604 100644 --- a/src/cluster/rpa_comm.py +++ b/src/cluster/rpa_comm.py @@ -1,15 +1,17 @@ +import time + from pydantic import BaseModel, Field from typing import List, Optional, Any - +from ulid import ULID class LiveStreamProductWatchRequest(BaseModel): """ 直播流商品观测 """ - live_stream_name: Optional[str] = Field(None, description="直播间推流名称") + live_stream_name: Optional[str] = Field(str(ULID()), description="直播间推流名称约定ULID:参考https://github.com/ulid/spec") """直播间推流名称""" - live_stream_start_time: Optional[int] = 0 + live_stream_start_time: Optional[int] = Field(int(time.time() * 1000), description="直播推流开始世界时间(毫秒时间戳)") """直播推流开始世界时间(毫秒时间戳)""" live_stream_url: Optional[str] = None """直播间推流链接""" @@ -17,6 +19,8 @@ class LiveStreamProductWatchRequest(BaseModel): """直播间id""" cookie_string: Optional[str] = None """cookie string ,split('; ') """ + live_cut_url: Optional[str] = None + """当前商品直播开始(一段时间)的截图/视频""" ext_data: Optional[Any] = None """扩展字段""" @@ -36,9 +40,9 @@ class ProductSession(BaseModel): product_img: Optional[str] = None """商品图片url""" start_canvas_img: Optional[str] = None - """当前商品直播开始截图""" + """当前商品直播开始截图/视频""" end_canvas_img: Optional[str] = None - """当前商品直播结束截图""" + """当前商品直播结束截图/视频""" ext_data: Optional[Any] = None """扩展字段""" diff --git a/src/cluster/rpa_web_end_point.py b/src/cluster/rpa_web_end_point.py index 5d16736..f524d84 100644 --- a/src/cluster/rpa_web_end_point.py +++ b/src/cluster/rpa_web_end_point.py @@ -13,6 +13,7 @@ image = (modal.Image.debian_slim() "pydantic", "scalar-fastapi>=1.0.3", "loguru>=0.7.3", + "python-ulid>=3.0.0", ]) .pip_install()) app = modal.App("fastapi_rpa", image=image) @@ -38,6 +39,7 @@ async def scalar_html(): @web_app.post("/live_watch/submit",name="直播观测提交任务",description="监测直播侧发起,观测直播商品&切条") async def submit_job_endpoint(data: LiveStreamProductWatchRequest) -> Optional[Any]: + logger.debug(f"input data: {data}") process_job = modal.Function.from_name("rpa", "rpa_run") call = process_job.spawn(data) @@ -45,19 +47,17 @@ async def submit_job_endpoint(data: LiveStreamProductWatchRequest) -> Optional[A @web_app.get("/live_watch/result/{call_id}",name="查询直播观测任务结果",description="异步查询") -async def get_job_result_endpoint(call_id: str) -> Any: +async def get_job_result_endpoint(call_id: str) -> Optional[LiveStreamResult]: + logger.debug(f"input call_id: {call_id}") + function_call = modal.FunctionCall.from_id(call_id) try: result = function_call.get(timeout=0) - except modal.exception.OutputExpiredError: - return fastapi.responses.JSONResponse(content="", status_code=404) - except TimeoutError: - return fastapi.responses.JSONResponse(content="", status_code=202) + except BaseException as e: + logger.error(f"Exception: {e}") + return None logger.debug("res:",result) - if len(result) == 0: - return fastapi.responses.JSONResponse(content=LIVE_WATCH_TASK_DOING, status_code=200) - return result # modal serve my_job_queue_endpoint.py