From 6eca5f6cdff1e5c1d246c4cff7e67ab63c337bac Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Wed, 1 Apr 2026 18:17:25 +0800 Subject: [PATCH 1/5] feat:(controllers & services) Changes in data from yesterday to today --- api/app/controllers/knowledge_controller.py | 1 + .../memory_dashboard_controller.py | 134 +++++++++--- api/app/services/memory_dashboard_service.py | 200 ++++++++++++++++++ api/app/services/memory_storage_service.py | 44 ++-- 4 files changed, 320 insertions(+), 59 deletions(-) diff --git a/api/app/controllers/knowledge_controller.py b/api/app/controllers/knowledge_controller.py index 74b832cd..afda7cce 100644 --- a/api/app/controllers/knowledge_controller.py +++ b/api/app/controllers/knowledge_controller.py @@ -352,6 +352,7 @@ async def delete_knowledge( # 2. Soft-delete knowledge base api_logger.debug(f"Perform a soft delete: {db_knowledge.name} (ID: {knowledge_id})") db_knowledge.status = 2 + db_knowledge.updated_at = datetime.datetime.now() db.commit() api_logger.info(f"The knowledge base has been successfully deleted: {db_knowledge.name} (ID: {knowledge_id})") return success(msg="The knowledge base has been successfully deleted") diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index bedee987..c97d708c 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -611,38 +611,66 @@ async def dashboard_data( except Exception as e: api_logger.warning(f"获取记忆总量失败: {str(e)}") - # 2. 获取知识库类型统计(total_knowledge) + # 2. 获取知识库数量(total_knowledge),排除用户知识库(permission_id='Memory') try: - from app.services.memory_agent_service import MemoryAgentService - memory_agent_service = MemoryAgentService() - knowledge_stats = await memory_agent_service.get_knowledge_type_stats( - end_user_id=end_user_id, - only_active=True, - current_workspace_id=workspace_id, - db=db - ) - neo4j_data["total_knowledge"] = knowledge_stats.get("total", 0) - api_logger.info(f"成功获取知识库类型统计total: {neo4j_data['total_knowledge']}") + from sqlalchemy import func as _func + from app.models.knowledge_model import Knowledge as _Knowledge + total_knowledge = db.query(_func.count(_Knowledge.id)).filter( + _Knowledge.workspace_id == workspace_id, + _Knowledge.status == 1, + _Knowledge.permission_id != "Memory" + ).scalar() or 0 + neo4j_data["total_knowledge"] = total_knowledge + api_logger.info(f"成功获取知识库数量: {neo4j_data['total_knowledge']}") except Exception as e: - api_logger.warning(f"获取知识库类型统计失败: {str(e)}") + api_logger.warning(f"获取知识库数量失败: {str(e)}") - # 3. 获取API调用统计(total_api_call) + # 3. 获取API调用统计(total_api_call)—— 仅统计当天api_key_log调用次数 try: - # 使用 AppStatisticsService 获取真实的API调用统计 - app_stats_service = AppStatisticsService(db) - api_stats = app_stats_service.get_workspace_api_statistics( - workspace_id=workspace_id, - start_date=start_date, - end_date=end_date - ) - # 计算总调用次数 - total_api_calls = sum(item.get("total_calls", 0) for item in api_stats) + from datetime import datetime + from sqlalchemy import func as _api_func + from app.models.api_key_model import ApiKey as _ApiKey, ApiKeyLog as _ApiKeyLog + + _now = datetime.now() + _today_start = _now.replace(hour=0, minute=0, second=0, microsecond=0) + + _api_key_ids = [ + row[0] for row in db.query(_ApiKey.id).filter( + _ApiKey.workspace_id == workspace_id + ).all() + ] + if _api_key_ids: + total_api_calls = db.query(_api_func.count(_ApiKeyLog.id)).filter( + _ApiKeyLog.api_key_id.in_(_api_key_ids), + _ApiKeyLog.created_at >= _today_start, + _ApiKeyLog.created_at < _now + ).scalar() or 0 + else: + total_api_calls = 0 neo4j_data["total_api_call"] = total_api_calls - api_logger.info(f"成功获取API调用统计: {neo4j_data['total_api_call']}") + api_logger.info(f"成功获取API调用统计(当天): {neo4j_data['total_api_call']}") except Exception as e: api_logger.error(f"获取API调用统计失败: {str(e)}") neo4j_data["total_api_call"] = 0 + # 计算昨日对比 + try: + changes = memory_dashboard_service.get_dashboard_yesterday_changes( + db=db, + workspace_id=workspace_id, + storage_type=storage_type, + today_data=neo4j_data + ) + neo4j_data.update(changes) + except Exception as e: + api_logger.warning(f"计算neo4j昨日对比失败: {str(e)}") + neo4j_data.update({ + "total_memory_change": None, + "total_app_change": None, + "total_knowledge_change": None, + "total_api_call_change": None, + }) + result["neo4j_data"] = neo4j_data api_logger.info("成功获取neo4j_data") @@ -669,22 +697,40 @@ async def dashboard_data( ) rag_data["total_app"] = total_app - # total_knowledge: 使用 total_kb(总知识库数) - total_kb = memory_dashboard_service.get_rag_total_kb(db, current_user) + # total_knowledge: 直接查knowledges表status=1的总数,排除用户知识库(permission_id='Memory') + from sqlalchemy import func as _func + from app.models.knowledge_model import Knowledge as _Knowledge + total_kb = db.query(_func.count(_Knowledge.id)).filter( + _Knowledge.workspace_id == workspace_id, + _Knowledge.status == 1, + _Knowledge.permission_id != "Memory" + ).scalar() or 0 rag_data["total_knowledge"] = total_kb - # total_api_call: 使用 AppStatisticsService 获取真实的API调用统计 + # total_api_call: 仅统计当天api_key_log调用次数 try: - app_stats_service = AppStatisticsService(db) - api_stats = app_stats_service.get_workspace_api_statistics( - workspace_id=workspace_id, - start_date=start_date, - end_date=end_date - ) - # 计算总调用次数 - total_api_calls = sum(item.get("total_calls", 0) for item in api_stats) + from datetime import datetime + from sqlalchemy import func as _api_func + from app.models.api_key_model import ApiKey as _ApiKey, ApiKeyLog as _ApiKeyLog + + _now = datetime.now() + _today_start = _now.replace(hour=0, minute=0, second=0, microsecond=0) + + _api_key_ids = [ + row[0] for row in db.query(_ApiKey.id).filter( + _ApiKey.workspace_id == workspace_id + ).all() + ] + if _api_key_ids: + total_api_calls = db.query(_api_func.count(_ApiKeyLog.id)).filter( + _ApiKeyLog.api_key_id.in_(_api_key_ids), + _ApiKeyLog.created_at >= _today_start, + _ApiKeyLog.created_at < _now + ).scalar() or 0 + else: + total_api_calls = 0 rag_data["total_api_call"] = total_api_calls - api_logger.info(f"成功获取RAG模式API调用统计: {rag_data['total_api_call']}") + api_logger.info(f"成功获取RAG模式API调用统计(当天): {rag_data['total_api_call']}") except Exception as e: api_logger.warning(f"获取RAG模式API调用统计失败,使用默认值: {str(e)}") rag_data["total_api_call"] = 0 @@ -693,6 +739,24 @@ async def dashboard_data( except Exception as e: api_logger.warning(f"获取RAG相关数据失败: {str(e)}") + # 计算昨日对比 + try: + changes = memory_dashboard_service.get_dashboard_yesterday_changes( + db=db, + workspace_id=workspace_id, + storage_type=storage_type, + today_data=rag_data + ) + rag_data.update(changes) + except Exception as e: + api_logger.warning(f"计算RAG昨日对比失败: {str(e)}") + rag_data.update({ + "total_memory_change": None, + "total_app_change": None, + "total_knowledge_change": None, + "total_api_call_change": None, + }) + result["rag_data"] = rag_data api_logger.info("成功获取rag_data") diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 791a6fe8..7e8aa807 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -519,6 +519,206 @@ def get_rag_user_kb_total_chunk( business_logger.error(f"获取用户知识库总chunk数失败: workspace_id={workspace_id} - {str(e)}") raise +def get_dashboard_yesterday_changes( + db: Session, + workspace_id: uuid.UUID, + storage_type: str, + today_data: dict +) -> dict: + """ + 计算各指标相比昨天的变化量。 + + Args: + db: 数据库会话 + workspace_id: 工作空间ID + storage_type: 存储类型 'neo4j' | 'rag' + today_data: 当前数据,包含 total_memory, total_app, total_knowledge, total_api_call + + Returns: + { + "total_memory_change": int | None, + "total_app_change": int | None, + "total_knowledge_change": int | None, + "total_api_call_change": int | None + } + """ + from datetime import datetime, timedelta + from sqlalchemy import func + from app.models.api_key_model import ApiKey, ApiKeyLog + from app.models.knowledge_model import Knowledge + from app.models.app_model import App + from app.models.appshare_model import AppShare + + business_logger.info(f"计算昨日对比: workspace_id={workspace_id}, storage_type={storage_type}") + + now_local = datetime.now() + today_start = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + yesterday_start = today_start - timedelta(days=1) + + changes = { + "total_memory_change": None, + "total_app_change": None, + "total_knowledge_change": None, + "total_api_call_change": None, + } + + # --- total_api_call_change --- + try: + # 获取该workspace下所有api_key的id + api_key_ids = [ + row[0] for row in db.query(ApiKey.id).filter( + ApiKey.workspace_id == workspace_id + ).all() + ] + if api_key_ids: + # 今日累计 + today_api_count = db.query(func.count(ApiKeyLog.id)).filter( + ApiKeyLog.api_key_id.in_(api_key_ids), + ApiKeyLog.created_at >= today_start, + ApiKeyLog.created_at < now_local + ).scalar() or 0 + # 昨日全天 + yesterday_api_count = db.query(func.count(ApiKeyLog.id)).filter( + ApiKeyLog.api_key_id.in_(api_key_ids), + ApiKeyLog.created_at >= yesterday_start, + ApiKeyLog.created_at < today_start + ).scalar() or 0 + changes["total_api_call_change"] = today_api_count - yesterday_api_count + else: + # 没有api_key,如果今日也是0则无对比意义 + changes["total_api_call_change"] = None + except Exception as e: + business_logger.warning(f"计算API调用昨日对比失败: {str(e)}") + + # --- total_knowledge_change --- + try: + # 今天有效总量:当前status=1的知识库总数,排除用户知识库(permission_id='Memory') + today_knowledge = db.query(func.count(Knowledge.id)).filter( + Knowledge.workspace_id == workspace_id, + Knowledge.status == 1, + Knowledge.permission_id != "Memory" + ).scalar() or 0 + # 昨日有效总量:昨天之前创建的、当前仍有效的知识库,排除用户知识库 + yesterday_knowledge = db.query(func.count(Knowledge.id)).filter( + Knowledge.workspace_id == workspace_id, + Knowledge.status == 1, + Knowledge.permission_id != "Memory", + Knowledge.created_at < today_start + ).scalar() or 0 + # 今日软删:今天被软删的知识库(status=2 且 updated_at >= today_start),排除用户知识库 + today_deleted_knowledge = db.query(func.count(Knowledge.id)).filter( + Knowledge.workspace_id == workspace_id, + Knowledge.status == 2, + Knowledge.permission_id != "Memory", + Knowledge.updated_at >= today_start + ).scalar() or 0 + + if yesterday_knowledge == 0 and today_knowledge == 0 and today_deleted_knowledge == 0: + changes["total_knowledge_change"] = None + else: + # change = 今天有效总量 - 今日软删 - 昨日有效总量 + changes["total_knowledge_change"] = today_knowledge - today_deleted_knowledge - yesterday_knowledge + except Exception as e: + business_logger.warning(f"计算知识库昨日对比失败: {str(e)}") + + # --- total_app_change --- + try: + # === 自有app === + # 今天有效总量 + today_own_apps = db.query(func.count(App.id)).filter( + App.workspace_id == workspace_id, + App.is_active == True + ).scalar() or 0 + # 昨日有效总量 + yesterday_own_apps = db.query(func.count(App.id)).filter( + App.workspace_id == workspace_id, + App.is_active == True, + App.created_at < today_start + ).scalar() or 0 + # 今日软删 + today_deleted_own_apps = db.query(func.count(App.id)).filter( + App.workspace_id == workspace_id, + App.is_active == False, + App.updated_at >= today_start + ).scalar() or 0 + + # === 被分享app === + # 今天有效总量 + today_shared_apps = db.query(func.count(AppShare.id)).filter( + AppShare.target_workspace_id == workspace_id, + AppShare.is_active == True + ).scalar() or 0 + # 昨日有效总量 + yesterday_shared_apps = db.query(func.count(AppShare.id)).filter( + AppShare.target_workspace_id == workspace_id, + AppShare.is_active == True, + AppShare.created_at < today_start + ).scalar() or 0 + # 今日软删 + today_deleted_shared_apps = db.query(func.count(AppShare.id)).filter( + AppShare.target_workspace_id == workspace_id, + AppShare.is_active == False, + AppShare.updated_at >= today_start + ).scalar() or 0 + + today_total_app = today_own_apps + today_shared_apps + yesterday_total_app = yesterday_own_apps + yesterday_shared_apps + total_deleted = today_deleted_own_apps + today_deleted_shared_apps + + if yesterday_total_app == 0 and today_total_app == 0 and total_deleted == 0: + changes["total_app_change"] = None + else: + # change = 今天有效总量 - 今日软删 - 昨日有效总量 + changes["total_app_change"] = today_total_app - total_deleted - yesterday_total_app + except Exception as e: + business_logger.warning(f"计算应用数量昨日对比失败: {str(e)}") + + # --- total_memory_change --- + try: + today_memory = today_data.get("total_memory") + if today_memory is None: + changes["total_memory_change"] = None + elif storage_type == "neo4j": + # 从 memory_increments 取最近一条 created_at < today_start 的记录 + last_record = db.query(MemoryIncrement).filter( + MemoryIncrement.workspace_id == workspace_id, + MemoryIncrement.created_at < today_start + ).order_by(desc(MemoryIncrement.created_at)).first() + if last_record is None: + changes["total_memory_change"] = None + else: + changes["total_memory_change"] = today_memory - last_record.total_num + elif storage_type == "rag": + # RAG: 查 documents 表中 created_at < today_start 的 chunk_num 之和 + from app.models.document_model import Document + from app.models.end_user_model import EndUser as _EndUser + from app.models.app_model import App as _App + + end_user_ids = [ + str(eid) for (eid,) in db.query(_EndUser.id) + .join(_App, _EndUser.app_id == _App.id) + .filter(_App.workspace_id == workspace_id) + .all() + ] + if not end_user_ids: + changes["total_memory_change"] = None + else: + file_names = [f"{uid}.txt" for uid in end_user_ids] + yesterday_chunk = db.query(func.sum(Document.chunk_num)).filter( + Document.file_name.in_(file_names), + Document.created_at < today_start + ).scalar() + if yesterday_chunk is None: + changes["total_memory_change"] = None + else: + changes["total_memory_change"] = today_memory - int(yesterday_chunk) + except Exception as e: + business_logger.warning(f"计算记忆总量昨日对比失败: {str(e)}") + + business_logger.info(f"昨日对比计算完成: {changes}") + return changes + + def get_current_user_total_chunk( end_user_id: str, db: Session, diff --git a/api/app/services/memory_storage_service.py b/api/app/services/memory_storage_service.py index b3a66734..fe0f3c32 100644 --- a/api/app/services/memory_storage_service.py +++ b/api/app/services/memory_storage_service.py @@ -614,34 +614,30 @@ async def search_entity(end_user_id: Optional[str] = None) -> Dict[str, Any]: async def search_all(end_user_id: Optional[str] = None) -> Dict[str, Any]: + """查询用户的记忆总量(简化版本,只返回total) + + Args: + end_user_id: 用户ID + + Returns: + Dict[str, Any]: {"total": int} + """ + if not end_user_id: + return {"total": 0} + result = await _neo4j_connector.execute_query( - MemoryConfigRepository.SEARCH_FOR_ALL, - end_user_id=end_user_id, + MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, + end_user_ids=[end_user_id], ) - # 检查结果是否为空或长度不足 - if not result or len(result) < 4: - data = { - "total": 0, - "counts": { - "dialogue": 0, - "chunk": 0, - "statement": 0, - "entity": 0, - }, - } - return data + # 从批量查询结果中提取该用户的total + total = 0 + for row in result: + if row["user_id"] == end_user_id: + total = row["total"] + break - data = { - "total": result[-1]["Count"], - "counts": { - "dialogue": result[0]["Count"], - "chunk": result[1]["Count"], - "statement": result[2]["Count"], - "entity": result[3]["Count"], - }, - } - return data + return {"total": total} async def kb_type_distribution(end_user_id: Optional[str] = None) -> Dict[str, Any]: From bd48b4fdbef51cc199a853ebda44a66d257e98ed Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 2 Apr 2026 12:26:20 +0800 Subject: [PATCH 2/5] changes:(controllers) Modify the statistical method of the knowledge base --- api/app/controllers/memory_dashboard_controller.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index c97d708c..32ab8f60 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -611,14 +611,16 @@ async def dashboard_data( except Exception as e: api_logger.warning(f"获取记忆总量失败: {str(e)}") - # 2. 获取知识库数量(total_knowledge),排除用户知识库(permission_id='Memory') + # 2. 获取知识库数量(total_knowledge) + # 逻辑:统计 knowledges 表中 workspace_id = 当前工作空间 且 parent_id = workspace_id 的记录数 + # 即只统计顶层知识库(parent_id 指向所属工作空间) try: from sqlalchemy import func as _func from app.models.knowledge_model import Knowledge as _Knowledge total_knowledge = db.query(_func.count(_Knowledge.id)).filter( _Knowledge.workspace_id == workspace_id, _Knowledge.status == 1, - _Knowledge.permission_id != "Memory" + _Knowledge.parent_id == _Knowledge.workspace_id ).scalar() or 0 neo4j_data["total_knowledge"] = total_knowledge api_logger.info(f"成功获取知识库数量: {neo4j_data['total_knowledge']}") From 8abd59b26e82214e61ad31c5dc3984a805c5cb76 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 2 Apr 2026 13:02:21 +0800 Subject: [PATCH 3/5] changes:(controllers & services) The method for calculating general data is extracted and presented as a shared function. --- .../memory_dashboard_controller.py | 117 +++--------------- api/app/services/memory_dashboard_service.py | 64 +++++++++- 2 files changed, 77 insertions(+), 104 deletions(-) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 32ab8f60..525fe1eb 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -591,7 +591,7 @@ async def dashboard_data( "total_api_call": None } - # 1. 获取记忆总量(total_memory) + # 1. 获取记忆总量(total_memory)—— neo4j 独有逻辑:查询 neo4j 存储节点 try: total_memory_data = await memory_dashboard_service.get_workspace_total_memory_count( db=db, @@ -600,60 +600,14 @@ async def dashboard_data( end_user_id=end_user_id ) neo4j_data["total_memory"] = total_memory_data.get("total_memory_count", 0) - # total_app: 统计当前空间下的所有app数量 - # 包含自有app + 被分享给本工作空间的app - from app.services import app_service as _app_svc - _, total_app = _app_svc.AppService(db).list_apps( - workspace_id=workspace_id, include_shared=True, pagesize=1 - ) - neo4j_data["total_app"] = total_app - api_logger.info(f"成功获取记忆总量: {neo4j_data['total_memory']}, 应用数量: {neo4j_data['total_app']}") + api_logger.info(f"成功获取记忆总量: {neo4j_data['total_memory']}") except Exception as e: api_logger.warning(f"获取记忆总量失败: {str(e)}") - # 2. 获取知识库数量(total_knowledge) - # 逻辑:统计 knowledges 表中 workspace_id = 当前工作空间 且 parent_id = workspace_id 的记录数 - # 即只统计顶层知识库(parent_id 指向所属工作空间) - try: - from sqlalchemy import func as _func - from app.models.knowledge_model import Knowledge as _Knowledge - total_knowledge = db.query(_func.count(_Knowledge.id)).filter( - _Knowledge.workspace_id == workspace_id, - _Knowledge.status == 1, - _Knowledge.parent_id == _Knowledge.workspace_id - ).scalar() or 0 - neo4j_data["total_knowledge"] = total_knowledge - api_logger.info(f"成功获取知识库数量: {neo4j_data['total_knowledge']}") - except Exception as e: - api_logger.warning(f"获取知识库数量失败: {str(e)}") - - # 3. 获取API调用统计(total_api_call)—— 仅统计当天api_key_log调用次数 - try: - from datetime import datetime - from sqlalchemy import func as _api_func - from app.models.api_key_model import ApiKey as _ApiKey, ApiKeyLog as _ApiKeyLog - - _now = datetime.now() - _today_start = _now.replace(hour=0, minute=0, second=0, microsecond=0) - - _api_key_ids = [ - row[0] for row in db.query(_ApiKey.id).filter( - _ApiKey.workspace_id == workspace_id - ).all() - ] - if _api_key_ids: - total_api_calls = db.query(_api_func.count(_ApiKeyLog.id)).filter( - _ApiKeyLog.api_key_id.in_(_api_key_ids), - _ApiKeyLog.created_at >= _today_start, - _ApiKeyLog.created_at < _now - ).scalar() or 0 - else: - total_api_calls = 0 - neo4j_data["total_api_call"] = total_api_calls - api_logger.info(f"成功获取API调用统计(当天): {neo4j_data['total_api_call']}") - except Exception as e: - api_logger.error(f"获取API调用统计失败: {str(e)}") - neo4j_data["total_api_call"] = 0 + # 2. 获取共享统计数据(total_app、total_knowledge、total_api_call) + common_stats = memory_dashboard_service.get_dashboard_common_stats(db, workspace_id) + neo4j_data.update(common_stats) + api_logger.info(f"成功获取共享统计: app={common_stats['total_app']}, knowledge={common_stats['total_knowledge']}, api_call={common_stats['total_api_call']}") # 计算昨日对比 try: @@ -685,61 +639,18 @@ async def dashboard_data( "total_api_call": None } - # 获取RAG相关数据 + # 1. 获取记忆总量(total_memory)—— rag 独有逻辑:查询 document 表的 chunk_num try: - # total_memory: 只统计用户知识库(permission_id='Memory')的chunk数 total_chunk = memory_dashboard_service.get_rag_user_kb_total_chunk(db, current_user) rag_data["total_memory"] = total_chunk - - # total_app: 统计当前空间下的所有app数量 - # 包含自有app + 被分享给本工作空间的app - from app.services import app_service as _app_svc - _, total_app = _app_svc.AppService(db).list_apps( - workspace_id=workspace_id, include_shared=True, pagesize=1 - ) - rag_data["total_app"] = total_app - - # total_knowledge: 直接查knowledges表status=1的总数,排除用户知识库(permission_id='Memory') - from sqlalchemy import func as _func - from app.models.knowledge_model import Knowledge as _Knowledge - total_kb = db.query(_func.count(_Knowledge.id)).filter( - _Knowledge.workspace_id == workspace_id, - _Knowledge.status == 1, - _Knowledge.permission_id != "Memory" - ).scalar() or 0 - rag_data["total_knowledge"] = total_kb - - # total_api_call: 仅统计当天api_key_log调用次数 - try: - from datetime import datetime - from sqlalchemy import func as _api_func - from app.models.api_key_model import ApiKey as _ApiKey, ApiKeyLog as _ApiKeyLog - - _now = datetime.now() - _today_start = _now.replace(hour=0, minute=0, second=0, microsecond=0) - - _api_key_ids = [ - row[0] for row in db.query(_ApiKey.id).filter( - _ApiKey.workspace_id == workspace_id - ).all() - ] - if _api_key_ids: - total_api_calls = db.query(_api_func.count(_ApiKeyLog.id)).filter( - _ApiKeyLog.api_key_id.in_(_api_key_ids), - _ApiKeyLog.created_at >= _today_start, - _ApiKeyLog.created_at < _now - ).scalar() or 0 - else: - total_api_calls = 0 - rag_data["total_api_call"] = total_api_calls - api_logger.info(f"成功获取RAG模式API调用统计(当天): {rag_data['total_api_call']}") - except Exception as e: - api_logger.warning(f"获取RAG模式API调用统计失败,使用默认值: {str(e)}") - rag_data["total_api_call"] = 0 - - api_logger.info(f"成功获取RAG相关数据: memory={total_chunk}, app={total_app}, knowledge={total_kb}, api_calls={rag_data['total_api_call']}") + api_logger.info(f"成功获取RAG记忆总量: {total_chunk}") except Exception as e: - api_logger.warning(f"获取RAG相关数据失败: {str(e)}") + api_logger.warning(f"获取RAG记忆总量失败: {str(e)}") + + # 2. 获取共享统计数据(total_app、total_knowledge、total_api_call) + common_stats = memory_dashboard_service.get_dashboard_common_stats(db, workspace_id) + rag_data.update(common_stats) + api_logger.info(f"成功获取共享统计: app={common_stats['total_app']}, knowledge={common_stats['total_knowledge']}, api_call={common_stats['total_api_call']}") # 计算昨日对比 try: diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 7e8aa807..b14a06af 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -1081,4 +1081,66 @@ async def generate_rag_profile( "tags_count": len(tags), "personas_count": len(personas), "insight_generated": bool(insight_sections.get("memory_insight")), - } \ No newline at end of file + } + + +def get_dashboard_common_stats(db: Session, workspace_id) -> dict: + """ + 获取 dashboard 中 neo4j/rag 分支共享的统计数据: + total_app、total_knowledge、total_api_call + + Returns: + dict: {"total_app": int, "total_knowledge": int, "total_api_call": int} + """ + result = {"total_app": 0, "total_knowledge": 0, "total_api_call": 0} + + # total_app: 统计当前空间下的所有app数量(包含自有 + 被分享给本工作空间的app) + try: + from app.services import app_service as _app_svc + _, total_app = _app_svc.AppService(db).list_apps( + workspace_id=workspace_id, include_shared=True, pagesize=1 + ) + result["total_app"] = total_app + except Exception as e: + business_logger.warning(f"获取应用数量失败: {e}") + + # total_knowledge: 统计顶层知识库(parent_id = workspace_id) + try: + from sqlalchemy import func as _func + from app.models.knowledge_model import Knowledge as _Knowledge + total_knowledge = db.query(_func.count(_Knowledge.id)).filter( + _Knowledge.workspace_id == workspace_id, + _Knowledge.status == 1, + _Knowledge.parent_id == _Knowledge.workspace_id + ).scalar() or 0 + result["total_knowledge"] = total_knowledge + except Exception as e: + business_logger.warning(f"获取知识库数量失败: {e}") + + # total_api_call: 仅统计当天 api_key_log 调用次数 + try: + from datetime import datetime + from sqlalchemy import func as _api_func + from app.models.api_key_model import ApiKey as _ApiKey, ApiKeyLog as _ApiKeyLog + + _now = datetime.now() + _today_start = _now.replace(hour=0, minute=0, second=0, microsecond=0) + + _api_key_ids = [ + row[0] for row in db.query(_ApiKey.id).filter( + _ApiKey.workspace_id == workspace_id + ).all() + ] + if _api_key_ids: + total_api_calls = db.query(_api_func.count(_ApiKeyLog.id)).filter( + _ApiKeyLog.api_key_id.in_(_api_key_ids), + _ApiKeyLog.created_at >= _today_start, + _ApiKeyLog.created_at < _now + ).scalar() or 0 + else: + total_api_calls = 0 + result["total_api_call"] = total_api_calls + except Exception as e: + business_logger.warning(f"获取API调用统计失败: {e}") + + return result From 960ee9f2dfa817aa337b4a2c86aed40b6e4ececb Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 2 Apr 2026 14:07:51 +0800 Subject: [PATCH 4/5] changes:(services) Modify the query method for user memory to batch processing. --- api/app/services/memory_dashboard_service.py | 55 ++++++++------------ api/app/utils/performance_timer.py | 28 +++++++++- 2 files changed, 47 insertions(+), 36 deletions(-) diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index b14a06af..b96f4bde 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -353,15 +353,13 @@ async def get_workspace_total_memory_count( "details": [] } - # 2. 对每个 host_id 调用 search_all 获取 total + # 2. 使用 search_all_batch 批量查询所有宿主的记忆数量 from app.services import memory_storage_service - total_count = 0 - details = [] - # 如果提供了 end_user_id,只查询该用户 if end_user_id: - search_result = await memory_storage_service.search_all(end_user_id=end_user_id) + batch_result = await memory_storage_service.search_all_batch([end_user_id]) + count = batch_result.get(end_user_id, 0) # 查询用户名称 from app.repositories.end_user_repository import EndUserRepository repo = EndUserRepository(db) @@ -369,42 +367,31 @@ async def get_workspace_total_memory_count( user_name = end_user.other_name if end_user else None return { - "total_memory_count": search_result.get("total", 0), + "total_memory_count": count, "host_count": 1, "details": [{ "end_user_id": end_user_id, - "count": search_result.get("total", 0), + "count": count, "name": user_name }] } - for host in hosts: - try: - end_user_id_str = str(host.id) - - search_result = await memory_storage_service.search_all( - end_user_id=end_user_id_str - ) - - host_total = search_result.get("total", 0) - total_count += host_total - - details.append({ - "end_user_id": end_user_id_str, - "count": host_total, - "name": host.other_name # 使用 other_name 字段 - }) - - business_logger.debug(f"EndUser {end_user_id_str} ({host.other_name}) 记忆数: {host_total}") - - except Exception as e: - business_logger.warning(f"获取 end_user {host.id} 记忆数失败: {str(e)}") - # 失败的 host 记为 0 - details.append({ - "end_user_id": str(host.id), - "count": 0, - "name": host.other_name # 使用 other_name 字段 - }) + # 批量查询所有宿主记忆数量(一次 Neo4j 查询) + end_user_ids = [str(host.id) for host in hosts] + batch_result = await memory_storage_service.search_all_batch(end_user_ids) + + # 构建 host name 映射 + host_name_map = {str(host.id): host.other_name for host in hosts} + + total_count = sum(batch_result.values()) + details = [ + { + "end_user_id": uid, + "count": batch_result.get(uid, 0), + "name": host_name_map.get(uid) + } + for uid in end_user_ids + ] result = { "total_memory_count": total_count, diff --git a/api/app/utils/performance_timer.py b/api/app/utils/performance_timer.py index 6b0ec5d6..04e52fb1 100644 --- a/api/app/utils/performance_timer.py +++ b/api/app/utils/performance_timer.py @@ -6,13 +6,13 @@ """ import time -from contextlib import contextmanager +from contextlib import asynccontextmanager, contextmanager from app.core.logging_config import get_api_logger # 获取API专用日志器 api_logger = get_api_logger() - +# 同步的上下文管理器,使用@contextmanager修饰 @contextmanager def timer(label: str, user_count: int = 0): """上下文管理器:用于测量代码块执行时间 @@ -35,3 +35,27 @@ def timer(label: str, user_count: int = 0): elapsed = (time.perf_counter() - start) * 1000 # 转换为毫秒 extra_info = f", 用户数: {user_count}" if user_count > 0 else "" api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}") + +# 异步的上下文管理器,使用@asynccontextmanager装饰 +@asynccontextmanager +async def async_timer(label: str, user_count: int = 0): + """异步上下文管理器:用于测量包含 await 的异步代码块执行时间 + + Args: + label: 统计标签,用于标识被测量的代码块 + user_count: 用户数,可选参数,用于记录处理的用户数量 + + Usage: + async with async_timer("获取用户列表"): + users = await get_users() + + async with async_timer("批量处理", user_count=len(user_ids)): + await process_users(user_ids) + """ + start = time.perf_counter() + try: + yield + finally: + elapsed = (time.perf_counter() - start) * 1000 # 转换为毫秒 + extra_info = f", 用户数: {user_count}" if user_count > 0 else "" + api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}") From abbd92b74c999300060a1e6db70c4015f393beda Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 2 Apr 2026 14:19:27 +0800 Subject: [PATCH 5/5] Interface performance optimization, using only one function --- .../controllers/memory_storage_controller.py | 7 ++- api/app/services/memory_storage_service.py | 27 -------- api/app/tasks.py | 61 ++++++------------- 3 files changed, 23 insertions(+), 72 deletions(-) diff --git a/api/app/controllers/memory_storage_controller.py b/api/app/controllers/memory_storage_controller.py index d8b39325..76eed50f 100644 --- a/api/app/controllers/memory_storage_controller.py +++ b/api/app/controllers/memory_storage_controller.py @@ -26,7 +26,7 @@ from app.services.memory_storage_service import ( analytics_hot_memory_tags, analytics_recent_activity_stats, kb_type_distribution, - search_all, + search_all_batch, search_chunk, search_detials, search_dialogue, @@ -409,7 +409,10 @@ async def search_all_num( ) -> dict: api_logger.info(f"Search all requested for end_user_id: {end_user_id}") try: - result = await search_all(end_user_id) + if not end_user_id: + return success(data={"total": 0}, msg="查询成功") + batch_result = await search_all_batch([end_user_id]) + result = {"total": batch_result.get(end_user_id, 0)} return success(data=result, msg="查询成功") except Exception as e: api_logger.error(f"Search all failed: {str(e)}") diff --git a/api/app/services/memory_storage_service.py b/api/app/services/memory_storage_service.py index fe0f3c32..132370b6 100644 --- a/api/app/services/memory_storage_service.py +++ b/api/app/services/memory_storage_service.py @@ -613,33 +613,6 @@ async def search_entity(end_user_id: Optional[str] = None) -> Dict[str, Any]: return data -async def search_all(end_user_id: Optional[str] = None) -> Dict[str, Any]: - """查询用户的记忆总量(简化版本,只返回total) - - Args: - end_user_id: 用户ID - - Returns: - Dict[str, Any]: {"total": int} - """ - if not end_user_id: - return {"total": 0} - - result = await _neo4j_connector.execute_query( - MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, - end_user_ids=[end_user_id], - ) - - # 从批量查询结果中提取该用户的total - total = 0 - for row in result: - if row["user_id"] == end_user_id: - total = row["total"] - break - - return {"total": total} - - async def kb_type_distribution(end_user_id: Optional[str] = None) -> Dict[str, Any]: """统一知识库类型分布接口。 diff --git a/api/app/tasks.py b/api/app/tasks.py index c15aaeeb..5d29488a 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1324,7 +1324,7 @@ def write_total_memory_task(workspace_id: str) -> Dict[str, Any]: from app.models.app_model import App from app.models.end_user_model import EndUser from app.repositories.memory_increment_repository import write_memory_increment - from app.services.memory_storage_service import search_all + from app.services.memory_storage_service import search_all_batch with get_db_context() as db: try: @@ -1358,27 +1358,15 @@ def write_total_memory_task(workspace_id: str) -> Dict[str, Any]: EndUser.workspace_id == workspace_id ).distinct().all() - # 3. 遍历所有end_user,查询每个宿主的记忆总量并累加 - total_num = 0 - end_user_details = [] + # 3. 批量查询所有宿主的记忆总量 + end_user_id_list = [str(eid) for (eid,) in end_users] + batch_result = await search_all_batch(end_user_id_list) - for (end_user_id,) in end_users: - try: - # 调用 search_all 接口查询该宿主的总量 - result = await search_all(str(end_user_id)) - user_total = result.get("total", 0) - total_num += user_total - end_user_details.append({ - "end_user_id": str(end_user_id), - "total": user_total - }) - except Exception as e: - # 记录单个用户查询失败,但继续处理其他用户 - end_user_details.append({ - "end_user_id": str(end_user_id), - "total": 0, - "error": str(e) - }) + total_num = sum(batch_result.values()) + end_user_details = [ + {"end_user_id": uid, "total": batch_result.get(uid, 0)} + for uid in end_user_id_list + ] # 4. 写入数据库 memory_increment = write_memory_increment( @@ -1441,7 +1429,7 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]: from app.models.end_user_model import EndUser from app.models.workspace_model import Workspace from app.repositories.memory_increment_repository import write_memory_increment - from app.services.memory_storage_service import search_all + from app.services.memory_storage_service import search_all_batch with get_db_context() as db: try: @@ -1499,28 +1487,15 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]: EndUser.workspace_id == workspace_id ).distinct().all() - # 3. 遍历所有end_user,查询每个宿主的记忆总量并累加 - total_num = 0 - end_user_details = [] + # 3. 批量查询所有宿主的记忆总量 + end_user_id_list = [str(eid) for (eid,) in end_users] + batch_result = await search_all_batch(end_user_id_list) - for (end_user_id,) in end_users: - try: - # 调用 search_all 接口查询该宿主的总量 - result = await search_all(str(end_user_id)) - user_total = result.get("total", 0) - total_num += user_total - end_user_details.append({ - "end_user_id": str(end_user_id), - "total": user_total - }) - except Exception as e: - # 记录单个用户查询失败,但继续处理其他用户 - logger.warning(f"查询用户 {end_user_id} 记忆失败: {str(e)}") - end_user_details.append({ - "end_user_id": str(end_user_id), - "total": 0, - "error": str(e) - }) + total_num = sum(batch_result.values()) + end_user_details = [ + {"end_user_id": uid, "total": batch_result.get(uid, 0)} + for uid in end_user_id_list + ] # 4. 写入数据库 memory_increment = write_memory_increment(