Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: support asynchronous storage writes #8

Merged
merged 13 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func mustTemp(pre, body string) string {
func ltoa(l *raftLog) string {
s := fmt.Sprintf("lastIndex: %d\n", l.lastIndex())
s += fmt.Sprintf("applied: %d\n", l.applied)
s += fmt.Sprintf("applying: %d\n", l.applying)
for i, e := range l.allEntries() {
s += fmt.Sprintf("#%d: %+v\n", i, e)
}
Expand Down
100 changes: 100 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,94 @@ given ID MUST be used only once even if the old node has been removed.
This means that for example IP addresses make poor node IDs since they
may be reused. Node IDs must be non-zero.

# Usage with Asynchronous Storage Writes

The library can be configured with an alternate interface for local storage
writes that can provide better performance in the presence of high proposal
concurrency by minimizing interference between proposals. This feature is called
AsynchronousStorageWrites, and can be enabled using the flag on the Config
struct with the same name.

When Asynchronous Storage Writes is enabled, the responsibility of code using
the library is different from what was presented above. Users still read from
the Node.Ready() channel. However, they process the updates it contains in a
different manner. Users no longer consult the HardState, Entries, and Snapshot
fields (steps 1 and 3 above). They also no longer call Node.Advance() to
indicate that they have processed all entries in the Ready (step 4 above).
Instead, all local storage operations are also communicated through messages
present in the Ready.Message slice.

The local storage messages come in two flavors. The first flavor is log append
messages, which target a LocalAppendThread and carry Entries, HardState, and a
Snapshot. The second flavor is entry application messages, which target a
LocalApplyThread and carry CommittedEntries. Messages to the same target must be
reliably processed in order. Messages to different targets can be processed in
any order.

Each local storage message carries a slice of response messages that must
delivered after the corresponding storage write has been completed. These
responses may target the same node or may target other nodes.

With Asynchronous Storage Writes enabled, the total state machine handling loop
will look something like this:

for {
select {
case <-s.Ticker:
n.Tick()
case rd := <-s.Node.Ready():
for _, m := range rd.Messages {
switch m.To {
case raft.LocalAppendThread:
toAppend <- m
case raft.LocalApplyThread:
toApply <-m
default:
sendOverNetwork(m)
}
}
case <-s.done:
return
}
}

Usage of Asynchronous Storage Writes will typically also contain a pair of
storage handler threads, one for log writes (append) and one for entry
application to the local state machine (apply). Those will look something like:

// append thread
go func() {
for {
select {
case m := <-toAppend:
saveToStorage(m.State, m.Entries, m.Snapshot)
send(m.Responses)
case <-s.done:
return
}
}
}

// apply thread
go func() {
for {
select {
case m := <-toApply:
for _, entry := range m.CommittedEntries {
process(entry)
if entry.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
s.Node.ApplyConfChange(cc)
}
}
send(m.Responses)
case <-s.done:
return
}
}
}

# Implementation notes

This implementation is up to date with the final Raft thesis
Expand Down Expand Up @@ -295,5 +383,17 @@ stale log entries:
that the follower that sent this 'MsgUnreachable' is not reachable, often
indicating 'MsgApp' is lost. When follower's progress state is replicate,
the leader sets it back to probe.

'MsgStorageAppend' is a message from a node to its local append storage
thread to write entries, hard state, and/or a snapshot to stable storage.
The message will carry one or more responses, one of which will be a
'MsgStorageAppendResp' back to itself. The responses can also contain
'MsgAppResp', 'MsgVoteResp', and 'MsgPreVoteResp' messages. Used with
AsynchronousStorageWrites.

'MsgStorageApply' is a message from a node to its local apply storage
thread to apply committed entries. The message will carry one response,
which will be a 'MsgStorageApplyResp' back to itself. Used with
AsynchronousStorageWrites.
*/
package raft
Loading