Skip to content

Commit

Permalink
store/tikv: move coprocessor out (#22922)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Mar 1, 2021
1 parent 5706c0c commit febac51
Show file tree
Hide file tree
Showing 28 changed files with 306 additions and 199 deletions.
7 changes: 4 additions & 3 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -261,7 +262,7 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro
return nil
}

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) {
func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) {
callee := copStats.CalleeAddress
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
Expand Down Expand Up @@ -334,7 +335,7 @@ func (r *selectResult) Close() error {
// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats.
type CopRuntimeStats interface {
// GetCopRuntimeStats gets the cop runtime stats information.
GetCopRuntimeStats() *tikv.CopRuntimeStats
GetCopRuntimeStats() *copr.CopRuntimeStats
}

type selectResultRuntimeStats struct {
Expand All @@ -347,7 +348,7 @@ type selectResultRuntimeStats struct {
CoprCacheHitNum int64
}

func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) {
func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) {
s.copRespTime = append(s.copRespTime, respTime)
if copStats.ScanDetail != nil {
s.procKeys = append(s.procKeys, copStats.ScanDetail.ProcessedKeys)
Expand Down
8 changes: 4 additions & 4 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tipb/go-tipb"
Expand All @@ -31,7 +31,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
sr := selectResult{ctx: ctx, storeType: kv.TiKV}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
t := uint64(1)
Expand All @@ -41,12 +41,12 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
},
}
c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234), IsFalse)

sr.copPlanIDs = []int{sr.rootPlanID}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, "tikv").String(), Equals, "tikv_task:{time:1ns, loops:1}, scan_detail: {total_process_keys: 0, total_keys: 0, rocksdb: {delete_skipped_count: 0, key_skipped_count: 0, block: {cache_hit_count: 0, read_count: 0, read_byte: 0 Bytes}}}")
}
4 changes: 2 additions & 2 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand All @@ -44,7 +44,7 @@ func checkGoroutineExists(keyword string) bool {

func (s *testSuite3) TestCopClientSend(c *C) {
c.Skip("not stable")
if _, ok := s.store.GetClient().(*tikv.CopClient); !ok {
if _, ok := s.store.GetClient().(*copr.CopClient); !ok {
// Make sure the store is tikv store.
return
}
Expand Down
15 changes: 8 additions & 7 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
Expand Down Expand Up @@ -7002,7 +7003,7 @@ func (s *testSerialSuite) TestCoprocessorOOMTicase(c *C) {
for _, testcase := range testcases {
c.Log(testcase.name)
// larger than one copResponse, smaller than 2 copResponse
quota := 2*tikv.MockResponseSizeForTest - 100
quota := 2*copr.MockResponseSizeForTest - 100
se, err := session.CreateSession4Test(s.store)
c.Check(err, IsNil)
tk.Se = se
Expand All @@ -7020,17 +7021,17 @@ func (s *testSerialSuite) TestCoprocessorOOMTicase(c *C) {
}

// ticase-4169, trigger oom action twice after workers consuming all the data
failpoint.Enable("github.com/pingcap/tidb/store/tikv/ticase-4169", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/store/copr/ticase-4169", `return(true)`)
f()
failpoint.Disable("github.com/pingcap/tidb/store/tikv/ticase-4169")
failpoint.Disable("github.com/pingcap/tidb/store/copr/ticase-4169")
// ticase-4170, trigger oom action twice after iterator receiving all the data.
failpoint.Enable("github.com/pingcap/tidb/store/tikv/ticase-4170", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/store/copr/ticase-4170", `return(true)`)
f()
failpoint.Disable("github.com/pingcap/tidb/store/tikv/ticase-4170")
failpoint.Disable("github.com/pingcap/tidb/store/copr/ticase-4170")
// ticase-4171, trigger oom before reading or consuming any data
failpoint.Enable("github.com/pingcap/tidb/store/tikv/ticase-4171", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/store/copr/ticase-4171", `return(true)`)
f()
failpoint.Disable("github.com/pingcap/tidb/store/tikv/ticase-4171")
failpoint.Disable("github.com/pingcap/tidb/store/copr/ticase-4171")
}

func (s *testSuite) TestIssue20237(c *C) {
Expand Down
7 changes: 4 additions & 3 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
Expand Down Expand Up @@ -148,9 +149,9 @@ func (s *seqTestSuite) TestEarlyClose(c *C) {
}

// Goroutine should not leak when error happen.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/handleTaskOnceError", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/copr/handleTaskOnceError", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/handleTaskOnceError"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/copr/handleTaskOnceError"), IsNil)
}()
rss, err := tk.Se.Execute(ctx, "select * from earlyclose")
c.Assert(err, IsNil)
Expand Down Expand Up @@ -680,7 +681,7 @@ func (s *seqTestSuite) TestShowStatsHealthy(c *C) {
// TestIndexDoubleReadClose checks that when a index double read returns before reading all the rows, the goroutine doesn't
// leak. For testing distsql with multiple regions, we need to manually split a mock TiKV.
func (s *seqTestSuite) TestIndexDoubleReadClose(c *C) {
if _, ok := s.store.GetClient().(*tikv.CopClient); !ok {
if _, ok := s.store.GetClient().(*copr.CopClient); !ok {
// Make sure the store is tikv store.
return
}
Expand Down
4 changes: 2 additions & 2 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
}

func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/errorMockTiFlashServerTimeout", "return(true)"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout", "return(true)"), IsNil)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
pkt: &packetIO{
Expand Down Expand Up @@ -755,5 +755,5 @@ func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/errorMockTiFlashServerTimeout"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout"), IsNil)
}
13 changes: 7 additions & 6 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
Expand Down Expand Up @@ -3794,13 +3795,13 @@ func (s *testSessionSerialSuite) TestCoprocessorOOMAction(c *C) {
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMAction = config.OOMActionCancel
})
failpoint.Enable("github.com/pingcap/tidb/store/tikv/testRateLimitActionMockConsumeAndAssert", `return(true)`)
defer failpoint.Disable("github.com/pingcap/tidb/store/tikv/testRateLimitActionMockConsumeAndAssert")
failpoint.Enable("github.com/pingcap/tidb/store/copr/testRateLimitActionMockConsumeAndAssert", `return(true)`)
defer failpoint.Disable("github.com/pingcap/tidb/store/copr/testRateLimitActionMockConsumeAndAssert")

enableOOM := func(tk *testkit.TestKit, name, sql string) {
c.Logf("enable OOM, testcase: %v", name)
// larger than 4 copResponse, smaller than 5 copResponse
quota := 5*tikv.MockResponseSizeForTest - 100
quota := 5*copr.MockResponseSizeForTest - 100
tk.MustExec("use test")
tk.MustExec("set @@tidb_distsql_scan_concurrency = 10")
tk.MustExec(fmt.Sprintf("set @@tidb_mem_quota_query=%v;", quota))
Expand All @@ -3815,7 +3816,7 @@ func (s *testSessionSerialSuite) TestCoprocessorOOMAction(c *C) {

disableOOM := func(tk *testkit.TestKit, name, sql string) {
c.Logf("disable OOM, testcase: %v", name)
quota := 5*tikv.MockResponseSizeForTest - 100
quota := 5*copr.MockResponseSizeForTest - 100
tk.MustExec("use test")
tk.MustExec("set @@tidb_distsql_scan_concurrency = 10")
tk.MustExec(fmt.Sprintf("set @@tidb_mem_quota_query=%v;", quota))
Expand All @@ -3824,7 +3825,7 @@ func (s *testSessionSerialSuite) TestCoprocessorOOMAction(c *C) {
c.Assert(err.Error(), Matches, "Out Of Memory Quota.*")
}

failpoint.Enable("github.com/pingcap/tidb/store/tikv/testRateLimitActionMockWaitMax", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/store/copr/testRateLimitActionMockWaitMax", `return(true)`)
// assert oom action and switch
for _, testcase := range testcases {
se, err := session.CreateSession4Test(s.store)
Expand Down Expand Up @@ -3855,7 +3856,7 @@ func (s *testSessionSerialSuite) TestCoprocessorOOMAction(c *C) {
enableOOM(tk, testcase.name, testcase.sql)
se.Close()
}
failpoint.Disable("github.com/pingcap/tidb/store/tikv/testRateLimitActionMockWaitMax")
failpoint.Disable("github.com/pingcap/tidb/store/copr/testRateLimitActionMockWaitMax")

// assert oom fallback
for _, testcase := range testcases {
Expand Down
Loading

0 comments on commit febac51

Please sign in to comment.