Merge pull request #785 from wanxunyang/feat/app-log-wxy
feat(workflow): add opening statement and citation support
This commit is contained in:
@@ -59,6 +59,9 @@ class WorkflowResultBuilder:
|
||||
conversation_vars = variable_pool.get_all_conversation_vars()
|
||||
sys_vars = variable_pool.get_all_system_vars()
|
||||
|
||||
# 汇总所有 knowledge 节点的 citations
|
||||
citations = self.aggregate_citations(node_outputs)
|
||||
|
||||
return {
|
||||
"status": "completed" if success else "failed",
|
||||
"output": final_output,
|
||||
@@ -71,9 +74,25 @@ class WorkflowResultBuilder:
|
||||
"conversation_id": execution_context.conversation_id,
|
||||
"elapsed_time": elapsed_time,
|
||||
"token_usage": token_usage,
|
||||
"citations": citations,
|
||||
"error": result.get("error"),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def aggregate_citations(node_outputs: dict) -> list:
|
||||
"""从所有 knowledge 节点的输出中汇总 citations,去重"""
|
||||
seen = set()
|
||||
citations = []
|
||||
for node_output in node_outputs.values():
|
||||
if not isinstance(node_output, dict):
|
||||
continue
|
||||
for c in node_output.get("citations", []):
|
||||
key = c.get("document_id")
|
||||
if key and key not in seen:
|
||||
seen.add(key)
|
||||
citations.append(c)
|
||||
return citations
|
||||
|
||||
@staticmethod
|
||||
def aggregate_token_usage(node_outputs: dict) -> dict[str, int] | None:
|
||||
"""
|
||||
|
||||
@@ -395,7 +395,8 @@ class BaseNode(ABC):
|
||||
"output": output,
|
||||
"elapsed_time": elapsed_time,
|
||||
"token_usage": token_usage,
|
||||
"error": None
|
||||
"error": None,
|
||||
**self._extract_extra_fields(business_result),
|
||||
}
|
||||
final_output = {
|
||||
"node_outputs": {self.node_id: node_output},
|
||||
@@ -498,6 +499,13 @@ class BaseNode(ABC):
|
||||
# Default implementation returns the business result directly
|
||||
return business_result
|
||||
|
||||
def _extract_extra_fields(self, business_result: Any) -> dict:
|
||||
"""Extracts extra fields to merge into node_output (e.g. citations).
|
||||
|
||||
Subclasses may override to inject additional metadata.
|
||||
"""
|
||||
return {}
|
||||
|
||||
def _extract_token_usage(self, business_result: Any) -> dict[str, int] | None:
|
||||
"""Extracts token usage information from the business result.
|
||||
|
||||
|
||||
@@ -34,6 +34,20 @@ class KnowledgeRetrievalNode(BaseNode):
|
||||
"output": VariableType.ARRAY_STRING
|
||||
}
|
||||
|
||||
def _extract_output(self, business_result: Any) -> Any:
|
||||
"""下游节点只拿 chunks 列表"""
|
||||
if isinstance(business_result, dict) and "chunks" in business_result:
|
||||
return business_result["chunks"]
|
||||
return business_result
|
||||
|
||||
def _extract_citations(self, business_result: Any) -> list:
|
||||
if isinstance(business_result, dict):
|
||||
return business_result.get("citations", [])
|
||||
return []
|
||||
|
||||
def _extract_extra_fields(self, business_result: Any) -> dict:
|
||||
return {"citations": self._extract_citations(business_result)}
|
||||
|
||||
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
|
||||
return {
|
||||
"query": self._render_template(self.typed_config.query, variable_pool),
|
||||
@@ -314,4 +328,20 @@ class KnowledgeRetrievalNode(BaseNode):
|
||||
logger.info(
|
||||
f"Node {self.node_id}: knowledge base retrieval completed, results count: {len(final_rs)}"
|
||||
)
|
||||
return [chunk.page_content for chunk in final_rs]
|
||||
citations = []
|
||||
seen_doc_ids = set()
|
||||
for chunk in final_rs:
|
||||
meta = chunk.metadata or {}
|
||||
doc_id = meta.get("document_id") or meta.get("doc_id")
|
||||
if doc_id and doc_id not in seen_doc_ids:
|
||||
seen_doc_ids.add(doc_id)
|
||||
citations.append({
|
||||
"document_id": str(doc_id),
|
||||
"file_name": meta.get("file_name", ""),
|
||||
"knowledge_id": str(meta.get("knowledge_id", kb_config.kb_id)),
|
||||
"score": meta.get("score", 0.0),
|
||||
})
|
||||
return {
|
||||
"chunks": [chunk.page_content for chunk in final_rs],
|
||||
"citations": citations,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user