Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Dec 1, 2022
2 parents e1c8bbb + 8bbf75d commit aa406ed
Show file tree
Hide file tree
Showing 21 changed files with 378 additions and 95 deletions.
10 changes: 9 additions & 1 deletion br/pkg/streamhelper/spans/sorted.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,19 @@ func (f *ValuedFull) mergeWithOverlap(val Valued, overlapped []Valued, newItems

// overlapped inserts the overlapped ranges of the span into the `result` slice.
func (f *ValuedFull) overlapped(k Span, result *[]Valued) {
var first Span
var (
first Span
hasFirst bool
)
// Firstly, let's find whether there is a overlapped region with less start key.
f.inner.DescendLessOrEqual(Valued{Key: k}, func(item btree.Item) bool {
first = item.(Valued).Key
hasFirst = true
return false
})
if !hasFirst || !Overlaps(first, k) {
first = k
}

f.inner.AscendGreaterOrEqual(Valued{Key: first}, func(item btree.Item) bool {
r := item.(Valued)
Expand Down
37 changes: 37 additions & 0 deletions br/pkg/streamhelper/spans/sorted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,43 @@ func TestSubRange(t *testing.T) {
kv(s("0008", ""), 42),
},
},
{
Range: []spans.Span{
s("0001", "0004"),
s("0005", "0008"),
},
InputSequence: []spans.Valued{
kv(s("0001", "0002"), 42),
kv(s("0002", "0008"), 43),
kv(s("0004", "0007"), 45),
kv(s("0000", "00015"), 48),
},
Result: []spans.Valued{
kv(s("0001", "00015"), 48),
kv(s("00015", "0002"), 42),
kv(s("0002", "0004"), 43),
kv(s("0005", "0007"), 45),
kv(s("0007", "0008"), 43),
},
},
{
Range: []spans.Span{
s("0001", "0004"),
s("0005", "0008"),
},
InputSequence: []spans.Valued{
kv(s("0004", "0008"), 32),
kv(s("00041", "0007"), 33),
kv(s("0004", "00041"), 99999),
kv(s("0005", "0006"), 34),
},
Result: []spans.Valued{
kv(s("0001", "0004"), 0),
kv(s("0005", "0006"), 34),
kv(s("0006", "0007"), 33),
kv(s("0007", "0008"), 32),
},
},
}

for i, c := range cases {
Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"domain.go",
"domain_sysvars.go",
"domainctx.go",
"historical_stats.go",
"optimize_trace.go",
"plan_replayer.go",
"plan_replayer_dump.go",
Expand Down
49 changes: 49 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type Domain struct {
planReplayerHandle *planReplayerHandle
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
historicalStatsWorker *HistoricalStatsWorker

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -1586,6 +1587,14 @@ func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.C
}
}

// SetupHistoricalStatsWorker setups worker
func (do *Domain) SetupHistoricalStatsWorker(ctx sessionctx.Context) {
do.historicalStatsWorker = &HistoricalStatsWorker{
tblCH: make(chan int64, 16),
sctx: ctx,
}
}

// SetupDumpFileGCChecker setup sctx
func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) {
do.dumpFileGcChecker.setupSctx(ctx)
Expand All @@ -1595,13 +1604,19 @@ var planReplayerHandleLease atomic.Uint64

func init() {
planReplayerHandleLease.Store(uint64(10 * time.Second))
enableDumpHistoricalStats.Store(true)
}

// DisablePlanReplayerBackgroundJob4Test disable plan replayer handle for test
func DisablePlanReplayerBackgroundJob4Test() {
planReplayerHandleLease.Store(0)
}

// DisableDumpHistoricalStats4Test disable historical dump worker for test
func DisableDumpHistoricalStats4Test() {
enableDumpHistoricalStats.Store(false)
}

// StartPlanReplayerHandle start plan replayer handle job
func (do *Domain) StartPlanReplayerHandle() {
lease := planReplayerHandleLease.Load()
Expand Down Expand Up @@ -1673,6 +1688,40 @@ func (do *Domain) DumpFileGcCheckerLoop() {
}()
}

// GetHistoricalStatsWorker gets historical workers
func (do *Domain) GetHistoricalStatsWorker() *HistoricalStatsWorker {
return do.historicalStatsWorker
}

// EnableDumpHistoricalStats used to control whether enbale dump stats for unit test
var enableDumpHistoricalStats atomic.Bool

// StartHistoricalStatsWorker start historical workers running
func (do *Domain) StartHistoricalStatsWorker() {
if !enableDumpHistoricalStats.Load() {
return
}
do.wg.Add(1)
go func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("HistoricalStatsWorker exited.")
util.Recover(metrics.LabelDomain, "HistoricalStatsWorkerLoop", nil, false)
}()
for {
select {
case <-do.exit:
return
case tblID := <-do.historicalStatsWorker.tblCH:
err := do.historicalStatsWorker.DumpHistoricalStats(tblID, do.StatsHandle())
if err != nil {
logutil.BgLogger().Warn("dump historical stats failed", zap.Error(err), zap.Int64("tableID", tblID))
}
}
}
}()
}

// StatsHandle returns the statistic handle.
func (do *Domain) StatsHandle() *handle.Handle {
return (*handle.Handle)(atomic.LoadPointer(&do.statsHandle))
Expand Down
63 changes: 63 additions & 0 deletions domain/historical_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics/handle"
)

// HistoricalStatsWorker indicates for dump historical stats
type HistoricalStatsWorker struct {
tblCH chan int64
sctx sessionctx.Context
}

// SendTblToDumpHistoricalStats send tableID to worker to dump historical stats
func (w *HistoricalStatsWorker) SendTblToDumpHistoricalStats(tableID int64) {
w.tblCH <- tableID
}

// DumpHistoricalStats dump stats by given tableID
func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle *handle.Handle) error {
historicalStatsEnabled, err := statsHandle.CheckHistoricalStatsEnable()
if err != nil {
return errors.Errorf("check tidb_enable_historical_stats failed: %v", err)
}
if !historicalStatsEnabled {
return nil
}
sctx := w.sctx
is := GetDomain(sctx).InfoSchema()
tbl, existed := is.TableByID(tableID)
if !existed {
return errors.Errorf("cannot get table by id %d", tableID)
}
tblInfo := tbl.Meta()
dbInfo, existed := is.SchemaByTable(tblInfo)
if !existed {
return errors.Errorf("cannot get DBInfo by TableID %d", tableID)
}
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil {
return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O)
}
return nil
}

// GetOneHistoricalStatsTable gets one tableID from channel, only used for test
func (w *HistoricalStatsWorker) GetOneHistoricalStatsTable() int64 {
return <-w.tblCH
}
16 changes: 2 additions & 14 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,20 +267,8 @@ func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error {
if !historicalStatsEnabled {
return nil
}

is := domain.GetDomain(sctx).InfoSchema()
tbl, existed := is.TableByID(tableID)
if !existed {
return errors.Errorf("cannot get table by id %d", tableID)
}
tblInfo := tbl.Meta()
dbInfo, existed := is.SchemaByTable(tblInfo)
if !existed {
return errors.Errorf("cannot get DBInfo by TableID %d", tableID)
}
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil {
return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O)
}
historicalStatsWorker := domain.GetDomain(sctx).GetHistoricalStatsWorker()
historicalStatsWorker.SendTblToDumpHistoricalStats(tableID)
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions executor/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2192,6 +2192,11 @@ func TestRecordHistoryStatsAfterAnalyze(t *testing.T) {
tk.MustExec("set global tidb_enable_historical_stats = 1")
defer tk.MustExec("set global tidb_enable_historical_stats = 0")
tk.MustExec("analyze table t with 2 topn")
// dump historical stats
hsWorker := dom.GetHistoricalStatsWorker()
tblID := hsWorker.GetOneHistoricalStatsTable()
err = hsWorker.DumpHistoricalStats(tblID, h)
require.Nil(t, err)
rows = tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Rows()
num, _ = strconv.Atoi(rows[0][0].(string))
require.GreaterOrEqual(t, num, 1)
Expand Down
5 changes: 4 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2950,7 +2950,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {

analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency)
ses, err := createSessions(store, 9)
ses, err := createSessions(store, 10)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3030,6 +3030,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
// setup dumpFileGcChecker
dom.SetupDumpFileGCChecker(ses[8])
dom.DumpFileGcCheckerLoop()
// setup historical stats worker
dom.SetupHistoricalStatsWorker(ses[9])
dom.StartHistoricalStatsWorker()

// A sub context for update table stats, and other contexts for concurrent stats loading.
cnt := 1 + concurrency
Expand Down
1 change: 1 addition & 0 deletions testkit/mockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma
session.SetSchemaLease(lease)
session.DisableStats4Test()
domain.DisablePlanReplayerBackgroundJob4Test()
domain.DisableDumpHistoricalStats4Test()
dom, err := session.BootstrapSession(store)
require.NoError(t, err)

Expand Down
36 changes: 36 additions & 0 deletions ttl/cache/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "cache",
srcs = ["table.go"],
importpath = "github.com/pingcap/tidb/ttl/cache",
visibility = ["//visibility:public"],
deps = [
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//table/tables",
"//ttl/session",
"//types",
"//util/chunk",
"@com_github_pingcap_errors//:errors",
],
)

go_test(
name = "cache_test",
srcs = [
"main_test.go",
"table_test.go",
],
flaky = True,
deps = [
":cache",
"//parser/model",
"//testkit",
"//testkit/testsetup",
"//ttl/session",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
)
2 changes: 1 addition & 1 deletion ttl/main_test.go → ttl/cache/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package ttl_test
package cache_test

import (
"testing"
Expand Down
5 changes: 3 additions & 2 deletions ttl/table.go → ttl/cache/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package ttl
package cache

import (
"context"
Expand All @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)
Expand Down Expand Up @@ -133,7 +134,7 @@ func (t *PhysicalTable) ValidateKey(key []types.Datum) error {
}

// EvalExpireTime returns the expired time
func (t *PhysicalTable) EvalExpireTime(ctx context.Context, se Session, now time.Time) (expire time.Time, err error) {
func (t *PhysicalTable) EvalExpireTime(ctx context.Context, se session.Session, now time.Time) (expire time.Time, err error) {
tz := se.GetSessionVars().TimeZone

expireExpr := t.TTLInfo.IntervalExprStr
Expand Down
Loading

0 comments on commit aa406ed

Please sign in to comment.