Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport-19.1: bulk: change AddSSTTable to not be recursive #36840

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 85 additions & 39 deletions pkg/storage/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,57 +189,95 @@ func (b *SSTBatcher) GetSummary() roachpb.BulkOpSummary {
return b.totalRows
}

type sender interface {
AddSSTable(ctx context.Context, begin, end interface{}, data []byte) error
}

type sstSpan struct {
start, end roachpb.Key
sstBytes []byte
}

// AddSSTable retries db.AddSSTable if retryable errors occur, including if the
// SST spans a split, in which case it is iterated and split into two SSTs, one
// for each side of the split in the error, and each are retried.
func AddSSTable(ctx context.Context, db *client.DB, start, end roachpb.Key, sstBytes []byte) error {
const maxAddSSTableRetries = 10
var err error
for i := 0; i < maxAddSSTableRetries; i++ {
log.VEventf(ctx, 2, "sending %s AddSSTable [%s,%s)", sz(len(sstBytes)), start, end)
// This will fail if the range has split but we'll check for that below.
err = db.AddSSTable(ctx, start, end, sstBytes)
if err == nil {
return nil
}
// This range has split -- we need to split the SST to try again.
if m, ok := errors.Cause(err).(*roachpb.RangeKeyMismatchError); ok {
split := m.MismatchedRange.EndKey.AsRawKey()
log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split)
return addSplitSSTable(ctx, db, sstBytes, start, split)
func AddSSTable(ctx context.Context, db sender, start, end roachpb.Key, sstBytes []byte) error {
work := []*sstSpan{{start: start, end: end, sstBytes: sstBytes}}
// Create an iterator that iterates over the top level SST to produce all the splits.
var iter engine.SimpleIterator
defer func() {
if iter != nil {
iter.Close()
}
// Retry on AmbiguousResult.
if _, ok := err.(*roachpb.AmbiguousResultError); ok {
log.Warningf(ctx, "addsstable [%s,%s) attempt %d failed: %+v", start, end, i, err)
continue
}()
const maxAddSSTableRetries = 10
for len(work) > 0 {
item := work[0]
work = work[1:]
if err := func() error {
var err error
for i := 0; i < maxAddSSTableRetries; i++ {
log.VEventf(ctx, 2, "sending %s AddSSTable [%s,%s)", sz(len(sstBytes)), start, end)
// This will fail if the range has split but we'll check for that below.
err = db.AddSSTable(ctx, item.start, item.end, item.sstBytes)
if err == nil {
return nil
}
// This range has split -- we need to split the SST to try again.
if m, ok := errors.Cause(err).(*roachpb.RangeKeyMismatchError); ok {
if iter == nil {
iter, err = engine.NewMemSSTIterator(sstBytes, false)
if err != nil {
return err
}
}
split := m.MismatchedRange.EndKey.AsRawKey()
log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split)
left, right, err := createSplitSSTable(ctx, db, item.start, split, iter)
if err != nil {
return err
}
// Add more work.
work = append([]*sstSpan{left, right}, work...)
return nil
}
// Retry on AmbiguousResult.
if _, ok := err.(*roachpb.AmbiguousResultError); ok {
log.Warningf(ctx, "addsstable [%s,%s) attempt %d failed: %+v", start, end, i, err)
continue
}
}
return errors.Wrapf(err, "addsstable [%s,%s)", item.start, item.end)
}(); err != nil {
return err
}
// explicitly deallocate SST. This will not deallocate the
// top level SST which is kept around to iterate over.
item.sstBytes = nil
}
return errors.Wrapf(err, "addsstable [%s,%s)", start, end)
}

// addSplitSSTable is a helper for splitting up and retrying AddSStable calls.
func addSplitSSTable(
ctx context.Context, db *client.DB, sstBytes []byte, start, splitKey roachpb.Key,
) error {
iter, err := engine.NewMemSSTIterator(sstBytes, false)
if err != nil {
return err
}
defer iter.Close()
return nil
}

// createSplitSSTable is a helper for splitting up SSTs. The iterator
// passed in is over the top level SST passed into AddSSTTable().
func createSplitSSTable(
ctx context.Context, db sender, start, splitKey roachpb.Key, iter engine.SimpleIterator,
) (*sstSpan, *sstSpan, error) {
w, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
return nil, nil, err
}
defer w.Close()

split := false
var first, last roachpb.Key
var left, right *sstSpan

iter.Seek(engine.MVCCKey{Key: start})
for {
if ok, err := iter.Valid(); err != nil {
return err
return nil, nil, err
} else if !ok {
break
}
Expand All @@ -249,15 +287,18 @@ func addSplitSSTable(
if !split && key.Key.Compare(splitKey) >= 0 {
res, err := w.Finish()
if err != nil {
return err
return nil, nil, err
}
if err := AddSSTable(ctx, db, first, last.PrefixEnd(), res); err != nil {
return err
left = &sstSpan{
start: first,
end: last.PrefixEnd(),
sstBytes: res,
}

w.Close()
w, err = engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
return nil, nil, err
}

split = true
Expand All @@ -271,15 +312,20 @@ func addSplitSSTable(
last = append(last[:0], key.Key...)

if err := w.Add(engine.MVCCKeyValue{Key: key, Value: iter.UnsafeValue()}); err != nil {
return err
return nil, nil, err
}

iter.Next()
}

res, err := w.Finish()
if err != nil {
return err
return nil, nil, err
}
right = &sstSpan{
start: first,
end: last.PrefixEnd(),
sstBytes: res,
}
return AddSSTable(ctx, db, first, last.PrefixEnd(), res)
return left, right, nil
}
93 changes: 92 additions & 1 deletion pkg/storage/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"reflect"
"runtime"
"strings"
"testing"

Expand All @@ -27,10 +28,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/bulk"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

Expand Down Expand Up @@ -203,8 +206,96 @@ func runTestImport(t *testing.T, batchSize int64) {
t.Logf("got %d\t%v\t%v", i, got[i].Key, got[i].Value)
}
}
t.Fatalf("got %+v expected %+v", got, expected)
t.Fatalf("got %+v\nexpected %+v", got, expected)
}
})
}
}

type mockSender func(span roachpb.Span) error

func (m mockSender) AddSSTable(ctx context.Context, begin, end interface{}, data []byte) error {
return m(roachpb.Span{Key: begin.(roachpb.Key), EndKey: end.(roachpb.Key)})
}

// TestAddBigSpanningSSTWithSplits tests a situation where a large
// spanning SST is being ingested over a span with a lot of splits.
func TestAddBigSpanningSSTWithSplits(t *testing.T) {
defer leaktest.AfterTest(t)()

if testing.Short() {
t.Skip("this test needs to do a larger SST to see the quadratic mem usage on retries kick in.")
}

const numKeys, valueSize, splitEvery = 500, 5000, 1

prefix := encoding.EncodeUvarintAscending(keys.MakeTablePrefix(uint32(100)), uint64(1))
key := func(i int) []byte {
return encoding.EncodeVarintAscending(append([]byte{}, prefix...), int64(i))
}

var splits []roachpb.Key

// Create a large SST.
w, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
t.Fatal(err)
}
defer w.Close()
r, _ := randutil.NewPseudoRand()
buf := make([]byte, valueSize)
for i := 0; i < numKeys; i++ {
randutil.ReadTestdataBytes(r, buf)
if i%splitEvery == 0 {
splits = append(splits, key(i))
}
if err := w.Add(engine.MVCCKeyValue{
Key: engine.MVCCKey{Key: key(i)},
Value: roachpb.MakeValueFromString(string(buf)).RawBytes,
}); err != nil {
t.Fatal(err)
}
}
sst, err := w.Finish()
if err != nil {
t.Fatal(err)
}

// Keep track of the memory.
getMem := func() uint64 {
var stats runtime.MemStats
runtime.ReadMemStats(&stats)
return stats.HeapInuse
}
var early, late uint64
var totalAdditionAttempts int
mock := mockSender(func(span roachpb.Span) error {
totalAdditionAttempts++
for i := range splits {
if span.ContainsKey(splits[i]) && !span.Key.Equal(splits[i]) {
earlySplit := numKeys / 100
if i == earlySplit {
early = getMem()
} else if i == len(splits)-earlySplit {
late = getMem()
}
return &roachpb.RangeKeyMismatchError{
MismatchedRange: &roachpb.RangeDescriptor{EndKey: roachpb.RKey(splits[i])},
}
}
}
return nil
})

const kb = 1 << 10

t.Logf("Adding %dkb sst spanning %d splits", len(sst)/kb, len(splits))
if err := bulk.AddSSTable(context.TODO(), mock, key(0), key(numKeys), sst); err != nil {
t.Fatal(err)
}
t.Logf("Adding took %d total attempts", totalAdditionAttempts)
if late > early*2 {
t.Fatalf("Mem usage grew from %dkb before grew to %dkb later (%.2fx)",
early/kb, late/kb, float64(late)/float64(early))
}
}