[fix]Reconstructing memory incremental statistical scheduling task

This commit is contained in:
lanceyq
2026-02-27 12:20:51 +08:00
parent 3d068a9c96
commit 3a36d038ee
3 changed files with 203 additions and 12 deletions

View File

@@ -82,7 +82,7 @@ celery_app.conf.update(
'app.tasks.workspace_reflection_task': {'queue': 'periodic_tasks'},
'app.tasks.regenerate_memory_cache': {'queue': 'periodic_tasks'},
'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'},
'app.controllers.memory_storage_controller.search_all': {'queue': 'periodic_tasks'},
'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'},
},
)
@@ -115,16 +115,11 @@ beat_schedule_config = {
"config_id": None, # 使用默认配置,可以通过环境变量配置
},
},
"write-all-workspaces-memory": {
"task": "app.tasks.write_all_workspaces_memory_task",
"schedule": memory_increment_schedule,
"args": (),
},
}
#如果配置了默认工作空间ID则添加记忆总量统计任务
if settings.DEFAULT_WORKSPACE_ID:
beat_schedule_config["write-total-memory"] = {
"task": "app.controllers.memory_storage_controller.search_all",
"schedule": memory_increment_schedule,
"kwargs": {
"workspace_id": settings.DEFAULT_WORKSPACE_ID,
},
}
celery_app.conf.beat_schedule = beat_schedule_config

View File

@@ -201,7 +201,6 @@ class Settings:
REFLECTION_INTERVAL_SECONDS: float = float(os.getenv("REFLECTION_INTERVAL_SECONDS", "300"))
HEALTH_CHECK_SECONDS: float = float(os.getenv("HEALTH_CHECK_SECONDS", "600"))
MEMORY_INCREMENT_INTERVAL_HOURS: float = float(os.getenv("MEMORY_INCREMENT_INTERVAL_HOURS", "24"))
DEFAULT_WORKSPACE_ID: Optional[str] = os.getenv("DEFAULT_WORKSPACE_ID", None)
REFLECTION_INTERVAL_TIME: Optional[str] = int(os.getenv("REFLECTION_INTERVAL_TIME", 30))
# Memory Cache Regeneration Configuration

View File

@@ -1304,6 +1304,203 @@ def write_total_memory_task(workspace_id: str) -> Dict[str, Any]:
"workspace_id": workspace_id,
"elapsed_time": elapsed_time,
}
@celery_app.task(
name="app.tasks.write_all_workspaces_memory_task",
bind=True,
ignore_result=False,
max_retries=3,
acks_late=True,
time_limit=3600,
soft_time_limit=3300,
)
def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
"""定时任务:遍历所有工作空间,统计并写入记忆增量
此任务会:
1. 查询所有活跃的工作空间
2. 对每个工作空间统计记忆总量
3. 将统计结果写入 memory_increments 表
Returns:
包含任务执行结果的字典
"""
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_api_logger
from app.models.workspace_model import Workspace
from app.models.app_model import App
from app.models.end_user_model import EndUser
from app.repositories.memory_increment_repository import write_memory_increment
from app.services.memory_storage_service import search_all
api_logger = get_api_logger()
with get_db_context() as db:
try:
# 获取所有活跃的工作空间
workspaces = db.query(Workspace).filter(
Workspace.is_active.is_(True)
).all()
if not workspaces:
api_logger.warning("没有找到活跃的工作空间")
return {
"status": "SUCCESS",
"message": "没有找到活跃的工作空间",
"workspace_count": 0,
"workspace_results": []
}
api_logger.info(f"开始统计 {len(workspaces)} 个工作空间的记忆增量")
all_workspace_results = []
# 遍历每个工作空间
for workspace in workspaces:
workspace_id = workspace.id
api_logger.info(f"开始处理工作空间: {workspace.name} (ID: {workspace_id})")
try:
# 1. 查询当前workspace下的所有app仅未删除的
apps = db.query(App).filter(
App.workspace_id == workspace_id,
App.is_active.is_(True)
).all()
if not apps:
# 如果没有app总量为0
memory_increment = write_memory_increment(
db=db,
workspace_id=workspace_id,
total_num=0
)
all_workspace_results.append({
"workspace_id": str(workspace_id),
"workspace_name": workspace.name,
"status": "SUCCESS",
"total_num": 0,
"end_user_count": 0,
"memory_increment_id": str(memory_increment.id),
"created_at": memory_increment.created_at.isoformat(),
})
api_logger.info(f"工作空间 {workspace.name} 没有应用记录总量为0")
continue
# 2. 查询所有app下的end_user_id去重
app_ids = [app.id for app in apps]
end_users = db.query(EndUser.id).filter(
EndUser.app_id.in_(app_ids)
).distinct().all()
# 3. 遍历所有end_user查询每个宿主的记忆总量并累加
total_num = 0
end_user_details = []
for (end_user_id,) in end_users:
try:
# 调用 search_all 接口查询该宿主的总量
result = await search_all(str(end_user_id))
user_total = result.get("total", 0)
total_num += user_total
end_user_details.append({
"end_user_id": str(end_user_id),
"total": user_total
})
except Exception as e:
# 记录单个用户查询失败,但继续处理其他用户
api_logger.warning(f"查询用户 {end_user_id} 记忆失败: {str(e)}")
end_user_details.append({
"end_user_id": str(end_user_id),
"total": 0,
"error": str(e)
})
# 4. 写入数据库
memory_increment = write_memory_increment(
db=db,
workspace_id=workspace_id,
total_num=total_num
)
all_workspace_results.append({
"workspace_id": str(workspace_id),
"workspace_name": workspace.name,
"status": "SUCCESS",
"total_num": total_num,
"end_user_count": len(end_users),
"memory_increment_id": str(memory_increment.id),
"created_at": memory_increment.created_at.isoformat(),
})
api_logger.info(
f"工作空间 {workspace.name} 统计完成: 总量={total_num}, 用户数={len(end_users)}"
)
except Exception as e:
db.rollback() # 回滚失败的事务,允许继续处理下一个工作空间
api_logger.error(f"处理工作空间 {workspace.name} (ID: {workspace_id}) 失败: {str(e)}")
all_workspace_results.append({
"workspace_id": str(workspace_id),
"workspace_name": workspace.name,
"status": "FAILURE",
"error": str(e),
"total_num": 0,
"end_user_count": 0,
})
total_memory = sum(r.get("total_num", 0) for r in all_workspace_results)
success_count = sum(1 for r in all_workspace_results if r.get("status") == "SUCCESS")
return {
"status": "SUCCESS",
"message": f"成功处理 {success_count}/{len(workspaces)} 个工作空间,总记忆量: {total_memory}",
"workspace_count": len(workspaces),
"success_count": success_count,
"total_memory": total_memory,
"workspace_results": all_workspace_results
}
except Exception as e:
api_logger.error(f"记忆增量统计任务执行失败: {str(e)}")
return {
"status": "FAILURE",
"error": str(e),
"workspace_count": 0,
"workspace_results": []
}
try:
# 使用 nest_asyncio 来避免事件循环冲突
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
# 尝试获取现有事件循环,如果不存在则创建新的
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(_run())
elapsed_time = time.time() - start_time
result["elapsed_time"] = elapsed_time
result["task_id"] = self.request.id
return result
except Exception as e:
elapsed_time = time.time() - start_time
return {
"status": "FAILURE",
"error": str(e),
"elapsed_time": elapsed_time,
"task_id": self.request.id
}
@celery_app.task(