#!/usr/bin/env python3
"""
Video Processing Core Module

This module provides the main video processing functionality using MoviePy
and FFmpeg.
"""

import argparse
import json
import os
import sys
from fractions import Fraction
from pathlib import Path
from typing import Any, Dict, List, Optional

import moviepy.editor as mp
from moviepy.video.fx import resize, crop, rotate
from moviepy.audio.fx import volumex
import ffmpeg
import cv2
import numpy as np

# Make the parent directory importable so that ``config`` and ``utils``
# resolve when this file is executed directly as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from config import settings
from utils import setup_logger, validate_video_file

logger = setup_logger(__name__)


def _parse_frame_rate(rate: Any) -> Optional[float]:
    """Convert an ffprobe frame-rate string such as ``"30000/1001"`` to a float.

    Args:
        rate: Frame-rate value as reported by ffprobe (``"num/den"`` or a
            plain number).

    Returns:
        The frame rate as a float, or ``None`` when the value cannot be
        parsed or has a zero denominator (e.g. ``"0/0"``).
    """
    try:
        # Fraction parses "num/den" without evaluating arbitrary code.
        return float(Fraction(str(rate)))
    except (ValueError, ZeroDivisionError):
        return None


def _close_clips(clips: List[Any]) -> None:
    """Close every clip in *clips*, logging (not raising) on failure.

    Cleanup is best-effort: a clip that fails to close must not prevent the
    remaining clips from being released.
    """
    for clip in clips:
        try:
            clip.close()
        except Exception as e:
            logger.warning(f"Failed to close clip: {str(e)}")


class VideoProcessor:
    """Main video processing class."""

    # Operations that transform the single clip loaded from ``input_path``,
    # mapped to the name of the method implementing them. "merge" is handled
    # separately because it builds its result from ``parameters`` alone.
    _CLIP_OPERATIONS = {
        "trim": "_trim_video",
        "resize": "_resize_video",
        "crop": "_crop_video",
        "rotate": "_rotate_video",
        "adjust_brightness": "_adjust_brightness",
        "adjust_contrast": "_adjust_contrast",
        "add_text": "_add_text",
    }

    def __init__(self):
        self.temp_dir = settings.temp_dir
        self.cache_dir = settings.cache_dir

    def process_video(self, input_path: str, output_path: str,
                      operation: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process video with specified operation and parameters.

        Args:
            input_path: Path to input video file
            output_path: Path to output video file
            operation: Type of operation to perform
            parameters: Operation-specific parameters

        Returns:
            Dictionary with processing results. On success it contains
            ``status`` ("success"), ``output_path`` and ``info``; on failure
            it contains ``status`` ("error") and ``error`` (the message).
            Exceptions are not propagated to the caller.
        """
        try:
            if not validate_video_file(input_path):
                raise ValueError(f"Invalid video file: {input_path}")

            logger.info(f"Processing video: {operation} on {input_path}")

            # Reject unknown operations before opening any file handles.
            if operation != "merge" and operation not in self._CLIP_OPERATIONS:
                raise ValueError(f"Unknown operation: {operation}")

            opened_clips: List[Any] = []
            result_video = None
            try:
                # Load video
                video = mp.VideoFileClip(input_path)
                opened_clips.append(video)

                # Apply operation based on type
                if operation == "merge":
                    result_video = self._merge_videos(parameters, opened_clips)
                else:
                    handler = getattr(self, self._CLIP_OPERATIONS[operation])
                    result_video = handler(video, parameters)

                # Write output
                result_video.write_videofile(
                    output_path,
                    codec=settings.default_video_codec,
                    audio_codec=settings.default_audio_codec,
                    fps=parameters.get("fps", settings.default_fps)
                )
            finally:
                # Clean up on success and on failure alike so reader
                # processes/file handles are never leaked.
                if result_video is not None:
                    opened_clips.append(result_video)
                _close_clips(opened_clips)

            # Get output info
            output_info = self._get_video_info(output_path)

            return {
                "status": "success",
                "output_path": output_path,
                "info": output_info
            }

        except Exception as e:
            logger.error(f"Video processing failed: {str(e)}")
            return {
                "status": "error",
                "error": str(e)
            }

    def _trim_video(self, video: mp.VideoFileClip, params: Dict[str, Any]) -> mp.VideoFileClip:
        """Trim video to specified start and end times.

        Args:
            video: Clip to trim.
            params: May contain ``start_time`` (default 0) and ``end_time``
                (default: the clip's duration).

        Returns:
            The sub-clip between the two times.
        """
        start_time = params.get("start_time", 0)
        end_time = params.get("end_time", video.duration)
        return video.subclip(start_time, end_time)

    def _resize_video(self, video: mp.VideoFileClip, params: Dict[str, Any]) -> mp.VideoFileClip:
        """Resize video to specified dimensions.

        Args:
            video: Clip to resize.
            params: May contain ``width`` and/or ``height``. When only one is
                given, only that dimension is passed to the resize effect.

        Returns:
            The resized clip.

        Raises:
            ValueError: If neither ``width`` nor ``height`` is provided.
        """
        width = params.get("width")
        height = params.get("height")

        # ``mp.vfx`` holds the effect *functions*; the names imported from
        # ``moviepy.video.fx`` at the top of the file may be submodules,
        # which are not callable by ``clip.fx``.
        if width and height:
            return video.fx(mp.vfx.resize, newsize=(width, height))
        elif width:
            return video.fx(mp.vfx.resize, width=width)
        elif height:
            return video.fx(mp.vfx.resize, height=height)
        else:
            raise ValueError("Width or height must be specified for resize operation")

    def _crop_video(self, video: mp.VideoFileClip, params: Dict[str, Any]) -> mp.VideoFileClip:
        """Crop video to specified region.

        Args:
            video: Clip to crop.
            params: May contain ``x1``/``y1`` (default 0) and ``x2``/``y2``
                (default: the clip's width/height).

        Returns:
            The cropped clip.
        """
        x1 = params.get("x1", 0)
        y1 = params.get("y1", 0)
        x2 = params.get("x2", video.w)
        y2 = params.get("y2", video.h)

        # See _resize_video for why the effect comes from ``mp.vfx``.
        return video.fx(mp.vfx.crop, x1=x1, y1=y1, x2=x2, y2=y2)

    def _rotate_video(self, video: mp.VideoFileClip, params: Dict[str, Any]) -> mp.VideoFileClip:
        """Rotate video by specified angle.

        Args:
            video: Clip to rotate.
            params: May contain ``angle`` (default 0).

        Returns:
            The rotated clip.
        """
        angle = params.get("angle", 0)
        # See _resize_video for why the effect comes from ``mp.vfx``.
        return video.fx(mp.vfx.rotate, angle)

    def _adjust_brightness(self, video: mp.VideoFileClip, params: Dict[str, Any]) -> mp.VideoFileClip:
        """Adjust video brightness.

        Args:
            video: Clip to adjust.
            params: May contain ``factor`` (default 1.0); every pixel value
                is multiplied by it and clipped to [0, 255].

        Returns:
            A clip whose frames are produced by the brightness filter.
        """
        factor = params.get("factor", 1.0)

        def brightness_effect(get_frame, t):
            # Work in float so an integer factor cannot overflow/wrap the
            # uint8 pixel values before clipping.
            frame = np.asarray(get_frame(t), dtype=np.float64)
            return np.clip(frame * factor, 0, 255).astype(np.uint8)

        return video.fl(brightness_effect)

    def _adjust_contrast(self, video: mp.VideoFileClip, params: Dict[str, Any]) -> mp.VideoFileClip:
        """Adjust video contrast.

        Args:
            video: Clip to adjust.
            params: May contain ``factor`` (default 1.0); pixel values are
                scaled around the mid-point 128 and clipped to [0, 255].

        Returns:
            A clip whose frames are produced by the contrast filter.
        """
        factor = params.get("factor", 1.0)

        def contrast_effect(get_frame, t):
            # Convert to float first: subtracting 128 from unsigned 8-bit
            # pixels would wrap around for every value below 128.
            frame = np.asarray(get_frame(t), dtype=np.float64)
            return np.clip(128 + factor * (frame - 128), 0, 255).astype(np.uint8)

        return video.fl(contrast_effect)

    def _add_text(self, video: mp.VideoFileClip, params: Dict[str, Any]) -> mp.VideoFileClip:
        """Add text overlay to video.

        Args:
            video: Clip to overlay the text on.
            params: May contain ``text`` (default ""), ``fontsize``
                (default 50), ``color`` (default "white"), ``position``
                (default ("center", "bottom")) and ``duration`` (default:
                the clip's duration).

        Returns:
            A composite of the clip and the text clip.
        """
        text = params.get("text", "")
        fontsize = params.get("fontsize", 50)
        color = params.get("color", "white")
        position = params.get("position", ("center", "bottom"))
        duration = params.get("duration", video.duration)

        txt_clip = (
            mp.TextClip(text, fontsize=fontsize, color=color)
            .set_position(position)
            .set_duration(duration)
        )

        return mp.CompositeVideoClip([video, txt_clip])

    def _merge_videos(self, params: Dict[str, Any],
                      opened_clips: Optional[List[Any]] = None) -> mp.VideoFileClip:
        """Merge multiple videos.

        Args:
            params: Must contain ``video_paths``, a list of at least two
                video file paths, concatenated in the given order.
            opened_clips: Optional list to which every clip opened here is
                appended, so the caller can close them once the result has
                been written. Clips are appended as they are opened, so
                they are tracked even if a later file fails to open.

        Returns:
            The concatenation of all clips.

        Raises:
            ValueError: If fewer than two paths are supplied.
        """
        video_paths = params.get("video_paths", [])
        if len(video_paths) < 2:
            raise ValueError("At least 2 videos required for merge operation")

        clips = []
        for path in video_paths:
            clip = mp.VideoFileClip(path)
            clips.append(clip)
            if opened_clips is not None:
                opened_clips.append(clip)

        return mp.concatenate_videoclips(clips)

    def _get_video_info(self, video_path: str) -> Dict[str, Any]:
        """Get video file information.

        Args:
            video_path: Path of the file to probe with ffprobe.

        Returns:
            Dictionary with ``duration``, ``size`` and ``bitrate``, plus
            ``width``/``height``/``fps``/``video_codec`` when a video stream
            exists and ``audio_codec``/``sample_rate``/``channels`` when an
            audio stream exists. ``fps`` is ``None`` if the reported frame
            rate cannot be parsed. An empty dictionary is returned when
            probing fails.
        """
        try:
            probe = ffmpeg.probe(video_path)
            video_stream = next(
                (stream for stream in probe['streams'] if stream['codec_type'] == 'video'),
                None
            )
            audio_stream = next(
                (stream for stream in probe['streams'] if stream['codec_type'] == 'audio'),
                None
            )

            info = {
                "duration": float(probe['format']['duration']),
                "size": int(probe['format']['size']),
                "bitrate": int(probe['format']['bit_rate']),
            }

            if video_stream:
                info.update({
                    "width": int(video_stream['width']),
                    "height": int(video_stream['height']),
                    # Parsed arithmetically; never eval() probe output.
                    "fps": _parse_frame_rate(video_stream['r_frame_rate']),
                    "video_codec": video_stream['codec_name']
                })

            if audio_stream:
                info.update({
                    "audio_codec": audio_stream['codec_name'],
                    "sample_rate": int(audio_stream['sample_rate']),
                    "channels": int(audio_stream['channels'])
                })

            return info

        except Exception as e:
            logger.error(f"Failed to get video info: {str(e)}")
            return {}


def main():
    """Command line interface for video processing.

    Parses ``--input``, ``--output``, ``--operation`` and ``--parameters``
    (a JSON string), runs the operation and prints the result dictionary as
    JSON on stdout. Exits with status 1 if an exception escapes (for
    example, when ``--parameters`` is not valid JSON).
    """
    parser = argparse.ArgumentParser(description="Video Processing Core")
    parser.add_argument("--input", required=True, help="Input video file path")
    parser.add_argument("--output", required=True, help="Output video file path")
    parser.add_argument("--operation", required=True, help="Operation to perform")
    parser.add_argument("--parameters", required=True, help="Operation parameters as JSON string")

    args = parser.parse_args()

    try:
        parameters = json.loads(args.parameters)
        processor = VideoProcessor()
        result = processor.process_video(args.input, args.output, args.operation, parameters)
        print(json.dumps(result))

    except Exception as e:
        error_result = {
            "status": "error",
            "error": str(e)
        }
        print(json.dumps(error_result))
        sys.exit(1)


if __name__ == "__main__":
    main()