refactor: extract app log SQL queries to Service and Repository layers

This commit is contained in:
wxy
2026-03-30 18:27:44 +08:00
parent 83774d7443
commit 4974f9aa98
3 changed files with 268 additions and 59 deletions

View File

@@ -3,17 +3,16 @@ import uuid
from typing import Optional from typing import Optional
from fastapi import APIRouter, Depends, Query from fastapi import APIRouter, Depends, Query
from sqlalchemy import select, desc, func
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.core.response_utils import success from app.core.response_utils import success
from app.db import get_db from app.db import get_db
from app.dependencies import get_current_user, cur_workspace_access_guard from app.dependencies import get_current_user, cur_workspace_access_guard
from app.models.conversation_model import Conversation, Message from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail
from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail, AppLogMessage
from app.schemas.response_schema import PageData, PageMeta from app.schemas.response_schema import PageData, PageMeta
from app.services.app_service import AppService from app.services.app_service import AppService
from app.services.app_log_service import AppLogService
router = APIRouter(prefix="/apps", tags=["App Logs"]) router = APIRouter(prefix="/apps", tags=["App Logs"])
logger = get_business_logger() logger = get_business_logger()
@@ -38,35 +37,22 @@ def list_app_logs(
workspace_id = current_user.current_workspace_id workspace_id = current_user.current_workspace_id
# 验证应用访问权限 # 验证应用访问权限
service = AppService(db) app_service = AppService(db)
service.get_app(app_id, workspace_id) app_service.get_app(app_id, workspace_id)
stmt = select(Conversation).where( # 使用 Service 层查询
Conversation.app_id == app_id, log_service = AppLogService(db)
Conversation.workspace_id == workspace_id, conversations, total = log_service.list_conversations(
Conversation.is_active.is_(True), app_id=app_id,
workspace_id=workspace_id,
page=page,
pagesize=pagesize,
is_draft=is_draft
) )
if is_draft is not None:
stmt = stmt.where(Conversation.is_draft == is_draft)
total = int(db.execute(
select(func.count()).select_from(stmt.subquery())
).scalar_one())
stmt = stmt.order_by(desc(Conversation.updated_at))
stmt = stmt.offset((page - 1) * pagesize).limit(pagesize)
conversations = list(db.scalars(stmt).all())
items = [AppLogConversation.model_validate(c) for c in conversations] items = [AppLogConversation.model_validate(c) for c in conversations]
meta = PageMeta(page=page, pagesize=pagesize, total=total, hasnext=(page * pagesize) < total) meta = PageMeta(page=page, pagesize=pagesize, total=total, hasnext=(page * pagesize) < total)
logger.info(
"查询应用日志会话列表",
extra={"app_id": str(app_id), "total": total, "page": page}
)
return success(data=PageData(page=meta, items=items)) return success(data=PageData(page=meta, items=items))
@@ -87,40 +73,17 @@ def get_app_log_detail(
workspace_id = current_user.current_workspace_id workspace_id = current_user.current_workspace_id
# 验证应用访问权限 # 验证应用访问权限
service = AppService(db) app_service = AppService(db)
service.get_app(app_id, workspace_id) app_service.get_app(app_id, workspace_id)
# 查询会话(确保属于该应用和工作空间) # 使用 Service 层查询
conversation = db.scalars( log_service = AppLogService(db)
select(Conversation).where( conversation = log_service.get_conversation_detail(
Conversation.id == conversation_id, app_id=app_id,
Conversation.app_id == app_id, conversation_id=conversation_id,
Conversation.workspace_id == workspace_id, workspace_id=workspace_id
Conversation.is_active.is_(True),
)
).first()
if not conversation:
from app.core.exceptions import ResourceNotFoundException
raise ResourceNotFoundException("会话", str(conversation_id))
# 查询消息(按时间正序)
messages = list(db.scalars(
select(Message)
.where(Message.conversation_id == conversation_id)
.order_by(Message.created_at)
).all())
detail = AppLogConversationDetail.model_validate(conversation)
detail.messages = [AppLogMessage.model_validate(m) for m in messages]
logger.info(
"查询应用日志会话详情",
extra={
"app_id": str(app_id),
"conversation_id": str(conversation_id),
"message_count": len(messages)
}
) )
detail = AppLogConversationDetail.model_validate(conversation)
return success(data=detail) return success(data=detail)

View File

@@ -199,6 +199,96 @@ class ConversationRepository:
) )
return conversations, total return conversations, total
def list_app_conversations(
self,
app_id: uuid.UUID,
workspace_id: uuid.UUID,
is_draft: Optional[bool] = None,
page: int = 1,
pagesize: int = 20
) -> tuple[list[Conversation], int]:
"""
查询应用日志会话列表(带分页和过滤)
Args:
app_id: 应用 ID
workspace_id: 工作空间 ID
is_draft: 是否草稿会话None 表示不过滤)
page: 页码(从 1 开始)
pagesize: 每页数量
Returns:
Tuple[List[Conversation], int]: (会话列表,总数)
"""
stmt = select(Conversation).where(
Conversation.app_id == app_id,
Conversation.workspace_id == workspace_id,
Conversation.is_active.is_(True)
)
if is_draft is not None:
stmt = stmt.where(Conversation.is_draft == is_draft)
# Calculate total number of records
total = int(self.db.execute(
select(func.count()).select_from(stmt.subquery())
).scalar_one())
# Apply pagination
stmt = stmt.order_by(desc(Conversation.updated_at))
stmt = stmt.offset((page - 1) * pagesize).limit(pagesize)
conversations = list(self.db.scalars(stmt).all())
logger.info(
"Listed app conversations successfully",
extra={
"app_id": str(app_id),
"workspace_id": str(workspace_id),
"returned": len(conversations),
"total": total
}
)
return conversations, total
def get_conversation_for_app_log(
self,
conversation_id: uuid.UUID,
app_id: uuid.UUID,
workspace_id: uuid.UUID
) -> Conversation:
"""
查询应用日志的会话详情
Args:
conversation_id: 会话 ID
app_id: 应用 ID
workspace_id: 工作空间 ID
Returns:
Conversation: 会话对象
Raises:
ResourceNotFoundException: 当会话不存在时
"""
logger.info(f"Fetching conversation for app log: {conversation_id}")
stmt = select(Conversation).where(
Conversation.id == conversation_id,
Conversation.app_id == app_id,
Conversation.workspace_id == workspace_id,
Conversation.is_active.is_(True)
)
conversation = self.db.scalars(stmt).first()
if not conversation:
logger.warning(f"Conversation not found: {conversation_id}")
raise ResourceNotFoundException("会话", str(conversation_id))
logger.info(f"Conversation fetched successfully: {conversation_id}")
return conversation
def soft_delete_conversation_by_conversation_id( def soft_delete_conversation_by_conversation_id(
self, self,
conversation_id: uuid.UUID, conversation_id: uuid.UUID,
@@ -290,6 +380,34 @@ class MessageRepository:
self.db.add(message) self.db.add(message)
return message return message
def get_messages_by_conversation(
self,
conversation_id: uuid.UUID
) -> list[Message]:
"""
查询会话的所有消息(按时间正序)
Args:
conversation_id: 会话 ID
Returns:
List[Message]: 消息列表
"""
stmt = select(Message).where(
Message.conversation_id == conversation_id
).order_by(Message.created_at)
messages = list(self.db.scalars(stmt).all())
logger.info(
"Fetched messages for conversation",
extra={
"conversation_id": str(conversation_id),
"message_count": len(messages)
}
)
return messages
def get_message_by_conversation_id( def get_message_by_conversation_id(
self, self,
conversation_id: uuid.UUID, conversation_id: uuid.UUID,

View File

@@ -0,0 +1,128 @@
"""应用日志服务层"""
import uuid
from typing import Optional, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
from app.core.logging_config import get_business_logger
from app.models.conversation_model import Conversation, Message
from app.repositories.conversation_repository import ConversationRepository, MessageRepository
logger = get_business_logger()
class AppLogService:
"""应用日志服务"""
def __init__(self, db: Session):
self.db = db
self.conversation_repository = ConversationRepository(db)
self.message_repository = MessageRepository(db)
def list_conversations(
self,
app_id: uuid.UUID,
workspace_id: uuid.UUID,
page: int = 1,
pagesize: int = 20,
is_draft: Optional[bool] = None,
) -> Tuple[list[Conversation], int]:
"""
查询应用日志会话列表
Args:
app_id: 应用 ID
workspace_id: 工作空间 ID
page: 页码(从 1 开始)
pagesize: 每页数量
is_draft: 是否草稿会话None 表示不过滤)
Returns:
Tuple[list[Conversation], int]: (会话列表,总数)
"""
logger.info(
"查询应用日志会话列表",
extra={
"app_id": str(app_id),
"workspace_id": str(workspace_id),
"page": page,
"pagesize": pagesize,
"is_draft": is_draft
}
)
# 使用 Repository 查询
conversations, total = self.conversation_repository.list_app_conversations(
app_id=app_id,
workspace_id=workspace_id,
is_draft=is_draft,
page=page,
pagesize=pagesize
)
logger.info(
"查询应用日志会话列表成功",
extra={
"app_id": str(app_id),
"total": total,
"returned": len(conversations)
}
)
return conversations, total
def get_conversation_detail(
self,
app_id: uuid.UUID,
conversation_id: uuid.UUID,
workspace_id: uuid.UUID
) -> Conversation:
"""
查询会话详情(包含消息)
Args:
app_id: 应用 ID
conversation_id: 会话 ID
workspace_id: 工作空间 ID
Returns:
Conversation: 包含消息的会话对象
Raises:
ResourceNotFoundException: 当会话不存在时
"""
logger.info(
"查询应用日志会话详情",
extra={
"app_id": str(app_id),
"conversation_id": str(conversation_id),
"workspace_id": str(workspace_id)
}
)
# 查询会话
conversation = self.conversation_repository.get_conversation_for_app_log(
conversation_id=conversation_id,
app_id=app_id,
workspace_id=workspace_id
)
# 查询消息(按时间正序)
messages = self.message_repository.get_messages_by_conversation(
conversation_id=conversation_id
)
# 将消息附加到会话对象
conversation.messages = messages
logger.info(
"查询应用日志会话详情成功",
extra={
"app_id": str(app_id),
"conversation_id": str(conversation_id),
"message_count": len(messages)
}
)
return conversation