From 6eca5f6cdff1e5c1d246c4cff7e67ab63c337bac Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Wed, 1 Apr 2026 18:17:25 +0800 Subject: [PATCH] feat:(controllers & services) Changes in data from yesterday to today --- api/app/controllers/knowledge_controller.py | 1 + .../memory_dashboard_controller.py | 134 +++++++++--- api/app/services/memory_dashboard_service.py | 200 ++++++++++++++++++ api/app/services/memory_storage_service.py | 44 ++-- 4 files changed, 320 insertions(+), 59 deletions(-) diff --git a/api/app/controllers/knowledge_controller.py b/api/app/controllers/knowledge_controller.py index 74b832cd..afda7cce 100644 --- a/api/app/controllers/knowledge_controller.py +++ b/api/app/controllers/knowledge_controller.py @@ -352,6 +352,7 @@ async def delete_knowledge( # 2. Soft-delete knowledge base api_logger.debug(f"Perform a soft delete: {db_knowledge.name} (ID: {knowledge_id})") db_knowledge.status = 2 + db_knowledge.updated_at = datetime.datetime.now() db.commit() api_logger.info(f"The knowledge base has been successfully deleted: {db_knowledge.name} (ID: {knowledge_id})") return success(msg="The knowledge base has been successfully deleted") diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index bedee987..c97d708c 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -611,38 +611,66 @@ async def dashboard_data( except Exception as e: api_logger.warning(f"获取记忆总量失败: {str(e)}") - # 2. 获取知识库类型统计(total_knowledge) + # 2. 获取知识库数量(total_knowledge),排除用户知识库(permission_id='Memory') try: - from app.services.memory_agent_service import MemoryAgentService - memory_agent_service = MemoryAgentService() - knowledge_stats = await memory_agent_service.get_knowledge_type_stats( - end_user_id=end_user_id, - only_active=True, - current_workspace_id=workspace_id, - db=db - ) - neo4j_data["total_knowledge"] = knowledge_stats.get("total", 0) - api_logger.info(f"成功获取知识库类型统计total: {neo4j_data['total_knowledge']}") + from sqlalchemy import func as _func + from app.models.knowledge_model import Knowledge as _Knowledge + total_knowledge = db.query(_func.count(_Knowledge.id)).filter( + _Knowledge.workspace_id == workspace_id, + _Knowledge.status == 1, + _Knowledge.permission_id != "Memory" + ).scalar() or 0 + neo4j_data["total_knowledge"] = total_knowledge + api_logger.info(f"成功获取知识库数量: {neo4j_data['total_knowledge']}") except Exception as e: - api_logger.warning(f"获取知识库类型统计失败: {str(e)}") + api_logger.warning(f"获取知识库数量失败: {str(e)}") - # 3. 获取API调用统计(total_api_call) + # 3. 获取API调用统计(total_api_call)—— 仅统计当天api_key_log调用次数 try: - # 使用 AppStatisticsService 获取真实的API调用统计 - app_stats_service = AppStatisticsService(db) - api_stats = app_stats_service.get_workspace_api_statistics( - workspace_id=workspace_id, - start_date=start_date, - end_date=end_date - ) - # 计算总调用次数 - total_api_calls = sum(item.get("total_calls", 0) for item in api_stats) + from datetime import datetime + from sqlalchemy import func as _api_func + from app.models.api_key_model import ApiKey as _ApiKey, ApiKeyLog as _ApiKeyLog + + _now = datetime.now() + _today_start = _now.replace(hour=0, minute=0, second=0, microsecond=0) + + _api_key_ids = [ + row[0] for row in db.query(_ApiKey.id).filter( + _ApiKey.workspace_id == workspace_id + ).all() + ] + if _api_key_ids: + total_api_calls = db.query(_api_func.count(_ApiKeyLog.id)).filter( + _ApiKeyLog.api_key_id.in_(_api_key_ids), + _ApiKeyLog.created_at >= _today_start, + _ApiKeyLog.created_at < _now + ).scalar() or 0 + else: + total_api_calls = 0 neo4j_data["total_api_call"] = total_api_calls - api_logger.info(f"成功获取API调用统计: {neo4j_data['total_api_call']}") + api_logger.info(f"成功获取API调用统计(当天): {neo4j_data['total_api_call']}") except Exception as e: api_logger.error(f"获取API调用统计失败: {str(e)}") neo4j_data["total_api_call"] = 0 + # 计算昨日对比 + try: + changes = memory_dashboard_service.get_dashboard_yesterday_changes( + db=db, + workspace_id=workspace_id, + storage_type=storage_type, + today_data=neo4j_data + ) + neo4j_data.update(changes) + except Exception as e: + api_logger.warning(f"计算neo4j昨日对比失败: {str(e)}") + neo4j_data.update({ + "total_memory_change": None, + "total_app_change": None, + "total_knowledge_change": None, + "total_api_call_change": None, + }) + result["neo4j_data"] = neo4j_data api_logger.info("成功获取neo4j_data") @@ -669,22 +697,40 @@ async def dashboard_data( ) rag_data["total_app"] = total_app - # total_knowledge: 使用 total_kb(总知识库数) - total_kb = memory_dashboard_service.get_rag_total_kb(db, current_user) + # total_knowledge: 直接查knowledges表status=1的总数,排除用户知识库(permission_id='Memory') + from sqlalchemy import func as _func + from app.models.knowledge_model import Knowledge as _Knowledge + total_kb = db.query(_func.count(_Knowledge.id)).filter( + _Knowledge.workspace_id == workspace_id, + _Knowledge.status == 1, + _Knowledge.permission_id != "Memory" + ).scalar() or 0 rag_data["total_knowledge"] = total_kb - # total_api_call: 使用 AppStatisticsService 获取真实的API调用统计 + # total_api_call: 仅统计当天api_key_log调用次数 try: - app_stats_service = AppStatisticsService(db) - api_stats = app_stats_service.get_workspace_api_statistics( - workspace_id=workspace_id, - start_date=start_date, - end_date=end_date - ) - # 计算总调用次数 - total_api_calls = sum(item.get("total_calls", 0) for item in api_stats) + from datetime import datetime + from sqlalchemy import func as _api_func + from app.models.api_key_model import ApiKey as _ApiKey, ApiKeyLog as _ApiKeyLog + + _now = datetime.now() + _today_start = _now.replace(hour=0, minute=0, second=0, microsecond=0) + + _api_key_ids = [ + row[0] for row in db.query(_ApiKey.id).filter( + _ApiKey.workspace_id == workspace_id + ).all() + ] + if _api_key_ids: + total_api_calls = db.query(_api_func.count(_ApiKeyLog.id)).filter( + _ApiKeyLog.api_key_id.in_(_api_key_ids), + _ApiKeyLog.created_at >= _today_start, + _ApiKeyLog.created_at < _now + ).scalar() or 0 + else: + total_api_calls = 0 rag_data["total_api_call"] = total_api_calls - api_logger.info(f"成功获取RAG模式API调用统计: {rag_data['total_api_call']}") + api_logger.info(f"成功获取RAG模式API调用统计(当天): {rag_data['total_api_call']}") except Exception as e: api_logger.warning(f"获取RAG模式API调用统计失败,使用默认值: {str(e)}") rag_data["total_api_call"] = 0 @@ -693,6 +739,24 @@ async def dashboard_data( except Exception as e: api_logger.warning(f"获取RAG相关数据失败: {str(e)}") + # 计算昨日对比 + try: + changes = memory_dashboard_service.get_dashboard_yesterday_changes( + db=db, + workspace_id=workspace_id, + storage_type=storage_type, + today_data=rag_data + ) + rag_data.update(changes) + except Exception as e: + api_logger.warning(f"计算RAG昨日对比失败: {str(e)}") + rag_data.update({ + "total_memory_change": None, + "total_app_change": None, + "total_knowledge_change": None, + "total_api_call_change": None, + }) + result["rag_data"] = rag_data api_logger.info("成功获取rag_data") diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 791a6fe8..7e8aa807 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -519,6 +519,206 @@ def get_rag_user_kb_total_chunk( business_logger.error(f"获取用户知识库总chunk数失败: workspace_id={workspace_id} - {str(e)}") raise +def get_dashboard_yesterday_changes( + db: Session, + workspace_id: uuid.UUID, + storage_type: str, + today_data: dict +) -> dict: + """ + 计算各指标相比昨天的变化量。 + + Args: + db: 数据库会话 + workspace_id: 工作空间ID + storage_type: 存储类型 'neo4j' | 'rag' + today_data: 当前数据,包含 total_memory, total_app, total_knowledge, total_api_call + + Returns: + { + "total_memory_change": int | None, + "total_app_change": int | None, + "total_knowledge_change": int | None, + "total_api_call_change": int | None + } + """ + from datetime import datetime, timedelta + from sqlalchemy import func + from app.models.api_key_model import ApiKey, ApiKeyLog + from app.models.knowledge_model import Knowledge + from app.models.app_model import App + from app.models.appshare_model import AppShare + + business_logger.info(f"计算昨日对比: workspace_id={workspace_id}, storage_type={storage_type}") + + now_local = datetime.now() + today_start = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + yesterday_start = today_start - timedelta(days=1) + + changes = { + "total_memory_change": None, + "total_app_change": None, + "total_knowledge_change": None, + "total_api_call_change": None, + } + + # --- total_api_call_change --- + try: + # 获取该workspace下所有api_key的id + api_key_ids = [ + row[0] for row in db.query(ApiKey.id).filter( + ApiKey.workspace_id == workspace_id + ).all() + ] + if api_key_ids: + # 今日累计 + today_api_count = db.query(func.count(ApiKeyLog.id)).filter( + ApiKeyLog.api_key_id.in_(api_key_ids), + ApiKeyLog.created_at >= today_start, + ApiKeyLog.created_at < now_local + ).scalar() or 0 + # 昨日全天 + yesterday_api_count = db.query(func.count(ApiKeyLog.id)).filter( + ApiKeyLog.api_key_id.in_(api_key_ids), + ApiKeyLog.created_at >= yesterday_start, + ApiKeyLog.created_at < today_start + ).scalar() or 0 + changes["total_api_call_change"] = today_api_count - yesterday_api_count + else: + # 没有api_key,如果今日也是0则无对比意义 + changes["total_api_call_change"] = None + except Exception as e: + business_logger.warning(f"计算API调用昨日对比失败: {str(e)}") + + # --- total_knowledge_change --- + try: + # 今天有效总量:当前status=1的知识库总数,排除用户知识库(permission_id='Memory') + today_knowledge = db.query(func.count(Knowledge.id)).filter( + Knowledge.workspace_id == workspace_id, + Knowledge.status == 1, + Knowledge.permission_id != "Memory" + ).scalar() or 0 + # 昨日有效总量:昨天之前创建的、当前仍有效的知识库,排除用户知识库 + yesterday_knowledge = db.query(func.count(Knowledge.id)).filter( + Knowledge.workspace_id == workspace_id, + Knowledge.status == 1, + Knowledge.permission_id != "Memory", + Knowledge.created_at < today_start + ).scalar() or 0 + # 今日软删:今天被软删的知识库(status=2 且 updated_at >= today_start),排除用户知识库 + today_deleted_knowledge = db.query(func.count(Knowledge.id)).filter( + Knowledge.workspace_id == workspace_id, + Knowledge.status == 2, + Knowledge.permission_id != "Memory", + Knowledge.updated_at >= today_start + ).scalar() or 0 + + if yesterday_knowledge == 0 and today_knowledge == 0 and today_deleted_knowledge == 0: + changes["total_knowledge_change"] = None + else: + # change = 今天有效总量 - 今日软删 - 昨日有效总量 + changes["total_knowledge_change"] = today_knowledge - today_deleted_knowledge - yesterday_knowledge + except Exception as e: + business_logger.warning(f"计算知识库昨日对比失败: {str(e)}") + + # --- total_app_change --- + try: + # === 自有app === + # 今天有效总量 + today_own_apps = db.query(func.count(App.id)).filter( + App.workspace_id == workspace_id, + App.is_active == True + ).scalar() or 0 + # 昨日有效总量 + yesterday_own_apps = db.query(func.count(App.id)).filter( + App.workspace_id == workspace_id, + App.is_active == True, + App.created_at < today_start + ).scalar() or 0 + # 今日软删 + today_deleted_own_apps = db.query(func.count(App.id)).filter( + App.workspace_id == workspace_id, + App.is_active == False, + App.updated_at >= today_start + ).scalar() or 0 + + # === 被分享app === + # 今天有效总量 + today_shared_apps = db.query(func.count(AppShare.id)).filter( + AppShare.target_workspace_id == workspace_id, + AppShare.is_active == True + ).scalar() or 0 + # 昨日有效总量 + yesterday_shared_apps = db.query(func.count(AppShare.id)).filter( + AppShare.target_workspace_id == workspace_id, + AppShare.is_active == True, + AppShare.created_at < today_start + ).scalar() or 0 + # 今日软删 + today_deleted_shared_apps = db.query(func.count(AppShare.id)).filter( + AppShare.target_workspace_id == workspace_id, + AppShare.is_active == False, + AppShare.updated_at >= today_start + ).scalar() or 0 + + today_total_app = today_own_apps + today_shared_apps + yesterday_total_app = yesterday_own_apps + yesterday_shared_apps + total_deleted = today_deleted_own_apps + today_deleted_shared_apps + + if yesterday_total_app == 0 and today_total_app == 0 and total_deleted == 0: + changes["total_app_change"] = None + else: + # change = 今天有效总量 - 今日软删 - 昨日有效总量 + changes["total_app_change"] = today_total_app - total_deleted - yesterday_total_app + except Exception as e: + business_logger.warning(f"计算应用数量昨日对比失败: {str(e)}") + + # --- total_memory_change --- + try: + today_memory = today_data.get("total_memory") + if today_memory is None: + changes["total_memory_change"] = None + elif storage_type == "neo4j": + # 从 memory_increments 取最近一条 created_at < today_start 的记录 + last_record = db.query(MemoryIncrement).filter( + MemoryIncrement.workspace_id == workspace_id, + MemoryIncrement.created_at < today_start + ).order_by(desc(MemoryIncrement.created_at)).first() + if last_record is None: + changes["total_memory_change"] = None + else: + changes["total_memory_change"] = today_memory - last_record.total_num + elif storage_type == "rag": + # RAG: 查 documents 表中 created_at < today_start 的 chunk_num 之和 + from app.models.document_model import Document + from app.models.end_user_model import EndUser as _EndUser + from app.models.app_model import App as _App + + end_user_ids = [ + str(eid) for (eid,) in db.query(_EndUser.id) + .join(_App, _EndUser.app_id == _App.id) + .filter(_App.workspace_id == workspace_id) + .all() + ] + if not end_user_ids: + changes["total_memory_change"] = None + else: + file_names = [f"{uid}.txt" for uid in end_user_ids] + yesterday_chunk = db.query(func.sum(Document.chunk_num)).filter( + Document.file_name.in_(file_names), + Document.created_at < today_start + ).scalar() + if yesterday_chunk is None: + changes["total_memory_change"] = None + else: + changes["total_memory_change"] = today_memory - int(yesterday_chunk) + except Exception as e: + business_logger.warning(f"计算记忆总量昨日对比失败: {str(e)}") + + business_logger.info(f"昨日对比计算完成: {changes}") + return changes + + def get_current_user_total_chunk( end_user_id: str, db: Session, diff --git a/api/app/services/memory_storage_service.py b/api/app/services/memory_storage_service.py index b3a66734..fe0f3c32 100644 --- a/api/app/services/memory_storage_service.py +++ b/api/app/services/memory_storage_service.py @@ -614,34 +614,30 @@ async def search_entity(end_user_id: Optional[str] = None) -> Dict[str, Any]: async def search_all(end_user_id: Optional[str] = None) -> Dict[str, Any]: + """查询用户的记忆总量(简化版本,只返回total) + + Args: + end_user_id: 用户ID + + Returns: + Dict[str, Any]: {"total": int} + """ + if not end_user_id: + return {"total": 0} + result = await _neo4j_connector.execute_query( - MemoryConfigRepository.SEARCH_FOR_ALL, - end_user_id=end_user_id, + MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, + end_user_ids=[end_user_id], ) - # 检查结果是否为空或长度不足 - if not result or len(result) < 4: - data = { - "total": 0, - "counts": { - "dialogue": 0, - "chunk": 0, - "statement": 0, - "entity": 0, - }, - } - return data + # 从批量查询结果中提取该用户的total + total = 0 + for row in result: + if row["user_id"] == end_user_id: + total = row["total"] + break - data = { - "total": result[-1]["Count"], - "counts": { - "dialogue": result[0]["Count"], - "chunk": result[1]["Count"], - "statement": result[2]["Count"], - "entity": result[3]["Count"], - }, - } - return data + return {"total": total} async def kb_type_distribution(end_user_id: Optional[str] = None) -> Dict[str, Any]: