Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RDMA builtin support #1209

Open
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ To build TLS as Valkey module:
Note that sentinel mode does not support TLS module.

To build with experimental RDMA support you'll need RDMA development libraries
(e.g. librdmacm-dev and libibverbs-dev on Debian/Ubuntu). For now, Valkey only
supports RDMA as connection module mode. Run:
(e.g. librdmacm-dev and libibverbs-dev on Debian/Ubuntu).

To build RDMA support as Valkey built-in:

% make BUILD_RDMA=yes

To build RDMA as Valkey module:

% make BUILD_RDMA=module

Expand Down
30 changes: 15 additions & 15 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -337,26 +337,26 @@ ifeq ($(BUILD_TLS),module)
TLS_MODULE_CFLAGS+=-DUSE_OPENSSL=$(BUILD_MODULE) $(OPENSSL_CFLAGS) -DBUILD_TLS_MODULE=$(BUILD_MODULE)
endif

BUILD_RDMA:=no
RDMA_MODULE=
RDMA_MODULE_NAME:=valkey-rdma$(PROG_SUFFIX).so
RDMA_MODULE_CFLAGS:=$(FINAL_CFLAGS)
ifeq ($(BUILD_RDMA),module)
FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE)
RDMA_PKGCONFIG := $(shell $(PKG_CONFIG) --exists librdmacm libibverbs && echo $$?)
RDMA_LIBS=
RDMA_PKGCONFIG := $(shell $(PKG_CONFIG) --exists librdmacm libibverbs && echo $$?)
ifeq ($(RDMA_PKGCONFIG),0)
RDMA_LIBS=$(shell $(PKG_CONFIG) --libs librdmacm libibverbs)
else
RDMA_LIBS=-lrdmacm -libverbs
endif
RDMA_MODULE=$(RDMA_MODULE_NAME)
RDMA_MODULE_CFLAGS+=-DUSE_RDMA=$(BUILD_YES) -DBUILD_RDMA_MODULE $(RDMA_LIBS)
else
ifeq ($(BUILD_RDMA),no)
# disable RDMA, do nothing
else
$(error "RDMA is only supported as module (BUILD_RDMA=module), or disabled (BUILD_RDMA=no)")

ifeq ($(BUILD_RDMA),yes)
FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_YES) -DBUILD_RDMA_MODULE=$(BUILD_NO)
FINAL_LIBS += $(RDMA_LIBS)
endif

RDMA_MODULE=
RDMA_MODULE_NAME:=valkey-rdma$(PROG_SUFFIX).so
RDMA_MODULE_CFLAGS:=$(FINAL_CFLAGS)
ifeq ($(BUILD_RDMA),module)
FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE)
RDMA_MODULE=$(RDMA_MODULE_NAME)
RDMA_MODULE_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE) -DBUILD_RDMA_MODULE=$(BUILD_MODULE) $(RDMA_LIBS)
endif

ifndef V
Expand Down Expand Up @@ -423,7 +423,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
118 changes: 98 additions & 20 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -1533,10 +1533,27 @@ void rewriteConfigOOMScoreAdjValuesOption(standardConfig *config, const char *na
}

/* Rewrite the bind option. */
void rewriteConfigBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) {
static void rewriteConfigBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state, char **bindaddr, int bindaddr_count) {
UNUSED(config);
int force = 1;
sds line, addresses;

/* Rewrite as bind <addr1> <addr2> ... <addrN> */
if (bindaddr_count > 0)
addresses = sdsjoin(bindaddr, bindaddr_count, " ");
else
addresses = sdsnew("\"\"");
line = sdsnew(name);
line = sdscatlen(line, " ", 1);
line = sdscatsds(line, addresses);
sdsfree(addresses);

rewriteConfigRewriteLine(state, name, line, force);
}

/* Rewrite the bind option. */
static void rewriteConfigSocketBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) {
UNUSED(config);
int is_default = 0;

/* Compare server.bindaddr with CONFIG_DEFAULT_BINDADDR */
Expand All @@ -1556,17 +1573,7 @@ void rewriteConfigBindOption(standardConfig *config, const char *name, struct re
return;
}

/* Rewrite as bind <addr1> <addr2> ... <addrN> */
if (server.bindaddr_count > 0)
addresses = sdsjoin(server.bindaddr, server.bindaddr_count, " ");
else
addresses = sdsnew("\"\"");
line = sdsnew(name);
line = sdscatlen(line, " ", 1);
line = sdscatsds(line, addresses);
sdsfree(addresses);

rewriteConfigRewriteLine(state, name, line, force);
rewriteConfigBindOption(config, name, state, server.bindaddr, server.bindaddr_count);
}

/* Rewrite the loadmodule option. */
Expand Down Expand Up @@ -2634,7 +2641,7 @@ static int applyBind(const char **err) {
tcp_listener->ct = connectionByType(CONN_TYPE_SOCKET);
if (changeListener(tcp_listener) == C_ERR) {
*err = "Failed to bind to specified addresses.";
if (tls_listener) closeListener(tls_listener); /* failed with TLS together */
if (tls_listener) connCloseListener(tls_listener); /* failed with TLS together */
return 0;
}

Expand All @@ -2646,7 +2653,7 @@ static int applyBind(const char **err) {
tls_listener->ct = connectionByType(CONN_TYPE_TLS);
if (changeListener(tls_listener) == C_ERR) {
*err = "Failed to bind to specified addresses.";
closeListener(tcp_listener); /* failed with TCP together */
connCloseListener(tcp_listener); /* failed with TCP together */
return 0;
}
}
Expand Down Expand Up @@ -2919,8 +2926,9 @@ static sds getConfigNotifyKeyspaceEventsOption(standardConfig *config) {
return keyspaceEventsFlagsToString(server.notify_keyspace_events);
}

static int setConfigBindOption(standardConfig *config, sds *argv, int argc, const char **err) {
static int setConfigBindOption(standardConfig *config, sds *argv, int argc, const char **err, char **bindaddr, int *bindaddr_count) {
UNUSED(config);
int orig_bindaddr_count = *bindaddr_count;
int j;

if (argc > CONFIG_BINDADDR_MAX) {
Expand All @@ -2932,11 +2940,77 @@ static int setConfigBindOption(standardConfig *config, sds *argv, int argc, cons
if (argc == 1 && sdslen(argv[0]) == 0) argc = 0;

/* Free old bind addresses */
for (j = 0; j < server.bindaddr_count; j++) {
zfree(server.bindaddr[j]);
for (j = 0; j < orig_bindaddr_count; j++) {
zfree(bindaddr[j]);
}
for (j = 0; j < argc; j++) bindaddr[j] = zstrdup(argv[j]);
*bindaddr_count = argc;

return 1;
}

static int setConfigSocketBindOption(standardConfig *config, sds *argv, int argc, const char **err) {
UNUSED(config);

return setConfigBindOption(config, argv, argc, err, server.bindaddr, &server.bindaddr_count);
}

static int setConfigRdmaBindOption(standardConfig *config, sds *argv, int argc, const char **err) {
UNUSED(config);

return setConfigBindOption(config, argv, argc, err, server.rdma_ctx_config.bindaddr, &server.rdma_ctx_config.bindaddr_count);
}

static sds getConfigRdmaBindOption(standardConfig *config) {
UNUSED(config);
return sdsjoin(server.rdma_ctx_config.bindaddr, server.rdma_ctx_config.bindaddr_count, " ");
}

static void rewriteConfigRdmaBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) {
UNUSED(config);

if (server.rdma_ctx_config.bindaddr_count) {
rewriteConfigBindOption(config, name, state, server.rdma_ctx_config.bindaddr,
server.rdma_ctx_config.bindaddr_count);
}
}

static int applyRdmaBind(const char **err) {
connListener *rdma_listener = listenerByType(CONN_TYPE_RDMA);

if (!rdma_listener) {
*err = "No RDMA building support.";
return 0;
}

rdma_listener->bindaddr = server.rdma_ctx_config.bindaddr;
rdma_listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count;
rdma_listener->port = server.rdma_ctx_config.port;
rdma_listener->ct = connectionByType(CONN_TYPE_RDMA);
if (changeListener(rdma_listener) == C_ERR) {
*err = "Failed to bind to specified addresses for RDMA.";
return 0;
}

return 1;
}

static int updateRdmaPort(const char **err) {
connListener *listener = listenerByType(CONN_TYPE_RDMA);

if (listener != NULL) {
*err = "No RDMA building support.";
return 0;
}

listener->bindaddr = server.rdma_ctx_config.bindaddr;
listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count;
listener->port = server.rdma_ctx_config.port;
listener->ct = connectionByType(CONN_TYPE_RDMA);
if (changeListener(listener) == C_ERR) {
*err = "Unable to listen on this port for RDMA. Check server logs.";
return 0;
}
for (j = 0; j < argc; j++) server.bindaddr[j] = zstrdup(argv[j]);
server.bindaddr_count = argc;

return 1;
}
Expand Down Expand Up @@ -3231,6 +3305,9 @@ standardConfig static_configs[] = {
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort),
createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdma-comp-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.comp_vector, -1, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down Expand Up @@ -3310,7 +3387,8 @@ standardConfig static_configs[] = {
createSpecialConfig("client-output-buffer-limit", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigClientOutputBufferLimitOption, getConfigClientOutputBufferLimitOption, rewriteConfigClientOutputBufferLimitOption, NULL),
createSpecialConfig("oom-score-adj-values", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigOOMScoreAdjValuesOption, getConfigOOMScoreAdjValuesOption, rewriteConfigOOMScoreAdjValuesOption, updateOOMScoreAdj),
createSpecialConfig("notify-keyspace-events", NULL, MODIFIABLE_CONFIG, setConfigNotifyKeyspaceEventsOption, getConfigNotifyKeyspaceEventsOption, rewriteConfigNotifyKeyspaceEventsOption, NULL),
createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigBindOption, getConfigBindOption, rewriteConfigBindOption, applyBind),
createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigSocketBindOption, getConfigBindOption, rewriteConfigSocketBindOption, applyBind),
createSpecialConfig("rdma-bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigRdmaBindOption, getConfigRdmaBindOption, rewriteConfigRdmaBindOption, applyRdmaBind),
createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL),
createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL),

Expand Down
3 changes: 3 additions & 0 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ int connTypeInitialize(void) {
/* may fail if without BUILD_TLS=yes */
RedisRegisterConnectionTypeTLS();

/* may fail if without BUILD_RDMA=yes */
RegisterConnectionTypeRdma();

return C_OK;
}

Expand Down
10 changes: 10 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ typedef enum {
#define CONN_TYPE_SOCKET "tcp"
#define CONN_TYPE_UNIX "unix"
#define CONN_TYPE_TLS "tls"
#define CONN_TYPE_RDMA "rdma"
#define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */

typedef void (*ConnectionCallbackFunc)(struct connection *conn);
Expand All @@ -79,6 +80,7 @@ typedef struct ConnectionType {
int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote);
int (*is_local)(connection *conn);
int (*listen)(connListener *listener);
void (*closeListener)(connListener *listener);

/* create/shutdown/close connection */
connection *(*conn_create)(void);
Expand Down Expand Up @@ -442,6 +444,13 @@ static inline int connListen(connListener *listener) {
return listener->ct->listen(listener);
}

/* Close a listened listener */
static inline void connCloseListener(connListener *listener) {
if (listener->count) {
listener->ct->closeListener(listener);
}
}

/* Get accept_handler of a connection type */
static inline aeFileProc *connAcceptHandler(ConnectionType *ct) {
if (ct) return ct->accept_handler;
Expand All @@ -454,6 +463,7 @@ sds getListensInfoString(sds info);
int RedisRegisterConnectionTypeSocket(void);
int RedisRegisterConnectionTypeUnix(void);
int RedisRegisterConnectionTypeTLS(void);
int RegisterConnectionTypeRdma(void);

/* Return 1 if connection is using TLS protocol, 0 if otherwise. */
static inline int connIsTLS(connection *conn) {
Expand Down
Loading
Loading