Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sessionctx/variable: refine TiDB specific system variables. #2915

Merged
merged 3 commits into from
Mar 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func upgrade(s Session) {
func upgradeToVer2(s Session) {
// Version 2 add two system variable for DistSQL concurrency controlling.
// Insert distsql related system variable.
distSQLVars := []string{variable.DistSQLScanConcurrencyVar, variable.DistSQLJoinConcurrencyVar}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove DistSQLJoinConcurrencyVar?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not used right now, we can easily add it later if needed.
We add it before hand because we used need to update bootstrap version for adding new global variable.

distSQLVars := []string{variable.TiDBDistSQLScanConcurrency}
values := make([]string, 0, len(distSQLVars))
for _, v := range distSQLVars {
value := fmt.Sprintf(`("%s", "%s")`, v, variable.SysVars[v].Value)
Expand Down
4 changes: 2 additions & 2 deletions bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func (s *testBootstrapSuite) TestUpgrade(c *C) {
err = txn.Commit()
c.Assert(err, IsNil)
mustExecSQL(c, se1, `delete from mysql.TiDB where VARIABLE_NAME="tidb_server_version";`)
mustExecSQL(c, se1, fmt.Sprintf(`delete from mysql.global_variables where VARIABLE_NAME="%s" or VARIABLE_NAME="%s";`,
variable.DistSQLScanConcurrencyVar, variable.DistSQLJoinConcurrencyVar))
mustExecSQL(c, se1, fmt.Sprintf(`delete from mysql.global_variables where VARIABLE_NAME="%s";`,
variable.TiDBDistSQLScanConcurrency))
mustExecSQL(c, se1, `commit;`)
delete(storeBootstrapped, store.UUID())
// Make sure the version is downgraded.
Expand Down
2 changes: 0 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan) Executor {
byItems: v.GbyItemsPB,
orderByList: v.SortItemsPB,
}
st.scanConcurrency, b.err = getScanConcurrency(b.ctx)
return st
}

Expand All @@ -599,7 +598,6 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
aggFields: v.AggFields,
byItems: v.GbyItemsPB,
}
st.scanConcurrency, b.err = getScanConcurrency(b.ctx)
return st
}

Expand Down
52 changes: 14 additions & 38 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"fmt"
"math"
"sort"
"strconv"
"time"

"github.com/juju/errors"
Expand Down Expand Up @@ -52,12 +51,6 @@ func resultRowToRow(t table.Table, h int64, data []types.Datum, tableAsName *mod
return &Row{Data: data, RowKeys: []*RowKeyEntry{entry}}
}

// BaseLookupTableTaskSize represents base number of handles for a lookupTableTask.
var BaseLookupTableTaskSize = 1024

// MaxLookupTableTaskSize represents max number of handles for a lookupTableTask.
var MaxLookupTableTaskSize = 20480

// LookupTableTaskChannelSize represents the channel size of the index double read taskChan.
var LookupTableTaskChannelSize = 50

Expand Down Expand Up @@ -532,14 +525,10 @@ func (e *XSelectIndexExec) slowQueryInfo(duration time.Duration) string {
e.partialCount, e.scanConcurrency, e.returnedRows, e.handleCount)
}

const concurrencyLimit int = 30

// addWorker adds a worker for lookupTableTask.
// It's not thread-safe and should be called in fetchHandles goroutine only.
func addWorker(e *XSelectIndexExec, ch chan *lookupTableTask, concurrency *int) {
if *concurrency <= concurrencyLimit {
go e.pickAndExecTask(ch)
*concurrency = *concurrency + 1
func (e *XSelectIndexExec) addWorker(workCh chan *lookupTableTask, concurrency *int, concurrencyLimit int) {
if *concurrency < concurrencyLimit {
go e.pickAndExecTask(workCh)
*concurrency++
}
}

Expand All @@ -549,8 +538,9 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan<
workCh := make(chan *lookupTableTask, 1)
defer close(workCh)

lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
var concurrency int
addWorker(e, workCh, &concurrency)
e.addWorker(workCh, &concurrency, lookupConcurrencyLimit)

for {
handles, finish, err := extractHandlesFromIndexResult(idxResult)
Expand All @@ -562,33 +552,22 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan<
tasks := e.buildTableTasks(handles)
for _, task := range tasks {
if concurrency < len(tasks) {
addWorker(e, workCh, &concurrency)
e.addWorker(workCh, &concurrency, lookupConcurrencyLimit)
}

select {
case <-e.ctx.Done():
return
case workCh <- task:
default:
addWorker(e, workCh, &concurrency)
e.addWorker(workCh, &concurrency, lookupConcurrencyLimit)
workCh <- task
}
ch <- task
}
}
}

func getScanConcurrency(ctx context.Context) (int, error) {
sessionVars := ctx.GetSessionVars()
concurrency, err := sessionVars.GetTiDBSystemVar(variable.DistSQLScanConcurrencyVar)
if err != nil {
return 0, errors.Trace(err)
}
c, err := strconv.ParseInt(concurrency, 10, 64)
log.Debugf("[%d] [DistSQL] Scan with concurrency %d", sessionVars.ConnectionID, c)
return int(c), errors.Trace(err)
}

func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) {
selIdxReq := new(tipb.SelectRequest)
selIdxReq.StartTs = e.startTS
Expand Down Expand Up @@ -634,16 +613,13 @@ func (e *XSelectIndexExec) buildTableTasks(handles []int64) []*lookupTableTask {
// Build tasks with increasing batch size.
var taskSizes []int
total := len(handles)
batchSize := BaseLookupTableTaskSize
batchSize := e.ctx.GetSessionVars().IndexLookupSize
for total > 0 {
if batchSize > total {
batchSize = total
}
taskSizes = append(taskSizes, batchSize)
total -= batchSize
if batchSize < MaxLookupTableTaskSize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change the old strategy ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old strategy doesn't work well.

batchSize *= 2
}
}

var indexOrder map[int64]int
Expand Down Expand Up @@ -813,9 +789,8 @@ type XSelectTableExec struct {
aggFields []*types.FieldType
aggregate bool

scanConcurrency int
execStart time.Time
partialCount int
execStart time.Time
partialCount int
}

// Schema implements the Executor Schema interface.
Expand Down Expand Up @@ -849,7 +824,7 @@ func (e *XSelectTableExec) doRequest() error {
selReq.GroupBy = e.byItems

kvRanges := tableRangesToKVRanges(e.table.Meta().ID, e.ranges)
e.result, err = distsql.Select(e.ctx.GetClient(), goctx.Background(), selReq, kvRanges, e.scanConcurrency, e.keepOrder)
e.result, err = distsql.Select(e.ctx.GetClient(), goctx.Background(), selReq, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -927,7 +902,8 @@ func (e *XSelectTableExec) Next() (*Row, error) {

func (e *XSelectTableExec) slowQueryInfo(duration time.Duration) string {
return fmt.Sprintf("time: %v, table: %s(%d), partials: %d, concurrency: %d, start_ts: %d, rows: %d",
duration, e.tableInfo.Name, e.tableInfo.ID, e.partialCount, e.scanConcurrency, e.startTS, e.returnedRows)
duration, e.tableInfo.Name, e.tableInfo.ID, e.partialCount, e.ctx.GetSessionVars().DistSQLScanConcurrency,
e.startTS, e.returnedRows)
}

// timeZoneOffset returns the local time zone offset in seconds.
Expand Down
1 change: 1 addition & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (s *testSuite) TestIndexDoubleReadClose(c *C) {
originSize := executor.LookupTableTaskChannelSize
executor.LookupTableTaskChannelSize = 1
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_index_lookup_size = '10'")
tk.MustExec("use test")
tk.MustExec("create table dist (id int primary key, c_idx int, c_col int, index (c_idx))")

Expand Down
2 changes: 0 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@ func (s *testSuite) SetUpSuite(c *C) {
c.Assert(err, IsNil)
logLevel := os.Getenv("log_level")
log.SetLevelByString(logLevel)
executor.BaseLookupTableTaskSize = 2
}

func (s *testSuite) TearDownSuite(c *C) {
executor.BaseLookupTableTaskSize = 512
s.store.Close()
}

Expand Down
3 changes: 2 additions & 1 deletion plan/statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessionctx/varsutil"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
Expand Down Expand Up @@ -524,7 +525,7 @@ func (b *Builder) buildMultiColumns(t *Table, offsets []int, baseOffset int, isS

func (b *Builder) getBuildStatsConcurrency() (int, error) {
sessionVars := b.Ctx.GetSessionVars()
concurrency, err := sessionVars.GetTiDBSystemVar(variable.BuildStatsConcurrencyVar)
concurrency, err := varsutil.GetSessionSystemVar(sessionVars, variable.TiDBBuildStatsConcurrency)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
16 changes: 10 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,13 +935,17 @@ func finishBootstrap(store kv.Storage) {
}
}

const quoteCommaQuote = "', '"
const loadCommonGlobalVarsSQL = "select * from mysql.global_variables where variable_name in ('" +
variable.AutocommitVar + "', '" +
variable.SQLModeVar + "', '" +
variable.DistSQLJoinConcurrencyVar + "', '" +
variable.MaxAllowedPacket + "', '" +
variable.TiDBSkipUTF8Check + "', '" +
variable.DistSQLScanConcurrencyVar + "')"
variable.AutocommitVar + quoteCommaQuote +
variable.SQLModeVar + quoteCommaQuote +
variable.MaxAllowedPacket + quoteCommaQuote +
/* TiDB specific global variables: */
variable.TiDBSkipUTF8Check + quoteCommaQuote +
variable.TiDBSkipDDLWait + quoteCommaQuote +
variable.TiDBIndexLookupSize + quoteCommaQuote +
variable.TiDBIndexLookupConcurrency + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + "')"

// LoadCommonGlobalVariableIfNeeded loads and applies commonly used global variables for the session.
func (s *session) loadCommonGlobalVariablesIfNeeded() error {
Expand Down
57 changes: 26 additions & 31 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ package variable

import (
"math"
"strings"
"sync"
"time"

"github.com/juju/errors"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
)
Expand Down Expand Up @@ -134,16 +132,6 @@ type SessionVars struct {
// version, we load an old version schema for query.
SnapshotInfoschema interface{}

// SkipConstraintCheck is true when importing data.
SkipConstraintCheck bool

// SkipUTF8 check on input value.
SkipUTF8Check bool

// SkipDDLWait can be set to true to skip 2 lease wait after create/drop/truncate table, create/drop database.
// Then if there are multiple TiDB servers, the new table may not be available for other TiDB servers.
SkipDDLWait bool

// GlobalAccessor is used to set and get global variables.
GlobalVarsAccessor GlobalVarAccessor

Expand All @@ -166,8 +154,29 @@ type SessionVars struct {

SQLMode mysql.SQLMode

// BuildStatsConcurrencyVar is used to control statistics building concurrency.
/* TiDB system variables */

// SkipConstraintCheck is true when importing data.
SkipConstraintCheck bool

// SkipUTF8 check on input value.
SkipUTF8Check bool

// SkipDDLWait can be set to true to skip 2 lease wait after create/drop/truncate table, create/drop database.
// Then if there are multiple TiDB servers, the new table may not be available for other TiDB servers.
SkipDDLWait bool

// TiDBBuildStatsConcurrency is used to control statistics building concurrency.
BuildStatsConcurrencyVar int

// The number of handles for a index lookup task in index double read executor.
IndexLookupSize int

// The number of concurrent index lookup worker.
IndexLookupConcurrency int

// The number of concurrent dist SQL scan worker.
DistSQLScanConcurrency int
}

// NewSessionVars creates a session vars object.
Expand All @@ -183,7 +192,10 @@ func NewSessionVars() *SessionVars {
Status: mysql.ServerStatusAutocommit,
StmtCtx: new(StatementContext),
AllowAggPushDown: true,
BuildStatsConcurrencyVar: 4,
BuildStatsConcurrencyVar: DefBuildStatsConcurrency,
IndexLookupSize: DefIndexLookupSize,
IndexLookupConcurrency: DefIndexLookupConcurrency,
DistSQLScanConcurrency: DefDistSQLScanConcurrency,
}
}

Expand Down Expand Up @@ -254,23 +266,6 @@ const (
TimeZone = "time_zone"
)

// GetTiDBSystemVar gets variable value for name.
// The variable should be a TiDB specific system variable (The vars in tidbSysVars map).
// We load the variable from session first, if not found, use local defined default variable.
func (s *SessionVars) GetTiDBSystemVar(name string) (string, error) {
key := strings.ToLower(name)
_, ok := tidbSysVars[key]
if !ok {
return "", errors.Errorf("%s is not a TiDB specific system variable.", name)
}

sVal, ok := s.Systems[key]
if ok {
return sVal, nil
}
return SysVars[key].Value, nil
}

// StatementContext contains variables for a statement.
// It should be reset before executing a statement.
type StatementContext struct {
Expand Down
Loading