diff --git a/api/app/celery_app.py b/api/app/celery_app.py index 8ef44975..f422f4a0 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -82,7 +82,7 @@ celery_app.conf.update( 'app.tasks.workspace_reflection_task': {'queue': 'periodic_tasks'}, 'app.tasks.regenerate_memory_cache': {'queue': 'periodic_tasks'}, 'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'}, - 'app.controllers.memory_storage_controller.search_all': {'queue': 'periodic_tasks'}, + 'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'}, }, ) @@ -115,16 +115,11 @@ beat_schedule_config = { "config_id": None, # 使用默认配置,可以通过环境变量配置 }, }, + "write-all-workspaces-memory": { + "task": "app.tasks.write_all_workspaces_memory_task", + "schedule": memory_increment_schedule, + "args": (), + }, } -#如果配置了默认工作空间ID,则添加记忆总量统计任务 -if settings.DEFAULT_WORKSPACE_ID: - beat_schedule_config["write-total-memory"] = { - "task": "app.controllers.memory_storage_controller.search_all", - "schedule": memory_increment_schedule, - "kwargs": { - "workspace_id": settings.DEFAULT_WORKSPACE_ID, - }, - } - celery_app.conf.beat_schedule = beat_schedule_config diff --git a/api/app/core/config.py b/api/app/core/config.py index 0962b545..19998d32 100644 --- a/api/app/core/config.py +++ b/api/app/core/config.py @@ -201,7 +201,6 @@ class Settings: REFLECTION_INTERVAL_SECONDS: float = float(os.getenv("REFLECTION_INTERVAL_SECONDS", "300")) HEALTH_CHECK_SECONDS: float = float(os.getenv("HEALTH_CHECK_SECONDS", "600")) MEMORY_INCREMENT_INTERVAL_HOURS: float = float(os.getenv("MEMORY_INCREMENT_INTERVAL_HOURS", "24")) - DEFAULT_WORKSPACE_ID: Optional[str] = os.getenv("DEFAULT_WORKSPACE_ID", None) REFLECTION_INTERVAL_TIME: Optional[str] = int(os.getenv("REFLECTION_INTERVAL_TIME", 30)) # Memory Cache Regeneration Configuration diff --git a/api/app/tasks.py b/api/app/tasks.py index d408a0da..8e3aea85 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1304,6 +1304,203 @@ def 
@celery_app.task(
    name="app.tasks.write_all_workspaces_memory_task",
    bind=True,
    ignore_result=False,
    max_retries=3,
    acks_late=True,
    time_limit=3600,
    soft_time_limit=3300,
)
def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
    """Periodic task: record the current memory total of every active workspace.

    For each active workspace the task:

    1. looks up the workspace's active apps,
    2. collects the distinct end users attached to those apps,
    3. sums the ``total`` reported by ``search_all`` for each end user,
    4. stores the sum through ``write_memory_increment``.

    A failure for a single end user is logged and counted as 0 for that user;
    a failure for a whole workspace is rolled back and reported as a
    ``FAILURE`` entry, and processing continues with the next workspace.

    Returns:
        A dict with ``status``, ``message``, ``workspace_count``,
        ``success_count``, ``total_memory`` and ``workspace_results`` (one
        entry per workspace), plus ``elapsed_time`` and ``task_id``. If the
        run fails as a whole, a dict with ``status`` set to ``"FAILURE"`` and
        an ``error`` message is returned instead of raising.
    """
    start_time = time.time()

    async def _run() -> Dict[str, Any]:
        from app.core.logging_config import get_api_logger
        from app.models.app_model import App
        from app.models.end_user_model import EndUser
        from app.models.workspace_model import Workspace
        from app.repositories.memory_increment_repository import write_memory_increment
        from app.services.memory_storage_service import search_all

        api_logger = get_api_logger()

        async def _sum_end_user_memories(end_user_ids):
            """Return ``(total, failed_count)`` over the given end-user ids."""
            total = 0
            failed_count = 0
            for end_user_id in end_user_ids:
                try:
                    result = await search_all(str(end_user_id))
                    total += result.get("total", 0)
                except Exception as e:
                    # Best effort: one failing user must not abort the
                    # workspace; the user simply contributes 0 to the total.
                    # NOTE(review): this handler (and the outer ones) also
                    # catches celery's SoftTimeLimitExceeded, so the soft
                    # time limit cannot stop the loop early - confirm whether
                    # it should be re-raised.
                    failed_count += 1
                    api_logger.warning(f"查询用户 {end_user_id} 记忆失败: {str(e)}")
            return total, failed_count

        with get_db_context() as db:
            try:
                # Fetch every active workspace.
                workspaces = db.query(Workspace).filter(
                    Workspace.is_active.is_(True)
                ).all()

                if not workspaces:
                    api_logger.warning("没有找到活跃的工作空间")
                    return {
                        "status": "SUCCESS",
                        "message": "没有找到活跃的工作空间",
                        "workspace_count": 0,
                        "workspace_results": []
                    }

                # Snapshot the plain values up front. A rollback expires ORM
                # instances, so reading workspace.name later (in particular
                # inside the error handler) would hit the database again and
                # could itself raise, aborting the whole run.
                targets = [(ws.id, ws.name) for ws in workspaces]

                api_logger.info(f"开始统计 {len(targets)} 个工作空间的记忆增量")
                all_workspace_results = []

                for workspace_id, workspace_name in targets:
                    api_logger.info(f"开始处理工作空间: {workspace_name} (ID: {workspace_id})")

                    try:
                        # 1. Active apps of this workspace (ids only).
                        app_rows = db.query(App.id).filter(
                            App.workspace_id == workspace_id,
                            App.is_active.is_(True)
                        ).all()
                        app_ids = [app_id for (app_id,) in app_rows]

                        if app_ids:
                            # 2. Distinct end users across those apps.
                            end_user_rows = db.query(EndUser.id).filter(
                                EndUser.app_id.in_(app_ids)
                            ).distinct().all()
                            end_user_ids = [end_user_id for (end_user_id,) in end_user_rows]

                            # 3. Accumulate each end user's memory total.
                            total_num, failed_count = await _sum_end_user_memories(end_user_ids)
                        else:
                            # No apps: record a total of 0 without querying users.
                            end_user_ids = []
                            total_num = 0
                            failed_count = 0

                        # 4. Persist the total for this workspace.
                        memory_increment = write_memory_increment(
                            db=db,
                            workspace_id=workspace_id,
                            total_num=total_num
                        )

                        all_workspace_results.append({
                            "workspace_id": str(workspace_id),
                            "workspace_name": workspace_name,
                            "status": "SUCCESS",
                            "total_num": total_num,
                            "end_user_count": len(end_user_ids),
                            # Users whose lookup failed and were counted as 0.
                            "failed_end_user_count": failed_count,
                            "memory_increment_id": str(memory_increment.id),
                            "created_at": memory_increment.created_at.isoformat(),
                        })

                        if app_ids:
                            api_logger.info(
                                f"工作空间 {workspace_name} 统计完成: 总量={total_num}, 用户数={len(end_user_ids)}"
                            )
                        else:
                            api_logger.info(f"工作空间 {workspace_name} 没有应用,记录总量为0")

                    except Exception as e:
                        # Roll back the failed transaction so the session can
                        # be reused for the next workspace.
                        db.rollback()
                        api_logger.error(f"处理工作空间 {workspace_name} (ID: {workspace_id}) 失败: {str(e)}")
                        all_workspace_results.append({
                            "workspace_id": str(workspace_id),
                            "workspace_name": workspace_name,
                            "status": "FAILURE",
                            "error": str(e),
                            "total_num": 0,
                            "end_user_count": 0,
                        })

                total_memory = sum(r.get("total_num", 0) for r in all_workspace_results)
                success_count = sum(1 for r in all_workspace_results if r.get("status") == "SUCCESS")

                return {
                    "status": "SUCCESS",
                    "message": f"成功处理 {success_count}/{len(targets)} 个工作空间,总记忆量: {total_memory}",
                    "workspace_count": len(targets),
                    "success_count": success_count,
                    "total_memory": total_memory,
                    "workspace_results": all_workspace_results
                }

            except Exception as e:
                api_logger.error(f"记忆增量统计任务执行失败: {str(e)}")
                return {
                    "status": "FAILURE",
                    "error": str(e),
                    "workspace_count": 0,
                    "workspace_results": []
                }

    try:
        # Apply nest_asyncio when it is installed, to avoid event-loop
        # conflicts when a loop is already running.
        try:
            import nest_asyncio
            nest_asyncio.apply()
        except ImportError:
            pass

        # Reuse the current event loop when there is a usable one; otherwise
        # create and install a fresh loop.
        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        result = loop.run_until_complete(_run())
        elapsed_time = time.time() - start_time
        result["elapsed_time"] = elapsed_time
        result["task_id"] = self.request.id

        return result
    except Exception as e:
        elapsed_time = time.time() - start_time
        return {
            "status": "FAILURE",
            "error": str(e),
            "elapsed_time": elapsed_time,
            "task_id": self.request.id
        }