Feature/ontology class clean (#249)
* [add] Complete ontology engineering feature implementation * [add] Add ontology feature integration and validation utilities * [add] Add OWL validator and validation utilities * [fix] Add missing render_ontology_extraction_prompt function * [fix] Add dependencies, fix functionality
This commit is contained in:
@@ -58,6 +58,12 @@ from app.core.memory.models.triplet_models import (
|
||||
TripletExtractionResponse,
|
||||
)
|
||||
|
||||
# Ontology models
|
||||
from app.core.memory.models.ontology_models import (
|
||||
OntologyClass,
|
||||
OntologyExtractionResponse,
|
||||
)
|
||||
|
||||
# Variable configuration models
|
||||
from app.core.memory.models.variate_config import (
|
||||
StatementExtractionConfig,
|
||||
@@ -105,6 +111,9 @@ __all__ = [
|
||||
"Entity",
|
||||
"Triplet",
|
||||
"TripletExtractionResponse",
|
||||
# Ontology models
|
||||
"OntologyClass",
|
||||
"OntologyExtractionResponse",
|
||||
# Variable configuration
|
||||
"StatementExtractionConfig",
|
||||
"ForgettingEngineConfig",
|
||||
|
||||
135
api/app/core/memory/models/ontology_models.py
Normal file
135
api/app/core/memory/models/ontology_models.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Models for ontology classes and extraction responses.
|
||||
|
||||
This module contains Pydantic models for representing extracted ontology classes
|
||||
from scenario descriptions, following OWL ontology engineering standards.
|
||||
|
||||
Classes:
|
||||
OntologyClass: Represents an extracted ontology class
|
||||
OntologyExtractionResponse: Response model containing extracted ontology classes
|
||||
"""
|
||||
|
||||
from typing import List, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_validator
|
||||
|
||||
|
||||
class OntologyClass(BaseModel):
    """One ontology class extracted from a scenario description.

    The model describes an abstract category (not a concrete instance) and
    enforces a PascalCase-like naming rule on ``name``.

    Attributes:
        id: String identifier; defaults to the hex form of a fresh ``uuid4``.
        name: Class name in PascalCase format (e.g., 'MedicalProcedure').
        name_chinese: Optional Chinese translation of the name (e.g., '医疗程序').
        description: Textual description of the class.
        examples: Concrete instance examples of the class; defaults to an empty list.
        parent_class: Optional name of the parent class in the hierarchy.
        entity_type: Type/category of the entity (e.g., 'Person', 'Concept').
        domain: Domain the class belongs to (e.g., 'Healthcare', 'Education').

    Config:
        extra: 'ignore', so unknown fields in LLM output are dropped.
    """

    model_config = ConfigDict(extra='ignore')

    id: str = Field(
        description="Unique identifier for the ontology class",
        default_factory=lambda: uuid4().hex,
    )
    name: str = Field(..., description="Name of the class in PascalCase format")
    name_chinese: Optional[str] = Field(
        default=None,
        description="Chinese translation of the class name",
    )
    description: str = Field(..., description="Description of the class")
    examples: List[str] = Field(
        description="List of concrete instance examples",
        default_factory=list,
    )
    parent_class: Optional[str] = Field(
        default=None,
        description="Name of the parent class in the hierarchy",
    )
    entity_type: str = Field(..., description="Type/category of the entity")
    domain: str = Field(..., description="Domain this class belongs to")

    @field_validator('name')
    @classmethod
    def validate_pascal_case(cls, v: str) -> str:
        """Reject class names that break the PascalCase convention.

        Checks run in this order, and the first failure wins:
            1. the name is non-empty
            2. the first character is uppercase
            3. the name contains no space character
            4. every character is alphanumeric or an underscore

        Args:
            v: Candidate class name.

        Returns:
            The name unchanged when every check passes.

        Raises:
            ValueError: If any of the checks above fails.
        """
        if len(v) == 0:
            raise ValueError("Class name cannot be empty")

        leading_char = v[0]
        if not leading_char.isupper():
            raise ValueError(
                f"Class name '{v}' must start with an uppercase letter (PascalCase)"
            )

        # Spaces are tested before the generic character check so they get
        # their own, more specific message.
        if v.count(' ') > 0:
            raise ValueError(
                f"Class name '{v}' cannot contain spaces (PascalCase)"
            )

        offending_chars = [ch for ch in v if ch != '_' and not ch.isalnum()]
        if offending_chars:
            raise ValueError(
                f"Class name '{v}' contains invalid characters. "
                "Only alphanumeric characters and underscores are allowed"
            )

        return v
|
||||
|
||||
|
||||
class OntologyExtractionResponse(BaseModel):
    """Structured LLM output for an ontology extraction request.

    Attributes:
        classes: Extracted ontology classes; defaults to an empty list.
        domain: Domain/field the scenario belongs to (required).

    Config:
        extra: 'ignore', so unknown fields in LLM output are dropped.
    """

    model_config = ConfigDict(extra='ignore')

    classes: List[OntologyClass] = Field(
        description="List of extracted ontology classes",
        default_factory=list,
    )
    domain: str = Field(..., description="Domain/field the scenario belongs to")
|
||||
@@ -8,4 +8,5 @@
|
||||
- TemporalExtractor: 时间信息提取
|
||||
- EmbeddingGenerator: 嵌入向量生成
|
||||
- MemorySummaryGenerator: 记忆摘要生成
|
||||
- OntologyExtractor: 本体类提取
|
||||
"""
|
||||
|
||||
@@ -0,0 +1,482 @@
|
||||
"""Ontology class extraction from scenario descriptions using LLM.
|
||||
|
||||
This module provides the OntologyExtractor class for extracting ontology classes
|
||||
from natural language scenario descriptions. It uses LLM-driven extraction combined
|
||||
with two-layer validation (string validation + OWL semantic validation).
|
||||
|
||||
Classes:
|
||||
OntologyExtractor: Extracts ontology classes from scenario descriptions
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import List, Optional
|
||||
|
||||
from app.core.memory.llm_tools.openai_client import OpenAIClient
|
||||
from app.core.memory.models.ontology_models import (
|
||||
OntologyClass,
|
||||
OntologyExtractionResponse,
|
||||
)
|
||||
from app.core.memory.utils.validation.ontology_validator import OntologyValidator
|
||||
from app.core.memory.utils.validation.owl_validator import OWLValidator
|
||||
from app.core.memory.utils.prompt.prompt_utils import render_ontology_extraction_prompt
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OntologyExtractor:
    """Extractor for ontology classes from scenario descriptions.

    The extractor asks an LLM for abstract classes/concepts found in a
    natural-language scenario and then runs two validation layers:
        1. String validation (name checks, sanitizing, description
           truncation, duplicate removal) via ``OntologyValidator``.
        2. OWL semantic validation via ``OWLValidator``; classes named in
           the reported errors are removed.

    Attributes:
        llm_client: OpenAI client used for the structured LLM call.
        validator: String validator for class names and descriptions.
        owl_validator: OWL validator for semantic validation.
    """

    def __init__(self, llm_client: OpenAIClient):
        """Initialize the OntologyExtractor.

        Args:
            llm_client: OpenAIClient instance for LLM processing.
        """
        self.llm_client = llm_client
        self.validator = OntologyValidator()
        self.owl_validator = OWLValidator()

        logger.info("OntologyExtractor initialized")

    async def extract_ontology_classes(
        self,
        scenario: str,
        domain: Optional[str] = None,
        max_classes: int = 15,
        min_classes: int = 5,
        enable_owl_validation: bool = True,
        llm_temperature: float = 0.3,
        llm_max_tokens: int = 2000,
        max_description_length: int = 500,
        timeout: Optional[float] = None,
    ) -> OntologyExtractionResponse:
        """Extract ontology classes from a scenario description.

        Pipeline:
            1. Call the LLM (optionally bounded by ``timeout``).
            2. First-layer validation (string validation and cleaning).
            3. Second-layer validation (OWL), filtering classes named in errors.
            4. Return the surviving classes.

        Failure handling is best-effort: when ``timeout`` is set and the LLM
        call exceeds it, or when any other exception occurs, the error is
        logged and an empty response (``domain or "Unknown"``) is returned.

        Args:
            scenario: Natural language scenario description.
            domain: Optional domain hint (e.g., "Healthcare", "Education").
            max_classes: Maximum number of classes to request (default: 15).
            min_classes: Only used to log a warning when fewer classes remain
                after first-layer validation (default: 5).
            enable_owl_validation: Whether to run OWL validation (default: True).
            llm_temperature: LLM temperature parameter (default: 0.3).
            llm_max_tokens: LLM max tokens parameter (default: 2000).
            max_description_length: Maximum description length (default: 500).
            timeout: Optional timeout in seconds for the LLM call
                (default: None, no timeout).

        Returns:
            OntologyExtractionResponse containing the validated classes, or an
            empty response on timeout/failure.

        Raises:
            ValueError: If ``scenario`` is empty or whitespace-only.
            asyncio.TimeoutError: Only when ``timeout`` is None and the LLM
                call itself raises a timeout error.

        Examples:
            >>> extractor = OntologyExtractor(llm_client)
            >>> response = await extractor.extract_ontology_classes(
            ...     scenario="A hospital manages patient records...",
            ...     domain="Healthcare",
            ...     max_classes=10,
            ...     timeout=30.0
            ... )
            >>> len(response.classes)
            7
        """
        # perf_counter is monotonic, so durations are immune to wall-clock jumps.
        start_time = time.perf_counter()

        if not scenario or not scenario.strip():
            logger.error("Scenario description is empty")
            raise ValueError("Scenario description cannot be empty")

        scenario = scenario.strip()

        logger.info(
            f"Starting ontology extraction - scenario_length={len(scenario)}, "
            f"domain={domain}, max_classes={max_classes}, min_classes={min_classes}, "
            f"timeout={timeout}"
        )

        try:
            # Step 1: Call LLM for extraction, bounded by the timeout if given
            logger.info("Step 1: Calling LLM for ontology extraction")
            llm_start_time = time.perf_counter()

            llm_call = self._call_llm_for_extraction(
                scenario=scenario,
                domain=domain,
                max_classes=max_classes,
                llm_temperature=llm_temperature,
                llm_max_tokens=llm_max_tokens,
            )

            if timeout is None:
                # No deadline: a timeout raised by the call itself propagates
                # to the outer handler and is re-raised to the caller.
                response = await llm_call
            else:
                try:
                    response = await asyncio.wait_for(llm_call, timeout=timeout)
                except asyncio.TimeoutError:
                    llm_duration = time.perf_counter() - llm_start_time
                    logger.error(
                        f"LLM extraction timed out after {timeout} seconds "
                        f"(actual duration: {llm_duration:.2f}s)"
                    )
                    # Best-effort contract: empty response instead of raising
                    return self._empty_response(domain)

            llm_duration = time.perf_counter() - llm_start_time
            logger.info(
                f"LLM returned {len(response.classes)} classes in {llm_duration:.2f}s"
            )

            # Step 2: First-layer validation (string validation and cleaning)
            logger.info("Step 2: Performing first-layer validation (string validation)")
            validation_start_time = time.perf_counter()

            response = self._validate_and_clean(
                response=response,
                max_description_length=max_description_length,
            )

            validation_duration = time.perf_counter() - validation_start_time
            logger.info(
                f"After first-layer validation: {len(response.classes)} classes remain "
                f"(validation took {validation_duration:.2f}s)"
            )

            if len(response.classes) < min_classes:
                logger.warning(
                    f"Only {len(response.classes)} classes remain after validation, "
                    f"which is below minimum of {min_classes}"
                )

            # Step 3: Second-layer validation (OWL semantic validation)
            if enable_owl_validation and response.classes:
                logger.info("Step 3: Performing second-layer validation (OWL validation)")
                response = self._apply_owl_validation(response)
            elif not enable_owl_validation:
                logger.info("Step 3: OWL validation disabled, skipping")
            else:
                logger.info("Step 3: No classes to validate, skipping OWL validation")

            total_duration = time.perf_counter() - start_time
            logger.info(
                f"Ontology extraction completed - "
                f"final_class_count={len(response.classes)}, "
                f"domain={response.domain}, "
                f"total_duration={total_duration:.2f}s, "
                f"llm_duration={llm_duration:.2f}s"
            )

            return response

        except asyncio.TimeoutError:
            # Reached only for timeouts not handled above (timeout is None)
            total_duration = time.perf_counter() - start_time
            logger.error(
                f"Ontology extraction timed out after {timeout} seconds "
                f"(total duration: {total_duration:.2f}s)",
                exc_info=True
            )
            raise

        except Exception as e:
            total_duration = time.perf_counter() - start_time
            logger.error(
                f"Ontology extraction failed after {total_duration:.2f}s: {str(e)}",
                exc_info=True
            )
            # Best-effort contract: empty response instead of raising
            return self._empty_response(domain)

    @staticmethod
    def _empty_response(domain: Optional[str]) -> OntologyExtractionResponse:
        """Build the empty fallback response used on timeout or failure."""
        return OntologyExtractionResponse(
            classes=[],
            domain=domain or "Unknown",
        )

    def _apply_owl_validation(
        self,
        response: OntologyExtractionResponse,
    ) -> OntologyExtractionResponse:
        """Run OWL validation and drop classes named in the reported errors."""
        owl_start_time = time.perf_counter()

        # The third element of the result is not used by the extractor.
        is_valid, errors, _world = self.owl_validator.validate_ontology_classes(
            classes=response.classes,
        )

        owl_duration = time.perf_counter() - owl_start_time

        if is_valid:
            logger.info(f"OWL validation passed successfully in {owl_duration:.2f}s")
            return response

        logger.warning(
            f"OWL validation found {len(errors)} issues in {owl_duration:.2f}s: {errors}"
        )

        response = self._filter_invalid_classes(
            response=response,
            errors=errors,
        )

        logger.info(
            f"After second-layer validation: {len(response.classes)} classes remain"
        )
        return response

    async def _call_llm_for_extraction(
        self,
        scenario: str,
        domain: Optional[str],
        max_classes: int,
        llm_temperature: float,
        llm_max_tokens: int,
    ) -> OntologyExtractionResponse:
        """Call the LLM to extract ontology classes from a scenario.

        Renders the extraction prompt, wraps it in system/user messages and
        requests structured output parsed as ``OntologyExtractionResponse``.

        Args:
            scenario: Scenario description text.
            domain: Optional domain hint.
            max_classes: Maximum number of classes to extract.
            llm_temperature: LLM temperature parameter (currently only logged).
            llm_max_tokens: LLM max tokens parameter (currently only logged).

        Returns:
            OntologyExtractionResponse from the LLM.

        Raises:
            Exception: Any failure while rendering or calling the LLM is
                logged and re-raised.
        """
        try:
            prompt_content = await render_ontology_extraction_prompt(
                scenario=scenario,
                domain=domain,
                max_classes=max_classes,
                json_schema=OntologyExtractionResponse.model_json_schema(),
            )

            logger.debug(f"Rendered prompt length: {len(prompt_content)}")

            messages = [
                {
                    "role": "system",
                    "content": (
                        "You are an expert ontology engineer specializing in knowledge "
                        "representation and OWL standards. Extract ontology classes from "
                        "scenario descriptions following the provided instructions. "
                        "Return valid JSON conforming to the schema."
                    ),
                },
                {
                    "role": "user",
                    "content": prompt_content,
                },
            ]

            # NOTE(review): llm_temperature / llm_max_tokens are logged here but
            # never passed to the client call below; whether response_structured
            # accepts them is not visible from this module - confirm and forward.
            logger.debug(
                f"Calling LLM with temperature={llm_temperature}, "
                f"max_tokens={llm_max_tokens}"
            )

            response = await self.llm_client.response_structured(
                messages=messages,
                response_model=OntologyExtractionResponse,
            )

            logger.info(
                f"LLM extraction successful - extracted {len(response.classes)} classes"
            )

            return response

        except Exception as e:
            logger.error(
                f"LLM extraction failed: {str(e)}",
                exc_info=True
            )
            raise

    def _validate_and_clean(
        self,
        response: OntologyExtractionResponse,
        max_description_length: int,
    ) -> OntologyExtractionResponse:
        """Perform first-layer validation: string validation and cleaning.

        For every class:
            1. Validate the class name.
            2. If invalid, sanitize it and re-validate; skip the class when the
               sanitized name is still invalid.
            3. Truncate an over-long description.
        Finally duplicates are removed.

        Surviving class objects are updated in place (name/description) and
        reused in the returned response.

        Args:
            response: OntologyExtractionResponse from LLM.
            max_description_length: Maximum description length.

        Returns:
            Cleaned OntologyExtractionResponse (the input object itself when it
            has no classes).
        """
        if not response.classes:
            logger.debug("No classes to validate")
            return response

        logger.debug(f"Validating {len(response.classes)} classes")

        validated_classes = []

        for ontology_class in response.classes:
            original_name = ontology_class.name
            is_valid, error_msg = self.validator.validate_class_name(original_name)

            if not is_valid:
                logger.warning(
                    f"Invalid class name '{original_name}': {error_msg}"
                )

                sanitized_name = self.validator.sanitize_class_name(original_name)

                logger.info(
                    f"Sanitized class name: '{original_name}' -> '{sanitized_name}'"
                )

                is_valid, error_msg = self.validator.validate_class_name(
                    sanitized_name
                )

                if not is_valid:
                    # Report the name the LLM produced, not the failed
                    # sanitization result, so the log is traceable.
                    logger.error(
                        f"Failed to sanitize class name '{original_name}': {error_msg}. "
                        "Skipping this class."
                    )
                    continue

                # Rename only once the sanitized name is known to be usable,
                # so dropped classes are left untouched.
                ontology_class.name = sanitized_name

            if ontology_class.description:
                original_length = len(ontology_class.description)
                ontology_class.description = self.validator.truncate_description(
                    ontology_class.description,
                    max_length=max_description_length,
                )

                if len(ontology_class.description) < original_length:
                    logger.debug(
                        f"Truncated description for '{ontology_class.name}': "
                        f"{original_length} -> {len(ontology_class.description)} chars"
                    )

            validated_classes.append(ontology_class)

        original_count = len(validated_classes)
        validated_classes = self.validator.remove_duplicates(validated_classes)

        if len(validated_classes) < original_count:
            logger.info(
                f"Removed {original_count - len(validated_classes)} duplicate classes"
            )

        return OntologyExtractionResponse(
            classes=validated_classes,
            domain=response.domain,
        )

    def _filter_invalid_classes(
        self,
        response: OntologyExtractionResponse,
        errors: List[str],
    ) -> OntologyExtractionResponse:
        """Filter out classes that are named in OWL validation errors.

        A class is considered invalid when its name occurs in an error message
        as a whole identifier, i.e. not as part of a longer word. This keeps
        e.g. ``Patient`` when only ``PatientRecord`` is reported.

        Args:
            response: OntologyExtractionResponse to filter.
            errors: List of error messages from OWL validation.

        Returns:
            Filtered OntologyExtractionResponse, or the input object itself
            when there are no errors or no class is named in them.
        """
        # Local import: this module's top-level imports do not include ``re``.
        import re

        if not errors:
            return response

        logger.debug(f"Filtering classes based on {len(errors)} OWL validation errors")

        # One pattern per distinct name; \w lookarounds reject matches that are
        # embedded inside a longer identifier.
        name_patterns = {
            name: re.compile(rf"(?<!\w){re.escape(name)}(?!\w)")
            for name in {c.name for c in response.classes}
        }

        invalid_class_names = set()

        for error in errors:
            for name, pattern in name_patterns.items():
                if pattern.search(error):
                    invalid_class_names.add(name)
                    logger.debug(
                        f"Class '{name}' marked as invalid due to error: {error}"
                    )

        if not invalid_class_names:
            return response

        original_count = len(response.classes)

        filtered_classes = [
            c for c in response.classes
            if c.name not in invalid_class_names
        ]

        logger.info(
            f"Filtered out {original_count - len(filtered_classes)} invalid classes: "
            f"{invalid_class_names}"
        )

        return OntologyExtractionResponse(
            classes=filtered_classes,
            domain=response.domain,
        )
|
||||
@@ -409,3 +409,42 @@ async def render_episodic_title_and_type_prompt(content: str) -> str:
|
||||
})
|
||||
|
||||
return rendered_prompt
|
||||
|
||||
|
||||
async def render_ontology_extraction_prompt(
    scenario: str,
    domain: str | None = None,
    max_classes: int = 15,
    json_schema: dict | None = None
) -> str:
    """
    Renders the ontology extraction prompt using the extract_ontology.jinja2 template.

    Args:
        scenario: The scenario description text to extract ontology classes from
        domain: Optional domain hint for the scenario (e.g., "Healthcare", "Education")
        max_classes: Maximum number of classes to extract (default: 15)
        json_schema: JSON schema for the expected output format. A dict (or
            list) is serialized to JSON text before rendering; any other value
            (including None) is passed to the template unchanged.

    Returns:
        Rendered prompt content as string
    """
    # Local import keeps this function self-contained.
    import json

    # Interpolating a dict directly would emit its Python repr (single quotes,
    # True/None), which is not JSON, while the prompt asks the model for
    # strict JSON. Serialize explicitly and keep non-ASCII text readable.
    if isinstance(json_schema, (dict, list)):
        schema_text = json.dumps(json_schema, ensure_ascii=False, indent=2)
    else:
        schema_text = json_schema

    template = prompt_env.get_template("extract_ontology.jinja2")
    rendered_prompt = template.render(
        scenario=scenario,
        domain=domain,
        max_classes=max_classes,
        json_schema=schema_text
    )

    # Record the rendered result in the prompt log
    log_prompt_rendering('ontology extraction', rendered_prompt)
    # Optional: record template rendering information
    log_template_rendering('extract_ontology.jinja2', {
        'scenario_len': len(scenario) if scenario else 0,
        'domain': domain,
        'max_classes': max_classes,
        'json_schema': 'OntologyExtractionResponse.schema'
    })

    return rendered_prompt
|
||||
|
||||
210
api/app/core/memory/utils/prompt/prompts/extract_ontology.jinja2
Normal file
210
api/app/core/memory/utils/prompt/prompts/extract_ontology.jinja2
Normal file
@@ -0,0 +1,210 @@
|
||||
===Task===
|
||||
Extract ontology classes from the given scenario description following ontology engineering standards.
|
||||
|
||||
===Role===
|
||||
You are a professional ontology engineer with expertise in knowledge representation and OWL (Web Ontology Language) standards. Your task is to identify abstract classes and concepts from scenario descriptions, not concrete instances.
|
||||
|
||||
===Scenario Description===
|
||||
{{ scenario }}
|
||||
|
||||
{% if domain -%}
|
||||
===Domain Hint===
|
||||
This scenario belongs to the **{{ domain }}** domain. Consider domain-specific concepts and terminology when extracting classes.
|
||||
{%- endif %}
|
||||
|
||||
===Extraction Rules===
|
||||
|
||||
**1. Abstract Classes, Not Instances:**
|
||||
- Extract abstract categories and concepts (e.g., "MedicalProcedure", "Patient", "Diagnosis")
|
||||
- Do NOT extract concrete instances (e.g., "John Smith", "Room 301", "2024-01-15")
|
||||
- Think in terms of "types of things" rather than "specific things"
|
||||
|
||||
**2. Naming Convention (PascalCase):**
|
||||
- Use PascalCase format for the "name" field: start with uppercase letter, capitalize each word, no spaces
|
||||
- Examples: "MedicalProcedure", "HealthcareProvider", "DiagnosticTest"
|
||||
- Avoid: "medical procedure", "healthcare_provider", "diagnostic-test"
|
||||
- Use clear, descriptive names in English
|
||||
- Avoid abbreviations unless they are standard in the domain (e.g., "API", "DNA")
|
||||
- Provide Chinese translation in the "name_chinese" field (e.g., "医疗程序", "医疗服务提供者", "诊断测试")
|
||||
|
||||
**3. Domain Relevance:**
|
||||
- Focus on classes that are central to the scenario's domain
|
||||
- Prioritize classes that represent key concepts, entities, or relationships
|
||||
- Avoid overly generic classes (e.g., "Thing", "Object") unless they have specific domain meaning
|
||||
|
||||
**4. Class Quantity:**
|
||||
- Extract between 5 and {{ max_classes }} classes
|
||||
- Aim for a balanced set covering the main concepts in the scenario
|
||||
- Quality over quantity: prefer well-defined classes over exhaustive lists
|
||||
|
||||
**5. Clear Descriptions:**
|
||||
- Provide concise, informative descriptions in Chinese (max 500 characters)
|
||||
- Describe what the class represents, not specific instances
|
||||
- Use clear, natural Chinese language that explains the class's role in the domain
|
||||
|
||||
**6. Concrete Examples:**
|
||||
- Provide 2-5 concrete instance examples in Chinese for each class
|
||||
- Examples should be specific, realistic instances of the class
|
||||
- Examples help clarify the class's scope and meaning
|
||||
- Use natural Chinese language for examples
|
||||
- Example format: ["示例1", "示例2", "示例3"]
|
||||
|
||||
**7. Class Hierarchy:**
|
||||
- Identify parent-child relationships where applicable
|
||||
- Use the parent_class field to specify inheritance
|
||||
- Parent class must be one of the extracted classes or a standard OWL class
|
||||
- Leave parent_class as null for top-level classes
|
||||
|
||||
**8. Entity Types:**
|
||||
- Classify each class with an appropriate entity_type
|
||||
- Common types: "Person", "Organization", "Location", "Event", "Concept", "Process", "Object", "Role"
|
||||
- Choose the most specific type that applies
|
||||
|
||||
**9. OWL Reserved Words:**
|
||||
- Do NOT use OWL reserved words as class names
|
||||
- Reserved words include: "Thing", "Nothing", "Class", "Property", "ObjectProperty", "DatatypeProperty", "AnnotationProperty", "Ontology", "Individual", "Literal"
|
||||
- If a reserved word is needed, add a domain-specific prefix (e.g., "MedicalClass" instead of "Class")
|
||||
|
||||
**10. Language Consistency:**
|
||||
- Extract all class names in English (PascalCase format) for the "name" field
|
||||
- Provide Chinese translation for class names in the "name_chinese" field
|
||||
- Descriptions MUST be in Chinese (中文)
|
||||
- Examples MUST be in Chinese (中文)
|
||||
- Use clear, natural Chinese language for descriptions and examples
|
||||
|
||||
===Examples===
|
||||
|
||||
**Example 1 (Healthcare Domain):**
|
||||
Scenario: "A hospital manages patient records, schedules appointments, and coordinates medical procedures. Doctors diagnose conditions and prescribe treatments."
|
||||
|
||||
Output:
|
||||
{
|
||||
"classes": [
|
||||
{
|
||||
"name": "Patient",
|
||||
"name_chinese": "患者",
|
||||
"description": "在医疗机构接受医疗护理或治疗的人",
|
||||
"examples": ["张三", "李四", "患有糖尿病的老年患者"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Person",
|
||||
"domain": "Healthcare"
|
||||
},
|
||||
{
|
||||
"name": "MedicalProcedure",
|
||||
"name_chinese": "医疗程序",
|
||||
"description": "为医疗诊断或治疗而执行的系统性操作流程",
|
||||
"examples": ["手术", "血液检查", "X光检查", "疫苗接种"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Process",
|
||||
"domain": "Healthcare"
|
||||
},
|
||||
{
|
||||
"name": "Diagnosis",
|
||||
"name_chinese": "诊断",
|
||||
"description": "基于症状和检查结果对疾病或状况的识别",
|
||||
"examples": ["糖尿病诊断", "癌症诊断", "流感诊断"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Concept",
|
||||
"domain": "Healthcare"
|
||||
},
|
||||
{
|
||||
"name": "Doctor",
|
||||
"name_chinese": "医生",
|
||||
"description": "诊断和治疗患者的持证医疗专业人员",
|
||||
"examples": ["全科医生", "外科医生", "心脏病专家"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Role",
|
||||
"domain": "Healthcare"
|
||||
},
|
||||
{
|
||||
"name": "Treatment",
|
||||
"name_chinese": "治疗",
|
||||
"description": "为治愈或管理疾病状况而提供的医疗护理或疗法",
|
||||
"examples": ["药物治疗", "物理治疗", "化疗", "手术治疗"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Process",
|
||||
"domain": "Healthcare"
|
||||
}
|
||||
],
|
||||
"domain": "Healthcare",
|
||||
"namespace": "http://example.org/healthcare#"
|
||||
}
|
||||
|
||||
**Example 2 (Education Domain):**
|
||||
Scenario: "A university offers courses taught by professors. Students enroll in programs, attend lectures, and complete assignments to earn degrees."
|
||||
|
||||
Output:
|
||||
{
|
||||
"classes": [
|
||||
{
|
||||
"name": "Student",
|
||||
"name_chinese": "学生",
|
||||
"description": "在教育机构注册学习的人",
|
||||
"examples": ["本科生", "研究生", "在职学生"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Role",
|
||||
"domain": "Education"
|
||||
},
|
||||
{
|
||||
"name": "Course",
|
||||
"name_chinese": "课程",
|
||||
"description": "涵盖特定学科或主题的结构化教育课程",
|
||||
"examples": ["计算机科学导论", "微积分I", "世界历史"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Concept",
|
||||
"domain": "Education"
|
||||
},
|
||||
{
|
||||
"name": "Professor",
|
||||
"name_chinese": "教授",
|
||||
"description": "教授课程并进行研究的学术教师",
|
||||
"examples": ["助理教授", "副教授", "正教授"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Role",
|
||||
"domain": "Education"
|
||||
},
|
||||
{
|
||||
"name": "AcademicProgram",
|
||||
"name_chinese": "学术项目",
|
||||
"description": "通向学位或证书的结构化课程体系",
|
||||
"examples": ["理学学士", "文学硕士", "博士项目"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Concept",
|
||||
"domain": "Education"
|
||||
},
|
||||
{
|
||||
"name": "Assignment",
|
||||
"name_chinese": "作业",
|
||||
"description": "分配给学生以评估学习成果的任务或项目",
|
||||
"examples": ["论文", "习题集", "研究报告", "实验报告"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Object",
|
||||
"domain": "Education"
|
||||
},
|
||||
{
|
||||
"name": "Lecture",
|
||||
"name_chinese": "讲座",
|
||||
"description": "由教师进行的教育性演讲或讲座",
|
||||
"examples": ["入门讲座", "客座讲座", "在线讲座"],
|
||||
"parent_class": null,
|
||||
"entity_type": "Event",
|
||||
"domain": "Education"
|
||||
}
|
||||
],
|
||||
"domain": "Education",
|
||||
"namespace": "http://example.org/education#"
|
||||
}
|
||||
|
||||
===Output Format===
|
||||
|
||||
**JSON Requirements:**
|
||||
- Use only ASCII double quotes (") for JSON structure
|
||||
- Never use Chinese quotation marks (“ ”) or other Unicode quotes
|
||||
- Escape quotation marks in text with backslashes (\")
|
||||
- Ensure proper string closure and comma separation
|
||||
- No line breaks within JSON string values
|
||||
- All class names must be in PascalCase format
|
||||
- All class names must be unique (case-insensitive)
|
||||
- Extract between 5 and {{ max_classes }} classes
|
||||
|
||||
{{ json_schema }}
|
||||
10
api/app/core/memory/utils/validation/__init__.py
Normal file
10
api/app/core/memory/utils/validation/__init__.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""Validation utilities for ontology extraction.
|
||||
|
||||
This module provides validation classes for ontology class names,
|
||||
descriptions, and OWL compliance checking.
|
||||
"""
|
||||
|
||||
from .ontology_validator import OntologyValidator
|
||||
from .owl_validator import OWLValidator
|
||||
|
||||
__all__ = ['OntologyValidator', 'OWLValidator']
|
||||
268
api/app/core/memory/utils/validation/ontology_validator.py
Normal file
268
api/app/core/memory/utils/validation/ontology_validator.py
Normal file
@@ -0,0 +1,268 @@
|
||||
"""String validation for ontology class names and descriptions.
|
||||
|
||||
This module provides the OntologyValidator class for validating and sanitizing
|
||||
ontology class names according to OWL standards and naming conventions.
|
||||
|
||||
Classes:
|
||||
OntologyValidator: Validates class names, removes duplicates, and truncates descriptions
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import List, Tuple
|
||||
|
||||
from app.core.memory.models.ontology_models import OntologyClass
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)


class OntologyValidator:
    """Validator for ontology class names and descriptions.

    This validator performs string-level validation including:
    - PascalCase naming convention validation
    - OWL reserved word checking
    - Duplicate class name removal
    - Description length truncation

    Attributes:
        OWL_RESERVED_WORDS: Set of OWL reserved words that cannot be used as class names
    """

    # OWL reserved words that cannot be used as class names
    OWL_RESERVED_WORDS = {
        'Thing', 'Nothing', 'Class', 'Property',
        'ObjectProperty', 'DatatypeProperty', 'FunctionalProperty',
        'InverseFunctionalProperty', 'TransitiveProperty', 'SymmetricProperty',
        'AsymmetricProperty', 'ReflexiveProperty', 'IrreflexiveProperty',
        'Restriction', 'Ontology', 'Individual', 'NamedIndividual',
        'Annotation', 'AnnotationProperty', 'Axiom',
        'AllDifferent', 'AllDisjointClasses', 'AllDisjointProperties',
        'Datatype', 'DataRange', 'Literal',
        'DeprecatedClass', 'DeprecatedProperty',
        'Imports', 'IncompatibleWith', 'PriorVersion', 'VersionInfo',
        'BackwardCompatibleWith', 'OntologyProperty',
    }

    def validate_class_name(self, name: str) -> Tuple[bool, str]:
        """Validate that a class name follows OWL naming conventions.

        Checks are applied to the stripped name in this order, and the first
        failing check determines the error message:
        1. Must not be empty
        2. Cannot be an OWL reserved word
        3. Must start with an uppercase letter (PascalCase)
        4. Cannot contain spaces
        5. Can only contain ASCII alphanumeric characters and underscores

        Args:
            name: The class name to validate

        Returns:
            Tuple of (is_valid, error_message)
            - is_valid: True if the name is valid, False otherwise
            - error_message: Empty string if valid, error description if invalid

        Examples:
            >>> validator = OntologyValidator()
            >>> validator.validate_class_name("MedicalProcedure")
            (True, '')
            >>> validator.validate_class_name("Medical Procedure")
            (False, "Class name 'Medical Procedure' cannot contain spaces")
            >>> validator.validate_class_name("medical procedure")
            (False, "Class name 'medical procedure' must start with an uppercase letter (PascalCase)")
            >>> validator.validate_class_name("Thing")
            (False, "Class name 'Thing' is an OWL reserved word")
        """
        logger.debug(f"Validating class name: '{name}'")

        if not name or not name.strip():
            error_msg = "Class name cannot be empty"
        else:
            name = name.strip()
            if name in self.OWL_RESERVED_WORDS:
                error_msg = f"Class name '{name}' is an OWL reserved word"
            elif not name[0].isupper():
                error_msg = f"Class name '{name}' must start with an uppercase letter (PascalCase)"
            elif ' ' in name:
                error_msg = f"Class name '{name}' cannot contain spaces"
            elif not re.fullmatch(r'[A-Za-z0-9_]+', name):
                error_msg = (
                    f"Class name '{name}' contains invalid characters. "
                    "Only alphanumeric characters and underscores are allowed"
                )
            else:
                logger.debug(f"Class name '{name}' is valid")
                return True, ""

        # Single exit for every failure so the log line and the returned
        # message can never drift apart.
        logger.warning(f"Validation failed: {error_msg}")
        return False, error_msg

    def sanitize_class_name(self, name: str) -> str:
        """Attempt to sanitize an invalid class name into a valid format.

        Sanitization steps:
        1. Strip surrounding whitespace
        2. Split into words on whitespace, hyphens and underscores
        3. Drop every character that is not an ASCII letter or digit
        4. Upper-case the first character of each word (the rest is kept as is)
        5. If the result is empty or starts with a digit, prefix it with 'Class'
        6. If the result is an OWL reserved word, append 'Class'

        Args:
            name: The class name to sanitize

        Returns:
            Sanitized class name that should pass validation. An empty or
            whitespace-only input yields 'UnnamedClass'.

        Examples:
            >>> validator = OntologyValidator()
            >>> validator.sanitize_class_name("medical procedure")
            'MedicalProcedure'
            >>> validator.sanitize_class_name("patient-record")
            'PatientRecord'
            >>> validator.sanitize_class_name("123invalid")
            'Class123invalid'
        """
        logger.debug(f"Sanitizing class name: '{name}'")

        if not name or not name.strip():
            logger.warning("Empty class name provided for sanitization, returning 'UnnamedClass'")
            return "UnnamedClass"

        original_name = name.strip()

        sanitized_words = []
        for word in re.split(r'[\s\-_]+', original_name):
            # Underscores never survive the split above; the character class
            # keeps them only to mirror the validation pattern.
            clean_word = re.sub(r'[^A-Za-z0-9_]', '', word)
            if clean_word:
                sanitized_words.append(clean_word[0].upper() + clean_word[1:])

        sanitized = ''.join(sanitized_words)

        # A name must start with an uppercase letter; digits and empty
        # results get a fixed prefix instead.
        if not sanitized or sanitized[0].isdigit():
            sanitized = 'Class' + sanitized
            logger.info(f"Prefixed class name with 'Class': '{original_name}' -> '{sanitized}'")

        # Checked after prefixing because an empty result becomes 'Class',
        # which is itself reserved.
        if sanitized in self.OWL_RESERVED_WORDS:
            sanitized = sanitized + 'Class'
            logger.info(f"Appended 'Class' suffix to reserved word: '{original_name}' -> '{sanitized}'")

        logger.info(f"Sanitized class name: '{original_name}' -> '{sanitized}'")
        return sanitized

    def remove_duplicates(self, classes: List["OntologyClass"]) -> List["OntologyClass"]:
        """Remove duplicate ontology classes based on case-insensitive name comparison.

        When duplicates are found, keeps the first occurrence and discards subsequent ones.
        Comparison is case-insensitive to catch variations like 'Patient' and 'patient'.

        Args:
            classes: List of OntologyClass objects

        Returns:
            List of OntologyClass objects with duplicates removed. An empty
            (or falsy) input is returned unchanged.

        Examples:
            >>> validator = OntologyValidator()
            >>> classes = [
            ...     OntologyClass(name="Patient", description="A patient", entity_type="Person", domain="Healthcare"),
            ...     OntologyClass(name="patient", description="Another patient", entity_type="Person", domain="Healthcare"),
            ...     OntologyClass(name="Doctor", description="A doctor", entity_type="Person", domain="Healthcare"),
            ... ]
            >>> unique = validator.remove_duplicates(classes)
            >>> len(unique)
            2
            >>> [c.name for c in unique]
            ['Patient', 'Doctor']
        """
        if not classes:
            logger.debug("No classes to check for duplicates")
            return classes

        logger.debug(f"Checking {len(classes)} classes for duplicates")

        seen_names = set()
        unique_classes = []
        duplicates_found = []

        for ontology_class in classes:
            # Use lowercase for comparison
            name_lower = ontology_class.name.lower()

            if name_lower not in seen_names:
                seen_names.add(name_lower)
                unique_classes.append(ontology_class)
            else:
                duplicates_found.append(ontology_class.name)
                logger.debug(f"Duplicate class found and removed: '{ontology_class.name}'")

        if duplicates_found:
            logger.info(
                f"Removed {len(duplicates_found)} duplicate classes: {duplicates_found}"
            )
        else:
            logger.debug("No duplicate classes found")

        return unique_classes

    def truncate_description(self, description: str, max_length: int = 500) -> str:
        """Truncate a description to a maximum length.

        If the description exceeds max_length, it is cut and an ellipsis (...)
        is appended so that the result is exactly max_length characters long.
        When max_length is too small to hold the ellipsis (fewer than 3
        characters), the text is hard-cut without an ellipsis; a negative
        max_length is treated as 0.

        Args:
            description: The description text to truncate
            max_length: Maximum allowed length (default: 500)

        Returns:
            Truncated description string, never longer than max_length

        Examples:
            >>> validator = OntologyValidator()
            >>> long_desc = "A" * 600
            >>> truncated = validator.truncate_description(long_desc, max_length=500)
            >>> len(truncated)
            500
            >>> truncated.endswith("...")
            True
            >>> validator.truncate_description("abcdef", max_length=2)
            'ab'
        """
        if not description:
            return ""

        limit = max(max_length, 0)
        if len(description) <= limit:
            return description

        ellipsis = "..."
        if limit < len(ellipsis):
            # No room for the ellipsis: slicing with (limit - 3) would be a
            # negative index and return a string longer than the limit.
            truncated = description[:limit]
        else:
            truncated = description[:limit - len(ellipsis)] + ellipsis

        logger.debug(
            f"Truncated description from {len(description)} to {len(truncated)} characters"
        )

        return truncated
|
||||
585
api/app/core/memory/utils/validation/owl_validator.py
Normal file
585
api/app/core/memory/utils/validation/owl_validator.py
Normal file
@@ -0,0 +1,585 @@
|
||||
"""OWL semantic validation for ontology classes using Owlready2.
|
||||
|
||||
This module provides the OWLValidator class for validating ontology classes
|
||||
against OWL standards using the Owlready2 library. It performs semantic
|
||||
validation including consistency checking, circular inheritance detection,
|
||||
and OWL file export.
|
||||
|
||||
Classes:
|
||||
OWLValidator: Validates ontology classes using OWL reasoning and exports to OWL formats
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from owlready2 import (
|
||||
World,
|
||||
Thing,
|
||||
get_ontology,
|
||||
sync_reasoner_pellet,
|
||||
OwlReadyInconsistentOntologyError,
|
||||
)
|
||||
|
||||
from app.core.memory.models.ontology_models import OntologyClass
|
||||
logger = logging.getLogger(__name__)


class OWLValidator:
    """Validator for OWL semantic validation of ontology classes.

    This validator performs semantic-level validation using Owlready2 including:
    - Creating OWL classes from ontology class definitions
    - Running consistency checking with Pellet reasoner
    - Detecting circular inheritance
    - Validating Protégé compatibility
    - Exporting ontologies to various OWL formats (RDF/XML, Turtle, N-Triples)

    Attributes:
        base_namespace: Base URI for the ontology namespace
    """

    def __init__(self, base_namespace: str = "http://example.org/ontology#"):
        """Initialize the OWL validator.

        Args:
            base_namespace: Base URI for the ontology namespace (default: http://example.org/ontology#)
        """
        self.base_namespace = base_namespace

    def validate_ontology_classes(
        self,
        classes: List["OntologyClass"],
    ) -> Tuple[bool, List[str], Optional["World"]]:
        """Validate extracted ontology classes against OWL standards.

        This method creates an OWL ontology from the provided classes using Owlready2,
        runs consistency checking with the Pellet reasoner, and detects common issues
        like circular inheritance.

        A reasoner failure other than an inconsistency (for example Java not
        being available) is logged and does not fail validation.

        Args:
            classes: List of OntologyClass objects to validate

        Returns:
            Tuple of (is_valid, error_messages, world):
            - is_valid: True if no error message was collected, False otherwise
            - error_messages: List of error/warning messages (a missing parent
              class is reported here and therefore fails validation)
            - world: Owlready2 World object containing the ontology. It is None
              only when no classes were given or building the ontology raised
              an unexpected exception; it is still returned when the ontology
              is inconsistent or errors were collected.

        Examples:
            >>> validator = OWLValidator()
            >>> classes = [
            ...     OntologyClass(name="Patient", description="A patient", entity_type="Person", domain="Healthcare"),
            ...     OntologyClass(name="Doctor", description="A doctor", entity_type="Person", domain="Healthcare"),
            ... ]
            >>> is_valid, errors, world = validator.validate_ontology_classes(classes)
            >>> is_valid
            True
            >>> len(errors)
            0
        """
        if not classes:
            return False, ["No classes provided for validation"], None

        errors = []

        try:
            # Create a new world (isolated ontology environment)
            world = World()

            # Use a proper ontology IRI
            # Owlready2 expects the IRI to end with .owl or similar
            onto_iri = self.base_namespace.rstrip('#/')
            if not onto_iri.endswith('.owl'):
                onto_iri = onto_iri + '.owl'

            # Create ontology
            onto = world.get_ontology(onto_iri)

            with onto:
                # Dictionary to store created OWL classes for parent reference
                owl_classes = {}

                # First pass: Create all classes without parent relationships
                for ontology_class in classes:
                    try:
                        # Create OWL class dynamically using type() with Thing as base
                        # The key is to NOT set namespace in the dict, let Owlready2 handle it
                        owl_class = type(
                            ontology_class.name,  # Class name
                            (Thing,),  # Base classes
                            {}  # Class dict (empty, let Owlready2 manage)
                        )

                        # Add label (rdfs:label) - include both English and Chinese names
                        labels = [ontology_class.name]
                        if ontology_class.name_chinese:
                            labels.append(ontology_class.name_chinese)
                        owl_class.label = labels

                        # Add comment (rdfs:comment) with description
                        if ontology_class.description:
                            owl_class.comment = [ontology_class.description]

                        # Store for parent relationship setup
                        owl_classes[ontology_class.name] = owl_class

                        logger.debug(
                            f"Created OWL class: {ontology_class.name} "
                            f"(Chinese: {ontology_class.name_chinese}) "
                            f"IRI: {owl_class.iri if hasattr(owl_class, 'iri') else 'N/A'}"
                        )

                    except Exception as e:
                        error_msg = f"Failed to create OWL class '{ontology_class.name}': {str(e)}"
                        errors.append(error_msg)
                        logger.error(error_msg, exc_info=True)

                # Second pass: Set up parent relationships
                for ontology_class in classes:
                    if ontology_class.parent_class and ontology_class.name in owl_classes:
                        parent_name = ontology_class.parent_class

                        # Check if parent exists
                        if parent_name in owl_classes:
                            try:
                                child_class = owl_classes[ontology_class.name]
                                parent_class = owl_classes[parent_name]

                                # Set parent by modifying is_a
                                child_class.is_a = [parent_class]

                                logger.debug(
                                    f"Set parent relationship: {ontology_class.name} -> {parent_name}"
                                )

                            except Exception as e:
                                error_msg = (
                                    f"Failed to set parent relationship "
                                    f"'{ontology_class.name}' -> '{parent_name}': {str(e)}"
                                )
                                errors.append(error_msg)
                                logger.warning(error_msg)
                        else:
                            warning_msg = (
                                f"Parent class '{parent_name}' not found for '{ontology_class.name}'"
                            )
                            errors.append(warning_msg)
                            logger.warning(warning_msg)

            # NOTE(review): the circular-inheritance check and the reasoner call
            # are placed outside the `with onto:` block here - confirm this
            # matches the intended scoping.

            # Check for circular inheritance
            for class_name, owl_class in owl_classes.items():
                if self._has_circular_inheritance(owl_class):
                    error_msg = f"Circular inheritance detected for class '{class_name}'"
                    errors.append(error_msg)
                    logger.error(error_msg)

            # Run consistency checking with Pellet reasoner
            try:
                logger.info("Running Pellet reasoner for consistency checking...")
                sync_reasoner_pellet(world, infer_property_values=True, infer_data_property_values=True)
                logger.info("Consistency check passed")

            except OwlReadyInconsistentOntologyError as e:
                error_msg = f"Ontology is inconsistent: {str(e)}"
                errors.append(error_msg)
                logger.error(error_msg)
                return False, errors, world

            except Exception as e:
                # Reasoner errors are often due to Java not being installed or configured
                # Log as warning but don't fail validation - ontology structure is still valid
                warning_msg = f"Reasoner check skipped: {str(e)}"
                if str(e).strip():  # Only log if there's an actual error message
                    logger.warning(warning_msg)
                else:
                    logger.warning("Reasoner check skipped: Java may not be installed or configured")
                # Continue - ontology structure is valid even without reasoner check

            # Any collected message (including missing-parent warnings) fails validation
            is_valid = len(errors) == 0

            return is_valid, errors, world

        except Exception as e:
            error_msg = f"OWL validation failed: {str(e)}"
            errors.append(error_msg)
            logger.error(error_msg, exc_info=True)
            return False, errors, None

    def _has_circular_inheritance(self, owl_class) -> bool:
        """Check if an OWL class has circular inheritance.

        Circular inheritance occurs when a class inherits from itself through
        a chain of parent relationships (e.g., A -> B -> C -> A). Only the
        first non-Thing parent is followed at each step.

        Args:
            owl_class: Owlready2 class object to check

        Returns:
            True if circular inheritance is detected, False otherwise
        """
        visited = set()
        current = owl_class

        while current:
            # Get class IRI or name as identifier
            class_id = str(current.iri) if hasattr(current, 'iri') else str(current)

            if class_id in visited:
                # Found a cycle
                return True

            visited.add(class_id)

            # Get parent classes (is_a relationship)
            parents = getattr(current, 'is_a', [])

            # Filter out Thing and other base classes
            parent_classes = [p for p in parents if p != Thing and hasattr(p, 'is_a')]

            if not parent_classes:
                # No more parents, no cycle
                break

            # Check first parent (in single inheritance)
            current = parent_classes[0]

        return False

    def export_to_owl(
        self,
        world: Optional["World"],
        output_path: Optional[str] = None,
        format: str = "rdfxml",
        classes: Optional[List] = None
    ) -> str:
        """Export ontology to OWL file in specified format.

        Supported formats:
        - rdfxml: RDF/XML format (default, most compatible)
        - turtle: Turtle format (more readable)
        - ntriples: N-Triples format (simplest)
        - json: JSON format (simplified, human-readable)

        Turtle is produced by saving RDF/XML and converting it; when an
        output_path is given the converted text is written back so the file
        on disk matches the requested format. If the conversion fails the
        RDF/XML text is kept and the failure is logged.

        Args:
            world: Owlready2 World object containing the ontology (not used for json)
            output_path: Optional file path to save the ontology (if None, returns string).
                Ignored for the json format.
            format: Export format - "rdfxml", "turtle", "ntriples", or "json" (default: "rdfxml")
            classes: Optional list of OntologyClass objects (required for json format)

        Returns:
            String representation of the exported ontology

        Raises:
            ValueError: If format is not supported, if classes is missing for
                json, or if world is missing for an OWL format
            RuntimeError: If export fails

        Examples:
            >>> validator = OWLValidator()
            >>> is_valid, errors, world = validator.validate_ontology_classes(classes)
            >>> owl_content = validator.export_to_owl(world, "ontology.owl", format="rdfxml")
        """
        # Validate format
        valid_formats = ["rdfxml", "turtle", "ntriples", "json"]
        if format not in valid_formats:
            raise ValueError(
                f"Unsupported format '{format}'. Must be one of: {', '.join(valid_formats)}"
            )

        # JSON format doesn't need OWL processing
        if format == "json":
            if not classes:
                raise ValueError("Classes list is required for JSON format export")
            return self._export_to_json(classes)

        # For OWL formats, world is required
        if not world:
            raise ValueError("World object is None. Cannot export ontology.")

        # Note: Owlready2 has issues with turtle format export
        # We'll handle it specially by converting from rdfxml
        use_conversion = (format == "turtle")
        export_format = "rdfxml" if use_conversion else format

        try:
            onto = self._select_ontology(world)

            # Log ontology contents for debugging
            all_classes = list(onto.classes())
            logger.info(f"Ontology IRI: {onto.base_iri}")
            logger.info(f"Ontology contains {len(all_classes)} classes")

            for cls in all_classes:
                logger.info(f"Class in ontology: {cls.name} (IRI: {cls.iri})")
                if hasattr(cls, 'label'):
                    logger.debug(f" Labels: {cls.label}")
                if hasattr(cls, 'comment'):
                    logger.debug(f" Comments: {cls.comment}")

            if len(all_classes) == 0:
                logger.warning("No classes found in ontology! This may indicate a problem with class creation.")

            if output_path:
                logger.info(f"Exporting ontology to {output_path} in {export_format} format")
                content = self._save_and_read(onto, output_path, export_format)

                if use_conversion:
                    content = self._convert_to_turtle(content)

                # Format the content for better readability
                content = self._format_owl_content(content, format)

                if use_conversion:
                    # The save above wrote RDF/XML; replace it so the file
                    # holds the same text that is returned to the caller.
                    with open(output_path, 'w', encoding='utf-8') as f:
                        f.write(content)

                logger.info(f"Successfully exported ontology to {output_path}")
                return content

            # Export to string (save to temporary location and read)
            import os
            import tempfile

            # Only the name is needed; the handle is closed before saving.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.owl', delete=False) as tmp:
                tmp_path = tmp.name

            try:
                content = self._save_and_read(onto, tmp_path, export_format)

                if use_conversion:
                    content = self._convert_to_turtle(content)

                # Format the content for better readability
                return self._format_owl_content(content, format)

            finally:
                # Clean up temporary file
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)

        except Exception as e:
            error_msg = f"Failed to export ontology: {str(e)}"
            logger.error(error_msg, exc_info=True)
            raise RuntimeError(error_msg) from e

    def _select_ontology(self, world):
        """Pick the ontology to export from a world.

        Preference order: the first ontology that contains classes, then the
        last one whose base IRI is not "http://anonymous/", then the first one.

        Args:
            world: Owlready2 World object

        Returns:
            The selected ontology object

        Raises:
            RuntimeError: If the world contains no ontologies
        """
        ontologies = list(world.ontologies.values())

        if not ontologies:
            raise RuntimeError("No ontologies found in world")

        # Find the ontology with classes (skip anonymous/empty ontologies)
        for ont in ontologies:
            classes_count = len(list(ont.classes()))
            logger.debug(f"Checking ontology {ont.base_iri}: {classes_count} classes")
            if classes_count > 0:
                return ont

        # If no ontology with classes found, use the last non-anonymous one
        for ont in reversed(ontologies):
            if ont.base_iri != "http://anonymous/":
                return ont

        # If still no ontology, use the first one
        return ontologies[0]

    def _save_and_read(self, onto, path: str, export_format: str) -> str:
        """Save an ontology to *path* and return the file's text content.

        Args:
            onto: Ontology object providing ``save(file=..., format=...)``
            path: File path to write to and read back from
            export_format: Format name passed through to ``onto.save``

        Returns:
            Content of the written file, decoded as UTF-8
        """
        onto.save(file=path, format=export_format)
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()

    def _export_to_json(self, classes: List) -> str:
        """Export ontology classes to simplified JSON format.

        This format is more compact and easier to parse than OWL XML.

        Args:
            classes: List of OntologyClass objects

        Returns:
            JSON string representation (compact format)
        """
        import json

        result = {
            "ontology": {
                "namespace": self.base_namespace,
                "classes": []
            }
        }

        for cls in classes:
            class_data = {
                "name": cls.name,
                "name_chinese": cls.name_chinese,
                "description": cls.description,
                "entity_type": cls.entity_type,
                "domain": cls.domain,
                "parent_class": cls.parent_class,
                "examples": cls.examples if hasattr(cls, 'examples') else []
            }
            result["ontology"]["classes"].append(class_data)

        # Compact output: no indentation, tight separators to minimize whitespace
        return json.dumps(result, ensure_ascii=False, separators=(',', ':'))

    def _convert_to_turtle(self, rdfxml_content: str) -> str:
        """Convert RDF/XML content to Turtle format using rdflib.

        Args:
            rdfxml_content: RDF/XML format content

        Returns:
            Turtle format content, or the unchanged RDF/XML content when
            rdflib is not installed or the conversion fails
        """
        try:
            from rdflib import Graph

            # Parse RDF/XML
            g = Graph()
            g.parse(data=rdfxml_content, format="xml")

            # Serialize to Turtle
            turtle_content = g.serialize(format="turtle")

            # Handle bytes vs string
            if isinstance(turtle_content, bytes):
                turtle_content = turtle_content.decode('utf-8')

            return turtle_content

        except ImportError:
            logger.warning(
                "rdflib is not installed. Cannot convert to Turtle format. "
                "Install with: pip install rdflib"
            )
            return rdfxml_content
        except Exception as e:
            logger.error(f"Failed to convert to Turtle format: {e}")
            return rdfxml_content

    def _format_owl_content(self, content: str, format: str) -> str:
        """Format OWL content for better readability.

        Args:
            content: Raw OWL content string
            format: Format type (rdfxml, turtle, ntriples)

        Returns:
            Formatted OWL content string. RDF/XML is pretty-printed (and
            returned unchanged if it cannot be parsed); Turtle and N-Triples
            are stripped and terminated with a single newline. Any other
            format is returned unchanged.
        """
        if format == "rdfxml":
            # Format XML with proper indentation
            try:
                import xml.dom.minidom as minidom
                dom = minidom.parseString(content)
                # Pretty print with 2-space indentation
                formatted = dom.toprettyxml(indent="  ", encoding="utf-8").decode("utf-8")

                # Remove extra blank lines
                lines = []
                prev_blank = False
                for line in formatted.split('\n'):
                    is_blank = not line.strip()
                    if not (is_blank and prev_blank):  # Skip consecutive blank lines
                        lines.append(line)
                    prev_blank = is_blank

                formatted = '\n'.join(lines)

                return formatted
            except Exception as e:
                logger.warning(f"Failed to format XML content: {e}")
                return content

        elif format == "turtle":
            # Turtle format is already relatively readable
            # Just ensure consistent line endings and not empty
            if not content or content.strip() == "":
                logger.warning("Turtle content is empty, this may indicate an export issue")
            return content.strip() + '\n' if content.strip() else content

        elif format == "ntriples":
            # N-Triples format is line-based, ensure proper line endings
            return content.strip() + '\n' if content.strip() else content

        return content

    def validate_with_protege_compatibility(
        self,
        classes: List["OntologyClass"]
    ) -> Tuple[bool, List[str]]:
        """Validate that ontology classes are compatible with Protégé editor.

        Checks performed:
        - Namespace starts with http:// or https:// and ends with # or /
        - Class names contain none of the characters < > " { } | ^ `
        - Descriptions are at most 1000 characters long
        - Class names are ASCII-only

        Args:
            classes: List of OntologyClass objects to validate

        Returns:
            Tuple of (is_compatible, warnings):
            - is_compatible: True if no warning was produced, False otherwise
            - warnings: List of compatibility warning messages

        Examples:
            >>> validator = OWLValidator()
            >>> classes = [OntologyClass(name="Patient", description="A patient", entity_type="Person", domain="Healthcare")]
            >>> is_compatible, warnings = validator.validate_with_protege_compatibility(classes)
            >>> is_compatible
            True
        """
        warnings = []

        # Check namespace format
        if not self.base_namespace.startswith(('http://', 'https://')):
            warnings.append(
                f"Namespace '{self.base_namespace}' should start with http:// or https:// "
                "for Protégé compatibility"
            )

        if not self.base_namespace.endswith(('#', '/')):
            warnings.append(
                f"Namespace '{self.base_namespace}' should end with # or / "
                "for Protégé compatibility"
            )

        # Check each class
        for ontology_class in classes:
            # Check for special characters that might cause issues
            if any(char in ontology_class.name for char in ['<', '>', '"', '{', '}', '|', '^', '`']):
                warnings.append(
                    f"Class name '{ontology_class.name}' contains special characters "
                    "that may cause issues in Protégé"
                )

            # Check description length (Protégé can handle long descriptions but may display poorly)
            if ontology_class.description and len(ontology_class.description) > 1000:
                warnings.append(
                    f"Class '{ontology_class.name}' has a very long description ({len(ontology_class.description)} chars) "
                    "which may display poorly in Protégé"
                )

            # Check for non-ASCII characters (Protégé supports them but encoding issues may occur)
            if not ontology_class.name.isascii():
                warnings.append(
                    f"Class name '{ontology_class.name}' contains non-ASCII characters "
                    "which may cause encoding issues in some Protégé versions"
                )

        # If no warnings, it's compatible
        is_compatible = len(warnings) == 0

        return is_compatible, warnings
|
||||
Reference in New Issue
Block a user