diff --git a/api/app/celery_app.py b/api/app/celery_app.py index 727cd041..002547f6 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -7,6 +7,10 @@ from celery import Celery from app.core.config import settings +# macOS fork() safety - must be set before any Celery initialization +if platform.system() == 'Darwin': + os.environ.setdefault('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES') + # 创建 Celery 应用实例 # broker: 任务队列(使用 Redis DB 0) # backend: 结果存储(使用 Redis DB 10) diff --git a/api/app/core/agent/langchain_agent.py b/api/app/core/agent/langchain_agent.py index 58cf9443..7e0015ae 100644 --- a/api/app/core/agent/langchain_agent.py +++ b/api/app/core/agent/langchain_agent.py @@ -151,6 +151,7 @@ class LangChainAgent: # TODO: 移到memory module async def term_memory_save(self,long_term_messages,actual_config_id,end_user_id,type): db = next(get_db()) + #TODO: 魔法数字 scope=6 try: @@ -160,6 +161,12 @@ class LangChainAgent: from app.core.memory.agent.utils.redis_tool import write_store result = write_store.get_session_by_userid(end_user_id) + + # Handle case where no session exists in Redis (returns False) + if not result or result is False: + logger.debug(f"No existing session in Redis for user {end_user_id}, skipping short-term memory update") + return + if type=="chunk" or type=="aggregate": data = await format_parsing(result, "dict") chunk_data = data[:scope] @@ -167,7 +174,14 @@ class LangChainAgent: repo.upsert(end_user_id, chunk_data) logger.info(f'写入短长期:') else: + # TODO: This branch handles type="time" strategy, currently unused. + # Will be activated when time-based long-term storage is implemented. + # TODO: 魔法数字 - extract 5 to a constant long_time_data = write_store.find_user_recent_sessions(end_user_id, 5) + # Handle case where no session exists in Redis (returns False or empty) + if not long_time_data or long_time_data is False: + logger.debug(f"No recent sessions in Redis for user {end_user_id}") + return long_messages = await messages_parse(long_time_data) repo.upsert(end_user_id, long_messages) logger.info(f'写入短长期:') diff --git a/api/app/core/memory/agent/langgraph_graph/routing/write_router.py b/api/app/core/memory/agent/langgraph_graph/routing/write_router.py index 6650b9b0..e9de02b6 100644 --- a/api/app/core/memory/agent/langgraph_graph/routing/write_router.py +++ b/api/app/core/memory/agent/langgraph_graph/routing/write_router.py @@ -61,6 +61,7 @@ async def window_dialogue(end_user_id,langchain_messages,memory_config,scope): scope:窗口大小 ''' scope=scope + redis_messages = [] is_end_user_id = count_store.get_sessions_count(end_user_id) if is_end_user_id is not False: is_end_user_id = count_store.get_sessions_count(end_user_id)[0] @@ -92,6 +93,9 @@ async def memory_long_term_storage(end_user_id,memory_config,time): memory_config: 内存配置对象 ''' long_time_data = write_store.find_user_recent_sessions(end_user_id, time) + # Handle case where no session exists in Redis (returns False or empty) + if not long_time_data or long_time_data is False: + return format_messages = await chat_data_format(long_time_data) if format_messages!=[]: await write_messages(end_user_id, format_messages, memory_config) @@ -109,8 +113,9 @@ async def aggregate_judgment(end_user_id: str, ori_messages: list, memory_config try: # 1. 获取历史会话数据(使用新方法) result = write_store.get_all_sessions_by_end_user_id(end_user_id) - history = await format_parsing(result) - if not result: + + # Handle case where no session exists in Redis (returns False or empty) + if not result or result is False: history = [] else: history = await format_parsing(result)