use std::process::Command; use std::io::{BufRead, BufReader}; use std::time::{Duration, Instant}; use std::thread; use std::sync::{Arc, Mutex}; use std::sync::mpsc; use serde::{Deserialize, Serialize}; use tauri::{AppHandle, Emitter}; /// Generic progress information from Python processes #[derive(Debug, Serialize, Deserialize, Clone)] pub struct PythonProgress { pub step: String, pub progress: f64, // -1 for indeterminate, 0-100 for percentage pub message: String, pub details: Option, pub timestamp: f64, } /// Progress callback trait for handling Python process progress pub trait ProgressCallback: Send + Sync { fn on_progress(&self, progress: PythonProgress); } /// Simple function-based progress callback pub struct FunctionCallback where F: Fn(PythonProgress) + Send + Sync, { callback: F, } impl FunctionCallback where F: Fn(PythonProgress) + Send + Sync, { #[allow(dead_code)] pub fn new(callback: F) -> Self { Self { callback } } } impl ProgressCallback for FunctionCallback where F: Fn(PythonProgress) + Send + Sync, { fn on_progress(&self, progress: PythonProgress) { (self.callback)(progress); } } /// Tauri event-based progress callback pub struct TauriEventCallback { app: AppHandle, event_name: String, } impl TauriEventCallback { pub fn new(app: AppHandle, event_name: String) -> Self { Self { app, event_name } } } impl ProgressCallback for TauriEventCallback { fn on_progress(&self, progress: PythonProgress) { let _ = self.app.emit(&self.event_name, &progress); } } /// Configuration for Python command execution pub struct PythonExecutorConfig { pub timeout_seconds: u64, pub working_directory: Option, } impl Default for PythonExecutorConfig { fn default() -> Self { Self { timeout_seconds: 600, // 10 minutes default working_directory: None, } } } /// Execute a Python command with progress callback support /// /// This function handles: /// - Multiple Python command attempts (python, python3, py) /// - UTF-8 encoding setup /// - Concurrent stdout/stderr reading /// - JSON-RPC message parsing /// - Progress notifications via callback /// - Timeout handling pub async fn execute_python_command_with_progress

( app: tauri::AppHandle, args: &[String], config: Option, progress_callback: Option

, ) -> Result where P: ProgressCallback + 'static, { execute_python_internal(app, args, config, progress_callback).await } /// Execute a Python command with the given arguments (legacy interface) /// /// This function handles: /// - Multiple Python command attempts (python, python3, py) /// - UTF-8 encoding setup /// - Concurrent stdout/stderr reading /// - JSON-RPC message parsing /// - Timeout handling /// - Detailed error reporting pub async fn execute_python_command( app: tauri::AppHandle, args: &[String], config: Option ) -> Result { execute_python_internal(app, args, config, None::>).await } /// Execute Python command with a simple function callback #[allow(dead_code)] pub async fn execute_python_with_callback( app: tauri::AppHandle, args: &[String], config: Option, callback: F, ) -> Result where F: Fn(PythonProgress) + Send + Sync + 'static, { let progress_callback = FunctionCallback::new(callback); execute_python_command_with_progress(app, args, config, Some(progress_callback)).await } /// Execute Python command with Tauri event emission pub async fn execute_python_with_events( app: tauri::AppHandle, args: &[String], config: Option, event_name: &str, ) -> Result { let progress_callback = TauriEventCallback::new(app.clone(), event_name.to_string()); execute_python_command_with_progress(app, args, config, Some(progress_callback)).await } /// Python command builder for easier command construction #[allow(dead_code)] pub struct PythonCommandBuilder { module: String, args: Vec<(String, String)>, } #[allow(dead_code)] impl PythonCommandBuilder { pub fn new(module: &str) -> Self { Self { module: module.to_string(), args: Vec::new(), } } pub fn arg(mut self, key: &str, value: &str) -> Self { self.args.push((key.to_string(), value.to_string())); self } pub fn action(self, action: &str) -> Self { self.arg("--action", action) } pub fn build(self) -> Vec { let mut result = vec![ "-m".to_string(), self.module, ]; for (key, value) in self.args { result.push(key); result.push(value); } result } } /// Execute a Python module with progress support #[allow(dead_code)] pub async fn execute_python_module_with_progress( app: tauri::AppHandle, module: &str, action: &str, params: Vec<(&str, &str)>, event_name: &str, config: Option, ) -> Result { let mut builder = PythonCommandBuilder::new(module).action(action); for (key, value) in params { builder = builder.arg(key, value); } let args = builder.build(); execute_python_with_events(app, &args, config, event_name).await } /// Helper function to create Python module commands with progress /// /// # Arguments /// * `app` - Tauri app handle /// * `module` - Python module path (e.g., "python_core.services.template_manager") /// * `action` - Action to perform (e.g., "batch_import") /// * `params` - Additional parameters as key-value pairs /// * `event_name` - Event name for progress updates /// * `config` - Optional executor configuration #[allow(dead_code)] pub async fn execute_python_action_with_progress( app: tauri::AppHandle, module: &str, action: &str, params: &[(&str, &str)], event_name: &str, config: Option, ) -> Result { let mut args = vec![ "-m".to_string(), module.to_string(), "--action".to_string(), action.to_string(), ]; // Add additional parameters for (key, value) in params { args.push(key.to_string()); args.push(value.to_string()); } execute_python_with_events(app, &args, config, event_name).await } /// Internal implementation for Python command execution async fn execute_python_internal

( _app: tauri::AppHandle, args: &[String], config: Option, progress_callback: Option

, ) -> Result where P: ProgressCallback + 'static, { let config = config.unwrap_or_default(); println!("Executing Python command with args: {:?}", args); // Get project root directory let current_dir = std::env::current_dir() .map_err(|e| format!("Failed to get current directory: {}", e))?; let project_root = if let Some(working_dir) = config.working_directory { working_dir } else if current_dir.ends_with("src-tauri") { current_dir.parent().unwrap_or(¤t_dir).to_path_buf() } else { current_dir }; println!("Working directory: {:?}", project_root); // Try multiple Python commands in order of preference let python_commands = if cfg!(target_os = "windows") { vec!["python", "python3", "py"] } else { vec!["python3", "python"] }; let mut child = None; let mut last_error = String::new(); for python_cmd in python_commands { println!("Trying Python command: {}", python_cmd); let mut cmd = Command::new(python_cmd); cmd.current_dir(&project_root); cmd.args(args); cmd.stdout(std::process::Stdio::piped()); cmd.stderr(std::process::Stdio::piped()); // Set environment variables for consistent encoding cmd.env("PYTHONIOENCODING", "utf-8"); cmd.env("PYTHONUNBUFFERED", "1"); // On Windows, ensure UTF-8 console output if cfg!(target_os = "windows") { cmd.env("PYTHONUTF8", "1"); } match cmd.spawn() { Ok(process) => { println!("Successfully started Python process with: {}", python_cmd); child = Some(process); break; } Err(e) => { last_error = format!("Failed to start {} process: {}", python_cmd, e); println!("{}", last_error); continue; } } } let mut child = child.ok_or_else(|| { format!("Failed to start Python process with any command. Last error: {}", last_error) })?; let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); // Use channels for concurrent reading let (stdout_tx, _stdout_rx) = mpsc::channel(); let (stderr_tx, _stderr_rx) = mpsc::channel(); let (result_tx, result_rx) = mpsc::channel(); let progress_messages = Arc::new(Mutex::new(Vec::new())); let error_messages = Arc::new(Mutex::new(Vec::new())); let final_result = Arc::new(Mutex::new(None::)); // Wrap progress callback in Arc for thread sharing let progress_callback = Arc::new(progress_callback); // Spawn thread for reading stdout let progress_messages_clone = Arc::clone(&progress_messages); let final_result_clone = Arc::clone(&final_result); let progress_callback_clone = Arc::clone(&progress_callback); thread::spawn(move || { let mut reader = BufReader::new(stdout); let mut buffer = Vec::new(); // Read raw bytes and handle encoding issues loop { buffer.clear(); match reader.read_until(b'\n', &mut buffer) { Ok(0) => break, // EOF Ok(_) => { // Convert bytes to string, handling invalid UTF-8 let line = String::from_utf8_lossy(&buffer); let line = line.trim_end_matches('\n').trim_end_matches('\r'); if line.is_empty() { continue; } println!("Python stdout: {}", line); // Parse JSON-RPC messages if line.starts_with("JSONRPC:") { let json_str = &line[8..]; // Remove "JSONRPC:" prefix if let Ok(json_value) = serde_json::from_str::(json_str) { if let Some(method) = json_value.get("method") { if method == "progress" { // Store progress message if let Ok(mut messages) = progress_messages_clone.lock() { messages.push(json_str.to_string()); } // Call progress callback if available if let Some(ref callback) = *progress_callback_clone { if let Some(params) = json_value.get("params") { // Try to parse as PythonProgress if let Ok(progress) = serde_json::from_value::(params.clone()) { callback.on_progress(progress); } else { // Fallback: create basic progress from available data let step = params.get("step") .and_then(|v| v.as_str()) .unwrap_or("unknown") .to_string(); let message = params.get("message") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); let progress_val = params.get("progress") .and_then(|v| v.as_f64()) .unwrap_or(-1.0); let progress = PythonProgress { step, progress: progress_val, message, details: Some(params.clone()), timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64(), }; callback.on_progress(progress); } } } println!("Progress: {}", json_str); } } else if json_value.get("result").is_some() || json_value.get("error").is_some() { // This is a final result or error response if let Ok(mut result) = final_result_clone.lock() { // Extract the result or error content from JSON-RPC if let Some(result_content) = json_value.get("result") { *result = Some(result_content.to_string()); } else if let Some(error_content) = json_value.get("error") { *result = Some(format!("{{\"status\":false,\"error\":{}}}", error_content)); } }; } } } else if line.trim().starts_with('{') && line.trim().ends_with('}') { // Fallback: try to parse as direct JSON result if let Ok(json_value) = serde_json::from_str::(line.trim()) { // Check if this looks like a final result (has status field) if json_value.get("status").is_some() { if let Ok(mut result) = final_result_clone.lock() { *result = Some(line.trim().to_string()); } println!("Direct JSON result: {}", line.trim()); } } } else { println!("Python other: {}", line); } if stdout_tx.send(line.to_string()).is_err() { break; } } Err(e) => { println!("Error reading stdout: {}", e); break; } } } }); // Spawn thread for reading stderr let error_messages_clone = Arc::clone(&error_messages); thread::spawn(move || { let mut reader = BufReader::new(stderr); let mut buffer = Vec::new(); // Read raw bytes and handle encoding issues loop { buffer.clear(); match reader.read_until(b'\n', &mut buffer) { Ok(0) => break, // EOF Ok(_) => { // Convert bytes to string, handling invalid UTF-8 let line = String::from_utf8_lossy(&buffer); let line = line.trim_end_matches('\n').trim_end_matches('\r'); if line.is_empty() { continue; } println!("Python stderr: {}", line); if let Ok(mut messages) = error_messages_clone.lock() { messages.push(line.to_string()); } if stderr_tx.send(line.to_string()).is_err() { break; } } Err(e) => { println!("Error reading stderr: {}", e); break; } } } }); // Spawn thread to wait for process completion thread::spawn(move || { match child.wait() { Ok(status) => { let _ = result_tx.send(Ok(status)); } Err(e) => { let _ = result_tx.send(Err(format!("Failed to wait for process: {}", e))); } } }); // Wait for process to complete with timeout println!("Waiting for Python process to complete..."); let timeout_duration = Duration::from_secs(config.timeout_seconds); let start_time = Instant::now(); let exit_status = loop { match result_rx.try_recv() { Ok(Ok(status)) => break status, Ok(Err(e)) => return Err(e), Err(mpsc::TryRecvError::Empty) => { if start_time.elapsed() > timeout_duration { return Err(format!("Python process timed out after {} seconds", config.timeout_seconds)); } thread::sleep(Duration::from_millis(100)); continue; } Err(mpsc::TryRecvError::Disconnected) => { return Err("Process monitoring thread disconnected".to_string()); } } }; println!("Python process terminated with code: {:?}", exit_status.code()); // Extract final results from shared state let final_result_value = { final_result.lock().unwrap().clone() }; let progress_messages_value = { progress_messages.lock().unwrap().clone() }; let error_messages_value = { error_messages.lock().unwrap().clone() }; // Return the final result if we found one if let Some(result) = final_result_value { println!("Extracted JSON-RPC result: {}", result); return Ok(result); } // If no JSON-RPC result found, provide detailed error information if exit_status.success() { // Process succeeded but no output - this is unusual let error_msg = if progress_messages_value.is_empty() { "Python script completed successfully but produced no output" } else { "Python script completed but did not return a valid JSON result" }; Err(error_msg.to_string()) } else { // Process failed - provide detailed error based on exit code let error_msg = if let Some(code) = exit_status.code() { let unsigned_code = code as u32; match code { 1 => "Python script failed with general error. Check if all dependencies are installed.".to_string(), 120 => "Python module import failed or function not supported. This may be due to missing dependencies or incompatible Python environment. Please run the environment test first.".to_string(), -1073741510 => "Python process was terminated by antivirus or system security. Please add the application to your antivirus whitelist.".to_string(), _ if unsigned_code == 3221225786 => "Python process was terminated by antivirus or system security. Please add the application to your antivirus whitelist.".to_string(), _ => format!("Python script failed with exit code: {}. This may indicate a system or environment issue.", code) } } else { "Python process was killed by external signal. This may be caused by antivirus software or system security policies.".to_string() }; // Include any error output we captured let mut full_error = error_msg; if !progress_messages_value.is_empty() { full_error.push_str(&format!("\n\nLast stdout: {}", progress_messages_value.join("\n"))); } if !error_messages_value.is_empty() { full_error.push_str(&format!("\n\nStderr: {}", error_messages_value.join("\n"))); } Err(full_error) } }