Merge branch 'develop' of github.com:SuanmoSuanyangTechnology/MemoryBear into develop

This commit is contained in:
yujiangping
2026-03-04 15:03:04 +08:00
20 changed files with 534 additions and 162 deletions

View File

@@ -3,9 +3,10 @@ Cache 缓存模块
提供各种缓存功能的统一入口 提供各种缓存功能的统一入口
""" """
from .memory import EmotionMemoryCache, ImplicitMemoryCache from .memory import EmotionMemoryCache, ImplicitMemoryCache, InterestMemoryCache
__all__ = [ __all__ = [
"EmotionMemoryCache", "EmotionMemoryCache",
"ImplicitMemoryCache", "ImplicitMemoryCache",
"InterestMemoryCache",
] ]

View File

@@ -5,8 +5,10 @@ Memory 缓存模块
""" """
from .emotion_memory import EmotionMemoryCache from .emotion_memory import EmotionMemoryCache
from .implicit_memory import ImplicitMemoryCache from .implicit_memory import ImplicitMemoryCache
from .interest_memory import InterestMemoryCache
__all__ = [ __all__ = [
"EmotionMemoryCache", "EmotionMemoryCache",
"ImplicitMemoryCache", "ImplicitMemoryCache",
"InterestMemoryCache",
] ]

122
api/app/cache/memory/interest_memory.py vendored Normal file
View File

@@ -0,0 +1,122 @@
"""
Interest Distribution Cache
兴趣分布缓存模块
用于缓存用户的兴趣分布标签数据,避免重复调用模型生成
"""
import json
import logging
from typing import Optional, List, Dict, Any
from datetime import datetime
from app.aioRedis import aio_redis
logger = logging.getLogger(__name__)
# 缓存过期时间24小时
INTEREST_CACHE_EXPIRE = 86400
class InterestMemoryCache:
"""兴趣分布缓存类"""
PREFIX = "cache:memory:interest_distribution"
@classmethod
def _get_key(cls, end_user_id: str, language: str) -> str:
"""生成 Redis key
Args:
end_user_id: 用户ID
language: 语言类型
Returns:
完整的 Redis key
"""
return f"{cls.PREFIX}:by_user:{end_user_id}:{language}"
@classmethod
async def set_interest_distribution(
cls,
end_user_id: str,
language: str,
data: List[Dict[str, Any]],
expire: int = INTEREST_CACHE_EXPIRE,
) -> bool:
"""设置用户兴趣分布缓存
Args:
end_user_id: 用户ID
language: 语言类型
data: 兴趣分布列表,格式 [{"name": "...", "frequency": ...}, ...]
expire: 过期时间默认24小时
Returns:
是否设置成功
"""
try:
key = cls._get_key(end_user_id, language)
payload = {
"data": data,
"generated_at": datetime.now().isoformat(),
"cached": True,
}
value = json.dumps(payload, ensure_ascii=False)
await aio_redis.set(key, value, ex=expire)
logger.info(f"设置兴趣分布缓存成功: {key}, 过期时间: {expire}")
return True
except Exception as e:
logger.error(f"设置兴趣分布缓存失败: {e}", exc_info=True)
return False
@classmethod
async def get_interest_distribution(
cls,
end_user_id: str,
language: str,
) -> Optional[List[Dict[str, Any]]]:
"""获取用户兴趣分布缓存
Args:
end_user_id: 用户ID
language: 语言类型
Returns:
兴趣分布列表,缓存不存在或已过期返回 None
"""
try:
key = cls._get_key(end_user_id, language)
value = await aio_redis.get(key)
if value:
payload = json.loads(value)
logger.info(f"命中兴趣分布缓存: {key}")
return payload.get("data")
logger.info(f"兴趣分布缓存不存在或已过期: {key}")
return None
except Exception as e:
logger.error(f"获取兴趣分布缓存失败: {e}", exc_info=True)
return None
@classmethod
async def delete_interest_distribution(
cls,
end_user_id: str,
language: str,
) -> bool:
"""删除用户兴趣分布缓存
Args:
end_user_id: 用户ID
language: 语言类型
Returns:
是否删除成功
"""
try:
key = cls._get_key(end_user_id, language)
result = await aio_redis.delete(key)
logger.info(f"删除兴趣分布缓存: {key}, 结果: {result}")
return result > 0
except Exception as e:
logger.error(f"删除兴趣分布缓存失败: {e}", exc_info=True)
return False

View File

@@ -1,6 +1,7 @@
import os import os
import platform import platform
from datetime import timedelta from datetime import timedelta
from celery.schedules import crontab
from urllib.parse import quote from urllib.parse import quote
from celery import Celery from celery import Celery
@@ -90,11 +91,10 @@ celery_app.conf.update(
celery_app.autodiscover_tasks(['app']) celery_app.autodiscover_tasks(['app'])
# Celery Beat schedule for periodic tasks # Celery Beat schedule for periodic tasks
memory_increment_schedule = timedelta(hours=settings.MEMORY_INCREMENT_INTERVAL_HOURS) memory_increment_schedule = crontab(hour=settings.MEMORY_INCREMENT_HOUR, minute=settings.MEMORY_INCREMENT_MINUTE)
memory_cache_regeneration_schedule = timedelta(hours=settings.MEMORY_CACHE_REGENERATION_HOURS) memory_cache_regeneration_schedule = timedelta(hours=settings.MEMORY_CACHE_REGENERATION_HOURS)
# 这个30秒的设计不合理 workspace_reflection_schedule = timedelta(seconds=settings.WORKSPACE_REFLECTION_INTERVAL_SECONDS)
workspace_reflection_schedule = timedelta(seconds=30) # 每30秒运行一次settings.REFLECTION_INTERVAL_TIME forgetting_cycle_schedule = timedelta(hours=settings.FORGETTING_CYCLE_INTERVAL_HOURS)
forgetting_cycle_schedule = timedelta(hours=24) # 每24小时运行一次遗忘周期
#构建定时任务配置 #构建定时任务配置
beat_schedule_config = { beat_schedule_config = {

View File

@@ -1,5 +1,6 @@
from typing import List, Optional from typing import List, Optional
from app.cache.memory.interest_memory import InterestMemoryCache
from app.celery_app import celery_app from app.celery_app import celery_app
from app.core.error_codes import BizCode from app.core.error_codes import BizCode
from app.core.language_utils import get_language_from_header from app.core.language_utils import get_language_from_header
@@ -661,34 +662,56 @@ async def get_knowledge_type_stats_api(
return fail(BizCode.INTERNAL_ERROR, "获取知识库类型统计失败", str(e)) return fail(BizCode.INTERNAL_ERROR, "获取知识库类型统计失败", str(e))
@router.get("/analytics/hot_memory_tags/by_user", response_model=ApiResponse) @router.get("/analytics/interest_distribution/by_user", response_model=ApiResponse)
async def get_hot_memory_tags_by_user_api( async def get_interest_distribution_by_user_api(
end_user_id: Optional[str] = Query(None, description="用户ID可选"), end_user_id: str = Query(..., description="用户ID必填"),
limit: int = Query(20, description="返回标签数量限制"), limit: int = Query(5, le=5, description="返回兴趣标签数量限制最多5个"),
language_type: str = Header(default=None, alias="X-Language-Type"),
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
db: Session=Depends(get_db), db: Session = Depends(get_db),
): ):
""" """
获取指定用户的热门记忆标签 获取指定用户的兴趣分布标签
注意:标签语言由写入时的 X-Language-Type 决定,查询时不进行翻译 与热门标签不同,此接口专注于识别用户的兴趣活动(运动、爱好、学习、创作等),
过滤掉纯物品、工具、地点等不代表用户主动参与活动的名词。
返回格式: 返回格式:
[ [
{"name": "标签", "frequency": 频次}, {"name": "兴趣活动", "frequency": 频次},
... ...
] ]
""" """
api_logger.info(f"Hot memory tags by user requested: end_user_id={end_user_id}") language = get_language_from_header(language_type)
api_logger.info(f"Interest distribution by user requested: end_user_id={end_user_id}, language={language}")
try: try:
result = await memory_agent_service.get_hot_memory_tags_by_user( # 优先读取缓存
cached = await InterestMemoryCache.get_interest_distribution(
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit language=language,
) )
return success(data=result, msg="获取热门记忆标签成功") if cached is not None:
api_logger.info(f"Interest distribution cache hit: end_user_id={end_user_id}")
return success(data=cached, msg="获取兴趣分布标签成功")
# 缓存未命中,调用模型生成
result = await memory_agent_service.get_interest_distribution_by_user(
end_user_id=end_user_id,
limit=limit,
language=language
)
# 写入缓存24小时过期
await InterestMemoryCache.set_interest_distribution(
end_user_id=end_user_id,
language=language,
data=result,
)
return success(data=result, msg="获取兴趣分布标签成功")
except Exception as e: except Exception as e:
api_logger.error(f"Hot memory tags by user failed: {str(e)}") api_logger.error(f"Interest distribution by user failed: {str(e)}")
return fail(BizCode.INTERNAL_ERROR, "获取热门记忆标签失败", str(e)) return fail(BizCode.INTERNAL_ERROR, "获取兴趣分布标签失败", str(e))
@router.get("/analytics/user_profile", response_model=ApiResponse) @router.get("/analytics/user_profile", response_model=ApiResponse)

View File

@@ -1,9 +1,10 @@
import json import json
import os import os
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional from typing import Annotated, Any, Dict, Optional
from dotenv import load_dotenv from dotenv import load_dotenv
from pydantic import Field, TypeAdapter
load_dotenv() load_dotenv()
@@ -200,12 +201,25 @@ class Settings:
REFLECTION_INTERVAL_SECONDS: float = float(os.getenv("REFLECTION_INTERVAL_SECONDS", "300")) REFLECTION_INTERVAL_SECONDS: float = float(os.getenv("REFLECTION_INTERVAL_SECONDS", "300"))
HEALTH_CHECK_SECONDS: float = float(os.getenv("HEALTH_CHECK_SECONDS", "600")) HEALTH_CHECK_SECONDS: float = float(os.getenv("HEALTH_CHECK_SECONDS", "600"))
MEMORY_INCREMENT_INTERVAL_HOURS: float = float(os.getenv("MEMORY_INCREMENT_INTERVAL_HOURS", "24"))
REFLECTION_INTERVAL_TIME: Optional[str] = int(os.getenv("REFLECTION_INTERVAL_TIME", 30)) REFLECTION_INTERVAL_TIME: Optional[str] = int(os.getenv("REFLECTION_INTERVAL_TIME", 30))
# Memory Cache Regeneration Configuration # Memory Cache Regeneration Configuration
MEMORY_CACHE_REGENERATION_HOURS: int = int(os.getenv("MEMORY_CACHE_REGENERATION_HOURS", "24")) MEMORY_CACHE_REGENERATION_HOURS: int = int(os.getenv("MEMORY_CACHE_REGENERATION_HOURS", "24"))
# Celery Beat Schedule Configuration (定时任务执行频率)
MEMORY_INCREMENT_HOUR: int = TypeAdapter(
Annotated[int, Field(ge=0, le=23, description="cron hour [0, 23]")]
).validate_python(int(os.getenv("MEMORY_INCREMENT_HOUR", "2")))
MEMORY_INCREMENT_MINUTE: int = TypeAdapter(
Annotated[int, Field(ge=0, le=59, description="cron minute [0, 59]")]
).validate_python(int(os.getenv("MEMORY_INCREMENT_MINUTE", "0")))
WORKSPACE_REFLECTION_INTERVAL_SECONDS: int = TypeAdapter(
Annotated[int, Field(ge=1, description="reflection interval in seconds, must be >= 1")]
).validate_python(int(os.getenv("WORKSPACE_REFLECTION_INTERVAL_SECONDS", "30")))
FORGETTING_CYCLE_INTERVAL_HOURS: int = TypeAdapter(
Annotated[int, Field(ge=1, description="forgetting cycle interval in hours, must be >= 1")]
).validate_python(int(os.getenv("FORGETTING_CYCLE_INTERVAL_HOURS", "24")))
# Memory Module Configuration (internal) # Memory Module Configuration (internal)
MEMORY_OUTPUT_DIR: str = os.getenv("MEMORY_OUTPUT_DIR", "logs/memory-output") MEMORY_OUTPUT_DIR: str = os.getenv("MEMORY_OUTPUT_DIR", "logs/memory-output")
MEMORY_CONFIG_DIR: str = os.getenv("MEMORY_CONFIG_DIR", "app/core/memory") MEMORY_CONFIG_DIR: str = os.getenv("MEMORY_CONFIG_DIR", "app/core/memory")
@@ -230,7 +244,7 @@ class Settings:
# General Ontology Type Configuration # General Ontology Type Configuration
# ======================================================================== # ========================================================================
# 通用本体文件路径列表(逗号分隔) # 通用本体文件路径列表(逗号分隔)
GENERAL_ONTOLOGY_FILES: str = os.getenv("GENERAL_ONTOLOGY_FILES", "app/core/memory/ontology_services/General_purpose_entity.ttl") GENERAL_ONTOLOGY_FILES: str = os.getenv("GENERAL_ONTOLOGY_FILES", "api/app/core/memory/ontology_services/General_purpose_entity.ttl")
# 是否启用通用本体类型功能 # 是否启用通用本体类型功能
ENABLE_GENERAL_ONTOLOGY_TYPES: bool = os.getenv("ENABLE_GENERAL_ONTOLOGY_TYPES", "true").lower() == "true" ENABLE_GENERAL_ONTOLOGY_TYPES: bool = os.getenv("ENABLE_GENERAL_ONTOLOGY_TYPES", "true").lower() == "true"

View File

@@ -1,9 +1,12 @@
import asyncio import asyncio
import json import json
import logging
import os import os
from typing import List, Tuple from typing import List, Tuple
from app.core.config import settings from app.core.config import settings
logger = logging.getLogger(__name__)
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context from app.db import get_db_context
from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.repositories.neo4j.neo4j_connector import Neo4jConnector
@@ -16,6 +19,10 @@ class FilteredTags(BaseModel):
"""用于接收LLM筛选后的核心标签列表的模型。""" """用于接收LLM筛选后的核心标签列表的模型。"""
meaningful_tags: List[str] = Field(..., description="从原始列表中筛选出的具有核心代表意义的名词列表。") meaningful_tags: List[str] = Field(..., description="从原始列表中筛选出的具有核心代表意义的名词列表。")
class InterestTags(BaseModel):
"""用于接收LLM筛选后的兴趣活动标签列表的模型。"""
interest_tags: List[str] = Field(..., description="从原始列表中筛选出的代表用户兴趣活动的标签列表。")
async def filter_tags_with_llm(tags: List[str], end_user_id: str) -> List[str]: async def filter_tags_with_llm(tags: List[str], end_user_id: str) -> List[str]:
""" """
使用LLM筛选标签列表仅保留具有代表性的核心名词。 使用LLM筛选标签列表仅保留具有代表性的核心名词。
@@ -85,10 +92,74 @@ async def filter_tags_with_llm(tags: List[str], end_user_id: str) -> List[str]:
return structured_response.meaningful_tags return structured_response.meaningful_tags
except Exception as e: except Exception as e:
print(f"LLM筛选过程中发生错误: {e}") logger.error(f"LLM筛选过程中发生错误: {e}", exc_info=True)
# 在LLM失败时返回原始标签确保流程继续 # 在LLM失败时返回原始标签确保流程继续
return tags return tags
async def filter_interests_with_llm(tags: List[str], end_user_id: str, language: str = "zh") -> List[str]:
"""
使用LLM从标签列表中筛选出代表用户兴趣活动的标签。
与 filter_tags_with_llm 不同,此函数专注于识别"活动/行为"类兴趣,
过滤掉纯物品、工具、地点等不代表用户主动参与活动的名词。
Args:
tags: 原始标签列表
end_user_id: 用户ID用于获取LLM配置
Returns:
筛选后的兴趣活动标签列表
"""
try:
with get_db_context() as db:
from app.services.memory_agent_service import (
get_end_user_connected_config,
)
connected_config = get_end_user_connected_config(end_user_id, db)
config_id = connected_config.get("memory_config_id")
workspace_id = connected_config.get("workspace_id")
if not config_id and not workspace_id:
raise ValueError(
f"No memory_config_id found for end_user_id: {end_user_id}."
)
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
config_id=config_id,
workspace_id=workspace_id
)
if not memory_config.llm_model_id:
raise ValueError(
f"No llm_model_id found in memory config {config_id}."
)
factory = MemoryClientFactory(db)
llm_client = factory.get_llm_client(memory_config.llm_model_id)
tag_list_str = ", ".join(tags)
from app.core.memory.utils.prompt.prompt_utils import render_interest_filter_prompt
rendered_prompt = render_interest_filter_prompt(tag_list_str, language=language)
messages = [
{
"role": "user",
"content": rendered_prompt
}
]
structured_response = await llm_client.response_structured(
messages=messages,
response_model=InterestTags
)
return structured_response.interest_tags
except Exception as e:
logger.error(f"兴趣标签LLM筛选过程中发生错误: {e}", exc_info=True)
return tags
async def get_raw_tags_from_db( async def get_raw_tags_from_db(
connector: Neo4jConnector, connector: Neo4jConnector,
end_user_id: str, end_user_id: str,
@@ -183,3 +254,56 @@ async def get_hot_memory_tags(end_user_id: str, limit: int = 10, by_user: bool =
finally: finally:
# 确保关闭连接 # 确保关闭连接
await connector.close() await connector.close()
async def get_interest_distribution(end_user_id: str, limit: int = 10, by_user: bool = False, language: str = "zh") -> List[Tuple[str, int]]:
"""
获取用户的兴趣分布标签。
与 get_hot_memory_tags 不同,此函数使用专门针对"活动/行为"的LLM prompt
过滤掉纯物品、工具、地点等,只保留能代表用户兴趣爱好的活动类标签。
Args:
end_user_id: 必需参数。如果by_user=False则为end_user_id如果by_user=True则为user_id
limit: 最终返回的标签数量限制默认10
by_user: 是否按user_id查询默认False按end_user_id查询
Raises:
ValueError: 如果end_user_id未提供或为空
"""
if not end_user_id or not end_user_id.strip():
raise ValueError(
"end_user_id is required. Please provide a valid end_user_id or user_id."
)
connector = Neo4jConnector()
try:
# 查询更多原始标签给LLM提供充足上下文
query_limit = 40
raw_tags_with_freq = await get_raw_tags_from_db(connector, end_user_id, query_limit, by_user=by_user)
if not raw_tags_with_freq:
return []
raw_tag_names = [tag for tag, freq in raw_tags_with_freq]
raw_freq_map = {tag: freq for tag, freq in raw_tags_with_freq}
# 使用兴趣活动专用prompt进行筛选支持语义推断出新标签
interest_tag_names = await filter_interests_with_llm(raw_tag_names, end_user_id, language=language)
# 构建最终标签列表:
# - 原始标签中存在的,保留原始频率
# - LLM推断出的新标签不在原始列表中赋予默认频率1
final_tags = []
seen = set()
for tag in interest_tag_names:
if tag in seen:
continue
seen.add(tag)
freq = raw_freq_map.get(tag, 1)
final_tags.append((tag, freq))
# 按频率降序排列
final_tags.sort(key=lambda x: x[1], reverse=True)
return final_tags[:limit]
finally:
await connector.close()

View File

@@ -548,3 +548,20 @@ async def render_ontology_extraction_prompt(
}) })
return rendered_prompt return rendered_prompt
def render_interest_filter_prompt(tag_list: str, language: str = "zh") -> str:
"""
Renders the interest filter prompt using the interest_filter.jinja2 template.
Args:
tag_list: Comma-separated string of raw tags to filter
language: Output language ("zh" for Chinese, "en" for English)
Returns:
Rendered prompt content as string
"""
template = prompt_env.get_template("interest_filter.jinja2")
rendered_prompt = template.render(tag_list=tag_list, language=language)
log_prompt_rendering('interest filter', rendered_prompt)
return rendered_prompt

View File

@@ -0,0 +1,67 @@
{% if language == "zh" %}
You are a user interest analysis expert. Your task is to infer and extract the user's core hobby/interest activities from a tag list. The tags may be specific project names, tool names, or compound nouns — your job is to identify the underlying interest they represent.
**Step 1 - Infer the underlying interest from each tag**:
Look at each tag and ask: "What hobby or interest does this tag suggest the user has?"
Examples of inference:
- '攀岩', '室内攀岩馆', '攀岩者数据仪表盘', '路线解锁地图', '指力', '路线等级', '当日攀岩流畅度' → '攀岩'
- '风光摄影元数据增强器', 'EXIF数据', '.CR2文件', '.NEF文件', '日出拍摄点', '曝光补偿', '光圈', '太阳高度角', '云量预测图层' → '摄影'
- '晨间冥想坚持天数', '身心协同峰值' → '冥想'
- '川味可视化', '川菜' → '烹饪'
- '开源项目命名建议', 'climbviz', '可视化', '力量增长雷达图' → '编程' 或 '数据可视化'
- '吉他', '指弹', '琴谱' → '吉他'
- '跑步', '5公里', '跑鞋' → '跑步'
- '瑜伽垫', '瑜伽课' → '瑜伽'
**Step 2 - Consolidate and deduplicate**:
- Merge tags that point to the same interest into one representative label
- Use concise, standard hobby names (e.g., '攀岩', '摄影', '编程', '烹饪', '冥想', '吉他', '跑步')
- If multiple tags all point to '攀岩', output '攀岩' only once
**Step 3 - Filter out non-interest tags**:
Remove tags that do NOT suggest any hobby or interest:
- Generic system/assistant terms (e.g., '助手', '用户', 'AI')
- Pure abstract metrics with no clear hobby link (e.g., '完成时间', '日期', '自我评分')
- Location names with no clear hobby link (e.g., '青城山后山' alone — but if combined with photography context, infer '摄影')
**Output format**: Return a list of concise interest activity names in Chinese.
**Example**:
Input: ['攀岩', '攀岩者数据仪表盘', '路线解锁地图', '指力', '风光摄影元数据增强器', 'EXIF数据', '晨间冥想坚持天数', '川味可视化', '可视化', '助手', '完成时间']
Output: ['攀岩', '摄影', '冥想', '烹饪', '编程']
Now process the following tag list and return the inferred interest activities in Chinese: {{ tag_list }}
{% else %}
You are a user interest analysis expert. Your task is to infer and extract the user's core hobby/interest activities from a tag list. The tags may be specific project names, tool names, or compound nouns — your job is to identify the underlying interest they represent.
**Step 1 - Infer the underlying interest from each tag**:
Look at each tag and ask: "What hobby or interest does this tag suggest the user has?"
Examples of inference:
- 'rock climbing', 'indoor climbing gym', 'climber dashboard', 'route map', 'finger strength' → 'rock climbing'
- 'landscape photography metadata enhancer', 'EXIF data', 'sunrise shooting spot', 'exposure compensation' → 'photography'
- 'morning meditation streak', 'mind-body peak' → 'meditation'
- 'Sichuan cuisine visualization', 'Sichuan food' → 'cooking'
- 'open source project', 'data visualization tool', 'Python' → 'programming'
- 'guitar', 'fingerpicking', 'sheet music' → 'guitar'
- 'running', '5km', 'running shoes' → 'running'
**Step 2 - Consolidate and deduplicate**:
- Merge tags that point to the same interest into one representative label
- Use concise, standard hobby names (e.g., 'rock climbing', 'photography', 'programming', 'cooking', 'meditation')
- If multiple tags all point to 'rock climbing', output 'rock climbing' only once
**Step 3 - Filter out non-interest tags**:
Remove tags that do NOT suggest any hobby or interest:
- Generic system/assistant terms (e.g., 'assistant', 'user', 'AI')
- Pure abstract metrics with no clear hobby link (e.g., 'completion time', 'date', 'self-rating')
**Output format**: Return a list of concise interest activity names in English.
**Example**:
Input: ['rock climbing', 'climber dashboard', 'route map', 'finger strength', 'landscape photography metadata enhancer', 'EXIF data', 'morning meditation streak', 'Sichuan cuisine visualization', 'visualization', 'assistant', 'completion time']
Output: ['rock climbing', 'photography', 'meditation', 'cooking', 'programming']
Now process the following tag list and return the inferred interest activities in English: {{ tag_list }}
{% endif %}

View File

@@ -127,7 +127,7 @@ class EventStreamHandler:
yield { yield {
"event": "message", "event": "message",
"data": { "data": {
"chunk": data.get("chunk") "content": data.get("chunk")
} }
} }

View File

@@ -274,7 +274,7 @@ class StreamOutputCoordinator:
yield { yield {
"event": "message", "event": "message",
"data": { "data": {
"chunk": final_chunk "content": final_chunk
} }
} }

View File

@@ -272,7 +272,7 @@ class WorkflowExecutor:
event_type = data.get("type", "node_chunk") # "message" or "node_chunk" event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
if event_type == "node_chunk": if event_type == "node_chunk":
async for msg_event in self.event_handler.handle_node_chunk_event(data): async for msg_event in self.event_handler.handle_node_chunk_event(data):
full_content += msg_event["data"]["chunk"] full_content += msg_event["data"]["content"]
yield msg_event yield msg_event
elif event_type == "node_error": elif event_type == "node_error":
@@ -295,12 +295,12 @@ class WorkflowExecutor:
self.graph, self.graph,
self.execution_context.checkpoint_config self.execution_context.checkpoint_config
): ):
full_content += msg_event["data"]['chunk'] full_content += msg_event["data"]['content']
yield msg_event yield msg_event
# Flush any remaining chunks # Flush any remaining chunks
async for msg_event in self.stream_coordinator.flush_remaining_chunk(self.variable_pool): async for msg_event in self.stream_coordinator.flush_remaining_chunk(self.variable_pool):
full_content += msg_event["data"]['chunk'] full_content += msg_event["data"]['content']
yield msg_event yield msg_event
result = graph.get_state(self.execution_context.checkpoint_config).values result = graph.get_state(self.execution_context.checkpoint_config).values

View File

@@ -36,7 +36,7 @@ from app.core.memory.agent.utils.messages_tools import (
) )
from app.core.memory.agent.utils.type_classifier import status_typle from app.core.memory.agent.utils.type_classifier import status_typle
from app.core.memory.agent.utils.write_tools import write # 新增:直接导入 write 函数 from app.core.memory.agent.utils.write_tools import write # 新增:直接导入 write 函数
from app.core.memory.analytics.hot_memory_tags import get_hot_memory_tags from app.core.memory.analytics.hot_memory_tags import get_hot_memory_tags, get_interest_distribution
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context from app.db import get_db_context
from app.models.knowledge_model import Knowledge, KnowledgeType from app.models.knowledge_model import Knowledge, KnowledgeType
@@ -890,36 +890,36 @@ class MemoryAgentService:
return result return result
async def get_hot_memory_tags_by_user(
async def get_interest_distribution_by_user(
self, self,
end_user_id: Optional[str] = None, end_user_id: Optional[str] = None,
limit: int = 20 limit: int = 5,
language: str = "zh"
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
获取指定用户的热门记忆标签 获取指定用户的兴趣分布标签
与热门标签不同,此接口专注于识别用户的兴趣活动(运动、爱好、学习等),
过滤掉纯物品、工具、地点等不代表用户主动参与活动的名词。
参数: 参数:
- end_user_id: 用户ID可选对应Neo4j中的end_user_id字段 - end_user_id: 用户ID必填)
- limit: 返回标签数量限制 - limit: 返回标签数量限制
- language: 输出语言("zh" 中文, "en" 英文)
返回格式: 返回格式:
[ [
{"name": "标签", "frequency": 频次}, {"name": "兴趣活动", "frequency": 频次},
... ...
] ]
注意:标签语言由写入时的 X-Language-Type 决定,查询时不进行翻译
""" """
try: try:
# by_user=False 表示按 end_user_id 查询在Neo4j中end_user_id就是用户维度 tags = await get_interest_distribution(end_user_id, limit=limit, by_user=False, language=language)
tags = await get_hot_memory_tags(end_user_id, limit=limit, by_user=False) return [{"name": tag, "frequency": freq} for tag, freq in tags]
payload = []
for tag, freq in tags:
payload.append({"name": tag, "frequency": freq})
return payload
except Exception as e: except Exception as e:
logger.error(f"热门记忆标签查询失败: {e}") logger.error(f"兴趣分布标签查询失败: {e}")
raise Exception(f"热门记忆标签查询失败: {e}") raise Exception(f"兴趣分布标签查询失败: {e}")
async def get_user_profile( async def get_user_profile(

View File

@@ -13,6 +13,7 @@ from sqlalchemy.orm import Session
from app.core.error_codes import BizCode from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException from app.core.exceptions import BusinessException
from app.core.workflow.adapters.registry import PlatformAdapterRegistry from app.core.workflow.adapters.registry import PlatformAdapterRegistry
from app.core.workflow.executor import execute_workflow, execute_workflow_stream
from app.core.workflow.nodes.enums import NodeType from app.core.workflow.nodes.enums import NodeType
from app.core.workflow.validator import validate_workflow_config from app.core.workflow.validator import validate_workflow_config
from app.db import get_db from app.db import get_db
@@ -23,7 +24,7 @@ from app.repositories.workflow_repository import (
WorkflowExecutionRepository, WorkflowExecutionRepository,
WorkflowNodeExecutionRepository WorkflowNodeExecutionRepository
) )
from app.schemas import DraftRunRequest from app.schemas import DraftRunRequest, FileInput
from app.services.conversation_service import ConversationService from app.services.conversation_service import ConversationService
from app.services.multi_agent_service import convert_uuids_to_str from app.services.multi_agent_service import convert_uuids_to_str
from app.services.multimodal_service import MultimodalService from app.services.multimodal_service import MultimodalService
@@ -445,6 +446,91 @@ class WorkflowService:
"success_rate": completed / total if total > 0 else 0 "success_rate": completed / total if total > 0 else 0
} }
async def _handle_file_input(self, files: list[FileInput]):
if not files:
return []
files_struct = []
for file in files:
files_struct.append(
{
"type": file.type,
"url": await self.multimodal_service.get_file_url(file),
"__file": True
}
)
return files_struct
@staticmethod
def _map_public_event(event: dict) -> dict | None:
"""
Map internal workflow events to public-facing event formats.
Purpose:
- Hide internal execution details
- Expose a stable and simplified public event schema
- Filter out non-public events
- Maintain backward compatibility when possible
Args:
event (dict): Internal event object, e.g.:
{
"event": "workflow_start",
"data": {...}
}
Returns:
dict | None:
- Returns the mapped public event
- Returns None if the event should not be exposed
"""
event_type = event.get("event")
payload = event.get("data")
match event_type:
case "workflow_start":
return {
"event": "start",
"data": {
"conversation_id": payload.get("conversation_id"),
}
}
case "workflow_end":
return {
"event": "end",
"data": {
"elapsed_time": payload.get("elapsed_time"),
"message_length": len(payload.get("output", "")),
"error": payload.get("error", "")
}
}
case "node_start" | "node_end" | "node_error" | "cycle_item":
return None
case _:
return event
def _emit(self, public: bool, internal_event: dict):
"""
Unified event emission entry.
Args:
public (bool):
- True -> Emit mapped public event
- False -> Emit raw internal event
internal_event (dict):
The original internal event object
Returns:
dict | None:
- The mapped event
- Or None if the event is filtered out
"""
if public:
mapped = self._map_public_event(internal_event)
else:
mapped = internal_event
return mapped
# ==================== 工作流执行 ==================== # ==================== 工作流执行 ====================
async def run( async def run(
@@ -479,10 +565,11 @@ class WorkflowService:
message=f"工作流配置不存在: app_id={app_id}" message=f"工作流配置不存在: app_id={app_id}"
) )
input_data = {"message": payload.message, "variables": payload.variables, input_data = {
"conversation_id": payload.conversation_id, "message": payload.message, "variables": payload.variables,
"files": [file.model_dump(mode='json') for file in payload.files] "conversation_id": payload.conversation_id,
} "files": [file.model_dump(mode='json') for file in payload.files]
}
# 转换 conversation_id 为 UUID # 转换 conversation_id 为 UUID
conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None
@@ -506,22 +593,8 @@ class WorkflowService:
"execution_config": config.execution_config "execution_config": config.execution_config
} }
# 4. 获取工作空间 ID从 app 获取)
# 5. 执行工作流
from app.core.workflow.executor import execute_workflow
try: try:
files = [] files = await self._handle_file_input(payload.files)
if payload.files:
for file in payload.files:
files.append(
{
"type": file.type,
"url": await self.multimodal_service.get_file_url(file),
"__file": True
}
)
input_data["files"] = files input_data["files"] = files
# 更新状态为运行中 # 更新状态为运行中
self.update_execution_status(execution.execution_id, "running") self.update_execution_status(execution.execution_id, "running")
@@ -601,42 +674,6 @@ class WorkflowService:
message=f"工作流执行失败: {str(e)}" message=f"工作流执行失败: {str(e)}"
) )
@staticmethod
def _map_public_event(event: dict) -> dict | None:
event_type = event.get("event")
payload = event.get("data")
match event_type:
case "workflow_start":
return {
"event": "start",
"data": {
"conversation_id": payload.get("conversation_id"),
}
}
case "workflow_end":
return {
"event": "end",
"data": {
"elapsed_time": payload.get("elapsed_time"),
"message_length": len(payload.get("output", "")),
"error": payload.get("error", "")
}
}
case "node_start" | "node_end" | "node_error" | "cycle_item":
return None
case _:
return event
def _emit(self, public: bool, internal_event: dict):
"""
decide
"""
if public:
mapped = self._map_public_event(internal_event)
else:
mapped = internal_event
return mapped
async def run_stream( async def run_stream(
self, self,
app_id: uuid.UUID, app_id: uuid.UUID,
@@ -671,10 +708,11 @@ class WorkflowService:
message=f"工作流配置不存在: app_id={app_id}" message=f"工作流配置不存在: app_id={app_id}"
) )
input_data = {"message": payload.message, "variables": payload.variables, input_data = {
"conversation_id": payload.conversation_id, "message": payload.message, "variables": payload.variables,
"files": [file.model_dump(mode='json') for file in payload.files] "conversation_id": payload.conversation_id,
} "files": [file.model_dump(mode='json') for file in payload.files]
}
# 转换 conversation_id 为 UUID # 转换 conversation_id 为 UUID
conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None
@@ -699,16 +737,7 @@ class WorkflowService:
} }
try: try:
files = [] files = await self._handle_file_input(payload.files)
if payload.files:
for file in payload.files:
files.append(
{
"type": file.type,
"url": await self.multimodal_service.get_file_url(file),
"__file": True
}
)
input_data["files"] = files input_data["files"] = files
self.update_execution_status(execution.execution_id, "running") self.update_execution_status(execution.execution_id, "running")
executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid) executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid)
@@ -723,7 +752,6 @@ class WorkflowService:
input_data["conv_messages"] = last_state.get("messages") or [] input_data["conv_messages"] = last_state.get("messages") or []
break break
init_message_length = len(input_data.get("conv_messages", [])) init_message_length = len(input_data.get("conv_messages", []))
from app.core.workflow.executor import execute_workflow_stream
async for event in execute_workflow_stream( async for event in execute_workflow_stream(
workflow_config=workflow_config_dict, workflow_config=workflow_config_dict,
@@ -789,37 +817,6 @@ class WorkflowService:
return node.get("config", {}).get("variables", []) return node.get("config", {}).get("variables", [])
raise BusinessException("workflow config error - start node not found") raise BusinessException("workflow config error - start node not found")
def _clean_event_for_json(self, event: dict[str, Any]) -> dict[str, Any]:
"""清理事件数据,移除不可序列化的对象
Args:
event: 原始事件数据
Returns:
可序列化的事件数据
"""
from langchain_core.messages import BaseMessage
def clean_value(value):
"""递归清理值"""
if isinstance(value, BaseMessage):
# 将 Message 对象转换为字典
return {
"type": value.__class__.__name__,
"content": value.content,
}
elif isinstance(value, dict):
return {k: clean_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [clean_value(item) for item in value]
elif isinstance(value, (str, int, float, bool, type(None))):
return value
else:
# 其他不可序列化的对象转换为字符串
return str(value)
return clean_value(event)
# ==================== 依赖注入函数 ==================== # ==================== 依赖注入函数 ====================

View File

@@ -139,7 +139,7 @@ SMTP_USER=
SMTP_PASSWORD= SMTP_PASSWORD=
# 本体类型融合配置 (记得写入env_example) # 本体类型融合配置 (记得写入env_example)
GENERAL_ONTOLOGY_FILES=app/core/memory/ontology_services/General_purpose_entity.ttl # 指定要加载的本体文件路径,多个文件用逗号分隔 GENERAL_ONTOLOGY_FILES=api/app/core/memory/ontology_services/General_purpose_entity.ttl # 指定要加载的本体文件路径,多个文件用逗号分隔
ENABLE_GENERAL_ONTOLOGY_TYPES=true # 总开关,控制是否启用通用本体类型融合功能(false = 不使用任何本体类型指导) ENABLE_GENERAL_ONTOLOGY_TYPES=true # 总开关,控制是否启用通用本体类型融合功能(false = 不使用任何本体类型指导)
MAX_ONTOLOGY_TYPES_IN_PROMPT=100 # 限制传给 LLM 的类型数量,防止 Prompt 过长 MAX_ONTOLOGY_TYPES_IN_PROMPT=100 # 限制传给 LLM 的类型数量,防止 Prompt 过长
CORE_GENERAL_TYPES=Person,Organization,Place,Event,Work,Concept # 定义核心类型列表,这些类型会优先包含在合并结果中 CORE_GENERAL_TYPES=Person,Organization,Place,Event,Work,Concept # 定义核心类型列表,这些类型会优先包含在合并结果中

View File

@@ -1,8 +1,8 @@
/* /*
* @Author: ZhaoYing * @Author: ZhaoYing
* @Date: 2026-02-03 14:00:06 * @Date: 2026-02-03 14:00:06
* @Last Modified by: ZhaoYing * @Last Modified by: ZhaoYing
* @Last Modified time: 2026-02-03 14:00:06 * @Last Modified time: 2026-03-04 10:58:41
*/ */
import { request } from '@/utils/request' import { request } from '@/utils/request'
import type { import type {
@@ -98,8 +98,8 @@ export const getMemorySearchEdges = (end_user_id: string) => {
return request.get(`/memory-storage/analytics/graph_data`, { end_user_id }) return request.get(`/memory-storage/analytics/graph_data`, { end_user_id })
} }
// User Memory - User interest distribution // User Memory - User interest distribution
export const getHotMemoryTagsByUser = (end_user_id: string) => { export const getInterestDistributionByUser = (end_user_id: string) => {
return request.get(`/memory/analytics/hot_memory_tags/by_user`, { end_user_id }) return request.get(`/memory/analytics/interest_distribution/by_user`, { end_user_id })
} }
// User Memory - Total memory count // User Memory - Total memory count
export const getTotalMemoryCountByUser = (end_user_id: string) => { export const getTotalMemoryCountByUser = (end_user_id: string) => {

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing * @Author: ZhaoYing
* @Date: 2026-02-03 16:58:03 * @Date: 2026-02-03 16:58:03
* @Last Modified by: ZhaoYing * @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-03 13:46:22 * @Last Modified time: 2026-03-04 12:10:44
*/ */
/** /**
* Conversation Page * Conversation Page
@@ -267,8 +267,8 @@ const Conversation: FC = () => {
currentConversationId = newId currentConversationId = newId
break break
case 'message': case 'message':
const { content, chunk, conversation_id: curId } = item.data as { content: string; chunk: string; conversation_id: string; } const { content, conversation_id: curId } = item.data as { content: string; conversation_id: string; }
updateAssistantMessage(content ?? chunk) updateAssistantMessage(content)
if (curId) { if (curId) {
currentConversationId = curId; currentConversationId = curId;

View File

@@ -15,7 +15,7 @@ import { useParams } from 'react-router-dom'
import ReactEcharts from 'echarts-for-react'; import ReactEcharts from 'echarts-for-react';
import { Space } from 'antd' import { Space } from 'antd'
import { getHotMemoryTagsByUser } from '@/api/memory'; import { getInterestDistributionByUser } from '@/api/memory';
import Empty from '@/components/Empty'; import Empty from '@/components/Empty';
import Loading from '@/components/Empty/Loading'; import Loading from '@/components/Empty/Loading';
import RbCard from '@/components/RbCard/Card'; import RbCard from '@/components/RbCard/Card';
@@ -38,7 +38,7 @@ const InterestDistribution: FC = () => {
/** Fetch interest distribution data */ /** Fetch interest distribution data */
const getData = () => { const getData = () => {
setLoading(true) setLoading(true)
getHotMemoryTagsByUser(id as string).then(res => { getInterestDistributionByUser(id as string).then(res => {
const response = res as { name: string; frequency: number }[] const response = res as { name: string; frequency: number }[]
setData(response.map(item => ({ setData(response.map(item => ({
...item, ...item,

View File

@@ -6,6 +6,7 @@ import {
getShortTerm, getShortTerm,
} from '@/api/memory' } from '@/api/memory'
import Empty from '@/components/Empty' import Empty from '@/components/Empty'
import Markdown from '@/components/Markdown'
interface ShortTermItem { interface ShortTermItem {
retrieval: Array<{ query: string; retrieval: string[]; }>; retrieval: Array<{ query: string; retrieval: string[]; }>;
@@ -85,7 +86,9 @@ const ShortTermDetail: FC = () => {
))} ))}
<div> <div>
<div className="rb:font-medium rb:leading-5 rb:mb-1">{t('shortTermDetail.answer')}</div> <div className="rb:font-medium rb:leading-5 rb:mb-1">{t('shortTermDetail.answer')}</div>
<div className="rb:bg-[#FFFFFF] rb:border rb:border-[#DFE4ED] rb:rounded-md rb:px-3 rb:py-2.5 rb:leading-5">{vo.answer}</div> <div className="rb:bg-[#FFFFFF] rb:border rb:border-[#DFE4ED] rb:rounded-md rb:px-3 rb:py-2.5 rb:leading-5">
<Markdown content={vo.answer} />
</div>
</div> </div>
</Space> </Space>
</div> </div>
@@ -103,7 +106,9 @@ const ShortTermDetail: FC = () => {
: data.long_term?.map((vo, voIdx) => ( : data.long_term?.map((vo, voIdx) => (
<div key={voIdx} className="rb:leading-5 rb:shadow-[inset_3px_0px_0px_0px_#155EEF] rb:bg-[#FBFDFF] rb:border rb:border-[#DFE4ED] rb:rounded-lg rb:px-6 rb:py-3"> <div key={voIdx} className="rb:leading-5 rb:shadow-[inset_3px_0px_0px_0px_#155EEF] rb:bg-[#FBFDFF] rb:border rb:border-[#DFE4ED] rb:rounded-lg rb:px-6 rb:py-3">
<div className="rb:mb-1 rb:font-medium rb:leading-5.5">{vo.query}</div> <div className="rb:mb-1 rb:font-medium rb:leading-5.5">{vo.query}</div>
<div className="rb:mt-1 rb:leading-5 rb:text-[#5B6167] rb:text-[12px]">{vo.retrieval}</div> <div className="rb:mt-1 rb:leading-5 rb:text-[#5B6167] rb:text-[12px]">
<Markdown content={vo.retrieval} />
</div>
</div> </div>
)) ))
} }

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing * @Author: ZhaoYing
* @Date: 2026-02-06 21:10:56 * @Date: 2026-02-06 21:10:56
* @Last Modified by: ZhaoYing * @Last Modified by: ZhaoYing
* @Last Modified time: 2026-02-28 16:43:06 * @Last Modified time: 2026-03-04 12:10:17
*/ */
/** /**
* Workflow Chat Component * Workflow Chat Component
@@ -174,8 +174,8 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef }>(({ appId
*/ */
const handleStreamMessage = (data: SSEMessage[]) => { const handleStreamMessage = (data: SSEMessage[]) => {
data.forEach(item => { data.forEach(item => {
const { chunk, conversation_id, node_id, cycle_id, cycle_idx, input, output, error, elapsed_time, status } = item.data as { const { content, conversation_id, node_id, cycle_id, cycle_idx, input, output, error, elapsed_time, status } = item.data as {
chunk: string; content: string;
conversation_id: string | null; conversation_id: string | null;
cycle_id: string; cycle_id: string;
cycle_idx: number; cycle_idx: number;
@@ -202,7 +202,7 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef }>(({ appId
if (lastIndex >= 0) { if (lastIndex >= 0) {
newList[lastIndex] = { newList[lastIndex] = {
...newList[lastIndex], ...newList[lastIndex],
content: newList[lastIndex].content + chunk content: newList[lastIndex].content + content
} }
} }
return newList return newList