add_remark

This commit is contained in:
lixinyue
2026-03-12 14:17:44 +08:00
parent d660521c5c
commit 2a66775e45
16 changed files with 1274 additions and 462 deletions

View File

@@ -1,3 +1,19 @@
"""
Memory Reflection Controller
This module provides REST API endpoints for managing memory reflection configurations
and operations. It handles reflection engine setup, configuration management, and
execution of self-reflection processes across memory systems.
Key Features:
- Reflection configuration management (save, retrieve, update)
- Workspace-wide reflection execution across multiple applications
- Individual configuration-based reflection runs
- Multi-language support for reflection outputs
- Integration with Neo4j memory storage and LLM models
- Comprehensive error handling and logging
"""
import asyncio
import time
import uuid
@@ -28,9 +44,13 @@ from sqlalchemy.orm import Session
from app.utils.config_utils import resolve_config_id
# Load environment variables for configuration
load_dotenv()
# Initialize API logger for request tracking and debugging
api_logger = get_api_logger()
# Configure router with prefix and tags for API organization
router = APIRouter(
prefix="/memory",
tags=["Memory"],
@@ -43,7 +63,38 @@ async def save_reflection_config(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
"""Save reflection configuration to data_comfig table"""
"""
Save reflection configuration to memory config table
Persists reflection engine configuration settings to the data_config table,
including reflection parameters, model settings, and evaluation criteria.
Validates configuration parameters and ensures data consistency.
Args:
request: Memory reflection configuration data including:
- config_id: Configuration identifier to update
- reflection_enabled: Whether reflection is enabled
- reflection_period_in_hours: Reflection execution interval
- reflexion_range: Scope of reflection (partial/all)
- baseline: Reflection strategy (time/fact/hybrid)
- reflection_model_id: LLM model for reflection operations
- memory_verify: Enable memory verification checks
- quality_assessment: Enable quality assessment evaluation
current_user: Authenticated user saving the configuration
db: Database session for data operations
Returns:
dict: Success response with saved reflection configuration data
Raises:
HTTPException 400: If config_id is missing or parameters are invalid
HTTPException 500: If configuration save operation fails
Database Operations:
- Updates memory_config table with reflection settings
- Commits transaction and refreshes entity
- Maintains configuration consistency
"""
try:
config_id = request.config_id
config_id = resolve_config_id(config_id, db)
@@ -54,6 +105,7 @@ async def save_reflection_config(
)
api_logger.info(f"用户 {current_user.username} 保存反思配置config_id: {config_id}")
# Update reflection configuration in database
memory_config = MemoryConfigRepository.update_reflection_config(
db,
config_id=config_id,
@@ -66,6 +118,7 @@ async def save_reflection_config(
quality_assessment=request.quality_assessment
)
# Commit transaction and refresh entity
db.commit()
db.refresh(memory_config)
@@ -102,13 +155,55 @@ async def start_workspace_reflection(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
"""启动工作空间中所有匹配应用的反思功能"""
"""
Start reflection functionality for all matching applications in workspace
Initiates reflection processes across all applications within the user's current
workspace that have valid memory configurations. Processes each application's
configurations and associated end users, executing reflection operations
with proper error isolation and transaction management.
This endpoint serves as a workspace-wide reflection orchestrator, ensuring
that reflection failures for individual users don't affect other operations.
Args:
current_user: Authenticated user initiating workspace reflection
db: Database session for configuration queries
Returns:
dict: Success response with reflection results for all processed applications:
- app_id: Application identifier
- config_id: Memory configuration identifier
- end_user_id: End user identifier
- reflection_result: Individual reflection operation result
Processing Logic:
1. Retrieve all applications in the current workspace
2. Filter applications with valid memory configurations
3. For each configuration, find matching releases
4. Execute reflection for each end user with isolated transactions
5. Aggregate results with error handling per user
Error Handling:
- Individual user reflection failures are isolated
- Failed operations are logged and included in results
- Database transactions are isolated per user to prevent cascading failures
- Comprehensive error reporting for debugging
Raises:
HTTPException 500: If workspace reflection initialization fails
Performance Notes:
- Uses independent database sessions for each user operation
- Prevents transaction failures from affecting other users
- Comprehensive logging for operation tracking
"""
workspace_id = current_user.current_workspace_id
try:
api_logger.info(f"用户 {current_user.username} 启动workspace反思workspace_id: {workspace_id}")
# 使用独立的数据库会话来获取工作空间应用详情,避免事务失败
# Use independent database session to get workspace app details, avoiding transaction failures
from app.db import get_db_context
with get_db_context() as query_db:
service = WorkspaceAppService(query_db)
@@ -116,8 +211,9 @@ async def start_workspace_reflection(
reflection_results = []
# Process each application in the workspace
for data in result['apps_detailed_info']:
# 跳过没有配置的应用
# Skip applications without configurations
if not data['memory_configs']:
api_logger.debug(f"应用 {data['id']} 没有memory_configs跳过")
continue
@@ -126,22 +222,22 @@ async def start_workspace_reflection(
memory_configs = data['memory_configs']
end_users = data['end_users']
# 为每个配置和用户组合执行反思
# Execute reflection for each configuration and user combination
for config in memory_configs:
config_id_str = str(config['config_id'])
# 找到匹配此配置的所有release
# Find all releases matching this configuration
matching_releases = [r for r in releases if str(r['config']) == config_id_str]
if not matching_releases:
api_logger.debug(f"配置 {config_id_str} 没有匹配的release")
continue
# 为每个用户执行反思 - 使用独立的数据库会话
# Execute reflection for each user - using independent database sessions
for user in end_users:
api_logger.info(f"为用户 {user['id']} 启动反思config_id: {config_id_str}")
# 为每个用户创建独立的数据库会话,避免事务失败影响其他用户
# Create independent database session for each user to avoid transaction failure impact
with get_db_context() as user_db:
try:
reflection_service = MemoryReflectionService(user_db)
@@ -184,14 +280,51 @@ async def start_reflection_configs(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
"""通过config_id查询memory_config表中的反思配置信息"""
"""
Query reflection configuration information by config_id
Retrieves detailed reflection configuration settings from the memory_config
table for a specific configuration ID. Provides comprehensive reflection
parameters including model settings, evaluation criteria, and operational flags.
Args:
config_id: Configuration identifier (UUID or integer) to query
current_user: Authenticated user making the request
db: Database session for data operations
Returns:
dict: Success response with detailed reflection configuration:
- config_id: Resolved configuration identifier
- reflection_enabled: Whether reflection is enabled for this config
- reflection_period_in_hours: Reflection execution interval
- reflexion_range: Scope of reflection operations (partial/all)
- baseline: Reflection strategy (time/fact/hybrid)
- reflection_model_id: LLM model identifier for reflection
- memory_verify: Memory verification flag
- quality_assessment: Quality assessment flag
Database Operations:
- Queries memory_config table by resolved config_id
- Retrieves all reflection-related configuration fields
- Resolves configuration ID for consistent formatting
Raises:
HTTPException 404: If configuration with specified ID is not found
HTTPException 500: If configuration query operation fails
ID Resolution:
- Supports both UUID and integer config_id formats
- Automatically resolves to appropriate internal format
- Maintains consistency across different ID representations
"""
config_id = resolve_config_id(config_id, db)
try:
config_id=resolve_config_id(config_id,db)
api_logger.info(f"用户 {current_user.username} 查询反思配置config_id: {config_id}")
result = MemoryConfigRepository.query_reflection_config_by_id(db, config_id)
memory_config_id = resolve_config_id(result.config_id, db)
# 构建返回数据
# Build response data with comprehensive configuration details
reflection_config = {
"config_id": memory_config_id,
"reflection_enabled": result.enable_self_reflexion,
@@ -204,10 +337,12 @@ async def start_reflection_configs(
}
api_logger.info(f"成功查询反思配置config_id: {config_id}")
return success(data=reflection_config, msg="反思配置查询成功")
api_logger.info(f"Successfully queried reflection config, config_id: {config_id}")
return success(data=reflection_config, msg="Reflection configuration query successful")
except HTTPException:
# 重新抛出HTTP异常
# Re-raise HTTP exceptions without modification
raise
except Exception as e:
api_logger.error(f"查询反思配置失败: {str(e)}")
@@ -223,13 +358,66 @@ async def reflection_run(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
"""Activate the reflection function for all matching applications in the workspace"""
# 使用集中化的语言校验
"""
Execute reflection engine with specified configuration
Runs the reflection engine using configuration parameters from the database.
Validates model availability, sets up the reflection engine with proper
configuration, and executes the reflection process with multi-language support.
This endpoint provides a test run capability for reflection configurations,
allowing users to validate their reflection settings and see results before
deploying to production environments.
Args:
config_id: Configuration identifier (UUID or integer) for reflection settings
language_type: Language preference header for output localization (optional)
current_user: Authenticated user executing the reflection
db: Database session for configuration queries
Returns:
dict: Success response with reflection execution results including:
- baseline: Reflection strategy used
- source_data: Input data processed
- memory_verifies: Memory verification results (if enabled)
- quality_assessments: Quality assessment results (if enabled)
- reflexion_data: Generated reflection insights and solutions
Configuration Validation:
- Verifies configuration exists in database
- Validates LLM model availability
- Falls back to default model if specified model is unavailable
- Ensures all required parameters are properly set
Reflection Engine Setup:
- Creates ReflectionConfig with database parameters
- Initializes Neo4j connector for memory access
- Sets up ReflectionEngine with validated model
- Configures language preferences for output
Error Handling:
- Model validation with fallback to default
- Configuration validation and error reporting
- Comprehensive logging for debugging
- Graceful handling of missing configurations
Raises:
HTTPException 404: If configuration is not found
HTTPException 500: If reflection execution fails
Performance Notes:
- Direct database query for configuration retrieval
- Model validation to prevent runtime failures
- Efficient reflection engine initialization
- Language-aware output processing
"""
# Use centralized language validation for consistent localization
language = get_language_from_header(language_type)
api_logger.info(f"用户 {current_user.username} 查询反思配置config_id: {config_id}")
config_id = resolve_config_id(config_id, db)
# 使用MemoryConfigRepository查询反思配置
# Query reflection configuration using MemoryConfigRepository
result = MemoryConfigRepository.query_reflection_config_by_id(db, config_id)
if not result:
raise HTTPException(
@@ -239,7 +427,7 @@ async def reflection_run(
api_logger.info(f"成功查询反思配置config_id: {config_id}")
# 验证模型ID是否存在
# Validate model ID existence
model_id = result.reflection_model_id
if model_id:
try:
@@ -250,6 +438,7 @@ async def reflection_run(
# 可以设置为None让反思引擎使用默认模型
model_id = None
# Create reflection configuration with database parameters
config = ReflectionConfig(
enabled=result.enable_self_reflexion,
iteration_period=result.iteration_period,
@@ -262,11 +451,13 @@ async def reflection_run(
model_id=model_id,
language_type=language_type
)
# Initialize Neo4j connector and reflection engine
connector = Neo4jConnector()
engine = ReflectionEngine(
config=config,
neo4j_connector=connector,
llm_client=model_id # 传入验证后的 model_id
llm_client=model_id # Pass validated model_id
)
result=await (engine.reflection_run())