Skip to content

Commit

Permalink
surround ttl-compact with SWAP_ENABLED.
Browse files Browse the repository at this point in the history
  • Loading branch information
patpatbear committed Jan 7, 2025
1 parent 603ae91 commit cfd0708
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 23 deletions.
9 changes: 2 additions & 7 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,6 @@ configEnum swap_info_propagate_mode_enum[] = {
{NULL, 0}
};

/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
{1024*1024*256, 1024*1024*64, 60}, /* slave */
{1024*1024*32, 1024*1024*8, 60} /* pubsub */
};

/* swap batch limits presets. */
swapBatchLimitsConfig swapBatchLimitsDefaults[SWAP_TYPES] = {
{0, 0}, /* NOP */
Expand Down Expand Up @@ -3055,7 +3048,9 @@ standardConfig configs[] = {

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
#ifdef ENABLE_SWAP
createUIntConfig("swap-ttl-compact-expire-percentile", NULL, MODIFIABLE_CONFIG, 1, 100, server.swap_ttl_compact_expire_percentile, 99, INTEGER_CONFIG, NULL, NULL),
#endif

/* Unsigned Long configs */
createULongConfig("active-defrag-max-scan-fields", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, server.active_defrag_max_scan_fields, 1000, INTEGER_CONFIG, NULL, NULL), /* Default: keys with more than 1000 fields will be processed separately */
Expand Down
8 changes: 4 additions & 4 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
#ifdef ENABLE_SWAP
if ((rocksFlushDB(dbnum)))
serverLog(LL_WARNING,"[ROCKS] flushd rocks db(%d) failed.",dbnum);

if (server.swap_ttl_compact_ctx) {
swapTtlCompactCtxReset(server.swap_ttl_compact_ctx);
}
#endif
/* Fire the flushdb modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
Expand All @@ -516,10 +520,6 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
/* Empty redis database structure. */
removed = emptyDbStructure(server.db, dbnum, async, callback);

if (server.swap_ttl_compact_ctx) {
swapTtlCompactCtxReset(server.swap_ttl_compact_ctx);
}

/* Flush slots to keys map if enable cluster, we can flush entire
* slots to keys map whatever dbnum because only support one DB
* in cluster mode. */
Expand Down
2 changes: 2 additions & 0 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,9 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;

if (hasActiveChildProcess()) {
#ifdef ENABLE_SWAP
swapForkRocksdbCtxRelease(sfrctx);
#endif
return C_ERR;
}

Expand Down
26 changes: 15 additions & 11 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3394,13 +3394,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.maxmemory_scale_from > server.maxmemory)
updateMaxMemoryScaleFrom();
}
#endif
run_with_period(1000) {
if (server.repl_mode->mode == REPL_MODE_XSYNC) {
xsyncReplicationCron();
}
}


run_with_period(1000*(int)server.swap_sst_age_limit_refresh_period) {
ttlCompactRefreshSstAgeLimit();
}
Expand All @@ -3409,11 +3403,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* producing and consuming task are both in swap util thread
* so producing should be slower than consuming, otherwise consuming
* will be starved to death. */
ttlCompactProduceTask();
ttlCompactProduceTask();
}

run_with_period(1000*(int)server.swap_ttl_compact_period / 2) {
ttlCompactConsumeTask();
ttlCompactConsumeTask();
}

run_with_period(1000*(int)server.swap_swap_info_slave_period) {
Expand All @@ -3424,7 +3418,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
swapPropagateSwapInfoCmd(3, argv);
swapDestorySwapInfoSstAgeLimitCmd(argv);
}

}
#endif
run_with_period(1000) {
if (server.repl_mode->mode == REPL_MODE_XSYNC) {
xsyncReplicationCron();
}
}

/* Fire the cron loop modules event. */
Expand Down Expand Up @@ -3788,7 +3787,9 @@ void createSharedObjects(void) {
shared.persist = createStringObject("PERSIST",7);
shared.set = createStringObject("SET",3);
shared.eval = createStringObject("EVAL",4);
#ifdef ENABLE_SWAP
shared.swap_info = createStringObject("swap.info",9);
#endif

/* Shared command argument */
shared.left = createStringObject("left",4);
Expand All @@ -3810,7 +3811,9 @@ void createSharedObjects(void) {
shared.special_asterick = createStringObject("*",1);
shared.special_equals = createStringObject("=",1);
shared.redacted = makeObjectShared(createStringObject("(redacted)",10));
#ifdef ENABLE_SWAP
shared.sst_age_limit = createStringObject("SST-AGE-LIMIT",13);
#endif

shared.gtid = createStringObject("GTID",4);

Expand Down Expand Up @@ -5873,14 +5876,15 @@ void pingCommand(client *c) {
if (c->argc == 1) {
addReply(c,shared.pong);
} else {
#ifdef ENABLE_SWAP
/* extend ping argv for swap.info */
if (!strncasecmp(c->argv[1]->ptr, "swap.info", 9)) {
int swap_info_argc = 0;
sds *swap_info_argv = swapDecodeSwapInfo(c->argv[1]->ptr, &swap_info_argc);
swapApplySwapInfo(swap_info_argc, swap_info_argv);
sdsfreesplitres(swap_info_argv, swap_info_argc);
}

#endif
addReplyBulk(c,c->argv[1]);
}
}
Expand Down

0 comments on commit cfd0708

Please sign in to comment.