From 6c31b48bc95edbb88a24a718398b7d745a5d7c76 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Fri, 5 Feb 2021 01:26:00 -0500 Subject: [PATCH] sql,systemschema,migration: add `system.long_running_migrations`, adopt This commit adds a table, `system.long_running_migrations`, to store the completion status of long-running migrations. This table is used to ensure safety and isolation in the face of concurrent attempts to upgrade the cluster version. An important facet of this change is the migration to add this table to the cluster during upgrades. Given that the implementation of long-running migrations relies on the table's existence, all versions which are associated with long-running migrations must follow the version which introduces this table. This migration needs to utilize the new infrastructure rather than the `sqlmigrations` infrastructure because of some really bad behavior introduced in 20.2 (#59850). Given two such versions already existed, they have been assigned new version keys and their old version keys and values are now merely placeholders. This provides comptability with the earlier alphas. Release note: None --- docs/generated/settings/settings.html | 2 +- pkg/ccl/backupccl/system_schema.go | 3 + pkg/clusterversion/cockroach_versions.go | 36 +++++- pkg/clusterversion/key_string.go | 11 +- pkg/keys/constants.go | 1 + pkg/migration/BUILD.bazel | 2 + pkg/migration/migration.go | 21 +++- pkg/migration/migrationjob/BUILD.bazel | 8 ++ pkg/migration/migrationjob/migration_job.go | 100 ++++++++++++++++- pkg/migration/migrationmanager/BUILD.bazel | 6 +- pkg/migration/migrationmanager/manager.go | 81 ++++++++------ .../migrationmanager/manager_external_test.go | 103 +++++++++++++++++- pkg/migration/migrations/BUILD.bazel | 1 + .../longrunningmigrations/BUILD.bazel | 14 +++ .../long_running_migrations.go | 36 ++++++ pkg/migration/migrations/migrations.go | 1 + .../truncatedstate/truncated_state.go | 4 +- pkg/migration/registry.go | 14 ++- pkg/server/server_sql.go | 4 +- pkg/sql/catalog/bootstrap/metadata.go | 1 + pkg/sql/catalog/descpb/privilege.go | 1 + pkg/sql/catalog/systemschema/system.go | 60 ++++++++++ pkg/sql/tests/system_table_test.go | 1 + pkg/sqlmigrations/migrations.go | 46 +++++--- 24 files changed, 478 insertions(+), 79 deletions(-) create mode 100644 pkg/migration/migrations/longrunningmigrations/BUILD.bazel create mode 100644 pkg/migration/migrations/longrunningmigrations/long_running_migrations.go diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 81c75d1ecda7..237467a83db6 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -100,6 +100,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen at https:///debug/requests trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set -versionversion20.2-18set the active cluster version in the format '.' +versionversion20.2-24set the active cluster version in the format '.' diff --git a/pkg/ccl/backupccl/system_schema.go b/pkg/ccl/backupccl/system_schema.go index f90f1ea22bd3..b3279929d121 100644 --- a/pkg/ccl/backupccl/system_schema.go +++ b/pkg/ccl/backupccl/system_schema.go @@ -227,6 +227,9 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{ systemschema.WebSessionsTable.GetName(): { includeInClusterBackup: optOutOfClusterBackup, }, + systemschema.LongRunningMigrationsTable.GetName(): { + includeInClusterBackup: optOutOfClusterBackup, + }, } // getSystemTablesToBackup returns a set of system table names that should be diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index e15974015e4d..57c6e2d90166 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -203,6 +203,24 @@ const ( CPutInline // ReplicaVersions enables the versioning of Replica state. ReplicaVersions + // ReplacedTruncatedAndRangeAppliedStateMigration stands in for + // TruncatedAndRangeAppliedStateMigration which was re-introduced after the + // migration job was introduced. This is necessary because the jobs + // infrastructure used to run this migration in v21.1 and its later alphas + // was introduced after this version was first introduced. Later code in the + // release relies on the job to run the migration but the job relies on + // its startup migrations having been run. Versions associated with long + // running migrations must follow LongRunningMigrations. + ReplacedTruncatedAndRangeAppliedStateMigration + // ReplacedPostTruncatedAndRangeAppliedStateMigration is like the above + // version. See its comment. + ReplacedPostTruncatedAndRangeAppliedStateMigration + // NewSchemaChanger enables the new schema changer. + NewSchemaChanger + // LongRunningMigrations introduces the LongRunningMigrations table and jobs. + // All versions which have a registered long-running migration must have a + // version higher than this version. + LongRunningMigrations // TruncatedAndRangeAppliedStateMigration is part of the migration to stop // using the legacy truncated state within KV. After the migration, we'll be // using the unreplicated truncated state and the RangeAppliedState on all @@ -217,8 +235,6 @@ const ( // using the replicated legacy TruncatedState. It's also used in asserting // that no replicated truncated state representation is found. PostTruncatedAndRangeAppliedStateMigration - // NewSchemaChanger enables the new schema changer. - NewSchemaChanger // Step (1): Add new versions here. ) @@ -350,17 +366,29 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Version: roachpb.Version{Major: 20, Minor: 2, Internal: 12}, }, { - Key: TruncatedAndRangeAppliedStateMigration, + Key: ReplacedTruncatedAndRangeAppliedStateMigration, Version: roachpb.Version{Major: 20, Minor: 2, Internal: 14}, }, { - Key: PostTruncatedAndRangeAppliedStateMigration, + Key: ReplacedPostTruncatedAndRangeAppliedStateMigration, Version: roachpb.Version{Major: 20, Minor: 2, Internal: 16}, }, { Key: NewSchemaChanger, Version: roachpb.Version{Major: 20, Minor: 2, Internal: 18}, }, + { + Key: LongRunningMigrations, + Version: roachpb.Version{Major: 20, Minor: 2, Internal: 20}, + }, + { + Key: TruncatedAndRangeAppliedStateMigration, + Version: roachpb.Version{Major: 20, Minor: 2, Internal: 22}, + }, + { + Key: PostTruncatedAndRangeAppliedStateMigration, + Version: roachpb.Version{Major: 20, Minor: 2, Internal: 24}, + }, // Step (2): Add new versions here. }) diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 140c4bc46ddd..53a57522e4ca 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -34,14 +34,17 @@ func _() { _ = x[VirtualComputedColumns-23] _ = x[CPutInline-24] _ = x[ReplicaVersions-25] - _ = x[TruncatedAndRangeAppliedStateMigration-26] - _ = x[PostTruncatedAndRangeAppliedStateMigration-27] + _ = x[ReplacedTruncatedAndRangeAppliedStateMigration-26] + _ = x[ReplacedPostTruncatedAndRangeAppliedStateMigration-27] _ = x[NewSchemaChanger-28] + _ = x[LongRunningMigrations-29] + _ = x[TruncatedAndRangeAppliedStateMigration-30] + _ = x[PostTruncatedAndRangeAppliedStateMigration-31] } -const _Key_name = "NamespaceTableWithSchemasStart20_2GeospatialTypeEnumsRangefeedLeasesAlterColumnTypeGeneralAlterSystemJobsAddCreatedByColumnsAddScheduledJobsTableUserDefinedSchemasNoOriginFKIndexesNodeMembershipStatusMinPasswordLengthAbortSpanBytesAlterSystemJobsAddSqllivenessColumnsAddNewSystemSqllivenessTableMaterializedViewsBox2DTypeUpdateScheduledJobsSchemaCreateLoginPrivilegeHBAForNonTLSV20_2Start21_1EmptyArraysInInvertedIndexesUniqueWithoutIndexConstraintsVirtualComputedColumnsCPutInlineReplicaVersionsTruncatedAndRangeAppliedStateMigrationPostTruncatedAndRangeAppliedStateMigrationNewSchemaChanger" +const _Key_name = "NamespaceTableWithSchemasStart20_2GeospatialTypeEnumsRangefeedLeasesAlterColumnTypeGeneralAlterSystemJobsAddCreatedByColumnsAddScheduledJobsTableUserDefinedSchemasNoOriginFKIndexesNodeMembershipStatusMinPasswordLengthAbortSpanBytesAlterSystemJobsAddSqllivenessColumnsAddNewSystemSqllivenessTableMaterializedViewsBox2DTypeUpdateScheduledJobsSchemaCreateLoginPrivilegeHBAForNonTLSV20_2Start21_1EmptyArraysInInvertedIndexesUniqueWithoutIndexConstraintsVirtualComputedColumnsCPutInlineReplicaVersionsReplacedTruncatedAndRangeAppliedStateMigrationReplacedPostTruncatedAndRangeAppliedStateMigrationNewSchemaChangerLongRunningMigrationsTruncatedAndRangeAppliedStateMigrationPostTruncatedAndRangeAppliedStateMigration" -var _Key_index = [...]uint16{0, 25, 34, 48, 53, 68, 90, 124, 145, 163, 180, 200, 217, 231, 295, 312, 321, 346, 366, 378, 383, 392, 420, 449, 471, 481, 496, 534, 576, 592} +var _Key_index = [...]uint16{0, 25, 34, 48, 53, 68, 90, 124, 145, 163, 180, 200, 217, 231, 295, 312, 321, 346, 366, 378, 383, 392, 420, 449, 471, 481, 496, 542, 592, 608, 629, 667, 709} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index d4f8c978accf..9bec28d2fe8c 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -399,6 +399,7 @@ const ( ScheduledJobsTableID = 37 TenantsRangesID = 38 // pseudo SqllivenessID = 39 + LongRunningMigrationsID = 40 // CommentType is type for system.comments DatabaseCommentType = 0 diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel index 9ad83fe4cf7d..98a8989e3c6b 100644 --- a/pkg/migration/BUILD.bazel +++ b/pkg/migration/BUILD.bazel @@ -11,10 +11,12 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", + "//pkg/keys", "//pkg/kv", "//pkg/roachpb", "//pkg/security", "//pkg/server/serverpb", + "//pkg/settings/cluster", "//pkg/util/log", "@com_github_cockroachdb_logtags//:logtags", ], diff --git a/pkg/migration/migration.go b/pkg/migration/migration.go index 94bfff02f356..7678ae28aca5 100644 --- a/pkg/migration/migration.go +++ b/pkg/migration/migration.go @@ -26,10 +26,12 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/logtags" ) @@ -165,22 +167,29 @@ type Cluster interface { // in #57445. Once that makes its way into master, this TODO can be removed. type Migration struct { cv clusterversion.ClusterVersion - fn MigrationFn + fn SQLMigrationFn desc string } -type MigrationFn func(context.Context, clusterversion.ClusterVersion, Cluster) error +// SQLDeps are the dependencies of migrations which perform actions at the +// SQL layer. Lower-level migrations may depend solely on the Cluster. +type SQLDeps struct { + Cluster Cluster + Codec keys.SQLCodec + Settings *cluster.Settings +} + +type KVMigrationFn func(context.Context, clusterversion.ClusterVersion, Cluster) error +type SQLMigrationFn func(context.Context, clusterversion.ClusterVersion, SQLDeps) error // Run kickstarts the actual migration process. It's responsible for recording // the ongoing status of the migration into a system table. -// -// TODO(irfansharif): Introduce a `system.migrations` table, and populate it here. func (m *Migration) Run( - ctx context.Context, cv clusterversion.ClusterVersion, h Cluster, + ctx context.Context, cv clusterversion.ClusterVersion, d SQLDeps, ) (err error) { ctx = logtags.AddTag(ctx, fmt.Sprintf("migration=%s", cv), nil) - if err := m.fn(ctx, cv, h); err != nil { + if err := m.fn(ctx, cv, d); err != nil { return err } diff --git a/pkg/migration/migrationjob/BUILD.bazel b/pkg/migration/migrationjob/BUILD.bazel index 47544b726855..8b7717c32bae 100644 --- a/pkg/migration/migrationjob/BUILD.bazel +++ b/pkg/migration/migrationjob/BUILD.bazel @@ -6,10 +6,18 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrationjob", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/kv", "//pkg/migration", + "//pkg/security", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", + "//pkg/sql/sqlutil", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/migration/migrationjob/migration_job.go b/pkg/migration/migrationjob/migration_job.go index 34b4899e23ef..8b63af2736a5 100644 --- a/pkg/migration/migrationjob/migration_job.go +++ b/pkg/migration/migrationjob/migration_job.go @@ -15,11 +15,19 @@ package migrationjob import ( "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) func init() { @@ -28,6 +36,19 @@ func init() { }) } +// NewRecord constructs a new jobs.Record for this migration. +func NewRecord(version clusterversion.ClusterVersion, user security.SQLUsername) jobs.Record { + return jobs.Record{ + Description: "Long running migration", + Details: jobspb.LongRunningMigrationDetails{ + ClusterVersion: &version, + }, + Username: user, + Progress: jobspb.LongRunningMigrationProgress{}, + NonCancelable: true, + } +} + type resumer struct { j *jobs.Job } @@ -35,15 +56,90 @@ type resumer struct { var _ jobs.Resumer = (*resumer)(nil) func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error { - // TODO(ajwerner): add some check to see if we're done. + execCtx := execCtxI.(sql.JobExecContext) pl := r.j.Payload() cv := *pl.GetLongRunningMigration().ClusterVersion + ie := execCtx.ExecCfg().InternalExecutor + + alreadyCompleted, err := CheckIfMigrationCompleted(ctx, nil /* txn */, ie, cv) + if alreadyCompleted || err != nil { + return errors.Wrapf(err, "checking migration completion for %v", cv) + } m, ok := migration.GetMigration(cv) + deps := migration.SQLDeps{ + Cluster: execCtx.MigrationCluster(), + Codec: execCtx.ExecCfg().Codec, + Settings: execCtx.ExecCfg().Settings, + } if !ok { + // TODO(ajwerner): Consider treating this as an assertion failure. Jobs + // should only be created for a cluster version if there is an associated + // migration. return nil } - return m.Run(ctx, cv, execCtx.MigrationCluster()) + if err := m.Run(ctx, cv, deps); err != nil { + return errors.Wrapf(err, "running migration for %v", cv) + } + + // Mark the migration as having been completed so that subsequent iterations + // no-op and new jobs are not created. + return errors.Wrapf(markMigrationCompleted(ctx, ie, cv), + "marking migration complete for %v", cv) +} + +// CheckIfMigrationCompleted queries the system.long_running_migrations table +// to determine if the migration associated with this version has already been +// completed. The txn may be nil, in which case the check will be run in its +// own transaction. +func CheckIfMigrationCompleted( + ctx context.Context, + txn *kv.Txn, + ie sqlutil.InternalExecutor, + version clusterversion.ClusterVersion, +) (alreadyCompleted bool, _ error) { + row, err := ie.QueryRow(ctx, "migration-manager-find-already-completed", txn, ` +SELECT EXISTS( + SELECT * + FROM system.long_running_migrations + WHERE major = $1 + AND minor = $2 + AND patch = $3 + AND internal = $4 + ); +`, + version.Major, version.Minor, version.Patch, version.Internal) + if err != nil { + return false, err + } + return bool(*row[0].(*tree.DBool)), nil +} + +func markMigrationCompleted( + ctx context.Context, ie sqlutil.InternalExecutor, cv clusterversion.ClusterVersion, +) error { + _, err := ie.ExecEx( + ctx, + "migration-job-mark-job-succeeded", + nil, /* txn */ + sessiondata.NodeUserSessionDataOverride, + ` +INSERT + INTO system.long_running_migrations + ( + major, + minor, + patch, + internal, + completed_at + ) +VALUES ($1, $2, $3, $4, $5)`, + cv.Major, + cv.Minor, + cv.Patch, + cv.Internal, + timeutil.Now()) + return err } // The long-running migration resumer has no reverting logic. diff --git a/pkg/migration/migrationmanager/BUILD.bazel b/pkg/migration/migrationmanager/BUILD.bazel index 3c2e7dad3aac..34a39d843602 100644 --- a/pkg/migration/migrationmanager/BUILD.bazel +++ b/pkg/migration/migrationmanager/BUILD.bazel @@ -8,11 +8,13 @@ go_library( deps = [ "//pkg/clusterversion", "//pkg/jobs", - "//pkg/jobs/jobspb", + "//pkg/keys", "//pkg/kv", "//pkg/migration", + "//pkg/migration/migrationjob", "//pkg/security", "//pkg/server/serverpb", + "//pkg/settings/cluster", "//pkg/sql/protoreflect", "//pkg/sql/sem/tree", "//pkg/sql/sqlutil", @@ -44,10 +46,12 @@ go_test( "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", + "//pkg/util", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", + "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/migration/migrationmanager/manager.go b/pkg/migration/migrationmanager/manager.go index 880f776a4b46..df15f4059a64 100644 --- a/pkg/migration/migrationmanager/manager.go +++ b/pkg/migration/migrationmanager/manager.go @@ -19,11 +19,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/migration/migrationjob" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -35,17 +37,27 @@ import ( // Manager is the instance responsible for executing migrations across the // cluster. type Manager struct { - c migration.Cluster - ie sqlutil.InternalExecutor - jr *jobs.Registry + c migration.Cluster + ie sqlutil.InternalExecutor + jr *jobs.Registry + codec keys.SQLCodec + settings *cluster.Settings } // NewManager constructs a new Manager. -func NewManager(c migration.Cluster, ie sqlutil.InternalExecutor, jr *jobs.Registry) *Manager { +func NewManager( + c migration.Cluster, + ie sqlutil.InternalExecutor, + jr *jobs.Registry, + codec keys.SQLCodec, + settings *cluster.Settings, +) *Manager { return &Manager{ - c: c, - ie: ie, - jr: jr, + c: c, + ie: ie, + jr: jr, + codec: codec, + settings: settings, } } @@ -63,12 +75,6 @@ func (m *Manager) Migrate( return nil } - // TODO(irfansharif): We'll need to acquire a lease here and refresh it - // throughout during the migration to ensure mutual exclusion. - - // TODO(irfansharif): We'll need to create a system table to store - // in-progress state of long running migrations, for introspection. - clusterVersions := clusterversion.ListBetween(from, to) if len(clusterVersions) == 0 { // We're attempt to migrate to something that's not defined in cluster @@ -216,11 +222,25 @@ func (m *Manager) Migrate( func (m *Manager) runMigration( ctx context.Context, user security.SQLUsername, version clusterversion.ClusterVersion, ) error { - if _, exists := migration.GetMigration(version); !exists { + mig, exists := migration.GetMigration(version) + if !exists { return nil } - id, err := m.getOrCreateMigrationJob(ctx, user, version) - if err != nil { + // The migration which introduces the infrastructure for running other long + // running migrations in jobs. It needs to be special-cased and run without + // a job or leasing for bootstrapping purposes. Fortunately it has been + // designed to be idempotent and cheap. + // + // TODO(ajwerner): Remove in 21.2. + if version.Version == clusterversion.ByKey(clusterversion.LongRunningMigrations) { + return mig.Run(ctx, version, migration.SQLDeps{ + Cluster: m.c, + Codec: m.codec, + Settings: m.settings, + }) + } + alreadyCompleted, id, err := m.getOrCreateMigrationJob(ctx, user, version) + if alreadyCompleted || err != nil { return err } return m.jr.Run(ctx, m.ie, []int64{id}) @@ -228,9 +248,12 @@ func (m *Manager) runMigration( func (m *Manager) getOrCreateMigrationJob( ctx context.Context, user security.SQLUsername, version clusterversion.ClusterVersion, -) (jobID int64, _ error) { - +) (alreadyCompleted bool, jobID int64, _ error) { if err := m.c.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + alreadyCompleted, err = migrationjob.CheckIfMigrationCompleted(ctx, txn, m.ie, version) + if alreadyCompleted || err != nil { + return err + } var found bool found, jobID, err = m.getRunningMigrationJob(ctx, txn, version) if err != nil { @@ -240,24 +263,16 @@ func (m *Manager) getOrCreateMigrationJob( return nil } var j *jobs.Job - j, err = m.jr.CreateJobWithTxn(ctx, jobs.Record{ - Description: "Long running migration", - Details: jobspb.LongRunningMigrationDetails{ - ClusterVersion: &version, - }, - Username: user, - Progress: jobspb.LongRunningMigrationProgress{}, - NonCancelable: true, - }, txn) + j, err = m.jr.CreateJobWithTxn(ctx, migrationjob.NewRecord(version, user), txn) if err != nil { return err } jobID = *j.ID() return nil }); err != nil { - return 0, err + return false, 0, err } - return jobID, nil + return alreadyCompleted, jobID, nil } func (m *Manager) getRunningMigrationJob( @@ -270,14 +285,14 @@ SELECT id, status status, crdb_internal.pb_to_json( 'cockroach.sql.jobs.jobspb.Payload', - payload + payload, + false -- emit_defaults ) AS pl FROM system.jobs WHERE status IN ` + jobs.NonTerminalStatusTupleString + ` ) WHERE pl->'longRunningMigration'->'clusterVersion' = $1::JSON;` - // TODO(ajwerner): Flip the emitDefaults flag once this is rebased on master. - jsonMsg, err := protoreflect.MessageToJSON(&version, true /* emitDefaults */) + jsonMsg, err := protoreflect.MessageToJSON(&version, false /* emitDefaults */) if err != nil { return false, 0, errors.Wrap(err, "failed to marshal version to JSON") } diff --git a/pkg/migration/migrationmanager/manager_external_test.go b/pkg/migration/migrationmanager/manager_external_test.go index 3ddfcc711009..3c642cf67a2e 100644 --- a/pkg/migration/migrationmanager/manager_external_test.go +++ b/pkg/migration/migrationmanager/manager_external_test.go @@ -12,6 +12,7 @@ package migrationmanager_test import ( "context" + gosql "database/sql" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -26,11 +27,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) // TestAlreadyRunningJobsAreHandledProperly is a relatively low-level test to @@ -65,7 +68,7 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { ch := make(chan chan error) defer migration.TestingRegisterMigrationInterceptor(endCV, func( - ctx context.Context, cv clusterversion.ClusterVersion, h migration.Cluster, + ctx context.Context, cv clusterversion.ClusterVersion, h migration.SQLDeps, ) error { canResume := make(chan error) ch <- canResume @@ -197,15 +200,15 @@ func TestMigrateUpdatesReplicaVersion(t *testing.T) { t.Fatalf("got replica version %s, expected %s", got, startCV.Version) } - // Register the below raft migration. + // RegisterKVMigration the below raft migration. unregisterKVMigration := batcheval.TestingRegisterMigrationInterceptor(endCV.Version, func() {}) defer unregisterKVMigration() - // Register the top-level migration. + // RegisterKVMigration the top-level migration. unregister := migration.TestingRegisterMigrationInterceptor(endCV, func( - ctx context.Context, cv clusterversion.ClusterVersion, c migration.Cluster, + ctx context.Context, cv clusterversion.ClusterVersion, d migration.SQLDeps, ) error { - return c.DB().Migrate(ctx, desc.StartKey, desc.EndKey, cv.Version) + return d.Cluster.DB().Migrate(ctx, desc.StartKey, desc.EndKey, cv.Version) }) defer unregister() @@ -233,3 +236,93 @@ func TestMigrateUpdatesReplicaVersion(t *testing.T) { t.Fatalf("got replica version %s, expected %s", got, endCV.Version) } } + +// TestConcurrentMigrationAttempts ensures that concurrent attempts to run +// migrations over a number of versions exhibits reasonable behavior. Namely, +// that each migration gets run one time and that migrations do not get run +// again. +func TestConcurrentMigrationAttempts(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // We're going to be migrating from startKey to endKey. We end up needing + // to use real versions because the ListBetween uses the keys compiled into + // the clusterversion package. + const ( + startKey = clusterversion.LongRunningMigrations + endKey = clusterversion.PostTruncatedAndRangeAppliedStateMigration + ) + migrationRunCounts := make(map[clusterversion.ClusterVersion]int) + var unregisterFuncs []func() + defer func() { + for _, f := range unregisterFuncs { + f() + } + }() + + // RegisterKVMigration the migrations to update the map with run counts. + // There should definitely not be any concurrency of execution, so the race + // detector should not fire. + for key := startKey + 1; key <= endKey; key++ { + unregisterFuncs = append(unregisterFuncs, + migration.TestingRegisterMigrationInterceptor( + clusterversion.ClusterVersion{ + Version: clusterversion.ByKey(key), + }, + func( + ctx context.Context, cv clusterversion.ClusterVersion, c migration.SQLDeps, + ) error { + migrationRunCounts[cv]++ + return nil + })) + } + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.ByKey(endKey), + clusterversion.ByKey(startKey), + false, + ), + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + BinaryVersionOverride: clusterversion.ByKey(startKey), + DisableAutomaticVersionUpgrade: 1, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + // Run N instances of the migration concurrently on different connections. + // They should all eventually succeed; some may internally experience a + // serializable restart but cockroach will handle that transparently. + // Afterwards we'll ensure that no migration was run more than once. + N := 25 + if util.RaceEnabled { + N = 5 + } + db := tc.ServerConn(0) + db.SetMaxOpenConns(N) + conns := make([]*gosql.Conn, N) + for i := range conns { + var err error + conns[i], err = db.Conn(ctx) + require.NoError(t, err) + } + var g errgroup.Group + for i := 0; i < N; i++ { + conn := conns[i] + g.Go(func() error { + _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING version = $1`, + clusterversion.ByKey(endKey).String()) + return err + }) + } + require.Nil(t, g.Wait()) + for k, c := range migrationRunCounts { + require.Equalf(t, 1, c, "version: %v", k) + } + require.Len(t, migrationRunCounts, len(unregisterFuncs)) +} diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index 3fba9346bebe..5aeacaa8975f 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/migration/migrationjob", + "//pkg/migration/migrations/longrunningmigrations", "//pkg/migration/migrations/truncatedstate", ], ) diff --git a/pkg/migration/migrations/longrunningmigrations/BUILD.bazel b/pkg/migration/migrations/longrunningmigrations/BUILD.bazel new file mode 100644 index 000000000000..d9c687da84c5 --- /dev/null +++ b/pkg/migration/migrations/longrunningmigrations/BUILD.bazel @@ -0,0 +1,14 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "longrunningmigrations", + srcs = ["long_running_migrations.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrations/longrunningmigrations", + visibility = ["//visibility:public"], + deps = [ + "//pkg/clusterversion", + "//pkg/migration", + "//pkg/sql/catalog/systemschema", + "//pkg/sqlmigrations", + ], +) diff --git a/pkg/migration/migrations/longrunningmigrations/long_running_migrations.go b/pkg/migration/migrations/longrunningmigrations/long_running_migrations.go new file mode 100644 index 000000000000..42630b50cb72 --- /dev/null +++ b/pkg/migration/migrations/longrunningmigrations/long_running_migrations.go @@ -0,0 +1,36 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package longrunningmigrations + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sqlmigrations" +) + +func init() { + migration.RegisterSQLMigration( + clusterversion.LongRunningMigrations, + longRunningMigrationsTableMigration, + "add the system.long_running_migrations table", + ) +} + +func longRunningMigrationsTableMigration( + ctx context.Context, _ clusterversion.ClusterVersion, d migration.SQLDeps, +) error { + return sqlmigrations.CreateSystemTable( + ctx, d.Cluster.DB(), d.Codec, d.Settings, systemschema.LongRunningMigrationsTable, + ) +} diff --git a/pkg/migration/migrations/migrations.go b/pkg/migration/migrations/migrations.go index d837971b93c8..9d236106b0bf 100644 --- a/pkg/migration/migrations/migrations.go +++ b/pkg/migration/migrations/migrations.go @@ -14,5 +14,6 @@ package migrations import ( _ "github.com/cockroachdb/cockroach/pkg/migration/migrationjob" + _ "github.com/cockroachdb/cockroach/pkg/migration/migrations/longrunningmigrations" _ "github.com/cockroachdb/cockroach/pkg/migration/migrations/truncatedstate" ) diff --git a/pkg/migration/migrations/truncatedstate/truncated_state.go b/pkg/migration/migrations/truncatedstate/truncated_state.go index 2a69fc05ad46..b3050cabdf4e 100644 --- a/pkg/migration/migrations/truncatedstate/truncated_state.go +++ b/pkg/migration/migrations/truncatedstate/truncated_state.go @@ -25,12 +25,12 @@ import ( ) func init() { - migration.Register( + migration.RegisterKVMigration( clusterversion.TruncatedAndRangeAppliedStateMigration, truncatedStateMigration, "use unreplicated TruncatedState and RangeAppliedState for all ranges", ) - migration.Register( + migration.RegisterKVMigration( clusterversion.PostTruncatedAndRangeAppliedStateMigration, postTruncatedStateMigration, "purge all replicas using the replicated TruncatedState", diff --git a/pkg/migration/registry.go b/pkg/migration/registry.go index 612d24348512..6f7bfbb0f346 100644 --- a/pkg/migration/registry.go +++ b/pkg/migration/registry.go @@ -22,9 +22,17 @@ import ( // bump of the corresponding version gate. var registry = make(map[clusterversion.ClusterVersion]Migration) -// Register is a short hand to Register a given migration within the global +// RegisterKVMigration is a short hand to RegisterKVMigration a given migration within the global // registry. -func Register(key clusterversion.Key, fn MigrationFn, desc string) { +func RegisterKVMigration(key clusterversion.Key, fn KVMigrationFn, desc string) { + RegisterSQLMigration(key, func( + ctx context.Context, version clusterversion.ClusterVersion, deps SQLDeps, + ) error { + return fn(ctx, version, deps.Cluster) + }, desc) +} + +func RegisterSQLMigration(key clusterversion.Key, fn SQLMigrationFn, desc string) { cv := clusterversion.ClusterVersion{Version: clusterversion.ByKey(key)} if _, ok := registry[cv]; ok { log.Fatalf(context.Background(), "doubly registering migration for %s", cv) @@ -46,7 +54,7 @@ func GetMigration(key clusterversion.ClusterVersion) (Migration, bool) { // global state. This should instead be a testing knob that the migration // manager checks when search for attached migrations. func TestingRegisterMigrationInterceptor( - cv clusterversion.ClusterVersion, fn MigrationFn, + cv clusterversion.ClusterVersion, fn SQLMigrationFn, ) (unregister func()) { registry[cv] = Migration{cv: cv, fn: fn} return func() { delete(registry, cv) } diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index ec98447d327c..9b72d3965022 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -632,7 +632,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { Dialer: cfg.nodeDialer, DB: cfg.db, }) - migrationMgr := migrationmanager.NewManager(c, cfg.circularInternalExecutor, jobRegistry) + migrationMgr := migrationmanager.NewManager( + c, cfg.circularInternalExecutor, jobRegistry, codec, cfg.Settings, + ) execCfg.MigrationCluster = c execCfg.VersionUpgradeHook = migrationMgr.Migrate } diff --git a/pkg/sql/catalog/bootstrap/metadata.go b/pkg/sql/catalog/bootstrap/metadata.go index 7818715e16f8..4c8bbedd5743 100644 --- a/pkg/sql/catalog/bootstrap/metadata.go +++ b/pkg/sql/catalog/bootstrap/metadata.go @@ -342,6 +342,7 @@ func addSystemDescriptorsToSchema(target *MetadataSchema) { target.AddDescriptor(keys.SystemDatabaseID, systemschema.ScheduledJobsTable) target.AddDescriptor(keys.SystemDatabaseID, systemschema.SqllivenessTable) + target.AddDescriptor(keys.SystemDatabaseID, systemschema.LongRunningMigrationsTable) } // addSplitIDs adds a split point for each of the PseudoTableIDs to the supplied diff --git a/pkg/sql/catalog/descpb/privilege.go b/pkg/sql/catalog/descpb/privilege.go index f59406574fca..a925f3823119 100644 --- a/pkg/sql/catalog/descpb/privilege.go +++ b/pkg/sql/catalog/descpb/privilege.go @@ -435,6 +435,7 @@ var SystemAllowedPrivileges = map[ID]privilege.List{ keys.StatementDiagnosticsTableID: privilege.ReadWriteData, keys.ScheduledJobsTableID: privilege.ReadWriteData, keys.SqllivenessID: privilege.ReadWriteData, + keys.LongRunningMigrationsID: privilege.ReadWriteData, } // SetOwner sets the owner of the privilege descriptor to the provided string. diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 5adbb79e2b60..745e5165c7ec 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -345,6 +345,17 @@ CREATE TABLE system.sqlliveness ( expiration DECIMAL NOT NULL, FAMILY fam0_session_id_expiration (session_id, expiration) )` + + LongRunningMigrationsTableSchema = ` +CREATE TABLE system.long_running_migrations ( + major INT8 NOT NULL, + minor INT8 NOT NULL, + patch INT8 NOT NULL, + internal INT8 NOT NULL, + completed_at TIMESTAMPTZ NOT NULL, + FAMILY "primary" (major, minor, patch, internal, completed_at), + PRIMARY KEY (major, minor, patch, internal) +)` ) func pk(name string) descpb.IndexDescriptor { @@ -1690,6 +1701,55 @@ var ( FormatVersion: descpb.InterleavedFormatVersion, NextMutationID: 1, }) + + // LongRunningMigrationsTable is the descriptor for the + // long_running_migrations table. It stores facts about the completion state + // of long-running migrations. It is used to prevent migrations from running + // again after they have been completed. + LongRunningMigrationsTable = tabledesc.NewImmutable(descpb.TableDescriptor{ + Name: "long_running_migrations", + ID: keys.LongRunningMigrationsID, + ParentID: keys.SystemDatabaseID, + UnexposedParentSchemaID: keys.PublicSchemaID, + Version: 1, + Columns: []descpb.ColumnDescriptor{ + {Name: "major", ID: 1, Type: types.Int, Nullable: false}, + {Name: "minor", ID: 2, Type: types.Int, Nullable: false}, + {Name: "patch", ID: 3, Type: types.Int, Nullable: false}, + {Name: "internal", ID: 4, Type: types.Int, Nullable: false}, + {Name: "completed_at", ID: 5, Type: types.TimestampTZ, Nullable: false}, + }, + NextColumnID: 6, + Families: []descpb.ColumnFamilyDescriptor{ + { + Name: "primary", + ID: 0, + ColumnNames: []string{"major", "minor", "patch", "internal", "completed_at"}, + ColumnIDs: []descpb.ColumnID{1, 2, 3, 4, 5}, + DefaultColumnID: 5, + }, + }, + NextFamilyID: 1, + PrimaryIndex: descpb.IndexDescriptor{ + Name: tabledesc.PrimaryKeyIndexName, + ID: 1, + Unique: true, + ColumnNames: []string{"major", "minor", "patch", "internal"}, + ColumnDirections: []descpb.IndexDescriptor_Direction{ + descpb.IndexDescriptor_ASC, + descpb.IndexDescriptor_ASC, + descpb.IndexDescriptor_ASC, + descpb.IndexDescriptor_ASC, + }, + ColumnIDs: []descpb.ColumnID{1, 2, 3, 4}, + Version: descpb.EmptyArraysInInvertedIndexesVersion, + }, + NextIndexID: 2, + Privileges: descpb.NewCustomSuperuserPrivilegeDescriptor( + descpb.SystemAllowedPrivileges[keys.JobsTableID], security.NodeUserName()), + FormatVersion: descpb.InterleavedFormatVersion, + NextMutationID: 1, + }) ) // newCommentPrivilegeDescriptor returns a privilege descriptor for comment table diff --git a/pkg/sql/tests/system_table_test.go b/pkg/sql/tests/system_table_test.go index fe110ada1484..e369b2db7335 100644 --- a/pkg/sql/tests/system_table_test.go +++ b/pkg/sql/tests/system_table_test.go @@ -191,6 +191,7 @@ func TestSystemTableLiterals(t *testing.T) { {keys.StatementDiagnosticsTableID, systemschema.StatementDiagnosticsTableSchema, systemschema.StatementDiagnosticsTable}, {keys.ScheduledJobsTableID, systemschema.ScheduledJobsTableSchema, systemschema.ScheduledJobsTable}, {keys.SqllivenessID, systemschema.SqllivenessTableSchema, systemschema.SqllivenessTable}, + {keys.LongRunningMigrationsID, systemschema.LongRunningMigrationsTableSchema, systemschema.LongRunningMigrationsTable}, } { privs := *test.pkg.GetPrivileges() gen, err := sql.CreateTestTableDescriptor( diff --git a/pkg/sqlmigrations/migrations.go b/pkg/sqlmigrations/migrations.go index 8868a0032439..7571b70fc00d 100644 --- a/pkg/sqlmigrations/migrations.go +++ b/pkg/sqlmigrations/migrations.go @@ -334,14 +334,14 @@ var backwardCompatibleMigrations = []migrationDescriptor{ func staticIDs( ids ...descpb.ID, -) func(ctx context.Context, db db, codec keys.SQLCodec) ([]descpb.ID, error) { - return func(ctx context.Context, db db, codec keys.SQLCodec) ([]descpb.ID, error) { return ids, nil } +) func(ctx context.Context, db DB, codec keys.SQLCodec) ([]descpb.ID, error) { + return func(ctx context.Context, db DB, codec keys.SQLCodec) ([]descpb.ID, error) { return ids, nil } } func databaseIDs( names ...string, -) func(ctx context.Context, db db, codec keys.SQLCodec) ([]descpb.ID, error) { - return func(ctx context.Context, db db, codec keys.SQLCodec) ([]descpb.ID, error) { +) func(ctx context.Context, db DB, codec keys.SQLCodec) ([]descpb.ID, error) { + return func(ctx context.Context, db DB, codec keys.SQLCodec) ([]descpb.ID, error) { var ids []descpb.ID for _, name := range names { // This runs as part of an older migration (introduced in 2.1). We use @@ -392,7 +392,7 @@ type migrationDescriptor struct { // descriptors that were added by this migration. This is needed to automate // certain tests, which check the number of ranges/descriptors present on // server bootup. - newDescriptorIDs func(ctx context.Context, db db, codec keys.SQLCodec) ([]descpb.ID, error) + newDescriptorIDs func(ctx context.Context, db DB, codec keys.SQLCodec) ([]descpb.ID, error) } func init() { @@ -408,7 +408,7 @@ func init() { } type runner struct { - db db + db DB codec keys.SQLCodec sqlExecutor *sql.InternalExecutor settings *cluster.Settings @@ -449,9 +449,9 @@ type leaseManager interface { TimeRemaining(l *leasemanager.Lease) time.Duration } -// db is defined just to allow us to use a fake client.DB when testing this +// DB is defined just to allow us to use a fake client.DB when testing this // package. -type db interface { +type DB interface { Scan(ctx context.Context, begin, end interface{}, maxRows int64) ([]kv.KeyValue, error) Get(ctx context.Context, key interface{}) (kv.KeyValue, error) Put(ctx context.Context, key, value interface{}) error @@ -463,7 +463,7 @@ type db interface { type Manager struct { stopper *stop.Stopper leaseManager leaseManager - db db + db DB codec keys.SQLCodec sqlExecutor *sql.InternalExecutor testingKnobs MigrationManagerTestingKnobs @@ -508,7 +508,7 @@ func NewManager( // lifecycle is tightly controlled. func ExpectedDescriptorIDs( ctx context.Context, - db db, + db DB, codec keys.SQLCodec, defaultZoneConfig *zonepb.ZoneConfig, defaultSystemZoneConfig *zonepb.ZoneConfig, @@ -891,7 +891,7 @@ func (m *Manager) migrateSystemNamespace( } func getCompletedMigrations( - ctx context.Context, db db, codec keys.SQLCodec, + ctx context.Context, db DB, codec keys.SQLCodec, ) (map[string]struct{}, error) { if log.V(1) { log.Info(ctx, "trying to get the list of completed migrations") @@ -913,14 +913,26 @@ func migrationKey(codec keys.SQLCodec, migration migrationDescriptor) roachpb.Ke } func createSystemTable(ctx context.Context, r runner, desc catalog.TableDescriptor) error { + return CreateSystemTable(ctx, r.db, r.codec, r.settings, desc) +} + +// CreateSystemTable is a function to inject a new system table. If the table +// already exists, ths function is a no-op. +func CreateSystemTable( + ctx context.Context, + db DB, + codec keys.SQLCodec, + settings *cluster.Settings, + desc catalog.TableDescriptor, +) error { // We install the table at the KV layer so that we can choose a known ID in // the reserved ID space. (The SQL layer doesn't allow this.) - err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { b := txn.NewBatch() - tKey := catalogkv.MakePublicTableNameKey(ctx, r.settings, desc.GetParentID(), desc.GetName()) - b.CPut(tKey.Key(r.codec), desc.GetID(), nil) - b.CPut(catalogkeys.MakeDescMetadataKey(r.codec, desc.GetID()), desc.DescriptorProto(), nil) - if err := txn.SetSystemConfigTrigger(r.codec.ForSystemTenant()); err != nil { + tKey := catalogkv.MakePublicTableNameKey(ctx, settings, desc.GetParentID(), desc.GetName()) + b.CPut(tKey.Key(codec), desc.GetID(), nil) + b.CPut(catalogkeys.MakeDescMetadataKey(codec, desc.GetID()), desc.DescriptorProto(), nil) + if err := txn.SetSystemConfigTrigger(codec.ForSystemTenant()); err != nil { return err } return txn.Run(ctx, b) @@ -1137,7 +1149,7 @@ func createDefaultDbs(ctx context.Context, r runner) error { for retry := retry.Start(retry.Options{MaxRetries: 5}); retry.Next(); { for _, dbName := range []string{catalogkeys.DefaultDatabaseName, catalogkeys.PgDatabaseName} { stmt := fmt.Sprintf(createDbStmt, dbName) - err = r.execAsRoot(ctx, "create-default-db", stmt) + err = r.execAsRoot(ctx, "create-default-DB", stmt) if err != nil { log.Warningf(ctx, "failed attempt to add database %q: %s", dbName, err) break