|
|
||
|---|---|---|
| .. | ||
| ai_video | ||
| api | ||
| audio_processing | ||
| cli | ||
| database | ||
| models | ||
| scene_detection | ||
| services | ||
| utils | ||
| video_processing | ||
| __init__.py | ||
| build.spec | ||
| build_enhanced.bat | ||
| build_enhanced.spec | ||
| build_simple.spec | ||
| build_test.bat | ||
| config.py | ||
| kv.py | ||
| main.py | ||
| main_simple.py | ||
| mock_services.py | ||
| readme.md | ||
| requirements.txt | ||
| test_functionality.py | ||
readme.md
Python Core 架构设计
🎯 硬性要求
- ✅ 命令行集成 - 将所有功能集成到命令行触发
- ✅ 进度反馈 - 所有命令行功能都基于 JSON RPC Progress 带进度条反馈
- ✅ API迁移 - 所有命令行依赖的功能后期要迁移到API,支持无痛切换
💾 存储设计
- ✅ 多存储支持 - 支持多种存储方式切换
- ✅ 当前实现 - 存储在JSON文件
- ✅ 未来扩展 - 可方便切换为数据库/MongoDB等
🏗️ 整体架构
分层架构设计
┌─────────────────────────────────────────────────────────────┐
│ CLI Layer (命令行层) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ MediaManager │ │ SceneDetection │ │ TemplateManager │ │
│ │ Commander │ │ Commander │ │ Commander │ │
│ │ (进度条) │ │ (进度条) │ │ (进度条) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Service Layer (服务层) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ MediaManager │ │ SceneDetection │ │ TemplateManager │ │
│ │ Service │ │ Service │ │ Service │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Storage Layer (存储层) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ JSON Storage │ │ Database │ │ MongoDB │ │
│ │ (当前) │ │ Storage │ │ Storage │ │
│ │ │ │ (未来) │ │ (未来) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Utils Layer (工具层) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Progress │ │ Logger │ │ Config │ │
│ │ Commander │ │ Utils │ │ Manager │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
核心设计原则
1. 命令行优先 (CLI-First)
- 🎯 所有功能都通过命令行暴露
- 📊 统一的进度反馈机制
- 🔧 标准化的参数处理
2. 服务分离 (Service Separation)
- 🧩 业务逻辑与CLI分离
- 🔌 服务可独立测试和复用
- 📦 清晰的接口定义
3. 存储抽象 (Storage Abstraction)
- 💾 存储接口标准化
- 🔄 多种存储实现
- 🚀 无缝切换能力
4. 进度统一 (Progress Unified)
- 📈 JSON-RPC 2.0 进度协议
- ⏱️ 实时进度反馈
- 📊 统一的进度管理
📁 目录结构设计
python_core/
├── services/ # 服务层
│ ├── media_manager/ # 媒体管理服务
│ │ ├── __init__.py
│ │ ├── types.py # 数据类型
│ │ ├── storage.py # 存储接口
│ │ ├── manager.py # 业务逻辑
│ │ └── cli.py # 命令行接口
│ ├── scene_detection/ # 场景检测服务
│ │ ├── __init__.py
│ │ ├── types.py
│ │ ├── detector.py
│ │ └── cli.py
│ └── template_manager/ # 模板管理服务
│ ├── __init__.py
│ ├── types.py
│ ├── manager.py
│ └── cli.py
├── storage/ # 存储层
│ ├── __init__.py
│ ├── base.py # 存储基类
│ ├── json_storage.py # JSON存储实现
│ ├── database_storage.py # 数据库存储实现
│ └── mongo_storage.py # MongoDB存储实现
├── utils/ # 工具层
│ ├── __init__.py
│ ├── commander/ # 命令行工具
│ │ ├── __init__.py
│ │ └── base.py
│ ├── progress/ # 进度管理
│ │ ├── __init__.py
│ │ ├── commander.py
│ │ └── reporter.py
│ ├── logger.py # 日志工具
│ └── config.py # 配置管理
├── config.py # 全局配置
└── readme.md # 架构文档
🔧 核心组件设计
1. 进度命令基类
class ProgressJSONRPCCommander(ABC):
"""带进度的JSON-RPC命令基类"""
def _is_progressive_command(self, command: str) -> bool:
"""判断是否需要进度报告"""
pass
def _execute_with_progress(self, command: str, args: dict) -> Any:
"""执行带进度的命令"""
pass
def _execute_simple_command(self, command: str, args: dict) -> Any:
"""执行简单命令"""
pass
def create_task(self, name: str, total: int):
"""创建进度任务"""
pass
2. 存储抽象接口
class StorageInterface(ABC):
"""存储接口基类"""
@abstractmethod
def save(self, key: str, data: Any) -> bool:
"""保存数据"""
pass
@abstractmethod
def load(self, key: str) -> Any:
"""加载数据"""
pass
@abstractmethod
def delete(self, key: str) -> bool:
"""删除数据"""
pass
@abstractmethod
def exists(self, key: str) -> bool:
"""检查数据是否存在"""
pass
3. 服务基类
class ServiceBase(ABC):
"""服务基类"""
def __init__(self, storage: StorageInterface):
self.storage = storage
@abstractmethod
def get_service_name(self) -> str:
"""获取服务名称"""
pass
🚀 实现策略
阶段1: 当前实现 (JSON存储)
- ✅ JSON文件存储 - 简单、快速、易调试
- ✅ 进度命令行 - 统一的进度反馈
- ✅ 模块化服务 - 清晰的服务分离
阶段2: 存储扩展
- 🔄 数据库支持 - SQLite/PostgreSQL/MySQL
- 🔄 NoSQL支持 - MongoDB/Redis
- 🔄 云存储支持 - AWS S3/阿里云OSS
阶段3: API化
- 🌐 REST API - 标准HTTP接口
- 📡 WebSocket - 实时进度推送
- 🔌 GraphQL - 灵活的数据查询
阶段4: 微服务化
- 🐳 容器化 - Docker部署
- ⚖️ 负载均衡 - 高可用架构
- 📊 监控告警 - 完整的运维体系
📊 进度协议设计
JSON-RPC 2.0 进度协议
{
"jsonrpc": "2.0",
"method": "progress",
"params": {
"step": "service_name",
"progress": 0.65,
"message": "处理中: 文件名 (3/5)",
"timestamp": 1752242302.647,
"details": {
"current": 3,
"total": 5,
"elapsed_time": 2.5,
"estimated_remaining": 1.2,
"throughput": "1.2 files/sec"
}
}
}
进度状态管理
class ProgressTask:
"""进度任务管理"""
def __init__(self, name: str, total: int):
self.name = name
self.total = total
self.current = 0
self.start_time = time.time()
def update(self, current: int = None, message: str = None):
"""更新进度"""
pass
def finish(self, message: str = "任务完成"):
"""完成任务"""
pass
🔌 API迁移设计
无痛切换策略
1. 接口标准化
# 命令行接口
def execute_command(command: str, args: dict) -> dict:
"""执行命令"""
pass
# API接口 (未来)
@app.post("/api/v1/{service}/{command}")
async def api_execute_command(service: str, command: str, args: dict) -> dict:
"""API执行命令"""
# 直接调用相同的服务逻辑
return service_instance.execute_command(command, args)
2. 进度推送
# 命令行进度 (当前)
def progress_callback(progress_data: dict):
print(json.dumps(progress_data))
# WebSocket进度 (未来)
async def websocket_progress(websocket: WebSocket, progress_data: dict):
await websocket.send_json(progress_data)
3. 配置统一
class Config:
"""统一配置管理"""
# 存储配置
STORAGE_TYPE = "json" # json/database/mongodb
STORAGE_PATH = "data/"
# API配置
API_ENABLED = False
API_HOST = "0.0.0.0"
API_PORT = 8000
# 进度配置
PROGRESS_ENABLED = True
PROGRESS_WEBSOCKET = False
📈 扩展性设计
1. 插件化架构
class PluginManager:
"""插件管理器"""
def register_service(self, service_class: Type[ServiceBase]):
"""注册服务"""
pass
def register_storage(self, storage_class: Type[StorageInterface]):
"""注册存储"""
pass
def register_commander(self, commander_class: Type[ProgressJSONRPCCommander]):
"""注册命令行"""
pass
2. 配置驱动
# config.yaml
services:
media_manager:
enabled: true
storage: json
scene_detection:
enabled: true
storage: json
storage:
json:
path: "./data"
database:
url: "postgresql://user:pass@localhost/db"
mongodb:
url: "mongodb://localhost:27017/db"
api:
enabled: false
host: "0.0.0.0"
port: 8000
3. 监控和日志
class ServiceMonitor:
"""服务监控"""
def track_command_execution(self, service: str, command: str, duration: float):
"""跟踪命令执行"""
pass
def track_progress(self, service: str, progress: float):
"""跟踪进度"""
pass
def track_error(self, service: str, error: Exception):
"""跟踪错误"""
pass
🎯 使用示例
当前使用方式 (命令行)
# 媒体管理
python -m python_core.services.media_manager upload video.mp4
# 场景检测
python -m python_core.services.scene_detection batch_detect /videos
# 模板管理
python -m python_core.services.template_manager import /templates
未来使用方式 (API)
# REST API
curl -X POST http://localhost:8000/api/v1/media_manager/upload \
-H "Content-Type: application/json" \
-d '{"video_path": "video.mp4"}'
# WebSocket进度
ws://localhost:8000/ws/progress
混合使用
# 程序化调用
from python_core.services.media_manager import MediaManagerService
service = MediaManagerService()
result = service.upload_video("video.mp4")
🎉 架构优势
1. 统一性
- 🎯 命令行优先 - 所有功能都可通过CLI访问
- 📊 进度统一 - 统一的进度反馈机制
- 🔧 接口标准 - 标准化的服务接口
2. 可扩展性
- 🔌 插件化 - 易于添加新服务
- 💾 存储灵活 - 支持多种存储后端
- 🌐 API就绪 - 无痛迁移到API
3. 可维护性
- 🧩 模块化 - 清晰的模块分离
- 📝 文档完整 - 详细的架构文档
- 🧪 测试友好 - 易于单元测试
4. 用户友好
- 📊 进度可视 - 实时进度反馈
- 🔧 配置灵活 - 丰富的配置选项
- 📄 输出多样 - 多种输出格式
这个架构设计完全满足您的硬性要求,提供了从当前JSON存储到未来API化的完整迁移路径!
- 文件存储 当前是存储到本地 后续会存储到远程 需要无痛切换