storageccl: retry SST chunks with new splits on err #26984

Merged · 2 commits · Jun 26, 2018
70 changes: 70 additions & 0 deletions pkg/ccl/storageccl/import.go
@@ -148,15 +148,85 @@ func AddSSTable(ctx context.Context, db *client.DB, start, end roachpb.Key, sstB
if err == nil {
return nil
}
if m, ok := errors.Cause(err).(*roachpb.RangeKeyMismatchError); ok {
return addSplitSSTable(ctx, db, sstBytes, start, m.MismatchedRange.EndKey.AsRawKey())
}
if _, ok := err.(*roachpb.AmbiguousResultError); i == maxAddSSTableRetries || !ok {
return errors.Wrapf(err, "addsstable [%s,%s)", start, end)
}

log.Warningf(ctx, "addsstable [%s,%s) attempt %d failed: %+v",
start, end, i, err)
continue
}
}

func addSplitSSTable(
ctx context.Context, db *client.DB, sstBytes []byte, start, splitKey roachpb.Key,
) error {
iter, err := engineccl.NewMemSSTIterator(sstBytes, false)
if err != nil {
return err
}
defer iter.Close()

w, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
}
defer w.Close()

split := false
var first, last roachpb.Key

iter.Seek(engine.MVCCKey{Key: start})
for {
if ok, err := iter.Valid(); err != nil {
return err
} else if !ok {
break
}

key := iter.UnsafeKey()

if !split && key.Key.Compare(splitKey) >= 0 {
res, err := w.Finish()
if err != nil {
return err
}
if err := AddSSTable(ctx, db, first, last.PrefixEnd(), res); err != nil {
return err
}
w.Close()
w, err = engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
}

split = true
first = first[:0]
last = last[:0]
}

if len(first) == 0 {
first = append(first[:0], key.Key...)
}
last = append(last[:0], key.Key...)

if err := w.Add(engine.MVCCKeyValue{Key: key, Value: iter.UnsafeValue()}); err != nil {
return err
}

iter.Next()
}

res, err := w.Finish()
if err != nil {
return err
}
return AddSSTable(ctx, db, first, last.PrefixEnd(), res)
}

// evalImport bulk loads key/value entries.
func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.ImportResponse, error) {
args := cArgs.Args.(*roachpb.ImportRequest)
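The heart of the change is this split-and-retry loop: when AddSSTable comes back with a RangeKeyMismatchError, addSplitSSTable rewrites the SST into two chunks on either side of the mismatched range's end key and sends each one back through AddSSTable, recursing further if a chunk still spans a boundary. A minimal standalone sketch of that control flow, with a sorted slice standing in for the SST iterator and a stubbed send in place of the RPC (KV, errSplitAt, sendChunk, and splitAndRetry are illustrative names, not CockroachDB APIs):

package main

import (
	"bytes"
	"fmt"
)

// KV is a stand-in for an SST entry; the real code iterates an SST.
type KV struct {
	Key   []byte
	Value []byte
}

// errSplitAt mimics RangeKeyMismatchError: it carries the boundary
// the server says the request must not cross.
type errSplitAt struct{ key []byte }

func (e *errSplitAt) Error() string { return fmt.Sprintf("range mismatch at %q", e.key) }

// sendChunk stands in for the AddSSTable RPC. This fake server rejects
// any chunk that crosses the boundary "e".
func sendChunk(kvs []KV) error {
	boundary := []byte("e")
	if bytes.Compare(kvs[0].Key, boundary) < 0 &&
		bytes.Compare(kvs[len(kvs)-1].Key, boundary) >= 0 {
		return &errSplitAt{key: boundary}
	}
	fmt.Printf("ingested [%q, %q]\n", kvs[0].Key, kvs[len(kvs)-1].Key)
	return nil
}

// splitAndRetry mirrors the AddSSTable/addSplitSSTable recursion: send
// the chunk, and on a mismatch error split it at the reported key and
// retry each half.
func splitAndRetry(kvs []KV) error {
	err := sendChunk(kvs)
	if err == nil {
		return nil
	}
	m, ok := err.(*errSplitAt)
	if !ok {
		return err
	}
	// Find the first key at or past the split point.
	i := 0
	for i < len(kvs) && bytes.Compare(kvs[i].Key, m.key) < 0 {
		i++
	}
	if i == 0 || i == len(kvs) {
		return err // splitting would not make progress
	}
	if err := splitAndRetry(kvs[:i]); err != nil {
		return err
	}
	return splitAndRetry(kvs[i:])
}

func main() {
	kvs := []KV{{Key: []byte("a")}, {Key: []byte("c")}, {Key: []byte("f")}}
	if err := splitAndRetry(kvs); err != nil {
		panic(err)
	}
}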
179 changes: 122 additions & 57 deletions pkg/ccl/storageccl/import_test.go
@@ -35,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)

@@ -155,7 +154,20 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
t.Fatal(err)
}

writeSST := func(keys ...[]byte) string {
const (
oldID = 51
indexID = 1
)

srcPrefix := makeKeyRewriterPrefixIgnoringInterleaved(oldID, indexID)
var keys []roachpb.Key
for i := 0; i < 8; i++ {
key := append([]byte(nil), srcPrefix...)
key = encoding.EncodeStringAscending(key, fmt.Sprintf("k%d", i))
keys = append(keys, key)
}

writeSST := func(t *testing.T, offsets []int) string {
path := strconv.FormatInt(hlc.UnixNano(), 10)

sst, err := engine.MakeRocksDBSstFileWriter()
@@ -165,7 +177,8 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
defer sst.Close()
ts := hlc.NewClock(hlc.UnixNano, time.Nanosecond).Now()
value := roachpb.MakeValueFromString("bar")
for _, key := range keys {
for _, idx := range offsets {
key := keys[idx]
value.ClearChecksum()
value.InitChecksum(key)
kv := engine.MVCCKeyValue{Key: engine.MVCCKey{Key: key, Timestamp: ts}, Value: value.RawBytes}
@@ -183,51 +196,6 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
return path
}

const (
oldID = 51
newID = 100
indexID = 1
)

kr := prefixRewriter{
{OldPrefix: makeKeyRewriterPrefixIgnoringInterleaved(oldID, indexID), NewPrefix: makeKeyRewriterPrefixIgnoringInterleaved(newID, indexID)},
}
var keys [][]byte
for i := 0; i < 4; i++ {
key := append([]byte(nil), kr[0].OldPrefix...)
key = encoding.EncodeStringAscending(key, fmt.Sprintf("foo%d", i))
keys = append(keys, key)
}

rekeys := []roachpb.ImportRequest_TableRekey{
{
OldID: oldID,
NewDesc: mustMarshalDesc(t, &sqlbase.TableDescriptor{
ID: newID,
PrimaryIndex: sqlbase.IndexDescriptor{
ID: indexID,
},
}),
},
}

files := []string{
writeSST(keys[2], keys[3]),
writeSST(keys[0], keys[1]),
writeSST(keys[2], keys[3]),
}

dataStartKey := roachpb.Key(keys[0])
dataEndKey := roachpb.Key(keys[3]).PrefixEnd()
reqStartKey, ok := kr.rewriteKey(append([]byte(nil), dataStartKey...))
if !ok {
t.Fatalf("failed to rewrite key: %s", reqStartKey)
}
reqEndKey, ok := kr.rewriteKey(append([]byte(nil), dataEndKey...))
if !ok {
t.Fatalf("failed to rewrite key: %s", reqEndKey)
}

// Make the first few WriteBatch/AddSSTable calls return
// AmbiguousResultError. Import should be resilient to this.
const initialAmbiguousSubReqs = 3
@@ -259,32 +227,124 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
args.StoreSpecs = []base.StoreSpec{{InMemory: false, Path: filepath.Join(dir, "testserver")}}
s, _, kvDB := serverutils.StartServer(t, args)
defer s.Stopper().Stop(ctx)

init(s.ClusterSettings())

storage, err := ExportStorageConfFromURI("nodelocal:///foo")
if err != nil {
t.Fatalf("%+v", err)
}

for i := 1; i <= len(files); i++ {
t.Run(strconv.Itoa(i), func(t *testing.T) {
const splitKey1, splitKey2 = 3, 5
// Each test case consists of some number of batches of keys, represented as
// ints [0, 8). Splits are at 3 and 5.
for i, testCase := range [][][]int{
// Simple cases, no spanning splits, try first, last, middle, etc in each.
// r1
{{0}},
{{1}},
{{2}},
{{0, 1, 2}},
{{0}, {1}, {2}},

// r2
{{3}},
{{4}},
{{3, 4}},
{{3}, {4}},

// r3
{{5}},
{{5, 6, 7}},
{{6}},

// batches exactly matching spans.
{{0, 1, 2}, {3, 4}, {5, 6, 7}},

// every key, in its own batch.
{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}},

// every key in one big batch.
{{0, 1, 2, 3, 4, 5, 6, 7}},

// Look for off-by-ones on and around the splits.
{{2, 3}},
{{1, 3}},
{{2, 4}},
{{1, 4}},
{{1, 5}},
{{2, 5}},

// Mixture of split-aligned and non-aligned batches.
{{1}, {5}, {6}},
{{1, 2, 3}, {4, 5}, {6, 7}},
{{0}, {2, 3, 5}, {7}},
{{0, 4}, {5, 7}},
{{0, 3}, {4}},
} {
t.Run(fmt.Sprintf("%d-%v", i, testCase), func(t *testing.T) {
newID := sqlbase.ID(100 + i)
kr := prefixRewriter{{
OldPrefix: srcPrefix,
NewPrefix: makeKeyRewriterPrefixIgnoringInterleaved(newID, indexID),
}}
rekeys := []roachpb.ImportRequest_TableRekey{
{
OldID: oldID,
NewDesc: mustMarshalDesc(t, &sqlbase.TableDescriptor{
ID: newID,
PrimaryIndex: sqlbase.IndexDescriptor{
ID: indexID,
},
}),
},
}

first := keys[testCase[0][0]]
last := keys[testCase[len(testCase)-1][len(testCase[len(testCase)-1])-1]]

reqStartKey, ok := kr.rewriteKey(append([]byte(nil), keys[0]...))
if !ok {
t.Fatalf("failed to rewrite key: %s", reqStartKey)
}
reqEndKey, ok := kr.rewriteKey(append([]byte(nil), keys[len(keys)-1].PrefixEnd()...))
if !ok {
t.Fatalf("failed to rewrite key: %s", reqEndKey)
}
reqMidKey1, ok := kr.rewriteKey(append([]byte(nil), keys[splitKey1]...))
if !ok {
t.Fatalf("failed to rewrite key: %s", reqMidKey1)
}
reqMidKey2, ok := kr.rewriteKey(append([]byte(nil), keys[splitKey2]...))
if !ok {
t.Fatalf("failed to rewrite key: %s", reqMidKey2)
}

if err := kvDB.AdminSplit(ctx, reqMidKey1, reqMidKey1); err != nil {
t.Fatal(err)
}
if err := kvDB.AdminSplit(ctx, reqMidKey2, reqMidKey2); err != nil {
t.Fatal(err)
}

atomic.StoreInt64(&remainingAmbiguousSubReqs, initialAmbiguousSubReqs)

req := &roachpb.ImportRequest{
RequestHeader: roachpb.RequestHeader{Key: reqStartKey},
DataSpan: roachpb.Span{Key: dataStartKey, EndKey: dataEndKey},
DataSpan: roachpb.Span{Key: first, EndKey: last.PrefixEnd()},
Rekeys: rekeys,
}

for _, f := range files[:i] {
var slurp []string
for ks := range testCase {
f := writeSST(t, testCase[ks])
slurp = append(slurp, f)
req.Files = append(req.Files, roachpb.ImportRequest_File{Dir: storage, Path: f})
}
expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), files[:i], kr)
expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, kr)

// Import may be retried by DistSender if it takes too long to return, so
// make sure it's idempotent.
for j := 0; j < 1; j++ {
for j := 0; j < 2; j++ {
b := &client.Batch{}
b.AddRawRequest(req)
if err := kvDB.Run(ctx, b); err != nil {
@@ -297,8 +357,13 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
kvs := clientKVsToEngineKVs(clientKVs)

if !reflect.DeepEqual(kvs, expectedKVs) {
for _, kv := range append(kvs, expectedKVs...) {
log.Info(ctx, kv)
for i := 0; i < len(kvs) || i < len(expectedKVs); i++ {
if i < len(expectedKVs) {
t.Logf("expected %d\t%v\t%v", i, expectedKVs[i].Key, expectedKVs[i].Value)
}
if i < len(kvs) {
t.Logf("got %d\t%v\t%v", i, kvs[i].Key, kvs[i].Value)
}
}
t.Fatalf("got %+v expected %+v", kvs, expectedKVs)
}
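The test cases above encode keys as the ints 0 through 7 with splits at 3 and 5, giving ranges r1 = [0, 3), r2 = [3, 5), and r3 = [5, 8). A throwaway helper (not part of the test) shows which batches cross a split boundary and would therefore exercise the new retry path:

package main

import "fmt"

// rangeOf maps a test key (0-7) to its range, given splits at 3 and 5.
func rangeOf(k int) int {
	switch {
	case k < 3:
		return 1 // r1 = [0, 3)
	case k < 5:
		return 2 // r2 = [3, 5)
	default:
		return 3 // r3 = [5, 8)
	}
}

// spansSplit reports whether a batch of keys crosses a range boundary,
// i.e. whether AddSSTable would need the split-and-retry path.
func spansSplit(batch []int) bool {
	for _, k := range batch[1:] {
		if rangeOf(k) != rangeOf(batch[0]) {
			return true
		}
	}
	return false
}

func main() {
	for _, batch := range [][]int{{0, 1, 2}, {2, 3}, {1, 5}, {0, 1, 2, 3, 4, 5, 6, 7}} {
		fmt.Println(batch, "spans split:", spansSplit(batch))
	}
}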
5 changes: 5 additions & 0 deletions pkg/kv/dist_sender.go
@@ -692,6 +692,11 @@ func (ds *DistSender) divideAndSendBatchToRanges(
return resp.reply, resp.pErr
}

if ba.IsUnsplittable() {
mismatch := roachpb.NewRangeKeyMismatchError(rs.Key.AsRawKey(), rs.EndKey.AsRawKey(), ri.Desc())
return nil, roachpb.NewError(mismatch)
}

// Make an empty slice of responses which will be populated with responses
// as they come in via Combine().
br = &roachpb.BatchResponse{
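This is the server-side half of the retry loop: rather than transparently fanning an unsplittable batch out across ranges, DistSender fails fast with a RangeKeyMismatchError naming the first resolved range, and the sender re-chunks at that range's end key. A toy version of the gate, with plain structs standing in for the roachpb types (rangeDesc, rangeKeyMismatch, and send are invented for illustration):

package main

import "fmt"

// rangeDesc is a stand-in for roachpb.RangeDescriptor.
type rangeDesc struct{ start, end string }

// rangeKeyMismatch is a stand-in for roachpb.RangeKeyMismatchError.
type rangeKeyMismatch struct {
	reqStart, reqEnd string
	mismatched       rangeDesc
}

func (e *rangeKeyMismatch) Error() string {
	return fmt.Sprintf("key range [%s,%s) outside of bounds of range [%s,%s)",
		e.reqStart, e.reqEnd, e.mismatched.start, e.mismatched.end)
}

// send mimics divideAndSendBatchToRanges for an unsplittable batch: if
// the request extends past the first range, refuse to split it and
// report that range so the client can re-chunk at its end key.
func send(reqStart, reqEnd string, firstRange rangeDesc, unsplittable bool) error {
	if reqEnd > firstRange.end && unsplittable {
		return &rangeKeyMismatch{reqStart: reqStart, reqEnd: reqEnd, mismatched: firstRange}
	}
	fmt.Printf("sent [%s,%s)\n", reqStart, reqEnd)
	return nil
}

func main() {
	r1 := rangeDesc{start: "a", end: "e"}
	fmt.Println(send("a", "z", r1, true)) // mismatch: client splits at "e"
	fmt.Println(send("a", "c", r1, true)) // fits within r1: sent
}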
19 changes: 10 additions & 9 deletions pkg/roachpb/api.go
@@ -103,14 +103,15 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error {
}

const (
isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder
isRead // read-only cmds don't go through raft, but may run on lease holder
isWrite // write cmds go through raft and must be proposed on lease holder
isTxn // txn commands may be part of a transaction
isTxnWrite // txn write cmds start heartbeat and are marked for intent resolution
isRange // range commands may span multiple keys
isReverse // reverse commands traverse ranges in descending direction
isAlone // requests which must be alone in a batch
isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder
isRead // read-only cmds don't go through raft, but may run on lease holder
isWrite // write cmds go through raft and must be proposed on lease holder
isTxn // txn commands may be part of a transaction
isTxnWrite // txn write cmds start heartbeat and are marked for intent resolution
isRange // range commands may span multiple keys
isReverse // reverse commands traverse ranges in descending direction
isAlone // requests which must be alone in a batch
isUnsplittable // range command that must not be split during sending
// Requests for acquiring a lease skip the (proposal-time) check that the
// proposing replica has a valid lease.
skipLeaseCheck
@@ -1079,7 +1080,7 @@ func (*WriteBatchRequest) flags() int { return isWrite | isRange }
func (*ExportRequest) flags() int { return isRead | isRange | updatesReadTSCache }
func (*ImportRequest) flags() int { return isAdmin | isAlone }
func (*AdminScatterRequest) flags() int { return isAdmin | isAlone | isRange }
func (*AddSSTableRequest) flags() int { return isWrite | isAlone | isRange }
func (*AddSSTableRequest) flags() int { return isWrite | isAlone | isRange | isUnsplittable }

// RefreshRequest and RefreshRangeRequest both list
// updates(Read)TSCache, though they actually update the read or write
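The flag constants are powers of two built with 1 << iota, so each request's flags() value is a bitmask, and a batch carries a flag iff some request's mask includes that bit, which is what BatchRequest.hasFlag (and the new IsUnsplittable below) checks. A self-contained illustration of the same pattern (this hasFlag is a stand-in, not the real method):

package main

import "fmt"

const (
	isAdmin = 1 << iota // 1
	isRead              // 2
	isWrite             // 4
	isTxn               // 8
	isTxnWrite          // 16
	isRange             // 32
	isReverse           // 64
	isAlone             // 128
	isUnsplittable      // 256
)

// hasFlag mimics BatchRequest.hasFlag: true if any request in the
// batch carries the given flag bit.
func hasFlag(requestFlags []int, flag int) bool {
	for _, f := range requestFlags {
		if f&flag != 0 {
			return true
		}
	}
	return false
}

func main() {
	// An AddSSTable request, per the flags() change above.
	addSSTable := isWrite | isAlone | isRange | isUnsplittable
	fmt.Println(hasFlag([]int{addSSTable}, isUnsplittable)) // true
}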
5 changes: 5 additions & 0 deletions pkg/roachpb/batch.go
@@ -117,6 +117,11 @@ func (ba *BatchRequest) IsTransactionWrite() bool {
return ba.hasFlag(isTxnWrite)
}

// IsUnsplittable returns true iff the BatchRequest is an un-splittable request.
func (ba *BatchRequest) IsUnsplittable() bool {
return ba.hasFlag(isUnsplittable)
}

// IsSingleRequest returns true iff the BatchRequest contains a single request.
func (ba *BatchRequest) IsSingleRequest() bool {
return len(ba.Requests) == 1