Merge #99438 #99468
99438: sql/schemachanger: block legacy schema changer operations earlier r=fqazi a=fqazi

Previously, ADD/DROP CONSTRAINT and other declarative schema changer operations could collide with DROP TABLE in the legacy schema changer because our gate ran too late: some work would be done before the schema change was blocked with a retry error.
This patch moves the gate earlier, so DROP TABLE/VIEW/SEQUENCE cannot execute until any declarative schema change on the object has completed.

Fixes: #99379

Release note: None
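As a rough illustration of the gating pattern (a minimal, self-contained sketch: the descriptor type, pending-change flag, and dropObject helper are hypothetical stand-ins; in the actual hunks below for drop_table.go, drop_view.go, and drop_sequence.go the gate is catalog.HasConcurrentDeclarativeSchemaChange followed by scerrors.ConcurrentSchemaChangeError, placed at the top of each drop implementation):

package main

import (
	"errors"
	"fmt"
)

// errConcurrentSchemaChange stands in for the retryable error produced by
// scerrors.ConcurrentSchemaChangeError in the real code.
var errConcurrentSchemaChange = errors.New("descriptor has a concurrent declarative schema change")

// descriptor is a hypothetical stand-in for a table/view/sequence descriptor.
type descriptor struct {
	name                        string
	hasDeclarativeChangePending bool
}

// dropObject mirrors where this patch places the gate: check for an in-flight
// declarative schema change before any job bookkeeping, so the legacy drop
// path fails fast with a retryable error instead of doing partial work first.
func dropObject(d *descriptor) error {
	if d.hasDeclarativeChangePending {
		return fmt.Errorf("dropping %q: %w", d.name, errConcurrentSchemaChange)
	}
	// Only now would the legacy path queue a schema-change job and update job
	// statuses (createOrUpdateSchemaChangeJob in the real code).
	return nil
}

func main() {
	fmt.Println(dropObject(&descriptor{name: "t", hasDeclarativeChangePending: true}))
}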

99468: c2c: add acceptance test r=stevendanna,renatolabs a=msbutler

c2c: add acceptance test

This patch adds a lightweight acceptance test that calls the main c2c roachtest
driver. This puts a c2c roachtest in Essential CI, preventing future patches
from inadvertently breaking the driver.

Fixes #99230

Release note: None
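For reference, a minimal sketch of how the acceptance variant reuses the driver (the values and calls mirror runAcceptanceClusterReplication as added in cluster_to_cluster.go below; replicationTestSpec, replicateKV, setupC2C, and main are the existing driver pieces in that file, and the full roachtests now call the same main after their monitor and health-checker setup):

// Acceptance variant: local-only, single-node clusters, short workload.
sp := replicationTestSpec{
	srcNodes: 1,
	dstNodes: 1,
	// A non-zero timeout keeps the driver's timeout-dependent logic exercised.
	timeout:            10 * time.Minute,
	workload:           replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
	additionalDuration: 0 * time.Minute,
	cutover:            30 * time.Second,
}
sp.setupC2C(ctx, t, c) // skips the Prometheus/Grafana setup on local clusters
sp.main(ctx, t, c)     // same workload/replication/cutover/fingerprint flow as the nightly tests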

Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
3 people committed Mar 24, 2023
3 parents 9e1977a + 0440cb1 + a411fe2 commit a9e90a6
Showing 6 changed files with 149 additions and 100 deletions.
7 changes: 7 additions & 0 deletions pkg/cmd/roachtest/tests/acceptance.go
@@ -68,6 +68,13 @@ func registerAcceptance(r registry.Registry) {
timeout: 30 * time.Minute,
},
},
registry.OwnerDisasterRecovery: {
{
name: "c2c",
fn: runAcceptanceClusterReplication,
numNodes: 3,
},
},
}
tags := []string{"default", "quick"}
specTemplate := registry.TestSpec{
212 changes: 122 additions & 90 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -67,6 +67,7 @@ type c2cSetup struct {
src clusterInfo
dst clusterInfo
workloadNode option.NodeListOption
promCfg *prometheus.Config
}

var c2cPromMetrics = map[string]clusterstats.ClusterStat{
@@ -372,6 +373,22 @@ func (sp *replicationTestSpec) setupC2C(ctx context.Context, t test.Test, c clus
sp.t = t
sp.c = c
sp.metrics = &c2cMetrics{}
if !c.IsLocal() {
// TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to
// pass a grafana dashboard for this to work
sp.setup.promCfg = (&prometheus.Config{}).
WithPrometheusNode(sp.setup.workloadNode.InstallNodes()[0]).
WithCluster(sp.setup.dst.nodes.InstallNodes()).
WithNodeExporter(sp.setup.dst.nodes.InstallNodes()).
WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")

require.NoError(sp.t, sp.c.StartGrafana(ctx, sp.t.L(), sp.setup.promCfg))
sp.t.L().Printf("Prom has started")
}
}

func (sp *replicationTestSpec) crdbNodes() option.NodeListOption {
return sp.setup.src.nodes.Merge(sp.setup.dst.nodes)
}

func (sp *replicationTestSpec) startStatsCollection(
@@ -385,18 +402,8 @@ func (sp *replicationTestSpec) startStatsCollection(
return map[string]float64{}
}
}
// TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to
// pass a grafana dashboard for this to work
cfg := (&prometheus.Config{}).
WithPrometheusNode(sp.setup.workloadNode.InstallNodes()[0]).
WithCluster(sp.setup.dst.nodes.InstallNodes()).
WithNodeExporter(sp.setup.dst.nodes.InstallNodes()).
WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")

require.NoError(sp.t, sp.c.StartGrafana(ctx, sp.t.L(), cfg))
sp.t.L().Printf("Prom has started")

client, err := clusterstats.SetupCollectorPromClient(ctx, sp.c, sp.t.L(), cfg)

client, err := clusterstats.SetupCollectorPromClient(ctx, sp.c, sp.t.L(), sp.setup.promCfg)
require.NoError(sp.t, err, "error creating prometheus client for stats collector")
collector := clusterstats.NewStatsCollector(ctx, client)

@@ -528,6 +535,102 @@ AS OF SYSTEM TIME '%s'`, startTimeStr, aost)
require.Equal(sp.t, srcFingerprint, destFingerprint)
}

func (sp *replicationTestSpec) main(ctx context.Context, t test.Test, c cluster.Cluster) {
metricSnapper := sp.startStatsCollection(ctx)
sp.preStreamingWorkload(ctx)

t.L().Printf("begin workload on src cluster")
m := c.NewMonitor(ctx, sp.crdbNodes())
// The roachtest driver can use the workloadCtx to cancel the workload.
workloadCtx, workloadCancel := context.WithCancel(ctx)
defer workloadCancel()

workloadDoneCh := make(chan struct{})
m.Go(func(ctx context.Context) error {
defer close(workloadDoneCh)
err := sp.runWorkload(workloadCtx)
// The workload should only return an error if the roachtest driver cancels the
// workloadCtx after sp.additionalDuration has elapsed after the initial scan completes.
if err != nil && workloadCtx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned a
// different error.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
}
t.L().Printf("workload successfully finished")
return nil
})

t.Status("starting replication stream")
sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
ingestionJobID := sp.startReplicationStream()

removeTenantRateLimiters(t, sp.setup.dst.sysSQL, sp.setup.dst.name)

lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, t.L(), getStreamIngestionJobInfo, t.Status, false)
defer lv.maybeLogLatencyHist()

m.Go(func(ctx context.Context) error {
return lv.pollLatency(ctx, sp.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)
})

t.L().Printf("waiting for replication stream to finish ingesting initial scan")
sp.waitForHighWatermark(ingestionJobID, sp.timeout/2)
sp.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
sp.additionalDuration))

select {
case <-workloadDoneCh:
t.L().Printf("workload finished on its own")
case <-time.After(sp.getWorkloadTimeout()):
workloadCancel()
t.L().Printf("workload has cancelled after %s", sp.additionalDuration)
case <-ctx.Done():
t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
return
}
var currentTime time.Time
sp.setup.dst.sysSQL.QueryRow(sp.t, "SELECT clock_timestamp()").Scan(&currentTime)
cutoverTime := currentTime.Add(-sp.cutover)
sp.t.Status("cutover time chosen: ", cutoverTime.String())

retainedTime := sp.getReplicationRetainedTime()

sp.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
sp.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())

sp.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
cutoverTime.String()))
sp.stopReplicationStream(ingestionJobID, cutoverTime)
sp.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

sp.metrics.export(sp.t, len(sp.setup.src.nodes))

t.Status("comparing fingerprints")
sp.compareTenantFingerprintsAtTimestamp(
ctx,
retainedTime,
cutoverTime,
)
lv.assertValid(t)
}
func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster.Cluster) {
if !c.IsLocal() {
t.Fatal("acceptance tests should only run on a local cluster")
}
sp := replicationTestSpec{
srcNodes: 1,
dstNodes: 1,
// The timeout field ensures the c2c roachtest driver behaves properly.
timeout: 10 * time.Minute,
workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
}
sp.setupC2C(ctx, t, c)
sp.main(ctx, t, c)
}

func registerClusterToCluster(r registry.Registry) {
for _, sp := range []replicationTestSpec{
{
@@ -578,7 +681,7 @@ func registerClusterToCluster(r registry.Registry) {
cpus: 4,
pdSize: 10,
workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
timeout: 20 * time.Minute,
timeout: 5 * time.Minute,
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
skip: "for local ad hoc testing",
@@ -597,7 +700,9 @@ func registerClusterToCluster(r registry.Registry) {
} {
sp := sp
clusterOps := make([]spec.Option, 0)
clusterOps = append(clusterOps, spec.CPU(sp.cpus))
if sp.cpus != 0 {
clusterOps = append(clusterOps, spec.CPU(sp.cpus))
}
if sp.pdSize != 0 {
clusterOps = append(clusterOps, spec.VolumeSize(sp.pdSize))
}
@@ -611,89 +716,16 @@ func registerClusterToCluster(r registry.Registry) {
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
sp.setupC2C(ctx, t, c)
m := c.NewMonitor(ctx, sp.setup.src.nodes.Merge(sp.setup.dst.nodes))
hc := roachtestutil.NewHealthChecker(t, c, sp.setup.src.nodes.Merge(sp.setup.dst.nodes))

m := c.NewMonitor(ctx, sp.crdbNodes())
hc := roachtestutil.NewHealthChecker(t, c, sp.crdbNodes())
m.Go(func(ctx context.Context) error {
require.NoError(t, hc.Runner(ctx))
return nil
})
defer hc.Done()

metricSnapper := sp.startStatsCollection(ctx)
sp.preStreamingWorkload(ctx)

t.L().Printf("begin workload on src cluster")
// The roachtest driver can use the workloadCtx to cancel the workload.
workloadCtx, workloadCancel := context.WithCancel(ctx)
defer workloadCancel()

workloadDoneCh := make(chan struct{})
m.Go(func(ctx context.Context) error {
defer close(workloadDoneCh)
err := sp.runWorkload(workloadCtx)
// The workload should only return an error if the roachtest driver cancels the
// workloadCtx after sp.additionalDuration has elapsed after the initial scan completes.
if err != nil && workloadCtx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned a
// different error.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
}
t.L().Printf("workload successfully finished")
return nil
})

t.Status("starting replication stream")
sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
ingestionJobID := sp.startReplicationStream()

lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, t.L(), getStreamIngestionJobInfo, t.Status, false)
defer lv.maybeLogLatencyHist()

m.Go(func(ctx context.Context) error {
return lv.pollLatency(ctx, sp.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)
})

t.L().Printf("waiting for replication stream to finish ingesting initial scan")
sp.waitForHighWatermark(ingestionJobID, sp.timeout/2)
sp.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
sp.additionalDuration))

select {
case <-workloadDoneCh:
t.L().Printf("workload finished on its own")
case <-time.After(sp.getWorkloadTimeout()):
workloadCancel()
t.L().Printf("workload has cancelled after %s", sp.additionalDuration)
case <-ctx.Done():
t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
return
}
var currentTime time.Time
sp.setup.dst.sysSQL.QueryRow(sp.t, "SELECT clock_timestamp()").Scan(&currentTime)
cutoverTime := currentTime.Add(-sp.cutover)
sp.t.Status("cutover time chosen: ", cutoverTime.String())

retainedTime := sp.getReplicationRetainedTime()

sp.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
sp.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())

sp.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
cutoverTime.String()))
sp.stopReplicationStream(ingestionJobID, cutoverTime)
sp.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

sp.metrics.export(sp.t, len(sp.setup.src.nodes))

t.Status("comparing fingerprints")
sp.compareTenantFingerprintsAtTimestamp(
ctx,
retainedTime,
cutoverTime,
)
lv.assertValid(t)
sp.main(ctx, t, c)
},
})
}
2 changes: 0 additions & 2 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -366,7 +366,5 @@ func createInMemoryTenant(

// removeTenantRateLimiters ensures the tenant is not throttled by limiters.
func removeTenantRateLimiters(t test.Test, systemSQL *sqlutils.SQLRunner, tenantName string) {
systemSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1::STRING, 10000000000, 0,
10000000000, now(), 0);`, tenantName)
systemSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY exempt_from_rate_limiting=true`, tenantName)
}
7 changes: 7 additions & 0 deletions pkg/sql/drop_sequence.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
@@ -83,6 +84,12 @@ func (n *dropSequenceNode) startExec(params runParams) error {
ctx := params.ctx
for _, toDel := range n.td {
droppedDesc := toDel.desc
// Exit early with an error if the table is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if catalog.HasConcurrentDeclarativeSchemaChange(droppedDesc) {
return scerrors.ConcurrentSchemaChangeError(droppedDesc)
}
err := params.p.dropSequenceImpl(
ctx, droppedDesc, true /* queueJob */, tree.AsStringWithFQNames(n.n, params.Ann()), n.n.DropBehavior,
)
14 changes: 6 additions & 8 deletions pkg/sql/drop_table.go
@@ -229,7 +229,12 @@ func (p *planner) dropTableImpl(
behavior tree.DropBehavior,
) ([]string, error) {
var droppedViews []string

// Exit early with an error if the table is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
return nil, scerrors.ConcurrentSchemaChangeError(tableDesc)
}
// Remove foreign key back references from tables that this table has foreign
// keys to.
// Copy out the set of outbound fks as it may be overwritten in the loop.
@@ -353,13 +358,6 @@ func (p *planner) initiateDropTable(
return errors.Errorf("table %q is already being dropped", tableDesc.Name)
}

// Exit early with an error if the table is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
return scerrors.ConcurrentSchemaChangeError(tableDesc)
}

// Use the delayed GC mechanism to schedule usage of the more efficient
// ClearRange pathway.
if tableDesc.IsTable() {
7 changes: 7 additions & 0 deletions pkg/sql/drop_view.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -236,6 +237,12 @@ func (p *planner) dropViewImpl(
) ([]string, error) {
var cascadeDroppedViews []string

// Exit early with an error if the table is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if catalog.HasConcurrentDeclarativeSchemaChange(viewDesc) {
return nil, scerrors.ConcurrentSchemaChangeError(viewDesc)
}
// Remove back-references from the tables/views this view depends on.
dependedOn := append([]descpb.ID(nil), viewDesc.DependsOn...)
for _, depID := range dependedOn {