Feature/memory perceptual (#48)

* perf(workflow): pass JSON data to HTTP node as a string

* perf(prompt_opt): simplify log output

* feat(memory): add perceptual memory page API and related database schema

* perf(log): clean up API exception log output

* perf(memory): simplify perceptual memory timeline response by removing metadata
This commit is contained in:
Eternity
2026-01-07 16:00:22 +08:00
committed by GitHub
parent 957f8f83ff
commit c52b360068
8 changed files with 758 additions and 4 deletions

View File

@@ -0,0 +1,166 @@
import uuid
from typing import Dict, Any, Optional
from sqlalchemy.orm import Session
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException
from app.core.logging_config import get_business_logger
from app.models.memory_perceptual_model import PerceptualType, FileStorageType
from app.repositories.memory_perceptual_repository import MemoryPerceptualRepository
from app.schemas.memory_perceptual_schema import (
PerceptualQuerySchema,
PerceptualTimelineResponse,
PerceptualMemoryItem,
AudioModal, Content, VideoModal, TextModal
)
business_logger = get_business_logger()
class MemoryPerceptualService:
def __init__(self, db: Session):
self.db = db
self.repository = MemoryPerceptualRepository(db)
def get_memory_count(self, end_user_id: uuid.UUID) -> Dict[str, Any]:
"""Retrieve perceptual memory statistics for a user."""
business_logger.info(f"Fetching perceptual memory statistics: end_user_id={end_user_id}")
try:
total_count = self.repository.get_count_by_user_id(end_user_id=end_user_id)
vision_count = self.repository.get_count_by_type(end_user_id, PerceptualType.VISION)
audio_count = self.repository.get_count_by_type(end_user_id, PerceptualType.AUDIO)
text_count = self.repository.get_count_by_type(end_user_id, PerceptualType.TEXT)
conversation_count = self.repository.get_count_by_type(end_user_id, PerceptualType.CONVERSATION)
stats = {
"total": total_count,
"by_type": {
"vision": vision_count,
"audio": audio_count,
"text": text_count,
"conversation": conversation_count
}
}
business_logger.info(f"Memory statistics fetched successfully: total={total_count}")
return stats
except Exception as e:
business_logger.error(f"Failed to fetch memory statistics: {str(e)}")
raise BusinessException(f"Failed to fetch memory statistics: {str(e)}", BizCode.DB_ERROR)
def _get_latest_memory_by_type(
self,
end_user_id: uuid.UUID,
perceptual_type: PerceptualType
) -> Optional[dict[str, Any]]:
"""Internal helper to retrieve the latest memory by type."""
business_logger.info(f"Fetching latest {perceptual_type.name.lower()} memory: end_user_id={end_user_id}")
try:
memories = self.repository.get_by_type(
end_user_id=end_user_id,
perceptual_type=perceptual_type,
limit=1,
offset=0
)
if not memories:
business_logger.info(f"No {perceptual_type.name.lower()} memory found: end_user_id={end_user_id}")
return None
memory = memories[0]
meta_data = memory.meta_data or {}
modalities = meta_data.get("modalities")
content = meta_data.get("content")
if not modalities:
raise BusinessException(f"Modalities not defined, perceptual memory_id={memory.id}", BizCode.DB_ERROR)
if not content:
raise BusinessException(f"Content not defined, perceptual memory_id={memory.id}", BizCode.DB_ERROR)
content = Content(**content)
match perceptual_type:
case PerceptualType.VISION:
modal = VideoModal(**modalities)
case PerceptualType.AUDIO:
modal = AudioModal(**modalities)
case PerceptualType.TEXT:
modal = TextModal(**modalities)
case _:
raise BusinessException("Unsupported perceptual type", BizCode.DB_ERROR)
detail = modal.model_dump()
result = {
"id": str(memory.id),
"file_name": memory.file_name,
"file_path": memory.file_path,
"storage_type": memory.storage_service,
"summary": memory.summary,
"keywords": content.keywords,
"topic": content.topic,
"domain": content.domain,
"created_time": memory.created_time.isoformat() if memory.created_time else None,
**detail
}
business_logger.info(
f"Latest {perceptual_type.name.lower()} memory retrieved successfully: file={memory.file_name}")
return result
except Exception as e:
business_logger.error(f"Failed to fetch latest {perceptual_type.name.lower()} memory: {str(e)}")
raise BusinessException(f"Failed to fetch latest {perceptual_type.name.lower()} memory: {str(e)}",
BizCode.DB_ERROR)
def get_latest_visual_memory(self, end_user_id: uuid.UUID) -> Optional[Dict[str, Any]]:
return self._get_latest_memory_by_type(end_user_id, PerceptualType.VISION)
def get_latest_audio_memory(self, end_user_id: uuid.UUID) -> Optional[Dict[str, Any]]:
return self._get_latest_memory_by_type(end_user_id, PerceptualType.AUDIO)
def get_latest_text_memory(self, end_user_id: uuid.UUID) -> Optional[Dict[str, Any]]:
return self._get_latest_memory_by_type(end_user_id, PerceptualType.TEXT)
def get_time_line(self, end_user_id: uuid.UUID, query: PerceptualQuerySchema) -> PerceptualTimelineResponse:
"""Retrieve a timeline of perceptual memories for a user."""
business_logger.info(f"Fetching perceptual memory timeline: "
f"end_user_id={end_user_id}, filter={query.filter}")
try:
if query.page < 1:
raise BusinessException("Page number must be greater than 0", BizCode.INVALID_PARAMETER)
if query.page_size < 1 or query.page_size > 100:
raise BusinessException("Page size must be between 1 and 100", BizCode.INVALID_PARAMETER)
total_count, memories = self.repository.get_timeline(end_user_id, query)
memory_items = []
for memory in memories:
memory_item = PerceptualMemoryItem(
id=memory.id,
perceptual_type=PerceptualType(memory.perceptual_type),
file_path=memory.file_path,
file_name=memory.file_name,
summary=memory.summary,
created_time=memory.created_time,
storage_type=FileStorageType(memory.storage_service),
)
memory_items.append(memory_item)
timeline_response = PerceptualTimelineResponse(
total=total_count,
page=query.page,
page_size=query.page_size,
total_pages=(total_count + query.page_size - 1) // query.page_size,
memories=memory_items
)
business_logger.info(f"Perceptual memory timeline retrieved successfully: "
f"total={total_count}, returned={len(memories)}")
return timeline_response
except BusinessException:
raise
except Exception as e:
business_logger.error(f"Failed to fetch perceptual memory timeline: {str(e)}")
raise BusinessException(f"Failed to fetch perceptual memory timeline: {str(e)}", BizCode.DB_ERROR)

View File

@@ -166,6 +166,8 @@ class PromptOptimizerService:
model_config = self.get_model_config(tenant_id, model_id)
session_history = self.get_session_message_history(session_id=session_id, user_id=user_id)
logger.info(f"Prompt optimization started, user_id={user_id}, session_id={session_id}")
# Create LLM instance
api_config: ModelApiKey = model_config.api_keys[0]
llm = RedBearLLM(RedBearModelConfig(
@@ -203,7 +205,6 @@ class PromptOptimizerService:
messages.extend(session_history[:-1]) # last message is current message
messages.extend([(RoleType.USER.value, rendered_user_message)])
logger.info(f"Prompt optimization message: {messages}")
buffer = ""
prompt_started = False
prompt_finished = False
@@ -250,6 +251,7 @@ class PromptOptimizerService:
content=desc
)
variables = self.parser_prompt_variables(optim_result.get("prompt"))
logger.info(f"Prompt optimization completed, user_id={user_id}, session_id={session_id}")
yield {"desc": optim_result.get("desc"), "variables": variables}
@staticmethod