Merge remote-tracking branch 'origin/develop' into feat/enduser-info-apikey

This commit is contained in:
miao
2026-04-17 10:21:26 +08:00
331 changed files with 14018 additions and 4405 deletions

View File

@@ -248,6 +248,35 @@ class RateLimiterService:
def __init__(self):
self.redis = aio_redis
async def check_tenant_rate_limit(self, tenant_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
"""
按 tenant_id 做 1 秒滑动窗口限速,限制值来自套餐配额 api_ops_rate_limit
"""
now = time.time()
window_start = now - 1 # 1 秒窗口
key = f"rate_limit:tenant_qps:{tenant_id}"
async with self.redis.pipeline() as pipe:
# 清理 1 秒前的旧记录
pipe.zremrangebyscore(key, 0, window_start)
# 加入当前请求score=时间戳member=时间戳+随机数保证唯一)
pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
# 统计窗口内请求数
pipe.zcard(key)
# 设置 key 过期2 秒后自动清理)
pipe.expire(key, 2)
results = await pipe.execute()
current = results[2]
remaining = max(0, limit - current)
reset_time = int(now) + 1
return current <= limit, {
"limit": limit,
"remaining": remaining,
"reset": reset_time,
}
async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
"""
检查QPS限制

View File

@@ -26,6 +26,7 @@ from app.services.model_service import ModelApiKeyService
from app.services.multi_agent_orchestrator import MultiAgentOrchestrator
from app.services.multimodal_service import MultimodalService
from app.services.workflow_service import WorkflowService
from app.models.file_metadata_model import FileMetadata
logger = get_business_logger()
@@ -117,7 +118,10 @@ class AppChatService:
max_tokens=model_parameters.get("max_tokens", 2000),
system_prompt=system_prompt,
tools=tools,
deep_thinking=model_parameters.get("deep_thinking", False),
thinking_budget_tokens=model_parameters.get("thinking_budget_tokens"),
json_output=model_parameters.get("json_output", False),
capability=api_key_obj.capability or [],
)
model_info = ModelInfo(
@@ -163,7 +167,14 @@ class AppChatService:
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件")
# 为需要运行时上下文的工具注入上下文
for t in tools:
if hasattr(t, 'tool_instance') and hasattr(t.tool_instance, 'set_runtime_context'):
t.tool_instance.set_runtime_context(
user_id=user_id or "anonymous",
conversation_id=str(conversation_id) if conversation_id else None,
uploaded_files=processed_files or []
)
# 调用 Agent支持多模态
result = await agent.chat(
message=message,
@@ -205,14 +216,33 @@ class AppChatService:
"model": api_key_obj.model_name,
"usage": result.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}),
"audio_url": None,
"citations": filtered_citations
"citations": filtered_citations,
"reasoning_content": result.get("reasoning_content")
}
if files:
local_ids = [f.upload_file_id for f in files
if f.transfer_method.value == "local_file" and f.upload_file_id
and (not f.name or not f.size)]
meta_map = {}
if local_ids:
rows = self.db.query(FileMetadata).filter(
FileMetadata.id.in_(local_ids),
FileMetadata.status == "completed"
).all()
meta_map = {str(r.id): r for r in rows}
for f in files:
# url = await MultimodalService(self.db).get_file_url(f)
name, size = f.name, f.size
if f.transfer_method.value == "local_file" and f.upload_file_id and (not name or not size):
meta = meta_map.get(str(f.upload_file_id))
if meta:
name = name or meta.file_name
size = size or meta.file_size
human_meta["files"].append({
"type": f.type,
"url": f.url
"url": f.url,
"name": name,
"size": size,
"file_type": f.file_type,
})
if processed_files:
@@ -228,8 +258,13 @@ class AppChatService:
if memory_flag:
connected_config = get_end_user_connected_config(user_id, self.db)
memory_config_id: str = connected_config.get("memory_config_id")
file_list = []
for file in files:
file_dict = file.model_dump()
file_dict["upload_file_id"] = str(file_dict["upload_file_id"]) if file_dict["upload_file_id"] else None
file_list.append(file_dict)
messages = [
{"role": "user", "content": message, "files": [file.model_dump() for file in files]},
{"role": "user", "content": message, "files": file_list},
{"role": "assistant", "content": result["content"]}
]
if memory_config_id:
@@ -258,6 +293,7 @@ class AppChatService:
"conversation_id": conversation_id,
"message_id": str(message_id),
"message": result["content"],
"reasoning_content": result.get("reasoning_content"),
"usage": result.get("usage", {
"prompt_tokens": 0,
"completion_tokens": 0,
@@ -354,7 +390,11 @@ class AppChatService:
max_tokens=model_parameters.get("max_tokens", 2000),
system_prompt=system_prompt,
tools=tools,
streaming=True
streaming=True,
deep_thinking=model_parameters.get("deep_thinking", False),
thinking_budget_tokens=model_parameters.get("thinking_budget_tokens"),
json_output=model_parameters.get("json_output", False),
capability=api_key_obj.capability or [],
)
model_info = ModelInfo(
@@ -401,8 +441,18 @@ class AppChatService:
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件")
# 为需要运行时上下文的工具注入上下文
for t in tools:
if hasattr(t, 'tool_instance') and hasattr(t.tool_instance, 'set_runtime_context'):
t.tool_instance.set_runtime_context(
user_id=user_id or "anonymous",
conversation_id=str(conversation_id) if conversation_id else None,
uploaded_files=processed_files or []
)
# 流式调用 Agent支持多模态同时并行启动 TTS
full_content = ""
full_reasoning = ""
total_tokens = 0
text_queue: asyncio.Queue = asyncio.Queue()
@@ -426,6 +476,9 @@ class AppChatService:
):
if isinstance(chunk, int):
total_tokens = chunk
elif isinstance(chunk, dict) and chunk.get("type") == "reasoning":
full_reasoning += chunk['content']
yield f"event: reasoning\ndata: {json.dumps({'content': chunk['content']}, ensure_ascii=False)}\n\n"
else:
full_content += chunk
yield f"event: message\ndata: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n"
@@ -472,14 +525,34 @@ class AppChatService:
"model": api_key_obj.model_name,
"usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": total_tokens},
"audio_url": None,
"citations": filtered_citations
"citations": filtered_citations,
"reasoning_content": full_reasoning or None
}
if files:
local_ids = [f.upload_file_id for f in files
if f.transfer_method.value == "local_file" and f.upload_file_id
and (not f.name or not f.size)]
meta_map = {}
if local_ids:
rows = self.db.query(FileMetadata).filter(
FileMetadata.id.in_(local_ids),
FileMetadata.status == "completed"
).all()
meta_map = {str(r.id): r for r in rows}
for f in files:
name, size = f.name, f.size
if f.transfer_method.value == "local_file" and f.upload_file_id and (not name or not size):
meta = meta_map.get(str(f.upload_file_id))
if meta:
name = name or meta.file_name
size = size or meta.file_size
human_meta["files"].append({
"type": f.type,
"url": f.url
"url": f.url,
"name": name,
"size": size,
"file_type": f.file_type,
})
if processed_files:
human_meta["history_files"] = {
@@ -494,8 +567,13 @@ class AppChatService:
if memory_flag:
connected_config = get_end_user_connected_config(user_id, self.db)
memory_config_id: str = connected_config.get("memory_config_id")
file_list = []
for file in files:
file_dict = file.model_dump()
file_dict["upload_file_id"] = str(file_dict["upload_file_id"]) if file_dict["upload_file_id"] else None
file_list.append(file_dict)
messages = [
{"role": "user", "content": message, "files": [file.model_dump() for file in files]},
{"role": "user", "content": message, "files": file_list},
{"role": "assistant", "content": full_content}
]
if memory_config_id:
@@ -652,13 +730,13 @@ class AppChatService:
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id
):
if "sub_usage" in event:
# 拦截 sub_usage 事件,累加 token
if "event: sub_usage" in event:
if "data:" in event:
try:
data_line = event.split("data: ", 1)[1].strip()
data = json.loads(data_line)
if "total_tokens" in data:
total_tokens += data["total_tokens"]
total_tokens += data.get("total_tokens", 0)
except:
pass
else:

View File

@@ -73,15 +73,14 @@ class AppDslService:
AppType.MULTI_AGENT: "multi_agent_config",
AppType.WORKFLOW: "workflow"
}.get(app.type, "config")
config_data = self._enrich_release_config(app.type, release.config or {})
config_data = self._enrich_release_config(app.type, release.config or {}, release.default_model_config_id)
dsl = {**meta, "app": app_meta, config_key: config_data}
return yaml.dump(dsl, default_flow_style=False, allow_unicode=True), f"{release.name}_v{release.version_name}.yaml"
def _enrich_release_config(self, app_type: str, cfg: dict) -> dict:
def _enrich_release_config(self, app_type: str, cfg: dict, default_model_config_id=None) -> dict:
if app_type == AppType.AGENT:
enriched = {**cfg}
if "default_model_config_id" in cfg:
enriched["default_model_config_ref"] = self._model_ref(cfg["default_model_config_id"])
enriched["default_model_config_ref"] = self._model_ref(default_model_config_id)
if "knowledge_retrieval" in cfg:
enriched["knowledge_retrieval"] = self._enrich_knowledge_retrieval(cfg["knowledge_retrieval"])
if "tools" in cfg:
@@ -91,8 +90,7 @@ class AppDslService:
return enriched
if app_type == AppType.MULTI_AGENT:
enriched = {**cfg}
if "default_model_config_id" in cfg:
enriched["default_model_config_ref"] = self._model_ref(cfg["default_model_config_id"])
enriched["default_model_config_ref"] = self._model_ref(default_model_config_id)
if "master_agent_id" in cfg:
enriched["master_agent_ref"] = self._release_ref(cfg["master_agent_id"])
if "sub_agents" in cfg:
@@ -229,8 +227,11 @@ class AppDslService:
workspace_id: uuid.UUID,
tenant_id: uuid.UUID,
user_id: uuid.UUID,
app_id: Optional[uuid.UUID] = None,
) -> tuple[App, list[str]]:
"""解析 DSL创建应用配置,返回 (new_app, warnings)"""
"""解析 DSL创建或覆盖应用配置,返回 (app, warnings)
app_id 不为空时:校验类型一致后覆盖配置;为空时创建新应用。
"""
app_meta = dsl.get("app", {})
app_type = app_meta.get("type")
if app_type not in (AppType.AGENT, AppType.MULTI_AGENT, AppType.WORKFLOW):
@@ -239,6 +240,9 @@ class AppDslService:
warnings: list[str] = []
now = datetime.datetime.now()
if app_id is not None:
return self._overwrite_dsl(dsl, app_id, app_type, workspace_id, tenant_id, warnings, now)
new_app = App(
id=uuid.uuid4(),
workspace_id=workspace_id,
@@ -258,11 +262,57 @@ class AppDslService:
self.db.add(new_app)
self.db.flush()
self._write_config(new_app.id, app_type, dsl, workspace_id, tenant_id, warnings, now, create=True)
self.db.commit()
self.db.refresh(new_app)
return new_app, warnings
def _overwrite_dsl(
self,
dsl: dict,
app_id: uuid.UUID,
app_type: str,
workspace_id: uuid.UUID,
tenant_id: uuid.UUID,
warnings: list,
now: datetime.datetime,
) -> tuple[App, list[str]]:
"""覆盖已有应用的配置,类型不一致时抛出异常"""
app = self.db.query(App).filter(
App.id == app_id,
App.workspace_id == workspace_id,
App.is_active.is_(True)
).first()
if not app:
raise ResourceNotFoundException("应用", str(app_id))
if app.type != app_type:
raise BusinessException(
f"YAML 类型 '{app_type}' 与应用类型 '{app.type}' 不一致,无法导入",
BizCode.BAD_REQUEST
)
self._write_config(app_id, app_type, dsl, workspace_id, tenant_id, warnings, now, create=False)
self.db.commit()
self.db.refresh(app)
return app, warnings
def _write_config(
self,
app_id: uuid.UUID,
app_type: str,
dsl: dict,
workspace_id: uuid.UUID,
tenant_id: uuid.UUID,
warnings: list,
now: datetime.datetime,
create: bool,
) -> None:
"""写入(新建或覆盖)应用配置"""
if app_type == AppType.AGENT:
cfg = dsl.get("agent_config") or {}
self.db.add(AgentConfig(
id=uuid.uuid4(),
app_id=new_app.id,
fields = dict(
system_prompt=cfg.get("system_prompt"),
model_parameters=cfg.get("model_parameters"),
default_model_config_id=self._resolve_model(cfg.get("default_model_config_ref"), tenant_id, warnings),
@@ -272,16 +322,21 @@ class AppDslService:
tools=self._resolve_tools(cfg.get("tools", []), tenant_id, warnings),
skills=self._resolve_skills(cfg.get("skills", {}), tenant_id, warnings),
features=cfg.get("features", {}),
is_active=True,
created_at=now,
updated_at=now,
))
)
if create:
self.db.add(AgentConfig(id=uuid.uuid4(), app_id=app_id, is_active=True, created_at=now, **fields))
else:
existing = self.db.query(AgentConfig).filter(AgentConfig.app_id == app_id).first()
if existing:
for k, v in fields.items():
setattr(existing, k, v)
else:
self.db.add(AgentConfig(id=uuid.uuid4(), app_id=app_id, is_active=True, created_at=now, **fields))
elif app_type == AppType.MULTI_AGENT:
cfg = dsl.get("multi_agent_config") or {}
self.db.add(MultiAgentConfig(
id=uuid.uuid4(),
app_id=new_app.id,
fields = dict(
orchestration_mode=cfg.get("orchestration_mode", "collaboration"),
master_agent_name=cfg.get("master_agent_name"),
model_parameters=cfg.get("model_parameters"),
@@ -291,10 +346,17 @@ class AppDslService:
routing_rules=self._resolve_routing_rules(cfg.get("routing_rules"), warnings),
execution_config=cfg.get("execution_config", {}),
aggregation_strategy=cfg.get("aggregation_strategy", "merge"),
is_active=True,
created_at=now,
updated_at=now,
))
)
if create:
self.db.add(MultiAgentConfig(id=uuid.uuid4(), app_id=app_id, is_active=True, created_at=now, **fields))
else:
existing = self.db.query(MultiAgentConfig).filter(MultiAgentConfig.app_id == app_id).first()
if existing:
for k, v in fields.items():
setattr(existing, k, v)
else:
self.db.add(MultiAgentConfig(id=uuid.uuid4(), app_id=app_id, is_active=True, created_at=now, **fields))
elif app_type == AppType.WORKFLOW:
adapter = MemoryBearAdapter(dsl)
@@ -306,20 +368,39 @@ class AppDslService:
for w in result.warnings:
warnings.append(f"[节点警告] {w.node_name or w.node_id}: {w.detail}")
wf = dsl.get("workflow") or {}
WorkflowService(self.db).create_workflow_config(
app_id=new_app.id,
nodes=[n.model_dump() for n in result.nodes],
edges=[e.model_dump() for e in result.edges],
variables=[v.model_dump() for v in result.variables],
execution_config=wf.get("execution_config", {}),
features=wf.get("features", {}),
triggers=wf.get("triggers", []),
validate=False,
)
self.db.commit()
self.db.refresh(new_app)
return new_app, warnings
wf_service = WorkflowService(self.db)
if create:
wf_service.create_workflow_config(
app_id=app_id,
nodes=[n.model_dump() for n in result.nodes],
edges=[e.model_dump() for e in result.edges],
variables=[v.model_dump() for v in result.variables],
execution_config=wf.get("execution_config", {}),
features=wf.get("features", {}),
triggers=wf.get("triggers", []),
validate=False,
)
else:
existing = self.db.query(WorkflowConfig).filter(WorkflowConfig.app_id == app_id).first()
if existing:
existing.nodes = [n.model_dump() for n in result.nodes]
existing.edges = [e.model_dump() for e in result.edges]
existing.variables = [v.model_dump() for v in result.variables]
existing.execution_config = wf.get("execution_config", {})
existing.features = wf.get("features", {})
existing.triggers = wf.get("triggers", [])
existing.updated_at = now
else:
wf_service.create_workflow_config(
app_id=app_id,
nodes=[n.model_dump() for n in result.nodes],
edges=[e.model_dump() for e in result.edges],
variables=[v.model_dump() for v in result.variables],
execution_config=wf.get("execution_config", {}),
features=wf.get("features", {}),
triggers=wf.get("triggers", []),
validate=False,
)
def _unique_app_name(self, name: str, workspace_id: uuid.UUID, app_type: AppType) -> str:
"""生成唯一应用名称,同时检查本空间自有应用和共享到本空间的应用"""

View File

@@ -13,7 +13,7 @@ import uuid
from typing import Annotated, Any, Dict, List, Optional, Tuple
from fastapi import Depends
from sqlalchemy import and_, delete, func, or_, select
from sqlalchemy import and_, delete, func, or_, select, update as sa_update
from sqlalchemy.orm import Session
from app.core.error_codes import BizCode
@@ -401,7 +401,7 @@ class AppService:
def _create_workflow_config(
self,
app_id: uuid.UUID,
data: app_schema.WorkflowConfigCreate,
data,
now: datetime.datetime
):
workflow_cfg = WorkflowConfig(
@@ -411,6 +411,7 @@ class AppService:
edges=[edge.model_dump() for edge in data.edges] if data.edges else [],
variables=[var.model_dump() for var in data.variables] if data.variables else [],
execution_config=data.execution_config.model_dump() if data.execution_config else {},
features=data.features if data.features else {},
triggers=[trigger.model_dump() for trigger in data.triggers] if data.triggers else [],
is_active=True,
created_at=now,
@@ -619,6 +620,28 @@ class AppService:
self._validate_app_accessible(app, workspace_id)
return app
def get_release_by_id(self, app_id: uuid.UUID, release_id: uuid.UUID) -> AppRelease:
"""按发布版本ID获取发布快照
Args:
app_id: 应用ID
release_id: 发布版本ID
Returns:
AppRelease: 发布快照
Raises:
BusinessException: 版本不存在或已下线
"""
from app.repositories.app_repository import get_release_by_id
release = get_release_by_id(self.db, app_id, release_id)
if not release:
raise BusinessException(
f"版本 {release_id} 不存在或已下线",
BizCode.RELEASE_NOT_FOUND,
)
return release
def create_app(
self,
*,
@@ -678,7 +701,9 @@ class AppService:
self._create_multi_agent_config(app.id, data.multi_agent_config, now)
if app.type == "workflow" and data.workflow_config:
self._create_workflow_config(app.id, data.workflow_config, now)
from app.schemas.workflow_schema import WorkflowConfigCreate
wf_data = WorkflowConfigCreate(**data.workflow_config) if isinstance(data.workflow_config, dict) else data.workflow_config
self._create_workflow_config(app.id, wf_data, now)
self.db.commit()
self.db.refresh(app)
@@ -757,6 +782,17 @@ class AppService:
# 逻辑删除应用
app.is_active = False
# 更新 app_shares 表中该应用的所有共享记录为失效状态,并更新 updated_at 时间
stmt = sa_update(AppShare).where(
AppShare.source_app_id == app_id,
AppShare.is_active.is_(True)
).values(
is_active=False,
updated_at=datetime.datetime.now()
)
self.db.execute(stmt)
self.db.commit()
logger.info(
@@ -1347,6 +1383,7 @@ class AppService:
variables=cfg.get("variables", []),
execution_config=cfg.get("execution_config", {}),
triggers=cfg.get("triggers", []),
features=cfg.get("features", {}),
is_active=True,
created_at=now,
updated_at=now,

View File

@@ -534,6 +534,7 @@ class ConversationService:
api_key = api_config.api_key
api_base = api_config.api_base
is_omni = api_config.is_omni
capability = api_config.capability
model_type = config.type
llm = RedBearLLM(
@@ -542,7 +543,8 @@ class ConversationService:
provider=provider,
api_key=api_key,
base_url=api_base,
is_omni=is_omni
is_omni=is_omni,
capability=capability,
),
type=ModelType(model_type)
)

View File

@@ -458,7 +458,7 @@ class AgentRunService:
statement = opening["statement"]
suggested_questions = opening["suggested_questions"]
# 如果有变量,进行替换(仅支持 {{var_name}} 格式)
if variables:
for var_name, var_value in variables.items():
@@ -595,6 +595,10 @@ class AgentRunService:
max_tokens=effective_params.get("max_tokens", 2000),
system_prompt=system_prompt,
tools=tools,
deep_thinking=effective_params.get("deep_thinking", False),
thinking_budget_tokens=effective_params.get("thinking_budget_tokens"),
json_output=effective_params.get("json_output", False),
capability=api_key_config.get("capability", []),
)
# 5. 处理会话ID创建或验证新会话时写入开场白
@@ -637,7 +641,14 @@ class AgentRunService:
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件provider={provider}")
# 为需要运行时上下文的工具注入上下文
for t in tools:
if hasattr(t, 'tool_instance') and hasattr(t.tool_instance, 'set_runtime_context'):
t.tool_instance.set_runtime_context(
user_id=user_id or "anonymous",
conversation_id=str(conversation_id) if conversation_id else None,
uploaded_files=processed_files or []
)
# 7. 知识库检索
context = None
@@ -689,7 +700,8 @@ class AgentRunService:
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
})
}),
"reasoning_content": result.get("reasoning_content")
},
files=files,
processed_files=processed_files,
@@ -701,6 +713,7 @@ class AgentRunService:
response = {
"message": result["content"],
"reasoning_content": result.get("reasoning_content"),
"conversation_id": conversation_id,
"usage": result.get("usage", {
"prompt_tokens": 0,
@@ -838,7 +851,11 @@ class AgentRunService:
max_tokens=effective_params.get("max_tokens", 2000),
system_prompt=system_prompt,
tools=tools,
streaming=True
streaming=True,
deep_thinking=effective_params.get("deep_thinking", False),
thinking_budget_tokens=effective_params.get("thinking_budget_tokens"),
json_output=effective_params.get("json_output", False),
capability=api_key_config.get("capability", []),
)
# 5. 处理会话ID创建或验证新会话时写入开场白
@@ -882,7 +899,14 @@ class AgentRunService:
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件provider={provider}")
# 为需要运行时上下文的工具注入上下文
for t in tools:
if hasattr(t, 'tool_instance') and hasattr(t.tool_instance, 'set_runtime_context'):
t.tool_instance.set_runtime_context(
user_id=user_id or "anonymous",
conversation_id=str(conversation_id) if conversation_id else None,
uploaded_files=processed_files or []
)
# 7. 知识库检索
context = None
@@ -898,6 +922,7 @@ class AgentRunService:
# 9. 流式调用 Agent支持多模态同时并行启动 TTS
full_content = ""
full_reasoning = ""
total_tokens = 0
# 启动流式 TTS文本边输出边合成
@@ -916,6 +941,9 @@ class AgentRunService:
):
if isinstance(chunk, int):
total_tokens = chunk
elif isinstance(chunk, dict) and chunk.get("type") == "reasoning":
full_reasoning += chunk["content"]
yield self._format_sse_event("reasoning", {"content": chunk["content"]})
else:
full_content += chunk
yield self._format_sse_event("message", {"content": chunk})
@@ -944,7 +972,8 @@ class AgentRunService:
app_id=agent_config.app_id,
user_id=user_id,
meta_data={
"usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": total_tokens}
"usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": total_tokens},
"reasoning_content": full_reasoning or None
},
files=files,
processed_files=processed_files,
@@ -1272,10 +1301,30 @@ class AgentRunService:
"history_files": {}
}
if files:
from app.models.file_metadata_model import FileMetadata
local_ids = [f.upload_file_id for f in files
if f.transfer_method.value == "local_file" and f.upload_file_id
and (not f.name or not f.size)]
meta_map = {}
if local_ids:
rows = self.db.query(FileMetadata).filter(
FileMetadata.id.in_(local_ids),
FileMetadata.status == "completed"
).all()
meta_map = {str(r.id): r for r in rows}
for f in files:
name, size = f.name, f.size
if f.transfer_method.value == "local_file" and f.upload_file_id and (not name or not size):
meta = meta_map.get(str(f.upload_file_id))
if meta:
name = name or meta.file_name
size = size or meta.file_size
human_meta["files"].append({
"type": f.type,
"url": f.url
"url": f.url,
"file_type": f.file_type,
"name": name,
"size": size
})
# 保存 history_files包含 provider 和 is_omni 信息
@@ -1665,7 +1714,7 @@ class AgentRunService:
"""从 text_queue 取文本按句子切分后喂给 synthesizer"""
import re
buf = ""
sentence_end = re.compile(r'[\u3002\uff01\uff1f\.!?\n]')
sentence_end = re.compile(r'[\u3002\uff01\uff1f.!?\n]')
while True:
chunk = await text_queue.get()
if chunk is None:
@@ -1894,6 +1943,7 @@ class AgentRunService:
"conversation_id": result['conversation_id'],
"parameters_used": model_info["parameters"],
"message": result.get("message"),
"reasoning_content": result.get("reasoning_content"),
"usage": usage,
"elapsed_time": elapsed,
"tokens_per_second": (
@@ -2012,7 +2062,7 @@ class AgentRunService:
# 需要从 ModelApiKey 获取实际的模型名称,或者在 ModelConfig 中添加 model 字段
return None
def _with_parameters(self, agent_config: AgentConfig, parameters: Dict[str, Any]) -> AgentConfig:
def _with_parameters(self, agent_config: AgentConfig, parameters: Dict[str, Any]) -> tuple[AgentConfig, Any]:
"""创建一个带有覆盖参数的 agent_config浅拷贝只修改 model_parameters
Args:
@@ -2110,6 +2160,7 @@ class AgentRunService:
start_time = time.time()
full_content = ""
full_reasoning = ""
returned_conversation_id = model_conversation_id
audio_url = None
audio_status = None
@@ -2168,6 +2219,18 @@ class AgentRunService:
"content": chunk
}))
# 转发深度思考事件(带模型标识)
if event_type == "reasoning" and event_data:
reasoning_chunk = event_data.get("content", "")
full_reasoning += reasoning_chunk
await event_queue.put(self._format_sse_event("model_reasoning", {
"model_index": idx,
"model_config_id": model_config_id,
"label": model_label,
"conversation_id": returned_conversation_id,
"content": event_data.get("content", "")
}))
# 从 end 事件中提取 features 输出字段
if event_type == "end" and event_data:
audio_url = event_data.get("audio_url")
@@ -2199,6 +2262,7 @@ class AgentRunService:
"conversation_id": returned_conversation_id,
"parameters_used": model_info["parameters"],
"message": full_content,
"reasoning_content": full_reasoning or None,
"elapsed_time": elapsed,
"audio_url": audio_url,
"audio_status": audio_status,
@@ -2351,6 +2415,7 @@ class AgentRunService:
"label": r["label"],
"conversation_id": r.get("conversation_id"),
"message": r.get("message"),
"reasoning_content": r.get("reasoning_content"),
"elapsed_time": r.get("elapsed_time", 0),
"audio_url": r.get("audio_url"),
"audio_status": r.get("audio_status"),

View File

@@ -679,9 +679,9 @@ class EmotionAnalyticsService:
# 查询用户的实体和标签
query = """
MATCH (e:Entity)
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id
RETURN e.name as name, e.type as type
RETURN e.name as name, e.entity_type as type
ORDER BY e.created_at DESC
LIMIT 20
"""

View File

@@ -34,6 +34,7 @@ from app.schemas.implicit_memory_schema import (
UserMemorySummary,
)
from app.schemas.memory_config_schema import MemoryConfig
from app.services.memory_base_service import MIN_MEMORY_SUMMARY_COUNT
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
@@ -379,12 +380,59 @@ class ImplicitMemoryService:
raise
def _build_empty_profile(self) -> dict:
"""构建 MemorySummary 不足时返回的固定空白画像数据"""
now_ms = int(datetime.utcnow().timestamp() * 1000)
insufficient = "Insufficient data for analysis"
def _empty_dimension(name: str) -> dict:
return {
"evidence": [insufficient],
"reasoning": f"No clear evidence found for {name} dimension",
"percentage": 0.0,
"dimension_name": name,
"confidence_level": 20,
}
def _empty_category(name: str) -> dict:
return {
"evidence": [insufficient],
"percentage": 25.0,
"category_name": name,
"trending_direction": None,
}
return {
"habits": [],
"portrait": {
"aesthetic": _empty_dimension("aesthetic"),
"creativity": _empty_dimension("creativity"),
"literature": _empty_dimension("literature"),
"technology": _empty_dimension("technology"),
"historical_trends": None,
"analysis_timestamp": now_ms,
"total_summaries_analyzed": 0,
},
"preferences": [],
"interest_areas": {
"art": _empty_category("art"),
"tech": _empty_category("tech"),
"music": _empty_category("music"),
"lifestyle": _empty_category("lifestyle"),
"analysis_timestamp": now_ms,
"total_summaries_analyzed": 0,
},
}
async def generate_complete_profile(
self,
user_id: str
) -> dict:
"""生成完整的用户画像包含所有4个模块
需要该用户的 MemorySummary 节点数量 >= 5 才会真正调用 LLM 生成画像,
否则返回固定的空白画像数据。
Args:
user_id: 用户ID
@@ -394,6 +442,16 @@ class ImplicitMemoryService:
logger.info(f"生成完整用户画像: user={user_id}")
try:
# 前置检查:查询该用户有效的 MemorySummary 节点数量(排除孤立节点)
from app.services.memory_base_service import MemoryBaseService
base_service = MemoryBaseService()
memory_summary_count = await base_service.get_valid_memory_summary_count(user_id)
logger.info(f"用户 MemorySummary 节点数量: {memory_summary_count} (user={user_id})")
if memory_summary_count < MIN_MEMORY_SUMMARY_COUNT:
logger.info(f"MemorySummary 数量不足 {MIN_MEMORY_SUMMARY_COUNT}(当前 {memory_summary_count}),返回空白画像: user={user_id}")
return self._build_empty_profile()
# 并行调用4个分析方法
preferences, portrait, interest_areas, habits = await asyncio.gather(
self.get_preference_tags(user_id=user_id),

View File

@@ -415,8 +415,11 @@ class LLMRouter:
api_key=api_key_config.api_key,
base_url=api_key_config.api_base,
is_omni=api_key_config.is_omni,
temperature=0.3,
max_tokens=500
capability=api_key_config.capability,
extra_params={
"temperature": 0.3,
"max_tokens": 500
}
)
logger.debug(f"创建 LLM 实例 - Provider: {api_key_config.provider}, Model: {api_key_config.model_name}")

View File

@@ -393,6 +393,7 @@ class MasterAgentRouter:
api_key=api_key_config.api_key,
base_url=api_key_config.api_base,
is_omni=api_key_config.is_omni,
capability=api_key_config.capability,
extra_params = extra_params
)
@@ -403,6 +404,17 @@ class MasterAgentRouter:
response = await llm.ainvoke(prompt)
ModelApiKeyService.record_api_key_usage(self.db, api_key_config.id)
# 提取 token 消耗
self._last_routing_tokens = 0
if hasattr(response, 'usage_metadata') and response.usage_metadata:
um = response.usage_metadata
self._last_routing_tokens = um.get("total_tokens", 0) if isinstance(um, dict) else getattr(um, "total_tokens", 0)
elif hasattr(response, 'response_metadata') and response.response_metadata:
token_usage = response.response_metadata.get("token_usage") or response.response_metadata.get("usage", {})
if isinstance(token_usage, dict):
self._last_routing_tokens = token_usage.get("total_tokens", 0)
logger.info(f"Master Agent 路由 token 消耗: {self._last_routing_tokens}")
# 提取响应内容
if hasattr(response, 'content'):
return response.content

View File

@@ -462,11 +462,6 @@ class MemoryAgentService:
logger.info(f"Read operation for group {end_user_id} with config_id {config_id}")
# 导入审计日志记录器
config_load_start = time.time()
try:
# Use a separate database session to avoid transaction failures
@@ -507,10 +502,13 @@ class MemoryAgentService:
async with make_read_graph() as graph:
config = {"configurable": {"thread_id": end_user_id}}
# 初始状态 - 包含所有必要字段
initial_state = {"messages": [HumanMessage(content=message)], "search_switch": search_switch,
"end_user_id": end_user_id
, "storage_type": storage_type, "user_rag_memory_id": user_rag_memory_id,
"memory_config": memory_config}
initial_state = {
"messages": [HumanMessage(content=message)],
"search_switch": search_switch,
"end_user_id": end_user_id
, "storage_type": storage_type,
"user_rag_memory_id": user_rag_memory_id,
"memory_config": memory_config}
# 获取节点更新信息
_intermediate_outputs = []
summary = ''
@@ -522,7 +520,7 @@ class MemoryAgentService:
for node_name, node_data in update_event.items():
# if 'save_neo4j' == node_name:
# massages = node_data
print(f"处理节点: {node_name}")
logger.info(f"处理节点: {node_name}")
# 处理不同Summary节点的返回结构
if 'Summary' in node_name:
@@ -549,6 +547,11 @@ class MemoryAgentService:
if retrieve_node and retrieve_node != [] and retrieve_node != {}:
_intermediate_outputs.extend(retrieve_node)
# Perceptual_Retrieve 节点
perceptual_node = node_data.get('perceptual_data', {}).get('_intermediate', None)
if perceptual_node and perceptual_node != [] and perceptual_node != {}:
_intermediate_outputs.append(perceptual_node)
# Verify 节点
verify_n = node_data.get('verify', {}).get('_intermediate', None)
if verify_n and verify_n != [] and verify_n != {}:

View File

@@ -265,12 +265,50 @@ async def Translation_English(modid, text, fields=None):
# 其他类型数字、布尔值、None等原样返回
else:
return text
# 隐性记忆画像生成所需的最低 MemorySummary 节点数量
MIN_MEMORY_SUMMARY_COUNT = 5
class MemoryBaseService:
"""记忆服务基类,提供共享的辅助方法"""
def __init__(self):
self.neo4j_connector = Neo4jConnector()
async def get_valid_memory_summary_count(
self,
end_user_id: str
) -> int:
"""获取用户有效的 MemorySummary 节点数量(排除孤立节点)。
只统计存在 DERIVED_FROM_STATEMENT 关系的 MemorySummary 节点。
Args:
end_user_id: 终端用户ID
Returns:
有效 MemorySummary 节点数量
"""
try:
query = """
MATCH (n:MemorySummary)-[:DERIVED_FROM_STATEMENT]->(:Statement)
WHERE n.end_user_id = $end_user_id
RETURN count(DISTINCT n) as count
"""
result = await self.neo4j_connector.execute_query(
query, end_user_id=end_user_id
)
count = result[0]["count"] if result and len(result) > 0 else 0
logger.debug(
f"有效 MemorySummary 节点数量: {count} (end_user_id={end_user_id})"
)
return count
except Exception as e:
logger.error(
f"获取有效 MemorySummary 数量失败: {str(e)}", exc_info=True
)
return 0
@staticmethod
def parse_timestamp(timestamp_value) -> Optional[int]:
"""

View File

@@ -353,15 +353,13 @@ async def get_workspace_total_memory_count(
"details": []
}
# 2. 对每个 host_id 调用 search_all 获取 total
# 2. 使用 search_all_batch 批量查询所有宿主的记忆数量
from app.services import memory_storage_service
total_count = 0
details = []
# 如果提供了 end_user_id只查询该用户
if end_user_id:
search_result = await memory_storage_service.search_all(end_user_id=end_user_id)
batch_result = await memory_storage_service.search_all_batch([end_user_id])
count = batch_result.get(end_user_id, 0)
# 查询用户名称
from app.repositories.end_user_repository import EndUserRepository
repo = EndUserRepository(db)
@@ -369,42 +367,31 @@ async def get_workspace_total_memory_count(
user_name = end_user.other_name if end_user else None
return {
"total_memory_count": search_result.get("total", 0),
"total_memory_count": count,
"host_count": 1,
"details": [{
"end_user_id": end_user_id,
"count": search_result.get("total", 0),
"count": count,
"name": user_name
}]
}
for host in hosts:
try:
end_user_id_str = str(host.id)
search_result = await memory_storage_service.search_all(
end_user_id=end_user_id_str
)
host_total = search_result.get("total", 0)
total_count += host_total
details.append({
"end_user_id": end_user_id_str,
"count": host_total,
"name": host.other_name # 使用 other_name 字段
})
business_logger.debug(f"EndUser {end_user_id_str} ({host.other_name}) 记忆数: {host_total}")
except Exception as e:
business_logger.warning(f"获取 end_user {host.id} 记忆数失败: {str(e)}")
# 失败的 host 记为 0
details.append({
"end_user_id": str(host.id),
"count": 0,
"name": host.other_name # 使用 other_name 字段
})
# 批量查询所有宿主记忆数量(一次 Neo4j 查询)
end_user_ids = [str(host.id) for host in hosts]
batch_result = await memory_storage_service.search_all_batch(end_user_ids)
# 构建 host name 映射
host_name_map = {str(host.id): host.other_name for host in hosts}
total_count = sum(batch_result.values())
details = [
{
"end_user_id": uid,
"count": batch_result.get(uid, 0),
"name": host_name_map.get(uid)
}
for uid in end_user_ids
]
result = {
"total_memory_count": total_count,
@@ -519,6 +506,180 @@ def get_rag_user_kb_total_chunk(
business_logger.error(f"获取用户知识库总chunk数失败: workspace_id={workspace_id} - {str(e)}")
raise
def get_dashboard_yesterday_changes(
db: Session,
workspace_id: uuid.UUID,
storage_type: str,
today_data: dict
) -> dict:
"""
计算各指标相比昨天的变化百分比。
- total_app_change / total_knowledge_change只看活跃记录
百分比 = (截止今日活跃总量 - 截止昨日活跃总量) / 截止昨日活跃总量
- total_memory_change / total_api_call_change
百分比 = (今日总量 - 昨日总量) / 昨日总量
昨日总量为 0 时返回 None。返回值为浮点数例如 0.5 表示增长 50%
Args:
db: 数据库会话
workspace_id: 工作空间ID
storage_type: 存储类型 'neo4j' | 'rag'
today_data: 当前数据,包含 total_memory, total_app, total_knowledge, total_api_call
Returns:
{
"total_memory_change": float | None,
"total_app_change": float | None,
"total_knowledge_change": float | None,
"total_api_call_change": float | None
}
"""
from datetime import datetime
from sqlalchemy import func
from app.models.api_key_model import ApiKey, ApiKeyLog
from app.models.knowledge_model import Knowledge
from app.models.app_model import App
from app.models.appshare_model import AppShare
business_logger.info(f"计算昨日对比百分比: workspace_id={workspace_id}, storage_type={storage_type}")
now_local = datetime.now()
today_start = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
changes = {
"total_memory_change": None,
"total_app_change": None,
"total_knowledge_change": None,
"total_api_call_change": None,
}
def _calc_percentage(today_val, yesterday_val):
"""计算百分比昨日为0时返回None"""
if yesterday_val is None or yesterday_val == 0:
return None
return round((today_val - yesterday_val) / yesterday_val, 4)
# --- total_api_call_change: (截止今日累计总数 - 截止昨日累计总数) / 截止昨日累计总数 ---
try:
api_key_ids = [
row[0] for row in db.query(ApiKey.id).filter(
ApiKey.workspace_id == workspace_id
).all()
]
if api_key_ids:
# 截止今日的累计调用总数
total_api_until_now = db.query(func.count(ApiKeyLog.id)).filter(
ApiKeyLog.api_key_id.in_(api_key_ids),
ApiKeyLog.created_at < now_local
).scalar() or 0
# 截止昨日的累计调用总数today_start 即昨日结束)
total_api_until_yesterday = db.query(func.count(ApiKeyLog.id)).filter(
ApiKeyLog.api_key_id.in_(api_key_ids),
ApiKeyLog.created_at < today_start
).scalar() or 0
changes["total_api_call_change"] = _calc_percentage(total_api_until_now, total_api_until_yesterday)
else:
changes["total_api_call_change"] = None
except Exception as e:
business_logger.warning(f"计算API调用昨日对比失败: {str(e)}")
# --- total_knowledge_change: 只看活跃(status=1)且为顶层知识库(parent_id=workspace_id),百分比 = (今日活跃总量 - 昨日活跃总量) / 昨日活跃总量 ---
try:
# 截止今日的活跃知识库总量(当前 status=1parent_id=workspace_id
today_knowledge = db.query(func.count(Knowledge.id)).filter(
Knowledge.workspace_id == workspace_id,
Knowledge.status == 1,
Knowledge.parent_id == Knowledge.workspace_id
).scalar() or 0
# 截止昨日的活跃知识库总量(昨日之前创建的、当前仍 status=1parent_id=workspace_id
yesterday_knowledge = db.query(func.count(Knowledge.id)).filter(
Knowledge.workspace_id == workspace_id,
Knowledge.status == 1,
Knowledge.parent_id == Knowledge.workspace_id,
Knowledge.created_at < today_start
).scalar() or 0
changes["total_knowledge_change"] = _calc_percentage(today_knowledge, yesterday_knowledge)
except Exception as e:
business_logger.warning(f"计算知识库昨日对比失败: {str(e)}")
# --- total_app_change: 只看活跃(is_active=True),百分比 = (今日活跃总量 - 昨日活跃总量) / 昨日活跃总量 ---
try:
# === 自有app ===
today_own_apps = db.query(func.count(App.id)).filter(
App.workspace_id == workspace_id,
App.is_active == True
).scalar() or 0
yesterday_own_apps = db.query(func.count(App.id)).filter(
App.workspace_id == workspace_id,
App.is_active == True,
App.created_at < today_start
).scalar() or 0
# === 被分享app ===
today_shared_apps = db.query(func.count(AppShare.id)).filter(
AppShare.target_workspace_id == workspace_id,
AppShare.is_active == True
).scalar() or 0
yesterday_shared_apps = db.query(func.count(AppShare.id)).filter(
AppShare.target_workspace_id == workspace_id,
AppShare.is_active == True,
AppShare.created_at < today_start
).scalar() or 0
today_total_app = today_own_apps + today_shared_apps
yesterday_total_app = yesterday_own_apps + yesterday_shared_apps
changes["total_app_change"] = _calc_percentage(today_total_app, yesterday_total_app)
except Exception as e:
business_logger.warning(f"计算应用数量昨日对比失败: {str(e)}")
# --- total_memory_change: (今日总量 - 昨日总量) / 昨日总量 ---
try:
today_memory = today_data.get("total_memory")
if today_memory is None:
changes["total_memory_change"] = None
elif storage_type == "neo4j":
last_record = db.query(MemoryIncrement).filter(
MemoryIncrement.workspace_id == workspace_id,
MemoryIncrement.created_at < today_start
).order_by(desc(MemoryIncrement.created_at)).first()
if last_record is None or last_record.total_num == 0:
changes["total_memory_change"] = None
else:
changes["total_memory_change"] = _calc_percentage(today_memory, last_record.total_num)
elif storage_type == "rag":
from app.models.document_model import Document
from app.models.end_user_model import EndUser as _EndUser
from app.models.app_model import App as _App
end_user_ids = [
str(eid) for (eid,) in db.query(_EndUser.id)
.join(_App, _EndUser.app_id == _App.id)
.filter(_App.workspace_id == workspace_id)
.all()
]
if not end_user_ids:
changes["total_memory_change"] = None
else:
file_names = [f"{uid}.txt" for uid in end_user_ids]
yesterday_chunk = int(db.query(func.sum(Document.chunk_num)).filter(
Document.file_name.in_(file_names),
Document.created_at < today_start
).scalar() or 0)
if yesterday_chunk == 0:
changes["total_memory_change"] = None
else:
changes["total_memory_change"] = _calc_percentage(today_memory, yesterday_chunk)
except Exception as e:
business_logger.warning(f"计算记忆总量昨日对比失败: {str(e)}")
business_logger.info(f"昨日对比百分比计算完成: {changes}")
return changes
def get_current_user_total_chunk(
end_user_id: str,
db: Session,
@@ -642,7 +803,6 @@ def get_rag_content(
"page": {
"page": page,
"pagesize": pagesize,
"total": 0,
"hasnext": False,
},
"items": []
@@ -736,13 +896,12 @@ def get_rag_content(
"page": {
"page": page,
"pagesize": pagesize,
"total": global_total,
"hasnext": offset_end < global_total,
},
"items": conversations
}
business_logger.info(f"成功获取RAG内容: total={global_total}, page={page}, 返回={len(conversations)} 条对话")
business_logger.info(f"成功获取RAG内容: page={page}, 返回={len(conversations)} 条对话")
return result
except Exception as e:
@@ -881,4 +1040,60 @@ async def generate_rag_profile(
"tags_count": len(tags),
"personas_count": len(personas),
"insight_generated": bool(insight_sections.get("memory_insight")),
}
}
def get_dashboard_common_stats(db: Session, workspace_id) -> dict:
"""
获取 dashboard 中 neo4j/rag 分支共享的统计数据:
total_app、total_knowledge、total_api_call
Returns:
dict: {"total_app": int, "total_knowledge": int, "total_api_call": int}
"""
result = {"total_app": 0, "total_knowledge": 0, "total_api_call": 0}
# total_app: 统计当前空间下的所有app数量包含自有 + 被分享给本工作空间的app
try:
from app.services import app_service as _app_svc
_, total_app = _app_svc.AppService(db).list_apps(
workspace_id=workspace_id, include_shared=True, pagesize=1
)
result["total_app"] = total_app
except Exception as e:
business_logger.warning(f"获取应用数量失败: {e}")
# total_knowledge: 统计顶层知识库parent_id = workspace_id
try:
from sqlalchemy import func as _func
from app.models.knowledge_model import Knowledge as _Knowledge
total_knowledge = db.query(_func.count(_Knowledge.id)).filter(
_Knowledge.workspace_id == workspace_id,
_Knowledge.status == 1,
_Knowledge.parent_id == _Knowledge.workspace_id
).scalar() or 0
result["total_knowledge"] = total_knowledge
except Exception as e:
business_logger.warning(f"获取知识库数量失败: {e}")
# total_api_call: 截止当前的历史累计调用总数
try:
from sqlalchemy import func as _api_func
from app.models.api_key_model import ApiKey as _ApiKey, ApiKeyLog as _ApiKeyLog
_api_key_ids = [
row[0] for row in db.query(_ApiKey.id).filter(
_ApiKey.workspace_id == workspace_id
).all()
]
if _api_key_ids:
total_api_calls = db.query(_api_func.count(_ApiKeyLog.id)).filter(
_ApiKeyLog.api_key_id.in_(_api_key_ids)
).scalar() or 0
else:
total_api_calls = 0
result["total_api_call"] = total_api_calls
except Exception as e:
business_logger.warning(f"获取API调用统计失败: {e}")
return result

View File

@@ -232,7 +232,8 @@ class MemoryPerceptualService:
provider=model_config.provider,
api_key=model_config.api_key,
base_url=model_config.api_base,
is_omni=model_config.is_omni
is_omni=model_config.is_omni,
capability=model_config.capability,
)
)
return llm, model_config

View File

@@ -613,37 +613,6 @@ async def search_entity(end_user_id: Optional[str] = None) -> Dict[str, Any]:
return data
async def search_all(end_user_id: Optional[str] = None) -> Dict[str, Any]:
result = await _neo4j_connector.execute_query(
MemoryConfigRepository.SEARCH_FOR_ALL,
end_user_id=end_user_id,
)
# 检查结果是否为空或长度不足
if not result or len(result) < 4:
data = {
"total": 0,
"counts": {
"dialogue": 0,
"chunk": 0,
"statement": 0,
"entity": 0,
},
}
return data
data = {
"total": result[-1]["Count"],
"counts": {
"dialogue": result[0]["Count"],
"chunk": result[1]["Count"],
"statement": result[2]["Count"],
"entity": result[3]["Count"],
},
}
return data
async def kb_type_distribution(end_user_id: Optional[str] = None) -> Dict[str, Any]:
"""统一知识库类型分布接口。

View File

@@ -45,12 +45,21 @@ class ModelParameterMerger:
"frequency_penalty": 0.0,
"presence_penalty": 0.0,
"n": 1,
"stop": None
"stop": None,
"deep_thinking": False,
"thinking_budget_tokens": None,
"json_output": False
}
# 合并参数:默认值 -> 模型配置 -> Agent 配置
merged = default_params.copy()
# Pydantic 对象转为 dict
if model_config_params and hasattr(model_config_params, 'model_dump'):
model_config_params = model_config_params.model_dump()
if agent_config_params and hasattr(agent_config_params, 'model_dump'):
agent_config_params = agent_config_params.model_dump()
# 应用模型配置参数
if model_config_params:
for key in default_params:

View File

@@ -85,15 +85,16 @@ class ModelConfigService:
@staticmethod
async def validate_model_config(
db: Session,
*,
model_name: str,
provider: str,
api_key: str,
api_base: Optional[str] = None,
model_type: str = "llm",
test_message: str = "Hello",
is_omni: bool = False
db: Session,
*,
model_name: str,
provider: str,
api_key: str,
api_base: Optional[str] = None,
model_type: str = "llm",
test_message: str = "Hello",
is_omni: bool = False,
capability: Optional[list] = None
) -> Dict[str, Any]:
"""验证模型配置是否有效
@@ -124,8 +125,11 @@ class ModelConfigService:
api_key=api_key,
base_url=api_base,
is_omni=is_omni,
temperature=0.7,
max_tokens=100
capability=capability,
extra_params={
"temperature": 0.7,
"max_tokens": 100
}
)
# 根据模型类型选择不同的验证方式
@@ -320,7 +324,8 @@ class ModelConfigService:
api_base=api_key_data.api_base,
model_type=model_data.type,
test_message="Hello",
is_omni=model_data.is_omni
is_omni=model_data.is_omni,
capability=model_data.capability
)
if not validation_result["valid"]:
raise BusinessException(
@@ -590,7 +595,8 @@ class ModelApiKeyService:
api_base=data.api_base,
model_type=model_config.type,
test_message="Hello",
is_omni=data.is_omni
is_omni=data.is_omni,
capability=model_config.capability
)
if not validation_result["valid"]:
# 记录验证失败的模型,但不抛出异常
@@ -675,7 +681,8 @@ class ModelApiKeyService:
api_base=api_key_data.api_base,
model_type=model_config.type,
test_message="Hello",
is_omni=api_key_data.is_omni
is_omni=api_key_data.is_omni,
capability=model_config.capability
)
if not validation_result["valid"]:
raise BusinessException(
@@ -707,7 +714,8 @@ class ModelApiKeyService:
api_base=api_key_data.api_base or existing_api_key.api_base,
model_type=model_config.type,
test_message="Hello",
is_omni=model_config.is_omni
is_omni=model_config.is_omni,
capability=model_config.capability
)
if not validation_result["valid"]:
raise BusinessException(
@@ -723,10 +731,21 @@ class ModelApiKeyService:
@staticmethod
def delete_api_key(db: Session, api_key_id: uuid.UUID) -> bool:
"""删除API Key"""
if not ModelApiKeyRepository.get_by_id(db, api_key_id):
api_key = ModelApiKeyRepository.get_by_id(db, api_key_id)
if not api_key:
raise BusinessException("API Key不存在", BizCode.NOT_FOUND)
model_config_ids = [mc.id for mc in api_key.model_configs]
success = ModelApiKeyRepository.delete(db, api_key_id)
for model_config_id in model_config_ids:
model_config = ModelConfigRepository.get_by_id(db, model_config_id)
if model_config:
has_active_key = any(key.is_active for key in model_config.api_keys)
if not has_active_key and model_config.is_active:
model_config.is_active = False
db.commit()
return success

View File

@@ -287,6 +287,11 @@ class MultiAgentOrchestrator:
sub_conversation_id = None
total_tokens = 0
# 累加 Master Agent 路由决策消耗的 token
total_tokens += task_analysis.get("routing_tokens", 0)
# 累加 Master Agent 整合消耗的 token
total_tokens += getattr(self, '_last_merge_tokens', 0)
if isinstance(results, dict):
sub_conversation_id = results.get("conversation_id") or results.get("result", {}).get("conversation_id")
# 提取 token 信息
@@ -358,12 +363,16 @@ class MultiAgentOrchestrator:
variables=variables
)
# 获取路由决策消耗的 token
routing_tokens = getattr(self.router, '_last_routing_tokens', 0)
logger.info(
"Master Agent 分析完成",
extra={
"selected_agent": routing_decision.get("selected_agent_id"),
"confidence": routing_decision.get("confidence"),
"strategy": routing_decision.get("strategy")
"strategy": routing_decision.get("strategy"),
"routing_tokens": routing_tokens
}
)
@@ -372,7 +381,8 @@ class MultiAgentOrchestrator:
"variables": variables or {},
"sub_agents": self.config.sub_agents,
"initial_context": variables or {},
"routing_decision": routing_decision
"routing_decision": routing_decision,
"routing_tokens": routing_tokens
}
async def _execute_sequential(
@@ -1032,6 +1042,11 @@ class MultiAgentOrchestrator:
# 5. 流式执行子 Agent
sub_conversation_id = None
# Master Agent 路由决策消耗的 token通过 sub_usage 事件发送给上层
routing_tokens = task_analysis.get("routing_tokens", 0)
if routing_tokens > 0:
yield self._format_sse_event("sub_usage", {"total_tokens": routing_tokens})
async for event in self._execute_sub_agent_stream(
agent_data["config"],
message,
@@ -1054,6 +1069,7 @@ class MultiAgentOrchestrator:
except:
pass
# 直接透传所有事件(包括 sub_usage累加统一由上层处理
yield event
# 6. 如果有会话 ID发送一个包含它的事件
@@ -2600,8 +2616,11 @@ class MultiAgentOrchestrator:
api_key=api_key_config.api_key,
base_url=api_key_config.api_base,
is_omni=api_key_config.is_omni,
temperature=0.7, # 整合任务使用中等温度
max_tokens=2000
capability=api_key_config.capability,
extra_params={
"temperature": 0.7, # 整合任务使用中等温度
"max_tokens": 2000
}
)
# 创建 LLM 实例
@@ -2612,6 +2631,17 @@ class MultiAgentOrchestrator:
ModelApiKeyService.record_api_key_usage(self.db, api_key_config.id)
# 提取整合消耗的 token
merge_tokens = 0
if hasattr(response, 'usage_metadata') and response.usage_metadata:
um = response.usage_metadata
merge_tokens = um.get("total_tokens", 0) if isinstance(um, dict) else getattr(um, "total_tokens", 0)
elif hasattr(response, 'response_metadata') and response.response_metadata:
token_usage = response.response_metadata.get("token_usage") or response.response_metadata.get("usage", {})
if isinstance(token_usage, dict):
merge_tokens = token_usage.get("total_tokens", 0)
self._last_merge_tokens = merge_tokens
# 提取响应内容
if hasattr(response, 'content'):
merged_response = response.content
@@ -2621,7 +2651,8 @@ class MultiAgentOrchestrator:
logger.info(
"Master Agent 整合完成",
extra={
"merged_length": len(merged_response)
"merged_length": len(merged_response),
"merge_tokens": merge_tokens
}
)
@@ -2766,9 +2797,12 @@ class MultiAgentOrchestrator:
api_key=api_key_config.api_key,
base_url=api_key_config.api_base,
is_omni=api_key_config.is_omni,
temperature=0.7,
max_tokens=2000,
extra_params={"streaming": True} # 启用流式输出
capability=api_key_config.capability,
extra_params={
"temperature": 0.7,
"max_tokens": 2000,
"streaming": True # 启用流式输出
}
)
# 创建 LLM 实例

View File

@@ -185,7 +185,8 @@ class PromptOptimizerService:
provider=api_config.provider,
api_key=api_config.api_key,
base_url=api_config.api_base,
is_omni=api_config.is_omni
is_omni=api_config.is_omni,
capability=api_config.capability,
), type=ModelType(model_config.type))
try:
prompt_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'prompt')
@@ -226,10 +227,20 @@ class PromptOptimizerService:
content = getattr(chunk, "content", chunk)
if not content:
continue
buffer += content
if isinstance(content, str):
buffer += content
elif isinstance(content, list):
for _ in content:
buffer += _["text"]
else:
logger.error(f"Unsupported content type - {content}")
raise Exception("Unsupported content type")
cache = buffer[:-20]
last_idx = 19
while cache and cache[-1] == '\\' and last_idx > 0:
cache = buffer[:-last_idx]
last_idx -= 1
# 尝试找到 "prompt": " 开始位置
if prompt_finished:
continue
@@ -271,7 +282,7 @@ class PromptOptimizerService:
def parser_prompt_variables(prompt: str):
try:
pattern = r'\{\{\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\}\}'
matches = re.findall(pattern, prompt)
matches = re.findall(pattern, str(prompt))
variables = list(set(matches))
return variables
except Exception as e:

View File

@@ -248,7 +248,10 @@ class SharedChatService:
max_tokens=model_parameters.get("max_tokens", 2000),
system_prompt=system_prompt,
tools=tools,
deep_thinking=model_parameters.get("deep_thinking", False),
thinking_budget_tokens=model_parameters.get("thinking_budget_tokens"),
json_output=model_parameters.get("json_output", False),
capability=api_key_obj.capability,
)
# 加载历史消息
@@ -450,7 +453,11 @@ class SharedChatService:
max_tokens=model_parameters.get("max_tokens", 2000),
system_prompt=system_prompt,
tools=tools,
streaming=True
streaming=True,
deep_thinking=model_parameters.get("deep_thinking", False),
thinking_budget_tokens=model_parameters.get("thinking_budget_tokens"),
json_output=model_parameters.get("json_output", False),
capability=api_key_obj.capability or [],
)
# 加载历史消息
@@ -479,6 +486,8 @@ class SharedChatService:
):
if isinstance(chunk, int):
total_tokens = chunk
elif isinstance(chunk, dict) and chunk.get("type") == "reasoning":
yield f"event: reasoning\ndata: {json.dumps({'content': chunk['content']}, ensure_ascii=False)}\n\n"
else:
full_content += chunk
# 发送消息块事件

View File

@@ -34,7 +34,8 @@ BUILTIN_TOOLS = {
"JsonTool": "app.core.tools.builtin.json_tool",
"BaiduSearchTool": "app.core.tools.builtin.baidu_search_tool",
"MinerUTool": "app.core.tools.builtin.mineru_tool",
"TextInTool": "app.core.tools.builtin.textin_tool"
"TextInTool": "app.core.tools.builtin.textin_tool",
"OpenClawTool": "app.core.tools.builtin.openclaw_tool",
}
@@ -340,18 +341,18 @@ class ToolService:
return {"success": False, "message": f"测试失败: {str(e)}"}
def ensure_builtin_tools_initialized(self, tenant_id: uuid.UUID):
"""确保内置工具已初始化"""
existing = self.tool_repo.exists_builtin_for_tenant(self.db, tenant_id)
if existing:
"""确保内置工具已初始化(支持增量补充新工具)"""
builtin_config = self._load_builtin_config()
if not builtin_config:
return
# 从配置文件加载内置工具定义
builtin_config = self._load_builtin_config()
existing_classes = self.builtin_repo.get_existing_tool_classes(self.db, tenant_id)
added = False
for tool_key, tool_info in builtin_config.items():
if tool_info['tool_class'] in existing_classes:
continue
try:
# 创建工具配置
initial_status = self._determine_initial_status(tool_info)
tool_config = ToolConfig(
name=tool_info['name'],
@@ -367,7 +368,6 @@ class ToolService:
self.db.add(tool_config)
self.db.flush()
# 创建内置工具配置
builtin_config_obj = BuiltinToolConfig(
id=tool_config.id,
tool_class=tool_info['tool_class'],
@@ -375,12 +375,14 @@ class ToolService:
requires_config=tool_info.get('requires_config', False)
)
self.db.add(builtin_config_obj)
added = True
except Exception as e:
logger.error(f"初始化内置工具失败: {tool_key}, {e}")
self.db.commit()
logger.info(f"租户 {tenant_id} 内置工具初始化完成")
if added:
self.db.commit()
logger.info(f"租户 {tenant_id} 内置工具增量初始化完成")
async def get_tool_methods(self, tool_id: str, tenant_id: uuid.UUID) -> Optional[List[Dict[str, Any]]]:
"""获取工具的所有方法
@@ -458,6 +460,9 @@ class ToolService:
# 对于json_tool根据操作类型返回相关参数
elif hasattr(tool_instance, 'name') and tool_instance.name == 'json_tool':
return self._get_json_tool_params(operation)
# 对于openclaw_tool根据操作类型返回不同描述的参数
elif hasattr(tool_instance, 'name') and tool_instance.name == 'openclaw_tool':
return self._get_openclaw_tool_params(operation)
# 其他工具的默认处理返回除operation外的所有参数
return [{
@@ -574,6 +579,29 @@ class ToolService:
"default": "Asia/Shanghai"
}
]
elif operation == "datetime_to_timestamp":
return [
{
"name": "input_value",
"type": "string",
"description": "输入值时间字符串2026-04-07 10:30:25",
"required": True
},
{
"name": "input_format",
"type": "string",
"description": "输入时间格式(如:%Y-%m-%d %H:%M:%S",
"required": False,
"default": "%Y-%m-%d %H:%M:%S"
},
{
"name": "from_timezone",
"type": "string",
"description": "源时区UTC, Asia/Shanghai",
"required": False,
"default": "Asia/Shanghai"
}
]
else:
# 默认返回所有参数除了operation
return [
@@ -687,6 +715,65 @@ class ToolService:
return base_params
@staticmethod
def _get_openclaw_tool_params(operation: str) -> List[Dict[str, Any]]:
"""获取 openclaw_tool 特定操作的参数"""
if operation == "print_task":
return [
{
"name": "message",
"type": "string",
"description": "发送给 OpenClaw 的打印任务描述,将用户的原始消息原封不动地传递给 OpenClaw禁止改写、补充或润色用户的原文",
"required": True
},
{
"name": "image_url",
"type": "string",
"description": "可选附带的设计图片或参考图OpenClaw 可据此生成 3D 模型",
"required": False
}
]
elif operation == "device_query":
return [
{
"name": "message",
"type": "string",
"description": "发送给 OpenClaw 的设备查询指令",
"required": True
}
]
elif operation == "image_understand":
return [
{
"name": "message",
"type": "string",
"description": "发送给 OpenClaw 的图片理解任务,应描述需要对图片做什么(如描述内容、提取文字、分析信息)",
"required": True
},
{
"name": "image_url",
"type": "string",
"description": "要分析的图片 URL 或 base64 data URI",
"required": False
}
]
else:
# general 及其他
return [
{
"name": "message",
"type": "string",
"description": "发送给 OpenClaw Agent 的任务描述,应包含完整的任务需求",
"required": True
},
{
"name": "image_url",
"type": "string",
"description": "可选,附带的图片 URL 或 base64 data URI",
"required": False
}
]
async def _get_custom_tool_methods(self, config: ToolConfig) -> List[Dict[str, Any]]:
"""获取自定义工具的方法"""
custom_config = self.custom_repo.find_by_tool_id(self.db, config.id)

View File

@@ -14,6 +14,7 @@ from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from app.core.logging_config import get_logger
from app.core.memory.storage_services.extraction_engine.deduplication.deduped_and_disamb import _USER_PLACEHOLDER_NAMES
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context
from app.repositories.conversation_repository import ConversationRepository
@@ -21,7 +22,7 @@ from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.cypher_queries import Graph_Node_query
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_episodic_schema import EmotionSubject, EmotionType, type_mapping
from app.services.memory_base_service import MemoryBaseService
from app.services.memory_base_service import MemoryBaseService, MIN_MEMORY_SUMMARY_COUNT
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_perceptual_service import MemoryPerceptualService
from app.services.memory_short_service import ShortService
@@ -398,12 +399,16 @@ class UserMemoryService:
}
# 构建响应数据(转换时间为毫秒时间戳)
# 将 meta_data 中的 profile、knowledge_tags、behavioral_hints 平铺到顶层
meta = end_user_info_record.meta_data or {}
response_data = {
"end_user_info_id": str(end_user_info_record.id),
"end_user_id": str(end_user_info_record.end_user_id),
"other_name": end_user_info_record.other_name,
"aliases": end_user_info_record.aliases,
"meta_data": end_user_info_record.meta_data,
"profile": meta.get("profile"),
"knowledge_tags": meta.get("knowledge_tags"),
"behavioral_hints": meta.get("behavioral_hints"),
"created_at": datetime_to_timestamp(end_user_info_record.created_at),
"updated_at": datetime_to_timestamp(end_user_info_record.updated_at)
}
@@ -473,7 +478,7 @@ class UserMemoryService:
allowed_fields = {'other_name', 'aliases', 'meta_data'}
# 用户占位名称黑名单,不允许作为 other_name 或出现在 aliases 中
_user_placeholder_names = {'用户', '', 'User', 'I'}
_user_placeholder_names = _USER_PLACEHOLDER_NAMES
# 过滤 other_name不允许设置为占位名称
if 'other_name' in update_data and update_data['other_name'] and update_data['other_name'].strip() in _user_placeholder_names:
@@ -1500,7 +1505,7 @@ async def analytics_memory_types(
2. 工作记忆 (WORKING_MEMORY) = 会话数量(通过 ConversationRepository.get_conversation_by_user_id 获取)
3. 短期记忆 (SHORT_TERM_MEMORY) = /short_term 接口返回的问答对数量
4. 显性记忆 (EXPLICIT_MEMORY) = 情景记忆 + 语义记忆(通过 MemoryBaseService.get_explicit_memory_count 获取)
5. 隐性记忆 (IMPLICIT_MEMORY) = Statement 节点数量的三分之一
5. 隐性记忆 (IMPLICIT_MEMORY) = MemorySummary 节点数量(需 >= MIN_MEMORY_SUMMARY_COUNT 才显示,否则为 0
6. 情绪记忆 (EMOTIONAL_MEMORY) = 情绪标签统计总数(通过 MemoryBaseService.get_emotional_memory_count 获取)
7. 情景记忆 (EPISODIC_MEMORY) = memory_summary通过 MemoryBaseService.get_episodic_memory_count 获取)
8. 遗忘记忆 (FORGET_MEMORY) = 激活值低于阈值的节点数(通过 MemoryBaseService.get_forget_memory_count 获取)
@@ -1557,23 +1562,15 @@ async def analytics_memory_types(
logger.warning(f"获取会话数量失败工作记忆数量设为0: {str(e)}")
work_count = 0
# 获取隐性记忆数量(基于 Statement 节点数量的三分之一
# 获取隐性记忆数量(基于有关联关系的 MemorySummary 节点数量,需 >= MIN_MEMORY_SUMMARY_COUNT 才计入
implicit_count = 0
if end_user_id:
try:
# 查询 Statement 节点数量
query = """
MATCH (n:Statement)
WHERE n.end_user_id = $end_user_id
RETURN count(n) as count
"""
result = await _neo4j_connector.execute_query(query, end_user_id=end_user_id)
statement_count = result[0]["count"] if result and len(result) > 0 else 0
# 取三分之一作为隐性记忆数量
implicit_count = round(statement_count / 3)
logger.debug(f"隐性记忆数量Statement数量的1/3: {implicit_count} (Statement总数={statement_count}, end_user_id={end_user_id})")
memory_summary_count = await base_service.get_valid_memory_summary_count(end_user_id)
implicit_count = memory_summary_count if memory_summary_count >= MIN_MEMORY_SUMMARY_COUNT else 0
logger.debug(f"隐性记忆数量有效MemorySummary节点数: {implicit_count} (有效MemorySummary总数={memory_summary_count}, end_user_id={end_user_id})")
except Exception as e:
logger.warning(f"获取Statement数量失败隐性记忆数量设为0: {str(e)}")
logger.warning(f"获取MemorySummary数量失败隐性记忆数量设为0: {str(e)}")
implicit_count = 0
# 原有的基于行为习惯的统计方式(已注释)
@@ -1639,7 +1636,7 @@ async def analytics_memory_types(
"WORKING_MEMORY": work_count, # 工作记忆(基于会话数量)
"SHORT_TERM_MEMORY": short_term_count, # 短期记忆(基于问答对数量)
"EXPLICIT_MEMORY": explicit_count, # 显性记忆(情景记忆 + 语义记忆)
"IMPLICIT_MEMORY": implicit_count, # 隐性记忆(Statement数量的1/3
"IMPLICIT_MEMORY": implicit_count, # 隐性记忆(MemorySummary节点数需>=MIN_MEMORY_SUMMARY_COUNT
"EMOTIONAL_MEMORY": emotion_count, # 情绪记忆(使用情绪标签统计)
"EPISODIC_MEMORY": episodic_count, # 情景记忆
"FORGET_MEMORY": forget_count # 遗忘记忆(激活值低于阈值)

View File

@@ -285,7 +285,7 @@ def activate_user(db: Session, user_id_to_activate: uuid.UUID, current_user: Use
try:
# 查找用户
business_logger.debug(f"查找待激活用户: {user_id_to_activate}")
db_user = user_repository.get_user_by_id(db, user_id=user_id_to_activate)
db_user = user_repository.get_user_by_id_regardless_active(db, user_id=user_id_to_activate)
if not db_user:
business_logger.warning(f"用户不存在: {user_id_to_activate}")
raise BusinessException("用户不存在", code=BizCode.USER_NOT_FOUND)

View File

@@ -69,6 +69,7 @@ class WorkflowImportService:
edges=workflow_config.edges,
nodes=workflow_config.nodes,
variables=workflow_config.variables,
features=workflow_config.features,
warnings=workflow_config.warnings,
errors=workflow_config.errors
)
@@ -95,7 +96,8 @@ class WorkflowImportService:
workflow_config=WorkflowConfigCreate(
nodes=config["nodes"],
edges=config["edges"],
variables=config["variables"]
variables=config["variables"],
features=config.get("features", {})
)
)
)

View File

@@ -16,7 +16,6 @@ from app.core.workflow.adapters.registry import PlatformAdapterRegistry
from app.core.workflow.executor import execute_workflow, execute_workflow_stream
from app.core.workflow.nodes.enums import NodeType
from app.core.workflow.validator import validate_workflow_config
from app.core.workflow.variable.base_variable import FileObject
from app.db import get_db
from app.models import App
from app.models.workflow_model import WorkflowConfig, WorkflowExecution
@@ -26,7 +25,7 @@ from app.repositories.workflow_repository import (
WorkflowExecutionRepository,
WorkflowNodeExecutionRepository
)
from app.schemas import DraftRunRequest, FileInput
from app.schemas import DraftRunRequest, FileInput, FileType
from app.services.conversation_service import ConversationService
from app.services.multi_agent_service import convert_uuids_to_str
from app.services.multimodal_service import MultimodalService
@@ -453,22 +452,70 @@ class WorkflowService:
"success_rate": completed / total if total > 0 else 0
}
async def _resolve_variables_file_defaults(
self,
variables: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Convert FileInput-format defaults in workflow variables to full FileObject dicts."""
from app.core.workflow.utils.file_processor import (
resolve_local_file_object_dict,
fetch_remote_file_meta,
)
async def _resolve_one(item: dict) -> dict | None:
if not isinstance(item, dict) or item.get("is_file"):
return item
transfer_method = item.get("transfer_method", "remote_url")
file_type = FileType.trans(item.get("type", "document"))
origin_file_type = item.get("file_type") or file_type
if transfer_method == "remote_url":
url = item.get("url", "")
return await fetch_remote_file_meta(url, file_type, origin_file_type) if url else None
else:
return resolve_local_file_object_dict(self.db, item.get("upload_file_id"), file_type, origin_file_type)
result = []
for var_def in variables:
var_type = var_def.get("type", "")
default = var_def.get("default")
if var_type == "file" and isinstance(default, dict) and not default.get("is_file"):
var_def = {**var_def, "default": await _resolve_one(default)}
elif var_type == "array[file]" and isinstance(default, list):
resolved = []
for item in default:
r = await _resolve_one(item)
if r is not None:
resolved.append(r)
var_def = {**var_def, "default": resolved}
result.append(var_def)
return result
async def _handle_file_input(self, files: list[FileInput]):
if not files:
return []
from app.core.workflow.utils.file_processor import (
resolve_local_file_object_dict,
build_file_object_dict_from_meta,
fetch_remote_file_meta,
)
files_struct = []
for file in files:
files_struct.append(
FileObject(
type=file.type,
url=await self.multimodal_service.get_file_url(file),
transfer_method=file.transfer_method,
file_id=str(file.upload_file_id) if file.upload_file_id else None,
origin_file_type=file.file_type,
is_file=True
).model_dump()
)
url = await self.multimodal_service.get_file_url(file)
file_type = str(file.type)
origin_file_type = file.file_type or file_type
if file.transfer_method.value == "local_file" and file.upload_file_id:
fo = resolve_local_file_object_dict(self.db, file.upload_file_id, file_type, origin_file_type)
files_struct.append(fo or build_file_object_dict_from_meta(
file_type=file_type, transfer_method="local_file",
origin_file_type=origin_file_type,
file_id=str(file.upload_file_id), url=url,
file_name=None, file_size=None, file_ext=None, content_type=None,
))
else:
files_struct.append(await fetch_remote_file_meta(url, file_type, origin_file_type))
return files_struct
@staticmethod
@@ -545,6 +592,12 @@ class WorkflowService:
def _get_memory_store_info(self, workspace_id: uuid.UUID) -> tuple[str, str]:
storage_type = get_workspace_storage_type_without_auth(self.db, workspace_id)
user_rag_memory_id = ""
# 如果 storage_type 为 None使用默认值 'neo4j'
if not storage_type:
storage_type = 'neo4j'
logger.warning(
f"Storage type not set for workspace {workspace_id}, using default: neo4j"
)
if storage_type == "rag":
knowledge = knowledge_repository.get_knowledge_by_name(
db=self.db,
@@ -659,6 +712,26 @@ class WorkflowService:
input_data["conv_messages"] = conv_messages
init_message_length = len(input_data.get("conv_messages", []))
# 新会话时写入开场白
is_new_conversation = init_message_length == 0
if is_new_conversation:
opening_cfg = feature_configs.get("opening_statement", {})
if isinstance(opening_cfg, dict) and opening_cfg.get("enabled") and opening_cfg.get("statement"):
statement = opening_cfg["statement"]
suggested_questions = opening_cfg.get("suggested_questions", [])
if payload.variables:
for var_name, var_value in payload.variables.items():
statement = statement.replace(f"{{{{{var_name}}}}}", str(var_value))
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role="assistant",
content=statement,
meta_data={"suggested_questions": suggested_questions}
)
# 注入到 conv_messages让 LLM 感知开场白
input_data["conv_messages"] = [{"role": "assistant", "content": statement}]
init_message_length = 1
result = await execute_workflow(
workflow_config=workflow_config_dict,
input_data=input_data,
@@ -696,12 +769,21 @@ class WorkflowService:
content=human_message,
meta_data=human_meta
)
# 过滤 citations
citations = result.get("citations", [])
citation_cfg = feature_configs.get("citation", {})
filtered_citations = (
citations if isinstance(citation_cfg, dict) and citation_cfg.get("enabled") else []
)
assistant_meta = {"usage": token_usage, "audio_url": None}
if filtered_citations:
assistant_meta["citations"] = filtered_citations
self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id_uuid,
role="assistant",
content=assistant_message,
meta_data={"usage": token_usage, "audio_url": None}
meta_data=assistant_meta
)
self.update_execution_status(
execution.execution_id,
@@ -720,6 +802,7 @@ class WorkflowService:
)
logger.error(f"Workflow Run Failed, execution_id: {execution.execution_id},"
f" error: {result.get('error')}")
filtered_citations = []
# 返回增强的响应结构
return {
@@ -734,7 +817,8 @@ class WorkflowService:
"conversation_id": result.get("conversation_id"), # 所有节点输出详细数据payload., # 会话 ID
"error_message": result.get("error"),
"elapsed_time": result.get("elapsed_time"),
"token_usage": result.get("token_usage")
"token_usage": result.get("token_usage"),
"citations": filtered_citations,
}
except Exception as e:
@@ -825,6 +909,27 @@ class WorkflowService:
input_data["conv_messages"] = conv_messages
init_message_length = len(input_data.get("conv_messages", []))
message_id = uuid.uuid4()
# 新会话时写入开场白
is_new_conversation = init_message_length == 0
if is_new_conversation:
opening_cfg = feature_configs.get("opening_statement", {})
if isinstance(opening_cfg, dict) and opening_cfg.get("enabled") and opening_cfg.get("statement"):
statement = opening_cfg["statement"]
suggested_questions = opening_cfg.get("suggested_questions", [])
if payload.variables:
for var_name, var_value in payload.variables.items():
statement = statement.replace(f"{{{{{var_name}}}}}", str(var_value))
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role="assistant",
content=statement,
meta_data={"suggested_questions": suggested_questions}
)
# 注入到 conv_messages让 LLM 感知开场白
input_data["conv_messages"] = [{"role": "assistant", "content": statement}]
init_message_length = 1
async for event in execute_workflow_stream(
workflow_config=workflow_config_dict,
input_data=input_data,
@@ -852,7 +957,10 @@ class WorkflowService:
for file in message["content"]:
human_meta["files"].append({
"type": file.get("type"),
"url": file.get("url")
"url": file.get("url"),
"file_type": file.get("origin_file_type"),
"name": file.get("name"),
"size": file.get("size")
})
if message["role"] == "assistant":
assistant_message = message["content"]
@@ -862,12 +970,21 @@ class WorkflowService:
content=human_message,
meta_data=human_meta
)
# 过滤 citations
citations = event.get("data", {}).get("citations", [])
citation_cfg = feature_configs.get("citation", {})
filtered_citations = (
citations if isinstance(citation_cfg, dict) and citation_cfg.get("enabled") else []
)
assistant_meta = {"usage": token_usage, "audio_url": None}
if filtered_citations:
assistant_meta["citations"] = filtered_citations
self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id_uuid,
role="assistant",
content=assistant_message,
meta_data={"usage": token_usage, "audio_url": None}
meta_data=assistant_meta
)
self.update_execution_status(
execution.execution_id,
@@ -875,6 +992,7 @@ class WorkflowService:
output_data=event.get("data"),
token_usage=token_usage.get("total_tokens", None)
)
event.setdefault("data", {})["citations"] = filtered_citations
logger.info(f"Workflow Run Success, "
f"execution_id: {execution.execution_id}, message count: {len(final_messages)}")
elif status == "failed":

View File

@@ -480,21 +480,21 @@ def create_workspace_invite(
try:
# 检查权限
_check_workspace_admin_permission(db, workspace_id, user)
if settings.ENABLE_SINGLE_WORKSPACE:
# 检查被邀请用户是否已经在工作空间中
from app.repositories import user_repository
invited_user = user_repository.get_user_by_email(db, invite_data.email)
# if settings.ENABLE_SINGLE_WORKSPACE:
# 检查被邀请用户是否已经在工作空间中
from app.repositories import user_repository
invited_user = user_repository.get_user_by_email(db, invite_data.email)
if invited_user:
# 用户存在,检查是否已经是工作空间成员
existing_member = workspace_repository.get_member_in_workspace(
db=db,
user_id=invited_user.id,
workspace_id=workspace_id
)
if existing_member:
business_logger.warning(f"用户 {invite_data.email} 已经是工作空间成员")
raise BusinessException("该用户已经是工作空间成员", BizCode.RESOURCE_ALREADY_EXISTS)
if invited_user:
# 用户存在,检查是否已经是工作空间成员
existing_member = workspace_repository.get_member_in_workspace(
db=db,
user_id=invited_user.id,
workspace_id=workspace_id
)
if existing_member:
business_logger.warning(f"用户 {invite_data.email} 已经是工作空间成员")
raise BusinessException("该用户已经是工作空间成员", BizCode.RESOURCE_ALREADY_EXISTS)
# 检查是否已有待处理的邀请
invite_repo = WorkspaceInviteRepository(db)