From 404ce9f9ba2dd04029126d6c51902812787bbace Mon Sep 17 00:00:00 2001 From: wwq Date: Thu, 23 Apr 2026 15:46:12 +0800 Subject: [PATCH 1/5] feat(workflow): enhance HTTP request node with curl debugging support - Augment HTTP request node capabilities and add generated curl commands for easier debugging. feat(log): implement workflow execution logs and search functionality - Add detailed logging for workflow node execution and enable search capabilities within application logs. feat(auth): introduce middleware to verify application publication status - Add a check to ensure the application is published before allowing access. fix(converter): rectify variable handling logic in Dify converter - Correct issues related to processing variables within the Dify converter module. refactor(model): remove quota check decorator from model update operations - Decouple quota validation from the model update process to streamline the logic. --- api/app/controllers/app_log_controller.py | 10 +- api/app/controllers/model_controller.py | 1 - api/app/core/api_key_auth.py | 2 + api/app/core/error_codes.py | 1 + .../core/workflow/adapters/dify/converter.py | 20 ++- .../workflow/nodes/http_request/config.py | 15 +++ .../core/workflow/nodes/http_request/node.py | 106 +++++++++++++-- api/app/dependencies.py | 1 + .../repositories/conversation_repository.py | 27 ++-- api/app/schemas/app_log_schema.py | 14 ++ api/app/services/api_key_service.py | 12 ++ api/app/services/app_log_service.py | 127 ++++++++++++++++-- api/app/services/workflow_import_service.py | 4 +- 13 files changed, 305 insertions(+), 35 deletions(-) diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py index 92b5becd..7732dd37 100644 --- a/api/app/controllers/app_log_controller.py +++ b/api/app/controllers/app_log_controller.py @@ -24,13 +24,15 @@ def list_app_logs( app_id: uuid.UUID, page: int = Query(1, ge=1), pagesize: int = Query(20, ge=1, le=100), - is_draft: Optional[bool] = None, + is_draft: bool = Query(False, description="是否草稿会话(默认false,即发布会话)"), + keyword: Optional[str] = Query(None, description="搜索关键词(匹配消息内容)"), db: Session = Depends(get_db), current_user=Depends(get_current_user), ): """查看应用下所有会话记录(分页) - 支持按 is_draft 筛选(草稿会话 / 发布会话) + - 支持按 keyword 搜索(匹配消息内容) - 按最新更新时间倒序排列 - 所有人(包括共享者和被共享者)都只能查看自己的会话记录 """ @@ -47,7 +49,8 @@ def list_app_logs( workspace_id=workspace_id, page=page, pagesize=pagesize, - is_draft=is_draft + is_draft=is_draft, + keyword=keyword ) items = [AppLogConversation.model_validate(c) for c in conversations] @@ -78,12 +81,13 @@ def get_app_log_detail( # 使用 Service 层查询 log_service = AppLogService(db) - conversation = log_service.get_conversation_detail( + conversation, node_executions_map = log_service.get_conversation_detail( app_id=app_id, conversation_id=conversation_id, workspace_id=workspace_id ) detail = AppLogConversationDetail.model_validate(conversation) + detail.node_executions_map = node_executions_map return success(data=detail) diff --git a/api/app/controllers/model_controller.py b/api/app/controllers/model_controller.py index 57c22337..4958152b 100644 --- a/api/app/controllers/model_controller.py +++ b/api/app/controllers/model_controller.py @@ -373,7 +373,6 @@ def delete_composite_model( @router.put("/{model_id}", response_model=ApiResponse) -@check_model_activation_quota def update_model( model_id: uuid.UUID, model_data: model_schema.ModelConfigUpdate, diff --git a/api/app/core/api_key_auth.py b/api/app/core/api_key_auth.py index 05bca945..448a0f26 100644 --- a/api/app/core/api_key_auth.py +++ b/api/app/core/api_key_auth.py @@ -70,6 +70,8 @@ def require_api_key( }) raise BusinessException("API Key 无效或已过期", BizCode.API_KEY_INVALID) + ApiKeyAuthService.check_app_published(db, api_key_obj) + if scopes: missing_scopes = [] for scope in scopes: diff --git a/api/app/core/error_codes.py b/api/app/core/error_codes.py index 77bce6b4..2917a203 100644 --- a/api/app/core/error_codes.py +++ b/api/app/core/error_codes.py @@ -66,6 +66,7 @@ class BizCode(IntEnum): PERMISSION_DENIED = 6010 INVALID_CONVERSATION = 6011 CONFIG_MISSING = 6012 + APP_NOT_PUBLISHED = 6013 # 模型(7xxx) MODEL_CONFIG_INVALID = 7001 diff --git a/api/app/core/workflow/adapters/dify/converter.py b/api/app/core/workflow/adapters/dify/converter.py index ad9312e1..b61c2118 100644 --- a/api/app/core/workflow/adapters/dify/converter.py +++ b/api/app/core/workflow/adapters/dify/converter.py @@ -155,8 +155,13 @@ class DifyConverter(BaseConverter): def replacer(match: re.Match) -> str: raw_name = match.group(1) - new_name = self.process_var_selector(raw_name) - return f"{{{{{new_name}}}}}" + try: + new_name = self.process_var_selector(raw_name) + if not new_name: + return match.group(0) + return f"{{{{{new_name}}}}}" + except Exception: + return match.group(0) return pattern.sub(replacer, content) @@ -600,8 +605,15 @@ class DifyConverter(BaseConverter): ] = self.trans_variable_format(content["value"]) else: if node_data["body"]["data"]: - body_content = (node_data["body"]["data"][0].get("value") or - self._process_list_variable_literal(node_data["body"]["data"][0].get("file"))) + data_entry = node_data["body"]["data"][0] + body_content = data_entry.get("value") + if not body_content and data_entry.get("file"): + body_content = self._process_list_variable_literal(data_entry.get("file")) + if not body_content: + body_content = "" + elif isinstance(body_content, str): + # Convert session variable format for JSON body + body_content = self.trans_variable_format(body_content) else: body_content = "" diff --git a/api/app/core/workflow/nodes/http_request/config.py b/api/app/core/workflow/nodes/http_request/config.py index 72474436..527a80ff 100644 --- a/api/app/core/workflow/nodes/http_request/config.py +++ b/api/app/core/workflow/nodes/http_request/config.py @@ -132,11 +132,21 @@ class HttpErrorDefaultTemplate(BaseModel): description="Default HTTP headers returned on error", ) + files: list = Field( + default_factory=list, + description="Default files list returned on error", + ) + output: str = Field( default="SUCCESS", description="HTTP response body", ) + curl: str = Field( + default="", + description="Default curl command returned on error", + ) + class HttpErrorHandleConfig(BaseModel): method: HttpErrorHandle = Field( @@ -272,6 +282,11 @@ class HttpRequestNodeOutput(BaseModel): description="HTTP response body", ) + curl: str = Field( + default="", + description="Equivalent curl command for the HTTP request", + ) + # files: list[File] = Field( # ... # ) diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py index 783c230b..f08e0bc1 100644 --- a/api/app/core/workflow/nodes/http_request/node.py +++ b/api/app/core/workflow/nodes/http_request/node.py @@ -167,7 +167,8 @@ class HttpRequestNode(BaseNode): "status_code": VariableType.NUMBER, "headers": VariableType.OBJECT, "files": VariableType.ARRAY_FILE, - "output": VariableType.STRING + "output": VariableType.STRING, + "curl": VariableType.STRING } def _build_timeout(self) -> Timeout: @@ -255,9 +256,13 @@ class HttpRequestNode(BaseNode): case HttpContentType.NONE: return {} case HttpContentType.JSON: - content["json"] = json.loads(self._render_template( + rendered_body = self._render_template( self.typed_config.body.data, variable_pool - )) + ).strip() + if not rendered_body: + content["json"] = {} + else: + content["json"] = json.loads(rendered_body) case HttpContentType.FROM_DATA: data = {} files = [] @@ -325,6 +330,65 @@ class HttpRequestNode(BaseNode): case _: raise RuntimeError(f"HttpRequest method not supported: {self.typed_config.method}") + def _generate_curl_command( + self, + variable_pool: VariablePool, + url: str, + headers: dict[str, str], + params: dict[str, str], + content: dict[str, Any] + ) -> str: + """ + Generate equivalent curl command for debugging. + + Args: + variable_pool: Variable Pool + url: Rendered URL + headers: Request headers + params: Query parameters + content: Request body content + + Returns: + Curl command string + """ + # Start with curl command + curl_parts = ["curl"] + + # Add HTTP method + method = self.typed_config.method.value + if method != "GET": + curl_parts.append(f"-X {method}") + + # Add URL with query parameters + if params: + param_str = "&".join([f"{k}={v}" for k, v in params.items()]) + full_url = f"{url}?{param_str}" if "?" not in url else f"{url}&{param_str}" + else: + full_url = url + + curl_parts.append(f"'{full_url}'") + + # Add headers + for key, value in headers.items(): + curl_parts.append(f"-H '{key}: {value}'") + + # Add body based on content type + if "json" in content: + json_body = json.dumps(content["json"], ensure_ascii=False) + curl_parts.append(f"-d '{json_body}'") + elif "data" in content and "files" not in content: + # Form data + if isinstance(content["data"], dict): + for key, value in content["data"].items(): + curl_parts.append(f"-F '{key}={value}'") + elif "content" in content: + # Raw content + curl_parts.append(f"-d '{content['content']}'") + elif "files" in content: + curl_parts.append("# Note: This request includes file uploads") + + return " \\\n ".join(curl_parts) + async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> dict | str: """ Execute the HTTP request node. @@ -343,11 +407,22 @@ class HttpRequestNode(BaseNode): - str: Branch identifier (e.g. "ERROR") when branching is enabled """ self.typed_config = HttpRequestNodeConfig(**self.config) + + # Build request components + headers = self._build_header(variable_pool) | self._build_auth(variable_pool) + params = self._build_params(variable_pool) + content = await self._build_content(variable_pool) + url = self._render_template(self.typed_config.url, variable_pool) + + # Generate curl command for debugging + curl_command = self._generate_curl_command(variable_pool, url, headers, params, content) + logger.info(f"Node {self.node_id}: Generated curl command:\n{curl_command}") + async with httpx.AsyncClient( verify=self.typed_config.verify_ssl, timeout=self._build_timeout(), - headers=self._build_header(variable_pool) | self._build_auth(variable_pool), - params=self._build_params(variable_pool), + headers=headers, + params=params, follow_redirects=True ) as client: retries = self.typed_config.retry.max_attempts @@ -355,8 +430,8 @@ class HttpRequestNode(BaseNode): try: request_func = self._get_client_method(client) resp = await request_func( - url=self._render_template(self.typed_config.url, variable_pool), - **(await self._build_content(variable_pool)) + url=url, + **content ) resp.raise_for_status() logger.info(f"Node {self.node_id}: HTTP request succeeded") @@ -365,7 +440,8 @@ class HttpRequestNode(BaseNode): body=response.body, status_code=resp.status_code, headers=resp.headers, - files=response.files + files=response.files, + curl=curl_command ).model_dump() except (httpx.HTTPStatusError, httpx.RequestError) as e: logger.error(f"HTTP request node exception: {e}") @@ -382,10 +458,20 @@ class HttpRequestNode(BaseNode): logger.warning( f"Node {self.node_id}: HTTP request failed, returning default result" ) - return self.typed_config.error_handle.default.model_dump() + # Update curl command in default error template + error_result = self.typed_config.error_handle.default.model_dump() + error_result["curl"] = curl_command + return error_result case HttpErrorHandle.BRANCH: logger.warning( f"Node {self.node_id}: HTTP request failed, switching to error handling branch" ) - return {"output": "ERROR"} + return { + "output": "ERROR", + "body": "", + "status_code": 500, + "headers": {}, + "files": [], + "curl": curl_command + } raise RuntimeError("http request failed") diff --git a/api/app/dependencies.py b/api/app/dependencies.py index 10684788..e5b656a5 100644 --- a/api/app/dependencies.py +++ b/api/app/dependencies.py @@ -564,6 +564,7 @@ async def get_app_or_workspace( if not app: auth_logger.warning(f"App not found for API Key: {api_key_obj.resource_id}") raise credentials_exception + ApiKeyAuthService.check_app_published(db, api_key_obj) auth_logger.info(f"App access granted: {app.id}") return app diff --git a/api/app/repositories/conversation_repository.py b/api/app/repositories/conversation_repository.py index 0676a255..2b4e7ab7 100644 --- a/api/app/repositories/conversation_repository.py +++ b/api/app/repositories/conversation_repository.py @@ -203,7 +203,8 @@ class ConversationRepository: self, app_id: uuid.UUID, workspace_id: uuid.UUID, - is_draft: Optional[bool] = None, + is_draft: bool = False, + keyword: Optional[str] = None, page: int = 1, pagesize: int = 20 ) -> tuple[list[Conversation], int]: @@ -213,29 +214,38 @@ class ConversationRepository: Args: app_id: 应用 ID workspace_id: 工作空间 ID - is_draft: 是否草稿会话(None 表示不过滤) + is_draft: 是否草稿会话(默认False,即发布会话) + keyword: 搜索关键词(匹配消息内容) page: 页码(从 1 开始) pagesize: 每页数量 Returns: Tuple[List[Conversation], int]: (会话列表,总数) """ - stmt = select(Conversation).where( + base_stmt = select(Conversation).where( Conversation.app_id == app_id, Conversation.workspace_id == workspace_id, - Conversation.is_active.is_(True) + Conversation.is_active.is_(True), + Conversation.is_draft == is_draft ) - if is_draft is not None: - stmt = stmt.where(Conversation.is_draft == is_draft) + # 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation + if keyword: + # 查找包含关键词的 conversation_id 列表 + keyword_stmt = ( + select(Message.conversation_id) + .where(Message.content.ilike(f"%{keyword}%")) + .distinct() + ) + base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt)) # Calculate total number of records total = int(self.db.execute( - select(func.count()).select_from(stmt.subquery()) + select(func.count()).select_from(base_stmt.subquery()) ).scalar_one()) # Apply pagination - stmt = stmt.order_by(desc(Conversation.updated_at)) + stmt = base_stmt.order_by(desc(Conversation.updated_at)) stmt = stmt.offset((page - 1) * pagesize).limit(pagesize) conversations = list(self.db.scalars(stmt).all()) @@ -245,6 +255,7 @@ class ConversationRepository: extra={ "app_id": str(app_id), "workspace_id": str(workspace_id), + "keyword": keyword, "returned": len(conversations), "total": total } diff --git a/api/app/schemas/app_log_schema.py b/api/app/schemas/app_log_schema.py index bda78138..0a1855f7 100644 --- a/api/app/schemas/app_log_schema.py +++ b/api/app/schemas/app_log_schema.py @@ -48,6 +48,20 @@ class AppLogConversation(BaseModel): return int(dt.timestamp() * 1000) if dt else None +class AppLogNodeExecution(BaseModel): + """工作流节点执行记录""" + node_id: str + node_type: str + node_name: Optional[str] = None + status: str = "pending" + error: Optional[str] = None + input: Optional[Any] = None + output: Optional[Any] = None + elapsed_time: Optional[float] = None + token_usage: Optional[Dict[str, Any]] = None + + class AppLogConversationDetail(AppLogConversation): """会话详情(包含消息列表)""" messages: List[AppLogMessage] = Field(default_factory=list) + node_executions_map: Dict[str, List[AppLogNodeExecution]] = Field(default_factory=dict, description="按消息ID分组的节点执行记录") diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index 4856365a..49b07121 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -19,6 +19,7 @@ from app.core.exceptions import ( ) from app.core.error_codes import BizCode from app.core.logging_config import get_business_logger +from app.models.app_model import App logger = get_business_logger() @@ -442,6 +443,17 @@ class ApiKeyAuthService: return api_key_obj + @staticmethod + def check_app_published(db: Session, api_key_obj: ApiKey) -> None: + """ + 检查应用是否已发布,未发布则抛出异常 + """ + if not api_key_obj.resource_id: + return + app = db.get(App, api_key_obj.resource_id) + if not app or not app.current_release_id: + raise BusinessException("应用未发布,不可用", BizCode.APP_NOT_PUBLISHED) + @staticmethod def check_scope(api_key: ApiKey, required_scope: str) -> bool: """检查权限范围""" diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index 856045d1..8f6752ef 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -3,11 +3,14 @@ import uuid from typing import Optional, Tuple from datetime import datetime +from sqlalchemy import select from sqlalchemy.orm import Session from app.core.logging_config import get_business_logger from app.models.conversation_model import Conversation, Message +from app.models.workflow_model import WorkflowExecution from app.repositories.conversation_repository import ConversationRepository, MessageRepository +from app.schemas.app_log_schema import AppLogNodeExecution logger = get_business_logger() @@ -26,7 +29,8 @@ class AppLogService: workspace_id: uuid.UUID, page: int = 1, pagesize: int = 20, - is_draft: Optional[bool] = None, + is_draft: bool = False, + keyword: Optional[str] = None, ) -> Tuple[list[Conversation], int]: """ 查询应用日志会话列表 @@ -36,7 +40,8 @@ class AppLogService: workspace_id: 工作空间 ID page: 页码(从 1 开始) pagesize: 每页数量 - is_draft: 是否草稿会话(None 表示不过滤) + is_draft: 是否草稿会话(默认False,即发布会话) + keyword: 搜索关键词(匹配消息内容) Returns: Tuple[list[Conversation], int]: (会话列表,总数) @@ -48,7 +53,8 @@ class AppLogService: "workspace_id": str(workspace_id), "page": page, "pagesize": pagesize, - "is_draft": is_draft + "is_draft": is_draft, + "keyword": keyword } ) @@ -57,6 +63,7 @@ class AppLogService: app_id=app_id, workspace_id=workspace_id, is_draft=is_draft, + keyword=keyword, page=page, pagesize=pagesize ) @@ -77,9 +84,9 @@ class AppLogService: app_id: uuid.UUID, conversation_id: uuid.UUID, workspace_id: uuid.UUID - ) -> Conversation: + ) -> Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: """ - 查询会话详情(包含消息) + 查询会话详情(包含消息和工作流节点执行记录) Args: app_id: 应用 ID @@ -87,7 +94,8 @@ class AppLogService: workspace_id: 工作空间 ID Returns: - Conversation: 包含消息的会话对象 + Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: + (包含消息的会话对象, 按消息ID分组的节点执行记录) Raises: ResourceNotFoundException: 当会话不存在时 @@ -116,13 +124,116 @@ class AppLogService: # 将消息附加到会话对象 conversation.messages = messages + # 查询工作流节点执行记录(按消息分组) + _, node_executions_map = self._get_workflow_node_executions_with_map( + conversation_id, messages + ) + logger.info( "查询应用日志会话详情成功", extra={ "app_id": str(app_id), "conversation_id": str(conversation_id), - "message_count": len(messages) + "message_count": len(messages), + "message_with_nodes_count": len(node_executions_map) } ) - return conversation + return conversation, node_executions_map + + def _get_workflow_node_executions_with_map( + self, + conversation_id: uuid.UUID, + messages: list[Message] + ) -> Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: + """ + 从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组 + + Args: + conversation_id: 会话 ID + messages: 消息列表 + + Returns: + Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: + (所有节点执行记录列表, 按 message_id 分组的节点执行记录字典) + """ + node_executions = [] + node_executions_map: dict[str, list[AppLogNodeExecution]] = {} + + # 查询该会话关联的所有工作流执行记录(按时间正序) + stmt = select(WorkflowExecution).where( + WorkflowExecution.conversation_id == conversation_id, + WorkflowExecution.status == "completed" + ).order_by(WorkflowExecution.started_at.asc()) + + executions = self.db.scalars(stmt).all() + + logger.info( + f"查询到 {len(executions)} 条工作流执行记录", + extra={ + "conversation_id": str(conversation_id), + "execution_count": len(executions), + "execution_ids": [str(e.id) for e in executions] + } + ) + + # 筛选出 workflow 执行产生的 assistant 消息(排除开场白) + # workflow 结果的 meta_data 包含 usage,而开场白包含 suggested_questions + assistant_messages = [ + m for m in messages + if m.role == "assistant" and m.meta_data and "usage" in m.meta_data + ] + + # 通过时序匹配,将 execution 和 assistant message 关联 + used_message_ids: set[str] = set() + + for execution in executions: + if not execution.output_data: + continue + + # 找到该 execution 对应的 assistant message + # 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message + best_msg = None + best_dt = None + for msg in assistant_messages: + msg_id_str = str(msg.id) + if msg_id_str in used_message_ids: + continue + if msg.created_at and msg.created_at >= execution.started_at: + dt = (msg.created_at - execution.started_at).total_seconds() + if best_dt is None or dt < best_dt: + best_dt = dt + best_msg = msg + + if not best_msg: + continue + + msg_id_str = str(best_msg.id) + used_message_ids.add(msg_id_str) + + # 提取节点输出 + output_data = execution.output_data + if isinstance(output_data, dict): + node_outputs = output_data.get("node_outputs", {}) + execution_nodes = [] + for node_id, node_data in node_outputs.items(): + if not isinstance(node_data, dict): + continue + node_execution = AppLogNodeExecution( + node_id=node_data.get("node_id", node_id), + node_type=node_data.get("node_type", "unknown"), + node_name=node_data.get("node_name"), + status=node_data.get("status", "unknown"), + error=node_data.get("error"), + input=node_data.get("input"), + output=node_data.get("output"), + elapsed_time=node_data.get("elapsed_time"), + token_usage=node_data.get("token_usage"), + ) + node_executions.append(node_execution) + execution_nodes.append(node_execution) + + # 将节点记录关联到 message_id + node_executions_map[msg_id_str] = execution_nodes + + return node_executions, node_executions_map diff --git a/api/app/services/workflow_import_service.py b/api/app/services/workflow_import_service.py index 5a766a72..0c543d1f 100644 --- a/api/app/services/workflow_import_service.py +++ b/api/app/services/workflow_import_service.py @@ -14,6 +14,7 @@ from app.core.exceptions import BusinessException from app.core.workflow.adapters.base_adapter import WorkflowImportResult, WorkflowParserResult from app.core.workflow.adapters.errors import UnsupportedPlatform, InvalidConfiguration from app.core.workflow.adapters.registry import PlatformAdapterRegistry +from app.models.app_model import AppType from app.schemas import AppCreate from app.schemas.workflow_schema import WorkflowConfigCreate from app.services.app_service import AppService @@ -86,11 +87,12 @@ class WorkflowImportService: if config is None: raise BusinessException("Configuration import timed out. Please try again.") config = json.loads(config) + unique_name = self.app_service._unique_app_name(name, workspace_id, AppType.WORKFLOW) app = self.app_service.create_app( user_id=user_id, workspace_id=workspace_id, data=AppCreate( - name=name, + name=unique_name, description=description, type="workflow", workflow_config=WorkflowConfigCreate( From 08a455f6b34f3383d1c2492753d35f07065732d9 Mon Sep 17 00:00:00 2001 From: wwq Date: Thu, 23 Apr 2026 18:20:05 +0800 Subject: [PATCH 2/5] feat(workflow): enhance HTTP request node with curl debugging support --- .../core/workflow/nodes/http_request/config.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/api/app/core/workflow/nodes/http_request/config.py b/api/app/core/workflow/nodes/http_request/config.py index 527a80ff..b854f460 100644 --- a/api/app/core/workflow/nodes/http_request/config.py +++ b/api/app/core/workflow/nodes/http_request/config.py @@ -132,22 +132,11 @@ class HttpErrorDefaultTemplate(BaseModel): description="Default HTTP headers returned on error", ) - files: list = Field( - default_factory=list, - description="Default files list returned on error", - ) - output: str = Field( default="SUCCESS", description="HTTP response body", ) - curl: str = Field( - default="", - description="Default curl command returned on error", - ) - - class HttpErrorHandleConfig(BaseModel): method: HttpErrorHandle = Field( default=HttpErrorHandle.NONE, @@ -282,11 +271,6 @@ class HttpRequestNodeOutput(BaseModel): description="HTTP response body", ) - curl: str = Field( - default="", - description="Equivalent curl command for the HTTP request", - ) - # files: list[File] = Field( # ... # ) From f6cf53f81c83488a39c956ae305c0adb752155f7 Mon Sep 17 00:00:00 2001 From: wwq Date: Thu, 23 Apr 2026 18:24:19 +0800 Subject: [PATCH 3/5] feat(workflow): enhance HTTP request node with curl debugging support --- .../workflow/nodes/http_request/config.py | 1 + .../core/workflow/nodes/http_request/node.py | 87 ++----------------- 2 files changed, 8 insertions(+), 80 deletions(-) diff --git a/api/app/core/workflow/nodes/http_request/config.py b/api/app/core/workflow/nodes/http_request/config.py index b854f460..72474436 100644 --- a/api/app/core/workflow/nodes/http_request/config.py +++ b/api/app/core/workflow/nodes/http_request/config.py @@ -137,6 +137,7 @@ class HttpErrorDefaultTemplate(BaseModel): description="HTTP response body", ) + class HttpErrorHandleConfig(BaseModel): method: HttpErrorHandle = Field( default=HttpErrorHandle.NONE, diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py index f08e0bc1..719ca626 100644 --- a/api/app/core/workflow/nodes/http_request/node.py +++ b/api/app/core/workflow/nodes/http_request/node.py @@ -167,8 +167,7 @@ class HttpRequestNode(BaseNode): "status_code": VariableType.NUMBER, "headers": VariableType.OBJECT, "files": VariableType.ARRAY_FILE, - "output": VariableType.STRING, - "curl": VariableType.STRING + "output": VariableType.STRING } def _build_timeout(self) -> Timeout: @@ -256,13 +255,9 @@ class HttpRequestNode(BaseNode): case HttpContentType.NONE: return {} case HttpContentType.JSON: - rendered_body = self._render_template( - self.typed_config.body.data, variable_pool - ).strip() - if not rendered_body: - content["json"] = {} - else: - content["json"] = json.loads(rendered_body) + content["json"] = json.loads(self._render_template( + )) + case HttpContentType.FROM_DATA: data = {} files = [] @@ -330,65 +325,6 @@ class HttpRequestNode(BaseNode): case _: raise RuntimeError(f"HttpRequest method not supported: {self.typed_config.method}") - def _generate_curl_command( - self, - variable_pool: VariablePool, - url: str, - headers: dict[str, str], - params: dict[str, str], - content: dict[str, Any] - ) -> str: - """ - Generate equivalent curl command for debugging. - - Args: - variable_pool: Variable Pool - url: Rendered URL - headers: Request headers - params: Query parameters - content: Request body content - - Returns: - Curl command string - """ - # Start with curl command - curl_parts = ["curl"] - - # Add HTTP method - method = self.typed_config.method.value - if method != "GET": - curl_parts.append(f"-X {method}") - - # Add URL with query parameters - if params: - param_str = "&".join([f"{k}={v}" for k, v in params.items()]) - full_url = f"{url}?{param_str}" if "?" not in url else f"{url}&{param_str}" - else: - full_url = url - - curl_parts.append(f"'{full_url}'") - - # Add headers - for key, value in headers.items(): - curl_parts.append(f"-H '{key}: {value}'") - - # Add body based on content type - if "json" in content: - json_body = json.dumps(content["json"], ensure_ascii=False) - curl_parts.append(f"-d '{json_body}'") - elif "data" in content and "files" not in content: - # Form data - if isinstance(content["data"], dict): - for key, value in content["data"].items(): - curl_parts.append(f"-F '{key}={value}'") - elif "content" in content: - # Raw content - curl_parts.append(f"-d '{content['content']}'") - elif "files" in content: - curl_parts.append("# Note: This request includes file uploads") - - return " \\\n ".join(curl_parts) - async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> dict | str: """ Execute the HTTP request node. @@ -407,17 +343,13 @@ class HttpRequestNode(BaseNode): - str: Branch identifier (e.g. "ERROR") when branching is enabled """ self.typed_config = HttpRequestNodeConfig(**self.config) - + # Build request components headers = self._build_header(variable_pool) | self._build_auth(variable_pool) params = self._build_params(variable_pool) content = await self._build_content(variable_pool) url = self._render_template(self.typed_config.url, variable_pool) - - # Generate curl command for debugging - curl_command = self._generate_curl_command(variable_pool, url, headers, params, content) - logger.info(f"Node {self.node_id}: Generated curl command:\n{curl_command}") - + async with httpx.AsyncClient( verify=self.typed_config.verify_ssl, timeout=self._build_timeout(), @@ -441,7 +373,6 @@ class HttpRequestNode(BaseNode): status_code=resp.status_code, headers=resp.headers, files=response.files, - curl=curl_command ).model_dump() except (httpx.HTTPStatusError, httpx.RequestError) as e: logger.error(f"HTTP request node exception: {e}") @@ -458,10 +389,7 @@ class HttpRequestNode(BaseNode): logger.warning( f"Node {self.node_id}: HTTP request failed, returning default result" ) - # Update curl command in default error template - error_result = self.typed_config.error_handle.default.model_dump() - error_result["curl"] = curl_command - return error_result + return self.typed_config.error_handle.default.model_dump() case HttpErrorHandle.BRANCH: logger.warning( f"Node {self.node_id}: HTTP request failed, switching to error handling branch" @@ -472,6 +400,5 @@ class HttpRequestNode(BaseNode): "status_code": 500, "headers": {}, "files": [], - "curl": curl_command } raise RuntimeError("http request failed") From 5f39d9a2086a091b5b8c4ce0bfd8e04e174f3470 Mon Sep 17 00:00:00 2001 From: wwq Date: Thu, 23 Apr 2026 18:26:49 +0800 Subject: [PATCH 4/5] feat(workflow): enhance HTTP request node with curl debugging support --- .../core/workflow/nodes/http_request/node.py | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py index 719ca626..f3367230 100644 --- a/api/app/core/workflow/nodes/http_request/node.py +++ b/api/app/core/workflow/nodes/http_request/node.py @@ -256,6 +256,7 @@ class HttpRequestNode(BaseNode): return {} case HttpContentType.JSON: content["json"] = json.loads(self._render_template( + self.typed_config.body.data, variable_pool )) case HttpContentType.FROM_DATA: @@ -343,18 +344,11 @@ class HttpRequestNode(BaseNode): - str: Branch identifier (e.g. "ERROR") when branching is enabled """ self.typed_config = HttpRequestNodeConfig(**self.config) - - # Build request components - headers = self._build_header(variable_pool) | self._build_auth(variable_pool) - params = self._build_params(variable_pool) - content = await self._build_content(variable_pool) - url = self._render_template(self.typed_config.url, variable_pool) - async with httpx.AsyncClient( verify=self.typed_config.verify_ssl, timeout=self._build_timeout(), - headers=headers, - params=params, + headers=self._build_header(variable_pool) | self._build_auth(variable_pool), + params=self._build_params(variable_pool), follow_redirects=True ) as client: retries = self.typed_config.retry.max_attempts @@ -362,8 +356,8 @@ class HttpRequestNode(BaseNode): try: request_func = self._get_client_method(client) resp = await request_func( - url=url, - **content + url=self._render_template(self.typed_config.url, variable_pool), + **(await self._build_content(variable_pool)) ) resp.raise_for_status() logger.info(f"Node {self.node_id}: HTTP request succeeded") @@ -372,7 +366,7 @@ class HttpRequestNode(BaseNode): body=response.body, status_code=resp.status_code, headers=resp.headers, - files=response.files, + files=response.files ).model_dump() except (httpx.HTTPStatusError, httpx.RequestError) as e: logger.error(f"HTTP request node exception: {e}") @@ -394,11 +388,5 @@ class HttpRequestNode(BaseNode): logger.warning( f"Node {self.node_id}: HTTP request failed, switching to error handling branch" ) - return { - "output": "ERROR", - "body": "", - "status_code": 500, - "headers": {}, - "files": [], - } + return {"output": "ERROR"} raise RuntimeError("http request failed") From fb23c34475db5cc6c25281878553f4e7d2cc0bd9 Mon Sep 17 00:00:00 2001 From: wwq Date: Thu, 23 Apr 2026 20:55:34 +0800 Subject: [PATCH 5/5] feat: enhance HTTP request debugging and extend logging data - feat(http_request): augment debugging capabilities with raw request generation and improved error handling. - feat(app_log): extend session filtering logic to support retrieving all session types. - feat(log): add 'process' field to node execution records for better data tracking. --- api/app/controllers/app_log_controller.py | 7 +- .../workflow/engine/event_stream_handler.py | 4 +- .../workflow/nodes/http_request/config.py | 12 ++ .../core/workflow/nodes/http_request/node.py | 157 ++++++++++++++++-- .../repositories/conversation_repository.py | 13 +- api/app/schemas/app_log_schema.py | 1 + api/app/services/app_log_service.py | 5 +- 7 files changed, 173 insertions(+), 26 deletions(-) diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py index 7732dd37..dea555b9 100644 --- a/api/app/controllers/app_log_controller.py +++ b/api/app/controllers/app_log_controller.py @@ -24,17 +24,18 @@ def list_app_logs( app_id: uuid.UUID, page: int = Query(1, ge=1), pagesize: int = Query(20, ge=1, le=100), - is_draft: bool = Query(False, description="是否草稿会话(默认false,即发布会话)"), + is_draft: Optional[bool] = Query(None, description="是否草稿会话(不传则返回全部)"), keyword: Optional[str] = Query(None, description="搜索关键词(匹配消息内容)"), db: Session = Depends(get_db), current_user=Depends(get_current_user), ): """查看应用下所有会话记录(分页) - - 支持按 is_draft 筛选(草稿会话 / 发布会话) + - is_draft 不传则返回所有会话(草稿 + 正式) + - is_draft=True 只返回草稿会话 + - is_draft=False 只返回发布会话 - 支持按 keyword 搜索(匹配消息内容) - 按最新更新时间倒序排列 - - 所有人(包括共享者和被共享者)都只能查看自己的会话记录 """ workspace_id = current_user.current_workspace_id diff --git a/api/app/core/workflow/engine/event_stream_handler.py b/api/app/core/workflow/engine/event_stream_handler.py index dc3cd04d..8012c41d 100644 --- a/api/app/core/workflow/engine/event_stream_handler.py +++ b/api/app/core/workflow/engine/event_stream_handler.py @@ -167,8 +167,9 @@ class EventStreamHandler: "node_id": node_id, "status": "failed", "input": data.get("input_data"), - "elapsed_time": data.get("elapsed_time"), "output": None, + "process": data.get("process_data"), + "elapsed_time": data.get("elapsed_time"), "error": data.get("error") } } @@ -266,6 +267,7 @@ class EventStreamHandler: ).timestamp() * 1000), "input": result.get("node_outputs", {}).get(node_name, {}).get("input"), "output": result.get("node_outputs", {}).get(node_name, {}).get("output"), + "process": result.get("node_outputs", {}).get(node_name, {}).get("process"), "elapsed_time": result.get("node_outputs", {}).get(node_name, {}).get("elapsed_time"), "token_usage": result.get("node_outputs", {}).get(node_name, {}).get("token_usage") } diff --git a/api/app/core/workflow/nodes/http_request/config.py b/api/app/core/workflow/nodes/http_request/config.py index 72474436..a6d16912 100644 --- a/api/app/core/workflow/nodes/http_request/config.py +++ b/api/app/core/workflow/nodes/http_request/config.py @@ -132,6 +132,11 @@ class HttpErrorDefaultTemplate(BaseModel): description="Default HTTP headers returned on error", ) + files: list = Field( + default_factory=list, + description="Default files list returned on error", + ) + output: str = Field( default="SUCCESS", description="HTTP response body", @@ -246,6 +251,13 @@ class HttpRequestNodeConfig(BaseNodeConfig): } +class HttpRequestDataProcessing(BaseModel): + request: str = Field( + default="", + description="Raw HTTP request format for debugging", + ) + + class HttpRequestNodeOutput(BaseModel): body: str = Field( ..., diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py index f3367230..cfdb95b0 100644 --- a/api/app/core/workflow/nodes/http_request/node.py +++ b/api/app/core/workflow/nodes/http_request/node.py @@ -160,6 +160,7 @@ class HttpRequestNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any], down_stream_nodes: list[str]): super().__init__(node_config, workflow_config, down_stream_nodes) self.typed_config: HttpRequestNodeConfig | None = None + self.last_request: str = "" def _output_types(self) -> dict[str, VariableType]: return { @@ -170,6 +171,47 @@ class HttpRequestNode(BaseNode): "output": VariableType.STRING } + def _extract_output(self, business_result: Any) -> Any: + if isinstance(business_result, dict): + result = {k: v for k, v in business_result.items() if k != "request"} + return result + return business_result + + def _extract_extra_fields(self, business_result: Any) -> dict[str, Any]: + if isinstance(business_result, dict) and "request" in business_result: + return { + "process": { + "request": business_result.get("request", "") + } + } + return {} + + def _wrap_error( + self, + error_message: str, + elapsed_time: float, + state: WorkflowState, + variable_pool: VariablePool + ) -> dict[str, Any]: + input_data = self._extract_input(state, variable_pool) + node_output = { + "node_id": self.node_id, + "node_type": self.node_type, + "node_name": self.node_name, + "status": "failed", + "input": input_data, + "output": None, + "process": {"request": self.last_request} if self.last_request else None, + "elapsed_time": elapsed_time, + "token_usage": None, + "error": error_message + } + return { + "node_outputs": {self.node_id: node_output}, + "error": error_message, + "error_node": self.node_id + } + def _build_timeout(self) -> Timeout: """ Build httpx Timeout configuration. @@ -255,10 +297,13 @@ class HttpRequestNode(BaseNode): case HttpContentType.NONE: return {} case HttpContentType.JSON: - content["json"] = json.loads(self._render_template( + rendered_body = self._render_template( self.typed_config.body.data, variable_pool - )) - + ).strip() + if not rendered_body: + content["json"] = {} + else: + content["json"] = json.loads(rendered_body) case HttpContentType.FROM_DATA: data = {} files = [] @@ -326,6 +371,62 @@ class HttpRequestNode(BaseNode): case _: raise RuntimeError(f"HttpRequest method not supported: {self.typed_config.method}") + def _generate_raw_request( + self, + variable_pool: VariablePool, + url: str, + headers: dict[str, str], + params: dict[str, str], + content: dict[str, Any] + ) -> str: + """ + Generate raw HTTP request format for debugging. + + Args: + variable_pool: Variable Pool + url: Rendered URL + headers: Request headers + params: Query parameters + content: Request body content + + Returns: + Raw HTTP request string + """ + method = self.typed_config.method.value + + if params: + param_str = "&".join([f"{k}={v}" for k, v in params.items()]) + full_url = f"{url}?{param_str}" if "?" not in url else f"{url}&{param_str}" + else: + full_url = url + + lines = [f"{method} {full_url} HTTP/1.1"] + + for key, value in headers.items(): + lines.append(f"{key}: {value}") + + if "json" in content and content["json"]: + json_body = json.dumps(content["json"], ensure_ascii=False) + lines.append(f"Content-Length: {len(json_body)}") + lines.append("") + lines.append(json_body) + elif "data" in content and "files" not in content: + if isinstance(content["data"], dict): + body_str = "&".join([f"{k}={v}" for k, v in content["data"].items()]) + lines.append(f"Content-Length: {len(body_str)}") + lines.append("") + lines.append(body_str) + elif "content" in content: + lines.append(f"Content-Length: {len(content['content'])}") + lines.append("") + lines.append(content["content"]) + elif "files" in content: + lines.append("Content-Length: 0") + lines.append("") + lines.append("# Note: This request includes file uploads") + + return "\r\n".join(lines) + async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> dict | str: """ Execute the HTTP request node. @@ -344,11 +445,25 @@ class HttpRequestNode(BaseNode): - str: Branch identifier (e.g. "ERROR") when branching is enabled """ self.typed_config = HttpRequestNodeConfig(**self.config) + + # Build request components + headers = self._build_header(variable_pool) | self._build_auth(variable_pool) + params = self._build_params(variable_pool) + content = await self._build_content(variable_pool) + url = self._render_template(self.typed_config.url, variable_pool) + + logger.info(f"Node {self.node_id}: headers={headers}, params={params}, content keys={list(content.keys())}") + + # Generate raw HTTP request for debugging + raw_request = self._generate_raw_request(variable_pool, url, headers, params, content) + self.last_request = raw_request + logger.info(f"Node {self.node_id}: Generated HTTP request:\n{raw_request}") + async with httpx.AsyncClient( verify=self.typed_config.verify_ssl, timeout=self._build_timeout(), - headers=self._build_header(variable_pool) | self._build_auth(variable_pool), - params=self._build_params(variable_pool), + headers=headers, + params=params, follow_redirects=True ) as client: retries = self.typed_config.retry.max_attempts @@ -356,18 +471,21 @@ class HttpRequestNode(BaseNode): try: request_func = self._get_client_method(client) resp = await request_func( - url=self._render_template(self.typed_config.url, variable_pool), - **(await self._build_content(variable_pool)) + url=url, + **content ) resp.raise_for_status() logger.info(f"Node {self.node_id}: HTTP request succeeded") response = HttpResponse(resp) - return HttpRequestNodeOutput( - body=response.body, - status_code=resp.status_code, - headers=resp.headers, - files=response.files - ).model_dump() + return { + **HttpRequestNodeOutput( + body=response.body, + status_code=resp.status_code, + headers=resp.headers, + files=response.files + ).model_dump(), + "request": raw_request + } except (httpx.HTTPStatusError, httpx.RequestError) as e: logger.error(f"HTTP request node exception: {e}") retries -= 1 @@ -383,10 +501,19 @@ class HttpRequestNode(BaseNode): logger.warning( f"Node {self.node_id}: HTTP request failed, returning default result" ) - return self.typed_config.error_handle.default.model_dump() + error_result = self.typed_config.error_handle.default.model_dump() + error_result["request"] = raw_request + return error_result case HttpErrorHandle.BRANCH: logger.warning( f"Node {self.node_id}: HTTP request failed, switching to error handling branch" ) - return {"output": "ERROR"} + return { + "output": "ERROR", + "body": "", + "status_code": 500, + "headers": {}, + "files": [], + "request": raw_request + } raise RuntimeError("http request failed") diff --git a/api/app/repositories/conversation_repository.py b/api/app/repositories/conversation_repository.py index 2b4e7ab7..129e1f02 100644 --- a/api/app/repositories/conversation_repository.py +++ b/api/app/repositories/conversation_repository.py @@ -203,7 +203,7 @@ class ConversationRepository: self, app_id: uuid.UUID, workspace_id: uuid.UUID, - is_draft: bool = False, + is_draft: Optional[bool] = None, keyword: Optional[str] = None, page: int = 1, pagesize: int = 20 @@ -214,7 +214,7 @@ class ConversationRepository: Args: app_id: 应用 ID workspace_id: 工作空间 ID - is_draft: 是否草稿会话(默认False,即发布会话) + is_draft: 是否草稿会话(None表示返回全部) keyword: 搜索关键词(匹配消息内容) page: 页码(从 1 开始) pagesize: 每页数量 @@ -222,12 +222,15 @@ class ConversationRepository: Returns: Tuple[List[Conversation], int]: (会话列表,总数) """ - base_stmt = select(Conversation).where( + base_conditions = [ Conversation.app_id == app_id, Conversation.workspace_id == workspace_id, Conversation.is_active.is_(True), - Conversation.is_draft == is_draft - ) + ] + if is_draft is not None: + base_conditions.append(Conversation.is_draft == is_draft) + + base_stmt = select(Conversation).where(*base_conditions) # 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation if keyword: diff --git a/api/app/schemas/app_log_schema.py b/api/app/schemas/app_log_schema.py index 0a1855f7..a60a8428 100644 --- a/api/app/schemas/app_log_schema.py +++ b/api/app/schemas/app_log_schema.py @@ -56,6 +56,7 @@ class AppLogNodeExecution(BaseModel): status: str = "pending" error: Optional[str] = None input: Optional[Any] = None + process: Optional[Any] = None output: Optional[Any] = None elapsed_time: Optional[float] = None token_usage: Optional[Dict[str, Any]] = None diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index 8f6752ef..8f5052e6 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -29,7 +29,7 @@ class AppLogService: workspace_id: uuid.UUID, page: int = 1, pagesize: int = 20, - is_draft: bool = False, + is_draft: Optional[bool] = None, keyword: Optional[str] = None, ) -> Tuple[list[Conversation], int]: """ @@ -40,7 +40,7 @@ class AppLogService: workspace_id: 工作空间 ID page: 页码(从 1 开始) pagesize: 每页数量 - is_draft: 是否草稿会话(默认False,即发布会话) + is_draft: 是否草稿会话(None表示返回全部) keyword: 搜索关键词(匹配消息内容) Returns: @@ -226,6 +226,7 @@ class AppLogService: status=node_data.get("status", "unknown"), error=node_data.get("error"), input=node_data.get("input"), + process=node_data.get("process"), output=node_data.get("output"), elapsed_time=node_data.get("elapsed_time"), token_usage=node_data.get("token_usage"),