Fix/interest distribution (#445)

* [fix] Revising the judgment method for the interest analysis tags

* [fix] Revising the judgment method for the interest analysis tags

* [add] Set cache for the distribution of interest tags

* [fix] Revising the judgment method for the interest analysis tags

* [add] Set cache for the distribution of interest tags

* [changes] 1.Use structured logs;
          2.Align the type and default value of "end_user_id" with the semantic meaning of "required".
This commit is contained in:
Ke Sun
2026-03-04 14:06:50 +08:00
committed by GitHub
10 changed files with 390 additions and 34 deletions

View File

@@ -3,9 +3,10 @@ Cache 缓存模块
提供各种缓存功能的统一入口 提供各种缓存功能的统一入口
""" """
from .memory import EmotionMemoryCache, ImplicitMemoryCache from .memory import EmotionMemoryCache, ImplicitMemoryCache, InterestMemoryCache
__all__ = [ __all__ = [
"EmotionMemoryCache", "EmotionMemoryCache",
"ImplicitMemoryCache", "ImplicitMemoryCache",
"InterestMemoryCache",
] ]

View File

@@ -5,8 +5,10 @@ Memory 缓存模块
""" """
from .emotion_memory import EmotionMemoryCache from .emotion_memory import EmotionMemoryCache
from .implicit_memory import ImplicitMemoryCache from .implicit_memory import ImplicitMemoryCache
from .interest_memory import InterestMemoryCache
__all__ = [ __all__ = [
"EmotionMemoryCache", "EmotionMemoryCache",
"ImplicitMemoryCache", "ImplicitMemoryCache",
"InterestMemoryCache",
] ]

122
api/app/cache/memory/interest_memory.py vendored Normal file
View File

@@ -0,0 +1,122 @@
"""
Interest Distribution Cache
兴趣分布缓存模块
用于缓存用户的兴趣分布标签数据,避免重复调用模型生成
"""
import json
import logging
from typing import Optional, List, Dict, Any
from datetime import datetime
from app.aioRedis import aio_redis
logger = logging.getLogger(__name__)
# 缓存过期时间24小时
INTEREST_CACHE_EXPIRE = 86400
class InterestMemoryCache:
"""兴趣分布缓存类"""
PREFIX = "cache:memory:interest_distribution"
@classmethod
def _get_key(cls, end_user_id: str, language: str) -> str:
"""生成 Redis key
Args:
end_user_id: 用户ID
language: 语言类型
Returns:
完整的 Redis key
"""
return f"{cls.PREFIX}:by_user:{end_user_id}:{language}"
@classmethod
async def set_interest_distribution(
cls,
end_user_id: str,
language: str,
data: List[Dict[str, Any]],
expire: int = INTEREST_CACHE_EXPIRE,
) -> bool:
"""设置用户兴趣分布缓存
Args:
end_user_id: 用户ID
language: 语言类型
data: 兴趣分布列表,格式 [{"name": "...", "frequency": ...}, ...]
expire: 过期时间默认24小时
Returns:
是否设置成功
"""
try:
key = cls._get_key(end_user_id, language)
payload = {
"data": data,
"generated_at": datetime.now().isoformat(),
"cached": True,
}
value = json.dumps(payload, ensure_ascii=False)
await aio_redis.set(key, value, ex=expire)
logger.info(f"设置兴趣分布缓存成功: {key}, 过期时间: {expire}")
return True
except Exception as e:
logger.error(f"设置兴趣分布缓存失败: {e}", exc_info=True)
return False
@classmethod
async def get_interest_distribution(
cls,
end_user_id: str,
language: str,
) -> Optional[List[Dict[str, Any]]]:
"""获取用户兴趣分布缓存
Args:
end_user_id: 用户ID
language: 语言类型
Returns:
兴趣分布列表,缓存不存在或已过期返回 None
"""
try:
key = cls._get_key(end_user_id, language)
value = await aio_redis.get(key)
if value:
payload = json.loads(value)
logger.info(f"命中兴趣分布缓存: {key}")
return payload.get("data")
logger.info(f"兴趣分布缓存不存在或已过期: {key}")
return None
except Exception as e:
logger.error(f"获取兴趣分布缓存失败: {e}", exc_info=True)
return None
@classmethod
async def delete_interest_distribution(
cls,
end_user_id: str,
language: str,
) -> bool:
"""删除用户兴趣分布缓存
Args:
end_user_id: 用户ID
language: 语言类型
Returns:
是否删除成功
"""
try:
key = cls._get_key(end_user_id, language)
result = await aio_redis.delete(key)
logger.info(f"删除兴趣分布缓存: {key}, 结果: {result}")
return result > 0
except Exception as e:
logger.error(f"删除兴趣分布缓存失败: {e}", exc_info=True)
return False

View File

@@ -1,5 +1,6 @@
from typing import List, Optional from typing import List, Optional
from app.cache.memory.interest_memory import InterestMemoryCache
from app.celery_app import celery_app from app.celery_app import celery_app
from app.core.error_codes import BizCode from app.core.error_codes import BizCode
from app.core.language_utils import get_language_from_header from app.core.language_utils import get_language_from_header
@@ -661,34 +662,56 @@ async def get_knowledge_type_stats_api(
return fail(BizCode.INTERNAL_ERROR, "获取知识库类型统计失败", str(e)) return fail(BizCode.INTERNAL_ERROR, "获取知识库类型统计失败", str(e))
@router.get("/analytics/hot_memory_tags/by_user", response_model=ApiResponse) @router.get("/analytics/interest_distribution/by_user", response_model=ApiResponse)
async def get_hot_memory_tags_by_user_api( async def get_interest_distribution_by_user_api(
end_user_id: Optional[str] = Query(None, description="用户ID可选"), end_user_id: str = Query(..., description="用户ID必填"),
limit: int = Query(20, description="返回标签数量限制"), limit: int = Query(5, le=5, description="返回兴趣标签数量限制最多5个"),
language_type: str = Header(default=None, alias="X-Language-Type"),
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
db: Session=Depends(get_db), db: Session = Depends(get_db),
): ):
""" """
获取指定用户的热门记忆标签 获取指定用户的兴趣分布标签
注意:标签语言由写入时的 X-Language-Type 决定,查询时不进行翻译 与热门标签不同,此接口专注于识别用户的兴趣活动(运动、爱好、学习、创作等),
过滤掉纯物品、工具、地点等不代表用户主动参与活动的名词。
返回格式: 返回格式:
[ [
{"name": "标签", "frequency": 频次}, {"name": "兴趣活动", "frequency": 频次},
... ...
] ]
""" """
api_logger.info(f"Hot memory tags by user requested: end_user_id={end_user_id}") language = get_language_from_header(language_type)
api_logger.info(f"Interest distribution by user requested: end_user_id={end_user_id}, language={language}")
try: try:
result = await memory_agent_service.get_hot_memory_tags_by_user( # 优先读取缓存
cached = await InterestMemoryCache.get_interest_distribution(
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit language=language,
) )
return success(data=result, msg="获取热门记忆标签成功") if cached is not None:
api_logger.info(f"Interest distribution cache hit: end_user_id={end_user_id}")
return success(data=cached, msg="获取兴趣分布标签成功")
# 缓存未命中,调用模型生成
result = await memory_agent_service.get_interest_distribution_by_user(
end_user_id=end_user_id,
limit=limit,
language=language
)
# 写入缓存24小时过期
await InterestMemoryCache.set_interest_distribution(
end_user_id=end_user_id,
language=language,
data=result,
)
return success(data=result, msg="获取兴趣分布标签成功")
except Exception as e: except Exception as e:
api_logger.error(f"Hot memory tags by user failed: {str(e)}") api_logger.error(f"Interest distribution by user failed: {str(e)}")
return fail(BizCode.INTERNAL_ERROR, "获取热门记忆标签失败", str(e)) return fail(BizCode.INTERNAL_ERROR, "获取兴趣分布标签失败", str(e))
@router.get("/analytics/user_profile", response_model=ApiResponse) @router.get("/analytics/user_profile", response_model=ApiResponse)

View File

@@ -229,7 +229,7 @@ class Settings:
# General Ontology Type Configuration # General Ontology Type Configuration
# ======================================================================== # ========================================================================
# 通用本体文件路径列表(逗号分隔) # 通用本体文件路径列表(逗号分隔)
GENERAL_ONTOLOGY_FILES: str = os.getenv("GENERAL_ONTOLOGY_FILES", "app/core/memory/ontology_services/General_purpose_entity.ttl") GENERAL_ONTOLOGY_FILES: str = os.getenv("GENERAL_ONTOLOGY_FILES", "api/app/core/memory/ontology_services/General_purpose_entity.ttl")
# 是否启用通用本体类型功能 # 是否启用通用本体类型功能
ENABLE_GENERAL_ONTOLOGY_TYPES: bool = os.getenv("ENABLE_GENERAL_ONTOLOGY_TYPES", "true").lower() == "true" ENABLE_GENERAL_ONTOLOGY_TYPES: bool = os.getenv("ENABLE_GENERAL_ONTOLOGY_TYPES", "true").lower() == "true"

View File

@@ -1,9 +1,12 @@
import asyncio import asyncio
import json import json
import logging
import os import os
from typing import List, Tuple from typing import List, Tuple
from app.core.config import settings from app.core.config import settings
logger = logging.getLogger(__name__)
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context from app.db import get_db_context
from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.repositories.neo4j.neo4j_connector import Neo4jConnector
@@ -16,6 +19,10 @@ class FilteredTags(BaseModel):
"""用于接收LLM筛选后的核心标签列表的模型。""" """用于接收LLM筛选后的核心标签列表的模型。"""
meaningful_tags: List[str] = Field(..., description="从原始列表中筛选出的具有核心代表意义的名词列表。") meaningful_tags: List[str] = Field(..., description="从原始列表中筛选出的具有核心代表意义的名词列表。")
class InterestTags(BaseModel):
"""用于接收LLM筛选后的兴趣活动标签列表的模型。"""
interest_tags: List[str] = Field(..., description="从原始列表中筛选出的代表用户兴趣活动的标签列表。")
async def filter_tags_with_llm(tags: List[str], end_user_id: str) -> List[str]: async def filter_tags_with_llm(tags: List[str], end_user_id: str) -> List[str]:
""" """
使用LLM筛选标签列表仅保留具有代表性的核心名词。 使用LLM筛选标签列表仅保留具有代表性的核心名词。
@@ -85,10 +92,74 @@ async def filter_tags_with_llm(tags: List[str], end_user_id: str) -> List[str]:
return structured_response.meaningful_tags return structured_response.meaningful_tags
except Exception as e: except Exception as e:
print(f"LLM筛选过程中发生错误: {e}") logger.error(f"LLM筛选过程中发生错误: {e}", exc_info=True)
# 在LLM失败时返回原始标签确保流程继续 # 在LLM失败时返回原始标签确保流程继续
return tags return tags
async def filter_interests_with_llm(tags: List[str], end_user_id: str, language: str = "zh") -> List[str]:
"""
使用LLM从标签列表中筛选出代表用户兴趣活动的标签。
与 filter_tags_with_llm 不同,此函数专注于识别"活动/行为"类兴趣,
过滤掉纯物品、工具、地点等不代表用户主动参与活动的名词。
Args:
tags: 原始标签列表
end_user_id: 用户ID用于获取LLM配置
Returns:
筛选后的兴趣活动标签列表
"""
try:
with get_db_context() as db:
from app.services.memory_agent_service import (
get_end_user_connected_config,
)
connected_config = get_end_user_connected_config(end_user_id, db)
config_id = connected_config.get("memory_config_id")
workspace_id = connected_config.get("workspace_id")
if not config_id and not workspace_id:
raise ValueError(
f"No memory_config_id found for end_user_id: {end_user_id}."
)
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
config_id=config_id,
workspace_id=workspace_id
)
if not memory_config.llm_model_id:
raise ValueError(
f"No llm_model_id found in memory config {config_id}."
)
factory = MemoryClientFactory(db)
llm_client = factory.get_llm_client(memory_config.llm_model_id)
tag_list_str = ", ".join(tags)
from app.core.memory.utils.prompt.prompt_utils import render_interest_filter_prompt
rendered_prompt = render_interest_filter_prompt(tag_list_str, language=language)
messages = [
{
"role": "user",
"content": rendered_prompt
}
]
structured_response = await llm_client.response_structured(
messages=messages,
response_model=InterestTags
)
return structured_response.interest_tags
except Exception as e:
logger.error(f"兴趣标签LLM筛选过程中发生错误: {e}", exc_info=True)
return tags
async def get_raw_tags_from_db( async def get_raw_tags_from_db(
connector: Neo4jConnector, connector: Neo4jConnector,
end_user_id: str, end_user_id: str,
@@ -183,3 +254,56 @@ async def get_hot_memory_tags(end_user_id: str, limit: int = 10, by_user: bool =
finally: finally:
# 确保关闭连接 # 确保关闭连接
await connector.close() await connector.close()
async def get_interest_distribution(end_user_id: str, limit: int = 10, by_user: bool = False, language: str = "zh") -> List[Tuple[str, int]]:
"""
获取用户的兴趣分布标签。
与 get_hot_memory_tags 不同,此函数使用专门针对"活动/行为"的LLM prompt
过滤掉纯物品、工具、地点等,只保留能代表用户兴趣爱好的活动类标签。
Args:
end_user_id: 必需参数。如果by_user=False则为end_user_id如果by_user=True则为user_id
limit: 最终返回的标签数量限制默认10
by_user: 是否按user_id查询默认False按end_user_id查询
Raises:
ValueError: 如果end_user_id未提供或为空
"""
if not end_user_id or not end_user_id.strip():
raise ValueError(
"end_user_id is required. Please provide a valid end_user_id or user_id."
)
connector = Neo4jConnector()
try:
# 查询更多原始标签给LLM提供充足上下文
query_limit = 40
raw_tags_with_freq = await get_raw_tags_from_db(connector, end_user_id, query_limit, by_user=by_user)
if not raw_tags_with_freq:
return []
raw_tag_names = [tag for tag, freq in raw_tags_with_freq]
raw_freq_map = {tag: freq for tag, freq in raw_tags_with_freq}
# 使用兴趣活动专用prompt进行筛选支持语义推断出新标签
interest_tag_names = await filter_interests_with_llm(raw_tag_names, end_user_id, language=language)
# 构建最终标签列表:
# - 原始标签中存在的,保留原始频率
# - LLM推断出的新标签不在原始列表中赋予默认频率1
final_tags = []
seen = set()
for tag in interest_tag_names:
if tag in seen:
continue
seen.add(tag)
freq = raw_freq_map.get(tag, 1)
final_tags.append((tag, freq))
# 按频率降序排列
final_tags.sort(key=lambda x: x[1], reverse=True)
return final_tags[:limit]
finally:
await connector.close()

View File

@@ -548,3 +548,20 @@ async def render_ontology_extraction_prompt(
}) })
return rendered_prompt return rendered_prompt
def render_interest_filter_prompt(tag_list: str, language: str = "zh") -> str:
"""
Renders the interest filter prompt using the interest_filter.jinja2 template.
Args:
tag_list: Comma-separated string of raw tags to filter
language: Output language ("zh" for Chinese, "en" for English)
Returns:
Rendered prompt content as string
"""
template = prompt_env.get_template("interest_filter.jinja2")
rendered_prompt = template.render(tag_list=tag_list, language=language)
log_prompt_rendering('interest filter', rendered_prompt)
return rendered_prompt

View File

@@ -0,0 +1,67 @@
{% if language == "zh" %}
You are a user interest analysis expert. Your task is to infer and extract the user's core hobby/interest activities from a tag list. The tags may be specific project names, tool names, or compound nouns — your job is to identify the underlying interest they represent.
**Step 1 - Infer the underlying interest from each tag**:
Look at each tag and ask: "What hobby or interest does this tag suggest the user has?"
Examples of inference:
- '攀岩', '室内攀岩馆', '攀岩者数据仪表盘', '路线解锁地图', '指力', '路线等级', '当日攀岩流畅度' → '攀岩'
- '风光摄影元数据增强器', 'EXIF数据', '.CR2文件', '.NEF文件', '日出拍摄点', '曝光补偿', '光圈', '太阳高度角', '云量预测图层' → '摄影'
- '晨间冥想坚持天数', '身心协同峰值' → '冥想'
- '川味可视化', '川菜' → '烹饪'
- '开源项目命名建议', 'climbviz', '可视化', '力量增长雷达图' → '编程' 或 '数据可视化'
- '吉他', '指弹', '琴谱' → '吉他'
- '跑步', '5公里', '跑鞋' → '跑步'
- '瑜伽垫', '瑜伽课' → '瑜伽'
**Step 2 - Consolidate and deduplicate**:
- Merge tags that point to the same interest into one representative label
- Use concise, standard hobby names (e.g., '攀岩', '摄影', '编程', '烹饪', '冥想', '吉他', '跑步')
- If multiple tags all point to '攀岩', output '攀岩' only once
**Step 3 - Filter out non-interest tags**:
Remove tags that do NOT suggest any hobby or interest:
- Generic system/assistant terms (e.g., '助手', '用户', 'AI')
- Pure abstract metrics with no clear hobby link (e.g., '完成时间', '日期', '自我评分')
- Location names with no clear hobby link (e.g., '青城山后山' alone — but if combined with photography context, infer '摄影')
**Output format**: Return a list of concise interest activity names in Chinese.
**Example**:
Input: ['攀岩', '攀岩者数据仪表盘', '路线解锁地图', '指力', '风光摄影元数据增强器', 'EXIF数据', '晨间冥想坚持天数', '川味可视化', '可视化', '助手', '完成时间']
Output: ['攀岩', '摄影', '冥想', '烹饪', '编程']
Now process the following tag list and return the inferred interest activities in Chinese: {{ tag_list }}
{% else %}
You are a user interest analysis expert. Your task is to infer and extract the user's core hobby/interest activities from a tag list. The tags may be specific project names, tool names, or compound nouns — your job is to identify the underlying interest they represent.
**Step 1 - Infer the underlying interest from each tag**:
Look at each tag and ask: "What hobby or interest does this tag suggest the user has?"
Examples of inference:
- 'rock climbing', 'indoor climbing gym', 'climber dashboard', 'route map', 'finger strength' → 'rock climbing'
- 'landscape photography metadata enhancer', 'EXIF data', 'sunrise shooting spot', 'exposure compensation' → 'photography'
- 'morning meditation streak', 'mind-body peak' → 'meditation'
- 'Sichuan cuisine visualization', 'Sichuan food' → 'cooking'
- 'open source project', 'data visualization tool', 'Python' → 'programming'
- 'guitar', 'fingerpicking', 'sheet music' → 'guitar'
- 'running', '5km', 'running shoes' → 'running'
**Step 2 - Consolidate and deduplicate**:
- Merge tags that point to the same interest into one representative label
- Use concise, standard hobby names (e.g., 'rock climbing', 'photography', 'programming', 'cooking', 'meditation')
- If multiple tags all point to 'rock climbing', output 'rock climbing' only once
**Step 3 - Filter out non-interest tags**:
Remove tags that do NOT suggest any hobby or interest:
- Generic system/assistant terms (e.g., 'assistant', 'user', 'AI')
- Pure abstract metrics with no clear hobby link (e.g., 'completion time', 'date', 'self-rating')
**Output format**: Return a list of concise interest activity names in English.
**Example**:
Input: ['rock climbing', 'climber dashboard', 'route map', 'finger strength', 'landscape photography metadata enhancer', 'EXIF data', 'morning meditation streak', 'Sichuan cuisine visualization', 'visualization', 'assistant', 'completion time']
Output: ['rock climbing', 'photography', 'meditation', 'cooking', 'programming']
Now process the following tag list and return the inferred interest activities in English: {{ tag_list }}
{% endif %}

View File

@@ -36,7 +36,7 @@ from app.core.memory.agent.utils.messages_tools import (
) )
from app.core.memory.agent.utils.type_classifier import status_typle from app.core.memory.agent.utils.type_classifier import status_typle
from app.core.memory.agent.utils.write_tools import write # 新增:直接导入 write 函数 from app.core.memory.agent.utils.write_tools import write # 新增:直接导入 write 函数
from app.core.memory.analytics.hot_memory_tags import get_hot_memory_tags from app.core.memory.analytics.hot_memory_tags import get_hot_memory_tags, get_interest_distribution
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context from app.db import get_db_context
from app.models.knowledge_model import Knowledge, KnowledgeType from app.models.knowledge_model import Knowledge, KnowledgeType
@@ -890,36 +890,36 @@ class MemoryAgentService:
return result return result
async def get_hot_memory_tags_by_user(
async def get_interest_distribution_by_user(
self, self,
end_user_id: Optional[str] = None, end_user_id: Optional[str] = None,
limit: int = 20 limit: int = 5,
language: str = "zh"
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
获取指定用户的热门记忆标签 获取指定用户的兴趣分布标签
与热门标签不同,此接口专注于识别用户的兴趣活动(运动、爱好、学习等),
过滤掉纯物品、工具、地点等不代表用户主动参与活动的名词。
参数: 参数:
- end_user_id: 用户ID可选对应Neo4j中的end_user_id字段 - end_user_id: 用户ID必填)
- limit: 返回标签数量限制 - limit: 返回标签数量限制
- language: 输出语言("zh" 中文, "en" 英文)
返回格式: 返回格式:
[ [
{"name": "标签", "frequency": 频次}, {"name": "兴趣活动", "frequency": 频次},
... ...
] ]
注意:标签语言由写入时的 X-Language-Type 决定,查询时不进行翻译
""" """
try: try:
# by_user=False 表示按 end_user_id 查询在Neo4j中end_user_id就是用户维度 tags = await get_interest_distribution(end_user_id, limit=limit, by_user=False, language=language)
tags = await get_hot_memory_tags(end_user_id, limit=limit, by_user=False) return [{"name": tag, "frequency": freq} for tag, freq in tags]
payload = []
for tag, freq in tags:
payload.append({"name": tag, "frequency": freq})
return payload
except Exception as e: except Exception as e:
logger.error(f"热门记忆标签查询失败: {e}") logger.error(f"兴趣分布标签查询失败: {e}")
raise Exception(f"热门记忆标签查询失败: {e}") raise Exception(f"兴趣分布标签查询失败: {e}")
async def get_user_profile( async def get_user_profile(

View File

@@ -139,7 +139,7 @@ SMTP_USER=
SMTP_PASSWORD= SMTP_PASSWORD=
# 本体类型融合配置 (记得写入env_example) # 本体类型融合配置 (记得写入env_example)
GENERAL_ONTOLOGY_FILES=app/core/memory/ontology_services/General_purpose_entity.ttl # 指定要加载的本体文件路径,多个文件用逗号分隔 GENERAL_ONTOLOGY_FILES=api/app/core/memory/ontology_services/General_purpose_entity.ttl # 指定要加载的本体文件路径,多个文件用逗号分隔
ENABLE_GENERAL_ONTOLOGY_TYPES=true # 总开关,控制是否启用通用本体类型融合功能(false = 不使用任何本体类型指导) ENABLE_GENERAL_ONTOLOGY_TYPES=true # 总开关,控制是否启用通用本体类型融合功能(false = 不使用任何本体类型指导)
MAX_ONTOLOGY_TYPES_IN_PROMPT=100 # 限制传给 LLM 的类型数量,防止 Prompt 过长 MAX_ONTOLOGY_TYPES_IN_PROMPT=100 # 限制传给 LLM 的类型数量,防止 Prompt 过长
CORE_GENERAL_TYPES=Person,Organization,Place,Event,Work,Concept # 定义核心类型列表,这些类型会优先包含在合并结果中 CORE_GENERAL_TYPES=Person,Organization,Place,Event,Work,Concept # 定义核心类型列表,这些类型会优先包含在合并结果中