feat(memory-config): Add V1 memory config management API endpoints

-Add full CRUD endpoints for memory config via API Key auth (/v1/memory_config)
-Add V1 request schemas: ConfigCreateRequest, ConfigUpdateRequest, ConfigUpdateExtractedRequest, ConfigUpdateForgettingRequest
-Add config-workspace ownership verification
-Add scenes/simple, read_all_config, read_config_extracted query endpoints
-Add create_config, update_config, update_config_extracted, update_config_forgetting, delete_config mutation endpoints
-Reuse management-side controllers with pre-validation ownership checks
This commit is contained in:
miao
2026-04-16 19:05:24 +08:00
parent 2f0bb793d8
commit ddfd81259a
3 changed files with 460 additions and 5 deletions

View File

@@ -108,8 +108,9 @@ async def create_end_user(
workspace_id=workspace_id,
other_id=payload.other_id,
memory_config_id=memory_config_id,
other_name=payload.other_name,
)
end_user.other_name = payload.other_name
logger.info(f"End user ready: {end_user.id}")
result = {

View File

@@ -1,20 +1,84 @@
"""Memory Config 服务接口 - 基于 API Key 认证"""
from fastapi import APIRouter, Depends, Request
from typing import Optional
import uuid
from fastapi import APIRouter, Body, Depends, Header, Query, Request
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.controllers import memory_storage_controller
from app.controllers import memory_forget_controller
from app.controllers import ontology_controller
from app.schemas.memory_storage_schema import ForgettingConfigUpdateRequest
from app.core.api_key_auth import require_api_key
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException
from app.core.logging_config import get_business_logger
from app.core.response_utils import success
from app.db import get_db
from app.repositories.memory_config_repository import MemoryConfigRepository
from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.memory_api_schema import ListConfigsResponse
from app.schemas.memory_api_schema import (
ConfigUpdateExtractedRequest,
ConfigUpdateRequest,
ListConfigsResponse,
ConfigCreateRequest,
ConfigUpdateForgettingRequest,
)
from app.schemas.memory_storage_schema import (
ConfigUpdate,
ConfigUpdateExtracted,
ConfigParamsCreate,
)
from app.services import api_key_service
from app.services.memory_api_service import MemoryAPIService
from app.utils.config_utils import resolve_config_id
router = APIRouter(prefix="/memory_config", tags=["V1 - Memory Config API"])
logger = get_business_logger()
def _get_current_user(api_key_auth: ApiKeyAuth, db: Session):
"""Build a current_user object from API key auth
Args:
api_key_auth: Validated API key auth info
db: Database session
Returns:
User object with current_workspace_id set
"""
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return current_user
def _verify_config_ownership(config_id:str, workspace_id:uuid.UUID, db:Session):
"""Verify that the config belongs to the workspace.
Args:
config_id: The ID of the config to verify
workspace_id: The workspace ID tocheck against
db: Database session for querying
Raises:
BusinessException: If the config does not exist or does not belong to the workspace
"""
try:
resolved_id = resolve_config_id(config_id, db)
except ValueError as e:
raise BusinessException(
message=f"Invalid config_id: {e}",
code=BizCode.INVALID_PARAMETER,
)
config = MemoryConfigRepository.get_by_id(db, resolved_id)
if not config or config.workspace_id != workspace_id:
raise BusinessException(
message="Config not found or access denied",
code=BizCode.MEMORY_CONFIG_NOT_FOUND,
)
@router.get("/configs")
@require_api_key(scopes=["memory"])
async def list_memory_configs(
@@ -37,3 +101,245 @@ async def list_memory_configs(
logger.info(f"Listed {result['total']} configs for workspace: {api_key_auth.workspace_id}")
return success(data=ListConfigsResponse(**result).model_dump(), msg="Configs listed successfully")
@router.get("/read_all_config")
@require_api_key(scopes=["memory"])
async def read_all_config(
request:Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
List all memory configs with full details (enhanced version).
Returns complete config fields for the authorized workspace.
No config_id ownership check needed — results are filtered by workspace.
"""
logger.info(f"V1 get all configs (full) - workspace: {api_key_auth.workspace_id}")
current_user = _get_current_user(api_key_auth, db)
return memory_storage_controller.read_all_config(
current_user=current_user,
db=db,
)
@router.get("/scenes/simple")
@require_api_key(scopes=["memory"])
async def get_ontology_scenes(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Get available ontology scenes for the workspace.
Returns a simple list of scene_id and scene_name for dropdown selection.
Used before creating a memory config to choose which ontology scene to associate.
"""
logger.info(f"V1 get scenes - workspace: {api_key_auth.workspace_id}")
current_user = _get_current_user(api_key_auth, db)
return await ontology_controller.get_scenes_simple(
db=db,
current_user=current_user,
)
@router.get("/read_config_extracted")
@require_api_key(scopes=["memory"])
async def read_config_extracted(
request: Request,
config_id: str = Query(..., description="config_id"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Get extraction engine config details for a specific config.
Only configs belonging to the authorized workspace can be queried.
"""
logger.info(f"V1 read extracted config - config_id: {config_id}, workspace: {api_key_auth.workspace_id}")
_verify_config_ownership(config_id, api_key_auth.workspace_id, db)
current_user = _get_current_user(api_key_auth, db)
return memory_storage_controller.read_config_extracted(
config_id = config_id,
current_user = current_user,
db = db,
)
@router.post("/create_config")
@require_api_key(scopes=["memory"])
async def create_memory_config(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(None, description="Request body"),
x_language_type: Optional[str] = Header(None, alias="X-Language-Type"),
):
"""
Create a new memory config for the workspace.
The config will be associated with the workspace of the API Key.
config_name is required, other fields are optional.
"""
body = await request.json()
payload = ConfigCreateRequest(**body)
logger.info(f"V1 create config - workspace: {api_key_auth.workspace_id}, config_name: {payload.config_name}")
# 构造管理端 Schemaworkspace_id 从 API Key 注入
current_user = _get_current_user(api_key_auth, db)
mgmt_payload = ConfigParamsCreate(
config_name=payload.config_name,
config_desc=payload.config_desc or "",
scene_id=payload.scene_id,
llm_id=payload.llm_id,
embedding_id=payload.embedding_id,
rerank_id=payload.rerank_id,
reflection_model_id=payload.reflection_model_id,
emotion_model_id=payload.emotion_model_id,
)
#将返回数据中UUID序列化处理
result =memory_storage_controller.create_config(
payload=mgmt_payload,
current_user=current_user,
db=db,
x_language_type=x_language_type,
)
return jsonable_encoder(result)
@router.put("/update_config")
@require_api_key(scopes=["memory"])
async def update_memory_config(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(None, description="Request body"),
):
"""
Update memory config basic info (name, description, scene).
Requires API Key with 'memory' scope
Only configs belonging to the authorized workspace can be updated.
"""
body = await request.json()
payload = ConfigUpdateRequest(**body)
logger.info(f"V1 update config - config_id: {payload.config_id}, workspace: {api_key_auth.workspace_id}")
_verify_config_ownership(payload.config_id, api_key_auth.workspace_id, db)
current_user = _get_current_user(api_key_auth, db)
mgmt_payload = ConfigUpdate(
config_id = payload.config_id,
config_name = payload.config_name,
config_desc = payload.config_desc,
scene_id = payload.scene_id,
)
return memory_storage_controller.update_config(
payload = mgmt_payload,
current_user = current_user,
db = db,
)
@router.put("/update_config_extracted")
@require_api_key(scopes=["memory"])
async def update_memory_config_extracted(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(None, description="Request body"),
):
"""
update memory config extraction engine config (models, thresholds, chunking, pruning, etc.).
Requires API Key with 'memory' scope.
Only configs belonging to the authorized workspace can be updated.
"""
body = await request.json()
payload = ConfigUpdateExtractedRequest(**body)
logger.info(f"V1 update extracted config - config_id: {payload.config_id}, workspace: {api_key_auth.workspace_id}")
#校验权限
_verify_config_ownership(payload.config_id, api_key_auth.workspace_id, db)
current_user = _get_current_user(api_key_auth, db)
update_fields = payload.model_dump(exclude_unset=True)
mgmt_payload = ConfigUpdateExtracted(**update_fields)
return memory_storage_controller.update_config_extracted(
payload = mgmt_payload,
current_user = current_user,
db = db,
)
@router.put("/update_config_forgetting")
@require_api_key(scopes=["memory"])
async def update_memory_config_forgetting(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(None, description="Request body"),
):
"""
update memory config forgetting settings (forgetting strategy, parameters, etc.).
Requires API Key with 'memory' scope.
Only configs belonging to the authorized workspace can be updated.
"""
body = await request.json()
payload = ConfigUpdateForgettingRequest(**body)
logger.info(f"V1 update forgetting config - config_id: {payload.config_id}, workspace: {api_key_auth.workspace_id}")
#校验权限
_verify_config_ownership(payload.config_id, api_key_auth.workspace_id, db)
current_user = _get_current_user(api_key_auth, db)
update_fields = payload.model_dump(exclude_unset=True)
mgmt_payload = ForgettingConfigUpdateRequest(**update_fields)
#将返回数据中UUID序列化处理
result = await memory_forget_controller.update_forgetting_config(
payload = mgmt_payload,
current_user = current_user,
db = db,
)
return jsonable_encoder(result)
@router.delete("/delete_config")
@require_api_key(scopes=["memory"])
async def delete_memory_config(
config_id: str,
request: Request,
force: bool = Query(False, description="是否强制删除(即使有终端用户正在使用)"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Delete a memory config.
- Default configs cannot be deleted.
- If end users are connected and force=False, returns a warning.
- If force=True, clears end user references and deletes the config.
Only configs belonging to the authorized workspace can be deleted.
"""
logger.info(f"V1 delete config - config_id: {config_id}, force: {force}, workspace: {api_key_auth.workspace_id}")
_verify_config_ownership(config_id, api_key_auth.workspace_id, db)
current_user = _get_current_user(api_key_auth, db)
return memory_storage_controller.delete_config(
config_id=config_id,
force=force,
current_user=current_user,
db=db,
)

View File

@@ -4,9 +4,10 @@ This module defines Pydantic schemas for the Memory API Service endpoints,
including request validation and response structures for read and write operations.
"""
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Literal, Optional
import uuid
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator
class MemoryWriteRequest(BaseModel):
@@ -231,6 +232,7 @@ class MemoryConfigItem(BaseModel):
created_at: Optional[str] = Field(None, description="Creation timestamp")
updated_at: Optional[str] = Field(None, description="Last update timestamp")
# ========== V1 记忆配置管理接口 Schema ==========
class ListConfigsResponse(BaseModel):
"""Response schema for listing memory configs.
@@ -241,3 +243,149 @@ class ListConfigsResponse(BaseModel):
"""
configs: List[MemoryConfigItem] = Field(default_factory=list, description="List of configs")
total: int = Field(0, description="Total number of configs")
class ConfigCreateRequest(BaseModel):
"""Request schema for creating a new memory config."""
config_name: str = Field(..., description="Configuration name")
config_desc: Optional[str] = Field("", description="Configuration description")
scene_id: uuid.UUID = Field(..., description="Associated ontology scene ID (UUID, required)")
llm_id: Optional[str] = Field(None, description="LLM model configuration ID")
embedding_id: Optional[str] = Field(None, description="Embedding model configuration ID")
rerank_id: Optional[str] = Field(None, description="Reranking model configuration ID")
reflection_model_id: Optional[str] = Field(None, description="Reflection model ID")
emotion_model_id: Optional[str] = Field(None, description="Emotion analysis model ID")
@field_validator("config_name")
@classmethod
def validate_config_name(cls, v: str) -> str:
if not v or not v.strip():
raise ValueError("config_name is required and cannot be empty")
return v.strip()
class ConfigUpdateRequest(BaseModel):
"""Request schema for updating memory config basic info.
Attributes:
config_id: Configuration UUID to update (required)
config_name: New configuration name
config_desc: New configuration description
scene_id: New associated ontology scene ID
"""
config_id: str = Field(..., description="Configuration ID to update")
config_name: Optional[str] = Field(None, description="Configuration name")
config_desc: Optional[str] = Field(None, description="Configuration description")
scene_id: Optional[uuid.UUID] = Field(None, description="Associated ontology scene ID")
@field_validator("config_id")
@classmethod
def validate_config_id(cls, v: str) -> str:
"""Validate that config_id is not empty."""
if not v or not v.strip():
raise ValueError("config_id is required and cannot be empty")
return v.strip()
class ConfigUpdateExtractedRequest(BaseModel):
"""Request schema for updating memory config extracted parameters.
Attributes:
config_id: Configuration UUID to update (required)
llm_id: Optional LLM model configuration ID
audio_id: Optional audio model configuration ID
vision_id: Optional vision model configuration ID
video_id: Optional video model configuration ID
embedding_id: Optional embedding model configuration ID
rerank_id: Optional reranking model configuration ID
enable_llm_dedup_blockwise: Optional toggle for LLM decision deduplication
enable_llm_disambiguation: Optional toggle for LLM decision disambiguation
deep_retrieval: Optional toggle for deep retrieval
t_type_strict: Optional float (0-1) for type strictness threshold
t_name_strict: Optional float (0-1) for name strictness threshold
t_overall: Optional float (0-1) for overall strictness threshold
state: Optional boolean for config active state
chunker_strategy: Optional string for memory chunking strategy
statement_granularity: Optional int (1-3) for statement extraction granularity
include_dialogue_context: Optional boolean for including dialogue context in retrieval
max_context: Optional int for maximum dialogue context length in characters
pruning_enabled: Optional boolean to enable intelligent semantic pruning
pruning_scene: Optional string for semantic pruning scene
pruning_threshold: Optional float (0-0.9) for semantic pruning threshold
enable_self_reflexion: Optional boolean to enable self-reflexion
iteration_period: Optional string for reflexion iteration period in hours (1, 3, 6, 12, 24)
reflexion_range: Optional string for reflexion range (partial or all)
baseline: Optional string for baseline (TIME/FACT/TIME-FACT)
"""
config_id: str = Field(..., description="Configuration ID (UUID)")
llm_id: Optional[str] = Field(None, description="LLM model configuration ID")
audio_id: Optional[str] = Field(None, description="Audio model ID")
vision_id: Optional[str] = Field(None, description="Vision model ID")
video_id: Optional[str] = Field(None, description="Video model ID")
embedding_id: Optional[str] = Field(None, description="Embedding model configuration ID")
rerank_id: Optional[str] = Field(None, description="Reranking model configuration ID")
enable_llm_dedup_blockwise: Optional[bool] = Field(None, description="Enable LLM decision deduplication")
enable_llm_disambiguation: Optional[bool] = Field(None, description="Enable LLM decision disambiguation")
deep_retrieval: Optional[bool] = Field(None, description="Deep retrieval toggle")
t_type_strict: Optional[float] = Field(None, ge=0.0, le=1.0, description="type strictness threshold")
t_name_strict: Optional[float] = Field(None, ge=0.0, le=1.0, description="name strictness threshold")
t_overall: Optional[float] = Field(None, ge=0.0, le=1.0, description="overall strictness threshold")
state: Optional[bool] = Field(None, description="config active state")
# 句子提取
chunker_strategy: Optional[str] = Field(None, description="memory chunking strategy")
statement_granularity: Optional[int] = Field(None, ge=1, le=3, description="statement extraction granularity")
include_dialogue_context: Optional[bool] = Field(None, description="whether to include dialogue context in retrieval")
max_context: Optional[int] = Field(None, gt=100, description="maximum dialogue context length in characters")
# 剪枝配置:与 runtime.json 中 pruning 段对应
pruning_enabled: Optional[bool] = Field(None, description="whether to enable intelligent semantic pruning")
pruning_scene: Optional[str] = Field(None, description="semantic pruning scene")
pruning_threshold: Optional[float] = Field(None, ge=0.0, le=0.9, description="semantic pruning threshold (0-0.9)")
enable_self_reflexion: Optional[bool] = Field(None, description="whether to enable self-reflexion")
iteration_period: Optional[Literal["1", "3", "6", "12", "24"]] = Field(None, description="reflexion iteration period in hours (1, 3, 6, 12, 24)")
reflexion_range: Optional[Literal["partial", "all"]] = Field(None, description="reflexion range: partial/all")
baseline: Optional[Literal["TIME", "FACT", "TIME-FACT"]] = Field(None, description="baseline: TIME/FACT/TIME-FACT")
@field_validator("config_id")
@classmethod
def validate_config_id(cls, v: str) -> str:
if not v or not v.strip():
raise ValueError("config_id is required and cannot be empty")
return v.strip()
class ConfigUpdateForgettingRequest(BaseModel):
"""Request schema for updating memory config forgetting parameters.
Attributes:
config_id: Configuration UUID to update (required)
decay_constant: Decay constant for forgetting
lambda_time: Time decay parameter
lambda_mem: Memory decay parameter
offset: Offset for forgetting curve
max_history_length: Maximum history length to consider for forgetting
forgetting_threshold: Threshold for forgetting
min_days_since_access: Minimum days since last access to trigger forgetting
enable_llm_summary: Whether to use LLM-generated summaries for forgetting
max_merge_batch_size: Maximum batch size for merging nodes during forgetting
forgetting_interval_hours: Interval in hours for periodic forgetting
"""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
config_id: str = Field(..., description="Configuration ID (UUID)")
decay_constant: Optional[float] = Field(None, ge=0.0, le=1.0, description="Decay constant for forgetting")
lambda_time: Optional[float] = Field(None, ge=0.0, le=1.0, description="Time decay parameter")
lambda_mem: Optional[float] = Field(None, ge=0.0, le=1.0, description="Memory decay parameter")
offset: Optional[float] = Field(None, ge=0.0, le=1.0, description="Offset for forgetting curve")
max_history_length: Optional[int] = Field(None, ge=10, le=1000, description="Maximum history length to consider for forgetting")
forgetting_threshold: Optional[float] = Field(None, ge=0.0, le=1.0, description="Forgetting threshold")
min_days_since_access: Optional[int] = Field(None, ge=1, le=365, description="Minimum days since last access to trigger forgetting")
enable_llm_summary: Optional[bool] = Field(None, description="Whether to use LLM-generated summaries for forgetting")
max_merge_batch_size: Optional[int] = Field(None, ge=1, le=1000, description="Maximum batch size for merging nodes during forgetting")
forgetting_interval_hours: Optional[int] = Field(None, ge=1, le=168, description="Interval in hours for periodic forgetting")
@field_validator("config_id")
@classmethod
def validate_config_id(cls, v: str) -> str:
if not v or not v.strip():
raise ValueError("config_id is required and cannot be empty")
return v.strip()