diff --git a/executor/set_test.go b/executor/set_test.go
index 05b20812e75a2..c3e2ec3a8626a 100644
--- a/executor/set_test.go
+++ b/executor/set_test.go
@@ -517,6 +517,17 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
 	tk.MustExec("SET GLOBAL tidb_enable_extended_stats = off")
 	tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("0"))
 
+	tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = on")
+	tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
+	tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = off")
+	tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
+	tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = on")
+	tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
+	tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = off")
+	tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
+	c.Assert(tk.ExecToErr("SET SESSION tidb_enable_tiflash_fallback_tikv = 123"), NotNil)
+	c.Assert(tk.ExecToErr("SET GLOBAL tidb_enable_tiflash_fallback_tikv = 321"), NotNil)
+
 	// Test issue #22145
 	tk.MustExec(`set global sync_relay_log = "'"`)
diff --git a/server/conn.go b/server/conn.go
index cecc23d6dab5b..6479e0d065566 100644
--- a/server/conn.go
+++ b/server/conn.go
@@ -72,6 +72,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/arena"
 	"github.com/pingcap/tidb/util/chunk"
@@ -1457,14 +1458,31 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
 	if len(pointPlans) > 0 {
 		defer cc.ctx.ClearValue(plannercore.PointPlanKey)
 	}
+	var retryable bool
 	for i, stmt := range stmts {
 		if len(pointPlans) > 0 {
 			// Save the point plan in Session so we don't need to build the point plan again.
 			cc.ctx.SetValue(plannercore.PointPlanKey, plannercore.PointPlanVal{Plan: pointPlans[i]})
 		}
-		err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
+		retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
 		if err != nil {
-			break
+			if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
+				// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
+				// server and fall back to TiKV.
+				warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err})
+				func() {
+					delete(cc.ctx.GetSessionVars().IsolationReadEngines, kv.TiFlash)
+					defer func() {
+						cc.ctx.GetSessionVars().IsolationReadEngines[kv.TiFlash] = struct{}{}
+					}()
+					_, err = cc.handleStmt(ctx, stmt, warns, i == len(stmts)-1)
+				}()
+				if err != nil {
+					break
+				}
+			} else {
+				break
+			}
 		}
 	}
 	if err != nil {
@@ -1567,7 +1585,9 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
 	return pointPlans, nil
 }
 
-func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) error {
+// The first return value indicates whether the call of handleStmt has no side effect and can be retried to correct the error.
+// Currently the first return value is used to fall back to TiKV when TiFlash is down.
+func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) {
 	ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
 	reg := trace.StartRegion(ctx, "ExecuteStmt")
 	rs, err := cc.ctx.ExecuteStmt(ctx, stmt)
@@ -1578,7 +1598,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
 		defer terror.Call(rs.Close)
 	}
 	if err != nil {
-		return err
+		return true, err
 	}
 
 	if lastStmt {
@@ -1593,12 +1613,12 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
 
 	if rs != nil {
 		connStatus := atomic.LoadInt32(&cc.status)
 		if connStatus == connStatusShutdown {
-			return executor.ErrQueryInterrupted
+			return false, executor.ErrQueryInterrupted
 		}
-		err = cc.writeResultset(ctx, rs, false, status, 0)
+		retryable, err := cc.writeResultset(ctx, rs, false, status, 0)
 		if err != nil {
-			return err
+			return retryable, err
 		}
 	} else {
 		handled, err := cc.handleQuerySpecial(ctx, status)
@@ -1609,10 +1629,10 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
 			}
 		}
 		if err != nil {
-			return err
+			return false, err
 		}
 	}
-	return nil
+	return false, nil
 }
 
 func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bool, error) {
@@ -1678,7 +1698,10 @@ func (cc *clientConn) handleFieldList(ctx context.Context, sql string) (err erro
 // If binary is true, the data would be encoded in BINARY format.
 // serverStatus, a flag bit represents server information.
 // fetchSize, the desired number of rows to be fetched each time when client uses cursor.
-func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16, fetchSize int) (runErr error) {
+// retryable indicates whether the call of writeResultset has no side effect and can be retried to correct the error. The call
+// has a side effect in cursor mode or once data has been sent to the client. Currently retryable is used to fall back to TiKV
+// when TiFlash is down.
+func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16, fetchSize int) (retryable bool, runErr error) {
 	defer func() {
 		// close ResultSet when cursor doesn't exist
 		r := recover()
@@ -1699,13 +1722,13 @@ func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary b
 	if mysql.HasCursorExistsFlag(serverStatus) {
 		err = cc.writeChunksWithFetchSize(ctx, rs, serverStatus, fetchSize)
 	} else {
-		err = cc.writeChunks(ctx, rs, binary, serverStatus)
+		retryable, err = cc.writeChunks(ctx, rs, binary, serverStatus)
 	}
 	if err != nil {
-		return err
+		return retryable, err
 	}
 
-	return cc.flush(ctx)
+	return false, cc.flush(ctx)
 }
 
 func (cc *clientConn) writeColumnInfo(columns []*ColumnInfo, serverStatus uint16) error {
@@ -1727,10 +1750,12 @@ func (cc *clientConn) writeColumnInfo(columns []*ColumnInfo, serverStatus uint16
 // writeChunks writes data from a Chunk, which filled data by a ResultSet, into a connection.
 // binary specifies the way to dump data. It throws any error while dumping data.
 // serverStatus, a flag bit represents server information
-func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16) error {
+// The first return value indicates whether an error occurs at the first call of ResultSet.Next.
+func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16) (bool, error) {
 	data := cc.alloc.AllocWithLen(4, 1024)
 	req := rs.NewChunk()
 	gotColumnInfo := false
+	firstNext := true
 	var stmtDetail *execdetails.StmtExecDetails
 	stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey)
 	if stmtDetailRaw != nil {
@@ -1741,15 +1766,16 @@
 		// Here server.tidbResultSet implements Next method.
 		err := rs.Next(ctx, req)
 		if err != nil {
-			return err
+			return firstNext, err
 		}
+		firstNext = false
 		if !gotColumnInfo {
 			// We need to call Next before we get columns.
 			// Otherwise, we will get incorrect columns info.
 			columns := rs.Columns()
 			err = cc.writeColumnInfo(columns, serverStatus)
 			if err != nil {
-				return err
+				return false, err
 			}
 			gotColumnInfo = true
 		}
@@ -1768,11 +1794,11 @@
 			}
 			if err != nil {
 				reg.End()
-				return err
+				return false, err
 			}
 			if err = cc.writePacket(data); err != nil {
 				reg.End()
-				return err
+				return false, err
 			}
 		}
 		reg.End()
@@ -1780,7 +1806,7 @@
 			stmtDetail.WriteSQLRespDuration += time.Since(start)
 		}
 	}
-	return cc.writeEOF(serverStatus)
+	return false, cc.writeEOF(serverStatus)
 }
 
 // writeChunksWithFetchSize writes data from a Chunk, which filled data by a ResultSet, into a connection.
@@ -1873,7 +1899,7 @@
 		if !lastRs {
 			status |= mysql.ServerMoreResultsExists
 		}
-		if err := cc.writeResultset(ctx, rs, binary, status, 0); err != nil {
+		if _, err := cc.writeResultset(ctx, rs, binary, status, 0); err != nil {
 			return err
 		}
 	}
diff --git a/server/conn_stmt.go b/server/conn_stmt.go
index 9de3c4cc47822..762a5be52cf39 100644
--- a/server/conn_stmt.go
+++ b/server/conn_stmt.go
@@ -46,9 +46,11 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/parser/terror"
+	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
+	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/hack"
@@ -192,12 +194,31 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
 		}
 	}
 	ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
+	retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
+	if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
+		// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
+		// server and fall back to TiKV.
+		prevErr := err
+		delete(cc.ctx.GetSessionVars().IsolationReadEngines, kv.TiFlash)
+		defer func() {
+			cc.ctx.GetSessionVars().IsolationReadEngines[kv.TiFlash] = struct{}{}
+		}()
+		_, err = cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
+		// We append the warning after the retry because `ResetContextOfStmt` may be called during the retry, which clears warnings.
+		cc.ctx.GetSessionVars().StmtCtx.AppendError(prevErr)
+	}
+	return err
+}
+
+// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried
+// to correct the error. Currently the first return value is used to fall back to TiKV when TiFlash is down.
+func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) {
 	rs, err := stmt.Execute(ctx, args)
 	if err != nil {
-		return errors.Annotate(err, cc.preparedStmt2String(stmtID))
+		return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
 	}
 	if rs == nil {
-		return cc.writeOK(ctx)
+		return false, cc.writeOK(ctx)
 	}
 
 	// if the client wants to use cursor
@@ -207,20 +228,20 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
 		stmt.StoreResultSet(rs)
 		err = cc.writeColumnInfo(rs.Columns(), mysql.ServerStatusCursorExists)
 		if err != nil {
-			return err
+			return false, err
 		}
 		if cl, ok := rs.(fetchNotifier); ok {
 			cl.OnFetchReturned()
 		}
 		// explicitly flush columnInfo to client.
-		return cc.flush(ctx)
+		return false, cc.flush(ctx)
 	}
 	defer terror.Call(rs.Close)
-	err = cc.writeResultset(ctx, rs, true, 0, 0)
+	retryable, err := cc.writeResultset(ctx, rs, true, 0, 0)
 	if err != nil {
-		return errors.Annotate(err, cc.preparedStmt2String(stmtID))
+		return retryable, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
 	}
-	return nil
+	return false, nil
 }
 
 // maxFetchSize constants
@@ -252,7 +273,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
 			strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch_rs"), cc.preparedStmt2String(stmtID))
 	}
 
-	err = cc.writeResultset(ctx, rs, true, mysql.ServerStatusCursorExists, int(fetchSize))
+	_, err = cc.writeResultset(ctx, rs, true, mysql.ServerStatusCursorExists, int(fetchSize))
 	if err != nil {
 		return errors.Annotate(err, cc.preparedStmt2String(stmtID))
 	}
diff --git a/server/conn_test.go b/server/conn_test.go
index 54971486981b0..8a54c13ff7dfc 100644
--- a/server/conn_test.go
+++ b/server/conn_test.go
@@ -23,6 +23,7 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/executor"
@@ -710,3 +711,49 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
 	tk.MustExec("commit")
 	tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
 }
+
+func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/errorMockTiFlashServerTimeout", "return(true)"), IsNil)
+	cc := &clientConn{
+		alloc: arena.NewAllocator(1024),
+		pkt: &packetIO{
+			bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
+		},
+	}
+	tk := testkit.NewTestKitWithInit(c, ts.store)
+	cc.ctx = &TiDBContext{Session: tk.Se, stmts: make(map[int]*TiDBStatement)}
+
+	tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t (a int, b int)")
+	tk.MustExec("insert into t values (3, 4), (6, 7), (9, 10)")
+
+	// Create virtual TiFlash replica info.
+	dom := domain.GetDomain(tk.Se)
+	is := dom.InfoSchema()
+	db, exists := is.SchemaByName(model.NewCIStr("test"))
+	c.Assert(exists, IsTrue)
+	for _, tblInfo := range db.Tables {
+		if tblInfo.Name.L == "t" {
+			tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
+				Count: 1,
+				Available: true,
+			}
+		}
+	}
+
+	tk.MustQuery("explain select sum(a) from t").Check(testkit.Rows(
+		"StreamAgg_20 1.00 root funcs:sum(Column#6)->Column#4",
+		"└─TableReader_21 1.00 root data:StreamAgg_8",
+		" └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(test.t.a)->Column#6",
+		" └─TableFullScan_19 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"))
+
+	ctx := context.Background()
+	c.Assert(cc.handleQuery(ctx, "select sum(a) from t"), IsNil)
+	tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
+	c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil)
+	c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)
+	tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
+
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/errorMockTiFlashServerTimeout"), IsNil)
+}
diff --git a/session/session.go b/session/session.go
index 474c24d5d3efb..6287c5df35a40 100644
--- a/session/session.go
+++ b/session/session.go
@@ -2640,6 +2640,7 @@ var builtinGlobalVariable = []string{
 	variable.TiDBTrackAggregateMemoryUsage,
 	variable.TiDBMultiStatementMode,
 	variable.TiDBEnableExchangePartition,
+	variable.TiDBEnableTiFlashFallbackTiKV,
 }
 
 var (
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 38b8c5767e2c0..e763e10aed6b2 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -809,6 +809,9 @@ type SessionVars struct {
 
 	// TiDBEnableExchangePartition indicates whether to enable exchange partition
 	TiDBEnableExchangePartition bool
+
+	// EnableTiFlashFallbackTiKV indicates whether to fall back to TiKV when TiFlash is unavailable.
+	EnableTiFlashFallbackTiKV bool
 }
 
 // CheckAndGetTxnScope will return the transaction scope we should use in the current session.
@@ -967,6 +970,7 @@ func NewSessionVars() *SessionVars {
 		GuaranteeLinearizability: DefTiDBGuaranteeLinearizability,
 		AnalyzeVersion: DefTiDBAnalyzeVersion,
 		EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin,
+		EnableTiFlashFallbackTiKV: DefTiDBEnableTiFlashFallbackTiKV,
 	}
 	vars.KVVars = kv.NewVariables(&vars.Killed)
 	vars.Concurrency = Concurrency{
@@ -1697,6 +1701,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
 		s.MultiStatementMode = TiDBOptMultiStmt(val)
 	case TiDBEnableExchangePartition:
 		s.TiDBEnableExchangePartition = TiDBOptOn(val)
+	case TiDBEnableTiFlashFallbackTiKV:
+		s.EnableTiFlashFallbackTiKV = TiDBOptOn(val)
 	}
 	s.systems[name] = val
 	return nil
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index efd2a64a03653..b128ecd4276e3 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -659,6 +659,7 @@ var defaultSysVars = []*SysVar{
 	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableFastAnalyze, Value: BoolToOnOff(DefTiDBUseFastAnalyze), Type: TypeBool},
 	{Scope: ScopeGlobal | ScopeSession, Name: TiDBSkipIsolationLevelCheck, Value: BoolToOnOff(DefTiDBSkipIsolationLevelCheck), Type: TypeBool},
 	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableRateLimitAction, Value: BoolToOnOff(DefTiDBEnableRateLimitAction), Type: TypeBool},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashFallbackTiKV, Value: BoolToOnOff(DefTiDBEnableTiFlashFallbackTiKV), Type: TypeBool},
 	/* The following variable is defined as session scope but is actually server scope. */
 	{Scope: ScopeSession, Name: TiDBGeneralLog, Value: BoolToOnOff(DefTiDBGeneralLog), Type: TypeBool},
 	{Scope: ScopeSession, Name: TiDBPProfSQLCPU, Value: strconv.Itoa(DefTiDBPProfSQLCPU), Type: TypeInt, MinValue: 0, MaxValue: 1},
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 898d65747b562..7fd20780907e4 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -523,6 +523,9 @@ const (
 
 	// TiDBEnableExchangePartition indicates whether to enable exchange partition.
 	TiDBEnableExchangePartition = "tidb_enable_exchange_partition"
+
+	// TiDBEnableTiFlashFallbackTiKV indicates whether to fall back to TiKV when TiFlash is unavailable.
+	TiDBEnableTiFlashFallbackTiKV = "tidb_enable_tiflash_fallback_tikv"
 )
 
 // TiDB vars that have only global scope
@@ -663,6 +666,7 @@ const (
 	DefTiDBEnableIndexMergeJoin = false
 	DefTiDBTrackAggregateMemoryUsage = false
 	DefTiDBEnableExchangePartition = false
+	DefTiDBEnableTiFlashFallbackTiKV = false
 )
 
 // Process global variables.
diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go
index c5b56363c3338..0c70f77a79ad9 100644
--- a/sessionctx/variable/varsutil_test.go
+++ b/sessionctx/variable/varsutil_test.go
@@ -105,6 +105,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
 	c.Assert(vars.ShardAllocateStep, Equals, int64(DefTiDBShardAllocateStep))
 	c.Assert(vars.EnableChangeColumnType, Equals, DefTiDBChangeColumnType)
 	c.Assert(vars.AnalyzeVersion, Equals, DefTiDBAnalyzeVersion)
+	c.Assert(vars.EnableTiFlashFallbackTiKV, Equals, DefTiDBEnableTiFlashFallbackTiKV)
 
 	assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.MemQuota))
 	assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.BatchSize))
diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go
index 39d4e9f6ba8c7..ba597fa346981 100644
--- a/store/tikv/batch_coprocessor.go
+++ b/store/tikv/batch_coprocessor.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -146,6 +147,15 @@ func buildBatchCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, st
 		if needRetry {
 			// Backoff once for each retry.
 			err = bo.Backoff(BoRegionMiss, errors.New("Cannot find region with TiFlash peer"))
+			// Actually ErrRegionUnavailable would be returned rather than ErrTiFlashServerTimeout. However, since we don't
+			// have MockTiFlash yet, we inject ErrTiFlashServerTimeout to simulate the situation that TiFlash is down.
+			if storeType == kv.TiFlash {
+				failpoint.Inject("errorMockTiFlashServerTimeout", func(val failpoint.Value) {
+					if val.(bool) {
+						failpoint.Return(nil, errors.Trace(ErrTiFlashServerTimeout))
+					}
+				})
+			}
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
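
Note (not part of the patch): below is a minimal, self-contained Go sketch of the retry pattern that the conn.go and conn_stmt.go changes above implement: run the statement once, and if the attempt is retryable and failed with a TiFlash timeout, temporarily remove TiFlash from the session's isolation-read engines, run it again, and restore the engine afterwards. The names sessionVars, runStmt, runWithTiKVFallback, and errTiFlashTimeout are hypothetical stand-ins for the real TiDB types and errors, and errors.Is is used in place of errors.ErrorEqual from pingcap/errors.

package main

import (
	"errors"
	"fmt"
)

// errTiFlashTimeout stands in for tikv.ErrTiFlashServerTimeout in the real code.
var errTiFlashTimeout = errors.New("TiFlash server timeout")

// sessionVars is a hypothetical stand-in for the relevant fields of variable.SessionVars.
type sessionVars struct {
	EnableTiFlashFallbackTiKV bool
	IsolationReadEngines      map[string]struct{}
}

// runWithTiKVFallback runs stmt once; if the attempt had no side effect (retryable) and failed
// with a TiFlash timeout, it removes "tiflash" from the read engines, retries, and restores it.
func runWithTiKVFallback(vars *sessionVars, runStmt func() (retryable bool, err error)) error {
	retryable, err := runStmt()
	if err == nil || !vars.EnableTiFlashFallbackTiKV || !errors.Is(err, errTiFlashTimeout) || !retryable {
		return err
	}
	// Drop TiFlash for the retry only; the deferred restore mirrors the defer in the patch.
	delete(vars.IsolationReadEngines, "tiflash")
	defer func() { vars.IsolationReadEngines["tiflash"] = struct{}{} }()
	_, err = runStmt()
	return err
}

func main() {
	vars := &sessionVars{
		EnableTiFlashFallbackTiKV: true,
		IsolationReadEngines:      map[string]struct{}{"tikv": {}, "tiflash": {}},
	}
	calls := 0
	err := runWithTiKVFallback(vars, func() (bool, error) {
		calls++
		if _, ok := vars.IsolationReadEngines["tiflash"]; ok {
			return true, errTiFlashTimeout // first attempt: TiFlash is chosen and times out
		}
		return false, nil // second attempt: TiKV only, succeeds
	})
	fmt.Println(calls, err, len(vars.IsolationReadEngines)) // 2 <nil> 2
}

The important design point, which the retryable return value carries through handleStmt, writeResultset, and writeChunks, is that the retry is only safe before any result data has been sent to the client (and never in cursor mode); once bytes are on the wire, the first return value becomes false and the error is surfaced instead of retried.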