Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, session: add isolation read with engine type #12997

Merged
merged 15 commits into from
Nov 1, 2019
8 changes: 8 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ const (
TiFlash
)

// Name returns the name of store type.
func (t StoreType) Name() string {
if t == TiFlash {
return "tiflash"
}
return "tikv"
}

// Request represents a kv request.
type Request struct {
// Tp is the request type.
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,9 +734,9 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, indent string, isLast
var storeType string
switch x.StoreType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not call Name directly?

case kv.TiKV:
storeType = "tikv"
storeType = kv.TiKV.Name()
case kv.TiFlash:
storeType = "tiflash"
storeType = kv.TiFlash.Name()
default:
err = errors.Errorf("the store type %v is unknown", x.StoreType)
return
Expand Down
17 changes: 17 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,20 @@ func (s *testIntegrationSuite) TestSimplifyOuterJoinWithCast(c *C) {
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSuite) TestNoneAccessPathsFoundByIsolationRead(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")

_, err := tk.Exec("select * from t")
c.Assert(err, IsNil)

tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")

_, err = tk.Exec("select * from t")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can not find access path matching 'tidb_isolation_read_engines'(value: 'tiflash'). Available values are 'tikv'.")
}
4 changes: 4 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2438,6 +2438,10 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if err != nil {
return nil, err
}
possiblePaths, err = b.filterPathByIsolationRead(possiblePaths)
if err != nil {
return nil, err
}

var columns []*table.Column
if b.inUpdateStmt {
Expand Down
27 changes: 27 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -662,6 +663,32 @@ func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tblInf
return available, nil
}

func (b *PlanBuilder) filterPathByIsolationRead(paths []*accessPath) ([]*accessPath, error) {
// TODO: filter paths with isolation read locations.
isolationReadEngines := b.ctx.GetSessionVars().GetIsolationReadEngines()
availableEngine := map[kv.StoreType]struct{}{}
var availableEngineStr string
for i := len(paths) - 1; i >= 0; i-- {
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := availableEngine[paths[i].storeType]; !ok {
availableEngine[paths[i].storeType] = struct{}{}
if availableEngineStr != "" {
availableEngineStr += ", "
}
availableEngineStr += paths[i].storeType.Name()
}
if _, ok := isolationReadEngines[paths[i].storeType]; !ok {
paths = append(paths[:i], paths[i+1:]...)
}
}
var err error
if len(paths) == 0 {
engineVals, _ := b.ctx.GetSessionVars().GetSystemVar(variable.TiDBIsolationReadEngines)
err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("Can not find access path matching '%v'(value: '%v'). Available values are '%v'.",
variable.TiDBIsolationReadEngines, engineVals, availableEngineStr))
}
return paths, err
}

func removeIgnoredPaths(paths, ignoredPaths []*accessPath, tblInfo *model.TableInfo) []*accessPath {
if len(ignoredPaths) == 0 {
return paths
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1838,6 +1838,7 @@ var builtinGlobalVariable = []string{
variable.TiDBMaxDeltaSchemaCount,
variable.TiDBCapturePlanBaseline,
variable.TiDBUsePlanBaselines,
variable.TiDBIsolationReadEngines,
}

var (
Expand Down
15 changes: 15 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2815,6 +2815,21 @@ func (s *testSessionSuite) TestReplicaRead(c *C) {
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
}

func (s *testSessionSuite) TestIsolationRead(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(len(tk.Se.GetSessionVars().GetIsolationReadEngines()), Equals, 2)
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash';")
engines := tk.Se.GetSessionVars().GetIsolationReadEngines()
c.Assert(len(engines), Equals, 1)
_, hasTiFlash := engines[kv.TiFlash]
_, hasTiKV := engines[kv.TiKV]
c.Assert(hasTiFlash, Equals, true)
c.Assert(hasTiKV, Equals, false)
}

func (s *testSessionSuite) TestStmtHints(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ type SessionVars struct {
// replicaRead is used for reading data from replicas, only follower is supported at this time.
replicaRead kv.ReplicaReadType

// isolationReadEngines is used to isolation read, tidb only read from the stores whose engine type is in the engines.
isolationReadEngines map[kv.StoreType]struct{}

PlannerSelectBlockAsName []ast.HintTable
}

Expand Down Expand Up @@ -520,6 +523,7 @@ func NewSessionVars() *SessionVars {
replicaRead: kv.ReplicaReadLeader,
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
UsePlanBaselines: DefTiDBUsePlanBaselines,
isolationReadEngines: map[kv.StoreType]struct{}{kv.TiKV: {}, kv.TiFlash: {}},
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -616,6 +620,11 @@ func (s *SessionVars) GetSplitRegionTimeout() time.Duration {
return time.Duration(s.WaitSplitRegionTimeout) * time.Second
}

// GetIsolationReadEngines gets isolation read engines.
func (s *SessionVars) GetIsolationReadEngines() map[kv.StoreType]struct{} {
return s.isolationReadEngines
}

// CleanBuffers cleans the temporary bufs
func (s *SessionVars) CleanBuffers() {
s.GetWriteStmtBufs().clean()
Expand Down Expand Up @@ -956,6 +965,16 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount))
case TiDBUsePlanBaselines:
s.UsePlanBaselines = TiDBOptOn(val)
case TiDBIsolationReadEngines:
s.isolationReadEngines = make(map[kv.StoreType]struct{})
for _, engine := range strings.Split(val, ",") {
switch engine {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
case kv.TiKV.Name():
s.isolationReadEngines[kv.TiKV] = struct{}{}
case kv.TiFlash.Name():
s.isolationReadEngines[kv.TiFlash] = struct{}{}
}
}
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"},
{ScopeGlobal | ScopeSession, TiDBCapturePlanBaseline, "0"},
{ScopeGlobal | ScopeSession, TiDBUsePlanBaselines, BoolToIntStr(DefTiDBUsePlanBaselines)},
{ScopeGlobal | ScopeSession, TiDBIsolationReadEngines, "tikv,tiflash"},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ const (

// TiDBUsePlanBaselines indicates whether the use of plan baselines is enabled.
TiDBUsePlanBaselines = "tidb_use_plan_baselines"

// TiDBIsolationReadEngines indicates the tidb only read from the stores whose engine type is involved in IsolationReadEngines.
// Now, only support TiKV and TiFlash.
TiDBIsolationReadEngines = "tidb_isolation_read_engines"
)

// Default TiDB system variable values.
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/timeutil"
)
Expand Down Expand Up @@ -625,6 +626,24 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return "", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBIsolationReadEngines:
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
engines := strings.Split(value, ",")
var formatVal string
for i, engine := range engines {
engine = strings.TrimSpace(engine)
if i != 0 {
formatVal += ","
}
switch {
case strings.EqualFold(engine, kv.TiKV.Name()):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also trim the spaces around each item and remove the duplicated items? For example, users may set the variable this way:

set @@ tidb_isolation_read_engines = "tiflash, tikv,  tiflash,    tikv";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

formatVal += kv.TiKV.Name()
case strings.EqualFold(engine, kv.TiFlash.Name()):
formatVal += kv.TiFlash.Name()
default:
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
}
return formatVal, nil
}
return value, nil
}
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ func (s *testVarsutilSuite) TestValidate(c *C) {
{TiDBTxnMode, "pessimistic", false},
{TiDBTxnMode, "optimistic", false},
{TiDBTxnMode, "", false},
{TiDBIsolationReadEngines, "", true},
{TiDBIsolationReadEngines, "tikv", false},
{TiDBIsolationReadEngines, "TiKV,tiflash", false},
{TiDBIsolationReadEngines, " tikv, tiflash ", false},
}

for _, t := range tests {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
s.storeType = kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == "tiflash" {
if label.Value == kv.TiFlash.Name() {
s.storeType = kv.TiFlash
}
break
Expand Down Expand Up @@ -1265,7 +1265,7 @@ func (s *Store) reResolve(c *RegionCache) {
storeType := kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == "tiflash" {
if label.Value == kv.TiFlash.Name() {
storeType = kv.TiFlash
}
break
Expand Down