Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, session: add isolation read with engine type #12997

Merged
merged 15 commits into from
Nov 1, 2019
2 changes: 2 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,8 @@ var builtinGlobalVariable = []string{
variable.CollationServer,
variable.NetWriteTimeout,
variable.MaxExecutionTime,
variable.IsolationReadLabels,
variable.IsolationReadEngines,

/* TiDB specific global variables: */
variable.TiDBSkipUTF8Check,
Expand Down
21 changes: 21 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2815,6 +2815,27 @@ func (s *testSessionSuite) TestReplicaRead(c *C) {
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
}

func (s *testSessionSuite) TestIsolationRead(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(len(tk.Se.GetSessionVars().GetIsolationReadEngines()), Equals, 0)
c.Assert(len(tk.Se.GetSessionVars().GetIsolationReadLabels()), Equals, 0)
tk.MustExec("set @@isolation_read_labels = 'idc:bj_idc_1,district:Shanghai';")
labels := tk.Se.GetSessionVars().GetIsolationReadLabels()
c.Assert(len(labels), Equals, 2)
c.Assert(labels[0].Key, Equals, "idc")
c.Assert(labels[0].Value, Equals, "bj_idc_1")
c.Assert(labels[1].Key, Equals, "district")
c.Assert(labels[1].Value, Equals, "Shanghai")
tk.MustExec("set @@isolation_read_engines = 'tikv,tiflash';")
engines := tk.Se.GetSessionVars().GetIsolationReadEngines()
c.Assert(len(engines), Equals, 2)
c.Assert(engines[0], Equals, kv.TiKV)
c.Assert(engines[1], Equals, kv.TiFlash)
}

func (s *testSessionSuite) TestStmtHints(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
Expand Down
44 changes: 44 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"crypto/tls"
"fmt"
"github.com/pingcap/kvproto/pkg/metapb"
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -443,6 +444,12 @@ type SessionVars struct {
// replicaRead is used for reading data from replicas, only follower is supported at this time.
replicaRead kv.ReplicaReadType

// isolationReadLabels is used to isolation read, tidb only read from the stores have a label in the labels.
isolationReadLabels []*metapb.StoreLabel

// isolationReadEngines is used to isolation read, tidb only read from the stores whose engine type is in the engines.
isolationReadEngines []kv.StoreType

PlannerSelectBlockAsName []ast.HintTable
}

Expand Down Expand Up @@ -616,6 +623,16 @@ func (s *SessionVars) GetSplitRegionTimeout() time.Duration {
return time.Duration(s.WaitSplitRegionTimeout) * time.Second
}

// GetIsolationReadLabels gets isolation read labels.
func (s *SessionVars) GetIsolationReadLabels() []*metapb.StoreLabel {
return s.isolationReadLabels
}

// GetIsolationReadEngines gets isolation read engines.
func (s *SessionVars) GetIsolationReadEngines() []kv.StoreType {
return s.isolationReadEngines
}

// CleanBuffers cleans the temporary bufs
func (s *SessionVars) CleanBuffers() {
s.GetWriteStmtBufs().clean()
Expand Down Expand Up @@ -956,6 +973,31 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount))
case TiDBUsePlanBaselines:
s.UsePlanBaselines = TiDBOptOn(val)
case IsolationReadLabels:
s.isolationReadLabels = make([]*metapb.StoreLabel, 0, 2)
if len(val) == 0 {
break
}
for _, label := range strings.Split(val, ",") {
labelPair := strings.Split(label, ":")
s.isolationReadLabels = append(s.isolationReadLabels, &metapb.StoreLabel{
Key: labelPair[0],
Value: labelPair[1],
})
}
case IsolationReadEngines:
s.isolationReadEngines = make([]kv.StoreType, 0, 2)
if len(val) == 0 {
break
}
for _, engine := range strings.Split(val, ",") {
switch engine {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
case "TiFlash":
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
s.isolationReadEngines = append(s.isolationReadEngines, kv.TiFlash)
case "TiKV":
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
s.isolationReadEngines = append(s.isolationReadEngines, kv.TiKV)
}
}
}
s.systems[name] = val
return nil
Expand Down Expand Up @@ -997,6 +1039,8 @@ const (
TransactionIsolation = "transaction_isolation"
TxnIsolationOneShot = "tx_isolation_one_shot"
MaxExecutionTime = "max_execution_time"
IsolationReadLabels = "isolation_read_labels"
IsolationReadEngines = "isolation_read_engines"
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
)

// these variables are useless for TiDB, but still need to validate their values for some compatible issues.
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,8 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"},
{ScopeGlobal | ScopeSession, TiDBCapturePlanBaseline, "0"},
{ScopeGlobal | ScopeSession, TiDBUsePlanBaselines, BoolToIntStr(DefTiDBUsePlanBaselines)},
{ScopeGlobal | ScopeSession, IsolationReadLabels, ""},
{ScopeGlobal | ScopeSession, IsolationReadEngines, ""},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
31 changes: 31 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,37 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return "", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case IsolationReadLabels:
if value == "" {
return "", nil
}
labels := strings.Split(value, ",")
for _, label := range labels {
if len(strings.Split(label, ":")) != 2 {
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
}
return value, nil
case IsolationReadEngines:
if value == "" {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
return "", nil
}
engines := strings.Split(value, ",")
var formatVal string
for i, engine := range engines {
if i != 0 {
formatVal += ","
}
switch {
case strings.EqualFold(engine, "TiKV"):
formatVal += "TiKV"
case strings.EqualFold(engine, "TiFlash"):
formatVal += "TiFlash"
default:
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
}
return formatVal, nil
}
return value, nil
}
Expand Down