From 71d9ae15a1b668513a060518b610f6c24c7cc85f Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 12 Mar 2026 13:53:47 +0800 Subject: [PATCH 01/42] [changes] Remove the non-existent "storage_type" --- api/app/repositories/end_user_repository.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/app/repositories/end_user_repository.py b/api/app/repositories/end_user_repository.py index 0b828a8b..61faf6d4 100644 --- a/api/app/repositories/end_user_repository.py +++ b/api/app/repositories/end_user_repository.py @@ -247,7 +247,6 @@ class EndUserRepository: EndUser.user_summary: user_summary, EndUser.rag_tags: rag_tags, EndUser.rag_personas: rag_personas, - EndUser.storage_type: "rag", EndUser.rag_summary_updated_at: datetime.datetime.now(), }, synchronize_session=False @@ -286,7 +285,6 @@ class EndUserRepository: .update( { EndUser.memory_insight: memory_insight, - EndUser.storage_type: "rag", EndUser.memory_insight_updated_at: datetime.datetime.now(), }, synchronize_session=False From d7911244fc6bbeb80df414a7462558e0b38d19b5 Mon Sep 17 00:00:00 2001 From: zhaoying Date: Thu, 12 Mar 2026 17:00:30 +0800 Subject: [PATCH 02/42] fix(web): update i18n --- web/src/i18n/en.ts | 2 +- web/src/i18n/zh.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/web/src/i18n/en.ts b/web/src/i18n/en.ts index c93500f6..af2a3926 100644 --- a/web/src/i18n/en.ts +++ b/web/src/i18n/en.ts @@ -1370,7 +1370,7 @@ export const en = { gotoList: 'Return to Application List', gotoDetail: 'View Details', dify: 'Dify', - pleaseUploadFile: 'Please upload workflow file', + pleaseUploadFile: 'Please upload file', }, userMemory: { userMemory: 'User Memory', diff --git a/web/src/i18n/zh.ts b/web/src/i18n/zh.ts index 37d70c2c..97c37771 100644 --- a/web/src/i18n/zh.ts +++ b/web/src/i18n/zh.ts @@ -754,7 +754,7 @@ export const zh = { gotoList: '返回应用列表', gotoDetail: '查看详情', dify: 'Dify', - pleaseUploadFile: '请上传工作流文件', + pleaseUploadFile: '请上传文件', }, table: { totalRecords: '共 {{total}} 条记录' From 2961ea4e44ccd41b6209b82cc7c40c5c7db356fc Mon Sep 17 00:00:00 2001 From: zhaoying Date: Thu, 12 Mar 2026 17:20:16 +0800 Subject: [PATCH 03/42] fix(web): upload cancel add refresh --- .../views/ApplicationManagement/components/UploadModal.tsx | 4 ++-- .../ApplicationManagement/components/UploadWorkflowModal.tsx | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/web/src/views/ApplicationManagement/components/UploadModal.tsx b/web/src/views/ApplicationManagement/components/UploadModal.tsx index f354c145..a7acc093 100644 --- a/web/src/views/ApplicationManagement/components/UploadModal.tsx +++ b/web/src/views/ApplicationManagement/components/UploadModal.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-28 14:08:14 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-03-06 12:05:46 + * @Last Modified time: 2026-03-12 17:19:46 */ /** * UploadModal Component @@ -63,6 +63,7 @@ const UploadModal = forwardRef(({ * Resets all states and form fields */ const handleClose = () => { + refresh() setVisible(false); form.resetFields(); setCurrent(0); @@ -211,7 +212,6 @@ const UploadModal = forwardRef(({ fileSize={100} maxCount={1} fileType={['yml']} - draggerHeight={200} /> diff --git a/web/src/views/ApplicationManagement/components/UploadWorkflowModal.tsx b/web/src/views/ApplicationManagement/components/UploadWorkflowModal.tsx index e1353843..a4adf0c5 100644 --- a/web/src/views/ApplicationManagement/components/UploadWorkflowModal.tsx +++ b/web/src/views/ApplicationManagement/components/UploadWorkflowModal.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-28 14:08:14 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-03-06 12:05:46 + * @Last Modified time: 2026-03-12 17:19:33 */ /** * UploadWorkflowModal Component @@ -72,6 +72,7 @@ const UploadWorkflowModal = forwardRef Date: Thu, 12 Mar 2026 17:36:42 +0800 Subject: [PATCH 04/42] fix(app): Bug fixes for application import and export --- api/app/services/app_dsl_service.py | 30 ++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/api/app/services/app_dsl_service.py b/api/app/services/app_dsl_service.py index fc071177..0c778b81 100644 --- a/api/app/services/app_dsl_service.py +++ b/api/app/services/app_dsl_service.py @@ -18,6 +18,7 @@ from app.models.tool_model import ToolConfig as ToolConfigModel from app.models.workflow_model import WorkflowConfig from app.services.workflow_service import WorkflowService from app.core.workflow.adapters.memory_bear.memory_bear_adapter import MemoryBearAdapter +from app.models.memory_config_model import MemoryConfig as MemoryConfigModel class AppDslService: @@ -220,7 +221,7 @@ class AppDslService: id=uuid.uuid4(), workspace_id=workspace_id, created_by=user_id, - name=app_meta.get("name", "导入应用"), + name=self._unique_app_name(app_meta.get("name", "导入应用"), workspace_id, app_type), description=app_meta.get("description"), icon=app_meta.get("icon"), icon_type=app_meta.get("icon_type"), @@ -296,6 +297,19 @@ class AppDslService: self.db.refresh(new_app) return new_app, warnings + def _unique_app_name(self, name: str, workspace_id: uuid.UUID, app_type: AppType) -> str: + existing = {r[0] for r in self.db.query(App.name).filter( + App.workspace_id == workspace_id, + App.type == app_type, + App.is_active.is_(True) + ).all()} + if name not in existing: + return name + counter = 1 + while f"{name}({counter})" in existing: + counter += 1 + return f"{name}({counter})" + def _resolve_model(self, ref: Optional[dict], tenant_id: uuid.UUID, warnings: list) -> Optional[uuid.UUID]: if not ref: return None @@ -398,9 +412,19 @@ class AppDslService: config_id = memory.get("memory_config_id") or memory.get("memory_content") if not config_id: return memory - from app.models.memory_config_model import MemoryConfig as MemoryConfigModel + try: + config_uuid = uuid.UUID(str(config_id)) + except (ValueError, AttributeError): + exists = self.db.query(MemoryConfigModel).filter( + MemoryConfigModel.config_id_old == int(config_id), + MemoryConfigModel.workspace_id == workspace_id + ).first() + if not exists: + warnings.append(f"记忆配置 '{config_id}' 未匹配,已置空,请导入后手动配置") + return {**memory, "memory_config_id": None, "enabled": False} + return memory exists = self.db.query(MemoryConfigModel).filter( - MemoryConfigModel.config_id == config_id, + MemoryConfigModel.config_id == config_uuid, MemoryConfigModel.workspace_id == workspace_id ).first() if not exists: From 110de0afbc08ae542e0b4fb37359e8470ae6f09a Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 12 Mar 2026 18:35:09 +0800 Subject: [PATCH 05/42] [add] RAG storage displays the page effect --- .../memory_dashboard_controller.py | 7 +- api/app/services/memory_dashboard_service.py | 99 +++++++++++-------- 2 files changed, 63 insertions(+), 43 deletions(-) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 3bbb5cf7..582b759f 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -403,14 +403,15 @@ def get_current_user_rag_total_num( @router.get("/rag_content", response_model=ApiResponse) def get_rag_content( end_user_id: str = Query(..., description="宿主ID"), - limit: int = Query(15, description="返回记录数"), + page: int = Query(1, gt=0, description="页码,从1开始"), + pagesize: int = Query(15, gt=0, le=100, description="每页返回记录数"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ - 获取当前宿主知识库中的chunk内容 + 获取当前宿主知识库中的chunk内容(分页) """ - data = memory_dashboard_service.get_rag_content(end_user_id, limit, db, current_user) + data = memory_dashboard_service.get_rag_content(end_user_id, page, pagesize, db, current_user) return success(data=data, msg="宿主RAGchunk数据获取成功") diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index db49c50a..a3859fe5 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -535,7 +535,8 @@ def get_users_total_chunk_batch( def get_rag_content( end_user_id: str, - limit: int, + page: int, + pagesize: int, db: Session, current_user: User ) -> dict: @@ -543,9 +544,9 @@ def get_rag_content( 先在documents表中查询file_name=='end_user_id'+'.txt'的id和kb_id, 然后调用/chunks/{kb_id}/{document_id}/chunks接口的相关代码获取所有内容, 接着对获取的内容进行提取,只要page_content的内容, - 最后返回数据 + 最后返回分页数据 """ - business_logger.info(f"获取RAG内容: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}") + business_logger.info(f"获取RAG内容: end_user_id={end_user_id}, page={page}, pagesize={pagesize}, 操作者: {current_user.username}") try: from app.models.document_model import Document @@ -562,63 +563,76 @@ def get_rag_content( if not documents: business_logger.warning(f"未找到文件: {file_name}") return { - "total": 0, - "contents": [] + "page": { + "page": page, + "pagesize": pagesize, + "total": 0, + "hasnext": False, + }, + "items": [] } business_logger.info(f"找到 {len(documents)} 个文档记录") - # 3. 获取所有chunks的page_content - all_contents = [] - total_chunks = 0 + # 3. 按全局偏移量计算当前页数据 + # 全局偏移范围:[offset_start, offset_end) + offset_start = (page - 1) * pagesize + offset_end = offset_start + pagesize + + global_total = 0 # 所有文档的 chunk 总数 + page_contents = [] # 当前页的内容 for document in documents: try: - # 获取知识库信息 kb = knowledge_repository.get_knowledge_by_id(db, document.kb_id) if not kb: business_logger.warning(f"知识库不存在: kb_id={document.kb_id}") continue - # 初始化向量服务 vector_service = ElasticSearchVectorFactory().init_vector(knowledge=kb) - # 获取该文档的所有chunks(分页获取) - page = 1 - pagesize = 100 # 每页100条 + # 先用 pagesize=1 获取该文档的 chunk 总数 + doc_total, _ = vector_service.search_by_segment( + document_id=str(document.id), + query=None, + pagesize=1, + page=1, + asc=True + ) - while True: - total, items = vector_service.search_by_segment( + doc_offset_start = global_total # 该文档在全局中的起始偏移 + doc_offset_end = global_total + doc_total # 该文档在全局中的结束偏移 + global_total += doc_total + + # 当前页与该文档无交集,跳过 + if doc_offset_end <= offset_start or doc_offset_start >= offset_end: + continue + + # 计算需要从该文档取的局部范围 + local_start = max(offset_start - doc_offset_start, 0) + local_end = min(offset_end - doc_offset_start, doc_total) + need_count = local_end - local_start + + # 换算成 ES 分页参数(ES page 从1开始) + es_page = (local_start // pagesize) + 1 + es_offset_in_page = local_start % pagesize + + fetched = [] + while len(fetched) < es_offset_in_page + need_count: + _, items = vector_service.search_by_segment( document_id=str(document.id), query=None, pagesize=pagesize, - page=page, + page=es_page, asc=True ) - if not items: break - - # 提取page_content - for item in items: - all_contents.append(item.page_content) - total_chunks += 1 - - # # 如果达到limit限制,直接返回 - # if limit > 0 and total_chunks >= limit: - # business_logger.info(f"已达到limit限制: {limit}") - # return { - # "total": total_chunks, - # "contents": all_contents[:limit] - # } - - # 检查是否还有下一页 - if page * pagesize >= total: - break - - page += 1 + fetched.extend(items) + es_page += 1 - business_logger.info(f"文档 {document.id} 获取了 {len(items)} 个chunks") + slice_items = fetched[es_offset_in_page: es_offset_in_page + need_count] + page_contents.extend([item.page_content for item in slice_items]) except Exception as e: business_logger.error(f"获取文档 {document.id} 的chunks失败: {str(e)}") @@ -626,11 +640,16 @@ def get_rag_content( # 4. 返回结果 result = { - "total": total_chunks, - "contents": all_contents[:limit] if limit > 0 else all_contents + "page": { + "page": page, + "pagesize": pagesize, + "total": global_total, + "hasnext": offset_end < global_total, + }, + "items": page_contents } - business_logger.info(f"成功获取RAG内容: total={total_chunks}, 返回={len(result['contents'])} 条") + business_logger.info(f"成功获取RAG内容: total={global_total}, page={page}, 返回={len(page_contents)} 条") return result except Exception as e: From 75b87955dd2e71592ad200bca53ffe1fe68cfad8 Mon Sep 17 00:00:00 2001 From: zhaoying Date: Thu, 12 Mar 2026 18:36:49 +0800 Subject: [PATCH 06/42] feat(web): rag content add page --- web/src/api/memory.ts | 7 +- web/src/components/PageScrollList/index.tsx | 80 ++++++++++--------- web/src/views/UserMemoryDetail/Rag.tsx | 11 ++- .../components/ConversationMemory.tsx | 77 ++++++------------ 4 files changed, 75 insertions(+), 100 deletions(-) diff --git a/web/src/api/memory.ts b/web/src/api/memory.ts index 491f78ea..c40e3f30 100644 --- a/web/src/api/memory.ts +++ b/web/src/api/memory.ts @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-03 14:00:06 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-03-04 10:58:41 + * @Last Modified time: 2026-03-12 18:25:06 */ import { request } from '@/utils/request' import type { @@ -118,8 +118,9 @@ export const getChunkInsight = (end_user_id: string) => { return request.get(`/dashboard/chunk_insight`, { end_user_id }) } // RAG User Memory - Storage content -export const getRagContent = (end_user_id: string) => { - return request.get(`/dashboard/rag_content`, { end_user_id, limit: 20 }) +export const getRagContentUrl = '/dashboard/rag_content' +export const getRagContent = (end_user_id: string, page = 1, pagesize = 20) => { + return request.get(getRagContentUrl, { end_user_id, page, pagesize }) } // Emotion distribution analysis export const getWordCloud = (end_user_id: string) => { diff --git a/web/src/components/PageScrollList/index.tsx b/web/src/components/PageScrollList/index.tsx index a877a9c7..0a32dfdb 100644 --- a/web/src/components/PageScrollList/index.tsx +++ b/web/src/components/PageScrollList/index.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-02 15:18:19 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-02-03 15:44:42 + * @Last Modified time: 2026-03-12 18:36:19 */ /** * PageScrollList Component @@ -60,8 +60,8 @@ interface PageScrollListProps> { /** Infinite scroll list component with pagination support */ const PageScrollList = forwardRef(>({ - renderItem, - query, + renderItem, + query, url, column = 4, className = '', @@ -69,68 +69,70 @@ const PageScrollList = forwardRef(>({ }: PageScrollListProps, ref: React.Ref) => { /** Expose refresh method to parent component */ useImperativeHandle(ref, () => ({ - refresh, + refresh: () => { + pageRef.current = 1; + loadingRef.current = false; + setHasMore(true); + setData([]); + loadMoreData(true); + }, })); const [loading, setLoading] = useState(false); const [data, setData] = useState([]); - const [page, setPage] = useState(1); const [hasMore, setHasMore] = useState(true); const scrollRef = useRef(null); + const pageRef = useRef(1); + const loadingRef = useRef(false); + const hasMoreRef = useRef(true); /** Load more data from API with pagination */ - const loadMoreData = (flag?: boolean) => { - if (!flag && (loading || !hasMore)) { - return; - } + const loadMoreData = (reset?: boolean) => { + if (loadingRef.current || (!reset && !hasMoreRef.current)) return; + loadingRef.current = true; setLoading(true); + const currentPage = reset ? 1 : pageRef.current; request.get(url, { - page: page, + page: currentPage, pagesize: PAGE_SIZE, - ...(query||{}), + ...(query || {}), }) .then((res) => { const response = res as ApiResponse; const results = Array.isArray(response.items) ? response.items : Array.isArray(response) ? response as T[] : []; - // Replace data if flag is true, otherwise append - if (flag) { - setData(results); - } else { - setData(data.concat(results)); - } - setPage(response.page.page + 1); + pageRef.current = response.page.page + 1; + setData(prev => reset ? results : [...prev, ...results]); + hasMoreRef.current = response.page?.hasnext; setHasMore(response.page?.hasnext); - setLoading(false); - console.log(`${results.length} more items loaded!`); }) .catch(() => { - setLoading(false); + hasMoreRef.current = false; setHasMore(false); - console.error('Failed to load data'); }) .finally(() => { + loadingRef.current = false; setLoading(false); + // 内容不足以填满容器时,主动继续加载 + setTimeout(() => { + const el = scrollRef.current; + console.log(el, el?.scrollHeight, el?.clientHeight, hasMoreRef.current) + if (el && hasMoreRef.current && el.scrollHeight <= el.clientHeight) { + loadMoreData(); + } + }, 0); }); }; - /** Reset list to initial state and reload data */ - const refresh = () => { - setPage(1); + /** Reset and reload when query parameters change */ + const queryKey = JSON.stringify(query); + useEffect(() => { + pageRef.current = 1; + loadingRef.current = false; + hasMoreRef.current = true; setHasMore(true); setData([]); - } + loadMoreData(true); + }, [queryKey]); - /** Refresh when query parameters change */ - useEffect(() => { - refresh() - }, [query]); - - /** Load initial data when list is reset */ - useEffect(() => { - if (page === 1 && hasMore && data.length === 0) { - loadMoreData(true); - } - }, [page, hasMore, data]) - return ( <>
>({ > loadMoreData()} hasMore={hasMore} loader={loading && needLoading ? : false} // endMessage={It is all, nothing more 🤐} diff --git a/web/src/views/UserMemoryDetail/Rag.tsx b/web/src/views/UserMemoryDetail/Rag.tsx index 0f82ee05..3935bcc0 100644 --- a/web/src/views/UserMemoryDetail/Rag.tsx +++ b/web/src/views/UserMemoryDetail/Rag.tsx @@ -1,8 +1,8 @@ /* * @Author: ZhaoYing * @Date: 2026-02-03 17:57:11 - * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-02-03 17:57:11 + * @Last Modified by: ZhaoYing + * @Last Modified time: 2026-03-12 18:00:11 */ /** * RAG User Memory Detail View @@ -150,9 +150,12 @@ const Rag: FC = () => { }) } return ( - + - +
{name?.[0]}
diff --git a/web/src/views/UserMemoryDetail/components/ConversationMemory.tsx b/web/src/views/UserMemoryDetail/components/ConversationMemory.tsx index 928b340f..2f5b4ce6 100644 --- a/web/src/views/UserMemoryDetail/components/ConversationMemory.tsx +++ b/web/src/views/UserMemoryDetail/components/ConversationMemory.tsx @@ -1,74 +1,43 @@ /* * @Author: ZhaoYing * @Date: 2026-02-03 18:34:04 - * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-02-03 18:34:04 + * @Last Modified by: ZhaoYing + * @Last Modified time: 2026-03-12 18:34:52 */ -/** - * Conversation Memory Component - * Displays RAG conversation memory content list - */ - -import { type FC, useEffect, useState } from 'react' +import { type FC } from 'react' import { useTranslation } from 'react-i18next' import { useParams } from 'react-router-dom' -import { Skeleton, List } from 'antd'; import RbCard from '@/components/RbCard/Card' -import Empty from '@/components/Empty'; +import PageScrollList from '@/components/PageScrollList' import Markdown from '@/components/Markdown' -import { - getRagContent -} from '@/api/memory' +import { getRagContentUrl } from '@/api/memory' -const ConversationMemory:FC = () => { +const ConversationMemory: FC = () => { const { t } = useTranslation() const { id } = useParams() - const [loading, setLoading] = useState(true) - const [list, setList] = useState([]) - - useEffect(() => { - if (!id) return - getList() - }, [id]) - /** Fetch conversation memory list */ - const getList = () => { - if (!id) return - setLoading(true) - getRagContent(id).then((res) => { - setList((res as { contents?: [] }).contents || []) - }) - .finally(() => { - setLoading(false) - }) - } return ( - - {loading - ? - : list.length > 0 - ? ( - -
- -
-
- )} - /> - : - } + + url={getRagContentUrl} + query={{ end_user_id: id }} + column={1} + renderItem={(item) => ( +
+ +
+ )} + className="rb:h-full!" + // className="rb:h-[calc(100%-24px)]!" + />
) } -export default ConversationMemory \ No newline at end of file + +export default ConversationMemory From 572ce7f9eca43cbb5cb0f5be6d3dc0c85b14f0b9 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 12 Mar 2026 19:13:24 +0800 Subject: [PATCH 07/42] [changes] Field standardization --- api/app/services/memory_dashboard_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index a3859fe5..7bcf15cf 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -749,8 +749,8 @@ async def generate_rag_profile( if not end_user: raise ValueError(f"end_user {end_user_id} 不存在") - rag_content = get_rag_content(end_user_id, limit, db, current_user) - chunks = rag_content.get("contents", []) + rag_content = get_rag_content(end_user_id, 1, limit, db, current_user) + chunks = rag_content.get("items", []) if not chunks: business_logger.warning(f"未找到chunk内容,无法生产RAG画像: end_user_id={end_user_id}") From e368f1c1d695803103509b5134332ce246b5c1a6 Mon Sep 17 00:00:00 2001 From: Timebomb2018 <18868801967@163.com> Date: Thu, 12 Mar 2026 19:40:14 +0800 Subject: [PATCH 08/42] fix(mcp square): Do not obtain the mcp service when the token is empty. --- .../mcp_market_config_controller.py | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/api/app/controllers/mcp_market_config_controller.py b/api/app/controllers/mcp_market_config_controller.py index 7f73663e..930a2aea 100644 --- a/api/app/controllers/mcp_market_config_controller.py +++ b/api/app/controllers/mcp_market_config_controller.py @@ -65,13 +65,18 @@ async def get_mcp_servers( api_logger.warning( f"The mcp market config does not exist or access is denied: mcp_market_config_id={mcp_market_config_id}") raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, + status_code=status.HTTP_400_BAD_REQUEST, detail="The mcp market config does not exist or access is denied" ) # 3. Execute paged query - api = MCPApi() token = db_mcp_market_config.token + if not token: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="MCP market config token is not configured" + ) + api = MCPApi() api.login(token) body = { @@ -141,13 +146,18 @@ async def get_operational_mcp_servers( api_logger.warning( f"The mcp market config does not exist or access is denied: mcp_market_config_id={mcp_market_config_id}") raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, + status_code=status.HTTP_400_BAD_REQUEST, detail="The mcp market config does not exist or access is denied" ) # 2. Execute paged query - api = MCPApi() token = db_mcp_market_config.token + if not token: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="MCP market config token is not configured" + ) + api = MCPApi() api.login(token) url = f'{api.mcp_base_url}/operational' @@ -199,13 +209,18 @@ async def get_mcp_server( api_logger.warning( f"The mcp market config does not exist or access is denied: mcp_market_config_id={mcp_market_config_id}") raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, + status_code=status.HTTP_400_BAD_REQUEST, detail="The mcp market config does not exist or access is denied" ) # 2. Get detailed information for a specific MCP Server - api = MCPApi() token = db_mcp_market_config.token + if not token: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="MCP market config token is not configured" + ) + api = MCPApi() api.login(token) result = api.get_mcp_server(server_id=server_id) @@ -263,7 +278,7 @@ async def get_mcp_market_config( if not db_mcp_market_config: api_logger.warning(f"The mcp market config does not exist or access is denied: mcp_market_config_id={mcp_market_config_id}") raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, + status_code=status.HTTP_400_BAD_REQUEST, detail="The mcp market config does not exist or access is denied" ) @@ -296,7 +311,7 @@ async def get_mcp_market_config_by_mcp_market_id( if not db_mcp_market_config: api_logger.warning(f"The mcp market config does not exist or access is denied: mcp_market_id={mcp_market_id}") raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, + status_code=status.HTTP_400_BAD_REQUEST, detail="The mcp market config does not exist or access is denied" ) @@ -325,7 +340,7 @@ async def update_mcp_market_config( api_logger.warning( f"The mcp market config does not exist or you do not have permission to access it: mcp_market_config_id={mcp_market_config_id}") raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, + status_code=status.HTTP_400_BAD_REQUEST, detail="The mcp market config does not exist or you do not have permission to access it" ) @@ -382,7 +397,7 @@ async def delete_mcp_market_config( api_logger.warning( f"The mcp market config does not exist or you do not have permission to access it: mcp_market_config_id={mcp_market_config_id}") raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, + status_code=status.HTTP_400_BAD_REQUEST, detail="The mcp market config does not exist or you do not have permission to access it" ) From 6a67f028ce8e8bc10088d58f024686e4b99e9f89 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 12 Mar 2026 19:50:32 +0800 Subject: [PATCH 09/42] [changes] Set constants --- api/app/services/memory_dashboard_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 7bcf15cf..be656acb 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -749,7 +749,7 @@ async def generate_rag_profile( if not end_user: raise ValueError(f"end_user {end_user_id} 不存在") - rag_content = get_rag_content(end_user_id, 1, limit, db, current_user) + rag_content = get_rag_content(end_user_id, page=1, pagesize=limit, db=db, current_user=current_user) chunks = rag_content.get("items", []) if not chunks: From 6c691812909ac154b94a9efb6a73155f50dceb87 Mon Sep 17 00:00:00 2001 From: yujiangping Date: Fri, 13 Mar 2026 10:33:11 +0800 Subject: [PATCH 10/42] fix:reset --- web/src/views/ToolManagement/Market.tsx | 80 +++++++++++++++++++------ 1 file changed, 62 insertions(+), 18 deletions(-) diff --git a/web/src/views/ToolManagement/Market.tsx b/web/src/views/ToolManagement/Market.tsx index 9bcf5f67..c1f25100 100644 --- a/web/src/views/ToolManagement/Market.tsx +++ b/web/src/views/ToolManagement/Market.tsx @@ -9,7 +9,6 @@ import type { McpServiceModalRef } from './types'; import pageEmptyIcon from '@/assets/images/empty/pageEmpty.png' import Empty from '@/components/Empty/index' import { getMarketTools, getMarketConfig, getMarketMCPs, getMarketMCPDetail, getMarketMCPsActivated, getTools } from '@/api/tools'; -import BodyWrapper from '@/components/Empty/BodyWrapper'; interface MarketSource { id: string; name: string; @@ -75,6 +74,7 @@ const Market: React.FC<{ getStatusTag?: (status: string) => ReactNode }> = () => const [activatedMcps, setActivatedMcps] = useState([]); const [currentPage, setCurrentPage] = useState(1); const pageSize = 20; + const searchTimerRef = useRef(null); // 获取市场数据 useEffect(() => { @@ -109,7 +109,7 @@ const Market: React.FC<{ getStatusTag?: (status: string) => ReactNode }> = () => fetchMarketData(); }, [message]); - const fetchMcpList = async (sourceId: string, page = 1, append = false) => { + const fetchMcpList = async (sourceId: string, page = 1, append = false, keywords = '') => { setLoading(true); try { let configId = configIdMap[sourceId]; @@ -139,7 +139,12 @@ const Market: React.FC<{ getStatusTag?: (status: string) => ReactNode }> = () => const allTools: any = await getTools({ tool_type: 'mcp' }); const toolsList = Array.isArray(allTools) ? allTools : []; - const res: any = await getMarketMCPs({ mcp_market_config_id: configId, page, pagesize: pageSize }); + const res: any = await getMarketMCPs({ + mcp_market_config_id: configId, + page, + pagesize: pageSize, + ...(keywords ? { keywords } : {}) + }); if (res?.items && Array.isArray(res.items)) { // 标记已激活和已入库的 MCP const mcpsWithActivated = res.items.map((item: MarketMcp) => { @@ -176,8 +181,46 @@ const Market: React.FC<{ getStatusTag?: (status: string) => ReactNode }> = () => const loadMore = useCallback(() => { if (!selectedSource || loading) return; - fetchMcpList(selectedSource, currentPage + 1, true); - }, [selectedSource, currentPage, loading]); + fetchMcpList(selectedSource, currentPage + 1, true, searchKeyword); + }, [selectedSource, currentPage, loading, searchKeyword]); + + const handleSearchChange = (value: string) => { + setSearchKeyword(value); + + // 清除之前的定时器 + if (searchTimerRef.current) { + clearTimeout(searchTimerRef.current); + } + + // 如果清空搜索框,恢复原始列表 + if (!value.trim()) { + if (selectedSource) { + // 清除缓存,重新加载原始列表 + setMcpCache(prev => { + const next = { ...prev }; + delete next[selectedSource]; + return next; + }); + setCurrentPage(1); + fetchMcpList(selectedSource, 1, false, ''); + } + return; + } + + // 设置新的定时器,500ms 后执行搜索 + searchTimerRef.current = setTimeout(() => { + if (selectedSource) { + // 清除缓存,重新搜索 + setMcpCache(prev => { + const next = { ...prev }; + delete next[selectedSource]; + return next; + }); + setCurrentPage(1); + fetchMcpList(selectedSource, 1, false, value); + } + }, 500); + }; const handleSelectSource = async (sourceId: string) => { setSelectedSource(sourceId); @@ -313,12 +356,6 @@ const Market: React.FC<{ getStatusTag?: (status: string) => ReactNode }> = () => if (!source) return null; const mcpList = mcpCache[selectedSource] || []; - const filteredList = mcpList.filter(mcp => { - const name = getLocaleField(mcp, 'name'); - const desc = getLocaleField(mcp, 'description'); - return name.toLowerCase().includes(searchKeyword.toLowerCase()) || - desc.toLowerCase().includes(searchKeyword.toLowerCase()); - }); return ( <> @@ -358,15 +395,14 @@ const Market: React.FC<{ getStatusTag?: (status: string) => ReactNode }> = () => {t('tool.marketRefresh')} )} - {mcpList.length > 0 && ( } placeholder={t('tool.marketSearchPlaceholder')} value={searchKeyword} - onChange={(e) => setSearchKeyword(e.target.value)} + onChange={(e) => handleSearchChange(e.target.value)} + allowClear style={{ width: 200 }} /> - )}
-
+ {!loading && mcpList.length === 0 ? ( + + ) : ( } @@ -394,7 +438,7 @@ const Market: React.FC<{ getStatusTag?: (status: string) => ReactNode }> = () => gridTemplateColumns: 'repeat(auto-fill, minmax(300px, 1fr))' }} > - {filteredList.map(mcp => ( + {mcpList.map(mcp => (
ReactNode }> = () => ))}
+ )}
-
); From b88e9c5f5e7c1254200904bcdfcc0f33131b44f6 Mon Sep 17 00:00:00 2001 From: Timebomb2018 <18868801967@163.com> Date: Fri, 13 Mar 2026 11:07:32 +0800 Subject: [PATCH 11/42] fix(mcp): The MCP Square can obtain a maximum of 100 MCP services. --- api/app/controllers/mcp_market_config_controller.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/api/app/controllers/mcp_market_config_controller.py b/api/app/controllers/mcp_market_config_controller.py index 930a2aea..b1c22975 100644 --- a/api/app/controllers/mcp_market_config_controller.py +++ b/api/app/controllers/mcp_market_config_controller.py @@ -55,6 +55,12 @@ async def get_mcp_servers( status_code=status.HTTP_400_BAD_REQUEST, detail="The paging parameter must be greater than 0" ) + if page * pagesize > 100: + api_logger.warning(f"Paging parameters exceed ModelScope limit: page={page}, pagesize={pagesize}") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"page × pagesize must not exceed 100 (got {page} × {pagesize} = {page * pagesize})" + ) # 2. Query mcp market config information from the database api_logger.debug(f"Query mcp market config: {mcp_market_config_id}") From dac1c01a2c923a10001ffde800d43b200b768cd0 Mon Sep 17 00:00:00 2001 From: Timebomb2018 <18868801967@163.com> Date: Fri, 13 Mar 2026 14:05:27 +0800 Subject: [PATCH 12/42] fix(mcp): bug fix --- .../mcp_market_config_controller.py | 35 ++++--------------- api/app/core/tools/mcp/client.py | 1 + 2 files changed, 8 insertions(+), 28 deletions(-) diff --git a/api/app/controllers/mcp_market_config_controller.py b/api/app/controllers/mcp_market_config_controller.py index b1c22975..95449310 100644 --- a/api/app/controllers/mcp_market_config_controller.py +++ b/api/app/controllers/mcp_market_config_controller.py @@ -70,10 +70,7 @@ async def get_mcp_servers( if not db_mcp_market_config: api_logger.warning( f"The mcp market config does not exist or access is denied: mcp_market_config_id={mcp_market_config_id}") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The mcp market config does not exist or access is denied" - ) + return success(msg='The mcp market config does not exist or access is denied') # 3. Execute paged query token = db_mcp_market_config.token @@ -151,10 +148,7 @@ async def get_operational_mcp_servers( if not db_mcp_market_config: api_logger.warning( f"The mcp market config does not exist or access is denied: mcp_market_config_id={mcp_market_config_id}") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The mcp market config does not exist or access is denied" - ) + return success(msg='The mcp market config does not exist or access is denied') # 2. Execute paged query token = db_mcp_market_config.token @@ -214,10 +208,7 @@ async def get_mcp_server( if not db_mcp_market_config: api_logger.warning( f"The mcp market config does not exist or access is denied: mcp_market_config_id={mcp_market_config_id}") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The mcp market config does not exist or access is denied" - ) + return success(msg='The mcp market config does not exist or access is denied') # 2. Get detailed information for a specific MCP Server token = db_mcp_market_config.token @@ -283,10 +274,7 @@ async def get_mcp_market_config( db_mcp_market_config = mcp_market_config_service.get_mcp_market_config_by_id(db, mcp_market_config_id=mcp_market_config_id, current_user=current_user) if not db_mcp_market_config: api_logger.warning(f"The mcp market config does not exist or access is denied: mcp_market_config_id={mcp_market_config_id}") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The mcp market config does not exist or access is denied" - ) + return success(msg='The mcp market config does not exist or access is denied') api_logger.info(f"mcp market config query successful: (ID: {db_mcp_market_config.id})") return success(data=jsonable_encoder(mcp_market_config_schema.McpMarketConfig.model_validate(db_mcp_market_config)), @@ -316,10 +304,7 @@ async def get_mcp_market_config_by_mcp_market_id( db_mcp_market_config = mcp_market_config_service.get_mcp_market_config_by_mcp_market_id(db, mcp_market_id=mcp_market_id, current_user=current_user) if not db_mcp_market_config: api_logger.warning(f"The mcp market config does not exist or access is denied: mcp_market_id={mcp_market_id}") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The mcp market config does not exist or access is denied" - ) + return success(msg='The mcp market config does not exist or access is denied') api_logger.info(f"mcp market config query successful: (ID: {db_mcp_market_config.id})") return success(data=jsonable_encoder(mcp_market_config_schema.McpMarketConfig.model_validate(db_mcp_market_config)), @@ -345,10 +330,7 @@ async def update_mcp_market_config( if not db_mcp_market_config: api_logger.warning( f"The mcp market config does not exist or you do not have permission to access it: mcp_market_config_id={mcp_market_config_id}") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The mcp market config does not exist or you do not have permission to access it" - ) + return success(msg='The mcp market config does not exist or access is denied') # 2. Update fields (only update non-null fields) api_logger.debug(f"Start updating the mcp market config fields: {mcp_market_config_id}") @@ -402,10 +384,7 @@ async def delete_mcp_market_config( if not db_mcp_market_config: api_logger.warning( f"The mcp market config does not exist or you do not have permission to access it: mcp_market_config_id={mcp_market_config_id}") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The mcp market config does not exist or you do not have permission to access it" - ) + return success(msg='The mcp market config does not exist or access is denied') # 2. Deleting mcp market config mcp_market_config_service.delete_mcp_market_config_by_id(db, mcp_market_config_id=mcp_market_config_id, current_user=current_user) diff --git a/api/app/core/tools/mcp/client.py b/api/app/core/tools/mcp/client.py index c082b314..f19902a2 100644 --- a/api/app/core/tools/mcp/client.py +++ b/api/app/core/tools/mcp/client.py @@ -53,6 +53,7 @@ class SimpleMCPClient: else: await self._connect_http() except Exception as e: + await self.disconnect() logger.error(f"MCP连接失败: {self.server_url}, 错误: {e}") raise MCPConnectionError(f"连接失败: {e}") From d03473da10a1bfe23632885664441254b4be3b19 Mon Sep 17 00:00:00 2001 From: yujiangping Date: Fri, 13 Mar 2026 14:07:41 +0800 Subject: [PATCH 13/42] fix:input disable --- .../views/ToolManagement/components/MarketConfigModal.tsx | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/web/src/views/ToolManagement/components/MarketConfigModal.tsx b/web/src/views/ToolManagement/components/MarketConfigModal.tsx index 2b4496fa..280512de 100644 --- a/web/src/views/ToolManagement/components/MarketConfigModal.tsx +++ b/web/src/views/ToolManagement/components/MarketConfigModal.tsx @@ -37,6 +37,7 @@ const MarketConfigModal = forwardRef(null); const [showApiKey, setShowApiKey] = useState(false); + const formValues = Form.useWatch([], form); const handleClose = () => { setVisible(false); @@ -101,6 +102,9 @@ const MarketConfigModal = forwardRef 0; + useImperativeHandle(ref, () => ({ handleOpen, handleClose @@ -116,6 +120,7 @@ const MarketConfigModal = forwardRef
@@ -169,7 +174,7 @@ const MarketConfigModal = forwardRef - API Key ({t('tool.marketApiKeyOptional')}) + API Key } extra={{t('tool.marketApiKeyExtra')}} From db8257b67ae526e3bf2cd42dd8f1bfc012c0411e Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 10 Mar 2026 17:06:43 +0800 Subject: [PATCH 14/42] [add] Create community nodes --- .../clustering_engine/__init__.py | 3 + .../clustering_engine/label_propagation.py | 311 ++++++++++++++++++ .../neo4j/community_repository.py | 129 ++++++++ api/app/repositories/neo4j/cypher_queries.py | 93 +++++- api/app/repositories/neo4j/graph_saver.py | 34 ++ 5 files changed, 569 insertions(+), 1 deletion(-) create mode 100644 api/app/core/memory/storage_services/clustering_engine/__init__.py create mode 100644 api/app/core/memory/storage_services/clustering_engine/label_propagation.py create mode 100644 api/app/repositories/neo4j/community_repository.py diff --git a/api/app/core/memory/storage_services/clustering_engine/__init__.py b/api/app/core/memory/storage_services/clustering_engine/__init__.py new file mode 100644 index 00000000..992d8bff --- /dev/null +++ b/api/app/core/memory/storage_services/clustering_engine/__init__.py @@ -0,0 +1,3 @@ +from app.core.memory.storage_services.clustering_engine.label_propagation import LabelPropagationEngine + +__all__ = ["LabelPropagationEngine"] diff --git a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py new file mode 100644 index 00000000..80e238fd --- /dev/null +++ b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py @@ -0,0 +1,311 @@ +"""标签传播聚类引擎 + +基于 ZEP 论文的动态标签传播算法,对 Neo4j 中的 ExtractedEntity 节点进行社区聚类。 + +支持两种模式: +- 全量初始化(full_clustering):首次运行,对所有实体做完整 LPA 迭代 +- 增量更新(incremental_update):新实体到达时,只处理新实体及其邻居 +""" + +import logging +import uuid +from math import sqrt +from typing import Dict, List, Optional + +from app.repositories.neo4j.community_repository import CommunityRepository +from app.repositories.neo4j.neo4j_connector import Neo4jConnector + +logger = logging.getLogger(__name__) + +# 全量迭代最大轮数,防止不收敛 +MAX_ITERATIONS = 10 + + +def _cosine_similarity(v1: Optional[List[float]], v2: Optional[List[float]]) -> float: + """计算两个向量的余弦相似度,任一为空则返回 0。""" + if not v1 or not v2 or len(v1) != len(v2): + return 0.0 + dot = sum(a * b for a, b in zip(v1, v2)) + norm1 = sqrt(sum(a * a for a in v1)) + norm2 = sqrt(sum(b * b for b in v2)) + if norm1 == 0 or norm2 == 0: + return 0.0 + return dot / (norm1 * norm2) + + +def _weighted_vote( + neighbors: List[Dict], + self_embedding: Optional[List[float]], +) -> Optional[str]: + """ + 加权多数投票,选出得票最高的社区。 + + 权重 = 语义相似度(name_embedding 余弦)* activation_value 加成 + 没有 community_id 的邻居不参与投票。 + """ + votes: Dict[str, float] = {} + for nb in neighbors: + cid = nb.get("community_id") + if not cid: + continue + sem = _cosine_similarity(self_embedding, nb.get("name_embedding")) + act = nb.get("activation_value") or 0.5 + # 语义相似度权重 0.6,激活值权重 0.4 + weight = 0.6 * sem + 0.4 * act + votes[cid] = votes.get(cid, 0.0) + weight + + if not votes: + return None + return max(votes, key=votes.__getitem__) + + +class LabelPropagationEngine: + """标签传播聚类引擎""" + + def __init__(self, connector: Neo4jConnector): + self.connector = connector + self.repo = CommunityRepository(connector) + + # ────────────────────────────────────────────────────────────────────────── + # 公开接口 + # ────────────────────────────────────────────────────────────────────────── + + async def run( + self, + end_user_id: str, + new_entity_ids: Optional[List[str]] = None, + ) -> None: + """ + 统一入口:自动判断全量还是增量。 + + - 若该用户尚无 Community 节点 → 全量初始化 + - 否则 → 增量更新(仅处理 new_entity_ids) + """ + has_communities = await self.repo.has_communities(end_user_id) + if not has_communities: + logger.info(f"[Clustering] 用户 {end_user_id} 首次聚类,执行全量初始化") + await self.full_clustering(end_user_id) + else: + if new_entity_ids: + logger.info( + f"[Clustering] 增量更新,新实体数: {len(new_entity_ids)}" + ) + await self.incremental_update(new_entity_ids, end_user_id) + + async def full_clustering(self, end_user_id: str) -> None: + """ + 全量标签传播初始化。 + + 1. 拉取所有实体,初始化每个实体为独立社区 + 2. 迭代:每轮对所有实体做邻居投票,更新社区标签 + 3. 直到标签不再变化或达到 MAX_ITERATIONS + 4. 将最终标签写入 Neo4j + """ + entities = await self.repo.get_all_entities(end_user_id) + if not entities: + logger.info(f"[Clustering] 用户 {end_user_id} 无实体,跳过全量聚类") + return + + # 初始化:每个实体持有自己 id 作为社区标签 + labels: Dict[str, str] = {e["id"]: e["id"] for e in entities} + embeddings: Dict[str, Optional[List[float]]] = { + e["id"]: e.get("name_embedding") for e in entities + } + + for iteration in range(MAX_ITERATIONS): + changed = 0 + # 随机顺序(Python dict 在 3.7+ 保持插入顺序,这里直接遍历) + for entity in entities: + eid = entity["id"] + neighbors = await self.repo.get_entity_neighbors(eid, end_user_id) + + # 将邻居的当前内存标签注入(覆盖 Neo4j 中的旧值) + enriched = [] + for nb in neighbors: + nb_copy = dict(nb) + nb_copy["community_id"] = labels.get(nb["id"], nb.get("community_id")) + enriched.append(nb_copy) + + new_label = _weighted_vote(enriched, embeddings.get(eid)) + if new_label and new_label != labels[eid]: + labels[eid] = new_label + changed += 1 + + logger.info( + f"[Clustering] 全量迭代 {iteration + 1}/{MAX_ITERATIONS}," + f"标签变化数: {changed}" + ) + if changed == 0: + logger.info("[Clustering] 标签已收敛,提前结束迭代") + break + + # 将最终标签写入 Neo4j + await self._flush_labels(labels, end_user_id) + logger.info( + f"[Clustering] 全量聚类完成,共 {len(set(labels.values()))} 个社区," + f"{len(labels)} 个实体" + ) + + async def incremental_update( + self, new_entity_ids: List[str], end_user_id: str + ) -> None: + """ + 增量更新:只处理新实体及其邻居,不重跑全图。 + + 1. 对每个新实体查询邻居 + 2. 加权多数投票决定社区归属 + 3. 若邻居无社区 → 创建新社区 + 4. 若邻居分属多个社区 → 评估是否合并 + """ + for entity_id in new_entity_ids: + await self._process_single_entity(entity_id, end_user_id) + + # ────────────────────────────────────────────────────────────────────────── + # 内部方法 + # ────────────────────────────────────────────────────────────────────────── + + async def _process_single_entity( + self, entity_id: str, end_user_id: str + ) -> None: + """处理单个新实体的社区分配。""" + neighbors = await self.repo.get_entity_neighbors(entity_id, end_user_id) + + # 查询自身 embedding(从邻居查询结果中无法获取,需单独查) + self_embedding = await self._get_entity_embedding(entity_id, end_user_id) + + if not neighbors: + # 孤立实体:创建单成员社区 + new_cid = self._new_community_id() + await self.repo.upsert_community(new_cid, end_user_id, member_count=1) + await self.repo.assign_entity_to_community(entity_id, new_cid, end_user_id) + logger.debug(f"[Clustering] 孤立实体 {entity_id} → 新社区 {new_cid}") + return + + # 统计邻居社区分布 + community_ids_in_neighbors = set( + nb["community_id"] for nb in neighbors if nb.get("community_id") + ) + + target_cid = _weighted_vote(neighbors, self_embedding) + + if target_cid is None: + # 邻居都没有社区,连同新实体一起创建新社区 + new_cid = self._new_community_id() + await self.repo.upsert_community(new_cid, end_user_id) + await self.repo.assign_entity_to_community(entity_id, new_cid, end_user_id) + for nb in neighbors: + await self.repo.assign_entity_to_community( + nb["id"], new_cid, end_user_id + ) + await self.repo.refresh_member_count(new_cid, end_user_id) + logger.debug( + f"[Clustering] 新实体 {entity_id} 与 {len(neighbors)} 个无社区邻居 → 新社区 {new_cid}" + ) + else: + # 加入得票最多的社区 + await self.repo.assign_entity_to_community(entity_id, target_cid, end_user_id) + await self.repo.refresh_member_count(target_cid, end_user_id) + logger.debug(f"[Clustering] 新实体 {entity_id} → 社区 {target_cid}") + + # 若邻居分属多个社区,评估合并 + if len(community_ids_in_neighbors) > 1: + await self._evaluate_merge( + list(community_ids_in_neighbors), end_user_id + ) + + async def _evaluate_merge( + self, community_ids: List[str], end_user_id: str + ) -> None: + """ + 评估多个社区是否应合并。 + + 策略:计算各社区成员 embedding 的平均向量,若两两余弦相似度 > 0.75 则合并。 + 合并时保留成员数最多的社区,其余成员迁移过来。 + """ + MERGE_THRESHOLD = 0.75 + + community_embeddings: Dict[str, Optional[List[float]]] = {} + community_sizes: Dict[str, int] = {} + + for cid in community_ids: + members = await self.repo.get_community_members(cid, end_user_id) + community_sizes[cid] = len(members) + # 计算社区成员 embedding 的平均向量 + valid_embeddings = [ + m["name_embedding"] + for m in members + if m.get("name_embedding") + ] + if valid_embeddings: + dim = len(valid_embeddings[0]) + avg = [ + sum(e[i] for e in valid_embeddings) / len(valid_embeddings) + for i in range(dim) + ] + community_embeddings[cid] = avg + else: + community_embeddings[cid] = None + + # 找出应合并的社区对 + to_merge: List[tuple] = [] + cids = list(community_ids) + for i in range(len(cids)): + for j in range(i + 1, len(cids)): + sim = _cosine_similarity( + community_embeddings[cids[i]], + community_embeddings[cids[j]], + ) + if sim > MERGE_THRESHOLD: + to_merge.append((cids[i], cids[j])) + + for c1, c2 in to_merge: + keep = c1 if community_sizes.get(c1, 0) >= community_sizes.get(c2, 0) else c2 + dissolve = c2 if keep == c1 else c1 + members = await self.repo.get_community_members(dissolve, end_user_id) + for m in members: + await self.repo.assign_entity_to_community( + m["id"], keep, end_user_id + ) + await self.repo.refresh_member_count(keep, end_user_id) + logger.info( + f"[Clustering] 社区合并: {dissolve} → {keep}," + f"迁移 {len(members)} 个成员" + ) + + async def _flush_labels( + self, labels: Dict[str, str], end_user_id: str + ) -> None: + """将内存中的标签批量写入 Neo4j。""" + # 先创建所有唯一社区节点 + unique_communities = set(labels.values()) + for cid in unique_communities: + await self.repo.upsert_community(cid, end_user_id) + + # 再批量分配实体 + for entity_id, community_id in labels.items(): + await self.repo.assign_entity_to_community( + entity_id, community_id, end_user_id + ) + + # 刷新成员数 + for cid in unique_communities: + await self.repo.refresh_member_count(cid, end_user_id) + + async def _get_entity_embedding( + self, entity_id: str, end_user_id: str + ) -> Optional[List[float]]: + """查询单个实体的 name_embedding。""" + try: + result = await self.connector.execute_query( + "MATCH (e:ExtractedEntity {id: $eid, end_user_id: $uid}) " + "RETURN e.name_embedding AS name_embedding", + eid=entity_id, + uid=end_user_id, + ) + return result[0]["name_embedding"] if result else None + except Exception: + return None + + @staticmethod + def _new_community_id() -> str: + return str(uuid.uuid4()) diff --git a/api/app/repositories/neo4j/community_repository.py b/api/app/repositories/neo4j/community_repository.py new file mode 100644 index 00000000..16e30a10 --- /dev/null +++ b/api/app/repositories/neo4j/community_repository.py @@ -0,0 +1,129 @@ +"""Community 节点仓库 + +管理 Neo4j 中 Community 节点及 BELONGS_TO_COMMUNITY 边的 CRUD 操作。 +""" + +import logging +from typing import Dict, List, Optional + +from app.repositories.neo4j.neo4j_connector import Neo4jConnector +from app.repositories.neo4j.cypher_queries import ( + COMMUNITY_NODE_UPSERT, + ENTITY_JOIN_COMMUNITY, + ENTITY_LEAVE_ALL_COMMUNITIES, + GET_ENTITY_NEIGHBORS, + GET_ALL_ENTITIES_FOR_USER, + GET_COMMUNITY_MEMBERS, + CHECK_USER_HAS_COMMUNITIES, + UPDATE_COMMUNITY_MEMBER_COUNT, +) + +logger = logging.getLogger(__name__) + + +class CommunityRepository: + def __init__(self, connector: Neo4jConnector): + self.connector = connector + + async def upsert_community( + self, community_id: str, end_user_id: str, member_count: int = 0 + ) -> Optional[str]: + """创建或更新 Community 节点,返回 community_id。""" + try: + result = await self.connector.execute_query( + COMMUNITY_NODE_UPSERT, + community_id=community_id, + end_user_id=end_user_id, + member_count=member_count, + ) + return result[0]["community_id"] if result else None + except Exception as e: + logger.error(f"upsert_community failed: {e}") + return None + + async def assign_entity_to_community( + self, entity_id: str, community_id: str, end_user_id: str + ) -> bool: + """将实体关联到社区(先解除旧关联,再建立新关联)。""" + try: + await self.connector.execute_query( + ENTITY_LEAVE_ALL_COMMUNITIES, + entity_id=entity_id, + end_user_id=end_user_id, + ) + result = await self.connector.execute_query( + ENTITY_JOIN_COMMUNITY, + entity_id=entity_id, + community_id=community_id, + end_user_id=end_user_id, + ) + return bool(result) + except Exception as e: + logger.error(f"assign_entity_to_community failed: {e}") + return False + + async def get_entity_neighbors( + self, entity_id: str, end_user_id: str + ) -> List[Dict]: + """查询实体的直接邻居及其社区归属。""" + try: + return await self.connector.execute_query( + GET_ENTITY_NEIGHBORS, + entity_id=entity_id, + end_user_id=end_user_id, + ) + except Exception as e: + logger.error(f"get_entity_neighbors failed: {e}") + return [] + + async def get_all_entities(self, end_user_id: str) -> List[Dict]: + """拉取某用户下所有实体及其当前社区归属。""" + try: + return await self.connector.execute_query( + GET_ALL_ENTITIES_FOR_USER, + end_user_id=end_user_id, + ) + except Exception as e: + logger.error(f"get_all_entities failed: {e}") + return [] + + async def get_community_members( + self, community_id: str, end_user_id: str + ) -> List[Dict]: + """查询社区成员列表。""" + try: + return await self.connector.execute_query( + GET_COMMUNITY_MEMBERS, + community_id=community_id, + end_user_id=end_user_id, + ) + except Exception as e: + logger.error(f"get_community_members failed: {e}") + return [] + + async def has_communities(self, end_user_id: str) -> bool: + """检查该用户是否已有 Community 节点(用于判断全量 vs 增量)。""" + try: + result = await self.connector.execute_query( + CHECK_USER_HAS_COMMUNITIES, + end_user_id=end_user_id, + ) + return result[0]["community_count"] > 0 if result else False + except Exception as e: + logger.error(f"has_communities failed: {e}") + return False + + async def refresh_member_count( + self, community_id: str, end_user_id: str + ) -> int: + """重新统计并更新社区成员数,返回最新数量。""" + try: + result = await self.connector.execute_query( + UPDATE_COMMUNITY_MEMBER_COUNT, + community_id=community_id, + end_user_id=end_user_id, + ) + return result[0]["member_count"] if result else 0 + except Exception as e: + logger.error(f"refresh_member_count failed: {e}") + return 0 diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 651c513f..947097a2 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -1058,4 +1058,95 @@ Graph_Node_query = """ 3 AS priority LIMIT $limit - """ \ No newline at end of file + """ + + +# ============================================================ +# Community 节点 & BELONGS_TO_COMMUNITY 边 +# ============================================================ + +COMMUNITY_NODE_SAVE = """ +MERGE (c:Community {community_id: $community_id}) +SET c.end_user_id = $end_user_id, + c.formed_at = $formed_at, + c.updated_at = datetime(), + c.status = $status, + c.member_count = $member_count +RETURN c.community_id AS community_id +""" + +COMMUNITY_ADD_MEMBER = """ +MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) +MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) +MERGE (e)-[:BELONGS_TO_COMMUNITY]->(c) +SET c.updated_at = datetime(), + c.member_count = $member_count +""" + + + +# ─── Community 聚类相关 Cypher 模板 ─────────────────────────────────────────── + +COMMUNITY_NODE_UPSERT = """ +MERGE (c:Community {community_id: $community_id}) +SET c.end_user_id = $end_user_id, + c.member_count = $member_count, + c.updated_at = datetime() +RETURN c.community_id AS community_id +""" + +ENTITY_JOIN_COMMUNITY = """ +MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) +MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) +MERGE (e)-[:BELONGS_TO_COMMUNITY]->(c) +SET c.updated_at = datetime() +RETURN e.id AS entity_id, c.community_id AS community_id +""" + +ENTITY_LEAVE_ALL_COMMUNITIES = """ +MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) +MATCH (e)-[r:BELONGS_TO_COMMUNITY]->(:Community) +DELETE r +""" + +GET_ENTITY_NEIGHBORS = """ +MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) +OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb:ExtractedEntity {end_user_id: $end_user_id}) +OPTIONAL MATCH (nb)-[:BELONGS_TO_COMMUNITY]->(c:Community) +RETURN DISTINCT + nb.id AS id, + nb.name AS name, + nb.name_embedding AS name_embedding, + nb.activation_value AS activation_value, + CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id +""" + +GET_ALL_ENTITIES_FOR_USER = """ +MATCH (e:ExtractedEntity {end_user_id: $end_user_id}) +OPTIONAL MATCH (e)-[:BELONGS_TO_COMMUNITY]->(c:Community) +RETURN e.id AS id, + e.name AS name, + e.name_embedding AS name_embedding, + e.activation_value AS activation_value, + CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id +""" + +GET_COMMUNITY_MEMBERS = """ +MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community {community_id: $community_id}) +RETURN e.id AS id, e.name AS name, e.entity_type AS entity_type, + e.importance_score AS importance_score, e.activation_value AS activation_value, + e.name_embedding AS name_embedding +ORDER BY coalesce(e.activation_value, 0) DESC +""" + +CHECK_USER_HAS_COMMUNITIES = """ +MATCH (c:Community {end_user_id: $end_user_id}) +RETURN count(c) AS community_count +""" + +UPDATE_COMMUNITY_MEMBER_COUNT = """ +MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community {community_id: $community_id}) +WITH c, count(e) AS cnt +SET c.member_count = cnt +RETURN c.community_id AS community_id, cnt AS member_count +""" diff --git a/api/app/repositories/neo4j/graph_saver.py b/api/app/repositories/neo4j/graph_saver.py index 526d16ec..a94bc23b 100644 --- a/api/app/repositories/neo4j/graph_saver.py +++ b/api/app/repositories/neo4j/graph_saver.py @@ -1,3 +1,4 @@ +import asyncio from typing import List # 使用新的仓储层 @@ -288,6 +289,14 @@ async def save_dialog_and_statements_to_neo4j( } logger.info("Transaction completed. Summary: %s", summary) logger.debug("Full transaction results: %r", results) + + # 写入成功后,触发聚类 + if entity_nodes: + end_user_id = entity_nodes[0].end_user_id + new_entity_ids = [e.id for e in entity_nodes] + logger.info(f"[Clustering] 准备触发聚类,实体数: {len(new_entity_ids)}, end_user_id: {end_user_id}") + await _trigger_clustering(new_entity_ids, end_user_id) + return True except Exception as e: @@ -295,3 +304,28 @@ async def save_dialog_and_statements_to_neo4j( print(f"Neo4j integration error: {e}") print("Continuing without database storage...") return False + + +async def _trigger_clustering( + new_entity_ids: List[str], + end_user_id: str, +) -> None: + """ + 聚类触发函数,自动判断全量初始化还是增量更新。 + """ + connector = None + try: + from app.core.memory.storage_services.clustering_engine import LabelPropagationEngine + logger.info(f"[Clustering] 开始聚类,end_user_id={end_user_id}, 实体数={len(new_entity_ids)}") + connector = Neo4jConnector() + engine = LabelPropagationEngine(connector) + await engine.run(end_user_id=end_user_id, new_entity_ids=new_entity_ids) + logger.info(f"[Clustering] 聚类完成,end_user_id={end_user_id}") + except Exception as e: + logger.error(f"[Clustering] 聚类触发失败: {e}", exc_info=True) + finally: + if connector: + try: + await connector.close() + except Exception: + pass From 744ba31ba6072b1e3f9f9af26a87949d6a1d6313 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Wed, 11 Mar 2026 18:04:04 +0800 Subject: [PATCH 15/42] [changes] Initial stage of community integration --- .../clustering_engine/label_propagation.py | 94 ++++++++++++++----- .../neo4j/community_repository.py | 20 ++++ api/app/repositories/neo4j/cypher_queries.py | 48 +++++----- 3 files changed, 115 insertions(+), 47 deletions(-) diff --git a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py index 80e238fd..cb6e5804 100644 --- a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py +++ b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py @@ -141,8 +141,18 @@ class LabelPropagationEngine: # 将最终标签写入 Neo4j await self._flush_labels(labels, end_user_id) + pre_merge_count = len(set(labels.values())) logger.info( - f"[Clustering] 全量聚类完成,共 {len(set(labels.values()))} 个社区," + f"[Clustering] 全量迭代完成,共 {pre_merge_count} 个社区," + f"{len(labels)} 个实体,开始后处理合并" + ) + + # 全量初始化后做一轮社区合并(基于 name_embedding 余弦相似度) + all_community_ids = list(set(labels.values())) + await self._evaluate_merge(all_community_ids, end_user_id) + + logger.info( + f"[Clustering] 全量聚类完成,合并前 {pre_merge_count} 个社区," f"{len(labels)} 个实体" ) @@ -221,30 +231,50 @@ class LabelPropagationEngine: 策略:计算各社区成员 embedding 的平均向量,若两两余弦相似度 > 0.75 则合并。 合并时保留成员数最多的社区,其余成员迁移过来。 + + 全量场景(社区数 > 20)使用批量查询,避免 N 次数据库往返。 """ MERGE_THRESHOLD = 0.75 + BATCH_THRESHOLD = 20 # 超过此数量走批量查询 community_embeddings: Dict[str, Optional[List[float]]] = {} community_sizes: Dict[str, int] = {} - for cid in community_ids: - members = await self.repo.get_community_members(cid, end_user_id) - community_sizes[cid] = len(members) - # 计算社区成员 embedding 的平均向量 - valid_embeddings = [ - m["name_embedding"] - for m in members - if m.get("name_embedding") - ] - if valid_embeddings: - dim = len(valid_embeddings[0]) - avg = [ - sum(e[i] for e in valid_embeddings) / len(valid_embeddings) - for i in range(dim) + if len(community_ids) > BATCH_THRESHOLD: + # 批量查询:一次拉取所有社区成员 + all_members = await self.repo.get_all_community_members_batch( + community_ids, end_user_id + ) + for cid in community_ids: + members = all_members.get(cid, []) + community_sizes[cid] = len(members) + valid_embeddings = [ + m["name_embedding"] for m in members if m.get("name_embedding") ] - community_embeddings[cid] = avg - else: - community_embeddings[cid] = None + if valid_embeddings: + dim = len(valid_embeddings[0]) + community_embeddings[cid] = [ + sum(e[i] for e in valid_embeddings) / len(valid_embeddings) + for i in range(dim) + ] + else: + community_embeddings[cid] = None + else: + # 增量场景:逐个查询 + for cid in community_ids: + members = await self.repo.get_community_members(cid, end_user_id) + community_sizes[cid] = len(members) + valid_embeddings = [ + m["name_embedding"] for m in members if m.get("name_embedding") + ] + if valid_embeddings: + dim = len(valid_embeddings[0]) + community_embeddings[cid] = [ + sum(e[i] for e in valid_embeddings) / len(valid_embeddings) + for i in range(dim) + ] + else: + community_embeddings[cid] = None # 找出应合并的社区对 to_merge: List[tuple] = [] @@ -258,14 +288,32 @@ class LabelPropagationEngine: if sim > MERGE_THRESHOLD: to_merge.append((cids[i], cids[j])) + logger.info(f"[Clustering] 发现 {len(to_merge)} 对可合并社区") + + # 执行合并:用 union-find 思路避免重复迁移已被合并的社区 + # 维护一个 canonical 映射,确保链式合并正确收敛 + canonical: Dict[str, str] = {cid: cid for cid in cids} + + def find(x: str) -> str: + while canonical[x] != x: + canonical[x] = canonical[canonical[x]] + x = canonical[x] + return x + for c1, c2 in to_merge: - keep = c1 if community_sizes.get(c1, 0) >= community_sizes.get(c2, 0) else c2 - dissolve = c2 if keep == c1 else c1 + root1, root2 = find(c1), find(c2) + if root1 == root2: + continue # 已经在同一社区,跳过 + keep = root1 if community_sizes.get(root1, 0) >= community_sizes.get(root2, 0) else root2 + dissolve = root2 if keep == root1 else root1 + canonical[dissolve] = keep + members = await self.repo.get_community_members(dissolve, end_user_id) for m in members: - await self.repo.assign_entity_to_community( - m["id"], keep, end_user_id - ) + await self.repo.assign_entity_to_community(m["id"], keep, end_user_id) + # 更新 sizes 以便后续合并决策准确 + community_sizes[keep] = community_sizes.get(keep, 0) + len(members) + community_sizes[dissolve] = 0 await self.repo.refresh_member_count(keep, end_user_id) logger.info( f"[Clustering] 社区合并: {dissolve} → {keep}," diff --git a/api/app/repositories/neo4j/community_repository.py b/api/app/repositories/neo4j/community_repository.py index 16e30a10..2a1f4f2b 100644 --- a/api/app/repositories/neo4j/community_repository.py +++ b/api/app/repositories/neo4j/community_repository.py @@ -14,6 +14,7 @@ from app.repositories.neo4j.cypher_queries import ( GET_ENTITY_NEIGHBORS, GET_ALL_ENTITIES_FOR_USER, GET_COMMUNITY_MEMBERS, + GET_ALL_COMMUNITY_MEMBERS_BATCH, CHECK_USER_HAS_COMMUNITIES, UPDATE_COMMUNITY_MEMBER_COUNT, ) @@ -101,6 +102,25 @@ class CommunityRepository: logger.error(f"get_community_members failed: {e}") return [] + async def get_all_community_members_batch( + self, community_ids: List[str], end_user_id: str + ) -> Dict[str, List[Dict]]: + """批量查询多个社区的成员,返回 {community_id: [members]} 字典。""" + try: + rows = await self.connector.execute_query( + GET_ALL_COMMUNITY_MEMBERS_BATCH, + community_ids=community_ids, + end_user_id=end_user_id, + ) + result: Dict[str, List[Dict]] = {} + for row in rows: + cid = row["community_id"] + result.setdefault(cid, []).append(row) + return result + except Exception as e: + logger.error(f"get_all_community_members_batch failed: {e}") + return {} + async def has_communities(self, end_user_id: str) -> bool: """检查该用户是否已有 Community 节点(用于判断全量 vs 增量)。""" try: diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 947097a2..84889d65 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -1065,26 +1065,6 @@ Graph_Node_query = """ # Community 节点 & BELONGS_TO_COMMUNITY 边 # ============================================================ -COMMUNITY_NODE_SAVE = """ -MERGE (c:Community {community_id: $community_id}) -SET c.end_user_id = $end_user_id, - c.formed_at = $formed_at, - c.updated_at = datetime(), - c.status = $status, - c.member_count = $member_count -RETURN c.community_id AS community_id -""" - -COMMUNITY_ADD_MEMBER = """ -MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) -MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) -MERGE (e)-[:BELONGS_TO_COMMUNITY]->(c) -SET c.updated_at = datetime(), - c.member_count = $member_count -""" - - - # ─── Community 聚类相关 Cypher 模板 ─────────────────────────────────────────── COMMUNITY_NODE_UPSERT = """ @@ -1111,12 +1091,23 @@ DELETE r GET_ENTITY_NEIGHBORS = """ MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) -OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb:ExtractedEntity {end_user_id: $end_user_id}) + +// 来源一:直接关系邻居(EXTRACTED_RELATIONSHIP 边) +OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb1:ExtractedEntity {end_user_id: $end_user_id}) + +// 来源二:同 Statement 共现邻居(REFERENCES_ENTITY 边) +OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) +OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(nb2:ExtractedEntity {end_user_id: $end_user_id}) +WHERE nb2.id <> e.id + +WITH collect(DISTINCT nb1) + collect(DISTINCT nb2) AS all_neighbors +UNWIND all_neighbors AS nb +WITH nb WHERE nb IS NOT NULL OPTIONAL MATCH (nb)-[:BELONGS_TO_COMMUNITY]->(c:Community) RETURN DISTINCT - nb.id AS id, - nb.name AS name, - nb.name_embedding AS name_embedding, + nb.id AS id, + nb.name AS name, + nb.name_embedding AS name_embedding, nb.activation_value AS activation_value, CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id """ @@ -1139,6 +1130,15 @@ RETURN e.id AS id, e.name AS name, e.entity_type AS entity_type, ORDER BY coalesce(e.activation_value, 0) DESC """ +GET_ALL_COMMUNITY_MEMBERS_BATCH = """ +MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community) +WHERE c.community_id IN $community_ids +RETURN c.community_id AS community_id, + e.id AS id, + e.name_embedding AS name_embedding, + e.activation_value AS activation_value +""" + CHECK_USER_HAS_COMMUNITIES = """ MATCH (c:Community {end_user_id: $end_user_id}) RETURN count(c) AS community_count From 6d8b1aede481b53b4f4fbe7b1c686dad532f59df Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 12 Mar 2026 20:27:50 +0800 Subject: [PATCH 16/42] [add] Create the attribute values of the community nodes --- .../core/memory/agent/utils/write_tools.py | 4 +- .../clustering_engine/label_propagation.py | 83 ++++++++++++++++++- .../neo4j/community_repository.py | 24 ++++++ api/app/repositories/neo4j/cypher_queries.py | 9 ++ api/app/repositories/neo4j/graph_saver.py | 20 +++-- 5 files changed, 132 insertions(+), 8 deletions(-) diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 22030278..b3707083 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -165,7 +165,9 @@ async def write( statement_chunk_edges=all_statement_chunk_edges, statement_entity_edges=all_statement_entity_edges, entity_edges=all_entity_entity_edges, - connector=neo4j_connector + connector=neo4j_connector, + config_id=config_id, + llm_model_id=str(memory_config.llm_model_id) if memory_config.llm_model_id else None, ) if success: logger.info("Successfully saved all data to Neo4j") diff --git a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py index cb6e5804..251d4fea 100644 --- a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py +++ b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py @@ -19,6 +19,8 @@ logger = logging.getLogger(__name__) # 全量迭代最大轮数,防止不收敛 MAX_ITERATIONS = 10 +# 社区摘要核心实体数量 +CORE_ENTITY_LIMIT = 5 def _cosine_similarity(v1: Optional[List[float]], v2: Optional[List[float]]) -> float: @@ -62,9 +64,16 @@ def _weighted_vote( class LabelPropagationEngine: """标签传播聚类引擎""" - def __init__(self, connector: Neo4jConnector): + def __init__( + self, + connector: Neo4jConnector, + config_id: Optional[str] = None, + llm_model_id: Optional[str] = None, + ): self.connector = connector self.repo = CommunityRepository(connector) + self.config_id = config_id + self.llm_model_id = llm_model_id # ────────────────────────────────────────────────────────────────────────── # 公开接口 @@ -155,6 +164,10 @@ class LabelPropagationEngine: f"[Clustering] 全量聚类完成,合并前 {pre_merge_count} 个社区," f"{len(labels)} 个实体" ) + # 为所有社区生成元数据 + unique_communities = list(set(labels.values())) + for cid in unique_communities: + await self._generate_community_metadata(cid, end_user_id) async def incremental_update( self, new_entity_ids: List[str], end_user_id: str @@ -211,6 +224,7 @@ class LabelPropagationEngine: logger.debug( f"[Clustering] 新实体 {entity_id} 与 {len(neighbors)} 个无社区邻居 → 新社区 {new_cid}" ) + await self._generate_community_metadata(new_cid, end_user_id) else: # 加入得票最多的社区 await self.repo.assign_entity_to_community(entity_id, target_cid, end_user_id) @@ -222,6 +236,7 @@ class LabelPropagationEngine: await self._evaluate_merge( list(community_ids_in_neighbors), end_user_id ) + await self._generate_community_metadata(target_cid, end_user_id) async def _evaluate_merge( self, community_ids: List[str], end_user_id: str @@ -354,6 +369,72 @@ class LabelPropagationEngine: except Exception: return None + async def _generate_community_metadata( + self, community_id: str, end_user_id: str + ) -> None: + """ + 为社区生成并写入元数据:名称、摘要、核心实体。 + + - core_entities:按 activation_value 排序取 top-N 实体名称列表(无需 LLM) + - name / summary:若有 llm_model_id 则调用 LLM 生成,否则用实体名称拼接兜底 + """ + try: + members = await self.repo.get_community_members(community_id, end_user_id) + if not members: + return + + # 核心实体:按 activation_value 降序取 top-N + sorted_members = sorted( + members, + key=lambda m: m.get("activation_value") or 0, + reverse=True, + ) + core_entities = [m["name"] for m in sorted_members[:CORE_ENTITY_LIMIT] if m.get("name")] + all_names = [m["name"] for m in members if m.get("name")] + + name = "、".join(core_entities[:3]) if core_entities else community_id[:8] + summary = f"包含实体:{', '.join(all_names)}" + + # 若有 LLM 配置,调用 LLM 生成更好的名称和摘要 + if self.llm_model_id: + try: + from app.db import get_db_context + from app.core.memory.utils.llm.llm_utils import MemoryClientFactory + + entity_list_str = "、".join(all_names) + prompt = ( + f"以下是一组语义相关的实体:{entity_list_str}\n\n" + f"请为这组实体所代表的主题:\n" + f"1. 起一个简洁的中文名称(不超过10个字)\n" + f"2. 写一句话摘要(不超过50个字)\n\n" + f"严格按以下格式输出,不要有其他内容:\n" + f"名称:<名称>\n摘要:<摘要>" + ) + with get_db_context() as db: + factory = MemoryClientFactory(db) + llm_client = factory.get_llm_client(self.llm_model_id) + response = await llm_client.chat([{"role": "user", "content": prompt}]) + text = response.content if hasattr(response, "content") else str(response) + + for line in text.strip().splitlines(): + if line.startswith("名称:"): + name = line[3:].strip() + elif line.startswith("摘要:"): + summary = line[3:].strip() + except Exception as e: + logger.warning(f"[Clustering] LLM 生成社区元数据失败,使用兜底值: {e}") + + await self.repo.update_community_metadata( + community_id=community_id, + end_user_id=end_user_id, + name=name, + summary=summary, + core_entities=core_entities, + ) + logger.debug(f"[Clustering] 社区 {community_id} 元数据已更新: name={name}") + except Exception as e: + logger.error(f"[Clustering] _generate_community_metadata failed for {community_id}: {e}") + @staticmethod def _new_community_id() -> str: return str(uuid.uuid4()) diff --git a/api/app/repositories/neo4j/community_repository.py b/api/app/repositories/neo4j/community_repository.py index 2a1f4f2b..6c5c7618 100644 --- a/api/app/repositories/neo4j/community_repository.py +++ b/api/app/repositories/neo4j/community_repository.py @@ -17,6 +17,7 @@ from app.repositories.neo4j.cypher_queries import ( GET_ALL_COMMUNITY_MEMBERS_BATCH, CHECK_USER_HAS_COMMUNITIES, UPDATE_COMMUNITY_MEMBER_COUNT, + UPDATE_COMMUNITY_METADATA, ) logger = logging.getLogger(__name__) @@ -147,3 +148,26 @@ class CommunityRepository: except Exception as e: logger.error(f"refresh_member_count failed: {e}") return 0 + + async def update_community_metadata( + self, + community_id: str, + end_user_id: str, + name: str, + summary: str, + core_entities: List[str], + ) -> bool: + """更新社区的名称、摘要和核心实体列表。""" + try: + result = await self.connector.execute_query( + UPDATE_COMMUNITY_METADATA, + community_id=community_id, + end_user_id=end_user_id, + name=name, + summary=summary, + core_entities=core_entities, + ) + return bool(result) + except Exception as e: + logger.error(f"update_community_metadata failed: {e}") + return False diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 84889d65..b270ed64 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -1150,3 +1150,12 @@ WITH c, count(e) AS cnt SET c.member_count = cnt RETURN c.community_id AS community_id, cnt AS member_count """ + +UPDATE_COMMUNITY_METADATA = """ +MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) +SET c.name = $name, + c.summary = $summary, + c.core_entities = $core_entities, + c.updated_at = datetime() +RETURN c.community_id AS community_id +""" diff --git a/api/app/repositories/neo4j/graph_saver.py b/api/app/repositories/neo4j/graph_saver.py index a94bc23b..2ef9bafc 100644 --- a/api/app/repositories/neo4j/graph_saver.py +++ b/api/app/repositories/neo4j/graph_saver.py @@ -1,5 +1,6 @@ import asyncio -from typing import List +import os +from typing import List, Optional # 使用新的仓储层 from app.repositories.neo4j.neo4j_connector import Neo4jConnector @@ -156,7 +157,9 @@ async def save_dialog_and_statements_to_neo4j( entity_edges: List[EntityEntityEdge], statement_chunk_edges: List[StatementChunkEdge], statement_entity_edges: List[StatementEntityEdge], - connector: Neo4jConnector + connector: Neo4jConnector, + config_id: Optional[str] = None, + llm_model_id: Optional[str] = None, ) -> bool: """Save dialogue nodes, chunk nodes, statement nodes, entities, and all relationships to Neo4j using graph models. @@ -290,12 +293,15 @@ async def save_dialog_and_statements_to_neo4j( logger.info("Transaction completed. Summary: %s", summary) logger.debug("Full transaction results: %r", results) - # 写入成功后,触发聚类 - if entity_nodes: + # 写入成功后,触发聚类(可通过环境变量 CLUSTERING_ENABLED=false 禁用,用于基准测试对比) + clustering_enabled = os.getenv("CLUSTERING_ENABLED", "true").lower() != "false" + if entity_nodes and clustering_enabled: end_user_id = entity_nodes[0].end_user_id new_entity_ids = [e.id for e in entity_nodes] logger.info(f"[Clustering] 准备触发聚类,实体数: {len(new_entity_ids)}, end_user_id: {end_user_id}") - await _trigger_clustering(new_entity_ids, end_user_id) + asyncio.create_task(_trigger_clustering(new_entity_ids, end_user_id, config_id=config_id, llm_model_id=llm_model_id)) + elif entity_nodes and not clustering_enabled: + logger.info("[Clustering] 聚类已禁用(CLUSTERING_ENABLED=false),跳过聚类触发") return True @@ -309,6 +315,8 @@ async def save_dialog_and_statements_to_neo4j( async def _trigger_clustering( new_entity_ids: List[str], end_user_id: str, + config_id: Optional[str] = None, + llm_model_id: Optional[str] = None, ) -> None: """ 聚类触发函数,自动判断全量初始化还是增量更新。 @@ -318,7 +326,7 @@ async def _trigger_clustering( from app.core.memory.storage_services.clustering_engine import LabelPropagationEngine logger.info(f"[Clustering] 开始聚类,end_user_id={end_user_id}, 实体数={len(new_entity_ids)}") connector = Neo4jConnector() - engine = LabelPropagationEngine(connector) + engine = LabelPropagationEngine(connector, config_id=config_id, llm_model_id=llm_model_id) await engine.run(end_user_id=end_user_id, new_entity_ids=new_entity_ids) logger.info(f"[Clustering] 聚类完成,end_user_id={end_user_id}") except Exception as e: From 967139cea4e6527f5d823b18c49702b40b187a8b Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Fri, 13 Mar 2026 12:59:36 +0800 Subject: [PATCH 17/42] [add] Community node interface development --- .../controllers/user_memory_controllers.py | 37 ++++ api/app/services/user_memory_service.py | 160 ++++++++++++++++++ 2 files changed, 197 insertions(+) diff --git a/api/app/controllers/user_memory_controllers.py b/api/app/controllers/user_memory_controllers.py index d3fe7d83..be796ff9 100644 --- a/api/app/controllers/user_memory_controllers.py +++ b/api/app/controllers/user_memory_controllers.py @@ -17,6 +17,7 @@ from app.services.user_memory_service import ( UserMemoryService, analytics_memory_types, analytics_graph_data, + analytics_community_graph_data, ) from app.services.memory_entity_relationship_service import MemoryEntityService,MemoryEmotion,MemoryInteraction from app.schemas.response_schema import ApiResponse @@ -295,6 +296,42 @@ async def get_graph_data_api( return fail(BizCode.INTERNAL_ERROR, "图数据查询失败", str(e)) +@router.get("/analytics/community_graph", response_model=ApiResponse) +async def get_community_graph_data_api( + end_user_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +) -> dict: + workspace_id = current_user.current_workspace_id + + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询社区图谱但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"社区图谱查询请求: end_user_id={end_user_id}, user={current_user.username}, " + f"workspace={workspace_id}" + ) + + try: + result = await analytics_community_graph_data(db=db, end_user_id=end_user_id) + + if "message" in result and result["statistics"]["total_nodes"] == 0: + api_logger.warning(f"社区图谱查询返回空结果: {result.get('message')}") + return success(data=result, msg=result.get("message", "查询成功")) + + api_logger.info( + f"成功获取社区图谱: end_user_id={end_user_id}, " + f"nodes={result['statistics']['total_nodes']}, " + f"edges={result['statistics']['total_edges']}" + ) + return success(data=result, msg="查询成功") + + except Exception as e: + api_logger.error(f"社区图谱查询失败: end_user_id={end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "社区图谱查询失败", str(e)) + + @router.get("/read_end_user/profile", response_model=ApiResponse) async def get_end_user_profile( end_user_id: str, diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index 8bacc112..d21df064 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -1727,6 +1727,166 @@ async def analytics_graph_data( # 辅助函数 +async def analytics_community_graph_data( + db: Session, + end_user_id: str, +) -> Dict[str, Any]: + """ + 获取社区图谱数据,包含 Community 节点、ExtractedEntity 节点及其关系。 + + Returns: + 包含 nodes、edges、statistics 的字典,格式与 analytics_graph_data 一致 + """ + try: + user_uuid = uuid.UUID(end_user_id) + repo = EndUserRepository(db) + end_user = repo.get_by_id(user_uuid) + if not end_user: + return { + "nodes": [], "edges": [], + "statistics": {"total_nodes": 0, "total_edges": 0, "node_types": {}, "edge_types": {}}, + "message": "用户不存在" + } + + # 查询社区节点、实体节点、BELONGS_TO_COMMUNITY 边、实体间关系 + cypher = """ + MATCH (c:Community {end_user_id: $end_user_id}) + MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[b:BELONGS_TO_COMMUNITY]->(c) + OPTIONAL MATCH (e)-[r:EXTRACTED_RELATIONSHIP]-(e2:ExtractedEntity {end_user_id: $end_user_id}) + RETURN + elementId(c) AS c_id, + properties(c) AS c_props, + elementId(e) AS e_id, + properties(e) AS e_props, + elementId(b) AS b_id, + elementId(e2) AS e2_id, + properties(e2) AS e2_props, + elementId(r) AS r_id, + type(r) AS r_type, + properties(r) AS r_props, + startNode(r) = e AS r_from_e + """ + rows = await _neo4j_connector.execute_query(cypher, end_user_id=end_user_id) + + nodes_map: Dict[str, dict] = {} + edges_map: Dict[str, dict] = {} + # 记录每个 Community 对应的实体 id 列表 + community_members: Dict[str, list] = {} + + for row in rows: + # Community 节点 + c_id = row["c_id"] + if c_id and c_id not in nodes_map: + raw = row["c_props"] or {} + props = {k: _clean_neo4j_value(raw.get(k)) for k in ( + "community_id", "end_user_id", "member_count", "updated_at", + "name", "summary", "core_entities", + ) if k in raw} + nodes_map[c_id] = { + "id": c_id, + "label": "Community", + "properties": props, + } + + # ExtractedEntity 节点 (e) + e_id = row["e_id"] + if e_id and e_id not in nodes_map: + raw = row["e_props"] or {} + props = {k: _clean_neo4j_value(raw.get(k)) for k in ( + "name", "end_user_id", "description", "created_at", "entity_type", + ) if k in raw} + # 注入所属社区名称(c 是 e 直接归属的社区) + c_raw = row["c_props"] or {} + props["community_name"] = _clean_neo4j_value(c_raw.get("name")) or "" + nodes_map[e_id] = { + "id": e_id, + "label": "ExtractedEntity", + "properties": props, + } + + # ExtractedEntity 节点 (e2,可选) + e2_id = row.get("e2_id") + if e2_id and e2_id not in nodes_map: + raw = row["e2_props"] or {} + props = {k: _clean_neo4j_value(raw.get(k)) for k in ( + "name", "end_user_id", "description", "created_at", "entity_type", + ) if k in raw} + # e2 的社区归属在后处理阶段通过 community_members 补充 + props["community_name"] = "" + nodes_map[e2_id] = { + "id": e2_id, + "label": "ExtractedEntity", + "properties": props, + } + + # BELONGS_TO_COMMUNITY 边 + b_id = row["b_id"] + if b_id and b_id not in edges_map: + edges_map[b_id] = { + "id": b_id, + "source": e_id, + "target": c_id, + } + # 收集社区成员 id + if c_id and e_id: + community_members.setdefault(c_id, []) + if e_id not in community_members[c_id]: + community_members[c_id].append(e_id) + + # EXTRACTED_RELATIONSHIP 边(可选) + r_id = row.get("r_id") + if r_id and r_id not in edges_map and e2_id: + r_props = {k: _clean_neo4j_value(v) for k, v in (row["r_props"] or {}).items()} + source = e_id if row.get("r_from_e") else e2_id + target = e2_id if row.get("r_from_e") else e_id + edges_map[r_id] = { + "id": r_id, + "source": source, + "target": target, + } + + nodes = list(nodes_map.values()) + edges = list(edges_map.values()) + + # 为每个 Community 节点注入 member_entity_ids,同时补全 e2 节点的 community_name + for c_id, member_ids in community_members.items(): + c_node = nodes_map.get(c_id) + if c_node: + c_node["properties"]["member_entity_ids"] = member_ids + c_name = c_node["properties"].get("name") or "" + # 补全属于该社区但 community_name 为空的实体(即 e2 节点) + for eid in member_ids: + e_node = nodes_map.get(eid) + if e_node and e_node["label"] == "ExtractedEntity": + if not e_node["properties"].get("community_name"): + e_node["properties"]["community_name"] = c_name + + node_type_counts: Dict[str, int] = {} + for n in nodes: + node_type_counts[n["label"]] = node_type_counts.get(n["label"], 0) + 1 + + return { + "nodes": nodes, + "edges": edges, + "statistics": { + "total_nodes": len(nodes), + "total_edges": len(edges), + "node_types": node_type_counts, + } + } + + except ValueError: + logger.error(f"无效的 end_user_id 格式: {end_user_id}") + return { + "nodes": [], "edges": [], + "statistics": {"total_nodes": 0, "total_edges": 0, "node_types": {}, "edge_types": {}}, + "message": "无效的用户ID格式" + } + except Exception as e: + logger.error(f"获取社区图谱数据失败: {str(e)}", exc_info=True) + raise + + async def _extract_node_properties(label: str, properties: Dict[str, Any],node_id: str) -> Dict[str, Any]: """ 根据节点类型提取需要的属性字段 From 668539e737c43f79330ca5652181ba5de16f1fa9 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Fri, 13 Mar 2026 15:47:53 +0800 Subject: [PATCH 18/42] [add] Selective merge submission --- api/app/celery_app.py | 1 + .../memory_dashboard_controller.py | 14 +- .../clustering_engine/label_propagation.py | 70 +++++++-- api/app/tasks.py | 138 ++++++++++++++++++ 4 files changed, 206 insertions(+), 17 deletions(-) diff --git a/api/app/celery_app.py b/api/app/celery_app.py index 60c22855..e77ed683 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -116,6 +116,7 @@ celery_app.conf.update( 'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'}, 'app.tasks.init_implicit_emotions_for_users': {'queue': 'periodic_tasks'}, 'app.tasks.init_interest_distribution_for_users': {'queue': 'periodic_tasks'}, + 'app.tasks.init_community_clustering_for_users': {'queue': 'periodic_tasks'}, }, ) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 3bbb5cf7..ce32a519 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -193,7 +193,19 @@ async def get_workspace_end_users( await aio_redis_set(cache_key, json.dumps(result), expire=30) except Exception as e: api_logger.warning(f"Redis 缓存写入失败: {str(e)}") - + + # 触发社区聚类补全任务(异步,不阻塞接口响应) + # 对有 ExtractedEntity 但无 Community 节点的存量用户自动补跑全量聚类 + try: + from app.tasks import init_community_clustering_for_users + init_community_clustering_for_users.apply_async( + kwargs={"end_user_ids": end_user_ids}, + queue="periodic_tasks", + ) + api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}") + except Exception as e: + api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}") + api_logger.info(f"成功获取 {len(end_users)} 个宿主记录") return success(data=result, msg="宿主列表获取成功") diff --git a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py index 251d4fea..4491b416 100644 --- a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py +++ b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py @@ -165,8 +165,15 @@ class LabelPropagationEngine: f"{len(labels)} 个实体" ) # 为所有社区生成元数据 - unique_communities = list(set(labels.values())) - for cid in unique_communities: + # 注意:_evaluate_merge 后部分社区已被合并消解,需重新从 Neo4j 查询实际存活的社区 + # 不能复用 labels.values(),那里包含已被 dissolve 的旧社区 ID + surviving_communities = await self.repo.get_all_entities(end_user_id) + surviving_community_ids = list({ + e.get("community_id") for e in surviving_communities + if e.get("community_id") + }) + logger.info(f"[Clustering] 合并后实际存活社区数: {len(surviving_community_ids)}") + for cid in surviving_community_ids: await self._generate_community_metadata(cid, end_user_id) async def incremental_update( @@ -249,7 +256,7 @@ class LabelPropagationEngine: 全量场景(社区数 > 20)使用批量查询,避免 N 次数据库往返。 """ - MERGE_THRESHOLD = 0.75 + MERGE_THRESHOLD = 0.85 BATCH_THRESHOLD = 20 # 超过此数量走批量查询 community_embeddings: Dict[str, Optional[List[float]]] = {} @@ -305,34 +312,65 @@ class LabelPropagationEngine: logger.info(f"[Clustering] 发现 {len(to_merge)} 对可合并社区") - # 执行合并:用 union-find 思路避免重复迁移已被合并的社区 - # 维护一个 canonical 映射,确保链式合并正确收敛 - canonical: Dict[str, str] = {cid: cid for cid in cids} + # 执行合并:逐对处理,每次合并后重新计算合并社区的平均向量 + # 避免 union-find 链式传递导致语义不相关的社区被间接合并 + # (A≈B、B≈C 不代表 A≈C,不能因传递性把 A/B/C 全部合并) + merged_into: Dict[str, str] = {} # dissolve → keep 的最终映射 - def find(x: str) -> str: - while canonical[x] != x: - canonical[x] = canonical[canonical[x]] - x = canonical[x] + def get_root(x: str) -> str: + """路径压缩,找到 x 当前所属的根社区。""" + while x in merged_into: + merged_into[x] = merged_into.get(merged_into[x], merged_into[x]) + x = merged_into[x] return x for c1, c2 in to_merge: - root1, root2 = find(c1), find(c2) + root1, root2 = get_root(c1), get_root(c2) if root1 == root2: - continue # 已经在同一社区,跳过 + continue + + # 用合并后的最新平均向量重新验证相似度 + # 防止链式传递:A≈B 合并后 B 的向量已更新,C 必须和新 B 相似才能合并 + current_sim = _cosine_similarity( + community_embeddings.get(root1), + community_embeddings.get(root2), + ) + if current_sim <= MERGE_THRESHOLD: + # 合并后向量已漂移,不再满足阈值,跳过 + logger.debug( + f"[Clustering] 跳过合并 {root1} ↔ {root2}," + f"当前相似度 {current_sim:.3f} ≤ {MERGE_THRESHOLD}" + ) + continue + keep = root1 if community_sizes.get(root1, 0) >= community_sizes.get(root2, 0) else root2 dissolve = root2 if keep == root1 else root1 - canonical[dissolve] = keep + merged_into[dissolve] = keep members = await self.repo.get_community_members(dissolve, end_user_id) for m in members: await self.repo.assign_entity_to_community(m["id"], keep, end_user_id) - # 更新 sizes 以便后续合并决策准确 - community_sizes[keep] = community_sizes.get(keep, 0) + len(members) + + # 合并后重新计算 keep 的平均向量(加权平均) + keep_emb = community_embeddings.get(keep) + dissolve_emb = community_embeddings.get(dissolve) + keep_size = community_sizes.get(keep, 0) + dissolve_size = community_sizes.get(dissolve, 0) + total_size = keep_size + dissolve_size + if keep_emb and dissolve_emb and total_size > 0: + dim = len(keep_emb) + community_embeddings[keep] = [ + (keep_emb[i] * keep_size + dissolve_emb[i] * dissolve_size) / total_size + for i in range(dim) + ] + community_embeddings[dissolve] = None + + community_sizes[keep] = total_size community_sizes[dissolve] = 0 await self.repo.refresh_member_count(keep, end_user_id) logger.info( f"[Clustering] 社区合并: {dissolve} → {keep}," - f"迁移 {len(members)} 个成员" + f"相似度={current_sim:.3f},迁移 {len(members)} 个成员" ) async def _flush_labels( diff --git a/api/app/tasks.py b/api/app/tasks.py index 5e1550bd..f737bb48 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2662,3 +2662,141 @@ def write_perceptual_memory( file_url, file_message, )) + + +# ============================================================================= +# 社区聚类补全任务(触发型) +# ============================================================================= + +@celery_app.task( + name="app.tasks.init_community_clustering_for_users", + bind=True, + ignore_result=False, + max_retries=0, + acks_late=False, + time_limit=7200, # 2小时硬超时 + soft_time_limit=6900, +) +def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]: + """触发型任务:检查指定用户列表,对有 ExtractedEntity 但无 Community 节点的用户执行全量聚类。 + + 由 /dashboard/end_users 接口触发,已有社区节点的用户直接跳过。 + + Args: + end_user_ids: 需要检查的用户 ID 列表 + + Returns: + 包含任务执行结果的字典 + """ + start_time = time.time() + + async def _run() -> Dict[str, Any]: + from app.core.logging_config import get_logger + from app.repositories.neo4j.community_repository import CommunityRepository + from app.repositories.neo4j.neo4j_connector import Neo4jConnector + from app.core.memory.storage_services.clustering_engine.label_propagation import LabelPropagationEngine + + logger = get_logger(__name__) + logger.info(f"[CommunityCluster] 开始社区聚类补全任务,候选用户数: {len(end_user_ids)}") + + initialized = 0 + skipped = 0 + failed = 0 + + connector = Neo4jConnector() + try: + repo = CommunityRepository(connector) + + # 获取 llm_model_id(从第一个用户的配置中读取,作为全局兜底) + llm_model_id = None + try: + with get_db_context() as db: + from app.services.memory_agent_service import get_end_user_connected_config + from app.services.memory_config_service import MemoryConfigService + for uid in end_user_ids: + try: + connected = get_end_user_connected_config(uid, db) + config_id = connected.get("memory_config_id") + workspace_id = connected.get("workspace_id") + if config_id or workspace_id: + cfg = MemoryConfigService(db).load_memory_config( + config_id=config_id, workspace_id=workspace_id + ) + llm_model_id = str(cfg.llm_model_id) + break + except Exception: + continue + except Exception as e: + logger.warning(f"[CommunityCluster] 获取 LLM 配置失败,将使用兜底值: {e}") + + engine = LabelPropagationEngine( + connector=connector, + llm_model_id=llm_model_id, + ) + + for end_user_id in end_user_ids: + try: + # 已有社区节点则跳过 + has_communities = await repo.has_communities(end_user_id) + if has_communities: + skipped += 1 + logger.debug(f"[CommunityCluster] 用户 {end_user_id} 已有社区节点,跳过") + continue + + # 检查是否有 ExtractedEntity 节点 + entities = await repo.get_all_entities(end_user_id) + if not entities: + skipped += 1 + logger.debug(f"[CommunityCluster] 用户 {end_user_id} 无实体节点,跳过") + continue + + logger.info(f"[CommunityCluster] 用户 {end_user_id} 有 {len(entities)} 个实体,开始全量聚类") + await engine.full_clustering(end_user_id) + initialized += 1 + logger.info(f"[CommunityCluster] 用户 {end_user_id} 聚类完成") + + except Exception as e: + failed += 1 + logger.error(f"[CommunityCluster] 用户 {end_user_id} 聚类失败: {e}") + + finally: + await connector.close() + + logger.info( + f"[CommunityCluster] 任务完成: 初始化={initialized}, 跳过={skipped}, 失败={failed}" + ) + return { + "status": "SUCCESS", + "initialized": initialized, + "skipped": skipped, + "failed": failed, + } + + try: + try: + import nest_asyncio + nest_asyncio.apply() + except ImportError: + pass + + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + result = loop.run_until_complete(_run()) + result["elapsed_time"] = time.time() - start_time + result["task_id"] = self.request.id + return result + + except Exception as e: + return { + "status": "FAILURE", + "error": str(e), + "elapsed_time": time.time() - start_time, + "task_id": self.request.id, + } From 5141a9104187f4c6b29ded29ed0ec9646a78ef0f Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Fri, 13 Mar 2026 15:52:38 +0800 Subject: [PATCH 19/42] [changes] --- .../clustering_engine/label_propagation.py | 8 +++++- .../neo4j/community_repository.py | 21 ++++++++++++++++ api/app/repositories/neo4j/cypher_queries.py | 25 +++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py index 4491b416..cbc303b1 100644 --- a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py +++ b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py @@ -121,12 +121,18 @@ class LabelPropagationEngine: e["id"]: e.get("name_embedding") for e in entities } + # 预加载所有实体的邻居,避免迭代内 O(iterations * |E|) 次 Neo4j 往返 + logger.info(f"[Clustering] 预加载 {len(entities)} 个实体的邻居图...") + neighbors_cache: Dict[str, List[Dict]] = await self.repo.get_all_entity_neighbors_batch(end_user_id) + logger.info(f"[Clustering] 邻居预加载完成,覆盖实体数: {len(neighbors_cache)}") + for iteration in range(MAX_ITERATIONS): changed = 0 # 随机顺序(Python dict 在 3.7+ 保持插入顺序,这里直接遍历) for entity in entities: eid = entity["id"] - neighbors = await self.repo.get_entity_neighbors(eid, end_user_id) + # 直接从缓存取邻居,不再发起 Neo4j 查询 + neighbors = neighbors_cache.get(eid, []) # 将邻居的当前内存标签注入(覆盖 Neo4j 中的旧值) enriched = [] diff --git a/api/app/repositories/neo4j/community_repository.py b/api/app/repositories/neo4j/community_repository.py index 6c5c7618..f2f11f76 100644 --- a/api/app/repositories/neo4j/community_repository.py +++ b/api/app/repositories/neo4j/community_repository.py @@ -15,6 +15,7 @@ from app.repositories.neo4j.cypher_queries import ( GET_ALL_ENTITIES_FOR_USER, GET_COMMUNITY_MEMBERS, GET_ALL_COMMUNITY_MEMBERS_BATCH, + GET_ALL_ENTITY_NEIGHBORS_BATCH, CHECK_USER_HAS_COMMUNITIES, UPDATE_COMMUNITY_MEMBER_COUNT, UPDATE_COMMUNITY_METADATA, @@ -78,6 +79,26 @@ class CommunityRepository: logger.error(f"get_entity_neighbors failed: {e}") return [] + async def get_all_entity_neighbors_batch( + self, end_user_id: str + ) -> Dict[str, List[Dict]]: + """一次性批量拉取该用户下所有实体的邻居,返回 {entity_id: [neighbors]} 字典。 + 用于全量聚类预加载,避免每个实体单独查询。""" + try: + rows = await self.connector.execute_query( + GET_ALL_ENTITY_NEIGHBORS_BATCH, + end_user_id=end_user_id, + ) + result: Dict[str, List[Dict]] = {} + for row in rows: + eid = row["entity_id"] + neighbor = {k: v for k, v in row.items() if k != "entity_id"} + result.setdefault(eid, []).append(neighbor) + return result + except Exception as e: + logger.error(f"get_all_entity_neighbors_batch failed: {e}") + return {} + async def get_all_entities(self, end_user_id: str) -> List[Dict]: """拉取某用户下所有实体及其当前社区归属。""" try: diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index b270ed64..3487e55e 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -1159,3 +1159,28 @@ SET c.name = $name, c.updated_at = datetime() RETURN c.community_id AS community_id """ + +GET_ALL_ENTITY_NEIGHBORS_BATCH = """ +// 批量拉取某用户下所有实体的邻居(用于全量聚类预加载) +MATCH (e:ExtractedEntity {end_user_id: $end_user_id}) + +// 来源一:直接关系邻居 +OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb1:ExtractedEntity {end_user_id: $end_user_id}) + +// 来源二:同 Statement 共现邻居 +OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) +OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(nb2:ExtractedEntity {end_user_id: $end_user_id}) +WHERE nb2.id <> e.id + +WITH e, collect(DISTINCT nb1) + collect(DISTINCT nb2) AS all_neighbors +UNWIND all_neighbors AS nb +WITH e, nb WHERE nb IS NOT NULL +OPTIONAL MATCH (nb)-[:BELONGS_TO_COMMUNITY]->(c:Community) +RETURN DISTINCT + e.id AS entity_id, + nb.id AS id, + nb.name AS name, + nb.name_embedding AS name_embedding, + nb.activation_value AS activation_value, + CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id +""" From c354618e2004407f875f46db53be6f97cecd1508 Mon Sep 17 00:00:00 2001 From: wxy Date: Fri, 13 Mar 2026 15:57:23 +0800 Subject: [PATCH 20/42] feat(app): add shared_only filter and batch unshare endpoints --- api/app/controllers/app_controller.py | 36 ++++++++++++ api/app/services/app_service.py | 80 ++++++++++++++++++++++++++- 2 files changed, 114 insertions(+), 2 deletions(-) diff --git a/api/app/controllers/app_controller.py b/api/app/controllers/app_controller.py index 5d1551d8..73ff7279 100644 --- a/api/app/controllers/app_controller.py +++ b/api/app/controllers/app_controller.py @@ -53,6 +53,7 @@ def list_apps( status: str | None = None, search: str | None = None, include_shared: bool = True, + shared_only: bool = False, page: int = 1, pagesize: int = 10, ids: Optional[str] = None, @@ -84,6 +85,7 @@ def list_apps( status=status, search=search, include_shared=include_shared, + shared_only=shared_only, page=page, pagesize=pagesize, ) @@ -107,6 +109,23 @@ def list_my_shared_out( return success(data=data) +@router.delete("/share/{target_workspace_id}", summary="取消对某工作空间的所有应用分享") +@cur_workspace_access_guard() +def unshare_all_apps_to_workspace( + target_workspace_id: uuid.UUID, + db: Session = Depends(get_db), + current_user=Depends(get_current_user), +): + """Cancel all app shares from current workspace to a target workspace.""" + workspace_id = current_user.current_workspace_id + service = app_service.AppService(db) + count = service.unshare_all_apps_to_workspace( + target_workspace_id=target_workspace_id, + workspace_id=workspace_id + ) + return success(msg=f"已取消 {count} 个应用的分享") + + @router.get("/{app_id}", summary="获取应用详情") @cur_workspace_access_guard() def get_app( @@ -397,6 +416,23 @@ def list_app_shares( return success(data=data) +@router.delete("/shared/{source_workspace_id}", summary="批量移除某来源工作空间的所有共享应用") +@cur_workspace_access_guard() +def remove_all_shared_apps_from_workspace( + source_workspace_id: uuid.UUID, + db: Session = Depends(get_db), + current_user=Depends(get_current_user), +): + """Remove all shared apps from a specific source workspace (recipient operation).""" + workspace_id = current_user.current_workspace_id + service = app_service.AppService(db) + count = service.remove_all_shared_apps_from_workspace( + source_workspace_id=source_workspace_id, + workspace_id=workspace_id + ) + return success(msg=f"已移除 {count} 个共享应用") + + @router.delete("/{app_id}/shared", summary="移除共享给我的应用") @cur_workspace_access_guard() def remove_shared_app( diff --git a/api/app/services/app_service.py b/api/app/services/app_service.py index 2326be6e..d83f88d5 100644 --- a/api/app/services/app_service.py +++ b/api/app/services/app_service.py @@ -12,7 +12,7 @@ import uuid from typing import Annotated, Any, Dict, List, Optional, Tuple from fastapi import Depends -from sqlalchemy import and_, func, or_, select +from sqlalchemy import and_, delete, func, or_, select from sqlalchemy.orm import Session from app.core.error_codes import BizCode @@ -878,6 +878,7 @@ class AppService: status: Optional[str] = None, search: Optional[str] = None, include_shared: bool = True, + shared_only: bool = False, page: int = 1, pagesize: int = 10, ) -> Tuple[List[App], int]: @@ -924,7 +925,14 @@ class AppService: filters.append(func.lower(App.name).like(f"%{search.lower()}%")) # 基础查询:本工作空间的应用 - if include_shared: + if shared_only: + # 只返回共享给本工作空间的应用,不含自有应用 + shared_app_ids_stmt = ( + select(AppShare.source_app_id) + .where(AppShare.target_workspace_id == workspace_id) + ) + stmt = select(App).where(App.id.in_(shared_app_ids_stmt)) + elif include_shared: # 查询本工作空间的应用 + 分享给本工作空间的应用 # 使用 OR 条件:workspace_id = current OR app_id IN (shared apps) @@ -1891,6 +1899,39 @@ class AppService: extra={"app_id": str(app_id), "target_workspace_id": str(target_workspace_id)} ) + def unshare_all_apps_to_workspace( + self, + *, + target_workspace_id: uuid.UUID, + workspace_id: uuid.UUID + ) -> int: + """Cancel all app shares from current workspace to a target workspace. + + Args: + target_workspace_id: Target workspace ID to cancel all shares to + workspace_id: Current workspace ID (source) + + Returns: + Number of share records deleted + """ + from app.models import AppShare + + logger.info( + "取消对目标工作空间的所有应用分享", + extra={"target_workspace_id": str(target_workspace_id), "workspace_id": str(workspace_id)} + ) + + stmt = delete(AppShare).where( + AppShare.source_workspace_id == workspace_id, + AppShare.target_workspace_id == target_workspace_id + ) + result = self.db.execute(stmt) + self.db.commit() + + count = result.rowcount + logger.info("已取消分享记录数", extra={"count": count}) + return count + def list_app_shares( self, *, @@ -1976,6 +2017,39 @@ class AppService: extra={"app_id": str(app_id), "workspace_id": str(workspace_id)} ) + def remove_all_shared_apps_from_workspace( + self, + *, + source_workspace_id: uuid.UUID, + workspace_id: uuid.UUID + ) -> int: + """Remove all shared apps from a specific source workspace. + + Args: + source_workspace_id: The workspace that shared the apps + workspace_id: Current workspace ID (recipient) + + Returns: + Number of share records deleted + """ + from app.models import AppShare + + logger.info( + "批量移除来源工作空间的共享应用", + extra={"source_workspace_id": str(source_workspace_id), "workspace_id": str(workspace_id)} + ) + + stmt = delete(AppShare).where( + AppShare.source_workspace_id == source_workspace_id, + AppShare.target_workspace_id == workspace_id + ) + result = self.db.execute(stmt) + self.db.commit() + + count = result.rowcount + logger.info("已移除共享记录数", extra={"count": count}) + return count + def list_my_shared_out( self, *, @@ -2139,6 +2213,7 @@ def list_apps( status: Optional[str] = None, search: Optional[str] = None, include_shared: bool = True, + shared_only: bool = False, page: int = 1, pagesize: int = 10, ) -> Tuple[List[App], int]: @@ -2151,6 +2226,7 @@ def list_apps( status=status, search=search, include_shared=include_shared, + shared_only=shared_only, page=page, pagesize=pagesize, ) From d4f2094ee01b7d795e57fd245b65d7e95cad9189 Mon Sep 17 00:00:00 2001 From: Timebomb2018 <18868801967@163.com> Date: Fri, 13 Mar 2026 16:18:46 +0800 Subject: [PATCH 21/42] fix(mcp): The token configuration modification of MCP Market needs to be verified. --- .../mcp_market_config_controller.py | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/api/app/controllers/mcp_market_config_controller.py b/api/app/controllers/mcp_market_config_controller.py index 95449310..ab4c0768 100644 --- a/api/app/controllers/mcp_market_config_controller.py +++ b/api/app/controllers/mcp_market_config_controller.py @@ -59,7 +59,7 @@ async def get_mcp_servers( api_logger.warning(f"Paging parameters exceed ModelScope limit: page={page}, pagesize={pagesize}") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail=f"page × pagesize must not exceed 100 (got {page} × {pagesize} = {page * pagesize})" + detail=f"The maximum number of MCP services can view is 100. Please visit the ModelScope MCP Plaza." ) # 2. Query mcp market config information from the database @@ -238,7 +238,26 @@ async def create_mcp_market_config( try: api_logger.debug(f"Start creating the mcp market config: {create_data.mcp_market_id}") - # 1. Check if the mcp market name already exists + # 1. Validate token can access ModelScope MCP market + if not create_data.token: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Token is required to access ModelScope MCP market" + ) + try: + api = MCPApi() + api.login(create_data.token) + body = {'filter': {}, 'page_number': 1, 'page_size': 1, 'search': None} + cookies = api.get_cookies(create_data.token) + r = api.session.put(url=api.mcp_base_url, headers=api.builder_headers(api.headers), json=body, cookies=cookies) + raise_for_http_status(r) + except Exception as e: + api_logger.warning(f"Token validation failed for ModelScope MCP market: {str(e)}") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Unable to access ModelScope MCP market with the provided token: {str(e)}" + ) + # 2. Check if the mcp market name already exists db_mcp_market_config_exist = mcp_market_config_service.get_mcp_market_config_by_mcp_market_id(db, mcp_market_id=create_data.mcp_market_id, current_user=current_user) if db_mcp_market_config_exist: api_logger.warning(f"The mcp market id already exists: {create_data.mcp_market_id}") @@ -332,7 +351,23 @@ async def update_mcp_market_config( f"The mcp market config does not exist or you do not have permission to access it: mcp_market_config_id={mcp_market_config_id}") return success(msg='The mcp market config does not exist or access is denied') - # 2. Update fields (only update non-null fields) + # 2. Validate new token if provided + if update_data.token is not None: + try: + api = MCPApi() + api.login(update_data.token) + body = {'filter': {}, 'page_number': 1, 'page_size': 1, 'search': None} + cookies = api.get_cookies(update_data.token) + r = api.session.put(url=api.mcp_base_url, headers=api.builder_headers(api.headers), json=body, cookies=cookies) + raise_for_http_status(r) + except Exception as e: + api_logger.warning(f"Token validation failed for ModelScope MCP market: {str(e)}") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Unable to access ModelScope MCP market with the provided token: {str(e)}" + ) + + # 3. Update fields (only update non-null fields) api_logger.debug(f"Start updating the mcp market config fields: {mcp_market_config_id}") update_dict = update_data.dict(exclude_unset=True) updated_fields = [] @@ -347,7 +382,7 @@ async def update_mcp_market_config( if updated_fields: api_logger.debug(f"updated fields: {', '.join(updated_fields)}") - # 3. Save to database + # 4. Save to database try: db.commit() db.refresh(db_mcp_market_config) From aab54ca1a8d6c21bfb99a341233f2c461a63a590 Mon Sep 17 00:00:00 2001 From: wxy Date: Fri, 13 Mar 2026 16:19:35 +0800 Subject: [PATCH 22/42] refactor(app): address AI review suggestions on sharing endpoints --- api/app/controllers/app_controller.py | 4 ++-- api/app/services/app_service.py | 28 +++++++++++++++++++-------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/api/app/controllers/app_controller.py b/api/app/controllers/app_controller.py index 73ff7279..63f484ca 100644 --- a/api/app/controllers/app_controller.py +++ b/api/app/controllers/app_controller.py @@ -123,7 +123,7 @@ def unshare_all_apps_to_workspace( target_workspace_id=target_workspace_id, workspace_id=workspace_id ) - return success(msg=f"已取消 {count} 个应用的分享") + return success(msg=f"已取消 {count} 个应用的分享", data={"count": count}) @router.get("/{app_id}", summary="获取应用详情") @@ -430,7 +430,7 @@ def remove_all_shared_apps_from_workspace( source_workspace_id=source_workspace_id, workspace_id=workspace_id ) - return success(msg=f"已移除 {count} 个共享应用") + return success(msg=f"已移除 {count} 个共享应用", data={"count": count}) @router.delete("/{app_id}/shared", summary="移除共享给我的应用") diff --git a/api/app/services/app_service.py b/api/app/services/app_service.py index d83f88d5..ca9a3c4a 100644 --- a/api/app/services/app_service.py +++ b/api/app/services/app_service.py @@ -924,6 +924,10 @@ class AppService: if search: filters.append(func.lower(App.name).like(f"%{search.lower()}%")) + # shared_only implies include_shared; enforce to avoid confusing API usage + if shared_only: + include_shared = True + # 基础查询:本工作空间的应用 if shared_only: # 只返回共享给本工作空间的应用,不含自有应用 @@ -1921,14 +1925,18 @@ class AppService: extra={"target_workspace_id": str(target_workspace_id), "workspace_id": str(workspace_id)} ) - stmt = delete(AppShare).where( + # Query IDs first to get a reliable count, avoiding rowcount driver inconsistencies + id_stmt = select(AppShare.id).where( AppShare.source_workspace_id == workspace_id, AppShare.target_workspace_id == target_workspace_id ) - result = self.db.execute(stmt) - self.db.commit() + ids = list(self.db.scalars(id_stmt).all()) + count = len(ids) + + if ids: + self.db.execute(delete(AppShare).where(AppShare.id.in_(ids))) + self.db.commit() - count = result.rowcount logger.info("已取消分享记录数", extra={"count": count}) return count @@ -2039,14 +2047,18 @@ class AppService: extra={"source_workspace_id": str(source_workspace_id), "workspace_id": str(workspace_id)} ) - stmt = delete(AppShare).where( + # Query IDs first to get a reliable count, avoiding rowcount driver inconsistencies + id_stmt = select(AppShare.id).where( AppShare.source_workspace_id == source_workspace_id, AppShare.target_workspace_id == workspace_id ) - result = self.db.execute(stmt) - self.db.commit() + ids = list(self.db.scalars(id_stmt).all()) + count = len(ids) + + if ids: + self.db.execute(delete(AppShare).where(AppShare.id.in_(ids))) + self.db.commit() - count = result.rowcount logger.info("已移除共享记录数", extra={"count": count}) return count From 098a2e54ae5b379cd155ee00c35791605031dd7d Mon Sep 17 00:00:00 2001 From: yujiangping Date: Fri, 13 Mar 2026 16:41:55 +0800 Subject: [PATCH 23/42] fix:loading --- web/src/i18n/en.ts | 1 + web/src/i18n/zh.ts | 1 + web/src/views/ToolManagement/Market.tsx | 3 +- .../components/MarketConfigModal.tsx | 53 +++++++++++++------ 4 files changed, 42 insertions(+), 16 deletions(-) diff --git a/web/src/i18n/en.ts b/web/src/i18n/en.ts index b470ce07..ce357e74 100644 --- a/web/src/i18n/en.ts +++ b/web/src/i18n/en.ts @@ -1990,6 +1990,7 @@ Memory Bear: After the rebellion, regional warlordism intensified for several re marketUrlPlaceholder: 'Market URL', marketCopy: 'Copy', marketApiKeyOptional: 'Optional', + marketApiKeyRequired: 'API Key is required', marketApiKeyExtra: 'Some markets require an API Key to access the full service list', marketApiKeyPlaceholder: 'Enter API Key to access more services', marketConnectionStatus: 'Connection Status', diff --git a/web/src/i18n/zh.ts b/web/src/i18n/zh.ts index 1151014b..01b1ab9e 100644 --- a/web/src/i18n/zh.ts +++ b/web/src/i18n/zh.ts @@ -1986,6 +1986,7 @@ export const zh = { marketUrlPlaceholder: '市场地址', marketCopy: '复制', marketApiKeyOptional: '可选', + marketApiKeyRequired: '请输入 API Key', marketApiKeyExtra: '部分市场需要 API Key 才能获取完整的服务列表', marketApiKeyPlaceholder: '输入 API Key 以获取更多服务', marketConnectionStatus: '连接状态', diff --git a/web/src/views/ToolManagement/Market.tsx b/web/src/views/ToolManagement/Market.tsx index 351ae8b7..e6250e71 100644 --- a/web/src/views/ToolManagement/Market.tsx +++ b/web/src/views/ToolManagement/Market.tsx @@ -255,6 +255,7 @@ const Market: React.FC<{ getStatusTag?: (status: string) => ReactNode }> = () => if (!source) return; try { const config: any = await getMarketConfig(sourceId); + console.log('获取到的配置数据:', config); marketConfigModalRef.current?.handleOpen({ ...source, connected: config?.status === 1, @@ -431,7 +432,7 @@ const Market: React.FC<{ getStatusTag?: (status: string) => ReactNode }> = () => dataLength={mcpList.length} next={loadMore} hasMore={hasMore} - loader={} + loader={null} scrollableTarget="mcpScrollableDiv" >
(null); const [showApiKey, setShowApiKey] = useState(false); + const [initialValues, setInitialValues] = useState<{ token: string }>({ token: '' }); const formValues = Form.useWatch([], form); const handleClose = () => { @@ -45,16 +46,29 @@ const MarketConfigModal = forwardRef { + console.log('Modal 接收到的数据:', source); setCurrentSource(source); - form.setFieldsValue({ - token: source.token || '', - }); + setInitialValues({ token: source.token || '' }); setVisible(true); }; + const handleAfterOpenChange = (open: boolean) => { + if (open && currentSource) { + // Modal 完全打开后再设置表单值,使用 setTimeout 确保在下一个事件循环 + setTimeout(() => { + form.setFieldsValue({ + token: currentSource.token || '', + }); + console.log('Modal 打开后设置表单值:', { token: currentSource.token || '' }); + console.log('当前表单所有值:', form.getFieldsValue()); + }, 100); + } + }; + const handleSave = () => { form .validateFields() @@ -103,7 +117,7 @@ const MarketConfigModal = forwardRef 0; + const canSave = formValues?.token?.trim().length > 0; useImperativeHandle(ref, () => ({ handleOpen, @@ -117,6 +131,7 @@ const MarketConfigModal = forwardRef
@@ -177,19 +194,25 @@ const MarketConfigModal = forwardRef } + rules={[ + { required: true, message: t('tool.marketApiKeyRequired') }, + { whitespace: true, message: t('tool.marketApiKeyRequired') } + ]} extra={{t('tool.marketApiKeyExtra')}} > - - - +
+ } + type="info" + showIcon + /> +
+ ); + } + + if (!isPreviewable()) { return ( @@ -230,23 +273,26 @@ const DocumentPreview: FC = ({ message="预览失败" description={
-

无法加载文档预览,可能的原因:

-
    -
  • 文件需要认证访问,Office 预览服务无法访问
  • -
  • 文件 URL 无法公开访问(需要配置公开访问或临时签名 URL)
  • -
  • 文件大小超过限制(Office 预览通常限制 10MB)
  • -
  • 预览服务暂时不可用
  • +

    无法加载文档预览

    + {errorMessage && ( +

    + 错误详情:{errorMessage} +

    + )} +

    可能的原因:

    +
      +
    • 文件 URL 无法访问(401/403/404)
    • +
    • 认证 token 已过期
    • +
    • 文件格式损坏或不匹配
    • +
    • 网络连接问题
    -

    建议:请下载文件到本地查看

    - {showModeSwitch && !isPdfFile() && ( - - )} +
} @@ -256,26 +302,23 @@ const DocumentPreview: FC = ({ )} - {/* 图片文件预览 */} {isImageFile() && !error && !loading && (
{fileName setError(true)} + onError={() => handleError('图片加载失败')} />
)} - {/* Markdown 文件预览 */} {isMarkdownFile() && !error && !loading && (
)} - {/* 文本文件预览 */} {isTextFile() && !error && !loading && (
@@ -284,44 +327,52 @@ const DocumentPreview: FC = ({
         
)} - {/* PDF 文件预览(使用浏览器内置预览) */} + {isWordFile() && !error && !loading && ( +
+
+
+ )} + + {isExcelFile() && !error && !loading && ( +
+ {excelData.map((sheet, index) => ( +
+

{sheet.sheetName}

+ {sheet.data.length > 0 && ( + ({ key: idx, ...row }))} + columns={sheet.data[0]?.map((header: any, colIdx: number) => ({ + title: header || `列 ${colIdx + 1}`, + dataIndex: colIdx, + key: colIdx, + width: 150, + })) || []} + pagination={false} + scroll={{ x: 'max-content' }} + size="small" + bordered + /> + )} + + ))} + + )} + {isPdfFile() && !error && !loading && (