import re
from datetime import timedelta, datetime

# Midnight on the default strptime date (1900-01-01). Every value returned by
# parse_time() sits on this same date, so subtracting this origin yields the
# offset from 00:00:00.000 as a timedelta.
_TIME_ORIGIN = datetime.strptime("00:00:00.000", '%H:%M:%S.%f')

# (pattern, strptime format) pairs, tried in order. Formats without %H leave
# the hour at strptime's default of 0.
_TIME_FORMATS = (
    (re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{3}$"), '%H:%M:%S.%f'),  # HH:MM:SS.fff
    (re.compile(r"^\d{2}:\d{2}:\d{2}:\d{3}$"), '%H:%M:%S:%f'),   # HH:MM:SS:fff
    (re.compile(r"^\d{2}:\d{2}\.\d{3}$"), '%M:%S.%f'),           # MM:SS.fff
    (re.compile(r"^\d{2}\.\d{2}:\d{3}$"), '%M.%S:%f'),           # MM.SS:fff
    (re.compile(r"^\d{2}\.\d{2}\.\d{3}$"), '%M.%S.%f'),          # MM.SS.fff
    (re.compile(r"^\d{2}:\d{2}:\d{3}$"), '%M:%S:%f'),            # MM:SS:fff
    (re.compile(r"^\d{2}:\d{2}$"), '%M:%S'),                     # MM:SS
    (re.compile(r"^\d{2}\.\d{2}$"), '%M.%S'),                    # MM.SS
)


def parse_time(time_str: str) -> datetime:
    """Parse a time string into a datetime on the default strptime date.

    Supported layouts: HH:MM:SS.fff, HH:MM:SS:fff, MM:SS.fff, MM.SS:fff,
    MM.SS.fff, MM:SS:fff, MM:SS and MM.SS. Layouts without an hour field
    produce hour 0.

    Raises:
        RuntimeError: if the string matches none of the supported layouts.
        ValueError: if the layout matches but a field is out of range
            (raised by datetime.strptime).
    """
    for pattern, layout in _TIME_FORMATS:
        if pattern.match(time_str):
            return datetime.strptime(time_str, layout)
    raise RuntimeError(f"转换时间格式失败 {time_str}")


def format_time(dt: datetime) -> str:
    """Format a datetime as HH:MM:SS.fff (millisecond precision)."""
    # %f renders six digits; dropping the last three leaves milliseconds.
    return dt.strftime('%H:%M:%S.%f')[:-3]


def _split_timeline_item(item):
    """Split 'START - END (ACTIVITY)' into (start_text, end_text, activity)."""
    time_range, activity = item.split(' (', 1)
    start_text, end_text = time_range.split(' - ')
    # Remove only the single closing parenthesis added by the item format, so
    # an activity that itself ends with ')' (e.g. "run (fast)") stays intact.
    if activity.endswith(')'):
        activity = activity[:-1]
    return start_text, end_text, activity


def parse_timeline_item(item: str):
    """Parse a timeline item and return its (start, end) datetimes.

    Raises:
        ValueError: if the item lacks the ' (' or ' - ' separators.
        RuntimeError: if either time uses an unsupported layout.
    """
    start_text, end_text, _ = _split_timeline_item(item)
    return parse_time(start_text), parse_time(end_text)


def format_timeline_item(start_time: datetime, end_time: datetime, activity: str) -> str:
    """Format a timeline entry as 'START - END (ACTIVITY)'."""
    return f"{format_time(start_time)} - {format_time(end_time)} ({activity})"


def merge_timeline_items(items, merge_diff=5):
    """Merge adjacent timeline items that share the same activity.

    Items are sorted by start time. An item is folded into the previously
    kept item when it starts no later than ``merge_diff`` seconds after that
    item's end and both carry the same activity text; otherwise it is kept as
    a separate entry. Items whose end is not strictly after their start are
    discarded, including the earliest one.

    Args:
        items: iterable of 'START - END (ACTIVITY)' strings.
        merge_diff: maximum gap in seconds that still counts as adjacent.

    Returns:
        A list of formatted timeline strings (times as HH:MM:SS.fff).
    """
    if not items:
        return []

    parsed_items = []
    for item in items:
        start_text, end_text, activity = _split_timeline_item(item)
        parsed_items.append((parse_time(start_text), parse_time(end_text), activity))

    # Sort on the start time only so ties keep their input order.
    parsed_items.sort(key=lambda entry: entry[0])

    max_gap = timedelta(seconds=merge_diff)
    merged_items = []
    for start, end, activity in parsed_items:
        # Skip empty or reversed ranges; this applies to every item, so an
        # invalid first entry can no longer slip into the result.
        if end <= start:
            continue
        if merged_items:
            last_start, last_end, last_activity = merged_items[-1]
            if start <= last_end + max_gap and activity == last_activity:
                # Overlapping or adjacent with the same activity: extend.
                merged_items[-1] = (last_start, max(last_end, end), last_activity)
                continue
        merged_items.append((start, end, activity))

    return [format_timeline_item(start, end, activity)
            for start, end, activity in merged_items]


def convert_time(time_str):
    """Shift a three-field time one unit down: 'A:B:C.fff' -> '00:A:B.fff'.

    The first two fields are reused as minutes and seconds, the third field's
    whole part is dropped and its fractional part is kept. Strings that do
    not have exactly three ':'-separated fields are returned unchanged.
    """
    parts = time_str.split(':')
    if len(parts) != 3:
        return time_str
    pieces = parts[2].split('.')
    # Fall back to zero milliseconds when the last field has no fraction
    # instead of failing with IndexError.
    fraction = pieces[1] if len(pieces) > 1 else "000"
    return f"00:{parts[0]}:{parts[1]}.{fraction}"


def _clamp_to_duration(moment, duration, max_time_str):
    """Return ``moment`` as a time string that does not exceed ``duration``."""
    text = format_time(moment)
    # A value beyond the duration with a non-zero hour field is first
    # reinterpreted one unit down via convert_time().
    if (moment - _TIME_ORIGIN) > duration and not text.startswith("00"):
        text = convert_time(text)
    # Anything still beyond the duration is clamped to the duration itself.
    if (parse_time(text) - _TIME_ORIGIN) > duration:
        text = max_time_str
    return text


def merge_product_data(data, start_time_str, end_time_str, merge_diff=5):
    """Group timeline data by product and merge each product's timeline.

    Args:
        data: iterable of dicts with a "product" key and a "timeline" list of
            'START - END (ACTIVITY)' strings.
        start_time_str: start of the overall range, in a parse_time() layout.
        end_time_str: end of the overall range, in a parse_time() layout.
        merge_diff: gap in seconds passed on to merge_timeline_items().

    Returns:
        A list of {"product": ..., "timeline": [...]} dicts, one per product
        in first-seen order. Item times are limited to the range's duration
        and then merged. The input dicts are not modified.
    """
    duration = parse_time(end_time_str) - parse_time(start_time_str)

    # Render the duration as HH:MM:SS.fff by hand (timedelta has no strftime).
    hours, remainder = divmod(duration.total_seconds(), 3600)
    minutes, seconds = divmod(remainder, 60)
    max_time_str = (
        f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"
        f".{duration.microseconds // 1000:03d}"
    )

    # Group timelines by product name, preserving first-seen order.
    product_dict = {}
    for item in data:
        product = item["product"]
        entry = product_dict.setdefault(product, {"product": product, "timeline": []})
        entry["timeline"].extend(item["timeline"])

    # Clamp every item to the duration, then merge per product.
    for entry in product_dict.values():
        new_timeline = []
        for item in entry["timeline"]:
            start_text, end_text, activity = _split_timeline_item(item)
            new_start_str = _clamp_to_duration(parse_time(start_text), duration, max_time_str)
            new_end_str = _clamp_to_duration(parse_time(end_text), duration, max_time_str)
            new_timeline.append(f"{new_start_str} - {new_end_str} ({activity})")
        entry["timeline"] = merge_timeline_items(new_timeline, merge_diff=merge_diff)

    return list(product_dict.values())