Skip to content

Commit

Permalink
bulk: replace old SSTIterators with PebbleSSTIterator in bulk stack
Browse files Browse the repository at this point in the history
This is purely a refactor. The only prod refactor is in the SSTBatcher, where
it calls ComputeStatsForRange. This  should not have a significant perf hit
as the iterator only surfaces point keys and the bulk of sst_batcher comput
time is spent in doing ingestion. Currently the pebbleIterator is
slightly slower than the old iterator, but optimizations are coming soon
(cockroachdb#83051).

Release note: none
  • Loading branch information
msbutler committed Jul 26, 2022
1 parent 3adb070 commit d411861
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ go_test(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//sstable",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
Expand Down
9 changes: 7 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
Expand All @@ -66,8 +67,12 @@ func slurpSSTablesLatestKey(
if err != nil {
t.Fatal(err)
}

sst, err := storage.NewSSTIterator(file)
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}
sst, err := storage.NewPebbleSSTIterator([]sstable.ReadableFile{file}, iterOpts)
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func MakeAndRegisterConcurrencyLimiter(sv *settings.Values) limit.ConcurrentRequ
// bytes, etc. If configured with a non-nil, populated range cache, it will use
// it to attempt to flush SSTs before they cross range boundaries to minimize
// expensive on-split retries.
//
// Note: the SSTBatcher currently cannot bulk add range keys.
type SSTBatcher struct {
name string
db *kv.DB
Expand Down Expand Up @@ -644,7 +646,15 @@ func (b *SSTBatcher) addSSTable(
updatesLastRange bool,
) error {
sendStart := timeutil.Now()
iter, err := storage.NewMemSSTIterator(sstBytes, true)

// Currently, the SSTBatcher cannot ingest range keys, so it is safe to
// ComputeStatsForRange with an iterator that only surfaces point keys.
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: start,
UpperBound: end,
}
iter, err := storage.NewPebbleMemSSTIterator(sstBytes, true, iterOpts)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ func TestDuplicateHandling(t *testing.T) {
require.NoError(t, err.GoError())
keyCount := 0
for _, file := range resp.(*roachpb.ExportResponse).Files {
it, err := storage.NewMemSSTIterator(file.SST, false /* verify */)
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}
it, err := storage.NewPebbleMemSSTIterator(file.SST, false /* verify */, iterOpts)
require.NoError(t, err)
defer it.Close()
for it.SeekGE(storage.NilKey); ; it.Next() {
Expand Down
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ func TestExportCmd(t *testing.T) {
var kvs []storage.MVCCKeyValue
for _, file := range res.(*roachpb.ExportResponse).Files {
paths = append(paths, file.Path)

sst, err := storage.NewMemSSTIterator(file.SST, false)
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}
sst, err := storage.NewPebbleMemSSTIterator(file.SST, true, iterOpts)
if err != nil {
t.Fatalf("%+v", err)
}
Expand Down Expand Up @@ -505,7 +509,12 @@ func loadSST(t *testing.T, data []byte, start, end roachpb.Key) []storage.MVCCKe
return nil
}

sst, err := storage.NewMemSSTIterator(data, false)
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: start,
UpperBound: end,
}
sst, err := storage.NewPebbleMemSSTIterator(data, true, iterOpts)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit d411861

Please sign in to comment.