import json
import os
import shutil
import statistics
import time
import unittest
import uuid
from datetime import datetime
from typing import Dict, List, Union

import m3u8
import modal
from loguru import logger

from BowongModalFunctions.utils.HTTPUtils import HTTPDownloadUtils


class CloudFrontCDNTestCase(unittest.IsolatedAsyncioTestCase):
    """Integration tests that hit live services (Modal and an HLS CDN endpoint).

    ``test_hls_speed`` downloads a sample of HLS segments and reports size and
    throughput statistics; ``test_modal_restore`` spawns a deployed Modal
    function.
    """

    # Upper bound on how many playlist segments are downloaded per run.
    MAX_SAMPLE_SEGMENTS = 100

    async def test_modal_restore(self):
        """Spawn the deployed ``ffmpeg_stream_record_restore`` function and log its call id."""
        # NOTE(review): fn_id is an empty string here; presumably it has to be
        # filled in with a real id before running this test -- confirm.
        fn_id = ""
        fn = modal.Function.from_name(
            "bowong-ai-video",
            "ffmpeg_stream_record_restore",
            environment_name="test",
        )
        restore_id = fn.spawn(fn_id=fn_id).object_id
        logger.info(f"restore id: {restore_id}")

    async def test_hls_speed(self):
        """Download a sample of HLS segments and report download statistics.

        The temporary download directory is always removed, even when the
        download or the report generation fails. The test fails if no segment
        could be downloaded.
        """
        # Unique id and scratch directory for this run.
        run_id = uuid.uuid4()
        temp_dir = f"./videos/{run_id}"
        os.makedirs(temp_dir, exist_ok=True)

        try:
            # HLS playlist URL. Alternative (direct CloudFront domain):
            # "https://d2nj71io21vkj2.cloudfront.net/test/records/hls/fc-01JXBVTMMYSFNZ4SN1D225NMYV/playlist.m3u8"
            hls_url = "https://cdn.roasmax.cn/test/records/hls/fc-01JWZS8954RZP4B13H3TA6TKMM/playlist.m3u8"

            # Load the playlist and decide how many segments to sample.
            playlist = m3u8.load(hls_url)
            total_sample_size = len(playlist.segments)
            used_sample_size = min(self.MAX_SAMPLE_SEGMENTS, total_sample_size)
            logger.info(f"total samples = {total_sample_size}, used sample = {used_sample_size}")

            # Prepare the download tasks.
            urls = [segment.absolute_uri for segment in playlist.segments]
            output_paths = [f"{temp_dir}/{segment.uri}" for segment in playlist.segments]

            # Time the whole batch download.
            start_time = time.time()
            results = await HTTPDownloadUtils.batch_download(
                urls=urls[:used_sample_size],
                output_paths=output_paths[:used_sample_size],
            )
            total_time = time.time() - start_time

            report = self._generate_speed_report(
                results=results,
                urls=urls,
                total_time=total_time,
                playlist=playlist,
            )
            self._print_report(report)

            # NOTE(review): the report is written inside temp_dir, which the
            # cleanup below deletes right away -- confirm whether it should be
            # persisted somewhere else.
            self._save_report(report, temp_dir)

            # Fail explicitly (instead of crashing while computing statistics)
            # when nothing could be downloaded.
            self.assertGreater(
                report['successful_downloads'],
                0,
                "no HLS segments were downloaded",
            )
        finally:
            # Remove the scratch directory on success and on failure alike.
            self._cleanup(temp_dir)
        # Deliberately no return value: returning non-None from a test method
        # is deprecated since Python 3.11.

    @staticmethod
    def _summarize(values: List[float]) -> Dict[str, float]:
        """Return mean/median/stdev/min/max of *values*; all zeros when *values* is empty."""
        if not values:
            return {'mean': 0, 'median': 0, 'stdev': 0, 'min': 0, 'max': 0}
        return {
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            # The sample standard deviation needs at least two data points.
            'stdev': statistics.stdev(values) if len(values) > 1 else 0,
            'min': min(values),
            'max': max(values),
        }

    def _generate_speed_report(
        self,
        results: List[Union[str, Exception]],
        urls: List[str],
        total_time: float,
        playlist: m3u8.M3U8
    ) -> Dict:
        """Build the speed-test report.

        Args:
            results: One entry per attempted download, in playlist order;
                either a path passed to ``os.path.getsize`` or an
                ``Exception`` instance marking a failed download.
            urls: Segment URLs, indexed the same way as ``results``.
            total_time: Wall-clock seconds spent on the whole batch.
            playlist: The loaded playlist; supplies the segment durations.

        Returns:
            A JSON-serializable dict with totals, size/bandwidth statistics
            and per-segment details. The statistics are all zero when no
            segment was downloaded successfully.
        """
        # Per-segment size and "bandwidth" (bytes per second of media duration).
        segment_stats = []
        total_size = 0

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"Failed to download segment {i}: {result}")
                continue

            segment_size = os.path.getsize(result)
            total_size += segment_size
            duration = playlist.segments[i].duration

            segment_stats.append({
                'index': i,
                'url': urls[i],
                'size': segment_size,
                'duration': duration,
                # Guard against a missing or zero segment duration.
                'bandwidth': segment_size / duration if duration else 0,
            })

        sizes = [stat['size'] for stat in segment_stats]
        bandwidths = [stat['bandwidth'] for stat in segment_stats]

        return {
            'timestamp': datetime.now().isoformat(),
            'total_segments': len(playlist.segments),
            'successful_downloads': len(segment_stats),
            'total_size': total_size,
            'total_time': total_time,
            'average_speed': total_size / total_time if total_time > 0 else 0,
            'statistics': {
                'size': self._summarize(sizes),
                'bandwidth': self._summarize(bandwidths),
            },
            'segments': segment_stats,
        }

    def _print_report(self, report: Dict):
        """Log the report summary through loguru."""
        labels = (
            ("Mean", 'mean'),
            ("Median", 'median'),
            ("StdDev", 'stdev'),
            ("Min", 'min'),
            ("Max", 'max'),
        )

        logger.info("=== CloudFront CDN Speed Test Report ===")
        logger.info(f"Timestamp: {report['timestamp']}")
        logger.info(f"Total Segments: {report['total_segments']}")
        logger.info(f"Successful Downloads: {report['successful_downloads']}")
        logger.info(f"Total Size: {report['total_size'] / 1024 / 1024:.2f} MB")
        logger.info(f"Total Time: {report['total_time']:.2f} seconds")
        logger.info(f"Average Speed: {report['average_speed'] / 1024 / 1024:.2f} MB/s")

        logger.info("Size Statistics:")
        size_stats = report['statistics']['size']
        for label, key in labels:
            logger.info(f" {label}: {size_stats[key] / 1024:.2f} KB")

        logger.info("Bandwidth Statistics:")
        bandwidth_stats = report['statistics']['bandwidth']
        for label, key in labels:
            logger.info(f" {label}: {bandwidth_stats[key] / 1024:.2f} KB/s")

    def _save_report(self, report: Dict, temp_dir: str):
        """Write the report as ``speed_test_report.json`` inside *temp_dir*."""
        report_path = os.path.join(temp_dir, 'speed_test_report.json')
        with open(report_path, 'w', encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        logger.info(f"\nReport saved to: {report_path}")

    def _cleanup(self, temp_dir: str):
        """Remove *temp_dir* recursively; failures are logged, never raised."""
        try:
            shutil.rmtree(temp_dir)
        except OSError as e:
            # Best effort: a leftover scratch directory must not fail the test.
            logger.error(f"Failed to cleanup temporary directory: {e}")
        else:
            logger.info(f"Cleaned up temporary directory: {temp_dir}")


if __name__ == '__main__':
    unittest.main()