diff --git a/api/app/controllers/__init__.py b/api/app/controllers/__init__.py
index bcf7614f..a45c701f 100644
--- a/api/app/controllers/__init__.py
+++ b/api/app/controllers/__init__.py
@@ -40,7 +40,8 @@ from . import (
workspace_controller,
memory_forget_controller,
home_page_controller,
- memory_perceptual_controller
+ memory_perceptual_controller,
+ memory_working_controller,
)
# 创建管理端 API 路由器
@@ -82,5 +83,6 @@ manager_router.include_router(memory_forget_controller.router)
manager_router.include_router(home_page_controller.router)
manager_router.include_router(implicit_memory_controller.router)
manager_router.include_router(memory_perceptual_controller.router)
+manager_router.include_router(memory_working_controller.router)
__all__ = ["manager_router"]
diff --git a/api/app/controllers/memory_working_controller.py b/api/app/controllers/memory_working_controller.py
new file mode 100644
index 00000000..cdd90de9
--- /dev/null
+++ b/api/app/controllers/memory_working_controller.py
@@ -0,0 +1,123 @@
+import uuid
+
+from fastapi import APIRouter, Depends
+from sqlalchemy.orm import Session
+from app.core.response_utils import success, fail
+
+from app.core.logging_config import get_api_logger
+from app.db import get_db
+from app.dependencies import get_current_user
+from app.models import User
+from app.schemas.response_schema import ApiResponse
+from app.services import conversation_service
+from app.services.conversation_service import ConversationService
+
+api_logger = get_api_logger()
+
+router = APIRouter(
+ prefix="/memory/work",
+ tags=["Perceptual Memory System"],
+ dependencies=[Depends(get_current_user)]
+)
+
+
+@router.get("/{group_id}/count", response_model=ApiResponse)
+def get_memory_count(
+ group_id: uuid.UUID,
+ current_user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
+):
+ pass
+
+
+@router.get("/{group_id}/conversations", response_model=ApiResponse)
+def get_conversations(
+ group_id: uuid.UUID,
+ current_user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
+):
+ """
+ Retrieve all conversations for the current user in a specific group.
+
+ Args:
+ group_id (UUID): The group identifier.
+ current_user (User, optional): The authenticated user.
+ db (Session, optional): SQLAlchemy session.
+
+ Returns:
+ ApiResponse: Contains a list of conversation IDs.
+
+ Notes:
+ - Initializes the ConversationService with the current DB session.
+ - Returns only conversation IDs for lightweight response.
+ - Logs can be added to trace requests in production.
+ """
+ conversation_service = ConversationService(db)
+ conversations = conversation_service.get_user_conversations(
+ group_id,
+ current_user.current_workspace_id
+ )
+ return success(data=[conversation.id for conversation in conversations], msg="get conversations success")
+
+
+@router.get("/{group_id}/messages", response_model=ApiResponse)
+def get_messages(
+ conversation_id: uuid.UUID,
+ current_user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
+):
+ """
+ Retrieve the message history for a specific conversation.
+
+ Args:
+ conversation_id (UUID): The ID of the conversation to fetch messages from.
+ current_user (User, optional): The authenticated user.
+ db (Session, optional): SQLAlchemy session.
+
+ Returns:
+ ApiResponse: Contains the list of messages in the conversation.
+
+ Notes:
+ - Uses ConversationService to fetch messages.
+ - Consider paginating results if message history is large.
+ - Logging can be added for audit and debugging.
+ """
+ conversation_service = ConversationService(db)
+ messages = conversation_service.get_conversation_history(
+ conversation_id,
+ )
+ return success(data=messages, msg="get conversation history success")
+
+
+@router.get("/{group_id}/detail", response_model=ApiResponse)
+async def get_conversation_detail(
+ conversation_id: uuid.UUID,
+ current_user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
+):
+ """
+ Retrieve detailed information about a specific conversation.
+
+ This endpoint will fetch the conversation detail for the user. If the detail
+ does not exist or is outdated, it will trigger the LLM to generate a new summary.
+
+ Args:
+ conversation_id (UUID): The ID of the conversation.
+ current_user (User, optional): The authenticated user making the request.
+ db (Session, optional): SQLAlchemy session.
+
+ Returns:
+ ApiResponse: Contains the conversation detail serialized as a dictionary.
+
+ Notes:
+ - Uses async ConversationService to fetch or generate the conversation detail.
+ - Handles workspace and user-specific context automatically.
+ - Logging and exception handling should be implemented for production monitoring.
+ """
+ conversation_service = ConversationService(db)
+ detail = await conversation_service.get_conversation_detail(
+ user=current_user,
+ conversation_id=conversation_id,
+ workspace_id=current_user.current_workspace_id
+ )
+ return success(data=detail.model_dump(), msg="get conversation detail success")
diff --git a/api/app/core/workflow/nodes/assigner/config.py b/api/app/core/workflow/nodes/assigner/config.py
index 092f0b51..dd8a460e 100644
--- a/api/app/core/workflow/nodes/assigner/config.py
+++ b/api/app/core/workflow/nodes/assigner/config.py
@@ -22,7 +22,7 @@ class AssignmentItem(BaseModel):
)
value: Any = Field(
- ...,
+ default=None,
description="Value(s) to assign to the variable(s)",
)
diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py
index a1ec2e1d..e7007884 100644
--- a/api/app/core/workflow/nodes/base_node.py
+++ b/api/app/core/workflow/nodes/base_node.py
@@ -534,7 +534,7 @@ class BaseNode(ABC):
return edge
return None
- def _render_template(self, template: str, state: WorkflowState | None) -> str:
+ def _render_template(self, template: str, state: WorkflowState | None, struct: bool = True) -> str:
"""渲染模板
支持的变量命名空间:
@@ -568,7 +568,8 @@ class BaseNode(ABC):
template=template,
variables=variables,
node_outputs=pool.get_all_node_outputs(),
- system_vars=pool.get_all_system_vars()
+ system_vars=pool.get_all_system_vars(),
+ struct=struct
)
def _evaluate_condition(self, expression: str, state: WorkflowState | None) -> bool:
diff --git a/api/app/core/workflow/nodes/cycle_graph/config.py b/api/app/core/workflow/nodes/cycle_graph/config.py
index 2e57536e..445ddd9a 100644
--- a/api/app/core/workflow/nodes/cycle_graph/config.py
+++ b/api/app/core/workflow/nodes/cycle_graph/config.py
@@ -55,7 +55,7 @@ class ConditionDetail(BaseModel):
)
input_type: ValueInputType = Field(
- ...,
+ default=ValueInputType.CONSTANT,
description="Input type of the loop variable"
)
diff --git a/api/app/core/workflow/nodes/end/node.py b/api/app/core/workflow/nodes/end/node.py
index efc62dc5..6230345c 100644
--- a/api/app/core/workflow/nodes/end/node.py
+++ b/api/app/core/workflow/nodes/end/node.py
@@ -37,7 +37,7 @@ class EndNode(BaseNode):
# 如果配置了输出模板,使用模板渲染;否则使用默认输出
if output_template:
- output = self._render_template(output_template, state)
+ output = self._render_template(output_template, state, struct=False)
else:
output = "工作流已完成"
diff --git a/api/app/core/workflow/nodes/if_else/config.py b/api/app/core/workflow/nodes/if_else/config.py
index 90b4bcde..3e5ea22a 100644
--- a/api/app/core/workflow/nodes/if_else/config.py
+++ b/api/app/core/workflow/nodes/if_else/config.py
@@ -23,7 +23,7 @@ class ConditionDetail(BaseModel):
)
input_type: ValueInputType = Field(
- ...,
+ default=ValueInputType.CONSTANT,
description="Value input type for comparison"
)
diff --git a/api/app/core/workflow/nodes/if_else/node.py b/api/app/core/workflow/nodes/if_else/node.py
index fd5864a8..8c6d222f 100644
--- a/api/app/core/workflow/nodes/if_else/node.py
+++ b/api/app/core/workflow/nodes/if_else/node.py
@@ -71,7 +71,10 @@ class IfElseNode(BaseNode):
for expression in case_branch.expressions:
pattern = r"\{\{\s*(.*?)\s*\}\}"
left_string = re.sub(pattern, r"\1", expression.left).strip()
- left_value = self.get_variable(left_string, state)
+ try:
+ left_value = self.get_variable(left_string, state)
+ except KeyError:
+ left_value = None
evaluator = ConditionExpressionResolver.resolve_by_value(left_value)(
self.get_variable_pool(state),
expression.left,
diff --git a/api/app/core/workflow/nodes/knowledge/node.py b/api/app/core/workflow/nodes/knowledge/node.py
index d9caae7e..5a6b2a7f 100644
--- a/api/app/core/workflow/nodes/knowledge/node.py
+++ b/api/app/core/workflow/nodes/knowledge/node.py
@@ -205,10 +205,14 @@ class KnowledgeRetrievalNode(BaseNode):
score_threshold=kb_config.similarity_threshold)
# Deduplicate hy brid retrieval results
unique_rs = self._deduplicate_docs(rs1, rs2)
+ if not unique_rs:
+ continue
vector_service.reranker = self.get_reranker_model()
rs.extend(vector_service.rerank(query=query, docs=unique_rs, top_k=kb_config.top_k))
case _:
raise RuntimeError("Unknown retrieval type")
+ if not rs:
+ return []
vector_service.reranker = self.get_reranker_model()
# TODO:其他重排序方式支持
final_rs = vector_service.rerank(query=query, docs=rs, top_k=self.typed_config.reranker_top_k)
diff --git a/api/app/core/workflow/nodes/llm/node.py b/api/app/core/workflow/nodes/llm/node.py
index 334229f7..5fb86ae2 100644
--- a/api/app/core/workflow/nodes/llm/node.py
+++ b/api/app/core/workflow/nodes/llm/node.py
@@ -65,11 +65,12 @@ class LLMNode(BaseNode):
- user/human: 用户消息(HumanMessage)
- ai/assistant: AI 消息(AIMessage)
"""
+
def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]):
super().__init__(node_config, workflow_config)
self.typed_config = LLMNodeConfig(**self.config)
- def _render_context(self, message,state):
+ def _render_context(self, message, state):
context = f"{self._render_template(self.typed_config.context, state)}"
return re.sub(r"{{context}}", context, message)
diff --git a/api/app/core/workflow/nodes/operators.py b/api/app/core/workflow/nodes/operators.py
index fc856aee..ab6ad3e1 100644
--- a/api/app/core/workflow/nodes/operators.py
+++ b/api/app/core/workflow/nodes/operators.py
@@ -387,6 +387,11 @@ class ArrayComparisonOperator(ConditionBase):
return self.right_value not in self.left_value
+class NoneObjectComparisonOperator(ConditionBase):
+ def __getattr__(self, name):
+ return lambda *args, **kwargs: False
+
+
CompareOperatorInstance = Union[
StringComparisonOperator,
NumberComparisonOperator,
@@ -405,6 +410,7 @@ class ConditionExpressionResolver:
float: NumberComparisonOperator,
list: ArrayComparisonOperator,
dict: ObjectComparisonOperator,
+ type(None): NoneObjectComparisonOperator
}
@classmethod
diff --git a/api/app/core/workflow/template_renderer.py b/api/app/core/workflow/template_renderer.py
index df6053b0..198a3322 100644
--- a/api/app/core/workflow/template_renderer.py
+++ b/api/app/core/workflow/template_renderer.py
@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class TemplateRenderer:
"""模板渲染器"""
-
+
def __init__(self, strict: bool = True):
"""初始化渲染器
@@ -25,13 +25,13 @@ class TemplateRenderer:
undefined=StrictUndefined if strict else Undefined,
autoescape=False # 不自动转义,因为我们处理的是文本而非 HTML
)
-
+
def render(
- self,
- template: str,
- variables: dict[str, Any],
- node_outputs: dict[str, Any],
- system_vars: dict[str, Any] | None = None
+ self,
+ template: str,
+ variables: dict[str, Any],
+ node_outputs: dict[str, Any],
+ system_vars: dict[str, Any] | None = None
) -> str:
"""渲染模板
@@ -69,40 +69,40 @@ class TemplateRenderer:
# variables 的结构:{"sys": {...}, "conv": {...}}
sys_vars = variables.get("sys", {}) if isinstance(variables, dict) else {}
conv_vars = variables.get("conv", {}) if isinstance(variables, dict) else {}
-
+
context = {
- "conv": conv_vars, # 会话变量:{{conv.user_name}}
- "node": node_outputs, # 节点输出:{{node.node_1.output}}
+ "conv": conv_vars, # 会话变量:{{conv.user_name}}
+ "node": node_outputs, # 节点输出:{{node.node_1.output}}
"sys": {**(system_vars or {}), **sys_vars}, # 系统变量:{{sys.execution_id}}(合并两个来源)
}
-
+
# 支持直接通过节点ID访问节点输出:{{llm_qa.output}}
# 将所有节点输出添加到顶层上下文
if node_outputs:
context.update(node_outputs)
-
+
# 支持直接访问会话变量(不需要 conv. 前缀):{{user_name}}
if conv_vars:
context.update(conv_vars)
-
+
context["nodes"] = node_outputs or {} # 旧语法兼容
-
+
try:
tmpl = self.env.from_string(template)
return tmpl.render(**context)
-
+
except TemplateSyntaxError as e:
logger.error(f"模板语法错误: {template}, 错误: {e}")
raise ValueError(f"模板语法错误: {e}")
-
+
except UndefinedError as e:
logger.error(f"模板中引用了未定义的变量: {template}, 错误: {e}")
raise ValueError(f"未定义的变量: {e}")
-
+
except Exception as e:
logger.error(f"模板渲染异常: {template}, 错误: {e}")
raise ValueError(f"模板渲染失败: {e}")
-
+
def validate(self, template: str) -> list[str]:
"""验证模板语法
@@ -121,14 +121,14 @@ class TemplateRenderer:
['模板语法错误: ...']
"""
errors = []
-
+
try:
self.env.from_string(template)
except TemplateSyntaxError as e:
errors.append(f"模板语法错误: {e}")
except Exception as e:
errors.append(f"模板验证失败: {e}")
-
+
return errors
@@ -137,14 +137,16 @@ _default_renderer = TemplateRenderer(strict=True)
def render_template(
- template: str,
- variables: dict[str, Any],
- node_outputs: dict[str, Any],
- system_vars: dict[str, Any] | None = None
+ template: str,
+ variables: dict[str, Any],
+ node_outputs: dict[str, Any],
+ system_vars: dict[str, Any] | None = None,
+ struct: bool = True
) -> str:
"""渲染模板(便捷函数)
Args:
+ struct: 渲染模式
template: 模板字符串
variables: 用户变量
node_outputs: 节点输出
@@ -162,7 +164,8 @@ def render_template(
... )
'请分析: 这是一段文本'
"""
- return _default_renderer.render(template, variables, node_outputs, system_vars)
+ renderer = TemplateRenderer(strict=struct)
+ return renderer.render(template, variables, node_outputs, system_vars)
def validate_template(template: str) -> list[str]:
diff --git a/api/app/models/conversation_model.py b/api/app/models/conversation_model.py
index e7f9e8c4..9e222610 100644
--- a/api/app/models/conversation_model.py
+++ b/api/app/models/conversation_model.py
@@ -3,6 +3,7 @@
"""
import uuid
import datetime
+
from sqlalchemy import Column, String, DateTime, ForeignKey, Boolean, Integer, Text, JSON
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
@@ -25,56 +26,69 @@ class Conversation(Base):
__tablename__ = "conversations"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
-
+
# 关联信息
app_id = Column(UUID(as_uuid=True), ForeignKey("apps.id"), nullable=False, comment="应用ID")
workspace_id = Column(UUID(as_uuid=True), ForeignKey("workspaces.id"), nullable=False, comment="工作空间ID")
user_id = Column(String, nullable=True, comment="用户ID(外部系统)")
-
+
# 会话信息
title = Column(String(255), comment="会话标题")
summary = Column(Text, comment="会话摘要")
-
+
# 会话类型:True=草稿会话(使用草稿配置),False=发布会话(使用发布配置)
is_draft = Column(Boolean, default=True, nullable=False, comment="是否为草稿会话")
-
+
# 配置快照:保存创建会话时的完整配置,用于审计和问题追溯
config_snapshot = Column(JSON, comment="配置快照(Agent配置、模型配置等)")
-
+
# 统计信息
message_count = Column(Integer, default=0, comment="消息数量")
-
+
# 状态
is_active = Column(Boolean, default=True, nullable=False, comment="是否活跃")
-
+
# 时间戳
created_at = Column(DateTime, default=datetime.datetime.now, comment="创建时间")
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now, comment="更新时间")
-
+
# 关联关系
app = relationship("App", back_populates="conversations")
workspace = relationship("Workspace")
messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
+class ConversationDetail(Base):
+ __tablename__ = "conversation_details"
+
+ id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
+ conversation_id = Column(UUID(as_uuid=True), ForeignKey("conversations.id"))
+
+ theme = Column(String, comment="会话主题")
+ theme_intro = Column(String, comment="主题介绍")
+ summary = Column(String, comment="会话摘要")
+ takeaways = Column(JSON, comment="会话要点")
+ info_score = Column(Integer, comment="会话信息量评分")
+
+
class Message(Base):
"""消息表"""
__tablename__ = "messages"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
-
+
# 关联信息
conversation_id = Column(UUID(as_uuid=True), ForeignKey("conversations.id"), nullable=False, comment="会话ID")
-
+
# 消息内容
role = Column(String(20), nullable=False, comment="角色: user/assistant/system")
content = Column(Text, nullable=False, comment="消息内容")
-
+
# 元数据(避免使用 metadata 保留字)
meta_data = Column(JSON, comment="消息元数据(如模型、token使用等)")
-
+
# 时间戳
created_at = Column(DateTime, default=datetime.datetime.now, comment="创建时间")
-
+
# 关联关系
conversation = relationship("Conversation", back_populates="messages")
diff --git a/api/app/repositories/conversation_repository.py b/api/app/repositories/conversation_repository.py
new file mode 100644
index 00000000..eb5d3c61
--- /dev/null
+++ b/api/app/repositories/conversation_repository.py
@@ -0,0 +1,317 @@
+import uuid
+from typing import Optional
+
+from sqlalchemy import select, desc, func
+from sqlalchemy.orm import Session
+
+from app.core.exceptions import ResourceNotFoundException
+from app.core.logging_config import get_db_logger
+from app.models import Conversation, Message
+from app.models.conversation_model import ConversationDetail
+
+logger = get_db_logger()
+
+
+class ConversationRepository:
+ """Repository for Conversation entity, encapsulating CRUD operations."""
+
+ def __init__(self, db: Session):
+ self.db = db
+
+ def create_conversation(
+ self,
+ app_id: uuid.UUID,
+ workspace_id: uuid.UUID,
+ user_id: Optional[str] = None,
+ title: Optional[str] = None,
+ is_draft: bool = False,
+ config_snapshot: Optional[dict] = None
+ ) -> Conversation:
+ """
+ Create a new conversation record.
+
+ Args:
+ app_id: Application ID the conversation belongs to.
+ workspace_id: Workspace ID where the conversation is created.
+ user_id: Optional user ID associated with the conversation.
+ title: Optional conversation title. Defaults to "New Conversation".
+ is_draft: Whether the conversation is a draft.
+ config_snapshot: Optional configuration snapshot.
+
+ Returns:
+ Conversation: Newly created Conversation instance.
+ """
+ conversation = Conversation(
+ app_id=app_id,
+ workspace_id=workspace_id,
+ user_id=user_id,
+ title=title or "New Conversation",
+ is_draft=is_draft,
+ config_snapshot=config_snapshot
+ )
+ self.db.add(conversation)
+ return conversation
+
+ def get_conversation_by_conversation_id(
+ self,
+ conversation_id: uuid.UUID,
+ workspace_id: Optional[uuid.UUID] = None
+ ) -> Conversation:
+ """
+ Retrieve a conversation by its ID, optionally filtered by workspace.
+
+ Args:
+ conversation_id: The UUID of the conversation.
+ workspace_id: Optional workspace UUID to filter the conversation.
+
+ Raises:
+ ResourceNotFoundException: If conversation does not exist.
+
+ Returns:
+ Conversation: The matching Conversation instance.
+ """
+ logger.info(f"Fetching conversation: {conversation_id}")
+
+ stmt = select(Conversation).where(Conversation.id == conversation_id)
+
+ if workspace_id:
+ stmt = stmt.where(Conversation.workspace_id == workspace_id)
+
+ conversation = self.db.scalars(stmt).first()
+
+ if not conversation:
+ logger.warning(f"Conversation not found: {conversation_id}")
+ raise ResourceNotFoundException("Conversation", str(conversation_id))
+
+ logger.info(f"Conversation fetched successfully: {conversation_id}")
+ return conversation
+
+ def get_conversation_by_user_id(
+ self,
+ user_id: uuid.UUID,
+ workspace_id: uuid.UUID = None,
+ limit: int = 10,
+ is_activate: bool = True
+ ) -> list[Conversation]:
+ """
+ Retrieve recent conversations for a specific user.
+
+ This method queries conversations associated with the given user ID,
+ optionally scoped to a specific workspace. Results are ordered by the
+ most recently updated conversations and limited to a fixed number.
+
+ Args:
+ user_id (uuid.UUID): Unique identifier of the user.
+ workspace_id (uuid.UUID, optional): Workspace scope for the query.
+ If provided, only conversations under this workspace will be returned.
+ limit (int): Maximum number of conversations to return.
+ Defaults to 10.
+ is_activate (bool): Convsersation State limit
+
+ Returns:
+ list[Conversation]: A list of conversation entities ordered by
+ last updated time (descending).
+ """
+ logger.info(f"Fetching conversation by user_id: {user_id}")
+
+ stmt = select(Conversation).where(
+ Conversation.user_id == str(user_id),
+ Conversation.is_active.is_(is_activate)
+ )
+
+ if workspace_id:
+ stmt = stmt.where(Conversation.workspace_id == workspace_id)
+
+ stmt = stmt.order_by(desc(Conversation.updated_at))
+ stmt = stmt.limit(limit)
+
+ convsersations = list(self.db.scalars(stmt).all())
+ logger.info(
+ "Conversation fetched successfully",
+ extra={
+ "user_id": str(user_id),
+ "workspace_id": str(workspace_id),
+ }
+ )
+ return convsersations
+
+ def list_conversations(
+ self,
+ app_id: uuid.UUID,
+ workspace_id: uuid.UUID,
+ user_id: Optional[str] = None,
+ is_draft: Optional[bool] = None,
+ page: int = 1,
+ pagesize: int = 20
+ ) -> tuple[list[Conversation], int]:
+ """
+ List conversations with optional filters and pagination.
+
+ Args:
+ app_id: Application ID filter.
+ workspace_id: Workspace ID filter.
+ user_id: Optional user ID filter.
+ is_draft: Optional draft status filter.
+ page: Page number (1-based).
+ pagesize: Number of items per page.
+
+ Returns:
+ Tuple[List[Conversation], int]: List of Conversation instances and total count.
+ """
+ stmt = select(Conversation).where(
+ Conversation.app_id == app_id,
+ Conversation.workspace_id == workspace_id,
+ Conversation.is_active.is_(True)
+ )
+
+ if user_id:
+ stmt = stmt.where(Conversation.user_id == str(user_id))
+
+ if is_draft is not None:
+ stmt = stmt.where(Conversation.is_draft == is_draft)
+
+ # Calculate total number of records
+ total = int(self.db.execute(
+ select(func.count()).select_from(stmt.subquery())
+ ).scalar_one())
+
+ # Apply pagination
+ stmt = stmt.order_by(desc(Conversation.updated_at))
+ stmt = stmt.offset((page - 1) * pagesize).limit(pagesize)
+
+ conversations = list(self.db.scalars(stmt).all())
+
+ logger.info(
+ "Listed conversations successfully",
+ extra={
+ "app_id": str(app_id),
+ "workspace_id": str(workspace_id),
+ "returned": len(conversations),
+ "total": total
+ }
+ )
+ return conversations, total
+
+ def soft_delete_conversation_by_conversation_id(
+ self,
+ conversation_id: uuid.UUID,
+ workspace_id: uuid.UUID,
+ ):
+ """
+ Soft delete a conversation by setting is_active to False.
+
+ Args:
+ conversation_id: The UUID of the conversation.
+ workspace_id: Workspace ID for verification.
+ """
+ conversation = self.get_conversation_by_conversation_id(
+ conversation_id,
+ workspace_id
+ )
+ conversation.is_active = False
+
+ def get_conversation_detail(
+ self,
+ conversation_id: uuid.UUID
+ ) -> ConversationDetail | None:
+ """
+ Retrieve the detail of a conversation by its ID.
+
+ Args:
+ conversation_id (UUID): The unique identifier of the conversation.
+
+ Returns:
+ ConversationDetail or None: The conversation detail object if found,
+ otherwise None.
+
+ Notes:
+ - This method queries the database but does not modify it.
+ - The caller is responsible for handling the case where None is returned.
+ """
+ stmt = select(ConversationDetail).where(
+ ConversationDetail.conversation_id == conversation_id
+ )
+ detail = self.db.scalars(stmt).first()
+ return detail
+
+ def add_conversation_detail(
+ self,
+ conversation_detail: ConversationDetail,
+ ):
+ """
+ Add a new conversation detail record to the database session.
+
+ Args:
+ conversation_detail (ConversationDetail): The ORM object representing
+ the conversation detail to add.
+
+ Returns:
+ ConversationDetail: The same object added to the session.
+
+ Notes:
+ - This method only adds the object to the current session.
+ - It does not commit the transaction; commit/rollback is handled
+ by the caller.
+ - Useful for batch operations or transactional control.
+ """
+ self.db.add(conversation_detail)
+ return conversation_detail
+
+
+class MessageRepository:
+ """Repository for Message entity, encapsulating CRUD operations."""
+
+ def __init__(self, db: Session):
+ self.db = db
+
+ def add_message(self, message: Message) -> Message:
+ """
+ Add a new message record to the conversation.
+
+ Args:
+ message (Message): The Message ORM object to be added.
+
+ Returns:
+ Message: The same message object added to the conversation.
+
+ Notes:
+ - This method only adds the object to the current conversation.
+ - It does not commit the transaction; commit/rollback should be handled
+ by the caller.
+ - Useful for transactional control or batch operations.
+ """
+ self.db.add(message)
+ return message
+
+ def get_message_by_conversation_id(
+ self,
+ conversation_id: uuid.UUID,
+ limit: Optional[int] = None
+ ) -> list[Message]:
+ """
+ Retrieve messages by conversation ID.
+
+ Args:
+ conversation_id: The UUID of the conversation.
+ limit: Optional limit on the number of messages returned.
+
+ Returns:
+ List[Message]: List of Message instances.
+ """
+ stmt = select(Message).where(
+ Message.conversation_id == conversation_id
+ ).order_by(Message.created_at)
+
+ if limit:
+ stmt = stmt.limit(limit)
+
+ messages = list(self.db.scalars(stmt).all())
+
+ logger.info(
+ "Fetched messages successfully",
+ extra={
+ "conversation_id": str(conversation_id),
+ "returned": len(messages)
+ }
+ )
+ return messages
diff --git a/api/app/schemas/conversation_schema.py b/api/app/schemas/conversation_schema.py
index 63db6685..e119bb4a 100644
--- a/api/app/schemas/conversation_schema.py
+++ b/api/app/schemas/conversation_schema.py
@@ -35,14 +35,14 @@ class ChatRequest(BaseModel):
class Message(BaseModel):
"""消息输出"""
model_config = ConfigDict(from_attributes=True)
-
+
id: uuid.UUID
conversation_id: uuid.UUID
role: str
content: str
meta_data: Optional[Dict[str, Any]] = None
created_at: datetime.datetime
-
+
@field_serializer("created_at", when_used="json")
def _serialize_created_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
@@ -51,7 +51,7 @@ class Message(BaseModel):
class Conversation(BaseModel):
"""会话输出"""
model_config = ConfigDict(from_attributes=True)
-
+
id: uuid.UUID
app_id: uuid.UUID
workspace_id: uuid.UUID
@@ -63,11 +63,11 @@ class Conversation(BaseModel):
is_active: bool
created_at: datetime.datetime
updated_at: datetime.datetime
-
+
@field_serializer("created_at", when_used="json")
def _serialize_created_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
-
+
@field_serializer("updated_at", when_used="json")
def _serialize_updated_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
@@ -84,3 +84,12 @@ class ChatResponse(BaseModel):
message: str
usage: Optional[Dict[str, Any]] = None
elapsed_time: Optional[float] = None
+
+
+# ---------- Conversation Summary Schemas ----------
+class ConversationOut(BaseModel):
+ theme: str
+ theme_intro: str
+ summary: str
+ takeaways: list[str]
+ info_score: int
diff --git a/api/app/schemas/memory_perceptual_schema.py b/api/app/schemas/memory_perceptual_schema.py
index 41b74a36..c2e4517e 100644
--- a/api/app/schemas/memory_perceptual_schema.py
+++ b/api/app/schemas/memory_perceptual_schema.py
@@ -39,10 +39,11 @@ class PerceptualMemoryItem(BaseModel):
id: uuid.UUID = Field(..., description="Unique memory ID")
perceptual_type: PerceptualType = Field(..., description="Type of perception, e.g., text, audio, or video")
file_path: str = Field(..., description="File path in the storage service")
+ file_ext: str = Field(..., description="File extension")
file_name: str = Field(..., description="File name")
- summary: Optional[str] = Field(None, description="摘要")
+ summary: Optional[str] = Field(None, description="summary")
storage_type: FileStorageType = Field(..., description="Storage type for file")
- created_time: Optional[datetime] = Field(None, description="创建时间")
+ created_time: int = Field(..., description="create time")
class Config:
from_attributes = True
@@ -114,6 +115,8 @@ class AudioModal(BaseModel):
class TextModal(BaseModel):
section_count: int
+ title: str
+ first_line: str
class Asset(BaseModel):
diff --git a/api/app/services/conversation_service.py b/api/app/services/conversation_service.py
index 122d0d87..113fe258 100644
--- a/api/app/services/conversation_service.py
+++ b/api/app/services/conversation_service.py
@@ -1,177 +1,290 @@
"""会话服务"""
import uuid
+from datetime import datetime, timedelta
from typing import Annotated
from typing import Optional, List, Tuple
+import json_repair
from fastapi import Depends
-from sqlalchemy import select, desc
+from jinja2 import Template
from sqlalchemy.orm import Session
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException
from app.core.exceptions import ResourceNotFoundException
from app.core.logging_config import get_business_logger
+from app.core.models import RedBearLLM, RedBearModelConfig
from app.db import get_db
-from app.models import Conversation, Message
+from app.models import Conversation, Message, User, ModelType
+from app.models.conversation_model import ConversationDetail
+from app.models.prompt_optimizer_model import RoleType
+from app.repositories.conversation_repository import ConversationRepository, MessageRepository
+from app.schemas.conversation_schema import ConversationOut
+from app.services import workspace_service
+from app.services.model_service import ModelConfigService
logger = get_business_logger()
class ConversationService:
- """会话服务"""
+ """
+ Service layer for managing conversations and messages.
+ Provides methods to create, retrieve, list, and manipulate conversations and messages.
+ Delegates database operations to repositories.
+ """
def __init__(self, db: Session):
self.db = db
+ self.conversation_repo = ConversationRepository(db)
+ self.message_repo = MessageRepository(db)
def create_conversation(
- self,
- app_id: uuid.UUID,
- workspace_id: uuid.UUID,
- user_id: Optional[str] = None,
- title: Optional[str] = None,
- is_draft: bool = False,
- config_snapshot: Optional[dict] = None
+ self,
+ app_id: uuid.UUID,
+ workspace_id: uuid.UUID,
+ user_id: Optional[str] = None,
+ title: Optional[str] = None,
+ is_draft: bool = False,
+ config_snapshot: Optional[dict] = None
) -> Conversation:
- """创建会话"""
- conversation = Conversation(
- app_id=app_id,
- workspace_id=workspace_id,
- user_id=user_id,
- title=title or "新会话",
- is_draft=is_draft,
- config_snapshot=config_snapshot
- )
+ """
+ Create a new conversation in the system.
- self.db.add(conversation)
- self.db.commit()
- self.db.refresh(conversation)
+ Args:
+ app_id (uuid.UUID): The application ID the conversation belongs to.
+ workspace_id (uuid.UUID): Workspace ID for context.
+ user_id (Optional[str]): Optional user ID for the conversation owner.
+ title (Optional[str]): Conversation title. Defaults to 'New Conversation' if not provided.
+ is_draft (bool): Whether the conversation is a draft.
+ config_snapshot (Optional[dict]): Optional configuration snapshot.
- logger.info(
- "创建会话成功",
- extra={
- "conversation_id": str(conversation.id),
- "app_id": str(app_id),
- "workspace_id": str(workspace_id),
- "is_draft": is_draft
- }
- )
+ Returns:
+ Conversation: Newly created Conversation instance.
+ """
+ try:
+ conversation = self.conversation_repo.create_conversation(
+ app_id=app_id,
+ workspace_id=workspace_id,
+ user_id=user_id,
+ title=title or "New Conversation",
+ is_draft=is_draft,
+ config_snapshot=config_snapshot
+ )
+ self.db.commit()
+ self.db.refresh(conversation)
+
+ logger.info(
+ "Create Conversation Success",
+ extra={
+ "conversation_id": str(conversation.id),
+ "app_id": str(app_id),
+ "workspace_id": str(workspace_id),
+ "is_draft": is_draft
+ }
+ )
+ except Exception as e:
+ logger.error(
+ f"Create Conversation Failed - {str(e)}"
+ )
+ self.db.rollback()
+ raise BusinessException(f"Error create Convsersation", code=BizCode.DB_ERROR)
return conversation
def get_conversation(
- self,
- conversation_id: uuid.UUID,
- workspace_id: Optional[uuid.UUID] = None
+ self,
+ conversation_id: uuid.UUID,
+ workspace_id: Optional[uuid.UUID] = None
) -> Conversation:
- """获取会话"""
- stmt = select(Conversation).where(Conversation.id == conversation_id)
+ """
+ Retrieve a conversation by its ID.
- if workspace_id:
- stmt = stmt.where(Conversation.workspace_id == workspace_id)
+ Args:
+ conversation_id (uuid.UUID): The conversation UUID.
+ workspace_id (Optional[uuid.UUID]): Optional workspace UUID to restrict the query.
- conversation = self.db.scalars(stmt).first()
+ Raises:
+ ResourceNotFoundException: If the conversation does not exist.
- if not conversation:
- raise ResourceNotFoundException("会话", str(conversation_id))
+ Returns:
+ Conversation: The requested Conversation instance.
+ """
+ conversation = self.conversation_repo.get_conversation_by_conversation_id(
+ conversation_id=conversation_id,
+ workspace_id=workspace_id
+ )
return conversation
- def list_conversations(
- self,
- app_id: uuid.UUID,
- workspace_id: uuid.UUID,
- user_id: Optional[str] = None,
- is_draft: Optional[bool] = None,
- page: int = 1,
- pagesize: int = 20
- ) -> Tuple[List[Conversation], int]:
- """列出会话"""
- stmt = select(Conversation).where(
- Conversation.app_id == app_id,
- Conversation.workspace_id == workspace_id,
- Conversation.is_active == True
+ def get_user_conversations(
+ self,
+ user_id: uuid.UUID,
+ workspace_id: uuid.UUID,
+ ) -> list[Conversation]:
+ """
+ Retrieve recent conversations for a specific user within a workspace.
+
+ This method delegates persistence logic to the repository layer and
+ applies service-level defaults (e.g. recent conversation limit).
+
+ Args:
+ user_id (uuid.UUID): Unique identifier of the user.
+ workspace_id (uuid.UUID): Workspace scope for the query.
+
+ Returns:
+ list[Conversation]: A list of recent conversation entities.
+ """
+ conversations = self.conversation_repo.get_conversation_by_user_id(
+ user_id,
+ workspace_id,
+ limit=10
)
+ return conversations
- if user_id:
- stmt = stmt.where(Conversation.user_id == user_id)
+ def list_conversations(
+ self,
+ app_id: uuid.UUID,
+ workspace_id: uuid.UUID,
+ user_id: Optional[str] = None,
+ is_draft: Optional[bool] = None,
+ page: int = 1,
+ pagesize: int = 20
+ ) -> Tuple[List[Conversation], int]:
+ """
+ List conversations with optional filters and pagination.
- if is_draft is not None:
- stmt = stmt.where(Conversation.is_draft == is_draft)
+ Args:
+ app_id (uuid.UUID): Application ID filter.
+ workspace_id (uuid.UUID): Workspace ID filter.
+ user_id (Optional[str]): Optional user ID filter.
+ is_draft (Optional[bool]): Optional draft status filter.
+ page (int): Page number, 1-based.
+ pagesize (int): Number of items per page.
- # 总数
- count_stmt = stmt.with_only_columns(Conversation.id)
- total = len(self.db.execute(count_stmt).all())
-
- # 分页
- stmt = stmt.order_by(desc(Conversation.updated_at))
- stmt = stmt.offset((page - 1) * pagesize).limit(pagesize)
-
- conversations = list(self.db.scalars(stmt).all())
+ Returns:
+ Tuple[List[Conversation], int]: A list of Conversation instances and the total count.
+ """
+ conversations, total = self.conversation_repo.list_conversations(
+ app_id=app_id,
+ workspace_id=workspace_id,
+ user_id=user_id,
+ is_draft=is_draft,
+ page=page,
+ pagesize=pagesize
+ )
return conversations, total
def add_message(
- self,
- conversation_id: uuid.UUID,
- role: str,
- content: str,
- meta_data: Optional[dict] = None
+ self,
+ conversation_id: uuid.UUID,
+ role: str,
+ content: str,
+ meta_data: Optional[dict] = None
) -> Message:
- """添加消息"""
- message = Message(
- conversation_id=conversation_id,
- role=role,
- content=content,
- meta_data=meta_data
- )
+ """
+ Add a message to a conversation using UnitOfWork.
- self.db.add(message)
+ Args:
+ conversation_id (uuid.UUID): Conversation UUID.
+ role (str): Role of the message sender ('user' or 'assistant').
+ content (str): Message content.
+ meta_data (Optional[dict]): Optional metadata.
- # 更新会话的消息计数和更新时间
- conversation = self.get_conversation(conversation_id)
- conversation.message_count += 1
+ Returns:
+ Message: Newly created Message instance.
+ """
+ try:
+ conversation = self.conversation_repo.get_conversation_by_conversation_id(
+ conversation_id
+ )
- # 如果是第一条用户消息,可以用它作为标题
- if conversation.message_count == 1 and role == "user":
- conversation.title = content[:50] + ("..." if len(content) > 50 else "")
+ message = Message(
+ conversation_id=conversation_id,
+ role=role,
+ content=content,
+ meta_data=meta_data,
+ )
- self.db.commit()
- self.db.refresh(message)
+ self.message_repo.add_message(message)
- return message
+ conversation.message_count += 1
+
+ if conversation.message_count == 1 and role == "user":
+ conversation.title = (
+ content[:50] + ("..." if len(content) > 50 else "")
+ )
+
+ self.db.commit()
+ self.db.refresh(message)
+
+ logger.info(
+ "Message added successfully",
+ extra={
+ "conversation_id": str(conversation_id),
+ "message_id": str(message.id),
+ "role": role,
+ "content_length": len(content),
+ },
+ )
+
+ return message
+ except Exception as e:
+ logger.error(
+ f"Message added error, db roll back - {str(e)}",
+ extra={
+ "conversation_id": str(conversation_id),
+ "role": role,
+ "content_length": len(content),
+ },
+ )
+ self.db.rollback()
+ raise BusinessException(
+ f"Error adding message, conversation_id={conversation_id}",
+ code=BizCode.DB_ERROR
+ )
def get_messages(
- self,
- conversation_id: uuid.UUID,
- limit: Optional[int] = None
+ self,
+ conversation_id: uuid.UUID,
+ limit: Optional[int] = None
) -> List[Message]:
- """获取会话消息"""
- stmt = select(Message).where(
- Message.conversation_id == conversation_id
- ).order_by(Message.created_at)
+ """
+ Retrieve messages for a conversation.
- if limit:
- stmt = stmt.limit(limit)
+ Args:
+ conversation_id (uuid.UUID): Conversation UUID.
+ limit (Optional[int]): Optional maximum number of messages.
- messages = list(self.db.scalars(stmt).all())
+ Returns:
+ List[Message]: List of messages ordered by creation time.
+ """
+ messages = self.message_repo.get_message_by_conversation_id(
+ conversation_id,
+ limit
+ )
return messages
def get_conversation_history(
- self,
- conversation_id: uuid.UUID,
- max_history: Optional[int] = None
+ self,
+ conversation_id: uuid.UUID,
+ max_history: Optional[int] = None
) -> List[dict]:
- """获取会话历史消息
+ """
+ Retrieve historical conversation messages formatted as dictionaries.
Args:
- conversation_id: 会话ID
- max_history: 最大历史消息数量
+ conversation_id (uuid.UUID): Conversation UUID.
+ max_history (Optional[int]): Maximum number of messages to retrieve.
Returns:
- List[dict]: 历史消息列表,格式为 [{"role": "user", "content": "..."}, ...]
+ List[dict]: List of message dictionaries with keys 'role' and 'content'.
"""
- messages = self.get_messages(conversation_id, limit=max_history)
+ messages = self.message_repo.get_message_by_conversation_id(
+ conversation_id,
+ limit=max_history
+ )
# 转换为字典格式
history = [
@@ -185,20 +298,25 @@ class ConversationService:
return history
def save_conversation_messages(
- self,
- conversation_id: uuid.UUID,
- user_message: str,
- assistant_message: str
+ self,
+ conversation_id: uuid.UUID,
+ user_message: str,
+ assistant_message: str
):
- """保存会话消息(用户消息和助手回复)"""
- # 添加用户消息
+ """
+ Save a pair of user and assistant messages to the conversation.
+
+ Args:
+ conversation_id (uuid.UUID): Conversation UUID.
+ user_message (str): User's message content.
+ assistant_message (str): Assistant's response content.
+ """
self.add_message(
conversation_id=conversation_id,
role="user",
content=user_message
)
- # 添加助手消息
self.add_message(
conversation_id=conversation_id,
role="assistant",
@@ -206,7 +324,7 @@ class ConversationService:
)
logger.debug(
- "保存会话消息成功",
+ "Saved conversation messages successfully",
extra={
"conversation_id": str(conversation_id),
"user_message_length": len(user_message),
@@ -215,35 +333,59 @@ class ConversationService:
)
def delete_conversation(
- self,
- conversation_id: uuid.UUID,
- workspace_id: uuid.UUID
+ self,
+ conversation_id: uuid.UUID,
+ workspace_id: uuid.UUID
):
- """删除会话(软删除)"""
- conversation = self.get_conversation(conversation_id, workspace_id)
- conversation.is_active = False
+ """
+ Soft delete a conversation.
- self.db.commit()
+ Args:
+ conversation_id (uuid.UUID): Conversation UUID.
+ workspace_id (uuid.UUID): Workspace UUID for validation.
+ """
+ try:
+ self.conversation_repo.soft_delete_conversation_by_conversation_id(
+ conversation_id,
+ workspace_id
+ )
+ self.db.commit()
- logger.info(
- "删除会话成功",
- extra={
- "conversation_id": str(conversation_id),
- "workspace_id": str(workspace_id)
- }
- )
+ logger.info(
+ "Soft deleted conversation successfully",
+ extra={
+ "conversation_id": str(conversation_id),
+ "workspace_id": str(workspace_id)
+ }
+ )
+ except Exception as e:
+ self.db.rollback()
+ logger.error(
+ f"Error deleting conversation, conversation_id={conversation_id} - {str(e)}",
+ )
+ raise BusinessException("Error deleting conversation", code=BizCode.DB_ERROR)
def create_or_get_conversation(
- self,
- app_id: uuid.UUID,
- workspace_id: uuid.UUID,
- is_draft: bool = False,
- conversation_id: Optional[uuid.UUID] = None,
- user_id: Optional[str] = None,
+ self,
+ app_id: uuid.UUID,
+ workspace_id: uuid.UUID,
+ is_draft: bool = False,
+ conversation_id: Optional[uuid.UUID] = None,
+ user_id: Optional[str] = None,
) -> Conversation:
- """创建或获取会话"""
+ """
+ Retrieve an existing conversation by ID or create a new one.
- # 如果提供了 conversation_id,尝试获取现有会话
+ Args:
+ app_id (uuid.UUID): Application ID.
+ workspace_id (uuid.UUID): Workspace ID.
+ is_draft (bool): Whether the conversation should be a draft.
+ conversation_id (Optional[uuid.UUID]): Optional conversation ID to retrieve.
+ user_id (Optional[str]): Optional user ID.
+
+ Returns:
+ Conversation: Existing or newly created conversation.
+ """
if conversation_id:
try:
conversation = self.get_conversation(
@@ -253,11 +395,14 @@ class ConversationService:
# 验证会话是否属于该应用
if conversation.app_id != app_id:
- raise BusinessException("会话不属于该应用", BizCode.INVALID_CONVERSATION)
+ raise BusinessException(
+ "Conversation does not belong to this app",
+ BizCode.INVALID_CONVERSATION
+ )
return conversation
except ResourceNotFoundException:
logger.warning(
- "会话不存在,将创建新会话",
+ "Conversation not found. A new conversation will be created.",
extra={"conversation_id": str(conversation_id)}
)
@@ -270,15 +415,179 @@ class ConversationService:
)
logger.info(
- "为分享链接创建新会话"
+ "Created a new conversation for shared link usage",
+ extra={
+ "conversation_id": str(conversation_id),
+ }
)
return conversation
-# ==================== 依赖注入函数 ====================
+ async def get_conversation_detail(
+ self,
+ user: User,
+ conversation_id: uuid.UUID,
+ workspace_id: uuid.UUID,
+ language: str = "zh"
+ ) -> ConversationOut:
+ """
+ Retrieve or generate the summary and theme of a conversation.
+
+ This method first attempts to fetch the conversation detail from the repository.
+ If no detail exists or the conversation is outdated (>1 day), it generates a new
+ summary using the configured LLM model, stores it, and returns it.
+
+ Args:
+ user (User): The user requesting the conversation summary.
+ conversation_id (UUID): Unique identifier of the conversation.
+ workspace_id (UUID): Identifier of the workspace where the conversation belongs.
+ language (str, optional): Language for the summary generation. Defaults to "zh".
+
+ Returns:
+ ConversationOut: An object containing the conversation's theme, summary,
+ takeaways, and information score.
+
+ Raises:
+ BusinessException: If the workspace model is not configured, the model does
+ not exist, API keys are missing, or the LLM output is invalid.
+
+ Notes:
+ - If conversation details exist and are recent, they are returned directly.
+ - LLM generation uses system and user prompt templates from the filesystem.
+ - JSON repair is applied to ensure model outputs can be safely parsed.
+ - Commits the new conversation detail only if it is generated or outdated.
+ """
+ logger.info(f"Fetching conversation detail for conversation_id={conversation_id}, workspace_id={workspace_id}")
+
+ conversation_detail = self.conversation_repo.get_conversation_detail(
+ conversation_id=conversation_id,
+ )
+ conversation = self.get_conversation(
+ conversation_id=conversation_id,
+ )
+ if conversation_detail:
+ logger.info(f"Conversation detail found in repository for conversation_id={conversation_id}")
+ return ConversationOut(
+ theme=conversation_detail.theme,
+ theme_intro=conversation_detail.theme_intro,
+ summary=conversation_detail.summary,
+ takeaways=conversation_detail.takeaways,
+ info_score=conversation_detail.info_score,
+ )
+ logger.info("Conversation detail not found, generating new summary using LLM")
+ configs = workspace_service.get_workspace_models_configs(
+ db=self.db,
+ workspace_id=workspace_id,
+ user=user
+ )
+ model_id = configs.get('llm')
+ if not model_id:
+ logger.error(f"Workspace model configuration not found for workspace_id={workspace_id}")
+ raise BusinessException("Workspace model configuration not found. Please configure a model first.", code=BizCode.MODEL_NOT_FOUND)
+ config = ModelConfigService.get_model_by_id(db=self.db, model_id=model_id)
+
+ if not config:
+ logger.error("Configured model not found for model_id={model_id}")
+ raise BusinessException("Configured model does not exist.", BizCode.NOT_FOUND)
+
+ if not config.api_keys or len(config.api_keys) == 0:
+ logger.error(f"Model API keys missing for model_id={model_id}", )
+ raise BusinessException("Model configuration missing API keys.", BizCode.INVALID_PARAMETER)
+
+ api_config = config.api_keys[0]
+ model_name = api_config.model_name
+ provider = api_config.provider
+ api_key = api_config.api_key
+ api_base = api_config.api_base
+ model_type = config.type
+
+ llm = RedBearLLM(
+ RedBearModelConfig(
+ model_name=model_name,
+ provider=provider,
+ api_key=api_key,
+ base_url=api_base
+ ),
+ type=ModelType(model_type)
+ )
+
+ conversation_messages = self.get_conversation_history(
+ conversation_id=conversation_id,
+ max_history=30
+ )
+
+ with open('app/services/prompt/conversation_summary_system.jinja2', 'r', encoding='utf-8') as f:
+ system_prompt = f.read()
+ rendered_system_message = Template(system_prompt).render()
+
+ with open('app/services/prompt/conversation_summary_user.jinja2', 'r', encoding='utf-8') as f:
+ user_prompt = f.read()
+ rendered_user_message = Template(user_prompt).render(
+ language=language,
+ conversation=str(conversation_messages)
+ )
+ messages = [
+ (RoleType.SYSTEM, rendered_system_message),
+ (RoleType.USER, rendered_user_message),
+ ]
+ logger.info(f"Invoking LLM for conversation_id={conversation_id}")
+ model_resp = await llm.ainvoke(messages)
+ try:
+ if isinstance(model_resp.content, str):
+ result = json_repair.repair_json(model_resp.content, return_objects=True)
+ elif isinstance(model_resp.content, list):
+ result = json_repair.repair_json(model_resp.content[0].get("text"), return_objects=True)
+ elif isinstance(model_resp.content, dict):
+ result = model_resp.content
+ else:
+ raise BusinessException("Unexpect model output", code=BizCode.LLM_ERROR)
+ except Exception as e:
+ logger.exception(f"Failed to parse LLM response for conversation_id={conversation_id}")
+ raise BusinessException("Failed to parse LLM response", code=BizCode.LLM_ERROR) from e
+
+ summary = result.get('summary', "")
+ theme = result.get('theme', "")
+ theme_intro = result.get("theme_intro", "")
+ takeaways = result.get("takeaways") or []
+ info_score = result.get("info_score", 50)
+
+ if datetime.now() - conversation.updated_at > timedelta(days=1):
+ logger.info(f"Updating conversation detail in DB for conversation_id={conversation_id}")
+ conversation_detail = ConversationDetail(
+ conversation_id=conversation.id,
+ summary=summary,
+ theme=theme,
+ theme_intro=theme_intro,
+ takeaways=takeaways,
+ info_score=info_score
+ )
+ self.conversation_repo.add_conversation_detail(conversation_detail)
+
+ self.db.commit()
+ self.db.refresh(conversation_detail)
+ logger.info(f"Returning conversation summary for conversation_id={conversation_id}")
+ conversation_out = ConversationOut(
+ theme=theme,
+ theme_intro=theme_intro,
+ summary=summary,
+ takeaways=takeaways,
+ info_score=info_score
+ )
+ return conversation_out
+
+
+# ==================== Dependency Injection ====================
def get_conversation_service(
db: Annotated[Session, Depends(get_db)]
) -> ConversationService:
- """获取工作流服务(依赖注入)"""
+ """
+ Dependency injection function to provide ConversationService instance.
+
+ Args:
+ db (Session): Database session provided by FastAPI dependency.
+
+ Returns:
+ ConversationService: Service instance.
+ """
return ConversationService(db)
diff --git a/api/app/services/memory_perceptual_service.py b/api/app/services/memory_perceptual_service.py
index a74dc5a7..5fafe48d 100644
--- a/api/app/services/memory_perceptual_service.py
+++ b/api/app/services/memory_perceptual_service.py
@@ -99,7 +99,7 @@ class MemoryPerceptualService:
"keywords": content.keywords,
"topic": content.topic,
"domain": content.domain,
- "created_time": memory.created_time.isoformat() if memory.created_time else None,
+ "created_time": int(memory.created_time.timestamp()*1000),
**detail
}
@@ -141,8 +141,9 @@ class MemoryPerceptualService:
perceptual_type=PerceptualType(memory.perceptual_type),
file_path=memory.file_path,
file_name=memory.file_name,
+ file_ext=memory.file_ext,
summary=memory.summary,
- created_time=memory.created_time,
+ created_time=int(memory.created_time.timestamp()*1000),
storage_type=FileStorageType(memory.storage_service),
)
memory_items.append(memory_item)
diff --git a/api/app/services/prompt/conversation_summary_system.jinja2 b/api/app/services/prompt/conversation_summary_system.jinja2
new file mode 100644
index 00000000..256ae4bf
--- /dev/null
+++ b/api/app/services/prompt/conversation_summary_system.jinja2
@@ -0,0 +1,50 @@
+{% raw %}
+# Role Definition
+You are a professional dialogue content summarizer, specializing in extracting core information from multi-turn conversations between users and AI. Your goal is to generate concise, accurate summaries with extended key fields that help users quickly grasp the conversation's theme, key points, and value.
+
+# Core Rules
+- **Mandatory Rules**:
+ 1. Fully extract explicit user requests (questions/tasks) without omitting key details;
+ 2. Accurately summarize AI’s core responses (explanations/guidance) aligned with user requests;
+ 3. Reflect cause-and-effect relationships in multi-turn interactions (follow-up questions, clarifications);
+ 4. Clearly identify and describe the conversation’s theme, key收获 (takeaways), and other required extended fields.
+- **Constraints**:
+ 1. Do not add unmentioned information or subjective assumptions;
+ 2. Avoid vague expressions (e.g., "the user asked some questions"); be specific;
+ 3. For repetitive content (same question multiple times), keep only the initial request and final response.
+
+# Input Processing
+- Reading Order: Chronological sentence-by-sentence reading;
+- Priority: User requests > AI responses > interaction logic > theme/takeaway extraction;
+- Exception Handling: If the conversation is empty/invalid (only greetings, no substantive content), output "The conversation content is invalid and a summary cannot be generated."
+
+# Execution Process
+1. **Information Extraction**:
+ - Input: {{conversation}}
+ - Operation: Label user requests, AI responses, interaction nodes, conversation theme (core topic), and takeaways (key insights/results) sentence by sentence;
+2. **Logic Organization**:
+ - Input: Labeled extracted information
+ - Operation: Match requests with responses, organize interaction progression, and associate theme/takeaways with core content;
+3. **Summary Generation**:
+ - Input: Organized logical relationships and extended fields
+ - Operation: Integrate core information, theme, and takeaways into coherent language, ensuring all key elements are covered while removing redundancy.
+
+# Output Specifications (JSON Format)
+- Language: Please strictly output content in the language specified by the tag.
+- Structure: JSON object with five fields,:
+ 1. `theme`: A concise phrase describing the conversation’s core topic (e.g., "inquiry about delivery time rules");
+ 2. `theme_intro`: A brief explanation of the conversation’s core theme to clarify its specific scope and focus (e.g., "The conversation focuses on the user's inquiry about delivery time standards for regular and remote areas");
+ 3. `summary`: A single sentence including "user request + AI response + interaction logic" (≤100 words);
+ 4. `takeaways`: A list of brief bullet-point takeaways summarizing the key points from the conversation (e.g., ["User clarified delivery time differences between regular and remote areas"]).
+ 5. `info_score`: Numerical score (0–100) representing conversation information richness.
+- Language Style: Concise, objective, conversational (avoid overly formal terms).
+
+# Example JSON Output
+{
+ "theme": string,
+ "theme_intro": string,
+ "summary": string,
+ "takeaways": array[string],
+ "info_score": 85
+}
+{% endraw %}
diff --git a/api/app/services/prompt/conversation_summary_user.jinja2 b/api/app/services/prompt/conversation_summary_user.jinja2
new file mode 100644
index 00000000..51efe34e
--- /dev/null
+++ b/api/app/services/prompt/conversation_summary_user.jinja2
@@ -0,0 +1,2 @@
+{{ language }}
+{{ conversation }}
\ No newline at end of file
diff --git a/api/app/templates/prompt/prompt_optimizer_system.jinja2 b/api/app/services/prompt/prompt_optimizer_system.jinja2
similarity index 100%
rename from api/app/templates/prompt/prompt_optimizer_system.jinja2
rename to api/app/services/prompt/prompt_optimizer_system.jinja2
diff --git a/api/app/templates/prompt/prompt_optimizer_user.jinja2 b/api/app/services/prompt/prompt_optimizer_user.jinja2
similarity index 100%
rename from api/app/templates/prompt/prompt_optimizer_user.jinja2
rename to api/app/services/prompt/prompt_optimizer_user.jinja2
diff --git a/api/app/services/prompt_optimizer_service.py b/api/app/services/prompt_optimizer_service.py
index 135ddc5d..c6142c01 100644
--- a/api/app/services/prompt_optimizer_service.py
+++ b/api/app/services/prompt_optimizer_service.py
@@ -3,9 +3,8 @@ import uuid
from typing import Any, AsyncGenerator
import json_repair
-from langchain_core.prompts import ChatPromptTemplate
-from sqlalchemy.orm import Session
from jinja2 import Template
+from sqlalchemy.orm import Session
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException
@@ -177,11 +176,11 @@ class PromptOptimizerService:
base_url=api_config.api_base
), type=ModelType(model_config.type))
try:
- with open('app/templates/prompt/prompt_optimizer_system.jinja2', 'r', encoding='utf-8') as f:
+ with open('app/services/prompt/prompt_optimizer_system.jinja2', 'r', encoding='utf-8') as f:
opt_system_prompt = f.read()
rendered_system_message = Template(opt_system_prompt).render()
- with open('app/templates/prompt/prompt_optimizer_user.jinja2', 'r', encoding='utf-8') as f:
+ with open('app/services/prompt/prompt_optimizer_user.jinja2', 'r', encoding='utf-8') as f:
opt_user_prompt = f.read()
except FileNotFoundError:
raise BusinessException(message="System prompt template not found", code=BizCode.NOT_FOUND)