Skip to content

Commit

Permalink
Merge pull request #41896 from petermattis/pmattis/pebble-batch-iterator
Browse files Browse the repository at this point in the history
storage/engine: fix pebbleBatch.iter reuse
  • Loading branch information
petermattis authored Oct 24, 2019
2 parents 13e3f12 + 550d483 commit fad4071
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 37 deletions.
7 changes: 4 additions & 3 deletions pkg/storage/engine/mvcc_logical_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ package engine
import (
"context"
"math"
"reflect"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/kr/pretty"
)

func TestMVCCOpLogWriter(t *testing.T) {
Expand Down Expand Up @@ -164,8 +165,8 @@ func TestMVCCOpLogWriter(t *testing.T) {
TxnID: txn2.ID,
}),
}
if ops := ol.LogicalOps(); !reflect.DeepEqual(exp, ops) {
t.Errorf("expected logical ops %+v, found %+v", exp, ops)
if diff := pretty.Diff(exp, ol.LogicalOps()); diff != nil {
t.Errorf("unexpected logical op differences:\n%s", strings.Join(diff, "\n"))
}
})
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,10 @@ func (p *Pebble) GetSSTables() (sstables SSTableInfos) {
}

type pebbleReadOnly struct {
parent *Pebble
iter pebbleIterator
closed bool
parent *Pebble
prefixIter pebbleIterator
normalIter pebbleIterator
closed bool
}

var _ ReadWriter = &pebbleReadOnly{}
Expand All @@ -540,7 +541,8 @@ func (p *pebbleReadOnly) Close() {
panic("closing an already-closed pebbleReadOnly")
}
p.closed = true
p.iter.destroy()
p.prefixIter.destroy()
p.normalIter.destroy()
}

func (p *pebbleReadOnly) Closed() bool {
Expand Down Expand Up @@ -580,18 +582,23 @@ func (p *pebbleReadOnly) NewIterator(opts IterOptions) Iterator {
return newPebbleIterator(p.parent.db, opts)
}

if p.iter.inuse {
iter := &p.normalIter
if opts.Prefix {
iter = &p.prefixIter
}
if iter.inuse {
panic("iterator already in use")
}
p.iter.inuse = true
p.iter.reusable = true

if p.iter.iter != nil {
p.iter.setOptions(opts)
if iter.iter != nil {
iter.setOptions(opts)
} else {
p.iter.init(p.parent.db, opts)
iter.init(p.parent.db, opts)
iter.reusable = true
}
return &p.iter

iter.inuse = true
return iter
}

// Writer methods are not implemented for pebbleReadOnly. Ideally, the code
Expand Down
52 changes: 33 additions & 19 deletions pkg/storage/engine/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type pebbleBatch struct {
db *pebble.DB
batch *pebble.Batch
buf []byte
iter pebbleIterator
prefixIter pebbleIterator
normalIter pebbleIterator
closed bool
isDistinct bool
distinctOpen bool
Expand All @@ -45,9 +46,15 @@ func newPebbleBatch(db *pebble.DB, batch *pebble.Batch) *pebbleBatch {
db: db,
batch: batch,
buf: pb.buf,
iter: pebbleIterator{
lowerBoundBuf: pb.iter.lowerBoundBuf,
upperBoundBuf: pb.iter.upperBoundBuf,
prefixIter: pebbleIterator{
lowerBoundBuf: pb.prefixIter.lowerBoundBuf,
upperBoundBuf: pb.prefixIter.upperBoundBuf,
reusable: true,
},
normalIter: pebbleIterator{
lowerBoundBuf: pb.normalIter.lowerBoundBuf,
upperBoundBuf: pb.normalIter.upperBoundBuf,
reusable: true,
},
}
return pb
Expand All @@ -59,14 +66,19 @@ func (p *pebbleBatch) Close() {
panic("closing an already-closed pebbleBatch")
}
p.closed = true

// Destroy the iterators before closing the batch.
p.prefixIter.destroy()
p.normalIter.destroy()

if !p.isDistinct {
_ = p.batch.Close()
p.batch = nil
} else {
p.parentBatch.distinctOpen = false
p.isDistinct = false
}
p.iter.destroy()

pebbleBatchPool.Put(p)
}

Expand Down Expand Up @@ -157,20 +169,24 @@ func (p *pebbleBatch) NewIterator(opts IterOptions) Iterator {
return newPebbleIterator(p.batch, opts)
}

if p.iter.inuse {
iter := &p.normalIter
if opts.Prefix {
iter = &p.prefixIter
}
if iter.inuse {
panic("iterator already in use")
}
p.iter.inuse = true
p.iter.reusable = true

if p.iter.iter != nil {
p.iter.setOptions(opts)
if iter.iter != nil {
iter.setOptions(opts)
} else if p.batch.Indexed() {
p.iter.init(p.batch, opts)
iter.init(p.batch, opts)
} else {
p.iter.init(p.db, opts)
iter.init(p.db, opts)
}
return &p.iter

iter.inuse = true
return iter
}

// NewIterator implements the Batch interface.
Expand Down Expand Up @@ -322,12 +338,10 @@ func (p *pebbleBatch) Distinct() ReadWriter {
// optimization. In Pebble we're still using the same underlying batch and if
// it is indexed we'll still be indexing it as we Go.
p.distinctOpen = true
return &pebbleBatch{
db: p.db,
batch: p.batch,
parentBatch: p,
isDistinct: true,
}
d := newPebbleBatch(p.db, p.batch)
d.parentBatch = p
d.isDistinct = true
return d
}

// Empty implements the Batch interface.
Expand Down
15 changes: 11 additions & 4 deletions pkg/storage/engine/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (p *pebbleIterator) init(handle pebble.Reader, opts IterOptions) {
lowerBoundBuf: p.lowerBoundBuf,
upperBoundBuf: p.upperBoundBuf,
prefix: opts.Prefix,
reusable: p.reusable,
}

if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
Expand Down Expand Up @@ -125,6 +126,8 @@ func (p *pebbleIterator) init(handle pebble.Reader, opts IterOptions) {
if p.iter == nil {
panic("unable to create iterator")
}

p.inuse = true
}

func (p *pebbleIterator) setOptions(opts IterOptions) {
Expand Down Expand Up @@ -156,11 +159,12 @@ func (p *pebbleIterator) setOptions(opts IterOptions) {

// Close implements the Iterator interface.
func (p *pebbleIterator) Close() {
if !p.inuse {
panic("closing idle iterator")
}
p.inuse = false

if p.reusable {
if !p.inuse {
panic("closing idle iterator")
}
p.inuse = false
return
}

Expand Down Expand Up @@ -533,6 +537,9 @@ func (p *pebbleIterator) Stats() IteratorStats {
}

func (p *pebbleIterator) destroy() {
if p.inuse {
panic("iterator still in use")
}
if p.iter != nil {
err := p.iter.Close()
if err != nil {
Expand Down

0 comments on commit fad4071

Please sign in to comment.