Merge #9 into develop from fix/memory_reflection

新增反思功能(功能配置接口+反思celery后台检测反思的迭代周期)

* fix/memory_reflection: (24 commits squashed)

  - 新增反思功能(功能配置接口+反思celery后台检测反思的迭代周期)

  - 新增反思功能(功能配置接口+反思celery后台检测反思的迭代周期)

  - 新增反思功能(检测代码/规范化程序)

  - 新增反思功能(检测代码/规范化程序)

  - 新增反思功能(检测代码/规范化程序)

  - 新增反思功能(检测代码/规范化程序)

  - 新增反思功能(检测代码/规范化程序)

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

  - 反思优化

Signed-off-by: aliyun8644380055 <accounts_68c0f5d519f260d93ee2997e@mail.teambition.com>
Commented-by: aliyun8644380055 <accounts_68c0f5d519f260d93ee2997e@mail.teambition.com>
Commented-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>
Reviewed-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>
Merged-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>

CR-link: https://codeup.aliyun.com/redbearai/python/redbear-mem-open/change/9
This commit is contained in:
李新月
2025-12-19 08:04:12 +00:00
committed by 孙科
parent 8d810af1d0
commit 5c0d8b42f3
21 changed files with 2384 additions and 337 deletions

View File

@@ -16,48 +16,46 @@ import uuid
from app.models.data_config_model import DataConfig
from app.schemas.memory_storage_schema import (
ConfigParamsCreate,
ConfigParamsDelete,
ConfigUpdate,
ConfigUpdateExtracted,
ConfigUpdateForget,
ConfigKey,
)
from app.core.logging_config import get_db_logger
# 获取数据库专用日志器
db_logger = get_db_logger()
TABLE_NAME = "data_config"
class DataConfigRepository:
"""数据配置Repository
提供data_config表的数据访问方法包括
- SQLAlchemy ORM 数据库操作
- Neo4j Cypher查询常量
"""
# ==================== Neo4j Cypher 查询常量 ====================
# Dialogue count by group
SEARCH_FOR_DIALOGUE = """
MATCH (n:Dialogue) WHERE n.group_id = $group_id RETURN COUNT(n) AS num
"""
# Chunk count by group
SEARCH_FOR_CHUNK = """
MATCH (n:Chunk) WHERE n.group_id = $group_id RETURN COUNT(n) AS num
"""
# Statement count by group
SEARCH_FOR_STATEMENT = """
MATCH (n:Statement) WHERE n.group_id = $group_id RETURN COUNT(n) AS num
"""
# ExtractedEntity count by group
SEARCH_FOR_ENTITY = """
MATCH (n:ExtractedEntity) WHERE n.group_id = $group_id RETURN COUNT(n) AS num
"""
# All counts by label and total
SEARCH_FOR_ALL = """
OPTIONAL MATCH (n:Dialogue) WHERE n.group_id = $group_id RETURN 'Dialogue' AS Label, COUNT(n) AS Count
@@ -70,7 +68,7 @@ class DataConfigRepository:
UNION ALL
OPTIONAL MATCH (n) WHERE n.group_id = $group_id RETURN 'ALL' AS Label, COUNT(n) AS Count
"""
# Extracted entity details within group/app/user
SEARCH_FOR_DETIALS = """
MATCH (n:ExtractedEntity)
@@ -86,7 +84,7 @@ class DataConfigRepository:
n.user_id AS user_id,
n.id AS id
"""
# Edges between extracted entities within group/app/user
SEARCH_FOR_EDGES = """
MATCH (n:ExtractedEntity)-[r]->(m:ExtractedEntity)
@@ -102,7 +100,7 @@ class DataConfigRepository:
r.statement_id AS statement_id,
r.statement AS statement
"""
# Entity graph within group (source node, edge, target node)
SEARCH_FOR_ENTITY_GRAPH = """
MATCH (n:ExtractedEntity)-[r]->(m:ExtractedEntity)
@@ -135,22 +133,106 @@ class DataConfigRepository:
id: m.id
} AS targetNode
"""
# ==================== SQLAlchemy ORM 数据库操作方法 ====================
@staticmethod
def build_update_reflection(config_id: int, **kwargs) -> Tuple[str, Dict]:
"""构建反思配置更新语句SQLAlchemy text() 命名参数)
Args:
config_id: 配置ID
**kwargs: 反思配置参数
Returns:
Tuple[str, Dict]: (SQL查询字符串, 参数字典)
Raises:
ValueError: 没有字段需要更新时抛出
"""
db_logger.debug(f"构建反思配置更新语句: config_id={config_id}")
key_where = "config_id = :config_id"
set_fields: List[str] = []
params: Dict = {
"config_id": config_id,
}
# 反思配置字段映射
mapping = {
"enable_self_reflexion": "enable_self_reflexion",
"iteration_period": "iteration_period",
"reflexion_range": "reflexion_range",
"baseline": "baseline",
"reflection_model_id": "reflection_model_id",
"memory_verify": "memory_verify",
"quality_assessment": "quality_assessment",
}
for api_field, db_col in mapping.items():
if api_field in kwargs and kwargs[api_field] is not None:
set_fields.append(f"{db_col} = :{api_field}")
params[api_field] = kwargs[api_field]
if not set_fields:
raise ValueError("No fields to update")
set_fields.append("updated_at = timezone('Asia/Shanghai', now())")
query = f"UPDATE {TABLE_NAME} SET " + ", ".join(set_fields) + f" WHERE {key_where}"
return query, params
@staticmethod
def build_select_reflection(config_id: int) -> Tuple[str, Dict]:
"""构建反思配置查询语句通过config_id查询反思配置SQLAlchemy text() 命名参数)
Args:
config_id: 配置ID
Returns:
Tuple[str, Dict]: (SQL查询字符串, 参数字典)
"""
db_logger.debug(f"构建反思配置查询语句: config_id={config_id}")
query = (
f"SELECT config_id, enable_self_reflexion, iteration_period, reflexion_range, baseline, "
f"reflection_model_id, memory_verify, quality_assessment, user_id "
f"FROM {TABLE_NAME} WHERE config_id = :config_id"
)
params = {"config_id": config_id}
return query, params
@staticmethod
def build_select_all(workspace_id: uuid.UUID) -> Tuple[str, Dict]:
"""构建查询所有配置的语句SQLAlchemy text() 命名参数)
Args:
workspace_id: 工作空间ID
Returns:
Tuple[str, Dict]: (SQL查询字符串, 参数字典)
"""
db_logger.debug(f"构建查询所有配置语句: workspace_id={workspace_id}")
query = (
f"SELECT config_id, config_name, enable_self_reflexion, iteration_period, reflexion_range, baseline, "
f"reflection_model_id, memory_verify, quality_assessment, user_id, created_at, updated_at "
f"FROM {TABLE_NAME} WHERE workspace_id = :workspace_id ORDER BY updated_at DESC"
)
params = {"workspace_id": workspace_id}
return query, params
@staticmethod
def create(db: Session, params: ConfigParamsCreate) -> DataConfig:
"""创建数据配置
Args:
db: 数据库会话
params: 配置参数创建模型
Returns:
DataConfig: 创建的配置对象
"""
db_logger.debug(f"创建数据配置: config_name={params.config_name}, workspace_id={params.workspace_id}")
try:
db_config = DataConfig(
config_name=params.config_name,
@@ -162,37 +244,37 @@ class DataConfigRepository:
)
db.add(db_config)
db.flush() # 获取自增ID但不提交事务
db_logger.info(f"数据配置已添加到会话: {db_config.config_name} (ID: {db_config.config_id})")
return db_config
except Exception as e:
db.rollback()
db_logger.error(f"创建数据配置失败: {params.config_name} - {str(e)}")
raise
@staticmethod
def update(db: Session, update: ConfigUpdate) -> Optional[DataConfig]:
"""更新基础配置
Args:
db: 数据库会话
update: 配置更新模型
Returns:
Optional[DataConfig]: 更新后的配置对象不存在则返回None
Raises:
ValueError: 没有字段需要更新时抛出
"""
db_logger.debug(f"更新数据配置: config_id={update.config_id}")
try:
db_config = db.query(DataConfig).filter(DataConfig.config_id == update.config_id).first()
if not db_config:
db_logger.warning(f"数据配置不存在: config_id={update.config_id}")
return None
# 更新字段
has_update = False
if update.config_name is not None:
@@ -201,44 +283,44 @@ class DataConfigRepository:
if update.config_desc is not None:
db_config.config_desc = update.config_desc
has_update = True
if not has_update:
raise ValueError("No fields to update")
db.commit()
db.refresh(db_config)
db_logger.info(f"数据配置更新成功: {db_config.config_name} (ID: {update.config_id})")
return db_config
except Exception as e:
db.rollback()
db_logger.error(f"更新数据配置失败: config_id={update.config_id} - {str(e)}")
raise
@staticmethod
def update_extracted(db: Session, update: ConfigUpdateExtracted) -> Optional[DataConfig]:
"""更新记忆萃取引擎配置
Args:
db: 数据库会话
update: 萃取配置更新模型
Returns:
Optional[DataConfig]: 更新后的配置对象不存在则返回None
Raises:
ValueError: 没有字段需要更新时抛出
"""
db_logger.debug(f"更新萃取配置: config_id={update.config_id}")
try:
db_config = db.query(DataConfig).filter(DataConfig.config_id == update.config_id).first()
if not db_config:
db_logger.warning(f"数据配置不存在: config_id={update.config_id}")
return None
# 更新字段映射
field_mapping = {
# 模型选择
@@ -268,50 +350,50 @@ class DataConfigRepository:
"reflexion_range": "reflexion_range",
"baseline": "baseline",
}
has_update = False
for api_field, db_field in field_mapping.items():
value = getattr(update, api_field, None)
if value is not None:
setattr(db_config, db_field, value)
has_update = True
if not has_update:
raise ValueError("No fields to update")
db.commit()
db.refresh(db_config)
db_logger.info(f"萃取配置更新成功: config_id={update.config_id}")
return db_config
except Exception as e:
db.rollback()
db_logger.error(f"更新萃取配置失败: config_id={update.config_id} - {str(e)}")
raise
@staticmethod
def update_forget(db: Session, update: ConfigUpdateForget) -> Optional[DataConfig]:
"""更新遗忘引擎配置
Args:
db: 数据库会话
update: 遗忘配置更新模型
Returns:
Optional[DataConfig]: 更新后的配置对象不存在则返回None
Raises:
ValueError: 没有字段需要更新时抛出
"""
db_logger.debug(f"更新遗忘配置: config_id={update.config_id}")
try:
db_config = db.query(DataConfig).filter(DataConfig.config_id == update.config_id).first()
if not db_config:
db_logger.warning(f"数据配置不存在: config_id={update.config_id}")
return None
# 更新字段
has_update = False
if update.lambda_time is not None:
@@ -323,40 +405,40 @@ class DataConfigRepository:
if update.offset is not None:
db_config.offset = update.offset
has_update = True
if not has_update:
raise ValueError("No fields to update")
db.commit()
db.refresh(db_config)
db_logger.info(f"遗忘配置更新成功: config_id={update.config_id}")
return db_config
except Exception as e:
db.rollback()
db_logger.error(f"更新遗忘配置失败: config_id={update.config_id} - {str(e)}")
raise
@staticmethod
def get_extracted_config(db: Session, config_id: int) -> Optional[Dict]:
"""获取萃取配置,通过主键查询某条配置
Args:
db: 数据库会话
config_id: 配置ID
Returns:
Optional[Dict]: 萃取配置字典不存在则返回None
"""
db_logger.debug(f"查询萃取配置: config_id={config_id}")
try:
db_config = db.query(DataConfig).filter(DataConfig.config_id == config_id).first()
if not db_config:
db_logger.debug(f"萃取配置不存在: config_id={config_id}")
return None
result = {
"llm_id": db_config.llm_id,
"embedding_id": db_config.embedding_id,
@@ -379,62 +461,62 @@ class DataConfigRepository:
"reflexion_range": db_config.reflexion_range,
"baseline": db_config.baseline,
}
db_logger.debug(f"萃取配置查询成功: config_id={config_id}")
return result
except Exception as e:
db_logger.error(f"查询萃取配置失败: config_id={config_id} - {str(e)}")
raise
@staticmethod
def get_forget_config(db: Session, config_id: int) -> Optional[Dict]:
"""获取遗忘配置,通过主键查询某条配置
Args:
db: 数据库会话
config_id: 配置ID
Returns:
Optional[Dict]: 遗忘配置字典不存在则返回None
"""
db_logger.debug(f"查询遗忘配置: config_id={config_id}")
try:
db_config = db.query(DataConfig).filter(DataConfig.config_id == config_id).first()
if not db_config:
db_logger.debug(f"遗忘配置不存在: config_id={config_id}")
return None
result = {
"lambda_time": db_config.lambda_time,
"lambda_mem": db_config.lambda_mem,
"offset": db_config.offset,
}
db_logger.debug(f"遗忘配置查询成功: config_id={config_id}")
return result
except Exception as e:
db_logger.error(f"查询遗忘配置失败: config_id={config_id} - {str(e)}")
raise
@staticmethod
def get_by_id(db: Session, config_id: int) -> Optional[DataConfig]:
"""根据ID获取数据配置
Args:
db: 数据库会话
config_id: 配置ID
Returns:
Optional[DataConfig]: 配置对象不存在则返回None
"""
db_logger.debug(f"根据ID查询数据配置: config_id={config_id}")
try:
config = db.query(DataConfig).filter(DataConfig.config_id == config_id).first()
if config:
db_logger.debug(f"数据配置查询成功: {config.config_name} (ID: {config_id})")
else:
@@ -443,60 +525,60 @@ class DataConfigRepository:
except Exception as e:
db_logger.error(f"根据ID查询数据配置失败: config_id={config_id} - {str(e)}")
raise
@staticmethod
def get_all(db: Session, workspace_id: Optional[uuid.UUID] = None) -> List[DataConfig]:
"""获取所有配置参数
Args:
db: 数据库会话
workspace_id: 工作空间ID用于过滤查询结果
Returns:
List[DataConfig]: 配置列表
"""
db_logger.debug(f"查询所有配置: workspace_id={workspace_id}")
try:
query = db.query(DataConfig)
if workspace_id:
query = query.filter(DataConfig.workspace_id == workspace_id)
configs = query.order_by(desc(DataConfig.updated_at)).all()
db_logger.debug(f"配置列表查询成功: 数量={len(configs)}")
return configs
except Exception as e:
db_logger.error(f"查询所有配置失败: workspace_id={workspace_id} - {str(e)}")
raise
@staticmethod
def delete(db: Session, config_id: int) -> bool:
"""删除数据配置
Args:
db: 数据库会话
config_id: 配置ID
Returns:
bool: 删除成功返回True配置不存在返回False
"""
db_logger.debug(f"删除数据配置: config_id={config_id}")
try:
db_config = db.query(DataConfig).filter(DataConfig.config_id == config_id).first()
if not db_config:
db_logger.warning(f"数据配置不存在: config_id={config_id}")
return False
db.delete(db_config)
db.commit()
db_logger.info(f"数据配置删除成功: config_id={config_id}")
return True
except Exception as e:
db.rollback()
db_logger.error(f"删除数据配置失败: config_id={config_id} - {str(e)}")

View File

@@ -746,3 +746,57 @@ DETACH DELETE losing
RETURN count(losing) as deleted
"""
neo4j_statement_part = '''
MATCH (n:Statement)
WHERE n.group_id = "{}"
AND datetime(n.created_at) >= datetime() - duration('P3D')
RETURN
n.statement as statement_name,
n.id as statement_id,
n.created_at as statement_created_at
'''
neo4j_statement_all = '''
MATCH (n:Statement)
WHERE n.group_id = "{}"
RETURN
n.statement as statement_name,
n.id as statement_id
'''
neo4j_query_part = """
MATCH (n)-[r]-(m:ExtractedEntity)
WHERE n.group_id = "{}"
AND datetime(n.created_at) >= datetime() - duration('P3D')
WITH DISTINCT m
OPTIONAL MATCH (m)-[rel]-(other:ExtractedEntity)
RETURN
m.name as entity1_name,
m.description as description,
m.statement_id as statement_id,
m.created_at as created_at,
m.expired_at as expired_at,
CASE WHEN rel IS NULL THEN "NO_RELATIONSHIP" ELSE type(rel) END as relationship_type,
rel as relationship,
CASE WHEN other IS NULL THEN "ISOLATED_NODE" ELSE other.name END as entity2_name,
other as entity2
"""
neo4j_query_all = """
MATCH (n)-[r]-(m:ExtractedEntity)
WHERE n.group_id = "{}"
WITH DISTINCT m
OPTIONAL MATCH (m)-[rel]-(other:ExtractedEntity)
RETURN
m.name as entity1_name,
m.description as description,
m.statement_id as statement_id,
m.created_at as created_at,
m.expired_at as expired_at,
CASE WHEN rel IS NULL THEN "NO_RELATIONSHIP" ELSE type(rel) END as relationship_type,
rel as relationship,
CASE WHEN other IS NULL THEN "ISOLATED_NODE" ELSE other.name END as entity2_name,
other as entity2
"""

View File

@@ -0,0 +1,227 @@
from app.repositories import Neo4jConnector
neo4j_connector = Neo4jConnector()
async def update_neo4j_data(neo4j_dict_data, update_databases):
"""
Update Neo4j data based on query criteria and update parameters
Args:
neo4j_dict_data: find
update_databases: update
"""
try:
# 构建WHERE条件
where_conditions = []
params = {}
for key, value in neo4j_dict_data.items():
if value is not None:
param_name = f"param_{key}"
where_conditions.append(f"e.{key} = ${param_name}")
params[param_name] = value
where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
# 构建SET条件
set_conditions = []
for key, value in update_databases.items():
if value is not None:
param_name = f"update_{key}"
set_conditions.append(f"e.{key} = ${param_name}")
params[param_name] = value
set_clause = ", ".join(set_conditions)
if not set_clause:
print("警告: 没有需要更新的字段")
return False
# 构建Cypher查询
cypher_query = f"""
MATCH (e:ExtractedEntity)
WHERE {where_clause}
SET {set_clause}
RETURN count(e) as updated_count, collect(e.name) as updated_names
"""
print(f"\n执行Cypher查询: {cypher_query}")
print(f"参数: {params}")
# 执行更新
result = await neo4j_connector.execute_query(cypher_query, **params)
if result:
updated_count = result[0].get('updated_count', 0)
updated_names = result[0].get('updated_names', [])
print(f"成功更新 {updated_count} 个节点")
if updated_names:
print(f"更新的实体名称: {updated_names}")
return updated_count > 0
else:
return False
except Exception as e:
print(f"更新过程中出现错误: {e}")
import traceback
traceback.print_exc()
return False
def map_field_names(data_dict):
mapped_dict = {}
has_name_field = False
# 第一遍检查是否有name相关字段
for key, value in data_dict.items():
if key in ['name', 'entity2.name', 'entity1.name']:
has_name_field = True
break
print(f"字段检查: has_name_field = {has_name_field}")
# 第二遍:根据规则映射和过滤字段
for key, value in data_dict.items():
if key == 'entity2.name' or key == 'entity2_name':
# 将 entity2.name 映射为 name
mapped_dict['name'] = value
print(f"字段名映射: {key} -> name")
elif key == 'entity1.name' or key == 'entity1_name':
# 将 entity1.name 映射为 name
mapped_dict['name'] = value
print(f"字段名映射: {key} -> name")
elif key == 'entity1.description':
# 将 entity1.description 映射为 description
mapped_dict['description'] = value
print(f"字段名映射: {key} -> description")
elif key == 'entity2.description':
# 将 entity2.description 映射为 description
mapped_dict['description'] = value
print(f"字段名映射: {key} -> description")
elif key == 'relationship_type':
# 跳过relationship_type字段
print(f"字段过滤: 跳过不需要的字段 '{key}'")
continue
elif key == 'entity1_name':
if has_name_field:
# 如果有name字段跳过entity1_name
print(f"字段过滤: 由于存在name字段跳过 '{key}'")
continue
else:
# 如果没有name字段保留entity1_name
mapped_dict[key] = value
print(f"字段保留: {key}")
elif key == 'entity2_name':
if has_name_field:
# 如果有name字段跳过entity2_name
print(f"字段过滤: 由于存在name字段跳过 '{key}'")
continue
else:
# 即使没有name字段也不使用entity2_name根据需求
print(f"字段过滤: 跳过不推荐的字段 '{key}'")
continue
elif '.' not in key:
# 不包含点号的其他字段直接保留
mapped_dict[key] = value
else:
# 其他包含点号的字段跳过并警告
print(f"警告: 跳过不支持的嵌套字段 '{key}'")
print(f"字段映射结果: {mapped_dict}")
return mapped_dict
async def neo4j_data(solved_data):
"""
Process the resolved data and update the Neo4j database
Args:
Solved_data: Solution Data List
Returns:
Int: Number of successfully updated records
"""
success_count = 0
for i in solved_data:
neo4j_dict_data = {}
update_databases = {}
results = i['results']
for data in results:
resolved = data.get('resolved')
if not resolved:
print("跳过resolved为None")
continue
try:
change_list = resolved.get('change', [])
except (AttributeError, TypeError):
change_list = []
if change_list == []:
print("跳过change_list为空")
continue
if change_list and len(change_list) > 0:
change = change_list[0]
print(f"change: {change}")
field_data = change.get('field', [])
print(f"field_data: {field_data}")
print(f"field_data type: {type(field_data)}")
# 字段名映射和过滤函数
# 处理field数据可能是字典或列表
if isinstance(field_data, dict):
# 如果是字典,映射字段名后更新
mapped_data = map_field_names(field_data)
update_databases.update(mapped_data)
elif isinstance(field_data, list):
# 如果是列表,遍历每个字典并更新
for field_item in field_data:
if isinstance(field_item, dict):
mapped_item = map_field_names(field_item)
update_databases.update(mapped_item)
else:
print(f"警告: field_item不是字典: {field_item}")
else:
print(f"警告: field_data类型不支持: {type(field_data)}")
if 'entity1_name' in data:
data['name'] = data.pop('entity1_name')
if 'entity2_name' in data:
data.pop('entity2_name', None)
resolved_memory = resolved.get('resolved_memory', {})
entity2 = None
if isinstance(resolved_memory, dict):
entity2 = resolved_memory.get('entity2')
if entity2 and isinstance(entity2, dict) and len(entity2) >= 5:
stat_id = resolved.get('original_memory_id')
# 安全地获取description
statement_id = None
if isinstance(resolved_memory, dict):
statement_id = resolved_memory.get('statement_id')
# 只有当neo4j_dict_data中还没有statement_id时才使用original_memory_id
if statement_id and 'id' not in neo4j_dict_data:
neo4j_dict_data['id'] = stat_id
neo4j_dict_data['statement_id'] = statement_id
else:
# 处理original_memory_id它可能是字符串或字典
try:
for key, value in resolved_memory.items():
if key == 'statement_id':
neo4j_dict_data['statement_id'] = value
if key == 'description':
neo4j_dict_data['description'] = value
except AttributeError:
neo4j_dict_data=[]
print(neo4j_dict_data)
print(update_databases)
if neo4j_dict_data!=[]:
await update_neo4j_data(neo4j_dict_data, update_databases)
success_count += 1
return success_count