From fca9148bc25e785d4afe4f76cb632b50b83b6cfb Mon Sep 17 00:00:00 2001 From: Rui Hu Date: Fri, 9 Dec 2022 10:37:54 -0500 Subject: [PATCH] ccl/backupccl: add memory monitor to external SST iterators in restore Previously, there was no limit on the amount of memory that can be used while constructing external SST iterators during restore. This patch adds a memory monitor to limit the amount of memory that can be used to construct external SST iterators. If a restore processor fails to acquire enough memory to open the next file for a restore span, it will send the iterator for all of the open files it has accumulated so far, and wait until it can acquire the memory to resume constructing the iterator for the remaining files. The memory limit can be controlled via the new cluster setting bulkio.restore.per_processor_memory_limit. Regardless of the setting, however, the amount of memory used will not exceed COCKROACH_RESTORE_MEM_FRACTION * max SQL memory. The new environment variable COCKROACH_RESTORE_MEM_FRACTION defaults to 0.5. 
Release note: None --- pkg/BUILD.bazel | 2 + pkg/ccl/backupccl/BUILD.bazel | 4 + pkg/ccl/backupccl/backup_test.go | 148 ++++++++- pkg/ccl/backupccl/backuppb/backup.proto | 4 + pkg/ccl/backupccl/backuputils/BUILD.bazel | 32 +- .../backuputils/memory_backed_quota_pool.go | 127 ++++++++ .../memory_backed_quota_pool_test.go | 90 ++++++ pkg/ccl/backupccl/bench_test.go | 200 ++++++++++++ .../full_cluster_backup_restore_test.go | 3 +- pkg/ccl/backupccl/restore_data_processor.go | 289 ++++++++++++++---- .../backupccl/restore_data_processor_test.go | 12 +- pkg/ccl/backupccl/restore_job.go | 12 +- .../backupccl/restore_processor_planning.go | 24 +- pkg/ccl/backupccl/restore_progress.go | 32 +- pkg/ccl/backupccl/restore_progress_test.go | 25 +- pkg/server/server_sql.go | 1 + pkg/settings/float.go | 13 + pkg/sql/exec_util.go | 2 +- pkg/sql/execinfra/server_config.go | 8 + pkg/sql/execinfrapb/processors_bulk_io.proto | 10 +- 20 files changed, 945 insertions(+), 93 deletions(-) create mode 100644 pkg/ccl/backupccl/backuputils/memory_backed_quota_pool.go create mode 100644 pkg/ccl/backupccl/backuputils/memory_backed_quota_pool_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index a97b478c198a..296bdf6e7690 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -18,6 +18,7 @@ ALL_TESTS = [ "//pkg/ccl/backupccl/backupinfo:backupinfo_test", "//pkg/ccl/backupccl/backuprand:backuprand_test", "//pkg/ccl/backupccl/backupresolver:backupresolver_test", + "//pkg/ccl/backupccl/backuputils:backuputils_test", "//pkg/ccl/backupccl:backupccl_test", "//pkg/ccl/baseccl:baseccl_test", "//pkg/ccl/benchccl/rttanalysisccl:rttanalysisccl_test", @@ -728,6 +729,7 @@ GO_TARGETS = [ "//pkg/ccl/backupccl/backupresolver:backupresolver_test", "//pkg/ccl/backupccl/backuptestutils:backuptestutils", "//pkg/ccl/backupccl/backuputils:backuputils", + "//pkg/ccl/backupccl/backuputils:backuputils_test", "//pkg/ccl/backupccl:backupccl", "//pkg/ccl/backupccl:backupccl_test", "//pkg/ccl/baseccl:baseccl", diff 
--git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 5cc16dba0fc2..8d9812092a03 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -142,6 +142,7 @@ go_library( "//pkg/util/metric", "//pkg/util/mon", "//pkg/util/protoutil", + "//pkg/util/quotapool", "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/span", @@ -208,6 +209,7 @@ go_test( "//pkg/ccl/backupccl/backupinfo", "//pkg/ccl/backupccl/backuppb", "//pkg/ccl/backupccl/backuptestutils", + "//pkg/ccl/backupccl/backuputils", "//pkg/ccl/kvccl", "//pkg/ccl/multiregionccl", "//pkg/ccl/multiregionccl/multiregionccltestutils", @@ -288,7 +290,9 @@ go_test( "//pkg/util/admission", "//pkg/util/ctxgroup", "//pkg/util/encoding", + "//pkg/util/envutil", "//pkg/util/hlc", + "//pkg/util/humanizeutil", "//pkg/util/ioctx", "//pkg/util/leaktest", "//pkg/util/limit", diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 2fe0ac91e1a8..36f0af146622 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1440,7 +1440,7 @@ func checkInProgressBackupRestore( <-exportSpanCompleteCh } }, - RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { + RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) { <-allowResponse }, }, @@ -1611,7 +1611,7 @@ func TestRestoreCheckpointing(t *testing.T) { knobs := base.TestingKnobs{ DistSQL: &execinfra.TestingKnobs{ BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ - RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { + RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) { // Because the restore processor has several workers that // concurrently send addsstable requests and because all workers will // wait on the lock below, when one flush gets blocked on the @@ -7349,7 +7349,7 @@ func TestClientDisconnect(t *testing.T) { args := base.TestClusterArgs{} knobs := base.TestingKnobs{ - 
DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context) { + DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, _ *execinfrapb.RestoreSpanEntry) { blockBackupOrRestore(ctx) }}}, Store: &kvserver.StoreTestingKnobs{ @@ -11029,3 +11029,145 @@ CREATE TABLE child_pk (k INT8 PRIMARY KEY REFERENCES parent); sqlDB.Exec(t, `DROP DATABASE test`) } } + +// Verify that during restore, if a restore span has too many files to fit in +// the memory budget with a single SST iterator, the restore processor should +// repeatedly open and process iterators for as many files as can fit within the +// budget until the span is finished. +func TestRestoreMemoryMonitoring(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const splitSize = 10 + for _, numSplits := range []int{10, 100, 1000} { + for _, numInc := range []int{0, 1, 3, 10} { + for _, restoreProcessorMaxFiles := range []int{5, 10, 20} { + t.Run(fmt.Sprintf("splits=%d/inc=%d/procMaxFiles=%d", numSplits, numInc, restoreProcessorMaxFiles), func(t *testing.T) { + numAccounts := numSplits * splitSize + var expectedNumFiles int + restoreProcessorKnobCount := atomic.Uint32{} + args := base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) { + // The total size of the backup files should be less than the target + // SST size, thus should all fit in one import span. 
+ require.Equal(t, expectedNumFiles, len(entry.Files)) + restoreProcessorKnobCount.Add(1) + }, + }, + }, + }, + } + params := base.TestClusterArgs{ServerArgs: args} + _, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params) + defer cleanupFn() + + sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true") + + // Add some splits in the table, and set the target file size to be something + // small so that we get one flushed file per split in the backup. + sqlDB.Exec(t, "ALTER TABLE data.bank SPLIT AT SELECT generate_series($1::INT, $2, $3)", 0, numAccounts, splitSize) + sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.file_size = '1B'") + sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'") + + // Take some incremental backups after mutating some rows. Take note of the + // splits that have been changed as that determines the number of incremental + // files that are created. + var numIncFiles int + for i := 0; i < numInc; i++ { + incSplitsWithFile := make(map[int]bool) + + for n := 0; n < 100; n++ { + id := rand.Intn(numAccounts) + sqlDB.Exec(t, `UPDATE data.bank SET balance = balance + 1 WHERE id = $1`, id) + split := id / splitSize + incSplitsWithFile[split] = true + } + + numIncFiles += len(incSplitsWithFile) + } + + expectedNumFiles += numSplits + numIncFiles + // Verify the file counts in the backup. 
+ var numFiles int + sqlDB.QueryRow(t, "SELECT count(*) FROM [SHOW BACKUP FILES FROM latest IN 'userfile:///backup']").Scan(&numFiles) + require.Equal(t, numFiles, expectedNumFiles) + + sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile) + + sqlDB.Exec(t, "CREATE DATABASE data2") + sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')") + + // Assert that the restore processor is processing the same span multiple + // times, and the count is based on what's expected from the memory budget. + // The expected number is just the ceiling of expectedNumFiles/restoreProcessorMaxFiles. + require.Equal(t, (expectedNumFiles-1)/restoreProcessorMaxFiles+1, int(restoreProcessorKnobCount.Load())) + + // Verify data in the restored table. + expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank") + actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank") + require.Equal(t, expectedFingerprints, actualFingerprints) + }) + } + } + } +} + +// Verify that restore with memory monitoring should be able to succeed with +// partial SST iterators that shadow previously written values. 
+func TestRestoreMemoryMonitoringWithShadowing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const numAccounts = 10 + const numIncrementals = 10 + const restoreProcessorMaxFiles = 5 + + restoreProcessorKnobCount := atomic.Uint32{} + + args := base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) { + restoreProcessorKnobCount.Add(1) + }, + }, + }, + }, + } + params := base.TestClusterArgs{ServerArgs: args} + _, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params) + defer cleanupFn() + + sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true") + sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'") + + // Repeatedly alter a single row and do an incremental backup. + for i := 0; i < numIncrementals; i++ { + sqlDB.Exec(t, `UPDATE data.bank SET balance = $1 WHERE id = $2`, 1000+i, i) + sqlDB.Exec(t, "BACKUP data.bank INTO latest IN 'userfile:///backup'") + } + + // Set the memory budget for the restore processor to be enough to open 5 + // files. + sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile) + + sqlDB.Exec(t, "CREATE DATABASE data2") + sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')") + files := sqlDB.QueryStr(t, "SHOW BACKUP FILES FROM latest IN 'userfile:///backup'") + require.Equal(t, 11, len(files)) // 1 file for full + 10 for 10 incrementals + + // Assert that the restore processor is processing the same span multiple + // times, and the count is based on what's expected from the memory budget. 
+ require.Equal(t, 3, int(restoreProcessorKnobCount.Load())) // Ceiling(11/5) + + // Verify data in the restored table. + expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank") + actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank") + require.Equal(t, expectedFingerprints, actualFingerprints) +} diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto index d94d064a9ba8..a3b00fa84e02 100644 --- a/pkg/ccl/backupccl/backuppb/backup.proto +++ b/pkg/ccl/backupccl/backuppb/backup.proto @@ -194,6 +194,10 @@ message RestoreProgress { roachpb.RowCount summary = 1 [(gogoproto.nullable) = false]; int64 progressIdx = 2; roachpb.Span dataSpan = 3 [(gogoproto.nullable) = false]; + // CompleteUpTo is the timestamp that the data in DataSpan has been processed + // up to so far. The entire span has been processed if this timestamp is equal + // to restore end time. + util.hlc.Timestamp complete_up_to = 4 [(gogoproto.nullable) = false]; } message BackupProcessorPlanningTraceEvent { diff --git a/pkg/ccl/backupccl/backuputils/BUILD.bazel b/pkg/ccl/backupccl/backuputils/BUILD.bazel index d7759cfad4d2..a3e4ec74e43f 100644 --- a/pkg/ccl/backupccl/backuputils/BUILD.bazel +++ b/pkg/ccl/backupccl/backuputils/BUILD.bazel @@ -1,12 +1,38 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "backuputils", - srcs = ["utils.go"], + srcs = [ + "memory_backed_quota_pool.go", + "utils.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils", visibility = ["//visibility:public"], - deps = ["//pkg/cloud"], + deps = [ + "//pkg/cloud", + "//pkg/util/mon", + "//pkg/util/quotapool", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", + ], +) 
+ +go_test( + name = "backuputils_test", + srcs = ["memory_backed_quota_pool_test.go"], + args = ["-test.timeout=295s"], + embed = [":backuputils"], + tags = ["ccl_test"], + deps = [ + "//pkg/settings/cluster", + "//pkg/util/leaktest", + "//pkg/util/mon", + "//pkg/util/quotapool", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + ], ) get_x_data(name = "get_x_data") diff --git a/pkg/ccl/backupccl/backuputils/memory_backed_quota_pool.go b/pkg/ccl/backupccl/backuputils/memory_backed_quota_pool.go new file mode 100644 index 000000000000..f70f88de6dd5 --- /dev/null +++ b/pkg/ccl/backupccl/backuputils/memory_backed_quota_pool.go @@ -0,0 +1,127 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backuputils + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +// MemoryBackedQuotaPool is an IntPool backed up by a memory monitor. +// Users of MemoryBackedQuotaPool can acquire capacity from the IntPool, +// but the capacity of the IntPool can only be increased by acquiring +// the corresponding amount of memory from the backing memory monitor. +type MemoryBackedQuotaPool struct { + mon *mon.BytesMonitor + mem *mon.BoundAccount + + // capacityMu synchronizes operations related to the capacity + // of quotaPool. + capacityMu syncutil.Mutex + quotaPool *quotapool.IntPool +} + +// NewMemoryBackedQuotaPool creates a MemoryBackedQuotaPool from a +// parent monitor m with a limit. 
+func NewMemoryBackedQuotaPool( + ctx context.Context, m *mon.BytesMonitor, name redact.RedactableString, limit int64, +) *MemoryBackedQuotaPool { + q := MemoryBackedQuotaPool{ + quotaPool: quotapool.NewIntPool(fmt.Sprintf("%s-pool", name), 0), + } + + if m != nil { + q.mon = mon.NewMonitorInheritWithLimit(name, limit, m) + q.mon.StartNoReserved(ctx, m) + mem := q.mon.MakeBoundAccount() + q.mem = &mem + } + return &q +} + +// TryAcquireMaybeIncreaseCapacity tries to acquire size from the pool. +// On success, a non-nil alloc is returned and Release() must be called on +// it to return the quota to the pool. +// If the acquire fails because of not enough quota, it will repeatedly +// attempt to increase the capacity of the pool until the acquire succeeds. +// If the capacity increase fails, then the function will return +// with the error quotapool.ErrNotEnoughQuota. +// +// Safe for concurrent use. +func (q *MemoryBackedQuotaPool) TryAcquireMaybeIncreaseCapacity( + ctx context.Context, size uint64, +) (*quotapool.IntAlloc, error) { + for { + alloc, err := q.quotaPool.TryAcquire(ctx, size) + if err == nil || !errors.Is(err, quotapool.ErrNotEnoughQuota) { + return alloc, err + } + + // Not enough quota, attempt to grow the memory to increase quota pool + // capacity + if !q.IncreaseCapacity(ctx, size) { + return nil, quotapool.ErrNotEnoughQuota + } + } +} + +// Acquire acquires size from the pool. On success, a non-nil alloc is +// returned and Release() must be called on it to return the quota to the pool. +// +// Safe for concurrent use. +func (q *MemoryBackedQuotaPool) Acquire( + ctx context.Context, size uint64, +) (*quotapool.IntAlloc, error) { + return q.quotaPool.Acquire(ctx, size) +} + +// Release will release allocs back to the pool. +func (q *MemoryBackedQuotaPool) Release(allocs ...*quotapool.IntAlloc) { + q.quotaPool.Release(allocs...) +} + +// IncreaseCapacity will attempt to increase the capacity of the pool. 
+// Returns true if the increase succeeds and false otherwise. +// +// Safe for concurrent use. +func (q *MemoryBackedQuotaPool) IncreaseCapacity(ctx context.Context, size uint64) bool { + if err := q.mem.Grow(ctx, int64(size)); err != nil { + return false + } + + q.capacityMu.Lock() + defer q.capacityMu.Unlock() + q.quotaPool.UpdateCapacity(q.quotaPool.Capacity() + size) + return true +} + +// Capacity returns the capacity of the pool. +func (q *MemoryBackedQuotaPool) Capacity() uint64 { + q.capacityMu.Lock() + defer q.capacityMu.Unlock() + return q.quotaPool.Capacity() +} + +// Close closes the pool and returns the reserved memory to the backing +// memory monitor. +func (q *MemoryBackedQuotaPool) Close(ctx context.Context) { + if q.mem != nil { + q.mem.Close(ctx) + } + + if q.mon != nil { + q.mon.Stop(ctx) + } +} diff --git a/pkg/ccl/backupccl/backuputils/memory_backed_quota_pool_test.go b/pkg/ccl/backupccl/backuputils/memory_backed_quota_pool_test.go new file mode 100644 index 000000000000..302ce92c54cc --- /dev/null +++ b/pkg/ccl/backupccl/backuputils/memory_backed_quota_pool_test.go @@ -0,0 +1,90 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backuputils + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// TestMemoryBackedQuotaPoolBasic tests the basic functionality +// of the MemoryBackedQuotaPool in the presence of multiple +// goroutines acquiring and releasing quota, also with multiple +// goroutines acquiring and releasing memory from the parent +// memory monitor. +func TestMemoryBackedQuotaPoolBasic(t *testing.T) { + defer leaktest.AfterTest(t)() + + for _, quota := range []int64{1, 10, 100, 1000} { + for _, numGoroutines := range []int{1, 10, 100} { + quota := quota + ctx := context.Background() + mm := mon.NewMonitorWithLimit( + "test-mon", mon.MemoryResource, quota, + nil, nil, 1, 0, + cluster.MakeTestingClusterSettings()) + mm.Start(ctx, nil, mon.NewStandaloneBudget(quota)) + mem := mm.MakeBoundAccount() + mem.Mu = &syncutil.Mutex{} + + qp := NewMemoryBackedQuotaPool(ctx, mm, "test-qp", quota) + res := make(chan error, numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func() { + memReq := int64(rand.Intn(int(quota))) + if err := mem.Grow(ctx, memReq); err != nil { + return + } + + defer mem.Shrink(ctx, memReq) + }() + + go func() { + alloc, err := qp.TryAcquireMaybeIncreaseCapacity(ctx, 1) + if err != nil { + if !errors.Is(err, quotapool.ErrNotEnoughQuota) { + res <- err + return + } + + alloc, err = qp.Acquire(ctx, 1) + if err != nil { + res <- err + return + } + } + + defer alloc.Release() + res <- nil + return + }() + } + + for i := 0; i < numGoroutines; i++ { + select { + case <-time.After(5 * time.Second): + t.Fatalf("timeout after 
5s") + case err := <-res: + if err != nil { + t.Fatal(err) + } + } + } + } + } +} diff --git a/pkg/ccl/backupccl/bench_test.go b/pkg/ccl/backupccl/bench_test.go index 934d9bc02d81..2856aa1c635e 100644 --- a/pkg/ccl/backupccl/bench_test.go +++ b/pkg/ccl/backupccl/bench_test.go @@ -9,11 +9,32 @@ package backupccl import ( + "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "io" + "math/rand" "testing" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" + "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl/sampledataccl" + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/envutil" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/workload/bank" + "github.com/stretchr/testify/require" ) func BenchmarkDatabaseBackup(b *testing.B) { @@ -147,3 +168,182 @@ func BenchmarkDatabaseFullBackup(b *testing.B) { // *skip*--i.e., the number of bytes in the full backup. b.SetBytes(int64(b.N) * dataSize) } + +// BenchmarkIteratorMemory benchmarks the memory used for creating an SST +// iterator using ExternalSSTReader. It is meant to be used benchmark iterators +// for files on cloud providers. The test constructs an SST and writes it out to +// the cloud storage. 
It then repeatedly calls ExternalSSTReader to create +// iterators over multiple instances of the same SST. +// +// To run: +// +// dev bench ./pkg/ccl/backupccl/ --filter \ +// BenchmarkIteratorMemory/fileCount=100$/iterCount=10$/rows=200000$/enc=false \ +// --bench-mem --test_env=COCKROACH_S3_SST_DIR= \ +// --test_env=COCKROACH_GCS_SST_DIR= \ +// --test_env=AWS_ACCESS_KEY_ID= --test_env=AWS_SECRET_ACCESS_KEY= \ +// --test_env=GOOGLE_APPLICATION_CREDENTIALS= +func BenchmarkIteratorMemory(b *testing.B) { + defer leaktest.AfterTest(b)() + ctx := context.Background() + + numAccounts := 1000 + tc, _, _, cleanupFn := backupRestoreTestSetup(b, singleNode, numAccounts, InitManualReplication) + defer cleanupFn() + + now := tc.Server(0).Clock().Now() + + makeWriter := func( + store cloud.ExternalStorage, + filename string, + enc *jobspb.BackupEncryptionOptions) (io.WriteCloser, error) { + w, err := store.Writer(ctx, filename) + if err != nil { + return nil, err + } + + if enc != nil { + key, err := backupencryption.GetEncryptionKey(ctx, enc, nil) + if err != nil { + return nil, err + } + encW, err := storageccl.EncryptingWriter(w, key) + if err != nil { + return nil, err + } + w = encW + } + return w, nil + } + + getRandomPayload := func(buf []byte) { + const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + const lettersLen = uint32(len(letters)) + for i := 0; i < len(buf); i++ { + buf[i] = letters[rand.Uint32()%lettersLen] + } + } + + writeSST := func(w io.WriteCloser, store cloud.ExternalStorage, payloadSize int, numKeys int) { + sst := storage.MakeBackupSSTWriter(ctx, store.Settings(), w) + + buf := make([]byte, payloadSize) + key := storage.MVCCKey{Timestamp: now} + for i := 0; i < numKeys; i++ { + getRandomPayload(buf) + key.Key = []byte(fmt.Sprintf("id-%09d", i)) + require.NoError(b, sst.Put(key, buf)) + } + + sst.Close() + } + + for _, testCase := range []struct { + storeType string + envVar string + }{ + { + "s3", + "COCKROACH_S3_SST_DIR", + }, + 
{ + "gcs", + "COCKROACH_GCS_SST_DIR", + }, + } { + sstDir := envutil.EnvOrDefaultString(testCase.envVar, "") + if sstDir == "" { + skip.IgnoreLintf(b, "Environment variable %s not set", testCase.envVar) + } + storeURI := fmt.Sprintf("%s?AUTH=implicit", sstDir) + + store, err := cloud.ExternalStorageFromURI( + ctx, + storeURI, + base.ExternalIODirConfig{}, + tc.Servers[0].ClusterSettings(), + blobs.TestEmptyBlobClientFactory, + username.RootUserName(), + tc.Servers[0].InternalDB().(isql.DB), + nil, /* limiters */ + cloud.NilMetrics, + ) + require.NoError(b, err) + + b.ReportAllocs() + + for _, rows := range []int{1_000, 10_000, 50_000, 100_000, 200_000} { + for _, encrypted := range []bool{true, false} { + for _, iterCount := range []int{1, 10, 100} { + for _, fileCount := range []int{10, 100, 1000} { + b.Run(fmt.Sprintf("fileCount=%d/iterCount=%d/rows=%d/store=%s/enc=%t", fileCount, iterCount, rows, testCase.storeType, encrypted), func(b *testing.B) { + var enc *jobspb.BackupEncryptionOptions + var encSuffix string + if encrypted { + const passphrase = "testing-encr-key" + enc = &jobspb.BackupEncryptionOptions{ + Key: []byte(passphrase), + Mode: jobspb.EncryptionMode_Passphrase, + } + encSuffix = "-enc" + } + filename := fmt.Sprintf("test-%d-%d%s", now.WallTime, rows, encSuffix) + w, err := makeWriter(store, filename, enc) + require.NoError(b, err) + + writeSST(w, store, 100, rows) + require.NoError(b, w.Close()) + + sz, err := store.Size(ctx, filename) + require.NoError(b, err) + + log.Infof(ctx, "Benchmarking using file of size %s", humanizeutil.IBytes(sz)) + fileStores := make([]storageccl.StoreFile, fileCount) + for i := 0; i < fileCount; i++ { + fileStores[i].Store = store + fileStores[i].FilePath = filename + } + + execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) + iterOpts := storage.IterOptions{ + RangeKeyMaskingBelow: execCfg.Clock.Now(), + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + 
} + + iters := make([]storage.SimpleMVCCIterator, iterCount) + cleanup := func() { + for _, iter := range iters { + if iter != nil { + iter.Close() + } + } + } + defer cleanup() + + var encOpts *kvpb.FileEncryptionOptions + if enc != nil { + key, err := backupencryption.GetEncryptionKey(ctx, enc, nil) + require.NoError(b, err) + encOpts = &kvpb.FileEncryptionOptions{Key: key} + } + + b.ResetTimer() + + for j := 0; j < iterCount; j++ { + iter, err := storageccl.ExternalSSTReader(ctx, fileStores, encOpts, iterOpts) + require.NoError(b, err) + + iters[j] = iter + iter.SeekGE(storage.MVCCKey{}) + } + + b.StopTimer() + }) + } + } + } + } + } +} diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index 30b9406fc82b..c92518681beb 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -944,7 +945,7 @@ func TestReintroduceOfflineSpans(t *testing.T) { knobs := base.TestingKnobs{ DistSQL: &execinfra.TestingKnobs{ BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ - RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { + RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) { mu.Lock() defer mu.Unlock() if entriesCount == 0 { diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 45fd44ab5d03..8654ecb1ee56 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -14,8 +14,8 @@ import ( 
"fmt" "runtime" - "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/keys" @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -35,8 +36,11 @@ import ( bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" gogotypes "github.com/gogo/protobuf/types" @@ -52,10 +56,11 @@ type restoreDataProcessor struct { spec execinfrapb.RestoreDataSpec input execinfra.RowSource - // numWorkers is the number of workers this processor should use. Initialized - // at processor creation based on the cluster setting. If the cluster setting - // is updated, the job should be PAUSEd and RESUMEd for the new setting to - // take effect. + // numWorkers is the number of workers this processor should use. This + // number is determined by the cluster setting and the amount of memory + // available to be used by RESTORE. If the cluster setting or memory + // allocation is updated, the job should be PAUSEd and RESUMEd for the new + // worker count to take effect. 
numWorkers int // phaseGroup manages the phases of the restore: @@ -72,6 +77,10 @@ type restoreDataProcessor struct { progCh chan backuppb.RestoreProgress agg *bulkutil.TracingAggregator + + // qp is a MemoryBackedQuotaPool that restricts the amount of memory that + // can be used by this processor to open iterators on SSTs. + qp *backuputils.MemoryBackedQuotaPool } var ( @@ -83,6 +92,19 @@ const restoreDataProcName = "restoreDataProcessor" const maxConcurrentRestoreWorkers = 32 +// sstReaderOverheadBytesPerFile and sstReaderEncryptedOverheadBytesPerFile were obtained +// benchmarking external SST iterators on GCP and AWS and selecting the highest +// observed memory per file. +const sstReaderOverheadBytesPerFile = 5 << 20 +const sstReaderEncryptedOverheadBytesPerFile = 8 << 20 + +// minWorkerMemReservation is the minimum amount of memory reserved per restore +// data processor worker. It should be greater than +// sstReaderOverheadBytesPerFile and sstReaderEncryptedOverheadBytesPerFile to +// ensure that all workers at least can simultaneously process at least one +// file. +const minWorkerMemReservation = 25 << 20 + func min(a, b int) int { if a < b { return a @@ -121,6 +143,27 @@ var numRestoreWorkers = settings.RegisterIntSetting( settings.PositiveInt, ) +// restorePerProcessorMemoryLimit is the limit on the memory used by a +// restoreDataProcessor. The actual limit is the lowest of this setting +// and the limit determined by restorePerProcessorMemoryLimitSQLFraction +// and --max-sql-memory. +var restorePerProcessorMemoryLimit = settings.RegisterByteSizeSetting( + settings.TenantWritable, + "bulkio.restore.per_processor_memory_limit", + "limit on the amount of memory that can be used by a restore processor", + 1<<30, // 1 GiB +) + +// restorePerProcessorMemoryLimitSQLFraction is the maximum percentage of the +// SQL memory pool that could be used by a restoreDataProcessor. 
+var restorePerProcessorMemoryLimitSQLFraction = settings.RegisterFloatSetting( + settings.TenantWritable, + "bulkio.restore.per_processor_memory_limit_sql_fraction", + "limit on the amount of memory that can be used by a restore processor as a fraction of max SQL memory", + 0.5, + settings.NonNegativeFloatWithMaximum(1.0), +) + func newRestoreDataProcessor( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -129,18 +172,35 @@ func newRestoreDataProcessor( post *execinfrapb.PostProcessSpec, input execinfra.RowSource, ) (execinfra.Processor, error) { - sv := &flowCtx.Cfg.Settings.SV - numWorkers := int(numRestoreWorkers.Get(sv)) - rd := &restoreDataProcessor{ - flowCtx: flowCtx, - input: input, - spec: spec, - progCh: make(chan backuppb.RestoreProgress, maxConcurrentRestoreWorkers), - metaCh: make(chan *execinfrapb.ProducerMetadata, numWorkers), - numWorkers: numWorkers, + flowCtx: flowCtx, + input: input, + spec: spec, + progCh: make(chan backuppb.RestoreProgress, maxConcurrentRestoreWorkers), + } + + var memMonitor *mon.BytesMonitor + var limit int64 + if spec.MemoryMonitorSSTs { + limit = restorePerProcessorMemoryLimit.Get(&flowCtx.EvalCtx.Settings.SV) + sqlFraction := restorePerProcessorMemoryLimitSQLFraction.Get(&flowCtx.EvalCtx.Settings.SV) + sqlFractionLimit := int64(sqlFraction * float64(flowCtx.Cfg.RootSQLMemoryPoolSize)) + if sqlFractionLimit < limit { + log.Infof(ctx, "using a maximum of %s memory per restore data processor (%f of max SQL memory %s)", + humanizeutil.IBytes(sqlFractionLimit), sqlFraction, + humanizeutil.IBytes(flowCtx.Cfg.RootSQLMemoryPoolSize)) + limit = sqlFractionLimit + } + + memMonitor = flowCtx.Cfg.BackupMonitor + if knobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { + if knobs.BackupMemMonitor != nil { + memMonitor = knobs.BackupMemMonitor + } + } } + rd.qp = backuputils.NewMemoryBackedQuotaPool(ctx, memMonitor, "restore-mon", limit) if err := rd.Init(ctx, rd, post, 
restoreDataOutputTypes, flowCtx, processorID, nil, /* memMonitor */ execinfra.ProcStateOpts{ InputsToDrain: []execinfra.RowSource{input}, @@ -160,6 +220,20 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { ctx = rd.StartInternal(ctx, restoreDataProcName) rd.input.Start(ctx) + // First we reserve minWorkerMemReservation for each restore worker, and + // make sure that we always have enough memory for at least one worker. The + // maximum number of workers is based on the cluster setting. If the cluster + // setting is updated, the job should be PAUSEd and RESUMEd for the new + // setting to take effect. + numWorkers, err := reserveRestoreWorkerMemory(ctx, rd.flowCtx.Cfg.Settings, rd.qp) + if err != nil { + log.Warningf(ctx, "cannot reserve restore worker memory: %v", err) + rd.MoveToDraining(err) + return + } + rd.numWorkers = numWorkers + rd.metaCh = make(chan *execinfrapb.ProducerMetadata, numWorkers) + ctx, cancel := context.WithCancel(ctx) ctx, rd.agg = bulkutil.MakeTracingAggregatorWithSpan(ctx, fmt.Sprintf("%s-aggregator", restoreDataProcName), rd.EvalCtx.Tracer) @@ -246,14 +320,26 @@ func inputReader( } type mergedSST struct { - entry execinfrapb.RestoreSpanEntry - iter *storage.ReadAsOfIterator - cleanup func() + entry execinfrapb.RestoreSpanEntry + iter *storage.ReadAsOfIterator + cleanup func() + completeUpTo hlc.Timestamp } +type resumeEntry struct { + done bool + idx int +} + +// openSSTs opens all files in entry starting from the resumeIdx and returns a +// multiplexed SST iterator over the files. If memory monitoring is enabled and +// opening an additional file would exceed the current memory budget, a partial +// iterator over only the currently opened files is returned, along with an +// updated resume idx, which the caller should use with openSSTs again to get an +// iterator over the remaining files. 
func (rd *restoreDataProcessor) openSSTs( - ctx context.Context, entry execinfrapb.RestoreSpanEntry, -) (mergedSST, error) { + ctx context.Context, entry execinfrapb.RestoreSpanEntry, resume *resumeEntry, +) (mergedSST, *resumeEntry, error) { // TODO(msbutler): use a a map of external storage factories to avoid reopening the same dir // in a given restore span entry var dirs []cloud.ExternalStorage @@ -270,11 +356,12 @@ func (rd *restoreDataProcessor) openSSTs( // getIter returns a multiplexed iterator covering the currently accumulated // files over the channel. - getIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) (mergedSST, error) { + getIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage, iterAllocs []*quotapool.IntAlloc, completeUpTo hlc.Timestamp) (mergedSST, error) { readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime) cleanup := func() { readAsOfIter.Close() + rd.qp.Release(iterAllocs...) for _, dir := range dirsToSend { if err := dir.Close(); err != nil { @@ -284,9 +371,10 @@ func (rd *restoreDataProcessor) openSSTs( } mSST := mergedSST{ - entry: entry, - iter: readAsOfIter, - cleanup: cleanup, + entry: entry, + iter: readAsOfIter, + cleanup: cleanup, + completeUpTo: completeUpTo, } dirs = make([]cloud.ExternalStorage, 0) @@ -296,18 +384,71 @@ func (rd *restoreDataProcessor) openSSTs( log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) storeFiles := make([]storageccl.StoreFile, 0, len(entry.Files)) - for _, file := range entry.Files { + iterAllocs := make([]*quotapool.IntAlloc, 0, len(entry.Files)) + var sstOverheadBytesPerFile uint64 + if rd.spec.Encryption != nil { + sstOverheadBytesPerFile = sstReaderEncryptedOverheadBytesPerFile + } else { + sstOverheadBytesPerFile = sstReaderOverheadBytesPerFile + } + + idx := 0 + if resume != nil { + idx = resume.idx + } + + for ; idx < len(entry.Files); idx++ { + file := entry.Files[idx] 
log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key) + alloc, err := rd.qp.TryAcquireMaybeIncreaseCapacity(ctx, sstOverheadBytesPerFile) + if errors.Is(err, quotapool.ErrNotEnoughQuota) { + // If we failed to allocate more memory, send the iterator + // containing the files we have right now. + if len(storeFiles) > 0 { + iterOpts := storage.IterOptions{ + RangeKeyMaskingBelow: rd.spec.RestoreTime, + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts) + if err != nil { + return mergedSST{}, nil, err + } + + log.VInfof(ctx, 2, "sending iterator after %d out of %d files due to insufficient memory", idx, len(entry.Files)) + + // TODO(rui): this is a placeholder value to show that a span has been + // partially but not completely processed. Eventually this timestamp should + // be the actual timestamp that we have processed up to so far. + completeUpTo := hlc.Timestamp{Logical: 1} + mSST, err := getIter(iter, dirs, iterAllocs, completeUpTo) + res := &resumeEntry{ + idx: idx, + done: false, + } + return mSST, res, err + } + + alloc, err = rd.qp.Acquire(ctx, sstOverheadBytesPerFile) + if err != nil { + return mergedSST{}, nil, err + } + } else if err != nil { + return mergedSST{}, nil, err + } + + iterAllocs = append(iterAllocs, alloc) + dir, err := rd.flowCtx.Cfg.ExternalStorage(ctx, file.Dir) if err != nil { - return mergedSST{}, err + return mergedSST{}, nil, err } dirs = append(dirs, dir) storeFiles = append(storeFiles, storageccl.StoreFile{Store: dir, FilePath: file.Path}) - // TODO(pbardea): When memory monitoring is added, return the currently - // accumulated iterators if we run into memory pressure. 
} + iterOpts := storage.IterOptions{ RangeKeyMaskingBelow: rd.spec.RestoreTime, KeyTypes: storage.IterKeyTypePointsAndRanges, @@ -316,9 +457,15 @@ func (rd *restoreDataProcessor) openSSTs( } iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts) if err != nil { - return mergedSST{}, err + return mergedSST{}, nil, err + } + + mSST, err := getIter(iter, dirs, iterAllocs, rd.spec.RestoreTime) + res := &resumeEntry{ + idx: idx, + done: true, } - return getIter(iter, dirs) + return mSST, res, err } func (rd *restoreDataProcessor) runRestoreWorkers( @@ -335,6 +482,7 @@ func (rd *restoreDataProcessor) runRestoreWorkers( fmt.Sprintf("%s-worker-%d-aggregator", restoreDataProcName, worker), rd.EvalCtx.Tracer) defer agg.Close() + var sstIter mergedSST for { done, err := func() (done bool, _ error) { entry, ok := <-entries @@ -343,22 +491,27 @@ func (rd *restoreDataProcessor) runRestoreWorkers( return done, nil } - sstIter, err := rd.openSSTs(ctx, entry) - if err != nil { - return done, err - } - - summary, err := rd.processRestoreSpanEntry(ctx, kr, sstIter) - if err != nil { - return done, err - } - - select { - case rd.progCh <- makeProgressUpdate(summary, sstIter.entry, rd.spec.PKIDs): - case <-ctx.Done(): - return done, ctx.Err() + var res *resumeEntry + for { + sstIter, res, err = rd.openSSTs(ctx, entry, res) + if err != nil { + return done, err + } + + summary, err := rd.processRestoreSpanEntry(ctx, kr, sstIter) + if err != nil { + return done, err + } + + select { + case rd.progCh <- makeProgressUpdate(summary, sstIter.entry, rd.spec.PKIDs, sstIter.completeUpTo): + case <-ctx.Done(): + return done, ctx.Err() + } + if res.done { + break + } } - return done, nil }() if err != nil { @@ -399,10 +552,11 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( } // disallowShadowingBelow is set to an empty hlc.Timestamp in release builds - // i.e. allow all shadowing without AddSSTable having to check for overlapping - // keys. 
This is because RESTORE is expected to ingest into an empty keyspace. - // If a restore job is resumed, the un-checkpointed spans that are re-ingested - // will shadow (equal key, value; different ts) the already ingested keys. + // i.e. allow all shadowing without AddSSTable having to check for + // overlapping keys. This is necessary since RESTORE can sometimes construct + // SSTables that overwrite existing keys, in cases when there wasn't + // sufficient memory to open an iterator for all files at once for a given + // import span. // // NB: disallowShadowingBelow used to be unconditionally set to logical=1. // This permissive value would allow shadowing in case the RESTORE has to @@ -420,9 +574,6 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( // progress checkpointing so that we do not have a buildup of un-checkpointed // work, at which point we can reassess reverting to logical=1. disallowShadowingBelow := hlc.Timestamp{} - if !build.IsRelease() { - disallowShadowingBelow = hlc.Timestamp{Logical: 1} - } var err error batcher, err = bulk.MakeSSTBatcher(ctx, @@ -432,6 +583,9 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( disallowShadowingBelow, writeAtBatchTS, false, /* scatterSplitRanges */ + // TODO(rui): we can change this to the processor's bound account, but + // currently there seem to be some accounting errors that will cause + // tests to fail. 
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), rd.flowCtx.Cfg.BulkSenderLimiter, ) @@ -508,7 +662,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( if restoreKnobs, ok := rd.flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { if restoreKnobs.RunAfterProcessingRestoreSpanEntry != nil { - restoreKnobs.RunAfterProcessingRestoreSpanEntry(ctx) + restoreKnobs.RunAfterProcessingRestoreSpanEntry(ctx, &entry) } } @@ -516,12 +670,16 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( } func makeProgressUpdate( - summary kvpb.BulkOpSummary, entry execinfrapb.RestoreSpanEntry, pkIDs map[uint64]bool, + summary kvpb.BulkOpSummary, + entry execinfrapb.RestoreSpanEntry, + pkIDs map[uint64]bool, + completeUpTo hlc.Timestamp, ) (progDetails backuppb.RestoreProgress) { progDetails.Summary = countRows(summary, pkIDs) progDetails.ProgressIdx = entry.ProgressIdx progDetails.DataSpan = entry.Span - return + progDetails.CompleteUpTo = completeUpTo + return progDetails } // Next is part of the RowSource interface. 
@@ -531,7 +689,6 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce } var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress - select { case progDetails, ok := <-rd.progCh: if !ok { @@ -563,10 +720,32 @@ func (rd *restoreDataProcessor) ConsumerClosed() { return } rd.cancelWorkersAndWait() + + rd.qp.Close(rd.Ctx()) rd.agg.Close() rd.InternalClose() } +func reserveRestoreWorkerMemory( + ctx context.Context, settings *cluster.Settings, qmem *backuputils.MemoryBackedQuotaPool, +) (int, error) { + maxRestoreWorkers := int(numRestoreWorkers.Get(&settings.SV)) + + numWorkers := 0 + for worker := 0; worker < maxRestoreWorkers; worker++ { + if !qmem.IncreaseCapacity(ctx, minWorkerMemReservation) { + if worker != 0 { + break // no more memory to run workers + } + return 0, errors.New("insufficient memory available to run restore") + } + + numWorkers++ + } + + return numWorkers, nil +} + // SSTBatcherExecutor wraps the SSTBatcher methods, allowing a validation only restore to // implement a mock SSTBatcher used purely for job progress tracking. 
type SSTBatcherExecutor interface { diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index ed974f354a29..f7cd37b54f29 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers @@ -392,10 +393,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix) - mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec) + mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx, mockRestoreDataSpec) require.NoError(t, err) - sst, err := mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry) + sst, res, err := mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, nil) require.NoError(t, err) + require.Equal(t, resumeEntry{done: true, idx: len(restoreSpanEntry.Files)}, *res) rewriter, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), mockRestoreDataSpec.TableRekeys, mockRestoreDataSpec.TenantRekeys, false /* restoreTenantFromStream */) require.NoError(t, err) @@ -433,7 +435,10 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } func newTestingRestoreDataProcessor( - evalCtx *eval.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec, + ctx context.Context, + evalCtx *eval.Context, + flowCtx *execinfra.FlowCtx, + spec execinfrapb.RestoreDataSpec, ) (*restoreDataProcessor, error) { rd := &restoreDataProcessor{ ProcessorBase: execinfra.ProcessorBase{ @@ -443,6 +448,7 @@ func 
newTestingRestoreDataProcessor( }, flowCtx: flowCtx, spec: spec, + qp: backuputils.NewMemoryBackedQuotaPool(ctx, nil, "restore-mon", 0), } return rd, nil } diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 3db1f805c519..b73829c46c89 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -293,7 +293,8 @@ func restore( dataToRestore.getSpans(), job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint, on231, - restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV)) + restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV), + endTime) if err != nil { return emptyRowCount, err } @@ -378,12 +379,13 @@ func restore( generativeCheckpointLoop := func(ctx context.Context) error { defer close(requestFinishedCh) for progress := range progCh { - if err := progressTracker.ingestUpdate(ctx, progress); err != nil { + if spanDone, err := progressTracker.ingestUpdate(ctx, progress); err != nil { return err + } else if spanDone { + // Signal that the processor has finished importing a span, to update job + // progress. + requestFinishedCh <- struct{}{} } - // Signal that the processor has finished importing a span, to update job - // progress. 
- requestFinishedCh <- struct{}{} } return nil } diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 7e49de13445c..eab36e682ce6 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -49,6 +50,13 @@ var replanRestoreFrequency = settings.RegisterDurationSetting( settings.PositiveDuration, ) +var memoryMonitorSSTs = settings.RegisterBoolSetting( + settings.TenantWritable, + "bulkio.restore.memory_monitor_ssts", + "if true, restore will limit number of simultaneously open SSTs based on available memory", + util.ConstantWithMetamorphicTestBool("restore-memory-monitor-ssts", true), +) + // distRestore plans a 2 stage distSQL flow for a distributed restore. It // streams back progress updates over the given progCh. The first stage is a // splitAndScatter processor on every node that is running a compatible version. 
@@ -102,6 +110,7 @@ func distRestore( fileEncryption = &kvpb.FileEncryptionOptions{Key: encryption.Key} } + memMonSSTs := memoryMonitorSSTs.Get(execCtx.ExecCfg().SV()) makePlan := func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg()) @@ -112,13 +121,14 @@ func distRestore( p := planCtx.NewPhysicalPlan() restoreDataSpec := execinfrapb.RestoreDataSpec{ - JobID: int64(jobID), - RestoreTime: restoreTime, - Encryption: fileEncryption, - TableRekeys: dataToRestore.getRekeys(), - TenantRekeys: dataToRestore.getTenantRekeys(), - PKIDs: dataToRestore.getPKIDs(), - ValidateOnly: dataToRestore.isValidateOnly(), + JobID: int64(jobID), + RestoreTime: restoreTime, + Encryption: fileEncryption, + TableRekeys: dataToRestore.getRekeys(), + TenantRekeys: dataToRestore.getTenantRekeys(), + PKIDs: dataToRestore.getPKIDs(), + ValidateOnly: dataToRestore.isValidateOnly(), + MemoryMonitorSSTs: memMonSSTs, } // Plan SplitAndScatter in a round-robin fashion. diff --git a/pkg/ccl/backupccl/restore_progress.go b/pkg/ccl/backupccl/restore_progress.go index dae900d1501b..90d97dc19bd7 100644 --- a/pkg/ccl/backupccl/restore_progress.go +++ b/pkg/ccl/backupccl/restore_progress.go @@ -72,6 +72,10 @@ type progressTracker struct { } useFrontier bool inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry + + // endTime is the restore as of timestamp. This can be empty, and an empty timestamp + // indicates a restore of the latest revision. 
+ endTime hlc.Timestamp } func makeProgressTracker( @@ -79,6 +83,7 @@ func makeProgressTracker( persistedSpans []jobspb.RestoreProgress_FrontierEntry, useFrontier bool, maxBytes int64, + endTime hlc.Timestamp, ) (*progressTracker, error) { var ( @@ -111,6 +116,7 @@ func makeProgressTracker( pt.maxBytes = maxBytes pt.useFrontier = useFrontier pt.inFlightSpanFeeder = inFlightSpanFeeder + pt.endTime = endTime return pt, nil } @@ -188,16 +194,30 @@ func (pt *progressTracker) updateJobCallback( } // ingestUpdate updates the progressTracker after a progress update returns from -// the distributed processors. +// the distributed processors. ingestUpdate returns true if the update indicates +// the completion of a span, false otherwise. func (pt *progressTracker) ingestUpdate( ctx context.Context, rawProgress *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, -) error { +) (bool, error) { var progDetails backuppb.RestoreProgress if err := pbtypes.UnmarshalAny(&rawProgress.ProgressDetails, &progDetails); err != nil { log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) } pt.mu.Lock() defer pt.mu.Unlock() + + // After the change to bound the memory used by SST iterator, we can now + // get a progress entry for a partially completed span. Do not mark the + // span as done when receiving these entries. + // + // NB: progDetails with empty CompleteUpTo timestamps always denote the + // completion of a span, as either we've received details from a + // processor that does not memory monitor, or the restore's endTime was + // empty. 
+ if !progDetails.CompleteUpTo.IsEmpty() && !progDetails.CompleteUpTo.Equal(pt.endTime) { + return false, nil + } + pt.mu.res.Add(progDetails.Summary) if pt.useFrontier { updateSpan := progDetails.DataSpan.Clone() @@ -241,7 +261,7 @@ func (pt *progressTracker) ingestUpdate( updateSpan.EndKey = newEndKey } if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil { - return err + return false, err } } else { idx := progDetails.ProgressIdx @@ -252,7 +272,7 @@ func (pt *progressTracker) ingestUpdate( if !ok { // The channel has been closed, there is nothing left to do. log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") - return nil + return true, nil } pt.mu.inFlightImportSpans[i] = importSpan.Span } @@ -262,7 +282,7 @@ func (pt *progressTracker) ingestUpdate( if sp, ok := pt.mu.inFlightImportSpans[idx]; ok { // Assert that we're actually marking the correct span done. See #23977. if !sp.Key.Equal(progDetails.DataSpan.Key) { - return errors.Newf("request %d for span %v does not match import span for same idx: %v", + return false, errors.Newf("request %d for span %v does not match import span for same idx: %v", idx, progDetails.DataSpan, sp, ) } @@ -277,5 +297,5 @@ func (pt *progressTracker) ingestUpdate( } } } - return nil + return true, nil } diff --git a/pkg/ccl/backupccl/restore_progress_test.go b/pkg/ccl/backupccl/restore_progress_test.go index 12b88ade5de8..cda092a4134c 100644 --- a/pkg/ccl/backupccl/restore_progress_test.go +++ b/pkg/ccl/backupccl/restore_progress_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" gogotypes "github.com/gogo/protobuf/types" "github.com/stretchr/testify/require" @@ -40,8 +41,8 @@ func TestProgressTracker(t *testing.T) 
{ requiredSpans := []roachpb.Span{c.sp("a", "e"), c.sp("f", "i")} - mockUpdate := func(sp roachpb.Span) *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress { - restoreProgress := backuppb.RestoreProgress{DataSpan: sp} + mockUpdate := func(sp roachpb.Span, completeUpTo hlc.Timestamp) *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress { + restoreProgress := backuppb.RestoreProgress{DataSpan: sp, CompleteUpTo: completeUpTo} details, err := gogotypes.MarshalAny(&restoreProgress) require.NoError(t, err) return &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{ProgressDetails: *details} @@ -60,11 +61,12 @@ func TestProgressTracker(t *testing.T) { type testStep struct { update roachpb.Span expectedPersisted []jobspb.RestoreProgress_FrontierEntry + completeUpTo hlc.Timestamp } // Each test step builds on the persistedSpan. persistedSpans := make([]jobspb.RestoreProgress_FrontierEntry, 0) - for _, step := range []testStep{ + for i, step := range []testStep{ { update: c.sp("a", "c"), expectedPersisted: pSp(c.sp("a", "c")), @@ -75,6 +77,11 @@ func TestProgressTracker(t *testing.T) { update: c.sp("c", "e"), expectedPersisted: pSp(c.sp("a", "f")), }, + { + update: c.sp("h", "i"), + expectedPersisted: pSp(c.sp("a", "f")), + completeUpTo: hlc.Timestamp{Logical: 1}, + }, { update: c.sp("h", "i"), expectedPersisted: pSp(c.sp("a", "f"), c.sp("h", "i")), @@ -85,12 +92,16 @@ func TestProgressTracker(t *testing.T) { expectedPersisted: pSp(c.sp("a", "i")), }, } { - pt, err := makeProgressTracker(requiredSpans, persistedSpans, true, 0) - require.NoError(t, err) + restoreTime := hlc.Timestamp{} + pt, err := makeProgressTracker(requiredSpans, persistedSpans, true, 0, restoreTime) + require.NoError(t, err, "step %d", i) - require.NoError(t, pt.ingestUpdate(ctx, mockUpdate(step.update))) + done, err := pt.ingestUpdate(ctx, mockUpdate(step.update, step.completeUpTo)) + require.NoError(t, err) + lastInSpan := step.completeUpTo == restoreTime + require.Equal(t, lastInSpan, 
done, "step %d", i) persistedSpans = persistFrontier(pt.mu.checkpointFrontier, 0) - require.Equal(t, step.expectedPersisted, persistedSpans) + require.Equal(t, step.expectedPersisted, persistedSpans, "step %d", i) } } diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index f950d7de9f61..acf2b25b7a1f 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -837,6 +837,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { RangeStatsFetcher: rangeStatsFetcher, AdmissionPacerFactory: cfg.admissionPacerFactory, ExecutorConfig: execCfg, + RootSQLMemoryPoolSize: cfg.MemoryPoolSize, } cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist) if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil { diff --git a/pkg/settings/float.go b/pkg/settings/float.go index 971c14c1a49f..3d33f532e796 100644 --- a/pkg/settings/float.go +++ b/pkg/settings/float.go @@ -160,6 +160,19 @@ func NonNegativeFloat(v float64) error { return nil } +// NonNegativeFloatWithMaximum can be passed to RegisterFloatSetting. +func NonNegativeFloatWithMaximum(maxValue float64) func(float64) error { + return func(v float64) error { + if v < 0 { + return errors.Errorf("cannot be set to a negative value: %f", v) + } + if v > maxValue { + return errors.Errorf("cannot be set to a value larger than %f", maxValue) + } + return nil + } +} + // PositiveFloat can be passed to RegisterFloatSetting. func PositiveFloat(v float64) error { if v <= 0 { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index fe4fd1cbed3d..fd0adffd68fa 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1685,7 +1685,7 @@ type BackupRestoreTestingKnobs struct { // RunAfterProcessingRestoreSpanEntry allows blocking the RESTORE job after a // single RestoreSpanEntry has been processed and added to the SSTBatcher. 
- RunAfterProcessingRestoreSpanEntry func(ctx context.Context) + RunAfterProcessingRestoreSpanEntry func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) // RunAfterExportingSpanEntry allows blocking the BACKUP job after a single // span has been exported. diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index e33fd9f3533a..dc0ed6bf73c5 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -112,6 +112,10 @@ type ServerConfig struct { // used during backup. BackupMonitor *mon.BytesMonitor + // Child monitor of the bulk monitor which will be used to monitor the memory + // used during restore. + RestoreMonitor *mon.BytesMonitor + // BulkSenderLimiter is the concurrency limiter that is shared across all of // the processes in a given sql server when sending bulk ingest (AddSST) reqs. BulkSenderLimiter limit.ConcurrentRequestLimiter @@ -198,6 +202,10 @@ type ServerConfig struct { // *sql.ExecutorConfig exposed as an interface (due to dependency cycles). ExecutorConfig interface{} + + // RootSQLMemoryPoolSize is the size in bytes of the root SQL memory + // monitor. + RootSQLMemoryPoolSize int64 } // RuntimeStats is an interface through which the rowexec layer can get diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 761ca768c35d..f8839251dba6 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -327,8 +327,14 @@ message RestoreDataSpec { map pk_ids = 4 [(gogoproto.customname) = "PKIDs"]; reserved 7; optional bool validate_only = 8 [(gogoproto.nullable) = false]; - - // NEXT ID: 9. + // MemoryMonitorSSTs, if true, is used to signal to restore data processors + // that they are permitted to memory monitor opening of external SSTs and send + // partial iterators if necessary. 
A true value means that the coordinating + // node is able to receive progress for these partial iterators and not mark a + // span as completed until all of the SSTs for the span have been restored. + optional bool memory_monitor_ssts = 9 [(gogoproto.nullable) = false, (gogoproto.customname) = "MemoryMonitorSSTs"]; + + // NEXT ID: 10. } message SplitAndScatterSpec {