From 6e758faa37fa4377caaf6e6945c19dfe192381d7 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Wed, 4 Mar 2026 14:17:45 +0800 Subject: [PATCH] [changes] Using Pydantic to standardize the time data for scheduled tasks --- api/app/core/config.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/api/app/core/config.py b/api/app/core/config.py index 6f48fec2..0dbebb6d 100644 --- a/api/app/core/config.py +++ b/api/app/core/config.py @@ -1,9 +1,10 @@ import json import os from pathlib import Path -from typing import Any, Dict, Optional +from typing import Annotated, Any, Dict, Optional from dotenv import load_dotenv +from pydantic import Field, TypeAdapter load_dotenv() @@ -206,10 +207,18 @@ class Settings: MEMORY_CACHE_REGENERATION_HOURS: int = int(os.getenv("MEMORY_CACHE_REGENERATION_HOURS", "24")) # Celery Beat Schedule Configuration (定时任务执行频率) - MEMORY_INCREMENT_HOUR: int = int(os.getenv("MEMORY_INCREMENT_HOUR", "2")) - MEMORY_INCREMENT_MINUTE: int = int(os.getenv("MEMORY_INCREMENT_MINUTE", "0")) - WORKSPACE_REFLECTION_INTERVAL_SECONDS: int = int(os.getenv("WORKSPACE_REFLECTION_INTERVAL_SECONDS", "30")) - FORGETTING_CYCLE_INTERVAL_HOURS: int = int(os.getenv("FORGETTING_CYCLE_INTERVAL_HOURS", "24")) + MEMORY_INCREMENT_HOUR: int = TypeAdapter( + Annotated[int, Field(ge=0, le=23, description="cron hour [0, 23]")] + ).validate_python(int(os.getenv("MEMORY_INCREMENT_HOUR", "2"))) + MEMORY_INCREMENT_MINUTE: int = TypeAdapter( + Annotated[int, Field(ge=0, le=59, description="cron minute [0, 59]")] + ).validate_python(int(os.getenv("MEMORY_INCREMENT_MINUTE", "0"))) + WORKSPACE_REFLECTION_INTERVAL_SECONDS: int = TypeAdapter( + Annotated[int, Field(ge=1, description="reflection interval in seconds, must be >= 1")] + ).validate_python(int(os.getenv("WORKSPACE_REFLECTION_INTERVAL_SECONDS", "30"))) + FORGETTING_CYCLE_INTERVAL_HOURS: int = TypeAdapter( + Annotated[int, Field(ge=1, description="forgetting cycle interval in hours, must be >= 1")] + ).validate_python(int(os.getenv("FORGETTING_CYCLE_INTERVAL_HOURS", "24"))) # Memory Module Configuration (internal) MEMORY_OUTPUT_DIR: str = os.getenv("MEMORY_OUTPUT_DIR", "logs/memory-output")