This commit is contained in:
zjf 2025-05-27 19:38:52 +08:00
parent d37484f1d9
commit c94889e189
1 changed files with 56 additions and 55 deletions

View File

@ -349,6 +349,60 @@ def get_job_result(call_id: str, timeout=0):
return result
async def save_result_to_json(result: LiveStreamResult, data: LiveStreamProductWatchRequest):
try:
output_dir = Path("/data/live_watch")
output_dir.mkdir(exist_ok=True, parents=True)
# ulid = str(ULID())
ulid = data.live_stream_name
file_path = os.path.join(output_dir, f"{ulid}.json")
with open(file_path, "w", encoding="utf-8") as f:
json.dump(result.dict(), f, ensure_ascii=False, indent=2)
rpa_vol.commit() # Needed to make sure all changes are persisted before exit
logger.info(f"Saved result to JSON file: {file_path}")
rpa_dict[ulid] = result.dict() # setting a value
res_value = rpa_dict[ulid] # getting a value
logger.info(f"Saved result: {res_value}")
# rpa_queue.put(ulid) # adding a value
# queue_value = rpa_queue.get() # retrieving a value
# logger.info(f"Queued result: {queue_value}")
except IOError as e:
logger.error(f"Failed to save JSON file: {str(e)}")
async def get_live_status_by_web_page(page: Page, webcast_id: str):
live_status = None
logger.debug(f"Getting live status for webcast_id: {webcast_id}")
try:
await page.wait_for_selector("[class='pip-anchor']", timeout=3 * 000)
await page.wait_for_function(
"""() => {
const element = document.querySelector('[class="pip-anchor"]');
return element && element.innerText !== "";
}""",
timeout=3 * 000
)
live_status_js = """document.querySelector('[class="pip-anchor"]')?.innerText || ''"""
live_status_text: str = await page.evaluate(live_status_js)
if live_status_text.__contains__(LIVE_END):
logger.warning(f"live stream has ended,webcast_id:{webcast_id},{LIVE_END}")
live_status = LIVE_END
except BaseException as e:
logger.error(f"Failed to save JSON file: {str(e)}")
logger.info(f"Live status: {live_status}")
return live_status
def submit_job(data: LiveStreamProductWatchRequest):
call = rpa_run.spawn(data)
return call.object_id
########################################rpa_image start######################################################
with rpa_image.imports():
@app.function(timeout=8 * 60 * 60, volumes={"/data": rpa_vol})
async def rpa_run(data: LiveStreamProductWatchRequest) -> dict:
@ -407,61 +461,6 @@ with rpa_image.imports():
return result.dict() # Return as dict for JSON compatibility
async def save_result_to_json(result: LiveStreamResult, data: LiveStreamProductWatchRequest):
try:
output_dir = Path("/data/live_watch")
output_dir.mkdir(exist_ok=True, parents=True)
# ulid = str(ULID())
ulid = data.live_stream_name
file_path = os.path.join(output_dir, f"{ulid}.json")
with open(file_path, "w", encoding="utf-8") as f:
json.dump(result.dict(), f, ensure_ascii=False, indent=2)
rpa_vol.commit() # Needed to make sure all changes are persisted before exit
logger.info(f"Saved result to JSON file: {file_path}")
rpa_dict[ulid] = result.dict() # setting a value
res_value = rpa_dict[ulid] # getting a value
logger.info(f"Saved result: {res_value}")
# rpa_queue.put(ulid) # adding a value
# queue_value = rpa_queue.get() # retrieving a value
# logger.info(f"Queued result: {queue_value}")
except IOError as e:
logger.error(f"Failed to save JSON file: {str(e)}")
async def get_live_status_by_web_page(page: Page, webcast_id: str):
live_status = None
logger.debug(f"Getting live status for webcast_id: {webcast_id}")
try:
await page.wait_for_selector("[class='pip-anchor']", timeout=3 * 000)
await page.wait_for_function(
"""() => {
const element = document.querySelector('[class="pip-anchor"]');
return element && element.innerText !== "";
}""",
timeout=3 * 000
)
live_status_js = """document.querySelector('[class="pip-anchor"]')?.innerText || ''"""
live_status_text: str = await page.evaluate(live_status_js)
if live_status_text.__contains__(LIVE_END):
logger.warning(f"live stream has ended,webcast_id:{webcast_id},{LIVE_END}")
live_status = LIVE_END
except BaseException as e:
logger.error(f"Failed to save JSON file: {str(e)}")
logger.info(f"Live status: {live_status}")
return live_status
def submit_job(data: LiveStreamProductWatchRequest):
call = rpa_run.spawn(data)
return call.object_id
@app.local_entrypoint()
async def local_run():
logger.info("Starting local RPA test")
@ -491,3 +490,5 @@ with rpa_image.imports():
# # 36861064178 333555252930
# title = await rpa_run.remote.aio(webcast_id="36861064178")
# logger.info(f"title = {title}")
########################################rpa_image end######################################################