Files
MemoryBear/api/app/services/memory_reflection_service.py
Eternity b471d56a86 fix(prompt): remove hard-coded import of prompt file paths (#279)
* Fix/develop memory bug (#274)

* 遗漏的历史映射

* 遗漏的历史映射

* fix_timeline_memories

* fix(web): update retrieve_type key

* Fix/develop memory bug (#276)

* 遗漏的历史映射

* 遗漏的历史映射

* fix_timeline_memories

* fix_timeline_memories

* write_gragp/bug_fix

* write_gragp/bug_fix

* write_gragp/bug_fix

* chore(celery): disable periodic task scheduling

* fix(prompt): remove hard-coded import of prompt file paths

---------

Co-authored-by: lixinyue11 <94037597+lixinyue11@users.noreply.github.com>
Co-authored-by: zhaoying <yzhao96@best-inc.com>
Co-authored-by: yingzhao <zhaoyingyz@126.com>
Co-authored-by: Ke Sun <kesun5@illinois.edu>
2026-02-03 10:29:51 +08:00

465 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
记忆反思服务
处理反思引擎的调用和执行
"""
from datetime import datetime
from typing import Dict, Any, Optional, Set
from fastapi import Depends
from sqlalchemy.orm import Session
from sqlalchemy import text
from app.db import get_db
from app.core.logging_config import get_api_logger
from app.core.memory.storage_services.reflection_engine import ReflectionConfig, ReflectionEngine
from app.core.memory.storage_services.reflection_engine.self_reflexion import ReflectionRange, ReflectionBaseline
from app.repositories.memory_config_repository import MemoryConfigRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.models.app_model import App
from app.models.app_release_model import AppRelease
from app.models.end_user_model import EndUser
from app.utils.config_utils import resolve_config_id
api_logger = get_api_logger()
class WorkspaceAppService:
"""Workplace Application Service Class """
def __init__(self, db: Session):
self.db = db
def get_workspace_apps_detailed(self, workspace_id: str) -> Dict[str, Any]:
"""
Get detailed information of all applications in the workspace
Args:
Workspace_id: Workspace ID
Returns:
Dictionary containing detailed application information
"""
apps = self.db.query(App).filter(
App.workspace_id == workspace_id,
App.is_active.is_(True)
).all()
app_ids = [str(app.id) for app in apps]
apps_detailed_info = []
for app in apps:
app_info = self._build_app_info(app)
self._process_app_releases(app, app_info)
self._process_end_users(app, app_info)
apps_detailed_info.append(app_info)
return {
"status": "成功",
"message": f"成功查询到 {len(app_ids)} 个应用及其详细信息",
"workspace_id": str(workspace_id),
"apps_count": len(app_ids),
"app_ids": app_ids,
"apps_detailed_info": apps_detailed_info
}
def _build_app_info(self, app: App) -> Dict[str, Any]:
"""base_infomation"""
return {
"id": str(app.id),
"name": app.name,
"description": app.description,
"type": app.type,
"status": app.status,
"visibility": app.visibility,
"created_at": app.created_at.isoformat() if app.created_at else None,
"updated_at": app.updated_at.isoformat() if app.updated_at else None,
"releases": [],
"memory_configs": [],
"end_users": []
}
def _process_app_releases(self, app: App, app_info: Dict[str, Any]) -> None:
"""Process the release version and configuration information of the application"""
app_releases = self.db.query(AppRelease).filter(AppRelease.app_id == app.id).all()
if not app_releases:
return
processed_configs: Set[str] = set()
for release in app_releases:
memory_content = self._extract_memory_content(release.config)
if memory_content and memory_content in processed_configs:
continue
release_info = {
"app_id": str(release.app_id),
"config": memory_content
}
if memory_content:
processed_configs.add(memory_content)
memory_config_info = self._get_memory_config(memory_content)
if memory_config_info:
if not any(dc["config_id"] == memory_config_info["config_id"] for dc in app_info["memory_configs"]):
app_info["memory_configs"].append(memory_config_info)
app_info["releases"].append(release_info)
def _extract_memory_content(self, config: Any) -> str:
"""Extract memory_comtent from config"""
if not config or not isinstance(config, dict):
return None
memory_obj = config.get('memory')
if memory_obj and isinstance(memory_obj, dict):
return memory_obj.get('memory_content')
return None
def _get_memory_config(self, memory_content: str) -> Dict[str, Any]:
"""Retrieve memory_config information based on memory_content"""
try:
memory_content = resolve_config_id(memory_content, self.db)
memory_config_result = MemoryConfigRepository.query_reflection_config_by_id(self.db, (memory_content))
if memory_config_result:
return {
"config_id": memory_content,
"enable_self_reflexion": memory_config_result.enable_self_reflexion,
"iteration_period": memory_config_result.iteration_period,
"reflexion_range": memory_config_result.reflexion_range,
"baseline": memory_config_result.baseline,
"reflection_model_id": memory_config_result.reflection_model_id,
"memory_verify": memory_config_result.memory_verify,
"quality_assessment": memory_config_result.quality_assessment,
"user_id": memory_config_result.user_id
}
except Exception as e:
api_logger.warning(f"查询memory_config失败memory_content: {memory_content}, 错误: {str(e)}")
return None
def _process_end_users(self, app: App, app_info: Dict[str, Any]) -> None:
"""Processing end-user information for applications"""
end_users = self.db.query(EndUser).filter(EndUser.app_id == app.id).all()
for end_user in end_users:
end_user_info = {
"id": str(end_user.id),
"app_id": str(end_user.app_id)
}
app_info["end_users"].append(end_user_info)
print(100*'-')
print(app_info)
def get_end_user_reflection_time(self, end_user_id: str) -> Optional[Any]:
"""
Read the reflection time of end users
Args:
End_user_id: End User ID
Returns:
Reflection time or None
"""
try:
end_user = self.db.query(EndUser).filter(EndUser.id == end_user_id).first()
if end_user:
return end_user.reflection_time
return None
except Exception as e:
api_logger.error(f"读取用户反思时间失败end_user_id: {end_user_id}, 错误: {str(e)}")
return None
def update_end_user_reflection_time(self, end_user_id: str) -> bool:
"""
Update the reflection time of end users to the current time
Args:
End_user_id: End User ID
Returns:
Is the update successful
"""
try:
from datetime import datetime
end_user = self.db.query(EndUser).filter(EndUser.id == end_user_id).first()
if end_user:
end_user.reflection_time = datetime.now()
self.db.commit()
api_logger.info(f"成功更新用户反思时间end_user_id: {end_user_id}")
return True
else:
api_logger.warning(f"未找到用户end_user_id: {end_user_id}")
return False
except Exception as e:
api_logger.error(f"更新用户反思时间失败end_user_id: {end_user_id}, 错误: {str(e)}")
self.db.rollback()
return False
class MemoryReflectionService:
"""Memory reflection service category"""
def __init__(self,db: Session = Depends(get_db)):
self.db=db
async def start_text_reflection(self, config_data: Dict[str, Any], end_user_id: str) -> Dict[str, Any]:
try:
config_id = config_data.get("config_id")
api_logger.info(f"从配置数据启动反思config_id: {config_id}, end_user_id: {end_user_id}")
if not config_data.get("enable_self_reflexion", False):
return {
"status": "跳过",
"message": "反思引擎未启用",
"config_id": config_id,
"end_user_id": end_user_id,
"config_data": config_data
}
config_data_id = config_data['config_id']
reflection_config = WorkspaceAppService(self.db)._get_memory_config(config_data_id)
if reflection_config is not None and reflection_config['enable_self_reflexion']:
reflection_config = self._create_reflection_config_from_data(reflection_config)
# 3. 执行反思引擎
reflection_results = await self._execute_reflection_engine(
reflection_config, end_user_id
)
return {
"status": "完成",
"message": "反思引擎执行完成",
"config_id": config_id,
"end_user_id": end_user_id,
"config_data": config_data,
"reflection_results": reflection_results
}
except Exception as e:
config_id = config_data.get("config_id", "unknown")
api_logger.error(f"启动反思失败config_id: {config_id}, end_user_id: {end_user_id}, 错误: {str(e)}")
return {
"status": "错误",
"message": f"启动反思失败: {str(e)}",
"config_id": config_id,
"end_user_id": end_user_id,
"config_data": config_data
}
async def start_reflection_from_data(self, config_data: Dict[str, Any], end_user_id: str) -> Dict[str, Any]:
"""
Starting Reflection from Configuration Data
Args:
config_data: Configure data dictionary, including reflective configuration information
end_user_id: end_user_id
Returns:
Reflect on the execution results
"""
try:
config_id = config_data.get("config_id")
api_logger.info(f"从配置数据启动反思config_id: {config_id}, end_user_id: {end_user_id}")
if not config_data.get("enable_self_reflexion", False):
return {
"status": "跳过",
"message": "反思引擎未启用",
"config_id": config_id,
"end_user_id": end_user_id,
"config_data": config_data
}
config_data_id=config_data['config_id']
reflection_config=WorkspaceAppService(self.db)._get_memory_config(config_data_id)
if reflection_config is not None and reflection_config['enable_self_reflexion']:
reflection_config= self._create_reflection_config_from_data(reflection_config)
iteration_period = int(reflection_config.iteration_period)
workspace_service = WorkspaceAppService(self.db)
current_reflection_time = workspace_service.get_end_user_reflection_time(end_user_id)
# 检查是否需要执行反思
should_execute = False
hours_diff = 0
if current_reflection_time is None:
# 首次执行反思
should_execute = True
api_logger.info(f"首次执行反思end_user_id: {end_user_id}")
else:
# 计算时间差
try:
if isinstance(current_reflection_time, str):
reflection_time = datetime.fromisoformat(current_reflection_time)
else:
reflection_time = current_reflection_time
current_time = datetime.now()
time_diff = current_time - reflection_time
hours_diff = int(time_diff.total_seconds() / 3600)
# 检查是否达到反思周期
if hours_diff >= iteration_period:
should_execute = True
api_logger.info(f"与上次的反思时间间隔为: {hours_diff} 小时,达到周期 {iteration_period} 小时")
else:
api_logger.info(f"与上次的反思时间间隔为: {hours_diff} 小时,未达到周期 {iteration_period} 小时")
except (ValueError, TypeError) as e:
api_logger.warning(f"解析反思时间失败: {e},将执行反思")
should_execute = True
if should_execute:
api_logger.info(f"与上次的反思时间间隔为: {hours_diff} 小时")
# 3. 执行反思引擎
reflection_results = await self._execute_reflection_engine(
reflection_config, end_user_id
)
# 更新反思时间为当前时间
update_success = workspace_service.update_end_user_reflection_time(end_user_id)
if update_success:
api_logger.info(f"成功更新用户 {end_user_id} 的反思时间")
else:
api_logger.error(f"更新用户 {end_user_id} 的反思时间失败")
return {
"status": "完成",
"message": "反思引擎执行完成",
"config_id": config_id,
"end_user_id": end_user_id,
"config_data": config_data,
"reflection_results": reflection_results
}
else:
return {
"status": "等待中",
"message": f"反思引擎未开始执行,距离下次执行还需 {iteration_period - hours_diff} 小时",
"config_id": config_id,
"end_user_id": end_user_id,
"config_data": config_data,
"hours_since_last_reflection": hours_diff,
"next_reflection_in_hours": iteration_period - hours_diff
}
except Exception as e:
config_id = config_data.get("config_id", "unknown")
api_logger.error(f"启动反思失败config_id: {config_id}, end_user_id: {end_user_id}, 错误: {str(e)}")
return {
"status": "错误",
"message": f"启动反思失败: {str(e)}",
"config_id": config_id,
"end_user_id": end_user_id,
"config_data": config_data
}
def _create_reflection_config_from_data(self, config_data: Dict[str, Any]) -> ReflectionConfig:
"""Create reflective configuration objects from configuration data"""
reflexion_range_value = config_data.get("reflexion_range")
if reflexion_range_value is None or reflexion_range_value == "":
reflexion_range_value = "partial"
reflexion_range = ReflectionRange(reflexion_range_value)
baseline_value = config_data.get("baseline")
if baseline_value is None or baseline_value == "":
baseline_value = "TIME"
baseline = ReflectionBaseline(baseline_value)
# iteration_period =
iteration_period = config_data.get("iteration_period", 24)
if isinstance(iteration_period, str):
try:
iteration_period = int(iteration_period)
except (ValueError, TypeError):
iteration_period = 24 # 默认24小时
return ReflectionConfig(
enabled=config_data.get("enable_self_reflexion", False),
iteration_period=str(iteration_period), # ReflectionConfig期望字符串
reflexion_range=reflexion_range,
baseline=baseline,
memory_verify=config_data.get("memory_verify", False),
quality_assessment=config_data.get("quality_assessment", False),
model_id=config_data.get("reflection_model_id", "")
)
async def _execute_reflection_engine(
self,
reflection_config: ReflectionConfig,
user_id: str
) -> Dict[str, Any]:
"""Execute Reflection Engine"""
try:
# 创建Neo4j连接器
connector = Neo4jConnector()
# 创建反思引擎
engine = ReflectionEngine(
config=reflection_config,
neo4j_connector=connector,
llm_client=reflection_config.model_id
)
# 执行反思
reflection_result = await engine.execute_reflection(user_id)
return {
"success": reflection_result.success,
"message": reflection_result.message,
"conflicts_found": reflection_result.conflicts_found,
"conflicts_resolved": reflection_result.conflicts_resolved,
"memories_updated": reflection_result.memories_updated,
"execution_time": reflection_result.execution_time,
"details": reflection_result.details
}
except Exception as e:
api_logger.error(f"反思引擎执行失败: {str(e)}")
return {
"success": False,
"message": f"反思引擎执行失败: {str(e)}",
"conflicts_found": 0,
"conflicts_resolved": 0,
"memories_updated": 0,
"execution_time": 0.0
}
class Memory_Reflection_Service:
"""Memory Reflection Service - Used for calling the/reflection interface"""
def __init__(self, db: Session):
self.db = db
self.reflection_service = MemoryReflectionService(db)
async def start_reflection(self, config_data: Dict[str, Any], end_user_id: str) -> Dict[str, Any]:
"""
Activate the reflection function
Args:
config_data: 配置数据,格式如下:
{
"config_id": 26,
"enable_self_reflexion": true,
"iteration_period": "6",
"reflexion_range": "partial",
"baseline": "TIME",
"reflection_model_id": "ea405fa6-c387-4d78-80ab-826d692301b3",
"memory_verify": true,
"quality_assessment": false,
"user_id": null
}
end_user_id: end_user_idexample "12a8b235-6eb1-4481-a53c-b77933b5c949"
Returns:
"""
api_logger.info(f"Memory_Reflection_Service启动反思config_id: {config_data.get('config_id')}, end_user_id: {end_user_id}")
# 调用核心反思服务
result = await self.reflection_service.start_reflection_from_data(config_data, end_user_id)
return result