feat(workflow): augment logging queries and ameliorate error handling
- Augment log search with app type filtering to enable keyword searching within workflow_executions. - Introduce execution sequence markers to ensure logs are displayed in the correct chronological order. - Ameliorate error handling to capture successful node outputs alongside failure details. - Rectify the processing of empty JSON bodies in HTTP request nodes.
This commit is contained in:
@@ -1,13 +1,15 @@
|
||||
import uuid
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import select, desc, func
|
||||
from sqlalchemy import select, desc, func, or_, cast, Text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.core.exceptions import ResourceNotFoundException
|
||||
from app.core.logging_config import get_db_logger
|
||||
from app.models import Conversation, Message
|
||||
from app.models.app_model import AppType
|
||||
from app.models.conversation_model import ConversationDetail
|
||||
from app.models.workflow_model import WorkflowExecution
|
||||
|
||||
logger = get_db_logger()
|
||||
|
||||
@@ -206,7 +208,8 @@ class ConversationRepository:
|
||||
is_draft: Optional[bool] = None,
|
||||
keyword: Optional[str] = None,
|
||||
page: int = 1,
|
||||
pagesize: int = 20
|
||||
pagesize: int = 20,
|
||||
app_type: Optional[str] = None,
|
||||
) -> tuple[list[Conversation], int]:
|
||||
"""
|
||||
查询应用日志会话列表(带分页和过滤)
|
||||
@@ -218,6 +221,9 @@ class ConversationRepository:
|
||||
keyword: 搜索关键词(匹配消息内容)
|
||||
page: 页码(从 1 开始)
|
||||
pagesize: 每页数量
|
||||
app_type: 应用类型。WORKFLOW 类型改用 workflow_executions 的
|
||||
input_data/output_data 做关键词过滤(因为失败的工作流不会写入 messages 表);
|
||||
其他类型仍走 messages 表。
|
||||
|
||||
Returns:
|
||||
Tuple[List[Conversation], int]: (会话列表,总数)
|
||||
@@ -234,12 +240,28 @@ class ConversationRepository:
|
||||
|
||||
# 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation
|
||||
if keyword:
|
||||
# 查找包含关键词的 conversation_id 列表
|
||||
keyword_stmt = (
|
||||
select(Message.conversation_id)
|
||||
.where(Message.content.ilike(f"%{keyword}%"))
|
||||
.distinct()
|
||||
)
|
||||
kw_pattern = f"%{keyword}%"
|
||||
if app_type == AppType.WORKFLOW:
|
||||
# 工作流:从 workflow_executions 的 input_data / output_data 匹配
|
||||
# (messages 表只存开场白 assistant 消息,失败的工作流也不会写入)
|
||||
keyword_stmt = (
|
||||
select(WorkflowExecution.conversation_id)
|
||||
.where(
|
||||
WorkflowExecution.conversation_id.is_not(None),
|
||||
or_(
|
||||
cast(WorkflowExecution.input_data, Text).ilike(kw_pattern),
|
||||
cast(WorkflowExecution.output_data, Text).ilike(kw_pattern),
|
||||
),
|
||||
)
|
||||
.distinct()
|
||||
)
|
||||
else:
|
||||
# Agent 等其他类型:仍走 messages 表(user + assistant 内容)
|
||||
keyword_stmt = (
|
||||
select(Message.conversation_id)
|
||||
.where(Message.content.ilike(kw_pattern))
|
||||
.distinct()
|
||||
)
|
||||
base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt))
|
||||
|
||||
# Calculate total number of records
|
||||
|
||||
Reference in New Issue
Block a user