Merge branch 'develop' into fix/memory-enduser-config

This commit is contained in:
Ke Sun
2026-02-03 19:40:08 +08:00
174 changed files with 7483 additions and 4006 deletions

View File

@@ -3,7 +3,7 @@ import asyncio
import json
import time
import uuid
from typing import Optional, Dict, Any, AsyncGenerator, Annotated
from typing import Optional, Dict, Any, AsyncGenerator, Annotated, List
from fastapi import Depends
from sqlalchemy.orm import Session
@@ -15,6 +15,7 @@ from app.core.logging_config import get_business_logger
from app.db import get_db, get_db_context
from app.models import MultiAgentConfig, AgentConfig, WorkflowConfig
from app.schemas import DraftRunRequest
from app.schemas.app_schema import FileInput
from app.services.tool_service import ToolService
from app.repositories.tool_repository import ToolRepository
from app.db import get_db
@@ -26,6 +27,7 @@ from app.services.draft_run_service import create_web_search_tool
from app.services.model_service import ModelApiKeyService
from app.services.multi_agent_orchestrator import MultiAgentOrchestrator
from app.services.workflow_service import WorkflowService
from app.services.multimodal_service import MultimodalService
logger = get_business_logger()
@@ -48,7 +50,8 @@ class AppChatService:
memory: bool = True,
storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None,
workspace_id: Optional[str] = None
workspace_id: Optional[str] = None,
files: Optional[List[FileInput]] = None # 新增:多模态文件
) -> Dict[str, Any]:
"""聊天(非流式)"""
@@ -155,7 +158,14 @@ class AppChatService:
for msg in messages
]
# 调用 Agent
# 处理多模态文件
processed_files = None
if files:
multimodal_service = MultimodalService(self.db)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件")
# 调用 Agent支持多模态
result = await agent.chat(
message=message,
history=history,
@@ -164,7 +174,8 @@ class AppChatService:
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
config_id=config_id,
memory_flag=memory_flag
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
)
# 保存消息
@@ -206,6 +217,7 @@ class AppChatService:
storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None,
workspace_id: Optional[str] = None,
files: Optional[List[FileInput]] = None # 新增:多模态文件
) -> AsyncGenerator[str, None]:
"""聊天(流式)"""
@@ -312,10 +324,17 @@ class AppChatService:
for msg in messages
]
# 处理多模态文件
processed_files = None
if files:
multimodal_service = MultimodalService(self.db)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件")
# 发送开始事件
yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n"
# 流式调用 Agent
# 流式调用 Agent(支持多模态)
full_content = ""
total_tokens = 0
async for chunk in agent.chat_stream(
@@ -326,7 +345,8 @@ class AppChatService:
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
config_id=config_id,
memory_flag=memory_flag
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
):
if isinstance(chunk, int):
total_tokens = chunk

View File

@@ -19,11 +19,13 @@ from app.models import AgentConfig, ModelApiKey, ModelConfig
from app.repositories.model_repository import ModelApiKeyRepository
from app.repositories.tool_repository import ToolRepository
from app.schemas.prompt_schema import PromptMessageRole, render_prompt_message
from app.schemas.app_schema import FileInput
from app.services import task_service
from app.services.langchain_tool_server import Search
from app.services.memory_agent_service import MemoryAgentService
from app.services.model_parameter_merger import ModelParameterMerger
from app.services.tool_service import ToolService
from app.services.multimodal_service import MultimodalService
from langchain.tools import tool
from pydantic import BaseModel, Field
from sqlalchemy import select
@@ -62,26 +64,23 @@ def create_long_term_memory_tool(memory_config: Dict[str, Any], end_user_id: str
@tool(args_schema=LongTermMemoryInput)
def long_term_memory(question: str) -> str:
"""
从用户的历史记忆中检索相关信息。这是一个强大的工具,可以帮助你了解用户的背景、偏好和历史对话内容。
从用户的历史记忆中检索相关信息。用于了解用户的背景、偏好和历史对话内容。
以下场景不需要使用此工具:
1. 情绪/社交问候场景(如"你好""谢谢""再见"等简单寒暄
2. 纯任务性场景(如"帮我写代码""翻译这段文字"等不需要历史上下文的任务
3. 处理外部内容时如用户提供的文本、代码、RAG数据等这些内容本身已经包含所需信息
**何时使用此工具:**
- 用户明确询问历史信息(如"我之前说过什么""上次我们聊了什么"
- 用户询问个人信息或偏好(如"我喜欢什么""我的习惯是什么"
- 需要基于历史上下文提供个性化建议
除上述场景外的所有其他情况都应该使用此工具,特别是:
- 用户询问个人信息或历史对话内容
- 需要了解用户偏好、习惯或背景
- 用户提到"之前""上次""记得"等涉及历史的词汇
- 需要个性化回复或基于历史上下文的建议
- 用户询问关于自己的任何信息
**何时不使用此工具:**
- 简单问候(如"你好""谢谢""再见"
- 纯任务性请求(如"写代码""翻译文字""分析图片"
- 用户已提供完整信息(如提供了文本、图片、文档等内容)
- 创作性任务(如"写诗""编故事""创作谜语"
**重要:如果用户的问题可以直接回答,不要调用此工具。只在确实需要历史信息时才使用。**
需要对question改写/优化:
需要重点关注一以下几点
- 相关的关键词,保持原问题的核心语义不变, 根据上下文,使问题更具体、更清晰,将模糊的表达转换为明确的搜索词
- 使用同义词或相关术语扩展查询
Args:
question: question改写之后的内容
question: 需要检索的问题(保持原问题的核心语义,使用清晰的关键词)
Returns:
检索到的历史记忆内容
@@ -124,6 +123,10 @@ def create_long_term_memory_tool(memory_config: Dict[str, Any], end_user_id: str
}
)
# 检查是否有有效内容
if not memory_content or str(memory_content).strip() == "" or "answer" in str(memory_content) and str(memory_content).count("''") > 0:
return "未找到相关的历史记忆。请直接回答用户的问题,不要再次调用此工具。"
return f"检索到以下历史记忆:\n\n{memory_content}"
except Exception as e:
logger.error("长期记忆检索失败", extra={"error": str(e), "error_type": type(e).__name__})
@@ -246,7 +249,8 @@ class DraftRunService:
user_rag_memory_id: Optional[str] = None,
web_search: bool = True,
memory: bool = True,
sub_agent: bool = False
sub_agent: bool = False,
files: Optional[List[FileInput]] = None # 新增:多模态文件
) -> Dict[str, Any]:
"""执行试运行(使用 LangChain Agent
@@ -406,7 +410,16 @@ class DraftRunService:
max_history=agent_config.memory.get("max_history", 10)
)
# 6. 知识库检索
# 6. 处理多模态文件
processed_files = None
if files:
# 获取 provider 信息
provider = api_key_config.get("provider", "openai")
multimodal_service = MultimodalService(self.db, provider=provider)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件provider={provider}")
# 7. 知识库检索
context = None
logger.debug(
@@ -414,14 +427,15 @@ class DraftRunService:
extra={
"model": api_key_config["model_name"],
"has_history": bool(history),
"has_context": bool(context)
"has_context": bool(context),
"has_files": bool(processed_files)
}
)
memory_config_= agent_config.memory
config_id = memory_config_.get("memory_content") or memory_config_.get("memory_config",None)
# 7. 调用 Agent
# 8. 调用 Agent(支持多模态)
result = await agent.chat(
message=message,
history=history,
@@ -430,12 +444,13 @@ class DraftRunService:
config_id=config_id,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
memory_flag=memory_flag
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
)
elapsed_time = time.time() - start_time
# 8. 保存会话消息
# 9. 保存会话消息
if not sub_agent and agent_config.memory and agent_config.memory.get("enabled"):
await self._save_conversation_message(
conversation_id=conversation_id,
@@ -493,7 +508,8 @@ class DraftRunService:
user_rag_memory_id: Optional[str] = None,
web_search: bool = True, # 布尔类型默认值
memory: bool = True, # 布尔类型默认值
sub_agent: bool = False # 是否是作为子Agent运行
sub_agent: bool = False, # 是否是作为子Agent运行
files: Optional[List[FileInput]] = None # 新增:多模态文件
) -> AsyncGenerator[str, None]:
"""执行试运行(流式返回,使用 LangChain Agent
@@ -642,6 +658,15 @@ class DraftRunService:
max_history=agent_config.memory.get("max_history", 10)
)
# 6. 处理多模态文件
processed_files = None
if files:
# 获取 provider 信息
provider = api_key_config.get("provider", "openai")
multimodal_service = MultimodalService(self.db, provider=provider)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件provider={provider}")
# 7. 知识库检索
context = None
@@ -654,7 +679,7 @@ class DraftRunService:
memory_config_ = agent_config.memory
config_id = memory_config_.get("memory_content") or memory_config_.get("memory_config",None)
# 9. 流式调用 Agent
# 9. 流式调用 Agent(支持多模态)
full_content = ""
total_tokens = 0
async for chunk in agent.chat_stream(
@@ -665,7 +690,8 @@ class DraftRunService:
config_id=config_id,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
memory_flag=memory_flag
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
):
if isinstance(chunk, int):
total_tokens = chunk

View File

@@ -57,24 +57,57 @@ class EmotionAnalyticsService:
self.emotion_repo = EmotionRepository(connector)
logger.info("情绪分析服务初始化完成")
# 情绪类型的中英文映射
EMOTION_TYPE_TRANSLATIONS = {
'joy': {'zh': '喜悦', 'en': 'Joy'},
'sadness': {'zh': '悲伤', 'en': 'Sadness'},
'anger': {'zh': '愤怒', 'en': 'Anger'},
'fear': {'zh': '恐惧', 'en': 'Fear'},
'surprise': {'zh': '惊讶', 'en': 'Surprise'},
'neutral': {'zh': '中性', 'en': 'Neutral'}
}
def _translate_emotion_type(self, emotion_type: str, language: str = "zh") -> str:
"""将情绪类型翻译成指定语言
Args:
emotion_type: 情绪类型英文key
language: 目标语言 ("zh""en")
Returns:
翻译后的情绪类型名称
"""
if emotion_type in self.EMOTION_TYPE_TRANSLATIONS:
return self.EMOTION_TYPE_TRANSLATIONS[emotion_type].get(language, emotion_type)
return emotion_type
async def get_emotion_tags(
self,
end_user_id: str,
emotion_type: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 10
limit: int = 10,
language: str = "zh"
) -> Dict[str, Any]:
"""获取情绪标签统计
查询指定用户的情绪类型分布,包括计数、百分比和平均强度。
确保返回所有6个情绪维度joy、sadness、anger、fear、surprise、neutral
即使某些维度没有数据也会返回count=0的记录。
Args:
end_user_id: 用户ID
emotion_type: 情绪类型过滤
start_date: 开始日期
end_date: 结束日期
limit: 返回数量限制
language: 输出语言 ("zh" 中文, "en" 英文)
"""
try:
logger.info(f"获取情绪标签统计: user={end_user_id}, type={emotion_type}, "
f"start={start_date}, end={end_date}, limit={limit}")
f"start={start_date}, end={end_date}, limit={limit}, language={language}")
# 调用仓储层查询
tags = await self.emotion_repo.get_emotion_tags(
@@ -91,15 +124,17 @@ class EmotionAnalyticsService:
# 将查询结果转换为字典,方便查找
tags_dict = {tag["emotion_type"]: tag for tag in tags}
# 补全缺失的情绪维度
# 补全缺失的情绪维度,并翻译 emotion_type
complete_tags = []
for emotion in all_emotion_types:
if emotion in tags_dict:
complete_tags.append(tags_dict[emotion])
tag = tags_dict[emotion].copy()
tag["emotion_type"] = self._translate_emotion_type(emotion, language)
complete_tags.append(tag)
else:
# 如果该情绪类型不存在,添加默认值
complete_tags.append({
"emotion_type": emotion,
"emotion_type": self._translate_emotion_type(emotion, language),
"count": 0,
"percentage": 0.0,
"avg_intensity": 0.0
@@ -475,6 +510,7 @@ class EmotionAnalyticsService:
self,
end_user_id: str,
db: Session,
language: str = "zh",
) -> Dict[str, Any]:
"""生成个性化情绪建议
@@ -483,6 +519,7 @@ class EmotionAnalyticsService:
Args:
end_user_id: 宿主ID用户组ID
db: 数据库会话
language: 输出语言 ("zh" 中文, "en" 英文)
Returns:
Dict: 包含个性化建议的响应:
@@ -533,7 +570,7 @@ class EmotionAnalyticsService:
user_profile = await self._get_simple_user_profile(end_user_id)
# 6. 构建LLM prompt
prompt = await self._build_suggestion_prompt(health_data, patterns, user_profile)
prompt = await self._build_suggestion_prompt(health_data, patterns, user_profile, language)
# 7. 调用LLM生成建议使用配置中的LLM
if llm_client is None:
@@ -554,12 +591,12 @@ class EmotionAnalyticsService:
except Exception as e:
logger.error(f"LLM 结构化输出失败: {str(e)}")
# 返回默认建议
suggestions_response = self._get_default_suggestions(health_data)
suggestions_response = self._get_default_suggestions(health_data, language)
# 8. 验证建议数量3-5条
if len(suggestions_response.suggestions) < 3:
logger.warning(f"建议数量不足: {len(suggestions_response.suggestions)}")
suggestions_response = self._get_default_suggestions(health_data)
suggestions_response = self._get_default_suggestions(health_data, language)
elif len(suggestions_response.suggestions) > 5:
logger.warning(f"建议数量过多: {len(suggestions_response.suggestions)}")
suggestions_response.suggestions = suggestions_response.suggestions[:5]
@@ -624,7 +661,8 @@ class EmotionAnalyticsService:
self,
health_data: Dict[str, Any],
patterns: Dict[str, Any],
user_profile: Dict[str, Any]
user_profile: Dict[str, Any],
language: str = "zh"
) -> str:
"""构建情绪建议生成的prompt
@@ -632,6 +670,7 @@ class EmotionAnalyticsService:
health_data: 情绪健康数据
patterns: 情绪模式分析结果
user_profile: 用户画像数据
language: 输出语言 ("zh" 中文, "en" 英文)
Returns:
str: LLM prompt
@@ -643,66 +682,114 @@ class EmotionAnalyticsService:
prompt = await render_emotion_suggestions_prompt(
health_data=health_data,
patterns=patterns,
user_profile=user_profile
user_profile=user_profile,
language=language
)
return prompt
def _get_default_suggestions(self, health_data: Dict[str, Any]) -> EmotionSuggestionsResponse:
def _get_default_suggestions(self, health_data: Dict[str, Any], language: str = "zh") -> EmotionSuggestionsResponse:
"""获取默认建议当LLM调用失败时使用
Args:
health_data: 情绪健康数据
language: 输出语言 ("zh" 中文, "en" 英文)
Returns:
EmotionSuggestionsResponse: 默认建议
"""
health_score = health_data.get('health_score', 0)
if health_score >= 80:
summary = "您的情绪健康状况优秀,请继续保持积极的生活态度。"
elif health_score >= 60:
summary = "您的情绪健康状况良好,可以通过一些调整进一步提升。"
elif health_score >= 40:
summary = "您的情绪健康需要关注,建议采取一些改善措施。"
else:
summary = "您的情绪健康需要重点关注,建议寻求专业帮助。"
if language == "en":
if health_score >= 80:
summary = "Your emotional health is excellent. Keep up the positive attitude."
elif health_score >= 60:
summary = "Your emotional health is good. Some adjustments can further improve it."
elif health_score >= 40:
summary = "Your emotional health needs attention. Consider taking improvement measures."
else:
summary = "Your emotional health needs serious attention. Consider seeking professional help."
suggestions = [
EmotionSuggestion(
type="emotion_balance",
title="保持情绪平衡",
content="通过正念冥想和深呼吸练习,帮助您更好地管理情绪波动,提升情绪稳定性。",
priority="high",
actionable_steps=[
"每天早晨进行5-10分钟的正念冥想",
"感到情绪波动时进行3次深呼吸",
"记录每天的情绪变化,识别触发因素"
]
),
EmotionSuggestion(
type="activity_recommendation",
title="增加户外活动",
content="适度的户外运动可以有效改善情绪增强身心健康。建议每周进行3-4次户外活动。",
priority="medium",
actionable_steps=[
"每周安排2-330分钟的散步",
"周末尝试户外运动如骑行或爬山",
"在户外活动时关注周围环境,放松心情"
]
),
EmotionSuggestion(
type="social_connection",
title="加强社交联系",
content="与朋友和家人保持良好的社交联系,可以提供情感支持,改善情绪健康。",
priority="medium",
actionable_steps=[
"每周至少与一位朋友或家人深入交流",
"参加感兴趣的社交活动或兴趣小组",
"主动分享自己的感受和想法"
]
)
]
suggestions = [
EmotionSuggestion(
type="Emotion Balance",
title="Maintain Emotional Balance",
content="Through mindfulness meditation and deep breathing exercises, help you better manage emotional fluctuations and improve emotional stability.",
priority="High",
actionable_steps=[
"Practice 5-10 minutes of mindfulness meditation every morning",
"Take 3 deep breaths when feeling emotional fluctuations",
"Record daily emotional changes to identify triggers"
]
),
EmotionSuggestion(
type="Activity Recommendation",
title="Increase Outdoor Activities",
content="Moderate outdoor exercise can effectively improve mood and enhance physical and mental health. Recommend 3-4 outdoor activities per week.",
priority="Medium",
actionable_steps=[
"Schedule 2-3 30-minute walks per week",
"Try outdoor sports like cycling or hiking on weekends",
"Focus on surroundings and relax during outdoor activities"
]
),
EmotionSuggestion(
type="Social Connection",
title="Strengthen Social Connections",
content="Maintaining good social connections with friends and family can provide emotional support and improve emotional health.",
priority="Medium",
actionable_steps=[
"Have a deep conversation with at least one friend or family member weekly",
"Join social activities or interest groups you enjoy",
"Actively share your feelings and thoughts"
]
)
]
else:
if health_score >= 80:
summary = "您的情绪健康状况优秀,请继续保持积极的生活态度。"
elif health_score >= 60:
summary = "您的情绪健康状况良好,可以通过一些调整进一步提升。"
elif health_score >= 40:
summary = "您的情绪健康需要关注,建议采取一些改善措施。"
else:
summary = "您的情绪健康需要重点关注,建议寻求专业帮助。"
suggestions = [
EmotionSuggestion(
type="情绪平衡",
title="保持情绪平衡",
content="通过正念冥想和深呼吸练习,帮助您更好地管理情绪波动,提升情绪稳定性。",
priority="",
actionable_steps=[
"每天早晨进行5-10分钟的正念冥想",
"感到情绪波动时进行3次深呼吸",
"记录每天的情绪变化,识别触发因素"
]
),
EmotionSuggestion(
type="活动建议",
title="增加户外活动",
content="适度的户外运动可以有效改善情绪增强身心健康。建议每周进行3-4次户外活动。",
priority="",
actionable_steps=[
"每周安排2-3次30分钟的散步",
"周末尝试户外运动如骑行或爬山",
"在户外活动时关注周围环境,放松心情"
]
),
EmotionSuggestion(
type="社交联系",
title="加强社交联系",
content="与朋友和家人保持良好的社交联系,可以提供情感支持,改善情绪健康。",
priority="",
actionable_steps=[
"每周至少与一位朋友或家人深入交流",
"参加感兴趣的社交活动或兴趣小组",
"主动分享自己的感受和想法"
]
)
]
return EmotionSuggestionsResponse(
health_summary=summary,

View File

@@ -37,7 +37,6 @@ from app.repositories.memory_short_repository import ShortTermMemoryRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_agent_schema import Write_UserInput
from app.schemas.memory_config_schema import ConfigurationError
from app.services.memory_base_service import Translation_English
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_konwledges_server import (
write_rag,
@@ -265,7 +264,7 @@ class MemoryAgentService:
logger.info("Log streaming completed, cleaning up resources")
# LogStreamer uses context manager for file handling, so cleanup is automatic
async def write_memory(self, end_user_id: str, messages: list[dict], config_id: Optional[uuid.UUID]|int, db: Session, storage_type: str, user_rag_memory_id: str) -> str:
async def write_memory(self, end_user_id: str, messages: list[dict], config_id: Optional[uuid.UUID]|int, db: Session, storage_type: str, user_rag_memory_id: str, language: str = "zh") -> str:
"""
Process write operation with config_id
@@ -276,6 +275,7 @@ class MemoryAgentService:
db: SQLAlchemy database session
storage_type: Storage type (neo4j or rag)
user_rag_memory_id: User RAG memory ID
language: 语言类型 ("zh" 中文, "en" 英文)
Returns:
Write operation result status
@@ -341,7 +341,8 @@ class MemoryAgentService:
initial_state = {
"messages": langchain_messages,
"end_user_id": end_user_id,
"memory_config": memory_config
"memory_config": memory_config,
"language": language
}
# 获取节点更新信息
@@ -896,9 +897,7 @@ class MemoryAgentService:
async def get_hot_memory_tags_by_user(
self,
end_user_id: Optional[str] = None,
limit: int = 20,
model_id: Optional[str] = None,
language_type: Optional[str] = "zh"
limit: int = 20
) -> List[Dict[str, Any]]:
"""
获取指定用户的热门记忆标签
@@ -912,17 +911,15 @@ class MemoryAgentService:
{"name": "标签名", "frequency": 频次},
...
]
注意:标签语言由写入时的 X-Language-Type 决定,查询时不进行翻译
"""
try:
# by_user=False 表示按 end_user_id 查询在Neo4j中end_user_id就是用户维度
tags = await get_hot_memory_tags(end_user_id, limit=limit, by_user=False)
payload=[]
payload = []
for tag, freq in tags:
if language_type!="zh":
tag=await Translation_English(model_id, tag)
payload.append({"name": tag, "frequency": freq})
else:
payload.append({"name": tag, "frequency": freq})
payload.append({"name": tag, "frequency": freq})
return payload
except Exception as e:
logger.error(f"热门记忆标签查询失败: {e}")

View File

@@ -16,7 +16,6 @@ import json
from datetime import datetime
from app.schemas.memory_episodic_schema import EmotionType
from app.services.memory_base_service import Translation_English
logger = logging.getLogger(__name__)
@@ -374,119 +373,6 @@ class MemoryEntityService:
logger.warning(f"转换时间格式失败: {e}, 原始值: {dt}")
return str(dt) if dt is not None else None
async def _translate_list(
self,
data_list: List[Dict[str, Any]],
model_id: str,
fields: List[str]
) -> List[Dict[str, Any]]:
"""
翻译列表中每个字典的指定字段(并发有限度以降低整体延迟)
Args:
data_list: 要翻译的字典列表
model_id: 模型ID
fields: 需要翻译的字段列表
Returns:
翻译后的字典列表
"""
# 空列表或无字段时直接返回
if not data_list or not fields:
return data_list
import asyncio
# 并发限制,避免一次性发起过多请求
# 可根据实际情况调整(建议 5-10
concurrency_limit = 5
semaphore = asyncio.Semaphore(concurrency_limit)
async def translate_single_field(
index: int,
field: str,
value: Any,
) -> Optional[tuple]:
"""
翻译单个字段并返回 (索引, 字段名, 翻译结果)
Returns:
(index, field, translated_value) 或 None如果跳过
"""
# 跳过空值
if value is None or value == "":
return None
# 统一转成字符串再翻译,防止非字符串类型导致错误
text = str(value)
try:
async with semaphore:
# 调用 Translation_English 进行翻译
# 注意Translation_English 的参数顺序是 (model_id, text)
translated = await Translation_English(model_id, text)
# 如果翻译结果为空,保留原值
if translated is None or translated == "":
return None
return index, field, translated
except Exception as e:
logger.warning(f"翻译字段 {field} (索引 {index}) 失败: {e}")
return None
# 构造所有需要翻译的任务
tasks = []
for idx, item in enumerate(data_list):
# 防御性检查:确保 item 是字典
if not isinstance(item, dict):
continue
for field in fields:
if field not in item:
continue
value = item.get(field)
# 对于 None 或空字符串的值,直接跳过,不创建任务
if value is None or value == "":
continue
tasks.append(
asyncio.create_task(
translate_single_field(idx, field, value)
)
)
# 如果没有需要翻译的任务,直接返回原列表
if not tasks:
return data_list
# 使用 gather 并发执行翻译任务(受 semaphore 限制)
# return_exceptions=True 可以防止单个任务失败导致整体失败
results = await asyncio.gather(*tasks, return_exceptions=True)
# 创建深拷贝以避免修改原始数据
translated_list = [item.copy() if isinstance(item, dict) else item for item in data_list]
# 将翻译结果回填到列表
for result in results:
# 跳过 None 结果和异常
if result is None or isinstance(result, Exception):
if isinstance(result, Exception):
logger.warning(f"翻译任务异常: {result}")
continue
idx, field, translated = result
# 防御性检查索引范围
if 0 <= idx < len(translated_list) and isinstance(translated_list[idx], dict):
translated_list[idx][field] = translated
return translated_list
async def close(self):
"""关闭数据库连接"""

View File

@@ -236,12 +236,13 @@ class DataConfigService: # 数据配置服务类PostgreSQL
return self._convert_timestamps_to_format(data_list)
async def pilot_run_stream(self, payload: ConfigPilotRun) -> AsyncGenerator[str, None]:
async def pilot_run_stream(self, payload: ConfigPilotRun, language: str = "zh") -> AsyncGenerator[str, None]:
"""
流式执行试运行,产生 SSE 格式的进度事件
Args:
payload: 试运行配置和对话文本
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Yields:
SSE 格式的字符串,包含以下事件类型:
@@ -315,6 +316,7 @@ class DataConfigService: # 数据配置服务类PostgreSQL
dialogue_text=dialogue_text,
db=self.db,
progress_callback=progress_callback,
language=language,
)
logger.info("[PILOT_RUN_STREAM] pipeline_main completed")

View File

@@ -0,0 +1,429 @@
"""
多模态文件处理服务
处理图片、文档等多模态文件,转换为 LLM 可用的格式
支持的 Provider:
- DashScope (通义千问): 支持 URL 格式
- Bedrock/Anthropic: 仅支持 base64 格式
- OpenAI: 支持 URL 和 base64 格式
"""
import uuid
from typing import List, Dict, Any, Optional, Protocol
from sqlalchemy.orm import Session
from app.core.logging_config import get_business_logger
from app.core.exceptions import BusinessException
from app.core.error_codes import BizCode
from app.schemas.app_schema import FileInput, FileType, TransferMethod
from app.models.generic_file_model import GenericFile
logger = get_business_logger()
class ImageFormatStrategy(Protocol):
"""图片格式策略接口"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""将图片 URL 转换为特定 provider 的格式"""
...
class DashScopeImageStrategy:
"""通义千问图片格式策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""通义千问格式: {"type": "image", "image": "url"}"""
return {
"type": "image",
"image": url
}
class BedrockImageStrategy:
"""Bedrock/Anthropic 图片格式策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""
Bedrock/Anthropic 格式: base64 编码
{"type": "image", "source": {"type": "base64", "media_type": "...", "data": "..."}}
"""
import httpx
import base64
from mimetypes import guess_type
logger.info(f"下载并编码图片: {url}")
# 下载图片
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
response.raise_for_status()
# 获取图片数据
image_data = response.content
# 确定 media type
content_type = response.headers.get("content-type")
if content_type and content_type.startswith("image/"):
media_type = content_type
else:
guessed_type, _ = guess_type(url)
media_type = guessed_type if guessed_type and guessed_type.startswith("image/") else "image/jpeg"
# 转换为 base64
base64_data = base64.b64encode(image_data).decode("utf-8")
logger.info(f"图片编码完成: media_type={media_type}, size={len(base64_data)}")
return {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": base64_data
}
}
class OpenAIImageStrategy:
"""OpenAI 图片格式策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""OpenAI 格式: {"type": "image_url", "image_url": {"url": "..."}}"""
return {
"type": "image_url",
"image_url": {
"url": url
}
}
# Provider 到策略的映射
PROVIDER_STRATEGIES = {
"dashscope": DashScopeImageStrategy,
"bedrock": BedrockImageStrategy,
"anthropic": BedrockImageStrategy,
"openai": OpenAIImageStrategy,
}
class MultimodalService:
"""多模态文件处理服务"""
def __init__(self, db: Session, provider: str = "dashscope"):
"""
初始化多模态服务
Args:
db: 数据库会话
provider: 模型提供商dashscope, bedrock, anthropic 等)
"""
self.db = db
self.provider = provider.lower()
async def process_files(
self,
files: Optional[List[FileInput]]
) -> List[Dict[str, Any]]:
"""
处理文件列表,返回 LLM 可用的格式
Args:
files: 文件输入列表
Returns:
List[Dict]: LLM 可用的内容格式列表(根据 provider 返回不同格式)
"""
if not files:
return []
result = []
for idx, file in enumerate(files):
try:
if file.type == FileType.IMAGE:
content = await self._process_image(file)
result.append(content)
elif file.type == FileType.DOCUMENT:
content = await self._process_document(file)
result.append(content)
elif file.type == FileType.AUDIO:
content = await self._process_audio(file)
result.append(content)
elif file.type == FileType.VIDEO:
content = await self._process_video(file)
result.append(content)
else:
logger.warning(f"不支持的文件类型: {file.type}")
except Exception as e:
logger.error(
f"处理文件失败",
extra={
"file_index": idx,
"file_type": file.type,
"error": str(e)
}
)
# 继续处理其他文件,不中断整个流程
result.append({
"type": "text",
"text": f"[文件处理失败: {str(e)}]"
})
logger.info(f"成功处理 {len(result)}/{len(files)} 个文件provider={self.provider}")
return result
async def _process_image(self, file: FileInput) -> Dict[str, Any]:
"""
处理图片文件
Args:
file: 图片文件输入
Returns:
Dict: 根据 provider 返回不同格式
- Anthropic/Bedrock: {"type": "image", "source": {"type": "base64", "media_type": "...", "data": "..."}}
- 通义千问: {"type": "image", "image": "url"}
"""
if file.transfer_method == TransferMethod.REMOTE_URL:
url = file.url
else:
# 本地文件,获取访问 URL
url = await self._get_file_url(file.upload_file_id)
logger.debug(f"处理图片: {url}, provider={self.provider}")
# 根据 provider 返回不同格式
if self.provider in ["bedrock", "anthropic"]:
# Anthropic/Bedrock 只支持 base64 格式,需要下载并转换
try:
logger.info(f"开始下载并编码图片: {url}")
base64_data, media_type = await self._download_and_encode_image(url)
result = {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": base64_data[:100] + "..." # 只记录前100个字符
}
}
logger.info(f"图片编码完成: media_type={media_type}, data_length={len(base64_data)}")
# 返回完整数据
result["source"]["data"] = base64_data
return result
except Exception as e:
logger.error(f"下载并编码图片失败: {e}", exc_info=True)
# 返回错误提示
return {
"type": "text",
"text": f"[图片加载失败: {str(e)}]"
}
else:
# 通义千问等其他格式支持 URL
return {
"type": "image",
"image": url
}
async def _download_and_encode_image(self, url: str) -> tuple[str, str]:
"""
下载图片并转换为 base64
Args:
url: 图片 URL
Returns:
tuple: (base64_data, media_type)
"""
import httpx
import base64
from mimetypes import guess_type
# 下载图片
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
response.raise_for_status()
# 获取图片数据
image_data = response.content
# 确定 media type
content_type = response.headers.get("content-type")
if content_type and content_type.startswith("image/"):
media_type = content_type
else:
# 从 URL 推断
guessed_type, _ = guess_type(url)
media_type = guessed_type if guessed_type and guessed_type.startswith("image/") else "image/jpeg"
# 转换为 base64
base64_data = base64.b64encode(image_data).decode("utf-8")
logger.debug(f"图片编码完成: media_type={media_type}, size={len(base64_data)}")
return base64_data, media_type
async def _process_document(self, file: FileInput) -> Dict[str, Any]:
"""
处理文档文件PDF、Word 等)
Args:
file: 文档文件输入
Returns:
Dict: text 格式的内容(包含提取的文本)
"""
if file.transfer_method == TransferMethod.REMOTE_URL:
# 远程文档暂不支持提取
return {
"type": "text",
"text": f"<document url=\"{file.url}\">\n[远程文档,暂不支持内容提取]\n</document>"
}
else:
# 本地文件,提取文本内容
text = await self._extract_document_text(file.upload_file_id)
generic_file = self.db.query(GenericFile).filter(
GenericFile.id == file.upload_file_id
).first()
file_name = generic_file.file_name if generic_file else "unknown"
return {
"type": "text",
"text": f"<document name=\"{file_name}\">\n{text}\n</document>"
}
async def _process_audio(self, file: FileInput) -> Dict[str, Any]:
"""
处理音频文件
Args:
file: 音频文件输入
Returns:
Dict: 音频内容(暂时返回占位符)
"""
# TODO: 实现音频转文字功能
return {
"type": "text",
"text": "[音频文件,暂不支持处理]"
}
async def _process_video(self, file: FileInput) -> Dict[str, Any]:
"""
处理视频文件
Args:
file: 视频文件输入
Returns:
Dict: 视频内容(暂时返回占位符)
"""
# TODO: 实现视频处理功能
return {
"type": "text",
"text": "[视频文件,暂不支持处理]"
}
async def _get_file_url(self, file_id: uuid.UUID) -> str:
"""
获取文件的访问 URL
Args:
file_id: 文件ID
Returns:
str: 文件访问 URL
Raises:
BusinessException: 文件不存在
"""
generic_file = self.db.query(GenericFile).filter(
GenericFile.id == file_id,
GenericFile.status == "active"
).first()
if not generic_file:
raise BusinessException(
f"文件不存在或已删除: {file_id}",
BizCode.NOT_FOUND
)
# 如果有 access_url直接返回
if generic_file.access_url:
return generic_file.access_url
# 否则,根据 storage_path 生成 URL
# TODO: 根据实际存储方式生成 URL本地存储、OSS 等)
# 这里暂时返回一个占位 URL
return f"/api/files/{file_id}/download"
async def _extract_document_text(self, file_id: uuid.UUID) -> str:
"""
提取文档文本内容
Args:
file_id: 文件ID
Returns:
str: 提取的文本内容
"""
generic_file = self.db.query(GenericFile).filter(
GenericFile.id == file_id,
GenericFile.status == "active"
).first()
if not generic_file:
raise BusinessException(
f"文件不存在或已删除: {file_id}",
BizCode.NOT_FOUND
)
# TODO: 根据文件类型提取文本
# - PDF: 使用 PyPDF2 或 pdfplumber
# - Word: 使用 python-docx
# - TXT/MD: 直接读取
file_ext = generic_file.file_ext.lower()
if file_ext in ['.txt', '.md', '.markdown']:
return await self._read_text_file(generic_file.storage_path)
elif file_ext == '.pdf':
return await self._extract_pdf_text(generic_file.storage_path)
elif file_ext in ['.doc', '.docx']:
return await self._extract_word_text(generic_file.storage_path)
else:
return f"[不支持的文档格式: {file_ext}]"
async def _read_text_file(self, storage_path: str) -> str:
"""读取纯文本文件"""
try:
with open(storage_path, 'r', encoding='utf-8') as f:
return f.read()
except Exception as e:
logger.error(f"读取文本文件失败: {e}")
return f"[文件读取失败: {str(e)}]"
async def _extract_pdf_text(self, storage_path: str) -> str:
"""提取 PDF 文本"""
try:
# TODO: 实现 PDF 文本提取
# import PyPDF2 或 pdfplumber
return "[PDF 文本提取功能待实现]"
except Exception as e:
logger.error(f"提取 PDF 文本失败: {e}")
return f"[PDF 提取失败: {str(e)}]"
async def _extract_word_text(self, storage_path: str) -> str:
"""提取 Word 文档文本"""
try:
# TODO: 实现 Word 文本提取
# import docx
return "[Word 文本提取功能待实现]"
except Exception as e:
logger.error(f"提取 Word 文本失败: {e}")
return f"[Word 提取失败: {str(e)}]"
def get_multimodal_service(db: Session) -> MultimodalService:
"""获取多模态服务实例(依赖注入)"""
return MultimodalService(db)

View File

@@ -78,7 +78,8 @@ class OntologyService:
scenario: str,
domain: Optional[str] = None,
scene_id: Optional[Any] = None,
workspace_id: Optional[Any] = None
workspace_id: Optional[Any] = None,
language: str = "zh"
) -> OntologyExtractionResponse:
"""执行本体提取
@@ -91,6 +92,7 @@ class OntologyService:
domain: 可选的领域提示
scene_id: 可选的场景ID,用于权限验证(不再用于自动保存)
workspace_id: 可选的工作空间ID,用于权限验证
language: 输出语言 ("zh" 中文, "en" 英文)
Returns:
OntologyExtractionResponse: 提取结果
@@ -155,6 +157,7 @@ class OntologyService:
llm_max_tokens=self.DEFAULT_LLM_MAX_TOKENS,
max_description_length=self.DEFAULT_MAX_DESCRIPTION_LENGTH,
timeout=self.DEFAULT_LLM_TIMEOUT,
language=language,
)
extraction_duration = time.time() - extraction_start_time

View File

@@ -36,6 +36,7 @@ async def run_pilot_extraction(
dialogue_text: str,
db: Session,
progress_callback: Optional[Callable[[str, str, Optional[dict]], Awaitable[None]]] = None,
language: str = "zh",
) -> None:
"""
执行试运行模式的知识提取流水线。
@@ -43,10 +44,12 @@ async def run_pilot_extraction(
Args:
memory_config: 从数据库加载的内存配置对象
dialogue_text: 输入的对话文本
db: 数据库会话
progress_callback: 可选的进度回调函数
- 参数1 (stage): 当前处理阶段标识符
- 参数2 (message): 人类可读的进度消息
- 参数3 (data): 可选的附加数据字典
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
"""
log_file = "logs/time.log"
os.makedirs(os.path.dirname(log_file), exist_ok=True)
@@ -146,6 +149,7 @@ async def run_pilot_extraction(
config=config,
progress_callback=progress_callback,
embedding_id=str(memory_config.embedding_model_id),
language=language,
)
log_time("Orchestrator Initialization", time.time() - step_start, log_file)
@@ -191,6 +195,7 @@ async def run_pilot_extraction(
chunked_dialogs,
llm_client=llm_client,
embedder_client=embedder_client,
language=language,
)
log_time("Memory Summary Generation", time.time() - step_start, log_file)

View File

@@ -18,7 +18,7 @@ from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_episodic_schema import EmotionSubject, EmotionType, type_mapping
from app.services.implicit_memory_service import ImplicitMemoryService
from app.services.memory_base_service import MemoryBaseService, MemoryTransService, Translation_English
from app.services.memory_base_service import MemoryBaseService, MemoryTransService
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_perceptual_service import MemoryPerceptualService
from app.services.memory_short_service import ShortService
@@ -455,9 +455,7 @@ class UserMemoryService:
async def get_cached_memory_insight(
self,
db: Session,
end_user_id: str,
model_id: str,
language_type: str
end_user_id: str
) -> Dict[str, Any]:
"""
从数据库获取缓存的记忆洞察(四个维度)
@@ -465,7 +463,7 @@ class UserMemoryService:
Args:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
Returns:
{
"memory_insight": str, # 总体概述
@@ -519,10 +517,6 @@ class UserMemoryService:
memory_insight=end_user.memory_insight
behavior_pattern=end_user.behavior_pattern
growth_trajectory=end_user.growth_trajectory
if language_type!='zh':
memory_insight=await Translation_English(model_id,memory_insight)
behavior_pattern=await Translation_English(model_id,behavior_pattern)
growth_trajectory=await Translation_English(model_id,growth_trajectory)
return {
"memory_insight":memory_insight, # 总体概述存储在 memory_insight
"behavior_pattern":behavior_pattern,
@@ -571,6 +565,8 @@ class UserMemoryService:
Args:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
model_id: 模型ID用于翻译
language_type: 语言类型 ("zh" 中文, "en" 英文)
Returns:
{
@@ -604,11 +600,10 @@ class UserMemoryService:
personality_traits=end_user.personality_traits
core_values=end_user.core_values
one_sentence_summary=end_user.one_sentence_summary
if language_type!='zh':
user_summary=await Translation_English(model_id, user_summary)
personality_traits = await Translation_English(model_id, personality_traits)
core_values = await Translation_English(model_id, core_values)
one_sentence_summary = await Translation_English(model_id, one_sentence_summary)
# 直接返回数据库中的数据,不进行二次翻译
# 语言由生成时的 X-Language-Type 决定
has_cache = any([
user_summary,
personality_traits,
@@ -658,7 +653,8 @@ class UserMemoryService:
self,
db: Session,
end_user_id: str,
workspace_id: Optional[uuid.UUID] = None
workspace_id: Optional[uuid.UUID] = None,
language: str = "zh"
) -> Dict[str, Any]:
"""
生成并缓存记忆洞察
@@ -667,6 +663,7 @@ class UserMemoryService:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
workspace_id: 工作空间ID (可选)
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
{
@@ -679,7 +676,7 @@ class UserMemoryService:
}
"""
try:
logger.info(f"开始为 end_user_id {end_user_id} 生成记忆洞察")
logger.info(f"开始为 end_user_id {end_user_id} 生成记忆洞察, language={language}")
# 转换为UUID并查询用户
user_uuid = uuid.UUID(end_user_id)
@@ -700,7 +697,7 @@ class UserMemoryService:
# 使用 end_user_id 调用分析函数
try:
logger.info(f"使用 end_user_id={end_user_id} 生成记忆洞察")
result = await analytics_memory_insight_report(end_user_id)
result = await analytics_memory_insight_report(end_user_id, language=language)
memory_insight = result.get("memory_insight", "")
behavior_pattern = result.get("behavior_pattern", "")
@@ -789,7 +786,8 @@ class UserMemoryService:
self,
db: Session,
end_user_id: str,
workspace_id: Optional[uuid.UUID] = None
workspace_id: Optional[uuid.UUID] = None,
language: str = "zh"
) -> Dict[str, Any]:
"""
生成并缓存用户摘要(四个部分)
@@ -798,6 +796,7 @@ class UserMemoryService:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
workspace_id: 工作空间ID (可选)
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
{
@@ -810,7 +809,7 @@ class UserMemoryService:
}
"""
try:
logger.info(f"开始为 end_user_id {end_user_id} 生成用户摘要")
logger.info(f"开始为 end_user_id {end_user_id} 生成用户摘要, language={language}")
# 转换为UUID并查询用户
user_uuid = uuid.UUID(end_user_id)
@@ -831,7 +830,7 @@ class UserMemoryService:
# 使用 end_user_id 调用分析函数
try:
logger.info(f"使用 end_user_id={end_user_id} 生成用户摘要")
result = await analytics_user_summary(end_user_id)
result = await analytics_user_summary(end_user_id, language=language)
user_summary = result.get("user_summary", "")
personality = result.get("personality", "")
@@ -915,7 +914,8 @@ class UserMemoryService:
async def generate_cache_for_workspace(
self,
db: Session,
workspace_id: uuid.UUID
workspace_id: uuid.UUID,
language: str = "zh"
) -> Dict[str, Any]:
"""
为整个工作空间生成缓存
@@ -923,6 +923,7 @@ class UserMemoryService:
Args:
db: 数据库会话
workspace_id: 工作空间ID
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
{
@@ -932,7 +933,7 @@ class UserMemoryService:
"errors": List[Dict]
}
"""
logger.info(f"开始为工作空间 {workspace_id} 批量生成缓存")
logger.info(f"开始为工作空间 {workspace_id} 批量生成缓存, language={language}")
total_users = 0
successful = 0
@@ -953,10 +954,10 @@ class UserMemoryService:
try:
# 生成记忆洞察
insight_result = await self.generate_and_cache_insight(db, end_user_id)
insight_result = await self.generate_and_cache_insight(db, end_user_id, language=language)
# 生成用户摘要
summary_result = await self.generate_and_cache_summary(db, end_user_id)
summary_result = await self.generate_and_cache_summary(db, end_user_id, language=language)
# 检查是否都成功
if insight_result["success"] and summary_result["success"]:
@@ -1007,7 +1008,7 @@ class UserMemoryService:
# 独立的分析函数
async def analytics_memory_insight_report(end_user_id: Optional[str] = None) -> Dict[str, Any]:
async def analytics_memory_insight_report(end_user_id: Optional[str] = None, language: str = "zh") -> Dict[str, Any]:
"""
生成记忆洞察报告(四个维度)
@@ -1019,6 +1020,7 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
Args:
end_user_id: 可选的终端用户ID
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
包含四个维度报告的字典: {
@@ -1029,8 +1031,12 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
}
"""
from app.core.memory.utils.prompt.prompt_utils import render_memory_insight_prompt
from app.core.language_utils import validate_language
import re
# 验证语言参数
language = validate_language(language)
insight = MemoryInsightHelper(end_user_id)
try:
@@ -1070,7 +1076,8 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
user_prompt = await render_memory_insight_prompt(
domain_distribution=domain_distribution_str,
active_periods=active_periods_str,
social_connections=social_connections_str
social_connections=social_connections_str,
language=language
)
messages = [
@@ -1097,11 +1104,11 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
full_response = str(content) if content is not None else ""
# 7. 解析四个部分
# 使用正则表达式提取四个部分
memory_insight_match = re.search(r'【总体概述】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
behavior_match = re.search(r'【行为模式】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
findings_match = re.search(r'【关键发现】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
trajectory_match = re.search(r'【成长轨迹】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
# 使用正则表达式提取四个部分(支持中英文双语标题)
memory_insight_match = re.search(r'(?:总体概述|Overview)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
behavior_match = re.search(r'(?:行为模式|Behavior Pattern)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
findings_match = re.search(r'(?:关键发现|Key Findings)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
trajectory_match = re.search(r'(?:成长轨迹|Growth Trajectory)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
memory_insight = memory_insight_match.group(1).strip() if memory_insight_match else ""
behavior_pattern = behavior_match.group(1).strip() if behavior_match else ""
@@ -1128,7 +1135,7 @@ async def analytics_memory_insight_report(end_user_id: Optional[str] = None) ->
await insight.close()
async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str, Any]:
async def analytics_user_summary(end_user_id: Optional[str] = None, language: str = "zh") -> Dict[str, Any]:
"""
生成用户摘要(包含四个部分)
@@ -1139,6 +1146,7 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str,
Args:
end_user_id: 可选的终端用户ID
language: 语言类型 ("zh" 中文, "en" 英文),默认中文
Returns:
包含四部分摘要的字典: {
@@ -1149,8 +1157,12 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str,
}
"""
from app.core.memory.utils.prompt.prompt_utils import render_user_summary_prompt
from app.core.language_utils import validate_language
import re
# 验证语言参数
language = validate_language(language)
# 创建 UserSummaryHelper 实例
user_summary_tool = UserSummaryHelper(end_user_id or os.getenv("SELECTED_end_user_id", "group_123"))
@@ -1165,8 +1177,9 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str,
# 2) 使用 prompt_utils 渲染提示词
user_prompt = await render_user_summary_prompt(
user_id=user_summary_tool.user_id,
entities=", ".join(entity_lines) if entity_lines else "(空)",
statements=" | ".join(statement_samples) if statement_samples else "(空)"
entities=", ".join(entity_lines) if entity_lines else "(空)" if language == "zh" else "(empty)",
statements=" | ".join(statement_samples) if statement_samples else "(空)" if language == "zh" else "(empty)",
language=language
)
messages = [
@@ -1193,11 +1206,11 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str,
full_response = str(content) if content is not None else ""
# 5) 解析四个部分
# 使用正则表达式提取四个部分
user_summary_match = re.search(r'【基本介绍】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
personality_match = re.search(r'【性格特点】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
core_values_match = re.search(r'【核心价值观】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
one_sentence_match = re.search(r'【一句话总结】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
# 使用正则表达式提取四个部分(支持中英文标题)
user_summary_match = re.search(r'(?:基本介绍|Basic Introduction)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
personality_match = re.search(r'(?:性格特点|Personality Traits)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
core_values_match = re.search(r'(?:核心价值观|Core Values)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
one_sentence_match = re.search(r'(?:一句话总结|One-Sentence Summary)\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
user_summary = user_summary_match.group(1).strip() if user_summary_match else ""
personality = personality_match.group(1).strip() if personality_match else ""