Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: refine tiflash fallback testcase #23103

Merged
merged 16 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,6 +1777,11 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
}

for {
failpoint.Inject("secondNextErr", func(value failpoint.Value) {
if value.(bool) && !firstNext {
failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout)
}
})
// Here server.tidbResultSet implements Next method.
err := rs.Next(ctx, req)
if err != nil {
Expand Down
48 changes: 37 additions & 11 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,40 +753,66 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) {
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustQuery("select count(*) from t").Check(testkit.Rows("3"))
for i := 0; i < 50; i++ {
tk.MustExec(fmt.Sprintf("insert into t values(%v, 0)", i))
}
tk.MustQuery("select count(*) from t").Check(testkit.Rows("50"))

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)
// test batch cop send req error
testFallbackWork(c, tk, cc, "select sum(a) from t")
// test COM_STMT_EXECUTE
ctx := context.Background()
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)

tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
// test COM_STMT_FETCH (cursor mode)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil)
c.Assert(cc.handleStmtFetch(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0")
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil)

// test that TiDB would not retry if the first execution already sends data to client
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/secondNextErr", "return(true)"), IsNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
c.Assert(cc.handleQuery(ctx, "select * from t t1 join t t2 on t1.a = t2.a"), NotNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/secondNextErr"), IsNil)

// simple TiFlash query (unary + non-streaming)
tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=0;")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/copRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)
testFallbackWork(c, tk, cc, "select sum(a) from t")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/copRpcErrtiflash0"), IsNil)

// TiFlash query based on batch cop (batch + streaming)
tk.MustExec("set @@tidb_allow_batch_cop=1; set @@tidb_allow_mpp=0;")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)
testFallbackWork(c, tk, cc, "select sum(a) from t")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout", "return(true)"), IsNil)
testFallbackWork(c, tk, cc, "select sum(a) from t")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout"), IsNil)

// TiFlash MPP query (MPP + streaming)
tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=1;")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "return(true)"), IsNil)
tk.MustExec("set @@session.tidb_allow_mpp=1")
testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout", "return(-1)"), IsNil)
tk.MustExec("set @@session.tidb_allow_mpp=1")
testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout"), IsNil)
}

func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) {
ctx := context.Background()
tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 0")
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0")
c.Assert(tk.QueryToErr(sql), NotNil)
tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1")
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")

c.Assert(cc.handleQuery(ctx, sql), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
Expand Down
5 changes: 5 additions & 0 deletions store/mockstore/unistore/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
case tikvrpc.CmdRawScan:
resp.Resp, err = c.rawHandler.RawScan(ctx, req.RawScan())
case tikvrpc.CmdCop:
failpoint.Inject("copRpcErr"+addr, func(value failpoint.Value) {
if value.(string) == addr {
failpoint.Return(nil, errors.New("cop rpc error"))
}
})
resp.Resp, err = c.usSvr.Coprocessor(ctx, req.Cop())
case tikvrpc.CmdCopStream:
resp.Resp, err = c.handleCopStream(ctx, req.Cop())
Expand Down