Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Robust hash ring failure retry mechanism #29

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/hashkit/nc_ketama.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,8 @@ ketama_update(struct server_pool *pool)
struct server *server = array_get(&pool->server, server_index);

if (pool->auto_eject_hosts) {
if (server->next_retry <= now) {
server->next_retry = 0LL;
if (server->fail == 0) {
nlive_server++;
} else if (pool->next_rebuild == 0LL ||
server->next_retry < pool->next_rebuild) {
pool->next_rebuild = server->next_retry;
}
} else {
nlive_server++;
Expand All @@ -104,7 +100,7 @@ ketama_update(struct server_pool *pool)
ASSERT(server->weight > 0);

/* count weight only for live servers */
if (!pool->auto_eject_hosts || server->next_retry <= now) {
if (!pool->auto_eject_hosts || server->fail == 0) {
total_weight += server->weight;
}
}
Expand Down
6 changes: 1 addition & 5 deletions src/hashkit/nc_modula.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ modula_update(struct server_pool *pool)
struct server *server = array_get(&pool->server, server_index);

if (pool->auto_eject_hosts) {
if (server->next_retry <= now) {
server->next_retry = 0LL;
if (server->fail == 0) {
nlive_server++;
} else if (pool->next_rebuild == 0LL ||
server->next_retry < pool->next_rebuild) {
pool->next_rebuild = server->next_retry;
}
} else {
nlive_server++;
Expand Down
6 changes: 1 addition & 5 deletions src/hashkit/nc_random.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ random_update(struct server_pool *pool)
struct server *server = array_get(&pool->server, server_index);

if (pool->auto_eject_hosts) {
if (server->next_retry <= now) {
server->next_retry = 0LL;
if (server->fail == 0) {
nlive_server++;
} else if (pool->next_rebuild == 0LL ||
server->next_retry < pool->next_rebuild) {
pool->next_rebuild = server->next_retry;
}
} else {
nlive_server++;
Expand Down
5 changes: 5 additions & 0 deletions src/nc_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,8 @@ client_close(struct context *ctx, struct conn *conn)

conn_put(conn);
}

void
client_restore(struct context *ctx, struct conn *conn)
{
}
1 change: 1 addition & 0 deletions src/nc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ bool client_active(struct conn *conn);
void client_ref(struct conn *conn, void *owner);
void client_unref(struct conn *conn);
void client_close(struct context *ctx, struct conn *conn);
void client_restore(struct context *ctx, struct conn *conn);

#endif
1 change: 1 addition & 0 deletions src/nc_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ conf_server_each_transform(void *elem, void *data)

s->next_retry = 0LL;
s->failure_count = 0;
s->fail = FAIL_STATUS_NORMAL;

log_debug(LOG_VERB, "transform to server %"PRIu32" '%.*s'",
s->idx, s->pname.len, s->pname.data);
Expand Down
3 changes: 3 additions & 0 deletions src/nc_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ conn_get(void *owner, bool client, bool redis)

conn->close = client_close;
conn->active = client_active;
conn->restore = client_restore;

conn->ref = client_ref;
conn->unref = client_unref;
Expand All @@ -193,6 +194,7 @@ conn_get(void *owner, bool client, bool redis)

conn->close = server_close;
conn->active = server_active;
conn->restore = server_restore;

conn->ref = server_ref;
conn->unref = server_unref;
Expand Down Expand Up @@ -235,6 +237,7 @@ conn_get_proxy(void *owner)

conn->close = proxy_close;
conn->active = NULL;
conn->restore = proxy_restore;

conn->ref = proxy_ref;
conn->unref = proxy_unref;
Expand Down
4 changes: 4 additions & 0 deletions src/nc_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ typedef void (*conn_ref_t)(struct conn *, void *);
typedef void (*conn_unref_t)(struct conn *);

typedef void (*conn_msgq_t)(struct context *, struct conn *, struct msg *);
typedef void (*conn_restore_t)(struct context *, struct conn *);

struct conn {
TAILQ_ENTRY(conn) conn_tqe; /* link in server_pool / server / free q */
Expand All @@ -58,6 +59,7 @@ struct conn {
conn_send_done_t send_done; /* write done handler */
conn_close_t close; /* close handler */
conn_active_t active; /* active? handler */
conn_restore_t restore; /* restore handler */

conn_ref_t ref; /* connection reference handler */
conn_unref_t unref; /* connection unreference handler */
Expand Down Expand Up @@ -96,4 +98,6 @@ ssize_t conn_sendv(struct conn *conn, struct array *sendv, size_t nsend);
void conn_init(void);
void conn_deinit(void);

rstatus_t event_add_out_with_conn(struct context *ctx, struct conn *conn, struct msg *msg);

#endif
76 changes: 73 additions & 3 deletions src/nc_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@

static uint32_t ctx_id; /* context generation */

static void
core_failed_servers_init(struct context *ctx)
{
int i;

for (i = 0; i < 2; i++) {
array_init(&(ctx->failed_servers[i]), 10, sizeof(struct server *));
}
}

static void
core_failed_servers_deinit(struct context *ctx)
{
uint32_t i, n, nsize;

for (i = 0; i < 2; i++) {
nsize = array_n(&(ctx->failed_servers[i]));
for (n = 0; n < nsize; n++) {
array_pop(&(ctx->failed_servers[n]));
}
array_deinit(&(ctx->failed_servers[n]));
}
}

static struct context *
core_ctx_create(struct instance *nci)
{
Expand All @@ -42,6 +66,10 @@ core_ctx_create(struct instance *nci)
ctx->cf = NULL;
ctx->stats = NULL;
array_null(&ctx->pool);
array_null(&(ctx->failed_servers[0]));
array_null(&(ctx->failed_servers[1]));
ctx->failed_idx = 0;
ctx->fails = &(ctx->failed_servers[0]);
ctx->ep = -1;
ctx->nevent = EVENT_SIZE_HINT;
ctx->max_timeout = nci->stats_interval;
Expand All @@ -63,6 +91,9 @@ core_ctx_create(struct instance *nci)
return NULL;
}

/* initialize fails_servers */
core_failed_servers_init(ctx);

/* create stats per server pool */
ctx->stats = stats_create(nci->stats_port, nci->stats_addr, nci->stats_interval,
nci->hostname, &ctx->pool);
Expand Down Expand Up @@ -118,6 +149,7 @@ core_ctx_destroy(struct context *ctx)
log_debug(LOG_VVERB, "destroy ctx %p id %"PRIu32"", ctx, ctx->id);
proxy_deinit(ctx);
server_pool_disconnect(ctx);
core_failed_servers_deinit(ctx);
event_deinit(ctx);
stats_destroy(ctx->stats);
server_pool_deinit(&ctx->pool);
Expand Down Expand Up @@ -231,6 +263,41 @@ core_error(struct context *ctx, struct conn *conn)
core_close(ctx, conn);
}

static void
retry_connection(struct context *ctx)
{
struct array *servers;
int idx;
struct server *server;
int64_t now;
uint32_t i, nsize;
rstatus_t status;

servers = ctx->fails;
idx = (ctx->failed_idx == 0) ? 1 : 0;

ctx->failed_idx = idx;
ctx->fails = &(ctx->failed_servers[idx]);

now = nc_usec_now();
nsize = array_n(servers);
if (nsize == 0) {
return;
}

for (i = 0; i < nsize; i++) {
server = *(struct server **)array_pop(servers);
if (server->next_retry == 0 || server->next_retry < now) {
status = server_reconnect(ctx, server);
if (status != NC_OK) {
add_failed_server(ctx, server);
}
} else {
add_failed_server(ctx, server);
}
}
}

static void
core_timeout(struct context *ctx)
{
Expand All @@ -242,7 +309,7 @@ core_timeout(struct context *ctx)
msg = msg_tmo_min();
if (msg == NULL) {
ctx->timeout = ctx->max_timeout;
return;
break;
}

/* skip over req that are in-error or done */
Expand All @@ -264,7 +331,7 @@ core_timeout(struct context *ctx)
if (now < then) {
int delta = (int)(then - now);
ctx->timeout = MIN(delta, ctx->max_timeout);
return;
break;
}

log_debug(LOG_INFO, "req %"PRIu64" on s %d timedout", msg->id, conn->sd);
Expand All @@ -274,13 +341,14 @@ core_timeout(struct context *ctx)

core_close(ctx, conn);
}

retry_connection(ctx);
}

static void
core_core(struct context *ctx, struct conn *conn, uint32_t events)
{
rstatus_t status;

log_debug(LOG_VVERB, "event %04"PRIX32" on %c %d", events,
conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd);

Expand All @@ -292,6 +360,8 @@ core_core(struct context *ctx, struct conn *conn, uint32_t events)
return;
}

conn->restore(ctx, conn);

/* read takes precedence over write */
if (events & (EPOLLIN | EPOLLHUP)) {
status = core_recv(ctx, conn);
Expand Down
25 changes: 14 additions & 11 deletions src/nc_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,20 @@ struct instance;
#include <nc_connection.h>

struct context {
uint32_t id; /* unique context id */
struct conf *cf; /* configuration */
struct stats *stats; /* stats */

struct array pool; /* server_pool[] */

int ep; /* epoll device */
int nevent; /* # epoll event */
int max_timeout; /* epoll wait max timeout in msec */
int timeout; /* epoll wait timeout in msec */
struct epoll_event *event; /* epoll event */
uint32_t id; /* unique context id */
struct conf *cf; /* configuration */
struct stats *stats; /* stats */

struct array pool; /* server_pool[] */
struct array failed_servers[2]; /* failed servers */
struct array *fails; /* ref of current fails server */

int failed_idx; /* current idx for failed servers */
int ep; /* epoll device */
int nevent; /* # epoll event */
int max_timeout; /* epoll wait max timeout in msec */
int timeout; /* epoll wait timeout in msec */
struct epoll_event *event; /* epoll event */
};

struct instance {
Expand Down
5 changes: 5 additions & 0 deletions src/nc_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,8 @@ proxy_recv(struct context *ctx, struct conn *conn)

return NC_OK;
}

void
proxy_restore(struct context *ctx, struct conn *conn)
{
}
1 change: 1 addition & 0 deletions src/nc_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ rstatus_t proxy_each_deinit(void *elem, void *data);
rstatus_t proxy_init(struct context *ctx);
void proxy_deinit(struct context *ctx);
rstatus_t proxy_recv(struct context *ctx, struct conn *conn);
void proxy_restore(struct context *ctx, struct conn *conn);

#endif
30 changes: 21 additions & 9 deletions src/nc_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,16 +480,12 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
}
ASSERT(!s_conn->client && !s_conn->proxy);

/* enqueue the message (request) into server inq */
if (TAILQ_EMPTY(&s_conn->imsg_q)) {
status = event_add_out(ctx->ep, s_conn);
if (status != NC_OK) {
req_forward_error(ctx, c_conn, msg);
s_conn->err = errno;
return;
}
status = event_add_out_with_conn(ctx, s_conn, msg);
if (status != NC_OK) {
req_forward_error(ctx, c_conn, msg);
s_conn->err = errno;
return;
}
s_conn->enqueue_inq(ctx, s_conn, msg);

req_forward_stats(ctx, s_conn->owner, msg);

Expand Down Expand Up @@ -586,3 +582,19 @@ req_send_done(struct context *ctx, struct conn *conn, struct msg *msg)
req_put(msg);
}
}

rstatus_t
event_add_out_with_conn(struct context *ctx, struct conn *conn, struct msg *msg)
{
rstatus_t status;

if (TAILQ_EMPTY(&conn->imsg_q)) {
status = event_add_out(ctx->ep, conn);
if (status != NC_OK) {
return status;
}
}

conn->enqueue_inq(ctx, conn, msg);
return NC_OK;
}
9 changes: 9 additions & 0 deletions src/nc_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ static bool
rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
{
struct msg *pmsg;
struct server *server;

ASSERT(!conn->client && !conn->proxy);

server = (struct server *)conn->owner;

if (msg_empty(msg)) {
ASSERT(conn->rmsg == NULL);
log_debug(LOG_VERB, "filter empty rsp %"PRIu64" on s %d", msg->id,
Expand All @@ -168,6 +171,12 @@ rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
ASSERT(pmsg->request && !pmsg->done);

if (pmsg->swallow) {
if (server->fail == FAIL_STATUS_ERR_TRY_HEARTBEAT) {
struct conn *c_conn;

c_conn = pmsg->owner;
server_restore_from_heartbeat(server, c_conn);
}
conn->dequeue_outq(ctx, conn, pmsg);
pmsg->done = 1;

Expand Down
Loading