From 1ae9bfad716806092778cbc007c303f1c2b00c2c Mon Sep 17 00:00:00 2001 From: Yifan Xu <30385241+xuyifangreeneyes@users.noreply.github.com> Date: Tue, 9 Mar 2021 19:58:55 +0800 Subject: [PATCH] server: refine tiflash fallback testcase (#23103) --- server/conn.go | 5 ++++ server/conn_test.go | 48 +++++++++++++++++++++++++-------- store/mockstore/unistore/rpc.go | 5 ++++ 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/server/conn.go b/server/conn.go index 5128e9b9ca7a7..b4a648811f763 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1777,6 +1777,11 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool } for { + failpoint.Inject("secondNextErr", func(value failpoint.Value) { + if value.(bool) && !firstNext { + failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout) + } + }) // Here server.tidbResultSet implements Next method. err := rs.Next(ctx, req) if err != nil { diff --git a/server/conn_test.go b/server/conn_test.go index 8feb24d4692c8..03c40e0cd9b34 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -753,40 +753,66 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) { tb := testGetTableByName(c, tk.Se, "test", "t") err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) c.Assert(err, IsNil) - tk.MustExec("insert into t values(1,0)") - tk.MustExec("insert into t values(2,0)") - tk.MustExec("insert into t values(3,0)") - tk.MustQuery("select count(*) from t").Check(testkit.Rows("3")) + for i := 0; i < 50; i++ { + tk.MustExec(fmt.Sprintf("insert into t values(%v, 0)", i)) + } + tk.MustQuery("select count(*) from t").Check(testkit.Rows("50")) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil) - // test batch cop send req error - testFallbackWork(c, tk, cc, "select sum(a) from t") + // test COM_STMT_EXECUTE ctx := context.Background() + tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1") c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil) c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil) - tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) + // test COM_STMT_FETCH (cursor mode) + c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil) + c.Assert(cc.handleStmtFetch(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil) + tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0") + c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil) + + // test that TiDB would not retry if the first execution already sends data to client + c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/secondNextErr", "return(true)"), IsNil) + tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1") + c.Assert(cc.handleQuery(ctx, "select * from t t1 join t t2 on t1.a = t2.a"), NotNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/secondNextErr"), IsNil) + + // simple TiFlash query (unary + non-streaming) + tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=0;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/copRpcErrtiflash0", "return(\"tiflash0\")"), IsNil) + testFallbackWork(c, tk, cc, "select sum(a) from t") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/copRpcErrtiflash0"), IsNil) + + // TiFlash query based on batch cop (batch + streaming) + tk.MustExec("set @@tidb_allow_batch_cop=1; set @@tidb_allow_mpp=0;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil) + testFallbackWork(c, tk, cc, "select sum(a) from t") c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout", "return(true)"), IsNil) testFallbackWork(c, tk, cc, "select sum(a) from t") c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout"), IsNil) + // TiFlash MPP query (MPP + streaming) + tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=1;") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "return(true)"), IsNil) - tk.MustExec("set @@session.tidb_allow_mpp=1") testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a") c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout", "return(-1)"), IsNil) - tk.MustExec("set @@session.tidb_allow_mpp=1") testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a") c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout"), IsNil) } func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) { ctx := context.Background() - tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 0") + tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0") c.Assert(tk.QueryToErr(sql), NotNil) - tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1") + tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1") c.Assert(cc.handleQuery(ctx, sql), IsNil) tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index 52bdc5e34a513..69d9a5639fc39 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -223,6 +223,11 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R case tikvrpc.CmdRawScan: resp.Resp, err = c.rawHandler.RawScan(ctx, req.RawScan()) case tikvrpc.CmdCop: + failpoint.Inject("copRpcErr"+addr, func(value failpoint.Value) { + if value.(string) == addr { + failpoint.Return(nil, errors.New("cop rpc error")) + } + }) resp.Resp, err = c.usSvr.Coprocessor(ctx, req.Cop()) case tikvrpc.CmdCopStream: resp.Resp, err = c.handleCopStream(ctx, req.Cop())