*: support coprocessor cache for paging protocol (#35787)
close #35786
tiancaiamao authored Jul 1, 2022
1 parent 16e5815 · commit b71a23b
Showing 8 changed files with 58 additions and 37 deletions.

executor/copr_cache_test.go (3 changes: 0 additions & 3 deletions)
@@ -53,9 +53,6 @@ func TestIntegrationCopCache(t *testing.T) {
     tk.MustExec("use test")
     tk.MustExec("create table t (a int primary key)")
 
-    // TODO(tiancaiamao) update the test and support cop cache for paging.
-    tk.MustExec("set @@tidb_enable_paging = off")
-
     tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
     require.NoError(t, err)
     tid := tblInfo.Meta().ID

planner/core/cbo_test.go (3 changes: 0 additions & 3 deletions)
@@ -822,9 +822,6 @@ func TestLimitIndexEstimation(t *testing.T) {
         SQL string
         Plan []string
     }
-    // When paging is used, there is a 'paging:true' makes the explain output differ.
-    // IndexLookUp 0.00 root paging:true
-    tk.MustExec("set @@tidb_enable_paging = off")
 
     analyzeSuiteData := core.GetAnalyzeSuiteData()
     analyzeSuiteData.GetTestCases(t, &input, &output)

planner/core/integration_test.go (17 changes: 0 additions & 17 deletions)
@@ -177,9 +177,6 @@ func TestPushLimitDownIndexLookUpReader(t *testing.T) {
     tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
     tk.MustExec("analyze table tbl")
 
-    // When paging is enabled, there would be a 'paging: true' in the explain result.
-    tk.MustExec("set @@tidb_enable_paging = off")
-
     var input []string
     var output []struct {
         SQL string
@@ -3685,9 +3682,6 @@ func TestExtendedStatsSwitch(t *testing.T) {
         "1.000000 1",
     ))
 
-    // When paging is enabled, there would be a 'paging: true' in the explain result.
-    tk.MustExec("set @@tidb_enable_paging = off")
-
     // Estimated index scan count is 4 using extended stats.
     tk.MustQuery("explain format = 'brief' select * from t use index(b) where a > 3 order by b limit 1").Check(testkit.Rows(
         "Limit 1.00 root offset:0, count:1",
@@ -4557,9 +4551,6 @@ func TestLimitIndexLookUpKeepOrder(t *testing.T) {
     tk.MustExec("drop table if exists t;")
     tk.MustExec("create table t(a int, b int, c int, d int, index idx(a,b,c));")
 
-    // When paging is enabled, there would be a 'paging: true' in the explain result.
-    tk.MustExec("set @@tidb_enable_paging = off")
-
     var input []string
     var output []struct {
         SQL string
@@ -4786,9 +4777,6 @@ func TestMultiColMaxOneRow(t *testing.T) {
     tk.MustExec("create table t1(a int)")
     tk.MustExec("create table t2(a int, b int, c int, primary key(a,b))")
 
-    // When paging is enabled, there would be a 'paging: true' in the explain result.
-    tk.MustExec("set @@tidb_enable_paging = off")
-
     var input []string
     var output []struct {
         SQL string
@@ -5561,8 +5549,6 @@ func TestPreferRangeScanForUnsignedIntHandle(t *testing.T) {
 
     // Default RPC encoding may cause statistics explain result differ and then the test unstable.
     tk.MustExec("set @@tidb_enable_chunk_rpc = on")
-    // When paging is enabled, there would be a 'paging: true' in the explain result.
-    tk.MustExec("set @@tidb_enable_paging = off")
 
     var input []string
     var output []struct {
@@ -5602,9 +5588,6 @@ func TestIssue27083(t *testing.T) {
     require.Nil(t, do.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll))
     tk.MustExec("analyze table t")
 
-    // When paging is enabled, there would be a 'paging: true' in the explain result.
-    tk.MustExec("set @@tidb_enable_paging = off")
-
     var input []string
     var output []struct {
         SQL string

planner/core/physical_plan_test.go (4 changes: 0 additions & 4 deletions)
@@ -538,10 +538,6 @@ func TestEliminateMaxOneRow(t *testing.T) {
     tk.MustExec("create table t2(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL)")
     tk.MustExec("create table t3(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, c int(11) DEFAULT NULL, UNIQUE KEY idx_abc (a, b, c))")
 
-    // When paging is used, there is a 'paging:true' makes the explain output differ.
-    // IndexLookUp 0.00 root paging:true
-    tk.MustExec("set @@tidb_enable_paging = off")
-
     for i, ts := range input {
         testdata.OnRecord(func() {
             output[i].SQL = ts

planner/core/task.go (2 changes: 1 addition & 1 deletion)
@@ -667,7 +667,7 @@ func calcPagingCost(ctx sessionctx.Context, indexPlan PhysicalPlan, expectCnt ui
 
     // we want the diff between idxCst and pagingCst here,
     // however, the idxCst does not contain seekFactor, so a seekFactor needs to be removed
-    return pagingCst - sessVars.GetSeekFactor(nil)
+    return math.Max(pagingCst-sessVars.GetSeekFactor(nil), 0)
 }
 
 func (t *rootTask) convertToRootTask(_ sessionctx.Context) *rootTask {
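
A side note on the math.Max(..., 0) clamp: calcPagingCost subtracts the session's seek factor because idxCst does not include it, and for small expected row counts that subtraction can yield a negative number. The clamp keeps the returned estimate non-negative. A minimal sketch with made-up numbers (not TiDB's actual cost model) showing the effect:

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        // Hypothetical values, for illustration only.
        pagingCst := 3.5   // assumed raw paging cost for a small expected row count
        seekFactor := 20.0 // assumed per-request seek factor, not part of idxCst

        raw := pagingCst - seekFactor // -16.5: the old return value could go negative
        clamped := math.Max(raw, 0)   // 0: the new return value never does

        fmt.Println(raw, clamped)
    }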

store/copr/coprocessor.go (36 changes: 31 additions & 5 deletions)
@@ -712,15 +712,15 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
     var cacheKey []byte
     var cacheValue *coprCacheValue
 
-    // TODO: cache paging copr
     // If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since
     // computing is not the main cost. Ignore such requests directly to avoid slowly building the cache key.
-    if task.cmdType == tikvrpc.CmdCop && !task.paging && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
+    if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
         cKey, err := coprCacheBuildKey(&copReq)
         if err == nil {
             cacheKey = cKey
             cValue := worker.store.coprCache.Get(cKey)
             copReq.IsCacheEnabled = true
+
             if cValue != nil && cValue.RegionID == task.region.GetID() && cValue.TimeStamp <= worker.req.StartTs {
                 // Append cache version to the request to skip Coprocessor computation if possible
                 // when request result is cached
@@ -779,7 +779,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
     metrics.TiKVCoprocessorHistogram.WithLabelValues(storeID, strconv.FormatBool(staleRead)).Observe(costTime.Seconds())
 
     if worker.req.Paging {
-        return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, costTime)
+        return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, cacheKey, cacheValue, task, ch, costTime)
     }
 
     // Handles the response for non-paging copTask.
@@ -848,8 +848,8 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
     return logStr
 }
 
-func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
-    remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, nil, nil, task, ch, nil, costTime)
+func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
+    remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, cacheKey, cacheValue, task, ch, nil, costTime)
     if err != nil || len(remainedTasks) != 0 {
         // If there is region error or lock error, keep the paging size and retry.
         for _, remainedTask := range remainedTasks {
@@ -954,6 +954,26 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
             data := make([]byte, len(cacheValue.Data))
             copy(data, cacheValue.Data)
             resp.pbResp.Data = data
+            if worker.req.Paging {
+                var start, end []byte
+                if cacheValue.PageStart != nil {
+                    start = make([]byte, len(cacheValue.PageStart))
+                    copy(start, cacheValue.PageStart)
+                }
+                if cacheValue.PageEnd != nil {
+                    end = make([]byte, len(cacheValue.PageEnd))
+                    copy(end, cacheValue.PageEnd)
+                }
+                // When paging protocol is used, the response key range is part of the cache data.
+                if start != nil || end != nil {
+                    resp.pbResp.Range = &coprocessor.KeyRange{
+                        Start: start,
+                        End: end,
+                    }
+                } else {
+                    resp.pbResp.Range = nil
+                }
+            }
             resp.detail.CoprCacheHit = true
         } else {
             // Cache not hit or cache hit but not valid: update the cache if the response can be cached.
@@ -969,6 +989,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
                 RegionID: task.region.GetID(),
                 RegionDataVersion: resp.pbResp.CacheLastVersion,
             }
+            // When paging protocol is used, the response key range is part of the cache data.
+            if r := resp.pbResp.GetRange(); r != nil {
+                newCacheValue.PageStart = append([]byte{}, r.GetStart()...)
+                newCacheValue.PageEnd = append([]byte{}, r.GetEnd()...)
+            }
+
             worker.store.coprCache.Set(cacheKey, &newCacheValue)
         }
     }
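
The additions to handleCopResponse are the core of the change: under the paging protocol a response covers only part of the task's key range and reports that range back, so a cache entry has to keep the range (PageStart/PageEnd) next to the data, and a cache hit has to restore it into resp.pbResp.Range so the remaining page tasks can still be derived. A condensed, self-contained sketch of that round trip using simplified stand-in types (not the real store/copr or kvproto structs):

    package main

    import "fmt"

    // Simplified stand-ins for coprocessor.KeyRange, coprCacheValue and copResponse.
    type keyRange struct{ Start, End []byte }

    type cacheValue struct {
        Data      []byte
        PageStart []byte
        PageEnd   []byte
    }

    type pagingResp struct {
        Data  []byte
        Range *keyRange
    }

    // onMiss mirrors the store path: copy the data and, for paging, the scanned range.
    func onMiss(resp *pagingResp) *cacheValue {
        v := &cacheValue{Data: append([]byte{}, resp.Data...)}
        if r := resp.Range; r != nil {
            v.PageStart = append([]byte{}, r.Start...)
            v.PageEnd = append([]byte{}, r.End...)
        }
        return v
    }

    // onHit mirrors the replay path: rebuild the response, including its key range.
    func onHit(v *cacheValue) *pagingResp {
        resp := &pagingResp{Data: append([]byte{}, v.Data...)}
        if v.PageStart != nil || v.PageEnd != nil {
            resp.Range = &keyRange{Start: v.PageStart, End: v.PageEnd}
        }
        return resp
    }

    func main() {
        orig := &pagingResp{Data: []byte("rows"), Range: &keyRange{Start: []byte("a"), End: []byte("m")}}
        replayed := onHit(onMiss(orig))
        fmt.Printf("%s [%s, %s)\n", replayed.Data, replayed.Range.Start, replayed.Range.End)
    }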

store/copr/coprocessor_cache.go (14 changes: 13 additions & 1 deletion)
@@ -41,6 +41,10 @@ type coprCacheValue struct {
     TimeStamp uint64
     RegionID uint64
     RegionDataVersion uint64
+
+    // Used in coprocessor paging protocol
+    PageStart []byte
+    PageEnd []byte
 }
 
 func (v *coprCacheValue) String() string {
@@ -54,7 +58,7 @@ func (v *coprCacheValue) String() string {
 const coprCacheValueSize = int(unsafe.Sizeof(coprCacheValue{}))
 
 func (v *coprCacheValue) Len() int {
-    return coprCacheValueSize + len(v.Key) + len(v.Data)
+    return coprCacheValueSize + len(v.Key) + len(v.Data) + len(v.PageStart) + len(v.PageEnd)
 }
 
 func newCoprCache(config *config.CoprocessorCache) (*coprCache, error) {
@@ -108,6 +112,9 @@ func coprCacheBuildKey(copReq *coprocessor.Request) ([]byte, error) {
         }
         totalLength += 2 + len(r.Start) + 2 + len(r.End)
     }
+    if copReq.PagingSize > 0 {
+        totalLength += 1
+    }
 
     key := make([]byte, totalLength)
 
@@ -141,6 +148,11 @@ func coprCacheBuildKey(copReq *coprocessor.Request) ([]byte, error) {
         dest += len(r.End)
     }
 
+    // 1 byte when use paging protocol
+    if copReq.PagingSize > 0 {
+        key[dest] = 1
+    }
+
     return key, nil
 }
 
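
coprCacheBuildKey now reserves one extra byte and sets it to 1 when copReq.PagingSize > 0, so a paging request and a non-paging request over the same ranges get separate cache entries (a paged response generally holds only part of the data and carries a key range, so the two must not share a slot). A toy illustration of the idea, not the real key layout:

    package main

    import "fmt"

    // toyBuildKey serializes a few request fields and appends a marker byte
    // when the paging protocol is in use, keeping the two variants distinct.
    func toyBuildKey(tp int64, data []byte, pagingSize uint64) []byte {
        key := append([]byte{byte(tp)}, data...)
        if pagingSize > 0 {
            key = append(key, 1)
        }
        return key
    }

    func main() {
        plain := toyBuildKey(103, []byte("same-dag-same-ranges"), 0)
        paging := toyBuildKey(103, []byte("same-dag-same-ranges"), 128)
        fmt.Println(len(plain), len(paging)) // the paging key is one byte longer
    }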

store/copr/coprocessor_cache_test.go (16 changes: 13 additions & 3 deletions)
@@ -155,8 +155,8 @@ func TestCacheValueLen(t *testing.T) {
         RegionID: 0x1,
         RegionDataVersion: 0x3,
     }
-    // 72 = (8 byte pointer + 8 byte for length + 8 byte for cap) * 2 + 8 byte * 3
-    require.Equal(t, 72, v.Len())
+    // 120 = (8 byte pointer + 8 byte for length + 8 byte for cap) * 4 + 8 byte * 3
+    require.Equal(t, 120, v.Len())
 
     v = coprCacheValue{
         Key: []byte("foobar"),
@@ -165,7 +165,17 @@ func TestCacheValueLen(t *testing.T) {
         RegionID: 0x1,
         RegionDataVersion: 0x3,
     }
-    require.Equal(t, 72+len(v.Key)+len(v.Data), v.Len())
+    require.Equal(t, 120+len(v.Key)+len(v.Data), v.Len())
+
+    v = coprCacheValue{
+        Key: []byte("foobar"),
+        Data: []byte("12345678"),
+        TimeStamp: 0x123,
+        RegionID: 0x1,
+        RegionDataVersion: 0x3,
+        PageEnd: []byte("3235"),
+    }
+    require.Equal(t, 120+len(v.Key)+len(v.Data)+len(v.PageEnd), v.Len())
 }
 
 func TestGetSet(t *testing.T) {
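
For reference, the jump from 72 to 120 in the expected Len() follows from the struct layout on a 64-bit platform: a []byte header is 24 bytes (pointer, length, capacity) and a uint64 is 8, so four slice fields plus three uint64 fields give 4*24 + 3*8 = 120, which is what unsafe.Sizeof reports. A quick standalone check (the struct below copies the field list from this patch but is not the package's own type):

    package main

    import (
        "fmt"
        "unsafe"
    )

    // Same field list as coprCacheValue after this commit.
    type coprCacheValue struct {
        Key               []byte
        Data              []byte
        TimeStamp         uint64
        RegionID          uint64
        RegionDataVersion uint64
        PageStart         []byte
        PageEnd           []byte
    }

    func main() {
        // On 64-bit: 4 slice headers (4 * 24) + 3 uint64 fields (3 * 8) = 120.
        fmt.Println(unsafe.Sizeof(coprCacheValue{}))
    }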
