changefeedccl: Fix flaky test.
Fix flaky TestChangefeedHandlesDrainingNodes test.
The source of the flake was that cluster setting updates propagate
asynchronously to the other nodes in the cluster.  Thus, it was possible
for the test to flake because some of the nodes were observing the
old value for the setting.

The flake is fixed by introducing a testing utility function that
sets the setting and ensures it propagates to all nodes in the
test cluster.

Fixes #76806

Release Notes: none
Release Justification: test-only change.
Yevgeniy Miretskiy committed Mar 3, 2022
1 parent 4865605 commit 8850dc1
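
To make the race concrete, here is a minimal, hypothetical sketch (not part of the commit) contrasting the flaky pattern described in the commit message with the fix. The cluster setting name and the serverutils.SetClusterSetting helper come from the diff below; the test name, package, and cluster size are illustrative.

package example_test // hypothetical package, for illustration only

import (
	"context"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
)

func TestSettingPropagationSketch(t *testing.T) {
	tc := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{})
	defer tc.Stopper().Stop(context.Background())

	// Flaky pattern: the statement runs against node 1 only. The remaining
	// nodes learn about the new value asynchronously, so work planned on them
	// immediately afterwards may still observe the old value.
	sqlutils.MakeSQLRunner(tc.ServerConn(1)).Exec(
		t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`)

	// Fixed pattern: the helper introduced by this commit issues the statement
	// on every node, so each node's local settings registry reflects the new
	// value before the test proceeds.
	serverutils.SetClusterSetting(
		t, tc, "changefeed.experimental_poll_interval", 10*time.Millisecond)
}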
Showing 3 changed files with 72 additions and 17 deletions.
37 changes: 20 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -4227,8 +4227,8 @@ func TestChangefeedNodeShutdown(t *testing.T) {
defer tc.Stopper().Stop(context.Background())

db := tc.ServerConn(1)
serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", time.Millisecond)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ns'`)
sqlDB.Exec(t, `CREATE DATABASE d`)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
@@ -4439,19 +4439,22 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
skip.UnderRace(t, "Takes too long with race enabled")

shouldDrain := true
knobs := base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{},
Flowinfra: &flowinfra.TestingKnobs{
FlowRegistryDraining: func() bool {
if shouldDrain {
shouldDrain = false
return true
}
return false
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{},
Flowinfra: &flowinfra.TestingKnobs{
FlowRegistryDraining: func() bool {
if shouldDrain {
shouldDrain = false
return true
}
return false
},
},
},
}}
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}

sinkDir, cleanupFn := testutils.TempDir(t)
defer cleanupFn()
@@ -4466,9 +4469,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {

db := tc.ServerConn(1)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`)
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`)
serverutils.SetClusterSetting(t, tc, "kv.rangefeed.enabled", true)
serverutils.SetClusterSetting(t, tc, "kv.closed_timestamp.target_duration", time.Second)
serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", 10*time.Millisecond)

sqlutils.CreateTable(
t, db, "foo",
@@ -4491,9 +4494,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
defer closeFeed(t, feed)

// At this point, the job created by feed will fail to start running on node 0 due to draining
// registry. However, this job will be retried, and it should succeeded.
// registry. However, this job will be retried, and it should succeed.
// Note: This test is a bit unrealistic in that if the registry is draining, that
// means that the server is draining (i.e being shut down). We don't do a full shutdown
// means that the server is draining (i.e. being shut down). We don't do a full shutdown
// here, but we are simulating a restart by failing to start a flow the first time around.
assertPayloads(t, feed, []string{
`foo: [1]->{"after": {"k": 1, "v": 1}}`,
2 changes: 2 additions & 0 deletions pkg/testutils/serverutils/BUILD.bazel
@@ -4,6 +4,7 @@ go_library(
name = "serverutils",
srcs = [
"test_cluster_shim.go",
"test_cluster_utils.go",
"test_server_shim.go",
"test_tenant_shim.go",
],
@@ -20,6 +21,7 @@ go_library(
"//pkg/server/status",
"//pkg/settings/cluster",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/sqlutils",
"//pkg/util/hlc",
"//pkg/util/httputil",
50 changes: 50 additions & 0 deletions pkg/testutils/serverutils/test_cluster_utils.go
@@ -0,0 +1,50 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package serverutils

import (
	"context"
	"fmt"
	"strconv"

	"github.com/cockroachdb/cockroach/pkg/testutils"
)

// SetClusterSetting executes a SET CLUSTER SETTING statement and then ensures
// that every node in the test cluster sees the updated value.
func SetClusterSetting(t testutils.TB, c TestClusterInterface, name string, value interface{}) {
	t.Helper()
	strVal := func() string {
		switch v := value.(type) {
		case string:
			return v
		case int, int32, int64:
			return fmt.Sprintf("%d", v)
		case bool:
			return strconv.FormatBool(v)
		case float32, float64:
			return fmt.Sprintf("%f", v)
		case fmt.Stringer:
			return v.String()
		default:
			return fmt.Sprintf("%v", value)
		}
	}()
	query := fmt.Sprintf("SET CLUSTER SETTING %s='%s'", name, strVal)
	// A SET CLUSTER SETTING statement is guaranteed to be applied to the local
	// settings registry of the node that executes it, so it suffices to run
	// the query against each node in the cluster.
	for i := 0; i < c.NumServers(); i++ {
		_, err := c.ServerConn(i).ExecContext(context.Background(), query)
		if err != nil {
			t.Fatal(err)
		}
	}
}
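
Usage note: the changed test above calls this helper in place of the per-connection SET CLUSTER SETTING statements. If a test wanted to assert the propagation guarantee explicitly, a hypothetical companion check along the following lines (not part of the commit) could read the setting back on every node; the expected argument is whatever string rendering of the value the caller anticipates.

// requireSettingOnAllNodes is a hypothetical companion to SetClusterSetting:
// it reads the setting back via SHOW CLUSTER SETTING on every node and fails
// if any node reports something other than the expected rendering.
func requireSettingOnAllNodes(t testutils.TB, c TestClusterInterface, name, expected string) {
	t.Helper()
	query := fmt.Sprintf("SHOW CLUSTER SETTING %s", name)
	for i := 0; i < c.NumServers(); i++ {
		var val string
		if err := c.ServerConn(i).QueryRowContext(context.Background(), query).Scan(&val); err != nil {
			t.Fatal(err)
		}
		if val != expected {
			t.Fatal(fmt.Sprintf("node %d reports %s = %q, expected %q", i, name, val, expected))
		}
	}
}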
