refactor(memory): consolidate memory config extraction and remove unused validator

- Add workspace default LLM fallback for emotion model in extraction orchestrator
- Consolidate memory config ID extraction logic into MemoryConfigService
- Remove duplicate extraction methods from AppService (_extract_memory_config_id_from_agent, _extract_memory_config_id_from_workflow)
- Remove unused validate_embedding_model function from validators
- Simplify AppService by delegating memory config extraction to MemoryConfigService
- Update validator exports to remove validate_embedding_model
- Improve code maintainability by centralizing memory configuration logic
This commit is contained in:
Ke Sun
2026-02-09 17:28:42 +08:00
parent 5fe85fb457
commit b16c9d53ef
9 changed files with 736 additions and 332 deletions

View File

@@ -1191,8 +1191,8 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
"""
获取终端用户关联的记忆配置
使用 MemoryConfigService.get_config_with_fallback 获取配置,
支持终端用户已分配配置和工作空间默认配置的回退机制
兼容旧数据:如果 end_user.memory_config_id 为空,则从 AppRelease.config 中获取
并回填到 end_user.memory_config_id 字段(懒迁移)
Args:
end_user_id: 终端用户ID
@@ -1204,7 +1204,13 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
Raises:
ValueError: 当终端用户不存在或应用未发布时
"""
import json as json_module
import uuid
from sqlalchemy import select
from app.models.app_model import App
from app.models.app_release_model import AppRelease
from app.models.end_user_model import EndUser
from app.services.memory_config_service import MemoryConfigService
@@ -1217,6 +1223,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
raise ValueError(f"终端用户不存在: {end_user_id}")
app_id = end_user.app_id
logger.debug(f"Found end_user app_id: {app_id}")
# 2. 获取应用以确定 workspace_id
app = db.query(App).filter(App.id == app_id).first()
@@ -1228,10 +1235,71 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
logger.warning(f"No current release for app: {app_id}")
raise ValueError(f"应用未发布: {app_id}")
# 3. 使用 get_config_with_fallback 获取记忆配置
# 3. 兼容旧数据:如果 memory_config_id 为空,从 AppRelease.config 获取并回填
memory_config_id_to_use = end_user.memory_config_id
# 如果已有 memory_config_id直接使用
# 如果新创建enduserenduser.memory_config_id 必定为none
# 那么使用从release中获取memory_config_id为预期行为并且回填到
# end_user.memory_config_id
if not memory_config_id_to_use:
logger.info(f"end_user.memory_config_id is None, migrating from AppRelease.config")
# 获取最新发布版本
stmt = (
select(AppRelease)
.where(AppRelease.app_id == app_id, AppRelease.is_active.is_(True))
.order_by(AppRelease.version.desc())
)
# TODO: change to current_release_id
latest_release = db.scalars(stmt).first()
if latest_release:
config = latest_release.config or {}
# 如果 config 是字符串,解析为字典
if isinstance(config, str):
try:
config = json_module.loads(config)
except json_module.JSONDecodeError:
logger.warning(f"Failed to parse config JSON for release {latest_release.id}")
config = {}
# 使用 MemoryConfigService 的提取方法
memory_config_service = MemoryConfigService(db)
legacy_config_id, is_legacy_int = memory_config_service.extract_memory_config_id(
app_type=app.type,
config=config
)
if legacy_config_id:
# 验证提取的 config_id 是否存在于数据库中
from app.models.memory_config_model import MemoryConfig as MemoryConfigModel
existing_config = db.get(MemoryConfigModel, legacy_config_id)
if existing_config:
memory_config_id_to_use = legacy_config_id
# 回填到 end_user 表lazy update
end_user.memory_config_id = memory_config_id_to_use
db.commit()
logger.info(
f"Migrated memory_config_id for end_user {end_user_id}: {memory_config_id_to_use}"
)
else:
logger.warning(
f"Extracted memory_config_id does not exist, skipping backfill: "
f"end_user_id={end_user_id}, config_id={legacy_config_id}"
)
elif is_legacy_int:
logger.info(
f"Legacy int config detected for end_user {end_user_id}, will use workspace default"
)
# 4. 使用 get_config_with_fallback 获取记忆配置
memory_config_service = MemoryConfigService(db)
memory_config = memory_config_service.get_config_with_fallback(
memory_config_id=end_user.memory_config_id,
memory_config_id=memory_config_id_to_use,
workspace_id=app.workspace_id
)
@@ -1255,7 +1323,8 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
使用与 get_end_user_connected_config 相同的逻辑:
1. 优先使用 end_user.memory_config_id
2. 如果没有,回退到工作空间默认配置
2. 如果没有,尝试从 AppRelease.config 提取并回填
3. 如果仍然没有,回退到工作空间默认配置
Args:
end_user_ids: 终端用户ID列表
@@ -1269,7 +1338,12 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
...
}
"""
import json as json_module
from sqlalchemy import select
from app.models.app_model import App
from app.models.app_release_model import AppRelease
from app.models.end_user_model import EndUser
from app.models.memory_config_model import MemoryConfig
from app.services.memory_config_service import MemoryConfigService
@@ -1284,7 +1358,8 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
# 1. 批量查询所有 end_user 及其 app_id 和 memory_config_id
end_users = db.query(EndUser).filter(EndUser.id.in_(end_user_ids)).all()
# 创建映射
# 创建映射 - 保留 EndUser 对象引用以便回填
end_user_map = {str(eu.id): eu for eu in end_users}
user_data = {str(eu.id): {"app_id": eu.app_id, "memory_config_id": eu.memory_config_id} for eu in end_users}
# 记录未找到的用户
@@ -1295,15 +1370,116 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
for user_id in missing_user_ids:
result[user_id] = {"memory_config_id": None, "memory_config_name": None}
# 2. 批量获取所有相关应用以获取 workspace_id
# 2. 批量获取所有相关应用以获取 workspace_id 和 type
app_ids = list(set(data["app_id"] for data in user_data.values()))
if not app_ids:
return result
apps = db.query(App).filter(App.id.in_(app_ids)).all()
app_map = {app.id: app for app in apps}
app_to_workspace = {app.id: app.workspace_id for app in apps}
# 3. 收集需要查询的 memory_config_id 和需要回退的 workspace_id
# 3. 对于没有 memory_config_id 的用户,尝试从 AppRelease.config 提取
users_needing_migration = [
(end_user_id, data["app_id"])
for end_user_id, data in user_data.items()
if not data["memory_config_id"]
]
if users_needing_migration:
# 批量获取相关应用的最新发布版本
migration_app_ids = list(set(app_id for _, app_id in users_needing_migration))
# 查询每个应用的最新活跃发布版本
app_latest_releases = {}
for app_id in migration_app_ids:
stmt = (
select(AppRelease)
.where(AppRelease.app_id == app_id, AppRelease.is_active.is_(True))
.order_by(AppRelease.version.desc())
.limit(1)
)
latest_release = db.scalars(stmt).first()
if latest_release:
app_latest_releases[app_id] = latest_release
# 为每个需要迁移的用户提取 memory_config_id
config_service = MemoryConfigService(db)
users_to_backfill = [] # [(end_user, memory_config_id), ...]
for end_user_id, app_id in users_needing_migration:
latest_release = app_latest_releases.get(app_id)
if not latest_release:
continue
config = latest_release.config or {}
# 如果 config 是字符串,解析为字典
if isinstance(config, str):
try:
config = json_module.loads(config)
except json_module.JSONDecodeError:
logger.warning(f"Failed to parse config JSON for release {latest_release.id}")
continue
# 使用 MemoryConfigService 的提取方法
app = app_map.get(app_id)
if not app:
continue
legacy_config_id, is_legacy_int = config_service.extract_memory_config_id(
app_type=app.type,
config=config
)
if legacy_config_id:
# 更新 user_data 中的 memory_config_id
user_data[end_user_id]["memory_config_id"] = legacy_config_id
# 记录需要回填的用户(稍后验证配置存在后再回填)
end_user = end_user_map.get(end_user_id)
if end_user:
users_to_backfill.append((end_user, legacy_config_id))
elif is_legacy_int:
logger.info(
f"Legacy int config detected for end_user {end_user_id}, will use workspace default"
)
# 验证提取的 config_id 是否存在于数据库中
if users_to_backfill:
config_ids_to_validate = list(set(cid for _, cid in users_to_backfill))
existing_configs = db.query(MemoryConfig).filter(
MemoryConfig.config_id.in_(config_ids_to_validate)
).all()
valid_config_ids = {mc.config_id for mc in existing_configs}
# 只回填存在的配置
valid_backfills = [
(eu, cid) for eu, cid in users_to_backfill
if cid in valid_config_ids
]
invalid_backfills = [
(eu, cid) for eu, cid in users_to_backfill
if cid not in valid_config_ids
]
if invalid_backfills:
invalid_ids = [str(cid) for _, cid in invalid_backfills]
logger.warning(
f"Skipping backfill for non-existent memory_config_ids: {invalid_ids}"
)
# 清除 user_data 中无效的 config_id
for eu, cid in invalid_backfills:
user_data[str(eu.id)]["memory_config_id"] = None
# 批量回填 end_user.memory_config_id
if valid_backfills:
for end_user, memory_config_id in valid_backfills:
end_user.memory_config_id = memory_config_id
db.commit()
logger.info(f"Migrated memory_config_id for {len(valid_backfills)} end_users")
# 4. 收集需要查询的 memory_config_id 和需要回退的 workspace_id
direct_config_ids = []
workspace_fallback_users = [] # [(end_user_id, workspace_id), ...]
@@ -1315,13 +1491,13 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
if workspace_id:
workspace_fallback_users.append((end_user_id, workspace_id))
# 4. 批量查询直接分配的配置
# 5. 批量查询直接分配的配置
config_id_to_config = {}
if direct_config_ids:
configs = db.query(MemoryConfig).filter(MemoryConfig.config_id.in_(direct_config_ids)).all()
config_id_to_config = {mc.config_id: mc for mc in configs}
# 5. 获取工作空间默认配置(需要逐个查询,因为 get_workspace_default_config 有复杂逻辑)
# 6. 获取工作空间默认配置(需要逐个查询,因为 get_workspace_default_config 有复杂逻辑)
workspace_default_configs = {}
unique_workspace_ids = list(set(ws_id for _, ws_id in workspace_fallback_users))
@@ -1332,7 +1508,7 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
if default_config:
workspace_default_configs[workspace_id] = default_config
# 6. 构建最终结果
# 7. 构建最终结果
for end_user_id, data in user_data.items():
memory_config = None