Merge pull request #686 from SuanmoSuanyangTechnology/feature/user-alias

Feature/user alias
This commit is contained in:
Ke Sun
2026-03-26 17:34:00 +08:00
committed by GitHub
17 changed files with 908 additions and 258 deletions

View File

@@ -23,9 +23,11 @@ from app.services.memory_entity_relationship_service import MemoryEntityService,
from app.schemas.response_schema import ApiResponse
from app.schemas.memory_storage_schema import GenerateCacheRequest
from app.repositories.workspace_repository import WorkspaceRepository
from app.schemas.end_user_schema import (
EndUserProfileResponse,
EndUserProfileUpdate,
from app.repositories.end_user_repository import EndUserRepository
from app.schemas.end_user_info_schema import (
EndUserInfoResponse,
EndUserInfoCreate,
EndUserInfoUpdate,
)
from app.models.end_user_model import EndUser
from app.dependencies import get_current_user
@@ -336,102 +338,118 @@ async def get_community_graph_data_api(
api_logger.error(f"社区图谱查询失败: end_user_id={end_user_id}, error={str(e)}")
return fail(BizCode.INTERNAL_ERROR, "社区图谱查询失败", str(e))
#=======================终端用户信息接口=======================
@router.get("/read_end_user/profile", response_model=ApiResponse)
async def get_end_user_profile(
end_user_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
@router.get("/end_user_info", response_model=ApiResponse)
async def get_end_user_info(
end_user_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
workspace_id = current_user.current_workspace_id
workspace_repo = WorkspaceRepository(db)
workspace_models = workspace_repo.get_workspace_models_configs(workspace_id)
"""
查询终端用户信息记录
根据 end_user_id 查询单条终端用户信息记录。
"""
workspace_id = current_user.current_workspace_id
if workspace_models:
model_id = workspace_models.get("llm", None)
else:
model_id = None
# 检查用户是否已选择工作空间
if workspace_id is None:
api_logger.warning(f"用户 {current_user.username} 尝试查询用户信息但未选择工作空间")
api_logger.warning(f"用户 {current_user.username} 尝试查询终端用户信息但未选择工作空间")
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
api_logger.info(
f"用户信息查询请求: end_user_id={end_user_id}, user={current_user.username}, "
f"查询终端用户信息请求: end_user_id={end_user_id}, user={current_user.username}, "
f"workspace={workspace_id}"
)
try:
# 查询终端用户
end_user = db.query(EndUser).filter(EndUser.id == end_user_id).first()
if not end_user:
api_logger.warning(f"终端用户不存在: end_user_id={end_user_id}")
return fail(BizCode.INVALID_PARAMETER, "终端用户不存在", f"end_user_id={end_user_id}")
# 构建响应数据
profile_data = EndUserProfileResponse(
id=end_user.id,
other_name=end_user.other_name,
position=end_user.position,
department=end_user.department,
contact=end_user.contact,
phone=end_user.phone,
hire_date=end_user.hire_date,
updatetime_profile=end_user.updatetime_profile
# 校验 end_user 是否属于当前工作空间
end_user_repo = EndUserRepository(db)
end_user = end_user_repo.get_end_user_by_id(end_user_id)
if end_user is None:
return fail(BizCode.USER_NOT_FOUND, "终端用户不存在", "end_user not found")
if str(end_user.workspace_id) != str(workspace_id):
api_logger.warning(
f"用户 {current_user.username} 尝试查询不属于工作空间 {workspace_id} 的终端用户 {end_user_id}"
)
return fail(BizCode.PERMISSION_DENIED, "该终端用户不属于当前工作空间", "end_user workspace mismatch")
api_logger.info(f"成功获取用户信息: end_user_id={end_user_id}")
return success(data=UserMemoryService.convert_profile_to_dict_with_timestamp(profile_data), msg="查询成功")
except Exception as e:
api_logger.error(f"用户信息查询失败: end_user_id={end_user_id}, error={str(e)}")
return fail(BizCode.INTERNAL_ERROR, "用户信息查询失败", str(e))
@router.post("/updated_end_user/profile", response_model=ApiResponse)
async def update_end_user_profile(
profile_update: EndUserProfileUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
"""
更新终端用户的基本信息
该接口可以更新用户的姓名、职位、部门、联系方式、电话和入职日期等信息。
所有字段都是可选的,只更新提供的字段。
"""
workspace_id = current_user.current_workspace_id
end_user_id = profile_update.end_user_id
# 验证工作空间
if workspace_id is None:
api_logger.warning(f"用户 {current_user.username} 尝试更新用户信息但未选择工作空间")
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
api_logger.info(
f"用户信息更新请求: end_user_id={end_user_id}, user={current_user.username}, "
f"workspace={workspace_id}"
)
# 调用 Service 层处理业务逻辑
result = user_memory_service.update_end_user_profile(db, end_user_id, profile_update)
result = user_memory_service.get_end_user_info(db, end_user_id)
if result["success"]:
api_logger.info(f"成功更新用户信息: end_user_id={end_user_id}")
api_logger.info(f"成功查询终端用户信息: end_user_id={end_user_id}")
return success(data=result["data"], msg="查询成功")
else:
error_msg = result["error"]
api_logger.error(f"查询终端用户信息失败: end_user_id={end_user_id}, error={error_msg}")
if error_msg == "终端用户信息记录不存在":
return fail(BizCode.USER_NOT_FOUND, "终端用户信息记录不存在", error_msg)
elif error_msg == "无效的终端用户ID格式":
return fail(BizCode.INVALID_USER_ID, "无效的终端用户ID格式", error_msg)
else:
return fail(BizCode.INTERNAL_ERROR, "查询终端用户信息失败", error_msg)
@router.post("/end_user_info/updated", response_model=ApiResponse)
async def update_end_user_info(
info_update: EndUserInfoUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
"""
更新终端用户信息记录
根据 end_user_id 更新终端用户信息记录,支持批量更新多个别名。
示例请求体:
{
"end_user_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"other_name": "张三1",
"aliases": ["小张", "张工"],
"meta_data": {"position": "工程师", "department": "技术部"}
}
"""
workspace_id = current_user.current_workspace_id
end_user_id = info_update.end_user_id
if workspace_id is None:
api_logger.warning(f"用户 {current_user.username} 尝试更新终端用户信息但未选择工作空间")
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
api_logger.info(
f"更新终端用户信息请求: end_user_id={end_user_id}, user={current_user.username}, "
f"workspace={workspace_id}"
)
# 校验 end_user 是否属于当前工作空间
end_user_repo = EndUserRepository(db)
end_user = end_user_repo.get_end_user_by_id(end_user_id)
if end_user is None:
return fail(BizCode.USER_NOT_FOUND, "终端用户不存在", "end_user not found")
if str(end_user.workspace_id) != str(workspace_id):
api_logger.warning(
f"用户 {current_user.username} 尝试更新不属于工作空间 {workspace_id} 的终端用户 {end_user_id}"
)
return fail(BizCode.PERMISSION_DENIED, "该终端用户不属于当前工作空间", "end_user workspace mismatch")
# 获取更新数据(排除 end_user_id
update_data = info_update.model_dump(exclude_unset=True, exclude={'end_user_id'})
result = user_memory_service.update_end_user_info(db, end_user_id, update_data)
if result["success"]:
api_logger.info(f"成功更新终端用户信息: end_user_id={end_user_id}")
return success(data=result["data"], msg="更新成功")
else:
error_msg = result["error"]
api_logger.error(f"用户信息更新失败: end_user_id={end_user_id}, error={error_msg}")
# 根据错误类型映射到合适的业务错误码
if error_msg == "终端用户不存在":
return fail(BizCode.USER_NOT_FOUND, "终端用户不存在", error_msg)
elif error_msg == "无效的用户ID格式":
return fail(BizCode.INVALID_USER_ID, "无效的用户ID格式", error_msg)
api_logger.error(f"终端用户信息更新失败: end_user_id={end_user_id}, error={error_msg}")
if error_msg == "终端用户信息记录不存在":
return fail(BizCode.USER_NOT_FOUND, "终端用户信息记录不存在", error_msg)
elif error_msg == "无效的终端用户ID格式":
return fail(BizCode.INVALID_USER_ID, "无效的终端用户ID格式", error_msg)
else:
# 只有未预期的错误才使用 INTERNAL_ERROR
return fail(BizCode.INTERNAL_ERROR, "用户信息更新失败", error_msg)
return fail(BizCode.INTERNAL_ERROR, "终端用户信息更新失败", error_msg)
@router.get("/memory_space/timeline_memories", response_model=ApiResponse)
async def memory_space_timeline_of_shared_memories(

View File

@@ -39,6 +39,30 @@
比如:输入历史信息内容:[{'Query': '4月27日我和你推荐过一本书书名是什么', 'ANswer': '张曼玉推荐了《小王子》'}]
拆分问题4月27日我和你推荐过一本书书名是什么可以拆分为4月27日张曼玉推荐过一本书书名是什么
## 指代消歧规则Coreference Resolution
在拆分问题时,必须解析并替换所有指代词和抽象称呼,使问题具体化:
1. **"用户"的消歧**
- "用户是谁?" → 分析历史记录,找出对话发起者的姓名
- 如果历史中有"我叫X"、"我的名字是X"、或多次提到某个人物,则"用户"指的就是这个人
- 示例:历史中有"老李的原名叫李建国",则"用户是谁?"应拆分为"李建国是谁?"或"老李(李建国)是谁?"
2. **"我"的消歧**
- "我喜欢什么?" → 从历史中找出对话发起者的姓名,替换为"X喜欢什么"
- 示例:历史中有"张曼玉推荐了《小王子》",则"我推荐的书是什么?"应拆分为"张曼玉推荐的书是什么?"
3. **"他/她/它"的消歧**
- 从上下文或历史中找出最近提到的同类实体
- 示例:历史中有"老李的同事叫他建国哥",则"他的同事怎么称呼他?"应拆分为"老李的同事怎么称呼他?"
4. **"那个人/这个人"的消歧**
- 从历史中找出最近提到的人物
- 示例:历史中有"李建国",则"那个人的原名是什么?"应拆分为"李建国的原名是什么?"
5. **优先级**
- 如果历史记录中反复出现某个人物(如"老李"、"李建国"、"建国哥"),则"用户"很可能指的就是这个人
- 如果无法从历史中确定指代对象，则保留原问题，但在 reason 中说明"无法确定指代对象"
输出要求:
@@ -71,6 +95,34 @@
"reason": "输出原问题的关键要素"
}
]
## 指代消歧示例(重要):
示例1 - "用户"的消歧:
输入历史:[{'Query': '老李的原名叫什么?', 'Answer': '李建国'}, {'Query': '老李的同事叫他什么?', 'Answer': '建国哥'}]
输入问题:"用户是谁?"
输出:
[
{
"original_question": "用户是谁?",
"extended_question": "李建国是谁?",
"type": "单跳",
"reason": "历史中反复提到'老李/李建国/建国哥''用户'指的就是对话发起者李建国"
}
]
示例2 - "我"的消歧:
输入历史:[{'Query': '张曼玉推荐了什么书?', 'Answer': '《小王子》'}]
输入问题:"我推荐的书是什么?"
输出:
[
{
"original_question": "我推荐的书是什么?",
"extended_question": "张曼玉推荐的书是什么?",
"type": "单跳",
"reason": "历史中提到张曼玉推荐了书,'我'指的就是张曼玉"
}
]
**Output format**
**CRITICAL JSON FORMATTING REQUIREMENTS:**
1. Use only standard ASCII double quotes (") for JSON structure - never use Chinese quotation marks ("") or other Unicode quotes

View File

@@ -27,6 +27,30 @@
比如:输入历史信息内容:[{'Query': '4月27日我和你推荐过一本书书名是什么', 'ANswer': '张曼玉推荐了《小王子》'}]
拆分问题4月27日我和你推荐过一本书书名是什么可以拆分为4月27日张曼玉推荐过一本书书名是什么
## 指代消歧规则Coreference Resolution
在拆分问题时,必须解析并替换所有指代词和抽象称呼,使问题具体化:
1. **"用户"的消歧**
- "用户是谁?" → 分析历史记录,找出对话发起者的姓名
- 如果历史中有"我叫X"、"我的名字是X"、或多次提到某个人物(如"老李"、"李建国"),则"用户"指的就是这个人
- 示例:历史中反复出现"老李/李建国/建国哥",则"用户是谁?"应拆分为"李建国是谁?"或"老李(李建国)是谁?"
2. **"我"的消歧**
- "我喜欢什么?" → 从历史中找出对话发起者的姓名,替换为"X喜欢什么"
- 示例:历史中有"张曼玉推荐了《小王子》",则"我推荐的书是什么?"应拆分为"张曼玉推荐的书是什么?"
3. **"他/她/它"的消歧**
- 从上下文或历史中找出最近提到的同类实体
- 示例:历史中有"老李的同事叫他建国哥",则"他的同事怎么称呼他?"应拆分为"老李的同事怎么称呼他?"
4. **"那个人/这个人"的消歧**
- 从历史中找出最近提到的人物
- 示例:历史中有"李建国",则"那个人的原名是什么?"应拆分为"李建国的原名是什么?"
5. **优先级**
- 如果历史记录中反复出现某个人物(如"老李"、"李建国"、"建国哥"),则"用户"很可能指的就是这个人
- 如果无法从历史中确定指代对象，则保留原问题，但在 reason 中说明"无法确定指代对象"
## 指令:
你是一个智能数据拆分助手,请根据数据特性判断输入属于哪种类型:
单跳Single-hop
@@ -151,6 +175,34 @@
]
- 必须通过json.loads()的格式支持的形式输出
- 必须通过json.loads()的格式支持的形式输出,响应必须是与此确切模式匹配的有效JSON对象。不要在JSON之前或之后包含任何文本。
## 指代消歧示例(重要):
示例1 - "用户"的消歧:
输入历史:[{'Query': '老李的原名叫什么?', 'Answer': '李建国'}, {'Query': '老李的同事叫他什么?', 'Answer': '建国哥'}]
输入问题:"用户是谁?"
输出:
[
{
"id": "Q1",
"question": "李建国是谁?",
"type": "单跳",
"reason": "历史中反复提到'老李/李建国/建国哥''用户'指的就是对话发起者李建国"
}
]
示例2 - "我"的消歧:
输入历史:[{'Query': '张曼玉推荐了什么书?', 'Answer': '《小王子》'}]
输入问题:"我推荐的书是什么?"
输出:
[
{
"id": "Q1",
"question": "张曼玉推荐的书是什么?",
"type": "单跳",
"reason": "历史中提到张曼玉推荐了书,'我'指的就是张曼玉"
}
]
- 关键的JSON格式要求
1.JSON结构仅使用标准ASCII双引号-切勿使用中文引号“”或其他Unicode引号
2.如果提取的语句文本包含引号，请使用反斜杠(\")正确转义它们

View File

@@ -176,6 +176,22 @@ async def write(
)
if success:
logger.info("Successfully saved all data to Neo4j")
# 同步用户别名到 PostgreSQL
try:
# 创建一个临时的 orchestrator 实例来调用同步方法
temp_orchestrator = ExtractionOrchestrator(
llm_client=llm_client,
embedder_client=embedder_client,
connector=neo4j_connector,
embedding_id=embedding_model_id
)
await temp_orchestrator._update_end_user_other_name(all_entity_nodes, chunked_dialogs)
logger.info("Successfully synced user aliases to PostgreSQL")
except Exception as sync_error:
logger.error(f"Failed to sync user aliases to PostgreSQL: {sync_error}", exc_info=True)
# 不影响主流程
# 写入成功后,同步等待聚类完成(避免与 Memory Summary 并发冲突)
await _trigger_clustering_sync(
all_entity_nodes,

View File

@@ -203,6 +203,7 @@ def accurate_match(
) -> Tuple[List[ExtractedEntityNode], Dict[str, str], Dict[str, Dict]]:
"""
精确匹配:按 (end_user_id, name, entity_type) 合并实体并建立重定向与合并记录。
同时检测某实体的 name 是否命中另一实体的 aliases若命中则直接合并。
返回: (deduped_entities, id_redirect, exact_merge_map)
"""
exact_merge_map: Dict[str, Dict] = {}
@@ -240,6 +241,48 @@ def accurate_match(
pass
deduped_entities = list(canonical_map.values())
# 2) 第二轮:检测某实体的 name 是否命中另一实体的 aliasesalias-to-name 精确合并)
# 场景LLM 把 aliases 中的词(如"齐齐")又单独抽取为独立实体,需在此阶段合并掉
# 优化:先构建 (end_user_id, alias_lower) -> canonical 的反向索引,查找 O(1)
alias_index: Dict[tuple, ExtractedEntityNode] = {}
for canonical in deduped_entities:
uid = getattr(canonical, "end_user_id", None)
for alias in (getattr(canonical, "aliases", []) or []):
alias_lower = alias.strip().lower()
if alias_lower:
alias_index[(uid, alias_lower)] = canonical
i = 0
while i < len(deduped_entities):
ent = deduped_entities[i]
ent_name = (getattr(ent, "name", "") or "").strip().lower()
ent_uid = getattr(ent, "end_user_id", None)
canonical = alias_index.get((ent_uid, ent_name))
# 确保不是自身
if canonical is not None and canonical.id != ent.id:
_merge_attribute(canonical, ent)
id_redirect[ent.id] = canonical.id
for k, v in list(id_redirect.items()):
if v == ent.id:
id_redirect[k] = canonical.id
try:
k = f"{canonical.end_user_id}|{(canonical.name or '').strip()}|{(canonical.entity_type or '').strip()}"
if k not in exact_merge_map:
exact_merge_map[k] = {
"canonical_id": canonical.id,
"end_user_id": canonical.end_user_id,
"name": canonical.name,
"entity_type": canonical.entity_type,
"merged_ids": set(),
}
exact_merge_map[k]["merged_ids"].add(ent.id)
except Exception:
pass
deduped_entities.pop(i)
else:
i += 1
return deduped_entities, id_redirect, exact_merge_map
def fuzzy_match(

View File

@@ -19,6 +19,7 @@
import asyncio
import logging
import os
import uuid
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
@@ -62,6 +63,10 @@ from app.core.memory.storage_services.extraction_engine.pipeline_help import (
export_test_input_doc,
)
from app.core.memory.utils.data.ontology import TemporalInfo
from app.db import get_db_context
from app.models.end_user_info_model import EndUserInfo
from app.repositories.end_user_info_repository import EndUserInfoRepository
from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
# 配置日志
@@ -1324,6 +1329,151 @@ class ExtractionOrchestrator:
perceptual_edges
)
async def _update_end_user_other_name(
    self,
    entity_nodes: List[ExtractedEntityNode],
    dialog_data_list: List[DialogData]
) -> None:
    """
    Sync the user entity's aliases into the end_user and end_user_info tables.

    The alias list written to ``end_user_info.aliases`` is the one returned by
    ``_fetch_neo4j_user_aliases``; when that is empty, the aliases extracted
    from ``entity_nodes`` are used instead. ``other_name`` is only overwritten
    when ``_resolve_other_name`` returns a value other than ``None``.

    The whole body runs inside a ``try``; any exception is logged and
    swallowed, so nothing propagates to the caller.

    Args:
        entity_nodes: Entity nodes scanned for the user entity's aliases.
        dialog_data_list: Dialog data; only the first item's ``end_user_id``
            is read.
    """
    try:
        if not dialog_data_list:
            logger.warning("dialog_data_list 为空，跳过用户别名同步")
            return

        end_user_id = dialog_data_list[0].end_user_id
        if not end_user_id:
            logger.warning("end_user_id 为空，跳过用户别名同步")
            return

        # 1. Aliases extracted from this conversation (original LLM order, not sorted)
        current_aliases = self._extract_current_aliases(entity_nodes)

        # 2. Full alias list stored in Neo4j (treated as the authoritative source)
        neo4j_aliases = await self._fetch_neo4j_user_aliases(end_user_id)
        if not neo4j_aliases:
            # Nothing stored in Neo4j: fall back to this conversation's aliases
            neo4j_aliases = current_aliases
        if not neo4j_aliases:
            logger.debug(f"aliases 为空，跳过同步: end_user_id={end_user_id}")
            return

        logger.info(f"本次对话提取的 aliases: {current_aliases}")
        logger.info(f"Neo4j 中的完整 aliases: {neo4j_aliases}")

        # 3. Persist to the relational database.
        # uuid.UUID() raises ValueError on a malformed id; the outer except logs it.
        end_user_uuid = uuid.UUID(end_user_id)
        with get_db_context() as db:
            # Update the end_user row
            end_user = EndUserRepository(db).get_by_id(end_user_uuid)
            if not end_user:
                logger.warning(f"未找到 end_user_id={end_user_id} 的用户记录")
                return

            new_name = self._resolve_other_name(end_user.other_name, current_aliases, neo4j_aliases)
            if new_name is not None:
                end_user.other_name = new_name
                logger.info(f"更新 end_user 表 other_name → {new_name}")
            else:
                logger.debug(f"end_user 表 other_name 保持不变: {end_user.other_name}")

            # Update the existing end_user_info row, or create one
            info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
            if info:
                new_name_info = self._resolve_other_name(info.other_name, current_aliases, neo4j_aliases)
                if new_name_info is not None:
                    info.other_name = new_name_info
                    logger.info(f"更新 end_user_info 表 other_name → {new_name_info}")
                if info.aliases != neo4j_aliases:
                    info.aliases = neo4j_aliases
                    logger.info(f"同步 Neo4j aliases 到 end_user_info: {neo4j_aliases}")
            else:
                # A new row is only created when this conversation yielded a
                # non-blank first alias; otherwise no row is inserted even if
                # neo4j_aliases is non-empty.
                first_alias = current_aliases[0].strip() if current_aliases else ""
                if first_alias:
                    db.add(EndUserInfo(
                        end_user_id=end_user_uuid,
                        other_name=first_alias,
                        aliases=neo4j_aliases,
                        meta_data={}
                    ))
                    logger.info(f"创建 end_user_info 记录other_name={first_alias}, aliases={neo4j_aliases}")

            # Single commit covering both the end_user and end_user_info changes
            db.commit()
    except Exception as e:
        # Best-effort sync: log with traceback and do not re-raise
        logger.error(f"更新 end_user other_name 失败: {e}", exc_info=True)
def _extract_current_aliases(self, entity_nodes: List[ExtractedEntityNode]) -> List[str]:
"""从实体节点提取用户别名(保持 LLM 提取的原始顺序,不进行任何排序)
这个方法直接返回 LLM 提取的别名列表,不做任何修改。
第一个别名将被用作 other_name。
Args:
entity_nodes: 实体节点列表
Returns:
别名列表(保持 LLM 提取的原始顺序)
"""
USER_NAMES = {'用户', '', 'User', 'I'}
for entity in entity_nodes:
if getattr(entity, 'name', '').strip() in USER_NAMES:
aliases = getattr(entity, 'aliases', []) or []
logger.debug(f"提取到用户别名(原始顺序): {aliases}")
return aliases
return []
async def _fetch_neo4j_user_aliases(self, end_user_id: str) -> List[str]:
    """Look up the alias list stored on the user's entity node in Neo4j.

    Runs one query for an ``ExtractedEntity`` node belonging to
    ``end_user_id`` whose name is one of the user placeholders, and reads its
    ``aliases`` property from the first returned row.

    Args:
        end_user_id: End-user id passed to the query as ``$end_user_id``.

    Returns:
        The stored aliases, or an empty list when the query returns nothing
        or the property is empty/missing.
    """
    # NOTE(review): a fresh Neo4jConnector is created per call; whether it
    # needs closing is not visible here.
    query = (
        "\n"
        "MATCH (e:ExtractedEntity)\n"
        "WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '', 'User', 'I']\n"
        "RETURN e.aliases AS aliases\n"
        "LIMIT 1\n"
    )
    rows = await Neo4jConnector().execute_query(query, end_user_id=end_user_id)

    if rows:
        stored = rows[0].get('aliases') or []
        if not stored:
            logger.debug(f"Neo4j 用户实体 aliases 为空: end_user_id={end_user_id}")
        return stored

    logger.debug(f"Neo4j 中未找到用户实体: end_user_id={end_user_id}")
    return []
def _resolve_other_name(
self,
current: Optional[str],
current_aliases: List[str],
neo4j_aliases: List[str]
) -> Optional[str]:
"""
决定 other_name 是否需要更新,返回新值;无需更新返回 None。
决策规则:
- 为空 → 用本次对话第一个别名
- 不在 Neo4j aliases 中 → 用 Neo4j 第一个别名(说明已被删除)
- 否则 → 保持不变(返回 None
"""
if not current or not current.strip():
return current_aliases[0].strip() if current_aliases else None
if current not in neo4j_aliases:
return neo4j_aliases[0].strip() if neo4j_aliases else None
return None
async def _run_dedup_and_write_summary(
self,
dialogue_nodes: List[DialogueNode],

View File

@@ -5,6 +5,15 @@
===Task===
Extract entities and knowledge triplets from the given statement.
**⚠️ CRITICAL REQUIREMENTS:**
1. **ALIASES ORDER IS CRITICAL**: The FIRST alias in the array will be used as the user's primary display name (other_name). List aliases strictly in the order they first appear in the conversation, so the earliest-mentioned name comes FIRST.
2. **ALWAYS include aliases field**: Even if empty, you MUST include "aliases": [] in EVERY entity.
{# TODO: v0.2.10 - denied_aliases 功能暂时禁用，将通过 Cypher 查询实现
2. **DENIED_ALIASES**: When user explicitly denies a name (e.g., "我不叫X", "I'm not called X"), you MUST put X in denied_aliases field, NOT in aliases.
3. **ALWAYS include both fields**: Even if empty, you MUST include "aliases": [] and "denied_aliases": [] in EVERY entity.
#}
{% if language == "zh" %}
**重要请使用中文生成实体名称name、描述description和示例example。**
{% else %}
@@ -18,34 +27,29 @@ Extract entities and knowledge triplets from the given statement.
{% if ontology_types %}
===Ontology Type Guidance===
**CRITICAL RULE: You MUST ONLY use the predefined ontology type names listed below for the entity "type" field. Do NOT use any other type names, even if they seem reasonable.**
**CRITICAL: Use ONLY predefined type names below. If no exact match, use CLOSEST type. NEVER invent new types.**
**If no predefined type fits an entity, use the CLOSEST matching predefined type. NEVER invent new type names.**
**Type Priority:**
1. [场景类型] Scene Types (domain-specific, prefer first)
2. [通用类型] General Types (standard ontologies)
3. [通用父类] Parent Types (hierarchy context)
**Type Priority (from highest to lowest):**
1. **[场景类型] Scene Types** - Domain-specific types, ALWAYS prefer these first
2. **[通用类型] General Types** - Common types from standard ontologies (DBpedia)
3. **[通用父类] Parent Types** - Provide type hierarchy context
**Rules:**
- Type MUST exactly match predefined names
- Do NOT modify, translate, or abbreviate type names
- Prefer scene types over general types
**Type Matching Rules:**
- Entity type MUST exactly match one of the predefined type names below
- Do NOT use types like "Equipment", "Component", "Concept", "Action", "Condition", "Data", "Duration" unless they appear in the predefined list
- Do NOT modify, translate, abbreviate, or create variations of type names
- Prefer scene types (marked [场景类型]) over general types when both could apply
- If uncertain, check the type description to find the best match
**Predefined Ontology Types:**
**Predefined Types:**
{{ ontology_types }}
{% if type_hierarchy_hints %}
**Type Hierarchy Reference:**
The following shows type inheritance relationships (Child → Parent → Grandparent):
**Hierarchy:**
{% for hint in type_hierarchy_hints %}
- {{ hint }}
{% endfor %}
{% endif %}
**ALLOWED Type Names (use EXACTLY one of these, no exceptions):**
**ALLOWED Names:**
{{ ontology_type_names | join(', ') }}
{% endif %}
@@ -62,66 +66,88 @@ The following shows type inheritance relationships (Child → Parent → Grandpa
- **Entity descriptions must be in English**
- **Examples must be in English**
{% endif %}
- **Semantic Memory Classification (is_explicit_memory):**
* Set to `true` if the entity represents **explicit/semantic memory**:
- **Concepts:** "Machine Learning", "Photosynthesis", "Democracy"
- **Knowledge:** "Python Programming Language", "Theory of Relativity"
- **Definitions:** "API (Application Programming Interface)", "REST API"
- **Principles:** "SOLID Principles", "First Law of Thermodynamics"
- **Theories:** "Evolution Theory", "Quantum Mechanics"
- **Methods/Techniques:** "Agile Development", "Machine Learning Algorithm"
- **Technical Terms:** "Neural Network", "Database"
* Set to `false` for:
- **People:** "John Smith", "Dr. Wang"
- **Organizations:** "Microsoft", "Harvard University"
- **Locations:** "Beijing", "Central Park"
- **Events:** "2024 Conference", "Project Meeting"
- **Specific objects:** "iPhone 15", "Building A"
- **Example Generation (IMPORTANT for semantic memory entities):**
* For entities where `is_explicit_memory=true`, generate a **concise example (around 20 characters)** to help understand the concept
* The example should be:
- **Specific and concrete**: Use real-world scenarios or applications
- **Brief**: Around 20 characters (can be slightly longer if needed for clarity)
- **Semantic Memory (is_explicit_memory):**
* `true` for: Concepts, Knowledge, Definitions, Theories, Methods (e.g., "Machine Learning", "REST API")
* `false` for: People, Organizations, Locations, Events, Specific objects
* For `is_explicit_memory=true`, provide concise example (~20 chars{% if language == "zh" %},使用中文{% endif %})
**🚨🚨🚨 ALIASES - MANDATORY FIELD 🚨🚨🚨**
**CRITICAL RULES (违反将导致提取失败):**
1. **EVERY entity MUST have aliases field:**
- `"aliases": [...]` - REQUIRED, even if empty `[]`
2. **ALIASES - 别名提取规则:**
{% if language == "zh" %}
- **使用中文**
- 包含:昵称、全名、简称、别称、网名等
- 顺序：**第一个别名将作为用户的主显示名称（other_name），因此必须按对话中首次出现的先后排列，最先出现的名字放在第一位**
- 提取顺序:严格按照对话中首次出现的顺序
- 示例:
* "我叫张三,大家叫我小张" → aliases=["张三", "小张"](张三是第一个,将成为 other_name
* "大家叫我小李,我全名叫李明" → aliases=["小李", "李明"](小李先出现,将成为 other_name
- 空值:如果没有别名,使用 `[]`
- 重要:只提取本次对话中明确提到的别名,不要推测或添加未提及的名字
{% else %}
- **In English**
- Include: nicknames, full names, abbreviations, alternative names
- Order: **The FIRST alias will be used as the user's primary display name (other_name), so the name mentioned first in the conversation MUST come first**
- Extraction order: Strictly follow the order of first appearance in conversation
- Examples:
* "I'm John, people call me Johnny" → aliases=["John", "Johnny"] (John is first, will become other_name)
* "People call me Mike, my full name is Michael" → aliases=["Mike", "Michael"] (Mike appears first, will become other_name)
- Empty: If no aliases, use `[]`
- Important: Only extract aliases explicitly mentioned in current conversation, do not infer or add unmentioned names
{% endif %}
* For non-semantic entities (`is_explicit_memory=false`), the example field can be empty
- **Aliases Extraction:**
3. **USER ENTITY SPECIAL HANDLING:**
{% if language == "zh" %}
* 别名使用中文
- 用户实体的 name 字段:使用 "用户" 或 "我"
- 用户的真实姓名:放入 aliases
- 示例:
* "我叫李明" → name="用户", aliases=["李明"]
{% else %}
* Aliases should be in English
- User entity name field: use "User" or "I"
- User's real name: put in aliases
- Examples:
* "I'm John" → name="User", aliases=["John"]
{% endif %}
* Include common alternative names, abbreviations and full names
* If no aliases exist, use empty array: []
- Exclude lengthy quotes, calendar dates, temporal ranges, and temporal expressions
- For numeric values: extract as separate entities (instance_of: 'Numeric', name: units, numeric_value: value)
Example: £30 → name: 'GBP', numeric_value: 30, instance_of: 'Numeric'
4. **ALIASES ORDER:**
{% if language == "zh" %}
- 顺序优先级:按出现顺序,先出现的在前
{% else %}
- Order priority: by appearance order, first mentioned comes first
{% endif %}
**EXAMPLES OF CORRECT EXTRACTION:**
{% if language == "zh" %}
- "我叫张三" → aliases=["张三"] (张三将成为 other_name
- "大家叫我小明,我全名叫李明" → aliases=["小明", "李明"] (小明先出现,将成为 other_name
- "我是李华,网名叫华仔" → aliases=["李华", "华仔"] (李华先出现,将成为 other_name
{% else %}
- "I'm John" → aliases=["John"] (John will become other_name)
- "People call me Mike, my full name is Michael" → aliases=["Mike", "Michael"] (Mike appears first, will become other_name)
- "I'm John Smith, username JSmith" → aliases=["John Smith", "JSmith"] (John Smith appears first, will become other_name)
{% endif %}
- Exclude lengthy quotes, dates, temporal expressions
- Numeric values: extract as entities (instance_of: 'Numeric', name: units, numeric_value: value)
**Triplet Extraction:**
- Extract (subject, predicate, object) triplets where:
- Subject: main entity performing the action or being described
- Predicate: relationship between entities (e.g., 'is', 'works at', 'believes')
- Object: entity, value, or concept affected by the predicate
- Extract (subject, predicate, object) where subject/object are entities, predicate is relationship
{% if language == "zh" %}
- subject_name 和 object_name 必须使用中文
- subject_name 和 object_name 使用中文
{% else %}
- subject_name and object_name must be in English (translate if original is in another language)
- subject_name and object_name in English
{% endif %}
- Exclude all temporal expressions from every field
- Use ONLY the predicates listed in "Predicate Instructions" (uppercase English tokens)
- Do NOT translate predicate tokens
- Do NOT include `statement_id` field (assigned automatically)
**When NOT to extract triplets:**
- Non-propositional utterances (emotions, fillers, onomatopoeia)
- No clear predicate from the given definitions applies
- Standalone noun phrases or checklist items → extract as entities only
- Do NOT invent generic predicates (e.g., "IS_DOING", "FEELS", "MENTIONS")
**If no valid triplet exists:** Return triplets: [], extract entities if present, otherwise both arrays empty.
- Use ONLY predicates from "Predicate Instructions" (uppercase tokens)
- Exclude temporal expressions, do NOT include `statement_id`
- **When NOT to extract:** emotions, fillers, no clear predicate, standalone nouns
- **If no valid triplet:** Return triplets: []
{%- if predicate_instructions -%}
**Predicate Instructions:**
@@ -207,26 +233,44 @@ Output:
{"entity_idx": 0, "name": "三脚架", "type": "Equipment", "description": "摄影器材配件", "example": "", "aliases": ["相机三脚架"], "is_explicit_memory": false}
]
}
**Example 4 (别名 - Chinese):** "我的名字是乐力齐,我的小名是齐齐,同事们都叫我小乐"
Output:
{
"triplets": [],
"entities": [
{"entity_idx": 0, "name": "用户", "type": "Person", "description": "用户本人", "example": "", "aliases": ["乐力齐", "齐齐", "小乐"], "is_explicit_memory": false}
]
}
**Example 5 (别名顺序 - Chinese):** "我叫陈思远。对了,我的网名叫「远山」"
Output:
{
"triplets": [],
"entities": [
{"entity_idx": 0, "name": "用户", "type": "Person", "description": "用户本人", "example": "", "aliases": ["陈思远", "远山"], "is_explicit_memory": false}
]
}
{% endif %}
===End of Examples===
{% if ontology_types %}
**⚠️ REMINDER: The examples above use generic type names for illustration only. You MUST use ONLY the predefined ontology type names from the "ALLOWED Type Names" list above. For example, use "PredictiveMaintenance" instead of "Concept", use "ProductionLine" instead of "Equipment", etc. Map each entity to the closest matching predefined type.**
**⚠️ REMINDER: Examples use generic types for illustration. You MUST use predefined types from "ALLOWED Names" above.**
{% endif %}
===Output Format===
**JSON Requirements:**
- Use only ASCII double quotes (") for JSON structure
- Never use Chinese quotation marks ("") or Unicode quotes
- Escape quotation marks in text with backslashes (\")
- Ensure proper string closure and comma separation
- No line breaks within JSON string values
- Use ASCII double quotes ("), escape with \"
- No Chinese quotes (""), no line breaks in strings
{% if language == "zh" %}
- **语言要求实体名称name、描述description)、示例(example、subject_name、object_name 必须使用中文**
- **语言name、descriptionexample、subject_name、object_name 使用中文**
{% else %}
- **Language Requirement: Entity names, descriptions, examples, subject_name, object_name must be in English**
- **If the original text is in Chinese, translate all names to English**
- **Language: names, descriptions, examples in English (translate if needed)**
{% endif %}
- **⚠️ ALIASES ORDER: preserve temporal order of appearance**
- **🚨 MANDATORY FIELD: EVERY entity MUST include "aliases" field, even if empty array []**
{{ json_schema }}

View File

@@ -16,6 +16,7 @@ from .agent_app_config_model import AgentConfig
from .app_release_model import AppRelease
from .memory_increment_model import MemoryIncrement
from .end_user_model import EndUser
from .end_user_info_model import EndUserInfo
from .appshare_model import AppShare
from .release_share_model import ReleaseShare
from .conversation_model import Conversation, Message
@@ -60,6 +61,7 @@ __all__ = [
"AppRelease",
"MemoryIncrement",
"EndUser",
"EndUserInfo",
"AppShare",
"ReleaseShare",
"Conversation",

View File

@@ -0,0 +1,24 @@
import datetime
import uuid
from sqlalchemy import Column, DateTime, ForeignKey, String, Text, ARRAY
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from app.db import Base
class EndUserInfo(Base):
    """End-user info table: stores a user's aliases and extended information."""

    __tablename__ = "end_user_info"

    # Surrogate primary key, generated client-side with uuid4.
    id = Column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
        nullable=False,
        index=True,
    )
    # Owning end user (foreign key to end_users.id).
    end_user_id = Column(
        UUID(as_uuid=True),
        ForeignKey("end_users.id"),
        nullable=False,
        index=True,
        comment="关联的终端用户ID",
    )
    # Display name; the only non-nullable payload column.
    other_name = Column(
        String,
        nullable=False,
        comment="关联的用户名称",
    )
    # Alias list stored as a string array.
    aliases = Column(
        ARRAY(String),
        nullable=True,
        comment="用户别名列表（字符串数组）",
    )
    # Free-form JSON payload with extra user information.
    meta_data = Column(
        JSONB,
        nullable=True,
        comment="用户相关的扩展信息JSON格式",
    )
    # Row timestamps; both use the local, naive datetime.datetime.now.
    created_at = Column(
        DateTime,
        default=datetime.datetime.now,
        comment="创建时间",
    )
    updated_at = Column(
        DateTime,
        default=datetime.datetime.now,
        onupdate=datetime.datetime.now,
        comment="更新时间",
    )

    # Relationship back to EndUser (paired with EndUser.info).
    end_user = relationship("EndUser", back_populates="info")

View File

@@ -30,14 +30,6 @@ class EndUser(Base):
comment="关联的记忆配置ID"
)
# 用户基本信息字段
position = Column(String, nullable=True, comment="职位")
department = Column(String, nullable=True, comment="部门")
contact = Column(String, nullable=True, comment="联系方式")
phone = Column(String, nullable=True, comment="电话")
hire_date = Column(DateTime, nullable=True, comment="入职日期")
updatetime_profile = Column(DateTime, nullable=True, comment="核心档案信息最后更新时间")
# 用户摘要四个维度 - User Summary Four Dimensions
user_summary = Column(Text, nullable=True, comment="缓存的用户摘要(基本介绍)")
personality_traits = Column(Text, nullable=True, comment="性格特点")
@@ -65,4 +57,7 @@ class EndUser(Base):
)
# 与 WorkSpace 的反向关系
workspace = relationship("Workspace", back_populates="end_users")
workspace = relationship("Workspace", back_populates="end_users")
# 与 EndUserInfo 的反向关系
info = relationship("EndUserInfo", back_populates="end_user", cascade="all, delete-orphan")

View File

@@ -0,0 +1,71 @@
"""
终端用户信息仓储层
"""
import uuid
from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.end_user_info_model import EndUserInfo
from app.core.logging_config import get_logger
logger = get_logger(__name__)
class EndUserInfoRepository:
    """Data-access helper for ``EndUserInfo`` rows.

    Every mutating method commits the session it was constructed with.
    """

    def __init__(self, db: Session):
        # Session used (and committed) by all operations of this repository.
        self.db = db

    def create(self, end_user_id: uuid.UUID, other_name: str, aliases: List[str] = None, meta_data: dict = None) -> EndUserInfo:
        """Insert and commit a new info row, returning the refreshed instance.

        ``aliases`` is stored as an empty list when omitted or empty;
        ``meta_data`` is stored exactly as passed.
        """
        record = EndUserInfo(
            end_user_id=end_user_id,
            other_name=other_name,
            aliases=aliases if aliases else [],
            meta_data=meta_data,
        )
        session = self.db
        session.add(record)
        session.commit()
        session.refresh(record)
        logger.info(f"创建终端用户信息: end_user_id={end_user_id}, aliases={aliases}")
        return record

    def get_by_id(self, info_id: uuid.UUID) -> Optional[EndUserInfo]:
        """Return the info row with primary key ``info_id``, or ``None``."""
        by_pk = self.db.query(EndUserInfo).filter(EndUserInfo.id == info_id)
        return by_pk.first()

    def get_by_end_user_id(self, end_user_id: uuid.UUID) -> Optional[EndUserInfo]:
        """Return the first info row of the given end user, or ``None``."""
        by_owner = self.db.query(EndUserInfo).filter(EndUserInfo.end_user_id == end_user_id)
        return by_owner.first()

    def update(self, info_id: uuid.UUID, aliases: List[str] = None, meta_data: dict = None) -> Optional[EndUserInfo]:
        """Overwrite ``aliases`` and/or ``meta_data`` of an existing row.

        Arguments left as ``None`` are not touched. Returns the refreshed row,
        or ``None`` when no row has this id (nothing is committed then).
        """
        record = self.get_by_id(info_id)
        if not record:
            return None

        for field, value in (("aliases", aliases), ("meta_data", meta_data)):
            if value is not None:
                setattr(record, field, value)

        self.db.commit()
        self.db.refresh(record)
        logger.info(f"更新终端用户信息: info_id={info_id}")
        return record

    def delete(self, info_id: uuid.UUID) -> bool:
        """Delete the row with this id; return whether a row was found."""
        record = self.get_by_id(info_id)
        if not record:
            return False

        self.db.delete(record)
        self.db.commit()
        logger.info(f"删除终端用户信息: info_id={info_id}")
        return True

    def delete_by_end_user_id(self, end_user_id: uuid.UUID) -> int:
        """Bulk-delete every info row of an end user and return the row count."""
        deleted = (
            self.db.query(EndUserInfo)
            .filter(EndUserInfo.end_user_id == end_user_id)
            .delete()
        )
        self.db.commit()
        logger.info(f"删除用户所有信息记录: end_user_id={end_user_id}, count={deleted}")
        return deleted

View File

@@ -7,6 +7,7 @@ from sqlalchemy.orm import Session
from app.core.logging_config import get_db_logger
from app.models.app_model import App
from app.models.end_user_model import EndUser
from app.models.end_user_info_model import EndUserInfo
from app.models.workspace_model import Workspace
# 获取数据库专用日志器
@@ -70,7 +71,8 @@ class EndUserRepository:
app_id: uuid.UUID,
workspace_id: uuid.UUID,
other_id: str,
original_user_id: Optional[str] = None
original_user_id: Optional[str] = None,
other_name: Optional[str] = None
) -> EndUser:
"""获取或创建终端用户
@@ -79,6 +81,7 @@ class EndUserRepository:
workspace_id: 工作空间ID
other_id: 第三方ID
original_user_id: 原始用户ID (存储到 other_id)
other_name: 用户名称(用于创建 EndUserInfo
"""
try:
# 尝试查找现有用户
@@ -106,10 +109,22 @@ class EndUserRepository:
other_id=other_id
)
self.db.add(end_user)
self.db.flush() # 刷新以获取 end_user.id但不提交事务
# 创建对应的 EndUserInfo 记录
end_user_info = EndUserInfo(
end_user_id=end_user.id,
other_name=other_name or "", # 如果没有提供 other_name使用空字符串
aliases=[],
meta_data={}
)
self.db.add(end_user_info)
# 一起提交
self.db.commit()
self.db.refresh(end_user)
db_logger.info(f"创建新终端用户: (other_id: {other_id}) for workspace {workspace_id}")
db_logger.info(f"创建新终端用户及其信息: (other_id: {other_id}) for workspace {workspace_id}")
return end_user
except Exception as e:

View File

@@ -336,6 +336,48 @@ ORDER BY score DESC
LIMIT $limit
"""
# Entity search that matches either the full-text index (entity name) or any
# entry of the entity's `aliases` list.
#
# Both match strategies are UNION-ed inside a CALL { ... } subquery. Cypher
# requires every UNION branch to end in RETURN with identical column names,
# and post-processing of a UNION result is only possible from outside a
# subquery. A branch that ends in WITH, followed by further clauses after the
# UNION, is rejected by the parser.
# The outer query keeps the best score per entity and attaches the referencing
# statements and chunks.
#
# Alias matches are scored 1.0 (exact), 0.9 (prefix) or 0.8 (substring),
# compared case-insensitively.
# Parameters: $q (search text), $end_user_id (NULL disables the filter), $limit.
# NOTE(review): CALL { ... } subqueries need Neo4j 4.x or later - confirm the
# deployed server version.
SEARCH_ENTITIES_BY_NAME_OR_ALIAS = """
CALL {
    CALL db.index.fulltext.queryNodes("entitiesFulltext", $q) YIELD node AS e, score
    WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
    RETURN e, score
    UNION ALL
    MATCH (e:ExtractedEntity)
    WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
      AND e.aliases IS NOT NULL
      AND ANY(alias IN e.aliases WHERE toLower(alias) CONTAINS toLower($q))
    RETURN e,
           CASE
               WHEN ANY(alias IN e.aliases WHERE toLower(alias) = toLower($q)) THEN 1.0
               WHEN ANY(alias IN e.aliases WHERE toLower(alias) STARTS WITH toLower($q)) THEN 0.9
               ELSE 0.8
           END AS score
}
WITH e, MAX(score) AS score
OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
RETURN e.id AS id,
       e.name AS name,
       e.end_user_id AS end_user_id,
       e.entity_type AS entity_type,
       e.created_at AS created_at,
       e.expired_at AS expired_at,
       e.entity_idx AS entity_idx,
       e.statement_id AS statement_id,
       e.description AS description,
       e.aliases AS aliases,
       e.name_embedding AS name_embedding,
       e.connect_strength AS connect_strength,
       collect(DISTINCT s.id) AS statement_ids,
       collect(DISTINCT c.id) AS chunk_ids,
       COALESCE(e.activation_value, e.importance_score, 0.5) AS activation_value,
       COALESCE(e.importance_score, 0.5) AS importance_score,
       e.last_access_time AS last_access_time,
       COALESCE(e.access_count, 0) AS access_count,
       score
ORDER BY score DESC
LIMIT $limit
"""
SEARCH_CHUNKS_BY_CONTENT = """
CALL db.index.fulltext.queryNodes("chunksFulltext", $q) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)

View File

@@ -13,6 +13,7 @@ from app.repositories.neo4j.cypher_queries import (
SEARCH_COMMUNITIES_BY_KEYWORD,
SEARCH_DIALOGUE_BY_DIALOG_ID,
SEARCH_ENTITIES_BY_NAME,
SEARCH_ENTITIES_BY_NAME_OR_ALIAS,
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD,
SEARCH_STATEMENTS_BY_CREATED_AT,
SEARCH_STATEMENTS_BY_KEYWORD,
@@ -264,7 +265,7 @@ async def search_graph(
if "entities" in include:
tasks.append(connector.execute_query(
SEARCH_ENTITIES_BY_NAME,
SEARCH_ENTITIES_BY_NAME_OR_ALIAS,
q=q,
end_user_id=end_user_id,
limit=limit,

View File

@@ -0,0 +1,35 @@
import uuid
import datetime
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from pydantic import ConfigDict
class EndUserInfoBase(BaseModel):
    """Fields shared by the end-user info request and response models."""

    # Required display name associated with the end user.
    other_name: str = Field(description="关联的用户名称")
    # Optional list of alias strings; omitted means None.
    aliases: Optional[List[str]] = Field(
        default=None,
        description="用户别名列表",
    )
    # Optional free-form extension data; omitted means None.
    meta_data: Optional[Dict[str, Any]] = Field(
        default=None,
        description="用户相关的扩展信息",
    )
class EndUserInfoCreate(EndUserInfoBase):
    """Request body for creating an end-user info record."""

    # ID of the end user the new record belongs to; carried as a string.
    end_user_id: str = Field(
        description="关联的终端用户ID",
    )
class EndUserInfoUpdate(BaseModel):
    """Request body for updating an end-user info record.

    Only ``end_user_id`` is required; every other field defaults to ``None``.
    """

    # ID of the end user whose record is being updated; carried as a string.
    end_user_id: str = Field(description="终端用户ID")
    other_name: Optional[str] = Field(
        default=None,
        description="用户名称",
    )
    aliases: Optional[List[str]] = Field(
        default=None,
        description="用户别名列表",
    )
    meta_data: Optional[Dict[str, Any]] = Field(
        default=None,
        description="用户相关的扩展信息",
    )
class EndUserInfoResponse(EndUserInfoBase):
    """Response model for an end-user info record.

    Adds the record identifiers and timestamps to the shared base fields.
    """

    # Allow the model to be populated from attribute-bearing objects.
    model_config = ConfigDict(from_attributes=True)

    # NOTE(review): the field is named end_user_info_id; if this model is ever
    # validated directly from an object that exposes the key as `id`, the
    # attribute lookup will not match - confirm against callers.
    end_user_info_id: uuid.UUID = Field(
        description="终端用户信息记录ID",
    )
    end_user_id: uuid.UUID = Field(
        description="关联的终端用户ID",
    )
    created_at: datetime.datetime = Field(
        description="创建时间",
    )
    updated_at: datetime.datetime = Field(
        description="更新时间",
    )

View File

@@ -1,6 +1,6 @@
import uuid
import datetime
from typing import Optional
from typing import Optional, List
from pydantic import BaseModel, Field
from pydantic import ConfigDict
@@ -17,40 +17,6 @@ class EndUser(BaseModel):
created_at: datetime.datetime = Field(description="创建时间", default_factory=datetime.datetime.now)
updated_at: datetime.datetime = Field(description="更新时间", default_factory=datetime.datetime.now)
# 用户基本信息字段
position: Optional[str] = Field(description="职位", default=None)
department: Optional[str] = Field(description="部门", default=None)
contact: Optional[str] = Field(description="联系方式", default=None)
phone: Optional[str] = Field(description="电话", default=None)
hire_date: Optional[datetime.datetime] = Field(description="入职日期", default=None)
updatetime_profile: Optional[datetime.datetime] = Field(description="核心档案信息最后更新时间", default=None)
# 用户摘要和洞察更新时间
user_summary_updated_at: Optional[datetime.datetime] = Field(description="用户摘要最后更新时间", default=None)
memory_insight_updated_at: Optional[datetime.datetime] = Field(description="洞察报告最后更新时间", default=None)
class EndUserProfileResponse(BaseModel):
"""终端用户基本信息响应模型"""
model_config = ConfigDict(from_attributes=True)
id: uuid.UUID = Field(description="终端用户ID")
other_name: Optional[str] = Field(description="其他名称", default="")
position: Optional[str] = Field(description="职位", default=None)
department: Optional[str] = Field(description="部门", default=None)
contact: Optional[str] = Field(description="联系方式", default=None)
phone: Optional[str] = Field(description="电话", default=None)
hire_date: Optional[datetime.datetime] = Field(description="入职日期", default=None)
updatetime_profile: Optional[datetime.datetime] = Field(description="核心档案信息最后更新时间", default=None)
class EndUserProfileUpdate(BaseModel):
"""终端用户基本信息更新请求模型"""
end_user_id: str = Field(description="终端用户ID")
other_name: Optional[str] = Field(description="其他名称", default="")
position: Optional[str] = Field(description="职位", default=None)
department: Optional[str] = Field(description="部门", default=None)
contact: Optional[str] = Field(description="联系方式", default=None)
phone: Optional[str] = Field(description="电话", default=None)
hire_date: Optional[int] = Field(description="入职日期(时间戳,毫秒)", default=None)
memory_insight_updated_at: Optional[datetime.datetime] = Field(description="洞察报告最后更新时间", default=None)

View File

@@ -361,83 +361,58 @@ class UserMemoryService:
if hasattr(original_value, 'timestamp'):
data[key] = UserMemoryService._datetime_to_timestamp(original_value)
return data
def update_end_user_profile(
# ======================== 用户别名及信息 ========================
def get_end_user_info(
self,
db: Session,
end_user_id: str,
profile_update: Any
end_user_id: str
) -> Dict[str, Any]:
"""
更新终端用户的基本信息
查询单个终端用户信息记录
Args:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
profile_update: 包含更新字段的 Pydantic 模型
Returns:
{
"success": bool,
"data": dict, # 更新后的用户档案数据
"data": dict,
"error": Optional[str]
}
"""
try:
# 转换为UUID并查询用户
user_uuid = uuid.UUID(end_user_id)
repo = EndUserRepository(db)
end_user = repo.get_by_id(user_uuid)
from app.repositories.end_user_info_repository import EndUserInfoRepository
from app.core.api_key_utils import datetime_to_timestamp
if not end_user:
logger.warning(f"终端用户不存在: end_user_id={end_user_id}")
# 转换为UUID并查询
user_uuid = uuid.UUID(end_user_id)
end_user_info_record = EndUserInfoRepository(db).get_by_end_user_id(user_uuid)
if not end_user_info_record:
logger.warning(f"终端用户信息记录不存在: end_user_id={end_user_id}")
return {
"success": False,
"data": None,
"error": "终端用户不存在"
"error": "终端用户信息记录不存在"
}
# 获取更新数据(排除 end_user_id 字段
update_data = profile_update.model_dump(exclude_unset=True, exclude={'end_user_id'})
# 构建响应数据(转换时间为毫秒时间戳
response_data = {
"end_user_info_id": str(end_user_info_record.id),
"end_user_id": str(end_user_info_record.end_user_id),
"other_name": end_user_info_record.other_name,
"aliases": end_user_info_record.aliases,
"meta_data": end_user_info_record.meta_data,
"created_at": datetime_to_timestamp(end_user_info_record.created_at),
"updated_at": datetime_to_timestamp(end_user_info_record.updated_at)
}
# 特殊处理 hire_date如果提供了时间戳转换为 DateTime
if 'hire_date' in update_data:
hire_date_timestamp = update_data['hire_date']
if hire_date_timestamp is not None:
from app.core.api_key_utils import timestamp_to_datetime
update_data['hire_date'] = timestamp_to_datetime(hire_date_timestamp)
# 如果是 None保持 None允许清空
# 更新字段
for field, value in update_data.items():
setattr(end_user, field, value)
# 更新时间戳
end_user.updated_at = datetime.now()
end_user.updatetime_profile = datetime.now()
# 提交更改
db.commit()
db.refresh(end_user)
# 构建响应数据
from app.schemas.end_user_schema import EndUserProfileResponse
profile_data = EndUserProfileResponse(
id=end_user.id,
other_name=end_user.other_name,
position=end_user.position,
department=end_user.department,
contact=end_user.contact,
phone=end_user.phone,
hire_date=end_user.hire_date,
updatetime_profile=end_user.updatetime_profile
)
logger.info(f"成功更新用户信息: end_user_id={end_user_id}, updated_fields={list(update_data.keys())}")
logger.info(f"成功查询终端用户信息记录: end_user_id={end_user_id}")
return {
"success": True,
"data": self.convert_profile_to_dict_with_timestamp(profile_data),
"data": response_data,
"error": None
}
@@ -446,17 +421,166 @@ class UserMemoryService:
return {
"success": False,
"data": None,
"error": "无效的用户ID格式"
"error": "无效的终端用户ID格式"
}
except Exception as e:
db.rollback()
logger.error(f"用户信息更新失败: end_user_id={end_user_id}, error={str(e)}")
logger.error(f"查询终端用户信息记录失败: end_user_id={end_user_id}, error={str(e)}")
return {
"success": False,
"data": None,
"error": str(e)
}
def update_end_user_info(
    self,
    db: Session,
    end_user_id: str,
    update_data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update the info record of an end user.

    Only the whitelisted fields ``other_name``, ``aliases`` and ``meta_data``
    are applied; any other key in ``update_data`` is ignored. A changed
    ``other_name`` is mirrored onto the matching end-user row, and changed
    ``aliases`` are pushed to Neo4j after the database commit. The Neo4j
    sync is best-effort: a failure is logged and does not fail the update.

    Args:
        db: Database session.
        end_user_id: End user ID (UUID string).
        update_data: Mapping of field name to new value.

    Returns:
        {
            "success": bool,
            "data": dict,   # updated record, timestamps converted by datetime_to_timestamp
            "error": Optional[str]
        }
    """
    # Validate the ID on its own so that only a malformed ID is reported as
    # "invalid ID". A ValueError raised later is handled by the generic
    # handler below, which also rolls the session back.
    try:
        user_uuid = uuid.UUID(end_user_id)
    except ValueError:
        logger.error(f"无效的 end_user_id 格式: {end_user_id}")
        return {
            "success": False,
            "data": None,
            "error": "无效的终端用户ID格式"
        }

    try:
        from app.repositories.end_user_info_repository import EndUserInfoRepository
        from app.repositories.end_user_repository import EndUserRepository
        from app.core.api_key_utils import datetime_to_timestamp

        end_user_info_record = EndUserInfoRepository(db).get_by_end_user_id(user_uuid)

        if not end_user_info_record:
            logger.warning(f"终端用户信息记录不存在: end_user_id={end_user_id}")
            return {
                "success": False,
                "data": None,
                "error": "终端用户信息记录不存在"
            }

        # Whitelist of fields a caller is allowed to change.
        allowed_fields = {'other_name', 'aliases', 'meta_data'}

        # Detect real changes BEFORE mutating the record, so the follow-up
        # syncs only run when the value actually differs.
        aliases_updated = 'aliases' in update_data and update_data['aliases'] != end_user_info_record.aliases
        other_name_updated = 'other_name' in update_data and update_data['other_name'] != end_user_info_record.other_name

        for field, value in update_data.items():
            if field in allowed_fields:
                setattr(end_user_info_record, field, value)

        end_user_info_record.updated_at = datetime.now()

        # Keep the end-user row's other_name in step with the info record.
        if other_name_updated:
            end_user_record = EndUserRepository(db).get_by_id(user_uuid)
            if end_user_record:
                end_user_record.other_name = update_data['other_name']
                end_user_record.updated_at = datetime.now()
                logger.info(f"同步更新 end_user 表的 other_name: end_user_id={end_user_id}, other_name={update_data['other_name']}")
            else:
                logger.warning(f"未找到对应的 end_user 记录: end_user_id={end_user_id}")

        db.commit()
        db.refresh(end_user_info_record)

        # Push changed aliases to Neo4j (best-effort, after the commit).
        if aliases_updated:
            try:
                import asyncio
                import concurrent.futures

                sync_coro = self._sync_aliases_to_neo4j(end_user_id, update_data['aliases'])
                try:
                    asyncio.get_running_loop()
                except RuntimeError:
                    # No event loop is running in this thread: run inline.
                    asyncio.run(sync_coro)
                else:
                    # asyncio.run() refuses to start inside a running loop,
                    # so run the coroutine on a private loop in a worker
                    # thread and wait for it, keeping this method synchronous.
                    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                        pool.submit(asyncio.run, sync_coro).result()
                logger.info(f"已触发 aliases 同步到 Neo4j: end_user_id={end_user_id}, aliases={update_data['aliases']}")
            except Exception as sync_error:
                # Deliberately non-fatal: the relational update is already committed.
                logger.error(f"触发同步 aliases 到 Neo4j 失败: {sync_error}", exc_info=True)

        # Build the response payload from the refreshed record.
        response_data = {
            "end_user_info_id": str(end_user_info_record.id),
            "end_user_id": str(end_user_info_record.end_user_id),
            "other_name": end_user_info_record.other_name,
            "aliases": end_user_info_record.aliases,
            "meta_data": end_user_info_record.meta_data,
            "created_at": datetime_to_timestamp(end_user_info_record.created_at),
            "updated_at": datetime_to_timestamp(end_user_info_record.updated_at)
        }

        logger.info(f"成功更新终端用户信息记录: end_user_id={end_user_id}, updated_fields={list(update_data.keys())}")

        return {
            "success": True,
            "data": response_data,
            "error": None
        }

    except Exception as e:
        db.rollback()
        logger.error(f"更新终端用户信息记录失败: end_user_id={end_user_id}, error={str(e)}")
        return {
            "success": False,
            "data": None,
            "error": str(e)
        }
async def _sync_aliases_to_neo4j(self, end_user_id: str, aliases: List[str]) -> None:
    """
    Overwrite the ``aliases`` property on the user's own entity nodes in Neo4j.

    Targets ``ExtractedEntity`` nodes that belong to ``end_user_id`` and whose
    name is one of the fixed names listed in the query below.

    Args:
        end_user_id: End user ID.
        aliases: Alias list to store on the matched nodes.

    Raises:
        Exception: anything raised while running the query is logged and re-raised.
    """
    from app.repositories.neo4j.neo4j_connector import Neo4jConnector

    # NOTE(review): the name list contains an empty string ''; that may be a
    # placeholder for a lost character - confirm the intended value.
    alias_update_cypher = """
    MATCH (e:ExtractedEntity)
    WHERE e.end_user_id = $end_user_id
    AND e.name IN ['用户', '', 'User', 'I']
    SET e.aliases = $aliases
    RETURN e.id AS entity_id, e.name AS entity_name, e.aliases AS updated_aliases
    """

    # NOTE(review): the connector is created per call and not explicitly
    # closed here - confirm whether Neo4jConnector needs to be released.
    neo4j = Neo4jConnector()
    try:
        updated_rows = await neo4j.execute_query(
            alias_update_cypher,
            end_user_id=end_user_id,
            aliases=aliases,
        )
        if not updated_rows:
            logger.warning(f"未找到需要更新的用户实体节点: end_user_id={end_user_id}")
        else:
            logger.info(f"成功同步 aliases 到 Neo4j: end_user_id={end_user_id}, 更新了 {len(updated_rows)} 个实体节点")
    except Exception as e:
        logger.error(f"同步 aliases 到 Neo4j 失败: {e}", exc_info=True)
        raise
async def get_cached_memory_insight(
self,
db: Session,