新增中翻英功能(记忆时间线)(用户摘要)(兴趣分布接口)(查询核心档案)(记忆洞察)-接口添加翻译字段

This commit is contained in:
lixinyue
2026-01-21 11:03:37 +08:00
parent 84c6c7e2a6
commit 0e2e495d09

View File

@@ -387,9 +387,14 @@ class MemoryEntityService:
logger.warning(f"转换时间格式失败: {e}, 原始值: {dt}")
return str(dt) if dt is not None else None
async def _translate_list(self, data_list: List[Dict[str, Any]], model_id: str, fields: List[str]) -> List[Dict[str, Any]]:
async def _translate_list(
self,
data_list: List[Dict[str, Any]],
model_id: str,
fields: List[str]
) -> List[Dict[str, Any]]:
"""
翻译列表中每个字典的指定字段
翻译列表中每个字典的指定字段(并发有限度以降低整体延迟)
Args:
data_list: 要翻译的字典列表
@@ -399,27 +404,97 @@ class MemoryEntityService:
Returns:
翻译后的字典列表
"""
if not data_list:
# 空列表或无字段时直接返回
if not data_list or not fields:
return data_list
translated_list = []
for item in data_list:
import asyncio
# 并发限制,避免一次性发起过多请求
# 可根据实际情况调整(建议 5-10
concurrency_limit = 5
semaphore = asyncio.Semaphore(concurrency_limit)
async def translate_single_field(
index: int,
field: str,
value: Any,
) -> Optional[tuple]:
"""
翻译单个字段并返回 (索引, 字段名, 翻译结果)
Returns:
(index, field, translated_value) 或 None如果跳过
"""
# 跳过空值
if value is None or value == "":
return None
# 统一转成字符串再翻译,防止非字符串类型导致错误
text = str(value)
try:
async with semaphore:
# 调用 Translation_English 进行翻译
# 注意Translation_English 的参数顺序是 (model_id, text)
translated = await Translation_English(model_id, text)
# 如果翻译结果为空,保留原值
if translated is None or translated == "":
return None
return index, field, translated
except Exception as e:
logger.warning(f"翻译字段 {field} (索引 {index}) 失败: {e}")
return None
# 构造所有需要翻译的任务
tasks = []
for idx, item in enumerate(data_list):
# 防御性检查:确保 item 是字典
if not isinstance(item, dict):
translated_list.append(item)
continue
translated_item = item.copy()
for field in fields:
if field in translated_item and translated_item[field]:
try:
# 调用Translation_English翻译单个字段
translated_value = await Translation_English(model_id, translated_item[field])
if translated_value:
translated_item[field] = translated_value
except Exception as e:
logger.warning(f"翻译字段 {field} 失败: {e}")
if field not in item:
continue
value = item.get(field)
# 对于 None 或空字符串的值,直接跳过,不创建任务
if value is None or value == "":
continue
tasks.append(
asyncio.create_task(
translate_single_field(idx, field, value)
)
)
# 如果没有需要翻译的任务,直接返回原列表
if not tasks:
return data_list
# 使用 gather 并发执行翻译任务(受 semaphore 限制)
# return_exceptions=True 可以防止单个任务失败导致整体失败
results = await asyncio.gather(*tasks, return_exceptions=True)
# 创建深拷贝以避免修改原始数据
translated_list = [item.copy() if isinstance(item, dict) else item for item in data_list]
# 将翻译结果回填到列表
for result in results:
# 跳过 None 结果和异常
if result is None or isinstance(result, Exception):
if isinstance(result, Exception):
logger.warning(f"翻译任务异常: {result}")
continue
translated_list.append(translated_item)
idx, field, translated = result
# 防御性检查索引范围
if 0 <= idx < len(translated_list) and isinstance(translated_list[idx], dict):
translated_list[idx][field] = translated
return translated_list