import time import traceback import uuid from concurrent.futures.thread import ThreadPoolExecutor from typing import Union import loguru import requests from fastapi import UploadFile SERVER_TIME_OUT = 60 * 35 class RunningPool: def __init__(self): self.tasks = {} self._headers = { "Authorization": "Bearer 7468B79CA2CF53436C06B29A6F16451C" } def run(self, instance_id, uid, base_url:str, video_file:Union[UploadFile,str], audio_file:Union[UploadFile,str]) -> Union[bool, None]: try: code = self._req(base_url, video_file, audio_file) if not code: raise Exception("[Code] is None") except Exception as e: loguru.logger.error("%s Task Submit Failed: %s" % (uid, e)) return None self.tasks[uid] = {"instance_id": instance_id, "base_url":base_url, "code": code, "status": False, "start_time": time.time()} return True def result(self, uid:str) -> Union[dict, None]: if uid in self.tasks: try: resp = requests.get(self.tasks[uid]["base_url"] + "/" + self.tasks[uid]["code"], headers=self._headers, timeout=10) if resp.status_code != 200: raise Exception("%s Get Result Failed: %s" % (uid, resp.status_code)) else: if resp.json()["status"] == "waiting": if self.tasks[uid]["start_time"] + SERVER_TIME_OUT + 10 < time.time(): self.tasks[uid]["status"] = True return {"status": "fail", "msg": "Instance Timeout", "instance_id": self.tasks[uid]["instance_id"]} return None else: self.tasks[uid]["status"] = True return {"status": resp.json()['status'], "msg": resp.json()['msg'], "instance_id": self.tasks[uid]["instance_id"]} except Exception as e: traceback.print_exc() loguru.logger.error("%s Get Result Failed: %s" % (uid, e)) return None else: raise KeyError(uid) def get_running_size(self): return len([i for i in self.tasks.values() if not i["status"]]) def _req(self, base_url, video_file, audio_file): data = { "video_file": video_file, "audio_file": audio_file } resp = requests.post(base_url, data, headers=self._headers, allow_redirects=True, stream=True) loguru.logger.info("Submit Response: %s" % resp.text) if resp.status_code == 200: if resp.json()["status"] == "success": code = resp.json()["code"] return code else: return None else: return None if __name__ == "__main__": rp = RunningPool() id = rp.run("https://u336391-b31a-07132bc4.westx.seetacloud.com:8443", "https://sucai-1324682537.cos.ap-shanghai.myqcloud.com/tiktok/video/1111.mp4", "https://sucai-1324682537.cos.ap-shanghai.myqcloud.com/tiktok/video/XBR037ruAZsA.mp3") while True: r = rp.result(id) if r: loguru.logger.success(r) break else: loguru.logger.info("waiting...") time.sleep(5)