From 866870f07d22b97281d5ef2202c44cc282270ff5 Mon Sep 17 00:00:00 2001 From: zjf Date: Tue, 27 May 2025 13:58:43 +0800 Subject: [PATCH] rpa --- src/cluster/rpa.py | 69 ++++++++++++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 24 deletions(-) diff --git a/src/cluster/rpa.py b/src/cluster/rpa.py index fc9a466..fb87246 100644 --- a/src/cluster/rpa.py +++ b/src/cluster/rpa.py @@ -3,6 +3,7 @@ import hashlib import json import os import time +from pathlib import Path from typing import List, Optional import modal @@ -26,6 +27,11 @@ rpa_image = (modal.Image.debian_slim(python_version="3.11") ) app = modal.App(name='rpa', image=rpa_image, include_source=False) app.include(rpa_web_end_point_app) +# Create a persisted dict - the data gets retained between app runs +rpa_dict = modal.Dict.from_name("rpa-dict", create_if_missing=True) +rpa_queue = modal.Queue.from_name("rpa-queue", create_if_missing=True) +rpa_vol = modal.Volume.from_name("rpa-volume", create_if_missing=True) + ############################ Modal app set############################ EXPLAINING: str = "讲解中" @@ -256,7 +262,7 @@ def get_job_result(call_id: str, timeout=0): with rpa_image.imports(): - @app.function(timeout=8 * 60 * 60) + @app.function(timeout=8 * 60 * 60,volumes={"/data": rpa_vol}) async def rpa_run(data: LiveStreamProductWatchRequest) -> dict: """ Records product sessions for all products in a Douyin live session. @@ -264,7 +270,7 @@ with rpa_image.imports(): Stops if HTTP request indicates live stream has ended. Includes total duration of all sessions. - :param webcast_id: The ID of the live stream + :return: Dictionary containing title, live_id, product sessions, total duration, cut commands, and error """ webcast_id: str = data.live_id @@ -291,21 +297,20 @@ with rpa_image.imports(): if live_status == LIVE_END: result.msg = LIVE_END - return result.dict() + else: + product_sessions = await get_promotion_list_text(page, webcast_id) + result.product_sessions = product_sessions + result.duration = sum(t.duration for t in product_sessions) # Total duration + logger.info(f"Total duration: {result.duration}ms") + logger.info(f"result: {result}") - product_sessions = await get_promotion_list_text(page, webcast_id) - result.product_sessions = product_sessions - result.duration = sum(t.duration for t in product_sessions) # Total duration - logger.info(f"Total duration: {result.duration}ms") - logger.info(f"result: {result}") + # TODO 后续单独接口 Generate FFmpeg cut commands + # result.cut_commands = [ + # f"ffmpeg -i input.mp4 -ss {t.start_time / 1000} -t {t.duration / 1000} -c copy output_{t.start_time}.mp4" + # for t in product_sessions + # ] - # TODO 后续单独接口 Generate FFmpeg cut commands - # result.cut_commands = [ - # f"ffmpeg -i input.mp4 -ss {t.start_time / 1000} -t {t.duration / 1000} -c copy output_{t.start_time}.mp4" - # for t in product_sessions - # ] - - logger.info(f"Generated {len(result.cut_commands)} FFmpeg cut commands") + logger.info(f"Generated {len(result.cut_commands)} FFmpeg cut commands") finally: await browser.close() @@ -314,18 +319,34 @@ with rpa_image.imports(): result.error = str(e) # Save result to JSON file with ULID naming,TODO 后续s3上传 - try: - ulid = str(ULID()) - file_path = os.path.join(os.getcwd(), f"{ulid}.json") - with open(file_path, "w", encoding="utf-8") as f: - json.dump(result.dict(), f, ensure_ascii=False, indent=2) - logger.info(f"Saved result to JSON file: {file_path}") - except IOError as e: - logger.error(f"Failed to save JSON file: {str(e)}") + await save_result_to_json(result) return result.dict() # Return as dict for JSON compatibility + async def save_result_to_json(result): + try: + output_dir = Path("/data/live_watch") + output_dir.mkdir(exist_ok=True, parents=True) + ulid = str(ULID()) + file_path = os.path.join(output_dir, f"{ulid}.json") + with open(file_path, "w", encoding="utf-8") as f: + json.dump(result.dict(), f, ensure_ascii=False, indent=2) + rpa_vol.commit() # Needed to make sure all changes are persisted before exit + logger.info(f"Saved result to JSON file: {file_path}") + + rpa_dict[ulid] = result.dict() # setting a value + res_value = rpa_dict[ulid] # getting a value + logger.info(f"Saved result: {res_value}") + + rpa_queue.put(ulid) # adding a value + queue_value = rpa_queue.get() # retrieving a value + logger.info(f"Queued result: {queue_value}") + + except IOError as e: + logger.error(f"Failed to save JSON file: {str(e)}") + + async def get_live_status_by_web_page(page: Page, webcast_id: str): live_status = None await page.wait_for_selector("[class='pip-anchor']", timeout=60000) @@ -355,7 +376,7 @@ with rpa_image.imports(): # 312898731614 # webcast_ids = ["36861064178"] # webcast_ids = ["36861064178"] - webcast_id = "36861064178" + webcast_id = "40018769057" data: LiveStreamProductWatchRequest = LiveStreamProductWatchRequest(live_id=webcast_id) # TODO other info