Skip to content

Commit

Permalink
Before evicted and before expired server events are not executed insi…
Browse files Browse the repository at this point in the history
…de an execution unit. (redis#12733)

Redis 7.2 (redis#9406) introduced a new modules event, `RedisModuleEvent_Key`.
This new event allows the module to read the key data just before it is removed
from the database (either deleted, expired, evicted, or overwritten).

When the key is removed from the database, either by active expire or eviction.
The new event was not called as part of an execution unit. This can cause an
issue if the module registers a post notification job inside the event. This job will
not be executed atomically with the expiration/eviction operation and will not
replicated inside a Multi/Exec. Moreover, the post notification job will be executed
right after the event where it is still not safe to perform any write operation, this will
violate the promise that post notification job will be called atomically with the
operation that triggered it and **only when it is safe to write**.

This PR fixes the issue by wrapping each expiration/eviction of a key with an execution
unit. This makes sure the entire operation will run atomically and all the post notification
jobs will be executed at the end where it is safe to write.

Tests were modified to verify the fix.
  • Loading branch information
MeirShpilraien authored Nov 8, 2023
1 parent 6223355 commit 0ffb9d2
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ int performEvictions(void) {
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
enterExecutionUnit(1, 0);
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED);
Expand All @@ -694,6 +695,7 @@ int performEvictions(void) {
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
exitExecutionUnit();
postExecutionUnitOperations();
decrRefCount(keyobj);
keys_freed++;
Expand Down
2 changes: 2 additions & 0 deletions src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
long long t = dictGetSignedIntegerVal(de);
if (now > t) {
enterExecutionUnit(1, 0);
sds key = dictGetKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
deleteExpiredKeyAndPropagate(db,keyobj);
decrRefCount(keyobj);
exitExecutionUnit();
return 1;
} else {
return 0;
Expand Down
67 changes: 67 additions & 0 deletions tests/modules/postnotifications.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const cha
return REDISMODULE_OK; /* do not count the evicted key */
}

if (strncmp(key_str, "before_evicted", 14) == 0) {
return REDISMODULE_OK; /* do not count the before_evicted key */
}

RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7);
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
return REDISMODULE_OK;
Expand Down Expand Up @@ -186,6 +190,55 @@ static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleSt
return REDISMODULE_OK;
}

typedef struct KeySpace_EventPostNotificationCtx {
RedisModuleString *triggered_on;
RedisModuleString *new_key;
} KeySpace_EventPostNotificationCtx;

static void KeySpace_ServerEventPostNotificationFree(void *pd) {
KeySpace_EventPostNotificationCtx *pn_ctx = pd;
RedisModule_FreeString(NULL, pn_ctx->new_key);
RedisModule_FreeString(NULL, pn_ctx->triggered_on);
RedisModule_Free(pn_ctx);
}

static void KeySpace_ServerEventPostNotification(RedisModuleCtx *ctx, void *pd) {
REDISMODULE_NOT_USED(ctx);
KeySpace_EventPostNotificationCtx *pn_ctx = pd;
RedisModuleCallReply* rep = RedisModule_Call(ctx, "lpush", "!ss", pn_ctx->new_key, pn_ctx->triggered_on);
RedisModule_FreeCallReply(rep);
}

static void KeySpace_ServerEventCallback(RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent, void *data) {
REDISMODULE_NOT_USED(eid);
REDISMODULE_NOT_USED(data);
if (subevent > 3) {
RedisModule_Log(ctx, "warning", "Got an unexpected subevent '%ld'", subevent);
return;
}
static const char* events[] = {
"before_deleted",
"before_expired",
"before_evicted",
"before_overwritten",
};

const RedisModuleString *key_name = RedisModule_GetKeyNameFromModuleKey(((RedisModuleKeyInfo*)data)->key);
const char *key_str = RedisModule_StringPtrLen(key_name, NULL);

for (int i = 0 ; i < 4 ; ++i) {
const char *event = events[i];
if (strncmp(key_str, event , strlen(event)) == 0) {
return; /* don't log any event on our tracking keys */
}
}

KeySpace_EventPostNotificationCtx *pn_ctx = RedisModule_Alloc(sizeof(*pn_ctx));
pn_ctx->triggered_on = RedisModule_HoldString(NULL, (RedisModuleString*)key_name);
pn_ctx->new_key = RedisModule_CreateString(NULL, events[subevent], strlen(events[subevent]));
RedisModule_AddPostNotificationJob(ctx, KeySpace_ServerEventPostNotification, pn_ctx, KeySpace_ServerEventPostNotificationFree);
}

/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
Expand All @@ -200,6 +253,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}

int with_key_events = 0;
if (argc >= 1) {
const char *arg = RedisModule_StringPtrLen(argv[0], 0);
if (strcmp(arg, "with_key_events") == 0) {
with_key_events = 1;
}
}

RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS);

if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationString) != REDISMODULE_OK){
Expand All @@ -222,6 +283,12 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}

if (with_key_events) {
if(RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Key, KeySpace_ServerEventCallback) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
}

if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet,
"write", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
Expand Down
27 changes: 27 additions & 0 deletions tests/unit/moduleapi/async_rm_call.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,14 @@ start_server {tags {"modules"}} {
r lpush l a
assert_equal [$rd read] {OK}

# Explanation of the first multi exec block:
# {lpop l} - pop the value by our blocking command 'blpop_and_set_multiple_keys'
# {set string_foo 1} - the action of our blocking command 'blpop_and_set_multiple_keys'
# {set string_bar 2} - the action of our blocking command 'blpop_and_set_multiple_keys'
# {incr string_changed{string_foo}} - post notification job that was registered when 'string_foo' changed
# {incr string_changed{string_bar}} - post notification job that was registered when 'string_bar' changed
# {incr string_total} - post notification job that was registered when string_changed{string_foo} changed
# {incr string_total} - post notification job that was registered when string_changed{string_bar} changed
assert_replication_stream $repl {
{select *}
{lpush l a}
Expand Down Expand Up @@ -355,6 +363,25 @@ start_server {tags {"modules"}} {
r lpush l a
assert_equal [$rd read] {OK}

# Explanation of the first multi exec block:
# {lpop l} - pop the value by our blocking command 'blpop_and_set_multiple_keys'
# {set string_foo 1} - the action of our blocking command 'blpop_and_set_multiple_keys'
# {set string_bar 2} - the action of our blocking command 'blpop_and_set_multiple_keys'
# {incr string_changed{string_foo}} - post notification job that was registered when 'string_foo' changed
# {incr string_changed{string_bar}} - post notification job that was registered when 'string_bar' changed
# {incr string_total} - post notification job that was registered when string_changed{string_foo} changed
# {incr string_total} - post notification job that was registered when string_changed{string_bar} changed
#
# Explanation of the second multi exec block:
# {lpop l} - pop the value by our blocking command 'blpop_and_set_multiple_keys'
# {del string_foo} - lazy expiration of string_foo when 'blpop_and_set_multiple_keys' tries to write to it.
# {set string_foo 1} - the action of our blocking command 'blpop_and_set_multiple_keys'
# {set string_bar 2} - the action of our blocking command 'blpop_and_set_multiple_keys'
# {incr expired} - the post notification job, registered after string_foo got expired
# {incr string_changed{string_foo}} - post notification job triggered when we set string_foo
# {incr string_changed{string_bar}} - post notification job triggered when we set string_bar
# {incr string_total} - post notification job triggered when we incr 'string_changed{string_foo}'
# {incr string_total} - post notification job triggered when we incr 'string_changed{string_bar}'
assert_replication_stream $repl {
{select *}
{lpush l a}
Expand Down
28 changes: 21 additions & 7 deletions tests/unit/moduleapi/postnotifications.tcl
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
set testmodule [file normalize tests/modules/postnotifications.so]

tags "modules" {
start_server [list overrides [list loadmodule "$testmodule"]] {
start_server {} {
r module load $testmodule with_key_events

test {Test write on post notification callback} {
set repl [attach_to_replication_stream]

r set string_x 1
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]

r set string_x 2
assert_equal {2} [r get string_changed{string_x}]
assert_equal {2} [r get string_total]

# the {lpush before_overwritten string_x} is a post notification job registered when 'string_x' was overwritten
assert_replication_stream $repl {
{multi}
{select *}
Expand All @@ -23,6 +25,7 @@ tags "modules" {
{exec}
{multi}
{set string_x 2}
{lpush before_overwritten string_x}
{incr string_changed{string_x}}
{incr string_total}
{exec}
Expand All @@ -37,7 +40,7 @@ tags "modules" {
assert_equal {OK} [r postnotification.async_set]
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]

assert_replication_stream $repl {
{multi}
{select *}
Expand All @@ -63,12 +66,14 @@ tags "modules" {
fail "Failed waiting for x to expired"
}

# the {lpush before_expired x} is a post notification job registered before 'x' got expired
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{del x}
{lpush before_expired x}
{incr expired}
{exec}
}
Expand All @@ -85,12 +90,14 @@ tags "modules" {
after 10
assert_equal {} [r get x]

# the {lpush before_expired x} is a post notification job registered before 'x' got expired
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{del x}
{lpush before_expired x}
{incr expired}
{exec}
}
Expand All @@ -108,13 +115,15 @@ tags "modules" {
after 10
assert_equal {OK} [r set read_x 1]

# the {lpush before_expired x} is a post notification job registered before 'x' got expired
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{set read_x 1}
{del x}
{lpush before_expired x}
{incr expired}
{exec}
}
Expand Down Expand Up @@ -143,16 +152,18 @@ tags "modules" {
r flushall
set repl [attach_to_replication_stream]
r set x 1
r config set maxmemory-policy allkeys-random
r config set maxmemory-policy allkeys-random
r config set maxmemory 1

assert_error {OOM *} {r set y 1}

# the {lpush before_evicted x} is a post notification job registered before 'x' got evicted
assert_replication_stream $repl {
{select *}
{set x 1}
{multi}
{del x}
{lpush before_evicted x}
{incr evicted}
{exec}
}
Expand All @@ -164,15 +175,16 @@ tags "modules" {
set testmodule2 [file normalize tests/modules/keyspace_events.so]

tags "modules" {
start_server [list overrides [list loadmodule "$testmodule"]] {
start_server {} {
r module load $testmodule with_key_events
r module load $testmodule2
test {Test write on post notification callback} {
set repl [attach_to_replication_stream]

r set string_x 1
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]

r set string_x 2
assert_equal {2} [r get string_changed{string_x}]
assert_equal {2} [r get string_total]
Expand All @@ -181,6 +193,7 @@ tags "modules" {
assert_equal {1} [r get string_changed{string1_x}]
assert_equal {3} [r get string_total]

# the {lpush before_overwritten string_x} is a post notification job registered before 'string_x' got overwritten
assert_replication_stream $repl {
{multi}
{select *}
Expand All @@ -190,6 +203,7 @@ tags "modules" {
{exec}
{multi}
{set string_x 2}
{lpush before_overwritten string_x}
{incr string_changed{string_x}}
{incr string_total}
{exec}
Expand All @@ -202,4 +216,4 @@ tags "modules" {
close_replication_stream $repl
}
}
}
}

0 comments on commit 0ffb9d2

Please sign in to comment.