28009: release-2.0: storage/engine: limit the size of batch groups r=benesch a=petermattis

Backport 1/1 commits from cockroachdb#27895.

/cc @cockroachdb/release

Queuing this up so we don't forget about it.

---

Limit the size of batch groups to 1 MB. A larger batch can still be
committed, but it won't be grouped with any other batches.

Fixes cockroachdb#27865

Release note (bug fix): Limit the size of "batch groups" when committing
a batch to RocksDB to avoid rare scenarios in which multi-gigabyte batch
groups are created, which can cause a server to run out of memory when
replaying the RocksDB log at startup.
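
To make the grouping policy concrete, here is a minimal standalone sketch
(not code from this commit) that models batches as plain byte sizes;
addBatch and maxGroupSize are hypothetical stand-ins for the makeBatchGroup
and maxBatchGroupSize introduced in the diff below.

package main

import "fmt"

// maxGroupSize mirrors the 1 MiB cap from the patch.
const maxGroupSize = 1 << 20

// addBatch follows the same policy as makeBatchGroup: the first batch in
// the list, and any batch that would push the current group past maxSize,
// starts a new group and becomes its leader. A zero entry stands in for
// the nil delimiter used in the real pending list.
func addBatch(pending []int, size, groupSize, maxSize int) ([]int, int, bool) {
	leader := len(pending) == 0
	if leader {
		groupSize = size
	} else if groupSize+size > maxSize {
		leader = true
		groupSize = size
		pending = append(pending, 0) // group delimiter
	} else {
		groupSize += size
	}
	return append(pending, size), groupSize, leader
}

func main() {
	var pending []int
	var groupSize int
	for _, size := range []int{600 << 10, 600 << 10, 200 << 10, 2 << 20} {
		var leader bool
		pending, groupSize, leader = addBatch(pending, size, groupSize, maxGroupSize)
		fmt.Printf("size=%-8d leader=%-5t groupSize=%d\n", size, leader, groupSize)
	}
}

With a 1 MiB cap, two 600 KiB batches cannot share a group, so the second
becomes the leader of a new group, and a following 200 KiB batch joins that
group; a 2 MiB batch exceeds the cap on its own and commits as a
single-batch group.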


Co-authored-by: Peter Mattis <[email protected]>
craig[bot] and petermattis committed Aug 2, 2018
2 parents cb9bb37 + 27346e4 commit 72b4e71
Showing 2 changed files with 121 additions and 11 deletions.
pkg/storage/engine/rocksdb.go: 60 additions & 10 deletions
@@ -462,6 +462,7 @@ type RocksDB struct {
		syncutil.Mutex
		cond       sync.Cond
		committing bool
+		groupSize  int
		pending    []*rocksDBBatch
	}

@@ -626,7 +627,6 @@ func (r *RocksDB) open() error {
func (r *RocksDB) syncLoop() {
	s := &r.syncer
	s.Lock()
-	defer s.Unlock()

	var lastSync time.Time

@@ -635,6 +635,7 @@ func (r *RocksDB) syncLoop() {
			s.cond.Wait()
		}
		if s.closed {
+			s.Unlock()
			return
		}

@@ -1623,6 +1624,47 @@ func (r *rocksDBBatch) NewTimeBoundIterator(start, end hlc.Timestamp) Iterator {
	return iter
}

+const maxBatchGroupSize = 1 << 20 // 1 MiB
+
+// makeBatchGroup adds the specified batch to the pending list of batches to
+// commit. Groups are delimited by a nil batch in the pending list. Group
+// leaders are the first batch in the pending list and the first batch after a
+// nil batch. The size of a group is limited by the maxSize parameter, which is
+// measured as the number of bytes in the group's batches. The groupSize
+// parameter is the size of the current group being formed. Returns the new
+// list of pending batches, the new size of the current group, and whether the
+// batch that was added is the leader of its group.
+func makeBatchGroup(
+	pending []*rocksDBBatch, b *rocksDBBatch, groupSize, maxSize int,
+) (_ []*rocksDBBatch, _ int, leader bool) {
+	leader = len(pending) == 0
+	if n := len(b.unsafeRepr()); leader {
+		groupSize = n
+	} else if groupSize+n > maxSize {
+		leader = true
+		groupSize = n
+		pending = append(pending, nil)
+	} else {
+		groupSize += n
+	}
+	pending = append(pending, b)
+	return pending, groupSize, leader
+}
+
+// nextBatchGroup extracts the next group of batches from the pending list. See
+// makeBatchGroup for an explanation of how groups are encoded into the pending
+// list. Returns the next group in the prefix return value and the remaining
+// groups in the suffix return value (the next group is always a prefix of the
+// pending argument).
+func nextBatchGroup(pending []*rocksDBBatch) (prefix []*rocksDBBatch, suffix []*rocksDBBatch) {
+	for i := 1; i < len(pending); i++ {
+		if pending[i] == nil {
+			return pending[:i], pending[i+1:]
+		}
+	}
+	return pending, pending[len(pending):]
+}

func (r *rocksDBBatch) Commit(syncCommit bool) error {
	if r.Closed() {
		panic("this batch was already committed")
@@ -1643,16 +1685,19 @@ func (r *rocksDBBatch) Commit(syncCommit bool) error {
	// slice. Every batch has an associated wait group which is signaled when
	// the commit is complete.
	c.Lock()
-	leader := len(c.pending) == 0
-	c.pending = append(c.pending, r)
+
+	var leader bool
+	c.pending, c.groupSize, leader = makeBatchGroup(c.pending, r, c.groupSize, maxBatchGroupSize)
+
	if leader {
-		// We're the leader. Wait for any running commit to finish.
-		for c.committing {
+		// We're the leader of our group. Wait for any running commit to finish and
+		// for our batch to make it to the head of the pending queue.
+		for c.committing || c.pending[0] != r {
			c.cond.Wait()
		}
-		pending := c.pending
-		c.pending = nil
+
+		var pending []*rocksDBBatch
+		pending, c.pending = nextBatchGroup(c.pending)
		c.committing = true
		c.Unlock()

@@ -1683,14 +1728,19 @@ func (r *rocksDBBatch) Commit(syncCommit bool) error {
	// proceed.
	c.Lock()
	c.committing = false
-	c.cond.Signal()
+	// NB: Multiple leaders can be waiting.
+	c.cond.Broadcast()
	c.Unlock()

	// Propagate the error to all of the batches involved in the commit. If a
	// batch requires syncing and the commit was successful, add it to the
	// syncing list. Note that we're reusing the pending list here for the
-	// syncing list.
-	syncing := pending[:0]
+	// syncing list. We need to be careful to cap the capacity so that
+	// extending this slice past the length of the pending list will result in
+	// reallocation. Otherwise we have a race between appending to this list
+	// while holding the sync lock below, and appending to the commit pending
+	// list while holding the commit lock above.
+	syncing := pending[:0:len(pending)]
	for _, b := range pending {
		if err != nil || !b.syncCommit {
			b.commitErr = err
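The capacity cap in syncing := pending[:0:len(pending)] deserves a closer
look: after nextBatchGroup, the extracted group (prefix) and the new
c.pending (suffix) share a single backing array. The following standalone
sketch (an illustration, not code from this commit) shows why a Go full
slice expression prevents the race the new comment describes: with the
capacity capped, append reallocates instead of growing into the suffix's
region of the shared array.

package main

import "fmt"

func main() {
	backing := make([]int, 6)
	for i := range backing {
		backing[i] = i
	}

	// prefix and suffix share backing storage, like the batch group prefix
	// and the remaining pending list returned by nextBatchGroup.
	prefix := backing[:3]
	suffix := backing[3:]

	// Capping prefix's capacity at its length forces append to reallocate
	// rather than grow into suffix's region of the shared array.
	capped := prefix[:0:len(prefix)]
	for i := 0; i < 4; i++ {
		capped = append(capped, 100+i)
	}
	fmt.Println(suffix) // [3 4 5]: untouched
	fmt.Println(capped) // [100 101 102 103]: reallocated on the 4th append

	// Without the cap, the same appends overwrite suffix's storage.
	uncapped := prefix[:0]
	for i := 0; i < 4; i++ {
		uncapped = append(uncapped, 200+i)
	}
	fmt.Println(suffix) // [203 4 5]: suffix[0] was clobbered
}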
pkg/storage/engine/rocksdb_test.go: 61 additions & 1 deletion
@@ -23,6 +23,7 @@ import (
	"math/rand"
	"os"
	"path/filepath"
+	"reflect"
	"sort"
	"strconv"
	"testing"
@@ -544,7 +545,12 @@ func TestConcurrentBatch(t *testing.T) {
		if err := batch.Put(MakeMVCCMetadataKey(key), nil); err != nil {
			t.Fatal(err)
		}
-		if len(batch.Repr()) >= 4<<20 {
+		const targetSize = 4 << 20
+		if targetSize < maxBatchGroupSize {
+			t.Fatalf("target size (%d) should be larger than the max batch group size (%d)",
+				targetSize, maxBatchGroupSize)
+		}
+		if len(batch.Repr()) >= targetSize {
			break
		}
	}
@@ -1029,3 +1035,57 @@ func TestRocksDBOptions(t *testing.T) {
}
}
}

+func TestMakeBatchGroup(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	testCases := []struct {
+		maxSize   int
+		sizes     []int
+		groupSize []int
+		leader    []bool
+		groups    []int
+	}{
+		{1, []int{100, 100, 100}, []int{100, 100, 100}, []bool{true, true, true}, []int{1, 1, 1}},
+		{199, []int{100, 100, 100}, []int{100, 100, 100}, []bool{true, true, true}, []int{1, 1, 1}},
+		{200, []int{100, 100, 100}, []int{100, 200, 100}, []bool{true, false, true}, []int{2, 1}},
+		{299, []int{100, 100, 100}, []int{100, 200, 100}, []bool{true, false, true}, []int{2, 1}},
+		{300, []int{100, 100, 100}, []int{100, 200, 300}, []bool{true, false, false}, []int{3}},
+		{
+			400,
+			[]int{100, 200, 300, 100, 500},
+			[]int{100, 300, 300, 400, 500},
+			[]bool{true, false, true, false, true},
+			[]int{2, 2, 1},
+		},
+	}
+	for _, c := range testCases {
+		t.Run("", func(t *testing.T) {
+			var pending []*rocksDBBatch
+			var groupSize int
+			for i := range c.sizes {
+				// We use intimate knowledge of rocksDBBatch and RocksDBBatchBuilder to
+				// construct a batch of a specific size.
+				b := &rocksDBBatch{}
+				b.builder.repr = make([]byte, c.sizes[i])
+				var leader bool
+				pending, groupSize, leader = makeBatchGroup(pending, b, groupSize, c.maxSize)
+				if c.groupSize[i] != groupSize {
+					t.Fatalf("expected group size %d, but found %d", c.groupSize[i], groupSize)
+				}
+				if c.leader[i] != leader {
+					t.Fatalf("expected leader %t, but found %t", c.leader[i], leader)
+				}
+			}
+			var groups []int
+			for len(pending) > 0 {
+				var group []*rocksDBBatch
+				group, pending = nextBatchGroup(pending)
+				groups = append(groups, len(group))
+			}
+			if !reflect.DeepEqual(c.groups, groups) {
+				t.Fatalf("expected %d, but found %d", c.groups, groups)
+			}
+		})
+	}
+}
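
To run the new test in a local checkout (assuming a working CGo/RocksDB
build environment), a plain go test invocation should suffice:

go test ./pkg/storage/engine -run 'TestMakeBatchGroup|TestConcurrentBatch'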
