Compare commits

..

63 Commits

Author SHA1 Message Date
Mark
0f092e08f4 Merge pull request #658 from SuanmoSuanyangTechnology/fix/features_028
fix(app)
2026-03-20 20:19:17 +08:00
Timebomb2018
8e7603bcc4 fix(app): Multimodal file processing 2026-03-20 20:17:42 +08:00
Mark
a079358028 Merge pull request #657 from SuanmoSuanyangTechnology/fix/features_028
fix(app)
2026-03-20 19:54:37 +08:00
Timebomb2018
fa29a39920 fix(app): release notes 2026-03-20 19:52:28 +08:00
Mark
2146c555d2 Merge pull request #656 from SuanmoSuanyangTechnology/fix/features_028
fix(app)
2026-03-20 19:51:18 +08:00
Timebomb2018
240f1d431b fix(app): Multimodal file storage 2026-03-20 19:45:41 +08:00
Mark
726148d7ee Merge pull request #649 from SuanmoSuanyangTechnology/fix/features_028
fix(app)
2026-03-20 15:41:00 +08:00
Timebomb2018
0f1b1d7d10 fix(app): The processing features of the application 2026-03-20 15:36:04 +08:00
Mark
11aa2e1f9e Merge pull request #648 from SuanmoSuanyangTechnology/fix/features_028
Fix(app)
2026-03-20 15:18:07 +08:00
Timebomb2018
ca654cca74 Merge branch 'refs/heads/release/v0.2.8' into fix/features_028 2026-03-20 15:15:07 +08:00
Timebomb2018
bd1f649bd0 fix(app): The processing features of the application 2026-03-20 15:14:50 +08:00
Ke Sun
ea00747c66 Merge pull request #645 from SuanmoSuanyangTechnology/fix/features_028
Fix(app)
2026-03-20 14:38:30 +08:00
Timebomb2018
3db031891e Merge branch 'refs/heads/release/v0.2.8' into fix/features_028 2026-03-20 14:20:51 +08:00
Timebomb2018
fb6ca3909a fix(app): The copy processing features of the application 2026-03-20 14:20:23 +08:00
Mark
929afb1770 Merge pull request #644 from SuanmoSuanyangTechnology/fix/features_028
fix(app)
2026-03-20 13:47:49 +08:00
yujiangping
6235584b2e Merge branch 'release/v0.2.8' of github.com:SuanmoSuanyangTechnology/MemoryBear into release/v0.2.8 2026-03-20 12:33:55 +08:00
yujiangping
0b1ea33b41 fix:office view 2026-03-20 12:13:04 +08:00
Timebomb2018
3929f811b8 fix(app): The import and export processing features of the application 2026-03-20 12:05:35 +08:00
yingzhao
551a2b59a5 Merge pull request #642 from SuanmoSuanyangTechnology/fix/v0.2.8_zy
fix(web): editor bug
2026-03-20 10:59:59 +08:00
zhaoying
9a765ac71e fix(web): editor bug 2026-03-20 10:58:58 +08:00
yingzhao
83e26732de Merge pull request #641 from SuanmoSuanyangTechnology/fix/v0.2.8_zy
fix(web): max_file_count limit 1
2026-03-20 10:52:28 +08:00
zhaoying
52fdfc7744 fix(web): max_file_count limit 1 2026-03-20 10:49:04 +08:00
Mark
4e544325a0 Merge pull request #640 from SuanmoSuanyangTechnology/fix/features_028
fix(file)
2026-03-19 22:02:33 +08:00
Timebomb2018
99a2f396fd Merge branch 'refs/heads/release/v0.2.8' into fix/features_028 2026-03-19 22:00:18 +08:00
Timebomb2018
0157c9d262 fix(file): Routing repair 2026-03-19 21:59:00 +08:00
Mark
5ddacab162 Merge pull request #639 from SuanmoSuanyangTechnology/fix/features_028
fix(app features)
2026-03-19 21:48:47 +08:00
Timebomb2018
a51e34852c fix(app features): Support for xls and doc files 2026-03-19 21:41:45 +08:00
Mark
36f670b2e9 Merge pull request #627 from SuanmoSuanyangTechnology/fix/features_028
Fix(bug)
2026-03-19 20:50:55 +08:00
Mark
cbcbc8822c Merge pull request #631 from wanxunyang/feature/permanent-file-url-wxy
feat: add file storage controller with OSS/S3 support
2026-03-19 20:49:46 +08:00
yingzhao
aa2d1e7a35 Merge pull request #637 from SuanmoSuanyangTechnology/fix/v0.2.8_zy
fix(web): url add check rules
2026-03-19 20:36:41 +08:00
Ke Sun
39b2f3ba0e Merge pull request #633 from SuanmoSuanyangTechnology/fix/knowledge-retrieval
fix(workflow): enable nested search in knowledge base retrieval node
2026-03-19 20:34:09 +08:00
zhaoying
43064ab71b fix(web): url add check rules 2026-03-19 20:33:14 +08:00
yingzhao
4144f0b9b5 Merge pull request #636 from SuanmoSuanyangTechnology/fix/v0.2.8_zy
fix(web): file type required
2026-03-19 20:30:40 +08:00
zhaoying
08f0be17ce fix(web): file type required 2026-03-19 20:28:22 +08:00
yingzhao
2915e464bf Merge pull request #635 from SuanmoSuanyangTechnology/fix/v0.2.8_zy
Fix/v0.2.8 zy
2026-03-19 20:25:47 +08:00
Ke Sun
152559ae46 Merge pull request #634 from SuanmoSuanyangTechnology/fix/celery
[changes] Modify the execution conditions of the task
2026-03-19 20:24:43 +08:00
zhaoying
1f531f1ace fix(web): community node validate key 2026-03-19 20:24:16 +08:00
zhaoying
7ec947189c fix(web): update file type 2026-03-19 20:19:30 +08:00
lanceyq
b4615bacdc [changes] Modify the execution conditions of the task 2026-03-19 20:17:43 +08:00
Eternity
e849fed5c1 fix(workflow): enable nested search in knowledge base retrieval node 2026-03-19 19:53:47 +08:00
yingzhao
0f5cae4590 Merge pull request #632 from SuanmoSuanyangTechnology/fix/v0.2.8_zy
fix(web): ui update
2026-03-19 19:46:53 +08:00
zhaoying
1c3029f360 fix(web): ui update 2026-03-19 19:45:58 +08:00
wxy
e2411e0bdd fix: remove unused share_info variable in upload_file_with_share_token 2026-03-19 19:43:48 +08:00
Mark
7af88b19cf Merge pull request #629 from SuanmoSuanyangTechnology/fix/conversation-msgmetadata
fix(conversation): handle None meta_data in msg to prevent exceptions
2026-03-19 19:35:11 +08:00
Eternity
c3f8dbd4bc fix(conversation): handle None meta_data in msg to prevent exceptions 2026-03-19 19:27:58 +08:00
Ke Sun
c1e48fde86 Merge pull request #630 from SuanmoSuanyangTechnology/fix/celery
[changes]Community node attribute check
2026-03-19 19:26:52 +08:00
lanceyq
f644c84fbb [changes]Community node attribute check 2026-03-19 19:24:37 +08:00
yingzhao
d0afce27c4 Merge pull request #628 from SuanmoSuanyangTechnology/fix/v0.2.8_zy
Fix/v0.2.8 zy
2026-03-19 19:01:46 +08:00
zhaoying
b84aba71e7 feat(web): file add status 2026-03-19 19:00:31 +08:00
Timebomb2018
2e481df465 Merge branch 'refs/heads/release/v0.2.8' into fix/features_028 2026-03-19 18:59:18 +08:00
Timebomb2018
a322ec4fd5 fix(bug): tool exception display 2026-03-19 18:58:37 +08:00
Mark
bdbf9c0609 Merge pull request #626 from SuanmoSuanyangTechnology/fix/workmemory-conversations
feat(memory): add pagination support for conversation list in working memory
2026-03-19 18:52:11 +08:00
Ke Sun
ef7d59e442 Merge pull request #625 from SuanmoSuanyangTechnology/fix/reserve
[changes] keep two decimals
2026-03-19 18:52:09 +08:00
zhaoying
27b782e12a feat(web): work memory support page 2026-03-19 18:41:33 +08:00
Eternity
37a22fbfa9 feat(memory): add pagination support for conversation list in working memory 2026-03-19 18:23:09 +08:00
Mark
d798d101f7 Merge pull request #623 from SuanmoSuanyangTechnology/fix/workmemory-conversations
feat(memory): add pagination support for conversation list in working memory
2026-03-19 17:59:48 +08:00
Mark
825f225f63 Merge pull request #622 from SuanmoSuanyangTechnology/fix/features_028
fix(agent features):
2026-03-19 17:59:00 +08:00
Timebomb2018
4d5e2958dc Merge branch 'refs/heads/release/v0.2.8' into fix/features_028 2026-03-19 17:58:17 +08:00
Timebomb2018
6105d46198 fix(bug): bug fix 2026-03-19 17:54:32 +08:00
lanceyq
7aec157859 [changes] keep two decimals 2026-03-19 17:53:01 +08:00
Eternity
13abb03d87 feat(memory): add pagination support for conversation list in working memory 2026-03-19 17:49:16 +08:00
wxy
e8947ad0bb feat: add permanent public URL support for remote storage (OSS/S3) 2026-03-19 17:48:46 +08:00
Timebomb2018
7056865726 fix(agent features):
1. Historical multimodal message writing is incorporated into the conversation context;
2. Resolve the issues where csv, json, and txt files cannot be recognized due to encoding problems;
3. File quantity limit;
4. Error details
2026-03-19 17:25:44 +08:00
91 changed files with 1847 additions and 2542 deletions

2
.gitignore vendored
View File

@@ -25,8 +25,6 @@ examples/
time.log
celerybeat-schedule.db
search_results.json
redbear-mem-metrics/
pitch-deck/
api/migrations/versions
tmp

View File

@@ -13,7 +13,6 @@ from . import (
document_controller,
emotion_config_controller,
emotion_controller,
end_user_controller,
file_controller,
file_storage_controller,
home_page_controller,
@@ -97,6 +96,5 @@ manager_router.include_router(file_storage_controller.router)
manager_router.include_router(ontology_controller.router)
manager_router.include_router(skill_controller.router)
manager_router.include_router(i18n_controller.router)
manager_router.include_router(end_user_controller.router)
__all__ = ["manager_router"]

View File

@@ -1,48 +0,0 @@
"""End User 管理接口 - 无需认证"""
from app.core.logging_config import get_business_logger
from app.core.response_utils import success
from app.db import get_db
from app.repositories.end_user_repository import EndUserRepository
from app.schemas.memory_api_schema import (
CreateEndUserRequest,
CreateEndUserResponse,
)
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
router = APIRouter(prefix="/end_users", tags=["End Users"])
logger = get_business_logger()
@router.post("")
async def create_end_user(
data: CreateEndUserRequest,
db: Session = Depends(get_db),
):
"""
Create an end user.
Creates a new end user for the given workspace.
If an end user with the same other_id already exists in the workspace,
returns the existing one.
"""
logger.info(f"Create end user request - other_id: {data.other_id}, workspace_id: {data.workspace_id}")
end_user_repo = EndUserRepository(db)
end_user = end_user_repo.get_or_create_end_user(
app_id=None,
workspace_id=data.workspace_id,
other_id=data.other_id,
)
logger.info(f"End user ready: {end_user.id}")
result = {
"id": str(end_user.id),
"other_id": end_user.other_id or "",
"other_name": end_user.other_name or "",
"workspace_id": str(end_user.workspace_id),
}
return success(data=CreateEndUserResponse(**result).model_dump(), msg="End user created successfully")

View File

@@ -91,7 +91,7 @@ async def upload_file(
if file_size > settings.MAX_FILE_SIZE:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
status_code=status.HTTP_413_CONTENT_TOO_LARGE,
detail=f"The file size exceeds the {settings.MAX_FILE_SIZE} byte limit"
)
@@ -172,7 +172,6 @@ async def upload_file_with_share_token(
# Get share and release info from share_token
service = ReleaseShareService(db)
share_info = service.get_shared_release_info(share_token=share_data.share_token)
# Get share object to access app_id
share = service.repo.get_by_share_token(share_data.share_token)
@@ -499,6 +498,51 @@ async def get_file_url(
)
@router.get("/files/{file_id}/public-url", response_model=ApiResponse)
async def get_permanent_file_url(
file_id: uuid.UUID,
db: Session = Depends(get_db),
storage_service: FileStorageService = Depends(get_file_storage_service),
):
"""
获取文件的永久公开 URL(无过期时间)
- 本地存储:返回 API 永久访问地址(基于 FILE_LOCAL_SERVER_URL 配置)
- 远程存储(OSS/S3):返回 bucket 公读地址(需 bucket 已配置公共读权限)
"""
file_metadata = db.query(FileMetadata).filter(FileMetadata.id == file_id).first()
if not file_metadata:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="The file does not exist")
if file_metadata.status != "completed":
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File upload not completed, status: {file_metadata.status}")
file_key = file_metadata.file_key
storage = storage_service.storage
try:
if isinstance(storage, LocalStorage):
url = f"{settings.FILE_LOCAL_SERVER_URL}/storage/permanent/{file_id}"
else:
url = await storage.get_permanent_url(file_key)
if not url:
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Permanent URL not supported for current storage backend")
api_logger.info(f"Generated permanent URL: file_id={file_id}")
return success(
data={"url": url, "expires_in": None, "permanent": True, "file_name": file_metadata.file_name},
msg="Permanent file URL generated successfully"
)
except HTTPException:
raise
except Exception as e:
api_logger.error(f"Failed to generate permanent URL: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to generate permanent URL: {str(e)}")
@router.get("/public/{file_id}", response_model=Any)
async def public_download_file(
request: Request,

View File

@@ -195,10 +195,9 @@ async def get_workspace_end_users(
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
# 触发社区聚类补全任务(异步,不阻塞接口响应)
# 对有 ExtractedEntity 但无 Community 节点的存量用户自动补跑全量聚类
try:
from app.tasks import init_community_clustering_for_users
init_community_clustering_for_users.delay(end_user_ids=end_user_ids)
init_community_clustering_for_users.delay(end_user_ids=end_user_ids, workspace_id=str(workspace_id))
api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}")
except Exception as e:
api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")

View File

@@ -33,35 +33,47 @@ def get_memory_count(
@router.get("/{end_user_id}/conversations", response_model=ApiResponse)
def get_conversations(
end_user_id: uuid.UUID,
page: int = 1,
pagesize: int = 20,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Retrieve all conversations for the current user in a specific group.
Retrieve conversations for the current user in a specific group with pagination.
Args:
end_user_id (UUID): The group identifier.
page (int): Page number (1-based). Defaults to 1.
pagesize (int): Number of items per page. Defaults to 20.
current_user (User, optional): The authenticated user.
db (Session, optional): SQLAlchemy session.
Returns:
ApiResponse: Contains a list of conversation IDs.
Notes:
- Initializes the ConversationService with the current DB session.
- Returns only conversation IDs for lightweight response.
- Logs can be added to trace requests in production.
ApiResponse: Contains a paginated list of conversations.
"""
page = max(1, page)
page_size = max(1, min(pagesize, 100)) # Limit page size between 1 and 100
conversation_service = ConversationService(db)
conversations = conversation_service.get_user_conversations(
end_user_id
conversations, total = conversation_service.get_user_conversations(
end_user_id,
page=page,
page_size=page_size
)
return success(data=[
{
"id": conversation.id,
"title": conversation.title
} for conversation in conversations
], msg="get conversations success")
return success(data={
"items": [
{
"id": conversation.id,
"title": conversation.title
} for conversation in conversations
],
"total": total,
"page": {
"page": page,
"pagesize": page_size,
"total": total,
"hasnext": (page * page_size) < total
},
}, msg="get conversations success")
@router.get("/{end_user_id}/messages", response_model=ApiResponse)

View File

@@ -6,7 +6,6 @@ from app.core.response_utils import success
from app.db import get_db
from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.memory_api_schema import (
ListConfigsResponse,
MemoryReadRequest,
MemoryReadResponse,
MemoryWriteRequest,
@@ -32,15 +31,14 @@ async def write_memory_api_service(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(..., description="Message content"),
payload: MemoryWriteRequest = Body(..., embed=False),
):
"""
Write memory to storage.
Stores memory content for the specified end user using the Memory API Service.
"""
body = await request.json()
payload = MemoryWriteRequest(**body)
logger.info(f"Memory write request - end_user_id: {payload.end_user_id}, workspace_id: {api_key_auth.workspace_id}")
memory_api_service = MemoryAPIService(db)
@@ -64,15 +62,13 @@ async def read_memory_api_service(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(..., description="Query message"),
payload: MemoryReadRequest = Body(..., embed=False),
):
"""
Read memory from storage.
Queries and retrieves memories for the specified end user with context-aware responses.
"""
body = await request.json()
payload = MemoryReadRequest(**body)
logger.info(f"Memory read request - end_user_id: {payload.end_user_id}")
memory_api_service = MemoryAPIService(db)
@@ -89,27 +85,3 @@ async def read_memory_api_service(
logger.info(f"Memory read successful for end_user: {payload.end_user_id}")
return success(data=MemoryReadResponse(**result).model_dump(), msg="Memory read successfully")
@router.get("/configs")
@require_api_key(scopes=["memory"])
async def list_memory_configs(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
List all memory configs for the workspace.
Returns all available memory configurations associated with the authorized workspace.
"""
logger.info(f"List configs request - workspace_id: {api_key_auth.workspace_id}")
memory_api_service = MemoryAPIService(db)
result = memory_api_service.list_memory_configs(
workspace_id=api_key_auth.workspace_id,
)
logger.info(f"Listed {result['total']} configs for workspace: {api_key_auth.workspace_id}")
return success(data=ListConfigsResponse(**result).model_dump(), msg="Configs listed successfully")

View File

@@ -76,6 +76,8 @@ async def get_tool_methods(
if methods is None:
raise HTTPException(status_code=404, detail="工具不存在")
return success(data=methods, msg="获取工具方法成功")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -121,6 +123,8 @@ async def create_tool(
raise HTTPException(status_code=400, detail=e.message)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -149,6 +153,8 @@ async def update_tool(
return success(msg="工具更新成功")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -191,6 +197,8 @@ async def set_tool_active(
return success(msg=f"工具已{action}")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -223,6 +231,8 @@ async def execute_tool(
},
msg="工具执行完成"
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -97,6 +97,7 @@ class Settings:
# File Upload
MAX_FILE_SIZE: int = int(os.getenv("MAX_FILE_SIZE", "52428800"))
MAX_FILE_COUNT: int = int(os.getenv("MAX_FILE_COUNT", "20"))
FILE_PATH: str = os.getenv("FILE_PATH", "/files")
FILE_URL_EXPIRES: int = int(os.getenv("FILE_URL_EXPIRES", "3600"))

View File

@@ -155,7 +155,7 @@ async def clean_databases(data) -> str:
# Process reranked results
reranked = results.get('reranked_results', {})
if reranked:
for category in ['summaries', 'communities', 'statements', 'chunks', 'entities']:
for category in ['summaries', 'statements', 'chunks', 'entities']:
items = reranked.get(category, [])
if isinstance(items, list):
content_list.extend(items)
@@ -169,18 +169,11 @@ async def clean_databases(data) -> str:
elif isinstance(time_search, list):
content_list.extend(time_search)
# Extract text content,对 community 按 name 去重(多次 tool 调用会产生重复)
# Extract text content
text_parts = []
seen_community_names = set()
for item in content_list:
if isinstance(item, dict):
# community 节点用 name 去重
if 'member_count' in item or 'core_entities' in item:
community_name = item.get('name') or item.get('id', '')
if community_name in seen_community_names:
continue
seen_community_names.add(community_name)
text = item.get('statement') or item.get('content') or item.get('summary', '')
text = item.get('statement') or item.get('content', '')
if text:
text_parts.append(text)
elif isinstance(item, str):
@@ -361,11 +354,7 @@ async def retrieve(state: ReadState) -> ReadState:
)
time_retrieval_tool = create_time_retrieval_tool(end_user_id)
search_params = {
"end_user_id": end_user_id,
"return_raw_results": True,
"include": ["summaries", "statements", "chunks", "entities", "communities"],
}
search_params = {"end_user_id": end_user_id, "return_raw_results": True}
hybrid_retrieval = create_hybrid_retrieval_tool_sync(memory_config, **search_params)
agent = create_agent(
llm,
@@ -401,32 +390,8 @@ async def retrieve(state: ReadState) -> ReadState:
raw_results = tool_results['content']
clean_content = await clean_databases(raw_results)
# 社区展开:从 tool 返回结果中提取命中的 community
# 沿 BELONGS_TO_COMMUNITY 关系拉取关联 Statement 追加到 clean_content
_expanded_stmts_to_write = []
try:
results_dict = raw_results.get('results', {}) if isinstance(raw_results, dict) else {}
reranked = results_dict.get('reranked_results', {})
community_hits = reranked.get('communities', [])
if not community_hits:
community_hits = results_dict.get('communities', [])
if community_hits:
from app.core.memory.agent.services.search_service import expand_communities_to_statements
_expanded_stmts_to_write, new_texts = await expand_communities_to_statements(
community_results=community_hits,
end_user_id=end_user_id,
existing_content=clean_content,
)
if new_texts:
clean_content = clean_content + '\n' + '\n'.join(new_texts)
except Exception as parse_err:
logger.warning(f"[Retrieve] 解析社区命中结果失败,跳过展开: {parse_err}")
try:
raw_results = raw_results['results']
# 写回展开结果,接口返回中可见(已在 helper 中清洗过字段)
if _expanded_stmts_to_write and isinstance(raw_results, dict):
raw_results.setdefault('reranked_results', {})['expanded_statements'] = _expanded_stmts_to_write
except Exception:
raw_results = []

View File

@@ -334,22 +334,13 @@ async def Input_Summary(state: ReadState) -> ReadState:
"end_user_id": end_user_id,
"question": data,
"return_raw_results": True,
"include": ["summaries", "communities"] # MemorySummary 和 Community 同为高维度概括节点
"include": ["summaries"] # Only search summary nodes for faster performance
}
try:
if storage_type != "rag":
retrieve_info, question, raw_results = await SearchService().execute_hybrid_search(
**search_params,
memory_config=memory_config,
expand_communities=False, # 路径 "2" 只需要 community 的 summary 文本,不展开到 Statement
)
# 调试:打印 community 检索结果数量
if raw_results and isinstance(raw_results, dict):
reranked = raw_results.get('reranked_results', {})
community_hits = reranked.get('communities', [])
logger.debug(f"[Input_Summary] community 命中数: {len(community_hits)}, "
f"summary 命中数: {len(reranked.get('summaries', []))}")
retrieve_info, question, raw_results = await SearchService().execute_hybrid_search(**search_params,
memory_config=memory_config)
else:
retrieval_knowledge, retrieve_info, question, raw_results = await rag_knowledge(state, data)
except Exception as e:

View File

@@ -252,10 +252,9 @@ def create_hybrid_retrieval_tool_async(memory_config, **search_params):
# TODO: fact_summary functionality temporarily disabled, will be enabled after future development
fields_to_remove = {
'invalid_at', 'valid_at', 'chunk_id_from_rel', 'entity_ids',
'expired_at', 'created_at', 'chunk_id', 'apply_id',
'expired_at', 'created_at', 'chunk_id', 'id', 'apply_id',
'user_id', 'statement_ids', 'updated_at', "chunk_ids", "fact_summary"
}
# 注意:'id' 字段保留community 展开时需要用 community id 查询成员 statements
if isinstance(data, dict):
# Clean dictionary
@@ -311,7 +310,7 @@ def create_hybrid_retrieval_tool_async(memory_config, **search_params):
"search_type": search_type,
"end_user_id": end_user_id or search_params.get("end_user_id"),
"limit": limit or search_params.get("limit", 10),
"include": search_params.get("include", ["summaries", "statements", "chunks", "entities", "communities"]),
"include": search_params.get("include", ["summaries", "statements", "chunks", "entities"]),
"output_path": None, # Don't save to file
"memory_config": memory_config,
"rerank_alpha": rerank_alpha,

View File

@@ -13,72 +13,6 @@ from app.core.memory.utils.data.text_utils import escape_lucene_query
logger = get_agent_logger(__name__)
# 需要从展开结果中过滤的字段(含 Neo4j DateTime不可 JSON 序列化)
_EXPAND_FIELDS_TO_REMOVE = {
'invalid_at', 'valid_at', 'chunk_id_from_rel', 'entity_ids',
'expired_at', 'created_at', 'chunk_id', 'apply_id',
'user_id', 'statement_ids', 'updated_at', 'chunk_ids', 'fact_summary'
}
def _clean_expand_fields(obj):
"""递归过滤展开结果中不可序列化的字段(DateTime 等)。"""
if isinstance(obj, dict):
return {k: _clean_expand_fields(v) for k, v in obj.items() if k not in _EXPAND_FIELDS_TO_REMOVE}
if isinstance(obj, list):
return [_clean_expand_fields(i) for i in obj]
return obj
async def expand_communities_to_statements(
community_results: List[dict],
end_user_id: str,
existing_content: str = "",
limit: int = 10,
) -> Tuple[List[dict], List[str]]:
"""
社区展开 helper给定命中的 community 列表,拉取关联 Statement。
- 对展开结果去重(过滤已在 existing_content 中出现的文本)
- 过滤不可序列化字段
- 返回 (cleaned_expanded_stmts, new_texts)
- cleaned_expanded_stmts: 可直接写回 raw_results 的列表
- new_texts: 去重后新增的 statement 文本列表,用于追加到 clean_content
"""
community_ids = [r.get("id") for r in community_results if r.get("id")]
if not community_ids or not end_user_id:
return [], []
from app.repositories.neo4j.graph_search import search_graph_community_expand
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
connector = Neo4jConnector()
try:
result = await search_graph_community_expand(
connector=connector,
community_ids=community_ids,
end_user_id=end_user_id,
limit=limit,
)
except Exception as e:
logger.warning(f"[expand_communities] 社区展开检索失败,跳过: {e}")
return [], []
finally:
await connector.close()
expanded_stmts = result.get("expanded_statements", [])
if not expanded_stmts:
return [], []
existing_lines = set(existing_content.splitlines())
new_texts = [
s["statement"] for s in expanded_stmts
if s.get("statement") and s["statement"] not in existing_lines
]
cleaned = _clean_expand_fields(expanded_stmts)
logger.info(f"[expand_communities] 展开 {len(expanded_stmts)} 条 statements新增 {len(new_texts)}community_ids={community_ids}")
return cleaned, new_texts
class SearchService:
"""Service for executing hybrid search and processing results."""
@@ -87,7 +21,7 @@ class SearchService:
"""Initialize the search service."""
logger.info("SearchService initialized")
def extract_content_from_result(self, result: dict, node_type: str = "") -> str:
def extract_content_from_result(self, result: dict) -> str:
"""
Extract only meaningful content from search results, dropping all metadata.
@@ -96,11 +30,9 @@ class SearchService:
- Entities: extract 'name' and 'fact_summary' fields
- Summaries: extract 'content' field
- Chunks: extract 'content' field
- Communities: extract 'content' field (c.summary), prefixed with community name
Args:
result: Search result dictionary
node_type: Hint for node type ("community", "summary", etc.)
Returns:
Clean content string without metadata
@@ -114,21 +46,8 @@ class SearchService:
if 'statement' in result and result['statement']:
content_parts.append(result['statement'])
# Community 节点:有 member_count 或 core_entities 字段,或 node_type 明确指定
# 用 "[主题:{name}]" 前缀区分,让 LLM 知道这是主题级摘要
is_community = (
node_type == "community"
or 'member_count' in result
or 'core_entities' in result
)
if is_community:
name = result.get('name', '')
content = result.get('content', '')
if content:
prefix = f"[主题:{name}] " if name else ""
content_parts.append(f"{prefix}{content}")
elif 'content' in result and result['content']:
# Summaries / Chunks
# Summaries/Chunks: extract content field
if 'content' in result and result['content']:
content_parts.append(result['content'])
# Entities: extract name and fact_summary (commented out in original)
@@ -180,8 +99,7 @@ class SearchService:
rerank_alpha: float = 0.4,
output_path: str = "search_results.json",
return_raw_results: bool = False,
memory_config = None,
expand_communities: bool = True,
memory_config = None
) -> Tuple[str, str, Optional[dict]]:
"""
Execute hybrid search and return clean content.
@@ -196,15 +114,13 @@ class SearchService:
output_path: Path to save search results (default: "search_results.json")
return_raw_results: If True, also return the raw search results as third element (default: False)
memory_config: Memory configuration object (required)
expand_communities: If True, expand community hits to member statements (default: True).
Set to False for quick-summary paths that only need community-level text.
Returns:
Tuple of (clean_content, cleaned_query, raw_results)
raw_results is None if return_raw_results=False
"""
if include is None:
include = ["statements", "chunks", "entities", "summaries", "communities"]
include = ["statements", "chunks", "entities", "summaries"]
# Clean query
cleaned_query = self.clean_query(question)
@@ -230,8 +146,8 @@ class SearchService:
if search_type == "hybrid":
reranked_results = answer.get('reranked_results', {})
# Priority order: summaries first (most contextual), then communities, statements, chunks, entities
priority_order = ['summaries', 'communities', 'statements', 'chunks', 'entities']
# Priority order: summaries first (most contextual), then statements, chunks, entities
priority_order = ['summaries', 'statements', 'chunks', 'entities']
for category in priority_order:
if category in include and category in reranked_results:
@@ -241,33 +157,19 @@ class SearchService:
else:
# For keyword or embedding search, results are directly in answer dict
# Apply same priority order
priority_order = ['summaries', 'communities', 'statements', 'chunks', 'entities']
priority_order = ['summaries', 'statements', 'chunks', 'entities']
for category in priority_order:
if category in include and category in answer:
category_results = answer[category]
if isinstance(category_results, list):
answer_list.extend(category_results)
# 对命中的 community 节点展开其成员 statements路径 "0"/"1" 需要,路径 "2" 不需要)
if expand_communities and "communities" in include:
community_results = (
answer.get('reranked_results', {}).get('communities', [])
if search_type == "hybrid"
else answer.get('communities', [])
)
cleaned_stmts, new_texts = await expand_communities_to_statements(
community_results=community_results,
end_user_id=end_user_id,
)
answer_list.extend(cleaned_stmts)
# Extract clean content from all results,按类型传入 node_type 区分 community
content_list = []
for ans in answer_list:
# community 节点有 member_count 或 core_entities 字段
ntype = "community" if ('member_count' in ans or 'core_entities' in ans) else ""
content_list.append(self.extract_content_from_result(ans, node_type=ntype))
# Extract clean content from all results
content_list = [
self.extract_content_from_result(ans)
for ans in answer_list
]
# Filter out empty strings and join with newlines

View File

@@ -84,7 +84,7 @@ async def get_chunked_dialogs(
pruning_scene=memory_config.pruning_scene or "education",
pruning_threshold=memory_config.pruning_threshold,
scene_id=str(memory_config.scene_id) if memory_config.scene_id else None,
ontology_class_infos=memory_config.ontology_class_infos,
ontology_classes=memory_config.ontology_classes,
)
logger.info(f"[剪枝] 加载配置: switch={pruning_config.pruning_switch}, scene={pruning_config.pruning_scene}, threshold={pruning_config.pruning_threshold}")

View File

@@ -19,7 +19,7 @@ from app.core.memory.utils.log.logging_utils import log_time
from app.db import get_db_context
from app.repositories.neo4j.add_edges import add_memory_summary_statement_edges
from app.repositories.neo4j.add_nodes import add_memory_summary_nodes
from app.repositories.neo4j.graph_saver import save_dialog_and_statements_to_neo4j, schedule_clustering_after_write
from app.repositories.neo4j.graph_saver import save_dialog_and_statements_to_neo4j
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_config_schema import MemoryConfig
@@ -166,15 +166,11 @@ async def write(
statement_entity_edges=all_statement_entity_edges,
entity_edges=all_entity_entity_edges,
connector=neo4j_connector,
config_id=config_id,
llm_model_id=str(memory_config.llm_model_id) if memory_config.llm_model_id else None,
)
if success:
logger.info("Successfully saved all data to Neo4j")
# 写入成功后,异步触发聚类(不阻塞写入响应)
schedule_clustering_after_write(
all_entity_nodes,
llm_model_id=str(memory_config.llm_model_id) if memory_config.llm_model_id else None,
embedding_model_id=str(memory_config.embedding_model_id) if memory_config.embedding_model_id else None,
)
break
else:
logger.warning("Failed to save some data to Neo4j")

View File

@@ -6,7 +6,6 @@ of the memory system including LLM, chunking, pruning, and search.
Classes:
LLMConfig: Configuration for LLM client
ChunkerConfig: Configuration for dialogue chunking
OntologyClassInfo: Single ontology class with name and description
PruningConfig: Configuration for semantic pruning
TemporalSearchParams: Parameters for temporal search queries
"""
@@ -51,41 +50,30 @@ class ChunkerConfig(BaseModel):
min_characters_per_chunk: Optional[int] = Field(24, ge=0, description="The minimum number of characters in each chunk.")
class OntologyClassInfo(BaseModel):
    """Name and semantic description of a single ontology class.

    According to the original (Chinese) docstring, instances are injected
    into the pruning prompt.

    Attributes:
        class_name: Name of the ontology class (e.g. "患者", "课程"). Required.
        class_description: Semantic description of the class, telling the LLM
            what the class means in the current scene. Defaults to "".
    """
    class_name: str = Field(..., description="本体类型名称")
    class_description: str = Field(default="", description="本体类型语义描述")
class PruningConfig(BaseModel):
"""Configuration for semantic pruning of dialogue content.
Attributes:
pruning_switch: Enable or disable semantic pruning
pruning_scene: Scene name for pruning from ontology_scene table
pruning_scene: Scene name for pruning, either a built-in key
('education', 'online_service', 'outbound') or a custom scene_name
from ontology_scene table
pruning_threshold: Pruning ratio (0-0.9, max 0.9 to avoid complete removal)
scene_id: Optional ontology scene UUID
ontology_class_infos: Full ontology class info (name + description) from
ontology_class table, injected into the pruning prompt to drive
scene-aware preservation decisions
scene_id: Optional ontology scene UUID, used to load custom ontology classes
ontology_classes: List of class_name strings from ontology_class table,
injected into the prompt when pruning_scene is not a built-in scene
"""
pruning_switch: bool = Field(False, description="Enable semantic pruning when True.")
pruning_scene: str = Field(
"education",
description="Scene name from ontology_scene table.",
description="Scene for pruning: built-in key or custom scene_name from ontology_scene.",
)
pruning_threshold: float = Field(
0.5, ge=0.0, le=0.9,
description="Pruning ratio within 0-0.9 (max 0.9 to avoid termination).")
scene_id: Optional[str] = Field(None, description="Ontology scene UUID (optional).")
ontology_class_infos: List[OntologyClassInfo] = Field(
default_factory=list,
description="Full ontology class info (name + description) injected into pruning prompt."
ontology_classes: Optional[List[str]] = Field(
None, description="Class names from ontology_class table for custom scenes."
)

View File

@@ -238,7 +238,7 @@ def rerank_with_activation(
reranked: Dict[str, List[Dict[str, Any]]] = {}
for category in ["statements", "chunks", "entities", "summaries", "communities"]:
for category in ["statements", "chunks", "entities", "summaries"]:
keyword_items = keyword_results.get(category, [])
embedding_items = embedding_results.get(category, [])
@@ -281,23 +281,21 @@ def rerank_with_activation(
for item in items_list:
item_id = item.get("id") or item.get("uuid") or item.get("chunk_id")
if item_id and item_id in combined_items:
combined_items[item_id]["normalized_activation_value"] = item.get("normalized_activation_value")
combined_items[item_id]["normalized_activation_value"] = item.get("normalized_activation_value", 0)
# 步骤 4: 计算基础分数和最终分数
for item_id, item in combined_items.items():
bm25_norm = float(item.get("bm25_score", 0) or 0)
emb_norm = float(item.get("embedding_score", 0) or 0)
# normalized_activation_value 为 None 表示该节点无激活值,保留 None 语义
raw_act_norm = item.get("normalized_activation_value")
act_norm = float(raw_act_norm) if raw_act_norm is not None else None
act_norm = float(item.get("normalized_activation_value", 0) or 0)
# 第一阶段只考虑内容相关性BM25 + Embedding
# alpha 控制 BM25 权重,(1-alpha) 控制 Embedding 权重
content_score = alpha * bm25_norm + (1 - alpha) * emb_norm
base_score = content_score # 第一阶段用内容分数
# 存储激活度分数供第二阶段使用None 表示无激活值,不参与激活值排序)
item["activation_score"] = act_norm # 可能为 None
# 存储激活度分数供第二阶段使用
item["activation_score"] = act_norm
item["content_score"] = content_score
item["base_score"] = base_score
@@ -726,8 +724,6 @@ async def run_hybrid_search(
try:
keyword_task = None
embedding_task = None
keyword_results: Dict[str, List] = {}
embedding_results: Dict[str, List] = {}
if search_type in ["keyword", "hybrid"]:
# Keyword-based search
@@ -750,42 +746,35 @@ async def run_hybrid_search(
# 从数据库读取嵌入器配置(按 ID并构建 RedBearModelConfig
config_load_start = time.time()
try:
with get_db_context() as db:
config_service = MemoryConfigService(db)
embedder_config_dict = config_service.get_embedder_config(str(memory_config.embedding_model_id))
rb_config = RedBearModelConfig(
model_name=embedder_config_dict["model_name"],
provider=embedder_config_dict["provider"],
api_key=embedder_config_dict["api_key"],
base_url=embedder_config_dict["base_url"],
type="llm"
)
config_load_time = time.time() - config_load_start
logger.info(f"[PERF] Config loading took {config_load_time:.4f}s")
with get_db_context() as db:
config_service = MemoryConfigService(db)
embedder_config_dict = config_service.get_embedder_config(str(memory_config.embedding_model_id))
rb_config = RedBearModelConfig(
model_name=embedder_config_dict["model_name"],
provider=embedder_config_dict["provider"],
api_key=embedder_config_dict["api_key"],
base_url=embedder_config_dict["base_url"],
type="llm"
)
config_load_time = time.time() - config_load_start
logger.info(f"[PERF] Config loading took {config_load_time:.4f}s")
# Init embedder
embedder_init_start = time.time()
embedder = OpenAIEmbedderClient(model_config=rb_config)
embedder_init_time = time.time() - embedder_init_start
logger.info(f"[PERF] Embedder init took {embedder_init_time:.4f}s")
embedding_task = asyncio.create_task(
search_graph_by_embedding(
connector=connector,
embedder_client=embedder,
query_text=query_text,
end_user_id=end_user_id,
limit=limit,
include=include,
)
# Init embedder
embedder_init_start = time.time()
embedder = OpenAIEmbedderClient(model_config=rb_config)
embedder_init_time = time.time() - embedder_init_start
logger.info(f"[PERF] Embedder init took {embedder_init_time:.4f}s")
embedding_task = asyncio.create_task(
search_graph_by_embedding(
connector=connector,
embedder_client=embedder,
query_text=query_text,
end_user_id=end_user_id,
limit=limit,
include=include,
)
except Exception as emb_init_err:
logger.warning(
f"[PERF] Embedding search skipped due to init error "
f"(embedding_model_id={memory_config.embedding_model_id}): {emb_init_err}"
)
embedding_task = None
)
if keyword_task:
keyword_results = await keyword_task

View File

@@ -7,7 +7,6 @@
- 增量更新incremental_update新实体到达时只处理新实体及其邻居
"""
import asyncio
import logging
import uuid
from math import sqrt
@@ -20,9 +19,8 @@ logger = logging.getLogger(__name__)
# 全量迭代最大轮数,防止不收敛
MAX_ITERATIONS = 10
# 社区核心实体取 top-N 数量
CORE_ENTITY_LIMIT = 10
# 社区摘要核心实体数量
CORE_ENTITY_LIMIT = 5
def _cosine_similarity(v1: Optional[List[float]], v2: Optional[List[float]]) -> float:
@@ -69,11 +67,13 @@ class LabelPropagationEngine:
def __init__(
self,
connector: Neo4jConnector,
config_id: Optional[str] = None,
llm_model_id: Optional[str] = None,
embedding_model_id: Optional[str] = None,
):
self.connector = connector
self.repo = CommunityRepository(connector)
self.config_id = config_id
self.llm_model_id = llm_model_id
self.embedding_model_id = embedding_model_id
@@ -105,81 +105,58 @@ class LabelPropagationEngine:
async def full_clustering(self, end_user_id: str) -> None:
"""
全量标签传播初始化(分批处理,控制内存峰值)
全量标签传播初始化。
策略:
- 每次只加载 BATCH_SIZE 个实体及其邻居进内存
- labels 字典跨批次共享(只存 id→community_id内存极小
- 每批独立跑 MAX_ITERATIONS 轮 LPA批次间通过 labels 传递社区信息
- 所有批次完成后统一 flush 和 merge
1. 拉取所有实体,初始化每个实体为独立社区
2. 迭代:每轮对所有实体做邻居投票,更新社区标签
3. 直到标签不再变化或达到 MAX_ITERATIONS
4. 将最终标签写入 Neo4j
"""
BATCH_SIZE = 888 # 每批实体数,可按需调整
# 轻量查询:只获取总数和 ID 列表,不加载 embedding 等大字段
total_count = await self.repo.get_entity_count(end_user_id)
if not total_count:
entities = await self.repo.get_all_entities(end_user_id)
if not entities:
logger.info(f"[Clustering] 用户 {end_user_id} 无实体,跳过全量聚类")
return
all_entity_ids = await self.repo.get_all_entity_ids(end_user_id)
logger.info(f"[Clustering] 用户 {end_user_id}{total_count} 个实体,"
f"分批大小 {BATCH_SIZE},共 {(total_count + BATCH_SIZE - 1) // BATCH_SIZE}")
# 初始化:每个实体持有自己 id 作为社区标签
labels: Dict[str, str] = {e["id"]: e["id"] for e in entities}
embeddings: Dict[str, Optional[List[float]]] = {
e["id"]: e.get("name_embedding") for e in entities
}
# labels 跨批次共享:只存 id→community_id内存极小
labels: Dict[str, str] = {eid: eid for eid in all_entity_ids}
del all_entity_ids # 释放 ID 列表,后续按批次加载完整数据
# 预加载所有实体的邻居,避免迭代内 O(iterations * |E|) 次 Neo4j 往返
logger.info(f"[Clustering] 预加载 {len(entities)} 个实体的邻居图...")
neighbors_cache: Dict[str, List[Dict]] = await self.repo.get_all_entity_neighbors_batch(end_user_id)
logger.info(f"[Clustering] 邻居预加载完成,覆盖实体数: {len(neighbors_cache)}")
for batch_start in range(0, total_count, BATCH_SIZE):
batch_entities = await self.repo.get_entities_page(
end_user_id, skip=batch_start, limit=BATCH_SIZE
)
if not batch_entities:
break
for iteration in range(MAX_ITERATIONS):
changed = 0
# 随机顺序Python dict 在 3.7+ 保持插入顺序,这里直接遍历)
for entity in entities:
eid = entity["id"]
# 直接从缓存取邻居,不再发起 Neo4j 查询
neighbors = neighbors_cache.get(eid, [])
batch_ids = [e["id"] for e in batch_entities]
batch_embeddings: Dict[str, Optional[List[float]]] = {
e["id"]: e.get("name_embedding") for e in batch_entities
}
# 将邻居的当前内存标签注入(覆盖 Neo4j 中的旧值)
enriched = []
for nb in neighbors:
nb_copy = dict(nb)
nb_copy["community_id"] = labels.get(nb["id"], nb.get("community_id"))
enriched.append(nb_copy)
new_label = _weighted_vote(enriched, embeddings.get(eid))
if new_label and new_label != labels[eid]:
labels[eid] = new_label
changed += 1
logger.info(
f"[Clustering] 批次 {batch_start // BATCH_SIZE + 1}"
f"加载 {len(batch_entities)} 个实体的邻居图..."
f"[Clustering] 全量迭代 {iteration + 1}/{MAX_ITERATIONS}"
f"标签变化数: {changed}"
)
neighbors_cache = await self.repo.get_entity_neighbors_for_ids(
batch_ids, end_user_id
)
logger.info(f"[Clustering] 邻居预加载完成,覆盖实体数: {len(neighbors_cache)}")
if changed == 0:
logger.info("[Clustering] 标签已收敛,提前结束迭代")
break
for iteration in range(MAX_ITERATIONS):
changed = 0
for entity in batch_entities:
eid = entity["id"]
neighbors = neighbors_cache.get(eid, [])
# 注入跨批次的最新标签邻居可能在其他批次labels 里有其最新值)
enriched = []
for nb in neighbors:
nb_copy = dict(nb)
nb_copy["community_id"] = labels.get(nb["id"], nb.get("community_id"))
enriched.append(nb_copy)
new_label = _weighted_vote(enriched, batch_embeddings.get(eid))
if new_label and new_label != labels[eid]:
labels[eid] = new_label
changed += 1
logger.info(
f"[Clustering] 批次 {batch_start // BATCH_SIZE + 1} "
f"迭代 {iteration + 1}/{MAX_ITERATIONS},标签变化数: {changed}"
)
if changed == 0:
logger.info("[Clustering] 标签已收敛,提前结束本批迭代")
break
# 释放本批次的大对象
del neighbors_cache, batch_embeddings, batch_entities
# 所有批次完成,统一写入 Neo4j
# 将最终标签写入 Neo4j
await self._flush_labels(labels, end_user_id)
pre_merge_count = len(set(labels.values()))
logger.info(
@@ -187,6 +164,7 @@ class LabelPropagationEngine:
f"{len(labels)} 个实体,开始后处理合并"
)
# 全量初始化后做一轮社区合并(基于 name_embedding 余弦相似度)
all_community_ids = list(set(labels.values()))
await self._evaluate_merge(all_community_ids, end_user_id)
@@ -194,15 +172,17 @@ class LabelPropagationEngine:
f"[Clustering] 全量聚类完成,合并前 {pre_merge_count} 个社区,"
f"{len(labels)} 个实体"
)
# 查询存活社区并生成元数据
# 为所有社区生成元数据
# 注意_evaluate_merge 后部分社区已被合并消解,需重新从 Neo4j 查询实际存活社区
# 不能复用 labels.values(),那里包含已被 dissolve 的旧社区 ID
surviving_communities = await self.repo.get_all_entities(end_user_id)
surviving_community_ids = list({
e.get("community_id") for e in surviving_communities
if e.get("community_id")
})
logger.info(f"[Clustering] 合并后实际存活社区数: {len(surviving_community_ids)}")
await self._generate_community_metadata(surviving_community_ids, end_user_id)
for cid in surviving_community_ids:
await self._generate_community_metadata(cid, end_user_id)
async def incremental_update(
self, new_entity_ids: List[str], end_user_id: str
@@ -259,7 +239,7 @@ class LabelPropagationEngine:
logger.debug(
f"[Clustering] 新实体 {entity_id}{len(neighbors)} 个无社区邻居 → 新社区 {new_cid}"
)
await self._generate_community_metadata([new_cid], end_user_id)
await self._generate_community_metadata(new_cid, end_user_id)
else:
# 加入得票最多的社区
await self.repo.assign_entity_to_community(entity_id, target_cid, end_user_id)
@@ -271,7 +251,7 @@ class LabelPropagationEngine:
await self._evaluate_merge(
list(community_ids_in_neighbors), end_user_id
)
await self._generate_community_metadata([target_cid], end_user_id)
await self._generate_community_metadata(target_cid, end_user_id)
async def _evaluate_merge(
self, community_ids: List[str], end_user_id: str
@@ -435,137 +415,93 @@ class LabelPropagationEngine:
except Exception:
return None
@staticmethod
def _build_entity_lines(members: List[Dict]) -> List[str]:
"""将实体列表格式化为 prompt 行,包含 name、aliases、description、example。"""
lines = []
for m in members:
m_name = m.get("name", "")
aliases = m.get("aliases") or []
description = m.get("description") or ""
example = m.get("example") or ""
aliases_str = f"(别名:{''.join(aliases)}" if aliases else ""
desc_str = f"{description}" if description else ""
example_str = f"(示例:{example}" if example else ""
lines.append(f"- {m_name}{aliases_str}{desc_str}{example_str}")
return lines
async def _generate_community_metadata(
self, community_ids: List[str], end_user_id: str
self, community_id: str, end_user_id: str
) -> None:
"""
一个或多个社区生成并写入元数据。
为社区生成并写入元数据:名称、摘要、核心实体
流程:
1. 逐个社区调 LLM 生成 name / summary串行
2. 收集所有 summary一次性批量 embed
3. 单个社区用 update_community_metadata多个用 batch_update_community_metadata
- core_entities按 activation_value 排序取 top-N 实体名称列表(无需 LLM
- name / summary若有 llm_model_id 则调用 LLM 生成,否则用实体名称拼接兜底
"""
if not community_ids:
return
try:
# 先检查属性是否已完整,完整则跳过,避免重复生成
check_embedding = bool(self.embedding_model_id)
if await self.repo.is_community_complete(community_id, end_user_id, check_embedding=check_embedding):
logger.debug(f"[Clustering] 社区 {community_id} 属性已完整,跳过生成")
return
from app.db import get_db_context
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
# --- 阶段1并发调 LLM 生成每个社区的 name / summary ---
async def _build_one(cid: str):
members = await self.repo.get_community_members(cid, end_user_id)
members = await self.repo.get_community_members(community_id, end_user_id)
if not members:
return None
return
# 核心实体:按 activation_value 降序取 top-N
sorted_members = sorted(
members,
key=lambda m: m.get("activation_value") or 0,
reverse=True,
)
core_entities = [m["name"] for m in sorted_members[:CORE_ENTITY_LIMIT] if m.get("name")]
all_names = [m["name"] for m in members if m.get("name")]
entity_list_str = "\n".join(self._build_entity_lines(members))
name = "".join(core_entities[:3]) if core_entities else community_id[:8]
summary = f"包含实体:{', '.join(all_names)}"
# 方案四:注入社区内实体间关系三元组
relationships = await self.repo.get_community_relationships(cid, end_user_id)
rel_lines = [
f"- {r['subject']}{r['predicate']}{r['object']}"
for r in relationships
if r.get("subject") and r.get("predicate") and r.get("object")
]
rel_section = (
f"\n实体间关系:\n" + "\n".join(rel_lines)
if rel_lines else ""
# 若有 LLM 配置,调用 LLM 生成更好的名称和摘要
if self.llm_model_id:
try:
from app.db import get_db_context
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
entity_list_str = "".join(all_names)
prompt = (
f"以下是一组语义相关的实体:{entity_list_str}\n\n"
f"请为这组实体所代表的主题:\n"
f"1. 起一个简洁的中文名称不超过10个字\n"
f"2. 写一句话摘要不超过50个字\n\n"
f"严格按以下格式输出,不要有其他内容:\n"
f"名称:<名称>\n摘要:<摘要>"
)
with get_db_context() as db:
factory = MemoryClientFactory(db)
llm_client = factory.get_llm_client(self.llm_model_id)
response = await llm_client.chat([{"role": "user", "content": prompt}])
text = response.content if hasattr(response, "content") else str(response)
for line in text.strip().splitlines():
if line.startswith("名称:"):
name = line[3:].strip()
elif line.startswith("摘要:"):
summary = line[3:].strip()
except Exception as e:
logger.warning(f"[Clustering] LLM 生成社区元数据失败,使用兜底值: {e}")
# 生成 summary_embedding
summary_embedding: Optional[List[float]] = None
if self.embedding_model_id and summary:
try:
from app.db import get_db_context
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
with get_db_context() as db:
embedder = MemoryClientFactory(db).get_embedder_client(self.embedding_model_id)
vectors = await embedder.response([summary])
if vectors:
summary_embedding = vectors[0]
except Exception as e:
logger.warning(f"[Clustering] 社区 {community_id} 生成 summary_embedding 失败: {e}")
await self.repo.update_community_metadata(
community_id=community_id,
end_user_id=end_user_id,
name=name,
summary=summary,
core_entities=core_entities,
summary_embedding=summary_embedding,
)
prompt = (
f"以下是一组语义相关的实体:\n{entity_list_str}{rel_section}\n\n"
f"请为这组实体所代表的主题:\n"
f"1. 起一个简洁的中文名称不超过10个字\n"
f"2. 写一句话摘要不超过80个字\n\n"
f"严格按以下格式输出,不要有其他内容:\n"
f"名称:<名称>\n摘要:<摘要>"
)
with get_db_context() as db:
llm_client = MemoryClientFactory(db).get_llm_client(self.llm_model_id)
response = await llm_client.chat([{"role": "user", "content": prompt}])
text = response.content if hasattr(response, "content") else str(response)
name, summary = "", ""
for line in text.strip().splitlines():
if line.startswith("名称:"):
name = line[3:].strip()
elif line.startswith("摘要:"):
summary = line[3:].strip()
return {
"community_id": cid,
"end_user_id": end_user_id,
"name": name,
"summary": summary,
"core_entities": core_entities,
"summary_embedding": None,
}
results = await asyncio.gather(
*[_build_one(cid) for cid in community_ids],
return_exceptions=True,
)
metadata_list = []
for cid, res in zip(community_ids, results):
if isinstance(res, Exception):
logger.error(f"[Clustering] 社区 {cid} 元数据准备失败: {res}", exc_info=res)
elif res is not None:
metadata_list.append(res)
if not metadata_list:
return
# --- 阶段2批量生成 summary_embedding ---
summaries = [m["summary"] for m in metadata_list]
with get_db_context() as db:
embedder = MemoryClientFactory(db).get_embedder_client(self.embedding_model_id)
embeddings = await embedder.response(summaries)
for i, meta in enumerate(metadata_list):
meta["summary_embedding"] = embeddings[i] if i < len(embeddings) else None
# --- 阶段3写入单个 or 批量)---
if len(metadata_list) == 1:
m = metadata_list[0]
result = await self.repo.update_community_metadata(
community_id=m["community_id"],
end_user_id=m["end_user_id"],
name=m["name"],
summary=m["summary"],
core_entities=m["core_entities"],
summary_embedding=m["summary_embedding"],
)
if result:
logger.info(f"[Clustering] 社区 {m['community_id']} 元数据写入成功: name={m['name']}, summary={m['summary'][:30]}...")
else:
logger.warning(f"[Clustering] 社区 {m['community_id']} 元数据写入返回 False")
else:
ok = await self.repo.batch_update_community_metadata(metadata_list)
if ok:
logger.info(f"[Clustering] 批量写入 {len(metadata_list)} 个社区元数据成功")
else:
logger.warning(f"[Clustering] 批量写入社区元数据失败")
logger.debug(f"[Clustering] 社区 {community_id} 元数据已更新: name={name}")
except Exception as e:
logger.error(f"[Clustering] _generate_community_metadata failed for {community_id}: {e}")
@staticmethod
def _new_community_id() -> str:

View File

@@ -20,6 +20,7 @@ from pydantic import BaseModel, Field
from app.core.memory.models.message_models import DialogData, ConversationMessage, ConversationContext
from app.core.memory.models.config_models import PruningConfig
from app.core.memory.utils.config.config_utils import get_pruning_config
from app.core.memory.utils.prompt.prompt_utils import prompt_env, log_prompt_rendering, log_template_rendering
from app.core.memory.storage_services.extraction_engine.data_preprocessing.scene_config import (
SceneConfigRegistry,
@@ -33,8 +34,6 @@ class DialogExtractionResponse(BaseModel):
- is_related对话与场景的相关性判定。
- times / ids / amounts / contacts / addresses / keywords重要信息片段用来在不相关对话中保留关键消息。
- preserve_keywords情绪/兴趣/爱好/个人观点相关词,包含这些词的消息必须强制保留。
- scene_unrelated_snippets与当前场景无关且无语义关联的消息片段原文截取
用于高阈值阶段精准删除跨场景内容。
"""
is_related: bool = Field(...)
times: List[str] = Field(default_factory=list)
@@ -44,7 +43,6 @@ class DialogExtractionResponse(BaseModel):
addresses: List[str] = Field(default_factory=list)
keywords: List[str] = Field(default_factory=list)
preserve_keywords: List[str] = Field(default_factory=list, description="情绪/兴趣/爱好/个人观点相关词,包含这些词的消息强制保留")
scene_unrelated_snippets: List[str] = Field(default_factory=list,description="与当前场景无关且无语义关联的消息原文片段,高阈值阶段用于精准删除跨场景内容")
class MessageImportanceResponse(BaseModel):
@@ -93,14 +91,12 @@ class SemanticPruner:
# 加载统一填充词库
self.scene_config: ScenePatterns = SceneConfigRegistry.get_config(self.config.pruning_scene)
# 本体类型列表:直接使用 ontology_class_infosname + description
self._ontology_class_infos = getattr(self.config, "ontology_class_infos", None) or []
# _ontology_classes 仅用于日志统计
self._ontology_classes = [info.class_name for info in self._ontology_class_infos]
# 本体类型列表(用于注入提示词,所有场景均支持
self._ontology_classes = getattr(self.config, "ontology_classes", None) or []
self._log(f"[剪枝-初始化] 场景={self.config.pruning_scene}")
if self._ontology_class_infos:
self._log(f"[剪枝-初始化] 注入本体类型({len(self._ontology_class_infos)}个): {self._ontology_classes}")
if self._ontology_classes:
self._log(f"[剪枝-初始化] 注入本体类型: {self._ontology_classes}")
else:
self._log(f"[剪枝-初始化] 未找到本体类型,将使用通用提示词")
@@ -125,8 +121,7 @@ class SemanticPruner:
1. 空消息
2. 场景特定填充词库精确匹配
3. 常见寒暄精确匹配
4. 组合寒暄模式(前缀+后缀组合,如"好的谢谢""同学你好""明白了"
5. 纯表情/标点
4. 纯表情/标点
"""
t = message.msg.strip()
if not t:
@@ -148,55 +143,6 @@ class SemanticPruner:
if t in common_greetings:
return True
# 组合寒暄模式短消息≤15字且完全由寒暄成分构成
# 策略:将消息拆分后,每个片段都能在填充词库或常见寒暄中找到,则整体为填充
if len(t) <= 15:
# 确认+称呼/感谢组合,如"好的谢谢"、"明白了"、"知道了谢谢"
_confirm_prefixes = {"好的", "", "", "嗯嗯", "", "明白", "明白了", "知道了", "了解", "收到", "没问题"}
_thanks_suffixes = {"谢谢", "谢谢你", "谢谢您", "多谢", "感谢", "谢了"}
_greeting_suffixes = {"你好", "您好", "老师好", "同学好", "大家好"}
_greeting_prefixes = {"同学", "老师", "您好", "你好"}
_close_patterns = {
"没有了", "没事了", "没问题了", "好了", "行了", "可以了",
"不用了", "不需要了", "就这样", "就这样吧", "那就这样",
}
_polite_responses = {
"不客气", "不用谢", "没关系", "没事", "应该的", "这是我应该做的",
}
# 规则1确认词 + 感谢词(如"好的谢谢"、"嗯谢谢"
for cp in _confirm_prefixes:
for ts in _thanks_suffixes:
if t == cp + ts or t == cp + "" + ts or t == cp + "," + ts:
return True
# 规则2称呼前缀 + 问候(如"同学你好"、"老师好"
for gp in _greeting_prefixes:
for gs in _greeting_suffixes:
if t == gp + gs or t.startswith(gp) and t.endswith(""):
return True
# 规则3结束语 + 感谢(如"没有了,谢谢老师"、"没有了谢谢"
for cp in _close_patterns:
if t.startswith(cp):
remainder = t[len(cp):].lstrip(",、 ")
if not remainder or any(remainder.startswith(ts) for ts in _thanks_suffixes):
return True
# 规则4礼貌回应如"不客气,祝你考试顺利"——前缀是礼貌词,后半是祝福套话)
for pr in _polite_responses:
if t.startswith(pr):
remainder = t[len(pr):].lstrip(",、 ")
# 后半是祝福/套话(不含实质信息)
if not remainder or re.match(r"^(祝|希望|期待|加油|顺利|好好|保重)", remainder):
return True
# 规则5纯确认词加"了"后缀(如"明白了"、"知道了"、"好了"
_confirm_base = {"明白", "知道", "了解", "收到", "", "", "可以", "没问题"}
for cb in _confirm_base:
if t == cb + "" or t == cb + "了。" or t == cb + "了!":
return True
# 检查是否为纯表情符号(方括号包裹)
if re.fullmatch(r"(\[[^\]]+\])+", t):
return True
@@ -385,13 +331,13 @@ class SemanticPruner:
rendered = self.template.render(
pruning_scene=self.config.pruning_scene,
ontology_class_infos=self._ontology_class_infos,
ontology_classes=self._ontology_classes,
dialog_text=dialog_text,
language=self.language
)
log_template_rendering("extracat_Pruning.jinja2", {
"pruning_scene": self.config.pruning_scene,
"ontology_class_infos_count": len(self._ontology_class_infos),
"ontology_classes_count": len(self._ontology_classes),
"language": self.language
})
log_prompt_rendering("pruning-extract", rendered)
@@ -431,183 +377,6 @@ class SemanticPruner:
)
return fallback_response
def _get_pruning_mode(self) -> str:
"""根据 pruning_threshold 返回当前剪枝阶段。
- 低阈值 [0.0, 0.3)conservative 只删填充,保留所有实质内容
- 中阈值 [0.3, 0.6)semantic 保留场景相关 + 有语义关联的内容,删除无关联内容
- 高阈值 [0.6, 0.9]strict 只保留场景相关内容,跨场景内容可被删除
"""
t = float(self.config.pruning_threshold)
if t < 0.3:
return "conservative"
elif t < 0.6:
return "semantic"
else:
return "strict"
def _apply_related_dialog_pruning(
self,
msgs: List[ConversationMessage],
extraction: "DialogExtractionResponse",
dialog_label: str,
pruning_mode: str,
) -> List[ConversationMessage]:
"""相关对话统一剪枝入口,消除 prune_dialog / prune_dataset 中的重复逻辑。
- conservative只删填充
- semantic / strict场景感知剪枝
"""
if pruning_mode == "conservative":
preserve_tokens = self._build_preserve_tokens(extraction)
return self._prune_fillers_only(msgs, preserve_tokens, dialog_label)
else:
return self._prune_with_scene_filter(msgs, extraction, dialog_label, pruning_mode)
def _prune_fillers_only(
self,
msgs: List[ConversationMessage],
preserve_tokens: List[str],
dialog_label: str,
) -> List[ConversationMessage]:
"""相关对话专用只删填充消息LLM 保护消息和实质内容一律保留。
不受 pruning_threshold 约束,删多少算多少(填充有多少删多少)。
至少保留 1 条消息。
注意:填充检测优先于 preserve_tokens 保护——填充消息本身无信息价值,
即使 LLM 误将其关键词放入 preserve_tokens 也应删除。
"""
to_delete_ids: set = set()
for m in msgs:
# 填充检测优先:先判断是否为填充,再看 LLM 保护
if self._is_filler_message(m):
to_delete_ids.add(id(m))
self._log(f" [填充] '{m.msg[:40]}' → 删除")
continue
if self._msg_matches_tokens(m, preserve_tokens):
self._log(f" [保护] '{m.msg[:40]}' → LLM保护跳过")
kept = [m for m in msgs if id(m) not in to_delete_ids]
if not kept and msgs:
kept = [msgs[0]]
deleted = len(msgs) - len(kept)
self._log(
f"[剪枝-相关] {dialog_label} 总消息={len(msgs)} "
f"填充删除={deleted} 保留={len(kept)}"
)
return kept
def _prune_with_scene_filter(
self,
msgs: List[ConversationMessage],
extraction: "DialogExtractionResponse",
dialog_label: str,
mode: str,
) -> List[ConversationMessage]:
"""场景感知剪枝,供 semantic / strict 两个阈值档位调用。
本函数体现剪枝系统的三层递进逻辑:
第一层conservative阈值 < 0.3
不进入本函数,由 _prune_fillers_only 处理。
保留标准:只问"有没有信息量",填充消息(嗯/好的/哈哈等)删除,其余一律保留。
第二层semantic阈值 [0.3, 0.6)
保留标准:内容价值优先,场景相关性是参考而非唯一标准。
- 填充消息 → 删除(最高优先级)
- 场景相关消息 → 保留
- 场景无关消息 → 有两次豁免机会:
1. 命中 scene_preserve_tokensLLM 标记的关键词/时间/金额等)→ 保留
2. 含情感词(感觉/压力/开心等)→ 保留(情感内容有记忆价值)
3. 两次豁免均未命中 → 删除
第三层strict阈值 [0.6, 0.9]
保留标准:场景相关性优先,无任何豁免。
- 填充消息 → 删除(最高优先级)
- 场景相关消息 → 保留
- 场景无关消息 → 直接删除preserve_keywords 和情感词在此模式下均不生效
至少保留 1 条消息(兜底取第一条)。
"""
# strict 模式收窄保护范围:只保护结构化关键信息(时间/编号/金额/联系方式/地址),
# 不保护 keywords / preserve_keywords让场景过滤能删掉更多内容。
# semantic 模式完整保护:包含 LLM 抽取的所有重要片段(含 keywords 和 preserve_keywords
if mode == "strict":
scene_preserve_tokens = (
extraction.times + extraction.ids + extraction.amounts +
extraction.contacts + extraction.addresses
)
else:
scene_preserve_tokens = self._build_preserve_tokens(extraction)
unrelated_snippets = extraction.scene_unrelated_snippets or []
to_delete_ids: set = set()
for m in msgs:
msg_text = m.msg.strip()
# 第一优先级:填充消息无论模式直接删除,不参与后续场景判断
if self._is_filler_message(m):
to_delete_ids.add(id(m))
self._log(f" [填充] '{msg_text[:40]}' → 删除")
continue
# 双向包含匹配:处理 LLM 返回片段与原始消息文本长度不完全一致的情况
is_scene_unrelated = any(
snip and (snip in msg_text or msg_text in snip)
for snip in unrelated_snippets
)
if is_scene_unrelated:
if mode == "strict":
# strict场景无关直接删除不做任何豁免
# 场景相关性是唯一裁决标准preserve_keywords 在此模式下不生效
to_delete_ids.add(id(m))
self._log(f" [场景无关-严格] '{msg_text[:40]}' → 删除")
elif mode == "semantic":
# semantic场景无关但有内容价值 → 保留
# 豁免第一层:命中 scene_preserve_tokens关键词/结构化信息保护)
if self._msg_matches_tokens(m, scene_preserve_tokens):
self._log(f" [保护] '{msg_text[:40]}' → 场景关键词保护,保留")
else:
# 豁免第二层:含情感词,认为有情境记忆价值,即使场景无关也保留
has_contextual_emotion = any(
word in msg_text
for word in ["感觉", "觉得", "心情", "开心", "难过", "高兴", "沮丧",
"喜欢", "讨厌", "", "", "担心", "害怕", "兴奋",
"压力", "", "疲惫", "", "焦虑", "委屈", "感动"]
)
if not has_contextual_emotion:
to_delete_ids.add(id(m))
self._log(f" [场景无关-语义] '{msg_text[:40]}' → 删除(无情感关联)")
else:
self._log(f" [场景关联-保留] '{msg_text[:40]}' → 有情感关联,保留")
else:
# 不在 scene_unrelated_snippets 中 → 场景相关,直接保留
if self._msg_matches_tokens(m, scene_preserve_tokens):
self._log(f" [保护] '{msg_text[:40]}' → LLM保护跳过")
# else: 普通场景相关消息,保留,不输出日志
kept = [m for m in msgs if id(m) not in to_delete_ids]
if not kept and msgs:
kept = [msgs[0]]
deleted = len(msgs) - len(kept)
self._log(
f"[剪枝-{mode}] {dialog_label} 总消息={len(msgs)} "
f"删除={deleted} 保留={len(kept)}"
)
return kept
def _build_preserve_tokens(self, extraction: "DialogExtractionResponse") -> List[str]:
"""统一构建 preserve_tokens合并 LLM 抽取的所有重要片段。"""
return (
extraction.times + extraction.ids + extraction.amounts +
extraction.contacts + extraction.addresses + extraction.keywords +
extraction.preserve_keywords
)
def _msg_matches_tokens(self, message: ConversationMessage, tokens: List[str]) -> bool:
"""判断消息是否包含任意抽取到的重要片段。"""
if not tokens:
@@ -628,18 +397,16 @@ class SemanticPruner:
proportion = float(self.config.pruning_threshold)
extraction = await self._extract_dialog_important(dialog.content)
pruning_mode = self._get_pruning_mode()
self._log(f"[剪枝-模式] 阈值={proportion} → 模式={pruning_mode}")
if extraction.is_related:
kept = self._apply_related_dialog_pruning(
dialog.context.msgs, extraction, f"对话ID={dialog.id}", pruning_mode
)
dialog.context = ConversationContext(msgs=kept)
# 相关对话不剪枝
return dialog
# 在不相关对话中LLM 已通过 preserve_tokens 标记需要保护的内容
preserve_tokens = self._build_preserve_tokens(extraction)
preserve_tokens = (
extraction.times + extraction.ids + extraction.amounts +
extraction.contacts + extraction.addresses + extraction.keywords +
extraction.preserve_keywords
)
msgs = dialog.context.msgs
# 分类:填充 / 其他可删LLM保护消息通过不加入任何桶来隐式保护
@@ -714,30 +481,11 @@ class SemanticPruner:
self._log(
f"[剪枝-数据集] 对话总数={len(dialogs)} 场景={self.config.pruning_scene} 删除比例={proportion} 开关={self.config.pruning_switch} 模式=消息级独立判断"
)
pruning_mode = self._get_pruning_mode()
self._log(f"[剪枝-数据集] 阈值={proportion} → 剪枝阶段={pruning_mode}")
result: List[DialogData] = []
total_original_msgs = 0
total_deleted_msgs = 0
# 统计对象:直接收集结构化数据,无需事后正则解析
stats = {
"scene": self.config.pruning_scene,
"dialog_total": len(dialogs),
"deletion_ratio": proportion,
"enabled": self.config.pruning_switch,
"pruning_mode": pruning_mode,
"related_count": 0,
"unrelated_count": 0,
"related_indices": [],
"unrelated_indices": [],
"total_deleted_messages": 0,
"remaining_dialogs": 0,
"dialogs": [],
}
# 并发执行所有对话的 LLM 抽取(获取 preserve_keywords 等保护信息)
semaphore = asyncio.Semaphore(self.max_concurrent)
@@ -757,31 +505,12 @@ class SemanticPruner:
original_count = len(msgs)
total_original_msgs += original_count
# 相关对话:根据阶段决定处理力度
if extraction.is_related:
stats["related_count"] += 1
stats["related_indices"].append(d_idx + 1)
kept = self._apply_related_dialog_pruning(
msgs, extraction, f"对话 {d_idx+1}", pruning_mode
)
deleted_count = original_count - len(kept)
total_deleted_msgs += deleted_count
dd.context.msgs = kept
result.append(dd)
stats["dialogs"].append({
"index": d_idx + 1,
"is_related": True,
"total_messages": original_count,
"deleted": deleted_count,
"kept": len(kept),
})
continue
stats["unrelated_count"] += 1
stats["unrelated_indices"].append(d_idx + 1)
# 从 LLM 抽取结果中获取所有需要保留的 token
preserve_tokens = self._build_preserve_tokens(extraction)
preserve_tokens = (
extraction.times + extraction.ids + extraction.amounts +
extraction.contacts + extraction.addresses + extraction.keywords +
extraction.preserve_keywords # 情绪/兴趣/爱好关键词
)
# 判断是否需要详细日志
should_log_details = self._detailed_prune_logging and original_count <= self._max_debug_msgs_per_dialog
@@ -814,16 +543,16 @@ class SemanticPruner:
# important_msgs 仅用于日志统计
important_msgs = llm_protected_msgs
# 计算删除配额
delete_target = int(original_count * proportion)
if proportion > 0 and original_count > 0 and delete_target == 0:
delete_target = 1
# 确保至少保留1条消息
max_deletable = max(0, original_count - 1)
delete_target = min(delete_target, max_deletable)
# 删除策略:优先删填充消息,再按出现顺序删其余可删消息
to_delete_indices = set()
deleted_details = []
@@ -841,65 +570,50 @@ class SemanticPruner:
break
to_delete_indices.add(idx)
deleted_details.append(f"[{idx}] 可删: '{msg.msg[:50]}'")
# 执行删除
kept_msgs = []
for idx, m in enumerate(msgs):
if idx not in to_delete_indices:
kept_msgs.append(m)
# 确保至少保留1条
if not kept_msgs and msgs:
kept_msgs = [msgs[0]]
dd.context.msgs = kept_msgs
deleted_count = original_count - len(kept_msgs)
total_deleted_msgs += deleted_count
# 输出删除详情
if deleted_details:
self._log(f"[剪枝-删除详情] 对话 {d_idx+1} 删除了以下消息:")
for detail in deleted_details:
self._log(f" {detail}")
# ========== 问答对统计(已注释) ==========
# qa_info = f",问答对={len(qa_pairs)}" if qa_pairs else ""
# ========================================
self._log(
f"[剪枝-对话] 对话 {d_idx+1} 总消息={original_count} "
f"(保护={len(important_msgs)} 填充={len(filler_msgs)} 可删={len(deletable_msgs)}) "
f"删除={deleted_count} 保留={len(kept_msgs)}"
)
stats["dialogs"].append({
"index": d_idx + 1,
"is_related": False,
"total_messages": original_count,
"protected": len(important_msgs),
"fillers": len(filler_msgs),
"deletable": len(deletable_msgs),
"deleted": deleted_count,
"kept": len(kept_msgs),
})
result.append(dd)
# 补全统计对象
stats["total_deleted_messages"] = total_deleted_msgs
stats["remaining_dialogs"] = len(result)
self._log(f"[剪枝-数据集] 剩余对话数={len(result)}")
self._log(f"[剪枝-数据集] 相关对话数={stats['related_count']} 不相关对话数={stats['unrelated_count']}")
self._log(f"[剪枝-数据集] 总删除 {total_deleted_msgs}")
# 直接序列化统计对象,无需正则解析
# 保存日志
try:
from app.core.config import settings
settings.ensure_memory_output_dir()
log_output_path = settings.get_memory_output_path("pruned_terminal.json")
sanitized_logs = [self._sanitize_log_line(l) for l in self.run_logs]
payload = self._parse_logs_to_structured(sanitized_logs)
with open(log_output_path, "w", encoding="utf-8") as f:
json.dump(stats, f, ensure_ascii=False, indent=2)
json.dump(payload, f, ensure_ascii=False, indent=2)
except Exception as e:
self._log(f"[剪枝-数据集] 保存终端输出日志失败:{e}")
@@ -907,7 +621,7 @@ class SemanticPruner:
if not result:
print("警告: 语义剪枝后数据集为空,已回退为未剪枝数据以避免流程中断")
return dialogs
return result
def _log(self, msg: str) -> None:
@@ -919,4 +633,114 @@ class SemanticPruner:
pass
print(msg)
def _sanitize_log_line(self, line: str) -> str:
"""移除行首的方括号标签前缀,例如 [剪枝-数据集] 或 [剪枝-对话]。"""
try:
return re.sub(r"^\[[^\]]+\]\s*", "", line)
except Exception:
return line
def _parse_logs_to_structured(self, logs: List[str]) -> dict:
"""将已去前缀的日志列表解析为结构化 JSON便于数据对接。"""
summary = {
"scene": self.config.pruning_scene,
"dialog_total": None,
"deletion_ratio": None,
"enabled": None,
"related_count": None,
"unrelated_count": None,
"related_indices": [],
"unrelated_indices": [],
"total_deleted_messages": None,
"remaining_dialogs": None,
}
dialogs = []
# 解析函数
def parse_int(value: str) -> Optional[int]:
try:
return int(value)
except Exception:
return None
def parse_float(value: str) -> Optional[float]:
try:
return float(value)
except Exception:
return None
def parse_indices(s: str) -> List[int]:
s = s.strip()
if not s:
return []
parts = [p.strip() for p in s.split(",") if p.strip()]
out: List[int] = []
for p in parts:
try:
out.append(int(p))
except Exception:
pass
return out
# 正则
re_header = re.compile(r"对话总数=(\d+)\s+场景=([^\s]+)\s+删除比例=([0-9.]+)\s+开关=(True|False)")
re_counts = re.compile(r"相关对话数=(\d+)\s+不相关对话数=(\d+)")
re_indices = re.compile(r"相关对话:第\[(.*?)\]段;不相关对话:第\[(.*?)\]段")
re_dialog = re.compile(r"对话\s+(\d+)\s+总消息=(\d+)\s+分配删除=(\d+)\s+实删=(\d+)\s+保留=(\d+)")
re_total_del = re.compile(r"总删除\s+(\d+)\s+条")
re_remaining = re.compile(r"剩余对话数=(\d+)")
for line in logs:
# 第一行:总览
m = re_header.search(line)
if m:
summary["dialog_total"] = parse_int(m.group(1))
# 顶层 scene 依配置,这里不覆盖,但也可校验 m.group(2)
summary["deletion_ratio"] = parse_float(m.group(3))
summary["enabled"] = True if m.group(4) == "True" else False
continue
# 第二行:相关/不相关数量
m = re_counts.search(line)
if m:
summary["related_count"] = parse_int(m.group(1))
summary["unrelated_count"] = parse_int(m.group(2))
continue
# 第三行:相关/不相关索引
m = re_indices.search(line)
if m:
summary["related_indices"] = parse_indices(m.group(1))
summary["unrelated_indices"] = parse_indices(m.group(2))
continue
# 对话级统计
m = re_dialog.search(line)
if m:
dialogs.append({
"index": parse_int(m.group(1)),
"total_messages": parse_int(m.group(2)),
"quota_delete": parse_int(m.group(3)),
"actual_deleted": parse_int(m.group(4)),
"kept": parse_int(m.group(5)),
})
continue
# 全局删除总数
m = re_total_del.search(line)
if m:
summary["total_deleted_messages"] = parse_int(m.group(1))
continue
# 剩余对话数
m = re_remaining.search(line)
if m:
summary["remaining_dialogs"] = parse_int(m.group(1))
continue
return {
"scene": summary["scene"],
"timestamp": datetime.now().isoformat(),
"summary": {k: v for k, v in summary.items() if k != "scene"},
"dialogs": dialogs,
}

View File

@@ -384,14 +384,6 @@ class ExtractionOrchestrator:
logger.info(f"陈述句提取完成,共提取 {len(all_statements)} 条陈述句")
# 试运行模式下,所有分块提取完成后发送完成事件
if self.progress_callback and self.is_pilot_run:
await self.progress_callback(
"knowledge_extraction_complete",
f"陈述句提取完成,共提取 {len(all_statements)}",
{"total_statements": len(all_statements), "total_chunks": total_chunks}
)
return dialog_data_list
async def _extract_triplets(

View File

@@ -1,7 +1,6 @@
{#
对话级抽取与相关性判定模板(用于剪枝加速)
输入pruning_scene, ontology_class_infos, dialog_text, language
- ontology_class_infos: List[{class_name: str, class_description: str}]
输入pruning_scene, ontology_classes, dialog_text, language
输出:严格 JSON不要包含任何多余文本字段
- is_related: bool是否与所选场景相关
- times: [string],从对话中抽取的时间相关文本(日期、时间、时间段、有效期等)
@@ -19,16 +18,20 @@
#}
{# ── 确定场景说明 ── #}
{% if ontology_class_infos and ontology_class_infos | length > 0 %}
{% if ontology_classes and ontology_classes | length > 0 %}
{% if language == 'en' %}
{% set instruction = 'Scene "' ~ pruning_scene ~ '": The dialogue is relevant if it involves any of the following entity types.' %}
{% set custom_types_str = ontology_classes | join(', ') %}
{% set instruction = 'Scene "' ~ pruning_scene ~ '": The dialogue is related to this scene if it involves any of the following entity types: ' ~ custom_types_str ~ '.' %}
{% else %}
{% set instruction = '场景「' ~ pruning_scene ~ '」:对话涉及以下任意实体类型时视为相关。' %}
{% set custom_types_str = ontology_classes | join('、') %}
{% set instruction = '场景「' ~ pruning_scene ~ '」:对话涉及以下任意实体类型时视为相关:' ~ custom_types_str ~ '。' %}
{% endif %}
{% else %}
{% if language == 'en' %}
{% set custom_types_str = '' %}
{% set instruction = 'Scene "' ~ pruning_scene ~ '": Determine whether the dialogue content is relevant to this scene based on overall context.' %}
{% else %}
{% set custom_types_str = '' %}
{% set instruction = '场景「' ~ pruning_scene ~ '」:根据对话整体内容判断是否与该场景相关。' %}
{% endif %}
{% endif %}
@@ -39,17 +42,8 @@
2. 从对话中抽取所有需要保留的重要信息片段。
场景说明:{{ instruction }}
{% if ontology_class_infos and ontology_class_infos | length > 0 %}
【本场景实体类型定义】
以下实体类型定义了本场景中哪些内容是重要的。
凡是与以下任意类型相关的内容,都必须保留,并将关键词/短语提取到 keywords 字段:
{% for info in ontology_class_infos %}
- {{ info.class_name }}{{ info.class_description }}
{% endfor %}
重要提示只要对话中出现与上述任意实体类型相关的内容即判定为相关is_related=true
{% if custom_types_str %}
重要提示:只要对话中出现与上述实体类型({{ custom_types_str }}相关的内容即判定为相关is_related=true
{% endif %}
---
@@ -57,40 +51,13 @@
以下类型的内容无论是否与场景直接相关,都必须保留,请将其关键词/短语抽取到对应字段:
- 时间信息:日期、时间点、时间段、有效期 → times 字段
- 编号信息学号、工号、订单号、申请号、账号、ID → ids 字段
- 金额信息:价格、费用、金额(含货币符号或单位,如"100元"、"¥200")→ amounts 字段(注意:考试分数、成绩分数不属于金额,不要放入此字段)
- 金额信息:价格、费用、金额(含货币符号或单位 → amounts 字段
- 联系方式电话、手机号、邮箱、微信、QQ → contacts 字段
- 地址信息:地点、地址、位置 → addresses 字段
- 场景关键词:与**当前场景**强相关的专业术语、事件名称 → keywords 字段(注意:只放与当前场景直接相关的词,跨场景的内容不要放入此字段)
- 场景关键词:与场景强相关的专业术语、事件名称 → keywords 字段
- **情绪与情感**:喜悦、悲伤、愤怒、焦虑、开心、难过、委屈、兴奋、害怕、担心、压力、感动等情绪表达 → preserve_keywords 字段
- **兴趣与爱好**:喜欢、热爱、爱好、擅长、享受、沉迷、着迷、讨厌某事物等个人偏好表达 → preserve_keywords 字段
- **个人情感态度**:对人际关系、情感状态的明确表达(如"我跟室友闹矛盾了"、"我都快抑郁了"→ preserve_keywords 字段
- 注意:学业目标(如"我想考研")、成绩(如"87分")、学科偏好(如"喜欢数学")属于学业信息,不属于情绪/情感,不要放入 preserve_keywords 字段
【场景无关内容标记】
请从对话中识别出与当前场景({{ pruning_scene }}**既不相关、也无语义关联**的消息片段,将其原文(或关键片段)提取到 scene_unrelated_snippets 字段。
判断标准:
- 与场景实体类型完全无关
- 与场景话题没有因果/时间/情境上的关联(例如:不是"因为上课所以累"这种关联)
- 纯粹是另一个话题的内容(如在教育场景中讨论购物、娱乐等)
注意:有情绪/感受表达的消息即使话题不同,也可能有语义关联,请谨慎标记。
**重要scene_unrelated_snippets 必须认真填写,不能为空数组。**
如果对话中存在与场景无关的内容,必须将其原文片段提取出来。
示例(场景=在线教育):
- "我最近心情很差,跟室友闹矛盾了" → 与教育场景无关,加入 scene_unrelated_snippets
- "她总是很晚回来吵到我睡觉" → 与教育场景无关,加入 scene_unrelated_snippets
- "对,我都快抑郁了" → 与教育场景无关,加入 scene_unrelated_snippets
- "期末考试12月25日" → 与教育场景相关,不加入 scene_unrelated_snippets
- "我上次高数作业87分" → 与教育场景相关,不加入 scene_unrelated_snippets
- "我的目标是考研" → 与教育场景相关,不加入 scene_unrelated_snippets
示例(场景=情感陪伴):
- "我最近心情很差,跟室友闹矛盾了" → 与情感陪伴场景相关(情绪+关系),不加入 scene_unrelated_snippets
- "对,我都快抑郁了" → 与情感陪伴场景相关(情绪),不加入 scene_unrelated_snippets
- "期末考试12月25日3号教学楼201室" → 与情感陪伴场景无关(教育信息),加入 scene_unrelated_snippets
- "我上次高数作业87分这次能考好吗" → 与情感陪伴场景无关(学业信息),加入 scene_unrelated_snippets
- "我的目标是考研,想读应用数学" → 与情感陪伴场景无关(学业目标),加入 scene_unrelated_snippets
- **个人观点与态度**:对某事物的明确看法、评价、立场 → preserve_keywords 字段
【可以删除的内容】
以下类型的内容属于低价值信息,可以在剪枝时删除:
@@ -121,8 +88,7 @@
"contacts": [<string>...],
"addresses": [<string>...],
"keywords": [<string>...],
"preserve_keywords": [<string>...],
"scene_unrelated_snippets": [<string>...]
"preserve_keywords": [<string>...]
}
{% else %}
You are a dialogue content analysis assistant. Please analyze the full dialogue below in one pass and complete two tasks:
@@ -130,17 +96,8 @@ You are a dialogue content analysis assistant. Please analyze the full dialogue
2. Extract all important information fragments that must be preserved.
Scenario Description: {{ instruction }}
{% if ontology_class_infos and ontology_class_infos | length > 0 %}
[Scene Entity Type Definitions]
The following entity types define what content is important in this scene.
Content related to ANY of these types must be preserved and extracted into the keywords field:
{% for info in ontology_class_infos %}
- {{ info.class_name }}: {{ info.class_description }}
{% endfor %}
Important: If the dialogue contains content related to any of the entity types above, mark it as relevant (is_related=true).
{% if custom_types_str %}
Important: If the dialogue contains content related to any of the entity types above ({{ custom_types_str }}), mark it as relevant (is_related=true).
{% endif %}
---
@@ -148,22 +105,13 @@ Important: If the dialogue contains content related to any of the entity types a
The following types of content must always be preserved regardless of scene relevance. Extract their keywords/phrases into the corresponding fields:
- Time information: dates, time points, durations, expiry dates → times field
- ID information: student IDs, employee IDs, order numbers, application numbers, account IDs → ids field
- Amount information: prices, fees, amounts (with currency symbols or units, e.g., "$100", "¥200") → amounts field (Note: exam scores and grades are NOT amounts, do not put them here)
- Amount information: prices, fees, amounts (with currency symbols or units) → amounts field
- Contact information: phone numbers, emails, WeChat, QQ → contacts field
- Address information: locations, addresses, places → addresses field
- Scene keywords: professional terms and event names strongly related to **the current scene** → keywords field (Note: only put terms directly related to the current scene; cross-scene content should not be placed here)
- Scene keywords: professional terms and event names strongly related to the scene → keywords field
- **Emotions and feelings**: joy, sadness, anger, anxiety, happiness, sadness, excitement, fear, worry, stress, being moved, etc. → preserve_keywords field
- **Interests and hobbies**: likes, loves, hobbies, good at, enjoys, obsessed with, hates something, personal preferences → preserve_keywords field
- **Personal emotional attitudes**: clear expressions about interpersonal relationships or emotional states (e.g., "I had a fight with my roommate", "I'm almost depressed") → preserve_keywords field
- Note: Academic goals (e.g., "I want to pursue a master's degree"), grades (e.g., "87 points"), and subject preferences (e.g., "I like math") are academic information, NOT emotions/feelings — do not put them in preserve_keywords
[Scene-Unrelated Content Marking]
Please identify message snippets in the dialogue that are **neither relevant to nor semantically associated with** the current scene ({{ pruning_scene }}), and extract their original text (or key fragments) into the scene_unrelated_snippets field.
Criteria:
- Completely unrelated to the scene's entity types
- No causal/temporal/contextual association with the scene topic (e.g., "feeling tired because of class" IS associated)
- Purely belongs to a different topic (e.g., discussing shopping or entertainment in an education scene)
Note: Messages with emotional/feeling expressions may still have semantic association even if the topic differs — mark carefully.
- **Personal opinions and attitudes**: clear views, evaluations, or stances on something → preserve_keywords field
[CAN BE DELETED]
The following types of content are low-value and can be removed during pruning:
@@ -193,7 +141,6 @@ Output strict JSON only (fixed keys, order doesn't matter):
"contacts": [<string>...],
"addresses": [<string>...],
"keywords": [<string>...],
"preserve_keywords": [<string>...],
"scene_unrelated_snippets": [<string>...]
"preserve_keywords": [<string>...]
}
{% endif %}

View File

@@ -121,3 +121,18 @@ class StorageBackend(ABC):
URL for accessing the file.
"""
pass
async def get_permanent_url(self, file_key: str) -> Optional[str]:
    """
    Return a non-expiring public URL for a stored file, when one exists.

    This base implementation always answers None. Remote storage backends
    whose bucket is configured for public read access are expected to
    override it.

    Args:
        file_key: Unique identifier of the file in the storage system.

    Returns:
        A permanent public URL, or None when the backend does not
        support one.
    """
    return None

View File

@@ -261,3 +261,13 @@ class OSSStorage(StorageBackend):
logger.error(f"Failed to generate presigned URL for {file_key}: {e}")
# Return a basic URL format as fallback
return f"https://{self.bucket_name}.{self.endpoint.replace('https://', '').replace('http://', '')}/{file_key}"
async def get_permanent_url(self, file_key: str) -> str:
    """
    Build a permanent (non-expiring) public URL for an object in the bucket.

    The URL is only usable when the bucket allows public read; this method
    does not verify that.

    Args:
        file_key: Object key inside the bucket. It is percent-encoded
            (with "/" kept as the path separator) so that keys containing
            spaces, "#", "?" or non-ASCII characters still yield a valid URL.

    Returns:
        A URL of the form https://{bucket}.{endpoint-host}/{file_key}.
    """
    from urllib.parse import quote

    # Drop only a leading scheme; str.replace() would also remove matches
    # occurring later in the string.
    host = self.endpoint
    for scheme in ("https://", "http://"):
        if host.startswith(scheme):
            host = host[len(scheme):]
            break
    # A trailing slash on the configured endpoint would otherwise give "//key".
    host = host.rstrip("/")

    return f"https://{self.bucket_name}.{host}/{quote(file_key, safe='/')}"

View File

@@ -378,3 +378,12 @@ class S3Storage(StorageBackend):
logger.error(f"Failed to generate presigned URL for {file_key}: {e}")
# Return a basic URL format as fallback
return f"https://{self.bucket_name}.s3.{self.region}.amazonaws.com/{file_key}"
async def get_permanent_url(self, file_key: str) -> str:
    """
    Build a permanent (non-expiring) public URL for an object in the bucket.

    The URL is only usable when the bucket allows public read; this method
    does not verify that.

    Args:
        file_key: Object key inside the bucket. It is percent-encoded
            (with "/" kept as the path separator) so that keys containing
            spaces, "#", "?" or non-ASCII characters still yield a valid URL.

    Returns:
        A URL of the form
        https://{bucket}.s3.{region}.amazonaws.com/{file_key}.
    """
    from urllib.parse import quote

    encoded_key = quote(file_key, safe="/")
    return f"https://{self.bucket_name}.s3.{self.region}.amazonaws.com/{encoded_key}"

View File

@@ -20,21 +20,9 @@ from app.core.workflow.engine.variable_pool import VariablePool
from app.core.workflow.nodes import NodeFactory
from app.core.workflow.nodes.enums import NodeType, BRANCH_NODES
from app.core.workflow.utils.expression_evaluator import evaluate_condition
from app.core.workflow.validator import WorkflowValidator
logger = logging.getLogger(__name__)
# Regex to split output into:
# - variable placeholders: {{ ... }}
# - normal literal text
#
# Example:
# "Hello {{user.name}}!" ->
# ["Hello ", "{{user.name}}", "!"]
_OUTPUT_PATTERN = re.compile(r'\{\{.*?}}|[^{}]+')
# Strict variable format: {{ node_id.field_name }}
_VARIABLE_PATTERN = re.compile(r'\{\{\s*[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+\s*}}')
class GraphBuilder:
def __init__(
@@ -49,13 +37,13 @@ class GraphBuilder:
self.stream = stream
self.subgraph = subgraph
self.start_node_id: str | None = None
self.start_node_id = None
self.end_node_ids = []
self.node_map = {node["id"]: node for node in self.nodes}
self.end_node_map: dict[str, StreamOutputConfig] = {}
self._find_upstream_activation_dep = lru_cache(
self._find_upstream_branch_node = lru_cache(
maxsize=len(self.nodes) * 2
)(self._find_upstream_activation_dep)
)(self._find_upstream_branch_node)
if variable_pool:
self.variable_pool = variable_pool
else:
@@ -63,18 +51,9 @@ class GraphBuilder:
self.graph = StateGraph(WorkflowState)
self.add_nodes()
self.reachable_nodes = WorkflowValidator.get_reachable_nodes(self.start_node_id, self.edges)
self.end_nodes = [
node
for node in self.nodes
if node.get("type") == "end" and node.get("id") in self.reachable_nodes
]
self.add_edges()
# EDGES MUST BE ADDED AFTER NODES ARE ADDED.
self._reverse_adj: dict[str, list[dict]] = defaultdict(list)
self._build_reverse_adj()
self._analyze_end_node_output()
# EDGES MUST BE ADDED AFTER NODES ARE ADDED.
@property
def nodes(self) -> list[dict[str, Any]]:
@@ -108,50 +87,60 @@ class GraphBuilder:
result[node[0]].append(node[1])
return result
def _build_reverse_adj(self):
for edge in self.edges:
if edge["source"] not in self.reachable_nodes:
continue
self._reverse_adj[edge.get("target")].append({
"id": edge["source"], "branch": edge.get("label")
})
def _find_upstream_branch_node(self, target_node: str) -> tuple[bool, tuple[tuple[str, str]]]:
"""
Recursively find all upstream branch (control) nodes that influence the execution
of the given target node.
def _find_upstream_activation_dep(
self,
target_node: str
) -> tuple[tuple[tuple[str, str]], tuple[str]]:
"""Find upstream dependencies that affect the activation of a target node.
This method walks upstream along the workflow graph starting from `target_node`.
It distinguishes between:
- branch nodes (node types listed in `BRANCH_NODES`)
- non-branch nodes (ordinary processing nodes)
Walks upstream along the workflow graph from the target node, collecting
two types of dependencies:
- Branch control nodes: upstream branch nodes (e.g. if-else) whose
routing outcome determines whether the target node executes.
- Output nodes: upstream END nodes that must complete their output
before the target node can activate.
Traversal rules:
1. For each immediate upstream node:
- If it is a branch node, it is recorded as an affecting control node.
- If it is a non-branch node, the traversal continues recursively upstream.
2. If ANY upstream path reaches a START / CYCLE_START node without encountering
a branch node, the traversal is considered invalid:
- `has_branch` will be False
- no branch nodes are returned.
3. Only when ALL upstream non-branch paths eventually lead to at least one
branch node will `has_branch` be True.
The traversal terminates early and returns empty tuples if any upstream
path reaches START/CYCLE_START without encountering a branch or output
node, indicating the target node is directly reachable and should be
activated immediately.
Special case:
- If `target_node` has no upstream nodes AND its type is START or CYCLE_START,
it is considered directly reachable from the workflow entry, and therefore
has no controlling branch nodes.
Args:
target_node: The ID of the node whose upstream activation
dependencies are to be resolved.
target_node (str):
The identifier of the node whose upstream control branches
are to be resolved.
Returns:
A tuple of two elements:
- A deduplicated tuple of (branch_node_id, branch_label) pairs
representing upstream branch control dependencies. Empty if
any clean path to START exists.
- A deduplicated tuple of upstream output node IDs that must
complete before this node activates.
tuple[bool, tuple[tuple[str, str]]]:
- has_branch (bool):
True if every upstream path from `target_node` encounters
at least one branch node.
False if any path reaches a start node without a branch.
- branch_nodes (tuple[tuple[str, str]]):
A deduplicated tuple of `(branch_node_id, branch_label)` pairs
representing all branch nodes that can influence `target_node`.
Returns an empty tuple if `has_branch` is False.
"""
source_nodes = self._reverse_adj[target_node]
source_nodes = [
{
"id": edge.get("source"),
"branch": edge.get("label")
}
for edge in self.edges
if edge.get("target") == target_node
]
if not source_nodes and self.get_node_type(target_node) in [NodeType.START, NodeType.CYCLE_START]:
return tuple(), tuple()
return False, tuple()
branch_nodes = []
output_nodes = []
non_branch_nodes = []
for node_info in source_nodes:
@@ -160,23 +149,19 @@ class GraphBuilder:
(node_info["id"], node_info["branch"])
)
else:
if self.get_node_type(node_info["id"]) == NodeType.END:
output_nodes.append(node_info["id"])
non_branch_nodes.append(node_info["id"])
has_branch = True
for node_id in non_branch_nodes:
upstream_control_nodes, upstream_output_nodes = self._find_upstream_activation_dep(node_id)
if not upstream_control_nodes:
if not upstream_output_nodes and node_id not in output_nodes:
return tuple(), tuple()
branch_nodes = []
has_branch = False
if has_branch:
branch_nodes.extend(upstream_control_nodes)
output_nodes.extend(upstream_output_nodes)
node_has_branch, nodes = self._find_upstream_branch_node(node_id)
has_branch = has_branch and node_has_branch
if not has_branch:
break
branch_nodes.extend(nodes)
if not has_branch:
branch_nodes = []
return tuple(set(branch_nodes)), tuple(set(output_nodes))
return has_branch, tuple(set(branch_nodes))
def _analyze_end_node_output(self):
"""
@@ -197,10 +182,11 @@ class GraphBuilder:
"""
# Collect all End nodes in the workflow
logger.info(f"[Prefix Analysis] Found {len(self.end_nodes)} End nodes")
end_nodes = [node for node in self.nodes if node.get("type") == "end"]
logger.info(f"[Prefix Analysis] Found {len(end_nodes)} End nodes")
# Iterate through each End node to analyze its output
for end_node in self.end_nodes:
for end_node in end_nodes:
end_node_id = end_node.get("id")
config = end_node.get("config", {})
output = config.get("output")
@@ -209,33 +195,42 @@ class GraphBuilder:
if not output:
continue
# Regex to split output into:
# - variable placeholders: {{ ... }}
# - normal literal text
#
# Example:
# "Hello {{user.name}}!" ->
# ["Hello ", "{{user.name}}", "!"]
pattern = r'\{\{.*?\}\}|[^{}]+'
# Strict variable format: {{ node_id.field_name }}
variable_pattern_string = r'\{\{\s*[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+\s*\}\}'
variable_pattern = re.compile(variable_pattern_string)
# Split output into ordered segments
output_template = list(_OUTPUT_PATTERN.findall(output))
output_template = list(re.findall(pattern, output))
# Determine whether each segment is literal text
# True -> literal (can be directly output)
# False -> variable placeholder (needs runtime value)
output_flag = [
not bool(_VARIABLE_PATTERN.match(item))
not bool(variable_pattern.match(item))
for item in output_template
]
# Stream mode: output activation depends on upstream branch nodes
if self.stream:
# Find upstream branch nodes that can control this End node
upstream_control_nodes, upstream_output_nodes = self._find_upstream_activation_dep(end_node_id)
activate = not bool(upstream_control_nodes) and not bool(upstream_output_nodes)
has_branch, control_nodes = self._find_upstream_branch_node(end_node_id)
# Build StreamOutputConfig for this End node
self.end_node_map[end_node_id] = StreamOutputConfig(
id=end_node_id,
# If there is no upstream branch, output is active immediately
activate=activate,
activate=not has_branch,
# Branch nodes that control activation of this End node
control_nodes=self._merge_control_nodes(upstream_control_nodes),
upstream_output_nodes=list(upstream_output_nodes),
control_resolved=not bool(upstream_control_nodes),
output_resolved=not bool(upstream_output_nodes),
control_nodes=self._merge_control_nodes(control_nodes),
# Convert output segments into OutputContent objects
outputs=list(
@@ -254,16 +249,14 @@ class GraphBuilder:
cursor=0
)
logger.info(f"[Stream Analysis] end_id: {end_node_id}, "
f"activate: {activate}, "
f"control_nodes: {upstream_control_nodes},"
f"ref_outputs: {upstream_output_nodes},"
f"activate: {not has_branch}, "
f"control_nodes: {control_nodes},"
f"output: {output_template},"
f"output_activate: {output_flag}")
# Non-stream mode: all outputs are activated by default
else:
self.end_node_map[end_node_id] = StreamOutputConfig(
id=end_node_id,
activate=True,
control_nodes={},
outputs=list(
@@ -276,10 +269,7 @@ class GraphBuilder:
for output_string, activate in zip(output_template, output_flag)
]
),
cursor=0,
upstream_output_nodes=[],
control_resolved=True,
output_resolved=True,
cursor=0
)
def add_nodes(self):
@@ -314,6 +304,8 @@ class GraphBuilder:
# Record start and end node IDs
if node_type in [NodeType.START, NodeType.CYCLE_START]:
self.start_node_id = node_id
elif node_type == NodeType.END:
self.end_node_ids.append(node_id)
# Create node instance (start and end nodes are also created)
# NOTE:Loop node creation automatically removes the nodes and edges of the subgraph from the current graph
@@ -456,7 +448,7 @@ class GraphBuilder:
branch_activate = []
new_state = state.copy()
new_state["activate"] = dict(state.get("activate", {})) # deep copy of activate
node_output = variable_pool.get_node_output(src, default=dict(), strict=False)
node_output = variable_pool.get_node_output(src, defalut=dict(), strict=False)
for label, branch in unique_branch.items():
if node_output and evaluate_condition(
branch["condition"],
@@ -502,11 +494,9 @@ class GraphBuilder:
logger.debug(f"Added waiting edge: {sources} -> {target}")
# Connect End nodes to the global END node
for end_node in self.end_nodes:
end_node_id = end_node.get("id")
if end_node_id:
self.graph.add_edge(end_node_id, END)
logger.debug(f"Added edge: {end_node_id} -> END")
for end_node_id in self.end_node_ids:
self.graph.add_edge(end_node_id, END)
logger.debug(f"Added edge: {end_node_id} -> END")
return
def build(self) -> CompiledStateGraph:

View File

@@ -12,7 +12,6 @@ class WorkflowResultBuilder:
variable_pool: VariablePool,
elapsed_time: float,
final_output: str,
success: bool
):
"""Construct the final standardized output of the workflow execution.
@@ -30,7 +29,6 @@ class WorkflowResultBuilder:
elapsed_time (float): Total execution time in seconds.
final_output (Any): The aggregated or final output content of the workflow
(e.g., combined messages from all End nodes).
success (bool): Whether the execution was successful.
Returns:
dict: A dictionary containing the final workflow execution result with keys:
@@ -51,7 +49,7 @@ class WorkflowResultBuilder:
conversation_id = variable_pool.get_value("sys.conversation_id")
return {
"status": "completed" if success else "failed",
"status": "completed",
"output": final_output,
"variables": {
"conv": variable_pool.get_all_conversation_vars(),

View File

@@ -3,7 +3,6 @@
# @Email: 1533512157@qq.com
# @Time : 2026/2/9 15:11
import re
from queue import Queue
from typing import AsyncGenerator
from pydantic import BaseModel, Field, PrivateAttr
@@ -38,8 +37,8 @@ class OutputContent(BaseModel):
activate: bool = Field(
...,
description=(
"Whether this output segment is currently active."
"- True: allowed to be emitted/output"
"Whether this output segment is currently active.\n"
"- True: allowed to be emitted/output\n"
"- False: blocked until activated by branch control"
)
)
@@ -47,8 +46,8 @@ class OutputContent(BaseModel):
is_variable: bool = Field(
...,
description=(
"Whether this segment represents a variable placeholder."
"True -> variable (e.g. {{ node.field }})"
"Whether this segment represents a variable placeholder.\n"
"True -> variable (e.g. {{ node.field }})\n"
"False -> literal text"
)
)
@@ -87,16 +86,12 @@ class StreamOutputConfig(BaseModel):
- which upstream branch/control nodes gate the activation
- how each parsed output segment is streamed and activated
"""
id: str = Field(
...,
description="ID of the End node this configuration belongs to."
)
activate: bool = Field(
...,
description=(
"Global activation flag for the End node output."
"When False, output segments should not be emitted even if available."
"Global activation flag for the End node output.\n"
"When False, output segments should not be emitted even if available.\n"
"This flag typically becomes True once required control branch conditions "
"are satisfied."
)
@@ -105,46 +100,17 @@ class StreamOutputConfig(BaseModel):
control_nodes: dict[str, list[str]] = Field(
...,
description=(
"Control branch conditions for this End node output."
"Mapping of `branch_node_id -> expected_branch_label`."
"Control branch conditions for this End node output.\n"
"Mapping of `branch_node_id -> expected_branch_label`.\n"
"The End node output becomes globally active when a controlling branch node "
"reports a matching completion status."
)
)
upstream_output_nodes: list[str] = Field(
...,
description=(
"Upstream output node dependencies (data flow)."
"Represents END/output nodes that this output depends on."
"These nodes provide data sources required before this output can be activated "
"or streamed."
"Used to ensure correct ordering and dependency resolution in streaming mode."
)
)
control_resolved: bool = Field(
...,
description=(
"Whether all upstream branch control dependencies have been satisfied."
"True if no upstream branch nodes exist or the required branch "
"conditions have been met."
)
)
output_resolved: bool = Field(
...,
description=(
"Whether all upstream output node dependencies have been completed."
"True if no upstream output nodes exist or all upstream output "
"nodes have finished their output."
)
)
outputs: list[OutputContent] = Field(
...,
description=(
"Ordered list of output segments parsed from the output template."
"Ordered list of output segments parsed from the output template.\n"
"Each segment represents either a literal text block or a variable placeholder "
"that may be activated independently."
)
@@ -153,97 +119,49 @@ class StreamOutputConfig(BaseModel):
cursor: int = Field(
...,
description=(
"Streaming cursor index."
"Indicates the next output segment index to be emitted."
"Streaming cursor index.\n"
"Indicates the next output segment index to be emitted.\n"
"Segments with index < cursor are considered already streamed."
)
)
force: bool = Field(
default=False,
description=(
"Force flag for output emission."
"When True, all output segments are emitted regardless of activation state."
"Triggered when this output node has finished execution."
)
)
def update_activate(self, scope: str, status=None):
"""
Update streaming activation state based on upstream events.
Update streaming activation state based on an upstream node or special variable.
Args:
scope (str):
Identifier of the completed upstream entity.
- If a control branch node, it should match a key in `control_nodes`.
- If an upstream output node, it should match an entry in `upstream_output_nodes`.
- If a variable placeholder (e.g., "sys.xxx" or "node_id.field"),
it may appear in output segments.
- If a variable placeholder (e.g., "sys.xxx"), it may appear in output segments.
status (optional):
Completion status of the control branch node.
Required when `scope` refers to a control node.
Behavior:
1. Force activation:
- If `self.force` is True, the method returns immediately.
- If `scope == self.id`, the node marks itself as completed:
- `activate = True`
- `force = True`
This is typically used for final flushing when the node finishes execution.
1. Control branch nodes:
- If `scope` matches a key in `control_nodes` and `status` matches the expected
branch label, the End node output becomes globally active (`activate = True`).
2. Control dependency resolution:
- If `scope` matches a key in `control_nodes`:
- `status` must be provided.
- If `status` matches expected branch labels, mark control as resolved
(`control_resolved = True`).
3. Upstream output dependency resolution:
- If `scope` is in `upstream_output_nodes`,
mark data dependency as resolved (`output_resolved = True`).
4. Global activation condition:
- The node becomes active when BOTH conditions are satisfied:
- control_resolved == True
- output_resolved == True
- Once activated, `activate` remains True.
5. Variable segment activation:
- For each output segment that is a variable (`is_variable=True`):
- If the segment depends on the given `scope`,
mark the segment as active.
- This applies to both node variables (e.g., "node_id.field")
and system variables (e.g., "sys.xxx").
2. Variable output segments:
- For each segment that is a variable (`is_variable=True`):
- If the segment literal references `scope`, mark the segment as active.
- This applies both to regular node variables (e.g., "node_id.field")
and special system variables (e.g., "sys.xxx").
Notes:
- This method does NOT emit output or advance the streaming cursor.
- It only updates activation and dependency resolution states.
- Activation is driven by both control flow (branch nodes) and
data flow (upstream output nodes).
- This method does not emit output or advance the streaming cursor.
- It only updates activation flags based on upstream events or special variables.
"""
if self.force:
return
if scope == self.id:
self.activate = True
self.force = True
return
# resolve control branch dependency
# Case 1: resolve control branch dependency
if scope in self.control_nodes:
if status is None:
raise RuntimeError("[Stream Output] Control node activation status not provided")
if status in self.control_nodes[scope]:
self.control_resolved = True
self.activate = True
if scope in self.upstream_output_nodes:
self.upstream_output_nodes.remove(scope)
if not self.upstream_output_nodes:
self.output_resolved = True
self.activate = self.activate or (self.control_resolved and self.output_resolved)
# activate variable segments related to this node
# Case 2: activate variable segments related to this node
for i in range(len(self.outputs)):
if (
self.outputs[i].is_variable
@@ -256,17 +174,12 @@ class StreamOutputCoordinator:
def __init__(self):
self.end_outputs: dict[str, StreamOutputConfig] = {}
self.activate_end: str | None = None
self.output_queue: Queue = Queue()
self.processed_outputs = []
def initialize_end_outputs(
self,
end_node_map: dict[str, StreamOutputConfig]
):
self.end_outputs = end_node_map
self.processed_outputs = []
self.activate_end = None
self.output_queue = Queue()
@property
def current_activate_end_info(self):
@@ -298,11 +211,8 @@ class StreamOutputCoordinator:
"""
for node in self.end_outputs.keys():
self.end_outputs[node].update_activate(scope, status)
if self.end_outputs[node].activate and node not in self.processed_outputs:
self.output_queue.put(node)
self.processed_outputs.append(node)
if self.activate_end is None and not self.output_queue.empty():
self.activate_end = self.output_queue.get_nowait()
if self.end_outputs[node].activate and self.activate_end is None:
self.activate_end = node
async def emit_activate_chunk(
self,
@@ -346,7 +256,7 @@ class StreamOutputCoordinator:
final_chunk = ''
current_segment = end_info.outputs[end_info.cursor]
if not current_segment.activate and not force and not end_info.force:
if not current_segment.activate and not force:
# Stop processing until this segment becomes active
break
@@ -363,7 +273,7 @@ class StreamOutputCoordinator:
logger.warning(f"[STREAM] Failed to evaluate segment: {current_segment.literal}, error: {e}")
if final_chunk:
logger.info(f"[STREAM] StreamOutput Node:{self.activate_end}, chunk_length:{len(final_chunk)}")
logger.info(f"[STREAM] StreamOutput Node:{self.activate_end}, chunk:{final_chunk}")
yield {
"event": "message",
"data": {
@@ -375,7 +285,8 @@ class StreamOutputCoordinator:
end_info.cursor += 1
if end_info.cursor >= len(end_info.outputs):
self.pop_current_activate_end()
self.end_outputs.pop(self.activate_end)
self.activate_end = None
async def flush_remaining_chunk(
self,
@@ -414,8 +325,6 @@ class StreamOutputCoordinator:
async for msg_event in self.emit_activate_chunk(variable_pool, force=True):
yield msg_event
if not self.output_queue.empty():
self.activate_end = self.output_queue.get_nowait()
# Move to next active End node if current one is done
if not self.activate_end and self.end_outputs:
self.activate_end = list(self.end_outputs.keys())[0]

View File

@@ -351,12 +351,12 @@ class VariablePool:
}
return runtime_vars
def get_node_output(self, node_id: str, default: Any = None, strict: bool = True) -> dict[str, Any] | None:
def get_node_output(self, node_id: str, defalut: Any = None, strict: bool = True) -> dict[str, Any] | None:
"""获取指定节点的输出(运行时变量)
Args:
node_id: 节点 ID
default: 默认值
defalut: 默认值
strict: 是否严格模式
Returns:
@@ -368,7 +368,7 @@ class VariablePool:
if strict:
raise KeyError(f"node {node_id} output not exist")
else:
return default
return defalut
def copy(self, pool: 'VariablePool'):
self.variables = deepcopy(pool.variables)

View File

@@ -128,100 +128,89 @@ class WorkflowExecutor:
- token_usage: aggregated token usage if available
- error: error message if any
"""
start = datetime.datetime.now()
async for event in self.execute_stream(input_data):
if event.get("event") == "workflow_end":
return event.get("data")
return self.result_builder.build_final_output(
{"error": "Workflow execution did not end as expected"},
self.variable_pool,
(datetime.datetime.now() - start).total_seconds(),
"",
success=False
)
# logger.info(f"Starting workflow execution: execution_id={self.execution_context.execution_id}")
#
# start_time = datetime.datetime.now()
#
# # Execute the workflow
# try:
# # Build the workflow graph
# graph = self.build_graph()
#
# # Initialize the variable pool with input data
# await self.variable_initializer.initialize(
# variable_pool=self.variable_pool,
# input_data=input_data,
# execution_context=self.execution_context
# )
# initial_state = self.state_manager.create_initial_state(
# workflow_config=self.workflow_config,
# input_data=input_data,
# execution_context=self.execution_context,
# start_node_id=self.start_node_id
# )
#
# result = await graph.ainvoke(initial_state, config=self.execution_context.checkpoint_config)
#
# # Aggregate output from all End nodes
# full_content = ''
# for end_id in self.stream_coordinator.end_outputs.keys():
# full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False)
#
# # Append messages for user and assistant
# if input_data.get("files"):
# result["messages"].extend(
# [
# {
# "role": "user",
# "content": input_data.get("message", '')
# },
# {
# "role": "user",
# "content": input_data.get("files")
# },
# {
# "role": "assistant",
# "content": full_content
# }
# ]
# )
# else:
# result["messages"].extend(
# [
# {
# "role": "user",
# "content": input_data.get("message", '')
# },
# {
# "role": "assistant",
# "content": full_content
# }
# ]
# )
# # Calculate elapsed time
# end_time = datetime.datetime.now()
# elapsed_time = (end_time - start_time).total_seconds()
#
# logger.info(
# f"Workflow execution completed: execution_id={self.execution_context.execution_id}, elapsed_time={elapsed_time:.2f}ms")
#
# return self.result_builder.build_final_output(result, self.variable_pool, elapsed_time, full_content)
#
# except Exception as e:
# end_time = datetime.datetime.now()
# elapsed_time = (end_time - start_time).total_seconds()
#
# logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
# exc_info=True)
# return {
# "status": "failed",
# "error": str(e),
# "output": None,
# "node_outputs": {},
# "elapsed_time": elapsed_time,
# "token_usage": None
# }
logger.info(f"Starting workflow execution: execution_id={self.execution_context.execution_id}")
start_time = datetime.datetime.now()
# Execute the workflow
try:
# Build the workflow graph
graph = self.build_graph()
# Initialize the variable pool with input data
await self.variable_initializer.initialize(
variable_pool=self.variable_pool,
input_data=input_data,
execution_context=self.execution_context
)
initial_state = self.state_manager.create_initial_state(
workflow_config=self.workflow_config,
input_data=input_data,
execution_context=self.execution_context,
start_node_id=self.start_node_id
)
result = await graph.ainvoke(initial_state, config=self.execution_context.checkpoint_config)
# Aggregate output from all End nodes
full_content = ''
for end_id in self.stream_coordinator.end_outputs.keys():
full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False)
# Append messages for user and assistant
if input_data.get("files"):
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "user",
"content": input_data.get("files")
},
{
"role": "assistant",
"content": full_content
}
]
)
else:
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "assistant",
"content": full_content
}
]
)
# Calculate elapsed time
end_time = datetime.datetime.now()
elapsed_time = (end_time - start_time).total_seconds()
logger.info(
f"Workflow execution completed: execution_id={self.execution_context.execution_id}, elapsed_time={elapsed_time:.2f}ms")
return self.result_builder.build_final_output(result, self.variable_pool, elapsed_time, full_content)
except Exception as e:
end_time = datetime.datetime.now()
elapsed_time = (end_time - start_time).total_seconds()
logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
exc_info=True)
return {
"status": "failed",
"error": str(e),
"output": None,
"node_outputs": {},
"elapsed_time": elapsed_time,
"token_usage": None
}
async def execute_stream(
self,
@@ -259,8 +248,7 @@ class WorkflowExecutor:
"timestamp": int(start_time.timestamp() * 1000)
}
}
result = None
full_content = ''
try:
# Build the workflow graph in streaming mode
graph = self.build_graph(stream=True)
@@ -278,6 +266,7 @@ class WorkflowExecutor:
start_node_id=self.start_node_id
)
full_content = ''
self.stream_coordinator.update_scope_activation("sys")
# Execute the workflow with streaming
@@ -374,12 +363,7 @@ class WorkflowExecutor:
yield {
"event": "workflow_end",
"data": self.result_builder.build_final_output(
result,
self.variable_pool,
elapsed_time,
full_content,
success=True)
"data": self.result_builder.build_final_output(result, self.variable_pool, elapsed_time, full_content)
}
except Exception as e:
@@ -388,19 +372,16 @@ class WorkflowExecutor:
logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
exc_info=True)
if result is None:
result = {"error": str(e)}
else:
result["error"] = str(e)
yield {
"event": "workflow_end",
"data": self.result_builder.build_final_output(
result,
self.variable_pool,
elapsed_time,
full_content,
success=False
)
"data": {
"execution_id": self.execution_context.execution_id,
"status": "failed",
"error": str(e),
"elapsed_time": elapsed_time,
"timestamp": end_time.isoformat()
}
}

View File

@@ -128,7 +128,7 @@ class CodeNode(BaseNode):
else:
raise ValueError(f"Unsupported language: {self.typed_config.language}")
async with httpx.AsyncClient(timeout=60) as client:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://sandbox:8194/v1/sandbox/run",
headers={

View File

@@ -51,7 +51,7 @@ class ConditionDetail(BaseModel):
)
right: Any = Field(
default=None,
...,
description="Right-hand operand of the comparison expression"
)

View File

@@ -158,7 +158,7 @@ class LoopRuntime:
self.variable_pool.variables["conv"].update(
self.child_variable_pool.variables["conv"]
)
loop_vars = self.child_variable_pool.get_node_output(self.node_id, default={}, strict=False)
loop_vars = self.child_variable_pool.get_node_output(self.node_id, defalut={}, strict=False)
loopstate["node_outputs"][self.node_id] = loop_vars
def evaluate_conditional(self) -> bool:
@@ -261,4 +261,4 @@ class LoopRuntime:
idx += 1
logger.info(f"loop node {self.node_id}: execution completed")
return self.child_variable_pool.get_node_output(self.node_id, default={}, strict=False) | {"__child_state": child_state}
return self.child_variable_pool.get_node_output(self.node_id) | {"__child_state": child_state}

View File

@@ -18,7 +18,7 @@ class ConditionDetail(BaseModel):
)
right: Any = Field(
default=None,
...,
description="Value to compare with"
)

View File

@@ -31,13 +31,13 @@ class IfElseNode(BaseNode):
expressions.append({
"left": self.get_variable(expression.left, variable_pool, strict=False),
"right": expression.right
if expression.input_type == ValueInputType.CONSTANT or expression.right is None
if expression.input_type == ValueInputType.CONSTANT
else self.get_variable(expression.right, variable_pool, strict=False),
"operator": str(expression.operator),
"operator": expression.operator,
})
result.append({
"expressions": expressions,
"logical_operator": str(case.logical_operator),
"logical_operator": case.logical_operator,
})
return {
"cases": result

View File

@@ -5,7 +5,7 @@ from typing import Any
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException
from app.core.models import RedBearRerank, RedBearModelConfig
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import ElasticSearchVectorFactory
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import ElasticSearchVectorFactory, ElasticSearchVector
from app.core.workflow.engine.state_manager import WorkflowState
from app.core.workflow.engine.variable_pool import VariablePool
from app.core.workflow.nodes.base_node import BaseNode
@@ -24,6 +24,7 @@ class KnowledgeRetrievalNode(BaseNode):
def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]):
super().__init__(node_config, workflow_config)
self.typed_config: KnowledgeRetrievalNodeConfig | None = None
self.vector_service: ElasticSearchVector | None = None
def _output_types(self) -> dict[str, VariableType]:
return {
@@ -163,6 +164,50 @@ class KnowledgeRetrievalNode(BaseNode):
)
return reranker
def knowledge_retrieval(self, db, query, rs, db_knowledge, kb_config):
if db_knowledge.type == knowledge_model.KnowledgeType.FOLDER:
children = knowledge_repository.get_knowledges_by_parent_id(db=db, parent_id=db_knowledge.id)
for child in children:
if not (child and child.chunk_num > 0 and child.status == 1):
continue
kb_config.kb_id = child.id
self.knowledge_retrieval(db, query, rs, child, kb_config)
return
self.vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge)
indices = f"Vector_index_{kb_config.kb_id}_Node".lower()
match kb_config.retrieve_type:
case RetrieveType.PARTICIPLE:
rs.extend(self.vector_service.search_by_full_text(query=query, top_k=kb_config.top_k,
indices=indices,
score_threshold=kb_config.similarity_threshold))
case RetrieveType.SEMANTIC:
rs.extend(self.vector_service.search_by_vector(query=query, top_k=kb_config.top_k,
indices=indices,
score_threshold=kb_config.vector_similarity_weight))
case RetrieveType.HYBRID:
rs1 = self.vector_service.search_by_vector(query=query, top_k=kb_config.top_k,
indices=indices,
score_threshold=kb_config.vector_similarity_weight)
rs2 = self.vector_service.search_by_full_text(query=query, top_k=kb_config.top_k,
indices=indices,
score_threshold=kb_config.similarity_threshold)
# Deduplicate hybrid retrieval results
unique_rs = self._deduplicate_docs(rs1, rs2)
if not unique_rs:
return
if self.typed_config.reranker_id:
self.vector_service.reranker = self.get_reranker_model()
rs.extend(self.vector_service.rerank(query=query, docs=unique_rs, top_k=kb_config.top_k))
else:
rs.extend(sorted(
unique_rs,
key=lambda d: d.metadata.get("score", 0),
reverse=True
)[:kb_config.top_k])
case _:
raise RuntimeError("Unknown retrieval type")
async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> Any:
"""
Execute the knowledge retrieval workflow node.
@@ -191,56 +236,19 @@ class KnowledgeRetrievalNode(BaseNode):
query = self._render_template(self.typed_config.query, variable_pool)
with get_db_read() as db:
knowledge_bases = self.typed_config.knowledge_bases
existing_ids = self._get_existing_kb_ids(db, [kb.kb_id for kb in knowledge_bases])
if not existing_ids:
raise RuntimeError("Knowledge base retrieval failed: the knowledge base does not exist.")
rs = []
for kb_config in knowledge_bases:
db_knowledge = knowledge_repository.get_knowledge_by_id(db=db, knowledge_id=kb_config.kb_id)
if not db_knowledge:
raise RuntimeError("The knowledge base does not exist or access is denied.")
self.knowledge_retrieval(db, query, rs, db_knowledge, kb_config)
vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge)
indices = f"Vector_index_{kb_config.kb_id}_Node".lower()
match kb_config.retrieve_type:
case RetrieveType.PARTICIPLE:
rs.extend(vector_service.search_by_full_text(query=query, top_k=kb_config.top_k,
indices=indices,
score_threshold=kb_config.similarity_threshold))
case RetrieveType.SEMANTIC:
rs.extend(vector_service.search_by_vector(query=query, top_k=kb_config.top_k,
indices=indices,
score_threshold=kb_config.vector_similarity_weight))
case RetrieveType.HYBRID:
rs1 = vector_service.search_by_vector(query=query, top_k=kb_config.top_k,
indices=indices,
score_threshold=kb_config.vector_similarity_weight)
rs2 = vector_service.search_by_full_text(query=query, top_k=kb_config.top_k,
indices=indices,
score_threshold=kb_config.similarity_threshold)
# Deduplicate hy brid retrieval results
unique_rs = self._deduplicate_docs(rs1, rs2)
if not unique_rs:
continue
if self.typed_config.reranker_id:
vector_service.reranker = self.get_reranker_model()
rs.extend(vector_service.rerank(query=query, docs=unique_rs, top_k=kb_config.top_k))
else:
rs.extend(sorted(
unique_rs,
key=lambda d: d.metadata.get("score", 0),
reverse=True
)[:kb_config.top_k])
case _:
raise RuntimeError("Unknown retrieval type")
if not rs:
return []
if self.typed_config.reranker_id:
vector_service.reranker = self.get_reranker_model()
final_rs = vector_service.rerank(query=query, docs=rs, top_k=self.typed_config.reranker_top_k)
self.vector_service.reranker = self.get_reranker_model()
final_rs = self.vector_service.rerank(query=query, docs=rs, top_k=self.typed_config.reranker_top_k)
else:
final_rs = sorted(
rs,

View File

@@ -250,8 +250,6 @@ class ConditionBase(ABC):
self.type_limit = getattr(self, "type_limit", None)
def resolve_right_literal_value(self):
if self.right_selector is None:
return None
if self.input_type == ValueInputType.VARIABLE:
pattern = r"\{\{\s*(.*?)\s*\}\}"
right_expression = re.sub(pattern, r"\1", self.right_selector).strip()

View File

@@ -170,7 +170,7 @@ class WorkflowValidator:
# 仅在发布时验证所有节点可达
# 6. 验证所有节点可达(从 start 节点出发)
if start_nodes and not errors: # 只有在前面验证通过时才检查可达性
reachable = WorkflowValidator.get_reachable_nodes(
reachable = WorkflowValidator._get_reachable_nodes(
start_nodes[0]["id"],
edges
)
@@ -194,7 +194,7 @@ class WorkflowValidator:
return len(errors) == 0, errors
@staticmethod
def get_reachable_nodes(start_id: str, edges: list[dict]) -> set[str]:
def _get_reachable_nodes(start_id: str, edges: list[dict]) -> set[str]:
"""获取从 start 节点可达的所有节点
Args:

View File

@@ -2,7 +2,7 @@ from enum import StrEnum
from abc import abstractmethod, ABC
from typing import Any
from pydantic import BaseModel, Field, PrivateAttr
from pydantic import BaseModel, Field
from app.schemas import FileType
@@ -41,10 +41,10 @@ class VariableType(StrEnum):
"""
if isinstance(var, str):
return cls.STRING
elif isinstance(var, bool):
return cls.BOOLEAN
elif isinstance(var, (int, float)):
return cls.NUMBER
elif isinstance(var, bool):
return cls.BOOLEAN
elif isinstance(var, FileObject) or (isinstance(var, dict) and var.get('is_file')):
return cls.FILE
elif isinstance(var, dict):
@@ -116,7 +116,7 @@ class FileObject(BaseModel):
content_cache: dict = Field(default_factory=dict)
is_file: bool
_byte_content: bytes | None = PrivateAttr(default=None)
_byte_content: bytes | None = None
def get_content(self):
return self._byte_content

View File

@@ -10,7 +10,6 @@ T = TypeVar("T", bound=BaseVariable)
class StringVariable(BaseVariable):
value: str
type = 'str'
def valid_value(self, value) -> str:
@@ -23,7 +22,6 @@ class StringVariable(BaseVariable):
class NumberVariable(BaseVariable):
value: int | float
type = 'number'
def valid_value(self, value) -> int | float:
@@ -36,7 +34,6 @@ class NumberVariable(BaseVariable):
class BooleanVariable(BaseVariable):
value: bool
type = 'boolean'
def valid_value(self, value) -> bool:
@@ -49,7 +46,6 @@ class BooleanVariable(BaseVariable):
class DictVariable(BaseVariable):
value: dict
type = 'object'
def valid_value(self, value) -> dict:
@@ -62,7 +58,6 @@ class DictVariable(BaseVariable):
class FileVariable(BaseVariable):
value: FileObject
type = 'file'
def valid_value(self, value) -> FileObject:
@@ -107,7 +102,6 @@ class FileVariable(BaseVariable):
class ArrayVariable(BaseVariable, Generic[T]):
value: list[T]
type = 'array'
def __init__(self, child_type: Type[T], value: list[Any]):
@@ -135,7 +129,6 @@ class ArrayVariable(BaseVariable, Generic[T]):
class NestedArrayVariable(BaseVariable):
value: list[ArrayVariable]
type = 'array_nest'
def valid_value(self, value: list[T]) -> list[T]:
@@ -160,7 +153,6 @@ class NestedArrayVariable(BaseVariable):
category=RuntimeWarning
)
class AnyVariable(BaseVariable):
value: Any
type = 'any'
def valid_value(self, value: Any) -> Any:

View File

@@ -65,7 +65,6 @@ def get_db_read() -> Generator[Session, None, None]:
yield db
finally:
db.rollback() # 只读任务无需 commit
db.close()
def get_pool_status():

View File

@@ -506,10 +506,13 @@ async def http_exception_handler(request: Request, exc: HTTPException):
404: "errors.common.not_found",
405: "errors.common.method_not_allowed",
409: "errors.common.conflict",
413: "errors.common.payload_too_large",
422: "errors.common.validation_failed",
429: "errors.common.too_many_requests",
500: "errors.common.internal_error",
502: "errors.common.bad_gateway",
503: "errors.common.service_unavailable",
504: "errors.common.gateway_timeout",
}
# 如果有对应的翻译键,使用翻译
@@ -534,7 +537,7 @@ async def http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content=fail(code=exc.status_code, msg=translated_message, error=translated_message)
content=fail(code=exc.status_code, msg=translated_message, error=exc.detail)
)

View File

@@ -9,6 +9,7 @@ from sqlalchemy.dialects.postgresql import JSONB
from app.db import Base
from app.schemas import FileType
class PerceptualType(IntEnum):
VISION = 1
AUDIO = 2

View File

@@ -90,27 +90,27 @@ class ConversationRepository:
self,
user_id: uuid.UUID,
workspace_id: uuid.UUID = None,
limit: int = 10,
is_activate: bool = True
) -> list[Conversation]:
is_activate: bool = True,
page: int = 1,
page_size: int = 20
) -> tuple[list[Conversation], int]:
"""
Retrieve recent conversations for a specific user.
Retrieve recent conversations for a specific user with pagination.
This method queries conversations associated with the given user ID,
optionally scoped to a specific workspace. Results are ordered by the
most recently updated conversations and limited to a fixed number.
most recently updated conversations.
Args:
user_id (uuid.UUID): Unique identifier of the user.
workspace_id (uuid.UUID, optional): Workspace scope for the query.
If provided, only conversations under this workspace will be returned.
limit (int): Maximum number of conversations to return.
Defaults to 10.
is_activate (bool): Convsersation State limit
is_activate (bool): Conversation State limit.
page (int): Page number (1-based). Defaults to 1.
page_size (int): Number of items per page. Defaults to 20.
Returns:
list[Conversation]: A list of conversation entities ordered by
last updated time (descending).
tuple[list[Conversation], int]: A list of conversation entities and total count.
"""
logger.info(f"Fetching conversation by user_id: {user_id}")
@@ -122,18 +122,25 @@ class ConversationRepository:
if workspace_id:
stmt = stmt.where(Conversation.workspace_id == workspace_id)
stmt = stmt.order_by(desc(Conversation.updated_at))
stmt = stmt.limit(limit)
# Calculate total count
total = int(self.db.execute(
select(func.count()).select_from(stmt.subquery())
).scalar_one())
convsersations = list(self.db.scalars(stmt).all())
# Apply ordering and pagination
stmt = stmt.order_by(desc(Conversation.updated_at))
stmt = stmt.offset((page - 1) * page_size).limit(page_size)
conversations = list(self.db.scalars(stmt).all())
logger.info(
"Conversation fetched successfully",
extra={
"user_id": str(user_id),
"workspace_id": str(workspace_id),
"total": total,
}
)
return convsersations
return conversations, total
def list_conversations(
self,

View File

@@ -13,18 +13,16 @@ from app.repositories.neo4j.cypher_queries import (
ENTITY_LEAVE_ALL_COMMUNITIES,
GET_ENTITY_NEIGHBORS,
GET_ALL_ENTITIES_FOR_USER,
GET_ENTITY_COUNT_FOR_USER,
GET_ALL_ENTITY_IDS_FOR_USER,
GET_ENTITIES_PAGE,
GET_COMMUNITY_MEMBERS,
GET_COMMUNITY_RELATIONSHIPS,
GET_ALL_COMMUNITY_MEMBERS_BATCH,
GET_ALL_ENTITY_NEIGHBORS_BATCH,
GET_ENTITY_NEIGHBORS_BATCH_FOR_IDS,
CHECK_USER_HAS_COMMUNITIES,
UPDATE_COMMUNITY_MEMBER_COUNT,
UPDATE_COMMUNITY_METADATA,
BATCH_UPDATE_COMMUNITY_METADATA,
GET_INCOMPLETE_COMMUNITIES,
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING,
CHECK_COMMUNITY_IS_COMPLETE,
CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING,
)
logger = logging.getLogger(__name__)
@@ -116,69 +114,10 @@ class CommunityRepository:
logger.error(f"get_all_entities failed: {e}")
return []
async def get_entity_count(self, end_user_id: str) -> int:
    """Return only the total number of a user's entities, without loading entity data.

    Args:
        end_user_id: Identifier of the user whose entities are counted.

    Returns:
        The ``entity_count`` value of the first result row, or 0 when the
        query returns no rows or raises.
    """
    try:
        rows = await self.connector.execute_query(
            GET_ENTITY_COUNT_FOR_USER,
            end_user_id=end_user_id,
        )
        if not rows:
            return 0
        return rows[0]["entity_count"]
    except Exception as e:
        # Best effort: a failed query is logged and reported as zero entities.
        logger.error(f"get_entity_count failed: {e}")
        return 0
async def get_all_entity_ids(self, end_user_id: str) -> List[str]:
    """Return only the IDs of all of a user's entities, skipping large fields such as embeddings.

    Args:
        end_user_id: Identifier of the user whose entity IDs are listed.

    Returns:
        The ``id`` value of every result row, or an empty list when the
        query raises.
    """
    try:
        rows = await self.connector.execute_query(
            GET_ALL_ENTITY_IDS_FOR_USER,
            end_user_id=end_user_id,
        )
        entity_ids = []
        for row in rows:
            entity_ids.append(row["id"])
        return entity_ids
    except Exception as e:
        # Best effort: a failed query is logged and reported as "no entities".
        logger.error(f"get_all_entity_ids failed: {e}")
        return []
async def get_entities_page(
    self, end_user_id: str, skip: int, limit: int
) -> List[Dict]:
    """Fetch one page of a user's entities; used to process full clustering in batches.

    Args:
        end_user_id: Identifier of the user whose entities are fetched.
        skip: Value passed to the query as ``skip``.
        limit: Value passed to the query as ``limit``.

    Returns:
        The rows returned by the query, or an empty list when it raises.
    """
    query_params = {
        "end_user_id": end_user_id,
        "skip": skip,
        "limit": limit,
    }
    try:
        page = await self.connector.execute_query(GET_ENTITIES_PAGE, **query_params)
        return page
    except Exception as e:
        # Best effort: a failed query is logged and reported as an empty page.
        logger.error(f"get_entities_page failed: {e}")
        return []
async def get_entity_neighbors_for_ids(
    self, entity_ids: List[str], end_user_id: str
) -> Dict[str, List[Dict]]:
    """Fetch the neighbors of the given entities in one batch.

    Args:
        entity_ids: IDs of the entities whose neighbors are requested.
        end_user_id: Identifier of the user the entities belong to.

    Returns:
        A mapping ``{entity_id: [neighbor, ...]}``. Each neighbor is a result
        row with its ``entity_id`` key removed. Entities that produce no rows
        are absent from the mapping. An empty mapping is returned when
        ``entity_ids`` is empty or when the query raises.
    """
    # Nothing to look up: skip the database round trip entirely.
    if not entity_ids:
        return {}
    try:
        rows = await self.connector.execute_query(
            GET_ENTITY_NEIGHBORS_BATCH_FOR_IDS,
            entity_ids=entity_ids,
            end_user_id=end_user_id,
        )
        result: Dict[str, List[Dict]] = {}
        for row in rows:
            eid = row["entity_id"]
            # Every column except the grouping key describes the neighbor.
            neighbor = {k: v for k, v in row.items() if k != "entity_id"}
            result.setdefault(eid, []).append(neighbor)
        return result
    except Exception as e:
        # Best effort: a failed query is logged and reported as "no neighbors".
        logger.error(f"get_entity_neighbors_for_ids failed: {e}")
        return {}
async def get_community_members(
self, community_id: str, end_user_id: str
) -> List[Dict]:
"""查询社区成员列表(含 example 字段)"""
"""查询社区成员列表。"""
try:
return await self.connector.execute_query(
GET_COMMUNITY_MEMBERS,
@@ -189,20 +128,6 @@ class CommunityRepository:
logger.error(f"get_community_members failed: {e}")
return []
async def get_community_relationships(
    self, community_id: str, end_user_id: str
) -> List[Dict]:
    """Fetch the relationship triples (subject, predicate, object) between entities in a community.

    Args:
        community_id: Identifier of the community to inspect.
        end_user_id: Identifier of the user the community belongs to.

    Returns:
        The rows returned by the query, or an empty list when it raises.
    """
    query_params = {
        "community_id": community_id,
        "end_user_id": end_user_id,
    }
    try:
        triples = await self.connector.execute_query(GET_COMMUNITY_RELATIONSHIPS, **query_params)
        return triples
    except Exception as e:
        # Best effort: a failed query is logged and reported as "no relationships".
        logger.error(f"get_community_relationships failed: {e}")
        return []
async def get_all_community_members_batch(
self, community_ids: List[str], end_user_id: str
) -> Dict[str, List[Dict]]:
@@ -249,6 +174,31 @@ class CommunityRepository:
logger.error(f"refresh_member_count failed: {e}")
return 0
async def get_incomplete_communities(self, end_user_id: str, check_embedding: bool = False) -> List[str]:
    """List the IDs of this user's Community nodes whose properties are incomplete.

    Args:
        end_user_id: Identifier of the user.
        check_embedding: When True, additionally check whether
            ``summary_embedding`` is missing (pass True only when the user
            has an embedding model configured).

    Returns:
        The ``community_id`` of every result row, or an empty list when the
        query raises.
    """
    try:
        if check_embedding:
            query = GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING
        else:
            query = GET_INCOMPLETE_COMMUNITIES
        rows = await self.connector.execute_query(query, end_user_id=end_user_id)
        community_ids = []
        for row in rows:
            community_ids.append(row["community_id"])
        return community_ids
    except Exception as e:
        # Best effort: a failed query is logged and reported as "nothing incomplete".
        logger.error(f"get_incomplete_communities failed: {e}")
        return []
async def is_community_complete(self, community_id: str, end_user_id: str, check_embedding: bool = False) -> bool:
    """Check whether the properties of a single community node are complete.

    Args:
        community_id: Identifier of the community to check.
        end_user_id: Identifier of the user the community belongs to.
        check_embedding: Selects the query variant that also takes the
            embedding into account.

    Returns:
        The ``is_complete`` value of the first result row, or False when the
        query returns no rows or raises.
    """
    try:
        if check_embedding:
            query = CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING
        else:
            query = CHECK_COMMUNITY_IS_COMPLETE
        rows = await self.connector.execute_query(query, community_id=community_id, end_user_id=end_user_id)
        if not rows:
            return False
        return rows[0]["is_complete"]
    except Exception as e:
        # Best effort: a failed query is logged and reported as "not complete".
        logger.error(f"is_community_complete failed: {e}")
        return False
async def update_community_metadata(
self,
community_id: str,
@@ -258,7 +208,7 @@ class CommunityRepository:
core_entities: List[str],
summary_embedding: Optional[List[float]] = None,
) -> bool:
"""更新社区的名称、摘要、核心实体列表和摘要向量"""
"""更新社区的名称、摘要、核心实体列表及 summary_embedding"""
try:
result = await self.connector.execute_query(
UPDATE_COMMUNITY_METADATA,
@@ -273,25 +223,3 @@ class CommunityRepository:
except Exception as e:
logger.error(f"update_community_metadata failed: {e}")
return False
async def batch_update_community_metadata(
    self,
    communities: List[Dict],
) -> bool:
    """Update the metadata of several communities in one batch.

    Args:
        communities: One item per community, each containing community_id,
            end_user_id, name, summary, core_entities and summary_embedding.

    Returns:
        True when the list is empty or the query ran without raising;
        False when the query raised.
    """
    # An empty batch is trivially successful and needs no query.
    if len(communities) == 0 if communities is not None else True:
        return True
    try:
        await self.connector.execute_query(
            BATCH_UPDATE_COMMUNITY_METADATA,
            communities=communities,
        )
    except Exception as e:
        # Best effort: a failed update is logged and reported through the return value.
        logger.error(f"batch_update_community_metadata failed: {e}")
        return False
    else:
        return True

View File

@@ -42,13 +42,6 @@ async def create_fulltext_indexes():
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""")
print("✓ Created: summariesFulltext")
# 创建 Community 索引
await connector.execute_query("""
CREATE FULLTEXT INDEX communitiesFulltext IF NOT EXISTS FOR (c:Community) ON EACH [c.name, c.summary]
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""")
print("✓ Created: communitiesFulltext")
print("\nFull-text indexes created successfully with BM25 support.")
except Exception as e:
@@ -119,18 +112,6 @@ async def create_vector_indexes():
}}
""")
print("✓ Created: summary_embedding_index")
# Community summary embedding index
await connector.execute_query("""
CREATE VECTOR INDEX community_summary_embedding_index IF NOT EXISTS
FOR (c:Community)
ON c.summary_embedding
OPTIONS {indexConfig: {
`vector.dimensions`: 1024,
`vector.similarity_function`: 'cosine'
}}
""")
print("✓ Created: community_summary_embedding_index")
# Dialogue embedding index (optional)
await connector.execute_query("""
@@ -143,18 +124,6 @@ async def create_vector_indexes():
}}
""")
print("✓ Created: dialogue_embedding_index")
# Community summary embedding index
await connector.execute_query("""
CREATE VECTOR INDEX community_summary_embedding_index IF NOT EXISTS
FOR (c:Community)
ON c.summary_embedding
OPTIONS {indexConfig: {
`vector.dimensions`: 1024,
`vector.similarity_function`: 'cosine'
}}
""")
print("✓ Created: community_summary_embedding_index")
print("\nVector indexes created successfully!")
print("\nExpected performance improvement:")

View File

@@ -1122,43 +1122,21 @@ RETURN e.id AS id,
CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id
"""
# Count the ExtractedEntity nodes that belong to $end_user_id.
# Returns a single row with the column `entity_count`.
GET_ENTITY_COUNT_FOR_USER = """
MATCH (e:ExtractedEntity {end_user_id: $end_user_id})
RETURN count(e) AS entity_count
"""
# List the `id` of every ExtractedEntity node that belongs to $end_user_id
# (one row per entity, no ordering guaranteed).
GET_ALL_ENTITY_IDS_FOR_USER = """
MATCH (e:ExtractedEntity {end_user_id: $end_user_id})
RETURN e.id AS id
"""
GET_COMMUNITY_MEMBERS = """
MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community {community_id: $community_id})
RETURN e.id AS id, e.name AS name, e.entity_type AS entity_type,
e.importance_score AS importance_score, e.activation_value AS activation_value,
e.name_embedding AS name_embedding,
e.aliases AS aliases, e.description AS description,
e.example AS example
e.name_embedding AS name_embedding
ORDER BY coalesce(e.activation_value, 0) DESC
"""
# Fetch (subject, predicate, object) triples for EXTRACTED_RELATIONSHIP edges
# whose two endpoints both belong to community $community_id and to
# $end_user_id. Rows are ordered by subject name, predicate and object name,
# and capped at 20.
GET_COMMUNITY_RELATIONSHIPS = """
MATCH (e1:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community {community_id: $community_id})
MATCH (e2:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c)
MATCH (e1)-[r:EXTRACTED_RELATIONSHIP]->(e2)
RETURN e1.name AS subject, r.predicate AS predicate, e2.name AS object
ORDER BY e1.name, r.predicate, e2.name
LIMIT 20
"""
GET_ALL_COMMUNITY_MEMBERS_BATCH = """
MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community)
WHERE c.community_id IN $community_ids
RETURN c.community_id AS community_id,
e.id AS id, e.name AS name, e.entity_type AS entity_type,
e.importance_score AS importance_score, e.activation_value AS activation_value,
e.id AS id,
e.name_embedding AS name_embedding,
e.aliases AS aliases, e.description AS description
ORDER BY c.community_id, coalesce(e.activation_value, 0) DESC
e.activation_value AS activation_value
"""
CHECK_USER_HAS_COMMUNITIES = """
@@ -1183,50 +1161,6 @@ SET c.name = $name,
RETURN c.community_id AS community_id
"""
# Bulk community metadata update. $communities is a list of maps, each with
# community_id, end_user_id, name, summary, core_entities and
# summary_embedding. Every map is matched to a Community node on
# (community_id, end_user_id); matched nodes get the new values and
# updated_at = datetime(). Maps without a matching node produce no row.
# Returns the community_id of each updated node.
BATCH_UPDATE_COMMUNITY_METADATA = """
UNWIND $communities AS row
MATCH (c:Community {community_id: row.community_id, end_user_id: row.end_user_id})
SET c.name = row.name,
c.summary = row.summary,
c.core_entities = row.core_entities,
c.summary_embedding = row.summary_embedding,
c.updated_at = datetime()
RETURN c.community_id AS community_id
"""
# Page through the ExtractedEntity nodes of $end_user_id, ordered by entity id
# and sliced with $skip / $limit. Each row carries id, name, name_embedding,
# activation_value and the community_id of the community the entity belongs
# to (null when it has no BELONGS_TO_COMMUNITY edge).
GET_ENTITIES_PAGE = """
MATCH (e:ExtractedEntity {end_user_id: $end_user_id})
OPTIONAL MATCH (e)-[:BELONGS_TO_COMMUNITY]->(c:Community)
RETURN e.id AS id,
e.name AS name,
e.name_embedding AS name_embedding,
e.activation_value AS activation_value,
CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id
ORDER BY e.id
SKIP $skip LIMIT $limit
"""
# For every entity in $entity_ids (scoped to $end_user_id), return its
# neighbours. A neighbour is either an entity connected through an
# EXTRACTED_RELATIONSHIP edge in any direction, or a different entity
# referenced by the same Statement. Output is one distinct row per
# (entity_id, neighbour) with the neighbour's id, name, name_embedding,
# activation_value and community_id (null when the neighbour has no community).
# The in-query comment states this is used for batched full clustering.
GET_ENTITY_NEIGHBORS_BATCH_FOR_IDS = """
// 批量拉取指定实体列表的邻居(用于分批全量聚类)
MATCH (e:ExtractedEntity {end_user_id: $end_user_id})
WHERE e.id IN $entity_ids
OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb1:ExtractedEntity {end_user_id: $end_user_id})
OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(nb2:ExtractedEntity {end_user_id: $end_user_id})
WHERE nb2.id <> e.id
WITH e, collect(DISTINCT nb1) + collect(DISTINCT nb2) AS all_neighbors
UNWIND all_neighbors AS nb
WITH e, nb WHERE nb IS NOT NULL
OPTIONAL MATCH (nb)-[:BELONGS_TO_COMMUNITY]->(c:Community)
RETURN DISTINCT
e.id AS entity_id,
nb.id AS id,
nb.name AS name,
nb.name_embedding AS name_embedding,
nb.activation_value AS activation_value,
CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id
"""
GET_ALL_ENTITY_NEIGHBORS_BATCH = """
// 批量拉取某用户下所有实体的邻居(用于全量聚类预加载)
MATCH (e:ExtractedEntity {end_user_id: $end_user_id})
@@ -1270,59 +1204,37 @@ RETURN
startNode(r) = e AS r_from_e
"""
# Community keyword search: matches name or summary via fulltext index
SEARCH_COMMUNITIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("communitiesFulltext", $q) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
RETURN c.community_id AS id,
c.name AS name,
c.summary AS content,
c.core_entities AS core_entities,
c.member_count AS member_count,
c.end_user_id AS end_user_id,
c.updated_at AS updated_at,
score
ORDER BY score DESC
LIMIT $limit
# Completeness check for one Community node identified by
# ($community_id, $end_user_id). `is_complete` is true only when name and
# summary are both non-null and non-empty and core_entities is non-null.
# No row is returned when the node does not exist.
CHECK_COMMUNITY_IS_COMPLETE = """
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
RETURN (
c.name IS NOT NULL AND c.name <> '' AND
c.summary IS NOT NULL AND c.summary <> '' AND
c.core_entities IS NOT NULL
) AS is_complete
"""
# Community 向量检索 ──────────────────────────────────────────────────
# Community embedding-based search: cosine similarity on Community.summary_embedding
COMMUNITY_EMBEDDING_SEARCH = """
CALL db.index.vector.queryNodes('community_summary_embedding_index', $limit * 100, $embedding)
YIELD node AS c, score
WHERE c.summary_embedding IS NOT NULL
AND ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
RETURN c.community_id AS id,
c.name AS name,
c.summary AS content,
c.core_entities AS core_entities,
c.member_count AS member_count,
c.end_user_id AS end_user_id,
c.updated_at AS updated_at,
score
ORDER BY score DESC
LIMIT $limit
# Same completeness check as CHECK_COMMUNITY_IS_COMPLETE, with the extra
# requirement that summary_embedding is non-null.
CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING = """
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
RETURN (
c.name IS NOT NULL AND c.name <> '' AND
c.summary IS NOT NULL AND c.summary <> '' AND
c.core_entities IS NOT NULL AND
c.summary_embedding IS NOT NULL
) AS is_complete
"""
# Community 展开检索 ──────────────────────────────────────────────────
# 命中社区后,拉取该社区所有成员实体关联的 Statement 节点(主题→细节两级检索)
EXPAND_COMMUNITY_STATEMENTS = """
MATCH (c:Community {community_id: $community_id})
MATCH (e:ExtractedEntity)-[:BELONGS_TO_COMMUNITY]->(c)
MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e)
WHERE s.end_user_id = $end_user_id
RETURN s.statement AS statement,
s.id AS id,
s.end_user_id AS end_user_id,
s.created_at AS created_at,
s.valid_at AS valid_at,
s.invalid_at AS invalid_at,
COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value,
COALESCE(s.importance_score, 0.5) AS importance_score,
e.name AS source_entity,
c.name AS community_name
ORDER BY COALESCE(s.activation_value, 0) DESC
LIMIT $limit
# List the community_id of every Community node of $end_user_id that is
# missing metadata: name or summary is null or empty, or core_entities is null.
GET_INCOMPLETE_COMMUNITIES = """
MATCH (c:Community {end_user_id: $end_user_id})
WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL
OR c.name = '' OR c.summary = ''
RETURN c.community_id AS community_id
"""
# Like GET_INCOMPLETE_COMMUNITIES, but a community also counts as incomplete
# when summary_embedding is null while summary is non-null and differs from
# the literal '(empty)'.
# NOTE(review): '(empty)' looks like a placeholder summary written elsewhere;
# confirm against the code that generates summaries.
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING = """
MATCH (c:Community {end_user_id: $end_user_id})
WHERE c.name IS NULL OR c.name = ''
OR c.summary IS NULL OR c.summary = ''
OR c.core_entities IS NULL
OR (c.summary_embedding IS NULL AND c.summary IS NOT NULL AND c.summary <> '(empty)')
RETURN c.community_id AS community_id
"""

View File

@@ -158,12 +158,11 @@ async def save_dialog_and_statements_to_neo4j(
statement_chunk_edges: List[StatementChunkEdge],
statement_entity_edges: List[StatementEntityEdge],
connector: Neo4jConnector,
config_id: Optional[str] = None,
llm_model_id: Optional[str] = None,
) -> bool:
"""Save dialogue nodes, chunk nodes, statement nodes, entities, and all relationships to Neo4j using graph models.
只负责数据写入,不触发聚类。聚类由调用方在写入成功后通过
schedule_clustering_after_write() 显式触发。
Args:
dialogue_nodes: List of DialogueNode objects to save
chunk_nodes: List of ChunkNode objects to save
@@ -294,6 +293,9 @@ async def save_dialog_and_statements_to_neo4j(
logger.info("Transaction completed. Summary: %s", summary)
logger.debug("Full transaction results: %r", results)
# 写入成功后,异步触发聚类(不阻塞写入响应)
schedule_clustering_after_write(entity_nodes, config_id=config_id, llm_model_id=llm_model_id)
return True
except Exception as e:
@@ -305,8 +307,8 @@ async def save_dialog_and_statements_to_neo4j(
def schedule_clustering_after_write(
entity_nodes: List,
config_id: Optional[str] = None,
llm_model_id: Optional[str] = None,
embedding_model_id: Optional[str] = None,
) -> None:
"""
写入 Neo4j 成功后,调度后台聚类任务。
@@ -325,14 +327,14 @@ def schedule_clustering_after_write(
end_user_id = entity_nodes[0].end_user_id
new_entity_ids = [e.id for e in entity_nodes]
logger.info(f"[Clustering] 准备触发聚类,实体数: {len(new_entity_ids)}, end_user_id: {end_user_id}")
asyncio.create_task(_trigger_clustering(new_entity_ids, end_user_id, llm_model_id=llm_model_id, embedding_model_id=embedding_model_id))
asyncio.create_task(_trigger_clustering(new_entity_ids, end_user_id, config_id=config_id, llm_model_id=llm_model_id))
async def _trigger_clustering(
new_entity_ids: List[str],
end_user_id: str,
config_id: Optional[str] = None,
llm_model_id: Optional[str] = None,
embedding_model_id: Optional[str] = None,
) -> None:
"""
聚类触发函数,自动判断全量初始化还是增量更新。
@@ -342,7 +344,7 @@ async def _trigger_clustering(
from app.core.memory.storage_services.clustering_engine import LabelPropagationEngine
logger.info(f"[Clustering] 开始聚类end_user_id={end_user_id}, 实体数={len(new_entity_ids)}")
connector = Neo4jConnector()
engine = LabelPropagationEngine(connector, llm_model_id=llm_model_id, embedding_model_id=embedding_model_id)
engine = LabelPropagationEngine(connector, config_id=config_id, llm_model_id=llm_model_id)
await engine.run(end_user_id=end_user_id, new_entity_ids=new_entity_ids)
logger.info(f"[Clustering] 聚类完成end_user_id={end_user_id}")
except Exception as e:

View File

@@ -4,13 +4,10 @@ from typing import Any, Dict, List, Optional
from app.repositories.neo4j.cypher_queries import (
CHUNK_EMBEDDING_SEARCH,
COMMUNITY_EMBEDDING_SEARCH,
ENTITY_EMBEDDING_SEARCH,
EXPAND_COMMUNITY_STATEMENTS,
MEMORY_SUMMARY_EMBEDDING_SEARCH,
SEARCH_CHUNK_BY_CHUNK_ID,
SEARCH_CHUNKS_BY_CONTENT,
SEARCH_COMMUNITIES_BY_KEYWORD,
SEARCH_DIALOGUE_BY_DIALOG_ID,
SEARCH_ENTITIES_BY_NAME,
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD,
@@ -288,15 +285,6 @@ async def search_graph(
limit=limit,
))
task_keys.append("summaries")
if "communities" in include:
tasks.append(connector.execute_query(
SEARCH_COMMUNITIES_BY_KEYWORD,
q=q,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("communities")
# Execute all queries in parallel
task_results = await asyncio.gather(*tasks, return_exceptions=True)
@@ -305,7 +293,6 @@ async def search_graph(
results = {}
for key, result in zip(task_keys, task_results):
if isinstance(result, Exception):
logger.warning(f"search_graph: {key} 关键词查询异常: {result}")
results[key] = []
else:
results[key] = result
@@ -362,11 +349,7 @@ async def search_graph_by_embedding(
print(f"[PERF] Embedding generation took: {embed_time:.4f}s")
if not embeddings or not embeddings[0]:
logger.warning(
f"search_graph_by_embedding: embedding 生成失败或为空,"
f"query='{query_text[:50]}', end_user_id={end_user_id},向量检索跳过"
)
return {"statements": [], "chunks": [], "entities": [], "summaries": [], "communities": []}
return {"statements": [], "chunks": [], "entities": [], "summaries": []}
embedding = embeddings[0]
# Prepare tasks for parallel execution
@@ -413,16 +396,6 @@ async def search_graph_by_embedding(
))
task_keys.append("summaries")
# Communities (向量语义匹配)
if "communities" in include:
tasks.append(connector.execute_query(
COMMUNITY_EMBEDDING_SEARCH,
embedding=embedding,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("communities")
# Execute all queries in parallel
query_start = time.time()
task_results = await asyncio.gather(*tasks, return_exceptions=True)
@@ -435,12 +408,10 @@ async def search_graph_by_embedding(
"chunks": [],
"entities": [],
"summaries": [],
"communities": [],
}
for key, result in zip(task_keys, task_results):
if isinstance(result, Exception):
logger.warning(f"search_graph_by_embedding: {key} 向量查询异常: {result}")
results[key] = []
else:
results[key] = result
@@ -690,62 +661,6 @@ async def search_graph_by_chunk_id(
return {"chunks": chunks}
async def search_graph_community_expand(
    connector: Neo4jConnector,
    community_ids: List[str],
    end_user_id: str,
    limit: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
    """Phase 3: community expansion retrieval (topic -> detail, two levels).

    For communities that were already hit, runs EXPAND_COMMUNITY_STATEMENTS
    once per community to pull the Statement nodes linked to the community's
    member entities, then merges the rows, sorts them by ``activation_value``
    in descending order and de-duplicates them.

    Args:
        connector: Neo4j connector used to run the queries.
        community_ids: IDs of the communities that were hit.
        end_user_id: End user ID, used for data isolation.
        limit: Maximum number of statements returned per community.

    Returns:
        ``{"expanded_statements": [...]}``. The list is empty when
        ``community_ids`` or ``end_user_id`` is empty.
    """
    if not (community_ids and end_user_id):
        return {"expanded_statements": []}

    # One query per community, executed concurrently. A failing query is
    # returned as an exception object instead of aborting the whole batch.
    pending = []
    for community_id in community_ids:
        pending.append(
            connector.execute_query(
                EXPAND_COMMUNITY_STATEMENTS,
                community_id=community_id,
                end_user_id=end_user_id,
                limit=limit,
            )
        )
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    expanded: List[Dict[str, Any]] = []
    for cid, result in zip(community_ids, outcomes):
        if not isinstance(result, Exception):
            expanded.extend(result)
            continue
        logger.warning(f"社区展开检索失败 community_id={cid}: {result}")

    # Sort globally by activation_value first, then de-duplicate.
    from app.core.memory.src.search import _deduplicate_results

    def _activation(row: Dict[str, Any]) -> float:
        # Missing or null activation values sort as 0.
        return float(row.get("activation_value") or 0)

    expanded.sort(key=_activation, reverse=True)
    expanded = _deduplicate_results(expanded)

    logger.info(f"社区展开检索完成: community_ids={community_ids}, 展开 statements={len(expanded)}")
    return {"expanded_statements": expanded}
async def search_graph_by_created_at(
connector: Neo4jConnector,
end_user_id: Optional[str] = None,

View File

@@ -43,6 +43,7 @@ class WorkflowConfigRepository:
edges: list[dict[str, Any]],
variables: list[dict[str, Any]] | None = None,
execution_config: dict[str, Any] | None = None,
features: dict[str, Any] | None = None,
triggers: list[dict[str, Any]] | None = None
) -> WorkflowConfig:
"""创建或更新工作流配置
@@ -53,6 +54,7 @@ class WorkflowConfigRepository:
edges: 边列表
variables: 变量列表
execution_config: 执行配置
features: 功能特性
triggers: 触发器列表
Returns:
@@ -82,6 +84,7 @@ class WorkflowConfigRepository:
edges=edges,
variables=variables or [],
execution_config=execution_config or {},
features=features or {},
triggers=triggers or []
)
self.db.add(config)

View File

@@ -149,18 +149,26 @@ class FileUploadConfig(BaseModel):
)
# 通用文件PDF/DOCX/XLSX/TXT/CSV/JSON最大 100MB
document_enabled: bool = Field(default=False)
document_max_size_mb: int = Field(default=100)
document_max_size_mb: int = Field(default=50)
document_allowed_extensions: List[str] = Field(
default=["pdf", "docx", "xlsx", "txt", "csv", "json", "md"]
default=["pdf", "docx", "doc", "xlsx", "xls", "txt", "csv", "json", "md"]
)
# 视频文件MP4/MOV/AVI/WebM最大 500MB
video_enabled: bool = Field(default=False)
video_max_size_mb: int = Field(default=500)
video_max_size_mb: int = Field(default=50)
video_allowed_extensions: List[str] = Field(
default=["mp4", "mov"]
default=["mp4"]
)
# 最大文件数量
max_file_count: int = Field(default=5, ge=1, le=20)
max_file_count: int = Field(default=5, ge=1)
@field_validator("max_file_count")
@classmethod
def validate_max_file_count(cls, v: int) -> int:
    """Enforce the deployment-wide upper bound on ``max_file_count``.

    The bound is read from ``settings.MAX_FILE_COUNT`` at validation time.

    Raises:
        ValueError: If ``v`` is greater than ``settings.MAX_FILE_COUNT``.
    """
    # Imported here rather than at module level, as in the original.
    from app.core.config import settings

    if v <= settings.MAX_FILE_COUNT:
        return v
    raise ValueError(f"max_file_count 不能超过 {settings.MAX_FILE_COUNT}")
class OpeningStatementConfig(BaseModel):

View File

@@ -21,7 +21,7 @@ class MemoryWriteRequest(BaseModel):
"""
end_user_id: str = Field(..., description="End user ID (required)")
message: str = Field(..., description="Message content to store")
config_id: str = Field(..., description="Memory configuration ID (required)")
config_id: Optional[str] = Field(None, description="Memory configuration ID")
storage_type: str = Field("neo4j", description="Storage type: neo4j or rag")
user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID")
@@ -68,7 +68,7 @@ class MemoryReadRequest(BaseModel):
"0",
description="Search mode: 0=verify, 1=direct, 2=context"
)
config_id: str = Field(..., description="Memory configuration ID (required)")
config_id: Optional[str] = Field(None, description="Memory configuration ID")
storage_type: str = Field("neo4j", description="Storage type: neo4j or rag")
user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID")
@@ -132,79 +132,3 @@ class MemoryReadResponse(BaseModel):
description="Intermediate retrieval outputs"
)
end_user_id: str = Field(..., description="End user ID")
class CreateEndUserRequest(BaseModel):
    """Request payload for creating an end user.

    Attributes:
        workspace_id: Workspace ID (required, must not be blank).
        other_id: External user identifier (required, must not be blank).
        other_name: Display name for the end user; defaults to an empty string.
    """

    workspace_id: str = Field(..., description="Workspace ID (required)")
    other_id: str = Field(..., description="External user identifier (required)")
    other_name: Optional[str] = Field("", description="Display name")

    @field_validator("workspace_id")
    @classmethod
    def validate_workspace_id(cls, v: str) -> str:
        """Reject an empty or whitespace-only workspace_id; return it stripped."""
        cleaned = v.strip() if v else ""
        if cleaned:
            return cleaned
        raise ValueError("workspace_id is required and cannot be empty")

    @field_validator("other_id")
    @classmethod
    def validate_other_id(cls, v: str) -> str:
        """Reject an empty or whitespace-only other_id; return it stripped."""
        cleaned = v.strip() if v else ""
        if cleaned:
            return cleaned
        raise ValueError("other_id is required and cannot be empty")
class CreateEndUserResponse(BaseModel):
    """Response schema returned after an end user has been created.

    Attributes:
        id: UUID of the created end user, as a string.
        other_id: External user identifier.
        other_name: Display name; defaults to an empty string.
        workspace_id: ID of the workspace the user belongs to.
    """
    id: str = Field(..., description="End user UUID")
    other_id: str = Field(..., description="External user identifier")
    other_name: str = Field("", description="Display name")
    workspace_id: str = Field(..., description="Workspace ID")
class MemoryConfigItem(BaseModel):
    """One memory configuration entry in the config list response.

    Attributes:
        config_id: Configuration UUID, as a string.
        config_name: Configuration name.
        config_desc: Configuration description (optional).
        is_default: Whether this is the workspace default config.
        scene_name: Name of the associated ontology scene (optional).
        created_at: Creation timestamp, as a string (optional).
        updated_at: Last update timestamp, as a string (optional).
    """
    config_id: str = Field(..., description="Configuration ID")
    config_name: str = Field(..., description="Configuration name")
    config_desc: Optional[str] = Field(None, description="Configuration description")
    is_default: bool = Field(False, description="Whether this is the workspace default")
    scene_name: Optional[str] = Field(None, description="Associated ontology scene name")
    # Timestamps are declared as strings; the exact format is not fixed here.
    created_at: Optional[str] = Field(None, description="Creation timestamp")
    updated_at: Optional[str] = Field(None, description="Last update timestamp")
class ListConfigsResponse(BaseModel):
    """Response schema for listing memory configurations.

    Attributes:
        configs: Memory config items; defaults to an empty list.
        total: Total number of configs; defaults to 0.
    """
    configs: List[MemoryConfigItem] = Field(default_factory=list, description="List of configs")
    total: int = Field(0, description="Total number of configs")

View File

@@ -417,7 +417,7 @@ class MemoryConfig:
# Ontology scene association
scene_id: Optional[UUID] = None
ontology_class_infos: list[dict] = field(default_factory=list)
ontology_classes: Optional[list] = field(default=None)
def __post_init__(self):
"""Validate configuration after initialization."""

View File

@@ -118,28 +118,54 @@ class AppChatService:
)
model_info = ModelInfo(
model_name=api_key_obj.model_name,
provider=api_key_obj.provider,
api_key=api_key_obj.api_key,
api_base=api_key_obj.api_base,
capability=api_key_obj.capability,
is_omni=api_key_obj.is_omni,
model_type=ModelType.LLM
)
# 加载历史消息
messages = self.conversation_service.get_messages(
conversation_id=conversation_id,
limit=10
)
history = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
history = []
for msg in messages:
content = [{"type": "text", "text": msg.content}]
# 处理 meta_data 中的 files
if msg.meta_data and msg.meta_data.get("files"):
files = msg.meta_data.get("files", [])
# 使用 MultimodalService 处理文件
multimodal_service = MultimodalService(self.db, api_config=model_info)
# 将 files 转换为 FileInput 格式
file_inputs = []
for file in files:
from app.schemas.app_schema import FileInput, TransferMethod
file_input = FileInput(
type=file.get("type"),
transfer_method=TransferMethod.REMOTE_URL,
url=file.get("url")
)
file_inputs.append(file_input)
history_processed_files = await multimodal_service.history_process_files(files=file_inputs)
content.extend(history_processed_files)
history.append({
"role": msg.role,
"content": content
})
# 处理多模态文件
processed_files = None
if files:
model_info = ModelInfo(
model_name=api_key_obj.model_name,
provider=api_key_obj.provider,
api_key=api_key_obj.api_key,
api_base=api_key_obj.api_base,
capability=api_key_obj.capability,
is_omni=api_key_obj.is_omni,
model_type=ModelType.LLM
)
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(user_id, files)
logger.info(f"处理了 {len(processed_files)} 个文件")
@@ -313,31 +339,54 @@ class AppChatService:
streaming=True
)
model_info = ModelInfo(
model_name=api_key_obj.model_name,
provider=api_key_obj.provider,
api_key=api_key_obj.api_key,
api_base=api_key_obj.api_base,
capability=api_key_obj.capability,
is_omni=api_key_obj.is_omni,
model_type=ModelType.LLM
)
# 加载历史消息
messages = self.conversation_service.get_messages(
conversation_id=conversation_id,
limit=10
)
history = []
memory_config = {"enabled": True, 'max_history': 10}
if memory_config.get("enabled"):
messages = self.conversation_service.get_messages(
conversation_id=conversation_id,
limit=memory_config.get("max_history", 10)
)
history = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
for msg in messages:
content = [{"type": "text", "text": msg.content}]
# 处理 meta_data 中的 files
if msg.meta_data and msg.meta_data.get("files"):
history_files = msg.meta_data.get("files", [])
# 使用 MultimodalService 处理文件
multimodal_service = MultimodalService(self.db, api_config=model_info)
# 将 files 转换为 FileInput 格式
file_inputs = []
for file in history_files:
from app.schemas.app_schema import FileInput, TransferMethod
file_input = FileInput(
type=file.get("type"),
transfer_method=TransferMethod.REMOTE_URL,
url=file.get("url")
)
file_inputs.append(file_input)
history_processed_files = await multimodal_service.history_process_files(files=file_inputs)
content.extend(history_processed_files)
history.append({
"role": msg.role,
"content": content
})
# 处理多模态文件
processed_files = None
if files:
model_info = ModelInfo(
model_name=api_key_obj.model_name,
provider=api_key_obj.provider,
api_key=api_key_obj.api_key,
api_base=api_key_obj.api_base,
capability=api_key_obj.capability,
is_omni=api_key_obj.is_omni,
model_type=ModelType.LLM
)
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(user_id, files)
logger.info(f"处理了 {len(processed_files)} 个文件")
@@ -347,8 +396,14 @@ class AppChatService:
total_tokens = 0
text_queue: asyncio.Queue = asyncio.Queue()
api_key_config = {
"model_name": api_key_obj.model_name,
"api_key": api_key_obj.api_key,
"api_base": api_key_obj.api_base,
"provider": api_key_obj.provider,
}
stream_audio_url, tts_task = await self.agent_service._generate_tts_streaming(
features_config, api_key_obj,
features_config, api_key_config,
text_queue=text_queue,
tenant_id=tenant_id, workspace_id=workspace_id
)

View File

@@ -16,6 +16,7 @@ from app.models.app_release_model import AppRelease
from app.models.knowledge_model import Knowledge
from app.models.models_model import ModelConfig
from app.models.tool_model import ToolConfig as ToolConfigModel
from app.models.skill_model import Skill
from app.models.workflow_model import WorkflowConfig
from app.services.workflow_service import WorkflowService
from app.core.workflow.adapters.memory_bear.memory_bear_adapter import MemoryBearAdapter
@@ -84,7 +85,9 @@ class AppDslService:
if "knowledge_retrieval" in cfg:
enriched["knowledge_retrieval"] = self._enrich_knowledge_retrieval(cfg["knowledge_retrieval"])
if "tools" in cfg:
enriched["tools"] = self._enrich_tools(cfg["tools"])
enriched["tools"] = self._enrich_tools(cfg.get("tools"))
if "skills" in cfg:
enriched["skills"] = self._enrich_skills(cfg.get("skills"))
return enriched
if app_type == AppType.MULTI_AGENT:
enriched = {**cfg}
@@ -108,6 +111,7 @@ class AppDslService:
"variables": config.variables if config else [],
"edges": config.edges if config else [],
"nodes": config.nodes if config else [],
"features": config.features if config else {},
"execution_config": config.execution_config if config else {},
"triggers": config.triggers if config else [],
} if config else {}
@@ -123,7 +127,8 @@ class AppDslService:
"memory": config.memory if config else None,
"variables": config.variables if config else [],
"tools": self._enrich_tools(config.tools) if config else [],
"skills": config.skills if config else {},
"skills": self._enrich_skills(config.skills) if config else {},
"features": config.features if config else {}
} if config else {}
dsl = {**meta, "app": app_meta, "agent_config": config_data}
@@ -185,6 +190,22 @@ class AppDslService:
def _enrich_tools(self, tools: list) -> list:
    """Return a copy of each tool entry with a ``_ref`` key added.

    ``_ref`` is whatever ``self._tool_ref`` returns for the entry's
    ``tool_id``. A missing or empty ``tools`` value yields an empty list.
    """
    enriched = []
    for tool in (tools or []):
        enriched.append({**tool, "_ref": self._tool_ref(tool.get("tool_id"))})
    return enriched
def _skill_ref(self, skill_id) -> Optional[dict]:
    """Build a portable reference for a skill.

    Returns:
        None when ``skill_id`` is empty. Otherwise ``{"id": ..., "name": ...}``
        when the skill exists in the database, or just ``{"id": ...}`` when
        it does not.
    """
    if not skill_id:
        return None
    reference = {"id": str(skill_id)}
    skill = self.db.query(Skill).filter(Skill.id == skill_id).first()
    if skill:
        # The name lets an import fall back to a lookup by name when the id
        # does not exist in the target environment.
        reference["name"] = skill.name
    return reference
def _enrich_skills(self, skills: Optional[dict]) -> Optional[dict]:
    """Replace each entry of ``skills["skill_ids"]`` with ``{"id", "_ref"}``.

    A falsy ``skills`` value (None or an empty dict) is returned unchanged.
    All other keys of ``skills`` are copied as they are.
    """
    if not skills:
        return skills

    enriched_ids = []
    for sid in (skills.get("skill_ids", []) or []):
        enriched_ids.append({"id": sid, "_ref": self._skill_ref(sid)})

    enriched = dict(skills)
    enriched["skill_ids"] = enriched_ids
    return enriched
def _agent_ref(self, agent_id) -> Optional[dict]:
if not agent_id:
return None
@@ -249,7 +270,8 @@ class AppDslService:
memory=self._resolve_memory(cfg.get("memory"), workspace_id, warnings),
variables=cfg.get("variables", []),
tools=self._resolve_tools(cfg.get("tools", []), tenant_id, warnings),
skills=cfg.get("skills", {}),
skills=self._resolve_skills(cfg.get("skills", {}), tenant_id, warnings),
features=cfg.get("features", {}),
is_active=True,
created_at=now,
updated_at=now,
@@ -290,6 +312,7 @@ class AppDslService:
edges=[e.model_dump() for e in result.edges],
variables=[v.model_dump() for v in result.variables],
execution_config=wf.get("execution_config", {}),
features=wf.get("features", {}),
triggers=wf.get("triggers", []),
validate=False,
)
@@ -444,6 +467,46 @@ class AppDslService:
return {**memory, "memory_config_id": None, "enabled": False}
return memory
def _resolve_skills(self, skills: Optional[dict], tenant_id: uuid.UUID, warnings: list) -> dict:
    """Map imported skill references onto skills that exist for ``tenant_id``.

    Args:
        skills: Imported skills config; its ``skill_ids`` entries are either
            ``{"id": ..., "_ref": {...}}`` dicts or plain id values.
        tenant_id: Tenant whose skills are searched.
        warnings: List that collects a message for every unresolved skill.

    Returns:
        A new dict with the other keys of ``skills`` copied and ``skill_ids``
        replaced by the resolved ids as strings. Unresolved entries are
        dropped. An empty dict when ``skills`` is falsy.
    """
    if not skills:
        return {}

    resolved_ids = []
    for entry in (skills.get("skill_ids") or []):
        # An entry is either {"id": "...", "_ref": {...}} or a bare id.
        if isinstance(entry, dict):
            ref = entry.get("_ref")
            if not ref:
                raw_id = entry.get("id")
                ref = {"name": None, "id": raw_id} if raw_id else None
        else:
            ref = {"id": str(entry)}

        skill_id = self._resolve_skill(ref, tenant_id, warnings)
        if skill_id:
            resolved_ids.append(str(skill_id))

    # Other keys first, "skill_ids" last, as in the original merge.
    resolved = {key: value for key, value in skills.items() if key != "skill_ids"}
    resolved["skill_ids"] = resolved_ids
    return resolved
def _resolve_skill(self, ref: Optional[dict], tenant_id: uuid.UUID, warnings: list) -> Optional[str]:
    """Resolve one skill reference to the id of a skill owned by ``tenant_id``.

    The lookup tries ``ref["id"]`` first and falls back to ``ref["name"]``.

    Args:
        ref: Reference dict with optional ``id`` and ``name`` keys.
        tenant_id: Tenant whose skills are searched.
        warnings: Receives a message when nothing matches.

    Returns:
        The matching skill id as a string. None when ``ref`` is empty (no
        warning is recorded) or when no skill matches (a warning is recorded).
    """
    if not ref:
        return None

    # 1) Match by id.
    ref_id = ref.get("id")
    if ref_id:
        try:
            by_id = (
                self.db.query(Skill)
                .filter(
                    Skill.id == uuid.UUID(str(ref_id)),
                    Skill.tenant_id == tenant_id,
                )
                .first()
            )
            if by_id:
                return str(by_id.id)
        except Exception:
            # Best effort: a malformed id, or any other error, falls through
            # to the name lookup.
            # NOTE(review): this also hides database errors - confirm intended.
            pass

    # 2) Fall back to a match by name.
    ref_name = ref.get("name")
    if ref_name:
        by_name = (
            self.db.query(Skill)
            .filter(
                Skill.name == ref_name,
                Skill.tenant_id == tenant_id,
            )
            .first()
        )
        if by_name:
            return str(by_name.id)

    warnings.append(f"未找到技能: {ref}")
    return None
def _resolve_tools(self, tools: list, tenant_id: uuid.UUID, warnings: list) -> list:
result = []
for t in (tools or []):

View File

@@ -833,8 +833,6 @@ class AppService:
# 跨工作空间时,获取目标工作空间的 tenant_id 用于判断模型配置是否可用
target_tenant_id = None
available_model_ids: set = set()
available_kb_ids: set = set()
if is_cross_workspace:
target_ws = self.db.get(Workspace, target_workspace_id)
if not target_ws:
@@ -849,28 +847,29 @@ class AppService:
if source_config:
if is_cross_workspace:
# Batch-collect and preload all referenced resources
model_ids, kb_ids = self._collect_resource_ids_from_config(
source_config.default_model_config_id,
source_config.knowledge_retrieval,
source_config.tools
# 跨工作空间model/tools/skills 属于 tenant 级别直接保留,
# knowledge_bases 属于 workspace 级别需过滤memory_config 需清空
_, kb_ids = self._collect_resource_ids_from_config(
None, source_config.knowledge_retrieval
)
available_model_ids, available_kb_ids = self._preload_cross_workspace_resources(
target_tenant_id, target_workspace_id, model_ids, kb_ids
)
new_model_config_id = self._is_model_available(
source_config.default_model_config_id, available_model_ids
_, available_kb_ids = self._preload_cross_workspace_resources(
target_tenant_id, target_workspace_id, set(), kb_ids
)
new_model_config_id = source_config.default_model_config_id
new_knowledge_retrieval = self._clean_knowledge_retrieval(
source_config.knowledge_retrieval, available_kb_ids
)
new_tools = self._clean_tools(
source_config.tools, available_kb_ids
new_tools = copy.deepcopy(source_config.tools) if source_config.tools else []
new_memory = self._clean_memory_cross_workspace(
source_config.memory, target_workspace_id
)
new_skills = copy.deepcopy(source_config.skills) if source_config.skills else {}
else:
new_model_config_id = source_config.default_model_config_id
new_knowledge_retrieval = copy.deepcopy(source_config.knowledge_retrieval) if source_config.knowledge_retrieval else None
new_tools = copy.deepcopy(source_config.tools) if source_config.tools else []
new_memory = copy.deepcopy(source_config.memory) if source_config.memory else None
new_skills = copy.deepcopy(source_config.skills) if source_config.skills else {}
new_config = AgentConfig(
id=uuid.uuid4(),
@@ -879,9 +878,11 @@ class AppService:
default_model_config_id=new_model_config_id,
model_parameters=copy.deepcopy(source_config.model_parameters) if source_config.model_parameters else None,
knowledge_retrieval=new_knowledge_retrieval,
memory=copy.deepcopy(source_config.memory) if source_config.memory else None,
memory=new_memory,
variables=copy.deepcopy(source_config.variables) if source_config.variables else [],
tools=new_tools,
skills=new_skills,
features=copy.deepcopy(source_config.features) if source_config.features else {},
is_active=True,
created_at=now,
updated_at=now,
@@ -894,28 +895,14 @@ class AppService:
).first()
if source_config:
if is_cross_workspace:
model_ids, kb_ids = self._collect_resource_ids_from_workflow_nodes(
source_config.nodes
)
available_model_ids, available_kb_ids = self._preload_cross_workspace_resources(
target_tenant_id, target_workspace_id, model_ids, kb_ids
)
new_nodes = self._clean_workflow_nodes_for_cross_workspace(
source_config.nodes or [],
available_model_ids,
available_kb_ids
)
else:
new_nodes = copy.deepcopy(source_config.nodes) if source_config.nodes else []
new_config = WorkflowConfig(
id=uuid.uuid4(),
app_id=new_app.id,
nodes=new_nodes,
nodes=copy.deepcopy(source_config.nodes) if source_config.nodes else [],
edges=copy.deepcopy(source_config.edges) if source_config.edges else [],
variables=copy.deepcopy(source_config.variables) if source_config.variables else [],
execution_config=copy.deepcopy(source_config.execution_config) if source_config.execution_config else {},
features=copy.deepcopy(source_config.features) if source_config.features else {},
triggers=copy.deepcopy(source_config.triggers) if source_config.triggers else [],
is_active=True,
created_at=now,
@@ -929,24 +916,15 @@ class AppService:
).first()
if source_config:
if is_cross_workspace:
model_ids = {source_config.default_model_config_id} if source_config.default_model_config_id else set()
available_model_ids, _ = self._preload_cross_workspace_resources(
target_tenant_id, target_workspace_id, model_ids, set()
)
new_model_config_id = self._is_model_available(
source_config.default_model_config_id, available_model_ids
)
else:
new_model_config_id = source_config.default_model_config_id
# multi_agent 的 model_config_id/sub_agents/routing_rules 均属于 tenant 级别直接保留
# 跨空间时 master_agent_idAppRelease属于源空间需清空
new_config = MultiAgentConfig(
id=uuid.uuid4(),
app_id=new_app.id,
master_agent_id=source_config.master_agent_id if not is_cross_workspace else None,
master_agent_name=source_config.master_agent_name,
default_model_config_id=new_model_config_id,
model_parameters=source_config.model_parameters,
default_model_config_id=source_config.default_model_config_id,
model_parameters=copy.deepcopy(source_config.model_parameters) if source_config.model_parameters else None,
orchestration_mode=source_config.orchestration_mode,
sub_agents=copy.deepcopy(source_config.sub_agents) if source_config.sub_agents else [],
routing_rules=copy.deepcopy(source_config.routing_rules) if source_config.routing_rules else None,
@@ -1037,8 +1015,7 @@ class AppService:
@staticmethod
def _collect_resource_ids_from_config(
model_config_id: Optional[uuid.UUID],
knowledge_retrieval: Optional[dict],
tools: Optional[list]
knowledge_retrieval: Optional[dict]
) -> tuple:
"""Extract all model config IDs and knowledge base IDs from an app config."""
model_ids: set = set()
@@ -1048,62 +1025,12 @@ class AppService:
model_ids.add(model_config_id)
if knowledge_retrieval and isinstance(knowledge_retrieval, dict):
if "kb_ids" in knowledge_retrieval:
for kid in knowledge_retrieval.get("kb_ids", []):
if kid:
kb_ids.add(str(kid))
if knowledge_retrieval.get("knowledge_id"):
kb_ids.add(str(knowledge_retrieval["knowledge_id"]))
if tools:
for tool in tools:
if isinstance(tool, dict):
kid = tool.get("knowledge_id") or tool.get("kb_id")
if kid:
kb_ids.add(str(kid))
if "knowledge_bases" in knowledge_retrieval:
for kid in knowledge_retrieval.get("knowledge_bases", []):
kb_ids.add(str(kid.get("kb_id")))
return model_ids, kb_ids
@staticmethod
def _collect_resource_ids_from_workflow_nodes(nodes: list) -> tuple:
"""Extract all model config IDs and knowledge base IDs from workflow nodes."""
model_ids: set = set()
kb_ids: set = set()
for node in (nodes or []):
if not isinstance(node, dict):
continue
data = node.get("data", {})
if not isinstance(data, dict):
continue
for key in ("model_config_id", "default_model_config_id"):
val = data.get(key)
if val:
try:
model_ids.add(uuid.UUID(str(val)))
except (ValueError, AttributeError):
pass
kr = data.get("knowledge_retrieval")
if isinstance(kr, dict):
for kid in kr.get("kb_ids", []):
if kid:
kb_ids.add(str(kid))
if kr.get("knowledge_id"):
kb_ids.add(str(kr["knowledge_id"]))
if data.get("knowledge_id"):
kb_ids.add(str(data["knowledge_id"]))
for kid in data.get("kb_ids", []):
if kid:
kb_ids.add(str(kid))
return model_ids, kb_ids
@staticmethod
def _is_model_available(model_config_id: Optional[uuid.UUID], available_model_ids: set) -> Optional[uuid.UUID]:
if not model_config_id:
return None
return model_config_id if model_config_id in available_model_ids else None
@staticmethod
def _is_kb_available(kb_id: Optional[str], available_kb_ids: set) -> Optional[str]:
if not kb_id:
@@ -1124,95 +1051,53 @@ class AppService:
cleaned = copy.deepcopy(knowledge_retrieval)
if "kb_ids" in cleaned and isinstance(cleaned["kb_ids"], list):
cleaned["kb_ids"] = [
kid for kid in cleaned["kb_ids"]
if self._is_kb_available(kid, available_kb_ids)
if "knowledge_bases" in cleaned and isinstance(cleaned["knowledge_bases"], list):
cleaned["knowledge_bases"] = [
kb for kb in cleaned["knowledge_bases"]
if self._is_kb_available(kb.get("kb_id"), available_kb_ids)
]
if "knowledge_id" in cleaned:
cleaned["knowledge_id"] = self._is_kb_available(
cleaned.get("knowledge_id"), available_kb_ids
)
return cleaned
def _clean_tools(
def _clean_memory_cross_workspace(
self,
tools: Optional[list],
available_kb_ids: set
) -> list:
"""Clean tools config, keeping built-in tools and tools with available KBs."""
if not tools:
return []
memory: Optional[dict],
target_workspace_id: uuid.UUID
) -> Optional[dict]:
"""Clear memory_config_id/memory_content if it doesn't belong to target workspace."""
if not memory:
return None
cleaned = []
for tool in tools:
if not isinstance(tool, dict):
cleaned.append(tool)
continue
from app.models.memory_config_model import MemoryConfig
tool_type = tool.get("type", "")
if tool_type in ("builtin", "built_in", "system"):
cleaned.append(copy.deepcopy(tool))
continue
cleaned = copy.deepcopy(memory)
# 兼容旧字段 memory_content 和新字段 memory_config_id
mid = cleaned.get("memory_config_id") or cleaned.get("memory_content")
if mid:
try:
mid_uuid = uuid.UUID(str(mid))
except (ValueError, AttributeError):
exists = self.db.query(MemoryConfig).filter(
MemoryConfig.config_id_old == int(mid),
MemoryConfig.workspace_id == target_workspace_id
).first()
if not exists:
cleaned["memory_config_id"] = None
cleaned.pop("memory_content", None)
cleaned["enabled"] = False
return cleaned
kb_id = tool.get("knowledge_id") or tool.get("kb_id")
if kb_id:
if self._is_kb_available(kb_id, available_kb_ids):
cleaned.append(copy.deepcopy(tool))
continue
exists = self.db.query(
self.db.query(MemoryConfig).filter(
MemoryConfig.config_id == mid_uuid,
MemoryConfig.workspace_id == target_workspace_id
).exists()
).scalar()
if not exists:
cleaned["memory_config_id"] = None
cleaned.pop("memory_content", None)
cleaned["enabled"] = False
cleaned.append(copy.deepcopy(tool))
return cleaned
def _clean_workflow_nodes_for_cross_workspace(
self,
nodes: list,
available_model_ids: set,
available_kb_ids: set
) -> list:
"""Clean workflow nodes, using pre-loaded resource sets. Uses deepcopy to avoid mutating source."""
if not nodes:
return []
cleaned = []
for node in nodes:
if not isinstance(node, dict):
cleaned.append(node)
continue
node_copy = copy.deepcopy(node)
data = node_copy.get("data")
if not isinstance(data, dict):
cleaned.append(node_copy)
continue
for key in ("model_config_id", "default_model_config_id"):
if key in data and data[key]:
try:
mid = uuid.UUID(str(data[key]))
except (ValueError, AttributeError):
data[key] = None
continue
data[key] = str(mid) if mid in available_model_ids else None
if "knowledge_retrieval" in data and data["knowledge_retrieval"]:
data["knowledge_retrieval"] = self._clean_knowledge_retrieval(
data["knowledge_retrieval"], available_kb_ids
)
if "knowledge_id" in data:
data["knowledge_id"] = self._is_kb_available(
data.get("knowledge_id"), available_kb_ids
)
if "kb_ids" in data and isinstance(data["kb_ids"], list):
data["kb_ids"] = [
kid for kid in data["kb_ids"]
if self._is_kb_available(kid, available_kb_ids)
]
cleaned.append(node_copy)
return cleaned
def list_apps(

View File

@@ -21,6 +21,7 @@ from app.models.conversation_model import ConversationDetail
from app.models.prompt_optimizer_model import RoleType
from app.repositories.conversation_repository import ConversationRepository, MessageRepository
from app.schemas.conversation_schema import ConversationOut
from app.schemas.model_schema import ModelInfo
from app.services import workspace_service
from app.services.model_service import ModelConfigService
@@ -119,25 +120,27 @@ class ConversationService:
def get_user_conversations(
self,
user_id: uuid.UUID
) -> list[Conversation]:
user_id: uuid.UUID,
page: int = 1,
page_size: int = 20
) -> tuple[list[Conversation], int]:
"""
Retrieve recent conversations for a specific user
This method delegates persistence logic to the repository layer and
applies service-level defaults (e.g. recent conversation limit).
Retrieve recent conversations for a specific user with pagination.
Args:
user_id (uuid.UUID): Unique identifier of the user.
page (int): Page number (1-based). Defaults to 1.
page_size (int): Number of items per page. Defaults to 20.
Returns:
list[Conversation]: A list of recent conversation entities.
tuple[list[Conversation], int]: A list of recent conversation entities and total count.
"""
conversations = self.conversation_repo.get_conversation_by_user_id(
conversations, total = self.conversation_repo.get_conversation_by_user_id(
user_id,
limit=10
page=page,
page_size=page_size
)
return conversations
return conversations, total
def list_conversations(
self,
@@ -267,10 +270,11 @@ class ConversationService:
return messages
def get_conversation_history(
async def get_conversation_history(
self,
conversation_id: uuid.UUID,
max_history: Optional[int] = None
max_history: Optional[int] = None,
api_config: Optional[ModelInfo] = None
) -> List[dict]:
"""
Retrieve historical conversation messages formatted as dictionaries.
@@ -278,6 +282,7 @@ class ConversationService:
Args:
conversation_id (uuid.UUID): Conversation UUID.
max_history (Optional[int]): Maximum number of messages to retrieve.
api_config (Optional[ModelInfo]): Model API configuration for multimodal processing.
Returns:
List[dict]: List of message dictionaries with keys 'role' and 'content'.
@@ -288,13 +293,37 @@ class ConversationService:
)
# 转换为字典格式
history = [
{
history = []
for msg in messages:
content = [{"type": "text", "text": msg.content}]
# 处理 meta_data 中的 files
if msg.meta_data and msg.meta_data.get("files"):
files = msg.meta_data.get("files", [])
if api_config:
# 使用 MultimodalService 处理文件
from app.services.multimodal_service import MultimodalService
multimodal_service = MultimodalService(self.db, api_config=api_config)
# 将 files 转换为 FileInput 格式
file_inputs = []
for file in files:
from app.schemas.app_schema import FileInput, TransferMethod
file_input = FileInput(
type=file.get("type"),
transfer_method=TransferMethod.REMOTE_URL,
url=file.get("url")
)
file_inputs.append(file_input)
processed_files = await multimodal_service.history_process_files(files=file_inputs)
content.extend(processed_files)
history.append({
"role": msg.role,
"content": msg.content
}
for msg in messages
]
"content": content
})
return history
@@ -522,9 +551,18 @@ class ConversationService:
type=ModelType(model_type)
)
conversation_messages = self.get_conversation_history(
conversation_messages = await self.get_conversation_history(
conversation_id=conversation_id,
max_history=20
max_history=20,
api_config=ModelInfo(
model_name=model_name,
provider=provider,
api_key=api_key,
api_base=api_base,
capability=api_config.capability,
is_omni=api_config.is_omni,
model_type=model_type
)
)
if len(conversation_messages) == 0:
return ConversationOut(

View File

@@ -579,9 +579,20 @@ class AgentRunService:
user_id=user_id
)
model_info = ModelInfo(
model_name=api_key_config["model_name"],
provider=api_key_config["provider"],
api_key=api_key_config["api_key"],
api_base=api_key_config["api_base"],
capability=api_key_config["capability"],
is_omni=api_key_config["is_omni"],
model_type=model_config.type
)
# 6. 加载历史消息
history = await self._load_conversation_history(
conversation_id=conversation_id,
api_config=model_info,
max_history=10
)
@@ -589,15 +600,6 @@ class AgentRunService:
processed_files = None
if files:
# 获取 provider 信息
model_info = ModelInfo(
model_name=api_key_config["model_name"],
provider=api_key_config["provider"],
api_key=api_key_config["api_key"],
api_base=api_key_config["api_base"],
capability=api_key_config["capability"],
is_omni=api_key_config["is_omni"],
model_type=ModelType.LLM
)
provider = api_key_config.get("provider", "openai")
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(user_id, files)
@@ -815,9 +817,20 @@ class AgentRunService:
sub_agent=sub_agent
)
model_info = ModelInfo(
model_name=api_key_config["model_name"],
provider=api_key_config["provider"],
api_key=api_key_config["api_key"],
api_base=api_key_config["api_base"],
capability=api_key_config["capability"],
is_omni=api_key_config["is_omni"],
model_type=model_config.type
)
# 6. 加载历史消息
history = await self._load_conversation_history(
conversation_id=conversation_id,
api_config=model_info,
max_history=memory_config.get("max_history", 10)
)
@@ -825,15 +838,6 @@ class AgentRunService:
processed_files = None
if files:
# 获取 provider 信息
model_info = ModelInfo(
model_name=api_key_config["model_name"],
provider=api_key_config["provider"],
api_key=api_key_config["api_key"],
api_base=api_key_config["api_base"],
capability=api_key_config["capability"],
is_omni=api_key_config["is_omni"],
model_type=ModelType.LLM
)
provider = api_key_config.get("provider", "openai")
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(user_id, files)
@@ -1115,6 +1119,7 @@ class AgentRunService:
async def _load_conversation_history(
self,
conversation_id: str,
api_config: ModelInfo | None = None,
max_history: int = 10
) -> List[Dict[str, str]]:
"""加载会话历史消息
@@ -1129,9 +1134,11 @@ class AgentRunService:
try:
conversation_service = ConversationService(self.db)
history = conversation_service.get_conversation_history(
# 获取 API 配置用于多模态处理
history = await conversation_service.get_conversation_history(
conversation_id=uuid.UUID(conversation_id),
max_history=max_history
max_history=max_history,
api_config=api_config
)
logger.debug(

View File

@@ -1179,7 +1179,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
app = db.query(App).filter(App.id == app_id).first()
if not app:
logger.warning(f"App not found: {app_id}")
# raise ValueError(f"应用不存在: {app_id}")
raise ValueError(f"应用不存在: {app_id}")
# TODO: temp fix for draft run
# if not app.current_release_id:
# logger.warning(f"No current release for app: {app_id}")
@@ -1252,15 +1252,17 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
memory_config_service = MemoryConfigService(db)
memory_config = memory_config_service.get_config_with_fallback(
memory_config_id=memory_config_id_to_use,
workspace_id=end_user.workspace_id
workspace_id=app.workspace_id
)
memory_config_id = str(memory_config.config_id) if memory_config else None
result = {
"end_user_id": str(end_user_id),
"app_id": str(app_id),
"release_id": str(app.current_release_id) if app.current_release_id else None,
"memory_config_id": memory_config_id,
"workspace_id": str(end_user.workspace_id)
"workspace_id": str(app.workspace_id)
}
logger.info(

View File

@@ -84,65 +84,43 @@ class MemoryAPIService:
if not app:
logger.warning(f"App not found for end_user: {end_user_id}")
# raise ResourceNotFoundException(
# resource_type="App",
# resource_id=str(end_user.app_id)
# )
# temporally allow any workspace to access
# if end_user.workspace_id != workspace_id:
# print(f"[DEBUG] end_user.workspace_id={end_user.workspace_id}, api_key.workspace_id={workspace_id}")
# logger.warning(
# f"End user {end_user_id} belongs to workspace {end_user.workspace_id}, "
# f"not authorized workspace {workspace_id}"
# )
# raise BusinessException(
# message=f"End user does not belong to authorized workspace. end_user.workspace_id={end_user.workspace_id}, api_key.workspace_id={workspace_id}",
# code=BizCode.FORBIDDEN
# )
raise ResourceNotFoundException(
resource_type="App",
resource_id=str(end_user.app_id)
)
if app.workspace_id != workspace_id:
logger.warning(
f"End user {end_user_id} belongs to workspace {app.workspace_id}, "
f"not authorized workspace {workspace_id}"
)
raise BusinessException(
message="End user does not belong to authorized workspace",
code=BizCode.FORBIDDEN
)
logger.info(f"End user {end_user_id} validated successfully")
return end_user
def _update_end_user_config(self, end_user_id: str, config_id: str) -> None:
    """Best-effort update of the end user's ``memory_config_id``.

    Any failure (invalid UUID strings, import or repository error) is logged
    as a warning and swallowed, so the caller's read/write flow is not blocked.

    Args:
        end_user_id: End user identifier (UUID string).
        config_id: Memory configuration ID (UUID string) to assign.
    """
    try:
        parsed_config_id = uuid.UUID(config_id)

        from app.repositories.end_user_repository import EndUserRepository

        repository = EndUserRepository(self.db)
        repository.update_memory_config_id(
            end_user_id=uuid.UUID(end_user_id),
            memory_config_id=parsed_config_id,
        )
    except Exception as e:
        logger.warning(f"Failed to update memory_config_id for end_user {end_user_id}: {e}")
async def write_memory(
self,
workspace_id: uuid.UUID,
end_user_id: str,
message: str,
config_id: str,
config_id: Optional[str] = None,
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Write memory with validation.
Validates end_user exists and belongs to workspace, updates the end user's
memory_config_id, then delegates to MemoryAgentService.write_memory.
Validates end_user exists and belongs to workspace, then delegates
to MemoryAgentService.write_memory.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as end_user_id)
message: Message content to store
config_id: Memory configuration ID (required)
config_id: Optional memory configuration ID
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
@@ -158,8 +136,7 @@ class MemoryAPIService:
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
# Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
# Use end_user_id as end_user_id for memory operations
try:
# Delegate to MemoryAgentService
@@ -211,21 +188,21 @@ class MemoryAPIService:
end_user_id: str,
message: str,
search_switch: str = "0",
config_id: str = "",
config_id: Optional[str] = None,
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Read memory with validation.
Validates end_user exists and belongs to workspace, updates the end user's
memory_config_id, then delegates to MemoryAgentService.read_memory.
Validates end_user exists and belongs to workspace, then delegates
to MemoryAgentService.read_memory.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as end_user_id)
message: Query message
search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search)
config_id: Memory configuration ID (required)
config_id: Optional memory configuration ID
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
@@ -241,8 +218,7 @@ class MemoryAPIService:
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
# Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
# Use end_user_id as end_user_id for memory operations
try:
@@ -280,50 +256,3 @@ class MemoryAPIService:
message=f"Memory read failed: {str(e)}",
code=BizCode.MEMORY_READ_FAILED
)
def list_memory_configs(
        self,
        workspace_id: uuid.UUID,
) -> Dict[str, Any]:
    """Return every memory config of a workspace as plain dicts.

    Args:
        workspace_id: Workspace ID from API key authorization.

    Returns:
        Dict with ``configs`` (list of config dicts) and ``total`` (their count).

    Raises:
        BusinessException: If loading or serialising the configs fails.
    """
    logger.info(f"Listing memory configs for workspace: {workspace_id}")

    def _iso_or_none(moment):
        # Timestamps may be unset on a row.
        return moment.isoformat() if moment else None

    try:
        from app.repositories.memory_config_repository import MemoryConfigRepository

        # Each row is unpacked as a (config, scene_name) pair.
        rows = MemoryConfigRepository.get_all(self.db, workspace_id=workspace_id)

        configs = [
            {
                "config_id": str(cfg.config_id),
                "config_name": cfg.config_name,
                "config_desc": cfg.config_desc,
                "is_default": cfg.is_default or False,
                "scene_name": scene_name,
                "created_at": _iso_or_none(cfg.created_at),
                "updated_at": _iso_or_none(cfg.updated_at),
            }
            for cfg, scene_name in rows
        ]

        logger.info(f"Found {len(configs)} memory configs for workspace {workspace_id}")
        return {"configs": configs, "total": len(configs)}

    except Exception as e:
        logger.error(f"Failed to list memory configs for workspace {workspace_id}: {e}")
        raise BusinessException(
            message=f"Failed to list memory configs: {str(e)}",
            code=BizCode.MEMORY_READ_FAILED
        )

View File

@@ -107,29 +107,28 @@ def _validate_config_id(config_id, db: Session = None):
)
def _load_ontology_class_infos(db: Session, scene_id) -> list:
"""从 ontology_class 表加载完整本体类型信息name + description,用于注入剪枝提示词。
def _load_ontology_classes(db: Session, scene_id, pruning_scene: Optional[str]) -> Optional[list]:
"""从 ontology_class 表加载场景类型名称列表,用于注入提示词。
Args:
db: 数据库会话
scene_id: 本体场景 UUID
pruning_scene: 语义剪枝场景名称(保留参数,暂未使用)
Returns:
[{"class_name": ..., "class_description": ...}, ...] 或空列表
class_name 字符串列表,或 None无数据时
"""
if not scene_id:
return []
return None
try:
from app.repositories.ontology_class_repository import OntologyClassRepository
repo = OntologyClassRepository(db)
classes = repo.get_classes_by_scene(scene_id)
return [
{"class_name": c.class_name, "class_description": c.class_description or ""}
for c in classes if c.class_name
]
names = [c.class_name for c in classes if c.class_name]
return names if names else None
except Exception as e:
logger.warning(f"Failed to load ontology class infos for scene_id={scene_id}: {e}")
return []
logger.warning(f"Failed to load ontology classes for scene_id={scene_id}: {e}")
return None
class MemoryConfigService:
@@ -384,7 +383,7 @@ class MemoryConfigService:
pruning_threshold=float(memory_config.pruning_threshold) if memory_config.pruning_threshold is not None else 0.5,
# Ontology scene association
scene_id=memory_config.scene_id,
ontology_class_infos=_load_ontology_class_infos(self.db, memory_config.scene_id),
ontology_classes=_load_ontology_classes(self.db, memory_config.scene_id, memory_config.pruning_scene),
)
elapsed_ms = (time.time() - start_time) * 1000
@@ -551,13 +550,11 @@ class MemoryConfigService:
- pruning_switch: bool
- pruning_scene: str
- pruning_threshold: float
- ontology_class_infos: list of {class_name, class_description} dicts
"""
return {
"pruning_switch": memory_config.pruning_enabled,
"pruning_scene": memory_config.pruning_scene,
"pruning_threshold": memory_config.pruning_threshold,
"ontology_class_infos": memory_config.ontology_class_infos or [],
}
def get_ontology_types(self, memory_config: MemoryConfig):

View File

@@ -619,7 +619,7 @@ class MemoryForgetService:
recent_trends.append({
'date': date_str,
'merged_count': record.merged_count,
'average_activation': record.average_activation_value,
'average_activation': round(record.average_activation_value, 2) if record.average_activation_value is not None else None,
'total_nodes': record.total_nodes,
'execution_time': int(record.execution_time.timestamp() * 1000)
})

View File

@@ -11,6 +11,8 @@
import base64
import io
import uuid
import zipfile
import chardet
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
@@ -42,12 +44,10 @@ PDF_MIME = ['application/pdf']
DOC_MIME = [
'application/msword',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'application/zip'
]
XLSX_MIME = [
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'application/vnd.ms-excel',
'application/zip'
]
CSV_MIME = ['text/csv', 'application/csv']
JSON_MIME = ['application/json']
@@ -418,6 +418,71 @@ class MultimodalService:
logger.info(f"成功处理 {len(result)}/{len(files)} 个文件provider={self.provider}")
return result
async def history_process_files(
        self,
        files: Optional[List[FileInput]],
) -> List[Dict[str, Any]]:
    """
    Convert a list of files into content parts usable by the LLM.

    Each file is handled independently: a failure while resolving its URL,
    building its strategy or converting it is logged and replaced by a text
    placeholder, so one bad file does not abort the whole batch.

    Args:
        files: File inputs to process; ``None`` or empty yields ``[]``.

    Returns:
        List[Dict]: Content parts in the format selected for ``self.provider``.
        Files whose type is not supported by the model's capabilities are
        skipped (with a warning); failed files contribute a text placeholder.
    """
    if not files:
        return []

    # Pick the formatting strategy.
    # dashscope omni models use the OpenAI-compatible format.
    if self.provider == "dashscope" and self.is_omni:
        strategy_class = OpenAIFormatStrategy
    else:
        strategy_class = PROVIDER_STRATEGIES.get(self.provider)
        if not strategy_class:
            logger.warning(f"未找到 provider '{self.provider}' 的策略,使用默认策略")
            strategy_class = DashScopeFormatStrategy

    result = []
    succeeded = 0
    for idx, file in enumerate(files):
        try:
            # Strategy construction and URL resolution live inside the try so
            # that a failure here is isolated to this file as well.
            strategy = strategy_class(file)
            if not file.url:
                file.url = await self.get_file_url(file)

            # The processors return a (flag, content) pair; only the content
            # is used here. NOTE(review): the flag looks like a "supported"
            # indicator — confirm whether unsupported content should be dropped.
            if file.type == FileType.IMAGE and "vision" in self.capability:
                _, content = await self._process_image(file, strategy)
            elif file.type == FileType.DOCUMENT:
                _, content = await self._process_document(file, strategy)
            elif file.type == FileType.AUDIO and "audio" in self.capability:
                _, content = await self._process_audio(file, strategy)
            elif file.type == FileType.VIDEO and "video" in self.capability:
                _, content = await self._process_video(file, strategy)
            else:
                logger.warning(f"不支持的文件类型: {file.type}")
                continue

            result.append(content)
            succeeded += 1
        except Exception as e:
            logger.error(
                "处理文件失败",
                extra={
                    "file_index": idx,
                    "file_type": file.type,
                    "error": str(e)
                },
                exc_info=True
            )
            # Keep going with the remaining files instead of aborting.
            result.append({
                "type": "text",
                "text": f"[文件处理失败: {str(e)}]"
            })

    # Report real successes only; failure placeholders are not counted.
    logger.info(f"成功处理 {succeeded}/{len(files)} 个文件provider={self.provider}")
    return result
def write_perceptual_memory(
self,
end_user_id: str,
@@ -588,12 +653,12 @@ class MultimodalService:
file.set_content(file_content)
file_mime_type = magic.from_buffer(file_content, mime=True)
if file_mime_type in TEXT_MIME:
return file_content.decode("utf-8")
return self._decode_text_safe(file_content)
elif file_mime_type in PDF_MIME:
return await self._extract_pdf_text(file_content)
elif file_mime_type in DOC_MIME and file.file_type.endswith(('docx', 'doc')):
elif self._is_word_file(file_content, file_mime_type):
return await self._extract_word_text(file_content)
elif file_mime_type in XLSX_MIME and file.file_type.endswith(("xlsx", "xls")):
elif self._is_excel_file(file_content, file_mime_type):
return await self._extract_xlsx_text(file_content)
elif file_mime_type in CSV_MIME:
return await self._extract_csv_text(file_content)
@@ -622,52 +687,156 @@ class MultimodalService:
@staticmethod
async def _extract_word_text(file_content: bytes) -> str:
"""提取 Word 文档文本"""
"""提取 Word 文档文本(支持 .docx 和旧版 .doc"""
# 先尝试 docxZIP 格式)
if file_content[:2] == b'PK':
try:
word_file = io.BytesIO(file_content)
doc = Document(word_file)
return '\n'.join(p.text for p in doc.paragraphs)
except Exception as e:
logger.error(f"提取 docx 文本失败: {e}")
return f"[docx 提取失败: {str(e)}]"
# 旧版 .docOLE2 格式)
try:
word_file = io.BytesIO(file_content)
doc = Document(word_file)
text_parts = [paragraph.text for paragraph in doc.paragraphs]
return '\n'.join(text_parts)
import olefile
ole = olefile.OleFileIO(io.BytesIO(file_content))
if not ole.exists('WordDocument'):
return "[doc 提取失败: 未找到 WordDocument 流]"
# 读取 WordDocument 流,提取可见 ASCII/Unicode 文本
stream = ole.openstream('WordDocument').read()
# Word Binary Format: 文本在流中以 UTF-16-LE 编码存储
# 简单提取:过滤出可打印字符段
try:
text = stream.decode('utf-16-le', errors='ignore')
except Exception:
text = stream.decode('latin-1', errors='ignore')
# 过滤控制字符,保留可打印内容
import re
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)
text = re.sub(r' +', ' ', text).strip()
ole.close()
return text
except Exception as e:
logger.error(f"提取 Word 文本失败: {e}")
return f"[Word 提取失败: {str(e)}]"
logger.error(f"提取 doc 文本失败: {e}")
return f"[doc 提取失败: {str(e)}]"
@staticmethod
async def _extract_xlsx_text(file_content: bytes) -> str:
"""提取 Excel 文本"""
"""提取 Excel 文本(支持 .xlsx 和旧版 .xls"""
# xlsxZIP 格式)
if file_content[:2] == b'PK':
try:
wb = openpyxl.load_workbook(io.BytesIO(file_content), read_only=True, data_only=True)
parts = []
for sheet in wb.worksheets:
parts.append(f"[Sheet: {sheet.title}]")
for row in sheet.iter_rows(values_only=True):
parts.append('\t'.join('' if v is None else str(v) for v in row))
return '\n'.join(parts)
except Exception as e:
logger.error(f"提取 xlsx 文本失败: {e}")
return f"[xlsx 提取失败: {str(e)}]"
# xlsOLE2/BIFF 格式)
try:
wb = openpyxl.load_workbook(io.BytesIO(file_content), read_only=True, data_only=True)
import xlrd
wb = xlrd.open_workbook(file_contents=file_content)
parts = []
for sheet in wb.worksheets:
parts.append(f"[Sheet: {sheet.title}]")
for row in sheet.iter_rows(values_only=True):
parts.append('\t'.join('' if v is None else str(v) for v in row))
for sheet in wb.sheets():
parts.append(f"[Sheet: {sheet.name}]")
for row_idx in range(sheet.nrows):
parts.append('\t'.join(str(sheet.cell_value(row_idx, col)) for col in range(sheet.ncols)))
return '\n'.join(parts)
except Exception as e:
logger.error(f"提取 Excel 文本失败: {e}")
return f"[Excel 提取失败: {str(e)}]"
logger.error(f"提取 xls 文本失败: {e}")
return f"[xls 提取失败: {str(e)}]"
@staticmethod
async def _extract_csv_text(file_content: bytes) -> str:
async def _extract_csv_text(self, file_content: bytes) -> str:
"""提取 CSV 文本"""
try:
text = file_content.decode('utf-8-sig')
text = self._decode_text_safe(file_content)
reader = csv.reader(io.StringIO(text))
return '\n'.join('\t'.join(row) for row in reader)
except Exception as e:
logger.error(f"提取 CSV 文本失败: {e}")
return f"[CSV 提取失败: {str(e)}]"
@staticmethod
async def _extract_json_text(file_content: bytes) -> str:
async def _extract_json_text(self, file_content: bytes) -> str:
"""提取 JSON 文本"""
try:
data = json.loads(file_content.decode('utf-8'))
text = self._decode_text_safe(file_content)
data = json.loads(text)
return json.dumps(data, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"提取 JSON 文本失败: {e}")
return f"[JSON 提取失败: {str(e)}]"
def _is_word_file(self, file_content: bytes, mime_type: str) -> bool:
"""判断是不是 Word 文件doc / docx不依赖后缀"""
# 旧版 .doc
if mime_type == 'application/msword':
return True
# 新版 .docxZIP 内部包含 word/document.xml
header = file_content[:4]
if header == b'PK\x03\x04':
try:
with zipfile.ZipFile(io.BytesIO(file_content)) as zf:
return "word/document.xml" in zf.namelist()
except:
pass
return False
def _is_excel_file(self, file_content: bytes, mime_type: str) -> bool:
"""判断是不是 Excel 文件xls / xlsx不依赖后缀"""
# 旧版 .xls
if mime_type == 'application/vnd.ms-excel':
return True
# 新版 .xlsxZIP 内部包含 xl/workbook.xml
header = file_content[:4]
if header == b'PK\x03\x04':
try:
with zipfile.ZipFile(io.BytesIO(file_content)) as zf:
return "xl/workbook.xml" in zf.namelist()
except:
pass
return False
@staticmethod
def _decode_text_safe(file_content: bytes) -> str:
"""
【万能文本解码】
自动检测编码,支持 utf-8 / gbk / gb2312 / utf-8-sig / ascii 等
永远不报错,永远不乱码
"""
if not file_content:
return ""
# 1. 自动检测文件编码
detect = chardet.detect(file_content)
encoding = detect.get("encoding") or "utf-8"
encoding = encoding.lower()
# 2. 兼容常见中文编码
compatible_encodings = ["utf-8", "gbk", "gb18030", "gb2312", "ascii", "latin-1"]
# 3. 按优先级尝试解码
for enc in [encoding] + compatible_encodings:
if not enc:
continue
try:
return file_content.decode(enc.strip())
except (UnicodeDecodeError, LookupError):
continue
# 终极兜底
return file_content.decode("utf-8", errors="replace")
def get_multimodal_service(db: Session) -> MultimodalService:
"""获取多模态服务实例(依赖注入)"""

View File

@@ -121,7 +121,7 @@ async def run_pilot_extraction(
"pruning_scene": memory_config.pruning_scene,
"pruning_threshold": memory_config.pruning_threshold,
"scene_id": str(memory_config.scene_id) if memory_config.scene_id else None,
"ontology_class_infos": memory_config.ontology_class_infos,
"ontology_classes": memory_config.ontology_classes,
}
config = PruningConfig(**pruning_config_dict)
@@ -232,11 +232,9 @@ async def run_pilot_extraction(
"chunker_strategy": memory_config.chunker_strategy,
}
# 添加剪枝统计信息(始终包含 pruning 字段,确保前端不会因字段缺失报错)
preprocessing_summary["pruning"] = pruning_stats if pruning_stats else {
"enabled": memory_config.pruning_enabled,
"deleted_count": 0,
}
# 添加剪枝统计信息
if pruning_stats:
preprocessing_summary["pruning"] = pruning_stats
await progress_callback("text_preprocessing_complete", "预处理文本完成(剪枝 + 分块)", preprocessing_summary)

View File

@@ -1408,12 +1408,11 @@ async def analytics_memory_types(
if end_user_id:
try:
conversation_repo = ConversationRepository(db)
conversations = conversation_repo.get_conversation_by_user_id(
conversations, total = conversation_repo.get_conversation_by_user_id(
user_id=uuid.UUID(end_user_id),
limit=100, # 获取更多会话以准确统计
is_activate=True
)
work_count = len(conversations)
work_count = total
logger.debug(f"工作记忆数量(会话数): {work_count} (end_user_id={end_user_id})")
except Exception as e:
logger.warning(f"获取会话数量失败工作记忆数量设为0: {str(e)}")

View File

@@ -25,7 +25,7 @@ from app.repositories.workflow_repository import (
WorkflowExecutionRepository,
WorkflowNodeExecutionRepository
)
from app.schemas import DraftRunRequest, FileInput
from app.schemas import DraftRunRequest, FileInput, FileType
from app.services.conversation_service import ConversationService
from app.services.multi_agent_service import convert_uuids_to_str
from app.services.multimodal_service import MultimodalService
@@ -55,6 +55,7 @@ class WorkflowService:
edges: list[dict[str, Any]],
variables: list[dict[str, Any]] | None = None,
execution_config: dict[str, Any] | None = None,
features: dict[str, Any] | None = None,
triggers: list[dict[str, Any]] | None = None,
validate: bool = True
) -> WorkflowConfig:
@@ -66,6 +67,7 @@ class WorkflowService:
edges: 边列表
variables: 变量列表
execution_config: 执行配置
features: 功能特性
triggers: 触发器列表
validate: 是否验证配置
@@ -81,6 +83,7 @@ class WorkflowService:
"edges": edges,
"variables": variables or [],
"execution_config": execution_config or {},
"features": features or {},
"triggers": triggers or []
}
@@ -101,6 +104,7 @@ class WorkflowService:
edges=edges,
variables=variables,
execution_config=execution_config,
features=features,
triggers=triggers
)

View File

@@ -2675,13 +2675,15 @@ def write_perceptual_memory(
time_limit=7200, # 2小时硬超时
soft_time_limit=6900,
)
def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]:
def init_community_clustering_for_users(self, end_user_ids: List[str], workspace_id: Optional[str] = None) -> Dict[str, Any]:
"""触发型任务:检查指定用户列表,对有 ExtractedEntity 但无 Community 节点的用户执行全量聚类。
由 /dashboard/end_users 接口触发,已有社区节点的用户直接跳过。
任务完成且所有用户数据均完整时,写入 Redis 标记,避免下次重复投递。
Args:
end_user_ids: 需要检查的用户 ID 列表
workspace_id: 工作空间 ID用于完成标记
Returns:
包含任务执行结果的字典
@@ -2707,6 +2709,7 @@ def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[s
# 批量预取所有用户的配置(内置兜底:用户配置不可用时自动回退到工作空间默认配置)
user_llm_map: Dict[str, Optional[str]] = {}
user_embedding_map: Dict[str, Optional[str]] = {}
try:
with get_db_context() as db:
from app.services.memory_agent_service import get_end_users_connected_configs_batch
@@ -2718,21 +2721,54 @@ def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[s
try:
cfg = MemoryConfigService(db).load_memory_config(config_id=config_id)
user_llm_map[uid] = str(cfg.llm_model_id) if cfg.llm_model_id else None
user_embedding_map[uid] = str(cfg.embedding_model_id) if cfg.embedding_model_id else None
except Exception as e:
logger.warning(f"[CommunityCluster] 用户 {uid} 加载 LLM 配置失败,将使用 None: {e}")
logger.warning(f"[CommunityCluster] 用户 {uid} 加载配置失败,将使用 None: {e}")
user_llm_map[uid] = None
user_embedding_map[uid] = None
else:
user_llm_map[uid] = None
user_embedding_map[uid] = None
except Exception as e:
logger.warning(f"[CommunityCluster] 批量获取 LLM 配置失败,所有用户将使用 None: {e}")
logger.warning(f"[CommunityCluster] 批量获取配置失败,所有用户将使用 None: {e}")
for end_user_id in end_user_ids:
try:
# 已有社区节点则跳过
# 已有社区节点时,检查是否存在属性不完整的节点
has_communities = await repo.has_communities(end_user_id)
if has_communities:
skipped += 1
logger.debug(f"[CommunityCluster] 用户 {end_user_id} 已有社区节点,跳过")
llm_model_id = user_llm_map.get(end_user_id)
embedding_model_id = user_embedding_map.get(end_user_id)
incomplete_ids = await repo.get_incomplete_communities(
end_user_id, check_embedding=bool(embedding_model_id)
)
if not incomplete_ids:
skipped += 1
logger.debug(f"[CommunityCluster] 用户 {end_user_id} 社区节点均完整,跳过")
continue
# 对不完整的社区节点逐一补全元数据
engine = LabelPropagationEngine(
connector=connector,
llm_model_id=llm_model_id,
embedding_model_id=embedding_model_id,
)
logger.info(
f"[CommunityCluster] 用户 {end_user_id} 发现 {len(incomplete_ids)} 个属性不完整的社区,开始补全"
)
patch_ok = 0
patch_fail = 0
for cid in incomplete_ids:
try:
await engine._generate_community_metadata(cid, end_user_id)
patch_ok += 1
except Exception as patch_err:
patch_fail += 1
logger.error(f"[CommunityCluster] 社区 {cid} 元数据补全失败: {patch_err}")
logger.info(
f"[CommunityCluster] 用户 {end_user_id} 社区补全完成: 成功={patch_ok}, 失败={patch_fail}"
)
initialized += 1
continue
# 检查是否有 ExtractedEntity 节点
@@ -2742,11 +2778,13 @@ def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[s
logger.debug(f"[CommunityCluster] 用户 {end_user_id} 无实体节点,跳过")
continue
# 每个用户使用自己的 llm_model_id
# 每个用户使用自己的 llm_model_id / embedding_model_id
llm_model_id = user_llm_map.get(end_user_id)
embedding_model_id = user_embedding_map.get(end_user_id)
engine = LabelPropagationEngine(
connector=connector,
llm_model_id=llm_model_id,
embedding_model_id=embedding_model_id,
)
logger.info(f"[CommunityCluster] 用户 {end_user_id}{len(entities)} 个实体开始全量聚类llm_model_id={llm_model_id}")

View File

@@ -1,4 +1,38 @@
{
"v0.2.8": {
"introduction": {
"codeName": "景玉",
"releaseDate": "2026-3-20",
"upgradePosition": "🐻 MemoryBear v0.2.8 社区版全面升级应用共享、多模态交互与平台基础设施,引入语音交互、感知记忆和云端存储,打造更强大的开放 AI 记忆平台",
"coreUpgrades": [
"1. 应用共享与发布<br>* 应用共享Agent、工作流、Agent 集群):全类型应用共享至其他空间<br>* 分享应用默认开启记忆功能:发布分享后记忆默认开启,关闭时提醒<br>* 工作流记忆分享规则:按记忆配置自动控制分享页记忆开关<br>* 分享会话联网搜索修复:恢复分享应用的联网搜索能力",
"2. 多模态与交互 💬<br>* 语音输入:模型接口和应用支持语音输入<br>* 语音回复:应用支持语音回复模态<br>* 多模态感知记忆:记忆系统支持视觉、音频、图片和文件的感知记忆<br>* 对话框文件展示:试运行和体验分享中正确展示上传文件",
"3. 平台与基础设施 ⚙️<br>* i18n 国际化:全面多语言多地区支持<br>* 云端文件存储OSS + S3支持阿里云 OSS 和 S3 云端上传<br>* Flower 容器监控Celery 异步任务监控与管理",
"4. EndUser 身份迁移 🔐<br>* EndUser 从 app_id 迁移至 workspace_id身份从应用级迁移至工作空间级",
"5. 情景记忆 🧠<br>* 情景记忆聚类算法:基于社区图谱的聚类算法,支持老用户图谱生成",
"6. 稳健性与缺陷修复 🔧<br>* MCP 服务删除后工具 404修复删除 MCP 服务后接口报错<br>* 应用导出配置不一致:导出已保存配置而非画布状态<br>* 工作流节点 ID 重复:修复复制节点后 ID 冲突<br>* 条件分支连线错误:修复保存刷新后连线错乱<br>* 回复节点内容丢失:修复点击画布后内容消失<br>* 连接桩规则优化:禁止非法连接方向<br>* 知识库状态列宽度:锁定或自适应宽度<br>* 等待中文档预览:支持未完成解析文档预览<br>* 知识库关联修复:统一修复关联问题<br>* 多模态对话连续性:修复多模态内容后无法继续对话<br>* 时区统一:环境变量统一控制存储和任务时区<br>* 遗忘强度精度:修复小数显示过长",
"<br>",
"v0.2.8 社区版在应用共享和多模态交互方面实现重大升级,感知记忆扩展了平台的认知维度。后续将深化多智能体协作、情景记忆聚类,并持续优化平台稳定性与开放生态。",
"MemoryBear —— 让 AI 拥有记忆 🐻✨"
]
},
"introduction_en": {
"codeName": "JingYu",
"releaseDate": "2026-3-20",
"upgradePosition": "🐻 MemoryBear v0.2.8 Community delivers multimodal interaction, perceptual memory, cloud storage, and workspace-level identity for a more capable open AI memory platform",
"coreUpgrades": [
"1. Application Sharing & Publishing<br>* Application Sharing (Agent, Workflow, Agent Cluster): Full sharing across all app types<br>* Memory Enabled by Default: Memory auto-enabled on shared apps with disable reminder<br>* Workflow Memory Sharing Rules: Auto-controlled based on memory configuration<br>* Shared Session Web Search Fix: Restored web search for shared apps",
"2. Multimodal & Interaction 💬<br>* Voice Input: Model interfaces and apps support voice input<br>* Voice Reply: Apps support voice reply modality<br>* Multimodal Perceptual Memory: Memory system supports visual, audio, image, and file perception<br>* File Display in Chat: Uploaded files display correctly in dry-run and sharing",
"3. Platform & Infrastructure ⚙️<br>* i18n Internationalization: Full multi-language multi-region support<br>* Cloud File Storage (OSS + S3): Alibaba Cloud OSS and S3 cloud uploads<br>* Flower Container Monitoring: Celery async task monitoring and management",
"4. EndUser Identity Migration 🔐<br>* EndUser Migration from app_id to workspace_id: Identity migrated to workspace level",
"5. Episodic Memory 🧠<br>* Episodic Memory Clustering: Community-graph-based clustering with legacy user support",
"6. Robustness & Bug Fixes 🔧<br>* MCP Service Deletion 404: Fixed tool endpoint error after MCP removal<br>* App Export Config Mismatch: Exports saved config instead of canvas state<br>* Workflow Duplicate Node ID: Fixed ID conflict on node duplication<br>* Conditional Branch Wiring: Fixed wiring reset after save/refresh<br>* Reply Node Content Loss: Fixed content disappearing on canvas click<br>* Port Connection Rules: Prohibited invalid connection directions<br>* Knowledge Base Status Width: Locked or adaptive column width<br>* Pending Document Preview: Preview support for unparsed documents<br>* Knowledge Base Association Fixes: Consolidated association fixes<br>* Multimodal Conversation Continuity: Fixed single-round limit after multimodal input<br>* Timezone Unification: Env-var controlled unified timezone<br>* Forgetting Strength Precision: Fixed excessive decimal display",
"<br>",
"v0.2.8 Community delivers major upgrades in application sharing and multimodal interaction, with perceptual memory expanding the platform's cognitive dimensions. Multi-agent collaboration, episodic clustering, and continued platform stability improvements are ahead.",
"MemoryBear — Give AI Memory 🐻✨"
]
}
},
"v0.2.7": {
"introduction": {
"codeName": "武陵",

View File

@@ -1,156 +0,0 @@
"""20260311000
Revision ID: 74b51dfece29
Revises: f017efe4831c
Create Date: 2026-03-19 10:15:42.488027
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '74b51dfece29'
down_revision: Union[str, None] = 'f017efe4831c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Install the trigger that keeps ``knowledges`` statistics in sync.

    Steps, in order:

    1. Drop any existing ``tr_documents_update_stats`` trigger on ``documents``.
    2. Create or replace the PL/pgSQL function ``update_knowledge_stats()``,
       which on every row change in ``documents``:
       * recomputes ``doc_num`` / ``chunk_num`` of the affected knowledge base
         from its documents with ``status = 1``;
       * copies those numbers to every ``target_kb_id`` linked through
         ``knowledge_shares``;
       * refreshes the totals of the parent ``Folder`` knowledge base and of
         all of its ancestor folders.
    3. Re-create the row-level AFTER INSERT/UPDATE/DELETE trigger.
    """
    # Drop the old trigger first (if it exists) so it can be recreated below.
    op.execute("DROP TRIGGER IF EXISTS tr_documents_update_stats ON documents;")

    # Create or replace the function that refreshes the knowledges statistics.
    #
    # Notes on the folder part of the SQL:
    #   * The recursive step joins on ``fh.parent_id`` so the walk really climbs
    #     from the direct parent folder to every ancestor folder.
    #   * ``path`` remembers the folders already visited, so a cyclic parent
    #     chain cannot make the recursion run forever.
    #   * ``array_agg(id ORDER BY depth)`` puts the nearest folder first: each
    #     folder is refreshed before the ancestors that sum its numbers.
    #   * A folder's total is "children sum + own documents", written as the
    #     sum of two scalar subqueries (a ``UNION ALL ... LIMIT 1`` would keep
    #     only one of the two values).
    op.execute("""
    CREATE OR REPLACE FUNCTION update_knowledge_stats()
    RETURNS TRIGGER AS $$
    DECLARE
        -- 声明变量用于存储当前处理的知识库ID
        current_kb_id UUID;
        -- 声明变量用于存储文件夹知识库ID如果存在
        folder_kb_id UUID;
        -- 声明变量用于存储递归查询结果
        folder_ids UUID[];
    BEGIN
        -- 处理 documents 表的插入、更新或删除
        IF TG_TABLE_NAME = 'documents' THEN
            -- 1. 更新 knowledges 表的 doc_num
            UPDATE knowledges SET doc_num = (
                SELECT COUNT(*) FROM documents
                WHERE kb_id = knowledges.id AND status = 1
            )
            WHERE id = NEW.kb_id OR id = OLD.kb_id;
            -- 2. 更新 knowledges 表的 chunk_num
            UPDATE knowledges SET chunk_num = (
                SELECT COALESCE(SUM(chunk_num), 0) FROM documents
                WHERE kb_id = knowledges.id AND status = 1
            )
            WHERE id = NEW.kb_id OR id = OLD.kb_id;
            -- 通过 knowledge_shares 表同步统计信息
            -- 1. 使用 source_kb_id 的 doc_num 更新 target_kb_id 的 doc_num
            UPDATE knowledges AS target
            SET doc_num = source.doc_num
            FROM knowledge_shares ks
            JOIN knowledges AS source ON source.id = ks.source_kb_id
            WHERE ks.target_kb_id = target.id
            AND (source.id = NEW.kb_id OR source.id = OLD.kb_id);
            -- 2. 使用 source_kb_id 的 chunk_num 更新 target_kb_id 的 chunk_num
            UPDATE knowledges AS target
            SET chunk_num = source.chunk_num
            FROM knowledge_shares ks
            JOIN knowledges AS source ON source.id = ks.source_kb_id
            WHERE ks.target_kb_id = target.id
            AND (source.id = NEW.kb_id OR source.id = OLD.kb_id);
            -- 处理文件夹知识库的统计更新
            -- 获取当前处理的知识库ID可能是NEW或OLD中的kb_id
            IF NEW.kb_id IS NOT NULL THEN
                current_kb_id := NEW.kb_id;
            ELSIF OLD.kb_id IS NOT NULL THEN
                current_kb_id := OLD.kb_id;
            ELSE
                RETURN NULL;
            END IF;
            -- 查找当前知识库的父文件夹（如果有）
            SELECT id INTO folder_kb_id FROM knowledges
            WHERE id IN (
                SELECT parent_id FROM knowledges WHERE id = current_kb_id
            ) AND type = 'Folder';
            -- 如果存在父文件夹，递归处理所有父文件夹
            IF folder_kb_id IS NOT NULL THEN
                -- 使用递归CTE获取所有父文件夹ID包括多级嵌套
                WITH RECURSIVE folder_hierarchy AS (
                    -- 基础查询：获取直接父文件夹
                    SELECT id, parent_id, 1 AS depth, ARRAY[id] AS path
                    FROM knowledges
                    WHERE id = folder_kb_id AND type = 'Folder'
                    UNION ALL
                    -- 递归查询：获取父文件夹的父文件夹
                    SELECT k.id, k.parent_id, fh.depth + 1, fh.path || k.id
                    FROM knowledges k
                    JOIN folder_hierarchy fh ON k.id = fh.parent_id
                    WHERE k.type = 'Folder' AND NOT (k.id = ANY(fh.path))
                )
                -- 将结果存入数组以便处理
                SELECT array_agg(id ORDER BY depth) INTO folder_ids FROM folder_hierarchy;
                -- 遍历所有父文件夹并更新统计信息
                FOR i IN 1..array_length(folder_ids, 1) LOOP
                    -- 更新文件夹的doc_num汇总所有子知识库的doc_num
                    UPDATE knowledges SET doc_num = (
                        -- 汇总直接子知识库的doc_num
                        SELECT COALESCE(SUM(child.doc_num), 0)
                        FROM knowledges child
                        WHERE child.parent_id = folder_ids[i] AND child.status = 1
                    ) + (
                        -- 加上直接属于该文件夹的文档数（如果有）
                        SELECT COUNT(*)
                        FROM documents
                        WHERE kb_id = folder_ids[i] AND status = 1
                    )
                    WHERE id = folder_ids[i];
                    -- 更新文件夹的chunk_num汇总所有子知识库的chunk_num
                    UPDATE knowledges SET chunk_num = (
                        -- 汇总直接子知识库的chunk_num
                        SELECT COALESCE(SUM(child.chunk_num), 0)
                        FROM knowledges child
                        WHERE child.parent_id = folder_ids[i] AND child.status = 1
                    ) + (
                        -- 加上直接属于该文件夹的文档的chunk_num如果有
                        SELECT COALESCE(SUM(d.chunk_num), 0)
                        FROM documents d
                        WHERE d.kb_id = folder_ids[i] AND d.status = 1
                    )
                    WHERE id = folder_ids[i];
                END LOOP;
            END IF;
        END IF;
        RETURN NULL;
    END;
    $$ LANGUAGE plpgsql;
    """)

    # Row-level trigger on documents, fired after INSERT, UPDATE or DELETE.
    op.execute("""
    CREATE TRIGGER tr_documents_update_stats
    AFTER INSERT OR UPDATE OR DELETE ON documents
    FOR EACH ROW
    EXECUTE FUNCTION update_knowledge_stats();
    """)
def downgrade() -> None:
    """Undo ``upgrade``: drop the statistics trigger, then its function."""
    teardown_statements = (
        # The trigger executes the function, so the trigger is removed first.
        "DROP TRIGGER IF EXISTS tr_documents_update_stats ON documents;",
        "DROP FUNCTION IF EXISTS update_knowledge_stats();",
    )
    for statement in teardown_statements:
        op.execute(statement)

View File

@@ -303,7 +303,7 @@ async def test_get_node_output_not_exist_with_default():
"""测试获取不存在的节点输出(使用默认值)"""
pool = VariablePool()
result = pool.get_node_output("nonexistent_node", default=None, strict=False)
result = pool.get_node_output("nonexistent_node", defalut=None, strict=False)
assert result is None

View File

@@ -52,6 +52,10 @@ export const getKnowledgeBaseTypeList = async (): Promise<string[]> => {
// 如果不是数组,返回空数组
return [];
};
/**
 * Build the address of a file from its id.
 *
 * @param fileId - Identifier of the file; interpolated into the path as-is.
 * @returns The string `${apiPrefix}/files/${fileId}`.
 */
export const getFileUrl = (fileId: string) => `${apiPrefix}/files/${fileId}`;
// 知识库文档解析类型
export const getKnowledgeBaseDocumentParseTypeList = async () => {
const response = await request.get(`${apiPrefix}/knowledges/parsertype`);

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-03 14:00:06
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-13 10:48:41
* @Last Modified time: 2026-03-19 18:35:10
*/
import { request } from '@/utils/request'
import type { AxiosRequestConfig } from 'axios'
@@ -218,8 +218,8 @@ export const getExplicitMemory = (end_user_id: string) => {
export const getExplicitMemoryDetails = (data: { end_user_id: string, memory_id: string; }) => {
return request.post(`/memory/explicit-memory/details`, data)
}
export const getConversations = (end_user_id: string) => {
return request.get(`/memory/work/${end_user_id}/conversations`)
export const getConversations = (end_user_id: string, page = 1, pagesize = 20) => {
return request.get(`/memory/work/${end_user_id}/conversations`, { page, pagesize })
}
export const getConversationMessages = (end_user_id: string, conversation_id: string) => {
return request.get(`/memory/work/${end_user_id}/messages`, { conversation_id })

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2025-12-10 16:46:17
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-19 13:38:20
* @Last Modified time: 2026-03-19 19:45:40
*/
import { type FC, useRef, useEffect, useState } from 'react'
import clsx from 'clsx'
@@ -143,15 +143,20 @@ const ChatContent: FC<ChatContentProps> = ({
}
return (
<div key={file.url || file.uid} className="rb:relative rb:rounded-lg rb:bg-[#F0F3F8] rb:p-1! rb:cursor-pointer" onClick={() => handleDownload(file)}>
{(file.type.includes('doc') || file.type.includes('docx') || file.type.includes('word') || file.type.includes('wordprocessingml.document')) && <div
className="rb:size-10 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/word.svg')]"
></div>}
{(file.type.includes('pdf')) && <div
className="rb:size-10 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/pdf.svg')]"
></div>}
{(file.type.includes('excel') || file.type.includes('spreadsheetml.sheet') || file.type.includes('csv')) && <div
className="rb:size-10 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/excel.svg')]"
></div>}
{(file.type.includes('excel') || file.type.includes('spreadsheetml.sheet') || file.type.includes('csv'))
? <div
className="rb:size-10 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/excel.svg')]"
></div>
:(file.type.includes('pdf'))
? <div
className="rb:size-10 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/pdf.svg')]"
></div>
: (file.type.includes('doc') || file.type.includes('docx') || file.type.includes('word') || file.type.includes('wordprocessingml.document'))
? <div
className="rb:size-10 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/word.svg')]"
></div>
: null
}
</div>
)
})}

View File

@@ -2,10 +2,11 @@
* @Author: ZhaoYing
* @Date: 2025-12-10 16:46:14
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-19 16:05:56
* @Last Modified time: 2026-03-19 18:44:51
*/
import { type FC, useEffect, useMemo } from 'react'
import { Flex, Input, Form } from 'antd'
import { Flex, Input, Form, Spin } from 'antd'
import clsx from 'clsx'
import SendIcon from '@/assets/images/conversation/send.svg'
import SendDisabledIcon from '@/assets/images/conversation/sendDisabled.svg'
@@ -69,6 +70,8 @@ const ChatInput: FC<ChatInputProps> = ({
onSend(values.message)
}
console.log('previewFileList', previewFileList)
return (
<div className={`rb:absolute rb:bottom-3 rb:left-0 rb:right-0 rb:w-full ${className}`}>
<Flex vertical justify="space-between" className="rb:border rb:border-[#DFE4ED] rb:rounded-xl rb:min-h-30">
@@ -76,62 +79,78 @@ const ChatInput: FC<ChatInputProps> = ({
{previewFileList.map((file) => {
if (file.type.includes('image')) {
return (
<div key={file.url || file.uid} className="rb:inline-block rb:group rb:relative rb:rounded-lg">
<img src={file.url} alt={file.name} className="rb:size-12! rb:rounded-lg rb:object-cover rb:cursor-pointer" />
<div
className="rb:hidden rb:group-hover:block rb:absolute rb:-right-1 rb:-top-1 rb:size-3.5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/delete.svg')] rb:hover:bg-[url('@/assets/images/conversation/delete_hover.svg')]"
onClick={() => handleDelete(file)}
></div>
</div>
<Spin key={`${file.url || file.uid}_${file.status}`} spinning={file.status === 'uploading'}>
<div key={file.url || file.uid} className={clsx("rb:inline-block rb:group rb:relative rb:rounded-lg", {
'rb:border rb:border-[#FF5D34]': file.status === 'error'
})}>
<img src={file.url} alt={file.name} className="rb:size-12! rb:rounded-lg rb:object-cover rb:cursor-pointer" />
<div
className="rb:hidden rb:group-hover:block rb:absolute rb:-right-1 rb:-top-1 rb:size-3.5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/delete.svg')] rb:hover:bg-[url('@/assets/images/conversation/delete_hover.svg')]"
onClick={() => handleDelete(file)}
></div>
</div>
</Spin>
)
}
if (file.type.includes('video')) {
return (
<div key={file.url || file.uid} className="rb:w-45 rb:h-16 rb:inline-block rb:group rb:relative rb:rounded-lg">
<video src={file.url} controls className="rb:w-45 rb:h-16 rb:rounded-lg rb:object-cover rb:cursor-pointer" />
<div
className="rb:hidden rb:group-hover:block rb:absolute rb:-right-1 rb:-top-1 rb:size-3.5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/delete.svg')] rb:hover:bg-[url('@/assets/images/conversation/delete_hover.svg')]"
onClick={() => handleDelete(file)}
></div>
</div>
<Spin key={`${file.url || file.uid}_${file.status}`} spinning={file.status === 'uploading'}>
<div key={file.url || file.uid} className={clsx("rb:w-45 rb:h-16 rb:inline-block rb:group rb:relative rb:rounded-lg", {
'rb:border rb:border-[#FF5D34]': file.status === 'error'
})}>
<video src={file.url} controls className="rb:w-45 rb:h-15.5 rb:rounded-lg rb:object-cover rb:cursor-pointer" />
<div
className="rb:hidden rb:group-hover:block rb:absolute rb:-right-1 rb:-top-1 rb:size-3.5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/delete.svg')] rb:hover:bg-[url('@/assets/images/conversation/delete_hover.svg')]"
onClick={() => handleDelete(file)}
></div>
</div>
</Spin>
)
}
if (file.type.includes('audio')) {
return (
<div key={file.url || file.uid} className="rb:w-45 rb:h-16 rb:inline-flex rb:items-center rb:group rb:relative rb:rounded-lg rb:bg-[#F0F3F8] rb:py-2 rb:px-2.5 rb:gap-2">
<audio src={file.url} controls className="rb:w-45 rb:h-16" />
<Spin key={`${file.url || file.uid}_${file.status}`} spinning={file.status === 'uploading'}>
<div key={file.url || file.uid} className={clsx("rb:w-45 rb:h-16 rb:inline-flex rb:items-center rb:group rb:relative rb:rounded-lg rb:bg-[#F0F3F8] rb:py-2 rb:px-2.5 rb:gap-2", {
'rb:border rb:border-[#FF5D34]': file.status === 'error'
})}>
<audio src={file.url} controls className="rb:w-45 rb:h-15.5" />
<div
className="rb:hidden rb:group-hover:block rb:absolute rb:-right-1 rb:-top-1 rb:size-3.5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/delete.svg')] rb:hover:bg-[url('@/assets/images/conversation/delete_hover.svg')]"
onClick={() => handleDelete(file)}
></div>
</div>
</Spin>
)
}
return (
<Spin key={`${file.url || file.uid}_${file.status}`} spinning={file.status === 'uploading'}>
<div key={file.url || file.uid} className={clsx("rb:w-45 rb:text-[12px] rb:gap-2.5 rb:flex rb:items-center rb:group rb:relative rb:rounded-lg rb:bg-[#F0F3F8] rb:py-2 rb:px-2.5", {
'rb:border rb:border-[#FF5D34]': file.status === 'error'
})}>
{file.type.includes('pdf')
? <div
className="rb:size-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/pdf_disabled.svg')] rb:hover:bg-[url('@/assets/images/conversation/pdf.svg')]"
></div>
: (file.type.includes('excel') || file.type.includes('spreadsheetml.sheet') || file.type.includes('csv'))
? <div
className="rb:size-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/excel_disabled.svg')] rb:hover:bg-[url('@/assets/images/conversation/excel.svg')]"
></div>
: (file.type.includes('doc') || file.type.includes('docx') || file.type.includes('word') || file.type.includes('wordprocessingml.document'))
? <div
className="rb:size-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/word_disabled.svg')] rb:hover:bg-[url('@/assets/images/conversation/word.svg')]"
></div>
: null
}
<div className="rb:flex-1 rb:w-32.5">
<div className="rb:leading-4 rb:text-ellipsis rb:overflow-hidden rb:whitespace-nowrap">{file.name}</div>
<div className="rb:leading-3.5 rb:mt-0.5 rb:text-[#5B6167] rb:text-ellipsis rb:overflow-hidden rb:whitespace-nowrap">{file.type} · {file.size}</div>
</div>
<div
className="rb:hidden rb:group-hover:block rb:absolute rb:-right-1 rb:-top-1 rb:size-3.5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/delete.svg')] rb:hover:bg-[url('@/assets/images/conversation/delete_hover.svg')]"
onClick={() => handleDelete(file)}
></div>
</div>
)
}
return (
<div key={file.url || file.uid} className="rb:w-45 rb:text-[12px] rb:gap-2.5 rb:flex rb:items-center rb:group rb:relative rb:rounded-lg rb:bg-[#F0F3F8] rb:py-2 rb:px-2.5">
{file.type.includes('pdf')
? <div
className="rb:size-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/pdf_disabled.svg')] rb:hover:bg-[url('@/assets/images/conversation/pdf.svg')]"
></div>
: (file.type.includes('excel') || file.type.includes('spreadsheetml.sheet') || file.type.includes('csv'))
? <div
className="rb:size-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/excel_disabled.svg')] rb:hover:bg-[url('@/assets/images/conversation/excel.svg')]"
></div>
: (file.type.includes('doc') || file.type.includes('docx') || file.type.includes('word') || file.type.includes('wordprocessingml.document'))
? <div
className="rb:size-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/word_disabled.svg')] rb:hover:bg-[url('@/assets/images/conversation/word.svg')]"
></div>
: null
}
<div className="rb:flex-1 rb:w-32.5">
<div className="rb:leading-4 rb:text-ellipsis rb:overflow-hidden rb:whitespace-nowrap">{file.name}</div>
<div className="rb:leading-3.5 rb:mt-0.5 rb:text-[#5B6167] rb:text-ellipsis rb:overflow-hidden rb:whitespace-nowrap">{file.type} · {file.size}</div>
</div>
<div
className="rb:hidden rb:group-hover:block rb:absolute rb:-right-1 rb:-top-1 rb:size-3.5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/delete.svg')] rb:hover:bg-[url('@/assets/images/conversation/delete_hover.svg')]"
onClick={() => handleDelete(file)}
></div>
</div>
</div>
</Spin>
)
})}
</Flex></div>}

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-03-17 14:22:25
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-18 15:55:13
* @Last Modified time: 2026-03-19 18:59:37
*/
// Toolbar component for chat input area, supporting file upload, audio recording, and variable configuration
import { useRef, forwardRef, useImperativeHandle, type ReactNode, useEffect } from 'react'
@@ -49,6 +49,7 @@ interface FormValues {
memory?: boolean;
}
const max_file_count = 1;
const ChatToolbar = forwardRef<ChatToolbarRef, ChatToolbarProps>(({
features,
extra,
@@ -85,10 +86,18 @@ const ChatToolbar = forwardRef<ChatToolbarRef, ChatToolbarProps>(({
// Append newly uploaded file to the file list when upload is complete
const fileChange = (file?: any) => {
if (file?.status !== 'done') return
const files = [...(queryValues?.files || []), file]
form.setFieldValue('files', files)
onFilesChange?.(files)
console.log('file', file)
const lastFiles = form.getFieldValue('files') || [];
const index = lastFiles.findIndex((item: any) => item.uid === file.uid)
if (index > -1) {
lastFiles[index] = file
} else {
lastFiles.push(file)
}
form.setFieldValue('files', [...lastFiles])
onFilesChange?.([...lastFiles])
console.log('lastFiles', lastFiles)
}
// Append recorded audio file to the file list and notify parent
@@ -128,8 +137,8 @@ const ChatToolbar = forwardRef<ChatToolbarRef, ChatToolbarProps>(({
key: 'url',
label: t('memoryConversation.addRemoteFile'),
onClick: () => {
if ((queryValues?.files?.length || 0) >= file_upload.max_file_count) {
messageApi.warning(t('common.fileNumTip', { num: file_upload.max_file_count }))
if ((queryValues?.files?.length || 0) >= max_file_count) {
messageApi.warning(t('common.fileNumTip', { num: max_file_count }))
return
}
uploadFileListModalRef.current?.handleOpen()
@@ -145,7 +154,7 @@ const ChatToolbar = forwardRef<ChatToolbarRef, ChatToolbarProps>(({
onChange={fileChange}
requestConfig={uploadRequestConfig}
featureConfig={file_upload}
disabled={(queryValues?.files?.length || 0) >= file_upload.max_file_count}
disabled={(queryValues?.files?.length || 0) >= max_file_count}
/>
)
})
@@ -177,7 +186,7 @@ const ChatToolbar = forwardRef<ChatToolbarRef, ChatToolbarProps>(({
{file_upload?.audio_enabled && file_upload?.allowed_transfer_methods?.includes('local_file') && (
<Flex align="center">
<AudioRecorder
disabled={(queryValues?.files?.length || 0) >= file_upload.max_file_count}
disabled={(queryValues?.files?.length || 0) >= max_file_count}
action={uploadAction}
requestConfig={uploadRequestConfig}
onRecordingComplete={handleRecordingComplete}

View File

@@ -4,7 +4,7 @@
* @Author: yujiangping
* @Date: 2026-03-16 19:01:12
* @LastEditors: yujiangping
* @LastEditTime: 2026-03-18 18:35:53
* @LastEditTime: 2026-03-20 12:12:20
*/
import { useState, useEffect, useRef, useCallback, type FC } from 'react';
import { Spin, Alert, Button, Table, InputNumber, Image } from 'antd';
@@ -309,23 +309,64 @@ const DocumentPreview: FC<DocumentPreviewProps> = ({
}
};
const [csvTruncated, setCsvTruncated] = useState(false);
const isCsvFile = () => getFileExtension() === '.csv';
// CSV 预览大小限制1MB
const CSV_PREVIEW_SIZE = 1 * 1024 * 1024;
// 最大预览行数
const MAX_PREVIEW_ROWS = 500;
/**
 * Download a file as an ArrayBuffer, optionally capped at `maxBytes`.
 *
 * When a positive `maxBytes` is given, a `Range: bytes=0-(maxBytes - 1)`
 * header is sent so a range-aware server only returns the leading bytes.
 * A server is free to ignore `Range` and reply 200 with the full body, so
 * the result is additionally sliced locally and never exceeds `maxBytes`.
 *
 * @param url - File URL; resolved through `getRequestUrl` before fetching.
 * @param maxBytes - Optional upper bound (in bytes) of the returned buffer.
 *   Omitted, zero or negative values mean "no limit".
 * @returns The (possibly truncated) file content.
 * @throws Error with `HTTP <status>: <statusText>` on any non-2xx response.
 */
const fetchFileBufferWithLimit = async (url: string, maxBytes?: number): Promise<ArrayBuffer> => {
  const requestUrl = getRequestUrl(url);
  const headers: Record<string, string> = {
    'Authorization': `Bearer ${cookieUtils.get('authToken') || ''}`,
  };

  // Only a positive limit makes sense for a byte range.
  const limit = maxBytes !== undefined && maxBytes > 0 ? maxBytes : undefined;
  if (limit !== undefined) {
    headers['Range'] = `bytes=0-${limit - 1}`;
  }

  const response = await fetch(requestUrl, {
    credentials: 'include',
    headers,
  });
  // `response.ok` is true for every 2xx status, 206 Partial Content included.
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
  }

  const buffer = await response.arrayBuffer();
  // Enforce the cap ourselves in case the server ignored the Range header.
  return limit !== undefined && buffer.byteLength > limit
    ? buffer.slice(0, limit)
    : buffer;
};
const loadExcelFile = async () => {
setLoading(true);
setError(false);
setErrorMessage('');
setCsvTruncated(false);
try {
const arrayBuffer = await fetchFileBuffer(fileUrl);
// CSV 文件需要处理编码问题(可能是 GBK/GB2312
// CSV 文件需要处理编码问题(可能是 GBK/GB2312且大文件只取前 1MB
if (isCsvFile()) {
let arrayBuffer: ArrayBuffer;
let truncated = false;
try {
// 先尝试 Range 请求只取前 1MB
arrayBuffer = await fetchFileBufferWithLimit(fileUrl, CSV_PREVIEW_SIZE);
// 如果返回的数据刚好等于限制大小,说明可能被截断了
if (arrayBuffer.byteLength >= CSV_PREVIEW_SIZE) {
truncated = true;
}
} catch {
// Range 请求不支持时,全量获取后截断
const fullBuffer = await fetchFileBuffer(fileUrl);
if (fullBuffer.byteLength > CSV_PREVIEW_SIZE) {
arrayBuffer = fullBuffer.slice(0, CSV_PREVIEW_SIZE);
truncated = true;
} else {
arrayBuffer = fullBuffer;
}
}
let csvText: string;
// 先尝试 UTF-8 解码
const utf8Text = new TextDecoder('utf-8').decode(arrayBuffer);
// 检测是否有乱码特征(常见的 GBK 被错误解析为 UTF-8 的替换字符)
if (utf8Text.includes('\uFFFD') || /[\x80-\xff]/.test(utf8Text.slice(0, 200))) {
// 尝试 GBK 解码
try {
csvText = new TextDecoder('gbk').decode(arrayBuffer);
} catch {
@@ -334,19 +375,35 @@ const DocumentPreview: FC<DocumentPreviewProps> = ({
} else {
csvText = utf8Text;
}
// 如果被截断,去掉最后一行不完整的数据
if (truncated) {
const lastNewline = csvText.lastIndexOf('\n');
if (lastNewline > 0) {
csvText = csvText.substring(0, lastNewline);
}
}
const workbook = XLSX.read(csvText, { type: 'string' });
const sheets = workbook.SheetNames.map(sheetName => {
const worksheet = workbook.Sheets[sheetName];
const data = XLSX.utils.sheet_to_json(worksheet, { header: 1 }) as any[][];
let data = XLSX.utils.sheet_to_json(worksheet, { header: 1 }) as any[][];
// 限制最大行数
if (data.length > MAX_PREVIEW_ROWS + 1) {
data = data.slice(0, MAX_PREVIEW_ROWS + 1); // +1 保留表头
truncated = true;
}
return { sheetName, data };
});
setCsvTruncated(truncated);
setExcelData(sheets);
setLoading(false);
return;
}
const arrayBuffer = await fetchFileBuffer(fileUrl);
const workbook = XLSX.read(arrayBuffer, { type: 'array' });
const sheets = workbook.SheetNames.map(sheetName => {
const sheets = workbook.SheetNames.map((sheetName: string) => {
const worksheet = workbook.Sheets[sheetName];
const data = XLSX.utils.sheet_to_json(worksheet, { header: 1 }) as any[][];
return { sheetName, data };
@@ -522,9 +579,14 @@ const DocumentPreview: FC<DocumentPreviewProps> = ({
)
)}
{/* Excel 预览 */}
{/* Excel/CSV 预览 */}
{isExcelFile() && !error && !loading && (
<div className="rb:w-full rb:flex-1 rb:overflow-auto rb:bg-white rb:p-4 rb:rounded rb:border rb:border-gray-200">
{csvTruncated && (
<div className="rb:mb-3 rb:px-3 rb:py-2 rb:bg-yellow-50 rb:border rb:border-yellow-200 rb:rounded rb:text-sm rb:text-yellow-700">
{MAX_PREVIEW_ROWS}
</div>
)}
{excelData.map((sheet, index) => (
<div key={index} className="rb:mb-6">
<h3 className="rb:text-lg rb:font-semibold rb:mb-3">{sheet.sheetName}</h3>
@@ -541,6 +603,7 @@ const DocumentPreview: FC<DocumentPreviewProps> = ({
scroll={{ x: 'max-content' }}
size="small"
bordered
virtual
/>
)}
</div>

View File

@@ -460,6 +460,7 @@ export const en = {
nameInvalid: 'Name cannot start or end with a space',
notAllSpaces: 'Cannot be all spaces',
view: 'View',
callbackUrlInvalid: 'Please enter a valid URL',
},
model: {
searchPlaceholder: 'search model…',
@@ -1773,8 +1774,6 @@ Memory Bear: After the rebellion, regional warlordism intensified for several re
memoryConversationAnalysisEmpty: 'No conversation analysis available.',
memoryConversationAnalysisEmptySubTitle: 'Conversation analysis will appear here.',
communities: 'Cluster Communities',
summaries: 'Memory Summaries',
uploadFile: 'Upload File',
fileType: 'File Type',
image: 'Image',

View File

@@ -1093,6 +1093,7 @@ export const zh = {
nameInvalid: '不能是空格开头或结尾',
notAllSpaces: '不能是纯空格',
view: '查看',
callbackUrlInvalid: '请输入有效的 URL',
},
model: {
searchPlaceholder: '搜索模型…',
@@ -1769,8 +1770,6 @@ export const zh = {
memoryConversationAnalysisEmpty: '目前没有可用的对话分析内容',
memoryConversationAnalysisEmptySubTitle: '输入您的用户ID后点击"测试记忆"查看对话记忆',
communities: '聚类社区',
summaries: '记忆摘要',
uploadFile: '上传文件',
fileType: '文件类型',
image: '图片',

View File

@@ -183,7 +183,7 @@ const TestChat: FC<TestChatProps> = ({
const handleSend = () => {
if (loading || !application || !message || !message?.trim()) return
const files = toolbarRef.current?.getFiles() || []
const files = (toolbarRef.current?.getFiles() || []).filter(item => !['uploading', 'error'].includes(item.status))
const variables = toolbarRef.current?.getVariables() || []
const { isCanSend, params } = buildVariableParams(variables)
if (!isCanSend) return
@@ -235,7 +235,7 @@ const TestChat: FC<TestChatProps> = ({
const handleWorkflowSend = () => {
if (loading || !application || !message || !message?.trim()) return
const files = toolbarRef.current?.getFiles() || []
const files = (toolbarRef.current?.getFiles() || []).filter(item => !['uploading', 'error'].includes(item.status))
const variables = toolbarRef.current?.getVariables() || []
const { isCanSend, params } = buildVariableParams(variables)
if (!isCanSend) return

View File

@@ -191,7 +191,7 @@ const Chat: FC<ChatProps> = ({
.then(() => {
const message = msg
if (!message?.trim()) return
const files = toolbarRef.current?.getFiles() || []
const files = (toolbarRef.current?.getFiles() || []).filter(item => !['uploading', 'error'].includes(item.status))
// Validate required variables before sending
let isCanSend = true
const params: Record<string, any> = {}
@@ -352,7 +352,7 @@ const Chat: FC<ChatProps> = ({
.then(() => {
const message = msg
if (!message || message.trim() === '') return
const files = toolbarRef.current?.getFiles() || []
const files = (toolbarRef.current?.getFiles() || []).filter(item => !['uploading', 'error'].includes(item.status))
addUserMessage(message, files)
setMessage(undefined)
toolbarRef.current?.setFiles([])

View File

@@ -24,7 +24,7 @@ interface FeaturesConfigModalProps {
refresh: (value: FeaturesConfigForm) => void;
source?: Application['type'];
}
const max_file_count = 1;
/**
* Modal for configuring application features
*/
@@ -133,7 +133,7 @@ const FeaturesConfigModal = forwardRef<FeaturesConfigModalRef, FeaturesConfigMod
</div>
<div>
<div className="rb:text-[12px] rb:text-[#5B6167] rb:py-1">{t('application.maxCount')}</div>
{fu.max_file_count} {t('application.unix')}
{max_file_count} {t('application.unix')}
</div>
</Flex>
<Button block onClick={handleOpenSettings}>{t('application.setting')}</Button>

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-03-05
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-19 15:18:20
* @Last Modified time: 2026-03-19 20:19:14
*/
import { forwardRef, useImperativeHandle, useState } from 'react';
import { Form, InputNumber, Flex, Switch, Row, Col, Radio } from 'antd';
@@ -82,28 +82,27 @@ const defaultValues: FileUpload = {
"mp3",
"wav",
"m4a",
"ogg",
"flac"
],
document_enabled: false,
document_max_size_mb: 100,
document_allowed_extensions: [
"pdf",
"docx",
"doc",
"xlsx",
"xls",
"txt",
"csv",
"json"
"json",
"md",
],
video_enabled: false,
video_max_size_mb: 100,
video_allowed_extensions: [
"mp4",
"mov",
"avi",
"webm"
],
max_file_count: 5,
max_file_count: 1,
allowed_transfer_methods: 'both'
}
@@ -168,8 +167,8 @@ const FileUploadSettingModal = forwardRef<FileUploadSettingModalRef, FileUploadS
</Radio.Group>
</Form.Item>
<div className="rb:text-[12px] rb:text-[#5B6167] rb:mb-1">{t('application.maxCount')}</div>
<Form.Item label={t('application.maxCount')} name="max_file_count">
{/* <div className="rb:text-[12px] rb:text-[#5B6167] rb:mb-1">{t('application.maxCount')}</div> */}
<Form.Item label={t('application.maxCount')} name="max_file_count" hidden>
<InputNumber min={1} max={20} precision={0} className="rb:w-full!" placeholder={t('common.pleaseEnter')} />
</Form.Item>

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-06 21:09:42
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-18 20:32:54
* @Last Modified time: 2026-03-19 18:38:41
*/
/**
* File Upload Component
@@ -23,7 +23,7 @@
import { useState, useEffect, forwardRef, useImperativeHandle, useMemo } from 'react';
import { Upload, Progress, App } from 'antd';
import type { UploadProps, UploadFile } from 'antd';
import type { UploadProps as RcUploadProps } from 'antd/es/upload/interface';
import type { UploadProps as RcUploadProps, RcFile, UploadFileStatus } from 'antd/es/upload/interface';
import { useTranslation } from 'react-i18next';
import { request } from '@/utils/request'
@@ -221,17 +221,29 @@ const UploadFiles = forwardRef<UploadFilesRef, UploadFilesProps>(({
*/
const handleCustomRequest: RcUploadProps['customRequest'] = async (options) => {
const { file, onSuccess, onError } = options;
try {
const formData = new FormData();
formData.append('file', file);
const response = await request.uploadFile(action, formData, requestConfig);
onSuccess?.({data: response});
} catch (error) {
onError?.(error as Error);
if (typeof file === 'string') return;
const rcFile = file as RcFile;
const formData = new FormData();
formData.append('file', rcFile);
const fileVo: UploadFile = {
uid: rcFile.uid,
name: rcFile.name,
status: 'uploading' as UploadFileStatus,
percent: 0,
type: rcFile.type,
originFileObj: rcFile,
thumbUrl: URL.createObjectURL(rcFile)
}
onChange?.(fileVo)
request.uploadFile(action, formData, requestConfig)
.then(res => {
onSuccess?.({ data: res });
})
.catch((error) => {
onError?.(error as Error);
fileVo.status = 'error'
onChange?.(fileVo)
})
};
/**

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-06 21:09:47
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-18 21:10:01
* @Last Modified time: 2026-03-19 20:32:32
*/
/**
* Upload File List Modal Component
@@ -19,7 +19,10 @@
* @component
*/
import { forwardRef, useImperativeHandle, useState, useMemo } from 'react';
import { Form, Input, Select, Button, Flex } from 'antd';
import { Form, Input, Select,
// Button,
Flex
} from 'antd';
import { useTranslation } from 'react-i18next';
import type { UploadFileListModalRef } from '../types'
@@ -105,9 +108,11 @@ const UploadFileListModal = forwardRef<UploadFileListModalRef, UploadFileListMod
onOk={handleSave}
confirmLoading={loading}
>
<Form form={form} layout="vertical">
<Form form={form} layout="vertical" initialValues={{ files: [{ type: undefined, url: undefined }] }}>
<Form.List name="files">
{(fields, { add, remove }) => (
{(fields,
// { add, remove }
) => (
<>
{/* Render each file entry with type selector and URL input */}
{fields.map(({ key, name, ...restField }) => (
@@ -116,6 +121,9 @@ const UploadFileListModal = forwardRef<UploadFileListModalRef, UploadFileListMod
{...restField}
name={[name, 'type']}
className="rb:mb-0!"
rules={[
{ required: true, message: t('common.pleaseSelect') }
]}
>
<Select
placeholder={t('memoryConversation.fileType')}
@@ -126,22 +134,25 @@ const UploadFileListModal = forwardRef<UploadFileListModalRef, UploadFileListMod
<FormItem
{...restField}
name={[name, 'url']}
rules={[{ required: true, message: t('common.pleaseEnter') }]}
rules={[
{ required: true, message: t('common.pleaseEnter') },
{ type: 'url', message: t('common.callbackUrlInvalid') },
]}
className="rb:mb-0! rb:flex-1!"
>
<Input placeholder={t('memoryConversation.fileUrl')} />
</FormItem>
<div
{/* <div
className="rb:w-5 rb:h-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/delete.svg')] rb:hover:bg-[url('@/assets/images/delete_hover.svg')]"
onClick={() => remove(name)}
></div>
></div> */}
</Flex>
))}
<Form.Item noStyle>
{/* <Form.Item noStyle>
<Button type="dashed" onClick={() => add()} block>
+ {t('common.add')}
</Button>
</Form.Item>
</Form.Item> */}
</>
)}
</Form.List>

View File

@@ -200,7 +200,7 @@ const Conversation: FC = () => {
/** Send message and handle streaming response */
const handleSend = () => {
if (!token || !shareToken) return
const files = toolbarRef.current?.getFiles() || []
const files = (toolbarRef.current?.getFiles() || []).filter(item => !['uploading', 'error'].includes(item.status))
const variables = toolbarRef.current?.getVariables() || []
let isCanSend = true
const params: Record<string, any> = {}

View File

@@ -11,7 +11,7 @@ import { useNavigate, useParams, useLocation } from 'react-router-dom';
import { useTranslation } from 'react-i18next';
import { useBreadcrumbManager, type BreadcrumbPath } from '@/hooks/useBreadcrumbManager';
import { Button, Spin, message, Switch } from 'antd';
import { getDocumentDetail, getDocumentChunkList, downloadFile, updateDocument, updateDocumentChunk, createDocumentChunk } from '@/api/knowledgeBase';
import { getDocumentDetail, getDocumentChunkList, downloadFile, updateDocument, updateDocumentChunk, createDocumentChunk, getFileUrl } from '@/api/knowledgeBase';
import type { KnowledgeBaseDocumentData, RecallTestData } from '@/views/KnowledgeBase/types';
import { formatDateTime } from '@/utils/format';
import InfoPanel, { type InfoItem } from '../components/InfoPanel';
@@ -138,7 +138,7 @@ const DocumentDetails: FC = () => {
const response = await getDocumentDetail(documentId);
setDocument(response);
setInfoItems(formatDocumentInfo(response));
const url = `${imagePath}/api/files/${response.file_id}`
const url = `${window.location.origin}/api/files/${response.file_id}`;
setFileUrl(url);
setParserMode(response?.parser_config?.auto_questions || 0)
// ChunkList will be called automatically in useEffect based on document.progress

View File

@@ -1,8 +1,8 @@
/*
* @Author: ZhaoYing
* @Date: 2026-02-03 17:09:03
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-17 16:21:47
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-02-03 17:09:03
*/
/**
* Memory Conversation Page
@@ -92,7 +92,7 @@ export interface LogItem {
type: string;
title: string;
data?: DataItem[] | AnyObject;
raw_results?: string | Record<string, AnyObject>;
raw_results?: string | AnyObject;
summary?: string;
query?: string;
reason?: string;
@@ -264,25 +264,22 @@ const MemoryConversation: FC = () => {
</ContentWrapper>
))}
</Space>
: log.type === 'search_result' && log.raw_results && typeof log.raw_results !== 'string'
: log.type === 'search_result' && log.raw_results
? <ContentWrapper>
<div className="rb:font-medium rb:text-[#212332] rb:mb-2">{log.query}</div>
{(log.raw_results.reranked_results as AnyObject)?.communities?.length > 0 && <>
<div className="rb:font-medium rb:text-[#212332] rb:text-[12px]">{t('memoryConversation.communities')}</div>
<ul className='rb:mt-2 rb:text-[12px] rb:text-[#5B6167] rb:list-disc rb:pl-4'>
{((log.raw_results.reranked_results as AnyObject)?.communities as { content: string }[]).map((item, index: number) => (
<li key={index}>{item.content}</li>
))}
</ul>
</>}
{(log.raw_results.reranked_results as AnyObject)?.summaries?.length > 0 && <>
<div className="rb:font-medium rb:text-[#212332] rb:text-[12px]">{t('memoryConversation.summaries')}</div>
<ul className='rb:mt-2 rb:text-[12px] rb:text-[#5B6167] rb:list-disc rb:pl-4'>
{((log.raw_results.reranked_results as AnyObject)?.summaries as { content: string }[]).map((item, index: number) => (
<li key={index}>{item.content}</li>
))}
</ul>
</>}
<div className='rb:mt-2 rb:text-[12px] rb:text-[#5B6167]'>
{typeof log.raw_results === 'string'
? <Markdown content={log.raw_results} />
: <>
{log.raw_results.reranked_results?.statements.length > 0 && log.raw_results.reranked_results?.statements.map((item: { statement: string }, index: number) => (
<div key={index}>{item.statement}</div>
))}
{log.raw_results.reranked_results?.summaries.length > 0 && log.raw_results.reranked_results?.summaries.map((item: { content: string }, index: number) => (
<div key={index}>{item.content}</div>
))}
</>
}
</div>
</ContentWrapper>
: log.type === 'retrieval_summary' && log.summary
? <ContentWrapper><div className="rb:text-[12px] rb:text-[#5B6167]">{log.summary}</div></ContentWrapper>
@@ -299,22 +296,22 @@ const MemoryConversation: FC = () => {
</ContentWrapper>
: log.type === 'input_summary' && log.raw_results
? <ContentWrapper>
<div className="rb:font-medium rb:text-[#212332] rb:mb-2">{log.query}</div>
<div className="rb:font-medium rb:text-[12px] rb:text-[#5B6167] rb:mb-2">{log.summary}</div>
<div className='rb:mt-2 rb:text-[12px] rb:text-[#5B6167]'>
{typeof log.raw_results === 'string'
? <Markdown content={log.raw_results} />
: <>
{(log.raw_results.reranked_results as AnyObject)?.statements?.length > 0 && ((log.raw_results.reranked_results as AnyObject)?.statements as { statement: string }[]).map((item, index: number) => (
<div key={index}>{item.statement}</div>
))}
{(log.raw_results.reranked_results as AnyObject)?.summaries?.length > 0 && ((log.raw_results.reranked_results as AnyObject)?.summaries as { content: string }[]).map((item, index: number) => (
<div key={index}>{item.content}</div>
))}
</>
}
</div>
</ContentWrapper>
<div className="rb:font-medium rb:text-[#212332] rb:mb-2">{log.query}</div>
<div className="rb:font-medium rb:text-[12px] rb:text-[#5B6167] rb:mb-2">{log.summary}</div>
<div className='rb:mt-2 rb:text-[12px] rb:text-[#5B6167]'>
{typeof log.raw_results === 'string'
? <Markdown content={log.raw_results} />
: <>
{log.raw_results.reranked_results?.statements.length > 0 && log.raw_results.reranked_results?.statements.map((item: { statement: string; } , index: number) => (
<div key={index}>{item.statement}</div>
))}
{log.raw_results.reranked_results?.summaries.length > 0 && log.raw_results.reranked_results?.summaries.map((item: { content: string; }, index: number) => (
<div key={index}>{item.content}</div>
))}
</>
}
</div>
</ContentWrapper>
: null
}
</div>

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-03 18:32:00
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-13 14:51:17
* @Last Modified time: 2026-03-19 20:23:42
*/
/**
* Relationship Network Component
@@ -287,22 +287,26 @@ const RelationshipNetwork:FC = () => {
: (selectedNode as RawCommunityNode).properties.community_id
? <div className="rb:p-3 rb:pt-0">
<div className="rb:font-medium rb:text-[#212332] rb:text-[16px] rb:leading-5.5 rb:pl-1">
{(selectedNode as RawCommunityNode).properties.name}
</div>
<div className="rb:mt-3 rb:font-medium rb:leading-5 rb:pl-1">{t('userMemory.summary')}</div>
<div className="rb:bg-[#F6F6F6] rb:rounded-xl rb:px-3 rb:py-2.5 rb:mt-2">
{(selectedNode as RawCommunityNode).properties.summary}
{(selectedNode as RawCommunityNode).properties.name || selectedNode.id}
</div>
{(selectedNode as RawCommunityNode).properties.summary && <>
<div className="rb:mt-3 rb:font-medium rb:leading-5 rb:pl-1">{t('userMemory.summary')}</div>
<div className="rb:bg-[#F6F6F6] rb:rounded-xl rb:px-3 rb:py-2.5 rb:mt-2">
{(selectedNode as RawCommunityNode).properties.summary}
</div>
</>}
<Flex align="center" justify="space-between" className="rb:mt-5!">
<span className="rb:text-[#5B6167] rb:font-regular rb:pl-1">{t('userMemory.member_count')}</span>
<span className="rb:font-medium">{(selectedNode as RawCommunityNode).properties.member_count}{t('userMemory.member_count_desc')}</span>
</Flex>
<Divider className='rb:my-2.5!' />
<div className="rb:font-medium rb:leading-5 rb:pl-1">{t('userMemory.core_entities')}</div>
<ul className="rb:list-disc rb:pl-4 rb:text-[#5B6167] rb:mt-2">
{(selectedNode as RawCommunityNode).properties.core_entities.map((entity, index) => <li key={index}>{entity}</li>)}
</ul>
{(selectedNode as RawCommunityNode).properties.core_entities && <>
<Divider className='rb:my-2.5!' />
<div className="rb:font-medium rb:leading-5 rb:pl-1">{t('userMemory.core_entities')}</div>
<ul className="rb:list-disc rb:pl-4 rb:text-[#5B6167] rb:mt-2">
{(selectedNode as RawCommunityNode).properties.core_entities?.map((entity, index) => <li key={index}>{entity}</li>)}
</ul>
</>}
</div>
: <>
{(selectedNode as Node).name && (

View File

@@ -1,8 +1,10 @@
import { type FC, useEffect, useState, useMemo } from 'react'
import { type FC, useEffect, useState, useMemo, useRef } from 'react'
import clsx from 'clsx'
import { useTranslation } from 'react-i18next'
import { useParams } from 'react-router-dom'
import { Row, Col, Skeleton, Button, Divider, Tooltip } from 'antd'
import InfiniteScroll from 'react-infinite-scroll-component'
import RbCard from '@/components/RbCard/Card'
import {
getConversations,
@@ -34,6 +36,8 @@ const WorkingDetail: FC = () => {
const { id } = useParams()
const [loading, setLoading] = useState<boolean>(false)
const [data, setData] = useState<Conversation[]>([])
const [hasMore, setHasMore] = useState<boolean>(true)
const pageRef = useRef<number>(1)
const [messagesLoading, setMessagesLoading] = useState<boolean>(false)
const [messages, setMessages] = useState<ChatItem[]>([])
const [detailLoading, setDetailLoading] = useState<boolean>(false)
@@ -51,16 +55,30 @@ const WorkingDetail: FC = () => {
setSelected(null)
setDetail(null)
setData([])
getConversations(id).then((res) => {
const response = res as Conversation[]
setData(response)
setSelected(response[0] || null)
setHasMore(true)
pageRef.current = 1
getConversations(id, 1).then((res) => {
const response = res as { items: Conversation[], page: { hasnext: boolean } }
setData(response.items)
setSelected(response.items[0] || null)
setHasMore(response.page.hasnext)
})
.finally(() => {
setLoading(false)
})
}
/**
 * Request the page after the one tracked in `pageRef` and append its
 * conversations to the list already held in state.
 *
 * Does nothing when no `id` is available. The page counter and the
 * `hasMore` flag are only updated once the request has resolved.
 */
const loadMore = () => {
  type ConversationPage = { items: Conversation[], page: { hasnext: boolean } }

  if (id) {
    const targetPage = pageRef.current + 1
    getConversations(id, targetPage).then((res) => {
      const result = res as ConversationPage
      // Keep the items already shown and add the newly fetched ones after them
      setData(existing => [...existing, ...result.items])
      // Advance the counter only after a successful response
      pageRef.current = targetPage
      setHasMore(result.page.hasnext)
    })
  }
}
useEffect(() => {
if (!id || !selected || !selected.id) return
getDetail(selected.id)
@@ -103,22 +121,30 @@ const WorkingDetail: FC = () => {
: data.length === 0
? <Empty />
:(
<Row gutter={16} className="rb:h-full">
<Row gutter={16}>
<Col span={5}>
<div className="rb:h-full! rb:border-r rb:border-[#EAECEE] rb:py-3 rb:px-4">
{data.map(item => (
<div key={item.id} className="rb:mb-3">
<Tooltip title={item.title}>
<div className={clsx("rb:p-[8px_13px] rb:rounded-lg rb:leading-5 rb:cursor-pointer rb:hover:bg-[#F0F3F8] rb:text-ellipsis rb:overflow-hidden rb:whitespace-nowrap", {
'rb:bg-[#FFFFFF] rb:shadow-[0px_2px_4px_0px_rgba(0,0,0,0.15)] rb:font-medium rb:hover:bg-[#FFFFFF]!': item.id === selected?.id,
})}
onClick={() => setSelected(item)}
>
{item.title}
</div>
</Tooltip>
</div>
))}
<div id="conversation-list" className="rb:h-[calc(100vh-76px)]! rb:border-r rb:border-[#EAECEE] rb:py-3 rb:px-4 rb:overflow-y-auto">
<InfiniteScroll
dataLength={data.length}
next={loadMore}
hasMore={hasMore}
loader={null}
scrollableTarget="conversation-list"
>
{data.map(item => (
<div key={item.id} className="rb:mb-3">
<Tooltip title={item.title}>
<div className={clsx("rb:p-[8px_13px] rb:rounded-lg rb:leading-5 rb:cursor-pointer rb:hover:bg-[#F0F3F8] rb:text-ellipsis rb:overflow-hidden rb:whitespace-nowrap", {
'rb:bg-[#FFFFFF] rb:shadow-[0px_2px_4px_0px_rgba(0,0,0,0.15)] rb:font-medium rb:hover:bg-[#FFFFFF]!': item.id === selected?.id,
})}
onClick={() => setSelected(item)}
>
{item.title}
</div>
</Tooltip>
</div>
))}
</InfiniteScroll>
</div>
</Col>
{selected && <>

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-06 21:10:56
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-18 20:46:35
* @Last Modified time: 2026-03-19 18:41:07
*/
/**
* Workflow Chat Component
@@ -151,7 +151,7 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
setLoading(true)
const message = msg
const files = toolbarRef.current?.getFiles() || []
const files = (toolbarRef.current?.getFiles() || []).filter(item => !['uploading', 'error'].includes(item.status))
setChatList(prev => [...prev, {
role: 'user',
content: message,

View File

@@ -18,8 +18,8 @@ const InitialValuePlugin: React.FC<InitialValuePluginProps> = ({ value, options
const isUserInputRef = useRef(false);
useEffect(() => {
// Listen for editor updates and mark whether the change came from user input
const removeListener = editor.registerUpdateListener(({ editorState }) => {
const removeListener = editor.registerUpdateListener(({ editorState, tags }) => {
if (tags.has('programmatic')) return;
editorState.read(() => {
const root = $getRoot();
const textContent = root.getTextContent();
@@ -107,7 +107,7 @@ const InitialValuePlugin: React.FC<InitialValuePluginProps> = ({ value, options
});
root.append(paragraph);
}
}, { discrete: true });
}, { discrete: true, tag: 'programmatic' });
});
}