From 4643bf304421e09f76d00f3acdcafa80f6c753ed Mon Sep 17 00:00:00 2001 From: Anatoliy Bilenko Date: Thu, 22 Feb 2024 10:19:47 +0000 Subject: [PATCH 1/2] Replaced macro-based QUEUE__* interfaces by function-based Signed-off-by: Anatoliy Bilenko --- src/db.c | 4 +- src/fsm.c | 56 ++++++------- src/leader.c | 4 +- src/lib/queue.h | 186 +++++++++++++++++++----------------------- src/registry.c | 16 ++-- src/roles.c | 24 +++--- src/server.c | 18 ++-- test/unit/test_conn.c | 4 +- 8 files changed, 148 insertions(+), 164 deletions(-) diff --git a/src/db.c b/src/db.c index 24e25e16f..23f1b760b 100644 --- a/src/db.c +++ b/src/db.c @@ -46,7 +46,7 @@ int db__init(struct db *db, struct config *config, const char *filename) db->follower = NULL; db->tx_id = 0; db->read_lock = 0; - QUEUE__INIT(&db->leaders); + queue_init(&db->leaders); return 0; err_after_path_alloc: @@ -59,7 +59,7 @@ int db__init(struct db *db, struct config *config, const char *filename) void db__close(struct db *db) { - assert(QUEUE__IS_EMPTY(&db->leaders)); + assert(queue_empty(&db->leaders)); if (db->follower != NULL) { int rc; rc = sqlite3_close(db->follower); diff --git a/src/fsm.c b/src/fsm.c index 4daa86790..50fcbdc83 100644 --- a/src/fsm.c +++ b/src/fsm.c @@ -502,10 +502,10 @@ static unsigned snapshotNumBufs(struct fsm *f) queue *head; unsigned n = 1; /* snapshot header */ - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { n += 2; /* database header & wal */ - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); n += dbNumPages(db); /* 1 buffer per page (zero copy) */ } @@ -539,12 +539,12 @@ static void freeSnapshotBufs(struct fsm *f, i = 1; /* Free all database headers & WAL buffers */ - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { if (i == n_bufs) { break; } - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); /* i is the index of the database header */ sqlite3_free(bufs[i].base); /* i is now the index of the next database header (if any) */ @@ -567,9 +567,9 @@ static int fsm__snapshot(struct raft_fsm *fsm, /* First count how many databases we have and check that no transaction * nor checkpoint nor other snapshot is in progress. */ - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); if (db->tx_id != 0 || db->read_lock) { return RAFT_BUSY; } @@ -577,9 +577,9 @@ static int fsm__snapshot(struct raft_fsm *fsm, } /* Lock all databases, preventing the checkpoint from running */ - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); rv = databaseReadLock(db); assert(rv == 0); } @@ -598,9 +598,9 @@ static int fsm__snapshot(struct raft_fsm *fsm, /* Encode individual databases. */ i = 1; - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); /* database_header + num_pages + wal */ unsigned n = 1 + dbNumPages(db) + 1; rv = encodeDatabase(db, &(*bufs)[i], n); @@ -618,9 +618,9 @@ static int fsm__snapshot(struct raft_fsm *fsm, err_after_bufs_alloc: sqlite3_free(*bufs); err: - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); databaseReadUnlock(db); } assert(rv != 0); @@ -663,12 +663,12 @@ static int fsm__snapshot_finalize(struct raft_fsm *fsm, /* Unlock all databases that were locked for the snapshot, this is safe * because DB's are only ever added at the back of the queue. */ n_db = 0; - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { if (n_db == header.n) { break; } - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); rv = databaseReadUnlock(db); assert(rv == 0); n_db++; @@ -815,7 +815,7 @@ static unsigned snapshotNumBufsDisk(struct fsm *f) queue *head; unsigned n = 1; /* snapshot header */ - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { n += 3; /* database header, database file and wal */ } @@ -849,7 +849,7 @@ static void freeSnapshotBufsDisk(struct fsm *f, i = 1; /* Free all database headers & WAL buffers. Unmap the DB file. */ - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { if (i == n_bufs) { break; @@ -878,9 +878,9 @@ static int fsm__snapshot_disk(struct raft_fsm *fsm, /* First count how many databases we have and check that no transaction * nor checkpoint nor other snapshot is in progress. */ - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); if (db->tx_id != 0 || db->read_lock) { return RAFT_BUSY; } @@ -890,9 +890,9 @@ static int fsm__snapshot_disk(struct raft_fsm *fsm, /* Lock all databases, preventing the checkpoint from running. This * ensures the database is not written while it is mmap'ed and copied by * raft. */ - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); rv = databaseReadLock(db); assert(rv == 0); } @@ -917,9 +917,9 @@ static int fsm__snapshot_disk(struct raft_fsm *fsm, /* Copy WAL of all databases. */ i = 1; - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); /* database_header + db + WAL */ unsigned n = 3; /* pass pointer to buffer that will contain WAL. */ @@ -938,9 +938,9 @@ static int fsm__snapshot_disk(struct raft_fsm *fsm, err_after_bufs_alloc: sqlite3_free(*bufs); err: - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); databaseReadUnlock(db); } assert(rv != 0); @@ -972,13 +972,13 @@ static int fsm__snapshot_async_disk(struct raft_fsm *fsm, /* Encode individual databases. */ i = 1; - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { if (i == *n_bufs) { /* In case a db was added in meanwhile */ break; } - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); /* database_header + database file + wal */ unsigned n = 3; rv = encodeDiskDatabaseAsync(db, &(*bufs)[i], n); @@ -1031,12 +1031,12 @@ static int fsm__snapshot_finalize_disk(struct raft_fsm *fsm, /* Unlock all databases that were locked for the snapshot, this is safe * because DB's are only ever added at the back of the queue. */ n_db = 0; - QUEUE__FOREACH(head, &f->registry->dbs) + QUEUE_FOREACH(head, &f->registry->dbs) { if (n_db == header.n) { break; } - db = QUEUE__DATA(head, struct db, queue); + db = QUEUE_DATA(head, struct db, queue); databaseReadUnlock(db); n_db++; } diff --git a/src/leader.c b/src/leader.c index 28a3927cf..1541a28c9 100644 --- a/src/leader.c +++ b/src/leader.c @@ -136,7 +136,7 @@ int leader__init(struct leader *l, struct db *db, struct raft *raft) l->exec = NULL; l->inflight = NULL; - QUEUE__PUSH(&db->leaders, &l->queue); + queue_insert_tail(&db->leaders, &l->queue); return 0; } @@ -153,7 +153,7 @@ void leader__close(struct leader *l) rc = sqlite3_close(l->conn); assert(rc == 0); - QUEUE__REMOVE(&l->queue); + queue_remove(&l->queue); } /* A checkpoint command that fails to commit is not a huge issue. diff --git a/src/lib/queue.h b/src/lib/queue.h index bbaffe802..84fb6c022 100644 --- a/src/lib/queue.h +++ b/src/lib/queue.h @@ -1,106 +1,90 @@ #ifndef LIB_QUEUE_H_ #define LIB_QUEUE_H_ -#include - -typedef void *queue[2]; - -/* Private macros. */ -#define QUEUE__NEXT(q) (*(queue **)&((*(q))[0])) -#define QUEUE__PREV(q) (*(queue **)&((*(q))[1])) - -#define QUEUE__PREV_NEXT(q) (QUEUE__NEXT(QUEUE__PREV(q))) -#define QUEUE__NEXT_PREV(q) (QUEUE__PREV(QUEUE__NEXT(q))) - -/** - * Initialize an empty queue. - */ -#define QUEUE__INIT(q) \ - { \ - QUEUE__NEXT(q) = (q); \ - QUEUE__PREV(q) = (q); \ - } - -/** - * Return true if the queue has no element. - */ -#define QUEUE__IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE__NEXT(q)) - -/** - * Insert an element at the back of a queue. - */ -#define QUEUE__PUSH(q, e) \ - { \ - QUEUE__NEXT(e) = (q); \ - QUEUE__PREV(e) = QUEUE__PREV(q); \ - QUEUE__PREV_NEXT(e) = (e); \ - QUEUE__PREV(q) = (e); \ - } - -#define QUEUE__INSERT_TAIL(q, e) QUEUE__PUSH(q, e) - -/** - * Insert an element at the front of a queue. - */ -#define QUEUE__INSERT_HEAD(h, q) \ - { \ - QUEUE__NEXT(q) = QUEUE__NEXT(h); \ - QUEUE__PREV(q) = (h); \ - QUEUE__NEXT_PREV(q) = (q); \ - QUEUE__NEXT(h) = (q); \ - } - -/** - * Remove the given element from the queue. Any element can be removed at any - * time. - */ -#define QUEUE__REMOVE(e) \ - { \ - QUEUE__PREV_NEXT(e) = QUEUE__NEXT(e); \ - QUEUE__NEXT_PREV(e) = QUEUE__PREV(e); \ - } - -/** - * Moves elements from queue @h to queue @n - * Note: Removed QUEUE__SPLIT() and merged it into QUEUE__MOVE(). - */ -#define QUEUE__MOVE(h, n) \ - { \ - if (QUEUE__IS_EMPTY(h)) { \ - QUEUE__INIT(n); \ - } else { \ - queue *__q = QUEUE__HEAD(h); \ - QUEUE__PREV(n) = QUEUE__PREV(h); \ - QUEUE__PREV_NEXT(n) = (n); \ - QUEUE__NEXT(n) = (__q); \ - QUEUE__PREV(h) = QUEUE__PREV(__q); \ - QUEUE__PREV_NEXT(h) = (h); \ - QUEUE__PREV(__q) = (n); \ - } \ - } - -/** - * Return the element at the front of the queue. - */ -#define QUEUE__HEAD(q) (QUEUE__NEXT(q)) - -/** - * Return the element at the back of the queue. - */ -#define QUEUE__TAIL(q) (QUEUE__PREV(q)) - -/** - * Iternate over the element of a queue. - * - * Mutating the queue while iterating results in undefined behavior. - */ -#define QUEUE__FOREACH(q, e) \ - for ((q) = QUEUE__NEXT(e); (q) != (e); (q) = QUEUE__NEXT(q)) - -/** - * Return the structure holding the given element. - */ -#define QUEUE__DATA(e, type, field) \ - ((type *)(uintptr_t)((char *)(e)-offsetof(type, field))) +#include /* offsetof */ + +struct queue +{ + struct queue *next; + struct queue *prev; +}; + +typedef struct queue queue; + +#define QUEUE_DATA(pointer, type, field) \ + ((type *)((char *)(pointer)-offsetof(type, field))) + +#define QUEUE_FOREACH(q, h) for ((q) = (h)->next; (q) != (h); (q) = (q)->next) + +static inline void queue_init(struct queue *q) +{ + q->next = q; + q->prev = q; +} + +static inline int queue_empty(const struct queue *q) +{ + return q == q->next; +} + +static inline struct queue *queue_head(const struct queue *q) +{ + return q->next; +} + +static inline struct queue *queue_next(const struct queue *q) +{ + return q->next; +} + +static inline void queue_add(struct queue *h, struct queue *n) +{ + h->prev->next = n->next; + n->next->prev = h->prev; + h->prev = n->prev; + h->prev->next = h; +} + +static inline void queue_split(struct queue *h, + struct queue *q, + struct queue *n) +{ + n->prev = h->prev; + n->prev->next = n; + n->next = q; + h->prev = q->prev; + h->prev->next = h; + q->prev = n; +} + +static inline void queue_move(struct queue *h, struct queue *n) +{ + if (queue_empty(h)) + queue_init(n); + else + queue_split(h, h->next, n); +} + +static inline void queue_insert_head(struct queue *h, struct queue *q) +{ + q->next = h->next; + q->prev = h; + q->next->prev = q; + h->next = q; +} + +static inline void queue_insert_tail(struct queue *h, struct queue *q) +{ + q->next = h; + q->prev = h->prev; + q->prev->next = q; + h->prev = q; +} + +static inline void queue_remove(struct queue *q) +{ + q->prev->next = q->next; + q->next->prev = q->prev; +} #endif /* LIB_QUEUE_H_*/ diff --git a/src/registry.c b/src/registry.c index 631dbfc51..46445bcfa 100644 --- a/src/registry.c +++ b/src/registry.c @@ -9,17 +9,17 @@ void registry__init(struct registry *r, struct config *config) { r->config = config; - QUEUE__INIT(&r->dbs); + queue_init(&r->dbs); } void registry__close(struct registry *r) { - while (!QUEUE__IS_EMPTY(&r->dbs)) { + while (!queue_empty(&r->dbs)) { struct db *db; queue *head; - head = QUEUE__HEAD(&r->dbs); - QUEUE__REMOVE(head); - db = QUEUE__DATA(head, struct db, queue); + head = queue_head(&r->dbs); + queue_remove(head); + db = QUEUE_DATA(head, struct db, queue); db__close(db); sqlite3_free(db); } @@ -28,9 +28,9 @@ void registry__close(struct registry *r) int registry__db_get(struct registry *r, const char *filename, struct db **db) { queue *head; - QUEUE__FOREACH(head, &r->dbs) + QUEUE_FOREACH(head, &r->dbs) { - *db = QUEUE__DATA(head, struct db, queue); + *db = QUEUE_DATA(head, struct db, queue); if (strcmp((*db)->filename, filename) == 0) { return 0; } @@ -40,6 +40,6 @@ int registry__db_get(struct registry *r, const char *filename, struct db **db) return DQLITE_NOMEM; } db__init(*db, r->config, filename); - QUEUE__PUSH(&r->dbs, &(*db)->queue); + queue_insert_tail(&r->dbs, &(*db)->queue); return 0; } diff --git a/src/roles.c b/src/roles.c index 21fb6bc5f..e01f4ef36 100644 --- a/src/roles.c +++ b/src/roles.c @@ -183,13 +183,13 @@ static void startChange(struct dqlite_node *d) int role; int rv; - if (QUEUE__IS_EMPTY(&d->roles_changes)) { + if (queue_empty(&d->roles_changes)) { return; } - head = QUEUE__HEAD(&d->roles_changes); - QUEUE__REMOVE(head); - rec = QUEUE__DATA(head, struct change_record, queue); + head = queue_head(&d->roles_changes); + queue_remove(head); + rec = QUEUE_DATA(head, struct change_record, queue); id = rec->id; role = rec->role; raft_free(rec); @@ -228,9 +228,9 @@ static void queueChange(uint64_t id, int role, void *arg) /* If we already queued a role change for this node, just update * that record instead of queueing a new one. */ - QUEUE__FOREACH(head, &d->roles_changes) + QUEUE_FOREACH(head, &d->roles_changes) { - rec = QUEUE__DATA(head, struct change_record, queue); + rec = QUEUE_DATA(head, struct change_record, queue); if (rec->id == id) { rec->role = role; return; @@ -243,7 +243,7 @@ static void queueChange(uint64_t id, int role, void *arg) } rec->id = id; rec->role = role; - QUEUE__PUSH(&d->roles_changes, &rec->queue); + queue_insert_tail(&d->roles_changes, &rec->queue); } void RolesComputeChanges(int voters, @@ -666,7 +666,7 @@ void RolesAdjust(struct dqlite_node *d) } /* If a series of role adjustments is already in progress, don't kick * off another one. */ - if (!QUEUE__IS_EMPTY(&d->roles_changes)) { + if (!queue_empty(&d->roles_changes)) { return; } assert(d->running); @@ -707,10 +707,10 @@ void RolesCancelPendingChanges(struct dqlite_node *d) queue *head; struct change_record *rec; - while (!QUEUE__IS_EMPTY(&d->roles_changes)) { - head = QUEUE__HEAD(&d->roles_changes); - rec = QUEUE__DATA(head, struct change_record, queue); - QUEUE__REMOVE(head); + while (!queue_empty(&d->roles_changes)) { + head = queue_head(&d->roles_changes); + rec = QUEUE_DATA(head, struct change_record, queue); + queue_remove(head); raft_free(rec); } } diff --git a/src/server.c b/src/server.c index 72fd40e2c..819c38857 100644 --- a/src/server.c +++ b/src/server.c @@ -40,9 +40,9 @@ static void state_cb(struct raft *r, if (old_state == RAFT_LEADER && new_state != RAFT_LEADER) { tracef("node %llu@%s: leadership lost", r->id, r->address); - QUEUE__FOREACH(head, &d->conns) + QUEUE_FOREACH(head, &d->conns) { - conn = QUEUE__DATA(head, struct conn, queue); + conn = QUEUE_DATA(head, struct conn, queue); gateway__leader_close(&conn->gateway, RAFT_LEADERSHIPLOST); } @@ -143,9 +143,9 @@ int dqlite__init(struct dqlite_node *d, goto err_after_stopped_init; } - QUEUE__INIT(&d->queue); - QUEUE__INIT(&d->conns); - QUEUE__INIT(&d->roles_changes); + queue_init(&d->queue); + queue_init(&d->conns); + queue_init(&d->roles_changes); d->raft_state = RAFT_UNAVAILABLE; d->running = false; d->listener = NULL; @@ -492,7 +492,7 @@ static void raftCloseCb(struct raft *raft) static void destroy_conn(struct conn *conn) { - QUEUE__REMOVE(&conn->queue); + queue_remove(&conn->queue); sqlite3_free(conn); } @@ -540,9 +540,9 @@ static void stopCb(uv_async_t *stop) } d->running = false; - QUEUE__FOREACH(head, &d->conns) + QUEUE_FOREACH(head, &d->conns) { - conn = QUEUE__DATA(head, struct conn, queue); + conn = QUEUE_DATA(head, struct conn, queue); conn__stop(conn); } raft_close(&d->raft, raftCloseCb); @@ -649,7 +649,7 @@ static void listenCb(uv_stream_t *listener, int status) goto err_after_conn_alloc; } - QUEUE__PUSH(&t->conns, &conn->queue); + queue_insert_tail(&t->conns, &conn->queue); return; diff --git a/test/unit/test_conn.c b/test/unit/test_conn.c index 769c52945..a15708c5b 100644 --- a/test/unit/test_conn.c +++ b/test/unit/test_conn.c @@ -25,7 +25,7 @@ TEST_MODULE(conn); static void connCloseCb(struct conn *conn) { - bool *closed = conn->queue[0]; + bool *closed = (bool *)conn->queue.next; *closed = true; } @@ -56,7 +56,7 @@ static void connCloseCb(struct conn *conn) rv = transport__stream(&f->loop, f->server, &stream); \ munit_assert_int(rv, ==, 0); \ f->closed = false; \ - f->conn.queue[0] = &f->closed; \ + f->conn.queue.next = (queue *)&f->closed; \ rv = conn__start(&f->conn, &f->config, &f->loop, &f->registry, \ &f->raft, stream, &f->raft_transport, seed, \ connCloseCb); \ From 27e78352cf736105fec664c3b7d7b1068201546f Mon Sep 17 00:00:00 2001 From: Anatoliy Bilenko Date: Thu, 22 Feb 2024 10:46:28 +0000 Subject: [PATCH 2/2] Replaced macro-based QUEUE__* interfaces by function-based in raft Signed-off-by: Anatoliy Bilenko --- bt/request | 2 +- src/lib/queue.h | 9 +++- src/lib/threadpool.c | 24 +++++----- src/raft.h | 6 ++- src/raft/client.c | 6 +-- src/raft/convert.c | 10 ++-- src/raft/fixture.c | 24 +++++----- src/raft/lifecycle.c | 6 +-- src/raft/queue.h | 57 ---------------------- src/raft/replication.c | 2 +- src/raft/request.h | 2 +- src/raft/state.c | 2 +- src/raft/uv.c | 32 ++++++------- src/raft/uv.h | 2 +- src/raft/uv_append.c | 94 ++++++++++++++++++------------------- src/raft/uv_finalize.c | 10 ++-- src/raft/uv_prepare.c | 42 ++++++++--------- src/raft/uv_recv.c | 12 ++--- src/raft/uv_send.c | 32 ++++++------- src/raft/uv_snapshot.c | 6 +-- src/raft/uv_tcp.c | 12 ++--- src/raft/uv_tcp.h | 2 +- src/raft/uv_tcp_connect.c | 14 +++--- src/raft/uv_tcp_listen.c | 16 +++---- src/raft/uv_truncate.c | 4 +- src/raft/uv_work.c | 6 +-- src/raft/uv_writer.c | 30 ++++++------ src/raft/uv_writer.h | 2 +- test/raft/unit/test_queue.c | 52 ++++++++++---------- test/unit/test_conn.c | 4 +- 30 files changed, 236 insertions(+), 286 deletions(-) delete mode 100644 src/raft/queue.h diff --git a/bt/request b/bt/request index 8f855fdc6..129f91cd8 100755 --- a/bt/request +++ b/bt/request @@ -11,7 +11,7 @@ struct request void *data; int type; raft_index index; - void *queue[2]; + queue queue; }; uprobe:$libraft_path:lifecycleRequestStart diff --git a/src/lib/queue.h b/src/lib/queue.h index 84fb6c022..10217213c 100644 --- a/src/lib/queue.h +++ b/src/lib/queue.h @@ -11,8 +11,8 @@ struct queue typedef struct queue queue; -#define QUEUE_DATA(pointer, type, field) \ - ((type *)((char *)(pointer)-offsetof(type, field))) +#define QUEUE_DATA(e, type, field) \ + ((type *)((void *)((char *)(e)-offsetof(type, field)))) #define QUEUE_FOREACH(q, h) for ((q) = (h)->next; (q) != (h); (q) = (q)->next) @@ -37,6 +37,11 @@ static inline struct queue *queue_next(const struct queue *q) return q->next; } +static inline struct queue *queue_tail(const struct queue *q) +{ + return q->prev; +} + static inline void queue_add(struct queue *h, struct queue *n) { h->prev->next = n->next; diff --git a/src/lib/threadpool.c b/src/lib/threadpool.c index 3321a5615..28275036d 100644 --- a/src/lib/threadpool.c +++ b/src/lib/threadpool.c @@ -143,25 +143,25 @@ static inline void w_unregister(pool_t *pool, pool_work_t *w) static bool empty(const queue *q) { - return QUEUE__IS_EMPTY(q); + return queue_empty(q); } static queue *head(const queue *q) { - return QUEUE__HEAD(q); + return queue_head(q); } static void push(queue *to, queue *what) { - QUEUE__INSERT_TAIL(to, what); + queue_insert_tail(to, what); } static queue *pop(queue *from) { - queue *q = QUEUE__HEAD(from); + queue *q = queue_head(from); PRE(q != NULL); - QUEUE__REMOVE(q); - QUEUE__INIT(q); + queue_remove(q); + queue_init(q); return q; } @@ -180,7 +180,7 @@ static queue *qos_pop(pool_impl_t *pi, queue *first, queue *second) static pool_work_t *q_to_w(const queue *q) { - return QUEUE__DATA(q, pool_work_t, link); + return QUEUE_DATA(q, pool_work_t, link); } static enum pool_work_type q_type(const queue *q) @@ -413,7 +413,7 @@ static void pool_threads_init(pool_t *pool) .idx = i, }; - QUEUE__INIT(&ts[i].inq); + queue_init(&ts[i].inq); if (uv_cond_init(&ts[i].cond)) { abort(); } @@ -462,7 +462,7 @@ void work_done(uv_async_t *handle) pool_impl_t *pi = CONTAINER_OF(handle, pool_impl_t, outq_async); uv_mutex_lock(&pi->outq_mutex); - QUEUE__MOVE(&pi->outq, &q); + queue_move(&pi->outq, &q); uv_mutex_unlock(&pi->outq_mutex); while (!empty(&q)) { @@ -513,9 +513,9 @@ int pool_init(pool_t *pool, .threads_nr = threads_nr, .ord_in_flight = 0, }; - QUEUE__INIT(&pi->outq); - QUEUE__INIT(&pi->ordered); - QUEUE__INIT(&pi->unordered); + queue_init(&pi->outq); + queue_init(&pi->ordered); + queue_init(&pi->unordered); rc = uv_mutex_init(&pi->outq_mutex); if (rc != 0) { diff --git a/src/raft.h b/src/raft.h index 7f8496c58..c9c7dad9c 100644 --- a/src/raft.h +++ b/src/raft.h @@ -9,6 +9,8 @@ #include +#include "lib/queue.h" + #ifndef RAFT_API #define RAFT_API __attribute__((visibility("default"))) #endif @@ -802,7 +804,7 @@ struct raft raft_index round_index; /* Target of the current round. */ raft_time round_start; /* Start of current round. */ - void *requests[2]; /* Outstanding client requests. */ + queue requests; /* Outstanding client requests. */ uint32_t voter_contacts; /* Current number of voting nodes we are in contact with */ @@ -1029,7 +1031,7 @@ RAFT_API int raft_voter_contacts(struct raft *r); void *data; \ int type; \ raft_index index; \ - void *queue[2]; \ + queue queue; \ uint8_t req_id[16]; \ uint8_t client_id[16]; \ uint8_t unique_id[16]; \ diff --git a/src/raft/client.c b/src/raft/client.c index cd88f1d2b..3a7173049 100644 --- a/src/raft/client.c +++ b/src/raft/client.c @@ -7,7 +7,7 @@ #include "log.h" #include "membership.h" #include "progress.h" -#include "queue.h" +#include "../lib/queue.h" #include "replication.h" #include "request.h" @@ -57,7 +57,7 @@ int raft_apply(struct raft *r, err_after_log_append: logDiscard(r->log, index); - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); err: assert(rv != 0); return rv; @@ -106,7 +106,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb) err_after_log_append: logDiscard(r->log, index); - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); err_after_buf_alloc: raft_free(buf.base); err: diff --git a/src/raft/convert.c b/src/raft/convert.c index 1c4d52d25..e29adc5ee 100644 --- a/src/raft/convert.c +++ b/src/raft/convert.c @@ -9,7 +9,7 @@ #include "log.h" #include "membership.h" #include "progress.h" -#include "queue.h" +#include "../lib/queue.h" #include "replication.h" #include "request.h" @@ -93,11 +93,11 @@ static void convertClearLeader(struct raft *r) } /* Fail all outstanding requests */ - while (!QUEUE_IS_EMPTY(&r->leader_state.requests)) { + while (!queue_empty(&r->leader_state.requests)) { struct request *req; queue *head; - head = QUEUE_HEAD(&r->leader_state.requests); - QUEUE_REMOVE(head); + head = queue_head(&r->leader_state.requests); + queue_remove(head); req = QUEUE_DATA(head, struct request, queue); assert(req->type == RAFT_COMMAND || req->type == RAFT_BARRIER); switch (req->type) { @@ -209,7 +209,7 @@ int convertToLeader(struct raft *r) r->election_timer_start = r->io->time(r->io); /* Reset apply requests queue */ - QUEUE_INIT(&r->leader_state.requests); + queue_init(&r->leader_state.requests); /* Allocate and initialize the progress array. */ rv = progressBuildArray(r); diff --git a/src/raft/fixture.c b/src/raft/fixture.c index cef8cfc41..9678196e0 100644 --- a/src/raft/fixture.c +++ b/src/raft/fixture.c @@ -10,7 +10,7 @@ #include "convert.h" #include "entry.h" #include "log.h" -#include "queue.h" +#include "../lib/queue.h" #include "snapshot.h" /* Defaults */ @@ -365,7 +365,7 @@ static void ioFlushSend(struct io *io, struct send *send) src = &send->message; dst = &transmit->message; - QUEUE_PUSH(&io->requests, &transmit->queue); + queue_insert_tail(&io->requests, &transmit->queue); *dst = *src; switch (dst->type) { @@ -416,12 +416,12 @@ static void ioDestroyTransmit(struct transmit *transmit) /* Flush all requests in the queue. */ static void ioFlushAll(struct io *io) { - while (!QUEUE_IS_EMPTY(&io->requests)) { + while (!queue_empty(&io->requests)) { queue *head; struct ioRequest *r; - head = QUEUE_HEAD(&io->requests); - QUEUE_REMOVE(head); + head = queue_head(&io->requests); + queue_remove(head); r = QUEUE_DATA(head, struct ioRequest, queue); switch (r->type) { @@ -592,7 +592,7 @@ static int ioMethodAppend(struct raft_io *raft_io, req->cb = cb; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -660,7 +660,7 @@ static int ioMethodSnapshotPut(struct raft_io *raft_io, r->completion_time = *io->time + io->disk_latency; r->trailing = trailing; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -680,7 +680,7 @@ static int ioMethodAsyncWork(struct raft_io *raft_io, r->req->cb = cb; r->completion_time = *io->time + io->work_duration; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -699,7 +699,7 @@ static int ioMethodSnapshotGet(struct raft_io *raft_io, r->req->cb = cb; r->completion_time = *io->time + io->disk_latency; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -749,7 +749,7 @@ static int ioMethodSend(struct raft_io *raft_io, peer = ioGetPeer(io, message->server_id); r->completion_time = *io->time + peer->send_latency; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -881,7 +881,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) io->snapshot = NULL; io->entries = NULL; io->n = 0; - QUEUE_INIT(&io->requests); + queue_init(&io->requests); io->n_peers = 0; io->randomized_election_timeout = ELECTION_TIMEOUT + index * 100; io->network_latency = NETWORK_LATENCY; @@ -1400,7 +1400,7 @@ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t) } } assert(found); - QUEUE_REMOVE(head); + queue_remove(head); switch (r->type) { case APPEND: ioFlushAppend(io, (struct append *)r); diff --git a/src/raft/lifecycle.c b/src/raft/lifecycle.c index bd6d618c7..49d214ef7 100644 --- a/src/raft/lifecycle.c +++ b/src/raft/lifecycle.c @@ -1,6 +1,6 @@ #include "lifecycle.h" #include "../tracing.h" -#include "queue.h" +#include "../lib/queue.h" #include #include @@ -23,7 +23,7 @@ void lifecycleRequestStart(struct raft *r, struct request *req) if (reqIdIsSet(req)) { tracef("request start id:%" PRIu64, extractReqId(req)); } - QUEUE_PUSH(&r->leader_state.requests, &req->queue); + queue_insert_tail(&r->leader_state.requests, &req->queue); } void lifecycleRequestEnd(struct raft *r, struct request *req) @@ -32,5 +32,5 @@ void lifecycleRequestEnd(struct raft *r, struct request *req) if (reqIdIsSet(req)) { tracef("request end id:%" PRIu64, extractReqId(req)); } - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); } diff --git a/src/raft/queue.h b/src/raft/queue.h deleted file mode 100644 index 1262cf554..000000000 --- a/src/raft/queue.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef QUEUE_H_ -#define QUEUE_H_ - -#include - -typedef void *queue[2]; - -/* Private macros. */ -#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0])) -#define QUEUE_PREV(q) (*(queue **)&((*(q))[1])) - -#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) -#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) - -/* Initialize an empty queue. */ -#define QUEUE_INIT(q) \ - { \ - QUEUE_NEXT(q) = (q); \ - QUEUE_PREV(q) = (q); \ - } - -/* Return true if the queue has no element. */ -#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q)) - -/* Insert an element at the back of a queue. */ -#define QUEUE_PUSH(q, e) \ - { \ - QUEUE_NEXT(e) = (q); \ - QUEUE_PREV(e) = QUEUE_PREV(q); \ - QUEUE_PREV_NEXT(e) = (e); \ - QUEUE_PREV(q) = (e); \ - } - -/* Remove the given element from the queue. Any element can be removed at any * - * time. */ -#define QUEUE_REMOVE(e) \ - { \ - QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ - QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ - } - -/* Return the element at the front of the queue. */ -#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) - -/* Return the element at the back of the queue. */ -#define QUEUE_TAIL(q) (QUEUE_PREV(q)) - -/* Iterate over the element of a queue. * Mutating the queue while iterating - * results in undefined behavior. */ -#define QUEUE_FOREACH(q, e) \ - for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) - -/* Return the structure holding the given element. */ -#define QUEUE_DATA(e, type, field) \ - ((type *)((void *)((char *)(e)-offsetof(type, field)))) - -#endif /* QUEUE_H_*/ diff --git a/src/raft/replication.c b/src/raft/replication.c index 8310feb8b..feecfd8f7 100644 --- a/src/raft/replication.c +++ b/src/raft/replication.c @@ -15,7 +15,7 @@ #include "log.h" #include "membership.h" #include "progress.h" -#include "queue.h" +#include "../lib/queue.h" #include "replication.h" #include "request.h" #include "snapshot.h" diff --git a/src/raft/request.h b/src/raft/request.h index 08ad4a36b..86dc21e3d 100644 --- a/src/raft/request.h +++ b/src/raft/request.h @@ -10,7 +10,7 @@ struct request void *data; int type; raft_index index; - void *queue[2]; + queue queue; uint8_t req_id[16]; uint8_t client_id[16]; uint8_t unique_id[16]; diff --git a/src/raft/state.c b/src/raft/state.c index af46d76d9..ace3955c2 100644 --- a/src/raft/state.c +++ b/src/raft/state.c @@ -2,7 +2,7 @@ #include "configuration.h" #include "election.h" #include "log.h" -#include "queue.h" +#include "../lib/queue.h" int raft_state(struct raft *r) { diff --git a/src/raft/uv.c b/src/raft/uv.c index c0602c5d3..f1450e742 100644 --- a/src/raft/uv.c +++ b/src/raft/uv.c @@ -172,10 +172,10 @@ void uvMaybeFireCloseCb(struct uv *uv) if (uv->timer.data != NULL) { return; } - if (!QUEUE_IS_EMPTY(&uv->append_segments)) { + if (!queue_empty(&uv->append_segments)) { return; } - if (!QUEUE_IS_EMPTY(&uv->finalize_reqs)) { + if (!queue_empty(&uv->finalize_reqs)) { return; } if (uv->finalize_work.data != NULL) { @@ -190,13 +190,13 @@ void uvMaybeFireCloseCb(struct uv *uv) if (uv->snapshot_put_work.data != NULL) { return; } - if (!QUEUE_IS_EMPTY(&uv->snapshot_get_reqs)) { + if (!queue_empty(&uv->snapshot_get_reqs)) { return; } - if (!QUEUE_IS_EMPTY(&uv->async_work_reqs)) { + if (!queue_empty(&uv->async_work_reqs)) { return; } - if (!QUEUE_IS_EMPTY(&uv->aborting)) { + if (!queue_empty(&uv->aborting)) { return; } @@ -698,28 +698,28 @@ int raft_uv_init(struct raft_io *io, #endif uv->segment_size = UV__MAX_SEGMENT_SIZE; uv->block_size = 0; - QUEUE_INIT(&uv->clients); - QUEUE_INIT(&uv->servers); + queue_init(&uv->clients); + queue_init(&uv->servers); uv->connect_retry_delay = CONNECT_RETRY_DELAY; uv->prepare_inflight = NULL; - QUEUE_INIT(&uv->prepare_reqs); - QUEUE_INIT(&uv->prepare_pool); + queue_init(&uv->prepare_reqs); + queue_init(&uv->prepare_pool); uv->prepare_next_counter = 1; uv->append_next_index = 1; - QUEUE_INIT(&uv->append_segments); - QUEUE_INIT(&uv->append_pending_reqs); - QUEUE_INIT(&uv->append_writing_reqs); + queue_init(&uv->append_segments); + queue_init(&uv->append_pending_reqs); + queue_init(&uv->append_writing_reqs); uv->barrier = NULL; - QUEUE_INIT(&uv->finalize_reqs); + queue_init(&uv->finalize_reqs); uv->finalize_work.data = NULL; uv->truncate_work.data = NULL; - QUEUE_INIT(&uv->snapshot_get_reqs); - QUEUE_INIT(&uv->async_work_reqs); + queue_init(&uv->snapshot_get_reqs); + queue_init(&uv->async_work_reqs); uv->snapshot_put_work.data = NULL; uv->timer.data = NULL; uv->tick_cb = NULL; /* Set by raft_io->start() */ uv->recv_cb = NULL; /* Set by raft_io->start() */ - QUEUE_INIT(&uv->aborting); + queue_init(&uv->aborting); uv->closing = false; uv->close_cb = NULL; uv->auto_recovery = true; diff --git a/src/raft/uv.h b/src/raft/uv.h index db4009c64..3d0fd342b 100644 --- a/src/raft/uv.h +++ b/src/raft/uv.h @@ -6,7 +6,7 @@ #include "../raft.h" #include "../tracing.h" #include "err.h" -#include "queue.h" +#include "../lib/queue.h" #include "uv_fs.h" #include "uv_os.h" diff --git a/src/raft/uv_append.c b/src/raft/uv_append.c index 9feb66ca5..42756c451 100644 --- a/src/raft/uv_append.c +++ b/src/raft/uv_append.c @@ -1,7 +1,7 @@ #include "assert.h" #include "byte.h" #include "heap.h" -#include "queue.h" +#include "../lib/queue.h" #include "uv.h" #include "uv_encoding.h" #include "uv_writer.h" @@ -92,7 +92,7 @@ static void uvAliveSegmentFinalize(struct uvAliveSegment *s) * close the file handle and release the segment memory. */ } - QUEUE_REMOVE(&s->queue); + queue_remove(&s->queue); UvWriterClose(&s->writer, uvAliveSegmentWriterCloseCb); } @@ -102,10 +102,10 @@ static void uvAppendFinishRequestsInQueue(struct uv *uv, queue *q, int status) { queue queue_copy; struct uvAppend *append; - QUEUE_INIT(&queue_copy); - while (!QUEUE_IS_EMPTY(q)) { + queue_init(&queue_copy); + while (!queue_empty(q)) { queue *head; - head = QUEUE_HEAD(q); + head = queue_head(q); append = QUEUE_DATA(head, struct uvAppend, queue); /* Rollback the append next index if the result was * unsuccessful. */ @@ -116,15 +116,15 @@ static void uvAppendFinishRequestsInQueue(struct uv *uv, queue *q, int status) tracef("rollback uv->append_next_index now:%llu", uv->append_next_index); } - QUEUE_REMOVE(head); - QUEUE_PUSH(&queue_copy, head); + queue_remove(head); + queue_insert_tail(&queue_copy, head); } - while (!QUEUE_IS_EMPTY(&queue_copy)) { + while (!queue_empty(&queue_copy)) { queue *head; struct raft_io_append *req; - head = QUEUE_HEAD(&queue_copy); + head = queue_head(&queue_copy); append = QUEUE_DATA(head, struct uvAppend, queue); - QUEUE_REMOVE(head); + queue_remove(head); req = append->req; RaftHeapFree(append); req->cb(req, status); @@ -150,10 +150,10 @@ static void uvAppendFinishPendingRequests(struct uv *uv, int status) static struct uvAliveSegment *uvGetCurrentAliveSegment(struct uv *uv) { queue *head; - if (QUEUE_IS_EMPTY(&uv->append_segments)) { + if (queue_empty(&uv->append_segments)) { return NULL; } - head = QUEUE_HEAD(&uv->append_segments); + head = queue_head(&uv->append_segments); return QUEUE_DATA(head, struct uvAliveSegment, queue); } @@ -282,14 +282,14 @@ static void uvAliveSegmentWriteCb(struct UvWriterReq *write, const int status) /* During the closing sequence we should have already canceled all * pending request. */ if (uv->closing) { - assert(QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(queue_empty(&uv->append_pending_reqs)); assert(s->finalize); uvAliveSegmentFinalize(s); return; } /* Possibly process waiting requests. */ - if (!QUEUE_IS_EMPTY(&uv->append_pending_reqs)) { + if (!queue_empty(&uv->append_pending_reqs)) { rv = uvAppendMaybeStart(uv); if (rv != 0) { uv->errored = true; @@ -338,10 +338,10 @@ static int uvAppendMaybeStart(struct uv *uv) int rv; assert(!uv->closing); - assert(!QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(!queue_empty(&uv->append_pending_reqs)); /* If we are already writing, let's wait. */ - if (!QUEUE_IS_EMPTY(&uv->append_writing_reqs)) { + if (!queue_empty(&uv->append_writing_reqs)) { return 0; } @@ -372,18 +372,18 @@ static int uvAppendMaybeStart(struct uv *uv) /* Let's add to the segment's write buffer all pending requests targeted * to this segment. */ - QUEUE_INIT(&q); + queue_init(&q); n_reqs = 0; - while (!QUEUE_IS_EMPTY(&uv->append_pending_reqs)) { - head = QUEUE_HEAD(&uv->append_pending_reqs); + while (!queue_empty(&uv->append_pending_reqs)) { + head = queue_head(&uv->append_pending_reqs); append = QUEUE_DATA(head, struct uvAppend, queue); assert(append->segment != NULL); if (append->segment != segment) { break; /* Not targeted to this segment */ } - QUEUE_REMOVE(head); - QUEUE_PUSH(&q, head); + queue_remove(head); + queue_insert_tail(&q, head); n_reqs++; rv = uvAliveSegmentEncodeEntriesToWriteBuf(segment, append); if (rv != 0) { @@ -397,21 +397,21 @@ static int uvAppendMaybeStart(struct uv *uv) * request, in that case we need to wait for it). Otherwise it must mean * we have exhausted the queue of pending append requests. */ if (n_reqs == 0) { - assert(QUEUE_IS_EMPTY(&uv->append_writing_reqs)); + assert(queue_empty(&uv->append_writing_reqs)); if (segment->finalize) { uvAliveSegmentFinalize(segment); - if (!QUEUE_IS_EMPTY(&uv->append_pending_reqs)) { + if (!queue_empty(&uv->append_pending_reqs)) { goto start; } } - assert(QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(queue_empty(&uv->append_pending_reqs)); return 0; } - while (!QUEUE_IS_EMPTY(&q)) { - head = QUEUE_HEAD(&q); - QUEUE_REMOVE(head); - QUEUE_PUSH(&uv->append_writing_reqs, head); + while (!queue_empty(&q)) { + head = queue_head(&q); + queue_remove(head); + queue_insert_tail(&uv->append_writing_reqs, head); } rv = uvAliveSegmentWrite(segment); @@ -457,7 +457,7 @@ static void uvAliveSegmentPrepareCb(struct uvPrepare *req, int status) /* If we have been closed, let's discard the segment. */ if (uv->closing) { - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); assert(status == RAFT_CANCELED); /* UvPrepare cancels pending reqs */ uvSegmentBufferClose(&segment->pending); @@ -476,7 +476,7 @@ static void uvAliveSegmentPrepareCb(struct uvPrepare *req, int status) /* There must be pending appends that were waiting for this prepare * requests. */ - assert(!QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(!queue_empty(&uv->append_pending_reqs)); rv = uvAliveSegmentReady(uv, req->fd, req->counter, segment); if (rv != 0) { @@ -493,7 +493,7 @@ static void uvAliveSegmentPrepareCb(struct uvPrepare *req, int status) return; err: - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); RaftHeapFree(segment); uv->errored = true; uvAppendFinishPendingRequests(uv, rv); @@ -535,7 +535,7 @@ static int uvAppendPushAliveSegment(struct uv *uv) } uvAliveSegmentInit(segment, uv); - QUEUE_PUSH(&uv->append_segments, &segment->queue); + queue_insert_tail(&uv->append_segments, &segment->queue); rv = UvPrepare(uv, &fd, &counter, &segment->prepare, uvAliveSegmentPrepareCb); @@ -557,7 +557,7 @@ static int uvAppendPushAliveSegment(struct uv *uv) UvOsClose(fd); UvFinalize(uv, counter, 0, 0, 0); err_after_alloc: - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); RaftHeapFree(segment); err: assert(rv != 0); @@ -568,10 +568,10 @@ static int uvAppendPushAliveSegment(struct uv *uv) static struct uvAliveSegment *uvGetLastAliveSegment(struct uv *uv) { queue *tail; - if (QUEUE_IS_EMPTY(&uv->append_segments)) { + if (queue_empty(&uv->append_segments)) { return NULL; } - tail = QUEUE_TAIL(&uv->append_segments); + tail = queue_tail(&uv->append_segments); return QUEUE_DATA(tail, struct uvAliveSegment, queue); } @@ -647,7 +647,7 @@ static int uvAppendEnqueueRequest(struct uv *uv, struct uvAppend *append) uvAliveSegmentReserveSegmentCapacity(segment, size); append->segment = segment; - QUEUE_PUSH(&uv->append_pending_reqs, &append->queue); + queue_insert_tail(&uv->append_pending_reqs, &append->queue); uv->append_next_index += append->n; tracef("set uv->append_next_index %llu", uv->append_next_index); @@ -711,7 +711,7 @@ int UvAppend(struct raft_io *io, } assert(append->segment != NULL); - assert(!QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(!queue_empty(&uv->append_pending_reqs)); /* Try to write immediately. */ rv = uvAppendMaybeStart(uv); @@ -753,7 +753,7 @@ static void uvFinalizeCurrentAliveSegmentOnceIdle(struct uv *uv) break; } } - has_writing_reqs = !QUEUE_IS_EMPTY(&uv->append_writing_reqs); + has_writing_reqs = !queue_empty(&uv->append_writing_reqs); /* If there is no pending append request or inflight write against the * current segment, we can submit a request for it to be closed @@ -792,11 +792,11 @@ bool UvBarrierMaybeTrigger(struct UvBarrier *barrier) return false; } - if (!QUEUE_IS_EMPTY(&barrier->reqs)) { + if (!queue_empty(&barrier->reqs)) { queue *head; struct UvBarrierReq *r; - head = QUEUE_HEAD(&barrier->reqs); - QUEUE_REMOVE(head); + head = queue_head(&barrier->reqs); + queue_remove(head); r = QUEUE_DATA(head, struct UvBarrierReq, queue); r->cb(r); return true; @@ -821,7 +821,7 @@ static struct UvBarrier *uvBarrierCreate(void) return NULL; } barrier->blocking = false; - QUEUE_INIT(&barrier->reqs); + queue_init(&barrier->reqs); return barrier; } @@ -902,8 +902,8 @@ int UvBarrier(struct uv *uv, raft_index next_index, struct UvBarrierReq *req) * the callback immediately. * * TODO: find a way to avoid invoking this synchronously. */ - if (QUEUE_IS_EMPTY(&uv->append_segments) && - QUEUE_IS_EMPTY(&uv->finalize_reqs) && + if (queue_empty(&uv->append_segments) && + queue_empty(&uv->finalize_reqs) && uv->finalize_work.data == NULL) { /* Not interested in return value. */ UvBarrierMaybeTrigger(barrier); @@ -930,7 +930,7 @@ void UvUnblock(struct uv *uv) uvMaybeFireCloseCb(uv); return; } - if (!QUEUE_IS_EMPTY(&uv->append_pending_reqs)) { + if (!queue_empty(&uv->append_pending_reqs)) { int rv; rv = uvAppendMaybeStart(uv); if (rv != 0) { @@ -945,7 +945,7 @@ void UvBarrierAddReq(struct UvBarrier *barrier, struct UvBarrierReq *req) assert(req != NULL); /* Once there's a blocking req, this barrier becomes blocking. */ barrier->blocking |= req->blocking; - QUEUE_PUSH(&barrier->reqs, &req->queue); + queue_insert_tail(&barrier->reqs, &req->queue); } /* Fire all pending barrier requests, the barrier callback will notice that @@ -1022,7 +1022,7 @@ void uvAppendClose(struct uv *uv) /* Also finalize the segments that we didn't write at all and are just * sitting in the append_segments queue waiting for writes against the * current segment to complete. */ - while (!QUEUE_IS_EMPTY(&uv->append_segments)) { + while (!queue_empty(&uv->append_segments)) { segment = uvGetLastAliveSegment(uv); assert(segment != NULL); if (segment == uvGetCurrentAliveSegment(uv)) { diff --git a/src/raft/uv_finalize.c b/src/raft/uv_finalize.c index 638b551ad..beb9d4c00 100644 --- a/src/raft/uv_finalize.c +++ b/src/raft/uv_finalize.c @@ -1,6 +1,6 @@ #include "assert.h" #include "heap.h" -#include "queue.h" +#include "../lib/queue.h" #include "uv.h" #include "uv_os.h" @@ -88,7 +88,7 @@ static void uvFinalizeAfterWorkCb(uv_work_t *work, int status) /* If we have no more dismissed segments to close, check if there's a * barrier to unblock or if we are done closing. */ - if (QUEUE_IS_EMPTY(&uv->finalize_reqs)) { + if (queue_empty(&uv->finalize_reqs)) { tracef("unblock barrier or close"); if (uv->barrier != NULL && UvBarrierReady(uv)) { UvBarrierMaybeTrigger(uv->barrier); @@ -98,9 +98,9 @@ static void uvFinalizeAfterWorkCb(uv_work_t *work, int status) } /* Grab a new dismissed segment to close. */ - head = QUEUE_HEAD(&uv->finalize_reqs); + head = queue_head(&uv->finalize_reqs); segment = QUEUE_DATA(head, struct uvDyingSegment, queue); - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); rv = uvFinalizeStart(segment); if (rv != 0) { @@ -160,7 +160,7 @@ int UvFinalize(struct uv *uv, /* If we're already processing a segment, let's put the request in the * queue and wait. */ if (uv->finalize_work.data != NULL) { - QUEUE_PUSH(&uv->finalize_reqs, &segment->queue); + queue_insert_tail(&uv->finalize_reqs, &segment->queue); return 0; } diff --git a/src/raft/uv_prepare.c b/src/raft/uv_prepare.c index 00355480f..199da6f29 100644 --- a/src/raft/uv_prepare.c +++ b/src/raft/uv_prepare.c @@ -77,12 +77,12 @@ static void uvPrepareWorkCb(uv_work_t *work) * status. */ static void uvPrepareFinishAllRequests(struct uv *uv, int status) { - while (!QUEUE_IS_EMPTY(&uv->prepare_reqs)) { + while (!queue_empty(&uv->prepare_reqs)) { queue *head; struct uvPrepare *req; - head = QUEUE_HEAD(&uv->prepare_reqs); + head = queue_head(&uv->prepare_reqs); req = QUEUE_DATA(head, struct uvPrepare, queue); - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); req->cb(req, status); } } @@ -94,10 +94,10 @@ static void uvPrepareConsume(struct uv *uv, uv_file *fd, uvCounter *counter) queue *head; struct uvIdleSegment *segment; /* Pop a segment from the pool. */ - head = QUEUE_HEAD(&uv->prepare_pool); + head = queue_head(&uv->prepare_pool); segment = QUEUE_DATA(head, struct uvIdleSegment, queue); assert(segment->fd >= 0); - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); *fd = segment->fd; *counter = segment->counter; RaftHeapFree(segment); @@ -111,13 +111,13 @@ static void uvPrepareFinishOldestRequest(struct uv *uv) struct uvPrepare *req; assert(!uv->closing); - assert(!QUEUE_IS_EMPTY(&uv->prepare_reqs)); - assert(!QUEUE_IS_EMPTY(&uv->prepare_pool)); + assert(!queue_empty(&uv->prepare_reqs)); + assert(!queue_empty(&uv->prepare_pool)); /* Pop the head of the prepare requests queue. */ - head = QUEUE_HEAD(&uv->prepare_reqs); + head = queue_head(&uv->prepare_reqs); req = QUEUE_DATA(head, struct uvPrepare, queue); - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); /* Finish the request */ uvPrepareConsume(uv, &req->fd, &req->counter); @@ -199,8 +199,8 @@ static void uvPrepareAfterWorkCb(uv_work_t *work, int status) /* If we are closing, let's discard the segment. All pending requests * have already being fired with RAFT_CANCELED. */ if (uv->closing) { - assert(QUEUE_IS_EMPTY(&uv->prepare_pool)); - assert(QUEUE_IS_EMPTY(&uv->prepare_reqs)); + assert(queue_empty(&uv->prepare_pool)); + assert(queue_empty(&uv->prepare_reqs)); if (segment->status == 0) { char errmsg[RAFT_ERRMSG_BUF_SIZE]; UvOsClose(segment->fd); @@ -218,7 +218,7 @@ static void uvPrepareAfterWorkCb(uv_work_t *work, int status) * Note that if there's no pending request, we don't set the error * message, to avoid overwriting previous errors. */ if (segment->status != 0) { - if (!QUEUE_IS_EMPTY(&uv->prepare_reqs)) { + if (!queue_empty(&uv->prepare_reqs)) { ErrMsgTransferf(segment->errmsg, uv->io->errmsg, "create segment %s", segment->filename); uvPrepareFinishAllRequests(uv, segment->status); @@ -231,10 +231,10 @@ static void uvPrepareAfterWorkCb(uv_work_t *work, int status) assert(segment->fd >= 0); tracef("completed creation of %s", segment->filename); - QUEUE_PUSH(&uv->prepare_pool, &segment->queue); + queue_insert_tail(&uv->prepare_pool, &segment->queue); /* Let's process any pending request. */ - if (!QUEUE_IS_EMPTY(&uv->prepare_reqs)) { + if (!queue_empty(&uv->prepare_reqs)) { uvPrepareFinishOldestRequest(uv); } @@ -249,7 +249,7 @@ static void uvPrepareAfterWorkCb(uv_work_t *work, int status) * above, thus reducing the pool size and making it smaller than the * target size. */ if (uvPrepareCount(uv) >= UV__TARGET_POOL_SIZE) { - assert(QUEUE_IS_EMPTY(&uv->prepare_reqs)); + assert(queue_empty(&uv->prepare_reqs)); return; } @@ -284,7 +284,7 @@ int UvPrepare(struct uv *uv, assert(!uv->closing); - if (!QUEUE_IS_EMPTY(&uv->prepare_pool)) { + if (!queue_empty(&uv->prepare_pool)) { uvPrepareConsume(uv, fd, counter); goto maybe_start; } @@ -292,7 +292,7 @@ int UvPrepare(struct uv *uv, *fd = -1; *counter = 0; req->cb = cb; - QUEUE_PUSH(&uv->prepare_reqs, &req->queue); + queue_insert_tail(&uv->prepare_reqs, &req->queue); maybe_start: /* If we are already creating a segment, let's just wait. */ @@ -311,7 +311,7 @@ int UvPrepare(struct uv *uv, if (*fd != -1) { uvPrepareDiscard(uv, *fd, *counter); } else { - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); } assert(rv != 0); return rv; @@ -325,12 +325,12 @@ void UvPrepareClose(struct uv *uv) uvPrepareFinishAllRequests(uv, RAFT_CANCELED); /* Remove any unused prepared segment. */ - while (!QUEUE_IS_EMPTY(&uv->prepare_pool)) { + while (!queue_empty(&uv->prepare_pool)) { queue *head; struct uvIdleSegment *segment; - head = QUEUE_HEAD(&uv->prepare_pool); + head = queue_head(&uv->prepare_pool); segment = QUEUE_DATA(head, struct uvIdleSegment, queue); - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); uvPrepareDiscard(uv, segment->fd, segment->counter); RaftHeapFree(segment); } diff --git a/src/raft/uv_recv.c b/src/raft/uv_recv.c index 72d6f5ec6..fe93a21ef 100644 --- a/src/raft/uv_recv.c +++ b/src/raft/uv_recv.c @@ -78,13 +78,13 @@ static int uvServerInit(struct uvServer *s, s->message.type = 0; s->payload.base = NULL; s->payload.len = 0; - QUEUE_PUSH(&uv->servers, &s->queue); + queue_insert_tail(&uv->servers, &s->queue); return 0; } static void uvServerDestroy(struct uvServer *s) { - QUEUE_REMOVE(&s->queue); + queue_remove(&s->queue); if (s->header.base != NULL) { /* This means we were interrupted while reading the header. */ @@ -179,8 +179,8 @@ static void uvServerStreamCloseCb(uv_handle_t *handle) static void uvServerAbort(struct uvServer *s) { struct uv *uv = s->uv; - QUEUE_REMOVE(&s->queue); - QUEUE_PUSH(&uv->aborting, &s->queue); + queue_remove(&s->queue); + queue_insert_tail(&uv->aborting, &s->queue); uv_close((struct uv_handle_s *)s->stream, uvServerStreamCloseCb); } @@ -411,10 +411,10 @@ int UvRecvStart(struct uv *uv) void UvRecvClose(struct uv *uv) { - while (!QUEUE_IS_EMPTY(&uv->servers)) { + while (!queue_empty(&uv->servers)) { queue *head; struct uvServer *server; - head = QUEUE_HEAD(&uv->servers); + head = queue_head(&uv->servers); server = QUEUE_DATA(head, struct uvServer, queue); uvServerAbort(server); } diff --git a/src/raft/uv_send.c b/src/raft/uv_send.c index 86133542c..b4a4aad78 100644 --- a/src/raft/uv_send.c +++ b/src/raft/uv_send.c @@ -95,9 +95,9 @@ static int uvClientInit(struct uvClient *c, rv = uv_timer_init(c->uv->loop, &c->timer); assert(rv == 0); strcpy(c->address, address); - QUEUE_INIT(&c->pending); + queue_init(&c->pending); c->closing = false; - QUEUE_PUSH(&uv->clients, &c->queue); + queue_insert_tail(&uv->clients, &c->queue); return 0; } @@ -119,13 +119,13 @@ static void uvClientMaybeDestroy(struct uvClient *c) return; } - while (!QUEUE_IS_EMPTY(&c->pending)) { + while (!queue_empty(&c->pending)) { queue *head; struct uvSend *send; struct raft_io_send *req; - head = QUEUE_HEAD(&c->pending); + head = queue_head(&c->pending); send = QUEUE_DATA(head, struct uvSend, queue); - QUEUE_REMOVE(head); + queue_remove(head); req = send->req; uvSendDestroy(send); if (req->cb != NULL) { @@ -133,7 +133,7 @@ static void uvClientMaybeDestroy(struct uvClient *c) } } - QUEUE_REMOVE(&c->queue); + queue_remove(&c->queue); assert(c->address != NULL); RaftHeapFree(c->address); @@ -210,7 +210,7 @@ static int uvClientSend(struct uvClient *c, struct uvSend *send) /* If there's no connection available, let's queue the request. */ if (c->stream == NULL) { tracef("no connection available -> enqueue message"); - QUEUE_PUSH(&c->pending, &send->queue); + queue_insert_tail(&c->pending, &send->queue); return 0; } @@ -234,12 +234,12 @@ static void uvClientSendPending(struct uvClient *c) int rv; assert(c->stream != NULL); tracef("send pending messages"); - while (!QUEUE_IS_EMPTY(&c->pending)) { + while (!queue_empty(&c->pending)) { queue *head; struct uvSend *send; - head = QUEUE_HEAD(&c->pending); + head = queue_head(&c->pending); send = QUEUE_DATA(head, struct uvSend, queue); - QUEUE_REMOVE(head); + queue_remove(head); rv = uvClientSend(c, send); if (rv != 0) { if (send->req->cb != NULL) { @@ -322,9 +322,9 @@ static void uvClientConnectCb(struct raft_uv_connect *req, queue *head; struct uvSend *old_send; struct raft_io_send *old_req; - head = QUEUE_HEAD(&c->pending); + head = queue_head(&c->pending); old_send = QUEUE_DATA(head, struct uvSend, queue); - QUEUE_REMOVE(head); + queue_remove(head); old_req = old_send->req; uvSendDestroy(old_send); if (old_req->cb != NULL) { @@ -384,8 +384,8 @@ static void uvClientAbort(struct uvClient *c) uv_is_active((struct uv_handle_s *)&c->timer) || c->connect.data != NULL); - QUEUE_REMOVE(&c->queue); - QUEUE_PUSH(&uv->aborting, &c->queue); + queue_remove(&c->queue); + queue_insert_tail(&uv->aborting, &c->queue); rv = uv_timer_stop(&c->timer); assert(rv == 0); @@ -507,10 +507,10 @@ int UvSend(struct raft_io *io, void UvSendClose(struct uv *uv) { assert(uv->closing); - while (!QUEUE_IS_EMPTY(&uv->clients)) { + while (!queue_empty(&uv->clients)) { queue *head; struct uvClient *client; - head = QUEUE_HEAD(&uv->clients); + head = queue_head(&uv->clients); client = QUEUE_DATA(head, struct uvClient, queue); uvClientAbort(client); } diff --git a/src/raft/uv_snapshot.c b/src/raft/uv_snapshot.c index d4b8910d1..343d1ce07 100644 --- a/src/raft/uv_snapshot.c +++ b/src/raft/uv_snapshot.c @@ -751,7 +751,7 @@ static void uvSnapshotGetAfterWorkCb(uv_work_t *work, int status) int req_status = get->status; struct uv *uv = get->uv; assert(status == 0); - QUEUE_REMOVE(&get->queue); + queue_remove(&get->queue); RaftHeapFree(get); req->cb(req, snapshot, req_status); uvMaybeFireCloseCb(uv); @@ -784,11 +784,11 @@ int UvSnapshotGet(struct raft_io *io, } get->work.data = get; - QUEUE_PUSH(&uv->snapshot_get_reqs, &get->queue); + queue_insert_tail(&uv->snapshot_get_reqs, &get->queue); rv = uv_queue_work(uv->loop, &get->work, uvSnapshotGetWorkCb, uvSnapshotGetAfterWorkCb); if (rv != 0) { - QUEUE_REMOVE(&get->queue); + queue_remove(&get->queue); tracef("get last snapshot: %s", uv_strerror(rv)); rv = RAFT_IOERR; goto err_after_snapshot_alloc; diff --git a/src/raft/uv_tcp.c b/src/raft/uv_tcp.c index 4196b9f56..1455958e0 100644 --- a/src/raft/uv_tcp.c +++ b/src/raft/uv_tcp.c @@ -40,9 +40,9 @@ void UvTcpMaybeFireCloseCb(struct UvTcp *t) return; } - assert(QUEUE_IS_EMPTY(&t->accepting)); - assert(QUEUE_IS_EMPTY(&t->connecting)); - if (!QUEUE_IS_EMPTY(&t->aborting)) { + assert(queue_empty(&t->accepting)); + assert(queue_empty(&t->connecting)); + if (!queue_empty(&t->aborting)) { return; } @@ -82,9 +82,9 @@ int raft_uv_tcp_init(struct raft_uv_transport *transport, t->listeners = NULL; t->n_listeners = 0; t->accept_cb = NULL; - QUEUE_INIT(&t->accepting); - QUEUE_INIT(&t->connecting); - QUEUE_INIT(&t->aborting); + queue_init(&t->accepting); + queue_init(&t->connecting); + queue_init(&t->aborting); t->closing = false; t->close_cb = NULL; diff --git a/src/raft/uv_tcp.h b/src/raft/uv_tcp.h index 924f4f5b3..ed475f89a 100644 --- a/src/raft/uv_tcp.h +++ b/src/raft/uv_tcp.h @@ -2,7 +2,7 @@ #define UV_TCP_H_ #include "../raft.h" -#include "queue.h" +#include "../lib/queue.h" /* Protocol version. */ #define UV__TCP_HANDSHAKE_PROTOCOL 1 diff --git a/src/raft/uv_tcp_connect.c b/src/raft/uv_tcp_connect.c index e493d14a8..57ec3c0ef 100644 --- a/src/raft/uv_tcp_connect.c +++ b/src/raft/uv_tcp_connect.c @@ -77,7 +77,7 @@ static void uvTcpConnectFinish(struct uvTcpConnect *connect) struct uv_stream_s *stream = (struct uv_stream_s *)connect->tcp; struct raft_uv_connect *req = connect->req; int status = connect->status; - QUEUE_REMOVE(&connect->queue); + queue_remove(&connect->queue); RaftHeapFree(connect->handshake.base); uv_freeaddrinfo(connect->getaddrinfo.addrinfo); raft_free(connect); @@ -101,8 +101,8 @@ static void uvTcpConnectUvCloseCb(struct uv_handle_s *handle) /* Abort a connection request. */ static void uvTcpConnectAbort(struct uvTcpConnect *connect) { - QUEUE_REMOVE(&connect->queue); - QUEUE_PUSH(&connect->t->aborting, &connect->queue); + queue_remove(&connect->queue); + queue_insert_tail(&connect->t->aborting, &connect->queue); uv_cancel((struct uv_req_s *)&connect->getaddrinfo); /* Call uv_close on the tcp handle, if there is no getaddrinfo request * in flight and the handle is not currently closed due to next IP @@ -353,7 +353,7 @@ int UvTcpConnect(struct raft_uv_transport *transport, req->cb = cb; /* Keep track of the pending request */ - QUEUE_PUSH(&t->connecting, &r->queue); + queue_insert_tail(&t->connecting, &r->queue); /* Start connecting */ rv = uvTcpConnectStart(r, address); @@ -364,7 +364,7 @@ int UvTcpConnect(struct raft_uv_transport *transport, return 0; err_after_alloc: - QUEUE_REMOVE(&r->queue); + queue_remove(&r->queue); RaftHeapFree(r); err: return rv; @@ -372,10 +372,10 @@ int UvTcpConnect(struct raft_uv_transport *transport, void UvTcpConnectClose(struct UvTcp *t) { - while (!QUEUE_IS_EMPTY(&t->connecting)) { + while (!queue_empty(&t->connecting)) { struct uvTcpConnect *connect; queue *head; - head = QUEUE_HEAD(&t->connecting); + head = queue_head(&t->connecting); connect = QUEUE_DATA(head, struct uvTcpConnect, queue); uvTcpConnectAbort(connect); } diff --git a/src/raft/uv_tcp_listen.c b/src/raft/uv_tcp_listen.c index 41b6ca1ad..c7683d431 100644 --- a/src/raft/uv_tcp_listen.c +++ b/src/raft/uv_tcp_listen.c @@ -69,7 +69,7 @@ static void uvTcpIncomingCloseCb(struct uv_handle_s *handle) { struct uvTcpIncoming *incoming = handle->data; struct UvTcp *t = incoming->t; - QUEUE_REMOVE(&incoming->queue); + queue_remove(&incoming->queue); if (incoming->handshake.address.base != NULL) { RaftHeapFree(incoming->handshake.address.base); } @@ -84,8 +84,8 @@ static void uvTcpIncomingAbort(struct uvTcpIncoming *incoming) struct UvTcp *t = incoming->t; /* After uv_close() returns we are guaranteed that no more alloc_cb or * read_cb will be called. */ - QUEUE_REMOVE(&incoming->queue); - QUEUE_PUSH(&t->aborting, &incoming->queue); + queue_remove(&incoming->queue); + queue_insert_tail(&t->aborting, &incoming->queue); uv_close((struct uv_handle_s *)incoming->tcp, uvTcpIncomingCloseCb); } @@ -143,7 +143,7 @@ static void uvTcpIncomingReadCbAddress(uv_stream_t *stream, assert(rv == 0); id = byteFlip64(incoming->handshake.preamble[1]); address = incoming->handshake.address.base; - QUEUE_REMOVE(&incoming->queue); + queue_remove(&incoming->queue); incoming->t->accept_cb(incoming->t->transport, id, address, (struct uv_stream_s *)incoming->tcp); RaftHeapFree(incoming->handshake.address.base); @@ -274,7 +274,7 @@ static void uvTcpListenCb(struct uv_stream_s *stream, int status) incoming->listener = (struct uv_tcp_s *)stream; incoming->tcp = NULL; - QUEUE_PUSH(&t->accepting, &incoming->queue); + queue_insert_tail(&t->accepting, &incoming->queue); rv = uvTcpIncomingStart(incoming); if (rv != 0) { @@ -284,7 +284,7 @@ static void uvTcpListenCb(struct uv_stream_s *stream, int status) return; err_after_accept_alloc: - QUEUE_REMOVE(&incoming->queue); + queue_remove(&incoming->queue); RaftHeapFree(incoming); err: assert(rv != 0); @@ -411,9 +411,9 @@ void UvTcpListenClose(struct UvTcp *t) queue *head; assert(t->closing); - while (!QUEUE_IS_EMPTY(&t->accepting)) { + while (!queue_empty(&t->accepting)) { struct uvTcpIncoming *incoming; - head = QUEUE_HEAD(&t->accepting); + head = queue_head(&t->accepting); incoming = QUEUE_DATA(head, struct uvTcpIncoming, queue); uvTcpIncomingAbort(incoming); } diff --git a/src/raft/uv_truncate.c b/src/raft/uv_truncate.c index 51bd84fcb..4365a1b96 100644 --- a/src/raft/uv_truncate.c +++ b/src/raft/uv_truncate.c @@ -137,8 +137,8 @@ static void uvTruncateBarrierCb(struct UvBarrierReq *barrier) return; } - assert(QUEUE_IS_EMPTY(&uv->append_writing_reqs)); - assert(QUEUE_IS_EMPTY(&uv->finalize_reqs)); + assert(queue_empty(&uv->append_writing_reqs)); + assert(queue_empty(&uv->finalize_reqs)); assert(uv->finalize_work.data == NULL); assert(uv->truncate_work.data == NULL); diff --git a/src/raft/uv_work.c b/src/raft/uv_work.c index 5b6431b97..42777462f 100644 --- a/src/raft/uv_work.c +++ b/src/raft/uv_work.c @@ -28,7 +28,7 @@ static void uvAsyncAfterWorkCb(uv_work_t *work, int status) struct uv *uv = w->uv; assert(status == 0); - QUEUE_REMOVE(&w->queue); + queue_remove(&w->queue); RaftHeapFree(w); req->cb(req, req_status); uvMaybeFireCloseCb(uv); @@ -56,11 +56,11 @@ int UvAsyncWork(struct raft_io *io, async_work->work.data = async_work; req->cb = cb; - QUEUE_PUSH(&uv->async_work_reqs, &async_work->queue); + queue_insert_tail(&uv->async_work_reqs, &async_work->queue); rv = uv_queue_work(uv->loop, &async_work->work, uvAsyncWorkCb, uvAsyncAfterWorkCb); if (rv != 0) { - QUEUE_REMOVE(&async_work->queue); + queue_remove(&async_work->queue); tracef("async work: %s", uv_strerror(rv)); rv = RAFT_IOERR; goto err_after_req_alloc; diff --git a/src/raft/uv_writer.c b/src/raft/uv_writer.c index b489765b6..840d9a9e9 100644 --- a/src/raft/uv_writer.c +++ b/src/raft/uv_writer.c @@ -33,7 +33,7 @@ static void uvWriterReqSetStatus(struct UvWriterReq *req, int result) * callback if set. */ static void uvWriterReqFinish(struct UvWriterReq *req) { - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); if (req->status != 0) { uvWriterReqTransferErrMsg(req); } @@ -219,10 +219,10 @@ static void uvWriterPollCb(uv_poll_t *poller, int status, int events) return; fail_requests: - while (!QUEUE_IS_EMPTY(&w->poll_queue)) { + while (!queue_empty(&w->poll_queue)) { queue *head; struct UvWriterReq *req; - head = QUEUE_HEAD(&w->poll_queue); + head = queue_head(&w->poll_queue); req = QUEUE_DATA(head, struct UvWriterReq, queue); uvWriterReqSetStatus(req, status); uvWriterReqFinish(req); @@ -251,8 +251,8 @@ int UvWriterInit(struct UvWriter *w, w->event_poller.data = NULL; w->check.data = NULL; w->close_cb = NULL; - QUEUE_INIT(&w->poll_queue); - QUEUE_INIT(&w->work_queue); + queue_init(&w->poll_queue); + queue_init(&w->work_queue); w->closing = false; w->errmsg = errmsg; @@ -352,10 +352,10 @@ static void uvWriterPollerCloseCb(struct uv_handle_s *handle) w->event_poller.data = NULL; /* Cancel all pending requests. */ - while (!QUEUE_IS_EMPTY(&w->poll_queue)) { + while (!queue_empty(&w->poll_queue)) { queue *head; struct UvWriterReq *req; - head = QUEUE_HEAD(&w->poll_queue); + head = queue_head(&w->poll_queue); req = QUEUE_DATA(head, struct UvWriterReq, queue); assert(req->work.data == NULL); req->status = RAFT_CANCELED; @@ -382,7 +382,7 @@ static void uvWriterCheckCloseCb(struct uv_handle_s *handle) static void uvWriterCheckCb(struct uv_check_s *check) { struct UvWriter *w = check->data; - if (!QUEUE_IS_EMPTY(&w->work_queue)) { + if (!queue_empty(&w->work_queue)) { return; } uv_close((struct uv_handle_s *)&w->check, uvWriterCheckCloseCb); @@ -407,7 +407,7 @@ void UvWriterClose(struct UvWriter *w, UvWriterCloseCb cb) /* If we have requests executing in the threadpool, we need to wait for * them. That's done in the check callback. */ - if (!QUEUE_IS_EMPTY(&w->work_queue)) { + if (!queue_empty(&w->work_queue)) { uv_check_start(&w->check, uvWriterCheckCb); } else { uv_close((struct uv_handle_s *)&w->check, uvWriterCheckCloseCb); @@ -440,8 +440,8 @@ int UvWriterSubmit(struct UvWriter *w, * writes, so ensure that we're getting write requests * sequentially. */ if (w->n_events == 1) { - assert(QUEUE_IS_EMPTY(&w->poll_queue)); - assert(QUEUE_IS_EMPTY(&w->work_queue)); + assert(queue_empty(&w->poll_queue)); + assert(queue_empty(&w->work_queue)); } assert(w->fd >= 0); @@ -490,7 +490,7 @@ int UvWriterSubmit(struct UvWriter *w, /* Try to submit the write request asynchronously */ if (w->async) { - QUEUE_PUSH(&w->poll_queue, &req->queue); + queue_insert_tail(&w->poll_queue, &req->queue); rv = UvOsIoSubmit(w->ctx, 1, &iocbs); /* If no error occurred, we're done, the write request was @@ -499,7 +499,7 @@ int UvWriterSubmit(struct UvWriter *w, goto done; } - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); /* Check the reason of the error. */ switch (rv) { @@ -521,7 +521,7 @@ int UvWriterSubmit(struct UvWriter *w, /* If we got here it means we need to run io_submit in the threadpool. */ - QUEUE_PUSH(&w->work_queue, &req->queue); + queue_insert_tail(&w->work_queue, &req->queue); req->work.data = req; rv = uv_queue_work(w->loop, &req->work, uvWriterWorkCb, uvWriterAfterWorkCb); @@ -529,7 +529,7 @@ int UvWriterSubmit(struct UvWriter *w, /* UNTESTED: with the current libuv implementation this can't * fail. */ req->work.data = NULL; - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); UvOsErrMsg(w->errmsg, "uv_queue_work", rv); rv = RAFT_IOERR; goto err; diff --git a/src/raft/uv_writer.h b/src/raft/uv_writer.h index db8f5c293..b4cb7fcde 100644 --- a/src/raft/uv_writer.h +++ b/src/raft/uv_writer.h @@ -6,7 +6,7 @@ #include #include "err.h" -#include "queue.h" +#include "../lib/queue.h" #include "uv_os.h" /* Perform asynchronous writes to a single file. */ diff --git a/test/raft/unit/test_queue.c b/test/raft/unit/test_queue.c index aee0f0a4d..7fc9a263b 100644 --- a/test/raft/unit/test_queue.c +++ b/test/raft/unit/test_queue.c @@ -1,4 +1,4 @@ -#include "../../../src/raft/queue.h" +#include "../../../src/lib/queue.h" #include "../lib/runner.h" /****************************************************************************** @@ -23,7 +23,7 @@ static void *setUp(MUNIT_UNUSED const MunitParameter params[], MUNIT_UNUSED void *user_data) { struct fixture *f = munit_malloc(sizeof *f); - QUEUE_INIT(&f->queue); + queue_init(&f->queue); return f; } @@ -47,12 +47,12 @@ static void tearDown(void *data) for (i_ = 0; i_ < N; i_++) { \ struct item *item_ = &f->items[i_]; \ item_->value = i_ + 1; \ - QUEUE_PUSH(&f->queue, &item_->queue); \ + queue_insert_tail(&f->queue, &item_->queue); \ } \ } /* Remove the i'th fixture item from the fixture queue. */ -#define REMOVE(I) QUEUE_REMOVE(&f->items[I].queue) +#define REMOVE(I) queue_remove(&f->items[I].queue) /****************************************************************************** * @@ -64,7 +64,7 @@ static void tearDown(void *data) * value. */ #define ASSERT_HEAD(VALUE) \ { \ - queue *head_ = QUEUE_HEAD(&f->queue); \ + queue *head_ = queue_head(&f->queue); \ struct item *item_; \ item_ = QUEUE_DATA(head_, struct item, queue); \ munit_assert_int(item_->value, ==, VALUE); \ @@ -73,34 +73,34 @@ static void tearDown(void *data) /* Assert that the item at the tail of the queue has the given value. */ #define ASSERT_TAIL(VALUE) \ { \ - queue *tail_ = QUEUE_TAIL(&f->queue); \ + queue *tail_ = queue_tail(&f->queue); \ struct item *item_; \ item_ = QUEUE_DATA(tail_, struct item, queue); \ munit_assert_int(item_->value, ==, VALUE); \ } /* Assert that the fixture's queue is empty. */ -#define ASSERT_EMPTY munit_assert_true(QUEUE_IS_EMPTY(&f->queue)) +#define ASSERT_EMPTY munit_assert_true(queue_empty(&f->queue)) /* Assert that the fixture's queue is not empty. */ -#define ASSERT_NOT_EMPTY munit_assert_false(QUEUE_IS_EMPTY(&f->queue)) +#define ASSERT_NOT_EMPTY munit_assert_false(queue_empty(&f->queue)) /****************************************************************************** * - * QUEUE_IS_EMPTY + * queue_empty * *****************************************************************************/ -SUITE(QUEUE_IS_EMPTY) +SUITE(queue_empty) -TEST(QUEUE_IS_EMPTY, yes, setUp, tearDown, 0, NULL) +TEST(queue_empty, yes, setUp, tearDown, 0, NULL) { struct fixture *f = data; ASSERT_EMPTY; return MUNIT_OK; } -TEST(QUEUE_IS_EMPTY, no, setUp, tearDown, 0, NULL) +TEST(queue_empty, no, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(1); @@ -110,13 +110,13 @@ TEST(QUEUE_IS_EMPTY, no, setUp, tearDown, 0, NULL) /****************************************************************************** * - * QUEUE_PUSH + * queue_insert_tail * *****************************************************************************/ -SUITE(QUEUE_PUSH) +SUITE(queue_insert_tail) -TEST(QUEUE_PUSH, one, setUp, tearDown, 0, NULL) +TEST(queue_insert_tail, one, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(1); @@ -124,7 +124,7 @@ TEST(QUEUE_PUSH, one, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_PUSH, two, setUp, tearDown, 0, NULL) +TEST(queue_insert_tail, two, setUp, tearDown, 0, NULL) { struct fixture *f = data; int i; @@ -139,13 +139,13 @@ TEST(QUEUE_PUSH, two, setUp, tearDown, 0, NULL) /****************************************************************************** * - * QUEUE_REMOVE + * queue_remove * *****************************************************************************/ -SUITE(QUEUE_REMOVE) +SUITE(queue_remove) -TEST(QUEUE_REMOVE, first, setUp, tearDown, 0, NULL) +TEST(queue_remove, first, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(3); @@ -154,7 +154,7 @@ TEST(QUEUE_REMOVE, first, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_REMOVE, second, setUp, tearDown, 0, NULL) +TEST(queue_remove, second, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(3); @@ -163,7 +163,7 @@ TEST(QUEUE_REMOVE, second, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_REMOVE, success, setUp, tearDown, 0, NULL) +TEST(queue_remove, success, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(3); @@ -174,13 +174,13 @@ TEST(QUEUE_REMOVE, success, setUp, tearDown, 0, NULL) /****************************************************************************** * - * QUEUE_TAIL + * queue_tail * *****************************************************************************/ -SUITE(QUEUE_TAIL) +SUITE(queue_tail) -TEST(QUEUE_TAIL, one, setUp, tearDown, 0, NULL) +TEST(queue_tail, one, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(1); @@ -188,7 +188,7 @@ TEST(QUEUE_TAIL, one, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_TAIL, two, setUp, tearDown, 0, NULL) +TEST(queue_tail, two, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(2); @@ -196,7 +196,7 @@ TEST(QUEUE_TAIL, two, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_TAIL, three, setUp, tearDown, 0, NULL) +TEST(queue_tail, three, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(3); diff --git a/test/unit/test_conn.c b/test/unit/test_conn.c index a15708c5b..06bcfd7bc 100644 --- a/test/unit/test_conn.c +++ b/test/unit/test_conn.c @@ -25,7 +25,7 @@ TEST_MODULE(conn); static void connCloseCb(struct conn *conn) { - bool *closed = (bool *)conn->queue.next; + uint64_t *closed = (uint64_t *)conn->queue.next; *closed = true; } @@ -37,7 +37,7 @@ static void connCloseCb(struct conn *conn) FIXTURE_RAFT; \ FIXTURE_CLIENT; \ struct conn conn; \ - bool closed; + uint64_t closed; #define SETUP \ struct uv_stream_s *stream; \