diff --git a/workflow_service/database/api.py b/workflow_service/database/api.py index 13c3520..54026e9 100644 --- a/workflow_service/database/api.py +++ b/workflow_service/database/api.py @@ -40,16 +40,23 @@ async def save_workflow(name: str, workflow_json: str): async def get_all_workflows() -> List[dict]: """获取所有工作流(最新版本)""" async with AsyncSessionLocal() as session: - # 使用子查询获取每个base_name的最新版本 + # 使用SQLAlchemy ORM语法,通过子查询获取每个base_name的最新版本 + from sqlalchemy import func + + # 子查询:获取每个base_name的最新版本 + latest_versions = ( + select(Workflow.base_name, func.max(Workflow.version).label("max_version")) + .group_by(Workflow.base_name) + .subquery() + ) + + # 主查询:关联获取完整的工作流信息 stmt = ( select(Workflow) - .where( - Workflow.version - == select(Workflow.version) - .where(Workflow.base_name == Workflow.base_name) - .order_by(Workflow.version.desc()) - .limit(1) - .scalar_subquery() + .join( + latest_versions, + (Workflow.base_name == latest_versions.c.base_name) + & (Workflow.version == latest_versions.c.max_version), ) .order_by(Workflow.base_name) ) @@ -256,14 +263,15 @@ async def get_running_workflow_runs() -> List[dict]: return [run.to_dict() for run in runs] -async def get_workflow_runs_recent(start_time: datetime, end_time: datetime) -> List[dict]: +async def get_workflow_runs_recent( + start_time: datetime, end_time: datetime +) -> List[dict]: """获取指定时间范围内的最近工作流运行记录""" async with AsyncSessionLocal() as session: stmt = ( select(WorkflowRun) .where( - WorkflowRun.created_at >= start_time, - WorkflowRun.created_at <= end_time + WorkflowRun.created_at >= start_time, WorkflowRun.created_at <= end_time ) .order_by(WorkflowRun.created_at.desc()) )