Skip to content

Commit

Permalink
Fix blocking commands timeout is reset due to re-processing command (r…
Browse files Browse the repository at this point in the history
…edis#13004)

In redis#11012, we will reprocess command when client is unblocked on keys,
in some blocking commands, for example, in the XREADGROUP BLOCK
scenario,
because of the re-processing command, we will recalculate the block
timeout,
causing the blocking time to be reset.

This commit add a new CLIENT_REPROCESSING_COMMAND clent flag, explicitly
let the command know that it is being re-processed, later in
blockForKeys
we will not reset the timeout.

Affected BLOCK cases: 
- list / zset / stream, added test cases for each.

Unaffected cases:
- module (never re-process the commands).
- WAIT / WAITAOF (never re-process the commands).

Fixes redis#12998.
  • Loading branch information
enjoy-binbin authored Jan 30, 2024
1 parent af7ceeb commit 492021d
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 8 deletions.
8 changes: 6 additions & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,12 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;

c->bstate.timeout = timeout;
if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
}

for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dictionary ignore it. */
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) {
Expand All @@ -392,7 +397,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
listAddNodeTail(l,c);
dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l));


/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
* wants to be awakened if key is deleted (like XREADGROUP) */
if (unblock_on_nokey) {
Expand Down
8 changes: 4 additions & 4 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -7802,15 +7802,15 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->background_timer = 0;
bc->background_duration = 0;

c->bstate.timeout = 0;
mstime_t timeout = 0;
if (timeout_ms) {
mstime_t now = mstime();
if (timeout_ms > LLONG_MAX - now) {
if (timeout_ms > LLONG_MAX - now) {
c->bstate.module_blocked_handle = NULL;
addReplyError(c, "timeout is out of range"); /* 'timeout_ms+now' would overflow */
return bc;
}
c->bstate.timeout = timeout_ms + now;
timeout = timeout_ms + now;
}

if (islua || ismulti) {
Expand All @@ -7826,7 +7826,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
addReplyError(c, "Clients undergoing module based authentication can only be blocked on auth");
} else {
if (keys) {
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,c->bstate.timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
} else {
blockClient(c,BLOCKED_MODULE);
}
Expand Down
10 changes: 9 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3646,12 +3646,20 @@ void call(client *c, int flags) {
* re-processing and unblock the client.*/
c->flags |= CLIENT_EXECUTING_COMMAND;

/* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual
* processing of the command proc, the client is aware that it is being
* re-processed. */
if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND;

monotime monotonic_start = 0;
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
monotonic_start = getMonotonicUs();

c->cmd->proc(c);

/* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */
if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND;

exitExecutionUnit();

/* In case client is blocked after trying to execute the command,
Expand Down Expand Up @@ -3709,7 +3717,7 @@ void call(client *c, int flags) {

/* Send the command to clients in MONITOR mode if applicable,
* since some administrative commands are considered too dangerous to be shown.
* Other exceptions is a client which is unblocked and retring to process the command
* Other exceptions is a client which is unblocked and retrying to process the command
* or we are currently in the process of loading AOF. */
if (update_command_stats && !reprocessing_command &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) {
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
auth had been authenticated from the Module. */
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/type/list.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,34 @@ foreach {pop} {BLPOP BLMPOP_LEFT} {
r select 9
} {OK} {singledb:skip needs:debug}

test {BLPOP unblock but the key is expired and then block again - reprocessing command} {
r flushall
r debug set-active-expire 0
set rd [redis_deferring_client]

set start [clock milliseconds]
$rd blpop mylist 1
wait_for_blocked_clients_count 1

# The exec will try to awake the blocked client, but the key is expired,
# so the client will be blocked again during the command reprocessing.
r multi
r rpush mylist a
r pexpire mylist 100
r debug sleep 0.2
r exec

assert_equal {} [$rd read]
set end [clock milliseconds]

# In the past, this time would have been 1000+200, in order to avoid
# timing issues, we increase the range a bit.
assert_range [expr $end-$start] 1000 1100

r debug set-active-expire 1
$rd close
} {0} {needs:debug}

foreach {pop} {BLPOP BLMPOP_LEFT} {
test "$pop when new key is moved into place" {
set rd [redis_deferring_client]
Expand Down
30 changes: 29 additions & 1 deletion tests/unit/type/stream-cgroups.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ start_server {
$rd close
}

test {Blocking XREADGROUP for stream key that has clients blocked on list - avoid endless loop} {
test {Blocking XREADGROUP for stream key that has clients blocked on stream - avoid endless loop} {
r DEL mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM

Expand All @@ -498,6 +498,34 @@ start_server {
assert_equal [r ping] {PONG}
}

test {Blocking XREADGROUP for stream key that has clients blocked on stream - reprocessing command} {
r DEL mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM

set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]

$rd1 xreadgroup GROUP mygroup myuser BLOCK 0 STREAMS mystream >
wait_for_blocked_clients_count 1

set start [clock milliseconds]
$rd2 xreadgroup GROUP mygroup myuser BLOCK 1000 STREAMS mystream >
wait_for_blocked_clients_count 2

# After a while call xadd and let rd2 re-process the command.
after 200
r xadd mystream * field value
assert_equal {} [$rd2 read]
set end [clock milliseconds]

# In the past, this time would have been 1000+200, in order to avoid
# timing issues, we increase the range a bit.
assert_range [expr $end-$start] 1000 1100

$rd1 close
$rd2 close
}

test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
r config resetstat
r del mystream
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/type/zset.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,34 @@ start_server {tags {"zset"}} {
}
}

test {BZPOPMIN unblock but the key is expired and then block again - reprocessing command} {
r flushall
r debug set-active-expire 0
set rd [redis_deferring_client]

set start [clock milliseconds]
$rd bzpopmin zset{t} 1
wait_for_blocked_clients_count 1

# The exec will try to awake the blocked client, but the key is expired,
# so the client will be blocked again during the command reprocessing.
r multi
r zadd zset{t} 1 one
r pexpire zset{t} 100
r debug sleep 0.2
r exec

assert_equal {} [$rd read]
set end [clock milliseconds]

# In the past, this time would have been 1000+200, in order to avoid
# timing issues, we increase the range a bit.
assert_range [expr $end-$start] 1000 1100

r debug set-active-expire 1
$rd close
} {0} {needs:debug}

test "BZPOPMIN with same key multiple times should work" {
set rd [redis_deferring_client]
r del z1{t} z2{t}
Expand Down

0 comments on commit 492021d

Please sign in to comment.