diff --git a/AutoDL/autodl_scheduling/entity/instance_pool.py b/AutoDL/autodl_scheduling/entity/instance_pool.py
index ff8c1d3..947a1b6 100644
--- a/AutoDL/autodl_scheduling/entity/instance_pool.py
+++ b/AutoDL/autodl_scheduling/entity/instance_pool.py
@@ -33,15 +33,16 @@ class InstancePool:
         self.instances:List[Instance] = []
         self.executor = ThreadPoolExecutor(max_workers=os.cpu_count()*2)
         self.threads = []
+        self.intro_threads = []
 
-    def scale_instance(self, target_instance):
+    def scale_instance(self, target_instance, disable_shrink=True):
         if target_instance + self.buffer_instance < self.min_instance:
             return self._scale(self.min_instance)
         if target_instance + self.buffer_instance > self.max_instance:
             return self._scale(self.max_instance)
         if target_instance + self.buffer_instance == len(self.instances):
             return True
-        return self._scale(target_instance + self.buffer_instance)
+        return self._scale(target_instance + self.buffer_instance, disable_shrink)
 
     def remove_instance(self, instance:Instance):
         if instance_operate(instance.uuid, "power_off"):
@@ -73,21 +74,34 @@ class InstancePool:
 
     def introspection(self):
         # Stop timed-out instances (running timeout and idle timeout)
+        before=len(self.instances)
         instance_copy = copy.deepcopy(self.instances)
+        flag = False
         for instance in instance_copy:
             if instance.active:
                 if (time.time() - instance.last_active_time) > self.timeout:
-                    self.threads.append(self.executor.submit(self.remove_instance, instance=instance))
+                    flag = True
+                    self.intro_threads.append(self.executor.submit(self.remove_instance, instance=instance))
             else:
                 if (time.time() - instance.last_active_time) > self.scaledown_window:
-                    self.threads.append(self.executor.submit(self.remove_instance, instance=instance))
+                    flag = True
+                    self.intro_threads.append(self.executor.submit(self.remove_instance, instance=instance))
+        while len(self.intro_threads) > 0:
+            for t in self.intro_threads:
+                t.result(timeout=self.timeout//2)
+                self.intro_threads.remove(t)
+        after = len(self.instances)
+        if flag:
+            loguru.logger.info("Instance Num Before Introspecting %d After Introspecting %d" % (before, after))
 
-    def _scale(self, target_instance:int):
-        loguru.logger.info("Instance Num Before Scaling %d ; Target %d" % (len(self.instances), target_instance))
+    def _scale(self, target_instance:int, disable_shrink=True):
         self.introspection()
         # Adjust the number of instances
         instance_copy = copy.deepcopy(self.instances)
         dest = target_instance - len(instance_copy)
+        if (disable_shrink and dest < 0) or dest == 0:
+            return True
+        loguru.logger.info("Instance Num Before Scaling %d ; Target %d" % (len(self.instances), target_instance))
         if dest < 0:
             dest = abs(dest)
             for instance in instance_copy:
diff --git a/AutoDL/autodl_scheduling/entity/running_pool.py b/AutoDL/autodl_scheduling/entity/running_pool.py
index 5bc8927..36aa859 100644
--- a/AutoDL/autodl_scheduling/entity/running_pool.py
+++ b/AutoDL/autodl_scheduling/entity/running_pool.py
@@ -1,4 +1,5 @@
 import time
+import traceback
 import uuid
 from concurrent.futures.thread import ThreadPoolExecutor
 from typing import Union
@@ -18,6 +19,8 @@ class RunningPool:
     def run(self, instance_id, uid, base_url:str, video_file:Union[UploadFile,str], audio_file:Union[UploadFile,str]) -> Union[bool, None]:
         try:
             code = self._req(base_url, video_file, audio_file)
+            if not code:
+                raise Exception("[Code] is None")
         except Exception as e:
             loguru.logger.error("%s Task Submit Failed: %s" % (uid, e))
             return None
@@ -41,6 +44,7 @@ class RunningPool:
                 return {"status": resp.json()['status'], "msg": resp.json()['msg'], "instance_id":
                             self.tasks[uid]["instance_id"]}
             except Exception as e:
+                traceback.print_exc()
                 loguru.logger.error("%s Get Result Failed: %s" % (uid, e))
                 return None
         else:
diff --git a/AutoDL/autodl_scheduling/server.py b/AutoDL/autodl_scheduling/server.py
index 8e108f2..1d79ccf 100644
--- a/AutoDL/autodl_scheduling/server.py
+++ b/AutoDL/autodl_scheduling/server.py
@@ -53,20 +53,17 @@ class Server:
             return {"status": "queuing", "msg":""}
 
     def introspect_instance(self):
-        loguru.logger.info("Introspecting worker started")
+        loguru.logger.info("Introspecting worker started || Scaledown Window: %ds" % self.instance_pool.scaledown_window)
         while True:
             self.instance_pool.introspection()
-            time.sleep(0.5)
+            time.sleep(1)
 
     def scaling_worker(self):
         loguru.logger.info("Scaling worker started")
         try:
-            last_target = 0
             while True:
                 # Submit tasks
-                if last_target != self.waiting_queue.get_size()+self.running_pool.get_running_size():
-                    last_target = self.waiting_queue.get_size()+self.running_pool.get_running_size()
-                    self.instance_pool.scale_instance(last_target)
+                self.instance_pool.scale_instance(self.waiting_queue.get_size()+self.running_pool.get_running_size(), disable_shrink=True)
                 for instance in self.instance_pool.instances:
                     if not instance.active:
                         # Take a task from the waiting queue