Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: declare lock spans for AddSSTable #71676

Merged
merged 1 commit into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand All @@ -29,7 +30,74 @@ import (
)

func init() {
RegisterReadWriteCommand(roachpb.AddSSTable, DefaultDeclareKeys, EvalAddSSTable)
RegisterReadWriteCommand(roachpb.AddSSTable, declareKeysAddSSTable, EvalAddSSTable)
}

func declareKeysAddSSTable(
rs ImmutableRangeState,
header roachpb.Header,
req roachpb.Request,
latchSpans, lockSpans *spanset.SpanSet,
) {
// AddSSTable violates MVCC and closed timestamp invariants, so the
// concurrency semantics deserve special attention.
//
// AddSSTable cannot be in a transaction, cannot write intents or tombstones,
// cannot be split across ranges, and is always alone in a batch.
//
// The KV pairs in the SST already have fixed MVCC timestamps, independent of
// the batch timestamp. Pushes by other txns or the closed timestamp do not
// affect the MVCC timestamps. They can be at any time (past or future), even
// below the closed timestamp, and by default they can replace existing
// versions or write below existing versions and intents. This violates MVCC,
// because history must be immutable, and the closed timestamp, because writes
// should never happen below it.
//
// DisallowShadowing=true will prevent writing to keys that already exist
// (with any timestamp), returning an error -- except if the last version is a
// tombstone with a timestamp below the written key or if the timestamp and
// value exactly match the incoming write (for idempotency). If an intent is
// found, WriteIntentError will be returned in order to resolve it and retry:
// if the intent was aborted or a tombstone the request may succeed, but if it
// was a committed value the request will fail. This still violates MVCC (it
// may write a key in the past whose absence has already been observed by a
// reader) and the closed timestamp (it may write a key below it).
//
// The request header's Key and EndKey are set to cover the first and last key
// in the SST. Below, we always declare write latches across this span for
// isolation from concurrent requests. If DisallowShadowing=true, we must also
// declare lock spans over this span for isolation from concurrent
// transactions, and return WriteIntentError for any encountered intents to
// resolve them. This is particularly relevant for IMPORT INTO, which imports
// into an offline table that may contain unresolved intents from previous
// transactions.
//
// Taking out latches/locks across the entire SST span is very coarse, and we
// could instead iterate over the SST and take out point latches/locks, but
// the cost is likely not worth it since AddSSTable is often used with
// unpopulated spans.
//
// AddSSTable callers must take extreme care to only write into key/time spans
// that have never been accessed by a past transaction, and will not be
// accessed by a concurrent transaction, or to make sure these accesses are
// safe. Below is a list of current operations that use AddSSTable and their
// characteristics:
//
// | Operation | DisallowShadowing | Timestamp | Isolation via |
// |------------------------|-------------------|--------------|-------------------|
// | Import | true | Now | Offline table |
// | CREATE TABLE AS SELECT | true | Read TS | Table descriptor |
// | Materialized views | true | Read TS | Table descriptor |
// | Index backfills | false | Now | Index descriptor |
// | Restore (backup) | true | Key TS | Table descriptor |
// | Streaming replication | false | Key TS | Offline tenant |
//
args := req.(*roachpb.AddSSTableRequest)
if args.DisallowShadowing {
DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans)
} else {
DefaultDeclareKeys(rs, header, req, latchSpans, lockSpans)
}
}

// EvalAddSSTable evaluates an AddSSTable command.
Expand Down
77 changes: 77 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)

var engineImpls = []struct {
Expand Down Expand Up @@ -1056,3 +1057,79 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
})
}
}

func TestAddSSTableDisallowShadowingIntentResolution(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

// Start a transaction that writes an intent at b.
txn := db.NewTxn(ctx, "intent")
require.NoError(t, txn.Put(ctx, "b", "intent"))

// Generate an SSTable that covers keys a, b, and c, and submit it with high
// priority. This is going to abort the transaction above, encounter its
// intent, and resolve it.
sst := makeSST(t, s.Clock().Now(), map[string]string{
"a": "1",
"b": "2",
"c": "3",
})
stats := sstStats(t, sst)

ba := roachpb.BatchRequest{}
ba.Header.UserPriority = roachpb.MaxUserPriority
ba.Add(&roachpb.AddSSTableRequest{
RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
Data: sst,
MVCCStats: stats,
DisallowShadowing: true,
})
_, pErr := db.NonTransactionalSender().Send(ctx, ba)
require.Nil(t, pErr)

// The transaction should now be aborted.
err := txn.Commit(ctx)
require.Error(t, err)
require.Contains(t, err.Error(), "TransactionRetryWithProtoRefreshError: TransactionAbortedError")
}

func makeSST(t *testing.T, ts hlc.Timestamp, kvs map[string]string) []byte {
t.Helper()

sstFile := &storage.MemFile{}
writer := storage.MakeBackupSSTWriter(sstFile)
defer writer.Close()

keys := make([]string, 0, len(kvs))
for key := range kvs {
keys = append(keys, key)
}
sort.Strings(keys)

for _, k := range keys {
key := storage.MVCCKey{Key: roachpb.Key(k), Timestamp: ts}
value := roachpb.Value{}
value.SetString(kvs[k])
value.InitChecksum(key.Key)
require.NoError(t, writer.Put(key, value.RawBytes))
}
require.NoError(t, writer.Finish())
writer.Close()
return sstFile.Data()
}

func sstStats(t *testing.T, sst []byte) *enginepb.MVCCStats {
t.Helper()

iter, err := storage.NewMemSSTIterator(sst, true)
require.NoError(t, err)
defer iter.Close()

stats, err := storage.ComputeStatsForRange(iter, keys.MinKey, keys.MaxKey, 0)
require.NoError(t, err)
return &stats
}