diff --git a/api/app/controllers/app_controller.py b/api/app/controllers/app_controller.py index 71e6e7ca..cf40e99a 100644 --- a/api/app/controllers/app_controller.py +++ b/api/app/controllers/app_controller.py @@ -22,6 +22,7 @@ from app.services import app_service, workspace_service from app.services.agent_config_helper import enrich_agent_config from app.services.app_service import AppService from app.services.workflow_service import WorkflowService, get_workflow_service +from app.services.app_statistics_service import AppStatisticsService router = APIRouter(prefix="/apps", tags=["Apps"]) logger = get_business_logger() @@ -904,8 +905,6 @@ def get_app_statistics( - total_tokens: 总token消耗 """ workspace_id = current_user.current_workspace_id - - from app.services.app_statistics_service import AppStatisticsService stats_service = AppStatisticsService(db) result = stats_service.get_app_statistics( @@ -916,3 +915,36 @@ def get_app_statistics( ) return success(data=result) + + +@router.get("/workspace/api-statistics", summary="工作空间API调用统计") +@cur_workspace_access_guard() +def get_workspace_api_statistics( + start_date: int, + end_date: int, + db: Session = Depends(get_db), + current_user=Depends(get_current_user), +): + """获取工作空间API调用统计 + + Args: + start_date: 开始时间戳(毫秒) + end_date: 结束时间戳(毫秒) + + Returns: + 每日统计数据列表,每项包含: + - date: 日期 + - total_calls: 当日总调用次数 + - app_calls: 当日应用调用次数 + - service_calls: 当日服务调用次数 + """ + workspace_id = current_user.current_workspace_id + stats_service = AppStatisticsService(db) + + result = stats_service.get_workspace_api_statistics( + workspace_id=workspace_id, + start_date=start_date, + end_date=end_date + ) + + return success(data=result) diff --git a/api/app/repositories/model_repository.py b/api/app/repositories/model_repository.py index 3d66964a..f323b30c 100644 --- a/api/app/repositories/model_repository.py +++ b/api/app/repositories/model_repository.py @@ -583,7 +583,7 @@ class ModelApiKeyRepository: db_api_key.usage_count = str(current_count + 1) db_api_key.last_used_at = func.now() - db.commit() + db.flush() db_logger.debug(f"API Key使用统计更新成功: api_key_id={api_key_id}") return True diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py index 3556bb88..5e989150 100644 --- a/api/app/services/app_chat_service.py +++ b/api/app/services/app_chat_service.py @@ -64,7 +64,7 @@ class AppChatService: # 获取模型配置ID model_config_id = config.default_model_config_id - api_key_obj = ModelApiKeyService.get_a_api_key(self.db, model_config_id) + api_key_obj = ModelApiKeyService.get_available_api_key(self.db, model_config_id) # 处理系统提示词(支持变量替换) system_prompt = config.system_prompt if variables: @@ -211,6 +211,8 @@ class AppChatService: } ) + ModelApiKeyService.record_api_key_usage(self.db, api_key_obj.id) + elapsed_time = time.time() - start_time return { @@ -249,7 +251,7 @@ class AppChatService: # 获取模型配置ID model_config_id = config.default_model_config_id - api_key_obj = ModelApiKeyService.get_a_api_key(self.db, model_config_id) + api_key_obj = ModelApiKeyService.get_available_api_key(self.db, model_config_id) # 处理系统提示词(支持变量替换) system_prompt = config.system_prompt if variables: @@ -411,6 +413,8 @@ class AppChatService: } ) + ModelApiKeyService.record_api_key_usage(self.db, api_key_obj.id) + # 发送结束事件 end_data = {"elapsed_time": elapsed_time, "message_length": len(full_content)} yield f"event: end\ndata: {json.dumps(end_data, ensure_ascii=False)}\n\n" diff --git a/api/app/services/app_statistics_service.py b/api/app/services/app_statistics_service.py index 5cfa3229..9eefd343 100644 --- a/api/app/services/app_statistics_service.py +++ b/api/app/services/app_statistics_service.py @@ -1,15 +1,13 @@ """应用统计服务""" from datetime import datetime, timedelta -from typing import Dict, Any, List +from typing import Dict, Any import uuid from sqlalchemy import func, and_, cast, Date from sqlalchemy.orm import Session from app.models.conversation_model import Conversation, Message from app.models.end_user_model import EndUser -from app.models.api_key_model import ApiKey, ApiKeyLog -from app.core.exceptions import BusinessException -from app.core.error_codes import BizCode +from app.models.api_key_model import ApiKey, ApiKeyLog, ApiKeyType class AppStatisticsService: @@ -146,7 +144,6 @@ class AppStatisticsService: end_dt: datetime ) -> Dict[str, Any]: """获取Token消耗统计(从Message的meta_data中提取)""" - from sqlalchemy import text # 查询所有相关消息的token使用情况 # meta_data中可能包含: {"usage": {"total_tokens": 100}} 或 {"tokens": 100} @@ -191,3 +188,76 @@ class AppStatisticsService: total = sum(row["count"] for row in daily_data) return {"daily": daily_data, "total": total} + + def get_workspace_api_statistics( + self, + workspace_id: uuid.UUID, + start_date: int, + end_date: int + ) -> list[Any]: + """获取工作空间API调用统计 + + Args: + workspace_id: 工作空间ID + start_date: 开始时间戳(毫秒) + end_date: 结束时间戳(毫秒) + + Returns: + 每日统计数据列表 + """ + # 将毫秒时间戳转换为 datetime + start_time = datetime.fromtimestamp(start_date / 1000) + end_time = datetime.fromtimestamp(end_date / 1000) + + # 应用类型(agent, multi_agent, workflow) + app_types = [ApiKeyType.AGENT, ApiKeyType.CLUSTER, ApiKeyType.WORKFLOW] + + # 每日应用类型调用次数 + daily_app_calls = self.db.query( + cast(ApiKeyLog.created_at, Date).label('date'), + func.count(ApiKeyLog.id).label('count') + ).join( + ApiKey, ApiKeyLog.api_key_id == ApiKey.id + ).filter( + and_( + ApiKey.workspace_id == workspace_id, + ApiKey.type.in_(app_types), + ApiKeyLog.created_at >= start_time, + ApiKeyLog.created_at <= end_time + ) + ).group_by(cast(ApiKeyLog.created_at, Date)).all() + + # 每日服务类型调用次数 + daily_service_calls = self.db.query( + cast(ApiKeyLog.created_at, Date).label('date'), + func.count(ApiKeyLog.id).label('count') + ).join( + ApiKey, ApiKeyLog.api_key_id == ApiKey.id + ).filter( + and_( + ApiKey.workspace_id == workspace_id, + ApiKey.type == ApiKeyType.SERVICE, + ApiKeyLog.created_at >= start_time, + ApiKeyLog.created_at <= end_time + ) + ).group_by(cast(ApiKeyLog.created_at, Date)).all() + + # 构建每日数据 + app_calls_dict = {str(row.date): row.count for row in daily_app_calls} + service_calls_dict = {str(row.date): row.count for row in daily_service_calls} + + # 合并所有日期 + all_dates = sorted(set(app_calls_dict.keys()) | set(service_calls_dict.keys())) + + daily_data = [] + for date in all_dates: + app_count = app_calls_dict.get(date, 0) + service_count = service_calls_dict.get(date, 0) + daily_data.append({ + "date": date, + "total_calls": app_count + service_count, + "app_calls": app_count, + "service_calls": service_count + }) + + return daily_data diff --git a/api/app/services/collaborative_orchestrator.py b/api/app/services/collaborative_orchestrator.py index f01b7e01..00a731de 100644 --- a/api/app/services/collaborative_orchestrator.py +++ b/api/app/services/collaborative_orchestrator.py @@ -24,6 +24,7 @@ from app.core.error_codes import BizCode from app.core.models import RedBearLLM from app.core.models.base import RedBearModelConfig from app.models import ModelType +from app.services.model_service import ModelApiKeyService logger = get_business_logger() @@ -357,6 +358,8 @@ class CollaborativeOrchestrator: "usage": response.get("usage", {"total_tokens": 0}), "is_final_answer": True } + + ModelApiKeyService.record_api_key_usage(self.db, agent_config.get("api_key_id")) # 检查是否有工具调用(handoff) tool_calls = response.get("tool_calls", []) @@ -427,7 +430,7 @@ class CollaborativeOrchestrator: ) # 获取 API Key - api_key_config = ModelApiKeyService.get_a_api_key(self.db, model_config_id) + api_key_config = ModelApiKeyService.get_available_api_key(self.db, model_config_id) if not api_key_config: raise BusinessException( f"Agent 模型没有可用的 API Key: {agent_id}", @@ -442,7 +445,8 @@ class CollaborativeOrchestrator: "provider": api_key_config.provider, "api_key": api_key_config.api_key, "api_base": api_key_config.api_base, - "model_parameters": config_data.get("model_parameters", {}) + "model_parameters": config_data.get("model_parameters", {}), + "api_key_id": api_key_config.id } except ValueError: diff --git a/api/app/services/draft_run_service.py b/api/app/services/draft_run_service.py index 61b41b6c..40ef4971 100644 --- a/api/app/services/draft_run_service.py +++ b/api/app/services/draft_run_service.py @@ -29,6 +29,7 @@ from app.services import task_service from app.services.langchain_tool_server import Search from app.services.memory_agent_service import MemoryAgentService from app.services.model_parameter_merger import ModelParameterMerger +from app.services.model_service import ModelApiKeyService from app.services.tool_service import ToolService from app.services.multimodal_service import MultimodalService from app.core.agent.agent_middleware import AgentMiddleware @@ -471,6 +472,8 @@ class DraftRunService: elapsed_time = time.time() - start_time + ModelApiKeyService.record_api_key_usage(self.db, api_key_config.get("api_key_id")) + # 9. 保存会话消息 if not sub_agent and agent_config.memory and agent_config.memory.get("enabled"): await self._save_conversation_message( @@ -742,6 +745,8 @@ class DraftRunService: elapsed_time = time.time() - start_time + ModelApiKeyService.record_api_key_usage(self.db, api_key_config.get("api_key_id")) + if sub_agent: yield self._format_sse_event("sub_usage", { "total_tokens": total_tokens @@ -808,7 +813,7 @@ class DraftRunService: Raises: BusinessException: 当没有可用的 API Key 时 """ - api_keys = ModelApiKeyRepository.get_by_model_config(self.db, model_config_id) + # api_keys = ModelApiKeyRepository.get_by_model_config(self.db, model_config_id) # stmt = ( # select(ModelApiKey).join( # ModelConfig, ModelApiKey.model_configs @@ -822,7 +827,8 @@ class DraftRunService: # ) # # api_key = self.db.scalars(stmt).first() - api_key = api_keys[0] if api_keys else None + # api_key = api_keys[0] if api_keys else None + api_key = ModelApiKeyService.get_available_api_key(self.db, model_config_id) if not api_key: raise BusinessException("没有可用的 API Key", BizCode.AGENT_CONFIG_MISSING) @@ -831,7 +837,8 @@ class DraftRunService: "model_name": api_key.model_name, "provider": api_key.provider, "api_key": api_key.api_key, - "api_base": api_key.api_base + "api_base": api_key.api_base, + "api_key_id": api_key.id } async def _ensure_conversation( diff --git a/api/app/services/handoffs_service.py b/api/app/services/handoffs_service.py index 10e4d646..e490eea4 100644 --- a/api/app/services/handoffs_service.py +++ b/api/app/services/handoffs_service.py @@ -537,7 +537,7 @@ def convert_multi_agent_config_to_handoffs( # 获取该 Agent 的模型配置 if release.default_model_config_id: - model_api_key = ModelApiKeyService.get_a_api_key(db, release.default_model_config_id) + model_api_key = ModelApiKeyService.get_available_api_key(db, release.default_model_config_id) if model_api_key: model_config = RedBearModelConfig( model_name=model_api_key.model_name, @@ -551,6 +551,7 @@ def convert_multi_agent_config_to_handoffs( } ) logger.debug(f"Agent {agent_name} 使用模型: {model_api_key.model_name}") + ModelApiKeyService.record_api_key_usage(db, model_api_key.id) else: logger.warning(f"Agent {agent_name} 模型配置无效: {release.default_model_config_id}") else: diff --git a/api/app/services/llm_router.py b/api/app/services/llm_router.py index 9e102ac3..e56ad5aa 100644 --- a/api/app/services/llm_router.py +++ b/api/app/services/llm_router.py @@ -382,6 +382,7 @@ class LLMRouter: from app.core.models import RedBearLLM from app.core.models.base import RedBearModelConfig from app.models import ModelApiKey, ModelType + from app.services.model_service import ModelApiKeyService # 获取 API Key 配置(通过关联关系) # api_key_config = self.db.query(ModelApiKey).join( @@ -389,8 +390,9 @@ class LLMRouter: # ).filter(ModelConfig.id == self.routing_model_config.id, # ModelApiKey.is_active == True # ).first() - api_keys = ModelApiKeyRepository.get_by_model_config(self.db, self.routing_model_config.id) - api_key_config = api_keys[0] if api_keys else None + # api_keys = ModelApiKeyRepository.get_by_model_config(self.db, self.routing_model_config.id) + # api_key_config = api_keys[0] if api_keys else None + api_key_config = ModelApiKeyService.get_available_api_key(self.db, self.routing_model_config.id) if not api_key_config: raise Exception("路由模型没有可用的 API Key") @@ -424,7 +426,6 @@ class LLMRouter: # 调用模型 response = await llm.ainvoke(prompt) - from app.services.model_service import ModelApiKeyService ModelApiKeyService.record_api_key_usage(self.db, api_key_config.id) # 提取响应内容 diff --git a/api/app/services/master_agent_router.py b/api/app/services/master_agent_router.py index 87fdb22c..3cf3ecc3 100644 --- a/api/app/services/master_agent_router.py +++ b/api/app/services/master_agent_router.py @@ -349,7 +349,7 @@ class MasterAgentRouter: from app.models import ModelApiKey, ModelType # 获取 API Key 配置 - api_key_config = ModelApiKeyService.get_a_api_key(self.db, self.master_model_config.id) + api_key_config = ModelApiKeyService.get_available_api_key(self.db, self.master_model_config.id) if not api_key_config: raise Exception("Master Agent 模型没有可用的 API Key") @@ -400,6 +400,7 @@ class MasterAgentRouter: # 调用模型 response = await llm.ainvoke(prompt) + ModelApiKeyService.record_api_key_usage(self.db, api_key_config.id) # 提取响应内容 if hasattr(response, 'content'): diff --git a/api/app/services/model_service.py b/api/app/services/model_service.py index dee6cd1d..d382b1b1 100644 --- a/api/app/services/model_service.py +++ b/api/app/services/model_service.py @@ -6,7 +6,7 @@ import math import time import asyncio -from app.models.models_model import ModelConfig, ModelApiKey, ModelType +from app.models.models_model import ModelConfig, ModelApiKey, ModelType, LoadBalanceStrategy from app.repositories.model_repository import ModelConfigRepository, ModelApiKeyRepository, ModelBaseRepository from app.schemas import model_schema from app.schemas.model_schema import ( @@ -633,19 +633,31 @@ class ModelApiKeyService: @staticmethod def get_available_api_key(db: Session, model_config_id: uuid.UUID) -> Optional[ModelApiKey]: - """获取可用的API Key(按优先级和负载均衡)""" - api_keys = ModelApiKeyRepository.get_by_model_config(db, model_config_id, is_active=True) + """获取可用的API Key(根据负载均衡策略)""" + model_config = ModelConfigRepository.get_by_id(db, model_config_id) + if not model_config: + return None + + api_keys = [key for key in model_config.api_keys if key.is_active] if not api_keys: return None - return min(api_keys, key=lambda x: int(x.usage_count or "0")) + + # 如果是轮询策略,按使用次数最少,次数相同则选最早使用的 + if model_config.load_balance_strategy == LoadBalanceStrategy.ROUND_ROBIN: + return min(api_keys, key=lambda x: (int(x.usage_count or "0"), x.last_used_at or datetime.min)) + + # 否则返回第一个 + return api_keys[0] @staticmethod - def record_api_key_usage(db: Session, api_key_id: uuid.UUID) -> bool: + def record_api_key_usage(db: Session, api_key_id: uuid.UUID | None) -> bool: """记录API Key使用""" - success = ModelApiKeyRepository.update_usage(db, api_key_id) - if success: - db.commit() - return success + if api_key_id: + success = ModelApiKeyRepository.update_usage(db, api_key_id) + if success: + db.commit() + return success + return False @staticmethod def get_a_api_key(db: Session, model_config_id: uuid.UUID) -> ModelApiKey: diff --git a/api/app/services/multi_agent_orchestrator.py b/api/app/services/multi_agent_orchestrator.py index b28bafbf..d1aa46d1 100644 --- a/api/app/services/multi_agent_orchestrator.py +++ b/api/app/services/multi_agent_orchestrator.py @@ -14,6 +14,7 @@ from app.services.conversation_state_manager import ConversationStateManager from app.core.exceptions import BusinessException, ResourceNotFoundException from app.core.error_codes import BizCode from app.core.logging_config import get_business_logger +from app.services.model_service import ModelApiKeyService logger = get_business_logger() @@ -2569,8 +2570,9 @@ class MultiAgentOrchestrator: # ModelConfig.id == default_model_config_id, # ModelApiKey.is_active.is_(True) # ).first() - api_keys = ModelApiKeyRepository.get_by_model_config(self.db, default_model_config_id) - api_key_config = api_keys[0] if api_keys else None + # api_keys = ModelApiKeyRepository.get_by_model_config(self.db, default_model_config_id) + # api_key_config = api_keys[0] if api_keys else None + api_key_config = ModelApiKeyService.get_available_api_key(self.db, default_model_config_id) if not api_key_config: logger.warning("Master Agent 没有可用的 API Key,使用简单整合") @@ -2601,6 +2603,8 @@ class MultiAgentOrchestrator: # 调用模型进行整合 response = await llm.ainvoke(merge_prompt) + ModelApiKeyService.record_api_key_usage(self.db, api_key_config.id) + # 提取响应内容 if hasattr(response, 'content'): merged_response = response.content @@ -2730,8 +2734,9 @@ class MultiAgentOrchestrator: # ModelConfig.id == default_model_config_id, # ModelApiKey.is_active.is_(True) # ).first() - api_keys = ModelApiKeyRepository.get_by_model_config(self.db, default_model_config_id) - api_key_config = api_keys[0] if api_keys else None + # api_keys = ModelApiKeyRepository.get_by_model_config(self.db, default_model_config_id) + # api_key_config = api_keys[0] if api_keys else None + api_key_config = ModelApiKeyService.get_available_api_key(self.db, default_model_config_id) if not api_key_config: logger.warning("Master Agent 没有可用的 API Key,使用简单整合") @@ -2790,6 +2795,8 @@ class MultiAgentOrchestrator: logger.debug(f"收到流式 chunk #{chunk_count}: {content[:30]}...") yield self._format_sse_event("message", {"content": content}) + ModelApiKeyService.record_api_key_usage(self.db, api_key_config.id) + logger.info(f"Master Agent 流式整合完成,共 {chunk_count} 个 chunks") except AttributeError as e: diff --git a/api/app/services/prompt_optimizer_service.py b/api/app/services/prompt_optimizer_service.py index 966ac6e0..99edcc0e 100644 --- a/api/app/services/prompt_optimizer_service.py +++ b/api/app/services/prompt_optimizer_service.py @@ -23,6 +23,7 @@ from app.repositories.prompt_optimizer_repository import ( PromptReleaseRepository ) from app.schemas.prompt_optimizer_schema import OptimizePromptResult +from app.services.model_service import ModelApiKeyService logger = get_business_logger() @@ -176,8 +177,9 @@ class PromptOptimizerService: logger.info(f"Prompt optimization started, user_id={user_id}, session_id={session_id}") # Create LLM instance - api_keys = ModelApiKeyRepository.get_by_model_config(self.db, model_config.id) - api_config: ModelApiKey = api_keys[0] if api_keys else None + # api_keys = ModelApiKeyRepository.get_by_model_config(self.db, model_config.id) + # api_config: ModelApiKey = api_keys[0] if api_keys else None + api_config: ModelApiKey = ModelApiKeyService.get_available_api_key(self.db, model_config.id) llm = RedBearLLM(RedBearModelConfig( model_name=api_config.model_name, provider=api_config.provider, @@ -252,6 +254,7 @@ class PromptOptimizerService: optim_result = json_repair.repair_json(buffer, return_objects=True) # prompt = optim_result.get("prompt") desc = optim_result.get("desc") + ModelApiKeyService.record_api_key_usage(self.db, api_config.id) self.create_message( tenant_id=tenant_id, session_id=session_id, diff --git a/api/app/services/shared_chat_service.py b/api/app/services/shared_chat_service.py index a92c2649..6fa5961c 100644 --- a/api/app/services/shared_chat_service.py +++ b/api/app/services/shared_chat_service.py @@ -10,6 +10,7 @@ from app.services.memory_konwledges_server import write_rag from app.models import ReleaseShare, AppRelease, Conversation from app.services.conversation_service import ConversationService from app.services.draft_run_service import create_web_search_tool +from app.services.model_service import ModelApiKeyService from app.services.release_share_service import ReleaseShareService from app.core.exceptions import BusinessException, ResourceNotFoundException from app.core.error_codes import BizCode @@ -178,8 +179,9 @@ class SharedChatService: # .limit(1) # ) # api_key_obj = self.db.scalars(stmt).first() - api_keys = ModelApiKeyRepository.get_by_model_config(self.db, model_config_id) - api_key_obj = api_keys[0] if api_keys else None + # api_keys = ModelApiKeyRepository.get_by_model_config(self.db, model_config_id) + # api_key_obj = api_keys[0] if api_keys else None + api_key_obj = ModelApiKeyService.get_available_api_key(self.db, model_config_id) if not api_key_obj: raise BusinessException("没有可用的 API Key", BizCode.AGENT_CONFIG_MISSING) @@ -309,6 +311,8 @@ class SharedChatService: elapsed_time = time.time() - start_time + ModelApiKeyService.record_api_key_usage(self.db, api_key_obj.id) + return { "conversation_id": conversation.id, @@ -383,8 +387,9 @@ class SharedChatService: # .limit(1) # ) # api_key_obj = self.db.scalars(stmt).first() - api_keys = ModelApiKeyRepository.get_by_model_config(self.db, model_config_id) - api_key_obj = api_keys[0] if api_keys else None + # api_keys = ModelApiKeyRepository.get_by_model_config(self.db, model_config_id) + # api_key_obj = api_keys[0] if api_keys else None + api_key_obj = ModelApiKeyService.get_available_api_key(self.db, model_config_id) if not api_key_obj: raise BusinessException("没有可用的 API Key", BizCode.AGENT_CONFIG_MISSING) @@ -513,7 +518,8 @@ class SharedChatService: } ) - + ModelApiKeyService.record_api_key_usage(self.db, api_key_obj.id) + # 发送结束事件 end_data = {"elapsed_time": elapsed_time, "message_length": len(full_content)} yield f"event: end\ndata: {json.dumps(end_data, ensure_ascii=False)}\n\n"