feat(memory): add legacy int data format detection and workspace default fallback

- Add .hypothesis/ to .gitignore for test framework artifacts
- Remove outdated comment from EndUser model memory_config_id field
- Update memory config extraction methods to return tuple with legacy format flag
- Add detection for legacy int-formatted memory_config_id in Agent and Workflow configs
- Implement workspace default memory config fallback when legacy int format detected
- Add _get_workspace_default_memory_config_id method to retrieve default or earliest active config
- Update return types from Optional[uuid.UUID] to Tuple[Optional[uuid.UUID], bool] for extraction methods
- Add comprehensive logging for legacy format detection and fallback behavior
- Improve backward compatibility for applications with old int-based memory configuration data
This commit is contained in:
Ke Sun
2026-01-29 21:00:09 +08:00
parent a01911ba5f
commit 8267761890
4 changed files with 171 additions and 25 deletions

1
.gitignore vendored
View File

@@ -21,6 +21,7 @@ examples/
# Temporary outputs
.DS_Store
.hypothesis/
time.log
celerybeat-schedule.db
search_results.json

View File

@@ -21,7 +21,6 @@ class EndUser(Base):
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now)
# Memory config association - updated lazily during conversation
memory_config_id = Column(
UUID(as_uuid=True),
ForeignKey("memory_config.config_id"),

View File

@@ -1182,7 +1182,7 @@ class AppService:
self,
app_type: str,
config: Dict[str, Any]
) -> Optional[uuid.UUID]:
) -> Tuple[Optional[uuid.UUID], bool]:
"""从发布配置中提取 memory_config_id根据应用类型分发
Args:
@@ -1190,7 +1190,9 @@ class AppService:
config: 发布配置字典
Returns:
Optional[uuid.UUID]: 提取的 memory_config_id,如果不存在则返回 None
Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int)
- memory_config_id: 提取的配置ID如果不存在或为旧格式则返回 None
- is_legacy_int: 是否检测到旧格式 int 数据,需要回退到工作空间默认配置
"""
if app_type == AppType.AGENT:
return self._extract_memory_config_id_from_agent(config)
@@ -1199,15 +1201,15 @@ class AppService:
elif app_type == AppType.MULTI_AGENT:
# Multi-agent 暂不支持记忆配置提取
logger.debug(f"多智能体应用暂不支持记忆配置提取: app_type={app_type}")
return None
return None, False
else:
logger.warning(f"不支持的应用类型,无法提取记忆配置: app_type={app_type}")
return None
return None, False
def _extract_memory_config_id_from_agent(
self,
config: Dict[str, Any]
) -> Optional[uuid.UUID]:
) -> Tuple[Optional[uuid.UUID], bool]:
"""从 Agent 应用配置中提取 memory_config_id
路径: config.memory.memory_content
@@ -1216,33 +1218,42 @@ class AppService:
config: Agent 配置字典
Returns:
Optional[uuid.UUID]: 记忆配置ID如果不存在则返回 None
Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int)
- memory_config_id: 记忆配置ID如果不存在或为旧格式则返回 None
- is_legacy_int: 是否检测到旧格式 int 数据
"""
try:
memory_content = config.get("memory", {}).get("memory_content")
if memory_content:
# 处理字符串UUID 种情况
if isinstance(memory_content, str):
return uuid.UUID(memory_content)
elif isinstance(memory_content, uuid.UUID):
return memory_content
# 处理字符串UUID 和 int旧数据兼容种情况
if isinstance(memory_content, uuid.UUID):
return memory_content, False
elif isinstance(memory_content, str):
return uuid.UUID(memory_content), False
elif isinstance(memory_content, int):
# 旧数据存储为 int需要回退到工作空间默认配置
logger.warning(
f"Agent 配置中 memory_content 为旧格式 int将使用工作空间默认配置: "
f"value={memory_content}"
)
return None, True
else:
logger.warning(
f"Agent 配置中 memory_content 格式无效: type={type(memory_content)}, "
f"value={memory_content}"
)
return None
return None, False
except (ValueError, TypeError) as e:
logger.warning(
f"Agent 配置中 memory_content 格式无效: error={str(e)}, "
f"memory_content={memory_content}"
)
return None
return None, False
def _extract_memory_config_id_from_workflow(
self,
config: Dict[str, Any]
) -> Optional[uuid.UUID]:
) -> Tuple[Optional[uuid.UUID], bool]:
"""从 Workflow 应用配置中提取 memory_config_id
扫描工作流节点,查找 MemoryRead 或 MemoryWrite 节点。
@@ -1252,7 +1263,9 @@ class AppService:
config: Workflow 配置字典
Returns:
Optional[uuid.UUID]: 记忆配置ID如果不存在则返回 None
Tuple[Optional[uuid.UUID], bool]: (memory_config_id, is_legacy_int)
- memory_config_id: 记忆配置ID如果不存在或为旧格式则返回 None
- is_legacy_int: 是否检测到旧格式 int 数据
"""
nodes = config.get("nodes", [])
@@ -1265,11 +1278,18 @@ class AppService:
if config_id:
try:
# 处理字符串UUID 种情况
if isinstance(config_id, str):
return uuid.UUID(config_id)
elif isinstance(config_id, uuid.UUID):
return config_id
# 处理字符串UUID 和 int旧数据兼容种情况
if isinstance(config_id, uuid.UUID):
return config_id, False
elif isinstance(config_id, str):
return uuid.UUID(config_id), False
elif isinstance(config_id, int):
# 旧数据存储为 int需要回退到工作空间默认配置
logger.warning(
f"工作流记忆节点 config_id 为旧格式 int将使用工作空间默认配置: "
f"node_id={node.get('id')}, node_type={node_type}, value={config_id}"
)
return None, True
else:
logger.warning(
f"工作流记忆节点 config_id 格式无效: node_id={node.get('id')}, "
@@ -1282,7 +1302,57 @@ class AppService:
)
logger.debug("工作流配置中未找到记忆节点")
return None
return None, False
def _get_workspace_default_memory_config_id(
self,
workspace_id: uuid.UUID
) -> Optional[uuid.UUID]:
"""获取工作空间的默认记忆配置ID
Args:
workspace_id: 工作空间ID
Returns:
Optional[uuid.UUID]: 默认记忆配置ID如果不存在则返回 None
"""
from app.models.memory_config_model import MemoryConfig as MemoryConfigModel
# 查找工作空间的默认记忆配置
stmt = (
select(MemoryConfigModel.id)
.where(
MemoryConfigModel.workspace_id == workspace_id,
MemoryConfigModel.is_default.is_(True),
MemoryConfigModel.state.is_(True),
)
.limit(1)
)
config_id = self.db.scalars(stmt).first()
if config_id:
return config_id
# 回退:获取最早创建的活跃配置
stmt = (
select(MemoryConfigModel.id)
.where(
MemoryConfigModel.workspace_id == workspace_id,
MemoryConfigModel.state.is_(True),
)
.order_by(MemoryConfigModel.created_at.asc())
.limit(1)
)
config_id = self.db.scalars(stmt).first()
if not config_id:
logger.warning(
f"工作空间没有可用的记忆配置: workspace_id={workspace_id}"
)
return config_id
def _update_endusers_memory_config(
self,
@@ -1451,7 +1521,17 @@ class AppService:
self.db.flush() # 先 flush确保 release 已插入数据库
# 提取记忆配置ID并更新终端用户
memory_config_id = self._extract_memory_config_id(app.type, config)
memory_config_id, is_legacy_int = self._extract_memory_config_id(app.type, config)
# 如果检测到旧格式 int 数据,回退到工作空间默认配置
if is_legacy_int and not memory_config_id:
memory_config_id = self._get_workspace_default_memory_config_id(app.workspace_id)
if memory_config_id:
logger.info(
f"发布时使用工作空间默认记忆配置(旧数据兼容): app_id={app_id}, "
f"workspace_id={app.workspace_id}, memory_config_id={memory_config_id}"
)
if memory_config_id:
updated_count = self._update_endusers_memory_config(app_id, memory_config_id)
logger.info(
@@ -1575,7 +1655,17 @@ class AppService:
raise ResourceNotFoundException("发布版本", f"app_id={app_id}, version={version}")
# 提取记忆配置ID并更新终端用户
memory_config_id = self._extract_memory_config_id(release.type, release.config)
memory_config_id, is_legacy_int = self._extract_memory_config_id(release.type, release.config)
# 如果检测到旧格式 int 数据,回退到工作空间默认配置
if is_legacy_int and not memory_config_id:
memory_config_id = self._get_workspace_default_memory_config_id(app.workspace_id)
if memory_config_id:
logger.info(
f"回滚时使用工作空间默认记忆配置(旧数据兼容): app_id={app_id}, "
f"workspace_id={app.workspace_id}, memory_config_id={memory_config_id}"
)
if memory_config_id:
updated_count = self._update_endusers_memory_config(app_id, memory_config_id)
logger.info(

View File

@@ -87,9 +87,26 @@ def delete_workspace_member(
def get_user_workspaces(db: Session, user: User) -> List[Workspace]:
"""获取当前用户参与的所有工作空间"""
"""获取当前用户参与的所有工作空间
For neo4j storage type workspaces, ensures each has a default memory config.
If a workspace is missing a default config, one will be created automatically.
Args:
db: Database session
user: Current user
Returns:
List[Workspace]: List of workspaces the user belongs to
"""
business_logger.debug(f"获取用户工作空间列表: {user.username} (ID: {user.id})")
workspaces = workspace_repository.get_workspaces_by_user(db=db, user_id=user.id)
# Ensure each neo4j workspace has a default memory config
for workspace in workspaces:
if workspace.storage_type == 'neo4j':
_ensure_default_memory_config(db, workspace)
business_logger.info(f"用户 {user.username} 的工作空间数量: {len(workspaces)}")
return workspaces
@@ -879,6 +896,45 @@ def update_workspace_models_configs(
raise BusinessException(f"更新模型配置失败: {str(e)}", BizCode.INTERNAL_ERROR)
def _ensure_default_memory_config(db: Session, workspace: Workspace) -> None:
"""Ensure a workspace has a default memory config, creating one if missing.
Args:
db: Database session
workspace: The workspace to check
"""
from app.models.memory_config_model import MemoryConfig
# Check if default config exists for this workspace
existing_default = db.query(MemoryConfig).filter(
MemoryConfig.workspace_id == workspace.id,
MemoryConfig.is_default == True
).first()
if existing_default:
return
# No default config exists, create one
business_logger.info(
f"Workspace {workspace.id} missing default memory config, creating one"
)
try:
_create_default_memory_config(
db=db,
workspace_id=workspace.id,
workspace_name=workspace.name,
llm_id=uuid.UUID(workspace.llm) if workspace.llm else None,
embedding_id=uuid.UUID(workspace.embedding) if workspace.embedding else None,
rerank_id=uuid.UUID(workspace.rerank) if workspace.rerank else None,
)
except Exception as e:
business_logger.error(
f"Failed to create default memory config for workspace {workspace.id}: {str(e)}"
)
# Don't fail the workspace list operation if config creation fails
def _create_default_memory_config(
db: Session,
workspace_id: uuid.UUID,