Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis script command support #677

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/nc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ typedef enum msg_parse_result {
ACTION( REQ_REDIS_GEOSEARCHSTORE) \
ACTION( REQ_REDIS_EVAL ) /* redis requests - eval */ \
ACTION( REQ_REDIS_EVALSHA ) \
ACTION( REQ_REDIS_SCRIPT) \
ACTION( REQ_REDIS_PING ) /* redis requests - ping/quit */ \
ACTION( REQ_REDIS_QUIT) \
ACTION( REQ_REDIS_AUTH) \
Expand Down Expand Up @@ -288,6 +289,7 @@ struct msg {
uint32_t nfrag_done; /* # fragment done */
uint64_t frag_id; /* id of fragmented message */
struct msg **frag_seq; /* sequence of fragment message, map from keys to fragments*/
uint32_t frag_multibulk_len; /* fragment response multibulk length */

err_t err; /* errno on error? */
unsigned error:1; /* error? */
Expand All @@ -300,6 +302,8 @@ struct msg {
unsigned fdone:1; /* all fragments are done? */
unsigned swallow:1; /* swallow response? */
unsigned redis:1; /* redis? */

uint32_t redis_script_idx; /* redis script command server index */
};

TAILQ_HEAD(msg_tqh, msg);
Expand Down
2 changes: 1 addition & 1 deletion src/nc_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
key = kpos->start;
keylen = (uint32_t)(kpos->end - kpos->start);

s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen, msg);
if (s_conn == NULL) {
/*
* Handle a failure to establish a new connection to a server,
Expand Down
12 changes: 8 additions & 4 deletions src/nc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,12 +700,16 @@ server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t key
}

static struct server *
server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen)
server_pool_server(struct server_pool *pool, struct msg *r, const uint8_t *key, uint32_t keylen)
{
struct server *server;
uint32_t idx;

idx = server_pool_idx(pool, key, keylen);
if (r->type == MSG_REQ_REDIS_SCRIPT) {
idx = r->redis_script_idx;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a member named server_index in struct msg. If the value is >=0, use msg->server_index as target server index. In this way, we don't need redis_script_idx and scan_server_idx at all.

} else {
idx = server_pool_idx(pool, key, keylen);
}
server = array_get(&pool->server, idx);

log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen,
Expand All @@ -716,7 +720,7 @@ server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen

struct conn *
server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key,
uint32_t keylen)
uint32_t keylen, struct msg *msg)
{
rstatus_t status;
struct server *server;
Expand All @@ -728,7 +732,7 @@ server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *k
}

/* from a given {key, keylen} pick a server from pool */
server = server_pool_server(pool, key, keylen);
server = server_pool_server(pool, msg, key, keylen);
if (server == NULL) {
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion src/nc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void server_connected(struct context *ctx, struct conn *conn);
void server_ok(struct context *ctx, struct conn *conn);

uint32_t server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t keylen);
struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key, uint32_t keylen);
struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key, uint32_t keylen, struct msg *msg);
rstatus_t server_pool_run(struct server_pool *pool);
rstatus_t server_pool_preconnect(struct context *ctx);
void server_pool_disconnect(struct context *ctx);
Expand Down
191 changes: 187 additions & 4 deletions src/proto/nc_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ redis_argn(const struct msg *r)
case MSG_REQ_REDIS_GEOSEARCHSTORE:

case MSG_REQ_REDIS_RESTORE:
case MSG_REQ_REDIS_SCRIPT:
return true;

default:
Expand Down Expand Up @@ -1009,6 +1010,11 @@ redis_parse_req(struct msg *r)
break;
}

if (str6icmp(m, 's', 'c', 'r', 'i', 'p', 't')) {
r->type = MSG_REQ_REDIS_SCRIPT;
break;
}

break;

case 7:
Expand Down Expand Up @@ -2585,12 +2591,18 @@ redis_copy_bulk(struct msg *dst, struct msg *src)
}

p = mbuf->pos;
ASSERT(*p == '$');
// ASSERT(*p == '$');
p++;

if (p[0] == '-' && p[1] == '1') {
len = 1 + 2 + CRLF_LEN; /* $-1\r\n */
p = mbuf->pos + len;
} else if ((mbuf->pos)[0] == ':') {
for (; p < mbuf->last && isdigit(*p);) {
p++;
}
len = (p - mbuf->pos);
len += CRLF_LEN;
} else {
len = 0;
for (; p < mbuf->last && isdigit(*p); p++) {
Expand Down Expand Up @@ -2643,6 +2655,8 @@ redis_pre_coalesce(struct msg *r)
{
struct msg *pr = r->peer; /* peer request */
struct mbuf *mbuf;
uint8_t *key;
struct keypos *kpos;

ASSERT(!r->request);
ASSERT(pr->request);
Expand All @@ -2653,6 +2667,9 @@ redis_pre_coalesce(struct msg *r)
}
pr->frag_owner->nfrag_done++;

kpos = array_get(pr->keys, 0);
key = kpos->start;

switch (r->type) {
case MSG_RSP_REDIS_INTEGER:
/* only redis 'del' fragmented request sends back integer reply */
Expand All @@ -2677,7 +2694,7 @@ redis_pre_coalesce(struct msg *r)

case MSG_RSP_REDIS_MULTIBULK:
/* only redis 'mget' fragmented request sends back multi-bulk reply */
ASSERT(pr->type == MSG_REQ_REDIS_MGET);
ASSERT(pr->type == MSG_REQ_REDIS_MGET || pr->type == MSG_REQ_REDIS_SCRIPT);

mbuf = STAILQ_FIRST(&r->mhdr);
/*
Expand All @@ -2693,6 +2710,26 @@ redis_pre_coalesce(struct msg *r)
r->mlen -= (uint32_t)(r->narg_end - r->narg_start);
mbuf->pos = r->narg_end;

if (pr->type == MSG_REQ_REDIS_SCRIPT && str6icmp(key, 'e', 'x', 'i', 's', 't', 's')) {
uint8_t *p;
uint32_t len = 0;
p = r->narg_start;
ASSERT(p[0] == '*');
p++;

if (p[0] == '-' && p[1] == '1') {
r->frag_multibulk_len = 0;
} else {
for(;p < r->narg_end && isdigit(*p); p++){
len = 10*len + (uint32_t)(*p - '0');
}
r->frag_multibulk_len = len;
}
}

break;

case MSG_RSP_REDIS_BULK:
break;

case MSG_RSP_REDIS_STATUS:
Expand Down Expand Up @@ -2945,10 +2982,97 @@ redis_fragment_argx(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq,
return NC_OK;
}

static rstatus_t redis_fragment_script(struct msg *r, struct msg_tqh *frag_msgq) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can implement a function redis_fragment_broadcast which can broadcast a request to all servers, we can implement broadcast commands like DBSIZE, FLUSHALL easily. What I mean is this function is not general enough, which can only handle SCRIPT command.

struct server_pool *sp = r->owner->owner;
struct msg **sub_msgs;
uint32_t i,n;
struct mbuf *mbuf,*nbuf,*rbuf;
size_t mlen;
rstatus_t status;
struct keypos *rkpos;
uint32_t rklen;
uint32_t rkey_offset;

ASSERT(sp != NULL);

n = array_n(&sp->server);
log_debug(LOG_VVERB,"server_name: %.*s,server_count:%i",sp->name.len,sp->name.data,n);

sub_msgs = nc_zalloc(n * sizeof(*sub_msgs));
if (sub_msgs == NULL) {
return NC_ENOMEM;
}

ASSERT(r->frag_seq == NULL);
r->frag_seq = nc_alloc(n * sizeof(*r->frag_seq));
if (r->frag_seq == NULL) {
nc_free(sub_msgs);
return NC_ENOMEM;
}

r->frag_id = msg_gen_frag_id();
r->nfrag = 0;
r->frag_owner = r;

ASSERT(array_n(r->keys)>0);
rkpos=array_get(r->keys,0); // key position in original request
rklen = (uint32_t)(rkpos->end - rkpos->start); // the keylen in original request
rbuf = STAILQ_FIRST(&r->mhdr);
if (rbuf == NULL) {
return NC_ERROR;
}
rkey_offset = (uint32_t)(rkpos->start - rbuf->pos); // the offset of key->start from rbuf->pos

for (i = 0; i < n; i++) { /* create a sub_msg for per server */
struct msg *sub_msg;
uint32_t idx = i;
if (sub_msgs[idx] == NULL) {
sub_msgs[idx] = msg_get(r->owner, r->request, r->redis);
if (sub_msgs[idx] == NULL) {
nc_free(sub_msgs);
return NC_ENOMEM;
}
}
r->frag_seq[i] = sub_msg = sub_msgs[idx];

sub_msg->narg = r->narg;
sub_msg->redis_script_idx = idx;
//copy r->mhdr
for (mbuf=STAILQ_FIRST(&r->mhdr);mbuf!=NULL;mbuf=nbuf) {
nbuf=STAILQ_NEXT(mbuf,next);
if(mbuf_empty(mbuf)) continue;

mlen=mbuf_length(mbuf);
status=msg_append(sub_msg,mbuf->pos,mlen);
if (status != NC_OK) return status;
}
struct keypos *kpos;
kpos = array_push(sub_msg->keys);
if (kpos == NULL) {
return NC_ENOMEM;
}
mbuf = STAILQ_FIRST(&sub_msg->mhdr);
if (mbuf == NULL) {
return NC_ERROR;
}
kpos->start=mbuf->pos + rkey_offset; //confirm sub_msg key position
kpos->end=kpos->start + rklen;

sub_msg->type = r->type;
sub_msg->frag_id = r->frag_id;
sub_msg->frag_owner = r->frag_owner;
TAILQ_INSERT_TAIL(frag_msgq, sub_msg, m_tqe);
r->nfrag++;
}

nc_free(sub_msgs);
return NC_OK;
}

rstatus_t
redis_fragment(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq)
{
if (1 == array_n(r->keys)){
if (1 == array_n(r->keys) && r->type != MSG_REQ_REDIS_SCRIPT){
return NC_OK;
}

Expand All @@ -2962,7 +3086,8 @@ redis_fragment(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq)
/* TODO: MSETNX - instead of responding with OK, respond with 1 if all fragments respond with 1 */
case MSG_REQ_REDIS_MSET:
return redis_fragment_argx(r, nserver, frag_msgq, 2);

case MSG_REQ_REDIS_SCRIPT:
return redis_fragment_script(r,frag_msgq);
default:
return NC_OK;
}
Expand Down Expand Up @@ -3053,6 +3178,61 @@ redis_post_coalesce_mget(struct msg *request)
}
}

static void redis_post_coalesce_script(struct msg *request)
{
struct msg *response = request->peer;
struct msg *sub_msg;
rstatus_t status;
uint32_t i,j;
uint8_t *key;
struct keypos *kpos;

kpos = array_get(request->keys, 0);
key = kpos->start;

for (i = 0; i < request->nfrag; i++) { /* for each key */
sub_msg = request->frag_seq[i]->peer; /* get it's peer response */
if (sub_msg == NULL) {
response->owner->err = 1;
return;
}
/* Only one response data is retained and the rest is discarded */
if(i ==0){
if(str6icmp(key, 'e', 'x', 'i', 's', 't', 's')){
status = msg_prepend_format(response, "*%d\r\n", sub_msg->frag_multibulk_len);
if (status != NC_OK) {
response->owner->err = 1;
return;
}
for(j=0;j<sub_msg->frag_multibulk_len;j++){
status = redis_copy_bulk(response, sub_msg);
if (status != NC_OK) {
response->owner->err = 1;
return;
}
}
}else{
status = redis_copy_bulk(response, sub_msg);
}
}else{
if(str6icmp(key, 'e', 'x', 'i', 's', 't', 's')){
for(j=0;j<sub_msg->frag_multibulk_len;j++){
status = redis_copy_bulk(NULL, sub_msg);
if (status != NC_OK) {
response->owner->err = 1;
return;
}
}
}else{
status = redis_copy_bulk(NULL, sub_msg);
}
}
if (status != NC_OK) {
response->owner->err = 1;
return;
}
}
}
/*
* Post-coalesce handler is invoked when the message is a response to
* the fragmented multi vector request - 'mget' or 'del' and all the
Expand Down Expand Up @@ -3083,6 +3263,9 @@ redis_post_coalesce(struct msg *r)
case MSG_REQ_REDIS_MSET:
return redis_post_coalesce_mset(r);

case MSG_REQ_REDIS_SCRIPT:
return redis_post_coalesce_script(r);

default:
NOT_REACHED();
}
Expand Down
15 changes: 15 additions & 0 deletions tests/test_redis/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,18 @@ def test_sscan():
assert_equal('0', str(cursor))
assert_equal({b'1'}, set(members))

def test_script_load_and_exits():
r = getconn()

evalsha=r.script_load("return redis.call('hset',KEYS[1],KEYS[1],KEYS[1])")
assert_equal(evalsha,"dbbae75a09f1390aaf069fb60e951ec23cab7a15")

exists=r.script_exists("dbbae75a09f1390aaf069fb60e951ec23cab7a15")
assert_equal([True],exists)

assert_equal(1,r.evalsha("dbbae75a09f1390aaf069fb60e951ec23cab7a15",1,"scriptA"))

dic=r.hgetall("scriptA")
assert_equal(dic,{b'scriptA': b'scriptA'})

assert_equal(True,r.script_flush())