import sqlite3 import json from http.server import HTTPServer, BaseHTTPRequestHandler import urllib.parse # --- 配置 --- DATABASE_FILE = "workflows.sqlite" PORT = 8000 # --- 数据库初始化 --- def init_db(): """初始化数据库,如果表不存在则创建它""" with sqlite3.connect(DATABASE_FILE) as conn: cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS workflows ( name TEXT PRIMARY KEY, workflow_json TEXT NOT NULL ) """) conn.commit() print(f"数据库 '{DATABASE_FILE}' 已准备就绪。") # --- API 处理器 --- class PersistentAPIHandler(BaseHTTPRequestHandler): def _send_cors_headers(self): """发送CORS头部""" self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS') self.send_header("Access-Control-Allow-Headers", "X-Requested-With, Content-Type") def do_OPTIONS(self): """处理预检请求""" self.send_response(200, "ok") self._send_cors_headers() self.end_headers() def do_GET(self): """处理获取所有工作流的请求""" if self.path == '/api/workflow': try: with sqlite3.connect(DATABASE_FILE) as conn: conn.row_factory = sqlite3.Row # 让查询结果可以像字典一样访问 cursor = conn.cursor() cursor.execute("SELECT name, workflow_json FROM workflows") rows = cursor.fetchall() # 将数据库行转换为前端期望的JSON格式 workflows_list = [ {"name": row["name"], "workflow": json.loads(row["workflow_json"])} for row in rows ] self.send_response(200) self.send_header('Content-type', 'application/json') self._send_cors_headers() self.end_headers() self.wfile.write(json.dumps(workflows_list).encode()) except Exception as e: self._send_error(f"获取工作流时出错: {e}") else: self._send_error("路径未找到", 404) def do_POST(self): """处理发布/更新工作流的请求""" if self.path == '/api/workflow': try: content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) data = json.loads(post_data) name = data.get('name') workflow_data = data.get('workflow') if not name or not workflow_data: return self._send_error("请求体中缺少 'name' 或 'workflow'", 400) workflow_json_str = json.dumps(workflow_data) with sqlite3.connect(DATABASE_FILE) as conn: cursor = conn.cursor() # 使用 INSERT OR REPLACE 来处理创建和更新,非常方便 cursor.execute( "INSERT OR REPLACE INTO workflows (name, workflow_json) VALUES (?, ?)", (name, workflow_json_str) ) conn.commit() print(f"--- 已保存/更新工作流: {name} ---") self.send_response(200) self.send_header('Content-type', 'application/json') self._send_cors_headers() self.end_headers() self.wfile.write(json.dumps({"status": "success", "name": name}).encode()) except Exception as e: self._send_error(f"保存工作流时出错: {e}") else: self._send_error("路径未找到", 404) def do_DELETE(self): """处理删除指定工作流的请求""" parts = self.path.split('/api/workflow/') if len(parts) == 2 and parts[1]: workflow_name_to_delete = urllib.parse.unquote(parts[1]) try: with sqlite3.connect(DATABASE_FILE) as conn: cursor = conn.cursor() cursor.execute("DELETE FROM workflows WHERE name = ?", (workflow_name_to_delete,)) conn.commit() # 检查是否真的有行被删除了 if cursor.rowcount > 0: print(f"--- 已删除工作流: {workflow_name_to_delete} ---") self.send_response(200) self.send_header('Content-type', 'application/json') self._send_cors_headers() self.end_headers() self.wfile.write(json.dumps({"status": "deleted", "name": workflow_name_to_delete}).encode()) else: self._send_error("工作流未找到,无法删除", 404) except Exception as e: self._send_error(f"删除工作流时出错: {e}") else: self._send_error("无效的删除路径", 400) def _send_error(self, message, code=500): self.send_response(code) self.send_header('Content-type', 'application/json') self._send_cors_headers() self.end_headers() self.wfile.write(json.dumps({"status": "error", "message": message}).encode()) # --- 主执行函数 --- def run(server_class=HTTPServer, handler_class=PersistentAPIHandler, port=PORT): # 1. 初始化数据库 init_db() # 2. 启动服务器 server_address = ('', port) httpd = server_class(server_address, handler_class) print(f"\n持久化API服务器已在 http://localhost:{port} 上启动...") print(f"数据库文件: ./{DATABASE_FILE}") print(" - POST /api/workflow -> 发布/更新工作流") print(" - GET /api/workflow -> 获取所有工作流") print(" - DELETE /api/workflow/ -> 删除指定工作流") print("\n按 Ctrl+C 停止服务器。") httpd.serve_forever() if __name__ == '__main__': run()