diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index dab9cb4a4862..d4af1e6ea930 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -29,7 +30,74 @@ import ( ) func init() { - RegisterReadWriteCommand(roachpb.AddSSTable, DefaultDeclareKeys, EvalAddSSTable) + RegisterReadWriteCommand(roachpb.AddSSTable, declareKeysAddSSTable, EvalAddSSTable) +} + +func declareKeysAddSSTable( + rs ImmutableRangeState, + header roachpb.Header, + req roachpb.Request, + latchSpans, lockSpans *spanset.SpanSet, +) { + // AddSSTable violates MVCC and closed timestamp invariants, so the + // concurrency semantics deserve special attention. + // + // AddSSTable cannot be in a transaction, cannot write intents or tombstones, + // cannot be split across ranges, and is always alone in a batch. + // + // The KV pairs in the SST already have fixed MVCC timestamps, independent of + // the batch timestamp. Pushes by other txns or the closed timestamp do not + // affect the MVCC timestamps. They can be at any time (past or future), even + // below the closed timestamp, and by default they can replace existing + // versions or write below existing versions and intents. This violates MVCC, + // because history must be immutable, and the closed timestamp, because writes + // should never happen below it. 
+ // + // DisallowShadowing=true will prevent writing to keys that already exist + // (with any timestamp), returning an error -- except if the last version is a + // tombstone with a timestamp below the written key or if the timestamp and + // value exactly match the incoming write (for idempotency). If an intent is + // found, WriteIntentError will be returned in order to resolve it and retry: + // if it was aborted or was a tombstone, the request may succeed, but if it + // was a committed value the request will fail. This still violates MVCC (it + // may write a key in the past whose absence has already been observed by a + // reader) and the closed timestamp (it may write a key below it). + // + // The request header's Key and EndKey are set to cover the first and last key + // in the SST. Below, we always declare write latches across this span for + // isolation from concurrent requests. If DisallowShadowing=true, we must also + // declare lock spans over this span for isolation from concurrent + // transactions, and return WriteIntentError for any encountered intents to + // resolve them. This is particularly relevant for IMPORT INTO, which imports + // into an offline table that may contain unresolved intents from previous + // transactions. + // + // Taking out latches/locks across the entire SST span is very coarse, and we + // could instead iterate over the SST and take out point latches/locks, but + // the cost is likely not worth it since AddSSTable is often used with + // unpopulated spans. + // + // AddSSTable callers must take extreme care to only write into key/time spans + // that have never been accessed by a past transaction, and will not be + // accessed by a concurrent transaction, or to make sure these accesses are + // safe. 
Below is a list of current operations that use AddSSTable and their + // characteristics: + // + // | Operation | DisallowShadowing | Timestamp | Isolation via | + // |------------------------|-------------------|--------------|-------------------| + // | Import | true | Now | Offline table | + // | CREATE TABLE AS SELECT | true | Read TS | Table descriptor | + // | Materialized views | true | Read TS | Table descriptor | + // | Index backfills | false | Now | Index descriptor | + // | Restore (backup) | true | Key TS | Table descriptor | + // | Streaming replication | false | Key TS | Offline tenant | + // + args := req.(*roachpb.AddSSTableRequest) + if args.DisallowShadowing { + DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans) + } else { + DefaultDeclareKeys(rs, header, req, latchSpans, lockSpans) + } } // EvalAddSSTable evaluates an AddSSTable command. diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 35d07bf256e2..e9b622f100b7 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/kr/pretty" + "github.com/stretchr/testify/require" ) var engineImpls = []struct { @@ -1056,3 +1057,79 @@ func TestAddSSTableDisallowShadowing(t *testing.T) { }) } } + +func TestAddSSTableDisallowShadowingIntentResolution(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, _, db := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + // Start a transaction that writes an intent at b. + txn := db.NewTxn(ctx, "intent") + require.NoError(t, txn.Put(ctx, "b", "intent")) + + // Generate an SSTable that covers keys a, b, and c, and submit it with high + // priority. 
This is going to abort the transaction above, encounter its + // intent, and resolve it. + sst := makeSST(t, s.Clock().Now(), map[string]string{ + "a": "1", + "b": "2", + "c": "3", + }) + stats := sstStats(t, sst) + + ba := roachpb.BatchRequest{} + ba.Header.UserPriority = roachpb.MaxUserPriority + ba.Add(&roachpb.AddSSTableRequest{ + RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}, + Data: sst, + MVCCStats: stats, + DisallowShadowing: true, + }) + _, pErr := db.NonTransactionalSender().Send(ctx, ba) + require.Nil(t, pErr) + + // The transaction should now be aborted. + err := txn.Commit(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "TransactionRetryWithProtoRefreshError: TransactionAbortedError") +} + +func makeSST(t *testing.T, ts hlc.Timestamp, kvs map[string]string) []byte { + t.Helper() + + sstFile := &storage.MemFile{} + writer := storage.MakeBackupSSTWriter(sstFile) + defer writer.Close() + + keys := make([]string, 0, len(kvs)) + for key := range kvs { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, k := range keys { + key := storage.MVCCKey{Key: roachpb.Key(k), Timestamp: ts} + value := roachpb.Value{} + value.SetString(kvs[k]) + value.InitChecksum(key.Key) + require.NoError(t, writer.Put(key, value.RawBytes)) + } + require.NoError(t, writer.Finish()) + writer.Close() + return sstFile.Data() +} + +func sstStats(t *testing.T, sst []byte) *enginepb.MVCCStats { + t.Helper() + + iter, err := storage.NewMemSSTIterator(sst, true) + require.NoError(t, err) + defer iter.Close() + + stats, err := storage.ComputeStatsForRange(iter, keys.MinKey, keys.MaxKey, 0) + require.NoError(t, err) + return &stats +}