From aee3cfd4c1064c10b7781baa2e835d5fa185747b Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 6 Jun 2023 11:27:05 +0800 Subject: [PATCH 1/5] ddl, session: fix re-upgrade issues --- ddl/job_table.go | 8 +- session/bootstrap.go | 9 ++- .../bootstraptest/bootstrap_upgrade_test.go | 74 +++++++++++++++++++ session/mock_bootstrap.go | 24 +++++- 4 files changed, 107 insertions(+), 8 deletions(-) diff --git a/ddl/job_table.go b/ddl/job_table.go index 7a35122ad3bbe..1bd84869d9dfd 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -171,14 +171,14 @@ func hasSysDB(job *model.Job) bool { func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) { if d.stateSyncer.IsUpgradingState() { + if job.IsPaused() { + return false, nil + } // We need to turn the 'pausing' job to be 'paused' in ddl worker, // and stop the reorganization workers if job.IsPausing() || hasSysDB(job) { return true, nil } - if job.IsPaused() { - return false, nil - } var errs []error // During binary upgrade, pause all running DDL jobs errs, err = PauseJobsBySystem(sess.Session(), []int64{job.ID}) @@ -200,7 +200,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun return false, nil } - if job.IsPausedBySystem() && !hasSysDB(job) { + if job.IsPausedBySystem() { var errs []error errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID}) if len(errs) > 0 { diff --git a/session/bootstrap.go b/session/bootstrap.go index 565ce61a82afd..cede30954910c 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -1167,7 +1167,8 @@ func upgrade(s Session) { } func syncUpgradeState(s Session) { - ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) + totalInterval := time.Duration(internalSQLTimeout) * time.Second + ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval) defer cancelFunc() dom := domain.GetDomain(s) err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading)) @@ -1175,8 +1176,8 @@ func syncUpgradeState(s Session) { logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err)) } - retryTimes := 10 interval := 200 * time.Millisecond + retryTimes := int(totalInterval / interval) for i := 0; i < retryTimes; i++ { op, err := owner.GetOwnerOpValue(ctx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap") if err == nil && op.String() == owner.OpGetUpgradingState.String() { @@ -1185,7 +1186,9 @@ func syncUpgradeState(s Session) { if i == retryTimes-1 { logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) } - logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) + if i%10 == 0 { + logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) + } time.Sleep(interval) } diff --git a/session/bootstraptest/bootstrap_upgrade_test.go b/session/bootstraptest/bootstrap_upgrade_test.go index 60a8954e2f9c8..bba20fe5462d0 100644 --- a/session/bootstraptest/bootstrap_upgrade_test.go +++ b/session/bootstraptest/bootstrap_upgrade_test.go @@ -362,6 +362,80 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { require.True(t, suc) } +func TestUpgradeVersionForSystemPausedJob(t *testing.T) { + // Mock a general and a reorg job in boostrap. + *session.WithMockUpgrade = true + session.MockUpgradeToVerLatestKind++ + + store, dom := session.CreateStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + seV := session.CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1)) + session.UnsetStoreBootstrapped(store.UUID()) + ver, err := session.GetBootstrapVersion(seV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion-1, ver) + + // Add a paused DDL job before upgrade. + session.MustExec(t, seV, "create table mysql.upgrade_tbl(a int)") + ch := make(chan struct{}) + var jobID int64 + hook := &callback.TestDDLCallback{} + hook.OnJobRunAfterExported = func(job *model.Job) { + if job.SchemaState == model.StateDeleteOnly { + se := session.CreateSessionAndSetID(t, store) + session.MustExec(t, se, fmt.Sprintf("admin pause ddl jobs %d", job.ID)) + } + if job.State == model.JobStatePaused && jobID == 0 { + // Mock pause the ddl job by system. + job.AdminOperator = model.AdminCommandBySystem + ch <- struct{}{} + jobID = job.ID + } + } + dom.DDL().SetHook(hook) + go func() { + _, err = execute(context.Background(), seV, "alter table mysql.upgrade_tbl add column b int") + }() + + <-ch + dom.Close() + // Make sure upgrade is successful. + domLatestV, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domLatestV.Close() + seLatestV := session.CreateSessionAndSetID(t, store) + ver, err = session.GetBootstrapVersion(seLatestV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion+1, ver) + + sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID) + // Make sure the add index operation is successful. + suc := false + for i := 0; i < 20; i++ { + rows, err := execute(context.Background(), seLatestV, sql) + require.NoError(t, err) + require.Len(t, rows, 1) + require.Equal(t, rows[0].GetString(2), "upgrade_tbl") + + state := rows[0].GetString(11) + if state == "synced" { + suc = true + break + } + time.Sleep(time.Millisecond * 200) + } + require.True(t, suc) +} + func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.Row, error) { ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) rs, err := s.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query) diff --git a/session/mock_bootstrap.go b/session/mock_bootstrap.go index df7f4f582dd1f..623b4e0491931 100644 --- a/session/mock_bootstrap.go +++ b/session/mock_bootstrap.go @@ -112,6 +112,21 @@ func mockUpgradeToVerLatest(s Session, ver int64) { TestHook.OnBootstrapAfter(s) } +// mockSimpleUpgradeToVerLatest mocks a simple bootstrapVersion(make the test faster). +func mockSimpleUpgradeToVerLatest(s Session, ver int64) { + logutil.BgLogger().Info("mock upgrade to ver latest", zap.Int64("old ver", ver), zap.Int64("mock latest ver", mockLatestVer)) + if ver >= mockLatestVer { + return + } + mustExecute(s, "use mysql") + mustExecute(s, `create table if not exists mock_sys_t( + c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1) + );`) + mustExecute(s, "alter table mock_sys_t add column mayNullCol bigint default 1") + mustExecute(s, "alter table mock_sys_t add index idx_c2(c2)") + TestHook.OnBootstrapAfter(s) +} + // TestHook is exported for testing. var TestHook = TestCallback{} @@ -140,13 +155,20 @@ func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 { return ver } +// MockUpgradeToVerLatestKind is used to indicate the use of different mock bootstrapVersion. +var MockUpgradeToVerLatestKind = 1 + func addMockBootstrapVersionForTest(s Session) { if !*WithMockUpgrade { return } TestHook.OnBootstrapBefore(s) - bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest) + if MockUpgradeToVerLatestKind == 1 { + bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest) + } else { + bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest) + } currentBootstrapVersion++ } From 35a2e8581aedde316bd6e054a6232b692b7c9953 Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 7 Jun 2023 17:27:54 +0800 Subject: [PATCH 2/5] session: update bazel --- session/bootstraptest/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session/bootstraptest/BUILD.bazel b/session/bootstraptest/BUILD.bazel index 71e5966c7bd74..cfce2f5ec0fcc 100644 --- a/session/bootstraptest/BUILD.bazel +++ b/session/bootstraptest/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 9, + shard_count = 10, deps = [ "//config", "//ddl", From e50bf5c4d747f79ce5317dfd8f1e997f77b44141 Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 9 Jun 2023 11:48:29 +0800 Subject: [PATCH 3/5] ddl, session: address comments and do tiny cleanup --- ddl/job_table.go | 9 +++------ session/bootstraptest/bootstrap_upgrade_test.go | 4 +++- session/mock_bootstrap.go | 10 ++++++++-- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/ddl/job_table.go b/ddl/job_table.go index e3b4336922d33..86544bb710cd5 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -551,8 +551,7 @@ func job2UniqueIDs(job *model.Job, schema bool) string { } func job2SchemaNames(job *model.Job) []string { - switch job.Type { - case model.ActionRenameTable: + if job.Type == model.ActionRenameTable { var oldSchemaID int64 var oldSchemaName model.CIStr var tableName model.CIStr @@ -562,11 +561,9 @@ func job2SchemaNames(job *model.Job) []string { names = append(names, strings.ToLower(job.SchemaName)) names = append(names, oldSchemaName.O) return names - case model.ActionRenameTables: - // TODO: Get this action's schema names. - case model.ActionExchangeTablePartition: - // TODO: Get this action's schema names. } + // TODO: consider about model.ActionRenameTables and model.ActionExchangeTablePartition, which need to get the schema names. + return []string{job.SchemaName} } diff --git a/session/bootstraptest/bootstrap_upgrade_test.go b/session/bootstraptest/bootstrap_upgrade_test.go index a782d49c17649..7b537ba69ae06 100644 --- a/session/bootstraptest/bootstrap_upgrade_test.go +++ b/session/bootstraptest/bootstrap_upgrade_test.go @@ -363,10 +363,12 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { require.True(t, suc) } +// TestUpgradeVersionForSystemPausedJob tests mock the first upgrade failed, and it has a mock system DDL in queue. +// Then we do re-upgrade(This operation will pause all DDL jobs by the system). func TestUpgradeVersionForSystemPausedJob(t *testing.T) { // Mock a general and a reorg job in boostrap. *session.WithMockUpgrade = true - session.MockUpgradeToVerLatestKind++ + session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest store, dom := session.CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() diff --git a/session/mock_bootstrap.go b/session/mock_bootstrap.go index 623b4e0491931..c7106710ac4fb 100644 --- a/session/mock_bootstrap.go +++ b/session/mock_bootstrap.go @@ -155,8 +155,14 @@ func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 { return ver } +const ( + defaultMockUpgradeToVerLatest = 0 + // MockSimpleUpgradeToVerLatest is used to indicate the use of the simple mock bootstrapVersion. + MockSimpleUpgradeToVerLatest = 1 +) + // MockUpgradeToVerLatestKind is used to indicate the use of different mock bootstrapVersion. -var MockUpgradeToVerLatestKind = 1 +var MockUpgradeToVerLatestKind = defaultMockUpgradeToVerLatest func addMockBootstrapVersionForTest(s Session) { if !*WithMockUpgrade { @@ -164,7 +170,7 @@ func addMockBootstrapVersionForTest(s Session) { } TestHook.OnBootstrapBefore(s) - if MockUpgradeToVerLatestKind == 1 { + if MockUpgradeToVerLatestKind == defaultMockUpgradeToVerLatest { bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest) } else { bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest) From 1291ed3c6bf496c5463392e36c1d0b50c8067e11 Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 9 Jun 2023 13:09:34 +0800 Subject: [PATCH 4/5] session: address a comment --- .../bootstraptest/bootstrap_upgrade_test.go | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/session/bootstraptest/bootstrap_upgrade_test.go b/session/bootstraptest/bootstrap_upgrade_test.go index 7b537ba69ae06..760342d01edd9 100644 --- a/session/bootstraptest/bootstrap_upgrade_test.go +++ b/session/bootstraptest/bootstrap_upgrade_test.go @@ -344,11 +344,15 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { // Resume the DDL job, then add index operation can be executed successfully. session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID)) + checkDDLJobExecSucc(t, seLatestV, jobID) +} + +// checkDDLJobExecSucc is used to make sure the DDL operation is successful. +func checkDDLJobExecSucc(t *testing.T, se session.Session, jobID int64) { sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID) - // Make sure the add index operation is successful. suc := false for i := 0; i < 20; i++ { - rows, err := execute(context.Background(), seLatestV, sql) + rows, err := execute(context.Background(), se, sql) require.NoError(t, err) require.Len(t, rows, 1) require.Equal(t, rows[0].GetString(2), "upgrade_tbl") @@ -420,23 +424,7 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) { require.NoError(t, err) require.Equal(t, session.CurrentBootstrapVersion+1, ver) - sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID) - // Make sure the add index operation is successful. - suc := false - for i := 0; i < 20; i++ { - rows, err := execute(context.Background(), seLatestV, sql) - require.NoError(t, err) - require.Len(t, rows, 1) - require.Equal(t, rows[0].GetString(2), "upgrade_tbl") - - state := rows[0].GetString(11) - if state == "synced" { - suc = true - break - } - time.Sleep(time.Millisecond * 200) - } - require.True(t, suc) + checkDDLJobExecSucc(t, seLatestV, jobID) } func TestUpgradeVersionForResumeJob(t *testing.T) { From 1e531f1d186114fddce17c972cd2501056ca32aa Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 12 Jun 2023 15:26:41 +0800 Subject: [PATCH 5/5] session: add a comment --- session/mock_bootstrap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session/mock_bootstrap.go b/session/mock_bootstrap.go index c7106710ac4fb..cfad1f8fa24e4 100644 --- a/session/mock_bootstrap.go +++ b/session/mock_bootstrap.go @@ -157,7 +157,7 @@ func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 { const ( defaultMockUpgradeToVerLatest = 0 - // MockSimpleUpgradeToVerLatest is used to indicate the use of the simple mock bootstrapVersion. + // MockSimpleUpgradeToVerLatest is used to indicate the use of the simple mock bootstrapVersion, this is just a few simple DDL operations. MockSimpleUpgradeToVerLatest = 1 )