From 591989e82608a43b57fee729d7c5e14ab3b4d608 Mon Sep 17 00:00:00 2001 From: "kyj@bowong.ai" Date: Wed, 30 Jul 2025 18:22:58 +0800 Subject: [PATCH] =?UTF-8?q?perf=20=E4=BC=98=E5=8C=96demo=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- workflow_server_demo.py | 192 +++++++++++++++++++++++++--------------- 1 file changed, 121 insertions(+), 71 deletions(-) diff --git a/workflow_server_demo.py b/workflow_server_demo.py index ba04602..0ffda3e 100644 --- a/workflow_server_demo.py +++ b/workflow_server_demo.py @@ -1,108 +1,158 @@ -from http.server import HTTPServer, BaseHTTPRequestHandler +import sqlite3 import json -import threading -import urllib +from http.server import HTTPServer, BaseHTTPRequestHandler +import urllib.parse -# "数据库" 和线程锁保持不变 -WORKFLOWS_DB = {} -db_lock = threading.Lock() +# --- 配置 --- +DATABASE_FILE = "workflows.sqlite" +PORT = 8000 -class SimpleAPIHandler(BaseHTTPRequestHandler): +# --- 数据库初始化 --- +def init_db(): + """初始化数据库,如果表不存在则创建它""" + with sqlite3.connect(DATABASE_FILE) as conn: + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS workflows ( + name TEXT PRIMARY KEY, + workflow_json TEXT NOT NULL + ) + """) + conn.commit() + print(f"数据库 '{DATABASE_FILE}' 已准备就绪。") + + +# --- API 处理器 --- +class PersistentAPIHandler(BaseHTTPRequestHandler): def _send_cors_headers(self): + """发送CORS头部""" self.send_header('Access-Control-Allow-Origin', '*') - self.send_header('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS') # 添加 DELETE + self.send_header('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS') self.send_header("Access-Control-Allow-Headers", "X-Requested-With, Content-Type") def do_OPTIONS(self): + """处理预检请求""" self.send_response(200, "ok") self._send_cors_headers() self.end_headers() def do_GET(self): - # ... (do_GET 保持不变) ... + """处理获取所有工作流的请求""" if self.path == '/api/workflow': - self.send_response(200) - self.send_header('Content-type', 'application/json') - self._send_cors_headers() - self.end_headers() - with db_lock: - response_data = list(WORKFLOWS_DB.values()) - self.wfile.write(json.dumps(response_data).encode()) - else: - self.send_response(404) - self.end_headers() - self.wfile.write(b'Not Found') - - def do_POST(self): - # ... (do_POST 保持不变) ... - if self.path == '/api/workflow': - content_length = int(self.headers['Content-Length']) - post_data = self.rfile.read(content_length) try: - data = json.loads(post_data) - name = data.get('name') - workflow_data = data.get('workflow') - if not name or not workflow_data: - raise ValueError("Missing 'name' or 'workflow' in request body") - with db_lock: - WORKFLOWS_DB[name] = {"name": name, "workflow": workflow_data} - print(f"--- Received/Updated workflow: {name} ---") + with sqlite3.connect(DATABASE_FILE) as conn: + conn.row_factory = sqlite3.Row # 让查询结果可以像字典一样访问 + cursor = conn.cursor() + cursor.execute("SELECT name, workflow_json FROM workflows") + rows = cursor.fetchall() + + # 将数据库行转换为前端期望的JSON格式 + workflows_list = [ + {"name": row["name"], "workflow": json.loads(row["workflow_json"])} + for row in rows + ] + self.send_response(200) self.send_header('Content-type', 'application/json') self._send_cors_headers() self.end_headers() - self.wfile.write(json.dumps({"status": "received", "name": name}).encode()) - except Exception as e: - self.send_response(400); - self.end_headers(); - self.wfile.write(f'Bad Request: {e}'.encode()) - else: - self.send_response(404); - self.end_headers(); - self.wfile.write(b'Not Found') + self.wfile.write(json.dumps(workflows_list).encode()) + + except Exception as e: + self._send_error(f"获取工作流时出错: {e}") + else: + self._send_error("路径未找到", 404) + + def do_POST(self): + """处理发布/更新工作流的请求""" + if self.path == '/api/workflow': + try: + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + data = json.loads(post_data) + + name = data.get('name') + workflow_data = data.get('workflow') + + if not name or not workflow_data: + return self._send_error("请求体中缺少 'name' 或 'workflow'", 400) + + workflow_json_str = json.dumps(workflow_data) + + with sqlite3.connect(DATABASE_FILE) as conn: + cursor = conn.cursor() + # 使用 INSERT OR REPLACE 来处理创建和更新,非常方便 + cursor.execute( + "INSERT OR REPLACE INTO workflows (name, workflow_json) VALUES (?, ?)", + (name, workflow_json_str) + ) + conn.commit() + + print(f"--- 已保存/更新工作流: {name} ---") + self.send_response(200) + self.send_header('Content-type', 'application/json') + self._send_cors_headers() + self.end_headers() + self.wfile.write(json.dumps({"status": "success", "name": name}).encode()) + + except Exception as e: + self._send_error(f"保存工作流时出错: {e}") + else: + self._send_error("路径未找到", 404) - # --- [核心新增] 添加 do_DELETE 方法 --- def do_DELETE(self): """处理删除指定工作流的请求""" - # 我们期望的路径格式: /api/workflow/工作流名称[版本号] parts = self.path.split('/api/workflow/') if len(parts) == 2 and parts[1]: - # URL解码工作流名称,因为它可能包含空格等特殊字符 workflow_name_to_delete = urllib.parse.unquote(parts[1]) + try: + with sqlite3.connect(DATABASE_FILE) as conn: + cursor = conn.cursor() + cursor.execute("DELETE FROM workflows WHERE name = ?", (workflow_name_to_delete,)) + conn.commit() - with db_lock: - if workflow_name_to_delete in WORKFLOWS_DB: - del WORKFLOWS_DB[workflow_name_to_delete] - print(f"--- Deleted workflow: {workflow_name_to_delete} ---") - self.send_response(200) - self.send_header('Content-type', 'application/json') - self._send_cors_headers() - self.end_headers() - self.wfile.write(json.dumps({"status": "deleted", "name": workflow_name_to_delete}).encode()) - else: - # 如果找不到要删除的工作流 - self.send_response(404) - self._send_cors_headers() - self.end_headers() - self.wfile.write(json.dumps({"status": "error", "message": "Workflow not found"}).encode()) + # 检查是否真的有行被删除了 + if cursor.rowcount > 0: + print(f"--- 已删除工作流: {workflow_name_to_delete} ---") + self.send_response(200) + self.send_header('Content-type', 'application/json') + self._send_cors_headers() + self.end_headers() + self.wfile.write(json.dumps({"status": "deleted", "name": workflow_name_to_delete}).encode()) + else: + self._send_error("工作流未找到,无法删除", 404) + + except Exception as e: + self._send_error(f"删除工作流时出错: {e}") else: - self.send_response(400) - self._send_cors_headers() - self.end_headers() - self.wfile.write(b'Bad Request: Invalid delete path') + self._send_error("无效的删除路径", 400) + + def _send_error(self, message, code=500): + self.send_response(code) + self.send_header('Content-type', 'application/json') + self._send_cors_headers() + self.end_headers() + self.wfile.write(json.dumps({"status": "error", "message": message}).encode()) -def run(server_class=HTTPServer, handler_class=SimpleAPIHandler, port=8000): +# --- 主执行函数 --- +def run(server_class=HTTPServer, handler_class=PersistentAPIHandler, port=PORT): + # 1. 初始化数据库 + init_db() + + # 2. 启动服务器 server_address = ('', port) httpd = server_class(server_address, handler_class) - print(f"测试API服务器已在 http://localhost:{port} 上启动...") - print(" - POST /api/workflow: 发布新工作流") - print(" - GET /api/workflow: 获取所有工作流") - print(" - DELETE /api/workflow/: 删除指定工作流") + print(f"\n持久化API服务器已在 http://localhost:{port} 上启动...") + print(f"数据库文件: ./{DATABASE_FILE}") + print(" - POST /api/workflow -> 发布/更新工作流") + print(" - GET /api/workflow -> 获取所有工作流") + print(" - DELETE /api/workflow/ -> 删除指定工作流") + print("\n按 Ctrl+C 停止服务器。") httpd.serve_forever() if __name__ == '__main__': - run() + run() \ No newline at end of file