Skip to content

Commit

Permalink
store/copr: polish the tiflash-tikv fallback function. (#23078)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Mar 4, 2021
1 parent 5f73c82 commit 4a598ac
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 54 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
Expand All @@ -44,7 +44,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6
github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/parser v0.0.0-20210303061548-f6776f61e268
github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2 h1:Vx3qsoBtFHSQ5GTARXRh1AwNRVJ8SXaedLzIohnxClE=
github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb h1:2rGvEhflp/uK1l1rNUmoHA4CiHpbddHGxg52H71Fke8=
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
Expand Down Expand Up @@ -411,6 +411,8 @@ github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6 h1:lNGXD00uNXOKMM2pnTe9XvUv3IOEOtFhqNQljlTDZKc=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22 h1:O95vOUHHmAcjdw01D233Cvn5YsxsBDBCMGb3RZcHzgM=
github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
94 changes: 64 additions & 30 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
Expand All @@ -46,7 +51,17 @@ var _ = SerialSuites(&ConnTestSuite{})
func (ts *ConnTestSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
var err error
ts.store, err = mockstore.NewMockStore()
ts.store, err = mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
mockCluster := c.(*unistore.Cluster)
_, _, region1 := mockstore.BootstrapWithSingleStore(c)
store := c.AllocID()
peer := c.AllocID()
mockCluster.AddStore(store, "tiflash0", &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
mockCluster.AddPeer(region1, store, peer)
}),
mockstore.WithStoreType(mockstore.EmbedUnistore),
)
c.Assert(err, IsNil)
ts.dom, err = session.BootstrapSession(ts.store)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -712,8 +727,17 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
}

func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout", "return(true)"), IsNil)
// testGetTableByName returns the table named `table` in database `db`, looked
// up through the domain bound to ctx. The domain is reloaded before the
// lookup, and both the reload and the lookup errors are checked with c.Assert.
func testGetTableByName(c *C, ctx sessionctx.Context, db, table string) table.Table {
	d := domain.GetDomain(ctx)
	// Reload first so that the lookup below sees the new table schema.
	c.Assert(d.Reload(), IsNil)
	dbName, tblName := model.NewCIStr(db), model.NewCIStr(table)
	found, lookupErr := d.InfoSchema().TableByName(dbName, tblName)
	c.Assert(lookupErr, IsNil)
	return found
}

func (ts *ConnTestSuite) TestTiFlashFallback(c *C) {
cc := &clientConn{
alloc: arena.NewAllocator(1024),
pkt: &packetIO{
Expand All @@ -723,37 +747,47 @@ func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
tk := testkit.NewTestKitWithInit(c, ts.store)
cc.ctx = &TiDBContext{Session: tk.Se, stmts: make(map[int]*TiDBStatement)}

tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int)")
tk.MustExec("insert into t values (3, 4), (6, 7), (9, 10)")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

tk.MustQuery("explain select sum(a) from t").Check(testkit.Rows(
"StreamAgg_20 1.00 root funcs:sum(Column#6)->Column#4",
"└─TableReader_21 1.00 root data:StreamAgg_8",
" └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(test.t.a)->Column#6",
" └─TableFullScan_19 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"))

tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustQuery("select count(*) from t").Check(testkit.Rows("3"))
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)
// test batch cop send req error
testFallbackWork(c, tk, cc, "select sum(a) from t")
ctx := context.Background()
c.Assert(cc.handleQuery(ctx, "select sum(a) from t"), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)

tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout", "return(true)"), IsNil)
testFallbackWork(c, tk, cc, "select sum(a) from t")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "return(true)"), IsNil)
tk.MustExec("set @@session.tidb_allow_mpp=1")
testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout", "return(-1)"), IsNil)
tk.MustExec("set @@session.tidb_allow_mpp=1")
testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout"), IsNil)
}

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout"), IsNil)
// testFallbackWork exercises both settings of tidb_enable_tiflash_fallback_tikv
// for the given sql. With the variable set to 0 the query must return an
// error; with it set to 1 the query issued through cc must succeed and
// "show warnings" must report the TiFlash server timeout. Callers in this
// file enable a failpoint before invoking it so that the TiFlash path fails.
func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) {
	// Fallback disabled: the failure must surface to the client.
	tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 0")
	queryErr := tk.QueryToErr(sql)
	c.Assert(queryErr, NotNil)

	// Fallback enabled: the query succeeds and the timeout is kept as a warning.
	tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1")
	handleErr := cc.handleQuery(context.Background(), sql)
	c.Assert(handleErr, IsNil)
	tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
}
12 changes: 1 addition & 11 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -149,15 +148,6 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik
if needRetry {
// Backoff once for each retry.
err = bo.Backoff(tikv.BoRegionMiss, errors.New("Cannot find region with TiFlash peer"))
// Actually ErrRegionUnavailable would be thrown out rather than ErrTiFlashServerTimeout. However, since currently
// we don't have MockTiFlash, we inject ErrTiFlashServerTimeout to simulate the situation that TiFlash is down.
if storeType == kv.TiFlash {
failpoint.Inject("errorMockTiFlashServerTimeout", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.Trace(tikv.ErrTiFlashServerTimeout))
}
})
}
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -401,7 +391,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
} else {
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
return errors.Trace(err)
return tikv.ErrTiFlashServerTimeout
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion store/copr/batch_request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ func (ss *RegionBatchRequestSender) onSendFail(bo *tikv.Backoffer, ctxs []copTas
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retries should be limited, since the region may stay unavailable
// when some unrecoverable disaster has happened.
err = bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs))
err = bo.Backoff(tikv.BoTiFlashRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs))
return errors.Trace(err)
}
10 changes: 7 additions & 3 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,19 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
// That's a hard job but we can try it in the future.
if sender.GetRPCError() != nil {
m.sendError(sender.GetRPCError())
logutil.BgLogger().Error("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()))
// we return timeout to trigger tikv's fallback
m.sendError(tikv.ErrTiFlashServerTimeout)
return
}
} else {
rpcResp, err = m.store.GetTiKVClient().SendRequest(ctx, originalTask.storeAddr, wrappedReq, tikv.ReadTimeoutMedium)
}

if err != nil {
m.sendError(err)
logutil.BgLogger().Error("mpp dispatch meet error", zap.String("error", err.Error()))
// we return timeout to trigger tikv's fallback
m.sendError(tikv.ErrTiFlashServerTimeout)
return
}

Expand Down Expand Up @@ -277,7 +281,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR
}
}
m.sendToRespCh(&mppResponse{
err: errors.New(resp.Error.Msg),
err: tikv.ErrTiFlashServerTimeout,
})
return
}
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/unistore/cophandler/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,11 @@ func HandleMPPDAGReq(dbReader *dbreader.DBReader, req *coprocessor.Request, mppC
}
err = mppExec.open()
if err != nil {
return &coprocessor.Response{OtherError: err.Error()}
panic("open phase find error: " + err.Error())
}
_, err = mppExec.next()
if err != nil {
return &coprocessor.Response{OtherError: err.Error()}
panic("running phase find error: " + err.Error())
}
return &coprocessor.Response{}
}
Expand Down
21 changes: 19 additions & 2 deletions store/mockstore/unistore/cophandler/mpp_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,14 @@ type exchRecvExec struct {
lock sync.Mutex
wg sync.WaitGroup
err error
inited bool
}

// open is a no-op for exchRecvExec and always returns nil. The receiver's
// setup lives in init, which next invokes on its first call (guarded by the
// inited flag).
func (e *exchRecvExec) open() error {
return nil
}

func (e *exchRecvExec) init() error {
e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, 0)
serverMetas := make([]*mpp.TaskMeta, 0, len(e.exchangeReceiver.EncodedTaskMeta))
for _, encodedMeta := range e.exchangeReceiver.EncodedTaskMeta {
Expand All @@ -231,6 +236,12 @@ func (e *exchRecvExec) open() error {
}

func (e *exchRecvExec) next() (*chunk.Chunk, error) {
if !e.inited {
e.inited = true
if err := e.init(); err != nil {
return nil, err
}
}
if e.chk != nil {
defer func() {
e.chk = nil
Expand Down Expand Up @@ -326,6 +337,7 @@ type joinExec struct {

idx int
reservedRows []chunk.Row
inited bool
}

func (e *joinExec) buildHashTable() error {
Expand Down Expand Up @@ -399,11 +411,16 @@ func (e *joinExec) open() error {
if err != nil {
return errors.Trace(err)
}
err = e.buildHashTable()
return errors.Trace(err)
return nil
}

func (e *joinExec) next() (*chunk.Chunk, error) {
if !e.inited {
e.inited = true
if err := e.buildHashTable(); err != nil {
return nil, err
}
}
for {
if e.idx < len(e.reservedRows) {
idx := e.idx
Expand Down
19 changes: 18 additions & 1 deletion store/mockstore/unistore/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
case tikvrpc.CmdMPPConn:
resp.Resp, err = c.handleEstablishMPPConnection(ctx, req.EstablishMPPConn(), timeout, storeID)
case tikvrpc.CmdMPPTask:
failpoint.Inject("mppDispatchTimeout", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("rpc error"))
}
})
resp.Resp, err = c.handleDispatchMPPTask(ctx, req.DispatchMPPTask(), storeID)
case tikvrpc.CmdMvccGetByKey:
resp.Resp, err = c.usSvr.MvccGetByKey(ctx, req.MvccGetByKey())
Expand Down Expand Up @@ -292,7 +297,7 @@ func (c *RPCClient) handleEstablishMPPConnection(ctx context.Context, r *mpp.Est
if err != nil {
return nil, err
}
var mockClient = mockMPPConnectionClient{mppResponses: mockServer.mppResponses, idx: 0}
var mockClient = mockMPPConnectionClient{mppResponses: mockServer.mppResponses, idx: 0, targetTask: r.ReceiverMeta}
streamResp := &tikvrpc.MPPStreamResponse{Tikv_EstablishMPPConnectionClient: &mockClient}
_, cancel := context.WithCancel(ctx)
streamResp.Lease.Cancel = cancel
Expand Down Expand Up @@ -455,13 +460,20 @@ func (mock *mockBatchCopClient) Recv() (*coprocessor.BatchResponse, error) {
}
return ret, err
}
failpoint.Inject("batchCopRecvTimeout", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, context.Canceled)
}
})
return nil, io.EOF
}

// mockMPPConnectionClient is the mock client stream wrapped into the
// MPPStreamResponse built by handleEstablishMPPConnection.
type mockMPPConnectionClient struct {
mockClientStream
// mppResponses is the packet list copied from the mock server's mppResponses;
// presumably these are the packets Recv hands out one by one — confirm
// against the full Recv body.
mppResponses []*mpp.MPPDataPacket
// idx is incremented by Recv each time it returns a packet.
idx int

// targetTask is the ReceiverMeta of the establish-connection request. Recv
// compares its TaskId with the value of the mppRecvTimeout failpoint and
// returns context.Canceled when they match.
targetTask *mpp.TaskMeta
}

func (mock *mockMPPConnectionClient) Recv() (*mpp.MPPDataPacket, error) {
Expand All @@ -470,6 +482,11 @@ func (mock *mockMPPConnectionClient) Recv() (*mpp.MPPDataPacket, error) {
mock.idx++
return ret, nil
}
failpoint.Inject("mppRecvTimeout", func(val failpoint.Value) {
if int64(val.(int)) == mock.targetTask.TaskId {
failpoint.Return(nil, context.Canceled)
}
})
return nil, io.EOF
}

Expand Down
4 changes: 4 additions & 0 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRe
return nil, errors.New("unreachable")
}

// RawGetKeyTTL is a stub on the mock TiKV gRPC server: it ignores ctx and req
// and always returns a nil response together with an "unreachable" error.
func (s *mockTikvGrpcServer) RawGetKeyTTL(ctx context.Context, req *kvrpcpb.RawGetKeyTTLRequest) (*kvrpcpb.RawGetKeyTTLResponse, error) {
return nil, errors.New("unreachable")
}

func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) {
// prepare a mock tikv grpc server
addr := "localhost:56341"
Expand Down

0 comments on commit 4a598ac

Please sign in to comment.