ddl: dynamic adjust add index worker number. (#8295) #8786

Merged · 2 commits · Dec 25, 2018
62 changes: 62 additions & 0 deletions ddl/fail_db_test.go
@@ -15,16 +15,20 @@ package ddl_test

import (
"fmt"
"math/rand"
"sync/atomic"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
gofail "github.com/pingcap/gofail/runtime"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
@@ -238,3 +242,61 @@ func (s *testDBSuite) TestFailSchemaSyncer(c *C) {
_, err = tk.Exec("insert into t values(1)")
c.Assert(err, IsNil)
}

func (s *testDBSuite) TestAddIndexWorkerNum(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create database if not exists test_db")
tk.MustExec("use test_db")
tk.MustExec("drop table if exists test_add_index")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")

done := make(chan error, 1)
start := -10
num := 4096
// First insert some rows so the backfill has work to do.
for i := start; i < num; i++ {
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
tk.MustExec(sql)
}

is := s.dom.InfoSchema()
schemaName := model.NewCIStr("test_db")
tableName := model.NewCIStr("test_add_index")
tbl, err := is.TableByName(schemaName, tableName)
c.Assert(err, IsNil)

splitCount := 100
// Split the table into multiple regions so there are many ranges to backfill.
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)

// Remember the original worker count so it can be restored when the test ends.
originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
lastSetWorkerCnt := originDDLAddIndexWorkerCnt
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt)

gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum")

sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
checkNum := 0

LOOP:
for {
select {
case err = <-done:
if err == nil {
break LOOP
}
c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
case <-ddl.TestCheckWorkerNumCh:
lastSetWorkerCnt = int32(rand.Intn(8) + 8)
tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
checkNum++
}
}
c.Assert(checkNum, Greater, 5)
tk.MustExec("admin check table test_add_index")
tk.MustExec("drop table test_add_index")
}
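The control flow of this test is worth seeing in isolation: the reorg loop signals on an unbuffered channel every time it re-checks the worker count, and the test answers by atomically storing a new target, so the two sides stay in lockstep. Below is a minimal, self-contained sketch of that handshake; every name in it (checkpointCh, targetCnt, reorgLoop) is an illustrative stand-in for ddl.TestCheckWorkerNumCh, ddl.TestCheckWorkerNumber, and the reorg loop, not a TiDB identifier.

package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
)

// checkpointCh plays the role of ddl.TestCheckWorkerNumCh: the loop blocks
// on it after every batch until the controlling side has looked.
var checkpointCh = make(chan struct{})

// targetCnt plays the role of ddl.TestCheckWorkerNumber.
var targetCnt = int32(16)

// reorgLoop stands in for the add-index reorg loop: before each batch it
// re-reads the target worker count, then reports a checkpoint.
func reorgLoop(batches int, done chan<- error) {
	for i := 0; i < batches; i++ {
		cnt := int(atomic.LoadInt32(&targetCnt))
		fmt.Printf("batch %d would run with %d workers\n", i, cnt)
		checkpointCh <- struct{}{} // rendezvous with the controller
	}
	done <- nil
}

func main() {
	done := make(chan error, 1)
	go reorgLoop(6, done)
	checks := 0
	for {
		select {
		case err := <-done:
			fmt.Printf("finished after %d checks, err=%v\n", checks, err)
			return
		case <-checkpointCh:
			// Pick a new worker count in [8, 16), as the test above does.
			atomic.StoreInt32(&targetCnt, int32(rand.Intn(8)+8))
			checks++
		}
	}
}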
111 changes: 72 additions & 39 deletions ddl/index.go
@@ -822,7 +822,7 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
if task.endIncluded {
rightParenthesis = "]"
}
log.Infof("[ddl-reorg] worker(%v), finish region %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
log.Infof("[ddl-reorg] worker(%v), finish table %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
w.id, task.physicalTableID, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds())

return result
@@ -1045,34 +1045,12 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker
return nil, nil
}

-// buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle),
-// and send these tasks to add index workers, till we finish adding the indices.
-func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error {
-totalAddedCount := job.GetRowCount()
-
-startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
-for {
-kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
-if err != nil {
-return errors.Trace(err)
-}
-
-log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
-remains, err := w.sendRangeTaskToWorkers(t, workers, reorgInfo, &totalAddedCount, kvRanges)
-if err != nil {
-return errors.Trace(err)
-}
-
-if len(remains) == 0 {
-break
-}
-startHandle, _, err = decodeHandleRange(remains[0])
-if err != nil {
-return errors.Trace(err)
-}
-}
-return nil
-}
+var (
+// TestCheckWorkerNumCh is used by tests to synchronize with the reorg loop when adjusting the add-index worker count.
+TestCheckWorkerNumCh = make(chan struct{})
+// TestCheckWorkerNumber is the worker count that tests expect the reorg loop to be running with.
+TestCheckWorkerNumber = int32(16)
+)

// addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
@@ -1091,6 +1069,9 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error {
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
+totalAddedCount := job.GetRowCount()
+
+startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
@@ -1099,16 +1080,68 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
-idxWorkers := make([]*addIndexWorker, workerCnt)
-for i := 0; i < int(workerCnt); i++ {
-sessCtx := newContext(reorgInfo.d.store)
-idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
-idxWorkers[i].priority = job.Priority
-go idxWorkers[i].run(reorgInfo.d)
-}
-defer closeAddIndexWorkers(idxWorkers)
-err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
-return errors.Trace(err)
+idxWorkers := make([]*addIndexWorker, 0, workerCnt)
+defer func() {
+closeAddIndexWorkers(idxWorkers)
+}()
+
+for {
+kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
+if err != nil {
+return errors.Trace(err)
+}
+
+// Re-read the worker count so it can be adjusted dynamically.
+workerCnt = variable.GetDDLReorgWorkerCounter()
+// Never start more workers than there are ranges; with a single range one worker is enough.
+if len(kvRanges) < int(workerCnt) {
+workerCnt = int32(len(kvRanges))
+}
+// Enlarge the worker pool.
+for i := len(idxWorkers); i < int(workerCnt); i++ {
+sessCtx := newContext(reorgInfo.d.store)
+idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
+idxWorker.priority = job.Priority
+idxWorkers = append(idxWorkers, idxWorker)
+go idxWorkers[i].run(reorgInfo.d)
+}
+// Shrink the worker pool.
+if len(idxWorkers) > int(workerCnt) {
+workers := idxWorkers[workerCnt:]
+idxWorkers = idxWorkers[:workerCnt]
+closeAddIndexWorkers(workers)
+}
+
+// gofail: var checkIndexWorkerNum bool
+// if checkIndexWorkerNum {
+// num := int(atomic.LoadInt32(&TestCheckWorkerNumber))
+// if num != 0 {
+// if num > len(kvRanges) {
+// if len(idxWorkers) != len(kvRanges) {
+// return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual worker num is: %v", len(kvRanges), num, len(idxWorkers))
+// }
+// } else if num != len(idxWorkers) {
+// return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual worker num is: %v", len(kvRanges), num, len(idxWorkers))
+// }
+// TestCheckWorkerNumCh <- struct{}{}
+// }
+//}
+
+log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle)
+remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
+if err != nil {
+return errors.Trace(err)
+}
+
+if len(remains) == 0 {
+break
+}
+startHandle, _, err = decodeHandleRange(remains[0])
+if err != nil {
+return errors.Trace(err)
+}
+}
+return nil
}

// addTableIndex handles the add index reorganization state for a table.
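The loop body above reduces to a reusable pattern: re-read the desired pool size each round, append and start workers to grow, and close the tail of the slice to shrink, capping the size at the number of pending ranges. Here is a standalone sketch of that grow/shrink logic; the names (worker, startWorker, resizePool) are illustrative stand-ins, not the actual addIndexWorker API.

package main

import "fmt"

// worker is a stand-in for addIndexWorker: it drains its task channel
// until the channel is closed.
type worker struct {
	id    int
	tasks chan int
	done  chan struct{}
}

func startWorker(id int) *worker {
	w := &worker{id: id, tasks: make(chan int), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		for t := range w.tasks {
			fmt.Printf("worker %d handled range %d\n", w.id, t)
		}
	}()
	return w
}

// resizePool mirrors the enlarge/shrink steps in addPhysicalTableIndex:
// grow by appending new workers, shrink by closing the surplus tail.
func resizePool(pool []*worker, target int) []*worker {
	for i := len(pool); i < target; i++ {
		pool = append(pool, startWorker(i))
	}
	if len(pool) > target {
		for _, w := range pool[target:] {
			close(w.tasks)
			<-w.done // wait for the worker goroutine to exit
		}
		pool = pool[:target]
	}
	return pool
}

func main() {
	var pool []*worker
	for round, ranges := range [][]int{{0, 1, 2, 3, 4, 5}, {6, 7}} {
		want := 4 // pretend this was read from the worker-count variable
		if len(ranges) < want {
			want = len(ranges) // never more workers than ranges
		}
		pool = resizePool(pool, want)
		fmt.Printf("round %d: %d workers for %d ranges\n", round, len(pool), len(ranges))
		for _, r := range ranges {
			pool[r%len(pool)].tasks <- r
		}
	}
	resizePool(pool, 0) // shut everything down
}

Shrinking between batches rather than mid-batch keeps the synchronization trivial: tasks for a round are only handed to workers that will survive the round, so no task can land on a closed channel.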