planner,executor: fix join resolveIndex won't find its column from children schema & amend join's lused and rused logic for reversed column ref from join schema to its children (#51203) #52056

Closed
104 changes: 101 additions & 3 deletions executor/benchmark_test.go
@@ -15,12 +15,17 @@
package executor

import (
"cmp"
"context"
"encoding/base64"
"fmt"
"math/rand"
"os"
<<<<<<< HEAD:executor/benchmark_test.go
"sort"
=======
"slices"
>>>>>>> 58e5284b3f4 (planner,executor: fix join resolveIndex won't find its column from children schema & amend join's lused and rused logic for reversed column ref from join schema to its children (#51203)):pkg/executor/benchmark_test.go
"strconv"
"strings"
"sync"
@@ -882,7 +887,49 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
return tc
}

<<<<<<< HEAD:executor/benchmark_test.go
func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
=======
func prepareResolveIndices(joinSchema, lSchema, rSchema *expression.Schema, joinType core.JoinType) *expression.Schema {
colsNeedResolving := joinSchema.Len()
// For these two join types, the last output column is a generated column indicating whether the row is matched or not.
if joinType == core.LeftOuterSemiJoin || joinType == core.AntiLeftOuterSemiJoin {
colsNeedResolving--
}
mergedSchema := expression.MergeSchema(lSchema, rSchema)
// Avoid having two plans share the same column slice.
shallowColSlice := make([]*expression.Column, joinSchema.Len())
copy(shallowColSlice, joinSchema.Columns)
joinSchema = expression.NewSchema(shallowColSlice...)
foundCnt := 0
// Here we want to resolve every join schema column against the merged child schema. Columns with the
// same name in the join schema must each be redirected to their corresponding column in the child
// schemas, but the two column sets are **NOT** always in the same order, see comment:
// https://github.com/pingcap/tidb/pull/45831#discussion_r1481031471
// so we use a mapping mechanism instead of simply moving j forward.
marked := make([]bool, mergedSchema.Len())
for i := 0; i < colsNeedResolving; i++ {
findIdx := -1
for j := 0; j < len(mergedSchema.Columns); j++ {
if !joinSchema.Columns[i].EqualColumn(mergedSchema.Columns[j]) || marked[j] {
continue
}
// Resolve to a column with the same unique ID that has not been marked yet.
findIdx = j
break
}
if findIdx != -1 {
// valid one.
joinSchema.Columns[i] = joinSchema.Columns[i].Clone().(*expression.Column)
joinSchema.Columns[i].Index = findIdx
marked[findIdx] = true
foundCnt++
}
}
return joinSchema
}

func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Executor) *HashJoinExec {
>>>>>>> 58e5284b3f4 (planner,executor: fix join resolveIndex won't find its column from children schema & amend join's lused and rused logic for reversed column ref from join schema to its children (#51203)):pkg/executor/benchmark_test.go
if testCase.useOuterToBuild {
innerExec, outerExec = outerExec, innerExec
}
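As a side note, the resolution strategy added in prepareResolveIndices above can be shown in isolation. The sketch below is a simplified stand-in, not TiDB code: the col type, the uniqueID values, and the resolveIndices helper are invented for illustration. Each output column is matched against the merged child schema by unique ID, and a marked slice keeps two output columns with the same unique ID from resolving to the same child column, without assuming the two schemas share an ordering.

```go
package main

import "fmt"

// col is a hypothetical column: uniqueID identifies the column, index is the
// offset into the merged child schema after resolution.
type col struct {
	uniqueID int
	index    int
}

// resolveIndices maps every output column to the first unmarked merged-schema
// column with the same unique ID, so duplicates and reordered columns both work.
func resolveIndices(output, merged []col) []col {
	marked := make([]bool, len(merged))
	resolved := make([]col, len(output))
	copy(resolved, output)
	for i := range resolved {
		for j := range merged {
			if marked[j] || resolved[i].uniqueID != merged[j].uniqueID {
				continue
			}
			resolved[i].index = j // first unmarked column with the same unique ID
			marked[j] = true
			break
		}
	}
	return resolved
}

func main() {
	// The output schema references the merged columns in a different order
	// than they appear in the children.
	merged := []col{{uniqueID: 11}, {uniqueID: 21}, {uniqueID: 22}}
	output := []col{{uniqueID: 11}, {uniqueID: 22}, {uniqueID: 21}}
	for _, c := range resolveIndices(output, merged) {
		fmt.Printf("uid=%d -> merged index %d\n", c.uniqueID, c.index)
	}
	// uid=11 -> 0, uid=22 -> 2, uid=21 -> 1
}
```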
@@ -960,26 +1007,57 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)

// markChildrenUsedColsForTest compares each child with the output schema and marks
// whether each column of the child is used by the output.
<<<<<<< HEAD:executor/benchmark_test.go
func markChildrenUsedColsForTest(outputSchema *expression.Schema, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
childrenUsed = make([][]bool, 0, len(childSchemas))
markedOffsets := make(map[int]struct{})
for _, col := range outputSchema.Columns {
markedOffsets[col.Index] = struct{}{}
=======
func markChildrenUsedColsForTest(ctx sessionctx.Context, outputSchema *expression.Schema, childSchemas ...*expression.Schema) (childrenUsed [][]int) {
childrenUsed = make([][]int, 0, len(childSchemas))
markedOffsets := make(map[int]int)
for originalIdx, col := range outputSchema.Columns {
markedOffsets[col.Index] = originalIdx
>>>>>>> 58e5284b3f4 (planner,executor: fix join resolveIndex won't find its column from children schema & amend join's lused and rused logic for reversed column ref from join schema to its children (#51203)):pkg/executor/benchmark_test.go
}
prefixLen := 0
type intPair struct {
first int
second int
}
// For example:
// left child schema: [col11]
// right child schema: [col21, col22]
// output schema: [col11, col22, col21]. If we do not record the original order derived after physical
// ResolveIndices, lused would be [0] and rused would be [0,1], losing the actual order; the correct rused is [1,0].
for _, childSchema := range childSchemas {
used := make([]bool, len(childSchema.Columns))
usedIdxPair := make([]intPair, 0, len(childSchema.Columns))
for i := range childSchema.Columns {
if _, ok := markedOffsets[prefixLen+i]; ok {
used[i] = true
if originalIdx, ok := markedOffsets[prefixLen+i]; ok {
usedIdxPair = append(usedIdxPair, intPair{first: originalIdx, second: i})
}
}
<<<<<<< HEAD:executor/benchmark_test.go
childrenUsed = append(childrenUsed, used)
}
for _, child := range childSchemas {
used := expression.GetUsedList(outputSchema.Columns, child)
childrenUsed = append(childrenUsed, used)
}
=======
// Sort the used indexes according to their original positions derived after ResolveIndices.
slices.SortFunc(usedIdxPair, func(a, b intPair) int {
return cmp.Compare(a.first, b.first)
})
usedIdx := make([]int, 0, len(childSchema.Columns))
for _, one := range usedIdxPair {
usedIdx = append(usedIdx, one.second)
}
childrenUsed = append(childrenUsed, usedIdx)
prefixLen += childSchema.Len()
}
>>>>>>> 58e5284b3f4 (planner,executor: fix join resolveIndex won't find its column from children schema & amend join's lused and rused logic for reversed column ref from join schema to its children (#51203)):pkg/executor/benchmark_test.go
return
}
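To make the ordering comment above concrete, here is a small self-contained sketch (hypothetical data, not the executor's code) that walks the exact example from the comment: with left schema [col11], right schema [col21, col22], and output schema [col11, col22, col21], recording the output position for each merged-schema offset and sorting by it yields lused = [0] and rused = [1, 0], preserving the reversed right-hand order.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

func main() {
	// For each output column, its offset in the merged child schema:
	// col11 -> 0, col22 -> 2, col21 -> 1.
	outputIdx := []int{0, 2, 1}
	childLens := []int{1, 2} // left child has 1 column, right child has 2

	// markedOffsets maps merged-schema offset -> position in the output schema.
	markedOffsets := make(map[int]int, len(outputIdx))
	for originalIdx, mergedOffset := range outputIdx {
		markedOffsets[mergedOffset] = originalIdx
	}

	prefixLen := 0
	for _, l := range childLens {
		type pair struct{ outputPos, childOffset int }
		pairs := make([]pair, 0, l)
		for i := 0; i < l; i++ {
			if pos, ok := markedOffsets[prefixLen+i]; ok {
				pairs = append(pairs, pair{pos, i})
			}
		}
		// Sort by position in the output schema so the used list follows the
		// output order, not the child order.
		slices.SortFunc(pairs, func(a, b pair) int { return cmp.Compare(a.outputPos, b.outputPos) })
		used := make([]int, 0, len(pairs))
		for _, p := range pairs {
			used = append(used, p.childOffset)
		}
		fmt.Println(used) // prints [0] for the left child, then [1 0] for the right child
		prefixLen += l
	}
}
```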

@@ -1580,15 +1658,35 @@ func prepareMergeJoinExec(tc *mergeJoinTestCase, joinSchema *expression.Schema,
isOuterJoin: false,
}

var usedIdx [][]int
if tc.childrenUsedSchema != nil {
usedIdx = make([][]int, 0, len(tc.childrenUsedSchema))
for _, childSchema := range tc.childrenUsedSchema {
used := make([]int, 0, len(childSchema))
for idx, one := range childSchema {
if one {
used = append(used, idx)
}
}
usedIdx = append(usedIdx, used)
}
}

mergeJoinExec.joiner = newJoiner(
tc.ctx,
0,
false,
defaultValues,
nil,
<<<<<<< HEAD:executor/benchmark_test.go
retTypes(leftExec),
retTypes(rightExec),
tc.childrenUsedSchema,
=======
exec.RetTypes(leftExec),
exec.RetTypes(rightExec),
usedIdx,
>>>>>>> 58e5284b3f4 (planner,executor: fix join resolveIndex won't find its column from children schema & amend join's lused and rused logic for reversed column ref from join schema to its children (#51203)):pkg/executor/benchmark_test.go
false,
)

45 changes: 30 additions & 15 deletions executor/builder.go
@@ -716,14 +716,11 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
end: v.Offset + v.Count,
}

childUsedSchemaLen := v.Children()[0].Schema().Len()
childUsedSchema := markChildrenUsedCols(v.Schema().Columns, v.Children()[0].Schema())[0]
e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema))
for i, used := range childUsedSchema {
if used {
e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, i)
}
}
if len(e.columnIdxsUsedByChild) == len(childUsedSchema) {
e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, childUsedSchema...)
if len(e.columnIdxsUsedByChild) == childUsedSchemaLen {
e.columnIdxsUsedByChild = nil // indicates that all columns are used. LimitExec will improve performance for this condition.
}
return e
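The nil convention established here (a nil columnIdxsUsedByChild means every child column is used) implies that downstream code can skip projection entirely in the common case. Below is a minimal, hypothetical sketch of what a consumer of such an index list might look like; pruneRow and the plain []any row type are illustrative, not LimitExec's actual chunk-based implementation.

```go
package main

import "fmt"

// pruneRow keeps only the listed column offsets. A nil index list means "all
// columns are used", so the row is passed through without copying.
func pruneRow(row []any, usedIdxs []int) []any {
	if usedIdxs == nil {
		return row // all columns used: no pruning, no allocation
	}
	out := make([]any, 0, len(usedIdxs))
	for _, idx := range usedIdxs {
		out = append(out, row[idx])
	}
	return out
}

func main() {
	row := []any{1, "a", 3.5}
	fmt.Println(pruneRow(row, nil))         // [1 a 3.5]
	fmt.Println(pruneRow(row, []int{2, 0})) // [3.5 1]
}
```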
@@ -2993,21 +2990,39 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan

// markChildrenUsedCols compares each child with the output schema and marks
// whether each column of the child is used by the output.
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
childrenUsed = make([][]bool, 0, len(childSchemas))
markedOffsets := make(map[int]struct{})
for _, col := range outputCols {
markedOffsets[col.Index] = struct{}{}
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]int) {
childrenUsed = make([][]int, 0, len(childSchemas))
markedOffsets := make(map[int]int)
// Keep the original (possibly reversed) order.
for originalIdx, col := range outputCols {
markedOffsets[col.Index] = originalIdx
}
prefixLen := 0
type intPair struct {
first int
second int
}
// For example:
// left child schema: [col11]
// right child schema: [col21, col22]
// output schema: [col11, col22, col21]. If we do not record the original order derived after physical
// ResolveIndices, lused would be [0] and rused would be [0,1], losing the actual order; the correct rused is [1,0].
for _, childSchema := range childSchemas {
used := make([]bool, len(childSchema.Columns))
usedIdxPair := make([]intPair, 0, len(childSchema.Columns))
for i := range childSchema.Columns {
if _, ok := markedOffsets[prefixLen+i]; ok {
used[i] = true
if originalIdx, ok := markedOffsets[prefixLen+i]; ok {
usedIdxPair = append(usedIdxPair, intPair{first: originalIdx, second: i})
}
}
childrenUsed = append(childrenUsed, used)
// Sort the used indexes according to their original positions derived after ResolveIndices.
slices.SortFunc(usedIdxPair, func(a, b intPair) int {
return cmp.Compare(a.first, b.first)
})
usedIdx := make([]int, 0, len(childSchema.Columns))
for _, one := range usedIdxPair {
usedIdx = append(usedIdx, one.second)
}
childrenUsed = append(childrenUsed, usedIdx)
prefixLen += childSchema.Len()
}
return
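The new builder.go code relies on slices.SortFunc and cmp.Compare from the Go 1.21 standard library. Note that slices.SortFunc is not guaranteed to be stable, but that is harmless here: markedOffsets maps each merged-schema offset to a unique output position, so the first fields being compared never tie. A minimal usage sketch:

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// intPair mirrors the helper used above: first is the position in the output
// schema, second is the offset within the child schema.
type intPair struct {
	first  int
	second int
}

func main() {
	pairs := []intPair{{first: 2, second: 0}, {first: 1, second: 1}}
	// Order by the output-schema position, exactly as markChildrenUsedCols does.
	slices.SortFunc(pairs, func(a, b intPair) int {
		return cmp.Compare(a.first, b.first)
	})
	fmt.Println(pairs) // [{1 1} {2 0}]
}
```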
17 changes: 6 additions & 11 deletions executor/joiner.go
@@ -132,7 +132,7 @@ func JoinerType(j joiner) plannercore.JoinType {

func newJoiner(ctx sessionctx.Context, joinType plannercore.JoinType,
outerIsRight bool, defaultInner []types.Datum, filter []expression.Expression,
lhsColTypes, rhsColTypes []*types.FieldType, childrenUsed [][]bool, isNA bool) joiner {
lhsColTypes, rhsColTypes []*types.FieldType, childrenUsed [][]int, isNA bool) joiner {
base := baseJoiner{
ctx: ctx,
conditions: filter,
@@ -141,19 +141,14 @@ func newJoiner(ctx sessionctx.Context, joinType plannercore.JoinType,
}
base.selected = make([]bool, 0, chunk.InitialCapacity)
base.isNull = make([]bool, 0, chunk.InitialCapacity)
// lUsed and rUsed should follow their original order: the join schema may
// reference the child schemas' columns in reversed order, and that order
// must be preserved here.
if childrenUsed != nil {
base.lUsed = make([]int, 0, len(childrenUsed[0])) // make it non-nil
for i, used := range childrenUsed[0] {
if used {
base.lUsed = append(base.lUsed, i)
}
}
base.lUsed = append(base.lUsed, childrenUsed[0]...)
base.rUsed = make([]int, 0, len(childrenUsed[1])) // make it non-nil
for i, used := range childrenUsed[1] {
if used {
base.rUsed = append(base.rUsed, i)
}
}
base.rUsed = append(base.rUsed, childrenUsed[1]...)
logutil.BgLogger().Debug("InlineProjection",
zap.Ints("lUsed", base.lUsed), zap.Ints("rUsed", base.rUsed),
zap.Int("lCount", len(lhsColTypes)), zap.Int("rCount", len(rhsColTypes)))
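Since newJoiner now takes the index lists verbatim, the column order chosen by the planner survives through to row construction. The following is a rough sketch of the idea only; joinRow and the plain []any rows are illustrative and not the joiner's actual chunk-based code. Matched left and right rows are projected through lUsed and rUsed in exactly the order the lists dictate.

```go
package main

import "fmt"

// joinRow copies only the listed column offsets of the matched left and right
// rows into the output, in the order given by lUsed and rUsed.
func joinRow(left, right []any, lUsed, rUsed []int) []any {
	out := make([]any, 0, len(lUsed)+len(rUsed))
	for _, i := range lUsed {
		out = append(out, left[i])
	}
	for _, i := range rUsed {
		out = append(out, right[i])
	}
	return out
}

func main() {
	left := []any{"l0"}
	right := []any{"r0", "r1"}
	// With rUsed preserved in output order, [1, 0] yields r1 before r0.
	fmt.Println(joinRow(left, right, []int{0}, []int{1, 0})) // [l0 r1 r0]
}
```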
120 changes: 120 additions & 0 deletions executor/tiflashtest/tiflash_test.go
@@ -1326,6 +1326,126 @@ func TestTiflashEmptyDynamicPruneResult(t *testing.T) {
tk.MustQuery("select /*+ read_from_storage(tiflash[t1, t2]) */ * from IDT_RP24833 partition(p3, p4) t1 join IDT_RP24833 partition(p2) t2 on t1.col1 = t2.col1 where t1. col1 between -8448770111093677011 and -8448770111093677011 and t2. col1 <= -8448770111093677011;").Check(testkit.Rows())
}

<<<<<<< HEAD:executor/tiflashtest/tiflash_test.go
=======
func TestDisaggregatedTiFlash(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
conf.UseAutoScaler = true
})
defer config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
conf.UseAutoScaler = false
})
err := tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.TestASStr, "tmpAddr", "tmpClusterID", false)
require.NoError(t, err)

store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int)")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

err = tk.ExecToErr("select * from t;")
// Expect an error, because TestAutoScaler returns an empty topo.
require.Contains(t, err.Error(), "Cannot find proper topo to dispatch MPPTask: topo from AutoScaler is empty")

err = tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.AWSASStr, "tmpAddr", "tmpClusterID", false)
require.NoError(t, err)
err = tk.ExecToErr("select * from t;")
// Expect an error, because AWSAutoScaler is not set up, so the HTTP request will fail.
require.Contains(t, err.Error(), "[util:1815]Internal : get tiflash_compute topology failed")
}

// todo: remove this after AutoScaler is stable.
func TestDisaggregatedTiFlashNonAutoScaler(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
conf.UseAutoScaler = false
})
defer config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
conf.UseAutoScaler = true
})

// Set globalTopoFetcher to nil to make sure the topo cannot be fetched from AutoScaler.
err := tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.InvalidASStr, "tmpAddr", "tmpClusterID", false)
require.Contains(t, err.Error(), "unexpected topo fetch type. expect: mock or aws or gcp, got invalid")

store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int)")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

err = tk.ExecToErr("select * from t;")
// This error message means we use PD instead of AutoScaler.
require.Contains(t, err.Error(), "tiflash_compute node is unavailable")
}

func TestDisaggregatedTiFlashQuery(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
})
defer config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
})

store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tbl_1")
tk.MustExec(`create table tbl_1 ( col_1 bigint not null default -1443635317331776148,
col_2 text ( 176 ) collate utf8mb4_bin not null,
col_3 decimal ( 8, 3 ),
col_4 varchar ( 128 ) collate utf8mb4_bin not null,
col_5 varchar ( 377 ) collate utf8mb4_bin,
col_6 double,
col_7 varchar ( 459 ) collate utf8mb4_bin,
col_8 tinyint default -88 ) charset utf8mb4 collate utf8mb4_bin ;`)
tk.MustExec("alter table tbl_1 set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "tbl_1")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;")

tk.MustExec("set @@tidb_partition_prune_mode = 'static';")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("create table t1(c1 int, c2 int) partition by hash(c1) partitions 3")
tk.MustExec("insert into t1 values(1, 1), (2, 2), (3, 3)")
tk.MustExec("alter table t1 set tiflash replica 1")
tb = external.GetTableByName(t, tk, "test", "t1")
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustQuery("explain select * from t1 where c1 < 2").Check(testkit.Rows(
"PartitionUnion_11 9970.00 root ",
"├─TableReader_16 3323.33 root MppVersion: 2, data:ExchangeSender_15",
"│ └─ExchangeSender_15 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_14 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_13 10000.00 mpp[tiflash] table:t1, partition:p0 pushed down filter:empty, keep order:false, stats:pseudo",
"├─TableReader_20 3323.33 root MppVersion: 2, data:ExchangeSender_19",
"│ └─ExchangeSender_19 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_18 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_17 10000.00 mpp[tiflash] table:t1, partition:p1 pushed down filter:empty, keep order:false, stats:pseudo",
"└─TableReader_24 3323.33 root MppVersion: 2, data:ExchangeSender_23",
" └─ExchangeSender_23 3323.33 mpp[tiflash] ExchangeType: PassThrough",
" └─Selection_22 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
" └─TableFullScan_21 10000.00 mpp[tiflash] table:t1, partition:p2 pushed down filter:empty, keep order:false, stats:pseudo"))
}

>>>>>>> 58e5284b3f4 (planner,executor: fix join resolveIndex won't find its column from children schema & amend join's lused and rused logic for reversed column ref from join schema to its children (#51203)):pkg/executor/test/tiflashtest/tiflash_test.go
func TestMPPMemoryTracker(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)