[fix] Refactor the scheduled memory-increment statistics task
This commit is contained in:
@@ -82,7 +82,7 @@ celery_app.conf.update(
|
|||||||
'app.tasks.workspace_reflection_task': {'queue': 'periodic_tasks'},
|
'app.tasks.workspace_reflection_task': {'queue': 'periodic_tasks'},
|
||||||
'app.tasks.regenerate_memory_cache': {'queue': 'periodic_tasks'},
|
'app.tasks.regenerate_memory_cache': {'queue': 'periodic_tasks'},
|
||||||
'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'},
|
'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'},
|
||||||
'app.controllers.memory_storage_controller.search_all': {'queue': 'periodic_tasks'},
|
'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -115,16 +115,11 @@ beat_schedule_config = {
|
|||||||
"config_id": None, # 使用默认配置,可以通过环境变量配置
|
"config_id": None, # 使用默认配置,可以通过环境变量配置
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"write-all-workspaces-memory": {
|
||||||
|
"task": "app.tasks.write_all_workspaces_memory_task",
|
||||||
|
"schedule": memory_increment_schedule,
|
||||||
|
"args": (),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#如果配置了默认工作空间ID,则添加记忆总量统计任务
|
|
||||||
if settings.DEFAULT_WORKSPACE_ID:
|
|
||||||
beat_schedule_config["write-total-memory"] = {
|
|
||||||
"task": "app.controllers.memory_storage_controller.search_all",
|
|
||||||
"schedule": memory_increment_schedule,
|
|
||||||
"kwargs": {
|
|
||||||
"workspace_id": settings.DEFAULT_WORKSPACE_ID,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
celery_app.conf.beat_schedule = beat_schedule_config
|
celery_app.conf.beat_schedule = beat_schedule_config
|
||||||
|
|||||||
@@ -202,7 +202,6 @@ class Settings:
|
|||||||
REFLECTION_INTERVAL_SECONDS: float = float(os.getenv("REFLECTION_INTERVAL_SECONDS", "300"))
|
REFLECTION_INTERVAL_SECONDS: float = float(os.getenv("REFLECTION_INTERVAL_SECONDS", "300"))
|
||||||
HEALTH_CHECK_SECONDS: float = float(os.getenv("HEALTH_CHECK_SECONDS", "600"))
|
HEALTH_CHECK_SECONDS: float = float(os.getenv("HEALTH_CHECK_SECONDS", "600"))
|
||||||
MEMORY_INCREMENT_INTERVAL_HOURS: float = float(os.getenv("MEMORY_INCREMENT_INTERVAL_HOURS", "24"))
|
MEMORY_INCREMENT_INTERVAL_HOURS: float = float(os.getenv("MEMORY_INCREMENT_INTERVAL_HOURS", "24"))
|
||||||
DEFAULT_WORKSPACE_ID: Optional[str] = os.getenv("DEFAULT_WORKSPACE_ID", None)
|
|
||||||
REFLECTION_INTERVAL_TIME: Optional[str] = int(os.getenv("REFLECTION_INTERVAL_TIME", 30))
|
REFLECTION_INTERVAL_TIME: Optional[str] = int(os.getenv("REFLECTION_INTERVAL_TIME", 30))
|
||||||
|
|
||||||
# Memory Cache Regeneration Configuration
|
# Memory Cache Regeneration Configuration
|
||||||
|
|||||||
197
api/app/tasks.py
197
api/app/tasks.py
@@ -1304,6 +1304,203 @@ def write_total_memory_task(workspace_id: str) -> Dict[str, Any]:
|
|||||||
"workspace_id": workspace_id,
|
"workspace_id": workspace_id,
|
||||||
"elapsed_time": elapsed_time,
|
"elapsed_time": elapsed_time,
|
||||||
}
|
}
|
||||||
|
@celery_app.task(
    name="app.tasks.write_all_workspaces_memory_task",
    bind=True,
    ignore_result=False,
    max_retries=3,
    acks_late=True,
    time_limit=3600,
    soft_time_limit=3300,
)
def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
    """Periodic task: record the current memory total of every active workspace.

    For each workspace whose ``is_active`` flag is true, the task:

    1. loads the workspace's active apps,
    2. collects the distinct end-user ids attached to those apps,
    3. sums the ``"total"`` value returned by ``search_all`` for each end user,
    4. persists the sum through ``write_memory_increment``.

    Failures are handled on a best-effort basis: an end user whose lookup
    fails is logged and counted as 0, and a workspace whose processing fails
    is rolled back and reported with ``"status": "FAILURE"``; neither stops
    the remaining work. The only exception to this is Celery's
    ``SoftTimeLimitExceeded``, which is re-raised so the task stops promptly
    instead of running into the hard time limit.

    Returns:
        A dict that always contains ``status``, ``elapsed_time`` and
        ``task_id``. On a completed run it also contains ``message``,
        ``workspace_count``, ``success_count``, ``total_memory`` and
        ``workspace_results`` (one entry per workspace); on failure it
        contains ``error``.
    """
    start_time = time.time()

    async def _run() -> Dict[str, Any]:
        # Imported lazily, as in the rest of this task, so that loading the
        # module does not pull in the application packages.
        from celery.exceptions import SoftTimeLimitExceeded

        from app.core.logging_config import get_api_logger
        from app.models.workspace_model import Workspace
        from app.models.app_model import App
        from app.models.end_user_model import EndUser
        from app.repositories.memory_increment_repository import write_memory_increment
        from app.services.memory_storage_service import search_all

        api_logger = get_api_logger()

        with get_db_context() as db:
            try:
                # Fetch every active workspace.
                workspaces = db.query(Workspace).filter(
                    Workspace.is_active.is_(True)
                ).all()

                if not workspaces:
                    api_logger.warning("没有找到活跃的工作空间")
                    return {
                        "status": "SUCCESS",
                        "message": "没有找到活跃的工作空间",
                        "workspace_count": 0,
                        "workspace_results": []
                    }

                # Snapshot id/name while the rows are freshly loaded. The ORM
                # instances may be expired by a later commit or rollback, and
                # reading an expired attribute inside the error handler below
                # could raise again and abort the whole run.
                workspace_refs = [(ws.id, ws.name) for ws in workspaces]

                api_logger.info(f"开始统计 {len(workspaces)} 个工作空间的记忆增量")
                all_workspace_results = []

                # Process the workspaces one by one.
                for workspace_id, workspace_name in workspace_refs:
                    api_logger.info(f"开始处理工作空间: {workspace_name} (ID: {workspace_id})")

                    try:
                        # 1. Active apps of the current workspace.
                        apps = db.query(App).filter(
                            App.workspace_id == workspace_id,
                            App.is_active.is_(True)
                        ).all()

                        if not apps:
                            # No apps: record a total of 0 for this workspace.
                            memory_increment = write_memory_increment(
                                db=db,
                                workspace_id=workspace_id,
                                total_num=0
                            )
                            all_workspace_results.append({
                                "workspace_id": str(workspace_id),
                                "workspace_name": workspace_name,
                                "status": "SUCCESS",
                                "total_num": 0,
                                "end_user_count": 0,
                                "memory_increment_id": str(memory_increment.id),
                                "created_at": memory_increment.created_at.isoformat(),
                            })
                            api_logger.info(f"工作空间 {workspace_name} 没有应用,记录总量为0")
                            continue

                        # 2. Distinct end-user ids across all of those apps.
                        app_ids = [app.id for app in apps]
                        end_users = db.query(EndUser.id).filter(
                            EndUser.app_id.in_(app_ids)
                        ).distinct().all()

                        # 3. Sum the per-user memory totals.
                        total_num = 0
                        end_user_details = []

                        for (end_user_id,) in end_users:
                            try:
                                # search_all reports the totals for one end user.
                                result = await search_all(str(end_user_id))
                                user_total = result.get("total", 0)
                                total_num += user_total
                                end_user_details.append({
                                    "end_user_id": str(end_user_id),
                                    "total": user_total
                                })
                            except SoftTimeLimitExceeded:
                                # A time-limit signal is not a per-user
                                # failure; let it stop the task.
                                raise
                            except Exception as e:
                                # Best effort: log the failed user, count it
                                # as 0 and keep going with the others.
                                api_logger.warning(f"查询用户 {end_user_id} 记忆失败: {str(e)}")
                                end_user_details.append({
                                    "end_user_id": str(end_user_id),
                                    "total": 0,
                                    "error": str(e)
                                })

                        # 4. Persist the workspace total.
                        memory_increment = write_memory_increment(
                            db=db,
                            workspace_id=workspace_id,
                            total_num=total_num
                        )

                        all_workspace_results.append({
                            "workspace_id": str(workspace_id),
                            "workspace_name": workspace_name,
                            "status": "SUCCESS",
                            "total_num": total_num,
                            "end_user_count": len(end_users),
                            "memory_increment_id": str(memory_increment.id),
                            "created_at": memory_increment.created_at.isoformat(),
                        })

                        api_logger.info(
                            f"工作空间 {workspace_name} 统计完成: 总量={total_num}, 用户数={len(end_users)}"
                        )

                    except Exception as e:
                        # Roll back the failed transaction so the session
                        # stays usable for the next workspace.
                        db.rollback()
                        if isinstance(e, SoftTimeLimitExceeded):
                            # Out of time: stop instead of moving on.
                            raise
                        api_logger.error(f"处理工作空间 {workspace_name} (ID: {workspace_id}) 失败: {str(e)}")
                        all_workspace_results.append({
                            "workspace_id": str(workspace_id),
                            "workspace_name": workspace_name,
                            "status": "FAILURE",
                            "error": str(e),
                            "total_num": 0,
                            "end_user_count": 0,
                        })

                total_memory = sum(r.get("total_num", 0) for r in all_workspace_results)
                success_count = sum(1 for r in all_workspace_results if r.get("status") == "SUCCESS")

                return {
                    "status": "SUCCESS",
                    "message": f"成功处理 {success_count}/{len(workspaces)} 个工作空间,总记忆量: {total_memory}",
                    "workspace_count": len(workspaces),
                    "success_count": success_count,
                    "total_memory": total_memory,
                    "workspace_results": all_workspace_results
                }

            except Exception as e:
                api_logger.error(f"记忆增量统计任务执行失败: {str(e)}")
                return {
                    "status": "FAILURE",
                    "error": str(e),
                    "workspace_count": 0,
                    "workspace_results": []
                }

    try:
        # nest_asyncio (optional dependency) avoids event-loop conflicts when
        # a loop is already running; without it we simply proceed.
        try:
            import nest_asyncio
            nest_asyncio.apply()
        except ImportError:
            pass

        # Reuse the current event loop when there is a usable one, otherwise
        # create and install a fresh loop.
        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        result = loop.run_until_complete(_run())
        elapsed_time = time.time() - start_time
        result["elapsed_time"] = elapsed_time
        result["task_id"] = self.request.id

        return result

    except Exception as e:
        elapsed_time = time.time() - start_time
        return {
            "status": "FAILURE",
            "error": str(e),
            "elapsed_time": elapsed_time,
            "task_id": self.request.id
        }
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(
|
@celery_app.task(
|
||||||
|
|||||||
Reference in New Issue
Block a user