…db#74091

73750: changefeedccl: limit in-memory rowfetcher cache r=[miretskiy] a=HonoreDB

We generate a row.Fetcher for each new version of a watched
table and cache it in memory. That cache is technically unbounded
and could therefore cause an OOM, although in practice you won't
hit that unless you are Schema Changes Georg, the hypothetical
outlier who makes thousands of schema changes a second.
To support Schema Changes Georg, this PR bounds the size of the
cache and evicts the oldest record once the bound is exceeded.
There's smarter stuff we could do (per an existing TODO) to
eagerly evict records using resolved timestamps, but that's an
optimization only a small subset of hypothetical Georgs would
care about (those running single changefeeds on many tables, all
with many rapid schema changes), so I doubt we'll want to bother.

Release note: None
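
As a rough sketch of the idea (plain Go, not the util/cache-based
implementation shown in the rowfetcher_cache.go diff below; the string
key/value types stand in for the (descriptor ID, version) key and
*row.Fetcher value):

package main

import "fmt"

// fifoCache is a toy map-backed cache that evicts its oldest entry once a
// fixed size limit is exceeded.
type fifoCache struct {
    limit   int
    entries map[string]string
    order   []string // insertion order, oldest first
}

func newFIFOCache(limit int) *fifoCache {
    return &fifoCache{limit: limit, entries: make(map[string]string)}
}

func (c *fifoCache) Add(key, val string) {
    if _, ok := c.entries[key]; !ok {
        c.order = append(c.order, key)
    }
    c.entries[key] = val
    // Evict the oldest entries until we are back under the bound.
    for len(c.entries) > c.limit {
        oldest := c.order[0]
        c.order = c.order[1:]
        delete(c.entries, oldest)
    }
}

func (c *fifoCache) Get(key string) (string, bool) {
    v, ok := c.entries[key]
    return v, ok
}

func main() {
    c := newFIFOCache(2)
    c.Add("t1@v1", "fetcher for version 1")
    c.Add("t1@v2", "fetcher for version 2")
    c.Add("t1@v3", "fetcher for version 3") // evicts t1@v1
    _, ok := c.Get("t1@v1")
    fmt.Println(ok) // false
}
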

73805: backupccl: add BackupMonitor to memory monitor file stitching r=dt a=adityamaru

This change adds a BackupMonitor that hangs off the bulk memory
monitor. This monitor currently only guards the queue that we use
to buffer SSTs while stitching them together in the sstSink.

Informs: cockroachdb#73815

Release note: None
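
To make the reserve-before-buffer pattern concrete, here is a toy,
self-contained sketch in plain Go (illustrative only; the real change wires
a mon.BytesMonitor into the sstSink via the memoryAccumulator added in this
diff, and the sizes below are made up):

package main

import (
    "errors"
    "fmt"
)

// budget is a stand-in for the memory monitor: callers must reserve bytes
// before buffering them and release them when the buffer is flushed.
type budget struct {
    limit, used int64
}

func (b *budget) reserve(n int64) error {
    if b.used+n > b.limit {
        return errors.New("memory budget exceeded")
    }
    b.used += n
    return nil
}

func (b *budget) release(n int64) { b.used -= n }

// sink buffers files in memory before stitching them together; every
// buffered byte is accounted for against the budget.
type sink struct {
    bud   *budget
    queue [][]byte
}

func (s *sink) enqueue(f []byte) error {
    if err := s.bud.reserve(int64(len(f))); err != nil {
        return err
    }
    s.queue = append(s.queue, f)
    return nil
}

// flush writes the queued files out (elided here) and returns their memory.
func (s *sink) flush() {
    var n int64
    for _, f := range s.queue {
        n += int64(len(f))
    }
    s.queue = nil
    s.bud.release(n)
}

func main() {
    s := &sink{bud: &budget{limit: 8}}
    fmt.Println(s.enqueue(make([]byte, 6))) // <nil>
    fmt.Println(s.enqueue(make([]byte, 6))) // memory budget exceeded
    s.flush()
    fmt.Println(s.enqueue(make([]byte, 6))) // <nil>
}
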

74088: bazel: pass path argument to `find` r=irfansharif a=rickystewart

The bare `find -name ...` invocation without a path argument fails on
some versions of `find`, namely the one installed on macOS machines.

Release note: None

74091: bazel: build `dev` with `--config nonogo` r=irfansharif a=rickystewart

I didn't do this unconditionally at first because I was afraid it would
thrash the cache, but in my testing the thrashing doesn't really seem to
happen (at least, compiling `dev` with `nonogo` is so fast that it vastly
outweighs any invalidation that may be happening).

Closes cockroachdb#74004.

Release note: None

Co-authored-by: Aaron Zinger <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
4 people committed Dec 20, 2021
5 parents 9838965 + 08542b2 + 1ac3e93 + 3e3f536 + b3db255 commit 0d07a2d
Showing 13 changed files with 194 additions and 22 deletions.
2 changes: 1 addition & 1 deletion build/bazelutil/bazel-generate.sh
@@ -53,7 +53,7 @@ fi

bazel run //:gazelle

if files_unchanged_from_upstream $(find -name BUILD.bazel) $(find -name '*.bzl'); then
if files_unchanged_from_upstream $(find ./pkg -name BUILD.bazel) $(find ./pkg -name '*.bzl'); then
echo "Skipping //pkg/cmd/generate-test-suites (relevant files are unchanged from upstream)."
else
CONTENTS=$(bazel run //pkg/cmd/generate-test-suites --run_under="cd $PWD && ")
8 changes: 2 additions & 6 deletions dev
@@ -4,17 +4,13 @@ set -uo pipefail

this_dir=$(cd "$(dirname "$0")" && pwd)
mkdir -p $this_dir/artifacts
bazel build //pkg/cmd/dev &> $this_dir/artifacts/dev.log
bazel build //pkg/cmd/dev --config nonogo &> $this_dir/artifacts/dev.log
status=$?
if [ $status -eq 0 ]
then
$(bazel info bazel-bin)/pkg/cmd/dev/dev_/dev "$@"
$(bazel info bazel-bin --config nonogo)/pkg/cmd/dev/dev_/dev "$@"
else
echo 'Failed to build pkg/cmd/dev! Got output:'
cat $this_dir/artifacts/dev.log
echo 'Hint: if the full `dev` build is failing for you, you can build a minimal version with --config nonogo.'
echo 'Afterward, run `dev doctor` to debug your failing build. For example:'
echo ' bazel build pkg/cmd/dev --config nonogo && _bazel/bin/pkg/cmd/dev/dev_/dev doctor'
echo 'When `dev doctor` says you are ready to build, try re-building the full binary with `./dev`.'
exit $status
fi
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
"create_scheduled_backup.go",
"key_rewriter.go",
"manifest_handling.go",
"memory_utils.go",
"restoration_data.go",
"restore_data_processor.go",
"restore_job.go",
@@ -110,6 +111,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/stop",
@@ -226,6 +228,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
46 changes: 40 additions & 6 deletions pkg/ccl/backupccl/backup_processor.go
@@ -115,6 +115,9 @@ type backupDataProcessor struct {
cancelAndWaitForWorker func()
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
backupErr error

// Memory accumulator that reserves the memory usage of the backup processor.
backupMem *memoryAccumulator
}

var _ execinfra.Processor = &backupDataProcessor{}
@@ -127,11 +130,18 @@ func newBackupDataProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
memMonitor := flowCtx.Cfg.BackupMonitor
if knobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
if knobs.BackupMemMonitor != nil {
memMonitor = knobs.BackupMemMonitor
}
}
bp := &backupDataProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
backupMem: newMemoryAccumulator(memMonitor),
}
if err := bp.Init(bp, post, backupOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
@@ -160,7 +170,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
TaskName: "backup-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh)
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.backupMem)
cancel()
close(bp.progCh)
}); err != nil {
@@ -196,6 +206,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.ProcessorBase.InternalClose()
bp.backupMem.close(bp.Ctx)
}

// ConsumerClosed is part of the RowSource interface. We have to override the
@@ -227,6 +238,7 @@ func runBackupProcessor(
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.BackupDataSpec,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupMem *memoryAccumulator,
) error {
backupProcessorSpan := tracing.SpanFromContext(ctx)
clusterSettings := flowCtx.Cfg.Settings
@@ -525,7 +537,10 @@ func runBackupProcessor(
return err
}

sink := &sstSink{conf: sinkConf, dest: storage}
sink, err := makeSSTSink(ctx, sinkConf, storage, backupMem)
if err != nil {
return err
}

defer func() {
err := sink.Close()
@@ -579,6 +594,20 @@ type sstSink struct {
sizeFlushes int
spanGrows int
}

backupMem *memoryAccumulator
}

func makeSSTSink(
ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *memoryAccumulator,
) (*sstSink, error) {
s := &sstSink{conf: conf, dest: dest, backupMem: backupMem}

// Reserve memory for the file buffer.
if err := s.backupMem.request(ctx, smallFileBuffer.Get(s.conf.settings)); err != nil {
return nil, errors.Wrap(err, "failed to reserve memory for sstSink queue")
}
return s, nil
}

func (s *sstSink) Close() error {
@@ -589,6 +618,10 @@ func (s *sstSink) Close() error {
if s.cancel != nil {
s.cancel()
}

// Release the memory reserved for the file buffer back to the memory
// accumulator.
s.backupMem.release(smallFileBuffer.Get(s.conf.settings))
if s.out != nil {
return s.out.Close()
}
@@ -696,6 +729,7 @@ func (s *sstSink) open(ctx context.Context) error {
}
s.out = w
s.sst = storage.MakeBackupSSTWriter(s.out)

return nil
}

45 changes: 45 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -17,6 +17,7 @@ import (
"hash/crc32"
"io"
"io/ioutil"
"math"
"math/rand"
"net/url"
"os"
@@ -74,6 +75,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -9012,6 +9014,49 @@ CREATE SCHEMA db.s;
sqlDB.Exec(t, `BACKUP DATABASE db TO 'nodelocal://0/test/2'`)
}

func TestBackupMemMonitorSSTSinkQueueSize(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

memoryMonitor := mon.NewMonitor(
"test-mem",
mon.MemoryResource,
nil, /* curCount */
nil, /* maxHist */
-1, /* increment */
math.MaxInt64, /* noteworthy */
cluster.MakeTestingClusterSettings(),
)
ctx := context.Background()
byteLimit := 14 << 20 // 14 MiB
memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(byteLimit)))
defer memoryMonitor.Stop(ctx)
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
BackupMemMonitor: memoryMonitor,
}},
}
params.ServerArgs.Knobs = knobs

const numAccounts = 100

_, _, sqlDB, _, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts,
InitManualReplication, params)
defer cleanup()

// Run a backup and expect the Grow() for the sstSink to return a memory error
// since the default queue byte size is 16MiB.
sqlDB.ExpectErr(t, "failed to reserve memory for sstSink queue", `BACKUP INTO 'nodelocal://0/foo'`)

// Reduce the queue byte size cluster setting.
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '13MiB'`)

// Now the backup should succeed because it is below the `byteLimit`.
sqlDB.Exec(t, `BACKUP INTO 'nodelocal://0/bar'`)
}

// TestBackupRestoreSeperateIncrementalPrefix tests that a backup/restore round
// trip using the 'incremental_storage' parameter restores the same db as a BR
// round trip without the parameter.
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -923,6 +923,10 @@ func TestReintroduceOfflineSpans(t *testing.T) {
// and not report any progress in the meantime unless it is disabled.
srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)

// Test servers only have 128MB root memory monitors, reduce the buffer size
// so we don't see memory errors.
srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'`)

// Take a backup that we'll use to create an OFFLINE descriptor.
srcDB.Exec(t, `CREATE INDEX new_idx ON data.bank (balance)`)
srcDB.Exec(t, `BACKUP DATABASE data TO $1 WITH revision_history`, dbBackupLoc)
68 changes: 68 additions & 0 deletions pkg/ccl/backupccl/memory_utils.go
@@ -0,0 +1,68 @@
// Copyright 2017 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// memoryAccumulator is a thin wrapper around a BoundAccount that only releases memory
// from the bound account when it is closed; otherwise it accumulates and
// re-uses resources.
// This is useful when resources, once accumulated, should not be returned,
// as they may be needed later to make progress.
// It is safe for concurrent use.
type memoryAccumulator struct {
syncutil.Mutex
ba mon.BoundAccount
reserved int64
}

// newMemoryAccumulator creates a new accumulator backed by a bound account created
// from the given memory monitor.
func newMemoryAccumulator(mm *mon.BytesMonitor) *memoryAccumulator {
return &memoryAccumulator{ba: mm.MakeBoundAccount()}
}

// request checks that the given number of bytes is available, requesting some
// from the backing monitor if necessary.
func (acc *memoryAccumulator) request(ctx context.Context, requested int64) error {
acc.Lock()
defer acc.Unlock()

if acc.reserved >= requested {
acc.reserved -= requested
return nil
}

requested -= acc.reserved
acc.reserved = 0

return acc.ba.Grow(ctx, requested)
}

// release releases a number of bytes back into the internal reserved pool.
func (acc *memoryAccumulator) release(released int64) {
acc.Lock()
defer acc.Unlock()

acc.reserved += released
}

// close returns all accumulated memory to the backing monitor.
func (acc *memoryAccumulator) close(ctx context.Context) {
acc.Lock()
defer acc.Unlock()

acc.reserved = 0
acc.ba.Close(ctx)
}
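
As a rough usage sketch (not part of the commit): assuming a started
*mon.BytesMonitor, a caller such as the sstSink is expected to follow a
request/release/close lifecycle along these lines (the helper name and the
4 MiB figure are made up for illustration; only this package's existing
imports are assumed):

// useAccumulator is a hypothetical caller showing the intended lifecycle.
func useAccumulator(ctx context.Context, mm *mon.BytesMonitor) error {
    acc := newMemoryAccumulator(mm)
    // close() returns everything, reserved or in use, to the monitor.
    defer acc.close(ctx)

    const bufSize = 4 << 20 // 4 MiB, for illustration only
    if err := acc.request(ctx, bufSize); err != nil {
        return err // could not reserve bufSize bytes from the monitor
    }
    // ... hold up to bufSize bytes of buffered data while doing work ...

    // Hand the bytes back to the accumulator's reserved pool; a later
    // request() of the same size is then served without growing the
    // bound account again.
    acc.release(bufSize)
    return nil
}
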
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/column-families
@@ -10,6 +10,7 @@ ALTER TABLE cfs SPLIT AT SELECT a FROM cfs;
-- Split the output files very small to catch output SSTs mid-row.
SET CLUSTER SETTING bulkio.backup.file_size = '1';
SET CLUSTER SETTING kv.bulk_sst.target_size = '1';
SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB';
BACKUP cfs TO 'nodelocal://1/foo';
CREATE DATABASE r1;
RESTORE cfs FROM 'nodelocal://1/foo' WITH into_db='r1';
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -80,6 +80,7 @@ go_library(
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/cache",
"//pkg/util/ctxgroup",
"//pkg/util/duration",
"//pkg/util/encoding",
27 changes: 18 additions & 9 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)
@@ -35,14 +36,22 @@ type rowFetcherCache struct {
type rowFetcherCache struct {
codec keys.SQLCodec
leaseMgr *lease.Manager
fetchers map[idVersion]*row.Fetcher
fetchers *cache.UnorderedCache

collection *descs.Collection
db *kv.DB

a rowenc.DatumAlloc
}

var rfCacheConfig = cache.Config{
Policy: cache.CacheFIFO,
// TODO: If we find ourselves thrashing here in changefeeds on many tables,
// we can improve performance by eagerly evicting versions using Resolved notifications.
// An old version with a timestamp entirely before a notification can be safely evicted.
ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 },
}

type idVersion struct {
id descpb.ID
version descpb.DescriptorVersion
@@ -60,7 +69,7 @@ func newRowFetcherCache(
leaseMgr: leaseMgr,
collection: cf.NewCollection(nil /* TemporarySchemaProvider */),
db: db,
fetchers: make(map[idVersion]*row.Fetcher),
fetchers: cache.NewUnorderedCache(rfCacheConfig),
}
}

@@ -138,10 +147,13 @@ func (c *rowFetcherCache) RowFetcherForTableDesc(
// UserDefinedTypeColsHaveSameVersion if we have a hit because we are
// guaranteed that the tables have the same version. Additionally, these
// fetchers are always initialized with a single tabledesc.Immutable.
if rf, ok := c.fetchers[idVer]; ok &&
catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) {
return rf, nil
if v, ok := c.fetchers.Get(idVer); ok {
rf := v.(*row.Fetcher)
if catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) {
return rf, nil
}
}

// TODO(dan): Allow for decoding a subset of the columns.
var colIdxMap catalog.TableColMap
var valNeededForCol util.FastIntSet
@@ -177,9 +189,6 @@ func (c *rowFetcherCache) RowFetcherForTableDesc(
// Necessary because virtual columns are not populated.
rf.IgnoreUnexpectedNulls = true

// TODO(dan): Bound the size of the cache. Resolved notifications will let
// us evict anything for timestamps entirely before the notification. Then
// probably an LRU just in case?
c.fetchers[idVer] = &rf
c.fetchers.Add(idVer, &rf)
return &rf, nil
}