Skip to content

Commit

Permalink
storage/engine: extract an MVCCScanOptions struct
Browse files Browse the repository at this point in the history
The signatures of the functions in the MVCCScan family are extremely
unwieldy. Extract an MVCCScanOptions struct for the fields that have
reasonable default values.

We can immediately realize the benefits of this change by removing
MVCCReverseScan, MVCCReverseScanWithBytes, and MVCCIterateUsingIter,
as the functionality provided by those functions can be requested with
the appropriate option in the MVCCScanOptions struct.

This paves the way towards introducing additional parameters, like
minimum and maximum timestamp bounds.

Updates #28358.

Release note: None
  • Loading branch information
benesch committed Nov 15, 2018
1 parent 6f89f69 commit 1eccba3
Show file tree
Hide file tree
Showing 20 changed files with 143 additions and 215 deletions.
6 changes: 2 additions & 4 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,7 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error {
var descs []roachpb.RangeDescriptor

if _, err := engine.MVCCIterate(context.Background(), db, start, end, hlc.MaxTimestamp,
false /* consistent */, false /* tombstones */, nil, /* txn */
false /* reverse */, func(kv roachpb.KeyValue) (bool, error) {
engine.MVCCScanOptions{Inconsistent: true}, func(kv roachpb.KeyValue) (bool, error) {
var desc roachpb.RangeDescriptor
_, suffix, _, err := keys.DecodeRangeKey(kv.Key)
if err != nil {
Expand Down Expand Up @@ -652,8 +651,7 @@ func runDebugCheckStoreRaft(ctx context.Context, db *engine.RocksDB) error {
var hasError bool

if _, err := engine.MVCCIterate(ctx, db, start, end, hlc.MaxTimestamp,
false /* consistent */, false /* tombstones */, nil, /* txn */
false /* reverse */, func(kv roachpb.KeyValue) (bool, error) {
engine.MVCCScanOptions{Inconsistent: true}, func(kv roachpb.KeyValue) (bool, error) {
rangeID, _, suffix, detail, err := keys.DecodeRangeIDKey(kv.Key)
if err != nil {
return false, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestRangeSplitMeta(t *testing.T) {
}

testutils.SucceedsSoon(t, func() error {
if _, _, _, err := engine.MVCCScan(ctx, s.Eng, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, true, nil); err != nil {
if _, _, _, err := engine.MVCCScan(ctx, s.Eng, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{}); err != nil {
return errors.Errorf("failed to verify no dangling intents: %s", err)
}
return nil
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) {
// for timing of finishing the test writer and a possibly-ongoing
// asynchronous split.
testutils.SucceedsSoon(t, func() error {
if _, _, _, err := engine.MVCCScan(ctx, s.Eng, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, true, nil); err != nil {
if _, _, _, err := engine.MVCCScan(ctx, s.Eng, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{}); err != nil {
return errors.Errorf("failed to verify no dangling intents: %s", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestBootstrapCluster(t *testing.T) {
}

// Scan the complete contents of the local database directly from the engine.
rows, _, _, err := engine.MVCCScan(context.Background(), e, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, true, nil)
rows, _, _, err := engine.MVCCScan(context.Background(), e, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{})
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/abortspan/abortspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func (sc *AbortSpan) Get(
func (sc *AbortSpan) Iterate(
ctx context.Context, e engine.Reader, f func(roachpb.Key, roachpb.AbortSpanEntry) error,
) error {
_, err := engine.MVCCIterate(ctx, e, sc.min(), sc.max(), hlc.Timestamp{},
true /* consistent */, false /* tombstones */, nil /* txn */, false, /* reverse */
_, err := engine.MVCCIterate(ctx, e, sc.min(), sc.max(), hlc.Timestamp{}, engine.MVCCScanOptions{},
func(kv roachpb.KeyValue) (bool, error) {
var entry roachpb.AbortSpanEntry
if _, err := keys.DecodeAbortSpanKey(kv.Key, nil); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/addressing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestUpdateRangeAddressing(t *testing.T) {
t.Fatal(err)
}
// Scan meta keys directly from engine.
kvs, _, _, err := engine.MVCCScan(ctx, store.Engine(), keys.MetaMin, keys.MetaMax, math.MaxInt64, hlc.MaxTimestamp, true, nil)
kvs, _, _, err := engine.MVCCScan(ctx, store.Engine(), keys.MetaMin, keys.MetaMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{})
if err != nil {
t.Fatal(err)
}
Expand Down
42 changes: 14 additions & 28 deletions pkg/storage/batcheval/cmd_refresh_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,21 @@ func RefreshRange(
return result.Result{}, errors.Errorf("no transaction specified to %s", args.Method())
}

iter := batch.NewIterator(engine.IterOptions{
UpperBound: args.EndKey,
})
defer iter.Close()
// Iterate over values until we discover any value written at or
// after the original timestamp, but before or at the current
// timestamp. Note that we do not iterate using the txn and the
// iteration is done with consistent=false. This reads only
// committed values and returns all intents, including those from
// the txn itself. Note that we include tombstones, which must be
// considered as updates on refresh.
// Iterate over values until we discover any value written at or after the
// original timestamp, but before or at the current timestamp. Note that we
// iterate inconsistently without using the txn. This reads only committed
// values and returns all intents, including those from the txn itself. Note
// that we include tombstones, which must be considered as updates on refresh.
log.VEventf(ctx, 2, "refresh %s @[%s-%s]", args.Span(), h.Txn.OrigTimestamp, h.Txn.Timestamp)
intents, err := engine.MVCCIterateUsingIter(
ctx,
iter,
args.Key,
args.EndKey,
h.Txn.Timestamp,
false, /* consistent */
true, /* tombstones */
nil, /* txn */
false, /* reverse */
func(kv roachpb.KeyValue) (bool, error) {
if ts := kv.Value.Timestamp; !ts.Less(h.Txn.OrigTimestamp) {
return true, errors.Errorf("encountered recently written key %s @%s", kv.Key, ts)
}
return false, nil
},
)
intents, err := engine.MVCCIterate(ctx, batch, args.Key, args.EndKey, h.Txn.Timestamp, engine.MVCCScanOptions{
Inconsistent: true,
Tombstones: true,
}, func(kv roachpb.KeyValue) (bool, error) {
if ts := kv.Value.Timestamp; !ts.Less(h.Txn.OrigTimestamp) {
return true, errors.Errorf("encountered recently written key %s @%s", kv.Key, ts)
}
return false, nil
})
if err != nil {
return result.Result{}, err
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/storage/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,26 @@ func ReverseScan(
case roachpb.BATCH_RESPONSE:
var kvData []byte
var numKvs int64
kvData, numKvs, resumeSpan, intents, err = engine.MVCCReverseScanToBytes(
ctx, batch, args.Key, args.EndKey, cArgs.MaxKeys,
h.Timestamp, h.ReadConsistency == roachpb.CONSISTENT, h.Txn)
kvData, numKvs, resumeSpan, intents, err = engine.MVCCScanToBytes(
ctx, batch, args.Key, args.EndKey, cArgs.MaxKeys, h.Timestamp,
engine.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
Reverse: true,
})
if err != nil {
return result.Result{}, err
}
reply.NumKeys = numKvs
reply.BatchResponse = kvData
case roachpb.KEY_VALUES:
var rows []roachpb.KeyValue
rows, resumeSpan, intents, err = engine.MVCCReverseScan(ctx, batch, args.Key, args.EndKey,
cArgs.MaxKeys, h.Timestamp, h.ReadConsistency == roachpb.CONSISTENT, h.Txn)
rows, resumeSpan, intents, err = engine.MVCCScan(
ctx, batch, args.Key, args.EndKey, cArgs.MaxKeys, h.Timestamp, engine.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
Reverse: true,
})
if err != nil {
return result.Result{}, err
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/storage/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,23 @@ func Scan(
var kvData []byte
var numKvs int64
kvData, numKvs, resumeSpan, intents, err = engine.MVCCScanToBytes(
ctx, batch, args.Key, args.EndKey, cArgs.MaxKeys,
h.Timestamp, h.ReadConsistency == roachpb.CONSISTENT, h.Txn)
ctx, batch, args.Key, args.EndKey, cArgs.MaxKeys, h.Timestamp,
engine.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
})
if err != nil {
return result.Result{}, err
}
reply.NumKeys = numKvs
reply.BatchResponse = kvData
case roachpb.KEY_VALUES:
var rows []roachpb.KeyValue
rows, resumeSpan, intents, err = engine.MVCCScan(ctx, batch, args.Key, args.EndKey,
cArgs.MaxKeys, h.Timestamp, h.ReadConsistency == roachpb.CONSISTENT, h.Txn)
rows, resumeSpan, intents, err = engine.MVCCScan(
ctx, batch, args.Key, args.EndKey, cArgs.MaxKeys, h.Timestamp, engine.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
})
if err != nil {
return result.Result{}, err
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/storage/engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,6 @@ func runMVCCScan(emk engineMaker, numRows, numVersions, valueSize int, reverse b
iter.Close()
}

scan := MVCCScan
if reverse {
scan = MVCCReverseScan
}
b.SetBytes(int64(numRows * valueSize))
b.ResetTimer()

Expand All @@ -177,7 +173,9 @@ func runMVCCScan(emk engineMaker, numRows, numVersions, valueSize int, reverse b
endKey = endKey.Next()
walltime := int64(5 * (rand.Int31n(int32(numVersions)) + 1))
ts := hlc.Timestamp{WallTime: walltime}
kvs, _, _, err := scan(context.Background(), eng, startKey, endKey, int64(numRows), ts, true, nil)
kvs, _, _, err := MVCCScan(context.Background(), eng, startKey, endKey, int64(numRows), ts, MVCCScanOptions{
Reverse: reverse,
})
if err != nil {
b.Fatalf("failed scan: %s", err)
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,14 @@ type Iterator interface {
txn *roachpb.Transaction, consistent, tombstones bool,
) (*roachpb.Value, []roachpb.Intent, error)
// MVCCScan is the internal implementation of the family of package-level
// MVCCScan functions. There are two notable differences. The first is that
// key/value pairs are returned raw, as a buffer of varint-prefixed slices,
// alternating from key to value, where numKVs specifies the number of pairs
// in the buffer. The second is that the tombstones parameter allows returning
// deleted values, where the value portion will be empty.
// MVCCScan functions. The notable difference is that key/value pairs are
// returned raw, as a buffer of varint-prefixed slices, alternating from key
// to value, where numKVs specifies the number of pairs in the buffer.
//
// There is little reason to use this function directly. Use the package-level
// MVCCScan, or one of its variants, instead.
MVCCScan(start, end roachpb.Key, max int64, timestamp hlc.Timestamp,
txn *roachpb.Transaction, consistent, reverse, tombstone bool,
MVCCScan(
start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions,
) (kvData []byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error)
// SetUpperBound installs a new upper bound for this iterator.
SetUpperBound(roachpb.Key)
Expand Down
105 changes: 28 additions & 77 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1640,7 +1640,7 @@ func MVCCDeleteRange(
// transaction with a newer timestamp, we need to use the max timestamp for
// scan.
kvs, resumeSpan, _, err := MVCCScan(
ctx, engine, key, endKey, max, hlc.MaxTimestamp, true /* consistent */, txn)
ctx, engine, key, endKey, max, hlc.MaxTimestamp, MVCCScanOptions{Txn: txn})
if err != nil {
return nil, nil, 0, err
}
Expand Down Expand Up @@ -1675,17 +1675,12 @@ func MVCCDeleteRange(
func mvccScanToKvs(
ctx context.Context,
iter Iterator,
key,
endKey roachpb.Key,
key, endKey roachpb.Key,
max int64,
timestamp hlc.Timestamp,
consistent bool,
tombstones bool,
txn *roachpb.Transaction,
reverse bool,
opts MVCCScanOptions,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
kvData, numKVs, resumeSpan, intents, err := iter.MVCCScan(
key, endKey, max, timestamp, txn, consistent, reverse, tombstones)
kvData, numKVs, resumeSpan, intents, err := iter.MVCCScan(key, endKey, max, timestamp, opts)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -1737,6 +1732,20 @@ func buildScanIntents(data []byte) ([]roachpb.Intent, error) {
return intents, nil
}

// MVCCScanOptions bundles options for the MVCCScan family of functions.
type MVCCScanOptions struct {
// TODO(benesch): The max parameter should be moved into this struct. Its
// semantics make that tricky, though, as the zero value for this struct
// naturally represents an unbounded scan, but a max of zero currently means
// to return no results.

// See the documentation for MVCCScan for information on these parameters.
Inconsistent bool
Tombstones bool
Reverse bool
Txn *roachpb.Transaction
}

// MVCCScan scans the key range [key, endKey) in the provided engine up to some
// maximum number of results in ascending order. If it hits max, it returns a
// "resume span" to be used in the next call to this function. If the limit is
Expand Down Expand Up @@ -1766,67 +1775,28 @@ func buildScanIntents(data []byte) ([]roachpb.Intent, error) {
func MVCCScan(
ctx context.Context,
engine Reader,
key,
endKey roachpb.Key,
key, endKey roachpb.Key,
max int64,
timestamp hlc.Timestamp,
consistent bool,
txn *roachpb.Transaction,
opts MVCCScanOptions,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
iter := engine.NewIterator(IterOptions{LowerBound: key, UpperBound: endKey})
defer iter.Close()
return mvccScanToKvs(ctx, iter, key, endKey, max, timestamp,
consistent, false /* tombstones */, txn, false /* reverse */)
return mvccScanToKvs(ctx, iter, key, endKey, max, timestamp, opts)
}

// MVCCScanToBytes is like MVCCScan, but it returns the results in a byte array.
func MVCCScanToBytes(
ctx context.Context,
engine Reader,
key,
endKey roachpb.Key,
max int64,
timestamp hlc.Timestamp,
consistent bool,
txn *roachpb.Transaction,
) ([]byte, int64, *roachpb.Span, []roachpb.Intent, error) {
iter := engine.NewIterator(IterOptions{LowerBound: key, UpperBound: endKey})
defer iter.Close()
return iter.MVCCScan(key, endKey, max, timestamp, txn, consistent, false /* reverse */, false /* tombstones */)
}

// MVCCReverseScan is like MVCCScan, but it returns keys in descending order.
func MVCCReverseScan(
ctx context.Context,
engine Reader,
key,
endKey roachpb.Key,
max int64,
timestamp hlc.Timestamp,
consistent bool,
txn *roachpb.Transaction,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
iter := engine.NewIterator(IterOptions{LowerBound: key, UpperBound: endKey})
defer iter.Close()
return mvccScanToKvs(ctx, iter, key, endKey, max, timestamp,
consistent, false /* tombstones */, txn, true /* reverse */)
}

// MVCCReverseScanToBytes is like MVCCReverseScan, but it returns the results
// in a byte array.
func MVCCReverseScanToBytes(
ctx context.Context,
engine Reader,
key,
endKey roachpb.Key,
key, endKey roachpb.Key,
max int64,
timestamp hlc.Timestamp,
consistent bool,
txn *roachpb.Transaction,
opts MVCCScanOptions,
) ([]byte, int64, *roachpb.Span, []roachpb.Intent, error) {
iter := engine.NewIterator(IterOptions{LowerBound: key, UpperBound: endKey})
defer iter.Close()
return iter.MVCCScan(key, endKey, max, timestamp, txn, consistent, true /* reverse */, false /* tombstones */)
return iter.MVCCScan(key, endKey, max, timestamp, opts)
}

// MVCCIterate iterates over the key range [start,end). At each step of the
Expand All @@ -1838,38 +1808,19 @@ func MVCCIterate(
engine Reader,
key, endKey roachpb.Key,
timestamp hlc.Timestamp,
consistent bool,
tombstones bool,
txn *roachpb.Transaction,
reverse bool,
opts MVCCScanOptions,
f func(roachpb.KeyValue) (bool, error),
) ([]roachpb.Intent, error) {
iter := engine.NewIterator(IterOptions{LowerBound: key, UpperBound: endKey})
defer iter.Close()
return MVCCIterateUsingIter(ctx, iter, key, endKey, timestamp, consistent,
tombstones, txn, reverse, f)
}

// MVCCIterateUsingIter iterates over the key range [start,end) using the
// supplied iterator. See comments for MVCCIterate.
func MVCCIterateUsingIter(
ctx context.Context,
iter Iterator,
startKey, endKey roachpb.Key,
timestamp hlc.Timestamp,
consistent bool,
tombstones bool,
txn *roachpb.Transaction,
reverse bool,
f func(roachpb.KeyValue) (bool, error),
) ([]roachpb.Intent, error) {
var intents []roachpb.Intent
var wiErr error

for {
const maxKeysPerScan = 1000
kvs, resume, newIntents, err := mvccScanToKvs(
ctx, iter, startKey, endKey, maxKeysPerScan, timestamp, consistent, tombstones, txn, reverse)
ctx, iter, key, endKey, maxKeysPerScan, timestamp, opts)
if err != nil {
switch tErr := err.(type) {
case *roachpb.WriteIntentError:
Expand Down Expand Up @@ -1910,10 +1861,10 @@ func MVCCIterateUsingIter(
if resume == nil {
break
}
if reverse {
if opts.Reverse {
endKey = resume.EndKey
} else {
startKey = resume.Key
key = resume.Key
}
}

Expand Down
Loading

0 comments on commit 1eccba3

Please sign in to comment.