Skip to content

Commit

Permalink
session, executor: support setting tidb_enable_stmt_summary in sessio…
Browse files Browse the repository at this point in the history
…n scope (pingcap#12217) (pingcap#12308)

cherry-pick 12308 to release-3.1
  • Loading branch information
djshow832 committed Sep 30, 2019
1 parent 1877cb7 commit dd3ad59
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 31 deletions.
2 changes: 1 addition & 1 deletion domain/global_vars_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func checkEnableStmtSummary(rows []chunk.Row, fields []*ast.ResultField) {
}
}

stmtsummary.OnEnableStmtSummaryModified(sVal)
stmtsummary.StmtSummaryByDigestMap.SetEnabled(sVal, false)
break
}
}
Expand Down
8 changes: 4 additions & 4 deletions domain/global_vars_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package domain

import (
"sync/atomic"
"time"

. "github.com/pingcap/check"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/testleak"
)

Expand Down Expand Up @@ -127,18 +127,18 @@ func (gvcSuite *testGVCSuite) TestCheckEnableStmtSummary(c *C) {
Collate: charset.CollationBin,
}

atomic.StoreInt32(&variable.EnableStmtSummary, 0)
stmtsummary.StmtSummaryByDigestMap.SetEnabled("0", false)
ck := chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024)
ck.AppendString(0, variable.TiDBEnableStmtSummary)
ck.AppendString(1, "1")
row := ck.GetRow(0)
gvc.Update([]chunk.Row{row}, []*ast.ResultField{rf, rf1})
c.Assert(atomic.LoadInt32(&variable.EnableStmtSummary), Equals, int32(1))
c.Assert(stmtsummary.StmtSummaryByDigestMap.Enabled(), Equals, true)

ck = chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024)
ck.AppendString(0, variable.TiDBEnableStmtSummary)
ck.AppendString(1, "0")
row = ck.GetRow(0)
gvc.Update([]chunk.Row{row}, []*ast.ResultField{rf, rf1})
c.Assert(atomic.LoadInt32(&variable.EnableStmtSummary), Equals, int32(0))
c.Assert(stmtsummary.StmtSummaryByDigestMap.Enabled(), Equals, false)
}
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
// SummaryStmt collects statements for performance_schema.events_statements_summary_by_digest
func (a *ExecStmt) SummaryStmt() {
sessVars := a.Ctx.GetSessionVars()
if sessVars.InRestrictedSQL || atomic.LoadInt32(&variable.EnableStmtSummary) == 0 {
if sessVars.InRestrictedSQL || !stmtsummary.StmtSummaryByDigestMap.Enabled() {
return
}
stmtCtx := sessVars.StmtCtx
Expand Down
13 changes: 9 additions & 4 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/stmtsummary"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -119,6 +120,7 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
if sysVar.Scope == variable.ScopeNone {
return errors.Errorf("Variable '%s' is a read only variable", name)
}
var valStr string
if v.IsGlobal {
// Set global scope system variable.
if sysVar.Scope&variable.ScopeGlobal == 0 {
Expand All @@ -131,18 +133,18 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
if value.IsNull() {
value.SetString("")
}
svalue, err := value.ToString()
valStr, err = value.ToString()
if err != nil {
return err
}
err = sessionVars.GlobalVarsAccessor.SetGlobalSysVar(name, svalue)
err = sessionVars.GlobalVarsAccessor.SetGlobalSysVar(name, valStr)
if err != nil {
return err
}
err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
auditPlugin := plugin.DeclareAuditManifest(p.Manifest)
if auditPlugin.OnGlobalVariableEvent != nil {
auditPlugin.OnGlobalVariableEvent(context.Background(), e.ctx.GetSessionVars(), name, svalue)
auditPlugin.OnGlobalVariableEvent(context.Background(), e.ctx.GetSessionVars(), name, valStr)
}
return nil
})
Expand Down Expand Up @@ -179,7 +181,6 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
sessionVars.SnapshotTS = oldSnapshotTS
return err
}
var valStr string
if value.IsNull() {
valStr = "NULL"
} else {
Expand All @@ -190,6 +191,10 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
logutil.Logger(context.Background()).Info("set session var", zap.Uint64("conn", sessionVars.ConnectionID), zap.String("name", name), zap.String("val", valStr))
}

if name == variable.TiDBEnableStmtSummary {
stmtsummary.StmtSummaryByDigestMap.SetEnabled(valStr, !v.IsGlobal)
}

return nil
}

Expand Down
34 changes: 33 additions & 1 deletion infoschema/perfschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
).Check(testkit.Rows("test 4 4 insert into t values(1, 'a')"))

// Disable it again
tk.MustExec("set global tidb_enable_stmt_summary = 0")
tk.MustExec("set global tidb_enable_stmt_summary = false")
tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("0"))

// Create a new session to test
Expand All @@ -122,4 +122,36 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
tk.MustQuery(`select schema_name, exec_count, sum_rows_affected, query_sample_text
from performance_schema.events_statements_summary_by_digest`,
).Check(testkit.Rows())

// Enable it in session scope
tk.MustExec("set session tidb_enable_stmt_summary = on")
// It should work immediately
tk.MustQuery("select * from t where a=2")
tk.MustQuery(`select schema_name, exec_count, sum_rows_affected, query_sample_text
from performance_schema.events_statements_summary_by_digest
where digest_text like 'select * from t%'`,
).Check(testkit.Rows("test 1 0 select * from t where a=2"))

// Disable it in global scope
tk.MustExec("set global tidb_enable_stmt_summary = off")

// Create a new session to test
tk = testkit.NewTestKitWithInit(c, s.store)

tk.MustQuery("select * from t where a=2")

// Statement summary is still enabled
tk.MustQuery(`select schema_name, exec_count, sum_rows_affected, query_sample_text
from performance_schema.events_statements_summary_by_digest
where digest_text like 'select * from t%'`,
).Check(testkit.Rows("test 2 0 select * from t where a=2"))

// Unset session variable
tk.MustExec("set session tidb_enable_stmt_summary = ''")
tk.MustQuery("select * from t where a=2")

// Statement summary is disabled
tk.MustQuery(`select schema_name, exec_count, sum_rows_affected, query_sample_text
from performance_schema.events_statements_summary_by_digest`,
).Check(testkit.Rows())
}
3 changes: 2 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,8 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)},
{ScopeSession, TiDBLowResolutionTSO, "0"},
{ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)},
{ScopeGlobal, TiDBEnableStmtSummary, BoolToIntStr(DefTiDBEnableStmtSummary)},
{ScopeSession, TiDBAllowRemoveAutoInc, BoolToIntStr(DefTiDBAllowRemoveAutoInc)},
{ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
3 changes: 1 addition & 2 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ const (
DefTiDBWaitSplitRegionFinish = true
DefTiDBExpensiveQueryTimeThreshold = 60 // 60s
DefWaitSplitRegionTimeout = 300 // 300s
DefTiDBEnableStmtSummary = false
DefTiDBAllowRemoveAutoInc = false
)

// Process global variables.
Expand All @@ -367,5 +367,4 @@ var (
MaxOfMaxAllowedPacket uint64 = 1073741824
ExpensiveQueryTimeThreshold uint64 = DefTiDBExpensiveQueryTimeThreshold
MinExpensiveQueryTimeThreshold uint64 = 10 //10s
EnableStmtSummary int32 = BoolToInt32(DefTiDBEnableStmtSummary)
)
20 changes: 19 additions & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze,
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming,
TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction,
TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO, TiDBScatterRegion, TiDBEnableStmtSummary:
TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO, TiDBScatterRegion:
if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" {
return value, nil
}
Expand Down Expand Up @@ -569,6 +569,24 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
default:
return value, ErrWrongValueForVar.GenWithStackByArgs(TiDBTxnMode, value)
}
case TiDBAllowRemoveAutoInc:
switch {
case strings.EqualFold(value, "ON") || value == "1":
return "on", nil
case strings.EqualFold(value, "OFF") || value == "0":
return "off", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBEnableStmtSummary:
switch {
case strings.EqualFold(value, "ON") || value == "1":
return "1", nil
case strings.EqualFold(value, "OFF") || value == "0":
return "0", nil
case value == "":
return "", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
return value, nil
}
Expand Down
85 changes: 74 additions & 11 deletions util/stmtsummary/statement_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ package stmtsummary
import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/kvcache"
Expand Down Expand Up @@ -55,6 +53,15 @@ type stmtSummaryByDigestMap struct {
// It's rare to read concurrently, so RWMutex is not needed.
sync.Mutex
summaryMap *kvcache.SimpleLRUCache

// enabledWrapper encapsulates variables needed to judge whether statement summary is enabled.
enabledWrapper struct {
sync.RWMutex
// enabled indicates whether statement summary is enabled in current server.
sessionEnabled string
// setInSession indicates whether statement summary has been set in any session.
globalEnabled string
}
}

// StmtSummaryByDigestMap is a global map containing all statement summaries.
Expand Down Expand Up @@ -97,9 +104,13 @@ type StmtExecInfo struct {
// newStmtSummaryByDigestMap creates an empty stmtSummaryByDigestMap.
func newStmtSummaryByDigestMap() *stmtSummaryByDigestMap {
maxStmtCount := config.GetGlobalConfig().StmtSummary.MaxStmtCount
return &stmtSummaryByDigestMap{
ssMap := &stmtSummaryByDigestMap{
summaryMap: kvcache.NewSimpleLRUCache(maxStmtCount, 0, 0),
}
// enabledWrapper.defaultEnabled will be initialized in package variable.
ssMap.enabledWrapper.sessionEnabled = ""
ssMap.enabledWrapper.globalEnabled = ""
return ssMap
}

// newStmtSummaryByDigest creates a stmtSummaryByDigest from StmtExecInfo
Expand Down Expand Up @@ -164,7 +175,7 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {

ssMap.Lock()
// Check again. Statements could be added before disabling the flag and after Clear()
if atomic.LoadInt32(&variable.EnableStmtSummary) == 0 {
if !ssMap.Enabled() {
ssMap.Unlock()
return
}
Expand All @@ -188,7 +199,7 @@ func (ssMap *stmtSummaryByDigestMap) Clear() {
ssMap.Unlock()
}

// Convert statement summary to Datum
// ToDatum converts statement summary to Datum
func (ssMap *stmtSummaryByDigestMap) ToDatum() [][]types.Datum {
ssMap.Lock()
values := ssMap.summaryMap.Values()
Expand Down Expand Up @@ -219,12 +230,64 @@ func (ssMap *stmtSummaryByDigestMap) ToDatum() [][]types.Datum {
return rows
}

// OnEnableStmtSummaryModified is triggered once EnableStmtSummary is modified.
func OnEnableStmtSummaryModified(newValue string) {
if variable.TiDBOptOn(newValue) {
atomic.StoreInt32(&variable.EnableStmtSummary, 1)
// SetEnabled enables or disables statement summary in global(cluster) or session(server) scope.
func (ssMap *stmtSummaryByDigestMap) SetEnabled(value string, inSession bool) {
value = ssMap.normalizeEnableValue(value)

ssMap.enabledWrapper.Lock()
if inSession {
ssMap.enabledWrapper.sessionEnabled = value
} else {
ssMap.enabledWrapper.globalEnabled = value
}
sessionEnabled := ssMap.enabledWrapper.sessionEnabled
globalEnabled := ssMap.enabledWrapper.globalEnabled
ssMap.enabledWrapper.Unlock()

// Clear all summaries once statement summary is disabled.
var needClear bool
if ssMap.isSet(sessionEnabled) {
needClear = !ssMap.isEnabled(sessionEnabled)
} else {
needClear = !ssMap.isEnabled(globalEnabled)
}
if needClear {
ssMap.Clear()
}
}

// Enabled returns whether statement summary is enabled.
func (ssMap *stmtSummaryByDigestMap) Enabled() bool {
ssMap.enabledWrapper.RLock()
var enabled bool
if ssMap.isSet(ssMap.enabledWrapper.sessionEnabled) {
enabled = ssMap.isEnabled(ssMap.enabledWrapper.sessionEnabled)
} else {
atomic.StoreInt32(&variable.EnableStmtSummary, 0)
StmtSummaryByDigestMap.Clear()
enabled = ssMap.isEnabled(ssMap.enabledWrapper.globalEnabled)
}
ssMap.enabledWrapper.RUnlock()
return enabled
}

// normalizeEnableValue converts 'ON' to '1' and 'OFF' to '0'
func (ssMap *stmtSummaryByDigestMap) normalizeEnableValue(value string) string {
switch {
case strings.EqualFold(value, "ON"):
return "1"
case strings.EqualFold(value, "OFF"):
return "0"
default:
return value
}
}

// isEnabled converts a string value to bool.
// 1 indicates true, 0 or '' indicates false.
func (ssMap *stmtSummaryByDigestMap) isEnabled(value string) bool {
return value == "1"
}

// isSet judges whether the variable is set.
func (ssMap *stmtSummaryByDigestMap) isSet(value string) bool {
return value != ""
}
Loading

0 comments on commit dd3ad59

Please sign in to comment.