""" Memory Storage Service Handles business logic for memory storage operations. """ import asyncio import json import os import time from datetime import datetime from typing import Any, AsyncGenerator, Dict, List, Optional from app.core.logging_config import get_config_logger, get_logger from app.core.memory.analytics.hot_memory_tags import ( get_hot_memory_tags, get_raw_tags_from_db, filter_tags_with_llm, ) from app.core.memory.analytics.recent_activity_stats import get_recent_activity_stats from app.models.user_model import User from app.repositories.memory_config_repository import MemoryConfigRepository from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.schemas.memory_config_schema import ConfigurationError from app.schemas.memory_storage_schema import ( ConfigKey, ConfigParamsCreate, ConfigParamsDelete, ConfigPilotRun, ConfigUpdate, ConfigUpdateExtracted, ) from app.services.memory_config_service import MemoryConfigService from app.utils.sse_utils import format_sse_message from dotenv import load_dotenv from sqlalchemy.orm import Session logger = get_logger(__name__) config_logger = get_config_logger() # Load environment variables for Neo4j connector load_dotenv() _neo4j_connector = Neo4jConnector() class MemoryStorageService: """Service for memory storage operations""" def __init__(self): logger.info("MemoryStorageService initialized") async def get_storage_info(self) -> dict: """ Example wrapper method - retrieves storage information Args: Returns: Storage information dictionary """ logger.info("Getting storage info ") # Empty wrapper - implement your logic here result = { "status": "active", "message": "This is an example wrapper" } return result class DataConfigService: # 数据配置服务类(PostgreSQL) """Service layer for config params CRUD. 使用 SQLAlchemy ORM 进行数据库操作。 """ def __init__(self, db: Session) -> None: """初始化服务 Args: db: SQLAlchemy 数据库会话 """ self.db = db @staticmethod def _convert_timestamps_to_format(data_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """将 created_at 和 updated_at 字段从 datetime 对象转换为 YYYYMMDDHHmmss 格式""" for item in data_list: for field in ['created_at', 'updated_at']: if field in item and item[field] is not None: value = item[field] dt = None # 处理不同类型的时间值 if hasattr(value, 'to_native'): # Neo4j DateTime 对象 dt = value.to_native() elif isinstance(value, datetime): # Python datetime 对象 dt = value elif isinstance(value, str): # 字符串格式 try: dt = datetime.fromisoformat(value.replace('Z', '+00:00')) except Exception: pass # 保持原值 # 转换为 YYYYMMDDHHmmss 格式 if dt: item[field] = dt.strftime('%Y%m%d%H%M%S') return data_list # --- Create --- def create(self, params: ConfigParamsCreate) -> Dict[str, Any]: # 创建配置参数(仅名称与描述) # 如果workspace_id存在且模型字段未全部指定,则自动获取 if params.workspace_id and not all([params.llm_id, params.embedding_id, params.rerank_id]): configs = self._get_workspace_configs(params.workspace_id) if configs is None: raise ValueError(f"工作空间不存在: workspace_id={params.workspace_id}") # 只在未指定时填充(允许手动覆盖) if not params.llm_id: params.llm_id = configs.get('llm') if not params.embedding_id: params.embedding_id = configs.get('embedding') if not params.rerank_id: params.rerank_id = configs.get('rerank') config = MemoryConfigRepository.create(self.db, params) self.db.commit() return {"affected": 1, "config_id": config.config_id} def _get_workspace_configs(self, workspace_id) -> Optional[Dict[str, Any]]: """获取工作空间模型配置(内部方法,便于测试)""" from app.db import SessionLocal from app.repositories.workspace_repository import get_workspace_models_configs db_session = SessionLocal() try: return get_workspace_models_configs(db_session, workspace_id) finally: db_session.close() # --- Delete --- def delete(self, key: ConfigParamsDelete) -> Dict[str, Any]: # 删除配置参数(按配置ID) success = MemoryConfigRepository.delete(self.db, key.config_id) if not success: raise ValueError("未找到配置") return {"affected": 1} # --- Update --- def update(self, update: ConfigUpdate) -> Dict[str, Any]: # 部分更新配置参数 config = MemoryConfigRepository.update(self.db, update) if not config: raise ValueError("未找到配置") return {"affected": 1} def update_extracted(self, update: ConfigUpdateExtracted) -> Dict[str, Any]: # 更新记忆萃取引擎配置参数 config = MemoryConfigRepository.update_extracted(self.db, update) if not config: raise ValueError("未找到配置") return {"affected": 1} # --- Forget config params --- # 遗忘引擎配置方法已迁移到 memory_forget_service.py # 使用新方法: MemoryForgetService.read_forgetting_config() 和 MemoryForgetService.update_forgetting_config() # --- Read --- def get_extracted(self, key: ConfigKey) -> Dict[str, Any]: # 获取萃取配置参数 result = MemoryConfigRepository.get_extracted_config(self.db, key.config_id) if not result: raise ValueError("未找到配置") return result # --- Read All --- def get_all(self, workspace_id = None) -> List[Dict[str, Any]]: # 获取所有配置参数 configs = MemoryConfigRepository.get_all(self.db, workspace_id) # 将 ORM 对象转换为字典列表 data_list = [] for config in configs: # 安全地转换 user_id 为 int config_id_old = None if config.config_id_old: try: config_id_old = int(config.config_id_old) except (ValueError, TypeError): config_id_old = None if config_id_old: memory_config=config_id_old else: memory_config=config.config_id config_dict = { "config_id": memory_config, "config_name": config.config_name, "config_desc": config.config_desc, "workspace_id": str(config.workspace_id) if config.workspace_id else None, "end_user_id": config.end_user_id, "config_id_old": config_id_old, "apply_id": config.apply_id, "llm_id": config.llm_id, "embedding_id": config.embedding_id, "rerank_id": config.rerank_id, "enable_llm_dedup_blockwise": config.enable_llm_dedup_blockwise, "enable_llm_disambiguation": config.enable_llm_disambiguation, "deep_retrieval": config.deep_retrieval, "t_type_strict": config.t_type_strict, "t_name_strict": config.t_name_strict, "t_overall": config.t_overall, "state": config.state, "chunker_strategy": config.chunker_strategy, "pruning_enabled": config.pruning_enabled, "pruning_scene": config.pruning_scene, "pruning_threshold": config.pruning_threshold, "enable_self_reflexion": config.enable_self_reflexion, "iteration_period": config.iteration_period, "reflexion_range": config.reflexion_range, "baseline": config.baseline, "statement_granularity": config.statement_granularity, "include_dialogue_context": config.include_dialogue_context, "max_context": config.max_context, "lambda_time": config.lambda_time, "lambda_mem": config.lambda_mem, "offset": config.offset, "created_at": config.created_at, "updated_at": config.updated_at, } data_list.append(config_dict) # 将 created_at 和 updated_at 转换为 YYYYMMDDHHmmss 格式 return self._convert_timestamps_to_format(data_list) async def pilot_run_stream(self, payload: ConfigPilotRun) -> AsyncGenerator[str, None]: """ 流式执行试运行,产生 SSE 格式的进度事件 Args: payload: 试运行配置和对话文本 Yields: SSE 格式的字符串,包含以下事件类型: - 各种阶段名称: 进度更新 (如 starting, knowledge_extraction_complete 等) - result: 最终结果 - error: 错误信息 - done: 完成标记 Raises: ValueError: 当配置无效或参数缺失时 RuntimeError: 当管线执行失败时 """ from pathlib import Path project_root = str(Path(__file__).resolve().parents[2]) try: # 发出初始进度事件 yield format_sse_message("starting", { "message": "开始试运行...", "time": int(time.time() * 1000) }) # 步骤 1: 配置加载和验证(数据库优先) payload_cid = str(getattr(payload, "config_id", "") or "").strip() cid: Optional[str] = payload_cid if payload_cid else None if not cid: raise ValueError("未提供 payload.config_id,禁止启动试运行") # 验证 dialogue_text 必须提供 dialogue_text = payload.dialogue_text.strip() if payload.dialogue_text else "" logger.info(f"[PILOT_RUN_STREAM] Received dialogue_text length: {len(dialogue_text)}, preview: {dialogue_text[:100]}") if not dialogue_text: raise ValueError("试运行模式必须提供 dialogue_text 参数") # Load configuration from database only using centralized manager try: config_service = MemoryConfigService(self.db) memory_config = config_service.load_memory_config( config_id=str(cid), service_name="MemoryStorageService.pilot_run_stream" ) logger.info(f"Configuration loaded successfully: {memory_config.config_name}") except ConfigurationError as e: raise RuntimeError(f"Configuration loading failed: {e}") # 步骤 2: 创建进度回调函数捕获管线进度 # 使用队列在回调和生成器之间传递进度事件 progress_queue: asyncio.Queue = asyncio.Queue() async def progress_callback(stage: str, message: str, data: Optional[Dict[str, Any]] = None) -> None: """ 进度回调函数,将进度事件放入队列 Args: stage: 阶段标识 message: 进度消息 data: 可选的结果数据(用于传递节点执行结果) """ await progress_queue.put((stage, message, data)) # 步骤 3: 在后台任务中执行管线 async def run_pipeline(): """在后台执行管线并捕获异常""" try: from app.services.pilot_run_service import run_pilot_extraction logger.info(f"[PILOT_RUN_STREAM] Calling run_pilot_extraction with dialogue_text length: {len(dialogue_text)}") await run_pilot_extraction( memory_config=memory_config, dialogue_text=dialogue_text, db=self.db, progress_callback=progress_callback, ) logger.info("[PILOT_RUN_STREAM] pipeline_main completed") # 标记管线完成 await progress_queue.put(("__PIPELINE_COMPLETE__", "", None)) except Exception as e: # 将异常放入队列 await progress_queue.put(("__PIPELINE_ERROR__", str(e), None)) # 启动后台任务 pipeline_task = asyncio.create_task(run_pipeline()) # 步骤 4: 从队列中读取进度事件并发出 while True: try: # 等待进度事件,设置超时以检测客户端断开 stage, message, data = await asyncio.wait_for( progress_queue.get(), timeout=0.5 ) # 检查特殊标记 if stage == "__PIPELINE_COMPLETE__": break elif stage == "__PIPELINE_ERROR__": raise RuntimeError(message) # 构建进度事件数据 progress_data = { "message": message, "time": int(time.time() * 1000) } # 如果有结果数据,添加到事件中 if data: progress_data["data"] = data # 发出进度事件,使用 stage 作为事件类型 yield format_sse_message(stage, progress_data) except TimeoutError: # 超时,继续等待(这允许检测客户端断开) continue # 等待管线任务完成 await pipeline_task # 步骤 5: 读取提取结果 from app.core.config import settings result_path = settings.get_memory_output_path("extracted_result.json") if not os.path.isfile(result_path): raise FileNotFoundError(f"试运行完成,但未找到提取结果文件: {result_path}") with open(result_path, "r", encoding="utf-8") as rf: extracted_result = json.load(rf) # 步骤 6: 发出结果事件 result_data = { "config_id": cid, "time_log": os.path.join(project_root, "logs", "time.log"), "extracted_result": extracted_result, } yield format_sse_message("result", result_data) # 步骤 7: 发出完成事件 yield format_sse_message("done", { "message": "试运行完成", "time": int(time.time() * 1000) }) except asyncio.CancelledError: # 客户端断开连接 logger.info("[PILOT_RUN_STREAM] Client disconnected during streaming") raise except Exception as e: # 发出错误事件 logger.error(f"[PILOT_RUN_STREAM] Error during streaming: {e}", exc_info=True) yield format_sse_message("error", { "code": 5000, "message": "试运行失败", "error": str(e), "time": int(time.time() * 1000) }) # -------------------- Neo4j Search & Analytics (fused from data_search_service.py) -------------------- # Ensure env for connector (e.g., NEO4J_PASSWORD) load_dotenv() _neo4j_connector = Neo4jConnector() async def search_dialogue(end_user_id: Optional[str] = None) -> Dict[str, Any]: result = await _neo4j_connector.execute_query( MemoryConfigRepository.SEARCH_FOR_DIALOGUE, end_user_id=end_user_id, ) data = {"search_for": "dialogue", "num": result[0]["num"]} return data async def search_chunk(end_user_id: Optional[str] = None) -> Dict[str, Any]: result = await _neo4j_connector.execute_query( MemoryConfigRepository.SEARCH_FOR_CHUNK, end_user_id=end_user_id, ) data = {"search_for": "chunk", "num": result[0]["num"]} return data async def search_statement(end_user_id: Optional[str] = None) -> Dict[str, Any]: result = await _neo4j_connector.execute_query( MemoryConfigRepository.SEARCH_FOR_STATEMENT, end_user_id=end_user_id, ) data = {"search_for": "statement", "num": result[0]["num"]} return data async def search_entity(end_user_id: Optional[str] = None) -> Dict[str, Any]: result = await _neo4j_connector.execute_query( MemoryConfigRepository.SEARCH_FOR_ENTITY, end_user_id=end_user_id, ) data = {"search_for": "entity", "num": result[0]["num"]} return data async def search_all(end_user_id: Optional[str] = None) -> Dict[str, Any]: result = await _neo4j_connector.execute_query( MemoryConfigRepository.SEARCH_FOR_ALL, end_user_id=end_user_id, ) # 检查结果是否为空或长度不足 if not result or len(result) < 4: data = { "total": 0, "counts": { "dialogue": 0, "chunk": 0, "statement": 0, "entity": 0, }, } return data data = { "total": result[-1]["Count"], "counts": { "dialogue": result[0]["Count"], "chunk": result[1]["Count"], "statement": result[2]["Count"], "entity": result[3]["Count"], }, } return data async def kb_type_distribution(end_user_id: Optional[str] = None) -> Dict[str, Any]: """统一知识库类型分布接口。 聚合 dialogue/chunk/statement/entity 四类计数,返回统一的分布结构,便于前端一次性消费。 """ result = await _neo4j_connector.execute_query( MemoryConfigRepository.SEARCH_FOR_ALL, end_user_id=end_user_id, ) # 检查结果是否为空或长度不足 if not result or len(result) < 4: data = { "total": 0, "distribution": [ {"type": "dialogue", "count": 0}, {"type": "chunk", "count": 0}, {"type": "statement", "count": 0}, {"type": "entity", "count": 0}, ] } return data total = result[-1]["Count"] distribution = [ {"type": "dialogue", "count": result[0]["Count"]}, {"type": "chunk", "count": result[1]["Count"]}, {"type": "statement", "count": result[2]["Count"]}, {"type": "entity", "count": result[3]["Count"]}, ] data = {"total": total, "distribution": distribution} return data async def search_detials(end_user_id: Optional[str] = None) -> List[Dict[str, Any]]: result = await _neo4j_connector.execute_query( MemoryConfigRepository.SEARCH_FOR_DETIALS, end_user_id=end_user_id, ) return result async def search_edges(end_user_id: Optional[str] = None) -> List[Dict[str, Any]]: result = await _neo4j_connector.execute_query( MemoryConfigRepository.SEARCH_FOR_EDGES, end_user_id=end_user_id, ) return result async def analytics_hot_memory_tags( db: Session, current_user: User, limit: int = 10 ) -> List[Dict[str, Any]]: """ 获取热门记忆标签,按数量排序并返回前N个 优化策略: 1. 先从所有用户收集原始标签(不调用LLM) 2. 聚合并合并相同标签的频率 3. 排序后取前N个 4. 只调用一次LLM进行筛选 """ workspace_id = current_user.current_workspace_id # 获取更多标签供LLM筛选(获取limit*4个标签) raw_limit = limit * 4 from app.services.memory_dashboard_service import get_workspace_end_users # 使用 asyncio.to_thread 避免阻塞事件循环 end_users = await asyncio.to_thread(get_workspace_end_users, db, workspace_id, current_user) if not end_users: return [] # 步骤1: 收集所有用户的原始标签(不调用LLM) connector = Neo4jConnector() try: all_raw_tags = [] for end_user in end_users: raw_tags = await get_raw_tags_from_db( connector, str(end_user.id), limit=raw_limit, by_user=False ) if raw_tags: all_raw_tags.extend(raw_tags) if not all_raw_tags: return [] # 步骤2: 聚合相同标签的频率 tag_frequency_map = {} for tag_name, frequency in all_raw_tags: if tag_name in tag_frequency_map: tag_frequency_map[tag_name] += frequency else: tag_frequency_map[tag_name] = frequency # 步骤3: 按频率降序排序,取前raw_limit个 sorted_tags = sorted( tag_frequency_map.items(), key=lambda x: x[1], reverse=True )[:raw_limit] if not sorted_tags: return [] # 步骤4: 只调用一次LLM进行筛选 tag_names = [tag for tag, _ in sorted_tags] # 使用第一个用户的end_user_id来获取LLM配置 # 因为同一工作空间下的用户应该使用相同的配置 first_end_user_id = str(end_users[0].id) filtered_tag_names = await filter_tags_with_llm(tag_names, first_end_user_id) # 步骤5: 根据LLM筛选结果构建最终列表(保留频率) final_tags = [] for tag, freq in sorted_tags: if tag in filtered_tag_names: final_tags.append((tag, freq)) # 步骤6: 只返回前limit个 top_tags = final_tags[:limit] return [{"name": t, "frequency": f} for t, f in top_tags] finally: await connector.close() async def analytics_recent_activity_stats() -> Dict[str, Any]: stats, _msg = get_recent_activity_stats() total = ( stats.get("chunk_count", 0) + stats.get("statements_count", 0) + stats.get("triplet_entities_count", 0) + stats.get("triplet_relations_count", 0) + stats.get("temporal_count", 0) ) # 精简:仅提供“最新一次活动多久前” latest_relative = None try: info = stats.get("log_path", "") idx = info.rfind("最新:") if idx != -1: latest_path = info[idx + 3 :].strip() if latest_path and os.path.exists(latest_path): import time diff = max(0.0, time.time() - os.path.getmtime(latest_path)) m = int(diff // 60) if m < 1: latest_relative = "刚刚" elif m < 60: latest_relative = f"{m}分钟前" else: h = int(m // 60) latest_relative = f"{h}小时前" if h < 24 else f"{int(h // 24)}天前" except Exception: pass data = {"total": total, "stats": stats, "latest_relative": latest_relative} return data