This commit is contained in:
zjf 2025-05-28 10:44:29 +08:00
parent 09c55c5cbb
commit 68145aa3a4
3 changed files with 44 additions and 30 deletions

View File

@ -130,20 +130,13 @@ async def handle_response(custom_param, response):
@retry(stop=stop_after_attempt(3), wait=wait_fixed(5)) @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
async def get_promotion_list_text(page: Page, webcast_id: str, max_duration: int = 8 * 60 * 60) -> List[ProductSession]: async def get_promotion_list_text(page: Page, result: LiveStreamResult,
""" data: LiveStreamProductWatchRequest,
Polls the promotion list text every 3 seconds for the entire live stream. max_duration: int = 8 * 60 * 60) -> LiveStreamResult:
Refreshes the page to ensure updated innerText.
Tracks live streaming status for multiple products, recording start, end times,
duration, product name, and product ID as ProductSession objects.
Stops if HTTP request indicates live stream has ended.
:param page: Playwright page object
:param webcast_id: The ID of the live stream
:param max_duration: Maximum duration to poll (in seconds, default 8 hours)
:return: Tuple of (promotion_list_text, product_sessions)
""" """
"""
logger.debug(f"get_promotion_list_text result: {result}")
product_name_js, promotion_list_js = await init_load(page) product_name_js, promotion_list_js = await init_load(page)
product_sessions: List[ProductSession] = [] # List of ProductSession objects product_sessions: List[ProductSession] = [] # List of ProductSession objects
@ -156,11 +149,10 @@ async def get_promotion_list_text(page: Page, webcast_id: str, max_duration: int
last_status_check = start_polling last_status_check = start_polling
while time.time() - start_polling < max_duration: while time.time() - start_polling < max_duration:
# try:
# Check live status every 60 seconds # Check live status every 60 seconds
if (time.time() - last_status_check) > 60: if (time.time() - last_status_check) > 60:
if not await check_live_status(webcast_id, page): if not await check_live_status(result.live_id, page):
logger.info("Live stream ended based on HTTP status") logger.info("Live stream ended based on HTTP status")
break break
last_status_check = time.time() last_status_check = time.time()
@ -207,12 +199,12 @@ async def get_promotion_list_text(page: Page, webcast_id: str, max_duration: int
start_time) start_time)
await asyncio.sleep(3) # Poll every 3 seconds await asyncio.sleep(3) # Poll every 3 seconds
# except Exception as e: # 此处临时写
# logger.error(f"Error during polling or page refresh: {str(e)}") if len(product_sessions) > 0:
# await page.context.clear_cookies() result.product_sessions = product_sessions
# await page.context.add_cookies(cookies) result.duration = sum(t.duration for t in result.product_sessions) # Total duration
# await asyncio.sleep(3) logger.info(f"curr Total duration: {result.duration}ms")
# continue await save_result_to_json(result, data)
# Finalize ongoing session if valid # Finalize ongoing session if valid
if start_time is not None and last_live_time is not None and current_product_name and current_product_img: if start_time is not None and last_live_time is not None and current_product_name and current_product_img:
@ -224,7 +216,10 @@ async def get_promotion_list_text(page: Page, webcast_id: str, max_duration: int
product_sessions, product_sessions,
start_time) start_time)
return product_sessions result.product_sessions = product_sessions
result.duration = sum(t.duration for t in result.product_sessions) # Total duration
logger.info(f"Total duration: {result.duration}ms")
return result
async def do_finalized_product(current_product_id: str, async def do_finalized_product(current_product_id: str,
@ -357,6 +352,7 @@ async def save_result_to_json(result: LiveStreamResult, data: LiveStreamProductW
output_dir = Path("/data/live_watch") output_dir = Path("/data/live_watch")
output_dir.mkdir(exist_ok=True, parents=True) output_dir.mkdir(exist_ok=True, parents=True)
# ulid = str(ULID()) # ulid = str(ULID())
# 使用live_stream_name
ulid = data.live_stream_name ulid = data.live_stream_name
file_path = os.path.join(output_dir, f"{ulid}.json") file_path = os.path.join(output_dir, f"{ulid}.json")
with open(file_path, "w", encoding="utf-8") as f: with open(file_path, "w", encoding="utf-8") as f:
@ -446,10 +442,7 @@ with rpa_image.imports():
if live_status == LIVE_END: if live_status == LIVE_END:
result.msg = LIVE_END result.msg = LIVE_END
else: else:
product_sessions = await get_promotion_list_text(page, webcast_id) result = await get_promotion_list_text(page, result, data)
result.product_sessions = product_sessions
result.duration = sum(t.duration for t in product_sessions) # Total duration
logger.info(f"Total duration: {result.duration}ms")
logger.info(f"result: {result}") logger.info(f"result: {result}")
# TODO 后续单独接口 Generate FFmpeg cut commands # TODO 后续单独接口 Generate FFmpeg cut commands
@ -458,8 +451,6 @@ with rpa_image.imports():
# for t in product_sessions # for t in product_sessions
# ] # ]
logger.info(f"Generated {len(result.cut_commands)} FFmpeg cut commands")
# Save result to JSON file with ULID naming,TODO 后续s3上传 # Save result to JSON file with ULID naming,TODO 后续s3上传
await save_result_to_json(result, data) await save_result_to_json(result, data)

View File

@ -4,14 +4,17 @@ from pydantic import BaseModel, Field
from typing import List, Optional, Any from typing import List, Optional, Any
from ulid import ULID from ulid import ULID
class LiveStreamProductWatchRequest(BaseModel): class LiveStreamProductWatchRequest(BaseModel):
""" """
直播流商品观测 直播流商品观测
""" """
live_stream_name: Optional[str] = Field(str(ULID()), description="直播间推流名称约定ULID参考https://github.com/ulid/spec") live_stream_name: Optional[str] = Field(str(ULID()),
description="直播间推流名称约定ULID参考https://github.com/ulid/spec")
"""直播间推流名称""" """直播间推流名称"""
live_stream_start_time: Optional[int] = Field(int(time.time() * 1000), description="直播推流开始世界时间(毫秒时间戳)") live_stream_start_time: Optional[int] = Field(int(time.time() * 1000),
description="直播推流开始世界时间(毫秒时间戳)")
"""直播推流开始世界时间(毫秒时间戳)""" """直播推流开始世界时间(毫秒时间戳)"""
live_stream_url: Optional[str] = None live_stream_url: Optional[str] = None
"""直播间推流链接""" """直播间推流链接"""
@ -67,3 +70,18 @@ class LiveStreamResult(BaseModel):
LIVE_WATCH_TASK_DOING: str = "LIVE_WATCH_TASK_DOING" LIVE_WATCH_TASK_DOING: str = "LIVE_WATCH_TASK_DOING"
def is_ulid(str_value: str) -> bool:
    """Return True when *str_value* can be parsed as a ULID string.

    Validation is delegated to ``ULID.from_str``; this helper only converts
    a parse failure into a boolean.

    :param str_value: Candidate ULID text. ``None``, non-string values and
        empty or whitespace-only strings are rejected without attempting
        to parse them.
    :return: ``True`` if ``ULID.from_str`` accepts the value, else ``False``.
    """
    # Reject missing/blank input (and anything that is not a string) up
    # front so that ``.strip()`` can never raise on an unexpected type.
    if not isinstance(str_value, str) or not str_value.strip():
        return False
    try:
        ULID.from_str(str_value)
    except (ValueError, TypeError) as exc:
        # Only parse failures are treated as "not a ULID". Catching
        # BaseException here would also swallow KeyboardInterrupt and
        # SystemExit, which must keep propagating.
        import logging

        logging.getLogger(__name__).debug(
            "invalid ULID %r: %s", str_value, exc
        )
        return False
    return True
if __name__ == '__main__':
    # Manual smoke check: "123" is not a valid ULID. Calling ULID.from_str
    # directly here would abort the script with an uncaught ValueError, so
    # go through is_ulid and print the boolean verdict instead.
    print(is_ulid("123"))

View File

@ -3,8 +3,9 @@ import fastapi
from typing import List, Optional, Any, Coroutine from typing import List, Optional, Any, Coroutine
import modal import modal
from loguru import logger from loguru import logger
from ulid import ULID
from .rpa_comm import LiveStreamProductWatchRequest, LiveStreamResult, ProductSession, LIVE_WATCH_TASK_DOING from .rpa_comm import LiveStreamProductWatchRequest, LiveStreamResult, ProductSession, LIVE_WATCH_TASK_DOING, is_ulid
from scalar_fastapi import get_scalar_api_reference from scalar_fastapi import get_scalar_api_reference
image = (modal.Image.debian_slim() image = (modal.Image.debian_slim()
@ -40,6 +41,9 @@ async def scalar_html():
@web_app.post("/live_watch/submit",name="直播观测提交任务",description="监测直播侧发起,观测直播商品&切条") @web_app.post("/live_watch/submit",name="直播观测提交任务",description="监测直播侧发起,观测直播商品&切条")
async def submit_job_endpoint(data: LiveStreamProductWatchRequest) -> Optional[Any]: async def submit_job_endpoint(data: LiveStreamProductWatchRequest) -> Optional[Any]:
logger.debug(f"input data: {data}") logger.debug(f"input data: {data}")
if is_ulid(data.live_stream_name) is False:
return {"call_id": None,"error": "live_stream_name is not valid"}
process_job = modal.Function.from_name("rpa", "rpa_run") process_job = modal.Function.from_name("rpa", "rpa_run")
call = process_job.spawn(data) call = process_job.spawn(data)
@ -61,3 +65,4 @@ async def get_job_result_endpoint(call_id: str) -> Optional[LiveStreamResult]:
return result return result
# modal serve my_job_queue_endpoint.py # modal serve my_job_queue_endpoint.py