Merge pull request #695 from SuanmoSuanyangTechnology/feature/agent-tool_xjn
feat(workflow)
This commit is contained in:
@@ -0,0 +1,4 @@
|
||||
from .config import DocExtractorNodeConfig
|
||||
from .node import DocExtractorNode
|
||||
|
||||
__all__ = ["DocExtractorNode", "DocExtractorNodeConfig"]
|
||||
18
api/app/core/workflow/nodes/document_extractor/config.py
Normal file
18
api/app/core/workflow/nodes/document_extractor/config.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from pydantic import Field
|
||||
from app.core.workflow.nodes.base_config import BaseNodeConfig
|
||||
|
||||
|
||||
class DocExtractorNodeConfig(BaseNodeConfig):
|
||||
file_selector: str = Field(
|
||||
...,
|
||||
description="File variable selector, e.g. {{ sys.files }} or {{ node_id.file }}"
|
||||
)
|
||||
|
||||
class Config:
|
||||
json_schema_extra = {
|
||||
"examples": [
|
||||
{
|
||||
"file_selector": "{{ sys.files }}"
|
||||
}
|
||||
]
|
||||
}
|
||||
103
api/app/core/workflow/nodes/document_extractor/node.py
Normal file
103
api/app/core/workflow/nodes/document_extractor/node.py
Normal file
@@ -0,0 +1,103 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from app.core.workflow.engine.state_manager import WorkflowState
|
||||
from app.core.workflow.engine.variable_pool import VariablePool
|
||||
from app.core.workflow.nodes.base_node import BaseNode
|
||||
from app.core.workflow.nodes.document_extractor.config import DocExtractorNodeConfig
|
||||
from app.core.workflow.variable.base_variable import VariableType, FileObject
|
||||
from app.db import get_db_read
|
||||
from app.schemas.app_schema import FileInput, FileType, TransferMethod
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _file_object_to_file_input(f: FileObject) -> FileInput:
|
||||
"""Convert workflow FileObject to multimodal FileInput."""
|
||||
return FileInput(
|
||||
type=FileType.DOCUMENT,
|
||||
transfer_method=TransferMethod(f.transfer_method),
|
||||
url=f.url or None,
|
||||
upload_file_id=f.file_id or None,
|
||||
file_type=f.origin_file_type or "",
|
||||
)
|
||||
|
||||
|
||||
def _normalise_files(val: Any) -> list[FileObject]:
|
||||
if isinstance(val, FileObject):
|
||||
return [val]
|
||||
if isinstance(val, dict) and val.get("is_file"):
|
||||
return [FileObject(**val)]
|
||||
if isinstance(val, list):
|
||||
result: list[FileObject] = []
|
||||
for item in val:
|
||||
if isinstance(item, FileObject):
|
||||
result.append(item)
|
||||
elif isinstance(item, dict) and item.get("is_file"):
|
||||
result.append(FileObject(**item))
|
||||
else:
|
||||
logger.warning("Ignoring non-file entry in file list for document extractor: %r", item)
|
||||
return result
|
||||
return []
|
||||
|
||||
|
||||
class DocExtractorNode(BaseNode):
|
||||
"""Document Extractor Node.
|
||||
|
||||
Reads one or more file variables and extracts their text content
|
||||
by delegating to MultimodalService._extract_document_text.
|
||||
|
||||
Outputs:
|
||||
text (string) – full concatenated text of all input files
|
||||
chunks (array[string]) – per-file extracted text
|
||||
"""
|
||||
|
||||
def _output_types(self) -> dict[str, VariableType]:
|
||||
return {
|
||||
"text": VariableType.STRING,
|
||||
"chunks": VariableType.ARRAY_STRING,
|
||||
}
|
||||
|
||||
def _extract_output(self, business_result: Any) -> Any:
|
||||
return business_result
|
||||
|
||||
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
|
||||
return {"file_selector": self.config.get("file_selector")}
|
||||
|
||||
async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> Any:
|
||||
config = DocExtractorNodeConfig(**self.config)
|
||||
|
||||
raw_val = self.get_variable(config.file_selector, variable_pool, strict=False)
|
||||
if raw_val is None:
|
||||
logger.warning(f"Node {self.node_id}: file variable '{config.file_selector}' is empty")
|
||||
return {"text": "", "chunks": []}
|
||||
|
||||
files = _normalise_files(raw_val)
|
||||
if not files:
|
||||
return {"text": "", "chunks": []}
|
||||
|
||||
chunks: list[str] = []
|
||||
with get_db_read() as db:
|
||||
from app.services.multimodal_service import MultimodalService
|
||||
svc = MultimodalService(db)
|
||||
for f in files:
|
||||
try:
|
||||
file_input = _file_object_to_file_input(f)
|
||||
# Ensure URL is populated for local files
|
||||
if not file_input.url:
|
||||
file_input.url = await svc.get_file_url(file_input)
|
||||
# Reuse cached bytes if already fetched
|
||||
if f.get_content():
|
||||
file_input.set_content(f.get_content())
|
||||
text = await svc._extract_document_text(file_input)
|
||||
chunks.append(text)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Node {self.node_id}: failed to extract file url={f.url} file_id={f.file_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
chunks.append("")
|
||||
|
||||
full_text = "\n\n".join(c for c in chunks if c)
|
||||
logger.info(f"Node {self.node_id}: extracted {len(files)} file(s), total chars={len(full_text)}")
|
||||
return {"text": full_text, "chunks": chunks}
|
||||
@@ -23,6 +23,7 @@ class NodeType(StrEnum):
|
||||
BREAK = "break"
|
||||
MEMORY_READ = "memory-read"
|
||||
MEMORY_WRITE = "memory-write"
|
||||
DOCUMENT_EXTRACTOR = "document-extractor"
|
||||
|
||||
UNKNOWN = "unknown"
|
||||
NOTES = "notes"
|
||||
|
||||
@@ -26,6 +26,7 @@ from app.core.workflow.nodes.variable_aggregator import VariableAggregatorNode
|
||||
from app.core.workflow.nodes.question_classifier import QuestionClassifierNode
|
||||
from app.core.workflow.nodes.breaker import BreakNode
|
||||
from app.core.workflow.nodes.tool import ToolNode
|
||||
from app.core.workflow.nodes.document_extractor import DocExtractorNode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -49,7 +50,8 @@ WorkflowNode = Union[
|
||||
ToolNode,
|
||||
MemoryReadNode,
|
||||
MemoryWriteNode,
|
||||
CodeNode
|
||||
CodeNode,
|
||||
DocExtractorNode
|
||||
]
|
||||
|
||||
|
||||
@@ -81,6 +83,7 @@ class NodeFactory:
|
||||
NodeType.MEMORY_READ: MemoryReadNode,
|
||||
NodeType.MEMORY_WRITE: MemoryWriteNode,
|
||||
NodeType.CODE: CodeNode,
|
||||
NodeType.DOCUMENT_EXTRACTOR: DocExtractorNode
|
||||
}
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -403,71 +403,6 @@ class MultimodalService:
|
||||
logger.info(f"成功处理 {len(result)}/{len(files)} 个文件,provider={self.provider}")
|
||||
return result
|
||||
|
||||
async def history_process_files(
|
||||
self,
|
||||
files: Optional[List[FileInput]],
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
处理文件列表,返回 LLM 可用的格式
|
||||
|
||||
Args:
|
||||
files: 文件输入列表
|
||||
|
||||
Returns:
|
||||
List[Dict]: LLM 可用的内容格式列表(根据 provider 返回不同格式)
|
||||
"""
|
||||
if not files:
|
||||
return []
|
||||
|
||||
# 获取对应的策略
|
||||
# dashscope 的 omni 模型使用 OpenAI 兼容格式
|
||||
if self.provider == "dashscope" and self.is_omni:
|
||||
strategy_class = OpenAIFormatStrategy
|
||||
else:
|
||||
strategy_class = PROVIDER_STRATEGIES.get(self.provider)
|
||||
if not strategy_class:
|
||||
logger.warning(f"未找到 provider '{self.provider}' 的策略,使用默认策略")
|
||||
strategy_class = DashScopeFormatStrategy
|
||||
|
||||
result = []
|
||||
for idx, file in enumerate(files):
|
||||
strategy = strategy_class(file)
|
||||
if not file.url:
|
||||
file.url = await self.get_file_url(file)
|
||||
try:
|
||||
if file.type == FileType.IMAGE and "vision" in self.capability:
|
||||
is_support, content = await self._process_image(file, strategy)
|
||||
result.append(content)
|
||||
elif file.type == FileType.DOCUMENT:
|
||||
is_support, content = await self._process_document(file, strategy)
|
||||
result.append(content)
|
||||
elif file.type == FileType.AUDIO and "audio" in self.capability:
|
||||
is_support, content = await self._process_audio(file, strategy)
|
||||
result.append(content)
|
||||
elif file.type == FileType.VIDEO and "video" in self.capability:
|
||||
is_support, content = await self._process_video(file, strategy)
|
||||
result.append(content)
|
||||
else:
|
||||
logger.warning(f"不支持的文件类型: {file.type}")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"处理文件失败",
|
||||
extra={
|
||||
"file_index": idx,
|
||||
"file_type": file.type,
|
||||
"error": str(e)
|
||||
},
|
||||
exc_info=True
|
||||
)
|
||||
# 继续处理其他文件,不中断整个流程
|
||||
result.append({
|
||||
"type": "text",
|
||||
"text": f"[文件处理失败: {str(e)}]"
|
||||
})
|
||||
|
||||
logger.info(f"成功处理 {len(result)}/{len(files)} 个文件,provider={self.provider}")
|
||||
return result
|
||||
|
||||
async def _process_image(self, file: FileInput, strategy) -> tuple[bool, Dict[str, Any]]:
|
||||
"""
|
||||
处理图片文件
|
||||
|
||||
Reference in New Issue
Block a user