Merge pull request #442 from SuanmoSuanyangTechnology/fix/storage
Fix/storage
This commit is contained in:
7
api/app/cache/__init__.py
vendored
7
api/app/cache/__init__.py
vendored
@@ -2,10 +2,7 @@
|
||||
Cache 缓存模块
|
||||
|
||||
提供各种缓存功能的统一入口
|
||||
注意:隐性记忆和情绪建议已迁移到数据库存储,不再使用Redis缓存
|
||||
"""
|
||||
from .memory import EmotionMemoryCache, ImplicitMemoryCache
|
||||
|
||||
__all__ = [
|
||||
"EmotionMemoryCache",
|
||||
"ImplicitMemoryCache",
|
||||
]
|
||||
__all__ = []
|
||||
|
||||
8
api/app/cache/memory/__init__.py
vendored
8
api/app/cache/memory/__init__.py
vendored
@@ -2,11 +2,7 @@
|
||||
Memory 缓存模块
|
||||
|
||||
提供记忆系统相关的缓存功能
|
||||
注意:隐性记忆和情绪建议已迁移到数据库存储,不再使用Redis缓存
|
||||
"""
|
||||
from .emotion_memory import EmotionMemoryCache
|
||||
from .implicit_memory import ImplicitMemoryCache
|
||||
|
||||
__all__ = [
|
||||
"EmotionMemoryCache",
|
||||
"ImplicitMemoryCache",
|
||||
]
|
||||
__all__ = []
|
||||
|
||||
134
api/app/cache/memory/emotion_memory.py
vendored
134
api/app/cache/memory/emotion_memory.py
vendored
@@ -1,134 +0,0 @@
|
||||
"""
|
||||
Emotion Suggestions Cache
|
||||
|
||||
情绪个性化建议缓存模块
|
||||
用于缓存用户的情绪个性化建议数据
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional, Dict, Any
|
||||
from datetime import datetime
|
||||
|
||||
from app.aioRedis import aio_redis
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EmotionMemoryCache:
|
||||
"""情绪建议缓存类"""
|
||||
|
||||
# Key 前缀
|
||||
PREFIX = "cache:memory:emotion_memory"
|
||||
|
||||
@classmethod
|
||||
def _get_key(cls, *parts: str) -> str:
|
||||
"""生成 Redis key
|
||||
|
||||
Args:
|
||||
*parts: key 的各个部分
|
||||
|
||||
Returns:
|
||||
完整的 Redis key
|
||||
"""
|
||||
return ":".join([cls.PREFIX] + list(parts))
|
||||
|
||||
@classmethod
|
||||
async def set_emotion_suggestions(
|
||||
cls,
|
||||
user_id: str,
|
||||
suggestions_data: Dict[str, Any],
|
||||
expire: int = 86400
|
||||
) -> bool:
|
||||
"""设置用户情绪建议缓存
|
||||
|
||||
Args:
|
||||
user_id: 用户ID(end_user_id)
|
||||
suggestions_data: 建议数据字典,包含:
|
||||
- health_summary: 健康状态摘要
|
||||
- suggestions: 建议列表
|
||||
- generated_at: 生成时间(可选)
|
||||
expire: 过期时间(秒),默认24小时(86400秒)
|
||||
|
||||
Returns:
|
||||
是否设置成功
|
||||
"""
|
||||
try:
|
||||
key = cls._get_key("suggestions", user_id)
|
||||
|
||||
# 添加生成时间戳
|
||||
if "generated_at" not in suggestions_data:
|
||||
suggestions_data["generated_at"] = datetime.now().isoformat()
|
||||
|
||||
# 添加缓存标记
|
||||
suggestions_data["cached"] = True
|
||||
|
||||
value = json.dumps(suggestions_data, ensure_ascii=False)
|
||||
await aio_redis.set(key, value, ex=expire)
|
||||
logger.info(f"设置情绪建议缓存成功: {key}, 过期时间: {expire}秒")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"设置情绪建议缓存失败: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
async def get_emotion_suggestions(cls, user_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""获取用户情绪建议缓存
|
||||
|
||||
Args:
|
||||
user_id: 用户ID(end_user_id)
|
||||
|
||||
Returns:
|
||||
建议数据字典,如果不存在或已过期返回 None
|
||||
"""
|
||||
try:
|
||||
key = cls._get_key("suggestions", user_id)
|
||||
value = await aio_redis.get(key)
|
||||
|
||||
if value:
|
||||
data = json.loads(value)
|
||||
logger.info(f"成功获取情绪建议缓存: {key}")
|
||||
return data
|
||||
|
||||
logger.info(f"情绪建议缓存不存在或已过期: {key}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"获取情绪建议缓存失败: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
async def delete_emotion_suggestions(cls, user_id: str) -> bool:
|
||||
"""删除用户情绪建议缓存
|
||||
|
||||
Args:
|
||||
user_id: 用户ID(end_user_id)
|
||||
|
||||
Returns:
|
||||
是否删除成功
|
||||
"""
|
||||
try:
|
||||
key = cls._get_key("suggestions", user_id)
|
||||
result = await aio_redis.delete(key)
|
||||
logger.info(f"删除情绪建议缓存: {key}, 结果: {result}")
|
||||
return result > 0
|
||||
except Exception as e:
|
||||
logger.error(f"删除情绪建议缓存失败: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
async def get_suggestions_ttl(cls, user_id: str) -> int:
|
||||
"""获取情绪建议缓存的剩余过期时间
|
||||
|
||||
Args:
|
||||
user_id: 用户ID(end_user_id)
|
||||
|
||||
Returns:
|
||||
剩余秒数,-1表示永不过期,-2表示key不存在
|
||||
"""
|
||||
try:
|
||||
key = cls._get_key("suggestions", user_id)
|
||||
ttl = await aio_redis.ttl(key)
|
||||
logger.debug(f"情绪建议缓存TTL: {key} = {ttl}秒")
|
||||
return ttl
|
||||
except Exception as e:
|
||||
logger.error(f"获取情绪建议缓存TTL失败: {e}")
|
||||
return -2
|
||||
136
api/app/cache/memory/implicit_memory.py
vendored
136
api/app/cache/memory/implicit_memory.py
vendored
@@ -1,136 +0,0 @@
|
||||
"""
|
||||
Implicit Memory Profile Cache
|
||||
|
||||
隐式记忆用户画像缓存模块
|
||||
用于缓存用户的完整画像数据(偏好标签、四维画像、兴趣领域、行为习惯)
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional, Dict, Any
|
||||
from datetime import datetime
|
||||
|
||||
from app.aioRedis import aio_redis
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImplicitMemoryCache:
|
||||
"""隐式记忆用户画像缓存类"""
|
||||
|
||||
# Key 前缀
|
||||
PREFIX = "cache:memory:implicit_memory"
|
||||
|
||||
@classmethod
|
||||
def _get_key(cls, *parts: str) -> str:
|
||||
"""生成 Redis key
|
||||
|
||||
Args:
|
||||
*parts: key 的各个部分
|
||||
|
||||
Returns:
|
||||
完整的 Redis key
|
||||
"""
|
||||
return ":".join([cls.PREFIX] + list(parts))
|
||||
|
||||
@classmethod
|
||||
async def set_user_profile(
|
||||
cls,
|
||||
user_id: str,
|
||||
profile_data: Dict[str, Any],
|
||||
expire: int = 86400
|
||||
) -> bool:
|
||||
"""设置用户完整画像缓存
|
||||
|
||||
Args:
|
||||
user_id: 用户ID(end_user_id)
|
||||
profile_data: 画像数据字典,包含:
|
||||
- preferences: 偏好标签列表
|
||||
- portrait: 四维画像对象
|
||||
- interest_areas: 兴趣领域分布对象
|
||||
- habits: 行为习惯列表
|
||||
- generated_at: 生成时间(可选)
|
||||
expire: 过期时间(秒),默认24小时(86400秒)
|
||||
|
||||
Returns:
|
||||
是否设置成功
|
||||
"""
|
||||
try:
|
||||
key = cls._get_key("profile", user_id)
|
||||
|
||||
# 添加生成时间戳
|
||||
if "generated_at" not in profile_data:
|
||||
profile_data["generated_at"] = datetime.now().isoformat()
|
||||
|
||||
# 添加缓存标记
|
||||
profile_data["cached"] = True
|
||||
|
||||
value = json.dumps(profile_data, ensure_ascii=False)
|
||||
await aio_redis.set(key, value, ex=expire)
|
||||
logger.info(f"设置用户画像缓存成功: {key}, 过期时间: {expire}秒")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"设置用户画像缓存失败: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
async def get_user_profile(cls, user_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""获取用户完整画像缓存
|
||||
|
||||
Args:
|
||||
user_id: 用户ID(end_user_id)
|
||||
|
||||
Returns:
|
||||
画像数据字典,如果不存在或已过期返回 None
|
||||
"""
|
||||
try:
|
||||
key = cls._get_key("profile", user_id)
|
||||
value = await aio_redis.get(key)
|
||||
|
||||
if value:
|
||||
data = json.loads(value)
|
||||
logger.info(f"成功获取用户画像缓存: {key}")
|
||||
return data
|
||||
|
||||
logger.info(f"用户画像缓存不存在或已过期: {key}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"获取用户画像缓存失败: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
async def delete_user_profile(cls, user_id: str) -> bool:
|
||||
"""删除用户完整画像缓存
|
||||
|
||||
Args:
|
||||
user_id: 用户ID(end_user_id)
|
||||
|
||||
Returns:
|
||||
是否删除成功
|
||||
"""
|
||||
try:
|
||||
key = cls._get_key("profile", user_id)
|
||||
result = await aio_redis.delete(key)
|
||||
logger.info(f"删除用户画像缓存: {key}, 结果: {result}")
|
||||
return result > 0
|
||||
except Exception as e:
|
||||
logger.error(f"删除用户画像缓存失败: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
async def get_profile_ttl(cls, user_id: str) -> int:
|
||||
"""获取用户画像缓存的剩余过期时间
|
||||
|
||||
Args:
|
||||
user_id: 用户ID(end_user_id)
|
||||
|
||||
Returns:
|
||||
剩余秒数,-1表示永不过期,-2表示key不存在
|
||||
"""
|
||||
try:
|
||||
key = cls._get_key("profile", user_id)
|
||||
ttl = await aio_redis.ttl(key)
|
||||
logger.debug(f"用户画像缓存TTL: {key} = {ttl}秒")
|
||||
return ttl
|
||||
except Exception as e:
|
||||
logger.error(f"获取用户画像缓存TTL失败: {e}")
|
||||
return -2
|
||||
@@ -82,7 +82,8 @@ celery_app.conf.update(
|
||||
'app.tasks.workspace_reflection_task': {'queue': 'periodic_tasks'},
|
||||
'app.tasks.regenerate_memory_cache': {'queue': 'periodic_tasks'},
|
||||
'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'},
|
||||
'app.controllers.memory_storage_controller.search_all': {'queue': 'periodic_tasks'},
|
||||
'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'},
|
||||
'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'},
|
||||
},
|
||||
)
|
||||
|
||||
@@ -95,6 +96,7 @@ memory_cache_regeneration_schedule = timedelta(hours=settings.MEMORY_CACHE_REGEN
|
||||
# 这个30秒的设计不合理
|
||||
workspace_reflection_schedule = timedelta(seconds=30) # 每30秒运行一次settings.REFLECTION_INTERVAL_TIME
|
||||
forgetting_cycle_schedule = timedelta(hours=24) # 每24小时运行一次遗忘周期
|
||||
implicit_emotions_update_schedule = timedelta(hours=24) # 每24小时更新一次隐性记忆和情绪数据
|
||||
|
||||
#构建定时任务配置
|
||||
beat_schedule_config = {
|
||||
@@ -115,16 +117,16 @@ beat_schedule_config = {
|
||||
"config_id": None, # 使用默认配置,可以通过环境变量配置
|
||||
},
|
||||
},
|
||||
"write-all-workspaces-memory": {
|
||||
"task": "app.tasks.write_all_workspaces_memory_task",
|
||||
"schedule": memory_increment_schedule,
|
||||
"args": (),
|
||||
},
|
||||
"update-implicit-emotions-storage": {
|
||||
"task": "app.tasks.update_implicit_emotions_storage",
|
||||
"schedule": implicit_emotions_update_schedule,
|
||||
"args": (),
|
||||
},
|
||||
}
|
||||
|
||||
#如果配置了默认工作空间ID,则添加记忆总量统计任务
|
||||
if settings.DEFAULT_WORKSPACE_ID:
|
||||
beat_schedule_config["write-total-memory"] = {
|
||||
"task": "app.controllers.memory_storage_controller.search_all",
|
||||
"schedule": memory_increment_schedule,
|
||||
"kwargs": {
|
||||
"workspace_id": settings.DEFAULT_WORKSPACE_ID,
|
||||
},
|
||||
}
|
||||
|
||||
celery_app.conf.beat_schedule = beat_schedule_config
|
||||
|
||||
@@ -208,14 +208,64 @@ async def get_emotion_health(
|
||||
|
||||
|
||||
|
||||
# @router.post("/check-data", response_model=ApiResponse)
|
||||
# async def check_emotion_data_exists(
|
||||
# request: EmotionSuggestionsRequest,
|
||||
# db: Session = Depends(get_db),
|
||||
# current_user: User = Depends(get_current_user),
|
||||
# ):
|
||||
# """检查用户情绪建议数据是否存在
|
||||
|
||||
# Args:
|
||||
# request: 包含 end_user_id
|
||||
# db: 数据库会话
|
||||
# current_user: 当前用户
|
||||
|
||||
# Returns:
|
||||
# 数据存在状态
|
||||
# """
|
||||
# try:
|
||||
# api_logger.info(
|
||||
# f"检查用户情绪建议数据是否存在: {request.end_user_id}",
|
||||
# extra={"end_user_id": request.end_user_id}
|
||||
# )
|
||||
|
||||
# # 从数据库获取建议
|
||||
# data = await emotion_service.get_cached_suggestions(
|
||||
# end_user_id=request.end_user_id,
|
||||
# db=db
|
||||
# )
|
||||
|
||||
# if data is None:
|
||||
# api_logger.info(f"用户 {request.end_user_id} 的情绪建议数据不存在")
|
||||
# return fail(
|
||||
# BizCode.NOT_FOUND,
|
||||
# "情绪建议数据不存在,请点击右上角刷新进行初始化",
|
||||
# {"exists": False}
|
||||
# )
|
||||
|
||||
# api_logger.info(f"用户 {request.end_user_id} 的情绪建议数据存在")
|
||||
# return success(data={"exists": True}, msg="情绪建议数据已存在")
|
||||
|
||||
# except Exception as e:
|
||||
# api_logger.error(
|
||||
# f"检查情绪建议数据失败: {str(e)}",
|
||||
# extra={"end_user_id": request.end_user_id},
|
||||
# exc_info=True
|
||||
# )
|
||||
# raise HTTPException(
|
||||
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
# detail=f"检查情绪建议数据失败: {str(e)}"
|
||||
# )
|
||||
|
||||
|
||||
@router.post("/suggestions", response_model=ApiResponse)
|
||||
async def get_emotion_suggestions(
|
||||
request: EmotionSuggestionsRequest,
|
||||
language_type: str = Header(default=None, alias="X-Language-Type"),
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""获取个性化情绪建议(从缓存读取)
|
||||
"""获取个性化情绪建议(从数据库读取)
|
||||
|
||||
Args:
|
||||
request: 包含 end_user_id 和可选的 config_id
|
||||
@@ -223,77 +273,42 @@ async def get_emotion_suggestions(
|
||||
current_user: 当前用户
|
||||
|
||||
Returns:
|
||||
缓存的个性化情绪建议响应
|
||||
存储的个性化情绪建议响应
|
||||
"""
|
||||
try:
|
||||
# 使用集中化的语言校验
|
||||
language = get_language_from_header(language_type)
|
||||
|
||||
api_logger.info(
|
||||
f"用户 {current_user.username} 请求获取个性化情绪建议(缓存)",
|
||||
f"用户 {current_user.username} 请求获取个性化情绪建议",
|
||||
extra={
|
||||
"end_user_id": request.end_user_id,
|
||||
"config_id": request.config_id
|
||||
}
|
||||
)
|
||||
|
||||
# 从缓存获取建议
|
||||
# 从数据库获取建议
|
||||
data = await emotion_service.get_cached_suggestions(
|
||||
end_user_id=request.end_user_id,
|
||||
db=db
|
||||
)
|
||||
|
||||
if data is None:
|
||||
# 缓存不存在或已过期,自动触发生成
|
||||
api_logger.info(
|
||||
f"用户 {request.end_user_id} 的建议缓存不存在或已过期,自动生成新建议",
|
||||
f"用户 {request.end_user_id} 的建议数据不存在",
|
||||
extra={"end_user_id": request.end_user_id}
|
||||
)
|
||||
try:
|
||||
data = await emotion_service.generate_emotion_suggestions(
|
||||
end_user_id=request.end_user_id,
|
||||
db=db,
|
||||
language=language
|
||||
)
|
||||
# 保存到缓存
|
||||
await emotion_service.save_suggestions_cache(
|
||||
end_user_id=request.end_user_id,
|
||||
suggestions_data=data,
|
||||
db=db,
|
||||
expires_hours=24
|
||||
)
|
||||
except (ValueError, KeyError) as gen_e:
|
||||
# 预期内的业务异常:配置缺失、数据格式问题等
|
||||
api_logger.warning(
|
||||
f"自动生成建议失败(业务异常): {str(gen_e)}",
|
||||
extra={"end_user_id": request.end_user_id}
|
||||
)
|
||||
return fail(
|
||||
BizCode.NOT_FOUND,
|
||||
f"自动生成建议失败: {str(gen_e)}",
|
||||
""
|
||||
)
|
||||
except Exception as gen_e:
|
||||
# 非预期异常:记录完整 traceback 便于排查
|
||||
api_logger.error(
|
||||
f"自动生成建议时发生未预期异常: {str(gen_e)}",
|
||||
extra={"end_user_id": request.end_user_id},
|
||||
exc_info=True
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"生成建议时发生内部错误: {str(gen_e)}"
|
||||
)
|
||||
return fail(
|
||||
code=404,
|
||||
msg="情绪建议数据不存在,请点击右上角刷新进行初始化"
|
||||
)
|
||||
|
||||
api_logger.info(
|
||||
"个性化建议获取成功(缓存)",
|
||||
"个性化建议获取成功",
|
||||
extra={
|
||||
"end_user_id": request.end_user_id,
|
||||
"suggestions_count": len(data.get("suggestions", []))
|
||||
}
|
||||
)
|
||||
|
||||
return success(data=data, msg="个性化建议获取成功(缓存)")
|
||||
return success(data=data, msg="个性化建议获取成功")
|
||||
|
||||
except Exception as e:
|
||||
api_logger.error(
|
||||
@@ -314,7 +329,7 @@ async def generate_emotion_suggestions(
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""生成个性化情绪建议(调用LLM并缓存)
|
||||
"""生成个性化情绪建议(调用LLM并保存到数据库)
|
||||
|
||||
Args:
|
||||
request: 包含 end_user_id
|
||||
@@ -342,12 +357,11 @@ async def generate_emotion_suggestions(
|
||||
language=language
|
||||
)
|
||||
|
||||
# 保存到缓存
|
||||
# 保存到数据库
|
||||
await emotion_service.save_suggestions_cache(
|
||||
end_user_id=request.end_user_id,
|
||||
suggestions_data=data,
|
||||
db=db,
|
||||
expires_hours=24
|
||||
db=db
|
||||
)
|
||||
|
||||
api_logger.info(
|
||||
@@ -369,4 +383,4 @@ async def generate_emotion_suggestions(
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"生成个性化建议失败: {str(e)}"
|
||||
)
|
||||
)
|
||||
@@ -122,6 +122,48 @@ def validate_confidence_threshold(threshold: float) -> None:
|
||||
raise ValueError("confidence_threshold must be between 0.0 and 1.0")
|
||||
|
||||
|
||||
@router.get("/check-data/{end_user_id}", response_model=ApiResponse)
|
||||
@cur_workspace_access_guard()
|
||||
async def check_user_data_exists(
|
||||
end_user_id: str,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
) -> ApiResponse:
|
||||
"""
|
||||
检查用户画像数据是否存在
|
||||
|
||||
Args:
|
||||
end_user_id: 目标用户ID
|
||||
|
||||
Returns:
|
||||
数据存在状态
|
||||
"""
|
||||
api_logger.info(f"检查用户画像数据是否存在: {end_user_id}")
|
||||
|
||||
try:
|
||||
# Validate inputs
|
||||
validate_user_id(end_user_id)
|
||||
|
||||
# Create service with user-specific config
|
||||
service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
|
||||
|
||||
# Get cached profile
|
||||
cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db)
|
||||
|
||||
if cached_profile is None:
|
||||
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
|
||||
return success(
|
||||
data={"exists": False},
|
||||
msg="画像数据不存在,请点击右上角刷新进行初始化"
|
||||
)
|
||||
|
||||
api_logger.info(f"用户 {end_user_id} 的画像数据存在")
|
||||
return success(data={"exists": True}, msg="画像数据已存在")
|
||||
|
||||
except Exception as e:
|
||||
return handle_implicit_memory_error(e, "检查画像数据", end_user_id)
|
||||
|
||||
|
||||
@router.get("/preferences/{end_user_id}", response_model=ApiResponse)
|
||||
@cur_workspace_access_guard()
|
||||
async def get_preference_tags(
|
||||
@@ -159,12 +201,8 @@ async def get_preference_tags(
|
||||
cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db)
|
||||
|
||||
if cached_profile is None:
|
||||
api_logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期")
|
||||
return fail(
|
||||
BizCode.NOT_FOUND,
|
||||
"画像缓存不存在或已过期,请右上角刷新生成新画像",
|
||||
""
|
||||
)
|
||||
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
|
||||
return fail(BizCode.NOT_FOUND, "", "")
|
||||
|
||||
# Extract preferences from cache
|
||||
preferences = cached_profile.get("preferences", [])
|
||||
@@ -230,12 +268,8 @@ async def get_dimension_portrait(
|
||||
cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db)
|
||||
|
||||
if cached_profile is None:
|
||||
api_logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期")
|
||||
return fail(
|
||||
BizCode.NOT_FOUND,
|
||||
"画像缓存不存在或已过期,请右上角刷新生成新画像",
|
||||
""
|
||||
)
|
||||
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
|
||||
return fail(BizCode.NOT_FOUND, "", "")
|
||||
|
||||
# Extract portrait from cache
|
||||
portrait = cached_profile.get("portrait", {})
|
||||
@@ -278,12 +312,8 @@ async def get_interest_area_distribution(
|
||||
cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db)
|
||||
|
||||
if cached_profile is None:
|
||||
api_logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期")
|
||||
return fail(
|
||||
BizCode.NOT_FOUND,
|
||||
"画像缓存不存在或已过期,请右上角刷新生成新画像",
|
||||
""
|
||||
)
|
||||
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
|
||||
return fail(BizCode.NOT_FOUND, "", "")
|
||||
|
||||
# Extract interest areas from cache
|
||||
interest_areas = cached_profile.get("interest_areas", {})
|
||||
@@ -330,12 +360,8 @@ async def get_behavior_habits(
|
||||
cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db)
|
||||
|
||||
if cached_profile is None:
|
||||
api_logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期")
|
||||
return fail(
|
||||
BizCode.NOT_FOUND,
|
||||
"画像缓存不存在或已过期,请右上角刷新生成新画像",
|
||||
""
|
||||
)
|
||||
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
|
||||
return fail(BizCode.NOT_FOUND, "", "")
|
||||
|
||||
# Extract habits from cache
|
||||
habits = cached_profile.get("habits", [])
|
||||
|
||||
@@ -35,6 +35,7 @@ from .ontology_scene import OntologyScene
|
||||
from .ontology_class import OntologyClass
|
||||
from .ontology_scene import OntologyScene
|
||||
from .ontology_class import OntologyClass
|
||||
from .implicit_emotions_storage_model import ImplicitEmotionsStorage
|
||||
|
||||
__all__ = [
|
||||
"Tenants",
|
||||
@@ -90,5 +91,6 @@ __all__ = [
|
||||
"MemoryPerceptualModel",
|
||||
"ModelBase",
|
||||
"LoadBalanceStrategy",
|
||||
"Skill"
|
||||
"Skill",
|
||||
"ImplicitEmotionsStorage"
|
||||
]
|
||||
|
||||
45
api/app/models/implicit_emotions_storage_model.py
Normal file
45
api/app/models/implicit_emotions_storage_model.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""
|
||||
Implicit Emotions Storage Model
|
||||
|
||||
数据库模型:存储用户的隐性记忆画像和情绪建议数据
|
||||
替代原有的Redis缓存方式
|
||||
"""
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from sqlalchemy import Column, String, Text, DateTime, Index
|
||||
from sqlalchemy.dialects.postgresql import UUID, JSONB
|
||||
from app.db import Base
|
||||
|
||||
|
||||
class ImplicitEmotionsStorage(Base):
|
||||
"""隐性记忆和情绪存储表"""
|
||||
|
||||
__tablename__ = "implicit_emotions_storage"
|
||||
|
||||
# 主键
|
||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, comment="主键ID")
|
||||
|
||||
# 用户标识(unique=True会自动创建唯一索引)
|
||||
end_user_id = Column(String(255), nullable=False, unique=True, comment="终端用户ID")
|
||||
|
||||
# 隐性记忆画像数据(JSON格式)
|
||||
implicit_profile = Column(JSONB, nullable=True, comment="隐性记忆用户画像数据")
|
||||
|
||||
# 情绪建议数据(JSON格式)
|
||||
emotion_suggestions = Column(JSONB, nullable=True, comment="情绪个性化建议数据")
|
||||
|
||||
# 时间戳
|
||||
created_at = Column(DateTime, nullable=False, default=datetime.utcnow, comment="创建时间")
|
||||
updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment="更新时间")
|
||||
|
||||
# 数据生成时间(用于业务逻辑)
|
||||
implicit_generated_at = Column(DateTime, nullable=True, comment="隐性记忆画像生成时间")
|
||||
emotion_generated_at = Column(DateTime, nullable=True, comment="情绪建议生成时间")
|
||||
|
||||
# 索引(只为updated_at创建索引,end_user_id的unique约束已自动创建索引)
|
||||
__table_args__ = (
|
||||
Index('idx_updated_at', 'updated_at'),
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<ImplicitEmotionsStorage(id={self.id}, end_user_id={self.end_user_id})>"
|
||||
121
api/app/repositories/implicit_emotions_storage_repository.py
Normal file
121
api/app/repositories/implicit_emotions_storage_repository.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""
|
||||
Implicit Emotions Storage Repository
|
||||
|
||||
数据访问层:处理隐性记忆和情绪数据的数据库操作
|
||||
事务由调用方控制,仓储层只使用 flush/refresh
|
||||
"""
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional, Generator
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImplicitEmotionsStorageRepository:
|
||||
"""隐性记忆和情绪存储仓储类"""
|
||||
|
||||
def __init__(self, db: Session):
|
||||
self.db = db
|
||||
|
||||
def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitEmotionsStorage]:
|
||||
"""根据终端用户ID获取存储记录"""
|
||||
try:
|
||||
stmt = select(ImplicitEmotionsStorage).where(
|
||||
ImplicitEmotionsStorage.end_user_id == end_user_id
|
||||
)
|
||||
return self.db.execute(stmt).scalar_one_or_none()
|
||||
except Exception as e:
|
||||
logger.error(f"获取用户存储记录失败: end_user_id={end_user_id}, error={e}")
|
||||
return None
|
||||
|
||||
def create(self, end_user_id: str) -> ImplicitEmotionsStorage:
|
||||
"""创建新的存储记录(事务由调用方提交)"""
|
||||
storage = ImplicitEmotionsStorage(
|
||||
end_user_id=end_user_id,
|
||||
created_at=datetime.utcnow(),
|
||||
updated_at=datetime.utcnow()
|
||||
)
|
||||
self.db.add(storage)
|
||||
self.db.flush()
|
||||
self.db.refresh(storage)
|
||||
logger.info(f"创建用户存储记录成功: end_user_id={end_user_id}")
|
||||
return storage
|
||||
|
||||
def update_implicit_profile(
|
||||
self,
|
||||
end_user_id: str,
|
||||
profile_data: dict
|
||||
) -> ImplicitEmotionsStorage:
|
||||
"""更新隐性记忆画像数据(事务由调用方提交)"""
|
||||
storage = self.get_by_end_user_id(end_user_id)
|
||||
if storage is None:
|
||||
storage = self.create(end_user_id)
|
||||
|
||||
storage.implicit_profile = profile_data
|
||||
storage.implicit_generated_at = datetime.utcnow()
|
||||
storage.updated_at = datetime.utcnow()
|
||||
|
||||
self.db.flush()
|
||||
self.db.refresh(storage)
|
||||
logger.info(f"更新隐性记忆画像成功: end_user_id={end_user_id}")
|
||||
return storage
|
||||
|
||||
def update_emotion_suggestions(
|
||||
self,
|
||||
end_user_id: str,
|
||||
suggestions_data: dict
|
||||
) -> ImplicitEmotionsStorage:
|
||||
"""更新情绪建议数据(事务由调用方提交)"""
|
||||
storage = self.get_by_end_user_id(end_user_id)
|
||||
if storage is None:
|
||||
storage = self.create(end_user_id)
|
||||
|
||||
storage.emotion_suggestions = suggestions_data
|
||||
storage.emotion_generated_at = datetime.utcnow()
|
||||
storage.updated_at = datetime.utcnow()
|
||||
|
||||
self.db.flush()
|
||||
self.db.refresh(storage)
|
||||
logger.info(f"更新情绪建议成功: end_user_id={end_user_id}")
|
||||
return storage
|
||||
|
||||
def get_all_user_ids(self, batch_size: int = 100) -> Generator[str, None, None]:
|
||||
"""分批次获取所有已存储数据的用户ID(避免大数据量内存溢出)
|
||||
|
||||
Args:
|
||||
batch_size: 每批次加载的数量,默认100
|
||||
|
||||
Yields:
|
||||
用户ID字符串
|
||||
"""
|
||||
offset = 0
|
||||
while True:
|
||||
try:
|
||||
stmt = (
|
||||
select(ImplicitEmotionsStorage.end_user_id)
|
||||
.order_by(ImplicitEmotionsStorage.end_user_id)
|
||||
.limit(batch_size)
|
||||
.offset(offset)
|
||||
)
|
||||
batch = self.db.execute(stmt).scalars().all()
|
||||
if not batch:
|
||||
break
|
||||
yield from batch
|
||||
offset += batch_size
|
||||
except Exception as e:
|
||||
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
|
||||
break
|
||||
|
||||
def delete_by_end_user_id(self, end_user_id: str) -> bool:
|
||||
"""删除用户的存储记录(事务由调用方提交)"""
|
||||
storage = self.get_by_end_user_id(end_user_id)
|
||||
if storage:
|
||||
self.db.delete(storage)
|
||||
self.db.flush()
|
||||
logger.info(f"删除用户存储记录成功: end_user_id={end_user_id}")
|
||||
return True
|
||||
return False
|
||||
@@ -843,32 +843,33 @@ class EmotionAnalyticsService:
|
||||
end_user_id: str,
|
||||
db: Session,
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""从 Redis 缓存获取个性化情绪建议
|
||||
"""从数据库获取个性化情绪建议
|
||||
|
||||
Args:
|
||||
end_user_id: 宿主ID(用户组ID)
|
||||
db: 数据库会话(保留参数以保持接口兼容性)
|
||||
db: 数据库会话
|
||||
|
||||
Returns:
|
||||
Dict: 缓存的建议数据,如果不存在或已过期返回 None
|
||||
Dict: 存储的建议数据,如果不存在返回 None
|
||||
"""
|
||||
try:
|
||||
from app.cache.memory.emotion_memory import EmotionMemoryCache
|
||||
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
|
||||
|
||||
logger.info(f"尝试从 Redis 缓存获取情绪建议: user={end_user_id}")
|
||||
logger.info(f"尝试从数据库获取情绪建议: user={end_user_id}")
|
||||
|
||||
# 从 Redis 获取缓存
|
||||
cached_data = await EmotionMemoryCache.get_emotion_suggestions(end_user_id)
|
||||
# 从数据库获取存储记录
|
||||
repo = ImplicitEmotionsStorageRepository(db)
|
||||
storage = repo.get_by_end_user_id(end_user_id)
|
||||
|
||||
if cached_data is None:
|
||||
logger.info(f"用户 {end_user_id} 的建议缓存不存在或已过期")
|
||||
if storage is None or storage.emotion_suggestions is None:
|
||||
logger.info(f"用户 {end_user_id} 的建议数据不存在")
|
||||
return None
|
||||
|
||||
logger.info(f"成功从 Redis 缓存获取建议: user={end_user_id}")
|
||||
return cached_data
|
||||
logger.info(f"成功从数据库获取建议: user={end_user_id}")
|
||||
return storage.emotion_suggestions
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"从 Redis 缓存获取建议失败: {str(e)}", exc_info=True)
|
||||
logger.error(f"从数据库获取建议失败: {str(e)}", exc_info=True)
|
||||
return None
|
||||
|
||||
async def save_suggestions_cache(
|
||||
@@ -876,36 +877,27 @@ class EmotionAnalyticsService:
|
||||
end_user_id: str,
|
||||
suggestions_data: Dict[str, Any],
|
||||
db: Session,
|
||||
expires_hours: int = 24
|
||||
expires_hours: int = 24 # 参数保留以保持接口兼容性
|
||||
) -> None:
|
||||
"""保存建议到 Redis 缓存
|
||||
"""保存建议到数据库
|
||||
|
||||
Args:
|
||||
end_user_id: 宿主ID(用户组ID)
|
||||
suggestions_data: 建议数据
|
||||
db: 数据库会话(保留参数以保持接口兼容性)
|
||||
expires_hours: 过期时间(小时),默认24小时
|
||||
db: 数据库会话
|
||||
expires_hours: 保留参数(兼容性)
|
||||
"""
|
||||
try:
|
||||
from app.cache.memory.emotion_memory import EmotionMemoryCache
|
||||
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
|
||||
|
||||
logger.info(f"保存建议到 Redis 缓存: user={end_user_id}, expires={expires_hours}小时")
|
||||
logger.info(f"保存建议到数据库: user={end_user_id}")
|
||||
|
||||
# 计算过期时间(秒)
|
||||
expire_seconds = expires_hours * 3600
|
||||
repo = ImplicitEmotionsStorageRepository(db)
|
||||
repo.update_emotion_suggestions(end_user_id, suggestions_data)
|
||||
db.commit()
|
||||
|
||||
# 保存到 Redis
|
||||
success = await EmotionMemoryCache.set_emotion_suggestions(
|
||||
user_id=end_user_id,
|
||||
suggestions_data=suggestions_data,
|
||||
expire=expire_seconds
|
||||
)
|
||||
|
||||
if success:
|
||||
logger.info(f"建议缓存保存成功: user={end_user_id}")
|
||||
else:
|
||||
logger.warning(f"建议缓存保存失败: user={end_user_id}")
|
||||
logger.info(f"建议保存成功: user={end_user_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"保存建议缓存失败: {str(e)}", exc_info=True)
|
||||
# 不抛出异常,缓存失败不应影响主流程
|
||||
db.rollback()
|
||||
logger.error(f"保存建议失败: {str(e)}", exc_info=True)
|
||||
@@ -422,32 +422,33 @@ class ImplicitMemoryService:
|
||||
end_user_id: str,
|
||||
db: Session
|
||||
) -> Optional[dict]:
|
||||
"""从 Redis 缓存获取完整用户画像
|
||||
"""从数据库获取完整用户画像
|
||||
|
||||
Args:
|
||||
end_user_id: 终端用户ID
|
||||
db: 数据库会话(保留参数以保持接口兼容性)
|
||||
db: 数据库会话
|
||||
|
||||
Returns:
|
||||
Dict: 缓存的画像数据,如果不存在或已过期返回 None
|
||||
Dict: 存储的画像数据,如果不存在返回 None
|
||||
"""
|
||||
try:
|
||||
from app.cache.memory.implicit_memory import ImplicitMemoryCache
|
||||
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
|
||||
|
||||
logger.info(f"尝试从 Redis 缓存获取用户画像: user={end_user_id}")
|
||||
logger.info(f"尝试从数据库获取用户画像: user={end_user_id}")
|
||||
|
||||
# 从 Redis 获取缓存
|
||||
cached_data = await ImplicitMemoryCache.get_user_profile(end_user_id)
|
||||
# 从数据库获取存储记录
|
||||
repo = ImplicitEmotionsStorageRepository(db)
|
||||
storage = repo.get_by_end_user_id(end_user_id)
|
||||
|
||||
if cached_data is None:
|
||||
logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期")
|
||||
if storage is None or storage.implicit_profile is None:
|
||||
logger.info(f"用户 {end_user_id} 的画像数据不存在")
|
||||
return None
|
||||
|
||||
logger.info(f"成功从 Redis 缓存获取用户画像: user={end_user_id}")
|
||||
return cached_data
|
||||
logger.info(f"成功从数据库获取用户画像: user={end_user_id}")
|
||||
return storage.implicit_profile
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"从 Redis 缓存获取用户画像失败: {str(e)}", exc_info=True)
|
||||
logger.error(f"从数据库获取用户画像失败: {str(e)}", exc_info=True)
|
||||
return None
|
||||
|
||||
async def save_profile_cache(
|
||||
@@ -455,36 +456,27 @@ class ImplicitMemoryService:
|
||||
end_user_id: str,
|
||||
profile_data: dict,
|
||||
db: Session,
|
||||
expires_hours: int = 168 # 默认7天
|
||||
expires_hours: int = 168 # 参数保留以保持接口兼容性
|
||||
) -> None:
|
||||
"""保存用户画像到 Redis 缓存
|
||||
"""保存用户画像到数据库
|
||||
|
||||
Args:
|
||||
end_user_id: 终端用户ID
|
||||
profile_data: 画像数据
|
||||
db: 数据库会话(保留参数以保持接口兼容性)
|
||||
expires_hours: 过期时间(小时),默认168小时(7天)
|
||||
db: 数据库会话
|
||||
expires_hours: 保留参数(兼容性)
|
||||
"""
|
||||
try:
|
||||
from app.cache.memory.implicit_memory import ImplicitMemoryCache
|
||||
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
|
||||
|
||||
logger.info(f"保存用户画像到 Redis 缓存: user={end_user_id}, expires={expires_hours}小时")
|
||||
logger.info(f"保存用户画像到数据库: user={end_user_id}")
|
||||
|
||||
# 计算过期时间(秒)
|
||||
expire_seconds = expires_hours * 3600
|
||||
repo = ImplicitEmotionsStorageRepository(db)
|
||||
repo.update_implicit_profile(end_user_id, profile_data)
|
||||
db.commit()
|
||||
|
||||
# 保存到 Redis
|
||||
success = await ImplicitMemoryCache.set_user_profile(
|
||||
user_id=end_user_id,
|
||||
profile_data=profile_data,
|
||||
expire=expire_seconds
|
||||
)
|
||||
|
||||
if success:
|
||||
logger.info(f"用户画像缓存保存成功: user={end_user_id}")
|
||||
else:
|
||||
logger.warning(f"用户画像缓存保存失败: user={end_user_id}")
|
||||
logger.info(f"用户画像保存成功: user={end_user_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"保存用户画像缓存失败: {str(e)}", exc_info=True)
|
||||
# 不抛出异常,缓存失败不应影响主流程
|
||||
db.rollback()
|
||||
logger.error(f"保存用户画像失败: {str(e)}", exc_info=True)
|
||||
|
||||
216
api/app/tasks.py
216
api/app/tasks.py
@@ -1924,4 +1924,218 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
|
||||
# "config_id": config_id,
|
||||
# "elapsed_time": elapsed_time,
|
||||
# "task_id": self.request.id
|
||||
# }
|
||||
# }
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 隐性记忆和情绪数据更新定时任务
|
||||
# =============================================================================
|
||||
|
||||
@celery_app.task(
|
||||
name="app.tasks.update_implicit_emotions_storage",
|
||||
bind=True,
|
||||
ignore_result=True,
|
||||
max_retries=0,
|
||||
acks_late=False,
|
||||
time_limit=7200, # 2小时硬超时
|
||||
soft_time_limit=6900, # 1小时55分钟软超时
|
||||
)
|
||||
def update_implicit_emotions_storage(self) -> Dict[str, Any]:
|
||||
"""定时任务:更新所有用户的隐性记忆画像和情绪建议数据
|
||||
|
||||
遍历数据库中所有已存在数据的用户,为每个用户重新生成隐性记忆画像和情绪建议。
|
||||
实现错误隔离,单个用户失败不影响其他用户的处理。
|
||||
|
||||
Returns:
|
||||
包含任务执行结果的字典,包括:
|
||||
- status: 任务状态 (SUCCESS/FAILURE)
|
||||
- message: 执行消息
|
||||
- total_users: 总用户数
|
||||
- successful_implicit: 成功更新隐性记忆的用户数
|
||||
- successful_emotion: 成功更新情绪建议的用户数
|
||||
- failed: 失败的用户数
|
||||
- user_results: 每个用户的详细结果
|
||||
- elapsed_time: 执行耗时(秒)
|
||||
- task_id: 任务ID
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
async def _run() -> Dict[str, Any]:
|
||||
from app.core.logging_config import get_logger
|
||||
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
|
||||
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
||||
from sqlalchemy import select, func
|
||||
from app.services.implicit_memory_service import ImplicitMemoryService
|
||||
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
||||
|
||||
logger = get_logger(__name__)
|
||||
logger.info("开始执行隐性记忆和情绪数据更新定时任务")
|
||||
|
||||
total_users = 0
|
||||
successful_implicit = 0
|
||||
successful_emotion = 0
|
||||
failed = 0
|
||||
user_results = []
|
||||
|
||||
with get_db_context() as db:
|
||||
try:
|
||||
# 获取所有已存储数据的用户ID(分批次处理)
|
||||
repo = ImplicitEmotionsStorageRepository(db)
|
||||
|
||||
# 先统计总数用于日志
|
||||
from sqlalchemy import func
|
||||
total_users = db.execute(
|
||||
select(func.count()).select_from(ImplicitEmotionsStorage)
|
||||
).scalar() or 0
|
||||
logger.info(f"找到 {total_users} 个需要更新的用户")
|
||||
|
||||
# 遍历每个用户并更新数据(分批次,避免一次性加载所有ID)
|
||||
for end_user_id in repo.get_all_user_ids(batch_size=100):
|
||||
logger.info(f"开始处理用户: {end_user_id}")
|
||||
user_start_time = time.time()
|
||||
|
||||
implicit_success = False
|
||||
emotion_success = False
|
||||
errors = []
|
||||
|
||||
try:
|
||||
# 更新隐性记忆画像
|
||||
try:
|
||||
implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
|
||||
profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id)
|
||||
await implicit_service.save_profile_cache(
|
||||
end_user_id=end_user_id,
|
||||
profile_data=profile_data,
|
||||
db=db
|
||||
)
|
||||
implicit_success = True
|
||||
logger.info(f"成功更新用户 {end_user_id} 的隐性记忆画像")
|
||||
except Exception as e:
|
||||
error_msg = f"隐性记忆更新失败: {str(e)}"
|
||||
errors.append(error_msg)
|
||||
logger.error(f"用户 {end_user_id} {error_msg}")
|
||||
|
||||
# 更新情绪建议
|
||||
try:
|
||||
emotion_service = EmotionAnalyticsService(db=db, end_user_id=end_user_id)
|
||||
suggestions_data = await emotion_service.generate_emotion_suggestions(
|
||||
end_user_id=end_user_id,
|
||||
db=db,
|
||||
language="zh"
|
||||
)
|
||||
await emotion_service.save_suggestions_cache(
|
||||
end_user_id=end_user_id,
|
||||
suggestions_data=suggestions_data,
|
||||
db=db
|
||||
)
|
||||
emotion_success = True
|
||||
logger.info(f"成功更新用户 {end_user_id} 的情绪建议")
|
||||
except Exception as e:
|
||||
error_msg = f"情绪建议更新失败: {str(e)}"
|
||||
errors.append(error_msg)
|
||||
logger.error(f"用户 {end_user_id} {error_msg}")
|
||||
|
||||
# 统计结果
|
||||
if implicit_success:
|
||||
successful_implicit += 1
|
||||
if emotion_success:
|
||||
successful_emotion += 1
|
||||
if not implicit_success and not emotion_success:
|
||||
failed += 1
|
||||
|
||||
user_elapsed = time.time() - user_start_time
|
||||
|
||||
# 记录用户处理结果
|
||||
user_result = {
|
||||
"end_user_id": end_user_id,
|
||||
"implicit_success": implicit_success,
|
||||
"emotion_success": emotion_success,
|
||||
"errors": errors,
|
||||
"elapsed_time": user_elapsed
|
||||
}
|
||||
user_results.append(user_result)
|
||||
|
||||
logger.info(
|
||||
f"用户 {end_user_id} 处理完成: "
|
||||
f"隐性记忆={'成功' if implicit_success else '失败'}, "
|
||||
f"情绪建议={'成功' if emotion_success else '失败'}, "
|
||||
f"耗时={user_elapsed:.2f}秒"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# 单个用户失败不影响其他用户(错误隔离)
|
||||
failed += 1
|
||||
user_elapsed = time.time() - user_start_time
|
||||
error_info = {
|
||||
"end_user_id": end_user_id,
|
||||
"implicit_success": False,
|
||||
"emotion_success": False,
|
||||
"errors": [str(e)],
|
||||
"elapsed_time": user_elapsed
|
||||
}
|
||||
user_results.append(error_info)
|
||||
logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}")
|
||||
|
||||
# 记录总体统计信息
|
||||
logger.info(
|
||||
f"隐性记忆和情绪数据更新定时任务完成: "
|
||||
f"总用户数={total_users}, "
|
||||
f"隐性记忆成功={successful_implicit}, "
|
||||
f"情绪建议成功={successful_emotion}, "
|
||||
f"失败={failed}"
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "SUCCESS",
|
||||
"message": f"成功处理 {total_users} 个用户,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功",
|
||||
"total_users": total_users,
|
||||
"successful_implicit": successful_implicit,
|
||||
"successful_emotion": successful_emotion,
|
||||
"failed": failed,
|
||||
"user_results": user_results[:50] # 只保留前50个用户的详细结果
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"隐性记忆和情绪数据更新定时任务执行失败: {str(e)}")
|
||||
return {
|
||||
"status": "FAILURE",
|
||||
"error": str(e),
|
||||
"total_users": total_users,
|
||||
"successful_implicit": successful_implicit,
|
||||
"successful_emotion": successful_emotion,
|
||||
"failed": failed,
|
||||
"user_results": user_results[:50]
|
||||
}
|
||||
|
||||
try:
|
||||
# 使用 nest_asyncio 来避免事件循环冲突
|
||||
try:
|
||||
import nest_asyncio
|
||||
nest_asyncio.apply()
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# 尝试获取现有事件循环,如果不存在则创建新的
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_closed():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
result = loop.run_until_complete(_run())
|
||||
elapsed_time = time.time() - start_time
|
||||
result["elapsed_time"] = elapsed_time
|
||||
result["task_id"] = self.request.id
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
elapsed_time = time.time() - start_time
|
||||
return {
|
||||
"status": "FAILURE",
|
||||
"error": str(e),
|
||||
"elapsed_time": elapsed_time,
|
||||
"task_id": self.request.id
|
||||
}
|
||||
|
||||
Submodule redbear-mem-benchmark updated: 4b0257bb4e...8494e82498
Reference in New Issue
Block a user