Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics/handle: refine the condition of dumping stats delta (#41133) #41170

Open
wants to merge 1 commit into
base: release-5.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
startup --host_jvm_args=-Xmx8g
startup --unlimit_coredumps

run:ci --color=yes

build --announce_rc
build --experimental_guard_against_concurrent_changes
build --experimental_remote_merkle_tree_cache
build --java_language_version=17
build --java_runtime_version=17
build --tool_java_language_version=17
build --tool_java_runtime_version=17
build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution
build:ci --color=yes
build:ci --experimental_remote_cache_compression
build:release --workspace_status_command=./build/print-workspace-status.sh --stamp
build:release --config=ci
build:race --config=ci
build:race --@io_bazel_rules_go//go/config:race --test_env=GORACE=halt_on_error=1 --test_sharding_strategy=disabled

test --test_env=TZ=Asia/Shanghai
test --test_output=errors --test_summary=testcase
test:ci --color=yes --spawn_strategy=local
test:ci --verbose_failures --test_verbose_timeout_warnings
test:ci --test_env=GO_TEST_WRAP_TESTV=1
test:ci --experimental_ui_max_stdouterr_bytes=104857600
test:race --test_timeout=1200,6000,18000,72000

try-import /data/bazel
37 changes: 32 additions & 5 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,43 @@ var (
dumpStatsMaxDuration = time.Hour
)

// needDumpStatsDelta returns true when only updates a small portion of the table and the time since last update
// do not exceed one hour.
func needDumpStatsDelta(h *Handle, id int64, item variable.TableDelta, currentTime time.Time) bool {
// needDumpStatsDelta checks whether to dump stats delta.
// 1. If the table doesn't exist or is a mem table or system table, then return false.
// 2. If the mode is DumpAll, then return true.
// 3. If the stats delta hasn't been dumped in the past hour, then return true.
// 4. If the table stats is pseudo or empty or `Modify Count / Table Count` exceeds the threshold, then return true.
func (h *Handle) needDumpStatsDelta(is infoschema.InfoSchema, mode dumpMode, id int64, item variable.TableDelta, currentTime time.Time) bool {
tbl, ok := h.getTableByPhysicalID(is, id)
if !ok {
return false
}
dbInfo, ok := is.SchemaByTable(tbl.Meta())
if !ok {
return false
}
if util.IsMemOrSysDB(dbInfo.Name.L) {
return false
}
if mode == DumpAll {
return true
}
if item.InitTime.IsZero() {
item.InitTime = currentTime
}
<<<<<<< HEAD
tbl, ok := h.statsCache.Load().(statsCache).tables[id]
if !ok {
// No need to dump if the stats is invalid.
return false
}
=======
>>>>>>> 6f45f81f3d4 (statistics/handle: refine the condition of dumping stats delta (#41133))
if currentTime.Sub(item.InitTime) > dumpStatsMaxDuration {
// Dump the stats to kv at least once an hour.
return true
}
if tbl.Count == 0 || float64(item.Count)/float64(tbl.Count) > DumpStatsDeltaRatio {
statsTbl := h.GetPartitionStats(tbl.Meta(), id)
if statsTbl.Pseudo || statsTbl.Count == 0 || float64(item.Count)/float64(statsTbl.Count) > DumpStatsDeltaRatio {
// Dump the stats when there are many modifications.
return true
}
Expand Down Expand Up @@ -455,9 +476,15 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
h.globalMap.data = deltaMap
h.globalMap.Unlock()
}()
// TODO: pass in do.InfoSchema() to DumpStatsDeltaToKV.
is := func() infoschema.InfoSchema {
h.mu.Lock()
defer h.mu.Unlock()
return h.mu.ctx.GetDomainInfoSchema().(infoschema.InfoSchema)
}()
currentTime := time.Now()
for id, item := range deltaMap {
if mode == DumpDelta && !needDumpStatsDelta(h, id, item, currentTime) {
if !h.needDumpStatsDelta(is, mode, id, item, currentTime) {
continue
}
updated, err := h.dumpTableStatCountToKV(id, item)
Expand Down
298 changes: 298 additions & 0 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2519,3 +2519,301 @@ func (s *testSerialStatsSuite) TestEnableAndDisableColumnTracking(c *C) {
tk.MustExec("set global tidb_enable_column_tracking = 0")
tk.MustQuery("show column_stats_usage where db_name = 'test' and table_name = 't' and last_used_at is not null").Check(testkit.Rows())
}
<<<<<<< HEAD
=======

// TestStatsLockUnlockForAutoAnalyze checks that HandleAutoAnalyze reports no
// work and leaves the cached table stats untouched while `lock stats t` is in
// effect, and that a manual analyze after `unlock stats t` picks up the
// current row count.
func TestStatsLockUnlockForAutoAnalyze(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)

	// Remember every global knob touched below so the deferred function can
	// put back exactly what was there before the test ran.
	oriStart := tk.MustQuery("select @@tidb_auto_analyze_start_time").Rows()[0][0].(string)
	oriEnd := tk.MustQuery("select @@tidb_auto_analyze_end_time").Rows()[0][0].(string)
	oriMinCnt := handle.AutoAnalyzeMinCnt
	handle.AutoAnalyzeMinCnt = 0
	defer func() {
		handle.AutoAnalyzeMinCnt = oriMinCnt
		tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_start_time='%v'", oriStart))
		tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_end_time='%v'", oriEnd))
	}()

	h := dom.StatsHandle()
	tk.MustExec("use test")
	tk.MustExec("create table t (a int)")
	require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
	// 1 + 19 = 20 rows.
	tk.MustExec("insert into t values (1)" + strings.Repeat(", (1)", 19))
	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
	is := dom.InfoSchema()
	require.NoError(t, h.Update(is))
	// To pass the stats.Pseudo check in autoAnalyzeTable
	tk.MustExec("analyze table t")
	tk.MustExec("explain select * from t where a = 1")
	require.NoError(t, h.LoadNeededHistograms())
	tk.MustExec("set global tidb_auto_analyze_start_time='00:00 +0000'")
	tk.MustExec("set global tidb_auto_analyze_end_time='23:59 +0000'")

	// 1 + 10 = 11 more rows, 31 in total; auto analyze is expected to run.
	tk.MustExec("insert into t values (1)" + strings.Repeat(", (1)", 10))
	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
	require.NoError(t, h.Update(is))
	require.True(t, h.HandleAutoAnalyze(is))

	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(t, err)

	tblStats := h.GetTableStats(tbl.Meta())
	for _, col := range tblStats.Columns {
		require.True(t, col.IsStatsInitialized())
	}

	tk.MustExec("lock stats t")

	// 31 - 12 = 19 rows left; with the stats locked, auto analyze must not
	// fire and the cached stats must be the same as before the delete.
	tk.MustExec("delete from t limit 12")
	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
	require.NoError(t, h.Update(is))
	require.False(t, h.HandleAutoAnalyze(is))

	tblStats1 := h.GetTableStats(tbl.Meta())
	require.Equal(t, tblStats, tblStats1)

	tk.MustExec("unlock stats t")

	// 19 - 4 = 15 rows left.
	tk.MustExec("delete from t limit 4")

	rows := tk.MustQuery("select count(*) from t").Rows()
	num, err := strconv.Atoi(rows[0][0].(string))
	require.NoError(t, err)
	require.Equal(t, 15, num)

	tk.MustExec("analyze table t")

	tblStats2 := h.GetTableStats(tbl.Meta())
	require.Equal(t, int64(15), tblStats2.Count)
}

// TestStatsLockForFeedback runs a sequence of queries with query feedback
// enabled and toggles `lock stats t` / `unlock stats t` between them. After
// each query it compares the histogram of the first column of t against the
// expected text: the expectation stays the same for queries issued while the
// stats are locked and differs for the query issued after unlocking.
func TestStatsLockForFeedback(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	testKit := testkit.NewTestKit(t, store)
	testKit.MustExec("use test")

	// TODO(tiancaiamao): query feedback is broken when paging is on.
	testKit.MustExec("set @@tidb_enable_paging = off")

	testKit.MustExec("set @@session.tidb_analyze_version = 0")
	testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))")
	testKit.MustExec("insert into t values (1,2),(2,2),(4,5)")
	testKit.MustExec("analyze table t with 0 topn")
	testKit.MustExec("insert into t values (3,4)")
	for i := 5; i < 20; i++ {
		testKit.MustExec(fmt.Sprintf("insert into t values(%d, %d)", i, i+1))
	}

	h := dom.StatsHandle()
	// Save the feedback-related globals and restore them when the test ends.
	oriProbability := statistics.FeedbackProbability.Load()
	oriNumber := statistics.MaxNumberOfRanges
	oriMinLogCount := handle.MinLogScanCount.Load()
	oriErrorRate := handle.MinLogErrorRate.Load()
	defer func() {
		statistics.FeedbackProbability.Store(oriProbability)
		statistics.MaxNumberOfRanges = oriNumber
		handle.MinLogScanCount.Store(oriMinLogCount)
		handle.MinLogErrorRate.Store(oriErrorRate)
	}()
	statistics.FeedbackProbability.Store(1)
	handle.MinLogScanCount.Store(0)
	handle.MinLogErrorRate.Store(0)
	tests := []struct {
		sql  string
		hist string
	}{
		{
			// Case 0: primary key feedback, executed before any lock is taken.
			sql: "select * from t where t.a <= 4 order by a desc",
			hist: "column:1 ndv:4 totColSize:0\n" +
				"num: 1 lower_bound: -9223372036854775808 upper_bound: 2 repeats: 0 ndv: 0\n" +
				"num: 2 lower_bound: 2 upper_bound: 4 repeats: 0 ndv: 0\n" +
				"num: 1 lower_bound: 4 upper_bound: 4 repeats: 1 ndv: 0",
		},
		// Case 1: executed while the stats of t are locked; the expected
		// histogram is the same as in case 0.
		{
			sql: "select * from t where t.a <= 8 order by a desc",
			hist: "column:1 ndv:4 totColSize:0\n" +
				"num: 1 lower_bound: -9223372036854775808 upper_bound: 2 repeats: 0 ndv: 0\n" +
				"num: 2 lower_bound: 2 upper_bound: 4 repeats: 0 ndv: 0\n" +
				"num: 1 lower_bound: 4 upper_bound: 4 repeats: 1 ndv: 0",
		},
		// Case 2: executed after the stats of t are unlocked; the expected
		// histogram differs from case 1.
		{
			sql: "select * from t where t.a <= 12 order by a desc",
			hist: "column:1 ndv:12 totColSize:0\n" +
				"num: 1 lower_bound: -9223372036854775808 upper_bound: 2 repeats: 0 ndv: 0\n" +
				"num: 2 lower_bound: 2 upper_bound: 4 repeats: 0 ndv: 0\n" +
				"num: 9 lower_bound: 4 upper_bound: 12 repeats: 0 ndv: 0",
		},
		// Case 3: executed while the stats of t are locked again; the expected
		// histogram is the same as in case 2.
		{
			sql: "select * from t",
			hist: "column:1 ndv:12 totColSize:0\n" +
				"num: 1 lower_bound: -9223372036854775808 upper_bound: 2 repeats: 0 ndv: 0\n" +
				"num: 2 lower_bound: 2 upper_bound: 4 repeats: 0 ndv: 0\n" +
				"num: 9 lower_bound: 4 upper_bound: 12 repeats: 0 ndv: 0",
		},
	}
	is := dom.InfoSchema()
	table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	// Fail right away if the table cannot be resolved, before any query runs.
	require.NoError(t, err)
	tblInfo := table.Meta()
	for i, test := range tests {
		testKit.MustQuery(test.sql)
		require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
		require.NoError(t, h.DumpStatsFeedbackToKV())
		require.NoError(t, h.HandleUpdateStats(dom.InfoSchema()))
		require.NoError(t, h.Update(is))
		tbl := h.GetTableStats(tblInfo)
		require.Equal(t, test.hist, tbl.Columns[tblInfo.Columns[0].ID].ToString(0))
		// Lock after cases 0 and 2, unlock after case 1, so that cases 1 and 3
		// run locked and case 2 runs unlocked.
		switch i {
		case 0, 2:
			testKit.MustExec("lock stats t")
		case 1:
			testKit.MustExec("unlock stats t")
		}
	}
}

// TestStatsLockForDelta checks how dumping stats deltas interacts with
// `lock stats`: while t1 is locked its Count is expected to stay 0 even after
// inserts, a dump and an analyze, whereas the unlocked table t2 reflects its
// inserted rows. After `unlock stats t1`, analyze and later delta dumps are
// expected to update t1's Count.
func TestStatsLockForDelta(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	testKit := testkit.NewTestKit(t, store)
	testKit.MustExec("use test")
	testKit.MustExec("set @@session.tidb_analyze_version = 1")
	testKit.MustExec("create table t1 (c1 int, c2 int)")
	testKit.MustExec("create table t2 (c1 int, c2 int)")

	is := dom.InfoSchema()
	tbl1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
	require.NoError(t, err)
	tableInfo1 := tbl1.Meta()
	h := dom.StatsHandle()

	testKit.MustExec("lock stats t1")

	rowCount1 := 10
	rowCount2 := 20
	for i := 0; i < rowCount1; i++ {
		testKit.MustExec("insert into t1 values(1, 2)")
	}
	for i := 0; i < rowCount2; i++ {
		testKit.MustExec("insert into t2 values(1, 2)")
	}

	// Consume the two DDL events produced by the two `create table` statements.
	err = h.HandleDDLEvent(<-h.DDLEventCh())
	require.NoError(t, err)
	err = h.HandleDDLEvent(<-h.DDLEventCh())
	require.NoError(t, err)

	// t1 is locked: its delta must not show up in the stats.
	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
	require.NoError(t, h.Update(is))
	stats1 := h.GetTableStats(tableInfo1)
	require.Equal(t, int64(0), stats1.Count)

	// t2 is not locked: its delta is visible.
	tbl2, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
	require.NoError(t, err)
	tableInfo2 := tbl2.Meta()
	stats2 := h.GetTableStats(tableInfo2)
	require.Equal(t, int64(rowCount2), stats2.Count)

	// Still locked: neither analyze nor another dump is expected to change t1.
	testKit.MustExec("analyze table t1")
	for i := 0; i < rowCount1; i++ {
		testKit.MustExec("insert into t1 values(1, 2)")
	}
	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
	require.NoError(t, h.Update(is))
	stats1 = h.GetTableStats(tableInfo1)
	require.Equal(t, int64(0), stats1.Count)

	testKit.MustExec("unlock stats t1")

	// Unlocked: analyze sees the 20 rows inserted so far.
	testKit.MustExec("analyze table t1")
	stats1 = h.GetTableStats(tableInfo1)
	require.Equal(t, int64(20), stats1.Count)

	// Unlocked: 10 more rows are picked up by the next delta dump.
	for i := 0; i < rowCount1; i++ {
		testKit.MustExec("insert into t1 values(1, 2)")
	}
	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
	require.NoError(t, h.Update(is))
	stats1 = h.GetTableStats(tableInfo1)
	require.Equal(t, int64(30), stats1.Count)
}

// TestFillMissingStatsMeta starts from an empty mysql.stats_meta and checks
// that dumping stats deltas in DumpDelta mode produces one row per physical
// table ID with the expected modify_count and count, for a plain table (t1),
// for the partitions of t2, and for the table-level ID of t2. It also checks
// that the version column grows between consecutive dumps.
func TestFillMissingStatsMeta(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t1 (a int, b int)")
	tk.MustExec("create table t2 (a int, b int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (maxvalue))")

	// Nothing has been written to stats_meta yet.
	tk.MustQuery("select * from mysql.stats_meta").Check(testkit.Rows())

	is := dom.InfoSchema()
	plainTbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
	require.NoError(t, err)
	plainID := plainTbl.Meta().ID
	partTbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
	require.NoError(t, err)
	partInfo := partTbl.Meta()
	globalID := partInfo.ID
	require.Len(t, partInfo.Partition.Definitions, 2)
	firstPartID := partInfo.Partition.Definitions[0].ID
	secondPartID := partInfo.Partition.Definitions[1].ID
	h := dom.StatsHandle()

	// statsMetaVersion asserts that exactly one stats_meta row exists for id
	// with the given modify_count and count, and returns that row's version.
	statsMetaVersion := func(id int64, wantModifyCount, wantCount string) int64 {
		query := fmt.Sprintf("select version, modify_count, count from mysql.stats_meta where table_id = %v", id)
		result := tk.MustQuery(query).Rows()
		require.Len(t, result, 1)
		row := result[0]
		version, parseErr := strconv.ParseInt(row[0].(string), 10, 64)
		require.NoError(t, parseErr)
		require.Equal(t, wantModifyCount, row[1])
		require.Equal(t, wantCount, row[2])
		return version
	}

	// dumpAndReload persists pending deltas and refreshes the stats cache.
	dumpAndReload := func() {
		require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpDelta))
		require.NoError(t, h.Update(is))
	}

	tk.MustExec("insert into t1 values (1, 2), (3, 4)")
	dumpAndReload()
	plainVerBefore := statsMetaVersion(plainID, "2", "2")
	tk.MustExec("delete from t1 where a = 1")
	dumpAndReload()
	plainVerAfter := statsMetaVersion(plainID, "3", "1")
	require.Greater(t, plainVerAfter, plainVerBefore)

	tk.MustExec("insert into t2 values (1, 2), (3, 4)")
	dumpAndReload()
	statsMetaVersion(firstPartID, "2", "2")
	globalVerBefore := statsMetaVersion(globalID, "2", "2")
	tk.MustExec("insert into t2 values (11, 12)")
	dumpAndReload()
	statsMetaVersion(secondPartID, "1", "1")
	globalVerAfter := statsMetaVersion(globalID, "3", "3")
	require.Greater(t, globalVerAfter, globalVerBefore)
}

// TestNotDumpSysTable checks that a modification of mysql.stats_meta itself
// does not lead to a stats_meta row for that table, even when deltas are
// dumped in DumpAll mode.
func TestNotDumpSysTable(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	testKit := testkit.NewTestKit(t, store)
	testKit.MustExec("use test")
	testKit.MustExec("create table t1 (a int, b int)")

	statsHandle := dom.StatsHandle()
	ddlEvent := <-statsHandle.DDLEventCh()
	require.NoError(t, statsHandle.HandleDDLEvent(ddlEvent))
	testKit.MustQuery("select count(1) from mysql.stats_meta").Check(testkit.Rows("1"))

	// The delete below creates a delta for mysql.stats_meta; the dump that
	// follows is expected to leave that delta out.
	testKit.MustExec("delete from mysql.stats_meta")
	require.NoError(t, statsHandle.DumpStatsDeltaToKV(handle.DumpAll))

	schema := dom.InfoSchema()
	statsMeta, err := schema.TableByName(model.NewCIStr("mysql"), model.NewCIStr("stats_meta"))
	require.NoError(t, err)
	statsMetaID := statsMeta.Meta().ID
	lookup := fmt.Sprintf("select * from mysql.stats_meta where table_id = %v", statsMetaID)
	testKit.MustQuery(lookup).Check(testkit.Rows())
}
>>>>>>> 6f45f81f3d4 (statistics/handle: refine the condition of dumping stats delta (#41133))