Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix module / script call CLUSTER SLOTS / SHARDS fake client check crash #1063

Merged
merged 7 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,7 @@ struct client *createAOFClient(void) {
*/
c->raw_flag = 0;
c->flag.deny_blocking = 1;
c->flag.fake = 1;

/* We set the fake client as a replica waiting for the synchronization
* so that the server will not try to send replies to this client. */
Expand Down
1 change: 1 addition & 0 deletions src/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ void scriptingInit(int setup) {
if (lctx.lua_client == NULL) {
lctx.lua_client = createClient(NULL);
lctx.lua_client->flag.script = 1;
lctx.lua_client->flag.fake = 1;

/* We do not want to allow blocking commands inside Lua */
lctx.lua_client->flag.deny_blocking = 1;
Expand Down
1 change: 1 addition & 0 deletions src/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ int functionsRegisterEngine(const char *engine_name, engine *engine) {
client *c = createClient(NULL);
c->flag.deny_blocking = 1;
c->flag.script = 1;
c->flag.fake = 1;
engineInfo *ei = zmalloc(sizeof(*ei));
*ei = (engineInfo){
.name = engine_name_sds,
Expand Down
5 changes: 4 additions & 1 deletion src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ client *moduleAllocTempClient(void) {
} else {
c = createClient(NULL);
c->flag.module = 1;
c->flag.fake = 1;
c->user = NULL; /* Root user */
}
return c;
Expand Down Expand Up @@ -890,8 +891,10 @@ void moduleCreateContext(ValkeyModuleCtx *out_ctx, ValkeyModule *module, int ctx
out_ctx->flags = ctx_flags;
if (ctx_flags & VALKEYMODULE_CTX_TEMP_CLIENT)
out_ctx->client = moduleAllocTempClient();
else if (ctx_flags & VALKEYMODULE_CTX_NEW_CLIENT)
else if (ctx_flags & VALKEYMODULE_CTX_NEW_CLIENT) {
out_ctx->client = createClient(NULL);
out_ctx->client->flag.fake = 1;
}

/* Calculate the initial yield time for long blocked contexts.
* in loading we depend on the server hz, but in other cases we also wait
Expand Down
7 changes: 5 additions & 2 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ client *createCachedResponseClient(int resp) {
/* Allocating the `conn` allows to prepare the caching client before adding
* data to the clients output buffer by `prepareClientToWrite`. */
recording_client->conn = zcalloc(sizeof(connection));
recording_client->flag.fake = 1;
return recording_client;
}

Expand Down Expand Up @@ -3250,8 +3251,10 @@ char *getClientSockname(client *c) {
int isClientConnIpV6(client *c) {
/* The cached client peer id is on the form "[IPv6]:port" for IPv6
* addresses, so we just check for '[' here. */
if (c->conn->type == NULL && server.current_client) {
/* Fake client? Use current client instead. */
if (c->flag.fake && server.current_client) {
/* Fake client? Use current client instead.
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
* Noted that in here we are assuming server.current_client is set
* and real (aof has already violated this in loadSingleAppendOnlyFil). */
c = server.current_client;
}
return getClientPeerId(c)[0] == '[';
Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,8 @@ typedef struct ClientFlags {
uint64_t dont_cache_primary : 1; /* In some cases we don't want to cache the primary. For example, the replica
* knows that it does not need the cache and required a full sync. With this
* flag, we won't cache the primary in freeClient. */
uint64_t reserved : 6; /* Reserved for future use */
uint64_t fake : 1; /* This is a fake client without a real connection. */
uint64_t reserved : 5; /* Reserved for future use */
} ClientFlags;

typedef struct client {
Expand Down
23 changes: 23 additions & 0 deletions tests/integration/aof.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -673,3 +673,26 @@ tags {"aof external:skip"} {
}
}
}

# make sure the test infra won't use SELECT
set old_singledb $::singledb
set ::singledb 1

tags {"aof cluster external:skip"} {
test {Test cluster slots / cluster shards in aof won't crash} {
create_aof $aof_dirpath $aof_file {
append_to_aof [formatCommand cluster slots]
append_to_aof [formatCommand cluster shards]
}

create_aof_manifest $aof_dirpath $aof_manifest_file {
append_to_manifest "file appendonly.aof.1.incr.aof seq 1 type i\n"
}

start_server_aof [list dir $server_path cluster-enabled yes] {
assert_equal [r ping] {PONG}
}
}
}

set ::singledb $old_singledb
3 changes: 2 additions & 1 deletion tests/modules/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ TEST_MODULES = \
postnotifications.so \
moduleauthtwo.so \
rdbloadsave.so \
crash.so
crash.so \
cluster.so

.PHONY: all

Expand Down
51 changes: 51 additions & 0 deletions tests/modules/cluster.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include "valkeymodule.h"

#define UNUSED(x) (void)(x)

int test_cluster_slots(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);

if (argc != 1) return ValkeyModule_WrongArity(ctx);

ValkeyModuleCallReply *rep = ValkeyModule_Call(ctx, "CLUSTER", "c", "SLOTS");
if (!rep) {
ValkeyModule_ReplyWithError(ctx, "ERR NULL reply returned");
} else {
ValkeyModule_ReplyWithCallReply(ctx, rep);
ValkeyModule_FreeCallReply(rep);
}

return VALKEYMODULE_OK;
}

int test_cluster_shards(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);

if (argc != 1) return ValkeyModule_WrongArity(ctx);

ValkeyModuleCallReply *rep = ValkeyModule_Call(ctx, "CLUSTER", "c", "SHARDS");
if (!rep) {
ValkeyModule_ReplyWithError(ctx, "ERR NULL reply returned");
} else {
ValkeyModule_ReplyWithCallReply(ctx, rep);
ValkeyModule_FreeCallReply(rep);
}

return VALKEYMODULE_OK;
}

int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);

if (ValkeyModule_Init(ctx, "cluster", 1, VALKEYMODULE_APIVER_1)== VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "test.cluster_slots", test_cluster_slots, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "test.cluster_shards", test_cluster_shards, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

return VALKEYMODULE_OK;
}
8 changes: 8 additions & 0 deletions tests/unit/cluster/scripting.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,12 @@ start_cluster 1 0 {tags {external:skip cluster}} {

assert_match {*Can not run script on cluster, 'no-cluster' flag is set*} $e
}

test "Calling cluster slots in scripts is OK" {
assert_equal [lsort [r 0 cluster slots]] [lsort [r 0 eval "return redis.call('cluster', 'slots')" 0]]
}

test "Calling cluster shards in scripts is OK" {
assert_equal [lsort [r 0 cluster shards]] [lsort [r 0 eval "return redis.call('cluster', 'shards')" 0]]
}
}
24 changes: 22 additions & 2 deletions tests/unit/moduleapi/cluster.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ start_cluster 2 2 [list config_lines $modules] {
}
}

}

set testmodule [file normalize tests/modules/basics.so]
set modules [list loadmodule $testmodule]
start_cluster 3 0 [list config_lines $modules] {
Expand All @@ -234,3 +232,25 @@ start_cluster 3 0 [list config_lines $modules] {
assert_equal {PONG} [$node3 PING]
}
}

set testmodule [file normalize tests/modules/cluster.so]
set modules [list loadmodule $testmodule]
start_cluster 3 0 [list config_lines $modules] {
set node1 [srv 0 client]
set node2 [srv -1 client]
set node3 [srv -2 client]

test "VM_CALL with cluster slots" {
assert_equal [lsort [$node1 cluster slots]] [lsort [$node1 test.cluster_slots]]
assert_equal [lsort [$node2 cluster slots]] [lsort [$node2 test.cluster_slots]]
assert_equal [lsort [$node3 cluster slots]] [lsort [$node3 test.cluster_slots]]
}

test "VM_CALL with cluster shards" {
assert_equal [lsort [$node1 cluster shards]] [lsort [$node1 test.cluster_shards]]
assert_equal [lsort [$node2 cluster shards]] [lsort [$node2 test.cluster_shards]]
assert_equal [lsort [$node3 cluster shards]] [lsort [$node3 test.cluster_shards]]
}
}

} ;# end tag
Loading