Skip to content

Commit

Permalink
Merge branch 'master' into partition_ext
Browse files Browse the repository at this point in the history
  • Loading branch information
eurekaka authored Mar 3, 2021
2 parents 9a131f6 + 93c1779 commit 6ac85fc
Show file tree
Hide file tree
Showing 40 changed files with 681 additions and 155 deletions.
32 changes: 22 additions & 10 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ import (
func TestT(t *testing.T) {
CustomVerboseFlag = true
logLevel := os.Getenv("log_level")
logutil.InitLogger(logutil.NewLogConfig(logLevel, logutil.DefaultLogFormat, "", logutil.EmptyFileLogConfig, false))
err := logutil.InitLogger(logutil.NewLogConfig(logLevel, logutil.DefaultLogFormat, "", logutil.EmptyFileLogConfig, false))
if err != nil {
t.Fatal(err)
}
autoid.SetStep(5000)
TestingT(t)
}
Expand Down Expand Up @@ -375,9 +378,11 @@ func (s *testSuite) TestGlobalBinding(c *C) {
}

pb := &dto.Metric{}
metrics.BindTotalGauge.WithLabelValues(metrics.ScopeGlobal, bindinfo.Using).Write(pb)
err = metrics.BindTotalGauge.WithLabelValues(metrics.ScopeGlobal, bindinfo.Using).Write(pb)
c.Assert(err, IsNil)
c.Assert(pb.GetGauge().GetValue(), Equals, float64(1))
metrics.BindMemoryUsage.WithLabelValues(metrics.ScopeGlobal, bindinfo.Using).Write(pb)
err = metrics.BindMemoryUsage.WithLabelValues(metrics.ScopeGlobal, bindinfo.Using).Write(pb)
c.Assert(err, IsNil)
c.Assert(pb.GetGauge().GetValue(), Equals, testSQL.memoryUsage)

sql, hash := normalizeWithDefaultDB(c, testSQL.querySQL, "test")
Expand Down Expand Up @@ -432,9 +437,11 @@ func (s *testSuite) TestGlobalBinding(c *C) {
bindData = s.domain.BindHandle().GetBindRecord(hash, sql, "test")
c.Check(bindData, IsNil)

metrics.BindTotalGauge.WithLabelValues(metrics.ScopeGlobal, bindinfo.Using).Write(pb)
err = metrics.BindTotalGauge.WithLabelValues(metrics.ScopeGlobal, bindinfo.Using).Write(pb)
c.Assert(err, IsNil)
c.Assert(pb.GetGauge().GetValue(), Equals, float64(0))
metrics.BindMemoryUsage.WithLabelValues(metrics.ScopeGlobal, bindinfo.Using).Write(pb)
err = metrics.BindMemoryUsage.WithLabelValues(metrics.ScopeGlobal, bindinfo.Using).Write(pb)
c.Assert(err, IsNil)
// From newly created global bind handle.
c.Assert(pb.GetGauge().GetValue(), Equals, testSQL.memoryUsage)

Expand Down Expand Up @@ -482,9 +489,11 @@ func (s *testSuite) TestSessionBinding(c *C) {
}

pb := &dto.Metric{}
metrics.BindTotalGauge.WithLabelValues(metrics.ScopeSession, bindinfo.Using).Write(pb)
err = metrics.BindTotalGauge.WithLabelValues(metrics.ScopeSession, bindinfo.Using).Write(pb)
c.Assert(err, IsNil)
c.Assert(pb.GetGauge().GetValue(), Equals, float64(1))
metrics.BindMemoryUsage.WithLabelValues(metrics.ScopeSession, bindinfo.Using).Write(pb)
err = metrics.BindMemoryUsage.WithLabelValues(metrics.ScopeSession, bindinfo.Using).Write(pb)
c.Assert(err, IsNil)
c.Assert(pb.GetGauge().GetValue(), Equals, testSQL.memoryUsage)

handle := tk.Se.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
Expand Down Expand Up @@ -530,9 +539,11 @@ func (s *testSuite) TestSessionBinding(c *C) {
c.Check(bindData.OriginalSQL, Equals, testSQL.originSQL)
c.Check(len(bindData.Bindings), Equals, 0)

metrics.BindTotalGauge.WithLabelValues(metrics.ScopeSession, bindinfo.Using).Write(pb)
err = metrics.BindTotalGauge.WithLabelValues(metrics.ScopeSession, bindinfo.Using).Write(pb)
c.Assert(err, IsNil)
c.Assert(pb.GetGauge().GetValue(), Equals, float64(0))
metrics.BindMemoryUsage.WithLabelValues(metrics.ScopeSession, bindinfo.Using).Write(pb)
err = metrics.BindMemoryUsage.WithLabelValues(metrics.ScopeSession, bindinfo.Using).Write(pb)
c.Assert(err, IsNil)
c.Assert(pb.GetGauge().GetValue(), Equals, float64(0))
}
}
Expand All @@ -554,7 +565,8 @@ func (s *testSuite) TestGlobalAndSessionBindingBothExist(c *C) {
metrics.BindUsageCounter.Reset()
c.Assert(tk.HasPlan("SELECT * from t1,t2 where t1.id = t2.id", "MergeJoin"), IsTrue)
pb := &dto.Metric{}
metrics.BindUsageCounter.WithLabelValues(metrics.ScopeGlobal).Write(pb)
err := metrics.BindUsageCounter.WithLabelValues(metrics.ScopeGlobal).Write(pb)
c.Assert(err, IsNil)
c.Assert(pb.GetCounter().GetValue(), Equals, float64(1))

// Test 'tidb_use_plan_baselines'
Expand Down
22 changes: 17 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/table"
goutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

Expand All @@ -54,7 +55,9 @@ const (
currentVersion = 1
// DDLOwnerKey is the ddl owner path that is saved to etcd, and it's exported for testing.
DDLOwnerKey = "/tidb/ddl/fg/owner"
ddlPrompt = "ddl"
// addingDDLJobPrefix is the path prefix used to record the newly added DDL job, and it's saved to etcd.
addingDDLJobPrefix = "/tidb/ddl/add_ddl_job_"
ddlPrompt = "ddl"

shardRowIDBitsMax = 15

Expand Down Expand Up @@ -200,6 +203,7 @@ type ddlCtx struct {
infoHandle *infoschema.Handle
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client

// hook may be modified.
mu struct {
Expand Down Expand Up @@ -286,6 +290,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
binlogCli: binloginfo.GetPumpsClient(),
infoHandle: opt.InfoHandle,
tableLockCkr: deadLockCkr,
etcdCli: opt.EtcdCli,
}
ddlCtx.mu.hook = opt.Hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
Expand Down Expand Up @@ -481,16 +486,23 @@ func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) {
}
}

func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
func (d *ddl) asyncNotifyWorker(job *model.Job) {
// If the workers don't run, we needn't to notify workers.
if !RunWorker {
return
}

var worker *worker
jobTp := job.Type
if jobTp == model.ActionAddIndex || jobTp == model.ActionAddPrimaryKey {
asyncNotify(d.workers[addIdxWorker].ddlJobCh)
worker = d.workers[addIdxWorker]
} else {
asyncNotify(d.workers[generalWorker].ddlJobCh)
worker = d.workers[generalWorker]
}
if d.ownerManager.IsOwner() {
asyncNotify(worker.ddlJobCh)
} else {
d.asyncNotifyByEtcd(worker.addingDDLJobKey, job)
}
}

Expand Down Expand Up @@ -519,7 +531,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

// Notice worker that we push a new job and wait the job done.
d.asyncNotifyWorker(job.Type)
d.asyncNotifyWorker(job)
logutil.BgLogger().Info("[ddl] start DDL job", zap.String("job", job.String()), zap.String("query", job.Query))

var historyJob *model.Job
Expand Down
46 changes: 41 additions & 5 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl
import (
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -74,11 +76,12 @@ const (
// worker is used for handling DDL jobs.
// Now we have two kinds of workers.
type worker struct {
id int32
tp workerType
ddlJobCh chan struct{}
ctx context.Context
wg sync.WaitGroup
id int32
tp workerType
addingDDLJobKey string
ddlJobCh chan struct{}
ctx context.Context
wg sync.WaitGroup

sessPool *sessionPool // sessPool is used to new sessions to execute SQL in ddl package.
reorgCtx *reorgCtx // reorgCtx is used for reorganization.
Expand All @@ -97,6 +100,7 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan
delRangeManager: delRangeMgr,
}

worker.addingDDLJobKey = addingDDLJobPrefix + worker.typeStr()
worker.logCtx = logutil.WithKeyValue(context.Background(), "worker", worker.String())
return worker
}
Expand Down Expand Up @@ -142,23 +146,55 @@ func (w *worker) start(d *ddlCtx) {

ticker := time.NewTicker(checkTime)
defer ticker.Stop()
var notifyDDLJobByEtcdCh clientv3.WatchChan
if d.etcdCli != nil {
notifyDDLJobByEtcdCh = d.etcdCli.Watch(context.Background(), w.addingDDLJobKey)
}

rewatchCnt := 0
for {
ok := true
select {
case <-ticker.C:
logutil.Logger(w.logCtx).Debug("[ddl] wait to check DDL status again", zap.Duration("interval", checkTime))
case <-w.ddlJobCh:
case _, ok = <-notifyDDLJobByEtcdCh:
case <-w.ctx.Done():
return
}

if !ok {
logutil.Logger(w.logCtx).Warn("[ddl] start worker watch channel closed", zap.String("watch key", w.addingDDLJobKey))
notifyDDLJobByEtcdCh = d.etcdCli.Watch(context.Background(), w.addingDDLJobKey)
rewatchCnt++
if rewatchCnt > 10 {
time.Sleep(time.Duration(rewatchCnt) * time.Second)
}
continue
}

rewatchCnt = 0
err := w.handleDDLJobQueue(d)
if err != nil {
logutil.Logger(w.logCtx).Warn("[ddl] handle DDL job failed", zap.Error(err))
}
}
}

func (d *ddl) asyncNotifyByEtcd(addingDDLJobKey string, job *model.Job) {
if d.etcdCli == nil {
return
}

jobID := strconv.FormatInt(job.ID, 10)
timeStart := time.Now()
err := util.PutKVToEtcd(d.ctx, d.etcdCli, 1, addingDDLJobKey, jobID)
if err != nil {
logutil.BgLogger().Info("[ddl] notify handling DDL job failed", zap.String("jobID", jobID), zap.Error(err))
}
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerNotifyDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
}

func asyncNotify(ch chan struct{}) {
select {
case ch <- struct{}{}:
Expand Down
74 changes: 74 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,80 @@ func (s *testDDLSuite) TestCheckOwner(c *C) {
c.Assert(d1.GetLease(), Equals, testLease)
}

func (s *testDDLSuite) TestNotifyDDLJob(c *C) {
store := testCreateStore(c, "test_notify_job")
defer store.Close()

getFirstNotificationAfterStartDDL := func(d *ddl) {
select {
case <-d.workers[addIdxWorker].ddlJobCh:
default:
// The notification may be received by the worker.
}
select {
case <-d.workers[generalWorker].ddlJobCh:
default:
// The notification may be received by the worker.
}
}

d := testNewDDLAndStart(
context.Background(),
c,
WithStore(store),
WithLease(testLease),
)
defer d.Stop()
getFirstNotificationAfterStartDDL(d)

job := &model.Job{
SchemaID: 1,
TableID: 2,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{},
}
// Test the notification mechanism of the owner and the server receiving the DDL request on the same TiDB.
// This DDL request is a general DDL job.
d.asyncNotifyWorker(job)
select {
case <-d.workers[generalWorker].ddlJobCh:
default:
c.Fatal("do not get the general job notification")
}
// Test the notification mechanism of the owner and the server receiving the DDL request on the same TiDB.
// This DDL request is a add index DDL job.
job.Type = model.ActionAddIndex
d.asyncNotifyWorker(job)
select {
case <-d.workers[addIdxWorker].ddlJobCh:
default:
c.Fatal("do not get the add index job notification")
}
// Test the notification mechanism that the owner and the server receiving the DDL request are not on the same TiDB.
// And the etcd client is nil.
d1 := testNewDDLAndStart(
context.Background(),
c,
WithStore(store),
WithLease(testLease),
)
getFirstNotificationAfterStartDDL(d1)
defer d1.Stop()
d1.ownerManager.RetireOwner()
d1.asyncNotifyWorker(job)
job.Type = model.ActionCreateTable
d1.asyncNotifyWorker(job)
testCheckOwner(c, d1, false)
select {
case <-d1.workers[addIdxWorker].ddlJobCh:
c.Fatal("should not get the add index job notification")
case <-d1.workers[generalWorker].ddlJobCh:
c.Fatal("should not get the general job notification")
default:
}
}

// testRunWorker tests no job is handled when the value of RunWorker is false.
func (s *testDDLSerialSuite) testRunWorker(c *C) {
store := testCreateStore(c, "test_run_worker")
Expand Down
5 changes: 4 additions & 1 deletion distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ var _ = Suite(&testSuite{})
func TestT(t *testing.T) {
CustomVerboseFlag = true
logLevel := os.Getenv("log_level")
logutil.InitLogger(logutil.NewLogConfig(logLevel, logutil.DefaultLogFormat, "", logutil.EmptyFileLogConfig, false))
err := logutil.InitLogger(logutil.NewLogConfig(logLevel, logutil.DefaultLogFormat, "", logutil.EmptyFileLogConfig, false))
if err != nil {
t.Fatal(err)
}
TestingT(t)
}

Expand Down
Loading

0 comments on commit 6ac85fc

Please sign in to comment.