diff --git a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel index e03044faa0bb..4bb125e722a1 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel +++ b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel @@ -15,6 +15,7 @@ go_library( deps = [ "//pkg/jobs/joberror", "//pkg/jobs/jobspb", + "//pkg/kv/kvclient/kvcoord", "//pkg/roachpb", "//pkg/settings", "//pkg/sql", diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go index a10415b4ad4c..22ab91666e03 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/errors.go +++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go @@ -14,6 +14,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/jobs/joberror" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" @@ -108,14 +109,22 @@ func IsRetryableError(err error) bool { return true } + // During node shutdown it is possible for all outgoing transports used by + // the kvfeed to expire, producing a SendError that the node is still able + // to propagate to the frontier. This has been known to happen during + // cluster upgrades. This scenario should not fail the changefeed. + if kvcoord.IsSendError(err) { + return true + } + // TODO(knz): this is a bad implementation. Make it go away // by avoiding string comparisons. + // If a RetryableError occurs on a remote node, DistSQL serializes it such + // that we can't recover the structure and we have to rely on this + // unfortunate string comparison. errStr := err.Error() - if strings.Contains(errStr, retryableErrorString) { - // If a RetryableError occurs on a remote node, DistSQL serializes it such - // that we can't recover the structure and we have to rely on this - // unfortunate string comparison. + if strings.Contains(errStr, retryableErrorString) || strings.Contains(errStr, kvcoord.SendErrorString) { return true } diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index f609a133c77a..16619198f866 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -494,12 +494,6 @@ func (f *kvFeed) runUntilTableEvent( return nil } else if tErr := (*errEndTimeReached)(nil); errors.As(err, &tErr) { return err - } else if kvcoord.IsSendError(err) { - // During node shutdown it is possible for all outgoing transports used by - // the kvfeed to expire, producing a SendError that the node is still able - // to propagate to the frontier. This has been known to happen during - // cluster upgrades. This scenario should not fail the changefeed. 
- return changefeedbase.MarkRetryableError(err) } else { return err } diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index fdc508143029..28e66b5fcf1e 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "activerecord.go", "activerecord_blocklist.go", "admission_control.go", + "admission_control_elastic_backup.go", "admission_control_multi_store_overload.go", "admission_control_snapshot_overload.go", "admission_control_tpcc_overload.go", diff --git a/pkg/cmd/roachtest/tests/admission_control.go b/pkg/cmd/roachtest/tests/admission_control.go index b8e092247826..4d3835f14493 100644 --- a/pkg/cmd/roachtest/tests/admission_control.go +++ b/pkg/cmd/roachtest/tests/admission_control.go @@ -28,6 +28,7 @@ func registerAdmission(r registry.Registry) { // roachperf. Need to munge with histogram data to compute % test run spent // over some latency threshold. Will be Useful to track over time. + registerElasticControlForBackups(r) registerMultiStoreOverload(r) registerSnapshotOverload(r) registerTPCCOverload(r) diff --git a/pkg/cmd/roachtest/tests/admission_control_elastic_backup.go b/pkg/cmd/roachtest/tests/admission_control_elastic_backup.go new file mode 100644 index 000000000000..77b09c9b7dc6 --- /dev/null +++ b/pkg/cmd/roachtest/tests/admission_control_elastic_backup.go @@ -0,0 +1,110 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" +) + +// This test sets up a 3-node CRDB cluster on 8vCPU machines running +// 1000-warehouse TPC-C with an aggressive (every 20m) full backup schedule. +// We've observed latency spikes during backups because of its CPU-heavy nature +// -- it can elevate CPU scheduling latencies which in turn translates to an +// increase in foreground latency. In #86638 we introduced admission control +// mechanisms to dynamically pace such work while maintaining acceptable CPU +// scheduling latencies (sub millisecond p99s). This roachtest exercises that +// machinery. +// +// TODO(irfansharif): Add libraries to automatically spit out the degree to +// which {CPU-scheduler,foreground} latencies are protected and track this data +// in roachperf. +func registerElasticControlForBackups(r registry.Registry) { + r.Add(registry.TestSpec{ + Name: "admission-control/elastic-backup", + Owner: registry.OwnerAdmissionControl, + // TODO(irfansharif): After two weeks of nightly baking time, reduce + // this to a weekly cadence. This is a long-running test and serves only + // as a coarse-grained benchmark. 
+ // Tags: []string{`weekly`}, + Cluster: r.MakeClusterSpec(4, spec.CPU(8)), + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + if c.Spec().NodeCount < 4 { + t.Fatalf("expected at least 4 nodes, found %d", c.Spec().NodeCount) + } + + crdbNodes := c.Spec().NodeCount - 1 + workloadNode := crdbNodes + 1 + numWarehouses, workloadDuration, estimatedSetupTime := 1000, 90*time.Minute, 10*time.Minute + if c.IsLocal() { + numWarehouses, workloadDuration, estimatedSetupTime = 1, time.Minute, 2*time.Minute + } + + promCfg := &prometheus.Config{} + promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]). + WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()). + WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()). + WithGrafanaDashboard("http://go.crdb.dev/p/backup-admission-control-grafana"). + WithScrapeConfigs( + prometheus.MakeWorkloadScrapeConfig("workload", "/", + makeWorkloadScrapeNodes( + c.Node(workloadNode).InstallNodes()[0], + []workloadInstance{{nodes: c.Node(workloadNode)}}, + ), + ), + ) + + if t.SkipInit() { + t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, time.Minute)) + } else { + t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, 10*time.Minute)) + } + + runTPCC(ctx, t, c, tpccOptions{ + Warehouses: numWarehouses, + Duration: workloadDuration, + SetupType: usingImport, + EstimatedSetupTime: estimatedSetupTime, + SkipPostRunCheck: true, + ExtraSetupArgs: "--checks=false", + PrometheusConfig: promCfg, + During: func(ctx context.Context) error { + db := c.Conn(ctx, t.L(), crdbNodes) + defer db.Close() + + t.Status(fmt.Sprintf("during: enabling admission control (<%s)", 30*time.Second)) + setAdmissionControl(ctx, t, c, true) + + m := c.NewMonitor(ctx, c.Range(1, crdbNodes)) + m.Go(func(ctx context.Context) error { + t.Status(fmt.Sprintf("during: creating full backup schedule to run every 20m (<%s)", time.Minute)) + _, err := db.ExecContext(ctx, + `CREATE SCHEDULE FOR BACKUP INTO $1 RECURRING '*/20 * * * *' FULL BACKUP ALWAYS WITH SCHEDULE OPTIONS ignore_existing_backups;`, + "gs://cockroachdb-backup-testing/"+c.Name()+"?AUTH=implicit", + ) + return err + }) + m.Wait() + + t.Status(fmt.Sprintf("during: waiting for workload to finish (<%s)", workloadDuration)) + return nil + }, + }) + }, + }) +} diff --git a/pkg/cmd/roachtest/tests/admission_control_snapshot_overload.go b/pkg/cmd/roachtest/tests/admission_control_snapshot_overload.go index 216b58fbd26d..d34186c8081c 100644 --- a/pkg/cmd/roachtest/tests/admission_control_snapshot_overload.go +++ b/pkg/cmd/roachtest/tests/admission_control_snapshot_overload.go @@ -89,17 +89,19 @@ func registerSnapshotOverload(r registry.Registry) { } t.Status(fmt.Sprintf("setting up prometheus/grafana (<%s)", 2*time.Minute)) - promCfg := &prometheus.Config{} - promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]) - promCfg.WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()) - promCfg.WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()) - promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, prometheus.MakeWorkloadScrapeConfig("workload", - "/", makeWorkloadScrapeNodes(c.Node(workloadNode).InstallNodes()[0], []workloadInstance{ - {nodes: c.Node(workloadNode)}, - }))) - promCfg.WithGrafanaDashboard("http://go.crdb.dev/p/snapshot-admission-control-grafana") - _, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, nil) - defer cleanupFunc() + { + promCfg := &prometheus.Config{} + 
promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]) + promCfg.WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()) + promCfg.WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()) + promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, prometheus.MakeWorkloadScrapeConfig("workload", + "/", makeWorkloadScrapeNodes(c.Node(workloadNode).InstallNodes()[0], []workloadInstance{ + {nodes: c.Node(workloadNode)}, + }))) + promCfg.WithGrafanaDashboard("http://go.crdb.dev/p/snapshot-admission-control-grafana") + _, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, nil) + defer cleanupFunc() + } var constraints []string for i := 1; i <= crdbNodes; i++ { diff --git a/pkg/cmd/roachtest/tests/tpcc.go b/pkg/cmd/roachtest/tests/tpcc.go index 3d06ad4fd750..fead65e29d9b 100644 --- a/pkg/cmd/roachtest/tests/tpcc.go +++ b/pkg/cmd/roachtest/tests/tpcc.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -52,13 +53,14 @@ const ( ) type tpccOptions struct { - Warehouses int - ExtraRunArgs string - ExtraSetupArgs string - Chaos func() Chaos // for late binding of stopper - During func(context.Context) error // for running a function during the test - Duration time.Duration // if zero, TPCC is not invoked - SetupType tpccSetupType + Warehouses int + ExtraRunArgs string + ExtraSetupArgs string + Chaos func() Chaos // for late binding of stopper + During func(context.Context) error // for running a function during the test + Duration time.Duration // if zero, TPCC is not invoked + SetupType tpccSetupType + EstimatedSetupTime time.Duration // PrometheusConfig, if set, overwrites the default prometheus config settings. PrometheusConfig *prometheus.Config // DisablePrometheus will force prometheus to not start up. @@ -86,6 +88,8 @@ type tpccOptions struct { // // TODO(tbg): remove this once https://github.com/cockroachdb/cockroach/issues/74705 is completed. EnableCircuitBreakers bool + // SkipPostRunCheck, if set, skips post TPC-C run checks. + SkipPostRunCheck bool } type workloadInstance struct { @@ -125,6 +129,7 @@ func setupTPCC( // Randomize starting with encryption-at-rest enabled. crdbNodes = c.Range(1, c.Spec().NodeCount-1) workloadNode = c.Node(c.Spec().NodeCount) + if c.IsLocal() { opts.Warehouses = 1 } @@ -134,10 +139,12 @@ func setupTPCC( // NB: workloadNode also needs ./cockroach because // of `./cockroach workload` for usingImport. c.Put(ctx, t.Cockroach(), "./cockroach", c.All()) - // We still use bare workload, though we could likely replace - // those with ./cockroach workload as well. 
- c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode) - c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), crdbNodes) + settings := install.MakeClusterSettings() + if c.IsLocal() { + settings.Env = append(settings.Env, "COCKROACH_SCAN_INTERVAL=200ms") + settings.Env = append(settings.Env, "COCKROACH_SCAN_MAX_IDLE_TIME=5ms") + } + c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, crdbNodes) } } @@ -149,29 +156,39 @@ func setupTPCC( _, err := db.Exec(`SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '15s'`) require.NoError(t, err) } - err := WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), crdbNodes[0])) - require.NoError(t, err) + + if t.SkipInit() { + return + } + + require.NoError(t, WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), crdbNodes[0]))) + + estimatedSetupTimeStr := "" + if opts.EstimatedSetupTime != 0 { + estimatedSetupTimeStr = fmt.Sprintf(" (<%s)", opts.EstimatedSetupTime) + } + switch opts.SetupType { case usingExistingData: // Do nothing. case usingImport: - t.Status("loading fixture") + t.Status("loading fixture" + estimatedSetupTimeStr) c.Run(ctx, crdbNodes[:1], tpccImportCmd(opts.Warehouses, opts.ExtraSetupArgs)) case usingInit: - t.Status("initializing tables") + t.Status("initializing tables" + estimatedSetupTimeStr) extraArgs := opts.ExtraSetupArgs if !t.BuildVersion().AtLeast(version.MustParse("v20.2.0")) { extraArgs += " --deprecated-fk-indexes" } cmd := fmt.Sprintf( - "./workload init tpcc --warehouses=%d %s {pgurl:1}", + "./cockroach workload init tpcc --warehouses=%d %s {pgurl:1}", opts.Warehouses, extraArgs, ) c.Run(ctx, workloadNode, cmd) default: t.Fatal("unknown tpcc setup type") } - t.Status("") + t.Status("finished tpc-c setup") }() return crdbNodes, workloadNode } @@ -223,7 +240,6 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio rampDuration = 30 * time.Second } crdbNodes, workloadNode := setupTPCC(ctx, t, c, opts) - t.Status("waiting") m := c.NewMonitor(ctx, crdbNodes) for i := range workloadInstances { // Make a copy of i for the goroutine. @@ -235,7 +251,8 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio if len(workloadInstances) > 1 { statsPrefix = fmt.Sprintf("workload_%d.", i) } - t.WorkerStatus(fmt.Sprintf("running tpcc idx %d on %s", i, pgURLs[i])) + t.WorkerStatus(fmt.Sprintf("running tpcc worker=%d warehouses=%d ramp=%s duration=%s on %s (<%s)", + i, opts.Warehouses, rampDuration, opts.Duration, pgURLs[i], time.Minute)) cmd := fmt.Sprintf( "./cockroach workload run tpcc --warehouses=%d --histograms="+t.PerfArtifactsDir()+"/%sstats.json "+ opts.ExtraRunArgs+" --ramp=%s --duration=%s --prometheus-port=%d --pprofport=%d %s %s", @@ -260,8 +277,10 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio } m.Wait() - c.Run(ctx, workloadNode, fmt.Sprintf( - "./cockroach workload check tpcc --warehouses=%d {pgurl:1}", opts.Warehouses)) + if !opts.SkipPostRunCheck { + c.Run(ctx, workloadNode, fmt.Sprintf( + "./cockroach workload check tpcc --warehouses=%d {pgurl:1}", opts.Warehouses)) + } // Check no errors from metrics. 
if ep != nil { @@ -1447,15 +1466,21 @@ func setupPrometheusForRoachtest( } } if c.IsLocal() { - t.Skip("skipping test as prometheus is needed, but prometheus does not yet work locally") + t.Status("ignoring prometheus setup given --local was specified") return nil, func() {} } - if err := c.StartGrafana(ctx, t.L(), cfg); err != nil { + t.Status(fmt.Sprintf("setting up prometheus/grafana (<%s)", 2*time.Minute)) + + quietLogger, err := t.L().ChildLogger("start-grafana", logger.QuietStdout, logger.QuietStderr) + if err != nil { + t.Fatal(err) + } + if err := c.StartGrafana(ctx, quietLogger, cfg); err != nil { t.Fatal(err) } cleanupFunc := func() { - if err := c.StopGrafana(ctx, t.L(), t.ArtifactsDir()); err != nil { + if err := c.StopGrafana(ctx, quietLogger, t.ArtifactsDir()); err != nil { t.L().ErrorfCtx(ctx, "Error(s) shutting down prom/grafana %s", err) } } diff --git a/pkg/cmd/roachtest/tests/util.go b/pkg/cmd/roachtest/tests/util.go index 71ab5f628b28..99924ef89eeb 100644 --- a/pkg/cmd/roachtest/tests/util.go +++ b/pkg/cmd/roachtest/tests/util.go @@ -37,7 +37,7 @@ func WaitFor3XReplication(ctx context.Context, t test.Test, db *gosql.DB) error func WaitForReplication( ctx context.Context, t test.Test, db *gosql.DB, replicationFactor int, ) error { - t.L().Printf("waiting for initial up-replication...") + t.L().Printf("waiting for initial up-replication... (<%s)", 2*time.Minute) tStart := timeutil.Now() var oldN int for { @@ -114,8 +114,12 @@ func setAdmissionControl(ctx context.Context, t test.Test, c cluster.Cluster, en if !enabled { val = "false" } - for _, setting := range []string{"admission.kv.enabled", "admission.sql_kv_response.enabled", - "admission.sql_sql_response.enabled"} { + for _, setting := range []string{ + "admission.kv.enabled", + "admission.sql_kv_response.enabled", + "admission.sql_sql_response.enabled", + "admission.elastic_cpu.enabled", + } { if _, err := db.ExecContext( ctx, "SET CLUSTER SETTING "+setting+" = '"+val+"'"); err != nil { t.Fatalf("failed to set admission control to %t: %v", enabled, err) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index eba885e24220..7f154c63d7ea 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2388,8 +2388,12 @@ func TestNewSendError(msg string) error { return newSendError(msg) } +// SendErrorString is the prefix for all sendErrors, exported in order to +// perform cross-node error-checks. +const SendErrorString = "failed to send RPC" + func (s sendError) Error() string { - return "failed to send RPC: " + s.message + return SendErrorString + ": " + s.message } // IsSendError returns true if err is a sendError. diff --git a/pkg/roachprod/prometheus/prometheus.go b/pkg/roachprod/prometheus/prometheus.go index ab9278586eef..f35597157788 100644 --- a/pkg/roachprod/prometheus/prometheus.go +++ b/pkg/roachprod/prometheus/prometheus.go @@ -140,6 +140,13 @@ func (cfg *Config) WithGrafanaDashboard(url string) *Config { return cfg } +// WithScrapeConfigs adds scraping configs to the prometheus instance. Chains +// for convenience. +func (cfg *Config) WithScrapeConfigs(config ...ScrapeConfig) *Config { + cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, config...) + return cfg +} + // WithNodeExporter causes node_exporter to be set up on the specified machines, // a separate process that sends hardware metrics to prometheus. 
// For more on the node exporter process, see https://prometheus.io/docs/guides/node-exporter/ diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 9b42a104c97e..25019863f24f 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -724,6 +724,12 @@ func (b *Builder) buildScan(scan *memo.ScanExpr) (execPlan, error) { } } + idx := tab.Index(scan.Index) + if idx.IsInverted() && len(scan.InvertedConstraint) == 0 { + return execPlan{}, + errors.AssertionFailedf("expected inverted index scan to have an inverted constraint") + } + // Save if we planned a full table/index scan on the builder so that the // planner can be made aware later. We only do this for non-virtual tables. stats := scan.Relational().Statistics()
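A minimal sketch of how the kvcoord exports introduced above fit together in the changefeed retry classification: kvcoord.IsSendError covers a sendError whose structure is still intact on the local node, while the new kvcoord.SendErrorString prefix covers the same error after DistSQL has flattened it to a string. The package and function names below are illustrative only; the real logic lives in changefeedbase.IsRetryableError as modified in errors.go above.

package sketch // illustrative package, not part of the change

import (
	"strings"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
)

// isRetryableSendError mirrors the two-step check added to
// changefeedbase.IsRetryableError in this diff.
func isRetryableSendError(err error) bool {
	if err == nil {
		return false
	}
	// Structured check: works when the sendError reaches us intact,
	// e.g. when outgoing transports expire during node shutdown.
	if kvcoord.IsSendError(err) {
		return true
	}
	// Fallback: an error that crossed a DistSQL flow boundary is
	// serialized such that only its message survives, so match on the
	// exported "failed to send RPC" prefix instead.
	return strings.Contains(err.Error(), kvcoord.SendErrorString)
}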