-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
storage: batch command application and coalesce applied state per batch
This commit batches raft command application where possible. The basic approach is to notice that many commands only "trivially" update the replica state machine. Trivial commands can be processed in a single batch by acting on a copy of the replica state. Non-trivial commands share the same logic but always commit alone as they for one reason or another rely on having a view of the replica or storage engine as of a specific log index. This commit also sneaks in another optimization which batching enables. Each command mutates a portion of replica state called the applied state which tracks a combination of the log index which has been applied and the MVCC stats of the range as of that application. Before this change each entry would update this applied state and each of those writes will end up in the WAL and mem-table just the be compacted away in L1. Now that commands are being applied to the storage engine in a single batch it is easy to only update the applied state for the last entry in the batch. For sequential writes this patch shows a considerable performance win. The below benchmark was run on a 3-node c5d.4xlarge (16 vCPU) cluster with concurrency 128. ``` name old ops/s new ops/s delta KV0-throughput 22.1k ± 1% 32.8k ± 1% +48.59% (p=0.029 n=4+4) name old ms/s new ms/s delta KV0-P50 7.15 ± 2% 6.00 ± 0% -16.08% (p=0.029 n=4+4) KV0-Avg 5.80 ± 0% 3.80 ± 0% -34.48% (p=0.029 n=4+4) ``` Due to the re-organization of logic in the change, the Replica.mu does not need to be acquired as many times during the application of a batch. In the common case it is now acquired exactly twice in the process of applying a batch whereas before it was acquired more than twice per entry. This should hopefully improve performance on large machines which experience mutex contention for a single range. This effect is visible on large machines. Below are results from running a normal KV0 workload on c5d.18xlarge machines (72 vCPU machines) with concurrency 1024 and 16 initial splits. ``` name old ops/s new ops/s delta KV0-throughput 78.1k ± 1% 116.8k ± 5% +49.42% (p=0.029 n=4+4) name old ms/s new ms/s delta KV0-P50 24.4 ± 3% 19.7 ± 7% -19.28% (p=0.029 n=4+4) KV0-Avg 12.6 ± 0% 7.5 ± 9% -40.87% (p=0.029 n=4+4) ``` Fixes #37426. Release note (performance improvement): Batch raft entry application and coalesce writes to applied state for the batch.
- Loading branch information
Showing
11 changed files
with
1,752 additions
and
1,076 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
// Copyright 2019 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package storage | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/storage/engine" | ||
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" | ||
"github.com/cockroachdb/cockroach/pkg/storage/storagepb" | ||
"go.etcd.io/etcd/raft/raftpb" | ||
) | ||
|
||
// cmdAppBatch accumulates state due to the application of raft | ||
// commands. Committed raft commands are applied to the batch in a multi-stage | ||
// process whereby individual commands are decoded, prepared for application | ||
// relative to the current view of replicaState, committed to the storage | ||
// engine, applied to the Replica's in-memory state and then finished by | ||
// releasing their latches and notifying clients. | ||
type cmdAppBatch struct { | ||
// decodeBuf is used to decode an entry before adding it to the batch. | ||
// See decode(). | ||
decodeBuf decodedRaftEntry | ||
|
||
// batch accumulates writes implied by the raft entries in this batch. | ||
batch engine.Batch | ||
|
||
// replicaState is this batch's view of the replica's state. | ||
// It is copied from under the Replica.mu when the batch is initialized and | ||
// is updated in stageTrivialReplicatedEvalResult. | ||
replicaState storagepb.ReplicaState | ||
|
||
// stats is stored on the application batch to avoid an allocation in tracking | ||
// the batch's view of replicaState. All pointer fields in replicaState other | ||
// than Stats are overwritten completely rather than updated in-place. | ||
stats enginepb.MVCCStats | ||
|
||
// updatedTruncatedState tracks whether any command in the batch updated the | ||
// replica's truncated state. Truncated state updates are considered trivial | ||
// but represent something of a special case but given their relative | ||
// frequency and the fact that multiple updates can be trivially coalesced, we | ||
// treat updates to truncated state as trivial. If the batch updated the | ||
// truncated state then after it has been committed, then the side-loaded data | ||
// and raftentry.Cache should be truncated to the index indicated. | ||
// TODO(ajwerner): consider whether this logic should imply that commands | ||
// which update truncated state are non-trivial. | ||
updatedTruncatedState bool | ||
|
||
cmdBuf cmdAppCtxBuf | ||
} | ||
|
||
// cmdAppBatch structs are needed to apply raft commands, which is to | ||
// say, frequently, so best to pool them rather than allocated under the raftMu. | ||
var cmdAppBatchSyncPool = sync.Pool{ | ||
New: func() interface{} { | ||
return new(cmdAppBatch) | ||
}, | ||
} | ||
|
||
func getCmdAppBatch() *cmdAppBatch { | ||
return cmdAppBatchSyncPool.Get().(*cmdAppBatch) | ||
} | ||
|
||
func releaseCmdAppBatch(b *cmdAppBatch) { | ||
b.cmdBuf.clear() | ||
*b = cmdAppBatch{} | ||
cmdAppBatchSyncPool.Put(b) | ||
} | ||
|
||
// add adds adds the entry and its decoded state to the end of the batch. | ||
func (b *cmdAppBatch) add(e *raftpb.Entry, d decodedRaftEntry) { | ||
s := b.cmdBuf.allocate() | ||
s.decodedRaftEntry = d | ||
s.e = e | ||
} | ||
|
||
// decode decodes commands from toProcess until either all of the commands have | ||
// been added to the batch or a non-trivial command is decoded. Non-trivial | ||
// commands must always be in their own batch. If a non-trivial command is | ||
// encountered the batch is returned immediately without adding the newly | ||
// decoded command to the batch or removing it from remaining. | ||
// It is the client's responsibility to deal with non-trivial commands. | ||
// | ||
// numEmptyEntries indicates the number of entries in the consumed portion of | ||
// toProcess contained a zero-byte payload. | ||
func (b *cmdAppBatch) decode( | ||
ctx context.Context, toProcess []raftpb.Entry, decodeBuf *decodedRaftEntry, | ||
) ( | ||
foundNonTrivialEntry bool, | ||
numEmptyEntries int, | ||
remaining []raftpb.Entry, | ||
errExpl string, | ||
err error, | ||
) { | ||
for len(toProcess) > 0 { | ||
e := &toProcess[0] | ||
if len(e.Data) == 0 { | ||
numEmptyEntries++ | ||
} | ||
if errExpl, err = decodeBuf.decode(ctx, e); err != nil { | ||
return false, numEmptyEntries, nil, errExpl, err | ||
} | ||
// This is a non-trivial entry which needs to be processed alone. | ||
foundNonTrivialEntry = !isTrivial(decodeBuf.replicatedResult(), | ||
b.replicaState.UsingAppliedStateKey) | ||
if foundNonTrivialEntry { | ||
break | ||
} | ||
// We're going to process this entry in this batch so pop it from toProcess | ||
// and add to appStates. | ||
toProcess = toProcess[1:] | ||
b.add(e, *decodeBuf) | ||
} | ||
return foundNonTrivialEntry, numEmptyEntries, toProcess, "", nil | ||
} | ||
|
||
func (b *cmdAppBatch) reset() { | ||
b.cmdBuf.clear() | ||
*b = cmdAppBatch{ | ||
decodeBuf: b.decodeBuf, // preserve the previously decoded entry | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
// Copyright 2019 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package storage | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" | ||
"github.com/cockroachdb/cockroach/pkg/storage/storagebase" | ||
"github.com/cockroachdb/cockroach/pkg/storage/storagepb" | ||
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/cockroachdb/cockroach/pkg/util/protoutil" | ||
"github.com/pkg/errors" | ||
"go.etcd.io/etcd/raft/raftpb" | ||
) | ||
|
||
// cmdAppCtx stores the state required to apply a single raft | ||
// entry to a replica. The state is accumulated in stages which occur in | ||
// Replica.handleCommittedEntriesRaftMuLocked. From a high level, a command is | ||
// decoded into an entryApplicationBatch, then if it was proposed locally the | ||
// proposal is populated from the replica's proposals map, then the command | ||
// is staged into the batch by writing its update to the batch's engine.Batch | ||
// and applying its "trivial" side-effects to the batch's view of ReplicaState. | ||
// Then the batch is committed, the side-effects are applied and the local | ||
// result is processed. | ||
type cmdAppCtx struct { | ||
// e is the Entry being applied. | ||
e *raftpb.Entry | ||
decodedRaftEntry // decoded from e. | ||
|
||
// proposal is populated on the proposing Replica only and comes from the | ||
// Replica's proposal map. | ||
proposal *ProposalData | ||
// ctx will be the proposal's context if proposed locally, otherwise it will | ||
// be populated with the handleCommittedEntries ctx. | ||
ctx context.Context | ||
|
||
// The below fields are set during stageRaftCommand when we validate that | ||
// a command applies given the current lease in checkForcedErr. | ||
leaseIndex uint64 | ||
forcedErr *roachpb.Error | ||
proposalRetry proposalReevaluationReason | ||
mutationCount int // number of mutations in the WriteBatch, for writeStats | ||
// splitMergeUnlock is acquired for splits and merges. | ||
splitMergeUnlock func(*storagepb.ReplicatedEvalResult) | ||
|
||
// The below fields are set after the data has been written to the storage | ||
// engine in prepareLocalResult. | ||
localResult *result.LocalResult | ||
response proposalResult | ||
} | ||
|
||
func (cmd *cmdAppCtx) proposedLocally() bool { | ||
return cmd.proposal != nil | ||
} | ||
|
||
// decodedRaftEntry represents the deserialized content of a raftpb.Entry. | ||
type decodedRaftEntry struct { | ||
idKey storagebase.CmdIDKey | ||
raftCmd storagepb.RaftCommand | ||
*decodedConfChange // only non-nil for config changes | ||
} | ||
|
||
// decodedConfChange represents the fields of a config change raft command. | ||
type decodedConfChange struct { | ||
cc raftpb.ConfChange | ||
ccCtx ConfChangeContext | ||
} | ||
|
||
func (d *decodedRaftEntry) replicatedResult() *storagepb.ReplicatedEvalResult { | ||
return &d.raftCmd.ReplicatedEvalResult | ||
} | ||
|
||
// decode decodes the entry e into the decodedRaftEntry. | ||
func (d *decodedRaftEntry) decode( | ||
ctx context.Context, e *raftpb.Entry, | ||
) (errExpl string, err error) { | ||
*d = decodedRaftEntry{} | ||
// etcd raft sometimes inserts nil commands, ours are never nil. | ||
// This case is handled upstream of this call. | ||
if len(e.Data) == 0 { | ||
return "", nil | ||
} | ||
switch e.Type { | ||
case raftpb.EntryNormal: | ||
return d.decodeNormalEntry(e) | ||
case raftpb.EntryConfChange: | ||
return d.decodeConfChangeEntry(e) | ||
default: | ||
log.Fatalf(ctx, "unexpected Raft entry: %v", e) | ||
return "", nil // unreachable | ||
} | ||
} | ||
|
||
func (d *decodedRaftEntry) decodeNormalEntry(e *raftpb.Entry) (errExpl string, err error) { | ||
var encodedCommand []byte | ||
d.idKey, encodedCommand = DecodeRaftCommand(e.Data) | ||
// An empty command is used to unquiesce a range and wake the | ||
// leader. Clear commandID so it's ignored for processing. | ||
if len(encodedCommand) == 0 { | ||
d.idKey = "" | ||
} else if err := protoutil.Unmarshal(encodedCommand, &d.raftCmd); err != nil { | ||
const errExpl = "while unmarshalling entry" | ||
return errExpl, errors.Wrap(err, errExpl) | ||
} | ||
return "", nil | ||
} | ||
|
||
func (d *decodedRaftEntry) decodeConfChangeEntry(e *raftpb.Entry) (errExpl string, err error) { | ||
d.decodedConfChange = &decodedConfChange{} | ||
if err := protoutil.Unmarshal(e.Data, &d.cc); err != nil { | ||
const errExpl = "while unmarshaling ConfChange" | ||
return errExpl, errors.Wrap(err, errExpl) | ||
} | ||
if err := protoutil.Unmarshal(d.cc.Context, &d.ccCtx); err != nil { | ||
const errExpl = "while unmarshaling ConfChangeContext" | ||
return errExpl, errors.Wrap(err, errExpl) | ||
} | ||
if err := protoutil.Unmarshal(d.ccCtx.Payload, &d.raftCmd); err != nil { | ||
const errExpl = "while unmarshaling RaftCommand" | ||
return errExpl, errors.Wrap(err, errExpl) | ||
} | ||
d.idKey = storagebase.CmdIDKey(d.ccCtx.CommandID) | ||
return "", nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
// Copyright 2019 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package storage | ||
|
||
import "sync" | ||
|
||
// cmdAppCtxBufNodeSize is the size of the arrays in an | ||
// cmdAppStateBufNode. | ||
// TODO(ajwerner): justify this number. | ||
const cmdAppCtxBufNodeSize = 8 | ||
|
||
// cmdAppCtxBuf is an allocation-efficient buffer used during the | ||
// application of raft entries. Initialization occurs lazily upon the first | ||
// call to allocate but used cmdAppCtxBuf objects should be released | ||
// explicitly with the clear() method to release the allocated buffers back | ||
// to the pool. | ||
type cmdAppCtxBuf struct { | ||
len int32 | ||
head, tail *cmdAppCtxBufNode | ||
} | ||
|
||
// cmdAppCtxBufNode is a linked-list element in an | ||
// cmdAppStateBuf. | ||
type cmdAppCtxBufNode struct { | ||
len int32 | ||
buf [cmdAppCtxBufNodeSize]cmdAppCtx | ||
next *cmdAppCtxBufNode | ||
} | ||
|
||
var cmdAppStateBufNodeSyncPool = sync.Pool{ | ||
New: func() interface{} { return new(cmdAppCtxBufNode) }, | ||
} | ||
|
||
// allocate extends the length of buf by one and returns the newly | ||
// added element. If this is the first call to allocate it will initialize buf. | ||
// After a buf is initialized it should be explicitly destroyed. | ||
func (buf *cmdAppCtxBuf) allocate() *cmdAppCtx { | ||
if buf.tail == nil { // lazy initialization | ||
n := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode) | ||
buf.head, buf.tail = n, n | ||
} | ||
if buf.tail.len == cmdAppCtxBufNodeSize { | ||
newTail := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode) | ||
buf.tail.next = newTail | ||
buf.tail = newTail | ||
} | ||
ret := &buf.tail.buf[buf.tail.len] | ||
buf.tail.len++ | ||
buf.len++ | ||
return ret | ||
} | ||
|
||
// truncate clears all of the entries currently in a buffer and returns any | ||
// allocated buffers to the pool. | ||
func (buf *cmdAppCtxBuf) clear() { | ||
for buf.head != nil { | ||
buf.len -= buf.head.len | ||
oldHead := buf.head | ||
newHead := oldHead.next | ||
buf.head = newHead | ||
*oldHead = cmdAppCtxBufNode{} | ||
cmdAppStateBufNodeSyncPool.Put(oldHead) | ||
} | ||
*buf = cmdAppCtxBuf{} | ||
} | ||
|
||
// last returns a pointer to the last element in the buffer. | ||
func (buf *cmdAppCtxBuf) last() *cmdAppCtx { | ||
return &buf.tail.buf[buf.tail.len-1] | ||
} | ||
|
||
// cmdAppCtxBufIterator iterates through the entries in an | ||
// cmdAppStateBuf. | ||
type cmdAppCtxBufIterator struct { | ||
idx int32 | ||
buf *cmdAppCtxBuf | ||
node *cmdAppCtxBufNode | ||
} | ||
|
||
// init seeks the iterator to the front of buf. It returns true if buf is not | ||
// empty and false if it is. | ||
func (it *cmdAppCtxBufIterator) init(buf *cmdAppCtxBuf) (ok bool) { | ||
*it = cmdAppCtxBufIterator{buf: buf, node: buf.head} | ||
return it.buf.len > 0 | ||
} | ||
|
||
// cur returns the cmdAppState currently pointed to by it. | ||
func (it *cmdAppCtxBufIterator) cur() *cmdAppCtx { | ||
return &it.node.buf[it.idx%cmdAppCtxBufNodeSize] | ||
} | ||
|
||
// isLast returns true if it currently points to the last element in the buffer. | ||
func (it *cmdAppCtxBufIterator) isLast() bool { | ||
return it.idx+1 == it.buf.len | ||
} | ||
|
||
// next moves it to point to the next element. It returns false if it.isLast() | ||
// is true, indicating that there are no more elements in the buffer. | ||
func (it *cmdAppCtxBufIterator) next() bool { | ||
if it.isLast() { | ||
return false | ||
} | ||
it.idx++ | ||
if it.idx%cmdAppCtxBufNodeSize == 0 { | ||
it.node = it.node.next | ||
} | ||
return true | ||
} |
Oops, something went wrong.