# modalDeploy/AutoDL/autodl_scheduling/entity/running_pool.py
#
# 86 lines
# 3.3 KiB
# Python

import time
import traceback
import uuid
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Union
import loguru
import requests
from fastapi import UploadFile
# Time budget, in seconds (35 minutes), that a submitted task may remain in
# the "waiting" state. RunningPool.result compares it (plus a 10-second grace
# period) against the task's start time to decide when to report a timeout.
SERVER_TIME_OUT = 35 * 60
class RunningPool:
    """Submit tasks to a remote HTTP service and poll for their results.

    Every accepted task is recorded in ``self.tasks`` under the
    caller-supplied ``uid`` as a dict with these keys:

    * ``instance_id`` -- value passed to :meth:`run`, echoed back in results.
    * ``base_url``    -- endpoint the task was submitted to.
    * ``code``        -- identifier returned by the service on submission.
    * ``status``      -- ``False`` while pending; ``True`` once :meth:`result`
      has returned a final result or a timeout for the task.
    * ``start_time``  -- ``time.time()`` taken right after submission.

    Entries are never removed from ``self.tasks`` by this class.
    """

    def __init__(self):
        self.tasks = {}
        # NOTE(review): this bearer token is hard-coded in source control; it
        # should be loaded from configuration/environment and rotated.
        self._headers = {
            "Authorization": "Bearer 7468B79CA2CF53436C06B29A6F16451C"
        }

    def run(self, instance_id, uid, base_url: str,
            video_file: Union[UploadFile, str],
            audio_file: Union[UploadFile, str]) -> Union[bool, None]:
        """Submit one task and register it under ``uid``.

        Args:
            instance_id: Opaque identifier stored with the task and returned
                in the dicts produced by :meth:`result`.
            uid: Key under which the task is stored in ``self.tasks``.
            base_url: URL the submission is POSTed to; also used as the
                prefix of the polling URL.
            video_file: Value sent as the ``video_file`` form field.
            audio_file: Value sent as the ``audio_file`` form field.

        Returns:
            ``True`` when the service accepted the task, ``None`` when the
            submission failed for any reason (the failure is logged, never
            raised).
        """
        try:
            code = self._req(base_url, video_file, audio_file)
            if not code:
                raise Exception("[Code] is None")
        except Exception as e:
            loguru.logger.error("%s Task Submit Failed: %s" % (uid, e))
            return None

        self.tasks[uid] = {
            "instance_id": instance_id,
            "base_url": base_url,
            "code": code,
            "status": False,
            "start_time": time.time(),
        }
        return True

    def result(self, uid: str) -> Union[dict, None]:
        """Poll the service once for the task registered under ``uid``.

        Returns:
            * ``None`` while the service still reports ``"waiting"`` (and the
              time budget is not exhausted), or when the poll itself failed
              (non-200 response, network error, malformed payload); failures
              are logged and the task stays pending so it can be polled again.
            * ``{"status": "fail", "msg": "Instance Timeout", "instance_id": ...}``
              when the task has been waiting longer than
              ``SERVER_TIME_OUT + 10`` seconds.
            * ``{"status": ..., "msg": ..., "instance_id": ...}`` built from
              the service's payload once it reports any non-waiting status.

        Raises:
            KeyError: if ``uid`` was never registered by :meth:`run`.
        """
        if uid not in self.tasks:
            raise KeyError(uid)

        task = self.tasks[uid]
        try:
            # NOTE(review): no timeout is passed, so a stalled server blocks
            # this call indefinitely.
            resp = requests.get(task["base_url"] + "/" + task["code"], headers=self._headers)
            if resp.status_code != 200:
                raise Exception("%s Get Result Failed: %s" % (uid, resp.status_code))

            # Decode the body once instead of re-parsing it for every field.
            payload = resp.json()

            if payload["status"] == "waiting":
                if task["start_time"] + SERVER_TIME_OUT + 10 < time.time():
                    task["status"] = True
                    return {"status": "fail", "msg": "Instance Timeout", "instance_id": task["instance_id"]}
                return None

            # Build the result BEFORE flagging the task as finished: if the
            # payload lacks a field, the task must remain pending rather than
            # silently dropping out of the running count with no result.
            outcome = {
                "status": payload["status"],
                "msg": payload["msg"],
                "instance_id": task["instance_id"],
            }
            task["status"] = True
            return outcome
        except Exception as e:
            traceback.print_exc()
            loguru.logger.error("%s Get Result Failed: %s" % (uid, e))
            return None

    def get_running_size(self):
        """Return how many registered tasks are still pending (``status`` falsy)."""
        return sum(1 for task in self.tasks.values() if not task["status"])

    def _req(self, base_url, video_file, audio_file):
        """POST the submission and return the task code issued by the service.

        Returns the ``code`` field of the JSON response when the HTTP status
        is 200 and the payload's ``status`` is ``"success"``; otherwise
        ``None``. Network errors and malformed payloads propagate as
        exceptions (handled by :meth:`run`).
        """
        # NOTE(review): the values are sent as plain form fields; confirm the
        # service expects this when an UploadFile object is passed.
        data = {
            "video_file": video_file,
            "audio_file": audio_file
        }
        # NOTE(review): no timeout is passed, so a stalled server blocks this
        # call indefinitely.
        resp = requests.post(base_url, data, headers=self._headers, allow_redirects=True, stream=True)
        # The format string needs a %s placeholder; without it the % operator
        # raises TypeError and every submission is reported as failed.
        loguru.logger.info("Submit Response: %s" % resp.text)

        if resp.status_code != 200:
            return None

        payload = resp.json()
        if payload["status"] == "success":
            return payload["code"]
        return None
if __name__ == "__main__":
    # Manual smoke test: submit a single task and poll every 5 seconds until
    # RunningPool.result returns a final (truthy) result.
    rp = RunningPool()

    # run() expects (instance_id, uid, base_url, video_file, audio_file) and
    # returns True/None, not a task id, so the uid is generated here and
    # reused for polling. The instance id is only a placeholder label.
    task_uid = str(uuid.uuid4())
    submitted = rp.run(
        "debug-instance",
        task_uid,
        "https://u336391-b31a-07132bc4.westx.seetacloud.com:8443",
        "https://sucai-1324682537.cos.ap-shanghai.myqcloud.com/tiktok/video/1111.mp4",
        "https://sucai-1324682537.cos.ap-shanghai.myqcloud.com/tiktok/video/XBR037ruAZsA.mp3",
    )
    if not submitted:
        # Nothing was registered, so polling would only raise KeyError.
        raise SystemExit("task submission failed")

    while True:
        r = rp.result(task_uid)
        if r:
            loguru.logger.success(r)
            break
        loguru.logger.info("waiting...")
        time.sleep(5)