Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Add Redis persistent DB mgmt commands when clearing history #565

Merged
merged 5 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/565.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Issue BGSAVE or BGREWRITEAOF commands when clearning the session statistics history to ensure compaction and persistence of the Redis database
30 changes: 24 additions & 6 deletions src/ai/backend/manager/cli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ async def _clear_redis_history():
target_kernels = [str(x['id']) for x in result.all()]

delete_count = 0
if len(target_kernels) > 0:
async with redis_ctx(cli_ctx) as redis_conn_set:

def _build_pipe(
r: aioredis.Redis,
Expand All @@ -168,7 +168,7 @@ def _build_pipe(
pipe.delete(*kernel_ids)
return pipe

async with redis_ctx(cli_ctx) as redis_conn_set:
if len(target_kernels) > 0:
# Apply chunking to avoid excessive length of command params
# and indefinite blocking of the Redis server.
for kernel_ids in chunked(target_kernels, 32):
Expand All @@ -178,10 +178,28 @@ def _build_pipe(
)
# Each DEL command returns the number of keys deleted.
delete_count += sum(results)
log.info(
"Cleaned up {:,} redis statistics records older than {:}.",
delete_count, expiration_date,
)
log.info(
"Cleaned up {:,} redis statistics records older than {:}.",
delete_count, expiration_date,
)

# Sync and compact the persistent database of Redis
redis_config = await redis_helper.execute(
redis_conn_set.stat,
lambda r: r.config_get("appendonly"),
)
if redis_config['appendonly'] == 'yes':
await redis_helper.execute(
redis_conn_set.stat,
lambda r: r.bgrewriteaof(),
)
log.info("Issued BGREWRITEAOF to the Redis database.")
else:
await redis_helper.execute(
redis_conn_set.stat,
lambda r: r.execute_command("BGSAVE SCHEDULE"),
)
log.info("Issued BGSAVE to the Redis database.")
except:
log.exception("Unexpected error while cleaning up redis history")

Expand Down