Skip to content

Commit

Permalink
executor,store/copr: fix index merge, distsql request's key ranges sh…
Browse files Browse the repository at this point in the history
…ould be sorted (#36633)

close #36632
  • Loading branch information
tiancaiamao authored Jul 29, 2022
1 parent 7a47538 commit 8b919bf
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 0 deletions.
8 changes: 8 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -304,6 +305,9 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)

e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
slices.SortFunc(kvRanges, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
var builder distsql.RequestBuilder
builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
Expand Down Expand Up @@ -612,6 +616,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
}

// init kvReq, result and worker for this partition
// The key ranges should be ordered.
slices.SortFunc(kvRange, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
kvReq, err := builder.SetKeyRanges(kvRange).Build()
if err != nil {
worker.syncErr(err)
Expand Down
47 changes: 47 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
Expand Down Expand Up @@ -581,3 +582,49 @@ func TestAdaptiveClosestRead(t *testing.T) {
// 2 IndexScan with cost 19/56, 2 TableReader with cost 32.5/65.
checkMetrics("select/* +USE_INDEX_MERGE(t) */ id from t use index(`idx_v_s1`) use index(idx_s2) where (s1 < 3 and v > 0) or s2 = 3;", 3, 1)
}

func TestCoprocessorPagingReqKeyRangeSorted(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging"))
}()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE `UK_COLLATION19523` (" +
"`COL1` binary(1) DEFAULT NULL," +
"`COL2` varchar(20) COLLATE utf8_general_ci DEFAULT NULL," +
"`COL4` datetime DEFAULT NULL," +
"`COL3` bigint(20) DEFAULT NULL," +
"`COL5` float DEFAULT NULL," +
"UNIQUE KEY `U_COL1` (`COL1`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci")

tk.MustExec("prepare stmt from 'SELECT/*+ HASH_JOIN(t1, t2) */ * FROM UK_COLLATION19523 t1 JOIN UK_COLLATION19523 t2 ON t1.col1 > t2.col1 WHERE t1.col1 IN (?, ?, ?) AND t2.col1 < ?;';")
tk.MustExec("set @a=0x4F, @b=0xF8, @c=NULL, @d=0xBF;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0x00, @b=0xD2, @c=9179987834981541375, @d=0xF8;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")

tk.MustExec("CREATE TABLE `IDT_COLLATION26873` (" +
"`COL1` varbinary(20) DEFAULT NULL," +
"`COL2` varchar(20) COLLATE utf8_general_ci DEFAULT NULL," +
"`COL4` datetime DEFAULT NULL," +
"`COL3` bigint(20) DEFAULT NULL," +
"`COL5` float DEFAULT NULL," +
"KEY `U_COL1` (`COL1`))")
tk.MustExec("prepare stmt from 'SELECT/*+ INL_JOIN(t1, t2) */ t2.* FROM IDT_COLLATION26873 t1 LEFT JOIN IDT_COLLATION26873 t2 ON t1.col1 = t2.col1 WHERE t1.col1 < ? AND t1.col1 IN (?, ?, ?);';")
tk.MustExec("set @a=NULL, @b=NULL, @c=NULL, @d=NULL;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0xE3253A6AC72A3A168EAF0E34A4779A947872CCCD, @b=0xD67BB26504EE152C2C356D7F6CAD897F03462963, @c=NULL, @d=0xDE735FEB375A4CF33479A39CA925470BFB229DB4;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=2606738829406840179, @b=1468233589368287363, @c=5174008984061521089, @d=7727946571160309462;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @b=NULL, @c=6864108002939154648, @d=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @b=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @c=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @d=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
}
5 changes: 5 additions & 0 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -324,6 +325,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
}

// init kvReq and worker for this partition
// The key ranges should be ordered.
slices.SortFunc(keyRange, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
kvReq, err := builder.SetKeyRanges(keyRange).Build()
if err != nil {
worker.syncErr(e.resultCh, err)
Expand Down
14 changes: 14 additions & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package copr

import (
"bytes"
"context"
"fmt"
"strconv"
Expand Down Expand Up @@ -51,6 +52,7 @@ import (
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var coprCacheCounterEvict = tidbmetrics.DistSQLCoprCacheCounter.WithLabelValues("evict")
Expand Down Expand Up @@ -92,6 +94,18 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
// coprocessor request but type is not DAG
req.Paging = false
}

failpoint.Inject("checkKeyRangeSortedForPaging", func(_ failpoint.Value) {
if req.Paging {
isSorted := slices.IsSortedFunc(req.KeyRanges, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
if !isSorted {
logutil.BgLogger().Fatal("distsql request key range not sorted!")
}
}
})

ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
ctx = context.WithValue(ctx, util.RequestSourceKey, req.RequestSource)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
Expand Down

0 comments on commit 8b919bf

Please sign in to comment.