diff --git a/pkg/config/config.go b/pkg/config/config.go index 6b29b8c5ee8b7..44cfd2613206b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -738,6 +738,9 @@ type Performance struct { // If ForceInitStats is false, tidb can provide service before init stats is finished. Note that during the period // of init stats the optimizer may make bad decisions due to pseudo stats. ForceInitStats bool `toml:"force-init-stats" json:"force-init-stats"` + + // ConcurrentlyInitStats indicates whether to use concurrency to init stats. + ConcurrentlyInitStats bool `toml:"concurrently-init-stats" json:"concurrently-init-stats"` } // PlanCache is the PlanCache section of the config. @@ -1007,6 +1010,7 @@ var defaultConf = Config{ EnableLoadFMSketch: false, LiteInitStats: true, ForceInitStats: true, + ConcurrentlyInitStats: false, }, ProxyProtocol: ProxyProtocol{ Networks: "", diff --git a/pkg/statistics/handle/BUILD.bazel b/pkg/statistics/handle/BUILD.bazel index 5996f489e6eb2..a3cedd175a49c 100644 --- a/pkg/statistics/handle/BUILD.bazel +++ b/pkg/statistics/handle/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//pkg/statistics/handle/cache", "//pkg/statistics/handle/globalstats", "//pkg/statistics/handle/history", + "//pkg/statistics/handle/initstats", "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/storage", diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go index 124185cc608db..f6c58c025e5e6 100644 --- a/pkg/statistics/handle/bootstrap.go +++ b/pkg/statistics/handle/bootstrap.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/cache" + "github.com/pingcap/tidb/pkg/statistics/handle/initstats" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" @@ -74,6 +75,12 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error if err != nil { return nil, err } + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + ls := initstats.NewWorker(rc.Next, h.initStatsMeta4Chunk) + ls.LoadStats(is, tables, rc) + ls.Wait() + return tables, nil + } req := rc.NewChunk(nil) iter := chunk.NewIterator4Chunk(req) for { @@ -231,13 +238,19 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache util. } func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache util.StatsCache) error { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms" rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return errors.Trace(err) } defer terror.Call(rc.Close) + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4ChunkLite) + ls.LoadStats(is, cache, rc) + ls.Wait() + return nil + } + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) req := rc.NewChunk(nil) iter := chunk.NewIterator4Chunk(req) for { @@ -254,13 +267,19 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache util.St } func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache util.StatsCache) error { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms" rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return errors.Trace(err) } defer terror.Call(rc.Close) + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4Chunk) + ls.LoadStats(is, cache, rc) + ls.Wait() + return nil + } + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) req := rc.NewChunk(nil) iter := chunk.NewIterator4Chunk(req) for { @@ -303,13 +322,21 @@ func (*Handle) initStatsTopN4Chunk(cache util.StatsCache, iter *chunk.Iterator4C } func (h *Handle) initStatsTopN(cache util.StatsCache) error { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1" rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return errors.Trace(err) } defer terror.Call(rc.Close) + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) { + h.initStatsTopN4Chunk(cache, iter) + }) + ls.LoadStats(nil, cache, rc) + ls.Wait() + return nil + } + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) req := rc.NewChunk(nil) iter := chunk.NewIterator4Chunk(req) for { @@ -428,24 +455,32 @@ func (*Handle) initStatsBuckets4Chunk(cache util.StatsCache, iter *chunk.Iterato } func (h *Handle) initStatsBuckets(cache util.StatsCache) error { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id" rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return errors.Trace(err) } defer terror.Call(rc.Close) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) { + h.initStatsBuckets4Chunk(cache, iter) + }) + ls.LoadStats(nil, cache, rc) + ls.Wait() + } else { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsBuckets4Chunk(cache, iter) } - h.initStatsBuckets4Chunk(cache, iter) } tables := cache.Values() for _, table := range tables { diff --git a/pkg/statistics/handle/handletest/statstest/BUILD.bazel b/pkg/statistics/handle/handletest/statstest/BUILD.bazel index e14d55030f290..f29409c070428 100644 --- a/pkg/statistics/handle/handletest/statstest/BUILD.bazel +++ b/pkg/statistics/handle/handletest/statstest/BUILD.bazel @@ -9,7 +9,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 9, + shard_count = 12, deps = [ "//pkg/config", "//pkg/parser/model", diff --git a/pkg/statistics/handle/handletest/statstest/stats_test.go b/pkg/statistics/handle/handletest/statstest/stats_test.go index 7c710becadc37..1915e7a6aec96 100644 --- a/pkg/statistics/handle/handletest/statstest/stats_test.go +++ b/pkg/statistics/handle/handletest/statstest/stats_test.go @@ -187,10 +187,38 @@ func testInitStatsMemTrace(t *testing.T) { } func TestInitStatsMemTraceWithLite(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.ConcurrentlyInitStats = false + }) testInitStatsMemTraceFunc(t, true) } func TestInitStatsMemTraceWithoutLite(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.ConcurrentlyInitStats = false + }) + testInitStatsMemTraceFunc(t, false) +} + +func TestInitStatsMemTraceWithConcurrrencyLite(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.ConcurrentlyInitStats = true + }) + testInitStatsMemTraceFunc(t, true) +} + +func TestInitStatsMemTraceWithoutConcurrrencyLite(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.ConcurrentlyInitStats = true + }) testInitStatsMemTraceFunc(t, false) } @@ -266,11 +294,26 @@ func TestInitStats51358(t *testing.T) { } func TestInitStatsVer2(t *testing.T) { - originValue := config.GetGlobalConfig().Performance.LiteInitStats - defer func() { - config.GetGlobalConfig().Performance.LiteInitStats = originValue - }() - config.GetGlobalConfig().Performance.LiteInitStats = false + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + config.GetGlobalConfig().Performance.LiteInitStats = false + config.GetGlobalConfig().Performance.ConcurrentlyInitStats = false + }) + initStatsVer2(t) +} + +func TestInitStatsVer2Concurrency(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + config.GetGlobalConfig().Performance.LiteInitStats = false + config.GetGlobalConfig().Performance.ConcurrentlyInitStats = true + }) + initStatsVer2(t) +} + +func initStatsVer2(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/pkg/statistics/handle/initstats/BUILD.bazel b/pkg/statistics/handle/initstats/BUILD.bazel new file mode 100644 index 0000000000000..897e408f8290a --- /dev/null +++ b/pkg/statistics/handle/initstats/BUILD.bazel @@ -0,0 +1,18 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "initstats", + srcs = ["load_stats.go"], + importpath = "github.com/pingcap/tidb/pkg/statistics/handle/initstats", + visibility = ["//visibility:public"], + deps = [ + "//pkg/infoschema", + "//pkg/kv", + "//pkg/statistics/handle/logutil", + "//pkg/statistics/handle/util", + "//pkg/util", + "//pkg/util/chunk", + "//pkg/util/sqlexec", + "@org_uber_go_zap//:zap", + ], +) diff --git a/pkg/statistics/handle/initstats/load_stats.go b/pkg/statistics/handle/initstats/load_stats.go new file mode 100644 index 0000000000000..831758dcb63ef --- /dev/null +++ b/pkg/statistics/handle/initstats/load_stats.go @@ -0,0 +1,86 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package initstats + +import ( + "context" + "runtime" + "sync" + + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "go.uber.org/zap" +) + +// Worker is used to load stats concurrently. +type Worker struct { + taskFunc func(ctx context.Context, req *chunk.Chunk) error + dealFunc func(is infoschema.InfoSchema, cache statsutil.StatsCache, iter *chunk.Iterator4Chunk) + mu sync.Mutex + wg util.WaitGroupWrapper +} + +// NewWorker creates a new Worker. +func NewWorker( + taskFunc func(ctx context.Context, req *chunk.Chunk) error, + dealFunc func(is infoschema.InfoSchema, cache statsutil.StatsCache, iter *chunk.Iterator4Chunk)) *Worker { + return &Worker{ + taskFunc: taskFunc, + dealFunc: dealFunc, + } +} + +// LoadStats loads stats concurrently when to init stats +func (ls *Worker) LoadStats(is infoschema.InfoSchema, cache statsutil.StatsCache, rc sqlexec.RecordSet) { + concurrency := runtime.GOMAXPROCS(0) + for n := 0; n < concurrency; n++ { + ls.wg.Run(func() { + req := rc.NewChunk(nil) + ls.loadStats(is, cache, req) + }) + } +} + +func (ls *Worker) loadStats(is infoschema.InfoSchema, cache statsutil.StatsCache, req *chunk.Chunk) { + iter := chunk.NewIterator4Chunk(req) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + for { + err := ls.getTask(ctx, req) + if err != nil { + logutil.StatsLogger().Error("load stats failed", zap.Error(err)) + return + } + if req.NumRows() == 0 { + return + } + ls.dealFunc(is, cache, iter) + } +} + +func (ls *Worker) getTask(ctx context.Context, req *chunk.Chunk) error { + ls.mu.Lock() + defer ls.mu.Unlock() + return ls.taskFunc(ctx, req) +} + +// Wait closes the load stats worker. +func (ls *Worker) Wait() { + ls.wg.Wait() +}