[fix]Correct the flaws existing in the semantic segmentation method
This commit is contained in:
@@ -1,5 +1,7 @@
|
|||||||
import os
|
import os
|
||||||
from typing import Optional
|
from typing import Optional, List, Any
|
||||||
|
from enum import Enum
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from app.core.logging_config import get_memory_logger
|
from app.core.logging_config import get_memory_logger
|
||||||
from app.core.memory.models.message_models import DialogData, Chunk
|
from app.core.memory.models.message_models import DialogData, Chunk
|
||||||
@@ -10,6 +12,20 @@ from app.core.memory.utils.config.config_utils import get_chunker_config
|
|||||||
logger = get_memory_logger(__name__)
|
logger = get_memory_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ChunkerStrategy(Enum):
    """Enumeration of the chunking strategy names accepted by this module.

    The value of each member is the strategy name string.
    """

    RECURSIVE = "RecursiveChunker"
    SEMANTIC = "SemanticChunker"
    LATE = "LateChunker"
    NEURAL = "NeuralChunker"
    LLM = "LLMChunker"

    @classmethod
    def get_valid_strategies(cls) -> List[str]:
        """Return the value of every member, in declaration order."""
        names: List[str] = []
        for member in cls:
            names.append(member.value)
        return names
|
||||||
|
|
||||||
|
|
||||||
class DialogueChunker:
|
class DialogueChunker:
|
||||||
"""A class that processes dialogues and fills them with chunks based on a specified strategy.
|
"""A class that processes dialogues and fills them with chunks based on a specified strategy.
|
||||||
|
|
||||||
@@ -17,23 +33,51 @@ class DialogueChunker:
|
|||||||
of different chunking strategies to dialogue data.
|
of different chunking strategies to dialogue data.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, chunker_strategy: str = "RecursiveChunker", llm_client: Optional[Any] = None):
    """Validate the strategy, load its configuration and build the chunker client.

    Args:
        chunker_strategy: The chunking strategy to use (default: RecursiveChunker).
            Must be one of the values of ``ChunkerStrategy``; a
            ``ChunkerStrategy`` member is accepted as well.
        llm_client: LLM client instance. Required when the loaded
            configuration selects the LLMChunker strategy.

    Raises:
        ValueError: If chunker_strategy is not a known strategy, if no
            configuration could be loaded for it, or if the LLMChunker
            strategy is selected without an llm_client.
    """
    # Accept the enum member itself in addition to its string value.
    if isinstance(chunker_strategy, ChunkerStrategy):
        chunker_strategy = chunker_strategy.value

    valid_strategies = ChunkerStrategy.get_valid_strategies()
    if chunker_strategy not in valid_strategies:
        raise ValueError(
            f"Invalid chunker_strategy: '{chunker_strategy}'. "
            f"Must be one of {valid_strategies}"
        )

    self.chunker_strategy = chunker_strategy
    logger.info(f"Initializing DialogueChunker with strategy: {chunker_strategy}")

    try:
        # Load and validate configuration
        chunker_config_dict = get_chunker_config(chunker_strategy)
        if not chunker_config_dict:
            raise ValueError(f"Failed to load configuration for strategy: {chunker_strategy}")

        self.chunker_config = ChunkerConfig.model_validate(chunker_config_dict)

        # The validated configuration, not the raw argument, decides
        # whether an LLM client has to be handed to the chunker client.
        if self.chunker_config.chunker_strategy == ChunkerStrategy.LLM.value:
            # Compare against None explicitly: a client object may define
            # __bool__/__len__ and be falsy while still being a valid client.
            if llm_client is None:
                raise ValueError("llm_client is required for LLMChunker strategy")
            self.chunker_client = ChunkerClient(self.chunker_config, llm_client)
        else:
            self.chunker_client = ChunkerClient(self.chunker_config)
    except Exception as e:
        # Log with traceback at this boundary, then let the caller decide.
        logger.error(f"Failed to initialize DialogueChunker: {e}", exc_info=True)
        raise

    logger.info(f"DialogueChunker initialized successfully with strategy: {chunker_strategy}")
|
||||||
|
|
||||||
async def process_dialogue(self, dialogue: DialogData) -> List[Chunk]:
    """Generate chunks for a dialogue using the configured chunker client.

    Args:
        dialogue: The DialogData object to chunk. It must carry a context
            with at least one message.

    Returns:
        A list of Chunk objects

    Raises:
        ValueError: If dialogue is None, has no messages, or the chunker
            produced no chunks
        Exception: Any error raised by the chunker client is logged and re-raised
    """
    # Validate input
    if dialogue is None:
        raise ValueError("dialogue cannot be None")

    context = dialogue.context
    # `msgs` may be None as well as empty; normalise it so that building
    # the error message below can never fail with a TypeError.
    messages = (context.msgs if context else None) or []
    if not messages:
        raise ValueError(
            f"Dialogue {dialogue.ref_id} has no messages to chunk. "
            f"Context: {context is not None}, "
            f"Messages: {len(messages)}"
        )

    logger.info(
        f"Processing dialogue {dialogue.ref_id} with {len(messages)} messages "
        f"using strategy: {self.chunker_strategy}"
    )

    # Keep the try body limited to the call that can actually fail.
    try:
        result_dialogue = await self.chunker_client.generate_chunks(dialogue)
        chunks = result_dialogue.chunks
    except Exception as e:
        logger.error(
            f"Error processing dialogue {dialogue.ref_id} with strategy {self.chunker_strategy}: {e}",
            exc_info=True
        )
        raise

    # Measured after chunking, so it reflects the content that was processed.
    content_length = len(dialogue.content) if dialogue.content else 0

    # Validate results
    if not chunks:
        raise ValueError(
            f"Chunking failed: No chunks generated for dialogue {dialogue.ref_id}. "
            f"Messages: {len(messages)}, "
            f"Content length: {content_length}, "
            f"Strategy: {self.chunker_config.chunker_strategy}"
        )

    logger.info(
        f"Successfully generated {len(chunks)} chunks for dialogue {dialogue.ref_id}. "
        f"Total characters processed: {content_length}"
    )
    return chunks
|
||||||
|
|
||||||
|
def save_chunking_results(
    self,
    chunks: List[Chunk],
    dialogue: DialogData,
    output_path: Optional[str] = None,
    preview_length: int = 100
) -> str:
    """Write a plain-text summary of the chunking results and return its path.

    Args:
        chunks: List of Chunk objects to save
        dialogue: The DialogData object that was processed
        output_path: Optional path to save the output. When omitted, the
            file ``chunker_output_<strategy>.txt`` is written two directory
            levels above the directory containing this module.
        preview_length: Maximum length of content preview (default: 100).
            Negative values are treated as 0.

    Returns:
        The path where the output was saved

    Raises:
        ValueError: If chunks is empty or dialogue is None
        OSError: If file writing fails (IOError is an alias of OSError)
    """
    # Validate input
    if not chunks:
        raise ValueError("chunks list cannot be empty")
    if dialogue is None:
        raise ValueError("dialogue cannot be None")

    # Generate default output path if not provided
    if not output_path:
        output_dir = Path(__file__).parents[2]
        output_path = str(output_dir / f"chunker_output_{self.chunker_strategy.lower()}.txt")

    logger.info(f"Saving chunking results to: {output_path}")

    # A negative length would slice from the end of the content instead of
    # truncating it, so clamp it.
    preview_length = max(preview_length, 0)

    try:
        # `msgs` and `content` may be None; count them as empty.
        message_count = len(dialogue.context.msgs or []) if dialogue.context else 0
        total_characters = len(dialogue.content) if dialogue.content else 0

        # Prepare output content
        output_lines = [
            f"=== Chunking Results ({self.chunker_strategy}) ===",
            f"Dialogue ID: {dialogue.ref_id}",
            f"Original conversation has {message_count} messages",
            f"Total characters: {total_characters}",
            f"Generated {len(chunks)} chunks:",
            "",
        ]

        for i, chunk in enumerate(chunks, 1):
            # Normalise once so every len()/slice below is safe for a
            # chunk whose content is None.
            content = chunk.content or ""
            content_preview = content[:preview_length]
            if len(content) > preview_length:
                content_preview += "..."

            output_lines.append(f"  Chunk {i}: {len(content)} characters")
            output_lines.append(f"    Content preview: {content_preview}")
            if chunk.metadata:
                output_lines.append(f"    Metadata: {chunk.metadata}")
            output_lines.append("")

        # Write to file
        with open(output_path, "w", encoding="utf-8") as f:
            f.write("\n".join(output_lines))

        logger.info(f"Successfully saved chunking results to: {output_path}")
        return output_path

    except OSError as e:
        logger.error(f"Failed to write chunking results to {output_path}: {e}", exc_info=True)
        raise
    except Exception as e:
        logger.error(f"Unexpected error saving chunking results: {e}", exc_info=True)
        raise
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user