Skip to content

Commit

Permalink
engine: Port MVCCScanner to Go, implement MVCCScan for pebble
Browse files Browse the repository at this point in the history
This change ports the MVCC Scanner from libroach to go, for use
with Pebble's iterators. One half of the work involved in cockroachdb#39674.
Also updates pebbleIterator to call it in pebbleIterator.MVCC*.

Release note: None
  • Loading branch information
itsbilal committed Oct 15, 2019
1 parent aa1bfce commit 48e1008
Show file tree
Hide file tree
Showing 3 changed files with 758 additions and 11 deletions.
7 changes: 0 additions & 7 deletions pkg/storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,7 @@ func TestEngineBatchStaleCachedIterator(t *testing.T) {

// Higher-level failure mode. Mostly for documentation.
{
// TODO(itsbilal): Stop skipping this test for pebble batches when the
// pebble MVCC scanner is merged.
batch := eng.NewBatch()
switch batch.(type) {
case *pebbleBatch:
return
default:
}
defer batch.Close()

key := roachpb.Key("z")
Expand Down
157 changes: 153 additions & 4 deletions pkg/storage/engine/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)

Expand Down Expand Up @@ -302,16 +303,164 @@ func (p *pebbleIterator) FindSplitKey(
func (p *pebbleIterator) MVCCGet(
key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (value *roachpb.Value, intent *roachpb.Intent, err error) {
// TODO(itsbilal): Implement in a separate PR. See #39674.
panic("unimplemented for now, see #39674")
if opts.Inconsistent && opts.Txn != nil {
return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(key) == 0 {
return nil, nil, emptyKeyError()
}
if p.iter == nil {
panic("uninitialized iterator")
}

// MVCCGet is implemented as an MVCCScan with an end key that sorts after the
// start key.
keyEnd := make([]byte, 0, len(key)+1)
keyEnd = append(keyEnd, key...)
keyEnd = append(keyEnd, 0x00)

mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
defer pebbleMVCCScannerPool.Put(mvccScanner)

*mvccScanner = pebbleMVCCScanner{
parent: p.iter,
start: key,
end: keyEnd,
ts: timestamp,
maxKeys: 1,
inconsistent: opts.Inconsistent,
tombstones: opts.Tombstones,
ignoreSeq: opts.IgnoreSequence,
}

if opts.Txn != nil {
mvccScanner.txn = opts.Txn
mvccScanner.checkUncertainty = timestamp.Less(opts.Txn.MaxTimestamp)
}

mvccScanner.init()
mvccScanner.get()

// Init calls SetBounds. Reset it to what this iterator had at the start.
defer func() {
if p.iter != nil {
p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound)
}
}()

if mvccScanner.err != nil {
return nil, nil, mvccScanner.err
}
intents, err := buildScanIntents(mvccScanner.intents.Repr())
if err != nil {
return nil, nil, err
}
if !opts.Inconsistent && len(intents) > 0 {
return nil, nil, &roachpb.WriteIntentError{Intents: intents}
}

if len(intents) > 1 {
return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents))
} else if len(intents) == 1 {
intent = &intents[0]
}

if len(mvccScanner.results.repr) == 0 {
return nil, intent, nil
}

mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(mvccScanner.results.repr)
if err != nil {
return nil, nil, err
}

value = &roachpb.Value{
RawBytes: rawValue,
Timestamp: mvccKey.Timestamp,
}
return
}

// MVCCScan implements the Iterator interface.
func (p *pebbleIterator) MVCCScan(
start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions,
) (kvData []byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) {
// TODO(itsbilal): Implement in a separate PR. See #39674.
panic("unimplemented for now, see #39674")
if opts.Inconsistent && opts.Txn != nil {
return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(end) == 0 {
return nil, 0, nil, nil, emptyKeyError()
}
if max == 0 {
resumeSpan = &roachpb.Span{Key: start, EndKey: end}
return nil, 0, resumeSpan, nil, nil
}
if p.iter == nil {
panic("uninitialized iterator")
}

mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
defer pebbleMVCCScannerPool.Put(mvccScanner)

*mvccScanner = pebbleMVCCScanner{
parent: p.iter,
reverse: opts.Reverse,
start: start,
end: end,
ts: timestamp,
maxKeys: max,
inconsistent: opts.Inconsistent,
tombstones: opts.Tombstones,
ignoreSeq: opts.IgnoreSequence,
}

if opts.Txn != nil {
mvccScanner.txn = opts.Txn
mvccScanner.checkUncertainty = timestamp.Less(opts.Txn.MaxTimestamp)
}

mvccScanner.init()
mvccScanner.scan()

// Init calls SetBounds. Reset it to what this iterator had at the start.
defer func() {
if p.iter != nil {
p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound)
}
}()

if mvccScanner.err != nil {
return nil, 0, nil, nil, mvccScanner.err
}

kvData = mvccScanner.results.repr
numKVs = mvccScanner.results.count

if mvccScanner.curKey != nil {
if opts.Reverse {
resumeSpan = &roachpb.Span{
Key: mvccScanner.start,
EndKey: mvccScanner.curKey,
}
// curKey was not added to results, so it needs to be included in the
// resume span.
resumeSpan.EndKey = resumeSpan.EndKey.Next()
} else {
resumeSpan = &roachpb.Span{
Key: mvccScanner.curKey,
EndKey: mvccScanner.end,
}
}
}
intents, err = buildScanIntents(mvccScanner.intents.Repr())
if err != nil {
return nil, 0, nil, nil, err
}

if !opts.Inconsistent && len(intents) > 0 {
return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents}
}
return
}

// SetUpperBound implements the Iterator interface.
Expand Down
Loading

0 comments on commit 48e1008

Please sign in to comment.