diff --git a/AutoDL/autodl_scheduling/entity/instance_pool.py b/AutoDL/autodl_scheduling/entity/instance_pool.py index e4554c0..6689f06 100644 --- a/AutoDL/autodl_scheduling/entity/instance_pool.py +++ b/AutoDL/autodl_scheduling/entity/instance_pool.py @@ -84,22 +84,25 @@ class InstancePool: try: before=len(self.instances) instance_copy = copy.deepcopy(self.instances) - flag = False + flag = [] for instance in instance_copy: if instance.active: if (time.time() - instance.last_active_time) > self.timeout: - flag = True + flag.append(instance.uuid) self.intro_threads.append(self.executor.submit(self.remove_instance, instance=instance)) else: if (time.time() - instance.last_active_time) > self.scaledown_window: - flag = True + flag.append(instance.uuid) self.intro_threads.append(self.executor.submit(self.remove_instance, instance=instance)) while len(self.intro_threads) > 0: for t in self.intro_threads: t.result(timeout=self.timeout//2) self.intro_threads.remove(t) after = len(self.instances) - if flag: + for instance in self.instances: + if instance.uuid in flag: + raise Exception("Instance[%s] Remove Failed" % instance.uuid) + if len(flag) > 0: loguru.logger.info("Instance Num Before Introspecting %d After Introspecting %d" % (before, after)) except Exception as e: loguru.logger.error("Fail to Introspect Instances: %s" % e) diff --git a/AutoDL/autodl_scheduling/entity/waiting_queue.py b/AutoDL/autodl_scheduling/entity/waiting_queue.py index 2e7e8e8..3809afa 100644 --- a/AutoDL/autodl_scheduling/entity/waiting_queue.py +++ b/AutoDL/autodl_scheduling/entity/waiting_queue.py @@ -11,10 +11,10 @@ class WaitingQueue: "video_path": video_path, "audio_path": audio_path } - self.queue.put(data) + self.queue.put(data,timeout=10) def dequeue(self): - data = self.queue.get() + data = self.queue.get(timeout=10) return data["uid"], data["video_path"], data["audio_path"] def get_size(self): diff --git a/AutoDL/autodl_scheduling/server.py b/AutoDL/autodl_scheduling/server.py index 4b903fc..a3da9bd 100644 --- a/AutoDL/autodl_scheduling/server.py +++ b/AutoDL/autodl_scheduling/server.py @@ -22,7 +22,7 @@ class Server: self.waiting_queue = WaitingQueue() self.running_pool = RunningPool() #账号限制max_instance不能超过30 - self.instance_pool = InstancePool(max_instance=3) + self.instance_pool = InstancePool(max_instance=2) self.result_map = ResultMap() self.executor = ThreadPoolExecutor(max_workers=2) self.worker_1 = self.executor.submit(self.scaling_worker) @@ -50,7 +50,11 @@ class Server: try: return self.result_map.get(uid) except: - return {"status": "queuing", "msg":""} + l = list(self.waiting_queue.queue.queue) + for i in l: + if i["uid"] == uid: + return {"status": "queuing", "msg":""} + return {"status": "not found", "msg":""} async def get_all_result(self): try: @@ -74,7 +78,7 @@ class Server: # 提交任务 self.instance_pool.scale_instance(self.waiting_queue.get_size()+self.running_pool.get_running_size(), disable_shrink=True) for instance in self.instance_pool.instances: - if not instance.active: + if not instance.active and self.waiting_queue.get_size() > 0: # 从等待队列取出任务 uid, video_path, audio_path = self.waiting_queue.dequeue() loguru.logger.info("Task[%s] Submitting to Instance[%s]" % (uid, instance.uuid))