feat(multimodel): support multimodal memory display and improve code style

This commit is contained in:
Eternity
2026-03-13 13:33:58 +08:00
parent cbc8714414
commit b71bc1f875
31 changed files with 877 additions and 543 deletions

View File

@@ -1,6 +1,5 @@
import asyncio
import json
import logging
import hashlib
import os
import re
import shutil
@@ -11,20 +10,48 @@ from datetime import datetime, timezone
from math import ceil
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import UUID
import redis
import requests
from redis.exceptions import RedisError
logger = logging.getLogger(__name__)
# Import a unified Celery instance
from app.celery_app import celery_app
from app.core.config import settings
from app.core.logging_config import get_logger
from app.core.rag.crawler.web_crawler import WebCrawler
from app.core.rag.graphrag.general.index import init_graphrag, run_graphrag_for_kb
from app.core.rag.graphrag.utils import get_llm_cache, set_llm_cache
from app.core.rag.integrations.feishu.client import FeishuAPIClient
from app.core.rag.integrations.feishu.models import FileInfo
from app.core.rag.integrations.yuque.client import YuqueAPIClient
from app.core.rag.integrations.yuque.models import YuqueDocInfo
from app.core.rag.llm.chat_model import Base
from app.core.rag.llm.cv_model import QWenCV
from app.core.rag.llm.embedding_model import OpenAIEmbed
from app.core.rag.llm.sequence2txt_model import QWenSeq2txt
from app.core.rag.models.chunk import DocumentChunk
from app.core.rag.prompts.generator import question_proposal
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import (
ElasticSearchVectorFactory,
)
from app.db import get_db, get_db_context
from app.models import Document, File, Knowledge
from app.schemas import document_schema, file_schema
from app.schemas.model_schema import ModelInfo
from app.services.memory_agent_service import MemoryAgentService
from app.services.memory_perceptual_service import MemoryPerceptualService
from app.utils.config_utils import resolve_config_id
from app.utils.redis_lock import RedisLock
logger = get_logger(__name__)
# 模块级同步 Redis 连接池,供 Celery 任务共享使用
# 连接 CELERY_BACKEND DB与 write_message:last_done 时间戳写入保持一致
# 使用连接池而非单例客户端,提供更好的并发性能和自动重连
_sync_redis_pool: redis.ConnectionPool = None
_sync_redis_pool: redis.ConnectionPool | None = None
def _get_or_create_redis_pool() -> redis.ConnectionPool:
def _get_or_create_redis_pool() -> redis.ConnectionPool | None:
"""获取或创建 Redis 连接池(懒初始化)"""
global _sync_redis_pool
if _sync_redis_pool is None:
@@ -47,6 +74,7 @@ def _get_or_create_redis_pool() -> redis.ConnectionPool:
return None
return _sync_redis_pool
def get_sync_redis_client() -> Optional[redis.StrictRedis]:
"""获取同步 Redis 客户端(使用连接池)
@@ -60,7 +88,7 @@ def get_sync_redis_client() -> Optional[redis.StrictRedis]:
pool = _get_or_create_redis_pool()
if pool is None:
return None
client = redis.StrictRedis(connection_pool=pool)
# 验证连接可用性
client.ping()
@@ -72,32 +100,18 @@ def get_sync_redis_client() -> Optional[redis.StrictRedis]:
logger.error(f"Unexpected error getting Redis client: {e}", exc_info=True)
return None
# Import a unified Celery instance
from app.celery_app import celery_app
from app.core.config import settings
from app.core.rag.crawler.web_crawler import WebCrawler
from app.core.rag.graphrag.general.index import init_graphrag, run_graphrag_for_kb
from app.core.rag.graphrag.utils import get_llm_cache, set_llm_cache
from app.core.rag.integrations.feishu.client import FeishuAPIClient
from app.core.rag.integrations.feishu.models import FileInfo
from app.core.rag.integrations.yuque.client import YuqueAPIClient
from app.core.rag.integrations.yuque.models import YuqueDocInfo
from app.core.rag.llm.chat_model import Base
from app.core.rag.llm.cv_model import QWenCV
from app.core.rag.llm.embedding_model import OpenAIEmbed
from app.core.rag.llm.sequence2txt_model import QWenSeq2txt
from app.core.rag.models.chunk import DocumentChunk
from app.core.rag.prompts.generator import question_proposal
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import (
ElasticSearchVectorFactory,
)
from app.db import get_db, get_db_context
from app.models.document_model import Document
from app.models.file_model import File
from app.models.knowledge_model import Knowledge
from app.schemas import document_schema, file_schema
from app.services.memory_agent_service import MemoryAgentService
from app.utils.config_utils import resolve_config_id
def set_asyncio_event_loop():
"""Set the asyncio event loop for the current thread."""
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
@celery_app.task(name="tasks.process_item")
@@ -294,9 +308,18 @@ def parse_document(file_path: str, document_id: uuid.UUID):
vector_size = len(vts[0])
init_graphrag(task, vector_size)
async def _run(row: dict, document_ids: list[str], language: str, parser_config: dict, vector_service,
chat_model, embedding_model, callback, with_resolution: bool = True,
with_community: bool = True, ) -> dict:
async def _run(
row: dict,
document_ids: list[str],
language: str,
parser_config: dict,
vector_service,
chat_model,
embedding_model,
callback,
with_resolution: bool = True,
with_community: bool = True
) -> dict:
await trio.sleep(5) # Delay for 10 seconds
nonlocal progress_msg # Declare the use of an external progress_msg variable
result = await run_graphrag_for_kb(
@@ -329,6 +352,7 @@ def parse_document(file_path: str, document_id: uuid.UUID):
with_community=with_community,
)
)
try:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(sync_task)
@@ -448,6 +472,7 @@ def build_graphrag_for_kb(kb_id: uuid.UUID):
with_community=with_community,
)
)
try:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(sync_task)
@@ -1002,29 +1027,21 @@ def read_message_task(self, end_user_id: str, message: str, history: List[Dict[s
# Log but continue - will fail later with proper error
pass
async def _run() -> str:
async def _run() -> dict:
with get_db_context() as db:
service = MemoryAgentService()
return await service.read_memory(end_user_id, message, history, search_switch, actual_config_id, db,
storage_type, user_rag_memory_id)
return await service.read_memory(
end_user_id,
message,
history,
search_switch,
actual_config_id, db,
storage_type, user_rag_memory_id
)
try:
# 使用 nest_asyncio 来避免事件循环冲突
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
# 尝试获取现有事件循环,如果不存在则创建新的
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
elapsed_time = time.time() - start_time
@@ -1056,7 +1073,8 @@ def read_message_task(self, end_user_id: str, message: str, history: List[Dict[s
@celery_app.task(name="app.core.memory.agent.write_message", bind=True)
def write_message_task(self, end_user_id: str, message: list[dict], config_id: str | int, storage_type: str, user_rag_memory_id: str,
def write_message_task(self, end_user_id: str, message: list[dict], config_id: str | int, storage_type: str,
user_rag_memory_id: str,
language: str = "zh") -> Dict[str, Any]:
"""Celery task to process a write message via MemoryAgentService.
Args:
@@ -1073,10 +1091,11 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s
Raises:
Exception on failure
"""
from app.core.logging_config import get_logger
logger = get_logger(__name__)
logger.info(f"[CELERY WRITE] Starting write task - end_user_id={end_user_id}, config_id={config_id} (type: {type(config_id).__name__}), storage_type={storage_type}, language={language}")
logger.info(
f"[CELERY WRITE] Starting write task - end_user_id={end_user_id}, "
f"config_id={config_id} (type: {type(config_id).__name__}), "
f"storage_type={storage_type}, language={language}")
start_time = time.time()
# Convert config_id to UUID
@@ -1086,13 +1105,14 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s
try:
with get_db_context() as db:
actual_config_id = resolve_config_id(config_id, db)
print(100*'-')
print(100 * '-')
print(actual_config_id)
print(100*'-')
print(100 * '-')
logger.info(
f"[CELERY WRITE] Converted config_id to UUID: {actual_config_id} (type: {type(actual_config_id).__name__})")
except (ValueError, AttributeError) as e:
logger.error(f"[CELERY WRITE] Invalid config_id format: {config_id} (type: {type(config_id).__name__}), error: {e}")
logger.error(
f"[CELERY WRITE] Invalid config_id format: {config_id} (type: {type(config_id).__name__}), error: {e}")
return {
"status": "FAILURE",
"error": f"Invalid config_id format: {config_id} - {str(e)}",
@@ -1116,7 +1136,8 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s
async def _run() -> str:
with get_db_context() as db:
logger.info(
f"[CELERY WRITE] Executing MemoryAgentService.write_memory with config_id={actual_config_id} (type: {type(actual_config_id).__name__}), language={language}")
f"[CELERY WRITE] Executing MemoryAgentService.write_memory "
f"with config_id={actual_config_id} (type: {type(actual_config_id).__name__}), language={language}")
service = MemoryAgentService()
result = await service.write_memory(end_user_id, message, actual_config_id, db, storage_type,
user_rag_memory_id, language)
@@ -1124,22 +1145,8 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s
return result
try:
# 使用 nest_asyncio 来避免事件循环冲突
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
# 尝试获取现有事件循环,如果不存在则创建新的
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
elapsed_time = time.time() - start_time
@@ -1193,28 +1200,6 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s
}
def reflection_engine() -> None:
"""Empty function placeholder for timed background reflection.
Intentionally left blank; replace with real reflection logic later.
"""
import asyncio
from app.core.memory.utils.self_reflexion_utils.self_reflexion import self_reflexion
host_id = uuid.UUID("2f6ff1eb-50c7-4765-8e89-e4566be19122")
asyncio.run(self_reflexion(host_id))
@celery_app.task(name="app.core.memory.agent.reflection.timer")
def reflection_timer_task() -> None:
"""Periodic Celery task that invokes reflection_engine.
Raises an exception on failure.
"""
reflection_engine()
# unused task
# @celery_app.task(name="app.core.memory.agent.health.check_read_service")
# def check_read_service_task() -> Dict[str, str]:
@@ -1368,6 +1353,8 @@ def write_total_memory_task(workspace_id: str) -> Dict[str, Any]:
"workspace_id": workspace_id,
"elapsed_time": elapsed_time,
}
@celery_app.task(
name="app.tasks.write_all_workspaces_memory_task",
bind=True,
@@ -1391,15 +1378,12 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_api_logger
from app.models.app_model import App
from app.models.end_user_model import EndUser
from app.models.workspace_model import Workspace
from app.repositories.memory_increment_repository import write_memory_increment
from app.services.memory_storage_service import search_all
api_logger = get_api_logger()
with get_db_context() as db:
try:
# 获取所有活跃的工作空间
@@ -1408,7 +1392,7 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
).all()
if not workspaces:
api_logger.warning("没有找到活跃的工作空间")
logger.warning("没有找到活跃的工作空间")
return {
"status": "SUCCESS",
"message": "没有找到活跃的工作空间",
@@ -1416,13 +1400,13 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
"workspace_results": []
}
api_logger.info(f"开始统计 {len(workspaces)} 个工作空间的记忆增量")
logger.info(f"开始统计 {len(workspaces)} 个工作空间的记忆增量")
all_workspace_results = []
# 遍历每个工作空间
for workspace in workspaces:
workspace_id = workspace.id
api_logger.info(f"开始处理工作空间: {workspace.name} (ID: {workspace_id})")
logger.info(f"开始处理工作空间: {workspace.name} (ID: {workspace_id})")
try:
# 1. 查询当前workspace下的所有app仅未删除的
@@ -1447,7 +1431,7 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
"memory_increment_id": str(memory_increment.id),
"created_at": memory_increment.created_at.isoformat(),
})
api_logger.info(f"工作空间 {workspace.name} 没有应用记录总量为0")
logger.info(f"工作空间 {workspace.name} 没有应用记录总量为0")
continue
# 2. 查询所有app下的end_user_id去重
@@ -1472,7 +1456,7 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
})
except Exception as e:
# 记录单个用户查询失败,但继续处理其他用户
api_logger.warning(f"查询用户 {end_user_id} 记忆失败: {str(e)}")
logger.warning(f"查询用户 {end_user_id} 记忆失败: {str(e)}")
end_user_details.append({
"end_user_id": str(end_user_id),
"total": 0,
@@ -1496,13 +1480,13 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
"created_at": memory_increment.created_at.isoformat(),
})
api_logger.info(
logger.info(
f"工作空间 {workspace.name} 统计完成: 总量={total_num}, 用户数={len(end_users)}"
)
except Exception as e:
db.rollback() # 回滚失败的事务,允许继续处理下一个工作空间
api_logger.error(f"处理工作空间 {workspace.name} (ID: {workspace_id}) 失败: {str(e)}")
logger.error(f"处理工作空间 {workspace.name} (ID: {workspace_id}) 失败: {str(e)}")
all_workspace_results.append({
"workspace_id": str(workspace_id),
"workspace_name": workspace.name,
@@ -1525,7 +1509,7 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
}
except Exception as e:
api_logger.error(f"记忆增量统计任务执行失败: {str(e)}")
logger.error(f"记忆增量统计任务执行失败: {str(e)}")
return {
"status": "FAILURE",
"error": str(e),
@@ -1534,22 +1518,8 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
}
try:
# 使用 nest_asyncio 来避免事件循环冲突
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
# 尝试获取现有事件循环,如果不存在则创建新的
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
elapsed_time = time.time() - start_time
@@ -1597,11 +1567,9 @@ def regenerate_memory_cache(self) -> Dict[str, Any]:
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_logger
from app.repositories.end_user_repository import EndUserRepository
from app.services.user_memory_service import UserMemoryService
logger = get_logger(__name__)
logger.info("开始执行记忆缓存重新生成定时任务")
service = UserMemoryService()
@@ -1734,22 +1702,8 @@ def regenerate_memory_cache(self) -> Dict[str, Any]:
}
try:
# 使用 nest_asyncio 来避免事件循环冲突
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
# 尝试获取现有事件循环,如果不存在则创建新的
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
elapsed_time = time.time() - start_time
@@ -1785,15 +1739,12 @@ def workspace_reflection_task(self) -> Dict[str, Any]:
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_api_logger
from app.models.workspace_model import Workspace
from app.services.memory_reflection_service import (
MemoryReflectionService,
WorkspaceAppService,
)
api_logger = get_api_logger()
with get_db_context() as db:
try:
# 获取所有工作空间
@@ -1812,7 +1763,7 @@ def workspace_reflection_task(self) -> Dict[str, Any]:
# 遍历每个工作空间
for workspace in workspaces:
workspace_id = workspace.id
api_logger.info(f"开始处理工作空间反思workspace_id: {workspace_id}")
logger.info(f"开始处理工作空间反思workspace_id: {workspace_id}")
try:
reflection_service = MemoryReflectionService(db)
@@ -1824,7 +1775,7 @@ def workspace_reflection_task(self) -> Dict[str, Any]:
workspace_reflection_results = []
for data in result['apps_detailed_info']:
if data['memory_configs'] == []:
if not data['memory_configs']:
continue
releases = data['releases']
@@ -1835,7 +1786,7 @@ def workspace_reflection_task(self) -> Dict[str, Any]:
if str(base['config']) == str(config['config_id']) and str(base['app_id']) == str(
user['app_id']):
# 调用反思服务
api_logger.info(f"为用户 {user['id']} 启动反思config_id: {config['config_id']}")
logger.info(f"为用户 {user['id']} 启动反思config_id: {config['config_id']}")
reflection_result = await reflection_service.start_reflection_from_data(
config_data=config,
@@ -1855,12 +1806,12 @@ def workspace_reflection_task(self) -> Dict[str, Any]:
"reflection_results": workspace_reflection_results
})
api_logger.info(
logger.info(
f"工作空间 {workspace_id} 反思处理完成,处理了 {len(workspace_reflection_results)} 个任务")
except Exception as e:
db.rollback() # Rollback failed transaction to allow next query
api_logger.error(f"处理工作空间 {workspace_id} 反思失败: {str(e)}")
logger.error(f"处理工作空间 {workspace_id} 反思失败: {str(e)}")
all_reflection_results.append({
"workspace_id": str(workspace_id),
"error": str(e),
@@ -1879,7 +1830,7 @@ def workspace_reflection_task(self) -> Dict[str, Any]:
}
except Exception as e:
api_logger.error(f"工作空间反思任务执行失败: {str(e)}")
logger.error(f"工作空间反思任务执行失败: {str(e)}")
return {
"status": "FAILURE",
"error": str(e),
@@ -1888,22 +1839,8 @@ def workspace_reflection_task(self) -> Dict[str, Any]:
}
try:
# 使用 nest_asyncio 来避免事件循环冲突
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
# 尝试获取现有事件循环,如果不存在则创建新的
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
elapsed_time = time.time() - start_time
@@ -1944,18 +1881,16 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_api_logger
from app.services.memory_forget_service import MemoryForgetService
api_logger = get_api_logger()
with get_db_context() as db:
try:
api_logger.info(f"开始执行遗忘周期定时任务config_id: {config_id}")
logger.info(f"开始执行遗忘周期定时任务config_id: {config_id}")
forget_service = MemoryForgetService()
# 运行遗忘周期
# FIXME: MemeoryForgetService
report = await forget_service.trigger_forgetting(
db=db,
end_user_id=None, # 处理所有组
@@ -1964,7 +1899,7 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
duration = time.time() - start_time
api_logger.info(
logger.info(
f"遗忘周期定时任务完成: "
f"融合 {report['merged_count']} 对节点, "
f"失败 {report['failed_count']} 对, "
@@ -1980,7 +1915,7 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
except Exception as e:
duration = time.time() - start_time
api_logger.error(f"遗忘周期定时任务失败: {str(e)}", exc_info=True)
logger.error(f"遗忘周期定时任务失败: {str(e)}", exc_info=True)
return {
"status": "FAILED",
@@ -1997,6 +1932,7 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
finally:
loop.close()
# =============================================================================
# Long-term Memory Storage Tasks (Batched Write Strategies)
# =============================================================================
@@ -2222,9 +2158,8 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
start_time = time.time()
async def _run() -> Dict[str, Any]:
from sqlalchemy import func, select
from sqlalchemy import select
from app.core.logging_config import get_logger
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
from app.repositories.implicit_emotions_storage_repository import (
ImplicitEmotionsStorageRepository,
@@ -2233,7 +2168,6 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
from app.services.emotion_analytics_service import EmotionAnalyticsService
from app.services.implicit_memory_service import ImplicitMemoryService
logger = get_logger(__name__)
logger.info("开始执行隐性记忆和情绪数据更新定时任务")
total_users = 0
@@ -2267,7 +2201,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
for end_user_id in refresh_iter:
logger.info(f"开始处理用户: {end_user_id}")
user_start_time = time.time()
implicit_success = False
emotion_success = False
errors = []
@@ -2318,7 +2252,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
failed += 1
user_elapsed = time.time() - user_start_time
# 记录用户处理结果
user_result = {
"end_user_id": end_user_id,
@@ -2460,22 +2394,8 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
}
try:
# 使用 nest_asyncio 来避免事件循环冲突
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
# 尝试获取现有事件循环,如果不存在则创建新的
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
elapsed_time = time.time() - start_time
@@ -2521,14 +2441,12 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str,
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_logger
from app.repositories.implicit_emotions_storage_repository import (
ImplicitEmotionsStorageRepository,
)
from app.services.emotion_analytics_service import EmotionAnalyticsService
from app.services.implicit_memory_service import ImplicitMemoryService
logger = get_logger(__name__)
logger.info(f"开始按需初始化隐性记忆/情绪数据,候选用户数: {len(end_user_ids)}")
initialized = 0
@@ -2587,20 +2505,7 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str,
}
try:
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
result["elapsed_time"] = time.time() - start_time
@@ -2633,6 +2538,7 @@ def init_interest_distribution_for_users(self, end_user_ids: List[str]) -> Dict[
默认生成中文zh兴趣分布数据。
Args:
self: task object
end_user_ids: 需要检查的用户ID列表
Returns:
@@ -2641,11 +2547,9 @@ def init_interest_distribution_for_users(self, end_user_ids: List[str]) -> Dict[
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_logger
from app.cache.memory.interest_memory import InterestMemoryCache, INTEREST_CACHE_EXPIRE
from app.services.memory_agent_service import MemoryAgentService
logger = get_logger(__name__)
logger.info(f"开始按需初始化兴趣分布缓存,候选用户数: {len(end_user_ids)}")
initialized = 0
@@ -2694,20 +2598,7 @@ def init_interest_distribution_for_users(self, end_user_ids: List[str]) -> Dict[
}
try:
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
result["elapsed_time"] = time.time() - start_time
@@ -2720,3 +2611,54 @@ def init_interest_distribution_for_users(self, end_user_ids: List[str]) -> Dict[
"elapsed_time": time.time() - start_time,
"task_id": self.request.id,
}
@celery_app.task(
name="app.tasks.write_perceptual_memory",
bind=True,
ignore_result=True,
max_retries=0,
acks_late=False,
time_limit=3600,
soft_time_limit=3300,
)
def write_perceptual_memory(
self,
end_user_id: str,
model_api_config: dict,
file_type: str,
file_url: str,
file_message: dict
):
"""
Write perceptual memory for a user into PostgreSQL and Neo4j.
This task generates or updates the user's perceptual memory
in the backend databases. It is intended to be executed asynchronously
via Celery.
Args:
end_user_id (uuid.UUID): The unique identifier of the end user.
model_api_config (ModelInfo): API configuration for the model
used to generate perceptual memory.
file_type (str): The file type
file_url (url): The url of file
file_message (dict): The file message containing details about the file
to be processed.
Returns:
None
"""
file_url_md5 = hashlib.md5(file_url.encode("utf-8")).hexdigest()
set_asyncio_event_loop()
with RedisLock(f"perceptual:{file_url_md5}", redis_client=get_sync_redis_client()):
model_info = ModelInfo(**model_api_config)
with get_db_context() as db:
memory_perceptual_service = MemoryPerceptualService(db)
return asyncio.run(memory_perceptual_service.generate_perceptual_memory(
end_user_id,
model_info,
file_type,
file_url,
file_message,
))