Skip to content

Commit

Permalink
engine: Port MVCCScanner to Go, implement MVCCScan for pebble
Browse files Browse the repository at this point in the history
This change ports the MVCC Scanner from libroach to go, for use
with Pebble's iterators. One half of the work involved in cockroachdb#39674.
Also updates pebbleIterator to call it in pebbleIterator.MVCC*.

Release note: None
  • Loading branch information
itsbilal committed Oct 14, 2019
1 parent 4b1974c commit 62ff407
Show file tree
Hide file tree
Showing 3 changed files with 731 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func MakeValue(meta enginepb.MVCCMetadata) roachpb.Value {
return roachpb.Value{RawBytes: meta.RawBytes}
}

// IsIntentOf returns true if the meta record is an intent of the supplied
// IsIntentOf returns true if the meta record is an intent of the supplied m
// transaction.
func IsIntentOf(meta *enginepb.MVCCMetadata, txn *roachpb.Transaction) bool {
return meta.Txn != nil && txn != nil && meta.Txn.ID == txn.ID
Expand Down
153 changes: 149 additions & 4 deletions pkg/storage/engine/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)

Expand All @@ -38,6 +39,8 @@ type pebbleIterator struct {
// Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips
// SSTables based on MVCC key when true.
prefix bool
// MVCC Scanner struct to store variables for one call to MVCCGet / MVCCScan.
mvccScanner pebbleMVCCScanner
}

var _ Iterator = &pebbleIterator{}
Expand Down Expand Up @@ -302,16 +305,158 @@ func (p *pebbleIterator) FindSplitKey(
func (p *pebbleIterator) MVCCGet(
key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (value *roachpb.Value, intent *roachpb.Intent, err error) {
// TODO(itsbilal): Implement in a separate PR. See #39674.
panic("unimplemented for now, see #39674")
if opts.Inconsistent && opts.Txn != nil {
return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(key) == 0 {
return nil, nil, emptyKeyError()
}
if p.iter == nil {
panic("uninitialized iterator")
}

// MVCCGet is implemented as an MVCCScan with an end key that sorts after the
// start key.
keyEnd := make([]byte, 0, len(key)+1)
keyEnd = append(keyEnd, key...)
keyEnd = append(keyEnd, 0x00)

p.mvccScanner = pebbleMVCCScanner{
parent: p.iter,
start: key,
end: keyEnd,
ts: timestamp,
maxKeys: 1,
inconsistent: opts.Inconsistent,
tombstones: opts.Tombstones,
ignoreSeq: opts.IgnoreSequence,
}

if opts.Txn != nil {
p.mvccScanner.txn = opts.Txn
p.mvccScanner.checkUncertainty = timestamp.Less(opts.Txn.MaxTimestamp)
}

p.mvccScanner.init()
p.mvccScanner.get()

// Init calls SetBounds. Reset it to what this iterator had at the start.
defer func() {
if p.iter != nil {
p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound)
}
}()

if p.mvccScanner.err != nil {
return nil, nil, p.mvccScanner.err
}
intents, err := buildScanIntents(p.mvccScanner.intents.Repr())
if err != nil {
return nil, nil, err
}
if !opts.Inconsistent && len(intents) > 0 {
return nil, nil, &roachpb.WriteIntentError{Intents: intents}
}

if len(intents) > 1 {
return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents))
} else if len(intents) == 1 {
intent = &intents[0]
}

if len(p.mvccScanner.results.repr) == 0 {
return nil, intent, nil
}

mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(p.mvccScanner.results.repr)
if err != nil {
return nil, nil, err
}

value = &roachpb.Value{
RawBytes: rawValue,
Timestamp: mvccKey.Timestamp,
}
return
}

// MVCCScan implements the Iterator interface.
func (p *pebbleIterator) MVCCScan(
start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions,
) (kvData []byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) {
// TODO(itsbilal): Implement in a separate PR. See #39674.
panic("unimplemented for now, see #39674")
if opts.Inconsistent && opts.Txn != nil {
return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(end) == 0 {
return nil, 0, nil, nil, emptyKeyError()
}
if max == 0 {
resumeSpan = &roachpb.Span{Key: start, EndKey: end}
return nil, 0, resumeSpan, nil, nil
}
if p.iter == nil {
panic("uninitialized iterator")
}

p.mvccScanner = pebbleMVCCScanner{
parent: p.iter,
reverse: opts.Reverse,
start: start,
end: end,
ts: timestamp,
maxKeys: max,
inconsistent: opts.Inconsistent,
tombstones: opts.Tombstones,
ignoreSeq: opts.IgnoreSequence,
}

if opts.Txn != nil {
p.mvccScanner.txn = opts.Txn
p.mvccScanner.checkUncertainty = timestamp.Less(opts.Txn.MaxTimestamp)
}

p.mvccScanner.init()
p.mvccScanner.scan()

// Init calls SetBounds. Reset it to what this iterator had at the start.
defer func() {
if p.iter != nil {
p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound)
}
}()

if p.mvccScanner.err != nil {
return nil, 0, nil, nil, p.mvccScanner.err
}

kvData = p.mvccScanner.results.repr
numKVs = p.mvccScanner.results.count

if p.mvccScanner.curKey != nil {
if opts.Reverse {
resumeSpan = &roachpb.Span{
Key: p.mvccScanner.start,
EndKey: p.mvccScanner.curKey,
}
// curKey was not added to results, so it needs to be included in the
// resume span.
resumeSpan.EndKey = resumeSpan.EndKey.Next()
} else {
resumeSpan = &roachpb.Span{
Key: p.mvccScanner.curKey,
EndKey: p.mvccScanner.end,
}
}
}
intents, err = buildScanIntents(p.mvccScanner.intents.Repr())
if err != nil {
return nil, 0, nil, nil, err
}

if !opts.Inconsistent && len(intents) > 0 {
return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents}
}
return
}

// SetUpperBound implements the Iterator interface.
Expand Down
Loading

0 comments on commit 62ff407

Please sign in to comment.