Merge branch 'refs/heads/develop' into feature/agent-tool_xjn

This commit is contained in:
Timebomb2018
2026-04-24 19:41:23 +08:00
52 changed files with 1752 additions and 217 deletions

View File

@@ -17,6 +17,7 @@ def _mask_url(url: str) -> str:
"""隐藏 URL 中的密码部分,适用于 redis:// 和 amqp:// 等协议""" """隐藏 URL 中的密码部分,适用于 redis:// 和 amqp:// 等协议"""
return re.sub(r'(://[^:]*:)[^@]+(@)', r'\1***\2', url) return re.sub(r'(://[^:]*:)[^@]+(@)', r'\1***\2', url)
# macOS fork() safety - must be set before any Celery initialization # macOS fork() safety - must be set before any Celery initialization
if platform.system() == 'Darwin': if platform.system() == 'Darwin':
os.environ.setdefault('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES') os.environ.setdefault('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES')
@@ -29,7 +30,7 @@ if platform.system() == 'Darwin':
# 这些名称会被 Celery CLI 的 Click 框架劫持,详见 docs/celery-env-bug-report.md # 这些名称会被 Celery CLI 的 Click 框架劫持,详见 docs/celery-env-bug-report.md
_broker_url = os.getenv("CELERY_BROKER_URL") or \ _broker_url = os.getenv("CELERY_BROKER_URL") or \
f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BROKER}" f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BROKER}"
_backend_url = f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BACKEND}" _backend_url = f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BACKEND}"
os.environ["CELERY_BROKER_URL"] = _broker_url os.environ["CELERY_BROKER_URL"] = _broker_url
os.environ["CELERY_RESULT_BACKEND"] = _backend_url os.environ["CELERY_RESULT_BACKEND"] = _backend_url
@@ -66,11 +67,11 @@ celery_app.conf.update(
task_serializer='json', task_serializer='json',
accept_content=['json'], accept_content=['json'],
result_serializer='json', result_serializer='json',
# # 时区 # # 时区
# timezone='Asia/Shanghai', # timezone='Asia/Shanghai',
# enable_utc=False, # enable_utc=False,
# 任务追踪 # 任务追踪
task_track_started=True, task_track_started=True,
task_ignore_result=False, task_ignore_result=False,

View File

@@ -0,0 +1,500 @@
import hashlib
import json
import os
import socket
import threading
import time
import uuid
import redis
from app.core.config import settings
from app.core.logging_config import get_named_logger
from app.celery_app import celery_app
logger = get_named_logger("task_scheduler")
# per-user queue scheduler:uq:{user_id}
USER_QUEUE_PREFIX = "scheduler:uq:"
# User Collection of Pending Messages
ACTIVE_USERS = "scheduler:active_users"
# Set of users that can dispatch (ready signal)
READY_SET = "scheduler:ready_users"
# Metadata of tasks that have been dispatched and are pending completion
PENDING_HASH = "scheduler:pending_tasks"
# Dynamic Sharding: Instance Registry
REGISTRY_KEY = "scheduler:instances"
TASK_TIMEOUT = 7800 # Task timeout (seconds), considered lost if exceeded
HEARTBEAT_INTERVAL = 10 # Heartbeat interval (seconds)
INSTANCE_TTL = 30 # Instance timeout (seconds)
LUA_ATOMIC_LOCK = """
local dispatch_lock = KEYS[1]
local lock_key = KEYS[2]
local instance_id = ARGV[1]
local dispatch_ttl = tonumber(ARGV[2])
local lock_ttl = tonumber(ARGV[3])
if redis.call('SET', dispatch_lock, instance_id, 'NX', 'EX', dispatch_ttl) == false then
return 0
end
if redis.call('EXISTS', lock_key) == 1 then
redis.call('DEL', dispatch_lock)
return -1
end
redis.call('SET', lock_key, 'dispatching', 'EX', lock_ttl)
return 1
"""
LUA_SAFE_DELETE = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
end
return 0
"""
def stable_hash(value: str) -> int:
return int.from_bytes(
hashlib.md5(value.encode("utf-8")).digest(),
"big"
)
def health_check_server(scheduler_ref):
import uvicorn
from fastapi import FastAPI
health_app = FastAPI()
@health_app.get("/")
def health():
return scheduler_ref.health()
port = int(os.environ.get("SCHEDULER_HEALTH_PORT", "8001"))
threading.Thread(
target=uvicorn.run,
kwargs={
"app": health_app,
"host": "0.0.0.0",
"port": port,
"log_config": None,
},
daemon=True,
).start()
logger.info("[Health] Server started at http://0.0.0.0:%s", port)
class RedisTaskScheduler:
def __init__(self):
self.redis = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DB_CELERY_BACKEND,
password=settings.REDIS_PASSWORD,
decode_responses=True,
)
self.running = False
self.dispatched = 0
self.errors = 0
self.instance_id = f"{socket.gethostname()}-{os.getpid()}"
self._shard_index = 0
self._shard_count = 1
self._last_heartbeat = 0.0
def push_task(self, task_name, user_id, params):
try:
msg_id = str(uuid.uuid4())
msg = json.dumps({
"msg_id": msg_id,
"task_name": task_name,
"user_id": user_id,
"params": json.dumps(params),
})
lock_key = f"{task_name}:{user_id}"
queue_key = f"{USER_QUEUE_PREFIX}{user_id}"
pipe = self.redis.pipeline()
pipe.rpush(queue_key, msg)
pipe.sadd(ACTIVE_USERS, user_id)
pipe.set(
f"task_tracker:{msg_id}",
json.dumps({"status": "QUEUED", "task_id": None}),
ex=86400,
)
pipe.execute()
if not self.redis.exists(lock_key):
self.redis.sadd(READY_SET, user_id)
logger.info("Task pushed: msg_id=%s task=%s user=%s", msg_id, task_name, user_id)
return msg_id
except Exception as e:
logger.error("Push task exception %s", e, exc_info=True)
raise
def get_task_status(self, msg_id: str) -> dict:
raw = self.redis.get(f"task_tracker:{msg_id}")
if raw is None:
return {"status": "NOT_FOUND"}
tracker = json.loads(raw)
status = tracker["status"]
task_id = tracker.get("task_id")
result_content = tracker.get("result") or {}
if status == "DISPATCHED" and task_id:
result_raw = self.redis.get(f"celery-task-meta-{task_id}")
if result_raw:
result_data = json.loads(result_raw)
status = result_data.get("status", status)
result_content = result_data.get("result")
return {"status": status, "task_id": task_id, "result": result_content}
def _cleanup_finished(self):
pending = self.redis.hgetall(PENDING_HASH)
if not pending:
return
now = time.time()
task_ids = list(pending.keys())
pipe = self.redis.pipeline()
for task_id in task_ids:
pipe.get(f"celery-task-meta-{task_id}")
results = pipe.execute()
cleanup_pipe = self.redis.pipeline()
has_cleanup = False
ready_user_ids = set()
for task_id, raw_result in zip(task_ids, results):
try:
meta = json.loads(pending[task_id])
lock_key = meta["lock_key"]
dispatched_at = meta.get("dispatched_at", 0)
age = now - dispatched_at
should_cleanup = False
result_data = {}
if raw_result is not None:
result_data = json.loads(raw_result)
if result_data.get("status") in ("SUCCESS", "FAILURE", "REVOKED"):
should_cleanup = True
logger.info(
"Task finished: %s state=%s", task_id,
result_data.get("status"),
)
elif age > TASK_TIMEOUT:
should_cleanup = True
logger.warning(
"Task expired or lost: %s age=%.0fs, force cleanup",
task_id, age,
)
if should_cleanup:
final_status = (
result_data.get("status", "UNKNOWN") if result_data else "EXPIRED"
)
self.redis.eval(LUA_SAFE_DELETE, 1, lock_key, task_id)
cleanup_pipe.hdel(PENDING_HASH, task_id)
tracker_msg_id = meta.get("msg_id")
if tracker_msg_id:
cleanup_pipe.set(
f"task_tracker:{tracker_msg_id}",
json.dumps({
"status": final_status,
"task_id": task_id,
"result": result_data.get("result") or {},
}),
ex=86400,
)
has_cleanup = True
parts = lock_key.split(":", 1)
if len(parts) == 2:
ready_user_ids.add(parts[1])
except Exception as e:
logger.error("Cleanup error for %s: %s", task_id, e, exc_info=True)
self.errors += 1
if has_cleanup:
cleanup_pipe.execute()
if ready_user_ids:
self.redis.sadd(READY_SET, *ready_user_ids)
def _heartbeat(self):
now = time.time()
if now - self._last_heartbeat < HEARTBEAT_INTERVAL:
return
self._last_heartbeat = now
self.redis.hset(REGISTRY_KEY, self.instance_id, str(now))
all_instances = self.redis.hgetall(REGISTRY_KEY)
alive = []
dead = []
for iid, ts in all_instances.items():
if now - float(ts) < INSTANCE_TTL:
alive.append(iid)
else:
dead.append(iid)
if dead:
pipe = self.redis.pipeline()
for iid in dead:
pipe.hdel(REGISTRY_KEY, iid)
pipe.execute()
logger.info("Cleaned dead instances: %s", dead)
alive.sort()
self._shard_count = max(len(alive), 1)
self._shard_index = (
alive.index(self.instance_id) if self.instance_id in alive else 0
)
logger.debug(
"Shard: %s/%s (instance=%s, alive=%d)",
self._shard_index, self._shard_count,
self.instance_id, len(alive),
)
def _is_mine(self, user_id: str) -> bool:
if self._shard_count <= 1:
return True
return stable_hash(user_id) % self._shard_count == self._shard_index
def _dispatch(self, msg_id, msg_data) -> bool:
user_id = msg_data["user_id"]
task_name = msg_data["task_name"]
params = json.loads(msg_data.get("params", "{}"))
lock_key = f"{task_name}:{user_id}"
dispatch_lock = f"dispatch:{msg_id}"
result = self.redis.eval(
LUA_ATOMIC_LOCK, 2,
dispatch_lock, lock_key,
self.instance_id, str(300), str(3600),
)
if result == 0:
return False
if result == -1:
return False
try:
task = celery_app.send_task(task_name, kwargs=params)
except Exception as e:
pipe = self.redis.pipeline()
pipe.delete(dispatch_lock)
pipe.delete(lock_key)
pipe.execute()
self.errors += 1
logger.error(
"send_task failed for %s:%s msg=%s: %s",
task_name, user_id, msg_id, e, exc_info=True,
)
return False
try:
pipe = self.redis.pipeline()
pipe.set(lock_key, task.id, ex=3600)
pipe.hset(PENDING_HASH, task.id, json.dumps({
"lock_key": lock_key,
"dispatched_at": time.time(),
"msg_id": msg_id,
}))
pipe.delete(dispatch_lock)
pipe.set(
f"task_tracker:{msg_id}",
json.dumps({"status": "DISPATCHED", "task_id": task.id}),
ex=86400,
)
pipe.execute()
except Exception as e:
logger.error(
"Post-dispatch state update failed for %s: %s",
task.id, e, exc_info=True,
)
self.errors += 1
self.dispatched += 1
logger.info("Task dispatched: %s (msg=%s)", task.id, msg_id)
return True
def _process_batch(self, user_ids):
if not user_ids:
return
pipe = self.redis.pipeline()
for uid in user_ids:
pipe.lindex(f"{USER_QUEUE_PREFIX}{uid}", 0)
heads = pipe.execute()
candidates = [] # (user_id, msg_dict)
empty_users = []
for uid, head in zip(user_ids, heads):
if head is None:
empty_users.append(uid)
else:
try:
candidates.append((uid, json.loads(head)))
except (json.JSONDecodeError, TypeError) as e:
logger.error("Bad message in queue for user %s: %s", uid, e)
self.redis.lpop(f"{USER_QUEUE_PREFIX}{uid}")
if empty_users:
pipe = self.redis.pipeline()
for uid in empty_users:
pipe.srem(ACTIVE_USERS, uid)
pipe.execute()
if not candidates:
return
for uid, msg in candidates:
if self._dispatch(msg["msg_id"], msg):
self.redis.lpop(f"{USER_QUEUE_PREFIX}{uid}")
def schedule_loop(self):
self._heartbeat()
self._cleanup_finished()
pipe = self.redis.pipeline()
pipe.smembers(READY_SET)
pipe.delete(READY_SET)
results = pipe.execute()
ready_users = results[0] or set()
my_users = [uid for uid in ready_users if self._is_mine(uid)]
if not my_users:
time.sleep(0.5)
return
self._process_batch(my_users)
time.sleep(0.1)
def _full_scan(self):
cursor = 0
ready_batch = []
while True:
cursor, user_ids = self.redis.sscan(
ACTIVE_USERS, cursor=cursor, count=1000,
)
if user_ids:
my_users = [uid for uid in user_ids if self._is_mine(uid)]
if my_users:
pipe = self.redis.pipeline()
for uid in my_users:
pipe.lindex(f"{USER_QUEUE_PREFIX}{uid}", 0)
heads = pipe.execute()
for uid, head in zip(my_users, heads):
if head is None:
continue
try:
msg = json.loads(head)
lock_key = f"{msg['task_name']}:{uid}"
ready_batch.append((uid, lock_key))
except (json.JSONDecodeError, TypeError):
continue
if cursor == 0:
break
if not ready_batch:
return
pipe = self.redis.pipeline()
for _, lock_key in ready_batch:
pipe.exists(lock_key)
lock_exists = pipe.execute()
ready_uids = [
uid for (uid, _), locked in zip(ready_batch, lock_exists)
if not locked
]
if ready_uids:
self.redis.sadd(READY_SET, *ready_uids)
logger.info("Full scan found %d ready users", len(ready_uids))
def run_server(self):
health_check_server(self)
self.running = True
last_full_scan = 0.0
full_scan_interval = 30.0
logger.info(
"Scheduler started: instance=%s", self.instance_id,
)
while True:
try:
self.schedule_loop()
now = time.time()
if now - last_full_scan > full_scan_interval:
self._full_scan()
last_full_scan = now
except Exception as e:
logger.error("Scheduler exception %s", e, exc_info=True)
self.errors += 1
time.sleep(5)
def health(self) -> dict:
return {
"running": self.running,
"active_users": self.redis.scard(ACTIVE_USERS),
"ready_users": self.redis.scard(READY_SET),
"pending_tasks": self.redis.hlen(PENDING_HASH),
"dispatched": self.dispatched,
"errors": self.errors,
"shard": f"{self._shard_index}/{self._shard_count}",
"instance": self.instance_id,
}
def shutdown(self):
logger.info("Scheduler shutting down: instance=%s", self.instance_id)
self.running = False
try:
self.redis.hdel(REGISTRY_KEY, self.instance_id)
except Exception as e:
logger.error("Shutdown cleanup error: %s", e)
scheduler: RedisTaskScheduler | None = None
if scheduler is None:
scheduler = RedisTaskScheduler()
if __name__ == "__main__":
import signal
import sys
def _signal_handler(signum, frame):
scheduler.shutdown()
sys.exit(0)
signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)
scheduler.run_server()

View File

@@ -9,7 +9,7 @@ from app.core.logging_config import get_business_logger
from app.core.response_utils import success from app.core.response_utils import success
from app.db import get_db from app.db import get_db
from app.dependencies import get_current_user, cur_workspace_access_guard from app.dependencies import get_current_user, cur_workspace_access_guard
from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail, AppLogMessage
from app.schemas.response_schema import PageData, PageMeta from app.schemas.response_schema import PageData, PageMeta
from app.services.app_service import AppService from app.services.app_service import AppService
from app.services.app_log_service import AppLogService from app.services.app_log_service import AppLogService
@@ -78,17 +78,32 @@ def get_app_log_detail(
# 验证应用访问权限 # 验证应用访问权限
app_service = AppService(db) app_service = AppService(db)
app_service.get_app(app_id, workspace_id) app = app_service.get_app(app_id, workspace_id)
# 使用 Service 层查询 # 使用 Service 层查询
log_service = AppLogService(db) log_service = AppLogService(db)
conversation, node_executions_map = log_service.get_conversation_detail( conversation, messages, node_executions_map = log_service.get_conversation_detail(
app_id=app_id, app_id=app_id,
conversation_id=conversation_id, conversation_id=conversation_id,
workspace_id=workspace_id workspace_id=workspace_id,
app_type=app.type
) )
detail = AppLogConversationDetail.model_validate(conversation) # 构建基础会话信息(不经过 ORM relationship
detail.node_executions_map = node_executions_map base = AppLogConversation.model_validate(conversation)
# 单独处理 messages避免触发 SQLAlchemy relationship 校验
if messages and isinstance(messages[0], AppLogMessage):
# 工作流:已经是 AppLogMessage 实例
msg_list = messages
else:
# AgentORM Message 对象逐个转换
msg_list = [AppLogMessage.model_validate(m) for m in messages]
detail = AppLogConversationDetail(
**base.model_dump(),
messages=msg_list,
node_executions_map=node_executions_map,
)
return success(data=detail) return success(data=detail)

View File

@@ -4,7 +4,9 @@
处理显性记忆相关的API接口包括情景记忆和语义记忆的查询。 处理显性记忆相关的API接口包括情景记忆和语义记忆的查询。
""" """
from fastapi import APIRouter, Depends from typing import Optional
from fastapi import APIRouter, Depends, Query
from app.core.logging_config import get_api_logger from app.core.logging_config import get_api_logger
from app.core.response_utils import success, fail from app.core.response_utils import success, fail
@@ -69,6 +71,140 @@ async def get_explicit_memory_overview_api(
return fail(BizCode.INTERNAL_ERROR, "显性记忆总览查询失败", str(e)) return fail(BizCode.INTERNAL_ERROR, "显性记忆总览查询失败", str(e))
@router.get("/episodics", response_model=ApiResponse)
async def get_episodic_memory_list_api(
end_user_id: str = Query(..., description="end user ID"),
page: int = Query(1, gt=0, description="page number, starting from 1"),
pagesize: int = Query(10, gt=0, le=100, description="number of items per page, max 100"),
start_date: Optional[int] = Query(None, description="start timestamp (ms)"),
end_date: Optional[int] = Query(None, description="end timestamp (ms)"),
episodic_type: str = Query("all", description="episodic type all/conversation/project_work/learning/decision/important_event"),
current_user: User = Depends(get_current_user),
) -> dict:
"""
获取情景记忆分页列表
返回指定用户的情景记忆列表,支持分页、时间范围筛选和情景类型筛选。
Args:
end_user_id: 终端用户ID必填
page: 页码从1开始默认1
pagesize: 每页数量默认10最大100
start_date: 开始时间戳(可选,毫秒),自动扩展到当天 00:00:00
end_date: 结束时间戳(可选,毫秒),自动扩展到当天 23:59:59
episodic_type: 情景类型筛选可选默认all
current_user: 当前用户
Returns:
ApiResponse: 包含情景记忆分页列表
Examples:
- 基础分页查询GET /episodics?end_user_id=xxx&page=1&pagesize=5
返回第1页每页5条数据
- 按时间范围筛选GET /episodics?end_user_id=xxx&page=1&pagesize=5&start_date=1738684800000&end_date=1738771199000
返回指定时间范围内的数据
- 按情景类型筛选GET /episodics?end_user_id=xxx&page=1&pagesize=5&episodic_type=important_event
返回类型为"重要事件"的数据
Notes:
- start_date 和 end_date 必须同时提供或同时不提供
- start_date 不能大于 end_date
- episodic_type 可选值all, conversation, project_work, learning, decision, important_event
- total 为该用户情景记忆总数(不受筛选条件影响)
- page.total 为筛选后的总条数
"""
workspace_id = current_user.current_workspace_id
# 检查用户是否已选择工作空间
if workspace_id is None:
api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆列表但未选择工作空间")
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
api_logger.info(
f"情景记忆分页查询: end_user_id={end_user_id}, "
f"start_date={start_date}, end_date={end_date}, episodic_type={episodic_type}, "
f"page={page}, pagesize={pagesize}, username={current_user.username}"
)
# 1. 参数校验
if page < 1 or pagesize < 1:
api_logger.warning(f"分页参数错误: page={page}, pagesize={pagesize}")
return fail(BizCode.INVALID_PARAMETER, "分页参数必须大于0")
valid_episodic_types = ["all", "conversation", "project_work", "learning", "decision", "important_event"]
if episodic_type not in valid_episodic_types:
api_logger.warning(f"无效的情景类型参数: {episodic_type}")
return fail(BizCode.INVALID_PARAMETER, f"无效的情景类型参数,可选值:{', '.join(valid_episodic_types)}")
# 时间戳参数校验
if (start_date is not None and end_date is None) or (end_date is not None and start_date is None):
return fail(BizCode.INVALID_PARAMETER, "start_date和end_date必须同时提供")
if start_date is not None and end_date is not None and start_date > end_date:
return fail(BizCode.INVALID_PARAMETER, "start_date不能大于end_date")
# 2. 执行查询
try:
result = await memory_explicit_service.get_episodic_memory_list(
end_user_id=end_user_id,
page=page,
pagesize=pagesize,
start_date=start_date,
end_date=end_date,
episodic_type=episodic_type,
)
api_logger.info(
f"情景记忆分页查询成功: end_user_id={end_user_id}, "
f"total={result['total']}, 返回={len(result['items'])}"
)
except Exception as e:
api_logger.error(f"情景记忆分页查询失败: end_user_id={end_user_id}, error={str(e)}")
return fail(BizCode.INTERNAL_ERROR, "情景记忆分页查询失败", str(e))
# 3. 返回结构化响应
return success(data=result, msg="查询成功")
@router.get("/semantics", response_model=ApiResponse)
async def get_semantic_memory_list_api(
end_user_id: str = Query(..., description="终端用户ID"),
current_user: User = Depends(get_current_user),
) -> dict:
"""
获取语义记忆列表
返回指定用户的全量语义记忆列表。
Args:
end_user_id: 终端用户ID必填
current_user: 当前用户
Returns:
ApiResponse: 包含语义记忆全量列表
"""
workspace_id = current_user.current_workspace_id
if workspace_id is None:
api_logger.warning(f"用户 {current_user.username} 尝试查询语义记忆列表但未选择工作空间")
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
api_logger.info(
f"语义记忆列表查询: end_user_id={end_user_id}, username={current_user.username}"
)
try:
result = await memory_explicit_service.get_semantic_memory_list(
end_user_id=end_user_id
)
api_logger.info(
f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(result)}"
)
except Exception as e:
api_logger.error(f"语义记忆列表查询失败: end_user_id={end_user_id}, error={str(e)}")
return fail(BizCode.INTERNAL_ERROR, "语义记忆列表查询失败", str(e))
return success(data=result, msg="查询成功")
@router.post("/details", response_model=ApiResponse) @router.post("/details", response_model=ApiResponse)
async def get_explicit_memory_details_api( async def get_explicit_memory_details_api(
request: ExplicitMemoryDetailsRequest, request: ExplicitMemoryDetailsRequest,

View File

@@ -14,6 +14,7 @@ from . import (
rag_api_document_controller, rag_api_document_controller,
rag_api_file_controller, rag_api_file_controller,
rag_api_knowledge_controller, rag_api_knowledge_controller,
user_memory_api_controller,
) )
# 创建 V1 API 路由器 # 创建 V1 API 路由器
@@ -28,5 +29,6 @@ service_router.include_router(rag_api_chunk_controller.router)
service_router.include_router(memory_api_controller.router) service_router.include_router(memory_api_controller.router)
service_router.include_router(end_user_api_controller.router) service_router.include_router(end_user_api_controller.router)
service_router.include_router(memory_config_api_controller.router) service_router.include_router(memory_config_api_controller.router)
service_router.include_router(user_memory_api_controller.router)
__all__ = ["service_router"] __all__ = ["service_router"]

View File

@@ -3,6 +3,7 @@
from fastapi import APIRouter, Body, Depends, Query, Request from fastapi import APIRouter, Body, Depends, Query, Request
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.celery_task_scheduler import scheduler
from app.core.api_key_auth import require_api_key from app.core.api_key_auth import require_api_key
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.core.quota_stub import check_end_user_quota from app.core.quota_stub import check_end_user_quota
@@ -86,7 +87,7 @@ async def write_memory(
user_rag_memory_id=payload.user_rag_memory_id, user_rag_memory_id=payload.user_rag_memory_id,
) )
logger.info(f"Memory write task submitted: task_id={result['task_id']}, end_user_id: {payload.end_user_id}") logger.info(f"Memory write task submitted: task_id: {result['task_id']} end_user_id: {payload.end_user_id}")
return success(data=MemoryWriteResponse(**result).model_dump(), msg="Memory write task submitted") return success(data=MemoryWriteResponse(**result).model_dump(), msg="Memory write task submitted")
@@ -105,8 +106,7 @@ async def get_write_task_status(
""" """
logger.info(f"Write task status check - task_id: {task_id}") logger.info(f"Write task status check - task_id: {task_id}")
from app.services.task_service import get_task_memory_write_result result = scheduler.get_task_status(task_id)
result = get_task_memory_write_result(task_id)
return success(data=_sanitize_task_result(result), msg="Task status retrieved") return success(data=_sanitize_task_result(result), msg="Task status retrieved")

View File

@@ -0,0 +1,230 @@
"""User Memory 服务接口 — 基于 API Key 认证
包装 user_memory_controllers.py 和 memory_agent_controller.py 中的内部接口,
提供基于 API Key 认证的对外服务:
1./analytics/graph_data - 知识图谱数据接口
2./analytics/community_graph - 社区图谱接口
3./analytics/node_statistics - 记忆节点统计接口
4./analytics/user_summary - 用户摘要接口
5./analytics/memory_insight - 记忆洞察接口
6./analytics/interest_distribution - 兴趣分布接口
7./analytics/end_user_info - 终端用户信息接口
8./analytics/generate_cache - 缓存生成接口
路由前缀: /memory
子路径: /analytics/...
最终路径: /v1/memory/analytics/...
认证方式: API Key (@require_api_key)
"""
from typing import Optional
from fastapi import APIRouter, Depends, Header, Query, Request, Body
from sqlalchemy.orm import Session
from app.core.api_key_auth import require_api_key
from app.core.api_key_utils import get_current_user_from_api_key, validate_end_user_in_workspace
from app.core.logging_config import get_business_logger
from app.db import get_db
from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.memory_storage_schema import GenerateCacheRequest
# 包装内部服务 controller
from app.controllers import user_memory_controllers, memory_agent_controller
router = APIRouter(prefix="/memory", tags=["V1 - User Memory API"])
logger = get_business_logger()
# ==================== 知识图谱 ====================
@router.get("/analytics/graph_data")
@require_api_key(scopes=["memory"])
async def get_graph_data(
request: Request,
end_user_id: str = Query(..., description="End user ID"),
node_types: Optional[str] = Query(None, description="Comma-separated node types filter"),
limit: int = Query(100, description="Max nodes to return (auto-capped at 1000 in service layer)"),
depth: int = Query(1, description="Graph traversal depth (auto-capped at 3 in service layer)"),
center_node_id: Optional[str] = Query(None, description="Center node for subgraph"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""Get knowledge graph data (nodes + edges) for an end user."""
current_user = get_current_user_from_api_key(db, api_key_auth)
validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id)
return await user_memory_controllers.get_graph_data_api(
end_user_id=end_user_id,
node_types=node_types,
limit=limit,
depth=depth,
center_node_id=center_node_id,
current_user=current_user,
db=db,
)
@router.get("/analytics/community_graph")
@require_api_key(scopes=["memory"])
async def get_community_graph(
request: Request,
end_user_id: str = Query(..., description="End user ID"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""Get community clustering graph for an end user."""
current_user = get_current_user_from_api_key(db, api_key_auth)
validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id)
return await user_memory_controllers.get_community_graph_data_api(
end_user_id=end_user_id,
current_user=current_user,
db=db,
)
# ==================== 节点统计 ====================
@router.get("/analytics/node_statistics")
@require_api_key(scopes=["memory"])
async def get_node_statistics(
request: Request,
end_user_id: str = Query(..., description="End user ID"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""Get memory node type statistics for an end user."""
current_user = get_current_user_from_api_key(db, api_key_auth)
validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id)
return await user_memory_controllers.get_node_statistics_api(
end_user_id=end_user_id,
current_user=current_user,
db=db,
)
# ==================== 用户摘要 & 洞察 ====================
@router.get("/analytics/user_summary")
@require_api_key(scopes=["memory"])
async def get_user_summary(
request: Request,
end_user_id: str = Query(..., description="End user ID"),
language_type: str = Header(default=None, alias="X-Language-Type"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""Get cached user summary for an end user."""
current_user = get_current_user_from_api_key(db, api_key_auth)
validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id)
return await user_memory_controllers.get_user_summary_api(
end_user_id=end_user_id,
language_type=language_type,
current_user=current_user,
db=db,
)
@router.get("/analytics/memory_insight")
@require_api_key(scopes=["memory"])
async def get_memory_insight(
request: Request,
end_user_id: str = Query(..., description="End user ID"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""Get cached memory insight report for an end user."""
current_user = get_current_user_from_api_key(db, api_key_auth)
validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id)
return await user_memory_controllers.get_memory_insight_report_api(
end_user_id=end_user_id,
current_user=current_user,
db=db,
)
# ==================== 兴趣分布 ====================
@router.get("/analytics/interest_distribution")
@require_api_key(scopes=["memory"])
async def get_interest_distribution(
request: Request,
end_user_id: str = Query(..., description="End user ID"),
limit: int = Query(5, le=5, description="Max interest tags to return"),
language_type: str = Header(default=None, alias="X-Language-Type"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""Get interest distribution tags for an end user."""
current_user = get_current_user_from_api_key(db, api_key_auth)
validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id)
return await memory_agent_controller.get_interest_distribution_by_user_api(
end_user_id=end_user_id,
limit=limit,
language_type=language_type,
current_user=current_user,
db=db,
)
# ==================== 终端用户信息 ====================
@router.get("/analytics/end_user_info")
@require_api_key(scopes=["memory"])
async def get_end_user_info(
request: Request,
end_user_id: str = Query(..., description="End user ID"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""Get end user basic information (name, aliases, metadata)."""
current_user = get_current_user_from_api_key(db, api_key_auth)
validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id)
return await user_memory_controllers.get_end_user_info(
end_user_id=end_user_id,
current_user=current_user,
db=db,
)
# ==================== 缓存生成 ====================
@router.post("/analytics/generate_cache")
@require_api_key(scopes=["memory"])
async def generate_cache(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(None, description="Request body"),
language_type: str = Header(default=None, alias="X-Language-Type"),
):
"""Trigger cache generation (user summary + memory insight) for an end user or all workspace users."""
body = await request.json()
cache_request = GenerateCacheRequest(**body)
current_user = get_current_user_from_api_key(db, api_key_auth)
if cache_request.end_user_id:
validate_end_user_in_workspace(db, cache_request.end_user_id, api_key_auth.workspace_id)
return await user_memory_controllers.generate_cache_api(
request=cache_request,
language_type=language_type,
current_user=current_user,
db=db,
)

View File

@@ -1,8 +1,15 @@
"""API Key 工具函数""" """API Key 工具函数"""
import secrets import secrets
import uuid as _uuid
from typing import Optional, Union from typing import Optional, Union
from datetime import datetime from datetime import datetime
from sqlalchemy.orm import Session as _Session
from app.core.error_codes import BizCode as _BizCode
from app.core.exceptions import BusinessException as _BusinessException
from app.models.end_user_model import EndUser as _EndUser
from app.repositories.end_user_repository import EndUserRepository as _EndUserRepository
from app.models.api_key_model import ApiKeyType from app.models.api_key_model import ApiKeyType
from fastapi import Response from fastapi import Response
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
@@ -65,3 +72,72 @@ def datetime_to_timestamp(dt: Optional[datetime]) -> Optional[int]:
return None return None
return int(dt.timestamp() * 1000) return int(dt.timestamp() * 1000)
def get_current_user_from_api_key(db: _Session, api_key_auth):
"""通过 API Key 构造 current_user 对象。
从 API Key 反查创建者(管理员用户),并设置其 workspace 上下文。
与内部接口的 Depends(get_current_user) (JWT) 等价。
Args:
db: 数据库会话
api_key_auth: API Key 认证信息ApiKeyAuth
Returns:
User ORM 对象,已设置 current_workspace_id
"""
from app.services import api_key_service
api_key = api_key_service.ApiKeyService.get_api_key(
db, api_key_auth.api_key_id, api_key_auth.workspace_id
)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return current_user
def validate_end_user_in_workspace(
db: _Session,
end_user_id: str,
workspace_id,
) -> _EndUser:
"""校验 end_user 是否存在且属于指定 workspace。
Args:
db: 数据库会话
end_user_id: 终端用户 ID
workspace_id: 工作空间 IDUUID 或字符串均可)
Returns:
EndUser ORM 对象(校验通过时)
Raises:
BusinessException(INVALID_PARAMETER): end_user_id 格式无效
BusinessException(USER_NOT_FOUND): end_user 不存在
BusinessException(PERMISSION_DENIED): end_user 不属于该 workspace
"""
try:
_uuid.UUID(end_user_id)
except (ValueError, AttributeError):
raise _BusinessException(
f"Invalid end_user_id format: {end_user_id}",
_BizCode.INVALID_PARAMETER,
)
end_user_repo = _EndUserRepository(db)
end_user = end_user_repo.get_end_user_by_id(end_user_id)
if end_user is None:
raise _BusinessException(
"End user not found",
_BizCode.USER_NOT_FOUND,
)
if str(end_user.workspace_id) != str(workspace_id):
raise _BusinessException(
"End user does not belong to this workspace",
_BizCode.PERMISSION_DENIED,
)
return end_user

View File

@@ -1,6 +1,7 @@
import json import json
import os import os
from app.celery_task_scheduler import scheduler
from app.core.logging_config import get_agent_logger from app.core.logging_config import get_agent_logger
from app.core.memory.agent.langgraph_graph.tools.write_tool import format_parsing, messages_parse from app.core.memory.agent.langgraph_graph.tools.write_tool import format_parsing, messages_parse
from app.core.memory.agent.models.write_aggregate_model import WriteAggregateModel from app.core.memory.agent.models.write_aggregate_model import WriteAggregateModel
@@ -12,8 +13,6 @@ from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context from app.db import get_db_context
from app.repositories.memory_short_repository import LongTermMemoryRepository from app.repositories.memory_short_repository import LongTermMemoryRepository
from app.schemas.memory_agent_schema import AgentMemory_Long_Term from app.schemas.memory_agent_schema import AgentMemory_Long_Term
from app.services.task_service import get_task_memory_write_result
from app.tasks import write_message_task
from app.utils.config_utils import resolve_config_id from app.utils.config_utils import resolve_config_id
logger = get_agent_logger(__name__) logger = get_agent_logger(__name__)
@@ -86,16 +85,28 @@ async def write(
logger.info( logger.info(
f"[WRITE] Submitting Celery task - user={actual_end_user_id}, messages={len(structured_messages)}, config={actual_config_id}") f"[WRITE] Submitting Celery task - user={actual_end_user_id}, messages={len(structured_messages)}, config={actual_config_id}")
write_id = write_message_task.delay( # write_id = write_message_task.delay(
actual_end_user_id, # end_user_id: User ID # actual_end_user_id, # end_user_id: User ID
structured_messages, # message: JSON string format message list # structured_messages, # message: JSON string format message list
str(actual_config_id), # config_id: Configuration ID string # str(actual_config_id), # config_id: Configuration ID string
storage_type, # storage_type: "neo4j" # storage_type, # storage_type: "neo4j"
user_rag_memory_id or "" # user_rag_memory_id: RAG memory ID (not used in Neo4j mode) # user_rag_memory_id or "" # user_rag_memory_id: RAG memory ID (not used in Neo4j mode)
# )
scheduler.push_task(
"app.core.memory.agent.write_message",
actual_end_user_id,
{
"end_user_id": actual_end_user_id,
"message": structured_messages,
"config_id": str(actual_config_id),
"storage_type": storage_type,
"user_rag_memory_id": user_rag_memory_id or ""
}
) )
logger.info(f"[WRITE] Celery task submitted - task_id={write_id}")
write_status = get_task_memory_write_result(str(write_id)) # logger.info(f"[WRITE] Celery task submitted - task_id={write_id}")
logger.info(f'[WRITE] Task result - user={actual_end_user_id}, status={write_status}') # write_status = get_task_memory_write_result(str(write_id))
# logger.info(f'[WRITE] Task result - user={actual_end_user_id}')
async def term_memory_save(end_user_id, strategy_type, scope): async def term_memory_save(end_user_id, strategy_type, scope):
@@ -164,13 +175,24 @@ async def window_dialogue(end_user_id, langchain_messages, memory_config, scope)
else: else:
config_id = memory_config config_id = memory_config
write_message_task.delay( scheduler.push_task(
end_user_id, # end_user_id: User ID "app.core.memory.agent.write_message",
redis_messages, # message: JSON string format message list end_user_id,
config_id, # config_id: Configuration ID string {
AgentMemory_Long_Term.STORAGE_NEO4J, # storage_type: "neo4j" "end_user_id": end_user_id,
"" # user_rag_memory_id: RAG memory ID (not used in Neo4j mode) "message": redis_messages,
"config_id": config_id,
"storage_type": AgentMemory_Long_Term.STORAGE_NEO4J,
"user_rag_memory_id": ""
}
) )
# write_message_task.delay(
# end_user_id, # end_user_id: User ID
# redis_messages, # message: JSON string format message list
# config_id, # config_id: Configuration ID string
# AgentMemory_Long_Term.STORAGE_NEO4J, # storage_type: "neo4j"
# "" # user_rag_memory_id: RAG memory ID (not used in Neo4j mode)
# )
count_store.update_sessions_count(end_user_id, 0, []) count_store.update_sessions_count(end_user_id, 0, [])

View File

@@ -1,8 +1,8 @@
from app.core.memory.enums import SearchStrategy, StorageType from app.core.memory.enums import SearchStrategy, StorageType
from app.core.memory.models.service_models import MemorySearchResult from app.core.memory.models.service_models import MemorySearchResult
from app.core.memory.pipelines.base_pipeline import ModelClientMixin, DBRequiredPipeline from app.core.memory.pipelines.base_pipeline import ModelClientMixin, DBRequiredPipeline
from app.core.memory.read_services.content_search import Neo4jSearchService, RAGSearchService from app.core.memory.read_services.search_engine.content_search import Neo4jSearchService, RAGSearchService
from app.core.memory.read_services.query_preprocessor import QueryPreprocessor from app.core.memory.read_services.generate_engine.query_preprocessor import QueryPreprocessor
class ReadPipeLine(ModelClientMixin, DBRequiredPipeline): class ReadPipeLine(ModelClientMixin, DBRequiredPipeline):

View File

@@ -8,4 +8,4 @@ class RetrievalSummaryProcessor:
@staticmethod @staticmethod
def verify(content: str, llm_client: RedBearLLM): def verify(content: str, llm_client: RedBearLLM):
return return

View File

@@ -8,7 +8,7 @@ from neo4j import Session
from app.core.memory.enums import Neo4jNodeType from app.core.memory.enums import Neo4jNodeType
from app.core.memory.memory_service import MemoryContext from app.core.memory.memory_service import MemoryContext
from app.core.memory.models.service_models import Memory, MemorySearchResult from app.core.memory.models.service_models import Memory, MemorySearchResult
from app.core.memory.read_services.result_builder import data_builder_factory from app.core.memory.read_services.search_engine.result_builder import data_builder_factory
from app.core.models import RedBearEmbeddings from app.core.models import RedBearEmbeddings
from app.core.rag.nlp.search import knowledge_retrieval from app.core.rag.nlp.search import knowledge_retrieval
from app.repositories import knowledge_repository from app.repositories import knowledge_repository

View File

@@ -73,6 +73,7 @@ class CustomTool(BaseTool):
# 添加通用参数(基于第一个操作的参数) # 添加通用参数(基于第一个操作的参数)
if self._parsed_operations: if self._parsed_operations:
first_operation = next(iter(self._parsed_operations.values())) first_operation = next(iter(self._parsed_operations.values()))
# path/query 参数
for param_name, param_info in first_operation.get("parameters", {}).items(): for param_name, param_info in first_operation.get("parameters", {}).items():
params.append(ToolParameter( params.append(ToolParameter(
name=param_name, name=param_name,
@@ -85,6 +86,23 @@ class CustomTool(BaseTool):
maximum=param_info.get("maximum"), maximum=param_info.get("maximum"),
pattern=param_info.get("pattern") pattern=param_info.get("pattern")
)) ))
# requestBody 参数 — 将 body 字段平铺为独立参数暴露给模型
request_body = first_operation.get("request_body")
if request_body:
body_schema = request_body.get("properties", {})
required_fields = request_body.get("required", [])
for prop_name, prop_schema in body_schema.items():
params.append(ToolParameter(
name=prop_name,
type=self._convert_openapi_type(prop_schema.get("type", "string")),
description=prop_schema.get("description", ""),
required=prop_name in required_fields,
default=prop_schema.get("default"),
enum=prop_schema.get("enum"),
minimum=prop_schema.get("minimum"),
maximum=prop_schema.get("maximum"),
pattern=prop_schema.get("pattern")
))
return params return params

View File

@@ -180,6 +180,9 @@ class IterationRuntime:
"cycle_id": self.node_id, "cycle_id": self.node_id,
"cycle_idx": idx, "cycle_idx": idx,
"node_id": node_name, "node_id": node_name,
"node_type": node_type,
"node_name": node_cfg.get("data", {}).get("label") if node_cfg else node_name,
"status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"),
"input": result.get("node_outputs", {}).get(node_name, {}).get("input") "input": result.get("node_outputs", {}).get(node_name, {}).get("input")
if not cycle_variable else cycle_variable, if not cycle_variable else cycle_variable,
"output": result.get("node_outputs", {}).get(node_name, {}).get("output") "output": result.get("node_outputs", {}).get(node_name, {}).get("output")

View File

@@ -210,6 +210,9 @@ class LoopRuntime:
"cycle_id": self.node_id, "cycle_id": self.node_id,
"cycle_idx": idx, "cycle_idx": idx,
"node_id": node_name, "node_id": node_name,
"node_type": node_type,
"node_name": node_name,
"status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"),
"input": result.get("node_outputs", {}).get(node_name, {}).get("input") "input": result.get("node_outputs", {}).get(node_name, {}).get("input")
if not cycle_variable else cycle_variable, if not cycle_variable else cycle_variable,
"output": result.get("node_outputs", {}).get(node_name, {}).get("output") "output": result.get("node_outputs", {}).get(node_name, {}).get("output")

View File

@@ -1,6 +1,7 @@
import re import re
from typing import Any from typing import Any
from app.celery_task_scheduler import scheduler
from app.core.memory.enums import SearchStrategy from app.core.memory.enums import SearchStrategy
from app.core.memory.memory_service import MemoryService from app.core.memory.memory_service import MemoryService
from app.core.workflow.engine.state_manager import WorkflowState from app.core.workflow.engine.state_manager import WorkflowState
@@ -11,7 +12,6 @@ from app.core.workflow.variable.base_variable import VariableType
from app.core.workflow.variable.variable_objects import FileVariable, ArrayVariable from app.core.workflow.variable.variable_objects import FileVariable, ArrayVariable
from app.db import get_db_read from app.db import get_db_read
from app.schemas import FileInput from app.schemas import FileInput
from app.tasks import write_message_task
class MemoryReadNode(BaseNode): class MemoryReadNode(BaseNode):
@@ -126,12 +126,23 @@ class MemoryWriteNode(BaseNode):
"files": file_info "files": file_info
}) })
write_message_task.delay( scheduler.push_task(
end_user_id=end_user_id, "app.core.memory.agent.write_message",
message=messages, end_user_id,
config_id=str(self.typed_config.config_id), {
storage_type=state["memory_storage_type"], "end_user_id": end_user_id,
user_rag_memory_id=state["user_rag_memory_id"] "message": messages,
"config_id": str(self.typed_config.config_id),
"storage_type": state["memory_storage_type"],
"user_rag_memory_id": state["user_rag_memory_id"]
}
) )
# write_message_task.delay(
# end_user_id=end_user_id,
# message=messages,
# config_id=str(self.typed_config.config_id),
# storage_type=state["memory_storage_type"],
# user_rag_memory_id=state["user_rag_memory_id"]
# )
return "success" return "success"

View File

@@ -14,6 +14,7 @@ class AppLogMessage(BaseModel):
conversation_id: uuid.UUID conversation_id: uuid.UUID
role: str = Field(description="角色: user / assistant / system") role: str = Field(description="角色: user / assistant / system")
content: str content: str
status: Optional[str] = Field(default=None, description="执行状态(工作流专用): completed / failed")
meta_data: Optional[Dict[str, Any]] = None meta_data: Optional[Dict[str, Any]] = None
created_at: datetime.datetime created_at: datetime.datetime
@@ -58,6 +59,7 @@ class AppLogNodeExecution(BaseModel):
input: Optional[Any] = None input: Optional[Any] = None
process: Optional[Any] = None process: Optional[Any] = None
output: Optional[Any] = None output: Optional[Any] = None
cycle_items: Optional[List[Any]] = None
elapsed_time: Optional[float] = None elapsed_time: Optional[float] = None
token_usage: Optional[Dict[str, Any]] = None token_usage: Optional[Dict[str, Any]] = None

View File

@@ -112,12 +112,12 @@ class MemoryWriteResponse(BaseModel):
"""Response schema for memory write operation. """Response schema for memory write operation.
Attributes: Attributes:
task_id: Celery task ID for status polling task_id: task ID for status polling
status: Initial task status (PENDING) status: Initial task status (QUEUED)
end_user_id: End user ID the write was submitted for end_user_id: End user ID the write was submitted for
""" """
task_id: str = Field(..., description="Celery task ID for polling") task_id: str = Field(..., description="task ID for polling")
status: str = Field(..., description="Task status: PENDING") status: str = Field(..., description="Task status: QUEUED")
end_user_id: str = Field(..., description="End user ID") end_user_id: str = Field(..., description="End user ID")

View File

@@ -65,6 +65,11 @@ class ApiKeyService:
BizCode.BAD_REQUEST BizCode.BAD_REQUEST
) )
if data.resource_id:
app = db.get(App, data.resource_id)
if not app or not app.current_release_id:
raise BusinessException("该应用未发布", BizCode.APP_NOT_PUBLISHED)
# 生成 API Key # 生成 API Key
api_key = generate_api_key(data.type) api_key = generate_api_key(data.type)

View File

@@ -1,16 +1,17 @@
"""应用日志服务层""" """应用日志服务层"""
import uuid import uuid
import datetime as dt
from typing import Optional, Tuple from typing import Optional, Tuple
from datetime import datetime
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.models.app_model import AppType
from app.models.conversation_model import Conversation, Message from app.models.conversation_model import Conversation, Message
from app.models.workflow_model import WorkflowExecution from app.models.workflow_model import WorkflowExecution
from app.repositories.conversation_repository import ConversationRepository, MessageRepository from app.repositories.conversation_repository import ConversationRepository, MessageRepository
from app.schemas.app_log_schema import AppLogNodeExecution from app.schemas.app_log_schema import AppLogMessage, AppLogNodeExecution
logger = get_business_logger() logger = get_business_logger()
@@ -83,51 +84,40 @@ class AppLogService:
self, self,
app_id: uuid.UUID, app_id: uuid.UUID,
conversation_id: uuid.UUID, conversation_id: uuid.UUID,
workspace_id: uuid.UUID workspace_id: uuid.UUID,
) -> Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: app_type: str = AppType.AGENT
) -> Tuple[Conversation, list, dict[str, list[AppLogNodeExecution]]]:
""" """
查询会话详情(包含消息和工作流节点执行记录) 查询会话详情
Args:
app_id: 应用 ID
conversation_id: 会话 ID
workspace_id: 工作空间 ID
Returns: Returns:
Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: Tuple[Conversation, list[AppLogMessage|Message], dict[str, list[AppLogNodeExecution]]]
(包含消息的会话对象, 按消息ID分组的节点执行记录)
Raises:
ResourceNotFoundException: 当会话不存在时
""" """
logger.info( logger.info(
"查询应用日志会话详情", "查询应用日志会话详情",
extra={ extra={
"app_id": str(app_id), "app_id": str(app_id),
"conversation_id": str(conversation_id), "conversation_id": str(conversation_id),
"workspace_id": str(workspace_id) "workspace_id": str(workspace_id),
"app_type": app_type
} }
) )
# 查询会话
conversation = self.conversation_repository.get_conversation_for_app_log( conversation = self.conversation_repository.get_conversation_for_app_log(
conversation_id=conversation_id, conversation_id=conversation_id,
app_id=app_id, app_id=app_id,
workspace_id=workspace_id workspace_id=workspace_id
) )
# 查询消息(按时间正序) if app_type == AppType.WORKFLOW:
messages = self.message_repository.get_messages_by_conversation( messages, node_executions_map = self._get_workflow_messages_and_nodes(conversation_id)
conversation_id=conversation_id else:
) messages = self.message_repository.get_messages_by_conversation(
conversation_id=conversation_id
# 将消息附加到会话对象 )
conversation.messages = messages node_executions_map = self._get_workflow_node_executions_with_map(
conversation_id, messages
# 查询工作流节点执行记录(按消息分组) )
_, node_executions_map = self._get_workflow_node_executions_with_map(
conversation_id, messages
)
logger.info( logger.info(
"查询应用日志会话详情成功", "查询应用日志会话详情成功",
@@ -139,13 +129,129 @@ class AppLogService:
} }
) )
return conversation, node_executions_map return conversation, messages, node_executions_map
def _get_workflow_messages_and_nodes(
self,
conversation_id: uuid.UUID,
) -> Tuple[list[AppLogMessage], dict[str, list[AppLogNodeExecution]]]:
"""
工作流应用专用:从 workflow_executions 构建 messages 和节点日志。
每条 WorkflowExecution 对应一轮对话:
- user message来自 execution.input_datacontent 取 message 字段files 放 meta_data
- assistant message来自 execution.output_data失败时内容为错误信息
开场白的 suggested_questions 合并到第一条 assistant message 的 meta_data 里。
Returns:
(messages 列表, node_executions_map)
"""
stmt = (
select(WorkflowExecution)
.where(
WorkflowExecution.conversation_id == conversation_id,
WorkflowExecution.status.in_(["completed", "failed"])
)
.order_by(WorkflowExecution.started_at.asc())
)
executions = list(self.db.scalars(stmt).all())
# 查开场白Message 表里 meta_data 含 suggested_questions 的第一条 assistant 消息
opening_stmt = (
select(Message)
.where(
Message.conversation_id == conversation_id,
Message.role == "assistant",
)
.order_by(Message.created_at.asc())
.limit(10)
)
early_messages = list(self.db.scalars(opening_stmt).all())
suggested_questions: list = []
for m in early_messages:
if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data:
suggested_questions = m.meta_data.get("suggested_questions") or []
break
messages: list[AppLogMessage] = []
node_executions_map: dict[str, list[AppLogNodeExecution]] = {}
# 如果有开场白,作为第一条 assistant 消息插入
if suggested_questions or early_messages:
opening_msg = next(
(m for m in early_messages
if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data),
None
)
if opening_msg:
messages.append(AppLogMessage(
id=opening_msg.id,
conversation_id=conversation_id,
role="assistant",
content=opening_msg.content,
status=None,
meta_data={"suggested_questions": suggested_questions},
created_at=opening_msg.created_at,
))
for execution in executions:
started_at = execution.started_at or dt.datetime.now()
completed_at = execution.completed_at or started_at
# assistant message 的 id同时作为 node_executions_map 的 key
assistant_msg_id = uuid.uuid5(execution.id, "assistant")
# --- user message输入---
input_data = execution.input_data or {}
input_content = input_data.get("message") or _extract_text(input_data)
# 跳过没有用户输入的 execution如开场白触发的记录
if not input_content or not input_content.strip():
continue
files = input_data.get("files") or []
user_msg = AppLogMessage(
id=uuid.uuid5(execution.id, "user"),
conversation_id=conversation_id,
role="user",
content=input_content,
meta_data={"files": files} if files else None,
created_at=started_at,
)
messages.append(user_msg)
# --- assistant message输出---
if execution.status == "completed":
output_content = _extract_text(execution.output_data)
meta = {"usage": execution.token_usage or {}, "elapsed_time": execution.elapsed_time}
else:
output_content = _extract_text(execution.output_data) or ""
meta = {"error": execution.error_message, "error_node_id": execution.error_node_id}
assistant_msg = AppLogMessage(
id=assistant_msg_id,
conversation_id=conversation_id,
role="assistant",
content=output_content,
status=execution.status,
meta_data=meta,
created_at=completed_at,
)
messages.append(assistant_msg)
# --- 节点执行记录,从 workflow_executions.output_data["node_outputs"] 读取 ---
execution_nodes = _build_nodes_from_output_data(execution.output_data)
if execution_nodes:
node_executions_map[str(assistant_msg_id)] = execution_nodes
return messages, node_executions_map
def _get_workflow_node_executions_with_map( def _get_workflow_node_executions_with_map(
self, self,
conversation_id: uuid.UUID, conversation_id: uuid.UUID,
messages: list[Message] messages: list[Message]
) -> Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: ) -> dict[str, list[AppLogNodeExecution]]:
""" """
从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组 从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组
@@ -157,13 +263,12 @@ class AppLogService:
Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]:
(所有节点执行记录列表, 按 message_id 分组的节点执行记录字典) (所有节点执行记录列表, 按 message_id 分组的节点执行记录字典)
""" """
node_executions = []
node_executions_map: dict[str, list[AppLogNodeExecution]] = {} node_executions_map: dict[str, list[AppLogNodeExecution]] = {}
# 查询该会话关联的所有工作流执行记录(按时间正序) # 查询该会话关联的所有工作流执行记录(按时间正序)
stmt = select(WorkflowExecution).where( stmt = select(WorkflowExecution).where(
WorkflowExecution.conversation_id == conversation_id, WorkflowExecution.conversation_id == conversation_id,
WorkflowExecution.status == "completed" WorkflowExecution.status.in_(["completed", "failed"])
).order_by(WorkflowExecution.started_at.asc()) ).order_by(WorkflowExecution.started_at.asc())
executions = self.db.scalars(stmt).all() executions = self.db.scalars(stmt).all()
@@ -188,10 +293,18 @@ class AppLogService:
used_message_ids: set[str] = set() used_message_ids: set[str] = set()
for execution in executions: for execution in executions:
if not execution.output_data: # 构建节点执行记录列表,从 workflow_executions.output_data["node_outputs"] 读取
execution_nodes = _build_nodes_from_output_data(execution.output_data)
if not execution_nodes:
continue continue
# 找到该 execution 对应的 assistant message # 失败的执行没有 assistant message,直接用 execution id 作为 key
if execution.status == "failed":
node_executions_map[f"execution_{str(execution.id)}"] = execution_nodes
continue
# completed通过时序匹配关联到对应的 assistant message
# 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message # 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message
best_msg = None best_msg = None
best_dt = None best_dt = None
@@ -200,9 +313,9 @@ class AppLogService:
if msg_id_str in used_message_ids: if msg_id_str in used_message_ids:
continue continue
if msg.created_at and msg.created_at >= execution.started_at: if msg.created_at and msg.created_at >= execution.started_at:
dt = (msg.created_at - execution.started_at).total_seconds() delta = (msg.created_at - execution.started_at).total_seconds()
if best_dt is None or dt < best_dt: if best_dt is None or delta < best_dt:
best_dt = dt best_dt = delta
best_msg = msg best_msg = msg
if not best_msg: if not best_msg:
@@ -210,31 +323,76 @@ class AppLogService:
msg_id_str = str(best_msg.id) msg_id_str = str(best_msg.id)
used_message_ids.add(msg_id_str) used_message_ids.add(msg_id_str)
node_executions_map[msg_id_str] = execution_nodes
# 提取节点输出 return node_executions_map
output_data = execution.output_data
if isinstance(output_data, dict):
node_outputs = output_data.get("node_outputs", {})
execution_nodes = []
for node_id, node_data in node_outputs.items():
if not isinstance(node_data, dict):
continue
node_execution = AppLogNodeExecution(
node_id=node_data.get("node_id", node_id),
node_type=node_data.get("node_type", "unknown"),
node_name=node_data.get("node_name"),
status=node_data.get("status", "unknown"),
error=node_data.get("error"),
input=node_data.get("input"),
process=node_data.get("process"),
output=node_data.get("output"),
elapsed_time=node_data.get("elapsed_time"),
token_usage=node_data.get("token_usage"),
)
node_executions.append(node_execution)
execution_nodes.append(node_execution)
# 将节点记录关联到 message_id
node_executions_map[msg_id_str] = execution_nodes
return node_executions, node_executions_map def _extract_text(data: Optional[dict]) -> str:
"""从 workflow execution 的 input_data / output_data 中提取可读文本。
优先取 'text''content''output' 字段;若都没有则 JSON 序列化整个 dict。
"""
if not data:
return ""
for key in ("message", "text", "content", "output", "result", "answer"):
if key in data and isinstance(data[key], str):
return data[key]
import json
return json.dumps(data, ensure_ascii=False)
def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNodeExecution]:
"""从 workflow_executions.output_data["node_outputs"] 构建节点执行记录列表。
output_data 结构:
{
"node_outputs": {
"<node_id>": {
"node_type": ...,
"node_name": ...,
"status": ...,
"input": ...,
"output": ...,
"elapsed_time": ...,
"token_usage": ...,
"error": ...,
"cycle_items": [...],
...
}
},
"error": ...,
...
}
"""
if not output_data:
return []
node_outputs: dict = output_data.get("node_outputs") or {}
result = []
for node_id, node_data in node_outputs.items():
if not isinstance(node_data, dict):
continue
output = dict(node_data)
cycle_items = output.pop("cycle_items", None)
# 把已知的顶层字段剥离,剩余的作为 output
node_type = output.pop("node_type", "unknown")
node_name = output.pop("node_name", None)
status = output.pop("status", "completed")
error = output.pop("error", None)
inp = output.pop("input", None)
elapsed_time = output.pop("elapsed_time", None)
token_usage = output.pop("token_usage", None)
result.append(AppLogNodeExecution(
node_id=node_id,
node_type=node_type,
node_name=node_name,
status=status,
error=error,
input=inp,
process=None,
output=output if output else None,
cycle_items=cycle_items,
elapsed_time=elapsed_time,
token_usage=token_usage,
))
return result

View File

@@ -10,6 +10,7 @@ from typing import Any, Dict, Optional
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.celery_task_scheduler import scheduler
from app.core.error_codes import BizCode from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException, ResourceNotFoundException from app.core.exceptions import BusinessException, ResourceNotFoundException
from app.core.logging_config import get_logger from app.core.logging_config import get_logger
@@ -166,20 +167,31 @@ class MemoryAPIService:
# Convert to message list format expected by write_message_task # Convert to message list format expected by write_message_task
messages = message if isinstance(message, list) else [{"role": "user", "content": message}] messages = message if isinstance(message, list) else [{"role": "user", "content": message}]
from app.tasks import write_message_task # from app.tasks import write_message_task
task = write_message_task.delay( # task = write_message_task.delay(
# end_user_id,
# messages,
# config_id,
# storage_type,
# user_rag_memory_id or "",
# )
task_id = scheduler.push_task(
"app.core.memory.agent.write_message",
end_user_id, end_user_id,
messages, {
config_id, "end_user_id": end_user_id,
storage_type, "message": messages,
user_rag_memory_id or "", "config_id": config_id,
"storage_type": storage_type,
"user_rag_memory_id": user_rag_memory_id or ""
}
) )
logger.info(f"Memory write task submitted: task_id={task.id}, end_user_id={end_user_id}") logger.info(f"Memory write task submitted, task_id={task_id} end_user_id={end_user_id}")
return { return {
"task_id": task.id, "task_id": task_id,
"status": "PENDING", "status": "QUEUED",
"end_user_id": end_user_id, "end_user_id": end_user_id,
} }

View File

@@ -4,7 +4,7 @@
处理显性记忆相关的业务逻辑,包括情景记忆和语义记忆的查询。 处理显性记忆相关的业务逻辑,包括情景记忆和语义记忆的查询。
""" """
from typing import Any, Dict from typing import Any, Dict, Optional
from app.core.logging_config import get_logger from app.core.logging_config import get_logger
from app.services.memory_base_service import MemoryBaseService from app.services.memory_base_service import MemoryBaseService
@@ -104,7 +104,7 @@ class MemoryExplicitService(MemoryBaseService):
e.description AS core_definition e.description AS core_definition
ORDER BY e.name ASC ORDER BY e.name ASC
""" """
semantic_result = await self.neo4j_connector.execute_query( semantic_result = await self.neo4j_connector.execute_query(
semantic_query, semantic_query,
end_user_id=end_user_id end_user_id=end_user_id
@@ -146,6 +146,209 @@ class MemoryExplicitService(MemoryBaseService):
logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True) logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True)
raise raise
async def get_episodic_memory_list(
self,
end_user_id: str,
page: int,
pagesize: int,
start_date: Optional[int] = None,
end_date: Optional[int] = None,
episodic_type: str = "all",
) -> Dict[str, Any]:
"""
获取情景记忆分页列表
Args:
end_user_id: 终端用户ID
page: 页码
pagesize: 每页数量
start_date: 开始时间戳(毫秒),可选
end_date: 结束时间戳(毫秒),可选
episodic_type: 情景类型筛选
Returns:
{
"total": int, # 该用户情景记忆总数(不受筛选影响)
"items": [...], # 当前页数据
"page": {
"page": int,
"pagesize": int,
"total": int, # 筛选后总数
"hasnext": bool
}
}
"""
try:
logger.info(
f"情景记忆分页查询: end_user_id={end_user_id}, "
f"start_date={start_date}, end_date={end_date}, "
f"episodic_type={episodic_type}, page={page}, pagesize={pagesize}"
)
# 1. 查询情景记忆总数(不受筛选条件限制)
total_all_query = """
MATCH (s:MemorySummary)
WHERE s.end_user_id = $end_user_id
RETURN count(s) AS total
"""
total_all_result = await self.neo4j_connector.execute_query(
total_all_query, end_user_id=end_user_id
)
total_all = total_all_result[0]["total"] if total_all_result else 0
# 2. 构建筛选条件
where_clauses = ["s.end_user_id = $end_user_id"]
params = {"end_user_id": end_user_id}
# 时间戳筛选(毫秒时间戳转为 UTC ISO 字符串,使用 Neo4j datetime() 精确比较)
if start_date is not None and end_date is not None:
from datetime import datetime, timezone
start_dt = datetime.fromtimestamp(start_date / 1000, tz=timezone.utc)
end_dt = datetime.fromtimestamp(end_date / 1000, tz=timezone.utc)
# 开始时间取当天 UTC 00:00:00结束时间取当天 UTC 23:59:59.999999
start_iso = start_dt.strftime("%Y-%m-%dT") + "00:00:00.000000"
end_iso = end_dt.strftime("%Y-%m-%dT") + "23:59:59.999999"
where_clauses.append("datetime(s.created_at) >= datetime($start_iso) AND datetime(s.created_at) <= datetime($end_iso)")
params["start_iso"] = start_iso
params["end_iso"] = end_iso
# 类型筛选下推到 Cypher兼容中英文
if episodic_type != "all":
type_mapping = {
"conversation": "对话",
"project_work": "项目/工作",
"learning": "学习",
"decision": "决策",
"important_event": "重要事件"
}
chinese_type = type_mapping.get(episodic_type)
if chinese_type:
where_clauses.append(
"(s.memory_type = $episodic_type OR s.memory_type = $chinese_type)"
)
params["episodic_type"] = episodic_type
params["chinese_type"] = chinese_type
else:
where_clauses.append("s.memory_type = $episodic_type")
params["episodic_type"] = episodic_type
where_str = " AND ".join(where_clauses)
# 3. 查询筛选后的总数
count_query = f"""
MATCH (s:MemorySummary)
WHERE {where_str}
RETURN count(s) AS total
"""
count_result = await self.neo4j_connector.execute_query(count_query, **params)
filtered_total = count_result[0]["total"] if count_result else 0
# 4. 查询分页数据
skip = (page - 1) * pagesize
data_query = f"""
MATCH (s:MemorySummary)
WHERE {where_str}
RETURN elementId(s) AS id,
s.name AS title,
s.memory_type AS memory_type,
s.content AS content,
s.created_at AS created_at
ORDER BY s.created_at DESC
SKIP $skip LIMIT $limit
"""
params["skip"] = skip
params["limit"] = pagesize
result = await self.neo4j_connector.execute_query(data_query, **params)
# 5. 处理结果
items = []
if result:
for record in result:
raw_created_at = record.get("created_at")
created_at_timestamp = self.parse_timestamp(raw_created_at)
items.append({
"id": record["id"],
"title": record.get("title") or "未命名",
"memory_type": record.get("memory_type") or "其他",
"content": record.get("content") or "",
"created_at": created_at_timestamp
})
# 6. 构建返回结果
return {
"total": total_all,
"items": items,
"page": {
"page": page,
"pagesize": pagesize,
"total": filtered_total,
"hasnext": (page * pagesize) < filtered_total
}
}
except Exception as e:
logger.error(f"情景记忆分页查询出错: {str(e)}", exc_info=True)
raise
async def get_semantic_memory_list(
self,
end_user_id: str
) -> list:
"""
获取语义记忆全量列表
Args:
end_user_id: 终端用户ID
Returns:
[
{
"id": str,
"name": str,
"entity_type": str,
"core_definition": str
}
]
"""
try:
logger.info(f"语义记忆列表查询: end_user_id={end_user_id}")
semantic_query = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id
AND e.is_explicit_memory = true
RETURN elementId(e) AS id,
e.name AS name,
e.entity_type AS entity_type,
e.description AS core_definition
ORDER BY e.name ASC
"""
result = await self.neo4j_connector.execute_query(
semantic_query, end_user_id=end_user_id
)
items = []
if result:
for record in result:
items.append({
"id": record["id"],
"name": record.get("name") or "未命名",
"entity_type": record.get("entity_type") or "未分类",
"core_definition": record.get("core_definition") or ""
})
logger.info(f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(items)}")
return items
except Exception as e:
logger.error(f"语义记忆列表查询出错: {str(e)}", exc_info=True)
raise
async def get_explicit_memory_details( async def get_explicit_memory_details(
self, self,
end_user_id: str, end_user_id: str,

View File

@@ -815,11 +815,12 @@ class ToolService:
"default": param_info.get("default") "default": param_info.get("default")
}) })
# 请求体参数 # 请求体参数 — _extract_request_body 返回 {"schema": {...}, "required": bool, ...}
request_body = operation.get("request_body") request_body = operation.get("request_body")
if request_body: if request_body:
schema_props = request_body.get("schema", {}).get("properties", {}) body_schema = request_body.get("schema", {})
required_props = request_body.get("schema", {}).get("required", []) schema_props = body_schema.get("properties", {})
required_props = body_schema.get("required", [])
for prop_name, prop_schema in schema_props.items(): for prop_name, prop_schema in schema_props.items():
parameters.append({ parameters.append({

View File

@@ -17,8 +17,9 @@ from app.core.workflow.executor import execute_workflow, execute_workflow_stream
from app.core.workflow.nodes.enums import NodeType from app.core.workflow.nodes.enums import NodeType
from app.core.workflow.validator import validate_workflow_config from app.core.workflow.validator import validate_workflow_config
from app.db import get_db from app.db import get_db
from sqlalchemy import select
from app.models import App from app.models import App
from app.models.workflow_model import WorkflowConfig, WorkflowExecution from app.models.workflow_model import WorkflowConfig, WorkflowExecution, WorkflowNodeExecution
from app.repositories import knowledge_repository from app.repositories import knowledge_repository
from app.repositories.workflow_repository import ( from app.repositories.workflow_repository import (
WorkflowConfigRepository, WorkflowConfigRepository,
@@ -918,6 +919,7 @@ class WorkflowService:
input_data["conv_messages"] = conv_messages input_data["conv_messages"] = conv_messages
init_message_length = len(input_data.get("conv_messages", [])) init_message_length = len(input_data.get("conv_messages", []))
message_id = uuid.uuid4() message_id = uuid.uuid4()
_cycle_items: dict[str, list] = {}
# 新会话时写入开场白 # 新会话时写入开场白
is_new_conversation = init_message_length == 0 is_new_conversation = init_message_length == 0
@@ -948,6 +950,15 @@ class WorkflowService:
memory_storage_type=storage_type, memory_storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id user_rag_memory_id=user_rag_memory_id
): ):
event_type = event.get("event")
event_data = event.get("data", {})
if event_type == "cycle_item":
cycle_id = event_data.get("cycle_id")
if cycle_id not in _cycle_items:
_cycle_items[cycle_id] = []
_cycle_items[cycle_id].append(event_data)
if event.get("event") == "workflow_end": if event.get("event") == "workflow_end":
status = event.get("data", {}).get("status") status = event.get("data", {}).get("status")
token_usage = event.get("data", {}).get("token_usage", {}) or {} token_usage = event.get("data", {}).get("token_usage", {}) or {}
@@ -1019,6 +1030,18 @@ class WorkflowService:
) )
else: else:
logger.error(f"unexpect workflow run status, status: {status}") logger.error(f"unexpect workflow run status, status: {status}")
# 把积累的 cycle_item 写入 workflow_executions.output_data["node_outputs"]
if _cycle_items and execution.output_data:
import copy
new_output_data = copy.deepcopy(execution.output_data)
node_outputs = new_output_data.setdefault("node_outputs", {})
for cycle_node_id, items in _cycle_items.items():
if cycle_node_id in node_outputs:
node_outputs[cycle_node_id]["cycle_items"] = items
else:
node_outputs[cycle_node_id] = {"cycle_items": items}
execution.output_data = new_output_data
self.db.commit()
elif event.get("event") == "workflow_start": elif event.get("event") == "workflow_start":
event["data"]["message_id"] = str(message_id) event["data"]["message_id"] = str(message_id)
event = self._emit(public, event) event = self._emit(public, event)

View File

@@ -34,7 +34,7 @@ from app.core.rag.prompts.generator import question_proposal
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import ( from app.core.rag.vdb.elasticsearch.elasticsearch_vector import (
ElasticSearchVectorFactory, ElasticSearchVectorFactory,
) )
from app.db import get_db, get_db_context from app.db import get_db_context
from app.models import Document, File, Knowledge from app.models import Document, File, Knowledge
from app.models.end_user_model import EndUser from app.models.end_user_model import EndUser
from app.schemas import document_schema, file_schema from app.schemas import document_schema, file_schema
@@ -2025,7 +2025,7 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
end_users = db.query(EndUser).all() end_users = db.query(EndUser).all()
if not end_users: if not end_users:
logger.info("没有终端用户,跳过遗忘周期") logger.info("没有终端用户,跳过遗忘周期")
return {"status": "SUCCESS", "message": "没有终端用户", return {"status": "SUCCESS", "message": "没有终端用户",
"report": {"merged_count": 0, "failed_count": 0, "processed_users": 0}, "report": {"merged_count": 0, "failed_count": 0, "processed_users": 0},
"duration_seconds": time.time() - start_time} "duration_seconds": time.time() - start_time}
@@ -2039,7 +2039,7 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
# 获取用户配置(自动回退到工作空间默认配置) # 获取用户配置(自动回退到工作空间默认配置)
connected_config = get_end_user_connected_config(str(end_user.id), db) connected_config = get_end_user_connected_config(str(end_user.id), db)
user_config_id = resolve_config_id(connected_config.get("memory_config_id"), db) user_config_id = resolve_config_id(connected_config.get("memory_config_id"), db)
if not user_config_id: if not user_config_id:
failed_users.append({"end_user_id": str(end_user.id), "error": "无法获取配置"}) failed_users.append({"end_user_id": str(end_user.id), "error": "无法获取配置"})
continue continue
@@ -2048,13 +2048,13 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
report = await forget_service.trigger_forgetting_cycle( report = await forget_service.trigger_forgetting_cycle(
db=db, end_user_id=str(end_user.id), config_id=user_config_id db=db, end_user_id=str(end_user.id), config_id=user_config_id
) )
total_merged += report.get('merged_count', 0) total_merged += report.get('merged_count', 0)
total_failed += report.get('failed_count', 0) total_failed += report.get('failed_count', 0)
processed_users += 1 processed_users += 1
logger.info(f"用户 {end_user.id}: 融合 {report.get('merged_count', 0)} 对节点") logger.info(f"用户 {end_user.id}: 融合 {report.get('merged_count', 0)} 对节点")
except Exception as e: except Exception as e:
logger.error(f"处理用户 {end_user.id} 失败: {e}", exc_info=True) logger.error(f"处理用户 {end_user.id} 失败: {e}", exc_info=True)
failed_users.append({"end_user_id": str(end_user.id), "error": str(e)}) failed_users.append({"end_user_id": str(end_user.id), "error": str(e)})
@@ -2801,18 +2801,18 @@ def run_incremental_clustering(
包含任务执行结果的字典 包含任务执行结果的字典
""" """
start_time = time.time() start_time = time.time()
async def _run() -> Dict[str, Any]: async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_logger from app.core.logging_config import get_logger
from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.core.memory.storage_services.clustering_engine.label_propagation import LabelPropagationEngine from app.core.memory.storage_services.clustering_engine.label_propagation import LabelPropagationEngine
logger = get_logger(__name__) logger = get_logger(__name__)
logger.info( logger.info(
f"[IncrementalClustering] 开始增量聚类任务 - end_user_id={end_user_id}, " f"[IncrementalClustering] 开始增量聚类任务 - end_user_id={end_user_id}, "
f"实体数={len(new_entity_ids)}, llm_model_id={llm_model_id}" f"实体数={len(new_entity_ids)}, llm_model_id={llm_model_id}"
) )
connector = Neo4jConnector() connector = Neo4jConnector()
try: try:
engine = LabelPropagationEngine( engine = LabelPropagationEngine(
@@ -2820,12 +2820,12 @@ def run_incremental_clustering(
llm_model_id=llm_model_id, llm_model_id=llm_model_id,
embedding_model_id=embedding_model_id, embedding_model_id=embedding_model_id,
) )
# 执行增量聚类 # 执行增量聚类
await engine.run(end_user_id=end_user_id, new_entity_ids=new_entity_ids) await engine.run(end_user_id=end_user_id, new_entity_ids=new_entity_ids)
logger.info(f"[IncrementalClustering] 增量聚类完成 - end_user_id={end_user_id}") logger.info(f"[IncrementalClustering] 增量聚类完成 - end_user_id={end_user_id}")
return { return {
"status": "SUCCESS", "status": "SUCCESS",
"end_user_id": end_user_id, "end_user_id": end_user_id,
@@ -2836,18 +2836,18 @@ def run_incremental_clustering(
raise raise
finally: finally:
await connector.close() await connector.close()
try: try:
loop = set_asyncio_event_loop() loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run()) result = loop.run_until_complete(_run())
result["elapsed_time"] = time.time() - start_time result["elapsed_time"] = time.time() - start_time
result["task_id"] = self.request.id result["task_id"] = self.request.id
logger.info( logger.info(
f"[IncrementalClustering] 任务完成 - task_id={self.request.id}, " f"[IncrementalClustering] 任务完成 - task_id={self.request.id}, "
f"elapsed_time={result['elapsed_time']:.2f}s" f"elapsed_time={result['elapsed_time']:.2f}s"
) )
return result return result
except Exception as e: except Exception as e:
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time

View File

@@ -63,6 +63,23 @@ services:
networks: networks:
- celery - celery
celery-task-scheduler:
image: redbear-mem-open:latest
container_name: celery-task-scheduler
env_file:
- .env
volumes:
- /etc/localtime:/etc/localtime:ro
command: python -m app.celery_task_scheduler
restart: unless-stopped
healthcheck:
test: CMD curl -f 127.0.0.1:8001 || exit 1
interval: 30s
timeout: 5s
retries: 3
networks:
- celery
# Celery Beat - scheduler # Celery Beat - scheduler
beat: beat:
image: redbear-mem-open:latest image: redbear-mem-open:latest

View File

@@ -1,12 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<svg width="16px" height="16px" viewBox="0 0 16 16" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink"> <svg width="16px" height="16px" viewBox="0 0 16 16" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<title></title> <title></title>
<g id="空间里层页面优化" stroke="none" stroke-width="1" fill="none" fill-rule="evenodd" stroke-linecap="round"> <g id="空间里层页面优化" stroke="none" stroke-width="1" fill="none" fill-rule="evenodd" stroke-linecap="round">
<g id="记忆库-个人记忆-感知记忆-文本" transform="translate(-573, -158)" stroke="#171719"> <g id="记忆库-个人记忆-感知记忆-文本" transform="translate(-555, -158)" stroke="#171719">
<g id="导" transform="translate(573, 158)"> <g id="导" transform="translate(555, 158)">
<g id="编组-54" transform="translate(3, 3)"> <g id="编组-54" transform="translate(3, 3)">
<path d="M10,6 L10,7.5 C10,8.88071187 8.88071187,10 7.5,10 L2.5,10 C1.11928813,10 0,8.88071187 0,7.5 L0,6 L0,6" id="路径"></path> <path d="M10,6 L10,7.5 C10,8.88071187 8.88071187,10 7.5,10 L2.5,10 C1.11928813,10 0,8.88071187 0,7.5 L0,6 L0,6" id="路径"></path>
<g id="编组-11" transform="translate(2, 0)"> <g id="编组-11" transform="translate(5, 3.4982) scale(1, -1) translate(-5, -3.4982)translate(2, 0)">
<line x1="3" y1="0.08499952" x2="3" y2="6.99635859" id="路径-24"></line> <line x1="3" y1="0.08499952" x2="3" y2="6.99635859" id="路径-24"></line>
<polyline id="路径-25" stroke-linejoin="round" points="0 3 2.98005548 6.08298138e-18 6 3"></polyline> <polyline id="路径-25" stroke-linejoin="round" points="0 3 2.98005548 6.08298138e-18 6 3"></polyline>
</g> </g>

Before

Width:  |  Height:  |  Size: 1.1 KiB

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -1,12 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<svg width="16px" height="16px" viewBox="0 0 16 16" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink"> <svg width="16px" height="16px" viewBox="0 0 16 16" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<title></title> <title></title>
<g id="空间里层页面优化" stroke="none" stroke-width="1" fill="none" fill-rule="evenodd" stroke-linecap="round"> <g id="空间里层页面优化" stroke="none" stroke-width="1" fill="none" fill-rule="evenodd" stroke-linecap="round">
<g id="记忆库-个人记忆-感知记忆-文本" transform="translate(-555, -158)" stroke="#171719"> <g id="记忆库-个人记忆-感知记忆-文本" transform="translate(-573, -158)" stroke="#171719">
<g id="导" transform="translate(555, 158)"> <g id="导" transform="translate(573, 158)">
<g id="编组-54" transform="translate(3, 3)"> <g id="编组-54" transform="translate(3, 3)">
<path d="M10,6 L10,7.5 C10,8.88071187 8.88071187,10 7.5,10 L2.5,10 C1.11928813,10 0,8.88071187 0,7.5 L0,6 L0,6" id="路径"></path> <path d="M10,6 L10,7.5 C10,8.88071187 8.88071187,10 7.5,10 L2.5,10 C1.11928813,10 0,8.88071187 0,7.5 L0,6 L0,6" id="路径"></path>
<g id="编组-11" transform="translate(5, 3.4982) scale(1, -1) translate(-5, -3.4982)translate(2, 0)"> <g id="编组-11" transform="translate(2, 0)">
<line x1="3" y1="0.08499952" x2="3" y2="6.99635859" id="路径-24"></line> <line x1="3" y1="0.08499952" x2="3" y2="6.99635859" id="路径-24"></line>
<polyline id="路径-25" stroke-linejoin="round" points="0 3 2.98005548 6.08298138e-18 6 3"></polyline> <polyline id="路径-25" stroke-linejoin="round" points="0 3 2.98005548 6.08298138e-18 6 3"></polyline>
</g> </g>

Before

Width:  |  Height:  |  Size: 1.1 KiB

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -272,14 +272,21 @@ const ChatContent: FC<ChatContentProps> = ({
<Flex vertical gap={4} className="rb:mt-1! rb:pt-3! rb-border-t rb:mb-2!"> <Flex vertical gap={4} className="rb:mt-1! rb:pt-3! rb-border-t rb:mb-2!">
<div className="rb:font-medium">{t('memoryConversation.citations')}</div> <div className="rb:font-medium">{t('memoryConversation.citations')}</div>
{item.meta_data?.citations?.map((citation, idx) => ( {item.meta_data?.citations?.map((citation, idx) => (
<div <Flex key={idx} align="center" gap={12}>
key={idx} <div
className="rb:text-[#155EEF] rb:leading-5 rb:underline rb:cursor-pointer" className="rb:text-[#155EEF] rb:leading-5 rb:underline rb:cursor-pointer"
onClick={() => { onClick={() => {
const params = new URLSearchParams({ documentId: citation.document_id, parentId: citation.knowledge_id }); const params = new URLSearchParams({ documentId: citation.document_id, parentId: citation.knowledge_id });
window.open(`/#/knowledge-base/${citation.knowledge_id}/DocumentDetails?${params}`, '_blank'); window.open(`/#/knowledge-base/${citation.knowledge_id}/DocumentDetails?${params}`, '_blank');
}} }}
>{citation.file_name}</div> >{citation.file_name}</div>
{citation.download_url &&
<div className="rb:size-4 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/application/export.svg')]"
onClick={() => handleDownload({ url: citation.download_url })}
></div>
}
</Flex>
))} ))}
</Flex> </Flex>
} }

View File

@@ -24,7 +24,7 @@ export interface ChatItem {
subContent?: Record<string, any>[]; subContent?: Record<string, any>[];
error?: string; error?: string;
meta_data?: { meta_data?: {
audio_url?: string; audio_url?: string | null;
audio_status?: string; audio_status?: string;
files?: any[]; files?: any[];
suggested_questions?: string[]; suggested_questions?: string[];
@@ -33,6 +33,7 @@ export interface ChatItem {
file_name: string; file_name: string;
knowledge_id: string; knowledge_id: string;
score: string; score: string;
download_url?: string;
}[]; }[];
reasoning_content?: string; reasoning_content?: string;
}, },

View File

@@ -1460,6 +1460,7 @@ export const en = {
maxCount: 'Max Files', maxCount: 'Max Files',
singleMaxSize: 'Max Size', singleMaxSize: 'Max Size',
unix: 'items', unix: 'items',
document_image_recognition: 'Enable image recognition in documents',
text_to_speech: 'Text to Speech', text_to_speech: 'Text to Speech',
text_to_speech_desc: 'Text can be converted to speech', text_to_speech_desc: 'Text can be converted to speech',
opening_statement: 'Conversation Opening', opening_statement: 'Conversation Opening',
@@ -1469,6 +1470,7 @@ export const en = {
add_questions: 'Add Option', add_questions: 'Add Option',
citation: 'Citation and Attribution', citation: 'Citation and Attribution',
citation_desc: 'Display the attribution of source documents and generated content', citation_desc: 'Display the attribution of source documents and generated content',
allow_download: 'Allow downloading cited source text',
invalidVariablesTitle: "The following undefined variables are referenced in the conversation opening. Do you want to save the opening configuration?", invalidVariablesTitle: "The following undefined variables are referenced in the conversation opening. Do you want to save the opening configuration?",
deep_thinking: 'Enable Deep Thinking', deep_thinking: 'Enable Deep Thinking',
@@ -1536,6 +1538,7 @@ export const en = {
json_output: 'Support JSON formatted output', json_output: 'Support JSON formatted output',
thinking_budget_tokens: 'thinking budget tokens', thinking_budget_tokens: 'thinking budget tokens',
thinking_budget_tokens_max_error: "Cannot exceed the max tokens limit ({{max}})", thinking_budget_tokens_max_error: "Cannot exceed the max tokens limit ({{max}})",
logSearchPlaceholder: 'Search log content',
}, },
userMemory: { userMemory: {
userMemory: 'User Memory', userMemory: 'User Memory',
@@ -2529,6 +2532,7 @@ Memory Bear: After the rebellion, regional warlordism intensified for several re
input_result: 'Input', input_result: 'Input',
output_result: 'Output', output_result: 'Output',
process_result: 'Data Processing',
error: 'Error Message', error: 'Error Message',
loopNum: ' loops', loopNum: ' loops',
iterationNum: ' iterations', iterationNum: ' iterations',

View File

@@ -790,6 +790,7 @@ export const zh = {
maxCount: '最大文件数', maxCount: '最大文件数',
singleMaxSize: '单文件最大大小', singleMaxSize: '单文件最大大小',
unix: '个', unix: '个',
document_image_recognition: '是否识别文档中的图片',
text_to_speech: '文字转语音', text_to_speech: '文字转语音',
text_to_speech_desc: '文本可以转换成语音', text_to_speech_desc: '文本可以转换成语音',
opening_statement: '对话开场白', opening_statement: '对话开场白',
@@ -799,6 +800,7 @@ export const zh = {
add_questions: '添加选项', add_questions: '添加选项',
citation: '引用和归属', citation: '引用和归属',
citation_desc: '显示源文档和生成内容的归属部分', citation_desc: '显示源文档和生成内容的归属部分',
allow_download: '允许下载引用原文',
invalidVariablesTitle: "对话开场白中引用了以下未定义的变量,是否保存开场白配置?", invalidVariablesTitle: "对话开场白中引用了以下未定义的变量,是否保存开场白配置?",
deep_thinking: '开启深度思考', deep_thinking: '开启深度思考',
@@ -866,6 +868,7 @@ export const zh = {
json_output: '支持JSON格式化输出', json_output: '支持JSON格式化输出',
thinking_budget_tokens: '深度思考预算Token数', thinking_budget_tokens: '深度思考预算Token数',
thinking_budget_tokens_max_error: "不能超过 最大令牌数 ({{max}})", thinking_budget_tokens_max_error: "不能超过 最大令牌数 ({{max}})",
logSearchPlaceholder: '搜索日志内容',
}, },
table: { table: {
totalRecords: '共 {{total}} 条记录' totalRecords: '共 {{total}} 条记录'
@@ -2493,6 +2496,7 @@ export const zh = {
input_result: '输入', input_result: '输入',
output_result: '输出', output_result: '输出',
process_result: '数据处理',
error: '错误信息', error: '错误信息',
loopNum: '个循环', loopNum: '个循环',
iterationNum: '个迭代', iterationNum: '个迭代',

View File

@@ -7,7 +7,7 @@
import { useEffect, useRef, useState, forwardRef, useImperativeHandle, useMemo } from 'react'; import { useEffect, useRef, useState, forwardRef, useImperativeHandle, useMemo } from 'react';
import { useTranslation } from 'react-i18next' import { useTranslation } from 'react-i18next'
import { useParams } from 'react-router-dom'; import { useParams } from 'react-router-dom';
import { Row, Col, Space, Form, Input, Button, App, Spin, Flex } from 'antd' import { Row, Col, Space, Form, Input, Button, App, Flex } from 'antd'
import Chat from './components/Chat' import Chat from './components/Chat'
import RbCard from '@/components/RbCard/Card' import RbCard from '@/components/RbCard/Card'
@@ -357,21 +357,23 @@ const Agent = forwardRef<AgentRef, { onFeaturesLoad?: (features: FeaturesConfigF
const { statement = '' } = value?.opening_statement || {} const { statement = '' } = value?.opening_statement || {}
onFeaturesLoad?.(value) onFeaturesLoad?.(value)
const usedVars = [...new Set([...(statement?.matchAll(/\{\{(\w+)\}\}/g) ?? [])].map(m => m[1]))] if (value?.opening_statement?.enabled) {
const variables = values?.variables const usedVars = [...new Set([...(statement?.matchAll(/\{\{(\w+)\}\}/g) ?? [])].map(m => m[1]))]
const validNames = new Set(variables.map(v => v.name)) const variables = values?.variables
const invalid = usedVars.filter(v => !validNames.has(v)) const validNames = new Set(variables.map(v => v.name))
if (invalid.length > 0) { const invalid = usedVars.filter(v => !validNames.has(v))
const newVars = invalid.map((name, i) => ({ if (invalid.length > 0) {
index: variables.length + i, const newVars = invalid.map((name, i) => ({
name, index: variables.length + i,
display_name: name, name,
type: 'text', display_name: name,
required: true, type: 'text',
max_length: 48, required: true,
})) max_length: 48,
}))
form.setFieldValue('variables', [...variables, ...newVars]) form.setFieldValue('variables', [...variables, ...newVars])
}
} }
} }
const modelLogo = useMemo(() => { const modelLogo = useMemo(() => {

View File

@@ -7,7 +7,7 @@
import { type FC, useRef } from 'react'; import { type FC, useRef } from 'react';
import { useTranslation } from 'react-i18next'; import { useTranslation } from 'react-i18next';
import { useParams } from 'react-router-dom'; import { useParams } from 'react-router-dom';
import { Flex, Button } from 'antd'; import { Flex, Button, Form } from 'antd';
import type { ColumnsType } from 'antd/es/table'; import type { ColumnsType } from 'antd/es/table';
import { getAppLogsUrl } from '@/api/application'; import { getAppLogsUrl } from '@/api/application';
@@ -15,11 +15,14 @@ import Table from '@/components/Table'
import { formatDateTime } from '@/utils/format'; import { formatDateTime } from '@/utils/format';
import type { LogItem, LogDetailModalRef } from './types' import type { LogItem, LogDetailModalRef } from './types'
import LogDetailModal from './components/LogDetailModal' import LogDetailModal from './components/LogDetailModal'
import SearchInput from '@/components/SearchInput'
const Statistics: FC = () => { const Statistics: FC = () => {
const { t } = useTranslation(); const { t } = useTranslation();
const { id } = useParams(); const { id } = useParams();
const logDetailRef = useRef<LogDetailModalRef>(null); const logDetailRef = useRef<LogDetailModalRef>(null);
const [form] = Form.useForm();
const values = Form.useWatch([], form);
const handleViewDetail = (item: LogItem) => { const handleViewDetail = (item: LogItem) => {
logDetailRef.current?.handleOpen(item); logDetailRef.current?.handleOpen(item);
@@ -62,15 +65,26 @@ const Statistics: FC = () => {
]; ];
return ( return (
<div className="rb:bg-white rb:rounded-lg rb:pt-3 rb:px-3"> <div className="rb:bg-white rb:rounded-lg rb:pt-3 rb:px-3">
<Flex justify="flex-end" className="rb:mb-3!">
<Form form={form}>
<Form.Item name="keyword" noStyle>
<SearchInput
placeholder={t('application.logSearchPlaceholder')}
variant="outlined"
/>
</Form.Item>
</Form>
</Flex>
<Table<LogItem> <Table<LogItem>
apiUrl={getAppLogsUrl(id || '')} apiUrl={getAppLogsUrl(id || '')}
apiParams={{ apiParams={{
is_draft: false, is_draft: false,
...(values ?? {})
}} }}
columns={columns} columns={columns}
rowKey="id" rowKey="id"
isScroll={true} isScroll={true}
scrollY="calc(100vh - 214px)" scrollY="calc(100vh - 242px)"
/> />
<LogDetailModal ref={logDetailRef} /> <LogDetailModal ref={logDetailRef} />
</div> </div>

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing * @Author: ZhaoYing
* @Date: 2026-03-13 17:27:52 * @Date: 2026-03-13 17:27:52
* @Last Modified by: ZhaoYing * @Last Modified by: ZhaoYing
* @Last Modified time: 2026-04-07 21:48:30 * @Last Modified time: 2026-04-24 18:14:25
*/ */
import { type FC, useState, useRef, useEffect } from 'react' import { type FC, useState, useRef, useEffect } from 'react'
import { useTranslation } from 'react-i18next' import { useTranslation } from 'react-i18next'
@@ -59,6 +59,7 @@ interface NodeData {
node_type?: string; node_type?: string;
input?: any; input?: any;
output?: any; output?: any;
process?: any;
elapsed_time?: string; elapsed_time?: string;
error?: any; error?: any;
state: Record<string, any>; state: Record<string, any>;
@@ -485,7 +486,7 @@ const TestChat: FC<TestChatProps> = ({
} }
const updateWorkflowNodeEndMessage = (data: NodeData) => { const updateWorkflowNodeEndMessage = (data: NodeData) => {
const { node_id, input, output, error, elapsed_time, status } = data; const { node_id, input, output, process, error, elapsed_time, status } = data;
setChatList(prev => { setChatList(prev => {
const newList = [...prev] const newList = [...prev]
const lastIndex = newList.length - 1 const lastIndex = newList.length - 1
@@ -498,6 +499,7 @@ const TestChat: FC<TestChatProps> = ({
content: { content: {
input, input,
output, output,
process,
error, error,
}, },
status: status || 'completed', status: status || 'completed',
@@ -514,7 +516,7 @@ const TestChat: FC<TestChatProps> = ({
} }
const updateWorkflowCycleMessage = (data: NodeData) => { const updateWorkflowCycleMessage = (data: NodeData) => {
const { node_id, cycle_id, cycle_idx, input, output, error, elapsed_time, status } = data; const { node_id, cycle_id, cycle_idx, input, output, process, error, elapsed_time, status } = data;
const { nodes } = config as WorkflowConfig const { nodes } = config as WorkflowConfig
const node = nodes.find(n => n.id === node_id); const node = nodes.find(n => n.id === node_id);
const { name, type } = node || {} const { name, type } = node || {}
@@ -538,6 +540,7 @@ const TestChat: FC<TestChatProps> = ({
cycle_idx, cycle_idx,
input, input,
output, output,
process,
error, error,
}, },
status: status || 'completed', status: status || 'completed',

View File

@@ -155,6 +155,12 @@ const FeaturesConfigModal = forwardRef<FeaturesConfigModalRef, FeaturesConfigMod
name={['citation', "enabled"]} name={['citation', "enabled"]}
desc={t('application.citation_desc')} desc={t('application.citation_desc')}
/> />
<SwitchFormItem
title={t(`application.allow_download`)}
name={['citation', "allow_download"]}
disabled={!values?.citation?.enabled}
className="rb:mt-2!"
/>
</div> </div>
<div className="rb:relative rb:border rb:border-[#DFE4ED] rb:p-3 rb:rounded-lg rb:bg-[#f5f7fc]"> <div className="rb:relative rb:border rb:border-[#DFE4ED] rb:p-3 rb:rounded-lg rb:bg-[#f5f7fc]">

View File

@@ -97,6 +97,7 @@ export const defaultValues: FileUpload = {
"json", "json",
"md", "md",
], ],
document_image_recognition: false,
video_enabled: false, video_enabled: false,
video_max_size_mb: 100, video_max_size_mb: 100,
video_allowed_extensions: [ video_allowed_extensions: [
@@ -219,11 +220,22 @@ const FileUploadSettingModal = forwardRef<FileUploadSettingModalRef, FileUploadS
</Col> </Col>
</Row> </Row>
{isEnabled && ( {isEnabled && (
<Flex align="center" gap={12} className="rb:mt-3! rb:pt-3! rb:border-t rb:border-[#DFE4ED]"> <Flex align="center" gap={16} className="rb:mt-3! rb:pt-3! rb:border-t rb:border-[#DFE4ED]">
<div>{t('application.singleMaxSize')}: </div> <div>
<Form.Item name={sizeKey} noStyle> <div>{t('application.singleMaxSize')}</div>
<InputNumber min={1} max={100} suffix="MB" className="rb:flex-1" /> <Form.Item name={sizeKey} noStyle>
</Form.Item> <InputNumber min={1} max={100} suffix="MB" className="rb:flex-1" />
</Form.Item>
</div>
{option.type === 'document' &&
<div>
<div>{t('application.document_image_recognition')}</div>
<Form.Item name="document_image_recognition" valuePropName="checked" noStyle>
<Switch className="rb:mt-1.5!" />
</Form.Item>
</div>
}
<Form.Item name={`${option.type}_allowed_extensions`} hidden /> <Form.Item name={`${option.type}_allowed_extensions`} hidden />
</Flex> </Flex>
)} )}

View File

@@ -104,6 +104,7 @@ const OpenStatementSettingModal = forwardRef<OpenStatementSettingModalRef, OpenS
<Form.Item <Form.Item
label={t('application.opening_statement')} label={t('application.opening_statement')}
name="statement" name="statement"
rules={[{ required: true, message: t('common.pleaseEnter') }]}
> >
{source === 'workflow' {source === 'workflow'
? <Editor options={chatVariables as any} variant="outlined" /> ? <Editor options={chatVariables as any} variant="outlined" />

View File

@@ -1,8 +1,8 @@
/* /*
* @Author: ZhaoYing * @Author: ZhaoYing
* @Date: 2026-03-24 16:31:24 * @Date: 2026-03-24 16:31:24
* @Last Modified by: ZhaoYing * @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-24 16:31:24 * @Last Modified time: 2026-04-24 17:49:58
*/ */
import { forwardRef, useImperativeHandle, useState, useEffect } from 'react'; import { forwardRef, useImperativeHandle, useState, useEffect } from 'react';
import { Flex, Button, Empty, Skeleton } from 'antd'; import { Flex, Button, Empty, Skeleton } from 'antd';
@@ -14,6 +14,12 @@ import { getAppLogDetail } from '@/api/application'
import ChatContent from '@/components/Chat/ChatContent' import ChatContent from '@/components/Chat/ChatContent'
import { formatDateTime } from '@/utils/format' import { formatDateTime } from '@/utils/format'
import type { ChatItem } from '@/components/Chat/types' import type { ChatItem } from '@/components/Chat/types'
import Runtime from '@/views/Workflow/components/Chat/Runtime'
import { nodeLibrary } from '@/views/Workflow/constant'
const nodeIconMap = Object.fromEntries(
nodeLibrary.flatMap(c => c.nodes.map(n => [n.type, n.icon]))
)
/** Log detail data with conversation messages */ /** Log detail data with conversation messages */
type Data = LogItem & { type Data = LogItem & {
@@ -54,7 +60,30 @@ const LogDetailModal = forwardRef<LogDetailModalRef>((_props, ref) => {
if (!vo) return if (!vo) return
setLoading(true) setLoading(true)
getAppLogDetail(vo.app_id, vo.id).then(res => { getAppLogDetail(vo.app_id, vo.id).then(res => {
setData(res as Data) const { node_executions_map, messages, ...rest } = res as Data;
let hasSubContentMessages = messages
if (messages && messages.length > 0 && node_executions_map && Object.keys(node_executions_map).length > 0) {
hasSubContentMessages = messages.map(item => {
if (item.id && node_executions_map[item.id]) {
item.subContent = node_executions_map[item.id]?.map(({ input, output, cycle_items = [], error, process, ...node }: any) => {
const converted: any = { ...node, icon: nodeIconMap[node.node_type], content: { input, output, process, error } }
if (node.node_type === 'loop' && Array.isArray(cycle_items) && cycle_items.length > 0) {
converted.subContent = cycle_items.map(({ input: cInput, output: cOutput, error: cError, process: cProcess, ...cNode }: any) => ({
...cNode,
icon: nodeIconMap[cNode.node_type],
content: { input: cInput, output: cOutput, process: cProcess, error: cError }
}))
}
return converted
})
}
return { ...item }
})
}
setData({
...rest,
messages: hasSubContentMessages
})
}) })
.finally(() => { .finally(() => {
setLoading(false) setLoading(false)
@@ -66,6 +95,8 @@ const LogDetailModal = forwardRef<LogDetailModalRef>((_props, ref) => {
handleClose handleClose
})); }));
console.log('data', data)
return ( return (
<RbModal <RbModal
title={<> title={<>
@@ -92,6 +123,7 @@ const LogDetailModal = forwardRef<LogDetailModalRef>((_props, ref) => {
data={data.messages || []} data={data.messages || []}
streamLoading={false} streamLoading={false}
labelFormat={(item) => formatDateTime(item.created_at)} labelFormat={(item) => formatDateTime(item.created_at)}
renderRuntime={(item, index) => <Runtime item={item} index={index} />}
/> />
) )
} }

View File

@@ -184,7 +184,8 @@ const ModelConfigModal = forwardRef<ModelConfigModalRef, ModelConfigModalProps>(
{ {
validator: (_, value) => { validator: (_, value) => {
const maxTokens = values?.max_tokens const maxTokens = values?.max_tokens
if (value !== undefined && maxTokens !== undefined && value > maxTokens) { const deep_thinking = values?.deep_thinking;
if (deep_thinking && value !== undefined && maxTokens !== undefined && value > maxTokens) {
return Promise.reject(t('application.thinking_budget_tokens_max_error', { max: maxTokens })) return Promise.reject(t('application.thinking_budget_tokens_max_error', { max: maxTokens }))
} }
return Promise.resolve() return Promise.resolve()

View File

@@ -438,6 +438,7 @@ interface FileSetttings {
document_enabled: boolean; document_enabled: boolean;
document_max_size_mb: number; document_max_size_mb: number;
document_allowed_extensions: string[]; document_allowed_extensions: string[];
document_image_recognition: boolean;
video_enabled: boolean; video_enabled: boolean;
video_max_size_mb: number; video_max_size_mb: number;
video_allowed_extensions: string[]; video_allowed_extensions: string[];
@@ -499,6 +500,7 @@ export interface LogItem {
is_draft: boolean; is_draft: boolean;
created_at: number; created_at: number;
updated_at: number; updated_at: number;
node_executions_map?: Record<string, ChatItem['subContent']>
} }
export interface LogDetailModalRef { export interface LogDetailModalRef {
handleOpen: (vo: LogItem) => void; handleOpen: (vo: LogItem) => void;

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing * @Author: ZhaoYing
* @Date: 2026-02-06 21:10:56 * @Date: 2026-02-06 21:10:56
* @Last Modified by: ZhaoYing * @Last Modified by: ZhaoYing
* @Last Modified time: 2026-04-21 14:59:13 * @Last Modified time: 2026-04-24 18:13:22
*/ */
/** /**
* Workflow Chat Component * Workflow Chat Component
@@ -66,7 +66,7 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
const [fileList, setFileList] = useState<any[]>([]) const [fileList, setFileList] = useState<any[]>([])
const [message, setMessage] = useState<string | undefined>(undefined) const [message, setMessage] = useState<string | undefined>(undefined)
console.log('abortRef', abortRef) console.log('abortRef', abortRef, chatList)
/** /**
* Opens the chat drawer and loads workflow variables from the start node * Opens the chat drawer and loads workflow variables from the start node
@@ -185,7 +185,7 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
*/ */
const handleStreamMessage = (data: SSEMessage[]) => { const handleStreamMessage = (data: SSEMessage[]) => {
data.forEach(item => { data.forEach(item => {
const { content, conversation_id, node_id, cycle_id, cycle_idx, input, output, error, elapsed_time, status, citations } = item.data as { const { content, conversation_id, node_id, cycle_id, cycle_idx, input, output, process, error, elapsed_time, status, citations } = item.data as {
content: string; content: string;
conversation_id: string | null; conversation_id: string | null;
cycle_id: string; cycle_id: string;
@@ -193,6 +193,7 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
node_id: string; node_id: string;
node_name?: string; node_name?: string;
node_type?: string; node_type?: string;
process?: any;
input?: any; input?: any;
output?: any; output?: any;
elapsed_time?: string; elapsed_time?: string;
@@ -277,6 +278,7 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
content: { content: {
input, input,
output, output,
process,
error, error,
}, },
status: status || 'completed', status: status || 'completed',
@@ -305,13 +307,14 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
cycle_id, cycle_id,
cycle_idx, cycle_idx,
node_id, node_id,
node_name: name, node_name: type === 'cycle-start' ? t('workflow.cycle-start') : name,
node_type: type, node_type: type,
icon, icon,
content: { content: {
cycle_idx, cycle_idx,
input, input,
output, output,
process,
error, error,
}, },
status: status || 'completed', status: status || 'completed',

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing * @Author: ZhaoYing
* @Date: 2026-02-24 17:57:08 * @Date: 2026-02-24 17:57:08
* @Last Modified by: ZhaoYing * @Last Modified by: ZhaoYing
* @Last Modified time: 2026-04-20 15:33:48 * @Last Modified time: 2026-04-24 18:04:31
*/ */
/* /*
* Runtime Component * Runtime Component
@@ -184,27 +184,30 @@ const Runtime: FC<{ item: ChatItem; index: number;}> = ({
</Flex> </Flex>
)} )}
{/* Display input and output data as JSON code blocks */} {/* Display input and output data as JSON code blocks */}
{['input', 'output'].map(key => ( {['input', 'process', 'output'].map(key => {
<div key={key} className="rb:bg-[#EBEBEB] rb:rounded-lg"> if (vo.node_type !== 'http-request' && key === 'process') return null
<div className="rb:py-2 rb:px-3 rb:flex rb:justify-between rb:items-center rb:text-[12px]"> return (
{isLoop ? t(`workflow.runtime.${key}_cycle_vars`) : t(`workflow.${key}_result`)} <div key={key} className="rb:bg-[#EBEBEB] rb:rounded-lg">
<Button <div className="rb:py-2 rb:px-3 rb:flex rb:justify-between rb:items-center rb:text-[12px]">
className="rb:py-0! rb:px-1! rb:text-[12px]!" {isLoop ? t(`workflow.runtime.${key}_cycle_vars`) : t(`workflow.${key}_result`)}
size="small" <Button
onClick={() => handleCopy(typeof vo.content === 'object' && vo.content?.[key] ? JSON.stringify(vo.content[key], null, 2) : '{}')} className="rb:py-0! rb:px-1! rb:text-[12px]!"
>{t('common.copy')}</Button> size="small"
onClick={() => handleCopy(typeof vo.content === 'object' && vo.content?.[key] ? JSON.stringify(vo.content[key], null, 2) : '{}')}
>{t('common.copy')}</Button>
</div>
<div className="rb:max-h-40 rb:overflow-auto">
<CodeBlock
size="small"
value={typeof vo.content === 'object' && vo.content?.[key] ? JSON.stringify(vo.content[key], null, 2) : '{}'}
needCopy={false}
showLineNumbers={true}
background="#EBEBEB"
/>
</div>
</div> </div>
<div className="rb:max-h-40 rb:overflow-auto"> )
<CodeBlock })}
size="small"
value={typeof vo.content === 'object' && vo.content?.[key] ? JSON.stringify(vo.content[key], null, 2) : '{}'}
needCopy={false}
showLineNumbers={true}
background="#EBEBEB"
/>
</div>
</div>
))}
</Flex> </Flex>
) )
}]} }]}

View File

@@ -65,8 +65,8 @@ const ConditionNode: ReactShapeConfig['component'] = ({ node }) => {
return ( return (
<div className={clsx('rb:cursor-pointer rb:group rb:relative rb:h-full rb:w-full rb:p-3 rb:border rb:rounded-2xl rb:bg-[#FCFCFD] rb:shadow-[0px_2px_4px_0px_rgba(23,23,25,0.03)]', { <div className={clsx('rb:cursor-pointer rb:group rb:relative rb:h-full rb:w-full rb:p-3 rb:border rb:rounded-2xl rb:bg-[#FCFCFD] rb:shadow-[0px_2px_4px_0px_rgba(23,23,25,0.03)]', {
'rb:border-[#171719]!': data.isSelected, 'rb:border-[#171719]!': data.isSelected && !data.executionStatus,
'rb:border-[#FCFCFD]': !data.isSelected, 'rb:border-[#FCFCFD]': !data.isSelected && !data.executionStatus,
'rb:border-[#369F21]!': !data.isSelected && data.executionStatus === 'completed', 'rb:border-[#369F21]!': !data.isSelected && data.executionStatus === 'completed',
'rb:border-[#FF5D34]!': !data.isSelected && data.executionStatus === 'failed', 'rb:border-[#FF5D34]!': !data.isSelected && data.executionStatus === 'failed',
})}> })}>

View File

@@ -131,8 +131,8 @@ const LoopNode: ReactShapeConfig['component'] = ({ node, graph }) => {
return ( return (
<div className={clsx('rb:cursor-pointer rb:group rb:relative rb:h-full rb:w-full rb:p-3 rb:border rb:rounded-2xl rb:bg-[#FCFCFD] rb:shadow-[0px_2px_4px_0px_rgba(23,23,25,0.03)]', { <div className={clsx('rb:cursor-pointer rb:group rb:relative rb:h-full rb:w-full rb:p-3 rb:border rb:rounded-2xl rb:bg-[#FCFCFD] rb:shadow-[0px_2px_4px_0px_rgba(23,23,25,0.03)]', {
'rb:border-[#171719]': data.isSelected, 'rb:border-[#171719]!': data.isSelected && !data.executionStatus,
'rb:border-[#FCFCFD]': !data.isSelected, 'rb:border-[#FCFCFD]': !data.isSelected && !data.executionStatus,
'rb:border-[#369F21]!': !data.isSelected && data.executionStatus === 'completed', 'rb:border-[#369F21]!': !data.isSelected && data.executionStatus === 'completed',
'rb:border-[#FF5D34]!': !data.isSelected && data.executionStatus === 'failed', 'rb:border-[#FF5D34]!': !data.isSelected && data.executionStatus === 'failed',
})}> })}>

View File

@@ -12,8 +12,8 @@ const NormalNode: ReactShapeConfig['component'] = ({ node }) => {
return ( return (
<div className={clsx('rb:cursor-pointer rb:group rb:relative rb:h-full rb:w-full rb:p-3 rb:border rb:rounded-2xl rb:bg-[#FCFCFD] rb:shadow-[0px_2px_4px_0px_rgba(23,23,25,0.03)]', { <div className={clsx('rb:cursor-pointer rb:group rb:relative rb:h-full rb:w-full rb:p-3 rb:border rb:rounded-2xl rb:bg-[#FCFCFD] rb:shadow-[0px_2px_4px_0px_rgba(23,23,25,0.03)]', {
'rb:border-[#171719]!': data.isSelected, 'rb:border-[#171719]!': data.isSelected && !data.executionStatus,
'rb:border-[#FCFCFD]': !data.isSelected, 'rb:border-[#FCFCFD]': !data.isSelected && !data.executionStatus,
'rb:border-[#369F21]!': !data.isSelected && data.executionStatus === 'completed', 'rb:border-[#369F21]!': !data.isSelected && data.executionStatus === 'completed',
'rb:border-[#FF5D34]!': !data.isSelected && data.executionStatus === 'failed', 'rb:border-[#FF5D34]!': !data.isSelected && data.executionStatus === 'failed',
})}> })}>

View File

@@ -56,6 +56,8 @@ const NODE_VARIABLES = {
], ],
'document-extractor': [ 'document-extractor': [
{ label: 'text', dataType: 'string', field: 'text' }, { label: 'text', dataType: 'string', field: 'text' },
// { label: 'chunks', dataType: 'array[string]', field: 'chunks' },
{ label: 'images', dataType: 'array[file]', field: 'images' },
], ],
'list-operator': [ 'list-operator': [
{ label: 'result', dataType: 'array[string]', field: 'result' }, { label: 'result', dataType: 'array[string]', field: 'result' },

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing * @Author: ZhaoYing
* @Date: 2026-02-03 15:17:48 * @Date: 2026-02-03 15:17:48
* @Last Modified by: ZhaoYing * @Last Modified by: ZhaoYing
* @Last Modified time: 2026-04-20 16:00:26 * @Last Modified time: 2026-04-24 17:21:09
*/ */
import { Clipboard, Graph, Keyboard, MiniMap, Node, Snapline, History, type Edge } from '@antv/x6'; import { Clipboard, Graph, Keyboard, MiniMap, Node, Snapline, History, type Edge } from '@antv/x6';
import type { HistoryCommand as Command } from '@antv/x6/lib/plugin/history/type'; import type { HistoryCommand as Command } from '@antv/x6/lib/plugin/history/type';
@@ -1492,7 +1492,7 @@ export const useWorkflowGraph = ({
// Reset all node execution status first // Reset all node execution status first
nodes.forEach(node => { nodes.forEach(node => {
const data = node.getData(); const data = node.getData();
if (typeof data.status === 'string') { if (typeof data.executionStatus === 'string') {
node.setData({ ...data, executionStatus: undefined }); node.setData({ ...data, executionStatus: undefined });
} }
}); });