diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index 7f778606a332..ffa57985415f 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -46,7 +46,11 @@ func Get( reply.Value = val if h.ReadConsistency == roachpb.READ_UNCOMMITTED { var intentVals []roachpb.KeyValue - intentVals, err = CollectIntentRows(ctx, reader, cArgs, intents) + // NOTE: MVCCGet uses a Prefix iterator, so we want to use one in + // CollectIntentRows as well so that we're guaranteed to use the same + // cached iterator and observe a consistent snapshot of the engine. + const usePrefixIter = true + intentVals, err = CollectIntentRows(ctx, reader, usePrefixIter, intents) if err == nil { switch len(intentVals) { case 0: diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 5edc2b2c698b..1757c90f1774 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -76,7 +76,11 @@ func ReverseScan( } if h.ReadConsistency == roachpb.READ_UNCOMMITTED { - reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, scanRes.Intents) + // NOTE: MVCCScan doesn't use a Prefix iterator, so we don't want to use + // one in CollectIntentRows either so that we're guaranteed to use the + // same cached iterator and observe a consistent snapshot of the engine. + const usePrefixIter = false + reply.IntentRows, err = CollectIntentRows(ctx, reader, usePrefixIter, scanRes.Intents) if err != nil { return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index fd10986f6d07..090946f91b86 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -76,7 +76,11 @@ func Scan( } if h.ReadConsistency == roachpb.READ_UNCOMMITTED { - reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, scanRes.Intents) + // NOTE: MVCCScan doesn't use a Prefix iterator, so we don't want to use + // one in CollectIntentRows either so that we're guaranteed to use the + // same cached iterator and observe a consistent snapshot of the engine. + const usePrefixIter = false + reply.IntentRows, err = CollectIntentRows(ctx, reader, usePrefixIter, scanRes.Intents) if err != nil { return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/intent.go b/pkg/kv/kvserver/batcheval/intent.go index 418ac41f444e..e93e2743c57b 100644 --- a/pkg/kv/kvserver/batcheval/intent.go +++ b/pkg/kv/kvserver/batcheval/intent.go @@ -17,39 +17,91 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" ) -// CollectIntentRows collects the key-value pairs for each intent provided. It -// also verifies that the ReturnIntents option is allowed. +// CollectIntentRows collects the provisional key-value pairs for each intent +// provided. // -// TODO(nvanbenschoten): mvccGetInternal should return the intent values directly -// when ReturnIntents is true. Since this will initially only be used for -// RangeLookups and since this is how they currently collect intent values, this -// is ok for now. +// The method accepts a reader and flag indicating whether a prefix iterator +// should be used when creating an iterator from the reader. This flexibility +// works around a limitation of the Engine.NewReadOnly interface where prefix +// iterators and non-prefix iterators pulled from the same read-only engine are +// not guaranteed to provide a consistent snapshot of the underlying engine. +// This function expects to be able to retrieve the corresponding provisional +// value for each of the provided intents. As such, it is critical that it +// observes the engine in the same state that it was in when the intent keys +// were originally collected. Because of this, callers are tasked with +// indicating whether the intents were originally collected using a prefix +// iterator or not. +// +// TODO(nvanbenschoten): remove the usePrefixIter complexity when we're fully on +// Pebble and can guarantee that all iterators created from a read-only engine +// are consistent. +// +// TODO(nvanbenschoten): mvccGetInternal should return the intent values +// directly when reading at the READ_UNCOMMITTED consistency level. Since this +// is only currently used for range lookups and when watching for a merge (both +// of which are off the hot path), this is ok for now. func CollectIntentRows( - ctx context.Context, reader storage.Reader, cArgs CommandArgs, intents []roachpb.Intent, + ctx context.Context, reader storage.Reader, usePrefixIter bool, intents []roachpb.Intent, ) ([]roachpb.KeyValue, error) { if len(intents) == 0 { return nil, nil } res := make([]roachpb.KeyValue, 0, len(intents)) - for _, intent := range intents { + for i := range intents { + kv, err := readProvisionalVal(ctx, reader, usePrefixIter, &intents[i]) + if err != nil { + switch t := err.(type) { + case *roachpb.WriteIntentError: + log.Fatalf(ctx, "unexpected %T in CollectIntentRows: %+v", t, t) + case *roachpb.ReadWithinUncertaintyIntervalError: + log.Fatalf(ctx, "unexpected %T in CollectIntentRows: %+v", t, t) + } + return nil, err + } + if kv.Value.IsPresent() { + res = append(res, kv) + } + } + return res, nil +} + +// readProvisionalVal retrieves the provisional value for the provided intent +// using the reader and the specified access method (i.e. with or without the +// use of a prefix iterator). The function returns an empty KeyValue if the +// intent is found to contain a deletion tombstone as its provisional value. +func readProvisionalVal( + ctx context.Context, reader storage.Reader, usePrefixIter bool, intent *roachpb.Intent, +) (roachpb.KeyValue, error) { + if usePrefixIter { val, _, err := storage.MVCCGetAsTxn( ctx, reader, intent.Key, intent.Txn.WriteTimestamp, intent.Txn, ) if err != nil { - return nil, err + return roachpb.KeyValue{}, err } if val == nil { // Intent is a deletion. - continue + return roachpb.KeyValue{}, nil } - res = append(res, roachpb.KeyValue{ - Key: intent.Key, - Value: *val, - }) + return roachpb.KeyValue{Key: intent.Key, Value: *val}, nil } - return res, nil + res, err := storage.MVCCScanAsTxn( + ctx, reader, intent.Key, intent.Key.Next(), intent.Txn.WriteTimestamp, intent.Txn, + ) + if err != nil { + return roachpb.KeyValue{}, err + } + if len(res.KVs) > 1 { + log.Fatalf(ctx, "multiple key-values returned from single-key scan: %+v", res.KVs) + } else if len(res.KVs) == 0 { + // Intent is a deletion. + return roachpb.KeyValue{}, nil + } + return res.KVs[0], nil + } // acquireUnreplicatedLocksOnKeys adds an unreplicated lock acquisition by the diff --git a/pkg/kv/kvserver/batcheval/intent_test.go b/pkg/kv/kvserver/batcheval/intent_test.go new file mode 100644 index 000000000000..6e97230f71d4 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/intent_test.go @@ -0,0 +1,163 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package batcheval + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// instrumentedEngine wraps a storage.Engine and allows for various methods in +// the interface to be instrumented for testing purposes. +type instrumentedEngine struct { + storage.Engine + + onNewIterator func(storage.IterOptions) + // ... can be extended ... +} + +func (ie *instrumentedEngine) NewIterator(opts storage.IterOptions) storage.Iterator { + if ie.onNewIterator != nil { + ie.onNewIterator(opts) + } + return ie.Engine.NewIterator(opts) +} + +// TestCollectIntentsUsesSameIterator tests that all uses of CollectIntents +// (currently only by READ_UNCOMMITTED Gets, Scans, and ReverseScans) use the +// same cached iterator (prefix or non-prefix) for their initial read and their +// provisional value collection for any intents they find. +func TestCollectIntentsUsesSameIterator(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + key := roachpb.Key("key") + ts := hlc.Timestamp{WallTime: 123} + header := roachpb.Header{ + Timestamp: ts, + ReadConsistency: roachpb.READ_UNCOMMITTED, + } + + testCases := []struct { + name string + run func(*testing.T, storage.ReadWriter) (intents []roachpb.KeyValue, _ error) + expPrefixIters int + expNonPrefixIters int + }{ + { + name: "get", + run: func(t *testing.T, db storage.ReadWriter) ([]roachpb.KeyValue, error) { + req := &roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{Key: key}, + } + var resp roachpb.GetResponse + if _, err := Get(ctx, db, CommandArgs{Args: req, Header: header}, &resp); err != nil { + return nil, err + } + if resp.IntentValue == nil { + return nil, nil + } + return []roachpb.KeyValue{{Key: key, Value: *resp.IntentValue}}, nil + }, + expPrefixIters: 2, + expNonPrefixIters: 0, + }, + { + name: "scan", + run: func(t *testing.T, db storage.ReadWriter) ([]roachpb.KeyValue, error) { + req := &roachpb.ScanRequest{ + RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()}, + } + var resp roachpb.ScanResponse + if _, err := Scan(ctx, db, CommandArgs{Args: req, Header: header}, &resp); err != nil { + return nil, err + } + return resp.IntentRows, nil + }, + expPrefixIters: 0, + expNonPrefixIters: 2, + }, + { + name: "reverse scan", + run: func(t *testing.T, db storage.ReadWriter) ([]roachpb.KeyValue, error) { + req := &roachpb.ReverseScanRequest{ + RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()}, + } + var resp roachpb.ReverseScanResponse + if _, err := ReverseScan(ctx, db, CommandArgs{Args: req, Header: header}, &resp); err != nil { + return nil, err + } + return resp.IntentRows, nil + }, + expPrefixIters: 0, + expNonPrefixIters: 2, + }, + } + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + // Test with and without deletion intents. If a READ_UNCOMMITTED request + // encounters an intent whose provisional value is a deletion tombstone, + // the request should ignore the intent and should not return any + // corresponding intent row. + testutils.RunTrueAndFalse(t, "deletion intent", func(t *testing.T, delete bool) { + db := &instrumentedEngine{Engine: storage.NewDefaultInMem()} + defer db.Close() + + // Write an intent. + val := roachpb.MakeValueFromBytes([]byte("val")) + txn := roachpb.MakeTransaction("test", key, roachpb.NormalUserPriority, ts, 0) + var err error + if delete { + err = storage.MVCCDelete(ctx, db, nil, key, ts, &txn) + } else { + err = storage.MVCCPut(ctx, db, nil, key, ts, val, &txn) + } + require.NoError(t, err) + + // Instrument iterator creation, count prefix vs. non-prefix iters. + var prefixIters, nonPrefixIters int + db.onNewIterator = func(opts storage.IterOptions) { + if opts.Prefix { + prefixIters++ + } else { + nonPrefixIters++ + } + } + + intents, err := c.run(t, db) + require.NoError(t, err) + + // Assert proper intent values. + if delete { + require.Len(t, intents, 0) + } else { + expIntentVal := val + expIntentVal.Timestamp = ts + expIntentKeyVal := roachpb.KeyValue{Key: key, Value: expIntentVal} + require.Len(t, intents, 1) + require.Equal(t, expIntentKeyVal, intents[0]) + } + + // Assert proper iterator use. + require.Equal(t, c.expPrefixIters, prefixIters) + require.Equal(t, c.expNonPrefixIters, nonPrefixIters) + require.Equal(t, c.expNonPrefixIters, nonPrefixIters) + }) + }) + } +} diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index de80c58fca8b..8767c8df8b6b 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -364,10 +364,23 @@ type Engine interface { // this engine. Batched engines accumulate all mutations and apply // them atomically on a call to Commit(). NewBatch() Batch - // NewReadOnly returns a new instance of a ReadWriter that wraps - // this engine. This wrapper panics when unexpected operations (e.g., write + // NewReadOnly returns a new instance of a ReadWriter that wraps this + // engine. This wrapper panics when unexpected operations (e.g., write // operations) are executed on it and caches iterators to avoid the overhead // of creating multiple iterators for batched reads. + // + // All iterators created from a read-only engine with the same "Prefix" + // option are guaranteed to provide a consistent snapshot of the underlying + // engine. For instance, two prefix iterators created from a read-only + // engine will provide a consistent snapshot. Similarly, two non-prefix + // iterators created from a read-only engine will provide a consistent + // snapshot. However, a prefix iterator and a non-prefix iterator created + // from a read-only engine are not guaranteed to provide a consistent view + // of the underlying engine. + // + // TODO(nvanbenschoten): remove this complexity when we're fully on Pebble + // and can guarantee that all iterators created from a read-only engine are + // consistent. To do this, we will want to add an Iterator.Clone method. NewReadOnly() ReadWriter // NewWriteOnlyBatch returns a new instance of a batched engine which wraps // this engine. A write-only batch accumulates all mutations and applies them diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 233235d60722..bb56340a28fd 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2544,6 +2544,28 @@ func MVCCScanToBytes( return mvccScanToBytes(ctx, iter, key, endKey, timestamp, opts) } +// MVCCScanAsTxn constructs a temporary transaction from the given transaction +// metadata and calls MVCCScan as that transaction. This method is required only +// for reading intents of a transaction when only its metadata is known and +// should rarely be used. +// +// The read is carried out without the chance of uncertainty restarts. +func MVCCScanAsTxn( + ctx context.Context, + reader Reader, + key, endKey roachpb.Key, + timestamp hlc.Timestamp, + txnMeta enginepb.TxnMeta, +) (MVCCScanResult, error) { + return MVCCScan(ctx, reader, key, endKey, timestamp, MVCCScanOptions{ + Txn: &roachpb.Transaction{ + TxnMeta: txnMeta, + Status: roachpb.PENDING, + ReadTimestamp: txnMeta.WriteTimestamp, + MaxTimestamp: txnMeta.WriteTimestamp, + }}) +} + // MVCCIterate iterates over the key range [start,end). At each step of the // iteration, f() is invoked with the current key/value pair. If f returns // true (done) or an error, the iteration stops and the error is propagated.