ccl/backupccl: add memory monitor to external SST iterators in restore
Previously, there was no limit on the amount of memory that could be used while
constructing external SST iterators during restore. This patch adds a memory
monitor to limit the amount of memory that can be used to construct external
SST iterators. If a restore processor fails to acquire enough memory to open
the next file for a restore span, it sends an iterator over all of the files it
has opened so far, then waits until it can acquire the memory needed to resume
constructing the iterator for the remaining files.

Release note: None
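
As a rough illustration of the scheme the commit message describes — a toy sketch, not the actual backupccl code; the budget type and the processSpan/send names are made up — the acquire-or-send loop looks like this:

package main

import (
	"context"
	"fmt"
)

// budget is a toy stand-in for the restore processor's memory monitor
// (the real code uses util/mon and util/quotapool).
type budget struct{ avail, perFile int64 }

func (b *budget) tryAcquire() bool {
	if b.avail < b.perFile {
		return false
	}
	b.avail -= b.perFile
	return true
}

func (b *budget) release(n int) { b.avail += int64(n) * b.perFile }

// processSpan opens as many files as fit in the budget, sends an iterator
// over that partial batch (incomplete=true), releases the memory, and
// repeats; the final batch completes the span (incomplete=false). The toy
// assumes the budget fits at least one file, where the real processor
// instead blocks until enough memory frees up.
func processSpan(
	ctx context.Context, files []string, b *budget, send func(batch []string, incomplete bool),
) {
	var open []string
	for _, f := range files {
		for !b.tryAcquire() {
			send(open, true /* incomplete */)
			b.release(len(open))
			open = nil
		}
		open = append(open, f)
	}
	send(open, false /* incomplete */)
}

func main() {
	files := []string{"1.sst", "2.sst", "3.sst", "4.sst", "5.sst"}
	b := &budget{avail: 2, perFile: 1}
	processSpan(context.Background(), files, b, func(batch []string, incomplete bool) {
		fmt.Println(batch, "incomplete =", incomplete)
	})
	// Prints two incomplete batches of two files each, then the final
	// complete batch: [5.sst] incomplete = false.
}
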
Rui Hu committed Mar 1, 2023
1 parent 7a3778b commit 7f4b46c
Showing 11 changed files with 479 additions and 50 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -140,6 +140,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
@@ -286,6 +287,7 @@ go_test(
"//pkg/util/admission",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
121 changes: 119 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
@@ -1354,7 +1354,7 @@ func checkInProgressBackupRestore(
<-exportSpanCompleteCh
}
},
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) {
<-allowResponse
},
},
@@ -7323,7 +7323,7 @@ func TestClientDisconnect(t *testing.T) {

args := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context) {
DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, _ *execinfrapb.RestoreSpanEntry) {
blockBackupOrRestore(ctx)
}}},
Store: &kvserver.StoreTestingKnobs{
@@ -10876,3 +10876,120 @@ func TestExportResponseDataSizeZeroCPUPagination(t *testing.T) {
sqlDB.Exec(t, `BACKUP TABLE foo INTO 'nodelocal://1/foo'`)
require.Equal(t, 2, numRequests)
}

// Verify that during restore, if an import span has too many files to fit in
// the memory budget with a single SST iterator, the restore processor
// repeatedly opens and processes iterators over as many files as fit within
// the budget until the span is finished.
func TestRestoreMemoryMonitoring(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 1000
const splitSize = 10
// Number of files in the backup
const expectedNumFiles = numAccounts / splitSize

restoreProcessorKnobErr := atomic.Value{}
restoreProcessorKnobCount := atomic.Uint32{}

args := base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) {
// The total size of the backup files should be less than the target
// SST size, so they should all fit in one import span.
if len(entry.Files) != expectedNumFiles {
restoreProcessorKnobErr.Store(errors.Newf("expected entry to have %d files, got %d", expectedNumFiles, len(entry.Files)))
}

restoreProcessorKnobCount.Add(1)
}}}}}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

// Add some splits in the table, and set the target file size to be something
// small so that we get one flushed file per split in the backup.
sqlDB.Exec(t, "ALTER TABLE data.bank SPLIT AT SELECT generate_series($1::INT, $2, $3)", 0, numAccounts, splitSize)
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.file_size = '100B'")
sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Verify the file counts in the backup.
var numFiles int
sqlDB.QueryRow(t, "SELECT count(*) FROM [SHOW BACKUP FILES FROM latest IN 'userfile:///backup']").Scan(&numFiles)
require.Equal(t, numFiles, numAccounts/splitSize)

// Set the memory budget for the restore processor to be less than the total
// memory required to open all files at once.
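// The limit below admits restoreProcessorMaxFiles files' worth of the
// per-file iterator overhead (sstReaderOverheadBytesPerFile) at a time.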
const restoreProcessorMaxFiles = 10
require.Less(t, restoreProcessorMaxFiles, numFiles)
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

sqlDB.Exec(t, "CREATE DATABASE data2")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')")
require.Nil(t, restoreProcessorKnobErr.Load())

// Assert that the restore processor processed the same span multiple
// times, with the number of entries determined by the memory budget.
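// With expectedNumFiles=100 and restoreProcessorMaxFiles=10, this works
// out to ceil(100/10) = 10 entries for the single import span.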
require.Equal(t, (expectedNumFiles-1)/restoreProcessorMaxFiles+1, int(restoreProcessorKnobCount.Load()))

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
}

// Verify that a restore with memory monitoring succeeds with partial SST
// iterators that shadow previously written values.
func TestRestoreMemoryMonitoringWithShadowing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 10
const numIncrementals = 10
const restoreProcessorMaxFiles = 2

restoreProcessorKnobErr := atomic.Value{}
restoreProcessorKnobCount := atomic.Uint32{}

args := base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) {
restoreProcessorKnobCount.Add(1)
}}}}}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Repeatedly update a single row and take an incremental backup.
for i := 0; i < numIncrementals; i++ {
sqlDB.Exec(t, `UPDATE data.bank SET balance = $1 WHERE id = 1`, i)
sqlDB.Exec(t, "BACKUP data.bank INTO latest IN 'userfile:///backup'")
}

// Set the memory budget for the restore processor to be enough to open 2
// files.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

sqlDB.Exec(t, "CREATE DATABASE data2")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')")
require.Nil(t, restoreProcessorKnobErr.Load())
files := sqlDB.QueryStr(t, "SHOW BACKUP FILES FROM latest IN 'userfile:///backup'")
require.Equal(t, 12, len(files))
// Assert that the restore processor processed the same span multiple
// times, with the number of entries determined by the memory budget.
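// Here that is ceil(12 files / 2 files per iterator) = 6 entries.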
require.Equal(t, 6, int(restoreProcessorKnobCount.Load()))

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
}
8 changes: 8 additions & 0 deletions pkg/ccl/backupccl/backuppb/backup.proto
@@ -194,6 +194,14 @@ message RestoreProgress {
roachpb.RowCount summary = 1 [(gogoproto.nullable) = false];
int64 progressIdx = 2;
roachpb.Span dataSpan = 3 [(gogoproto.nullable) = false];
// Incomplete, when true, indicates that the processor has not finished
// processing all of the data in DataSpan and that more RestoreProgress
// messages for DataSpan are still on the way. When false, the processor has
// finished processing the entirety of DataSpan. Note that the default value
// of false always indicates that the span has been completely processed,
// which is useful for processors on older versions that are not aware of this
// field.
bool incomplete = 4;
}

message BackupProcessorPlanningTraceEvent {
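A hedged sketch of how a consumer of RestoreProgress might use the new field (illustrative Go, not the actual coordinator code): a data span counts as done only once an update arrives with incomplete=false, and updates from older processors, which never set the field, default to false and therefore still complete the span:

// progressUpdate mirrors the two RestoreProgress fields relevant here;
// spanID stands in for identifying the update's DataSpan.
type progressUpdate struct {
	spanID     int
	incomplete bool
}

// completedSpans counts the distinct spans that have received at least one
// update with incomplete=false.
func completedSpans(updates []progressUpdate) int {
	done := make(map[int]bool)
	for _, u := range updates {
		if !u.incomplete {
			done[u.spanID] = true
		}
	}
	return len(done)
}
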
115 changes: 115 additions & 0 deletions pkg/ccl/backupccl/bench_test.go
@@ -9,11 +9,27 @@
package backupccl

import (
"context"
"fmt"
"net/url"
"path"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/sampledataccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/workload/bank"
"github.com/stretchr/testify/require"
)

func BenchmarkDatabaseBackup(b *testing.B) {
@@ -147,3 +163,102 @@ func BenchmarkDatabaseFullBackup(b *testing.B) {
// *skip*--i.e., the number of bytes in the full backup.
b.SetBytes(int64(b.N) * dataSize)
}

func BenchmarkIteratorMemory(b *testing.B) {
defer leaktest.AfterTest(b)()
ctx := context.Background()

numAccounts := 1000
tc, _, _, cleanupFn := backupRestoreTestSetup(b, singleNode, numAccounts, InitManualReplication)
defer cleanupFn()

for _, testCase := range []struct {
testName string
}{
{
testName: "gcp",
},
{
testName: "aws",
},
} {
b.Run(testCase.testName, func(b *testing.B) {
sst := envutil.EnvOrDefaultString("SST_PATH", "unknown")
encKey := envutil.EnvOrDefaultString("ENCRYPTION_KEY", "invalid")

uri, err := url.Parse(sst)
if err != nil {
b.Fatal(err)
}

storeURI := fmt.Sprintf("%s://%s/%s?AUTH=implicit", uri.Scheme, uri.Host, path.Dir(uri.Path))
dataFile := path.Base(uri.Path)

store, err := cloud.ExternalStorageFromURI(
ctx,
storeURI,
base.ExternalIODirConfig{},
tc.Servers[0].ClusterSettings(),
blobs.TestEmptyBlobClientFactory,
username.RootUserName(),
tc.Servers[0].InternalDB().(isql.DB),
nil, /* limiters */
cloud.NilMetrics,
)

require.NoError(b, err)

for _, encrypted := range []bool{true, false} {
for _, iterCount := range []int{1, 10, 100, 1000} {
for _, fileCount := range []int{10, 100, 1000, 10000} {
b.Run(fmt.Sprintf("fileCount=%d/iterCount=%d/enc=%t", fileCount, iterCount, encrypted), func(b *testing.B) {
b.ResetTimer()

fileStores := make([]storageccl.StoreFile, fileCount)

for i := 0; i < fileCount; i++ {
fileStores[i].Store = store
fileStores[i].FilePath = dataFile
}

execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig)
iterOpts := storage.IterOptions{
RangeKeyMaskingBelow: execCfg.Clock.Now(),
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}

iters := make([]storage.SimpleMVCCIterator, iterCount)
cleanup := func() {
for _, iter := range iters {
if iter != nil {
iter.Close()
}
}
}
defer cleanup()

var enc *kvpb.FileEncryptionOptions
if encrypted {
enc = &kvpb.FileEncryptionOptions{
Key: []byte(encKey),
}
}

for j := 0; j < iterCount; j++ {
iter, err := storageccl.ExternalSSTReader(ctx, fileStores, enc, iterOpts)
require.NoError(b, err)

iters[j] = iter
iter.SeekGE(storage.MVCCKey{})
}

b.StopTimer()
})
}
}
}
})
}
}
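
How this benchmark is meant to be invoked is not documented in the commit; judging from the EnvOrDefaultString calls, it presumably expects SST_PATH to point at an existing gs:// or s3:// SST (and ENCRYPTION_KEY for the encrypted variants) in the environment when running go test -bench BenchmarkIteratorMemory, and falls back to the "unknown"/"invalid" placeholders otherwise.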
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -944,7 +945,7 @@ func TestReintroduceOfflineSpans(t *testing.T) {
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) {
mu.Lock()
defer mu.Unlock()
if entriesCount == 0 {