Skip to content

Commit

Permalink
sql,systemschema,migration: add system.long_running_migrations, adopt
Browse files Browse the repository at this point in the history
This commit adds a table, `system.long_running_migrations`, to store the
completion status of long-running migrations. This table is used to ensure
safety and isolation in the face of concurrent attempts to upgrade the
cluster version.

An important facet of this change is the migration to add this table
to the cluster during upgrades. Given that the implementation of long-running
migrations relies on the table's existence, all versions which are associated
with long-running migrations must follow the version which introduces this
table. This migration needs to utilize the new infrastructure rather than
the `sqlmigrations` infrastructure because of some really bad behavior
introduced in 20.2 (#59850). Given two such versions already existed, they
have been assigned new version keys and their old version keys and values
are now merely placeholders. This provides comptability with the earlier alphas.

Release note: None
  • Loading branch information
ajwerner committed Feb 7, 2021
1 parent 81361ec commit 6c31b48
Show file tree
Hide file tree
Showing 24 changed files with 478 additions and 79 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-18</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-24</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
systemschema.WebSessionsTable.GetName(): {
includeInClusterBackup: optOutOfClusterBackup,
},
systemschema.LongRunningMigrationsTable.GetName(): {
includeInClusterBackup: optOutOfClusterBackup,
},
}

// getSystemTablesToBackup returns a set of system table names that should be
Expand Down
36 changes: 32 additions & 4 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,24 @@ const (
CPutInline
// ReplicaVersions enables the versioning of Replica state.
ReplicaVersions
// ReplacedTruncatedAndRangeAppliedStateMigration stands in for
// TruncatedAndRangeAppliedStateMigration which was re-introduced after the
// migration job was introduced. This is necessary because the jobs
// infrastructure used to run this migration in v21.1 and its later alphas
// was introduced after this version was first introduced. Later code in the
// release relies on the job to run the migration but the job relies on
// its startup migrations having been run. Versions associated with long
// running migrations must follow LongRunningMigrations.
ReplacedTruncatedAndRangeAppliedStateMigration
// ReplacedPostTruncatedAndRangeAppliedStateMigration is like the above
// version. See its comment.
ReplacedPostTruncatedAndRangeAppliedStateMigration
// NewSchemaChanger enables the new schema changer.
NewSchemaChanger
// LongRunningMigrations introduces the LongRunningMigrations table and jobs.
// All versions which have a registered long-running migration must have a
// version higher than this version.
LongRunningMigrations
// TruncatedAndRangeAppliedStateMigration is part of the migration to stop
// using the legacy truncated state within KV. After the migration, we'll be
// using the unreplicated truncated state and the RangeAppliedState on all
Expand All @@ -217,8 +235,6 @@ const (
// using the replicated legacy TruncatedState. It's also used in asserting
// that no replicated truncated state representation is found.
PostTruncatedAndRangeAppliedStateMigration
// NewSchemaChanger enables the new schema changer.
NewSchemaChanger

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -350,17 +366,29 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 12},
},
{
Key: TruncatedAndRangeAppliedStateMigration,
Key: ReplacedTruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 14},
},
{
Key: PostTruncatedAndRangeAppliedStateMigration,
Key: ReplacedPostTruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 16},
},
{
Key: NewSchemaChanger,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 18},
},
{
Key: LongRunningMigrations,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 20},
},
{
Key: TruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 22},
},
{
Key: PostTruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 24},
},
// Step (2): Add new versions here.
})

Expand Down
11 changes: 7 additions & 4 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ const (
ScheduledJobsTableID = 37
TenantsRangesID = 38 // pseudo
SqllivenessID = 39
LongRunningMigrationsID = 40

// CommentType is type for system.comments
DatabaseCommentType = 0
Expand Down
2 changes: 2 additions & 0 deletions pkg/migration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/security",
"//pkg/server/serverpb",
"//pkg/settings/cluster",
"//pkg/util/log",
"@com_github_cockroachdb_logtags//:logtags",
],
Expand Down
21 changes: 15 additions & 6 deletions pkg/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/logtags"
)

Expand Down Expand Up @@ -165,22 +167,29 @@ type Cluster interface {
// in #57445. Once that makes its way into master, this TODO can be removed.
type Migration struct {
cv clusterversion.ClusterVersion
fn MigrationFn
fn SQLMigrationFn
desc string
}

type MigrationFn func(context.Context, clusterversion.ClusterVersion, Cluster) error
// SQLDeps are the dependencies of migrations which perform actions at the
// SQL layer. Lower-level migrations may depend solely on the Cluster.
type SQLDeps struct {
Cluster Cluster
Codec keys.SQLCodec
Settings *cluster.Settings
}

type KVMigrationFn func(context.Context, clusterversion.ClusterVersion, Cluster) error
type SQLMigrationFn func(context.Context, clusterversion.ClusterVersion, SQLDeps) error

// Run kickstarts the actual migration process. It's responsible for recording
// the ongoing status of the migration into a system table.
//
// TODO(irfansharif): Introduce a `system.migrations` table, and populate it here.
func (m *Migration) Run(
ctx context.Context, cv clusterversion.ClusterVersion, h Cluster,
ctx context.Context, cv clusterversion.ClusterVersion, d SQLDeps,
) (err error) {
ctx = logtags.AddTag(ctx, fmt.Sprintf("migration=%s", cv), nil)

if err := m.fn(ctx, cv, h); err != nil {
if err := m.fn(ctx, cv, d); err != nil {
return err
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/migration/migrationjob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrationjob",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/migration",
"//pkg/security",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)
100 changes: 98 additions & 2 deletions pkg/migration/migrationjob/migration_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,19 @@ package migrationjob
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

func init() {
Expand All @@ -28,22 +36,110 @@ func init() {
})
}

// NewRecord constructs a new jobs.Record for this migration.
func NewRecord(version clusterversion.ClusterVersion, user security.SQLUsername) jobs.Record {
return jobs.Record{
Description: "Long running migration",
Details: jobspb.LongRunningMigrationDetails{
ClusterVersion: &version,
},
Username: user,
Progress: jobspb.LongRunningMigrationProgress{},
NonCancelable: true,
}
}

type resumer struct {
j *jobs.Job
}

var _ jobs.Resumer = (*resumer)(nil)

func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error {
// TODO(ajwerner): add some check to see if we're done.

execCtx := execCtxI.(sql.JobExecContext)
pl := r.j.Payload()
cv := *pl.GetLongRunningMigration().ClusterVersion
ie := execCtx.ExecCfg().InternalExecutor

alreadyCompleted, err := CheckIfMigrationCompleted(ctx, nil /* txn */, ie, cv)
if alreadyCompleted || err != nil {
return errors.Wrapf(err, "checking migration completion for %v", cv)
}
m, ok := migration.GetMigration(cv)
deps := migration.SQLDeps{
Cluster: execCtx.MigrationCluster(),
Codec: execCtx.ExecCfg().Codec,
Settings: execCtx.ExecCfg().Settings,
}
if !ok {
// TODO(ajwerner): Consider treating this as an assertion failure. Jobs
// should only be created for a cluster version if there is an associated
// migration.
return nil
}
return m.Run(ctx, cv, execCtx.MigrationCluster())
if err := m.Run(ctx, cv, deps); err != nil {
return errors.Wrapf(err, "running migration for %v", cv)
}

// Mark the migration as having been completed so that subsequent iterations
// no-op and new jobs are not created.
return errors.Wrapf(markMigrationCompleted(ctx, ie, cv),
"marking migration complete for %v", cv)
}

// CheckIfMigrationCompleted queries the system.long_running_migrations table
// to determine if the migration associated with this version has already been
// completed. The txn may be nil, in which case the check will be run in its
// own transaction.
func CheckIfMigrationCompleted(
ctx context.Context,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
version clusterversion.ClusterVersion,
) (alreadyCompleted bool, _ error) {
row, err := ie.QueryRow(ctx, "migration-manager-find-already-completed", txn, `
SELECT EXISTS(
SELECT *
FROM system.long_running_migrations
WHERE major = $1
AND minor = $2
AND patch = $3
AND internal = $4
);
`,
version.Major, version.Minor, version.Patch, version.Internal)
if err != nil {
return false, err
}
return bool(*row[0].(*tree.DBool)), nil
}

func markMigrationCompleted(
ctx context.Context, ie sqlutil.InternalExecutor, cv clusterversion.ClusterVersion,
) error {
_, err := ie.ExecEx(
ctx,
"migration-job-mark-job-succeeded",
nil, /* txn */
sessiondata.NodeUserSessionDataOverride,
`
INSERT
INTO system.long_running_migrations
(
major,
minor,
patch,
internal,
completed_at
)
VALUES ($1, $2, $3, $4, $5)`,
cv.Major,
cv.Minor,
cv.Patch,
cv.Internal,
timeutil.Now())
return err
}

// The long-running migration resumer has no reverting logic.
Expand Down
6 changes: 5 additions & 1 deletion pkg/migration/migrationmanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ go_library(
deps = [
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/migration",
"//pkg/migration/migrationjob",
"//pkg/security",
"//pkg/server/serverpb",
"//pkg/settings/cluster",
"//pkg/sql/protoreflect",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlutil",
Expand Down Expand Up @@ -44,10 +46,12 @@ go_test(
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
],
)
Loading

0 comments on commit 6c31b48

Please sign in to comment.