Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
44641: engine: return target bytes consumed by scans r=petermattis,itsbilal a=tbg

To thread TargetBytes into the KV API, we will need to teach DistSender
to compute the remaining TargetBytes for each Range it's scanning. This
means it has to know how much was consumed by each individual response,
and so MVCC should return that information.

It also gives some additional test coverage, which in itself is already
enough of a reason to do it.

Release note: None

Co-authored-by: Tobias Schottdorf <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Feb 4, 2020
2 parents 50b12ad + ac1c8b2 commit aeee3fe
Show file tree
Hide file tree
Showing 18 changed files with 252 additions and 180 deletions.
2 changes: 2 additions & 0 deletions c-deps/libroach/include/libroach.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ typedef struct {
int32_t len;
// count is the number of key/value pairs in bufs.
int32_t count;
// bytes is the number of bytes (as measured by TargetSize) in bufs.
int64_t bytes;
} DBChunkedBuffer;

// DBScanResults contains the key/value pairs and intents encoded
Expand Down
1 change: 1 addition & 0 deletions c-deps/libroach/mvcc.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ template <bool reverse> class mvccScanner {
if (kvs_->Count() > 0) {
kvs_->GetChunks(&results_.data.bufs, &results_.data.len);
results_.data.count = kvs_->Count();
results_.data.bytes = kvs_->NumBytes();
}
if (intents_->Count() > 0) {
results_.intents = ToDBSlice(intents_->Data());
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestRangeSplitMeta(t *testing.T) {
}

testutils.SucceedsSoon(t, func() error {
if _, _, _, err := engine.MVCCScan(ctx, s.Eng, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{}); err != nil {
if _, err := engine.MVCCScan(ctx, s.Eng, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{}); err != nil {
return errors.Errorf("failed to verify no dangling intents: %s", err)
}
return nil
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) {
// for timing of finishing the test writer and a possibly-ongoing
// asynchronous split.
testutils.SucceedsSoon(t, func() error {
if _, _, _, err := engine.MVCCScan(ctx, s.Eng, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{}); err != nil {
if _, err := engine.MVCCScan(ctx, s.Eng, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{}); err != nil {
return errors.Errorf("failed to verify no dangling intents: %s", err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ func TestBootstrapCluster(t *testing.T) {
}

// Scan the complete contents of the local database directly from the engine.
rows, _, _, err := engine.MVCCScan(ctx, e, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{})
res, err := engine.MVCCScan(ctx, e, keys.LocalMax, roachpb.KeyMax, math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{})
if err != nil {
t.Fatal(err)
}
var foundKeys keySlice
for _, kv := range rows {
for _, kv := range res.KVs {
foundKeys = append(foundKeys, kv.Key)
}
var expectedKeys = keySlice{
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/addressing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ func TestUpdateRangeAddressing(t *testing.T) {
// to RocksDB will be asynchronous.
var kvs []roachpb.KeyValue
testutils.SucceedsSoon(t, func() error {
var err error
kvs, _, _, err = engine.MVCCScan(ctx, store.Engine(), keys.MetaMin, keys.MetaMax,
res, err := engine.MVCCScan(ctx, store.Engine(), keys.MetaMin, keys.MetaMax,
math.MaxInt64, hlc.MaxTimestamp, engine.MVCCScanOptions{})
if err != nil {
// Wait for the intent to be resolved.
Expand All @@ -176,6 +175,7 @@ func TestUpdateRangeAddressing(t *testing.T) {
}
t.Fatal(err)
}
kvs = res.KVs
return nil
})
metas := metaSlice{}
Expand Down
25 changes: 10 additions & 15 deletions pkg/storage/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,12 @@ func ReverseScan(
h := cArgs.Header
reply := resp.(*roachpb.ReverseScanResponse)

var res engine.MVCCScanResult
var err error
var intents []roachpb.Intent
var resumeSpan *roachpb.Span

switch args.ScanFormat {
case roachpb.BATCH_RESPONSE:
var kvData [][]byte
var numKvs int64
kvData, numKvs, resumeSpan, intents, err = engine.MVCCScanToBytes(
res, err = engine.MVCCScanToBytes(
ctx, reader, args.Key, args.EndKey, cArgs.MaxKeys, h.Timestamp,
engine.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Expand All @@ -52,11 +49,9 @@ func ReverseScan(
if err != nil {
return result.Result{}, err
}
reply.NumKeys = numKvs
reply.BatchResponses = kvData
reply.BatchResponses = res.KVData
case roachpb.KEY_VALUES:
var rows []roachpb.KeyValue
rows, resumeSpan, intents, err = engine.MVCCScan(
res, err = engine.MVCCScan(
ctx, reader, args.Key, args.EndKey, cArgs.MaxKeys, h.Timestamp, engine.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
Expand All @@ -65,19 +60,19 @@ func ReverseScan(
if err != nil {
return result.Result{}, err
}
reply.NumKeys = int64(len(rows))
reply.Rows = rows
reply.Rows = res.KVs
default:
panic(fmt.Sprintf("Unknown scanFormat %d", args.ScanFormat))
}

if resumeSpan != nil {
reply.ResumeSpan = resumeSpan
reply.NumKeys = res.NumKeys
if res.ResumeSpan != nil {
reply.ResumeSpan = res.ResumeSpan
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
}

if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, intents)
reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, res.Intents)
}
return result.FromEncounteredIntents(intents), err
return result.FromEncounteredIntents(res.Intents), err
}
25 changes: 10 additions & 15 deletions pkg/storage/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,12 @@ func Scan(
h := cArgs.Header
reply := resp.(*roachpb.ScanResponse)

var res engine.MVCCScanResult
var err error
var intents []roachpb.Intent
var resumeSpan *roachpb.Span

switch args.ScanFormat {
case roachpb.BATCH_RESPONSE:
var kvData [][]byte
var numKvs int64
kvData, numKvs, resumeSpan, intents, err = engine.MVCCScanToBytes(
res, err = engine.MVCCScanToBytes(
ctx, reader, args.Key, args.EndKey, cArgs.MaxKeys, h.Timestamp,
engine.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Expand All @@ -51,31 +48,29 @@ func Scan(
if err != nil {
return result.Result{}, err
}
reply.NumKeys = numKvs
reply.BatchResponses = kvData
reply.BatchResponses = res.KVData
case roachpb.KEY_VALUES:
var rows []roachpb.KeyValue
rows, resumeSpan, intents, err = engine.MVCCScan(
res, err = engine.MVCCScan(
ctx, reader, args.Key, args.EndKey, cArgs.MaxKeys, h.Timestamp, engine.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
})
if err != nil {
return result.Result{}, err
}
reply.NumKeys = int64(len(rows))
reply.Rows = rows
reply.Rows = res.KVs
default:
panic(fmt.Sprintf("Unknown scanFormat %d", args.ScanFormat))
}
reply.NumKeys = res.NumKeys

if resumeSpan != nil {
reply.ResumeSpan = resumeSpan
if res.ResumeSpan != nil {
reply.ResumeSpan = res.ResumeSpan
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
}

if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, intents)
reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, res.Intents)
}
return result.FromEncounteredIntents(intents), err
return result.FromEncounteredIntents(res.Intents), err
}
6 changes: 3 additions & 3 deletions pkg/storage/engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,14 @@ func runMVCCScan(ctx context.Context, b *testing.B, emk engineMaker, opts benchS
endKey = endKey.Next()
walltime := int64(5 * (rand.Int31n(int32(opts.numVersions)) + 1))
ts := hlc.Timestamp{WallTime: walltime}
kvs, _, _, err := MVCCScan(ctx, eng, startKey, endKey, int64(opts.numRows), ts, MVCCScanOptions{
res, err := MVCCScan(ctx, eng, startKey, endKey, int64(opts.numRows), ts, MVCCScanOptions{
Reverse: opts.reverse,
})
if err != nil {
b.Fatalf("failed scan: %+v", err)
}
if len(kvs) != opts.numRows {
b.Fatalf("failed to scan: %d != %d", len(kvs), opts.numRows)
if len(res.KVs) != opts.numRows {
b.Fatalf("failed to scan: %d != %d", len(res.KVs), opts.numRows)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type MVCCIterator interface {
// in the buffer.
MVCCScan(
start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions,
) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error)
) (MVCCScanResult, error)
}

// IterOptions contains options used to create an Iterator.
Expand Down
Loading

0 comments on commit aeee3fe

Please sign in to comment.