From c94889e189ab3fc0ecbc4f0884ada9205d8f3faf Mon Sep 17 00:00:00 2001 From: zjf Date: Tue, 27 May 2025 19:38:52 +0800 Subject: [PATCH] rpa --- src/cluster/rpa.py | 111 +++++++++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 55 deletions(-) diff --git a/src/cluster/rpa.py b/src/cluster/rpa.py index c6481f6..f5f9be3 100644 --- a/src/cluster/rpa.py +++ b/src/cluster/rpa.py @@ -349,6 +349,60 @@ def get_job_result(call_id: str, timeout=0): return result +async def save_result_to_json(result: LiveStreamResult, data: LiveStreamProductWatchRequest): + try: + output_dir = Path("/data/live_watch") + output_dir.mkdir(exist_ok=True, parents=True) + # ulid = str(ULID()) + ulid = data.live_stream_name + file_path = os.path.join(output_dir, f"{ulid}.json") + with open(file_path, "w", encoding="utf-8") as f: + json.dump(result.dict(), f, ensure_ascii=False, indent=2) + rpa_vol.commit() # Needed to make sure all changes are persisted before exit + logger.info(f"Saved result to JSON file: {file_path}") + + rpa_dict[ulid] = result.dict() # setting a value + res_value = rpa_dict[ulid] # getting a value + logger.info(f"Saved result: {res_value}") + + # rpa_queue.put(ulid) # adding a value + # queue_value = rpa_queue.get() # retrieving a value + # logger.info(f"Queued result: {queue_value}") + + except IOError as e: + logger.error(f"Failed to save JSON file: {str(e)}") + + +async def get_live_status_by_web_page(page: Page, webcast_id: str): + live_status = None + logger.debug(f"Getting live status for webcast_id: {webcast_id}") + try: + await page.wait_for_selector("[class='pip-anchor']", timeout=3 * 000) + await page.wait_for_function( + """() => { + const element = document.querySelector('[class="pip-anchor"]'); + return element && element.innerText !== ""; + }""", + timeout=3 * 000 + ) + live_status_js = """document.querySelector('[class="pip-anchor"]')?.innerText || ''""" + live_status_text: str = await page.evaluate(live_status_js) + if live_status_text.__contains__(LIVE_END): + logger.warning(f"live stream has ended,webcast_id:{webcast_id},{LIVE_END}") + live_status = LIVE_END + + except BaseException as e: + logger.error(f"Failed to save JSON file: {str(e)}") + + logger.info(f"Live status: {live_status}") + return live_status + + +def submit_job(data: LiveStreamProductWatchRequest): + call = rpa_run.spawn(data) + return call.object_id + +########################################rpa_image start###################################################### with rpa_image.imports(): @app.function(timeout=8 * 60 * 60, volumes={"/data": rpa_vol}) async def rpa_run(data: LiveStreamProductWatchRequest) -> dict: @@ -407,61 +461,6 @@ with rpa_image.imports(): return result.dict() # Return as dict for JSON compatibility - - async def save_result_to_json(result: LiveStreamResult, data: LiveStreamProductWatchRequest): - try: - output_dir = Path("/data/live_watch") - output_dir.mkdir(exist_ok=True, parents=True) - # ulid = str(ULID()) - ulid = data.live_stream_name - file_path = os.path.join(output_dir, f"{ulid}.json") - with open(file_path, "w", encoding="utf-8") as f: - json.dump(result.dict(), f, ensure_ascii=False, indent=2) - rpa_vol.commit() # Needed to make sure all changes are persisted before exit - logger.info(f"Saved result to JSON file: {file_path}") - - rpa_dict[ulid] = result.dict() # setting a value - res_value = rpa_dict[ulid] # getting a value - logger.info(f"Saved result: {res_value}") - - # rpa_queue.put(ulid) # adding a value - # queue_value = rpa_queue.get() # retrieving a value - # logger.info(f"Queued result: {queue_value}") - - except IOError as e: - logger.error(f"Failed to save JSON file: {str(e)}") - - - async def get_live_status_by_web_page(page: Page, webcast_id: str): - live_status = None - logger.debug(f"Getting live status for webcast_id: {webcast_id}") - try: - await page.wait_for_selector("[class='pip-anchor']", timeout=3 * 000) - await page.wait_for_function( - """() => { - const element = document.querySelector('[class="pip-anchor"]'); - return element && element.innerText !== ""; - }""", - timeout=3 * 000 - ) - live_status_js = """document.querySelector('[class="pip-anchor"]')?.innerText || ''""" - live_status_text: str = await page.evaluate(live_status_js) - if live_status_text.__contains__(LIVE_END): - logger.warning(f"live stream has ended,webcast_id:{webcast_id},{LIVE_END}") - live_status = LIVE_END - - except BaseException as e: - logger.error(f"Failed to save JSON file: {str(e)}") - - logger.info(f"Live status: {live_status}") - return live_status - - - def submit_job(data: LiveStreamProductWatchRequest): - call = rpa_run.spawn(data) - return call.object_id - - @app.local_entrypoint() async def local_run(): logger.info("Starting local RPA test") @@ -491,3 +490,5 @@ with rpa_image.imports(): # # 36861064178 333555252930 # title = await rpa_run.remote.aio(webcast_id="36861064178") # logger.info(f"title = {title}") + +########################################rpa_image end###################################################### \ No newline at end of file