Merge branch 'feature/modal-cluster' into feature/temp-comfy-cluster

# Conflicts:
#	.idea/misc.xml
#	.idea/modalDeploy.iml
#	src/BowongModalFunctions/api.py
This commit is contained in:
kyj@bowong.ai 2025-05-16 18:19:21 +08:00
commit 8f60f205ed
15 changed files with 49 additions and 105 deletions

8
.idea/.gitignore vendored
View File

@ -1,8 +0,0 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DataSourceManagerImpl" format="xml" multifile-model="true">
<data-source source="LOCAL" name="MySQL" uuid="0c96f6ba-5b9d-4b2d-ac3f-629fdfb28b12">
<driver-ref>mysql.8</driver-ref>
<synchronize>true</synchronize>
<jdbc-driver>com.mysql.cj.jdbc.Driver</jdbc-driver>
<jdbc-url>jdbc:mysql://root:MrYRE^u9aU$n@sh-cynosdbmysql-grp-hofmpgvu.sql.tencentcdb.com:23314/local_merge</jdbc-url>
<working-dir>$ProjectFileDir$</working-dir>
</data-source>
</component>
</project>

View File

@ -1,13 +0,0 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N802" />
</list>
</option>
</inspection_tool>
</profile>
</component>

View File

@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.9 (pythonProject)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="uv (modalDeploy)" project-jdk-type="Python SDK" />
</project>

View File

@ -1,11 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="uv (modalDeploy)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/modalDeploy.iml" filepath="$PROJECT_DIR$/.idea/modalDeploy.iml" />
</modules>
</component>
</project>

View File

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@ -30,6 +30,7 @@ dependencies = [
"scalar-fastapi>=1.0.3",
"modal>=0.76.3",
"python-dotenv>=1.1.0",
"python-multipart>=0.0.20",
]
classifiers = [
"Programming Language :: Python :: 3",

View File

@ -182,9 +182,9 @@ def batch_remove_cloudflare_kv(keys: List[str]):
async def get_modal_task_status(task_id: str) -> Tuple[
TaskStatus, Optional[int], Optional[str], Optional[Any], Optional[SentryTransactionInfo]]:
"""
:param task_id:
:return: (TaskStaus, errorCode, errorReason, results, sentryTransactionInfo)
使用modal任务id查看任务运行状态和结果
:param task_id: modal 任务id
:return: (TaskStaus 任务运行状态, errorCode 错误代码, errorReason 错误原因, results 任务结果, sentryTransactionInfo Sentry跟踪信息)
"""
try:
fn_task = modal.FunctionCall.from_id(task_id)
@ -402,7 +402,7 @@ async def slice_media(request: FFMPEGSliceRequest,
sentry_trace = None
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(request.media, request.markers, sentry_trace)
fn_call = fn.spawn(media=request.media, markers=request.markers, sentry_trace=sentry_trace, webhook=request.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@ -439,7 +439,7 @@ async def concat_media(body: FFMPEGConcatRequest,
sentry_trace = None
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(medias, sentry_trace)
fn_call = fn.spawn(medias, sentry_trace, webhook=body.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@ -478,7 +478,7 @@ async def extract_audio(body: FFMPEGExtractAudioRequest,
sentry_trace = None
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(media, sentry_trace)
fn_call = fn.spawn(media, sentry_trace, webhook=body.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@ -513,7 +513,7 @@ async def corner_mirror(body: FFMPEGCornerMirrorRequest,
sentry_trace = None
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(media=media, sentry_trace=sentry_trace)
fn_call = fn.spawn(media=media, sentry_trace=sentry_trace, webhook=body.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@ -546,7 +546,7 @@ async def overlay_gif(body: FFMPEGOverlayGifRequest,
sentry_trace = None
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(media=body.media, gif=body.gif, sentry_trace=sentry_trace)
fn_call = fn.spawn(media=body.media, gif=body.gif, sentry_trace=sentry_trace, webhook=body.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@ -576,7 +576,8 @@ async def zoom_loop(body: FFMPEGZoomLoopRequest,
sentry_trace = None
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(media=body.media, duration=body.duration, zoom=body.zoom, sentry_trace=sentry_trace)
fn_call = fn.spawn(media=body.media, duration=body.duration, zoom=body.zoom, sentry_trace=sentry_trace,
webhook=body.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@ -607,7 +608,8 @@ async def subtitle_apply(body: FFMPEGSubtitleOverlayRequest,
sentry_trace = None
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(media=body.media, subtitle=body.subtitle, fonts=body.fonts, sentry_trace=sentry_trace)
fn_call = fn.spawn(media=body.media, subtitle=body.subtitle, fonts=body.fonts, sentry_trace=sentry_trace,
webhook=body.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@ -639,7 +641,7 @@ async def bgm_nosie_reduce(body: FFMPEGMixBgmWithNoiseReduceRequest,
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(media=body.media, bgm=body.bgm, video_volume=body.video_volume, music_volume=body.music_volume,
noise_sample=body.noise_sample, sentry_trace=sentry_trace)
noise_sample=body.noise_sample, sentry_trace=sentry_trace, webhook=body.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)

View File

@ -55,6 +55,8 @@ class MediaSource(BaseModel):
urn=media_url)
elif media_url.startswith('s3://'): # s3://{endpoint}/{bucket}/{url}
paths = media_url[5:].split('/')
if len(paths) < 3:
raise ValidationError("URN-s3 格式错误")
return MediaSource(path='/'.join(paths[2:]),
protocol=MediaProtocol.s3,
endpoint=paths[0],
@ -62,6 +64,8 @@ class MediaSource(BaseModel):
urn=media_url)
elif media_url.startswith('vod://'): # vod://{endpoint}/{subAppId}/{fileId}
paths = media_url[6:].split('/')
if len(paths) < 3:
raise ValidationError("URN-vod 格式错误")
# 兼容有文件类型后缀和没有文件类型后缀的格式
url = paths[2] if '.' in os.path.basename(paths[2]) else paths[2] + ".mp4"
return MediaSource(path=url,
@ -105,7 +109,7 @@ class MediaSource(BaseModel):
@computed_field(description="s3挂载路径下的缓存相对路径")
@property
def cache_filepath(self, local_mount: bool = False) -> str:
def cache_filepath(self) -> str:
match self.protocol:
case MediaProtocol.s3:
# 本地挂载缓存

View File

@ -53,7 +53,7 @@ class WebhookNotify(BaseModel):
endpoint: HttpUrl = Field(description="Webhook回调端点", examples=["https://webhook.example.com"])
method: WebhookMethodEnum = Field(description="Webhook回调请求方法", examples=["get", "post"])
authentication: Optional[str] = Field(description="Webhook authentication回调请求降权token如不需要鉴权则不填",
examples=["Bearer 123456"])
examples=["Bearer 123456"], default=None)

View File

@ -1,8 +1,10 @@
from typing import Optional, Tuple, Any
import asyncio
from typing import Optional, Tuple, Any, Coroutine
import httpx
import psutil
import sentry_sdk
from loguru import logger
import functools
from BowongModalFunctions.models.web_model import WebhookNotify, WebhookMethodEnum, BaseFFMPEGTaskStatusResponse, \
TaskStatus, ErrorCode
@ -21,6 +23,7 @@ class SentryUtils:
def sentry_tracker(sentry_trace_id: Optional[str], sentry_baggage: Optional[str],
op: Optional[str], name: Optional[str], fn_id: str):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if sentry_trace_id and sentry_baggage:
transaction = sentry_sdk.continue_trace(environ_or_headers={"sentry-trace": sentry_trace_id,
@ -46,9 +49,10 @@ class SentryUtils:
@staticmethod
def webhook_handler(webhook: WebhookNotify, func_id: str):
def decorator(func):
def wrapper(*args, **kwargs):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
result = await func(*args, **kwargs)
status = TaskStatus.success
error = None
code = ErrorCode.SUCCESS.value
@ -58,20 +62,23 @@ class SentryUtils:
status = TaskStatus.failed
error = e.message if hasattr(e, 'message') else str(e)
code = ErrorCode.SYSTEM_ERROR.value
with httpx.Client() as client:
match webhook.method:
case WebhookMethodEnum.POST:
response = client.post(url=webhook.endpoint.__str__(),
json=BaseFFMPEGTaskStatusResponse(
taskId=func_id, status=status, error=error,
code=code,
results=[result] if isinstance(result, str) else result,
).model_dump())
case WebhookMethodEnum.GET:
response = client.post(url=webhook.endpoint.__str__())
response.raise_for_status()
logger.info(f"webhook = {webhook}")
if webhook:
with httpx.Client() as client:
match webhook.method:
case WebhookMethodEnum.POST:
response = client.post(url=webhook.endpoint.__str__(),
json=BaseFFMPEGTaskStatusResponse(
taskId=func_id, status=status, error=error,
code=code,
results=[result] if isinstance(result, str) else result,
).model_dump())
logger.info(f"webhook response = {response}")
case WebhookMethodEnum.GET:
response = client.post(url=webhook.endpoint.__str__())
response.raise_for_status()
return result
return wrapper
return async_wrapper
return decorator

View File

@ -133,16 +133,15 @@ with ffmpeg_worker_image.imports():
input_videos = [f"{s3_mount}/{media_source.cache_filepath}" for media_source in media_sources.inputs]
local_output_path = await VideoUtils.ffmpeg_concat_medias(media_paths=input_videos,
output_path=output_filepath)
return local_output_path
s3_outputs = local_copy_to_s3([local_output_path])
return s3_outputs[0]
output_path = f"{output_path_prefix}/{config.modal_environment}/concat/outputs/{fn_id}/output.mp4"
local_output = await ffmpeg_process(media_sources=medias, output_filepath=output_path)
s3_outputs = local_copy_to_s3([local_output])
s3_output = await ffmpeg_process(media_sources=medias, output_filepath=output_path)
if not sentry_trace:
sentry_trace = SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(),
x_baggage=sentry_sdk.get_baggage())
return s3_outputs[0], sentry_trace
return s3_output, sentry_trace
@app.function(

View File

@ -186,6 +186,7 @@ dependencies = [
{ name = "pyloudnorm" },
{ name = "python-dotenv" },
{ name = "python-ffmpeg" },
{ name = "python-multipart" },
{ name = "scalar-fastapi" },
{ name = "sentry-sdk", extra = ["loguru"] },
{ name = "soundfile" },
@ -217,6 +218,7 @@ requires-dist = [
{ name = "pyloudnorm", specifier = ">=0.1.1" },
{ name = "python-dotenv", specifier = ">=1.1.0" },
{ name = "python-ffmpeg", specifier = ">=2.0.12" },
{ name = "python-multipart", specifier = ">=0.0.20" },
{ name = "scalar-fastapi", specifier = ">=1.0.3" },
{ name = "sentry-sdk", extras = ["loguru"], specifier = ">=2.26.1" },
{ name = "soundfile", specifier = ">=0.13.1" },