From 408a46654d6e189f754c0d9052b5976bc0b3aca5 Mon Sep 17 00:00:00 2001
From: Hangjie Mo
Date: Mon, 9 Jan 2023 15:50:22 +0800
Subject: [PATCH 1/3] session: fix deadlock when init domain failed (#40409)

close pingcap/tidb#40408
---
 domain/domain.go      |  8 ++++++--
 domain/domain_test.go |  6 +++---
 session/tidb.go       | 12 ++++++------
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/domain/domain.go b/domain/domain.go
index b3ed976bf7e03..d01b900cdf444 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -898,7 +898,7 @@ func (do *Domain) Close() {
 const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout
 
 // NewDomain creates a new domain. Should not create multiple domains for the same store.
-func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory, onClose func()) *Domain {
+func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory) *Domain {
 	capacity := 200 // capacity of the sysSessionPool size
 	do := &Domain{
 		store: store,
@@ -909,7 +909,6 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory, onClose func()) *Domain {
 		slowQuery:           newTopNSlowQueries(30, time.Hour*24*7, 500),
 		indexUsageSyncLease: idxUsageSyncLease,
 		dumpFileGcChecker:   &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
-		onClose:             onClose,
 		expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
 		mdlCheckTableInfo: &mdlCheckTableInfo{
 			mu: sync.Mutex{},
@@ -1082,6 +1081,11 @@ func (do *Domain) Init(
 	return nil
 }
 
+// SetOnClose sets do.onClose, the callback invoked when the domain closes.
+func (do *Domain) SetOnClose(onClose func()) {
+	do.onClose = onClose
+}
+
 func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
 	cfg := config.GetGlobalConfig()
 	if pdClient == nil || do.etcdClient == nil {
diff --git a/domain/domain_test.go b/domain/domain_test.go
index c117ac2244b2e..bd9287fe730ec 100644
--- a/domain/domain_test.go
+++ b/domain/domain_test.go
@@ -68,7 +68,7 @@ func TestInfo(t *testing.T) {
 		Storage: s,
 		pdAddrs: []string{cluster.Members[0].GRPCURL()}}
 	ddlLease := 80 * time.Millisecond
-	dom := NewDomain(mockStore, ddlLease, 0, 0, 0, mockFactory, nil)
+	dom := NewDomain(mockStore, ddlLease, 0, 0, 0, mockFactory)
 	defer func() {
 		dom.Close()
 		err := s.Close()
@@ -171,7 +171,7 @@ func TestStatWorkRecoverFromPanic(t *testing.T) {
 	require.NoError(t, err)
 
 	ddlLease := 80 * time.Millisecond
-	dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory, nil)
+	dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory)
 
 	metrics.PanicCounter.Reset()
 	// Since the stats lease is 0 now, creating a new ticker will panic.
@@ -238,7 +238,7 @@ func TestClosestReplicaReadChecker(t *testing.T) {
 	require.NoError(t, err)
 
 	ddlLease := 80 * time.Millisecond
-	dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory, nil)
+	dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory)
 	defer func() {
 		dom.Close()
 		require.Nil(t, store.Close())
diff --git a/session/tidb.go b/session/tidb.go
index fd4411a18518c..113638958ecab 100644
--- a/session/tidb.go
+++ b/session/tidb.go
@@ -42,11 +42,12 @@ import (
 	"github.com/pingcap/tidb/util/dbterror"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/sqlexec"
+	"github.com/pingcap/tidb/util/syncutil"
 	"go.uber.org/zap"
 )
 
 type domainMap struct {
-	mu      sync.Mutex
+	mu      syncutil.Mutex
 	domains map[string]*domain.Domain
 }
 
@@ -81,10 +82,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
 		zap.Stringer("index usage sync lease", idxUsageSyncLease))
 	factory := createSessionFunc(store)
 	sysFactory := createSessionWithDomainFunc(store)
-	onClose := func() {
-		dm.Delete(store)
-	}
-	d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory, onClose)
+	d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory)
 
 	var ddlInjector func(ddl.DDL) *schematracker.Checker
 	if injector, ok := store.(schematracker.StorageDDLInjector); ok {
@@ -102,8 +100,10 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
 	if err != nil {
 		return nil, err
 	}
-
 	dm.domains[key] = d
+	d.SetOnClose(func() {
+		dm.Delete(store)
+	})
 
 	return
 }
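Why the fix above works: domainMap.Get holds dm.mu for the whole build-and-init sequence, and the old code handed dm.Delete (which also takes dm.mu) to NewDomain as the onClose callback before Init ever ran. When Init failed, closing the half-built domain invoked onClose on the same goroutine that still held dm.mu, deadlocking on a non-reentrant mutex. Registering the callback via SetOnClose only after Init succeeds removes that re-entrant path, and the switch to syncutil.Mutex appears intended to let builds with deadlock detection catch such lock cycles. A minimal sketch of the bug's shape follows; registry, item, and initItem are hypothetical stand-ins, not TiDB types:

package main

import (
	"errors"
	"sync"
)

type item struct{ onClose func() }

func (i *item) close() {
	if i.onClose != nil {
		i.onClose()
	}
}

type registry struct {
	mu    sync.Mutex
	items map[string]*item
}

func (r *registry) delete(k string) {
	r.mu.Lock() // blocks forever if the caller already holds r.mu
	defer r.mu.Unlock()
	delete(r.items, k)
}

// get mirrors the buggy shape: the cleanup callback is registered
// before initialization has succeeded.
func (r *registry) get(k string) *item {
	r.mu.Lock()
	defer r.mu.Unlock()
	it := &item{}
	it.onClose = func() { r.delete(k) } // registered too early
	if err := initItem(it); err != nil {
		it.close() // close -> onClose -> delete -> Lock: self-deadlock
		return nil
	}
	r.items[k] = it
	return it
}

// initItem stands in for domain.Init; imagine it failing.
func initItem(*item) error { return errors.New("init failed") }

func main() {
	r := &registry{items: map[string]*item{}}
	r.get("store") // runtime reports: all goroutines are asleep - deadlock!
}

With the patch's ordering, a failed Init closes a domain that has no onClose registered yet, so nothing re-enters the map's mutex.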
+			// If the index is backfilled in the ingest way, we should check the key in the temp index.
+			if v.Meta().State != model.StatePublic && v.Meta().BackfillState != model.BackfillStateInapplicable {
+				_, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key)
+			}
+			colValStr, err1 := formatDataForDupError(colVals)
+			if err1 != nil {
+				return nil, err1
+			}
+			uniqueKeys = append(uniqueKeys, &keyValueWithDupInfo{
+				newKey:       key,
+				dupErr:       kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())),
+				commonHandle: t.Meta().IsCommonHandle,
+			})
 		}
-		uniqueKeys = append(uniqueKeys, &keyValueWithDupInfo{
-			newKey:       key,
-			dupErr:       kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())),
-			commonHandle: t.Meta().IsCommonHandle,
-		})
 	}
 	row = row[:len(row)-extraColumns]
 	result = append(result, toBeCheckedRow{
diff --git a/executor/insert_test.go b/executor/insert_test.go
index b55c3a63765e3..acd8d6736b219 100644
--- a/executor/insert_test.go
+++ b/executor/insert_test.go
@@ -419,6 +419,34 @@ func TestInsertValueForCastDecimalField(t *testing.T) {
 	tk.MustQuery(`select cast(a as decimal) from t1;`).Check(testkit.Rows(`9999999999`))
 }
 
+func TestInsertForMultiValuedIndex(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec(`drop table if exists t1;`)
+	tk.MustExec(`create table t1(a json, b int, unique index idx((cast(a as signed array))));`)
+	tk.MustExec(`insert into t1 values ('[1,11]', 1);`)
+	tk.MustExec(`insert into t1 values ('[2, 22]', 2);`)
+	tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`))
+	tk.MustGetErrMsg(`insert into t1 values ('[2, 222]', 2);`, "[kv:1062]Duplicate entry '2' for key 't1.idx'")
+	tk.MustExec(`replace into t1 values ('[1, 10]', 10)`)
+	tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[2, 22] 2`, `[1, 10] 10`))
+	tk.MustExec(`replace into t1 values ('[1, 2]', 1)`)
+	tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 2] 1`))
+	tk.MustExec(`replace into t1 values ('[1, 11]', 1)`)
+	tk.MustExec(`insert into t1 values ('[2, 22]', 2);`)
+	tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`))
+	tk.MustExec(`insert ignore into t1 values ('[1]', 2);`)
+	tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`))
+	tk.MustExec(`insert ignore into t1 values ('[1, 2]', 2);`)
+	tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`))
+	tk.MustExec(`insert into t1 values ('[2]', 2) on duplicate key update b = 10;`)
+	tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 10`))
+	tk.MustGetErrMsg(`insert into t1 values ('[2, 1]', 2) on duplicate key update a = '[1,2]';`, "[kv:1062]Duplicate entry '[1, 2]' for key 't1.idx'")
+	tk.MustGetErrMsg(`insert into t1 values ('[1,2]', 2) on duplicate key update a = '[1,2]';`, "[kv:1062]Duplicate entry '[1, 2]' for key 't1.idx'")
+	tk.MustGetErrMsg(`insert into t1 values ('[11, 22]', 2) on duplicate key update a = '[1,2]';`, "[kv:1062]Duplicate entry '[1, 2]' for key 't1.idx'")
+}
+
 func TestInsertDateTimeWithTimeZone(t *testing.T) {
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
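The heart of the patch above: a multi-valued index produces one index entry per distinct array element of a row, so duplicate checking can no longer assume a single unique key per index. GenIndexKVIter replaces the one-shot GenIndexKey call so that every generated key gets its own duplicate-key reservation. A rough illustration of that fan-out, with deliberately simplified key encoding (indexKeysForRow is a hypothetical helper, not the real iterator API):

package main

import "fmt"

// indexKeysForRow fans one row's array value out into one unique key
// per distinct element, the way a multi-valued index does.
func indexKeysForRow(indexID int64, elems []int64) []string {
	seen := make(map[int64]bool, len(elems))
	keys := make([]string, 0, len(elems))
	for _, e := range elems {
		if seen[e] { // repeated elements within one row collapse
			continue
		}
		seen[e] = true
		keys = append(keys, fmt.Sprintf("idx/%d/%d", indexID, e))
	}
	return keys
}

func main() {
	// INSERT INTO t1 VALUES ('[1,11]', 1) must reserve two unique keys.
	fmt.Println(indexKeysForRow(1, []int64{1, 11})) // [idx/1/1 idx/1/11]
	// '[2, 222]' later collides with '[2, 22]' on the shared element 2,
	// which is why the test above expects "Duplicate entry '2'".
	fmt.Println(indexKeysForRow(1, []int64{2, 222})) // [idx/1/2 idx/1/222]
}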
From 199bb382a19dd41caad2bba49703290baec261d2 Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Mon, 9 Jan 2023 16:54:23 +0800
Subject: [PATCH 3/3] br: support reset_tiflash after ebs restoration (#40124)

close pingcap/tidb#40208
---
 br/pkg/restore/client.go                      | 92 +++++++++++++++++++
 br/pkg/restore/tiflashrec/tiflash_recorder.go | 52 ++++++++++-
 .../tiflashrec/tiflash_recorder_test.go       | 29 ++++++
 br/pkg/task/restore_data.go                   |  4 +
 4 files changed, 175 insertions(+), 2 deletions(-)

diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index d7811574915f2..6f91a3b4deffc 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -2646,6 +2646,74 @@ func (rc *Client) SetWithSysTable(withSysTable bool) {
 	rc.withSysTable = withSysTable
 }
 
+// ResetTiFlashReplicas resets and restores the tiflash replica settings of all tables after an EBS restoration.
+func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage) error {
+	dom, err := g.GetDomain(storage)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	info := dom.InfoSchema()
+	allSchema := info.AllSchemas()
+	recorder := tiflashrec.New()
+
+	expectTiFlashStoreCount := uint64(0)
+	needTiFlash := false
+	for _, s := range allSchema {
+		for _, t := range s.Tables {
+			if t.TiFlashReplica != nil {
+				expectTiFlashStoreCount = mathutil.Max(expectTiFlashStoreCount, t.TiFlashReplica.Count)
+				recorder.AddTable(t.ID, *t.TiFlashReplica)
+				needTiFlash = true
+			}
+		}
+	}
+	if !needTiFlash {
+		log.Info("no need to set tiflash replica, since no table enables tiflash replica")
+		return nil
+	}
+	// We wait up to ten minutes for tiflash to start, since tiflash only
+	// starts after the unmark-recovery-mode step has finished.
+	timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
+	defer cancel()
+	err = utils.WithRetry(timeoutCtx, func() error {
+		tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
+		log.Info("get tiflash store count for resetting TiFlash Replica",
+			zap.Uint64("count", tiFlashStoreCount))
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if tiFlashStoreCount < expectTiFlashStoreCount {
+			log.Info("still waiting for enough tiflash stores to start",
+				zap.Uint64("expect", expectTiFlashStoreCount),
+				zap.Uint64("actual", tiFlashStoreCount),
+			)
+			return errors.New("tiflash store count is less than expected")
+		}
+		return nil
+	}, &waitTiFlashBackoffer{
+		Attempts:    30,
+		BaseBackoff: 4 * time.Second,
+	})
+	if err != nil {
+		return err
+	}
+
+	sqls := recorder.GenerateResetAlterTableDDLs(info)
+	log.Info("Generating SQLs for resetting tiflash replica",
+		zap.Strings("sqls", sqls))
+
+	return g.UseOneShotSession(storage, false, func(se glue.Session) error {
+		for _, sql := range sqls {
+			if errExec := se.ExecuteInternal(ctx, sql); errExec != nil {
+				logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql to restore it manually.",
+					logutil.ShortError(errExec),
+					zap.String("sql", sql),
+				)
+			}
+		}
+		return nil
+	})
+}
+
 // MockClient creates a fake client used for testing.
 func MockClient(dbs map[string]*utils.Database) *Client {
 	return &Client{databases: dbs}
 }
 
@@ -2721,3 +2789,27 @@ func CheckNewCollationEnable(
 	log.Info("set new_collation_enabled", zap.Bool("new_collation_enabled", enabled))
 	return nil
 }
+
+type waitTiFlashBackoffer struct {
+	Attempts    int
+	BaseBackoff time.Duration
+}
+
+// NextBackoff returns the duration to wait before retrying again.
+func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
+	bo := b.BaseBackoff
+	b.Attempts--
+	if b.Attempts == 0 {
+		return 0
+	}
+	b.BaseBackoff *= 2
+	if b.BaseBackoff > 32*time.Second {
+		b.BaseBackoff = 32 * time.Second
+	}
+	return bo
+}
+
+// Attempt returns the remaining attempt count.
+func (b *waitTiFlashBackoffer) Attempt() int {
+	return b.Attempts
+}
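A note on the recorder changes that follow: GenerateResetAlterTableDDLs emits two statements per recorded table, SET TIFLASH REPLICA 0 followed by the recorded setting. As the comment in the hunk explains, the restored table metadata already carries the replica info, so altering straight to the target count would error out; dropping to zero first forces the replica to be rebuilt. A sketch of the generated pair (resetDDLs is a hypothetical helper, not the recorder's API; the strings match TestGenResetSql below):

package main

import "fmt"

// resetDDLs builds the reset-then-restore statement pair for one table.
func resetDDLs(db, tbl string, count uint64, labels string) []string {
	restore := fmt.Sprintf("ALTER TABLE `%s`.`%s` SET TIFLASH REPLICA %d", db, tbl, count)
	if labels != "" {
		restore += fmt.Sprintf(" LOCATION LABELS '%s'", labels)
	}
	return []string{
		fmt.Sprintf("ALTER TABLE `%s`.`%s` SET TIFLASH REPLICA 0", db, tbl),
		restore,
	}
}

func main() {
	for _, ddl := range resetDDLs("test", "whisper", 2, "climate") {
		fmt.Println(ddl)
	}
	// ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 0
	// ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 2 LOCATION LABELS 'climate'
}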
diff --git a/br/pkg/restore/tiflashrec/tiflash_recorder.go b/br/pkg/restore/tiflashrec/tiflash_recorder.go
index 31dde982a7b69..84707f05e1f1b 100644
--- a/br/pkg/restore/tiflashrec/tiflash_recorder.go
+++ b/br/pkg/restore/tiflashrec/tiflash_recorder.go
@@ -79,6 +79,46 @@ func (r *TiFlashRecorder) Rewrite(oldID int64, newID int64) {
 	}
 }
 
+// GenerateResetAlterTableDDLs generates the DDLs to reset and then restore each recorded table's tiflash replica.
+func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema) []string {
+	items := make([]string, 0, len(r.items))
+	r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
+		table, ok := info.TableByID(id)
+		if !ok {
+			log.Warn("Table does not exist, skipping", zap.Int64("id", id))
+			return
+		}
+		schema, ok := info.SchemaByTable(table.Meta())
+		if !ok {
+			log.Warn("Schema does not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
+			return
+		}
+		// Currently we don't back up the tiflash cluster volume during the volume
+		// snapshot backup, but the table still carries its replica info after the
+		// volume restoration. We should reset the count to 0 and then set it back;
+		// otherwise, altering the tiflash replica returns an error.
+		altTableSpec, err := alterTableSpecOf(replica, true)
+		if err != nil {
+			log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
+			return
+		}
+		items = append(items, fmt.Sprintf(
+			"ALTER TABLE %s %s",
+			utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O),
+			altTableSpec),
+		)
+		altTableSpec, err = alterTableSpecOf(replica, false)
+		if err != nil {
+			log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
+			return
+		}
+		items = append(items, fmt.Sprintf(
+			"ALTER TABLE %s %s",
+			utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O),
+			altTableSpec),
+		)
+	})
+	return items
+}
+
 func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
 	items := make([]string, 0, len(r.items))
 	r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
@@ -92,7 +132,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
 		log.Warn("Schema does not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
 		return
 	}
-	altTableSpec, err := alterTableSpecOf(replica)
+	altTableSpec, err := alterTableSpecOf(replica, false)
 	if err != nil {
 		log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
 		return
@@ -106,7 +146,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
 	return items
 }
 
-func alterTableSpecOf(replica model.TiFlashReplicaInfo) (string, error) {
+func alterTableSpecOf(replica model.TiFlashReplicaInfo, reset bool) (string, error) {
 	spec := &ast.AlterTableSpec{
 		Tp: ast.AlterTableSetTiFlashReplica,
 		TiFlashReplica: &ast.TiFlashReplicaSpec{
@@ -114,6 +154,14 @@ func alterTableSpecOf(replica model.TiFlashReplicaInfo) (string, error) {
 			Count:  replica.Count,
 			Labels: replica.LocationLabels,
 		},
 	}
+	if reset {
+		spec = &ast.AlterTableSpec{
+			Tp: ast.AlterTableSetTiFlashReplica,
+			TiFlashReplica: &ast.TiFlashReplicaSpec{
+				Count: 0,
+			},
+		}
+	}
 
 	buf := bytes.NewBuffer(make([]byte, 0, 32))
 	restoreCx := format.NewRestoreCtx(
diff --git a/br/pkg/restore/tiflashrec/tiflash_recorder_test.go b/br/pkg/restore/tiflashrec/tiflash_recorder_test.go
index b01272caeddc5..f7316a1ed3133 100644
--- a/br/pkg/restore/tiflashrec/tiflash_recorder_test.go
+++ b/br/pkg/restore/tiflashrec/tiflash_recorder_test.go
@@ -170,3 +170,32 @@ func TestGenSql(t *testing.T) {
 		"ALTER TABLE `test`.`evils` SET TIFLASH REPLICA 1 LOCATION LABELS 'kIll''; OR DROP DATABASE test --', 'dEaTh with " + `\\"quoting\\"` + "'",
 	})
 }
+
+func TestGenResetSql(t *testing.T) {
+	tInfo := func(id int, name string) *model.TableInfo {
+		return &model.TableInfo{
+			ID:   int64(id),
+			Name: model.NewCIStr(name),
+		}
+	}
+	fakeInfo := infoschema.MockInfoSchema([]*model.TableInfo{
+		tInfo(1, "fruits"),
+		tInfo(2, "whisper"),
+	})
+	rec := tiflashrec.New()
+	rec.AddTable(1, model.TiFlashReplicaInfo{
+		Count: 1,
+	})
+	rec.AddTable(2, model.TiFlashReplicaInfo{
+		Count:          2,
+		LocationLabels: []string{"climate"},
+	})
+
+	sqls := rec.GenerateResetAlterTableDDLs(fakeInfo)
+	require.ElementsMatch(t, sqls, []string{
+		"ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 0",
+		"ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 2 LOCATION LABELS 'climate'",
+		"ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 0",
+		"ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 1",
+	})
+}
diff --git a/br/pkg/task/restore_data.go b/br/pkg/task/restore_data.go
index f8e286dd0e72b..fc82d011abb0d 100644
--- a/br/pkg/task/restore_data.go
+++ b/br/pkg/task/restore_data.go
@@ -154,6 +154,10 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
 	//TODO: restore volume type into origin type
 	//ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) by backupmeta
+	// Since we cannot reset tiflash automatically, we reset it manually here.
+	if err = client.ResetTiFlashReplicas(ctx, g, mgr.GetStorage()); err != nil {
+		return errors.Trace(err)
+	}
 	progress.Close()
 	summary.CollectDuration("restore duration", time.Since(startAll))
 	summary.SetSuccessStatus(true)
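A closing observation on the retry budget in patch 3: waitTiFlashBackoffer starts at 4s, doubles each attempt, caps at 32s, and allows 30 attempts, with the final attempt returning a zero backoff. Summing that schedule gives roughly 14m20s of waiting, which exceeds the 10-minute context timeout wrapped around utils.WithRetry, so in practice the deadline rather than attempt exhaustion bounds the wait (assuming WithRetry checks the context between attempts, which is not shown in this diff). A quick check of the arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirror waitTiFlashBackoffer: 4s base, doubled per call, capped at 32s,
	// 30 attempts where the last call returns a zero backoff.
	base := 4 * time.Second
	total := time.Duration(0)
	for attempts := 30; attempts > 1; attempts-- {
		total += base
		base *= 2
		if base > 32*time.Second {
			base = 32 * time.Second
		}
	}
	fmt.Println(total) // 14m20s, beyond the 10-minute WithTimeout budget
}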