diff --git a/src-tauri/src/commands/ai_video.rs b/src-tauri/src/commands/ai_video.rs index e19aeb6..d1961ce 100644 --- a/src-tauri/src/commands/ai_video.rs +++ b/src-tauri/src/commands/ai_video.rs @@ -1,6 +1,10 @@ use serde::Deserialize; use std::process::Command; use std::io::{BufRead, BufReader}; +use std::time::{Duration, Instant}; +use std::thread; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc; #[derive(Debug, Deserialize)] pub struct AIVideoRequest { @@ -24,7 +28,7 @@ pub struct BatchAIVideoRequest { async fn execute_python_command(_app: tauri::AppHandle, args: &[String]) -> Result { println!("Executing Python command with args: {:?}", args); - + // Get project root directory let current_dir = std::env::current_dir() .map_err(|e| format!("Failed to get current directory: {}", e))?; @@ -38,24 +42,34 @@ async fn execute_python_command(_app: tauri::AppHandle, args: &[String]) -> Resu println!("Working directory: {:?}", project_root); // Try multiple Python commands in order of preference + let python_commands = if cfg!(target_os = "windows") { + vec!["python", "python3", "py"] + } else { + vec!["python3", "python"] + }; let mut child = None; let mut last_error = String::new(); - let mut cmd = Command::new("python"); - cmd.current_dir(&project_root); - cmd.args(args); - cmd.stdout(std::process::Stdio::piped()); - cmd.stderr(std::process::Stdio::piped()); + for python_cmd in python_commands { + println!("Trying Python command: {}", python_cmd); + let mut cmd = Command::new(python_cmd); + cmd.current_dir(&project_root); + cmd.args(args); + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::piped()); - match cmd.spawn() { - Ok(process) => { - println!("Successfully started Python process"); - child = Some(process); - } - Err(e) => { - last_error = format!("Failed to process: {}", e); - println!("{}", last_error); + match cmd.spawn() { + Ok(process) => { + println!("Successfully started Python process with: {}", python_cmd); + child = Some(process); + break; + } + Err(e) => { + last_error = format!("Failed to start {} process: {}", python_cmd, e); + println!("{}", last_error); + continue; + } } } @@ -66,56 +80,127 @@ async fn execute_python_command(_app: tauri::AppHandle, args: &[String]) -> Resu let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); - let stdout_reader = BufReader::new(stdout); - let stderr_reader = BufReader::new(stderr); + // Use channels for concurrent reading + let (stdout_tx, stdout_rx) = mpsc::channel(); + let (stderr_tx, stderr_rx) = mpsc::channel(); + let (result_tx, result_rx) = mpsc::channel(); - let mut progress_messages = Vec::new(); - let mut error_messages = Vec::new(); - let mut final_result: Option = None; + let progress_messages = Arc::new(Mutex::new(Vec::new())); + let error_messages = Arc::new(Mutex::new(Vec::new())); + let final_result = Arc::new(Mutex::new(None::)); - // Read stdout - for line in stdout_reader.lines() { - let line = line.map_err(|e| format!("Failed to read stdout: {}", e))?; - println!("Python stdout: {}", line); + // Spawn thread for reading stdout + let progress_messages_clone = Arc::clone(&progress_messages); + let final_result_clone = Arc::clone(&final_result); + thread::spawn(move || { + let reader = BufReader::new(stdout); + for line in reader.lines() { + match line { + Ok(line) => { + println!("Python stdout: {}", line); - // Parse JSON-RPC messages - if line.starts_with("JSONRPC:") { - let json_str = &line[8..]; // Remove "JSONRPC:" prefix - if let Ok(json_value) = serde_json::from_str::(json_str) { - if let Some(method) = json_value.get("method") { - if method == "progress" { - progress_messages.push(json_str.to_string()); - println!("Progress: {}", json_str); + // Parse JSON-RPC messages + if line.starts_with("JSONRPC:") { + let json_str = &line[8..]; // Remove "JSONRPC:" prefix + if let Ok(json_value) = serde_json::from_str::(json_str) { + if let Some(method) = json_value.get("method") { + if method == "progress" { + if let Ok(mut messages) = progress_messages_clone.lock() { + messages.push(json_str.to_string()); + } + println!("Progress: {}", json_str); + } + } else if json_value.get("result").is_some() || json_value.get("error").is_some() { + // This is a final result or error response + if let Ok(mut result) = final_result_clone.lock() { + *result = Some(json_str.to_string()); + } + println!("JSON-RPC result found: {}", json_str); + } + } + } else if line.trim().starts_with('{') && line.trim().ends_with('}') { + // Fallback: try to parse as direct JSON result + if let Ok(json_value) = serde_json::from_str::(line.trim()) { + // Check if this looks like a final result (has status field) + if json_value.get("status").is_some() { + if let Ok(mut result) = final_result_clone.lock() { + *result = Some(line.trim().to_string()); + } + println!("Direct JSON result: {}", line.trim()); + } + } + } else { + println!("Python other: {}", line); + } + + if stdout_tx.send(line).is_err() { + break; } - } else if json_value.get("result").is_some() || json_value.get("error").is_some() { - // This is a final result or error response - always update to get the latest - final_result = Some(json_str.to_string()); - println!("JSON-RPC result found: {}", json_str); } - } - } else if line.trim().starts_with('{') && line.trim().ends_with('}') { - // Fallback: try to parse as direct JSON result - if let Ok(json_value) = serde_json::from_str::(line.trim()) { - // Check if this looks like a final result (has status field) - if json_value.get("status").is_some() { - final_result = Some(line.trim().to_string()); - println!("Direct JSON result: {}", line.trim()); + Err(e) => { + println!("Error reading stdout: {}", e); + break; } } } - } + }); - // Read stderr - for line in stderr_reader.lines() { - let line = line.map_err(|e| format!("Failed to read stderr: {}", e))?; - println!("Python stderr: {}", line); - error_messages.push(line); - } + // Spawn thread for reading stderr + let error_messages_clone = Arc::clone(&error_messages); + thread::spawn(move || { + let reader = BufReader::new(stderr); + for line in reader.lines() { + match line { + Ok(line) => { + println!("Python stderr: {}", line); + if let Ok(mut messages) = error_messages_clone.lock() { + messages.push(line.clone()); + } + if stderr_tx.send(line).is_err() { + break; + } + } + Err(e) => { + println!("Error reading stderr: {}", e); + break; + } + } + } + }); - // Wait for process to complete + // Spawn thread to wait for process completion + thread::spawn(move || { + match child.wait() { + Ok(status) => { + let _ = result_tx.send(Ok(status)); + } + Err(e) => { + let _ = result_tx.send(Err(format!("Failed to wait for process: {}", e))); + } + } + }); + + // Wait for process to complete with timeout println!("Waiting for Python process to complete..."); - let exit_status = child.wait() - .map_err(|e| format!("Failed to wait for Python process: {}", e))?; + let timeout_duration = Duration::from_secs(600); // 10 minutes timeout + let start_time = Instant::now(); + + let exit_status = loop { + match result_rx.try_recv() { + Ok(Ok(status)) => break status, + Ok(Err(e)) => return Err(e), + Err(mpsc::TryRecvError::Empty) => { + if start_time.elapsed() > timeout_duration { + return Err("Python process timed out after 10 minutes".to_string()); + } + thread::sleep(Duration::from_millis(100)); + continue; + } + Err(mpsc::TryRecvError::Disconnected) => { + return Err("Process monitoring thread disconnected".to_string()); + } + } + }; println!("Python process terminated with code: {:?}", exit_status.code()); println!("Python script exit code: {:?}", exit_status.code()); @@ -136,8 +221,19 @@ async fn execute_python_command(_app: tauri::AppHandle, args: &[String]) -> Resu println!("Python process was terminated by signal (killed externally)"); } + // Extract final results from shared state + let final_result_value = { + final_result.lock().unwrap().clone() + }; + let progress_messages_value = { + progress_messages.lock().unwrap().clone() + }; + let error_messages_value = { + error_messages.lock().unwrap().clone() + }; + // Return the final result if we found one - if let Some(result) = final_result { + if let Some(result) = final_result_value { println!("Extracted JSON-RPC result: {}", result); return Ok(result); } @@ -145,7 +241,7 @@ async fn execute_python_command(_app: tauri::AppHandle, args: &[String]) -> Resu // If no JSON-RPC result found, provide detailed error information if exit_status.success() { // Process succeeded but no output - this is unusual - let error_msg = if progress_messages.is_empty() { + let error_msg = if progress_messages_value.is_empty() { "Python script completed successfully but produced no output" } else { "Python script completed but did not return a valid JSON result" @@ -168,12 +264,12 @@ async fn execute_python_command(_app: tauri::AppHandle, args: &[String]) -> Resu // Include any error output we captured let mut full_error = error_msg; - if !progress_messages.is_empty() { - full_error.push_str(&format!("\n\nLast stdout: {}", progress_messages.join("\n"))); + if !progress_messages_value.is_empty() { + full_error.push_str(&format!("\n\nLast stdout: {}", progress_messages_value.join("\n"))); } - if !error_messages.is_empty() { - full_error.push_str(&format!("\n\nStderr: {}", error_messages.join("\n"))); + if !error_messages_value.is_empty() { + full_error.push_str(&format!("\n\nStderr: {}", error_messages_value.join("\n"))); } Err(full_error)