From eb3e4d843ae9a8bd70d53443a7eba75e193cfbe3 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 1 Nov 2022 14:49:59 -0400 Subject: [PATCH] server/cdc: add elastic CPU control to CDC event processing Previously, the SQL layer did not have access to admission control via `kvadmission.Controller`. This change makes the controller available inside the SQL server config. Note that it has not been made available in tenant servers; this is left as a TODO. Previously, the CPU-bound work of CDC event processing (encoding/decoding rows) had the potential to consume a lot of CPU and disrupt foreground SQL traffic. This change adds elastic CPU control to event processing so that it does not use excessive CPU and starve foreground traffic. Fixes: https://github.com/cockroachdb/cockroach/issues/90089 Release note: None --- .../settings/settings-for-tenants.txt | 4 +- docs/generated/settings/settings.html | 4 +- pkg/ccl/changefeedccl/BUILD.bazel | 2 + pkg/ccl/changefeedccl/bench_test.go | 2 +- .../changefeedccl/changefeedbase/settings.go | 19 ++++++- pkg/ccl/changefeedccl/event_processing.go | 49 +++++++++++++++++-- pkg/server/tenant.go | 6 +++ 7 files changed, 76 insertions(+), 10 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index d86829f9dd48..95cc62b34191 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -11,8 +11,8 @@ bulkio.backup.read_timeout duration 5m0s amount of time after which a read attem bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes -changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer -changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value +changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer +changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. 
for experimental/core changefeeds and changefeeds using parquet format, this is disabled changefeed.fast_gzip.enabled boolean true use fast gzip implementation changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 2e984b154339..9832b609a053 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -17,8 +17,8 @@ bulkio.backup.read_with_priority_afterduration1m0samount of time since the read-as-of time above which a BACKUP should use priority when retrying reads bulkio.stream_ingestion.minimum_flush_intervalduration5sthe minimum timestamp between flushes; flushes may still occur if internal buffers fill up changefeed.balance_range_distribution.enablebooleanfalseif enabled, the ranges are balanced equally among all nodes -changefeed.event_consumer_worker_queue_sizeinteger16if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer -changefeed.event_consumer_workersinteger0the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value +changefeed.event_consumer_worker_queue_sizeinteger16if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer +changefeed.event_consumer_workersinteger0the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled changefeed.fast_gzip.enabledbooleantrueuse fast gzip implementation changefeed.node_throttle_configstringspecifies node level throttling configuration for all changefeeeds changefeed.schema_feed.read_with_priority_afterduration1m0sretry with high priority if we were not able to read descriptors for too long; 0 disables diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 5cccc7d139bf..747c175361b8 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -102,6 +102,8 @@ go_library( "//pkg/sql/sqlutil", "//pkg/sql/types", "//pkg/util", + "//pkg/util/admission", + "//pkg/util/admission/admissionpb", "//pkg/util/bitarray", "//pkg/util/bufalloc", "//pkg/util/cache", diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 1d4d3916b025..7e17fd45a32a 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -250,7 +250,7 @@ func createBenchmarkChangefeed( serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, nil, sf, initialHighWater, sink, encoder, makeChangefeedConfigFromJobDetails(details), - execinfrapb.Expression{}, TestingKnobs{}, nil) + execinfrapb.Expression{}, TestingKnobs{}, nil, nil) if err != nil { return nil, nil, err diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 22841234c7e1..42bdf48e0c99 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -242,7 +242,8 @@ var EventConsumerWorkers = settings.RegisterIntSetting( 
settings.TenantWritable, "changefeed.event_consumer_workers", "the number of workers to use when processing events: <0 disables, "+ - "0 assigns a reasonable default, >0 assigns the setting value", + "0 assigns a reasonable default, >0 assigns the setting value. for experimental/core "+ + "changefeeds and changefeeds using parquet format, this is disabled", 0, ).WithPublic() @@ -250,8 +251,22 @@ var EventConsumerWorkers = settings.RegisterIntSetting( var EventConsumerWorkerQueueSize = settings.RegisterIntSetting( settings.TenantWritable, "changefeed.event_consumer_worker_queue_size", - "if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events"+ + "if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events "+ "which a worker can buffer", int64(util.ConstantWithMetamorphicTestRange("changefeed.event_consumer_worker_queue_size", 16, 0, 16)), settings.NonNegativeInt, ).WithPublic() + +// EventConsumerPacerRequestSize specifies how often (measured in CPU time) +// event consumer workers request CPU time from admission control. +// For example, every N milliseconds of CPU work, request N more +// milliseconds of CPU time. +var EventConsumerPacerRequestSize = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.event_consumer_pacer_request_size", + "an event consumer worker will perform a blocking request for CPU time "+ + "before consuming events. after fully utilizing this CPU time, it will "+ + "request more", + 50*time.Millisecond, + settings.PositiveDuration, +) diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index de387e1f025c..05d7e0bcb283 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -18,10 +18,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -62,6 +65,17 @@ type kvEventToRowConsumer struct { topicDescriptorCache map[TopicIdentifier]TopicDescriptor topicNamer *TopicNamer + + // This pacer is used to incorporate event consumption into elastic CPU + // control. This helps ensure that event encoding/decoding does not throttle + // foreground SQL traffic. + // + // Note that for the pacer to function correctly, + // kvEventToRowConsumer.ConsumeEvent must be called by the same goroutine in a + // tight loop. + // + // The pacer is closed by kvEventToRowConsumer.Close. 
+ pacer *admission.Pacer } func newEventConsumer( @@ -85,6 +99,8 @@ func newEventConsumer( return nil, nil, err } + pacerRequestUnit := changefeedbase.EventConsumerPacerRequestSize.Get(&cfg.Settings.SV) + makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) { var err error encoder, err := getEncoder(encodingOpts, feed.Targets) @@ -100,22 +116,39 @@ } } + tenantID, ok := roachpb.TenantFromContext(ctx) + if !ok { + tenantID = roachpb.SystemTenantID + } + pacer := cfg.PacerMaker.NewPacer( + pacerRequestUnit, + admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.BulkNormalPri, + CreateTime: timeutil.Now().UnixNano(), + BypassAdmission: false, + }, + ) + return newKVEventToRowConsumer(ctx, cfg, evalCtx, frontier, cursor, s, - encoder, details, expr, knobs, topicNamer) + encoder, details, expr, knobs, topicNamer, pacer) } - // TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds numWorkers := changefeedbase.EventConsumerWorkers.Get(&cfg.Settings.SV) if numWorkers == 0 { // Pick a reasonable default. numWorkers = defaultNumWorkers() } + // The descriptions for event_consumer_worker settings should also be updated + // when these TODOs are completed. + // + // TODO (ganeshb) Add support for parallel encoding when using parquet. // We cannot have a separate encoder and sink for parquet format (see // parquet_sink_cloudstorage.go). Because of this the current nprox solution // does not work for parquet format. // - //TODO (ganeshb) Add support for parallel encoding + // TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds. if numWorkers <= 1 || isSinkless || encodingOpts.Format == changefeedbase.OptFormatParquet { c, err := makeConsumer(sink, spanFrontier) if err != nil { @@ -174,6 +207,7 @@ func newKVEventToRowConsumer( expr execinfrapb.Expression, knobs TestingKnobs, topicNamer *TopicNamer, + pacer *admission.Pacer, ) (*kvEventToRowConsumer, error) { includeVirtual := details.Opts.IncludeVirtual() keyOnly := details.Opts.KeyOnly() @@ -215,6 +249,7 @@ evaluator: evaluator, safeExpr: safeExpr, encodingFormat: encodingOpts.Format, + pacer: pacer, }, nil } @@ -242,6 +277,13 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even return errors.AssertionFailedf("expected kv ev, got %v", ev.Type()) } + // Request CPU time to use for event consumption, blocking if this time is + // unavailable. If there is unused CPU time left from the last call to + // Pace, then use that time instead of blocking. + if err := c.pacer.Pace(ctx); err != nil { + return err + } + schemaTimestamp := ev.KV().Value.Timestamp prevSchemaTimestamp := schemaTimestamp mvccTimestamp := ev.MVCCTimestamp() @@ -382,6 +424,7 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even -// Close is a noop for the kvEventToRowConsumer because it -// has no goroutines in flight. +// Close closes the pacer. The consumer itself has no goroutines in +// flight to shut down. 
func (c *kvEventToRowConsumer) Close() error { + c.pacer.Close() return nil } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index b89c387fc85b..7975ccb29429 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -13,6 +13,7 @@ package server import ( "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/util/admission" "net/http" "os" "path/filepath" @@ -876,6 +877,10 @@ func makeTenantSQLServerArgs( eventsServer.TestingKnobs = knobs.(obs.EventServerTestingKnobs) } + // TODO(jayant): generate admission.NewGrantCoordinators and pass them + // as server args, following the same pattern as server.NewServer. + var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil + return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil), @@ -925,6 +930,7 @@ func makeTenantSQLServerArgs( grpc: grpcServer, eventsServer: eventsServer, externalStorageBuilder: esb, + pacerMaker: noopElasticCPUGrantCoord, }, nil }
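--
For illustration only (not part of the patch): a minimal sketch of the pacing pattern the diff wires into kvEventToRowConsumer. The pacerMaker interface, newConsumerPacer, consumeLoop, and encodeEvent names below are hypothetical stand-ins; admission.Pacer with Pace/Close, admission.WorkInfo, admissionpb.BulkNormalPri, and roachpb.TenantFromContext are the APIs exercised by the hunks above.

    package cdcpacing

    import (
        "context"
        "time"

        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/util/admission"
        "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
        "github.com/cockroachdb/cockroach/pkg/util/timeutil"
    )

    // pacerMaker is a consumer-side view of the factory the server config
    // exposes as cfg.PacerMaker (backed by the elastic CPU grant coordinator).
    type pacerMaker interface {
        NewPacer(unit time.Duration, wi admission.WorkInfo) *admission.Pacer
    }

    // newConsumerPacer mirrors the construction in newEventConsumer: resolve
    // the tenant, then request CPU time in fixed-size slices at bulk-normal
    // priority so changefeed encoding yields to foreground SQL traffic.
    func newConsumerPacer(ctx context.Context, pm pacerMaker, unit time.Duration) *admission.Pacer {
        tenantID, ok := roachpb.TenantFromContext(ctx)
        if !ok {
            tenantID = roachpb.SystemTenantID
        }
        return pm.NewPacer(unit, admission.WorkInfo{
            TenantID:        tenantID,
            Priority:        admissionpb.BulkNormalPri,
            CreateTime:      timeutil.Now().UnixNano(),
            BypassAdmission: false,
        })
    }

    // consumeLoop shows where Pace sits: one blocking admission request per
    // slice of CPU-bound work, always issued from the same goroutine in a
    // tight loop, as the pacer's contract requires.
    func consumeLoop(ctx context.Context, p *admission.Pacer, events <-chan []byte) error {
        defer p.Close() // release pacer resources when the worker stops
        for ev := range events {
            // Blocks only once the previously granted CPU time is used up;
            // leftover time from the last grant is consumed first.
            if err := p.Pace(ctx); err != nil {
                return err
            }
            encodeEvent(ev) // stand-in for the CPU-bound encode/decode work
        }
        return nil
    }

    func encodeEvent([]byte) {} // hypothetical CPU-bound work

With the default changefeed.event_consumer_pacer_request_size of 50ms, this pattern costs roughly one admission request per 50ms of CPU work per worker, keeping pacing overhead low while letting the scheduler throttle elastic work.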