From be8caa654965df2c3a8139f63bfe5cb264fd51af Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?=
Date: Tue, 3 Jan 2023 15:50:19 +0800
Subject: [PATCH] ttl: disable ttl job when recover/flashback
 table/database/cluster (#40268)

close pingcap/tidb#40265
---
 ddl/cluster.go      | 38 ++++++++++++++++++++++++---
 ddl/cluster_test.go | 24 +++++++++++++++--
 ddl/ddl_api.go      |  3 ++-
 ddl/schema.go       |  4 +++
 ddl/serial_test.go  | 64 +++++++++++++++++++++++++++++++++++++++++++++
 ddl/table.go        |  5 ++++
 6 files changed, 131 insertions(+), 7 deletions(-)

diff --git a/ddl/cluster.go b/ddl/cluster.go
index 227963b3951d5..cd0053f9e7e4f 100644
--- a/ddl/cluster.go
+++ b/ddl/cluster.go
@@ -68,6 +68,7 @@ const (
     totalLockedRegionsOffset
     startTSOffset
     commitTSOffset
+    ttlJobEnableOffSet
 )
 
 func closePDSchedule() error {
@@ -124,6 +125,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
     return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
 }
 
+func getTiDBTTLJobEnable(sess sessionctx.Context) (string, error) {
+    val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBTTLJobEnable)
+    if err != nil {
+        return "", errors.Trace(err)
+    }
+    return val, nil
+}
+
+func setTiDBTTLJobEnable(ctx context.Context, sess sessionctx.Context, value string) error {
+    return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBTTLJobEnable, value)
+}
+
 func setTiDBEnableAutoAnalyze(ctx context.Context, sess sessionctx.Context, value string) error {
     return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBEnableAutoAnalyze, value)
 }
@@ -176,6 +189,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
     if err = setTiDBSuperReadOnly(d.ctx, sess, variable.On); err != nil {
         return err
     }
+    if err = setTiDBTTLJobEnable(d.ctx, sess, variable.Off); err != nil {
+        return err
+    }
 
     nowSchemaVersion, err := t.GetSchemaVersion()
     if err != nil {
@@ -553,9 +569,9 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
     var flashbackTS, lockedRegions, startTS, commitTS uint64
     var pdScheduleValue map[string]interface{}
-    var autoAnalyzeValue, readOnlyValue string
+    var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
     var gcEnabledValue bool
-    if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
+    if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
         job.State = model.JobStateCancelled
         return ver, errors.Trace(err)
     }
@@ -595,6 +611,12 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
             return ver, errors.Trace(err)
         }
         job.Args[readOnlyOffset] = &readOnlyValue
+        ttlJobEnableValue, err = getTiDBTTLJobEnable(sess)
+        if err != nil {
+            job.State = model.JobStateCancelled
+            return ver, errors.Trace(err)
+        }
+        job.Args[ttlJobEnableOffSet] = &ttlJobEnableValue
         job.SchemaState = model.StateDeleteOnly
         return ver, nil
     // Stage 2, check flashbackTS, close GC and PD schedule.
@@ -694,10 +716,10 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
     var flashbackTS, lockedRegions, startTS, commitTS uint64
     var pdScheduleValue map[string]interface{}
-    var autoAnalyzeValue, readOnlyValue string
+    var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
     var gcEnabled bool
 
-    if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
+    if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
         return errors.Trace(err)
     }
     sess, err := w.sessPool.get()
@@ -718,6 +740,14 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
         if err = setTiDBSuperReadOnly(w.ctx, sess, readOnlyValue); err != nil {
             return err
         }
+
+        if job.IsCancelled() {
+            // only restore `tidb_ttl_job_enable` when flashback failed
+            if err = setTiDBTTLJobEnable(w.ctx, sess, ttlJobEnableValue); err != nil {
+                return err
+            }
+        }
+
         return setTiDBEnableAutoAnalyze(w.ctx, sess, autoAnalyzeValue)
     })
     if err != nil {
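The cluster.go changes follow the pattern already established for `tidb_super_read_only` and `tidb_enable_auto_analyze`: Stage 1 snapshots the current value of `tidb_ttl_job_enable` into the job args, the flashback runs with the variable forced to OFF, and finishFlashbackCluster writes the snapshot back only when the job was cancelled. A minimal standalone sketch of that save/disable/restore flow (the map and helper below are illustrative stand-ins, not TiDB's API):

package main

import "fmt"

// globals stands in for the global system-variable store; in TiDB that
// role is played by GlobalVarsAccessor. All names here are hypothetical.
var globals = map[string]string{"tidb_ttl_job_enable": "ON"}

// runWithTTLPaused snapshots the switch, forces it OFF, and restores the
// snapshot only when fn fails -- mirroring the job.IsCancelled() branch
// in finishFlashbackCluster above.
func runWithTTLPaused(fn func() error) error {
    saved := globals["tidb_ttl_job_enable"] // Stage 1: snapshot into job args
    globals["tidb_ttl_job_enable"] = "OFF"  // no TTL deletions during flashback
    if err := fn(); err != nil {
        globals["tidb_ttl_job_enable"] = saved // restore only on failure
        return err
    }
    return nil // on success the switch intentionally stays OFF
}

func main() {
    _ = runWithTTLPaused(func() error { return nil })
    fmt.Println(globals["tidb_ttl_job_enable"]) // prints OFF
}

Keeping the saved value in job.Args rather than in worker memory presumably also lets the restore work when a different DDL owner ends up running finishFlashbackCluster.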
diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go
index 12c77c42edafe..8bb1776d6d08f 100644
--- a/ddl/cluster_test.go
+++ b/ddl/cluster_test.go
@@ -209,12 +209,16 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
             rs, err = tk.Exec("show variables like 'tidb_super_read_only'")
             assert.NoError(t, err)
             assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)
+            rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
+            assert.NoError(t, err)
+            assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
         }
     }
     dom.DDL().SetHook(hook)
-    // first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off
+    // first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off and `tidb_ttl_job_enable` = on
     tk.MustExec("set global tidb_gc_enable = on")
     tk.MustExec("set global tidb_super_read_only = off")
+    tk.MustExec("set global tidb_ttl_job_enable = on")
 
     tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
 
@@ -224,10 +228,14 @@
     rs, err = tk.Exec("show variables like 'tidb_gc_enable'")
     require.NoError(t, err)
     require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)
+    rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
+    require.NoError(t, err)
+    require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
 
-    // second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on
+    // second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on and `tidb_ttl_job_enable` = off
     tk.MustExec("set global tidb_gc_enable = off")
     tk.MustExec("set global tidb_super_read_only = on")
+    tk.MustExec("set global tidb_ttl_job_enable = off")
 
     ts, err = tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
     require.NoError(t, err)
@@ -238,6 +246,9 @@
     rs, err = tk.Exec("show variables like 'tidb_gc_enable'")
     require.NoError(t, err)
     require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
+    rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
+    assert.NoError(t, err)
+    assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
 
     dom.DDL().SetHook(originHook)
     require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
@@ -268,9 +279,14 @@ func TestCancelFlashbackCluster(t *testing.T) {
         return job.SchemaState == model.StateDeleteOnly
     })
     dom.DDL().SetHook(hook)
+    tk.MustExec("set global tidb_ttl_job_enable = on")
     tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
     hook.MustCancelDone(t)
 
+    rs, err := tk.Exec("show variables like 'tidb_ttl_job_enable'")
+    assert.NoError(t, err)
+    assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)
+
     // Try canceled on StateWriteReorganization, cancel failed
     hook = newCancelJobHook(t, store, dom, func(job *model.Job) bool {
         return job.SchemaState == model.StateWriteReorganization
     })
@@ -279,6 +295,10 @@ func TestCancelFlashbackCluster(t *testing.T) {
     tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
     hook.MustCancelFailed(t)
 
+    rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
+    assert.NoError(t, err)
+    assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
+
     dom.DDL().SetHook(originHook)
     require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
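Taken together, the two tests pin down the variable's lifecycle: `tidb_ttl_job_enable` is OFF inside the hook while the flashback job runs; it deliberately stays OFF after a successful flashback (only the job.IsCancelled() branch restores it); a job cancelled early at StateDeleteOnly gets its saved value (ON) written back; and once the job reaches StateWriteReorganization, cancellation fails and the variable remains OFF.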
diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go
index 7cd65b47b170a..2a20ea69f3d39 100644
--- a/ddl/ddl_api.go
+++ b/ddl/ddl_api.go
@@ -2750,7 +2750,8 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
             variable.Off, /* tidb_super_read_only */
             0, /* totalRegions */
             0, /* startTS */
-            0 /* commitTS */},
+            0, /* commitTS */
+            variable.On /* tidb_ttl_job_enable */},
     }
     err = d.DoDDLJob(ctx, job)
     err = d.callHookOnChanged(job, err)
diff --git a/ddl/schema.go b/ddl/schema.go
index d9e86c30c5eaf..e9cb1e6579635 100644
--- a/ddl/schema.go
+++ b/ddl/schema.go
@@ -312,6 +312,10 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i
         return ver, errors.Trace(err)
     }
     for _, recoverInfo := range recoverSchemaInfo.RecoverTabsInfo {
+        if recoverInfo.TableInfo.TTLInfo != nil {
+            // force disable TTL job schedule for recovered table
+            recoverInfo.TableInfo.TTLInfo.Enable = false
+        }
         ver, err = w.recoverTable(t, job, recoverInfo)
         if err != nil {
             return ver, errors.Trace(err)
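Both recover paths apply the same per-table guard: ddl/schema.go above handles RECOVER/FLASHBACK DATABASE by looping over every recovered table, and ddl/table.go below handles single-table recover/flashback. A minimal sketch of the guard with simplified stand-in types (the real TTLInfo and TableInfo live in parser/model and carry many more fields):

package main

import "fmt"

// TTLInfo is a simplified stand-in; Enable is what SHOW CREATE TABLE
// renders as TTL_ENABLE='ON'/'OFF'.
type TTLInfo struct {
    Enable bool
}

// TableInfo is likewise reduced to the fields the guard touches.
type TableInfo struct {
    Name    string
    TTLInfo *TTLInfo // nil when the table has no TTL clause
}

// disableTTLOnRecover mirrors the check added to onRecoverTable and
// onRecoverSchema: a recovered table comes back with TTL scheduling off.
func disableTTLOnRecover(tbl *TableInfo) {
    if tbl.TTLInfo != nil {
        tbl.TTLInfo.Enable = false
    }
}

func main() {
    tbl := &TableInfo{Name: "t_recover1", TTLInfo: &TTLInfo{Enable: true}}
    disableTTLOnRecover(tbl)
    fmt.Println(tbl.TTLInfo.Enable) // false -> TTL_ENABLE='OFF'
}

Flipping the flag in the table metadata, rather than touching the global `tidb_ttl_job_enable`, scopes the change to the recovered tables only; the new test below verifies that it surfaces as TTL_ENABLE='OFF' in SHOW CREATE TABLE.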
diff --git a/ddl/serial_test.go b/ddl/serial_test.go
index e3456124871e8..315cfca73e57c 100644
--- a/ddl/serial_test.go
+++ b/ddl/serial_test.go
@@ -462,6 +462,70 @@ func TestCancelAddIndexPanic(t *testing.T) {
     require.Truef(t, strings.HasPrefix(errMsg, "[ddl:8214]Cancelled DDL job"), "%v", errMsg)
 }
 
+func TestRecoverTableWithTTL(t *testing.T) {
+    store, _ := createMockStoreAndDomain(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("create database if not exists test_recover")
+    tk.MustExec("use test_recover")
+    defer func(originGC bool) {
+        if originGC {
+            util.EmulatorGCEnable()
+        } else {
+            util.EmulatorGCDisable()
+        }
+    }(util.IsEmulatorGCEnable())
+
+    // disable emulator GC.
+    // Otherwise, emulator GC will delete the table record as soon as the drop table DDL is executed.
+    util.EmulatorGCDisable()
+    gcTimeFormat := "20060102-15:04:05 -0700 MST"
+    safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
+                   ON DUPLICATE KEY
+                   UPDATE variable_value = '%[1]s'`
+    tk.MustExec(fmt.Sprintf(safePointSQL, time.Now().Add(-time.Hour).Format(gcTimeFormat)))
+    getDDLJobID := func(table, tp string) int64 {
+        rs, err := tk.Exec("admin show ddl jobs")
+        require.NoError(t, err)
+        rows, err := session.GetRows4Test(context.Background(), tk.Session(), rs)
+        require.NoError(t, err)
+        for _, row := range rows {
+            if row.GetString(2) == table && row.GetString(3) == tp {
+                return row.GetInt64(0)
+            }
+        }
+        require.FailNowf(t, "can't find %s table of %s", tp, table)
+        return -1
+    }
+
+    // recover table
+    tk.MustExec("create table t_recover1 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
+    tk.MustExec("drop table t_recover1")
+    tk.MustExec("recover table t_recover1")
+    tk.MustQuery("show create table t_recover1").Check(testkit.Rows("t_recover1 CREATE TABLE `t_recover1` (\n  `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))
+
+    // recover table with job id
+    tk.MustExec("create table t_recover2 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
+    tk.MustExec("drop table t_recover2")
+    jobID := getDDLJobID("t_recover2", "drop table")
+    tk.MustExec(fmt.Sprintf("recover table BY JOB %d", jobID))
+    tk.MustQuery("show create table t_recover2").Check(testkit.Rows("t_recover2 CREATE TABLE `t_recover2` (\n  `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))
+
+    // flashback table
+    tk.MustExec("create table t_recover3 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
+    tk.MustExec("drop table t_recover3")
+    tk.MustExec("flashback table t_recover3")
+    tk.MustQuery("show create table t_recover3").Check(testkit.Rows("t_recover3 CREATE TABLE `t_recover3` (\n  `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))
+
+    // flashback database
+    tk.MustExec("create database if not exists test_recover2")
+    tk.MustExec("create table test_recover2.t1 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
+    tk.MustExec("create table test_recover2.t2 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
+    tk.MustExec("drop database test_recover2")
+    tk.MustExec("flashback database test_recover2")
+    tk.MustQuery("show create table test_recover2.t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n  `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))
+    tk.MustQuery("show create table test_recover2.t2").Check(testkit.Rows("t2 CREATE TABLE `t2` (\n  `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))
+}
+
 func TestRecoverTableByJobID(t *testing.T) {
     store, _ := createMockStoreAndDomain(t)
     tk := testkit.NewTestKit(t, store)
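The expected SHOW CREATE TABLE output makes the user-visible contract explicit: every recovered or flashed-back TTL table comes back with /*T![ttl] TTL_ENABLE='OFF' */, so expired rows are not purged again before the user can inspect them. Re-enabling is an explicit opt-in, e.g. ALTER TABLE t_recover1 TTL_ENABLE='ON' (standard TiDB TTL syntax, not part of this patch).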
diff --git a/ddl/table.go b/ddl/table.go
index 9e6fab762d3c5..a27eeb4df42fa 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -404,6 +404,11 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
     schemaID := recoverInfo.SchemaID
     tblInfo := recoverInfo.TableInfo
 
+    if tblInfo.TTLInfo != nil {
+        // force disable TTL job schedule for recovered table
+        tblInfo.TTLInfo.Enable = false
+    }
+
     // check GC and safe point
     gcEnable, err := checkGCEnable(w)
     if err != nil {
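In sum, the patch switches TTL off in every restore scenario: FLASHBACK CLUSTER disables the global `tidb_ttl_job_enable` (restored only if the job is cancelled), while RECOVER TABLE, FLASHBACK TABLE, and RECOVER/FLASHBACK DATABASE clear TTLInfo.Enable per table, so a TTL job cannot immediately delete rows that were just brought back.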