diff --git a/api/app/services/memory_entity_relationship_service.py b/api/app/services/memory_entity_relationship_service.py index f544cb75..06c1616d 100644 --- a/api/app/services/memory_entity_relationship_service.py +++ b/api/app/services/memory_entity_relationship_service.py @@ -387,9 +387,14 @@ class MemoryEntityService: logger.warning(f"转换时间格式失败: {e}, 原始值: {dt}") return str(dt) if dt is not None else None - async def _translate_list(self, data_list: List[Dict[str, Any]], model_id: str, fields: List[str]) -> List[Dict[str, Any]]: + async def _translate_list( + self, + data_list: List[Dict[str, Any]], + model_id: str, + fields: List[str] + ) -> List[Dict[str, Any]]: """ - 翻译列表中每个字典的指定字段 + 翻译列表中每个字典的指定字段(并发有限度以降低整体延迟) Args: data_list: 要翻译的字典列表 @@ -399,27 +404,97 @@ class MemoryEntityService: Returns: 翻译后的字典列表 """ - if not data_list: + # 空列表或无字段时直接返回 + if not data_list or not fields: return data_list - translated_list = [] - for item in data_list: + import asyncio + + # 并发限制,避免一次性发起过多请求 + # 可根据实际情况调整(建议 5-10) + concurrency_limit = 5 + semaphore = asyncio.Semaphore(concurrency_limit) + + async def translate_single_field( + index: int, + field: str, + value: Any, + ) -> Optional[tuple]: + """ + 翻译单个字段并返回 (索引, 字段名, 翻译结果) + + Returns: + (index, field, translated_value) 或 None(如果跳过) + """ + # 跳过空值 + if value is None or value == "": + return None + + # 统一转成字符串再翻译,防止非字符串类型导致错误 + text = str(value) + + try: + async with semaphore: + # 调用 Translation_English 进行翻译 + # 注意:Translation_English 的参数顺序是 (model_id, text) + translated = await Translation_English(model_id, text) + + # 如果翻译结果为空,保留原值 + if translated is None or translated == "": + return None + + return index, field, translated + except Exception as e: + logger.warning(f"翻译字段 {field} (索引 {index}) 失败: {e}") + return None + + # 构造所有需要翻译的任务 + tasks = [] + for idx, item in enumerate(data_list): + # 防御性检查:确保 item 是字典 if not isinstance(item, dict): - translated_list.append(item) continue - translated_item = item.copy() for field in fields: - if field in translated_item and translated_item[field]: - try: - # 调用Translation_English翻译单个字段 - translated_value = await Translation_English(model_id, translated_item[field]) - if translated_value: - translated_item[field] = translated_value - except Exception as e: - logger.warning(f"翻译字段 {field} 失败: {e}") + if field not in item: + continue + + value = item.get(field) + + # 对于 None 或空字符串的值,直接跳过,不创建任务 + if value is None or value == "": + continue + + tasks.append( + asyncio.create_task( + translate_single_field(idx, field, value) + ) + ) + + # 如果没有需要翻译的任务,直接返回原列表 + if not tasks: + return data_list + + # 使用 gather 并发执行翻译任务(受 semaphore 限制) + # return_exceptions=True 可以防止单个任务失败导致整体失败 + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 创建深拷贝以避免修改原始数据 + translated_list = [item.copy() if isinstance(item, dict) else item for item in data_list] + + # 将翻译结果回填到列表 + for result in results: + # 跳过 None 结果和异常 + if result is None or isinstance(result, Exception): + if isinstance(result, Exception): + logger.warning(f"翻译任务异常: {result}") + continue - translated_list.append(translated_item) + idx, field, translated = result + + # 防御性检查索引范围 + if 0 <= idx < len(translated_list) and isinstance(translated_list[idx], dict): + translated_list[idx][field] = translated return translated_list