diff --git a/bootstrap.go b/bootstrap.go index e61a8c8b7504e..851ee8027efb6 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -254,7 +254,7 @@ func upgrade(s Session) { func upgradeToVer2(s Session) { // Version 2 add two system variable for DistSQL concurrency controlling. // Insert distsql related system variable. - distSQLVars := []string{variable.DistSQLScanConcurrencyVar, variable.DistSQLJoinConcurrencyVar} + distSQLVars := []string{variable.TiDBDistSQLScanConcurrency} values := make([]string, 0, len(distSQLVars)) for _, v := range distSQLVars { value := fmt.Sprintf(`("%s", "%s")`, v, variable.SysVars[v].Value) diff --git a/bootstrap_test.go b/bootstrap_test.go index 3fcc0714a2c9f..59b80f4d5cc4e 100644 --- a/bootstrap_test.go +++ b/bootstrap_test.go @@ -193,8 +193,8 @@ func (s *testBootstrapSuite) TestUpgrade(c *C) { err = txn.Commit() c.Assert(err, IsNil) mustExecSQL(c, se1, `delete from mysql.TiDB where VARIABLE_NAME="tidb_server_version";`) - mustExecSQL(c, se1, fmt.Sprintf(`delete from mysql.global_variables where VARIABLE_NAME="%s" or VARIABLE_NAME="%s";`, - variable.DistSQLScanConcurrencyVar, variable.DistSQLJoinConcurrencyVar)) + mustExecSQL(c, se1, fmt.Sprintf(`delete from mysql.global_variables where VARIABLE_NAME="%s";`, + variable.TiDBDistSQLScanConcurrency)) mustExecSQL(c, se1, `commit;`) delete(storeBootstrapped, store.UUID()) // Make sure the version is downgraded. diff --git a/executor/builder.go b/executor/builder.go index a48dc69f89249..64b53ff4161f1 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -572,7 +572,6 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan) Executor { byItems: v.GbyItemsPB, orderByList: v.SortItemsPB, } - st.scanConcurrency, b.err = getScanConcurrency(b.ctx) return st } @@ -599,7 +598,6 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor { aggFields: v.AggFields, byItems: v.GbyItemsPB, } - st.scanConcurrency, b.err = getScanConcurrency(b.ctx) return st } diff --git a/executor/distsql.go b/executor/distsql.go index 6c2923fa77820..e2aeb6c9fc306 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -17,7 +17,6 @@ import ( "fmt" "math" "sort" - "strconv" "time" "github.com/juju/errors" @@ -52,12 +51,6 @@ func resultRowToRow(t table.Table, h int64, data []types.Datum, tableAsName *mod return &Row{Data: data, RowKeys: []*RowKeyEntry{entry}} } -// BaseLookupTableTaskSize represents base number of handles for a lookupTableTask. -var BaseLookupTableTaskSize = 1024 - -// MaxLookupTableTaskSize represents max number of handles for a lookupTableTask. -var MaxLookupTableTaskSize = 20480 - // LookupTableTaskChannelSize represents the channel size of the index double read taskChan. var LookupTableTaskChannelSize = 50 @@ -532,14 +525,10 @@ func (e *XSelectIndexExec) slowQueryInfo(duration time.Duration) string { e.partialCount, e.scanConcurrency, e.returnedRows, e.handleCount) } -const concurrencyLimit int = 30 - -// addWorker adds a worker for lookupTableTask. -// It's not thread-safe and should be called in fetchHandles goroutine only. -func addWorker(e *XSelectIndexExec, ch chan *lookupTableTask, concurrency *int) { - if *concurrency <= concurrencyLimit { - go e.pickAndExecTask(ch) - *concurrency = *concurrency + 1 +func (e *XSelectIndexExec) addWorker(workCh chan *lookupTableTask, concurrency *int, concurrencyLimit int) { + if *concurrency < concurrencyLimit { + go e.pickAndExecTask(workCh) + *concurrency++ } } @@ -549,8 +538,9 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan< workCh := make(chan *lookupTableTask, 1) defer close(workCh) + lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency var concurrency int - addWorker(e, workCh, &concurrency) + e.addWorker(workCh, &concurrency, lookupConcurrencyLimit) for { handles, finish, err := extractHandlesFromIndexResult(idxResult) @@ -562,7 +552,7 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan< tasks := e.buildTableTasks(handles) for _, task := range tasks { if concurrency < len(tasks) { - addWorker(e, workCh, &concurrency) + e.addWorker(workCh, &concurrency, lookupConcurrencyLimit) } select { @@ -570,7 +560,7 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan< return case workCh <- task: default: - addWorker(e, workCh, &concurrency) + e.addWorker(workCh, &concurrency, lookupConcurrencyLimit) workCh <- task } ch <- task @@ -578,17 +568,6 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan< } } -func getScanConcurrency(ctx context.Context) (int, error) { - sessionVars := ctx.GetSessionVars() - concurrency, err := sessionVars.GetTiDBSystemVar(variable.DistSQLScanConcurrencyVar) - if err != nil { - return 0, errors.Trace(err) - } - c, err := strconv.ParseInt(concurrency, 10, 64) - log.Debugf("[%d] [DistSQL] Scan with concurrency %d", sessionVars.ConnectionID, c) - return int(c), errors.Trace(err) -} - func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) { selIdxReq := new(tipb.SelectRequest) selIdxReq.StartTs = e.startTS @@ -634,16 +613,13 @@ func (e *XSelectIndexExec) buildTableTasks(handles []int64) []*lookupTableTask { // Build tasks with increasing batch size. var taskSizes []int total := len(handles) - batchSize := BaseLookupTableTaskSize + batchSize := e.ctx.GetSessionVars().IndexLookupSize for total > 0 { if batchSize > total { batchSize = total } taskSizes = append(taskSizes, batchSize) total -= batchSize - if batchSize < MaxLookupTableTaskSize { - batchSize *= 2 - } } var indexOrder map[int64]int @@ -813,9 +789,8 @@ type XSelectTableExec struct { aggFields []*types.FieldType aggregate bool - scanConcurrency int - execStart time.Time - partialCount int + execStart time.Time + partialCount int } // Schema implements the Executor Schema interface. @@ -849,7 +824,7 @@ func (e *XSelectTableExec) doRequest() error { selReq.GroupBy = e.byItems kvRanges := tableRangesToKVRanges(e.table.Meta().ID, e.ranges) - e.result, err = distsql.Select(e.ctx.GetClient(), goctx.Background(), selReq, kvRanges, e.scanConcurrency, e.keepOrder) + e.result, err = distsql.Select(e.ctx.GetClient(), goctx.Background(), selReq, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder) if err != nil { return errors.Trace(err) } @@ -927,7 +902,8 @@ func (e *XSelectTableExec) Next() (*Row, error) { func (e *XSelectTableExec) slowQueryInfo(duration time.Duration) string { return fmt.Sprintf("time: %v, table: %s(%d), partials: %d, concurrency: %d, start_ts: %d, rows: %d", - duration, e.tableInfo.Name, e.tableInfo.ID, e.partialCount, e.scanConcurrency, e.startTS, e.returnedRows) + duration, e.tableInfo.Name, e.tableInfo.ID, e.partialCount, e.ctx.GetSessionVars().DistSQLScanConcurrency, + e.startTS, e.returnedRows) } // timeZoneOffset returns the local time zone offset in seconds. diff --git a/executor/distsql_test.go b/executor/distsql_test.go index 9e59deeb3ca0a..e8b12e85e6ed3 100644 --- a/executor/distsql_test.go +++ b/executor/distsql_test.go @@ -39,6 +39,7 @@ func (s *testSuite) TestIndexDoubleReadClose(c *C) { originSize := executor.LookupTableTaskChannelSize executor.LookupTableTaskChannelSize = 1 tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@tidb_index_lookup_size = '10'") tk.MustExec("use test") tk.MustExec("create table dist (id int primary key, c_idx int, c_col int, index (c_idx))") diff --git a/executor/executor_test.go b/executor/executor_test.go index 97ce1dadadd95..5f13325e57ba4 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -70,11 +70,9 @@ func (s *testSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) logLevel := os.Getenv("log_level") log.SetLevelByString(logLevel) - executor.BaseLookupTableTaskSize = 2 } func (s *testSuite) TearDownSuite(c *C) { - executor.BaseLookupTableTaskSize = 512 s.store.Close() } diff --git a/plan/statistics/statistics.go b/plan/statistics/statistics.go index 9cdb8182b6999..99ce432996e54 100644 --- a/plan/statistics/statistics.go +++ b/plan/statistics/statistics.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessionctx/varsutil" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/types" @@ -524,7 +525,7 @@ func (b *Builder) buildMultiColumns(t *Table, offsets []int, baseOffset int, isS func (b *Builder) getBuildStatsConcurrency() (int, error) { sessionVars := b.Ctx.GetSessionVars() - concurrency, err := sessionVars.GetTiDBSystemVar(variable.BuildStatsConcurrencyVar) + concurrency, err := varsutil.GetSessionSystemVar(sessionVars, variable.TiDBBuildStatsConcurrency) if err != nil { return 0, errors.Trace(err) } diff --git a/session.go b/session.go index 68ff90eba07f6..6601ea9aadb87 100644 --- a/session.go +++ b/session.go @@ -935,13 +935,17 @@ func finishBootstrap(store kv.Storage) { } } +const quoteCommaQuote = "', '" const loadCommonGlobalVarsSQL = "select * from mysql.global_variables where variable_name in ('" + - variable.AutocommitVar + "', '" + - variable.SQLModeVar + "', '" + - variable.DistSQLJoinConcurrencyVar + "', '" + - variable.MaxAllowedPacket + "', '" + - variable.TiDBSkipUTF8Check + "', '" + - variable.DistSQLScanConcurrencyVar + "')" + variable.AutocommitVar + quoteCommaQuote + + variable.SQLModeVar + quoteCommaQuote + + variable.MaxAllowedPacket + quoteCommaQuote + + /* TiDB specific global variables: */ + variable.TiDBSkipUTF8Check + quoteCommaQuote + + variable.TiDBSkipDDLWait + quoteCommaQuote + + variable.TiDBIndexLookupSize + quoteCommaQuote + + variable.TiDBIndexLookupConcurrency + quoteCommaQuote + + variable.TiDBDistSQLScanConcurrency + "')" // LoadCommonGlobalVariableIfNeeded loads and applies commonly used global variables for the session. func (s *session) loadCommonGlobalVariablesIfNeeded() error { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 295366d528918..4f8d644b6bd10 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -15,11 +15,9 @@ package variable import ( "math" - "strings" "sync" "time" - "github.com/juju/errors" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/terror" ) @@ -134,16 +132,6 @@ type SessionVars struct { // version, we load an old version schema for query. SnapshotInfoschema interface{} - // SkipConstraintCheck is true when importing data. - SkipConstraintCheck bool - - // SkipUTF8 check on input value. - SkipUTF8Check bool - - // SkipDDLWait can be set to true to skip 2 lease wait after create/drop/truncate table, create/drop database. - // Then if there are multiple TiDB servers, the new table may not be available for other TiDB servers. - SkipDDLWait bool - // GlobalAccessor is used to set and get global variables. GlobalVarsAccessor GlobalVarAccessor @@ -166,8 +154,29 @@ type SessionVars struct { SQLMode mysql.SQLMode - // BuildStatsConcurrencyVar is used to control statistics building concurrency. + /* TiDB system variables */ + + // SkipConstraintCheck is true when importing data. + SkipConstraintCheck bool + + // SkipUTF8 check on input value. + SkipUTF8Check bool + + // SkipDDLWait can be set to true to skip 2 lease wait after create/drop/truncate table, create/drop database. + // Then if there are multiple TiDB servers, the new table may not be available for other TiDB servers. + SkipDDLWait bool + + // TiDBBuildStatsConcurrency is used to control statistics building concurrency. BuildStatsConcurrencyVar int + + // The number of handles for a index lookup task in index double read executor. + IndexLookupSize int + + // The number of concurrent index lookup worker. + IndexLookupConcurrency int + + // The number of concurrent dist SQL scan worker. + DistSQLScanConcurrency int } // NewSessionVars creates a session vars object. @@ -183,7 +192,10 @@ func NewSessionVars() *SessionVars { Status: mysql.ServerStatusAutocommit, StmtCtx: new(StatementContext), AllowAggPushDown: true, - BuildStatsConcurrencyVar: 4, + BuildStatsConcurrencyVar: DefBuildStatsConcurrency, + IndexLookupSize: DefIndexLookupSize, + IndexLookupConcurrency: DefIndexLookupConcurrency, + DistSQLScanConcurrency: DefDistSQLScanConcurrency, } } @@ -254,23 +266,6 @@ const ( TimeZone = "time_zone" ) -// GetTiDBSystemVar gets variable value for name. -// The variable should be a TiDB specific system variable (The vars in tidbSysVars map). -// We load the variable from session first, if not found, use local defined default variable. -func (s *SessionVars) GetTiDBSystemVar(name string) (string, error) { - key := strings.ToLower(name) - _, ok := tidbSysVars[key] - if !ok { - return "", errors.Errorf("%s is not a TiDB specific system variable.", name) - } - - sVal, ok := s.Systems[key] - if ok { - return sVal, nil - } - return SysVars[key].Value, nil -} - // StatementContext contains variables for a statement. // It should be reset before executing a statement. type StatementContext struct { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index aacdd3417f77f..0d959f587b639 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -14,6 +14,7 @@ package variable import ( + "strconv" "strings" "github.com/pingcap/tidb/mysql" @@ -60,8 +61,6 @@ const ( CodeIncorrectScope terror.ErrCode = 1238 ) -var tidbSysVars map[string]bool - // Variable errors var ( UnknownStatusVar = terror.ClassVariable.New(CodeUnknownStatusVar, "unknown status variable") @@ -81,16 +80,13 @@ func init() { CodeIncorrectScope: mysql.ErrIncorrectGlobalLocalVar, } terror.ErrClassToMySQLCodes[terror.ClassVariable] = mySQLErrCodes +} - tidbSysVars = make(map[string]bool) - tidbSysVars[DistSQLScanConcurrencyVar] = true - tidbSysVars[DistSQLJoinConcurrencyVar] = true - tidbSysVars[TiDBSnapshot] = true - tidbSysVars[TiDBSkipConstraintCheck] = true - tidbSysVars[TiDBSkipDDLWait] = true - tidbSysVars[TiDBOptAggPushDown] = true - tidbSysVars[TiDBOptInSubqUnFolding] = true - tidbSysVars[BuildStatsConcurrencyVar] = true +func boolToIntStr(b bool) string { + if b { + return "1" + } + return "0" } // we only support MySQL now @@ -594,30 +590,19 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, "min_examined_row_limit", "0"}, {ScopeGlobal, "sync_frm", "ON"}, {ScopeGlobal, "innodb_online_alter_log_max_size", "134217728"}, + /* TiDB specific variables */ {ScopeSession, TiDBSnapshot, ""}, - {ScopeGlobal | ScopeSession, DistSQLScanConcurrencyVar, "10"}, - {ScopeGlobal | ScopeSession, DistSQLJoinConcurrencyVar, "5"}, {ScopeSession, TiDBSkipConstraintCheck, "0"}, - {ScopeSession, TiDBSkipDDLWait, "0"}, - {ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, "0"}, - {ScopeSession, TiDBOptAggPushDown, "ON"}, - {ScopeSession, TiDBOptInSubqUnFolding, "OFF"}, - {ScopeSession, BuildStatsConcurrencyVar, "4"}, + {ScopeSession, TiDBOptAggPushDown, boolToIntStr(DefOptAggPushDown)}, + {ScopeSession, TiDBOptInSubqUnFolding, boolToIntStr(DefOptInSubqUnfolding)}, + {ScopeSession, TiDBBuildStatsConcurrency, strconv.Itoa(DefBuildStatsConcurrency)}, + {ScopeGlobal | ScopeSession, TiDBDistSQLScanConcurrency, strconv.Itoa(DefDistSQLScanConcurrency)}, + {ScopeGlobal | ScopeSession, TiDBIndexLookupSize, strconv.Itoa(DefIndexLookupSize)}, + {ScopeGlobal | ScopeSession, TiDBIndexLookupConcurrency, strconv.Itoa(DefIndexLookupConcurrency)}, + {ScopeGlobal | ScopeSession, TiDBSkipDDLWait, boolToIntStr(DefSkipDDLWait)}, + {ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)}, } -// TiDB system variables -const ( - TiDBSnapshot = "tidb_snapshot" - DistSQLScanConcurrencyVar = "tidb_distsql_scan_concurrency" - DistSQLJoinConcurrencyVar = "tidb_distsql_join_concurrency" - TiDBSkipConstraintCheck = "tidb_skip_constraint_check" - TiDBSkipDDLWait = "tidb_skip_ddl_wait" - TiDBSkipUTF8Check = "tidb_skip_utf8_check" - TiDBOptAggPushDown = "tidb_opt_agg_push_down" - TiDBOptInSubqUnFolding = "tidb_opt_insubquery_unfold" - BuildStatsConcurrencyVar = "tidb_build_stats_concurrency" -) - // SetNamesVariables is the system variable names related to set names statements. var SetNamesVariables = []string{ "character_set_client", diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go new file mode 100644 index 0000000000000..d68602ce9f72b --- /dev/null +++ b/sessionctx/variable/tidb_vars.go @@ -0,0 +1,94 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package variable + +/* + Steps to add a new TiDB specific system variable: + + 1. Add a new variable name with comment in this file. + 2. Add the default value of the new variable in this file. + 3. Add SysVar instance in 'defaultSysVars' slice with the default value. + 4. Add a field in `SessionVars`. + 5. Update the `NewSessionVars` function to set the field to its default value. + 6. Update the `varsutil.SetSessionSystemVar` function to use the new value when SET statement is executed. + 7. If it is a global variable, add it in `tidb.loadCommonGlobalVarsSQL`. + 8. Use this variable to control the behavior in code. +*/ + +// TiDB system variable names. +const ( + /* Session only */ + + // tidb_snapshot is used for reading history data, the default value is empty string. + // When the value is set to a datetime string like '2017-11-11 20:20:20', the session reads history data of that time. + TiDBSnapshot = "tidb_snapshot" + + // tidb_skip_constraint_check is used for loading data from a dump file, to speed up the loading process. + // When the value is set to true, unique index constraint is not checked. + TiDBSkipConstraintCheck = "tidb_skip_constraint_check" + + // tidb_opt_agg_push_down is used to endable/disable the optimizer rule of aggregation push down. + TiDBOptAggPushDown = "tidb_opt_agg_push_down" + + // tidb_opt_insubquery_unfold is used to enable/disable the optimizer rule of in subquery unfold. + TiDBOptInSubqUnFolding = "tidb_opt_insubquery_unfold" + + // tidb_build_stats_concurrency is used to speed up the ANALYZE statement, when a table has multiple indices, + // those indices can be scanned concurrently, with the cost of higher system performance impact. + TiDBBuildStatsConcurrency = "tidb_build_stats_concurrency" + + /* Session and global */ + + // tidb_distsql_scan_concurrency is used to set the concurrency of a distsql scan task. + // A distsql scan task can be a table scan or a index scan, which may be distributed to many TiKV nodes. + // Higher concurrency may reduce latency, but with the cost of higher memory usage and system performance impact. + // If the query has a LIMIT clause, high concurrency makes the system do much more work than needed. + TiDBDistSQLScanConcurrency = "tidb_distsql_scan_concurrency" + + // tidb_index_lookup_size is used for index lookup executor. + // The index lookup executor first scan a batch of handles from a index, then use those handles to lookup the table + // rows, this value controls how much of handles in a batch to do a lookup task. + // Small value sends more RPCs to TiKV, consume more system resource. + // Large value may do more work than needed if the query has a limit. + TiDBIndexLookupSize = "tidb_index_lookup_size" + + // tidb_index_lookup_concurrency is used for index lookup executor. + // A lookup task may have 'tidb_index_lookup_size' of handles at maximun, the handles may be distributed + // in many TiKV nodes, we executes multiple concurrent index lookup tasks concurrently to reduce the time + // waiting for a task to finish. + // Set this value higher may reduce the latency but consumes more system resource. + TiDBIndexLookupConcurrency = "tidb_index_lookup_concurrency" + + // tidb_skip_ddl_wait skips the wait tiem of two lease after executing CREATE TABLE statement. + // When we have multiple TiDB servers in a cluster, the newly created table may not be available on all TiDB server + // until two lease time later, set this value to true will reduce the time to create a table, with the risk that + // other TiDB servers may fail to use the newly created table in a small time window. + TiDBSkipDDLWait = "tidb_skip_ddl_wait" + + // tidb_skip_utf8_check skips the UTF8 validate process, validate UTF8 has performance cost, if we can make sure + // the input string values are valid, we can skip the check. + TiDBSkipUTF8Check = "tidb_skip_utf8_check" +) + +// Default TiDB system variable values. +const ( + DefIndexLookupConcurrency = 4 + DefIndexLookupSize = 20000 + DefDistSQLScanConcurrency = 10 + DefBuildStatsConcurrency = 4 + DefSkipDDLWait = false + DefSkipUTF8Check = false + DefOptAggPushDown = true + DefOptInSubqUnfolding = false +) diff --git a/sessionctx/varsutil/varsutil.go b/sessionctx/varsutil/varsutil.go index dda06741650ef..0bf15a436c74d 100644 --- a/sessionctx/varsutil/varsutil.go +++ b/sessionctx/varsutil/varsutil.go @@ -14,6 +14,7 @@ package varsutil import ( + "strconv" "strings" "time" @@ -123,6 +124,12 @@ func SetSessionSystemVar(vars *variable.SessionVars, name string, value types.Da vars.AllowAggPushDown = tidbOptOn(sVal) case variable.TiDBOptInSubqUnFolding: vars.AllowInSubqueryUnFolding = tidbOptOn(sVal) + case variable.TiDBIndexLookupConcurrency: + vars.IndexLookupConcurrency = tidbOptPositiveInt(sVal, variable.DefIndexLookupConcurrency) + case variable.TiDBIndexLookupSize: + vars.IndexLookupSize = tidbOptPositiveInt(sVal, variable.DefIndexLookupSize) + case variable.TiDBDistSQLScanConcurrency: + vars.DistSQLScanConcurrency = tidbOptPositiveInt(sVal, variable.DefDistSQLScanConcurrency) } vars.Systems[name] = sVal return nil @@ -133,6 +140,14 @@ func tidbOptOn(opt string) bool { return strings.EqualFold(opt, "ON") || opt == "1" } +func tidbOptPositiveInt(opt string, defaultVal int) int { + val, err := strconv.Atoi(opt) + if err != nil || val <= 0 { + return defaultVal + } + return val +} + func parseTimeZone(s string) *time.Location { if s == "SYSTEM" { // TODO: Support global time_zone variable, it should be set to global time_zone value. diff --git a/sessionctx/varsutil/varsutil_test.go b/sessionctx/varsutil/varsutil_test.go index 60922bfd686ef..872aad7805b83 100644 --- a/sessionctx/varsutil/varsutil_test.go +++ b/sessionctx/varsutil/varsutil_test.go @@ -58,6 +58,7 @@ func (s *testVarsutilSuite) TestTiDBOptOn(c *C) { func (s *testVarsutilSuite) TestVarsutil(c *C) { defer testleak.AfterTest(c)() v := variable.NewSessionVars() + v.GlobalVarsAccessor = newMockGlobalAccessor() SetSessionSystemVar(v, "autocommit", types.NewStringDatum("1")) val, err := GetSessionSystemVar(v, "autocommit") @@ -137,3 +138,26 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { SetSessionSystemVar(v, "sql_mode", types.NewStringDatum("REAL_AS_FLOAT,ANSI_QUOTES")) c.Assert(v.SQLMode, Equals, mysql.ModeRealAsFloat|mysql.ModeANSIQuotes) } + +type mockGlobalAccessor struct { + vars map[string]string +} + +func newMockGlobalAccessor() *mockGlobalAccessor { + m := &mockGlobalAccessor{ + vars: make(map[string]string), + } + for name, val := range variable.SysVars { + m.vars[name] = val.Value + } + return m +} + +func (m *mockGlobalAccessor) GetGlobalSysVar(name string) (string, error) { + return m.vars[name], nil +} + +func (m *mockGlobalAccessor) SetGlobalSysVar(name string, value string) error { + m.vars[name] = value + return nil +}