From b05f1b3337dd40adc9e71616e9f4e483c1ef0261 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Fri, 23 Apr 2021 15:22:33 +0100 Subject: [PATCH] storage: report all encountered intents in sst export error Previously, pebbleExportToSst was stopping upon encountering first intent. This was causing backups to be very slow if lots of intents build up. To be able to proceed with export, intent needs to be resolved and export retried. The result of this behaviour is that export would run as many times as there were intents in the table before succeeding. To address this, all intents from the range are collected and reported in WriteIntentError. They could be resolved efficiently as batch similar to how GC operates. Release note (bug fix): Backup no longer resolves intents one by one. This eliminates running a high pri query to cleanup intents to unblock backup in case of intent buildup. --- pkg/ccl/backupccl/BUILD.bazel | 1 + pkg/ccl/backupccl/backup_intents_test.go | 85 ++++++ pkg/storage/mvcc_incremental_iterator.go | 85 ++++-- pkg/storage/mvcc_incremental_iterator_test.go | 257 ++++++++++++------ pkg/storage/pebble.go | 50 +++- pkg/storage/pebble_test.go | 89 ++++++ 6 files changed, 455 insertions(+), 112 deletions(-) create mode 100644 pkg/ccl/backupccl/backup_intents_test.go diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 8338776723c7..3c49db75d4c9 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -118,6 +118,7 @@ go_test( srcs = [ "backup_cloud_test.go", "backup_destination_test.go", + "backup_intents_test.go", "backup_test.go", "bench_test.go", "create_scheduled_backup_test.go", diff --git a/pkg/ccl/backupccl/backup_intents_test.go b/pkg/ccl/backupccl/backup_intents_test.go new file mode 100644 index 000000000000..6fd1bb193ea8 --- /dev/null +++ b/pkg/ccl/backupccl/backup_intents_test.go @@ -0,0 +1,85 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestCleanupIntentsDuringBackupPerformanceRegression(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer utilccl.TestingEnableEnterprise()() + + skip.UnderRace(t, "measures backup times not to regress, can't work under race") + + // Time to create backup in presence of intents differs roughly 10x so some + // arbitrary number is picked which is 2x higher than current backup time on + // current (laptop) hardware. + const backupTimeout = time.Second * 10 + + const totalRowCount = 10000 + const perTransactionRowCount = 10 + + // Interceptor catches requests that cleanup transactions of size 1000 which are + // test data transactions. All other transaction commits pass though. + interceptor := func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { + endTxn := req.Requests[0].GetEndTxn() + if endTxn != nil && !endTxn.Commit && len(endTxn.LockSpans) == perTransactionRowCount { + // If this is a rollback of one the test's SQL transactions, allow the + // EndTxn to proceed and mark the transaction record as ABORTED, but strip + // the request of its lock spans so that no intents are recorded into the + // transaction record or eagerly resolved. This is a bit of a hack, but it + // mimics the behavior of an abandoned transaction which is aborted by a + // pusher after expiring due to an absence of heartbeats. + endTxn.LockSpans = nil + } + return nil + } + serverKnobs := kvserver.StoreTestingKnobs{TestingRequestFilter: interceptor} + + s, sqlDb, _ := serverutils.StartServer(t, + base.TestServerArgs{Knobs: base.TestingKnobs{Store: &serverKnobs}}) + defer s.Stopper().Stop(context.Background()) + + _, err := sqlDb.Exec("create table foo(v int not null)") + require.NoError(t, err) + + for i := 0; i < totalRowCount; i += perTransactionRowCount { + tx, err := sqlDb.Begin() + require.NoError(t, err) + for j := 0; j < perTransactionRowCount; j += 1 { + statement := fmt.Sprintf("insert into foo (v) values (%d)", i+j) + _, err = tx.Exec(statement) + require.NoError(t, err) + } + require.NoError(t, tx.Rollback()) + } + + start := timeutil.Now() + _, err = sqlDb.Exec("backup table foo to 'userfile:///test.foo'") + stop := timeutil.Now() + require.NoError(t, err, "Failed to run backup") + t.Logf("Backup took %s", stop.Sub(start)) + require.WithinDuration(t, stop, start, backupTimeout, "Time to make backup") +} diff --git a/pkg/storage/mvcc_incremental_iterator.go b/pkg/storage/mvcc_incremental_iterator.go index 1186e4c16429..099827dc5209 100644 --- a/pkg/storage/mvcc_incremental_iterator.go +++ b/pkg/storage/mvcc_incremental_iterator.go @@ -86,6 +86,12 @@ type MVCCIncrementalIterator struct { // For allocation avoidance, meta is used to store the timestamp of keys // regardless if they are metakeys. meta enginepb.MVCCMetadata + + // Intent aggregation options. + // Configuration passed in MVCCIncrementalIterOptions. + enableWriteIntentAggregation bool + // Optional collection of intents created on demand when first intent encountered. + intents []roachpb.Intent } var _ SimpleMVCCIterator = &MVCCIncrementalIterator{} @@ -100,6 +106,12 @@ type MVCCIncrementalIterOptions struct { // time. StartTime hlc.Timestamp EndTime hlc.Timestamp + // If intent aggregation is enabled, iterator will not fail on first encountered + // intent, but will proceed further. All found intents will be aggregated into + // a single WriteIntentError which would be updated during iteration. Consumer + // would be free to decide if it wants to keep collecting entries and intents or + // skip entries. + EnableWriteIntentAggregation bool } // NewMVCCIncrementalIterator creates an MVCCIncrementalIterator with the @@ -135,10 +147,11 @@ func NewMVCCIncrementalIterator( } return &MVCCIncrementalIterator{ - iter: iter, - startTime: opts.StartTime, - endTime: opts.EndTime, - timeBoundIter: timeBoundIter, + iter: iter, + startTime: opts.StartTime, + endTime: opts.EndTime, + timeBoundIter: timeBoundIter, + enableWriteIntentAggregation: opts.EnableWriteIntentAggregation, } } @@ -185,12 +198,22 @@ func (i *MVCCIncrementalIterator) Close() { // key. func (i *MVCCIncrementalIterator) Next() { i.iter.Next() + if !i.checkValidAndSaveErr() { + return + } + i.advance() +} + +// checkValidAndSaveErr checks if the underlying iter is valid after the operation +// and saves the error and validity state. Returns true if the underlying iterator +// is valid. +func (i *MVCCIncrementalIterator) checkValidAndSaveErr() bool { if ok, err := i.iter.Valid(); !ok { i.err = err i.valid = false - return + return false } - i.advance() + return true } // NextKey advances the iterator to the next key. This operation is distinct @@ -198,9 +221,7 @@ func (i *MVCCIncrementalIterator) Next() { // key if the iterator is currently located at the last version for a key. func (i *MVCCIncrementalIterator) NextKey() { i.iter.NextKey() - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false + if !i.checkValidAndSaveErr() { return } i.advance() @@ -265,9 +286,7 @@ func (i *MVCCIncrementalIterator) maybeSkipKeys() { // expensive than a Next call. seekKey := MakeMVCCMetadataKey(tbiKey) i.iter.SeekGE(seekKey) - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false + if !i.checkValidAndSaveErr() { return } } @@ -313,13 +332,20 @@ func (i *MVCCIncrementalIterator) initMetaAndCheckForIntentOrInlineError() error } if i.startTime.Less(metaTimestamp) && metaTimestamp.LessEq(i.endTime) { - i.err = &roachpb.WriteIntentError{ - Intents: []roachpb.Intent{ - roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key), - }, + if !i.enableWriteIntentAggregation { + // If we don't plan to collect intents for resolving, we bail out here with a single intent. + i.err = &roachpb.WriteIntentError{ + Intents: []roachpb.Intent{ + roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key), + }, + } + i.valid = false + return i.err } - i.valid = false - return i.err + // We are collecting intents, so we need to save it and advance to its proposed value. + // Caller could then use a value key to update proposed row counters for the sake of bookkeeping + // and advance more. + i.intents = append(i.intents, roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key)) } return nil } @@ -345,9 +371,7 @@ func (i *MVCCIncrementalIterator) advance() { // the next valid KV. if i.meta.Txn != nil { i.iter.Next() - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false + if !i.checkValidAndSaveErr() { return } continue @@ -435,3 +459,22 @@ func (i *MVCCIncrementalIterator) NextIgnoringTime() { return } } + +// NumCollectedIntents returns number of intents encountered during iteration. +// This is only the case when intent aggregation is enabled, otherwise it is +// always 0. +func (i *MVCCIncrementalIterator) NumCollectedIntents() int { + return len(i.intents) +} + +// TryGetIntentError returns roachpb.WriteIntentError if intents were encountered +// during iteration and intent aggregation is enabled. Otherwise function +// returns nil. roachpb.WriteIntentError will contain all encountered intents. +func (i *MVCCIncrementalIterator) TryGetIntentError() error { + if len(i.intents) == 0 { + return nil + } + return &roachpb.WriteIntentError{ + Intents: i.intents, + } +} diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index 9013b138d7a3..1e06f7d7f392 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -39,31 +39,134 @@ func iterateExpectErr( startKey, endKey roachpb.Key, startTime, endTime hlc.Timestamp, revisions bool, - errString string, + intents []roachpb.Intent, ) func(*testing.T) { return func(t *testing.T) { t.Helper() - iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ - EndKey: endKey, - StartTime: startTime, - EndTime: endTime, + t.Run("aggregate-intents", func(t *testing.T) { + assertExpectErrs(t, e, startKey, endKey, startTime, endTime, revisions, intents) }) - defer iter.Close() - var iterFn func() - if revisions { - iterFn = iter.Next - } else { - iterFn = iter.NextKey + t.Run("first-intent", func(t *testing.T) { + assertExpectErr(t, e, startKey, endKey, startTime, endTime, revisions, intents[0]) + }) + t.Run("export-intents", func(t *testing.T) { + assertExportedErrs(t, e, startKey, endKey, startTime, endTime, revisions, intents, false) + }) + t.Run("export-intents-tbi", func(t *testing.T) { + assertExportedErrs(t, e, startKey, endKey, startTime, endTime, revisions, intents, true) + }) + } +} + +func assertExpectErr( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + revisions bool, + expectedIntent roachpb.Intent, +) { + iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ + EndKey: endKey, + StartTime: startTime, + EndTime: endTime, + }) + defer iter.Close() + var iterFn func() + if revisions { + iterFn = iter.Next + } else { + iterFn = iter.NextKey + } + for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iterFn() { + if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break } - for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iterFn() { - if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { - break + // pass + } + + _, err := iter.Valid() + if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) { + if !expectedIntent.Key.Equal(intentErr.Intents[0].Key) { + t.Fatalf("Expected intent key %v, but got %v", expectedIntent.Key, intentErr.Intents[0].Key) + } + } else { + t.Fatalf("expected error with intent %v but got %v", expectedIntent, err) + } +} + +func assertExpectErrs( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + revisions bool, + expectedIntents []roachpb.Intent, +) { + iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ + EndKey: endKey, + StartTime: startTime, + EndTime: endTime, + EnableWriteIntentAggregation: true, + }) + defer iter.Close() + var iterFn func() + if revisions { + iterFn = iter.Next + } else { + iterFn = iter.NextKey + } + for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iterFn() { + if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break + } + // pass + } + + if iter.NumCollectedIntents() != len(expectedIntents) { + t.Fatalf("Expected %d intents but found %d", len(expectedIntents), iter.NumCollectedIntents()) + } + err := iter.TryGetIntentError() + if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) { + for i := range expectedIntents { + if !expectedIntents[i].Key.Equal(intentErr.Intents[i].Key) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Key, expectedIntents[i].Key) + } + if !expectedIntents[i].Txn.ID.Equal(intentErr.Intents[i].Txn.ID) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Txn.ID, expectedIntents[i].Txn.ID) } - // pass } - if _, err := iter.Valid(); !testutils.IsError(err, errString) { - t.Fatalf("expected error %q but got %v", errString, err) + } else { + t.Fatalf("Expected roachpb.WriteIntentError, found %T", err) + } +} + +func assertExportedErrs( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + revisions bool, + expectedIntents []roachpb.Intent, + useTBI bool, +) { + const big = 1 << 30 + sstFile := &MemFile{} + _, _, err := e.ExportMVCCToSst(startKey, endKey, startTime, endTime, revisions, big, big, + useTBI, sstFile) + require.Error(t, err) + + if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) { + for i := range expectedIntents { + if !expectedIntents[i].Key.Equal(intentErr.Intents[i].Key) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Key, expectedIntents[i].Key) + } + if !expectedIntents[i].Txn.ID.Equal(intentErr.Intents[i].Txn.ID) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Txn.ID, expectedIntents[i].Txn.ID) + } } + } else { + t.Fatalf("Expected roachpb.WriteIntentError, found %T", err) } } @@ -196,6 +299,7 @@ func assertIteratedKVs( EnableTimeBoundIteratorOptimization: useTBI, StartTime: startTime, EndTime: endTime, + EnableWriteIntentAggregation: true, }) defer iter.Close() var iterFn func() @@ -211,6 +315,9 @@ func assertIteratedKVs( } else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { break } + if iter.NumCollectedIntents() > 0 { + t.Fatal("got unexpected intent error") + } kvs = append(kvs, MVCCKeyValue{Key: iter.Key(), Value: iter.Value()}) } @@ -428,16 +535,33 @@ func TestMVCCIncrementalIterator(t *testing.T) { makeKVT := func(key roachpb.Key, value []byte, ts hlc.Timestamp) MVCCKeyValue { return MVCCKeyValue{Key: MVCCKey{Key: key, Timestamp: ts}, Value: value} } + makeTxn := func(key roachpb.Key, val []byte, ts hlc.Timestamp) (roachpb.Transaction, roachpb.Value, roachpb.Intent) { + txnID := uuid.MakeV4() + txnMeta := enginepb.TxnMeta{ + Key: key, + ID: txnID, + Epoch: 1, + WriteTimestamp: ts, + } + return roachpb.Transaction{ + TxnMeta: txnMeta, + ReadTimestamp: ts, + }, roachpb.Value{ + RawBytes: val, + }, roachpb.MakeIntent(&txnMeta, key) + } + intents := func(intents ...roachpb.Intent) []roachpb.Intent { return intents } + // Keys are named as kv__. kv1_1_1 := makeKVT(testKey1, testValue1, ts1) kv1_4_4 := makeKVT(testKey1, testValue4, ts4) kv1_2_2 := makeKVT(testKey1, testValue2, ts2) kv2_2_2 := makeKVT(testKey2, testValue3, ts2) - kv1_3Deleted := makeKVT(testKey1, nil, ts3) + kv1Deleted3 := makeKVT(testKey1, nil, ts3) kvs := func(kvs ...MVCCKeyValue) []MVCCKeyValue { return kvs } for _, engineImpl := range mvccEngineImpls { - t.Run(engineImpl.name, func(t *testing.T) { + t.Run(engineImpl.name+"-latest", func(t *testing.T) { e := engineImpl.create() defer e.Close() @@ -466,49 +590,29 @@ func TestMVCCIncrementalIterator(t *testing.T) { if err := MVCCDelete(ctx, e, nil, testKey1, ts3, nil); err != nil { t.Fatal(err) } - t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, latest, kvs(kv1_3Deleted, kv2_2_2))) + t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, latest, kvs(kv1Deleted3, kv2_2_2))) // Exercise intent handling. - txn1ID := uuid.MakeV4() - txn1 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey1, - ID: txn1ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn1Val := roachpb.Value{RawBytes: testValue4} + txn1, txn1Val, intentErr1 := makeTxn(testKey1, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.ReadTimestamp, txn1Val, &txn1); err != nil { t.Fatal(err) } - txn2ID := uuid.MakeV4() - txn2 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey2, - ID: txn2ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn2Val := roachpb.Value{RawBytes: testValue4} + txn2, txn2Val, intentErr2 := makeTxn(testKey2, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.ReadTimestamp, txn2Val, &txn2); err != nil { t.Fatal(err) } - t.Run("intents", - iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, latest, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, latest, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, localMax, keyMax, tsMin, ts4, latest, "conflicting intents")) + t.Run("intents-1", + iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, latest, intents(intentErr1))) + t.Run("intents-2", + iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, latest, intents(intentErr2))) + t.Run("intents-multi", + iterateExpectErr(e, localMax, keyMax, tsMin, ts4, latest, intents(intentErr1, intentErr2))) // Intents above the upper time bound or beneath the lower time bound must // be ignored (#28358). Note that the lower time bound is exclusive while // the upper time bound is inclusive. - t.Run("intents", assertEqualKVs(e, localMax, keyMax, tsMin, ts3, latest, kvs(kv1_3Deleted, kv2_2_2))) - t.Run("intents", assertEqualKVs(e, localMax, keyMax, ts4, tsMax, latest, kvs())) - t.Run("intents", assertEqualKVs(e, localMax, keyMax, ts4.Next(), tsMax, latest, kvs())) + t.Run("intents-filtered-1", assertEqualKVs(e, localMax, keyMax, tsMin, ts3, latest, kvs(kv1Deleted3, kv2_2_2))) + t.Run("intents-filtered-2", assertEqualKVs(e, localMax, keyMax, ts4, tsMax, latest, kvs())) + t.Run("intents-filtered-3", assertEqualKVs(e, localMax, keyMax, ts4.Next(), tsMax, latest, kvs())) intent1 := roachpb.MakeLockUpdate(&txn1, roachpb.Span{Key: testKey1}) intent1.Status = roachpb.COMMITTED @@ -520,12 +624,12 @@ func TestMVCCIncrementalIterator(t *testing.T) { if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil { t.Fatal(err) } - t.Run("intents", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, latest, kvs(kv1_4_4, kv2_2_2))) + t.Run("intents-resolved", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, latest, kvs(kv1_4_4, kv2_2_2))) }) } for _, engineImpl := range mvccEngineImpls { - t.Run(engineImpl.name, func(t *testing.T) { + t.Run(engineImpl.name+"-all", func(t *testing.T) { e := engineImpl.create() defer e.Close() @@ -554,49 +658,30 @@ func TestMVCCIncrementalIterator(t *testing.T) { if err := MVCCDelete(ctx, e, nil, testKey1, ts3, nil); err != nil { t.Fatal(err) } - t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, all, kvs(kv1_3Deleted, kv1_2_2, kv2_2_2))) + t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, all, kvs(kv1Deleted3, kv1_2_2, kv2_2_2))) // Exercise intent handling. - txn1ID := uuid.MakeV4() - txn1 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey1, - ID: txn1ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn1Val := roachpb.Value{RawBytes: testValue4} + txn1, txn1Val, intentErr1 := makeTxn(testKey1, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.ReadTimestamp, txn1Val, &txn1); err != nil { t.Fatal(err) } - txn2ID := uuid.MakeV4() - txn2 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey2, - ID: txn2ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn2Val := roachpb.Value{RawBytes: testValue4} + txn2, txn2Val, intentErr2 := makeTxn(testKey2, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.ReadTimestamp, txn2Val, &txn2); err != nil { t.Fatal(err) } - t.Run("intents", - iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, all, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, all, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, localMax, keyMax, tsMin, ts4, all, "conflicting intents")) + // Single intent tests are verifying behavior when intent collection is not enabled. + t.Run("intents-1", + iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, all, intents(intentErr1))) + t.Run("intents-2", + iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, all, intents(intentErr2))) + t.Run("intents-multi", + iterateExpectErr(e, localMax, keyMax, tsMin, ts4, all, intents(intentErr1, intentErr2))) // Intents above the upper time bound or beneath the lower time bound must // be ignored (#28358). Note that the lower time bound is exclusive while // the upper time bound is inclusive. - t.Run("intents", assertEqualKVs(e, localMax, keyMax, tsMin, ts3, all, kvs(kv1_3Deleted, kv1_2_2, kv1_1_1, kv2_2_2))) - t.Run("intents", assertEqualKVs(e, localMax, keyMax, ts4, tsMax, all, kvs())) - t.Run("intents", assertEqualKVs(e, localMax, keyMax, ts4.Next(), tsMax, all, kvs())) + t.Run("intents-filtered-1", assertEqualKVs(e, localMax, keyMax, tsMin, ts3, all, kvs(kv1Deleted3, kv1_2_2, kv1_1_1, kv2_2_2))) + t.Run("intents-filtered-2", assertEqualKVs(e, localMax, keyMax, ts4, tsMax, all, kvs())) + t.Run("intents-filtered-3", assertEqualKVs(e, localMax, keyMax, ts4.Next(), tsMax, all, kvs())) intent1 := roachpb.MakeLockUpdate(&txn1, roachpb.Span{Key: testKey1}) intent1.Status = roachpb.COMMITTED @@ -608,7 +693,7 @@ func TestMVCCIncrementalIterator(t *testing.T) { if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil { t.Fatal(err) } - t.Run("intents", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, all, kvs(kv1_4_4, kv1_3Deleted, kv1_2_2, kv1_1_1, kv2_2_2))) + t.Run("intents-resolved", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, all, kvs(kv1_4_4, kv1Deleted3, kv1_2_2, kv1_1_1, kv2_2_2))) }) } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index de8442521786..230adb22934c 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -47,6 +47,12 @@ import ( const ( maxSyncDurationFatalOnExceededDefault = true + + // Default value for maximum number of intents reported by ExportToSST + // in WriteIntentError is set to half of the maximum lock table size. + // This value is subject to tuning in real environment as we have more + // data available. + maxIntentsPerSstExportErrorDefault = 5000 ) // Default for MaxSyncDuration below. @@ -69,6 +75,11 @@ var MaxSyncDurationFatalOnExceeded = settings.RegisterBoolSetting( maxSyncDurationFatalOnExceededDefault, ) +var maxIntentsPerSstExportError = settings.RegisterIntSetting( + "storage.sst_export.max_intents_per_error", + "maximum number of intents returned in error when sst export fails", + maxIntentsPerSstExportErrorDefault) + // EngineKeyCompare compares cockroach keys, including the version (which // could be MVCC timestamps). func EngineKeyCompare(a, b []byte) int { @@ -657,8 +668,9 @@ func (p *Pebble) ExportMVCCToSst( ) (roachpb.BulkOpSummary, roachpb.Key, error) { r := wrapReader(p) // Doing defer r.Free() does not inline. + maxIntentCount := maxIntentsPerSstExportError.Get(&p.settings.SV) summary, k, err := pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, - maxSize, useTBI, dest) + maxSize, useTBI, dest, maxIntentCount) r.Free() return summary, k, err } @@ -1089,6 +1101,7 @@ func (p *Pebble) NewUnindexedBatch(writeOnly bool) Batch { func (p *Pebble) NewSnapshot() Reader { return &pebbleSnapshot{ snapshot: p.db.NewSnapshot(), + settings: p.settings, } } @@ -1327,8 +1340,9 @@ func (p *pebbleReadOnly) ExportMVCCToSst( ) (roachpb.BulkOpSummary, roachpb.Key, error) { r := wrapReader(p) // Doing defer r.Free() does not inline. + maxIntentCount := maxIntentsPerSstExportError.Get(&p.parent.settings.SV) summary, k, err := pebbleExportToSst( - r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI, dest) + r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI, dest, maxIntentCount) r.Free() return summary, k, err } @@ -1563,6 +1577,7 @@ func (p *pebbleReadOnly) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalO // pebbleSnapshot represents a snapshot created using Pebble.NewSnapshot(). type pebbleSnapshot struct { snapshot *pebble.Snapshot + settings *cluster.Settings closed bool } @@ -1590,8 +1605,9 @@ func (p *pebbleSnapshot) ExportMVCCToSst( ) (roachpb.BulkOpSummary, roachpb.Key, error) { r := wrapReader(p) // Doing defer r.Free() does not inline. + maxIntentCount := maxIntentsPerSstExportError.Get(&p.settings.SV) summary, k, err := pebbleExportToSst( - r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI, dest) + r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI, dest, maxIntentCount) r.Free() return summary, k, err } @@ -1699,6 +1715,7 @@ func pebbleExportToSst( targetSize, maxSize uint64, useTBI bool, dest io.WriteCloser, + maxIntentCount int64, ) (roachpb.BulkOpSummary, roachpb.Key, error) { sstWriter := MakeBackupSSTWriter(noopSync{dest}) defer sstWriter.Close() @@ -1711,6 +1728,7 @@ func pebbleExportToSst( EnableTimeBoundIteratorOptimization: useTBI, StartTime: startTS, EndTime: endTS, + EnableWriteIntentAggregation: true, }) defer iter.Close() var curKey roachpb.Key // only used if exportAllRevisions @@ -1719,8 +1737,8 @@ func pebbleExportToSst( for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; { ok, err := iter.Valid() if err != nil { - // The error may be a WriteIntentError. In which case, returning it will - // cause this command to be retried. + // This is an underlying iterator error, return it to the caller to deal + // with. return roachpb.BulkOpSummary{}, nil, err } if !ok { @@ -1730,6 +1748,11 @@ func pebbleExportToSst( if unsafeKey.Key.Compare(endKey) >= 0 { break } + + if iter.NumCollectedIntents() > 0 { + break + } + unsafeValue := iter.UnsafeValue() isNewKey := !exportAllRevisions || !unsafeKey.Key.Equal(curKey) if paginated && exportAllRevisions && isNewKey { @@ -1776,6 +1799,23 @@ func pebbleExportToSst( } } + // First check if we encountered an intent while iterating the data. + // If we do it means this export can't complete and is aborted. We need to loop over remaining data + // to collect all matching intents before returning them in an error to the caller. + if iter.NumCollectedIntents() > 0 { + for int64(iter.NumCollectedIntents()) < maxIntentCount { + iter.NextKey() + // If we encounter other errors during intent collection, we return our original write intent failure. + // We would find this new error again upon retry. + ok, _ := iter.Valid() + if !ok { + break + } + } + err := iter.TryGetIntentError() + return roachpb.BulkOpSummary{}, nil, err + } + if rows.BulkOpSummary.DataSize == 0 { // If no records were added to the sstable, skip completing it and return a // nil slice – the export code will discard it anyway (based on 0 DataSize). diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index 471b2dd14fbc..4ce659c6434f 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" ) @@ -548,3 +549,91 @@ func BenchmarkMVCCKeyCompare(b *testing.B) { fmt.Fprint(ioutil.Discard, c) } } + +type testValue struct { + key roachpb.Key + value roachpb.Value + timestamp hlc.Timestamp + txn *roachpb.Transaction +} + +func intent(key roachpb.Key, val string, ts hlc.Timestamp) testValue { + var value = roachpb.MakeValueFromString(val) + value.InitChecksum(key) + tx := roachpb.MakeTransaction(fmt.Sprintf("txn-%v", key), key, roachpb.NormalUserPriority, ts, 1000) + var txn = &tx + return testValue{key, value, ts, txn} +} + +func value(key roachpb.Key, val string, ts hlc.Timestamp) testValue { + var value = roachpb.MakeValueFromString(val) + value.InitChecksum(key) + return testValue{key, value, ts, nil} +} + +func fillInData(ctx context.Context, engine Engine, data []testValue) error { + batch := engine.NewBatch() + for _, val := range data { + if err := MVCCPut(ctx, batch, nil, val.key, val.timestamp, val.value, val.txn); err != nil { + return err + } + } + return batch.Commit(true) +} + +func ts(ts int64) hlc.Timestamp { + return hlc.Timestamp{WallTime: ts} +} + +func key(k int) roachpb.Key { + return []byte(fmt.Sprintf("%05d", k)) +} + +func requireTxnForValue(t *testing.T, val testValue, intent roachpb.Intent) { + require.Equal(t, val.txn.Key, intent.Txn.Key) +} + +func TestSstExportFailureIntentBatching(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Test function uses a fixed time and key range to produce SST. + // Use varying inserted keys for values and intents to putting them in and out of ranges. + checkReportedErrors := func(data []testValue, expectedIntentIndices []int) func(*testing.T) { + return func(t *testing.T) { + ctx := context.Background() + + engine := createTestPebbleEngine() + defer engine.Close() + + require.NoError(t, fillInData(ctx, engine, data)) + + destination := &MemFile{} + _, _, err := engine.ExportMVCCToSst(key(10), key(20000), ts(999), ts(2000), + true, 0, 0, true, destination) + if len(expectedIntentIndices) == 0 { + require.NoError(t, err) + } else { + require.Error(t, err) + e := (*roachpb.WriteIntentError)(nil) + if !errors.As(err, &e) { + require.Fail(t, "Expected WriteIntentFailure, got %T", err) + } + require.Equal(t, len(expectedIntentIndices), len(e.Intents)) + for i, dataIdx := range expectedIntentIndices { + requireTxnForValue(t, data[dataIdx], e.Intents[i]) + } + } + } + } + + // Export range is fixed to k:["00010", "10000"), ts:(999, 2000] for all tests. + testDataCount := int(maxIntentsPerSstExportError.Default() + 1) + testData := make([]testValue, testDataCount*2) + expectedErrors := make([]int, testDataCount) + for i := 0; i < testDataCount; i++ { + testData[i*2] = value(key(i*2+11), "value", ts(1000)) + testData[i*2+1] = intent(key(i*2+12), "intent", ts(1001)) + expectedErrors[i] = i*2 + 1 + } + t.Run("Receive no more than limit intents", checkReportedErrors(testData, expectedErrors[:maxIntentsPerSstExportError.Default()])) +}