[add] Create community nodes
This commit is contained in:
129
api/app/repositories/neo4j/community_repository.py
Normal file
129
api/app/repositories/neo4j/community_repository.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""Community 节点仓库
|
||||
|
||||
管理 Neo4j 中 Community 节点及 BELONGS_TO_COMMUNITY 边的 CRUD 操作。
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
|
||||
from app.repositories.neo4j.cypher_queries import (
|
||||
COMMUNITY_NODE_UPSERT,
|
||||
ENTITY_JOIN_COMMUNITY,
|
||||
ENTITY_LEAVE_ALL_COMMUNITIES,
|
||||
GET_ENTITY_NEIGHBORS,
|
||||
GET_ALL_ENTITIES_FOR_USER,
|
||||
GET_COMMUNITY_MEMBERS,
|
||||
CHECK_USER_HAS_COMMUNITIES,
|
||||
UPDATE_COMMUNITY_MEMBER_COUNT,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CommunityRepository:
|
||||
def __init__(self, connector: Neo4jConnector):
|
||||
self.connector = connector
|
||||
|
||||
async def upsert_community(
|
||||
self, community_id: str, end_user_id: str, member_count: int = 0
|
||||
) -> Optional[str]:
|
||||
"""创建或更新 Community 节点,返回 community_id。"""
|
||||
try:
|
||||
result = await self.connector.execute_query(
|
||||
COMMUNITY_NODE_UPSERT,
|
||||
community_id=community_id,
|
||||
end_user_id=end_user_id,
|
||||
member_count=member_count,
|
||||
)
|
||||
return result[0]["community_id"] if result else None
|
||||
except Exception as e:
|
||||
logger.error(f"upsert_community failed: {e}")
|
||||
return None
|
||||
|
||||
async def assign_entity_to_community(
|
||||
self, entity_id: str, community_id: str, end_user_id: str
|
||||
) -> bool:
|
||||
"""将实体关联到社区(先解除旧关联,再建立新关联)。"""
|
||||
try:
|
||||
await self.connector.execute_query(
|
||||
ENTITY_LEAVE_ALL_COMMUNITIES,
|
||||
entity_id=entity_id,
|
||||
end_user_id=end_user_id,
|
||||
)
|
||||
result = await self.connector.execute_query(
|
||||
ENTITY_JOIN_COMMUNITY,
|
||||
entity_id=entity_id,
|
||||
community_id=community_id,
|
||||
end_user_id=end_user_id,
|
||||
)
|
||||
return bool(result)
|
||||
except Exception as e:
|
||||
logger.error(f"assign_entity_to_community failed: {e}")
|
||||
return False
|
||||
|
||||
async def get_entity_neighbors(
|
||||
self, entity_id: str, end_user_id: str
|
||||
) -> List[Dict]:
|
||||
"""查询实体的直接邻居及其社区归属。"""
|
||||
try:
|
||||
return await self.connector.execute_query(
|
||||
GET_ENTITY_NEIGHBORS,
|
||||
entity_id=entity_id,
|
||||
end_user_id=end_user_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"get_entity_neighbors failed: {e}")
|
||||
return []
|
||||
|
||||
async def get_all_entities(self, end_user_id: str) -> List[Dict]:
|
||||
"""拉取某用户下所有实体及其当前社区归属。"""
|
||||
try:
|
||||
return await self.connector.execute_query(
|
||||
GET_ALL_ENTITIES_FOR_USER,
|
||||
end_user_id=end_user_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"get_all_entities failed: {e}")
|
||||
return []
|
||||
|
||||
async def get_community_members(
|
||||
self, community_id: str, end_user_id: str
|
||||
) -> List[Dict]:
|
||||
"""查询社区成员列表。"""
|
||||
try:
|
||||
return await self.connector.execute_query(
|
||||
GET_COMMUNITY_MEMBERS,
|
||||
community_id=community_id,
|
||||
end_user_id=end_user_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"get_community_members failed: {e}")
|
||||
return []
|
||||
|
||||
async def has_communities(self, end_user_id: str) -> bool:
|
||||
"""检查该用户是否已有 Community 节点(用于判断全量 vs 增量)。"""
|
||||
try:
|
||||
result = await self.connector.execute_query(
|
||||
CHECK_USER_HAS_COMMUNITIES,
|
||||
end_user_id=end_user_id,
|
||||
)
|
||||
return result[0]["community_count"] > 0 if result else False
|
||||
except Exception as e:
|
||||
logger.error(f"has_communities failed: {e}")
|
||||
return False
|
||||
|
||||
async def refresh_member_count(
|
||||
self, community_id: str, end_user_id: str
|
||||
) -> int:
|
||||
"""重新统计并更新社区成员数,返回最新数量。"""
|
||||
try:
|
||||
result = await self.connector.execute_query(
|
||||
UPDATE_COMMUNITY_MEMBER_COUNT,
|
||||
community_id=community_id,
|
||||
end_user_id=end_user_id,
|
||||
)
|
||||
return result[0]["member_count"] if result else 0
|
||||
except Exception as e:
|
||||
logger.error(f"refresh_member_count failed: {e}")
|
||||
return 0
|
||||
Reference in New Issue
Block a user