# modalDeploy/tests/s3_test_case.py
#
# 139 lines
# 7.0 KiB
# Python

import asyncio
import json
import math
import os
import unittest
import xml.etree.ElementTree as ET

import boto3
import httpx
from botocore.client import BaseClient
from botocore.config import Config
from loguru import logger

from BowongModalFunctions.utils.PathUtils import FileUtils
class S3TestCase(unittest.IsolatedAsyncioTestCase):
    """Integration tests for presigned S3 uploads (single PUT and multipart).

    The tests talk to the real bucket named by ``S3_BUCKET`` through the S3
    Transfer Acceleration endpoint, so they need network access and valid
    AWS credentials.

    SECURITY: credentials must never be committed to this file. They are read
    from the ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` environment
    variables. Any access key that was previously hard-coded here has to be
    treated as compromised and rotated.
    """

    # ``None`` (variable unset) lets boto3 fall back to its default
    # credential chain instead of using an explicit key pair.
    AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
    S3_BUCKET = "modal-media-cache"
    S3_REGION = "ap-northeast-2"
    # Upper bound on the number of part uploads in flight at once.
    MAX_CONCURRENT_UPLOADS = 20
    client: BaseClient
    semaphore: asyncio.Semaphore

    def setUp(self):
        """Create the S3 client (SigV4, virtual-hosted addressing, accelerate endpoint)."""
        self.client = boto3.client(
            "s3",
            aws_access_key_id=self.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
            region_name=self.S3_REGION,
            endpoint_url="https://s3-accelerate.amazonaws.com",
            config=Config(s3={'addressing_style': 'virtual'}, signature_version='s3v4'),
        )

    async def asyncSetUp(self):
        """Create the upload semaphore inside the event loop of the current test.

        ``IsolatedAsyncioTestCase`` runs every test on a fresh event loop; a
        semaphore created once at class level can end up bound to the loop of
        a different test.
        """
        self.semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_UPLOADS)

    async def test_cloudfront_presign_upload(self):
        """Presign a single-request ``put_object`` upload URL and log it."""
        key = "upload/test.mp4"
        signed_url = self.client.generate_presigned_url(
            "put_object",
            Params={
                'Bucket': self.S3_BUCKET,
                'Key': key,
                "ContentType": "video/mp4",
            },
            ExpiresIn=3600,
        )
        logger.info(signed_url)
        self.assertIn(key, signed_url)

    async def test_cloudfront_presign_multipart_upload(self):
        """Upload a local video through presigned multipart URLs and complete it.

        Flow: create the multipart upload, presign one ``upload_part`` URL per
        5 MiB chunk plus ``list_parts`` / ``complete_multipart_upload`` URLs,
        PUT the chunks concurrently over plain HTTP, then POST the
        ``CompleteMultipartUpload`` XML body. If anything fails after the
        upload has been created, the upload is aborted (best effort) so that
        no orphaned parts stay behind in the bucket.
        """
        local_filename = "./videos/fc-01JWDYP1PXBQ0MQGM9YBNPZQ1E.mp4"
        chunk_size = 1024 * 1024 * 5
        total_file_size = FileUtils.get_file_size(local_filename)
        # A multipart upload without parts cannot be completed; fail early,
        # before anything is created on the S3 side.
        self.assertGreater(total_file_size, 0, f"{local_filename} is empty")
        chunk_count = math.ceil(total_file_size / chunk_size)
        logger.info(f"{chunk_count} * {chunk_size} of {total_file_size} bytes")
        key = "upload/test2.mp4"
        content_type = "video/mp4"
        multipart_upload_response = self.client.create_multipart_upload(
            Bucket=self.S3_BUCKET, Key=key, ContentType=content_type)
        logger.info(multipart_upload_response)
        upload_id = multipart_upload_response.get("UploadId")
        try:
            signed_urls = [
                self.client.generate_presigned_url(
                    "upload_part",
                    Params={
                        'Bucket': self.S3_BUCKET,
                        'Key': key,
                        'PartNumber': part_number,
                        'UploadId': upload_id,
                    },
                    ExpiresIn=3600,
                )
                for part_number in range(1, chunk_count + 1)
            ]
            logger.info(signed_urls)
            signed_completed_url = self.client.generate_presigned_url(
                "complete_multipart_upload",
                Params={
                    'Bucket': self.S3_BUCKET,
                    'Key': key,
                    'UploadId': upload_id,
                },
                ExpiresIn=3600,
            )
            signed_list_url = self.client.generate_presigned_url(
                "list_parts",
                Params={
                    'Bucket': self.S3_BUCKET,
                    'Key': key,
                    'UploadId': upload_id,
                },
                ExpiresIn=3600,
            )
            json_data = {
                "parts": signed_urls,
                "list": signed_list_url,
                "completed": signed_completed_url
            }
            logger.info(json.dumps(json_data, ensure_ascii=False, indent=2))

            async with httpx.AsyncClient(timeout=30) as client:
                upload_tasks = []
                for i, signed_url in enumerate(signed_urls):
                    start = i * chunk_size
                    end = min(start + chunk_size, total_file_size)
                    curr_chunk_size = end - start
                    logger.info(f"No.{i + 1} chunks of {curr_chunk_size} bytes")
                    # Concurrency is limited inside _upload_part; merely
                    # creating the coroutines needs no semaphore permit.
                    upload_tasks.append(
                        self._upload_part(part_number=i + 1, url=signed_url,
                                          local_file=local_filename, start=start, end=end,
                                          client=client))
                # Wait for every part upload to finish.
                results = await asyncio.gather(*upload_tasks)
                logger.info(results)
                results.sort(key=lambda x: x["PartNumber"])
                logger.info(json.dumps(results, ensure_ascii=False, indent=2))
                # Build the XML request body and complete the upload while
                # the HTTP client is still open.
                xml_data = self._build_complete_xml(results)
                logger.info(f"xml : {xml_data}")
                completed_response = await client.post(url=signed_completed_url, content=xml_data,
                                                       headers={"Content-Type": "application/xml"})
                logger.info(f"[HTTP {completed_response.status_code}] {completed_response.text}")
                self.assertEqual(completed_response.status_code, 200, completed_response.text)
        except BaseException:
            # Best-effort cleanup; the original failure is what gets re-raised.
            try:
                self.client.abort_multipart_upload(Bucket=self.S3_BUCKET, Key=key,
                                                   UploadId=upload_id)
            except Exception:
                logger.exception(f"Failed to abort multipart upload {upload_id}")
            raise

    async def _upload_part(self, part_number: int, url: str, local_file: str, start: int, end: int,
                           client: httpx.AsyncClient):
        """PUT bytes ``[start, end)`` of ``local_file`` to a presigned part URL.

        Args:
            part_number: 1-based part number the URL was presigned for.
            url: Presigned ``upload_part`` URL.
            local_file: Path of the file the chunk is read from.
            start: Offset of the first byte of the chunk.
            end: Offset one past the last byte of the chunk.
            client: Shared HTTP client used for the request.

        Returns:
            ``{"ETag": <etag without quotes>, "PartNumber": part_number}``.

        Raises:
            httpx.HTTPStatusError: The PUT returned an error status.
            RuntimeError: The response carried no ``ETag`` header.
        """
        async with self.semaphore:
            try:
                logger.info(f"[{part_number}] Uploading {start} -> {end}")
                with open(local_file, "rb") as f:
                    f.seek(start)
                    data = f.read(end - start)
                response = await client.put(url=url, content=data)
                response.raise_for_status()
                etag_header = response.headers.get("ETag")
                if etag_header is None:
                    raise RuntimeError(f"Part {part_number} upload returned no ETag header")
                etag = etag_header.strip('"')
                logger.info(etag)
                return {"ETag": etag, "PartNumber": part_number}
            except Exception as e:
                logger.exception(e)
                raise

    @staticmethod
    def _build_complete_xml(parts) -> bytes:
        """Serialize uploaded parts into a ``CompleteMultipartUpload`` XML body.

        Args:
            parts: Iterable of ``{"ETag": str, "PartNumber": int}`` dicts, in
                the order they should appear in the document.

        Returns:
            The UTF-8 encoded XML document; each ETag is wrapped in double
            quotes again.
        """
        root = ET.Element('CompleteMultipartUpload')
        for part in parts:
            part_elem = ET.SubElement(root, 'Part')
            ET.SubElement(part_elem, 'PartNumber').text = str(part['PartNumber'])
            etag_str = part['ETag']
            ET.SubElement(part_elem, 'ETag').text = f'"{etag_str}"'
        return ET.tostring(root, encoding='utf-8')
# Script entry point: run the test cases in this module when the file is
# executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()