
ccl/backupccl: add memory monitor to external SST iterators in restore
Previously, there was no limit on the amount of memory that could be used while
constructing external SST iterators during restore. This patch adds a memory
monitor to limit the amount of memory that can be used to construct external
SST iterators. If a restore processor fails to acquire enough memory to open
the next file for a restore span, it sends the iterator over all of the files
it has opened so far, then waits until it can acquire the memory needed to
resume constructing the iterator for the remaining files.
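
For illustration, here is a minimal sketch of that loop. It uses the pool and
proto types added by this patch, but the function itself, the flush callback,
and perFileOverhead are hypothetical, not code from this commit (assumed
imports: context, cockroachdb/errors, backuputils, execinfrapb, quotapool):

// processSpanWithinBudget is a sketch only: it opens as many files as the
// budget allows, flushes an iterator over that batch, then continues with the
// remaining files.
func processSpanWithinBudget(
	ctx context.Context,
	pool *backuputils.MemoryBackedQuotaPool,
	files []execinfrapb.RestoreFileSpec,
	perFileOverhead uint64,
	flush func([]execinfrapb.RestoreFileSpec) error, // hypothetical: open one iterator over the batch and ingest it
) error {
	for len(files) > 0 {
		var batch []execinfrapb.RestoreFileSpec
		var allocs []*quotapool.IntAlloc
		for len(files) > 0 {
			alloc, err := pool.TryAcquireMaybeIncreaseCapacity(ctx, perFileOverhead)
			if errors.Is(err, quotapool.ErrNotEnoughQuota) {
				break // Budget exhausted: flush what has been opened so far.
			} else if err != nil {
				return err
			}
			allocs = append(allocs, alloc)
			batch, files = append(batch, files[0]), files[1:]
		}
		if len(batch) == 0 {
			// Not even one file fits: block until other workers release memory.
			alloc, err := pool.Acquire(ctx, perFileOverhead)
			if err != nil {
				return err
			}
			allocs = append(allocs, alloc)
			batch, files = append(batch, files[0]), files[1:]
		}
		if err := flush(batch); err != nil {
			return err
		}
		pool.Release(allocs...)
	}
	return nil
}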

The memory limit can be controlled via the new cluster setting
bulkio.restore.per_processor_memory_limit. Regardless of the setting,
however, the amount of memory used will not exceed
COCKROACH_RESTORE_MEM_FRACTION * max SQL memory. The new environment
variable COCKROACH_RESTORE_MEM_FRACTION defaults to 0.5.

Release note: None
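
As a hedged sketch, the effective per-processor budget can be thought of as the
cluster setting clamped to the fraction of max SQL memory; the function and
variable names below are placeholders, not the actual code in this patch:

// Sketch only: settingLimit is bulkio.restore.per_processor_memory_limit,
// memFraction is COCKROACH_RESTORE_MEM_FRACTION (default 0.5), and
// maxSQLMemory is the node's max SQL memory budget.
func effectiveRestoreMemLimit(settingLimit, maxSQLMemory int64, memFraction float64) int64 {
	clamp := int64(memFraction * float64(maxSQLMemory))
	if settingLimit > clamp {
		return clamp
	}
	return settingLimit
}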
Rui Hu committed Apr 3, 2023
1 parent 0745cd4 commit fca9148
Showing 20 changed files with 945 additions and 93 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -18,6 +18,7 @@ ALL_TESTS = [
"//pkg/ccl/backupccl/backupinfo:backupinfo_test",
"//pkg/ccl/backupccl/backuprand:backuprand_test",
"//pkg/ccl/backupccl/backupresolver:backupresolver_test",
"//pkg/ccl/backupccl/backuputils:backuputils_test",
"//pkg/ccl/backupccl:backupccl_test",
"//pkg/ccl/baseccl:baseccl_test",
"//pkg/ccl/benchccl/rttanalysisccl:rttanalysisccl_test",
@@ -728,6 +729,7 @@ GO_TARGETS = [
"//pkg/ccl/backupccl/backupresolver:backupresolver_test",
"//pkg/ccl/backupccl/backuptestutils:backuptestutils",
"//pkg/ccl/backupccl/backuputils:backuputils",
"//pkg/ccl/backupccl/backuputils:backuputils_test",
"//pkg/ccl/backupccl:backupccl",
"//pkg/ccl/backupccl:backupccl_test",
"//pkg/ccl/baseccl:baseccl",
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -142,6 +142,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
@@ -208,6 +209,7 @@ go_test(
"//pkg/ccl/backupccl/backupinfo",
"//pkg/ccl/backupccl/backuppb",
"//pkg/ccl/backupccl/backuptestutils",
"//pkg/ccl/backupccl/backuputils",
"//pkg/ccl/kvccl",
"//pkg/ccl/multiregionccl",
"//pkg/ccl/multiregionccl/multiregionccltestutils",
@@ -288,7 +290,9 @@ go_test(
"//pkg/util/admission",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
"//pkg/util/limit",
148 changes: 145 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
@@ -1440,7 +1440,7 @@ func checkInProgressBackupRestore(
<-exportSpanCompleteCh
}
},
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) {
<-allowResponse
},
},
@@ -1611,7 +1611,7 @@ func TestRestoreCheckpointing(t *testing.T) {
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) {
// Because the restore processor has several workers that
// concurrently send addsstable requests and because all workers will
// wait on the lock below, when one flush gets blocked on the
@@ -7349,7 +7349,7 @@ func TestClientDisconnect(t *testing.T) {

args := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context) {
DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, _ *execinfrapb.RestoreSpanEntry) {
blockBackupOrRestore(ctx)
}}},
Store: &kvserver.StoreTestingKnobs{
@@ -11029,3 +11029,145 @@ CREATE TABLE child_pk (k INT8 PRIMARY KEY REFERENCES parent);
sqlDB.Exec(t, `DROP DATABASE test`)
}
}

// TestRestoreMemoryMonitoring verifies that if a restore span has too many
// files to fit in the memory budget with a single SST iterator, the restore
// processor repeatedly opens and processes iterators over as many files as fit
// within the budget until the span is finished.
func TestRestoreMemoryMonitoring(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const splitSize = 10
for _, numSplits := range []int{10, 100, 1000} {
for _, numInc := range []int{0, 1, 3, 10} {
for _, restoreProcessorMaxFiles := range []int{5, 10, 20} {
t.Run(fmt.Sprintf("splits=%d/inc=%d/procMaxFiles=%d", numSplits, numInc, restoreProcessorMaxFiles), func(t *testing.T) {
numAccounts := numSplits * splitSize
var expectedNumFiles int
restoreProcessorKnobCount := atomic.Uint32{}
args := base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) {
// The total size of the backup files should be less than the target
// SST size, and thus they should all fit in one import span.
require.Equal(t, expectedNumFiles, len(entry.Files))
restoreProcessorKnobCount.Add(1)
},
},
},
},
}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true")

// Add some splits in the table, and set the target file size to be something
// small so that we get one flushed file per split in the backup.
sqlDB.Exec(t, "ALTER TABLE data.bank SPLIT AT SELECT generate_series($1::INT, $2, $3)", 0, numAccounts, splitSize)
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.file_size = '1B'")
sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Take some incremental backups after mutating some rows. Take note of the
// splits that have been changed as that determines the number of incremental
// files that are created.
var numIncFiles int
for i := 0; i < numInc; i++ {
incSplitsWithFile := make(map[int]bool)

for n := 0; n < 100; n++ {
id := rand.Intn(numAccounts)
sqlDB.Exec(t, `UPDATE data.bank SET balance = balance + 1 WHERE id = $1`, id)
split := id / splitSize
incSplitsWithFile[split] = true
}

numIncFiles += len(incSplitsWithFile)
}

expectedNumFiles += numSplits + numIncFiles
// Verify the file counts in the backup.
var numFiles int
sqlDB.QueryRow(t, "SELECT count(*) FROM [SHOW BACKUP FILES FROM latest IN 'userfile:///backup']").Scan(&numFiles)
require.Equal(t, expectedNumFiles, numFiles)

sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

sqlDB.Exec(t, "CREATE DATABASE data2")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')")

// Assert that the restore processor is processing the same span multiple
// times, and the count is based on what's expected from the memory budget.
// The expected number is just the ceiling of expectedNumFiles/restoreProcessorMaxFiles.
require.Equal(t, (expectedNumFiles-1)/restoreProcessorMaxFiles+1, int(restoreProcessorKnobCount.Load()))

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
})
}
}
}
}

// TestRestoreMemoryMonitoringWithShadowing verifies that restore with memory
// monitoring succeeds with partial SST iterators that shadow previously
// written values.
func TestRestoreMemoryMonitoringWithShadowing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 10
const numIncrementals = 10
const restoreProcessorMaxFiles = 5

restoreProcessorKnobCount := atomic.Uint32{}

args := base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) {
restoreProcessorKnobCount.Add(1)
},
},
},
},
}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true")
sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Repeatedly alter a single row and do an incremental backup.
for i := 0; i < numIncrementals; i++ {
sqlDB.Exec(t, `UPDATE data.bank SET balance = $1 WHERE id = $2`, 1000+i, i)
sqlDB.Exec(t, "BACKUP data.bank INTO latest IN 'userfile:///backup'")
}

// Set the memory budget for the restore processor to be enough to open 5
// files.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

sqlDB.Exec(t, "CREATE DATABASE data2")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')")
files := sqlDB.QueryStr(t, "SHOW BACKUP FILES FROM latest IN 'userfile:///backup'")
require.Equal(t, 11, len(files)) // 1 file for full + 10 for 10 incrementals

// Assert that the restore processor is processing the same span multiple
// times, and the count is based on what's expected from the memory budget.
require.Equal(t, 3, int(restoreProcessorKnobCount.Load())) // Ceiling(11/5)

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
}
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/backuppb/backup.proto
@@ -194,6 +194,10 @@ message RestoreProgress {
roachpb.RowCount summary = 1 [(gogoproto.nullable) = false];
int64 progressIdx = 2;
roachpb.Span dataSpan = 3 [(gogoproto.nullable) = false];
// CompleteUpTo is the timestamp up to which the data in DataSpan has been
// processed so far. The entire span has been processed once this timestamp
// equals the restore end time.
util.hlc.Timestamp complete_up_to = 4 [(gogoproto.nullable) = false];
}

message BackupProcessorPlanningTraceEvent {
32 changes: 29 additions & 3 deletions pkg/ccl/backupccl/backuputils/BUILD.bazel
@@ -1,12 +1,38 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "backuputils",
srcs = ["utils.go"],
srcs = [
"memory_backed_quota_pool.go",
"utils.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils",
visibility = ["//visibility:public"],
deps = ["//pkg/cloud"],
deps = [
"//pkg/cloud",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)

go_test(
name = "backuputils_test",
srcs = ["memory_backed_quota_pool_test.go"],
args = ["-test.timeout=295s"],
embed = [":backuputils"],
tags = ["ccl_test"],
deps = [
"//pkg/settings/cluster",
"//pkg/util/leaktest",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

get_x_data(name = "get_x_data")
127 changes: 127 additions & 0 deletions pkg/ccl/backupccl/backuputils/memory_backed_quota_pool.go
@@ -0,0 +1,127 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backuputils

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// MemoryBackedQuotaPool is an IntPool backed by a memory monitor.
// Users of MemoryBackedQuotaPool can acquire capacity from the IntPool,
// but the capacity of the IntPool can only be increased by acquiring
// the corresponding amount of memory from the backing memory monitor.
type MemoryBackedQuotaPool struct {
mon *mon.BytesMonitor
mem *mon.BoundAccount

// capacityMu synchronizes operations related to the capacity
// of quotaPool.
capacityMu syncutil.Mutex
quotaPool *quotapool.IntPool
}

// NewMemoryBackedQuotaPool creates a MemoryBackedQuotaPool from a
// parent monitor m with a limit.
func NewMemoryBackedQuotaPool(
ctx context.Context, m *mon.BytesMonitor, name redact.RedactableString, limit int64,
) *MemoryBackedQuotaPool {
q := MemoryBackedQuotaPool{
quotaPool: quotapool.NewIntPool(fmt.Sprintf("%s-pool", name), 0),
}

if m != nil {
q.mon = mon.NewMonitorInheritWithLimit(name, limit, m)
q.mon.StartNoReserved(ctx, m)
mem := q.mon.MakeBoundAccount()
q.mem = &mem
}
return &q
}

// TryAcquireMaybeIncreaseCapacity tries to acquire size from the pool.
// On success, a non-nil alloc is returned and Release() must be called on
// it to return the quota to the pool.
// If the acquisition fails due to insufficient quota, it repeatedly attempts
// to increase the capacity of the pool until the acquisition succeeds. If a
// capacity increase fails, the function returns the error
// quotapool.ErrNotEnoughQuota.
//
// Safe for concurrent use.
func (q *MemoryBackedQuotaPool) TryAcquireMaybeIncreaseCapacity(
ctx context.Context, size uint64,
) (*quotapool.IntAlloc, error) {
for {
alloc, err := q.quotaPool.TryAcquire(ctx, size)
if err == nil || !errors.Is(err, quotapool.ErrNotEnoughQuota) {
return alloc, err
}

// Not enough quota; attempt to grow the memory account to increase the
// quota pool's capacity.
if !q.IncreaseCapacity(ctx, size) {
return nil, quotapool.ErrNotEnoughQuota
}
}
}

// Acquire acquires size from the pool. On success, a non-nil alloc is
// returned and Release() must be called on it to return the quota to the pool.
//
// Safe for concurrent use.
func (q *MemoryBackedQuotaPool) Acquire(
ctx context.Context, size uint64,
) (*quotapool.IntAlloc, error) {
return q.quotaPool.Acquire(ctx, size)
}

// Release will release allocs back to the pool.
func (q *MemoryBackedQuotaPool) Release(allocs ...*quotapool.IntAlloc) {
q.quotaPool.Release(allocs...)
}

// IncreaseCapacity will attempt to increase the capacity of the pool.
// Returns true if the increase succeeds and false otherwise.
//
// Safe for concurrent use.
func (q *MemoryBackedQuotaPool) IncreaseCapacity(ctx context.Context, size uint64) bool {
if err := q.mem.Grow(ctx, int64(size)); err != nil {
return false
}

q.capacityMu.Lock()
defer q.capacityMu.Unlock()
q.quotaPool.UpdateCapacity(q.quotaPool.Capacity() + size)
return true
}

// Capacity returns the capacity of the pool.
func (q *MemoryBackedQuotaPool) Capacity() uint64 {
q.capacityMu.Lock()
defer q.capacityMu.Unlock()
return q.quotaPool.Capacity()
}

// Close closes the pool and returns the reserved memory to the backing
// memory monitor.
func (q *MemoryBackedQuotaPool) Close(ctx context.Context) {
if q.mem != nil {
q.mem.Close(ctx)
}

if q.mon != nil {
q.mon.Stop(ctx)
}
}
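
For context, a minimal usage sketch of the pool defined above; this example
function is illustrative and is not part of the commit:

// exampleUsage is a hypothetical caller showing the acquire/release cycle.
func exampleUsage(ctx context.Context, parent *mon.BytesMonitor) error {
	pool := NewMemoryBackedQuotaPool(ctx, parent, "restore-example", 1<<20 /* 1 MiB limit */)
	defer pool.Close(ctx)

	// Reserve room for one hypothetical file; this grows the pool's capacity
	// from the backing monitor if the current capacity is insufficient.
	alloc, err := pool.TryAcquireMaybeIncreaseCapacity(ctx, 64<<10)
	if err != nil {
		return err
	}
	// ... use the reserved quota, e.g. to open an external SST iterator ...
	pool.Release(alloc)
	return nil
}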
