Merge remote-tracking branch 'origin/develop' into develop

# Conflicts:
#	api/app/core/memory/agent/langgraph_graph/nodes/problem_nodes.py
This commit is contained in:
lixinyue
2026-01-22 10:20:37 +08:00
11 changed files with 200 additions and 219 deletions

View File

@@ -338,8 +338,9 @@ This project is licensed under the Apache License 2.0. For details, see the LICE
Join our community to ask questions, share your work, and connect with fellow developers. Join our community to ask questions, share your work, and connect with fellow developers.
- **GitHub Issues**: Report bugs, request features, or track known issues via [GitHub Issues](https://github.com/redbear-ai/memorybear/issues). - **GitHub Issues**: Report bugs, request features, or track known issues via [GitHub Issues](https://github.com/SuanmoSuanyangTechnology/MemoryBear/issues).
- **GitHub Pull Requests**: Contribute code improvements or fixes through [Pull Requests](https://github.com/redbear-ai/memorybear/pulls). - **GitHub Pull Requests**: Contribute code improvements or fixes through [Pull Requests](https://github.com/SuanmoSuanyangTechnology/MemoryBear/pulls).
- **GitHub Discussions**: Ask questions, share ideas, and engage with the community in [GitHub Discussions](https://github.com/redbear-ai/memorybear/discussions). - **GitHub Discussions**: Ask questions, share ideas, and engage with the community in [GitHub Discussions](https://github.com/SuanmoSuanyangTechnology/MemoryBear/discussions).
- **WeChat**: Scan the QR code below to join our WeChat community group. - **WeChat**: Scan the QR code below to join our WeChat community group.
- ![wecom-temp-114020-47fe87a75da439f09f5dc93a01593046](https://github.com/user-attachments/assets/8c81885c-4134-40d5-96e2-7f78cc082dc6)
- **Contact**: If you are interested in contributing or collaborating, feel free to reach out at tianyou_hubm@redbearai.com - **Contact**: If you are interested in contributing or collaborating, feel free to reach out at tianyou_hubm@redbearai.com

View File

@@ -9,6 +9,8 @@ from app.db import get_db
from app.dependencies import cur_workspace_access_guard, get_current_user from app.dependencies import cur_workspace_access_guard, get_current_user
from app.models import ModelApiKey from app.models import ModelApiKey
from app.models.user_model import User from app.models.user_model import User
from app.core.memory.agent.utils.session_tools import SessionService
from app.core.memory.agent.utils.redis_tool import store
from app.repositories import knowledge_repository, WorkspaceRepository from app.repositories import knowledge_repository, WorkspaceRepository
from app.schemas.memory_agent_schema import UserInput, Write_UserInput from app.schemas.memory_agent_schema import UserInput, Write_UserInput
from app.schemas.response_schema import ApiResponse from app.schemas.response_schema import ApiResponse
@@ -291,6 +293,19 @@ async def read_server(
storage_type, storage_type,
user_rag_memory_id user_rag_memory_id
) )
if str(user_input.search_switch) == "2":
retrieve_info = result['answer']
history = await SessionService(store).get_history(user_input.group_id, user_input.group_id, user_input.group_id)
query = user_input.message
# 调用 memory_agent_service 的方法生成最终答案
result['answer'] = await memory_agent_service.generate_summary_from_retrieve(
retrieve_info=retrieve_info,
history=history,
query=query,
config_id=config_id,
db=db
)
return success(data=result, msg="回复对话消息成功") return success(data=result, msg="回复对话消息成功")
except BaseException as e: except BaseException as e:
# Handle ExceptionGroup from TaskGroup (Python 3.11+) or BaseExceptionGroup # Handle ExceptionGroup from TaskGroup (Python 3.11+) or BaseExceptionGroup
@@ -682,7 +697,7 @@ async def get_user_profile_api(
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user)
): ):
""" """
获取用户详情,包含: 获取工作空间下Popular Memory Tags,包含:
- name: 用户名字(直接使用 end_user_id - name: 用户名字(直接使用 end_user_id
- tags: 3个用户特征标签从语句和实体中LLM总结 - tags: 3个用户特征标签从语句和实体中LLM总结
- hot_tags: 4个热门记忆标签 - hot_tags: 4个热门记忆标签

View File

@@ -5,7 +5,6 @@ from app.core.response_utils import success
from app.db import get_db from app.db import get_db
from app.dependencies import get_current_user from app.dependencies import get_current_user
from app.models.user_model import User from app.models.user_model import User
from app.schemas.memory_agent_schema import End_User_Information
from app.schemas.response_schema import ApiResponse from app.schemas.response_schema import ApiResponse
from app.services import memory_dashboard_service, memory_storage_service, workspace_service from app.services import memory_dashboard_service, memory_storage_service, workspace_service
@@ -40,54 +39,7 @@ def get_workspace_total_end_users(
api_logger.info(f"成功获取最新用户总数: total_num={total_end_users.get('total_num', 0)}") api_logger.info(f"成功获取最新用户总数: total_num={total_end_users.get('total_num', 0)}")
return success(data=total_end_users, msg="用户数量获取成功") return success(data=total_end_users, msg="用户数量获取成功")
@router.post("/update/end_users", response_model=ApiResponse)
async def update_workspace_end_users(
user_input: End_User_Information,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
更新工作空间的宿主信息
"""
username = user_input.end_user_name # 要更新的用户名
end_user_input_id = user_input.id # 宿主ID
workspace_id = current_user.current_workspace_id
api_logger.info(f"用户 {current_user.username} 请求更新工作空间 {workspace_id} 的宿主信息")
api_logger.info(f"更新参数: username={username}, end_user_id={end_user_input_id}")
try:
# 导入更新函数
from app.repositories.end_user_repository import update_end_user_other_name
import uuid
# 转换 end_user_id 为 UUID 类型
end_user_uuid = uuid.UUID(end_user_input_id)
# 直接更新数据库中的 other_name 字段
updated_count = update_end_user_other_name(
db=db,
end_user_id=end_user_uuid,
other_name=username
)
api_logger.info(f"成功更新宿主 {end_user_input_id} 的 other_name 为: {username}")
return success(
data={
"updated_count": updated_count,
"end_user_id": end_user_input_id,
"updated_other_name": username
},
msg=f"成功更新 {updated_count} 个宿主的信息"
)
except Exception as e:
api_logger.error(f"更新宿主信息失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"更新宿主信息失败: {str(e)}"
)

View File

@@ -28,7 +28,6 @@ from app.services.memory_storage_service import (
search_dialogue, search_dialogue,
search_edges, search_edges,
search_entity, search_entity,
search_entity_graph,
search_statement, search_statement,
) )
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends
@@ -412,21 +411,7 @@ async def search_entity_edges(
api_logger.error(f"Search edges failed: {str(e)}") api_logger.error(f"Search edges failed: {str(e)}")
return fail(BizCode.INTERNAL_ERROR, "边查询失败", str(e)) return fail(BizCode.INTERNAL_ERROR, "边查询失败", str(e))
@router.get("/search/entity_graph", response_model=ApiResponse)
async def search_for_entity_graph(
end_user_id: Optional[str] = None,
current_user: User = Depends(get_current_user),
) -> dict:
"""
搜索所有实体之间的关系网络
"""
api_logger.info(f"Search entity graph requested for end_user_id: {end_user_id}")
try:
result = await search_entity_graph(end_user_id)
return success(data=result, msg="查询成功")
except Exception as e:
api_logger.error(f"Search entity graph failed: {str(e)}")
return fail(BizCode.INTERNAL_ERROR, "实体图查询失败", str(e))
@router.get("/analytics/hot_memory_tags", response_model=ApiResponse) @router.get("/analytics/hot_memory_tags", response_model=ApiResponse)

View File

@@ -351,12 +351,11 @@ async def update_end_user_profile(
该接口可以更新用户的姓名、职位、部门、联系方式、电话和入职日期等信息。 该接口可以更新用户的姓名、职位、部门、联系方式、电话和入职日期等信息。
所有字段都是可选的,只更新提供的字段。 所有字段都是可选的,只更新提供的字段。
""" """
workspace_id = current_user.current_workspace_id workspace_id = current_user.current_workspace_id
end_user_id = profile_update.end_user_id end_user_id = profile_update.end_user_id
# 检查用户是否已选择工作空间 # 验证工作空间
if workspace_id is None: if workspace_id is None:
api_logger.warning(f"用户 {current_user.username} 尝试更新用户信息但未选择工作空间") api_logger.warning(f"用户 {current_user.username} 尝试更新用户信息但未选择工作空间")
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
@@ -366,57 +365,24 @@ async def update_end_user_profile(
f"workspace={workspace_id}" f"workspace={workspace_id}"
) )
try: # 调用 Service 层处理业务逻辑
# 查询终端用户 result = user_memory_service.update_end_user_profile(db, end_user_id, profile_update)
end_user = db.query(EndUser).filter(EndUser.id == end_user_id).first()
if not end_user: if result["success"]:
api_logger.warning(f"终端用户不存在: end_user_id={end_user_id}") api_logger.info(f"成功更新用户信息: end_user_id={end_user_id}")
return fail(BizCode.INVALID_PARAMETER, "终端用户不存在", f"end_user_id={end_user_id}") return success(data=result["data"], msg="更新成功")
else:
error_msg = result["error"]
api_logger.error(f"用户信息更新失败: end_user_id={end_user_id}, error={error_msg}")
# 更新字段(只更新提供的字段,排除 end_user_id # 根据错误类型映射到合适的业务错误码
# 允许 None 值来重置字段(如 hire_date if error_msg == "终端用户不存在":
update_data = profile_update.model_dump(exclude_unset=True, exclude={'end_user_id'}) return fail(BizCode.USER_NOT_FOUND, "终端用户不存在", error_msg)
elif error_msg == "无效的用户ID格式":
# 特殊处理 hire_date如果提供了时间戳转换为 DateTime return fail(BizCode.INVALID_USER_ID, "无效的用户ID格式", error_msg)
if 'hire_date' in update_data: else:
hire_date_timestamp = update_data['hire_date'] # 只有未预期的错误才使用 INTERNAL_ERROR
if hire_date_timestamp is not None: return fail(BizCode.INTERNAL_ERROR, "用户信息更新失败", error_msg)
update_data['hire_date'] = timestamp_to_datetime(hire_date_timestamp)
# 如果是 None保持 None允许清空
for field, value in update_data.items():
setattr(end_user, field, value)
# 更新 updated_at 时间戳
end_user.updated_at = datetime.datetime.now()
# 更新 updatetime_profile 为当前时间
end_user.updatetime_profile = datetime.datetime.now()
# 提交更改
db.commit()
db.refresh(end_user)
# 构建响应数据
profile_data = EndUserProfileResponse(
id=end_user.id,
other_name=end_user.other_name,
position=end_user.position,
department=end_user.department,
contact=end_user.contact,
phone=end_user.phone,
hire_date=end_user.hire_date,
updatetime_profile=end_user.updatetime_profile
)
api_logger.info(f"成功更新用户信息: end_user_id={end_user_id}, updated_fields={list(update_data.keys())}")
return success(data=UserMemoryService.convert_profile_to_dict_with_timestamp(profile_data), msg="更新成功")
except Exception as e:
db.rollback()
api_logger.error(f"用户信息更新失败: end_user_id={end_user_id}, error={str(e)}")
return fail(BizCode.INTERNAL_ERROR, "用户信息更新失败", str(e))
@router.get("/memory_space/timeline_memories", response_model=ApiResponse) @router.get("/memory_space/timeline_memories", response_model=ApiResponse)
async def memory_space_timeline_of_shared_memories(id: str, label: str,language_type: str="zh", async def memory_space_timeline_of_shared_memories(id: str, label: str,language_type: str="zh",

View File

@@ -104,38 +104,6 @@ class DataConfigRepository:
r.statement AS statement r.statement AS statement
""" """
# Entity graph within group (source node, edge, target node)
SEARCH_FOR_ENTITY_GRAPH = """
MATCH (n:ExtractedEntity)-[r]->(m:ExtractedEntity)
WHERE n.group_id = $group_id
RETURN
{
entity_idx: n.entity_idx,
connect_strength: n.connect_strength,
description: n.description,
entity_type: n.entity_type,
name: n.name,
fact_summary: COALESCE(n.fact_summary, ''),
id: n.id
} AS sourceNode,
{
rel_id: elementId(r),
source_id: startNode(r).id,
target_id: endNode(r).id,
predicate: r.predicate,
statement_id: r.statement_id,
statement: r.statement
} AS edge,
{
entity_idx: m.entity_idx,
connect_strength: m.connect_strength,
description: m.description,
entity_type: m.entity_type,
name: m.name,
fact_summary: COALESCE(m.fact_summary, ''),
id: m.id
} AS targetNode
"""
@staticmethod @staticmethod
def update_reflection_config( def update_reflection_config(
db: Session, db: Session,

View File

@@ -276,42 +276,6 @@ def get_end_user_by_id(db: Session, end_user_id: uuid.UUID) -> Optional[EndUser]
end_user = repo.get_end_user_by_id(end_user_id) end_user = repo.get_end_user_by_id(end_user_id)
return end_user return end_user
def update_end_user_other_name(
db: Session,
end_user_id: uuid.UUID,
other_name: str
) -> int:
"""
通过 end_user_id 更新 end_user 表中的 other_name 字段
Args:
db: 数据库会话
end_user_id: 宿主ID
other_name: 要更新的用户名
Returns:
int: 更新的记录数
"""
try:
# 执行更新
updated_count = (
db.query(EndUser)
.filter(EndUser.id == end_user_id)
.update(
{EndUser.other_name: other_name},
synchronize_session=False
)
)
db.commit()
db_logger.info(f"成功更新宿主 {end_user_id} 的 other_name 为: {other_name}")
return updated_count
except Exception as e:
db.rollback()
db_logger.error(f"更新宿主 {end_user_id} 的 other_name 时出错: {str(e)}")
raise
# 新增的缓存操作函数(保持与类方法一致的接口) # 新增的缓存操作函数(保持与类方法一致的接口)
def get_by_id(db: Session, end_user_id: uuid.UUID) -> Optional[EndUser]: def get_by_id(db: Session, end_user_id: uuid.UUID) -> Optional[EndUser]:
"""根据ID获取终端用户用于缓存操作""" """根据ID获取终端用户用于缓存操作"""

View File

@@ -15,7 +15,3 @@ class Write_UserInput(BaseModel):
messages: list[dict] messages: list[dict]
group_id: str group_id: str
config_id: Optional[str] = None config_id: Optional[str] = None
class End_User_Information(BaseModel):
end_user_name: str # 这是要更新的用户名
id: str # 宿主ID用于匹配条件

View File

@@ -683,7 +683,67 @@ class MemoryAgentService:
logger.debug(f"Message type: {status}") logger.debug(f"Message type: {status}")
return status return status
# ==================== 新增的三个接口方法 ==================== async def generate_summary_from_retrieve(
self,
retrieve_info: str,
history: List[Dict],
query: str,
config_id: str,
db: Session
) -> str:
"""
基于检索信息、历史对话和查询生成最终答案
使用 Retrieve_Summary_prompt.jinja2 模板调用大模型生成答案
Args:
retrieve_info: 检索到的信息
history: 历史对话记录
query: 用户查询
config_id: 配置ID
db: 数据库会话
Returns:
生成的答案文本
"""
logger.info(f"Generating summary from retrieve info for query: {query[:50]}...")
try:
# 加载配置
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
config_id=config_id,
service_name="MemoryAgentService"
)
# 导入必要的模块
from app.core.memory.agent.langgraph_graph.nodes.summary_nodes import summary_llm
from app.core.memory.agent.models.summary_models import RetrieveSummaryResponse
# 构建状态对象
state = {
"data": query,
"memory_config": memory_config
}
# 直接调用 summary_llm 函数
answer = await summary_llm(
state=state,
history=history,
retrieve_info=retrieve_info,
template_name='Retrieve_Summary_prompt.jinja2',
operation_name='retrieve_summary',
response_model=RetrieveSummaryResponse,
search_mode="1"
)
logger.info(f"Successfully generated summary: {answer[:100] if answer else 'None'}...")
return answer if answer else "信息不足,无法回答。"
except Exception as e:
logger.error(f"生成摘要失败: {str(e)}", exc_info=True)
return "信息不足,无法回答。"
async def get_knowledge_type_stats( async def get_knowledge_type_stats(
self, self,
@@ -1157,7 +1217,7 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
""" """
from app.models.app_release_model import AppRelease from app.models.app_release_model import AppRelease
from app.models.end_user_model import EndUser from app.models.end_user_model import EndUser
from app.models.memory_config_model import MemoryConfig from app.models.data_config_model import DataConfig
from sqlalchemy import select from sqlalchemy import select
logger.info(f"Batch getting connected configs for {len(end_user_ids)} end_users") logger.info(f"Batch getting connected configs for {len(end_user_ids)} end_users")
@@ -1215,8 +1275,8 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
# 批量查询 memory_config_name # 批量查询 memory_config_name
config_id_to_name = {} config_id_to_name = {}
if memory_config_ids: if memory_config_ids:
memory_configs = db.query(MemoryConfig).filter(MemoryConfig.id.in_(memory_config_ids)).all() memory_configs = db.query(DataConfig).filter(DataConfig.config_id.in_(memory_config_ids)).all()
config_id_to_name = {str(mc.id): mc.config_name for mc in memory_configs} config_id_to_name = {str(mc.config_id): mc.config_name for mc in memory_configs}
# 4. 构建最终结果 # 4. 构建最终结果
for end_user_id, app_id in user_to_app.items(): for end_user_id, app_id in user_to_app.items():
@@ -1233,7 +1293,7 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
memory_config_id = memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None memory_config_id = memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None
# 获取配置名称 # 获取配置名称
memory_config_name = config_id_to_name.get(memory_config_id) if memory_config_id else None memory_config_name = config_id_to_name.get(str(memory_config_id)) if memory_config_id else None
result[end_user_id] = { result[end_user_id] = {
"memory_config_id": memory_config_id, "memory_config_id": memory_config_id,

View File

@@ -506,27 +506,6 @@ async def search_edges(end_user_id: Optional[str] = None) -> List[Dict[str, Any]
return result return result
async def search_entity_graph(end_user_id: Optional[str] = None) -> Dict[str, Any]:
"""搜索所有实体之间的关系网络group 维度)。"""
result = await _neo4j_connector.execute_query(
DataConfigRepository.SEARCH_FOR_ENTITY_GRAPH,
group_id=end_user_id,
)
# 对source_node 和 target_node 的 fact_summary进行截取只截取前三条的内容需要提取前三条“来源”
for item in result:
source_fact = item["sourceNode"]["fact_summary"]
target_fact = item["targetNode"]["fact_summary"]
# 截取前三条“来源”
item["sourceNode"]["fact_summary"] = source_fact.split("\n")[:4] if source_fact else []
item["targetNode"]["fact_summary"] = target_fact.split("\n")[:4] if target_fact else []
# 与现有返回风格保持一致,携带搜索类型、数量与详情
data = {
"search_for": "entity_graph",
"num": len(result),
"detials": result,
}
return data
async def analytics_hot_memory_tags( async def analytics_hot_memory_tags(
db: Session, db: Session,

View File

@@ -357,6 +357,101 @@ class UserMemoryService:
data[key] = UserMemoryService._datetime_to_timestamp(original_value) data[key] = UserMemoryService._datetime_to_timestamp(original_value)
return data return data
def update_end_user_profile(
self,
db: Session,
end_user_id: str,
profile_update: Any
) -> Dict[str, Any]:
"""
更新终端用户的基本信息
Args:
db: 数据库会话
end_user_id: 终端用户ID (UUID)
profile_update: 包含更新字段的 Pydantic 模型
Returns:
{
"success": bool,
"data": dict, # 更新后的用户档案数据
"error": Optional[str]
}
"""
try:
# 转换为UUID并查询用户
user_uuid = uuid.UUID(end_user_id)
repo = EndUserRepository(db)
end_user = repo.get_by_id(user_uuid)
if not end_user:
logger.warning(f"终端用户不存在: end_user_id={end_user_id}")
return {
"success": False,
"data": None,
"error": "终端用户不存在"
}
# 获取更新数据(排除 end_user_id 字段)
update_data = profile_update.model_dump(exclude_unset=True, exclude={'end_user_id'})
# 特殊处理 hire_date如果提供了时间戳转换为 DateTime
if 'hire_date' in update_data:
hire_date_timestamp = update_data['hire_date']
if hire_date_timestamp is not None:
from app.core.api_key_utils import timestamp_to_datetime
update_data['hire_date'] = timestamp_to_datetime(hire_date_timestamp)
# 如果是 None保持 None允许清空
# 更新字段
for field, value in update_data.items():
setattr(end_user, field, value)
# 更新时间戳
end_user.updated_at = datetime.now()
end_user.updatetime_profile = datetime.now()
# 提交更改
db.commit()
db.refresh(end_user)
# 构建响应数据
from app.schemas.end_user_schema import EndUserProfileResponse
profile_data = EndUserProfileResponse(
id=end_user.id,
other_name=end_user.other_name,
position=end_user.position,
department=end_user.department,
contact=end_user.contact,
phone=end_user.phone,
hire_date=end_user.hire_date,
updatetime_profile=end_user.updatetime_profile
)
logger.info(f"成功更新用户信息: end_user_id={end_user_id}, updated_fields={list(update_data.keys())}")
return {
"success": True,
"data": self.convert_profile_to_dict_with_timestamp(profile_data),
"error": None
}
except ValueError:
logger.error(f"无效的 end_user_id 格式: {end_user_id}")
return {
"success": False,
"data": None,
"error": "无效的用户ID格式"
}
except Exception as e:
db.rollback()
logger.error(f"用户信息更新失败: end_user_id={end_user_id}, error={str(e)}")
return {
"success": False,
"data": None,
"error": str(e)
}
async def get_cached_memory_insight( async def get_cached_memory_insight(
self, self,
db: Session, db: Session,