feat: enhance HTTP request debugging and extend logging data
- feat(http_request): augment debugging capabilities with raw request generation and improved error handling. - feat(app_log): extend session filtering logic to support retrieving all session types. - feat(log): add 'process' field to node execution records for better data tracking.
This commit is contained in:
@@ -24,17 +24,18 @@ def list_app_logs(
|
|||||||
app_id: uuid.UUID,
|
app_id: uuid.UUID,
|
||||||
page: int = Query(1, ge=1),
|
page: int = Query(1, ge=1),
|
||||||
pagesize: int = Query(20, ge=1, le=100),
|
pagesize: int = Query(20, ge=1, le=100),
|
||||||
is_draft: bool = Query(False, description="是否草稿会话(默认false,即发布会话)"),
|
is_draft: Optional[bool] = Query(None, description="是否草稿会话(不传则返回全部)"),
|
||||||
keyword: Optional[str] = Query(None, description="搜索关键词(匹配消息内容)"),
|
keyword: Optional[str] = Query(None, description="搜索关键词(匹配消息内容)"),
|
||||||
db: Session = Depends(get_db),
|
db: Session = Depends(get_db),
|
||||||
current_user=Depends(get_current_user),
|
current_user=Depends(get_current_user),
|
||||||
):
|
):
|
||||||
"""查看应用下所有会话记录(分页)
|
"""查看应用下所有会话记录(分页)
|
||||||
|
|
||||||
- 支持按 is_draft 筛选(草稿会话 / 发布会话)
|
- is_draft 不传则返回所有会话(草稿 + 正式)
|
||||||
|
- is_draft=True 只返回草稿会话
|
||||||
|
- is_draft=False 只返回发布会话
|
||||||
- 支持按 keyword 搜索(匹配消息内容)
|
- 支持按 keyword 搜索(匹配消息内容)
|
||||||
- 按最新更新时间倒序排列
|
- 按最新更新时间倒序排列
|
||||||
- 所有人(包括共享者和被共享者)都只能查看自己的会话记录
|
|
||||||
"""
|
"""
|
||||||
workspace_id = current_user.current_workspace_id
|
workspace_id = current_user.current_workspace_id
|
||||||
|
|
||||||
|
|||||||
@@ -167,8 +167,9 @@ class EventStreamHandler:
|
|||||||
"node_id": node_id,
|
"node_id": node_id,
|
||||||
"status": "failed",
|
"status": "failed",
|
||||||
"input": data.get("input_data"),
|
"input": data.get("input_data"),
|
||||||
"elapsed_time": data.get("elapsed_time"),
|
|
||||||
"output": None,
|
"output": None,
|
||||||
|
"process": data.get("process_data"),
|
||||||
|
"elapsed_time": data.get("elapsed_time"),
|
||||||
"error": data.get("error")
|
"error": data.get("error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -266,6 +267,7 @@ class EventStreamHandler:
|
|||||||
).timestamp() * 1000),
|
).timestamp() * 1000),
|
||||||
"input": result.get("node_outputs", {}).get(node_name, {}).get("input"),
|
"input": result.get("node_outputs", {}).get(node_name, {}).get("input"),
|
||||||
"output": result.get("node_outputs", {}).get(node_name, {}).get("output"),
|
"output": result.get("node_outputs", {}).get(node_name, {}).get("output"),
|
||||||
|
"process": result.get("node_outputs", {}).get(node_name, {}).get("process"),
|
||||||
"elapsed_time": result.get("node_outputs", {}).get(node_name, {}).get("elapsed_time"),
|
"elapsed_time": result.get("node_outputs", {}).get(node_name, {}).get("elapsed_time"),
|
||||||
"token_usage": result.get("node_outputs", {}).get(node_name, {}).get("token_usage")
|
"token_usage": result.get("node_outputs", {}).get(node_name, {}).get("token_usage")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -132,6 +132,11 @@ class HttpErrorDefaultTemplate(BaseModel):
|
|||||||
description="Default HTTP headers returned on error",
|
description="Default HTTP headers returned on error",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
files: list = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="Default files list returned on error",
|
||||||
|
)
|
||||||
|
|
||||||
output: str = Field(
|
output: str = Field(
|
||||||
default="SUCCESS",
|
default="SUCCESS",
|
||||||
description="HTTP response body",
|
description="HTTP response body",
|
||||||
@@ -246,6 +251,13 @@ class HttpRequestNodeConfig(BaseNodeConfig):
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class HttpRequestDataProcessing(BaseModel):
|
||||||
|
request: str = Field(
|
||||||
|
default="",
|
||||||
|
description="Raw HTTP request format for debugging",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class HttpRequestNodeOutput(BaseModel):
|
class HttpRequestNodeOutput(BaseModel):
|
||||||
body: str = Field(
|
body: str = Field(
|
||||||
...,
|
...,
|
||||||
|
|||||||
@@ -160,6 +160,7 @@ class HttpRequestNode(BaseNode):
|
|||||||
def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any], down_stream_nodes: list[str]):
|
def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any], down_stream_nodes: list[str]):
|
||||||
super().__init__(node_config, workflow_config, down_stream_nodes)
|
super().__init__(node_config, workflow_config, down_stream_nodes)
|
||||||
self.typed_config: HttpRequestNodeConfig | None = None
|
self.typed_config: HttpRequestNodeConfig | None = None
|
||||||
|
self.last_request: str = ""
|
||||||
|
|
||||||
def _output_types(self) -> dict[str, VariableType]:
|
def _output_types(self) -> dict[str, VariableType]:
|
||||||
return {
|
return {
|
||||||
@@ -170,6 +171,47 @@ class HttpRequestNode(BaseNode):
|
|||||||
"output": VariableType.STRING
|
"output": VariableType.STRING
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def _extract_output(self, business_result: Any) -> Any:
|
||||||
|
if isinstance(business_result, dict):
|
||||||
|
result = {k: v for k, v in business_result.items() if k != "request"}
|
||||||
|
return result
|
||||||
|
return business_result
|
||||||
|
|
||||||
|
def _extract_extra_fields(self, business_result: Any) -> dict[str, Any]:
|
||||||
|
if isinstance(business_result, dict) and "request" in business_result:
|
||||||
|
return {
|
||||||
|
"process": {
|
||||||
|
"request": business_result.get("request", "")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def _wrap_error(
|
||||||
|
self,
|
||||||
|
error_message: str,
|
||||||
|
elapsed_time: float,
|
||||||
|
state: WorkflowState,
|
||||||
|
variable_pool: VariablePool
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
input_data = self._extract_input(state, variable_pool)
|
||||||
|
node_output = {
|
||||||
|
"node_id": self.node_id,
|
||||||
|
"node_type": self.node_type,
|
||||||
|
"node_name": self.node_name,
|
||||||
|
"status": "failed",
|
||||||
|
"input": input_data,
|
||||||
|
"output": None,
|
||||||
|
"process": {"request": self.last_request} if self.last_request else None,
|
||||||
|
"elapsed_time": elapsed_time,
|
||||||
|
"token_usage": None,
|
||||||
|
"error": error_message
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
"node_outputs": {self.node_id: node_output},
|
||||||
|
"error": error_message,
|
||||||
|
"error_node": self.node_id
|
||||||
|
}
|
||||||
|
|
||||||
def _build_timeout(self) -> Timeout:
|
def _build_timeout(self) -> Timeout:
|
||||||
"""
|
"""
|
||||||
Build httpx Timeout configuration.
|
Build httpx Timeout configuration.
|
||||||
@@ -255,10 +297,13 @@ class HttpRequestNode(BaseNode):
|
|||||||
case HttpContentType.NONE:
|
case HttpContentType.NONE:
|
||||||
return {}
|
return {}
|
||||||
case HttpContentType.JSON:
|
case HttpContentType.JSON:
|
||||||
content["json"] = json.loads(self._render_template(
|
rendered_body = self._render_template(
|
||||||
self.typed_config.body.data, variable_pool
|
self.typed_config.body.data, variable_pool
|
||||||
))
|
).strip()
|
||||||
|
if not rendered_body:
|
||||||
|
content["json"] = {}
|
||||||
|
else:
|
||||||
|
content["json"] = json.loads(rendered_body)
|
||||||
case HttpContentType.FROM_DATA:
|
case HttpContentType.FROM_DATA:
|
||||||
data = {}
|
data = {}
|
||||||
files = []
|
files = []
|
||||||
@@ -326,6 +371,62 @@ class HttpRequestNode(BaseNode):
|
|||||||
case _:
|
case _:
|
||||||
raise RuntimeError(f"HttpRequest method not supported: {self.typed_config.method}")
|
raise RuntimeError(f"HttpRequest method not supported: {self.typed_config.method}")
|
||||||
|
|
||||||
|
def _generate_raw_request(
|
||||||
|
self,
|
||||||
|
variable_pool: VariablePool,
|
||||||
|
url: str,
|
||||||
|
headers: dict[str, str],
|
||||||
|
params: dict[str, str],
|
||||||
|
content: dict[str, Any]
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Generate raw HTTP request format for debugging.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
variable_pool: Variable Pool
|
||||||
|
url: Rendered URL
|
||||||
|
headers: Request headers
|
||||||
|
params: Query parameters
|
||||||
|
content: Request body content
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Raw HTTP request string
|
||||||
|
"""
|
||||||
|
method = self.typed_config.method.value
|
||||||
|
|
||||||
|
if params:
|
||||||
|
param_str = "&".join([f"{k}={v}" for k, v in params.items()])
|
||||||
|
full_url = f"{url}?{param_str}" if "?" not in url else f"{url}&{param_str}"
|
||||||
|
else:
|
||||||
|
full_url = url
|
||||||
|
|
||||||
|
lines = [f"{method} {full_url} HTTP/1.1"]
|
||||||
|
|
||||||
|
for key, value in headers.items():
|
||||||
|
lines.append(f"{key}: {value}")
|
||||||
|
|
||||||
|
if "json" in content and content["json"]:
|
||||||
|
json_body = json.dumps(content["json"], ensure_ascii=False)
|
||||||
|
lines.append(f"Content-Length: {len(json_body)}")
|
||||||
|
lines.append("")
|
||||||
|
lines.append(json_body)
|
||||||
|
elif "data" in content and "files" not in content:
|
||||||
|
if isinstance(content["data"], dict):
|
||||||
|
body_str = "&".join([f"{k}={v}" for k, v in content["data"].items()])
|
||||||
|
lines.append(f"Content-Length: {len(body_str)}")
|
||||||
|
lines.append("")
|
||||||
|
lines.append(body_str)
|
||||||
|
elif "content" in content:
|
||||||
|
lines.append(f"Content-Length: {len(content['content'])}")
|
||||||
|
lines.append("")
|
||||||
|
lines.append(content["content"])
|
||||||
|
elif "files" in content:
|
||||||
|
lines.append("Content-Length: 0")
|
||||||
|
lines.append("")
|
||||||
|
lines.append("# Note: This request includes file uploads")
|
||||||
|
|
||||||
|
return "\r\n".join(lines)
|
||||||
|
|
||||||
async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> dict | str:
|
async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> dict | str:
|
||||||
"""
|
"""
|
||||||
Execute the HTTP request node.
|
Execute the HTTP request node.
|
||||||
@@ -344,11 +445,25 @@ class HttpRequestNode(BaseNode):
|
|||||||
- str: Branch identifier (e.g. "ERROR") when branching is enabled
|
- str: Branch identifier (e.g. "ERROR") when branching is enabled
|
||||||
"""
|
"""
|
||||||
self.typed_config = HttpRequestNodeConfig(**self.config)
|
self.typed_config = HttpRequestNodeConfig(**self.config)
|
||||||
|
|
||||||
|
# Build request components
|
||||||
|
headers = self._build_header(variable_pool) | self._build_auth(variable_pool)
|
||||||
|
params = self._build_params(variable_pool)
|
||||||
|
content = await self._build_content(variable_pool)
|
||||||
|
url = self._render_template(self.typed_config.url, variable_pool)
|
||||||
|
|
||||||
|
logger.info(f"Node {self.node_id}: headers={headers}, params={params}, content keys={list(content.keys())}")
|
||||||
|
|
||||||
|
# Generate raw HTTP request for debugging
|
||||||
|
raw_request = self._generate_raw_request(variable_pool, url, headers, params, content)
|
||||||
|
self.last_request = raw_request
|
||||||
|
logger.info(f"Node {self.node_id}: Generated HTTP request:\n{raw_request}")
|
||||||
|
|
||||||
async with httpx.AsyncClient(
|
async with httpx.AsyncClient(
|
||||||
verify=self.typed_config.verify_ssl,
|
verify=self.typed_config.verify_ssl,
|
||||||
timeout=self._build_timeout(),
|
timeout=self._build_timeout(),
|
||||||
headers=self._build_header(variable_pool) | self._build_auth(variable_pool),
|
headers=headers,
|
||||||
params=self._build_params(variable_pool),
|
params=params,
|
||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
) as client:
|
) as client:
|
||||||
retries = self.typed_config.retry.max_attempts
|
retries = self.typed_config.retry.max_attempts
|
||||||
@@ -356,18 +471,21 @@ class HttpRequestNode(BaseNode):
|
|||||||
try:
|
try:
|
||||||
request_func = self._get_client_method(client)
|
request_func = self._get_client_method(client)
|
||||||
resp = await request_func(
|
resp = await request_func(
|
||||||
url=self._render_template(self.typed_config.url, variable_pool),
|
url=url,
|
||||||
**(await self._build_content(variable_pool))
|
**content
|
||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
logger.info(f"Node {self.node_id}: HTTP request succeeded")
|
logger.info(f"Node {self.node_id}: HTTP request succeeded")
|
||||||
response = HttpResponse(resp)
|
response = HttpResponse(resp)
|
||||||
return HttpRequestNodeOutput(
|
return {
|
||||||
body=response.body,
|
**HttpRequestNodeOutput(
|
||||||
status_code=resp.status_code,
|
body=response.body,
|
||||||
headers=resp.headers,
|
status_code=resp.status_code,
|
||||||
files=response.files
|
headers=resp.headers,
|
||||||
).model_dump()
|
files=response.files
|
||||||
|
).model_dump(),
|
||||||
|
"request": raw_request
|
||||||
|
}
|
||||||
except (httpx.HTTPStatusError, httpx.RequestError) as e:
|
except (httpx.HTTPStatusError, httpx.RequestError) as e:
|
||||||
logger.error(f"HTTP request node exception: {e}")
|
logger.error(f"HTTP request node exception: {e}")
|
||||||
retries -= 1
|
retries -= 1
|
||||||
@@ -383,10 +501,19 @@ class HttpRequestNode(BaseNode):
|
|||||||
logger.warning(
|
logger.warning(
|
||||||
f"Node {self.node_id}: HTTP request failed, returning default result"
|
f"Node {self.node_id}: HTTP request failed, returning default result"
|
||||||
)
|
)
|
||||||
return self.typed_config.error_handle.default.model_dump()
|
error_result = self.typed_config.error_handle.default.model_dump()
|
||||||
|
error_result["request"] = raw_request
|
||||||
|
return error_result
|
||||||
case HttpErrorHandle.BRANCH:
|
case HttpErrorHandle.BRANCH:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Node {self.node_id}: HTTP request failed, switching to error handling branch"
|
f"Node {self.node_id}: HTTP request failed, switching to error handling branch"
|
||||||
)
|
)
|
||||||
return {"output": "ERROR"}
|
return {
|
||||||
|
"output": "ERROR",
|
||||||
|
"body": "",
|
||||||
|
"status_code": 500,
|
||||||
|
"headers": {},
|
||||||
|
"files": [],
|
||||||
|
"request": raw_request
|
||||||
|
}
|
||||||
raise RuntimeError("http request failed")
|
raise RuntimeError("http request failed")
|
||||||
|
|||||||
@@ -203,7 +203,7 @@ class ConversationRepository:
|
|||||||
self,
|
self,
|
||||||
app_id: uuid.UUID,
|
app_id: uuid.UUID,
|
||||||
workspace_id: uuid.UUID,
|
workspace_id: uuid.UUID,
|
||||||
is_draft: bool = False,
|
is_draft: Optional[bool] = None,
|
||||||
keyword: Optional[str] = None,
|
keyword: Optional[str] = None,
|
||||||
page: int = 1,
|
page: int = 1,
|
||||||
pagesize: int = 20
|
pagesize: int = 20
|
||||||
@@ -214,7 +214,7 @@ class ConversationRepository:
|
|||||||
Args:
|
Args:
|
||||||
app_id: 应用 ID
|
app_id: 应用 ID
|
||||||
workspace_id: 工作空间 ID
|
workspace_id: 工作空间 ID
|
||||||
is_draft: 是否草稿会话(默认False,即发布会话)
|
is_draft: 是否草稿会话(None表示返回全部)
|
||||||
keyword: 搜索关键词(匹配消息内容)
|
keyword: 搜索关键词(匹配消息内容)
|
||||||
page: 页码(从 1 开始)
|
page: 页码(从 1 开始)
|
||||||
pagesize: 每页数量
|
pagesize: 每页数量
|
||||||
@@ -222,12 +222,15 @@ class ConversationRepository:
|
|||||||
Returns:
|
Returns:
|
||||||
Tuple[List[Conversation], int]: (会话列表,总数)
|
Tuple[List[Conversation], int]: (会话列表,总数)
|
||||||
"""
|
"""
|
||||||
base_stmt = select(Conversation).where(
|
base_conditions = [
|
||||||
Conversation.app_id == app_id,
|
Conversation.app_id == app_id,
|
||||||
Conversation.workspace_id == workspace_id,
|
Conversation.workspace_id == workspace_id,
|
||||||
Conversation.is_active.is_(True),
|
Conversation.is_active.is_(True),
|
||||||
Conversation.is_draft == is_draft
|
]
|
||||||
)
|
if is_draft is not None:
|
||||||
|
base_conditions.append(Conversation.is_draft == is_draft)
|
||||||
|
|
||||||
|
base_stmt = select(Conversation).where(*base_conditions)
|
||||||
|
|
||||||
# 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation
|
# 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation
|
||||||
if keyword:
|
if keyword:
|
||||||
|
|||||||
@@ -56,6 +56,7 @@ class AppLogNodeExecution(BaseModel):
|
|||||||
status: str = "pending"
|
status: str = "pending"
|
||||||
error: Optional[str] = None
|
error: Optional[str] = None
|
||||||
input: Optional[Any] = None
|
input: Optional[Any] = None
|
||||||
|
process: Optional[Any] = None
|
||||||
output: Optional[Any] = None
|
output: Optional[Any] = None
|
||||||
elapsed_time: Optional[float] = None
|
elapsed_time: Optional[float] = None
|
||||||
token_usage: Optional[Dict[str, Any]] = None
|
token_usage: Optional[Dict[str, Any]] = None
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ class AppLogService:
|
|||||||
workspace_id: uuid.UUID,
|
workspace_id: uuid.UUID,
|
||||||
page: int = 1,
|
page: int = 1,
|
||||||
pagesize: int = 20,
|
pagesize: int = 20,
|
||||||
is_draft: bool = False,
|
is_draft: Optional[bool] = None,
|
||||||
keyword: Optional[str] = None,
|
keyword: Optional[str] = None,
|
||||||
) -> Tuple[list[Conversation], int]:
|
) -> Tuple[list[Conversation], int]:
|
||||||
"""
|
"""
|
||||||
@@ -40,7 +40,7 @@ class AppLogService:
|
|||||||
workspace_id: 工作空间 ID
|
workspace_id: 工作空间 ID
|
||||||
page: 页码(从 1 开始)
|
page: 页码(从 1 开始)
|
||||||
pagesize: 每页数量
|
pagesize: 每页数量
|
||||||
is_draft: 是否草稿会话(默认False,即发布会话)
|
is_draft: 是否草稿会话(None表示返回全部)
|
||||||
keyword: 搜索关键词(匹配消息内容)
|
keyword: 搜索关键词(匹配消息内容)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
@@ -226,6 +226,7 @@ class AppLogService:
|
|||||||
status=node_data.get("status", "unknown"),
|
status=node_data.get("status", "unknown"),
|
||||||
error=node_data.get("error"),
|
error=node_data.get("error"),
|
||||||
input=node_data.get("input"),
|
input=node_data.get("input"),
|
||||||
|
process=node_data.get("process"),
|
||||||
output=node_data.get("output"),
|
output=node_data.get("output"),
|
||||||
elapsed_time=node_data.get("elapsed_time"),
|
elapsed_time=node_data.get("elapsed_time"),
|
||||||
token_usage=node_data.get("token_usage"),
|
token_usage=node_data.get("token_usage"),
|
||||||
|
|||||||
Reference in New Issue
Block a user