mixvideo-v2/apps/desktop/src-tauri/src/business/services/volcano_video_service.rs

856 lines
33 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use anyhow::{anyhow, Result};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::{error, info, warn};
use uuid::Uuid;
use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac};
use sha2::{Digest, Sha256};
use hex;
use url::Url;
use crate::data::models::video_generation_record::{
VideoGenerationRecord, CreateVideoGenerationRequest, VideoGenerationQuery
};
use crate::data::repositories::video_generation_record_repository::VideoGenerationRecordRepository;
use crate::infrastructure::database::Database;
use crate::business::services::cloud_upload_service::CloudUploadService;
/// 火山云视频生成API响应提交任务
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolcanoVideoGenerationResponse {
pub code: i32,
pub message: String,
pub data: Option<VolcanoVideoGenerationData>,
pub request_id: String,
pub status: i32,
pub time_elapsed: String,
}
/// 提交任务返回数据
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolcanoVideoGenerationData {
pub task_id: String,
}
/// 火山云查询任务API响应
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolcanoVideoQueryResponse {
pub code: i32,
pub message: String,
pub data: Option<VolcanoVideoQueryData>,
pub request_id: String,
pub status: i32,
pub time_elapsed: String,
}
/// 查询任务返回数据
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolcanoVideoQueryData {
pub status: String, // in_queue, generating, done, not_found, expired
pub video_url: Option<String>,
pub resp_data: Option<String>, // JSON字符串包含详细信息
}
/// 驱动视频信息
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DrivingVideoInfo {
/// 驱动视频存储类型传固定值0
pub store_type: i32,
/// 视频url
pub video_url: String,
}
/// 火山云视频生成请求参数(提交任务)
/// 基于火山云实际API文档的参数
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolcanoVideoGenerationRequest {
/// 服务标识,固定值
pub req_key: String,
/// 输入图片URL必需
pub image_url: String,
/// 驱动视频信息
pub driving_video_info: DrivingVideoInfo,
}
/// 火山云查询任务请求参数
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolcanoVideoQueryRequest {
/// 服务标识,固定值
pub req_key: String,
/// 任务ID
pub task_id: String,
}
/// OmniHuman 主体识别请求参数
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealmanAvatarPictureCreateRoleOmniRequest {
/// 服务标识,固定值
pub req_key: String,
/// 人像图片URL链接
pub image_url: String,
}
/// 火山云 API 响应元数据
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolcanoResponseMetadata {
#[serde(rename = "Action")]
pub action: String,
#[serde(rename = "Region")]
pub region: String,
#[serde(rename = "RequestId")]
pub request_id: String,
#[serde(rename = "Service")]
pub service: String,
#[serde(rename = "Version")]
pub version: String,
}
/// OmniHuman 主体识别响应 (火山云格式)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealmanAvatarPictureCreateRoleOmniResponse {
#[serde(rename = "ResponseMetadata")]
pub response_metadata: VolcanoResponseMetadata,
#[serde(rename = "Result")]
pub result: RealmanAvatarPictureCreateRoleOmniResult,
}
/// OmniHuman 主体识别结果
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealmanAvatarPictureCreateRoleOmniResult {
pub code: i32,
pub message: String,
pub data: Option<RealmanAvatarPictureCreateRoleOmniData>,
pub request_id: String,
pub status: i32,
pub time_elapsed: String,
}
/// OmniHuman 主体识别返回数据
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealmanAvatarPictureCreateRoleOmniData {
/// 输出处理过的图片url数组单张图
pub image_urls: Option<Vec<String>>,
/// 返回图base64数组
pub binary_data_base64: Option<Vec<String>>,
/// 任务ID
pub task_id: String,
/// 算法返回数据
pub resp_data: Option<String>,
/// 任务状态
pub status: Option<String>,
/// 算法返回信息
pub algorithm_base_resp: Option<serde_json::Value>,
}
/// 火山云视频生成服务
/// 遵循 Tauri 开发规范的服务层设计原则
pub struct VolcanoVideoService {
database: Arc<Database>,
repository: VideoGenerationRecordRepository,
http_client: Client,
cloud_upload_service: CloudUploadService,
vol_access_key: String,
vol_secret_key: String,
}
impl VolcanoVideoService {
/// 创建新的火山云视频生成服务实例
pub fn new(database: Arc<Database>) -> Self {
let repository = VideoGenerationRecordRepository::new(database.clone());
let http_client = Client::new();
// 从环境变量或配置中获取火山云密钥
let vol_access_key = std::env::var("VOL_ACCESS_KEY_ID")
.unwrap_or_else(|_| "AKLTZmEwYzNlZDI4NDI2NDUyNDg1ZTQ5NzVlNmQ4OGNkMDY".to_string());
let vol_secret_key = std::env::var("VOL_ACCESS_SECRET_KEY")
.unwrap_or_else(|_| "T1RRd1l6UTROV05pTUdRMU5EWTRNV0V4TldGak9ESmhZbVF4TnpCalpHTQ==".to_string());
Self {
database,
repository,
http_client,
cloud_upload_service: CloudUploadService::new(),
vol_access_key,
vol_secret_key,
}
}
/// 创建视频生成任务
pub async fn create_video_generation(&self, request: CreateVideoGenerationRequest) -> Result<VideoGenerationRecord> {
info!("创建视频生成任务: {}", request.name);
// 验证输入参数
if request.image_url.is_none() {
return Err(anyhow!("图片URL不能为空"));
}
// 创建记录
let id = Uuid::new_v4().to_string();
let mut record = VideoGenerationRecord::new(
id,
request.name,
request.image_url,
request.audio_url,
request.prompt,
);
// 设置可选参数
if let Some(desc) = request.description {
record.description = Some(desc);
}
if let Some(negative_prompt) = request.negative_prompt {
record.negative_prompt = Some(negative_prompt);
}
if let Some(duration) = request.duration {
record.duration = duration;
}
if let Some(fps) = request.fps {
record.fps = fps;
}
if let Some(resolution) = request.resolution {
record.resolution = resolution;
}
if let Some(style) = request.style {
record.style = Some(style);
}
if let Some(motion_strength) = request.motion_strength {
record.motion_strength = motion_strength;
}
// 保存到数据库
self.repository.create(&record).await?;
// 异步启动视频生成任务
let service_clone = self.clone();
let record_id = record.id.clone();
tokio::spawn(async move {
if let Err(e) = service_clone.process_video_generation(record_id).await {
error!("视频生成任务处理失败: {}", e);
}
});
Ok(record)
}
/// 处理视频生成任务
async fn process_video_generation(&self, record_id: String) -> Result<()> {
info!("开始处理视频生成任务: {}", record_id);
// 获取记录
let mut record = self.repository.get_by_id(&record_id).await?
.ok_or_else(|| anyhow!("找不到视频生成记录: {}", record_id))?;
// 标记为处理中
record.mark_as_processing(None);
self.repository.update(&record).await?;
// 调用火山云API
match self.call_volcano_api(&record).await {
Ok(response) => {
if let Some(data) = response.data {
// 更新任务ID
record.vol_task_id = Some(data.task_id.clone());
record.vol_request_id = Some(response.request_id);
self.repository.update(&record).await?;
// 轮询任务状态
self.poll_task_status(record_id, data.task_id).await?;
} else {
record.mark_as_failed("火山云API返回数据为空".to_string(), Some("EMPTY_RESPONSE".to_string()));
self.repository.update(&record).await?;
}
}
Err(e) => {
error!("调用火山云API失败: {}", e);
record.mark_as_failed(e.to_string(), Some("API_ERROR".to_string()));
self.repository.update(&record).await?;
}
}
Ok(())
}
/// 调用火山云视频生成API
async fn call_volcano_api(&self, record: &VideoGenerationRecord) -> Result<VolcanoVideoGenerationResponse> {
// 获取本地文件路径
let image_path = record.image_url.as_ref()
.ok_or_else(|| anyhow!("图片路径不能为空"))?;
let driving_video_path = record.audio_url.as_ref()
.ok_or_else(|| anyhow!("驱动视频路径不能为空,请上传驱动视频文件"))?;
// 上传图片到云端
info!("正在上传图片到云端: {}", image_path);
let image_upload_result = self.cloud_upload_service
.upload_file(image_path, None, None)
.await?;
if !image_upload_result.success {
return Err(anyhow!("图片上传失败: {}",
image_upload_result.error_message.unwrap_or_default()));
}
let image_s3_url = image_upload_result.remote_url
.ok_or_else(|| anyhow!("图片上传成功但未返回URL"))?;
// 将S3 URL转换为HTTPS CDN URL
let image_url = Self::convert_s3_to_cdn_url(&image_s3_url);
info!("图片S3 URL: {} -> CDN URL: {}", image_s3_url, image_url);
// 上传驱动视频到云端
info!("正在上传驱动视频到云端: {}", driving_video_path);
let video_upload_result = self.cloud_upload_service
.upload_file(driving_video_path, None, None)
.await?;
if !video_upload_result.success {
return Err(anyhow!("驱动视频上传失败: {}",
video_upload_result.error_message.unwrap_or_default()));
}
let driving_video_s3_url = video_upload_result.remote_url
.ok_or_else(|| anyhow!("驱动视频上传成功但未返回URL"))?;
// 将S3 URL转换为HTTPS CDN URL
let driving_video_url = Self::convert_s3_to_cdn_url(&driving_video_s3_url);
info!("驱动视频S3 URL: {} -> CDN URL: {}", driving_video_s3_url, driving_video_url);
info!("文件上传完成 - 图片: {}, 驱动视频: {}", image_url, driving_video_url);
let request_body = VolcanoVideoGenerationRequest {
req_key: "realman_avatar_imitator_v2v_gen_video".to_string(),
image_url,
driving_video_info: DrivingVideoInfo {
store_type: 0, // 固定值
video_url: driving_video_url,
},
};
info!("调用火山云视频生成API: {:?}", request_body);
// 实际的火山云API调用 - 提交任务
let api_url = "https://visual.volcengineapi.com?Action=CVSubmitTask&Version=2022-08-31";
// 构建认证头
let now = Utc::now();
let timestamp = now.format("%Y%m%dT%H%M%SZ").to_string();
let date = now.format("%Y%m%d").to_string();
let auth_header = self.build_auth_header("POST", api_url, &timestamp, &date, &request_body)?;
let response = self.http_client
.post(api_url)
.header("Authorization", auth_header)
.header("Content-Type", "application/json")
.header("X-Date", timestamp)
.json(&request_body)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
return Err(anyhow!("火山云API调用失败: {} - {}", status, error_text));
}
let api_response: VolcanoVideoGenerationResponse = response.json().await?;
if api_response.code != 10000 {
return Err(anyhow!("火山云API返回错误: {} - {}", api_response.code, api_response.message));
}
Ok(api_response)
}
/// 轮询任务状态
async fn poll_task_status(&self, record_id: String, task_id: String) -> Result<()> {
info!("开始轮询任务状态: {} - {}", record_id, task_id);
let mut attempts = 0;
let max_attempts = 60; // 最多轮询60次约10分钟
let mut consecutive_failures = 0;
let max_consecutive_failures = 3; // 连续失败3次就标记为失败
while attempts < max_attempts {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
attempts += 1;
match self.check_task_status(&task_id).await {
Ok(status_response) => {
// 重置连续失败计数器
consecutive_failures = 0;
if let Some(data) = status_response.data {
let mut record = self.repository.get_by_id(&record_id).await?
.ok_or_else(|| anyhow!("找不到视频生成记录: {}", record_id))?;
match data.status.as_str() {
"done" => {
if let Some(video_url) = data.video_url {
// 下载视频并上传到云端
match self.download_and_upload_video(&video_url, &record_id).await {
Ok(cdn_url) => {
record.mark_as_completed(cdn_url, None); // 使用CDN URL
self.repository.update(&record).await?;
info!("视频生成任务完成已上传到CDN: {}", record_id);
return Ok(());
}
Err(e) => {
warn!("视频上传到CDN失败使用原始URL: {} - {}", record_id, e);
record.mark_as_completed(video_url, None); // 使用原始URL作为fallback
self.repository.update(&record).await?;
info!("视频生成任务完成使用原始URL: {}", record_id);
return Ok(());
}
}
} else {
record.mark_as_failed("视频URL为空".to_string(), Some("EMPTY_VIDEO_URL".to_string()));
self.repository.update(&record).await?;
return Err(anyhow!("视频生成完成但视频URL为空"));
}
}
"not_found" => {
record.mark_as_failed("任务未找到".to_string(), Some("TASK_NOT_FOUND".to_string()));
self.repository.update(&record).await?;
return Err(anyhow!("火山云任务未找到"));
}
"expired" => {
record.mark_as_failed("任务已过期".to_string(), Some("TASK_EXPIRED".to_string()));
self.repository.update(&record).await?;
return Err(anyhow!("火山云任务已过期"));
}
"in_queue" | "generating" => {
// 任务还在处理中,继续轮询
info!("任务状态: {} - {}", data.status, record_id);
continue;
}
_ => {
warn!("未知的任务状态: {}", data.status);
continue;
}
}
}
}
Err(e) => {
warn!("检查任务状态失败: {}", e);
consecutive_failures += 1;
// 如果连续失败次数达到上限,标记任务为失败
if consecutive_failures >= max_consecutive_failures {
error!("连续{}次检查任务状态失败,标记任务为失败: {}", max_consecutive_failures, e);
let mut record = self.repository.get_by_id(&record_id).await?
.ok_or_else(|| anyhow!("找不到视频生成记录: {}", record_id))?;
record.mark_as_failed(
format!("连续{}次API调用失败: {}", max_consecutive_failures, e),
Some("API_CONSECUTIVE_FAILURES".to_string())
);
self.repository.update(&record).await?;
return Err(anyhow!("连续{}次检查任务状态失败", max_consecutive_failures));
}
continue;
}
}
}
// 超时处理
let mut record = self.repository.get_by_id(&record_id).await?
.ok_or_else(|| anyhow!("找不到视频生成记录: {}", record_id))?;
record.mark_as_failed("任务超时".to_string(), Some("TIMEOUT".to_string()));
self.repository.update(&record).await?;
Err(anyhow!("视频生成任务超时"))
}
/// 检查任务状态
async fn check_task_status(&self, task_id: &str) -> Result<VolcanoVideoQueryResponse> {
info!("检查任务状态: {}", task_id);
// 构建查询请求
let request_body = VolcanoVideoQueryRequest {
req_key: "realman_avatar_imitator_v2v_gen_video".to_string(),
task_id: task_id.to_string(),
};
// 实际的火山云API调用 - 查询任务
let api_url = "https://visual.volcengineapi.com?Action=CVGetResult&Version=2022-08-31";
// 构建认证头
let now = Utc::now();
let timestamp = now.format("%Y%m%dT%H%M%SZ").to_string();
let date = now.format("%Y%m%d").to_string();
let auth_header = self.build_auth_header("POST", api_url, &timestamp, &date, &request_body)?;
let response = self.http_client
.post(api_url)
.header("Authorization", auth_header)
.header("Content-Type", "application/json")
.header("X-Date", timestamp)
.json(&request_body)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
return Err(anyhow!("火山云API调用失败: {} - {}", status, error_text));
}
let api_response: VolcanoVideoQueryResponse = response.json().await?;
if api_response.code != 10000 {
return Err(anyhow!("火山云API返回错误: {} - {}", api_response.code, api_response.message));
}
Ok(api_response)
}
/// 构建火山云API认证头
fn build_auth_header<T: Serialize>(&self, method: &str, url: &str, timestamp: &str, date: &str, body: &T) -> Result<String> {
// 1. 创建规范请求 (CanonicalRequest)
let canonical_request = self.create_canonical_request(method, url, timestamp, body)?;
// 2. 创建待签名字符串 (StringToSign)
let credential_scope = format!("{}/cn-north-1/cv/request", date);
let string_to_sign = self.create_string_to_sign(timestamp, &credential_scope, &canonical_request)?;
// 3. 派生签名密钥 (kSigning)
let signing_key = self.derive_signing_key(&self.vol_secret_key, date, "cn-north-1", "cv")?;
// 4. 计算签名 (Signature)
let signature = self.calculate_signature(&signing_key, &string_to_sign)?;
// 5. 构建Authorization头
let auth_string = format!(
"HMAC-SHA256 Credential={}/{}, SignedHeaders=host;x-date, Signature={}",
self.vol_access_key,
credential_scope,
signature
);
Ok(auth_string)
}
/// 获取视频生成记录列表
pub async fn get_video_generations(&self, query: VideoGenerationQuery) -> Result<Vec<VideoGenerationRecord>> {
self.repository.get_list(query).await
}
/// 根据ID获取视频生成记录
pub async fn get_video_generation_by_id(&self, id: &str) -> Result<Option<VideoGenerationRecord>> {
self.repository.get_by_id(id).await
}
/// 删除视频生成记录
pub async fn delete_video_generation(&self, id: &str) -> Result<()> {
self.repository.delete(id).await
}
/// OmniHuman 主体识别 - 提交任务
/// 用于识别图片中是否包含人、类人、拟人等主体
pub async fn realman_avatar_picture_create_role_omni_submit_task(
&self,
image_url: String
) -> Result<RealmanAvatarPictureCreateRoleOmniResponse> {
info!("开始 OmniHuman 主体识别任务: {}", image_url);
// 构建请求参数
let request_body = RealmanAvatarPictureCreateRoleOmniRequest {
req_key: "realman_avatar_picture_create_role_omni".to_string(),
image_url,
};
// 火山云API调用 - OmniHuman 主体识别
let api_url = "https://visual.volcengineapi.com?Action=RealmanAvatarPictureCreateRoleOmniSubmitTask&Version=2024-06-06";
// 构建认证头
let now = Utc::now();
let timestamp = now.format("%Y%m%dT%H%M%SZ").to_string();
let date = now.format("%Y%m%d").to_string();
let auth_header = self.build_auth_header("POST", api_url, &timestamp, &date, &request_body)?;
info!("调用 OmniHuman 主体识别 API: {:?}", request_body);
let response = self.http_client
.post(api_url)
.header("Authorization", auth_header)
.header("Content-Type", "application/json")
.header("X-Date", timestamp)
.json(&request_body)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
return Err(anyhow!("OmniHuman 主体识别 API 调用失败: {} - {}", status, error_text));
}
// 先获取响应文本进行调试
let response_text = response.text().await?;
info!("OmniHuman API 原始响应: {}", response_text);
// 尝试解析响应
let api_response: RealmanAvatarPictureCreateRoleOmniResponse = serde_json::from_str(&response_text)
.map_err(|e| anyhow!("解析响应失败: {} - 响应内容: {}", e, response_text))?;
if api_response.result.code != 10000 {
return Err(anyhow!("OmniHuman 主体识别 API 返回错误: {} - {}", api_response.result.code, api_response.result.message));
}
info!("OmniHuman 主体识别任务提交成功: {:?}", api_response);
Ok(api_response)
}
/// 批量删除视频生成记录
pub async fn batch_delete_video_generations(&self, ids: Vec<String>) -> Result<()> {
for id in ids {
self.repository.delete(&id).await?;
}
Ok(())
}
/// 创建规范请求
fn create_canonical_request<T: Serialize>(&self, method: &str, url: &str, timestamp: &str, body: &T) -> Result<String> {
// 解析URL获取host和path
let parsed_url = Url::parse(url)?;
let host = parsed_url.host_str().ok_or_else(|| anyhow!("无效的URL"))?;
let path = parsed_url.path();
let query = parsed_url.query().unwrap_or("");
// 规范化查询字符串
let canonical_query_string = self.canonicalize_query_string(query);
// 规范化请求头
let canonical_headers = format!("host:{}\nx-date:{}\n", host, timestamp);
// 签名的请求头
let signed_headers = "host;x-date";
// 计算请求体的哈希值
let body_json = serde_json::to_string(body)?;
let body_hash = hex::encode(Sha256::digest(body_json.as_bytes()));
// 构建规范请求
let canonical_request = format!(
"{}\n{}\n{}\n{}\n{}\n{}",
method,
path,
canonical_query_string,
canonical_headers,
signed_headers,
body_hash
);
Ok(canonical_request)
}
/// 规范化查询字符串
fn canonicalize_query_string(&self, query: &str) -> String {
if query.is_empty() {
return String::new();
}
let mut params: Vec<(String, String)> = query
.split('&')
.filter_map(|param| {
let mut parts = param.splitn(2, '=');
let key = parts.next()?.to_string();
let value = parts.next().unwrap_or("").to_string();
Some((key, value))
})
.collect();
// 按参数名称的ASCII升序排序
params.sort_by(|a, b| a.0.cmp(&b.0));
// 构建规范化查询字符串
params
.into_iter()
.map(|(key, value)| format!("{}={}", key, value))
.collect::<Vec<_>>()
.join("&")
}
/// 创建待签名字符串
fn create_string_to_sign(&self, timestamp: &str, credential_scope: &str, canonical_request: &str) -> Result<String> {
let canonical_request_hash = hex::encode(Sha256::digest(canonical_request.as_bytes()));
let string_to_sign = format!(
"HMAC-SHA256\n{}\n{}\n{}",
timestamp,
credential_scope,
canonical_request_hash
);
Ok(string_to_sign)
}
/// 派生签名密钥
fn derive_signing_key(&self, secret_key: &str, date: &str, region: &str, service: &str) -> Result<Vec<u8>> {
type HmacSha256 = Hmac<Sha256>;
// kDate = HMAC(kSecret, Date)
let mut mac = HmacSha256::new_from_slice(secret_key.as_bytes())?;
mac.update(date.as_bytes());
let k_date = mac.finalize().into_bytes();
// kRegion = HMAC(kDate, Region)
let mut mac = HmacSha256::new_from_slice(&k_date)?;
mac.update(region.as_bytes());
let k_region = mac.finalize().into_bytes();
// kService = HMAC(kRegion, Service)
let mut mac = HmacSha256::new_from_slice(&k_region)?;
mac.update(service.as_bytes());
let k_service = mac.finalize().into_bytes();
// kSigning = HMAC(kService, "request")
let mut mac = HmacSha256::new_from_slice(&k_service)?;
mac.update(b"request");
let k_signing = mac.finalize().into_bytes();
Ok(k_signing.to_vec())
}
/// 计算签名
fn calculate_signature(&self, signing_key: &[u8], string_to_sign: &str) -> Result<String> {
type HmacSha256 = Hmac<Sha256>;
let mut mac = HmacSha256::new_from_slice(signing_key)?;
mac.update(string_to_sign.as_bytes());
let signature = mac.finalize().into_bytes();
Ok(hex::encode(signature))
}
/// 下载视频并上传到云端服务
async fn download_and_upload_video(&self, video_url: &str, record_id: &str) -> Result<String> {
info!("开始下载并上传视频: {} -> {}", video_url, record_id);
// 创建临时文件路径
let temp_dir = std::env::temp_dir();
let temp_filename = format!("volcano_video_{}_{}.mp4", record_id, chrono::Utc::now().timestamp());
let temp_file_path = temp_dir.join(&temp_filename);
let temp_file_str = temp_file_path.to_string_lossy().to_string();
// 下载视频到本地临时文件
info!("正在下载视频到临时文件: {}", temp_file_str);
match self.download_video_to_file(video_url, &temp_file_str).await {
Ok(_) => {
info!("视频下载成功: {}", temp_file_str);
}
Err(e) => {
error!("视频下载失败: {} - {}", video_url, e);
return Err(anyhow!("视频下载失败: {}", e));
}
}
// 上传到云端服务
info!("正在上传视频到云端: {}", temp_file_str);
let upload_result = match self.cloud_upload_service.upload_file(&temp_file_str, None, None).await {
Ok(result) => result,
Err(e) => {
// 清理临时文件
let _ = tokio::fs::remove_file(&temp_file_path).await;
error!("视频上传失败: {} - {}", temp_file_str, e);
return Err(anyhow!("视频上传失败: {}", e));
}
};
// 清理临时文件
if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
warn!("清理临时文件失败: {} - {}", temp_file_str, e);
}
if !upload_result.success {
let error_msg = upload_result.error_message.unwrap_or_default();
error!("视频上传失败: {}", error_msg);
return Err(anyhow!("视频上传失败: {}", error_msg));
}
let s3_url = upload_result.remote_url
.ok_or_else(|| anyhow!("上传成功但未返回S3 URL"))?;
// 转换S3 URL为CDN URL
let cdn_url = Self::convert_s3_to_cdn_url(&s3_url);
info!("视频上传成功: {} -> {} -> {}", video_url, s3_url, cdn_url);
Ok(cdn_url)
}
/// 下载视频文件到本地
async fn download_video_to_file(&self, video_url: &str, file_path: &str) -> Result<()> {
// 创建带有防盗链绕过的HTTP客户端
let client = reqwest::Client::builder()
.user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
.timeout(std::time::Duration::from_secs(300)) // 5分钟超时
.build()
.map_err(|e| anyhow!("创建HTTP客户端失败: {}", e))?;
// 构建请求,添加必要的头部信息绕过防盗链
let request = client
.get(video_url)
.header("Referer", "https://www.volcengine.com/")
.header("Origin", "https://www.volcengine.com")
.header("Accept", "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5")
.header("Accept-Language", "zh-CN,zh;q=0.9,en;q=0.8")
.header("Cache-Control", "no-cache")
.header("Pragma", "no-cache")
.header("Sec-Fetch-Dest", "video")
.header("Sec-Fetch-Mode", "no-cors")
.header("Sec-Fetch-Site", "cross-site");
let response = request.send().await
.map_err(|e| anyhow!("请求视频失败: {}", e))?;
if !response.status().is_success() {
return Err(anyhow!("下载视频失败HTTP状态码: {}", response.status()));
}
let bytes = response.bytes().await
.map_err(|e| anyhow!("读取视频数据失败: {}", e))?;
tokio::fs::write(file_path, &bytes).await
.map_err(|e| anyhow!("保存视频文件失败: {}", e))?;
info!("视频文件保存成功: {} ({} bytes)", file_path, bytes.len());
Ok(())
}
/// 将S3 URL转换为可访问的CDN地址
fn convert_s3_to_cdn_url(s3_url: &str) -> String {
if s3_url.starts_with("s3://ap-northeast-2/modal-media-cache/") {
// 将 s3://ap-northeast-2/modal-media-cache/ 替换为 https://cdn.roasmax.cn/
s3_url.replace("s3://ap-northeast-2/modal-media-cache/", "https://cdn.roasmax.cn/")
} else if s3_url.starts_with("s3://") {
// 处理其他 s3:// 格式转换为通用CDN格式
s3_url.replace("s3://", "https://cdn.roasmax.cn/")
} else if s3_url.contains("amazonaws.com") {
// 如果是完整的S3 HTTPS URL提取key部分
if let Some(key_start) = s3_url.find(".com/") {
let key = &s3_url[key_start + 5..];
format!("https://cdn.roasmax.cn/{}", key)
} else {
s3_url.to_string()
}
} else {
// 如果不是预期的S3格式返回原URL
s3_url.to_string()
}
}
}
// 实现Clone trait以支持异步任务
impl Clone for VolcanoVideoService {
fn clone(&self) -> Self {
Self {
database: self.database.clone(),
repository: VideoGenerationRecordRepository::new(self.database.clone()),
http_client: self.http_client.clone(),
cloud_upload_service: CloudUploadService::new(),
vol_access_key: self.vol_access_key.clone(),
vol_secret_key: self.vol_secret_key.clone(),
}
}
}