From d1805be4d622ee31eca939b31cfb7fa4a3be972b Mon Sep 17 00:00:00 2001 From: "kyj@bowong.ai" Date: Thu, 17 Apr 2025 17:24:54 +0800 Subject: [PATCH] =?UTF-8?q?FIX=20AutoDL=E4=BF=AE=E5=A4=8D=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=A9=BA=E7=AD=89=E7=A9=BA=E6=8C=87=E9=92=88=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AutoDL/autodl_scheduling/entity/instance_pool.py | 11 +++++++---- AutoDL/autodl_scheduling/entity/waiting_queue.py | 4 ++-- AutoDL/autodl_scheduling/server.py | 10 +++++++--- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/AutoDL/autodl_scheduling/entity/instance_pool.py b/AutoDL/autodl_scheduling/entity/instance_pool.py index e4554c0..6689f06 100644 --- a/AutoDL/autodl_scheduling/entity/instance_pool.py +++ b/AutoDL/autodl_scheduling/entity/instance_pool.py @@ -84,22 +84,25 @@ class InstancePool: try: before=len(self.instances) instance_copy = copy.deepcopy(self.instances) - flag = False + flag = [] for instance in instance_copy: if instance.active: if (time.time() - instance.last_active_time) > self.timeout: - flag = True + flag.append(instance.uuid) self.intro_threads.append(self.executor.submit(self.remove_instance, instance=instance)) else: if (time.time() - instance.last_active_time) > self.scaledown_window: - flag = True + flag.append(instance.uuid) self.intro_threads.append(self.executor.submit(self.remove_instance, instance=instance)) while len(self.intro_threads) > 0: for t in self.intro_threads: t.result(timeout=self.timeout//2) self.intro_threads.remove(t) after = len(self.instances) - if flag: + for instance in self.instances: + if instance.uuid in flag: + raise Exception("Instance[%s] Remove Failed" % instance.uuid) + if len(flag) > 0: loguru.logger.info("Instance Num Before Introspecting %d After Introspecting %d" % (before, after)) except Exception as e: loguru.logger.error("Fail to Introspect Instances: %s" % e) diff --git a/AutoDL/autodl_scheduling/entity/waiting_queue.py b/AutoDL/autodl_scheduling/entity/waiting_queue.py index 2e7e8e8..3809afa 100644 --- a/AutoDL/autodl_scheduling/entity/waiting_queue.py +++ b/AutoDL/autodl_scheduling/entity/waiting_queue.py @@ -11,10 +11,10 @@ class WaitingQueue: "video_path": video_path, "audio_path": audio_path } - self.queue.put(data) + self.queue.put(data,timeout=10) def dequeue(self): - data = self.queue.get() + data = self.queue.get(timeout=10) return data["uid"], data["video_path"], data["audio_path"] def get_size(self): diff --git a/AutoDL/autodl_scheduling/server.py b/AutoDL/autodl_scheduling/server.py index 4b903fc..a3da9bd 100644 --- a/AutoDL/autodl_scheduling/server.py +++ b/AutoDL/autodl_scheduling/server.py @@ -22,7 +22,7 @@ class Server: self.waiting_queue = WaitingQueue() self.running_pool = RunningPool() #账号限制max_instance不能超过30 - self.instance_pool = InstancePool(max_instance=3) + self.instance_pool = InstancePool(max_instance=2) self.result_map = ResultMap() self.executor = ThreadPoolExecutor(max_workers=2) self.worker_1 = self.executor.submit(self.scaling_worker) @@ -50,7 +50,11 @@ class Server: try: return self.result_map.get(uid) except: - return {"status": "queuing", "msg":""} + l = list(self.waiting_queue.queue.queue) + for i in l: + if i["uid"] == uid: + return {"status": "queuing", "msg":""} + return {"status": "not found", "msg":""} async def get_all_result(self): try: @@ -74,7 +78,7 @@ class Server: # 提交任务 self.instance_pool.scale_instance(self.waiting_queue.get_size()+self.running_pool.get_running_size(), disable_shrink=True) for instance in self.instance_pool.instances: - if not instance.active: + if not instance.active and self.waiting_queue.get_size() > 0: # 从等待队列取出任务 uid, video_path, audio_path = self.waiting_queue.dequeue() loguru.logger.info("Task[%s] Submitting to Instance[%s]" % (uid, instance.uuid))