Merge branch 'refs/heads/develop' into fix/memory_mcp2_1

# Conflicts:
#	api/app/services/memory_agent_service.py
This commit is contained in:
lixinyue
2026-01-19 11:51:16 +08:00
80 changed files with 2243 additions and 805 deletions

View File

@@ -11,15 +11,16 @@ from app.core.response_utils import success
from app.db import get_db
from app.dependencies import get_current_user, cur_workspace_access_guard
from app.models import User
from app.models.app_model import AppType, App
from app.models.app_model import AppType
from app.repositories import knowledge_repository
from app.repositories.end_user_repository import EndUserRepository
from app.schemas import app_schema
from app.schemas.response_schema import PageData, PageMeta
from app.schemas.workflow_schema import WorkflowConfig as WorkflowConfigSchema
from app.schemas.workflow_schema import WorkflowConfigUpdate
from app.services import app_service, workspace_service
from app.services.agent_config_helper import enrich_agent_config
from app.services.app_service import AppService
from app.schemas.workflow_schema import WorkflowConfig as WorkflowConfigSchema
from app.services.workflow_service import WorkflowService, get_workflow_service
router = APIRouter(prefix="/apps", tags=["Apps"])
@@ -405,6 +406,15 @@ async def draft_run(
# 只读操作,允许访问共享应用
service._validate_app_accessible(app, workspace_id)
if payload.user_id is None:
end_user_repo = EndUserRepository(db)
new_end_user = end_user_repo.get_or_create_end_user(
app_id=app_id,
other_id=str(current_user.id),
original_user_id=str(current_user.id) # Save original user_id to other_id
)
payload.user_id = str(new_end_user.id)
# 处理会话ID创建或验证
conversation_id = await draft_service._ensure_conversation(
conversation_id=payload.conversation_id,

View File

@@ -18,6 +18,7 @@ from app.models.user_model import User
from app.schemas.emotion_schema import (
EmotionHealthRequest,
EmotionSuggestionsRequest,
EmotionGenerateSuggestionsRequest,
EmotionTagsRequest,
EmotionWordcloudRequest,
)
@@ -58,7 +59,7 @@ async def get_emotion_tags(
"limit": request.limit
}
)
# 调用服务层
data = await emotion_service.get_emotion_tags(
end_user_id=request.group_id,
@@ -67,7 +68,7 @@ async def get_emotion_tags(
end_date=request.end_date,
limit=request.limit
)
api_logger.info(
"情绪标签统计获取成功",
extra={
@@ -76,9 +77,9 @@ async def get_emotion_tags(
"tags_count": len(data.get("tags", []))
}
)
return success(data=data, msg="情绪标签获取成功")
except Exception as e:
api_logger.error(
f"获取情绪标签统计失败: {str(e)}",
@@ -107,14 +108,14 @@ async def get_emotion_wordcloud(
"limit": request.limit
}
)
# 调用服务层
data = await emotion_service.get_emotion_wordcloud(
end_user_id=request.group_id,
emotion_type=request.emotion_type,
limit=request.limit
)
api_logger.info(
"情绪词云数据获取成功",
extra={
@@ -122,9 +123,9 @@ async def get_emotion_wordcloud(
"total_keywords": data.get("total_keywords", 0)
}
)
return success(data=data, msg="情绪词云获取成功")
except Exception as e:
api_logger.error(
f"获取情绪词云数据失败: {str(e)}",
@@ -151,7 +152,7 @@ async def get_emotion_health(
status_code=status.HTTP_400_BAD_REQUEST,
detail="时间范围参数无效,必须是 7d、30d 或 90d"
)
api_logger.info(
f"用户 {current_user.username} 请求获取情绪健康指数",
extra={
@@ -159,13 +160,13 @@ async def get_emotion_health(
"time_range": request.time_range
}
)
# 调用服务层
data = await emotion_service.calculate_emotion_health_index(
end_user_id=request.group_id,
time_range=request.time_range
)
api_logger.info(
"情绪健康指数获取成功",
extra={
@@ -174,9 +175,9 @@ async def get_emotion_health(
"level": data.get("level", "未知")
}
)
return success(data=data, msg="情绪健康指数获取成功")
except HTTPException:
raise
except Exception as e:
@@ -198,15 +199,80 @@ async def get_emotion_suggestions(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""获取个性化情绪建议
"""获取个性化情绪建议(从缓存读取)
Args:
request: 包含 group_id 和可选的 config_id
db: 数据库会话
current_user: 当前用户
Returns:
个性化情绪建议响应
缓存的个性化情绪建议响应
"""
try:
api_logger.info(
f"用户 {current_user.username} 请求获取个性化情绪建议(缓存)",
extra={
"group_id": request.group_id,
"config_id": request.config_id
}
)
# 从缓存获取建议
data = await emotion_service.get_cached_suggestions(
end_user_id=request.group_id,
db=db
)
if data is None:
# 缓存不存在或已过期
api_logger.info(
f"用户 {request.group_id} 的建议缓存不存在或已过期",
extra={"group_id": request.group_id}
)
return fail(
BizCode.RESOURCE_NOT_FOUND,
"建议缓存不存在或已过期,请调用 /generate_suggestions 接口生成新建议",
None
)
api_logger.info(
"个性化建议获取成功(缓存)",
extra={
"group_id": request.group_id,
"suggestions_count": len(data.get("suggestions", []))
}
)
return success(data=data, msg="个性化建议获取成功(缓存)")
except Exception as e:
api_logger.error(
f"获取个性化建议失败: {str(e)}",
extra={"group_id": request.group_id},
exc_info=True
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取个性化建议失败: {str(e)}"
)
@router.post("/generate_suggestions", response_model=ApiResponse)
async def generate_emotion_suggestions(
request: EmotionGenerateSuggestionsRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""生成个性化情绪建议调用LLM并缓存
Args:
request: 包含 group_id、可选的 config_id 和 force_refresh
db: 数据库会话
current_user: 当前用户
Returns:
新生成的个性化情绪建议响应
"""
try:
# 验证 config_id如果提供
@@ -232,38 +298,46 @@ async def get_emotion_suggestions(
return fail(BizCode.INVALID_PARAMETER, "配置ID无效", f"配置 {config_id} 不存在")
except Exception as e:
return fail(BizCode.INVALID_PARAMETER, "配置ID验证失败", str(e))
api_logger.info(
f"用户 {current_user.username} 请求获取个性化情绪建议",
f"用户 {current_user.username} 请求生成个性化情绪建议",
extra={
"group_id": request.group_id,
"config_id": config_id
}
)
# 调用服务层
# 调用服务层生成建议
data = await emotion_service.generate_emotion_suggestions(
end_user_id=request.group_id,
db=db
)
# 保存到缓存
await emotion_service.save_suggestions_cache(
end_user_id=request.group_id,
suggestions_data=data,
db=db,
expires_hours=24
)
api_logger.info(
"个性化建议获取成功",
"个性化建议生成成功",
extra={
"group_id": request.group_id,
"suggestions_count": len(data.get("suggestions", []))
}
)
return success(data=data, msg="个性化建议获取成功")
return success(data=data, msg="个性化建议生成成功")
except Exception as e:
api_logger.error(
f"获取个性化建议失败: {str(e)}",
f"生成个性化建议失败: {str(e)}",
extra={"group_id": request.group_id},
exc_info=True
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取个性化建议失败: {str(e)}"
detail=f"生成个性化建议失败: {str(e)}"
)

View File

@@ -33,5 +33,12 @@ def get_workspace_list(
def get_system_version():
"""获取系统版本号+说明"""
current_version = settings.SYSTEM_VERSION
version_introduction = HomePageService.load_version_introduction(current_version)
return success(data={"version": current_version, "introduction": version_introduction}, msg="系统版本获取成功")
version_info = HomePageService.load_version_introduction(current_version)
return success(
data={
"version": current_version,
"introduction": version_info.get("introduction"),
"introduction_en": version_info.get("introduction_en")
},
msg="系统版本获取成功"
)

View File

@@ -11,6 +11,7 @@ from app.dependencies import (
)
from app.models.user_model import User
from app.schemas.response_schema import ApiResponse
from app.schemas.implicit_memory_schema import GenerateProfileRequest
from app.services.implicit_memory_service import ImplicitMemoryService
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
@@ -133,7 +134,7 @@ async def get_preference_tags(
current_user: User = Depends(get_current_user)
) -> ApiResponse:
"""
Get user preference tags with filtering options.
Get user preference tags from cache.
Args:
user_id: Target user ID
@@ -143,35 +144,56 @@ async def get_preference_tags(
end_date: Optional end date filter
Returns:
List of preference tags matching the filters
List of preference tags from cache
"""
api_logger.info(f"Preference tags requested for user: {user_id}")
api_logger.info(f"Preference tags requested for user: {user_id} (from cache)")
try:
# Validate inputs
validate_user_id(user_id)
validate_confidence_threshold(confidence_threshold)
validate_date_range(start_date, end_date)
# Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=user_id)
# Build date range
date_range = None
if start_date and end_date:
from app.schemas.implicit_memory_schema import DateRange
date_range = DateRange(start_date=start_date, end_date=end_date)
# Get cached profile
cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db)
# Get preference tags
tags = await service.get_preference_tags(
user_id=user_id,
confidence_threshold=confidence_threshold,
tag_category=tag_category,
date_range=date_range
)
if cached_profile is None:
api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.RESOURCE_NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
None
)
api_logger.info(f"Retrieved {len(tags)} preference tags for user: {user_id}")
return success(data=[tag.model_dump(mode='json') for tag in tags], msg="偏好标签获取成功")
# Extract preferences from cache
preferences = cached_profile.get("preferences", [])
# Apply filters (client-side filtering on cached data)
filtered_preferences = []
for pref in preferences:
# Filter by confidence threshold
if confidence_threshold is not None and pref.get("confidence_score", 0) < confidence_threshold:
continue
# Filter by category if specified
if tag_category and pref.get("category") != tag_category:
continue
# Filter by date range if specified
if start_date or end_date:
created_at_ts = pref.get("created_at")
if created_at_ts:
created_at = datetime.fromtimestamp(created_at_ts / 1000)
if start_date and created_at < start_date:
continue
if end_date and created_at > end_date:
continue
filtered_preferences.append(pref)
api_logger.info(f"Retrieved {len(filtered_preferences)} preference tags for user: {user_id} (from cache)")
return success(data=filtered_preferences, msg="偏好标签获取成功(缓存)")
except Exception as e:
return handle_implicit_memory_error(e, "偏好标签获取", user_id)
@@ -186,16 +208,16 @@ async def get_dimension_portrait(
current_user: User = Depends(get_current_user)
) -> ApiResponse:
"""
Get user's four-dimension personality portrait.
Get user's four-dimension personality portrait from cache.
Args:
user_id: Target user ID
include_history: Whether to include historical trend data
include_history: Whether to include historical trend data (ignored for cached data)
Returns:
Four-dimension personality portrait with scores and evidence
Four-dimension personality portrait from cache
"""
api_logger.info(f"Dimension portrait requested for user: {user_id}")
api_logger.info(f"Dimension portrait requested for user: {user_id} (from cache)")
try:
# Validate inputs
@@ -204,13 +226,22 @@ async def get_dimension_portrait(
# Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=user_id)
portrait = await service.get_dimension_portrait(
user_id=user_id,
include_history=include_history
)
# Get cached profile
cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db)
api_logger.info(f"Dimension portrait retrieved for user: {user_id}")
return success(data=portrait.model_dump(mode='json'), msg="四维画像获取成功")
if cached_profile is None:
api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.RESOURCE_NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
None
)
# Extract portrait from cache
portrait = cached_profile.get("portrait", {})
api_logger.info(f"Dimension portrait retrieved for user: {user_id} (from cache)")
return success(data=portrait, msg="四维画像获取成功(缓存)")
except Exception as e:
return handle_implicit_memory_error(e, "四维画像获取", user_id)
@@ -225,16 +256,16 @@ async def get_interest_area_distribution(
current_user: User = Depends(get_current_user)
) -> ApiResponse:
"""
Get user's interest area distribution across four areas.
Get user's interest area distribution from cache.
Args:
user_id: Target user ID
include_trends: Whether to include trend analysis data
include_trends: Whether to include trend analysis data (ignored for cached data)
Returns:
Interest area distribution with percentages and evidence
Interest area distribution from cache
"""
api_logger.info(f"Interest area distribution requested for user: {user_id}")
api_logger.info(f"Interest area distribution requested for user: {user_id} (from cache)")
try:
# Validate inputs
@@ -243,13 +274,22 @@ async def get_interest_area_distribution(
# Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=user_id)
distribution = await service.get_interest_area_distribution(
user_id=user_id,
include_trends=include_trends
)
# Get cached profile
cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db)
api_logger.info(f"Interest area distribution retrieved for user: {user_id}")
return success(data=distribution.model_dump(mode='json'), msg="兴趣领域分布获取成功")
if cached_profile is None:
api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.RESOURCE_NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
None
)
# Extract interest areas from cache
interest_areas = cached_profile.get("interest_areas", {})
api_logger.info(f"Interest area distribution retrieved for user: {user_id} (from cache)")
return success(data=interest_areas, msg="兴趣领域分布获取成功(缓存)")
except Exception as e:
return handle_implicit_memory_error(e, "兴趣领域分布获取", user_id)
@@ -266,7 +306,7 @@ async def get_behavior_habits(
current_user: User = Depends(get_current_user)
) -> ApiResponse:
"""
Get user's behavioral habits with filtering options.
Get user's behavioral habits from cache.
Args:
user_id: Target user ID
@@ -275,38 +315,117 @@ async def get_behavior_habits(
time_period: Filter by time period (current, past)
Returns:
List of behavioral habits matching the filters
List of behavioral habits from cache
"""
api_logger.info(f"Behavior habits requested for user: {user_id}")
api_logger.info(f"Behavior habits requested for user: {user_id} (from cache)")
try:
# Validate inputs
validate_user_id(user_id)
# Convert string confidence level to numerical
numerical_confidence = None
if confidence_level:
confidence_mapping = {
"high": 85,
"medium": 50,
"low": 20
}
numerical_confidence = confidence_mapping.get(confidence_level.lower())
# Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=user_id)
habits = await service.get_behavior_habits(
user_id=user_id,
confidence_level=numerical_confidence,
frequency_pattern=frequency_pattern,
time_period=time_period
)
# Get cached profile
cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db)
api_logger.info(f"Retrieved {len(habits)} behavior habits for user: {user_id}")
return success(data=[habit.model_dump(mode='json') for habit in habits], msg="行为习惯获取成功")
if cached_profile is None:
api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.RESOURCE_NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
None
)
# Extract habits from cache
habits = cached_profile.get("habits", [])
# Apply filters (client-side filtering on cached data)
filtered_habits = []
for habit in habits:
# Filter by confidence level
if confidence_level:
confidence_mapping = {
"high": 85,
"medium": 50,
"low": 20
}
numerical_confidence = confidence_mapping.get(confidence_level.lower())
if habit.get("confidence_level", 0) < numerical_confidence:
continue
# Filter by frequency pattern
if frequency_pattern and habit.get("frequency_pattern") != frequency_pattern:
continue
# Filter by time period
if time_period:
is_current = habit.get("is_current", True)
if time_period.lower() == "current" and not is_current:
continue
elif time_period.lower() == "past" and is_current:
continue
filtered_habits.append(habit)
api_logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user: {user_id} (from cache)")
return success(data=filtered_habits, msg="行为习惯获取成功(缓存)")
except Exception as e:
return handle_implicit_memory_error(e, "行为习惯获取", user_id)
@router.post("/generate_profile", response_model=ApiResponse)
@cur_workspace_access_guard()
async def generate_implicit_memory_profile(
request: GenerateProfileRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
) -> ApiResponse:
"""
Generate complete user profile (all 4 modules) and cache it.
Args:
request: Generate profile request with end_user_id
db: Database session
current_user: Current authenticated user
Returns:
Complete user profile with all modules
"""
end_user_id = request.end_user_id
api_logger.info(f"Generate profile requested for user: {end_user_id}")
try:
# Validate inputs
validate_user_id(end_user_id)
# Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
# Generate complete profile (calls LLM for all 4 modules)
api_logger.info(f"开始生成完整用户画像: user={end_user_id}")
profile_data = await service.generate_complete_profile(user_id=end_user_id)
# Save to cache
await service.save_profile_cache(
end_user_id=end_user_id,
profile_data=profile_data,
db=db,
expires_hours=168 # 7 days
)
api_logger.info(f"用户画像生成并缓存成功: user={end_user_id}")
# Add metadata
profile_data["end_user_id"] = end_user_id
profile_data["cached"] = False
return success(data=profile_data, msg="用户画像生成成功")
except Exception as e:
api_logger.error(f"生成用户画像失败: user={end_user_id}, error={str(e)}", exc_info=True)
return handle_implicit_memory_error(e, "用户画像生成", end_user_id)

View File

@@ -74,7 +74,7 @@ def get_multi_agent_configs(
"app_id": str(app_id),
"default_model_config_id": None,
"model_parameters": None,
"orchestration_mode": "conditional",
"orchestration_mode": "supervisor",
"sub_agents": [],
"routing_rules": [],
"execution_config": {

View File

@@ -466,7 +466,7 @@ async def chat(
conversation_id=conversation.id, # 使用已创建的会话 ID
user_id=str(new_end_user.id), # 转换为字符串
variables=payload.variables,
config= payload.agent_config,
config=agent_config,
web_search=payload.web_search,
memory=payload.memory,
storage_type=storage_type,
@@ -565,11 +565,12 @@ async def chat(
config = workflow_config_4_app_release(release)
if payload.stream:
async def event_generator():
async for event in app_chat_service.workflow_chat_stream(
message=payload.message,
conversation_id=conversation.id, # 使用已创建的会话 ID
user_id=new_end_user.id, # 转换为字符串
user_id=end_user_id, # 转换为字符串
variables=payload.variables,
config=config,
web_search=payload.web_search,
@@ -601,7 +602,7 @@ async def chat(
message=payload.message,
conversation_id=conversation.id, # 使用已创建的会话 ID
user_id=new_end_user.id, # 转换为字符串
user_id=end_user_id, # 转换为字符串
variables=payload.variables,
config=config,
web_search=payload.web_search,

View File

@@ -110,24 +110,24 @@ HTTP_MAPPING = {
BizCode.TOKEN_EXPIRED: 401,
BizCode.TOKEN_BLACKLISTED: 401,
BizCode.FORBIDDEN: 403,
BizCode.TENANT_NOT_FOUND: 404,
BizCode.TENANT_NOT_FOUND: 400,
BizCode.WORKSPACE_NO_ACCESS: 403,
BizCode.NOT_FOUND: 404,
BizCode.NOT_FOUND: 400,
BizCode.USER_NOT_FOUND: 200,
BizCode.WORKSPACE_NOT_FOUND: 404,
BizCode.MODEL_NOT_FOUND: 404,
BizCode.KNOWLEDGE_NOT_FOUND: 404,
BizCode.DOCUMENT_NOT_FOUND: 404,
BizCode.FILE_NOT_FOUND: 404,
BizCode.APP_NOT_FOUND: 404,
BizCode.RELEASE_NOT_FOUND: 404,
BizCode.WORKSPACE_NOT_FOUND: 400,
BizCode.MODEL_NOT_FOUND: 400,
BizCode.KNOWLEDGE_NOT_FOUND: 400,
BizCode.DOCUMENT_NOT_FOUND: 400,
BizCode.FILE_NOT_FOUND: 400,
BizCode.APP_NOT_FOUND: 400,
BizCode.RELEASE_NOT_FOUND: 400,
BizCode.DUPLICATE_NAME: 409,
BizCode.RESOURCE_ALREADY_EXISTS: 409,
BizCode.VERSION_ALREADY_EXISTS: 409,
BizCode.STATE_CONFLICT: 409,
BizCode.PUBLISH_FAILED: 500,
BizCode.NO_DRAFT_TO_PUBLISH: 400,
BizCode.ROLLBACK_TARGET_NOT_FOUND: 404,
BizCode.ROLLBACK_TARGET_NOT_FOUND: 400,
BizCode.APP_TYPE_NOT_SUPPORTED: 400,
BizCode.AGENT_CONFIG_MISSING: 400,
BizCode.SHARE_DISABLED: 403,

View File

@@ -96,10 +96,7 @@ class SimpleMCPClient:
"""初始化 SSE MCP 会话 - 参考 Dify 实现"""
try:
# 建立 SSE 连接
response = await self._session.get(
self.server_url,
headers={"Accept": "text/event-stream"}
)
response = await self._session.get(self.server_url)
if response.status != 200:
error_text = await response.text()

View File

@@ -27,6 +27,8 @@ from .tool_model import (
ToolExecution, ToolType, ToolStatus, AuthType, ExecutionStatus
)
from .memory_perceptual_model import MemoryPerceptualModel
from .emotion_suggestions_cache_model import EmotionSuggestionsCache
from .implicit_memory_cache_model import ImplicitMemoryCache
__all__ = [
"Tenants",
@@ -76,5 +78,7 @@ __all__ = [
"ToolStatus",
"AuthType",
"ExecutionStatus",
"MemoryPerceptualModel"
"MemoryPerceptualModel",
"EmotionSuggestionsCache",
"ImplicitMemoryCache"
]

View File

@@ -0,0 +1,24 @@
"""情绪建议缓存模型"""
import uuid
import datetime
from sqlalchemy import Column, String, Text, Integer, DateTime, JSON
from sqlalchemy.dialects.postgresql import UUID
from app.db import Base
class EmotionSuggestionsCache(Base):
"""情绪建议缓存表
用于缓存个性化情绪建议,减少 LLM 调用成本,提升响应速度。
"""
__tablename__ = "emotion_suggestions_cache"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID组ID")
health_summary = Column(Text, nullable=False, comment="健康状态摘要")
suggestions = Column(JSON, nullable=False, comment="建议列表JSON格式")
generated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, comment="生成时间")
expires_at = Column(DateTime, nullable=True, comment="过期时间")
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now)

View File

@@ -0,0 +1,27 @@
"""隐性记忆缓存模型"""
import uuid
import datetime
from sqlalchemy import Column, String, Integer, DateTime, JSON
from sqlalchemy.dialects.postgresql import UUID
from app.db import Base
class ImplicitMemoryCache(Base):
"""隐性记忆缓存表
用于缓存用户的完整隐性记忆画像,包括偏好标签、四维画像、兴趣领域和行为习惯。
减少 LLM 调用成本,提升响应速度。
"""
__tablename__ = "implicit_memory_cache"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID")
preferences = Column(JSON, nullable=False, comment="偏好标签列表JSON格式")
portrait = Column(JSON, nullable=False, comment="四维画像对象JSON格式")
interest_areas = Column(JSON, nullable=False, comment="兴趣领域分布对象JSON格式")
habits = Column(JSON, nullable=False, comment="行为习惯列表JSON格式")
generated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, comment="生成时间")
expires_at = Column(DateTime, nullable=True, comment="过期时间")
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now)

View File

@@ -81,7 +81,7 @@ class DataConfigRepository:
n.description AS description,
n.entity_type AS entity_type,
n.name AS name,
n.fact_summary AS fact_summary,
COALESCE(n.fact_summary, '') AS fact_summary,
n.group_id AS group_id,
n.apply_id AS apply_id,
n.user_id AS user_id,
@@ -115,7 +115,7 @@ class DataConfigRepository:
description: n.description,
entity_type: n.entity_type,
name: n.name,
fact_summary: n.fact_summary,
fact_summary: COALESCE(n.fact_summary, ''),
id: n.id
} AS sourceNode,
{
@@ -132,7 +132,7 @@ class DataConfigRepository:
description: m.description,
entity_type: m.entity_type,
name: m.name,
fact_summary: m.fact_summary,
fact_summary: COALESCE(m.fact_summary, ''),
id: m.id
} AS targetNode
"""

View File

@@ -0,0 +1,163 @@
"""情绪建议缓存仓储层"""
from sqlalchemy.orm import Session
from typing import Optional, Dict, Any
import datetime
from app.models.emotion_suggestions_cache_model import EmotionSuggestionsCache
from app.core.logging_config import get_db_logger
# 获取数据库专用日志器
db_logger = get_db_logger()
class EmotionSuggestionsCacheRepository:
"""情绪建议缓存仓储类"""
def __init__(self, db: Session):
self.db = db
def get_by_end_user_id(self, end_user_id: str) -> Optional[EmotionSuggestionsCache]:
"""根据终端用户ID获取缓存
Args:
end_user_id: 终端用户ID组ID
Returns:
缓存记录,如果不存在返回 None
"""
try:
cache = (
self.db.query(EmotionSuggestionsCache)
.filter(EmotionSuggestionsCache.end_user_id == end_user_id)
.first()
)
if cache:
db_logger.info(f"成功获取用户 {end_user_id} 的情绪建议缓存")
else:
db_logger.info(f"用户 {end_user_id} 的情绪建议缓存不存在")
return cache
except Exception as e:
db_logger.error(f"获取用户 {end_user_id} 的情绪建议缓存失败: {str(e)}")
raise
def create_or_update(
self,
end_user_id: str,
health_summary: str,
suggestions: list,
expires_hours: int = 24
) -> EmotionSuggestionsCache:
"""创建或更新缓存
Args:
end_user_id: 终端用户ID组ID
health_summary: 健康状态摘要
suggestions: 建议列表
expires_hours: 过期时间小时默认24小时
Returns:
缓存记录
"""
try:
# 查找现有记录
cache = self.get_by_end_user_id(end_user_id)
now = datetime.datetime.now()
expires_at = now + datetime.timedelta(hours=expires_hours)
if cache:
# 更新现有记录
cache.health_summary = health_summary
cache.suggestions = suggestions
cache.generated_at = now
cache.expires_at = expires_at
cache.updated_at = now
db_logger.info(f"更新用户 {end_user_id} 的情绪建议缓存")
else:
# 创建新记录
cache = EmotionSuggestionsCache(
end_user_id=end_user_id,
health_summary=health_summary,
suggestions=suggestions,
generated_at=now,
expires_at=expires_at,
created_at=now,
updated_at=now
)
self.db.add(cache)
db_logger.info(f"创建用户 {end_user_id} 的情绪建议缓存")
self.db.commit()
self.db.refresh(cache)
return cache
except Exception as e:
self.db.rollback()
db_logger.error(f"创建或更新用户 {end_user_id} 的情绪建议缓存失败: {str(e)}")
raise
def delete_by_end_user_id(self, end_user_id: str) -> bool:
"""删除缓存
Args:
end_user_id: 终端用户ID组ID
Returns:
是否删除成功
"""
try:
cache = self.get_by_end_user_id(end_user_id)
if cache:
self.db.delete(cache)
self.db.commit()
db_logger.info(f"删除用户 {end_user_id} 的情绪建议缓存")
return True
return False
except Exception as e:
self.db.rollback()
db_logger.error(f"删除用户 {end_user_id} 的情绪建议缓存失败: {str(e)}")
raise
@staticmethod
def is_expired(cache: EmotionSuggestionsCache) -> bool:
"""检查缓存是否过期
Args:
cache: 缓存记录
Returns:
是否过期
"""
if cache.expires_at is None:
return False
return datetime.datetime.now() > cache.expires_at
# 便捷函数
def get_cache_by_end_user_id(db: Session, end_user_id: str) -> Optional[EmotionSuggestionsCache]:
"""根据终端用户ID获取缓存"""
repo = EmotionSuggestionsCacheRepository(db)
return repo.get_by_end_user_id(end_user_id)
def create_or_update_cache(
db: Session,
end_user_id: str,
health_summary: str,
suggestions: list,
expires_hours: int = 24
) -> EmotionSuggestionsCache:
"""创建或更新缓存"""
repo = EmotionSuggestionsCacheRepository(db)
return repo.create_or_update(end_user_id, health_summary, suggestions, expires_hours)
def delete_cache_by_end_user_id(db: Session, end_user_id: str) -> bool:
"""删除缓存"""
repo = EmotionSuggestionsCacheRepository(db)
return repo.delete_by_end_user_id(end_user_id)
def is_cache_expired(cache: EmotionSuggestionsCache) -> bool:
"""检查缓存是否过期"""
return EmotionSuggestionsCacheRepository.is_expired(cache)

View File

@@ -0,0 +1,175 @@
"""隐性记忆缓存仓储层"""
from sqlalchemy.orm import Session
from typing import Optional, Dict, Any
import datetime
from app.models.implicit_memory_cache_model import ImplicitMemoryCache
from app.core.logging_config import get_db_logger
# 获取数据库专用日志器
db_logger = get_db_logger()
class ImplicitMemoryCacheRepository:
"""隐性记忆缓存仓储类"""
def __init__(self, db: Session):
self.db = db
def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitMemoryCache]:
"""根据终端用户ID获取缓存
Args:
end_user_id: 终端用户ID
Returns:
缓存记录,如果不存在返回 None
"""
try:
cache = (
self.db.query(ImplicitMemoryCache)
.filter(ImplicitMemoryCache.end_user_id == end_user_id)
.first()
)
if cache:
db_logger.info(f"成功获取用户 {end_user_id} 的隐性记忆缓存")
else:
db_logger.info(f"用户 {end_user_id} 的隐性记忆缓存不存在")
return cache
except Exception as e:
db_logger.error(f"获取用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}")
raise
def create_or_update(
self,
end_user_id: str,
preferences: list,
portrait: dict,
interest_areas: dict,
habits: list,
expires_hours: int = 168 # 默认7天
) -> ImplicitMemoryCache:
"""创建或更新缓存
Args:
end_user_id: 终端用户ID
preferences: 偏好标签列表
portrait: 四维画像对象
interest_areas: 兴趣领域分布对象
habits: 行为习惯列表
expires_hours: 过期时间小时默认168小时7天
Returns:
缓存记录
"""
try:
# 查找现有记录
cache = self.get_by_end_user_id(end_user_id)
now = datetime.datetime.now()
expires_at = now + datetime.timedelta(hours=expires_hours)
if cache:
# 更新现有记录
cache.preferences = preferences
cache.portrait = portrait
cache.interest_areas = interest_areas
cache.habits = habits
cache.generated_at = now
cache.expires_at = expires_at
cache.updated_at = now
db_logger.info(f"更新用户 {end_user_id} 的隐性记忆缓存")
else:
# 创建新记录
cache = ImplicitMemoryCache(
end_user_id=end_user_id,
preferences=preferences,
portrait=portrait,
interest_areas=interest_areas,
habits=habits,
generated_at=now,
expires_at=expires_at,
created_at=now,
updated_at=now
)
self.db.add(cache)
db_logger.info(f"创建用户 {end_user_id} 的隐性记忆缓存")
self.db.commit()
self.db.refresh(cache)
return cache
except Exception as e:
self.db.rollback()
db_logger.error(f"创建或更新用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}")
raise
def delete_by_end_user_id(self, end_user_id: str) -> bool:
"""删除缓存
Args:
end_user_id: 终端用户ID
Returns:
是否删除成功
"""
try:
cache = self.get_by_end_user_id(end_user_id)
if cache:
self.db.delete(cache)
self.db.commit()
db_logger.info(f"删除用户 {end_user_id} 的隐性记忆缓存")
return True
return False
except Exception as e:
self.db.rollback()
db_logger.error(f"删除用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}")
raise
@staticmethod
def is_expired(cache: ImplicitMemoryCache) -> bool:
"""检查缓存是否过期
Args:
cache: 缓存记录
Returns:
是否过期
"""
if cache.expires_at is None:
return False
return datetime.datetime.now() > cache.expires_at
# 便捷函数
def get_cache_by_end_user_id(db: Session, end_user_id: str) -> Optional[ImplicitMemoryCache]:
"""根据终端用户ID获取缓存"""
repo = ImplicitMemoryCacheRepository(db)
return repo.get_by_end_user_id(end_user_id)
def create_or_update_cache(
db: Session,
end_user_id: str,
preferences: list,
portrait: dict,
interest_areas: dict,
habits: list,
expires_hours: int = 168
) -> ImplicitMemoryCache:
"""创建或更新缓存"""
repo = ImplicitMemoryCacheRepository(db)
return repo.create_or_update(
end_user_id, preferences, portrait, interest_areas, habits, expires_hours
)
def delete_cache_by_end_user_id(db: Session, end_user_id: str) -> bool:
"""删除缓存"""
repo = ImplicitMemoryCacheRepository(db)
return repo.delete_by_end_user_id(end_user_id)
def is_cache_expired(cache: ImplicitMemoryCache) -> bool:
"""检查缓存是否过期"""
return ImplicitMemoryCacheRepository.is_expired(cache)

View File

@@ -25,7 +25,7 @@ class MemoryIncrementRepository:
MemoryIncrement,
func.row_number().over(
partition_by=func.date(MemoryIncrement.created_at), # 按日期分区
order_by=MemoryIncrement.created_at.desc() # 按时间戳序排序
order_by=MemoryIncrement.created_at.desc() # 按时间戳序排序,取每天最新的
).label('row_num')
)
.filter(MemoryIncrement.workspace_id == workspace_id)
@@ -34,14 +34,24 @@ class MemoryIncrementRepository:
memory_increment_alias = aliased(MemoryIncrement, subquery)
memory_increments = (
# 先取最近的limit条记录的子查询
recent_records_subquery = (
self.db.query(memory_increment_alias)
.filter(subquery.c.row_num == 1) # 只取每个日期的第一条(最新的)
.order_by(memory_increment_alias.created_at.asc()) # 按时间戳降序排序
.order_by(memory_increment_alias.created_at.desc()) # 按时间戳降序排序,取最近的
.limit(limit)
.subquery()
)
# 在外层按升序排列(从旧到新)
recent_alias = aliased(MemoryIncrement, recent_records_subquery)
memory_increments = (
self.db.query(recent_alias)
.order_by(recent_alias.created_at.asc()) # 按时间戳升序排序
.all()
)
db_logger.info(f"成功查询工作空间 {workspace_id} 下的内存增量")
db_logger.info(f"成功查询工作空间 {workspace_id} 下的内存增量,返回最近 {len(memory_increments)} 条记录")
return memory_increments
except Exception as e:
db_logger.error(f"查询工作空间 {workspace_id} 下内存增量时出错: {str(e)}")

View File

@@ -332,7 +332,7 @@ RETURN e.id AS id,
e.description AS description,
e.aliases AS aliases,
e.name_embedding AS name_embedding,
e.fact_summary AS fact_summary,
COALESCE(e.fact_summary, '') AS fact_summary,
e.connect_strength AS connect_strength,
collect(DISTINCT s.id) AS statement_ids,
collect(DISTINCT c.id) AS chunk_ids,

View File

@@ -30,3 +30,9 @@ class EmotionSuggestionsRequest(BaseModel):
"""获取个性化情绪建议请求"""
group_id: str = Field(..., description="组ID")
config_id: Optional[int] = Field(None, description="配置ID用于指定LLM模型")
class EmotionGenerateSuggestionsRequest(BaseModel):
"""生成个性化情绪建议请求"""
group_id: str = Field(..., description="组ID")
config_id: Optional[int] = Field(None, description="配置ID用于指定LLM模型")

View File

@@ -262,3 +262,25 @@ InterestCategory = InterestCategoryResponse
InterestAreaDistribution = InterestAreaDistributionResponse
BehaviorHabit = BehaviorHabitResponse
UserProfile = UserProfileResponse
# Cache-related Schemas
class GenerateProfileRequest(BaseModel):
"""生成完整用户画像请求"""
end_user_id: str = Field(..., description="终端用户ID")
class CompleteProfileResponse(BaseModel):
"""完整用户画像响应(包含所有模块)"""
user_id: str
preferences: List[PreferenceTagResponse]
portrait: DimensionPortraitResponse
interest_areas: InterestAreaDistributionResponse
habits: List[BehaviorHabitResponse]
generated_at: datetime.datetime
cached: bool = Field(False, description="是否来自缓存")
@field_serializer("generated_at", when_used="json")
def _serialize_generated_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None

View File

@@ -558,7 +558,7 @@ class AppChatService:
config: WorkflowConfig,
app_id: uuid.UUID,
workspace_id: uuid.UUID,
user_id: Optional[str] = None,
user_id: str = None,
variables: Optional[Dict[str, Any]] = None,
web_search: bool = False,
memory: bool = True,

View File

@@ -516,8 +516,16 @@ class ConversationService:
conversation_messages = self.get_conversation_history(
conversation_id=conversation_id,
max_history=30
max_history=20
)
if len(conversation_messages) == 0:
return ConversationOut(
theme="",
question=[],
summary="",
takeaways=[],
info_score=0,
)
with open('app/services/prompt/conversation_summary_system.jinja2', 'r', encoding='utf-8') as f:
system_prompt = f.read()
@@ -536,6 +544,7 @@ class ConversationService:
]
logger.info(f"Invoking LLM for conversation_id={conversation_id}")
model_resp = await llm.ainvoke(messages)
try:
if isinstance(model_resp.content, str):
result = json_repair.repair_json(model_resp.content, return_objects=True)

View File

@@ -245,7 +245,8 @@ class DraftRunService:
storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None,
web_search: bool = True,
memory: bool = True
memory: bool = True,
sub_agent: bool = False
) -> Dict[str, Any]:
"""执行试运行(使用 LangChain Agent
@@ -435,7 +436,7 @@ class DraftRunService:
elapsed_time = time.time() - start_time
# 8. 保存会话消息
if agent_config.memory and agent_config.memory.get("enabled"):
if not sub_agent and agent_config.memory and agent_config.memory.get("enabled"):
await self._save_conversation_message(
conversation_id=conversation_id,
user_message=message,

View File

@@ -705,3 +705,85 @@ class EmotionAnalyticsService:
health_summary=summary,
suggestions=suggestions
)
async def get_cached_suggestions(
self,
end_user_id: str,
db: Session,
) -> Optional[Dict[str, Any]]:
"""从缓存获取个性化情绪建议
Args:
end_user_id: 宿主ID用户组ID
db: 数据库会话
Returns:
Dict: 缓存的建议数据,如果不存在或已过期返回 None
"""
try:
from app.repositories.emotion_suggestions_cache_repository import (
EmotionSuggestionsCacheRepository,
)
logger.info(f"尝试从缓存获取情绪建议: user={end_user_id}")
cache_repo = EmotionSuggestionsCacheRepository(db)
cache = cache_repo.get_by_end_user_id(end_user_id)
if cache is None:
logger.info(f"用户 {end_user_id} 的建议缓存不存在")
return None
# 检查是否过期
if cache_repo.is_expired(cache):
logger.info(f"用户 {end_user_id} 的建议缓存已过期")
return None
logger.info(f"成功从缓存获取建议: user={end_user_id}")
return {
"health_summary": cache.health_summary,
"suggestions": cache.suggestions,
"generated_at": cache.generated_at.isoformat(),
"cached": True
}
except Exception as e:
logger.error(f"从缓存获取建议失败: {str(e)}", exc_info=True)
return None
async def save_suggestions_cache(
self,
end_user_id: str,
suggestions_data: Dict[str, Any],
db: Session,
expires_hours: int = 24
) -> None:
"""保存建议到缓存
Args:
end_user_id: 宿主ID用户组ID
suggestions_data: 建议数据
db: 数据库会话
expires_hours: 过期时间(小时)
"""
try:
from app.repositories.emotion_suggestions_cache_repository import (
EmotionSuggestionsCacheRepository,
)
logger.info(f"保存建议到缓存: user={end_user_id}")
cache_repo = EmotionSuggestionsCacheRepository(db)
cache_repo.create_or_update(
end_user_id=end_user_id,
health_summary=suggestions_data["health_summary"],
suggestions=suggestions_data["suggestions"],
expires_hours=expires_hours
)
logger.info(f"建议缓存保存成功: user={end_user_id}")
except Exception as e:
logger.error(f"保存建议缓存失败: {str(e)}", exc_info=True)
# 不抛出异常,缓存失败不应影响主流程

View File

@@ -11,6 +11,22 @@ from app.repositories.home_page_repository import HomePageRepository
from app.schemas.home_page_schema import HomeStatistics, WorkspaceInfo
class HomePageService:
DEFAULT_RETURN_DATA: Dict[str, Any] = {
"message": "",
"introduction": {
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
},
"introduction_en": {
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
}
@staticmethod
def get_home_statistics(db: Session, tenant_id: UUID) -> HomeStatistics:
@@ -82,60 +98,36 @@ class HomePageService:
:param version: 系统版本号(如 "0.2.0"
:return: 对应版本的详细介绍
"""
# 1. 定义 JSON 文件路径(使用 Path 处理跨平台路径问题
json_file_path = Path(__file__).parent.parent / "version_info.json"
# 转换为绝对路径,便于调试
json_abs_path = json_file_path.resolve()
# 2. 定义 JSON 文件路径(简化路径处理,保留绝对路径调试特性
json_abs_path = Path(__file__).parent.parent / "version_info.json"
json_abs_path = json_abs_path.resolve()
# 3. 初始化返回结果(深拷贝默认模板,避免修改原常量)
from copy import deepcopy
result = deepcopy(HomePageService.DEFAULT_RETURN_DATA)
try:
# 2. 读取 JSON 文件
# 4. 简化文件存在性判断(合并逻辑,减少分支)
if not json_abs_path.exists():
return {
"message": f"版本介绍文件不存在:{json_abs_path}",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
result["message"] = f"版本介绍文件不存在:{json_abs_path}"
return result
# 5. 读取并解析 JSON 文件(简化文件操作流程)
with open(json_abs_path, "r", encoding="utf-8") as f:
changelogs = json.load(f)
# 3. 匹配对应版本的介绍,若版本不存在返回默认提示
if version not in changelogs:
return {
"message": f"暂未查询到 {version} 版本的详细介绍",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
return changelogs[version]
# 6. 简化版本匹配逻辑,直接返回结果或更新提示信息
if version in changelogs:
return changelogs[version]
result["message"] = f"暂未查询到 {version} 版本的详细介绍"
return result
except FileNotFoundError as e:
# 处理文件不存在异常
return {
"message": f"系统内部错误:{str(e)}",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
result["message"] = f"系统内部错误:{str(e)}"
return result
except json.JSONDecodeError:
# 处理 JSON 格式错误
return {
"message": "版本介绍文件格式错误,无法解析 JSON",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
result["message"] = "版本介绍文件格式错误,无法解析 JSON"
return result
except Exception as e:
# 处理其他未知异常
return {
"message": f"加载版本介绍失败:{str(e)}",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
result["message"] = f"加载版本介绍失败:{str(e)}"
return result

View File

@@ -7,6 +7,7 @@ user profiles from memory summaries.
"""
import logging
import asyncio
from datetime import datetime
from typing import List, Optional
@@ -372,4 +373,129 @@ class ImplicitMemoryService:
except Exception as e:
logger.error(f"Failed to get behavior habits for user {user_id}: {e}")
raise
async def generate_complete_profile(
self,
user_id: str
) -> dict:
"""生成完整的用户画像包含所有4个模块
Args:
user_id: 用户ID
Returns:
Dict: 包含所有模块的完整画像数据
"""
logger.info(f"生成完整用户画像: user={user_id}")
try:
# 并行调用4个分析方法
preferences, portrait, interest_areas, habits = await asyncio.gather(
self.get_preference_tags(user_id=user_id),
self.get_dimension_portrait(user_id=user_id),
self.get_interest_area_distribution(user_id=user_id),
self.get_behavior_habits(user_id=user_id)
)
# 转换为可序列化的格式
profile_data = {
"preferences": [tag.model_dump(mode='json') for tag in preferences],
"portrait": portrait.model_dump(mode='json'),
"interest_areas": interest_areas.model_dump(mode='json'),
"habits": [habit.model_dump(mode='json') for habit in habits]
}
logger.info(f"完整用户画像生成完成: user={user_id}")
return profile_data
except Exception as e:
logger.error(f"生成完整用户画像失败: {str(e)}", exc_info=True)
raise
async def get_cached_profile(
self,
end_user_id: str,
db: Session
) -> Optional[dict]:
"""从缓存获取完整用户画像
Args:
end_user_id: 终端用户ID
db: 数据库会话
Returns:
Dict: 缓存的画像数据,如果不存在或已过期返回 None
"""
try:
from app.repositories.implicit_memory_cache_repository import (
ImplicitMemoryCacheRepository,
)
logger.info(f"尝试从缓存获取用户画像: user={end_user_id}")
cache_repo = ImplicitMemoryCacheRepository(db)
cache = cache_repo.get_by_end_user_id(end_user_id)
if cache is None:
logger.info(f"用户 {end_user_id} 的画像缓存不存在")
return None
# 检查是否过期
if cache_repo.is_expired(cache):
logger.info(f"用户 {end_user_id} 的画像缓存已过期")
return None
logger.info(f"成功从缓存获取用户画像: user={end_user_id}")
return {
"end_user_id": cache.end_user_id,
"preferences": cache.preferences,
"portrait": cache.portrait,
"interest_areas": cache.interest_areas,
"habits": cache.habits,
"generated_at": cache.generated_at.isoformat(),
"cached": True
}
except Exception as e:
logger.error(f"从缓存获取用户画像失败: {str(e)}", exc_info=True)
return None
async def save_profile_cache(
self,
end_user_id: str,
profile_data: dict,
db: Session,
expires_hours: int = 168 # 默认7天
) -> None:
"""保存用户画像到缓存
Args:
end_user_id: 终端用户ID
profile_data: 画像数据
db: 数据库会话
expires_hours: 过期时间小时默认168小时7天
"""
try:
from app.repositories.implicit_memory_cache_repository import (
ImplicitMemoryCacheRepository,
)
logger.info(f"保存用户画像到缓存: user={end_user_id}")
cache_repo = ImplicitMemoryCacheRepository(db)
cache_repo.create_or_update(
end_user_id=end_user_id,
preferences=profile_data["preferences"],
portrait=profile_data["portrait"],
interest_areas=profile_data["interest_areas"],
habits=profile_data["habits"],
expires_hours=expires_hours
)
logger.info(f"用户画像缓存保存成功: user={end_user_id}")
except Exception as e:
logger.error(f"保存用户画像缓存失败: {str(e)}", exc_info=True)
# 不抛出异常,缓存失败不应影响主流程

View File

@@ -9,30 +9,30 @@ import os
import re
import time
import uuid
from threading import Lock
from typing import Any, AsyncGenerator, Dict, List, Optional
import redis
from langchain_core.messages import HumanMessage
from app.core.config import settings
from app.core.logging_config import get_config_logger, get_logger
from app.core.memory.agent.langgraph_graph.read_graph import make_read_graph
from app.core.memory.agent.langgraph_graph.write_graph import make_write_graph
from app.core.memory.agent.logger_file.log_streamer import LogStreamer
from app.core.memory.agent.utils.messages_tools import merge_multiple_search_results, reorder_output_results
from app.core.memory.agent.utils.mcp_tools import get_mcp_server_config
from app.core.memory.agent.utils.type_classifier import status_typle
from app.core.memory.analytics.hot_memory_tags import get_hot_memory_tags
from app.repositories.memory_short_repository import ShortTermMemoryRepository
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context
from app.models.knowledge_model import Knowledge, KnowledgeType
from app.repositories.memory_short_repository import ShortTermMemoryRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_config_schema import ConfigurationError
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_konwledges_server import (
write_rag,
)
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from pydantic import BaseModel, Field
from sqlalchemy import func
from sqlalchemy.orm import Session
@@ -51,20 +51,20 @@ _neo4j_connector = Neo4jConnector()
class MemoryAgentService:
"""Service for memory agent operations"""
def __init__(self):
self.user_locks: Dict[str, Lock] = {}
self.locks_lock = Lock()
def writer_messages_deal(self, messages, start_time, group_id, config_id, message, context):
def writer_messages_deal(self,messages,start_time,group_id,config_id,message):
messages = str(messages).replace("'", '"').replace('\\n', '').replace('\n', '').replace('\\', '')
countext = re.findall(r'"status": "(.*?)",', messages)[0]
duration = time.time() - start_time
if str(messages) == 'success':
if countext == 'success':
logger.info(f"Write operation successful for group {group_id} with config_id {config_id}")
# 记录成功的操作
if audit_logger:
audit_logger.log_operation(operation="WRITE", config_id=config_id, group_id=group_id, success=True,
duration=duration, details={"message_length": len(message)})
return context
return countext
else:
logger.warning(f"Write operation failed for group {group_id}")
@@ -81,12 +81,7 @@ class MemoryAgentService:
raise ValueError(f"写入失败: {messages}")
def get_group_lock(self, group_id: str) -> Lock:
"""Get lock for specific group to prevent concurrent processing"""
with self.locks_lock:
if group_id not in self.user_locks:
self.user_locks[group_id] = Lock()
return self.user_locks[group_id]
def extract_tool_call_info(self, event: Dict) -> bool:
"""Extract tool call information from event"""
@@ -148,26 +143,8 @@ class MemoryAgentService:
else:
status = "unknown"
# Add database connection pool status
try:
from app.db import get_pool_status
pool_status = get_pool_status()
logger.info(f"Database pool status: {pool_status}")
# Check if pool usage is too high
if pool_status.get("usage_percent", 0) > 80:
logger.warning(f"High database pool usage: {pool_status['usage_percent']}%")
status = "warning"
except Exception as e:
logger.error(f"Failed to get pool status: {e}")
pool_status = {"error": str(e)}
logger.info(f"Health status: {status}")
return {
"status": status,
"database_pool": pool_status
}
return {"status": status}
def get_log_content(self) -> str:
"""
@@ -324,42 +301,54 @@ class MemoryAgentService:
audit_logger.log_operation(operation="WRITE", config_id=config_id, group_id=group_id, success=False, duration=duration, error=error_msg)
raise ValueError(error_msg)
mcp_config = get_mcp_server_config()
client = MultiServerMCPClient(mcp_config)
if storage_type == "rag":
result = await write_rag(group_id, message, user_rag_memory_id)
return result
else:
async with client.session("data_flow") as session:
logger.debug("Connected to MCP Server: data_flow")
tools = await load_mcp_tools(session)
workflow_errors = [] # Track errors from workflow
# Pass memory_config to the graph workflow
async with make_write_graph(group_id, tools, group_id, group_id, memory_config=memory_config) as graph:
logger.debug("Write graph created successfully")
try:
if storage_type == "rag":
result = await write_rag(group_id, message, user_rag_memory_id)
return result
else:
async with make_write_graph() as graph:
config = {"configurable": {"thread_id": group_id}}
# 初始状态 - 包含所有必要字段
initial_state = {"messages": [HumanMessage(content=message)], "group_id": group_id,
"memory_config": memory_config}
# 获取节点更新信息
async for update_event in graph.astream(
initial_state,
stream_mode="updates",
async for event in graph.astream(
{"messages": message, "memory_config": memory_config, "errors": []},
stream_mode="values",
config=config
):
for node_name, node_data in update_event.items():
if 'save_neo4j' == node_name:
massages = node_data
massagesstatus = massages.get('write_result')['status']
contents = massages.get('write_result')
return self.writer_messages_deal(massagesstatus, start_time, group_id, config_id, message, contents)
except Exception as e:
# Ensure proper error handling and logging
error_msg = f"Write operation failed: {str(e)}"
logger.error(error_msg)
if audit_logger:
duration = time.time() - start_time
audit_logger.log_operation(operation="WRITE", config_id=config_id, group_id=group_id, success=False, duration=duration, error=error_msg)
raise ValueError(error_msg)
messages = event.get('messages')
# Capture any errors from the state
if event.get('errors'):
workflow_errors.extend(event.get('errors', []))
# Check for workflow errors
if workflow_errors:
error_details = "; ".join([f"{e['tool']}: {e['error']}" for e in workflow_errors])
logger.error(f"Write workflow failed with errors: {error_details}")
if audit_logger:
duration = time.time() - start_time
audit_logger.log_operation(
operation="WRITE",
config_id=config_id,
group_id=group_id,
success=False,
duration=duration,
error=error_details
)
raise ValueError(f"Write workflow failed: {error_details}")
return self.writer_messages_deal(messages, start_time, group_id, config_id, message)
async def read_memory(
self,
group_id: str,
@@ -398,9 +387,8 @@ class MemoryAgentService:
import time
start_time = time.time()
end_user_id=group_id
ori_message=message
end_user_id=group_id
# Resolve config_id if None using end_user's connected config
if config_id is None:
try:
@@ -413,169 +401,336 @@ class MemoryAgentService:
raise # Re-raise our specific error
logger.error(f"Failed to get connected config for end_user {group_id}: {e}")
raise ValueError(f"Unable to determine memory configuration for end_user {group_id}: {e}")
logger.info(f"Read operation for group {group_id} with config_id {config_id}")
# 导入审计日志记录器
try:
from app.core.memory.utils.log.audit_logger import audit_logger
except ImportError:
audit_logger = None
# Get group lock to prevent concurrent processing
group_lock = self.get_group_lock(group_id)
with group_lock:
# Step 1: Load configuration from database only
try:
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
try:
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
config_id=config_id,
service_name="MemoryAgentService"
)
logger.info(f"Configuration loaded successfully: {memory_config.config_name}")
except ConfigurationError as e:
error_msg = f"Failed to load configuration for config_id: {config_id}: {e}"
logger.error(error_msg)
# Log failed operation
if audit_logger:
duration = time.time() - start_time
audit_logger.log_operation(
operation="READ",
config_id=config_id,
service_name="MemoryAgentService"
group_id=group_id,
success=False,
duration=duration,
error=error_msg
)
logger.info(f"Configuration loaded successfully: {memory_config.config_name}")
except ConfigurationError as e:
error_msg = f"Failed to load configuration for config_id: {config_id}: {e}"
logger.error(error_msg)
# Log failed operation
if audit_logger:
duration = time.time() - start_time
audit_logger.log_operation(
operation="READ",
config_id=config_id,
group_id=group_id,
success=False,
duration=duration,
error=error_msg
)
raise ValueError(error_msg)
# Step 2: Prepare history
history.append({"role": "user", "content": message})
logger.debug(f"Group ID:{group_id}, Message:{message}, History:{history}, Config ID:{config_id}")
# Step 3: Initialize MCP client and execute read workflow
try:
async with make_read_graph() as graph:
config = {"configurable": {"thread_id": group_id}}
# 初始状态 - 包含所有必要字段
initial_state = {"messages": [HumanMessage(content=message)], "search_switch": search_switch,
"group_id": group_id
, "storage_type": storage_type, "user_rag_memory_id": user_rag_memory_id,
"memory_config": memory_config}
# 获取节点更新信息
_intermediate_outputs = []
summary = ''
async for update_event in graph.astream(
initial_state,
stream_mode="updates",
config=config
):
for node_name, node_data in update_event.items():
# 处理不同Summary节点的返回结构
if 'Summary' in node_name:
if 'InputSummary' in node_data and 'summary_result' in node_data['InputSummary']:
summary = node_data['InputSummary']['summary_result']
elif 'RetrieveSummary' in node_data and 'summary_result' in node_data['RetrieveSummary']:
summary = node_data['RetrieveSummary']['summary_result']
elif 'summary' in node_data and 'summary_result' in node_data['summary']:
summary = node_data['summary']['summary_result']
elif 'SummaryFails' in node_data and 'summary_result' in node_data['SummaryFails']:
summary = node_data['SummaryFails']['summary_result']
raise ValueError(error_msg)
spit_data = node_data.get('spit_data', {}).get('_intermediate', None)
if spit_data and spit_data != [] and spit_data != {}:
_intermediate_outputs.append(spit_data)
# Step 2: Prepare history
history.append({"role": "user", "content": message})
logger.debug(f"Group ID:{group_id}, Message:{message}, History:{history}, Config ID:{config_id}")
# Problem_Extension 节点
problem_extension = node_data.get('problem_extension', {}).get('_intermediate', None)
if problem_extension and problem_extension != [] and problem_extension != {}:
_intermediate_outputs.append(problem_extension)
# Step 3: Initialize MCP client and execute read workflow
mcp_config = get_mcp_server_config()
client = MultiServerMCPClient(mcp_config)
# Retrieve 节点
retrieve_node = node_data.get('retrieve', {}).get('_intermediate_outputs', None)
if retrieve_node and retrieve_node != [] and retrieve_node != {}:
_intermediate_outputs.extend(retrieve_node)
async with client.session('data_flow') as session:
session_start = time.time()
logger.debug("Connected to MCP Server: data_flow")
# Verify 节点
verify_n = node_data.get('verify', {}).get('_intermediate', None)
if verify_n and verify_n != [] and verify_n != {}:
_intermediate_outputs.append(verify_n)
tools_start = time.time()
tools = await load_mcp_tools(session)
tools_time = time.time() - tools_start
logger.info(f"[PERF] MCP tools loading took: {tools_time:.4f}s")
# Summary 节点
summary_n = node_data.get('summary', {}).get('_intermediate', None)
if summary_n and summary_n != [] and summary_n != {}:
_intermediate_outputs.append(summary_n)
outputs = []
intermediate_outputs = []
seen_intermediates = set() # Track seen intermediate outputs to avoid duplicates
_intermediate_outputs = [item for item in _intermediate_outputs if item and item != [] and item != {}]
# Pass memory_config to the graph workflow
graph_start = time.time()
async with make_read_graph(group_id, tools, search_switch, group_id, group_id, memory_config=memory_config, storage_type=storage_type, user_rag_memory_id=user_rag_memory_id) as graph:
graph_init_time = time.time() - graph_start
logger.info(f"[PERF] Graph initialization took: {graph_init_time:.4f}s")
optimized_outputs = merge_multiple_search_results(_intermediate_outputs)
result = reorder_output_results(optimized_outputs)
start = time.time()
config = {"configurable": {"thread_id": group_id}}
workflow_errors = [] # Track errors from workflow
# Log successful operation
if audit_logger:
duration = time.time() - start_time
audit_logger.log_operation(
operation="READ",
config_id=config_id,
group_id=group_id,
success=True,
duration=duration
)
event_count = 0
async for event in graph.astream(
{"messages": history, "memory_config": memory_config, "errors": []},
stream_mode="values",
config=config
):
event_count += 1
event_start = time.time()
messages = event.get('messages')
# Capture any errors from the state
if event.get('errors'):
workflow_errors.extend(event.get('errors', []))
retrieved_content = []
repo = ShortTermMemoryRepository(db)
if str(search_switch).strip() != "2":
for intermediate in result:
intermediate_type = intermediate['type']
if intermediate_type == "search_result":
query = intermediate['query']
raw_results = intermediate['raw_results']
reranked_results = raw_results.get('reranked_results', [])
for msg in messages:
msg_content = msg.content
msg_role = msg.__class__.__name__.lower().replace("message", "")
outputs.append({
"role": msg_role,
"content": msg_content
})
# Extract intermediate outputs
if hasattr(msg, 'content'):
try:
statements = [statement['statement'] for statement in
reranked_results.get('statements', [])]
except Exception:
statements = []
statements = list(set(statements))
retrieved_content.append({query: statements})
if retrieved_content == []:
retrieved_content = ''
if '信息不足,无法回答。' != str(summary) and str(search_switch).strip() != "2": # and retrieved_content!=[]
# 使用 upsert 方法
repo.upsert(
end_user_id=end_user_id, # 确保这个变量在作用域内
messages=ori_message,
aimessages=summary,
retrieved_content=retrieved_content,
search_switch=str(search_switch)
)
print("写入成功")
# Handle MCP content format: [{'type': 'text', 'text': '...'}]
content_to_parse = msg_content
if isinstance(msg_content, list):
for block in msg_content:
if isinstance(block, dict) and block.get('type') == 'text':
content_to_parse = block.get('text', '')
break
else:
continue # No text block found
return {
"answer": summary,
"intermediate_outputs": result
# Try to parse content as JSON
if isinstance(content_to_parse, str):
try:
parsed = json.loads(content_to_parse)
if isinstance(parsed, dict):
# Check for single intermediate output
if '_intermediate' in parsed:
intermediate_data = parsed['_intermediate']
output_key = self._create_intermediate_key(intermediate_data)
if output_key not in seen_intermediates:
seen_intermediates.add(output_key)
intermediate_outputs.append(self._format_intermediate_output(intermediate_data))
# Check for multiple intermediate outputs (from Retrieve)
if '_intermediates' in parsed:
for intermediate_data in parsed['_intermediates']:
output_key = self._create_intermediate_key(intermediate_data)
if output_key not in seen_intermediates:
seen_intermediates.add(output_key)
intermediate_outputs.append(self._format_intermediate_output(intermediate_data))
except (json.JSONDecodeError, ValueError):
pass
except Exception as e:
logger.debug(f"Failed to extract intermediate output: {e}")
event_time = time.time() - event_start
logger.info(f"[PERF] Event {event_count} processing took: {event_time:.4f}s")
workflow_duration = time.time() - start
session_duration = time.time() - session_start
logger.info(f"[PERF] Read graph workflow completed in {workflow_duration}s")
logger.info(f"[PERF] Total session duration: {session_duration:.4f}s")
logger.info(f"[PERF] Total events processed: {event_count}")
# Extract final answer
final_answer = ""
for messages in outputs:
if messages['role'] == 'tool':
message = messages['content']
# Handle MCP content format: [{'type': 'text', 'text': '...'}]
if isinstance(message, list):
# Extract text from MCP content blocks
for block in message:
if isinstance(block, dict) and block.get('type') == 'text':
message = block.get('text', '')
break
else:
continue # No text block found
try:
parsed = json.loads(message) if isinstance(message, str) else message
if isinstance(parsed, dict):
if parsed.get('status') == 'success':
summary_result = parsed.get('summary_result')
if summary_result:
final_answer = summary_result
except (json.JSONDecodeError, ValueError):
pass
# 记录成功的操作
total_duration = time.time() - start_time
# Check for workflow errors
if workflow_errors:
error_details = "; ".join([f"{e['tool']}: {e['error']}" for e in workflow_errors])
logger.warning(f"Read workflow completed with errors: {error_details}")
if audit_logger:
audit_logger.log_operation(
operation="READ",
config_id=config_id,
group_id=group_id,
success=False,
duration=total_duration,
error=error_details,
details={
"search_switch": search_switch,
"history_length": len(history),
"intermediate_outputs_count": len(intermediate_outputs),
"has_answer": bool(final_answer),
"errors": workflow_errors
}
except Exception as e:
# Ensure proper error handling and logging
error_msg = f"Read operation failed: {str(e)}"
logger.error(error_msg)
if audit_logger:
duration = time.time() - start_time
audit_logger.log_operation(
operation="READ",
config_id=config_id,
group_id=group_id,
success=False,
duration=duration,
error=error_msg
)
raise ValueError(error_msg)
)
# Raise error if no answer was produced
if not final_answer:
raise ValueError(f"Read workflow failed: {error_details}")
if audit_logger and not workflow_errors:
audit_logger.log_operation(
operation="READ",
config_id=config_id,
group_id=group_id,
success=True,
duration=total_duration,
details={
"search_switch": search_switch,
"history_length": len(history),
"intermediate_outputs_count": len(intermediate_outputs),
"has_answer": bool(final_answer)
}
)
retrieved_content=[]
repo = ShortTermMemoryRepository(db)
if str(search_switch)!="2":
for intermediate in intermediate_outputs:
print(intermediate)
intermediate_type=intermediate['type']
if intermediate_type=="search_result":
query=intermediate['query']
raw_results=intermediate['raw_results']
reranked_results=raw_results.get('reranked_results',[])
try:
statements=[statement['statement'] for statement in reranked_results.get('statements', [])]
except Exception:
statements=[]
statements=list(set(statements))
retrieved_content.append({query:statements})
if retrieved_content==[]:
retrieved_content=''
if '信息不足,无法回答。' != str(final_answer) and str(search_switch).strip() != "2":#and retrieved_content!=[]
# 使用 upsert 方法
repo.upsert(
end_user_id=end_user_id, # 确保这个变量在作用域内
messages=ori_message,
aimessages=final_answer,
retrieved_content=retrieved_content,
search_switch=str(search_switch)
)
print("写入成功")
return {
"answer": final_answer,
"intermediate_outputs": intermediate_outputs
}
def _create_intermediate_key(self, output: Dict) -> str:
"""
Create a unique key for an intermediate output to detect duplicates.
Args:
output: Intermediate output dictionary
Returns:
Unique string key for this output
"""
output_type = output.get('type', 'unknown')
if output_type == 'problem_split':
# Use type + original query as key
return f"split:{output.get('original_query', '')}"
elif output_type == 'problem_extension':
# Use type + original query as key
return f"extension:{output.get('original_query', '')}"
elif output_type == 'search_result':
# Use type + query + index as key
return f"search:{output.get('query', '')}:{output.get('index', 0)}"
elif output_type == 'retrieval_summary':
# Use type + query as key
return f"summary:{output.get('query', '')}"
elif output_type == 'verification':
# Use type + query as key
return f"verification:{output.get('query', '')}"
elif output_type == 'input_summary':
# Use type + query as key
return f"input_summary:{output.get('query', '')}"
else:
# Fallback: use JSON representation
import json
return json.dumps(output, sort_keys=True)
def _format_intermediate_output(self, output: Dict) -> Dict:
"""Format intermediate output for frontend display."""
output_type = output.get('type', 'unknown')
if output_type == 'problem_split':
return {
'type': 'problem_split',
'title': '问题拆分',
'data': output.get('data', []),
'original_query': output.get('original_query', '')
}
elif output_type == 'problem_extension':
return {
'type': 'problem_extension',
'title': '问题扩展',
'data': output.get('data', {}),
'original_query': output.get('original_query', '')
}
elif output_type == 'search_result':
return {
'type': 'search_result',
'title': f'检索结果 ({output.get("index", 0)}/{output.get("total", 0)})',
'query': output.get('query', ''),
'raw_results': output.get('raw_results', ''),
'index': output.get('index', 0),
'total': output.get('total', 0)
}
elif output_type == 'retrieval_summary':
return {
'type': 'retrieval_summary',
'title': '检索总结',
'summary': output.get('summary', ''),
'query': output.get('query', ''),
'raw_results': output.get('raw_results'),
}
elif output_type == 'verification':
return {
'type': 'verification',
'title': '数据验证',
'result': output.get('result', 'unknown'),
'reason': output.get('reason', ''),
'query': output.get('query', ''),
'verified_count': output.get('verified_count', 0)
}
elif output_type == 'input_summary':
return {
'type': 'input_summary',
'title': '快速答案',
'summary': output.get('summary', ''),
'query': output.get('query', ''),
'raw_results': output.get('raw_results'),
}
else:
return output
async def classify_message_type(self, message: str, config_id: int, db: Session) -> Dict:
"""
@@ -683,7 +838,6 @@ class MemoryAgentService:
# 获取当前空间下的所有宿主
from app.repositories import app_repository, end_user_repository
from app.schemas.app_schema import App as AppSchema
from app.schemas.end_user_schema import EndUser as EndUserSchema
# 查询应用并转换为 Pydantic 模型
apps_orm = app_repository.get_apps_by_workspace_id(db, current_workspace_id)
@@ -981,6 +1135,43 @@ class MemoryAgentService:
logger.info("Log streaming completed, cleaning up resources")
# LogStreamer uses context manager for file handling, so cleanup is automatic
# async def get_api_docs(self, file_path: Optional[str] = None) -> Dict[str, Any]:
# """
# Parse and return API documentation
# Args:
# file_path: Optional path to API docs file. If None, uses default path.
# Returns:
# Dict containing parsed API documentation or error information
# """
# try:
# target = file_path or get_default_docs_path()
# if not os.path.isfile(target):
# return {
# "success": False,
# "msg": "API文档文件不存在",
# "error_code": "DOC_NOT_FOUND",
# "data": {"path": target}
# }
# data = parse_api_docs(target)
# return {
# "success": True,
# "msg": "解析成功",
# "data": data
# }
# except Exception as e:
# logger.error(f"Failed to parse API docs: {e}")
# return {
# "success": False,
# "msg": "解析失败",
# "error_code": "DOC_PARSE_ERROR",
# "data": {"error": str(e)}
# }
def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, Any]:
"""
获取终端用户关联的记忆配置
@@ -989,18 +1180,20 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
1. 根据 end_user_id 获取用户的 app_id
2. 获取该应用的最新发布版本
3. 从发布版本的 config 字段中提取 memory_config_id
4. 根据 memory_config_id 查询配置名称
Args:
end_user_id: 终端用户ID
db: 数据库会话
Returns:
包含 memory_config_id 和相关信息的字典
包含 memory_config_id、config_name 和相关信息的字典
Raises:
ValueError: 当终端用户不存在或应用未发布时
"""
from app.models.app_release_model import AppRelease
from app.models.data_config_model import DataConfig
from app.models.end_user_model import EndUser
from sqlalchemy import select
@@ -1034,15 +1227,31 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
memory_obj = config.get('memory', {})
memory_config_id = memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None
# 4. 根据 memory_config_id 查询配置名称
config_name = None
if memory_config_id:
try:
# memory_config_id 可能是整数或字符串,需要转换
config_id = int(memory_config_id) if isinstance(memory_config_id, str) else memory_config_id
data_config = db.query(DataConfig).filter(DataConfig.config_id == config_id).first()
if data_config:
config_name = data_config.config_name
logger.debug(f"Found config_name: {config_name} for config_id: {config_id}")
else:
logger.warning(f"DataConfig not found for config_id: {config_id}")
except (ValueError, TypeError) as e:
logger.warning(f"Invalid memory_config_id format: {memory_config_id}, error: {str(e)}")
result = {
"end_user_id": str(end_user_id),
"app_id": str(app_id),
"release_id": str(latest_release.id),
"release_version": latest_release.version,
"memory_config_id": memory_config_id
"memory_config_id": memory_config_id,
"memory_config_name": config_name
}
logger.info(f"Successfully retrieved connected config: memory_config_id={memory_config_id}")
logger.info(f"Successfully retrieved connected config: memory_config_id={memory_config_id}, config_name={config_name}")
return result
@@ -1050,112 +1259,126 @@ def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session)
"""
批量获取多个终端用户关联的记忆配置
通过以下流程获取配置
1. 批量查询所有 end_user 及其 app_id
2. 批量获取所有应用的最新发布版本
3. 从发布版本的 config 字段中提取 memory_config_id 和 memory_config_name
通过优化的查询减少数据库往返次数
1. 一次性查询所有 end_user 及其 app_id
2. 批量查询所有相关的 app_release
3. 批量查询所有相关的 data_config
Args:
end_user_ids: 终端用户ID列表
db: 数据库会话
Returns:
字典key 为 end_user_idvalue 为包含 memory_config_id 和 memory_config_name 的字典
格式: {
"user_id_1": {"memory_config_id": "xxx", "memory_config_name": "xxx"},
"user_id_2": {"memory_config_id": None, "memory_config_name": None},
...
}
字典key 为 end_user_idvalue 为配置信息字典
对于查询失败的用户value 包含 error 字段
"""
from app.models.app_release_model import AppRelease
from app.models.data_config_model import DataConfig
from app.models.end_user_model import EndUser
from app.models.memory_config_model import MemoryConfig
from sqlalchemy import select
logger.info(f"Batch getting connected configs for {len(end_user_ids)} end_users")
logger.info(f"Batch getting connected configs for {len(end_user_ids)} end users")
result = {}
# 1. 批量查询所有 end_user 及其 app_id
end_users = db.query(EndUser).filter(EndUser.id.in_(end_user_ids)).all()
# 建 end_user_id 到 app_id 的映射
user_to_app = {str(eu.id): eu.app_id for eu in end_users}
# 建 end_user_id -> end_user 的映射
end_user_map = {str(user.id): user for user in end_users}
# 获取所有相关的 app_id
app_ids = list(set(user_to_app.values()))
# 记录不存在的用户
for user_id in end_user_ids:
if user_id not in end_user_map:
result[user_id] = {
"end_user_id": user_id,
"memory_config_id": None,
"memory_config_name": None,
"error": f"终端用户不存在: {user_id}"
}
if not app_ids:
logger.warning("No valid app_ids found for the provided end_user_ids")
# 返回空配置
for user_id in end_user_ids:
result[user_id] = {"memory_config_id": None, "memory_config_name": None}
if not end_users:
logger.warning("No valid end users found")
return result
# 2. 批量获取所有应用的最新发布版本
# 2. 批量查询所有相关应用的最新发布版本
app_ids = [user.app_id for user in end_users]
# 使用子查询找到每个 app 的最新版本
from sqlalchemy import func
from sqlalchemy import and_
subq = (
select(
AppRelease.app_id,
func.max(AppRelease.version).label('max_version')
# 查询所有相关的活跃发布版本
releases = db.query(AppRelease).filter(
and_(
AppRelease.app_id.in_(app_ids),
AppRelease.is_active.is_(True)
)
.where(AppRelease.app_id.in_(app_ids), AppRelease.is_active.is_(True))
.group_by(AppRelease.app_id)
.subquery()
)
).order_by(AppRelease.app_id, AppRelease.version.desc()).all()
stmt = (
select(AppRelease)
.join(
subq,
(AppRelease.app_id == subq.c.app_id) & (AppRelease.version == subq.c.max_version)
)
.where(AppRelease.is_active.is_(True))
)
# 构建 app_id -> latest_release 的映射(每个 app 只保留最新版本)
app_release_map = {}
for release in releases:
app_id_str = str(release.app_id)
if app_id_str not in app_release_map:
app_release_map[app_id_str] = release
latest_releases = db.scalars(stmt).all()
# 创建 app_id 到 release 的映射
app_to_release = {str(release.app_id): release for release in latest_releases}
# 3. 提取所有 memory_config_id
# 3. 收集所有 memory_config_id
memory_config_ids = []
for release in latest_releases:
for release in app_release_map.values():
config = release.config or {}
memory_obj = config.get('memory', {})
memory_config_id = memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None
if memory_config_id:
memory_config_ids.append(memory_config_id)
try:
config_id = int(memory_config_id) if isinstance(memory_config_id, str) else memory_config_id
memory_config_ids.append(config_id)
except (ValueError, TypeError):
pass
# 4. 批量查询 memory_config_name
memory_configs = {}
# 4. 批量查询所有 data_config
config_name_map = {}
if memory_config_ids:
configs = db.query(MemoryConfig).filter(MemoryConfig.id.in_(memory_config_ids)).all()
memory_configs = {str(cfg.id): cfg.config_name for cfg in configs}
data_configs = db.query(DataConfig).filter(
DataConfig.config_id.in_(memory_config_ids)
).all()
config_name_map = {config.config_id: config.config_name for config in data_configs}
# 5. 组装结果
for user_id in end_user_ids:
app_id = user_to_app.get(user_id)
if not app_id:
result[user_id] = {"memory_config_id": None, "memory_config_name": None}
for user in end_users:
user_id = str(user.id)
app_id = str(user.app_id)
# 检查是否有发布版本
if app_id not in app_release_map:
result[user_id] = {
"end_user_id": user_id,
"memory_config_id": None,
"memory_config_name": None,
"error": f"应用未发布: {app_id}"
}
continue
release = app_to_release.get(str(app_id))
if not release:
result[user_id] = {"memory_config_id": None, "memory_config_name": None}
continue
release = app_release_map[app_id]
# 提取 memory_config_id
config = release.config or {}
memory_obj = config.get('memory', {})
memory_config_id = memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None
memory_config_name = memory_configs.get(memory_config_id) if memory_config_id else None
# 获取 config_name
config_name = None
if memory_config_id:
try:
config_id = int(memory_config_id) if isinstance(memory_config_id, str) else memory_config_id
config_name = config_name_map.get(config_id)
except (ValueError, TypeError):
pass
result[user_id] = {
"end_user_id": user_id,
"memory_config_id": memory_config_id,
"memory_config_name": memory_config_name
"memory_config_name": config_name
}
logger.info(f"Successfully retrieved {len(result)} connected configs")
logger.info(f"Successfully retrieved batch configs: total={len(result)}, with_config={sum(1 for v in result.values() if v.get('memory_config_id'))}")
return result

View File

@@ -597,7 +597,7 @@ class MemoryInteraction:
group_id = ori_data[0]['group_id']
Space_User = await self.connector.execute_query(Memory_Space_User, group_id=group_id)
if not Space_User:
return '不存在用户'
return []
user_id=Space_User[0]['id']
results = await self.connector.execute_query(Memory_Space_Associative, id=self.id,user_id=user_id)

View File

@@ -267,14 +267,14 @@ class MemoryForgetService:
elif node_type_label == 'memorysummary':
node_type_label = 'summary'
# 将 Neo4j DateTime 对象转换为时间戳
# 将 Neo4j DateTime 对象转换为时间戳(毫秒)
last_access_time = result['last_access_time']
last_access_dt = convert_neo4j_datetime_to_python(last_access_time)
# 确保 datetime 带有时区信息(假定为 UTC),避免 naive datetime 导致的时区偏差
if last_access_dt:
if last_access_dt.tzinfo is None:
last_access_dt = last_access_dt.replace(tzinfo=timezone.utc)
last_access_timestamp = int(last_access_dt.timestamp())
last_access_timestamp = int(last_access_dt.timestamp() * 1000)
else:
last_access_timestamp = 0
@@ -520,7 +520,7 @@ class MemoryForgetService:
'average_activation_value': result['average_activation'],
'low_activation_nodes': result['low_activation_nodes'] or 0,
'forgetting_threshold': forgetting_threshold,
'timestamp': int(datetime.now().timestamp())
'timestamp': int(datetime.now().timestamp() * 1000)
}
else:
activation_metrics = {
@@ -530,7 +530,7 @@ class MemoryForgetService:
'average_activation_value': None,
'low_activation_nodes': 0,
'forgetting_threshold': forgetting_threshold,
'timestamp': int(datetime.now().timestamp())
'timestamp': int(datetime.now().timestamp() * 1000)
}
# 收集节点类型分布
@@ -620,7 +620,7 @@ class MemoryForgetService:
'merged_count': record.merged_count,
'average_activation': record.average_activation_value,
'total_nodes': record.total_nodes,
'execution_time': int(record.execution_time.timestamp())
'execution_time': int(record.execution_time.timestamp() * 1000)
})
api_logger.info(f"成功获取最近 {len(recent_trends)} 个日期的历史趋势数据")
@@ -661,7 +661,7 @@ class MemoryForgetService:
'node_distribution': node_distribution,
'recent_trends': recent_trends,
'pending_nodes': pending_nodes,
'timestamp': int(datetime.now().timestamp())
'timestamp': int(datetime.now().timestamp() * 1000)
}
api_logger.info(

View File

@@ -1327,7 +1327,8 @@ class MultiAgentOrchestrator:
web_search=web_search,
memory=memory,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id
user_rag_memory_id=user_rag_memory_id,
sub_agent=True
)
return result

View File

@@ -16,6 +16,7 @@ from app.db import get_db_context
from app.repositories.conversation_repository import ConversationRepository
from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_episodic_schema import EmotionSubject, EmotionType, type_mapping
from app.services.implicit_memory_service import ImplicitMemoryService
from app.services.memory_base_service import MemoryBaseService
from app.services.memory_config_service import MemoryConfigService

View File

@@ -13,11 +13,10 @@ from sqlalchemy.orm import Session
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException
from app.core.workflow.validator import validate_workflow_config
from app.db import get_db, get_db_context
from app.db import get_db
from app.models.conversation_model import Message
from app.models.workflow_model import WorkflowConfig, WorkflowExecution
from app.repositories.conversation_repository import MessageRepository
from app.models.conversation_model import Message
from app.repositories.end_user_repository import EndUserRepository
from app.repositories.workflow_repository import (
WorkflowConfigRepository,
WorkflowExecutionRepository,
@@ -483,14 +482,6 @@ class WorkflowService:
try:
# 更新状态为运行中
self.update_execution_status(execution.execution_id, "running")
with get_db_context() as db:
end_user_repo = EndUserRepository(db)
new_end_user = end_user_repo.get_or_create_end_user(
app_id=app_id,
other_id=payload.user_id,
original_user_id=payload.user_id # Save original user_id to other_id
)
end_user_id = str(new_end_user.id)
executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid)
@@ -511,7 +502,7 @@ class WorkflowService:
input_data=input_data,
execution_id=execution.execution_id,
workspace_id=str(workspace_id),
user_id=end_user_id
user_id=payload.user_id
)
# 更新执行结果
@@ -638,14 +629,6 @@ class WorkflowService:
try:
# 更新状态为运行中
self.update_execution_status(execution.execution_id, "running")
with get_db_context() as db:
end_user_repo = EndUserRepository(db)
new_end_user = end_user_repo.get_or_create_end_user(
app_id=app_id,
other_id=payload.user_id,
original_user_id=payload.user_id # Save original user_id to other_id
)
end_user_id = str(new_end_user.id)
executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid)
for exec_res in executions:
@@ -665,7 +648,7 @@ class WorkflowService:
input_data=input_data,
execution_id=execution.execution_id,
workspace_id=str(workspace_id),
user_id=end_user_id
user_id=payload.user_id
):
if event.get("event") == "workflow_end":

View File

@@ -8,9 +8,11 @@ import uuid
from typing import Dict, Any, Optional, Union
from datetime import datetime
from app.db import get_db_read
from app.models import AppRelease, WorkflowConfig
from app.models.agent_app_config_model import AgentConfig
from app.models.multi_agent_model import MultiAgentConfig
from app.repositories.workflow_repository import WorkflowConfigRepository
def model_parameters_to_dict(model_parameters: Any) -> Optional[Dict[str, Any]]:
@@ -24,18 +26,18 @@ def model_parameters_to_dict(model_parameters: Any) -> Optional[Dict[str, Any]]:
"""
if model_parameters is None:
return None
if isinstance(model_parameters, dict):
return model_parameters
# Pydantic v2
if hasattr(model_parameters, 'model_dump'):
return model_parameters.model_dump()
# Pydantic v1
if hasattr(model_parameters, 'dict'):
return model_parameters.dict()
# 其他情况尝试转换
try:
return dict(model_parameters)
@@ -54,17 +56,18 @@ def dict_to_model_parameters(data: Optional[Dict[str, Any]]) -> Optional[Any]:
"""
if data is None:
return None
from app.schemas import ModelParameters
if isinstance(data, ModelParameters):
return data
if isinstance(data, dict):
return ModelParameters(**data)
return None
class AgentConfigProxy:
"""Proxy class for AgentConfig (legacy compatibility)"""
@@ -78,8 +81,7 @@ class AgentConfigProxy:
self.default_model_config_id = release.default_model_config_id
def agent_config_4_app_release(release: AppRelease ) -> AgentConfig:
def agent_config_4_app_release(release: AppRelease) -> AgentConfig:
config_dict = release.config
agent_config = AgentConfig(
@@ -95,18 +97,17 @@ def agent_config_4_app_release(release: AppRelease ) -> AgentConfig:
return agent_config
def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig:
def multi_agent_config_4_app_release(release: AppRelease) -> MultiAgentConfig:
config_dict = release.config
agent_config = MultiAgentConfig(
app_id=release.app_id,
default_model_config_id=release.default_model_config_id,
model_parameters=config_dict.get("model_parameters"),
master_agent_id=config_dict.get("master_agent_id"),
master_agent_name=config_dict.get("master_agent_name"),
orchestration_mode=config_dict.get("orchestration_mode", "conditional"),
orchestration_mode=config_dict.get("orchestration_mode", "supervisor"),
sub_agents=config_dict.get("sub_agents", []),
routing_rules=config_dict.get("routing_rules"),
execution_config=config_dict.get("execution_config", {}),
@@ -116,24 +117,26 @@ def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig:
return agent_config
def workflow_config_4_app_release(release: AppRelease ) -> WorkflowConfig:
def workflow_config_4_app_release(release: AppRelease) -> WorkflowConfig:
config_dict = release.config
with get_db_read() as db:
source_config = WorkflowConfigRepository(db).get_by_app_id(release.app_id)
source_config_id = source_config.id
config = WorkflowConfig(
id=release.id,
id=source_config_id,
app_id=release.app_id,
nodes=config_dict.get("nodes", []),
edges=config_dict.get("edges", []),
variables=config_dict.get("variables", []),
execution_config=config_dict.get("execution_config", {}),
triggers=config_dict.get("triggers", [])
)
return config
def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None):
"""Convert dict to MultiAgentConfig model object
@@ -149,7 +152,7 @@ def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uui
... "app_id": "uuid-here",
... "master_agent_id": "master-uuid",
... "master_agent_name": "Master Agent",
... "orchestration_mode": "conditional",
... "orchestration_mode": "supervisor",
... "sub_agents": [
... {"agent_id": "sub1-uuid", "name": "Sub Agent 1", "role": "specialist", "priority": 1},
... {"agent_id": "sub2-uuid", "name": "Sub Agent 2", "role": "specialist", "priority": 2}
@@ -186,7 +189,7 @@ def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uui
app_id=final_app_id,
master_agent_id=master_agent_id,
master_agent_name=config_dict.get("master_agent_name"),
orchestration_mode=config_dict.get("orchestration_mode", "conditional"),
orchestration_mode=config_dict.get("orchestration_mode", "supervisor"),
sub_agents=config_dict.get("sub_agents", []),
routing_rules=config_dict.get("routing_rules"),
execution_config=config_dict.get("execution_config", {}),
@@ -276,7 +279,8 @@ def agent_config_to_dict(agent_config) -> Dict[str, Any]:
"id": str(agent_config.id),
"app_id": str(agent_config.app_id),
"system_prompt": agent_config.system_prompt,
"default_model_config_id": str(agent_config.default_model_config_id) if agent_config.default_model_config_id else None,
"default_model_config_id": str(
agent_config.default_model_config_id) if agent_config.default_model_config_id else None,
"model_parameters": agent_config.model_parameters,
"knowledge_retrieval": agent_config.knowledge_retrieval,
"memory": agent_config.memory,
@@ -338,6 +342,3 @@ def workflow_config_to_dict(workflow_config) -> Dict[str, Any]:
"created_at": workflow_config.created_at.isoformat() if workflow_config.created_at else None,
"updated_at": workflow_config.updated_at.isoformat() if workflow_config.updated_at else None
}

View File

@@ -1,33 +1,68 @@
{
"v0.2.0": {
"codeName": "启知",
"releaseDate": "2026-1-16",
"upgradePosition": "本次为架构升级,核心目标是把“被动存储”升级为“主动认知”,让系统具备情绪感知、情景理解与类人记忆机制,为后续多智能体协作与专业场景落地奠定底座。",
"coreUpgrades": [
"记忆详情:拟人记忆——情绪引擎、情景记忆、短期记忆、工作记忆、感知记忆、显性记忆、隐性记忆,并配套类脑遗忘机制,实现从感知→情绪→情景→长期沉淀的完整人类记忆闭环",
"可视化工作流拖拽式节点编排LLM、知识库、逻辑、工具业务落地周期由天缩至小时。",
"多模态知识处理PDF、PPT、MP3、MP4 一键解析,时间感知检索准确率 94.3%,问答对数据即插即用。",
"Agent集群内置“记忆-知识-工具-审核”四类角色模板用户一键生成主控Agent把复杂任务拆为子任务并行分发再靠情景记忆统一消解冲突、校验一致性输出完整报告。"
]
"introduction": {
"codeName": "启知",
"releaseDate": "2026-1-16",
"upgradePosition": "本次为架构升级,核心目标是把\"被动存储\"升级为\"主动认知\",让系统具备情绪感知、情景理解与类人记忆机制,为后续多智能体协作与专业场景落地奠定底座。",
"coreUpgrades": [
"记忆详情:拟人记忆——情绪引擎、情景记忆、短期记忆、工作记忆、感知记忆、显性记忆、隐性记忆,并配套类脑遗忘机制,实现从感知→情绪→情景→长期沉淀的完整人类记忆闭环",
"可视化工作流拖拽式节点编排LLM、知识库、逻辑、工具业务落地周期由天缩至小时。",
"多模态知识处理PDF、PPT、MP3、MP4 一键解析,时间感知检索准确率 94.3%,问答对数据即插即用。",
"Agent集群内置\"记忆-知识-工具-审核\"四类角色模板用户一键生成主控Agent把复杂任务拆为子任务并行分发再靠情景记忆统一消解冲突、校验一致性输出完整报告。"
]
},
"introduction_en": {
"codeName": "Qizhi",
"releaseDate": "2026-1-16",
"upgradePosition": "This release marks a foundational upgrade to the systems cognitive architecture. The core objective is to evolve the platform from passive information storage into active cognitive intelligence—enabling emotional awareness, situational understanding, and human-like memory mechanisms. This upgrade lays the groundwork for future multi-agent collaboration and domain-specific, production-grade AI applications.",
"coreUpgrades": [
"Human-Like Memory Architecture: A comprehensive, human-inspired memory system is introduced, encompassing emotional processing, situational memory, short-term and working memory, perceptual memory, as well as explicit and implicit memory. Combined with brain-inspired forgetting mechanisms, the system now supports a complete cognitive loop—from perception → emotion → context → long-term consolidation, closely mirroring human memory formation.",
"Visual Workflow Orchestration: A fully visual, drag-and-drop workflow enables modular composition of LLMs, knowledge bases, logic, and tools. This dramatically reduces the time required to move from experimentation to production—from days to hours.",
"Multimodal Knowledge Processing: The system now supports one-click parsing and ingestion of PDF, PPT, MP3, and MP4 content. With time-aware retrieval accuracy reaching 94.3%, structured Q&A data becomes instantly usable for downstream reasoning and generation.",
"Built-in Agent Clusters: Predefined role templates across four categories—Memory, Knowledge, Tools, and Review—can be generated with a single click. A Coordinator Agent decomposes complex tasks into parallel subtasks, while situational memory is used to resolve conflicts, validate consistency, and synthesize outputs into a coherent, end-to-end report."
]
}
},
"v0.1.0": {
"codeName": "初心",
"releaseDate": "2025-12-01",
"upgradePosition": "这是一款专注于管理和利用AI记忆的工具支持RAG和知识图谱两种主流存储方式旨在为AI应用提供持久化、结构化的“记忆”能力。",
"coreUpgrades": [
"记忆空间:用户可以创建独立的空间来隔离不同记忆,并灵活选择存储方式。",
"记忆配置:简化了配置流程,内置自动提取关键信息的“记忆萃取”和管理生命周期的\"遗忘\"引擎。",
"知识检索:提供语义、分词和混合三种检索模式,并支持多种参数微调和结果重排序,以提升召回效果。",
"全局管理:支持统一设置默认检索参数,并可一键应用到所有知识库。",
"测试与调试:内置\"召回测试\"功能,方便用户实时验证检索效果并调整参数,支持通过分享码与他人协作。",
"记忆洞察可查看详细的对话记录、用户画像和分析报告帮助理解AI的\"记忆\"内容。",
"集成与管理提供API Key用于系统集成并包含基本的用户管理功能。",
"界面与体验:采用现代化的卡片式布局和渐变色设计,注重交互的流畅性和视觉美感。",
"起步与使用:文档中提供了清晰的基础使用流程,引导用户从创建空间、配置记忆到测试检索快速上手。",
"版本说明与限制: 记忆熊 v0.1.0 版本\"初心\"囊括智能记忆管理的核心思路和基础能力,为后续开发奠定了基础。",
"文档资源用户手册、API文档、FAQ",
"问题反馈GitHub Issues、邮件支持",
"致谢:感谢所有参与测试和提供反馈的用户!"
]
"introduction": {
"codeName": "初心",
"releaseDate": "2025-12-01",
"upgradePosition": "这是一款专注于管理和利用AI记忆的工具支持RAG和知识图谱两种主流存储方式旨在为AI应用提供持久化、结构化的\"记忆\"能力。",
"coreUpgrades": [
"记忆空间:用户可以创建独立的空间来隔离不同记忆,并灵活选择存储方式。",
"记忆配置:简化了配置流程,内置自动提取关键信息的\"记忆萃取\"和管理生命周期的\"遗忘\"引擎。",
"知识检索:提供语义、分词和混合三种检索模式,并支持多种参数微调和结果重排序,以提升召回效果。",
"全局管理:支持统一设置默认检索参数,并可一键应用到所有知识库。",
"测试与调试:内置\"召回测试\"功能,方便用户实时验证检索效果并调整参数,支持通过分享码与他人协作。",
"记忆洞察可查看详细的对话记录、用户画像和分析报告帮助理解AI的\"记忆\"内容。",
"集成与管理提供API Key用于系统集成并包含基本的用户管理功能。",
"界面与体验:采用现代化的卡片式布局和渐变色设计,注重交互的流畅性和视觉美感。",
"起步与使用:文档中提供了清晰的基础使用流程,引导用户从创建空间、配置记忆到测试检索快速上手。",
"版本说明与限制: 记忆熊 v0.1.0 版本\"初心\"囊括智能记忆管理的核心思路和基础能力,为后续开发奠定了基础。",
"文档资源用户手册、API文档、FAQ",
"问题反馈GitHub Issues、邮件支持",
"致谢:感谢所有参与测试和提供反馈的用户!"
]
},
"introduction_en": {
"codeName": "Original Intent",
"releaseDate": "2025-12-01",
"upgradePosition": "A tool focused on managing and utilizing AI memory, supporting both RAG and knowledge graph storage methods, aiming to provide persistent and structured 'memory' capabilities for AI applications.",
"coreUpgrades": [
"Memory Space: Users can create independent spaces to isolate different memories and flexibly choose storage methods.",
"Memory Configuration: Simplified configuration process with built-in 'memory extraction' for automatic key information extraction and 'forgetting' engine for lifecycle management.",
"Knowledge Retrieval: Provides semantic, tokenization, and hybrid retrieval modes with various parameter tuning and result reranking to improve recall.",
"Global Management: Supports unified default retrieval parameter settings with one-click application to all knowledge bases.",
"Testing & Debugging: Built-in 'recall testing' for real-time verification of retrieval effects and parameter adjustment, with sharing code support for collaboration.",
"Memory Insights: View detailed conversation records, user profiles, and analysis reports to understand AI 'memory' content.",
"Integration & Management: Provides API Key for system integration with basic user management features.",
"Interface & Experience: Modern card-based layout with gradient design, focusing on interaction fluidity and visual aesthetics.",
"Getting Started: Documentation provides clear basic usage flow, guiding users from creating spaces, configuring memory to testing retrieval.",
"Version Notes: MemoryBear v0.1.0 'Original Intent' encompasses core concepts and basic capabilities of intelligent memory management, laying foundation for future development.",
"Documentation: User Manual, API Documentation, FAQ",
"Feedback: GitHub Issues, Email Support",
"Acknowledgments: Thanks to all users who participated in testing and provided feedback!"
]
}
}
}
}

View File

@@ -0,0 +1,62 @@
"""20260116
Revision ID: 1fd7d0e703b3
Revises: 9ab9b6393f32
Create Date: 2026-01-16 13:17:37.060026
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '1fd7d0e703b3'
down_revision: Union[str, None] = '9ab9b6393f32'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('emotion_suggestions_cache',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('end_user_id', sa.String(length=255), nullable=False, comment='终端用户ID组ID'),
sa.Column('health_summary', sa.Text(), nullable=False, comment='健康状态摘要'),
sa.Column('suggestions', sa.JSON(), nullable=False, comment='建议列表JSON格式'),
sa.Column('generated_at', sa.DateTime(), nullable=False, comment='生成时间'),
sa.Column('expires_at', sa.DateTime(), nullable=True, comment='过期时间'),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_emotion_suggestions_cache_end_user_id'), 'emotion_suggestions_cache', ['end_user_id'], unique=True)
op.create_index(op.f('ix_emotion_suggestions_cache_id'), 'emotion_suggestions_cache', ['id'], unique=False)
op.create_table('implicit_memory_cache',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('end_user_id', sa.String(length=255), nullable=False, comment='终端用户ID'),
sa.Column('preferences', sa.JSON(), nullable=False, comment='偏好标签列表JSON格式'),
sa.Column('portrait', sa.JSON(), nullable=False, comment='四维画像对象JSON格式'),
sa.Column('interest_areas', sa.JSON(), nullable=False, comment='兴趣领域分布对象JSON格式'),
sa.Column('habits', sa.JSON(), nullable=False, comment='行为习惯列表JSON格式'),
sa.Column('generated_at', sa.DateTime(), nullable=False, comment='生成时间'),
sa.Column('expires_at', sa.DateTime(), nullable=True, comment='过期时间'),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_implicit_memory_cache_end_user_id'), 'implicit_memory_cache', ['end_user_id'], unique=True)
op.create_index(op.f('ix_implicit_memory_cache_id'), 'implicit_memory_cache', ['id'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_implicit_memory_cache_id'), table_name='implicit_memory_cache')
op.drop_index(op.f('ix_implicit_memory_cache_end_user_id'), table_name='implicit_memory_cache')
op.drop_table('implicit_memory_cache')
op.drop_index(op.f('ix_emotion_suggestions_cache_id'), table_name='emotion_suggestions_cache')
op.drop_index(op.f('ix_emotion_suggestions_cache_end_user_id'), table_name='emotion_suggestions_cache')
op.drop_table('emotion_suggestions_cache')
# ### end Alembic commands ###