Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, session: fix re-upgrade issues #44469

Merged
merged 6 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,14 @@ func hasSysDB(job *model.Job) bool {

func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) {
if d.stateSyncer.IsUpgradingState() {
if job.IsPaused() {
return false, nil
}
// We need to turn the 'pausing' job to be 'paused' in ddl worker,
// and stop the reorganization workers
if job.IsPausing() || hasSysDB(job) {
return true, nil
}
if job.IsPaused() {
return false, nil
}
var errs []error
// During binary upgrade, pause all running DDL jobs
errs, err = PauseJobsBySystem(sess.Session(), []int64{job.ID})
Expand All @@ -200,7 +200,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
return false, nil
}

if job.IsPausedBySystem() && !hasSysDB(job) {
if job.IsPausedBySystem() {
var errs []error
errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID})
if len(errs) > 0 && errs[0] != nil {
Expand Down
9 changes: 6 additions & 3 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,16 +1168,17 @@ func upgrade(s Session) {
}

func syncUpgradeState(s Session) {
ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
totalInterval := time.Duration(internalSQLTimeout) * time.Second
ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval)
defer cancelFunc()
dom := domain.GetDomain(s)
err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading))
if err != nil {
logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err))
}

retryTimes := 10
interval := 200 * time.Millisecond
retryTimes := int(totalInterval / interval)
for i := 0; i < retryTimes; i++ {
op, err := owner.GetOwnerOpValue(ctx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap")
if err == nil && op.String() == owner.OpGetUpgradingState.String() {
Expand All @@ -1186,7 +1187,9 @@ func syncUpgradeState(s Session) {
if i == retryTimes-1 {
zimulala marked this conversation as resolved.
Show resolved Hide resolved
logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
}
logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
if i%10 == 0 {
logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
}
time.Sleep(interval)
}

Expand Down
2 changes: 1 addition & 1 deletion session/bootstraptest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = [
"//config",
"//ddl",
Expand Down
74 changes: 74 additions & 0 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,80 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
require.True(t, suc)
}

func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
zimulala marked this conversation as resolved.
Show resolved Hide resolved
// Mock a general and a reorg job in bootstrap.
*session.WithMockUpgrade = true
session.MockUpgradeToVerLatestKind++

store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

seV := session.CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMeta(txn)
err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1)
require.NoError(t, err)
err = txn.Commit(context.Background())
Copy link
Contributor

@dhysum dhysum Jun 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use the session rather than a txn here, or another session? It would be better to use the same mechanism within the same routine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because the FinishBootstrap method requires a txn.

require.NoError(t, err)
session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1))
session.UnsetStoreBootstrapped(store.UUID())
ver, err := session.GetBootstrapVersion(seV)
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)

// Add a paused DDL job before upgrade.
session.MustExec(t, seV, "create table mysql.upgrade_tbl(a int)")
ch := make(chan struct{})
var jobID int64
hook := &callback.TestDDLCallback{}
hook.OnJobRunAfterExported = func(job *model.Job) {
if job.SchemaState == model.StateDeleteOnly {
se := session.CreateSessionAndSetID(t, store)
session.MustExec(t, se, fmt.Sprintf("admin pause ddl jobs %d", job.ID))
}
if job.State == model.JobStatePaused && jobID == 0 {
// Mock pause the ddl job by system.
job.AdminOperator = model.AdminCommandBySystem
Copy link
Contributor

@dhysum dhysum Jun 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it would be better to do a real upgrade rather than a mock, since that is closer to what a user would actually do.

Also, the mock here seems a little complex: the behavior of 'job.AdminOperator' may be changed by its own module, so we cannot know what will happen in the future. In other words, this case depends on the internal logic of 'admin pause', which may introduce unknown errors in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the test should fail if the internal implementation changes, and then we just modify the test. This is the only way of mocking it that I have come up with so far. Do you have any other suggestions?

ch <- struct{}{}
jobID = job.ID
}
}
dom.DDL().SetHook(hook)
go func() {
_, err = execute(context.Background(), seV, "alter table mysql.upgrade_tbl add column b int")
}()

<-ch
dom.Close()
// Make sure upgrade is successful.
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
seLatestV := session.CreateSessionAndSetID(t, store)
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion+1, ver)

sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID)
// Make sure the add column operation is successful.
suc := false
for i := 0; i < 20; i++ {
zimulala marked this conversation as resolved.
Show resolved Hide resolved
rows, err := execute(context.Background(), seLatestV, sql)
require.NoError(t, err)
require.Len(t, rows, 1)
require.Equal(t, rows[0].GetString(2), "upgrade_tbl")

state := rows[0].GetString(11)
if state == "synced" {
suc = true
break
}
time.Sleep(time.Millisecond * 200)
}
require.True(t, suc)
}

func TestUpgradeVersionForResumeJob(t *testing.T) {
store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()
Expand Down
24 changes: 23 additions & 1 deletion session/mock_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,21 @@ func mockUpgradeToVerLatest(s Session, ver int64) {
TestHook.OnBootstrapAfter(s)
}

// mockSimpleUpgradeToVerLatest mocks a simplified upgrade to the latest bootstrap version (to make the test faster).
func mockSimpleUpgradeToVerLatest(s Session, ver int64) {
logutil.BgLogger().Info("mock upgrade to ver latest", zap.Int64("old ver", ver), zap.Int64("mock latest ver", mockLatestVer))
if ver >= mockLatestVer {
return
}
mustExecute(s, "use mysql")
mustExecute(s, `create table if not exists mock_sys_t(
c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if each column had a different type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a simple test that only needs a system DDL job, so I don't feel the need to vary these types.

);`)
mustExecute(s, "alter table mock_sys_t add column mayNullCol bigint default 1")
mustExecute(s, "alter table mock_sys_t add index idx_c2(c2)")
TestHook.OnBootstrapAfter(s)
}

// TestHook is exported for testing.
var TestHook = TestCallback{}

Expand Down Expand Up @@ -140,13 +155,20 @@ func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 {
return ver
}

// MockUpgradeToVerLatestKind selects which mock upgrade function is appended to bootstrapVersion.
var MockUpgradeToVerLatestKind = 1
zimulala marked this conversation as resolved.
Show resolved Hide resolved

func addMockBootstrapVersionForTest(s Session) {
if !*WithMockUpgrade {
return
}

TestHook.OnBootstrapBefore(s)
bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest)
if MockUpgradeToVerLatestKind == 1 {
bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest)
} else {
bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest)
}
currentBootstrapVersion++
}

Expand Down