Skip to content

Commit

Permalink
Merge #38568
Browse files Browse the repository at this point in the history
38568: storage: batch command application and coalesce applied state per batch  r=nvanbenschoten,tbg a=ajwerner

This commit batches raft command application where possible. The basic
approach is to notice that many commands only "trivially" update the
replica state machine. Trivial commands can be processed in a single
batch by acting on a copy of the replica state. Non-trivial commands
share the same logic but always commit alone as they for one reason or
another rely on having a view of the replica or storage engine as of a
specific log index.

This commit also sneaks in another optimization which batching enables.
Each command mutates a portion of replica state called the applied state
which tracks a combination of the log index which has been applied and the
MVCC stats of the range as of that application. Before this change each
entry would update this applied state and each of those writes will end up
in the WAL and mem-table just the be compacted away in L0. Now that
commands are being applied to the storage engine in a single batch it is easy
to only update the applied state for the last entry in the batch.

For sequential writes this patch shows a considerable performance win. The below
benchmark was run on a 3-node c5d.4xlarge (16 vCPU) cluster with concurrency 128.

```
name            old ops/s   new ops/s   delta
KV0-throughput  22.1k ± 1%  32.8k ± 1%  +48.59%  (p=0.029 n=4+4)

name            old ms/s    new ms/s    delta
KV0-P50          7.15 ± 2%   6.00 ± 0%  -16.08%  (p=0.029 n=4+4)
KV0-Avg          5.80 ± 0%   3.80 ± 0%  -34.48%  (p=0.029 n=4+4)
```

Due to the re-organization of logic in the change, the Replica.mu does not need
to be acquired as many times during the application of a batch. In the common
case it is now acquired exactly twice in the process of applying a batch whereas
before it was acquired more than twice per entry. This should hopefully improve
performance on large machines which experience mutex contention for a single
range.

These optimizations combine for a big win on large machines. Below are results 
from running a normal KV0 workload on c5d.18xlarge machines (72 vCPU machines) with
concurrency 1024 and 16 initial splits.

```
name            old ops/s   new ops/s    delta
KV0-throughput  78.1k ± 1%  116.8k ± 5%  +49.42%  (p=0.029 n=4+4)

name            old ms/s    new ms/s     delta
KV0-P50          24.4 ± 3%    19.7 ± 7%  -19.28%  (p=0.029 n=4+4)
KV0-Avg          12.6 ± 0%     7.5 ± 9%  -40.87%  (p=0.029 n=4+4)
```

Fixes #37426.

Release note (performance improvement): Batch raft entry application and
coalesce writes to applied state for the batch.

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Jul 12, 2019
2 parents 35a9571 + e4ce717 commit 509baff
Show file tree
Hide file tree
Showing 11 changed files with 1,752 additions and 1,076 deletions.
131 changes: 131 additions & 0 deletions pkg/storage/cmd_app_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"go.etcd.io/etcd/raft/raftpb"
)

// cmdAppBatch accumulates state due to the application of raft
// commands. Committed raft commands are applied to the batch in a multi-stage
// process whereby individual commands are decoded, prepared for application
// relative to the current view of replicaState, committed to the storage
// engine, applied to the Replica's in-memory state and then finished by
// releasing their latches and notifying clients.
type cmdAppBatch struct {
// decodeBuf is used to decode an entry before adding it to the batch.
// See decode().
decodeBuf decodedRaftEntry

// batch accumulates writes implied by the raft entries in this batch.
batch engine.Batch

// replicaState is this batch's view of the replica's state.
// It is copied from under the Replica.mu when the batch is initialized and
// is updated in stageTrivialReplicatedEvalResult.
replicaState storagepb.ReplicaState

// stats is stored on the application batch to avoid an allocation in tracking
// the batch's view of replicaState. All pointer fields in replicaState other
// than Stats are overwritten completely rather than updated in-place.
stats enginepb.MVCCStats

// updatedTruncatedState tracks whether any command in the batch updated the
// replica's truncated state. Truncated state updates are considered trivial
// but represent something of a special case but given their relative
// frequency and the fact that multiple updates can be trivially coalesced, we
// treat updates to truncated state as trivial. If the batch updated the
// truncated state then after it has been committed, then the side-loaded data
// and raftentry.Cache should be truncated to the index indicated.
// TODO(ajwerner): consider whether this logic should imply that commands
// which update truncated state are non-trivial.
updatedTruncatedState bool

cmdBuf cmdAppCtxBuf
}

// cmdAppBatch structs are needed to apply raft commands, which is to
// say, frequently, so best to pool them rather than allocated under the raftMu.
var cmdAppBatchSyncPool = sync.Pool{
New: func() interface{} {
return new(cmdAppBatch)
},
}

func getCmdAppBatch() *cmdAppBatch {
return cmdAppBatchSyncPool.Get().(*cmdAppBatch)
}

func releaseCmdAppBatch(b *cmdAppBatch) {
b.cmdBuf.clear()
*b = cmdAppBatch{}
cmdAppBatchSyncPool.Put(b)
}

// add adds adds the entry and its decoded state to the end of the batch.
func (b *cmdAppBatch) add(e *raftpb.Entry, d decodedRaftEntry) {
s := b.cmdBuf.allocate()
s.decodedRaftEntry = d
s.e = e
}

// decode decodes commands from toProcess until either all of the commands have
// been added to the batch or a non-trivial command is decoded. Non-trivial
// commands must always be in their own batch. If a non-trivial command is
// encountered the batch is returned immediately without adding the newly
// decoded command to the batch or removing it from remaining.
// It is the client's responsibility to deal with non-trivial commands.
//
// numEmptyEntries indicates the number of entries in the consumed portion of
// toProcess contained a zero-byte payload.
func (b *cmdAppBatch) decode(
ctx context.Context, toProcess []raftpb.Entry, decodeBuf *decodedRaftEntry,
) (
foundNonTrivialEntry bool,
numEmptyEntries int,
remaining []raftpb.Entry,
errExpl string,
err error,
) {
for len(toProcess) > 0 {
e := &toProcess[0]
if len(e.Data) == 0 {
numEmptyEntries++
}
if errExpl, err = decodeBuf.decode(ctx, e); err != nil {
return false, numEmptyEntries, nil, errExpl, err
}
// This is a non-trivial entry which needs to be processed alone.
foundNonTrivialEntry = !isTrivial(decodeBuf.replicatedResult(),
b.replicaState.UsingAppliedStateKey)
if foundNonTrivialEntry {
break
}
// We're going to process this entry in this batch so pop it from toProcess
// and add to appStates.
toProcess = toProcess[1:]
b.add(e, *decodeBuf)
}
return foundNonTrivialEntry, numEmptyEntries, toProcess, "", nil
}

func (b *cmdAppBatch) reset() {
b.cmdBuf.clear()
*b = cmdAppBatch{
decodeBuf: b.decodeBuf, // preserve the previously decoded entry
}
}
134 changes: 134 additions & 0 deletions pkg/storage/cmd_app_ctx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/raftpb"
)

// cmdAppCtx stores the state required to apply a single raft
// entry to a replica. The state is accumulated in stages which occur in
// Replica.handleCommittedEntriesRaftMuLocked. From a high level, a command is
// decoded into an entryApplicationBatch, then if it was proposed locally the
// proposal is populated from the replica's proposals map, then the command
// is staged into the batch by writing its update to the batch's engine.Batch
// and applying its "trivial" side-effects to the batch's view of ReplicaState.
// Then the batch is committed, the side-effects are applied and the local
// result is processed.
type cmdAppCtx struct {
// e is the Entry being applied.
e *raftpb.Entry
decodedRaftEntry // decoded from e.

// proposal is populated on the proposing Replica only and comes from the
// Replica's proposal map.
proposal *ProposalData
// ctx will be the proposal's context if proposed locally, otherwise it will
// be populated with the handleCommittedEntries ctx.
ctx context.Context

// The below fields are set during stageRaftCommand when we validate that
// a command applies given the current lease in checkForcedErr.
leaseIndex uint64
forcedErr *roachpb.Error
proposalRetry proposalReevaluationReason
mutationCount int // number of mutations in the WriteBatch, for writeStats
// splitMergeUnlock is acquired for splits and merges.
splitMergeUnlock func(*storagepb.ReplicatedEvalResult)

// The below fields are set after the data has been written to the storage
// engine in prepareLocalResult.
localResult *result.LocalResult
response proposalResult
}

func (cmd *cmdAppCtx) proposedLocally() bool {
return cmd.proposal != nil
}

// decodedRaftEntry represents the deserialized content of a raftpb.Entry.
type decodedRaftEntry struct {
idKey storagebase.CmdIDKey
raftCmd storagepb.RaftCommand
*decodedConfChange // only non-nil for config changes
}

// decodedConfChange represents the fields of a config change raft command.
type decodedConfChange struct {
cc raftpb.ConfChange
ccCtx ConfChangeContext
}

func (d *decodedRaftEntry) replicatedResult() *storagepb.ReplicatedEvalResult {
return &d.raftCmd.ReplicatedEvalResult
}

// decode decodes the entry e into the decodedRaftEntry.
func (d *decodedRaftEntry) decode(
ctx context.Context, e *raftpb.Entry,
) (errExpl string, err error) {
*d = decodedRaftEntry{}
// etcd raft sometimes inserts nil commands, ours are never nil.
// This case is handled upstream of this call.
if len(e.Data) == 0 {
return "", nil
}
switch e.Type {
case raftpb.EntryNormal:
return d.decodeNormalEntry(e)
case raftpb.EntryConfChange:
return d.decodeConfChangeEntry(e)
default:
log.Fatalf(ctx, "unexpected Raft entry: %v", e)
return "", nil // unreachable
}
}

func (d *decodedRaftEntry) decodeNormalEntry(e *raftpb.Entry) (errExpl string, err error) {
var encodedCommand []byte
d.idKey, encodedCommand = DecodeRaftCommand(e.Data)
// An empty command is used to unquiesce a range and wake the
// leader. Clear commandID so it's ignored for processing.
if len(encodedCommand) == 0 {
d.idKey = ""
} else if err := protoutil.Unmarshal(encodedCommand, &d.raftCmd); err != nil {
const errExpl = "while unmarshalling entry"
return errExpl, errors.Wrap(err, errExpl)
}
return "", nil
}

func (d *decodedRaftEntry) decodeConfChangeEntry(e *raftpb.Entry) (errExpl string, err error) {
d.decodedConfChange = &decodedConfChange{}
if err := protoutil.Unmarshal(e.Data, &d.cc); err != nil {
const errExpl = "while unmarshaling ConfChange"
return errExpl, errors.Wrap(err, errExpl)
}
if err := protoutil.Unmarshal(d.cc.Context, &d.ccCtx); err != nil {
const errExpl = "while unmarshaling ConfChangeContext"
return errExpl, errors.Wrap(err, errExpl)
}
if err := protoutil.Unmarshal(d.ccCtx.Payload, &d.raftCmd); err != nil {
const errExpl = "while unmarshaling RaftCommand"
return errExpl, errors.Wrap(err, errExpl)
}
d.idKey = storagebase.CmdIDKey(d.ccCtx.CommandID)
return "", nil
}
116 changes: 116 additions & 0 deletions pkg/storage/cmd_app_ctx_buf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import "sync"

// cmdAppCtxBufNodeSize is the size of the arrays in an
// cmdAppStateBufNode.
// TODO(ajwerner): justify this number.
const cmdAppCtxBufNodeSize = 8

// cmdAppCtxBuf is an allocation-efficient buffer used during the
// application of raft entries. Initialization occurs lazily upon the first
// call to allocate but used cmdAppCtxBuf objects should be released
// explicitly with the clear() method to release the allocated buffers back
// to the pool.
type cmdAppCtxBuf struct {
len int32
head, tail *cmdAppCtxBufNode
}

// cmdAppCtxBufNode is a linked-list element in an
// cmdAppStateBuf.
type cmdAppCtxBufNode struct {
len int32
buf [cmdAppCtxBufNodeSize]cmdAppCtx
next *cmdAppCtxBufNode
}

var cmdAppStateBufNodeSyncPool = sync.Pool{
New: func() interface{} { return new(cmdAppCtxBufNode) },
}

// allocate extends the length of buf by one and returns the newly
// added element. If this is the first call to allocate it will initialize buf.
// After a buf is initialized it should be explicitly destroyed.
func (buf *cmdAppCtxBuf) allocate() *cmdAppCtx {
if buf.tail == nil { // lazy initialization
n := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode)
buf.head, buf.tail = n, n
}
if buf.tail.len == cmdAppCtxBufNodeSize {
newTail := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode)
buf.tail.next = newTail
buf.tail = newTail
}
ret := &buf.tail.buf[buf.tail.len]
buf.tail.len++
buf.len++
return ret
}

// truncate clears all of the entries currently in a buffer and returns any
// allocated buffers to the pool.
func (buf *cmdAppCtxBuf) clear() {
for buf.head != nil {
buf.len -= buf.head.len
oldHead := buf.head
newHead := oldHead.next
buf.head = newHead
*oldHead = cmdAppCtxBufNode{}
cmdAppStateBufNodeSyncPool.Put(oldHead)
}
*buf = cmdAppCtxBuf{}
}

// last returns a pointer to the last element in the buffer.
func (buf *cmdAppCtxBuf) last() *cmdAppCtx {
return &buf.tail.buf[buf.tail.len-1]
}

// cmdAppCtxBufIterator iterates through the entries in an
// cmdAppStateBuf.
type cmdAppCtxBufIterator struct {
idx int32
buf *cmdAppCtxBuf
node *cmdAppCtxBufNode
}

// init seeks the iterator to the front of buf. It returns true if buf is not
// empty and false if it is.
func (it *cmdAppCtxBufIterator) init(buf *cmdAppCtxBuf) (ok bool) {
*it = cmdAppCtxBufIterator{buf: buf, node: buf.head}
return it.buf.len > 0
}

// cur returns the cmdAppState currently pointed to by it.
func (it *cmdAppCtxBufIterator) cur() *cmdAppCtx {
return &it.node.buf[it.idx%cmdAppCtxBufNodeSize]
}

// isLast returns true if it currently points to the last element in the buffer.
func (it *cmdAppCtxBufIterator) isLast() bool {
return it.idx+1 == it.buf.len
}

// next moves it to point to the next element. It returns false if it.isLast()
// is true, indicating that there are no more elements in the buffer.
func (it *cmdAppCtxBufIterator) next() bool {
if it.isLast() {
return false
}
it.idx++
if it.idx%cmdAppCtxBufNodeSize == 0 {
it.node = it.node.next
}
return true
}
Loading

0 comments on commit 509baff

Please sign in to comment.