176 lines
6.5 KiB
Python
176 lines
6.5 KiB
Python
import os
|
|
import unittest
|
|
import statistics
|
|
import uuid
|
|
|
|
import m3u8
|
|
import time
|
|
from datetime import datetime
|
|
from typing import List, Dict
|
|
from loguru import logger
|
|
from BowongModalFunctions.utils.HTTPUtils import HTTPDownloadUtils
|
|
import modal
|
|
|
|
class CloudFrontCDNTestCase(unittest.IsolatedAsyncioTestCase):
|
|
|
|
async def test_modal_restore(self):
|
|
fn_id = ""
|
|
fn = modal.Function.from_name("bowong-ai-video","ffmpeg_stream_record_restore", environment_name="test")
|
|
restore_id = fn.spawn(fn_id=fn_id).object_id
|
|
logger.info(f"restore id: {restore_id}")
|
|
|
|
async def test_hls_speed(self):
|
|
# 生成唯一ID和临时目录
|
|
id = uuid.uuid4()
|
|
temp_dir = f"./videos/{id}"
|
|
os.makedirs(temp_dir, exist_ok=True)
|
|
|
|
# HLS URL
|
|
# hls_url = "https://d2nj71io21vkj2.cloudfront.net/test/records/hls/fc-01JXBVTMMYSFNZ4SN1D225NMYV/playlist.m3u8"
|
|
hls_url = "https://cdn.roasmax.cn/test/records/hls/fc-01JWZS8954RZP4B13H3TA6TKMM/playlist.m3u8"
|
|
|
|
# 加载播放列表
|
|
playlist = m3u8.load(hls_url)
|
|
total_sample_size = len(playlist.segments)
|
|
used_sample_size = min(100, total_sample_size)
|
|
logger.info(f"total samples = {total_sample_size}, used sample = {used_sample_size}")
|
|
# 准备下载任务
|
|
urls = [segment.absolute_uri for segment in playlist.segments]
|
|
output_paths = [f"{temp_dir}/{segment.uri}" for segment in playlist.segments]
|
|
|
|
# 记录下载开始时间
|
|
start_time = time.time()
|
|
|
|
# 执行下载并收集结果
|
|
results = await HTTPDownloadUtils.batch_download(
|
|
urls=urls[:used_sample_size],
|
|
output_paths=output_paths[:used_sample_size]
|
|
)
|
|
|
|
# 计算总下载时间
|
|
total_time = time.time() - start_time
|
|
|
|
# 生成报告
|
|
report = self._generate_speed_report(
|
|
results=results,
|
|
urls=urls,
|
|
total_time=total_time,
|
|
playlist=playlist
|
|
)
|
|
|
|
# 打印报告
|
|
self._print_report(report)
|
|
|
|
# 保存报告到文件
|
|
self._save_report(report, temp_dir)
|
|
|
|
# 清理临时文件
|
|
self._cleanup(temp_dir)
|
|
|
|
return report
|
|
|
|
def _generate_speed_report(
|
|
self,
|
|
results: List[str],
|
|
urls: List[str],
|
|
total_time: float,
|
|
playlist: m3u8.M3U8
|
|
) -> Dict:
|
|
"""生成速度测试报告"""
|
|
|
|
# 计算每个分片的大小和下载时间
|
|
segment_stats = []
|
|
total_size = 0
|
|
|
|
for i, result in enumerate(results):
|
|
if isinstance(result, Exception):
|
|
logger.error(f"Failed to download segment {i}: {str(result)}")
|
|
continue
|
|
|
|
segment_size = os.path.getsize(result)
|
|
total_size += segment_size
|
|
|
|
segment_stats.append({
|
|
'index': i,
|
|
'url': urls[i],
|
|
'size': segment_size,
|
|
'duration': playlist.segments[i].duration,
|
|
'bandwidth': segment_size / playlist.segments[i].duration if playlist.segments[i].duration else 0
|
|
})
|
|
|
|
# 计算统计指标
|
|
sizes = [stat['size'] for stat in segment_stats]
|
|
bandwidths = [stat['bandwidth'] for stat in segment_stats]
|
|
|
|
report = {
|
|
'timestamp': datetime.now().isoformat(),
|
|
'total_segments': len(playlist.segments),
|
|
'successful_downloads': len(segment_stats),
|
|
'total_size': total_size,
|
|
'total_time': total_time,
|
|
'average_speed': total_size / total_time if total_time > 0 else 0,
|
|
'statistics': {
|
|
'size': {
|
|
'mean': statistics.mean(sizes),
|
|
'median': statistics.median(sizes),
|
|
'stdev': statistics.stdev(sizes) if len(sizes) > 1 else 0,
|
|
'min': min(sizes),
|
|
'max': max(sizes)
|
|
},
|
|
'bandwidth': {
|
|
'mean': statistics.mean(bandwidths),
|
|
'median': statistics.median(bandwidths),
|
|
'stdev': statistics.stdev(bandwidths) if len(bandwidths) > 1 else 0,
|
|
'min': min(bandwidths),
|
|
'max': max(bandwidths)
|
|
}
|
|
},
|
|
'segments': segment_stats
|
|
}
|
|
|
|
return report
|
|
|
|
def _print_report(self, report: Dict):
|
|
"""打印报告到控制台"""
|
|
logger.info("=== CloudFront CDN Speed Test Report ===")
|
|
logger.info(f"Timestamp: {report['timestamp']}")
|
|
logger.info(f"Total Segments: {report['total_segments']}")
|
|
logger.info(f"Successful Downloads: {report['successful_downloads']}")
|
|
logger.info(f"Total Size: {report['total_size'] / 1024 / 1024:.2f} MB")
|
|
logger.info(f"Total Time: {report['total_time']:.2f} seconds")
|
|
logger.info(f"Average Speed: {report['average_speed'] / 1024 / 1024:.2f} MB/s")
|
|
|
|
logger.info("Size Statistics:")
|
|
logger.info(f" Mean: {report['statistics']['size']['mean'] / 1024:.2f} KB")
|
|
logger.info(f" Median: {report['statistics']['size']['median'] / 1024:.2f} KB")
|
|
logger.info(f" StdDev: {report['statistics']['size']['stdev'] / 1024:.2f} KB")
|
|
logger.info(f" Min: {report['statistics']['size']['min'] / 1024:.2f} KB")
|
|
logger.info(f" Max: {report['statistics']['size']['max'] / 1024:.2f} KB")
|
|
|
|
logger.info("Bandwidth Statistics:")
|
|
logger.info(f" Mean: {report['statistics']['bandwidth']['mean'] / 1024:.2f} KB/s")
|
|
logger.info(f" Median: {report['statistics']['bandwidth']['median'] / 1024:.2f} KB/s")
|
|
logger.info(f" StdDev: {report['statistics']['bandwidth']['stdev'] / 1024:.2f} KB/s")
|
|
logger.info(f" Min: {report['statistics']['bandwidth']['min'] / 1024:.2f} KB/s")
|
|
logger.info(f" Max: {report['statistics']['bandwidth']['max'] / 1024:.2f} KB/s")
|
|
|
|
def _save_report(self, report: Dict, temp_dir: str):
|
|
"""保存报告到文件"""
|
|
import json
|
|
report_path = os.path.join(temp_dir, 'speed_test_report.json')
|
|
with open(report_path, 'w') as f:
|
|
json.dump(report, f, indent=2)
|
|
logger.info(f"\nReport saved to: {report_path}")
|
|
|
|
def _cleanup(self, temp_dir: str):
|
|
"""清理临时文件"""
|
|
import shutil
|
|
try:
|
|
shutil.rmtree(temp_dir)
|
|
logger.info(f"Cleaned up temporary directory: {temp_dir}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to cleanup temporary directory: {str(e)}")
|
|
|
|
|
|
# Run the test cases when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()