Start instrumenting existing code with state machines #678

Merged on Oct 7, 2024 (27 commits)

Commits
9f0f195
leader: Add state machine for exec requests
cole-miller Aug 2, 2024
7afa477
sm: Initialize rc field
cole-miller Aug 2, 2024
62380b1
raft/client: Introduce state machine for raft_apply requests
cole-miller Aug 2, 2024
5db84c2
raft/log: Add state machine for active entries
cole-miller Aug 2, 2024
96f9759
raft/uv_append: Add state machine for append requests
cole-miller Aug 3, 2024
05c4891
raft/replication: Create state machine to track appendFollower
cole-miller Aug 3, 2024
65032c8
sm: Observe failures
cole-miller Aug 3, 2024
24cb351
sm: Remove extraneous newlines
cole-miller Aug 3, 2024
fde8bbe
sm: Support attributes
cole-miller Aug 3, 2024
51f4baa
Note number of follower append entries in an attr
cole-miller Aug 3, 2024
ccf87e2
Remove problematic entry-to-append on follower
cole-miller Aug 3, 2024
be2f9d3
Set up I/O fixture with simple sms
cole-miller Aug 5, 2024
c20e79b
Add a state machine for truncate requests
cole-miller Aug 5, 2024
cb93894
Make the fixture's truncate method async
cole-miller Aug 5, 2024
51ee710
Relate follower append to log truncation
cole-miller Aug 5, 2024
c90a011
Ifdef out raft-related sm_relate for external raft builds
cole-miller Aug 5, 2024
1ebee4b
Ignore format-nonliteral warning
cole-miller Aug 5, 2024
fa3f0b4
Fix cleanup in appendLeader
cole-miller Aug 5, 2024
11665b2
Address review comments
cole-miller Sep 23, 2024
bc7a5ec
Remove overzealous assert
cole-miller Sep 23, 2024
2c2ce8b
Shorten state machine declarations
cole-miller Sep 23, 2024
66a2cd1
Merge remote-tracking branch 'canonical/master' into tx-observability…
cole-miller Oct 7, 2024
19f455b
Fix
cole-miller Oct 7, 2024
633f471
Fix
cole-miller Oct 7, 2024
07ed5b9
Fix CI
cole-miller Oct 7, 2024
33a4a24
Investigate
cole-miller Oct 7, 2024
85e494d
Try to appease ASan
cole-miller Oct 7, 2024
12 changes: 2 additions & 10 deletions .github/workflows/build-and-test.yml
@@ -41,17 +41,9 @@ jobs:
env:
CC: ${{ matrix.compiler }}
LIBDQLITE_TRACE: 1
ASAN_OPTIONS: fast_unwind_on_malloc=0
run: |
# TODO: return to just `make check` once the mysterious hang is fixed
tests="$(make print-test-programs | tr ' ' '\n' | grep -v '^unit-test$')"
make check TESTS="$tests"

# Grab backtraces when the unit-test binary hangs (this is a heisenbug
# that we've only been able to trigger in GHA jobs)
./unit-test --no-fork &
pid=$!
bash -c "sleep 10m; sudo gdb -p $pid -batch -ex 'thread apply all bt' -ex quit; false" &
wait -n
make check || (cat test-suite.log && false)

- name: Coverage
env:
1 change: 0 additions & 1 deletion Makefile.am
@@ -85,7 +85,6 @@ libraft_la_SOURCES = \
src/raft/fixture.c \
src/raft/flags.c \
src/raft/heap.c \
src/raft/lifecycle.c \
src/raft/log.c \
src/raft/membership.c \
src/raft/progress.c \
47 changes: 47 additions & 0 deletions src/leader.c
@@ -10,18 +10,59 @@
#include "gateway.h"
#include "id.h"
#include "leader.h"
#include "lib/sm.h"
#include "lib/threadpool.h"
#include "server.h"
#include "tracing.h"
#include "utils.h"
#include "vfs.h"

/**
* State machine for exec requests.
*/
enum {
EXEC_START,
EXEC_BARRIER,
EXEC_STEPPED,
EXEC_POLLED,
EXEC_DONE,
EXEC_FAILED,
EXEC_NR,
};

#define A(ident) BITS(EXEC_##ident)
#define S(ident, allowed_, flags_) \
[EXEC_##ident] = { .name = #ident, .allowed = (allowed_), .flags = (flags_) }

static const struct sm_conf exec_states[EXEC_NR] = {
Review comment (Contributor):

Just wanted to leave this code here to show that it's possible to define states in a more compact way:

#define _S(name_, flags_, allowed_)   \
        [name_] = {                   \
                .flags   = flags_,    \
                .name    = #name_,    \
                .allowed = allowed_   \
        }

        _S(INIT,   SM_INITIAL, BITS(INIT)|BITS(DONE)),

Reply (Contributor Author):

I'm not sure. I experimented with switching the state machine definitions over to this style, and it works well for some of them, but once you have more than a few allowed transitions out of some state the lines get long enough to need breaking, and at that point you're more or less back to the beginning. I've pushed a commit that rewrites the definitions in this style, let me know what you think.

Reply (Contributor):

Not pushing for this style, but it's my personal preference.

S(START, A(BARRIER)|A(FAILED)|A(DONE), SM_INITIAL),
S(BARRIER, A(STEPPED)|A(FAILED)|A(DONE), 0),
S(STEPPED, A(POLLED)|A(FAILED)|A(DONE), 0),
S(POLLED, A(FAILED)|A(DONE), 0),
S(DONE, 0, SM_FINAL),
S(FAILED, 0, SM_FAILURE|SM_FINAL),
};

static bool exec_invariant(const struct sm *sm, int prev)
{
(void)sm;
(void)prev;
return true;
}
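
The table-driven style used for exec_states can be tried out in isolation. The following is a minimal sketch, not the code in src/lib/sm.h: struct demo_conf, DEMO_BIT, the DEMO_* flags and the JOB_* states are all hypothetical stand-ins invented for illustration. It shows the same two helper macros and one reason the macro parameters carry trailing underscores: a parameter named `allowed` or `flags` would also be substituted inside the `.allowed` and `.flags` designators.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration only; the real definitions
 * live in src/lib/sm.h and may differ. */
#define DEMO_BIT(state) (UINT64_C(1) << (state))

enum { DEMO_INITIAL = 1 << 0, DEMO_FAILURE = 1 << 1, DEMO_FINAL = 1 << 2 };

struct demo_conf {
	const char *name; /* human-readable state name */
	uint64_t allowed; /* bitmask of states reachable from this one */
	uint32_t flags;   /* DEMO_INITIAL, DEMO_FAILURE, DEMO_FINAL */
};

enum { JOB_START, JOB_RUNNING, JOB_DONE, JOB_FAILED, JOB_NR };

/* Trailing underscores keep the parameters from colliding with the
 * .allowed and .flags designators during macro expansion. */
#define A(ident) DEMO_BIT(JOB_##ident)
#define S(ident, allowed_, flags_) \
	[JOB_##ident] = { .name = #ident, .allowed = (allowed_), .flags = (flags_) }

static const struct demo_conf job_states[JOB_NR] = {
	S(START,   A(RUNNING) | A(FAILED), DEMO_INITIAL),
	S(RUNNING, A(DONE) | A(FAILED),    0),
	S(DONE,    0,                      DEMO_FINAL),
	S(FAILED,  0,                      DEMO_FAILURE | DEMO_FINAL),
};

#undef S
#undef A

/* Returns true if the table permits moving from `from` to `to`. */
static bool job_can_move(int from, int to)
{
	assert(from >= 0 && from < JOB_NR);
	assert(to >= 0 && to < JOB_NR);
	return (job_states[from].allowed & DEMO_BIT(to)) != 0;
}
```

With this table, job_can_move(JOB_START, JOB_RUNNING) holds while job_can_move(JOB_START, JOB_DONE) does not, because DONE is absent from START's mask.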

/* Called when a leader exec request terminates and the associated callback can
* be invoked. */
static void leaderExecDone(struct exec *req)
{
tracef("leader exec done id:%" PRIu64, req->id);
req->leader->exec = NULL;
/* SQLITE_DONE (= 101) indicates success, any other code including
* SQLITE_OK (= 0) indicates failure. */
int status = req->status == SQLITE_DONE ? 0 :
req->status == SQLITE_OK ? SQLITE_ERROR :
req->status;
sm_done(&req->sm, EXEC_DONE, EXEC_FAILED, status);
sm_fini(&req->sm);
if (req->cb != NULL) {
req->cb(req, req->status);
}
@@ -366,10 +407,12 @@ static void leaderExecV2(struct exec *req, enum pool_half half)

if (half == POOL_TOP_HALF) {
req->status = sqlite3_step(req->stmt);
sm_move(&req->sm, EXEC_STEPPED);
return;
} /* else POOL_BOTTOM_HALF => */

rv = VfsPoll(vfs, db->path, &frames, &n);
sm_move(&req->sm, EXEC_POLLED);
if (rv != 0 || n == 0) {
tracef("vfs poll");
goto finish;
@@ -413,6 +456,8 @@ static void execBarrierCb(struct barrier *barrier, int status)
struct exec *req = barrier->data;
struct leader *l = req->leader;

sm_move(&req->sm, EXEC_BARRIER);

if (status != 0) {
l->exec->status = status;
leaderExecDone(l->exec);
@@ -444,6 +489,8 @@ int leader__exec(struct leader *l,
req->barrier.data = req;
req->barrier.cb = NULL;
req->work = (pool_work_t){};
sm_init(&req->sm, exec_invariant, NULL, exec_states, "exec",
EXEC_START);

rv = leader__barrier(l, &req->barrier, execBarrierCb);
if (rv != 0) {
4 changes: 3 additions & 1 deletion src/leader.h
@@ -8,8 +8,9 @@
#include <sqlite3.h>
#include <stdbool.h>

#include "./lib/queue.h"
#include "db.h"
#include "lib/queue.h"
#include "lib/sm.h" /* struct sm */
#include "lib/threadpool.h"
#include "raft.h"

@@ -65,6 +66,7 @@ struct exec {
queue queue;
exec_cb cb;
pool_work_t work;
struct sm sm;
};

/**
32 changes: 25 additions & 7 deletions src/lib/sm.c
@@ -54,13 +54,15 @@ void sm_init(struct sm *m,

PRE(conf[state].flags & SM_INITIAL);

m->conf = conf;
m->state = state;
m->invariant = invariant;
m->is_locked = is_locked;
m->id = ++id;
m->pid = getpid();
m->rc = 0;
*m = (struct sm){
Review comment (Contributor):

This looks good: it makes initialization sites easy to find with grep "(struct sm){".

.conf = conf,
.state = state,
.invariant = invariant,
.is_locked = is_locked,
.id = ++id,
.pid = getpid(),
.rc = 0,
};
snprintf(m->name, SM_MAX_NAME_LENGTH, "%s", name);
sm_obs(m);
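
One property of the compound-literal form used in sm_init is that members not named in the initializer are zero-initialized, so a field added to the struct later cannot be left holding stale data. A small sketch of that behavior, using a hypothetical struct widget rather than struct sm:

```c
#include <assert.h>
#include <string.h>

/* Hypothetical struct for illustration; not part of the PR. */
struct widget {
	int id;
	int rc;
	char name[16];
};

static void widget_init(struct widget *w, int id)
{
	assert(w != NULL);
	/* rc and name are not named below, so the compound literal
	 * zero-initializes them before the whole struct is assigned. */
	*w = (struct widget){
		.id = id,
	};
}

/* Returns 1 if re-initialization wiped the stale contents. */
static int widget_reinit_clears(void)
{
	struct widget w;
	memset(&w, 0xff, sizeof w); /* simulate leftover garbage */
	widget_init(&w, 7);
	return w.id == 7 && w.rc == 0 && w.name[0] == '\0';
}
```

Field-by-field assignment, as in the lines this hunk removes, only touches the fields it lists; the compound literal covers the whole struct in one statement.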

@@ -98,6 +100,22 @@ void sm_fail(struct sm *m, int fail_state, int rc)
POST(m->invariant != NULL && m->invariant(m, prev));
}

void sm_done(struct sm *m, int good_state, int bad_state, int rc)
{
int prev = sm_state(m);

PRE(sm_is_locked(m));
PRE(m->conf[sm_state(m)].allowed & BITS(good_state));
PRE(m->conf[sm_state(m)].allowed & BITS(bad_state));
PRE(m->conf[good_state].flags & SM_FINAL);
PRE(m->conf[bad_state].flags & SM_FAILURE);

m->rc = rc;
m->state = rc == 0 ? good_state : bad_state;
sm_obs(m);
POST(m->invariant != NULL && m->invariant(m, prev));
}
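
The way leaderExecDone in src/leader.c feeds sm_done can be sketched without the rest of the library. This is an illustrative reduction under stated assumptions: struct demo_req, demo_done, normalize_status and the REQ_* states are hypothetical names, and the precondition checks of the real sm_done are omitted. The DEMO_SQLITE_* constants mirror SQLite's documented values for SQLITE_OK, SQLITE_ERROR and SQLITE_DONE.

```c
#include <assert.h>

/* Mirror SQLite's documented result codes so the sketch needs no sqlite3.h. */
enum { DEMO_SQLITE_OK = 0, DEMO_SQLITE_ERROR = 1, DEMO_SQLITE_DONE = 101 };

enum { REQ_RUNNING, REQ_DONE, REQ_FAILED };

/* Hypothetical request with just the two fields the sketch needs. */
struct demo_req {
	int state;
	int rc;
};

/* Map a step status to "0 means success": only DONE counts as success,
 * and a bare OK is turned into a generic error. */
static int normalize_status(int status)
{
	if (status == DEMO_SQLITE_DONE) {
		return 0;
	}
	if (status == DEMO_SQLITE_OK) {
		return DEMO_SQLITE_ERROR;
	}
	return status;
}

/* Record rc and choose the terminal state from it. */
static void demo_done(struct demo_req *r, int good_state, int bad_state, int rc)
{
	r->rc = rc;
	r->state = rc == 0 ? good_state : bad_state;
}

/* Finish a running request and return its terminal state. */
static int finish(struct demo_req *r, int status)
{
	assert(r->state == REQ_RUNNING);
	demo_done(r, REQ_DONE, REQ_FAILED, normalize_status(status));
	return r->state;
}
```

A single call thus replaces the "if rc is nonzero then fail, else move to done" branch at each call site, which is the convenience sm_done appears to be after.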

static __attribute__((noinline)) bool check_failed(const char *f, int n, const char *s)
{
tracef("%s:%d check failed: %s", f, n, s);
1 change: 1 addition & 0 deletions src/lib/sm.h
@@ -51,6 +51,7 @@ void sm_init(struct sm *m,
void sm_fini(struct sm *m);
void sm_move(struct sm *m, int next_state);
void sm_fail(struct sm *m, int fail_state, int rc);
void sm_done(struct sm *m, int good_state, int bad_state, int rc);
int sm_state(const struct sm *m);
bool sm_check(bool b, const char *f, int n, const char *s);
/* Relates one state machine to another for observability. */
23 changes: 21 additions & 2 deletions src/raft.h
@@ -9,8 +9,8 @@

#include <uv.h>

#include "lib/sm.h"
#include "lib/queue.h"
#include "lib/sm.h" /* struct sm */

#ifndef RAFT_API
#define RAFT_API __attribute__((visibility("default")))
@@ -524,9 +524,19 @@ typedef void (*raft_io_append_cb)(struct raft_io_append *req, int status);
struct raft_io_append
{
void *data; /* User data */
struct sm sm;
raft_io_append_cb cb; /* Request callback */
};

/**
* Sort-of asynchronous request to truncate the log. (Note that there is no
* callback.)
*/
struct raft_io_truncate
{
struct sm sm;
};

/**
* Asynchronous request to store a new snapshot.
*/
@@ -652,7 +662,15 @@ struct raft_io
const struct raft_entry entries[],
unsigned n,
raft_io_append_cb cb);
int (*truncate)(struct raft_io *io, raft_index index);
/* Contract: unlike other raft_io methods, truncate takes ownership
* of the passed-in request and frees it using the raft allocator.
* (This is because it doesn't accept a callback that the caller
* could use to free the request itself.) Exception: if the return
* value of truncate is nonzero, the caller must free the request.
* (This is to allow the caller to do sm_relate.) */
int (*truncate)(struct raft_io *io,
struct raft_io_truncate *req,
raft_index index);
int (*snapshot_put)(struct raft_io *io,
unsigned trailing,
struct raft_io_snapshot_put *req,
@@ -1165,6 +1183,7 @@ RAFT_API int raft_voter_contacts(struct raft *r);
int type; \
raft_index index; \
queue queue; \
struct sm sm; \
uint8_t req_id[16]; \
uint8_t client_id[16]; \
uint8_t unique_id[16]; \
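
The ownership contract stated in the comment on raft_io's truncate method (the callee frees the request on success, the caller frees it when the return value is nonzero) can be sketched as follows. Everything here is a hypothetical stand-in: struct demo_truncate, req_new, req_free and demo_truncate_io are invented for illustration, and plain malloc/free replace the raft allocator.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical request type standing in for struct raft_io_truncate. */
struct demo_truncate {
	int tag;
};

static int live_requests = 0; /* allocations not yet freed */

static struct demo_truncate *req_new(void)
{
	struct demo_truncate *req = calloc(1, sizeof *req);
	if (req != NULL) {
		live_requests++;
	}
	return req;
}

static void req_free(struct demo_truncate *req)
{
	assert(req != NULL);
	free(req);
	live_requests--;
}

/* Backend side: takes ownership of req only when it returns 0. */
static int demo_truncate_io(struct demo_truncate *req, int should_fail)
{
	if (should_fail) {
		return -1; /* ownership stays with the caller */
	}
	req_free(req); /* a real backend would free once the work completes */
	return 0;
}

/* Caller side: free the request only on a nonzero return. */
static int caller_truncate(int should_fail)
{
	struct demo_truncate *req = req_new();
	if (req == NULL) {
		return -1;
	}
	int rv = demo_truncate_io(req, should_fail);
	if (rv != 0) {
		req_free(req);
	}
	return rv;
}
```

Either path leaves live_requests at zero, which is the property the contract is meant to guarantee: exactly one side frees the request.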
39 changes: 25 additions & 14 deletions src/raft/client.c
@@ -1,13 +1,12 @@
#include "../lib/queue.h"
#include "../raft.h"
#include "../tracing.h"
#include "assert.h"
#include "configuration.h"
#include "err.h"
#include "lifecycle.h"
#include "log.h"
#include "membership.h"
#include "progress.h"
#include "../lib/queue.h"
#include "replication.h"
#include "request.h"

@@ -17,7 +16,9 @@
const unsigned n,
raft_apply_cb cb)
{
raft_index start;
raft_index index;
const struct sm *entry_sm;
int rv;

tracef("raft_apply n %d", n);
@@ -34,30 +35,40 @@
}

/* Index of the first entry being appended. */
index = logLastIndex(r->log) + 1;
tracef("%u commands starting at %lld", n, index);
start = logLastIndex(r->log) + 1;
tracef("%u commands starting at %lld", n, start);
req->type = RAFT_COMMAND;
req->index = index;
req->index = start;
req->cb = cb;

sm_init(&req->sm, request_invariant, NULL, request_states, "apply-request",
REQUEST_START);
queue_insert_tail(&r->leader_state.requests, &req->queue);

/* Append the new entries to the log. */
rv = logAppendCommands(r->log, r->current_term, bufs, n);
if (rv != 0) {
goto err;
index = start;
for (unsigned i = 0; i < n; i++) {
Review comment (Contributor):

Is the reason for removing the logAppendCommands() helper just cleaner code, or am I missing something?

Reply (Contributor Author):

It's just a cleanup, yes.

rv = logAppend(r->log, r->current_term, RAFT_COMMAND, bufs[i], true, NULL);
if (rv != 0) {
goto err_after_request_start;

}
entry_sm = log_get_entry_sm(r->log, r->current_term, index);
assert(entry_sm != NULL);
sm_relate(&req->sm, entry_sm);
index++;
}

lifecycleRequestStart(r, (struct request *)req);

rv = replicationTrigger(r, index);
rv = replicationTrigger(r, start);
if (rv != 0) {
goto err_after_log_append;
goto err_after_request_start;

}

return 0;

err_after_log_append:
err_after_request_start:

logDiscard(r->log, index);
queue_remove(&req->queue);
sm_fail(&req->sm, REQUEST_FAILED, rv);

err:
assert(rv != 0);
return rv;
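
The append-one-entry-at-a-time loop with a single cleanup label follows a common all-or-nothing pattern. The sketch below illustrates that pattern on a toy fixed-size log; it is not the PR's code. struct demo_log and the demo_* functions are hypothetical, and rolling back from the remembered start position is a choice made in this sketch rather than a statement about what raft_apply does.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_LOG_CAP 4

/* Toy in-memory log, invented for illustration. */
struct demo_log {
	int entries[DEMO_LOG_CAP];
	size_t n;
};

static int demo_log_append(struct demo_log *log, int value)
{
	if (log->n == DEMO_LOG_CAP) {
		return -1; /* log is full */
	}
	log->entries[log->n++] = value;
	return 0;
}

/* Drop every entry at position >= first. */
static void demo_log_discard(struct demo_log *log, size_t first)
{
	if (first < log->n) {
		log->n = first;
	}
}

/* Append all values or none of them. */
static int demo_apply(struct demo_log *log, const int *values, size_t n)
{
	size_t start = log->n; /* position of the first entry we append */
	int rv = 0;

	for (size_t i = 0; i < n; i++) {
		rv = demo_log_append(log, values[i]);
		if (rv != 0) {
			goto err_after_append;
		}
	}
	return 0;

err_after_append:
	/* Undo the partial append so the caller sees an unchanged log. */
	demo_log_discard(log, start);
	assert(rv != 0);
	return rv;
}
```

Keeping the start position in its own variable, separate from the loop cursor, makes the rollback target unambiguous regardless of how far the loop got.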
@@ -95,7 +106,7 @@
goto err_after_buf_alloc;
}

lifecycleRequestStart(r, (struct request *)req);
queue_insert_tail(&r->leader_state.requests, &req->queue);

rv = replicationTrigger(r, index);
if (rv != 0) {