From 68145aa3a46059f33c5b2aae05f34a80104494dd Mon Sep 17 00:00:00 2001 From: zjf Date: Wed, 28 May 2025 10:44:29 +0800 Subject: [PATCH] rpa --- src/cluster/rpa.py | 45 +++++++++++++------------------- src/cluster/rpa_comm.py | 22 ++++++++++++++-- src/cluster/rpa_web_end_point.py | 7 ++++- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/cluster/rpa.py b/src/cluster/rpa.py index 2300cf4..ca2318c 100644 --- a/src/cluster/rpa.py +++ b/src/cluster/rpa.py @@ -130,20 +130,13 @@ async def handle_response(custom_param, response): @retry(stop=stop_after_attempt(3), wait=wait_fixed(5)) -async def get_promotion_list_text(page: Page, webcast_id: str, max_duration: int = 8 * 60 * 60) -> List[ProductSession]: - """ - Polls the promotion list text every 3 seconds for the entire live stream. - Refreshes the page to ensure updated innerText. - Tracks live streaming status for multiple products, recording start, end times, - duration, product name, and product ID as ProductSession objects. - Stops if HTTP request indicates live stream has ended. - - :param page: Playwright page object - :param webcast_id: The ID of the live stream - :param max_duration: Maximum duration to poll (in seconds, default 8 hours) - :return: Tuple of (promotion_list_text, product_sessions) +async def get_promotion_list_text(page: Page, result: LiveStreamResult, + data: LiveStreamProductWatchRequest, + max_duration: int = 8 * 60 * 60) -> LiveStreamResult: """ + """ + logger.debug(f"get_promotion_list_text result: {result}") product_name_js, promotion_list_js = await init_load(page) product_sessions: List[ProductSession] = [] # List of ProductSession objects @@ -156,11 +149,10 @@ async def get_promotion_list_text(page: Page, webcast_id: str, max_duration: int last_status_check = start_polling while time.time() - start_polling < max_duration: - # try: # Check live status every 60 seconds if (time.time() - last_status_check) > 60: - if not await check_live_status(webcast_id, page): + if not await check_live_status(result.live_id, page): logger.info("Live stream ended based on HTTP status") break last_status_check = time.time() @@ -207,12 +199,12 @@ async def get_promotion_list_text(page: Page, webcast_id: str, max_duration: int start_time) await asyncio.sleep(3) # Poll every 3 seconds - # except Exception as e: - # logger.error(f"Error during polling or page refresh: {str(e)}") - # await page.context.clear_cookies() - # await page.context.add_cookies(cookies) - # await asyncio.sleep(3) - # continue + # 此处临时写 + if len(product_sessions) > 0: + result.product_sessions = product_sessions + result.duration = sum(t.duration for t in result.product_sessions) # Total duration + logger.info(f"curr Total duration: {result.duration}ms") + await save_result_to_json(result, data) # Finalize ongoing session if valid if start_time is not None and last_live_time is not None and current_product_name and current_product_img: @@ -224,7 +216,10 @@ async def get_promotion_list_text(page: Page, webcast_id: str, max_duration: int product_sessions, start_time) - return product_sessions + result.product_sessions = product_sessions + result.duration = sum(t.duration for t in result.product_sessions) # Total duration + logger.info(f"Total duration: {result.duration}ms") + return result async def do_finalized_product(current_product_id: str, @@ -357,6 +352,7 @@ async def save_result_to_json(result: LiveStreamResult, data: LiveStreamProductW output_dir = Path("/data/live_watch") output_dir.mkdir(exist_ok=True, parents=True) # ulid = str(ULID()) + # 使用live_stream_name ulid = data.live_stream_name file_path = os.path.join(output_dir, f"{ulid}.json") with open(file_path, "w", encoding="utf-8") as f: @@ -446,10 +442,7 @@ with rpa_image.imports(): if live_status == LIVE_END: result.msg = LIVE_END else: - product_sessions = await get_promotion_list_text(page, webcast_id) - result.product_sessions = product_sessions - result.duration = sum(t.duration for t in product_sessions) # Total duration - logger.info(f"Total duration: {result.duration}ms") + result = await get_promotion_list_text(page, result, data) logger.info(f"result: {result}") # TODO 后续单独接口 Generate FFmpeg cut commands @@ -458,8 +451,6 @@ with rpa_image.imports(): # for t in product_sessions # ] - logger.info(f"Generated {len(result.cut_commands)} FFmpeg cut commands") - # Save result to JSON file with ULID naming,TODO 后续s3上传 await save_result_to_json(result, data) diff --git a/src/cluster/rpa_comm.py b/src/cluster/rpa_comm.py index 037c604..d2b4507 100644 --- a/src/cluster/rpa_comm.py +++ b/src/cluster/rpa_comm.py @@ -4,14 +4,17 @@ from pydantic import BaseModel, Field from typing import List, Optional, Any from ulid import ULID + class LiveStreamProductWatchRequest(BaseModel): """ 直播流商品观测 """ - live_stream_name: Optional[str] = Field(str(ULID()), description="直播间推流名称约定ULID:参考https://github.com/ulid/spec") + live_stream_name: Optional[str] = Field(str(ULID()), + description="直播间推流名称约定ULID:参考https://github.com/ulid/spec") """直播间推流名称""" - live_stream_start_time: Optional[int] = Field(int(time.time() * 1000), description="直播推流开始世界时间(毫秒时间戳)") + live_stream_start_time: Optional[int] = Field(int(time.time() * 1000), + description="直播推流开始世界时间(毫秒时间戳)") """直播推流开始世界时间(毫秒时间戳)""" live_stream_url: Optional[str] = None """直播间推流链接""" @@ -67,3 +70,18 @@ class LiveStreamResult(BaseModel): LIVE_WATCH_TASK_DOING: str = "LIVE_WATCH_TASK_DOING" + + +def is_ulid(str_value: str) -> bool: + if str_value is None or str_value.strip() == "": + return False + try: + ULID.from_str(str_value) + return True + except BaseException as e: + print(e) + return False + + +if __name__ == '__main__': + ULID.from_str("123") diff --git a/src/cluster/rpa_web_end_point.py b/src/cluster/rpa_web_end_point.py index f524d84..93c769b 100644 --- a/src/cluster/rpa_web_end_point.py +++ b/src/cluster/rpa_web_end_point.py @@ -3,8 +3,9 @@ import fastapi from typing import List, Optional, Any, Coroutine import modal from loguru import logger +from ulid import ULID -from .rpa_comm import LiveStreamProductWatchRequest, LiveStreamResult, ProductSession, LIVE_WATCH_TASK_DOING +from .rpa_comm import LiveStreamProductWatchRequest, LiveStreamResult, ProductSession, LIVE_WATCH_TASK_DOING, is_ulid from scalar_fastapi import get_scalar_api_reference image = (modal.Image.debian_slim() @@ -40,6 +41,9 @@ async def scalar_html(): @web_app.post("/live_watch/submit",name="直播观测提交任务",description="监测直播侧发起,观测直播商品&切条") async def submit_job_endpoint(data: LiveStreamProductWatchRequest) -> Optional[Any]: logger.debug(f"input data: {data}") + if is_ulid(data.live_stream_name) is False: + return {"call_id": None,"error": "live_stream_name is not valid"} + process_job = modal.Function.from_name("rpa", "rpa_run") call = process_job.spawn(data) @@ -61,3 +65,4 @@ async def get_job_result_endpoint(call_id: str) -> Optional[LiveStreamResult]: return result # modal serve my_job_queue_endpoint.py +