diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 04f1ccb6abcf..baf05076005f 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -4227,8 +4227,8 @@ func TestChangefeedNodeShutdown(t *testing.T) { defer tc.Stopper().Stop(context.Background()) db := tc.ServerConn(1) + serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", time.Millisecond) sqlDB := sqlutils.MakeSQLRunner(db) - sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ns'`) sqlDB.Exec(t, `CREATE DATABASE d`) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) @@ -4439,19 +4439,22 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) { skip.UnderRace(t, "Takes too long with race enabled") shouldDrain := true - knobs := base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{ - DrainFast: true, - Changefeed: &TestingKnobs{}, - Flowinfra: &flowinfra.TestingKnobs{ - FlowRegistryDraining: func() bool { - if shouldDrain { - shouldDrain = false - return true - } - return false + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + DrainFast: true, + Changefeed: &TestingKnobs{}, + Flowinfra: &flowinfra.TestingKnobs{ + FlowRegistryDraining: func() bool { + if shouldDrain { + shouldDrain = false + return true + } + return false + }, }, }, - }} + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + } sinkDir, cleanupFn := testutils.TempDir(t) defer cleanupFn() @@ -4466,9 +4469,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) { db := tc.ServerConn(1) sqlDB := sqlutils.MakeSQLRunner(db) - sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) - sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`) - sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) + serverutils.SetClusterSetting(t, tc, "kv.rangefeed.enabled", true) + serverutils.SetClusterSetting(t, tc, "kv.closed_timestamp.target_duration", time.Second) + serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", 10*time.Millisecond) sqlutils.CreateTable( t, db, "foo", @@ -4491,9 +4494,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) { defer closeFeed(t, feed) // At this point, the job created by feed will fail to start running on node 0 due to draining - // registry. However, this job will be retried, and it should succeeded. + // registry. However, this job will be retried, and it should succeed. // Note: This test is a bit unrealistic in that if the registry is draining, that - // means that the server is draining (i.e being shut down). We don't do a full shutdown + // means that the server is draining (i.e. being shut down). We don't do a full shutdown // here, but we are simulating a restart by failing to start a flow the first time around. assertPayloads(t, feed, []string{ `foo: [1]->{"after": {"k": 1, "v": 1}}`, diff --git a/pkg/testutils/serverutils/BUILD.bazel b/pkg/testutils/serverutils/BUILD.bazel index a4c79352e774..f8492f82c684 100644 --- a/pkg/testutils/serverutils/BUILD.bazel +++ b/pkg/testutils/serverutils/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "serverutils", srcs = [ "test_cluster_shim.go", + "test_cluster_utils.go", "test_server_shim.go", "test_tenant_shim.go", ], @@ -20,6 +21,7 @@ go_library( "//pkg/server/status", "//pkg/settings/cluster", "//pkg/storage", + "//pkg/testutils", "//pkg/testutils/sqlutils", "//pkg/util/hlc", "//pkg/util/httputil", diff --git a/pkg/testutils/serverutils/test_cluster_utils.go b/pkg/testutils/serverutils/test_cluster_utils.go new file mode 100644 index 000000000000..1e3e92013eee --- /dev/null +++ b/pkg/testutils/serverutils/test_cluster_utils.go @@ -0,0 +1,50 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package serverutils + +import ( + "context" + "fmt" + "strconv" + + "github.com/cockroachdb/cockroach/pkg/testutils" +) + +// SetClusterSetting executes set cluster settings statement, and then ensures that +// all nodes in the test cluster see that setting update. +func SetClusterSetting(t testutils.TB, c TestClusterInterface, name string, value interface{}) { + t.Helper() + strVal := func() string { + switch v := value.(type) { + case string: + return v + case int, int32, int64: + return fmt.Sprintf("%d", v) + case bool: + return strconv.FormatBool(v) + case float32, float64: + return fmt.Sprintf("%f", v) + case fmt.Stringer: + return v.String() + default: + return fmt.Sprintf("%v", value) + } + }() + query := fmt.Sprintf("SET CLUSTER SETTING %s='%s'", name, strVal) + // Set cluster setting statement ensures the setting is propagated to the local registry. + // So, just execute the query against each node in the cluster. + for i := 0; i < c.NumServers(); i++ { + _, err := c.ServerConn(i).ExecContext(context.Background(), query) + if err != nil { + t.Fatal(err) + } + } +}