config_id字段改成UUID

This commit is contained in:
lixinyue
2026-01-22 20:40:41 +08:00
parent b84c82880c
commit f2d6fd7b08
7 changed files with 177 additions and 108 deletions

View File

@@ -162,9 +162,10 @@ async def write_server(
api_logger.info(f"Write service requested for group {user_input.end_user_id}, storage_type: {storage_type}, user_rag_memory_id: {user_rag_memory_id}") api_logger.info(f"Write service requested for group {user_input.end_user_id}, storage_type: {storage_type}, user_rag_memory_id: {user_rag_memory_id}")
try: try:
messages_list = memory_agent_service.get_messages_list(user_input)
result = await memory_agent_service.write_memory( result = await memory_agent_service.write_memory(
user_input.end_user_id, user_input.end_user_id,
user_input.messages, messages_list,
config_id, config_id,
db, db,
storage_type, storage_type,

View File

@@ -1,44 +1,54 @@
from app.core.memory.agent.utils.llm_tools import WriteState
from app.core.memory.agent.utils.llm_tools import WriteState
from app.core.memory.agent.utils.write_tools import write from app.core.memory.agent.utils.write_tools import write
from app.core.logging_config import get_agent_logger from app.core.logging_config import get_agent_logger
logger = get_agent_logger(__name__) logger = get_agent_logger(__name__)
async def write_node(state: WriteState) -> WriteState: async def write_node(state: WriteState) -> WriteState:
""" """
Write data to the database/file system. Write data to the database/file system.
Args: Args:
content: Data content to write state: WriteState containing messages, end_user_id, and memory_config
end_user_id: End user identifier
memory_config: MemoryConfig object containing all configuration
Returns: Returns:
dict: Contains 'status', 'saved_to', and 'data' fields dict: Contains 'write_result' with status and data fields
""" """
content=state.get('data','') messages = state.get('messages', [])
end_user_id=state.get('end_user_id','') end_user_id = state.get('end_user_id', '')
memory_config=state.get('memory_config', '') memory_config = state.get('memory_config', '')
# Convert LangChain messages to structured format expected by write()
structured_messages = []
for msg in messages:
if hasattr(msg, 'type') and hasattr(msg, 'content'):
# Map LangChain message types to role names
role = 'user' if msg.type == 'human' else 'assistant' if msg.type == 'ai' else msg.type
structured_messages.append({
"role": role,
"content": msg.content # content is now guaranteed to be a string
})
try: try:
result=await write( result = await write(
messages=structured_messages,
end_user_id=end_user_id, end_user_id=end_user_id,
memory_config=memory_config, memory_config=memory_config,
messages=content, # 修复:使用正确的参数名 messages
) )
logger.info(f"Write completed successfully! Config: {memory_config.config_name}") logger.info(f"Write completed successfully! Config: {memory_config.config_name}")
write_result= { write_result = {
"status": "success", "status": "success",
"data": content, "data": structured_messages,
"config_id": memory_config.config_id, "config_id": memory_config.config_id,
"config_name": memory_config.config_name, "config_name": memory_config.config_name,
} }
return {"write_result":write_result} return {"write_result": write_result}
except Exception as e: except Exception as e:
logger.error(f"Data_write failed: {e}", exc_info=True) logger.error(f"Data_write failed: {e}", exc_info=True)
write_result= { write_result = {
"status": "error", "status": "error",
"message": str(e), "message": str(e),
} }

View File

@@ -10,55 +10,58 @@ from app.core.memory.models.message_models import DialogData, ConversationContex
async def get_chunked_dialogs(
    chunker_strategy: str = "RecursiveChunker",
    end_user_id: str = "group_1",
    messages: list = None,
    ref_id: str = "wyl_20251027",
    config_id: str = None
) -> List[DialogData]:
    """Generate chunks from structured messages using the specified chunker strategy.

    Args:
        chunker_strategy: The chunking strategy to use (default: RecursiveChunker)
        end_user_id: End user identifier stored on the resulting DialogData
        messages: Structured message list [{"role": "user", "content": "..."}, ...]
        ref_id: Reference identifier
        config_id: Configuration ID for processing

    Returns:
        A single-element list holding the DialogData built from ``messages``,
        with its ``chunks`` attribute populated by the chunker.

    Raises:
        ValueError: If ``messages`` is not a non-empty list, if an entry is not
            a dict with 'role' and 'content', if a role is neither 'user' nor
            'assistant', if a content value is not a string, or if every
            message is blank after stripping whitespace.
    """
    # Kept as a function-local import, exactly as in the original block.
    from app.core.logging_config import get_agent_logger
    logger = get_agent_logger(__name__)

    # An empty list is falsy, so no separate length check is needed.
    if not isinstance(messages, list) or not messages:
        raise ValueError("messages parameter must be a non-empty list")

    conversation_messages = []
    for idx, msg in enumerate(messages):
        if not isinstance(msg, dict) or 'role' not in msg or 'content' not in msg:
            raise ValueError(f"Message {idx} format error: must contain 'role' and 'content' fields")

        role = msg['role']
        content = msg['content']

        if role not in ('user', 'assistant'):
            raise ValueError(f"Message {idx} role must be 'user' or 'assistant', got: {role}")

        # Report non-string content as a validation error like the checks
        # above, rather than failing with AttributeError on .strip().
        if not isinstance(content, str):
            raise ValueError(
                f"Message {idx} content must be a string, got: {type(content).__name__}"
            )

        # Whitespace-only messages are dropped.
        text = content.strip()
        if text:
            conversation_messages.append(ConversationMessage(role=role, msg=text))

    if not conversation_messages:
        raise ValueError("Message list cannot be empty after filtering")

    conversation_context = ConversationContext(msgs=conversation_messages)
    dialog_data = DialogData(
        context=conversation_context,
        ref_id=ref_id,
        end_user_id=end_user_id,
        config_id=config_id
    )

    chunker = DialogueChunker(chunker_strategy)
    extracted_chunks = await chunker.process_dialogue(dialog_data)
    dialog_data.chunks = extracted_chunks

    logger.info(f"DialogData created with {len(extracted_chunks)} chunks")
    return [dialog_data]

View File

@@ -38,7 +38,9 @@ async def write(
Execute the complete knowledge extraction pipeline. Execute the complete knowledge extraction pipeline.
Args: Args:
end_user_id: End user identifier user_id: User identifier
apply_id: Application identifier
group_id: Group identifier
memory_config: MemoryConfig object containing all configuration memory_config: MemoryConfig object containing all configuration
messages: Structured message list [{"role": "user", "content": "..."}, ...] messages: Structured message list [{"role": "user", "content": "..."}, ...]
ref_id: Reference ID, defaults to "wyl20251027" ref_id: Reference ID, defaults to "wyl20251027"
@@ -54,7 +56,7 @@ async def write(
logger.info(f"LLM model: {memory_config.llm_model_name}") logger.info(f"LLM model: {memory_config.llm_model_name}")
logger.info(f"Embedding model: {memory_config.embedding_model_name}") logger.info(f"Embedding model: {memory_config.embedding_model_name}")
logger.info(f"Chunker strategy: {chunker_strategy}") logger.info(f"Chunker strategy: {chunker_strategy}")
logger.info(f"End User ID: {end_user_id}") logger.info(f"end_user_id ID: {end_user_id}")
# Construct clients from memory_config using factory pattern with db session # Construct clients from memory_config using factory pattern with db session
with get_db_context() as db: with get_db_context() as db:
@@ -77,25 +79,10 @@ async def write(
# Step 1: Load and chunk data # Step 1: Load and chunk data
step_start = time.time() step_start = time.time()
# Convert messages list to content string
# messages format: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...]
if isinstance(messages, list) and len(messages) > 0:
# Extract content from the last user message or concatenate all messages
if isinstance(messages[-1], dict) and 'content' in messages[-1]:
content = messages[-1]['content']
else:
# Fallback: concatenate all message contents
content = " ".join([msg.get('content', '') for msg in messages if isinstance(msg, dict)])
elif isinstance(messages, str):
content = messages
else:
content = str(messages)
chunked_dialogs = await get_chunked_dialogs( chunked_dialogs = await get_chunked_dialogs(
chunker_strategy=chunker_strategy, chunker_strategy=chunker_strategy,
end_user_id=end_user_id, end_user_id=end_user_id,
content=content, # 修复:使用 content 参数而不是 messages messages=messages,
ref_id=ref_id, ref_id=ref_id,
config_id=config_id, config_id=config_id,
) )

View File

@@ -27,6 +27,50 @@ from uuid import UUID
logger = get_logger(__name__) logger = get_logger(__name__)
config_logger = get_config_logger() config_logger = get_config_logger()
import uuid
def _validate_config_id(config_id):
    """Validate a configuration ID and return it in normalised form.

    Accepted inputs:
        * ``uuid.UUID`` instance: returned unchanged.
        * ``int``: returned unchanged if strictly positive.
        * ``str``: surrounding whitespace is ignored; a decimal integer string
          is returned as a positive ``int``, otherwise the text is parsed as a
          UUID string and returned as ``uuid.UUID``.

    Args:
        config_id: The raw configuration ID.

    Returns:
        The validated ID as ``uuid.UUID`` or ``int``.

    Raises:
        InvalidConfigError: If ``config_id`` is None, is a non-positive
            integer, is a string that is neither an integer nor a UUID, or is
            of an unsupported type.
    """
    if isinstance(config_id, uuid.UUID):
        return config_id

    if config_id is None:
        raise InvalidConfigError(
            "Configuration ID cannot be None",
            field_name="config_id",
            invalid_value=config_id,
        )

    if isinstance(config_id, int):
        numeric_id = config_id
    elif isinstance(config_id, str):
        text = config_id.strip()
        try:
            # Integer form is tried first so that inputs accepted before
            # UUID support was added keep producing the same result.
            numeric_id = int(text)
        except ValueError:
            try:
                return uuid.UUID(text)
            except ValueError as exc:
                raise InvalidConfigError(
                    f"Invalid configuration ID format: '{config_id}'",
                    field_name="config_id",
                    invalid_value=config_id,
                ) from exc
    else:
        raise InvalidConfigError(
            f"Invalid type for configuration ID: expected int or str, got {type(config_id).__name__}",
            field_name="config_id",
            invalid_value=config_id,
        )

    # The positivity check sits outside any try/except so that this error can
    # never be caught and re-labelled as a format error.
    if numeric_id <= 0:
        raise InvalidConfigError(
            f"Configuration ID must be positive: {numeric_id}",
            field_name="config_id",
            invalid_value=config_id,
        )
    return numeric_id
class MemoryConfigService: class MemoryConfigService:
@@ -69,7 +113,7 @@ class MemoryConfigService:
ConfigurationError: If validation fails ConfigurationError: If validation fails
""" """
start_time = time.time() start_time = time.time()
validated_config_id = _validate_config_id(config_id)
config_logger.info( config_logger.info(
"Starting memory configuration loading", "Starting memory configuration loading",
extra={ extra={

View File

@@ -383,7 +383,7 @@ def build_graphrag_for_kb(kb_id: uuid.UUID):
@celery_app.task(name="app.core.memory.agent.read_message", bind=True) @celery_app.task(name="app.core.memory.agent.read_message", bind=True)
def read_message_task(self, end_user_id: str, message: str, history: List[Dict[str, Any]], search_switch: str, config_id: uuid.UUID, storage_type:str, user_rag_memory_id:str) -> Dict[str, Any]: def read_message_task(self, end_user_id: str, message: str, history: List[Dict[str, Any]], search_switch: str, config_id: str, storage_type:str, user_rag_memory_id:str) -> Dict[str, Any]:
"""Celery task to process a read message via MemoryAgentService. """Celery task to process a read message via MemoryAgentService.
@@ -392,7 +392,7 @@ def read_message_task(self, end_user_id: str, message: str, history: List[Dict[s
message: User message to process message: User message to process
history: Conversation history history: Conversation history
search_switch: Search switch parameter search_switch: Search switch parameter
config_id: Optional configuration ID config_id: Configuration ID as string (will be converted to UUID)
Returns: Returns:
Dict containing the result and metadata Dict containing the result and metadata
@@ -402,8 +402,16 @@ def read_message_task(self, end_user_id: str, message: str, history: List[Dict[s
""" """
start_time = time.time() start_time = time.time()
# Convert config_id string to UUID
actual_config_id = None
if config_id:
try:
actual_config_id = uuid.UUID(config_id) if isinstance(config_id, str) else config_id
except (ValueError, AttributeError):
# If conversion fails, leave as None and try to resolve
pass
# Resolve config_id if None # Resolve config_id if None
actual_config_id = config_id
if actual_config_id is None: if actual_config_id is None:
try: try:
from app.services.memory_agent_service import get_end_user_connected_config from app.services.memory_agent_service import get_end_user_connected_config
@@ -473,13 +481,13 @@ def read_message_task(self, end_user_id: str, message: str, history: List[Dict[s
@celery_app.task(name="app.core.memory.agent.write_message", bind=True) @celery_app.task(name="app.core.memory.agent.write_message", bind=True)
def write_message_task(self, end_user_id: str, message: str, config_id: uuid.UUID, storage_type:str, user_rag_memory_id:str) -> Dict[str, Any]: def write_message_task(self, end_user_id: str, message: str, config_id: str, storage_type:str, user_rag_memory_id:str) -> Dict[str, Any]:
"""Celery task to process a write message via MemoryAgentService. """Celery task to process a write message via MemoryAgentService.
Args: Args:
end_user_id: Group ID for the memory agent (also used as end_user_id) end_user_id: Group ID for the memory agent (also used as end_user_id)
message: Message to write message: Message to write
config_id: Optional configuration ID config_id: Configuration ID as string (will be converted to UUID)
Returns: Returns:
Dict containing the result and metadata Dict containing the result and metadata
@@ -493,8 +501,24 @@ def write_message_task(self, end_user_id: str, message: str, config_id: uuid.UUI
logger.info(f"[CELERY WRITE] Starting write task - end_user_id={end_user_id}, config_id={config_id}, storage_type={storage_type}") logger.info(f"[CELERY WRITE] Starting write task - end_user_id={end_user_id}, config_id={config_id}, storage_type={storage_type}")
start_time = time.time() start_time = time.time()
# Convert config_id string to UUID
actual_config_id = None
if config_id:
try:
actual_config_id = uuid.UUID(config_id) if isinstance(config_id, str) else config_id
logger.info(f"[CELERY WRITE] Converted config_id to UUID: {actual_config_id} (type: {type(actual_config_id).__name__})")
except (ValueError, AttributeError) as e:
logger.error(f"[CELERY WRITE] Invalid config_id format: {config_id}, error: {e}")
return {
"status": "FAILURE",
"error": f"Invalid config_id format: {config_id}",
"end_user_id": end_user_id,
"config_id": config_id,
"elapsed_time": 0.0,
"task_id": self.request.id
}
# Resolve config_id if None # Resolve config_id if None
actual_config_id = config_id
if actual_config_id is None: if actual_config_id is None:
try: try:
from app.services.memory_agent_service import get_end_user_connected_config from app.services.memory_agent_service import get_end_user_connected_config
@@ -511,7 +535,7 @@ def write_message_task(self, end_user_id: str, message: str, config_id: uuid.UUI
async def _run() -> str: async def _run() -> str:
db = next(get_db()) db = next(get_db())
try: try:
logger.info(f"[CELERY WRITE] Executing MemoryAgentService.write_memory") logger.info(f"[CELERY WRITE] Executing MemoryAgentService.write_memory with config_id={actual_config_id} (type: {type(actual_config_id).__name__})")
service = MemoryAgentService() service = MemoryAgentService()
result = await service.write_memory(end_user_id, message, actual_config_id, db, storage_type, user_rag_memory_id) result = await service.write_memory(end_user_id, message, actual_config_id, db, storage_type, user_rag_memory_id)
logger.info(f"[CELERY WRITE] Write completed successfully: {result}") logger.info(f"[CELERY WRITE] Write completed successfully: {result}")