diff --git a/pkg/cmd/roachtest/tests/acceptance.go b/pkg/cmd/roachtest/tests/acceptance.go
index 44f8ed20d8b2..22b9a737b6e9 100644
--- a/pkg/cmd/roachtest/tests/acceptance.go
+++ b/pkg/cmd/roachtest/tests/acceptance.go
@@ -68,6 +68,13 @@ func registerAcceptance(r registry.Registry) {
 				timeout: 30 * time.Minute,
 			},
 		},
+		registry.OwnerDisasterRecovery: {
+			{
+				name:     "c2c",
+				fn:       runAcceptanceClusterReplication,
+				numNodes: 3,
+			},
+		},
 	}
 	tags := []string{"default", "quick"}
 	specTemplate := registry.TestSpec{
diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 9dd4d3fc5bf2..4b8c857c446b 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -67,6 +67,7 @@ type c2cSetup struct {
 	src          clusterInfo
 	dst          clusterInfo
 	workloadNode option.NodeListOption
+	promCfg      *prometheus.Config
 }
 
 var c2cPromMetrics = map[string]clusterstats.ClusterStat{
@@ -372,6 +373,22 @@ func (sp *replicationTestSpec) setupC2C(ctx context.Context, t test.Test, c clus
 	sp.t = t
 	sp.c = c
 	sp.metrics = &c2cMetrics{}
+	if !c.IsLocal() {
+		// TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to
+		// pass a grafana dashboard for this to work
+		sp.setup.promCfg = (&prometheus.Config{}).
+			WithPrometheusNode(sp.setup.workloadNode.InstallNodes()[0]).
+			WithCluster(sp.setup.dst.nodes.InstallNodes()).
+			WithNodeExporter(sp.setup.dst.nodes.InstallNodes()).
+			WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")
+
+		require.NoError(sp.t, sp.c.StartGrafana(ctx, sp.t.L(), sp.setup.promCfg))
+		sp.t.L().Printf("Prom has started")
+	}
+}
+
+func (sp *replicationTestSpec) crdbNodes() option.NodeListOption {
+	return sp.setup.src.nodes.Merge(sp.setup.dst.nodes)
 }
 
 func (sp *replicationTestSpec) startStatsCollection(
@@ -385,18 +402,8 @@ func (sp *replicationTestSpec) startStatsCollection(
 			return map[string]float64{}
 		}
 	}
-	// TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to
-	// pass a grafana dashboard for this to work
-	cfg := (&prometheus.Config{}).
-		WithPrometheusNode(sp.setup.workloadNode.InstallNodes()[0]).
-		WithCluster(sp.setup.dst.nodes.InstallNodes()).
-		WithNodeExporter(sp.setup.dst.nodes.InstallNodes()).
-		WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")
-
-	require.NoError(sp.t, sp.c.StartGrafana(ctx, sp.t.L(), cfg))
-	sp.t.L().Printf("Prom has started")
-
-	client, err := clusterstats.SetupCollectorPromClient(ctx, sp.c, sp.t.L(), cfg)
+
+	client, err := clusterstats.SetupCollectorPromClient(ctx, sp.c, sp.t.L(), sp.setup.promCfg)
 	require.NoError(sp.t, err, "error creating prometheus client for stats collector")
 
 	collector := clusterstats.NewStatsCollector(ctx, client)
@@ -528,6 +535,102 @@ AS OF SYSTEM TIME '%s'`, startTimeStr, aost)
 	require.Equal(sp.t, srcFingerprint, destFingerprint)
 }
 
+func (sp *replicationTestSpec) main(ctx context.Context, t test.Test, c cluster.Cluster) {
+	metricSnapper := sp.startStatsCollection(ctx)
+	sp.preStreamingWorkload(ctx)
+
+	t.L().Printf("begin workload on src cluster")
+	m := c.NewMonitor(ctx, sp.crdbNodes())
+	// The roachtest driver can use the workloadCtx to cancel the workload.
+	workloadCtx, workloadCancel := context.WithCancel(ctx)
+	defer workloadCancel()
+
+	workloadDoneCh := make(chan struct{})
+	m.Go(func(ctx context.Context) error {
+		defer close(workloadDoneCh)
+		err := sp.runWorkload(workloadCtx)
+		// The workload should only return an error if the roachtest driver cancels the
+		// workloadCtx after sp.additionalDuration has elapsed after the initial scan completes.
+		if err != nil && workloadCtx.Err() == nil {
+			// Implies the workload context was not cancelled and the workload cmd returned a
+			// different error.
+			return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
+		}
+		t.L().Printf("workload successfully finished")
+		return nil
+	})
+
+	t.Status("starting replication stream")
+	sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
+	ingestionJobID := sp.startReplicationStream()
+
+	removeTenantRateLimiters(t, sp.setup.dst.sysSQL, sp.setup.dst.name)
+
+	lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, t.L(), getStreamIngestionJobInfo, t.Status, false)
+	defer lv.maybeLogLatencyHist()
+
+	m.Go(func(ctx context.Context) error {
+		return lv.pollLatency(ctx, sp.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)
+	})
+
+	t.L().Printf("waiting for replication stream to finish ingesting initial scan")
+	sp.waitForHighWatermark(ingestionJobID, sp.timeout/2)
+	sp.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
+	t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
+		sp.additionalDuration))
+
+	select {
+	case <-workloadDoneCh:
+		t.L().Printf("workload finished on its own")
+	case <-time.After(sp.getWorkloadTimeout()):
+		workloadCancel()
+		t.L().Printf("workload has cancelled after %s", sp.additionalDuration)
+	case <-ctx.Done():
+		t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
+		return
+	}
+	var currentTime time.Time
+	sp.setup.dst.sysSQL.QueryRow(sp.t, "SELECT clock_timestamp()").Scan(&currentTime)
+	cutoverTime := currentTime.Add(-sp.cutover)
+	sp.t.Status("cutover time chosen: ", cutoverTime.String())
+
+	retainedTime := sp.getReplicationRetainedTime()
+
+	sp.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
+	sp.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
+
+	sp.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
+		cutoverTime.String()))
+	sp.stopReplicationStream(ingestionJobID, cutoverTime)
+	sp.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
+
+	sp.metrics.export(sp.t, len(sp.setup.src.nodes))
+
+	t.Status("comparing fingerprints")
+	sp.compareTenantFingerprintsAtTimestamp(
+		ctx,
+		retainedTime,
+		cutoverTime,
+	)
+	lv.assertValid(t)
+}
+func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster.Cluster) {
+	if !c.IsLocal() {
+		t.Fatal("acceptance tests should only run on a local cluster")
+	}
+	sp := replicationTestSpec{
+		srcNodes: 1,
+		dstNodes: 1,
+		// The timeout field ensures the c2c roachtest driver behaves properly.
+		timeout:            10 * time.Minute,
+		workload:           replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
+		additionalDuration: 0 * time.Minute,
+		cutover:            30 * time.Second,
+	}
+	sp.setupC2C(ctx, t, c)
+	sp.main(ctx, t, c)
+}
+
 func registerClusterToCluster(r registry.Registry) {
 	for _, sp := range []replicationTestSpec{
 		{
@@ -578,7 +681,7 @@ func registerClusterToCluster(r registry.Registry) {
 			cpus:               4,
 			pdSize:             10,
 			workload:           replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
-			timeout:            20 * time.Minute,
+			timeout:            5 * time.Minute,
 			additionalDuration: 0 * time.Minute,
 			cutover:            30 * time.Second,
 			skip:               "for local ad hoc testing",
@@ -597,7 +700,9 @@ func registerClusterToCluster(r registry.Registry) {
 	} {
 		sp := sp
 		clusterOps := make([]spec.Option, 0)
-		clusterOps = append(clusterOps, spec.CPU(sp.cpus))
+		if sp.cpus != 0 {
+			clusterOps = append(clusterOps, spec.CPU(sp.cpus))
+		}
 		if sp.pdSize != 0 {
 			clusterOps = append(clusterOps, spec.VolumeSize(sp.pdSize))
 		}
@@ -611,89 +716,16 @@ func registerClusterToCluster(r registry.Registry) {
 			RequiresLicense: true,
 			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
 				sp.setupC2C(ctx, t, c)
-				m := c.NewMonitor(ctx, sp.setup.src.nodes.Merge(sp.setup.dst.nodes))
-				hc := roachtestutil.NewHealthChecker(t, c, sp.setup.src.nodes.Merge(sp.setup.dst.nodes))
+				m := c.NewMonitor(ctx, sp.crdbNodes())
+				hc := roachtestutil.NewHealthChecker(t, c, sp.crdbNodes())
 				m.Go(func(ctx context.Context) error {
 					require.NoError(t, hc.Runner(ctx))
 					return nil
 				})
 				defer hc.Done()
 
-				metricSnapper := sp.startStatsCollection(ctx)
-				sp.preStreamingWorkload(ctx)
-
-				t.L().Printf("begin workload on src cluster")
-				// The roachtest driver can use the workloadCtx to cancel the workload.
-				workloadCtx, workloadCancel := context.WithCancel(ctx)
-				defer workloadCancel()
-
-				workloadDoneCh := make(chan struct{})
-				m.Go(func(ctx context.Context) error {
-					defer close(workloadDoneCh)
-					err := sp.runWorkload(workloadCtx)
-					// The workload should only return an error if the roachtest driver cancels the
-					// workloadCtx after sp.additionalDuration has elapsed after the initial scan completes.
-					if err != nil && workloadCtx.Err() == nil {
-						// Implies the workload context was not cancelled and the workload cmd returned a
-						// different error.
-						return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
-					}
-					t.L().Printf("workload successfully finished")
-					return nil
-				})
-
-				t.Status("starting replication stream")
-				sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
-				ingestionJobID := sp.startReplicationStream()
-
-				lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, t.L(), getStreamIngestionJobInfo, t.Status, false)
-				defer lv.maybeLogLatencyHist()
-
-				m.Go(func(ctx context.Context) error {
-					return lv.pollLatency(ctx, sp.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)
-				})
-
-				t.L().Printf("waiting for replication stream to finish ingesting initial scan")
-				sp.waitForHighWatermark(ingestionJobID, sp.timeout/2)
-				sp.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
-				t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
-					sp.additionalDuration))
-
-				select {
-				case <-workloadDoneCh:
-					t.L().Printf("workload finished on its own")
-				case <-time.After(sp.getWorkloadTimeout()):
-					workloadCancel()
-					t.L().Printf("workload has cancelled after %s", sp.additionalDuration)
-				case <-ctx.Done():
-					t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
-					return
-				}
-				var currentTime time.Time
-				sp.setup.dst.sysSQL.QueryRow(sp.t, "SELECT clock_timestamp()").Scan(&currentTime)
-				cutoverTime := currentTime.Add(-sp.cutover)
-				sp.t.Status("cutover time chosen: ", cutoverTime.String())
-
-				retainedTime := sp.getReplicationRetainedTime()
-
-				sp.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
-				sp.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
-
-				sp.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
-					cutoverTime.String()))
-				sp.stopReplicationStream(ingestionJobID, cutoverTime)
-				sp.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
-
-				sp.metrics.export(sp.t, len(sp.setup.src.nodes))
-
-				t.Status("comparing fingerprints")
-				sp.compareTenantFingerprintsAtTimestamp(
-					ctx,
-					retainedTime,
-					cutoverTime,
-				)
-				lv.assertValid(t)
+				sp.main(ctx, t, c)
 			},
 		})
 	}
diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go
index 4b209615fde0..2ca12644bcbe 100644
--- a/pkg/cmd/roachtest/tests/multitenant_utils.go
+++ b/pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -366,7 +366,5 @@ func createInMemoryTenant(
 
 // removeTenantRateLimiters ensures the tenant is not throttled by limiters.
 func removeTenantRateLimiters(t test.Test, systemSQL *sqlutils.SQLRunner, tenantName string) {
-	systemSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1::STRING, 10000000000, 0,
-10000000000, now(), 0);`, tenantName)
 	systemSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY exempt_from_rate_limiting=true`, tenantName)
 }
diff --git a/pkg/sql/drop_sequence.go b/pkg/sql/drop_sequence.go
index 8f2caf3efab7..0d13f54316fc 100644
--- a/pkg/sql/drop_sequence.go
+++ b/pkg/sql/drop_sequence.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
+	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/util/iterutil"
 )
@@ -83,6 +84,12 @@ func (n *dropSequenceNode) startExec(params runParams) error {
 	ctx := params.ctx
 	for _, toDel := range n.td {
 		droppedDesc := toDel.desc
+		// Exit early with an error if the table is undergoing a declarative schema
+		// change, before we try to get job IDs and update job statuses later. See
+		// createOrUpdateSchemaChangeJob.
+		if catalog.HasConcurrentDeclarativeSchemaChange(droppedDesc) {
+			return scerrors.ConcurrentSchemaChangeError(droppedDesc)
+		}
 		err := params.p.dropSequenceImpl(
 			ctx, droppedDesc, true /* queueJob */, tree.AsStringWithFQNames(n.n, params.Ann()), n.n.DropBehavior,
 		)
diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go
index 59c1464134d8..6b64678edeca 100644
--- a/pkg/sql/drop_table.go
+++ b/pkg/sql/drop_table.go
@@ -229,7 +229,12 @@ func (p *planner) dropTableImpl(
 	behavior tree.DropBehavior,
 ) ([]string, error) {
 	var droppedViews []string
-
+	// Exit early with an error if the table is undergoing a declarative schema
+	// change, before we try to get job IDs and update job statuses later. See
+	// createOrUpdateSchemaChangeJob.
+	if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
+		return nil, scerrors.ConcurrentSchemaChangeError(tableDesc)
+	}
 	// Remove foreign key back references from tables that this table has foreign
 	// keys to.
 	// Copy out the set of outbound fks as it may be overwritten in the loop.
@@ -353,13 +358,6 @@ func (p *planner) initiateDropTable(
 		return errors.Errorf("table %q is already being dropped", tableDesc.Name)
 	}
 
-	// Exit early with an error if the table is undergoing a declarative schema
-	// change, before we try to get job IDs and update job statuses later. See
-	// createOrUpdateSchemaChangeJob.
-	if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
-		return scerrors.ConcurrentSchemaChangeError(tableDesc)
-	}
-
 	// Use the delayed GC mechanism to schedule usage of the more efficient
 	// ClearRange pathway.
 	if tableDesc.IsTable() {
diff --git a/pkg/sql/drop_view.go b/pkg/sql/drop_view.go
index dd0ea192ab4a..8146bd68aec6 100644
--- a/pkg/sql/drop_view.go
+++ b/pkg/sql/drop_view.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/privilege"
+	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 )
@@ -236,6 +237,12 @@ func (p *planner) dropViewImpl(
 ) ([]string, error) {
 	var cascadeDroppedViews []string
 
+	// Exit early with an error if the table is undergoing a declarative schema
+	// change, before we try to get job IDs and update job statuses later. See
+	// createOrUpdateSchemaChangeJob.
+	if catalog.HasConcurrentDeclarativeSchemaChange(viewDesc) {
+		return nil, scerrors.ConcurrentSchemaChangeError(viewDesc)
+	}
 	// Remove back-references from the tables/views this view depends on.
 	dependedOn := append([]descpb.ID(nil), viewDesc.DependsOn...)
 	for _, depID := range dependedOn {