From 0440cb1c0801ea0f455f69ae32f717fd843fdba9 Mon Sep 17 00:00:00 2001
From: Faizan Qazi
Date: Thu, 23 Mar 2023 16:37:52 -0400
Subject: [PATCH 1/4] sql/schemachanger: block legacy schema changer operations
 earlier

Previously, ADD/DROP CONSTRAINT and other declarative schema changer
operations could collide with DROP TABLE in the legacy schema changer,
because our gate was not early enough: some work was already done before
the schema change was blocked with a retry error. This patch moves the
gate earlier, so DROP TABLE/VIEW/SEQUENCE is blocked before any work is
done while a declarative schema change on the same object is still in
progress.

Fixes: #99379

Release note: None
---
 pkg/sql/drop_sequence.go |  7 +++++++
 pkg/sql/drop_table.go    | 14 ++++++--------
 pkg/sql/drop_view.go     |  7 +++++++
 3 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/pkg/sql/drop_sequence.go b/pkg/sql/drop_sequence.go
index 8f2caf3efab7..0d13f54316fc 100644
--- a/pkg/sql/drop_sequence.go
+++ b/pkg/sql/drop_sequence.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
+	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/util/iterutil"
@@ -83,6 +84,12 @@ func (n *dropSequenceNode) startExec(params runParams) error {
 	ctx := params.ctx
 	for _, toDel := range n.td {
 		droppedDesc := toDel.desc
+		// Exit early with an error if the table is undergoing a declarative schema
+		// change, before we try to get job IDs and update job statuses later. See
+		// createOrUpdateSchemaChangeJob.
+		if catalog.HasConcurrentDeclarativeSchemaChange(droppedDesc) {
+			return scerrors.ConcurrentSchemaChangeError(droppedDesc)
+		}
 		err := params.p.dropSequenceImpl(
 			ctx, droppedDesc, true /* queueJob */, tree.AsStringWithFQNames(n.n, params.Ann()), n.n.DropBehavior,
 		)
diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go
index 59c1464134d8..6b64678edeca 100644
--- a/pkg/sql/drop_table.go
+++ b/pkg/sql/drop_table.go
@@ -229,7 +229,12 @@ func (p *planner) dropTableImpl(
 	behavior tree.DropBehavior,
 ) ([]string, error) {
 	var droppedViews []string
-
+	// Exit early with an error if the table is undergoing a declarative schema
+	// change, before we try to get job IDs and update job statuses later. See
+	// createOrUpdateSchemaChangeJob.
+	if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
+		return nil, scerrors.ConcurrentSchemaChangeError(tableDesc)
+	}
 	// Remove foreign key back references from tables that this table has foreign
 	// keys to.
 	// Copy out the set of outbound fks as it may be overwritten in the loop.
@@ -353,13 +358,6 @@ func (p *planner) initiateDropTable(
 		return errors.Errorf("table %q is already being dropped", tableDesc.Name)
 	}
 
-	// Exit early with an error if the table is undergoing a declarative schema
-	// change, before we try to get job IDs and update job statuses later. See
-	// createOrUpdateSchemaChangeJob.
-	if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
-		return scerrors.ConcurrentSchemaChangeError(tableDesc)
-	}
-
 	// Use the delayed GC mechanism to schedule usage of the more efficient
 	// ClearRange pathway.
 	if tableDesc.IsTable() {
diff --git a/pkg/sql/drop_view.go b/pkg/sql/drop_view.go
index dd0ea192ab4a..8146bd68aec6 100644
--- a/pkg/sql/drop_view.go
+++ b/pkg/sql/drop_view.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/privilege"
+	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -236,6 +237,12 @@ func (p *planner) dropViewImpl(
 ) ([]string, error) {
 	var cascadeDroppedViews []string
 
+	// Exit early with an error if the table is undergoing a declarative schema
+	// change, before we try to get job IDs and update job statuses later. See
+	// createOrUpdateSchemaChangeJob.
+	if catalog.HasConcurrentDeclarativeSchemaChange(viewDesc) {
+		return nil, scerrors.ConcurrentSchemaChangeError(viewDesc)
+	}
 	// Remove back-references from the tables/views this view depends on.
 	dependedOn := append([]descpb.ID(nil), viewDesc.DependsOn...)
 	for _, depID := range dependedOn {
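
The gate that patch 1 adds is the same three-line check at the top of each
legacy DROP path: consult the descriptor first, and return the retryable
concurrent-schema-change error before any job bookkeeping starts. A minimal
sketch of the pattern, assuming the catalog and scerrors packages imported
above (the helper name below is illustrative, not part of the patch):

	package sql

	import (
		"github.com/cockroachdb/cockroach/pkg/sql/catalog"
		"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
	)

	// checkNoConcurrentDeclarativeChange is a hypothetical helper showing the
	// guard used above: fail fast with a retryable error while a declarative
	// schema change owns the descriptor, instead of partway through the drop.
	func checkNoConcurrentDeclarativeChange(desc catalog.TableDescriptor) error {
		if catalog.HasConcurrentDeclarativeSchemaChange(desc) {
			return scerrors.ConcurrentSchemaChangeError(desc)
		}
		return nil
	}
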
From 513a040aa041eaac974e74d1427458f6c3ca0183 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Wed, 22 Mar 2023 16:48:31 -0400
Subject: [PATCH 2/4] c2c: increase portability of roachtest driver

This small patch puts all roachtest driver logic into a main function,
allowing other test harnesses to initiate C2C roachtests.

Informs #99230

Release note: None
---
 pkg/cmd/roachtest/tests/cluster_to_cluster.go | 187 +++++++++---------
 1 file changed, 99 insertions(+), 88 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 9dd4d3fc5bf2..bc4be80245f8 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -67,6 +67,7 @@ type c2cSetup struct {
 	src          clusterInfo
 	dst          clusterInfo
 	workloadNode option.NodeListOption
+	promCfg      *prometheus.Config
 }
 
 var c2cPromMetrics = map[string]clusterstats.ClusterStat{
@@ -372,6 +373,22 @@ func (sp *replicationTestSpec) setupC2C(ctx context.Context, t test.Test, c clus
 	sp.t = t
 	sp.c = c
 	sp.metrics = &c2cMetrics{}
+	if !c.IsLocal() {
+		// TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to
+		// pass a grafana dashboard for this to work
+		sp.setup.promCfg = (&prometheus.Config{}).
+			WithPrometheusNode(sp.setup.workloadNode.InstallNodes()[0]).
+			WithCluster(sp.setup.dst.nodes.InstallNodes()).
+			WithNodeExporter(sp.setup.dst.nodes.InstallNodes()).
+			WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")
+
+		require.NoError(sp.t, sp.c.StartGrafana(ctx, sp.t.L(), sp.setup.promCfg))
+		sp.t.L().Printf("Prom has started")
+	}
+}
+
+func (sp *replicationTestSpec) crdbNodes() option.NodeListOption {
+	return sp.setup.src.nodes.Merge(sp.setup.dst.nodes)
 }
 
 func (sp *replicationTestSpec) startStatsCollection(
@@ -385,18 +402,8 @@ func (sp *replicationTestSpec) startStatsCollection(
 			return map[string]float64{}
 		}
 	}
-	// TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to
-	// pass a grafana dashboard for this to work
-	cfg := (&prometheus.Config{}).
-		WithPrometheusNode(sp.setup.workloadNode.InstallNodes()[0]).
-		WithCluster(sp.setup.dst.nodes.InstallNodes()).
-		WithNodeExporter(sp.setup.dst.nodes.InstallNodes()).
-		WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")
-
-	require.NoError(sp.t, sp.c.StartGrafana(ctx, sp.t.L(), cfg))
-	sp.t.L().Printf("Prom has started")
-
-	client, err := clusterstats.SetupCollectorPromClient(ctx, sp.c, sp.t.L(), cfg)
+
+	client, err := clusterstats.SetupCollectorPromClient(ctx, sp.c, sp.t.L(), sp.setup.promCfg)
 	require.NoError(sp.t, err, "error creating prometheus client for stats collector")
 	collector := clusterstats.NewStatsCollector(ctx, client)
 
@@ -528,6 +535,83 @@ AS OF SYSTEM TIME '%s'`, startTimeStr, aost)
 
 	require.Equal(sp.t, srcFingerprint, destFingerprint)
 }
+func (sp *replicationTestSpec) main(ctx context.Context, t test.Test, c cluster.Cluster) {
+	metricSnapper := sp.startStatsCollection(ctx)
+	sp.preStreamingWorkload(ctx)
+
+	t.L().Printf("begin workload on src cluster")
+	m := c.NewMonitor(ctx, sp.crdbNodes())
+	// The roachtest driver can use the workloadCtx to cancel the workload.
+	workloadCtx, workloadCancel := context.WithCancel(ctx)
+	defer workloadCancel()
+
+	workloadDoneCh := make(chan struct{})
+	m.Go(func(ctx context.Context) error {
+		defer close(workloadDoneCh)
+		err := sp.runWorkload(workloadCtx)
+		// The workload should only return an error if the roachtest driver cancels the
+		// workloadCtx after sp.additionalDuration has elapsed after the initial scan completes.
+		if err != nil && workloadCtx.Err() == nil {
+			// Implies the workload context was not cancelled and the workload cmd returned a
+			// different error.
+			return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
+		}
+		t.L().Printf("workload successfully finished")
+		return nil
+	})
+
+	t.Status("starting replication stream")
+	sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
+	ingestionJobID := sp.startReplicationStream()
+
+	lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, t.L(), getStreamIngestionJobInfo, t.Status, false)
+	defer lv.maybeLogLatencyHist()
+
+	m.Go(func(ctx context.Context) error {
+		return lv.pollLatency(ctx, sp.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)
+	})
+
+	t.L().Printf("waiting for replication stream to finish ingesting initial scan")
+	sp.waitForHighWatermark(ingestionJobID, sp.timeout/2)
+	sp.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
+	t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
+		sp.additionalDuration))
+
+	select {
+	case <-workloadDoneCh:
+		t.L().Printf("workload finished on its own")
+	case <-time.After(sp.getWorkloadTimeout()):
+		workloadCancel()
+		t.L().Printf("workload has cancelled after %s", sp.additionalDuration)
+	case <-ctx.Done():
+		t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
+		return
+	}
+	var currentTime time.Time
+	sp.setup.dst.sysSQL.QueryRow(sp.t, "SELECT clock_timestamp()").Scan(&currentTime)
+	cutoverTime := currentTime.Add(-sp.cutover)
+	sp.t.Status("cutover time chosen: ", cutoverTime.String())
+
+	retainedTime := sp.getReplicationRetainedTime()
+
+	sp.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
+	sp.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
+
+	sp.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
+		cutoverTime.String()))
+	sp.stopReplicationStream(ingestionJobID, cutoverTime)
+	sp.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
+
+	sp.metrics.export(sp.t, len(sp.setup.src.nodes))
+
+	t.Status("comparing fingerprints")
+	sp.compareTenantFingerprintsAtTimestamp(
+		ctx,
+		retainedTime,
+		cutoverTime,
+	)
+	lv.assertValid(t)
+}
 func registerClusterToCluster(r registry.Registry) {
 	for _, sp := range []replicationTestSpec{
 		{
@@ -611,89 +695,16 @@ func registerClusterToCluster(r registry.Registry) {
 			RequiresLicense: true,
 			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
 				sp.setupC2C(ctx, t, c)
-				m := c.NewMonitor(ctx, sp.setup.src.nodes.Merge(sp.setup.dst.nodes))
-				hc := roachtestutil.NewHealthChecker(t, c, sp.setup.src.nodes.Merge(sp.setup.dst.nodes))
+				m := c.NewMonitor(ctx, sp.crdbNodes())
+				hc := roachtestutil.NewHealthChecker(t, c, sp.crdbNodes())
 
 				m.Go(func(ctx context.Context) error {
 					require.NoError(t, hc.Runner(ctx))
 					return nil
 				})
 				defer hc.Done()
 
-				metricSnapper := sp.startStatsCollection(ctx)
-				sp.preStreamingWorkload(ctx)
-
-				t.L().Printf("begin workload on src cluster")
-				// The roachtest driver can use the workloadCtx to cancel the workload.
-				workloadCtx, workloadCancel := context.WithCancel(ctx)
-				defer workloadCancel()
-
-				workloadDoneCh := make(chan struct{})
-				m.Go(func(ctx context.Context) error {
-					defer close(workloadDoneCh)
-					err := sp.runWorkload(workloadCtx)
-					// The workload should only return an error if the roachtest driver cancels the
-					// workloadCtx after sp.additionalDuration has elapsed after the initial scan completes.
-					if err != nil && workloadCtx.Err() == nil {
-						// Implies the workload context was not cancelled and the workload cmd returned a
-						// different error.
-						return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
-					}
-					t.L().Printf("workload successfully finished")
-					return nil
-				})
-
-				t.Status("starting replication stream")
-				sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
-				ingestionJobID := sp.startReplicationStream()
-
-				lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, t.L(), getStreamIngestionJobInfo, t.Status, false)
-				defer lv.maybeLogLatencyHist()
-
-				m.Go(func(ctx context.Context) error {
-					return lv.pollLatency(ctx, sp.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)
-				})
-
-				t.L().Printf("waiting for replication stream to finish ingesting initial scan")
-				sp.waitForHighWatermark(ingestionJobID, sp.timeout/2)
-				sp.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
-				t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
-					sp.additionalDuration))
-
-				select {
-				case <-workloadDoneCh:
-					t.L().Printf("workload finished on its own")
-				case <-time.After(sp.getWorkloadTimeout()):
-					workloadCancel()
-					t.L().Printf("workload has cancelled after %s", sp.additionalDuration)
-				case <-ctx.Done():
-					t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
-					return
-				}
-				var currentTime time.Time
-				sp.setup.dst.sysSQL.QueryRow(sp.t, "SELECT clock_timestamp()").Scan(&currentTime)
-				cutoverTime := currentTime.Add(-sp.cutover)
-				sp.t.Status("cutover time chosen: ", cutoverTime.String())
-
-				retainedTime := sp.getReplicationRetainedTime()
-
-				sp.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
-				sp.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
-
-				sp.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
-					cutoverTime.String()))
-				sp.stopReplicationStream(ingestionJobID, cutoverTime)
-				sp.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
-
-				sp.metrics.export(sp.t, len(sp.setup.src.nodes))
-
-				t.Status("comparing fingerprints")
-				sp.compareTenantFingerprintsAtTimestamp(
-					ctx,
-					retainedTime,
-					cutoverTime,
-				)
-				lv.assertValid(t)
+				sp.main(ctx, t, c)
 			},
 		})
 	}
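
Beyond portability, the extracted main function keeps the driver's shutdown
logic in one place: the workload runs in a monitor goroutine under its own
cancelable context, and the driver selects on "workload finished on its own",
"additional duration elapsed", and "test context canceled". Stripped of the
roachtest types, the shape of that pattern is roughly the following generic
sketch (illustrative only, not code from the patch):

	package sketch

	import (
		"context"
		"time"
	)

	// runWithDeadline runs work until it returns on its own, the duration
	// elapses (the work is then asked to stop via its context), or the parent
	// context is canceled.
	func runWithDeadline(ctx context.Context, d time.Duration, work func(context.Context) error) error {
		workCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		done := make(chan error, 1)
		go func() { done <- work(workCtx) }()
		select {
		case err := <-done:
			return err // the work finished on its own
		case <-time.After(d):
			cancel()      // ask the work to wind down...
			return <-done // ...and wait for it to exit
		case <-ctx.Done():
			return ctx.Err() // the harness shut us down
		}
	}
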
From 56415260d3358ad4da10ad73dbb510163cb84210 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Fri, 24 Mar 2023 07:48:59 -0400
Subject: [PATCH 3/4] c2c: remove tenant rate limiter on destination side in
 roachtest

Release note: None

Epic: None
---
 pkg/cmd/roachtest/tests/cluster_to_cluster.go | 2 ++
 pkg/cmd/roachtest/tests/multitenant_utils.go  | 2 --
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index bc4be80245f8..dd6f9e031752 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -564,6 +564,8 @@ func (sp *replicationTestSpec) main(ctx context.Context, t test.Test, c cluster.
 	sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
 	ingestionJobID := sp.startReplicationStream()
 
+	removeTenantRateLimiters(t, sp.setup.dst.sysSQL, sp.setup.dst.name)
+
 	lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, t.L(), getStreamIngestionJobInfo, t.Status, false)
 	defer lv.maybeLogLatencyHist()
 
diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go
index 4b209615fde0..2ca12644bcbe 100644
--- a/pkg/cmd/roachtest/tests/multitenant_utils.go
+++ b/pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -366,7 +366,5 @@ func createInMemoryTenant(
 
 // removeTenantRateLimiters ensures the tenant is not throttled by limiters.
 func removeTenantRateLimiters(t test.Test, systemSQL *sqlutils.SQLRunner, tenantName string) {
-	systemSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1::STRING, 10000000000, 0, -10000000000, now(), 0);`, tenantName)
-
 	systemSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY exempt_from_rate_limiting=true`, tenantName)
 }
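
After patch 3, exempting the destination tenant from KV rate limiting is a
single capability grant instead of a crdb_internal.update_tenant_resource_limits
call with hand-tuned numbers, and the driver applies it right after the
replication stream starts. From a test, the whole helper reduces to one
statement (assuming, as in removeTenantRateLimiters above, that systemSQL is a
*sqlutils.SQLRunner connected to the system tenant):

	// Grant the capability that exempts the tenant from KV rate limiting.
	systemSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY exempt_from_rate_limiting=true`, tenantName)
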
From a411fe24f78d98c833e1fcecb5e15c9202b717b9 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Wed, 22 Mar 2023 17:20:18 -0400
Subject: [PATCH 4/4] c2c: add acceptance test

This patch adds a lightweight acceptance test that calls the main c2c
roachtest driver. This will add a c2c roachtest to Essential CI,
preventing future patches from inadvertently breaking the C2C roachtest
driver.

Fixes #99230

Release note: None
---
 pkg/cmd/roachtest/tests/acceptance.go         |  7 ++++++
 pkg/cmd/roachtest/tests/cluster_to_cluster.go | 23 +++++++++++++++++--
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/acceptance.go b/pkg/cmd/roachtest/tests/acceptance.go
index 44f8ed20d8b2..22b9a737b6e9 100644
--- a/pkg/cmd/roachtest/tests/acceptance.go
+++ b/pkg/cmd/roachtest/tests/acceptance.go
@@ -68,6 +68,13 @@ func registerAcceptance(r registry.Registry) {
 				timeout: 30 * time.Minute,
 			},
 		},
+		registry.OwnerDisasterRecovery: {
+			{
+				name:     "c2c",
+				fn:       runAcceptanceClusterReplication,
+				numNodes: 3,
+			},
+		},
 	}
 	tags := []string{"default", "quick"}
 	specTemplate := registry.TestSpec{
diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index dd6f9e031752..4b8c857c446b 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -614,6 +614,23 @@ func (sp *replicationTestSpec) main(ctx context.Context, t test.Test, c cluster.
 	)
 	lv.assertValid(t)
 }
+func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster.Cluster) {
+	if !c.IsLocal() {
+		t.Fatal("acceptance tests should only run on a local cluster")
+	}
+	sp := replicationTestSpec{
+		srcNodes: 1,
+		dstNodes: 1,
+		// The timeout field ensures the c2c roachtest driver behaves properly.
+		timeout:            10 * time.Minute,
+		workload:           replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
+		additionalDuration: 0 * time.Minute,
+		cutover:            30 * time.Second,
+	}
+	sp.setupC2C(ctx, t, c)
+	sp.main(ctx, t, c)
+}
+
 func registerClusterToCluster(r registry.Registry) {
 	for _, sp := range []replicationTestSpec{
 		{
@@ -664,7 +681,7 @@ func registerClusterToCluster(r registry.Registry) {
 			cpus:               4,
 			pdSize:             10,
 			workload:           replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
-			timeout:            20 * time.Minute,
+			timeout:            5 * time.Minute,
 			additionalDuration: 0 * time.Minute,
 			cutover:            30 * time.Second,
 			skip:               "for local ad hoc testing",
@@ -683,7 +700,9 @@ func registerClusterToCluster(r registry.Registry) {
 	} {
 		sp := sp
 		clusterOps := make([]spec.Option, 0)
-		clusterOps = append(clusterOps, spec.CPU(sp.cpus))
+		if sp.cpus != 0 {
+			clusterOps = append(clusterOps, spec.CPU(sp.cpus))
+		}
 		if sp.pdSize != 0 {
 			clusterOps = append(clusterOps, spec.VolumeSize(sp.pdSize))
 		}
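
The last hunk above also changes how cluster options are assembled:
spec.CPU(sp.cpus) is now only appended when cpus is set, mirroring the existing
pdSize guard, presumably because the new acceptance spec leaves cpus at its
zero value. The resulting pattern treats a zero field as "not configured":

	// Only apply cluster options the spec explicitly configures; the zero
	// value falls back to the roachtest default.
	clusterOps := make([]spec.Option, 0)
	if sp.cpus != 0 {
		clusterOps = append(clusterOps, spec.CPU(sp.cpus))
	}
	if sp.pdSize != 0 {
		clusterOps = append(clusterOps, spec.VolumeSize(sp.pdSize))
	}
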