From d4ef1477486b486f025c3ee7d2d837b8889a05db Mon Sep 17 00:00:00 2001 From: Andy Yang Date: Wed, 8 May 2024 15:27:26 -0400 Subject: [PATCH 1/2] roachtest: expand and cloudify cdc/initial-scan-rolling-restart This patch expands the `cdc/initial-scan-rolling-restart` test to also test normal checkpoints (in addition to shutdown checkpoints). It also updates the test to work on GCE so that it can run during nightly CI. Release note: None --- pkg/cmd/roachtest/tests/BUILD.bazel | 1 + pkg/cmd/roachtest/tests/cdc.go | 96 +++++++++++++++++++++------- pkg/workload/debug/webhook_server.go | 6 +- 3 files changed, 76 insertions(+), 27 deletions(-) diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 2cd8d3de9f0a..27842f2c5f42 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -274,6 +274,7 @@ go_library( "//pkg/util/uuid", "//pkg/util/version", "//pkg/workload", + "//pkg/workload/debug", "//pkg/workload/histogram", "//pkg/workload/querybench", "//pkg/workload/tpcc", diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 3fdb9b322799..2c4a884c663d 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -62,6 +62,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/workload/debug" "github.com/cockroachdb/errors" "golang.org/x/oauth2/clientcredentials" ) @@ -908,11 +909,23 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) { m.Wait() } -func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cluster) { - const rowCount, splitCount = 1000000, 500 +type cdcCheckpointType int + +const ( + cdcNormalCheckpoint cdcCheckpointType = iota + cdcShutdownCheckpoint +) + +// runCDCInitialScanRollingRestart runs multiple initial-scan-only changefeeds +// on a 4-node cluster, using node 1 as the coordinator and continuously +// restarting nodes 2-4 to hopefully force the changefeed to replan and exercise +// the checkpoint restore logic. +func runCDCInitialScanRollingRestart( + ctx context.Context, t test.Test, c cluster.Cluster, checkpointType cdcCheckpointType, +) { startOpts := option.DefaultStartOpts() - ips, err := c.InternalIP(ctx, t.L(), c.Node(1)) - sinkURL := fmt.Sprintf("https://%s:9707", ips[0]) + ips, err := c.ExternalIP(ctx, t.L(), c.Node(1)) + sinkURL := fmt.Sprintf("https://%s:%d", ips[0], debug.WebhookServerPort) sink := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}} if err != nil { t.Fatal(err) @@ -922,7 +935,7 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl racks := install.MakeClusterSettings(install.NumRacksOption(c.Spec().NodeCount)) racks.Env = append(racks.Env, `COCKROACH_CHANGEFEED_TESTING_FAST_RETRY=true`) c.Start(ctx, t.L(), option.DefaultStartOpts(), racks) - m := c.NewMonitor(ctx, c.Range(1, 5)) + m := c.NewMonitor(ctx, c.All()) restart := func(n int) error { cmd := fmt.Sprintf("./cockroach node drain --certs-dir=%s --port={pgport:%d} --self", install.CockroachNodeCertsDir, n) @@ -945,21 +958,42 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl db := c.Conn(ctx, t.L(), 1) - // Setup a 1M row table that is split into >= 500 scattered ranges. + // Setup a large table with 1M rows and a small table with 5 rows. // Keep ranges off n1 so that our plans use 2, 3, and 4. + const ( + largeRowCount = 1000000 + smallRowCount = 5 + ) t.L().Printf("setting up test data...") - for _, s := range []string{ + setupStmts := []string{ `ALTER RANGE default CONFIGURE ZONE USING constraints = '[-rack=0]'`, - fmt.Sprintf(`CREATE TABLE t (id PRIMARY KEY) AS SELECT generate_series(1, %d) id`, rowCount), - `ALTER TABLE t SCATTER`, - // Split some bigger chunks up to scatter it a bit more. - fmt.Sprintf(`ALTER TABLE t SPLIT AT SELECT id FROM t ORDER BY random() LIMIT %d`, splitCount/4), - `ALTER TABLE t SCATTER`, - // Finish splitting, so that drained ranges spread out evenly. - fmt.Sprintf(`ALTER TABLE t SPLIT AT SELECT id FROM t ORDER BY random() LIMIT %d`, splitCount), + fmt.Sprintf(`CREATE TABLE large (id PRIMARY KEY) AS SELECT generate_series(1, %d) id`, largeRowCount), + `ALTER TABLE large SCATTER`, + fmt.Sprintf(`CREATE TABLE small (id PRIMARY KEY) AS SELECT generate_series(%d, %d)`, largeRowCount+1, largeRowCount+smallRowCount), + `ALTER TABLE small SCATTER`, `SET CLUSTER SETTING jobs.registry.retry.initial_delay = '.1s'`, `SET CLUSTER SETTING jobs.registry.retry.max_delay = '.4s'`, - } { + } + switch checkpointType { + case cdcNormalCheckpoint: + setupStmts = append(setupStmts, + `SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'`, + `SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'false'`, + ) + case cdcShutdownCheckpoint: + const largeSplitCount = 5 + setupStmts = append(setupStmts, + `SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '0'`, + `SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'true'`, + // Split some bigger chunks up to scatter it a bit more. + fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount/4), + `ALTER TABLE large SCATTER`, + // Finish splitting, so that drained ranges spread out evenly. + fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount), + `ALTER TABLE large SCATTER`, + ) + } + for _, s := range setupStmts { t.L().Printf(s) if _, err := db.Exec(s); err != nil { t.Fatal(err) @@ -988,7 +1022,7 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl }() t.L().Printf("starting rolling drain+restarts of 2, 3, 4...") for { - for _, n := range []int{2, 3, 4, 5} { + for _, n := range []int{2, 3, 4} { select { case <-stopRestarts: return nil @@ -1015,11 +1049,12 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl t.L().Printf("exiting webhook sink status: %v", err) }() - for i := 1; i < 5; i++ { + const numChangefeeds = 5 + for i := 1; i < numChangefeeds; i++ { t.L().Printf("starting changefeed...") var job int if err := db.QueryRow( - fmt.Sprintf("CREATE CHANGEFEED FOR TABLE t INTO 'webhook-%s/?insecure_tls_skip_verify=true' WITH initial_scan='only'", sinkURL), + fmt.Sprintf("CREATE CHANGEFEED FOR TABLE large, small INTO 'webhook-%s/?insecure_tls_skip_verify=true' WITH initial_scan='only'", sinkURL), ).Scan(&job); err != nil { t.Fatal(err) } @@ -1054,8 +1089,9 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl t.Fatal(err) } t.L().Printf("sink got %d unique, %d dupes", unique, dupes) - if unique != rowCount { - t.Fatalf("expected %d, got %d", rowCount, unique) + expected := largeRowCount + smallRowCount + if unique != expected { + t.Fatalf("expected %d, got %d", expected, unique) } _, err = sink.Get(sinkURL + "/reset") t.L().Printf("resetting sink %v", err) @@ -1286,15 +1322,27 @@ func registerCDC(r registry.Registry) { }, }) r.Add(registry.TestSpec{ - Name: "cdc/initial-scan-rolling-restart", + Name: "cdc/initial-scan-rolling-restart/normal-checkpoint", + Owner: registry.OwnerCDC, + Cluster: r.MakeClusterSpec(4), + RequiresLicense: true, + CompatibleClouds: registry.OnlyGCE, + Suites: registry.Suites(registry.Nightly), + Timeout: time.Minute * 15, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + runCDCInitialScanRollingRestart(ctx, t, c, cdcNormalCheckpoint) + }, + }) + r.Add(registry.TestSpec{ + Name: "cdc/initial-scan-rolling-restart/shutdown-checkpoint", Owner: registry.OwnerCDC, - Cluster: r.MakeClusterSpec(5), + Cluster: r.MakeClusterSpec(4), RequiresLicense: true, - CompatibleClouds: registry.OnlyLocal, + CompatibleClouds: registry.OnlyGCE, Suites: registry.Suites(registry.Nightly), Timeout: time.Minute * 15, Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - runCDCBackfillRollingRestart(ctx, t, c) + runCDCInitialScanRollingRestart(ctx, t, c, cdcShutdownCheckpoint) }, }) r.Add(registry.TestSpec{ diff --git a/pkg/workload/debug/webhook_server.go b/pkg/workload/debug/webhook_server.go index 5469ed2e257c..f33dbb4f3d5d 100644 --- a/pkg/workload/debug/webhook_server.go +++ b/pkg/workload/debug/webhook_server.go @@ -37,7 +37,7 @@ var webhookServerCmd = &cobra.Command{ } const ( - port = 9707 + WebhookServerPort = 9707 ) func webhookServer(cmd *cobra.Command, args []string) error { @@ -124,11 +124,11 @@ func webhookServer(cmd *cobra.Command, args []string) error { if err != nil { return err } - log.Printf("starting server on port %d", port) + log.Printf("starting server on port %d", WebhookServerPort) return (&http.Server{ TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}}, Handler: mux, - Addr: fmt.Sprintf(":%d", port), + Addr: fmt.Sprintf(":%d", WebhookServerPort), }).ListenAndServeTLS("", "") } From 11edce0908d3f36036a324922d75e3a2e7081112 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 10 May 2024 15:32:31 +0000 Subject: [PATCH 2/2] roachprod: add env var for secure default Release note: none. Epic: none. --- pkg/cmd/roachprod/BUILD.bazel | 1 + pkg/cmd/roachprod/flags.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/roachprod/BUILD.bazel b/pkg/cmd/roachprod/BUILD.bazel index 5118fa963d76..8eec4f079f04 100644 --- a/pkg/cmd/roachprod/BUILD.bazel +++ b/pkg/cmd/roachprod/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/roachprod/ui", "//pkg/roachprod/vm", "//pkg/roachprod/vm/gce", + "//pkg/util/envutil", "//pkg/util/flagutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/cmd/roachprod/flags.go b/pkg/cmd/roachprod/flags.go index 5d24f13e3a11..6b69828e829f 100644 --- a/pkg/cmd/roachprod/flags.go +++ b/pkg/cmd/roachprod/flags.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachprod/ssh" "github.com/cockroachdb/cockroach/pkg/roachprod/vm" "github.com/cockroachdb/cockroach/pkg/roachprod/vm/gce" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/flagutil" "github.com/spf13/cobra" "golang.org/x/exp/maps" @@ -50,7 +51,7 @@ var ( listPattern string isSecure bool // Set based on the values passed to --secure and --insecure secure = true // DEPRECATED - insecure = false + insecure = envutil.EnvOrDefaultBool("COCKROACH_ROACHPROD_INSECURE", false) virtualClusterName string sqlInstance int extraSSHOptions = ""