Skip to content

Commit

Permalink
planner, session: add isolation read with engine type (pingcap#12997)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzmhhh123 committed Jan 19, 2020
1 parent bac96d7 commit 67b66c7
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 4 deletions.
8 changes: 8 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ const (
TiFlash
)

// Name returns the name of store type.
func (t StoreType) Name() string {
if t == TiFlash {
return "tiflash"
}
return "tikv"
}

// Request represents a kv request.
type Request struct {
// Tp is the request type.
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,9 @@ func (e *Explain) explainPlanInRowFormat(p PhysicalPlan, taskType, indent string
var storeType string
switch copPlan.StoreType {
case kv.TiKV:
storeType = "tikv"
storeType = kv.TiKV.Name()
case kv.TiFlash:
storeType = "tiflash"
storeType = kv.TiFlash.Name()
default:
err = errors.Errorf("the store type %v is unknown", copPlan.StoreType)
return
Expand Down
17 changes: 17 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,20 @@ func (s *testIntegrationSuite) TestErrNoDB(c *C) {
tk.MustExec("use test")
tk.MustExec("grant select on test1111 to test@'%'")
}

func (s *testIntegrationSuite) TestNoneAccessPathsFoundByIsolationRead(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")

_, err := tk.Exec("select * from t")
c.Assert(err, IsNil)

tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")

_, err = tk.Exec("select * from t")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can not find access path matching 'tidb_isolation_read_engines'(value: 'tiflash'). Available values are 'tikv'.")
}
4 changes: 4 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2341,6 +2341,10 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if err != nil {
return nil, err
}
possiblePaths, err = b.filterPathByIsolationRead(possiblePaths)
if err != nil {
return nil, err
}

var columns []*table.Column
if b.inUpdateStmt {
Expand Down
27 changes: 27 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -573,6 +574,32 @@ func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tblInf
return available, nil
}

func (b *PlanBuilder) filterPathByIsolationRead(paths []*accessPath) ([]*accessPath, error) {
// TODO: filter paths with isolation read locations.
isolationReadEngines := b.ctx.GetSessionVars().GetIsolationReadEngines()
availableEngine := map[kv.StoreType]struct{}{}
var availableEngineStr string
for i := len(paths) - 1; i >= 0; i-- {
if _, ok := availableEngine[paths[i].storeType]; !ok {
availableEngine[paths[i].storeType] = struct{}{}
if availableEngineStr != "" {
availableEngineStr += ", "
}
availableEngineStr += paths[i].storeType.Name()
}
if _, ok := isolationReadEngines[paths[i].storeType]; !ok {
paths = append(paths[:i], paths[i+1:]...)
}
}
var err error
if len(paths) == 0 {
engineVals, _ := b.ctx.GetSessionVars().GetSystemVar(variable.TiDBIsolationReadEngines)
err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("Can not find access path matching '%v'(value: '%v'). Available values are '%v'.",
variable.TiDBIsolationReadEngines, engineVals, availableEngineStr))
}
return paths, err
}

func removeIgnoredPaths(paths, ignoredPaths []*accessPath, tblInfo *model.TableInfo) []*accessPath {
if len(ignoredPaths) == 0 {
return paths
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,7 @@ var builtinGlobalVariable = []string{
variable.TiDBStmtSummaryHistorySize,
variable.TiDBMaxDeltaSchemaCount,
variable.TiDBStoreLimit,
variable.TiDBIsolationReadEngines,
}

var (
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,9 @@ type SessionVars struct {
// LockWaitTimeout is the duration waiting for pessimistic lock in milliseconds
// negative value means nowait, 0 means default behavior, others means actual wait time
LockWaitTimeout int64

// isolationReadEngines is used to isolation read, tidb only read from the stores whose engine type is in the engines.
isolationReadEngines map[kv.StoreType]struct{}
}

// PreparedParams contains the parameters of the current prepared statement when executing it.
Expand Down Expand Up @@ -530,6 +533,7 @@ func NewSessionVars() *SessionVars {
ReplicaRead: kv.ReplicaReadLeader,
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
isolationReadEngines: map[kv.StoreType]struct{}{kv.TiKV: {}, kv.TiFlash: {}},
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -579,6 +583,11 @@ func (s *SessionVars) GetSplitRegionTimeout() time.Duration {
return time.Duration(s.WaitSplitRegionTimeout) * time.Second
}

// GetIsolationReadEngines gets isolation read engines.
func (s *SessionVars) GetIsolationReadEngines() map[kv.StoreType]struct{} {
return s.isolationReadEngines
}

// CleanBuffers cleans the temporary bufs
func (s *SessionVars) CleanBuffers() {
if !s.LightningMode {
Expand Down Expand Up @@ -925,6 +934,16 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
}
case TiDBStoreLimit:
storeutil.StoreLimit.Store(tidbOptInt64(val, DefTiDBStoreLimit))
case TiDBIsolationReadEngines:
s.isolationReadEngines = make(map[kv.StoreType]struct{})
for _, engine := range strings.Split(val, ",") {
switch engine {
case kv.TiKV.Name():
s.isolationReadEngines[kv.TiKV] = struct{}{}
case kv.TiFlash.Name():
s.isolationReadEngines[kv.TiFlash] = struct{}{}
}
}
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBStmtSummaryRefreshInterval, strconv.Itoa(config.GetGlobalConfig().StmtSummary.RefreshInterval)},
{ScopeGlobal | ScopeSession, TiDBStmtSummaryHistorySize, strconv.Itoa(config.GetGlobalConfig().StmtSummary.HistorySize)},
{ScopeGlobal | ScopeSession, TiDBStoreLimit, strconv.FormatInt(atomic.LoadInt64(&config.GetGlobalConfig().TiKVClient.StoreLimit), 10)},
{ScopeGlobal | ScopeSession, TiDBIsolationReadEngines, "tikv,tiflash"},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ const (

// TiDBStoreLimit indicates the limit of sending request to a store, 0 means without limit.
TiDBStoreLimit = "tidb_store_limit"

// TiDBIsolationReadEngines indicates the tidb only read from the stores whose engine type is involved in IsolationReadEngines.
// Now, only support TiKV and TiFlash.
TiDBIsolationReadEngines = "tidb_isolation_read_engines"
)

// Default TiDB system variable values.
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/timeutil"
)
Expand Down Expand Up @@ -644,6 +645,24 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return "", nil
}
return checkUInt64SystemVar(name, value, 0, math.MaxUint8, vars)
case TiDBIsolationReadEngines:
engines := strings.Split(value, ",")
var formatVal string
for i, engine := range engines {
engine = strings.TrimSpace(engine)
if i != 0 {
formatVal += ","
}
switch {
case strings.EqualFold(engine, kv.TiKV.Name()):
formatVal += kv.TiKV.Name()
case strings.EqualFold(engine, kv.TiFlash.Name()):
formatVal += kv.TiFlash.Name()
default:
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
}
return formatVal, nil
}
return value, nil
}
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ func (s *testVarsutilSuite) TestValidate(c *C) {
{TiDBStmtSummaryRefreshInterval, "", false},
{TiDBStmtSummaryHistorySize, "a", true},
{TiDBStmtSummaryHistorySize, "", false},
{TiDBIsolationReadEngines, "", true},
{TiDBIsolationReadEngines, "tikv", false},
{TiDBIsolationReadEngines, "TiKV,tiflash", false},
{TiDBIsolationReadEngines, " tikv, tiflash ", false},
}

for _, t := range tests {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
s.storeType = kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == "tiflash" {
if label.Value == kv.TiFlash.Name() {
s.storeType = kv.TiFlash
}
break
Expand Down Expand Up @@ -1184,7 +1184,7 @@ func (s *Store) reResolve(c *RegionCache) {
storeType := kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == "tiflash" {
if label.Value == kv.TiFlash.Name() {
storeType = kv.TiFlash
}
break
Expand Down

0 comments on commit 67b66c7

Please sign in to comment.