Files
MemoryBear/api/app/repositories/neo4j/community_repository.py
2026-03-13 15:52:38 +08:00

195 lines
6.9 KiB
Python

"""Community 节点仓库
管理 Neo4j 中 Community 节点及 BELONGS_TO_COMMUNITY 边的 CRUD 操作。
"""
import logging
from typing import Dict, List, Optional
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.repositories.neo4j.cypher_queries import (
COMMUNITY_NODE_UPSERT,
ENTITY_JOIN_COMMUNITY,
ENTITY_LEAVE_ALL_COMMUNITIES,
GET_ENTITY_NEIGHBORS,
GET_ALL_ENTITIES_FOR_USER,
GET_COMMUNITY_MEMBERS,
GET_ALL_COMMUNITY_MEMBERS_BATCH,
GET_ALL_ENTITY_NEIGHBORS_BATCH,
CHECK_USER_HAS_COMMUNITIES,
UPDATE_COMMUNITY_MEMBER_COUNT,
UPDATE_COMMUNITY_METADATA,
)
logger = logging.getLogger(__name__)
class CommunityRepository:
def __init__(self, connector: Neo4jConnector):
self.connector = connector
async def upsert_community(
self, community_id: str, end_user_id: str, member_count: int = 0
) -> Optional[str]:
"""创建或更新 Community 节点,返回 community_id。"""
try:
result = await self.connector.execute_query(
COMMUNITY_NODE_UPSERT,
community_id=community_id,
end_user_id=end_user_id,
member_count=member_count,
)
return result[0]["community_id"] if result else None
except Exception as e:
logger.error(f"upsert_community failed: {e}")
return None
async def assign_entity_to_community(
self, entity_id: str, community_id: str, end_user_id: str
) -> bool:
"""将实体关联到社区(先解除旧关联,再建立新关联)。"""
try:
await self.connector.execute_query(
ENTITY_LEAVE_ALL_COMMUNITIES,
entity_id=entity_id,
end_user_id=end_user_id,
)
result = await self.connector.execute_query(
ENTITY_JOIN_COMMUNITY,
entity_id=entity_id,
community_id=community_id,
end_user_id=end_user_id,
)
return bool(result)
except Exception as e:
logger.error(f"assign_entity_to_community failed: {e}")
return False
async def get_entity_neighbors(
self, entity_id: str, end_user_id: str
) -> List[Dict]:
"""查询实体的直接邻居及其社区归属。"""
try:
return await self.connector.execute_query(
GET_ENTITY_NEIGHBORS,
entity_id=entity_id,
end_user_id=end_user_id,
)
except Exception as e:
logger.error(f"get_entity_neighbors failed: {e}")
return []
async def get_all_entity_neighbors_batch(
self, end_user_id: str
) -> Dict[str, List[Dict]]:
"""一次性批量拉取该用户下所有实体的邻居,返回 {entity_id: [neighbors]} 字典。
用于全量聚类预加载,避免每个实体单独查询。"""
try:
rows = await self.connector.execute_query(
GET_ALL_ENTITY_NEIGHBORS_BATCH,
end_user_id=end_user_id,
)
result: Dict[str, List[Dict]] = {}
for row in rows:
eid = row["entity_id"]
neighbor = {k: v for k, v in row.items() if k != "entity_id"}
result.setdefault(eid, []).append(neighbor)
return result
except Exception as e:
logger.error(f"get_all_entity_neighbors_batch failed: {e}")
return {}
async def get_all_entities(self, end_user_id: str) -> List[Dict]:
"""拉取某用户下所有实体及其当前社区归属。"""
try:
return await self.connector.execute_query(
GET_ALL_ENTITIES_FOR_USER,
end_user_id=end_user_id,
)
except Exception as e:
logger.error(f"get_all_entities failed: {e}")
return []
async def get_community_members(
self, community_id: str, end_user_id: str
) -> List[Dict]:
"""查询社区成员列表。"""
try:
return await self.connector.execute_query(
GET_COMMUNITY_MEMBERS,
community_id=community_id,
end_user_id=end_user_id,
)
except Exception as e:
logger.error(f"get_community_members failed: {e}")
return []
async def get_all_community_members_batch(
self, community_ids: List[str], end_user_id: str
) -> Dict[str, List[Dict]]:
"""批量查询多个社区的成员,返回 {community_id: [members]} 字典。"""
try:
rows = await self.connector.execute_query(
GET_ALL_COMMUNITY_MEMBERS_BATCH,
community_ids=community_ids,
end_user_id=end_user_id,
)
result: Dict[str, List[Dict]] = {}
for row in rows:
cid = row["community_id"]
result.setdefault(cid, []).append(row)
return result
except Exception as e:
logger.error(f"get_all_community_members_batch failed: {e}")
return {}
async def has_communities(self, end_user_id: str) -> bool:
"""检查该用户是否已有 Community 节点(用于判断全量 vs 增量)。"""
try:
result = await self.connector.execute_query(
CHECK_USER_HAS_COMMUNITIES,
end_user_id=end_user_id,
)
return result[0]["community_count"] > 0 if result else False
except Exception as e:
logger.error(f"has_communities failed: {e}")
return False
async def refresh_member_count(
self, community_id: str, end_user_id: str
) -> int:
"""重新统计并更新社区成员数,返回最新数量。"""
try:
result = await self.connector.execute_query(
UPDATE_COMMUNITY_MEMBER_COUNT,
community_id=community_id,
end_user_id=end_user_id,
)
return result[0]["member_count"] if result else 0
except Exception as e:
logger.error(f"refresh_member_count failed: {e}")
return 0
async def update_community_metadata(
self,
community_id: str,
end_user_id: str,
name: str,
summary: str,
core_entities: List[str],
) -> bool:
"""更新社区的名称、摘要和核心实体列表。"""
try:
result = await self.connector.execute_query(
UPDATE_COMMUNITY_METADATA,
community_id=community_id,
end_user_id=end_user_id,
name=name,
summary=summary,
core_entities=core_entities,
)
return bool(result)
except Exception as e:
logger.error(f"update_community_metadata failed: {e}")
return False