import asyncio
import json
import math
import os
import unittest
import xml.etree.ElementTree as ET

import boto3
import httpx
from botocore.client import BaseClient
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
from loguru import logger

from BowongModalFunctions.utils.PathUtils import FileUtils


class S3TestCase(unittest.IsolatedAsyncioTestCase):
    """Integration tests for S3 presigned single-part and multipart uploads.

    These tests talk to the real bucket named in ``S3_BUCKET`` through the
    ``s3-accelerate`` endpoint, so they need network access and valid AWS
    credentials.
    """

    # SECURITY: credentials must never be committed to source control. They
    # are read from the environment; when unset (``None``) boto3 falls back
    # to its default credential resolution. Any key that was previously
    # hard-coded here should be treated as compromised and rotated.
    AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
    S3_BUCKET = "modal-media-cache"
    S3_REGION = "ap-northeast-2"
    # Upper bound on part uploads in flight at the same time.
    MAX_CONCURRENT_UPLOADS = 20
    client: BaseClient
    semaphore: asyncio.Semaphore

    def setUp(self):
        """Create the S3 client used to presign requests."""
        self.client = boto3.client(
            "s3",
            aws_access_key_id=self.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
            region_name=self.S3_REGION,
            endpoint_url="https://s3-accelerate.amazonaws.com",
            config=Config(s3={'addressing_style': 'virtual'}, signature_version='s3v4'),
        )

    async def asyncSetUp(self):
        """Create the upload semaphore on the event loop that runs this test.

        ``IsolatedAsyncioTestCase`` runs every test on its own event loop, so
        a semaphore created once at class-definition time can end up bound to
        a different loop than the one awaiting it.
        """
        self.semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_UPLOADS)

    async def test_cloudfront_presign_upload(self):
        """Presign a single ``put_object`` request and log the URL."""
        key = "upload/test.mp4"
        signed_url = self.client.generate_presigned_url(
            "put_object",
            Params={
                'Bucket': self.S3_BUCKET,
                'Key': key,
                "ContentType": "video/mp4",
            },
            ExpiresIn=3600,
        )
        logger.info(signed_url)
        # Minimal sanity check so the test cannot pass on a malformed URL.
        self.assertIn(key, signed_url)

    async def _upload_part(self, part_number: int, url: str, local_file: str,
                           start: int, end: int, client: httpx.AsyncClient) -> dict:
        """Upload bytes ``[start, end)`` of ``local_file`` to a presigned part URL.

        Returns:
            ``{"ETag": <etag without quotes>, "PartNumber": part_number}``.

        Raises:
            httpx.HTTPStatusError: if the PUT does not return a success status.
            KeyError: if the response carries no ``ETag`` header.
        """
        # Acquire before reading so at most MAX_CONCURRENT_UPLOADS chunks are
        # held in memory at once.
        async with self.semaphore:
            try:
                logger.info(f"[{part_number}] Uploading {start} -> {end}")
                with open(local_file, "rb") as f:
                    f.seek(start)
                    data = f.read(end - start)
                response = await client.put(url=url, content=data)
                response.raise_for_status()
                etag = response.headers["ETag"].strip('"')
                logger.info(etag)
                return {"ETag": etag, "PartNumber": part_number}
            except Exception as e:
                logger.exception(e)
                # Bare raise keeps the original traceback intact.
                raise

    @staticmethod
    def _build_complete_multipart_xml(parts: list) -> bytes:
        """Serialize uploaded parts into a ``CompleteMultipartUpload`` XML body."""
        root = ET.Element('CompleteMultipartUpload')
        for part in parts:
            part_elem = ET.SubElement(root, 'Part')
            ET.SubElement(part_elem, 'PartNumber').text = str(part['PartNumber'])
            etag_str = part['ETag']
            # The ETag was stripped of quotes after upload; the XML body
            # carries it quoted again.
            ET.SubElement(part_elem, 'ETag').text = f'"{etag_str}"'
        return ET.tostring(root, encoding='utf-8')

    def _abort_multipart_upload(self, key: str, upload_id: str) -> None:
        """Best-effort abort so a failed run leaves no orphaned parts behind."""
        try:
            self.client.abort_multipart_upload(
                Bucket=self.S3_BUCKET, Key=key, UploadId=upload_id)
        except (BotoCoreError, ClientError) as e:
            # Log only: the failure that triggered the abort is the error
            # that should surface from the test.
            logger.exception(e)

    async def test_cloudfront_presign_multipart_upload(self):
        """Upload a local file through presigned multipart-upload URLs.

        Creates the multipart upload, presigns one ``upload_part`` URL per
        chunk plus the ``list_parts`` and ``complete_multipart_upload`` URLs,
        PUTs the chunks concurrently with httpx, and finally POSTs the
        completion XML. The multipart upload is aborted if any step fails.
        """
        local_filename = "./videos/fc-01JWDYP1PXBQ0MQGM9YBNPZQ1E.mp4"
        chunk_size = 1024 * 1024 * 5
        total_file_size = FileUtils.get_file_size(local_filename)
        chunk_count = math.ceil(total_file_size / chunk_size)
        logger.info(f"{chunk_count} * {chunk_size} of {total_file_size} bytes")
        key = "upload/test2.mp4"
        content_type = "video/mp4"
        multipart_upload_response = self.client.create_multipart_upload(
            Bucket=self.S3_BUCKET,
            Key=key,
            ContentType=content_type,
        )
        logger.info(multipart_upload_response)
        upload_id = multipart_upload_response["UploadId"]

        completed = False
        try:
            multipart_params = {
                'Bucket': self.S3_BUCKET,
                'Key': key,
                'UploadId': upload_id,
            }
            signed_urls = [
                self.client.generate_presigned_url(
                    "upload_part",
                    Params={**multipart_params, 'PartNumber': part_number},
                    ExpiresIn=3600,
                )
                for part_number in range(1, chunk_count + 1)
            ]
            logger.info(signed_urls)
            signed_completed_url = self.client.generate_presigned_url(
                "complete_multipart_upload", Params=multipart_params, ExpiresIn=3600)
            signed_list_url = self.client.generate_presigned_url(
                "list_parts", Params=multipart_params, ExpiresIn=3600)
            json_data = {
                "parts": signed_urls,
                "list": signed_list_url,
                "completed": signed_completed_url
            }
            logger.info(json.dumps(json_data, ensure_ascii=False, indent=2))

            async with httpx.AsyncClient(timeout=30) as client:
                # Building coroutine objects does no I/O, so no semaphore is
                # held here; throttling happens inside _upload_part.
                upload_tasks = []
                for i, signed_url in enumerate(signed_urls):
                    start = i * chunk_size
                    end = min(start + chunk_size, total_file_size)
                    curr_chunk_size = end - start
                    logger.info(f"No.{i + 1} chunks of {curr_chunk_size} bytes")
                    upload_tasks.append(
                        self._upload_part(part_number=i + 1, url=signed_url,
                                          local_file=local_filename,
                                          start=start, end=end, client=client))

                # Wait for all upload tasks to finish.
                results = await asyncio.gather(*upload_tasks)
                logger.info(results)
                results.sort(key=lambda x: x["PartNumber"])
                logger.info(json.dumps(results, ensure_ascii=False, indent=2))

                # Build the XML request body and serialize it.
                xml_data = self._build_complete_multipart_xml(results)
                logger.info(f"xml : {xml_data}")
                completed_response = await client.post(
                    url=signed_completed_url,
                    content=xml_data,
                    headers={"Content-Type": "application/xml"},
                )
                logger.info(f"[HTTP {completed_response.status_code}] {completed_response.text}")
                completed_response.raise_for_status()
                # CompleteMultipartUpload may report a failure inside a
                # 200 OK body, so the status code alone is not sufficient.
                self.assertNotIn("<Error>", completed_response.text)
            completed = True
        finally:
            if not completed:
                self._abort_multipart_upload(key, upload_id)


if __name__ == '__main__':
    unittest.main()