Skip to content

Commit

Permalink
Merge pull request #678 from cole-miller/tx-observability-work
Browse files Browse the repository at this point in the history
Start instrumenting existing code with state machines
  • Loading branch information
cole-miller authored Oct 7, 2024
2 parents 45a0740 + 85e494d commit 81eeab5
Show file tree
Hide file tree
Showing 23 changed files with 687 additions and 294 deletions.
12 changes: 2 additions & 10 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,9 @@ jobs:
env:
CC: ${{ matrix.compiler }}
LIBDQLITE_TRACE: 1
ASAN_OPTIONS: fast_unwind_on_malloc=0
run: |
# TODO: return to just `make check` once the mysterious hang is fixed
tests="$(make print-test-programs | tr ' ' '\n' | grep -v '^unit-test$')"
make check TESTS="$tests"
# Grab backtraces when the unit-test binary hangs (this is a heisenbug
# that we've only been able to trigger in GHA jobs)
./unit-test --no-fork &
pid=$!
bash -c "sleep 10m; sudo gdb -p $pid -batch -ex 'thread apply all bt' -ex quit; false" &
wait -n
make check || (cat test-suite.log && false)
- name: Coverage
env:
Expand Down
1 change: 0 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ libraft_la_SOURCES = \
src/raft/fixture.c \
src/raft/flags.c \
src/raft/heap.c \
src/raft/lifecycle.c \
src/raft/log.c \
src/raft/membership.c \
src/raft/progress.c \
Expand Down
47 changes: 47 additions & 0 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,59 @@
#include "gateway.h"
#include "id.h"
#include "leader.h"
#include "lib/sm.h"
#include "lib/threadpool.h"
#include "server.h"
#include "tracing.h"
#include "utils.h"
#include "vfs.h"

/**
 * States of the state machine that tracks a single exec request.
 *
 * The values are used as indices into the exec_states table, so they start at
 * zero and are contiguous. EXEC_NR is the number of states, not a state.
 */
enum {
	EXEC_START = 0, /* Initial state, set when the request's sm is initialized */
	EXEC_BARRIER,   /* The barrier callback has been invoked */
	EXEC_STEPPED,   /* sqlite3_step() has been called (pool top half) */
	EXEC_POLLED,    /* VfsPoll() has been called (pool bottom half) */
	EXEC_DONE,      /* Final state: the request completed successfully */
	EXEC_FAILED,    /* Final state: the request completed with an error */
	EXEC_NR,        /* Number of states; sizes the exec_states table */
};

/* A(ident) expands to BITS(EXEC_<ident>); it is used to build the set of
 * states that may follow a given state. */
#define A(ident) BITS(EXEC_##ident)

/* S(ident, next_mask, state_flags) expands to the designated initializer for
 * the table slot of EXEC_<ident>: the state's name is the stringified
 * identifier, next_mask is the set of allowed successor states (built with
 * A()), and state_flags are the SM_* flags of the state (or 0). */
#define S(ident, next_mask, state_flags) \
	[EXEC_##ident] = {               \
		.name = #ident,          \
		.allowed = (next_mask),  \
		.flags = (state_flags),  \
	}

/* Transition table for exec requests, indexed by EXEC_* state. Every
 * non-final state may jump straight to DONE or FAILED; apart from that the
 * only forward path is START -> BARRIER -> STEPPED -> POLLED. */
static const struct sm_conf exec_states[EXEC_NR] = {
	S(START,   A(DONE) | A(FAILED) | A(BARRIER), SM_INITIAL),
	S(BARRIER, A(DONE) | A(FAILED) | A(STEPPED), 0),
	S(STEPPED, A(DONE) | A(FAILED) | A(POLLED),  0),
	S(POLLED,  A(DONE) | A(FAILED),              0),
	/* Final states have no outgoing transitions. */
	S(DONE,    0,                                SM_FINAL),
	S(FAILED,  0,                                SM_FINAL | SM_FAILURE),
};

/* Invariant hook for the exec state machine. It does not inspect the machine
 * or the previous state and accepts every transition unconditionally. */
static bool exec_invariant(const struct sm *sm, int prev)
{
	/* Neither argument is examined; the casts silence unused-parameter
	 * warnings. */
	(void)prev;
	(void)sm;

	return true;
}

/* Called when a leader exec request terminates and the associated callback can
* be invoked. */
static void leaderExecDone(struct exec *req)
{
tracef("leader exec done id:%" PRIu64, req->id);
req->leader->exec = NULL;
/* SQLITE_DONE (= 101) indicates success, any other code including
* SQLITE_OK (= 0) indicates failure. */
int status = req->status == SQLITE_DONE ? 0 :
req->status == SQLITE_OK ? SQLITE_ERROR :
req->status;
sm_done(&req->sm, EXEC_DONE, EXEC_FAILED, status);
sm_fini(&req->sm);
if (req->cb != NULL) {
req->cb(req, req->status);
}
Expand Down Expand Up @@ -366,10 +407,12 @@ static void leaderExecV2(struct exec *req, enum pool_half half)

if (half == POOL_TOP_HALF) {
req->status = sqlite3_step(req->stmt);
sm_move(&req->sm, EXEC_STEPPED);
return;
} /* else POOL_BOTTOM_HALF => */

rv = VfsPoll(vfs, db->path, &frames, &n);
sm_move(&req->sm, EXEC_POLLED);
if (rv != 0 || n == 0) {
tracef("vfs poll");
goto finish;
Expand Down Expand Up @@ -413,6 +456,8 @@ static void execBarrierCb(struct barrier *barrier, int status)
struct exec *req = barrier->data;
struct leader *l = req->leader;

sm_move(&req->sm, EXEC_BARRIER);

if (status != 0) {
l->exec->status = status;
leaderExecDone(l->exec);
Expand Down Expand Up @@ -444,6 +489,8 @@ int leader__exec(struct leader *l,
req->barrier.data = req;
req->barrier.cb = NULL;
req->work = (pool_work_t){};
sm_init(&req->sm, exec_invariant, NULL, exec_states, "exec",
EXEC_START);

rv = leader__barrier(l, &req->barrier, execBarrierCb);
if (rv != 0) {
Expand Down
4 changes: 3 additions & 1 deletion src/leader.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
#include <sqlite3.h>
#include <stdbool.h>

#include "./lib/queue.h"
#include "db.h"
#include "lib/queue.h"
#include "lib/sm.h" /* struct sm */
#include "lib/threadpool.h"
#include "raft.h"

Expand Down Expand Up @@ -65,6 +66,7 @@ struct exec {
queue queue;
exec_cb cb;
pool_work_t work;
struct sm sm;
};

/**
Expand Down
32 changes: 25 additions & 7 deletions src/lib/sm.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ void sm_init(struct sm *m,

PRE(conf[state].flags & SM_INITIAL);

m->conf = conf;
m->state = state;
m->invariant = invariant;
m->is_locked = is_locked;
m->id = ++id;
m->pid = getpid();
m->rc = 0;
*m = (struct sm){
.conf = conf,
.state = state,
.invariant = invariant,
.is_locked = is_locked,
.id = ++id,
.pid = getpid(),
.rc = 0,
};
snprintf(m->name, SM_MAX_NAME_LENGTH, "%s", name);
sm_obs(m);

Expand Down Expand Up @@ -98,6 +100,22 @@ void sm_fail(struct sm *m, int fail_state, int rc)
POST(m->invariant != NULL && m->invariant(m, prev));
}

/**
 * Move a state machine to one of two states depending on a result code.
 *
 * @m          the state machine to update
 * @good_state state entered when @rc is zero; must carry SM_FINAL
 * @bad_state  state entered when @rc is nonzero; must carry SM_FAILURE
 * @rc         result code, stored in the machine regardless of its value
 *
 * Both @good_state and @bad_state must be allowed successors of the current
 * state, whichever one ends up being selected.
 */
void sm_done(struct sm *m, int good_state, int bad_state, int rc)
{
	/* Remember where we came from for the invariant check below. */
	int prev = sm_state(m);

	PRE(sm_is_locked(m));
	PRE(m->conf[sm_state(m)].allowed & BITS(good_state));
	PRE(m->conf[sm_state(m)].allowed & BITS(bad_state));
	PRE(m->conf[good_state].flags & SM_FINAL);
	PRE(m->conf[bad_state].flags & SM_FAILURE);

	if (rc == 0) {
		m->state = good_state;
	} else {
		m->state = bad_state;
	}
	m->rc = rc;

	/* Both fields are set before sm_obs() is called on the machine. */
	sm_obs(m);
	POST(m->invariant != NULL && m->invariant(m, prev));
}

static __attribute__((noinline)) bool check_failed(const char *f, int n, const char *s)
{
tracef("%s:%d check failed: %s", f, n, s);
Expand Down
1 change: 1 addition & 0 deletions src/lib/sm.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void sm_init(struct sm *m,
void sm_fini(struct sm *m);
void sm_move(struct sm *m, int next_state);
void sm_fail(struct sm *m, int fail_state, int rc);
void sm_done(struct sm *m, int good_state, int bad_state, int rc);
int sm_state(const struct sm *m);
bool sm_check(bool b, const char *f, int n, const char *s);
/* Relates one state machine to another for observability. */
Expand Down
23 changes: 21 additions & 2 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

#include <uv.h>

#include "lib/sm.h"
#include "lib/queue.h"
#include "lib/sm.h" /* struct sm */

#ifndef RAFT_API
#define RAFT_API __attribute__((visibility("default")))
Expand Down Expand Up @@ -524,9 +524,19 @@ typedef void (*raft_io_append_cb)(struct raft_io_append *req, int status);
/**
 * Request object whose completion is reported through a raft_io_append_cb
 * callback.
 */
struct raft_io_append
{
void *data; /* User data */
struct sm sm; /* State machine embedded in the request; presumably tracks the
               * request for observability -- confirm against lib/sm.h */
raft_io_append_cb cb; /* Request callback */
};

/**
 * Sort-of asynchronous request to truncate the log. (Note that there is no
 * callback.)
 *
 * Because there is no callback, the raft_io truncate method takes ownership of
 * the request and frees it using the raft allocator, unless it returns a
 * nonzero value, in which case the caller must free the request.
 */
struct raft_io_truncate
{
struct sm sm; /* State machine embedded in the request; presumably tracks the
               * request for observability -- confirm against lib/sm.h */
};

/**
* Asynchronous request to store a new snapshot.
*/
Expand Down Expand Up @@ -652,7 +662,15 @@ struct raft_io
const struct raft_entry entries[],
unsigned n,
raft_io_append_cb cb);
int (*truncate)(struct raft_io *io, raft_index index);
/* Contract: unlike other raft_io methods, truncate takes ownership
* of the passed-in request and frees it using the raft allocator.
* (This is because it doesn't accept a callback that the caller
* could use to free the request itself.) Exception: if the return
* value of truncate is nonzero, the caller must free the request.
* (This is to allow the caller to do sm_relate.) */
int (*truncate)(struct raft_io *io,
struct raft_io_truncate *req,
raft_index index);
int (*snapshot_put)(struct raft_io *io,
unsigned trailing,
struct raft_io_snapshot_put *req,
Expand Down Expand Up @@ -1165,6 +1183,7 @@ RAFT_API int raft_voter_contacts(struct raft *r);
int type; \
raft_index index; \
queue queue; \
struct sm sm; \
uint8_t req_id[16]; \
uint8_t client_id[16]; \
uint8_t unique_id[16]; \
Expand Down
39 changes: 25 additions & 14 deletions src/raft/client.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#include "../lib/queue.h"
#include "../raft.h"
#include "../tracing.h"
#include "assert.h"
#include "configuration.h"
#include "err.h"
#include "lifecycle.h"
#include "log.h"
#include "membership.h"
#include "progress.h"
#include "../lib/queue.h"
#include "replication.h"
#include "request.h"

Expand All @@ -17,7 +16,9 @@ int raft_apply(struct raft *r,
const unsigned n,
raft_apply_cb cb)
{
raft_index start;
raft_index index;
const struct sm *entry_sm;
int rv;

tracef("raft_apply n %d", n);
Expand All @@ -34,30 +35,40 @@ int raft_apply(struct raft *r,
}

/* Index of the first entry being appended. */
index = logLastIndex(r->log) + 1;
tracef("%u commands starting at %lld", n, index);
start = logLastIndex(r->log) + 1;
tracef("%u commands starting at %lld", n, start);
req->type = RAFT_COMMAND;
req->index = index;
req->index = start;
req->cb = cb;

sm_init(&req->sm, request_invariant, NULL, request_states, "apply-request",
REQUEST_START);
queue_insert_tail(&r->leader_state.requests, &req->queue);

/* Append the new entries to the log. */
rv = logAppendCommands(r->log, r->current_term, bufs, n);
if (rv != 0) {
goto err;
index = start;
for (unsigned i = 0; i < n; i++) {
rv = logAppend(r->log, r->current_term, RAFT_COMMAND, bufs[i], true, NULL);
if (rv != 0) {
goto err_after_request_start;
}
entry_sm = log_get_entry_sm(r->log, r->current_term, index);
assert(entry_sm != NULL);
sm_relate(&req->sm, entry_sm);
index++;
}

lifecycleRequestStart(r, (struct request *)req);

rv = replicationTrigger(r, index);
rv = replicationTrigger(r, start);
if (rv != 0) {
goto err_after_log_append;
goto err_after_request_start;
}

return 0;

err_after_log_append:
err_after_request_start:
logDiscard(r->log, index);
queue_remove(&req->queue);
sm_fail(&req->sm, REQUEST_FAILED, rv);
err:
assert(rv != 0);
return rv;
Expand Down Expand Up @@ -95,7 +106,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb)
goto err_after_buf_alloc;
}

lifecycleRequestStart(r, (struct request *)req);
queue_insert_tail(&r->leader_state.requests, &req->queue);

rv = replicationTrigger(r, index);
if (rv != 0) {
Expand Down
Loading

0 comments on commit 81eeab5

Please sign in to comment.