Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl/domain: disallow set schema lease to 0 #55312

Merged
merged 6 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion br/pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func NewCluster() (*Cluster, error) {
}
cluster.Storage = storage

session.SetSchemaLease(0)
session.DisableStats4Test()
dom, err := session.BootstrapSession(storage)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions cmd/tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,8 +749,15 @@ func setGlobalVars() {

util.SetGOGC(cfg.Performance.GOGC)

ddlLeaseDuration := parseDuration(cfg.Lease)
session.SetSchemaLease(ddlLeaseDuration)
schemaLeaseDuration := parseDuration(cfg.Lease)
if schemaLeaseDuration <= 0 {
// previous version allow set schema lease to 0, and mainly used on
// uni-store and for test, to be compatible we set it to default value here.
log.Warn("schema lease is invalid, use default value",
zap.String("lease", schemaLeaseDuration.String()))
schemaLeaseDuration = config.DefSchemaLease
}
session.SetSchemaLease(schemaLeaseDuration)
statsLeaseDuration := parseDuration(cfg.Performance.StatsLease)
session.SetStatsLease(statsLeaseDuration)
planReplayerGCLease := parseDuration(cfg.Performance.PlanReplayerGCLease)
Expand Down
5 changes: 3 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ const (
// EnvVarKeyspaceName is the system env name for keyspace name.
EnvVarKeyspaceName = "KEYSPACE_NAME"
// MaxTokenLimit is the max token limit value.
MaxTokenLimit = 1024 * 1024
MaxTokenLimit = 1024 * 1024
DefSchemaLease = 45 * time.Second
)

// Valid config maps
Expand Down Expand Up @@ -912,7 +913,7 @@ var defaultConf = Config{
Path: "/tmp/tidb",
RunDDL: true,
SplitTable: true,
Lease: "45s",
Lease: DefSchemaLease.String(),
TokenLimit: 1000,
OOMUseTmpStorage: true,
TempDir: DefTempDir,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func TestSnapshotVersion(t *testing.T) {

dd := dom.DDL()
ddl.DisableTiFlashPoll(dd)
require.Equal(t, dbTestLease, dd.GetLease())
require.Equal(t, dbTestLease, dom.GetSchemaLease())

snapTS := oracle.GoTimeToTS(time.Now())
tk.MustExec("create database test2")
Expand Down Expand Up @@ -673,7 +673,7 @@ func TestSchemaValidator(t *testing.T) {

dd := dom.DDL()
ddl.DisableTiFlashPoll(dd)
require.Equal(t, dbTestLease, dd.GetLease())
require.Equal(t, dbTestLease, dom.GetSchemaLease())

tk.MustExec("create table test.t(a int)")

Expand Down
32 changes: 13 additions & 19 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ type DDL interface {
// Start campaigns the owner and starts workers.
// ctxPool is used for the worker's delRangeManager and creates sessions.
Start(ctxPool *pools.ResourcePool) error
// GetLease returns current schema lease time.
GetLease() time.Duration
// Stats returns the DDL statistics.
Stats(vars *variable.SessionVars) (map[string]any, error)
// GetScope gets the status variables scope.
Expand Down Expand Up @@ -573,14 +571,6 @@ func (d *ddl) RegisterStatsHandle(h *handle.Handle) {
// give up notify and log it.
func asyncNotifyEvent(d *ddlCtx, e *statsutil.DDLEvent) {
if d.ddlEventCh != nil {
if d.lease == 0 {
// If lease is 0, it's always used in test.
select {
case d.ddlEventCh <- e:
default:
}
return
}
for i := 0; i < 10; i++ {
select {
case d.ddlEventCh <- e:
Expand Down Expand Up @@ -853,12 +843,6 @@ func (d *ddl) close() {
logutil.DDLLogger().Info("DDL closed", zap.String("ID", d.uuid), zap.Duration("take time", time.Since(startTime)))
}

// GetLease implements DDL.GetLease interface.
func (d *ddl) GetLease() time.Duration {
lease := d.lease
return lease
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() syncer.SchemaSyncer {
return d.schemaSyncer
Expand Down Expand Up @@ -1021,9 +1005,19 @@ type RecoverSchemaInfo struct {
// This provides a safe window for async commit and 1PC to commit with an old schema.
func delayForAsyncCommit() {
if variable.EnableMDL.Load() {
// If metadata lock is enabled. The transaction of DDL must begin after prewrite of the async commit transaction,
// then the commit ts of DDL must be greater than the async commit transaction. In this case, the corresponding schema of the async commit transaction
// is correct. But if metadata lock is disabled, we can't ensure that the corresponding schema of the async commit transaction isn't change.
// If metadata lock is enabled. The transaction of DDL must begin after
// pre-write of the async commit transaction, then the commit ts of DDL
// must be greater than the async commit transaction. In this case, the
// corresponding schema of the async commit transaction is correct.
// suppose we're adding index:
// - schema state -> StateWriteOnly with version V
// - some txn T started using async commit and version V,
// and T do pre-write before or after V+1
// - schema state -> StateWriteReorganization with version V+1
// - T commit finish, with TS
// - 'wait schema synced' finish
// - schema state -> Done with version V+2, commit-ts of this
// transaction must > TS, so it's safe for T to commit.
return
}
cfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ func runReorgJobAndHandleErr(
if err != nil {
return false, ver, errors.Trace(err)
}
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onCreateIndex",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, allIndexInfos[0].Name)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/job_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestCheckOwner(t *testing.T) {

time.Sleep(testLease)
require.Equal(t, dom.DDL().OwnerManager().IsOwner(), true)
require.Equal(t, dom.DDL().GetLease(), testLease)
require.Equal(t, dom.GetSchemaLease(), testLease)
}

func TestInvalidDDLJob(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/ddl/modify_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,8 @@ func (w *worker) doModifyColumn(
if !mysql.HasNotNullFlag(oldCol.GetFlag()) && mysql.HasNotNullFlag(newCol.GetFlag()) {
noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.GetFlag())

// lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly.
// We need to check after the flag is set
if d.lease > 0 && !noPreventNullFlag {
if !noPreventNullFlag {
delayForAsyncCommit()
}

Expand Down Expand Up @@ -588,7 +587,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
// enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData".
// disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData"
failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onModifyColumn",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name)
Expand Down
10 changes: 4 additions & 6 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2233,7 +2233,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
// and then run the reorg next time.
return ver, errors.Trace(err)
}
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (dropIndexErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (dropIndexErr error) {
defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition",
func() {
dropIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("drop partition panic")
Expand Down Expand Up @@ -2431,7 +2431,7 @@ func (w *worker) onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
// and then run the reorg next time.
return ver, errors.Trace(err)
}
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (dropIndexErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (dropIndexErr error) {
defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition",
func() {
dropIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("drop partition panic")
Expand Down Expand Up @@ -2702,9 +2702,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
// partition to be exchange with.
// So we need to rollback that change, instead of just cancelling.

if d.lease > 0 {
delayForAsyncCommit()
}
delayForAsyncCommit()

if defID != partDef.ID {
// Should never happen, should have been updated above, in previous state!
Expand Down Expand Up @@ -3451,7 +3449,7 @@ func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tb
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, partTbl, physTblIDs, elements)
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (reorgErr error) {
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (reorgErr error) {
defer tidbutil.Recover(metrics.LabelDDL, "doPartitionReorgWork",
func() {
reorgErr = dbterror.ErrCancelledDDLJob.GenWithStack("reorganize partition for table `%v` panic", tbl.Meta().Name)
Expand Down
23 changes: 4 additions & 19 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ func newReorgSessCtx(store kv.Storage) sessionctx.Context {
return c
}

const defaultWaitReorgTimeout = 10 * time.Second

// ReorgWaitTimeout is the timeout that wait ddl in write reorganization stage.
// make it a var for testing.
var ReorgWaitTimeout = 5 * time.Second

func (rc *reorgCtx) notifyJobState(state model.JobState) {
Expand Down Expand Up @@ -226,7 +225,6 @@ func (rc *reorgCtx) getRowCount() int64 {
func (w *worker) runReorgJob(
reorgInfo *reorgInfo,
tblInfo *model.TableInfo,
lease time.Duration,
reorgFn func() error,
) error {
job := reorgInfo.Job
Expand Down Expand Up @@ -268,16 +266,7 @@ func (w *worker) runReorgJob(
}()
}

waitTimeout := defaultWaitReorgTimeout
// if lease is 0, we are using a local storage,
// and we can wait the reorganization to be done here.
// if lease > 0, we don't need to wait here because
// we should update some job's progress context and try checking again,
// so we use a very little timeout here.
if lease > 0 {
waitTimeout = ReorgWaitTimeout
}

waitTimeout := ReorgWaitTimeout
// wait reorganization job done or timeout
select {
case res := <-rc.doneCh:
Expand Down Expand Up @@ -744,9 +733,7 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
})

info.first = true
if d.lease > 0 { // Only delay when it's not in test.
delayForAsyncCommit()
}
delayForAsyncCommit()
ver, err := getValidCurrentVersion(d.store)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -837,9 +824,7 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo
)
if job.SnapshotVer == 0 {
info.first = true
if d.lease > 0 { // Only delay when it's not in test.
delayForAsyncCommit()
}
delayForAsyncCommit()
ver, err := getValidCurrentVersion(d.store)
if err != nil {
return nil, errors.Trace(err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func restartWorkers(t *testing.T, store kv.Storage, d *domain.Domain) {
newDDL, newDDLExecutor := ddl.NewDDL(context.Background(),
ddl.WithStore(d.Store()),
ddl.WithInfoCache(d.InfoCache()),
ddl.WithLease(d.DDL().GetLease()),
ddl.WithLease(d.GetSchemaLease()),
ddl.WithSchemaLoader(d),
)
d.SetDDL(newDDL, newDDLExecutor)
Expand Down Expand Up @@ -92,7 +92,7 @@ func testRunInterruptedJob(t *testing.T, store kv.Storage, d *domain.Domain, job
done := make(chan error, 1)
go runInterruptedJob(t, store, d.DDLExecutor(), job, done)

ticker := time.NewTicker(d.DDL().GetLease())
ticker := time.NewTicker(d.GetSchemaLease())
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestStat(t *testing.T) {
done := make(chan error, 1)
go runInterruptedJob(t, store, dom.DDLExecutor(), job, done)

ticker := time.NewTicker(dom.DDL().GetLease() * 1)
ticker := time.NewTicker(dom.GetSchemaLease() * 1)
defer ticker.Stop()
ver := getDDLSchemaVer(t, dom.DDL())
LOOP:
Expand Down
6 changes: 0 additions & 6 deletions pkg/ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl"
Expand Down Expand Up @@ -490,11 +489,6 @@ func (d *Checker) Start(ctxPool *pools.ResourcePool) error {
return d.realDDL.Start(ctxPool)
}

// GetLease implements the DDL interface.
func (d *Checker) GetLease() time.Duration {
return d.realDDL.GetLease()
}

// Stats implements the DDL interface.
func (d *Checker) Stats(vars *variable.SessionVars) (map[string]any, error) {
return d.realDDL.Stats(vars)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/tests/indexmerge/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestAddIndexMergeProcess(t *testing.T) {
}

func TestAddPrimaryKeyMergeProcess(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 0)
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, time.Second)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
Expand Down
1 change: 0 additions & 1 deletion pkg/ddl/tests/tiflash/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func createTiFlashContext(t *testing.T) (*tiflashContext, func()) {
)

require.NoError(t, err)
session.SetSchemaLease(0)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
infosync.SetMockTiFlash(s.tiflash)
Expand Down
Loading