From 48adb98969e4a90cbac68f8a663ff4508cd64677 Mon Sep 17 00:00:00 2001 From: zjf Date: Mon, 26 May 2025 17:06:11 +0800 Subject: [PATCH] rpa --- src/__init__.py | 0 src/cluster/rpa.md | 5 ++- src/cluster/rpa.py | 43 ++-------------------- src/cluster/rpa_comm.py | 62 ++++++++++++++++++++++++++++++++ src/cluster/rpa_web_end_point.py | 42 ++++++++++++++++++++++ 5 files changed, 110 insertions(+), 42 deletions(-) create mode 100644 src/__init__.py create mode 100644 src/cluster/rpa_comm.py create mode 100644 src/cluster/rpa_web_end_point.py diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cluster/rpa.md b/src/cluster/rpa.md index e8e0c37..c974ce1 100644 --- a/src/cluster/rpa.md +++ b/src/cluster/rpa.md @@ -1,4 +1,7 @@ - +modal setup +```shell +python -m modal setup +``` deploy by env ```shell diff --git a/src/cluster/rpa.py b/src/cluster/rpa.py index db56251..b3f1aa0 100644 --- a/src/cluster/rpa.py +++ b/src/cluster/rpa.py @@ -11,6 +11,8 @@ from playwright.async_api import async_playwright, Browser, Page from pydantic import BaseModel from tenacity import retry, stop_after_attempt, wait_fixed from ulid import ULID +from .rpa_comm import LiveStreamProductWatchRequest,LiveStreamResult,ProductSession + rpa_image = (modal.Image.debian_slim(python_version="3.11") .pip_install_from_pyproject("../pyproject_rpa.toml") @@ -25,47 +27,6 @@ app = modal.App(name='rpa', image=rpa_image, include_source=False) ############################ Modal app set############################ -# Define Pydantic models -class ProductSession(BaseModel): - start_time: Optional[int] = 0 - """当前商品直播开始世界时间(毫秒时间戳)""" - end_time: Optional[int] = 0 - """当前商品直播结束世界时间(毫秒时间戳)""" - duration: Optional[int] = 0 - """当前商品直播持续时间(毫秒)""" - product_name: Optional[str] = None - """商品名称""" - product_id: Optional[str] = None - """商品id""" - product_img: Optional[str] = None - """商品图片url""" - start_canvas_img: Optional[str] = None - """当前商品直播开始截图""" - end_canvas_img: Optional[str] = None - """当前商品直播结束截图""" - ext_data: Optional[Any] = None - """扩展字段""" - - -class LiveStreamResult(BaseModel): - title: Optional[str] = None - """直播间名称""" - live_id: Optional[str] = None - """直播间id""" - start_time: Optional[int] = 0 - """直播开始世界时间(毫秒时间戳)""" - product_sessions: List[ProductSession] = [] - """直播所有商品记录""" - duration: Optional[int] = 0 - """直播持续时间""" - cut_commands: List[str] = [] - """冗余字段""" - ext_data: Optional[Any] = None - """扩展字段""" - error: Optional[str] = None - msg: Optional[str] = None - - EXPLAINING: str = "讲解中" LIVE_END: str = "直播已结束" diff --git a/src/cluster/rpa_comm.py b/src/cluster/rpa_comm.py new file mode 100644 index 0000000..cc870ed --- /dev/null +++ b/src/cluster/rpa_comm.py @@ -0,0 +1,62 @@ +from pydantic import BaseModel +from typing import List, Optional,Any + +class LiveStreamProductWatchRequest(BaseModel): + """ + 直播流商品观测 + """ + + live_stream_name: Optional[str] = None + """直播间推流名称""" + live_stream_start_time: Optional[int] = 0 + """直播推流开始世界时间(毫秒时间戳)""" + live_stream_url: Optional[str] = None + """直播间推流链接""" + live_id: Optional[str] = None + """直播间id""" + cookie_string: Optional[str] = None + """cookie string ,split('; ') """ + ext_data: Optional[Any] = None + """扩展字段""" + + +# Define Pydantic models +class ProductSession(BaseModel): + start_time: Optional[int] = 0 + """当前商品直播开始世界时间(毫秒时间戳)""" + end_time: Optional[int] = 0 + """当前商品直播结束世界时间(毫秒时间戳)""" + duration: Optional[int] = 0 + """当前商品直播持续时间(毫秒)""" + product_name: Optional[str] = None + """商品名称""" + product_id: Optional[str] = None + """商品id""" + product_img: Optional[str] = None + """商品图片url""" + start_canvas_img: Optional[str] = None + """当前商品直播开始截图""" + end_canvas_img: Optional[str] = None + """当前商品直播结束截图""" + ext_data: Optional[Any] = None + """扩展字段""" + + +class LiveStreamResult(BaseModel): + title: Optional[str] = None + """直播间名称""" + live_id: Optional[str] = None + """直播间id""" + start_time: Optional[int] = 0 + """直播开始世界时间(毫秒时间戳)""" + product_sessions: List[ProductSession] = [] + """直播所有商品记录""" + duration: Optional[int] = 0 + """直播持续时间""" + cut_commands: List[str] = [] + """冗余字段""" + ext_data: Optional[Any] = None + """扩展字段""" + error: Optional[str] = None + msg: Optional[str] = None + diff --git a/src/cluster/rpa_web_end_point.py b/src/cluster/rpa_web_end_point.py new file mode 100644 index 0000000..9280f28 --- /dev/null +++ b/src/cluster/rpa_web_end_point.py @@ -0,0 +1,42 @@ +# my_job_queue_endpoint.py +import fastapi +from typing import List, Optional,Any +import modal +from .rpa_comm import LiveStreamProductWatchRequest,LiveStreamResult,ProductSession + + +image = modal.Image.debian_slim().pip_install("fastapi[standard]") +app = modal.App("fastapi_rpa", image=image) + +############################ Modal app set############################ + +web_app = fastapi.FastAPI() + + +@app.function() +@modal.asgi_app() +def fastapi_app(): + return web_app + + +@web_app.post("/submit") +async def submit_job_endpoint(data:LiveStreamProductWatchRequest): + process_job = modal.Function.from_name("rpa", "rpa_run") + + call = process_job.spawn(data) + return {"call_id": call.object_id} + + +@web_app.get("/result/{call_id}") +async def get_job_result_endpoint(call_id: str): + function_call = modal.FunctionCall.from_id(call_id) + try: + result = function_call.get(timeout=0) + except modal.exception.OutputExpiredError: + return fastapi.responses.JSONResponse(content="", status_code=404) + except TimeoutError: + return fastapi.responses.JSONResponse(content="", status_code=202) + + return result + +# modal serve my_job_queue_endpoint.py