feat(implicit memory): improve pydantic v2 compatibility and move confidence levels to a numeric scale

- Replace deprecated `.dict()` with `.model_dump(mode='json')` for pydantic v2 compatibility
- Convert confidence level from enum-based strings to numerical values (0-100 scale)
- Add confidence level mapping in controller (high: 85, medium: 50, low: 20)
- Update dimension analyzer to handle both string and numeric confidence inputs
- Refactor habit analyzer confidence level validation logic
- Remove the ConfidenceLevel enum and its imports, replacing them with integer confidence values
- Update memory config validators to report whether a missing model does not exist or belongs to a different tenant
- Add JSON-mode field serializers to implicit memory schemas so datetime fields are emitted as millisecond timestamps by model_dump(mode='json')
- Improve type consistency across memory analytics modules
This commit is contained in:
Ke Sun
2026-01-08 17:50:01 +08:00
parent e05f33b286
commit 7167c2002f
7 changed files with 195 additions and 105 deletions

View File

@@ -171,7 +171,7 @@ async def get_preference_tags(
)
api_logger.info(f"Retrieved {len(tags)} preference tags for user: {user_id}")
return success(data=[tag.dict() for tag in tags], msg="偏好标签获取成功")
return success(data=[tag.model_dump(mode='json') for tag in tags], msg="偏好标签获取成功")
except Exception as e:
return handle_implicit_memory_error(e, "偏好标签获取", user_id)
@@ -210,7 +210,7 @@ async def get_dimension_portrait(
)
api_logger.info(f"Dimension portrait retrieved for user: {user_id}")
return success(data=portrait.dict(), msg="四维画像获取成功")
return success(data=portrait.model_dump(mode='json'), msg="四维画像获取成功")
except Exception as e:
return handle_implicit_memory_error(e, "四维画像获取", user_id)
@@ -249,7 +249,7 @@ async def get_interest_area_distribution(
)
api_logger.info(f"Interest area distribution retrieved for user: {user_id}")
return success(data=distribution.dict(), msg="兴趣领域分布获取成功")
return success(data=distribution.model_dump(mode='json'), msg="兴趣领域分布获取成功")
except Exception as e:
return handle_implicit_memory_error(e, "兴趣领域分布获取", user_id)
@@ -283,18 +283,28 @@ async def get_behavior_habits(
# Validate inputs
validate_user_id(user_id)
# Convert string confidence level to numerical
numerical_confidence = None
if confidence_level:
confidence_mapping = {
"high": 85,
"medium": 50,
"low": 20
}
numerical_confidence = confidence_mapping.get(confidence_level.lower())
# Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=user_id)
habits = await service.get_behavior_habits(
user_id=user_id,
confidence_level=confidence_level,
confidence_level=numerical_confidence,
frequency_pattern=frequency_pattern,
time_period=time_period
)
api_logger.info(f"Retrieved {len(habits)} behavior habits for user: {user_id}")
return success(data=[habit.dict() for habit in habits], msg="行为习惯获取成功")
return success(data=[habit.model_dump(mode='json') for habit in habits], msg="行为习惯获取成功")
except Exception as e:
return handle_implicit_memory_error(e, "行为习惯获取", user_id)

View File

@@ -12,7 +12,6 @@ from typing import Any, Dict, List, Optional
from app.core.memory.analytics.implicit_memory.llm_client import ImplicitMemoryLLMClient
from app.core.memory.llm_tools.llm_client import LLMClientException
from app.schemas.implicit_memory_schema import (
ConfidenceLevel,
DimensionPortrait,
DimensionScore,
UserMemorySummary,
@@ -28,7 +27,7 @@ class DimensionData(BaseModel):
percentage: float = Field(ge=0.0, le=100.0)
evidence: List[str] = Field(default_factory=list)
reasoning: str = ""
confidence_level: str = "medium"
confidence_level: int = 50 # Default to medium confidence
class DimensionAnalysisResponse(BaseModel):
@@ -147,8 +146,7 @@ class DimensionAnalyzer:
percentage = max(0.0, min(100.0, float(percentage)))
# Validate confidence level
confidence_level_str = dimension_data.get("confidence_level", "low")
confidence_level = self._validate_confidence_level(confidence_level_str)
confidence_level = self._validate_confidence_level(dimension_data.get("confidence_level", 50))
# Ensure evidence is not empty
evidence = dimension_data.get("evidence", [])
@@ -182,32 +180,41 @@ class DimensionAnalyzer:
percentage=0.0,
evidence=["Insufficient data for analysis"],
reasoning=f"No clear evidence found for {dimension_name} dimension",
confidence_level=ConfidenceLevel.LOW
confidence_level=20 # Low confidence as numerical value
)
def _validate_confidence_level(self, confidence_str: str) -> ConfidenceLevel:
"""Validate and convert confidence level string.
def _validate_confidence_level(self, confidence_level) -> int:
"""Return confidence level as integer, handling both string and numeric inputs.
Args:
confidence_str: Confidence level as string
confidence_level: Confidence level (string or numeric)
Returns:
ConfidenceLevel enum value
Confidence level as integer (0-100)
"""
if not confidence_str:
return ConfidenceLevel.MEDIUM
# If it's already a number, return it as int
if isinstance(confidence_level, (int, float)):
return int(confidence_level)
confidence_str = str(confidence_str).lower().strip()
# If it's a string, convert common values to numbers
if isinstance(confidence_level, str):
confidence_str = confidence_level.lower().strip()
if confidence_str in ["high", "높음"]:
return 85
elif confidence_str in ["medium", "중간"]:
return 50
elif confidence_str in ["low", "낮음"]:
return 20
else:
# Try to parse as number
try:
return int(float(confidence_str))
except ValueError:
logger.warning(f"Unknown confidence level: {confidence_level}, defaulting to medium")
return 50
if confidence_str in ["high", "높음"]:
return ConfidenceLevel.HIGH
elif confidence_str in ["medium", "중간"]:
return ConfidenceLevel.MEDIUM
elif confidence_str in ["low", "낮음"]:
return ConfidenceLevel.LOW
else:
logger.warning(f"Unknown confidence level: {confidence_str}, defaulting to medium")
return ConfidenceLevel.MEDIUM
# Default fallback
return 50
def _create_empty_portrait(self, user_id: str) -> DimensionPortrait:
"""Create an empty dimension portrait when no data is available.

View File

@@ -6,14 +6,13 @@ similar habits with confidence scoring.
"""
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from datetime import datetime
from typing import List, Optional
from app.core.memory.analytics.implicit_memory.llm_client import ImplicitMemoryLLMClient
from app.core.memory.llm_tools.llm_client import LLMClientException
from app.schemas.implicit_memory_schema import (
BehaviorHabit,
ConfidenceLevel,
FrequencyPattern,
UserMemorySummary,
)
@@ -28,7 +27,7 @@ class HabitData(BaseModel):
habit_description: str
frequency_pattern: str
time_context: str
confidence_level: str
confidence_level: int = 50 # Default to medium confidence
supporting_summaries: List[str] = Field(default_factory=list)
specific_examples: List[str] = Field(default_factory=list)
is_current: bool = True
@@ -88,7 +87,6 @@ class HabitAnalyzer:
# Convert to BehaviorHabit objects
behavior_habits = []
current_time = datetime.now()
for habit_data in response.get("habits", []):
try:
@@ -105,8 +103,7 @@ class HabitAnalyzer:
habit_description=habit_data.get("habit_description", ""),
frequency_pattern=self._validate_frequency_pattern(habit_data.get("frequency_pattern", "occasional")),
time_context=habit_data.get("time_context", ""),
confidence_level=self._validate_confidence_level(habit_data.get("confidence_level", "medium")),
supporting_summaries=supporting_summaries,
confidence_level=self._validate_confidence_level(habit_data.get("confidence_level", 50)),
specific_examples=specific_examples,
first_observed=first_observed,
last_observed=last_observed,
@@ -165,26 +162,38 @@ class HabitAnalyzer:
return frequency_mapping.get(frequency_str, FrequencyPattern.OCCASIONAL)
def _validate_confidence_level(self, confidence_str: str) -> ConfidenceLevel:
"""Validate and convert confidence level string.
def _validate_confidence_level(self, confidence_level) -> int:
"""Return confidence level as integer, handling both string and numeric inputs.
Args:
confidence_str: Confidence level as string
confidence_level: Confidence level (string or numeric)
Returns:
ConfidenceLevel enum value
Confidence level as integer (0-100)
"""
confidence_str = confidence_str.lower().strip()
# If it's already a number, return it as int
if isinstance(confidence_level, (int, float)):
return int(confidence_level)
if confidence_str in ["high", "높음"]:
return ConfidenceLevel.HIGH
elif confidence_str in ["medium", "중간"]:
return ConfidenceLevel.MEDIUM
elif confidence_str in ["low", "낮음"]:
return ConfidenceLevel.LOW
else:
logger.warning(f"Unknown confidence level: {confidence_str}, defaulting to medium")
return ConfidenceLevel.MEDIUM
# If it's a string, convert common values to numbers
if isinstance(confidence_level, str):
confidence_str = confidence_level.lower().strip()
if confidence_str in ["high", "높음"]:
return 85
elif confidence_str in ["medium", "중간"]:
return 50
elif confidence_str in ["low", "낮음"]:
return 20
else:
# Try to parse as number
try:
return int(float(confidence_str))
except ValueError:
logger.warning(f"Unknown confidence level: {confidence_level}, defaulting to medium")
return 50
# Default fallback
return 50
def _determine_observation_dates(
self,
@@ -249,7 +258,7 @@ class HabitAnalyzer:
return False
# Check supporting summaries
if not habit.supporting_summaries or len(habit.supporting_summaries) == 0:
if not habit.specific_examples or len(habit.specific_examples) == 0:
return False
# Check specific examples
@@ -389,9 +398,9 @@ class HabitAnalyzer:
Returns:
Merged behavioral habit
"""
# Combine supporting summaries
combined_summaries = list(set(
existing_habit.supporting_summaries + new_habit.supporting_summaries
# Combine supporting summaries (using specific_examples instead)
combined_examples = list(set(
existing_habit.specific_examples + new_habit.specific_examples
))
# Combine specific examples
@@ -400,8 +409,7 @@ class HabitAnalyzer:
))
# Update confidence level (take higher confidence)
confidence_levels = [existing_habit.confidence_level, new_habit.confidence_level]
new_confidence = max(confidence_levels, key=lambda x: ["low", "medium", "high"].index(x.value))
new_confidence = max(existing_habit.confidence_level, new_habit.confidence_level)
# Update observation dates
first_observed = min(existing_habit.first_observed, new_habit.first_observed)
@@ -420,7 +428,6 @@ class HabitAnalyzer:
frequency_pattern=existing_habit.frequency_pattern, # Keep original frequency
time_context=combined_time_context,
confidence_level=new_confidence,
supporting_summaries=combined_summaries,
specific_examples=combined_examples,
first_observed=first_observed,
last_observed=last_observed,
@@ -437,8 +444,8 @@ class HabitAnalyzer:
Sorted list of habits
"""
def priority_score(habit: BehaviorHabit) -> tuple:
# Confidence level score (high=3, medium=2, low=1)
confidence_score = {"high": 3, "medium": 2, "low": 1}.get(habit.confidence_level.value, 1)
# Confidence level score (0-100 scale)
confidence_score = habit.confidence_level
# Recency score (more recent = higher score)
days_since_last = (datetime.now() - habit.last_observed).days

View File

@@ -16,7 +16,6 @@ from app.core.memory.analytics.implicit_memory.analyzers.habit_analyzer import (
from app.core.memory.llm_tools.llm_client import LLMClientException
from app.schemas.implicit_memory_schema import (
BehaviorHabit,
ConfidenceLevel,
FrequencyPattern,
UserMemorySummary,
)
@@ -116,13 +115,8 @@ class HabitDetector:
def calculate_ranking_score(habit: BehaviorHabit) -> float:
"""Calculate combined ranking score for a habit."""
# Confidence score (0.0-1.0)
confidence_scores = {
ConfidenceLevel.HIGH: 1.0,
ConfidenceLevel.MEDIUM: 0.6,
ConfidenceLevel.LOW: 0.3
}
confidence_score = confidence_scores.get(habit.confidence_level, 0.3)
# Confidence score (0.0-1.0) - convert from 0-100 scale
confidence_score = habit.confidence_level / 100.0
# Recency score (0.0-1.0)
current_time = datetime.now()
@@ -152,7 +146,7 @@ class HabitDetector:
frequency_bonus = frequency_bonuses.get(habit.frequency_pattern, 0.0)
# Evidence quality bonus
evidence_bonus = min(len(habit.supporting_summaries) / 10.0, 0.1) # Max 0.1 bonus
evidence_bonus = min(len(habit.specific_examples) / 10.0, 0.1) # Max 0.1 bonus
# Current habit bonus
current_bonus = 0.1 if habit.is_current else 0.0
@@ -204,7 +198,6 @@ class HabitDetector:
frequency_pattern=habit.frequency_pattern,
time_context=habit.time_context,
confidence_level=habit.confidence_level,
supporting_summaries=habit.supporting_summaries,
specific_examples=habit.specific_examples,
first_observed=habit.first_observed,
last_observed=habit.last_observed,
@@ -218,7 +211,6 @@ class HabitDetector:
frequency_pattern=habit.frequency_pattern,
time_context=habit.time_context,
confidence_level=habit.confidence_level,
supporting_summaries=habit.supporting_summaries,
specific_examples=habit.specific_examples,
first_observed=habit.first_observed,
last_observed=habit.last_observed,

View File

@@ -64,6 +64,11 @@ def validate_model_exists_and_active(
) -> tuple[str, bool]:
"""Validate that a model exists and is active.
This function performs tenant-aware model validation with detailed error messages:
- If model doesn't exist at all: "Model not found"
- If model exists but belongs to different tenant: "Model belongs to different tenant" with details
- If model exists and accessible but inactive: "Model is inactive"
Args:
model_id: Model UUID to validate
model_type: Type of model ("llm", "embedding", "rerank")
@@ -76,7 +81,7 @@ def validate_model_exists_and_active(
Tuple of (model_name, is_active)
Raises:
ModelNotFoundError: If model does not exist
ModelNotFoundError: If model does not exist or belongs to different tenant
ModelInactiveError: If model exists but is inactive
"""
from app.repositories.model_repository import ModelConfigRepository
@@ -84,21 +89,48 @@ def validate_model_exists_and_active(
start_time = time.time()
try:
# First check if model exists at all (without tenant filtering)
model_without_tenant = ModelConfigRepository.get_by_id(db, model_id, tenant_id=None)
# Then check with tenant filtering
model = ModelConfigRepository.get_by_id(db, model_id, tenant_id)
elapsed_ms = (time.time() - start_time) * 1000
if not model:
logger.warning(
"Model not found",
extra={"model_id": str(model_id), "model_type": model_type, "elapsed_ms": elapsed_ms}
)
raise ModelNotFoundError(
model_id=model_id,
model_type=model_type,
config_id=config_id,
workspace_id=workspace_id,
message=f"{model_type.title()} model {model_id} not found"
)
if model_without_tenant:
# Model exists but belongs to different tenant
logger.warning(
"Model belongs to different tenant",
extra={
"model_id": str(model_id),
"model_type": model_type,
"model_name": model_without_tenant.name,
"model_tenant_id": str(model_without_tenant.tenant_id),
"requested_tenant_id": str(tenant_id),
"is_public": model_without_tenant.is_public,
"elapsed_ms": elapsed_ms
}
)
raise ModelNotFoundError(
model_id=model_id,
model_type=model_type,
config_id=config_id,
workspace_id=workspace_id,
message=f"{model_type.title()} model {model_id} ({model_without_tenant.name}) belongs to a different tenant (model tenant: {model_without_tenant.tenant_id}, workspace tenant: {tenant_id}). The model is not public and cannot be accessed from this workspace."
)
else:
# Model doesn't exist at all
logger.warning(
"Model not found",
extra={"model_id": str(model_id), "model_type": model_type, "elapsed_ms": elapsed_ms}
)
raise ModelNotFoundError(
model_id=model_id,
model_type=model_type,
config_id=config_id,
workspace_id=workspace_id,
message=f"{model_type.title()} model {model_id} not found"
)
if not model.is_active:
logger.warning(

View File

@@ -7,14 +7,7 @@ import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, ConfigDict, Field, field_validator
class ConfidenceLevel(str, Enum):
"""Confidence levels for analysis results."""
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator
class FrequencyPattern(str, Enum):
@@ -41,6 +34,14 @@ class TimeRange(BaseModel):
raise ValueError('end_date must be after start_date')
return v
@field_serializer("start_date", when_used="json")
def _serialize_start_date(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
@field_serializer("end_date", when_used="json")
def _serialize_end_date(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
class DateRange(BaseModel):
"""Date range for filtering."""
@@ -54,6 +55,14 @@ class DateRange(BaseModel):
raise ValueError('end_date must be after start_date')
return v
@field_serializer("start_date", when_used="json")
def _serialize_start_date(self, dt: Optional[datetime.datetime]):
return int(dt.timestamp() * 1000) if dt else None
@field_serializer("end_date", when_used="json")
def _serialize_end_date(self, dt: Optional[datetime.datetime]):
return int(dt.timestamp() * 1000) if dt else None
class AnalysisConfig(BaseModel):
"""Configuration for analysis operations."""
@@ -79,6 +88,14 @@ class PreferenceTagResponse(BaseModel):
conversation_references: List[str]
category: Optional[str] = None
@field_serializer("created_at", when_used="json")
def _serialize_created_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
@field_serializer("updated_at", when_used="json")
def _serialize_updated_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
class DimensionScoreResponse(BaseModel):
"""Score for a personality dimension."""
@@ -88,7 +105,7 @@ class DimensionScoreResponse(BaseModel):
percentage: float = Field(ge=0.0, le=100.0)
evidence: List[str]
reasoning: str
confidence_level: ConfidenceLevel
confidence_level: int = Field(ge=0, le=100)
class DimensionPortraitResponse(BaseModel):
@@ -104,6 +121,10 @@ class DimensionPortraitResponse(BaseModel):
total_summaries_analyzed: int
historical_trends: Optional[List[Dict[str, Any]]] = None
@field_serializer("analysis_timestamp", when_used="json")
def _serialize_analysis_timestamp(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
class InterestCategoryResponse(BaseModel):
"""Interest category with percentage and evidence."""
@@ -132,6 +153,10 @@ class InterestAreaDistributionResponse(BaseModel):
"""Calculate total percentage across all interest areas."""
return self.tech.percentage + self.lifestyle.percentage + self.music.percentage + self.art.percentage
@field_serializer("analysis_timestamp", when_used="json")
def _serialize_analysis_timestamp(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
class BehaviorHabitResponse(BaseModel):
"""A behavioral habit identified from conversations."""
@@ -140,13 +165,20 @@ class BehaviorHabitResponse(BaseModel):
habit_description: str
frequency_pattern: FrequencyPattern
time_context: str
confidence_level: ConfidenceLevel
supporting_summaries: List[str]
confidence_level: int = Field(ge=0, le=100)
first_observed: datetime.datetime
last_observed: datetime.datetime
is_current: bool = True
specific_examples: List[str]
@field_serializer("first_observed", when_used="json")
def _serialize_first_observed(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
@field_serializer("last_observed", when_used="json")
def _serialize_last_observed(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
class UserProfileResponse(BaseModel):
"""Comprehensive user profile."""
@@ -163,6 +195,14 @@ class UserProfileResponse(BaseModel):
total_summaries_analyzed: int
analysis_completeness_score: float = Field(ge=0.0, le=1.0)
@field_serializer("created_at", when_used="json")
def _serialize_created_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
@field_serializer("updated_at", when_used="json")
def _serialize_updated_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
# Internal/Business Logic Schemas
@@ -176,6 +216,10 @@ class MemorySummary(BaseModel):
participants: List[str]
summary_type: str
@field_serializer("timestamp", when_used="json")
def _serialize_timestamp(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
class UserMemorySummary(BaseModel):
"""Memory summary filtered for specific user content."""
@@ -188,6 +232,10 @@ class UserMemorySummary(BaseModel):
confidence_score: float = Field(ge=0.0, le=1.0)
summary_type: str
@field_serializer("timestamp", when_used="json")
def _serialize_timestamp(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
class SummaryAnalysisResult(BaseModel):
"""Result of analyzing memory summaries."""
@@ -201,6 +249,10 @@ class SummaryAnalysisResult(BaseModel):
analysis_timestamp: datetime.datetime
summaries_analyzed: List[str]
@field_serializer("analysis_timestamp", when_used="json")
def _serialize_analysis_timestamp(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
# Aliases for backward compatibility with existing code
PreferenceTag = PreferenceTagResponse

View File

@@ -24,7 +24,6 @@ from app.core.memory.analytics.implicit_memory.habit_detector import HabitDetect
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.implicit_memory_schema import (
BehaviorHabit,
ConfidenceLevel,
DateRange,
DimensionPortrait,
FrequencyPattern,
@@ -303,7 +302,7 @@ class ImplicitMemoryService:
async def get_behavior_habits(
self,
user_id: str,
confidence_level: Optional[str] = None,
confidence_level: Optional[int] = None,
frequency_pattern: Optional[str] = None,
time_period: Optional[str] = None
) -> List[BehaviorHabit]:
@@ -311,7 +310,7 @@ class ImplicitMemoryService:
Args:
user_id: Target user ID
confidence_level: Optional confidence level filter ("high", "medium", "low")
confidence_level: Optional confidence level filter (0-100)
frequency_pattern: Optional frequency pattern filter
time_period: Optional time period filter ("current", "past")
@@ -338,13 +337,8 @@ class ImplicitMemoryService:
filtered_habits = []
for habit in behavior_habits:
# Filter by confidence level
if confidence_level:
try:
target_confidence = ConfidenceLevel(confidence_level.lower())
if habit.confidence_level != target_confidence:
continue
except ValueError:
logger.warning(f"Invalid confidence level: {confidence_level}")
if confidence_level is not None:
if habit.confidence_level < confidence_level:
continue
# Filter by frequency pattern
@@ -367,12 +361,8 @@ class ImplicitMemoryService:
filtered_habits.append(habit)
# Sort by confidence level and recency
confidence_order = {"high": 3, "medium": 2, "low": 1}
filtered_habits.sort(
key=lambda x: (
confidence_order.get(x.confidence_level.value, 0),
x.last_observed
),
key=lambda x: (x.confidence_level, x.last_observed),
reverse=True
)