Merge pull request #520 from SuanmoSuanyangTechnology/feature/interest-exists
Feature/interest exists
This commit is contained in:
@@ -114,6 +114,7 @@ celery_app.conf.update(
|
|||||||
'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'},
|
'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'},
|
||||||
'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'},
|
'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'},
|
||||||
'app.tasks.init_implicit_emotions_for_users': {'queue': 'periodic_tasks'},
|
'app.tasks.init_implicit_emotions_for_users': {'queue': 'periodic_tasks'},
|
||||||
|
'app.tasks.init_interest_distribution_for_users': {'queue': 'periodic_tasks'},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -156,9 +156,13 @@ async def get_workspace_end_users(
|
|||||||
"app.tasks.init_implicit_emotions_for_users",
|
"app.tasks.init_implicit_emotions_for_users",
|
||||||
kwargs={"end_user_ids": end_user_ids},
|
kwargs={"end_user_ids": end_user_ids},
|
||||||
)
|
)
|
||||||
api_logger.info(f"已触发隐性记忆按需初始化任务,候选用户数: {len(end_user_ids)}")
|
_celery_app.send_task(
|
||||||
|
"app.tasks.init_interest_distribution_for_users",
|
||||||
|
kwargs={"end_user_ids": end_user_ids},
|
||||||
|
)
|
||||||
|
api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
api_logger.warning(f"触发隐性记忆按需初始化任务失败(不影响主流程): {e}")
|
api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}")
|
||||||
|
|
||||||
# 并发执行配置查询和记忆数量查询
|
# 并发执行配置查询和记忆数量查询
|
||||||
memory_configs_map, memory_nums_map = await asyncio.gather(
|
memory_configs_map, memory_nums_map = await asyncio.gather(
|
||||||
|
|||||||
@@ -8,6 +8,13 @@ import logging
|
|||||||
from datetime import date, datetime, timedelta, timezone
|
from datetime import date, datetime, timedelta, timezone
|
||||||
from typing import Generator, Optional
|
from typing import Generator, Optional
|
||||||
|
|
||||||
|
|
||||||
|
class TimeFilterUnavailableError(Exception):
|
||||||
|
"""redis_client 不可用,无法执行时间轴筛选。
|
||||||
|
|
||||||
|
调用方捕获此异常后可选择回退到 get_all_user_ids 进行全量处理。
|
||||||
|
"""
|
||||||
|
|
||||||
import redis
|
import redis
|
||||||
from sqlalchemy import exists, not_, select
|
from sqlalchemy import exists, not_, select
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
@@ -113,7 +120,7 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
|
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
|
||||||
break
|
break
|
||||||
|
|
||||||
def get_users_needing_refresh(self, redis_client: Optional[redis.StrictRedis], batch_size: int = 100) -> Generator[str, None, None]:
|
def get_users_needing_refresh(self, redis_client: redis.StrictRedis, batch_size: int = 100) -> Generator[str, None, None]:
|
||||||
"""分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。
|
"""分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。
|
||||||
|
|
||||||
筛选逻辑:
|
筛选逻辑:
|
||||||
@@ -123,27 +130,21 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
- 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新
|
- 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新
|
||||||
- 若 last_done <= updated_at,说明已是最新,跳过
|
- 若 last_done <= updated_at,说明已是最新,跳过
|
||||||
|
|
||||||
如果 redis_client 为 None,则降级为返回所有用户(禁用时间过滤)。
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB),如果为 None 则禁用时间过滤
|
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB)
|
||||||
batch_size: 每批次加载的数量
|
batch_size: 每批次加载的数量
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
TimeFilterUnavailableError: redis_client 为 None 时抛出,调用方可捕获并回退到 get_all_user_ids
|
||||||
|
|
||||||
Yields:
|
Yields:
|
||||||
需要刷新的用户ID字符串
|
需要刷新的用户ID字符串
|
||||||
"""
|
"""
|
||||||
from datetime import timezone
|
if redis_client is None:
|
||||||
|
raise TimeFilterUnavailableError("redis_client 不可用,无法执行时间轴筛选")
|
||||||
|
|
||||||
from redis.exceptions import RedisError
|
from redis.exceptions import RedisError
|
||||||
|
|
||||||
# 如果 Redis 不可用,降级为处理所有用户
|
|
||||||
if redis_client is None:
|
|
||||||
logger.warning(
|
|
||||||
"Redis 客户端不可用,时间过滤已禁用,将处理所有存量用户"
|
|
||||||
)
|
|
||||||
yield from self.get_all_user_ids(batch_size)
|
|
||||||
return
|
|
||||||
|
|
||||||
offset = 0
|
offset = 0
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -178,16 +179,14 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
try:
|
try:
|
||||||
CST = timezone(timedelta(hours=8))
|
CST = timezone(timedelta(hours=8))
|
||||||
last_done = datetime.fromisoformat(raw)
|
last_done = datetime.fromisoformat(raw)
|
||||||
# 统一转为 CST naive 时间做比较
|
# last_done 写入时已是 CST naive,直接使用,无需转换
|
||||||
if last_done.tzinfo is None:
|
if last_done.tzinfo is not None:
|
||||||
last_done = last_done.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None)
|
|
||||||
else:
|
|
||||||
last_done = last_done.astimezone(CST).replace(tzinfo=None)
|
last_done = last_done.astimezone(CST).replace(tzinfo=None)
|
||||||
|
|
||||||
if updated_at is None:
|
if updated_at is None:
|
||||||
yield end_user_id
|
yield end_user_id
|
||||||
continue
|
continue
|
||||||
# updated_at 同样转为 CST naive
|
# updated_at 数据库存的是 UTC naive,转为 CST naive 再比较
|
||||||
if updated_at.tzinfo is None:
|
if updated_at.tzinfo is None:
|
||||||
updated_at_cst = updated_at.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None)
|
updated_at_cst = updated_at.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None)
|
||||||
else:
|
else:
|
||||||
|
|||||||
117
api/app/tasks.py
117
api/app/tasks.py
@@ -2228,6 +2228,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
|
|||||||
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
||||||
from app.repositories.implicit_emotions_storage_repository import (
|
from app.repositories.implicit_emotions_storage_repository import (
|
||||||
ImplicitEmotionsStorageRepository,
|
ImplicitEmotionsStorageRepository,
|
||||||
|
TimeFilterUnavailableError,
|
||||||
)
|
)
|
||||||
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
||||||
from app.services.implicit_memory_service import ImplicitMemoryService
|
from app.services.implicit_memory_service import ImplicitMemoryService
|
||||||
@@ -2256,7 +2257,14 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
|
|||||||
_redis_client = get_sync_redis_client()
|
_redis_client = get_sync_redis_client()
|
||||||
|
|
||||||
# 只处理 last_done > updated_at 的用户(有新记忆写入的用户)
|
# 只处理 last_done > updated_at 的用户(有新记忆写入的用户)
|
||||||
for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100):
|
# Redis 不可用时回退到全量处理
|
||||||
|
try:
|
||||||
|
refresh_iter = repo.get_users_needing_refresh(_redis_client, batch_size=100)
|
||||||
|
except TimeFilterUnavailableError as e:
|
||||||
|
logger.warning(f"时间轴筛选不可用,回退到全量刷新: {e}")
|
||||||
|
refresh_iter = repo.get_all_user_ids(batch_size=100)
|
||||||
|
|
||||||
|
for end_user_id in refresh_iter:
|
||||||
logger.info(f"开始处理用户: {end_user_id}")
|
logger.info(f"开始处理用户: {end_user_id}")
|
||||||
user_start_time = time.time()
|
user_start_time = time.time()
|
||||||
|
|
||||||
@@ -2605,3 +2613,110 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str,
|
|||||||
"elapsed_time": time.time() - start_time,
|
"elapsed_time": time.time() - start_time,
|
||||||
"task_id": self.request.id,
|
"task_id": self.request.id,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
@celery_app.task(
|
||||||
|
name="app.tasks.init_interest_distribution_for_users",
|
||||||
|
bind=True,
|
||||||
|
ignore_result=True,
|
||||||
|
max_retries=0,
|
||||||
|
acks_late=False,
|
||||||
|
time_limit=3600,
|
||||||
|
soft_time_limit=3300,
|
||||||
|
)
|
||||||
|
def init_interest_distribution_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]:
|
||||||
|
"""事件触发任务:检查指定用户列表的兴趣分布缓存,无缓存则生成并写入 Redis。
|
||||||
|
|
||||||
|
由 /dashboard/end_users 接口触发,已有缓存的用户直接跳过。
|
||||||
|
默认生成中文(zh)兴趣分布数据。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
end_user_ids: 需要检查的用户ID列表
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
包含任务执行结果的字典
|
||||||
|
"""
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
async def _run() -> Dict[str, Any]:
|
||||||
|
from app.core.logging_config import get_logger
|
||||||
|
from app.cache.memory.interest_memory import InterestMemoryCache, INTEREST_CACHE_EXPIRE
|
||||||
|
from app.services.memory_agent_service import MemoryAgentService
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
logger.info(f"开始按需初始化兴趣分布缓存,候选用户数: {len(end_user_ids)}")
|
||||||
|
|
||||||
|
initialized = 0
|
||||||
|
failed = 0
|
||||||
|
skipped = 0
|
||||||
|
language = "zh"
|
||||||
|
|
||||||
|
service = MemoryAgentService()
|
||||||
|
|
||||||
|
with get_db_context() as db:
|
||||||
|
for end_user_id in end_user_ids:
|
||||||
|
# 存在性检查:缓存有数据则跳过
|
||||||
|
cached = await InterestMemoryCache.get_interest_distribution(
|
||||||
|
end_user_id=end_user_id,
|
||||||
|
language=language,
|
||||||
|
)
|
||||||
|
if cached is not None:
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.info(f"用户 {end_user_id} 无兴趣分布缓存,开始生成")
|
||||||
|
try:
|
||||||
|
result = await service.get_interest_distribution_by_user(
|
||||||
|
end_user_id=end_user_id,
|
||||||
|
limit=5,
|
||||||
|
language=language,
|
||||||
|
)
|
||||||
|
await InterestMemoryCache.set_interest_distribution(
|
||||||
|
end_user_id=end_user_id,
|
||||||
|
language=language,
|
||||||
|
data=result,
|
||||||
|
expire=INTEREST_CACHE_EXPIRE,
|
||||||
|
)
|
||||||
|
initialized += 1
|
||||||
|
logger.info(f"用户 {end_user_id} 兴趣分布缓存生成成功")
|
||||||
|
except Exception as e:
|
||||||
|
failed += 1
|
||||||
|
logger.error(f"用户 {end_user_id} 兴趣分布缓存生成失败: {e}")
|
||||||
|
|
||||||
|
logger.info(f"兴趣分布按需初始化完成: 初始化={initialized}, 跳过={skipped}, 失败={failed}")
|
||||||
|
return {
|
||||||
|
"status": "SUCCESS",
|
||||||
|
"initialized": initialized,
|
||||||
|
"skipped": skipped,
|
||||||
|
"failed": failed,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
import nest_asyncio
|
||||||
|
nest_asyncio.apply()
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
if loop.is_closed():
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
except RuntimeError:
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
|
result = loop.run_until_complete(_run())
|
||||||
|
result["elapsed_time"] = time.time() - start_time
|
||||||
|
result["task_id"] = self.request.id
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
return {
|
||||||
|
"status": "FAILURE",
|
||||||
|
"error": str(e),
|
||||||
|
"elapsed_time": time.time() - start_time,
|
||||||
|
"task_id": self.request.id,
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user