Skip to content

Commit

Permalink
*: kill auto analyze (#31724)
Browse files Browse the repository at this point in the history
close #32808
  • Loading branch information
chrysan authored Mar 7, 2022
1 parent b497b49 commit 62e2078
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 19 deletions.
65 changes: 63 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type Domain struct {
isLostConnectionToPD atomicutil.Int32 // !0: true, 0: false.
onClose func()
sysExecutorFactory func(*Domain) (pools.Resource, error)

sysProcesses SysProcesses
}

// loadInfoSchema loads infoschema at startTS.
Expand Down Expand Up @@ -733,6 +735,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio

do.SchemaValidator = NewSchemaValidator(ddlLease, do)
do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit)
do.sysProcesses = SysProcesses{mu: &sync.RWMutex{}, procMap: make(map[uint64]sessionctx.Context)}
return do
}

Expand Down Expand Up @@ -935,6 +938,11 @@ func (do *Domain) SysSessionPool() *sessionPool {
return do.sysSessionPool
}

// SysProcTracker returns the system processes tracker.
func (do *Domain) SysProcTracker() sessionctx.SysProcTracker {
return &do.sysProcesses
}

// GetEtcdClient returns the etcd client.
func (do *Domain) GetEtcdClient() *clientv3.Client {
return do.etcdClient
Expand Down Expand Up @@ -1242,7 +1250,7 @@ func (do *Domain) StatsHandle() *handle.Handle {

// CreateStatsHandle is used only for test.
func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) error {
h, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
h, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool, &do.sysProcesses)
if err != nil {
return err
}
Expand Down Expand Up @@ -1272,7 +1280,7 @@ var RunAutoAnalyze = true
// It should be called only once in BootstrapSession.
func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
statsHandle, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
statsHandle, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool, &do.sysProcesses)
if err != nil {
return err
}
Expand Down Expand Up @@ -1794,3 +1802,56 @@ var (
ErrInfoSchemaChanged = dbterror.ClassDomain.NewStdErr(errno.ErrInfoSchemaChanged,
mysql.Message(errno.MySQLErrName[errno.ErrInfoSchemaChanged].Raw+". "+kv.TxnRetryableMark, nil))
)

// SysProcesses holds the sys processes infos
type SysProcesses struct {
mu *sync.RWMutex
procMap map[uint64]sessionctx.Context
}

// Track tracks the sys process into procMap
func (s *SysProcesses) Track(id uint64, proc sessionctx.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
if oldProc, ok := s.procMap[id]; ok && oldProc != proc {
return errors.Errorf("The ID is in use: %v", id)
}
s.procMap[id] = proc
proc.GetSessionVars().ConnectionID = id
atomic.StoreUint32(&proc.GetSessionVars().Killed, 0)
return nil
}

// UnTrack removes the sys process from procMap
func (s *SysProcesses) UnTrack(id uint64) {
s.mu.Lock()
defer s.mu.Unlock()
if proc, ok := s.procMap[id]; ok {
delete(s.procMap, id)
proc.GetSessionVars().ConnectionID = 0
atomic.StoreUint32(&proc.GetSessionVars().Killed, 0)
}
}

// GetSysProcessList gets list of system ProcessInfo
func (s *SysProcesses) GetSysProcessList() map[uint64]*util.ProcessInfo {
s.mu.RLock()
defer s.mu.RUnlock()
rs := make(map[uint64]*util.ProcessInfo)
for connID, proc := range s.procMap {
// if session is still tracked in this map, it's not returned to sysSessionPool yet
if pi := proc.ShowProcess(); pi != nil && pi.ID == connID {
rs[connID] = pi
}
}
return rs
}

// KillSysProcess kills sys process with specified ID
func (s *SysProcesses) KillSysProcess(id uint64) {
s.mu.Lock()
defer s.mu.Unlock()
if proc, ok := s.procMap[id]; ok {
atomic.StoreUint32(&proc.GetSessionVars().Killed, 1)
}
}
2 changes: 1 addition & 1 deletion executor/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func TestKillStmt(t *testing.T) {
result.Check(testkit.Rows("Warning 1105 Parse ConnectionID failed: Unexpected connectionID excceeds int64"))

// local kill
connID := util.GlobalConnID{Is64bits: true, ServerID: 1, LocalConnID: 101}
connID := util.NewGlobalConnID(1, true)
tk.MustExec("kill " + strconv.FormatUint(connID.ID(), 10))
result = tk.MustQuery("show warnings")
result.Check(testkit.Rows())
Expand Down
25 changes: 18 additions & 7 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,7 @@ func (s *Server) SetDomain(dom *domain.Domain) {

// InitGlobalConnID initialize global connection id.
func (s *Server) InitGlobalConnID(serverIDGetter func() uint64) {
s.globalConnID = util.GlobalConnID{
ServerIDGetter: serverIDGetter,
Is64bits: true,
}
s.globalConnID = util.NewGlobalConnIDWithGetter(serverIDGetter, true)
}

// newConn creates a new *clientConn from a net.Conn.
Expand All @@ -193,7 +190,7 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
clients: make(map[uint64]*clientConn),
globalConnID: util.GlobalConnID{ServerID: 0, Is64bits: true},
globalConnID: util.NewGlobalConnID(0, true),
}
s.capability = defaultCapability
setTxnScope()
Expand Down Expand Up @@ -619,9 +616,22 @@ func (s *Server) checkConnectionCount() error {

// ShowProcessList implements the SessionManager interface.
func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo {
rs := make(map[uint64]*util.ProcessInfo)
for connID, pi := range s.getUserProcessList() {
rs[connID] = pi
}
if s.dom != nil {
for connID, pi := range s.dom.SysProcTracker().GetSysProcessList() {
rs[connID] = pi
}
}
return rs
}

func (s *Server) getUserProcessList() map[uint64]*util.ProcessInfo {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
rs := make(map[uint64]*util.ProcessInfo, len(s.clients))
rs := make(map[uint64]*util.ProcessInfo)
for _, client := range s.clients {
if atomic.LoadInt32(&client.status) == connStatusWaitShutdown {
continue
Expand Down Expand Up @@ -668,7 +678,8 @@ func (s *Server) Kill(connectionID uint64, query bool) {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
conn, ok := s.clients[connectionID]
if !ok {
if !ok && s.dom != nil {
s.dom.SysProcTracker().KillSysProcess(connectionID)
return
}

Expand Down
9 changes: 8 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ type Session interface {
AuthWithoutVerification(user *auth.UserIdentity) bool
AuthPluginForUser(user *auth.UserIdentity) (string, error)
MatchIdentity(username, remoteHost string) (*auth.UserIdentity, error)
ShowProcess() *util.ProcessInfo
// Return the information of the txn current running
TxnInfo() *txninfo.TxnInfo
// PrepareTxnCtx is exported for test.
Expand Down Expand Up @@ -1755,6 +1754,14 @@ func (s *session) withRestrictedSQLExecutor(ctx context.Context, opts []sqlexec.
return nil, nil, errors.Trace(err)
}
defer clean()
if execOption.TrackSysProcID > 0 {
err = execOption.TrackSysProc(execOption.TrackSysProcID, se)
if err != nil {
return nil, nil, errors.Trace(err)
}
// unTrack should be called before clean (return sys session)
defer execOption.UnTrackSysProc(execOption.TrackSysProcID)
}
return fn(ctx, se)
}

Expand Down
10 changes: 10 additions & 0 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ type Context interface {
GetBuiltinFunctionUsage() map[string]uint32
// GetStmtStats returns stmtstats.StatementStats owned by implementation.
GetStmtStats() *stmtstats.StatementStats
// ShowProcess returns ProcessInfo running in current Context
ShowProcess() *util.ProcessInfo
}

type basicCtxType int
Expand Down Expand Up @@ -206,3 +208,11 @@ func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error
}
return nil
}

// SysProcTracker is used to track background sys processes
type SysProcTracker interface {
Track(id uint64, proc Context) error
UnTrack(id uint64)
GetSysProcessList() map[uint64]*util.ProcessInfo
KillSysProcess(id uint64)
}
9 changes: 7 additions & 2 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,20 @@ type Handle struct {

// StatsLoad is used to load stats concurrently
StatsLoad StatsLoad

// sysProcTracker is used to track sys process like analyze
sysProcTracker sessionctx.SysProcTracker
}

func (h *Handle) execRestrictedSQL(ctx context.Context, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
return h.mu.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, params...)
}

func (h *Handle) execRestrictedSQLWithStatsVer(ctx context.Context, statsVer int, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
func (h *Handle) execRestrictedSQLWithStatsVer(ctx context.Context, statsVer int, procTrackID uint64, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
optFuncs := []sqlexec.OptionFuncAlias{
sqlexec.ExecOptionUseSessionPool,
execOptionForAnalyze[statsVer],
sqlexec.ExecOptionWithSysProcTrack(procTrackID, h.sysProcTracker.Track, h.sysProcTracker.UnTrack),
}
return h.mu.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, optFuncs, sql, params...)
}
Expand Down Expand Up @@ -180,13 +184,14 @@ type sessionPool interface {
}

// NewHandle creates a Handle for update stats.
func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (*Handle, error) {
func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tracker sessionctx.SysProcTracker) (*Handle, error) {
cfg := config.GetGlobalConfig()
handle := &Handle{
ddlEventCh: make(chan *util.Event, 100),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)},
pool: pool,
sysProcTracker: tracker,
}
handle.lease.Store(lease)
handle.statsCache.memTracker = memory.NewTracker(memory.LabelForStatsCache, -1)
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func TestVersion(t *testing.T) {
tbl1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
require.NoError(t, err)
tableInfo1 := tbl1.Meta()
h, err := handle.NewHandle(testKit.Session(), time.Millisecond, do.SysSessionPool())
h, err := handle.NewHandle(testKit.Session(), time.Millisecond, do.SysSessionPool(), do.SysProcTracker())
require.NoError(t, err)
unit := oracle.ComposeTS(1, 0)
testKit.MustExec("update mysql.stats_meta set version = ? where table_id = ?", 2*unit, tableInfo1.ID)
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ var execOptionForAnalyze = map[int]sqlexec.OptionFuncAlias{

func (h *Handle) execAutoAnalyze(statsVer int, sql string, params ...interface{}) {
startTime := time.Now()
_, _, err := h.execRestrictedSQLWithStatsVer(context.Background(), statsVer, sql, params...)
_, _, err := h.execRestrictedSQLWithStatsVer(context.Background(), statsVer, util.GetAutoAnalyzeProcID(), sql, params...)
dur := time.Since(startTime)
metrics.AutoAnalyzeHistogram.Observe(dur.Seconds())
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (c *Context) ExecuteInternal(ctx context.Context, sql string, args ...inter
return nil, errors.Errorf("Not Supported.")
}

// ShowProcess implements sessionctx.Context ShowProcess interface.
func (c *Context) ShowProcess() *util.ProcessInfo {
return &util.ProcessInfo{}
}

type mockDDLOwnerChecker struct{}

func (c *mockDDLOwnerChecker) IsOwner() bool { return true }
Expand Down
21 changes: 21 additions & 0 deletions util/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ type GlobalConnID struct {
ServerIDGetter func() uint64
}

// NewGlobalConnID creates GlobalConnID with serverID
func NewGlobalConnID(serverID uint64, is64Bits bool) GlobalConnID {
return GlobalConnID{ServerID: serverID, Is64bits: is64Bits, LocalConnID: reservedLocalConns}
}

// NewGlobalConnIDWithGetter creates GlobalConnID with serverIDGetter
func NewGlobalConnIDWithGetter(serverIDGetter func() uint64, is64Bits bool) GlobalConnID {
return GlobalConnID{ServerIDGetter: serverIDGetter, Is64bits: is64Bits, LocalConnID: reservedLocalConns}
}

const (
// MaxServerID is maximum serverID.
MaxServerID = 1<<22 - 1
Expand Down Expand Up @@ -251,3 +261,14 @@ func ParseGlobalConnID(id uint64) (g GlobalConnID, isTruncated bool, err error)
ServerID: 0,
}, false, nil
}

const (
reservedLocalConns = 200
reservedConnAnalyze = 1
)

// GetAutoAnalyzeProcID returns processID for auto analyze
// TODO support IDs for concurrent auto-analyze
func GetAutoAnalyzeProcID() uint64 {
return reservedConnAnalyze
}
21 changes: 17 additions & 4 deletions util/sqlexec/restricted_sql_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
)
Expand Down Expand Up @@ -54,10 +55,13 @@ type RestrictedSQLExecutor interface {

// ExecOption is a struct defined for ExecRestrictedStmt/SQL option.
type ExecOption struct {
IgnoreWarning bool
SnapshotTS uint64
AnalyzeVer int
UseCurSession bool
IgnoreWarning bool
SnapshotTS uint64
AnalyzeVer int
UseCurSession bool
TrackSysProcID uint64
TrackSysProc func(id uint64, ctx sessionctx.Context) error
UnTrackSysProc func(id uint64)
}

// OptionFuncAlias is defined for the optional parameter of ExecRestrictedStmt/SQL.
Expand Down Expand Up @@ -97,6 +101,15 @@ func ExecOptionWithSnapshot(snapshot uint64) OptionFuncAlias {
}
}

// ExecOptionWithSysProcTrack tells ExecRestrictedStmt/SQL to track sys process.
func ExecOptionWithSysProcTrack(procID uint64, track func(id uint64, ctx sessionctx.Context) error, untrack func(id uint64)) OptionFuncAlias {
return func(option *ExecOption) {
option.TrackSysProcID = procID
option.TrackSysProc = track
option.UnTrackSysProc = untrack
}
}

// GetExecOption applies OptionFuncs and return ExecOption
func GetExecOption(opts []OptionFuncAlias) ExecOption {
var execOption ExecOption
Expand Down

0 comments on commit 62e2078

Please sign in to comment.