feat(memory): add session-based chat history and user metadata retrieval
- Add ChatSessionCache to manage chat history per session - Add SEARCH_USER_METADATA cypher query for retrieving user entity metadata - Add "str" mode support to StructResponse for raw text extraction - Add content_str field to MemorySearchResult for pre-formatted content - Fix sandbox URL by removing hardcoded port - Add description field to entity search results - Remove history from UserInput schema, use session_id instead
This commit is contained in:
@@ -158,12 +158,19 @@ class RedisTaskScheduler:
|
||||
return {"status": status, "task_id": task_id, "result": result_content}
|
||||
|
||||
def _cleanup_finished(self):
|
||||
pending = self.redis.hgetall(PENDING_HASH)
|
||||
if not pending:
|
||||
cursor = 0
|
||||
all_pending = {}
|
||||
while True:
|
||||
cursor, batch = self.redis.hscan(PENDING_HASH, cursor=cursor, count=100)
|
||||
all_pending.update(batch)
|
||||
if cursor == 0:
|
||||
break
|
||||
|
||||
if not all_pending:
|
||||
return
|
||||
|
||||
now = time.time()
|
||||
task_ids = list(pending.keys())
|
||||
task_ids = list(all_pending.keys())
|
||||
|
||||
pipe = self.redis.pipeline()
|
||||
for task_id in task_ids:
|
||||
@@ -176,7 +183,7 @@ class RedisTaskScheduler:
|
||||
|
||||
for task_id, raw_result in zip(task_ids, results):
|
||||
try:
|
||||
meta = json.loads(pending[task_id])
|
||||
meta = json.loads(all_pending[task_id])
|
||||
lock_key = meta["lock_key"]
|
||||
dispatched_at = meta.get("dispatched_at", 0)
|
||||
age = now - dispatched_at
|
||||
@@ -276,6 +283,22 @@ class RedisTaskScheduler:
|
||||
return True
|
||||
return stable_hash(user_id) % self._shard_count == self._shard_index
|
||||
|
||||
def _commit_post_dispatch(self, lock_key, task, msg_id, dispatch_lock):
|
||||
pipe = self.redis.pipeline()
|
||||
pipe.set(lock_key, task.id, ex=3600)
|
||||
pipe.hset(PENDING_HASH, task.id, json.dumps({
|
||||
"lock_key": lock_key,
|
||||
"dispatched_at": time.time(),
|
||||
"msg_id": msg_id,
|
||||
}))
|
||||
pipe.delete(dispatch_lock)
|
||||
pipe.set(
|
||||
f"task_tracker:{msg_id}",
|
||||
json.dumps({"status": "DISPATCHED", "task_id": task.id}),
|
||||
ex=86400,
|
||||
)
|
||||
pipe.execute()
|
||||
|
||||
def _dispatch(self, msg_id, msg_data) -> bool:
|
||||
user_id = msg_data["user_id"]
|
||||
task_name = msg_data["task_name"]
|
||||
@@ -308,28 +331,17 @@ class RedisTaskScheduler:
|
||||
task_name, user_id, msg_id, e, exc_info=True,
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
pipe = self.redis.pipeline()
|
||||
pipe.set(lock_key, task.id, ex=3600)
|
||||
pipe.hset(PENDING_HASH, task.id, json.dumps({
|
||||
"lock_key": lock_key,
|
||||
"dispatched_at": time.time(),
|
||||
"msg_id": msg_id,
|
||||
}))
|
||||
pipe.delete(dispatch_lock)
|
||||
pipe.set(
|
||||
f"task_tracker:{msg_id}",
|
||||
json.dumps({"status": "DISPATCHED", "task_id": task.id}),
|
||||
ex=86400,
|
||||
)
|
||||
pipe.execute()
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Post-dispatch state update failed for %s: %s",
|
||||
task.id, e, exc_info=True,
|
||||
)
|
||||
self.errors += 1
|
||||
for attempt in range(2):
|
||||
try:
|
||||
self._commit_post_dispatch(lock_key, task, msg_id, dispatch_lock)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Post-dispatch state update failed for %s: %s",
|
||||
task.id, e, exc_info=True,
|
||||
)
|
||||
time.sleep(0.1)
|
||||
self.errors += 1
|
||||
|
||||
self.dispatched += 1
|
||||
logger.info("Task dispatched: %s (msg=%s)", task.id, msg_id)
|
||||
@@ -367,22 +379,21 @@ class RedisTaskScheduler:
|
||||
return
|
||||
|
||||
for uid, msg in candidates:
|
||||
queue_key = f"{USER_QUEUE_PREFIX}{uid}"
|
||||
if self._dispatch(msg["msg_id"], msg):
|
||||
self.redis.lpop(f"{USER_QUEUE_PREFIX}{uid}")
|
||||
self.redis.lpop(queue_key)
|
||||
if self.redis.llen(queue_key) > 0:
|
||||
self.redis.sadd(READY_SET, uid)
|
||||
|
||||
def schedule_loop(self):
|
||||
self._heartbeat()
|
||||
self._cleanup_finished()
|
||||
|
||||
pipe = self.redis.pipeline()
|
||||
pipe.smembers(READY_SET)
|
||||
pipe.delete(READY_SET)
|
||||
results = pipe.execute()
|
||||
ready_users = results[0] or set()
|
||||
|
||||
ready_users = self.redis.smembers(READY_SET) or set()
|
||||
my_users = [uid for uid in ready_users if self._is_mine(uid)]
|
||||
|
||||
if not my_users:
|
||||
if my_users:
|
||||
self.redis.srem(READY_SET, *my_users)
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
return
|
||||
|
||||
@@ -445,7 +456,7 @@ class RedisTaskScheduler:
|
||||
"Scheduler started: instance=%s", self.instance_id,
|
||||
)
|
||||
|
||||
while True:
|
||||
while self.running:
|
||||
try:
|
||||
self.schedule_loop()
|
||||
|
||||
@@ -480,9 +491,7 @@ class RedisTaskScheduler:
|
||||
logger.error("Shutdown cleanup error: %s", e)
|
||||
|
||||
|
||||
scheduler: RedisTaskScheduler | None = None
|
||||
if scheduler is None:
|
||||
scheduler = RedisTaskScheduler()
|
||||
scheduler = RedisTaskScheduler()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import signal
|
||||
|
||||
Reference in New Issue
Block a user