From d30b9224abd6fa6ec0d9dd4de77a0d882df19dc6 Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Wed, 29 Apr 2026 11:14:21 +0800 Subject: [PATCH 1/7] [add] migration script --- .../versions/c87c9cdb52c4_202604281114.py | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 api/migrations/versions/c87c9cdb52c4_202604281114.py diff --git a/api/migrations/versions/c87c9cdb52c4_202604281114.py b/api/migrations/versions/c87c9cdb52c4_202604281114.py new file mode 100644 index 00000000..78d4c461 --- /dev/null +++ b/api/migrations/versions/c87c9cdb52c4_202604281114.py @@ -0,0 +1,140 @@ +"""202604281114 + +Revision ID: c87c9cdb52c4 +Revises: 4e89970f9e7c +Create Date: 2026-04-28 11:13:02.441905 + +""" +from typing import Dict, List, Sequence, Union + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision: str = 'c87c9cdb52c4' +down_revision: Union[str, None] = '4e89970f9e7c' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +BATCH_SIZE = 500 + + +def _chunked(values: List[str], size: int) -> List[List[str]]: + return [values[index:index + size] for index in range(0, len(values), size)] + + +def _load_neo4j_end_user_ids(connection) -> List[str]: + """加载所有需要从 Neo4j 同步 memory_count 的宿主。 + + RAG 工作空间的记忆数量以 documents.chunk_num 为准,不写入 end_users.memory_count。 + """ + rows = connection.execute(sa.text(""" + SELECT eu.id::text AS end_user_id + FROM end_users eu + JOIN workspaces w ON eu.workspace_id = w.id + WHERE w.storage_type IS NULL OR w.storage_type <> 'rag' + """)).all() + return [row[0] for row in rows] + + +async def _fetch_neo4j_counts(end_user_ids: List[str]) -> Dict[str, int]: + if not end_user_ids: + return {} + + from app.repositories.memory_config_repository import MemoryConfigRepository + from app.repositories.neo4j.neo4j_connector import Neo4jConnector + + connector = Neo4jConnector() + try: + result = await connector.execute_query( + MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, + end_user_ids=end_user_ids, + ) + finally: + await connector.close() + + counts = {str(row["user_id"]): int(row["total"]) for row in result} + for end_user_id in end_user_ids: + counts.setdefault(end_user_id, 0) + return counts + + +def _update_memory_counts(connection, counts: Dict[str, int]) -> int: + updated = 0 + for end_user_id, memory_count in counts.items(): + result = connection.execute( + sa.text(""" + UPDATE end_users + SET memory_count = :memory_count + WHERE id = CAST(:end_user_id AS uuid) + """), + { + "end_user_id": end_user_id, + "memory_count": memory_count, + }, + ) + updated += result.rowcount or 0 + return updated + + +def _sync_memory_count_from_neo4j() -> None: + """迁移时初始化 Neo4j 模式宿主的 memory_count。 + + """ + import asyncio + + print("[memory_count] 开始同步 Neo4j 模式宿主 memory_count") + connection = op.get_bind() + target_ids = _load_neo4j_end_user_ids(connection) + if not target_ids: + print("[memory_count] 没有需要同步的 Neo4j 模式宿主") + return + + print( + f"[memory_count] 待同步宿主数量: {len(target_ids)}, " + f"batch_size={BATCH_SIZE}" + ) + + total_updated = 0 + batches = _chunked(target_ids, BATCH_SIZE) + for batch_index, batch_ids in enumerate(batches, start=1): + print( + f"[memory_count] 正在查询 Neo4j: " + f"batch={batch_index}/{len(batches)}, size={len(batch_ids)}" + ) + counts = asyncio.run(_fetch_neo4j_counts(batch_ids)) + total_updated += _update_memory_counts(connection, counts) + print( + f"[memory_count] 已写入 PostgreSQL: " + f"updated={total_updated}/{len(target_ids)}" + ) + + print( + f"[memory_count] Neo4j 模式宿主同步完成: " + f"total={len(target_ids)}, updated={total_updated}" + ) + + +def upgrade() -> None: + op.add_column( + 'end_users', + sa.Column( + 'memory_count', + sa.Integer(), + server_default='0', + nullable=False, + comment='记忆节点总数', + ), + ) + _sync_memory_count_from_neo4j() + op.create_index( + op.f('ix_end_users_memory_count'), + 'end_users', + ['memory_count'], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index(op.f('ix_end_users_memory_count'), table_name='end_users') + op.drop_column('end_users', 'memory_count') From a7d3930f4d33aee3004c623b4d167d73f89b712a Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Wed, 29 Apr 2026 14:21:14 +0800 Subject: [PATCH 2/7] feat(memory): add end user memory count filtering - Sync memory_count after Neo4j write and forgetting cycle - Filter Neo4j end user list by memory_count > 0 - Filter RAG end user list by Memory knowledge chunk count --- .../memory_dashboard_controller.py | 124 ++++++++---------- .../core/memory/agent/utils/write_tools.py | 49 +++++++ .../forgetting_engine/forgetting_scheduler.py | 51 ++++++- api/app/models/end_user_model.py | 11 +- api/app/schemas/end_user_schema.py | 4 +- api/app/services/memory_dashboard_service.py | 104 ++++++++++++++- 6 files changed, 270 insertions(+), 73 deletions(-) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 525fe1eb..56276339 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -1,4 +1,4 @@ -import asyncio + import uuid from fastapi import APIRouter, Depends, HTTPException, status, Query from pydantic import BaseModel, Field @@ -10,7 +10,7 @@ from app.dependencies import get_current_user from app.models.user_model import User from app.schemas.response_schema import ApiResponse -from app.services import memory_dashboard_service, memory_storage_service, workspace_service +from app.services import memory_dashboard_service, workspace_service from app.services.memory_agent_service import get_end_users_connected_configs_batch from app.services.app_statistics_service import AppStatisticsService from app.core.logging_config import get_api_logger @@ -48,7 +48,7 @@ def get_workspace_total_end_users( @router.get("/end_users", response_model=ApiResponse) -async def get_workspace_end_users( +def get_workspace_end_users( workspace_id: Optional[uuid.UUID] = Query(None, description="工作空间ID(可选,默认当前用户工作空间)"), keyword: Optional[str] = Query(None, description="搜索关键词(同时模糊匹配 other_name 和 id)"), page: int = Query(1, ge=1, description="页码,从1开始"), @@ -58,6 +58,15 @@ async def get_workspace_end_users( ): """ 获取工作空间的宿主列表(分页查询,支持模糊搜索) + + 新增:记忆数量过滤: + Neo4j 模式: + - 使用 end_users.memory_count 过滤 memory_count > 0 的宿主 + - memory_num.total 直接取 end_user.memory_count + + RAG 模式: + - 使用 documents.chunk_num 聚合过滤 chunk 总数 > 0 的宿主 + - memory_num.total 取聚合后的 chunk 总数 返回工作空间下的宿主列表,支持分页查询和模糊搜索。 通过 keyword 参数同时模糊匹配 other_name 和 id 字段。 @@ -80,17 +89,29 @@ async def get_workspace_end_users( current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表, 类型: {current_workspace_type}") - # 获取分页的 end_users - end_users_result = memory_dashboard_service.get_workspace_end_users_paginated( - db=db, - workspace_id=workspace_id, - current_user=current_user, - page=page, - pagesize=pagesize, - keyword=keyword - ) + if current_workspace_type == "rag": + end_users_result = memory_dashboard_service.get_workspace_end_users_paginated_rag( + db=db, + workspace_id=workspace_id, + current_user=current_user, + page=page, + pagesize=pagesize, + keyword=keyword, + ) + raw_items = end_users_result.get("items", []) + end_users = [item["end_user"] for item in raw_items] + else: + end_users_result = memory_dashboard_service.get_workspace_end_users_paginated( + db=db, + workspace_id=workspace_id, + current_user=current_user, + page=page, + pagesize=pagesize, + keyword=keyword, + ) + raw_items = end_users_result.get("items", []) + end_users = raw_items - end_users = end_users_result.get("items", []) total = end_users_result.get("total", 0) if not end_users: @@ -101,50 +122,19 @@ async def get_workspace_end_users( "page": page, "pagesize": pagesize, "total": total, - "hasnext": (page * pagesize) < total - } + "hasnext": (page * pagesize) < total, + }, }, msg="宿主列表获取成功") end_user_ids = [str(user.id) for user in end_users] - # 并发执行两个独立的查询任务 - async def get_memory_configs(): - """获取记忆配置(在线程池中执行同步查询)""" - try: - return await asyncio.to_thread( - get_end_users_connected_configs_batch, - end_user_ids, db - ) - except Exception as e: - api_logger.error(f"批量获取记忆配置失败: {str(e)}") - return {} + try: + memory_configs_map = get_end_users_connected_configs_batch(end_user_ids, db) + except Exception as e: + api_logger.error(f"批量获取记忆配置失败: {str(e)}") + memory_configs_map = {} - async def get_memory_nums(): - """获取记忆数量""" - if current_workspace_type == "rag": - # RAG 模式:批量查询 - try: - chunk_map = await asyncio.to_thread( - memory_dashboard_service.get_users_total_chunk_batch, - end_user_ids, db, current_user - ) - return {uid: {"total": count} for uid, count in chunk_map.items()} - except Exception as e: - api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}") - return {uid: {"total": 0} for uid in end_user_ids} - - elif current_workspace_type == "neo4j": - # Neo4j 模式:批量查询(简化版本,只返回total) - try: - batch_result = await memory_storage_service.search_all_batch(end_user_ids) - return {uid: {"total": count} for uid, count in batch_result.items()} - except Exception as e: - api_logger.error(f"批量获取 Neo4j 记忆数量失败: {str(e)}") - return {uid: {"total": 0} for uid in end_user_ids} - - return {uid: {"total": 0} for uid in end_user_ids} - - # 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据 + # 触发按需初始化:为 implicit_emotions_storage / interest_distribution 中没有记录的用户异步生成数据 try: from app.celery_app import celery_app as _celery_app _celery_app.send_task( @@ -159,27 +149,26 @@ async def get_workspace_end_users( except Exception as e: api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}") - # 并发执行配置查询和记忆数量查询 - memory_configs_map, memory_nums_map = await asyncio.gather( - get_memory_configs(), - get_memory_nums() - ) - - # 构建结果列表 items = [] - for end_user in end_users: + for index, end_user in enumerate(end_users): user_id = str(end_user.id) config_info = memory_configs_map.get(user_id, {}) + + if current_workspace_type == "rag": + memory_total = int(raw_items[index].get("memory_count", 0) or 0) + else: + memory_total = int(getattr(end_user, "memory_count", 0) or 0) + items.append({ - 'end_user': { - 'id': user_id, - 'other_name': end_user.other_name + "end_user": { + "id": user_id, + "other_name": end_user.other_name, }, - 'memory_num': memory_nums_map.get(user_id, {"total": 0}), - 'memory_config': { + "memory_num": {"total": memory_total}, + "memory_config": { "memory_config_id": config_info.get("memory_config_id"), - "memory_config_name": config_info.get("memory_config_name") - } + "memory_config_name": config_info.get("memory_config_name"), + }, }) # 触发社区聚类补全任务(异步,不阻塞接口响应) @@ -407,6 +396,7 @@ def get_current_user_rag_total_num( total_chunk = memory_dashboard_service.get_current_user_total_chunk(end_user_id, db, current_user) return success(data=total_chunk, msg="宿主RAG知识数据获取成功") + @router.get("/rag_content", response_model=ApiResponse) def get_rag_content( end_user_id: str = Query(..., description="宿主ID"), diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 3b0ea1ee..9b0be9c8 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -313,6 +313,9 @@ async def write( except Exception as cache_err: logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True) + #同步neo4j记忆节点总数到pgsql,end_user表的memory_count字段 + await _sync_memory_count_after_write(end_user_id) + # Close LLM/Embedder underlying httpx clients to prevent # 'RuntimeError: Event loop is closed' during garbage collection for client_obj in (llm_client, embedder_client): @@ -331,3 +334,49 @@ async def write( logger.info("=== Pipeline Complete ===") logger.info(f"Total execution time: {total_time:.2f} seconds") + + +async def _sync_memory_count_after_write(end_user_id: str) -> None: + """ + 记忆写入完成后,查 Neo4j 全量节点数,绝对值同步到 PostgreSQL end_user 表的 memory_count 字段 + + 不使用增量累加: + - Neo4j 写入使用 MERGE 语义,节点列表长度不等于新增节点数。 + - 重试或重复写入可能匹配已有节点。 + - 绝对值覆盖可以避免越加越大的计数漂移。 + """ + if not end_user_id: + return + + try: + from app.models.end_user_model import EndUser + from app.repositories.memory_config_repository import MemoryConfigRepository + + connector = Neo4jConnector() + try: + result = await connector.execute_query( + MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, + end_user_ids=[end_user_id], + ) + node_count = int(result[0]["total"]) if result else 0 + finally: + await connector.close() + + with get_db_context() as db: + db.query(EndUser).filter( + EndUser.id == uuid.UUID(end_user_id) + ).update( + {"memory_count": node_count}, + synchronize_session=False, + ) + db.commit() + + logger.info( + f"[MemoryCount] 写入后同步 memory_count: " + f"end_user_id={end_user_id}, count={node_count}" + ) + except Exception as e: + logger.warning( + f"[MemoryCount] 写入后同步 memory_count 失败(不影响主流程): {e}", + exc_info=True, + ) \ No newline at end of file diff --git a/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py b/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py index 072d587c..acd436c7 100644 --- a/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py +++ b/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py @@ -145,7 +145,8 @@ class ForgettingScheduler: } logger.info("没有可遗忘的节点对,遗忘周期结束") - + # 同步 Neo4j 记忆节点总数到 PostgreSQL的 end_user 表的 memory_count 字段 + await self._sync_memory_count_to_mysql(end_user_id) return report # 步骤3:按激活值排序(激活值最低的优先) @@ -302,7 +303,8 @@ class ForgettingScheduler: f"({reduction_rate:.2%}), " f"耗时 {duration:.2f} 秒" ) - + # 同步 Neo4j 记忆节点总数到 PostgreSQL的 end_user 表的 memory_count 字段 + await self._sync_memory_count_to_mysql(end_user_id) return report except Exception as e: @@ -350,3 +352,48 @@ class ForgettingScheduler: if results: return results[0]['total'] return 0 + + async def _sync_memory_count_to_mysql( + self, + end_user_id: Optional[str] = None, + ) -> None: + """ + 遗忘周期结束后,用 SEARCH_FOR_ALL_BATCH 口径查全量节点数, + 同步到 PostgreSQL end_users.memory_count。 + + 不复用 _count_knowledge_nodes: + - _count_knowledge_nodes 只统计 Statement、ExtractedEntity、MemorySummary。 + - 宿主列表需要统计该 end_user_id 下全部 Neo4j 节点。 + """ + if not end_user_id: + return + + try: + from app.db import get_db_context + from app.models.end_user_model import EndUser + from app.repositories.memory_config_repository import MemoryConfigRepository + + result = await self.connector.execute_query( + MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, + end_user_ids=[end_user_id], + ) + node_count = int(result[0]["total"]) if result else 0 + + with get_db_context() as db: + db.query(EndUser).filter( + EndUser.id == UUID(end_user_id) + ).update( + {"memory_count": node_count}, + synchronize_session=False, + ) + db.commit() + + logger.info( + f"[MemoryCount] 遗忘后同步 memory_count: " + f"end_user_id={end_user_id}, count={node_count}" + ) + except Exception as e: + logger.warning( + f"[MemoryCount] 遗忘后同步 memory_count 失败(不影响主流程): {e}", + exc_info=True, + ) diff --git a/api/app/models/end_user_model.py b/api/app/models/end_user_model.py index ff46786a..952d58eb 100644 --- a/api/app/models/end_user_model.py +++ b/api/app/models/end_user_model.py @@ -1,7 +1,7 @@ import datetime import uuid -from sqlalchemy import Column, DateTime, ForeignKey, String, Text +from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import relationship @@ -38,6 +38,15 @@ class EndUser(Base): comment="关联的记忆配置ID" ) + memory_count = Column( + Integer, + nullable=False, + default=0, + server_default="0", + index=True, + comment="记忆节点总数", + ) + # 用户摘要四个维度 - User Summary Four Dimensions user_summary = Column(Text, nullable=True, comment="缓存的用户摘要(基本介绍)") personality_traits = Column(Text, nullable=True, comment="性格特点") diff --git a/api/app/schemas/end_user_schema.py b/api/app/schemas/end_user_schema.py index c2498203..94d2e5dd 100644 --- a/api/app/schemas/end_user_schema.py +++ b/api/app/schemas/end_user_schema.py @@ -19,4 +19,6 @@ class EndUser(BaseModel): # 用户摘要和洞察更新时间 user_summary_updated_at: Optional[datetime.datetime] = Field(description="用户摘要最后更新时间", default=None) - memory_insight_updated_at: Optional[datetime.datetime] = Field(description="洞察报告最后更新时间", default=None) \ No newline at end of file + memory_insight_updated_at: Optional[datetime.datetime] = Field(description="洞察报告最后更新时间", default=None) + #用户记忆节点总数(Neo4j模式) + memory_count: int = Field(description="记忆节点总数", default=0) \ No newline at end of file diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index aaf9ac6d..6ce793a1 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -1,5 +1,5 @@ from sqlalchemy.orm import Session -from sqlalchemy import desc, nullslast, or_, and_, cast, String +from sqlalchemy import desc, nullslast, or_, and_, cast, String, func from typing import List, Optional, Dict, Any import uuid from fastapi import HTTPException @@ -102,6 +102,7 @@ def get_workspace_end_users_paginated( """获取工作空间的宿主列表(分页版本,支持模糊搜索) 返回结果按 created_at 从新到旧排序(NULL 值排在最后) + 固定过滤 memory_count > 0 的宿主,保证分页基于“有记忆宿主”集合计算。 支持通过 keyword 参数同时模糊搜索 other_name 和 id 字段 Args: @@ -120,7 +121,8 @@ def get_workspace_end_users_paginated( try: # 构建基础查询 base_query = db.query(EndUserModel).filter( - EndUserModel.workspace_id == workspace_id + EndUserModel.workspace_id == workspace_id, + EndUserModel.memory_count > 0 , # 只查询有记忆的宿主 ) # 构建搜索条件(过滤空字符串和None) @@ -169,6 +171,104 @@ def get_workspace_end_users_paginated( business_logger.error(f"获取工作空间宿主列表(分页)失败: workspace_id={workspace_id} - {str(e)}") raise +def get_workspace_end_users_paginated_rag( + db: Session, + workspace_id: uuid.UUID, + current_user: User, + page: int, + pagesize: int, + keyword: Optional[str] = None, +) -> Dict[str, Any]: + """RAG 模式宿主列表分页。 + + RAG 记忆数量以 documents.chunk_num 为准: + - file_name = end_user_id + ".txt" + - 只统计当前 workspace 下 permission_id="Memory" 的用户记忆知识库 + - 在 SQL 层过滤 chunk 总数为 0 的宿主,保证分页准确 + """ + business_logger.info( + f"获取 RAG 宿主列表(分页): workspace_id={workspace_id}, " + f"keyword={keyword}, page={page}, pagesize={pagesize}, 操作者: {current_user.username}" + ) + + try: + from app.models.document_model import Document + from app.models.knowledge_model import Knowledge + + chunk_subquery = ( + db.query( + Document.file_name.label("file_name"), + func.coalesce(func.sum(Document.chunk_num), 0).label("memory_count"), + ) + .join(Knowledge, Document.kb_id == Knowledge.id) + .filter( + Knowledge.workspace_id == workspace_id, + Knowledge.status == 1, + Knowledge.permission_id == "Memory", + Document.status == 1, + ) + .group_by(Document.file_name) + .subquery() + ) + + base_query = ( + db.query( + EndUserModel, + chunk_subquery.c.memory_count.label("memory_count"), + ) + .join( + chunk_subquery, + chunk_subquery.c.file_name == func.concat(cast(EndUserModel.id, String), ".txt"), + ) + .filter( + EndUserModel.workspace_id == workspace_id, + chunk_subquery.c.memory_count > 0, + ) + ) + + keyword = keyword.strip() if keyword else None + if keyword: + keyword_pattern = f"%{keyword}%" + base_query = base_query.filter( + or_( + EndUserModel.other_name.ilike(keyword_pattern), + and_( + or_( + EndUserModel.other_name.is_(None), + EndUserModel.other_name == "", + ), + cast(EndUserModel.id, String).ilike(keyword_pattern), + ), + ) + ) + + total = base_query.count() + if total == 0: + business_logger.info("RAG 模式下没有符合条件的宿主") + return {"items": [], "total": 0} + + rows = base_query.order_by( + nullslast(desc(EndUserModel.created_at)), + desc(EndUserModel.id), + ).offset((page - 1) * pagesize).limit(pagesize).all() + + items = [] + for end_user_orm, memory_count in rows: + items.append({ + "end_user": EndUserSchema.model_validate(end_user_orm), + "memory_count": int(memory_count or 0), + }) + + business_logger.info(f"成功获取 RAG 宿主记录 {len(items)} 条,总计 {total} 条") + return {"items": items, "total": total} + + except HTTPException: + raise + except Exception as e: + business_logger.error( + f"获取 RAG 宿主列表(分页)失败: workspace_id={workspace_id} - {str(e)}" + ) + raise def get_workspace_memory_increment( db: Session, From c57490a063632d7f355d764897cb5d683ccf0f73 Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Wed, 29 Apr 2026 16:35:46 +0800 Subject: [PATCH 3/7] fix(migration): move memory count revision to latest head --- api/migrations/versions/c87c9cdb52c4_202604281114.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/migrations/versions/c87c9cdb52c4_202604281114.py b/api/migrations/versions/c87c9cdb52c4_202604281114.py index 78d4c461..5e529d97 100644 --- a/api/migrations/versions/c87c9cdb52c4_202604281114.py +++ b/api/migrations/versions/c87c9cdb52c4_202604281114.py @@ -1,7 +1,7 @@ """202604281114 Revision ID: c87c9cdb52c4 -Revises: 4e89970f9e7c +Revises: e2d60c6d1a1a Create Date: 2026-04-28 11:13:02.441905 """ @@ -12,7 +12,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. revision: str = 'c87c9cdb52c4' -down_revision: Union[str, None] = '4e89970f9e7c' +down_revision: Union[str, None] = 'e2d60c6d1a1a' branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None From 89bdb9f4b54dfec9f59fcf934f2e8b88a730b8a8 Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Wed, 29 Apr 2026 16:38:11 +0800 Subject: [PATCH 4/7] fix(memory): allow end user id keyword search - Match keyword against end_user_id even when other_name exists - Keep Neo4j and RAG end user list search behavior consistent --- .../forgetting_engine/forgetting_scheduler.py | 2 +- api/app/services/memory_dashboard_service.py | 19 +++---------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py b/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py index acd436c7..8432c4dd 100644 --- a/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py +++ b/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py @@ -353,7 +353,7 @@ class ForgettingScheduler: return results[0]['total'] return 0 - async def _sync_memory_count_to_mysql( + async def _sync_memory_count_to_db( self, end_user_id: Optional[str] = None, ) -> None: diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 6ce793a1..64874caf 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -130,20 +130,13 @@ def get_workspace_end_users_paginated( if keyword: keyword_pattern = f"%{keyword}%" - # other_name 匹配始终生效;id 匹配仅对 other_name 为空的记录生效 base_query = base_query.filter( or_( EndUserModel.other_name.ilike(keyword_pattern), - and_( - or_( - EndUserModel.other_name.is_(None), - EndUserModel.other_name == "", - ), - cast(EndUserModel.id, String).ilike(keyword_pattern), - ), + cast(EndUserModel.id, String).ilike(keyword_pattern), ) ) - business_logger.info(f"应用模糊搜索: keyword={keyword}(匹配 other_name;other_name 为空时匹配 id)") + business_logger.info(f"应用模糊搜索: keyword={keyword}(匹配 other_name 或 id)") # 获取总记录数 total = base_query.count() @@ -232,13 +225,7 @@ def get_workspace_end_users_paginated_rag( base_query = base_query.filter( or_( EndUserModel.other_name.ilike(keyword_pattern), - and_( - or_( - EndUserModel.other_name.is_(None), - EndUserModel.other_name == "", - ), - cast(EndUserModel.id, String).ilike(keyword_pattern), - ), + cast(EndUserModel.id, String).ilike(keyword_pattern), ) ) From 1d73c9e5a825f6d51d9ea001dd9ad28236934b62 Mon Sep 17 00:00:00 2001 From: xrzs <92666317+lm041520@users.noreply.github.com> Date: Wed, 29 Apr 2026 17:46:48 +0800 Subject: [PATCH 5/7] chore(migration): remove memory count revision --- .../versions/c87c9cdb52c4_202604281114.py | 140 ------------------ 1 file changed, 140 deletions(-) delete mode 100644 api/migrations/versions/c87c9cdb52c4_202604281114.py diff --git a/api/migrations/versions/c87c9cdb52c4_202604281114.py b/api/migrations/versions/c87c9cdb52c4_202604281114.py deleted file mode 100644 index 5e529d97..00000000 --- a/api/migrations/versions/c87c9cdb52c4_202604281114.py +++ /dev/null @@ -1,140 +0,0 @@ -"""202604281114 - -Revision ID: c87c9cdb52c4 -Revises: e2d60c6d1a1a -Create Date: 2026-04-28 11:13:02.441905 - -""" -from typing import Dict, List, Sequence, Union - -from alembic import op -import sqlalchemy as sa - -# revision identifiers, used by Alembic. -revision: str = 'c87c9cdb52c4' -down_revision: Union[str, None] = 'e2d60c6d1a1a' -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - -BATCH_SIZE = 500 - - -def _chunked(values: List[str], size: int) -> List[List[str]]: - return [values[index:index + size] for index in range(0, len(values), size)] - - -def _load_neo4j_end_user_ids(connection) -> List[str]: - """加载所有需要从 Neo4j 同步 memory_count 的宿主。 - - RAG 工作空间的记忆数量以 documents.chunk_num 为准,不写入 end_users.memory_count。 - """ - rows = connection.execute(sa.text(""" - SELECT eu.id::text AS end_user_id - FROM end_users eu - JOIN workspaces w ON eu.workspace_id = w.id - WHERE w.storage_type IS NULL OR w.storage_type <> 'rag' - """)).all() - return [row[0] for row in rows] - - -async def _fetch_neo4j_counts(end_user_ids: List[str]) -> Dict[str, int]: - if not end_user_ids: - return {} - - from app.repositories.memory_config_repository import MemoryConfigRepository - from app.repositories.neo4j.neo4j_connector import Neo4jConnector - - connector = Neo4jConnector() - try: - result = await connector.execute_query( - MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, - end_user_ids=end_user_ids, - ) - finally: - await connector.close() - - counts = {str(row["user_id"]): int(row["total"]) for row in result} - for end_user_id in end_user_ids: - counts.setdefault(end_user_id, 0) - return counts - - -def _update_memory_counts(connection, counts: Dict[str, int]) -> int: - updated = 0 - for end_user_id, memory_count in counts.items(): - result = connection.execute( - sa.text(""" - UPDATE end_users - SET memory_count = :memory_count - WHERE id = CAST(:end_user_id AS uuid) - """), - { - "end_user_id": end_user_id, - "memory_count": memory_count, - }, - ) - updated += result.rowcount or 0 - return updated - - -def _sync_memory_count_from_neo4j() -> None: - """迁移时初始化 Neo4j 模式宿主的 memory_count。 - - """ - import asyncio - - print("[memory_count] 开始同步 Neo4j 模式宿主 memory_count") - connection = op.get_bind() - target_ids = _load_neo4j_end_user_ids(connection) - if not target_ids: - print("[memory_count] 没有需要同步的 Neo4j 模式宿主") - return - - print( - f"[memory_count] 待同步宿主数量: {len(target_ids)}, " - f"batch_size={BATCH_SIZE}" - ) - - total_updated = 0 - batches = _chunked(target_ids, BATCH_SIZE) - for batch_index, batch_ids in enumerate(batches, start=1): - print( - f"[memory_count] 正在查询 Neo4j: " - f"batch={batch_index}/{len(batches)}, size={len(batch_ids)}" - ) - counts = asyncio.run(_fetch_neo4j_counts(batch_ids)) - total_updated += _update_memory_counts(connection, counts) - print( - f"[memory_count] 已写入 PostgreSQL: " - f"updated={total_updated}/{len(target_ids)}" - ) - - print( - f"[memory_count] Neo4j 模式宿主同步完成: " - f"total={len(target_ids)}, updated={total_updated}" - ) - - -def upgrade() -> None: - op.add_column( - 'end_users', - sa.Column( - 'memory_count', - sa.Integer(), - server_default='0', - nullable=False, - comment='记忆节点总数', - ), - ) - _sync_memory_count_from_neo4j() - op.create_index( - op.f('ix_end_users_memory_count'), - 'end_users', - ['memory_count'], - unique=False, - ) - - -def downgrade() -> None: - op.drop_index(op.f('ix_end_users_memory_count'), table_name='end_users') - op.drop_column('end_users', 'memory_count') From f86c023477adf97c34e04d9813e721f1404577c7 Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Wed, 29 Apr 2026 18:04:14 +0800 Subject: [PATCH 6/7] fix(memory): call renamed memory count sync method - Update forgetting cycle call sites to use _sync_memory_count_to_db --- .../forgetting_engine/forgetting_scheduler.py | 4 ++-- api/app/services/memory_dashboard_service.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py b/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py index 8432c4dd..cad6a4db 100644 --- a/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py +++ b/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py @@ -146,7 +146,7 @@ class ForgettingScheduler: logger.info("没有可遗忘的节点对,遗忘周期结束") # 同步 Neo4j 记忆节点总数到 PostgreSQL的 end_user 表的 memory_count 字段 - await self._sync_memory_count_to_mysql(end_user_id) + await self._sync_memory_count_to_db(end_user_id) return report # 步骤3:按激活值排序(激活值最低的优先) @@ -304,7 +304,7 @@ class ForgettingScheduler: f"耗时 {duration:.2f} 秒" ) # 同步 Neo4j 记忆节点总数到 PostgreSQL的 end_user 表的 memory_count 字段 - await self._sync_memory_count_to_mysql(end_user_id) + await self._sync_memory_count_to_db(end_user_id) return report except Exception as e: diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 64874caf..6d0f0a73 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -1,5 +1,5 @@ from sqlalchemy.orm import Session -from sqlalchemy import desc, nullslast, or_, and_, cast, String, func +from sqlalchemy import desc, nullslast, or_, cast, String, func from typing import List, Optional, Dict, Any import uuid from fastapi import HTTPException From 80902eb79a7f755e2b312ac52e2b5b7f1e279eb2 Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Wed, 29 Apr 2026 18:35:49 +0800 Subject: [PATCH 7/7] refactor(memory): extract memory count sync utility - Add shared utility for syncing end user memory_count from Neo4j --- .../core/memory/agent/utils/write_tools.py | 69 +++++----------- .../forgetting_engine/forgetting_scheduler.py | 82 ++++++++----------- .../core/memory/utils/memory_count_utils.py | 36 ++++++++ 3 files changed, 91 insertions(+), 96 deletions(-) create mode 100644 api/app/core/memory/utils/memory_count_utils.py diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 9b0be9c8..1dcc73b2 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -20,6 +20,7 @@ from app.core.memory.storage_services.extraction_engine.knowledge_extraction.mem memory_summary_generation from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.core.memory.utils.log.logging_utils import log_time +from app.core.memory.utils.memory_count_utils import sync_end_user_memory_count_from_neo4j from app.db import get_db_context from app.repositories.neo4j.add_edges import add_memory_summary_statement_edges from app.repositories.neo4j.add_nodes import add_memory_summary_nodes @@ -313,8 +314,27 @@ async def write( except Exception as cache_err: logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True) - #同步neo4j记忆节点总数到pgsql,end_user表的memory_count字段 - await _sync_memory_count_after_write(end_user_id) + # 同步 Neo4j 记忆节点总数到 PostgreSQL end_users.memory_count + if end_user_id: + try: + memory_count_connector = Neo4jConnector() + try: + node_count = await sync_end_user_memory_count_from_neo4j( + end_user_id, + memory_count_connector, + ) + finally: + await memory_count_connector.close() + + logger.info( + f"[MemoryCount] 写入后同步 memory_count: " + f"end_user_id={end_user_id}, count={node_count}" + ) + except Exception as e: + logger.warning( + f"[MemoryCount] 写入后同步 memory_count 失败(不影响主流程): {e}", + exc_info=True, + ) # Close LLM/Embedder underlying httpx clients to prevent # 'RuntimeError: Event loop is closed' during garbage collection @@ -335,48 +355,3 @@ async def write( logger.info("=== Pipeline Complete ===") logger.info(f"Total execution time: {total_time:.2f} seconds") - -async def _sync_memory_count_after_write(end_user_id: str) -> None: - """ - 记忆写入完成后,查 Neo4j 全量节点数,绝对值同步到 PostgreSQL end_user 表的 memory_count 字段 - - 不使用增量累加: - - Neo4j 写入使用 MERGE 语义,节点列表长度不等于新增节点数。 - - 重试或重复写入可能匹配已有节点。 - - 绝对值覆盖可以避免越加越大的计数漂移。 - """ - if not end_user_id: - return - - try: - from app.models.end_user_model import EndUser - from app.repositories.memory_config_repository import MemoryConfigRepository - - connector = Neo4jConnector() - try: - result = await connector.execute_query( - MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, - end_user_ids=[end_user_id], - ) - node_count = int(result[0]["total"]) if result else 0 - finally: - await connector.close() - - with get_db_context() as db: - db.query(EndUser).filter( - EndUser.id == uuid.UUID(end_user_id) - ).update( - {"memory_count": node_count}, - synchronize_session=False, - ) - db.commit() - - logger.info( - f"[MemoryCount] 写入后同步 memory_count: " - f"end_user_id={end_user_id}, count={node_count}" - ) - except Exception as e: - logger.warning( - f"[MemoryCount] 写入后同步 memory_count 失败(不影响主流程): {e}", - exc_info=True, - ) \ No newline at end of file diff --git a/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py b/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py index cad6a4db..39c9eed6 100644 --- a/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py +++ b/api/app/core/memory/storage_services/forgetting_engine/forgetting_scheduler.py @@ -20,6 +20,7 @@ from uuid import UUID from datetime import datetime from app.core.memory.storage_services.forgetting_engine.forgetting_strategy import ForgettingStrategy +from app.core.memory.utils.memory_count_utils import sync_end_user_memory_count_from_neo4j from app.repositories.neo4j.neo4j_connector import Neo4jConnector @@ -145,8 +146,22 @@ class ForgettingScheduler: } logger.info("没有可遗忘的节点对,遗忘周期结束") - # 同步 Neo4j 记忆节点总数到 PostgreSQL的 end_user 表的 memory_count 字段 - await self._sync_memory_count_to_db(end_user_id) + # 同步 Neo4j 记忆节点总数到 PostgreSQL 的 end_users.memory_count + if end_user_id: + try: + node_count = await sync_end_user_memory_count_from_neo4j( + end_user_id, + self.connector, + ) + logger.info( + f"[MemoryCount] 遗忘后同步 memory_count: " + f"end_user_id={end_user_id}, count={node_count}" + ) + except Exception as e: + logger.warning( + f"[MemoryCount] 遗忘后同步 memory_count 失败(不影响主流程): {e}", + exc_info=True, + ) return report # 步骤3:按激活值排序(激活值最低的优先) @@ -303,8 +318,22 @@ class ForgettingScheduler: f"({reduction_rate:.2%}), " f"耗时 {duration:.2f} 秒" ) - # 同步 Neo4j 记忆节点总数到 PostgreSQL的 end_user 表的 memory_count 字段 - await self._sync_memory_count_to_db(end_user_id) + # 同步 Neo4j 记忆节点总数到 PostgreSQL 的 end_users.memory_count + if end_user_id: + try: + node_count = await sync_end_user_memory_count_from_neo4j( + end_user_id, + self.connector, + ) + logger.info( + f"[MemoryCount] 遗忘后同步 memory_count: " + f"end_user_id={end_user_id}, count={node_count}" + ) + except Exception as e: + logger.warning( + f"[MemoryCount] 遗忘后同步 memory_count 失败(不影响主流程): {e}", + exc_info=True, + ) return report except Exception as e: @@ -352,48 +381,3 @@ class ForgettingScheduler: if results: return results[0]['total'] return 0 - - async def _sync_memory_count_to_db( - self, - end_user_id: Optional[str] = None, - ) -> None: - """ - 遗忘周期结束后,用 SEARCH_FOR_ALL_BATCH 口径查全量节点数, - 同步到 PostgreSQL end_users.memory_count。 - - 不复用 _count_knowledge_nodes: - - _count_knowledge_nodes 只统计 Statement、ExtractedEntity、MemorySummary。 - - 宿主列表需要统计该 end_user_id 下全部 Neo4j 节点。 - """ - if not end_user_id: - return - - try: - from app.db import get_db_context - from app.models.end_user_model import EndUser - from app.repositories.memory_config_repository import MemoryConfigRepository - - result = await self.connector.execute_query( - MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, - end_user_ids=[end_user_id], - ) - node_count = int(result[0]["total"]) if result else 0 - - with get_db_context() as db: - db.query(EndUser).filter( - EndUser.id == UUID(end_user_id) - ).update( - {"memory_count": node_count}, - synchronize_session=False, - ) - db.commit() - - logger.info( - f"[MemoryCount] 遗忘后同步 memory_count: " - f"end_user_id={end_user_id}, count={node_count}" - ) - except Exception as e: - logger.warning( - f"[MemoryCount] 遗忘后同步 memory_count 失败(不影响主流程): {e}", - exc_info=True, - ) diff --git a/api/app/core/memory/utils/memory_count_utils.py b/api/app/core/memory/utils/memory_count_utils.py new file mode 100644 index 00000000..316cb635 --- /dev/null +++ b/api/app/core/memory/utils/memory_count_utils.py @@ -0,0 +1,36 @@ +from uuid import UUID + +from app.db import get_db_context +from app.models.end_user_model import EndUser +from app.repositories.memory_config_repository import MemoryConfigRepository +from app.repositories.neo4j.neo4j_connector import Neo4jConnector + + +async def sync_end_user_memory_count_from_neo4j( + end_user_id: str, + connector: Neo4jConnector, +) -> int: + """ + Sync one end user's Neo4j memory node count to PostgreSQL. + + The caller owns the Neo4j connector lifecycle. + """ + if not end_user_id: + return 0 + + result = await connector.execute_query( + MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, + end_user_ids=[end_user_id], + ) + node_count = int(result[0]["total"]) if result else 0 + + with get_db_context() as db: + db.query(EndUser).filter( + EndUser.id == UUID(end_user_id) + ).update( + {"memory_count": node_count}, + synchronize_session=False, + ) + db.commit() + + return node_count