From 404ce9f9ba2dd04029126d6c51902812787bbace Mon Sep 17 00:00:00 2001 From: wwq Date: Thu, 23 Apr 2026 15:46:12 +0800 Subject: [PATCH] feat(workflow): enhance HTTP request node with curl debugging support - Augment HTTP request node capabilities and add generated curl commands for easier debugging. feat(log): implement workflow execution logs and search functionality - Add detailed logging for workflow node execution and enable search capabilities within application logs. feat(auth): introduce middleware to verify application publication status - Add a check to ensure the application is published before allowing access. fix(converter): rectify variable handling logic in Dify converter - Correct issues related to processing variables within the Dify converter module. refactor(model): remove quota check decorator from model update operations - Decouple quota validation from the model update process to streamline the logic. --- api/app/controllers/app_log_controller.py | 10 +- api/app/controllers/model_controller.py | 1 - api/app/core/api_key_auth.py | 2 + api/app/core/error_codes.py | 1 + .../core/workflow/adapters/dify/converter.py | 20 ++- .../workflow/nodes/http_request/config.py | 15 +++ .../core/workflow/nodes/http_request/node.py | 106 +++++++++++++-- api/app/dependencies.py | 1 + .../repositories/conversation_repository.py | 27 ++-- api/app/schemas/app_log_schema.py | 14 ++ api/app/services/api_key_service.py | 12 ++ api/app/services/app_log_service.py | 127 ++++++++++++++++-- api/app/services/workflow_import_service.py | 4 +- 13 files changed, 305 insertions(+), 35 deletions(-) diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py index 92b5becd..7732dd37 100644 --- a/api/app/controllers/app_log_controller.py +++ b/api/app/controllers/app_log_controller.py @@ -24,13 +24,15 @@ def list_app_logs( app_id: uuid.UUID, page: int = Query(1, ge=1), pagesize: int = Query(20, ge=1, le=100), - is_draft: Optional[bool] = None, + is_draft: bool = Query(False, description="是否草稿会话(默认false,即发布会话)"), + keyword: Optional[str] = Query(None, description="搜索关键词(匹配消息内容)"), db: Session = Depends(get_db), current_user=Depends(get_current_user), ): """查看应用下所有会话记录(分页) - 支持按 is_draft 筛选(草稿会话 / 发布会话) + - 支持按 keyword 搜索(匹配消息内容) - 按最新更新时间倒序排列 - 所有人(包括共享者和被共享者)都只能查看自己的会话记录 """ @@ -47,7 +49,8 @@ def list_app_logs( workspace_id=workspace_id, page=page, pagesize=pagesize, - is_draft=is_draft + is_draft=is_draft, + keyword=keyword ) items = [AppLogConversation.model_validate(c) for c in conversations] @@ -78,12 +81,13 @@ def get_app_log_detail( # 使用 Service 层查询 log_service = AppLogService(db) - conversation = log_service.get_conversation_detail( + conversation, node_executions_map = log_service.get_conversation_detail( app_id=app_id, conversation_id=conversation_id, workspace_id=workspace_id ) detail = AppLogConversationDetail.model_validate(conversation) + detail.node_executions_map = node_executions_map return success(data=detail) diff --git a/api/app/controllers/model_controller.py b/api/app/controllers/model_controller.py index 57c22337..4958152b 100644 --- a/api/app/controllers/model_controller.py +++ b/api/app/controllers/model_controller.py @@ -373,7 +373,6 @@ def delete_composite_model( @router.put("/{model_id}", response_model=ApiResponse) -@check_model_activation_quota def update_model( model_id: uuid.UUID, model_data: model_schema.ModelConfigUpdate, diff --git a/api/app/core/api_key_auth.py b/api/app/core/api_key_auth.py index 05bca945..448a0f26 100644 --- a/api/app/core/api_key_auth.py +++ b/api/app/core/api_key_auth.py @@ -70,6 +70,8 @@ def require_api_key( }) raise BusinessException("API Key 无效或已过期", BizCode.API_KEY_INVALID) + ApiKeyAuthService.check_app_published(db, api_key_obj) + if scopes: missing_scopes = [] for scope in scopes: diff --git a/api/app/core/error_codes.py b/api/app/core/error_codes.py index 77bce6b4..2917a203 100644 --- a/api/app/core/error_codes.py +++ b/api/app/core/error_codes.py @@ -66,6 +66,7 @@ class BizCode(IntEnum): PERMISSION_DENIED = 6010 INVALID_CONVERSATION = 6011 CONFIG_MISSING = 6012 + APP_NOT_PUBLISHED = 6013 # 模型(7xxx) MODEL_CONFIG_INVALID = 7001 diff --git a/api/app/core/workflow/adapters/dify/converter.py b/api/app/core/workflow/adapters/dify/converter.py index ad9312e1..b61c2118 100644 --- a/api/app/core/workflow/adapters/dify/converter.py +++ b/api/app/core/workflow/adapters/dify/converter.py @@ -155,8 +155,13 @@ class DifyConverter(BaseConverter): def replacer(match: re.Match) -> str: raw_name = match.group(1) - new_name = self.process_var_selector(raw_name) - return f"{{{{{new_name}}}}}" + try: + new_name = self.process_var_selector(raw_name) + if not new_name: + return match.group(0) + return f"{{{{{new_name}}}}}" + except Exception: + return match.group(0) return pattern.sub(replacer, content) @@ -600,8 +605,15 @@ class DifyConverter(BaseConverter): ] = self.trans_variable_format(content["value"]) else: if node_data["body"]["data"]: - body_content = (node_data["body"]["data"][0].get("value") or - self._process_list_variable_literal(node_data["body"]["data"][0].get("file"))) + data_entry = node_data["body"]["data"][0] + body_content = data_entry.get("value") + if not body_content and data_entry.get("file"): + body_content = self._process_list_variable_literal(data_entry.get("file")) + if not body_content: + body_content = "" + elif isinstance(body_content, str): + # Convert session variable format for JSON body + body_content = self.trans_variable_format(body_content) else: body_content = "" diff --git a/api/app/core/workflow/nodes/http_request/config.py b/api/app/core/workflow/nodes/http_request/config.py index 72474436..527a80ff 100644 --- a/api/app/core/workflow/nodes/http_request/config.py +++ b/api/app/core/workflow/nodes/http_request/config.py @@ -132,11 +132,21 @@ class HttpErrorDefaultTemplate(BaseModel): description="Default HTTP headers returned on error", ) + files: list = Field( + default_factory=list, + description="Default files list returned on error", + ) + output: str = Field( default="SUCCESS", description="HTTP response body", ) + curl: str = Field( + default="", + description="Default curl command returned on error", + ) + class HttpErrorHandleConfig(BaseModel): method: HttpErrorHandle = Field( @@ -272,6 +282,11 @@ class HttpRequestNodeOutput(BaseModel): description="HTTP response body", ) + curl: str = Field( + default="", + description="Equivalent curl command for the HTTP request", + ) + # files: list[File] = Field( # ... # ) diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py index 783c230b..f08e0bc1 100644 --- a/api/app/core/workflow/nodes/http_request/node.py +++ b/api/app/core/workflow/nodes/http_request/node.py @@ -167,7 +167,8 @@ class HttpRequestNode(BaseNode): "status_code": VariableType.NUMBER, "headers": VariableType.OBJECT, "files": VariableType.ARRAY_FILE, - "output": VariableType.STRING + "output": VariableType.STRING, + "curl": VariableType.STRING } def _build_timeout(self) -> Timeout: @@ -255,9 +256,13 @@ class HttpRequestNode(BaseNode): case HttpContentType.NONE: return {} case HttpContentType.JSON: - content["json"] = json.loads(self._render_template( + rendered_body = self._render_template( self.typed_config.body.data, variable_pool - )) + ).strip() + if not rendered_body: + content["json"] = {} + else: + content["json"] = json.loads(rendered_body) case HttpContentType.FROM_DATA: data = {} files = [] @@ -325,6 +330,65 @@ class HttpRequestNode(BaseNode): case _: raise RuntimeError(f"HttpRequest method not supported: {self.typed_config.method}") + def _generate_curl_command( + self, + variable_pool: VariablePool, + url: str, + headers: dict[str, str], + params: dict[str, str], + content: dict[str, Any] + ) -> str: + """ + Generate equivalent curl command for debugging. + + Args: + variable_pool: Variable Pool + url: Rendered URL + headers: Request headers + params: Query parameters + content: Request body content + + Returns: + Curl command string + """ + # Start with curl command + curl_parts = ["curl"] + + # Add HTTP method + method = self.typed_config.method.value + if method != "GET": + curl_parts.append(f"-X {method}") + + # Add URL with query parameters + if params: + param_str = "&".join([f"{k}={v}" for k, v in params.items()]) + full_url = f"{url}?{param_str}" if "?" not in url else f"{url}&{param_str}" + else: + full_url = url + + curl_parts.append(f"'{full_url}'") + + # Add headers + for key, value in headers.items(): + curl_parts.append(f"-H '{key}: {value}'") + + # Add body based on content type + if "json" in content: + json_body = json.dumps(content["json"], ensure_ascii=False) + curl_parts.append(f"-d '{json_body}'") + elif "data" in content and "files" not in content: + # Form data + if isinstance(content["data"], dict): + for key, value in content["data"].items(): + curl_parts.append(f"-F '{key}={value}'") + elif "content" in content: + # Raw content + curl_parts.append(f"-d '{content['content']}'") + elif "files" in content: + curl_parts.append("# Note: This request includes file uploads") + + return " \\\n ".join(curl_parts) + async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> dict | str: """ Execute the HTTP request node. @@ -343,11 +407,22 @@ class HttpRequestNode(BaseNode): - str: Branch identifier (e.g. "ERROR") when branching is enabled """ self.typed_config = HttpRequestNodeConfig(**self.config) + + # Build request components + headers = self._build_header(variable_pool) | self._build_auth(variable_pool) + params = self._build_params(variable_pool) + content = await self._build_content(variable_pool) + url = self._render_template(self.typed_config.url, variable_pool) + + # Generate curl command for debugging + curl_command = self._generate_curl_command(variable_pool, url, headers, params, content) + logger.info(f"Node {self.node_id}: Generated curl command:\n{curl_command}") + async with httpx.AsyncClient( verify=self.typed_config.verify_ssl, timeout=self._build_timeout(), - headers=self._build_header(variable_pool) | self._build_auth(variable_pool), - params=self._build_params(variable_pool), + headers=headers, + params=params, follow_redirects=True ) as client: retries = self.typed_config.retry.max_attempts @@ -355,8 +430,8 @@ class HttpRequestNode(BaseNode): try: request_func = self._get_client_method(client) resp = await request_func( - url=self._render_template(self.typed_config.url, variable_pool), - **(await self._build_content(variable_pool)) + url=url, + **content ) resp.raise_for_status() logger.info(f"Node {self.node_id}: HTTP request succeeded") @@ -365,7 +440,8 @@ class HttpRequestNode(BaseNode): body=response.body, status_code=resp.status_code, headers=resp.headers, - files=response.files + files=response.files, + curl=curl_command ).model_dump() except (httpx.HTTPStatusError, httpx.RequestError) as e: logger.error(f"HTTP request node exception: {e}") @@ -382,10 +458,20 @@ class HttpRequestNode(BaseNode): logger.warning( f"Node {self.node_id}: HTTP request failed, returning default result" ) - return self.typed_config.error_handle.default.model_dump() + # Update curl command in default error template + error_result = self.typed_config.error_handle.default.model_dump() + error_result["curl"] = curl_command + return error_result case HttpErrorHandle.BRANCH: logger.warning( f"Node {self.node_id}: HTTP request failed, switching to error handling branch" ) - return {"output": "ERROR"} + return { + "output": "ERROR", + "body": "", + "status_code": 500, + "headers": {}, + "files": [], + "curl": curl_command + } raise RuntimeError("http request failed") diff --git a/api/app/dependencies.py b/api/app/dependencies.py index 10684788..e5b656a5 100644 --- a/api/app/dependencies.py +++ b/api/app/dependencies.py @@ -564,6 +564,7 @@ async def get_app_or_workspace( if not app: auth_logger.warning(f"App not found for API Key: {api_key_obj.resource_id}") raise credentials_exception + ApiKeyAuthService.check_app_published(db, api_key_obj) auth_logger.info(f"App access granted: {app.id}") return app diff --git a/api/app/repositories/conversation_repository.py b/api/app/repositories/conversation_repository.py index 0676a255..2b4e7ab7 100644 --- a/api/app/repositories/conversation_repository.py +++ b/api/app/repositories/conversation_repository.py @@ -203,7 +203,8 @@ class ConversationRepository: self, app_id: uuid.UUID, workspace_id: uuid.UUID, - is_draft: Optional[bool] = None, + is_draft: bool = False, + keyword: Optional[str] = None, page: int = 1, pagesize: int = 20 ) -> tuple[list[Conversation], int]: @@ -213,29 +214,38 @@ class ConversationRepository: Args: app_id: 应用 ID workspace_id: 工作空间 ID - is_draft: 是否草稿会话(None 表示不过滤) + is_draft: 是否草稿会话(默认False,即发布会话) + keyword: 搜索关键词(匹配消息内容) page: 页码(从 1 开始) pagesize: 每页数量 Returns: Tuple[List[Conversation], int]: (会话列表,总数) """ - stmt = select(Conversation).where( + base_stmt = select(Conversation).where( Conversation.app_id == app_id, Conversation.workspace_id == workspace_id, - Conversation.is_active.is_(True) + Conversation.is_active.is_(True), + Conversation.is_draft == is_draft ) - if is_draft is not None: - stmt = stmt.where(Conversation.is_draft == is_draft) + # 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation + if keyword: + # 查找包含关键词的 conversation_id 列表 + keyword_stmt = ( + select(Message.conversation_id) + .where(Message.content.ilike(f"%{keyword}%")) + .distinct() + ) + base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt)) # Calculate total number of records total = int(self.db.execute( - select(func.count()).select_from(stmt.subquery()) + select(func.count()).select_from(base_stmt.subquery()) ).scalar_one()) # Apply pagination - stmt = stmt.order_by(desc(Conversation.updated_at)) + stmt = base_stmt.order_by(desc(Conversation.updated_at)) stmt = stmt.offset((page - 1) * pagesize).limit(pagesize) conversations = list(self.db.scalars(stmt).all()) @@ -245,6 +255,7 @@ class ConversationRepository: extra={ "app_id": str(app_id), "workspace_id": str(workspace_id), + "keyword": keyword, "returned": len(conversations), "total": total } diff --git a/api/app/schemas/app_log_schema.py b/api/app/schemas/app_log_schema.py index bda78138..0a1855f7 100644 --- a/api/app/schemas/app_log_schema.py +++ b/api/app/schemas/app_log_schema.py @@ -48,6 +48,20 @@ class AppLogConversation(BaseModel): return int(dt.timestamp() * 1000) if dt else None +class AppLogNodeExecution(BaseModel): + """工作流节点执行记录""" + node_id: str + node_type: str + node_name: Optional[str] = None + status: str = "pending" + error: Optional[str] = None + input: Optional[Any] = None + output: Optional[Any] = None + elapsed_time: Optional[float] = None + token_usage: Optional[Dict[str, Any]] = None + + class AppLogConversationDetail(AppLogConversation): """会话详情(包含消息列表)""" messages: List[AppLogMessage] = Field(default_factory=list) + node_executions_map: Dict[str, List[AppLogNodeExecution]] = Field(default_factory=dict, description="按消息ID分组的节点执行记录") diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index 4856365a..49b07121 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -19,6 +19,7 @@ from app.core.exceptions import ( ) from app.core.error_codes import BizCode from app.core.logging_config import get_business_logger +from app.models.app_model import App logger = get_business_logger() @@ -442,6 +443,17 @@ class ApiKeyAuthService: return api_key_obj + @staticmethod + def check_app_published(db: Session, api_key_obj: ApiKey) -> None: + """ + 检查应用是否已发布,未发布则抛出异常 + """ + if not api_key_obj.resource_id: + return + app = db.get(App, api_key_obj.resource_id) + if not app or not app.current_release_id: + raise BusinessException("应用未发布,不可用", BizCode.APP_NOT_PUBLISHED) + @staticmethod def check_scope(api_key: ApiKey, required_scope: str) -> bool: """检查权限范围""" diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index 856045d1..8f6752ef 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -3,11 +3,14 @@ import uuid from typing import Optional, Tuple from datetime import datetime +from sqlalchemy import select from sqlalchemy.orm import Session from app.core.logging_config import get_business_logger from app.models.conversation_model import Conversation, Message +from app.models.workflow_model import WorkflowExecution from app.repositories.conversation_repository import ConversationRepository, MessageRepository +from app.schemas.app_log_schema import AppLogNodeExecution logger = get_business_logger() @@ -26,7 +29,8 @@ class AppLogService: workspace_id: uuid.UUID, page: int = 1, pagesize: int = 20, - is_draft: Optional[bool] = None, + is_draft: bool = False, + keyword: Optional[str] = None, ) -> Tuple[list[Conversation], int]: """ 查询应用日志会话列表 @@ -36,7 +40,8 @@ class AppLogService: workspace_id: 工作空间 ID page: 页码(从 1 开始) pagesize: 每页数量 - is_draft: 是否草稿会话(None 表示不过滤) + is_draft: 是否草稿会话(默认False,即发布会话) + keyword: 搜索关键词(匹配消息内容) Returns: Tuple[list[Conversation], int]: (会话列表,总数) @@ -48,7 +53,8 @@ class AppLogService: "workspace_id": str(workspace_id), "page": page, "pagesize": pagesize, - "is_draft": is_draft + "is_draft": is_draft, + "keyword": keyword } ) @@ -57,6 +63,7 @@ class AppLogService: app_id=app_id, workspace_id=workspace_id, is_draft=is_draft, + keyword=keyword, page=page, pagesize=pagesize ) @@ -77,9 +84,9 @@ class AppLogService: app_id: uuid.UUID, conversation_id: uuid.UUID, workspace_id: uuid.UUID - ) -> Conversation: + ) -> Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: """ - 查询会话详情(包含消息) + 查询会话详情(包含消息和工作流节点执行记录) Args: app_id: 应用 ID @@ -87,7 +94,8 @@ class AppLogService: workspace_id: 工作空间 ID Returns: - Conversation: 包含消息的会话对象 + Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: + (包含消息的会话对象, 按消息ID分组的节点执行记录) Raises: ResourceNotFoundException: 当会话不存在时 @@ -116,13 +124,116 @@ class AppLogService: # 将消息附加到会话对象 conversation.messages = messages + # 查询工作流节点执行记录(按消息分组) + _, node_executions_map = self._get_workflow_node_executions_with_map( + conversation_id, messages + ) + logger.info( "查询应用日志会话详情成功", extra={ "app_id": str(app_id), "conversation_id": str(conversation_id), - "message_count": len(messages) + "message_count": len(messages), + "message_with_nodes_count": len(node_executions_map) } ) - return conversation + return conversation, node_executions_map + + def _get_workflow_node_executions_with_map( + self, + conversation_id: uuid.UUID, + messages: list[Message] + ) -> Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: + """ + 从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组 + + Args: + conversation_id: 会话 ID + messages: 消息列表 + + Returns: + Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: + (所有节点执行记录列表, 按 message_id 分组的节点执行记录字典) + """ + node_executions = [] + node_executions_map: dict[str, list[AppLogNodeExecution]] = {} + + # 查询该会话关联的所有工作流执行记录(按时间正序) + stmt = select(WorkflowExecution).where( + WorkflowExecution.conversation_id == conversation_id, + WorkflowExecution.status == "completed" + ).order_by(WorkflowExecution.started_at.asc()) + + executions = self.db.scalars(stmt).all() + + logger.info( + f"查询到 {len(executions)} 条工作流执行记录", + extra={ + "conversation_id": str(conversation_id), + "execution_count": len(executions), + "execution_ids": [str(e.id) for e in executions] + } + ) + + # 筛选出 workflow 执行产生的 assistant 消息(排除开场白) + # workflow 结果的 meta_data 包含 usage,而开场白包含 suggested_questions + assistant_messages = [ + m for m in messages + if m.role == "assistant" and m.meta_data and "usage" in m.meta_data + ] + + # 通过时序匹配,将 execution 和 assistant message 关联 + used_message_ids: set[str] = set() + + for execution in executions: + if not execution.output_data: + continue + + # 找到该 execution 对应的 assistant message + # 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message + best_msg = None + best_dt = None + for msg in assistant_messages: + msg_id_str = str(msg.id) + if msg_id_str in used_message_ids: + continue + if msg.created_at and msg.created_at >= execution.started_at: + dt = (msg.created_at - execution.started_at).total_seconds() + if best_dt is None or dt < best_dt: + best_dt = dt + best_msg = msg + + if not best_msg: + continue + + msg_id_str = str(best_msg.id) + used_message_ids.add(msg_id_str) + + # 提取节点输出 + output_data = execution.output_data + if isinstance(output_data, dict): + node_outputs = output_data.get("node_outputs", {}) + execution_nodes = [] + for node_id, node_data in node_outputs.items(): + if not isinstance(node_data, dict): + continue + node_execution = AppLogNodeExecution( + node_id=node_data.get("node_id", node_id), + node_type=node_data.get("node_type", "unknown"), + node_name=node_data.get("node_name"), + status=node_data.get("status", "unknown"), + error=node_data.get("error"), + input=node_data.get("input"), + output=node_data.get("output"), + elapsed_time=node_data.get("elapsed_time"), + token_usage=node_data.get("token_usage"), + ) + node_executions.append(node_execution) + execution_nodes.append(node_execution) + + # 将节点记录关联到 message_id + node_executions_map[msg_id_str] = execution_nodes + + return node_executions, node_executions_map diff --git a/api/app/services/workflow_import_service.py b/api/app/services/workflow_import_service.py index 5a766a72..0c543d1f 100644 --- a/api/app/services/workflow_import_service.py +++ b/api/app/services/workflow_import_service.py @@ -14,6 +14,7 @@ from app.core.exceptions import BusinessException from app.core.workflow.adapters.base_adapter import WorkflowImportResult, WorkflowParserResult from app.core.workflow.adapters.errors import UnsupportedPlatform, InvalidConfiguration from app.core.workflow.adapters.registry import PlatformAdapterRegistry +from app.models.app_model import AppType from app.schemas import AppCreate from app.schemas.workflow_schema import WorkflowConfigCreate from app.services.app_service import AppService @@ -86,11 +87,12 @@ class WorkflowImportService: if config is None: raise BusinessException("Configuration import timed out. Please try again.") config = json.loads(config) + unique_name = self.app_service._unique_app_name(name, workspace_id, AppType.WORKFLOW) app = self.app_service.create_app( user_id=user_id, workspace_id=workspace_id, data=AppCreate( - name=name, + name=unique_name, description=description, type="workflow", workflow_config=WorkflowConfigCreate(