From 826776189033c5d6019574e58df31470ac682541 Mon Sep 17 00:00:00 2001 From: Ke Sun Date: Thu, 29 Jan 2026 21:00:09 +0800 Subject: [PATCH] feat(memory): add legacy int data format detection and workspace default fallback - Add .hypothesis/ to .gitignore for test framework artifacts - Remove outdated comment from EndUser model memory_config_id field - Update memory config extraction methods to return tuple with legacy format flag - Add detection for legacy int-formatted memory_config_id in Agent and Workflow configs - Implement workspace default memory config fallback when legacy int format detected - Add _get_workspace_default_memory_config_id method to retrieve default or earliest active config - Update return types from Optional[uuid.UUID] to Tuple[Optional[uuid.UUID], bool] for extraction methods - Add comprehensive logging for legacy format detection and fallback behavior - Improve backward compatibility for applications with old int-based memory configuration data --- .gitignore | 1 + api/app/models/end_user_model.py | 1 - api/app/services/app_service.py | 136 +++++++++++++++++++++----- api/app/services/workspace_service.py | 58 ++++++++++- 4 files changed, 171 insertions(+), 25 deletions(-) diff --git a/.gitignore b/.gitignore index de160688..2fcdbcd6 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ examples/ # Temporary outputs .DS_Store +.hypothesis/ time.log celerybeat-schedule.db search_results.json diff --git a/api/app/models/end_user_model.py b/api/app/models/end_user_model.py index eff17f7e..30b56fc5 100644 --- a/api/app/models/end_user_model.py +++ b/api/app/models/end_user_model.py @@ -21,7 +21,6 @@ class EndUser(Base): created_at = Column(DateTime, default=datetime.datetime.now) updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now) - # Memory config association - updated lazily during conversation memory_config_id = Column( UUID(as_uuid=True), ForeignKey("memory_config.config_id"), diff --git a/api/app/services/app_service.py b/api/app/services/app_service.py index eaa9bfd6..ffbd7fbf 100644 --- a/api/app/services/app_service.py +++ b/api/app/services/app_service.py @@ -1182,7 +1182,7 @@ class AppService: self, app_type: str, config: Dict[str, Any] - ) -> Optional[uuid.UUID]: + ) -> Tuple[Optional[uuid.UUID], bool]: """从发布配置中提取 memory_config_id(根据应用类型分发) Args: @@ -1190,7 +1190,9 @@ class AppService: config: 发布配置字典 Returns: - Optional[uuid.UUID]: 提取的 memory_config_id,如果不存在则返回 None + Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int) + - memory_config_id: 提取的配置ID,如果不存在或为旧格式则返回 None + - is_legacy_int: 是否检测到旧格式 int 数据,需要回退到工作空间默认配置 """ if app_type == AppType.AGENT: return self._extract_memory_config_id_from_agent(config) @@ -1199,15 +1201,15 @@ class AppService: elif app_type == AppType.MULTI_AGENT: # Multi-agent 暂不支持记忆配置提取 logger.debug(f"多智能体应用暂不支持记忆配置提取: app_type={app_type}") - return None + return None, False else: logger.warning(f"不支持的应用类型,无法提取记忆配置: app_type={app_type}") - return None + return None, False def _extract_memory_config_id_from_agent( self, config: Dict[str, Any] - ) -> Optional[uuid.UUID]: + ) -> Tuple[Optional[uuid.UUID], bool]: """从 Agent 应用配置中提取 memory_config_id 路径: config.memory.memory_content @@ -1216,33 +1218,42 @@ class AppService: config: Agent 配置字典 Returns: - Optional[uuid.UUID]: 记忆配置ID,如果不存在则返回 None + Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int) + - memory_config_id: 记忆配置ID,如果不存在或为旧格式则返回 None + - is_legacy_int: 是否检测到旧格式 int 数据 """ try: memory_content = config.get("memory", {}).get("memory_content") if memory_content: - # 处理字符串和 UUID 两种情况 - if isinstance(memory_content, str): - return uuid.UUID(memory_content) - elif isinstance(memory_content, uuid.UUID): - return memory_content + # 处理字符串、UUID 和 int(旧数据兼容)三种情况 + if isinstance(memory_content, uuid.UUID): + return memory_content, False + elif isinstance(memory_content, str): + return uuid.UUID(memory_content), False + elif isinstance(memory_content, int): + # 旧数据存储为 int,需要回退到工作空间默认配置 + logger.warning( + f"Agent 配置中 memory_content 为旧格式 int,将使用工作空间默认配置: " + f"value={memory_content}" + ) + return None, True else: logger.warning( f"Agent 配置中 memory_content 格式无效: type={type(memory_content)}, " f"value={memory_content}" ) - return None + return None, False except (ValueError, TypeError) as e: logger.warning( f"Agent 配置中 memory_content 格式无效: error={str(e)}, " f"memory_content={memory_content}" ) - return None + return None, False def _extract_memory_config_id_from_workflow( self, config: Dict[str, Any] - ) -> Optional[uuid.UUID]: + ) -> Tuple[Optional[uuid.UUID], bool]: """从 Workflow 应用配置中提取 memory_config_id 扫描工作流节点,查找 MemoryRead 或 MemoryWrite 节点。 @@ -1252,7 +1263,9 @@ class AppService: config: Workflow 配置字典 Returns: - Optional[uuid.UUID]: 记忆配置ID,如果不存在则返回 None + Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int) + - memory_config_id: 记忆配置ID,如果不存在或为旧格式则返回 None + - is_legacy_int: 是否检测到旧格式 int 数据 """ nodes = config.get("nodes", []) @@ -1265,11 +1278,18 @@ class AppService: if config_id: try: - # 处理字符串和 UUID 两种情况 - if isinstance(config_id, str): - return uuid.UUID(config_id) - elif isinstance(config_id, uuid.UUID): - return config_id + # 处理字符串、UUID 和 int(旧数据兼容)三种情况 + if isinstance(config_id, uuid.UUID): + return config_id, False + elif isinstance(config_id, str): + return uuid.UUID(config_id), False + elif isinstance(config_id, int): + # 旧数据存储为 int,需要回退到工作空间默认配置 + logger.warning( + f"工作流记忆节点 config_id 为旧格式 int,将使用工作空间默认配置: " + f"node_id={node.get('id')}, node_type={node_type}, value={config_id}" + ) + return None, True else: logger.warning( f"工作流记忆节点 config_id 格式无效: node_id={node.get('id')}, " @@ -1282,7 +1302,57 @@ class AppService: ) logger.debug("工作流配置中未找到记忆节点") - return None + return None, False + + def _get_workspace_default_memory_config_id( + self, + workspace_id: uuid.UUID + ) -> Optional[uuid.UUID]: + """获取工作空间的默认记忆配置ID + + Args: + workspace_id: 工作空间ID + + Returns: + Optional[uuid.UUID]: 默认记忆配置ID,如果不存在则返回 None + """ + from app.models.memory_config_model import MemoryConfig as MemoryConfigModel + + # 查找工作空间的默认记忆配置 + stmt = ( + select(MemoryConfigModel.id) + .where( + MemoryConfigModel.workspace_id == workspace_id, + MemoryConfigModel.is_default.is_(True), + MemoryConfigModel.state.is_(True), + ) + .limit(1) + ) + + config_id = self.db.scalars(stmt).first() + + if config_id: + return config_id + + # 回退:获取最早创建的活跃配置 + stmt = ( + select(MemoryConfigModel.id) + .where( + MemoryConfigModel.workspace_id == workspace_id, + MemoryConfigModel.state.is_(True), + ) + .order_by(MemoryConfigModel.created_at.asc()) + .limit(1) + ) + + config_id = self.db.scalars(stmt).first() + + if not config_id: + logger.warning( + f"工作空间没有可用的记忆配置: workspace_id={workspace_id}" + ) + + return config_id def _update_endusers_memory_config( self, @@ -1451,7 +1521,17 @@ class AppService: self.db.flush() # 先 flush,确保 release 已插入数据库 # 提取记忆配置ID并更新终端用户 - memory_config_id = self._extract_memory_config_id(app.type, config) + memory_config_id, is_legacy_int = self._extract_memory_config_id(app.type, config) + + # 如果检测到旧格式 int 数据,回退到工作空间默认配置 + if is_legacy_int and not memory_config_id: + memory_config_id = self._get_workspace_default_memory_config_id(app.workspace_id) + if memory_config_id: + logger.info( + f"发布时使用工作空间默认记忆配置(旧数据兼容): app_id={app_id}, " + f"workspace_id={app.workspace_id}, memory_config_id={memory_config_id}" + ) + if memory_config_id: updated_count = self._update_endusers_memory_config(app_id, memory_config_id) logger.info( @@ -1575,7 +1655,17 @@ class AppService: raise ResourceNotFoundException("发布版本", f"app_id={app_id}, version={version}") # 提取记忆配置ID并更新终端用户 - memory_config_id = self._extract_memory_config_id(release.type, release.config) + memory_config_id, is_legacy_int = self._extract_memory_config_id(release.type, release.config) + + # 如果检测到旧格式 int 数据,回退到工作空间默认配置 + if is_legacy_int and not memory_config_id: + memory_config_id = self._get_workspace_default_memory_config_id(app.workspace_id) + if memory_config_id: + logger.info( + f"回滚时使用工作空间默认记忆配置(旧数据兼容): app_id={app_id}, " + f"workspace_id={app.workspace_id}, memory_config_id={memory_config_id}" + ) + if memory_config_id: updated_count = self._update_endusers_memory_config(app_id, memory_config_id) logger.info( diff --git a/api/app/services/workspace_service.py b/api/app/services/workspace_service.py index eb0fe46b..43116b34 100644 --- a/api/app/services/workspace_service.py +++ b/api/app/services/workspace_service.py @@ -87,9 +87,26 @@ def delete_workspace_member( def get_user_workspaces(db: Session, user: User) -> List[Workspace]: - """获取当前用户参与的所有工作空间""" + """获取当前用户参与的所有工作空间 + + For neo4j storage type workspaces, ensures each has a default memory config. + If a workspace is missing a default config, one will be created automatically. + + Args: + db: Database session + user: Current user + + Returns: + List[Workspace]: List of workspaces the user belongs to + """ business_logger.debug(f"获取用户工作空间列表: {user.username} (ID: {user.id})") workspaces = workspace_repository.get_workspaces_by_user(db=db, user_id=user.id) + + # Ensure each neo4j workspace has a default memory config + for workspace in workspaces: + if workspace.storage_type == 'neo4j': + _ensure_default_memory_config(db, workspace) + business_logger.info(f"用户 {user.username} 的工作空间数量: {len(workspaces)}") return workspaces @@ -879,6 +896,45 @@ def update_workspace_models_configs( raise BusinessException(f"更新模型配置失败: {str(e)}", BizCode.INTERNAL_ERROR) +def _ensure_default_memory_config(db: Session, workspace: Workspace) -> None: + """Ensure a workspace has a default memory config, creating one if missing. + + Args: + db: Database session + workspace: The workspace to check + """ + from app.models.memory_config_model import MemoryConfig + + # Check if default config exists for this workspace + existing_default = db.query(MemoryConfig).filter( + MemoryConfig.workspace_id == workspace.id, + MemoryConfig.is_default == True + ).first() + + if existing_default: + return + + # No default config exists, create one + business_logger.info( + f"Workspace {workspace.id} missing default memory config, creating one" + ) + + try: + _create_default_memory_config( + db=db, + workspace_id=workspace.id, + workspace_name=workspace.name, + llm_id=uuid.UUID(workspace.llm) if workspace.llm else None, + embedding_id=uuid.UUID(workspace.embedding) if workspace.embedding else None, + rerank_id=uuid.UUID(workspace.rerank) if workspace.rerank else None, + ) + except Exception as e: + business_logger.error( + f"Failed to create default memory config for workspace {workspace.id}: {str(e)}" + ) + # Don't fail the workspace list operation if config creation fails + + def _create_default_memory_config( db: Session, workspace_id: uuid.UUID,