Skip to content

Commit

Permalink
txn: enable pipelined dml by hint (#51770)
Browse files Browse the repository at this point in the history
ref #50215
  • Loading branch information
ekexium authored Mar 15, 2024
1 parent ea20982 commit 68c03cf
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 127 deletions.
2 changes: 1 addition & 1 deletion pkg/planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func setVarHintChecker(varName, hint string) (ok bool, warning error) {
if sysVar == nil { // no such a variable
return false, plannererrors.ErrUnresolvedHintName.FastGenByArgs(varName, hint)
}
if !sysVar.IsHintUpdatableVerfied {
if !sysVar.IsHintUpdatableVerified {
warning = plannererrors.ErrNotHintUpdatable.FastGenByArgs(varName)
}
return true, warning
Expand Down
3 changes: 3 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4305,6 +4305,9 @@ func (s *session) usePipelinedDmlOrWarn() bool {
if stmtCtx == nil {
return false
}
if stmtCtx.IsReadOnly {
return false
}
vars := s.GetSessionVars()
if !vars.TxnCtx.EnableMDL {
stmtCtx.AppendWarning(
Expand Down
239 changes: 191 additions & 48 deletions pkg/sessionctx/variable/noop.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/sessionctx/variable/setvar_affect.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ var isHintUpdatableVerified = map[string]struct{}{
func setHintUpdatable(vars []*SysVar) {
for _, v := range vars {
if _, ok := isHintUpdatableVerified[v.Name]; ok {
v.IsHintUpdatableVerfied = true
v.IsHintUpdatableVerified = true
}
}
}
222 changes: 148 additions & 74 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,10 +1243,16 @@ var defaultSysVars = []*SysVar{
s.EnableNonPreparedPlanCacheForDML = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptEnableFuzzyBinding, Value: BoolToOnOff(false), Type: TypeBool, IsHintUpdatableVerfied: true, SetSession: func(s *SessionVars, val string) error {
s.EnableFuzzyBinding = TiDBOptOn(val)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession,
Name: TiDBOptEnableFuzzyBinding,
Value: BoolToOnOff(false),
Type: TypeBool,
IsHintUpdatableVerified: true,
SetSession: func(s *SessionVars, val string) error {
s.EnableFuzzyBinding = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBNonPreparedPlanCacheSize, Value: strconv.FormatUint(uint64(DefTiDBNonPreparedPlanCacheSize), 10), Type: TypeUnsigned, MinValue: 1, MaxValue: 100000, SetSession: func(s *SessionVars, val string) error {
uVal, err := strconv.ParseUint(val, 10, 64)
if err == nil {
Expand Down Expand Up @@ -1462,39 +1468,70 @@ var defaultSysVars = []*SysVar{
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: DefaultWeekFormat, Value: "0", Type: TypeUnsigned, MinValue: 0, MaxValue: 7},
{Scope: ScopeGlobal | ScopeSession, Name: SQLModeVar, Value: mysql.DefaultSQLMode, IsHintUpdatableVerfied: true, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
// Ensure the SQL mode parses
normalizedValue = mysql.FormatSQLModeStr(normalizedValue)
if _, err := mysql.GetSQLMode(normalizedValue); err != nil {
return originalValue, err
}
return normalizedValue, nil
}, SetSession: func(s *SessionVars, val string) error {
val = mysql.FormatSQLModeStr(val)
// Modes is a list of different modes separated by commas.
sqlMode, err := mysql.GetSQLMode(val)
if err != nil {
return errors.Trace(err)
}
s.SQLMode = sqlMode
s.SetStatusFlag(mysql.ServerStatusNoBackslashEscaped, sqlMode.HasNoBackslashEscapesMode())
return nil
}},
{Scope: ScopeGlobal, Name: TiDBLoadBindingTimeout, Value: "200", Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt32, IsHintUpdatableVerfied: false, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
timeoutMS := tidbOptPositiveInt32(s, 0)
vars.LoadBindingTimeout = uint64(timeoutMS)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: MaxExecutionTime, Value: "0", Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt32, IsHintUpdatableVerfied: true, SetSession: func(s *SessionVars, val string) error {
timeoutMS := tidbOptPositiveInt32(val, 0)
s.MaxExecutionTime = uint64(timeoutMS)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiKVClientReadTimeout, Value: "0", Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt32, IsHintUpdatableVerfied: true, SetSession: func(s *SessionVars, val string) error {
timeoutMS := tidbOptPositiveInt32(val, 0)
s.TiKVClientReadTimeout = uint64(timeoutMS)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession,
Name: SQLModeVar,
Value: mysql.DefaultSQLMode,
IsHintUpdatableVerified: true,
Validation: func(
vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag,
) (string, error) {
// Ensure the SQL mode parses
normalizedValue = mysql.FormatSQLModeStr(normalizedValue)
if _, err := mysql.GetSQLMode(normalizedValue); err != nil {
return originalValue, err
}
return normalizedValue, nil
}, SetSession: func(s *SessionVars, val string) error {
val = mysql.FormatSQLModeStr(val)
// Modes is a list of different modes separated by commas.
sqlMode, err := mysql.GetSQLMode(val)
if err != nil {
return errors.Trace(err)
}
s.SQLMode = sqlMode
s.SetStatusFlag(mysql.ServerStatusNoBackslashEscaped, sqlMode.HasNoBackslashEscapesMode())
return nil
}},
{
Scope: ScopeGlobal,
Name: TiDBLoadBindingTimeout,
Value: "200",
Type: TypeUnsigned,
MinValue: 0,
MaxValue: math.MaxInt32,
IsHintUpdatableVerified: false,
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
timeoutMS := tidbOptPositiveInt32(s, 0)
vars.LoadBindingTimeout = uint64(timeoutMS)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession,
Name: MaxExecutionTime,
Value: "0",
Type: TypeUnsigned,
MinValue: 0,
MaxValue: math.MaxInt32,
IsHintUpdatableVerified: true,
SetSession: func(s *SessionVars, val string) error {
timeoutMS := tidbOptPositiveInt32(val, 0)
s.MaxExecutionTime = uint64(timeoutMS)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession,
Name: TiKVClientReadTimeout,
Value: "0",
Type: TypeUnsigned,
MinValue: 0,
MaxValue: math.MaxInt32,
IsHintUpdatableVerified: true,
SetSession: func(s *SessionVars, val string) error {
timeoutMS := tidbOptPositiveInt32(val, 0)
s.TiKVClientReadTimeout = uint64(timeoutMS)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: CollationServer, Value: mysql.DefaultCollationName, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
return checkCollation(vars, normalizedValue, originalValue, scope)
}, SetSession: func(s *SessionVars, val string) error {
Expand All @@ -1514,20 +1551,28 @@ var defaultSysVars = []*SysVar{
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: SQLLogBin, Value: On, Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TimeZone, Value: "SYSTEM", IsHintUpdatableVerfied: true, Validation: func(varErrFunctionsNoopImpls *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
if strings.EqualFold(normalizedValue, "SYSTEM") {
return "SYSTEM", nil
}
_, err := timeutil.ParseTimeZone(normalizedValue)
return normalizedValue, err
}, SetSession: func(s *SessionVars, val string) error {
tz, err := timeutil.ParseTimeZone(val)
if err != nil {
return err
}
s.TimeZone = tz
return nil
}},
{
Scope: ScopeGlobal | ScopeSession,
Name: TimeZone,
Value: "SYSTEM",
IsHintUpdatableVerified: true,
Validation: func(
varErrFunctionsNoopImpls *SessionVars, normalizedValue string, originalValue string,
scope ScopeFlag,
) (string, error) {
if strings.EqualFold(normalizedValue, "SYSTEM") {
return "SYSTEM", nil
}
_, err := timeutil.ParseTimeZone(normalizedValue)
return normalizedValue, err
}, SetSession: func(s *SessionVars, val string) error {
tz, err := timeutil.ParseTimeZone(val)
if err != nil {
return err
}
s.TimeZone = tz
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: ForeignKeyChecks, Value: BoolToOnOff(DefTiDBForeignKeyChecks), Type: TypeBool, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
if TiDBOptOn(normalizedValue) {
vars.ForeignKeyChecks = true
Expand Down Expand Up @@ -1612,21 +1657,31 @@ var defaultSysVars = []*SysVar{
s.LockWaitTimeout = lockWaitSec * 1000
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: GroupConcatMaxLen, Value: "1024", IsHintUpdatableVerfied: true, Type: TypeUnsigned, MinValue: 4, MaxValue: math.MaxUint64, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
// https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_group_concat_max_len
// Minimum Value 4
// Maximum Value (64-bit platforms) 18446744073709551615
// Maximum Value (32-bit platforms) 4294967295
if mathutil.IntBits == 32 {
if val, err := strconv.ParseUint(normalizedValue, 10, 64); err == nil {
if val > uint64(math.MaxUint32) {
vars.StmtCtx.AppendWarning(ErrTruncatedWrongValue.FastGenByArgs(GroupConcatMaxLen, originalValue))
return strconv.FormatInt(int64(math.MaxUint32), 10), nil
{
Scope: ScopeGlobal | ScopeSession,
Name: GroupConcatMaxLen,
Value: "1024",
IsHintUpdatableVerified: true,
Type: TypeUnsigned,
MinValue: 4,
MaxValue: math.MaxUint64,
Validation: func(
vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag,
) (string, error) {
// https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_group_concat_max_len
// Minimum Value 4
// Maximum Value (64-bit platforms) 18446744073709551615
// Maximum Value (32-bit platforms) 4294967295
if mathutil.IntBits == 32 {
if val, err := strconv.ParseUint(normalizedValue, 10, 64); err == nil {
if val > uint64(math.MaxUint32) {
vars.StmtCtx.AppendWarning(ErrTruncatedWrongValue.FastGenByArgs(GroupConcatMaxLen, originalValue))
return strconv.FormatInt(int64(math.MaxUint32), 10), nil
}
}
}
}
return normalizedValue, nil
}},
return normalizedValue, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: CharacterSetConnection, Value: mysql.DefaultCharset, skipInit: true, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
return checkCharacterSet(normalizedValue, CharacterSetConnection)
}, SetSession: func(s *SessionVars, val string) error {
Expand Down Expand Up @@ -1673,10 +1728,16 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: WindowingUseHighPrecision, Value: On, Type: TypeBool, IsHintUpdatableVerfied: true, SetSession: func(s *SessionVars, val string) error {
s.WindowingUseHighPrecision = TiDBOptOn(val)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession,
Name: WindowingUseHighPrecision,
Value: On,
Type: TypeBool,
IsHintUpdatableVerified: true,
SetSession: func(s *SessionVars, val string) error {
s.WindowingUseHighPrecision = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: BlockEncryptionMode, Value: "aes-128-ecb", Type: TypeEnum, PossibleValues: []string{"aes-128-ecb", "aes-192-ecb", "aes-256-ecb", "aes-128-cbc", "aes-192-cbc", "aes-256-cbc", "aes-128-ofb", "aes-192-ofb", "aes-256-ofb", "aes-128-cfb", "aes-192-cfb", "aes-256-cfb"}},
/* TiDB specific variables */
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution), Depended: true, SetSession: func(s *SessionVars, val string) error {
Expand Down Expand Up @@ -1737,10 +1798,16 @@ var defaultSysVars = []*SysVar{
s.SetAllowInSubqToJoinAndAgg(TiDBOptOn(val))
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptPreferRangeScan, Value: BoolToOnOff(DefOptPreferRangeScan), Type: TypeBool, IsHintUpdatableVerfied: true, SetSession: func(s *SessionVars, val string) error {
s.SetAllowPreferRangeScan(TiDBOptOn(val))
return nil
}},
{
Scope: ScopeGlobal | ScopeSession,
Name: TiDBOptPreferRangeScan,
Value: BoolToOnOff(DefOptPreferRangeScan),
Type: TypeBool,
IsHintUpdatableVerified: true,
SetSession: func(s *SessionVars, val string) error {
s.SetAllowPreferRangeScan(TiDBOptOn(val))
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptLimitPushDownThreshold, Value: strconv.Itoa(DefOptLimitPushDownThreshold), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error {
s.LimitPushDownThreshold = TidbOptInt64(val, DefOptLimitPushDownThreshold)
return nil
Expand Down Expand Up @@ -2715,7 +2782,12 @@ var defaultSysVars = []*SysVar{
s.EnableMPPSharedCTEExecution = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptFixControl, Value: "", Type: TypeStr, IsHintUpdatableVerfied: true,
{
Scope: ScopeGlobal | ScopeSession,
Name: TiDBOptFixControl,
Value: "",
Type: TypeStr,
IsHintUpdatableVerified: true,
SetGlobal: func(ctx context.Context, vars *SessionVars, val string) error {
// validation logic for setting global
// we don't put this in Validation to avoid repeating the checking logic for setting session.
Expand Down Expand Up @@ -3082,7 +3154,9 @@ var defaultSysVars = []*SysVar{
return nil
}
return errors.Errorf("unsupport DML type: %s", val)
}},
},
IsHintUpdatableVerified: true,
},
}

// GlobalSystemVariableInitialValue gets the default value for a system variable including ones that are dynamically set (e.g. based on the store)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sessionctx/variable/variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ type SysVar struct {
SetSession func(*SessionVars, string) error
// SetGlobal is called after validation
SetGlobal func(context.Context, *SessionVars, string) error
// IsHintUpdatableVerfied indicate whether we've confirmed that SET_VAR() hint is worked for this hint.
IsHintUpdatableVerfied bool
// IsHintUpdatableVerified indicate whether we've confirmed that SET_VAR() hint is worked for this hint.
IsHintUpdatableVerified bool
// Deprecated: Hidden previously meant that the variable still responds to SET but doesn't show up in SHOW VARIABLES
// However, this feature is no longer used. All variables are visble.
Hidden bool
Expand Down
15 changes: 14 additions & 1 deletion tests/realtikvtest/pipelineddmltest/pipelineddml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestPipelinedDMLPositive(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key, b int)")
tk.MustExec("create table t (a int, b int)")
tk.MustExec("insert into t values(1, 1)")
tk.MustExec("set session tidb_dml_type = bulk")
for _, stmt := range stmts {
Expand Down Expand Up @@ -123,6 +123,19 @@ func TestPipelinedDMLPositive(t *testing.T) {
require.True(t, strings.Contains(err.Error(), "pipelined memdb is enabled"), err.Error())
tk.MustQuery("show warnings").CheckContain("pessimistic-auto-commit config is ignored in favor of Pipelined DML")
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(false)

// enable by hint
// Hint works for DELETE and UPDATE, but not for INSERT if the hint is in its select clause.
tk.MustExec("set @@tidb_dml_type = standard")
err = panicToErr(
func() error {
_, err := tk.Exec("delete /*+ SET_VAR(tidb_dml_type=bulk) */ from t")
// "insert into t select /*+ SET_VAR(tidb_dml_type=bulk) */ * from t" won't work
return err
},
)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is enabled"), err.Error())
}

func TestPipelinedDMLNegative(t *testing.T) {
Expand Down

0 comments on commit 68c03cf

Please sign in to comment.