From b6e27da7b0d63d93a437e94c9813f666258ea914 Mon Sep 17 00:00:00 2001
From: Eternity <1533512157@qq.com>
Date: Fri, 24 Apr 2026 19:56:55 +0800
Subject: [PATCH 01/11] fix(api): convert end_user_id to string in write_router
---
.../memory/agent/langgraph_graph/routing/write_router.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/api/app/core/memory/agent/langgraph_graph/routing/write_router.py b/api/app/core/memory/agent/langgraph_graph/routing/write_router.py
index d016f2e0..3a225319 100644
--- a/api/app/core/memory/agent/langgraph_graph/routing/write_router.py
+++ b/api/app/core/memory/agent/langgraph_graph/routing/write_router.py
@@ -94,9 +94,9 @@ async def write(
# )
scheduler.push_task(
"app.core.memory.agent.write_message",
- actual_end_user_id,
+ str(actual_end_user_id),
{
- "end_user_id": actual_end_user_id,
+ "end_user_id": str(actual_end_user_id),
"message": structured_messages,
"config_id": str(actual_config_id),
"storage_type": storage_type,
@@ -177,9 +177,9 @@ async def window_dialogue(end_user_id, langchain_messages, memory_config, scope)
scheduler.push_task(
"app.core.memory.agent.write_message",
- end_user_id,
+ str(end_user_id),
{
- "end_user_id": end_user_id,
+ "end_user_id": str(end_user_id),
"message": redis_messages,
"config_id": config_id,
"storage_type": AgentMemory_Long_Term.STORAGE_NEO4J,
From cd34d5f5ce084130785f8978260ad01bfd9fe7ff Mon Sep 17 00:00:00 2001
From: Eternity <1533512157@qq.com>
Date: Fri, 24 Apr 2026 20:13:46 +0800
Subject: [PATCH 02/11] fix(api): convert config_id to string in write_router
---
.../core/memory/agent/langgraph_graph/routing/write_router.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/api/app/core/memory/agent/langgraph_graph/routing/write_router.py b/api/app/core/memory/agent/langgraph_graph/routing/write_router.py
index 3a225319..a896130f 100644
--- a/api/app/core/memory/agent/langgraph_graph/routing/write_router.py
+++ b/api/app/core/memory/agent/langgraph_graph/routing/write_router.py
@@ -181,7 +181,7 @@ async def window_dialogue(end_user_id, langchain_messages, memory_config, scope)
{
"end_user_id": str(end_user_id),
"message": redis_messages,
- "config_id": config_id,
+ "config_id": str(config_id),
"storage_type": AgentMemory_Long_Term.STORAGE_NEO4J,
"user_rag_memory_id": ""
}
From a268d0f7f187a4e05df973d0ead15a97ecef9e3f Mon Sep 17 00:00:00 2001
From: Timebomb2018 <18868801967@163.com>
Date: Mon, 27 Apr 2026 12:25:27 +0800
Subject: [PATCH 03/11] =?UTF-8?q?fix(multimodal=5Fservice):=20add=20'?=
=?UTF-8?q?=E6=96=87=E6=A1=A3=E5=86=85=E5=AE=B9=EF=BC=9A'=20prefix=20to=20?=
=?UTF-8?q?document=20text=20and=20simplify=20image=20placeholder=20text?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
api/app/services/multimodal_service.py | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/api/app/services/multimodal_service.py b/api/app/services/multimodal_service.py
index 08a33a48..c362158c 100644
--- a/api/app/services/multimodal_service.py
+++ b/api/app/services/multimodal_service.py
@@ -95,7 +95,7 @@ class DashScopeFormatStrategy(MultimodalFormatStrategy):
"""通义千问文档格式"""
return True, {
"type": "text",
- "text": f"\n{text}\n"
+ "text": f"\n文档内容:\n{text}\n"
}
async def format_audio(
@@ -167,6 +167,7 @@ class BedrockFormatStrategy(MultimodalFormatStrategy):
async def format_document(self, file_name: str, text: str) -> tuple[bool, Dict[str, Any]]:
"""Bedrock/Anthropic 文档格式(需要 base64 编码)"""
# Bedrock 文档需要 base64 编码
+ text = f"文档内容:\n{text}\n"
text_bytes = text.encode('utf-8')
base64_text = base64.b64encode(text_bytes).decode('utf-8')
@@ -223,7 +224,7 @@ class OpenAIFormatStrategy(MultimodalFormatStrategy):
"""OpenAI 文档格式"""
return True, {
"type": "text",
- "text": f"\n{text}\n"
+ "text": f"\n文档内容:\n{text}\n"
}
async def format_audio(
@@ -395,7 +396,7 @@ class MultimodalService:
ext = img_info.get("ext", "png")
try:
_, img_url = await self._save_doc_image_to_storage(img_info["bytes"], ext, tenant_id, workspace_id)
- placeholder = f"第{page}页 第{index + 1}张图片" if page > 0 else f"第{index + 1}张图片"
+ placeholder = f"第{page}页 第{index + 1}张" if page > 0 else f"第{index + 1}张"
# 在文本内容中追加图片位置标记
if result and result[-1].get("type") in ("text", "document"):
key = "text" if "text" in result[-1] else list(result[-1].keys())[-1]
From 546bfb96276258eff37466337d81ac94d4135319 Mon Sep 17 00:00:00 2001
From: wxy
Date: Mon, 27 Apr 2026 14:05:06 +0800
Subject: [PATCH 04/11] fix(api_key): bypass publication check for SERVICE type
API keys
- Exclude SERVICE type keys from application publication validation since their resource_id targets the workspace instead of an application.
---
api/app/services/api_key_service.py | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py
index e4367e98..9044af37 100644
--- a/api/app/services/api_key_service.py
+++ b/api/app/services/api_key_service.py
@@ -9,7 +9,7 @@ from sqlalchemy.orm import Session
from sqlalchemy import select
from app.aioRedis import aio_redis
-from app.models.api_key_model import ApiKey
+from app.models.api_key_model import ApiKey, ApiKeyType
from app.repositories.api_key_repository import ApiKeyRepository, ApiKeyLogRepository
from app.schemas import api_key_schema
from app.schemas.response_schema import PageData, PageMeta
@@ -65,7 +65,8 @@ class ApiKeyService:
BizCode.BAD_REQUEST
)
- if data.resource_id:
+ # SERVICE 类型的 resource_id 指向 workspace,非应用,跳过应用发布校验
+ if data.resource_id and data.type != ApiKeyType.SERVICE.value:
app = db.get(App, data.resource_id)
if not app or not app.current_release_id:
raise BusinessException("该应用未发布", BizCode.APP_NOT_PUBLISHED)
@@ -452,9 +453,12 @@ class ApiKeyAuthService:
def check_app_published(db: Session, api_key_obj: ApiKey) -> None:
"""
检查应用是否已发布,未发布则抛出异常
+ SERVICE 类型的 api_key 不绑定应用(resource_id 指向 workspace),跳过校验
"""
if not api_key_obj.resource_id:
return
+ if api_key_obj.type == ApiKeyType.SERVICE.value:
+ return
app = db.get(App, api_key_obj.resource_id)
if not app or not app.current_release_id:
raise BusinessException("应用未发布,不可用", BizCode.APP_NOT_PUBLISHED)
From b64bcc2c50dc9bc38354acbf219cfc03a6df7746 Mon Sep 17 00:00:00 2001
From: wxy
Date: Mon, 27 Apr 2026 15:20:25 +0800
Subject: [PATCH 05/11] feat(workflow): augment logging queries and ameliorate
error handling
- Augment log search with app type filtering to enable keyword searching within workflow_executions.
- Introduce execution sequence markers to ensure logs are displayed in the correct chronological order.
- Ameliorate error handling to capture successful node outputs alongside failure details.
- Rectify the processing of empty JSON bodies in HTTP request nodes.
---
api/app/controllers/app_log_controller.py | 5 ++-
api/app/core/workflow/executor.py | 38 ++++++++++++++++++-
api/app/core/workflow/nodes/base_node.py | 28 +++++++++++++-
.../workflow/nodes/cycle_graph/iteration.py | 3 ++
.../core/workflow/nodes/http_request/node.py | 13 ++++++-
.../repositories/conversation_repository.py | 38 +++++++++++++++----
api/app/services/app_log_service.py | 20 ++++++++--
7 files changed, 126 insertions(+), 19 deletions(-)
diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py
index ea7962c1..90fbd4ea 100644
--- a/api/app/controllers/app_log_controller.py
+++ b/api/app/controllers/app_log_controller.py
@@ -41,7 +41,7 @@ def list_app_logs(
# 验证应用访问权限
app_service = AppService(db)
- app_service.get_app(app_id, workspace_id)
+ app = app_service.get_app(app_id, workspace_id)
# 使用 Service 层查询
log_service = AppLogService(db)
@@ -51,7 +51,8 @@ def list_app_logs(
page=page,
pagesize=pagesize,
is_draft=is_draft,
- keyword=keyword
+ keyword=keyword,
+ app_type=app.type,
)
items = [AppLogConversation.model_validate(c) for c in conversations]
diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py
index 6ac48ede..ea05db87 100644
--- a/api/app/core/workflow/executor.py
+++ b/api/app/core/workflow/executor.py
@@ -16,6 +16,7 @@ from app.core.workflow.engine.runtime_schema import ExecutionContext
from app.core.workflow.engine.state_manager import WorkflowStateManager
from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator
from app.core.workflow.engine.variable_pool import VariablePool, VariablePoolInitializer
+from app.core.workflow.nodes.base_node import NodeExecutionError
logger = logging.getLogger(__name__)
@@ -326,10 +327,43 @@ class WorkflowExecutor:
logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
exc_info=True)
+
+ # 1) 尝试从 checkpoint 回补已成功节点的 node_outputs
+ recovered: dict[str, Any] = {}
+ try:
+ if self.graph is not None:
+ recovered = self.graph.get_state(
+ self.execution_context.checkpoint_config
+ ).values or {}
+ except Exception as recover_err:
+ logger.warning(
+ f"Recover state on failure failed: {recover_err}, "
+ f"execution_id={self.execution_context.execution_id}"
+ )
+
if result is None:
- result = {"error": str(e)}
+ result = dict(recovered) if recovered else {}
else:
- result["error"] = str(e)
+ # 已有 result 与 recovered 合并,node_outputs 深度合并
+ for k, v in recovered.items():
+ if k == "node_outputs" and isinstance(v, dict):
+ existing = result.get("node_outputs") or {}
+ result["node_outputs"] = {**v, **existing}
+ else:
+ result.setdefault(k, v)
+
+ # 2) 如果是节点抛出的 NodeExecutionError,把失败节点的 node_output 注入 node_outputs
+ failed_node_id: str | None = None
+ if isinstance(e, NodeExecutionError):
+ failed_node_id = e.node_id
+ node_outputs = result.setdefault("node_outputs", {})
+ # 不覆盖已有(理论上不会有),保底写入失败节点记录
+ node_outputs.setdefault(e.node_id, e.node_output)
+
+ result["error"] = str(e)
+ if failed_node_id:
+ result["error_node"] = failed_node_id
+
yield {
"event": "workflow_end",
"data": self.result_builder.build_final_output(
diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py
index 5458a80c..5d08670a 100644
--- a/api/app/core/workflow/nodes/base_node.py
+++ b/api/app/core/workflow/nodes/base_node.py
@@ -1,5 +1,6 @@
import asyncio
import logging
+import time
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
@@ -22,6 +23,20 @@ from app.services.multimodal_service import MultimodalService
logger = logging.getLogger(__name__)
+class NodeExecutionError(Exception):
+ """节点执行失败异常。
+
+ 携带失败节点的完整 node_output,供 executor 兜底注入 node_outputs,
+ 保证 workflow_executions.output_data 里能看到失败节点的日志记录。
+ """
+
+ def __init__(self, node_id: str, node_output: dict[str, Any], error_message: str):
+ super().__init__(f"Node {node_id} execution failed: {error_message}")
+ self.node_id = node_id
+ self.node_output = node_output
+ self.error_message = error_message
+
+
class BaseNode(ABC):
"""Base class for workflow nodes.
@@ -396,6 +411,8 @@ class BaseNode(ABC):
"elapsed_time": elapsed_time,
"token_usage": token_usage,
"error": None,
+ # 单调递增序号,用于日志按执行顺序排序(JSONB 不保证 key 顺序)
+ "execution_order": time.monotonic_ns(),
**self._extract_extra_fields(business_result),
}
final_output = {
@@ -444,7 +461,9 @@ class BaseNode(ABC):
"output": None,
"elapsed_time": elapsed_time,
"token_usage": None,
- "error": error_message
+ "error": error_message,
+ # 单调递增序号,用于日志按执行顺序排序
+ "execution_order": time.monotonic_ns(),
}
# if error_edge:
@@ -466,7 +485,12 @@ class BaseNode(ABC):
**node_output
})
logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}")
- raise Exception(f"Node {self.node_id} execution failed: {error_message}")
+ # 抛出自定义异常,把 node_output 带给 executor,供其写入 node_outputs
+ raise NodeExecutionError(
+ node_id=self.node_id,
+ node_output=node_output,
+ error_message=error_message,
+ )
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
"""Extracts the input data for this node (used for logging or audit).
diff --git a/api/app/core/workflow/nodes/cycle_graph/iteration.py b/api/app/core/workflow/nodes/cycle_graph/iteration.py
index 3ee7774f..3ce22ab2 100644
--- a/api/app/core/workflow/nodes/cycle_graph/iteration.py
+++ b/api/app/core/workflow/nodes/cycle_graph/iteration.py
@@ -174,6 +174,9 @@ class IterationRuntime:
continue
node_type = result.get("node_outputs", {}).get(node_name, {}).get("node_type")
cycle_variable = {"item": item} if node_type == NodeType.CYCLE_START else None
+ node_cfg = next(
+ (n for n in self.cycle_nodes if n.get("id") == node_name), None
+ )
self.event_write({
"type": "cycle_item",
"data": {
diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py
index da13ceec..6b117368 100644
--- a/api/app/core/workflow/nodes/http_request/node.py
+++ b/api/app/core/workflow/nodes/http_request/node.py
@@ -255,9 +255,18 @@ class HttpRequestNode(BaseNode):
case HttpContentType.NONE:
return {}
case HttpContentType.JSON:
- content["json"] = json.loads(self._render_template(
+ rendered = self._render_template(
self.typed_config.body.data, variable_pool
- ))
+ )
+ if not rendered or not rendered.strip():
+ # 第三方导入的工作流可能出现 content_type=json 但 data 为空的情况,视为无 body
+ return {}
+ try:
+ content["json"] = json.loads(rendered)
+ except json.JSONDecodeError as e:
+ raise RuntimeError(
+ f"Invalid JSON body for HTTP request node: {e.msg} (data={rendered!r})"
+ )
case HttpContentType.FROM_DATA:
data = {}
files = []
diff --git a/api/app/repositories/conversation_repository.py b/api/app/repositories/conversation_repository.py
index 129e1f02..e3447dbd 100644
--- a/api/app/repositories/conversation_repository.py
+++ b/api/app/repositories/conversation_repository.py
@@ -1,13 +1,15 @@
import uuid
from typing import Optional
-from sqlalchemy import select, desc, func
+from sqlalchemy import select, desc, func, or_, cast, Text
from sqlalchemy.orm import Session
from app.core.exceptions import ResourceNotFoundException
from app.core.logging_config import get_db_logger
from app.models import Conversation, Message
+from app.models.app_model import AppType
from app.models.conversation_model import ConversationDetail
+from app.models.workflow_model import WorkflowExecution
logger = get_db_logger()
@@ -206,7 +208,8 @@ class ConversationRepository:
is_draft: Optional[bool] = None,
keyword: Optional[str] = None,
page: int = 1,
- pagesize: int = 20
+ pagesize: int = 20,
+ app_type: Optional[str] = None,
) -> tuple[list[Conversation], int]:
"""
查询应用日志会话列表(带分页和过滤)
@@ -218,6 +221,9 @@ class ConversationRepository:
keyword: 搜索关键词(匹配消息内容)
page: 页码(从 1 开始)
pagesize: 每页数量
+ app_type: 应用类型。WORKFLOW 类型改用 workflow_executions 的
+ input_data/output_data 做关键词过滤(因为失败的工作流不会写入 messages 表);
+ 其他类型仍走 messages 表。
Returns:
Tuple[List[Conversation], int]: (会话列表,总数)
@@ -234,12 +240,28 @@ class ConversationRepository:
# 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation
if keyword:
- # 查找包含关键词的 conversation_id 列表
- keyword_stmt = (
- select(Message.conversation_id)
- .where(Message.content.ilike(f"%{keyword}%"))
- .distinct()
- )
+ kw_pattern = f"%{keyword}%"
+ if app_type == AppType.WORKFLOW:
+ # 工作流:从 workflow_executions 的 input_data / output_data 匹配
+ # (messages 表只存开场白 assistant 消息,失败的工作流也不会写入)
+ keyword_stmt = (
+ select(WorkflowExecution.conversation_id)
+ .where(
+ WorkflowExecution.conversation_id.is_not(None),
+ or_(
+ cast(WorkflowExecution.input_data, Text).ilike(kw_pattern),
+ cast(WorkflowExecution.output_data, Text).ilike(kw_pattern),
+ ),
+ )
+ .distinct()
+ )
+ else:
+ # Agent 等其他类型:仍走 messages 表(user + assistant 内容)
+ keyword_stmt = (
+ select(Message.conversation_id)
+ .where(Message.content.ilike(kw_pattern))
+ .distinct()
+ )
base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt))
# Calculate total number of records
diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py
index 7ca05d42..c2cff2a6 100644
--- a/api/app/services/app_log_service.py
+++ b/api/app/services/app_log_service.py
@@ -32,6 +32,7 @@ class AppLogService:
pagesize: int = 20,
is_draft: Optional[bool] = None,
keyword: Optional[str] = None,
+ app_type: Optional[str] = None,
) -> Tuple[list[Conversation], int]:
"""
查询应用日志会话列表
@@ -43,6 +44,7 @@ class AppLogService:
pagesize: 每页数量
is_draft: 是否草稿会话(None表示返回全部)
keyword: 搜索关键词(匹配消息内容)
+ app_type: 应用类型(WORKFLOW 时关键词将从 workflow_executions 搜索)
Returns:
Tuple[list[Conversation], int]: (会话列表,总数)
@@ -55,7 +57,8 @@ class AppLogService:
"page": page,
"pagesize": pagesize,
"is_draft": is_draft,
- "keyword": keyword
+ "keyword": keyword,
+ "app_type": app_type,
}
)
@@ -66,7 +69,8 @@ class AppLogService:
is_draft=is_draft,
keyword=keyword,
page=page,
- pagesize=pagesize
+ pagesize=pagesize,
+ app_type=app_type,
)
logger.info(
@@ -368,8 +372,16 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
if not output_data:
return []
node_outputs: dict = output_data.get("node_outputs") or {}
+ # 按 execution_order(节点执行时写入的单调递增序号)排序。
+ # PostgreSQL JSONB 不保证 key 顺序,不能依赖 dict 插入顺序;
+ # 缺失 execution_order 的历史数据退化到 0,保持在最前。
+ ordered_items = sorted(
+ node_outputs.items(),
+ key=lambda kv: (kv[1] or {}).get("execution_order", 0)
+ if isinstance(kv[1], dict) else 0
+ )
result = []
- for node_id, node_data in node_outputs.items():
+ for node_id, node_data in ordered_items:
if not isinstance(node_data, dict):
continue
output = dict(node_data)
@@ -382,6 +394,8 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
inp = output.pop("input", None)
elapsed_time = output.pop("elapsed_time", None)
token_usage = output.pop("token_usage", None)
+ # execution_order 仅用于排序,不返回给前端
+ output.pop("execution_order", None)
result.append(AppLogNodeExecution(
node_id=node_id,
node_type=node_type,
From faf8d1a51a8bedb72a1f0109979ceb0e76911374 Mon Sep 17 00:00:00 2001
From: Timebomb2018 <18868801967@163.com>
Date: Mon, 27 Apr 2026 15:35:26 +0800
Subject: [PATCH 06/11] fix(workflow): add reasoning content, suggested
questions, citations and audio status support
- Introduce `reasoning_content`, `suggested_questions`, `citations`, and `audio_status` fields in conversation and app response schemas
- Conditionally set `audio_status` to `"pending"` only when `audio_url` is present
- Replace `model_dump` override with `@model_serializer(mode="wrap")` for cleaner serialization logic
- Change knowledge base validation failure from `RuntimeError` to warning + `continue` to avoid halting retrieval on invalid KB
---
api/app/core/workflow/nodes/knowledge/node.py | 3 ++-
api/app/schemas/app_schema.py | 8 +++++---
api/app/schemas/conversation_schema.py | 14 +++++++++++++-
api/app/services/app_chat_service.py | 2 +-
api/app/services/draft_run_service.py | 2 +-
5 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/api/app/core/workflow/nodes/knowledge/node.py b/api/app/core/workflow/nodes/knowledge/node.py
index bd1ba998..c3fda4e2 100644
--- a/api/app/core/workflow/nodes/knowledge/node.py
+++ b/api/app/core/workflow/nodes/knowledge/node.py
@@ -334,7 +334,8 @@ class KnowledgeRetrievalNode(BaseNode):
for kb_config in knowledge_bases:
db_knowledge = knowledge_repository.get_knowledge_by_id(db=db, knowledge_id=kb_config.kb_id)
if not (db_knowledge and db_knowledge.chunk_num > 0 and db_knowledge.status == 1):
- raise RuntimeError("The knowledge base does not exist or access is denied.")
+ logger.warning("The knowledge base does not exist or access is denied.")
+ continue
tasks.append(self.knowledge_retrieval(db, query, db_knowledge, kb_config))
if tasks:
result = await asyncio.gather(*tasks)
diff --git a/api/app/schemas/app_schema.py b/api/app/schemas/app_schema.py
index 11c27b56..89603322 100644
--- a/api/app/schemas/app_schema.py
+++ b/api/app/schemas/app_schema.py
@@ -3,7 +3,7 @@ import uuid
from typing import Optional, Any, List, Dict, Union
from enum import Enum, StrEnum
-from pydantic import BaseModel, Field, ConfigDict, field_serializer, field_validator
+from pydantic import BaseModel, Field, ConfigDict, field_serializer, field_validator, model_serializer
from app.schemas.workflow_schema import WorkflowConfigCreate
@@ -661,9 +661,11 @@ class DraftRunResponse(BaseModel):
suggested_questions: List[str] = Field(default_factory=list, description="下一步建议问题")
citations: List[Dict[str, Any]] = Field(default_factory=list, description="引用来源")
audio_url: Optional[str] = Field(default=None, description="TTS 语音URL")
+ audio_status: Optional[str] = Field(default=None, description="TTS 语音状态")
- def model_dump(self, **kwargs):
- data = super().model_dump(**kwargs)
+ @model_serializer(mode="wrap")
+ def _serialize(self, handler):
+ data = handler(self)
if not data.get("reasoning_content"):
data.pop("reasoning_content", None)
return data
diff --git a/api/app/schemas/conversation_schema.py b/api/app/schemas/conversation_schema.py
index fd1be5d9..20782fd9 100644
--- a/api/app/schemas/conversation_schema.py
+++ b/api/app/schemas/conversation_schema.py
@@ -2,7 +2,7 @@
import uuid
import datetime
from typing import Optional, Dict, Any, List
-from pydantic import BaseModel, Field, ConfigDict, field_serializer
+from pydantic import BaseModel, Field, ConfigDict, field_serializer, model_serializer
# 导入 FileInput(用于体验运行)
from app.schemas.app_schema import FileInput
@@ -94,6 +94,18 @@ class ChatResponse(BaseModel):
message_id: str
usage: Optional[Dict[str, Any]] = None
elapsed_time: Optional[float] = None
+ reasoning_content: Optional[str] = None
+ suggested_questions: Optional[List[str]] = None
+ citations: Optional[List[Any]] = None
+ audio_url: Optional[str] = None
+ audio_status: Optional[str] = None
+
+ @model_serializer(mode="wrap")
+ def _serialize(self, handler):
+ data = handler(self)
+ if not data.get("reasoning_content"):
+ data.pop("reasoning_content", None)
+ return data
# ---------- Conversation Summary Schemas ----------
diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py
index 7c52c2d6..12f54c03 100644
--- a/api/app/services/app_chat_service.py
+++ b/api/app/services/app_chat_service.py
@@ -317,7 +317,7 @@ class AppChatService:
"suggested_questions": suggested_questions,
"citations": filtered_citations,
"audio_url": audio_url,
- "audio_status": "pending"
+ "audio_status": "pending" if audio_url else None
}
async def agnet_chat_stream(
diff --git a/api/app/services/draft_run_service.py b/api/app/services/draft_run_service.py
index 10d50ece..2566a50f 100644
--- a/api/app/services/draft_run_service.py
+++ b/api/app/services/draft_run_service.py
@@ -754,7 +754,7 @@ class AgentRunService:
) if not sub_agent else [],
"citations": filtered_citations,
"audio_url": audio_url,
- "audio_status": "pending"
+ "audio_status": "pending" if audio_url else None
}
logger.info(
From 12a08a487d02f720ede1a666e6a48d0912f443e0 Mon Sep 17 00:00:00 2001
From: Timebomb2018 <18868801967@163.com>
Date: Mon, 27 Apr 2026 15:47:34 +0800
Subject: [PATCH 07/11] fix(tool_controller): re-raise HTTPException to
preserve original status codes
---
api/app/controllers/tool_controller.py | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/api/app/controllers/tool_controller.py b/api/app/controllers/tool_controller.py
index 74b8d88e..688ab518 100644
--- a/api/app/controllers/tool_controller.py
+++ b/api/app/controllers/tool_controller.py
@@ -173,6 +173,8 @@ async def delete_tool(
return success(msg="工具删除成功")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
+ except HTTPException:
+ raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -249,6 +251,8 @@ async def parse_openapi_schema(
if result["success"] is False:
raise HTTPException(status_code=400, detail=result["message"])
return success(data=result, msg="Schema解析完成")
+ except HTTPException:
+ raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
From 98d8d7b26138630a622344a235b06a4ed69ad857 Mon Sep 17 00:00:00 2001
From: Timebomb2018 <18868801967@163.com>
Date: Mon, 27 Apr 2026 15:49:21 +0800
Subject: [PATCH 08/11] fix(conversation_schema): refine citations field type
to Dict[str, Any]
---
api/app/schemas/conversation_schema.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/api/app/schemas/conversation_schema.py b/api/app/schemas/conversation_schema.py
index 20782fd9..7c3a0f03 100644
--- a/api/app/schemas/conversation_schema.py
+++ b/api/app/schemas/conversation_schema.py
@@ -96,7 +96,7 @@ class ChatResponse(BaseModel):
elapsed_time: Optional[float] = None
reasoning_content: Optional[str] = None
suggested_questions: Optional[List[str]] = None
- citations: Optional[List[Any]] = None
+ citations: Optional[List[Dict[str, Any]]] = None
audio_url: Optional[str] = None
audio_status: Optional[str] = None
From 30cdf229de27370613e465d6177e4327ffa037b1 Mon Sep 17 00:00:00 2001
From: Mark <348207283@qq.com>
Date: Mon, 27 Apr 2026 16:05:27 +0800
Subject: [PATCH 09/11] [modify] rag file system
---
api/app/controllers/chunk_controller.py | 40 +-
api/app/controllers/document_controller.py | 37 +-
api/app/controllers/file_controller.py | 420 ++++++------------
.../rag/prompts/vision_llm_describe_prompt.md | 1 +
api/app/models/file_model.py | 1 +
api/app/schemas/file_schema.py | 1 +
api/app/services/file_storage_service.py | 34 +-
api/app/tasks.py | 42 +-
8 files changed, 228 insertions(+), 348 deletions(-)
diff --git a/api/app/controllers/chunk_controller.py b/api/app/controllers/chunk_controller.py
index b2cc3695..f031efbb 100644
--- a/api/app/controllers/chunk_controller.py
+++ b/api/app/controllers/chunk_controller.py
@@ -82,19 +82,32 @@ async def get_preview_chunks(
detail="The file does not exist or you do not have permission to access it"
)
- # 5. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext}
- file_path = os.path.join(
- settings.FILE_PATH,
- str(db_file.kb_id),
- str(db_file.parent_id),
- f"{db_file.id}{db_file.file_ext}"
- )
-
- # 6. Check if the file exists
- if not os.path.exists(file_path):
+ # 5. Get file content from storage backend
+ if not db_file.file_key:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
- detail="File not found (possibly deleted)"
+ detail="File has no storage key (legacy data not migrated)"
+ )
+
+ from app.services.file_storage_service import FileStorageService
+ import asyncio
+ storage_service = FileStorageService()
+
+ async def _download():
+ return await storage_service.download_file(db_file.file_key)
+
+ try:
+ file_binary = asyncio.run(_download())
+ except RuntimeError:
+ loop = asyncio.new_event_loop()
+ try:
+ file_binary = loop.run_until_complete(_download())
+ finally:
+ loop.close()
+ except Exception as e:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"File not found in storage: {e}"
)
# 7. Document parsing & segmentation
@@ -104,11 +117,12 @@ async def get_preview_chunks(
vision_model = QWenCV(
key=db_knowledge.image2text.api_keys[0].api_key,
model_name=db_knowledge.image2text.api_keys[0].model_name,
- lang="Chinese", # Default to Chinese
+ lang="Chinese",
base_url=db_knowledge.image2text.api_keys[0].api_base
)
from app.core.rag.app.naive import chunk
- res = chunk(filename=file_path,
+ res = chunk(filename=db_file.file_name,
+ binary=file_binary,
from_page=0,
to_page=5,
callback=progress_callback,
diff --git a/api/app/controllers/document_controller.py b/api/app/controllers/document_controller.py
index 350acc0e..02e16943 100644
--- a/api/app/controllers/document_controller.py
+++ b/api/app/controllers/document_controller.py
@@ -305,38 +305,25 @@ async def parse_documents(
detail="The file does not exist or you do not have permission to access it"
)
- # 3. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext}
- file_path = os.path.join(
- settings.FILE_PATH,
- str(db_file.kb_id),
- str(db_file.parent_id),
- f"{db_file.id}{db_file.file_ext}"
- )
-
- # 4. Check if the file exists
- api_logger.debug(f"Constructed file path: {file_path}")
- api_logger.debug(f"File metadata - kb_id: {db_file.kb_id}, parent_id: {db_file.parent_id}, file_id: {db_file.id}, extension: {db_file.file_ext}")
- if not os.path.exists(file_path):
- api_logger.error(f"File not found (possibly deleted): file_path={file_path}, file_id={db_file.id}, document_id={document_id}")
+ # 3. Get file_key for storage backend
+ if not db_file.file_key:
+ api_logger.error(f"File has no storage key (legacy data not migrated): file_id={db_file.id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
- detail="File not found (possibly deleted)"
+ detail="File has no storage key (legacy data not migrated)"
)
- # 5. Obtain knowledge base information
- api_logger.info( f"Obtain details of the knowledge base: knowledge_id={db_document.kb_id}")
+ # 4. Obtain knowledge base information
+ api_logger.info(f"Obtain details of the knowledge base: knowledge_id={db_document.kb_id}")
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=db_document.kb_id, current_user=current_user)
if not db_knowledge:
- api_logger.warning(f"The knowledge base does not exist or access is denied: knowledge_id={db_document.kb_id}")
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="The knowledge base does not exist or access is denied"
- )
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Knowledge base not found")
- # 6. Task: Document parsing, vectorization, and storage
- # from app.tasks import parse_document
- # parse_document(file_path, document_id)
- task = celery_app.send_task("app.core.rag.tasks.parse_document", args=[file_path, document_id])
+ # 5. Dispatch parse task with file_key (not file_path)
+ task = celery_app.send_task(
+ "app.core.rag.tasks.parse_document",
+ args=[db_file.file_key, document_id, db_file.file_name]
+ )
result = {
"task_id": task.id
}
diff --git a/api/app/controllers/file_controller.py b/api/app/controllers/file_controller.py
index f7bd0e7a..c213b6c6 100644
--- a/api/app/controllers/file_controller.py
+++ b/api/app/controllers/file_controller.py
@@ -1,12 +1,10 @@
import os
-from pathlib import Path
-import shutil
from typing import Any, Optional
import uuid
from fastapi import APIRouter, Depends, HTTPException, status, File, UploadFile, Query
from fastapi.encoders import jsonable_encoder
-from fastapi.responses import FileResponse
+from fastapi.responses import Response
from sqlalchemy.orm import Session
from app.core.config import settings
@@ -19,9 +17,13 @@ from app.models.user_model import User
from app.schemas import file_schema, document_schema
from app.schemas.response_schema import ApiResponse
from app.services import file_service, document_service
+from app.services.knowledge_service import get_knowledge_by_id as get_kb_by_id
+from app.services.file_storage_service import (
+ FileStorageService,
+ generate_kb_file_key,
+ get_file_storage_service,
+)
-
-# Obtain a dedicated API logger
api_logger = get_api_logger()
router = APIRouter(
@@ -34,67 +36,37 @@ router = APIRouter(
async def get_files(
kb_id: uuid.UUID,
parent_id: uuid.UUID,
- page: int = Query(1, gt=0), # Default: 1, which must be greater than 0
- pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items
+ page: int = Query(1, gt=0),
+ pagesize: int = Query(20, gt=0, le=100),
orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at"),
desc: Optional[bool] = Query(False, description="Is it descending order"),
keywords: Optional[str] = Query(None, description="Search keywords (file name)"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
- """
- Paged query file list
- - Support filtering by kb_id and parent_id
- - Support keyword search for file names
- - Support dynamic sorting
- - Return paging metadata + file list
- """
- api_logger.info(f"Query file list: kb_id={kb_id}, parent_id={parent_id}, page={page}, pagesize={pagesize}, keywords={keywords}, username: {current_user.username}")
- # 1. parameter validation
- if page < 1 or pagesize < 1:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="The paging parameter must be greater than 0"
- )
+ """Paged query file list"""
+ api_logger.info(f"Query file list: kb_id={kb_id}, parent_id={parent_id}, page={page}, pagesize={pagesize}")
- # 2. Construct query conditions
- filters = [
- file_model.File.kb_id == kb_id
- ]
+ if page < 1 or pagesize < 1:
+ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The paging parameter must be greater than 0")
+
+ filters = [file_model.File.kb_id == kb_id]
if parent_id:
filters.append(file_model.File.parent_id == parent_id)
- # Keyword search (fuzzy matching of file name)
if keywords:
filters.append(file_model.File.file_name.ilike(f"%{keywords}%"))
- # 3. Execute paged query
try:
- api_logger.debug("Start executing file paging query")
total, items = file_service.get_files_paginated(
- db=db,
- filters=filters,
- page=page,
- pagesize=pagesize,
- orderby=orderby,
- desc=desc,
- current_user=current_user
+ db=db, filters=filters, page=page, pagesize=pagesize,
+ orderby=orderby, desc=desc, current_user=current_user
)
- api_logger.info(f"File query successful: total={total}, returned={len(items)} records")
except Exception as e:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Query failed: {str(e)}"
- )
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Query failed: {str(e)}")
- # 4. Return structured response
result = {
"items": items,
- "page": {
- "page": page,
- "pagesize": pagesize,
- "total": total,
- "has_next": True if page * pagesize < total else False
- }
+ "page": {"page": page, "pagesize": pagesize, "total": total, "has_next": page * pagesize < total}
}
return success(data=jsonable_encoder(result), msg="Query of file list succeeded")
@@ -107,23 +79,14 @@ async def create_folder(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
- """
- Create a new folder
- """
- api_logger.info(f"Create folder request: kb_id={kb_id}, parent_id={parent_id}, folder_name={folder_name}, username: {current_user.username}")
-
+ """Create a new folder"""
+ api_logger.info(f"Create folder request: kb_id={kb_id}, parent_id={parent_id}, folder_name={folder_name}")
try:
- api_logger.debug(f"Start creating a folder: {folder_name}")
- create_folder = file_schema.FileCreate(
- kb_id=kb_id,
- created_by=current_user.id,
- parent_id=parent_id,
- file_name=folder_name,
- file_ext='folder',
- file_size=0,
+ create_folder_data = file_schema.FileCreate(
+ kb_id=kb_id, created_by=current_user.id, parent_id=parent_id,
+ file_name=folder_name, file_ext='folder', file_size=0,
)
- db_file = file_service.create_file(db=db, file=create_folder, current_user=current_user)
- api_logger.info(f"Folder created successfully: {db_file.file_name} (ID: {db_file.id})")
+ db_file = file_service.create_file(db=db, file=create_folder_data, current_user=current_user)
return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="Folder creation successful")
except Exception as e:
api_logger.error(f"Folder creation failed: {folder_name} - {str(e)}")
@@ -136,76 +99,58 @@ async def upload_file(
parent_id: uuid.UUID,
file: UploadFile = File(...),
db: Session = Depends(get_db),
- current_user: User = Depends(get_current_user)
+ current_user: User = Depends(get_current_user),
+ storage_service: FileStorageService = Depends(get_file_storage_service),
):
- """
- upload file
- """
- api_logger.info(f"upload file request: kb_id={kb_id}, parent_id={parent_id}, filename={file.filename}, username: {current_user.username}")
+ """Upload file to storage backend"""
+ api_logger.info(f"upload file request: kb_id={kb_id}, parent_id={parent_id}, filename={file.filename}")
- # Read the contents of the file
contents = await file.read()
- # Check file size
file_size = len(contents)
- print(f"file size: {file_size} byte")
if file_size == 0:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="The file is empty."
- )
- # If the file size exceeds 50MB (50 * 1024 * 1024 bytes)
+ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The file is empty.")
if file_size > settings.MAX_FILE_SIZE:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=f"The file size exceeds the {settings.MAX_FILE_SIZE}byte limit"
- )
+ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"File size exceeds {settings.MAX_FILE_SIZE} byte limit")
- # Extract the extension using `os.path.splitext`
_, file_extension = os.path.splitext(file.filename)
- upload_file = file_schema.FileCreate(
- kb_id=kb_id,
- created_by=current_user.id,
- parent_id=parent_id,
- file_name=file.filename,
- file_ext=file_extension.lower(),
- file_size=file_size,
+ file_ext = file_extension.lower()
+
+ # Create File record
+ upload_file_data = file_schema.FileCreate(
+ kb_id=kb_id, created_by=current_user.id, parent_id=parent_id,
+ file_name=file.filename, file_ext=file_ext, file_size=file_size,
)
- db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user)
+ db_file = file_service.create_file(db=db, file=upload_file_data, current_user=current_user)
- # Construct a save path:/files/{kb_id}/{parent_id}/{file.id}{file_extension}
- save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id))
- Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
- save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}")
+ # Upload to storage backend
+ file_key = generate_kb_file_key(kb_id=kb_id, file_id=db_file.id, file_ext=file_ext)
+ try:
+ await storage_service.storage.upload(file_key=file_key, content=contents, content_type=file.content_type)
+ except Exception as e:
+ api_logger.error(f"Storage upload failed: {e}")
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File storage failed: {str(e)}")
- # Save file
- with open(save_path, "wb") as f:
- f.write(contents)
+ # Save file_key
+ db_file.file_key = file_key
+ db.commit()
+ db.refresh(db_file)
- # Verify whether the file has been saved successfully
- if not os.path.exists(save_path):
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="File save failed"
- )
+ # Create document (inherit parser_config from knowledge base)
+ default_parser_config = {
+ "layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n",
+ "auto_keywords": 0, "auto_questions": 0, "html4excel": "false"
+ }
+ try:
+ db_knowledge = get_kb_by_id(db, knowledge_id=kb_id, current_user=current_user)
+ if db_knowledge and db_knowledge.parser_config:
+ default_parser_config.update(dict(db_knowledge.parser_config))
+ except Exception:
+ pass
- # Create a document
create_data = document_schema.DocumentCreate(
- kb_id=kb_id,
- created_by=current_user.id,
- file_id=db_file.id,
- file_name=db_file.file_name,
- file_ext=db_file.file_ext,
- file_size=db_file.file_size,
- file_meta={},
- parser_id="naive",
- parser_config={
- "layout_recognize": "DeepDOC",
- "chunk_token_num": 128,
- "delimiter": "\n",
- "auto_keywords": 0,
- "auto_questions": 0,
- "html4excel": "false"
- }
+ kb_id=kb_id, created_by=current_user.id, file_id=db_file.id,
+ file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size,
+ file_meta={}, parser_id="naive", parser_config=default_parser_config
)
db_document = document_service.create_document(db=db, document=create_data, current_user=current_user)
@@ -219,123 +164,73 @@ async def custom_text(
parent_id: uuid.UUID,
create_data: file_schema.CustomTextFileCreate,
db: Session = Depends(get_db),
- current_user: User = Depends(get_current_user)
+ current_user: User = Depends(get_current_user),
+ storage_service: FileStorageService = Depends(get_file_storage_service),
):
- """
- custom text
- """
- api_logger.info(f"custom text upload request: kb_id={kb_id}, parent_id={parent_id}, title={create_data.title}, content={create_data.content}, username: {current_user.username}")
-
- # Check file content size
- # 将内容编码为字节(UTF-8)
+ """Custom text upload"""
content_bytes = create_data.content.encode('utf-8')
file_size = len(content_bytes)
- print(f"file size: {file_size} byte")
if file_size == 0:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="The content is empty."
- )
- # If the file size exceeds 50MB (50 * 1024 * 1024 bytes)
+ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The content is empty.")
if file_size > settings.MAX_FILE_SIZE:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=f"The content size exceeds the {settings.MAX_FILE_SIZE}byte limit"
- )
+ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Content size exceeds {settings.MAX_FILE_SIZE} byte limit")
- upload_file = file_schema.FileCreate(
- kb_id=kb_id,
- created_by=current_user.id,
- parent_id=parent_id,
- file_name=f"{create_data.title}.txt",
- file_ext=".txt",
- file_size=file_size,
+ upload_file_data = file_schema.FileCreate(
+ kb_id=kb_id, created_by=current_user.id, parent_id=parent_id,
+ file_name=f"{create_data.title}.txt", file_ext=".txt", file_size=file_size,
)
- db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user)
+ db_file = file_service.create_file(db=db, file=upload_file_data, current_user=current_user)
- # Construct a save path:/files/{kb_id}/{parent_id}/{file.id}{file_extension}
- save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id))
- Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
- save_path = os.path.join(save_dir, f"{db_file.id}.txt")
+ # Upload to storage backend
+ file_key = generate_kb_file_key(kb_id=kb_id, file_id=db_file.id, file_ext=".txt")
+ try:
+ await storage_service.storage.upload(file_key=file_key, content=content_bytes, content_type="text/plain")
+ except Exception as e:
+ api_logger.error(f"Storage upload failed: {e}")
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File storage failed: {str(e)}")
- # Save file
- with open(save_path, "wb") as f:
- f.write(content_bytes)
+ db_file.file_key = file_key
+ db.commit()
+ db.refresh(db_file)
- # Verify whether the file has been saved successfully
- if not os.path.exists(save_path):
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="File save failed"
- )
-
- # Create a document
create_document_data = document_schema.DocumentCreate(
- kb_id=kb_id,
- created_by=current_user.id,
- file_id=db_file.id,
- file_name=db_file.file_name,
- file_ext=db_file.file_ext,
- file_size=db_file.file_size,
- file_meta={},
- parser_id="naive",
- parser_config={
- "layout_recognize": "DeepDOC",
- "chunk_token_num": 128,
- "delimiter": "\n",
- "auto_keywords": 0,
- "auto_questions": 0,
- "html4excel": "false"
- }
+ kb_id=kb_id, created_by=current_user.id, file_id=db_file.id,
+ file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size,
+ file_meta={}, parser_id="naive",
+ parser_config={"layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n",
+ "auto_keywords": 0, "auto_questions": 0, "html4excel": "false"}
)
db_document = document_service.create_document(db=db, document=create_document_data, current_user=current_user)
- api_logger.info(f"custom text upload successfully: {create_data.title} (file_id: {db_file.id}, document_id: {db_document.id})")
return success(data=jsonable_encoder(document_schema.Document.model_validate(db_document)), msg="custom text upload successful")
@router.get("/{file_id}", response_model=Any)
async def get_file(
file_id: uuid.UUID,
- db: Session = Depends(get_db)
+ db: Session = Depends(get_db),
+ storage_service: FileStorageService = Depends(get_file_storage_service),
) -> Any:
- """
- Download the file based on the file_id
- - Query file information from the database
- - Construct the file path and check if it exists
- - Return a FileResponse to download the file
- """
- api_logger.info(f"Download the file based on the file_id: file_id={file_id}")
-
- # 1. Query file information from the database
+ """Download file by file_id"""
db_file = file_service.get_file_by_id(db, file_id=file_id)
if not db_file:
- api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}")
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="The file does not exist or you do not have permission to access it"
- )
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
- # 2. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext}
- file_path = os.path.join(
- settings.FILE_PATH,
- str(db_file.kb_id),
- str(db_file.parent_id),
- f"{db_file.id}{db_file.file_ext}"
- )
+ if not db_file.file_key:
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File has no storage key (legacy data not migrated)")
- # 3. Check if the file exists
- if not os.path.exists(file_path):
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="File not found (possibly deleted)"
- )
+ try:
+ content = await storage_service.download_file(db_file.file_key)
+ except Exception as e:
+ api_logger.error(f"Storage download failed: {e}")
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found in storage")
- # 4.Return FileResponse (automatically handle download)
- return FileResponse(
- path=file_path,
- filename=db_file.file_name, # Use original file name
- media_type="application/octet-stream" # Universal binary stream type
+ import mimetypes
+ media_type = mimetypes.guess_type(db_file.file_name)[0] or "application/octet-stream"
+ return Response(
+ content=content,
+ media_type=media_type,
+ headers={"Content-Disposition": f'attachment; filename="{db_file.file_name}"'}
)
@@ -346,50 +241,22 @@ async def update_file(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
- """
- Update file information (such as file name)
- - Only specified fields such as file_name are allowed to be modified
- """
- api_logger.debug(f"Query the file to be updated: {file_id}")
-
- # 1. Check if the file exists
+ """Update file information (such as file name)"""
db_file = file_service.get_file_by_id(db, file_id=file_id)
-
if not db_file:
- api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}")
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="The file does not exist or you do not have permission to access it"
- )
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
- # 2. Update fields (only update non-null fields)
- api_logger.debug(f"Start updating the file fields: {file_id}")
- updated_fields = []
for field, value in update_data.dict(exclude_unset=True).items():
if hasattr(db_file, field):
- old_value = getattr(db_file, field)
- if old_value != value:
- # update value
- setattr(db_file, field, value)
- updated_fields.append(f"{field}: {old_value} -> {value}")
+ setattr(db_file, field, value)
- if updated_fields:
- api_logger.debug(f"updated fields: {', '.join(updated_fields)}")
-
- # 3. Save to database
try:
db.commit()
db.refresh(db_file)
- api_logger.info(f"The file has been successfully updated: {db_file.file_name} (ID: {db_file.id})")
except Exception as e:
db.rollback()
- api_logger.error(f"File update failed: file_id={file_id} - {str(e)}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"File update failed: {str(e)}"
- )
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File update failed: {str(e)}")
- # 4. Return the updated file
return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="File information updated successfully")
@@ -397,60 +264,43 @@ async def update_file(
async def delete_file(
file_id: uuid.UUID,
db: Session = Depends(get_db),
- current_user: User = Depends(get_current_user)
+ current_user: User = Depends(get_current_user),
+ storage_service: FileStorageService = Depends(get_file_storage_service),
):
- """
- Delete a file or folder
- """
- api_logger.info(f"Request to delete file: file_id={file_id}, username: {current_user.username}")
- await _delete_file(db=db, file_id=file_id, current_user=current_user)
+ """Delete a file or folder"""
+ api_logger.info(f"Request to delete file: file_id={file_id}")
+ await _delete_file(db=db, file_id=file_id, current_user=current_user, storage_service=storage_service)
return success(msg="File deleted successfully")
+
async def _delete_file(
file_id: uuid.UUID,
- db: Session = Depends(get_db),
- current_user: User = Depends(get_current_user)
+ db: Session,
+ current_user: User,
+ storage_service: FileStorageService,
) -> None:
- """
- Delete a file or folder
- """
- # 1. Check if the file exists
+ """Delete a file or folder from storage and database"""
db_file = file_service.get_file_by_id(db, file_id=file_id)
-
if not db_file:
- api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}")
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="The file does not exist or you do not have permission to access it"
- )
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
- # 2. Construct physical path
- file_path = Path(
- settings.FILE_PATH,
- str(db_file.kb_id),
- str(db_file.id)
- ) if db_file.file_ext == 'folder' else Path(
- settings.FILE_PATH,
- str(db_file.kb_id),
- str(db_file.parent_id),
- f"{db_file.id}{db_file.file_ext}"
- )
-
- # 3. Delete physical files/folders
- try:
- if file_path.exists():
- if db_file.file_ext == 'folder':
- shutil.rmtree(file_path) # Recursively delete folders
- else:
- file_path.unlink() # Delete a single file
- except Exception as e:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Failed to delete physical file/folder: {str(e)}"
- )
-
- # 4.Delete db_file
+ # Delete from storage backend
if db_file.file_ext == 'folder':
+ # For folders, delete all child files from storage first
+ child_files = db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).all()
+ for child in child_files:
+ if child.file_key:
+ try:
+ await storage_service.delete_file(child.file_key)
+ except Exception as e:
+ api_logger.warning(f"Failed to delete child file from storage: {child.file_key} - {e}")
db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).delete()
+ else:
+ if db_file.file_key:
+ try:
+ await storage_service.delete_file(db_file.file_key)
+ except Exception as e:
+ api_logger.warning(f"Failed to delete file from storage: {db_file.file_key} - {e}")
+
db.delete(db_file)
db.commit()
diff --git a/api/app/core/rag/prompts/vision_llm_describe_prompt.md b/api/app/core/rag/prompts/vision_llm_describe_prompt.md
index 8800703d..151e06b6 100644
--- a/api/app/core/rag/prompts/vision_llm_describe_prompt.md
+++ b/api/app/core/rag/prompts/vision_llm_describe_prompt.md
@@ -14,6 +14,7 @@ Transcribe the content from the provided PDF page image into clean Markdown form
6. Do NOT wrap the output in ```markdown or ``` blocks.
7. Only apply Markdown structure to headings, paragraphs, lists, and tables, strictly based on the layout of the image. Do NOT create tables unless an actual table exists in the image.
8. Preserve the original language, information, and order exactly as shown in the image.
+9. Your output language MUST match the language of the content in the image. If the image contains Chinese text, output in Chinese. If English, output in English. Never translate.
{% if page %}
At the end of the transcription, add the page divider: `--- Page {{ page }} ---`.
diff --git a/api/app/models/file_model.py b/api/app/models/file_model.py
index 44a7d613..5f11b185 100644
--- a/api/app/models/file_model.py
+++ b/api/app/models/file_model.py
@@ -15,4 +15,5 @@ class File(Base):
file_ext = Column(String, index=True, nullable=False, comment="file extension:folder|pdf")
file_size = Column(Integer, default=0, comment="file size(byte)")
file_url = Column(String, index=True, nullable=True, comment="file comes from a website url")
+ file_key = Column(String(512), nullable=True, index=True, comment="storage file key for FileStorageService")
created_at = Column(DateTime, default=datetime.datetime.now)
\ No newline at end of file
diff --git a/api/app/schemas/file_schema.py b/api/app/schemas/file_schema.py
index 7245671a..d01e8c77 100644
--- a/api/app/schemas/file_schema.py
+++ b/api/app/schemas/file_schema.py
@@ -11,6 +11,7 @@ class FileBase(BaseModel):
file_ext: str
file_size: int
file_url: str | None = None
+ file_key: str | None = None
created_at: datetime.datetime | None = None
diff --git a/api/app/services/file_storage_service.py b/api/app/services/file_storage_service.py
index 5897936b..22a864dc 100644
--- a/api/app/services/file_storage_service.py
+++ b/api/app/services/file_storage_service.py
@@ -34,26 +34,7 @@ def generate_file_key(
Generate a unique file key for storage.
The file key follows the format: {tenant_id}/{workspace_id}/{file_id}{file_ext}
-
- Args:
- tenant_id: The tenant UUID.
- workspace_id: The workspace UUID.
- file_id: The file UUID.
- file_ext: The file extension (e.g., '.pdf', '.txt').
-
- Returns:
- A unique file key string.
-
- Example:
- >>> generate_file_key(
- ... uuid.UUID('550e8400-e29b-41d4-a716-446655440000'),
- ... uuid.UUID('660e8400-e29b-41d4-a716-446655440001'),
- ... uuid.UUID('770e8400-e29b-41d4-a716-446655440002'),
- ... '.pdf'
- ... )
- '550e8400-e29b-41d4-a716-446655440000/660e8400-e29b-41d4-a716-446655440001/770e8400-e29b-41d4-a716-446655440002.pdf'
"""
- # Ensure file_ext starts with a dot
if file_ext and not file_ext.startswith('.'):
file_ext = f'.{file_ext}'
if workspace_id:
@@ -61,6 +42,21 @@ def generate_file_key(
return f"{tenant_id}/{file_id}{file_ext}"
+def generate_kb_file_key(
+ kb_id: uuid.UUID,
+ file_id: uuid.UUID,
+ file_ext: str,
+) -> str:
+ """
+ Generate a file key for knowledge base files.
+
+ Format: kb/{kb_id}/{file_id}{file_ext}
+ """
+ if file_ext and not file_ext.startswith('.'):
+ file_ext = f'.{file_ext}'
+ return f"kb/{kb_id}/{file_id}{file_ext}"
+
+
class FileStorageService:
"""
High-level service for file storage operations.
diff --git a/api/app/tasks.py b/api/app/tasks.py
index 5a71066a..2e024255 100644
--- a/api/app/tasks.py
+++ b/api/app/tasks.py
@@ -210,9 +210,14 @@ def _build_vision_model(file_path: str, db_knowledge):
@celery_app.task(name="app.core.rag.tasks.parse_document")
-def parse_document(file_path: str, document_id: uuid.UUID):
+def parse_document(file_key: str, document_id: uuid.UUID, file_name: str = ""):
"""
- Document parsing, vectorization, and storage
+ Document parsing, vectorization, and storage.
+
+ Args:
+ file_key: Storage key for FileStorageService (e.g. "kb/{kb_id}/{file_id}.docx")
+ document_id: Document UUID
+ file_name: Original file name (used for extension detection in chunk())
"""
db_document = None
@@ -223,7 +228,6 @@ def parse_document(file_path: str, document_id: uuid.UUID):
with get_db_context() as db:
try:
- # Celery JSON 序列化会将 UUID 转为字符串,需要确保类型正确
if not isinstance(document_id, uuid.UUID):
document_id = uuid.UUID(str(document_id))
@@ -234,7 +238,11 @@ def parse_document(file_path: str, document_id: uuid.UUID):
if db_knowledge is None:
raise ValueError(f"Knowledge {db_document.kb_id} not found")
- # 1. Document parsing & segmentation
+ # Use file_name from argument or fall back to document record
+ if not file_name:
+ file_name = db_document.file_name
+
+ # 1. Download file from storage backend
progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} Start to parse.")
start_time = time.time()
db_document.progress = 0.0
@@ -245,14 +253,36 @@ def parse_document(file_path: str, document_id: uuid.UUID):
db.commit()
db.refresh(db_document)
+ # Read file content from storage backend (no NFS dependency)
+ from app.services.file_storage_service import FileStorageService
+ import asyncio
+ storage_service = FileStorageService()
+
+ async def _download():
+ return await storage_service.download_file(file_key)
+
+ try:
+ file_binary = asyncio.run(_download())
+ except RuntimeError:
+ # If there's already a running loop (e.g. in some worker configurations)
+ loop = asyncio.new_event_loop()
+ try:
+ file_binary = loop.run_until_complete(_download())
+ finally:
+ loop.close()
+ if not file_binary:
+ raise IOError(f"Downloaded empty file from storage: {file_key}")
+ logger.info(f"[ParseDoc] Downloaded {len(file_binary)} bytes from storage key: {file_key}")
+
def progress_callback(prog=None, msg=None):
progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} parse progress: {prog} msg: {msg}.")
# Prepare vision_model for parsing
- vision_model = _build_vision_model(file_path, db_knowledge)
+ vision_model = _build_vision_model(file_name, db_knowledge)
from app.core.rag.app.naive import chunk
- res = chunk(filename=file_path,
+ res = chunk(filename=file_name,
+ binary=file_binary,
from_page=0,
to_page=DEFAULT_PARSE_TO_PAGE,
callback=progress_callback,
From c53fcf3981cc7ded5005de8fe04730b1ccd662c0 Mon Sep 17 00:00:00 2001
From: Mark <348207283@qq.com>
Date: Mon, 27 Apr 2026 17:10:00 +0800
Subject: [PATCH 10/11] [fix] old code file_path
---
api/app/tasks.py | 32 +-------------------------------
1 file changed, 1 insertion(+), 31 deletions(-)
diff --git a/api/app/tasks.py b/api/app/tasks.py
index 578a0e8d..3ad1a0dd 100644
--- a/api/app/tasks.py
+++ b/api/app/tasks.py
@@ -280,39 +280,9 @@ def parse_document(file_key: str, document_id: uuid.UUID, file_name: str = ""):
# Prepare vision_model for parsing
vision_model = _build_vision_model(file_name, db_knowledge)
- # 先将文件读入内存,避免解析过程中依赖 NFS 文件持续可访问
- # python-docx 等库在 binary=None 时会用路径直接打开文件,
- # 在 NFS/共享存储上可能因缓存失效导致 "Package not found"
- max_wait_seconds = 30
- wait_interval = 2
- waited = 0
- file_binary = None
- while waited <= max_wait_seconds:
- # os.listdir 强制 NFS 客户端刷新目录缓存
- parent_dir = os.path.dirname(file_path)
- try:
- os.listdir(parent_dir)
- except OSError:
- pass
- try:
- with open(file_path, "rb") as f:
- file_binary = f.read()
- if not file_binary:
- # NFS 上文件存在但内容为空(可能还在同步中)
- raise IOError(f"File is empty (0 bytes), NFS may still be syncing: {file_path}")
- break
- except (FileNotFoundError, IOError) as e:
- if waited >= max_wait_seconds:
- raise type(e)(
- f"File not accessible at '{file_path}' after waiting {max_wait_seconds}s: {e}"
- )
- logger.warning(f"File not ready on this node, retrying in {wait_interval}s: {file_path} ({e})")
- time.sleep(wait_interval)
- waited += wait_interval
-
from app.core.rag.app.naive import chunk
logger.info(f"[ParseDoc] file_binary size={len(file_binary)} bytes, type={type(file_binary).__name__}, bool={bool(file_binary)}")
- res = chunk(filename=file_path,
+ res = chunk(filename=file_name,
binary=file_binary,
from_page=0,
to_page=DEFAULT_PARSE_TO_PAGE,
From 4bef9b578b4be8cdb65301ea339637cb80664a81 Mon Sep 17 00:00:00 2001
From: Mark <348207283@qq.com>
Date: Mon, 27 Apr 2026 17:35:13 +0800
Subject: [PATCH 11/11] [fix] document file delete
---
api/app/controllers/document_controller.py | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/api/app/controllers/document_controller.py b/api/app/controllers/document_controller.py
index 02e16943..a08aebe1 100644
--- a/api/app/controllers/document_controller.py
+++ b/api/app/controllers/document_controller.py
@@ -20,6 +20,7 @@ from app.models.user_model import User
from app.schemas import document_schema
from app.schemas.response_schema import ApiResponse
from app.services import document_service, file_service, knowledge_service
+from app.services.file_storage_service import FileStorageService, get_file_storage_service
# Obtain a dedicated API logger
@@ -231,7 +232,8 @@ async def update_document(
async def delete_document(
document_id: uuid.UUID,
db: Session = Depends(get_db),
- current_user: User = Depends(get_current_user)
+ current_user: User = Depends(get_current_user),
+ storage_service: FileStorageService = Depends(get_file_storage_service),
):
"""
Delete document
@@ -257,7 +259,7 @@ async def delete_document(
db.commit()
# 3. Delete file
- await file_controller._delete_file(db=db, file_id=file_id, current_user=current_user)
+ await file_controller._delete_file(db=db, file_id=file_id, current_user=current_user, storage_service=storage_service)
# 4. Delete vector index
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=db_document.kb_id, current_user=current_user)