This commit is contained in:
zjf 2025-05-27 13:58:43 +08:00
parent 890f017a1e
commit 866870f07d
1 changed file with 45 additions and 24 deletions

View File

@ -3,6 +3,7 @@ import hashlib
import json
import os
import time
from pathlib import Path
from typing import List, Optional
import modal
@ -26,6 +27,11 @@ rpa_image = (modal.Image.debian_slim(python_version="3.11")
)
app = modal.App(name='rpa', image=rpa_image, include_source=False)
app.include(rpa_web_end_point_app)
# Create a persisted dict - the data gets retained between app runs
rpa_dict = modal.Dict.from_name("rpa-dict", create_if_missing=True)
rpa_queue = modal.Queue.from_name("rpa-queue", create_if_missing=True)
rpa_vol = modal.Volume.from_name("rpa-volume", create_if_missing=True)
############################ Modal app set############################
EXPLAINING: str = "讲解中"
@ -256,7 +262,7 @@ def get_job_result(call_id: str, timeout=0):
with rpa_image.imports():
@app.function(timeout=8 * 60 * 60)
@app.function(timeout=8 * 60 * 60,volumes={"/data": rpa_vol})
async def rpa_run(data: LiveStreamProductWatchRequest) -> dict:
"""
Records product sessions for all products in a Douyin live session.
@ -264,7 +270,7 @@ with rpa_image.imports():
Stops if HTTP request indicates live stream has ended.
Includes total duration of all sessions.
:param webcast_id: The ID of the live stream
:return: Dictionary containing title, live_id, product sessions, total duration, cut commands, and error
"""
webcast_id: str = data.live_id
@ -291,21 +297,20 @@ with rpa_image.imports():
if live_status == LIVE_END:
result.msg = LIVE_END
return result.dict()
else:
product_sessions = await get_promotion_list_text(page, webcast_id)
result.product_sessions = product_sessions
result.duration = sum(t.duration for t in product_sessions) # Total duration
logger.info(f"Total duration: {result.duration}ms")
logger.info(f"result: {result}")
product_sessions = await get_promotion_list_text(page, webcast_id)
result.product_sessions = product_sessions
result.duration = sum(t.duration for t in product_sessions) # Total duration
logger.info(f"Total duration: {result.duration}ms")
logger.info(f"result: {result}")
# TODO 后续单独接口 Generate FFmpeg cut commands
# result.cut_commands = [
# f"ffmpeg -i input.mp4 -ss {t.start_time / 1000} -t {t.duration / 1000} -c copy output_{t.start_time}.mp4"
# for t in product_sessions
# ]
# TODO 后续单独接口 Generate FFmpeg cut commands
# result.cut_commands = [
# f"ffmpeg -i input.mp4 -ss {t.start_time / 1000} -t {t.duration / 1000} -c copy output_{t.start_time}.mp4"
# for t in product_sessions
# ]
logger.info(f"Generated {len(result.cut_commands)} FFmpeg cut commands")
logger.info(f"Generated {len(result.cut_commands)} FFmpeg cut commands")
finally:
await browser.close()
@ -314,18 +319,34 @@ with rpa_image.imports():
result.error = str(e)
# Save result to JSON file with ULID naming,TODO 后续s3上传
try:
ulid = str(ULID())
file_path = os.path.join(os.getcwd(), f"{ulid}.json")
with open(file_path, "w", encoding="utf-8") as f:
json.dump(result.dict(), f, ensure_ascii=False, indent=2)
logger.info(f"Saved result to JSON file: {file_path}")
except IOError as e:
logger.error(f"Failed to save JSON file: {str(e)}")
await save_result_to_json(result)
return result.dict() # Return as dict for JSON compatibility
async def save_result_to_json(result):
    """
    Persist a run result as a ULID-named JSON file on the shared Modal volume,
    then publish it through the Modal dict and queue for downstream consumers.

    :param result: result object exposing a ``.dict()`` method (pydantic-style);
        its dict form must be JSON-serializable.
    """
    try:
        output_dir = Path("/data/live_watch")
        output_dir.mkdir(exist_ok=True, parents=True)
        ulid = str(ULID())
        # Pure pathlib join (was os.path.join on a Path — mixed styles).
        file_path = output_dir / f"{ulid}.json"
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(result.dict(), f, ensure_ascii=False, indent=2)
        rpa_vol.commit()  # Needed to make sure all changes are persisted before exit
        logger.info(f"Saved result to JSON file: {file_path}")
        rpa_dict[ulid] = result.dict()
        # Read-back is only used for logging; presumably a sanity check — TODO confirm.
        res_value = rpa_dict[ulid]
        logger.info(f"Saved result: {res_value}")
        rpa_queue.put(ulid)
        # NOTE(review): this get() immediately consumes the ULID that was just
        # queued, so no other worker can ever receive it — confirm this is
        # debug-only code before shipping.
        queue_value = rpa_queue.get()
        logger.info(f"Queued result: {queue_value}")
    except IOError as e:  # IOError is an alias of OSError on Python 3
        logger.error(f"Failed to save JSON file: {str(e)}")
async def get_live_status_by_web_page(page: Page, webcast_id: str):
live_status = None
await page.wait_for_selector("[class='pip-anchor']", timeout=60000)
@ -355,7 +376,7 @@ with rpa_image.imports():
# 312898731614
# webcast_ids = ["36861064178"]
# webcast_ids = ["36861064178"]
webcast_id = "36861064178"
webcast_id = "40018769057"
data: LiveStreamProductWatchRequest = LiveStreamProductWatchRequest(live_id=webcast_id)
# TODO other info