feat: 添加详细的AI分类日志系统

日志增强:
- Gemini API访问令牌获取详细日志
- 视频上传过程完整日志记录
- AI内容分析请求和响应日志
- 视频分类服务处理流程日志
- 任务队列处理状态详细跟踪

 日志内容:
- URL请求地址和参数
- HTTP状态码和响应头
- 请求和响应体内容
- 处理耗时统计
- 错误详细信息和堆栈

 问题诊断:
- 便于分析404 Not Found错误原因
- 跟踪API调用完整流程
- 监控性能瓶颈
- 调试响应解析问题

这些日志将帮助快速定位和解决AI分类功能的问题。
This commit is contained in:
imeepos 2025-07-14 13:06:35 +08:00
parent a743bedd98
commit b0ea168db8
3 changed files with 202 additions and 42 deletions

View File

@ -237,41 +237,55 @@ impl VideoClassificationQueue {
/// 处理下一个任务
async fn process_next_task(&self) -> Result<Option<String>> {
println!("🔍 查找待处理任务...");
// 获取待处理任务
let pending_tasks = self.service.get_pending_tasks(Some(1)).await?;
if let Some(task) = pending_tasks.first() {
let task_id = task.id.clone();
println!("📋 找到待处理任务: {}", task_id);
println!("📁 任务视频文件: {}", task.video_file_path);
println!("🔢 任务优先级: {}", task.priority);
// 设置当前任务
{
let mut current_task = self.current_task.lock().await;
*current_task = Some(task_id.clone());
println!("🎯 设置当前任务: {}", task_id);
}
// 更新任务进度
println!("📊 更新任务进度: 开始处理");
self.update_task_progress(&task_id, TaskStatus::Uploading, 10.0, "开始处理").await;
let task_start_time = std::time::Instant::now();
// 处理任务
println!("🚀 开始处理分类任务: {}", task_id);
match self.service.process_classification_task(&task_id).await {
Ok(_) => {
let task_duration = task_start_time.elapsed();
self.update_task_progress(&task_id, TaskStatus::Completed, 100.0, "处理完成").await;
println!("任务处理成功: {}", task_id);
println!("任务处理成功: {} (耗时: {:?})", task_id, task_duration);
}
Err(e) => {
let task_duration = task_start_time.elapsed();
self.update_task_progress_with_error(&task_id, TaskStatus::Failed, e.to_string()).await;
println!("任务处理失败: {} - {}", task_id, e);
println!("任务处理失败: {} - {} (耗时: {:?})", task_id, e, task_duration);
}
}
// 清除当前任务
{
let mut current_task = self.current_task.lock().await;
*current_task = None;
println!("🔄 清除当前任务");
}
Ok(Some(task_id))
} else {
println!("⏸️ 没有待处理任务,等待中...");
Ok(None)
}
}

View File

@ -121,19 +121,56 @@ impl VideoClassificationService {
/// 使用Gemini进行视频分类
async fn classify_video_with_gemini(&self, task: &mut VideoClassificationTask, prompt: &str) -> Result<VideoClassificationRecord> {
println!("🎯 开始使用Gemini进行视频分类");
println!("📋 任务ID: {}", task.id);
println!("📁 视频文件: {}", task.video_file_path);
let mut gemini_service = self.gemini_service.lock().await;
// 调用Gemini API进行分类
let (file_uri, raw_response) = gemini_service.classify_video(&task.video_file_path, prompt).await?;
println!("🚀 调用Gemini API进行分类...");
let classification_start = std::time::Instant::now();
let (file_uri, raw_response) = match gemini_service.classify_video(&task.video_file_path, prompt).await {
Ok(result) => {
println!("✅ Gemini API调用成功");
result
}
Err(e) => {
println!("❌ Gemini API调用失败: {}", e);
return Err(e);
}
};
let classification_duration = classification_start.elapsed();
println!("⏱️ Gemini分类耗时: {:?}", classification_duration);
// 更新任务状态
println!("📝 更新任务状态为分析中...");
task.set_analyzing(file_uri.clone(), prompt.to_string());
self.video_repo.update_classification_task(task).await?;
// 解析Gemini响应
let gemini_response = self.parse_gemini_response(&raw_response)?;
println!("🔍 解析Gemini响应...");
println!("📄 原始响应长度: {} 字符", raw_response.len());
println!("📄 原始响应内容: {}", &raw_response[..std::cmp::min(500, raw_response.len())]);
let gemini_response = match self.parse_gemini_response(&raw_response) {
Ok(response) => {
println!("✅ 响应解析成功");
println!("🏷️ 分类结果: {}", response.category);
println!("📊 置信度: {:.2}", response.confidence);
println!("⭐ 质量评分: {:.2}", response.quality_score);
response
}
Err(e) => {
println!("❌ 响应解析失败: {}", e);
return Err(e);
}
};
// 创建分类记录
println!("💾 创建分类记录...");
let mut record = VideoClassificationRecord::new(
task.segment_id.clone(),
task.material_id.clone(),
@ -145,32 +182,53 @@ impl VideoClassificationService {
// 检查是否需要人工审核
if record.needs_review() {
println!("⚠️ 分类结果需要人工审核 (置信度: {:.2}, 质量: {:.2})", record.confidence, record.quality_score);
record.mark_as_needs_review("置信度或质量评分较低,建议人工审核".to_string());
} else {
println!("✅ 分类结果质量良好,无需人工审核");
}
// 保存分类记录
println!("💾 保存分类记录到数据库...");
let saved_record = self.video_repo.create_classification_record(record).await?;
println!("✅ 分类记录保存成功记录ID: {}", saved_record.id);
Ok(saved_record)
}
/// 解析Gemini响应为结构化数据
fn parse_gemini_response(&self, raw_response: &str) -> Result<GeminiClassificationResponse> {
println!("🔍 开始解析Gemini响应...");
// 尝试从响应中提取JSON
let json_start = raw_response.find('{');
let json_end = raw_response.rfind('}');
println!("📍 JSON位置: start={:?}, end={:?}", json_start, json_end);
if let (Some(start), Some(end)) = (json_start, json_end) {
let json_str = &raw_response[start..=end];
println!("📄 提取的JSON字符串: {}", json_str);
match serde_json::from_str::<GeminiClassificationResponse>(json_str) {
Ok(response) => Ok(response),
Err(_) => {
Ok(response) => {
println!("✅ JSON解析成功");
println!("🏷️ 解析结果 - 分类: {}", response.category);
println!("📊 解析结果 - 置信度: {:.2}", response.confidence);
println!("💭 解析结果 - 理由: {}", response.reasoning);
println!("🔍 解析结果 - 特征: {:?}", response.features);
println!("🎯 解析结果 - 商品匹配: {}", response.product_match);
println!("⭐ 解析结果 - 质量评分: {:.2}", response.quality_score);
Ok(response)
}
Err(e) => {
println!("❌ JSON解析失败: {}", e);
println!("🔄 使用默认分类响应");
// 如果解析失败,创建一个默认响应
Ok(GeminiClassificationResponse {
category: "未分类".to_string(),
confidence: 0.5,
reasoning: "AI响应解析失败使用默认分类".to_string(),
reasoning: format!("AI响应解析失败: {} - 原始JSON: {}", e, json_str),
features: vec!["解析失败".to_string()],
product_match: false,
quality_score: 0.5,
@ -178,11 +236,13 @@ impl VideoClassificationService {
}
}
} else {
println!("❌ 未找到JSON格式");
println!("🔄 使用默认分类响应");
// 没有找到JSON格式创建默认响应
Ok(GeminiClassificationResponse {
category: "未分类".to_string(),
confidence: 0.3,
reasoning: format!("AI响应格式异常: {}", raw_response),
reasoning: format!("AI响应格式异常未找到JSON: {}", &raw_response[..std::cmp::min(200, raw_response.len())]),
features: vec!["格式异常".to_string()],
product_match: false,
quality_score: 0.3,

View File

@ -128,6 +128,8 @@ impl GeminiService {
/// 获取Google访问令牌
async fn get_access_token(&mut self) -> Result<String> {
println!("🔐 开始获取访问令牌...");
// 检查缓存的令牌是否仍然有效
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)?
@ -135,24 +137,46 @@ impl GeminiService {
if let (Some(token), Some(expires_at)) = (&self.access_token, self.token_expires_at) {
if current_time < expires_at - 300 { // 提前5分钟刷新
println!("✅ 使用缓存的访问令牌 (剩余时间: {}秒)", expires_at - current_time);
return Ok(token.clone());
} else {
println!("⏰ 缓存的令牌即将过期,需要刷新");
}
} else {
println!("🆕 首次获取访问令牌");
}
// 获取新的访问令牌
let url = format!("{}/google/auth/token", self.config.base_url);
let url = format!("{}/google/access-token", self.config.base_url);
println!("📡 请求URL: {}", url);
println!("🔑 Bearer Token: {}", self.config.bearer_token);
let response = self.client
.post(&url)
.get(&url)
.header("Authorization", format!("Bearer {}", self.config.bearer_token))
.send()
.await?;
if !response.status().is_success() {
return Err(anyhow!("获取访问令牌失败: {}", response.status()));
let status = response.status();
let headers = response.headers().clone();
println!("📥 响应状态: {}", status);
println!("📋 响应头: {:?}", headers);
if !status.is_success() {
let error_body = response.text().await.unwrap_or_default();
println!("❌ 错误响应体: {}", error_body);
return Err(anyhow!("获取访问令牌失败: {} - {}", status, error_body));
}
let token_response: TokenResponse = response.json().await?;
let response_text = response.text().await?;
println!("📄 响应内容: {}", response_text);
let token_response: TokenResponse = serde_json::from_str(&response_text)
.map_err(|e| anyhow!("解析令牌响应失败: {} - 响应内容: {}", e, response_text))?;
println!("✅ 成功获取访问令牌,有效期: {}", token_response.expires_in);
// 缓存令牌
self.access_token = Some(token_response.access_token.clone());
self.token_expires_at = Some(current_time + token_response.expires_in);
@ -162,17 +186,27 @@ impl GeminiService {
/// 上传视频文件到Gemini
pub async fn upload_video_file(&mut self, video_path: &str) -> Result<String> {
println!("📤 开始上传视频文件: {}", video_path);
// 获取访问令牌
let access_token = self.get_access_token().await?;
// 读取视频文件
let video_data = fs::read(video_path).await?;
println!("📁 读取视频文件...");
let video_data = fs::read(video_path).await
.map_err(|e| anyhow!("读取视频文件失败: {} - {}", video_path, e))?;
let file_size = video_data.len();
println!("📊 视频文件大小: {} bytes ({:.2} MB)", file_size, file_size as f64 / 1024.0 / 1024.0);
let file_name = Path::new(video_path)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("video.mp4");
println!("📝 文件名: {}", file_name);
// 创建multipart表单
println!("📦 创建multipart表单...");
let form = multipart::Form::new()
.part("file", multipart::Part::bytes(video_data)
.file_name(file_name.to_string())
@ -180,35 +214,58 @@ impl GeminiService {
// 上传到Vertex AI
let upload_url = format!("{}/google/vertex-ai/upload", self.config.base_url);
let query_params = [
("bucket", "dy-media-storage"),
("prefix", "video-analysis")
];
println!("📡 上传URL: {}", upload_url);
println!("🔍 查询参数: {:?}", query_params);
println!("🔑 Authorization: Bearer {}", &access_token[..std::cmp::min(20, access_token.len())]);
let response = self.client
.post(&upload_url)
.header("Authorization", format!("Bearer {}", access_token))
.header("x-google-api-key", &access_token)
.query(&[
("bucket", "dy-media-storage"),
("prefix", "video-analysis")
])
.query(&query_params)
.multipart(form)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let status = response.status();
let headers = response.headers().clone();
println!("📥 上传响应状态: {}", status);
println!("📋 上传响应头: {:?}", headers);
if !status.is_success() {
let error_text = response.text().await.unwrap_or_default();
println!("❌ 上传失败响应体: {}", error_text);
return Err(anyhow!("上传视频失败: {} - {}", status, error_text));
}
let upload_response: UploadResponse = response.json().await?;
let response_text = response.text().await?;
println!("📄 上传成功响应: {}", response_text);
let upload_response: UploadResponse = serde_json::from_str(&response_text)
.map_err(|e| anyhow!("解析上传响应失败: {} - 响应内容: {}", e, response_text))?;
println!("✅ 视频上传成功文件URI: {}", upload_response.file_uri);
Ok(upload_response.file_uri)
}
/// 生成内容分析
pub async fn generate_content_analysis(&mut self, file_uri: &str, prompt: &str) -> Result<String> {
println!("🧠 开始生成内容分析...");
println!("📁 文件URI: {}", file_uri);
println!("💬 提示词长度: {} 字符", prompt.len());
// 获取访问令牌
let access_token = self.get_access_token().await?;
// 格式化GCS URI
let formatted_uri = self.format_gcs_uri(file_uri);
println!("🔗 格式化后的URI: {}", formatted_uri);
// 准备请求数据
let request_data = GenerateContentRequest {
@ -232,8 +289,12 @@ impl GeminiService {
},
};
println!("📦 请求数据: {}", serde_json::to_string_pretty(&request_data).unwrap_or_default());
// 发送生成请求
let generate_url = format!("{}/google/vertex-ai/generate", self.config.base_url);
println!("📡 生成URL: {}", generate_url);
let response = self.client
.post(&generate_url)
.header("Authorization", format!("Bearer {}", access_token))
@ -243,20 +304,32 @@ impl GeminiService {
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let status = response.status();
let headers = response.headers().clone();
println!("📥 生成响应状态: {}", status);
println!("📋 生成响应头: {:?}", headers);
if !status.is_success() {
let error_text = response.text().await.unwrap_or_default();
println!("❌ 生成失败响应体: {}", error_text);
return Err(anyhow!("生成内容分析失败: {} - {}", status, error_text));
}
let gemini_response: GeminiResponse = response.json().await?;
let response_text = response.text().await?;
println!("📄 生成成功响应: {}", response_text);
let gemini_response: GeminiResponse = serde_json::from_str(&response_text)
.map_err(|e| anyhow!("解析生成响应失败: {} - 响应内容: {}", e, response_text))?;
if let Some(candidate) = gemini_response.candidates.first() {
if let Some(part) = candidate.content.parts.first() {
println!("✅ 内容分析完成,响应长度: {} 字符", part.text.len());
return Ok(part.text.clone());
}
}
println!("❌ Gemini响应格式无效: {:?}", gemini_response);
Err(anyhow!("Gemini响应格式无效"))
}
@ -275,15 +348,28 @@ impl GeminiService {
/// 完整的视频分类流程
pub async fn classify_video(&mut self, video_path: &str, prompt: &str) -> Result<(String, String)> {
println!("🎬 开始完整的视频分类流程");
println!("📁 视频路径: {}", video_path);
println!("💬 提示词预览: {}...", &prompt[..std::cmp::min(100, prompt.len())]);
let start_time = std::time::Instant::now();
// 1. 上传视频
println!("正在上传视频到Gemini: {}", video_path);
println!("📤 步骤1: 上传视频到Gemini");
let upload_start = std::time::Instant::now();
let file_uri = self.upload_video_file(video_path).await?;
println!("视频上传成功URI: {}", file_uri);
let upload_duration = upload_start.elapsed();
println!("✅ 视频上传成功,耗时: {:?}, URI: {}", upload_duration, file_uri);
// 2. 生成分析
println!("正在进行AI分析...");
println!("🧠 步骤2: 进行AI分析");
let analysis_start = std::time::Instant::now();
let analysis_result = self.generate_content_analysis(&file_uri, prompt).await?;
println!("AI分析完成");
let analysis_duration = analysis_start.elapsed();
println!("✅ AI分析完成耗时: {:?}", analysis_duration);
let total_duration = start_time.elapsed();
println!("🎉 视频分类流程完成,总耗时: {:?}", total_duration);
Ok((file_uri, analysis_result))
}