Skip to content

Commit

Permalink
ddl/domain: disallow set schema lease to 0 (#55312)
Browse files Browse the repository at this point in the history
ref #54436
  • Loading branch information
D3Hunter authored Aug 12, 2024
1 parent d6dda3d commit 3708722
Show file tree
Hide file tree
Showing 37 changed files with 92 additions and 151 deletions.
1 change: 0 additions & 1 deletion br/pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func NewCluster() (*Cluster, error) {
}
cluster.Storage = storage

session.SetSchemaLease(0)
session.DisableStats4Test()
dom, err := session.BootstrapSession(storage)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions cmd/tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,8 +749,15 @@ func setGlobalVars() {

util.SetGOGC(cfg.Performance.GOGC)

ddlLeaseDuration := parseDuration(cfg.Lease)
session.SetSchemaLease(ddlLeaseDuration)
schemaLeaseDuration := parseDuration(cfg.Lease)
if schemaLeaseDuration <= 0 {
// previous version allow set schema lease to 0, and mainly used on
// uni-store and for test, to be compatible we set it to default value here.
log.Warn("schema lease is invalid, use default value",
zap.String("lease", schemaLeaseDuration.String()))
schemaLeaseDuration = config.DefSchemaLease
}
session.SetSchemaLease(schemaLeaseDuration)
statsLeaseDuration := parseDuration(cfg.Performance.StatsLease)
session.SetStatsLease(statsLeaseDuration)
planReplayerGCLease := parseDuration(cfg.Performance.PlanReplayerGCLease)
Expand Down
5 changes: 3 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ const (
// EnvVarKeyspaceName is the system env name for keyspace name.
EnvVarKeyspaceName = "KEYSPACE_NAME"
// MaxTokenLimit is the max token limit value.
MaxTokenLimit = 1024 * 1024
MaxTokenLimit = 1024 * 1024
DefSchemaLease = 45 * time.Second
)

// Valid config maps
Expand Down Expand Up @@ -912,7 +913,7 @@ var defaultConf = Config{
Path: "/tmp/tidb",
RunDDL: true,
SplitTable: true,
Lease: "45s",
Lease: DefSchemaLease.String(),
TokenLimit: 1000,
OOMUseTmpStorage: true,
TempDir: DefTempDir,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func TestSnapshotVersion(t *testing.T) {

dd := dom.DDL()
ddl.DisableTiFlashPoll(dd)
require.Equal(t, dbTestLease, dd.GetLease())
require.Equal(t, dbTestLease, dom.GetSchemaLease())

snapTS := oracle.GoTimeToTS(time.Now())
tk.MustExec("create database test2")
Expand Down Expand Up @@ -673,7 +673,7 @@ func TestSchemaValidator(t *testing.T) {

dd := dom.DDL()
ddl.DisableTiFlashPoll(dd)
require.Equal(t, dbTestLease, dd.GetLease())
require.Equal(t, dbTestLease, dom.GetSchemaLease())

tk.MustExec("create table test.t(a int)")

Expand Down
32 changes: 13 additions & 19 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ type DDL interface {
// Start campaigns the owner and starts workers.
// ctxPool is used for the worker's delRangeManager and creates sessions.
Start(ctxPool *pools.ResourcePool) error
// GetLease returns current schema lease time.
GetLease() time.Duration
// Stats returns the DDL statistics.
Stats(vars *variable.SessionVars) (map[string]any, error)
// GetScope gets the status variables scope.
Expand Down Expand Up @@ -573,14 +571,6 @@ func (d *ddl) RegisterStatsHandle(h *handle.Handle) {
// give up notify and log it.
func asyncNotifyEvent(d *ddlCtx, e *statsutil.DDLEvent) {
if d.ddlEventCh != nil {
if d.lease == 0 {
// If lease is 0, it's always used in test.
select {
case d.ddlEventCh <- e:
default:
}
return
}
for i := 0; i < 10; i++ {
select {
case d.ddlEventCh <- e:
Expand Down Expand Up @@ -853,12 +843,6 @@ func (d *ddl) close() {
logutil.DDLLogger().Info("DDL closed", zap.String("ID", d.uuid), zap.Duration("take time", time.Since(startTime)))
}

// GetLease implements DDL.GetLease interface.
func (d *ddl) GetLease() time.Duration {
lease := d.lease
return lease
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() syncer.SchemaSyncer {
return d.schemaSyncer
Expand Down Expand Up @@ -1021,9 +1005,19 @@ type RecoverSchemaInfo struct {
// This provides a safe window for async commit and 1PC to commit with an old schema.
func delayForAsyncCommit() {
if variable.EnableMDL.Load() {
// If metadata lock is enabled. The transaction of DDL must begin after prewrite of the async commit transaction,
// then the commit ts of DDL must be greater than the async commit transaction. In this case, the corresponding schema of the async commit transaction
// is correct. But if metadata lock is disabled, we can't ensure that the corresponding schema of the async commit transaction isn't change.
// If metadata lock is enabled. The transaction of DDL must begin after
// pre-write of the async commit transaction, then the commit ts of DDL
// must be greater than the async commit transaction. In this case, the
// corresponding schema of the async commit transaction is correct.
// suppose we're adding index:
// - schema state -> StateWriteOnly with version V
// - some txn T started using async commit and version V,
// and T do pre-write before or after V+1
// - schema state -> StateWriteReorganization with version V+1
// - T commit finish, with TS
// - 'wait schema synced' finish
// - schema state -> Done with version V+2, commit-ts of this
// transaction must > TS, so it's safe for T to commit.
return
}
cfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ func runReorgJobAndHandleErr(
if err != nil {
return false, ver, errors.Trace(err)
}
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onCreateIndex",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, allIndexInfos[0].Name)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/job_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestCheckOwner(t *testing.T) {

time.Sleep(testLease)
require.Equal(t, dom.DDL().OwnerManager().IsOwner(), true)
require.Equal(t, dom.DDL().GetLease(), testLease)
require.Equal(t, dom.GetSchemaLease(), testLease)
}

func TestInvalidDDLJob(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/ddl/modify_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,8 @@ func (w *worker) doModifyColumn(
if !mysql.HasNotNullFlag(oldCol.GetFlag()) && mysql.HasNotNullFlag(newCol.GetFlag()) {
noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.GetFlag())

// lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly.
// We need to check after the flag is set
if d.lease > 0 && !noPreventNullFlag {
if !noPreventNullFlag {
delayForAsyncCommit()
}

Expand Down Expand Up @@ -588,7 +587,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
// enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData".
// disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData"
failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onModifyColumn",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name)
Expand Down
10 changes: 4 additions & 6 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2233,7 +2233,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
// and then run the reorg next time.
return ver, errors.Trace(err)
}
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (dropIndexErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (dropIndexErr error) {
defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition",
func() {
dropIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("drop partition panic")
Expand Down Expand Up @@ -2431,7 +2431,7 @@ func (w *worker) onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
// and then run the reorg next time.
return ver, errors.Trace(err)
}
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (dropIndexErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (dropIndexErr error) {
defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition",
func() {
dropIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("drop partition panic")
Expand Down Expand Up @@ -2702,9 +2702,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
// partition to be exchange with.
// So we need to rollback that change, instead of just cancelling.

if d.lease > 0 {
delayForAsyncCommit()
}
delayForAsyncCommit()

if defID != partDef.ID {
// Should never happen, should have been updated above, in previous state!
Expand Down Expand Up @@ -3451,7 +3449,7 @@ func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tb
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, partTbl, physTblIDs, elements)
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (reorgErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (reorgErr error) {
defer tidbutil.Recover(metrics.LabelDDL, "doPartitionReorgWork",
func() {
reorgErr = dbterror.ErrCancelledDDLJob.GenWithStack("reorganize partition for table `%v` panic", tbl.Meta().Name)
Expand Down
23 changes: 4 additions & 19 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ func newReorgSessCtx(store kv.Storage) sessionctx.Context {
return c
}

const defaultWaitReorgTimeout = 10 * time.Second

// ReorgWaitTimeout is the timeout that wait ddl in write reorganization stage.
// make it a var for testing.
var ReorgWaitTimeout = 5 * time.Second

func (rc *reorgCtx) notifyJobState(state model.JobState) {
Expand Down Expand Up @@ -226,7 +225,6 @@ func (rc *reorgCtx) getRowCount() int64 {
func (w *worker) runReorgJob(
reorgInfo *reorgInfo,
tblInfo *model.TableInfo,
lease time.Duration,
reorgFn func() error,
) error {
job := reorgInfo.Job
Expand Down Expand Up @@ -268,16 +266,7 @@ func (w *worker) runReorgJob(
}()
}

waitTimeout := defaultWaitReorgTimeout
// if lease is 0, we are using a local storage,
// and we can wait the reorganization to be done here.
// if lease > 0, we don't need to wait here because
// we should update some job's progress context and try checking again,
// so we use a very little timeout here.
if lease > 0 {
waitTimeout = ReorgWaitTimeout
}

waitTimeout := ReorgWaitTimeout
// wait reorganization job done or timeout
select {
case res := <-rc.doneCh:
Expand Down Expand Up @@ -744,9 +733,7 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
})

info.first = true
if d.lease > 0 { // Only delay when it's not in test.
delayForAsyncCommit()
}
delayForAsyncCommit()
ver, err := getValidCurrentVersion(d.store)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -837,9 +824,7 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo
)
if job.SnapshotVer == 0 {
info.first = true
if d.lease > 0 { // Only delay when it's not in test.
delayForAsyncCommit()
}
delayForAsyncCommit()
ver, err := getValidCurrentVersion(d.store)
if err != nil {
return nil, errors.Trace(err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func restartWorkers(t *testing.T, store kv.Storage, d *domain.Domain) {
newDDL, newDDLExecutor := ddl.NewDDL(context.Background(),
ddl.WithStore(d.Store()),
ddl.WithInfoCache(d.InfoCache()),
ddl.WithLease(d.DDL().GetLease()),
ddl.WithLease(d.GetSchemaLease()),
ddl.WithSchemaLoader(d),
)
d.SetDDL(newDDL, newDDLExecutor)
Expand Down Expand Up @@ -92,7 +92,7 @@ func testRunInterruptedJob(t *testing.T, store kv.Storage, d *domain.Domain, job
done := make(chan error, 1)
go runInterruptedJob(t, store, d.DDLExecutor(), job, done)

ticker := time.NewTicker(d.DDL().GetLease())
ticker := time.NewTicker(d.GetSchemaLease())
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestStat(t *testing.T) {
done := make(chan error, 1)
go runInterruptedJob(t, store, dom.DDLExecutor(), job, done)

ticker := time.NewTicker(dom.DDL().GetLease() * 1)
ticker := time.NewTicker(dom.GetSchemaLease() * 1)
defer ticker.Stop()
ver := getDDLSchemaVer(t, dom.DDL())
LOOP:
Expand Down
6 changes: 0 additions & 6 deletions pkg/ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl"
Expand Down Expand Up @@ -490,11 +489,6 @@ func (d *Checker) Start(ctxPool *pools.ResourcePool) error {
return d.realDDL.Start(ctxPool)
}

// GetLease implements the DDL interface.
func (d *Checker) GetLease() time.Duration {
return d.realDDL.GetLease()
}

// Stats implements the DDL interface.
func (d *Checker) Stats(vars *variable.SessionVars) (map[string]any, error) {
return d.realDDL.Stats(vars)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/tests/indexmerge/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestAddIndexMergeProcess(t *testing.T) {
}

func TestAddPrimaryKeyMergeProcess(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 0)
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, time.Second)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
Expand Down
1 change: 0 additions & 1 deletion pkg/ddl/tests/tiflash/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func createTiFlashContext(t *testing.T) (*tiflashContext, func()) {
)

require.NoError(t, err)
session.SetSchemaLease(0)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
infosync.SetMockTiFlash(s.tiflash)
Expand Down
Loading

0 comments on commit 3708722

Please sign in to comment.