From b71a23ba478a1379db8e7455afc410d3eb0d0b86 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 1 Jul 2022 18:32:39 +0800 Subject: [PATCH] *: support coprocessor cache for paging protocol (#35787) close pingcap/tidb#35786 --- executor/copr_cache_test.go | 3 --- planner/core/cbo_test.go | 3 --- planner/core/integration_test.go | 17 ------------- planner/core/physical_plan_test.go | 4 ---- planner/core/task.go | 2 +- store/copr/coprocessor.go | 36 ++++++++++++++++++++++++---- store/copr/coprocessor_cache.go | 14 ++++++++++- store/copr/coprocessor_cache_test.go | 16 ++++++++++--- 8 files changed, 58 insertions(+), 37 deletions(-) diff --git a/executor/copr_cache_test.go b/executor/copr_cache_test.go index 78bef9bed05cd..ba99c1894d864 100644 --- a/executor/copr_cache_test.go +++ b/executor/copr_cache_test.go @@ -53,9 +53,6 @@ func TestIntegrationCopCache(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t (a int primary key)") - // TODO(tiancaiamao) update the test and support cop cache for paging. - tk.MustExec("set @@tidb_enable_paging = off") - tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) tid := tblInfo.Meta().ID diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index a135eadf959a0..2aeeba0e92072 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -822,9 +822,6 @@ func TestLimitIndexEstimation(t *testing.T) { SQL string Plan []string } - // When paging is used, there is a 'paging:true' makes the explain output differ. - // IndexLookUp 0.00 root paging:true - tk.MustExec("set @@tidb_enable_paging = off") analyzeSuiteData := core.GetAnalyzeSuiteData() analyzeSuiteData.GetTestCases(t, &input, &output) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index ad038c1f3cd91..326755419ae26 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -177,9 +177,6 @@ func TestPushLimitDownIndexLookUpReader(t *testing.T) { tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)") tk.MustExec("analyze table tbl") - // When paging is enabled, there would be a 'paging: true' in the explain result. - tk.MustExec("set @@tidb_enable_paging = off") - var input []string var output []struct { SQL string @@ -3685,9 +3682,6 @@ func TestExtendedStatsSwitch(t *testing.T) { "1.000000 1", )) - // When paging is enabled, there would be a 'paging: true' in the explain result. - tk.MustExec("set @@tidb_enable_paging = off") - // Estimated index scan count is 4 using extended stats. tk.MustQuery("explain format = 'brief' select * from t use index(b) where a > 3 order by b limit 1").Check(testkit.Rows( "Limit 1.00 root offset:0, count:1", @@ -4557,9 +4551,6 @@ func TestLimitIndexLookUpKeepOrder(t *testing.T) { tk.MustExec("drop table if exists t;") tk.MustExec("create table t(a int, b int, c int, d int, index idx(a,b,c));") - // When paging is enabled, there would be a 'paging: true' in the explain result. - tk.MustExec("set @@tidb_enable_paging = off") - var input []string var output []struct { SQL string @@ -4786,9 +4777,6 @@ func TestMultiColMaxOneRow(t *testing.T) { tk.MustExec("create table t1(a int)") tk.MustExec("create table t2(a int, b int, c int, primary key(a,b))") - // When paging is enabled, there would be a 'paging: true' in the explain result. - tk.MustExec("set @@tidb_enable_paging = off") - var input []string var output []struct { SQL string @@ -5561,8 +5549,6 @@ func TestPreferRangeScanForUnsignedIntHandle(t *testing.T) { // Default RPC encoding may cause statistics explain result differ and then the test unstable. tk.MustExec("set @@tidb_enable_chunk_rpc = on") - // When paging is enabled, there would be a 'paging: true' in the explain result. - tk.MustExec("set @@tidb_enable_paging = off") var input []string var output []struct { @@ -5602,9 +5588,6 @@ func TestIssue27083(t *testing.T) { require.Nil(t, do.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll)) tk.MustExec("analyze table t") - // When paging is enabled, there would be a 'paging: true' in the explain result. - tk.MustExec("set @@tidb_enable_paging = off") - var input []string var output []struct { SQL string diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 97b80f92251c1..fe5c5cba7da00 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -538,10 +538,6 @@ func TestEliminateMaxOneRow(t *testing.T) { tk.MustExec("create table t2(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL)") tk.MustExec("create table t3(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, c int(11) DEFAULT NULL, UNIQUE KEY idx_abc (a, b, c))") - // When paging is used, there is a 'paging:true' makes the explain output differ. - // IndexLookUp 0.00 root paging:true - tk.MustExec("set @@tidb_enable_paging = off") - for i, ts := range input { testdata.OnRecord(func() { output[i].SQL = ts diff --git a/planner/core/task.go b/planner/core/task.go index 2f6d853f6b382..7d88aac896812 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -667,7 +667,7 @@ func calcPagingCost(ctx sessionctx.Context, indexPlan PhysicalPlan, expectCnt ui // we want the diff between idxCst and pagingCst here, // however, the idxCst does not contain seekFactor, so a seekFactor needs to be removed - return pagingCst - sessVars.GetSeekFactor(nil) + return math.Max(pagingCst-sessVars.GetSeekFactor(nil), 0) } func (t *rootTask) convertToRootTask(_ sessionctx.Context) *rootTask { diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 87526935a71fc..f869711a50a05 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -712,15 +712,15 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch var cacheKey []byte var cacheValue *coprCacheValue - // TODO: cache paging copr // If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since // computing is not the main cost. Ignore such requests directly to avoid slowly building the cache key. - if task.cmdType == tikvrpc.CmdCop && !task.paging && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) { + if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) { cKey, err := coprCacheBuildKey(&copReq) if err == nil { cacheKey = cKey cValue := worker.store.coprCache.Get(cKey) copReq.IsCacheEnabled = true + if cValue != nil && cValue.RegionID == task.region.GetID() && cValue.TimeStamp <= worker.req.StartTs { // Append cache version to the request to skip Coprocessor computation if possible // when request result is cached @@ -779,7 +779,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch metrics.TiKVCoprocessorHistogram.WithLabelValues(storeID, strconv.FormatBool(staleRead)).Observe(costTime.Seconds()) if worker.req.Paging { - return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, costTime) + return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, cacheKey, cacheValue, task, ch, costTime) } // Handles the response for non-paging copTask. @@ -848,8 +848,8 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan return logStr } -func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) { - remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, nil, nil, task, ch, nil, costTime) +func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) { + remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, cacheKey, cacheValue, task, ch, nil, costTime) if err != nil || len(remainedTasks) != 0 { // If there is region error or lock error, keep the paging size and retry. for _, remainedTask := range remainedTasks { @@ -954,6 +954,26 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R data := make([]byte, len(cacheValue.Data)) copy(data, cacheValue.Data) resp.pbResp.Data = data + if worker.req.Paging { + var start, end []byte + if cacheValue.PageStart != nil { + start = make([]byte, len(cacheValue.PageStart)) + copy(start, cacheValue.PageStart) + } + if cacheValue.PageEnd != nil { + end = make([]byte, len(cacheValue.PageEnd)) + copy(end, cacheValue.PageEnd) + } + // When paging protocol is used, the response key range is part of the cache data. + if start != nil || end != nil { + resp.pbResp.Range = &coprocessor.KeyRange{ + Start: start, + End: end, + } + } else { + resp.pbResp.Range = nil + } + } resp.detail.CoprCacheHit = true } else { // Cache not hit or cache hit but not valid: update the cache if the response can be cached. @@ -969,6 +989,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R RegionID: task.region.GetID(), RegionDataVersion: resp.pbResp.CacheLastVersion, } + // When paging protocol is used, the response key range is part of the cache data. + if r := resp.pbResp.GetRange(); r != nil { + newCacheValue.PageStart = append([]byte{}, r.GetStart()...) + newCacheValue.PageEnd = append([]byte{}, r.GetEnd()...) + } + worker.store.coprCache.Set(cacheKey, &newCacheValue) } } diff --git a/store/copr/coprocessor_cache.go b/store/copr/coprocessor_cache.go index 35987347fb0f2..1adef9915cb48 100644 --- a/store/copr/coprocessor_cache.go +++ b/store/copr/coprocessor_cache.go @@ -41,6 +41,10 @@ type coprCacheValue struct { TimeStamp uint64 RegionID uint64 RegionDataVersion uint64 + + // Used in coprocessor paging protocol + PageStart []byte + PageEnd []byte } func (v *coprCacheValue) String() string { @@ -54,7 +58,7 @@ func (v *coprCacheValue) String() string { const coprCacheValueSize = int(unsafe.Sizeof(coprCacheValue{})) func (v *coprCacheValue) Len() int { - return coprCacheValueSize + len(v.Key) + len(v.Data) + return coprCacheValueSize + len(v.Key) + len(v.Data) + len(v.PageStart) + len(v.PageEnd) } func newCoprCache(config *config.CoprocessorCache) (*coprCache, error) { @@ -108,6 +112,9 @@ func coprCacheBuildKey(copReq *coprocessor.Request) ([]byte, error) { } totalLength += 2 + len(r.Start) + 2 + len(r.End) } + if copReq.PagingSize > 0 { + totalLength += 1 + } key := make([]byte, totalLength) @@ -141,6 +148,11 @@ func coprCacheBuildKey(copReq *coprocessor.Request) ([]byte, error) { dest += len(r.End) } + // 1 byte when use paging protocol + if copReq.PagingSize > 0 { + key[dest] = 1 + } + return key, nil } diff --git a/store/copr/coprocessor_cache_test.go b/store/copr/coprocessor_cache_test.go index 8c68888ee61a8..91906c980d0f8 100644 --- a/store/copr/coprocessor_cache_test.go +++ b/store/copr/coprocessor_cache_test.go @@ -155,8 +155,8 @@ func TestCacheValueLen(t *testing.T) { RegionID: 0x1, RegionDataVersion: 0x3, } - // 72 = (8 byte pointer + 8 byte for length + 8 byte for cap) * 2 + 8 byte * 3 - require.Equal(t, 72, v.Len()) + // 120 = (8 byte pointer + 8 byte for length + 8 byte for cap) * 4 + 8 byte * 3 + require.Equal(t, 120, v.Len()) v = coprCacheValue{ Key: []byte("foobar"), @@ -165,7 +165,17 @@ func TestCacheValueLen(t *testing.T) { RegionID: 0x1, RegionDataVersion: 0x3, } - require.Equal(t, 72+len(v.Key)+len(v.Data), v.Len()) + require.Equal(t, 120+len(v.Key)+len(v.Data), v.Len()) + + v = coprCacheValue{ + Key: []byte("foobar"), + Data: []byte("12345678"), + TimeStamp: 0x123, + RegionID: 0x1, + RegionDataVersion: 0x3, + PageEnd: []byte("3235"), + } + require.Equal(t, 120+len(v.Key)+len(v.Data)+len(v.PageEnd), v.Len()) } func TestGetSet(t *testing.T) {