Merge pull request #985 from wanxunyang/develop-wxy

feat: enhance workflow debugging, logging and auth middleware
This commit is contained in:
山程漫悟
2026-04-24 10:17:32 +08:00
committed by GitHub
14 changed files with 358 additions and 41 deletions

View File

@@ -24,15 +24,18 @@ def list_app_logs(
app_id: uuid.UUID, app_id: uuid.UUID,
page: int = Query(1, ge=1), page: int = Query(1, ge=1),
pagesize: int = Query(20, ge=1, le=100), pagesize: int = Query(20, ge=1, le=100),
is_draft: Optional[bool] = None, is_draft: Optional[bool] = Query(None, description="是否草稿会话(不传则返回全部)"),
keyword: Optional[str] = Query(None, description="搜索关键词(匹配消息内容)"),
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
"""查看应用下所有会话记录(分页) """查看应用下所有会话记录(分页)
- 支持按 is_draft 筛选(草稿会话 / 发布会话 - is_draft 不传则返回所有会话(草稿 + 正式
- is_draft=True 只返回草稿会话
- is_draft=False 只返回发布会话
- 支持按 keyword 搜索(匹配消息内容)
- 按最新更新时间倒序排列 - 按最新更新时间倒序排列
- 所有人(包括共享者和被共享者)都只能查看自己的会话记录
""" """
workspace_id = current_user.current_workspace_id workspace_id = current_user.current_workspace_id
@@ -47,7 +50,8 @@ def list_app_logs(
workspace_id=workspace_id, workspace_id=workspace_id,
page=page, page=page,
pagesize=pagesize, pagesize=pagesize,
is_draft=is_draft is_draft=is_draft,
keyword=keyword
) )
items = [AppLogConversation.model_validate(c) for c in conversations] items = [AppLogConversation.model_validate(c) for c in conversations]
@@ -78,12 +82,13 @@ def get_app_log_detail(
# 使用 Service 层查询 # 使用 Service 层查询
log_service = AppLogService(db) log_service = AppLogService(db)
conversation = log_service.get_conversation_detail( conversation, node_executions_map = log_service.get_conversation_detail(
app_id=app_id, app_id=app_id,
conversation_id=conversation_id, conversation_id=conversation_id,
workspace_id=workspace_id workspace_id=workspace_id
) )
detail = AppLogConversationDetail.model_validate(conversation) detail = AppLogConversationDetail.model_validate(conversation)
detail.node_executions_map = node_executions_map
return success(data=detail) return success(data=detail)

View File

@@ -373,7 +373,6 @@ def delete_composite_model(
@router.put("/{model_id}", response_model=ApiResponse) @router.put("/{model_id}", response_model=ApiResponse)
@check_model_activation_quota
def update_model( def update_model(
model_id: uuid.UUID, model_id: uuid.UUID,
model_data: model_schema.ModelConfigUpdate, model_data: model_schema.ModelConfigUpdate,

View File

@@ -70,6 +70,8 @@ def require_api_key(
}) })
raise BusinessException("API Key 无效或已过期", BizCode.API_KEY_INVALID) raise BusinessException("API Key 无效或已过期", BizCode.API_KEY_INVALID)
ApiKeyAuthService.check_app_published(db, api_key_obj)
if scopes: if scopes:
missing_scopes = [] missing_scopes = []
for scope in scopes: for scope in scopes:

View File

@@ -66,6 +66,7 @@ class BizCode(IntEnum):
PERMISSION_DENIED = 6010 PERMISSION_DENIED = 6010
INVALID_CONVERSATION = 6011 INVALID_CONVERSATION = 6011
CONFIG_MISSING = 6012 CONFIG_MISSING = 6012
APP_NOT_PUBLISHED = 6013
# 模型7xxx # 模型7xxx
MODEL_CONFIG_INVALID = 7001 MODEL_CONFIG_INVALID = 7001

View File

@@ -156,8 +156,13 @@ class DifyConverter(BaseConverter):
def replacer(match: re.Match) -> str: def replacer(match: re.Match) -> str:
raw_name = match.group(1) raw_name = match.group(1)
new_name = self.process_var_selector(raw_name) try:
return f"{{{{{new_name}}}}}" new_name = self.process_var_selector(raw_name)
if not new_name:
return match.group(0)
return f"{{{{{new_name}}}}}"
except Exception:
return match.group(0)
return pattern.sub(replacer, content) return pattern.sub(replacer, content)
@@ -633,8 +638,15 @@ class DifyConverter(BaseConverter):
] = self.trans_variable_format(content["value"]) ] = self.trans_variable_format(content["value"])
else: else:
if node_data["body"]["data"]: if node_data["body"]["data"]:
body_content = (node_data["body"]["data"][0].get("value") or data_entry = node_data["body"]["data"][0]
self._process_list_variable_literal(node_data["body"]["data"][0].get("file"))) body_content = data_entry.get("value")
if not body_content and data_entry.get("file"):
body_content = self._process_list_variable_literal(data_entry.get("file"))
if not body_content:
body_content = ""
elif isinstance(body_content, str):
# Convert session variable format for JSON body
body_content = self.trans_variable_format(body_content)
else: else:
body_content = "" body_content = ""

View File

@@ -167,8 +167,9 @@ class EventStreamHandler:
"node_id": node_id, "node_id": node_id,
"status": "failed", "status": "failed",
"input": data.get("input_data"), "input": data.get("input_data"),
"elapsed_time": data.get("elapsed_time"),
"output": None, "output": None,
"process": data.get("process_data"),
"elapsed_time": data.get("elapsed_time"),
"error": data.get("error") "error": data.get("error")
} }
} }
@@ -266,6 +267,7 @@ class EventStreamHandler:
).timestamp() * 1000), ).timestamp() * 1000),
"input": result.get("node_outputs", {}).get(node_name, {}).get("input"), "input": result.get("node_outputs", {}).get(node_name, {}).get("input"),
"output": result.get("node_outputs", {}).get(node_name, {}).get("output"), "output": result.get("node_outputs", {}).get(node_name, {}).get("output"),
"process": result.get("node_outputs", {}).get(node_name, {}).get("process"),
"elapsed_time": result.get("node_outputs", {}).get(node_name, {}).get("elapsed_time"), "elapsed_time": result.get("node_outputs", {}).get(node_name, {}).get("elapsed_time"),
"token_usage": result.get("node_outputs", {}).get(node_name, {}).get("token_usage") "token_usage": result.get("node_outputs", {}).get(node_name, {}).get("token_usage")
} }

View File

@@ -132,6 +132,11 @@ class HttpErrorDefaultTemplate(BaseModel):
description="Default HTTP headers returned on error", description="Default HTTP headers returned on error",
) )
files: list = Field(
default_factory=list,
description="Default files list returned on error",
)
output: str = Field( output: str = Field(
default="SUCCESS", default="SUCCESS",
description="HTTP response body", description="HTTP response body",
@@ -246,6 +251,13 @@ class HttpRequestNodeConfig(BaseNodeConfig):
} }
class HttpRequestDataProcessing(BaseModel):
request: str = Field(
default="",
description="Raw HTTP request format for debugging",
)
class HttpRequestNodeOutput(BaseModel): class HttpRequestNodeOutput(BaseModel):
body: str = Field( body: str = Field(
..., ...,

View File

@@ -160,6 +160,7 @@ class HttpRequestNode(BaseNode):
def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any], down_stream_nodes: list[str]): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any], down_stream_nodes: list[str]):
super().__init__(node_config, workflow_config, down_stream_nodes) super().__init__(node_config, workflow_config, down_stream_nodes)
self.typed_config: HttpRequestNodeConfig | None = None self.typed_config: HttpRequestNodeConfig | None = None
self.last_request: str = ""
def _output_types(self) -> dict[str, VariableType]: def _output_types(self) -> dict[str, VariableType]:
return { return {
@@ -170,6 +171,47 @@ class HttpRequestNode(BaseNode):
"output": VariableType.STRING "output": VariableType.STRING
} }
def _extract_output(self, business_result: Any) -> Any:
if isinstance(business_result, dict):
result = {k: v for k, v in business_result.items() if k != "request"}
return result
return business_result
def _extract_extra_fields(self, business_result: Any) -> dict[str, Any]:
if isinstance(business_result, dict) and "request" in business_result:
return {
"process": {
"request": business_result.get("request", "")
}
}
return {}
def _wrap_error(
self,
error_message: str,
elapsed_time: float,
state: WorkflowState,
variable_pool: VariablePool
) -> dict[str, Any]:
input_data = self._extract_input(state, variable_pool)
node_output = {
"node_id": self.node_id,
"node_type": self.node_type,
"node_name": self.node_name,
"status": "failed",
"input": input_data,
"output": None,
"process": {"request": self.last_request} if self.last_request else None,
"elapsed_time": elapsed_time,
"token_usage": None,
"error": error_message
}
return {
"node_outputs": {self.node_id: node_output},
"error": error_message,
"error_node": self.node_id
}
def _build_timeout(self) -> Timeout: def _build_timeout(self) -> Timeout:
""" """
Build httpx Timeout configuration. Build httpx Timeout configuration.
@@ -255,9 +297,13 @@ class HttpRequestNode(BaseNode):
case HttpContentType.NONE: case HttpContentType.NONE:
return {} return {}
case HttpContentType.JSON: case HttpContentType.JSON:
content["json"] = json.loads(self._render_template( rendered_body = self._render_template(
self.typed_config.body.data, variable_pool self.typed_config.body.data, variable_pool
)) ).strip()
if not rendered_body:
content["json"] = {}
else:
content["json"] = json.loads(rendered_body)
case HttpContentType.FROM_DATA: case HttpContentType.FROM_DATA:
data = {} data = {}
files = [] files = []
@@ -325,6 +371,62 @@ class HttpRequestNode(BaseNode):
case _: case _:
raise RuntimeError(f"HttpRequest method not supported: {self.typed_config.method}") raise RuntimeError(f"HttpRequest method not supported: {self.typed_config.method}")
def _generate_raw_request(
self,
variable_pool: VariablePool,
url: str,
headers: dict[str, str],
params: dict[str, str],
content: dict[str, Any]
) -> str:
"""
Generate raw HTTP request format for debugging.
Args:
variable_pool: Variable Pool
url: Rendered URL
headers: Request headers
params: Query parameters
content: Request body content
Returns:
Raw HTTP request string
"""
method = self.typed_config.method.value
if params:
param_str = "&".join([f"{k}={v}" for k, v in params.items()])
full_url = f"{url}?{param_str}" if "?" not in url else f"{url}&{param_str}"
else:
full_url = url
lines = [f"{method} {full_url} HTTP/1.1"]
for key, value in headers.items():
lines.append(f"{key}: {value}")
if "json" in content and content["json"]:
json_body = json.dumps(content["json"], ensure_ascii=False)
lines.append(f"Content-Length: {len(json_body)}")
lines.append("")
lines.append(json_body)
elif "data" in content and "files" not in content:
if isinstance(content["data"], dict):
body_str = "&".join([f"{k}={v}" for k, v in content["data"].items()])
lines.append(f"Content-Length: {len(body_str)}")
lines.append("")
lines.append(body_str)
elif "content" in content:
lines.append(f"Content-Length: {len(content['content'])}")
lines.append("")
lines.append(content["content"])
elif "files" in content:
lines.append("Content-Length: 0")
lines.append("")
lines.append("# Note: This request includes file uploads")
return "\r\n".join(lines)
async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> dict | str: async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> dict | str:
""" """
Execute the HTTP request node. Execute the HTTP request node.
@@ -343,11 +445,25 @@ class HttpRequestNode(BaseNode):
- str: Branch identifier (e.g. "ERROR") when branching is enabled - str: Branch identifier (e.g. "ERROR") when branching is enabled
""" """
self.typed_config = HttpRequestNodeConfig(**self.config) self.typed_config = HttpRequestNodeConfig(**self.config)
# Build request components
headers = self._build_header(variable_pool) | self._build_auth(variable_pool)
params = self._build_params(variable_pool)
content = await self._build_content(variable_pool)
url = self._render_template(self.typed_config.url, variable_pool)
logger.info(f"Node {self.node_id}: headers={headers}, params={params}, content keys={list(content.keys())}")
# Generate raw HTTP request for debugging
raw_request = self._generate_raw_request(variable_pool, url, headers, params, content)
self.last_request = raw_request
logger.info(f"Node {self.node_id}: Generated HTTP request:\n{raw_request}")
async with httpx.AsyncClient( async with httpx.AsyncClient(
verify=self.typed_config.verify_ssl, verify=self.typed_config.verify_ssl,
timeout=self._build_timeout(), timeout=self._build_timeout(),
headers=self._build_header(variable_pool) | self._build_auth(variable_pool), headers=headers,
params=self._build_params(variable_pool), params=params,
follow_redirects=True follow_redirects=True
) as client: ) as client:
retries = self.typed_config.retry.max_attempts retries = self.typed_config.retry.max_attempts
@@ -355,18 +471,21 @@ class HttpRequestNode(BaseNode):
try: try:
request_func = self._get_client_method(client) request_func = self._get_client_method(client)
resp = await request_func( resp = await request_func(
url=self._render_template(self.typed_config.url, variable_pool), url=url,
**(await self._build_content(variable_pool)) **content
) )
resp.raise_for_status() resp.raise_for_status()
logger.info(f"Node {self.node_id}: HTTP request succeeded") logger.info(f"Node {self.node_id}: HTTP request succeeded")
response = HttpResponse(resp) response = HttpResponse(resp)
return HttpRequestNodeOutput( return {
body=response.body, **HttpRequestNodeOutput(
status_code=resp.status_code, body=response.body,
headers=resp.headers, status_code=resp.status_code,
files=response.files headers=resp.headers,
).model_dump() files=response.files
).model_dump(),
"request": raw_request
}
except (httpx.HTTPStatusError, httpx.RequestError) as e: except (httpx.HTTPStatusError, httpx.RequestError) as e:
logger.error(f"HTTP request node exception: {e}") logger.error(f"HTTP request node exception: {e}")
retries -= 1 retries -= 1
@@ -382,10 +501,19 @@ class HttpRequestNode(BaseNode):
logger.warning( logger.warning(
f"Node {self.node_id}: HTTP request failed, returning default result" f"Node {self.node_id}: HTTP request failed, returning default result"
) )
return self.typed_config.error_handle.default.model_dump() error_result = self.typed_config.error_handle.default.model_dump()
error_result["request"] = raw_request
return error_result
case HttpErrorHandle.BRANCH: case HttpErrorHandle.BRANCH:
logger.warning( logger.warning(
f"Node {self.node_id}: HTTP request failed, switching to error handling branch" f"Node {self.node_id}: HTTP request failed, switching to error handling branch"
) )
return {"output": "ERROR"} return {
"output": "ERROR",
"body": "",
"status_code": 500,
"headers": {},
"files": [],
"request": raw_request
}
raise RuntimeError("http request failed") raise RuntimeError("http request failed")

View File

@@ -564,6 +564,7 @@ async def get_app_or_workspace(
if not app: if not app:
auth_logger.warning(f"App not found for API Key: {api_key_obj.resource_id}") auth_logger.warning(f"App not found for API Key: {api_key_obj.resource_id}")
raise credentials_exception raise credentials_exception
ApiKeyAuthService.check_app_published(db, api_key_obj)
auth_logger.info(f"App access granted: {app.id}") auth_logger.info(f"App access granted: {app.id}")
return app return app

View File

@@ -204,6 +204,7 @@ class ConversationRepository:
app_id: uuid.UUID, app_id: uuid.UUID,
workspace_id: uuid.UUID, workspace_id: uuid.UUID,
is_draft: Optional[bool] = None, is_draft: Optional[bool] = None,
keyword: Optional[str] = None,
page: int = 1, page: int = 1,
pagesize: int = 20 pagesize: int = 20
) -> tuple[list[Conversation], int]: ) -> tuple[list[Conversation], int]:
@@ -213,29 +214,41 @@ class ConversationRepository:
Args: Args:
app_id: 应用 ID app_id: 应用 ID
workspace_id: 工作空间 ID workspace_id: 工作空间 ID
is_draft: 是否草稿会话None 表示不过滤 is_draft: 是否草稿会话None表示返回全部
keyword: 搜索关键词(匹配消息内容)
page: 页码(从 1 开始) page: 页码(从 1 开始)
pagesize: 每页数量 pagesize: 每页数量
Returns: Returns:
Tuple[List[Conversation], int]: (会话列表,总数) Tuple[List[Conversation], int]: (会话列表,总数)
""" """
stmt = select(Conversation).where( base_conditions = [
Conversation.app_id == app_id, Conversation.app_id == app_id,
Conversation.workspace_id == workspace_id, Conversation.workspace_id == workspace_id,
Conversation.is_active.is_(True) Conversation.is_active.is_(True),
) ]
if is_draft is not None: if is_draft is not None:
stmt = stmt.where(Conversation.is_draft == is_draft) base_conditions.append(Conversation.is_draft == is_draft)
base_stmt = select(Conversation).where(*base_conditions)
# 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation
if keyword:
# 查找包含关键词的 conversation_id 列表
keyword_stmt = (
select(Message.conversation_id)
.where(Message.content.ilike(f"%{keyword}%"))
.distinct()
)
base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt))
# Calculate total number of records # Calculate total number of records
total = int(self.db.execute( total = int(self.db.execute(
select(func.count()).select_from(stmt.subquery()) select(func.count()).select_from(base_stmt.subquery())
).scalar_one()) ).scalar_one())
# Apply pagination # Apply pagination
stmt = stmt.order_by(desc(Conversation.updated_at)) stmt = base_stmt.order_by(desc(Conversation.updated_at))
stmt = stmt.offset((page - 1) * pagesize).limit(pagesize) stmt = stmt.offset((page - 1) * pagesize).limit(pagesize)
conversations = list(self.db.scalars(stmt).all()) conversations = list(self.db.scalars(stmt).all())
@@ -245,6 +258,7 @@ class ConversationRepository:
extra={ extra={
"app_id": str(app_id), "app_id": str(app_id),
"workspace_id": str(workspace_id), "workspace_id": str(workspace_id),
"keyword": keyword,
"returned": len(conversations), "returned": len(conversations),
"total": total "total": total
} }

View File

@@ -48,6 +48,21 @@ class AppLogConversation(BaseModel):
return int(dt.timestamp() * 1000) if dt else None return int(dt.timestamp() * 1000) if dt else None
class AppLogNodeExecution(BaseModel):
"""工作流节点执行记录"""
node_id: str
node_type: str
node_name: Optional[str] = None
status: str = "pending"
error: Optional[str] = None
input: Optional[Any] = None
process: Optional[Any] = None
output: Optional[Any] = None
elapsed_time: Optional[float] = None
token_usage: Optional[Dict[str, Any]] = None
class AppLogConversationDetail(AppLogConversation): class AppLogConversationDetail(AppLogConversation):
"""会话详情(包含消息列表)""" """会话详情(包含消息列表)"""
messages: List[AppLogMessage] = Field(default_factory=list) messages: List[AppLogMessage] = Field(default_factory=list)
node_executions_map: Dict[str, List[AppLogNodeExecution]] = Field(default_factory=dict, description="按消息ID分组的节点执行记录")

View File

@@ -19,6 +19,7 @@ from app.core.exceptions import (
) )
from app.core.error_codes import BizCode from app.core.error_codes import BizCode
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.models.app_model import App
logger = get_business_logger() logger = get_business_logger()
@@ -442,6 +443,17 @@ class ApiKeyAuthService:
return api_key_obj return api_key_obj
@staticmethod
def check_app_published(db: Session, api_key_obj: ApiKey) -> None:
"""
检查应用是否已发布,未发布则抛出异常
"""
if not api_key_obj.resource_id:
return
app = db.get(App, api_key_obj.resource_id)
if not app or not app.current_release_id:
raise BusinessException("应用未发布,不可用", BizCode.APP_NOT_PUBLISHED)
@staticmethod @staticmethod
def check_scope(api_key: ApiKey, required_scope: str) -> bool: def check_scope(api_key: ApiKey, required_scope: str) -> bool:
"""检查权限范围""" """检查权限范围"""

View File

@@ -3,11 +3,14 @@ import uuid
from typing import Optional, Tuple from typing import Optional, Tuple
from datetime import datetime from datetime import datetime
from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.models.conversation_model import Conversation, Message from app.models.conversation_model import Conversation, Message
from app.models.workflow_model import WorkflowExecution
from app.repositories.conversation_repository import ConversationRepository, MessageRepository from app.repositories.conversation_repository import ConversationRepository, MessageRepository
from app.schemas.app_log_schema import AppLogNodeExecution
logger = get_business_logger() logger = get_business_logger()
@@ -27,6 +30,7 @@ class AppLogService:
page: int = 1, page: int = 1,
pagesize: int = 20, pagesize: int = 20,
is_draft: Optional[bool] = None, is_draft: Optional[bool] = None,
keyword: Optional[str] = None,
) -> Tuple[list[Conversation], int]: ) -> Tuple[list[Conversation], int]:
""" """
查询应用日志会话列表 查询应用日志会话列表
@@ -36,7 +40,8 @@ class AppLogService:
workspace_id: 工作空间 ID workspace_id: 工作空间 ID
page: 页码(从 1 开始) page: 页码(从 1 开始)
pagesize: 每页数量 pagesize: 每页数量
is_draft: 是否草稿会话None 表示不过滤 is_draft: 是否草稿会话None表示返回全部
keyword: 搜索关键词(匹配消息内容)
Returns: Returns:
Tuple[list[Conversation], int]: (会话列表,总数) Tuple[list[Conversation], int]: (会话列表,总数)
@@ -48,7 +53,8 @@ class AppLogService:
"workspace_id": str(workspace_id), "workspace_id": str(workspace_id),
"page": page, "page": page,
"pagesize": pagesize, "pagesize": pagesize,
"is_draft": is_draft "is_draft": is_draft,
"keyword": keyword
} }
) )
@@ -57,6 +63,7 @@ class AppLogService:
app_id=app_id, app_id=app_id,
workspace_id=workspace_id, workspace_id=workspace_id,
is_draft=is_draft, is_draft=is_draft,
keyword=keyword,
page=page, page=page,
pagesize=pagesize pagesize=pagesize
) )
@@ -77,9 +84,9 @@ class AppLogService:
app_id: uuid.UUID, app_id: uuid.UUID,
conversation_id: uuid.UUID, conversation_id: uuid.UUID,
workspace_id: uuid.UUID workspace_id: uuid.UUID
) -> Conversation: ) -> Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]:
""" """
查询会话详情(包含消息) 查询会话详情(包含消息和工作流节点执行记录
Args: Args:
app_id: 应用 ID app_id: 应用 ID
@@ -87,7 +94,8 @@ class AppLogService:
workspace_id: 工作空间 ID workspace_id: 工作空间 ID
Returns: Returns:
Conversation: 包含消息的会话对象 Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]:
(包含消息的会话对象, 按消息ID分组的节点执行记录)
Raises: Raises:
ResourceNotFoundException: 当会话不存在时 ResourceNotFoundException: 当会话不存在时
@@ -116,13 +124,117 @@ class AppLogService:
# 将消息附加到会话对象 # 将消息附加到会话对象
conversation.messages = messages conversation.messages = messages
# 查询工作流节点执行记录(按消息分组)
_, node_executions_map = self._get_workflow_node_executions_with_map(
conversation_id, messages
)
logger.info( logger.info(
"查询应用日志会话详情成功", "查询应用日志会话详情成功",
extra={ extra={
"app_id": str(app_id), "app_id": str(app_id),
"conversation_id": str(conversation_id), "conversation_id": str(conversation_id),
"message_count": len(messages) "message_count": len(messages),
"message_with_nodes_count": len(node_executions_map)
} }
) )
return conversation return conversation, node_executions_map
def _get_workflow_node_executions_with_map(
self,
conversation_id: uuid.UUID,
messages: list[Message]
) -> Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]:
"""
从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组
Args:
conversation_id: 会话 ID
messages: 消息列表
Returns:
Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]:
(所有节点执行记录列表, 按 message_id 分组的节点执行记录字典)
"""
node_executions = []
node_executions_map: dict[str, list[AppLogNodeExecution]] = {}
# 查询该会话关联的所有工作流执行记录(按时间正序)
stmt = select(WorkflowExecution).where(
WorkflowExecution.conversation_id == conversation_id,
WorkflowExecution.status == "completed"
).order_by(WorkflowExecution.started_at.asc())
executions = self.db.scalars(stmt).all()
logger.info(
f"查询到 {len(executions)} 条工作流执行记录",
extra={
"conversation_id": str(conversation_id),
"execution_count": len(executions),
"execution_ids": [str(e.id) for e in executions]
}
)
# 筛选出 workflow 执行产生的 assistant 消息(排除开场白)
# workflow 结果的 meta_data 包含 usage而开场白包含 suggested_questions
assistant_messages = [
m for m in messages
if m.role == "assistant" and m.meta_data and "usage" in m.meta_data
]
# 通过时序匹配,将 execution 和 assistant message 关联
used_message_ids: set[str] = set()
for execution in executions:
if not execution.output_data:
continue
# 找到该 execution 对应的 assistant message
# 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message
best_msg = None
best_dt = None
for msg in assistant_messages:
msg_id_str = str(msg.id)
if msg_id_str in used_message_ids:
continue
if msg.created_at and msg.created_at >= execution.started_at:
dt = (msg.created_at - execution.started_at).total_seconds()
if best_dt is None or dt < best_dt:
best_dt = dt
best_msg = msg
if not best_msg:
continue
msg_id_str = str(best_msg.id)
used_message_ids.add(msg_id_str)
# 提取节点输出
output_data = execution.output_data
if isinstance(output_data, dict):
node_outputs = output_data.get("node_outputs", {})
execution_nodes = []
for node_id, node_data in node_outputs.items():
if not isinstance(node_data, dict):
continue
node_execution = AppLogNodeExecution(
node_id=node_data.get("node_id", node_id),
node_type=node_data.get("node_type", "unknown"),
node_name=node_data.get("node_name"),
status=node_data.get("status", "unknown"),
error=node_data.get("error"),
input=node_data.get("input"),
process=node_data.get("process"),
output=node_data.get("output"),
elapsed_time=node_data.get("elapsed_time"),
token_usage=node_data.get("token_usage"),
)
node_executions.append(node_execution)
execution_nodes.append(node_execution)
# 将节点记录关联到 message_id
node_executions_map[msg_id_str] = execution_nodes
return node_executions, node_executions_map

View File

@@ -14,6 +14,7 @@ from app.core.exceptions import BusinessException
from app.core.workflow.adapters.base_adapter import WorkflowImportResult, WorkflowParserResult from app.core.workflow.adapters.base_adapter import WorkflowImportResult, WorkflowParserResult
from app.core.workflow.adapters.errors import UnsupportedPlatform, InvalidConfiguration from app.core.workflow.adapters.errors import UnsupportedPlatform, InvalidConfiguration
from app.core.workflow.adapters.registry import PlatformAdapterRegistry from app.core.workflow.adapters.registry import PlatformAdapterRegistry
from app.models.app_model import AppType
from app.schemas import AppCreate from app.schemas import AppCreate
from app.schemas.workflow_schema import WorkflowConfigCreate from app.schemas.workflow_schema import WorkflowConfigCreate
from app.services.app_service import AppService from app.services.app_service import AppService
@@ -86,11 +87,12 @@ class WorkflowImportService:
if config is None: if config is None:
raise BusinessException("Configuration import timed out. Please try again.") raise BusinessException("Configuration import timed out. Please try again.")
config = json.loads(config) config = json.loads(config)
unique_name = self.app_service._unique_app_name(name, workspace_id, AppType.WORKFLOW)
app = self.app_service.create_app( app = self.app_service.create_app(
user_id=user_id, user_id=user_id,
workspace_id=workspace_id, workspace_id=workspace_id,
data=AppCreate( data=AppCreate(
name=name, name=unique_name,
description=description, description=description,
type="workflow", type="workflow",
workflow_config=WorkflowConfigCreate( workflow_config=WorkflowConfigCreate(