ddl: dynamic adjust add index worker number. #8295

Merged 28 commits on Dec 24, 2018

Commits (28)
a5c7024
ddl: dynamic adjust add index worker number init
crazycs520 Nov 13, 2018
00ad984
shrink worker num
crazycs520 Nov 14, 2018
f936c59
add index
crazycs520 Nov 14, 2018
c7fd5ee
refine test
crazycs520 Nov 14, 2018
86d6b99
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Nov 14, 2018
b1cbd7c
refine test
crazycs520 Nov 14, 2018
fe27282
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Nov 15, 2018
4545ebc
shrink worker num if regions is less than worker num
crazycs520 Nov 19, 2018
e05f9b7
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Nov 19, 2018
3033df1
add comment
crazycs520 Nov 20, 2018
d11e016
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 5, 2018
fb85c9b
add test to check that changing add index worker num takes effect.
crazycs520 Dec 5, 2018
a2bdc3e
refine test
crazycs520 Dec 5, 2018
f2aa994
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 6, 2018
73cb88b
add log
crazycs520 Dec 7, 2018
cd97603
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 7, 2018
d6c57b0
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 12, 2018
c594dda
address comment
crazycs520 Dec 17, 2018
8d90b29
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 17, 2018
216c58f
fix test
crazycs520 Dec 17, 2018
451a9da
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 21, 2018
77f49bd
use gofail test
crazycs520 Dec 21, 2018
dfa3e4d
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 21, 2018
d60f065
refine gofail test and remove hook
crazycs520 Dec 21, 2018
50470ad
refine test
crazycs520 Dec 21, 2018
fc6674e
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 21, 2018
8b2ab52
add comment
crazycs520 Dec 22, 2018
34eafec
Merge branch 'master' into adjust-add-index-worker
crazycs520 Dec 24, 2018
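
For orientation before the diffs: the core change is in ddl/index.go, where addPhysicalTableIndex now re-reads tidb_ddl_reorg_worker_cnt on every batch of key ranges and grows or shrinks its worker pool to match, never exceeding the number of ranges. A minimal standalone Go sketch of that resize pattern follows; the worker type and helper names are illustrative stand-ins, not TiDB's actual API.

package main

import (
	"fmt"
	"sync/atomic"
)

// configuredCnt stands in for the tidb_ddl_reorg_worker_cnt system variable
// (variable.GetDDLReorgWorkerCounter in the real code).
var configuredCnt int32 = 4

type worker struct {
	id     int
	closed chan struct{}
}

func newWorker(id int) *worker { return &worker{id: id, closed: make(chan struct{})} }

func (w *worker) close() { close(w.closed) }

// resizePool grows or shrinks the pool so that its size is min(configured count, range count),
// mirroring the loop body added to addPhysicalTableIndex.
func resizePool(pool []*worker, ranges int) []*worker {
	target := int(atomic.LoadInt32(&configuredCnt))
	if ranges < target {
		// Never start more workers than there are key ranges to backfill.
		target = ranges
	}
	// Enlarge: start the missing workers.
	for i := len(pool); i < target; i++ {
		pool = append(pool, newWorker(i))
	}
	// Shrink: close and drop the surplus workers.
	if len(pool) > target {
		for _, w := range pool[target:] {
			w.close()
		}
		pool = pool[:target]
	}
	return pool
}

func main() {
	var pool []*worker
	pool = resizePool(pool, 100) // 4 workers: limited by the configured count
	atomic.StoreInt32(&configuredCnt, 16)
	pool = resizePool(pool, 8) // 8 workers: limited by the number of ranges
	fmt.Println("pool size:", len(pool))
}

Because the resize happens at the top of the range-splitting loop, a change to the session variable takes effect on the next batch of regions without restarting the reorg job.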
3 changes: 2 additions & 1 deletion ddl/db_partition_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/parser/model"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
@@ -952,7 +953,7 @@ func (s *testIntegrationSuite) TestPartitionDropIndex(c *C) {
}
c.Assert(idx1, NotNil)

sessionExecInGoroutine(c, s.store, "drop index idx1 on partition_drop_idx;", done)
testutil.SessionExecInGoroutine(c, s.store, "drop index idx1 on partition_drop_idx;", done)
ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
LOOP:
48 changes: 8 additions & 40 deletions ddl/db_test.go
@@ -32,6 +32,7 @@ import (
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
testddlutil "github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
@@ -608,7 +609,7 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
s.mustExec(c, sql)
otherKeys = append(otherKeys, v)

sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
testddlutil.SessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)

deletedKeys := make(map[int]struct{})

@@ -726,7 +727,6 @@ LOOP:
delete(handles, h)
}
c.Assert(handles, HasLen, 0)

s.tk.MustExec("drop table test_add_index")
}

@@ -754,7 +754,7 @@ func (s *testDBSuite) TestDropIndex(c *C) {
}
c.Assert(c3idx, NotNil)

sessionExecInGoroutine(c, s.store, "drop index c3_index on test_drop_index", done)
testddlutil.SessionExecInGoroutine(c, s.store, "drop index c3_index on test_drop_index", done)

ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
@@ -916,38 +916,6 @@ func sessionExec(c *C, s kv.Storage, sql string) {
se.Close()
}

func sessionExecInGoroutine(c *C, s kv.Storage, sql string, done chan error) {
execMultiSQLInGoroutine(c, s, "test_db", []string{sql}, done)
}

func execMultiSQLInGoroutine(c *C, s kv.Storage, dbName string, multiSQL []string, done chan error) {
go func() {
se, err := session.CreateSession4Test(s)
if err != nil {
done <- errors.Trace(err)
return
}
defer se.Close()
_, err = se.Execute(context.Background(), "use "+dbName)
if err != nil {
done <- errors.Trace(err)
return
}
for _, sql := range multiSQL {
rs, err := se.Execute(context.Background(), sql)
if err != nil {
done <- errors.Trace(err)
return
}
if rs != nil {
done <- errors.Errorf("RecordSet should be empty.")
return
}
done <- nil
}
}()
}

func (s *testDBSuite) testAddColumn(c *C) {
done := make(chan error, 1)

@@ -957,7 +925,7 @@ func (s *testDBSuite) testAddColumn(c *C) {
s.mustExec(c, "insert into t2 values (?, ?, ?)", i, i, i)
}

sessionExecInGoroutine(c, s.store, "alter table t2 add column c4 int default -1", done)
testddlutil.SessionExecInGoroutine(c, s.store, "alter table t2 add column c4 int default -1", done)

ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
@@ -1092,7 +1060,7 @@ func (s *testDBSuite) testDropColumn(c *C) {
}

// get c4 column id
sessionExecInGoroutine(c, s.store, "alter table t2 drop column c4", done)
testddlutil.SessionExecInGoroutine(c, s.store, "alter table t2 drop column c4", done)

ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
@@ -1151,9 +1119,9 @@ func (s *testDBSuite) TestDropColumn(c *C) {
for i := 0; i < num/2; i++ {
multiDDL = append(multiDDL, "alter table t2 add column c4 int", "alter table t2 drop column c4")
}
execMultiSQLInGoroutine(c, s.store, "drop_col_db", multiDDL, ddlDone)
testddlutil.ExecMultiSQLInGoroutine(c, s.store, "drop_col_db", multiDDL, ddlDone)
for i := 0; i < num; i++ {
execMultiSQLInGoroutine(c, s.store, "drop_col_db", []string{"insert into t2 set c1 = 1, c2 = 1, c3 = 1, c4 = 1"}, dmlDone)
testddlutil.ExecMultiSQLInGoroutine(c, s.store, "drop_col_db", []string{"insert into t2 set c1 = 1, c2 = 1, c3 = 1, c4 = 1"}, dmlDone)
}
for i := 0; i < num; i++ {
select {
@@ -1524,7 +1492,7 @@ func (s *testDBSuite) TestAddNotNullColumn(c *C) {
s.tk.MustExec("create table tnn (c1 int primary key auto_increment, c2 int)")
s.tk.MustExec("insert tnn (c2) values (0)" + strings.Repeat(",(0)", 99))
done := make(chan error, 1)
sessionExecInGoroutine(c, s.store, "alter table tnn add column c3 int not null default 3", done)
testddlutil.SessionExecInGoroutine(c, s.store, "alter table tnn add column c3 int not null default 3", done)
updateCnt := 0
out:
for {
62 changes: 62 additions & 0 deletions ddl/failtest/fail_db_test.go
@@ -18,17 +18,21 @@ import (
"fmt"
"math/rand"
"os"
"sync/atomic"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
gofail "github.com/pingcap/gofail/runtime"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/util/logutil"
@@ -311,3 +315,61 @@ func (s *testFailDBSuite) TestGenGlobalIDFail(c *C) {
tk.MustExec("admin check table t1")
tk.MustExec("admin check table t2")
}

func (s *testFailDBSuite) TestAddIndexWorkerNum(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create database if not exists test_db")
tk.MustExec("use test_db")
tk.MustExec("drop table if exists test_add_index")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")

done := make(chan error, 1)
start := -10
num := 4096
// first add some rows
for i := start; i < num; i++ {
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
tk.MustExec(sql)
}

is := s.dom.InfoSchema()
schemaName := model.NewCIStr("test_db")
tableName := model.NewCIStr("test_add_index")
tbl, err := is.TableByName(schemaName, tableName)
c.Assert(err, IsNil)

splitCount := 100
// Split the table into multiple regions.
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)

originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
lastSetWorkerCnt := originDDLAddIndexWorkerCnt
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
ddl.TestCheckWorkerNumber = lastSetWorkerCnt
defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt)

gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum")

testutil.SessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
checkNum := 0

LOOP:
for {
select {
case err = <-done:
if err == nil {
break LOOP
}
c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
case <-ddl.TestCheckWorkerNumCh:
lastSetWorkerCnt = int32(rand.Intn(8) + 8)
tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
checkNum++
}
}
c.Assert(checkNum, Greater, 5)
tk.MustExec("admin check table test_add_index")
tk.MustExec("drop table test_add_index")
}
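
The test above drives the new resize path through two exported hooks: the checkIndexWorkerNum failpoint in index.go signals on ddl.TestCheckWorkerNumCh once per batch, and the test replies by choosing a new count, applying it with set @@tidb_ddl_reorg_worker_cnt, and publishing the expected value via atomic.StoreInt32 so the reorg goroutine can read it without a data race (see the review note on atomic.LoadInt32 further down). A simplified sketch of that handshake, with made-up names standing in for the real hooks:

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	// checkCh stands in for ddl.TestCheckWorkerNumCh: the loop signals once per batch.
	checkCh = make(chan struct{})
	// expectedCnt stands in for ddl.TestCheckWorkerNumber: the count the loop should observe.
	expectedCnt = int32(4)
)

// reorgLoop stands in for the batched loop in addPhysicalTableIndex.
func reorgLoop(batches int, done chan<- error) {
	for i := 0; i < batches; i++ {
		want := int(atomic.LoadInt32(&expectedCnt))
		fmt.Printf("batch %d uses %d workers\n", i, want) // the real code resizes its pool to `want`
		checkCh <- struct{}{} // tell the test this batch's count has been applied
	}
	done <- nil
}

func main() {
	done := make(chan error, 1)
	go reorgLoop(3, done)
	checks := 0
	for {
		select {
		case err := <-done:
			fmt.Println("finished, checks:", checks, "err:", err)
			return
		case <-checkCh:
			// Mirror the test: change the setting, then publish the new expectation atomically.
			// The new value takes effect on whichever batch next observes it, as in the real test.
			atomic.StoreInt32(&expectedCnt, int32(8+checks))
			checks++
		}
	}
}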
111 changes: 72 additions & 39 deletions ddl/index.go
@@ -823,7 +823,7 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
if task.endIncluded {
rightParenthesis = "]"
}
log.Infof("[ddl-reorg] worker(%v), finish region %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
log.Infof("[ddl-reorg] worker(%v), finish table %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
w.id, task.physicalTableID, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds())

return result
@@ -1046,34 +1046,12 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker
return nil, nil
}

// buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle),
// and send these tasks to add index workers, till we finish adding the indices.
func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error {
totalAddedCount := job.GetRowCount()

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
if err != nil {
return errors.Trace(err)
}

log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
remains, err := w.sendRangeTaskToWorkers(t, workers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}

if len(remains) == 0 {
break
}
startHandle, _, err = decodeHandleRange(remains[0])
if err != nil {
return errors.Trace(err)
}
}
return nil
}
var (
// TestCheckWorkerNumCh is used by tests to synchronize with the reorg loop when it checks the worker count.
TestCheckWorkerNumCh = make(chan struct{}, 0)
// TestCheckWorkerNumber is the add-index worker count that tests expect the reorg loop to use.
TestCheckWorkerNumber = int32(16)
)

// addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
@@ -1092,6 +1070,9 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
totalAddedCount := job.GetRowCount()

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
@@ -1100,16 +1081,68 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, workerCnt)
for i := 0; i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorkers[i].priority = job.Priority
go idxWorkers[i].run(reorgInfo.d)
}
defer closeAddIndexWorkers(idxWorkers)
err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
return errors.Trace(err)
idxWorkers := make([]*addIndexWorker, 0, workerCnt)
defer func() {
closeAddIndexWorkers(idxWorkers)
}()

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
if err != nil {
return errors.Trace(err)
}

// Re-read the setting so the add-index worker number can be adjusted dynamically.
workerCnt = variable.GetDDLReorgWorkerCounter()
// If there are fewer key ranges than workers, one worker per range is enough.
if len(kvRanges) < int(workerCnt) {
workerCnt = int32(len(kvRanges))
}
// Enlarge the worker pool.
for i := len(idxWorkers); i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorker.priority = job.Priority
idxWorkers = append(idxWorkers, idxWorker)
go idxWorkers[i].run(reorgInfo.d)
}
// Shrink the worker pool.
if len(idxWorkers) > int(workerCnt) {
workers := idxWorkers[workerCnt:]
idxWorkers = idxWorkers[:workerCnt]
closeAddIndexWorkers(workers)
}

// gofail: var checkIndexWorkerNum bool
// if checkIndexWorkerNum {
// num := int(atomic.LoadInt32(&TestCheckWorkerNumber))
Reviewer (Contributor): Maybe we don't need atomic.LoadInt32 here.
Author (crazycs520): It is used to avoid a data race.
// if num != 0 {
// if num > len(kvRanges) {
// if len(idxWorkers) != len(kvRanges) {
// return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers))
// }
// } else if num != len(idxWorkers) {
// return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers))
// }
// TestCheckWorkerNumCh <- struct{}{}
// }
//}

log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle)
remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}

if len(remains) == 0 {
break
}
startHandle, _, err = decodeHandleRange(remains[0])
if err != nil {
return errors.Trace(err)
}
}
return nil
}

// addTableIndex handles the add index reorganization state for a table.
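
For readers untangling the commented-out failpoint above: it enforces a single invariant, namely that the live worker count equals the expected count, capped by the number of key ranges in the batch. A plain-Go restatement of that check (this helper is illustrative, not the gofail-generated code):

package main

import "fmt"

// checkWorkerCount restates the validation inside the checkIndexWorkerNum failpoint:
// when an expectation has been published (expected != 0), the number of running
// workers must equal the expected count, capped by the number of key ranges.
func checkWorkerCount(expected, kvRanges, running int) error {
	if expected == 0 {
		return nil // no expectation published yet
	}
	want := expected
	if kvRanges < want {
		want = kvRanges
	}
	if running != want {
		return fmt.Errorf("check index worker num error: kv ranges %d, expected %d, actual %d",
			kvRanges, expected, running)
	}
	return nil
}

func main() {
	fmt.Println(checkWorkerCount(16, 10, 10)) // <nil>: capped by the 10 ranges
	fmt.Println(checkWorkerCount(8, 100, 6))  // error: 6 workers where 8 were expected
}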