diff --git a/DEPS.bzl b/DEPS.bzl index 4c3e94cca5a14..1d23df5e1e756 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2898,8 +2898,8 @@ def go_deps(): name = "com_github_pingcap_log", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/log", - sum = "h1:T7e5Low0BU2ZazI2dz2mh3W1qv+w8wtvq1YR8DneA0c=", - version = "v1.1.1-0.20221110065318-21a4942860b3", + sum = "h1:crhkw6DD+07Bg1wYhW5Piw+kYNKZqFQqfC2puUf6gMI=", + version = "v1.1.1-0.20221116035753-734d527bc87c", ) go_repository( name = "com_github_pingcap_sysutil", diff --git a/Dockerfile b/Dockerfile index 8416ef542a3d3..f3dae2519f53a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,16 +13,24 @@ # limitations under the License. # Builder image -FROM alpine:edge as builder +FROM rockylinux:9 as builder -ADD . https://raw.githubusercontent.com/njhallett/apk-fastest-mirror/c4ca44caef3385d830fea34df2dbc2ba4a17e021/apk-fastest-mirror.sh ./proxy -RUN sh ./proxy/apk-fastest-mirror.sh -t 50 && apk add --no-cache git build-base go +ENV GOLANG_VERSION 1.19.3 +ENV ARCH amd64 +ENV GOLANG_DOWNLOAD_URL https://dl.google.com/go/go$GOLANG_VERSION.linux-$ARCH.tar.gz +ENV GOPATH /go +ENV GOROOT /usr/local/go +ENV PATH $GOPATH/bin:$GOROOT/bin:$PATH +RUN yum update -y && yum groupinstall 'Development Tools' -y \ + && curl -fsSL "$GOLANG_DOWNLOAD_URL" -o golang.tar.gz \ + && tar -C /usr/local -xzf golang.tar.gz \ + && rm golang.tar.gz COPY . /tidb ARG GOPROXY RUN export GOPROXY=${GOPROXY} && cd /tidb && make server -FROM alpine:latest +FROM rockylinux:9-minimal COPY --from=builder /tidb/bin/tidb-server /tidb-server diff --git a/autoid_service/autoid.go b/autoid_service/autoid.go index d20c78fd06098..f2836fc80fe85 100644 --- a/autoid_service/autoid.go +++ b/autoid_service/autoid.go @@ -405,12 +405,30 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (* val := s.getAlloc(req.DbID, req.TblID, req.IsUnsigned) - if req.N == 0 && val.base != 0 { - base := val.base + if req.N == 0 { + if val.base != 0 { + return &autoid.AutoIDResponse{ + Min: val.base, + Max: val.base, + }, nil + } + // This item is not initialized, get the data from remote. + var currentEnd int64 + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta) + err := kv.RunInNewTxn(ctx, s.store, true, func(ctx context.Context, txn kv.Transaction) error { + idAcc := meta.NewMeta(txn).GetAutoIDAccessors(req.DbID, req.TblID).RowID() + var err1 error + currentEnd, err1 = idAcc.Get() + if err1 != nil { + return err1 + } + val.end = currentEnd + return nil + }) return &autoid.AutoIDResponse{ - Min: base, - Max: base, - }, nil + Min: currentEnd, + Max: currentEnd, + }, err } val.Lock() diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index b7cb35a9f8727..b25aa87f2fce8 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -2894,17 +2894,23 @@ func TestAutoIncrementTableOption(t *testing.T) { tk.MustExec("create database test_auto_inc_table_opt;") tk.MustExec("use test_auto_inc_table_opt;") - // Empty auto_inc allocator should not cause error. - tk.MustExec("create table t (a bigint primary key clustered) auto_increment = 10;") - tk.MustExec("alter table t auto_increment = 10;") - tk.MustExec("alter table t auto_increment = 12345678901234567890;") - - // Rebase the auto_inc allocator to a large integer should work. - tk.MustExec("drop table t;") - tk.MustExec("create table t (a bigint unsigned auto_increment, unique key idx(a));") - tk.MustExec("alter table t auto_increment = 12345678901234567890;") - tk.MustExec("insert into t values ();") - tk.MustQuery("select * from t;").Check(testkit.Rows("12345678901234567890")) + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + // Empty auto_inc allocator should not cause error. + tk.MustExec("create table t (a bigint primary key clustered) auto_increment = 10" + str) + tk.MustExec("alter table t auto_increment = 10;") + tk.MustExec("alter table t auto_increment = 12345678901234567890;") + tk.MustExec("drop table t;") + + // Rebase the auto_inc allocator to a large integer should work. + tk.MustExec("create table t (a bigint unsigned auto_increment, unique key idx(a))" + str) + // Set auto_inc to negative is not supported + err := tk.ExecToErr("alter table t auto_increment = -1;") + require.Error(t, err) + tk.MustExec("alter table t auto_increment = 12345678901234567890;") + tk.MustExec("insert into t values ();") + tk.MustQuery("select * from t;").Check(testkit.Rows("12345678901234567890")) + tk.MustExec("drop table t;") + } } func TestAutoIncrementForce(t *testing.T) { @@ -2919,83 +2925,95 @@ func TestAutoIncrementForce(t *testing.T) { require.NoError(t, err) return gid } - // Rebase _tidb_row_id. - tk.MustExec("create table t (a int);") - tk.MustExec("alter table t force auto_increment = 2;") - tk.MustExec("insert into t values (1),(2);") - tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("1 2", "2 3")) - // Cannot set next global ID to 0. - tk.MustGetErrCode("alter table t force auto_increment = 0;", errno.ErrAutoincReadFailed) - tk.MustExec("alter table t force auto_increment = 1;") - require.Equal(t, uint64(1), getNextGlobalID()) - // inserting new rows can overwrite the existing data. - tk.MustExec("insert into t values (3);") - require.Equal(t, "[kv:1062]Duplicate entry '2' for key 't.PRIMARY'", tk.ExecToErr("insert into t values (3);").Error()) - tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("3 1", "1 2", "2 3")) - - // Rebase auto_increment. - tk.MustExec("drop table if exists t;") - tk.MustExec("create table t (a int primary key auto_increment, b int);") - tk.MustExec("insert into t values (1, 1);") - tk.MustExec("insert into t values (100000000, 1);") - tk.MustExec("delete from t where a = 100000000;") - require.Greater(t, getNextGlobalID(), uint64(100000000)) - // Cannot set next global ID to 0. - tk.MustGetErrCode("alter table t /*T![force_inc] force */ auto_increment = 0;", errno.ErrAutoincReadFailed) - tk.MustExec("alter table t /*T![force_inc] force */ auto_increment = 2;") - require.Equal(t, uint64(2), getNextGlobalID()) - tk.MustExec("insert into t(b) values (2);") - tk.MustQuery("select a, b from t;").Check(testkit.Rows("1 1", "2 2")) - - // Rebase auto_random. - tk.MustExec("drop table if exists t;") - tk.MustExec("create table t (a bigint primary key auto_random(5));") - tk.MustExec("insert into t values ();") - tk.MustExec("set @@allow_auto_random_explicit_insert = true") - tk.MustExec("insert into t values (100000000);") - tk.MustExec("delete from t where a = 100000000;") - require.Greater(t, getNextGlobalID(), uint64(100000000)) - // Cannot set next global ID to 0. - tk.MustGetErrCode("alter table t force auto_random_base = 0;", errno.ErrAutoincReadFailed) - tk.MustExec("alter table t force auto_random_base = 2;") - require.Equal(t, uint64(2), getNextGlobalID()) - tk.MustExec("insert into t values ();") - tk.MustQuery("select (a & 3) from t order by 1;").Check(testkit.Rows("1", "2")) - - // Change next global ID. - tk.MustExec("drop table if exists t;") - tk.MustExec("create table t (a bigint primary key auto_increment);") - tk.MustExec("insert into t values (1);") - bases := []uint64{1, 65535, 10, math.MaxUint64, math.MaxInt64 + 1, 1, math.MaxUint64, math.MaxInt64, 2} - lastBase := fmt.Sprintf("%d", bases[len(bases)-1]) - for _, b := range bases { - tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b)) - require.Equal(t, b, getNextGlobalID()) + + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + // Rebase _tidb_row_id. + tk.MustExec("create table t (a int)" + str) + tk.MustExec("alter table t force auto_increment = 2;") + tk.MustExec("insert into t values (1),(2);") + tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("1 2", "2 3")) + // Cannot set next global ID to 0. + tk.MustGetErrCode("alter table t force auto_increment = 0;", errno.ErrAutoincReadFailed) + tk.MustExec("alter table t force auto_increment = 1;") + require.Equal(t, uint64(1), getNextGlobalID()) + // inserting new rows can overwrite the existing data. + tk.MustExec("insert into t values (3);") + require.Equal(t, "[kv:1062]Duplicate entry '2' for key 't.PRIMARY'", tk.ExecToErr("insert into t values (3);").Error()) + tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("3 1", "1 2", "2 3")) + tk.MustExec("drop table if exists t;") } - tk.MustExec("insert into t values ();") - tk.MustQuery("select a from t;").Check(testkit.Rows("1", lastBase)) - // Force alter unsigned int auto_increment column. - tk.MustExec("drop table if exists t;") - tk.MustExec("create table t (a bigint unsigned primary key auto_increment);") - for _, b := range bases { - tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b)) - require.Equal(t, b, getNextGlobalID()) + + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + // Rebase auto_increment. + tk.MustExec("create table t (a int primary key auto_increment, b int)" + str) + tk.MustExec("insert into t values (1, 1);") + tk.MustExec("insert into t values (100000000, 1);") + tk.MustExec("delete from t where a = 100000000;") + require.Greater(t, getNextGlobalID(), uint64(100000000)) + // Cannot set next global ID to 0. + tk.MustGetErrCode("alter table t /*T![force_inc] force */ auto_increment = 0;", errno.ErrAutoincReadFailed) + tk.MustExec("alter table t /*T![force_inc] force */ auto_increment = 2;") + require.Equal(t, uint64(2), getNextGlobalID()) + tk.MustExec("insert into t(b) values (2);") + tk.MustQuery("select a, b from t;").Check(testkit.Rows("1 1", "2 2")) + tk.MustExec("drop table if exists t;") + } + + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + // Rebase auto_random. + tk.MustExec("create table t (a bigint primary key auto_random(5))" + str) + tk.MustExec("insert into t values ();") + tk.MustExec("set @@allow_auto_random_explicit_insert = true") + tk.MustExec("insert into t values (100000000);") + tk.MustExec("delete from t where a = 100000000;") + require.Greater(t, getNextGlobalID(), uint64(100000000)) + // Cannot set next global ID to 0. + tk.MustGetErrCode("alter table t force auto_random_base = 0;", errno.ErrAutoincReadFailed) + tk.MustExec("alter table t force auto_random_base = 2;") + require.Equal(t, uint64(2), getNextGlobalID()) tk.MustExec("insert into t values ();") - tk.MustQuery("select a from t;").Check(testkit.Rows(fmt.Sprintf("%d", b))) - tk.MustExec("delete from t;") + tk.MustQuery("select (a & 3) from t order by 1;").Check(testkit.Rows("1", "2")) + tk.MustExec("drop table if exists t;") } - // Force alter with @@auto_increment_increment and @@auto_increment_offset. - tk.MustExec("drop table if exists t;") - tk.MustExec("create table t(a int key auto_increment);") - tk.MustExec("set @@auto_increment_offset=2;") - tk.MustExec("set @@auto_increment_increment = 11;") - tk.MustExec("insert into t values (500);") - tk.MustExec("alter table t force auto_increment=100;") - tk.MustExec("insert into t values (), ();") - tk.MustQuery("select * from t;").Check(testkit.Rows("101", "112", "500")) - tk.MustQuery("select * from t order by a;").Check(testkit.Rows("101", "112", "500")) - tk.MustExec("drop table if exists t;") + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + // Change next global ID. + tk.MustExec("create table t (a bigint primary key auto_increment)" + str) + tk.MustExec("insert into t values (1);") + bases := []uint64{1, 65535, 10, math.MaxUint64, math.MaxInt64 + 1, 1, math.MaxUint64, math.MaxInt64, 2} + lastBase := fmt.Sprintf("%d", bases[len(bases)-1]) + for _, b := range bases { + fmt.Println("execute alter table force increment to ==", b) + tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b)) + require.Equal(t, b, getNextGlobalID()) + } + tk.MustExec("insert into t values ();") + tk.MustQuery("select a from t;").Check(testkit.Rows("1", lastBase)) + // Force alter unsigned int auto_increment column. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a bigint unsigned primary key auto_increment)" + str) + for _, b := range bases { + tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b)) + require.Equal(t, b, getNextGlobalID()) + tk.MustExec("insert into t values ();") + tk.MustQuery("select a from t;").Check(testkit.Rows(fmt.Sprintf("%d", b))) + tk.MustExec("delete from t;") + } + tk.MustExec("drop table if exists t;") + } + + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + // Force alter with @@auto_increment_increment and @@auto_increment_offset. + tk.MustExec("create table t(a int key auto_increment)" + str) + tk.MustExec("set @@auto_increment_offset=2;") + tk.MustExec("set @@auto_increment_increment = 11;") + tk.MustExec("insert into t values (500);") + tk.MustExec("alter table t force auto_increment=100;") + tk.MustExec("insert into t values (), ();") + tk.MustQuery("select * from t;").Check(testkit.Rows("101", "112", "500")) + tk.MustQuery("select * from t order by a;").Check(testkit.Rows("101", "112", "500")) + tk.MustExec("drop table if exists t;") + } // Check for warning in case we can't set the auto_increment to the desired value tk.MustExec("create table t(a int primary key auto_increment)") diff --git a/ddl/ddl_tiflash_api.go b/ddl/ddl_tiflash_api.go index 7e456815acd8c..4b8fca2a91c0f 100644 --- a/ddl/ddl_tiflash_api.go +++ b/ddl/ddl_tiflash_api.go @@ -29,20 +29,14 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/ddl/placement" ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util" - "github.com/pingcap/tidb/util/gcutil" "github.com/pingcap/tidb/util/logutil" atomicutil "go.uber.org/atomic" "go.uber.org/zap" @@ -50,12 +44,13 @@ import ( // TiFlashReplicaStatus records status for each TiFlash replica. type TiFlashReplicaStatus struct { - ID int64 - Count uint64 - LocationLabels []string - Available bool - HighPriority bool - IsPartition bool + ID int64 + Count uint64 + LocationLabels []string + Available bool + LogicalTableAvailable bool + HighPriority bool + IsPartition bool } // TiFlashTick is type for backoff threshold. @@ -283,16 +278,16 @@ func LoadTiFlashReplicaInfo(tblInfo *model.TableInfo, tableList *[]TiFlashReplic for _, p := range pi.Definitions { logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition %v\n", tblInfo.ID, p.ID)) *tableList = append(*tableList, TiFlashReplicaStatus{p.ID, - tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), false, true}) + tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, false, true}) } // partitions that in adding mid-state for _, p := range pi.AddingDefinitions { logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition adding %v\n", tblInfo.ID, p.ID)) - *tableList = append(*tableList, TiFlashReplicaStatus{p.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), true, true}) + *tableList = append(*tableList, TiFlashReplicaStatus{p.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, true, true}) } } else { logutil.BgLogger().Debug(fmt.Sprintf("Table %v has no partition\n", tblInfo.ID)) - *tableList = append(*tableList, TiFlashReplicaStatus{tblInfo.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.Available, false, false}) + *tableList = append(*tableList, TiFlashReplicaStatus{tblInfo.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.Available, tblInfo.TiFlashReplica.Available, false, false}) } } @@ -355,22 +350,6 @@ func updateTiFlashStores(pollTiFlashContext *TiFlashManagementContext) error { return nil } -func getTiFlashPeerWithoutLagCount(pollTiFlashContext *TiFlashManagementContext, tableID int64) (int, error) { - // storeIDs -> regionID, PD will not create two peer on the same store - var flashPeerCount int - for _, store := range pollTiFlashContext.TiFlashStores { - regionReplica := make(map[int64]int) - err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, ®ionReplica) - if err != nil { - logutil.BgLogger().Error("Fail to get peer status from TiFlash.", - zap.Int64("tableID", tableID)) - return 0, err - } - flashPeerCount += len(regionReplica) - } - return flashPeerCount, nil -} - func pollAvailableTableProgress(schemas infoschema.InfoSchema, ctx sessionctx.Context, pollTiFlashContext *TiFlashManagementContext) { pollMaxCount := RefreshProgressMaxTableCount failpoint.Inject("PollAvailableTableProgressMaxCount", func(val failpoint.Value) { @@ -466,6 +445,21 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T } } + failpoint.Inject("waitForAddPartition", func(val failpoint.Value) { + for _, phyTable := range tableList { + is := d.infoCache.GetLatest() + _, ok := is.TableByID(phyTable.ID) + if !ok { + tb, _, _ := is.FindTableByPartitionID(phyTable.ID) + if tb == nil { + logutil.BgLogger().Info("[ddl] waitForAddPartition") + sleepSecond := val.(int) + time.Sleep(time.Duration(sleepSecond) * time.Second) + } + } + } + }) + needPushPending := false if pollTiFlashContext.UpdatingProgressTables.Len() == 0 { needPushPending = true @@ -479,7 +473,7 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T available = val.(bool) }) // We only check unavailable tables here, so doesn't include blocked add partition case. - if !available { + if !available && !tb.LogicalTableAvailable { enabled, inqueue, _ := pollTiFlashContext.Backoff.Tick(tb.ID) if inqueue && !enabled { logutil.BgLogger().Info("Escape checking available status due to backoff", zap.Int64("tableId", tb.ID)) @@ -540,110 +534,6 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T return nil } -func getDropOrTruncateTableTiflash(ctx sessionctx.Context, currentSchema infoschema.InfoSchema, tikvHelper *helper.Helper, replicaInfos *[]TiFlashReplicaStatus) error { - store := tikvHelper.Store.(kv.Storage) - - txn, err := store.Begin() - if err != nil { - return errors.Trace(err) - } - gcSafePoint, err := gcutil.GetGCSafePoint(ctx) - if err != nil { - return err - } - uniqueIDMap := make(map[int64]struct{}) - handleJobAndTableInfo := func(job *model.Job, tblInfo *model.TableInfo) (bool, error) { - // Avoid duplicate table ID info. - if _, ok := currentSchema.TableByID(tblInfo.ID); ok { - return false, nil - } - if _, ok := uniqueIDMap[tblInfo.ID]; ok { - return false, nil - } - uniqueIDMap[tblInfo.ID] = struct{}{} - LoadTiFlashReplicaInfo(tblInfo, replicaInfos) - return false, nil - } - fn := func(jobs []*model.Job) (bool, error) { - getTable := func(StartTS uint64, SchemaID int64, TableID int64) (*model.TableInfo, error) { - snapMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(StartTS))) - if err != nil { - return nil, err - } - tbl, err := snapMeta.GetTable(SchemaID, TableID) - return tbl, err - } - return GetDropOrTruncateTableInfoFromJobsByStore(jobs, gcSafePoint, getTable, handleJobAndTableInfo) - } - - err = IterAllDDLJobs(ctx, txn, fn) - if err != nil { - if terror.ErrorEqual(variable.ErrSnapshotTooOld, err) { - // The err indicate that current ddl job and remain DDL jobs was been deleted by GC, - // just ignore the error and return directly. - return nil - } - return err - } - return nil -} - -// HandlePlacementRuleRoutine fetch all rules from pd, remove all obsolete rules. -// It handles rare situation, when we fail to alter pd rules. -func HandlePlacementRuleRoutine(ctx sessionctx.Context, d *ddl, tableList []TiFlashReplicaStatus) error { - c := context.Background() - tikvStore, ok := ctx.GetStore().(helper.Storage) - if !ok { - return errors.New("Can not get Helper") - } - tikvHelper := &helper.Helper{ - Store: tikvStore, - RegionCache: tikvStore.GetRegionCache(), - } - - allRulesArr, err := infosync.GetTiFlashGroupRules(c, "tiflash") - if err != nil { - return errors.Trace(err) - } - allRules := make(map[string]placement.TiFlashRule) - for _, r := range allRulesArr { - allRules[r.ID] = r - } - - start := time.Now() - originLen := len(tableList) - currentSchema := d.GetInfoSchemaWithInterceptor(ctx) - if err := getDropOrTruncateTableTiflash(ctx, currentSchema, tikvHelper, &tableList); err != nil { - // may fail when no `tikv_gc_safe_point` available, should return in order to remove valid pd rules. - logutil.BgLogger().Error("getDropOrTruncateTableTiflash returns error", zap.Error(err)) - return errors.Trace(err) - } - elapsed := time.Since(start) - logutil.BgLogger().Info("getDropOrTruncateTableTiflash cost", zap.Duration("time", elapsed), zap.Int("updated", len(tableList)-originLen)) - for _, tb := range tableList { - // For every region in each table, if it has one replica, we reckon it ready. - ruleID := fmt.Sprintf("table-%v-r", tb.ID) - if _, ok := allRules[ruleID]; !ok { - // Mostly because of a previous failure of setting pd rule. - logutil.BgLogger().Warn(fmt.Sprintf("Table %v exists, but there are no rule for it", tb.ID)) - newRule := infosync.MakeNewRule(tb.ID, tb.Count, tb.LocationLabels) - _ = infosync.SetTiFlashPlacementRule(context.Background(), *newRule) - } - // For every existing table, we do not remove their rules. - delete(allRules, ruleID) - } - - // Remove rules of non-existing table - for _, v := range allRules { - logutil.BgLogger().Info("Remove TiFlash rule", zap.String("id", v.ID)) - if err := infosync.DeleteTiFlashPlacementRule(c, "tiflash", v.ID); err != nil { - logutil.BgLogger().Warn("delete TiFlash pd rule failed", zap.Error(err), zap.String("ruleID", v.ID)) - } - } - - return nil -} - func (d *ddl) PollTiFlashRoutine() { pollTiflashContext, err := NewTiFlashManagementContext() if err != nil { diff --git a/ddl/ddl_tiflash_test.go b/ddl/ddl_tiflash_test.go index 27d05112df483..accf7cc038ebd 100644 --- a/ddl/ddl_tiflash_test.go +++ b/ddl/ddl_tiflash_test.go @@ -1264,3 +1264,38 @@ func TestTiFlashPartitionNotAvailable(t *testing.T) { require.NotNil(t, replica) require.True(t, replica.Available) } + +func TestTiFlashAvailableAfterAddPartition(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + + // still available after adding partition. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly", `return(2)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/waitForAddPartition", `return(3)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue", `return(false)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/waitForAddPartition")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue")) + }() + tk.MustExec("ALTER TABLE ddltiflash ADD PARTITION (PARTITION pn VALUES LESS THAN (20))") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + pi := tb.Meta().GetPartitionInfo() + require.NotNil(t, pi) + require.Equal(t, len(pi.Definitions), 2) +} diff --git a/ddl/job_table.go b/ddl/job_table.go index d23f083539e87..62bba65cf6678 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -67,7 +67,7 @@ func (dc *ddlCtx) excludeJobIDs() string { } const ( - getJobSQL = "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids) and %s reorg %s order by processing desc, job_id" + getJobSQL = "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing order by processing desc limit 1) and %s reorg %s order by processing desc, job_id" ) type jobType int diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index 8f9e59ae31084..25d8d4300eacd 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -177,3 +177,68 @@ func check(t *testing.T, record []int64, ids ...int64) { } } } + +func TestAlwaysChoiceProcessingJob(t *testing.T) { + if !variable.EnableConcurrentDDL.Load() { + t.Skipf("test requires concurrent ddl") + } + store, dom := testkit.CreateMockStoreAndDomain(t) + + d := dom.DDL() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + + ddlJobs := []string{ + "alter table t add index idx(a)", + "alter table t add index idx(b)", + } + + hook := &ddl.TestDDLCallback{} + var wg util.WaitGroupWrapper + wg.Add(1) + var once sync.Once + var idxa, idxb int64 + hook.OnGetJobBeforeExported = func(jobType string) { + once.Do(func() { + var jobs []*model.Job + for i, job := range ddlJobs { + wg.Run(func() { + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + recordSet, _ := tk.Exec(job) + if recordSet != nil { + require.NoError(t, recordSet.Close()) + } + }) + for { + time.Sleep(time.Millisecond * 100) + var err error + jobs, err = ddl.GetAllDDLJobs(testkit.NewTestKit(t, store).Session(), nil) + require.NoError(t, err) + if len(jobs) == i+1 { + break + } + } + } + idxa = jobs[0].ID + idxb = jobs[1].ID + require.Greater(t, idxb, idxa) + tk := testkit.NewTestKit(t, store) + tk.MustExec("update mysql.tidb_ddl_job set processing = 1 where job_id = ?", idxb) + wg.Done() + }) + } + + record := make([]int64, 0, 16) + hook.OnGetJobAfterExported = func(jobType string, job *model.Job) { + // record the job schedule order + record = append(record, job.ID) + } + + d.SetHook(hook) + wg.Wait() + + check(t, record, idxb, idxa) +} diff --git a/ddl/partition.go b/ddl/partition.go index 7bba0b1006332..a8947d091bfc5 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -170,6 +170,10 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v job.SchemaState = model.StateReplicaOnly case model.StateReplicaOnly: // replica only -> public + failpoint.Inject("sleepBeforeReplicaOnly", func(val failpoint.Value) { + sleepSecond := val.(int) + time.Sleep(time.Duration(sleepSecond) * time.Second) + }) // Here need do some tiflash replica complement check. // TODO: If a table is with no TiFlashReplica or it is not available, the replica-only state can be eliminated. if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available { @@ -193,6 +197,15 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available { for _, d := range partInfo.Definitions { tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, d.ID) + err = infosync.UpdateTiFlashProgressCache(d.ID, 1) + if err != nil { + // just print log, progress will be updated in `refreshTiFlashTicker` + logutil.BgLogger().Error("update tiflash sync progress cache failed", + zap.Error(err), + zap.Int64("tableID", tblInfo.ID), + zap.Int64("partitionID", d.ID), + ) + } } } // For normal and replica finished table, move the `addingDefinitions` into `Definitions`. diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index f7ef9baba6907..4b575f4dc63e0 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -42,7 +42,9 @@ go_library( "//privilege/privileges", "//sessionctx", "//sessionctx/sessionstates", + "//sessionctx/stmtctx", "//sessionctx/variable", + "//statistics", "//statistics/handle", "//telemetry", "//types", diff --git a/domain/domain.go b/domain/domain.go index 1016d5ba9b5cb..b900cf3eb8d3a 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1532,11 +1532,17 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) { } // SetupPlanReplayerHandle setup plan replayer handle -func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) { - do.planReplayerHandle = &planReplayerHandle{ - planReplayerTaskCollectorHandle: &planReplayerTaskCollectorHandle{ - sctx: ctx, - }, +func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.Context) { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + do.planReplayerHandle = &planReplayerHandle{} + do.planReplayerHandle.planReplayerTaskCollectorHandle = &planReplayerTaskCollectorHandle{ + ctx: ctx, + sctx: collectorSctx, + } + do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{ + ctx: ctx, + sctx: dumperSctx, + taskCH: make(chan *PlanReplayerDumpTask, 16), } } @@ -1557,27 +1563,42 @@ func (do *Domain) StartPlanReplayerHandle() { if planReplayerHandleLease < 1 { return } - do.wg.Add(1) + do.wg.Add(2) go func() { tikcer := time.NewTicker(planReplayerHandleLease) defer func() { tikcer.Stop() do.wg.Done() - logutil.BgLogger().Info("PlanReplayerHandle exited.") - util.Recover(metrics.LabelDomain, "PlanReplayerHandle", nil, false) + logutil.BgLogger().Info("PlanReplayerTaskCollectHandle exited.") + util.Recover(metrics.LabelDomain, "PlanReplayerTaskCollectHandle", nil, false) }() for { select { case <-do.exit: return case <-tikcer.C: - err := do.planReplayerHandle.CollectPlanReplayerTask(context.Background()) + err := do.planReplayerHandle.CollectPlanReplayerTask() if err != nil { logutil.BgLogger().Warn("plan replayer handle collect tasks failed", zap.Error(err)) } } } }() + go func() { + defer func() { + do.wg.Done() + logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.") + util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false) + }() + for { + select { + case <-do.exit: + return + case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH: + do.planReplayerHandle.HandlePlanReplayerDumpTask(task) + } + } + }() } // GetPlanReplayerHandle returns plan replayer handle diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index efc2e8ad21429..faab592950c64 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -16,8 +16,10 @@ package domain import ( "context" + "encoding/base64" "fmt" "io/ioutil" + "math/rand" "os" "path/filepath" "strconv" @@ -33,7 +35,9 @@ import ( "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" @@ -122,6 +126,16 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) { type planReplayerHandle struct { *planReplayerTaskCollectorHandle + *planReplayerTaskDumpHandle +} + +// HandlePlanReplayerDumpTask handle dump task +func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) bool { + success := h.dumpPlanReplayerDumpTask(task) + if success { + h.removeTask(task.PlanReplayerTaskKey) + } + return success } type planReplayerTaskCollectorHandle struct { @@ -129,6 +143,7 @@ type planReplayerTaskCollectorHandle struct { sync.RWMutex tasks map[PlanReplayerTaskKey]struct{} } + ctx context.Context sctx sessionctx.Context } @@ -153,32 +168,30 @@ func insertPlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, reco instance = fmt.Sprintf("%s:%d", serverInfo.IP, serverInfo.Port) } for _, record := range records { - if !record.Internal { - if len(record.FailedReason) > 0 { - insertExternalPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record) - } else { - insertExternalPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record) - } + if len(record.FailedReason) > 0 { + insertPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record) + } else { + insertPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record) } } } -func insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) { +func insertPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) { exec := sctx.(sqlexec.SQLExecutor) _, err := exec.ExecuteInternal(ctx, fmt.Sprintf( - "insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')", - record.OriginSQL, record.FailedReason, instance)) + "insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, fail_reason, instance) values ('%s','%s','%s','%s','%s')", + record.SQLDigest, record.PlanDigest, record.OriginSQL, record.FailedReason, instance)) if err != nil { logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed", zap.Error(err)) } } -func insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) { +func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) { exec := sctx.(sqlexec.SQLExecutor) _, err := exec.ExecuteInternal(ctx, fmt.Sprintf( - "insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')", - record.OriginSQL, record.Token, instance)) + "insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, token, instance) values ('%s','%s','%s','%s','%s')", + record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance)) if err != nil { logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed", zap.Error(err)) @@ -186,15 +199,14 @@ func insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, sctx ses } // CollectPlanReplayerTask collects all unhandled plan replayer task -func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask(ctx context.Context) error { - ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) - allKeys, err := h.collectAllPlanReplayerTask(ctx1) +func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error { + allKeys, err := h.collectAllPlanReplayerTask(h.ctx) if err != nil { return err } tasks := make([]PlanReplayerTaskKey, 0) for _, key := range allKeys { - unhandled, err := checkUnHandledReplayerTask(ctx1, h.sctx, key) + unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key) if err != nil { return err } @@ -227,6 +239,12 @@ func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey h.taskMu.tasks = r } +func (h *planReplayerTaskCollectorHandle) removeTask(taskKey PlanReplayerTaskKey) { + h.taskMu.Lock() + defer h.taskMu.Unlock() + delete(h.taskMu.tasks, taskKey) +} + func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) { exec := h.sctx.(sqlexec.SQLExecutor) rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task") @@ -245,16 +263,96 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context for _, row := range rows { sqlDigest, planDigest := row.GetString(0), row.GetString(1) allKeys = append(allKeys, PlanReplayerTaskKey{ - sqlDigest: sqlDigest, - planDigest: planDigest, + SQLDigest: sqlDigest, + PlanDigest: planDigest, }) } return allKeys, nil } +type planReplayerTaskDumpHandle struct { + ctx context.Context + sctx sessionctx.Context + taskCH chan *PlanReplayerDumpTask +} + +// DrainTask drain a task for unit test +func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask { + return <-h.taskCH +} + +// HandlePlanReplayerDumpTask handled the task +func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayerDumpTask) (success bool) { + taskKey := task.PlanReplayerTaskKey + unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, taskKey) + if err != nil { + logutil.BgLogger().Warn("check plan replayer capture task failed", + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return false + } + // the task is processed, thus we directly skip it. + if !unhandled { + return true + } + + file, fileName, err := GeneratePlanReplayerFile() + if err != nil { + logutil.BgLogger().Warn("generate plan replayer capture task file failed", + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return + } + task.Zf = file + task.FileName = fileName + task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false) + jsStats := make(map[int64]*handle.JSONTable) + is := GetDomain(h.sctx).InfoSchema() + for tblID, stat := range task.TblStats { + tbl, ok := is.TableByID(tblID) + if !ok { + return false + } + schema, ok := is.SchemaByTable(tbl.Meta()) + if !ok { + return false + } + r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table)) + if err != nil { + logutil.BgLogger().Warn("generate plan replayer capture task json stats failed", + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return false + } + jsStats[tblID] = r + } + err = DumpPlanReplayerInfo(h.ctx, h.sctx, task) + if err != nil { + logutil.BgLogger().Warn("dump plan replayer capture task result failed", + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return false + } + return true +} + +// SendTask send dumpTask in background task handler +func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) { + select { + case h.taskCH <- task: + default: + // TODO: add metrics here + // directly discard the task if the task channel is full in order not to block the query process + } +} + func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) { exec := sctx.(sqlexec.SQLExecutor) - rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.sqlDigest, task.planDigest)) + rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.SQLDigest, task.PlanDigest)) if err != nil { return false, err } @@ -274,7 +372,8 @@ func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, ta // PlanReplayerStatusRecord indicates record in mysql.plan_replayer_status type PlanReplayerStatusRecord struct { - Internal bool + SQLDigest string + PlanDigest string OriginSQL string Token string FailedReason string @@ -282,18 +381,57 @@ type PlanReplayerStatusRecord struct { // PlanReplayerTaskKey indicates key of a plan replayer task type PlanReplayerTaskKey struct { - sqlDigest string - planDigest string + SQLDigest string + PlanDigest string } // PlanReplayerDumpTask wrap the params for plan replayer dump type PlanReplayerDumpTask struct { + PlanReplayerTaskKey + + // tmp variables stored during the query + EncodePlan func(*stmtctx.StatementContext, bool) (string, string) + TblStats map[int64]interface{} + + // variables used to dump the plan SessionBindings []*bindinfo.BindRecord EncodedPlan string - FileName string - Zf *os.File SessionVars *variable.SessionVars - TblStats map[int64]*handle.JSONTable + JSONTblStats map[int64]*handle.JSONTable ExecStmts []ast.StmtNode Analyze bool + + FileName string + Zf *os.File +} + +// GeneratePlanReplayerFile generates plan replayer file +func GeneratePlanReplayerFile() (*os.File, string, error) { + path := GetPlanReplayerDirName() + err := os.MkdirAll(path, os.ModePerm) + if err != nil { + return nil, "", errors.AddStack(err) + } + fileName, err := generatePlanReplayerFileName() + if err != nil { + return nil, "", errors.AddStack(err) + } + zf, err := os.Create(filepath.Join(path, fileName)) + if err != nil { + return nil, "", errors.AddStack(err) + } + return zf, fileName, err +} + +func generatePlanReplayerFileName() (string, error) { + // Generate key and create zip file + time := time.Now().UnixNano() + b := make([]byte, 16) + //nolint: gosec + _, err := rand.Read(b) + if err != nil { + return "", err + } + key := base64.URLEncoding.EncodeToString(b) + return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil } diff --git a/domain/plan_replayer_dump.go b/domain/plan_replayer_dump.go index 93d0278a4ba3d..195dae7b4a0b1 100644 --- a/domain/plan_replayer_dump.go +++ b/domain/plan_replayer_dump.go @@ -210,7 +210,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context, } // Dump stats - if err = dumpStats(zw, pairs, task.TblStats, do); err != nil { + if err = dumpStats(zw, pairs, task.JSONTblStats, do); err != nil { return err } @@ -252,9 +252,10 @@ func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord { if len(task.ExecStmts) > 0 { for _, execStmt := range task.ExecStmts { records = append(records, PlanReplayerStatusRecord{ - OriginSQL: execStmt.Text(), - Token: task.FileName, - Internal: false, + SQLDigest: task.SQLDigest, + PlanDigest: task.PlanDigest, + OriginSQL: execStmt.Text(), + Token: task.FileName, }) } } diff --git a/domain/plan_replayer_handle_test.go b/domain/plan_replayer_handle_test.go index 2c25f56e15045..5a824ef4eeeb6 100644 --- a/domain/plan_replayer_handle_test.go +++ b/domain/plan_replayer_handle_test.go @@ -15,7 +15,7 @@ package domain_test import ( - "context" + "fmt" "testing" "github.com/pingcap/tidb/testkit" @@ -31,14 +31,14 @@ func TestPlanReplayerHandleCollectTask(t *testing.T) { tk.MustExec("delete from mysql.plan_replayer_task") tk.MustExec("delete from mysql.plan_replayer_status") tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');") - err := prHandle.CollectPlanReplayerTask(context.Background()) + err := prHandle.CollectPlanReplayerTask() require.NoError(t, err) require.Len(t, prHandle.GetTasks(), 1) // assert no task tk.MustExec("delete from mysql.plan_replayer_task") tk.MustExec("delete from mysql.plan_replayer_status") - err = prHandle.CollectPlanReplayerTask(context.Background()) + err = prHandle.CollectPlanReplayerTask() require.NoError(t, err) require.Len(t, prHandle.GetTasks(), 0) @@ -48,7 +48,7 @@ func TestPlanReplayerHandleCollectTask(t *testing.T) { tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');") tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');") tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, token, instance) values ('123','123','123','123')") - err = prHandle.CollectPlanReplayerTask(context.Background()) + err = prHandle.CollectPlanReplayerTask() require.NoError(t, err) require.Len(t, prHandle.GetTasks(), 1) @@ -58,7 +58,44 @@ func TestPlanReplayerHandleCollectTask(t *testing.T) { tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');") tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');") tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, fail_reason, instance) values ('123','123','123','123')") - err = prHandle.CollectPlanReplayerTask(context.Background()) + err = prHandle.CollectPlanReplayerTask() require.NoError(t, err) require.Len(t, prHandle.GetTasks(), 2) } + +func TestPlanReplayerHandleDumpTask(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + prHandle := dom.GetPlanReplayerHandle() + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + tk.MustQuery("select * from t;") + _, d := tk.Session().GetSessionVars().StmtCtx.SQLDigest() + _, pd := tk.Session().GetSessionVars().StmtCtx.GetPlanDigest() + sqlDigest := d.String() + planDigest := pd.String() + + // register task + tk.MustExec("delete from mysql.plan_replayer_task") + tk.MustExec("delete from mysql.plan_replayer_status") + tk.MustExec(fmt.Sprintf("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('%v','%v');", sqlDigest, planDigest)) + err := prHandle.CollectPlanReplayerTask() + require.NoError(t, err) + require.Len(t, prHandle.GetTasks(), 1) + + tk.MustExec("SET @@tidb_enable_plan_replayer_capture = ON;") + + // capture task and dump + tk.MustQuery("select * from t;") + task := prHandle.DrainTask() + require.NotNil(t, task) + success := prHandle.HandlePlanReplayerDumpTask(task) + require.True(t, success) + // assert memory task consumed + require.Len(t, prHandle.GetTasks(), 0) + + // assert collect task again and no more memory task + err = prHandle.CollectPlanReplayerTask() + require.NoError(t, err) + require.Len(t, prHandle.GetTasks(), 0) +} diff --git a/executor/adapter.go b/executor/adapter.go index db9fbbaa929e0..3dd0e0ce0877e 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -1629,6 +1629,11 @@ func getPlanDigest(stmtCtx *stmtctx.StatementContext) (string, *parser.Digest) { return normalized, planDigest } +// GetEncodedPlan returned same as getEncodedPlan +func GetEncodedPlan(stmtCtx *stmtctx.StatementContext, genHint bool) (encodedPlan, hintStr string) { + return getEncodedPlan(stmtCtx, genHint) +} + // getEncodedPlan gets the encoded plan, and generates the hint string if indicated. func getEncodedPlan(stmtCtx *stmtctx.StatementContext, genHint bool) (encodedPlan, hintStr string) { var hintSet bool diff --git a/executor/autoidtest/BUILD.bazel b/executor/autoidtest/BUILD.bazel new file mode 100644 index 0000000000000..dd467855dc8d9 --- /dev/null +++ b/executor/autoidtest/BUILD.bazel @@ -0,0 +1,24 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "autoidtest_test", + srcs = [ + "autoid_test.go", + "main_test.go", + ], + flaky = True, + race = "on", + deps = [ + "//config", + "//ddl/testutil", + "//meta/autoid", + "//parser/mysql", + "//session", + "//sessionctx/variable", + "//testkit", + "//testkit/testutil", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//tikv", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/executor/autoidtest/autoid_test.go b/executor/autoidtest/autoid_test.go new file mode 100644 index 0000000000000..269f3966f8a01 --- /dev/null +++ b/executor/autoidtest/autoid_test.go @@ -0,0 +1,737 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package autoid_test + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + + ddltestutil "github.com/pingcap/tidb/ddl/testutil" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/testkit/testutil" + "github.com/stretchr/testify/require" +) + +// Test filter different kind of allocators. +// In special ddl type, for example: +// 1: ActionRenameTable : it will abandon all the old allocators. +// 2: ActionRebaseAutoID : it will drop row-id-type allocator. +// 3: ActionModifyTableAutoIdCache : it will drop row-id-type allocator. +// 3: ActionRebaseAutoRandomBase : it will drop auto-rand-type allocator. +func TestFilterDifferentAllocators(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("drop table if exists t1") + + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + tk.MustExec("create table t(a bigint auto_random(5) key, b int auto_increment unique)" + str) + tk.MustExec("insert into t values()") + tk.MustQuery("select b from t").Check(testkit.Rows("1")) + allHandles, err := ddltestutil.ExtractAllTableHandles(tk.Session(), "test", "t") + require.NoError(t, err) + require.Equal(t, 1, len(allHandles)) + orderedHandles := testutil.MaskSortHandles(allHandles, 5, mysql.TypeLonglong) + require.Equal(t, int64(1), orderedHandles[0]) + tk.MustExec("delete from t") + + // Test rebase auto_increment. + tk.MustExec("alter table t auto_increment 3000000") + tk.MustExec("insert into t values()") + tk.MustQuery("select b from t").Check(testkit.Rows("3000000")) + allHandles, err = ddltestutil.ExtractAllTableHandles(tk.Session(), "test", "t") + require.NoError(t, err) + require.Equal(t, 1, len(allHandles)) + orderedHandles = testutil.MaskSortHandles(allHandles, 5, mysql.TypeLonglong) + require.Equal(t, int64(2), orderedHandles[0]) + tk.MustExec("delete from t") + + // Test rebase auto_random. + tk.MustExec("alter table t auto_random_base 3000000") + tk.MustExec("insert into t values()") + tk.MustQuery("select b from t").Check(testkit.Rows("3000001")) + allHandles, err = ddltestutil.ExtractAllTableHandles(tk.Session(), "test", "t") + require.NoError(t, err) + require.Equal(t, 1, len(allHandles)) + orderedHandles = testutil.MaskSortHandles(allHandles, 5, mysql.TypeLonglong) + require.Equal(t, int64(3000000), orderedHandles[0]) + tk.MustExec("delete from t") + + // Test rename table. + tk.MustExec("rename table t to t1") + tk.MustExec("insert into t1 values()") + res := tk.MustQuery("select b from t1") + strInt64, err := strconv.ParseInt(res.Rows()[0][0].(string), 10, 64) + require.NoError(t, err) + require.GreaterOrEqual(t, strInt64, int64(3000002)) + allHandles, err = ddltestutil.ExtractAllTableHandles(tk.Session(), "test", "t1") + require.NoError(t, err) + require.Equal(t, 1, len(allHandles)) + orderedHandles = testutil.MaskSortHandles(allHandles, 5, mysql.TypeLonglong) + require.Greater(t, orderedHandles[0], int64(3000001)) + + tk.MustExec("drop table t1") + } +} + +func TestAutoIncrementInsertMinMax(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + cases := []struct { + t string + s string + vals []int64 + expect [][]interface{} + }{ + {"tinyint", "signed", []int64{-128, 0, 127}, testkit.Rows("-128", "1", "2", "3", "127")}, + {"tinyint", "unsigned", []int64{0, 127, 255}, testkit.Rows("1", "2", "127", "128", "255")}, + {"smallint", "signed", []int64{-32768, 0, 32767}, testkit.Rows("-32768", "1", "2", "3", "32767")}, + {"smallint", "unsigned", []int64{0, 32767, 65535}, testkit.Rows("1", "2", "32767", "32768", "65535")}, + {"mediumint", "signed", []int64{-8388608, 0, 8388607}, testkit.Rows("-8388608", "1", "2", "3", "8388607")}, + {"mediumint", "unsigned", []int64{0, 8388607, 16777215}, testkit.Rows("1", "2", "8388607", "8388608", "16777215")}, + {"integer", "signed", []int64{-2147483648, 0, 2147483647}, testkit.Rows("-2147483648", "1", "2", "3", "2147483647")}, + {"integer", "unsigned", []int64{0, 2147483647, 4294967295}, testkit.Rows("1", "2", "2147483647", "2147483648", "4294967295")}, + {"bigint", "signed", []int64{-9223372036854775808, 0, 9223372036854775807}, testkit.Rows("-9223372036854775808", "1", "2", "3", "9223372036854775807")}, + {"bigint", "unsigned", []int64{0, 9223372036854775807}, testkit.Rows("1", "2", "9223372036854775807", "9223372036854775808")}, + } + + for _, option := range []string{"", "auto_id_cache 1", "auto_id_cache 100"} { + for idx, c := range cases { + sql := fmt.Sprintf("create table t%d (a %s %s key auto_increment) %s", idx, c.t, c.s, option) + tk.MustExec(sql) + + for _, val := range c.vals { + tk.MustExec(fmt.Sprintf("insert into t%d values (%d)", idx, val)) + tk.Exec(fmt.Sprintf("insert into t%d values ()", idx)) // ignore error + } + + tk.MustQuery(fmt.Sprintf("select * from t%d order by a", idx)).Check(c.expect) + + tk.MustExec(fmt.Sprintf("drop table t%d", idx)) + } + } + + tk.MustExec("create table t10 (a integer key auto_increment) auto_id_cache 1") + err := tk.ExecToErr("insert into t10 values (2147483648)") + require.Error(t, err) + err = tk.ExecToErr("insert into t10 values (-2147483649)") + require.Error(t, err) +} + +func TestInsertWithAutoidSchema(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(`use test`) + tk.MustExec(`create table t1(id int primary key auto_increment, n int);`) + tk.MustExec(`create table t2(id int unsigned primary key auto_increment, n int);`) + tk.MustExec(`create table t3(id tinyint primary key auto_increment, n int);`) + tk.MustExec(`create table t4(id int primary key, n float auto_increment, key I_n(n));`) + tk.MustExec(`create table t5(id int primary key, n float unsigned auto_increment, key I_n(n));`) + tk.MustExec(`create table t6(id int primary key, n double auto_increment, key I_n(n));`) + tk.MustExec(`create table t7(id int primary key, n double unsigned auto_increment, key I_n(n));`) + // test for inserting multiple values + tk.MustExec(`create table t8(id int primary key auto_increment, n int);`) + + testInsertWithAutoidSchema(t, tk) +} + +func TestInsertWithAutoidSchemaCache(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(`use test`) + tk.MustExec(`create table t1(id int primary key auto_increment, n int) AUTO_ID_CACHE 1;`) + tk.MustExec(`create table t2(id int unsigned primary key auto_increment, n int) AUTO_ID_CACHE 1;`) + tk.MustExec(`create table t3(id tinyint primary key auto_increment, n int) AUTO_ID_CACHE 1;`) + tk.MustExec(`create table t4(id int primary key, n float auto_increment, key I_n(n)) AUTO_ID_CACHE 1;`) + tk.MustExec(`create table t5(id int primary key, n float unsigned auto_increment, key I_n(n)) AUTO_ID_CACHE 1;`) + tk.MustExec(`create table t6(id int primary key, n double auto_increment, key I_n(n)) AUTO_ID_CACHE 1;`) + tk.MustExec(`create table t7(id int primary key, n double unsigned auto_increment, key I_n(n)) AUTO_ID_CACHE 1;`) + // test for inserting multiple values + tk.MustExec(`create table t8(id int primary key auto_increment, n int);`) + + testInsertWithAutoidSchema(t, tk) +} + +func testInsertWithAutoidSchema(t *testing.T, tk *testkit.TestKit) { + tests := []struct { + insert string + query string + result [][]interface{} + }{ + { + `insert into t1(id, n) values(1, 1)`, + `select * from t1 where id = 1`, + testkit.Rows(`1 1`), + }, + { + `insert into t1(n) values(2)`, + `select * from t1 where id = 2`, + testkit.Rows(`2 2`), + }, + { + `insert into t1(n) values(3)`, + `select * from t1 where id = 3`, + testkit.Rows(`3 3`), + }, + { + `insert into t1(id, n) values(-1, 4)`, + `select * from t1 where id = -1`, + testkit.Rows(`-1 4`), + }, + { + `insert into t1(n) values(5)`, + `select * from t1 where id = 4`, + testkit.Rows(`4 5`), + }, + { + `insert into t1(id, n) values('5', 6)`, + `select * from t1 where id = 5`, + testkit.Rows(`5 6`), + }, + { + `insert into t1(n) values(7)`, + `select * from t1 where id = 6`, + testkit.Rows(`6 7`), + }, + { + `insert into t1(id, n) values(7.4, 8)`, + `select * from t1 where id = 7`, + testkit.Rows(`7 8`), + }, + { + `insert into t1(id, n) values(7.5, 9)`, + `select * from t1 where id = 8`, + testkit.Rows(`8 9`), + }, + { + `insert into t1(n) values(9)`, + `select * from t1 where id = 9`, + testkit.Rows(`9 9`), + }, + // test last insert id + { + `insert into t1 values(3000, -1), (null, -2)`, + `select * from t1 where id = 3000`, + testkit.Rows(`3000 -1`), + }, + { + `;`, + `select * from t1 where id = 3001`, + testkit.Rows(`3001 -2`), + }, + { + `;`, + `select last_insert_id()`, + testkit.Rows(`3001`), + }, + { + `insert into t2(id, n) values(1, 1)`, + `select * from t2 where id = 1`, + testkit.Rows(`1 1`), + }, + { + `insert into t2(n) values(2)`, + `select * from t2 where id = 2`, + testkit.Rows(`2 2`), + }, + { + `insert into t2(n) values(3)`, + `select * from t2 where id = 3`, + testkit.Rows(`3 3`), + }, + { + `insert into t3(id, n) values(1, 1)`, + `select * from t3 where id = 1`, + testkit.Rows(`1 1`), + }, + { + `insert into t3(n) values(2)`, + `select * from t3 where id = 2`, + testkit.Rows(`2 2`), + }, + { + `insert into t3(n) values(3)`, + `select * from t3 where id = 3`, + testkit.Rows(`3 3`), + }, + { + `insert into t3(id, n) values(-1, 4)`, + `select * from t3 where id = -1`, + testkit.Rows(`-1 4`), + }, + { + `insert into t3(n) values(5)`, + `select * from t3 where id = 4`, + testkit.Rows(`4 5`), + }, + { + `insert into t4(id, n) values(1, 1)`, + `select * from t4 where id = 1`, + testkit.Rows(`1 1`), + }, + { + `insert into t4(id) values(2)`, + `select * from t4 where id = 2`, + testkit.Rows(`2 2`), + }, + { + `insert into t4(id, n) values(3, -1)`, + `select * from t4 where id = 3`, + testkit.Rows(`3 -1`), + }, + { + `insert into t4(id) values(4)`, + `select * from t4 where id = 4`, + testkit.Rows(`4 3`), + }, + { + `insert into t4(id, n) values(5, 5.5)`, + `select * from t4 where id = 5`, + testkit.Rows(`5 5.5`), + }, + { + `insert into t4(id) values(6)`, + `select * from t4 where id = 6`, + testkit.Rows(`6 7`), + }, + { + `insert into t4(id, n) values(7, '7.7')`, + `select * from t4 where id = 7`, + testkit.Rows(`7 7.7`), + }, + { + `insert into t4(id) values(8)`, + `select * from t4 where id = 8`, + testkit.Rows(`8 9`), + }, + { + `insert into t4(id, n) values(9, 10.4)`, + `select * from t4 where id = 9`, + testkit.Rows(`9 10.4`), + }, + { + `insert into t4(id) values(10)`, + `select * from t4 where id = 10`, + testkit.Rows(`10 11`), + }, + { + `insert into t5(id, n) values(1, 1)`, + `select * from t5 where id = 1`, + testkit.Rows(`1 1`), + }, + { + `insert into t5(id) values(2)`, + `select * from t5 where id = 2`, + testkit.Rows(`2 2`), + }, + { + `insert into t5(id) values(3)`, + `select * from t5 where id = 3`, + testkit.Rows(`3 3`), + }, + { + `insert into t6(id, n) values(1, 1)`, + `select * from t6 where id = 1`, + testkit.Rows(`1 1`), + }, + { + `insert into t6(id) values(2)`, + `select * from t6 where id = 2`, + testkit.Rows(`2 2`), + }, + { + `insert into t6(id, n) values(3, -1)`, + `select * from t6 where id = 3`, + testkit.Rows(`3 -1`), + }, + { + `insert into t6(id) values(4)`, + `select * from t6 where id = 4`, + testkit.Rows(`4 3`), + }, + { + `insert into t6(id, n) values(5, 5.5)`, + `select * from t6 where id = 5`, + testkit.Rows(`5 5.5`), + }, + { + `insert into t6(id) values(6)`, + `select * from t6 where id = 6`, + testkit.Rows(`6 7`), + }, + { + `insert into t6(id, n) values(7, '7.7')`, + `select * from t4 where id = 7`, + testkit.Rows(`7 7.7`), + }, + { + `insert into t6(id) values(8)`, + `select * from t4 where id = 8`, + testkit.Rows(`8 9`), + }, + { + `insert into t6(id, n) values(9, 10.4)`, + `select * from t6 where id = 9`, + testkit.Rows(`9 10.4`), + }, + { + `insert into t6(id) values(10)`, + `select * from t6 where id = 10`, + testkit.Rows(`10 11`), + }, + { + `insert into t7(id, n) values(1, 1)`, + `select * from t7 where id = 1`, + testkit.Rows(`1 1`), + }, + { + `insert into t7(id) values(2)`, + `select * from t7 where id = 2`, + testkit.Rows(`2 2`), + }, + { + `insert into t7(id) values(3)`, + `select * from t7 where id = 3`, + testkit.Rows(`3 3`), + }, + + // the following is test for insert multiple values. + { + `insert into t8(n) values(1),(2)`, + `select * from t8 where id = 1`, + testkit.Rows(`1 1`), + }, + { + `;`, + `select * from t8 where id = 2`, + testkit.Rows(`2 2`), + }, + { + `;`, + `select last_insert_id();`, + testkit.Rows(`1`), + }, + // test user rebase and auto alloc mixture. + { + `insert into t8 values(null, 3),(-1, -1),(null,4),(null, 5)`, + `select * from t8 where id = 3`, + testkit.Rows(`3 3`), + }, + // -1 won't rebase allocator here cause -1 < base. + { + `;`, + `select * from t8 where id = -1`, + testkit.Rows(`-1 -1`), + }, + { + `;`, + `select * from t8 where id = 4`, + testkit.Rows(`4 4`), + }, + { + `;`, + `select * from t8 where id = 5`, + testkit.Rows(`5 5`), + }, + { + `;`, + `select last_insert_id();`, + testkit.Rows(`3`), + }, + { + `insert into t8 values(null, 6),(10, 7),(null, 8)`, + `select * from t8 where id = 6`, + testkit.Rows(`6 6`), + }, + // 10 will rebase allocator here. + { + `;`, + `select * from t8 where id = 10`, + testkit.Rows(`10 7`), + }, + { + `;`, + `select * from t8 where id = 11`, + testkit.Rows(`11 8`), + }, + { + `;`, + `select last_insert_id()`, + testkit.Rows(`6`), + }, + // fix bug for last_insert_id should be first allocated id in insert rows (skip the rebase id). + { + `insert into t8 values(100, 9),(null,10),(null,11)`, + `select * from t8 where id = 100`, + testkit.Rows(`100 9`), + }, + { + `;`, + `select * from t8 where id = 101`, + testkit.Rows(`101 10`), + }, + { + `;`, + `select * from t8 where id = 102`, + testkit.Rows(`102 11`), + }, + { + `;`, + `select last_insert_id()`, + testkit.Rows(`101`), + }, + // test with sql_mode: NO_AUTO_VALUE_ON_ZERO. + { + `;`, + `select @@sql_mode`, + testkit.Rows(`ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION`), + }, + { + `;`, + "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION,NO_AUTO_VALUE_ON_ZERO`", + nil, + }, + { + `insert into t8 values (0, 12), (null, 13)`, + `select * from t8 where id = 0`, + testkit.Rows(`0 12`), + }, + { + `;`, + `select * from t8 where id = 103`, + testkit.Rows(`103 13`), + }, + { + `;`, + `select last_insert_id()`, + testkit.Rows(`103`), + }, + // test without sql_mode: NO_AUTO_VALUE_ON_ZERO. + { + `;`, + "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION`", + nil, + }, + // value 0 will be substitute by autoid. + { + `insert into t8 values (0, 14), (null, 15)`, + `select * from t8 where id = 104`, + testkit.Rows(`104 14`), + }, + { + `;`, + `select * from t8 where id = 105`, + testkit.Rows(`105 15`), + }, + { + `;`, + `select last_insert_id()`, + testkit.Rows(`104`), + }, + // last test : auto increment allocation can find in retryInfo. + { + `retry : insert into t8 values (null, 16), (null, 17)`, + `select * from t8 where id = 1000`, + testkit.Rows(`1000 16`), + }, + { + `;`, + `select * from t8 where id = 1001`, + testkit.Rows(`1001 17`), + }, + { + `;`, + `select last_insert_id()`, + // this insert doesn't has the last_insert_id, should be same as the last insert case. + testkit.Rows(`104`), + }, + } + + for _, tt := range tests { + if strings.HasPrefix(tt.insert, "retry : ") { + // it's the last retry insert case, change the sessionVars. + retryInfo := &variable.RetryInfo{Retrying: true} + retryInfo.AddAutoIncrementID(1000) + retryInfo.AddAutoIncrementID(1001) + tk.Session().GetSessionVars().RetryInfo = retryInfo + tk.MustExec(tt.insert[8:]) + tk.Session().GetSessionVars().RetryInfo = &variable.RetryInfo{} + } else { + tk.MustExec(tt.insert) + } + if tt.query == "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION,NO_AUTO_VALUE_ON_ZERO`" || + tt.query == "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION`" { + tk.MustExec(tt.query) + } else { + tk.MustQuery(tt.query).Check(tt.result) + } + } +} + +// TestAutoIDIncrementAndOffset There is a potential issue in MySQL: when the value of auto_increment_offset is greater +// than that of auto_increment_increment, the value of auto_increment_offset is ignored +// (https://dev.mysql.com/doc/refman/8.0/en/replication-options-master.html#sysvar_auto_increment_increment), +// This issue is a flaw of the implementation of MySQL and it doesn't exist in TiDB. +func TestAutoIDIncrementAndOffset(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(`use test`) + // Test for offset is larger than increment. + tk.Session().GetSessionVars().AutoIncrementIncrement = 5 + tk.Session().GetSessionVars().AutoIncrementOffset = 10 + + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + tk.MustExec(`create table io (a int key auto_increment)` + str) + tk.MustExec(`insert into io values (null),(null),(null)`) + tk.MustQuery(`select * from io`).Check(testkit.Rows("10", "15", "20")) + tk.MustExec(`drop table io`) + } + + // Test handle is PK. + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + tk.MustExec(`create table io (a int key auto_increment)` + str) + tk.Session().GetSessionVars().AutoIncrementOffset = 10 + tk.Session().GetSessionVars().AutoIncrementIncrement = 2 + tk.MustExec(`insert into io values (),(),()`) + tk.MustQuery(`select * from io`).Check(testkit.Rows("10", "12", "14")) + tk.MustExec(`delete from io`) + + // Test reset the increment. + tk.Session().GetSessionVars().AutoIncrementIncrement = 5 + tk.MustExec(`insert into io values (),(),()`) + tk.MustQuery(`select * from io`).Check(testkit.Rows("15", "20", "25")) + tk.MustExec(`delete from io`) + + tk.Session().GetSessionVars().AutoIncrementIncrement = 10 + tk.MustExec(`insert into io values (),(),()`) + tk.MustQuery(`select * from io`).Check(testkit.Rows("30", "40", "50")) + tk.MustExec(`delete from io`) + + tk.Session().GetSessionVars().AutoIncrementIncrement = 5 + tk.MustExec(`insert into io values (),(),()`) + tk.MustQuery(`select * from io`).Check(testkit.Rows("55", "60", "65")) + tk.MustExec(`drop table io`) + } + + // Test handle is not PK. + for _, str := range []string{"", " AUTO_ID_CACHE 1"} { + tk.Session().GetSessionVars().AutoIncrementIncrement = 2 + tk.Session().GetSessionVars().AutoIncrementOffset = 10 + tk.MustExec(`create table io (a int, b int auto_increment, key(b))` + str) + tk.MustExec(`insert into io(b) values (null),(null),(null)`) + // AutoID allocation will take increment and offset into consideration. + tk.MustQuery(`select b from io`).Check(testkit.Rows("10", "12", "14")) + // HandleID allocation will ignore the increment and offset. + tk.MustQuery(`select _tidb_rowid from io`).Check(testkit.Rows("15", "16", "17")) + tk.MustExec(`delete from io`) + + tk.Session().GetSessionVars().AutoIncrementIncrement = 10 + tk.MustExec(`insert into io(b) values (null),(null),(null)`) + tk.MustQuery(`select b from io`).Check(testkit.Rows("20", "30", "40")) + tk.MustQuery(`select _tidb_rowid from io`).Check(testkit.Rows("41", "42", "43")) + + // Test invalid value. + tk.Session().GetSessionVars().AutoIncrementIncrement = -1 + tk.Session().GetSessionVars().AutoIncrementOffset = -2 + tk.MustGetErrMsg(`insert into io(b) values (null),(null),(null)`, + "[autoid:8060]Invalid auto_increment settings: auto_increment_increment: -1, auto_increment_offset: -2, both of them must be in range [1..65535]") + tk.MustExec(`delete from io`) + + tk.Session().GetSessionVars().AutoIncrementIncrement = 65536 + tk.Session().GetSessionVars().AutoIncrementOffset = 65536 + tk.MustGetErrMsg(`insert into io(b) values (null),(null),(null)`, + "[autoid:8060]Invalid auto_increment settings: auto_increment_increment: 65536, auto_increment_offset: 65536, both of them must be in range [1..65535]") + + tk.MustExec(`drop table io`) + } +} + +func TestRenameTableForAutoIncrement(t *testing.T) { + store, _ := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("USE test;") + tk.MustExec("drop table if exists t1, t2, t3;") + tk.MustExec("create table t1 (id int key auto_increment);") + tk.MustExec("insert into t1 values ()") + tk.MustExec("rename table t1 to t11") + tk.MustExec("insert into t11 values ()") + // TODO(tiancaiamao): fix bug and uncomment here, rename table should not discard the cached AUTO_ID. + // tk.MustQuery("select * from t11").Check(testkit.Rows("1", "2")) + + // auto_id_cache 1 use another implementation and do not have such bug. + tk.MustExec("create table t2 (id int key auto_increment) auto_id_cache 1;") + tk.MustExec("insert into t2 values ()") + tk.MustExec("rename table t2 to t22") + tk.MustExec("insert into t22 values ()") + tk.MustQuery("select * from t22").Check(testkit.Rows("1", "2")) + + tk.MustExec("create table t3 (id int key auto_increment) auto_id_cache 100;") + tk.MustExec("insert into t3 values ()") + tk.MustExec("rename table t3 to t33") + tk.MustExec("insert into t33 values ()") + // TODO(tiancaiamao): fix bug and uncomment here, rename table should not discard the cached AUTO_ID. + // tk.MustQuery("select * from t33").Check(testkit.Rows("1", "2")) +} + +func TestAlterTableAutoIDCache(t *testing.T) { + store, _ := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("USE test;") + tk.MustExec("drop table if exists t_473;") + tk.MustExec("create table t_473 (id int key auto_increment)") + tk.MustExec("insert into t_473 values ()") + tk.MustQuery("select * from t_473").Check(testkit.Rows("1")) + rs, err := tk.Exec("show table t_473 next_row_id") + require.NoError(t, err) + rows, err1 := session.ResultSetToStringSlice(context.Background(), tk.Session(), rs) + require.NoError(t, err1) + // "test t_473 id 1013608 AUTO_INCREMENT" + val, err2 := strconv.ParseUint(rows[0][3], 10, 64) + require.NoError(t, err2) + + tk.MustExec("alter table t_473 auto_id_cache = 100") + tk.MustQuery("show table t_473 next_row_id").Check(testkit.Rows(fmt.Sprintf("test t_473 id %d AUTO_INCREMENT", val))) + tk.MustExec("insert into t_473 values ()") + tk.MustQuery("select * from t_473").Check(testkit.Rows("1", fmt.Sprintf("%d", val))) + tk.MustQuery("show table t_473 next_row_id").Check(testkit.Rows(fmt.Sprintf("test t_473 id %d AUTO_INCREMENT", val+100))) + + // Note that auto_id_cache=1 use a different implementation. + tk.MustExec("alter table t_473 auto_id_cache = 1") + tk.MustQuery("show table t_473 next_row_id").Check(testkit.Rows(fmt.Sprintf("test t_473 id %d AUTO_INCREMENT", val+100))) + tk.MustExec("insert into t_473 values ()") + tk.MustQuery("select * from t_473").Check(testkit.Rows("1", fmt.Sprintf("%d", val), fmt.Sprintf("%d", val+100))) + tk.MustQuery("show table t_473 next_row_id").Check(testkit.Rows(fmt.Sprintf("test t_473 id %d AUTO_INCREMENT", val+101))) + + // alter table from auto_id_cache=1 to default will discard the IDs cached by the autoid service. + // This is because they are two component and TiDB can't tell the autoid service to "save position and exit". + tk.MustExec("alter table t_473 auto_id_cache = 20000") + tk.MustQuery("show table t_473 next_row_id").Check(testkit.Rows(fmt.Sprintf("test t_473 id %d AUTO_INCREMENT", val+4100))) + + tk.MustExec("insert into t_473 values ()") + tk.MustQuery("select * from t_473").Check(testkit.Rows("1", + fmt.Sprintf("%d", val), + fmt.Sprintf("%d", val+100), + fmt.Sprintf("%d", val+4100))) + tk.MustQuery("show table t_473 next_row_id").Check(testkit.Rows(fmt.Sprintf("test t_473 id %d AUTO_INCREMENT", val+24100))) +} diff --git a/executor/autoidtest/main_test.go b/executor/autoidtest/main_test.go new file mode 100644 index 0000000000000..f87db4afe1371 --- /dev/null +++ b/executor/autoidtest/main_test.go @@ -0,0 +1,44 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package autoid_test + +import ( + "testing" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/meta/autoid" + "github.com/tikv/client-go/v2/tikv" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + autoid.SetStep(5000) + config.UpdateGlobal(func(conf *config.Config) { + conf.Log.SlowThreshold = 30000 // 30s + conf.TiKVClient.AsyncCommit.SafeWindow = 0 + conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 + conf.Experimental.AllowsExpressionIndex = true + }) + tikv.EnableFailpoints() + + opts := []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), + goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"), + goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"), + } + goleak.VerifyTestMain(m, opts...) +} diff --git a/executor/compiler.go b/executor/compiler.go index 241b15874e1e2..4ddb6208a445c 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -21,7 +21,9 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" @@ -154,9 +156,42 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } } } + if c.Ctx.GetSessionVars().EnablePlanReplayerCapture { + checkPlanReplayerCaptureTask(c.Ctx, stmtNode) + } + return stmt, nil } +func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode) { + tasks := domain.GetDomain(sctx).GetPlanReplayerHandle().GetTasks() + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx) + for _, task := range tasks { + if task.SQLDigest == sqlDigest.String() && task.PlanDigest == planDigest.String() { + sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode) + } + } +} + +func sendPlanReplayerDumpTask(sqlDigest, planDigest string, sctx sessionctx.Context, stmtNode ast.StmtNode) { + stmtCtx := sctx.GetSessionVars().StmtCtx + handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) + dumpTask := &domain.PlanReplayerDumpTask{ + PlanReplayerTaskKey: domain.PlanReplayerTaskKey{ + SQLDigest: sqlDigest, + PlanDigest: planDigest, + }, + EncodePlan: GetEncodedPlan, + TblStats: stmtCtx.TableStats, + SessionBindings: handle.GetAllBindRecord(), + SessionVars: sctx.GetSessionVars(), + ExecStmts: []ast.StmtNode{stmtNode}, + Analyze: false, + } + domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) +} + // needLowerPriority checks whether it's needed to lower the execution priority // of a query. // If the estimated output row count of any operator in the physical plan tree diff --git a/executor/ddl_test.go b/executor/ddl_test.go index d4d0d59d4ab5b..54a67cbbf9cb7 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -1131,65 +1131,6 @@ func TestAutoRandomClusteredPrimaryKey(t *testing.T) { tk.MustQuery("select a from t;").Check(testkit.Rows("1")) } -// Test filter different kind of allocators. -// In special ddl type, for example: -// 1: ActionRenameTable : it will abandon all the old allocators. -// 2: ActionRebaseAutoID : it will drop row-id-type allocator. -// 3: ActionModifyTableAutoIdCache : it will drop row-id-type allocator. -// 3: ActionRebaseAutoRandomBase : it will drop auto-rand-type allocator. -func TestFilterDifferentAllocators(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("drop table if exists t1") - - tk.MustExec("create table t(a bigint auto_random(5) key, b int auto_increment unique)") - tk.MustExec("insert into t values()") - tk.MustQuery("select b from t").Check(testkit.Rows("1")) - allHandles, err := ddltestutil.ExtractAllTableHandles(tk.Session(), "test", "t") - require.NoError(t, err) - require.Equal(t, 1, len(allHandles)) - orderedHandles := testutil.MaskSortHandles(allHandles, 5, mysql.TypeLonglong) - require.Equal(t, int64(1), orderedHandles[0]) - tk.MustExec("delete from t") - - // Test rebase auto_increment. - tk.MustExec("alter table t auto_increment 3000000") - tk.MustExec("insert into t values()") - tk.MustQuery("select b from t").Check(testkit.Rows("3000000")) - allHandles, err = ddltestutil.ExtractAllTableHandles(tk.Session(), "test", "t") - require.NoError(t, err) - require.Equal(t, 1, len(allHandles)) - orderedHandles = testutil.MaskSortHandles(allHandles, 5, mysql.TypeLonglong) - require.Equal(t, int64(2), orderedHandles[0]) - tk.MustExec("delete from t") - - // Test rebase auto_random. - tk.MustExec("alter table t auto_random_base 3000000") - tk.MustExec("insert into t values()") - tk.MustQuery("select b from t").Check(testkit.Rows("3000001")) - allHandles, err = ddltestutil.ExtractAllTableHandles(tk.Session(), "test", "t") - require.NoError(t, err) - require.Equal(t, 1, len(allHandles)) - orderedHandles = testutil.MaskSortHandles(allHandles, 5, mysql.TypeLonglong) - require.Equal(t, int64(3000000), orderedHandles[0]) - tk.MustExec("delete from t") - - // Test rename table. - tk.MustExec("rename table t to t1") - tk.MustExec("insert into t1 values()") - res := tk.MustQuery("select b from t1") - strInt64, err := strconv.ParseInt(res.Rows()[0][0].(string), 10, 64) - require.NoError(t, err) - require.Greater(t, strInt64, int64(3000002)) - allHandles, err = ddltestutil.ExtractAllTableHandles(tk.Session(), "test", "t1") - require.NoError(t, err) - require.Equal(t, 1, len(allHandles)) - orderedHandles = testutil.MaskSortHandles(allHandles, 5, mysql.TypeLonglong) - require.Greater(t, orderedHandles[0], int64(3000001)) -} - func TestMaxHandleAddIndex(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/executor/executor_test.go b/executor/executor_test.go index 641f56817cd60..ed4551f809ee4 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -5947,6 +5947,8 @@ func TestSummaryFailedUpdate(t *testing.T) { tk.Session().SetSessionManager(sm) dom.ExpensiveQueryHandle().SetSessionManager(sm) defer tk.MustExec("SET GLOBAL tidb_mem_oom_action = DEFAULT") + tk.MustQuery("select variable_value from mysql.GLOBAL_VARIABLES where variable_name = 'tidb_mem_oom_action'").Check(testkit.Rows("LOG")) + tk.MustExec("SET GLOBAL tidb_mem_oom_action='CANCEL'") require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil)) tk.MustExec("set @@tidb_mem_quota_query=1") diff --git a/executor/insert_test.go b/executor/insert_test.go index 53d0c62cc9776..f3db8c6aca40f 100644 --- a/executor/insert_test.go +++ b/executor/insert_test.go @@ -589,435 +589,6 @@ func TestAllowInvalidDates(t *testing.T) { runWithMode("ALLOW_INVALID_DATES") } -func TestInsertWithAutoidSchema(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec(`use test`) - tk.MustExec(`create table t1(id int primary key auto_increment, n int);`) - tk.MustExec(`create table t2(id int unsigned primary key auto_increment, n int);`) - tk.MustExec(`create table t3(id tinyint primary key auto_increment, n int);`) - tk.MustExec(`create table t4(id int primary key, n float auto_increment, key I_n(n));`) - tk.MustExec(`create table t5(id int primary key, n float unsigned auto_increment, key I_n(n));`) - tk.MustExec(`create table t6(id int primary key, n double auto_increment, key I_n(n));`) - tk.MustExec(`create table t7(id int primary key, n double unsigned auto_increment, key I_n(n));`) - // test for inserting multiple values - tk.MustExec(`create table t8(id int primary key auto_increment, n int);`) - - tests := []struct { - insert string - query string - result [][]interface{} - }{ - { - `insert into t1(id, n) values(1, 1)`, - `select * from t1 where id = 1`, - testkit.Rows(`1 1`), - }, - { - `insert into t1(n) values(2)`, - `select * from t1 where id = 2`, - testkit.Rows(`2 2`), - }, - { - `insert into t1(n) values(3)`, - `select * from t1 where id = 3`, - testkit.Rows(`3 3`), - }, - { - `insert into t1(id, n) values(-1, 4)`, - `select * from t1 where id = -1`, - testkit.Rows(`-1 4`), - }, - { - `insert into t1(n) values(5)`, - `select * from t1 where id = 4`, - testkit.Rows(`4 5`), - }, - { - `insert into t1(id, n) values('5', 6)`, - `select * from t1 where id = 5`, - testkit.Rows(`5 6`), - }, - { - `insert into t1(n) values(7)`, - `select * from t1 where id = 6`, - testkit.Rows(`6 7`), - }, - { - `insert into t1(id, n) values(7.4, 8)`, - `select * from t1 where id = 7`, - testkit.Rows(`7 8`), - }, - { - `insert into t1(id, n) values(7.5, 9)`, - `select * from t1 where id = 8`, - testkit.Rows(`8 9`), - }, - { - `insert into t1(n) values(9)`, - `select * from t1 where id = 9`, - testkit.Rows(`9 9`), - }, - // test last insert id - { - `insert into t1 values(3000, -1), (null, -2)`, - `select * from t1 where id = 3000`, - testkit.Rows(`3000 -1`), - }, - { - `;`, - `select * from t1 where id = 3001`, - testkit.Rows(`3001 -2`), - }, - { - `;`, - `select last_insert_id()`, - testkit.Rows(`3001`), - }, - { - `insert into t2(id, n) values(1, 1)`, - `select * from t2 where id = 1`, - testkit.Rows(`1 1`), - }, - { - `insert into t2(n) values(2)`, - `select * from t2 where id = 2`, - testkit.Rows(`2 2`), - }, - { - `insert into t2(n) values(3)`, - `select * from t2 where id = 3`, - testkit.Rows(`3 3`), - }, - { - `insert into t3(id, n) values(1, 1)`, - `select * from t3 where id = 1`, - testkit.Rows(`1 1`), - }, - { - `insert into t3(n) values(2)`, - `select * from t3 where id = 2`, - testkit.Rows(`2 2`), - }, - { - `insert into t3(n) values(3)`, - `select * from t3 where id = 3`, - testkit.Rows(`3 3`), - }, - { - `insert into t3(id, n) values(-1, 4)`, - `select * from t3 where id = -1`, - testkit.Rows(`-1 4`), - }, - { - `insert into t3(n) values(5)`, - `select * from t3 where id = 4`, - testkit.Rows(`4 5`), - }, - { - `insert into t4(id, n) values(1, 1)`, - `select * from t4 where id = 1`, - testkit.Rows(`1 1`), - }, - { - `insert into t4(id) values(2)`, - `select * from t4 where id = 2`, - testkit.Rows(`2 2`), - }, - { - `insert into t4(id, n) values(3, -1)`, - `select * from t4 where id = 3`, - testkit.Rows(`3 -1`), - }, - { - `insert into t4(id) values(4)`, - `select * from t4 where id = 4`, - testkit.Rows(`4 3`), - }, - { - `insert into t4(id, n) values(5, 5.5)`, - `select * from t4 where id = 5`, - testkit.Rows(`5 5.5`), - }, - { - `insert into t4(id) values(6)`, - `select * from t4 where id = 6`, - testkit.Rows(`6 7`), - }, - { - `insert into t4(id, n) values(7, '7.7')`, - `select * from t4 where id = 7`, - testkit.Rows(`7 7.7`), - }, - { - `insert into t4(id) values(8)`, - `select * from t4 where id = 8`, - testkit.Rows(`8 9`), - }, - { - `insert into t4(id, n) values(9, 10.4)`, - `select * from t4 where id = 9`, - testkit.Rows(`9 10.4`), - }, - { - `insert into t4(id) values(10)`, - `select * from t4 where id = 10`, - testkit.Rows(`10 11`), - }, - { - `insert into t5(id, n) values(1, 1)`, - `select * from t5 where id = 1`, - testkit.Rows(`1 1`), - }, - { - `insert into t5(id) values(2)`, - `select * from t5 where id = 2`, - testkit.Rows(`2 2`), - }, - { - `insert into t5(id) values(3)`, - `select * from t5 where id = 3`, - testkit.Rows(`3 3`), - }, - { - `insert into t6(id, n) values(1, 1)`, - `select * from t6 where id = 1`, - testkit.Rows(`1 1`), - }, - { - `insert into t6(id) values(2)`, - `select * from t6 where id = 2`, - testkit.Rows(`2 2`), - }, - { - `insert into t6(id, n) values(3, -1)`, - `select * from t6 where id = 3`, - testkit.Rows(`3 -1`), - }, - { - `insert into t6(id) values(4)`, - `select * from t6 where id = 4`, - testkit.Rows(`4 3`), - }, - { - `insert into t6(id, n) values(5, 5.5)`, - `select * from t6 where id = 5`, - testkit.Rows(`5 5.5`), - }, - { - `insert into t6(id) values(6)`, - `select * from t6 where id = 6`, - testkit.Rows(`6 7`), - }, - { - `insert into t6(id, n) values(7, '7.7')`, - `select * from t4 where id = 7`, - testkit.Rows(`7 7.7`), - }, - { - `insert into t6(id) values(8)`, - `select * from t4 where id = 8`, - testkit.Rows(`8 9`), - }, - { - `insert into t6(id, n) values(9, 10.4)`, - `select * from t6 where id = 9`, - testkit.Rows(`9 10.4`), - }, - { - `insert into t6(id) values(10)`, - `select * from t6 where id = 10`, - testkit.Rows(`10 11`), - }, - { - `insert into t7(id, n) values(1, 1)`, - `select * from t7 where id = 1`, - testkit.Rows(`1 1`), - }, - { - `insert into t7(id) values(2)`, - `select * from t7 where id = 2`, - testkit.Rows(`2 2`), - }, - { - `insert into t7(id) values(3)`, - `select * from t7 where id = 3`, - testkit.Rows(`3 3`), - }, - - // the following is test for insert multiple values. - { - `insert into t8(n) values(1),(2)`, - `select * from t8 where id = 1`, - testkit.Rows(`1 1`), - }, - { - `;`, - `select * from t8 where id = 2`, - testkit.Rows(`2 2`), - }, - { - `;`, - `select last_insert_id();`, - testkit.Rows(`1`), - }, - // test user rebase and auto alloc mixture. - { - `insert into t8 values(null, 3),(-1, -1),(null,4),(null, 5)`, - `select * from t8 where id = 3`, - testkit.Rows(`3 3`), - }, - // -1 won't rebase allocator here cause -1 < base. - { - `;`, - `select * from t8 where id = -1`, - testkit.Rows(`-1 -1`), - }, - { - `;`, - `select * from t8 where id = 4`, - testkit.Rows(`4 4`), - }, - { - `;`, - `select * from t8 where id = 5`, - testkit.Rows(`5 5`), - }, - { - `;`, - `select last_insert_id();`, - testkit.Rows(`3`), - }, - { - `insert into t8 values(null, 6),(10, 7),(null, 8)`, - `select * from t8 where id = 6`, - testkit.Rows(`6 6`), - }, - // 10 will rebase allocator here. - { - `;`, - `select * from t8 where id = 10`, - testkit.Rows(`10 7`), - }, - { - `;`, - `select * from t8 where id = 11`, - testkit.Rows(`11 8`), - }, - { - `;`, - `select last_insert_id()`, - testkit.Rows(`6`), - }, - // fix bug for last_insert_id should be first allocated id in insert rows (skip the rebase id). - { - `insert into t8 values(100, 9),(null,10),(null,11)`, - `select * from t8 where id = 100`, - testkit.Rows(`100 9`), - }, - { - `;`, - `select * from t8 where id = 101`, - testkit.Rows(`101 10`), - }, - { - `;`, - `select * from t8 where id = 102`, - testkit.Rows(`102 11`), - }, - { - `;`, - `select last_insert_id()`, - testkit.Rows(`101`), - }, - // test with sql_mode: NO_AUTO_VALUE_ON_ZERO. - { - `;`, - `select @@sql_mode`, - testkit.Rows(`ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION`), - }, - { - `;`, - "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION,NO_AUTO_VALUE_ON_ZERO`", - nil, - }, - { - `insert into t8 values (0, 12), (null, 13)`, - `select * from t8 where id = 0`, - testkit.Rows(`0 12`), - }, - { - `;`, - `select * from t8 where id = 103`, - testkit.Rows(`103 13`), - }, - { - `;`, - `select last_insert_id()`, - testkit.Rows(`103`), - }, - // test without sql_mode: NO_AUTO_VALUE_ON_ZERO. - { - `;`, - "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION`", - nil, - }, - // value 0 will be substitute by autoid. - { - `insert into t8 values (0, 14), (null, 15)`, - `select * from t8 where id = 104`, - testkit.Rows(`104 14`), - }, - { - `;`, - `select * from t8 where id = 105`, - testkit.Rows(`105 15`), - }, - { - `;`, - `select last_insert_id()`, - testkit.Rows(`104`), - }, - // last test : auto increment allocation can find in retryInfo. - { - `retry : insert into t8 values (null, 16), (null, 17)`, - `select * from t8 where id = 1000`, - testkit.Rows(`1000 16`), - }, - { - `;`, - `select * from t8 where id = 1001`, - testkit.Rows(`1001 17`), - }, - { - `;`, - `select last_insert_id()`, - // this insert doesn't has the last_insert_id, should be same as the last insert case. - testkit.Rows(`104`), - }, - } - - for _, tt := range tests { - if strings.HasPrefix(tt.insert, "retry : ") { - // it's the last retry insert case, change the sessionVars. - retryInfo := &variable.RetryInfo{Retrying: true} - retryInfo.AddAutoIncrementID(1000) - retryInfo.AddAutoIncrementID(1001) - tk.Session().GetSessionVars().RetryInfo = retryInfo - tk.MustExec(tt.insert[8:]) - tk.Session().GetSessionVars().RetryInfo = &variable.RetryInfo{} - } else { - tk.MustExec(tt.insert) - } - if tt.query == "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION,NO_AUTO_VALUE_ON_ZERO`" || - tt.query == "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION`" { - tk.MustExec(tt.query) - } else { - tk.MustQuery(tt.query).Check(tt.result) - } - } -} - func TestPartitionInsertOnDuplicate(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -1139,75 +710,6 @@ func TestInsertFloatOverflow(t *testing.T) { tk.MustExec("drop table if exists t,t1") } -// TestAutoIDIncrementAndOffset There is a potential issue in MySQL: when the value of auto_increment_offset is greater -// than that of auto_increment_increment, the value of auto_increment_offset is ignored -// (https://dev.mysql.com/doc/refman/8.0/en/replication-options-master.html#sysvar_auto_increment_increment), -// This issue is a flaw of the implementation of MySQL and it doesn't exist in TiDB. -func TestAutoIDIncrementAndOffset(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec(`use test`) - // Test for offset is larger than increment. - tk.Session().GetSessionVars().AutoIncrementIncrement = 5 - tk.Session().GetSessionVars().AutoIncrementOffset = 10 - tk.MustExec(`create table io (a int key auto_increment)`) - tk.MustExec(`insert into io values (null),(null),(null)`) - tk.MustQuery(`select * from io`).Check(testkit.Rows("10", "15", "20")) - tk.MustExec(`drop table io`) - - // Test handle is PK. - tk.MustExec(`create table io (a int key auto_increment)`) - tk.Session().GetSessionVars().AutoIncrementOffset = 10 - tk.Session().GetSessionVars().AutoIncrementIncrement = 2 - tk.MustExec(`insert into io values (),(),()`) - tk.MustQuery(`select * from io`).Check(testkit.Rows("10", "12", "14")) - tk.MustExec(`delete from io`) - - // Test reset the increment. - tk.Session().GetSessionVars().AutoIncrementIncrement = 5 - tk.MustExec(`insert into io values (),(),()`) - tk.MustQuery(`select * from io`).Check(testkit.Rows("15", "20", "25")) - tk.MustExec(`delete from io`) - - tk.Session().GetSessionVars().AutoIncrementIncrement = 10 - tk.MustExec(`insert into io values (),(),()`) - tk.MustQuery(`select * from io`).Check(testkit.Rows("30", "40", "50")) - tk.MustExec(`delete from io`) - - tk.Session().GetSessionVars().AutoIncrementIncrement = 5 - tk.MustExec(`insert into io values (),(),()`) - tk.MustQuery(`select * from io`).Check(testkit.Rows("55", "60", "65")) - tk.MustExec(`drop table io`) - - // Test handle is not PK. - tk.Session().GetSessionVars().AutoIncrementIncrement = 2 - tk.Session().GetSessionVars().AutoIncrementOffset = 10 - tk.MustExec(`create table io (a int, b int auto_increment, key(b))`) - tk.MustExec(`insert into io(b) values (null),(null),(null)`) - // AutoID allocation will take increment and offset into consideration. - tk.MustQuery(`select b from io`).Check(testkit.Rows("10", "12", "14")) - // HandleID allocation will ignore the increment and offset. - tk.MustQuery(`select _tidb_rowid from io`).Check(testkit.Rows("15", "16", "17")) - tk.MustExec(`delete from io`) - - tk.Session().GetSessionVars().AutoIncrementIncrement = 10 - tk.MustExec(`insert into io(b) values (null),(null),(null)`) - tk.MustQuery(`select b from io`).Check(testkit.Rows("20", "30", "40")) - tk.MustQuery(`select _tidb_rowid from io`).Check(testkit.Rows("41", "42", "43")) - - // Test invalid value. - tk.Session().GetSessionVars().AutoIncrementIncrement = -1 - tk.Session().GetSessionVars().AutoIncrementOffset = -2 - tk.MustGetErrMsg(`insert into io(b) values (null),(null),(null)`, - "[autoid:8060]Invalid auto_increment settings: auto_increment_increment: -1, auto_increment_offset: -2, both of them must be in range [1..65535]") - tk.MustExec(`delete from io`) - - tk.Session().GetSessionVars().AutoIncrementIncrement = 65536 - tk.Session().GetSessionVars().AutoIncrementOffset = 65536 - tk.MustGetErrMsg(`insert into io(b) values (null),(null),(null)`, - "[autoid:8060]Invalid auto_increment settings: auto_increment_increment: 65536, auto_increment_offset: 65536, both of them must be in range [1..65535]") -} - // Fix https://github.com/pingcap/tidb/issues/32601. func TestTextTooLongError(t *testing.T) { store := testkit.CreateMockStore(t) diff --git a/executor/plan_replayer.go b/executor/plan_replayer.go index fec3de1867933..ea6ff6155bf44 100644 --- a/executor/plan_replayer.go +++ b/executor/plan_replayer.go @@ -18,14 +18,10 @@ import ( "archive/zip" "bytes" "context" - "crypto/rand" - "encoding/base64" "encoding/json" "fmt" "os" - "path/filepath" "strings" - "time" "github.com/BurntSushi/toml" "github.com/pingcap/errors" @@ -95,44 +91,13 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *PlanReplayerExec) createFile() error { var err error - e.DumpInfo.File, e.DumpInfo.FileName, err = GeneratePlanReplayerFile() + e.DumpInfo.File, e.DumpInfo.FileName, err = domain.GeneratePlanReplayerFile() if err != nil { return err } return nil } -// GeneratePlanReplayerFile generates plan replayer file -func GeneratePlanReplayerFile() (*os.File, string, error) { - path := domain.GetPlanReplayerDirName() - err := os.MkdirAll(path, os.ModePerm) - if err != nil { - return nil, "", errors.AddStack(err) - } - fileName, err := generatePlanReplayerFileName() - if err != nil { - return nil, "", errors.AddStack(err) - } - zf, err := os.Create(filepath.Join(path, fileName)) - if err != nil { - return nil, "", errors.AddStack(err) - } - return zf, fileName, err -} - -func generatePlanReplayerFileName() (string, error) { - // Generate key and create zip file - time := time.Now().UnixNano() - b := make([]byte, 16) - //nolint: gosec - _, err := rand.Read(b) - if err != nil { - return "", err - } - key := base64.URLEncoding.EncodeToString(b) - return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil -} - func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) { fileName := e.FileName zf := e.File diff --git a/expression/expr_to_pb_test.go b/expression/expr_to_pb_test.go index 802d559eb3c80..7047c540fa577 100644 --- a/expression/expr_to_pb_test.go +++ b/expression/expr_to_pb_test.go @@ -558,6 +558,14 @@ func TestExprPushDownToFlash(t *testing.T) { require.NoError(t, err) exprs = append(exprs, function) + // ExtractDuration + extractDurationUnitCol := new(Constant) + extractDurationUnitCol.Value = types.NewStringDatum("microsecond") + extractDurationUnitCol.RetType = types.NewFieldType(mysql.TypeString) + function, err = NewFunction(mock.NewContext(), ast.Extract, types.NewFieldType(mysql.TypeLonglong), extractDurationUnitCol, durationColumn) + require.NoError(t, err) + exprs = append(exprs, function) + // CastIntAsInt function, err = NewFunction(mock.NewContext(), ast.Cast, types.NewFieldType(mysql.TypeLonglong), intColumn) require.NoError(t, err) diff --git a/expression/expression.go b/expression/expression.go index fefa1c403c959..cbfa4bfb0ee24 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -1227,7 +1227,7 @@ func scalarExprSupportedByFlash(function *ScalarFunction) bool { } case ast.Extract: switch function.Function.PbCode() { - case tipb.ScalarFuncSig_ExtractDatetime: + case tipb.ScalarFuncSig_ExtractDatetime, tipb.ScalarFuncSig_ExtractDuration: return true } case ast.Replace: diff --git a/go.mod b/go.mod index 01400b701b85b..c147af6b8506e 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 github.com/pingcap/kvproto v0.0.0-20221103025916-e7e21f0e9cd9 - github.com/pingcap/log v1.1.1-0.20221110065318-21a4942860b3 + github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e github.com/pingcap/tipb v0.0.0-20221020071514-cd933387bcb5 diff --git a/go.sum b/go.sum index 0730b5193bd55..48306f2d8d576 100644 --- a/go.sum +++ b/go.sum @@ -783,8 +783,8 @@ github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= -github.com/pingcap/log v1.1.1-0.20221110065318-21a4942860b3 h1:T7e5Low0BU2ZazI2dz2mh3W1qv+w8wtvq1YR8DneA0c= -github.com/pingcap/log v1.1.1-0.20221110065318-21a4942860b3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= +github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c h1:crhkw6DD+07Bg1wYhW5Piw+kYNKZqFQqfC2puUf6gMI= +github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM= github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops= github.com/pingcap/tipb v0.0.0-20221020071514-cd933387bcb5 h1:Yoo8j5xQGxjlsC3yt0ndsiAz0WZXED9rzsKmEN0U0DY= diff --git a/infoschema/tables.go b/infoschema/tables.go index 656a99f52d838..8df6cab9cb84c 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1729,8 +1729,8 @@ func FormatTiDBVersion(TiDBVersion string, isDefaultVersion bool) string { // The user hasn't set the config 'ServerVersion'. if isDefaultVersion { - nodeVersion = TiDBVersion[strings.LastIndex(TiDBVersion, "TiDB-")+len("TiDB-"):] - if nodeVersion[0] == 'v' { + nodeVersion = TiDBVersion[strings.Index(TiDBVersion, "TiDB-")+len("TiDB-"):] + if len(nodeVersion) > 0 && nodeVersion[0] == 'v' { nodeVersion = nodeVersion[1:] } nodeVersions := strings.Split(nodeVersion, "-") diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 3650f2833a1a6..aee92b0afed70 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -676,8 +676,14 @@ func TestSelectHiddenColumn(t *testing.T) { func TestFormatVersion(t *testing.T) { // Test for defaultVersions. - defaultVersions := []string{"5.7.25-TiDB-None", "5.7.25-TiDB-8.0.18", "5.7.25-TiDB-8.0.18-beta.1", "5.7.25-TiDB-v4.0.0-beta-446-g5268094af"} - defaultRes := []string{"None", "8.0.18", "8.0.18-beta.1", "4.0.0-beta"} + defaultVersions := []string{ + "5.7.25-TiDB-None", + "5.7.25-TiDB-8.0.18", + "5.7.25-TiDB-8.0.18-beta.1", + "5.7.25-TiDB-v4.0.0-beta-446-g5268094af", + "5.7.25-TiDB-", + "5.7.25-TiDB-v4.0.0-TiDB-446"} + defaultRes := []string{"None", "8.0.18", "8.0.18-beta.1", "4.0.0-beta", "", "4.0.0-TiDB"} for i, v := range defaultVersions { version := infoschema.FormatTiDBVersion(v, true) require.Equal(t, defaultRes[i], version) diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 8a8acaa292666..88e8d8f43bffa 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -98,6 +98,7 @@ const ( ActionMultiSchemaChange ActionType = 61 ActionFlashbackCluster ActionType = 62 ActionRecoverSchema ActionType = 63 + ActionReorganizePartition ActionType = 64 ) var actionMap = map[ActionType]string{ @@ -160,6 +161,7 @@ var actionMap = map[ActionType]string{ ActionMultiSchemaChange: "alter table multi-schema change", ActionFlashbackCluster: "flashback cluster", ActionRecoverSchema: "flashback schema", + ActionReorganizePartition: "alter table reorganize partition", // `ActionAlterTableAlterPartition` is removed and will never be used. // Just left a tombstone here for compatibility. diff --git a/planner/core/collect_column_stats_usage.go b/planner/core/collect_column_stats_usage.go index 49fffb149b85d..4a351e60a9018 100644 --- a/planner/core/collect_column_stats_usage.go +++ b/planner/core/collect_column_stats_usage.go @@ -17,7 +17,6 @@ package core import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx/variable" ) const ( @@ -51,7 +50,7 @@ type columnStatsUsageCollector struct { visitedtbls map[int64]struct{} } -func newColumnStatsUsageCollector(collectMode uint64) *columnStatsUsageCollector { +func newColumnStatsUsageCollector(collectMode uint64, enabledPlanCapture bool) *columnStatsUsageCollector { collector := &columnStatsUsageCollector{ collectMode: collectMode, // Pre-allocate a slice to reduce allocation, 8 doesn't have special meaning. @@ -64,7 +63,7 @@ func newColumnStatsUsageCollector(collectMode uint64) *columnStatsUsageCollector if collectMode&collectHistNeededColumns != 0 { collector.histNeededCols = make(map[model.TableItemID]struct{}) } - if variable.EnablePlanReplayerCapture.Load() { + if enabledPlanCapture { collector.collectVisitedTable = true collector.visitedtbls = map[int64]struct{}{} } @@ -300,7 +299,7 @@ func CollectColumnStatsUsage(lp LogicalPlan, predicate, histNeeded bool) ([]mode if histNeeded { mode |= collectHistNeededColumns } - collector := newColumnStatsUsageCollector(mode) + collector := newColumnStatsUsageCollector(mode, lp.SCtx().GetSessionVars().EnablePlanReplayerCapture) collector.collectFromPlan(lp) if collector.collectVisitedTable { recordTableRuntimeStats(lp.SCtx(), collector.visitedtbls) diff --git a/planner/core/plan_replayer_capture_test.go b/planner/core/plan_replayer_capture_test.go index 2e88f090bd784..6778cdba20bbf 100644 --- a/planner/core/plan_replayer_capture_test.go +++ b/planner/core/plan_replayer_capture_test.go @@ -35,7 +35,6 @@ func TestPlanReplayerCaptureRecordJsonStats(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t1(a int)") tk.MustExec("create table t2(a int)") - tk.MustExec("SET global tidb_enable_plan_replayer_capture = ON;") tk.MustExec("analyze table t1") tk.MustExec("analyze table t2") testcases := []struct { @@ -68,6 +67,7 @@ func getTableStats(sql string, t *testing.T, ctx sessionctx.Context, dom *domain err = core.Preprocess(context.Background(), ctx, stmt, core.WithPreprocessorReturn(&core.PreprocessorReturn{InfoSchema: dom.InfoSchema()})) require.NoError(t, err) sctx := core.MockContext() + sctx.GetSessionVars().EnablePlanReplayerCapture = true builder, _ := core.NewPlanBuilder().Init(sctx, dom.InfoSchema(), &hint.BlockHintProcessor{}) domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(dom.InfoSchema()) plan, err := builder.Build(context.TODO(), stmt) diff --git a/session/bootstrap.go b/session/bootstrap.go index 08bf0293db72c..57660af7fa808 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -447,6 +447,7 @@ const ( plan_digest VARCHAR(128) NOT NULL, update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (sql_digest,plan_digest));` + // CreateStatsTableLocked stores the locked tables CreateStatsTableLocked = `CREATE TABLE IF NOT EXISTS mysql.stats_table_locked( table_id bigint(64) NOT NULL, diff --git a/session/session.go b/session/session.go index 4b220af34516a..1b254c879ec40 100644 --- a/session/session.go +++ b/session/session.go @@ -2878,7 +2878,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota) concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency) - ses, err := createSessions(store, 8) + ses, err := createSessions(store, 9) if err != nil { return nil, err } @@ -2953,10 +2953,10 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } // setup plan replayer handle - dom.SetupPlanReplayerHandle(ses[6]) + dom.SetupPlanReplayerHandle(ses[6], ses[7]) dom.StartPlanReplayerHandle() // setup dumpFileGcChecker - dom.SetupDumpFileGCChecker(ses[7]) + dom.SetupDumpFileGCChecker(ses[8]) dom.DumpFileGcCheckerLoop() // A sub context for update table stats, and other contexts for concurrent stats loading. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index b11b727079630..b8fbcf54848e1 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1305,6 +1305,9 @@ type SessionVars struct { // preuseChunkAlloc indicates whether pre statement use chunk alloc // like select @@last_sql_use_alloc preUseChunkAlloc bool + + // EnablePlanReplayerCapture indicates whether enabled plan replayer capture + EnablePlanReplayerCapture bool } // GetNewChunkWithCapacity Attempt to request memory from the chunk pool diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index a592e7a7e8831..d73885663d957 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1039,15 +1039,17 @@ var defaultSysVars = []*SysVar{ }, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { return fmt.Sprintf("%d", MemoryUsageAlarmKeepRecordNum.Load()), nil }}, - {Scope: ScopeGlobal, Name: TiDBEnablePlanReplayerCapture, Value: BoolToOnOff(false), Type: TypeBool, - SetGlobal: func(ctx context.Context, s *SessionVars, val string) error { - EnablePlanReplayerCapture.Store(TiDBOptOn(val)) - return nil - }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { - return strconv.FormatBool(EnablePlanReplayerCapture.Load()), nil - }}, /* The system variables below have GLOBAL and SESSION scope */ + {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePlanReplayerCapture, Value: BoolToOnOff(false), Type: TypeBool, + SetSession: func(s *SessionVars, val string) error { + s.EnablePlanReplayerCapture = TiDBOptOn(val) + return nil + }, + GetSession: func(vars *SessionVars) (string, error) { + return strconv.FormatBool(vars.EnablePlanReplayerCapture), nil + }, + }, {Scope: ScopeGlobal | ScopeSession, Name: TiDBRowFormatVersion, Value: strconv.Itoa(DefTiDBRowFormatV1), Type: TypeUnsigned, MinValue: 1, MaxValue: 2, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLReorgRowFormat(TidbOptInt64(val, DefTiDBRowFormatV2)) return nil diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 80c9b41f4cc6e..3511775de08f1 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1148,9 +1148,8 @@ var ( // DefTiDBServerMemoryLimit indicates the default value of TiDBServerMemoryLimit(TotalMem * 80%). // It should be a const and shouldn't be modified after tidb is started. - DefTiDBServerMemoryLimit = serverMemoryLimitDefaultValue() - GOGCTunerThreshold = atomic.NewFloat64(DefTiDBGOGCTunerThreshold) - EnablePlanReplayerCapture = atomic.NewBool(DefTiDBEnablePlanReplayerCapture) + DefTiDBServerMemoryLimit = serverMemoryLimitDefaultValue() + GOGCTunerThreshold = atomic.NewFloat64(DefTiDBGOGCTunerThreshold) ) var ( diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index 92c3b535ba5d3..054ec83ee7e8a 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -2028,7 +2028,6 @@ func (w *GCWorker) doGCPlacementRules(se session.Session, safePoint uint64, dr u zap.Int64("tableID", id), zap.String("endKey", string(dr.EndKey)), zap.Uint64("safePoint", safePoint)) ruleID := fmt.Sprintf("table-%v-r", id) if err := infosync.DeleteTiFlashPlacementRule(context.Background(), "tiflash", ruleID); err != nil { - // If DeletePlacementRule fails here, the rule will be deleted in `HandlePlacementRuleRoutine`. logutil.BgLogger().Error("delete TiFlash pd rule failed when gc", zap.Error(err), zap.String("ruleID", ruleID), zap.Uint64("safePoint", safePoint)) } else { diff --git a/util/logutil/log.go b/util/logutil/log.go index 83eb44eb9d2cc..a7d59ae0c2d0f 100644 --- a/util/logutil/log.go +++ b/util/logutil/log.go @@ -111,37 +111,30 @@ func InitLogger(cfg *LogConfig) error { return errors.Trace(err) } - _, _, err = initGRPCLogger(cfg) - if err != nil { - return errors.Trace(err) - } - + initGRPCLogger(gl) return nil } -func initGRPCLogger(cfg *LogConfig) (*zap.Logger, *log.ZapProperties, error) { - // Copy Config struct by assignment. - config := cfg.Config - var l *zap.Logger - var err error - var prop *log.ZapProperties +func initGRPCLogger(gl *zap.Logger) { + level := zapcore.ErrorLevel + verbosity := 0 if len(os.Getenv("GRPC_DEBUG")) > 0 { - config.Level = "debug" - l, prop, err = log.InitLogger(&config, zap.AddStacktrace(zapcore.FatalLevel)) - if err != nil { - return nil, nil, errors.Trace(err) - } - gzap.ReplaceGrpcLoggerV2WithVerbosity(l, 999) - } else { - config.Level = "error" - l, prop, err = log.InitLogger(&config, zap.AddStacktrace(zapcore.FatalLevel)) - if err != nil { - return nil, nil, errors.Trace(err) - } - gzap.ReplaceGrpcLoggerV2(l) + verbosity = 999 + level = zapcore.DebugLevel } - return l, prop, nil + newgl := gl.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { + oldcore, ok := core.(*log.TextIOCore) + if !ok { + return oldcore + } + newcore := oldcore.Clone() + leveler := zap.NewAtomicLevel() + leveler.SetLevel(level) + newcore.LevelEnabler = leveler + return newcore + })) + gzap.ReplaceGrpcLoggerV2WithVerbosity(newgl, verbosity) } // ReplaceLogger replace global logger instance with given log config. diff --git a/util/logutil/log_test.go b/util/logutil/log_test.go index d059204973678..57a7786c0e530 100644 --- a/util/logutil/log_test.go +++ b/util/logutil/log_test.go @@ -100,21 +100,6 @@ func TestSetLevel(t *testing.T) { require.Equal(t, zap.DebugLevel, log.GetLevel()) } -func TestGrpcLoggerCreation(t *testing.T) { - level := "info" - conf := NewLogConfig(level, DefaultLogFormat, "", EmptyFileLogConfig, false) - _, p, err := initGRPCLogger(conf) - // assert after init grpc logger, the original conf is not changed - require.Equal(t, conf.Level, level) - require.NoError(t, err) - require.Equal(t, p.Level.Level(), zap.ErrorLevel) - os.Setenv("GRPC_DEBUG", "1") - defer os.Unsetenv("GRPC_DEBUG") - _, newP, err := initGRPCLogger(conf) - require.NoError(t, err) - require.Equal(t, newP.Level.Level(), zap.DebugLevel) -} - func TestSlowQueryLoggerCreation(t *testing.T) { level := "Error" conf := NewLogConfig(level, DefaultLogFormat, "", EmptyFileLogConfig, false)