kv: don't mix prefix and non-prefix iters when collecting intents
Fixes cockroachdb#47219.

This commit addresses the bug diagnosed and explained in cockroachdb#47219. In that
issue, we saw an assertion failure all the way up in the concurrency
manager because a READ_UNCOMMITTED scan was hitting a WriteIntentError,
which should not be possible. The root cause of this issue was that
READ_UNCOMMITTED scans were mixing prefix and non-prefix iterators
pulled from a read-only engine: they collected intent keys with one
iterator type and then returned to fetch the provisional values for
those keys with the other. This mixing of iterators did not guarantee that
the two stages of the operation would observe a consistent snapshot of
the underlying engine, and because the READ_UNCOMMITTED scans also did
not acquire latches, writes were able to slip in and change the intent
while the scan wasn't looking. This caused the scan to throw a
WriteIntentError for the new intent transaction, which badly confused
other parts of the system (rightfully so).

This commit fixes this issue in a few different ways:
1. it ensures that we always use the same iterator type (prefix or non-prefix)
   when retrieving the provisional values for a collection of intents retrieved
   by an earlier scan during READ_UNCOMMITTED operations.
2. it adds an assertion inside of batcheval.CollectIntentRows that the
   function never returns a WriteIntentError. This would have caught the bug
   much more easily, especially back before we had the concurrency manager
   assertion and this bug could have materialized as stuck range lookups and
   potentially even deadlocked splits due to the dependency cycle between
   those two operations.
3. it documents the limited guarantees that read-only engines provide with
   respect to consistent engine snapshots across iterator instances.
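
The failure mode described above can be sketched with a small, self-contained toy model. This is not CockroachDB code: `store`, `snapshotIter`, `readOnly`, and `readTwice` are hypothetical names, and the assumption that each cached iterator pins its own view of the data when it is first created is a simplification of the read-only engine behavior this commit documents.

```go
package main

import "fmt"

// store is a toy stand-in for the underlying engine.
type store map[string]string

// snapshotIter is a toy iterator that copies the store when it is created,
// so it keeps observing the data as of its creation time.
type snapshotIter struct{ view store }

func newSnapshotIter(s store) *snapshotIter {
	view := make(store, len(s))
	for k, v := range s {
		view[k] = v
	}
	return &snapshotIter{view: view}
}

func (it *snapshotIter) get(key string) string { return it.view[key] }

// readOnly is a hypothetical read-only wrapper that lazily creates and
// caches one iterator per access mode (prefix and non-prefix).
type readOnly struct {
	s         store
	prefix    *snapshotIter
	nonPrefix *snapshotIter
}

func (r *readOnly) iter(usePrefix bool) *snapshotIter {
	cached := &r.nonPrefix
	if usePrefix {
		cached = &r.prefix
	}
	if *cached == nil {
		*cached = newSnapshotIter(r.s)
	}
	return *cached
}

// readTwice models a two-stage read with a write slipping in between the
// stages, as can happen when the reader holds no latches.
func readTwice(firstPrefix, secondPrefix bool) (first, second string) {
	s := store{"k": "intent from txn A"}
	ro := &readOnly{s: s}
	first = ro.iter(firstPrefix).get("k")
	s["k"] = "intent from txn B" // concurrent write between the two stages
	second = ro.iter(secondPrefix).get("k")
	return first, second
}

func main() {
	a, b := readTwice(false, true) // mixed iterator types
	fmt.Println(a == b)            // prints "false"
	c, d := readTwice(false, false) // same iterator type for both stages
	fmt.Println(c == d)             // prints "true"
}
```

In this model the second stage only agrees with the first when both stages ask for the same iterator type, which is the property that fix 1 relies on.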

We'll want to backport this fix as far back as possible. The bug won't
crash earlier releases of Cockroach, but as stated above, it might cause
even more disastrous results there. REMINDER: when backporting, remember
to change the release note.

Release notes (bug fix): a bug that could cause Cockroach processes to
crash due to an assertion failure with the text "expected latches held,
found none" has been fixed.

Release justification: fixes a high-priority bug in existing
functionality. The bug became louder (now crashes servers) due to recent
changes that added new assertions into the code.
nvanbenschoten committed Apr 9, 2020
1 parent fe16fd5 commit c9759fc
Showing 7 changed files with 282 additions and 20 deletions.
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_get.go
@@ -46,7 +46,11 @@ func Get(
 	reply.Value = val
 	if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
 		var intentVals []roachpb.KeyValue
-		intentVals, err = CollectIntentRows(ctx, reader, cArgs, intents)
+		// NOTE: MVCCGet uses a Prefix iterator, so we want to use one in
+		// CollectIntentRows as well so that we're guaranteed to use the same
+		// cached iterator and observe a consistent snapshot of the engine.
+		const usePrefixIter = true
+		intentVals, err = CollectIntentRows(ctx, reader, usePrefixIter, intents)
 		if err == nil {
 			switch len(intentVals) {
 			case 0:
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
@@ -76,7 +76,11 @@ func ReverseScan(
 	}

 	if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
-		reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, scanRes.Intents)
+		// NOTE: MVCCScan doesn't use a Prefix iterator, so we don't want to use
+		// one in CollectIntentRows either so that we're guaranteed to use the
+		// same cached iterator and observe a consistent snapshot of the engine.
+		const usePrefixIter = false
+		reply.IntentRows, err = CollectIntentRows(ctx, reader, usePrefixIter, scanRes.Intents)
 		if err != nil {
 			return result.Result{}, err
 		}
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_scan.go
@@ -76,7 +76,11 @@ func Scan(
 	}

 	if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
-		reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, scanRes.Intents)
+		// NOTE: MVCCScan doesn't use a Prefix iterator, so we don't want to use
+		// one in CollectIntentRows either so that we're guaranteed to use the
+		// same cached iterator and observe a consistent snapshot of the engine.
+		const usePrefixIter = false
+		reply.IntentRows, err = CollectIntentRows(ctx, reader, usePrefixIter, scanRes.Intents)
 		if err != nil {
 			return result.Result{}, err
 		}
82 changes: 67 additions & 15 deletions pkg/kv/kvserver/batcheval/intent.go
@@ -17,39 +17,91 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 )

-// CollectIntentRows collects the key-value pairs for each intent provided. It
-// also verifies that the ReturnIntents option is allowed.
+// CollectIntentRows collects the provisional key-value pairs for each intent
+// provided.
 //
-// TODO(nvanbenschoten): mvccGetInternal should return the intent values directly
-// when ReturnIntents is true. Since this will initially only be used for
-// RangeLookups and since this is how they currently collect intent values, this
-// is ok for now.
+// The method accepts a reader and flag indicating whether a prefix iterator
+// should be used when creating an iterator from the reader. This flexibility
+// works around a limitation of the Engine.NewReadOnly interface where prefix
+// iterators and non-prefix iterators pulled from the same read-only engine are
+// not guaranteed to provide a consistent snapshot of the underlying engine.
+// This function expects to be able to retrieve the corresponding provisional
+// value for each of the provided intents. As such, it is critical that it
+// observes the engine in the same state that it was in when the intent keys
+// were originally collected. Because of this, callers are tasked with
+// indicating whether the intents were originally collected using a prefix
+// iterator or not.
+//
+// TODO(nvanbenschoten): remove the usePrefixIter complexity when we're fully on
+// Pebble and can guarantee that all iterators created from a read-only engine
+// are consistent.
+//
+// TODO(nvanbenschoten): mvccGetInternal should return the intent values
+// directly when reading at the READ_UNCOMMITTED consistency level. Since this
+// is only currently used for range lookups and when watching for a merge (both
+// of which are off the hot path), this is ok for now.
 func CollectIntentRows(
-	ctx context.Context, reader storage.Reader, cArgs CommandArgs, intents []roachpb.Intent,
+	ctx context.Context, reader storage.Reader, usePrefixIter bool, intents []roachpb.Intent,
 ) ([]roachpb.KeyValue, error) {
 	if len(intents) == 0 {
 		return nil, nil
 	}
 	res := make([]roachpb.KeyValue, 0, len(intents))
-	for _, intent := range intents {
+	for i := range intents {
+		kv, err := readProvisionalVal(ctx, reader, usePrefixIter, &intents[i])
+		if err != nil {
+			switch t := err.(type) {
+			case *roachpb.WriteIntentError:
+				log.Fatalf(ctx, "unexpected %T in CollectIntentRows: %+v", t, t)
+			case *roachpb.ReadWithinUncertaintyIntervalError:
+				log.Fatalf(ctx, "unexpected %T in CollectIntentRows: %+v", t, t)
+			}
+			return nil, err
+		}
+		if kv.Value.IsPresent() {
+			res = append(res, kv)
+		}
+	}
+	return res, nil
+}
+
+// readProvisionalVal retrieves the provisional value for the provided intent
+// using the reader and the specified access method (i.e. with or without the
+// use of a prefix iterator). The function returns an empty KeyValue if the
+// intent is found to contain a deletion tombstone as its provisional value.
+func readProvisionalVal(
+	ctx context.Context, reader storage.Reader, usePrefixIter bool, intent *roachpb.Intent,
+) (roachpb.KeyValue, error) {
+	if usePrefixIter {
 		val, _, err := storage.MVCCGetAsTxn(
 			ctx, reader, intent.Key, intent.Txn.WriteTimestamp, intent.Txn,
 		)
 		if err != nil {
-			return nil, err
+			return roachpb.KeyValue{}, err
 		}
 		if val == nil {
 			// Intent is a deletion.
-			continue
+			return roachpb.KeyValue{}, nil
 		}
-		res = append(res, roachpb.KeyValue{
-			Key:   intent.Key,
-			Value: *val,
-		})
+		return roachpb.KeyValue{Key: intent.Key, Value: *val}, nil
 	}
-	return res, nil
+	res, err := storage.MVCCScanAsTxn(
+		ctx, reader, intent.Key, intent.Key.Next(), intent.Txn.WriteTimestamp, intent.Txn,
+	)
+	if err != nil {
+		return roachpb.KeyValue{}, err
+	}
+	if len(res.KVs) > 1 {
+		log.Fatalf(ctx, "multiple key-values returned from single-key scan: %+v", res.KVs)
+	} else if len(res.KVs) == 0 {
+		// Intent is a deletion.
+		return roachpb.KeyValue{}, nil
+	}
+	return res.KVs[0], nil
+
 }

 // acquireUnreplicatedLocksOnKeys adds an unreplicated lock acquisition by the
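
The non-prefix branch of readProvisionalVal reads a single key by scanning the span from intent.Key to intent.Key.Next(). The sketch below illustrates why such a half-open span can contain at most one key. It is a standalone toy rather than CockroachDB code: `next` and `inSpan` are hypothetical helpers, and the assumption that Key.Next() yields the key with a zero byte appended (its immediate successor in byte order) is mine, not something stated in this diff.

```go
package main

import (
	"bytes"
	"fmt"
)

// next returns a copy of k with a zero byte appended. Under bytewise
// ordering this is the smallest key that sorts strictly after k.
// (Assumed to mirror Key.Next(); hypothetical helper.)
func next(k []byte) []byte {
	out := make([]byte, len(k)+1)
	copy(out, k)
	return out
}

// inSpan reports whether k lies in the half-open span [start, end).
func inSpan(k, start, end []byte) bool {
	return bytes.Compare(k, start) >= 0 && bytes.Compare(k, end) < 0
}

func main() {
	key := []byte("key")
	end := next(key)
	fmt.Println(inSpan(key, key, end))               // prints "true"
	fmt.Println(inSpan([]byte("key\x00"), key, end)) // prints "false"
	fmt.Println(inSpan([]byte("kez"), key, end))     // prints "false"
}
```

Because no key sorts strictly between a key and its immediate successor, a scan over this span that returns more than one key-value indicates a bug, which matches the fatal assertion in the hunk above.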
163 changes: 163 additions & 0 deletions pkg/kv/kvserver/batcheval/intent_test.go
@@ -0,0 +1,163 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package batcheval
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/stretchr/testify/require"
+)
+
+// instrumentedEngine wraps a storage.Engine and allows for various methods in
+// the interface to be instrumented for testing purposes.
+type instrumentedEngine struct {
+	storage.Engine
+
+	onNewIterator func(storage.IterOptions)
+	// ... can be extended ...
+}
+
+func (ie *instrumentedEngine) NewIterator(opts storage.IterOptions) storage.Iterator {
+	if ie.onNewIterator != nil {
+		ie.onNewIterator(opts)
+	}
+	return ie.Engine.NewIterator(opts)
+}
+
+// TestCollectIntentsUsesSameIterator tests that all uses of CollectIntents
+// (currently only by READ_UNCOMMITTED Gets, Scans, and ReverseScans) use the
+// same cached iterator (prefix or non-prefix) for their initial read and their
+// provisional value collection for any intents they find.
+func TestCollectIntentsUsesSameIterator(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	key := roachpb.Key("key")
+	ts := hlc.Timestamp{WallTime: 123}
+	header := roachpb.Header{
+		Timestamp:       ts,
+		ReadConsistency: roachpb.READ_UNCOMMITTED,
+	}
+
+	testCases := []struct {
+		name              string
+		run               func(*testing.T, storage.ReadWriter) (intents []roachpb.KeyValue, _ error)
+		expPrefixIters    int
+		expNonPrefixIters int
+	}{
+		{
+			name: "get",
+			run: func(t *testing.T, db storage.ReadWriter) ([]roachpb.KeyValue, error) {
+				req := &roachpb.GetRequest{
+					RequestHeader: roachpb.RequestHeader{Key: key},
+				}
+				var resp roachpb.GetResponse
+				if _, err := Get(ctx, db, CommandArgs{Args: req, Header: header}, &resp); err != nil {
+					return nil, err
+				}
+				if resp.IntentValue == nil {
+					return nil, nil
+				}
+				return []roachpb.KeyValue{{Key: key, Value: *resp.IntentValue}}, nil
+			},
+			expPrefixIters:    2,
+			expNonPrefixIters: 0,
+		},
+		{
+			name: "scan",
+			run: func(t *testing.T, db storage.ReadWriter) ([]roachpb.KeyValue, error) {
+				req := &roachpb.ScanRequest{
+					RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()},
+				}
+				var resp roachpb.ScanResponse
+				if _, err := Scan(ctx, db, CommandArgs{Args: req, Header: header}, &resp); err != nil {
+					return nil, err
+				}
+				return resp.IntentRows, nil
+			},
+			expPrefixIters:    0,
+			expNonPrefixIters: 2,
+		},
+		{
+			name: "reverse scan",
+			run: func(t *testing.T, db storage.ReadWriter) ([]roachpb.KeyValue, error) {
+				req := &roachpb.ReverseScanRequest{
+					RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()},
+				}
+				var resp roachpb.ReverseScanResponse
+				if _, err := ReverseScan(ctx, db, CommandArgs{Args: req, Header: header}, &resp); err != nil {
+					return nil, err
+				}
+				return resp.IntentRows, nil
+			},
+			expPrefixIters:    0,
+			expNonPrefixIters: 2,
+		},
+	}
+	for _, c := range testCases {
+		t.Run(c.name, func(t *testing.T) {
+			// Test with and without deletion intents. If a READ_UNCOMMITTED request
+			// encounters an intent whose provisional value is a deletion tombstone,
+			// the request should ignore the intent and should not return any
+			// corresponding intent row.
+			testutils.RunTrueAndFalse(t, "deletion intent", func(t *testing.T, delete bool) {
+				db := &instrumentedEngine{Engine: storage.NewDefaultInMem()}
+				defer db.Close()
+
+				// Write an intent.
+				val := roachpb.MakeValueFromBytes([]byte("val"))
+				txn := roachpb.MakeTransaction("test", key, roachpb.NormalUserPriority, ts, 0)
+				var err error
+				if delete {
+					err = storage.MVCCDelete(ctx, db, nil, key, ts, &txn)
+				} else {
+					err = storage.MVCCPut(ctx, db, nil, key, ts, val, &txn)
+				}
+				require.NoError(t, err)
+
+				// Instrument iterator creation, count prefix vs. non-prefix iters.
+				var prefixIters, nonPrefixIters int
+				db.onNewIterator = func(opts storage.IterOptions) {
+					if opts.Prefix {
+						prefixIters++
+					} else {
+						nonPrefixIters++
+					}
+				}
+
+				intents, err := c.run(t, db)
+				require.NoError(t, err)
+
+				// Assert proper intent values.
+				if delete {
+					require.Len(t, intents, 0)
+				} else {
+					expIntentVal := val
+					expIntentVal.Timestamp = ts
+					expIntentKeyVal := roachpb.KeyValue{Key: key, Value: expIntentVal}
+					require.Len(t, intents, 1)
+					require.Equal(t, expIntentKeyVal, intents[0])
+				}
+
+				// Assert proper iterator use.
+				require.Equal(t, c.expPrefixIters, prefixIters)
+				require.Equal(t, c.expNonPrefixIters, nonPrefixIters)
+				require.Equal(t, c.expNonPrefixIters, nonPrefixIters)
+			})
+		})
+	}
+}
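
The instrumentation technique used by this test (embed an interface in a wrapper struct, override one method, and call a hook before delegating) can be sketched in isolation. The example below is a simplified stand-in, not the storage package: `iterOptions`, `engine`, `memEngine`, `instrumented`, and `countIters` are hypothetical names, and the iterator is reduced to a string for brevity.

```go
package main

import "fmt"

// iterOptions is a toy version of the options passed at iterator creation.
type iterOptions struct{ Prefix bool }

// engine is a toy interface with the single method we want to observe.
type engine interface {
	NewIterator(opts iterOptions) string
}

// memEngine is a trivial implementation of engine.
type memEngine struct{}

func (memEngine) NewIterator(opts iterOptions) string {
	if opts.Prefix {
		return "prefix iterator"
	}
	return "non-prefix iterator"
}

// instrumented embeds an engine, so every method is promoted from the
// wrapped value except the ones overridden below.
type instrumented struct {
	engine
	onNewIterator func(iterOptions) // optional hook, invoked before delegating
}

func (ie *instrumented) NewIterator(opts iterOptions) string {
	if ie.onNewIterator != nil {
		ie.onNewIterator(opts)
	}
	return ie.engine.NewIterator(opts)
}

// countIters creates one iterator per entry in calls (true means prefix)
// and reports how many of each kind the hook observed.
func countIters(calls []bool) (prefix, nonPrefix int) {
	db := &instrumented{engine: memEngine{}}
	db.onNewIterator = func(opts iterOptions) {
		if opts.Prefix {
			prefix++
		} else {
			nonPrefix++
		}
	}
	for _, usePrefix := range calls {
		db.NewIterator(iterOptions{Prefix: usePrefix})
	}
	return prefix, nonPrefix
}

func main() {
	p, n := countIters([]bool{true, true})
	fmt.Println(p, n) // prints "2 0"
}
```

Counting calls this way lets a test assert which access mode each request used without changing the code under test, which is how the expPrefixIters and expNonPrefixIters expectations above are checked.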
17 changes: 15 additions & 2 deletions pkg/storage/engine.go
@@ -364,10 +364,23 @@ type Engine interface {
 	// this engine. Batched engines accumulate all mutations and apply
 	// them atomically on a call to Commit().
 	NewBatch() Batch
-	// NewReadOnly returns a new instance of a ReadWriter that wraps
-	// this engine. This wrapper panics when unexpected operations (e.g., write
+	// NewReadOnly returns a new instance of a ReadWriter that wraps this
+	// engine. This wrapper panics when unexpected operations (e.g., write
 	// operations) are executed on it and caches iterators to avoid the overhead
 	// of creating multiple iterators for batched reads.
+	//
+	// All iterators created from a read-only engine with the same "Prefix"
+	// option are guaranteed to provide a consistent snapshot of the underlying
+	// engine. For instance, two prefix iterators created from a read-only
+	// engine will provide a consistent snapshot. Similarly, two non-prefix
+	// iterators created from a read-only engine will provide a consistent
+	// snapshot. However, a prefix iterator and a non-prefix iterator created
+	// from a read-only engine are not guaranteed to provide a consistent view
+	// of the underlying engine.
+	//
+	// TODO(nvanbenschoten): remove this complexity when we're fully on Pebble
+	// and can guarantee that all iterators created from a read-only engine are
+	// consistent. To do this, we will want to add an Iterator.Clone method.
 	NewReadOnly() ReadWriter
 	// NewWriteOnlyBatch returns a new instance of a batched engine which wraps
 	// this engine. A write-only batch accumulates all mutations and applies them
22 changes: 22 additions & 0 deletions pkg/storage/mvcc.go
@@ -2544,6 +2544,28 @@ func MVCCScanToBytes(
 	return mvccScanToBytes(ctx, iter, key, endKey, timestamp, opts)
 }

+// MVCCScanAsTxn constructs a temporary transaction from the given transaction
+// metadata and calls MVCCScan as that transaction. This method is required only
+// for reading intents of a transaction when only its metadata is known and
+// should rarely be used.
+//
+// The read is carried out without the chance of uncertainty restarts.
+func MVCCScanAsTxn(
+	ctx context.Context,
+	reader Reader,
+	key, endKey roachpb.Key,
+	timestamp hlc.Timestamp,
+	txnMeta enginepb.TxnMeta,
+) (MVCCScanResult, error) {
+	return MVCCScan(ctx, reader, key, endKey, timestamp, MVCCScanOptions{
+		Txn: &roachpb.Transaction{
+			TxnMeta:       txnMeta,
+			Status:        roachpb.PENDING,
+			ReadTimestamp: txnMeta.WriteTimestamp,
+			MaxTimestamp:  txnMeta.WriteTimestamp,
+		}})
+}
+
 // MVCCIterate iterates over the key range [start,end). At each step of the
 // iteration, f() is invoked with the current key/value pair. If f returns
 // true (done) or an error, the iteration stops and the error is propagated.