From b16c9d53efad9562c404af51c07bb438df61b0b6 Mon Sep 17 00:00:00 2001 From: Ke Sun Date: Mon, 9 Feb 2026 17:28:42 +0800 Subject: [PATCH] refactor(memory): consolidate memory config extraction and remove unused validator - Add workspace default LLM fallback for emotion model in extraction orchestrator - Consolidate memory config ID extraction logic into MemoryConfigService - Remove duplicate extraction methods from AppService (_extract_memory_config_id_from_agent, _extract_memory_config_id_from_workflow) - Remove unused validate_embedding_model function from validators - Simplify AppService by delegating memory config extraction to MemoryConfigService - Update validator exports to remove validate_embedding_model - Improve code maintainability by centralizing memory configuration logic --- .../extraction_orchestrator.py | 18 +- api/app/core/validators/__init__.py | 2 - .../validators/memory_config_validators.py | 53 --- .../repositories/memory_config_repository.py | 92 ++++ api/app/services/app_service.py | 126 +---- api/app/services/memory_agent_service.py | 198 +++++++- api/app/services/memory_config_service.py | 442 +++++++++++++----- api/app/services/memory_reflection_service.py | 33 +- api/app/services/workspace_service.py | 104 ++++- 9 files changed, 736 insertions(+), 332 deletions(-) diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py index 711e7a94..a47497da 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py @@ -645,9 +645,25 @@ class ExtractionOrchestrator: logger.info(f"总陈述句: {total_statements}, 用户陈述句: {filtered_statements}, 开始全局并行提取情绪") # 初始化情绪提取服务 + # 如果 emotion_model_id 为空,回退到工作空间默认 LLM from app.services.emotion_extraction_service import EmotionExtractionService + + emotion_model_id = memory_config.emotion_model_id + if not emotion_model_id and memory_config.workspace_id: + from app.repositories.workspace_repository import get_workspace_models_configs + from app.db import SessionLocal + + db = SessionLocal() + try: + workspace_models = get_workspace_models_configs(db, memory_config.workspace_id) + if workspace_models and workspace_models.get("llm"): + emotion_model_id = workspace_models["llm"] + logger.info(f"emotion_model_id 为空,使用工作空间默认 LLM: {emotion_model_id}") + finally: + db.close() + emotion_service = EmotionExtractionService( - llm_id=memory_config.emotion_model_id if memory_config.emotion_model_id else None + llm_id=emotion_model_id if emotion_model_id else None ) # 全局并行处理所有陈述句 diff --git a/api/app/core/validators/__init__.py b/api/app/core/validators/__init__.py index 1fadc869..f0b1c415 100644 --- a/api/app/core/validators/__init__.py +++ b/api/app/core/validators/__init__.py @@ -4,7 +4,6 @@ Validators package for various validation utilities. from app.core.validators.file_validator import FileValidator, ValidationResult from app.core.validators.memory_config_validators import ( validate_and_resolve_model_id, - validate_embedding_model, validate_llm_model, validate_model_exists_and_active, ) @@ -16,6 +15,5 @@ __all__ = [ # Memory config validators "validate_model_exists_and_active", "validate_and_resolve_model_id", - "validate_embedding_model", "validate_llm_model", ] diff --git a/api/app/core/validators/memory_config_validators.py b/api/app/core/validators/memory_config_validators.py index ba26c5f2..aea26892 100644 --- a/api/app/core/validators/memory_config_validators.py +++ b/api/app/core/validators/memory_config_validators.py @@ -6,7 +6,6 @@ This module provides validation functions for memory configuration models. Functions: validate_model_exists_and_active: Validate model exists and is active validate_and_resolve_model_id: Validate and resolve model ID with DB lookup - validate_embedding_model: Validate embedding model availability validate_llm_model: Validate LLM model availability """ @@ -203,58 +202,6 @@ def validate_and_resolve_model_id( return model_uuid, model_name -def validate_embedding_model( - config_id: UUID, - embedding_id: Union[str, UUID, None], - db: Session, - tenant_id: Optional[UUID] = None, - workspace_id: Optional[UUID] = None -) -> tuple[UUID, str]: - """Validate that embedding model is available and return its UUID and name. - - Returns: - Tuple of (embedding_uuid, embedding_name) - - Raises: - InvalidConfigError: If embedding_id is not provided or invalid - ModelNotFoundError: If embedding model does not exist - ModelInactiveError: If embedding model is inactive - """ - if embedding_id is None or (isinstance(embedding_id, str) and not embedding_id.strip()): - raise InvalidConfigError( - f"Configuration {config_id} has no embedding model configured", - field_name="embedding_model_id", - invalid_value=embedding_id, - config_id=config_id, - workspace_id=workspace_id - ) - - embedding_uuid, embedding_name = validate_and_resolve_model_id( - embedding_id, "embedding", db, tenant_id, required=True, - config_id=config_id, workspace_id=workspace_id - ) - - logger.debug( - "Embedding model validated", - extra={ - "embedding_uuid": str(embedding_uuid), - "embedding_name": embedding_name, - "config_id": config_id - } - ) - - if embedding_uuid is None: - raise InvalidConfigError( - f"Configuration {config_id} has no embedding model configured", - field_name="embedding_model_id", - invalid_value=embedding_id, - config_id=config_id, - workspace_id=workspace_id - ) - - return embedding_uuid, embedding_name - - def validate_llm_model( config_id: UUID, llm_id: Union[str, UUID, None], diff --git a/api/app/repositories/memory_config_repository.py b/api/app/repositories/memory_config_repository.py index 68e7cb04..2dae51ef 100644 --- a/api/app/repositories/memory_config_repository.py +++ b/api/app/repositories/memory_config_repository.py @@ -715,3 +715,95 @@ class MemoryConfigRepository: db_logger.error(f"删除记忆配置失败: config_id={config_id} - {str(e)}") raise + @staticmethod + def get_workspace_default(db: Session, workspace_id: uuid.UUID) -> Optional[MemoryConfig]: + """获取工作空间的默认记忆配置 + + 优先返回标记为默认的配置,如果没有则返回最早创建的活跃配置。 + + Args: + db: 数据库会话 + workspace_id: 工作空间ID + + Returns: + Optional[MemoryConfig]: 默认配置对象,不存在则返回None + """ + db_logger.debug(f"查询工作空间默认配置: workspace_id={workspace_id}") + + try: + # 优先查找显式标记为默认的配置 + stmt = ( + select(MemoryConfig) + .where( + MemoryConfig.workspace_id == workspace_id, + MemoryConfig.is_default.is_(True), + MemoryConfig.state.is_(True), + ) + .limit(1) + ) + + config = db.scalars(stmt).first() + + if config: + db_logger.debug(f"找到默认配置: config_id={config.config_id}") + return config + + # 回退:获取最早创建的活跃配置 + stmt = ( + select(MemoryConfig) + .where( + MemoryConfig.workspace_id == workspace_id, + MemoryConfig.state.is_(True), + ) + .order_by(MemoryConfig.created_at.asc()) + .limit(1) + ) + + config = db.scalars(stmt).first() + + if config: + db_logger.debug(f"使用最早创建的配置作为默认: config_id={config.config_id}") + else: + db_logger.warning(f"工作空间没有活跃的记忆配置: workspace_id={workspace_id}") + + return config + + except Exception as e: + db_logger.error(f"查询工作空间默认配置失败: workspace_id={workspace_id} - {str(e)}") + raise + + @staticmethod + def get_with_fallback( + db: Session, + config_id: Optional[uuid.UUID], + workspace_id: uuid.UUID + ) -> Optional[MemoryConfig]: + """获取记忆配置,支持回退到工作空间默认配置 + + 如果 config_id 为 None 或配置不存在,则回退到工作空间默认配置。 + + Args: + db: 数据库会话 + config_id: 配置ID(可为None) + workspace_id: 工作空间ID,用于回退查询 + + Returns: + Optional[MemoryConfig]: 配置对象,如果都不存在则返回None + """ + db_logger.debug(f"查询配置(支持回退): config_id={config_id}, workspace_id={workspace_id}") + + if not config_id: + db_logger.debug("config_id 为空,使用工作空间默认配置") + return MemoryConfigRepository.get_workspace_default(db, workspace_id) + + config = db.get(MemoryConfig, config_id) + + if config: + return config + + db_logger.warning( + f"配置不存在,回退到工作空间默认配置: missing_config_id={config_id}, workspace_id={workspace_id}" + ) + + return MemoryConfigRepository.get_workspace_default(db, workspace_id) + diff --git a/api/app/services/app_service.py b/api/app/services/app_service.py index 71bf50f7..f3c6260a 100644 --- a/api/app/services/app_service.py +++ b/api/app/services/app_service.py @@ -1193,7 +1193,7 @@ class AppService: app_type: str, config: Dict[str, Any] ) -> Tuple[Optional[uuid.UUID], bool]: - """从发布配置中提取 memory_config_id(根据应用类型分发) + """从发布配置中提取 memory_config_id(委托给 MemoryConfigService) Args: app_type: 应用类型 (agent, workflow, multi_agent) @@ -1204,128 +1204,10 @@ class AppService: - memory_config_id: 提取的配置ID,如果不存在或为旧格式则返回 None - is_legacy_int: 是否检测到旧格式 int 数据,需要回退到工作空间默认配置 """ - if app_type == AppType.AGENT: - return self._extract_memory_config_id_from_agent(config) - elif app_type == AppType.WORKFLOW: - return self._extract_memory_config_id_from_workflow(config) - elif app_type == AppType.MULTI_AGENT: - # Multi-agent 暂不支持记忆配置提取 - logger.debug(f"多智能体应用暂不支持记忆配置提取: app_type={app_type}") - return None, False - else: - logger.warning(f"不支持的应用类型,无法提取记忆配置: app_type={app_type}") - return None, False - - def _extract_memory_config_id_from_agent( - self, - config: Dict[str, Any] - ) -> Tuple[Optional[uuid.UUID], bool]: - """从 Agent 应用配置中提取 memory_config_id + from app.services.memory_config_service import MemoryConfigService - 路径: config.memory.memory_content - - Args: - config: Agent 配置字典 - - Returns: - Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int) - - memory_config_id: 记忆配置ID,如果不存在或为旧格式则返回 None - - is_legacy_int: 是否检测到旧格式 int 数据 - """ - try: - memory_dict = config.get("memory", {}) - # Support both field names: memory_config_id (new) and memory_content (legacy) - memory_value = memory_dict.get("memory_config_id") or memory_dict.get("memory_content") - logger.info(f"Extracting memory_config_id: memory_value={memory_value}, type={type(memory_value).__name__ if memory_value else 'None'}") - if memory_value: - # 处理字符串、UUID 和 int(旧数据兼容)三种情况 - if isinstance(memory_value, uuid.UUID): - return memory_value, False - elif isinstance(memory_value, str): - # Check if it's a numeric string (legacy int format) - if memory_value.isdigit(): - logger.warning( - f"Agent 配置中 memory_config_id 为旧格式 int 字符串,将使用工作空间默认配置: " - f"value={memory_value}" - ) - return None, True - try: - return uuid.UUID(memory_value), False - except ValueError: - logger.warning(f"Invalid UUID string: {memory_value}") - return None, False - elif isinstance(memory_value, int): - # 旧数据存储为 int,需要回退到工作空间默认配置 - logger.warning( - f"Agent 配置中 memory_config_id 为旧格式 int,将使用工作空间默认配置: " - f"value={memory_value}" - ) - return None, True - else: - logger.warning( - f"Agent 配置中 memory_config_id 格式无效: type={type(memory_value)}, " - f"value={memory_value}" - ) - return None, False - except (ValueError, TypeError) as e: - logger.warning( - f"Agent 配置中 memory_config_id 格式无效: error={str(e)}" - ) - return None, False - - def _extract_memory_config_id_from_workflow( - self, - config: Dict[str, Any] - ) -> Tuple[Optional[uuid.UUID], bool]: - """从 Workflow 应用配置中提取 memory_config_id - - 扫描工作流节点,查找 MemoryRead 或 MemoryWrite 节点。 - 返回第一个找到的记忆节点的 config_id。 - - Args: - config: Workflow 配置字典 - - Returns: - Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int) - - memory_config_id: 记忆配置ID,如果不存在或为旧格式则返回 None - - is_legacy_int: 是否检测到旧格式 int 数据 - """ - nodes = config.get("nodes", []) - - for node in nodes: - node_type = node.get("type", "") - - # 检查是否为记忆节点 (support both formats: memory-read/memory-write and MemoryRead/MemoryWrite) - if node_type.lower() in ["memoryread", "memorywrite", "memory-read", "memory-write"]: - config_id = node.get("config", {}).get("config_id") - - if config_id: - try: - # 处理字符串、UUID 和 int(旧数据兼容)三种情况 - if isinstance(config_id, uuid.UUID): - return config_id, False - elif isinstance(config_id, str): - return uuid.UUID(config_id), False - elif isinstance(config_id, int): - # 旧数据存储为 int,需要回退到工作空间默认配置 - logger.warning( - f"工作流记忆节点 config_id 为旧格式 int,将使用工作空间默认配置: " - f"node_id={node.get('id')}, node_type={node_type}, value={config_id}" - ) - return None, True - else: - logger.warning( - f"工作流记忆节点 config_id 格式无效: node_id={node.get('id')}, " - f"node_type={node_type}, type={type(config_id)}" - ) - except (ValueError, TypeError) as e: - logger.warning( - f"工作流记忆节点 config_id 格式无效: node_id={node.get('id')}, " - f"node_type={node_type}, error={str(e)}" - ) - - logger.debug("工作流配置中未找到记忆节点") - return None, False + service = MemoryConfigService(self.db) + return service.extract_memory_config_id(app_type, config) def _get_workspace_default_memory_config_id( self, diff --git a/api/app/services/memory_agent_service.py b/api/app/services/memory_agent_service.py index aa8caf02..da8a8e06 100644 --- a/api/app/services/memory_agent_service.py +++ b/api/app/services/memory_agent_service.py @@ -1191,8 +1191,8 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An """ 获取终端用户关联的记忆配置 - 使用 MemoryConfigService.get_config_with_fallback 获取配置, - 支持终端用户已分配配置和工作空间默认配置的回退机制。 + 兼容旧数据:如果 end_user.memory_config_id 为空,则从 AppRelease.config 中获取 + 并回填到 end_user.memory_config_id 字段(懒迁移)。 Args: end_user_id: 终端用户ID @@ -1204,7 +1204,13 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An Raises: ValueError: 当终端用户不存在或应用未发布时 """ + import json as json_module + import uuid + + from sqlalchemy import select + from app.models.app_model import App + from app.models.app_release_model import AppRelease from app.models.end_user_model import EndUser from app.services.memory_config_service import MemoryConfigService @@ -1217,6 +1223,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An raise ValueError(f"终端用户不存在: {end_user_id}") app_id = end_user.app_id + logger.debug(f"Found end_user app_id: {app_id}") # 2. 获取应用以确定 workspace_id app = db.query(App).filter(App.id == app_id).first() @@ -1228,10 +1235,71 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An logger.warning(f"No current release for app: {app_id}") raise ValueError(f"应用未发布: {app_id}") - # 3. 使用 get_config_with_fallback 获取记忆配置 + # 3. 兼容旧数据:如果 memory_config_id 为空,从 AppRelease.config 获取并回填 + memory_config_id_to_use = end_user.memory_config_id + + # 如果已有 memory_config_id,直接使用 + # 如果新创建enduser,enduser.memory_config_id 必定为none + # 那么使用从release中获取memory_config_id为预期行为,并且回填到 + # end_user.memory_config_id + if not memory_config_id_to_use: + logger.info(f"end_user.memory_config_id is None, migrating from AppRelease.config") + + # 获取最新发布版本 + stmt = ( + select(AppRelease) + .where(AppRelease.app_id == app_id, AppRelease.is_active.is_(True)) + .order_by(AppRelease.version.desc()) + ) + # TODO: change to current_release_id + latest_release = db.scalars(stmt).first() + + if latest_release: + config = latest_release.config or {} + + # 如果 config 是字符串,解析为字典 + if isinstance(config, str): + try: + config = json_module.loads(config) + except json_module.JSONDecodeError: + logger.warning(f"Failed to parse config JSON for release {latest_release.id}") + config = {} + + # 使用 MemoryConfigService 的提取方法 + memory_config_service = MemoryConfigService(db) + legacy_config_id, is_legacy_int = memory_config_service.extract_memory_config_id( + app_type=app.type, + config=config + ) + + if legacy_config_id: + # 验证提取的 config_id 是否存在于数据库中 + from app.models.memory_config_model import MemoryConfig as MemoryConfigModel + existing_config = db.get(MemoryConfigModel, legacy_config_id) + + if existing_config: + memory_config_id_to_use = legacy_config_id + + # 回填到 end_user 表(lazy update) + end_user.memory_config_id = memory_config_id_to_use + db.commit() + logger.info( + f"Migrated memory_config_id for end_user {end_user_id}: {memory_config_id_to_use}" + ) + else: + logger.warning( + f"Extracted memory_config_id does not exist, skipping backfill: " + f"end_user_id={end_user_id}, config_id={legacy_config_id}" + ) + elif is_legacy_int: + logger.info( + f"Legacy int config detected for end_user {end_user_id}, will use workspace default" + ) + + # 4. 使用 get_config_with_fallback 获取记忆配置 memory_config_service = MemoryConfigService(db) memory_config = memory_config_service.get_config_with_fallback( - memory_config_id=end_user.memory_config_id, + memory_config_id=memory_config_id_to_use, workspace_id=app.workspace_id ) @@ -1255,7 +1323,8 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session) 使用与 get_end_user_connected_config 相同的逻辑: 1. 优先使用 end_user.memory_config_id - 2. 如果没有,回退到工作空间默认配置 + 2. 如果没有,尝试从 AppRelease.config 提取并回填 + 3. 如果仍然没有,回退到工作空间默认配置 Args: end_user_ids: 终端用户ID列表 @@ -1269,7 +1338,12 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session) ... } """ + import json as json_module + + from sqlalchemy import select + from app.models.app_model import App + from app.models.app_release_model import AppRelease from app.models.end_user_model import EndUser from app.models.memory_config_model import MemoryConfig from app.services.memory_config_service import MemoryConfigService @@ -1284,7 +1358,8 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session) # 1. 批量查询所有 end_user 及其 app_id 和 memory_config_id end_users = db.query(EndUser).filter(EndUser.id.in_(end_user_ids)).all() - # 创建映射 + # 创建映射 - 保留 EndUser 对象引用以便回填 + end_user_map = {str(eu.id): eu for eu in end_users} user_data = {str(eu.id): {"app_id": eu.app_id, "memory_config_id": eu.memory_config_id} for eu in end_users} # 记录未找到的用户 @@ -1295,15 +1370,116 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session) for user_id in missing_user_ids: result[user_id] = {"memory_config_id": None, "memory_config_name": None} - # 2. 批量获取所有相关应用以获取 workspace_id + # 2. 批量获取所有相关应用以获取 workspace_id 和 type app_ids = list(set(data["app_id"] for data in user_data.values())) if not app_ids: return result apps = db.query(App).filter(App.id.in_(app_ids)).all() + app_map = {app.id: app for app in apps} app_to_workspace = {app.id: app.workspace_id for app in apps} - # 3. 收集需要查询的 memory_config_id 和需要回退的 workspace_id + # 3. 对于没有 memory_config_id 的用户,尝试从 AppRelease.config 提取 + users_needing_migration = [ + (end_user_id, data["app_id"]) + for end_user_id, data in user_data.items() + if not data["memory_config_id"] + ] + + if users_needing_migration: + # 批量获取相关应用的最新发布版本 + migration_app_ids = list(set(app_id for _, app_id in users_needing_migration)) + + # 查询每个应用的最新活跃发布版本 + app_latest_releases = {} + for app_id in migration_app_ids: + stmt = ( + select(AppRelease) + .where(AppRelease.app_id == app_id, AppRelease.is_active.is_(True)) + .order_by(AppRelease.version.desc()) + .limit(1) + ) + latest_release = db.scalars(stmt).first() + if latest_release: + app_latest_releases[app_id] = latest_release + + # 为每个需要迁移的用户提取 memory_config_id + config_service = MemoryConfigService(db) + users_to_backfill = [] # [(end_user, memory_config_id), ...] + + for end_user_id, app_id in users_needing_migration: + latest_release = app_latest_releases.get(app_id) + if not latest_release: + continue + + config = latest_release.config or {} + + # 如果 config 是字符串,解析为字典 + if isinstance(config, str): + try: + config = json_module.loads(config) + except json_module.JSONDecodeError: + logger.warning(f"Failed to parse config JSON for release {latest_release.id}") + continue + + # 使用 MemoryConfigService 的提取方法 + app = app_map.get(app_id) + if not app: + continue + + legacy_config_id, is_legacy_int = config_service.extract_memory_config_id( + app_type=app.type, + config=config + ) + + if legacy_config_id: + # 更新 user_data 中的 memory_config_id + user_data[end_user_id]["memory_config_id"] = legacy_config_id + + # 记录需要回填的用户(稍后验证配置存在后再回填) + end_user = end_user_map.get(end_user_id) + if end_user: + users_to_backfill.append((end_user, legacy_config_id)) + elif is_legacy_int: + logger.info( + f"Legacy int config detected for end_user {end_user_id}, will use workspace default" + ) + + # 验证提取的 config_id 是否存在于数据库中 + if users_to_backfill: + config_ids_to_validate = list(set(cid for _, cid in users_to_backfill)) + existing_configs = db.query(MemoryConfig).filter( + MemoryConfig.config_id.in_(config_ids_to_validate) + ).all() + valid_config_ids = {mc.config_id for mc in existing_configs} + + # 只回填存在的配置 + valid_backfills = [ + (eu, cid) for eu, cid in users_to_backfill + if cid in valid_config_ids + ] + invalid_backfills = [ + (eu, cid) for eu, cid in users_to_backfill + if cid not in valid_config_ids + ] + + if invalid_backfills: + invalid_ids = [str(cid) for _, cid in invalid_backfills] + logger.warning( + f"Skipping backfill for non-existent memory_config_ids: {invalid_ids}" + ) + # 清除 user_data 中无效的 config_id + for eu, cid in invalid_backfills: + user_data[str(eu.id)]["memory_config_id"] = None + + # 批量回填 end_user.memory_config_id + if valid_backfills: + for end_user, memory_config_id in valid_backfills: + end_user.memory_config_id = memory_config_id + db.commit() + logger.info(f"Migrated memory_config_id for {len(valid_backfills)} end_users") + + # 4. 收集需要查询的 memory_config_id 和需要回退的 workspace_id direct_config_ids = [] workspace_fallback_users = [] # [(end_user_id, workspace_id), ...] @@ -1315,13 +1491,13 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session) if workspace_id: workspace_fallback_users.append((end_user_id, workspace_id)) - # 4. 批量查询直接分配的配置 + # 5. 批量查询直接分配的配置 config_id_to_config = {} if direct_config_ids: configs = db.query(MemoryConfig).filter(MemoryConfig.config_id.in_(direct_config_ids)).all() config_id_to_config = {mc.config_id: mc for mc in configs} - # 5. 获取工作空间默认配置(需要逐个查询,因为 get_workspace_default_config 有复杂逻辑) + # 6. 获取工作空间默认配置(需要逐个查询,因为 get_workspace_default_config 有复杂逻辑) workspace_default_configs = {} unique_workspace_ids = list(set(ws_id for _, ws_id in workspace_fallback_users)) @@ -1332,7 +1508,7 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session) if default_config: workspace_default_configs[workspace_id] = default_config - # 6. 构建最终结果 + # 7. 构建最终结果 for end_user_id, data in user_data.items(): memory_config = None diff --git a/api/app/services/memory_config_service.py b/api/app/services/memory_config_service.py index f6eb84e2..c2ddbf2c 100644 --- a/api/app/services/memory_config_service.py +++ b/api/app/services/memory_config_service.py @@ -17,7 +17,6 @@ from sqlalchemy.orm import Session from app.core.logging_config import get_config_logger, get_logger from app.core.validators.memory_config_validators import ( validate_and_resolve_model_id, - validate_embedding_model, ) from app.models.memory_config_model import MemoryConfig as MemoryConfigModel from app.repositories.memory_config_repository import MemoryConfigRepository @@ -217,53 +216,108 @@ class MemoryConfigService: memory_config, workspace = result - # Step 2: Validate embedding model (returns both UUID and name) + # Helper function to validate model with workspace fallback + def _validate_model_with_fallback( + model_id: str, + model_type: str, + workspace_default: str, + required: bool = False + ) -> tuple: + """Validate model ID, falling back to workspace default if invalid. + + Args: + model_id: The model ID to validate + model_type: Type of model (llm, embedding, rerank) + workspace_default: Workspace default model ID to use as fallback + required: Whether the model is required + + Returns: + Tuple of (model_uuid, model_name) or (None, None) + """ + # Try the configured model first + if model_id: + try: + return validate_and_resolve_model_id( + model_id, + model_type, + self.db, + workspace.tenant_id, + required=False, + config_id=validated_config_id, + workspace_id=workspace.id, + ) + except Exception as e: + logger.warning( + f"{model_type} model validation failed, trying workspace default: {e}" + ) + + # Fallback to workspace default + if workspace_default: + try: + result = validate_and_resolve_model_id( + workspace_default, + model_type, + self.db, + workspace.tenant_id, + required=required, + config_id=validated_config_id, + workspace_id=workspace.id, + ) + if result[0]: + logger.info( + f"Using workspace default {model_type} model: {workspace_default}" + ) + return result + except Exception as e: + logger.error(f"Workspace default {model_type} model also invalid: {e}") + if required: + raise + + if required: + raise InvalidConfigError( + f"{model_type.title()} model is required but not configured", + field_name=f"{model_type}_model_id", + invalid_value=model_id, + config_id=validated_config_id, + workspace_id=workspace.id + ) + + return None, None + + # Step 2: Validate embedding model with workspace fallback embed_start = time.time() - embedding_uuid, embedding_name = validate_embedding_model( - validated_config_id, + embedding_uuid, embedding_name = _validate_model_with_fallback( memory_config.embedding_id, - self.db, - workspace.tenant_id, - workspace.id, + "embedding", + workspace.embedding, + required=True ) embed_time = time.time() - embed_start logger.info(f"[PERF] Embedding validation: {embed_time:.4f}s") - # Step 3: Resolve LLM model + # Step 3: Resolve LLM model with workspace fallback llm_start = time.time() - llm_uuid, llm_name = validate_and_resolve_model_id( + llm_uuid, llm_name = _validate_model_with_fallback( memory_config.llm_id, "llm", - self.db, - workspace.tenant_id, - required=True, - config_id=validated_config_id, - workspace_id=workspace.id, + workspace.llm, + required=True ) llm_time = time.time() - llm_start logger.info(f"[PERF] LLM validation: {llm_time:.4f}s") - # Step 4: Resolve optional rerank model + # Step 4: Resolve optional rerank model with workspace fallback rerank_start = time.time() - rerank_uuid = None - rerank_name = None - if memory_config.rerank_id: - rerank_uuid, rerank_name = validate_and_resolve_model_id( - memory_config.rerank_id, - "rerank", - self.db, - workspace.tenant_id, - required=False, - config_id=validated_config_id, - workspace_id=workspace.id, - ) + rerank_uuid, rerank_name = _validate_model_with_fallback( + memory_config.rerank_id, + "rerank", + workspace.rerank, + required=False + ) rerank_time = time.time() - rerank_start - if memory_config.rerank_id: + if memory_config.rerank_id or workspace.rerank: logger.info(f"[PERF] Rerank validation: {rerank_time:.4f}s") - # Note: embedding_name is now returned from validate_embedding_model above - # No need for redundant query! - # Create immutable MemoryConfig object config = MemoryConfig( config_id=memory_config.config_id, @@ -530,38 +584,7 @@ class MemoryConfigService: Returns: Optional[MemoryConfigModel]: Default config or None if no configs exist """ - from sqlalchemy import select - - from app.models.memory_config_model import MemoryConfig as MemoryConfigModel - - # First, try to find the explicitly marked default config - stmt = ( - select(MemoryConfigModel) - .where( - MemoryConfigModel.workspace_id == workspace_id, - MemoryConfigModel.is_default.is_(True), - MemoryConfigModel.state.is_(True), - ) - .limit(1) - ) - - config = self.db.scalars(stmt).first() - - if config: - return config - - # Fallback: get the oldest active config if no explicit default - stmt = ( - select(MemoryConfigModel) - .where( - MemoryConfigModel.workspace_id == workspace_id, - MemoryConfigModel.state.is_(True), - ) - .order_by(MemoryConfigModel.created_at.asc()) - .limit(1) - ) - - config = self.db.scalars(stmt).first() + config = MemoryConfigRepository.get_workspace_default(self.db, workspace_id) if not config: logger.warning( @@ -588,29 +611,28 @@ class MemoryConfigService: Returns: Optional[MemoryConfigModel]: Memory config or None if no fallback available """ - from app.models.memory_config_model import MemoryConfig as MemoryConfigModel - if not memory_config_id: logger.debug( "No memory config ID provided, using workspace default", extra={"workspace_id": str(workspace_id)} ) - return self.get_workspace_default_config(workspace_id) - config = self.db.get(MemoryConfigModel, memory_config_id) - - if config: - return config - - logger.warning( - "Memory config not found, falling back to workspace default", - extra={ - "missing_config_id": str(memory_config_id), - "workspace_id": str(workspace_id) - } + config = MemoryConfigRepository.get_with_fallback( + self.db, + memory_config_id, + workspace_id ) - return self.get_workspace_default_config(workspace_id) + if not config and memory_config_id: + logger.warning( + "Memory config not found, falling back to workspace default", + extra={ + "missing_config_id": str(memory_config_id), + "workspace_id": str(workspace_id) + } + ) + + return config def delete_config( self, @@ -624,7 +646,7 @@ class MemoryConfigService: Args: config_id: Memory config ID to delete (UUID or legacy int) - force: If True, delete even if end users are connected + force: If True, clear end user references before deleting Returns: Dict with status, message, and affected_users count @@ -632,8 +654,11 @@ class MemoryConfigService: Raises: ResourceNotFoundException: If config doesn't exist """ + from sqlalchemy.exc import IntegrityError + from app.core.exceptions import ResourceNotFoundException from app.models.memory_config_model import MemoryConfig as MemoryConfigModel + from app.repositories.end_user_repository import EndUserRepository # 处理旧格式 int 类型的 config_id if isinstance(config_id, int): @@ -663,54 +688,227 @@ class MemoryConfigService: "is_default": True } - # TODO: add back delete warning - # # Count connected end users - # end_user_repo = EndUserRepository(self.db) - # connected_count = end_user_repo.count_by_memory_config_id(config_id) + # Use repository to count connected end users + end_user_repo = EndUserRepository(self.db) + connected_count = end_user_repo.count_by_memory_config_id(config_id) - # if connected_count > 0 and not force: - # logger.warning( - # "Attempted to delete memory config with connected end users", - # extra={ - # "config_id": str(config_id), - # "connected_count": connected_count - # } - # ) + if connected_count > 0 and not force: + logger.warning( + "Attempted to delete memory config with connected end users", + extra={ + "config_id": str(config_id), + "connected_count": connected_count + } + ) - # return { - # "status": "warning", - # "message": f"Cannot delete memory config: {connected_count} end users are using it", - # "connected_count": connected_count, - # "force_required": True - # } + return { + "status": "warning", + "message": f"无法删除记忆配置:{connected_count} 个终端用户正在使用此配置", + "connected_count": connected_count, + "force_required": True + } - # # Force delete: clear end user references first - # if connected_count > 0 and force: - # cleared_count = end_user_repo.clear_memory_config_id(config_id) + # Force delete: use repository to clear end user references first + if connected_count > 0 and force: + cleared_count = end_user_repo.clear_memory_config_id(config_id) - # logger.warning( - # "Force deleting memory config", - # extra={ - # "config_id": str(config_id), - # "cleared_end_users": cleared_count - # } - # ) - connected_count = 0 + logger.warning( + "Force deleting memory config, clearing end user references", + extra={ + "config_id": str(config_id), + "cleared_end_users": cleared_count + } + ) - self.db.delete(config) - self.db.commit() - - logger.info( - "Memory config deleted", - extra={ - "config_id": str(config_id), - "force": force, + try: + self.db.delete(config) + self.db.commit() + + logger.info( + "Memory config deleted", + extra={ + "config_id": str(config_id), + "force": force, + "affected_users": connected_count + } + ) + + return { + "status": "success", + "message": "记忆配置删除成功", "affected_users": connected_count } - ) + + except IntegrityError as e: + self.db.rollback() + + # Handle foreign key violation gracefully + error_str = str(e.orig) if e.orig else str(e) + if "ForeignKeyViolation" in error_str or "foreign key constraint" in error_str.lower(): + logger.warning( + "Delete failed due to foreign key constraint", + extra={ + "config_id": str(config_id), + "error": error_str + } + ) + return { + "status": "error", + "message": "无法删除记忆配置:仍有终端用户引用此配置,请使用 force=true 强制删除", + "force_required": True + } + + # Re-raise other integrity errors + logger.error( + "Delete failed due to integrity error", + extra={ + "config_id": str(config_id), + "error": error_str + }, + exc_info=True + ) + raise + + # ==================== 记忆配置提取方法 ==================== + + def extract_memory_config_id( + self, + app_type: str, + config: dict + ) -> tuple[Optional[uuid.UUID], bool]: + """从发布配置中提取 memory_config_id(根据应用类型分发) - return { - "status": "success", - "message": "Memory config deleted successfully", - "affected_users": connected_count - } + Args: + app_type: 应用类型 (agent, workflow, multi_agent) + config: 发布配置字典 + + Returns: + Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int) + - memory_config_id: 提取的配置ID,如果不存在或为旧格式则返回 None + - is_legacy_int: 是否检测到旧格式 int 数据,需要回退到工作空间默认配置 + """ + if app_type == "agent": + return self._extract_memory_config_id_from_agent(config) + elif app_type == "workflow": + return self._extract_memory_config_id_from_workflow(config) + elif app_type == "multi_agent": + # Multi-agent 暂不支持记忆配置提取 + logger.debug(f"多智能体应用暂不支持记忆配置提取: app_type={app_type}") + return None, False + else: + logger.warning(f"不支持的应用类型,无法提取记忆配置: app_type={app_type}") + return None, False + + def _extract_memory_config_id_from_agent( + self, + config: dict + ) -> tuple[Optional[uuid.UUID], bool]: + """从 Agent 应用配置中提取 memory_config_id + + 路径: config.memory.memory_content 或 config.memory.memory_config_id + + Args: + config: Agent 配置字典 + + Returns: + Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int) + - memory_config_id: 记忆配置ID,如果不存在或为旧格式则返回 None + - is_legacy_int: 是否检测到旧格式 int 数据 + """ + try: + memory_dict = config.get("memory", {}) + # Support both field names: memory_config_id (new) and memory_content (legacy) + memory_value = memory_dict.get("memory_config_id") or memory_dict.get("memory_content") + logger.info( + f"Extracting memory_config_id: memory_value={memory_value}, " + f"type={type(memory_value).__name__ if memory_value else 'None'}" + ) + if memory_value: + # 处理字符串、UUID 和 int(旧数据兼容)三种情况 + if isinstance(memory_value, uuid.UUID): + return memory_value, False + elif isinstance(memory_value, str): + # Check if it's a numeric string (legacy int format) + if memory_value.isdigit(): + logger.warning( + f"Agent 配置中 memory_config_id 为旧格式 int 字符串,将使用工作空间默认配置: " + f"value={memory_value}" + ) + return None, True + try: + return uuid.UUID(memory_value), False + except ValueError: + logger.warning(f"Invalid UUID string: {memory_value}") + return None, False + elif isinstance(memory_value, int): + # 旧数据存储为 int,需要回退到工作空间默认配置 + logger.warning( + f"Agent 配置中 memory_config_id 为旧格式 int,将使用工作空间默认配置: " + f"value={memory_value}" + ) + return None, True + else: + logger.warning( + f"Agent 配置中 memory_config_id 格式无效: type={type(memory_value)}, " + f"value={memory_value}" + ) + return None, False + except (ValueError, TypeError) as e: + logger.warning( + f"Agent 配置中 memory_config_id 格式无效: error={str(e)}" + ) + return None, False + + def _extract_memory_config_id_from_workflow( + self, + config: dict + ) -> tuple[Optional[uuid.UUID], bool]: + """从 Workflow 应用配置中提取 memory_config_id + + 扫描工作流节点,查找 MemoryRead 或 MemoryWrite 节点。 + 返回第一个找到的记忆节点的 config_id。 + + Args: + config: Workflow 配置字典 + + Returns: + Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int) + - memory_config_id: 记忆配置ID,如果不存在或为旧格式则返回 None + - is_legacy_int: 是否检测到旧格式 int 数据 + """ + nodes = config.get("nodes", []) + + for node in nodes: + node_type = node.get("type", "") + + # 检查是否为记忆节点 (support both formats: memory-read/memory-write and MemoryRead/MemoryWrite) + if node_type.lower() in ["memoryread", "memorywrite", "memory-read", "memory-write"]: + config_id = node.get("config", {}).get("config_id") + + if config_id: + try: + # 处理字符串、UUID 和 int(旧数据兼容)三种情况 + if isinstance(config_id, uuid.UUID): + return config_id, False + elif isinstance(config_id, str): + return uuid.UUID(config_id), False + elif isinstance(config_id, int): + # 旧数据存储为 int,需要回退到工作空间默认配置 + logger.warning( + f"工作流记忆节点 config_id 为旧格式 int,将使用工作空间默认配置: " + f"node_id={node.get('id')}, node_type={node_type}, value={config_id}" + ) + return None, True + else: + logger.warning( + f"工作流记忆节点 config_id 格式无效: node_id={node.get('id')}, " + f"node_type={node_type}, type={type(config_id)}" + ) + except (ValueError, TypeError) as e: + logger.warning( + f"工作流记忆节点 config_id 格式无效: node_id={node.get('id')}, " + f"node_type={node_type}, error={str(e)}" + ) + + logger.debug("工作流配置中未找到记忆节点") + return None, False diff --git a/api/app/services/memory_reflection_service.py b/api/app/services/memory_reflection_service.py index 66ccd72d..316d7cc5 100644 --- a/api/app/services/memory_reflection_service.py +++ b/api/app/services/memory_reflection_service.py @@ -120,7 +120,14 @@ class WorkspaceAppService: return None def _get_memory_config(self, memory_content: str) -> Dict[str, Any]: - """Retrieve memory_config information based on memory_content""" + """Retrieve memory_config information based on memory_content + + Args: + memory_content: Memory config ID string + + Returns: + Dict containing memory config info including workspace_id for model fallback + """ try: memory_content = resolve_config_id(memory_content, self.db) memory_config_result = MemoryConfigRepository.query_reflection_config_by_id(self.db, (memory_content)) @@ -128,6 +135,7 @@ class WorkspaceAppService: if memory_config_result: return { "config_id": memory_content, + "workspace_id": memory_config_result.workspace_id, "enable_self_reflexion": memory_config_result.enable_self_reflexion, "iteration_period": memory_config_result.iteration_period, "reflexion_range": memory_config_result.reflexion_range, @@ -359,7 +367,17 @@ class MemoryReflectionService: } def _create_reflection_config_from_data(self, config_data: Dict[str, Any]) -> ReflectionConfig: - """Create reflective configuration objects from configuration data""" + """Create reflective configuration objects from configuration data + + If reflection_model_id is not set, falls back to workspace default LLM. + + Args: + config_data: Dict containing reflection config including workspace_id + + Returns: + ReflectionConfig object with model_id resolved + """ + from app.repositories.workspace_repository import get_workspace_models_configs reflexion_range_value = config_data.get("reflexion_range") if reflexion_range_value is None or reflexion_range_value == "": @@ -392,6 +410,17 @@ class MemoryReflectionService: if reflection_model_id: reflection_model_id = str(reflection_model_id) + # 如果 reflection_model_id 为空,回退到工作空间默认 LLM + if not reflection_model_id: + workspace_id = config_data.get("workspace_id") + if workspace_id: + workspace_models = get_workspace_models_configs(self.db, workspace_id) + if workspace_models and workspace_models.get("llm"): + reflection_model_id = workspace_models["llm"] + api_logger.info( + f"reflection_model_id 为空,使用工作空间默认 LLM: {reflection_model_id}" + ) + return ReflectionConfig( enabled=config_data.get("enable_self_reflexion", False), iteration_period=str(iteration_period), # ReflectionConfig期望字符串 diff --git a/api/app/services/workspace_service.py b/api/app/services/workspace_service.py index 43116b34..9ee98fa0 100644 --- a/api/app/services/workspace_service.py +++ b/api/app/services/workspace_service.py @@ -899,6 +899,8 @@ def update_workspace_models_configs( def _ensure_default_memory_config(db: Session, workspace: Workspace) -> None: """Ensure a workspace has a default memory config, creating one if missing. + Also fills empty model fields for all configs in this workspace. + Args: db: Database session workspace: The workspace to check @@ -911,28 +913,92 @@ def _ensure_default_memory_config(db: Session, workspace: Workspace) -> None: MemoryConfig.is_default == True ).first() - if existing_default: + if not existing_default: + # No default config exists, create one + business_logger.info( + f"Workspace {workspace.id} missing default memory config, creating one" + ) + + try: + _create_default_memory_config( + db=db, + workspace_id=workspace.id, + workspace_name=workspace.name, + llm_id=uuid.UUID(workspace.llm) if workspace.llm else None, + embedding_id=uuid.UUID(workspace.embedding) if workspace.embedding else None, + rerank_id=uuid.UUID(workspace.rerank) if workspace.rerank else None, + ) + except Exception as e: + business_logger.error( + f"Failed to create default memory config for workspace {workspace.id}: {str(e)}" + ) + + # Fill empty model fields for ALL configs in this workspace + _fill_workspace_configs_model_defaults(db, workspace) + + +def _fill_workspace_configs_model_defaults( + db: Session, + workspace: Workspace +) -> None: + """Fill empty model fields for all memory configs in a workspace. + + Updates llm_id, embedding_id, rerank_id, reflection_model_id, and emotion_model_id + if they are None, using the corresponding workspace default models. + + Args: + db: Database session + workspace: The workspace containing default model settings + """ + from app.models.memory_config_model import MemoryConfig + + # Get all configs for this workspace + configs = db.query(MemoryConfig).filter( + MemoryConfig.workspace_id == workspace.id + ).all() + + if not configs: return - # No default config exists, create one - business_logger.info( - f"Workspace {workspace.id} missing default memory config, creating one" - ) + # Map of memory_config field -> workspace field + model_field_mappings = [ + ("llm_id", "llm"), + ("embedding_id", "embedding"), + ("rerank_id", "rerank"), + ("reflection_model_id", "llm"), # reflection uses LLM + ("emotion_model_id", "llm"), # emotion uses LLM + ] - try: - _create_default_memory_config( - db=db, - workspace_id=workspace.id, - workspace_name=workspace.name, - llm_id=uuid.UUID(workspace.llm) if workspace.llm else None, - embedding_id=uuid.UUID(workspace.embedding) if workspace.embedding else None, - rerank_id=uuid.UUID(workspace.rerank) if workspace.rerank else None, - ) - except Exception as e: - business_logger.error( - f"Failed to create default memory config for workspace {workspace.id}: {str(e)}" - ) - # Don't fail the workspace list operation if config creation fails + configs_updated = 0 + + for memory_config in configs: + updated_fields = [] + + for config_field, workspace_field in model_field_mappings: + config_value = getattr(memory_config, config_field, None) + workspace_value = getattr(workspace, workspace_field, None) + + if not config_value and workspace_value: + setattr(memory_config, config_field, workspace_value) + updated_fields.append(config_field) + + if updated_fields: + configs_updated += 1 + business_logger.debug( + f"Updated memory config {memory_config.config_id} fields: {updated_fields}" + ) + + if configs_updated > 0: + try: + db.commit() + business_logger.info( + f"Updated {configs_updated} memory configs in workspace {workspace.id} with default models" + ) + except Exception as e: + db.rollback() + business_logger.error( + f"Failed to update memory configs in workspace {workspace.id}: {str(e)}" + ) def _create_default_memory_config(