From 4974f9aa98568b89cbc21e2ac85618ab4e006919 Mon Sep 17 00:00:00 2001 From: wxy Date: Mon, 30 Mar 2026 18:27:44 +0800 Subject: [PATCH] refactor: extract app log SQL queries to Service and Repository layers --- api/app/controllers/app_log_controller.py | 81 +++-------- .../repositories/conversation_repository.py | 118 ++++++++++++++++ api/app/services/app_log_service.py | 128 ++++++++++++++++++ 3 files changed, 268 insertions(+), 59 deletions(-) create mode 100644 api/app/services/app_log_service.py diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py index adf90ca4..92b5becd 100644 --- a/api/app/controllers/app_log_controller.py +++ b/api/app/controllers/app_log_controller.py @@ -3,17 +3,16 @@ import uuid from typing import Optional from fastapi import APIRouter, Depends, Query -from sqlalchemy import select, desc, func from sqlalchemy.orm import Session from app.core.logging_config import get_business_logger from app.core.response_utils import success from app.db import get_db from app.dependencies import get_current_user, cur_workspace_access_guard -from app.models.conversation_model import Conversation, Message -from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail, AppLogMessage +from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail from app.schemas.response_schema import PageData, PageMeta from app.services.app_service import AppService +from app.services.app_log_service import AppLogService router = APIRouter(prefix="/apps", tags=["App Logs"]) logger = get_business_logger() @@ -38,35 +37,22 @@ def list_app_logs( workspace_id = current_user.current_workspace_id # 验证应用访问权限 - service = AppService(db) - service.get_app(app_id, workspace_id) + app_service = AppService(db) + app_service.get_app(app_id, workspace_id) - stmt = select(Conversation).where( - Conversation.app_id == app_id, - Conversation.workspace_id == workspace_id, - Conversation.is_active.is_(True), + # 使用 Service 层查询 + log_service = AppLogService(db) + conversations, total = log_service.list_conversations( + app_id=app_id, + workspace_id=workspace_id, + page=page, + pagesize=pagesize, + is_draft=is_draft ) - if is_draft is not None: - stmt = stmt.where(Conversation.is_draft == is_draft) - - total = int(db.execute( - select(func.count()).select_from(stmt.subquery()) - ).scalar_one()) - - stmt = stmt.order_by(desc(Conversation.updated_at)) - stmt = stmt.offset((page - 1) * pagesize).limit(pagesize) - - conversations = list(db.scalars(stmt).all()) - items = [AppLogConversation.model_validate(c) for c in conversations] meta = PageMeta(page=page, pagesize=pagesize, total=total, hasnext=(page * pagesize) < total) - logger.info( - "查询应用日志会话列表", - extra={"app_id": str(app_id), "total": total, "page": page} - ) - return success(data=PageData(page=meta, items=items)) @@ -87,40 +73,17 @@ def get_app_log_detail( workspace_id = current_user.current_workspace_id # 验证应用访问权限 - service = AppService(db) - service.get_app(app_id, workspace_id) + app_service = AppService(db) + app_service.get_app(app_id, workspace_id) - # 查询会话(确保属于该应用和工作空间) - conversation = db.scalars( - select(Conversation).where( - Conversation.id == conversation_id, - Conversation.app_id == app_id, - Conversation.workspace_id == workspace_id, - Conversation.is_active.is_(True), - ) - ).first() - - if not conversation: - from app.core.exceptions import ResourceNotFoundException - raise ResourceNotFoundException("会话", str(conversation_id)) - - # 查询消息(按时间正序) - messages = list(db.scalars( - select(Message) - .where(Message.conversation_id == conversation_id) - .order_by(Message.created_at) - ).all()) - - detail = AppLogConversationDetail.model_validate(conversation) - detail.messages = [AppLogMessage.model_validate(m) for m in messages] - - logger.info( - "查询应用日志会话详情", - extra={ - "app_id": str(app_id), - "conversation_id": str(conversation_id), - "message_count": len(messages) - } + # 使用 Service 层查询 + log_service = AppLogService(db) + conversation = log_service.get_conversation_detail( + app_id=app_id, + conversation_id=conversation_id, + workspace_id=workspace_id ) + detail = AppLogConversationDetail.model_validate(conversation) + return success(data=detail) diff --git a/api/app/repositories/conversation_repository.py b/api/app/repositories/conversation_repository.py index 90f2d6ec..0676a255 100644 --- a/api/app/repositories/conversation_repository.py +++ b/api/app/repositories/conversation_repository.py @@ -199,6 +199,96 @@ class ConversationRepository: ) return conversations, total + def list_app_conversations( + self, + app_id: uuid.UUID, + workspace_id: uuid.UUID, + is_draft: Optional[bool] = None, + page: int = 1, + pagesize: int = 20 + ) -> tuple[list[Conversation], int]: + """ + 查询应用日志会话列表(带分页和过滤) + + Args: + app_id: 应用 ID + workspace_id: 工作空间 ID + is_draft: 是否草稿会话(None 表示不过滤) + page: 页码(从 1 开始) + pagesize: 每页数量 + + Returns: + Tuple[List[Conversation], int]: (会话列表,总数) + """ + stmt = select(Conversation).where( + Conversation.app_id == app_id, + Conversation.workspace_id == workspace_id, + Conversation.is_active.is_(True) + ) + + if is_draft is not None: + stmt = stmt.where(Conversation.is_draft == is_draft) + + # Calculate total number of records + total = int(self.db.execute( + select(func.count()).select_from(stmt.subquery()) + ).scalar_one()) + + # Apply pagination + stmt = stmt.order_by(desc(Conversation.updated_at)) + stmt = stmt.offset((page - 1) * pagesize).limit(pagesize) + + conversations = list(self.db.scalars(stmt).all()) + + logger.info( + "Listed app conversations successfully", + extra={ + "app_id": str(app_id), + "workspace_id": str(workspace_id), + "returned": len(conversations), + "total": total + } + ) + return conversations, total + + def get_conversation_for_app_log( + self, + conversation_id: uuid.UUID, + app_id: uuid.UUID, + workspace_id: uuid.UUID + ) -> Conversation: + """ + 查询应用日志的会话详情 + + Args: + conversation_id: 会话 ID + app_id: 应用 ID + workspace_id: 工作空间 ID + + Returns: + Conversation: 会话对象 + + Raises: + ResourceNotFoundException: 当会话不存在时 + """ + logger.info(f"Fetching conversation for app log: {conversation_id}") + + stmt = select(Conversation).where( + Conversation.id == conversation_id, + Conversation.app_id == app_id, + Conversation.workspace_id == workspace_id, + Conversation.is_active.is_(True) + ) + + conversation = self.db.scalars(stmt).first() + + if not conversation: + logger.warning(f"Conversation not found: {conversation_id}") + raise ResourceNotFoundException("会话", str(conversation_id)) + + logger.info(f"Conversation fetched successfully: {conversation_id}") + return conversation + def soft_delete_conversation_by_conversation_id( self, conversation_id: uuid.UUID, @@ -290,6 +380,34 @@ class MessageRepository: self.db.add(message) return message + def get_messages_by_conversation( + self, + conversation_id: uuid.UUID + ) -> list[Message]: + """ + 查询会话的所有消息(按时间正序) + + Args: + conversation_id: 会话 ID + + Returns: + List[Message]: 消息列表 + """ + stmt = select(Message).where( + Message.conversation_id == conversation_id + ).order_by(Message.created_at) + + messages = list(self.db.scalars(stmt).all()) + + logger.info( + "Fetched messages for conversation", + extra={ + "conversation_id": str(conversation_id), + "message_count": len(messages) + } + ) + return messages + def get_message_by_conversation_id( self, conversation_id: uuid.UUID, diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py new file mode 100644 index 00000000..856045d1 --- /dev/null +++ b/api/app/services/app_log_service.py @@ -0,0 +1,128 @@ +"""应用日志服务层""" +import uuid +from typing import Optional, Tuple +from datetime import datetime + +from sqlalchemy.orm import Session + +from app.core.logging_config import get_business_logger +from app.models.conversation_model import Conversation, Message +from app.repositories.conversation_repository import ConversationRepository, MessageRepository + +logger = get_business_logger() + + +class AppLogService: + """应用日志服务""" + + def __init__(self, db: Session): + self.db = db + self.conversation_repository = ConversationRepository(db) + self.message_repository = MessageRepository(db) + + def list_conversations( + self, + app_id: uuid.UUID, + workspace_id: uuid.UUID, + page: int = 1, + pagesize: int = 20, + is_draft: Optional[bool] = None, + ) -> Tuple[list[Conversation], int]: + """ + 查询应用日志会话列表 + + Args: + app_id: 应用 ID + workspace_id: 工作空间 ID + page: 页码(从 1 开始) + pagesize: 每页数量 + is_draft: 是否草稿会话(None 表示不过滤) + + Returns: + Tuple[list[Conversation], int]: (会话列表,总数) + """ + logger.info( + "查询应用日志会话列表", + extra={ + "app_id": str(app_id), + "workspace_id": str(workspace_id), + "page": page, + "pagesize": pagesize, + "is_draft": is_draft + } + ) + + # 使用 Repository 查询 + conversations, total = self.conversation_repository.list_app_conversations( + app_id=app_id, + workspace_id=workspace_id, + is_draft=is_draft, + page=page, + pagesize=pagesize + ) + + logger.info( + "查询应用日志会话列表成功", + extra={ + "app_id": str(app_id), + "total": total, + "returned": len(conversations) + } + ) + + return conversations, total + + def get_conversation_detail( + self, + app_id: uuid.UUID, + conversation_id: uuid.UUID, + workspace_id: uuid.UUID + ) -> Conversation: + """ + 查询会话详情(包含消息) + + Args: + app_id: 应用 ID + conversation_id: 会话 ID + workspace_id: 工作空间 ID + + Returns: + Conversation: 包含消息的会话对象 + + Raises: + ResourceNotFoundException: 当会话不存在时 + """ + logger.info( + "查询应用日志会话详情", + extra={ + "app_id": str(app_id), + "conversation_id": str(conversation_id), + "workspace_id": str(workspace_id) + } + ) + + # 查询会话 + conversation = self.conversation_repository.get_conversation_for_app_log( + conversation_id=conversation_id, + app_id=app_id, + workspace_id=workspace_id + ) + + # 查询消息(按时间正序) + messages = self.message_repository.get_messages_by_conversation( + conversation_id=conversation_id + ) + + # 将消息附加到会话对象 + conversation.messages = messages + + logger.info( + "查询应用日志会话详情成功", + extra={ + "app_id": str(app_id), + "conversation_id": str(conversation_id), + "message_count": len(messages) + } + ) + + return conversation