diff --git a/api/app/core/workflow/nodes/document_extractor/__init__.py b/api/app/core/workflow/nodes/document_extractor/__init__.py
new file mode 100644
index 00000000..c51bc2c0
--- /dev/null
+++ b/api/app/core/workflow/nodes/document_extractor/__init__.py
@@ -0,0 +1,4 @@
+from .config import DocExtractorNodeConfig
+from .node import DocExtractorNode
+
+__all__ = ["DocExtractorNode", "DocExtractorNodeConfig"]
diff --git a/api/app/core/workflow/nodes/document_extractor/config.py b/api/app/core/workflow/nodes/document_extractor/config.py
new file mode 100644
index 00000000..69f7f76d
--- /dev/null
+++ b/api/app/core/workflow/nodes/document_extractor/config.py
@@ -0,0 +1,20 @@
+from pydantic import Field
+from app.core.workflow.nodes.base_config import BaseNodeConfig
+
+
+class DocExtractorNodeConfig(BaseNodeConfig):
+    """Configuration for the document extractor node."""
+
+    file_selector: str = Field(
+        ...,
+        description="File variable selector, e.g. {{ sys.files }} or {{ node_id.file }}"
+    )
+
+    class Config:
+        json_schema_extra = {
+            "examples": [
+                {
+                    "file_selector": "{{ sys.files }}"
+                }
+            ]
+        }
diff --git a/api/app/core/workflow/nodes/document_extractor/node.py b/api/app/core/workflow/nodes/document_extractor/node.py
new file mode 100644
index 00000000..40641f3c
--- /dev/null
+++ b/api/app/core/workflow/nodes/document_extractor/node.py
@@ -0,0 +1,140 @@
+"""Workflow node that extracts text from document file variables."""
+import logging
+from typing import Any
+
+from app.core.workflow.engine.state_manager import WorkflowState
+from app.core.workflow.engine.variable_pool import VariablePool
+from app.core.workflow.nodes.base_node import BaseNode
+from app.core.workflow.nodes.document_extractor.config import DocExtractorNodeConfig
+from app.core.workflow.variable.base_variable import VariableType, FileObject
+from app.db import get_db_read
+from app.schemas.app_schema import FileInput, FileType, TransferMethod
+
+logger = logging.getLogger(__name__)
+
+
+def _file_object_to_file_input(f: FileObject) -> FileInput:
+    """Convert a workflow FileObject into a multimodal FileInput.
+
+    The result is always typed as ``FileType.DOCUMENT``. Falsy ``url`` and
+    ``file_id`` become ``None``; a falsy ``origin_file_type`` becomes ``""``.
+    """
+    return FileInput(
+        type=FileType.DOCUMENT,
+        transfer_method=TransferMethod(f.transfer_method),
+        url=f.url or None,
+        upload_file_id=f.file_id or None,
+        file_type=f.origin_file_type or "",
+    )
+
+
+def _as_file_object(item: Any) -> FileObject | None:
+    """Return *item* as a FileObject, or None if it is not a file value.
+
+    Accepts FileObject instances and dicts carrying a truthy ``is_file`` key.
+    """
+    if isinstance(item, FileObject):
+        return item
+    if isinstance(item, dict) and item.get("is_file"):
+        return FileObject(**item)
+    return None
+
+
+def _normalise_files(val: Any) -> list[FileObject]:
+    """Coerce a resolved variable value into a list of FileObject.
+
+    A list is filtered entry by entry; any other value is treated as a
+    single candidate. Non-file values are dropped with a warning.
+    """
+    if isinstance(val, list):
+        files: list[FileObject] = []
+        for item in val:
+            file_obj = _as_file_object(item)
+            if file_obj is None:
+                logger.warning("Ignoring non-file entry in file list for document extractor: %r", item)
+            else:
+                files.append(file_obj)
+        return files
+
+    file_obj = _as_file_object(val)
+    if file_obj is None:
+        # Log the rejected scalar too, so it is as visible as a rejected list entry.
+        logger.warning("Ignoring non-file value of type %s for document extractor", type(val).__name__)
+        return []
+    return [file_obj]
+
+
+class DocExtractorNode(BaseNode):
+    """Document Extractor Node.
+
+    Reads one or more file variables and extracts their text content
+    by delegating to MultimodalService._extract_document_text.
+
+    Outputs:
+        text (string) – full concatenated text of all input files
+        chunks (array[string]) – per-file extracted text
+    """
+
+    def _output_types(self) -> dict[str, VariableType]:
+        """Declare the variable types of the ``text`` and ``chunks`` outputs."""
+        return {
+            "text": VariableType.STRING,
+            "chunks": VariableType.ARRAY_STRING,
+        }
+
+    def _extract_output(self, business_result: Any) -> Any:
+        """Return the business result unchanged."""
+        return business_result
+
+    def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
+        """Return the configured ``file_selector`` as the node's input record."""
+        return {"file_selector": self.config.get("file_selector")}
+
+    async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> Any:
+        """Extract text from every file the configured selector resolves to.
+
+        Returns a dict with ``text`` (non-empty per-file texts joined by a
+        blank line) and ``chunks`` (one entry per file, ``""`` for a file
+        whose extraction failed). Per-file errors are logged, not raised.
+        """
+        config = DocExtractorNodeConfig(**self.config)
+
+        raw_val = self.get_variable(config.file_selector, variable_pool, strict=False)
+        if raw_val is None:
+            logger.warning("Node %s: file variable '%s' is empty", self.node_id, config.file_selector)
+            return {"text": "", "chunks": []}
+
+        files = _normalise_files(raw_val)
+        if not files:
+            return {"text": "", "chunks": []}
+
+        # Function-scope import kept from the original change
+        # (presumably avoids an import cycle - TODO confirm).
+        from app.services.multimodal_service import MultimodalService
+
+        chunks: list[str] = []
+        with get_db_read() as db:
+            svc = MultimodalService(db)
+            for f in files:
+                try:
+                    file_input = _file_object_to_file_input(f)
+                    # Ensure URL is populated for local files
+                    if not file_input.url:
+                        file_input.url = await svc.get_file_url(file_input)
+                    # Reuse cached bytes if already fetched
+                    cached = f.get_content()
+                    if cached:
+                        file_input.set_content(cached)
+                    chunks.append(await svc._extract_document_text(file_input))
+                except Exception as e:
+                    # Best effort: keep chunk positions aligned with the inputs.
+                    logger.error(
+                        "Node %s: failed to extract file url=%s file_id=%s: %s",
+                        self.node_id, f.url, f.file_id, e,
+                        exc_info=True,
+                    )
+                    chunks.append("")
+
+        full_text = "\n\n".join(c for c in chunks if c)
+        logger.info("Node %s: extracted %d file(s), total chars=%d", self.node_id, len(files), len(full_text))
+        return {"text": full_text, "chunks": chunks}
diff --git a/api/app/core/workflow/nodes/enums.py b/api/app/core/workflow/nodes/enums.py
index 5a603ac9..529cd0b3 100644
--- a/api/app/core/workflow/nodes/enums.py
+++ b/api/app/core/workflow/nodes/enums.py
@@ -23,6 +23,7 @@ class NodeType(StrEnum):
     BREAK = "break"
     MEMORY_READ = "memory-read"
     MEMORY_WRITE = "memory-write"
+    DOCUMENT_EXTRACTOR = "document-extractor"
     UNKNOWN = "unknown"
 
     NOTES = "notes"
diff --git a/api/app/core/workflow/nodes/node_factory.py b/api/app/core/workflow/nodes/node_factory.py
index 9e5a7d24..49add867 100644
--- a/api/app/core/workflow/nodes/node_factory.py
+++ b/api/app/core/workflow/nodes/node_factory.py
@@ -26,6 +26,7 @@ from app.core.workflow.nodes.variable_aggregator import VariableAggregatorNode
 from app.core.workflow.nodes.question_classifier import QuestionClassifierNode
 from app.core.workflow.nodes.breaker import BreakNode
 from app.core.workflow.nodes.tool import ToolNode
+from app.core.workflow.nodes.document_extractor import DocExtractorNode
 
 logger = logging.getLogger(__name__)
 
@@ -49,7 +50,8 @@ WorkflowNode = Union[
     ToolNode,
     MemoryReadNode,
     MemoryWriteNode,
-    CodeNode
+    CodeNode,
+    DocExtractorNode,
 ]
 
 
@@ -81,6 +83,7 @@ class NodeFactory:
NodeType.MEMORY_READ: MemoryReadNode, NodeType.MEMORY_WRITE: MemoryWriteNode, NodeType.CODE: CodeNode, + NodeType.DOCUMENT_EXTRACTOR: DocExtractorNode } @classmethod diff --git a/api/app/services/multimodal_service.py b/api/app/services/multimodal_service.py index 3afd6206..4cf3d89d 100644 --- a/api/app/services/multimodal_service.py +++ b/api/app/services/multimodal_service.py @@ -403,71 +403,6 @@ class MultimodalService: logger.info(f"成功处理 {len(result)}/{len(files)} 个文件,provider={self.provider}") return result - async def history_process_files( - self, - files: Optional[List[FileInput]], - ) -> List[Dict[str, Any]]: - """ - 处理文件列表,返回 LLM 可用的格式 - - Args: - files: 文件输入列表 - - Returns: - List[Dict]: LLM 可用的内容格式列表(根据 provider 返回不同格式) - """ - if not files: - return [] - - # 获取对应的策略 - # dashscope 的 omni 模型使用 OpenAI 兼容格式 - if self.provider == "dashscope" and self.is_omni: - strategy_class = OpenAIFormatStrategy - else: - strategy_class = PROVIDER_STRATEGIES.get(self.provider) - if not strategy_class: - logger.warning(f"未找到 provider '{self.provider}' 的策略,使用默认策略") - strategy_class = DashScopeFormatStrategy - - result = [] - for idx, file in enumerate(files): - strategy = strategy_class(file) - if not file.url: - file.url = await self.get_file_url(file) - try: - if file.type == FileType.IMAGE and "vision" in self.capability: - is_support, content = await self._process_image(file, strategy) - result.append(content) - elif file.type == FileType.DOCUMENT: - is_support, content = await self._process_document(file, strategy) - result.append(content) - elif file.type == FileType.AUDIO and "audio" in self.capability: - is_support, content = await self._process_audio(file, strategy) - result.append(content) - elif file.type == FileType.VIDEO and "video" in self.capability: - is_support, content = await self._process_video(file, strategy) - result.append(content) - else: - logger.warning(f"不支持的文件类型: {file.type}") - except Exception as e: - logger.error( - f"处理文件失败", - extra={ - 
"file_index": idx, - "file_type": file.type, - "error": str(e) - }, - exc_info=True - ) - # 继续处理其他文件,不中断整个流程 - result.append({ - "type": "text", - "text": f"[文件处理失败: {str(e)}]" - }) - - logger.info(f"成功处理 {len(result)}/{len(files)} 个文件,provider={self.provider}") - return result - async def _process_image(self, file: FileInput, strategy) -> tuple[bool, Dict[str, Any]]: """ 处理图片文件