migrations: add migration to wait on in-flight schema changes
Eventually, we want to stop accepting non-MVCC AddSSTables. To do
this, we need all CREATE INDEX schema changes to use the new
mvcc-compatible process. To make this more tenable, we want to drain
in-flight schema changes during a migration.

Release note: Finalizing an upgrade to 22.1 requires that all
in-flight schema changes enter a terminal state. This may mean that
finalization takes as long as the longest-running schema change.
stevendanna committed Feb 11, 2022
1 parent 18e4193 commit db5b216
Showing 13 changed files with 487 additions and 11 deletions.
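
For operators sizing up finalization time, the snippet below is an illustrative sketch of how to list the schema changes this migration will wait on. The job-type filter and status list are copied from the migration's own query in pkg/migration/migrations/wait_for_schema_changes.go (added in this commit); the selected columns are ordinary SHOW JOBS columns.

SELECT job_id, status, description
  FROM [SHOW JOBS]
 WHERE job_type = 'SCHEMA CHANGE'
   AND status NOT IN ('succeeded', 'failed', 'canceled', 'revert-failed', 'paused');

Any rows returned are schema changes that finalization will wait on; once they reach a terminal (or paused) state, the migration can proceed.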
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -174,4 +174,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-62 set the active cluster version in the format '<major>.<minor>'
version version 21.2-64 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -186,6 +186,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-62</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-64</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
14 changes: 14 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -269,6 +269,16 @@ const (
DontProposeWriteTimestampForLeaseTransfers
// TenantSettingsTable adds the system table for tracking tenant usage.
TenantSettingsTable
// NoNonMVCCIndexBackfills adds the migration which waits for
// all schema changes to complete. After this point, no
// non-MVCC index backfills will be used.
//
// TODO(ssd): Do we want to name this something related to
// AddSST since we will use it not just for the migration but
// also eventually to decide whether to start rejecting
// AddSSTable requests (and failing resumed old schema
// changes)?
NoNonMVCCIndexBackfills

// *************************************************
// Step (1): Add new versions here.
@@ -429,6 +439,10 @@ var versionsSingleton = keyedVersions{
Key: TenantSettingsTable,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 62},
},
{
Key: NoNonMVCCIndexBackfills,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 64},
},

// *************************************************
// Step (2): Add new versions here.
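
Per the TODO above, the same gate is eventually meant to decide whether non-MVCC AddSSTable requests are still accepted. Below is a minimal sketch of what such a check could look like; the call site and helper name are hypothetical, and only the version key and the standard IsActive check come from this commit and the existing clusterversion API.

package example // hypothetical package for illustration

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// allowNonMVCCAddSSTable is a hypothetical helper: once every node has
// activated NoNonMVCCIndexBackfills, non-MVCC AddSSTables could be rejected.
func allowNonMVCCAddSSTable(ctx context.Context, st *cluster.Settings) bool {
	return !st.Version.IsActive(ctx, clusterversion.NoNonMVCCIndexBackfills)
}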
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions pkg/jobs/testing_knobs.go
@@ -67,6 +67,10 @@ type TestingKnobs struct {

// DisableAdoptions disables job adoptions.
DisableAdoptions bool

// BeforeWaitForJobsQuery is called once per invocation of the
// poll-show-jobs query in WaitForJobs.
BeforeWaitForJobsQuery func()
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
@@ -92,6 +96,13 @@ type TestingIntervalOverrides struct {

// RetryMaxDelay overrides retryMaxDelaySetting cluster setting.
RetryMaxDelay *time.Duration

// WaitForJobsInitialDelay is the initial delay used in
// WaitForJobs calls.
WaitForJobsInitialDelay *time.Duration

// WaitForJobsMaxDelay is the maximum delay used in
// WaitForJobs calls.
WaitForJobsMaxDelay *time.Duration
}

// NewTestingKnobsWithShortIntervals returns a TestingKnobs structure with
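
A minimal sketch of how a test could exercise the new knobs. The field names come from this diff; treating IntervalOverrides as a value field (as the nil checks in wait.go suggest) and wiring the result into a test server (typically via base.TestingKnobs.JobsTestingKnobs) are assumptions based on how other jobs tests are configured.

package example // hypothetical package for illustration

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/jobs"
)

// makeFastWaitKnobs returns knobs that tighten the poll loop used by
// WaitForJobs and count how often the poll-show-jobs query runs.
func makeFastWaitKnobs(polls *int) *jobs.TestingKnobs {
	initialDelay := 10 * time.Millisecond
	maxDelay := 50 * time.Millisecond
	return &jobs.TestingKnobs{
		IntervalOverrides: jobs.TestingIntervalOverrides{
			WaitForJobsInitialDelay: &initialDelay,
			WaitForJobsMaxDelay:     &maxDelay,
		},
		// Called once per poll-show-jobs invocation in WaitForJobs.
		BeforeWaitForJobsQuery: func() { *polls++ },
	}
}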
54 changes: 47 additions & 7 deletions pkg/jobs/wait.go
@@ -62,25 +62,44 @@ func (r *Registry) WaitForJobs(
return r.waitForJobs(ctx, ex, jobs, jobFinishedLocally)
}

func (r *Registry) waitForJobs(
// WaitForJobsIgnoringJobErrors is like WaitForJobs but it only
// returns an error in the case that polling the jobs table fails.
func (r *Registry) WaitForJobsIgnoringJobErrors(
ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID,
) error {
log.Infof(ctx, "waiting for %d %v queued jobs to complete", len(jobs), jobs)
jobFinishedLocally, cleanup := r.installWaitingSet(jobs...)
defer cleanup()
return r.waitForJobsToBeTerminalOrPaused(ctx, ex, jobs, jobFinishedLocally)
}

func (r *Registry) waitForJobsToBeTerminalOrPaused(
ctx context.Context,
ex sqlutil.InternalExecutor,
jobs []jobspb.JobID,
jobFinishedLocally <-chan struct{},
) error {

if len(jobs) == 0 {
return nil
}

query := makeWaitForJobsQuery(jobs)
start := timeutil.Now()
// Manually retry instead of using SHOW JOBS WHEN COMPLETE so we have greater
// control over retries. Also, avoiding SHOW JOBS prevents us from having to
// populate the crdb_internal.jobs vtable.
initialBackoff := 500 * time.Millisecond
maxBackoff := 3 * time.Second

if r.knobs.IntervalOverrides.WaitForJobsInitialDelay != nil {
initialBackoff = *r.knobs.IntervalOverrides.WaitForJobsInitialDelay
}

if r.knobs.IntervalOverrides.WaitForJobsMaxDelay != nil {
maxBackoff = *r.knobs.IntervalOverrides.WaitForJobsMaxDelay
}

ret := retry.StartWithCtx(ctx, retry.Options{
InitialBackoff: 500 * time.Millisecond,
MaxBackoff: 3 * time.Second,
InitialBackoff: initialBackoff,
MaxBackoff: maxBackoff,
Multiplier: 1.5,

// Setting the closer here will terminate the loop if the job finishes
@@ -92,6 +111,9 @@ func (r *Registry) waitForJobs(
// We poll the number of queued jobs that aren't finished. As with SHOW JOBS
// WHEN COMPLETE, if one of the jobs is missing from the jobs table for
// whatever reason, we'll fail later when we try to load the job.
if fn := r.knobs.BeforeWaitForJobsQuery; fn != nil {
fn()
}
row, err := ex.QueryRowEx(
ctx,
"poll-show-jobs",
@@ -110,13 +132,31 @@ func (r *Registry) waitForJobs(
log.Infof(ctx, "waiting for %d queued jobs to complete", count)
}
if count == 0 {
break
return nil
}
}
return nil
}

func (r *Registry) waitForJobs(
ctx context.Context,
ex sqlutil.InternalExecutor,
jobs []jobspb.JobID,
jobFinishedLocally <-chan struct{},
) error {
if len(jobs) == 0 {
return nil
}
start := timeutil.Now()
defer func() {
log.Infof(ctx, "waited for %d %v queued jobs to complete %v",
len(jobs), jobs, timeutil.Since(start))
}()

if err := r.waitForJobsToBeTerminalOrPaused(ctx, ex, jobs, jobFinishedLocally); err != nil {
return err
}

for i, id := range jobs {
j, err := r.LoadJob(ctx, id)
if err != nil {
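
A sketch contrasting the two exported entry points after this change. The registry, internal executor, and job IDs are assumed to come from the caller (as in the migration added below), and the difference in error behavior is taken from the doc comment on WaitForJobsIgnoringJobErrors.

package example // hypothetical package for illustration

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
)

func waitExamples(
	ctx context.Context, r *jobs.Registry, ie sqlutil.InternalExecutor, ids []jobspb.JobID,
) error {
	// WaitForJobs waits for the jobs to stop running and then loads each one,
	// so a job's own failure can surface as an error here.
	if err := r.WaitForJobs(ctx, ie, ids); err != nil {
		return err
	}
	// WaitForJobsIgnoringJobErrors waits until each job is terminal or paused
	// and only returns an error if polling the jobs table fails.
	return r.WaitForJobsIgnoringJobErrors(ctx, ie, ids)
}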
1 change: 1 addition & 0 deletions pkg/migration/migrationjob/migration_job.go
@@ -89,6 +89,7 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error {
CollectionFactory: execCtx.ExecCfg().CollectionFactory,
LeaseManager: execCtx.ExecCfg().LeaseManager,
InternalExecutor: execCtx.ExecCfg().InternalExecutor,
JobRegistry: execCtx.ExecCfg().JobRegistry,
TestingKnobs: execCtx.ExecCfg().MigrationTestingKnobs,
}
tenantDeps.SpanConfig.KVAccessor = execCtx.ExecCfg().SpanConfigKVAccessor
1 change: 1 addition & 0 deletions pkg/migration/migrationmanager/manager.go
@@ -416,6 +416,7 @@ func (m *Manager) checkPreconditions(
Settings: m.settings,
LeaseManager: m.lm,
InternalExecutor: m.ie,
JobRegistry: m.jr,
}); err != nil {
return errors.Wrapf(
err,
6 changes: 6 additions & 0 deletions pkg/migration/migrations/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
"schema_changes.go",
"seed_tenant_span_configs.go",
"tenant_settings.go",
"wait_for_schema_changes.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrations",
visibility = ["//visibility:public"],
@@ -78,13 +79,15 @@ go_test(
"public_schema_migration_external_test.go",
"raft_applied_index_term_external_test.go",
"remove_invalid_database_privileges_external_test.go",
"wait_for_schema_changes_test.go",
],
data = glob(["testdata/**"]),
embed = [":migrations"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
@@ -107,12 +110,15 @@ go_test(
"//pkg/sql/privilege",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlutil",
"//pkg/sql/tests",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
6 changes: 6 additions & 0 deletions pkg/migration/migrations/migrations.go
@@ -125,6 +125,12 @@ var migrations = []migration.Migration{
NoPrecondition,
tenantSettingsTableMigration,
),
migration.NewTenantMigration(
"wait for all in-flight schema changes",
toCV(clusterversion.NoNonMVCCIndexBackfills),
NoPrecondition,
waitForAllSchemaChanges,
),
}

func init() {
78 changes: 78 additions & 0 deletions pkg/migration/migrations/wait_for_schema_changes.go
@@ -0,0 +1,78 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migrations

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/errors"
)

// waitForAllSchemaChanges waits for all schema changes to enter a
// terminal or paused state.
//
// Because this is intended for the mvcc-bulk-ops transition, it does
// not care about schema changes created while this migration is
// running because any such schema changes must already be using the
// new mvcc bulk operations.
//
// Note that we do not use SHOW JOBS WHEN COMPLETE here to avoid
// blocking forever on PAUSED jobs. Jobs using old index backfills
// will fail on Resume.
func waitForAllSchemaChanges(
ctx context.Context, _ clusterversion.ClusterVersion, d migration.TenantDeps, _ *jobs.Job,
) error {

initialJobListQuery := fmt.Sprintf(`
SELECT
job_id
FROM
[SHOW JOBS]
WHERE
job_type = 'SCHEMA CHANGE'
AND status NOT IN ('%s', '%s', '%s', '%s', '%s')
`,
string(jobs.StatusSucceeded),
string(jobs.StatusFailed),
string(jobs.StatusCanceled),
string(jobs.StatusRevertFailed),
string(jobs.StatusPaused))
rows, err := d.InternalExecutor.QueryBufferedEx(ctx,
"query-non-terminal-schema-changers",
nil, /* txn */
sessiondata.NodeUserSessionDataOverride,
initialJobListQuery)
if err != nil {
return err
}

jobList := make([]jobspb.JobID, len(rows))
for i, datums := range rows {
if len(datums) != 1 {
return errors.AssertionFailedf("unexpected number of columns: %d (expected 1)", len(datums))
}
d := datums[0]
id, ok := d.(*tree.DInt)
if !ok {
return errors.AssertionFailedf("unexpected type for id column: %T (expected DInt)", d)
}
jobList[i] = jobspb.JobID(*id)
}
return d.JobRegistry.WaitForJobsIgnoringJobErrors(ctx, d.InternalExecutor, jobList)
}