From 50f01ffbddee7745821a2d0f0345055a7befa17b Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Tue, 27 Mar 2018 14:01:48 +0000
Subject: [PATCH 1/2] kv: mark AddSSTable requests "unsplittable"

Usually we automatically split requests that span ranges in DistSender.
However, AddSSTable requests are special: their large SST file is an opaque
payload that contains keys that could span the post-split requests. We
currently assert during ingestion that the span of an SST's contents matches
the request's span, since the file is ingested whole, without modification,
meaning that a split-and-truncated request fails.

However, if we know a request will fail, we can reject it as soon as we would
try to split it, before sending these large, expensive files around. More
importantly, if we communicate the actual range bounds back to the caller, it
can try again, generating re-chunked SSTs with bounds that should be OK.

Note: we could try to make the recipient able to correctly ingest only the
subset of the unsplit SST by iterating over it and generating a new file
containing just the keys in the request span. However, this adds significant
complexity to raft command evaluation, and changes the performance
characteristics of `AddSSTable` that make it appealing in the first place.

Release note: none.
---
 pkg/kv/dist_sender.go |  5 +++++
 pkg/roachpb/api.go    | 19 ++++++++++---------
 pkg/roachpb/batch.go  |  5 +++++
 3 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go
index ba975645a4b3..3574f09510f2 100644
--- a/pkg/kv/dist_sender.go
+++ b/pkg/kv/dist_sender.go
@@ -692,6 +692,11 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 		return resp.reply, resp.pErr
 	}
 
+	if ba.IsUnsplittable() {
+		mismatch := roachpb.NewRangeKeyMismatchError(rs.Key.AsRawKey(), rs.EndKey.AsRawKey(), ri.Desc())
+		return nil, roachpb.NewError(mismatch)
+	}
+
 	// Make an empty slice of responses which will be populated with responses
 	// as they come in via Combine().
 	br = &roachpb.BatchResponse{
diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index 929e4f4fd5c4..e4a1ebb59335 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -103,14 +103,15 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error {
 }
 
 const (
-	isAdmin    = 1 << iota // admin cmds don't go through raft, but run on lease holder
-	isRead                 // read-only cmds don't go through raft, but may run on lease holder
-	isWrite                // write cmds go through raft and must be proposed on lease holder
-	isTxn                  // txn commands may be part of a transaction
-	isTxnWrite             // txn write cmds start heartbeat and are marked for intent resolution
-	isRange                // range commands may span multiple keys
-	isReverse              // reverse commands traverse ranges in descending direction
-	isAlone                // requests which must be alone in a batch
+	isAdmin        = 1 << iota // admin cmds don't go through raft, but run on lease holder
+	isRead                     // read-only cmds don't go through raft, but may run on lease holder
+	isWrite                    // write cmds go through raft and must be proposed on lease holder
+	isTxn                      // txn commands may be part of a transaction
+	isTxnWrite                 // txn write cmds start heartbeat and are marked for intent resolution
+	isRange                    // range commands may span multiple keys
+	isReverse                  // reverse commands traverse ranges in descending direction
+	isAlone                    // requests which must be alone in a batch
+	isUnsplittable             // range command that must not be split during sending
 	// Requests for acquiring a lease skip the (proposal-time) check that the
 	// proposing replica has a valid lease.
 	skipLeaseCheck
@@ -1079,7 +1080,7 @@ func (*WriteBatchRequest) flags() int { return isWrite | isRange }
 func (*ExportRequest) flags() int { return isRead | isRange | updatesReadTSCache }
 func (*ImportRequest) flags() int { return isAdmin | isAlone }
 func (*AdminScatterRequest) flags() int { return isAdmin | isAlone | isRange }
-func (*AddSSTableRequest) flags() int { return isWrite | isAlone | isRange }
+func (*AddSSTableRequest) flags() int { return isWrite | isAlone | isRange | isUnsplittable }
 
 // RefreshRequest and RefreshRangeRequest both list
 // updates(Read)TSCache, though they actually update the read or write
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 228422166c6a..5243c7196af1 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -117,6 +117,11 @@ func (ba *BatchRequest) IsTransactionWrite() bool {
 	return ba.hasFlag(isTxnWrite)
 }
 
+// IsUnsplittable returns true iff the BatchRequest is an un-splittable request.
+func (ba *BatchRequest) IsUnsplittable() bool {
+	return ba.hasFlag(isUnsplittable)
+}
+
 // IsSingleRequest returns true iff the BatchRequest contains a single request.
 func (ba *BatchRequest) IsSingleRequest() bool {
 	return len(ba.Requests) == 1

From d4d2d29ad33b7870e5cdfc0f47a769d0c10131cf Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Fri, 22 Jun 2018 14:19:07 +0000
Subject: [PATCH 2/2] storageccl: retry SST chunks with new splits on err

Previously an ImportRequest would fail to add SSTables that spanned the
boundaries of the target range(s). This change reattempts the AddSSTable
call with re-chunked SSTables that avoid spanning the bounds returned in the
range mismatch error. It does this by iterating over the SSTable to build
and add smaller SSTables for either side of the split.

This error currently happens rarely in practice -- we usually explicitly
split ranges immediately before sending an Import with matching bounds to
them. The empty, just-split range typically has no reason to split again, so
the Import usually succeeds. However, in some cases, like resuming a prior
RESTORE, we may be re-Importing into ranges that are *not* empty and could
have split at points other than those picked by the RESTORE statement.

Fixes #17819.

Release note: none.
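For reviewers, the interplay between the two patches in pseudocode. This is
an illustrative sketch, not code from the diff: the helper name
addSSTableAttempt and the db.AddSSTable call shape (elided from the hunk
below) are assumptions, while AddSSTable, addSplitSSTable, and
RangeKeyMismatchError are the names actually used in this change.

    // addSSTableAttempt sketches one attempt plus the new fallback; the real
    // change keeps this logic inside AddSSTable's existing retry loop in
    // pkg/ccl/storageccl/import.go.
    func addSSTableAttempt(
        ctx context.Context, db *client.DB, start, end roachpb.Key, sstBytes []byte,
    ) error {
        err := db.AddSSTable(ctx, start, end, sstBytes) // assumed call shape
        if err == nil {
            return nil
        }
        if m, ok := errors.Cause(err).(*roachpb.RangeKeyMismatchError); ok {
            // DistSender refused to split the unsplittable request (patch 1) and
            // returned the bounds of the range it landed on; re-chunk the SST at
            // that range's end key and add each half. addSplitSSTable calls back
            // into AddSSTable, so a half that still spans a split is re-chunked
            // again.
            return addSplitSSTable(ctx, db, sstBytes, start, m.MismatchedRange.EndKey.AsRawKey())
        }
        return err
    }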
--- pkg/ccl/storageccl/import.go | 70 ++++++++++++ pkg/ccl/storageccl/import_test.go | 179 ++++++++++++++++++++---------- 2 files changed, 192 insertions(+), 57 deletions(-) diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go index f340863f914e..bba3a7fede75 100644 --- a/pkg/ccl/storageccl/import.go +++ b/pkg/ccl/storageccl/import.go @@ -148,15 +148,85 @@ func AddSSTable(ctx context.Context, db *client.DB, start, end roachpb.Key, sstB if err == nil { return nil } + if m, ok := errors.Cause(err).(*roachpb.RangeKeyMismatchError); ok { + return addSplitSSTable(ctx, db, sstBytes, start, m.MismatchedRange.EndKey.AsRawKey()) + } if _, ok := err.(*roachpb.AmbiguousResultError); i == maxAddSSTableRetries || !ok { return errors.Wrapf(err, "addsstable [%s,%s)", start, end) } + log.Warningf(ctx, "addsstable [%s,%s) attempt %d failed: %+v", start, end, i, err) continue } } +func addSplitSSTable( + ctx context.Context, db *client.DB, sstBytes []byte, start, splitKey roachpb.Key, +) error { + iter, err := engineccl.NewMemSSTIterator(sstBytes, false) + if err != nil { + return err + } + defer iter.Close() + + w, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + defer w.Close() + + split := false + var first, last roachpb.Key + + iter.Seek(engine.MVCCKey{Key: start}) + for { + if ok, err := iter.Valid(); err != nil { + return err + } else if !ok { + break + } + + key := iter.UnsafeKey() + + if !split && key.Key.Compare(splitKey) >= 0 { + res, err := w.Finish() + if err != nil { + return err + } + if err := AddSSTable(ctx, db, first, last.PrefixEnd(), res); err != nil { + return err + } + w.Close() + w, err = engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + + split = true + first = first[:0] + last = last[:0] + } + + if len(first) == 0 { + first = append(first[:0], key.Key...) + } + last = append(last[:0], key.Key...) + + if err := w.Add(engine.MVCCKeyValue{Key: key, Value: iter.UnsafeValue()}); err != nil { + return err + } + + iter.Next() + } + + res, err := w.Finish() + if err != nil { + return err + } + return AddSSTable(ctx, db, first, last.PrefixEnd(), res) +} + // evalImport bulk loads key/value entries. func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.ImportResponse, error) { args := cArgs.Args.(*roachpb.ImportRequest) diff --git a/pkg/ccl/storageccl/import_test.go b/pkg/ccl/storageccl/import_test.go index 764621533bed..5f245d819564 100644 --- a/pkg/ccl/storageccl/import_test.go +++ b/pkg/ccl/storageccl/import_test.go @@ -35,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) @@ -155,7 +154,20 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { t.Fatal(err) } - writeSST := func(keys ...[]byte) string { + const ( + oldID = 51 + indexID = 1 + ) + + srcPrefix := makeKeyRewriterPrefixIgnoringInterleaved(oldID, indexID) + var keys []roachpb.Key + for i := 0; i < 8; i++ { + key := append([]byte(nil), srcPrefix...) 
+ key = encoding.EncodeStringAscending(key, fmt.Sprintf("k%d", i)) + keys = append(keys, key) + } + + writeSST := func(t *testing.T, offsets []int) string { path := strconv.FormatInt(hlc.UnixNano(), 10) sst, err := engine.MakeRocksDBSstFileWriter() @@ -165,7 +177,8 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { defer sst.Close() ts := hlc.NewClock(hlc.UnixNano, time.Nanosecond).Now() value := roachpb.MakeValueFromString("bar") - for _, key := range keys { + for _, idx := range offsets { + key := keys[idx] value.ClearChecksum() value.InitChecksum(key) kv := engine.MVCCKeyValue{Key: engine.MVCCKey{Key: key, Timestamp: ts}, Value: value.RawBytes} @@ -183,51 +196,6 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { return path } - const ( - oldID = 51 - newID = 100 - indexID = 1 - ) - - kr := prefixRewriter{ - {OldPrefix: makeKeyRewriterPrefixIgnoringInterleaved(oldID, indexID), NewPrefix: makeKeyRewriterPrefixIgnoringInterleaved(newID, indexID)}, - } - var keys [][]byte - for i := 0; i < 4; i++ { - key := append([]byte(nil), kr[0].OldPrefix...) - key = encoding.EncodeStringAscending(key, fmt.Sprintf("foo%d", i)) - keys = append(keys, key) - } - - rekeys := []roachpb.ImportRequest_TableRekey{ - { - OldID: oldID, - NewDesc: mustMarshalDesc(t, &sqlbase.TableDescriptor{ - ID: newID, - PrimaryIndex: sqlbase.IndexDescriptor{ - ID: indexID, - }, - }), - }, - } - - files := []string{ - writeSST(keys[2], keys[3]), - writeSST(keys[0], keys[1]), - writeSST(keys[2], keys[3]), - } - - dataStartKey := roachpb.Key(keys[0]) - dataEndKey := roachpb.Key(keys[3]).PrefixEnd() - reqStartKey, ok := kr.rewriteKey(append([]byte(nil), dataStartKey...)) - if !ok { - t.Fatalf("failed to rewrite key: %s", reqStartKey) - } - reqEndKey, ok := kr.rewriteKey(append([]byte(nil), dataEndKey...)) - if !ok { - t.Fatalf("failed to rewrite key: %s", reqEndKey) - } - // Make the first few WriteBatch/AddSSTable calls return // AmbiguousResultError. Import should be resilient to this. const initialAmbiguousSubReqs = 3 @@ -259,7 +227,6 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { args.StoreSpecs = []base.StoreSpec{{InMemory: false, Path: filepath.Join(dir, "testserver")}} s, _, kvDB := serverutils.StartServer(t, args) defer s.Stopper().Stop(ctx) - init(s.ClusterSettings()) storage, err := ExportStorageConfFromURI("nodelocal:///foo") @@ -267,24 +234,117 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { t.Fatalf("%+v", err) } - for i := 1; i <= len(files); i++ { - t.Run(strconv.Itoa(i), func(t *testing.T) { + const splitKey1, splitKey2 = 3, 5 + // Each test case consists of some number of batches of keys, represented as + // ints [0, 8). Splits are at 3 and 5. + for i, testCase := range [][][]int{ + // Simple cases, no spanning splits, try first, last, middle, etc in each. + // r1 + {{0}}, + {{1}}, + {{2}}, + {{0, 1, 2}}, + {{0}, {1}, {2}}, + + // r2 + {{3}}, + {{4}}, + {{3, 4}}, + {{3}, {4}}, + + // r3 + {{5}}, + {{5, 6, 7}}, + {{6}}, + + // batches exactly matching spans. + {{0, 1, 2}, {3, 4}, {5, 6, 7}}, + + // every key, in its own batch. + {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}}, + + // every key in one big batch. + {{0, 1, 2, 3, 4, 5, 6, 7}}, + + // Look for off-by-ones on and around the splits. + {{2, 3}}, + {{1, 3}}, + {{2, 4}}, + {{1, 4}}, + {{1, 5}}, + {{2, 5}}, + + // Mixture of split-aligned and non-aligned batches. 
+ {{1}, {5}, {6}}, + {{1, 2, 3}, {4, 5}, {6, 7}}, + {{0}, {2, 3, 5}, {7}}, + {{0, 4}, {5, 7}}, + {{0, 3}, {4}}, + } { + t.Run(fmt.Sprintf("%d-%v", i, testCase), func(t *testing.T) { + newID := sqlbase.ID(100 + i) + kr := prefixRewriter{{ + OldPrefix: srcPrefix, + NewPrefix: makeKeyRewriterPrefixIgnoringInterleaved(newID, indexID), + }} + rekeys := []roachpb.ImportRequest_TableRekey{ + { + OldID: oldID, + NewDesc: mustMarshalDesc(t, &sqlbase.TableDescriptor{ + ID: newID, + PrimaryIndex: sqlbase.IndexDescriptor{ + ID: indexID, + }, + }), + }, + } + + first := keys[testCase[0][0]] + last := keys[testCase[len(testCase)-1][len(testCase[len(testCase)-1])-1]] + + reqStartKey, ok := kr.rewriteKey(append([]byte(nil), keys[0]...)) + if !ok { + t.Fatalf("failed to rewrite key: %s", reqStartKey) + } + reqEndKey, ok := kr.rewriteKey(append([]byte(nil), keys[len(keys)-1].PrefixEnd()...)) + if !ok { + t.Fatalf("failed to rewrite key: %s", reqEndKey) + } + reqMidKey1, ok := kr.rewriteKey(append([]byte(nil), keys[splitKey1]...)) + if !ok { + t.Fatalf("failed to rewrite key: %s", reqMidKey1) + } + reqMidKey2, ok := kr.rewriteKey(append([]byte(nil), keys[splitKey2]...)) + if !ok { + t.Fatalf("failed to rewrite key: %s", reqMidKey2) + } + + if err := kvDB.AdminSplit(ctx, reqMidKey1, reqMidKey1); err != nil { + t.Fatal(err) + } + if err := kvDB.AdminSplit(ctx, reqMidKey2, reqMidKey2); err != nil { + t.Fatal(err) + } + atomic.StoreInt64(&remainingAmbiguousSubReqs, initialAmbiguousSubReqs) req := &roachpb.ImportRequest{ RequestHeader: roachpb.RequestHeader{Key: reqStartKey}, - DataSpan: roachpb.Span{Key: dataStartKey, EndKey: dataEndKey}, + DataSpan: roachpb.Span{Key: first, EndKey: last.PrefixEnd()}, Rekeys: rekeys, } - for _, f := range files[:i] { + var slurp []string + for ks := range testCase { + f := writeSST(t, testCase[ks]) + slurp = append(slurp, f) req.Files = append(req.Files, roachpb.ImportRequest_File{Dir: storage, Path: f}) } - expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), files[:i], kr) + expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, kr) // Import may be retried by DistSender if it takes too long to return, so // make sure it's idempotent. - for j := 0; j < 1; j++ { + for j := 0; j < 2; j++ { b := &client.Batch{} b.AddRawRequest(req) if err := kvDB.Run(ctx, b); err != nil { @@ -297,8 +357,13 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { kvs := clientKVsToEngineKVs(clientKVs) if !reflect.DeepEqual(kvs, expectedKVs) { - for _, kv := range append(kvs, expectedKVs...) { - log.Info(ctx, kv) + for i := 0; i < len(kvs) || i < len(expectedKVs); i++ { + if i < len(expectedKVs) { + t.Logf("expected %d\t%v\t%v", i, expectedKVs[i].Key, expectedKVs[i].Value) + } + if i < len(kvs) { + t.Logf("got %d\t%v\t%v", i, kvs[i].Key, kvs[i].Value) + } } t.Fatalf("got %+v expected %+v", kvs, expectedKVs) }