Merge #59829
59829: *: configure the use of separated intents in a cluster r=sumeerbhola a=sumeerbhola

This is done using a combination of the cluster version,
which prevents separated intents while any node is still
running below the SeparatedIntents version, and the
storage.transaction.separated_intents.enabled cluster
setting, which is on by default.
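
For reference, a minimal sketch of how such a boolean cluster
setting is typically declared; the variable name and description
below are illustrative assumptions, not the code added in
intent_reader_writer.go:

```go
package storage

import "github.com/cockroachdb/cockroach/pkg/settings"

// SeparatedIntentsEnabled sketches the cluster setting described above;
// the variable name and description text are assumed for illustration.
var SeparatedIntentsEnabled = settings.RegisterBoolSetting(
	"storage.transaction.separated_intents.enabled",
	"if enabled, intents are written to the separate lock table keyspace "+
		"instead of being interleaved with MVCC values",
	true, // on by default
)
```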

This replaces the temporary consts that were previously
used for configuration.

The safety of the transition from 20.2 to 21.1 is
discussed in:
- comments in intent_reader_writer.go (where the cluster
  setting is declared).
- the new Writer.SafeToWriteSeparatedIntents, which
  gates the use of MVCCMetadata.TxnDidNotUpdateMeta
  so that replicas do not diverge between new nodes and
  old nodes that do not understand this field (a sketch
  follows this list).
- comments in store_snapshot.go regarding why snapshots
  exchanged between 20.2 and 21.1 nodes are fine
  even though the latter will iterate through the
  lock table key space for outgoing snapshots and
  construct SSTs for deleting the lock table key
  space for incoming snapshots.
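
A rough sketch of the Writer-side gate described in the second
bullet; the interface shape and helper below are assumptions for
illustration, not the code added by this commit:

```go
package storage

import "context"

// intentWriter is an assumed shape for the Writer-side check: a writer
// reports whether it is currently safe to rely on separated intents.
type intentWriter interface {
	SafeToWriteSeparatedIntents(ctx context.Context) (bool, error)
}

// shouldSetTxnDidNotUpdateMeta sketches the gating of
// MVCCMetadata.TxnDidNotUpdateMeta: the field is only persisted when the
// writer says it is safe, so 20.2 replicas (which do not decode the field)
// stay byte-for-byte identical to 21.1 replicas. firstWrite indicates that
// the transaction has not previously written to this key.
func shouldSetTxnDidNotUpdateMeta(
	ctx context.Context, w intentWriter, firstWrite bool,
) (bool, error) {
	ok, err := w.SafeToWriteSeparatedIntents(ctx)
	if err != nil || !ok {
		return false, err
	}
	return firstWrite, nil
}
```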

The only non-test code that cares about this configuration
lives in the storage package and accesses the config
via the optional cluster.Settings provided when
constructing a Pebble Engine.
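
A sketch of the shape this plumbing can take: the engine holds the
optional settings and, when they are absent (as in many lightweight
tests), behaves as if separated intents were disabled. The type and
field names are assumptions, and SeparatedIntentsEnabled refers to
the setting sketched earlier:

```go
package storage

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// pebbleEngine stands in for the real Pebble engine type; only the
// settings-related plumbing is sketched here.
type pebbleEngine struct {
	settings *cluster.Settings // optional; may be nil
}

// safeToWriteSeparatedIntents combines the two gates: the SeparatedIntents
// cluster version must be active and the cluster setting must be on. With
// no settings provided, it conservatively reports false.
func (p *pebbleEngine) safeToWriteSeparatedIntents(ctx context.Context) bool {
	if p.settings == nil {
		return false
	}
	return p.settings.Version.IsActive(ctx, clusterversion.SeparatedIntents) &&
		SeparatedIntentsEnabled.Get(&p.settings.SV)
}
```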

Affected tests fall into three categories:
- tests that had different datadriven test files for
  different configuration settings. These now iterate
  through all the configurations. This is a small
  subset of storage tests.
- tests that adjusted their expected values based on
  the configuration settings. These now randomize the
  configuration settings.
- tests that were ignorant of the configuration
  settings. These also randomize the settings, which
  is done at different levels of abstraction depending
  on the kind of test (see the sketch after this list):
  - when constructing the Engine in the test, for tests
    that construct an Engine directly or indirectly
    via LocalTestCluster or kvserver.testContext.
    This allows manipulating both the version
    and the cluster setting.
  - when constructing cluster.Settings for
    server.TestServerFactory. This only manipulates
    the cluster setting.
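
A sketch of what the cluster-setting-level randomization can look
like in a test helper (the helper name, RNG plumbing, exported
setting variable, and Override signature are assumptions about
this era's settings API):

```go
package storage_test

import (
	"math/rand"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// randomizeSeparatedIntents flips the cluster setting before the settings
// are handed to an engine or test server. storage.SeparatedIntentsEnabled
// is the (assumed) exported setting variable sketched earlier.
func randomizeSeparatedIntents(t *testing.T, rng *rand.Rand, st *cluster.Settings) {
	enabled := rng.Intn(2) == 0
	storage.SeparatedIntentsEnabled.Override(&st.SV, enabled)
	t.Logf("randomized separated intents: enabled=%t", enabled)
}
```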

Informs #41720

Release note (ops change): adds the
storage.transaction.separated_intents.enabled cluster
setting, which enables separated intents by default.

Co-authored-by: sumeerbhola <[email protected]>
craig[bot] and sumeerbhola committed Feb 12, 2021
2 parents 9f97177 + cee3bcf commit 98beba6
Showing 71 changed files with 738 additions and 538 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -100,6 +100,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-24</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-26</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/import_test.go
@@ -70,7 +70,7 @@ func slurpSSTablesLatestKey(
) []storage.MVCCKeyValue {
start, end := storage.MVCCKey{Key: keys.LocalMax}, storage.MVCCKey{Key: keys.MaxKey}

e := storage.NewDefaultInMem()
e := storage.NewDefaultInMemForTesting()
defer e.Close()
batch := e.NewBatch()
defer batch.Close()
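
Throughout this diff, test-only engines move from NewDefaultInMem to
NewDefaultInMemForTesting, which is presumably where the randomized
configuration described in the commit message is applied. A purely
illustrative sketch of the settings half of such a constructor
(SeparatedIntentsEnabled is the assumed setting variable from the
earlier sketch):

```go
package storage

import (
	"math/rand"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// makeRandomizedTestSettings builds testing cluster settings and randomizes
// the separated-intents setting, as a ForTesting engine constructor might
// do before opening the in-memory engine.
func makeRandomizedTestSettings() *cluster.Settings {
	st := cluster.MakeTestingClusterSettings()
	SeparatedIntentsEnabled.Override(&st.SV, rand.Intn(2) == 0)
	return st
}
```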
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/writebatch_test.go
@@ -111,7 +111,7 @@ func TestWriteBatchMVCCStats(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
e := storage.NewDefaultInMem()
e := storage.NewDefaultInMemForTesting()
defer e.Close()

var batch storage.RocksDBBatchBuilder
@@ -181,7 +181,7 @@ func assertEqualKVs(
var prevKey roachpb.Key
var valueTimestampTuples []roachpb.KeyValue
var err error
for it.SeekGE(storage.MVCCKey{}); ; it.Next() {
for it.SeekGE(storage.MVCCKey{Key: key}); ; it.Next() {
if ok, err := it.Valid(); !ok {
if err != nil {
t.Fatal(err)
6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -235,6 +235,8 @@ const (
// using the replicated legacy TruncatedState. It's also used in asserting
// that no replicated truncated state representation is found.
PostTruncatedAndRangeAppliedStateMigration
// SeparatedIntents allows the writing of separated intents/locks.
SeparatedIntents

// Step (1): Add new versions here.
)
@@ -389,6 +391,10 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: PostTruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 24},
},
{
Key: SeparatedIntents,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 26},
},
// Step (2): Add new versions here.
})

5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/abortspan/abortspan_test.go
@@ -45,7 +45,7 @@ var (
func createTestAbortSpan(
t *testing.T, rangeID roachpb.RangeID, stopper *stop.Stopper,
) (*AbortSpan, storage.Engine) {
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
stopper.AddCloser(eng)
return New(rangeID), eng
}
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batch_spanset_test.go
@@ -30,7 +30,7 @@ import (
func TestSpanSetBatchBoundaries(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

var ss spanset.SpanSet
@@ -269,7 +269,7 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
func TestSpanSetBatchTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

var ss spanset.SpanSet
@@ -375,7 +375,7 @@ func TestSpanSetBatchTimestamps(t *testing.T) {
func TestSpanSetIteratorTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

var ss spanset.SpanSet
@@ -469,7 +469,7 @@ func TestSpanSetIteratorTimestamps(t *testing.T) {
func TestSpanSetNonMVCCBatch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

var ss spanset.SpanSet
@@ -518,7 +518,7 @@ func TestSpanSetNonMVCCBatch(t *testing.T) {
func TestSpanSetMVCCResolveWriteIntentRangeUsingIter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

ctx := context.Background()
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -39,7 +39,7 @@ import (

// createTestPebbleEngine returns a new in-memory Pebble storage engine.
func createTestPebbleEngine() storage.Engine {
return storage.NewInMem(context.Background(), roachpb.Attributes{}, 1<<20)
return storage.NewInMemForTesting(context.Background(), roachpb.Attributes{}, 1<<20)
}

var engineImpls = []struct {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_clear_range_test.go
@@ -89,7 +89,7 @@ func TestCmdClearRangeBytesThreshold(t *testing.T) {
for _, test := range tests {
t.Run("", func(t *testing.T) {
ctx := context.Background()
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

var stats enginepb.MVCCStats
@@ -155,7 +155,7 @@ func TestCmdClearRangeDeadline(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

var stats enginepb.MVCCStats
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
@@ -872,7 +872,7 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
}
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
db := storage.NewDefaultInMem()
db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()
@@ -974,7 +974,7 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {
defer TestingSetTxnAutoGC(false)()

testutils.RunTrueAndFalse(t, "withStoredTxnRecord", func(t *testing.T, storeTxnBeforeEndTxn bool) {
db := storage.NewDefaultInMem()
db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_test.go
@@ -172,7 +172,7 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {

testutils.RunTrueAndFalse(t, "epoch", func(t *testing.T, epoch bool) {
ctx := context.Background()
db := storage.NewDefaultInMem()
db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_recover_txn_test.go
@@ -42,7 +42,7 @@ func TestRecoverTxn(t *testing.T) {
txn.InFlightWrites = []roachpb.SequencedWrite{{Key: k2, Sequence: 0}}

testutils.RunTrueAndFalse(t, "missing write", func(t *testing.T, missingWrite bool) {
db := storage.NewDefaultInMem()
db := storage.NewDefaultInMemForTesting()
defer db.Close()

// Write the transaction record.
@@ -194,7 +194,7 @@ func TestRecoverTxnRecordChanged(t *testing.T) {
}
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
db := storage.NewDefaultInMem()
db := storage.NewDefaultInMemForTesting()
defer db.Close()

// Write the modified transaction record, simulating a concurrent
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_refresh_range_test.go
@@ -54,7 +54,7 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
ts3 := hlc.Timestamp{WallTime: 3}
ts4 := hlc.Timestamp{WallTime: 4}

db := storage.NewDefaultInMem()
db := storage.NewDefaultInMemForTesting()
defer db.Close()

// Create an sstable containing an unresolved intent.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go
@@ -75,7 +75,7 @@ func TestDeclareKeysResolveIntent(t *testing.T) {
},
}
ctx := context.Background()
engine := storage.NewDefaultInMem()
engine := storage.NewDefaultInMemForTesting()
defer engine.Close()
testutils.RunTrueAndFalse(t, "ranged", func(t *testing.T, ranged bool) {
for _, test := range tests {
@@ -153,7 +153,7 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {
}

testutils.RunTrueAndFalse(t, "ranged", func(t *testing.T, ranged bool) {
db := storage.NewDefaultInMem()
db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_revert_range_test.go
@@ -55,12 +55,12 @@ func getStats(t *testing.T, reader storage.Reader) enginepb.MVCCStats {
// createTestRocksDBEngine returns a new in-memory RocksDB engine with 1MB of
// storage capacity.
func createTestRocksDBEngine(ctx context.Context) storage.Engine {
return storage.NewInMem(ctx, roachpb.Attributes{}, 1<<20)
return storage.NewInMemForTesting(ctx, roachpb.Attributes{}, 1<<20)
}

// createTestPebbleEngine returns a new in-memory Pebble storage engine.
func createTestPebbleEngine(ctx context.Context) storage.Engine {
return storage.NewInMem(ctx, roachpb.Attributes{}, 1<<20)
return storage.NewInMemForTesting(ctx, roachpb.Attributes{}, 1<<20)
}

var engineImpls = []struct {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_scan_test.go
@@ -58,7 +58,7 @@ func testScanReverseScanInner(
k1, k2 := roachpb.Key("a"), roachpb.Key("b")
ts := hlc.Timestamp{WallTime: 1}

eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

// Write to k1 and k2.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_truncate_log_test.go
@@ -126,7 +126,7 @@ func runUnreplicatedTruncatedState(t *testing.T, tc unreplicatedTruncStateTest)
FirstIndex: firstIndex,
}

eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

truncState := roachpb.RaftTruncatedState{
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/intent_test.go
@@ -119,7 +119,7 @@ func TestCollectIntentsUsesSameIterator(t *testing.T) {
// the request should ignore the intent and should not return any
// corresponding intent row.
testutils.RunTrueAndFalse(t, "deletion intent", func(t *testing.T, delete bool) {
db := &instrumentedEngine{Engine: storage.NewDefaultInMem()}
db := &instrumentedEngine{Engine: storage.NewDefaultInMemForTesting()}
defer db.Close()

// Write an intent.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/transaction_test.go
@@ -730,7 +730,7 @@ func TestUpdateAbortSpan(t *testing.T) {
skip.IgnoreLint(t, "invalid test case")
}

db := storage.NewDefaultInMem()
db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()
20 changes: 6 additions & 14 deletions pkg/kv/kvserver/client_merge_test.go
@@ -3086,14 +3086,11 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
inSnap.State.Desc.RangeID != rangeIds[string(keyA)] {
return nil
}
// TODO(sumeer): fix this test (and others in this file) when
// DisallowSeparatedIntents=false

// The seven to nine SSTs we are expecting to ingest are in the following order:
// - Replicated range-id local keys of the range in the snapshot.
// - Range-local keys of the range in the snapshot.
// - Optionally, two SSTs for the lock table keys of the range in the
// snapshot
// - Two SSTs for the lock table keys of the range in the snapshot.
// - User keys of the range in the snapshot.
// - Unreplicated range-id local keys of the range in the snapshot.
// - SST to clear range-id local keys of the subsumed replica with
@@ -3105,12 +3102,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// NOTE: There are no range-local keys or lock table keys, in [d, /Max) in
// the store we're sending a snapshot to, so we aren't expecting SSTs to
// clear those keys.
expectedSSTCount := 7
indexAdjustment := 0
if !storage.DisallowSeparatedIntents {
expectedSSTCount += 2
indexAdjustment = 2
}
expectedSSTCount := 9
if len(sstNames) != expectedSSTCount {
return errors.Errorf("expected to ingest %d SSTs, got %d SSTs",
expectedSSTCount, len(sstNames))
@@ -3128,9 +3120,9 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// already been sent.
var sstNamesSubset []string
// The SST with the user keys in the snapshot.
sstNamesSubset = append(sstNamesSubset, sstNames[2+indexAdjustment])
sstNamesSubset = append(sstNamesSubset, sstNames[4])
// Remaining ones from the predict list above.
sstNamesSubset = append(sstNamesSubset, sstNames[4+indexAdjustment:]...)
sstNamesSubset = append(sstNamesSubset, sstNames[6:]...)

// Construct the expected SSTs and ensure that they are byte-by-byte
// equal. This verification ensures that the SSTs have the same
@@ -3171,9 +3163,9 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
}
}
}
if len(expectedSSTs) != 3+indexAdjustment {
if len(expectedSSTs) != 5 {
return errors.Errorf("len of expectedSSTs should expected to be %d, but got %d",
3+indexAdjustment, len(expectedSSTs))
5, len(expectedSSTs))
}
// Keep the last one which contains the user keys.
expectedSSTs = expectedSSTs[len(expectedSSTs)-1:]
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_test.go
@@ -122,7 +122,7 @@ func createTestStoreWithOpts(
}
eng := opts.eng
if eng == nil {
eng = storage.NewDefaultInMem()
eng = storage.NewDefaultInMemForTesting()
stopper.AddCloser(eng)
}

@@ -832,7 +832,7 @@ func (m *multiTestContext) addStore(idx int) {
} else {
engineStopper := stop.NewStopper()
m.engineStoppers = append(m.engineStoppers, engineStopper)
eng = storage.NewDefaultInMem()
eng = storage.NewDefaultInMemForTesting()
engineStopper.AddCloser(eng)
m.engines = append(m.engines, eng)
needBootstrap = true
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc/gc_iterator_test.go
@@ -122,7 +122,7 @@ func TestGCIterator(t *testing.T) {
}
makeTest := func(tc testCase) func(t *testing.T) {
return func(t *testing.T) {
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
ds := makeLiteralDataDistribution(tc.data...)
ds.setupTest(t, eng, desc)
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/gc/gc_random_test.go
@@ -87,7 +87,7 @@ func TestRunNewVsOld(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("%v@%v,ttl=%v", tc.ds, tc.now, tc.ttl), func(t *testing.T) {
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

tc.ds.dist(N, rng).setupTest(t, eng, *tc.ds.desc())
@@ -144,7 +144,7 @@ func BenchmarkRun(b *testing.B) {
}
makeTest := func(old bool, spec randomRunGCTestSpec) func(b *testing.B) {
return func(b *testing.B) {
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
ms := spec.ds.dist(b.N, rng).setupTest(b, eng, *spec.ds.desc())
b.SetBytes(int64(float64(ms.Total()) / float64(b.N)))
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/gc_queue_test.go
@@ -236,7 +236,7 @@ func (cws *cachedWriteSimulator) value(size int) roachpb.Value {
func (cws *cachedWriteSimulator) multiKey(
numOps int, size int, txn *roachpb.Transaction, ms *enginepb.MVCCStats,
) {
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
t, ctx := cws.t, context.Background()

@@ -254,7 +254,7 @@ func (cws *cachedWriteSimulator) singleKeySteady(
func (cws *cachedWriteSimulator) singleKeySteady(
qps int, duration time.Duration, size int, ms *enginepb.MVCCStats,
) {
eng := storage.NewDefaultInMem()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
t, ctx := cws.t, context.Background()
cacheKey := fmt.Sprintf("%d-%s-%s", qps, duration, humanizeutil.IBytes(int64(size)))
(The remaining changed files are not shown.)
