diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py index 7732dd37..dea555b9 100644 --- a/api/app/controllers/app_log_controller.py +++ b/api/app/controllers/app_log_controller.py @@ -24,17 +24,18 @@ def list_app_logs( app_id: uuid.UUID, page: int = Query(1, ge=1), pagesize: int = Query(20, ge=1, le=100), - is_draft: bool = Query(False, description="是否草稿会话(默认false,即发布会话)"), + is_draft: Optional[bool] = Query(None, description="是否草稿会话(不传则返回全部)"), keyword: Optional[str] = Query(None, description="搜索关键词(匹配消息内容)"), db: Session = Depends(get_db), current_user=Depends(get_current_user), ): """查看应用下所有会话记录(分页) - - 支持按 is_draft 筛选(草稿会话 / 发布会话) + - is_draft 不传则返回所有会话(草稿 + 正式) + - is_draft=True 只返回草稿会话 + - is_draft=False 只返回发布会话 - 支持按 keyword 搜索(匹配消息内容) - 按最新更新时间倒序排列 - - 所有人(包括共享者和被共享者)都只能查看自己的会话记录 """ workspace_id = current_user.current_workspace_id diff --git a/api/app/core/workflow/engine/event_stream_handler.py b/api/app/core/workflow/engine/event_stream_handler.py index dc3cd04d..8012c41d 100644 --- a/api/app/core/workflow/engine/event_stream_handler.py +++ b/api/app/core/workflow/engine/event_stream_handler.py @@ -167,8 +167,9 @@ class EventStreamHandler: "node_id": node_id, "status": "failed", "input": data.get("input_data"), - "elapsed_time": data.get("elapsed_time"), "output": None, + "process": data.get("process_data"), + "elapsed_time": data.get("elapsed_time"), "error": data.get("error") } } @@ -266,6 +267,7 @@ class EventStreamHandler: ).timestamp() * 1000), "input": result.get("node_outputs", {}).get(node_name, {}).get("input"), "output": result.get("node_outputs", {}).get(node_name, {}).get("output"), + "process": result.get("node_outputs", {}).get(node_name, {}).get("process"), "elapsed_time": result.get("node_outputs", {}).get(node_name, {}).get("elapsed_time"), "token_usage": result.get("node_outputs", {}).get(node_name, {}).get("token_usage") } diff --git a/api/app/core/workflow/nodes/http_request/config.py b/api/app/core/workflow/nodes/http_request/config.py index 72474436..a6d16912 100644 --- a/api/app/core/workflow/nodes/http_request/config.py +++ b/api/app/core/workflow/nodes/http_request/config.py @@ -132,6 +132,11 @@ class HttpErrorDefaultTemplate(BaseModel): description="Default HTTP headers returned on error", ) + files: list = Field( + default_factory=list, + description="Default files list returned on error", + ) + output: str = Field( default="SUCCESS", description="HTTP response body", @@ -246,6 +251,13 @@ class HttpRequestNodeConfig(BaseNodeConfig): } +class HttpRequestDataProcessing(BaseModel): + request: str = Field( + default="", + description="Raw HTTP request format for debugging", + ) + + class HttpRequestNodeOutput(BaseModel): body: str = Field( ..., diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py index f3367230..cfdb95b0 100644 --- a/api/app/core/workflow/nodes/http_request/node.py +++ b/api/app/core/workflow/nodes/http_request/node.py @@ -160,6 +160,7 @@ class HttpRequestNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any], down_stream_nodes: list[str]): super().__init__(node_config, workflow_config, down_stream_nodes) self.typed_config: HttpRequestNodeConfig | None = None + self.last_request: str = "" def _output_types(self) -> dict[str, VariableType]: return { @@ -170,6 +171,47 @@ class HttpRequestNode(BaseNode): "output": VariableType.STRING } + def _extract_output(self, business_result: Any) -> Any: + if isinstance(business_result, dict): + result = {k: v for k, v in business_result.items() if k != "request"} + return result + return business_result + + def _extract_extra_fields(self, business_result: Any) -> dict[str, Any]: + if isinstance(business_result, dict) and "request" in business_result: + return { + "process": { + "request": business_result.get("request", "") + } + } + return {} + + def _wrap_error( + self, + error_message: str, + elapsed_time: float, + state: WorkflowState, + variable_pool: VariablePool + ) -> dict[str, Any]: + input_data = self._extract_input(state, variable_pool) + node_output = { + "node_id": self.node_id, + "node_type": self.node_type, + "node_name": self.node_name, + "status": "failed", + "input": input_data, + "output": None, + "process": {"request": self.last_request} if self.last_request else None, + "elapsed_time": elapsed_time, + "token_usage": None, + "error": error_message + } + return { + "node_outputs": {self.node_id: node_output}, + "error": error_message, + "error_node": self.node_id + } + def _build_timeout(self) -> Timeout: """ Build httpx Timeout configuration. @@ -255,10 +297,13 @@ class HttpRequestNode(BaseNode): case HttpContentType.NONE: return {} case HttpContentType.JSON: - content["json"] = json.loads(self._render_template( + rendered_body = self._render_template( self.typed_config.body.data, variable_pool - )) - + ).strip() + if not rendered_body: + content["json"] = {} + else: + content["json"] = json.loads(rendered_body) case HttpContentType.FROM_DATA: data = {} files = [] @@ -326,6 +371,62 @@ class HttpRequestNode(BaseNode): case _: raise RuntimeError(f"HttpRequest method not supported: {self.typed_config.method}") + def _generate_raw_request( + self, + variable_pool: VariablePool, + url: str, + headers: dict[str, str], + params: dict[str, str], + content: dict[str, Any] + ) -> str: + """ + Generate raw HTTP request format for debugging. + + Args: + variable_pool: Variable Pool + url: Rendered URL + headers: Request headers + params: Query parameters + content: Request body content + + Returns: + Raw HTTP request string + """ + method = self.typed_config.method.value + + if params: + param_str = "&".join([f"{k}={v}" for k, v in params.items()]) + full_url = f"{url}?{param_str}" if "?" not in url else f"{url}&{param_str}" + else: + full_url = url + + lines = [f"{method} {full_url} HTTP/1.1"] + + for key, value in headers.items(): + lines.append(f"{key}: {value}") + + if "json" in content and content["json"]: + json_body = json.dumps(content["json"], ensure_ascii=False) + lines.append(f"Content-Length: {len(json_body)}") + lines.append("") + lines.append(json_body) + elif "data" in content and "files" not in content: + if isinstance(content["data"], dict): + body_str = "&".join([f"{k}={v}" for k, v in content["data"].items()]) + lines.append(f"Content-Length: {len(body_str)}") + lines.append("") + lines.append(body_str) + elif "content" in content: + lines.append(f"Content-Length: {len(content['content'])}") + lines.append("") + lines.append(content["content"]) + elif "files" in content: + lines.append("Content-Length: 0") + lines.append("") + lines.append("# Note: This request includes file uploads") + + return "\r\n".join(lines) + async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> dict | str: """ Execute the HTTP request node. @@ -344,11 +445,25 @@ class HttpRequestNode(BaseNode): - str: Branch identifier (e.g. "ERROR") when branching is enabled """ self.typed_config = HttpRequestNodeConfig(**self.config) + + # Build request components + headers = self._build_header(variable_pool) | self._build_auth(variable_pool) + params = self._build_params(variable_pool) + content = await self._build_content(variable_pool) + url = self._render_template(self.typed_config.url, variable_pool) + + logger.info(f"Node {self.node_id}: headers={headers}, params={params}, content keys={list(content.keys())}") + + # Generate raw HTTP request for debugging + raw_request = self._generate_raw_request(variable_pool, url, headers, params, content) + self.last_request = raw_request + logger.info(f"Node {self.node_id}: Generated HTTP request:\n{raw_request}") + async with httpx.AsyncClient( verify=self.typed_config.verify_ssl, timeout=self._build_timeout(), - headers=self._build_header(variable_pool) | self._build_auth(variable_pool), - params=self._build_params(variable_pool), + headers=headers, + params=params, follow_redirects=True ) as client: retries = self.typed_config.retry.max_attempts @@ -356,18 +471,21 @@ class HttpRequestNode(BaseNode): try: request_func = self._get_client_method(client) resp = await request_func( - url=self._render_template(self.typed_config.url, variable_pool), - **(await self._build_content(variable_pool)) + url=url, + **content ) resp.raise_for_status() logger.info(f"Node {self.node_id}: HTTP request succeeded") response = HttpResponse(resp) - return HttpRequestNodeOutput( - body=response.body, - status_code=resp.status_code, - headers=resp.headers, - files=response.files - ).model_dump() + return { + **HttpRequestNodeOutput( + body=response.body, + status_code=resp.status_code, + headers=resp.headers, + files=response.files + ).model_dump(), + "request": raw_request + } except (httpx.HTTPStatusError, httpx.RequestError) as e: logger.error(f"HTTP request node exception: {e}") retries -= 1 @@ -383,10 +501,19 @@ class HttpRequestNode(BaseNode): logger.warning( f"Node {self.node_id}: HTTP request failed, returning default result" ) - return self.typed_config.error_handle.default.model_dump() + error_result = self.typed_config.error_handle.default.model_dump() + error_result["request"] = raw_request + return error_result case HttpErrorHandle.BRANCH: logger.warning( f"Node {self.node_id}: HTTP request failed, switching to error handling branch" ) - return {"output": "ERROR"} + return { + "output": "ERROR", + "body": "", + "status_code": 500, + "headers": {}, + "files": [], + "request": raw_request + } raise RuntimeError("http request failed") diff --git a/api/app/repositories/conversation_repository.py b/api/app/repositories/conversation_repository.py index 2b4e7ab7..129e1f02 100644 --- a/api/app/repositories/conversation_repository.py +++ b/api/app/repositories/conversation_repository.py @@ -203,7 +203,7 @@ class ConversationRepository: self, app_id: uuid.UUID, workspace_id: uuid.UUID, - is_draft: bool = False, + is_draft: Optional[bool] = None, keyword: Optional[str] = None, page: int = 1, pagesize: int = 20 @@ -214,7 +214,7 @@ class ConversationRepository: Args: app_id: 应用 ID workspace_id: 工作空间 ID - is_draft: 是否草稿会话(默认False,即发布会话) + is_draft: 是否草稿会话(None表示返回全部) keyword: 搜索关键词(匹配消息内容) page: 页码(从 1 开始) pagesize: 每页数量 @@ -222,12 +222,15 @@ class ConversationRepository: Returns: Tuple[List[Conversation], int]: (会话列表,总数) """ - base_stmt = select(Conversation).where( + base_conditions = [ Conversation.app_id == app_id, Conversation.workspace_id == workspace_id, Conversation.is_active.is_(True), - Conversation.is_draft == is_draft - ) + ] + if is_draft is not None: + base_conditions.append(Conversation.is_draft == is_draft) + + base_stmt = select(Conversation).where(*base_conditions) # 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation if keyword: diff --git a/api/app/schemas/app_log_schema.py b/api/app/schemas/app_log_schema.py index 0a1855f7..a60a8428 100644 --- a/api/app/schemas/app_log_schema.py +++ b/api/app/schemas/app_log_schema.py @@ -56,6 +56,7 @@ class AppLogNodeExecution(BaseModel): status: str = "pending" error: Optional[str] = None input: Optional[Any] = None + process: Optional[Any] = None output: Optional[Any] = None elapsed_time: Optional[float] = None token_usage: Optional[Dict[str, Any]] = None diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index 8f6752ef..8f5052e6 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -29,7 +29,7 @@ class AppLogService: workspace_id: uuid.UUID, page: int = 1, pagesize: int = 20, - is_draft: bool = False, + is_draft: Optional[bool] = None, keyword: Optional[str] = None, ) -> Tuple[list[Conversation], int]: """ @@ -40,7 +40,7 @@ class AppLogService: workspace_id: 工作空间 ID page: 页码(从 1 开始) pagesize: 每页数量 - is_draft: 是否草稿会话(默认False,即发布会话) + is_draft: 是否草稿会话(None表示返回全部) keyword: 搜索关键词(匹配消息内容) Returns: @@ -226,6 +226,7 @@ class AppLogService: status=node_data.get("status", "unknown"), error=node_data.get("error"), input=node_data.get("input"), + process=node_data.get("process"), output=node_data.get("output"), elapsed_time=node_data.get("elapsed_time"), token_usage=node_data.get("token_usage"),