Skip to content

Commit

Permalink
storage: batch command application and coalesce applied state per batch
Browse files Browse the repository at this point in the history
This commit batches raft command application where possible. The basic approach
is to notice that many commands only "trivially" update the replica state
machine. Trivial commands can be processed in a single batch by acting on a
copy of the replica state. Non-trivial commands share the same logic but always
commit alone as they for one reason or another rely on having a view of the
replica or storage engine as of a specific log index.

This commit also sneaks in another optimization which batching enables. Each
command mutates a portion of replica state called the applied state which
tracks a combination of the log index which has been applied and the MVCC stats
of the range as of that application. Before this change each entry would update
this applied state and each of those writes will end up in the WAL and
mem-table just the be compacted away in L1. Now that commands are being applied
to the storage engine in a single batch it is easy to only update the applied
state for the last entry in the batch.

For sequential writes this patch shows a considerable performance win. The
below benchmark was run on a 3-node c5d.4xlarge (16 vCPU) cluster with
concurrency 128.

```
name            old ops/s   new ops/s   delta
KV0-throughput  22.1k ± 1%  32.8k ± 1%  +48.59%  (p=0.029 n=4+4)

name            old ms/s    new ms/s    delta
KV0-P50          7.15 ± 2%   6.00 ± 0%  -16.08%  (p=0.029 n=4+4)
KV0-Avg          5.80 ± 0%   3.80 ± 0%  -34.48%  (p=0.029 n=4+4)
```

Due to the re-organization of logic in the change, the Replica.mu does not need
to be acquired as many times during the application of a batch. In the common
case it is now acquired exactly twice in the process of applying a batch
whereas before it was acquired more than twice per entry. This should hopefully
improve performance on large machines which experience mutex contention for a
single range.

This effect is visible on large machines. Below are results from running
a normal KV0 workload on c5d.18xlarge machines (72 vCPU machines) with
concurrency 1024 and 16 initial splits.

```
name            old ops/s   new ops/s    delta
KV0-throughput  78.1k ± 1%  116.8k ± 5%  +49.42%  (p=0.029 n=4+4)

name            old ms/s    new ms/s     delta
KV0-P50          24.4 ± 3%    19.7 ± 7%  -19.28%  (p=0.029 n=4+4)
KV0-Avg          12.6 ± 0%     7.5 ± 9%  -40.87%  (p=0.029 n=4+4)
```

Fixes #37426.

Release note (performance improvement): Batch raft entry application and
coalesce writes to applied state for the batch.
  • Loading branch information
ajwerner committed Jul 8, 2019
1 parent ff57e89 commit 27cf2b4
Show file tree
Hide file tree
Showing 7 changed files with 1,622 additions and 1,071 deletions.
139 changes: 139 additions & 0 deletions pkg/storage/entry_application_state_buf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import "sync"

// entryApplicationStateBufNodeSize is the size of the arrays in an
// entryApplicationStateBufNode.
// TODO(ajwerner): justify this number.
const entryApplicationStateBufNodeSize = 8

// entryApplicationStateBuf is an allocation-efficient buffer used during the
// application of raft entries. Initialization occurs lazily upon the first
// call to allocate but used entryApplicationStateBuf objects should be released
// explicitly with the destroy() method to release the allocated buffers back
// to the pool.
type entryApplicationStateBuf struct {
len int32
head, tail *entryApplicationStateBufNode
}

var entryApplicationStateBufNodeSyncPool = sync.Pool{
New: func() interface{} { return new(entryApplicationStateBufNode) },
}

// entryApplicationStateRingBuf is a ring-buffer of entryApplicationState.
// It offers indexing and truncation.
type entryApplicationStateBufNode struct {
len int32
buf [entryApplicationStateBufNodeSize]entryApplicationState
next *entryApplicationStateBufNode
}

func (buf *entryApplicationStateBuf) last() *entryApplicationState {
return &buf.tail.buf[buf.tail.len-1]
}

// allocate extends the length of buf by one and returns the newly
// added element. If this is the fist call to allocate it will initialize buf.
// After a buf is initialized it should be explicitly destroyed.
func (buf *entryApplicationStateBuf) allocate() *entryApplicationState {
if buf.tail == nil { // lazy initialization
n := entryApplicationStateBufNodeSyncPool.Get().(*entryApplicationStateBufNode)
buf.head, buf.tail = n, n
}
if buf.tail.len == entryApplicationStateBufNodeSize {
newTail := entryApplicationStateBufNodeSyncPool.Get().(*entryApplicationStateBufNode)
buf.tail.next = newTail
buf.tail = newTail
}
buf.len++
return buf.tail.allocate()
}

// destroy releases allocated nodes back into the sync pool.
// It is illegal to use buf after a call to destroy.
func (buf *entryApplicationStateBuf) destroy() {
for cur := buf.head; cur != nil; {
next := cur.next
*cur = entryApplicationStateBufNode{}
entryApplicationStateBufNodeSyncPool.Put(cur)
cur, buf.head = next, next
}
*buf = entryApplicationStateBuf{}
}

// truncate clears all of the entries currently in a buffer.
func (buf *entryApplicationStateBuf) clear() {
for buf.head != buf.tail {
buf.len -= buf.head.len
oldHead := buf.head
newHead := oldHead.next
buf.head = newHead
oldHead.clear()
entryApplicationStateBufNodeSyncPool.Put(oldHead)
}
buf.head.clear()
buf.len = 0
}

// allocate extends the length of the node by one and returns the pointer to the
// newly allocated element. It is illegal to call allocate on a full node.
func (rb *entryApplicationStateBufNode) allocate() *entryApplicationState {
if rb.len == entryApplicationStateBufNodeSize {
panic("cannot push onto a full entryApplicationStateBufNode")
}
ret := &rb.buf[rb.len]
rb.len++
return ret
}

// clear zeroes rb.
func (rb *entryApplicationStateBufNode) clear() {
*rb = entryApplicationStateBufNode{}
}

type entryApplicationStateBufIterator struct {
idx int32
offset int32
buf *entryApplicationStateBuf
node *entryApplicationStateBufNode
}

func (it *entryApplicationStateBufIterator) init(buf *entryApplicationStateBuf) bool {
*it = entryApplicationStateBufIterator{
buf: buf,
node: buf.head,
}
return it.buf.len > 0
}

func (it *entryApplicationStateBufIterator) state() *entryApplicationState {
return &it.node.buf[it.offset]
}

func (it *entryApplicationStateBufIterator) isLast() bool {
return it.idx+1 == it.buf.len
}

func (it *entryApplicationStateBufIterator) next() bool {
if it.idx+1 == it.buf.len {
return false
}
it.idx++
it.offset++
if it.offset == entryApplicationStateBufNodeSize {
it.node = it.node.next
it.offset = 0
}
return true
}
44 changes: 44 additions & 0 deletions pkg/storage/entry_application_state_buf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
)

// TestApplicationStateBuf is an overly simplistic test of the
// entryApplicationStateBuf behavior.
func TestApplicationStateBuf(t *testing.T) {
defer leaktest.AfterTest(t)()
var buf entryApplicationStateBuf
var states []*entryApplicationState
for i := 0; i < 5*entryApplicationStateBufNodeSize+1; i++ {
assert.Equal(t, i, int(buf.len))
states = append(states, buf.allocate())
assert.Equal(t, i+1, int(buf.len))
}
last := states[len(states)-1]
assert.Equal(t, last, buf.last())
var it entryApplicationStateBufIterator
i := 0
for ok := it.init(&buf); ok; ok = it.next() {
assert.Equal(t, states[i], it.state())
i++
}
buf.clear()
assert.Equal(t, 0, int(buf.len))
assert.Panics(t, func() { buf.last() })
buf.destroy()
assert.EqualValues(t, buf, entryApplicationStateBuf{})
}
Loading

0 comments on commit 27cf2b4

Please sign in to comment.