diff --git a/api/app/cache/__init__.py b/api/app/cache/__init__.py new file mode 100644 index 00000000..a79d4cb2 --- /dev/null +++ b/api/app/cache/__init__.py @@ -0,0 +1,11 @@ +""" +Cache 缓存模块 + +提供各种缓存功能的统一入口 +""" +from .memory import EmotionMemoryCache, ImplicitMemoryCache + +__all__ = [ + "EmotionMemoryCache", + "ImplicitMemoryCache", +] diff --git a/api/app/cache/memory/__init__.py b/api/app/cache/memory/__init__.py new file mode 100644 index 00000000..4ada3153 --- /dev/null +++ b/api/app/cache/memory/__init__.py @@ -0,0 +1,12 @@ +""" +Memory 缓存模块 + +提供记忆系统相关的缓存功能 +""" +from .emotion_memory import EmotionMemoryCache +from .implicit_memory import ImplicitMemoryCache + +__all__ = [ + "EmotionMemoryCache", + "ImplicitMemoryCache", +] diff --git a/api/app/cache/memory/emotion_memory.py b/api/app/cache/memory/emotion_memory.py new file mode 100644 index 00000000..45ea90de --- /dev/null +++ b/api/app/cache/memory/emotion_memory.py @@ -0,0 +1,134 @@ +""" +Emotion Suggestions Cache + +情绪个性化建议缓存模块 +用于缓存用户的情绪个性化建议数据 +""" +import json +import logging +from typing import Optional, Dict, Any +from datetime import datetime + +from app.aioRedis import aio_redis + +logger = logging.getLogger(__name__) + + +class EmotionMemoryCache: + """情绪建议缓存类""" + + # Key 前缀 + PREFIX = "cache:memory:emotion_memory" + + @classmethod + def _get_key(cls, *parts: str) -> str: + """生成 Redis key + + Args: + *parts: key 的各个部分 + + Returns: + 完整的 Redis key + """ + return ":".join([cls.PREFIX] + list(parts)) + + @classmethod + async def set_emotion_suggestions( + cls, + user_id: str, + suggestions_data: Dict[str, Any], + expire: int = 86400 + ) -> bool: + """设置用户情绪建议缓存 + + Args: + user_id: 用户ID(end_user_id) + suggestions_data: 建议数据字典,包含: + - health_summary: 健康状态摘要 + - suggestions: 建议列表 + - generated_at: 生成时间(可选) + expire: 过期时间(秒),默认24小时(86400秒) + + Returns: + 是否设置成功 + """ + try: + key = cls._get_key("suggestions", user_id) + + # 添加生成时间戳 + if "generated_at" not in suggestions_data: + suggestions_data["generated_at"] = datetime.now().isoformat() + + # 添加缓存标记 + suggestions_data["cached"] = True + + value = json.dumps(suggestions_data, ensure_ascii=False) + await aio_redis.set(key, value, ex=expire) + logger.info(f"设置情绪建议缓存成功: {key}, 过期时间: {expire}秒") + return True + except Exception as e: + logger.error(f"设置情绪建议缓存失败: {e}", exc_info=True) + return False + + @classmethod + async def get_emotion_suggestions(cls, user_id: str) -> Optional[Dict[str, Any]]: + """获取用户情绪建议缓存 + + Args: + user_id: 用户ID(end_user_id) + + Returns: + 建议数据字典,如果不存在或已过期返回 None + """ + try: + key = cls._get_key("suggestions", user_id) + value = await aio_redis.get(key) + + if value: + data = json.loads(value) + logger.info(f"成功获取情绪建议缓存: {key}") + return data + + logger.info(f"情绪建议缓存不存在或已过期: {key}") + return None + except Exception as e: + logger.error(f"获取情绪建议缓存失败: {e}", exc_info=True) + return None + + @classmethod + async def delete_emotion_suggestions(cls, user_id: str) -> bool: + """删除用户情绪建议缓存 + + Args: + user_id: 用户ID(end_user_id) + + Returns: + 是否删除成功 + """ + try: + key = cls._get_key("suggestions", user_id) + result = await aio_redis.delete(key) + logger.info(f"删除情绪建议缓存: {key}, 结果: {result}") + return result > 0 + except Exception as e: + logger.error(f"删除情绪建议缓存失败: {e}", exc_info=True) + return False + + @classmethod + async def get_suggestions_ttl(cls, user_id: str) -> int: + """获取情绪建议缓存的剩余过期时间 + + Args: + user_id: 用户ID(end_user_id) + + Returns: + 剩余秒数,-1表示永不过期,-2表示key不存在 + """ + try: + key = cls._get_key("suggestions", user_id) + ttl = await aio_redis.ttl(key) + logger.debug(f"情绪建议缓存TTL: {key} = {ttl}秒") + return ttl + except Exception as e: + logger.error(f"获取情绪建议缓存TTL失败: {e}") + return -2 diff --git a/api/app/cache/memory/implicit_memory.py b/api/app/cache/memory/implicit_memory.py new file mode 100644 index 00000000..21f08e9a --- /dev/null +++ b/api/app/cache/memory/implicit_memory.py @@ -0,0 +1,136 @@ +""" +Implicit Memory Profile Cache + +隐式记忆用户画像缓存模块 +用于缓存用户的完整画像数据(偏好标签、四维画像、兴趣领域、行为习惯) +""" +import json +import logging +from typing import Optional, Dict, Any +from datetime import datetime + +from app.aioRedis import aio_redis + +logger = logging.getLogger(__name__) + + +class ImplicitMemoryCache: + """隐式记忆用户画像缓存类""" + + # Key 前缀 + PREFIX = "cache:memory:implicit_memory" + + @classmethod + def _get_key(cls, *parts: str) -> str: + """生成 Redis key + + Args: + *parts: key 的各个部分 + + Returns: + 完整的 Redis key + """ + return ":".join([cls.PREFIX] + list(parts)) + + @classmethod + async def set_user_profile( + cls, + user_id: str, + profile_data: Dict[str, Any], + expire: int = 86400 + ) -> bool: + """设置用户完整画像缓存 + + Args: + user_id: 用户ID(end_user_id) + profile_data: 画像数据字典,包含: + - preferences: 偏好标签列表 + - portrait: 四维画像对象 + - interest_areas: 兴趣领域分布对象 + - habits: 行为习惯列表 + - generated_at: 生成时间(可选) + expire: 过期时间(秒),默认24小时(86400秒) + + Returns: + 是否设置成功 + """ + try: + key = cls._get_key("profile", user_id) + + # 添加生成时间戳 + if "generated_at" not in profile_data: + profile_data["generated_at"] = datetime.now().isoformat() + + # 添加缓存标记 + profile_data["cached"] = True + + value = json.dumps(profile_data, ensure_ascii=False) + await aio_redis.set(key, value, ex=expire) + logger.info(f"设置用户画像缓存成功: {key}, 过期时间: {expire}秒") + return True + except Exception as e: + logger.error(f"设置用户画像缓存失败: {e}", exc_info=True) + return False + + @classmethod + async def get_user_profile(cls, user_id: str) -> Optional[Dict[str, Any]]: + """获取用户完整画像缓存 + + Args: + user_id: 用户ID(end_user_id) + + Returns: + 画像数据字典,如果不存在或已过期返回 None + """ + try: + key = cls._get_key("profile", user_id) + value = await aio_redis.get(key) + + if value: + data = json.loads(value) + logger.info(f"成功获取用户画像缓存: {key}") + return data + + logger.info(f"用户画像缓存不存在或已过期: {key}") + return None + except Exception as e: + logger.error(f"获取用户画像缓存失败: {e}", exc_info=True) + return None + + @classmethod + async def delete_user_profile(cls, user_id: str) -> bool: + """删除用户完整画像缓存 + + Args: + user_id: 用户ID(end_user_id) + + Returns: + 是否删除成功 + """ + try: + key = cls._get_key("profile", user_id) + result = await aio_redis.delete(key) + logger.info(f"删除用户画像缓存: {key}, 结果: {result}") + return result > 0 + except Exception as e: + logger.error(f"删除用户画像缓存失败: {e}", exc_info=True) + return False + + @classmethod + async def get_profile_ttl(cls, user_id: str) -> int: + """获取用户画像缓存的剩余过期时间 + + Args: + user_id: 用户ID(end_user_id) + + Returns: + 剩余秒数,-1表示永不过期,-2表示key不存在 + """ + try: + key = cls._get_key("profile", user_id) + ttl = await aio_redis.ttl(key) + logger.debug(f"用户画像缓存TTL: {key} = {ttl}秒") + return ttl + except Exception as e: + logger.error(f"获取用户画像缓存TTL失败: {e}") + return -2 diff --git a/api/app/controllers/emotion_controller.py b/api/app/controllers/emotion_controller.py index 7f0cb91b..07e50774 100644 --- a/api/app/controllers/emotion_controller.py +++ b/api/app/controllers/emotion_controller.py @@ -18,6 +18,7 @@ from app.models.user_model import User from app.schemas.emotion_schema import ( EmotionHealthRequest, EmotionSuggestionsRequest, + EmotionGenerateSuggestionsRequest, EmotionTagsRequest, EmotionWordcloudRequest, ) @@ -58,7 +59,7 @@ async def get_emotion_tags( "limit": request.limit } ) - + # 调用服务层 data = await emotion_service.get_emotion_tags( end_user_id=request.group_id, @@ -67,7 +68,7 @@ async def get_emotion_tags( end_date=request.end_date, limit=request.limit ) - + api_logger.info( "情绪标签统计获取成功", extra={ @@ -76,9 +77,9 @@ async def get_emotion_tags( "tags_count": len(data.get("tags", [])) } ) - + return success(data=data, msg="情绪标签获取成功") - + except Exception as e: api_logger.error( f"获取情绪标签统计失败: {str(e)}", @@ -107,14 +108,14 @@ async def get_emotion_wordcloud( "limit": request.limit } ) - + # 调用服务层 data = await emotion_service.get_emotion_wordcloud( end_user_id=request.group_id, emotion_type=request.emotion_type, limit=request.limit ) - + api_logger.info( "情绪词云数据获取成功", extra={ @@ -122,9 +123,9 @@ async def get_emotion_wordcloud( "total_keywords": data.get("total_keywords", 0) } ) - + return success(data=data, msg="情绪词云获取成功") - + except Exception as e: api_logger.error( f"获取情绪词云数据失败: {str(e)}", @@ -151,7 +152,7 @@ async def get_emotion_health( status_code=status.HTTP_400_BAD_REQUEST, detail="时间范围参数无效,必须是 7d、30d 或 90d" ) - + api_logger.info( f"用户 {current_user.username} 请求获取情绪健康指数", extra={ @@ -159,13 +160,13 @@ async def get_emotion_health( "time_range": request.time_range } ) - + # 调用服务层 data = await emotion_service.calculate_emotion_health_index( end_user_id=request.group_id, time_range=request.time_range ) - + api_logger.info( "情绪健康指数获取成功", extra={ @@ -174,9 +175,9 @@ async def get_emotion_health( "level": data.get("level", "未知") } ) - + return success(data=data, msg="情绪健康指数获取成功") - + except HTTPException: raise except Exception as e: @@ -198,15 +199,80 @@ async def get_emotion_suggestions( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): - """获取个性化情绪建议 - + """获取个性化情绪建议(从缓存读取) + Args: request: 包含 group_id 和可选的 config_id db: 数据库会话 current_user: 当前用户 - + Returns: - 个性化情绪建议响应 + 缓存的个性化情绪建议响应 + """ + try: + api_logger.info( + f"用户 {current_user.username} 请求获取个性化情绪建议(缓存)", + extra={ + "group_id": request.group_id, + "config_id": request.config_id + } + ) + + # 从缓存获取建议 + data = await emotion_service.get_cached_suggestions( + end_user_id=request.group_id, + db=db + ) + + if data is None: + # 缓存不存在或已过期 + api_logger.info( + f"用户 {request.group_id} 的建议缓存不存在或已过期", + extra={"group_id": request.group_id} + ) + return fail( + BizCode.NOT_FOUND, + "建议缓存不存在或已过期,请调用 /generate_suggestions 接口生成新建议", + "" + ) + + api_logger.info( + "个性化建议获取成功(缓存)", + extra={ + "group_id": request.group_id, + "suggestions_count": len(data.get("suggestions", [])) + } + ) + + return success(data=data, msg="个性化建议获取成功(缓存)") + + except Exception as e: + api_logger.error( + f"获取个性化建议失败: {str(e)}", + extra={"group_id": request.group_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取个性化建议失败: {str(e)}" + ) + + +@router.post("/generate_suggestions", response_model=ApiResponse) +async def generate_emotion_suggestions( + request: EmotionGenerateSuggestionsRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """生成个性化情绪建议(调用LLM并缓存) + + Args: + request: 包含 group_id、可选的 config_id 和 force_refresh + db: 数据库会话 + current_user: 当前用户 + + Returns: + 新生成的个性化情绪建议响应 """ try: # 验证 config_id(如果提供) @@ -232,38 +298,46 @@ async def get_emotion_suggestions( return fail(BizCode.INVALID_PARAMETER, "配置ID无效", f"配置 {config_id} 不存在") except Exception as e: return fail(BizCode.INVALID_PARAMETER, "配置ID验证失败", str(e)) - + api_logger.info( - f"用户 {current_user.username} 请求获取个性化情绪建议", + f"用户 {current_user.username} 请求生成个性化情绪建议", extra={ "group_id": request.group_id, "config_id": config_id } ) - - # 调用服务层 + + # 调用服务层生成建议 data = await emotion_service.generate_emotion_suggestions( end_user_id=request.group_id, db=db ) - + + # 保存到缓存 + await emotion_service.save_suggestions_cache( + end_user_id=request.group_id, + suggestions_data=data, + db=db, + expires_hours=24 + ) + api_logger.info( - "个性化建议获取成功", + "个性化建议生成成功", extra={ "group_id": request.group_id, "suggestions_count": len(data.get("suggestions", [])) } ) - - return success(data=data, msg="个性化建议获取成功") - + + return success(data=data, msg="个性化建议生成成功") + except Exception as e: api_logger.error( - f"获取个性化建议失败: {str(e)}", + f"生成个性化建议失败: {str(e)}", extra={"group_id": request.group_id}, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"获取个性化建议失败: {str(e)}" + detail=f"生成个性化建议失败: {str(e)}" ) diff --git a/api/app/controllers/home_page_controller.py b/api/app/controllers/home_page_controller.py index 0b758cd1..de4a78a3 100644 --- a/api/app/controllers/home_page_controller.py +++ b/api/app/controllers/home_page_controller.py @@ -33,5 +33,12 @@ def get_workspace_list( def get_system_version(): """获取系统版本号+说明""" current_version = settings.SYSTEM_VERSION - version_introduction = HomePageService.load_version_introduction(current_version) - return success(data={"version": current_version, "introduction": version_introduction}, msg="系统版本获取成功") \ No newline at end of file + version_info = HomePageService.load_version_introduction(current_version) + return success( + data={ + "version": current_version, + "introduction": version_info.get("introduction"), + "introduction_en": version_info.get("introduction_en") + }, + msg="系统版本获取成功" + ) \ No newline at end of file diff --git a/api/app/controllers/implicit_memory_controller.py b/api/app/controllers/implicit_memory_controller.py index 6ef39929..62d1e428 100644 --- a/api/app/controllers/implicit_memory_controller.py +++ b/api/app/controllers/implicit_memory_controller.py @@ -11,6 +11,7 @@ from app.dependencies import ( ) from app.models.user_model import User from app.schemas.response_schema import ApiResponse +from app.schemas.implicit_memory_schema import GenerateProfileRequest from app.services.implicit_memory_service import ImplicitMemoryService from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session @@ -133,7 +134,7 @@ async def get_preference_tags( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user preference tags with filtering options. + Get user preference tags from cache. Args: user_id: Target user ID @@ -143,35 +144,56 @@ async def get_preference_tags( end_date: Optional end date filter Returns: - List of preference tags matching the filters + List of preference tags from cache """ - api_logger.info(f"Preference tags requested for user: {user_id}") + api_logger.info(f"Preference tags requested for user: {user_id} (from cache)") try: # Validate inputs validate_user_id(user_id) - validate_confidence_threshold(confidence_threshold) - validate_date_range(start_date, end_date) # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - # Build date range - date_range = None - if start_date and end_date: - from app.schemas.implicit_memory_schema import DateRange - date_range = DateRange(start_date=start_date, end_date=end_date) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - # Get preference tags - tags = await service.get_preference_tags( - user_id=user_id, - confidence_threshold=confidence_threshold, - tag_category=tag_category, - date_range=date_range - ) + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + "" + ) - api_logger.info(f"Retrieved {len(tags)} preference tags for user: {user_id}") - return success(data=[tag.model_dump(mode='json') for tag in tags], msg="偏好标签获取成功") + # Extract preferences from cache + preferences = cached_profile.get("preferences", []) + + # Apply filters (client-side filtering on cached data) + filtered_preferences = [] + for pref in preferences: + # Filter by confidence threshold + if confidence_threshold is not None and pref.get("confidence_score", 0) < confidence_threshold: + continue + + # Filter by category if specified + if tag_category and pref.get("category") != tag_category: + continue + + # Filter by date range if specified + if start_date or end_date: + created_at_ts = pref.get("created_at") + if created_at_ts: + created_at = datetime.fromtimestamp(created_at_ts / 1000) + if start_date and created_at < start_date: + continue + if end_date and created_at > end_date: + continue + + filtered_preferences.append(pref) + + api_logger.info(f"Retrieved {len(filtered_preferences)} preference tags for user: {user_id} (from cache)") + return success(data=filtered_preferences, msg="偏好标签获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "偏好标签获取", user_id) @@ -186,16 +208,16 @@ async def get_dimension_portrait( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user's four-dimension personality portrait. + Get user's four-dimension personality portrait from cache. Args: user_id: Target user ID - include_history: Whether to include historical trend data + include_history: Whether to include historical trend data (ignored for cached data) Returns: - Four-dimension personality portrait with scores and evidence + Four-dimension personality portrait from cache """ - api_logger.info(f"Dimension portrait requested for user: {user_id}") + api_logger.info(f"Dimension portrait requested for user: {user_id} (from cache)") try: # Validate inputs @@ -204,13 +226,22 @@ async def get_dimension_portrait( # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - portrait = await service.get_dimension_portrait( - user_id=user_id, - include_history=include_history - ) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - api_logger.info(f"Dimension portrait retrieved for user: {user_id}") - return success(data=portrait.model_dump(mode='json'), msg="四维画像获取成功") + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + "" + ) + + # Extract portrait from cache + portrait = cached_profile.get("portrait", {}) + + api_logger.info(f"Dimension portrait retrieved for user: {user_id} (from cache)") + return success(data=portrait, msg="四维画像获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "四维画像获取", user_id) @@ -225,16 +256,16 @@ async def get_interest_area_distribution( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user's interest area distribution across four areas. + Get user's interest area distribution from cache. Args: user_id: Target user ID - include_trends: Whether to include trend analysis data + include_trends: Whether to include trend analysis data (ignored for cached data) Returns: - Interest area distribution with percentages and evidence + Interest area distribution from cache """ - api_logger.info(f"Interest area distribution requested for user: {user_id}") + api_logger.info(f"Interest area distribution requested for user: {user_id} (from cache)") try: # Validate inputs @@ -243,13 +274,22 @@ async def get_interest_area_distribution( # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - distribution = await service.get_interest_area_distribution( - user_id=user_id, - include_trends=include_trends - ) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - api_logger.info(f"Interest area distribution retrieved for user: {user_id}") - return success(data=distribution.model_dump(mode='json'), msg="兴趣领域分布获取成功") + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + "" + ) + + # Extract interest areas from cache + interest_areas = cached_profile.get("interest_areas", {}) + + api_logger.info(f"Interest area distribution retrieved for user: {user_id} (from cache)") + return success(data=interest_areas, msg="兴趣领域分布获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "兴趣领域分布获取", user_id) @@ -266,7 +306,7 @@ async def get_behavior_habits( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user's behavioral habits with filtering options. + Get user's behavioral habits from cache. Args: user_id: Target user ID @@ -275,38 +315,117 @@ async def get_behavior_habits( time_period: Filter by time period (current, past) Returns: - List of behavioral habits matching the filters + List of behavioral habits from cache """ - api_logger.info(f"Behavior habits requested for user: {user_id}") + api_logger.info(f"Behavior habits requested for user: {user_id} (from cache)") try: # Validate inputs validate_user_id(user_id) - # Convert string confidence level to numerical - numerical_confidence = None - if confidence_level: - confidence_mapping = { - "high": 85, - "medium": 50, - "low": 20 - } - numerical_confidence = confidence_mapping.get(confidence_level.lower()) - # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - habits = await service.get_behavior_habits( - user_id=user_id, - confidence_level=numerical_confidence, - frequency_pattern=frequency_pattern, - time_period=time_period - ) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - api_logger.info(f"Retrieved {len(habits)} behavior habits for user: {user_id}") - return success(data=[habit.model_dump(mode='json') for habit in habits], msg="行为习惯获取成功") + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + "" + ) + + # Extract habits from cache + habits = cached_profile.get("habits", []) + + # Apply filters (client-side filtering on cached data) + filtered_habits = [] + for habit in habits: + # Filter by confidence level + if confidence_level: + confidence_mapping = { + "high": 85, + "medium": 50, + "low": 20 + } + numerical_confidence = confidence_mapping.get(confidence_level.lower()) + if habit.get("confidence_level", 0) < numerical_confidence: + continue + + # Filter by frequency pattern + if frequency_pattern and habit.get("frequency_pattern") != frequency_pattern: + continue + + # Filter by time period + if time_period: + is_current = habit.get("is_current", True) + if time_period.lower() == "current" and not is_current: + continue + elif time_period.lower() == "past" and is_current: + continue + + filtered_habits.append(habit) + + api_logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user: {user_id} (from cache)") + return success(data=filtered_habits, msg="行为习惯获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "行为习惯获取", user_id) + + + +@router.post("/generate_profile", response_model=ApiResponse) +@cur_workspace_access_guard() +async def generate_implicit_memory_profile( + request: GenerateProfileRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +) -> ApiResponse: + """ + Generate complete user profile (all 4 modules) and cache it. + + Args: + request: Generate profile request with end_user_id + db: Database session + current_user: Current authenticated user + + Returns: + Complete user profile with all modules + """ + end_user_id = request.end_user_id + api_logger.info(f"Generate profile requested for user: {end_user_id}") + + try: + # Validate inputs + validate_user_id(end_user_id) + + # Create service with user-specific config + service = ImplicitMemoryService(db=db, end_user_id=end_user_id) + + # Generate complete profile (calls LLM for all 4 modules) + api_logger.info(f"开始生成完整用户画像: user={end_user_id}") + profile_data = await service.generate_complete_profile(user_id=end_user_id) + + # Save to cache + await service.save_profile_cache( + end_user_id=end_user_id, + profile_data=profile_data, + db=db, + expires_hours=168 # 7 days + ) + + api_logger.info(f"用户画像生成并缓存成功: user={end_user_id}") + + # Add metadata + profile_data["end_user_id"] = end_user_id + profile_data["cached"] = False + + return success(data=profile_data, msg="用户画像生成成功") + + except Exception as e: + api_logger.error(f"生成用户画像失败: user={end_user_id}, error={str(e)}", exc_info=True) + return handle_implicit_memory_error(e, "用户画像生成", end_user_id) diff --git a/api/app/controllers/public_share_controller.py b/api/app/controllers/public_share_controller.py index 04da05df..17ad70a7 100644 --- a/api/app/controllers/public_share_controller.py +++ b/api/app/controllers/public_share_controller.py @@ -8,9 +8,10 @@ from sqlalchemy.orm import Session from app.core.logging_config import get_business_logger from app.core.response_utils import success -from app.db import get_db +from app.db import get_db, get_db_read from app.dependencies import get_share_user_id, ShareTokenData from app.repositories import knowledge_repository +from app.repositories.workflow_repository import WorkflowConfigRepository from app.schemas import release_share_schema, conversation_schema from app.schemas.response_schema import PageData, PageMeta from app.services import workspace_service @@ -19,7 +20,8 @@ from app.services.conversation_service import ConversationService from app.services.release_share_service import ReleaseShareService from app.services.shared_chat_service import SharedChatService from app.services.app_chat_service import AppChatService, get_app_chat_service -from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, agent_config_4_app_release, multi_agent_config_4_app_release +from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, \ + agent_config_4_app_release, multi_agent_config_4_app_release router = APIRouter(prefix="/public/share", tags=["Public Share"]) logger = get_business_logger() @@ -65,10 +67,10 @@ def get_or_generate_user_id(payload_user_id: str, request: Request) -> str: summary="获取访问 token" ) def get_access_token( - share_token: str, - payload: release_share_schema.TokenRequest, - request: Request, - db: Session = Depends(get_db), + share_token: str, + payload: release_share_schema.TokenRequest, + request: Request, + db: Session = Depends(get_db), ): """获取访问 token @@ -113,9 +115,9 @@ def get_access_token( response_model=None ) def get_shared_release( - password: str = Query(None, description="访问密码(如果需要)"), - share_data: ShareTokenData = Depends(get_share_user_id), - db: Session = Depends(get_db), + password: str = Query(None, description="访问密码(如果需要)"), + share_data: ShareTokenData = Depends(get_share_user_id), + db: Session = Depends(get_db), ): """获取公开分享的发布版本信息 @@ -137,9 +139,9 @@ def get_shared_release( summary="验证访问密码" ) def verify_password( - payload: release_share_schema.PasswordVerifyRequest, - share_data: ShareTokenData = Depends(get_share_user_id), - db: Session = Depends(get_db), + payload: release_share_schema.PasswordVerifyRequest, + share_data: ShareTokenData = Depends(get_share_user_id), + db: Session = Depends(get_db), ): """验证分享的访问密码 @@ -159,11 +161,11 @@ def verify_password( summary="获取嵌入代码" ) def get_embed_code( - width: str = Query("100%", description="iframe 宽度"), - height: str = Query("600px", description="iframe 高度"), - request: Request = None, - share_data: ShareTokenData = Depends(get_share_user_id), - db: Session = Depends(get_db), + width: str = Query("100%", description="iframe 宽度"), + height: str = Query("600px", description="iframe 高度"), + request: Request = None, + share_data: ShareTokenData = Depends(get_share_user_id), + db: Session = Depends(get_db), ): """获取嵌入代码 @@ -183,7 +185,6 @@ def get_embed_code( return success(data=embed_code) - # ---------- 会话管理接口 ---------- @router.get( @@ -191,11 +192,11 @@ def get_embed_code( summary="获取会话列表" ) def list_conversations( - password: str = Query(None, description="访问密码"), - page: int = Query(1, ge=1), - pagesize: int = Query(20, ge=1, le=100), - share_data: ShareTokenData = Depends(get_share_user_id), - db: Session = Depends(get_db), + password: str = Query(None, description="访问密码"), + page: int = Query(1, ge=1), + pagesize: int = Query(20, ge=1, le=100), + share_data: ShareTokenData = Depends(get_share_user_id), + db: Session = Depends(get_db), ): """获取分享应用的会话列表 @@ -209,9 +210,9 @@ def list_conversations( from app.repositories.end_user_repository import EndUserRepository end_user_repo = EndUserRepository(db) new_end_user = end_user_repo.get_or_create_end_user( - app_id=share.app_id, - other_id=other_id - ) + app_id=share.app_id, + other_id=other_id + ) logger.debug(new_end_user.id) service = SharedChatService(db) conversations, total = service.list_conversations( @@ -233,10 +234,10 @@ def list_conversations( summary="获取会话详情(含消息)" ) def get_conversation( - conversation_id: uuid.UUID, - password: str = Query(None, description="访问密码"), - share_data: ShareTokenData = Depends(get_share_user_id), - db: Session = Depends(get_db), + conversation_id: uuid.UUID, + password: str = Query(None, description="访问密码"), + share_data: ShareTokenData = Depends(get_share_user_id), + db: Session = Depends(get_db), ): """获取会话详情和消息历史""" chat_service = SharedChatService(db) @@ -266,10 +267,10 @@ def get_conversation( summary="发送消息(支持流式和非流式)" ) async def chat( - payload: conversation_schema.ChatRequest, - share_data: ShareTokenData = Depends(get_share_user_id), - db: Session = Depends(get_db), - app_chat_service: Annotated[AppChatService, Depends(get_app_chat_service)] = None, + payload: conversation_schema.ChatRequest, + share_data: ShareTokenData = Depends(get_share_user_id), + db: Session = Depends(get_db), + app_chat_service: Annotated[AppChatService, Depends(get_app_chat_service)] = None, ): """发送消息并获取回复 @@ -313,7 +314,7 @@ async def chat( ) end_user_id = str(new_end_user.id) - appid=share.app_id + appid = share.app_id """获取存储类型和工作空间的ID""" # 直接通过 SQLAlchemy 查询 app @@ -425,16 +426,16 @@ async def chat( # ) async def event_generator(): async for event in app_chat_service.agnet_chat_stream( - message=payload.message, - conversation_id=conversation.id, # 使用已创建的会话 ID - user_id= str(new_end_user.id), # 转换为字符串 - variables=payload.variables, - web_search=payload.web_search, - config=agent_config, - memory=payload.memory, - storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id, - workspace_id=workspace_id + message=payload.message, + conversation_id=conversation.id, # 使用已创建的会话 ID + user_id=str(new_end_user.id), # 转换为字符串 + variables=payload.variables, + web_search=payload.web_search, + config=agent_config, + memory=payload.memory, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id, + workspace_id=workspace_id ): yield event @@ -481,15 +482,15 @@ async def chat( async def event_generator(): async for event in app_chat_service.multi_agent_chat_stream( - message=payload.message, - conversation_id=conversation.id, # 使用已创建的会话 ID - user_id=str(new_end_user.id), # 转换为字符串 - variables=payload.variables, - config=config, - web_search=payload.web_search, - memory=payload.memory, - storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id + message=payload.message, + conversation_id=conversation.id, # 使用已创建的会话 ID + user_id=str(new_end_user.id), # 转换为字符串 + variables=payload.variables, + config=config, + web_search=payload.web_search, + memory=payload.memory, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id ): yield event @@ -561,24 +562,27 @@ async def chat( # return success(data=conversation_schema.ChatResponse(**result)) elif app_type == AppType.WORKFLOW: - config = workflow_config_4_app_release(release) + if not config.id: + with get_db_read() as db: + source_config = WorkflowConfigRepository(db).get_by_app_id(release.app_id) + config.id = source_config.id + config.id = uuid.UUID(config.id) if payload.stream: async def event_generator(): - async for event in app_chat_service.workflow_chat_stream( - - message=payload.message, - conversation_id=conversation.id, # 使用已创建的会话 ID - user_id=end_user_id, # 转换为字符串 - variables=payload.variables, - config=config, - web_search=payload.web_search, - memory=payload.memory, - storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id, - app_id=release.app_id, - workspace_id=workspace_id + message=payload.message, + conversation_id=conversation.id, # 使用已创建的会话 ID + user_id=end_user_id, # 转换为字符串 + variables=payload.variables, + config=config, + web_search=payload.web_search, + memory=payload.memory, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id, + app_id=release.app_id, + workspace_id=workspace_id, + release_id=release.id ): event_type = event.get("event", "message") event_data = event.get("data", {}) @@ -610,7 +614,8 @@ async def chat( storage_type=storage_type, user_rag_memory_id=user_rag_memory_id, app_id=release.app_id, - workspace_id=workspace_id + workspace_id=workspace_id, + release_id=release.id ) logger.debug( "工作流试运行返回结果", diff --git a/api/app/controllers/service/app_api_controller.py b/api/app/controllers/service/app_api_controller.py index 583b4700..677e1623 100644 --- a/api/app/controllers/service/app_api_controller.py +++ b/api/app/controllers/service/app_api_controller.py @@ -242,8 +242,9 @@ async def chat( memory=payload.memory, storage_type=storage_type, user_rag_memory_id=user_rag_memory_id, - app_id=app.app_id, - workspace_id=workspace_id + app_id=app.id, + workspace_id=workspace_id, + release_id=app.current_release.id, ): event_type = event.get("event", "message") event_data = event.get("data", {}) @@ -274,8 +275,9 @@ async def chat( memory=payload.memory, storage_type=storage_type, user_rag_memory_id=user_rag_memory_id, - app_id=app.app_id, - workspace_id=workspace_id + app_id=app.id, + workspace_id=workspace_id, + release_id=app.current_release.id ) logger.debug( "工作流试运行返回结果", diff --git a/api/app/core/config.py b/api/app/core/config.py index 5f4f91c4..01983457 100644 --- a/api/app/core/config.py +++ b/api/app/core/config.py @@ -38,6 +38,7 @@ class Settings: REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379")) REDIS_DB: int = int(os.getenv("REDIS_DB", "1")) REDIS_PASSWORD: str = os.getenv("REDIS_PASSWORD", "") + # ElasticSearch configuration ELASTICSEARCH_HOST: str = os.getenv("ELASTICSEARCH_HOST", "https://127.0.0.1") diff --git a/api/app/core/rag/llm/cv_model.py b/api/app/core/rag/llm/cv_model.py index 24d4a35b..4207304a 100644 --- a/api/app/core/rag/llm/cv_model.py +++ b/api/app/core/rag/llm/cv_model.py @@ -243,6 +243,33 @@ class QWenCV(GptV4): tmp_path = tmp.name video_path = f"file://{tmp_path}" + prompt_ch = """ + 你是一名专业的视频转录助手,能够将视频文件的内容转写为文本,并**精确标记每句话或每个段落对应的时间戳**(开始时间-结束时间)。\n + **任务要求**: + 1.输入是MP4等视频文件,解析带时间戳的文本。 + 2.时间戳格式为 `[HH:MM:SS.mmm]`(毫秒可选),例如 `[00:01:23.456]`。 + 3.时间戳需尽可能贴近实际视频的起止时间,误差不超过1秒。 + 4.如果无法确定具体时间,请根据上下文合理估算。 + 5.最后总结:这段视频的内容是什么?,并用恰当的句子总结这个视频。 + + **示例输出**: + [00:00:00.000] 今天天气真好, + [00:00:02.500] 我们一起去公园散步吧。 + [00:00:05.800] 公园里的花开得非常漂亮。 + 这段视频的内容是关于如何在CREAMS系统中进行楼宇管理集合的编辑或删除操作。视频演示了 ...""" + prompt_en = """ + You are a professional video transcription assistant, capable of transcribing the content of video files into text and **precisely marking the timestamp (start time-end time) corresponding to each sentence or paragraph**. + **Task requirements**: + 1. Input is MP4 or other video files, and parse the text with timestamps. + 2. The timestamp format is `[HH:MM:SS.mmm]` (milliseconds are optional), for example, `[00:01:23.456]`. + 3. The timestamp should be as close as possible to the actual start and end time of the video, with an error not exceeding 1 second. + 4. If the specific time cannot be determined, please make a reasonable estimation based on the context. + 5. Final summary: What is the content of this video? Summarize this video in an appropriate sentence. + + **Example output**: + [00:00:00.000] The weather is really nice today, [00:00:02.500] let's go for a walk in the park together. + [00:00:05.800] The flowers in the park are blooming beautifully. + The content of this video is about how to edit or delete building management collections in the CREAMS system. The video demonstrates ..""" messages = [ { "role": "user", @@ -252,7 +279,7 @@ class QWenCV(GptV4): "fps": 2, }, { - "text": "视频的内容是什么?,并且,请用恰当的句子总结这个视频。" if self.lang.lower() == "chinese" else "What is the content of the video? And please summarize this video in proper sentences.", + "text": prompt_ch if self.lang.lower() == "chinese" else prompt_en, }, ], } diff --git a/api/app/core/rag/llm/sequence2txt_model.py b/api/app/core/rag/llm/sequence2txt_model.py index be4d3649..468dda55 100644 --- a/api/app/core/rag/llm/sequence2txt_model.py +++ b/api/app/core/rag/llm/sequence2txt_model.py @@ -60,6 +60,34 @@ class QWenSeq2txt(Base): from dashscope import MultiModalConversation audio_path = f"file://{audio_path}" + prompt_ch = """ + 你是一名专业的音频转录助手,能够将MP3音频文件的内容转写为文本,并**精确标记每句话或每个段落对应的时间戳**(开始时间-结束时间)。\n + **任务要求**: + 1.输入是MP3,解析带时间戳的文本。 + 2.时间戳格式为 `[HH:MM:SS.mmm]`(毫秒可选),例如 `[00:01:23.456]`。 + 3.时间戳需尽可能贴近实际语音的起止时间,误差不超过1秒。 + 4.如果无法确定具体时间,请根据上下文合理估算。 + 5.最后总结:这段音频在说什么? + + **示例输出**: + [00:00:00.000] 今天天气真好, + [00:00:02.500] 我们一起去公园散步吧。 + [00:00:05.800] 公园里的花开得非常漂亮。 + 这段音频讲述的是一个关于**“吃水不忘挖井人”**的感人故事,主 ...""" + prompt_en = """ + You are a professional audio transcription assistant, capable of transcribing the content of MP3 audio files into text and **precisely marking the timestamps (start time - end time) corresponding to each sentence or paragraph**. + **Task requirements**: + 1. Input is MP3, parse text with timestamps. + 2. The timestamp format is `[HH:MM:SS.mmm]` (milliseconds are optional), for example, `[00:01:23.456]`. + 3. The timestamp should be as close as possible to the actual start and end time of the voice, with an error not exceeding 1 second. + 4. If a specific time cannot be determined, please make a reasonable estimation based on the context. + 5. Final summary: What is this audio talking about? + + **Example Output**: + [00:00:00.000] The weather is really nice today, + [00:00:02.500] let's go for a walk in the park together. + [00:00:05.800] The flowers in the park are blooming beautifully. + This audio tells a touching story about **"Remembering the one who dug the well when drinking water"** ..""" messages = [ { "role": "user", @@ -68,7 +96,7 @@ class QWenSeq2txt(Base): "audio": audio_path }, { - "text": "这段音频在说什么?" if self.lang.lower() == "chinese" else "What is this audio saying?", + "text": prompt_ch if self.lang.lower() == "chinese" else prompt_en, }, ], } diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index c048f447..ad03fec1 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -8,6 +8,7 @@ import logging import uuid from typing import Any +from langchain_core.runnables import RunnableConfig from langgraph.graph.state import CompiledStateGraph from app.core.workflow.graph_builder import GraphBuilder @@ -53,11 +54,11 @@ class WorkflowExecutor: self.edges = workflow_config.get("edges", []) self.execution_config = workflow_config.get("execution_config", {}) - self.checkpoint_config = { - "configurable": { + self.checkpoint_config = RunnableConfig( + configurable={ "thread_id": uuid.uuid4(), } - } + ) def _prepare_initial_state(self, input_data: dict[str, Any]) -> WorkflowState: """准备初始状态(注入系统变量和会话变量) @@ -214,13 +215,13 @@ class WorkflowExecutor: return { "status": "completed", "output": final_output, + "variables": result.get("variables", {}), "node_outputs": node_outputs, "messages": result.get("messages", []), "conversation_id": conversation_id, "elapsed_time": elapsed_time, "token_usage": token_usage, "error": result.get("error"), - "variables": result.get("variables", {}), } def build_graph(self, stream=False) -> CompiledStateGraph: @@ -326,11 +327,10 @@ class WorkflowExecutor: } # 1. 构建图 - graph = self.build_graph(True) + graph = self.build_graph(stream=True) # 2. 初始化状态(自动注入系统变量) initial_state = self._prepare_initial_state(input_data) - # 3. Execute workflow try: chunk_count = 0 @@ -346,14 +346,16 @@ class WorkflowExecutor: mode, data = event else: # Unexpected format, log and skip - logger.warning(f"[STREAM] Unexpected event format: {type(event)}, value: {event}") + logger.warning(f"[STREAM] Unexpected event format: {type(event)}, value: {event}" + f"- execution_id: {self.execution_id}") continue if mode == "custom": # Handle custom streaming events (chunks from nodes via stream writer) chunk_count += 1 event_type = data.get("type", "node_chunk") # "message" or "node_chunk" - logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}") + logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}" + f"- execution_id: {self.execution_id}") yield { "event": event_type, # "message" or "node_chunk" "data": { @@ -380,7 +382,8 @@ class WorkflowExecutor: variables_sys = variables.get("sys", {}) conversation_id = input_data.get("conversation_id") execution_id = variables_sys.get("execution_id") - logger.info(f"[DEBUG] Node starts execution: {node_name}") + logger.info(f"[NODE-START] Node starts execution: {node_name} " + f"- execution_id: {self.execution_id}") yield { "event": "node_start", @@ -399,7 +402,8 @@ class WorkflowExecutor: variables_sys = variables.get("sys", {}) conversation_id = input_data.get("conversation_id") execution_id = variables_sys.get("execution_id") - logger.info(f"[DEBUG] Node execution completed: {node_name}") + logger.info(f"[NODE-END] Node execution completed: {node_name} " + f"- execution_id: {self.execution_id}") yield { "event": "node_end", @@ -407,13 +411,15 @@ class WorkflowExecutor: "node_id": node_name, "conversation_id": conversation_id, "execution_id": execution_id, - "timestamp": data.get("timestamp") + "timestamp": data.get("timestamp"), + "state": result.get("node_outputs", {}).get(node_name), } } elif mode == "updates": # Handle state updates - store final state - logger.debug(f"[UPDATES] 收到 state 更新 from {list(data.keys())}") + logger.debug(f"[UPDATES] 收到 state 更新 from {list(data.keys())} " + f"- execution_id: {self.execution_id}") # 计算耗时 end_time = datetime.datetime.now() @@ -421,7 +427,7 @@ class WorkflowExecutor: result = graph.get_state(self.checkpoint_config).values logger.info( f"Workflow execution completed (streaming), " - f"total chunks: {chunk_count}, elapsed: {elapsed_time:.2f}s" + f"total chunks: {chunk_count}, elapsed: {elapsed_time:.2f}s, execution_id: {self.execution_id}" ) # 发送 workflow_end 事件 @@ -449,7 +455,8 @@ class WorkflowExecutor: } } - def _extract_final_output(self, node_outputs: dict[str, Any]) -> str | None: + @staticmethod + def _extract_final_output(node_outputs: dict[str, Any]) -> str | None: """从节点输出中提取最终输出 优先级: @@ -473,7 +480,8 @@ class WorkflowExecutor: return None - def _aggregate_token_usage(self, node_outputs: dict[str, Any]) -> dict[str, int] | None: + @staticmethod + def _aggregate_token_usage(node_outputs: dict[str, Any]) -> dict[str, int] | None: """聚合所有节点的 token 使用情况 Args: diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 72fd0bb5..b31213d8 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -25,7 +25,7 @@ class WorkflowState(TypedDict): The state object passed between nodes in a workflow, containing messages, variables, node outputs, etc. """ # List of messages (append mode) - messages: list[dict[str, str]] + messages: Annotated[list[dict[str, str]], lambda x, y: y] # Set of loop node IDs, used for assigning values in loop nodes cycle_nodes: list diff --git a/api/app/core/workflow/nodes/cycle_graph/iteration.py b/api/app/core/workflow/nodes/cycle_graph/iteration.py index 4ae8e118..66c3a700 100644 --- a/api/app/core/workflow/nodes/cycle_graph/iteration.py +++ b/api/app/core/workflow/nodes/cycle_graph/iteration.py @@ -21,6 +21,7 @@ class IterationRuntime: optional parallel execution, flattening of output, and loop control via the workflow state. """ + def __init__( self, graph: CompiledStateGraph, @@ -87,6 +88,7 @@ class IterationRuntime: self.result.append(output) if not result["looping"]: self.looping = False + return result def _create_iteration_tasks(self, array_obj, idx): """ @@ -124,7 +126,7 @@ class IterationRuntime: array_obj = VariablePool(self.state).get(input_expression) if not isinstance(array_obj, list): raise RuntimeError("Cannot iterate over a non-list variable") - + child_state = [] idx = 0 if self.typed_config.parallel: # Execute iterations in parallel batches @@ -132,15 +134,14 @@ class IterationRuntime: tasks = self._create_iteration_tasks(array_obj, idx) logger.info(f"Iteration node {self.node_id}: running, concurrency {len(tasks)}") idx += self.typed_config.parallel_count - await asyncio.gather(*tasks) - logger.info(f"Iteration node {self.node_id}: execution completed") - return self.result + child_state.extend(await asyncio.gather(*tasks)) else: # Execute iterations sequentially while idx < len(array_obj) and self.looping: logger.info(f"Iteration node {self.node_id}: running") item = array_obj[idx] result = await self.graph.ainvoke(self._init_iteration_state(item, idx)) + child_state.append(result) output = VariablePool(result).get(self.output_value) if isinstance(output, list) and self.typed_config.flatten: self.result.extend(output) @@ -150,5 +151,8 @@ class IterationRuntime: self.looping = False idx += 1 - logger.info(f"Iteration node {self.node_id}: execution completed") - return self.result + logger.info(f"Iteration node {self.node_id}: execution completed") + return { + "output": self.result, + "__child_state": child_state + } diff --git a/api/app/core/workflow/nodes/cycle_graph/loop.py b/api/app/core/workflow/nodes/cycle_graph/loop.py index 2e2ab4fb..38d4b21c 100644 --- a/api/app/core/workflow/nodes/cycle_graph/loop.py +++ b/api/app/core/workflow/nodes/cycle_graph/loop.py @@ -67,7 +67,9 @@ class LoopRuntime: variables=pool.get_all_conversation_vars(), node_outputs=pool.get_all_node_outputs(), system_vars=pool.get_all_system_vars(), - ) if variable.input_type == ValueInputType.VARIABLE else TypeTransformer.transform(variable.value, variable.type) + ) + if variable.input_type == ValueInputType.VARIABLE + else TypeTransformer.transform(variable.value, variable.type) for variable in self.typed_config.cycle_vars } self.state["node_outputs"][self.node_id] = { @@ -76,7 +78,9 @@ class LoopRuntime: variables=pool.get_all_conversation_vars(), node_outputs=pool.get_all_node_outputs(), system_vars=pool.get_all_system_vars(), - ) if variable.input_type == ValueInputType.VARIABLE else TypeTransformer.transform(variable.value, variable.type) + ) + if variable.input_type == ValueInputType.VARIABLE + else TypeTransformer.transform(variable.value, variable.type) for variable in self.typed_config.cycle_vars } loopstate = WorkflowState( @@ -171,10 +175,11 @@ class LoopRuntime: """ loopstate = self._init_loop_state() loop_time = self.typed_config.max_loop + child_state = [] while self.evaluate_conditional(loopstate) and loopstate["looping"] and loop_time > 0: logger.info(f"loop node {self.node_id}: running") - await self.graph.ainvoke(loopstate) + child_state.append(await self.graph.ainvoke(loopstate)) loop_time -= 1 logger.info(f"loop node {self.node_id}: execution completed") - return loopstate["runtime_vars"][self.node_id] + return loopstate["runtime_vars"][self.node_id] | {"__child_state": child_state} diff --git a/api/app/core/workflow/nodes/knowledge/node.py b/api/app/core/workflow/nodes/knowledge/node.py index 221ca079..997135f3 100644 --- a/api/app/core/workflow/nodes/knowledge/node.py +++ b/api/app/core/workflow/nodes/knowledge/node.py @@ -10,9 +10,8 @@ from app.core.workflow.nodes.base_node import BaseNode, WorkflowState from app.core.workflow.nodes.knowledge import KnowledgeRetrievalNodeConfig from app.db import get_db_read from app.models import knowledge_model, knowledgeshare_model, ModelType -from app.repositories import knowledge_repository +from app.repositories import knowledge_repository, knowledgeshare_repository from app.schemas.chunk_schema import RetrieveType -from app.services import knowledge_service, knowledgeshare_service from app.services.model_service import ModelConfigService logger = logging.getLogger(__name__) @@ -96,7 +95,7 @@ class KnowledgeRetrievalNode(BaseNode): filters = self._build_kb_filter(kb_ids, knowledge_model.PermissionType.Share) - share_ids = knowledge_service.knowledge_repository.get_chunked_knowledgeids( + share_ids = knowledge_repository.get_chunked_knowledgeids( db=db, filters=filters ) @@ -105,7 +104,7 @@ class KnowledgeRetrievalNode(BaseNode): filters = [ knowledgeshare_model.KnowledgeShare.target_kb_id.in_(kb_ids) ] - items = knowledgeshare_service.knowledgeshare_repository.get_source_kb_ids_by_target_kb_id( + items = knowledgeshare_repository.get_source_kb_ids_by_target_kb_id( db=db, filters=filters ) diff --git a/api/app/core/workflow/nodes/llm/config.py b/api/app/core/workflow/nodes/llm/config.py index f65d5879..265724f3 100644 --- a/api/app/core/workflow/nodes/llm/config.py +++ b/api/app/core/workflow/nodes/llm/config.py @@ -66,7 +66,7 @@ class LLMNodeConfig(BaseNodeConfig): ) memory: MemoryWindowSetting = Field( - ..., + default_factory=MemoryWindowSetting, description="对话上下文窗口" ) diff --git a/api/app/core/workflow/nodes/llm/node.py b/api/app/core/workflow/nodes/llm/node.py index e25bd35d..a74e0b60 100644 --- a/api/app/core/workflow/nodes/llm/node.py +++ b/api/app/core/workflow/nodes/llm/node.py @@ -85,6 +85,7 @@ class LLMNode(BaseNode): """ # 1. 处理消息格式(优先使用 messages) + self.typed_config = LLMNodeConfig(**self.config) messages_config = self.typed_config.messages if messages_config: @@ -167,7 +168,7 @@ class LLMNode(BaseNode): Returns: LLM 响应消息 """ - self.typed_config = LLMNodeConfig(**self.config) + # self.typed_config = LLMNodeConfig(**self.config) llm, prompt_or_messages = self._prepare_llm(state, True) logger.info(f"节点 {self.node_id} 开始执行 LLM 调用(非流式)") @@ -269,12 +270,16 @@ class LLMNode(BaseNode): chunk_count = 0 # 调用 LLM(流式,支持字符串或消息列表) - async for chunk in llm.astream(prompt_or_messages): + last_meta_data = {} + async for chunk in llm.astream(prompt_or_messages, stream_usage=True): # 提取内容 if hasattr(chunk, 'content'): content = chunk.content else: content = str(chunk) + if hasattr(chunk, 'response_metadata'): + if chunk.response_metadata: + last_meta_data = chunk.response_metadata # 只有当内容不为空时才处理 if content: @@ -288,13 +293,10 @@ class LLMNode(BaseNode): logger.info(f"节点 {self.node_id} LLM 调用完成,输出长度: {len(full_response)}, 总 chunks: {chunk_count}") # 构建完整的 AIMessage(包含元数据) - if isinstance(last_chunk, AIMessage): - final_message = AIMessage( - content=full_response, - response_metadata=last_chunk.response_metadata if hasattr(last_chunk, 'response_metadata') else {} - ) - else: - final_message = AIMessage(content=full_response) + final_message = AIMessage( + content=full_response, + response_metadata=last_meta_data + ) # yield 完成标记 yield {"__final__": True, "result": final_message} diff --git a/api/app/models/__init__.py b/api/app/models/__init__.py index 189876a5..81cc6ead 100644 --- a/api/app/models/__init__.py +++ b/api/app/models/__init__.py @@ -76,5 +76,5 @@ __all__ = [ "ToolStatus", "AuthType", "ExecutionStatus", - "MemoryPerceptualModel" + "MemoryPerceptualModel", ] diff --git a/api/app/models/workflow_model.py b/api/app/models/workflow_model.py index d599f717..4f9ffe68 100644 --- a/api/app/models/workflow_model.py +++ b/api/app/models/workflow_model.py @@ -75,6 +75,14 @@ class WorkflowExecution(Base): nullable=False, index=True ) + + release_id = Column( + UUID(as_uuid=True), + ForeignKey("app_releases.id", ondelete="CASCADE"), + nullable=True, + index=True + ) + app_id = Column( UUID(as_uuid=True), ForeignKey("apps.id", ondelete="CASCADE"), diff --git a/api/app/repositories/data_config_repository.py b/api/app/repositories/data_config_repository.py index ea9fadea..d26058b2 100644 --- a/api/app/repositories/data_config_repository.py +++ b/api/app/repositories/data_config_repository.py @@ -81,7 +81,7 @@ class DataConfigRepository: n.description AS description, n.entity_type AS entity_type, n.name AS name, - n.fact_summary AS fact_summary, + COALESCE(n.fact_summary, '') AS fact_summary, n.group_id AS group_id, n.apply_id AS apply_id, n.user_id AS user_id, @@ -115,7 +115,7 @@ class DataConfigRepository: description: n.description, entity_type: n.entity_type, name: n.name, - fact_summary: n.fact_summary, + fact_summary: COALESCE(n.fact_summary, ''), id: n.id } AS sourceNode, { @@ -132,7 +132,7 @@ class DataConfigRepository: description: m.description, entity_type: m.entity_type, name: m.name, - fact_summary: m.fact_summary, + fact_summary: COALESCE(m.fact_summary, ''), id: m.id } AS targetNode """ diff --git a/api/app/repositories/memory_increment_repository.py b/api/app/repositories/memory_increment_repository.py index 37396fbd..f3a56622 100644 --- a/api/app/repositories/memory_increment_repository.py +++ b/api/app/repositories/memory_increment_repository.py @@ -25,7 +25,7 @@ class MemoryIncrementRepository: MemoryIncrement, func.row_number().over( partition_by=func.date(MemoryIncrement.created_at), # 按日期分区 - order_by=MemoryIncrement.created_at.desc() # 按时间戳升序排序 + order_by=MemoryIncrement.created_at.desc() # 按时间戳降序排序,取每天最新的 ).label('row_num') ) .filter(MemoryIncrement.workspace_id == workspace_id) @@ -34,14 +34,24 @@ class MemoryIncrementRepository: memory_increment_alias = aliased(MemoryIncrement, subquery) - memory_increments = ( + # 先取最近的limit条记录的子查询 + recent_records_subquery = ( self.db.query(memory_increment_alias) .filter(subquery.c.row_num == 1) # 只取每个日期的第一条(最新的) - .order_by(memory_increment_alias.created_at.asc()) # 按时间戳降序排序 + .order_by(memory_increment_alias.created_at.desc()) # 按时间戳降序排序,取最近的 .limit(limit) + .subquery() + ) + + # 在外层按升序排列(从旧到新) + recent_alias = aliased(MemoryIncrement, recent_records_subquery) + memory_increments = ( + self.db.query(recent_alias) + .order_by(recent_alias.created_at.asc()) # 按时间戳升序排序 .all() ) - db_logger.info(f"成功查询工作空间 {workspace_id} 下的内存增量") + + db_logger.info(f"成功查询工作空间 {workspace_id} 下的内存增量,返回最近 {len(memory_increments)} 条记录") return memory_increments except Exception as e: db_logger.error(f"查询工作空间 {workspace_id} 下内存增量时出错: {str(e)}") diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index aac591b3..cd3cbed7 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -332,7 +332,7 @@ RETURN e.id AS id, e.description AS description, e.aliases AS aliases, e.name_embedding AS name_embedding, - e.fact_summary AS fact_summary, + COALESCE(e.fact_summary, '') AS fact_summary, e.connect_strength AS connect_strength, collect(DISTINCT s.id) AS statement_ids, collect(DISTINCT c.id) AS chunk_ids, diff --git a/api/app/schemas/emotion_schema.py b/api/app/schemas/emotion_schema.py index 9f14884d..37e9a2e3 100644 --- a/api/app/schemas/emotion_schema.py +++ b/api/app/schemas/emotion_schema.py @@ -30,3 +30,9 @@ class EmotionSuggestionsRequest(BaseModel): """获取个性化情绪建议请求""" group_id: str = Field(..., description="组ID") config_id: Optional[int] = Field(None, description="配置ID(用于指定LLM模型)") + + +class EmotionGenerateSuggestionsRequest(BaseModel): + """生成个性化情绪建议请求""" + group_id: str = Field(..., description="组ID") + config_id: Optional[int] = Field(None, description="配置ID(用于指定LLM模型)") diff --git a/api/app/schemas/implicit_memory_schema.py b/api/app/schemas/implicit_memory_schema.py index e1770b18..ced50b92 100644 --- a/api/app/schemas/implicit_memory_schema.py +++ b/api/app/schemas/implicit_memory_schema.py @@ -262,3 +262,25 @@ InterestCategory = InterestCategoryResponse InterestAreaDistribution = InterestAreaDistributionResponse BehaviorHabit = BehaviorHabitResponse UserProfile = UserProfileResponse + + +# Cache-related Schemas + +class GenerateProfileRequest(BaseModel): + """生成完整用户画像请求""" + end_user_id: str = Field(..., description="终端用户ID") + + +class CompleteProfileResponse(BaseModel): + """完整用户画像响应(包含所有模块)""" + user_id: str + preferences: List[PreferenceTagResponse] + portrait: DimensionPortraitResponse + interest_areas: InterestAreaDistributionResponse + habits: List[BehaviorHabitResponse] + generated_at: datetime.datetime + cached: bool = Field(False, description="是否来自缓存") + + @field_serializer("generated_at", when_used="json") + def _serialize_generated_at(self, dt: datetime.datetime): + return int(dt.timestamp() * 1000) if dt else None diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py index bc2d6ca3..c0a66e03 100644 --- a/api/app/services/app_chat_service.py +++ b/api/app/services/app_chat_service.py @@ -527,6 +527,7 @@ class AppChatService: conversation_id: uuid.UUID, config: WorkflowConfig, app_id: uuid.UUID, + release_id: uuid.UUID, workspace_id: uuid.UUID, user_id: Optional[str] = None, variables: Optional[Dict[str, Any]] = None, @@ -549,6 +550,7 @@ class AppChatService: payload=payload, config=config, workspace_id=workspace_id, + release_id=release_id, ) async def workflow_chat_stream( @@ -557,6 +559,7 @@ class AppChatService: conversation_id: uuid.UUID, config: WorkflowConfig, app_id: uuid.UUID, + release_id: uuid.UUID, workspace_id: uuid.UUID, user_id: str = None, variables: Optional[Dict[str, Any]] = None, @@ -565,7 +568,7 @@ class AppChatService: storage_type: Optional[str] = None, user_rag_memory_id: Optional[str] = None, - ) -> AsyncGenerator[str, None]: + ) -> AsyncGenerator[dict, None]: """聊天(流式)""" workflow_service = WorkflowService(self.db) payload = DraftRunRequest( @@ -580,6 +583,7 @@ class AppChatService: payload=payload, config=config, workspace_id=workspace_id, + release_id=release_id ): yield event diff --git a/api/app/services/app_service.py b/api/app/services/app_service.py index 6d5204f8..2ac9ac05 100644 --- a/api/app/services/app_service.py +++ b/api/app/services/app_service.py @@ -129,7 +129,7 @@ class AppService: Raises: ResourceNotFoundException: 当应用不存在时 """ - app = get_apps_by_id(self.db,app_id) + app = get_apps_by_id(self.db, app_id) if not app: logger.warning("应用不存在", extra={"app_id": str(app_id)}) raise ResourceNotFoundException("应用", str(app_id)) @@ -227,7 +227,6 @@ class AppService: if not model_api_key: raise ResourceNotFoundException("模型配置", str(multi_agent_config.default_model_config_id)) - # 3. 检查子 Agent 配置 if not multi_agent_config.sub_agents or len(multi_agent_config.sub_agents) == 0: raise BusinessException( @@ -281,10 +280,10 @@ class AppService: ) def _create_agent_config( - self, - app_id: uuid.UUID, - config_data: app_schema.AgentConfigCreate, - now: datetime.datetime + self, + app_id: uuid.UUID, + config_data: app_schema.AgentConfigCreate, + now: datetime.datetime ) -> None: """创建 Agent 配置(内部方法) @@ -313,10 +312,10 @@ class AppService: logger.debug("Agent 配置已创建", extra={"app_id": str(app_id)}) def _create_multi_agent_config( - self, - app_id: uuid.UUID, - config_data: Dict[str, Any], - now: datetime.datetime + self, + app_id: uuid.UUID, + config_data: Dict[str, Any], + now: datetime.datetime ) -> None: """创建多 Agent 配置(内部方法) @@ -411,9 +410,9 @@ class AppService: return 1 if max_ver is None else int(max_ver) + 1 def _convert_to_schema( - self, - app: App, - current_workspace_id: uuid.UUID + self, + app: App, + current_workspace_id: uuid.UUID ) -> app_schema.App: """将 App 模型转换为 Schema,并设置 is_shared 字段 @@ -447,9 +446,9 @@ class AppService: # ==================== 应用管理 ==================== def get_app( - self, - app_id: uuid.UUID, - workspace_id: Optional[uuid.UUID] = None + self, + app_id: uuid.UUID, + workspace_id: Optional[uuid.UUID] = None ) -> App: """获取应用详情 @@ -469,11 +468,11 @@ class AppService: return app def create_app( - self, - *, - user_id: uuid.UUID, - workspace_id: uuid.UUID, - data: app_schema.AppCreate + self, + *, + user_id: uuid.UUID, + workspace_id: uuid.UUID, + data: app_schema.AppCreate ) -> App: """创建应用 @@ -535,11 +534,11 @@ class AppService: raise BusinessException(f"应用创建失败: {str(e)}", BizCode.INTERNAL_ERROR, cause=e) def update_app( - self, - *, - app_id: uuid.UUID, - data: app_schema.AppUpdate, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + data: app_schema.AppUpdate, + workspace_id: Optional[uuid.UUID] = None ) -> App: """更新应用基本信息 @@ -578,10 +577,10 @@ class AppService: return app def delete_app( - self, - *, - app_id: uuid.UUID, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + workspace_id: Optional[uuid.UUID] = None ) -> None: """删除应用 @@ -612,12 +611,12 @@ class AppService: ) def copy_app( - self, - *, - app_id: uuid.UUID, - user_id: uuid.UUID, - workspace_id: Optional[uuid.UUID] = None, - new_name: Optional[str] = None + self, + *, + app_id: uuid.UUID, + user_id: uuid.UUID, + workspace_id: Optional[uuid.UUID] = None, + new_name: Optional[str] = None ) -> App: """复制应用(包括基础信息和配置) @@ -716,16 +715,16 @@ class AppService: raise BusinessException(f"应用复制失败: {str(e)}", BizCode.INTERNAL_ERROR, cause=e) def list_apps( - self, - *, - workspace_id: uuid.UUID, - type: Optional[str] = None, - visibility: Optional[str] = None, - status: Optional[str] = None, - search: Optional[str] = None, - include_shared: bool = True, - page: int = 1, - pagesize: int = 10, + self, + *, + workspace_id: uuid.UUID, + type: Optional[str] = None, + visibility: Optional[str] = None, + status: Optional[str] = None, + search: Optional[str] = None, + include_shared: bool = True, + page: int = 1, + pagesize: int = 10, ) -> Tuple[List[App], int]: """列出工作空间中的应用(分页) @@ -759,8 +758,7 @@ class AppService: ) # 构建查询条件 - filters = [] - filters.append(App.is_active == True) + filters = [App.is_active == True] if type: filters.append(App.type == type) if visibility: @@ -813,9 +811,9 @@ class AppService: return items, int(total) def get_apps_by_ids( - self, - app_ids: List[str], - workspace_id: uuid.UUID + self, + app_ids: List[str], + workspace_id: uuid.UUID ) -> List[App]: """根据ID列表获取应用 @@ -846,11 +844,11 @@ class AppService: # ==================== Agent 配置管理 ==================== def update_agent_config( - self, - *, - app_id: uuid.UUID, - data: app_schema.AgentConfigUpdate, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + data: app_schema.AgentConfigUpdate, + workspace_id: Optional[uuid.UUID] = None ) -> AgentConfig: """更新 Agent 配置 @@ -875,7 +873,8 @@ class AppService: self._validate_workspace_access(app, workspace_id) - stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active==True).order_by(AgentConfig.updated_at.desc()) + stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active == True).order_by( + AgentConfig.updated_at.desc()) agent_cfg: Optional[AgentConfig] = self.db.scalars(stmt).first() now = datetime.datetime.now() @@ -918,10 +917,10 @@ class AppService: return agent_cfg def get_agent_config( - self, - *, - app_id: uuid.UUID, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + workspace_id: Optional[uuid.UUID] = None ) -> AgentConfig: """获取 Agent 配置 @@ -948,7 +947,12 @@ class AppService: # 只读操作,允许访问共享应用 self._validate_app_accessible(app, workspace_id) - stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active == True).order_by(AgentConfig.updated_at.desc()) + stmt = select(AgentConfig).where( + AgentConfig.app_id == app_id, + AgentConfig.is_active.is_(True) + ).order_by( + AgentConfig.updated_at.desc() + ) config = self.db.scalars(stmt).first() if config: @@ -1166,13 +1170,13 @@ class AppService: # ==================== 应用发布管理 ==================== def publish( - self, - *, - app_id: uuid.UUID, - publisher_id: uuid.UUID, - version_name: str, - workspace_id: Optional[uuid.UUID] = None, - release_notes: Optional[str] = None + self, + *, + app_id: uuid.UUID, + publisher_id: uuid.UUID, + version_name: str, + workspace_id: Optional[uuid.UUID] = None, + release_notes: Optional[str] = None ) -> AppRelease: """发布应用(创建不可变快照) @@ -1200,7 +1204,8 @@ class AppService: default_model_config_id = None if app.type == AppType.AGENT: - stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active == True).order_by(AgentConfig.updated_at.desc()) + stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active == True).order_by( + AgentConfig.updated_at.desc()) agent_cfg = self.db.scalars(stmt).first() if not agent_cfg: raise BusinessException("Agent 应用缺少配置,无法发布", BizCode.AGENT_CONFIG_MISSING) @@ -1236,8 +1241,7 @@ class AppService: default_model_config_id = multi_agent_cfg.default_model_config_id # 4. 构建配置快照 - - + config = { "model_parameters": model_parameters_to_dict(multi_agent_cfg.model_parameters), "master_agent_id": str(multi_agent_cfg.master_agent_id), @@ -1264,6 +1268,7 @@ class AppService: raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING) config = { + "id": str(workflow_cfg.id), "nodes": workflow_cfg.nodes, "edges": workflow_cfg.edges, "variables": workflow_cfg.variables, @@ -1285,7 +1290,7 @@ class AppService: id=uuid.uuid4(), app_id=app_id, version=version, - version_name = version_name, + version_name=version_name, release_notes=release_notes, name=app.name, description=app.description, @@ -1319,10 +1324,10 @@ class AppService: return release def get_current_release( - self, - *, - app_id: uuid.UUID, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + workspace_id: Optional[uuid.UUID] = None ) -> Optional[AppRelease]: """获取当前发布版本 @@ -1349,10 +1354,10 @@ class AppService: return self.db.get(AppRelease, app.current_release_id) def list_releases( - self, - *, - app_id: uuid.UUID, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + workspace_id: Optional[uuid.UUID] = None ) -> List[AppRelease]: """列出应用的所有发布版本(倒序) @@ -1381,11 +1386,11 @@ class AppService: return list(self.db.scalars(stmt).all()) def rollback( - self, - *, - app_id: uuid.UUID, - version: int, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + version: int, + workspace_id: Optional[uuid.UUID] = None ) -> AppRelease: """回滚到指定版本 @@ -1434,12 +1439,12 @@ class AppService: # ==================== 应用分享功能 ==================== def share_app( - self, - *, - app_id: uuid.UUID, - target_workspace_ids: List[uuid.UUID], - user_id: uuid.UUID, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + target_workspace_ids: List[uuid.UUID], + user_id: uuid.UUID, + workspace_id: Optional[uuid.UUID] = None ) -> AppShare: """分享应用到其他工作空间 @@ -1457,7 +1462,6 @@ class AppService: BusinessException: 当应用不在指定工作空间或目标工作空间无效时 """ - logger.info( "分享应用", extra={ @@ -1536,11 +1540,11 @@ class AppService: return shares def unshare_app( - self, - *, - app_id: uuid.UUID, - target_workspace_id: uuid.UUID, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + target_workspace_id: uuid.UUID, + workspace_id: Optional[uuid.UUID] = None ) -> None: """取消应用分享 @@ -1594,10 +1598,10 @@ class AppService: ) def list_app_shares( - self, - *, - app_id: uuid.UUID, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + workspace_id: Optional[uuid.UUID] = None ) -> List[AppShare]: """列出应用的所有分享记录 @@ -1637,14 +1641,14 @@ class AppService: # ==================== 试运行功能 ==================== async def draft_run( - self, - *, - app_id: uuid.UUID, - message: str, - conversation_id: Optional[str] = None, - user_id: Optional[str] = None, - variables: Optional[Dict[str, Any]] = None, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + message: str, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + variables: Optional[Dict[str, Any]] = None, + workspace_id: Optional[uuid.UUID] = None ) -> Dict[str, Any]: """试运行 Agent(使用当前草稿配置) @@ -1736,14 +1740,14 @@ class AppService: return result async def draft_run_stream( - self, - *, - app_id: uuid.UUID, - message: str, - conversation_id: Optional[str] = None, - user_id: Optional[str] = None, - variables: Optional[Dict[str, Any]] = None, - workspace_id: Optional[uuid.UUID] = None + self, + *, + app_id: uuid.UUID, + message: str, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + variables: Optional[Dict[str, Any]] = None, + workspace_id: Optional[uuid.UUID] = None ): """试运行 Agent(流式返回) @@ -1794,30 +1798,30 @@ class AppService: # 4. 调用流式试运行服务 draft_service = DraftRunService(self.db) async for event in draft_service.run_stream( - agent_config=agent_cfg, - model_config=model_config, - message=message, - workspace_id=workspace_id, - conversation_id=conversation_id, - user_id=user_id, - variables=variables + agent_config=agent_cfg, + model_config=model_config, + message=message, + workspace_id=workspace_id, + conversation_id=conversation_id, + user_id=user_id, + variables=variables ): yield event # ==================== 多模型对比试运行 ==================== async def draft_run_compare( - self, - *, - app_id: uuid.UUID, - message: str, - models: List[app_schema.ModelCompareItem], - conversation_id: Optional[str] = None, - user_id: Optional[str] = None, - variables: Optional[Dict[str, Any]] = None, - workspace_id: Optional[uuid.UUID] = None, - parallel: bool = True, - timeout: int = 60 + self, + *, + app_id: uuid.UUID, + message: str, + models: List[app_schema.ModelCompareItem], + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + variables: Optional[Dict[str, Any]] = None, + workspace_id: Optional[uuid.UUID] = None, + parallel: bool = True, + timeout: int = 60 ) -> Dict[str, Any]: """多模型对比试运行 @@ -1907,17 +1911,17 @@ class AppService: return result async def draft_run_compare_stream( - self, - *, - app_id: uuid.UUID, - message: str, - models: List[app_schema.ModelCompareItem], - conversation_id: Optional[str] = None, - user_id: Optional[str] = None, - variables: Optional[Dict[str, Any]] = None, - workspace_id: Optional[uuid.UUID] = None, - parallel: bool = True, - timeout: int = 60 + self, + *, + app_id: uuid.UUID, + message: str, + models: List[app_schema.ModelCompareItem], + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + variables: Optional[Dict[str, Any]] = None, + workspace_id: Optional[uuid.UUID] = None, + parallel: bool = True, + timeout: int = 60 ): """多模型对比试运行(流式返回) @@ -1982,15 +1986,15 @@ class AppService: # 4. 调用 DraftRunService 的流式对比方法 draft_service = DraftRunService(self.db) async for event in draft_service.run_compare_stream( - agent_config=agent_cfg, - models=model_configs, - message=message, - workspace_id=workspace_id, - conversation_id=conversation_id, - user_id=user_id, - variables=variables, - parallel=parallel, - timeout=timeout + agent_config=agent_cfg, + models=model_configs, + message=message, + workspace_id=workspace_id, + conversation_id=conversation_id, + user_id=user_id, + variables=variables, + parallel=parallel, + timeout=timeout ): yield event @@ -2009,7 +2013,8 @@ def create_app(db: Session, *, user_id: uuid.UUID, workspace_id: uuid.UUID, data return service.create_app(user_id=user_id, workspace_id=workspace_id, data=data) -def update_app(db: Session, *, app_id: uuid.UUID, data: app_schema.AppUpdate, workspace_id: uuid.UUID | None = None) -> App: +def update_app(db: Session, *, app_id: uuid.UUID, data: app_schema.AppUpdate, + workspace_id: uuid.UUID | None = None) -> App: """更新应用(向后兼容接口)""" service = AppService(db) return service.update_app(app_id=app_id, data=data, workspace_id=workspace_id) @@ -2021,12 +2026,15 @@ def delete_app(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UUID | None return service.delete_app(app_id=app_id, workspace_id=workspace_id) -def update_agent_config(db: Session, *, app_id: uuid.UUID, data: app_schema.AgentConfigUpdate, workspace_id: uuid.UUID | None = None) -> AgentConfig: +def update_agent_config(db: Session, *, app_id: uuid.UUID, data: app_schema.AgentConfigUpdate, + workspace_id: uuid.UUID | None = None) -> AgentConfig: """更新 Agent 配置(向后兼容接口)""" service = AppService(db) return service.update_agent_config(app_id=app_id, data=data, workspace_id=workspace_id) -def update_workflow_config(db: Session, *, app_id: uuid.UUID, data: WorkflowConfigUpdate, workspace_id: uuid.UUID | None = None) -> WorkflowConfig: + +def update_workflow_config(db: Session, *, app_id: uuid.UUID, data: WorkflowConfigUpdate, + workspace_id: uuid.UUID | None = None) -> WorkflowConfig: """更新 Agent 配置(向后兼容接口)""" service = AppService(db) return service.update_workflow_config(app_id=app_id, data=data, workspace_id=workspace_id) @@ -2040,6 +2048,7 @@ def get_agent_config(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UUID service = AppService(db) return service.get_agent_config(app_id=app_id, workspace_id=workspace_id) + def get_workflow_config(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UUID | None = None) -> WorkflowConfig: """获取 Agent 配置(向后兼容接口) @@ -2049,13 +2058,20 @@ def get_workflow_config(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UU return service.get_workflow_config(app_id=app_id, workspace_id=workspace_id) -def publish(db: Session, *, app_id: uuid.UUID, publisher_id: uuid.UUID, workspace_id: uuid.UUID | None = None,version_name:str, release_notes: Optional[str] = None) -> AppRelease: +def publish(db: Session, *, app_id: uuid.UUID, publisher_id: uuid.UUID, workspace_id: uuid.UUID | None = None, + version_name: str, release_notes: Optional[str] = None) -> AppRelease: """发布应用(向后兼容接口)""" service = AppService(db) - return service.publish(app_id=app_id, publisher_id=publisher_id,version_name = version_name, workspace_id=workspace_id, release_notes=release_notes) + return service.publish(app_id=app_id, publisher_id=publisher_id, version_name=version_name, + workspace_id=workspace_id, release_notes=release_notes) -def get_current_release(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UUID | None = None) -> Optional[AppRelease]: +def get_current_release( + db: Session, + *, + app_id: uuid.UUID, + workspace_id: uuid.UUID | None = None +) -> Optional[AppRelease]: """获取当前发布版本(向后兼容接口)""" service = AppService(db) return service.get_current_release(app_id=app_id, workspace_id=workspace_id) @@ -2074,16 +2090,16 @@ def rollback(db: Session, *, app_id: uuid.UUID, version: int, workspace_id: uuid def list_apps( - db: Session, - *, - workspace_id: uuid.UUID, - type: Optional[str] = None, - visibility: Optional[str] = None, - status: Optional[str] = None, - search: Optional[str] = None, - include_shared: bool = True, - page: int = 1, - pagesize: int = 10, + db: Session, + *, + workspace_id: uuid.UUID, + type: Optional[str] = None, + visibility: Optional[str] = None, + status: Optional[str] = None, + search: Optional[str] = None, + include_shared: bool = True, + page: int = 1, + pagesize: int = 10, ) -> Tuple[List[App], int]: """列出应用(向后兼容接口)""" service = AppService(db) @@ -2100,9 +2116,9 @@ def list_apps( def get_apps_by_ids( - db: Session, - app_ids: List[str], - workspace_id: uuid.UUID + db: Session, + app_ids: List[str], + workspace_id: uuid.UUID ) -> List[App]: """根据ID列表获取应用(向后兼容接口)""" service = AppService(db) @@ -2112,14 +2128,14 @@ def get_apps_by_ids( # ==================== 向后兼容的函数接口 ==================== async def draft_run( - db: Session, - *, - app_id: uuid.UUID, - message: str, - conversation_id: Optional[str] = None, - user_id: Optional[str] = None, - variables: Optional[Dict[str, Any]] = None, - workspace_id: Optional[uuid.UUID] = None + db: Session, + *, + app_id: uuid.UUID, + message: str, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + variables: Optional[Dict[str, Any]] = None, + workspace_id: Optional[uuid.UUID] = None ) -> Dict[str, Any]: """试运行 Agent(向后兼容接口)""" service = AppService(db) @@ -2134,30 +2150,28 @@ async def draft_run( async def draft_run_stream( - db: Session, - *, - app_id: uuid.UUID, - message: str, - conversation_id: Optional[str] = None, - user_id: Optional[str] = None, - variables: Optional[Dict[str, Any]] = None, - workspace_id: Optional[uuid.UUID] = None + db: Session, + *, + app_id: uuid.UUID, + message: str, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + variables: Optional[Dict[str, Any]] = None, + workspace_id: Optional[uuid.UUID] = None ): """试运行 Agent 流式返回(向后兼容接口)""" service = AppService(db) async for event in service.draft_run_stream( - app_id=app_id, - message=message, - conversation_id=conversation_id, - user_id=user_id, - variables=variables, - workspace_id=workspace_id + app_id=app_id, + message=message, + conversation_id=conversation_id, + user_id=user_id, + variables=variables, + workspace_id=workspace_id ): yield event - - # ==================== 依赖注入函数 ==================== def get_app_service( diff --git a/api/app/services/emotion_analytics_service.py b/api/app/services/emotion_analytics_service.py index f2532557..601d2921 100644 --- a/api/app/services/emotion_analytics_service.py +++ b/api/app/services/emotion_analytics_service.py @@ -705,3 +705,75 @@ class EmotionAnalyticsService: health_summary=summary, suggestions=suggestions ) + + async def get_cached_suggestions( + self, + end_user_id: str, + db: Session, + ) -> Optional[Dict[str, Any]]: + """从 Redis 缓存获取个性化情绪建议 + + Args: + end_user_id: 宿主ID(用户组ID) + db: 数据库会话(保留参数以保持接口兼容性) + + Returns: + Dict: 缓存的建议数据,如果不存在或已过期返回 None + """ + try: + from app.cache.memory.emotion_memory import EmotionMemoryCache + + logger.info(f"尝试从 Redis 缓存获取情绪建议: user={end_user_id}") + + # 从 Redis 获取缓存 + cached_data = await EmotionMemoryCache.get_emotion_suggestions(end_user_id) + + if cached_data is None: + logger.info(f"用户 {end_user_id} 的建议缓存不存在或已过期") + return None + + logger.info(f"成功从 Redis 缓存获取建议: user={end_user_id}") + return cached_data + + except Exception as e: + logger.error(f"从 Redis 缓存获取建议失败: {str(e)}", exc_info=True) + return None + + async def save_suggestions_cache( + self, + end_user_id: str, + suggestions_data: Dict[str, Any], + db: Session, + expires_hours: int = 24 + ) -> None: + """保存建议到 Redis 缓存 + + Args: + end_user_id: 宿主ID(用户组ID) + suggestions_data: 建议数据 + db: 数据库会话(保留参数以保持接口兼容性) + expires_hours: 过期时间(小时),默认24小时 + """ + try: + from app.cache.memory.emotion_memory import EmotionMemoryCache + + logger.info(f"保存建议到 Redis 缓存: user={end_user_id}, expires={expires_hours}小时") + + # 计算过期时间(秒) + expire_seconds = expires_hours * 3600 + + # 保存到 Redis + success = await EmotionMemoryCache.set_emotion_suggestions( + user_id=end_user_id, + suggestions_data=suggestions_data, + expire=expire_seconds + ) + + if success: + logger.info(f"建议缓存保存成功: user={end_user_id}") + else: + logger.warning(f"建议缓存保存失败: user={end_user_id}") + + except Exception as e: + logger.error(f"保存建议缓存失败: {str(e)}", exc_info=True) + # 不抛出异常,缓存失败不应影响主流程 \ No newline at end of file diff --git a/api/app/services/home_page_service.py b/api/app/services/home_page_service.py index a84a8214..8326ad40 100644 --- a/api/app/services/home_page_service.py +++ b/api/app/services/home_page_service.py @@ -11,6 +11,22 @@ from app.repositories.home_page_repository import HomePageRepository from app.schemas.home_page_schema import HomeStatistics, WorkspaceInfo class HomePageService: + + DEFAULT_RETURN_DATA: Dict[str, Any] = { + "message": "", + "introduction": { + "codeName": "", + "releaseDate": "", + "upgradePosition": "", + "coreUpgrades": [] + }, + "introduction_en": { + "codeName": "", + "releaseDate": "", + "upgradePosition": "", + "coreUpgrades": [] + } + } @staticmethod def get_home_statistics(db: Session, tenant_id: UUID) -> HomeStatistics: @@ -82,60 +98,36 @@ class HomePageService: :param version: 系统版本号(如 "0.2.0") :return: 对应版本的详细介绍 """ - # 1. 定义 JSON 文件路径(使用 Path 处理跨平台路径问题) - json_file_path = Path(__file__).parent.parent / "version_info.json" - # 转换为绝对路径,便于调试 - json_abs_path = json_file_path.resolve() + # 2. 定义 JSON 文件路径(简化路径处理,保留绝对路径调试特性) + json_abs_path = Path(__file__).parent.parent / "version_info.json" + json_abs_path = json_abs_path.resolve() + + # 3. 初始化返回结果(深拷贝默认模板,避免修改原常量) + from copy import deepcopy + result = deepcopy(HomePageService.DEFAULT_RETURN_DATA) try: - # 2. 读取 JSON 文件 + # 4. 简化文件存在性判断(合并逻辑,减少分支) if not json_abs_path.exists(): - return { - "message": f"版本介绍文件不存在:{json_abs_path}", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } + result["message"] = f"版本介绍文件不存在:{json_abs_path}" + return result + # 5. 读取并解析 JSON 文件(简化文件操作流程) with open(json_abs_path, "r", encoding="utf-8") as f: changelogs = json.load(f) - # 3. 匹配对应版本的介绍,若版本不存在返回默认提示 - if version not in changelogs: - return { - "message": f"暂未查询到 {version} 版本的详细介绍", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } - return changelogs[version] + # 6. 简化版本匹配逻辑,直接返回结果或更新提示信息 + if version in changelogs: + return changelogs[version] + result["message"] = f"暂未查询到 {version} 版本的详细介绍" + return result except FileNotFoundError as e: - # 处理文件不存在异常 - return { - "message": f"系统内部错误:{str(e)}", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } + result["message"] = f"系统内部错误:{str(e)}" + return result except json.JSONDecodeError: - # 处理 JSON 格式错误 - return { - "message": "版本介绍文件格式错误,无法解析 JSON", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } + result["message"] = "版本介绍文件格式错误,无法解析 JSON" + return result except Exception as e: - # 处理其他未知异常 - return { - "message": f"加载版本介绍失败:{str(e)}", - "codeName": "", - "releaseDate": "", - "upgradePosition": "", - "coreUpgrades": [] - } \ No newline at end of file + result["message"] = f"加载版本介绍失败:{str(e)}" + return result \ No newline at end of file diff --git a/api/app/services/implicit_memory_service.py b/api/app/services/implicit_memory_service.py index 8155b7a1..106fa808 100644 --- a/api/app/services/implicit_memory_service.py +++ b/api/app/services/implicit_memory_service.py @@ -7,6 +7,7 @@ user profiles from memory summaries. """ import logging +import asyncio from datetime import datetime from typing import List, Optional @@ -372,4 +373,114 @@ class ImplicitMemoryService: except Exception as e: logger.error(f"Failed to get behavior habits for user {user_id}: {e}") raise - \ No newline at end of file + + + async def generate_complete_profile( + self, + user_id: str + ) -> dict: + """生成完整的用户画像(包含所有4个模块) + + Args: + user_id: 用户ID + + Returns: + Dict: 包含所有模块的完整画像数据 + """ + logger.info(f"生成完整用户画像: user={user_id}") + + try: + # 并行调用4个分析方法 + preferences, portrait, interest_areas, habits = await asyncio.gather( + self.get_preference_tags(user_id=user_id), + self.get_dimension_portrait(user_id=user_id), + self.get_interest_area_distribution(user_id=user_id), + self.get_behavior_habits(user_id=user_id) + ) + + # 转换为可序列化的格式 + profile_data = { + "preferences": [tag.model_dump(mode='json') for tag in preferences], + "portrait": portrait.model_dump(mode='json'), + "interest_areas": interest_areas.model_dump(mode='json'), + "habits": [habit.model_dump(mode='json') for habit in habits] + } + + logger.info(f"完整用户画像生成完成: user={user_id}") + return profile_data + + except Exception as e: + logger.error(f"生成完整用户画像失败: {str(e)}", exc_info=True) + raise + + async def get_cached_profile( + self, + end_user_id: str, + db: Session + ) -> Optional[dict]: + """从 Redis 缓存获取完整用户画像 + + Args: + end_user_id: 终端用户ID + db: 数据库会话(保留参数以保持接口兼容性) + + Returns: + Dict: 缓存的画像数据,如果不存在或已过期返回 None + """ + try: + from app.cache.memory.implicit_memory import ImplicitMemoryCache + + logger.info(f"尝试从 Redis 缓存获取用户画像: user={end_user_id}") + + # 从 Redis 获取缓存 + cached_data = await ImplicitMemoryCache.get_user_profile(end_user_id) + + if cached_data is None: + logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期") + return None + + logger.info(f"成功从 Redis 缓存获取用户画像: user={end_user_id}") + return cached_data + + except Exception as e: + logger.error(f"从 Redis 缓存获取用户画像失败: {str(e)}", exc_info=True) + return None + + async def save_profile_cache( + self, + end_user_id: str, + profile_data: dict, + db: Session, + expires_hours: int = 168 # 默认7天 + ) -> None: + """保存用户画像到 Redis 缓存 + + Args: + end_user_id: 终端用户ID + profile_data: 画像数据 + db: 数据库会话(保留参数以保持接口兼容性) + expires_hours: 过期时间(小时),默认168小时(7天) + """ + try: + from app.cache.memory.implicit_memory import ImplicitMemoryCache + + logger.info(f"保存用户画像到 Redis 缓存: user={end_user_id}, expires={expires_hours}小时") + + # 计算过期时间(秒) + expire_seconds = expires_hours * 3600 + + # 保存到 Redis + success = await ImplicitMemoryCache.set_user_profile( + user_id=end_user_id, + profile_data=profile_data, + expire=expire_seconds + ) + + if success: + logger.info(f"用户画像缓存保存成功: user={end_user_id}") + else: + logger.warning(f"用户画像缓存保存失败: user={end_user_id}") + + except Exception as e: + logger.error(f"保存用户画像缓存失败: {str(e)}", exc_info=True) + # 不抛出异常,缓存失败不应影响主流程 diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index 974d5418..b7d5df02 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -4,7 +4,7 @@ import datetime import logging import uuid -from typing import Any, Annotated, AsyncGenerator +from typing import Any, Annotated, AsyncGenerator, Optional from deprecated import deprecated from fastapi import Depends @@ -14,15 +14,14 @@ from app.core.error_codes import BizCode from app.core.exceptions import BusinessException from app.core.workflow.validator import validate_workflow_config from app.db import get_db -from app.models.conversation_model import Message from app.models.workflow_model import WorkflowConfig, WorkflowExecution -from app.repositories.conversation_repository import MessageRepository from app.repositories.workflow_repository import ( WorkflowConfigRepository, WorkflowExecutionRepository, WorkflowNodeExecutionRepository ) from app.schemas import DraftRunRequest +from app.services.conversation_service import ConversationService from app.services.multi_agent_service import convert_uuids_to_str logger = logging.getLogger(__name__) @@ -36,7 +35,7 @@ class WorkflowService: self.config_repo = WorkflowConfigRepository(db) self.execution_repo = WorkflowExecutionRepository(db) self.node_execution_repo = WorkflowNodeExecutionRepository(db) - self.message_repo = MessageRepository(db) + self.conversation_service = ConversationService(db) # ==================== 配置管理 ==================== @@ -266,6 +265,7 @@ class WorkflowService: workflow_config_id: uuid.UUID, app_id: uuid.UUID, trigger_type: str, + release_id: uuid.UUID | None = None, triggered_by: uuid.UUID | None = None, conversation_id: uuid.UUID | None = None, input_data: dict[str, Any] | None = None @@ -273,6 +273,7 @@ class WorkflowService: """创建工作流执行记录 Args: + release_id: 应用发布 ID workflow_config_id: 工作流配置 ID app_id: 应用 ID trigger_type: 触发类型 @@ -289,6 +290,7 @@ class WorkflowService: execution = WorkflowExecution( workflow_config_id=workflow_config_id, app_id=app_id, + release_id=release_id, conversation_id=conversation_id, execution_id=execution_id, trigger_type=trigger_type, @@ -337,6 +339,7 @@ class WorkflowService: self, execution_id: str, status: str, + token_usage: int | None = None, output_data: dict[str, Any] | None = None, error_message: str | None = None, error_node_id: str | None = None @@ -346,6 +349,7 @@ class WorkflowService: Args: execution_id: 执行 ID status: 状态 + token_usage: token消耗 output_data: 输出数据 error_message: 错误信息 error_node_id: 出错节点 ID @@ -364,6 +368,8 @@ class WorkflowService: ) execution.status = status + if token_usage is not None: + execution.token_usage = token_usage if output_data is not None: execution.output_data = convert_uuids_to_str(output_data) if error_message is not None: @@ -414,12 +420,14 @@ class WorkflowService: payload: DraftRunRequest, config: WorkflowConfig, workspace_id: uuid.UUID, + release_id: uuid.UUID | None = None, ): """运行工作流 Args: - workspace_id: - config: + release_id: 发布 ID + workspace_id:工作空间 ID + config: 配置 payload: app_id: 应用 ID @@ -463,7 +471,8 @@ class WorkflowService: trigger_type="manual", triggered_by=None, conversation_id=conversation_id_uuid, - input_data=input_data + input_data=input_data, + release_id=release_id, ) # 3. 构建工作流配置字典 @@ -507,20 +516,20 @@ class WorkflowService: # 更新执行结果 if result.get("status") == "completed": + token_usage = result.get("token_usage", {}) or {} self.update_execution_status( execution.execution_id, "completed", - output_data=result + output_data=result, + token_usage=token_usage.get("total_tokens", None) ) final_messages = result.get("messages", [])[init_message_length:] for message in final_messages: - message_obj = Message( + self.conversation_service.add_message( conversation_id=conversation_id_uuid, role=message["role"], - content=message["content"], + content=message["content"] ) - self.message_repo.add_message(message_obj) - self.db.commit() logger.info(f"Workflow Run Success, " f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") else: @@ -562,10 +571,12 @@ class WorkflowService: payload: DraftRunRequest, config: WorkflowConfig, workspace_id: uuid.UUID, + release_id: Optional[uuid.UUID] = None, ): """运行工作流(流式) Args: + release_id: 发布id workspace_id: app_id: 应用 ID payload: 请求对象(包含 message, variables, conversation_id 等) @@ -611,7 +622,8 @@ class WorkflowService: trigger_type="manual", triggered_by=None, conversation_id=conversation_id_uuid, - input_data=input_data + input_data=input_data, + release_id=release_id, ) # 3. 构建工作流配置字典 @@ -653,21 +665,21 @@ class WorkflowService: if event.get("event") == "workflow_end": status = event.get("data", {}).get("status") + token_usage = event.get("data", {}).get("token_usage", {}) or {} if status == "completed": self.update_execution_status( execution.execution_id, "completed", - output_data=event.get("data") + output_data=event.get("data"), + token_usage=token_usage.get("total_tokens", None) ) final_messages = event.get("data", {}).get("messages", [])[init_message_length:] for message in final_messages: - message_obj = Message( + self.conversation_service.add_message( conversation_id=conversation_id_uuid, role=message["role"], - content=message["content"], + content=message["content"] ) - self.message_repo.add_message(message_obj) - self.db.commit() logger.info(f"Workflow Run Success, " f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") elif status == "failed": @@ -784,10 +796,12 @@ class WorkflowService: # 更新执行结果 if result.get("status") == "completed": + token_usage = result.get("data").get("token_usage", {}) or {} self.update_execution_status( execution.execution_id, "completed", - output_data=result.get("node_outputs", {}) + output_data=result.get("node_outputs", {}), + token_usage=token_usage.get("total_tokens", None) ) else: self.update_execution_status( @@ -882,13 +896,14 @@ class WorkflowService: ): # 直接转发事件(executor 已经返回正确格式) if event.get("event") == "workflow_end": - + token_usage = event.get("data").get("token_usage", {}) or {} status = event.get("data", {}).get("status") if status == "completed": self.update_execution_status( execution_id, "completed", - output_data=event.get("data") + output_data=event.get("data"), + token_usage=token_usage.get("total_tokens", None) ) elif status == "failed": self.update_execution_status( diff --git a/api/app/templates/workflows/simple_qa/template.yml b/api/app/templates/workflows/simple_qa/template.yml index 14de4a73..2cf0f9b1 100644 --- a/api/app/templates/workflows/simple_qa/template.yml +++ b/api/app/templates/workflows/simple_qa/template.yml @@ -53,7 +53,7 @@ nodes: type: end name: 结束 config: - output: "{{ llm_qa.output }}" + output: "{{llm_qa.output}}" position: x: 900 y: 100 diff --git a/api/app/utils/app_config_utils.py b/api/app/utils/app_config_utils.py index 4a35a4cc..514e4565 100644 --- a/api/app/utils/app_config_utils.py +++ b/api/app/utils/app_config_utils.py @@ -120,12 +120,9 @@ def multi_agent_config_4_app_release(release: AppRelease) -> MultiAgentConfig: def workflow_config_4_app_release(release: AppRelease) -> WorkflowConfig: config_dict = release.config - with get_db_read() as db: - source_config = WorkflowConfigRepository(db).get_by_app_id(release.app_id) - source_config_id = source_config.id config = WorkflowConfig( - id=source_config_id, + id=config_dict.get("id"), app_id=release.app_id, nodes=config_dict.get("nodes", []), edges=config_dict.get("edges", []), diff --git a/api/app/version_info.json b/api/app/version_info.json index 87e313e4..20896845 100644 --- a/api/app/version_info.json +++ b/api/app/version_info.json @@ -1,33 +1,68 @@ { "v0.2.0": { - "codeName": "启知", - "releaseDate": "2026-1-16", - "upgradePosition": "本次为架构升级,核心目标是把“被动存储”升级为“主动认知”,让系统具备情绪感知、情景理解与类人记忆机制,为后续多智能体协作与专业场景落地奠定底座。", - "coreUpgrades": [ - "记忆详情:拟人记忆——情绪引擎、情景记忆、短期记忆、工作记忆、感知记忆、显性记忆、隐性记忆,并配套类脑遗忘机制,实现从感知→情绪→情景→长期沉淀的完整人类记忆闭环", - "可视化工作流:拖拽式节点编排(LLM、知识库、逻辑、工具),业务落地周期由天缩至小时。", - "多模态知识处理:PDF、PPT、MP3、MP4 一键解析,时间感知检索准确率 94.3%,问答对数据即插即用。", - "Agent集群内置“记忆-知识-工具-审核”四类角色模板,用户一键生成;主控Agent把复杂任务拆为子任务并行分发,再靠情景记忆统一消解冲突、校验一致性,输出完整报告。" - ] + "introduction": { + "codeName": "启知", + "releaseDate": "2026-1-16", + "upgradePosition": "本次为架构升级,核心目标是把\"被动存储\"升级为\"主动认知\",让系统具备情绪感知、情景理解与类人记忆机制,为后续多智能体协作与专业场景落地奠定底座。", + "coreUpgrades": [ + "记忆详情:拟人记忆——情绪引擎、情景记忆、短期记忆、工作记忆、感知记忆、显性记忆、隐性记忆,并配套类脑遗忘机制,实现从感知→情绪→情景→长期沉淀的完整人类记忆闭环", + "可视化工作流:拖拽式节点编排(LLM、知识库、逻辑、工具),业务落地周期由天缩至小时。", + "多模态知识处理:PDF、PPT、MP3、MP4 一键解析,时间感知检索准确率 94.3%,问答对数据即插即用。", + "Agent集群内置\"记忆-知识-工具-审核\"四类角色模板,用户一键生成;主控Agent把复杂任务拆为子任务并行分发,再靠情景记忆统一消解冲突、校验一致性,输出完整报告。" + ] + }, + "introduction_en": { + "codeName": "Qizhi", + "releaseDate": "2026-1-16", + "upgradePosition": "This release marks a foundational upgrade to the system’s cognitive architecture. The core objective is to evolve the platform from passive information storage into active cognitive intelligence—enabling emotional awareness, situational understanding, and human-like memory mechanisms. This upgrade lays the groundwork for future multi-agent collaboration and domain-specific, production-grade AI applications.", + "coreUpgrades": [ + "Human-Like Memory Architecture: A comprehensive, human-inspired memory system is introduced, encompassing emotional processing, situational memory, short-term and working memory, perceptual memory, as well as explicit and implicit memory. Combined with brain-inspired forgetting mechanisms, the system now supports a complete cognitive loop—from perception → emotion → context → long-term consolidation, closely mirroring human memory formation.", + "Visual Workflow Orchestration: A fully visual, drag-and-drop workflow enables modular composition of LLMs, knowledge bases, logic, and tools. This dramatically reduces the time required to move from experimentation to production—from days to hours.", + "Multimodal Knowledge Processing: The system now supports one-click parsing and ingestion of PDF, PPT, MP3, and MP4 content. With time-aware retrieval accuracy reaching 94.3%, structured Q&A data becomes instantly usable for downstream reasoning and generation.", + "Built-in Agent Clusters: Predefined role templates across four categories—Memory, Knowledge, Tools, and Review—can be generated with a single click. A Coordinator Agent decomposes complex tasks into parallel subtasks, while situational memory is used to resolve conflicts, validate consistency, and synthesize outputs into a coherent, end-to-end report." + ] + } }, "v0.1.0": { - "codeName": "初心", - "releaseDate": "2025-12-01", - "upgradePosition": "这是一款专注于管理和利用AI记忆的工具,支持RAG和知识图谱两种主流存储方式,旨在为AI应用提供持久化、结构化的“记忆”能力。", - "coreUpgrades": [ - "记忆空间:用户可以创建独立的空间来隔离不同记忆,并灵活选择存储方式。", - "记忆配置:简化了配置流程,内置自动提取关键信息的“记忆萃取”和管理生命周期的\"遗忘\"引擎。", - "知识检索:提供语义、分词和混合三种检索模式,并支持多种参数微调和结果重排序,以提升召回效果。", - "全局管理:支持统一设置默认检索参数,并可一键应用到所有知识库。", - "测试与调试:内置\"召回测试\"功能,方便用户实时验证检索效果并调整参数,支持通过分享码与他人协作。", - "记忆洞察:可查看详细的对话记录、用户画像和分析报告,帮助理解AI的\"记忆\"内容。", - "集成与管理:提供API Key用于系统集成,并包含基本的用户管理功能。", - "界面与体验:采用现代化的卡片式布局和渐变色设计,注重交互的流畅性和视觉美感。", - "起步与使用:文档中提供了清晰的基础使用流程,引导用户从创建空间、配置记忆到测试检索快速上手。", - "版本说明与限制: 记忆熊 v0.1.0 版本\"初心\"囊括智能记忆管理的核心思路和基础能力,为后续开发奠定了基础。", - "文档资源:用户手册、API文档、FAQ", - "问题反馈:GitHub Issues、邮件支持", - "致谢:感谢所有参与测试和提供反馈的用户!" - ] + "introduction": { + "codeName": "初心", + "releaseDate": "2025-12-01", + "upgradePosition": "这是一款专注于管理和利用AI记忆的工具,支持RAG和知识图谱两种主流存储方式,旨在为AI应用提供持久化、结构化的\"记忆\"能力。", + "coreUpgrades": [ + "记忆空间:用户可以创建独立的空间来隔离不同记忆,并灵活选择存储方式。", + "记忆配置:简化了配置流程,内置自动提取关键信息的\"记忆萃取\"和管理生命周期的\"遗忘\"引擎。", + "知识检索:提供语义、分词和混合三种检索模式,并支持多种参数微调和结果重排序,以提升召回效果。", + "全局管理:支持统一设置默认检索参数,并可一键应用到所有知识库。", + "测试与调试:内置\"召回测试\"功能,方便用户实时验证检索效果并调整参数,支持通过分享码与他人协作。", + "记忆洞察:可查看详细的对话记录、用户画像和分析报告,帮助理解AI的\"记忆\"内容。", + "集成与管理:提供API Key用于系统集成,并包含基本的用户管理功能。", + "界面与体验:采用现代化的卡片式布局和渐变色设计,注重交互的流畅性和视觉美感。", + "起步与使用:文档中提供了清晰的基础使用流程,引导用户从创建空间、配置记忆到测试检索快速上手。", + "版本说明与限制: 记忆熊 v0.1.0 版本\"初心\"囊括智能记忆管理的核心思路和基础能力,为后续开发奠定了基础。", + "文档资源:用户手册、API文档、FAQ", + "问题反馈:GitHub Issues、邮件支持", + "致谢:感谢所有参与测试和提供反馈的用户!" + ] + }, + "introduction_en": { + "codeName": "Original Intent", + "releaseDate": "2025-12-01", + "upgradePosition": "A tool focused on managing and utilizing AI memory, supporting both RAG and knowledge graph storage methods, aiming to provide persistent and structured 'memory' capabilities for AI applications.", + "coreUpgrades": [ + "Memory Space: Users can create independent spaces to isolate different memories and flexibly choose storage methods.", + "Memory Configuration: Simplified configuration process with built-in 'memory extraction' for automatic key information extraction and 'forgetting' engine for lifecycle management.", + "Knowledge Retrieval: Provides semantic, tokenization, and hybrid retrieval modes with various parameter tuning and result reranking to improve recall.", + "Global Management: Supports unified default retrieval parameter settings with one-click application to all knowledge bases.", + "Testing & Debugging: Built-in 'recall testing' for real-time verification of retrieval effects and parameter adjustment, with sharing code support for collaboration.", + "Memory Insights: View detailed conversation records, user profiles, and analysis reports to understand AI 'memory' content.", + "Integration & Management: Provides API Key for system integration with basic user management features.", + "Interface & Experience: Modern card-based layout with gradient design, focusing on interaction fluidity and visual aesthetics.", + "Getting Started: Documentation provides clear basic usage flow, guiding users from creating spaces, configuring memory to testing retrieval.", + "Version Notes: MemoryBear v0.1.0 'Original Intent' encompasses core concepts and basic capabilities of intelligent memory management, laying foundation for future development.", + "Documentation: User Manual, API Documentation, FAQ", + "Feedback: GitHub Issues, Email Support", + "Acknowledgments: Thanks to all users who participated in testing and provided feedback!" + ] + } } -} \ No newline at end of file +} diff --git a/api/migrations/versions/1fd7d0e703b3_20260116.py b/api/migrations/versions/1fd7d0e703b3_20260116.py new file mode 100644 index 00000000..0b1453cb --- /dev/null +++ b/api/migrations/versions/1fd7d0e703b3_20260116.py @@ -0,0 +1,62 @@ +"""20260116 + +Revision ID: 1fd7d0e703b3 +Revises: 9ab9b6393f32 +Create Date: 2026-01-16 13:17:37.060026 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '1fd7d0e703b3' +down_revision: Union[str, None] = '9ab9b6393f32' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('emotion_suggestions_cache', + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('end_user_id', sa.String(length=255), nullable=False, comment='终端用户ID(组ID)'), + sa.Column('health_summary', sa.Text(), nullable=False, comment='健康状态摘要'), + sa.Column('suggestions', sa.JSON(), nullable=False, comment='建议列表(JSON格式)'), + sa.Column('generated_at', sa.DateTime(), nullable=False, comment='生成时间'), + sa.Column('expires_at', sa.DateTime(), nullable=True, comment='过期时间'), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_emotion_suggestions_cache_end_user_id'), 'emotion_suggestions_cache', ['end_user_id'], unique=True) + op.create_index(op.f('ix_emotion_suggestions_cache_id'), 'emotion_suggestions_cache', ['id'], unique=False) + op.create_table('implicit_memory_cache', + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('end_user_id', sa.String(length=255), nullable=False, comment='终端用户ID'), + sa.Column('preferences', sa.JSON(), nullable=False, comment='偏好标签列表(JSON格式)'), + sa.Column('portrait', sa.JSON(), nullable=False, comment='四维画像对象(JSON格式)'), + sa.Column('interest_areas', sa.JSON(), nullable=False, comment='兴趣领域分布对象(JSON格式)'), + sa.Column('habits', sa.JSON(), nullable=False, comment='行为习惯列表(JSON格式)'), + sa.Column('generated_at', sa.DateTime(), nullable=False, comment='生成时间'), + sa.Column('expires_at', sa.DateTime(), nullable=True, comment='过期时间'), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_implicit_memory_cache_end_user_id'), 'implicit_memory_cache', ['end_user_id'], unique=True) + op.create_index(op.f('ix_implicit_memory_cache_id'), 'implicit_memory_cache', ['id'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_implicit_memory_cache_id'), table_name='implicit_memory_cache') + op.drop_index(op.f('ix_implicit_memory_cache_end_user_id'), table_name='implicit_memory_cache') + op.drop_table('implicit_memory_cache') + op.drop_index(op.f('ix_emotion_suggestions_cache_id'), table_name='emotion_suggestions_cache') + op.drop_index(op.f('ix_emotion_suggestions_cache_end_user_id'), table_name='emotion_suggestions_cache') + op.drop_table('emotion_suggestions_cache') + # ### end Alembic commands ### diff --git a/api/migrations/versions/8cd790908f92_202601191615.py b/api/migrations/versions/8cd790908f92_202601191615.py new file mode 100644 index 00000000..8e4624ee --- /dev/null +++ b/api/migrations/versions/8cd790908f92_202601191615.py @@ -0,0 +1,34 @@ +"""202601191615 + +Revision ID: 8cd790908f92 +Revises: 1fd7d0e703b3 +Create Date: 2026-01-19 16:15:35.058649 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '8cd790908f92' +down_revision: Union[str, None] = '1fd7d0e703b3' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('workflow_executions', sa.Column('release_id', sa.UUID(), nullable=True)) + op.create_index(op.f('ix_workflow_executions_release_id'), 'workflow_executions', ['release_id'], unique=False) + op.create_foreign_key(None, 'workflow_executions', 'app_releases', ['release_id'], ['id'], ondelete='CASCADE') + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint(None, 'workflow_executions', type_='foreignkey') + op.drop_index(op.f('ix_workflow_executions_release_id'), table_name='workflow_executions') + op.drop_column('workflow_executions', 'release_id') + # ### end Alembic commands ### diff --git a/web/src/api/common.ts b/web/src/api/common.ts index 01568640..2f6033d1 100644 --- a/web/src/api/common.ts +++ b/web/src/api/common.ts @@ -31,6 +31,12 @@ export interface versionResponse{ coreUpgrades: string[]; codeName: string; }; + introduction_en?: { + releaseDate: string; + upgradePosition: string; + coreUpgrades: string[]; + codeName: string; + }; } // 首页数据统计 export const getDashboardData = `/home-page/workspaces` diff --git a/web/src/assets/images/workflow/deleteBg.svg b/web/src/assets/images/workflow/deleteBg.svg new file mode 100644 index 00000000..f3827fef --- /dev/null +++ b/web/src/assets/images/workflow/deleteBg.svg @@ -0,0 +1,21 @@ + + + 编组 33 + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/web/src/assets/images/workflow/deleteBg_hover.svg b/web/src/assets/images/workflow/deleteBg_hover.svg new file mode 100644 index 00000000..9e92cf75 --- /dev/null +++ b/web/src/assets/images/workflow/deleteBg_hover.svg @@ -0,0 +1,22 @@ + + + 编组 33 + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/web/src/assets/images/workflow/delete_cycle.svg b/web/src/assets/images/workflow/delete_cycle.svg new file mode 100644 index 00000000..0d85650d --- /dev/null +++ b/web/src/assets/images/workflow/delete_cycle.svg @@ -0,0 +1,18 @@ + + + 编组 33 + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/web/src/assets/images/workflow/recall.svg b/web/src/assets/images/workflow/recall.svg new file mode 100644 index 00000000..756f9060 --- /dev/null +++ b/web/src/assets/images/workflow/recall.svg @@ -0,0 +1,18 @@ + + + 召回 + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/web/src/assets/images/workflow/recall_hover.svg b/web/src/assets/images/workflow/recall_hover.svg new file mode 100644 index 00000000..a2e949a0 --- /dev/null +++ b/web/src/assets/images/workflow/recall_hover.svg @@ -0,0 +1,18 @@ + + + 召回 + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/web/src/components/FormItem/DescWrapper.tsx b/web/src/components/FormItem/DescWrapper.tsx new file mode 100644 index 00000000..300fc2b6 --- /dev/null +++ b/web/src/components/FormItem/DescWrapper.tsx @@ -0,0 +1,12 @@ +import clsx from "clsx"; +import type { FC, ReactNode } from "react"; + +const DescWrapper: FC<{desc: string | ReactNode, className?: string}> = ({desc, className}) => { + return ( +
+ {desc} +
+ ) +} + +export default DescWrapper \ No newline at end of file diff --git a/web/src/components/FormItem/LabelWrapper.tsx b/web/src/components/FormItem/LabelWrapper.tsx new file mode 100644 index 00000000..461250d8 --- /dev/null +++ b/web/src/components/FormItem/LabelWrapper.tsx @@ -0,0 +1,13 @@ +import clsx from "clsx"; +import type { FC, ReactNode } from "react"; + +const LabelWrapper: FC<{ title: string | ReactNode, className?: string; children?: ReactNode}> = ({title, className, children}) => { + return ( +
+
{title}
+ {children} +
+ ) +} + +export default LabelWrapper \ No newline at end of file diff --git a/web/src/components/FormItem/SwitchFormItem.tsx b/web/src/components/FormItem/SwitchFormItem.tsx new file mode 100644 index 00000000..e17a8728 --- /dev/null +++ b/web/src/components/FormItem/SwitchFormItem.tsx @@ -0,0 +1,45 @@ +import { Switch, Form, ConfigProvider } from "antd"; +import useSize from 'antd/lib/config-provider/hooks/useSize' +import type { FC, ReactNode } from "react"; +import { useContext } from "react"; + +import LabelWrapper from './LabelWrapper' +import DescWrapper from './DescWrapper' + +interface SwitchFormItemProps { + title: string | ReactNode; + desc?: string | ReactNode; + name: string | string[]; + size?: 'small' | 'default' + className?: string; + disabled?: boolean; +} + +const SwitchFormItem: FC = ({ + title, + desc, + name, + size = 'default', + className, + disabled +}) => { + const componentSize = useSize() + console.log('componentSize', componentSize) + + return ( +
+ + {desc && } + + + + +
+ ) +} + +export default SwitchFormItem \ No newline at end of file diff --git a/web/src/components/Header/index.tsx b/web/src/components/Header/index.tsx index 9aeeab6b..fac432f5 100644 --- a/web/src/components/Header/index.tsx +++ b/web/src/components/Header/index.tsx @@ -54,7 +54,7 @@ const AppHeader: FC<{source?: 'space' | 'manage';}> = ({source = 'manage'}) => { key: '1', label: (<>
{user.username}
-
{user.email}
+
{user.email}
), }, { diff --git a/web/src/components/Layout/AuthLayout.tsx b/web/src/components/Layout/AuthLayout.tsx index 94d28e11..a969298d 100644 --- a/web/src/components/Layout/AuthLayout.tsx +++ b/web/src/components/Layout/AuthLayout.tsx @@ -13,7 +13,7 @@ const { Content } = Layout; // 认证布局组件,使用useRouteGuard hook进行路由鉴权 const AuthLayout: FC = () => { - const { getUserInfo, getStorageType } = useUser(); + const { getUserInfo } = useUser(); // 使用路由守卫hook处理认证和权限检查 useRouteGuard('manage'); // 自动更新面包屑导航 @@ -24,7 +24,6 @@ const AuthLayout: FC = () => { window.location.href = `/#/login`; } else { getUserInfo() - getStorageType() } }, []); diff --git a/web/src/components/Markdown/index.tsx b/web/src/components/Markdown/index.tsx index d16b72e4..58650207 100644 --- a/web/src/components/Markdown/index.tsx +++ b/web/src/components/Markdown/index.tsx @@ -150,9 +150,19 @@ const RbMarkdown: FC = ({ ) } + // 处理键盘快捷键 + const handleKeyDown = (e: React.KeyboardEvent) => { + if ((e.ctrlKey || e.metaKey) && e.key === 'c') { + const selection = window.getSelection() + if (selection && selection.toString()) { + navigator.clipboard.writeText(selection.toString()) + } + } + } + // 预览模式 return ( -
+