Merge branch 'refs/heads/develop' into fix/memory_bug_fix

# Conflicts:
#	api/app/services/memory_storage_service.py
This commit is contained in:
lixinyue
2026-01-26 17:08:55 +08:00
55 changed files with 2068 additions and 698 deletions

View File

@@ -133,7 +133,7 @@ class WorkflowExecutor:
for node in self.workflow_config.get("nodes")
if node.get("type") in [NodeType.LOOP, NodeType.ITERATION]
], # loop, iteration node id
"looping": False, # loop runing flag, only use in loop node,not use in main loop
"looping": 0, # loop runing flag, only use in loop node,not use in main loop
"activate": {
self.start_node_id: True
}
@@ -261,7 +261,7 @@ class WorkflowExecutor:
"data": {
"execution_id": self.execution_id,
"workspace_id": self.workspace_id,
"timestamp": start_time.isoformat()
"timestamp": int(start_time.timestamp() * 1000)
}
}
@@ -293,20 +293,33 @@ class WorkflowExecutor:
# Handle custom streaming events (chunks from nodes via stream writer)
chunk_count += 1
event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}"
f"- execution_id: {self.execution_id}")
yield {
"event": event_type, # "message" or "node_chunk"
"data": {
"node_id": data.get("node_id"),
"chunk": data.get("chunk"),
"full_content": data.get("full_content"),
"chunk_index": data.get("chunk_index"),
"is_prefix": data.get("is_prefix"),
"is_suffix": data.get("is_suffix"),
"conversation_id": input_data.get("conversation_id"),
if event_type in ("message", "node_chunk"):
logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}"
f"- execution_id: {self.execution_id}")
yield {
"event": event_type, # "message" or "node_chunk"
"data": {
"node_id": data.get("node_id"),
"chunk": data.get("chunk"),
"full_content": data.get("full_content"),
"chunk_index": data.get("chunk_index"),
"is_prefix": data.get("is_prefix"),
"is_suffix": data.get("is_suffix"),
"conversation_id": input_data.get("conversation_id"),
}
}
elif event_type == "node_error":
yield {
"event": event_type, # "message" or "node_chunk"
"data": {
"node_id": data.get("node_id"),
"status": "failed",
"input": data.get("input_data"),
"elapsed_time": data.get("elapsed_time"),
"output": None,
"error": data.get("error")
}
}
}
elif mode == "debug":
# Handle debug information (node execution status)
@@ -325,14 +338,15 @@ class WorkflowExecutor:
conversation_id = input_data.get("conversation_id")
logger.info(f"[NODE-START] Node starts execution: {node_name} "
f"- execution_id: {self.execution_id}")
yield {
"event": "node_start",
"data": {
"node_id": node_name,
"conversation_id": conversation_id,
"execution_id": self.execution_id,
"timestamp": data.get("timestamp"),
"timestamp": int(datetime.datetime.fromisoformat(
data.get("timestamp")
).timestamp() * 1000),
}
}
elif event_type == "task_result":
@@ -351,13 +365,18 @@ class WorkflowExecutor:
"node_id": node_name,
"conversation_id": conversation_id,
"execution_id": self.execution_id,
"timestamp": data.get("timestamp"),
"state": result.get("node_outputs", {}).get(node_name),
"timestamp": int(datetime.datetime.fromisoformat(
data.get("timestamp")
).timestamp() * 1000),
"input": result.get("node_outputs", {}).get(node_name, {}).get("input"),
"output": result.get("node_outputs", {}).get(node_name, {}).get("output"),
"elapsed_time": result.get("node_outputs", {}).get(node_name, {}).get("elapsed_time"),
}
}
elif mode == "updates":
# Handle state updates - store final state
# TODO:流式输出点
logger.debug(f"[UPDATES] 收到 state 更新 from {list(data.keys())} "
f"- execution_id: {self.execution_id}")

View File

@@ -19,13 +19,17 @@ from app.core.workflow.variable_pool import VariablePool
logger = logging.getLogger(__name__)
def merget_activate_state(x, y):
def merge_activate_state(x, y):
return {
k: x.get(k, False) or y.get(k, False)
for k in set(x) | set(y)
}
def merge_looping_state(x, y):
return y if y > x else x
class WorkflowState(TypedDict):
"""Workflow state
@@ -36,7 +40,7 @@ class WorkflowState(TypedDict):
# Set of loop node IDs, used for assigning values in loop nodes
cycle_nodes: list
looping: Annotated[bool, lambda x, y: x and y]
looping: Annotated[int, merge_looping_state]
# Input variables (passed from configured variables)
# Uses a deep merge function, supporting nested dict updates (e.g., conv.xxx)
@@ -68,7 +72,7 @@ class WorkflowState(TypedDict):
streaming_buffer: Annotated[dict[str, Any], lambda x, y: {**x, **y}]
# node activate status
activate: Annotated[dict[str, bool], merget_activate_state]
activate: Annotated[dict[str, bool], merge_activate_state]
class BaseNode(ABC):
@@ -540,6 +544,11 @@ class BaseNode(ABC):
"error_node": self.node_id
}
else:
writer = get_stream_writer()
writer({
"type": "node_error",
**node_output
})
# 无错误边:抛出异常停止工作流
logger.error(f"节点 {self.node_id} 执行失败,停止工作流: {error_message}")
raise Exception(f"节点 {self.node_id} 执行失败: {error_message}")

View File

@@ -28,6 +28,6 @@ class BreakNode(BaseNode):
Returns:
Optional dictionary indicating the loop has been stopped.
"""
state["looping"] = False
state["looping"] = 2
logger.info(f"Setting cycle node exit flag, cycle={self.cycle}, looping={state['looping']}")

View File

@@ -58,10 +58,10 @@ class IterationRuntime:
idx: Index of the element in the input array.
Returns:
A deep copy of the workflow state with iteration-specific variables set.
A copy of the workflow state with iteration-specific variables set.
"""
loopstate = WorkflowState(
**copy.deepcopy(self.state)
**self.state
)
loopstate["runtime_vars"][self.node_id] = {
"item": item,
@@ -71,7 +71,7 @@ class IterationRuntime:
"item": item,
"index": idx,
}
loopstate["looping"] = True
loopstate["looping"] = 1
loopstate["activate"][self.start_id] = True
return loopstate
@@ -89,7 +89,7 @@ class IterationRuntime:
self.result.extend(output)
else:
self.result.append(output)
if not result["looping"]:
if result["looping"] == 2:
self.looping = False
return result
@@ -150,10 +150,9 @@ class IterationRuntime:
self.result.extend(output)
else:
self.result.append(output)
if not result["looping"]:
if result["looping"] == 2:
self.looping = False
idx += 1
logger.info(f"Iteration node {self.node_id}: execution completed")
return {
"output": self.result,

View File

@@ -46,6 +46,7 @@ class LoopRuntime:
self.state = state
self.node_id = node_id
self.typed_config = LoopNodeConfig(**config)
self.looping = True
def _init_loop_state(self):
"""
@@ -88,7 +89,7 @@ class LoopRuntime:
loopstate = WorkflowState(
**self.state
)
loopstate["looping"] = True
loopstate["looping"] = 1
loopstate["activate"][self.start_id] = True
return loopstate
@@ -179,9 +180,12 @@ class LoopRuntime:
loopstate = self._init_loop_state()
loop_time = self.typed_config.max_loop
child_state = []
while self.evaluate_conditional(loopstate) and loopstate["looping"] and loop_time > 0:
while self.evaluate_conditional(loopstate) and self.looping and loop_time > 0:
logger.info(f"loop node {self.node_id}: running")
child_state.append(await self.graph.ainvoke(loopstate))
result = await self.graph.ainvoke(loopstate)
child_state.append(result)
if result["looping"] == 2:
self.looping = False
loop_time -= 1
logger.info(f"loop node {self.node_id}: execution completed")

View File

@@ -0,0 +1,52 @@
"""2026011240
Revision ID: 325b759cd66b
Revises: 9a936a9ebb20
Create Date: 2026-01-26 12:37:35.946749
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = '325b759cd66b'
down_revision: Union[str, None] = '9a936a9ebb20'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# 1. 重命名表 data_config -> memory_config
op.rename_table('data_config', 'memory_config')
# 2. 重命名列 group_id -> end_user_id
op.alter_column('memory_config', 'group_id', new_column_name='end_user_id')
# 3. config_id: INTEGER -> UUID保留旧值以便回滚
op.drop_constraint('data_config_pkey', 'memory_config', type_='primary')
op.alter_column('memory_config', 'config_id', new_column_name='config_id_old', nullable=True)
op.add_column('memory_config', sa.Column('config_id', sa.UUID(), nullable=True))
op.execute("UPDATE memory_config SET config_id = apply_id::uuid")
op.alter_column('memory_config', 'config_id', nullable=False)
op.create_primary_key('memory_config_pkey', 'memory_config', ['config_id'])
op.execute("DROP SEQUENCE IF EXISTS data_config_config_id_seq")
def downgrade() -> None:
# 1. config_id: UUID -> INTEGER恢复旧值空值生成新ID
op.execute("CREATE SEQUENCE IF NOT EXISTS data_config_config_id_seq")
op.execute("UPDATE memory_config SET config_id_old = nextval('data_config_config_id_seq') WHERE config_id_old IS NULL")
op.drop_constraint('memory_config_pkey', 'memory_config', type_='primary')
op.drop_column('memory_config', 'config_id')
op.alter_column('memory_config', 'config_id_old', new_column_name='config_id', nullable=False)
op.create_primary_key('data_config_pkey', 'memory_config', ['config_id'])
op.execute("ALTER SEQUENCE data_config_config_id_seq OWNED BY memory_config.config_id")
op.execute("SELECT setval('data_config_config_id_seq', COALESCE((SELECT MAX(config_id) FROM memory_config), 1))")
# 2. 重命名列 end_user_id -> group_id
op.alter_column('memory_config', 'end_user_id', new_column_name='group_id')
# 3. 重命名表 memory_config -> data_config
op.rename_table('memory_config', 'data_config')