# modalDeploy/tests/CDN_speed_test_case.py

import os
import unittest
import statistics
import uuid
import m3u8
import time
from datetime import datetime
from typing import List, Dict
from loguru import logger
from BowongModalFunctions.utils.HTTPUtils import HTTPDownloadUtils
import modal
class CloudFrontCDNTestCase(unittest.IsolatedAsyncioTestCase):
async def test_modal_restore(self):
fn_id = ""
fn = modal.Function.from_name("bowong-ai-video","ffmpeg_stream_record_restore", environment_name="test")
restore_id = fn.spawn(fn_id=fn_id).object_id
logger.info(f"restore id: {restore_id}")
async def test_hls_speed(self):
# 生成唯一ID和临时目录
id = uuid.uuid4()
temp_dir = f"./videos/{id}"
os.makedirs(temp_dir, exist_ok=True)
# HLS URL
# hls_url = "https://d2nj71io21vkj2.cloudfront.net/test/records/hls/fc-01JXBVTMMYSFNZ4SN1D225NMYV/playlist.m3u8"
hls_url = "https://cdn.roasmax.cn/test/records/hls/fc-01JWZS8954RZP4B13H3TA6TKMM/playlist.m3u8"
# 加载播放列表
playlist = m3u8.load(hls_url)
total_sample_size = len(playlist.segments)
used_sample_size = min(100, total_sample_size)
logger.info(f"total samples = {total_sample_size}, used sample = {used_sample_size}")
# 准备下载任务
urls = [segment.absolute_uri for segment in playlist.segments]
output_paths = [f"{temp_dir}/{segment.uri}" for segment in playlist.segments]
# 记录下载开始时间
start_time = time.time()
# 执行下载并收集结果
results = await HTTPDownloadUtils.batch_download(
urls=urls[:used_sample_size],
output_paths=output_paths[:used_sample_size]
)
# 计算总下载时间
total_time = time.time() - start_time
# 生成报告
report = self._generate_speed_report(
results=results,
urls=urls,
total_time=total_time,
playlist=playlist
)
# 打印报告
self._print_report(report)
# 保存报告到文件
self._save_report(report, temp_dir)
# 清理临时文件
self._cleanup(temp_dir)
return report
def _generate_speed_report(
self,
results: List[str],
urls: List[str],
total_time: float,
playlist: m3u8.M3U8
) -> Dict:
"""生成速度测试报告"""
# 计算每个分片的大小和下载时间
segment_stats = []
total_size = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Failed to download segment {i}: {str(result)}")
continue
segment_size = os.path.getsize(result)
total_size += segment_size
segment_stats.append({
'index': i,
'url': urls[i],
'size': segment_size,
'duration': playlist.segments[i].duration,
'bandwidth': segment_size / playlist.segments[i].duration if playlist.segments[i].duration else 0
})
# 计算统计指标
sizes = [stat['size'] for stat in segment_stats]
bandwidths = [stat['bandwidth'] for stat in segment_stats]
report = {
'timestamp': datetime.now().isoformat(),
'total_segments': len(playlist.segments),
'successful_downloads': len(segment_stats),
'total_size': total_size,
'total_time': total_time,
'average_speed': total_size / total_time if total_time > 0 else 0,
'statistics': {
'size': {
'mean': statistics.mean(sizes),
'median': statistics.median(sizes),
'stdev': statistics.stdev(sizes) if len(sizes) > 1 else 0,
'min': min(sizes),
'max': max(sizes)
},
'bandwidth': {
'mean': statistics.mean(bandwidths),
'median': statistics.median(bandwidths),
'stdev': statistics.stdev(bandwidths) if len(bandwidths) > 1 else 0,
'min': min(bandwidths),
'max': max(bandwidths)
}
},
'segments': segment_stats
}
return report
def _print_report(self, report: Dict):
"""打印报告到控制台"""
logger.info("=== CloudFront CDN Speed Test Report ===")
logger.info(f"Timestamp: {report['timestamp']}")
logger.info(f"Total Segments: {report['total_segments']}")
logger.info(f"Successful Downloads: {report['successful_downloads']}")
logger.info(f"Total Size: {report['total_size'] / 1024 / 1024:.2f} MB")
logger.info(f"Total Time: {report['total_time']:.2f} seconds")
logger.info(f"Average Speed: {report['average_speed'] / 1024 / 1024:.2f} MB/s")
logger.info("Size Statistics:")
logger.info(f" Mean: {report['statistics']['size']['mean'] / 1024:.2f} KB")
logger.info(f" Median: {report['statistics']['size']['median'] / 1024:.2f} KB")
logger.info(f" StdDev: {report['statistics']['size']['stdev'] / 1024:.2f} KB")
logger.info(f" Min: {report['statistics']['size']['min'] / 1024:.2f} KB")
logger.info(f" Max: {report['statistics']['size']['max'] / 1024:.2f} KB")
logger.info("Bandwidth Statistics:")
logger.info(f" Mean: {report['statistics']['bandwidth']['mean'] / 1024:.2f} KB/s")
logger.info(f" Median: {report['statistics']['bandwidth']['median'] / 1024:.2f} KB/s")
logger.info(f" StdDev: {report['statistics']['bandwidth']['stdev'] / 1024:.2f} KB/s")
logger.info(f" Min: {report['statistics']['bandwidth']['min'] / 1024:.2f} KB/s")
logger.info(f" Max: {report['statistics']['bandwidth']['max'] / 1024:.2f} KB/s")
def _save_report(self, report: Dict, temp_dir: str):
"""保存报告到文件"""
import json
report_path = os.path.join(temp_dir, 'speed_test_report.json')
with open(report_path, 'w') as f:
json.dump(report, f, indent=2)
logger.info(f"\nReport saved to: {report_path}")
def _cleanup(self, temp_dir: str):
"""清理临时文件"""
import shutil
try:
shutil.rmtree(temp_dir)
logger.info(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
logger.error(f"Failed to cleanup temporary directory: {str(e)}")
# Allow running this test module directly from the command line.
if __name__ == "__main__":
    unittest.main()