diff --git a/api/app/controllers/app_controller.py b/api/app/controllers/app_controller.py index f55ea5b5..43f177ef 100644 --- a/api/app/controllers/app_controller.py +++ b/api/app/controllers/app_controller.py @@ -11,15 +11,16 @@ from app.core.response_utils import success from app.db import get_db from app.dependencies import get_current_user, cur_workspace_access_guard from app.models import User -from app.models.app_model import AppType, App +from app.models.app_model import AppType from app.repositories import knowledge_repository +from app.repositories.end_user_repository import EndUserRepository from app.schemas import app_schema from app.schemas.response_schema import PageData, PageMeta +from app.schemas.workflow_schema import WorkflowConfig as WorkflowConfigSchema from app.schemas.workflow_schema import WorkflowConfigUpdate from app.services import app_service, workspace_service from app.services.agent_config_helper import enrich_agent_config from app.services.app_service import AppService -from app.schemas.workflow_schema import WorkflowConfig as WorkflowConfigSchema from app.services.workflow_service import WorkflowService, get_workflow_service router = APIRouter(prefix="/apps", tags=["Apps"]) @@ -405,6 +406,15 @@ async def draft_run( # 只读操作,允许访问共享应用 service._validate_app_accessible(app, workspace_id) + if payload.user_id is None: + end_user_repo = EndUserRepository(db) + new_end_user = end_user_repo.get_or_create_end_user( + app_id=app_id, + other_id=str(current_user.id), + original_user_id=str(current_user.id) # Save original user_id to other_id + ) + payload.user_id = str(new_end_user.id) + # 处理会话ID(创建或验证) conversation_id = await draft_service._ensure_conversation( conversation_id=payload.conversation_id, diff --git a/api/app/controllers/emotion_controller.py b/api/app/controllers/emotion_controller.py index 7f0cb91b..24bdc434 100644 --- a/api/app/controllers/emotion_controller.py +++ b/api/app/controllers/emotion_controller.py @@ -18,6 +18,7 @@ from app.models.user_model import User from app.schemas.emotion_schema import ( EmotionHealthRequest, EmotionSuggestionsRequest, + EmotionGenerateSuggestionsRequest, EmotionTagsRequest, EmotionWordcloudRequest, ) @@ -58,7 +59,7 @@ async def get_emotion_tags( "limit": request.limit } ) - + # 调用服务层 data = await emotion_service.get_emotion_tags( end_user_id=request.group_id, @@ -67,7 +68,7 @@ async def get_emotion_tags( end_date=request.end_date, limit=request.limit ) - + api_logger.info( "情绪标签统计获取成功", extra={ @@ -76,9 +77,9 @@ async def get_emotion_tags( "tags_count": len(data.get("tags", [])) } ) - + return success(data=data, msg="情绪标签获取成功") - + except Exception as e: api_logger.error( f"获取情绪标签统计失败: {str(e)}", @@ -107,14 +108,14 @@ async def get_emotion_wordcloud( "limit": request.limit } ) - + # 调用服务层 data = await emotion_service.get_emotion_wordcloud( end_user_id=request.group_id, emotion_type=request.emotion_type, limit=request.limit ) - + api_logger.info( "情绪词云数据获取成功", extra={ @@ -122,9 +123,9 @@ async def get_emotion_wordcloud( "total_keywords": data.get("total_keywords", 0) } ) - + return success(data=data, msg="情绪词云获取成功") - + except Exception as e: api_logger.error( f"获取情绪词云数据失败: {str(e)}", @@ -151,7 +152,7 @@ async def get_emotion_health( status_code=status.HTTP_400_BAD_REQUEST, detail="时间范围参数无效,必须是 7d、30d 或 90d" ) - + api_logger.info( f"用户 {current_user.username} 请求获取情绪健康指数", extra={ @@ -159,13 +160,13 @@ async def get_emotion_health( "time_range": request.time_range } ) - + # 调用服务层 data = await emotion_service.calculate_emotion_health_index( end_user_id=request.group_id, time_range=request.time_range ) - + api_logger.info( "情绪健康指数获取成功", extra={ @@ -174,9 +175,9 @@ async def get_emotion_health( "level": data.get("level", "未知") } ) - + return success(data=data, msg="情绪健康指数获取成功") - + except HTTPException: raise except Exception as e: @@ -198,15 +199,80 @@ async def get_emotion_suggestions( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): - """获取个性化情绪建议 - + """获取个性化情绪建议(从缓存读取) + Args: request: 包含 group_id 和可选的 config_id db: 数据库会话 current_user: 当前用户 - + Returns: - 个性化情绪建议响应 + 缓存的个性化情绪建议响应 + """ + try: + api_logger.info( + f"用户 {current_user.username} 请求获取个性化情绪建议(缓存)", + extra={ + "group_id": request.group_id, + "config_id": request.config_id + } + ) + + # 从缓存获取建议 + data = await emotion_service.get_cached_suggestions( + end_user_id=request.group_id, + db=db + ) + + if data is None: + # 缓存不存在或已过期 + api_logger.info( + f"用户 {request.group_id} 的建议缓存不存在或已过期", + extra={"group_id": request.group_id} + ) + return fail( + BizCode.RESOURCE_NOT_FOUND, + "建议缓存不存在或已过期,请调用 /generate_suggestions 接口生成新建议", + None + ) + + api_logger.info( + "个性化建议获取成功(缓存)", + extra={ + "group_id": request.group_id, + "suggestions_count": len(data.get("suggestions", [])) + } + ) + + return success(data=data, msg="个性化建议获取成功(缓存)") + + except Exception as e: + api_logger.error( + f"获取个性化建议失败: {str(e)}", + extra={"group_id": request.group_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取个性化建议失败: {str(e)}" + ) + + +@router.post("/generate_suggestions", response_model=ApiResponse) +async def generate_emotion_suggestions( + request: EmotionGenerateSuggestionsRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """生成个性化情绪建议(调用LLM并缓存) + + Args: + request: 包含 group_id、可选的 config_id 和 force_refresh + db: 数据库会话 + current_user: 当前用户 + + Returns: + 新生成的个性化情绪建议响应 """ try: # 验证 config_id(如果提供) @@ -232,38 +298,46 @@ async def get_emotion_suggestions( return fail(BizCode.INVALID_PARAMETER, "配置ID无效", f"配置 {config_id} 不存在") except Exception as e: return fail(BizCode.INVALID_PARAMETER, "配置ID验证失败", str(e)) - + api_logger.info( - f"用户 {current_user.username} 请求获取个性化情绪建议", + f"用户 {current_user.username} 请求生成个性化情绪建议", extra={ "group_id": request.group_id, "config_id": config_id } ) - - # 调用服务层 + + # 调用服务层生成建议 data = await emotion_service.generate_emotion_suggestions( end_user_id=request.group_id, db=db ) - + + # 保存到缓存 + await emotion_service.save_suggestions_cache( + end_user_id=request.group_id, + suggestions_data=data, + db=db, + expires_hours=24 + ) + api_logger.info( - "个性化建议获取成功", + "个性化建议生成成功", extra={ "group_id": request.group_id, "suggestions_count": len(data.get("suggestions", [])) } ) - - return success(data=data, msg="个性化建议获取成功") - + + return success(data=data, msg="个性化建议生成成功") + except Exception as e: api_logger.error( - f"获取个性化建议失败: {str(e)}", + f"生成个性化建议失败: {str(e)}", extra={"group_id": request.group_id}, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"获取个性化建议失败: {str(e)}" + detail=f"生成个性化建议失败: {str(e)}" ) diff --git a/api/app/controllers/home_page_controller.py b/api/app/controllers/home_page_controller.py index 0b758cd1..de4a78a3 100644 --- a/api/app/controllers/home_page_controller.py +++ b/api/app/controllers/home_page_controller.py @@ -33,5 +33,12 @@ def get_workspace_list( def get_system_version(): """获取系统版本号+说明""" current_version = settings.SYSTEM_VERSION - version_introduction = HomePageService.load_version_introduction(current_version) - return success(data={"version": current_version, "introduction": version_introduction}, msg="系统版本获取成功") \ No newline at end of file + version_info = HomePageService.load_version_introduction(current_version) + return success( + data={ + "version": current_version, + "introduction": version_info.get("introduction"), + "introduction_en": version_info.get("introduction_en") + }, + msg="系统版本获取成功" + ) \ No newline at end of file diff --git a/api/app/controllers/implicit_memory_controller.py b/api/app/controllers/implicit_memory_controller.py index 6ef39929..eb7037ff 100644 --- a/api/app/controllers/implicit_memory_controller.py +++ b/api/app/controllers/implicit_memory_controller.py @@ -11,6 +11,7 @@ from app.dependencies import ( ) from app.models.user_model import User from app.schemas.response_schema import ApiResponse +from app.schemas.implicit_memory_schema import GenerateProfileRequest from app.services.implicit_memory_service import ImplicitMemoryService from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session @@ -133,7 +134,7 @@ async def get_preference_tags( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user preference tags with filtering options. + Get user preference tags from cache. Args: user_id: Target user ID @@ -143,35 +144,56 @@ async def get_preference_tags( end_date: Optional end date filter Returns: - List of preference tags matching the filters + List of preference tags from cache """ - api_logger.info(f"Preference tags requested for user: {user_id}") + api_logger.info(f"Preference tags requested for user: {user_id} (from cache)") try: # Validate inputs validate_user_id(user_id) - validate_confidence_threshold(confidence_threshold) - validate_date_range(start_date, end_date) # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - # Build date range - date_range = None - if start_date and end_date: - from app.schemas.implicit_memory_schema import DateRange - date_range = DateRange(start_date=start_date, end_date=end_date) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - # Get preference tags - tags = await service.get_preference_tags( - user_id=user_id, - confidence_threshold=confidence_threshold, - tag_category=tag_category, - date_range=date_range - ) + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.RESOURCE_NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + None + ) - api_logger.info(f"Retrieved {len(tags)} preference tags for user: {user_id}") - return success(data=[tag.model_dump(mode='json') for tag in tags], msg="偏好标签获取成功") + # Extract preferences from cache + preferences = cached_profile.get("preferences", []) + + # Apply filters (client-side filtering on cached data) + filtered_preferences = [] + for pref in preferences: + # Filter by confidence threshold + if confidence_threshold is not None and pref.get("confidence_score", 0) < confidence_threshold: + continue + + # Filter by category if specified + if tag_category and pref.get("category") != tag_category: + continue + + # Filter by date range if specified + if start_date or end_date: + created_at_ts = pref.get("created_at") + if created_at_ts: + created_at = datetime.fromtimestamp(created_at_ts / 1000) + if start_date and created_at < start_date: + continue + if end_date and created_at > end_date: + continue + + filtered_preferences.append(pref) + + api_logger.info(f"Retrieved {len(filtered_preferences)} preference tags for user: {user_id} (from cache)") + return success(data=filtered_preferences, msg="偏好标签获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "偏好标签获取", user_id) @@ -186,16 +208,16 @@ async def get_dimension_portrait( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user's four-dimension personality portrait. + Get user's four-dimension personality portrait from cache. Args: user_id: Target user ID - include_history: Whether to include historical trend data + include_history: Whether to include historical trend data (ignored for cached data) Returns: - Four-dimension personality portrait with scores and evidence + Four-dimension personality portrait from cache """ - api_logger.info(f"Dimension portrait requested for user: {user_id}") + api_logger.info(f"Dimension portrait requested for user: {user_id} (from cache)") try: # Validate inputs @@ -204,13 +226,22 @@ async def get_dimension_portrait( # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - portrait = await service.get_dimension_portrait( - user_id=user_id, - include_history=include_history - ) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - api_logger.info(f"Dimension portrait retrieved for user: {user_id}") - return success(data=portrait.model_dump(mode='json'), msg="四维画像获取成功") + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.RESOURCE_NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + None + ) + + # Extract portrait from cache + portrait = cached_profile.get("portrait", {}) + + api_logger.info(f"Dimension portrait retrieved for user: {user_id} (from cache)") + return success(data=portrait, msg="四维画像获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "四维画像获取", user_id) @@ -225,16 +256,16 @@ async def get_interest_area_distribution( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user's interest area distribution across four areas. + Get user's interest area distribution from cache. Args: user_id: Target user ID - include_trends: Whether to include trend analysis data + include_trends: Whether to include trend analysis data (ignored for cached data) Returns: - Interest area distribution with percentages and evidence + Interest area distribution from cache """ - api_logger.info(f"Interest area distribution requested for user: {user_id}") + api_logger.info(f"Interest area distribution requested for user: {user_id} (from cache)") try: # Validate inputs @@ -243,13 +274,22 @@ async def get_interest_area_distribution( # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - distribution = await service.get_interest_area_distribution( - user_id=user_id, - include_trends=include_trends - ) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - api_logger.info(f"Interest area distribution retrieved for user: {user_id}") - return success(data=distribution.model_dump(mode='json'), msg="兴趣领域分布获取成功") + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.RESOURCE_NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + None + ) + + # Extract interest areas from cache + interest_areas = cached_profile.get("interest_areas", {}) + + api_logger.info(f"Interest area distribution retrieved for user: {user_id} (from cache)") + return success(data=interest_areas, msg="兴趣领域分布获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "兴趣领域分布获取", user_id) @@ -266,7 +306,7 @@ async def get_behavior_habits( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user's behavioral habits with filtering options. + Get user's behavioral habits from cache. Args: user_id: Target user ID @@ -275,38 +315,117 @@ async def get_behavior_habits( time_period: Filter by time period (current, past) Returns: - List of behavioral habits matching the filters + List of behavioral habits from cache """ - api_logger.info(f"Behavior habits requested for user: {user_id}") + api_logger.info(f"Behavior habits requested for user: {user_id} (from cache)") try: # Validate inputs validate_user_id(user_id) - # Convert string confidence level to numerical - numerical_confidence = None - if confidence_level: - confidence_mapping = { - "high": 85, - "medium": 50, - "low": 20 - } - numerical_confidence = confidence_mapping.get(confidence_level.lower()) - # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - habits = await service.get_behavior_habits( - user_id=user_id, - confidence_level=numerical_confidence, - frequency_pattern=frequency_pattern, - time_period=time_period - ) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - api_logger.info(f"Retrieved {len(habits)} behavior habits for user: {user_id}") - return success(data=[habit.model_dump(mode='json') for habit in habits], msg="行为习惯获取成功") + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.RESOURCE_NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + None + ) + + # Extract habits from cache + habits = cached_profile.get("habits", []) + + # Apply filters (client-side filtering on cached data) + filtered_habits = [] + for habit in habits: + # Filter by confidence level + if confidence_level: + confidence_mapping = { + "high": 85, + "medium": 50, + "low": 20 + } + numerical_confidence = confidence_mapping.get(confidence_level.lower()) + if habit.get("confidence_level", 0) < numerical_confidence: + continue + + # Filter by frequency pattern + if frequency_pattern and habit.get("frequency_pattern") != frequency_pattern: + continue + + # Filter by time period + if time_period: + is_current = habit.get("is_current", True) + if time_period.lower() == "current" and not is_current: + continue + elif time_period.lower() == "past" and is_current: + continue + + filtered_habits.append(habit) + + api_logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user: {user_id} (from cache)") + return success(data=filtered_habits, msg="行为习惯获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "行为习惯获取", user_id) + + + +@router.post("/generate_profile", response_model=ApiResponse) +@cur_workspace_access_guard() +async def generate_implicit_memory_profile( + request: GenerateProfileRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +) -> ApiResponse: + """ + Generate complete user profile (all 4 modules) and cache it. + + Args: + request: Generate profile request with end_user_id + db: Database session + current_user: Current authenticated user + + Returns: + Complete user profile with all modules + """ + end_user_id = request.end_user_id + api_logger.info(f"Generate profile requested for user: {end_user_id}") + + try: + # Validate inputs + validate_user_id(end_user_id) + + # Create service with user-specific config + service = ImplicitMemoryService(db=db, end_user_id=end_user_id) + + # Generate complete profile (calls LLM for all 4 modules) + api_logger.info(f"开始生成完整用户画像: user={end_user_id}") + profile_data = await service.generate_complete_profile(user_id=end_user_id) + + # Save to cache + await service.save_profile_cache( + end_user_id=end_user_id, + profile_data=profile_data, + db=db, + expires_hours=168 # 7 days + ) + + api_logger.info(f"用户画像生成并缓存成功: user={end_user_id}") + + # Add metadata + profile_data["end_user_id"] = end_user_id + profile_data["cached"] = False + + return success(data=profile_data, msg="用户画像生成成功") + + except Exception as e: + api_logger.error(f"生成用户画像失败: user={end_user_id}, error={str(e)}", exc_info=True) + return handle_implicit_memory_error(e, "用户画像生成", end_user_id) diff --git a/api/app/controllers/multi_agent_controller.py b/api/app/controllers/multi_agent_controller.py index 55614dea..dbcc2536 100644 --- a/api/app/controllers/multi_agent_controller.py +++ b/api/app/controllers/multi_agent_controller.py @@ -74,7 +74,7 @@ def get_multi_agent_configs( "app_id": str(app_id), "default_model_config_id": None, "model_parameters": None, - "orchestration_mode": "conditional", + "orchestration_mode": "supervisor", "sub_agents": [], "routing_rules": [], "execution_config": { diff --git a/api/app/controllers/public_share_controller.py b/api/app/controllers/public_share_controller.py index 354a58ef..04da05df 100644 --- a/api/app/controllers/public_share_controller.py +++ b/api/app/controllers/public_share_controller.py @@ -466,7 +466,7 @@ async def chat( conversation_id=conversation.id, # 使用已创建的会话 ID user_id=str(new_end_user.id), # 转换为字符串 variables=payload.variables, - config= payload.agent_config, + config=agent_config, web_search=payload.web_search, memory=payload.memory, storage_type=storage_type, @@ -565,11 +565,12 @@ async def chat( config = workflow_config_4_app_release(release) if payload.stream: async def event_generator(): + async for event in app_chat_service.workflow_chat_stream( message=payload.message, conversation_id=conversation.id, # 使用已创建的会话 ID - user_id=new_end_user.id, # 转换为字符串 + user_id=end_user_id, # 转换为字符串 variables=payload.variables, config=config, web_search=payload.web_search, @@ -601,7 +602,7 @@ async def chat( message=payload.message, conversation_id=conversation.id, # 使用已创建的会话 ID - user_id=new_end_user.id, # 转换为字符串 + user_id=end_user_id, # 转换为字符串 variables=payload.variables, config=config, web_search=payload.web_search, diff --git a/api/app/core/error_codes.py b/api/app/core/error_codes.py index 23023ca4..cb0084b7 100644 --- a/api/app/core/error_codes.py +++ b/api/app/core/error_codes.py @@ -110,24 +110,24 @@ HTTP_MAPPING = { BizCode.TOKEN_EXPIRED: 401, BizCode.TOKEN_BLACKLISTED: 401, BizCode.FORBIDDEN: 403, - BizCode.TENANT_NOT_FOUND: 404, + BizCode.TENANT_NOT_FOUND: 400, BizCode.WORKSPACE_NO_ACCESS: 403, - BizCode.NOT_FOUND: 404, + BizCode.NOT_FOUND: 400, BizCode.USER_NOT_FOUND: 200, - BizCode.WORKSPACE_NOT_FOUND: 404, - BizCode.MODEL_NOT_FOUND: 404, - BizCode.KNOWLEDGE_NOT_FOUND: 404, - BizCode.DOCUMENT_NOT_FOUND: 404, - BizCode.FILE_NOT_FOUND: 404, - BizCode.APP_NOT_FOUND: 404, - BizCode.RELEASE_NOT_FOUND: 404, + BizCode.WORKSPACE_NOT_FOUND: 400, + BizCode.MODEL_NOT_FOUND: 400, + BizCode.KNOWLEDGE_NOT_FOUND: 400, + BizCode.DOCUMENT_NOT_FOUND: 400, + BizCode.FILE_NOT_FOUND: 400, + BizCode.APP_NOT_FOUND: 400, + BizCode.RELEASE_NOT_FOUND: 400, BizCode.DUPLICATE_NAME: 409, BizCode.RESOURCE_ALREADY_EXISTS: 409, BizCode.VERSION_ALREADY_EXISTS: 409, BizCode.STATE_CONFLICT: 409, BizCode.PUBLISH_FAILED: 500, BizCode.NO_DRAFT_TO_PUBLISH: 400, - BizCode.ROLLBACK_TARGET_NOT_FOUND: 404, + BizCode.ROLLBACK_TARGET_NOT_FOUND: 400, BizCode.APP_TYPE_NOT_SUPPORTED: 400, BizCode.AGENT_CONFIG_MISSING: 400, BizCode.SHARE_DISABLED: 403, diff --git a/api/app/core/tools/mcp/client.py b/api/app/core/tools/mcp/client.py index e513a147..c082b314 100644 --- a/api/app/core/tools/mcp/client.py +++ b/api/app/core/tools/mcp/client.py @@ -96,10 +96,7 @@ class SimpleMCPClient: """初始化 SSE MCP 会话 - 参考 Dify 实现""" try: # 建立 SSE 连接 - response = await self._session.get( - self.server_url, - headers={"Accept": "text/event-stream"} - ) + response = await self._session.get(self.server_url) if response.status != 200: error_text = await response.text() diff --git a/api/app/models/__init__.py b/api/app/models/__init__.py index 189876a5..f45991cd 100644 --- a/api/app/models/__init__.py +++ b/api/app/models/__init__.py @@ -27,6 +27,8 @@ from .tool_model import ( ToolExecution, ToolType, ToolStatus, AuthType, ExecutionStatus ) from .memory_perceptual_model import MemoryPerceptualModel +from .emotion_suggestions_cache_model import EmotionSuggestionsCache +from .implicit_memory_cache_model import ImplicitMemoryCache __all__ = [ "Tenants", @@ -76,5 +78,7 @@ __all__ = [ "ToolStatus", "AuthType", "ExecutionStatus", - "MemoryPerceptualModel" + "MemoryPerceptualModel", + "EmotionSuggestionsCache", + "ImplicitMemoryCache" ] diff --git a/api/app/models/emotion_suggestions_cache_model.py b/api/app/models/emotion_suggestions_cache_model.py new file mode 100644 index 00000000..9b32f424 --- /dev/null +++ b/api/app/models/emotion_suggestions_cache_model.py @@ -0,0 +1,24 @@ +"""情绪建议缓存模型""" + +import uuid +import datetime +from sqlalchemy import Column, String, Text, Integer, DateTime, JSON +from sqlalchemy.dialects.postgresql import UUID +from app.db import Base + + +class EmotionSuggestionsCache(Base): + """情绪建议缓存表 + + 用于缓存个性化情绪建议,减少 LLM 调用成本,提升响应速度。 + """ + __tablename__ = "emotion_suggestions_cache" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True) + end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID(组ID)") + health_summary = Column(Text, nullable=False, comment="健康状态摘要") + suggestions = Column(JSON, nullable=False, comment="建议列表(JSON格式)") + generated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, comment="生成时间") + expires_at = Column(DateTime, nullable=True, comment="过期时间") + created_at = Column(DateTime, default=datetime.datetime.now) + updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now) diff --git a/api/app/models/implicit_memory_cache_model.py b/api/app/models/implicit_memory_cache_model.py new file mode 100644 index 00000000..32defbab --- /dev/null +++ b/api/app/models/implicit_memory_cache_model.py @@ -0,0 +1,27 @@ +"""隐性记忆缓存模型""" + +import uuid +import datetime +from sqlalchemy import Column, String, Integer, DateTime, JSON +from sqlalchemy.dialects.postgresql import UUID +from app.db import Base + + +class ImplicitMemoryCache(Base): + """隐性记忆缓存表 + + 用于缓存用户的完整隐性记忆画像,包括偏好标签、四维画像、兴趣领域和行为习惯。 + 减少 LLM 调用成本,提升响应速度。 + """ + __tablename__ = "implicit_memory_cache" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True) + end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID") + preferences = Column(JSON, nullable=False, comment="偏好标签列表(JSON格式)") + portrait = Column(JSON, nullable=False, comment="四维画像对象(JSON格式)") + interest_areas = Column(JSON, nullable=False, comment="兴趣领域分布对象(JSON格式)") + habits = Column(JSON, nullable=False, comment="行为习惯列表(JSON格式)") + generated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, comment="生成时间") + expires_at = Column(DateTime, nullable=True, comment="过期时间") + created_at = Column(DateTime, default=datetime.datetime.now) + updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now) diff --git a/api/app/repositories/data_config_repository.py b/api/app/repositories/data_config_repository.py index 7843acc2..135c0063 100644 --- a/api/app/repositories/data_config_repository.py +++ b/api/app/repositories/data_config_repository.py @@ -81,7 +81,7 @@ class DataConfigRepository: n.description AS description, n.entity_type AS entity_type, n.name AS name, - n.fact_summary AS fact_summary, + COALESCE(n.fact_summary, '') AS fact_summary, n.group_id AS group_id, n.apply_id AS apply_id, n.user_id AS user_id, @@ -115,7 +115,7 @@ class DataConfigRepository: description: n.description, entity_type: n.entity_type, name: n.name, - fact_summary: n.fact_summary, + fact_summary: COALESCE(n.fact_summary, ''), id: n.id } AS sourceNode, { @@ -132,7 +132,7 @@ class DataConfigRepository: description: m.description, entity_type: m.entity_type, name: m.name, - fact_summary: m.fact_summary, + fact_summary: COALESCE(m.fact_summary, ''), id: m.id } AS targetNode """ diff --git a/api/app/repositories/emotion_suggestions_cache_repository.py b/api/app/repositories/emotion_suggestions_cache_repository.py new file mode 100644 index 00000000..1c0430d5 --- /dev/null +++ b/api/app/repositories/emotion_suggestions_cache_repository.py @@ -0,0 +1,163 @@ +"""情绪建议缓存仓储层""" + +from sqlalchemy.orm import Session +from typing import Optional, Dict, Any +import datetime + +from app.models.emotion_suggestions_cache_model import EmotionSuggestionsCache +from app.core.logging_config import get_db_logger + +# 获取数据库专用日志器 +db_logger = get_db_logger() + + +class EmotionSuggestionsCacheRepository: + """情绪建议缓存仓储类""" + + def __init__(self, db: Session): + self.db = db + + def get_by_end_user_id(self, end_user_id: str) -> Optional[EmotionSuggestionsCache]: + """根据终端用户ID获取缓存 + + Args: + end_user_id: 终端用户ID(组ID) + + Returns: + 缓存记录,如果不存在返回 None + """ + try: + cache = ( + self.db.query(EmotionSuggestionsCache) + .filter(EmotionSuggestionsCache.end_user_id == end_user_id) + .first() + ) + if cache: + db_logger.info(f"成功获取用户 {end_user_id} 的情绪建议缓存") + else: + db_logger.info(f"用户 {end_user_id} 的情绪建议缓存不存在") + return cache + except Exception as e: + db_logger.error(f"获取用户 {end_user_id} 的情绪建议缓存失败: {str(e)}") + raise + + def create_or_update( + self, + end_user_id: str, + health_summary: str, + suggestions: list, + expires_hours: int = 24 + ) -> EmotionSuggestionsCache: + """创建或更新缓存 + + Args: + end_user_id: 终端用户ID(组ID) + health_summary: 健康状态摘要 + suggestions: 建议列表 + expires_hours: 过期时间(小时),默认24小时 + + Returns: + 缓存记录 + """ + try: + # 查找现有记录 + cache = self.get_by_end_user_id(end_user_id) + + now = datetime.datetime.now() + expires_at = now + datetime.timedelta(hours=expires_hours) + + if cache: + # 更新现有记录 + cache.health_summary = health_summary + cache.suggestions = suggestions + cache.generated_at = now + cache.expires_at = expires_at + cache.updated_at = now + db_logger.info(f"更新用户 {end_user_id} 的情绪建议缓存") + else: + # 创建新记录 + cache = EmotionSuggestionsCache( + end_user_id=end_user_id, + health_summary=health_summary, + suggestions=suggestions, + generated_at=now, + expires_at=expires_at, + created_at=now, + updated_at=now + ) + self.db.add(cache) + db_logger.info(f"创建用户 {end_user_id} 的情绪建议缓存") + + self.db.commit() + self.db.refresh(cache) + return cache + except Exception as e: + self.db.rollback() + db_logger.error(f"创建或更新用户 {end_user_id} 的情绪建议缓存失败: {str(e)}") + raise + + def delete_by_end_user_id(self, end_user_id: str) -> bool: + """删除缓存 + + Args: + end_user_id: 终端用户ID(组ID) + + Returns: + 是否删除成功 + """ + try: + cache = self.get_by_end_user_id(end_user_id) + if cache: + self.db.delete(cache) + self.db.commit() + db_logger.info(f"删除用户 {end_user_id} 的情绪建议缓存") + return True + return False + except Exception as e: + self.db.rollback() + db_logger.error(f"删除用户 {end_user_id} 的情绪建议缓存失败: {str(e)}") + raise + + @staticmethod + def is_expired(cache: EmotionSuggestionsCache) -> bool: + """检查缓存是否过期 + + Args: + cache: 缓存记录 + + Returns: + 是否过期 + """ + if cache.expires_at is None: + return False + return datetime.datetime.now() > cache.expires_at + + +# 便捷函数 +def get_cache_by_end_user_id(db: Session, end_user_id: str) -> Optional[EmotionSuggestionsCache]: + """根据终端用户ID获取缓存""" + repo = EmotionSuggestionsCacheRepository(db) + return repo.get_by_end_user_id(end_user_id) + + +def create_or_update_cache( + db: Session, + end_user_id: str, + health_summary: str, + suggestions: list, + expires_hours: int = 24 +) -> EmotionSuggestionsCache: + """创建或更新缓存""" + repo = EmotionSuggestionsCacheRepository(db) + return repo.create_or_update(end_user_id, health_summary, suggestions, expires_hours) + + +def delete_cache_by_end_user_id(db: Session, end_user_id: str) -> bool: + """删除缓存""" + repo = EmotionSuggestionsCacheRepository(db) + return repo.delete_by_end_user_id(end_user_id) + + +def is_cache_expired(cache: EmotionSuggestionsCache) -> bool: + """检查缓存是否过期""" + return EmotionSuggestionsCacheRepository.is_expired(cache) diff --git a/api/app/repositories/implicit_memory_cache_repository.py b/api/app/repositories/implicit_memory_cache_repository.py new file mode 100644 index 00000000..65356980 --- /dev/null +++ b/api/app/repositories/implicit_memory_cache_repository.py @@ -0,0 +1,175 @@ +"""隐性记忆缓存仓储层""" + +from sqlalchemy.orm import Session +from typing import Optional, Dict, Any +import datetime + +from app.models.implicit_memory_cache_model import ImplicitMemoryCache +from app.core.logging_config import get_db_logger + +# 获取数据库专用日志器 +db_logger = get_db_logger() + + +class ImplicitMemoryCacheRepository: + """隐性记忆缓存仓储类""" + + def __init__(self, db: Session): + self.db = db + + def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitMemoryCache]: + """根据终端用户ID获取缓存 + + Args: + end_user_id: 终端用户ID + + Returns: + 缓存记录,如果不存在返回 None + """ + try: + cache = ( + self.db.query(ImplicitMemoryCache) + .filter(ImplicitMemoryCache.end_user_id == end_user_id) + .first() + ) + if cache: + db_logger.info(f"成功获取用户 {end_user_id} 的隐性记忆缓存") + else: + db_logger.info(f"用户 {end_user_id} 的隐性记忆缓存不存在") + return cache + except Exception as e: + db_logger.error(f"获取用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}") + raise + + def create_or_update( + self, + end_user_id: str, + preferences: list, + portrait: dict, + interest_areas: dict, + habits: list, + expires_hours: int = 168 # 默认7天 + ) -> ImplicitMemoryCache: + """创建或更新缓存 + + Args: + end_user_id: 终端用户ID + preferences: 偏好标签列表 + portrait: 四维画像对象 + interest_areas: 兴趣领域分布对象 + habits: 行为习惯列表 + expires_hours: 过期时间(小时),默认168小时(7天) + + Returns: + 缓存记录 + """ + try: + # 查找现有记录 + cache = self.get_by_end_user_id(end_user_id) + + now = datetime.datetime.now() + expires_at = now + datetime.timedelta(hours=expires_hours) + + if cache: + # 更新现有记录 + cache.preferences = preferences + cache.portrait = portrait + cache.interest_areas = interest_areas + cache.habits = habits + cache.generated_at = now + cache.expires_at = expires_at + cache.updated_at = now + db_logger.info(f"更新用户 {end_user_id} 的隐性记忆缓存") + else: + # 创建新记录 + cache = ImplicitMemoryCache( + end_user_id=end_user_id, + preferences=preferences, + portrait=portrait, + interest_areas=interest_areas, + habits=habits, + generated_at=now, + expires_at=expires_at, + created_at=now, + updated_at=now + ) + self.db.add(cache) + db_logger.info(f"创建用户 {end_user_id} 的隐性记忆缓存") + + self.db.commit() + self.db.refresh(cache) + return cache + except Exception as e: + self.db.rollback() + db_logger.error(f"创建或更新用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}") + raise + + def delete_by_end_user_id(self, end_user_id: str) -> bool: + """删除缓存 + + Args: + end_user_id: 终端用户ID + + Returns: + 是否删除成功 + """ + try: + cache = self.get_by_end_user_id(end_user_id) + if cache: + self.db.delete(cache) + self.db.commit() + db_logger.info(f"删除用户 {end_user_id} 的隐性记忆缓存") + return True + return False + except Exception as e: + self.db.rollback() + db_logger.error(f"删除用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}") + raise + + @staticmethod + def is_expired(cache: ImplicitMemoryCache) -> bool: + """检查缓存是否过期 + + Args: + cache: 缓存记录 + + Returns: + 是否过期 + """ + if cache.expires_at is None: + return False + return datetime.datetime.now() > cache.expires_at + + +# 便捷函数 +def get_cache_by_end_user_id(db: Session, end_user_id: str) -> Optional[ImplicitMemoryCache]: + """根据终端用户ID获取缓存""" + repo = ImplicitMemoryCacheRepository(db) + return repo.get_by_end_user_id(end_user_id) + + +def create_or_update_cache( + db: Session, + end_user_id: str, + preferences: list, + portrait: dict, + interest_areas: dict, + habits: list, + expires_hours: int = 168 +) -> ImplicitMemoryCache: + """创建或更新缓存""" + repo = ImplicitMemoryCacheRepository(db) + return repo.create_or_update( + end_user_id, preferences, portrait, interest_areas, habits, expires_hours + ) + + +def delete_cache_by_end_user_id(db: Session, end_user_id: str) -> bool: + """删除缓存""" + repo = ImplicitMemoryCacheRepository(db) + return repo.delete_by_end_user_id(end_user_id) + + +def is_cache_expired(cache: ImplicitMemoryCache) -> bool: + """检查缓存是否过期""" + return ImplicitMemoryCacheRepository.is_expired(cache) diff --git a/api/app/repositories/memory_increment_repository.py b/api/app/repositories/memory_increment_repository.py index 37396fbd..f3a56622 100644 --- a/api/app/repositories/memory_increment_repository.py +++ b/api/app/repositories/memory_increment_repository.py @@ -25,7 +25,7 @@ class MemoryIncrementRepository: MemoryIncrement, func.row_number().over( partition_by=func.date(MemoryIncrement.created_at), # 按日期分区 - order_by=MemoryIncrement.created_at.desc() # 按时间戳升序排序 + order_by=MemoryIncrement.created_at.desc() # 按时间戳降序排序,取每天最新的 ).label('row_num') ) .filter(MemoryIncrement.workspace_id == workspace_id) @@ -34,14 +34,24 @@ class MemoryIncrementRepository: memory_increment_alias = aliased(MemoryIncrement, subquery) - memory_increments = ( + # 先取最近的limit条记录的子查询 + recent_records_subquery = ( self.db.query(memory_increment_alias) .filter(subquery.c.row_num == 1) # 只取每个日期的第一条(最新的) - .order_by(memory_increment_alias.created_at.asc()) # 按时间戳降序排序 + .order_by(memory_increment_alias.created_at.desc()) # 按时间戳降序排序,取最近的 .limit(limit) + .subquery() + ) + + # 在外层按升序排列(从旧到新) + recent_alias = aliased(MemoryIncrement, recent_records_subquery) + memory_increments = ( + self.db.query(recent_alias) + .order_by(recent_alias.created_at.asc()) # 按时间戳升序排序 .all() ) - db_logger.info(f"成功查询工作空间 {workspace_id} 下的内存增量") + + db_logger.info(f"成功查询工作空间 {workspace_id} 下的内存增量,返回最近 {len(memory_increments)} 条记录") return memory_increments except Exception as e: db_logger.error(f"查询工作空间 {workspace_id} 下内存增量时出错: {str(e)}") diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index fb28b81e..c91c2e80 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -332,7 +332,7 @@ RETURN e.id AS id, e.description AS description, e.aliases AS aliases, e.name_embedding AS name_embedding, - e.fact_summary AS fact_summary, + COALESCE(e.fact_summary, '') AS fact_summary, e.connect_strength AS connect_strength, collect(DISTINCT s.id) AS statement_ids, collect(DISTINCT c.id) AS chunk_ids, diff --git a/api/app/schemas/emotion_schema.py b/api/app/schemas/emotion_schema.py index 9f14884d..37e9a2e3 100644 --- a/api/app/schemas/emotion_schema.py +++ b/api/app/schemas/emotion_schema.py @@ -30,3 +30,9 @@ class EmotionSuggestionsRequest(BaseModel): """获取个性化情绪建议请求""" group_id: str = Field(..., description="组ID") config_id: Optional[int] = Field(None, description="配置ID(用于指定LLM模型)") + + +class EmotionGenerateSuggestionsRequest(BaseModel): + """生成个性化情绪建议请求""" + group_id: str = Field(..., description="组ID") + config_id: Optional[int] = Field(None, description="配置ID(用于指定LLM模型)") diff --git a/api/app/schemas/implicit_memory_schema.py b/api/app/schemas/implicit_memory_schema.py index e1770b18..ced50b92 100644 --- a/api/app/schemas/implicit_memory_schema.py +++ b/api/app/schemas/implicit_memory_schema.py @@ -262,3 +262,25 @@ InterestCategory = InterestCategoryResponse InterestAreaDistribution = InterestAreaDistributionResponse BehaviorHabit = BehaviorHabitResponse UserProfile = UserProfileResponse + + +# Cache-related Schemas + +class GenerateProfileRequest(BaseModel): + """生成完整用户画像请求""" + end_user_id: str = Field(..., description="终端用户ID") + + +class CompleteProfileResponse(BaseModel): + """完整用户画像响应(包含所有模块)""" + user_id: str + preferences: List[PreferenceTagResponse] + portrait: DimensionPortraitResponse + interest_areas: InterestAreaDistributionResponse + habits: List[BehaviorHabitResponse] + generated_at: datetime.datetime + cached: bool = Field(False, description="是否来自缓存") + + @field_serializer("generated_at", when_used="json") + def _serialize_generated_at(self, dt: datetime.datetime): + return int(dt.timestamp() * 1000) if dt else None diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py index 0065c64b..bc2d6ca3 100644 --- a/api/app/services/app_chat_service.py +++ b/api/app/services/app_chat_service.py @@ -558,7 +558,7 @@ class AppChatService: config: WorkflowConfig, app_id: uuid.UUID, workspace_id: uuid.UUID, - user_id: Optional[str] = None, + user_id: str = None, variables: Optional[Dict[str, Any]] = None, web_search: bool = False, memory: bool = True, diff --git a/api/app/services/conversation_service.py b/api/app/services/conversation_service.py index 3695a222..275d6413 100644 --- a/api/app/services/conversation_service.py +++ b/api/app/services/conversation_service.py @@ -516,8 +516,16 @@ class ConversationService: conversation_messages = self.get_conversation_history( conversation_id=conversation_id, - max_history=30 + max_history=20 ) + if len(conversation_messages) == 0: + return ConversationOut( + theme="", + question=[], + summary="", + takeaways=[], + info_score=0, + ) with open('app/services/prompt/conversation_summary_system.jinja2', 'r', encoding='utf-8') as f: system_prompt = f.read() @@ -536,6 +544,7 @@ class ConversationService: ] logger.info(f"Invoking LLM for conversation_id={conversation_id}") model_resp = await llm.ainvoke(messages) + try: if isinstance(model_resp.content, str): result = json_repair.repair_json(model_resp.content, return_objects=True) diff --git a/api/app/services/draft_run_service.py b/api/app/services/draft_run_service.py index 569684d5..50934226 100644 --- a/api/app/services/draft_run_service.py +++ b/api/app/services/draft_run_service.py @@ -245,7 +245,8 @@ class DraftRunService: storage_type: Optional[str] = None, user_rag_memory_id: Optional[str] = None, web_search: bool = True, - memory: bool = True + memory: bool = True, + sub_agent: bool = False ) -> Dict[str, Any]: """执行试运行(使用 LangChain Agent) @@ -435,7 +436,7 @@ class DraftRunService: elapsed_time = time.time() - start_time # 8. 保存会话消息 - if agent_config.memory and agent_config.memory.get("enabled"): + if not sub_agent and agent_config.memory and agent_config.memory.get("enabled"): await self._save_conversation_message( conversation_id=conversation_id, user_message=message, diff --git a/api/app/services/emotion_analytics_service.py b/api/app/services/emotion_analytics_service.py index f2532557..50773b91 100644 --- a/api/app/services/emotion_analytics_service.py +++ b/api/app/services/emotion_analytics_service.py @@ -705,3 +705,85 @@ class EmotionAnalyticsService: health_summary=summary, suggestions=suggestions ) + + async def get_cached_suggestions( + self, + end_user_id: str, + db: Session, + ) -> Optional[Dict[str, Any]]: + """从缓存获取个性化情绪建议 + + Args: + end_user_id: 宿主ID(用户组ID) + db: 数据库会话 + + Returns: + Dict: 缓存的建议数据,如果不存在或已过期返回 None + """ + try: + from app.repositories.emotion_suggestions_cache_repository import ( + EmotionSuggestionsCacheRepository, + ) + + logger.info(f"尝试从缓存获取情绪建议: user={end_user_id}") + + cache_repo = EmotionSuggestionsCacheRepository(db) + cache = cache_repo.get_by_end_user_id(end_user_id) + + if cache is None: + logger.info(f"用户 {end_user_id} 的建议缓存不存在") + return None + + # 检查是否过期 + if cache_repo.is_expired(cache): + logger.info(f"用户 {end_user_id} 的建议缓存已过期") + return None + + logger.info(f"成功从缓存获取建议: user={end_user_id}") + + return { + "health_summary": cache.health_summary, + "suggestions": cache.suggestions, + "generated_at": cache.generated_at.isoformat(), + "cached": True + } + + except Exception as e: + logger.error(f"从缓存获取建议失败: {str(e)}", exc_info=True) + return None + + async def save_suggestions_cache( + self, + end_user_id: str, + suggestions_data: Dict[str, Any], + db: Session, + expires_hours: int = 24 + ) -> None: + """保存建议到缓存 + + Args: + end_user_id: 宿主ID(用户组ID) + suggestions_data: 建议数据 + db: 数据库会话 + expires_hours: 过期时间(小时) + """ + try: + from app.repositories.emotion_suggestions_cache_repository import ( + EmotionSuggestionsCacheRepository, + ) + + logger.info(f"保存建议到缓存: user={end_user_id}") + + cache_repo = EmotionSuggestionsCacheRepository(db) + cache_repo.create_or_update( + end_user_id=end_user_id, + health_summary=suggestions_data["health_summary"], + suggestions=suggestions_data["suggestions"], + expires_hours=expires_hours + ) + + logger.info(f"建议缓存保存成功: user={end_user_id}") + + except Exception as e: + logger.error(f"保存建议缓存失败: {str(e)}", exc_info=True) + # 不抛出异常,缓存失败不应影响主流程 \ No newline at end of file diff --git a/api/app/services/home_page_service.py b/api/app/services/home_page_service.py index a84a8214..8326ad40 100644 --- a/api/app/services/home_page_service.py +++ b/api/app/services/home_page_service.py @@ -11,6 +11,22 @@ from app.repositories.home_page_repository import HomePageRepository from app.schemas.home_page_schema import HomeStatistics, WorkspaceInfo class HomePageService: + + DEFAULT_RETURN_DATA: Dict[str, Any] = { + "message": "", + "introduction": { + "codeName": "", + "releaseDate": "", + "upgradePosition": "", + "coreUpgrades": [] + }, + "introduction_en": { + "codeName": "", + "releaseDate": "", + "upgradePosition": "", + "coreUpgrades": [] + } + } @staticmethod def get_home_statistics(db: Session, tenant_id: UUID) -> HomeStatistics: @@ -82,60 +98,36 @@ class HomePageService: :param version: 系统版本号(如 "0.2.0") :return: 对应版本的详细介绍 """ - # 1. 定义 JSON 文件路径(使用 Path 处理跨平台路径问题) - json_file_path = Path(__file__).parent.parent / "version_info.json" - # 转换为绝对路径,便于调试 - json_abs_path = json_file_path.resolve() + # 2. 定义 JSON 文件路径(简化路径处理,保留绝对路径调试特性) + json_abs_path = Path(__file__).parent.parent / "version_info.json" + json_abs_path = json_abs_path.resolve() + + # 3. 初始化返回结果(深拷贝默认模板,避免修改原常量) + from copy import deepcopy + result = deepcopy(HomePageService.DEFAULT_RETURN_DATA) try: - # 2. 读取 JSON 文件 + # 4. 简化文件存在性判断(合并逻辑,减少分支) if not json_abs_path.exists(): - return { - "message": f"版本介绍文件不存在:{json_abs_path}", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } + result["message"] = f"版本介绍文件不存在:{json_abs_path}" + return result + # 5. 读取并解析 JSON 文件(简化文件操作流程) with open(json_abs_path, "r", encoding="utf-8") as f: changelogs = json.load(f) - # 3. 匹配对应版本的介绍,若版本不存在返回默认提示 - if version not in changelogs: - return { - "message": f"暂未查询到 {version} 版本的详细介绍", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } - return changelogs[version] + # 6. 简化版本匹配逻辑,直接返回结果或更新提示信息 + if version in changelogs: + return changelogs[version] + result["message"] = f"暂未查询到 {version} 版本的详细介绍" + return result except FileNotFoundError as e: - # 处理文件不存在异常 - return { - "message": f"系统内部错误:{str(e)}", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } + result["message"] = f"系统内部错误:{str(e)}" + return result except json.JSONDecodeError: - # 处理 JSON 格式错误 - return { - "message": "版本介绍文件格式错误,无法解析 JSON", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } + result["message"] = "版本介绍文件格式错误,无法解析 JSON" + return result except Exception as e: - # 处理其他未知异常 - return { - "message": f"加载版本介绍失败:{str(e)}", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } \ No newline at end of file + result["message"] = f"加载版本介绍失败:{str(e)}" + return result \ No newline at end of file diff --git a/api/app/services/implicit_memory_service.py b/api/app/services/implicit_memory_service.py index 8155b7a1..c98f14bc 100644 --- a/api/app/services/implicit_memory_service.py +++ b/api/app/services/implicit_memory_service.py @@ -7,6 +7,7 @@ user profiles from memory summaries. """ import logging +import asyncio from datetime import datetime from typing import List, Optional @@ -372,4 +373,129 @@ class ImplicitMemoryService: except Exception as e: logger.error(f"Failed to get behavior habits for user {user_id}: {e}") raise - \ No newline at end of file + + + async def generate_complete_profile( + self, + user_id: str + ) -> dict: + """生成完整的用户画像(包含所有4个模块) + + Args: + user_id: 用户ID + + Returns: + Dict: 包含所有模块的完整画像数据 + """ + logger.info(f"生成完整用户画像: user={user_id}") + + try: + # 并行调用4个分析方法 + preferences, portrait, interest_areas, habits = await asyncio.gather( + self.get_preference_tags(user_id=user_id), + self.get_dimension_portrait(user_id=user_id), + self.get_interest_area_distribution(user_id=user_id), + self.get_behavior_habits(user_id=user_id) + ) + + # 转换为可序列化的格式 + profile_data = { + "preferences": [tag.model_dump(mode='json') for tag in preferences], + "portrait": portrait.model_dump(mode='json'), + "interest_areas": interest_areas.model_dump(mode='json'), + "habits": [habit.model_dump(mode='json') for habit in habits] + } + + logger.info(f"完整用户画像生成完成: user={user_id}") + return profile_data + + except Exception as e: + logger.error(f"生成完整用户画像失败: {str(e)}", exc_info=True) + raise + + async def get_cached_profile( + self, + end_user_id: str, + db: Session + ) -> Optional[dict]: + """从缓存获取完整用户画像 + + Args: + end_user_id: 终端用户ID + db: 数据库会话 + + Returns: + Dict: 缓存的画像数据,如果不存在或已过期返回 None + """ + try: + from app.repositories.implicit_memory_cache_repository import ( + ImplicitMemoryCacheRepository, + ) + + logger.info(f"尝试从缓存获取用户画像: user={end_user_id}") + + cache_repo = ImplicitMemoryCacheRepository(db) + cache = cache_repo.get_by_end_user_id(end_user_id) + + if cache is None: + logger.info(f"用户 {end_user_id} 的画像缓存不存在") + return None + + # 检查是否过期 + if cache_repo.is_expired(cache): + logger.info(f"用户 {end_user_id} 的画像缓存已过期") + return None + + logger.info(f"成功从缓存获取用户画像: user={end_user_id}") + + return { + "end_user_id": cache.end_user_id, + "preferences": cache.preferences, + "portrait": cache.portrait, + "interest_areas": cache.interest_areas, + "habits": cache.habits, + "generated_at": cache.generated_at.isoformat(), + "cached": True + } + + except Exception as e: + logger.error(f"从缓存获取用户画像失败: {str(e)}", exc_info=True) + return None + + async def save_profile_cache( + self, + end_user_id: str, + profile_data: dict, + db: Session, + expires_hours: int = 168 # 默认7天 + ) -> None: + """保存用户画像到缓存 + + Args: + end_user_id: 终端用户ID + profile_data: 画像数据 + db: 数据库会话 + expires_hours: 过期时间(小时),默认168小时(7天) + """ + try: + from app.repositories.implicit_memory_cache_repository import ( + ImplicitMemoryCacheRepository, + ) + + logger.info(f"保存用户画像到缓存: user={end_user_id}") + + cache_repo = ImplicitMemoryCacheRepository(db) + cache_repo.create_or_update( + end_user_id=end_user_id, + preferences=profile_data["preferences"], + portrait=profile_data["portrait"], + interest_areas=profile_data["interest_areas"], + habits=profile_data["habits"], + expires_hours=expires_hours + ) + + logger.info(f"用户画像缓存保存成功: user={end_user_id}") + + except Exception as e: + logger.error(f"保存用户画像缓存失败: {str(e)}", exc_info=True) + # 不抛出异常,缓存失败不应影响主流程 diff --git a/api/app/services/memory_agent_service.py b/api/app/services/memory_agent_service.py index ccebdcd6..f0756764 100644 --- a/api/app/services/memory_agent_service.py +++ b/api/app/services/memory_agent_service.py @@ -9,30 +9,30 @@ import os import re import time import uuid -from threading import Lock + from typing import Any, AsyncGenerator, Dict, List, Optional import redis -from langchain_core.messages import HumanMessage - from app.core.config import settings from app.core.logging_config import get_config_logger, get_logger from app.core.memory.agent.langgraph_graph.read_graph import make_read_graph from app.core.memory.agent.langgraph_graph.write_graph import make_write_graph from app.core.memory.agent.logger_file.log_streamer import LogStreamer -from app.core.memory.agent.utils.messages_tools import merge_multiple_search_results, reorder_output_results +from app.core.memory.agent.utils.mcp_tools import get_mcp_server_config from app.core.memory.agent.utils.type_classifier import status_typle from app.core.memory.analytics.hot_memory_tags import get_hot_memory_tags -from app.repositories.memory_short_repository import ShortTermMemoryRepository from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.db import get_db_context from app.models.knowledge_model import Knowledge, KnowledgeType +from app.repositories.memory_short_repository import ShortTermMemoryRepository from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.schemas.memory_config_schema import ConfigurationError from app.services.memory_config_service import MemoryConfigService from app.services.memory_konwledges_server import ( write_rag, ) +from langchain_mcp_adapters.client import MultiServerMCPClient +from langchain_mcp_adapters.tools import load_mcp_tools from pydantic import BaseModel, Field from sqlalchemy import func from sqlalchemy.orm import Session @@ -51,20 +51,20 @@ _neo4j_connector = Neo4jConnector() class MemoryAgentService: """Service for memory agent operations""" - def __init__(self): - self.user_locks: Dict[str, Lock] = {} - self.locks_lock = Lock() - def writer_messages_deal(self, messages, start_time, group_id, config_id, message, context): + + def writer_messages_deal(self,messages,start_time,group_id,config_id,message): + messages = str(messages).replace("'", '"').replace('\\n', '').replace('\n', '').replace('\\', '') + countext = re.findall(r'"status": "(.*?)",', messages)[0] duration = time.time() - start_time - if str(messages) == 'success': + if countext == 'success': logger.info(f"Write operation successful for group {group_id} with config_id {config_id}") # 记录成功的操作 if audit_logger: audit_logger.log_operation(operation="WRITE", config_id=config_id, group_id=group_id, success=True, duration=duration, details={"message_length": len(message)}) - return context + return countext else: logger.warning(f"Write operation failed for group {group_id}") @@ -81,12 +81,7 @@ class MemoryAgentService: raise ValueError(f"写入失败: {messages}") - def get_group_lock(self, group_id: str) -> Lock: - """Get lock for specific group to prevent concurrent processing""" - with self.locks_lock: - if group_id not in self.user_locks: - self.user_locks[group_id] = Lock() - return self.user_locks[group_id] + def extract_tool_call_info(self, event: Dict) -> bool: """Extract tool call information from event""" @@ -148,26 +143,8 @@ class MemoryAgentService: else: status = "unknown" - # Add database connection pool status - try: - from app.db import get_pool_status - pool_status = get_pool_status() - logger.info(f"Database pool status: {pool_status}") - - # Check if pool usage is too high - if pool_status.get("usage_percent", 0) > 80: - logger.warning(f"High database pool usage: {pool_status['usage_percent']}%") - status = "warning" - - except Exception as e: - logger.error(f"Failed to get pool status: {e}") - pool_status = {"error": str(e)} - logger.info(f"Health status: {status}") - return { - "status": status, - "database_pool": pool_status - } + return {"status": status} def get_log_content(self) -> str: """ @@ -324,42 +301,54 @@ class MemoryAgentService: audit_logger.log_operation(operation="WRITE", config_id=config_id, group_id=group_id, success=False, duration=duration, error=error_msg) raise ValueError(error_msg) + mcp_config = get_mcp_server_config() + client = MultiServerMCPClient(mcp_config) + + if storage_type == "rag": + result = await write_rag(group_id, message, user_rag_memory_id) + return result + else: + async with client.session("data_flow") as session: + logger.debug("Connected to MCP Server: data_flow") + tools = await load_mcp_tools(session) + workflow_errors = [] # Track errors from workflow + + # Pass memory_config to the graph workflow + async with make_write_graph(group_id, tools, group_id, group_id, memory_config=memory_config) as graph: + logger.debug("Write graph created successfully") - try: - if storage_type == "rag": - result = await write_rag(group_id, message, user_rag_memory_id) - return result - else: - async with make_write_graph() as graph: config = {"configurable": {"thread_id": group_id}} - # 初始状态 - 包含所有必要字段 - initial_state = {"messages": [HumanMessage(content=message)], "group_id": group_id, - "memory_config": memory_config} - # 获取节点更新信息 - async for update_event in graph.astream( - initial_state, - stream_mode="updates", + async for event in graph.astream( + {"messages": message, "memory_config": memory_config, "errors": []}, + stream_mode="values", config=config ): - for node_name, node_data in update_event.items(): - if 'save_neo4j' == node_name: - massages = node_data - massagesstatus = massages.get('write_result')['status'] - contents = massages.get('write_result') - return self.writer_messages_deal(massagesstatus, start_time, group_id, config_id, message, contents) - except Exception as e: - # Ensure proper error handling and logging - error_msg = f"Write operation failed: {str(e)}" - logger.error(error_msg) - if audit_logger: - duration = time.time() - start_time - audit_logger.log_operation(operation="WRITE", config_id=config_id, group_id=group_id, success=False, duration=duration, error=error_msg) - raise ValueError(error_msg) - - - - + messages = event.get('messages') + # Capture any errors from the state + if event.get('errors'): + workflow_errors.extend(event.get('errors', [])) + + # Check for workflow errors + if workflow_errors: + error_details = "; ".join([f"{e['tool']}: {e['error']}" for e in workflow_errors]) + logger.error(f"Write workflow failed with errors: {error_details}") + + if audit_logger: + duration = time.time() - start_time + audit_logger.log_operation( + operation="WRITE", + config_id=config_id, + group_id=group_id, + success=False, + duration=duration, + error=error_details + ) + + raise ValueError(f"Write workflow failed: {error_details}") + + return self.writer_messages_deal(messages, start_time, group_id, config_id, message) + async def read_memory( self, group_id: str, @@ -398,9 +387,8 @@ class MemoryAgentService: import time start_time = time.time() - end_user_id=group_id ori_message=message - + end_user_id=group_id # Resolve config_id if None using end_user's connected config if config_id is None: try: @@ -413,169 +401,336 @@ class MemoryAgentService: raise # Re-raise our specific error logger.error(f"Failed to get connected config for end_user {group_id}: {e}") raise ValueError(f"Unable to determine memory configuration for end_user {group_id}: {e}") - + logger.info(f"Read operation for group {group_id} with config_id {config_id}") - + # 导入审计日志记录器 try: from app.core.memory.utils.log.audit_logger import audit_logger except ImportError: audit_logger = None - - # Get group lock to prevent concurrent processing - group_lock = self.get_group_lock(group_id) - with group_lock: - # Step 1: Load configuration from database only - try: - config_service = MemoryConfigService(db) - memory_config = config_service.load_memory_config( + try: + config_service = MemoryConfigService(db) + memory_config = config_service.load_memory_config( + config_id=config_id, + service_name="MemoryAgentService" + ) + logger.info(f"Configuration loaded successfully: {memory_config.config_name}") + except ConfigurationError as e: + error_msg = f"Failed to load configuration for config_id: {config_id}: {e}" + logger.error(error_msg) + + # Log failed operation + if audit_logger: + duration = time.time() - start_time + audit_logger.log_operation( + operation="READ", config_id=config_id, - service_name="MemoryAgentService" + group_id=group_id, + success=False, + duration=duration, + error=error_msg ) - logger.info(f"Configuration loaded successfully: {memory_config.config_name}") - except ConfigurationError as e: - error_msg = f"Failed to load configuration for config_id: {config_id}: {e}" - logger.error(error_msg) - - # Log failed operation - if audit_logger: - duration = time.time() - start_time - audit_logger.log_operation( - operation="READ", - config_id=config_id, - group_id=group_id, - success=False, - duration=duration, - error=error_msg - ) - - raise ValueError(error_msg) - - # Step 2: Prepare history - history.append({"role": "user", "content": message}) - logger.debug(f"Group ID:{group_id}, Message:{message}, History:{history}, Config ID:{config_id}") - - # Step 3: Initialize MCP client and execute read workflow - try: - async with make_read_graph() as graph: - config = {"configurable": {"thread_id": group_id}} - # 初始状态 - 包含所有必要字段 - initial_state = {"messages": [HumanMessage(content=message)], "search_switch": search_switch, - "group_id": group_id - , "storage_type": storage_type, "user_rag_memory_id": user_rag_memory_id, - "memory_config": memory_config} - # 获取节点更新信息 - _intermediate_outputs = [] - summary = '' - async for update_event in graph.astream( - initial_state, - stream_mode="updates", - config=config - ): - for node_name, node_data in update_event.items(): - # 处理不同Summary节点的返回结构 - if 'Summary' in node_name: - if 'InputSummary' in node_data and 'summary_result' in node_data['InputSummary']: - summary = node_data['InputSummary']['summary_result'] - elif 'RetrieveSummary' in node_data and 'summary_result' in node_data['RetrieveSummary']: - summary = node_data['RetrieveSummary']['summary_result'] - elif 'summary' in node_data and 'summary_result' in node_data['summary']: - summary = node_data['summary']['summary_result'] - elif 'SummaryFails' in node_data and 'summary_result' in node_data['SummaryFails']: - summary = node_data['SummaryFails']['summary_result'] + raise ValueError(error_msg) - spit_data = node_data.get('spit_data', {}).get('_intermediate', None) - if spit_data and spit_data != [] and spit_data != {}: - _intermediate_outputs.append(spit_data) + # Step 2: Prepare history + history.append({"role": "user", "content": message}) + logger.debug(f"Group ID:{group_id}, Message:{message}, History:{history}, Config ID:{config_id}") - # Problem_Extension 节点 - problem_extension = node_data.get('problem_extension', {}).get('_intermediate', None) - if problem_extension and problem_extension != [] and problem_extension != {}: - _intermediate_outputs.append(problem_extension) + # Step 3: Initialize MCP client and execute read workflow + mcp_config = get_mcp_server_config() + client = MultiServerMCPClient(mcp_config) - # Retrieve 节点 - retrieve_node = node_data.get('retrieve', {}).get('_intermediate_outputs', None) - if retrieve_node and retrieve_node != [] and retrieve_node != {}: - _intermediate_outputs.extend(retrieve_node) + async with client.session('data_flow') as session: + session_start = time.time() + logger.debug("Connected to MCP Server: data_flow") - # Verify 节点 - verify_n = node_data.get('verify', {}).get('_intermediate', None) - if verify_n and verify_n != [] and verify_n != {}: - _intermediate_outputs.append(verify_n) + tools_start = time.time() + tools = await load_mcp_tools(session) + tools_time = time.time() - tools_start + logger.info(f"[PERF] MCP tools loading took: {tools_time:.4f}s") - # Summary 节点 - summary_n = node_data.get('summary', {}).get('_intermediate', None) - if summary_n and summary_n != [] and summary_n != {}: - _intermediate_outputs.append(summary_n) + outputs = [] + intermediate_outputs = [] + seen_intermediates = set() # Track seen intermediate outputs to avoid duplicates - _intermediate_outputs = [item for item in _intermediate_outputs if item and item != [] and item != {}] + # Pass memory_config to the graph workflow + graph_start = time.time() + async with make_read_graph(group_id, tools, search_switch, group_id, group_id, memory_config=memory_config, storage_type=storage_type, user_rag_memory_id=user_rag_memory_id) as graph: + graph_init_time = time.time() - graph_start + logger.info(f"[PERF] Graph initialization took: {graph_init_time:.4f}s") - optimized_outputs = merge_multiple_search_results(_intermediate_outputs) - result = reorder_output_results(optimized_outputs) + start = time.time() + config = {"configurable": {"thread_id": group_id}} + workflow_errors = [] # Track errors from workflow - # Log successful operation - if audit_logger: - duration = time.time() - start_time - audit_logger.log_operation( - operation="READ", - config_id=config_id, - group_id=group_id, - success=True, - duration=duration - ) + event_count = 0 + async for event in graph.astream( + {"messages": history, "memory_config": memory_config, "errors": []}, + stream_mode="values", + config=config + ): + event_count += 1 + event_start = time.time() + messages = event.get('messages') + # Capture any errors from the state + if event.get('errors'): + workflow_errors.extend(event.get('errors', [])) - retrieved_content = [] - repo = ShortTermMemoryRepository(db) - if str(search_switch).strip() != "2": - for intermediate in result: - intermediate_type = intermediate['type'] - if intermediate_type == "search_result": - query = intermediate['query'] - raw_results = intermediate['raw_results'] - reranked_results = raw_results.get('reranked_results', []) + for msg in messages: + msg_content = msg.content + msg_role = msg.__class__.__name__.lower().replace("message", "") + outputs.append({ + "role": msg_role, + "content": msg_content + }) + + # Extract intermediate outputs + if hasattr(msg, 'content'): try: - statements = [statement['statement'] for statement in - reranked_results.get('statements', [])] - except Exception: - statements = [] - statements = list(set(statements)) - retrieved_content.append({query: statements}) - if retrieved_content == []: - retrieved_content = '' - if '信息不足,无法回答。' != str(summary) and str(search_switch).strip() != "2": # and retrieved_content!=[] - # 使用 upsert 方法 - repo.upsert( - end_user_id=end_user_id, # 确保这个变量在作用域内 - messages=ori_message, - aimessages=summary, - retrieved_content=retrieved_content, - search_switch=str(search_switch) - ) - print("写入成功") + # Handle MCP content format: [{'type': 'text', 'text': '...'}] + content_to_parse = msg_content + if isinstance(msg_content, list): + for block in msg_content: + if isinstance(block, dict) and block.get('type') == 'text': + content_to_parse = block.get('text', '') + break + else: + continue # No text block found - return { - "answer": summary, - "intermediate_outputs": result + # Try to parse content as JSON + if isinstance(content_to_parse, str): + try: + parsed = json.loads(content_to_parse) + if isinstance(parsed, dict): + # Check for single intermediate output + if '_intermediate' in parsed: + intermediate_data = parsed['_intermediate'] + output_key = self._create_intermediate_key(intermediate_data) + + if output_key not in seen_intermediates: + seen_intermediates.add(output_key) + intermediate_outputs.append(self._format_intermediate_output(intermediate_data)) + + # Check for multiple intermediate outputs (from Retrieve) + if '_intermediates' in parsed: + for intermediate_data in parsed['_intermediates']: + output_key = self._create_intermediate_key(intermediate_data) + + if output_key not in seen_intermediates: + seen_intermediates.add(output_key) + intermediate_outputs.append(self._format_intermediate_output(intermediate_data)) + except (json.JSONDecodeError, ValueError): + pass + except Exception as e: + logger.debug(f"Failed to extract intermediate output: {e}") + + event_time = time.time() - event_start + logger.info(f"[PERF] Event {event_count} processing took: {event_time:.4f}s") + + workflow_duration = time.time() - start + session_duration = time.time() - session_start + logger.info(f"[PERF] Read graph workflow completed in {workflow_duration}s") + logger.info(f"[PERF] Total session duration: {session_duration:.4f}s") + logger.info(f"[PERF] Total events processed: {event_count}") + # Extract final answer + final_answer = "" + for messages in outputs: + if messages['role'] == 'tool': + message = messages['content'] + + # Handle MCP content format: [{'type': 'text', 'text': '...'}] + if isinstance(message, list): + # Extract text from MCP content blocks + for block in message: + if isinstance(block, dict) and block.get('type') == 'text': + message = block.get('text', '') + break + else: + continue # No text block found + + try: + parsed = json.loads(message) if isinstance(message, str) else message + if isinstance(parsed, dict): + if parsed.get('status') == 'success': + summary_result = parsed.get('summary_result') + if summary_result: + final_answer = summary_result + except (json.JSONDecodeError, ValueError): + pass + + # 记录成功的操作 + total_duration = time.time() - start_time + + # Check for workflow errors + if workflow_errors: + error_details = "; ".join([f"{e['tool']}: {e['error']}" for e in workflow_errors]) + logger.warning(f"Read workflow completed with errors: {error_details}") + + if audit_logger: + audit_logger.log_operation( + operation="READ", + config_id=config_id, + group_id=group_id, + success=False, + duration=total_duration, + error=error_details, + details={ + "search_switch": search_switch, + "history_length": len(history), + "intermediate_outputs_count": len(intermediate_outputs), + "has_answer": bool(final_answer), + "errors": workflow_errors } - except Exception as e: - # Ensure proper error handling and logging - error_msg = f"Read operation failed: {str(e)}" - logger.error(error_msg) - if audit_logger: - duration = time.time() - start_time - audit_logger.log_operation( - operation="READ", - config_id=config_id, - group_id=group_id, - success=False, - duration=duration, - error=error_msg - ) - raise ValueError(error_msg) - + ) + # Raise error if no answer was produced + if not final_answer: + raise ValueError(f"Read workflow failed: {error_details}") + + if audit_logger and not workflow_errors: + audit_logger.log_operation( + operation="READ", + config_id=config_id, + group_id=group_id, + success=True, + duration=total_duration, + details={ + "search_switch": search_switch, + "history_length": len(history), + "intermediate_outputs_count": len(intermediate_outputs), + "has_answer": bool(final_answer) + } + ) + retrieved_content=[] + repo = ShortTermMemoryRepository(db) + if str(search_switch)!="2": + for intermediate in intermediate_outputs: + print(intermediate) + intermediate_type=intermediate['type'] + if intermediate_type=="search_result": + query=intermediate['query'] + raw_results=intermediate['raw_results'] + reranked_results=raw_results.get('reranked_results',[]) + try: + statements=[statement['statement'] for statement in reranked_results.get('statements', [])] + except Exception: + statements=[] + statements=list(set(statements)) + retrieved_content.append({query:statements}) + if retrieved_content==[]: + retrieved_content='' + if '信息不足,无法回答。' != str(final_answer) and str(search_switch).strip() != "2":#and retrieved_content!=[] + # 使用 upsert 方法 + repo.upsert( + end_user_id=end_user_id, # 确保这个变量在作用域内 + messages=ori_message, + aimessages=final_answer, + retrieved_content=retrieved_content, + search_switch=str(search_switch) + ) + print("写入成功") + + + + return { + "answer": final_answer, + "intermediate_outputs": intermediate_outputs + } + + def _create_intermediate_key(self, output: Dict) -> str: + """ + Create a unique key for an intermediate output to detect duplicates. + + Args: + output: Intermediate output dictionary + + Returns: + Unique string key for this output + """ + output_type = output.get('type', 'unknown') + + if output_type == 'problem_split': + # Use type + original query as key + return f"split:{output.get('original_query', '')}" + elif output_type == 'problem_extension': + # Use type + original query as key + return f"extension:{output.get('original_query', '')}" + elif output_type == 'search_result': + # Use type + query + index as key + return f"search:{output.get('query', '')}:{output.get('index', 0)}" + elif output_type == 'retrieval_summary': + # Use type + query as key + return f"summary:{output.get('query', '')}" + elif output_type == 'verification': + # Use type + query as key + return f"verification:{output.get('query', '')}" + elif output_type == 'input_summary': + # Use type + query as key + return f"input_summary:{output.get('query', '')}" + else: + # Fallback: use JSON representation + import json + return json.dumps(output, sort_keys=True) + + def _format_intermediate_output(self, output: Dict) -> Dict: + """Format intermediate output for frontend display.""" + output_type = output.get('type', 'unknown') + + if output_type == 'problem_split': + return { + 'type': 'problem_split', + 'title': '问题拆分', + 'data': output.get('data', []), + 'original_query': output.get('original_query', '') + } + elif output_type == 'problem_extension': + return { + 'type': 'problem_extension', + 'title': '问题扩展', + 'data': output.get('data', {}), + 'original_query': output.get('original_query', '') + } + elif output_type == 'search_result': + return { + 'type': 'search_result', + 'title': f'检索结果 ({output.get("index", 0)}/{output.get("total", 0)})', + 'query': output.get('query', ''), + 'raw_results': output.get('raw_results', ''), + 'index': output.get('index', 0), + 'total': output.get('total', 0) + } + elif output_type == 'retrieval_summary': + return { + 'type': 'retrieval_summary', + 'title': '检索总结', + 'summary': output.get('summary', ''), + 'query': output.get('query', ''), + 'raw_results': output.get('raw_results'), + + } + elif output_type == 'verification': + return { + 'type': 'verification', + 'title': '数据验证', + 'result': output.get('result', 'unknown'), + 'reason': output.get('reason', ''), + 'query': output.get('query', ''), + 'verified_count': output.get('verified_count', 0) + } + elif output_type == 'input_summary': + return { + 'type': 'input_summary', + 'title': '快速答案', + 'summary': output.get('summary', ''), + 'query': output.get('query', ''), + 'raw_results': output.get('raw_results'), + + } + else: + return output async def classify_message_type(self, message: str, config_id: int, db: Session) -> Dict: """ @@ -683,7 +838,6 @@ class MemoryAgentService: # 获取当前空间下的所有宿主 from app.repositories import app_repository, end_user_repository from app.schemas.app_schema import App as AppSchema - from app.schemas.end_user_schema import EndUser as EndUserSchema # 查询应用并转换为 Pydantic 模型 apps_orm = app_repository.get_apps_by_workspace_id(db, current_workspace_id) @@ -981,6 +1135,43 @@ class MemoryAgentService: logger.info("Log streaming completed, cleaning up resources") # LogStreamer uses context manager for file handling, so cleanup is automatic +# async def get_api_docs(self, file_path: Optional[str] = None) -> Dict[str, Any]: +# """ +# Parse and return API documentation + +# Args: +# file_path: Optional path to API docs file. If None, uses default path. + +# Returns: +# Dict containing parsed API documentation or error information +# """ +# try: +# target = file_path or get_default_docs_path() + +# if not os.path.isfile(target): +# return { +# "success": False, +# "msg": "API文档文件不存在", +# "error_code": "DOC_NOT_FOUND", +# "data": {"path": target} +# } + +# data = parse_api_docs(target) +# return { +# "success": True, +# "msg": "解析成功", +# "data": data +# } +# except Exception as e: +# logger.error(f"Failed to parse API docs: {e}") +# return { +# "success": False, +# "msg": "解析失败", +# "error_code": "DOC_PARSE_ERROR", +# "data": {"error": str(e)} +# } + + def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, Any]: """ 获取终端用户关联的记忆配置 @@ -989,18 +1180,20 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An 1. 根据 end_user_id 获取用户的 app_id 2. 获取该应用的最新发布版本 3. 从发布版本的 config 字段中提取 memory_config_id + 4. 根据 memory_config_id 查询配置名称 Args: end_user_id: 终端用户ID db: 数据库会话 Returns: - 包含 memory_config_id 和相关信息的字典 + 包含 memory_config_id、config_name 和相关信息的字典 Raises: ValueError: 当终端用户不存在或应用未发布时 """ from app.models.app_release_model import AppRelease + from app.models.data_config_model import DataConfig from app.models.end_user_model import EndUser from sqlalchemy import select @@ -1034,15 +1227,31 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An memory_obj = config.get('memory', {}) memory_config_id = memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None + # 4. 根据 memory_config_id 查询配置名称 + config_name = None + if memory_config_id: + try: + # memory_config_id 可能是整数或字符串,需要转换 + config_id = int(memory_config_id) if isinstance(memory_config_id, str) else memory_config_id + data_config = db.query(DataConfig).filter(DataConfig.config_id == config_id).first() + if data_config: + config_name = data_config.config_name + logger.debug(f"Found config_name: {config_name} for config_id: {config_id}") + else: + logger.warning(f"DataConfig not found for config_id: {config_id}") + except (ValueError, TypeError) as e: + logger.warning(f"Invalid memory_config_id format: {memory_config_id}, error: {str(e)}") + result = { "end_user_id": str(end_user_id), "app_id": str(app_id), "release_id": str(latest_release.id), "release_version": latest_release.version, - "memory_config_id": memory_config_id + "memory_config_id": memory_config_id, + "memory_config_name": config_name } - logger.info(f"Successfully retrieved connected config: memory_config_id={memory_config_id}") + logger.info(f"Successfully retrieved connected config: memory_config_id={memory_config_id}, config_name={config_name}") return result @@ -1050,112 +1259,126 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session) """ 批量获取多个终端用户关联的记忆配置 - 通过以下流程获取配置: - 1. 批量查询所有 end_user 及其 app_id - 2. 批量获取所有应用的最新发布版本 - 3. 从发布版本的 config 字段中提取 memory_config_id 和 memory_config_name + 通过优化的查询减少数据库往返次数: + 1. 一次性查询所有 end_user 及其 app_id + 2. 批量查询所有相关的 app_release + 3. 批量查询所有相关的 data_config Args: end_user_ids: 终端用户ID列表 db: 数据库会话 Returns: - 字典,key 为 end_user_id,value 为包含 memory_config_id 和 memory_config_name 的字典 - 格式: { - "user_id_1": {"memory_config_id": "xxx", "memory_config_name": "xxx"}, - "user_id_2": {"memory_config_id": None, "memory_config_name": None}, - ... - } + 字典,key 为 end_user_id,value 为配置信息字典 + 对于查询失败的用户,value 包含 error 字段 """ from app.models.app_release_model import AppRelease + from app.models.data_config_model import DataConfig from app.models.end_user_model import EndUser - from app.models.memory_config_model import MemoryConfig from sqlalchemy import select - logger.info(f"Batch getting connected configs for {len(end_user_ids)} end_users") + logger.info(f"Batch getting connected configs for {len(end_user_ids)} end users") result = {} # 1. 批量查询所有 end_user 及其 app_id end_users = db.query(EndUser).filter(EndUser.id.in_(end_user_ids)).all() - # 创建 end_user_id 到 app_id 的映射 - user_to_app = {str(eu.id): eu.app_id for eu in end_users} + # 构建 end_user_id -> end_user 的映射 + end_user_map = {str(user.id): user for user in end_users} - # 获取所有相关的 app_id - app_ids = list(set(user_to_app.values())) + # 记录不存在的用户 + for user_id in end_user_ids: + if user_id not in end_user_map: + result[user_id] = { + "end_user_id": user_id, + "memory_config_id": None, + "memory_config_name": None, + "error": f"终端用户不存在: {user_id}" + } - if not app_ids: - logger.warning("No valid app_ids found for the provided end_user_ids") - # 返回空配置 - for user_id in end_user_ids: - result[user_id] = {"memory_config_id": None, "memory_config_name": None} + if not end_users: + logger.warning("No valid end users found") return result - # 2. 批量获取所有应用的最新发布版本 + # 2. 批量查询所有相关应用的最新发布版本 + app_ids = [user.app_id for user in end_users] + # 使用子查询找到每个 app 的最新版本 - from sqlalchemy import func + from sqlalchemy import and_ - subq = ( - select( - AppRelease.app_id, - func.max(AppRelease.version).label('max_version') + # 查询所有相关的活跃发布版本 + releases = db.query(AppRelease).filter( + and_( + AppRelease.app_id.in_(app_ids), + AppRelease.is_active.is_(True) ) - .where(AppRelease.app_id.in_(app_ids), AppRelease.is_active.is_(True)) - .group_by(AppRelease.app_id) - .subquery() - ) + ).order_by(AppRelease.app_id, AppRelease.version.desc()).all() - stmt = ( - select(AppRelease) - .join( - subq, - (AppRelease.app_id == subq.c.app_id) & (AppRelease.version == subq.c.max_version) - ) - .where(AppRelease.is_active.is_(True)) - ) + # 构建 app_id -> latest_release 的映射(每个 app 只保留最新版本) + app_release_map = {} + for release in releases: + app_id_str = str(release.app_id) + if app_id_str not in app_release_map: + app_release_map[app_id_str] = release - latest_releases = db.scalars(stmt).all() - - # 创建 app_id 到 release 的映射 - app_to_release = {str(release.app_id): release for release in latest_releases} - - # 3. 提取所有 memory_config_id + # 3. 收集所有 memory_config_id memory_config_ids = [] - for release in latest_releases: + for release in app_release_map.values(): config = release.config or {} memory_obj = config.get('memory', {}) memory_config_id = memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None if memory_config_id: - memory_config_ids.append(memory_config_id) + try: + config_id = int(memory_config_id) if isinstance(memory_config_id, str) else memory_config_id + memory_config_ids.append(config_id) + except (ValueError, TypeError): + pass - # 4. 批量查询 memory_config_name - memory_configs = {} + # 4. 批量查询所有 data_config + config_name_map = {} if memory_config_ids: - configs = db.query(MemoryConfig).filter(MemoryConfig.id.in_(memory_config_ids)).all() - memory_configs = {str(cfg.id): cfg.config_name for cfg in configs} + data_configs = db.query(DataConfig).filter( + DataConfig.config_id.in_(memory_config_ids) + ).all() + config_name_map = {config.config_id: config.config_name for config in data_configs} # 5. 组装结果 - for user_id in end_user_ids: - app_id = user_to_app.get(user_id) - if not app_id: - result[user_id] = {"memory_config_id": None, "memory_config_name": None} + for user in end_users: + user_id = str(user.id) + app_id = str(user.app_id) + + # 检查是否有发布版本 + if app_id not in app_release_map: + result[user_id] = { + "end_user_id": user_id, + "memory_config_id": None, + "memory_config_name": None, + "error": f"应用未发布: {app_id}" + } continue - release = app_to_release.get(str(app_id)) - if not release: - result[user_id] = {"memory_config_id": None, "memory_config_name": None} - continue + release = app_release_map[app_id] + # 提取 memory_config_id config = release.config or {} memory_obj = config.get('memory', {}) memory_config_id = memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None - memory_config_name = memory_configs.get(memory_config_id) if memory_config_id else None + + # 获取 config_name + config_name = None + if memory_config_id: + try: + config_id = int(memory_config_id) if isinstance(memory_config_id, str) else memory_config_id + config_name = config_name_map.get(config_id) + except (ValueError, TypeError): + pass result[user_id] = { + "end_user_id": user_id, "memory_config_id": memory_config_id, - "memory_config_name": memory_config_name + "memory_config_name": config_name } - logger.info(f"Successfully retrieved {len(result)} connected configs") + logger.info(f"Successfully retrieved batch configs: total={len(result)}, with_config={sum(1 for v in result.values() if v.get('memory_config_id'))}") return result \ No newline at end of file diff --git a/api/app/services/memory_entity_relationship_service.py b/api/app/services/memory_entity_relationship_service.py index ca97fb39..eedb7c29 100644 --- a/api/app/services/memory_entity_relationship_service.py +++ b/api/app/services/memory_entity_relationship_service.py @@ -597,7 +597,7 @@ class MemoryInteraction: group_id = ori_data[0]['group_id'] Space_User = await self.connector.execute_query(Memory_Space_User, group_id=group_id) if not Space_User: - return '不存在用户' + return [] user_id=Space_User[0]['id'] results = await self.connector.execute_query(Memory_Space_Associative, id=self.id,user_id=user_id) diff --git a/api/app/services/memory_forget_service.py b/api/app/services/memory_forget_service.py index 8979682d..2db4cdc7 100644 --- a/api/app/services/memory_forget_service.py +++ b/api/app/services/memory_forget_service.py @@ -267,14 +267,14 @@ class MemoryForgetService: elif node_type_label == 'memorysummary': node_type_label = 'summary' - # 将 Neo4j DateTime 对象转换为时间戳 + # 将 Neo4j DateTime 对象转换为时间戳(毫秒) last_access_time = result['last_access_time'] last_access_dt = convert_neo4j_datetime_to_python(last_access_time) # 确保 datetime 带有时区信息(假定为 UTC),避免 naive datetime 导致的时区偏差 if last_access_dt: if last_access_dt.tzinfo is None: last_access_dt = last_access_dt.replace(tzinfo=timezone.utc) - last_access_timestamp = int(last_access_dt.timestamp()) + last_access_timestamp = int(last_access_dt.timestamp() * 1000) else: last_access_timestamp = 0 @@ -520,7 +520,7 @@ class MemoryForgetService: 'average_activation_value': result['average_activation'], 'low_activation_nodes': result['low_activation_nodes'] or 0, 'forgetting_threshold': forgetting_threshold, - 'timestamp': int(datetime.now().timestamp()) + 'timestamp': int(datetime.now().timestamp() * 1000) } else: activation_metrics = { @@ -530,7 +530,7 @@ class MemoryForgetService: 'average_activation_value': None, 'low_activation_nodes': 0, 'forgetting_threshold': forgetting_threshold, - 'timestamp': int(datetime.now().timestamp()) + 'timestamp': int(datetime.now().timestamp() * 1000) } # 收集节点类型分布 @@ -620,7 +620,7 @@ class MemoryForgetService: 'merged_count': record.merged_count, 'average_activation': record.average_activation_value, 'total_nodes': record.total_nodes, - 'execution_time': int(record.execution_time.timestamp()) + 'execution_time': int(record.execution_time.timestamp() * 1000) }) api_logger.info(f"成功获取最近 {len(recent_trends)} 个日期的历史趋势数据") @@ -661,7 +661,7 @@ class MemoryForgetService: 'node_distribution': node_distribution, 'recent_trends': recent_trends, 'pending_nodes': pending_nodes, - 'timestamp': int(datetime.now().timestamp()) + 'timestamp': int(datetime.now().timestamp() * 1000) } api_logger.info( diff --git a/api/app/services/multi_agent_orchestrator.py b/api/app/services/multi_agent_orchestrator.py index b0c7a957..1972f344 100644 --- a/api/app/services/multi_agent_orchestrator.py +++ b/api/app/services/multi_agent_orchestrator.py @@ -1327,7 +1327,8 @@ class MultiAgentOrchestrator: web_search=web_search, memory=memory, storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id + user_rag_memory_id=user_rag_memory_id, + sub_agent=True ) return result diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index 8f25f477..9221ab06 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -16,6 +16,7 @@ from app.db import get_db_context from app.repositories.conversation_repository import ConversationRepository from app.repositories.end_user_repository import EndUserRepository from app.repositories.neo4j.neo4j_connector import Neo4jConnector +from app.schemas.memory_episodic_schema import EmotionSubject, EmotionType, type_mapping from app.services.implicit_memory_service import ImplicitMemoryService from app.services.memory_base_service import MemoryBaseService from app.services.memory_config_service import MemoryConfigService diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index f9988352..974d5418 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -13,11 +13,10 @@ from sqlalchemy.orm import Session from app.core.error_codes import BizCode from app.core.exceptions import BusinessException from app.core.workflow.validator import validate_workflow_config -from app.db import get_db, get_db_context +from app.db import get_db +from app.models.conversation_model import Message from app.models.workflow_model import WorkflowConfig, WorkflowExecution from app.repositories.conversation_repository import MessageRepository -from app.models.conversation_model import Message -from app.repositories.end_user_repository import EndUserRepository from app.repositories.workflow_repository import ( WorkflowConfigRepository, WorkflowExecutionRepository, @@ -483,14 +482,6 @@ class WorkflowService: try: # 更新状态为运行中 self.update_execution_status(execution.execution_id, "running") - with get_db_context() as db: - end_user_repo = EndUserRepository(db) - new_end_user = end_user_repo.get_or_create_end_user( - app_id=app_id, - other_id=payload.user_id, - original_user_id=payload.user_id # Save original user_id to other_id - ) - end_user_id = str(new_end_user.id) executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid) @@ -511,7 +502,7 @@ class WorkflowService: input_data=input_data, execution_id=execution.execution_id, workspace_id=str(workspace_id), - user_id=end_user_id + user_id=payload.user_id ) # 更新执行结果 @@ -638,14 +629,6 @@ class WorkflowService: try: # 更新状态为运行中 self.update_execution_status(execution.execution_id, "running") - with get_db_context() as db: - end_user_repo = EndUserRepository(db) - new_end_user = end_user_repo.get_or_create_end_user( - app_id=app_id, - other_id=payload.user_id, - original_user_id=payload.user_id # Save original user_id to other_id - ) - end_user_id = str(new_end_user.id) executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid) for exec_res in executions: @@ -665,7 +648,7 @@ class WorkflowService: input_data=input_data, execution_id=execution.execution_id, workspace_id=str(workspace_id), - user_id=end_user_id + user_id=payload.user_id ): if event.get("event") == "workflow_end": diff --git a/api/app/utils/app_config_utils.py b/api/app/utils/app_config_utils.py index 834d22af..4a35a4cc 100644 --- a/api/app/utils/app_config_utils.py +++ b/api/app/utils/app_config_utils.py @@ -8,9 +8,11 @@ import uuid from typing import Dict, Any, Optional, Union from datetime import datetime +from app.db import get_db_read from app.models import AppRelease, WorkflowConfig from app.models.agent_app_config_model import AgentConfig from app.models.multi_agent_model import MultiAgentConfig +from app.repositories.workflow_repository import WorkflowConfigRepository def model_parameters_to_dict(model_parameters: Any) -> Optional[Dict[str, Any]]: @@ -24,18 +26,18 @@ def model_parameters_to_dict(model_parameters: Any) -> Optional[Dict[str, Any]]: """ if model_parameters is None: return None - + if isinstance(model_parameters, dict): return model_parameters - + # Pydantic v2 if hasattr(model_parameters, 'model_dump'): return model_parameters.model_dump() - + # Pydantic v1 if hasattr(model_parameters, 'dict'): return model_parameters.dict() - + # 其他情况尝试转换 try: return dict(model_parameters) @@ -54,17 +56,18 @@ def dict_to_model_parameters(data: Optional[Dict[str, Any]]) -> Optional[Any]: """ if data is None: return None - + from app.schemas import ModelParameters - + if isinstance(data, ModelParameters): return data - + if isinstance(data, dict): return ModelParameters(**data) - + return None + class AgentConfigProxy: """Proxy class for AgentConfig (legacy compatibility)""" @@ -78,8 +81,7 @@ class AgentConfigProxy: self.default_model_config_id = release.default_model_config_id -def agent_config_4_app_release(release: AppRelease ) -> AgentConfig: - +def agent_config_4_app_release(release: AppRelease) -> AgentConfig: config_dict = release.config agent_config = AgentConfig( @@ -95,18 +97,17 @@ def agent_config_4_app_release(release: AppRelease ) -> AgentConfig: return agent_config -def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig: +def multi_agent_config_4_app_release(release: AppRelease) -> MultiAgentConfig: config_dict = release.config - agent_config = MultiAgentConfig( app_id=release.app_id, default_model_config_id=release.default_model_config_id, model_parameters=config_dict.get("model_parameters"), master_agent_id=config_dict.get("master_agent_id"), master_agent_name=config_dict.get("master_agent_name"), - orchestration_mode=config_dict.get("orchestration_mode", "conditional"), + orchestration_mode=config_dict.get("orchestration_mode", "supervisor"), sub_agents=config_dict.get("sub_agents", []), routing_rules=config_dict.get("routing_rules"), execution_config=config_dict.get("execution_config", {}), @@ -116,24 +117,26 @@ def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig: return agent_config -def workflow_config_4_app_release(release: AppRelease ) -> WorkflowConfig: +def workflow_config_4_app_release(release: AppRelease) -> WorkflowConfig: config_dict = release.config - + with get_db_read() as db: + source_config = WorkflowConfigRepository(db).get_by_app_id(release.app_id) + source_config_id = source_config.id config = WorkflowConfig( - id=release.id, + id=source_config_id, app_id=release.app_id, nodes=config_dict.get("nodes", []), edges=config_dict.get("edges", []), variables=config_dict.get("variables", []), execution_config=config_dict.get("execution_config", {}), triggers=config_dict.get("triggers", []) - ) return config + def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None): """Convert dict to MultiAgentConfig model object @@ -149,7 +152,7 @@ def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uui ... "app_id": "uuid-here", ... "master_agent_id": "master-uuid", ... "master_agent_name": "Master Agent", - ... "orchestration_mode": "conditional", + ... "orchestration_mode": "supervisor", ... "sub_agents": [ ... {"agent_id": "sub1-uuid", "name": "Sub Agent 1", "role": "specialist", "priority": 1}, ... {"agent_id": "sub2-uuid", "name": "Sub Agent 2", "role": "specialist", "priority": 2} @@ -186,7 +189,7 @@ def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uui app_id=final_app_id, master_agent_id=master_agent_id, master_agent_name=config_dict.get("master_agent_name"), - orchestration_mode=config_dict.get("orchestration_mode", "conditional"), + orchestration_mode=config_dict.get("orchestration_mode", "supervisor"), sub_agents=config_dict.get("sub_agents", []), routing_rules=config_dict.get("routing_rules"), execution_config=config_dict.get("execution_config", {}), @@ -276,7 +279,8 @@ def agent_config_to_dict(agent_config) -> Dict[str, Any]: "id": str(agent_config.id), "app_id": str(agent_config.app_id), "system_prompt": agent_config.system_prompt, - "default_model_config_id": str(agent_config.default_model_config_id) if agent_config.default_model_config_id else None, + "default_model_config_id": str( + agent_config.default_model_config_id) if agent_config.default_model_config_id else None, "model_parameters": agent_config.model_parameters, "knowledge_retrieval": agent_config.knowledge_retrieval, "memory": agent_config.memory, @@ -338,6 +342,3 @@ def workflow_config_to_dict(workflow_config) -> Dict[str, Any]: "created_at": workflow_config.created_at.isoformat() if workflow_config.created_at else None, "updated_at": workflow_config.updated_at.isoformat() if workflow_config.updated_at else None } - - - diff --git a/api/app/version_info.json b/api/app/version_info.json index 87e313e4..20896845 100644 --- a/api/app/version_info.json +++ b/api/app/version_info.json @@ -1,33 +1,68 @@ { "v0.2.0": { - "codeName": "启知", - "releaseDate": "2026-1-16", - "upgradePosition": "本次为架构升级,核心目标是把“被动存储”升级为“主动认知”,让系统具备情绪感知、情景理解与类人记忆机制,为后续多智能体协作与专业场景落地奠定底座。", - "coreUpgrades": [ - "记忆详情:拟人记忆——情绪引擎、情景记忆、短期记忆、工作记忆、感知记忆、显性记忆、隐性记忆,并配套类脑遗忘机制,实现从感知→情绪→情景→长期沉淀的完整人类记忆闭环", - "可视化工作流:拖拽式节点编排(LLM、知识库、逻辑、工具),业务落地周期由天缩至小时。", - "多模态知识处理:PDF、PPT、MP3、MP4 一键解析,时间感知检索准确率 94.3%,问答对数据即插即用。", - "Agent集群内置“记忆-知识-工具-审核”四类角色模板,用户一键生成;主控Agent把复杂任务拆为子任务并行分发,再靠情景记忆统一消解冲突、校验一致性,输出完整报告。" - ] + "introduction": { + "codeName": "启知", + "releaseDate": "2026-1-16", + "upgradePosition": "本次为架构升级,核心目标是把\"被动存储\"升级为\"主动认知\",让系统具备情绪感知、情景理解与类人记忆机制,为后续多智能体协作与专业场景落地奠定底座。", + "coreUpgrades": [ + "记忆详情:拟人记忆——情绪引擎、情景记忆、短期记忆、工作记忆、感知记忆、显性记忆、隐性记忆,并配套类脑遗忘机制,实现从感知→情绪→情景→长期沉淀的完整人类记忆闭环", + "可视化工作流:拖拽式节点编排(LLM、知识库、逻辑、工具),业务落地周期由天缩至小时。", + "多模态知识处理:PDF、PPT、MP3、MP4 一键解析,时间感知检索准确率 94.3%,问答对数据即插即用。", + "Agent集群内置\"记忆-知识-工具-审核\"四类角色模板,用户一键生成;主控Agent把复杂任务拆为子任务并行分发,再靠情景记忆统一消解冲突、校验一致性,输出完整报告。" + ] + }, + "introduction_en": { + "codeName": "Qizhi", + "releaseDate": "2026-1-16", + "upgradePosition": "This release marks a foundational upgrade to the system’s cognitive architecture. The core objective is to evolve the platform from passive information storage into active cognitive intelligence—enabling emotional awareness, situational understanding, and human-like memory mechanisms. This upgrade lays the groundwork for future multi-agent collaboration and domain-specific, production-grade AI applications.", + "coreUpgrades": [ + "Human-Like Memory Architecture: A comprehensive, human-inspired memory system is introduced, encompassing emotional processing, situational memory, short-term and working memory, perceptual memory, as well as explicit and implicit memory. Combined with brain-inspired forgetting mechanisms, the system now supports a complete cognitive loop—from perception → emotion → context → long-term consolidation, closely mirroring human memory formation.", + "Visual Workflow Orchestration: A fully visual, drag-and-drop workflow enables modular composition of LLMs, knowledge bases, logic, and tools. This dramatically reduces the time required to move from experimentation to production—from days to hours.", + "Multimodal Knowledge Processing: The system now supports one-click parsing and ingestion of PDF, PPT, MP3, and MP4 content. With time-aware retrieval accuracy reaching 94.3%, structured Q&A data becomes instantly usable for downstream reasoning and generation.", + "Built-in Agent Clusters: Predefined role templates across four categories—Memory, Knowledge, Tools, and Review—can be generated with a single click. A Coordinator Agent decomposes complex tasks into parallel subtasks, while situational memory is used to resolve conflicts, validate consistency, and synthesize outputs into a coherent, end-to-end report." + ] + } }, "v0.1.0": { - "codeName": "初心", - "releaseDate": "2025-12-01", - "upgradePosition": "这是一款专注于管理和利用AI记忆的工具,支持RAG和知识图谱两种主流存储方式,旨在为AI应用提供持久化、结构化的“记忆”能力。", - "coreUpgrades": [ - "记忆空间:用户可以创建独立的空间来隔离不同记忆,并灵活选择存储方式。", - "记忆配置:简化了配置流程,内置自动提取关键信息的“记忆萃取”和管理生命周期的\"遗忘\"引擎。", - "知识检索:提供语义、分词和混合三种检索模式,并支持多种参数微调和结果重排序,以提升召回效果。", - "全局管理:支持统一设置默认检索参数,并可一键应用到所有知识库。", - "测试与调试:内置\"召回测试\"功能,方便用户实时验证检索效果并调整参数,支持通过分享码与他人协作。", - "记忆洞察:可查看详细的对话记录、用户画像和分析报告,帮助理解AI的\"记忆\"内容。", - "集成与管理:提供API Key用于系统集成,并包含基本的用户管理功能。", - "界面与体验:采用现代化的卡片式布局和渐变色设计,注重交互的流畅性和视觉美感。", - "起步与使用:文档中提供了清晰的基础使用流程,引导用户从创建空间、配置记忆到测试检索快速上手。", - "版本说明与限制: 记忆熊 v0.1.0 版本\"初心\"囊括智能记忆管理的核心思路和基础能力,为后续开发奠定了基础。", - "文档资源:用户手册、API文档、FAQ", - "问题反馈:GitHub Issues、邮件支持", - "致谢:感谢所有参与测试和提供反馈的用户!" - ] + "introduction": { + "codeName": "初心", + "releaseDate": "2025-12-01", + "upgradePosition": "这是一款专注于管理和利用AI记忆的工具,支持RAG和知识图谱两种主流存储方式,旨在为AI应用提供持久化、结构化的\"记忆\"能力。", + "coreUpgrades": [ + "记忆空间:用户可以创建独立的空间来隔离不同记忆,并灵活选择存储方式。", + "记忆配置:简化了配置流程,内置自动提取关键信息的\"记忆萃取\"和管理生命周期的\"遗忘\"引擎。", + "知识检索:提供语义、分词和混合三种检索模式,并支持多种参数微调和结果重排序,以提升召回效果。", + "全局管理:支持统一设置默认检索参数,并可一键应用到所有知识库。", + "测试与调试:内置\"召回测试\"功能,方便用户实时验证检索效果并调整参数,支持通过分享码与他人协作。", + "记忆洞察:可查看详细的对话记录、用户画像和分析报告,帮助理解AI的\"记忆\"内容。", + "集成与管理:提供API Key用于系统集成,并包含基本的用户管理功能。", + "界面与体验:采用现代化的卡片式布局和渐变色设计,注重交互的流畅性和视觉美感。", + "起步与使用:文档中提供了清晰的基础使用流程,引导用户从创建空间、配置记忆到测试检索快速上手。", + "版本说明与限制: 记忆熊 v0.1.0 版本\"初心\"囊括智能记忆管理的核心思路和基础能力,为后续开发奠定了基础。", + "文档资源:用户手册、API文档、FAQ", + "问题反馈:GitHub Issues、邮件支持", + "致谢:感谢所有参与测试和提供反馈的用户!" + ] + }, + "introduction_en": { + "codeName": "Original Intent", + "releaseDate": "2025-12-01", + "upgradePosition": "A tool focused on managing and utilizing AI memory, supporting both RAG and knowledge graph storage methods, aiming to provide persistent and structured 'memory' capabilities for AI applications.", + "coreUpgrades": [ + "Memory Space: Users can create independent spaces to isolate different memories and flexibly choose storage methods.", + "Memory Configuration: Simplified configuration process with built-in 'memory extraction' for automatic key information extraction and 'forgetting' engine for lifecycle management.", + "Knowledge Retrieval: Provides semantic, tokenization, and hybrid retrieval modes with various parameter tuning and result reranking to improve recall.", + "Global Management: Supports unified default retrieval parameter settings with one-click application to all knowledge bases.", + "Testing & Debugging: Built-in 'recall testing' for real-time verification of retrieval effects and parameter adjustment, with sharing code support for collaboration.", + "Memory Insights: View detailed conversation records, user profiles, and analysis reports to understand AI 'memory' content.", + "Integration & Management: Provides API Key for system integration with basic user management features.", + "Interface & Experience: Modern card-based layout with gradient design, focusing on interaction fluidity and visual aesthetics.", + "Getting Started: Documentation provides clear basic usage flow, guiding users from creating spaces, configuring memory to testing retrieval.", + "Version Notes: MemoryBear v0.1.0 'Original Intent' encompasses core concepts and basic capabilities of intelligent memory management, laying foundation for future development.", + "Documentation: User Manual, API Documentation, FAQ", + "Feedback: GitHub Issues, Email Support", + "Acknowledgments: Thanks to all users who participated in testing and provided feedback!" + ] + } } -} \ No newline at end of file +} diff --git a/api/migrations/versions/1fd7d0e703b3_20260116.py b/api/migrations/versions/1fd7d0e703b3_20260116.py new file mode 100644 index 00000000..0b1453cb --- /dev/null +++ b/api/migrations/versions/1fd7d0e703b3_20260116.py @@ -0,0 +1,62 @@ +"""20260116 + +Revision ID: 1fd7d0e703b3 +Revises: 9ab9b6393f32 +Create Date: 2026-01-16 13:17:37.060026 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '1fd7d0e703b3' +down_revision: Union[str, None] = '9ab9b6393f32' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('emotion_suggestions_cache', + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('end_user_id', sa.String(length=255), nullable=False, comment='终端用户ID(组ID)'), + sa.Column('health_summary', sa.Text(), nullable=False, comment='健康状态摘要'), + sa.Column('suggestions', sa.JSON(), nullable=False, comment='建议列表(JSON格式)'), + sa.Column('generated_at', sa.DateTime(), nullable=False, comment='生成时间'), + sa.Column('expires_at', sa.DateTime(), nullable=True, comment='过期时间'), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_emotion_suggestions_cache_end_user_id'), 'emotion_suggestions_cache', ['end_user_id'], unique=True) + op.create_index(op.f('ix_emotion_suggestions_cache_id'), 'emotion_suggestions_cache', ['id'], unique=False) + op.create_table('implicit_memory_cache', + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('end_user_id', sa.String(length=255), nullable=False, comment='终端用户ID'), + sa.Column('preferences', sa.JSON(), nullable=False, comment='偏好标签列表(JSON格式)'), + sa.Column('portrait', sa.JSON(), nullable=False, comment='四维画像对象(JSON格式)'), + sa.Column('interest_areas', sa.JSON(), nullable=False, comment='兴趣领域分布对象(JSON格式)'), + sa.Column('habits', sa.JSON(), nullable=False, comment='行为习惯列表(JSON格式)'), + sa.Column('generated_at', sa.DateTime(), nullable=False, comment='生成时间'), + sa.Column('expires_at', sa.DateTime(), nullable=True, comment='过期时间'), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_implicit_memory_cache_end_user_id'), 'implicit_memory_cache', ['end_user_id'], unique=True) + op.create_index(op.f('ix_implicit_memory_cache_id'), 'implicit_memory_cache', ['id'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_implicit_memory_cache_id'), table_name='implicit_memory_cache') + op.drop_index(op.f('ix_implicit_memory_cache_end_user_id'), table_name='implicit_memory_cache') + op.drop_table('implicit_memory_cache') + op.drop_index(op.f('ix_emotion_suggestions_cache_id'), table_name='emotion_suggestions_cache') + op.drop_index(op.f('ix_emotion_suggestions_cache_end_user_id'), table_name='emotion_suggestions_cache') + op.drop_table('emotion_suggestions_cache') + # ### end Alembic commands ### diff --git a/web/src/App.tsx b/web/src/App.tsx index 8e3140d9..1abbc2cc 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -30,15 +30,12 @@ import 'dayjs/plugin/utc' import { cookieUtils } from './utils/request'; - - - function App() { const { t } = useTranslation(); const { locale, language, timeZone } = useI18n() useEffect(() => { const authToken = cookieUtils.get('authToken') - if (!authToken && !window.location.hash.includes('#/login')) { + if (!authToken && !window.location.hash.includes('#/login') && !window.location.hash.includes('#/conversation/')) { window.location.href = `/#/login`; } }, []) diff --git a/web/src/api/common.ts b/web/src/api/common.ts index 01568640..2f6033d1 100644 --- a/web/src/api/common.ts +++ b/web/src/api/common.ts @@ -31,6 +31,12 @@ export interface versionResponse{ coreUpgrades: string[]; codeName: string; }; + introduction_en?: { + releaseDate: string; + upgradePosition: string; + coreUpgrades: string[]; + codeName: string; + }; } // 首页数据统计 export const getDashboardData = `/home-page/workspaces` diff --git a/web/src/assets/images/empty/chatEmpty.png b/web/src/assets/images/empty/chatEmpty.png new file mode 100644 index 00000000..8ce1f719 Binary files /dev/null and b/web/src/assets/images/empty/chatEmpty.png differ diff --git a/web/src/assets/images/menu/helpCenter.svg b/web/src/assets/images/menu/helpCenter.svg new file mode 100644 index 00000000..504e309c --- /dev/null +++ b/web/src/assets/images/menu/helpCenter.svg @@ -0,0 +1,14 @@ + + \ No newline at end of file diff --git a/web/src/assets/images/menu/helpCenter_active.svg b/web/src/assets/images/menu/helpCenter_active.svg new file mode 100644 index 00000000..2840c421 --- /dev/null +++ b/web/src/assets/images/menu/helpCenter_active.svg @@ -0,0 +1,14 @@ + + \ No newline at end of file diff --git a/web/src/components/Layout/AuthLayout.tsx b/web/src/components/Layout/AuthLayout.tsx index 94d28e11..a969298d 100644 --- a/web/src/components/Layout/AuthLayout.tsx +++ b/web/src/components/Layout/AuthLayout.tsx @@ -13,7 +13,7 @@ const { Content } = Layout; // 认证布局组件,使用useRouteGuard hook进行路由鉴权 const AuthLayout: FC = () => { - const { getUserInfo, getStorageType } = useUser(); + const { getUserInfo } = useUser(); // 使用路由守卫hook处理认证和权限检查 useRouteGuard('manage'); // 自动更新面包屑导航 @@ -24,7 +24,6 @@ const AuthLayout: FC = () => { window.location.href = `/#/login`; } else { getUserInfo() - getStorageType() } }, []); diff --git a/web/src/components/Layout/NoAuthLayout.tsx b/web/src/components/Layout/NoAuthLayout.tsx new file mode 100644 index 00000000..a2e6f274 --- /dev/null +++ b/web/src/components/Layout/NoAuthLayout.tsx @@ -0,0 +1,14 @@ +import { Outlet } from 'react-router-dom'; +import { type FC } from 'react'; + +// 基础布局组件,用于展示内容并保留用户信息获取功能 +const NoAuthLayout: FC = () => { + + return ( +
{children}
}
export default Code
diff --git a/web/src/components/Table/index.tsx b/web/src/components/Table/index.tsx
index 62c68dc3..08a7e627 100644
--- a/web/src/components/Table/index.tsx
+++ b/web/src/components/Table/index.tsx
@@ -16,6 +16,7 @@ interface TableComponentProps extends Omit