Fix/language unification (#283)

* [changes]add user_summary language unification

* [add]Entity extraction, user memory, emotion suggestions, unified language type for writing

* [add]Complete the switch between Chinese and English for the emotion labels and emotion suggestions fields.

* [changes]add user_summary language unification

* [add]Entity extraction, user memory, emotion suggestions, unified language type for writing

* [add]Complete the switch between Chinese and English for the emotion labels and emotion suggestions fields.

* [changes]Modify the code based on the AI review
This commit is contained in:
乐力齐
2026-02-03 16:03:08 +08:00
committed by GitHub
parent 63fa4dc8ec
commit 8670aaba1e
30 changed files with 1033 additions and 810 deletions

View File

@@ -57,24 +57,57 @@ class EmotionAnalyticsService:
self.emotion_repo = EmotionRepository(connector)
logger.info("情绪分析服务初始化完成")
# 情绪类型的中英文映射
EMOTION_TYPE_TRANSLATIONS = {
'joy': {'zh': '喜悦', 'en': 'Joy'},
'sadness': {'zh': '悲伤', 'en': 'Sadness'},
'anger': {'zh': '愤怒', 'en': 'Anger'},
'fear': {'zh': '恐惧', 'en': 'Fear'},
'surprise': {'zh': '惊讶', 'en': 'Surprise'},
'neutral': {'zh': '中性', 'en': 'Neutral'}
}
def _translate_emotion_type(self, emotion_type: str, language: str = "zh") -> str:
"""将情绪类型翻译成指定语言
Args:
emotion_type: 情绪类型英文key
language: 目标语言 ("zh""en")
Returns:
翻译后的情绪类型名称
"""
if emotion_type in self.EMOTION_TYPE_TRANSLATIONS:
return self.EMOTION_TYPE_TRANSLATIONS[emotion_type].get(language, emotion_type)
return emotion_type
async def get_emotion_tags(
self,
end_user_id: str,
emotion_type: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 10
limit: int = 10,
language: str = "zh"
) -> Dict[str, Any]:
"""获取情绪标签统计
查询指定用户的情绪类型分布,包括计数、百分比和平均强度。
确保返回所有6个情绪维度joy、sadness、anger、fear、surprise、neutral
即使某些维度没有数据也会返回count=0的记录。
Args:
end_user_id: 用户ID
emotion_type: 情绪类型过滤
start_date: 开始日期
end_date: 结束日期
limit: 返回数量限制
language: 输出语言 ("zh" 中文, "en" 英文)
"""
try:
logger.info(f"获取情绪标签统计: user={end_user_id}, type={emotion_type}, "
f"start={start_date}, end={end_date}, limit={limit}")
f"start={start_date}, end={end_date}, limit={limit}, language={language}")
# 调用仓储层查询
tags = await self.emotion_repo.get_emotion_tags(
@@ -91,15 +124,17 @@ class EmotionAnalyticsService:
# 将查询结果转换为字典,方便查找
tags_dict = {tag["emotion_type"]: tag for tag in tags}
# 补全缺失的情绪维度
# 补全缺失的情绪维度,并翻译 emotion_type
complete_tags = []
for emotion in all_emotion_types:
if emotion in tags_dict:
complete_tags.append(tags_dict[emotion])
tag = tags_dict[emotion].copy()
tag["emotion_type"] = self._translate_emotion_type(emotion, language)
complete_tags.append(tag)
else:
# 如果该情绪类型不存在,添加默认值
complete_tags.append({
"emotion_type": emotion,
"emotion_type": self._translate_emotion_type(emotion, language),
"count": 0,
"percentage": 0.0,
"avg_intensity": 0.0
@@ -475,6 +510,7 @@ class EmotionAnalyticsService:
self,
end_user_id: str,
db: Session,
language: str = "zh",
) -> Dict[str, Any]:
"""生成个性化情绪建议
@@ -483,6 +519,7 @@ class EmotionAnalyticsService:
Args:
end_user_id: 宿主ID用户组ID
db: 数据库会话
language: 输出语言 ("zh" 中文, "en" 英文)
Returns:
Dict: 包含个性化建议的响应:
@@ -533,7 +570,7 @@ class EmotionAnalyticsService:
user_profile = await self._get_simple_user_profile(end_user_id)
# 6. 构建LLM prompt
prompt = await self._build_suggestion_prompt(health_data, patterns, user_profile)
prompt = await self._build_suggestion_prompt(health_data, patterns, user_profile, language)
# 7. 调用LLM生成建议使用配置中的LLM
if llm_client is None:
@@ -554,12 +591,12 @@ class EmotionAnalyticsService:
except Exception as e:
logger.error(f"LLM 结构化输出失败: {str(e)}")
# 返回默认建议
suggestions_response = self._get_default_suggestions(health_data)
suggestions_response = self._get_default_suggestions(health_data, language)
# 8. 验证建议数量3-5条
if len(suggestions_response.suggestions) < 3:
logger.warning(f"建议数量不足: {len(suggestions_response.suggestions)}")
suggestions_response = self._get_default_suggestions(health_data)
suggestions_response = self._get_default_suggestions(health_data, language)
elif len(suggestions_response.suggestions) > 5:
logger.warning(f"建议数量过多: {len(suggestions_response.suggestions)}")
suggestions_response.suggestions = suggestions_response.suggestions[:5]
@@ -624,7 +661,8 @@ class EmotionAnalyticsService:
self,
health_data: Dict[str, Any],
patterns: Dict[str, Any],
user_profile: Dict[str, Any]
user_profile: Dict[str, Any],
language: str = "zh"
) -> str:
"""构建情绪建议生成的prompt
@@ -632,6 +670,7 @@ class EmotionAnalyticsService:
health_data: 情绪健康数据
patterns: 情绪模式分析结果
user_profile: 用户画像数据
language: 输出语言 ("zh" 中文, "en" 英文)
Returns:
str: LLM prompt
@@ -643,66 +682,114 @@ class EmotionAnalyticsService:
prompt = await render_emotion_suggestions_prompt(
health_data=health_data,
patterns=patterns,
user_profile=user_profile
user_profile=user_profile,
language=language
)
return prompt
def _get_default_suggestions(self, health_data: Dict[str, Any]) -> EmotionSuggestionsResponse:
def _get_default_suggestions(self, health_data: Dict[str, Any], language: str = "zh") -> EmotionSuggestionsResponse:
"""获取默认建议当LLM调用失败时使用
Args:
health_data: 情绪健康数据
language: 输出语言 ("zh" 中文, "en" 英文)
Returns:
EmotionSuggestionsResponse: 默认建议
"""
health_score = health_data.get('health_score', 0)
if health_score >= 80:
summary = "您的情绪健康状况优秀,请继续保持积极的生活态度。"
elif health_score >= 60:
summary = "您的情绪健康状况良好,可以通过一些调整进一步提升。"
elif health_score >= 40:
summary = "您的情绪健康需要关注,建议采取一些改善措施。"
else:
summary = "您的情绪健康需要重点关注,建议寻求专业帮助。"
if language == "en":
if health_score >= 80:
summary = "Your emotional health is excellent. Keep up the positive attitude."
elif health_score >= 60:
summary = "Your emotional health is good. Some adjustments can further improve it."
elif health_score >= 40:
summary = "Your emotional health needs attention. Consider taking improvement measures."
else:
summary = "Your emotional health needs serious attention. Consider seeking professional help."
suggestions = [
EmotionSuggestion(
type="emotion_balance",
title="保持情绪平衡",
content="通过正念冥想和深呼吸练习,帮助您更好地管理情绪波动,提升情绪稳定性。",
priority="high",
actionable_steps=[
"每天早晨进行5-10分钟的正念冥想",
"感到情绪波动时进行3次深呼吸",
"记录每天的情绪变化,识别触发因素"
]
),
EmotionSuggestion(
type="activity_recommendation",
title="增加户外活动",
content="适度的户外运动可以有效改善情绪增强身心健康。建议每周进行3-4次户外活动。",
priority="medium",
actionable_steps=[
"每周安排2-330分钟的散步",
"周末尝试户外运动如骑行或爬山",
"在户外活动时关注周围环境,放松心情"
]
),
EmotionSuggestion(
type="social_connection",
title="加强社交联系",
content="与朋友和家人保持良好的社交联系,可以提供情感支持,改善情绪健康。",
priority="medium",
actionable_steps=[
"每周至少与一位朋友或家人深入交流",
"参加感兴趣的社交活动或兴趣小组",
"主动分享自己的感受和想法"
]
)
]
suggestions = [
EmotionSuggestion(
type="Emotion Balance",
title="Maintain Emotional Balance",
content="Through mindfulness meditation and deep breathing exercises, help you better manage emotional fluctuations and improve emotional stability.",
priority="High",
actionable_steps=[
"Practice 5-10 minutes of mindfulness meditation every morning",
"Take 3 deep breaths when feeling emotional fluctuations",
"Record daily emotional changes to identify triggers"
]
),
EmotionSuggestion(
type="Activity Recommendation",
title="Increase Outdoor Activities",
content="Moderate outdoor exercise can effectively improve mood and enhance physical and mental health. Recommend 3-4 outdoor activities per week.",
priority="Medium",
actionable_steps=[
"Schedule 2-3 30-minute walks per week",
"Try outdoor sports like cycling or hiking on weekends",
"Focus on surroundings and relax during outdoor activities"
]
),
EmotionSuggestion(
type="Social Connection",
title="Strengthen Social Connections",
content="Maintaining good social connections with friends and family can provide emotional support and improve emotional health.",
priority="Medium",
actionable_steps=[
"Have a deep conversation with at least one friend or family member weekly",
"Join social activities or interest groups you enjoy",
"Actively share your feelings and thoughts"
]
)
]
else:
if health_score >= 80:
summary = "您的情绪健康状况优秀,请继续保持积极的生活态度。"
elif health_score >= 60:
summary = "您的情绪健康状况良好,可以通过一些调整进一步提升。"
elif health_score >= 40:
summary = "您的情绪健康需要关注,建议采取一些改善措施。"
else:
summary = "您的情绪健康需要重点关注,建议寻求专业帮助。"
suggestions = [
EmotionSuggestion(
type="情绪平衡",
title="保持情绪平衡",
content="通过正念冥想和深呼吸练习,帮助您更好地管理情绪波动,提升情绪稳定性。",
priority="",
actionable_steps=[
"每天早晨进行5-10分钟的正念冥想",
"感到情绪波动时进行3次深呼吸",
"记录每天的情绪变化,识别触发因素"
]
),
EmotionSuggestion(
type="活动建议",
title="增加户外活动",
content="适度的户外运动可以有效改善情绪增强身心健康。建议每周进行3-4次户外活动。",
priority="",
actionable_steps=[
"每周安排2-3次30分钟的散步",
"周末尝试户外运动如骑行或爬山",
"在户外活动时关注周围环境,放松心情"
]
),
EmotionSuggestion(
type="社交联系",
title="加强社交联系",
content="与朋友和家人保持良好的社交联系,可以提供情感支持,改善情绪健康。",
priority="",
actionable_steps=[
"每周至少与一位朋友或家人深入交流",
"参加感兴趣的社交活动或兴趣小组",
"主动分享自己的感受和想法"
]
)
]
return EmotionSuggestionsResponse(
health_summary=summary,

View File

@@ -32,7 +32,6 @@ from app.repositories.memory_short_repository import ShortTermMemoryRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_agent_schema import Write_UserInput
from app.schemas.memory_config_schema import ConfigurationError
from app.services.memory_base_service import Translation_English
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_konwledges_server import (
write_rag,
@@ -265,7 +264,7 @@ class MemoryAgentService:
logger.info("Log streaming completed, cleaning up resources")
# LogStreamer uses context manager for file handling, so cleanup is automatic
async def write_memory(self, end_user_id: str, messages: list[dict], config_id: Optional[uuid.UUID]|int, db: Session, storage_type: str, user_rag_memory_id: str) -> str:
async def write_memory(self, end_user_id: str, messages: list[dict], config_id: Optional[uuid.UUID]|int, db: Session, storage_type: str, user_rag_memory_id: str, language: str = "zh") -> str:
"""
Process write operation with config_id
@@ -276,6 +275,7 @@ class MemoryAgentService:
db: SQLAlchemy database session
storage_type: Storage type (neo4j or rag)
user_rag_memory_id: User RAG memory ID
language: 语言类型 ("zh" 中文, "en" 英文)
Returns:
Write operation result status
@@ -341,7 +341,8 @@ class MemoryAgentService:
initial_state = {
"messages": langchain_messages,
"end_user_id": end_user_id,
"memory_config": memory_config
"memory_config": memory_config,
"language": language
}
# 获取节点更新信息
@@ -890,9 +891,7 @@ class MemoryAgentService:
async def get_hot_memory_tags_by_user(
self,
end_user_id: Optional[str] = None,
limit: int = 20,
model_id: Optional[str] = None,
language_type: Optional[str] = "zh"
limit: int = 20
) -> List[Dict[str, Any]]:
"""
获取指定用户的热门记忆标签
@@ -906,17 +905,15 @@ class MemoryAgentService:
{"name": "标签名", "frequency": 频次},
...
]
注意:标签语言由写入时的 X-Language-Type 决定,查询时不进行翻译
"""
try:
# by_user=False 表示按 end_user_id 查询在Neo4j中end_user_id就是用户维度
tags = await get_hot_memory_tags(end_user_id, limit=limit, by_user=False)
payload=[]
payload = []
for tag, freq in tags:
if language_type!="zh":
tag=await Translation_English(model_id, tag)
payload.append({"name": tag, "frequency": freq})
else:
payload.append({"name": tag, "frequency": freq})
payload.append({"name": tag, "frequency": freq})
return payload
except Exception as e:
logger.error(f"热门记忆标签查询失败: {e}")

View File

@@ -16,7 +16,6 @@ import json
from datetime import datetime
from app.schemas.memory_episodic_schema import EmotionType
from app.services.memory_base_service import Translation_English
logger = logging.getLogger(__name__)
@@ -374,119 +373,6 @@ class MemoryEntityService:
logger.warning(f"转换时间格式失败: {e}, 原始值: {dt}")
return str(dt) if dt is not None else None
async def _translate_list(
self,
data_list: List[Dict[str, Any]],
model_id: str,
fields: List[str]
) -> List[Dict[str, Any]]:
"""
翻译列表中每个字典的指定字段(并发有限度以降低整体延迟)
Args:
data_list: 要翻译的字典列表
model_id: 模型ID
fields: 需要翻译的字段列表
Returns:
翻译后的字典列表
"""
# 空列表或无字段时直接返回
if not data_list or not fields:
return data_list
import asyncio
# 并发限制,避免一次性发起过多请求
# 可根据实际情况调整(建议 5-10
concurrency_limit = 5
semaphore = asyncio.Semaphore(concurrency_limit)
async def translate_single_field(
index: int,
field: str,
value: Any,
) -> Optional[tuple]:
"""
翻译单个字段并返回 (索引, 字段名, 翻译结果)
Returns:
(index, field, translated_value) 或 None如果跳过
"""
# 跳过空值
if value is None or value == "":
return None
# 统一转成字符串再翻译,防止非字符串类型导致错误
text = str(value)
try:
async with semaphore:
# 调用 Translation_English 进行翻译
# 注意Translation_English 的参数顺序是 (model_id, text)
translated = await Translation_English(model_id, text)
# 如果翻译结果为空,保留原值
if translated is None or translated == "":
return None
return index, field, translated
except Exception as e:
logger.warning(f"翻译字段 {field} (索引 {index}) 失败: {e}")
return None
# 构造所有需要翻译的任务
tasks = []
for idx, item in enumerate(data_list):
# 防御性检查:确保 item 是字典
if not isinstance(item, dict):
continue
for field in fields:
if field not in item:
continue
value = item.get(field)
# 对于 None 或空字符串的值,直接跳过,不创建任务
if value is None or value == "":
continue
tasks.append(
asyncio.create_task(
translate_single_field(idx, field, value)
)
)
# 如果没有需要翻译的任务,直接返回原列表
if not tasks:
return data_list
# 使用 gather 并发执行翻译任务(受 semaphore 限制)
# return_exceptions=True 可以防止单个任务失败导致整体失败
results = await asyncio.gather(*tasks, return_exceptions=True)
# 创建深拷贝以避免修改原始数据
translated_list = [item.copy() if isinstance(item, dict) else item for item in data_list]
# 将翻译结果回填到列表
for result in results:
# 跳过 None 结果和异常
if result is None or isinstance(result, Exception):
if isinstance(result, Exception):
logger.warning(f"翻译任务异常: {result}")
continue
idx, field, translated = result
# 防御性检查索引范围
if 0 <= idx < len(translated_list) and isinstance(translated_list[idx], dict):
translated_list[idx][field] = translated
return translated_list
async def close(self):
"""关闭数据库连接"""

View File

@@ -236,12 +236,13 @@ class DataConfigService: # 数据配置服务类PostgreSQL
return self._convert_timestamps_to_format(data_list)
async def pilot_run_stream(self, payload: ConfigPilotRun) -> AsyncGenerator[str, None]:
async def pilot_run_stream(self, payload: ConfigPilotRun, language: str = "zh") -> AsyncGenerator[str, None]:
"""
流式执行试运行,产生 SSE 格式的进度事件
Args:
payload: 试运行配置和对话文本
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Yields:
SSE 格式的字符串,包含以下事件类型:
@@ -315,6 +316,7 @@ class DataConfigService: # 数据配置服务类PostgreSQL
dialogue_text=dialogue_text,
db=self.db,
progress_callback=progress_callback,
language=language,
)
logger.info("[PILOT_RUN_STREAM] pipeline_main completed")

View File

@@ -78,7 +78,8 @@ class OntologyService:
scenario: str,
domain: Optional[str] = None,
scene_id: Optional[Any] = None,
workspace_id: Optional[Any] = None
workspace_id: Optional[Any] = None,
language: str = "zh"
) -> OntologyExtractionResponse:
"""执行本体提取
@@ -91,6 +92,7 @@ class OntologyService:
domain: 可选的领域提示
scene_id: 可选的场景ID,用于权限验证(不再用于自动保存)
workspace_id: 可选的工作空间ID,用于权限验证
language: 输出语言 ("zh" 中文, "en" 英文)
Returns:
OntologyExtractionResponse: 提取结果
@@ -155,6 +157,7 @@ class OntologyService:
llm_max_tokens=self.DEFAULT_LLM_MAX_TOKENS,
max_description_length=self.DEFAULT_MAX_DESCRIPTION_LENGTH,
timeout=self.DEFAULT_LLM_TIMEOUT,
language=language,
)
extraction_duration = time.time() - extraction_start_time

View File

@@ -36,6 +36,7 @@ async def run_pilot_extraction(
dialogue_text: str,
db: Session,
progress_callback: Optional[Callable[[str, str, Optional[dict]], Awaitable[None]]] = None,
language: str = "zh",
) -> None:
"""
执行试运行模式的知识提取流水线。
@@ -43,10 +44,12 @@ async def run_pilot_extraction(
Args:
memory_config: 从数据库加载的内存配置对象
dialogue_text: 输入的对话文本
db: 数据库会话
progress_callback: 可选的进度回调函数
- 参数1 (stage): 当前处理阶段标识符
- 参数2 (message): 人类可读的进度消息
- 参数3 (data): 可选的附加数据字典
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
"""
log_file = "logs/time.log"
os.makedirs(os.path.dirname(log_file), exist_ok=True)
@@ -146,6 +149,7 @@ async def run_pilot_extraction(
config=config,
progress_callback=progress_callback,
embedding_id=str(memory_config.embedding_model_id),
language=language,
)
log_time("Orchestrator Initialization", time.time() - step_start, log_file)
@@ -191,6 +195,7 @@ async def run_pilot_extraction(
chunked_dialogs,
llm_client=llm_client,
embedder_client=embedder_client,
language=language,
)
log_time("Memory Summary Generation", time.time() - step_start, log_file)

View File

@@ -18,7 +18,7 @@ from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_episodic_schema import EmotionSubject, EmotionType, type_mapping
from app.services.implicit_memory_service import ImplicitMemoryService
from app.services.memory_base_service import MemoryBaseService, MemoryTransService, Translation_English
from app.services.memory_base_service import MemoryBaseService, MemoryTransService
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_perceptual_service import MemoryPerceptualService
from app.services.memory_short_service import ShortService
@@ -455,9 +455,7 @@ class UserMemoryService:
async def get_cached_memory_insight(
self,
db: Session,
end_user_id: str,
model_id: str,
language_type: str
end_user_id: str
) -> Dict[str, Any]:
"""
从数据库获取缓存的记忆洞察(四个维度)
@@ -465,7 +463,7 @@ class UserMemoryService:
Args:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
Returns:
{
"memory_insight": str, # 总体概述
@@ -519,10 +517,6 @@ class UserMemoryService:
memory_insight=end_user.memory_insight
behavior_pattern=end_user.behavior_pattern
growth_trajectory=end_user.growth_trajectory
if language_type!='zh':
memory_insight=await Translation_English(model_id,memory_insight)
behavior_pattern=await Translation_English(model_id,behavior_pattern)
growth_trajectory=await Translation_English(model_id,growth_trajectory)
return {
"memory_insight":memory_insight, # 总体概述存储在 memory_insight
"behavior_pattern":behavior_pattern,
@@ -571,6 +565,8 @@ class UserMemoryService:
Args:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
model_id: 模型ID用于翻译
language_type: 语言类型 ("zh" 中文, "en" 英文)
Returns:
{
@@ -604,11 +600,10 @@ class UserMemoryService:
personality_traits=end_user.personality_traits
core_values=end_user.core_values
one_sentence_summary=end_user.one_sentence_summary
if language_type!='zh':
user_summary=await Translation_English(model_id, user_summary)
personality_traits = await Translation_English(model_id, personality_traits)
core_values = await Translation_English(model_id, core_values)
one_sentence_summary = await Translation_English(model_id, one_sentence_summary)
# 直接返回数据库中的数据,不进行二次翻译
# 语言由生成时的 X-Language-Type 决定
has_cache = any([
user_summary,
personality_traits,
@@ -658,7 +653,8 @@ class UserMemoryService:
self,
db: Session,
end_user_id: str,
workspace_id: Optional[uuid.UUID] = None
workspace_id: Optional[uuid.UUID] = None,
language: str = "zh"
) -> Dict[str, Any]:
"""
生成并缓存记忆洞察
@@ -667,6 +663,7 @@ class UserMemoryService:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
workspace_id: 工作空间ID (可选)
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
{
@@ -679,7 +676,7 @@ class UserMemoryService:
}
"""
try:
logger.info(f"开始为 end_user_id {end_user_id} 生成记忆洞察")
logger.info(f"开始为 end_user_id {end_user_id} 生成记忆洞察, language={language}")
# 转换为UUID并查询用户
user_uuid = uuid.UUID(end_user_id)
@@ -700,7 +697,7 @@ class UserMemoryService:
# 使用 end_user_id 调用分析函数
try:
logger.info(f"使用 end_user_id={end_user_id} 生成记忆洞察")
result = await analytics_memory_insight_report(end_user_id)
result = await analytics_memory_insight_report(end_user_id, language=language)
memory_insight = result.get("memory_insight", "")
behavior_pattern = result.get("behavior_pattern", "")
@@ -789,7 +786,8 @@ class UserMemoryService:
self,
db: Session,
end_user_id: str,
workspace_id: Optional[uuid.UUID] = None
workspace_id: Optional[uuid.UUID] = None,
language: str = "zh"
) -> Dict[str, Any]:
"""
生成并缓存用户摘要(四个部分)
@@ -798,6 +796,7 @@ class UserMemoryService:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
workspace_id: 工作空间ID (可选)
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
{
@@ -810,7 +809,7 @@ class UserMemoryService:
}
"""
try:
logger.info(f"开始为 end_user_id {end_user_id} 生成用户摘要")
logger.info(f"开始为 end_user_id {end_user_id} 生成用户摘要, language={language}")
# 转换为UUID并查询用户
user_uuid = uuid.UUID(end_user_id)
@@ -831,7 +830,7 @@ class UserMemoryService:
# 使用 end_user_id 调用分析函数
try:
logger.info(f"使用 end_user_id={end_user_id} 生成用户摘要")
result = await analytics_user_summary(end_user_id)
result = await analytics_user_summary(end_user_id, language=language)
user_summary = result.get("user_summary", "")
personality = result.get("personality", "")
@@ -915,7 +914,8 @@ class UserMemoryService:
async def generate_cache_for_workspace(
self,
db: Session,
workspace_id: uuid.UUID
workspace_id: uuid.UUID,
language: str = "zh"
) -> Dict[str, Any]:
"""
为整个工作空间生成缓存
@@ -923,6 +923,7 @@ class UserMemoryService:
Args:
db: 数据库会话
workspace_id: 工作空间ID
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
{
@@ -932,7 +933,7 @@ class UserMemoryService:
"errors": List[Dict]
}
"""
logger.info(f"开始为工作空间 {workspace_id} 批量生成缓存")
logger.info(f"开始为工作空间 {workspace_id} 批量生成缓存, language={language}")
total_users = 0
successful = 0
@@ -953,10 +954,10 @@ class UserMemoryService:
try:
# 生成记忆洞察
insight_result = await self.generate_and_cache_insight(db, end_user_id)
insight_result = await self.generate_and_cache_insight(db, end_user_id, language=language)
# 生成用户摘要
summary_result = await self.generate_and_cache_summary(db, end_user_id)
summary_result = await self.generate_and_cache_summary(db, end_user_id, language=language)
# 检查是否都成功
if insight_result["success"] and summary_result["success"]:
@@ -1007,7 +1008,7 @@ class UserMemoryService:
# 独立的分析函数
async def analytics_memory_insight_report(end_user_id: Optional[str] = None) -> Dict[str, Any]:
async def analytics_memory_insight_report(end_user_id: Optional[str] = None, language: str = "zh") -> Dict[str, Any]:
"""
生成记忆洞察报告(四个维度)
@@ -1019,6 +1020,7 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
Args:
end_user_id: 可选的终端用户ID
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
包含四个维度报告的字典: {
@@ -1029,8 +1031,12 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
}
"""
from app.core.memory.utils.prompt.prompt_utils import render_memory_insight_prompt
from app.core.language_utils import validate_language
import re
# 验证语言参数
language = validate_language(language)
insight = MemoryInsightHelper(end_user_id)
try:
@@ -1070,7 +1076,8 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
user_prompt = await render_memory_insight_prompt(
domain_distribution=domain_distribution_str,
active_periods=active_periods_str,
social_connections=social_connections_str
social_connections=social_connections_str,
language=language
)
messages = [
@@ -1097,11 +1104,11 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
full_response = str(content) if content is not None else ""
# 7. 解析四个部分
# 使用正则表达式提取四个部分
memory_insight_match = re.search(r'【总体概述】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
behavior_match = re.search(r'【行为模式】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
findings_match = re.search(r'【关键发现】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
trajectory_match = re.search(r'【成长轨迹】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
# 使用正则表达式提取四个部分(支持中英文双语标题)
memory_insight_match = re.search(r'(?:总体概述|Overview)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
behavior_match = re.search(r'(?:行为模式|Behavior Pattern)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
findings_match = re.search(r'(?:关键发现|Key Findings)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
trajectory_match = re.search(r'(?:成长轨迹|Growth Trajectory)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
memory_insight = memory_insight_match.group(1).strip() if memory_insight_match else ""
behavior_pattern = behavior_match.group(1).strip() if behavior_match else ""
@@ -1128,7 +1135,7 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
await insight.close()
async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str, Any]:
async def analytics_user_summary(end_user_id: Optional[str] = None, language: str = "zh") -> Dict[str, Any]:
"""
生成用户摘要(包含四个部分)
@@ -1139,6 +1146,7 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str,
Args:
end_user_id: 可选的终端用户ID
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
包含四部分摘要的字典: {
@@ -1149,8 +1157,12 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str,
}
"""
from app.core.memory.utils.prompt.prompt_utils import render_user_summary_prompt
from app.core.language_utils import validate_language
import re
# 验证语言参数
language = validate_language(language)
# 创建 UserSummaryHelper 实例
user_summary_tool = UserSummaryHelper(end_user_id or os.getenv("SELECTED_end_user_id", "group_123"))
@@ -1165,8 +1177,9 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str,
# 2) 使用 prompt_utils 渲染提示词
user_prompt = await render_user_summary_prompt(
user_id=user_summary_tool.user_id,
entities=", ".join(entity_lines) if entity_lines else "(空)",
statements=" | ".join(statement_samples) if statement_samples else "(空)"
entities=", ".join(entity_lines) if entity_lines else "(空)" if language == "zh" else "(empty)",
statements=" | ".join(statement_samples) if statement_samples else "(空)" if language == "zh" else "(empty)",
language=language
)
messages = [
@@ -1193,11 +1206,11 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str,
full_response = str(content) if content is not None else ""
# 5) 解析四个部分
# 使用正则表达式提取四个部分
user_summary_match = re.search(r'【基本介绍】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
personality_match = re.search(r'【性格特点】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
core_values_match = re.search(r'【核心价值观】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
one_sentence_match = re.search(r'【一句话总结】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
# 使用正则表达式提取四个部分(支持中英文标题)
user_summary_match = re.search(r'(?:基本介绍|Basic Introduction)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
personality_match = re.search(r'(?:性格特点|Personality Traits)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
core_values_match = re.search(r'(?:核心价值观|Core Values)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
one_sentence_match = re.search(r'(?:一句话总结|One-Sentence Summary)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
user_summary = user_summary_match.group(1).strip() if user_summary_match else ""
personality = personality_match.group(1).strip() if personality_match else ""