diff --git a/src/nc_message.h b/src/nc_message.h index 26a063bc..c7c24540 100644 --- a/src/nc_message.h +++ b/src/nc_message.h @@ -196,6 +196,7 @@ typedef enum msg_parse_result { ACTION( REQ_REDIS_GEOSEARCHSTORE) \ ACTION( REQ_REDIS_EVAL ) /* redis requests - eval */ \ ACTION( REQ_REDIS_EVALSHA ) \ + ACTION( REQ_REDIS_SCRIPT) \ ACTION( REQ_REDIS_PING ) /* redis requests - ping/quit */ \ ACTION( REQ_REDIS_QUIT) \ ACTION( REQ_REDIS_AUTH) \ @@ -288,6 +289,7 @@ struct msg { uint32_t nfrag_done; /* # fragment done */ uint64_t frag_id; /* id of fragmented message */ struct msg **frag_seq; /* sequence of fragment message, map from keys to fragments*/ + uint32_t frag_multibulk_len; /* fragment response multibulk length */ err_t err; /* errno on error? */ unsigned error:1; /* error? */ @@ -300,6 +302,8 @@ struct msg { unsigned fdone:1; /* all fragments are done? */ unsigned swallow:1; /* swallow response? */ unsigned redis:1; /* redis? */ + + uint32_t redis_script_idx; /* redis script command server index */ }; TAILQ_HEAD(msg_tqh, msg); diff --git a/src/nc_request.c b/src/nc_request.c index d69dd1ee..7bcdf486 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -573,7 +573,7 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg) key = kpos->start; keylen = (uint32_t)(kpos->end - kpos->start); - s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen); + s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen, msg); if (s_conn == NULL) { /* * Handle a failure to establish a new connection to a server, diff --git a/src/nc_server.c b/src/nc_server.c index dab6a79b..fb05225f 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -700,12 +700,16 @@ server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t key } static struct server * -server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen) +server_pool_server(struct server_pool *pool, struct msg *r, const uint8_t *key, uint32_t keylen) { struct server *server; uint32_t idx; - idx = server_pool_idx(pool, key, keylen); + if (r->type == MSG_REQ_REDIS_SCRIPT) { + idx = r->redis_script_idx; + } else { + idx = server_pool_idx(pool, key, keylen); + } server = array_get(&pool->server, idx); log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen, @@ -716,7 +720,7 @@ server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen struct conn * server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key, - uint32_t keylen) + uint32_t keylen, struct msg *msg) { rstatus_t status; struct server *server; @@ -728,7 +732,7 @@ server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *k } /* from a given {key, keylen} pick a server from pool */ - server = server_pool_server(pool, key, keylen); + server = server_pool_server(pool, msg, key, keylen); if (server == NULL) { return NULL; } diff --git a/src/nc_server.h b/src/nc_server.h index 6bab9b56..62192b89 100644 --- a/src/nc_server.h +++ b/src/nc_server.h @@ -137,7 +137,7 @@ void server_connected(struct context *ctx, struct conn *conn); void server_ok(struct context *ctx, struct conn *conn); uint32_t server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t keylen); -struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key, uint32_t keylen); +struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key, uint32_t keylen, struct msg *msg); rstatus_t server_pool_run(struct server_pool *pool); rstatus_t server_pool_preconnect(struct context *ctx); void server_pool_disconnect(struct context *ctx); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index a80f84a3..514f0bdd 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -283,6 +283,7 @@ redis_argn(const struct msg *r) case MSG_REQ_REDIS_GEOSEARCHSTORE: case MSG_REQ_REDIS_RESTORE: + case MSG_REQ_REDIS_SCRIPT: return true; default: @@ -1009,6 +1010,11 @@ redis_parse_req(struct msg *r) break; } + if (str6icmp(m, 's', 'c', 'r', 'i', 'p', 't')) { + r->type = MSG_REQ_REDIS_SCRIPT; + break; + } + break; case 7: @@ -2585,12 +2591,18 @@ redis_copy_bulk(struct msg *dst, struct msg *src) } p = mbuf->pos; - ASSERT(*p == '$'); + // ASSERT(*p == '$'); p++; if (p[0] == '-' && p[1] == '1') { len = 1 + 2 + CRLF_LEN; /* $-1\r\n */ p = mbuf->pos + len; + } else if ((mbuf->pos)[0] == ':') { + for (; p < mbuf->last && isdigit(*p);) { + p++; + } + len = (p - mbuf->pos); + len += CRLF_LEN; } else { len = 0; for (; p < mbuf->last && isdigit(*p); p++) { @@ -2643,6 +2655,8 @@ redis_pre_coalesce(struct msg *r) { struct msg *pr = r->peer; /* peer request */ struct mbuf *mbuf; + uint8_t *key; + struct keypos *kpos; ASSERT(!r->request); ASSERT(pr->request); @@ -2653,6 +2667,9 @@ redis_pre_coalesce(struct msg *r) } pr->frag_owner->nfrag_done++; + kpos = array_get(pr->keys, 0); + key = kpos->start; + switch (r->type) { case MSG_RSP_REDIS_INTEGER: /* only redis 'del' fragmented request sends back integer reply */ @@ -2677,7 +2694,7 @@ redis_pre_coalesce(struct msg *r) case MSG_RSP_REDIS_MULTIBULK: /* only redis 'mget' fragmented request sends back multi-bulk reply */ - ASSERT(pr->type == MSG_REQ_REDIS_MGET); + ASSERT(pr->type == MSG_REQ_REDIS_MGET || pr->type == MSG_REQ_REDIS_SCRIPT); mbuf = STAILQ_FIRST(&r->mhdr); /* @@ -2693,6 +2710,26 @@ redis_pre_coalesce(struct msg *r) r->mlen -= (uint32_t)(r->narg_end - r->narg_start); mbuf->pos = r->narg_end; + if (pr->type == MSG_REQ_REDIS_SCRIPT && str6icmp(key, 'e', 'x', 'i', 's', 't', 's')) { + uint8_t *p; + uint32_t len = 0; + p = r->narg_start; + ASSERT(p[0] == '*'); + p++; + + if (p[0] == '-' && p[1] == '1') { + r->frag_multibulk_len = 0; + } else { + for(;p < r->narg_end && isdigit(*p); p++){ + len = 10*len + (uint32_t)(*p - '0'); + } + r->frag_multibulk_len = len; + } + } + + break; + + case MSG_RSP_REDIS_BULK: break; case MSG_RSP_REDIS_STATUS: @@ -2945,10 +2982,97 @@ redis_fragment_argx(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq, return NC_OK; } +static rstatus_t redis_fragment_script(struct msg *r, struct msg_tqh *frag_msgq) { + struct server_pool *sp = r->owner->owner; + struct msg **sub_msgs; + uint32_t i,n; + struct mbuf *mbuf,*nbuf,*rbuf; + size_t mlen; + rstatus_t status; + struct keypos *rkpos; + uint32_t rklen; + uint32_t rkey_offset; + + ASSERT(sp != NULL); + + n = array_n(&sp->server); + log_debug(LOG_VVERB,"server_name: %.*s,server_count:%i",sp->name.len,sp->name.data,n); + + sub_msgs = nc_zalloc(n * sizeof(*sub_msgs)); + if (sub_msgs == NULL) { + return NC_ENOMEM; + } + + ASSERT(r->frag_seq == NULL); + r->frag_seq = nc_alloc(n * sizeof(*r->frag_seq)); + if (r->frag_seq == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + + r->frag_id = msg_gen_frag_id(); + r->nfrag = 0; + r->frag_owner = r; + + ASSERT(array_n(r->keys)>0); + rkpos=array_get(r->keys,0); // key position in original request + rklen = (uint32_t)(rkpos->end - rkpos->start); // the keylen in original request + rbuf = STAILQ_FIRST(&r->mhdr); + if (rbuf == NULL) { + return NC_ERROR; + } + rkey_offset = (uint32_t)(rkpos->start - rbuf->pos); // the offset of key->start from rbuf->pos + + for (i = 0; i < n; i++) { /* create a sub_msg for per server */ + struct msg *sub_msg; + uint32_t idx = i; + if (sub_msgs[idx] == NULL) { + sub_msgs[idx] = msg_get(r->owner, r->request, r->redis); + if (sub_msgs[idx] == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + } + r->frag_seq[i] = sub_msg = sub_msgs[idx]; + + sub_msg->narg = r->narg; + sub_msg->redis_script_idx = idx; + //copy r->mhdr + for (mbuf=STAILQ_FIRST(&r->mhdr);mbuf!=NULL;mbuf=nbuf) { + nbuf=STAILQ_NEXT(mbuf,next); + if(mbuf_empty(mbuf)) continue; + + mlen=mbuf_length(mbuf); + status=msg_append(sub_msg,mbuf->pos,mlen); + if (status != NC_OK) return status; + } + struct keypos *kpos; + kpos = array_push(sub_msg->keys); + if (kpos == NULL) { + return NC_ENOMEM; + } + mbuf = STAILQ_FIRST(&sub_msg->mhdr); + if (mbuf == NULL) { + return NC_ERROR; + } + kpos->start=mbuf->pos + rkey_offset; //confirm sub_msg key position + kpos->end=kpos->start + rklen; + + sub_msg->type = r->type; + sub_msg->frag_id = r->frag_id; + sub_msg->frag_owner = r->frag_owner; + TAILQ_INSERT_TAIL(frag_msgq, sub_msg, m_tqe); + r->nfrag++; + } + + nc_free(sub_msgs); + return NC_OK; +} + rstatus_t redis_fragment(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq) { - if (1 == array_n(r->keys)){ + if (1 == array_n(r->keys) && r->type != MSG_REQ_REDIS_SCRIPT){ return NC_OK; } @@ -2962,7 +3086,8 @@ redis_fragment(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq) /* TODO: MSETNX - instead of responding with OK, respond with 1 if all fragments respond with 1 */ case MSG_REQ_REDIS_MSET: return redis_fragment_argx(r, nserver, frag_msgq, 2); - + case MSG_REQ_REDIS_SCRIPT: + return redis_fragment_script(r,frag_msgq); default: return NC_OK; } @@ -3053,6 +3178,61 @@ redis_post_coalesce_mget(struct msg *request) } } +static void redis_post_coalesce_script(struct msg *request) +{ + struct msg *response = request->peer; + struct msg *sub_msg; + rstatus_t status; + uint32_t i,j; + uint8_t *key; + struct keypos *kpos; + + kpos = array_get(request->keys, 0); + key = kpos->start; + + for (i = 0; i < request->nfrag; i++) { /* for each key */ + sub_msg = request->frag_seq[i]->peer; /* get it's peer response */ + if (sub_msg == NULL) { + response->owner->err = 1; + return; + } + /* Only one response data is retained and the rest is discarded */ + if(i ==0){ + if(str6icmp(key, 'e', 'x', 'i', 's', 't', 's')){ + status = msg_prepend_format(response, "*%d\r\n", sub_msg->frag_multibulk_len); + if (status != NC_OK) { + response->owner->err = 1; + return; + } + for(j=0;jfrag_multibulk_len;j++){ + status = redis_copy_bulk(response, sub_msg); + if (status != NC_OK) { + response->owner->err = 1; + return; + } + } + }else{ + status = redis_copy_bulk(response, sub_msg); + } + }else{ + if(str6icmp(key, 'e', 'x', 'i', 's', 't', 's')){ + for(j=0;jfrag_multibulk_len;j++){ + status = redis_copy_bulk(NULL, sub_msg); + if (status != NC_OK) { + response->owner->err = 1; + return; + } + } + }else{ + status = redis_copy_bulk(NULL, sub_msg); + } + } + if (status != NC_OK) { + response->owner->err = 1; + return; + } + } +} /* * Post-coalesce handler is invoked when the message is a response to * the fragmented multi vector request - 'mget' or 'del' and all the @@ -3083,6 +3263,9 @@ redis_post_coalesce(struct msg *r) case MSG_REQ_REDIS_MSET: return redis_post_coalesce_mset(r); + case MSG_REQ_REDIS_SCRIPT: + return redis_post_coalesce_script(r); + default: NOT_REACHED(); } diff --git a/tests/test_redis/test_commands.py b/tests/test_redis/test_commands.py index f4ed3a73..73e8c2fb 100644 --- a/tests/test_redis/test_commands.py +++ b/tests/test_redis/test_commands.py @@ -101,3 +101,18 @@ def test_sscan(): assert_equal('0', str(cursor)) assert_equal({b'1'}, set(members)) +def test_script_load_and_exits(): + r = getconn() + + evalsha=r.script_load("return redis.call('hset',KEYS[1],KEYS[1],KEYS[1])") + assert_equal(evalsha,"dbbae75a09f1390aaf069fb60e951ec23cab7a15") + + exists=r.script_exists("dbbae75a09f1390aaf069fb60e951ec23cab7a15") + assert_equal([True],exists) + + assert_equal(1,r.evalsha("dbbae75a09f1390aaf069fb60e951ec23cab7a15",1,"scriptA")) + + dic=r.hgetall("scriptA") + assert_equal(dic,{b'scriptA': b'scriptA'}) + + assert_equal(True,r.script_flush()) \ No newline at end of file