Merge pull request #518 from SuanmoSuanyangTechnology/feature/timer-shaft

Feature/timer shaft
This commit is contained in:
Ke Sun
2026-03-09 14:44:46 +08:00
committed by GitHub
6 changed files with 457 additions and 160 deletions

View File

@@ -113,6 +113,7 @@ celery_app.conf.update(
'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'}, 'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'},
'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'}, 'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'},
'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'}, 'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'},
'app.tasks.init_implicit_emotions_for_users': {'queue': 'periodic_tasks'},
}, },
) )

View File

@@ -149,6 +149,17 @@ async def get_workspace_end_users(
return {uid: {"total": 0} for uid in end_user_ids} return {uid: {"total": 0} for uid in end_user_ids}
# 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据
try:
from app.celery_app import celery_app as _celery_app
_celery_app.send_task(
"app.tasks.init_implicit_emotions_for_users",
kwargs={"end_user_ids": end_user_ids},
)
api_logger.info(f"已触发隐性记忆按需初始化任务,候选用户数: {len(end_user_ids)}")
except Exception as e:
api_logger.warning(f"触发隐性记忆按需初始化任务失败(不影响主流程): {e}")
# 并发执行配置查询和记忆数量查询 # 并发执行配置查询和记忆数量查询
memory_configs_map, memory_nums_map = await asyncio.gather( memory_configs_map, memory_nums_map = await asyncio.gather(
get_memory_configs(), get_memory_configs(),

View File

@@ -5,13 +5,15 @@ Implicit Emotions Storage Repository
事务由调用方控制,仓储层只使用 flush/refresh 事务由调用方控制,仓储层只使用 flush/refresh
""" """
import logging import logging
from datetime import datetime, date, timezone, timedelta from datetime import date, datetime, timedelta, timezone
from typing import Optional, Generator from typing import Generator, Optional
from sqlalchemy.orm import Session
from sqlalchemy import select, not_, exists import redis
from sqlalchemy import exists, not_, select
from sqlalchemy.orm import Session
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
from app.models.end_user_model import EndUser from app.models.end_user_model import EndUser
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -111,6 +113,96 @@ class ImplicitEmotionsStorageRepository:
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}") logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
break break
def get_users_needing_refresh(self, redis_client: Optional[redis.StrictRedis], batch_size: int = 100) -> Generator[str, None, None]:
"""分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。
筛选逻辑:
- 查询 implicit_emotions_storage 中所有用户的 end_user_id 和 updated_at
- 从 Redis 读取 write_message:last_done:{end_user_id} 的时间戳
- 若 Redis 中无记录(该用户从未写入过记忆),跳过
- 若 last_done > updated_at说明上次刷新后又有新记忆写入需要刷新
- 若 last_done <= updated_at说明已是最新跳过
如果 redis_client 为 None则降级为返回所有用户禁用时间过滤
Args:
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB如果为 None 则禁用时间过滤
batch_size: 每批次加载的数量
Yields:
需要刷新的用户ID字符串
"""
from datetime import timezone
from redis.exceptions import RedisError
# 如果 Redis 不可用,降级为处理所有用户
if redis_client is None:
logger.warning(
"Redis 客户端不可用,时间过滤已禁用,将处理所有存量用户"
)
yield from self.get_all_user_ids(batch_size)
return
offset = 0
while True:
try:
stmt = (
select(ImplicitEmotionsStorage.end_user_id, ImplicitEmotionsStorage.updated_at)
.order_by(ImplicitEmotionsStorage.end_user_id)
.limit(batch_size)
.offset(offset)
)
batch = self.db.execute(stmt).all()
if not batch:
break
# 批量获取当前批次所有用户的 last_done 时间戳(一次网络往返)
keys = [f"write_message:last_done:{end_user_id}" for end_user_id, _ in batch]
try:
raw_values = redis_client.mget(keys)
except RedisError as e:
logger.error(
f"Redis mget 操作失败: {e},当前批次降级为处理所有用户",
extra={"offset": offset, "batch_size": len(batch)}
)
# Redis 操作失败,降级为返回当前批次所有用户
yield from (end_user_id for end_user_id, _ in batch)
offset += batch_size
continue
for (end_user_id, updated_at), raw in zip(batch, raw_values):
if raw is None:
continue
try:
CST = timezone(timedelta(hours=8))
last_done = datetime.fromisoformat(raw)
# 统一转为 CST naive 时间做比较
if last_done.tzinfo is None:
last_done = last_done.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None)
else:
last_done = last_done.astimezone(CST).replace(tzinfo=None)
if updated_at is None:
yield end_user_id
continue
# updated_at 同样转为 CST naive
if updated_at.tzinfo is None:
updated_at_cst = updated_at.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None)
else:
updated_at_cst = updated_at.astimezone(CST).replace(tzinfo=None)
if last_done > updated_at_cst:
yield end_user_id
except Exception as e:
logger.warning(f"解析 last_done 时间戳失败: end_user_id={end_user_id}, raw={raw}, error={e}")
offset += batch_size
except Exception as e:
logger.error(f"get_users_needing_refresh 分批查询失败: offset={offset}, error={e}")
break
def get_new_user_ids_today(self, batch_size: int = 100) -> Generator[str, None, None]: def get_new_user_ids_today(self, batch_size: int = 100) -> Generator[str, None, None]:
"""分批次获取当天新增的、尚未初始化隐性记忆和情绪建议数据的用户ID """分批次获取当天新增的、尚未初始化隐性记忆和情绪建议数据的用户ID
@@ -124,7 +216,8 @@ class ImplicitEmotionsStorageRepository:
Yields: Yields:
用户ID字符串 用户ID字符串
""" """
from sqlalchemy import cast, String as SAString from sqlalchemy import String as SAString
from sqlalchemy import cast
CST = timezone(timedelta(hours=8)) CST = timezone(timedelta(hours=8))
now_cst = datetime.now(CST) now_cst = datetime.now(CST)
today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None) today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None)

View File

@@ -130,6 +130,7 @@ def _create_workspace_only(
business_logger.error(f"创建工作空间失败: {workspace.name} - {str(e)}") business_logger.error(f"创建工作空间失败: {workspace.name} - {str(e)}")
raise raise
def create_workspace( def create_workspace(
db: Session, workspace: WorkspaceCreate, user: User, language: str = "zh" db: Session, workspace: WorkspaceCreate, user: User, language: str = "zh"
) -> Workspace: ) -> Workspace:
@@ -966,6 +967,125 @@ def update_workspace_models_configs(
raise BusinessException(f"更新模型配置失败: {str(e)}", BizCode.INTERNAL_ERROR) raise BusinessException(f"更新模型配置失败: {str(e)}", BizCode.INTERNAL_ERROR)
def _fill_workspace_configs_model_defaults(
db: Session,
workspace: Workspace
) -> None:
"""Fill empty model fields for all memory configs in a workspace.
Updates llm_id, embedding_id, rerank_id, reflection_model_id, and emotion_model_id
if they are None, using the corresponding workspace default models.
Args:
db: Database session
workspace: The workspace containing default model settings
"""
from app.models.memory_config_model import MemoryConfig
# Get all configs for this workspace
configs = db.query(MemoryConfig).filter(
MemoryConfig.workspace_id == workspace.id
).all()
if not configs:
return
# Map of memory_config field -> workspace field
model_field_mappings = [
("llm_id", "llm"),
("embedding_id", "embedding"),
("rerank_id", "rerank"),
("reflection_model_id", "llm"), # reflection uses LLM
("emotion_model_id", "llm"), # emotion uses LLM
]
configs_updated = 0
for memory_config in configs:
updated_fields = []
for config_field, workspace_field in model_field_mappings:
config_value = getattr(memory_config, config_field, None)
workspace_value = getattr(workspace, workspace_field, None)
if not config_value and workspace_value:
setattr(memory_config, config_field, workspace_value)
updated_fields.append(config_field)
if updated_fields:
configs_updated += 1
business_logger.debug(
f"Updated memory config {memory_config.config_id} fields: {updated_fields}"
)
if configs_updated > 0:
try:
db.commit()
business_logger.info(
f"Updated {configs_updated} memory configs in workspace {workspace.id} with default models"
)
except Exception as e:
db.rollback()
business_logger.error(
f"Failed to update memory configs in workspace {workspace.id}: {str(e)}"
)
def _create_default_memory_config(
db: Session,
workspace_id: uuid.UUID,
workspace_name: str,
llm_id: Optional[uuid.UUID] = None,
embedding_id: Optional[uuid.UUID] = None,
rerank_id: Optional[uuid.UUID] = None,
scene_id: Optional[uuid.UUID] = None,
pruning_scene_name: Optional[str] = None,
) -> None:
"""Create a default memory config for a newly created workspace.
Args:
db: Database session
workspace_id: The workspace ID
workspace_name: The workspace name (used for config naming)
llm_id: Optional LLM model ID
embedding_id: Optional embedding model ID
rerank_id: Optional rerank model ID
scene_id: Optional ontology scene ID (默认关联教育场景)
pruning_scene_name: Optional pruning scene name取自 ontology_scene.scene_name
"""
from app.models.memory_config_model import MemoryConfig
config_id = uuid.uuid4()
default_config = MemoryConfig(
config_id=config_id,
config_name=f"{workspace_name} 默认配置",
config_desc="工作空间创建时自动生成的默认记忆配置",
workspace_id=workspace_id,
llm_id=str(llm_id) if llm_id else None,
embedding_id=str(embedding_id) if embedding_id else None,
rerank_id=str(rerank_id) if rerank_id else None,
scene_id=scene_id, # 关联本体场景ID默认为"在线教育"场景)
pruning_scene=pruning_scene_name, # 语义剪枝场景直接使用 scene_name
state=True, # Active by default
is_default=True, # Mark as workspace default
)
db.add(default_config)
db.flush() # 使用 flush 而不是 commit让调用者统一提交
business_logger.info(
"Created default memory config for workspace",
extra={
"workspace_id": str(workspace_id),
"config_id": str(config_id),
"config_name": default_config.config_name,
"scene_id": str(scene_id) if scene_id else None,
}
)
# ==================== 检查配置相关服务 ====================
def _ensure_default_memory_config(db: Session, workspace: Workspace) -> None: def _ensure_default_memory_config(db: Session, workspace: Workspace) -> None:
"""Ensure a workspace has a default memory config, creating one if missing. """Ensure a workspace has a default memory config, creating one if missing.
@@ -1045,70 +1165,6 @@ def _ensure_default_memory_config(db: Session, workspace: Workspace) -> None:
_fill_workspace_configs_model_defaults(db, workspace) _fill_workspace_configs_model_defaults(db, workspace)
def _fill_workspace_configs_model_defaults(
db: Session,
workspace: Workspace
) -> None:
"""Fill empty model fields for all memory configs in a workspace.
Updates llm_id, embedding_id, rerank_id, reflection_model_id, and emotion_model_id
if they are None, using the corresponding workspace default models.
Args:
db: Database session
workspace: The workspace containing default model settings
"""
from app.models.memory_config_model import MemoryConfig
# Get all configs for this workspace
configs = db.query(MemoryConfig).filter(
MemoryConfig.workspace_id == workspace.id
).all()
if not configs:
return
# Map of memory_config field -> workspace field
model_field_mappings = [
("llm_id", "llm"),
("embedding_id", "embedding"),
("rerank_id", "rerank"),
("reflection_model_id", "llm"), # reflection uses LLM
("emotion_model_id", "llm"), # emotion uses LLM
]
configs_updated = 0
for memory_config in configs:
updated_fields = []
for config_field, workspace_field in model_field_mappings:
config_value = getattr(memory_config, config_field, None)
workspace_value = getattr(workspace, workspace_field, None)
if not config_value and workspace_value:
setattr(memory_config, config_field, workspace_value)
updated_fields.append(config_field)
if updated_fields:
configs_updated += 1
business_logger.debug(
f"Updated memory config {memory_config.config_id} fields: {updated_fields}"
)
if configs_updated > 0:
try:
db.commit()
business_logger.info(
f"Updated {configs_updated} memory configs in workspace {workspace.id} with default models"
)
except Exception as e:
db.rollback()
business_logger.error(
f"Failed to update memory configs in workspace {workspace.id}: {str(e)}"
)
def _ensure_default_ontology_scenes(db: Session, workspace: Workspace) -> None: def _ensure_default_ontology_scenes(db: Session, workspace: Workspace) -> None:
"""Ensure a workspace has default ontology scenes, creating them if missing. """Ensure a workspace has default ontology scenes, creating them if missing.
@@ -1154,56 +1210,3 @@ def _ensure_default_ontology_scenes(db: Session, workspace: Workspace) -> None:
f"为工作空间 {workspace.id} 补建默认本体场景异常: {str(e)}" f"为工作空间 {workspace.id} 补建默认本体场景异常: {str(e)}"
) )
def _create_default_memory_config(
db: Session,
workspace_id: uuid.UUID,
workspace_name: str,
llm_id: Optional[uuid.UUID] = None,
embedding_id: Optional[uuid.UUID] = None,
rerank_id: Optional[uuid.UUID] = None,
scene_id: Optional[uuid.UUID] = None,
pruning_scene_name: Optional[str] = None,
) -> None:
"""Create a default memory config for a newly created workspace.
Args:
db: Database session
workspace_id: The workspace ID
workspace_name: The workspace name (used for config naming)
llm_id: Optional LLM model ID
embedding_id: Optional embedding model ID
rerank_id: Optional rerank model ID
scene_id: Optional ontology scene ID (默认关联教育场景)
pruning_scene_name: Optional pruning scene name取自 ontology_scene.scene_name
"""
from app.models.memory_config_model import MemoryConfig
config_id = uuid.uuid4()
default_config = MemoryConfig(
config_id=config_id,
config_name=f"{workspace_name} 默认配置",
config_desc="工作空间创建时自动生成的默认记忆配置",
workspace_id=workspace_id,
llm_id=str(llm_id) if llm_id else None,
embedding_id=str(embedding_id) if embedding_id else None,
rerank_id=str(rerank_id) if rerank_id else None,
scene_id=scene_id, # 关联本体场景ID默认为"在线教育"场景)
pruning_scene=pruning_scene_name, # 语义剪枝场景直接使用 scene_name
state=True, # Active by default
is_default=True, # Mark as workspace default
)
db.add(default_config)
db.flush() # 使用 flush 而不是 commit让调用者统一提交
business_logger.info(
"Created default memory config for workspace",
extra={
"workspace_id": str(workspace_id),
"config_id": str(config_id),
"config_name": default_config.config_name,
"scene_id": str(scene_id) if scene_id else None,
}
)

View File

@@ -1,5 +1,6 @@
import asyncio import asyncio
import json import json
import logging
import os import os
import re import re
import shutil import shutil
@@ -14,6 +15,62 @@ from uuid import UUID
import redis import redis
import requests import requests
from redis.exceptions import RedisError
logger = logging.getLogger(__name__)
# 模块级同步 Redis 连接池,供 Celery 任务共享使用
# 连接 CELERY_BACKEND DB与 write_message:last_done 时间戳写入保持一致
# 使用连接池而非单例客户端,提供更好的并发性能和自动重连
_sync_redis_pool: redis.ConnectionPool = None
def _get_or_create_redis_pool() -> redis.ConnectionPool:
"""获取或创建 Redis 连接池(懒初始化)"""
global _sync_redis_pool
if _sync_redis_pool is None:
try:
_sync_redis_pool = redis.ConnectionPool(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DB_CELERY_BACKEND,
password=settings.REDIS_PASSWORD,
decode_responses=True,
max_connections=10,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
health_check_interval=30,
)
logger.info("Redis connection pool created for Celery tasks")
except Exception as e:
logger.error(f"Failed to create Redis connection pool: {e}", exc_info=True)
return None
return _sync_redis_pool
def get_sync_redis_client() -> Optional[redis.StrictRedis]:
"""获取同步 Redis 客户端(使用连接池)
使用连接池提供的客户端,支持自动重连和健康检查。
如果 Redis 不可用,返回 None调用方应优雅降级。
Returns:
redis.StrictRedis: Redis 客户端实例,如果连接失败则返回 None
"""
try:
pool = _get_or_create_redis_pool()
if pool is None:
return None
client = redis.StrictRedis(connection_pool=pool)
# 验证连接可用性
client.ping()
return client
except RedisError as e:
logger.error(f"Redis connection failed: {e}", exc_info=True)
return None
except Exception as e:
logger.error(f"Unexpected error getting Redis client: {e}", exc_info=True)
return None
# Import a unified Celery instance # Import a unified Celery instance
from app.celery_app import celery_app from app.celery_app import celery_app
@@ -1090,6 +1147,22 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s
logger.info( logger.info(
f"[CELERY WRITE] Task completed successfully - elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}") f"[CELERY WRITE] Task completed successfully - elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}")
# 记录该用户最后一次 write_message 成功的时间,供时间轴筛选使用
try:
_r = get_sync_redis_client()
if _r is not None:
from datetime import timedelta as _td
from datetime import timezone as _tz
_CST = _tz(_td(hours=8))
_now_cst = datetime.now(_CST).replace(tzinfo=None).isoformat()
_r.set(
f"write_message:last_done:{end_user_id}",
_now_cst,
ex=86400 * 30,
)
except Exception as _e:
logger.warning(f"[CELERY WRITE] 写入 last_done 时间戳失败(不影响主流程): {_e}")
return { return {
"status": "SUCCESS", "status": "SUCCESS",
"result": result, "result": result,
@@ -2149,12 +2222,15 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
start_time = time.time() start_time = time.time()
async def _run() -> Dict[str, Any]: async def _run() -> Dict[str, Any]:
from sqlalchemy import func, select
from app.core.logging_config import get_logger from app.core.logging_config import get_logger
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
from sqlalchemy import select, func from app.repositories.implicit_emotions_storage_repository import (
from app.services.implicit_memory_service import ImplicitMemoryService ImplicitEmotionsStorageRepository,
)
from app.services.emotion_analytics_service import EmotionAnalyticsService from app.services.emotion_analytics_service import EmotionAnalyticsService
from app.services.implicit_memory_service import ImplicitMemoryService
logger = get_logger(__name__) logger = get_logger(__name__)
logger.info("开始执行隐性记忆和情绪数据更新定时任务") logger.info("开始执行隐性记忆和情绪数据更新定时任务")
@@ -2167,18 +2243,20 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
with get_db_context() as db: with get_db_context() as db:
try: try:
# 获取所有已存储数据的用户ID分批次处理
repo = ImplicitEmotionsStorageRepository(db) repo = ImplicitEmotionsStorageRepository(db)
# 先统计总数用于日志 # 先统计总数用于日志
from sqlalchemy import func from sqlalchemy import func
total_users = db.execute( total_users = db.execute(
select(func.count()).select_from(ImplicitEmotionsStorage) select(func.count()).select_from(ImplicitEmotionsStorage)
).scalar() or 0 ).scalar() or 0
logger.info(f"找到 {total_users} 个需要更新的用户") logger.info(f"表中存量用户总数: {total_users},开始时间轴筛选")
# 遍历每个用户并更新数据分批次避免一次性加载所有ID # 构建 Redis 同步客户端,用于时间轴筛选
for end_user_id in repo.get_all_user_ids(batch_size=100): _redis_client = get_sync_redis_client()
# 只处理 last_done > updated_at 的用户(有新记忆写入的用户)
for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100):
logger.info(f"开始处理用户: {end_user_id}") logger.info(f"开始处理用户: {end_user_id}")
user_start_time = time.time() user_start_time = time.time()
@@ -2264,10 +2342,10 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
user_results.append(error_info) user_results.append(error_info)
logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}") logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}")
# ---- 处理增量用户(当天新增、尚未初始化的用户)---- # ---- 当天新增用户兜底初始化 ----
new_users_initialized = 0 new_users_initialized = 0
new_users_failed = 0 new_users_failed = 0
logger.info("开始处理当天新增的增量用户初始化") logger.info("开始处理当天新增用户的兜底初始化")
for end_user_id in repo.get_new_user_ids_today(batch_size=100): for end_user_id in repo.get_new_user_ids_today(batch_size=100):
logger.info(f"开始初始化新用户: {end_user_id}") logger.info(f"开始初始化新用户: {end_user_id}")
@@ -2281,35 +2359,27 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id) implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id) profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id)
await implicit_service.save_profile_cache( await implicit_service.save_profile_cache(
end_user_id=end_user_id, end_user_id=end_user_id, profile_data=profile_data, db=db
profile_data=profile_data,
db=db
) )
implicit_success = True implicit_success = True
logger.info(f"成功初始化新用户 {end_user_id} 的隐性记忆画像") logger.info(f"成功初始化新用户 {end_user_id} 的隐性记忆画像")
except Exception as e: except Exception as e:
error_msg = f"隐性记忆初始化失败: {str(e)}" errors.append(f"隐性记忆初始化失败: {str(e)}")
errors.append(error_msg) logger.error(f"新用户 {end_user_id} 隐性记忆初始化失败: {e}")
logger.error(f"新用户 {end_user_id} {error_msg}")
try: try:
emotion_service = EmotionAnalyticsService() emotion_service = EmotionAnalyticsService()
suggestions_data = await emotion_service.generate_emotion_suggestions( suggestions_data = await emotion_service.generate_emotion_suggestions(
end_user_id=end_user_id, end_user_id=end_user_id, db=db, language="zh"
db=db,
language="zh"
) )
await emotion_service.save_suggestions_cache( await emotion_service.save_suggestions_cache(
end_user_id=end_user_id, end_user_id=end_user_id, suggestions_data=suggestions_data, db=db
suggestions_data=suggestions_data,
db=db
) )
emotion_success = True emotion_success = True
logger.info(f"成功初始化新用户 {end_user_id} 的情绪建议") logger.info(f"成功初始化新用户 {end_user_id} 的情绪建议")
except Exception as e: except Exception as e:
error_msg = f"情绪建议初始化失败: {str(e)}" errors.append(f"情绪建议初始化失败: {str(e)}")
errors.append(error_msg) logger.error(f"新用户 {end_user_id} 情绪建议初始化失败: {e}")
logger.error(f"新用户 {end_user_id} {error_msg}")
if implicit_success or emotion_success: if implicit_success or emotion_success:
new_users_initialized += 1 new_users_initialized += 1
@@ -2319,7 +2389,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
user_elapsed = time.time() - user_start_time user_elapsed = time.time() - user_start_time
user_results.append({ user_results.append({
"end_user_id": end_user_id, "end_user_id": end_user_id,
"type": "init", "type": "new_user_init",
"implicit_success": implicit_success, "implicit_success": implicit_success,
"emotion_success": emotion_success, "emotion_success": emotion_success,
"errors": errors, "errors": errors,
@@ -2331,7 +2401,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
user_elapsed = time.time() - user_start_time user_elapsed = time.time() - user_start_time
user_results.append({ user_results.append({
"end_user_id": end_user_id, "end_user_id": end_user_id,
"type": "init", "type": "new_user_init",
"implicit_success": False, "implicit_success": False,
"emotion_success": False, "emotion_success": False,
"errors": [str(e)], "errors": [str(e)],
@@ -2339,27 +2409,24 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
}) })
logger.error(f"初始化新用户 {end_user_id} 时出错: {str(e)}") logger.error(f"初始化新用户 {end_user_id} 时出错: {str(e)}")
logger.info( logger.info(f"当天新增用户兜底初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}")
f"增量用户初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}" # ---- 新增用户兜底初始化结束 ----
)
# ---- 增量用户处理结束 ----
# 记录总体统计信息
logger.info( logger.info(
f"隐性记忆和情绪数据更新定时任务完成: " f"隐性记忆和情绪数据更新定时任务完成: "
f"存量用户总数={total_users}, " f"存量用户总数={total_users}, "
f"隐性记忆成功={successful_implicit}, " f"隐性记忆成功={successful_implicit}, "
f"情绪建议成功={successful_emotion}, " f"情绪建议成功={successful_emotion}, "
f"存量失败={failed}, " f"存量失败={failed}, "
f"增量初始化成功={new_users_initialized}, " f"新增用户初始化成功={new_users_initialized}, "
f"增量初始化失败={new_users_failed}" f"新增用户初始化失败={new_users_failed}"
) )
return { return {
"status": "SUCCESS", "status": "SUCCESS",
"message": ( "message": (
f"存量用户 {total_users} 个,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功;" f"存量用户 {total_users} 个,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功;"
f"量新用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败" f"当天新增用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败"
), ),
"total_users": total_users, "total_users": total_users,
"successful_implicit": successful_implicit, "successful_implicit": successful_implicit,
@@ -2367,7 +2434,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
"failed": failed, "failed": failed,
"new_users_initialized": new_users_initialized, "new_users_initialized": new_users_initialized,
"new_users_failed": new_users_failed, "new_users_failed": new_users_failed,
"user_results": user_results[:50] # 只保留前50个用户的详细结果 "user_results": user_results[:50]
} }
except Exception as e: except Exception as e:
@@ -2416,3 +2483,125 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
"elapsed_time": elapsed_time, "elapsed_time": elapsed_time,
"task_id": self.request.id "task_id": self.request.id
} }
# =============================================================================
@celery_app.task(
name="app.tasks.init_implicit_emotions_for_users",
bind=True,
ignore_result=True,
max_retries=0,
acks_late=False,
time_limit=3600,
soft_time_limit=3300,
# 触发型任务标识,区别于 periodic_tasks 队列中的定时任务
triggered=True,
)
def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]:
"""事件触发任务:对指定用户列表做存在性检查,无记录则执行首次初始化。
由 /dashboard/end_users 接口触发,已有数据的用户直接跳过。
存量用户的数据刷新由定时任务 update_implicit_emotions_storage 负责。
Args:
end_user_ids: 需要检查的用户ID列表
Returns:
包含任务执行结果的字典
"""
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_logger
from app.repositories.implicit_emotions_storage_repository import (
ImplicitEmotionsStorageRepository,
)
from app.services.emotion_analytics_service import EmotionAnalyticsService
from app.services.implicit_memory_service import ImplicitMemoryService
logger = get_logger(__name__)
logger.info(f"开始按需初始化隐性记忆/情绪数据,候选用户数: {len(end_user_ids)}")
initialized = 0
failed = 0
skipped = 0
with get_db_context() as db:
repo = ImplicitEmotionsStorageRepository(db)
for end_user_id in end_user_ids:
existing = repo.get_by_end_user_id(end_user_id)
if existing is not None:
skipped += 1
continue
logger.info(f"用户 {end_user_id} 无记录,开始初始化")
implicit_ok = False
emotion_ok = False
try:
try:
implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id)
await implicit_service.save_profile_cache(
end_user_id=end_user_id, profile_data=profile_data, db=db
)
implicit_ok = True
except Exception as e:
logger.error(f"用户 {end_user_id} 隐性记忆初始化失败: {e}")
try:
emotion_service = EmotionAnalyticsService()
suggestions_data = await emotion_service.generate_emotion_suggestions(
end_user_id=end_user_id, db=db, language="zh"
)
await emotion_service.save_suggestions_cache(
end_user_id=end_user_id, suggestions_data=suggestions_data, db=db
)
emotion_ok = True
except Exception as e:
logger.error(f"用户 {end_user_id} 情绪建议初始化失败: {e}")
if implicit_ok or emotion_ok:
initialized += 1
else:
failed += 1
except Exception as e:
failed += 1
logger.error(f"用户 {end_user_id} 初始化异常: {e}")
logger.info(f"按需初始化完成: 初始化={initialized}, 跳过={skipped}, 失败={failed}")
return {
"status": "SUCCESS",
"initialized": initialized,
"skipped": skipped,
"failed": failed,
}
try:
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(_run())
result["elapsed_time"] = time.time() - start_time
result["task_id"] = self.request.id
return result
except Exception as e:
return {
"status": "FAILURE",
"error": str(e),
"elapsed_time": time.time() - start_time,
"task_id": self.request.id,
}

View File

@@ -49,7 +49,7 @@ services:
networks: networks:
- celery - celery
# Periodic worker - Scheduled/beat tasks (prefork, low concurrency) # Periodic worker - Scheduled/beat tasks + API-triggered tasks (prefork, low concurrency)
worker-periodic: worker-periodic:
image: redbear-mem-open:latest image: redbear-mem-open:latest
container_name: worker-periodic container_name: worker-periodic