From f2d6fd7b085aef6e7ede111cd8f9a3946f887f8a Mon Sep 17 00:00:00 2001 From: lixinyue <2569494688@qq.com> Date: Thu, 22 Jan 2026 20:40:41 +0800 Subject: [PATCH] =?UTF-8?q?config=5Fid=E5=AD=97=E6=AE=B5=E6=94=B9=E6=88=90?= =?UTF-8?q?UUID?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controllers/memory_agent_controller.py | 3 +- .../langgraph_graph/nodes/write_nodes.py | 42 +++++---- .../core/memory/agent/utils/get_dialogs.py | 59 ++++++------ .../core/memory/agent/utils/write_tools.py | 27 ++---- .../core/memory/llm_tools/chunker_client.py | 24 ++--- api/app/services/memory_config_service.py | 92 ++++++++++++++----- api/app/tasks.py | 38 ++++++-- 7 files changed, 177 insertions(+), 108 deletions(-) diff --git a/api/app/controllers/memory_agent_controller.py b/api/app/controllers/memory_agent_controller.py index 3f3a513e..e9ae8bae 100644 --- a/api/app/controllers/memory_agent_controller.py +++ b/api/app/controllers/memory_agent_controller.py @@ -162,9 +162,10 @@ async def write_server( api_logger.info(f"Write service requested for group {user_input.end_user_id}, storage_type: {storage_type}, user_rag_memory_id: {user_rag_memory_id}") try: + messages_list = memory_agent_service.get_messages_list(user_input) result = await memory_agent_service.write_memory( user_input.end_user_id, - user_input.messages, + messages_list, config_id, db, storage_type, diff --git a/api/app/core/memory/agent/langgraph_graph/nodes/write_nodes.py b/api/app/core/memory/agent/langgraph_graph/nodes/write_nodes.py index 1dab1b0a..b85130ad 100644 --- a/api/app/core/memory/agent/langgraph_graph/nodes/write_nodes.py +++ b/api/app/core/memory/agent/langgraph_graph/nodes/write_nodes.py @@ -1,44 +1,54 @@ - -from app.core.memory.agent.utils.llm_tools import WriteState +from app.core.memory.agent.utils.llm_tools import WriteState from app.core.memory.agent.utils.write_tools import write from app.core.logging_config import get_agent_logger logger = get_agent_logger(__name__) + + async def write_node(state: WriteState) -> WriteState: """ Write data to the database/file system. Args: - content: Data content to write - end_user_id: End user identifier - memory_config: MemoryConfig object containing all configuration + state: WriteState containing messages, end_user_id, and memory_config Returns: - dict: Contains 'status', 'saved_to', and 'data' fields + dict: Contains 'write_result' with status and data fields """ - content=state.get('data','') - end_user_id=state.get('end_user_id','') - memory_config=state.get('memory_config', '') + messages = state.get('messages', []) + end_user_id = state.get('end_user_id', '') + memory_config = state.get('memory_config', '') + + # Convert LangChain messages to structured format expected by write() + structured_messages = [] + for msg in messages: + if hasattr(msg, 'type') and hasattr(msg, 'content'): + # Map LangChain message types to role names + role = 'user' if msg.type == 'human' else 'assistant' if msg.type == 'ai' else msg.type + structured_messages.append({ + "role": role, + "content": msg.content # content is now guaranteed to be a string + }) + try: - result=await write( + result = await write( + messages=structured_messages, end_user_id=end_user_id, memory_config=memory_config, - messages=content, # 修复:使用正确的参数名 messages ) logger.info(f"Write completed successfully! Config: {memory_config.config_name}") - write_result= { + write_result = { "status": "success", - "data": content, + "data": structured_messages, "config_id": memory_config.config_id, "config_name": memory_config.config_name, } - return {"write_result":write_result} - + return {"write_result": write_result} except Exception as e: logger.error(f"Data_write failed: {e}", exc_info=True) - write_result= { + write_result = { "status": "error", "message": str(e), } diff --git a/api/app/core/memory/agent/utils/get_dialogs.py b/api/app/core/memory/agent/utils/get_dialogs.py index 4751f18c..a56a32fa 100644 --- a/api/app/core/memory/agent/utils/get_dialogs.py +++ b/api/app/core/memory/agent/utils/get_dialogs.py @@ -10,55 +10,58 @@ from app.core.memory.models.message_models import DialogData, ConversationContex async def get_chunked_dialogs( chunker_strategy: str = "RecursiveChunker", end_user_id: str = "group_1", - content: str = "这是用户的输入", + messages: list = None, ref_id: str = "wyl_20251027", config_id: str = None ) -> List[DialogData]: - """Generate chunks from all test data entries using the specified chunker strategy. + """Generate chunks from structured messages using the specified chunker strategy. Args: chunker_strategy: The chunking strategy to use (default: RecursiveChunker) - end_user_id: End user identifier - content: Dialog content + group_id: Group identifier + messages: Structured message list [{"role": "user", "content": "..."}, ...] ref_id: Reference identifier config_id: Configuration ID for processing Returns: - List of DialogData objects with generated chunks for each test entry + List of DialogData objects with generated chunks """ - dialog_data_list = [] - messages = [] + from app.core.logging_config import get_agent_logger + logger = get_agent_logger(__name__) - messages.append(ConversationMessage(role="用户", msg=content)) + if not messages or not isinstance(messages, list) or len(messages) == 0: + raise ValueError("messages parameter must be a non-empty list") - # Create DialogData - conversation_context = ConversationContext(msgs=messages) - # Create DialogData with end_user_id + conversation_messages = [] + + for idx, msg in enumerate(messages): + if not isinstance(msg, dict) or 'role' not in msg or 'content' not in msg: + raise ValueError(f"Message {idx} format error: must contain 'role' and 'content' fields") + + role = msg['role'] + content = msg['content'] + + if role not in ['user', 'assistant']: + raise ValueError(f"Message {idx} role must be 'user' or 'assistant', got: {role}") + + if content.strip(): + conversation_messages.append(ConversationMessage(role=role, msg=content.strip())) + + if not conversation_messages: + raise ValueError("Message list cannot be empty after filtering") + + conversation_context = ConversationContext(msgs=conversation_messages) dialog_data = DialogData( context=conversation_context, ref_id=ref_id, end_user_id=end_user_id, config_id=config_id ) - # Create DialogueChunker and process the dialogue + chunker = DialogueChunker(chunker_strategy) extracted_chunks = await chunker.process_dialogue(dialog_data) dialog_data.chunks = extracted_chunks - dialog_data_list.append(dialog_data) + logger.info(f"DialogData created with {len(extracted_chunks)} chunks") - # Convert to dict with datetime serialized - def serialize_datetime(obj): - if isinstance(obj, datetime): - return obj.isoformat() - raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") - - combined_output = [dd.model_dump() for dd in dialog_data_list] - - print(dialog_data_list) - - # with open(os.path.join(os.path.dirname(__file__), "chunker_test_output.txt"), "w", encoding="utf-8") as f: - # json.dump(combined_output, f, ensure_ascii=False, indent=4, default=serialize_datetime) - - - return dialog_data_list + return [dialog_data] diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index b8bc58eb..d32d152c 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -36,9 +36,11 @@ async def write( ) -> None: """ Execute the complete knowledge extraction pipeline. - + Args: - end_user_id: End user identifier + user_id: User identifier + apply_id: Application identifier + group_id: Group identifier memory_config: MemoryConfig object containing all configuration messages: Structured message list [{"role": "user", "content": "..."}, ...] ref_id: Reference ID, defaults to "wyl20251027" @@ -47,14 +49,14 @@ async def write( embedding_model_id = str(memory_config.embedding_model_id) chunker_strategy = memory_config.chunker_strategy config_id = str(memory_config.config_id) - + logger.info("=== MemSci Knowledge Extraction Pipeline ===") logger.info(f"Config: {memory_config.config_name} (ID: {config_id})") logger.info(f"Workspace: {memory_config.workspace_name}") logger.info(f"LLM model: {memory_config.llm_model_name}") logger.info(f"Embedding model: {memory_config.embedding_model_name}") logger.info(f"Chunker strategy: {chunker_strategy}") - logger.info(f"End User ID: {end_user_id}") + logger.info(f"end_user_id ID: {end_user_id}") # Construct clients from memory_config using factory pattern with db session with get_db_context() as db: @@ -77,25 +79,10 @@ async def write( # Step 1: Load and chunk data step_start = time.time() - - # Convert messages list to content string - # messages format: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...] - if isinstance(messages, list) and len(messages) > 0: - # Extract content from the last user message or concatenate all messages - if isinstance(messages[-1], dict) and 'content' in messages[-1]: - content = messages[-1]['content'] - else: - # Fallback: concatenate all message contents - content = " ".join([msg.get('content', '') for msg in messages if isinstance(msg, dict)]) - elif isinstance(messages, str): - content = messages - else: - content = str(messages) - chunked_dialogs = await get_chunked_dialogs( chunker_strategy=chunker_strategy, end_user_id=end_user_id, - content=content, # 修复:使用 content 参数而不是 messages + messages=messages, ref_id=ref_id, config_id=config_id, ) diff --git a/api/app/core/memory/llm_tools/chunker_client.py b/api/app/core/memory/llm_tools/chunker_client.py index 87cdb9f4..93a2df82 100644 --- a/api/app/core/memory/llm_tools/chunker_client.py +++ b/api/app/core/memory/llm_tools/chunker_client.py @@ -187,11 +187,11 @@ class ChunkerClient: async def generate_chunks(self, dialogue: DialogData): """ Generate chunks following 1 Message = 1 Chunk strategy. - + Each message creates one chunk, directly inheriting role information. If a message is too long, it will be split into multiple sub-chunks, each maintaining the same speaker. - + Raises: ValueError: If dialogue has no messages or chunking fails """ @@ -201,9 +201,9 @@ class ChunkerClient: f"Dialogue {dialogue.ref_id} has no messages. " f"Cannot generate chunks from empty dialogue." ) - + dialogue.chunks = [] - + # 按消息分块:每个消息创建一个或多个 chunk,直接继承角色 for msg_idx, msg in enumerate(dialogue.context.msgs): # Validate message has required attributes @@ -212,13 +212,13 @@ class ChunkerClient: f"Message {msg_idx} in dialogue {dialogue.ref_id} " f"missing 'role' or 'msg' attribute" ) - + msg_content = msg.msg.strip() - + # Skip empty messages if not msg_content: continue - + # 如果消息太长,可以进一步分块 if len(msg_content) > self.chunk_size: # 对单个消息的内容进行分块 @@ -228,14 +228,14 @@ class ChunkerClient: raise ValueError( f"Failed to chunk long message {msg_idx} in dialogue {dialogue.ref_id}: {e}" ) - + for idx, sub_chunk in enumerate(sub_chunks): sub_chunk_text = sub_chunk.text if hasattr(sub_chunk, 'text') else str(sub_chunk) sub_chunk_text = sub_chunk_text.strip() - + if len(sub_chunk_text) < (self.min_characters_per_chunk or 50): continue - + chunk = Chunk( content=f"{msg.role}: {sub_chunk_text}", speaker=msg.role, # 直接继承角色 @@ -260,7 +260,7 @@ class ChunkerClient: }, ) dialogue.chunks.append(chunk) - + # Validate we generated at least one chunk if not dialogue.chunks: raise ValueError( @@ -268,7 +268,7 @@ class ChunkerClient: f"All messages were either empty or too short. " f"Messages count: {len(dialogue.context.msgs)}" ) - + return dialogue def evaluate_chunking(self, dialogue: DialogData) -> dict: diff --git a/api/app/services/memory_config_service.py b/api/app/services/memory_config_service.py index d7f7c8a6..af9c0c5d 100644 --- a/api/app/services/memory_config_service.py +++ b/api/app/services/memory_config_service.py @@ -27,29 +27,73 @@ from uuid import UUID logger = get_logger(__name__) config_logger = get_config_logger() +import uuid + +def _validate_config_id(config_id): + """Validate configuration ID format.""" + if isinstance(config_id, uuid.UUID): + return config_id + if config_id is None: + raise InvalidConfigError( + "Configuration ID cannot be None", + field_name="config_id", + invalid_value=config_id, + ) + + if isinstance(config_id, int): + if config_id <= 0: + raise InvalidConfigError( + f"Configuration ID must be positive: {config_id}", + field_name="config_id", + invalid_value=config_id, + ) + return config_id + + if isinstance(config_id, str): + try: + parsed_id = int(config_id.strip()) + if parsed_id <= 0: + raise InvalidConfigError( + f"Configuration ID must be positive: {parsed_id}", + field_name="config_id", + invalid_value=config_id, + ) + return parsed_id + except ValueError: + raise InvalidConfigError( + f"Invalid configuration ID format: '{config_id}'", + field_name="config_id", + invalid_value=config_id, + ) + + raise InvalidConfigError( + f"Invalid type for configuration ID: expected int or str, got {type(config_id).__name__}", + field_name="config_id", + invalid_value=config_id, + ) class MemoryConfigService: """ Centralized service for memory configuration loading and validation. - + This class provides a single implementation of configuration loading logic that can be shared across multiple services, eliminating code duplication. - + Usage: config_service = MemoryConfigService(db) memory_config = config_service.load_memory_config(config_id) model_config = config_service.get_model_config(model_id) """ - + def __init__(self, db: Session): """Initialize the service with a database session. - + Args: db: SQLAlchemy database session """ self.db = db - + def load_memory_config( self, config_id: UUID, @@ -57,19 +101,19 @@ class MemoryConfigService: ) -> MemoryConfig: """ Load memory configuration from database by config_id. - + Args: config_id: Configuration ID (UUID) from database service_name: Name of the calling service (for logging purposes) - + Returns: MemoryConfig: Immutable configuration object - + Raises: ConfigurationError: If validation fails """ start_time = time.time() - + validated_config_id = _validate_config_id(config_id) config_logger.info( "Starting memory configuration loading", extra={ @@ -78,9 +122,9 @@ class MemoryConfigService: "config_id": str(config_id), }, ) - + logger.info(f"Loading memory configuration from database: config_id={config_id}") - + try: # Validate config_id is UUID if not isinstance(config_id, UUID): @@ -99,7 +143,7 @@ class MemoryConfigService: field_name="config_id", invalid_value=config_id, ) - + # Step 1: Get config and workspace db_query_start = time.time() result = MemoryConfigRepository.get_config_with_workspace(self.db, config_id) @@ -120,9 +164,9 @@ class MemoryConfigService: raise ConfigurationError( f"Configuration {config_id} not found in database" ) - + memory_config, workspace = result - + # Step 2: Validate embedding model (returns both UUID and name) embed_start = time.time() embedding_uuid, embedding_name = validate_embedding_model( @@ -134,7 +178,7 @@ class MemoryConfigService: ) embed_time = time.time() - embed_start logger.info(f"[PERF] Embedding validation: {embed_time:.4f}s") - + # Step 3: Resolve LLM model llm_start = time.time() llm_uuid, llm_name = validate_and_resolve_model_id( @@ -148,7 +192,7 @@ class MemoryConfigService: ) llm_time = time.time() - llm_start logger.info(f"[PERF] LLM validation: {llm_time:.4f}s") - + # Step 4: Resolve optional rerank model rerank_start = time.time() rerank_uuid = None @@ -166,10 +210,10 @@ class MemoryConfigService: rerank_time = time.time() - rerank_start if memory_config.rerank_id: logger.info(f"[PERF] Rerank validation: {rerank_time:.4f}s") - + # Note: embedding_name is now returned from validate_embedding_model above # No need for redundant query! - + # Create immutable MemoryConfig object config = MemoryConfig( config_id=memory_config.config_id, @@ -210,9 +254,9 @@ class MemoryConfigService: pruning_scene=memory_config.pruning_scene or "education", pruning_threshold=float(memory_config.pruning_threshold) if memory_config.pruning_threshold is not None else 0.5, ) - + elapsed_ms = (time.time() - start_time) * 1000 - + config_logger.info( "Memory configuration loaded successfully", extra={ @@ -225,13 +269,13 @@ class MemoryConfigService: "elapsed_ms": elapsed_ms, }, ) - + logger.info(f"Memory configuration loaded successfully: {config.config_name}") return config - + except Exception as e: elapsed_ms = (time.time() - start_time) * 1000 - + config_logger.error( "Failed to load memory configuration", extra={ @@ -245,7 +289,7 @@ class MemoryConfigService: }, exc_info=True, ) - + logger.error(f"Failed to load memory configuration {config_id}: {e}") if isinstance(e, (ConfigurationError, ValueError)): raise diff --git a/api/app/tasks.py b/api/app/tasks.py index 3ef2653a..38488aa5 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -383,7 +383,7 @@ def build_graphrag_for_kb(kb_id: uuid.UUID): @celery_app.task(name="app.core.memory.agent.read_message", bind=True) -def read_message_task(self, end_user_id: str, message: str, history: List[Dict[str, Any]], search_switch: str, config_id: uuid.UUID, storage_type:str, user_rag_memory_id:str) -> Dict[str, Any]: +def read_message_task(self, end_user_id: str, message: str, history: List[Dict[str, Any]], search_switch: str, config_id: str, storage_type:str, user_rag_memory_id:str) -> Dict[str, Any]: """Celery task to process a read message via MemoryAgentService. @@ -392,7 +392,7 @@ def read_message_task(self, end_user_id: str, message: str, history: List[Dict[s message: User message to process history: Conversation history search_switch: Search switch parameter - config_id: Optional configuration ID + config_id: Configuration ID as string (will be converted to UUID) Returns: Dict containing the result and metadata @@ -402,8 +402,16 @@ def read_message_task(self, end_user_id: str, message: str, history: List[Dict[s """ start_time = time.time() + # Convert config_id string to UUID + actual_config_id = None + if config_id: + try: + actual_config_id = uuid.UUID(config_id) if isinstance(config_id, str) else config_id + except (ValueError, AttributeError): + # If conversion fails, leave as None and try to resolve + pass + # Resolve config_id if None - actual_config_id = config_id if actual_config_id is None: try: from app.services.memory_agent_service import get_end_user_connected_config @@ -473,13 +481,13 @@ def read_message_task(self, end_user_id: str, message: str, history: List[Dict[s @celery_app.task(name="app.core.memory.agent.write_message", bind=True) -def write_message_task(self, end_user_id: str, message: str, config_id: uuid.UUID, storage_type:str, user_rag_memory_id:str) -> Dict[str, Any]: +def write_message_task(self, end_user_id: str, message: str, config_id: str, storage_type:str, user_rag_memory_id:str) -> Dict[str, Any]: """Celery task to process a write message via MemoryAgentService. Args: end_user_id: Group ID for the memory agent (also used as end_user_id) message: Message to write - config_id: Optional configuration ID + config_id: Configuration ID as string (will be converted to UUID) Returns: Dict containing the result and metadata @@ -493,8 +501,24 @@ def write_message_task(self, end_user_id: str, message: str, config_id: uuid.UUI logger.info(f"[CELERY WRITE] Starting write task - end_user_id={end_user_id}, config_id={config_id}, storage_type={storage_type}") start_time = time.time() + # Convert config_id string to UUID + actual_config_id = None + if config_id: + try: + actual_config_id = uuid.UUID(config_id) if isinstance(config_id, str) else config_id + logger.info(f"[CELERY WRITE] Converted config_id to UUID: {actual_config_id} (type: {type(actual_config_id).__name__})") + except (ValueError, AttributeError) as e: + logger.error(f"[CELERY WRITE] Invalid config_id format: {config_id}, error: {e}") + return { + "status": "FAILURE", + "error": f"Invalid config_id format: {config_id}", + "end_user_id": end_user_id, + "config_id": config_id, + "elapsed_time": 0.0, + "task_id": self.request.id + } + # Resolve config_id if None - actual_config_id = config_id if actual_config_id is None: try: from app.services.memory_agent_service import get_end_user_connected_config @@ -511,7 +535,7 @@ def write_message_task(self, end_user_id: str, message: str, config_id: uuid.UUI async def _run() -> str: db = next(get_db()) try: - logger.info(f"[CELERY WRITE] Executing MemoryAgentService.write_memory") + logger.info(f"[CELERY WRITE] Executing MemoryAgentService.write_memory with config_id={actual_config_id} (type: {type(actual_config_id).__name__})") service = MemoryAgentService() result = await service.write_memory(end_user_id, message, actual_config_id, db, storage_type, user_rag_memory_id) logger.info(f"[CELERY WRITE] Write completed successfully: {result}")