Fix/optimize inerface (#183)
* [changes]Optimize the time consumption of the "/end_users" interface * [fix]Optimize the time consumption of the "/hot_memory_tags" interface * [changes]Optimize the time consumption of the "/end_users" interface * [fix]Optimize the time consumption of the "/hot_memory_tags" interface * [changes]Improve the code based on AI review
This commit is contained in:
@@ -49,63 +49,135 @@ async def get_workspace_end_users(
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""
|
||||
获取工作空间的宿主列表
|
||||
获取工作空间的宿主列表(高性能优化版本 v2)
|
||||
|
||||
返回格式与原 memory_list 接口中的 end_users 字段相同,
|
||||
并包含每个用户的记忆配置信息(memory_config_id 和 memory_config_name)
|
||||
优化策略:
|
||||
1. 批量查询 end_users(一次查询而非循环)
|
||||
2. 并发查询所有用户的记忆数量(Neo4j)
|
||||
3. RAG 模式使用批量查询(一次 SQL)
|
||||
4. 只返回必要字段减少数据传输
|
||||
5. 添加短期缓存减少重复查询
|
||||
6. 并发执行配置查询和记忆数量查询
|
||||
|
||||
返回格式:
|
||||
{
|
||||
"end_user": {"id": "uuid", "other_name": "名称"},
|
||||
"memory_num": {"total": 数量},
|
||||
"memory_config": {"memory_config_id": "id", "memory_config_name": "名称"}
|
||||
}
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
from app.aioRedis import aio_redis_get, aio_redis_set
|
||||
|
||||
workspace_id = current_user.current_workspace_id
|
||||
|
||||
# 尝试从缓存获取(30秒缓存)
|
||||
cache_key = f"end_users:workspace:{workspace_id}"
|
||||
try:
|
||||
cached_data = await aio_redis_get(cache_key)
|
||||
if cached_data:
|
||||
api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}")
|
||||
return success(data=json.loads(cached_data), msg="宿主列表获取成功")
|
||||
except Exception as e:
|
||||
api_logger.warning(f"Redis 缓存读取失败: {str(e)}")
|
||||
|
||||
# 获取当前空间类型
|
||||
current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user)
|
||||
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表")
|
||||
|
||||
# 获取 end_users(已优化为批量查询)
|
||||
end_users = memory_dashboard_service.get_workspace_end_users(
|
||||
db=db,
|
||||
workspace_id=workspace_id,
|
||||
current_user=current_user
|
||||
)
|
||||
|
||||
# 批量获取所有用户的记忆配置信息(优化:一次查询而非 N 次)
|
||||
end_user_ids = [str(user.id) for user in end_users]
|
||||
memory_configs_map = {}
|
||||
if end_user_ids:
|
||||
if not end_users:
|
||||
api_logger.info("工作空间下没有宿主")
|
||||
# 缓存空结果,避免重复查询
|
||||
try:
|
||||
memory_configs_map = get_end_users_connected_configs_batch(end_user_ids, db)
|
||||
await aio_redis_set(cache_key, json.dumps([]), expire=30)
|
||||
except Exception as e:
|
||||
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
||||
return success(data=[], msg="宿主列表获取成功")
|
||||
|
||||
end_user_ids = [str(user.id) for user in end_users]
|
||||
|
||||
# 并发执行两个独立的查询任务
|
||||
async def get_memory_configs():
|
||||
"""获取记忆配置(在线程池中执行同步查询)"""
|
||||
try:
|
||||
return await asyncio.to_thread(
|
||||
get_end_users_connected_configs_batch,
|
||||
end_user_ids, db
|
||||
)
|
||||
except Exception as e:
|
||||
api_logger.error(f"批量获取记忆配置失败: {str(e)}")
|
||||
# 失败时使用空字典,不影响其他数据返回
|
||||
return {}
|
||||
|
||||
async def get_memory_nums():
|
||||
"""获取记忆数量"""
|
||||
if current_workspace_type == "rag":
|
||||
# RAG 模式:批量查询
|
||||
try:
|
||||
chunk_map = await asyncio.to_thread(
|
||||
memory_dashboard_service.get_users_total_chunk_batch,
|
||||
end_user_ids, db, current_user
|
||||
)
|
||||
return {uid: {"total": count} for uid, count in chunk_map.items()}
|
||||
except Exception as e:
|
||||
api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}")
|
||||
return {uid: {"total": 0} for uid in end_user_ids}
|
||||
|
||||
elif current_workspace_type == "neo4j":
|
||||
# Neo4j 模式:并发查询(带并发限制)
|
||||
# 使用信号量限制并发数,避免大量用户时压垮 Neo4j
|
||||
MAX_CONCURRENT_QUERIES = 10
|
||||
semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES)
|
||||
|
||||
async def get_neo4j_memory_num(end_user_id: str):
|
||||
async with semaphore:
|
||||
try:
|
||||
return await memory_storage_service.search_all(end_user_id)
|
||||
except Exception as e:
|
||||
api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}")
|
||||
return {"total": 0}
|
||||
|
||||
memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids])
|
||||
return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))}
|
||||
|
||||
return {uid: {"total": 0} for uid in end_user_ids}
|
||||
|
||||
# 并发执行配置查询和记忆数量查询
|
||||
memory_configs_map, memory_nums_map = await asyncio.gather(
|
||||
get_memory_configs(),
|
||||
get_memory_nums()
|
||||
)
|
||||
|
||||
# 构建结果(优化:使用列表推导式)
|
||||
result = []
|
||||
for end_user in end_users:
|
||||
memory_num = {}
|
||||
if current_workspace_type == "neo4j":
|
||||
# EndUser 是 Pydantic 模型,直接访问属性而不是使用 .get()
|
||||
memory_num = await memory_storage_service.search_all(str(end_user.id))
|
||||
elif current_workspace_type == "rag":
|
||||
memory_num = {
|
||||
"total":memory_dashboard_service.get_current_user_total_chunk(str(end_user.id), db, current_user)
|
||||
}
|
||||
|
||||
# 从批量查询结果中获取配置信息
|
||||
user_id = str(end_user.id)
|
||||
memory_config_info = memory_configs_map.get(user_id, {
|
||||
"memory_config_id": None,
|
||||
"memory_config_name": None
|
||||
})
|
||||
|
||||
# 只保留需要的字段,移除 error 字段(如果有)
|
||||
memory_config = {
|
||||
"memory_config_id": memory_config_info.get("memory_config_id"),
|
||||
"memory_config_name": memory_config_info.get("memory_config_name")
|
||||
}
|
||||
|
||||
result.append(
|
||||
{
|
||||
'end_user': end_user,
|
||||
'memory_num': memory_num,
|
||||
'memory_config': memory_config
|
||||
config_info = memory_configs_map.get(user_id, {})
|
||||
result.append({
|
||||
'end_user': {
|
||||
'id': user_id,
|
||||
'other_name': end_user.other_name
|
||||
},
|
||||
'memory_num': memory_nums_map.get(user_id, {"total": 0}),
|
||||
'memory_config': {
|
||||
"memory_config_id": config_info.get("memory_config_id"),
|
||||
"memory_config_name": config_info.get("memory_config_name")
|
||||
}
|
||||
)
|
||||
|
||||
})
|
||||
|
||||
# 写入缓存(30秒过期)
|
||||
try:
|
||||
await aio_redis_set(cache_key, json.dumps(result), expire=30)
|
||||
except Exception as e:
|
||||
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
||||
|
||||
api_logger.info(f"成功获取 {len(end_users)} 个宿主记录")
|
||||
return success(data=result, msg="宿主列表获取成功")
|
||||
|
||||
|
||||
@@ -420,15 +420,95 @@ async def get_hot_memory_tags_api(
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> dict:
|
||||
api_logger.info(f"Hot memory tags requested for current_user: {current_user.id}")
|
||||
"""
|
||||
获取热门记忆标签(带Redis缓存)
|
||||
|
||||
缓存策略:
|
||||
- 缓存键:workspace_id + limit
|
||||
- 过期时间:5分钟(300秒)
|
||||
- 缓存命中:~50ms
|
||||
- 缓存未命中:~600-800ms(取决于LLM速度)
|
||||
"""
|
||||
workspace_id = current_user.current_workspace_id
|
||||
|
||||
# 构建缓存键
|
||||
cache_key = f"hot_memory_tags:{workspace_id}:{limit}"
|
||||
|
||||
api_logger.info(f"Hot memory tags requested for workspace: {workspace_id}, limit: {limit}")
|
||||
|
||||
try:
|
||||
# 尝试从Redis缓存获取
|
||||
from app.aioRedis import aio_redis_get, aio_redis_set
|
||||
import json
|
||||
|
||||
cached_result = await aio_redis_get(cache_key)
|
||||
if cached_result:
|
||||
api_logger.info(f"Cache hit for key: {cache_key}")
|
||||
try:
|
||||
data = json.loads(cached_result)
|
||||
return success(data=data, msg="查询成功(缓存)")
|
||||
except json.JSONDecodeError:
|
||||
api_logger.warning(f"Failed to parse cached data, will refresh")
|
||||
|
||||
# 缓存未命中,执行查询
|
||||
api_logger.info(f"Cache miss for key: {cache_key}, executing query")
|
||||
result = await analytics_hot_memory_tags(db, current_user, limit)
|
||||
|
||||
# 写入缓存(过期时间:5分钟)
|
||||
# 注意:result是列表,需要转换为JSON字符串
|
||||
try:
|
||||
cache_data = json.dumps(result, ensure_ascii=False)
|
||||
await aio_redis_set(cache_key, cache_data, expire=300)
|
||||
api_logger.info(f"Cached result for key: {cache_key}")
|
||||
except Exception as cache_error:
|
||||
# 缓存写入失败不影响主流程
|
||||
api_logger.warning(f"Failed to cache result: {str(cache_error)}")
|
||||
|
||||
return success(data=result, msg="查询成功")
|
||||
|
||||
except Exception as e:
|
||||
api_logger.error(f"Hot memory tags failed: {str(e)}")
|
||||
return fail(BizCode.INTERNAL_ERROR, "热门标签查询失败", str(e))
|
||||
|
||||
|
||||
@router.delete("/analytics/hot_memory_tags/cache", response_model=ApiResponse)
|
||||
async def clear_hot_memory_tags_cache(
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> dict:
|
||||
"""
|
||||
清除热门标签缓存
|
||||
|
||||
用于:
|
||||
- 手动刷新数据
|
||||
- 调试和测试
|
||||
- 数据更新后立即生效
|
||||
"""
|
||||
workspace_id = current_user.current_workspace_id
|
||||
|
||||
api_logger.info(f"Clear hot memory tags cache requested for workspace: {workspace_id}")
|
||||
|
||||
try:
|
||||
from app.aioRedis import aio_redis_delete
|
||||
|
||||
# 清除所有limit的缓存(常见的limit值)
|
||||
cleared_count = 0
|
||||
for limit in [5, 10, 15, 20, 30, 50]:
|
||||
cache_key = f"hot_memory_tags:{workspace_id}:{limit}"
|
||||
result = await aio_redis_delete(cache_key)
|
||||
if result:
|
||||
cleared_count += 1
|
||||
api_logger.info(f"Cleared cache for key: {cache_key}")
|
||||
|
||||
return success(
|
||||
data={"cleared_count": cleared_count},
|
||||
msg=f"成功清除 {cleared_count} 个缓存"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
api_logger.error(f"Clear cache failed: {str(e)}")
|
||||
return fail(BizCode.INTERNAL_ERROR, "清除缓存失败", str(e))
|
||||
|
||||
|
||||
@router.get("/analytics/recent_activity_stats", response_model=ApiResponse)
|
||||
async def get_recent_activity_stats_api(
|
||||
current_user: User = Depends(get_current_user),
|
||||
|
||||
@@ -53,18 +53,28 @@ def get_workspace_end_users(
|
||||
workspace_id: uuid.UUID,
|
||||
current_user: User
|
||||
) -> List[EndUser]:
|
||||
"""获取工作空间的所有宿主"""
|
||||
"""获取工作空间的所有宿主(优化版本:减少数据库查询次数)"""
|
||||
business_logger.info(f"获取工作空间宿主列表: workspace_id={workspace_id}, 操作者: {current_user.username}")
|
||||
|
||||
try:
|
||||
# 查询应用(ORM)并转换为 Pydantic 模型
|
||||
# 查询应用(ORM)
|
||||
apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id)
|
||||
apps = [AppSchema.model_validate(h) for h in apps_orm]
|
||||
app_ids = [app.id for app in apps]
|
||||
end_users = []
|
||||
for app_id in app_ids:
|
||||
end_user_orm_list = end_user_repository.get_end_users_by_app_id(db, app_id)
|
||||
end_users.extend([EndUserSchema.model_validate(h) for h in end_user_orm_list])
|
||||
|
||||
if not apps_orm:
|
||||
business_logger.info("工作空间下没有应用")
|
||||
return []
|
||||
|
||||
# 提取所有 app_id
|
||||
app_ids = [app.id for app in apps_orm]
|
||||
|
||||
# 批量查询所有 end_users(一次查询而非循环查询)
|
||||
from app.models.end_user_model import EndUser as EndUserModel
|
||||
end_users_orm = db.query(EndUserModel).filter(
|
||||
EndUserModel.app_id.in_(app_ids)
|
||||
).all()
|
||||
|
||||
# 转换为 Pydantic 模型(只在需要时转换)
|
||||
end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm]
|
||||
|
||||
business_logger.info(f"成功获取 {len(end_users)} 个宿主记录")
|
||||
return end_users
|
||||
@@ -414,6 +424,67 @@ def get_current_user_total_chunk(
|
||||
business_logger.error(f"获取用户总chunk数失败: end_user_id={end_user_id} - {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
def get_users_total_chunk_batch(
|
||||
end_user_ids: List[str],
|
||||
db: Session,
|
||||
current_user: User
|
||||
) -> dict:
|
||||
"""
|
||||
批量获取多个用户的总chunk数(性能优化版本)
|
||||
|
||||
Args:
|
||||
end_user_ids: 用户ID列表
|
||||
db: 数据库会话
|
||||
current_user: 当前用户
|
||||
|
||||
Returns:
|
||||
字典,key为end_user_id,value为chunk总数
|
||||
格式: {"user_id_1": 100, "user_id_2": 50, ...}
|
||||
"""
|
||||
business_logger.info(f"批量获取 {len(end_user_ids)} 个用户的总chunk数, 操作者: {current_user.username}")
|
||||
|
||||
try:
|
||||
from app.models.document_model import Document
|
||||
from sqlalchemy import func, case
|
||||
|
||||
if not end_user_ids:
|
||||
return {}
|
||||
|
||||
# 构造所有文件名
|
||||
file_names = [f"{user_id}.txt" for user_id in end_user_ids]
|
||||
|
||||
# 一次查询获取所有用户的chunk总数
|
||||
# 使用 GROUP BY file_name 来分组统计
|
||||
results = db.query(
|
||||
Document.file_name,
|
||||
func.sum(Document.chunk_num).label('total_chunk')
|
||||
).filter(
|
||||
Document.file_name.in_(file_names)
|
||||
).group_by(
|
||||
Document.file_name
|
||||
).all()
|
||||
|
||||
# 构建结果字典
|
||||
chunk_map = {}
|
||||
for file_name, total_chunk in results:
|
||||
# 从文件名中提取 end_user_id (去掉 .txt 后缀)
|
||||
user_id = file_name.replace('.txt', '')
|
||||
chunk_map[user_id] = int(total_chunk or 0)
|
||||
|
||||
# 对于没有记录的用户,设置为0
|
||||
for user_id in end_user_ids:
|
||||
if user_id not in chunk_map:
|
||||
chunk_map[user_id] = 0
|
||||
|
||||
business_logger.info(f"成功批量获取 {len(chunk_map)} 个用户的总chunk数")
|
||||
return chunk_map
|
||||
|
||||
except Exception as e:
|
||||
business_logger.error(f"批量获取用户总chunk数失败: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
def get_rag_content(
|
||||
end_user_id: str,
|
||||
limit: int,
|
||||
|
||||
@@ -12,7 +12,11 @@ from datetime import datetime
|
||||
from typing import Any, AsyncGenerator, Dict, List, Optional
|
||||
|
||||
from app.core.logging_config import get_config_logger, get_logger
|
||||
from app.core.memory.analytics.hot_memory_tags import get_hot_memory_tags
|
||||
from app.core.memory.analytics.hot_memory_tags import (
|
||||
get_hot_memory_tags,
|
||||
get_raw_tags_from_db,
|
||||
filter_tags_with_llm,
|
||||
)
|
||||
from app.core.memory.analytics.recent_activity_stats import get_recent_activity_stats
|
||||
from app.models.user_model import User
|
||||
from app.repositories.data_config_repository import DataConfigRepository
|
||||
@@ -515,27 +519,79 @@ async def analytics_hot_memory_tags(
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取热门记忆标签,按数量排序并返回前N个
|
||||
|
||||
优化策略:
|
||||
1. 先从所有用户收集原始标签(不调用LLM)
|
||||
2. 聚合并合并相同标签的频率
|
||||
3. 排序后取前N个
|
||||
4. 只调用一次LLM进行筛选
|
||||
"""
|
||||
workspace_id = current_user.current_workspace_id
|
||||
# 获取更多标签供LLM筛选(获取limit*4个标签)
|
||||
raw_limit = limit * 4
|
||||
from app.services.memory_dashboard_service import get_workspace_end_users
|
||||
end_users = get_workspace_end_users(db, workspace_id, current_user)
|
||||
# 使用 asyncio.to_thread 避免阻塞事件循环
|
||||
end_users = await asyncio.to_thread(get_workspace_end_users, db, workspace_id, current_user)
|
||||
|
||||
tags = []
|
||||
for end_user in end_users:
|
||||
tag = await get_hot_memory_tags(str(end_user.id), limit=raw_limit)
|
||||
if tag:
|
||||
# 将每个用户的标签列表展平到总列表中
|
||||
tags.extend(tag)
|
||||
|
||||
# 按频率降序排序(虽然数据库已经排序,但为了确保正确性再次排序)
|
||||
sorted_tags = sorted(tags, key=lambda x: x[1], reverse=True)
|
||||
if not end_users:
|
||||
return []
|
||||
|
||||
# 只返回前limit个
|
||||
top_tags = sorted_tags[:limit]
|
||||
|
||||
return [{"name": t, "frequency": f} for t, f in top_tags]
|
||||
# 步骤1: 收集所有用户的原始标签(不调用LLM)
|
||||
connector = Neo4jConnector()
|
||||
try:
|
||||
all_raw_tags = []
|
||||
for end_user in end_users:
|
||||
raw_tags = await get_raw_tags_from_db(
|
||||
connector,
|
||||
str(end_user.id),
|
||||
limit=raw_limit,
|
||||
by_user=False
|
||||
)
|
||||
if raw_tags:
|
||||
all_raw_tags.extend(raw_tags)
|
||||
|
||||
if not all_raw_tags:
|
||||
return []
|
||||
|
||||
# 步骤2: 聚合相同标签的频率
|
||||
tag_frequency_map = {}
|
||||
for tag_name, frequency in all_raw_tags:
|
||||
if tag_name in tag_frequency_map:
|
||||
tag_frequency_map[tag_name] += frequency
|
||||
else:
|
||||
tag_frequency_map[tag_name] = frequency
|
||||
|
||||
# 步骤3: 按频率降序排序,取前raw_limit个
|
||||
sorted_tags = sorted(
|
||||
tag_frequency_map.items(),
|
||||
key=lambda x: x[1],
|
||||
reverse=True
|
||||
)[:raw_limit]
|
||||
|
||||
if not sorted_tags:
|
||||
return []
|
||||
|
||||
# 步骤4: 只调用一次LLM进行筛选
|
||||
tag_names = [tag for tag, _ in sorted_tags]
|
||||
|
||||
# 使用第一个用户的group_id来获取LLM配置
|
||||
# 因为同一工作空间下的用户应该使用相同的配置
|
||||
first_end_user_id = str(end_users[0].id)
|
||||
filtered_tag_names = await filter_tags_with_llm(tag_names, first_end_user_id)
|
||||
|
||||
# 步骤5: 根据LLM筛选结果构建最终列表(保留频率)
|
||||
final_tags = []
|
||||
for tag, freq in sorted_tags:
|
||||
if tag in filtered_tag_names:
|
||||
final_tags.append((tag, freq))
|
||||
|
||||
# 步骤6: 只返回前limit个
|
||||
top_tags = final_tags[:limit]
|
||||
|
||||
return [{"name": t, "frequency": f} for t, f in top_tags]
|
||||
|
||||
finally:
|
||||
await connector.close()
|
||||
|
||||
|
||||
async def analytics_recent_activity_stats() -> Dict[str, Any]:
|
||||
|
||||
Reference in New Issue
Block a user