roachtest: expand and cloudify cdc/initial-scan-rolling-restart
This patch expands the `cdc/initial-scan-rolling-restart` test to also
test normal checkpoints (in addition to shutdown checkpoints). It also
updates the test to work on GCE so that it can run during nightly CI.

Release note: None
andyyang890 committed May 10, 2024
1 parent eaae192 commit d4ef147
Showing 3 changed files with 76 additions and 27 deletions.
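At a high level, the cdc.go change plumbs a checkpoint-type parameter into the test so that one variant exercises ordinary frontier checkpoints and the other exercises shutdown checkpoints. The sketch below condenses that branching from the diff; the checkpointSettings helper is illustrative only and does not exist in the patch.

type cdcCheckpointType int

const (
	cdcNormalCheckpoint cdcCheckpointType = iota
	cdcShutdownCheckpoint
)

// checkpointSettings (illustrative helper) returns the cluster settings the
// test applies for each checkpointing mode before creating changefeeds.
func checkpointSettings(typ cdcCheckpointType) []string {
	switch typ {
	case cdcNormalCheckpoint:
		// Checkpoint the frontier frequently and disable the shutdown
		// checkpoint so restores come from ordinary frontier checkpoints.
		return []string{
			`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'`,
			`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'false'`,
		}
	case cdcShutdownCheckpoint:
		// Disable frontier checkpoints so the only checkpoint written is the
		// one taken while a node drains.
		return []string{
			`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '0'`,
			`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'true'`,
		}
	default:
		return nil
	}
}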
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -274,6 +274,7 @@ go_library(
"//pkg/util/uuid",
"//pkg/util/version",
"//pkg/workload",
"//pkg/workload/debug",
"//pkg/workload/histogram",
"//pkg/workload/querybench",
"//pkg/workload/tpcc",
96 changes: 72 additions & 24 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -62,6 +62,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/debug"
"github.com/cockroachdb/errors"
"golang.org/x/oauth2/clientcredentials"
)
@@ -908,11 +909,23 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
m.Wait()
}

func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cluster) {
const rowCount, splitCount = 1000000, 500
type cdcCheckpointType int

const (
cdcNormalCheckpoint cdcCheckpointType = iota
cdcShutdownCheckpoint
)

// runCDCInitialScanRollingRestart runs multiple initial-scan-only changefeeds
// on a 4-node cluster, using node 1 as the coordinator and continuously
// restarting nodes 2-4 to hopefully force the changefeed to replan and exercise
// the checkpoint restore logic.
func runCDCInitialScanRollingRestart(
ctx context.Context, t test.Test, c cluster.Cluster, checkpointType cdcCheckpointType,
) {
startOpts := option.DefaultStartOpts()
ips, err := c.InternalIP(ctx, t.L(), c.Node(1))
sinkURL := fmt.Sprintf("https://%s:9707", ips[0])
ips, err := c.ExternalIP(ctx, t.L(), c.Node(1))
sinkURL := fmt.Sprintf("https://%s:%d", ips[0], debug.WebhookServerPort)
sink := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
if err != nil {
t.Fatal(err)
@@ -922,7 +935,7 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl
racks := install.MakeClusterSettings(install.NumRacksOption(c.Spec().NodeCount))
racks.Env = append(racks.Env, `COCKROACH_CHANGEFEED_TESTING_FAST_RETRY=true`)
c.Start(ctx, t.L(), option.DefaultStartOpts(), racks)
m := c.NewMonitor(ctx, c.Range(1, 5))
m := c.NewMonitor(ctx, c.All())

restart := func(n int) error {
cmd := fmt.Sprintf("./cockroach node drain --certs-dir=%s --port={pgport:%d} --self", install.CockroachNodeCertsDir, n)
Expand All @@ -945,21 +958,42 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl

db := c.Conn(ctx, t.L(), 1)

// Setup a 1M row table that is split into >= 500 scattered ranges.
// Setup a large table with 1M rows and a small table with 5 rows.
// Keep ranges off n1 so that our plans use 2, 3, and 4.
const (
largeRowCount = 1000000
smallRowCount = 5
)
t.L().Printf("setting up test data...")
for _, s := range []string{
setupStmts := []string{
`ALTER RANGE default CONFIGURE ZONE USING constraints = '[-rack=0]'`,
fmt.Sprintf(`CREATE TABLE t (id PRIMARY KEY) AS SELECT generate_series(1, %d) id`, rowCount),
`ALTER TABLE t SCATTER`,
// Split some bigger chunks up to scatter it a bit more.
fmt.Sprintf(`ALTER TABLE t SPLIT AT SELECT id FROM t ORDER BY random() LIMIT %d`, splitCount/4),
`ALTER TABLE t SCATTER`,
// Finish splitting, so that drained ranges spread out evenly.
fmt.Sprintf(`ALTER TABLE t SPLIT AT SELECT id FROM t ORDER BY random() LIMIT %d`, splitCount),
fmt.Sprintf(`CREATE TABLE large (id PRIMARY KEY) AS SELECT generate_series(1, %d) id`, largeRowCount),
`ALTER TABLE large SCATTER`,
fmt.Sprintf(`CREATE TABLE small (id PRIMARY KEY) AS SELECT generate_series(%d, %d)`, largeRowCount+1, largeRowCount+smallRowCount),
`ALTER TABLE small SCATTER`,
`SET CLUSTER SETTING jobs.registry.retry.initial_delay = '.1s'`,
`SET CLUSTER SETTING jobs.registry.retry.max_delay = '.4s'`,
} {
}
switch checkpointType {
case cdcNormalCheckpoint:
setupStmts = append(setupStmts,
`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'false'`,
)
case cdcShutdownCheckpoint:
const largeSplitCount = 5
setupStmts = append(setupStmts,
`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '0'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'true'`,
// Split some bigger chunks up to scatter it a bit more.
fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount/4),
`ALTER TABLE large SCATTER`,
// Finish splitting, so that drained ranges spread out evenly.
fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount),
`ALTER TABLE large SCATTER`,
)
}
for _, s := range setupStmts {
t.L().Printf(s)
if _, err := db.Exec(s); err != nil {
t.Fatal(err)
@@ -988,7 +1022,7 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl
}()
t.L().Printf("starting rolling drain+restarts of 2, 3, 4...")
for {
for _, n := range []int{2, 3, 4, 5} {
for _, n := range []int{2, 3, 4} {
select {
case <-stopRestarts:
return nil
@@ -1015,11 +1049,12 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl
t.L().Printf("exiting webhook sink status: %v", err)
}()

for i := 1; i < 5; i++ {
const numChangefeeds = 5
for i := 1; i < numChangefeeds; i++ {
t.L().Printf("starting changefeed...")
var job int
if err := db.QueryRow(
fmt.Sprintf("CREATE CHANGEFEED FOR TABLE t INTO 'webhook-%s/?insecure_tls_skip_verify=true' WITH initial_scan='only'", sinkURL),
fmt.Sprintf("CREATE CHANGEFEED FOR TABLE large, small INTO 'webhook-%s/?insecure_tls_skip_verify=true' WITH initial_scan='only'", sinkURL),
).Scan(&job); err != nil {
t.Fatal(err)
}
@@ -1054,8 +1089,9 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl
t.Fatal(err)
}
t.L().Printf("sink got %d unique, %d dupes", unique, dupes)
if unique != rowCount {
t.Fatalf("expected %d, got %d", rowCount, unique)
expected := largeRowCount + smallRowCount
if unique != expected {
t.Fatalf("expected %d, got %d", expected, unique)
}
_, err = sink.Get(sinkURL + "/reset")
t.L().Printf("resetting sink %v", err)
@@ -1286,15 +1322,27 @@ func registerCDC(r registry.Registry) {
},
})
r.Add(registry.TestSpec{
Name: "cdc/initial-scan-rolling-restart",
Name: "cdc/initial-scan-rolling-restart/normal-checkpoint",
Owner: registry.OwnerCDC,
Cluster: r.MakeClusterSpec(4),
RequiresLicense: true,
CompatibleClouds: registry.OnlyGCE,
Suites: registry.Suites(registry.Nightly),
Timeout: time.Minute * 15,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCInitialScanRollingRestart(ctx, t, c, cdcNormalCheckpoint)
},
})
r.Add(registry.TestSpec{
Name: "cdc/initial-scan-rolling-restart/shutdown-checkpoint",
Owner: registry.OwnerCDC,
Cluster: r.MakeClusterSpec(5),
Cluster: r.MakeClusterSpec(4),
RequiresLicense: true,
CompatibleClouds: registry.OnlyLocal,
CompatibleClouds: registry.OnlyGCE,
Suites: registry.Suites(registry.Nightly),
Timeout: time.Minute * 15,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBackfillRollingRestart(ctx, t, c)
runCDCInitialScanRollingRestart(ctx, t, c, cdcShutdownCheckpoint)
},
})
r.Add(registry.TestSpec{
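For reference, the drain-and-restart machinery keeps node 1 as the changefeed coordinator and cycles through nodes 2-4 (node 5 is dropped now that the cluster spec is 4 nodes). A condensed sketch of that loop, with the restart closure and stop channel simplified relative to the real test:

// restartLoop drains and restarts nodes 2-4 in a round-robin fashion until
// stopRestarts is closed, forcing the running changefeeds to replan and
// restore from their checkpoints.
func restartLoop(restart func(n int) error, stopRestarts <-chan struct{}) error {
	for {
		for _, n := range []int{2, 3, 4} {
			select {
			case <-stopRestarts:
				return nil
			default:
				if err := restart(n); err != nil {
					return err
				}
			}
		}
	}
}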
6 changes: 3 additions & 3 deletions pkg/workload/debug/webhook_server.go
@@ -37,7 +37,7 @@ var webhookServerCmd = &cobra.Command{
}

const (
port = 9707
WebhookServerPort = 9707
)

func webhookServer(cmd *cobra.Command, args []string) error {
@@ -124,11 +124,11 @@ func webhookServer(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
log.Printf("starting server on port %d", port)
log.Printf("starting server on port %d", WebhookServerPort)
return (&http.Server{
TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
Handler: mux,
Addr: fmt.Sprintf(":%d", port),
Addr: fmt.Sprintf(":%d", WebhookServerPort),
}).ListenAndServeTLS("", "")
}
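With the port constant exported, the roachtest derives the sink URL from debug.WebhookServerPort instead of hard-coding 9707. A minimal sketch of a client hitting the sink's /reset endpoint (the only endpoint shown in this diff), assuming the server's self-signed certificate:

package main

import (
	"crypto/tls"
	"fmt"
	"net/http"

	"github.com/cockroachdb/cockroach/pkg/workload/debug"
)

// resetSink mirrors the GET /reset call the roachtest issues between
// changefeeds. Certificate verification is skipped because the debug
// webhook server presents a self-signed certificate.
func resetSink(host string) error {
	sinkURL := fmt.Sprintf("https://%s:%d", host, debug.WebhookServerPort)
	client := &http.Client{Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}}
	resp, err := client.Get(sinkURL + "/reset")
	if err != nil {
		return err
	}
	return resp.Body.Close()
}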
