server/cdc: add elastic CPU control to CDC event processing
Previously, the SQL layer did not have access to admission control
via `kvadmission.Controller`. This change makes the controller
available inside the SQL server config. Note that it has not been
made available in tenant servers; this is left as a todo.

Previously, the CPU-bound work of CDC event processing (encoding /
decoding rows) had the potential to consume a lot of CPU and
disrupt foreground SQL traffic. This change adds elastic CPU control
to event processing so that it does not consume excessive CPU and
starve foreground traffic.

Fixes: cockroachdb#90089

Release note: None
jayshrivastava committed Nov 10, 2022
1 parent b31c9bd commit 58b9ab3
Showing 6 changed files with 69 additions and 8 deletions.
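For orientation, here is a minimal sketch of the pacing pattern this commit introduces, simplified from the event_processing.go changes below. The function name and the elided encode/emit step are illustrative assumptions, not the actual implementation; the admission-control calls (NewPacer, Pace, Close) and the WorkInfo fields are the ones used in the diff.

    package changefeedsketch // illustrative; the real code lives in pkg/ccl/changefeedccl

    import (
        "context"
        "time"

        "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/util/admission"
        "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
        "github.com/cockroachdb/cockroach/pkg/util/timeutil"
    )

    // processEvents is a hypothetical worker loop: before each unit of CPU-heavy
    // work (event encoding/decoding), the worker asks admission control for a
    // slice of CPU time and blocks until it is granted.
    func processEvents(ctx context.Context, pm admission.PacerMaker, events <-chan kvevent.Event) error {
        pacer := pm.NewPacer(
            50*time.Millisecond, // changefeed.event_consumer_pacer_request_size
            admission.WorkInfo{
                TenantID:   roachpb.SystemTenantID,
                Priority:   admissionpb.BulkNormalPri,
                CreateTime: timeutil.Now().UnixNano(),
            })
        defer pacer.Close()

        for e := range events {
            // Blocks until elastic CPU time is granted; unused time from the
            // previous grant is consumed first.
            if err := pacer.Pace(ctx); err != nil {
                return err
            }
            _ = e // encode/decode and emit the event here (elided in this sketch)
        }
        return nil
    }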
5 changes: 3 additions & 2 deletions docs/generated/settings/settings-for-tenants.txt
@@ -11,8 +11,9 @@ bulkio.backup.read_timeout duration 5m0s amount of time after which a read attem
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value
changefeed.event_consumer_pacer_request_size duration 9ms an event consumer worker will perform a blocking request for CPU time before consuming events. after fully utilizing this CPU time, it will request more
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
5 changes: 3 additions & 2 deletions docs/generated/settings/settings.html
@@ -17,8 +17,9 @@
<tr><td><code>bulkio.backup.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>changefeed.balance_range_distribution.enable</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes</td></tr>
<tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer</td></tr>
<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value</td></tr>
<tr><td><code>changefeed.event_consumer_pacer_request_size</code></td><td>duration</td><td><code>9ms</code></td><td>an event consumer worker will perform a blocking request for CPU time before consuming events. after fully utilizing this CPU time, it will request more</td></tr>
<tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td></tr>
<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td></tr>
<tr><td><code>changefeed.fast_gzip.enabled</code></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td></tr>
<tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td></tr>
<tr><td><code>changefeed.schema_feed.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td></tr>
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -102,6 +102,8 @@ go_library(
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/cache",
19 changes: 17 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -242,16 +242,31 @@ var EventConsumerWorkers = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.event_consumer_workers",
"the number of workers to use when processing events: <0 disables, "+
"0 assigns a reasonable default, >0 assigns the setting value",
"0 assigns a reasonable default, >0 assigns the setting value. for experimental/core "+
"changefeeds and changefeeds using parquet format, this is disabled",
0,
).WithPublic()

// EventConsumerWorkerQueueSize specifies the maximum number of events a worker buffer.
var EventConsumerWorkerQueueSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.event_consumer_worker_queue_size",
"if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events"+
"if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events "+
"which a worker can buffer",
int64(util.ConstantWithMetamorphicTestRange("changefeed.event_consumer_worker_queue_size", 16, 0, 16)),
settings.NonNegativeInt,
).WithPublic()

// EventConsumerPacerRequestSize specifies how often (measured in CPU time)
// that event consumer workers request CPU time from admission control.
// For example, every N milliseconds of CPU work, request N more
// milliseconds of CPU time.
var EventConsumerPacerRequestSize = settings.RegisterDurationSetting(
settings.TenantWritable,
"changefeed.event_consumer_pacer_request_size",
"an event consumer worker will perform a blocking request for CPU time "+
"before consuming events. after fully utilizing this CPU time, it will "+
"request more",
50*time.Millisecond,
settings.PositiveDuration,
).WithPublic()
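For reference, a hedged sketch of how the two consumer settings above would be read when the consumer is constructed. The helper function is hypothetical (the actual plumbing lives in event_processing.go, largely outside the hunks shown below), but the Get pattern matches the one used elsewhere in this commit.

    package changefeedsketch // illustrative only

    import (
        "time"

        "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
        "github.com/cockroachdb/cockroach/pkg/settings"
    )

    // consumerPacingParams is a hypothetical helper showing how the worker count
    // and the pacer request unit are obtained from cluster settings.
    func consumerPacingParams(sv *settings.Values) (numWorkers int64, pacerRequestUnit time.Duration) {
        numWorkers = changefeedbase.EventConsumerWorkers.Get(sv)
        pacerRequestUnit = changefeedbase.EventConsumerPacerRequestSize.Get(sv)
        return numWorkers, pacerRequestUnit
    }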
40 changes: 38 additions & 2 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -13,15 +13,19 @@ import (
"hash"
"hash/crc32"
"runtime"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -104,18 +108,19 @@ func newEventConsumer(
encoder, details, expr, knobs, topicNamer)
}

// TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds
numWorkers := changefeedbase.EventConsumerWorkers.Get(&cfg.Settings.SV)
if numWorkers == 0 {
// Pick a reasonable default.
numWorkers = defaultNumWorkers()
}

// TODO (ganeshb) Add support for parallel encoding when using parquet.
// We cannot have a separate encoder and sink for parquet format (see
// parquet_sink_cloudstorage.go). Because of this the current nprox solution
// does not work for parquet format.
//
//TODO (ganeshb) Add support for parallel encoding
// TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds.
// The setting description should also be updated.
if numWorkers <= 1 || isSinkless || encodingOpts.Format == changefeedbase.OptFormatParquet {
c, err := makeConsumer(sink, spanFrontier)
if err != nil {
@@ -135,6 +140,7 @@
workerCh: make([]chan kvevent.Event, numWorkers),
workerChSize: changefeedbase.EventConsumerWorkerQueueSize.Get(&cfg.Settings.SV),
spanFrontier: spanFrontier,
pacerMaker: cfg.PacerMaker,
}
ss := &safeSink{wrapped: sink, beforeFlush: c.Flush}
c.makeConsumer = func() (eventConsumer, error) {
@@ -439,6 +445,13 @@ type parallelEventConsumer struct {
// that spawned this event consumer.
spanFrontier *span.Frontier

// pacerMaker is used to create a kvadmission.Pacer
// which is used to incorporate event consumption to elastic CPU
// control. This helps ensure that event encoding/decoding
// does not throttle foreground SQL traffic.
pacerMaker admission.PacerMaker
pacerRequestUnit time.Duration

// termErr and termCh are used to save the first error that occurs
// in any worker and signal all workers to stop.
//
@@ -527,6 +540,20 @@ func (c *parallelEventConsumer) startWorkers() error {
func (c *parallelEventConsumer) workerLoop(
ctx context.Context, consumer eventConsumer, id int64,
) error {
tenantID, ok := roachpb.TenantFromContext(ctx)
if !ok {
tenantID = roachpb.SystemTenantID
}
pacer := c.pacerMaker.NewPacer(
c.pacerRequestUnit,
admission.WorkInfo{
TenantID: tenantID,
Priority: admissionpb.BulkNormalPri,
CreateTime: timeutil.Now().UnixNano(),
BypassAdmission: false,
})
defer pacer.Close()

for {
select {
case <-ctx.Done():
@@ -538,7 +565,16 @@
defer c.mu.Unlock()
return c.mu.termErr
case e := <-c.workerCh[id]:
// Request CPU time to use for event consumption, block if this time is
// unavailable. If there is unused CPU time left from the last call to
// Pace, then use that time instead of blocking.
if err := pacer.Pace(ctx); err != nil {
c.decInFlight()
return c.setWorkerError(err)
}

if err := consumer.ConsumeEvent(ctx, e); err != nil {
c.decInFlight()
return c.setWorkerError(err)
}
c.decInFlight()
6 changes: 6 additions & 0 deletions pkg/server/tenant.go
@@ -13,6 +13,7 @@ package server
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"net/http"
"os"
"path/filepath"
@@ -876,6 +877,10 @@ func makeTenantSQLServerArgs(
eventsServer.TestingKnobs = knobs.(obs.EventServerTestingKnobs)
}

// TODO(jayant): generate admission.NewGrantCoordinators and pass them
// as server args, following the same pattern as server.NewServer.
var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil

return sqlServerArgs{
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil),
@@ -925,6 +930,7 @@
grpc: grpcServer,
eventsServer: eventsServer,
externalStorageBuilder: esb,
pacerMaker: noopElasticCPUGrantCoord,
}, nil
}

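On the tenant path the PacerMaker is a nil *admission.ElasticCPUGrantCoordinator, so CDC pacing is presumably a no-op on tenant servers until the TODO above is addressed. That only works if nil receivers are tolerated. Below is a generic illustration of that nil-safe pattern in Go; it is not the actual admission package implementation.

    package pacersketch // generic illustration; the real types live in pkg/util/admission

    import (
        "context"
        "time"
    )

    // fakePacer stands in for a pacer handed out by a grant coordinator.
    type fakePacer struct{}

    // Pace on a nil pacer is a no-op, so callers handed a nil pacer
    // (e.g. on tenant servers) never block on admission control.
    func (p *fakePacer) Pace(ctx context.Context) error {
        if p == nil {
            return nil
        }
        // ...request elastic CPU time from admission control here...
        return nil
    }

    // Close on a nil pacer is likewise a no-op.
    func (p *fakePacer) Close() {
        if p == nil {
            return
        }
        // ...return any unused CPU grant here...
    }

    // fakeCoordinator stands in for the grant coordinator.
    type fakeCoordinator struct{}

    // NewPacer on a nil coordinator hands out nil (no-op) pacers.
    func (c *fakeCoordinator) NewPacer(requestUnit time.Duration) *fakePacer {
        if c == nil {
            return nil
        }
        return &fakePacer{}
    }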
