diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index 022d93ee1215..dfc2fc9c388e 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -83,6 +83,8 @@ go_test( "public_schema_migration_external_test.go", "raft_applied_index_term_external_test.go", "remove_invalid_database_privileges_external_test.go", + "schema_changes_external_test.go", + "schema_changes_helpers_test.go", ], data = glob(["testdata/**"]), embed = [":migrations"], @@ -91,12 +93,14 @@ go_test( "//pkg/base", "//pkg/clusterversion", "//pkg/jobs", + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvclient/rangefeed/rangefeedcache", "//pkg/kv/kvserver", "//pkg/kv/kvserver/stateloader", + "//pkg/migration", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", diff --git a/pkg/migration/migrations/helpers_test.go b/pkg/migration/migrations/helpers_test.go index a229e87f2104..6270bc07eeb6 100644 --- a/pkg/migration/migrations/helpers_test.go +++ b/pkg/migration/migrations/helpers_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" @@ -45,6 +46,12 @@ type Schema struct { // Migrate runs cluster migration by changing the 'version' cluster setting. 
func Migrate( t *testing.T, sqlDB *gosql.DB, key clusterversion.Key, done chan struct{}, expectError bool, +) { + UpgradeToVersion(t, sqlDB, clusterversion.ByKey(key), done, expectError) +} + +func UpgradeToVersion( + t *testing.T, sqlDB *gosql.DB, v roachpb.Version, done chan struct{}, expectError bool, ) { defer func() { if done != nil { @@ -52,7 +59,7 @@ func Migrate( } }() _, err := sqlDB.Exec(`SET CLUSTER SETTING version = $1`, - clusterversion.ByKey(key).String()) + v.String()) if expectError { assert.Error(t, err) return @@ -151,3 +158,6 @@ func GetTable( require.NoError(t, err) return table } + +// WaitForJobStatement is exported so that it can be detected by a testing knob. +const WaitForJobStatement = waitForJobStatement diff --git a/pkg/migration/migrations/schema_changes.go b/pkg/migration/migrations/schema_changes.go index 37ac41c74866..d84ef37b37a0 100644 --- a/pkg/migration/migrations/schema_changes.go +++ b/pkg/migration/migrations/schema_changes.go @@ -43,6 +43,10 @@ type operation struct { schemaExistsFn func(catalog.TableDescriptor, catalog.TableDescriptor, string) (bool, error) } +// waitForJobStatement is the statement used to wait for an ongoing job to +// complete. +const waitForJobStatement = "SHOW JOBS WHEN COMPLETE VALUES ($1)" + // migrateTable is run during a migration to a new version and changes an existing // table's schema based on schemaChangeQuery. The schema-change is ignored if the // table already has the required changes. 
@@ -92,7 +96,7 @@ func migrateTable( for _, mutation := range mutations { log.Infof(ctx, "waiting for the mutation job %v to complete", mutation.JobID) if _, err := d.InternalExecutor.Exec(ctx, "migration-mutations-wait", - nil, "SHOW JOB WHEN COMPLETE $1", mutation.JobID); err != nil { + nil, waitForJobStatement, mutation.JobID); err != nil { return err } } @@ -242,6 +246,8 @@ func hasIndex(storedTable, expectedTable catalog.TableDescriptor, indexName stri expectedCopy.StoreColumnNames = []string{} storedCopy.StoreColumnIDs = []descpb.ColumnID{0, 0, 0} expectedCopy.StoreColumnIDs = []descpb.ColumnID{0, 0, 0} + storedCopy.CreatedAtNanos = 0 + expectedCopy.CreatedAtNanos = 0 if err = ensureProtoMessagesAreEqual(&expectedCopy, &storedCopy); err != nil { return false, err diff --git a/pkg/migration/migrations/schema_changes_external_test.go b/pkg/migration/migrations/schema_changes_external_test.go new file mode 100644 index 000000000000..052d5e3cbd09 --- /dev/null +++ b/pkg/migration/migrations/schema_changes_external_test.go @@ -0,0 +1,494 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package migrations_test + +import ( + "context" + "math" + "regexp" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/migration/migrations" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestMigrationWithFailures tests modification of a table during +// migration with different failures. It tests the system behavior with failure +// combinations of the migration job and schema-change jobs at different stages +// in their progress. +// +// This test was originally written in support of the migration which added +// exponential backoff to the system.jobs table, but was retrofitted to prevent +// regressions. 
+func TestMigrationWithFailures(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ skip.UnderRace(t, "very slow")
+
+ // We're going to be migrating from startCV to endCV.
+ startCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 2041}}
+ endCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 2042}}
+
+ // The test follows the following procedure.
+ //
+ // Inject the old table descriptor and ensure that the system is using the
+ // deprecated jobs-table.
+ //
+ // Start migration, which initiates two schema-change jobs one by one. Test
+ // the system for each schema-change job separately. Later on, we inject
+ // failure in this migration, causing it to fail.
+ //
+ // Depending on the test setting, intercept the target schema-change job,
+ // preventing the job from progressing. We may cancel this schema-change or
+ // let it succeed to test different scenarios.
+ //
+ // Cancel the migration, causing the migration to revert and fail.
+ //
+ // Wait for the canceled migration-job to finish, expecting its failure. The
+ // schema-change job is still not progressing to control what the restarted
+ // migration will observe.
+ //
+ // Restart the migration, expecting it to succeed. Depending on the test setting,
+ // the intercepted schema-change job may wait for the migration job to resume.
+ // If it does, the migration job is expected to observe the ongoing schema-change.
+ // The ongoing schema-change is canceled or not, depending on the test case.
+ // In either case, we expect the correct number of mutations to be skipped
+ // during the migration.
+ //
+ // If we canceled the schema-job, expect it to rerun
+ // as part of the migration. Otherwise, expect the schema-change to be ignored
+ // during the migration.
+ // + // Finally, we validate that the schema changes are in effect by reading the new + // columns and the index, and by running a job that is failed and retried to + // practice exponential-backoff machinery. + + const createTableBefore = ` +CREATE TABLE test.test_table ( + id INT8 DEFAULT unique_rowid() PRIMARY KEY, + status STRING NOT NULL, + created TIMESTAMP NOT NULL DEFAULT now(), + payload BYTES NOT NULL, + progress BYTES, + created_by_type STRING, + created_by_id INT, + claim_session_id BYTES, + claim_instance_id INT8, + INDEX (status, created), + INDEX (created_by_type, created_by_id) STORING (status), + FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id), + FAMILY progress (progress), + FAMILY claim (claim_session_id, claim_instance_id) +); +` + const createTableAfter = ` +CREATE TABLE test.test_table ( + id INT8 DEFAULT unique_rowid() PRIMARY KEY, + status STRING NOT NULL, + created TIMESTAMP NOT NULL DEFAULT now(), + payload BYTES NOT NULL, + progress BYTES, + created_by_type STRING, + created_by_id INT, + claim_session_id BYTES, + claim_instance_id INT8, + num_runs INT8, + last_run TIMESTAMP, + INDEX (status, created), + INDEX (created_by_type, created_by_id) STORING (status), + INDEX jobs_run_stats_idx ( + claim_session_id, + status, + created + ) STORING(last_run, num_runs, claim_instance_id) + WHERE ` + systemschema.JobsRunStatsIdxPredicate + `, + FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id), + FAMILY progress (progress), + FAMILY claim (claim_session_id, claim_instance_id, num_runs, last_run) +); +` + + for _, test := range []struct { + // Test identifier. + name string + // Job status when the job is intercepted while transitioning to the intercepted status. + query string + // Whether the schema-change job should wait for the migration to restart + // after failure before proceeding. 
+ waitForMigrationRestart bool + // Cancel the intercepted schema-change to inject a failure during migration. + cancelSchemaJob bool + // Expected number of schema-changes that are skipped during migration. + expectedSkipped int + }{ + { + name: "adding columns", + query: migrations.TestingAddColsQuery, + waitForMigrationRestart: false, // Does not matter. + cancelSchemaJob: false, // Does not matter. + expectedSkipped: 0, // Will be ignored. + }, + { + name: "adding index", + query: migrations.TestingAddIndexQuery, + waitForMigrationRestart: false, // Does not matter. + cancelSchemaJob: false, // Does not matter. + expectedSkipped: 0, // Will be ignored. + }, + { + name: "fail adding columns", + query: migrations.TestingAddColsQuery, + waitForMigrationRestart: true, // Need to wait to observe failing schema change. + cancelSchemaJob: true, // To fail adding columns. + expectedSkipped: 0, + }, + { + name: "fail adding index", + query: migrations.TestingAddIndexQuery, + waitForMigrationRestart: true, // Need to wait to observe failing schema change. + cancelSchemaJob: true, // To fail adding index. + expectedSkipped: 1, // Columns must not be added again. + }, + { + name: "skip none", + query: migrations.TestingAddColsQuery, + waitForMigrationRestart: true, // Need to wait to observe schema change and have correct expectedSkipped count. + cancelSchemaJob: true, // To fail adding index and skip adding column. + expectedSkipped: 0, // Both columns and index must be added. + }, + { + name: "skip adding columns", + query: migrations.TestingAddIndexQuery, + waitForMigrationRestart: true, // Need to wait to observe schema change and have correct expectedSkipped count. + cancelSchemaJob: true, // To fail adding index and skip adding column. + expectedSkipped: 1, // Columns must not be added again. 
+ }, + { + name: "skip adding columns and index", + query: migrations.TestingAddIndexQuery, + waitForMigrationRestart: true, // Need to wait to observe schema change and have correct expectedSkipped count. + cancelSchemaJob: false, // To fail adding index and skip adding column. + expectedSkipped: 2, // Both columns and index must not be added again. + }, + } { + t.Run(test.name, func(t *testing.T) { + scope := log.Scope(t) + defer scope.Close(t) + + type updateEvent struct { + orig, updated jobs.JobMetadata + errChan chan error + } + + ctx := context.Background() + cancelCtx, cancel := context.WithCancel(ctx) + // To intercept the schema-change and the migration job. + updateEventChan := make(chan updateEvent) + var enableUpdateEventCh syncutil.AtomicBool + enableUpdateEventCh.Set(false) + beforeUpdate := func(orig, updated jobs.JobMetadata) error { + if !enableUpdateEventCh.Get() { + return nil + } + ue := updateEvent{ + orig: orig, + updated: updated, + errChan: make(chan error), + } + select { + case updateEventChan <- ue: + case <-cancelCtx.Done(): + return cancelCtx.Err() + } + select { + case err := <-ue.errChan: + return err + case <-cancelCtx.Done(): + return cancelCtx.Err() + } + } + var schemaEvent updateEvent + migrationWaitCh := make(chan struct{}) + + // Number of schema-change jobs that are skipped. + settings := cluster.MakeTestingClusterSettingsWithVersions( + endCV.Version, startCV.Version, false, /* initializeVersion */ + ) + require.NoError(t, clusterversion.Initialize( + ctx, startCV.Version, &settings.SV, + )) + jobsKnobs := jobs.NewTestingKnobsWithShortIntervals() + jobsKnobs.BeforeUpdate = beforeUpdate + migrationFunc, expectedDescriptor := migrations. 
+ MakeFakeMigrationForTestMigrationWithFailures() + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: startCV.Version, + }, + JobsTestingKnobs: jobsKnobs, + SQLExecutor: &sql.ExecutorTestingKnobs{ + BeforeExecute: func(ctx context.Context, stmt string) { + if stmt == migrations.WaitForJobStatement { + select { + case migrationWaitCh <- struct{}{}: + case <-ctx.Done(): + } + } + }, + }, + MigrationManager: &migration.TestingKnobs{ + ListBetweenOverride: func(from, to clusterversion.ClusterVersion) []clusterversion.ClusterVersion { + return []clusterversion.ClusterVersion{ + endCV, + } + }, + RegistryOverride: func(cv clusterversion.ClusterVersion) (migration.Migration, bool) { + if cv.Equal(endCV) { + return migration.NewTenantMigration("testing", + endCV, + migrations.NoPrecondition, + migrationFunc, + ), true + } + panic("unexpected version") + }}, + }, + }, + } + tc := testcluster.StartTestCluster(t, 1, clusterArgs) + defer tc.Stopper().Stop(ctx) + defer cancel() + s := tc.Server(0) + sqlDB := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(sqlDB) + + // Build the expected table descriptor, inject it into the + // migration function, drop it, and then add the descriptor + // in the pre-migration state. 
+ tdb.Exec(t, "CREATE DATABASE test") + tdb.Exec(t, createTableAfter) + var desc catalog.TableDescriptor + ie := s.ExecutorConfig().(sql.ExecutorConfig).InternalExecutor + cf := s.CollectionFactory().(*descs.CollectionFactory) + require.NoError(t, cf.Txn(ctx, ie, s.DB(), func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) (err error) { + tn := tree.MakeTableNameWithSchema("test", "public", "test_table") + _, desc, err = descriptors.GetImmutableTableByName(ctx, txn, &tn, tree.ObjectLookupFlags{ + CommonLookupFlags: tree.CommonLookupFlags{ + Required: true, + AvoidLeased: true, + }, + }) + return err + })) + tdb.Exec(t, "DROP TABLE test.test_table") + tdb.Exec(t, createTableBefore) + expectedDescriptor.Store(desc) + enableUpdateEventCh.Set(true) + + // Run the migration, expecting failure. + t.Log("trying migration, expecting to fail") + // Channel to wait for the migration job to complete. + finishChan := make(chan struct{}) + go migrations.UpgradeToVersion( + t, sqlDB, endCV.Version, finishChan, true, /* expectError */ + ) + + var migJobID jobspb.JobID + // Intercept the target schema-change job and get migration-job's ID. + t.Log("intercepting the schema job") + for { + e := <-updateEventChan + // The migration job creates schema-change jobs. Therefore, we are guaranteed + // to get the migration-job's ID before canceling the job later on. + if e.orig.Payload.Type() == jobspb.TypeMigration { + migJobID = e.orig.ID + e.errChan <- nil + continue + } + schemaQuery := strings.Replace(e.orig.Payload.Description, "test.public.test_table", "test.test_table", -1) + testQuery := removeSpaces(test.query) + testQuery = strings.ReplaceAll(testQuery, ":::STRING", "") + if testQuery == schemaQuery { + // Intercepted the target schema-change. + schemaEvent = e + t.Logf("intercepted schema change job: %v", e.orig.ID) + break + } + // Ignore all other job updates. + e.errChan <- nil + } + // Cancel the migration job. 
+ t.Log("canceling the migration job")
+ go cancelJob(t, ctx, s, migJobID)
+
+ // Wait for the migration job to finish while preventing the intercepted
+ // schema-change job from progressing.
+ t.Log("waiting for the migration job to finish.")
+ testutils.SucceedsSoon(t, func() error {
+ for {
+ select {
+ case <-finishChan:
+ return nil
+ case e := <-updateEventChan:
+ e.errChan <- nil
+ default:
+ return errors.Errorf("waiting for the migration job to finish.")
+ }
+ }
+ })
+
+ // Let all jobs continue until the test's completion, except the intercepted
+ // schema-change job that we resume later on.
+ go func() {
+ for {
+ var e updateEvent
+ select {
+ case e = <-updateEventChan:
+ close(e.errChan)
+ case <-cancelCtx.Done():
+ return
+ }
+ }
+ }()
+
+ // Restart the migration job.
+ t.Log("retrying migration, expecting to succeed")
+ go migrations.UpgradeToVersion(t, sqlDB, endCV.Version, finishChan, false /* expectError */)
+
+ // Wait until the new migration job observes an existing mutation job.
+ if test.waitForMigrationRestart {
+ t.Log("waiting for the migration job to observe a mutation")
+ <-migrationWaitCh
+ }
+
+ t.Log("resuming the schema change job")
+ // If configured so, mark the schema-change job to cancel.
+ if test.cancelSchemaJob {
+ cancelJob(t, ctx, s, schemaEvent.orig.ID)
+ }
+ // Resume the schema-change job and all other jobs.
+ schemaEvent.errChan <- nil
+
+ // If we canceled the job, wait for the job to finish.
+ if test.cancelSchemaJob {
+ t.Log("waiting for the schema job to reach the cancel status")
+ waitUntilState(t, tdb, schemaEvent.orig.ID, jobs.StatusCanceled)
+ }
+ // Ensure all migrations complete.
+ go func() {
+ for {
+ select {
+ case <-migrationWaitCh:
+ case <-cancelCtx.Done():
+ return
+ }
+ }
+ }()
+
+ // Wait for the migration to complete, expecting success.
+ t.Logf("waiting for the new migration job to complete.")
+ testutils.SucceedsSoon(t, func() error {
+ select {
+ case <-finishChan:
+ return nil
+ default:
+ }
+ return errors.Errorf("waiting for the migration job to finish.")
+ })
+ if test.waitForMigrationRestart {
+ // Ensure that we have observed the expected number of ignored schema change jobs.
+ log.Flush()
+ entries, err := log.FetchEntriesFromFiles(
+ 0, math.MaxInt64, 10000,
+ regexp.MustCompile("skipping.*operation as the schema change already exists."),
+ log.WithFlattenedSensitiveData,
+ )
+ require.NoError(t, err)
+ require.Len(t, entries, test.expectedSkipped)
+ }
+ })
+ }
+}
+
+// cancelJob marks the given job as cancel-requested, leading the job to be
+// canceled.
+func cancelJob(
+ t *testing.T, ctx context.Context, s serverutils.TestServerInterface, jobID jobspb.JobID,
+) {
+ err := s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+ // Using this way of canceling because the migration job is non-cancelable.
+ // Canceling in this way skips the check.
+ return s.JobRegistry().(*jobs.Registry).UpdateJobWithTxn(
+ ctx, jobID, txn, false /* useReadLock */, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
+ ) error {
+ ju.UpdateStatus(jobs.StatusCancelRequested)
+ return nil
+ })
+ })
+ assert.NoError(t, err)
+}
+
+// waitUntilState waits until the specified job reaches the given state.
+func waitUntilState( + t *testing.T, tdb *sqlutils.SQLRunner, jobID jobspb.JobID, expectedStatus jobs.Status, +) { + testutils.SucceedsSoon(t, func() error { + var status jobs.Status + tdb.QueryRow(t, + "SELECT status FROM system.jobs WHERE id = $1", jobID, + ).Scan(&status) + if status == expectedStatus { + return nil + } + return errors.Errorf( + "waiting for job %v to reach status %v, current status is %v", + jobID, expectedStatus, status) + }) +} + +func removeSpaces(stmt string) string { + stmt = strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(stmt, " ")) + stmt = strings.ReplaceAll(stmt, "( ", "(") + stmt = strings.ReplaceAll(stmt, " )", ")") + return stmt +} diff --git a/pkg/migration/migrations/schema_changes_helpers_test.go b/pkg/migration/migrations/schema_changes_helpers_test.go new file mode 100644 index 000000000000..46a183e4a8ec --- /dev/null +++ b/pkg/migration/migrations/schema_changes_helpers_test.go @@ -0,0 +1,78 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import ( + "context" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" +) + +const ( + // TestingAddColsQuery is used by TestMigrationWithFailures. 
+ TestingAddColsQuery = `
+ALTER TABLE test.test_table
+ ADD COLUMN num_runs INT8 FAMILY claim,
+ ADD COLUMN last_run TIMESTAMP FAMILY claim`
+
+ // TestingAddIndexQuery is used by TestMigrationWithFailures.
+ TestingAddIndexQuery = `
+CREATE INDEX jobs_run_stats_idx
+ ON test.test_table (claim_session_id, status, created)
+ STORING (last_run, num_runs, claim_instance_id)
+ WHERE ` + systemschema.JobsRunStatsIdxPredicate
+)
+
+// MakeFakeMigrationForTestMigrationWithFailures makes the migration function
+// used in the TestMigrationWithFailures test.
+func MakeFakeMigrationForTestMigrationWithFailures() (
+ m migration.TenantMigrationFunc,
+ expectedTableDescriptor *atomic.Value,
+) {
+ expectedTableDescriptor = &atomic.Value{}
+ return func(
+ ctx context.Context, cs clusterversion.ClusterVersion, d migration.TenantDeps, _ *jobs.Job,
+ ) error {
+ row, err := d.InternalExecutor.QueryRow(ctx, "look-up-id", nil, /* txn */
+ `select id from system.namespace where name = $1`, "test_table")
+ if err != nil {
+ return err
+ }
+ tableID := descpb.ID(tree.MustBeDInt(row[0]))
+ for _, op := range []operation{
+ {
+ name: "jobs-add-columns",
+ schemaList: []string{"num_runs", "last_run"},
+ query: TestingAddColsQuery,
+ schemaExistsFn: hasColumn,
+ },
+ {
+ name: "jobs-add-index",
+ schemaList: []string{"jobs_run_stats_idx"},
+ query: TestingAddIndexQuery,
+ schemaExistsFn: hasIndex,
+ },
+ } {
+ expected := expectedTableDescriptor.Load().(catalog.TableDescriptor)
+ if err := migrateTable(ctx, cs, d, op, tableID, expected); err != nil {
+ return err
+ }
+ }
+ return nil
+ }, expectedTableDescriptor
+}