mxivideo/python_core/utils/TimeUtils.py

164 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from datetime import timedelta, datetime
# Accepted timestamp layouts, tried in order: (compiled pattern, strptime format).
# Layouts without an hour field parse with hour == 0.
_TIME_FORMATS = (
    (re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}"), '%H:%M:%S.%f'),  # HH:MM:SS.fff
    (re.compile(r"\d{2}:\d{2}:\d{2}:\d{3}"), '%H:%M:%S:%f'),   # HH:MM:SS:fff
    (re.compile(r"\d{2}:\d{2}\.\d{3}"), '%M:%S.%f'),           # MM:SS.fff
    (re.compile(r"\d{2}\.\d{2}:\d{3}"), '%M.%S:%f'),           # MM.SS:fff
    (re.compile(r"\d{2}\.\d{2}\.\d{3}"), '%M.%S.%f'),          # MM.SS.fff
    (re.compile(r"\d{2}:\d{2}:\d{3}"), '%M:%S:%f'),            # MM:SS:fff
    (re.compile(r"\d{2}:\d{2}"), '%M:%S'),                     # MM:SS
    (re.compile(r"\d{2}\.\d{2}"), '%M.%S'),                    # MM.SS
)


def parse_time(time_str):
    """Parse a timestamp string into a ``datetime``.

    Supported layouts are HH:MM:SS.fff, HH:MM:SS:fff, MM:SS.fff, MM.SS:fff,
    MM.SS.fff, MM:SS:fff, MM:SS and MM.SS. The date part of the result is
    whatever ``datetime.strptime`` defaults to (1900-01-01); layouts without
    an hour field yield hour 0.

    Args:
        time_str: The timestamp text. It must match one layout exactly;
            surrounding whitespace or a trailing newline is rejected.

    Returns:
        The parsed ``datetime``.

    Raises:
        RuntimeError: If ``time_str`` matches none of the supported layouts.
        ValueError: Propagated from ``strptime`` when the layout matches but
            a field is out of range (e.g. minute 99).
    """
    for pattern, time_format in _TIME_FORMATS:
        # fullmatch, not match + "$": "$" would also accept a trailing "\n",
        # which strptime then rejects with a ValueError instead of the
        # RuntimeError used for every other unrecognised layout.
        if pattern.fullmatch(time_str):
            return datetime.strptime(time_str, time_format)
    raise RuntimeError(f"转换时间格式失败 {time_str}")
def format_time(dt):
    """Render ``dt`` as an ``HH:MM:SS.fff`` string (millisecond precision).

    Args:
        dt: Object exposing ``strftime`` (the callers in this file pass
            ``datetime`` values).

    Returns:
        The formatted time with the microsecond field truncated (not rounded)
        to its first three digits.
    """
    full_precision = dt.strftime('%H:%M:%S.%f')
    # %f renders six digits; dropping the last three leaves milliseconds.
    return full_precision[:-3]
def parse_timeline_item(item):
    """Extract the start and end times from a timeline entry.

    Args:
        item: Text of the form ``"<start> - <end> (<activity>)"``.

    Returns:
        A ``(start, end)`` tuple as produced by ``parse_time``.

    Raises:
        ValueError: If ``item`` has no ``" ("`` separator, or the time range
            does not split into exactly two parts on ``" - "``.
    """
    # Everything before the first " (" is the time range; the rest is the
    # activity text, which this function ignores.
    range_and_activity = item.split(' (', 1)
    time_range, _ = range_and_activity
    start_text, end_text = time_range.split(' - ')
    start = parse_time(start_text)
    end = parse_time(end_text)
    return start, end
def format_timeline_item(start_time, end_time, activity):
    """Build a timeline entry string ``"<start> - <end> (<activity>)"``.

    Args:
        start_time: Start of the span; rendered with ``format_time``.
        end_time: End of the span; rendered with ``format_time``.
        activity: Description placed inside the parentheses.

    Returns:
        The formatted timeline entry.
    """
    start_text = format_time(start_time)
    end_text = format_time(end_time)
    return f"{start_text} - {end_text} ({activity})"
def merge_timeline_items(items, merge_diff=5):
    """Merge adjacent or overlapping timeline entries that share an activity.

    Entries are parsed, sorted by start time, and then folded left to right:
    an entry is merged into the previous kept entry when its activity text is
    identical and it starts no later than ``merge_diff`` seconds after the
    previous entry ends. Entries with a different activity, or too far apart,
    are kept as separate entries.

    Args:
        items: Sequence of strings shaped ``"<start> - <end> (<activity>)"``.
        merge_diff: Maximum gap, in seconds, between two entries that may
            still be merged.

    Returns:
        A list of timeline strings in the same shape, sorted by start time.
        Returns ``[]`` when ``items`` is empty.
    """
    if not items:
        return []

    # Parse every entry into (start, end, activity).
    parsed_items = []
    for item in items:
        start_time, end_time = parse_timeline_item(item)
        activity = item.split(' (', 1)[1]
        # Remove only the single ")" that closes the "(activity)" wrapper.
        # rstrip(')') would also eat parentheses belonging to the activity
        # itself, e.g. "walk (indoor)" -> "walk (indoor".
        if activity.endswith(')'):
            activity = activity[:-1]
        parsed_items.append((start_time, end_time, activity))

    # Order by start time so only neighbouring entries need comparing.
    parsed_items.sort(key=lambda entry: entry[0])

    max_gap = timedelta(seconds=merge_diff)
    # NOTE: the earliest entry is always kept, even if its end is not after
    # its start; only subsequent entries go through the validity check below.
    merged_items = [parsed_items[0]]
    for start_time, end_time, activity in parsed_items[1:]:
        # Skip entries whose end does not come strictly after their start.
        if end_time <= start_time:
            continue
        last_start, last_end, last_activity = merged_items[-1]
        if activity == last_activity and start_time <= last_end + max_gap:
            # Same activity and close enough: extend the previous entry.
            merged_items[-1] = (last_start, max(last_end, end_time), last_activity)
        else:
            merged_items.append((start_time, end_time, activity))

    # Render back to the original string shape.
    return [format_timeline_item(start, end, activity) for start, end, activity in merged_items]
def convert_time(time_str):
    """Shift a three-field timestamp one unit down: ``A:B:C.fff`` -> ``00:A:B.fff``.

    The first two colon-separated fields become the minute and second fields
    of the result, the original third field's whole part is discarded, and
    its fractional part is kept. Strings that do not have exactly three
    colon-separated fields are returned unchanged.

    Args:
        time_str: Timestamp text, typically ``HH:MM:SS.fff``.

    Returns:
        The converted string, or ``time_str`` itself when it does not have
        exactly three colon-separated fields.
    """
    parts = time_str.split(':')
    if len(parts) != 3:
        return time_str

    first, second, last = parts
    fraction_parts = last.split('.')
    # A missing fractional part used to raise IndexError; treat it as zero
    # milliseconds so the result keeps the HH:MM:SS.fff shape.
    fraction = fraction_parts[1] if len(fraction_parts) > 1 else "000"
    return f"00:{first}:{second}.{fraction}"
def _fit_time_to_duration(moment, duration, max_time_str, midnight):
    """Return ``moment`` as HH:MM:SS.fff text, shifted or clamped to fit ``duration``."""
    time_str = format_time(moment)
    # A time beyond the duration with a non-zero hour field is re-read one
    # unit down (HH:MM:SS.fff -> 00:HH:MM.fff) via convert_time.
    if (moment - midnight) > duration and not time_str.startswith("00"):
        time_str = convert_time(time_str)
    # Anything still past the end is clamped to the duration itself.
    if (parse_time(time_str) - midnight) > duration:
        time_str = max_time_str
    return time_str


def merge_product_data(data, start_time_str, end_time_str, merge_diff=5):
    """Group timeline entries by product and merge each product's timeline.

    Every timeline entry's start and end are first fitted into the span
    ``end_time_str - start_time_str`` (see ``_fit_time_to_duration``), then
    each product's entries are merged with ``merge_timeline_items``.

    Args:
        data: Iterable of dicts with a ``"product"`` key and a ``"timeline"``
            key holding strings shaped ``"<start> - <end> (<activity>)"``.
        start_time_str: Start of the overall span, in any layout accepted by
            ``parse_time``.
        end_time_str: End of the overall span, in any layout accepted by
            ``parse_time``.
        merge_diff: Maximum gap in seconds between mergeable entries; passed
            through to ``merge_timeline_items``.

    Returns:
        A list of ``{"product": ..., "timeline": [...]}`` dicts, one per
        distinct product, in order of first appearance in ``data``.
    """
    start_time = parse_time(start_time_str)
    end_time = parse_time(end_time_str)
    duration = end_time - start_time

    # Render the duration by hand as HH:MM:SS.fff; it is the clamp value.
    total_seconds = duration.total_seconds()
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    microseconds = duration.microseconds
    max_time_str = f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}.{microseconds // 1000:03d}"

    # Reference instant for turning parsed times into offsets; parsed once
    # here instead of four times per timeline entry.
    midnight = datetime.strptime("00:00:00.000", '%H:%M:%S.%f')

    # Group timeline entries by product name.
    product_dict = {}
    for item in data:
        product = item["product"]
        entry = product_dict.setdefault(product, {"product": product, "timeline": []})
        entry["timeline"].extend(item["timeline"])

    # Normalise and merge each product's timeline.
    for entry in product_dict.values():
        new_timeline = []
        for item in entry["timeline"]:
            start, end = parse_timeline_item(item)
            new_start_str = _fit_time_to_duration(start, duration, max_time_str, midnight)
            new_end_str = _fit_time_to_duration(end, duration, max_time_str, midnight)
            activity = item.split(' (', 1)[1]
            # Remove only the wrapper's closing ")"; rstrip(')') would also
            # strip parentheses that belong to the activity text.
            if activity.endswith(')'):
                activity = activity[:-1]
            new_timeline.append(f"{new_start_str} - {new_end_str} ({activity})")
        entry["timeline"] = merge_timeline_items(new_timeline, merge_diff=merge_diff)

    # Return the merged per-product records.
    return list(product_dict.values())