From ede96911c9b3b9be1d3c7b8630dd862a4c62fbe0 Mon Sep 17 00:00:00 2001 From: zjf Date: Wed, 28 May 2025 11:19:54 +0800 Subject: [PATCH] rpa --- src/cluster/rpa.py | 83 ++++++++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 32 deletions(-) diff --git a/src/cluster/rpa.py b/src/cluster/rpa.py index ca2318c..6e29968 100644 --- a/src/cluster/rpa.py +++ b/src/cluster/rpa.py @@ -81,6 +81,10 @@ headers = { 'X-Secsdk-Csrf-Token': '000100000001df7d0d7e4fdc21901514fabb1fcf3df2aa62a132716321e6fface6dbe67cdb171841ce4d6d7edc27' } +TOP_PROMOTION = None +PROMOTIONS = [] +PROMOTIONS_TOTAL = 0 + async def check_live_status(webcast_id: str, page: Optional[Page] = None) -> bool: """ @@ -110,6 +114,8 @@ async def handle_response(custom_param, response): TODO 有部分主播会移除商品,搞一个全局记录,类似page的部分返回值,初始化使用这个 TODO:处理录屏问题,录屏时不能刷新,最好模拟点击 """ + global TOP_PROMOTION, PROMOTIONS, PROMOTIONS_TOTAL + if "live.douyin.com/live/promotions/page" in response.url: print(f"Custom param: {custom_param}") print(f"Response URL: {response.url}") @@ -118,15 +124,24 @@ async def handle_response(custom_param, response): # 获取响应内容 try: json_data = await response.json() - logger.debug(f"json_data:{json.dumps(json_data, indent=2, ensure_ascii=False)}") + if "top_promotion" in json_data: - logger.debug(f'top_promotion:{json_data["top_promotion"]}') + logger.debug(f'top_promotion get') + TOP_PROMOTION = json_data["top_promotion"] + if "promotions" in json_data: - logger.debug(f'promotions:{json_data["promotions"]}') + logger.debug(f'promotions:{len(json_data["promotions"])}') + PROMOTIONS = json_data["promotions"] + if "total" in json_data: logger.debug(f'total:{json_data["total"]}') - except: - logger.debug("Non-JSON response") + PROMOTIONS_TOTAL = json_data["total"] + + except BaseException as e: + logger.error(f"Non-JSON response {e}") + + logger.debug( + f"TOP_PROMOTION:{TOP_PROMOTION is not None},PROMOTIONS:{len(PROMOTIONS)},PROMOTIONS_TOTAL: {PROMOTIONS_TOTAL}") @retry(stop=stop_after_attempt(3), wait=wait_fixed(5)) @@ -423,33 +438,11 @@ with rpa_image.imports(): async with async_playwright() as p: browser: Browser = await p.chromium.launch(headless=True) - - context = await browser.new_context() - await context.add_cookies(cookies) - logger.info("Loaded cookies") - - page: Page = await context.new_page() - url = f"https://live.douyin.com/{webcast_id}?open_promotion_list=1" - logger.info(f"Navigating to {url}") - await page.goto(url, timeout=60000) - await page.wait_for_load_state("networkidle", timeout=60000) - - result.title = await page.title() - logger.info(f"Retrieved title: {result.title}") - - live_status = await get_live_status_by_web_page(page, webcast_id) - - if live_status == LIVE_END: - result.msg = LIVE_END - else: - result = await get_promotion_list_text(page, result, data) - logger.info(f"result: {result}") - - # TODO 后续单独接口 Generate FFmpeg cut commands - # result.cut_commands = [ - # f"ffmpeg -i input.mp4 -ss {t.start_time / 1000} -t {t.duration / 1000} -c copy output_{t.start_time}.mp4" - # for t in product_sessions - # ] + try: + result = await goto_live(browser, data, result, webcast_id) + except BaseException as e: + logger.error(f"Failed to start RPA run: {str(e)}") + await browser.close() # Save result to JSON file with ULID naming,TODO 后续s3上传 await save_result_to_json(result, data) @@ -457,6 +450,32 @@ with rpa_image.imports(): return result.dict() # Return as dict for JSON compatibility + async def goto_live(browser, data, result, webcast_id): + context = await browser.new_context() + await context.add_cookies(cookies) + logger.info("Loaded cookies") + page: Page = await context.new_page() + url = f"https://live.douyin.com/{webcast_id}?open_promotion_list=1" + logger.info(f"Navigating to {url}") + await page.goto(url, timeout=60000) + await page.wait_for_load_state("networkidle", timeout=60000) + result.title = await page.title() + logger.info(f"Retrieved title: {result.title}") + live_status = await get_live_status_by_web_page(page, webcast_id) + if live_status == LIVE_END: + result.msg = LIVE_END + else: + result = await get_promotion_list_text(page, result, data) + logger.info(f"result: {result}") + + # TODO 后续单独接口 Generate FFmpeg cut commands + # result.cut_commands = [ + # f"ffmpeg -i input.mp4 -ss {t.start_time / 1000} -t {t.duration / 1000} -c copy output_{t.start_time}.mp4" + # for t in product_sessions + # ] + return result + + @app.local_entrypoint() async def local_run(): logger.info("Starting local RPA test")