diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 35410ca..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# 默认忽略的文件 -/shelf/ -/workspace.xml -# 基于编辑器的 HTTP 客户端请求 -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml deleted file mode 100644 index f44cd4f..0000000 --- a/.idea/dataSources.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - - - mysql.8 - true - com.mysql.cj.jdbc.Driver - jdbc:mysql://root:MrYRE^u9aU$n@sh-cynosdbmysql-grp-hofmpgvu.sql.tencentcdb.com:23314/local_merge - $ProjectFileDir$ - - - \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml deleted file mode 100644 index d93e1eb..0000000 --- a/.idea/inspectionProfiles/Project_Default.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml deleted file mode 100644 index 105ce2d..0000000 --- a/.idea/inspectionProfiles/profiles_settings.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index b657481..0000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/modalDeploy.iml b/.idea/modalDeploy.iml deleted file mode 100644 index b118175..0000000 --- a/.idea/modalDeploy.iml +++ /dev/null @@ -1,11 +0,0 @@ - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index f4632ae..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 94a25f7..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 2412368..4a0e084 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "scalar-fastapi>=1.0.3", "modal>=0.76.3", "python-dotenv>=1.1.0", + "python-multipart>=0.0.20", ] classifiers = [ "Programming Language :: Python :: 3", diff --git a/src/BowongModalFunctions/api.py b/src/BowongModalFunctions/api.py index 7ff86e5..a2bf2a6 100644 --- a/src/BowongModalFunctions/api.py +++ b/src/BowongModalFunctions/api.py @@ -182,9 +182,9 @@ def batch_remove_cloudflare_kv(keys: List[str]): async def get_modal_task_status(task_id: str) -> Tuple[ TaskStatus, Optional[int], Optional[str], Optional[Any], Optional[SentryTransactionInfo]]: """ - - :param task_id: - :return: (TaskStaus, errorCode, errorReason, results, sentryTransactionInfo) + 使用modal任务id查看任务运行状态和结果 + :param task_id: modal 任务id + :return: (TaskStaus 任务运行状态, errorCode 错误代码, errorReason 错误原因, results 任务结果, sentryTransactionInfo Sentry跟踪信息) """ try: fn_task = modal.FunctionCall.from_id(task_id) @@ -402,7 +402,7 @@ async def slice_media(request: FFMPEGSliceRequest, sentry_trace = None if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) - fn_call = fn.spawn(request.media, request.markers, sentry_trace) + fn_call = fn.spawn(media=request.media, markers=request.markers, sentry_trace=sentry_trace, webhook=request.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) @@ -439,7 +439,7 @@ async def concat_media(body: FFMPEGConcatRequest, sentry_trace = None if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) - fn_call = fn.spawn(medias, sentry_trace) + fn_call = fn.spawn(medias, sentry_trace, webhook=body.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) @@ -478,7 +478,7 @@ async def extract_audio(body: FFMPEGExtractAudioRequest, sentry_trace = None if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) - fn_call = fn.spawn(media, sentry_trace) + fn_call = fn.spawn(media, sentry_trace, webhook=body.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) @@ -513,7 +513,7 @@ async def corner_mirror(body: FFMPEGCornerMirrorRequest, sentry_trace = None if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) - fn_call = fn.spawn(media=media, sentry_trace=sentry_trace) + fn_call = fn.spawn(media=media, sentry_trace=sentry_trace, webhook=body.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) @@ -546,7 +546,7 @@ async def overlay_gif(body: FFMPEGOverlayGifRequest, sentry_trace = None if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) - fn_call = fn.spawn(media=body.media, gif=body.gif, sentry_trace=sentry_trace) + fn_call = fn.spawn(media=body.media, gif=body.gif, sentry_trace=sentry_trace, webhook=body.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) @@ -576,7 +576,8 @@ async def zoom_loop(body: FFMPEGZoomLoopRequest, sentry_trace = None if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) - fn_call = fn.spawn(media=body.media, duration=body.duration, zoom=body.zoom, sentry_trace=sentry_trace) + fn_call = fn.spawn(media=body.media, duration=body.duration, zoom=body.zoom, sentry_trace=sentry_trace, + webhook=body.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) @@ -607,7 +608,8 @@ async def subtitle_apply(body: FFMPEGSubtitleOverlayRequest, sentry_trace = None if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) - fn_call = fn.spawn(media=body.media, subtitle=body.subtitle, fonts=body.fonts, sentry_trace=sentry_trace) + fn_call = fn.spawn(media=body.media, subtitle=body.subtitle, fonts=body.fonts, sentry_trace=sentry_trace, + webhook=body.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) @@ -639,7 +641,7 @@ async def bgm_nosie_reduce(body: FFMPEGMixBgmWithNoiseReduceRequest, if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) fn_call = fn.spawn(media=body.media, bgm=body.bgm, video_volume=body.video_volume, music_volume=body.music_volume, - noise_sample=body.noise_sample, sentry_trace=sentry_trace) + noise_sample=body.noise_sample, sentry_trace=sentry_trace, webhook=body.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) diff --git a/src/BowongModalFunctions/models/media_model.py b/src/BowongModalFunctions/models/media_model.py index 034be3c..f71b532 100644 --- a/src/BowongModalFunctions/models/media_model.py +++ b/src/BowongModalFunctions/models/media_model.py @@ -55,6 +55,8 @@ class MediaSource(BaseModel): urn=media_url) elif media_url.startswith('s3://'): # s3://{endpoint}/{bucket}/{url} paths = media_url[5:].split('/') + if len(paths) < 3: + raise ValidationError("URN-s3 格式错误") return MediaSource(path='/'.join(paths[2:]), protocol=MediaProtocol.s3, endpoint=paths[0], @@ -62,6 +64,8 @@ class MediaSource(BaseModel): urn=media_url) elif media_url.startswith('vod://'): # vod://{endpoint}/{subAppId}/{fileId} paths = media_url[6:].split('/') + if len(paths) < 3: + raise ValidationError("URN-vod 格式错误") # 兼容有文件类型后缀和没有文件类型后缀的格式 url = paths[2] if '.' in os.path.basename(paths[2]) else paths[2] + ".mp4" return MediaSource(path=url, @@ -105,7 +109,7 @@ class MediaSource(BaseModel): @computed_field(description="s3挂载路径下的缓存相对路径") @property - def cache_filepath(self, local_mount: bool = False) -> str: + def cache_filepath(self) -> str: match self.protocol: case MediaProtocol.s3: # 本地挂载缓存 diff --git a/src/BowongModalFunctions/models/web_model.py b/src/BowongModalFunctions/models/web_model.py index 807a3d5..6603cf1 100644 --- a/src/BowongModalFunctions/models/web_model.py +++ b/src/BowongModalFunctions/models/web_model.py @@ -53,7 +53,7 @@ class WebhookNotify(BaseModel): endpoint: HttpUrl = Field(description="Webhook回调端点", examples=["https://webhook.example.com"]) method: WebhookMethodEnum = Field(description="Webhook回调请求方法", examples=["get", "post"]) authentication: Optional[str] = Field(description="Webhook authentication回调请求降权token,如不需要鉴权则不填", - examples=["Bearer 123456"]) + examples=["Bearer 123456"], default=None) diff --git a/src/BowongModalFunctions/utils/SentryUtils.py b/src/BowongModalFunctions/utils/SentryUtils.py index ac9962d..f47cb3b 100644 --- a/src/BowongModalFunctions/utils/SentryUtils.py +++ b/src/BowongModalFunctions/utils/SentryUtils.py @@ -1,8 +1,10 @@ -from typing import Optional, Tuple, Any +import asyncio +from typing import Optional, Tuple, Any, Coroutine import httpx import psutil import sentry_sdk from loguru import logger +import functools from BowongModalFunctions.models.web_model import WebhookNotify, WebhookMethodEnum, BaseFFMPEGTaskStatusResponse, \ TaskStatus, ErrorCode @@ -21,6 +23,7 @@ class SentryUtils: def sentry_tracker(sentry_trace_id: Optional[str], sentry_baggage: Optional[str], op: Optional[str], name: Optional[str], fn_id: str): def decorator(func): + @functools.wraps(func) def wrapper(*args, **kwargs): if sentry_trace_id and sentry_baggage: transaction = sentry_sdk.continue_trace(environ_or_headers={"sentry-trace": sentry_trace_id, @@ -46,9 +49,10 @@ class SentryUtils: @staticmethod def webhook_handler(webhook: WebhookNotify, func_id: str): def decorator(func): - def wrapper(*args, **kwargs): + @functools.wraps(func) + async def async_wrapper(*args, **kwargs): try: - result = func(*args, **kwargs) + result = await func(*args, **kwargs) status = TaskStatus.success error = None code = ErrorCode.SUCCESS.value @@ -58,20 +62,23 @@ class SentryUtils: status = TaskStatus.failed error = e.message if hasattr(e, 'message') else str(e) code = ErrorCode.SYSTEM_ERROR.value - with httpx.Client() as client: - match webhook.method: - case WebhookMethodEnum.POST: - response = client.post(url=webhook.endpoint.__str__(), - json=BaseFFMPEGTaskStatusResponse( - taskId=func_id, status=status, error=error, - code=code, - results=[result] if isinstance(result, str) else result, - ).model_dump()) - case WebhookMethodEnum.GET: - response = client.post(url=webhook.endpoint.__str__()) - response.raise_for_status() + logger.info(f"webhook = {webhook}") + if webhook: + with httpx.Client() as client: + match webhook.method: + case WebhookMethodEnum.POST: + response = client.post(url=webhook.endpoint.__str__(), + json=BaseFFMPEGTaskStatusResponse( + taskId=func_id, status=status, error=error, + code=code, + results=[result] if isinstance(result, str) else result, + ).model_dump()) + logger.info(f"webhook response = {response}") + case WebhookMethodEnum.GET: + response = client.post(url=webhook.endpoint.__str__()) + response.raise_for_status() return result - return wrapper + return async_wrapper return decorator diff --git a/src/cluster/ffmpeg_app.py b/src/cluster/ffmpeg_app.py index 6aed804..b343439 100644 --- a/src/cluster/ffmpeg_app.py +++ b/src/cluster/ffmpeg_app.py @@ -133,16 +133,15 @@ with ffmpeg_worker_image.imports(): input_videos = [f"{s3_mount}/{media_source.cache_filepath}" for media_source in media_sources.inputs] local_output_path = await VideoUtils.ffmpeg_concat_medias(media_paths=input_videos, output_path=output_filepath) - return local_output_path + s3_outputs = local_copy_to_s3([local_output_path]) + return s3_outputs[0] output_path = f"{output_path_prefix}/{config.modal_environment}/concat/outputs/{fn_id}/output.mp4" - local_output = await ffmpeg_process(media_sources=medias, output_filepath=output_path) - s3_outputs = local_copy_to_s3([local_output]) - + s3_output = await ffmpeg_process(media_sources=medias, output_filepath=output_path) if not sentry_trace: sentry_trace = SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(), x_baggage=sentry_sdk.get_baggage()) - return s3_outputs[0], sentry_trace + return s3_output, sentry_trace @app.function( diff --git a/uv.lock b/uv.lock index 6130918..16d32d4 100644 --- a/uv.lock +++ b/uv.lock @@ -186,6 +186,7 @@ dependencies = [ { name = "pyloudnorm" }, { name = "python-dotenv" }, { name = "python-ffmpeg" }, + { name = "python-multipart" }, { name = "scalar-fastapi" }, { name = "sentry-sdk", extra = ["loguru"] }, { name = "soundfile" }, @@ -217,6 +218,7 @@ requires-dist = [ { name = "pyloudnorm", specifier = ">=0.1.1" }, { name = "python-dotenv", specifier = ">=1.1.0" }, { name = "python-ffmpeg", specifier = ">=2.0.12" }, + { name = "python-multipart", specifier = ">=0.0.20" }, { name = "scalar-fastapi", specifier = ">=1.0.3" }, { name = "sentry-sdk", extras = ["loguru"], specifier = ">=2.26.1" }, { name = "soundfile", specifier = ">=0.13.1" },