Skip to content

Commit

Permalink
Robust hash ring failure retry mechanism
Browse files Browse the repository at this point in the history
Applying robush hash ring failure into random and modula

Fix issue twitter#126
  • Loading branch information
charsyam committed Jun 25, 2013
1 parent 835ae01 commit 40bf389
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 46 deletions.
8 changes: 2 additions & 6 deletions src/hashkit/nc_ketama.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,8 @@ ketama_update(struct server_pool *pool)
struct server *server = array_get(&pool->server, server_index);

if (pool->auto_eject_hosts) {
if (server->next_retry <= now) {
server->next_retry = 0LL;
if (server->fail == 0) {
nlive_server++;
} else if (pool->next_rebuild == 0LL ||
server->next_retry < pool->next_rebuild) {
pool->next_rebuild = server->next_retry;
}
} else {
nlive_server++;
Expand All @@ -104,7 +100,7 @@ ketama_update(struct server_pool *pool)
ASSERT(server->weight > 0);

/* count weight only for live servers */
if (!pool->auto_eject_hosts || server->next_retry <= now) {
if (!pool->auto_eject_hosts || server->fail == 0) {
total_weight += server->weight;
}
}
Expand Down
6 changes: 1 addition & 5 deletions src/hashkit/nc_modula.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ modula_update(struct server_pool *pool)
struct server *server = array_get(&pool->server, server_index);

if (pool->auto_eject_hosts) {
if (server->next_retry <= now) {
server->next_retry = 0LL;
if (server->fail == 0) {
nlive_server++;
} else if (pool->next_rebuild == 0LL ||
server->next_retry < pool->next_rebuild) {
pool->next_rebuild = server->next_retry;
}
} else {
nlive_server++;
Expand Down
6 changes: 1 addition & 5 deletions src/hashkit/nc_random.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ random_update(struct server_pool *pool)
struct server *server = array_get(&pool->server, server_index);

if (pool->auto_eject_hosts) {
if (server->next_retry <= now) {
server->next_retry = 0LL;
if (server->fail == 0) {
nlive_server++;
} else if (pool->next_rebuild == 0LL ||
server->next_retry < pool->next_rebuild) {
pool->next_rebuild = server->next_retry;
}
} else {
nlive_server++;
Expand Down
5 changes: 5 additions & 0 deletions src/nc_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,8 @@ client_close(struct context *ctx, struct conn *conn)

conn_put(conn);
}

void
client_restore(struct context *ctx, struct conn *conn)
{
}
1 change: 1 addition & 0 deletions src/nc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ bool client_active(struct conn *conn);
void client_ref(struct conn *conn, void *owner);
void client_unref(struct conn *conn);
void client_close(struct context *ctx, struct conn *conn);
void client_restore(struct context *ctx, struct conn *conn);

#endif
1 change: 1 addition & 0 deletions src/nc_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ conf_server_each_transform(void *elem, void *data)

s->next_retry = 0LL;
s->failure_count = 0;
s->fail = FAIL_STATUS_NORMAL;

log_debug(LOG_VERB, "transform to server %"PRIu32" '%.*s'",
s->idx, s->pname.len, s->pname.data);
Expand Down
3 changes: 3 additions & 0 deletions src/nc_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ conn_get(void *owner, bool client, bool redis)

conn->close = client_close;
conn->active = client_active;
conn->restore = client_restore;

conn->ref = client_ref;
conn->unref = client_unref;
Expand All @@ -193,6 +194,7 @@ conn_get(void *owner, bool client, bool redis)

conn->close = server_close;
conn->active = server_active;
conn->restore = server_restore;

conn->ref = server_ref;
conn->unref = server_unref;
Expand Down Expand Up @@ -235,6 +237,7 @@ conn_get_proxy(void *owner)

conn->close = proxy_close;
conn->active = NULL;
conn->restore = proxy_restore;

conn->ref = proxy_ref;
conn->unref = proxy_unref;
Expand Down
4 changes: 4 additions & 0 deletions src/nc_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ typedef void (*conn_ref_t)(struct conn *, void *);
typedef void (*conn_unref_t)(struct conn *);

typedef void (*conn_msgq_t)(struct context *, struct conn *, struct msg *);
typedef void (*conn_restore_t)(struct context *, struct conn *);

struct conn {
TAILQ_ENTRY(conn) conn_tqe; /* link in server_pool / server / free q */
Expand All @@ -58,6 +59,7 @@ struct conn {
conn_send_done_t send_done; /* write done handler */
conn_close_t close; /* close handler */
conn_active_t active; /* active? handler */
conn_restore_t restore; /* restore handler */

conn_ref_t ref; /* connection reference handler */
conn_unref_t unref; /* connection unreference handler */
Expand Down Expand Up @@ -96,4 +98,6 @@ ssize_t conn_sendv(struct conn *conn, struct array *sendv, size_t nsend);
void conn_init(void);
void conn_deinit(void);

rstatus_t event_add_out_with_conn(struct context *ctx, struct conn *conn, struct msg *msg);

#endif
76 changes: 73 additions & 3 deletions src/nc_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@

static uint32_t ctx_id; /* context generation */

static void
core_failed_servers_init(struct context *ctx)
{
int i;

for (i = 0; i < 2; i++) {
array_init(&(ctx->failed_servers[i]), 10, sizeof(struct server *));
}
}

static void
core_failed_servers_deinit(struct context *ctx)
{
uint32_t i, n, nsize;

for (i = 0; i < 2; i++) {
nsize = array_n(&(ctx->failed_servers[i]));
for (n = 0; n < nsize; n++) {
array_pop(&(ctx->failed_servers[n]));
}
array_deinit(&(ctx->failed_servers[n]));
}
}

static struct context *
core_ctx_create(struct instance *nci)
{
Expand All @@ -42,6 +66,10 @@ core_ctx_create(struct instance *nci)
ctx->cf = NULL;
ctx->stats = NULL;
array_null(&ctx->pool);
array_null(&(ctx->failed_servers[0]));
array_null(&(ctx->failed_servers[1]));
ctx->failed_idx = 0;
ctx->fails = &(ctx->failed_servers[0]);
ctx->ep = -1;
ctx->nevent = EVENT_SIZE_HINT;
ctx->max_timeout = nci->stats_interval;
Expand All @@ -63,6 +91,9 @@ core_ctx_create(struct instance *nci)
return NULL;
}

/* initialize fails_servers */
core_failed_servers_init(ctx);

/* create stats per server pool */
ctx->stats = stats_create(nci->stats_port, nci->stats_addr, nci->stats_interval,
nci->hostname, &ctx->pool);
Expand Down Expand Up @@ -118,6 +149,7 @@ core_ctx_destroy(struct context *ctx)
log_debug(LOG_VVERB, "destroy ctx %p id %"PRIu32"", ctx, ctx->id);
proxy_deinit(ctx);
server_pool_disconnect(ctx);
core_failed_servers_deinit(ctx);
event_deinit(ctx);
stats_destroy(ctx->stats);
server_pool_deinit(&ctx->pool);
Expand Down Expand Up @@ -231,6 +263,41 @@ core_error(struct context *ctx, struct conn *conn)
core_close(ctx, conn);
}

static void
retry_connection(struct context *ctx)
{
struct array *servers;
int idx;
struct server *server;
int64_t now;
uint32_t i, nsize;
rstatus_t status;

servers = ctx->fails;
idx = (ctx->failed_idx == 0) ? 1 : 0;

ctx->failed_idx = idx;
ctx->fails = &(ctx->failed_servers[idx]);

now = nc_usec_now();
nsize = array_n(servers);
if (nsize == 0) {
return;
}

for (i = 0; i < nsize; i++) {
server = *(struct server **)array_pop(servers);
if (server->next_retry == 0 || server->next_retry < now) {
status = server_reconnect(ctx, server);
if (status != NC_OK) {
add_failed_server(ctx, server);
}
} else {
add_failed_server(ctx, server);
}
}
}

static void
core_timeout(struct context *ctx)
{
Expand All @@ -242,7 +309,7 @@ core_timeout(struct context *ctx)
msg = msg_tmo_min();
if (msg == NULL) {
ctx->timeout = ctx->max_timeout;
return;
break;
}

/* skip over req that are in-error or done */
Expand All @@ -264,7 +331,7 @@ core_timeout(struct context *ctx)
if (now < then) {
int delta = (int)(then - now);
ctx->timeout = MIN(delta, ctx->max_timeout);
return;
break;
}

log_debug(LOG_INFO, "req %"PRIu64" on s %d timedout", msg->id, conn->sd);
Expand All @@ -274,13 +341,14 @@ core_timeout(struct context *ctx)

core_close(ctx, conn);
}

retry_connection(ctx);
}

static void
core_core(struct context *ctx, struct conn *conn, uint32_t events)
{
rstatus_t status;

log_debug(LOG_VVERB, "event %04"PRIX32" on %c %d", events,
conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd);

Expand All @@ -292,6 +360,8 @@ core_core(struct context *ctx, struct conn *conn, uint32_t events)
return;
}

conn->restore(ctx, conn);

/* read takes precedence over write */
if (events & (EPOLLIN | EPOLLHUP)) {
status = core_recv(ctx, conn);
Expand Down
25 changes: 14 additions & 11 deletions src/nc_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,20 @@ struct instance;
#include <nc_connection.h>

struct context {
uint32_t id; /* unique context id */
struct conf *cf; /* configuration */
struct stats *stats; /* stats */

struct array pool; /* server_pool[] */

int ep; /* epoll device */
int nevent; /* # epoll event */
int max_timeout; /* epoll wait max timeout in msec */
int timeout; /* epoll wait timeout in msec */
struct epoll_event *event; /* epoll event */
uint32_t id; /* unique context id */
struct conf *cf; /* configuration */
struct stats *stats; /* stats */

struct array pool; /* server_pool[] */
struct array failed_servers[2]; /* failed servers */
struct array *fails; /* ref of current fails server */

int failed_idx; /* current idx for failed servers */
int ep; /* epoll device */
int nevent; /* # epoll event */
int max_timeout; /* epoll wait max timeout in msec */
int timeout; /* epoll wait timeout in msec */
struct epoll_event *event; /* epoll event */
};

struct instance {
Expand Down
5 changes: 5 additions & 0 deletions src/nc_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,8 @@ proxy_recv(struct context *ctx, struct conn *conn)

return NC_OK;
}

void
proxy_restore(struct context *ctx, struct conn *conn)
{
}
1 change: 1 addition & 0 deletions src/nc_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ rstatus_t proxy_each_deinit(void *elem, void *data);
rstatus_t proxy_init(struct context *ctx);
void proxy_deinit(struct context *ctx);
rstatus_t proxy_recv(struct context *ctx, struct conn *conn);
void proxy_restore(struct context *ctx, struct conn *conn);

#endif
30 changes: 21 additions & 9 deletions src/nc_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,16 +480,12 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
}
ASSERT(!s_conn->client && !s_conn->proxy);

/* enqueue the message (request) into server inq */
if (TAILQ_EMPTY(&s_conn->imsg_q)) {
status = event_add_out(ctx->ep, s_conn);
if (status != NC_OK) {
req_forward_error(ctx, c_conn, msg);
s_conn->err = errno;
return;
}
status = event_add_out_with_conn(ctx, s_conn, msg);
if (status != NC_OK) {
req_forward_error(ctx, c_conn, msg);
s_conn->err = errno;
return;
}
s_conn->enqueue_inq(ctx, s_conn, msg);

req_forward_stats(ctx, s_conn->owner, msg);

Expand Down Expand Up @@ -586,3 +582,19 @@ req_send_done(struct context *ctx, struct conn *conn, struct msg *msg)
req_put(msg);
}
}

rstatus_t
event_add_out_with_conn(struct context *ctx, struct conn *conn, struct msg *msg)
{
rstatus_t status;

if (TAILQ_EMPTY(&conn->imsg_q)) {
status = event_add_out(ctx->ep, conn);
if (status != NC_OK) {
return status;
}
}

conn->enqueue_inq(ctx, conn, msg);
return NC_OK;
}
9 changes: 9 additions & 0 deletions src/nc_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ static bool
rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
{
struct msg *pmsg;
struct server *server;

ASSERT(!conn->client && !conn->proxy);

server = (struct server *)conn->owner;

if (msg_empty(msg)) {
ASSERT(conn->rmsg == NULL);
log_debug(LOG_VERB, "filter empty rsp %"PRIu64" on s %d", msg->id,
Expand All @@ -168,6 +171,12 @@ rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
ASSERT(pmsg->request && !pmsg->done);

if (pmsg->swallow) {
if (server->fail == FAIL_STATUS_ERR_TRY_HEARTBEAT) {
struct conn *c_conn;

c_conn = pmsg->owner;
server_restore_from_heartbeat(server, c_conn);
}
conn->dequeue_outq(ctx, conn, pmsg);
pmsg->done = 1;

Expand Down
Loading

0 comments on commit 40bf389

Please sign in to comment.