Skip to content

Commit

Permalink
Merge pull request #81789 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-80494

release-22.1: server: add server-wide limit on addsstable send concurrency
  • Loading branch information
dt authored Jun 7, 2022
2 parents c432bc5 + 149d64f commit a07393c
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 45 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
writeAtBatchTS,
false, /* splitFilledRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
rd.flowCtx.Cfg.BulkSenderLimiter,
)
if err != nil {
return summary, err
Expand Down
9 changes: 6 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"fmt"
"io/ioutil"
math "math"
"os"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -42,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -249,9 +251,10 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{},
s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil)
},
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
BackupMonitor: mon.NewUnlimitedMonitor(ctx, "test", mon.MemoryResource, nil, nil, 0, s.ClusterSettings()),
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
BackupMonitor: mon.NewUnlimitedMonitor(ctx, "test", mon.MemoryResource, nil, nil, 0, s.ClusterSettings()),
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &tree.EvalContext{
Codec: keys.SystemSQLCodec,
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package streamingest
import (
"context"
"fmt"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/stretchr/testify/require"
)
Expand All @@ -51,9 +53,10 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
registry := tc.Server(0).JobRegistry().(*jobs.Registry)
flowCtx := execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{
Settings: st,
DB: kvDB,
JobRegistry: registry,
Settings: st,
DB: kvDB,
JobRegistry: registry,
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &evalCtx,
DiskMonitor: testDiskMonitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
evalCtx := sip.FlowCtx.EvalCtx
db := sip.FlowCtx.Cfg.DB
var err error
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db, evalCtx.Settings, sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount())
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db, evalCtx.Settings,
sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), sip.flowCtx.Cfg.BulkSenderLimiter)
if err != nil {
sip.MoveToDraining(errors.Wrap(err, "creating stream sst batcher"))
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package streamingest
import (
"context"
"fmt"
"math"
"net/url"
"strconv"
"sync"
Expand Down Expand Up @@ -39,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -517,10 +519,11 @@ func getStreamIngestionProcessor(

flowCtx := execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{
Settings: st,
DB: kvDB,
JobRegistry: registry,
TestingKnobs: execinfra.TestingKnobs{StreamingTestingKnobs: streamingTestingKnobs},
Settings: st,
DB: kvDB,
JobRegistry: registry,
TestingKnobs: execinfra.TestingKnobs{StreamingTestingKnobs: streamingTestingKnobs},
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &evalCtx,
DiskMonitor: testDiskMonitor,
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/bulk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
Expand Down Expand Up @@ -62,6 +63,7 @@ go_test(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/tracing",
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -78,6 +79,7 @@ func MakeBulkAdder(
timestamp hlc.Timestamp,
opts kvserverbase.BulkAdderOptions,
bulkMon *mon.BytesMonitor,
sendLimiter limit.ConcurrentRequestLimiter,
) (*BufferingAdder, error) {
if opts.MinBufferSize == 0 {
opts.MinBufferSize = 32 << 20
Expand All @@ -98,6 +100,7 @@ func MakeBulkAdder(
batchTS: opts.BatchTimestamp,
writeAtBatchTS: opts.WriteAtBatchTimestamp,
mem: bulkMon.MakeBoundAccount(),
limiter: sendLimiter,
},
timestamp: timestamp,
maxBufferLimit: opts.MaxBufferSize,
Expand Down
104 changes: 74 additions & 30 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package bulk
import (
"bytes"
"context"
"math"
"sync/atomic"
"time"

Expand All @@ -28,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -55,8 +57,35 @@ var (
0,
settings.NonNegativeDuration,
)

senderConcurrency = settings.RegisterIntSetting(
settings.TenantWritable,
"bulkio.ingest.sender_concurrency_limit",
"maximum number of concurrent bulk ingest requests sent by any one sender, such as a processor in an IMPORT, index creation or RESTORE, etc (0 = no limit)",
0,
settings.NonNegativeInt,
)
)

// MakeAndRegisterConcurrencyLimiter builds the limiter used to cap concurrent
// bulk ingest sends, sized from the bulkio.ingest.sender_concurrency_limit
// setting, and installs an on-change callback on that setting which resizes
// the limiter whenever the setting changes. A setting value of 0 (documented
// on the setting as "no limit") is translated to math.MaxInt. Because
// registering the on-change callback is a side effect, this should be called
// only once during server setup.
func MakeAndRegisterConcurrencyLimiter(sv *settings.Values) limit.ConcurrentRequestLimiter {
	// effectiveLimit reads the current setting value, mapping 0 to math.MaxInt.
	effectiveLimit := func() int {
		if configured := int(senderConcurrency.Get(sv)); configured != 0 {
			return configured
		}
		return math.MaxInt
	}

	limiter := limit.MakeConcurrentRequestLimiter("bulk-send-limit", effectiveLimit())
	senderConcurrency.SetOnChange(sv, func(ctx context.Context) {
		// Re-read the setting each time it changes and resize the limiter.
		limiter.SetLimit(effectiveLimit())
	})
	return limiter
}

// SSTBatcher is a helper for bulk-adding many KVs in chunks via AddSSTable. An
// SSTBatcher can be handed KVs repeatedly and will make them into SSTs that are
// added when they reach the configured size, tracking the total added rows,
Expand All @@ -69,6 +98,7 @@ type SSTBatcher struct {
rc *rangecache.RangeCache
settings *cluster.Settings
mem mon.BoundAccount
limiter limit.ConcurrentRequestLimiter

// disallowShadowingBelow is described on roachpb.AddSSTableRequest.
disallowShadowingBelow hlc.Timestamp
Expand Down Expand Up @@ -147,6 +177,7 @@ func MakeSSTBatcher(
writeAtBatchTs bool,
splitFilledRanges bool,
mem mon.BoundAccount,
sendLimiter limit.ConcurrentRequestLimiter,
) (*SSTBatcher, error) {
b := &SSTBatcher{
name: name,
Expand All @@ -156,6 +187,7 @@ func MakeSSTBatcher(
writeAtBatchTS: writeAtBatchTs,
disableSplits: !splitFilledRanges,
mem: mem,
limiter: sendLimiter,
}
err := b.Reset(ctx)
return b, err
Expand All @@ -164,9 +196,13 @@ func MakeSSTBatcher(
// MakeStreamSSTBatcher creates a batcher configured to ingest duplicate keys
// that might be received from a cluster to cluster stream.
func MakeStreamSSTBatcher(
ctx context.Context, db *kv.DB, settings *cluster.Settings, mem mon.BoundAccount,
ctx context.Context,
db *kv.DB,
settings *cluster.Settings,
mem mon.BoundAccount,
sendLimiter limit.ConcurrentRequestLimiter,
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, settings: settings, ingestAll: true, mem: mem}
b := &SSTBatcher{db: db, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter}
err := b.Reset(ctx)
return b, err
}
Expand Down Expand Up @@ -455,12 +491,42 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
summary := b.rowCounter.BulkOpSummary
data := b.sstFile.Data()
batchTS := b.batchTS

res, err := b.limiter.Begin(ctx)
if err != nil {
return err
}

// If we're flushing due to a range boundary, we might be flushing this
// one buffer into many different ranges, and doing so one-by-one, waiting
// for each round-trip serially, could really add up; a buffer of random
// data that covers all of a 2000 range table would be flushing 2000 SSTs,
// each of which might be quite small, like 256kib, but still see, say, 50ms
// or more round-trip time. Doing those serially would then take minutes. If
// we can, instead send this SST and move on to the next while it is sent,
// we could reduce that considerably. One concern with doing so however is
// that you could potentially end up with an entire buffer's worth of SSTs
// all inflight at once, effectively doubling the memory footprint, so we
// need to reserve memory from a monitor for the sst before we move on to
// the next one; if memory is not available we'll just block on the send
// and then move on to the next send after this SST is no longer being held
// in memory.
flushAsync := reason == rangeFlush

var reserved int64
if flushAsync {
if err := b.mem.Grow(ctx, int64(cap(data))); err != nil {
log.VEventf(ctx, 3, "%s unable to reserve enough memory to flush async: %v", b.name, err)
flushAsync = false
} else {
reserved = int64(cap(data))
}
}

updatesLastRange := reason != rangeFlush
fn := func(ctx context.Context) error {
if err := b.addSSTable(ctx, batchTS, start, end, data, stats, updatesLastRange); err != nil {
b.mem.Shrink(ctx, reserved)
defer res.Release()
defer b.mem.Shrink(ctx, reserved)
if err := b.addSSTable(ctx, batchTS, start, end, data, stats, !flushAsync); err != nil {
return err
}
b.mu.Lock()
Expand All @@ -469,37 +535,15 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
atomic.AddInt64(&b.stats.batchWaitAtomic, int64(timeutil.Since(beforeFlush)))
atomic.AddInt64(&b.stats.dataSizeAtomic, int64(size))
b.mu.Unlock()
if reserved != 0 {
b.mem.Shrink(ctx, reserved)
}
return nil
}

if reason == rangeFlush {
// If we're flushing due to a range boundary, we might be flushing this
// one buffer into many different ranges, and doing so one-by-one, waiting
// for each round-trip serially, could really add up; a buffer of random
// data that covers all of a 2000 range table would be flushing 2000 SSTs,
// each of which might be quite small, like 256kib, but still see, say, 50ms
// or more round-trip time. Doing those serially would then take minutes. If
// we can, instead send this SST and move on to the next while it is sent,
// we could reduce that considerably. One concern with doing so however is
// that you could potentially end up with an entire buffer's worth of SSTs
// all inflight at once, effectively doubling the memory footprint, so we
// need to reserve memory from a monitor for the sst before we move on to
// the next one; if memory is not available we'll just block on the send
// and then move on to the next send after this SST is no longer being held
// in memory.
if flushAsync {
if b.asyncAddSSTs == (ctxgroup.Group{}) {
b.asyncAddSSTs = ctxgroup.WithContext(ctx)
}
if err := b.mem.Grow(ctx, int64(cap(data))); err != nil {
log.VEventf(ctx, 3, "%s unable to reserve enough memory to flush async: %v", b.name, err)
} else {
reserved = int64(cap(data))
b.asyncAddSSTs.GoCtx(fn)
return nil
}
b.asyncAddSSTs.GoCtx(fn)
return nil
}

return fn(ctx)
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
Expand Down Expand Up @@ -133,9 +134,10 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
mockCache.Insert(ctx, r)

ts := hlc.Timestamp{WallTime: 100}
lots := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
b, err := bulk.MakeBulkAdder(
ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, lots,
ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
)
if err != nil {
t.Fatal(err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {

clusterIDForSQL := cfg.rpcContext.LogicalClusterID

bulkSenderLimiter := bulk.MakeAndRegisterConcurrencyLimiter(&cfg.Settings.SV)

// Set up the DistSQL server.
distSQLCfg := execinfra.ServerConfig{
AmbientContext: cfg.AmbientCtx,
Expand All @@ -613,6 +615,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ParentDiskMonitor: cfg.TempStorageConfig.Mon,
BackfillerMonitor: backfillMemoryMonitor,
BackupMonitor: backupMemoryMonitor,
BulkSenderLimiter: bulkSenderLimiter,

ParentMemoryMonitor: rootSQLMemoryMonitor,
BulkAdder: func(
Expand All @@ -621,7 +624,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// Attach a child memory monitor to enable control over the BulkAdder's
// memory usage.
bulkMon := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "bulk-adder-monitor")
return bulk.MakeBulkAdder(ctx, db, cfg.distSender.RangeDescriptorCache(), cfg.Settings, ts, opts, bulkMon)
return bulk.MakeBulkAdder(ctx, db, cfg.distSender.RangeDescriptorCache(), cfg.Settings, ts, opts, bulkMon, bulkSenderLimiter)
},

Metrics: &distSQLMetrics,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/buildutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/metric",
Expand Down
Loading

0 comments on commit a07393c

Please sign in to comment.