Skip to content

Commit

Permalink
planner: move logical join and logical selection to logicalop (#55272)
Browse files Browse the repository at this point in the history
ref #51664, ref #52714
  • Loading branch information
AilinKid authored Aug 12, 2024
1 parent 004b442 commit 2abd334
Show file tree
Hide file tree
Showing 86 changed files with 1,324 additions and 1,201 deletions.
2 changes: 1 addition & 1 deletion docs/design/2022-01-04-integer-shard-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ The entry point to add the `tidb_shard` expression is the function as bellow. We

func (ds *DataSource) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
predicates = expression.PropagateConstant(ds.ctx, predicates)
predicates = DeleteTrueExprs(ds, predicates)
predicates = constraint.DeleteTrueExprs(ds, predicates)
// Add tidb_shard() prefix to the condtion for shard index in some scenarios
// TODO: remove it to the place building logical plan
predicates = ds.AddPrefix4ShardIndexes(ds.ctx, predicates)
Expand Down
12 changes: 6 additions & 6 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ type hashJoinTestCase struct {
concurrency int
ctx sessionctx.Context
keyIdx []int
joinType core.JoinType
joinType logicalop.JoinType
disk bool
useOuterToBuild bool
rawData string
Expand All @@ -607,7 +607,7 @@ func (tc hashJoinTestCase) String() string {
tc.rows, tc.cols, tc.concurrency, tc.keyIdx, tc.disk)
}

func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, useOuterToBuild bool) *hashJoinTestCase {
func defaultHashJoinTestCase(cols []*types.FieldType, joinType logicalop.JoinType, useOuterToBuild bool) *hashJoinTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
Expand All @@ -621,10 +621,10 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
return tc
}

func prepareResolveIndices(joinSchema, lSchema, rSchema *expression.Schema, joinType core.JoinType) *expression.Schema {
func prepareResolveIndices(joinSchema, lSchema, rSchema *expression.Schema, joinType logicalop.JoinType) *expression.Schema {
colsNeedResolving := joinSchema.Len()
// The last output column of this two join is the generated column to indicate whether the row is matched or not.
if joinType == core.LeftOuterSemiJoin || joinType == core.AntiLeftOuterSemiJoin {
if joinType == logicalop.LeftOuterSemiJoin || joinType == logicalop.AntiLeftOuterSemiJoin {
colsNeedResolving--
}
mergedSchema := expression.MergeSchema(lSchema, rSchema)
Expand Down Expand Up @@ -687,7 +687,7 @@ func prepare4HashJoinV2(testCase *hashJoinTestCase, innerExec, outerExec exec.Ex
// todo: need systematic way to protect.
// physical join should resolveIndices to get right schema column index.
// otherwise, markChildrenUsedColsForTest will fail below.
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), core.InnerJoin)
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), logicalop.InnerJoin)

joinKeysColIdx := make([]int, 0, len(testCase.keyIdx))
joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...)
Expand Down Expand Up @@ -776,7 +776,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Exec
// todo: need systematic way to protect.
// physical join should resolveIndices to get right schema column index.
// otherwise, markChildrenUsedColsForTest will fail below.
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), core.InnerJoin)
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), logicalop.InnerJoin)

joinKeysColIdx := make([]int, 0, len(testCase.keyIdx))
joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...)
Expand Down
20 changes: 10 additions & 10 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,15 +1428,15 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) exec.

defaultValues := v.DefaultValues
if defaultValues == nil {
if v.JoinType == plannercore.RightOuterJoin {
if v.JoinType == logicalop.RightOuterJoin {
defaultValues = make([]types.Datum, leftExec.Schema().Len())
} else {
defaultValues = make([]types.Datum, rightExec.Schema().Len())
}
}

colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}

Expand All @@ -1447,7 +1447,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) exec.
Joiner: join.NewJoiner(
b.ctx,
v.JoinType,
v.JoinType == plannercore.RightOuterJoin,
v.JoinType == logicalop.RightOuterJoin,
defaultValues,
v.OtherConditions,
exec.RetTypes(leftExec),
Expand All @@ -1470,7 +1470,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) exec.
Filters: v.RightConditions,
}

if v.JoinType == plannercore.RightOuterJoin {
if v.JoinType == logicalop.RightOuterJoin {
e.InnerTable = leftTable
e.OuterTable = rightTable
} else {
Expand Down Expand Up @@ -1603,7 +1603,7 @@ func (b *executorBuilder) buildHashJoinV2(v *plannercore.PhysicalHashJoin) exec.
}

colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
// the matched column is added inside join
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
Expand Down Expand Up @@ -1772,7 +1772,7 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) exec.Ex
}
isNAJoin := len(v.LeftNAJoinKeys) > 0
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
Expand Down Expand Up @@ -2464,7 +2464,7 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
OuterExec: outerExec,
OuterFilter: outerFilter,
InnerFilter: innerFilter,
Outer: v.JoinType != plannercore.InnerJoin,
Outer: v.JoinType != logicalop.InnerJoin,
Joiner: tupleJoiner,
OuterSchema: v.OuterSchema,
Sctx: b.ctx,
Expand Down Expand Up @@ -2505,7 +2505,7 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
outerExec: outerExec,
outerFilter: outerFilter,
innerFilter: innerFilters,
outer: v.JoinType != plannercore.InnerJoin,
outer: v.JoinType != logicalop.InnerJoin,
joiners: joiners,
corCols: corCols,
concurrency: v.Concurrency,
Expand Down Expand Up @@ -3294,7 +3294,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
Finished: &atomic.Value{},
}
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
Expand Down Expand Up @@ -3420,7 +3420,7 @@ func (b *executorBuilder) buildIndexLookUpMergeJoin(v *plannercore.PhysicalIndex
LastColHelper: v.CompareFilters,
}
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
Expand Down
9 changes: 5 additions & 4 deletions pkg/executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -697,8 +698,8 @@ func TestMergeJoinRequiredRows(t *testing.T) {
panic("not support")
}
}
joinTypes := []plannercore.JoinType{plannercore.RightOuterJoin, plannercore.LeftOuterJoin,
plannercore.LeftOuterSemiJoin, plannercore.AntiLeftOuterSemiJoin}
joinTypes := []logicalop.JoinType{logicalop.RightOuterJoin, logicalop.LeftOuterJoin,
logicalop.LeftOuterSemiJoin, logicalop.AntiLeftOuterSemiJoin}
for _, joinType := range joinTypes {
ctx := defaultCtx()
required := make([]int, 100)
Expand All @@ -720,8 +721,8 @@ func TestMergeJoinRequiredRows(t *testing.T) {
}
}

func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, innerSrc, outerSrc exec.Executor) exec.Executor {
if joinType == plannercore.RightOuterJoin {
func buildMergeJoinExec(ctx sessionctx.Context, joinType logicalop.JoinType, innerSrc, outerSrc exec.Executor) exec.Executor {
if joinType == logicalop.RightOuterJoin {
innerSrc, outerSrc = outerSrc, innerSrc
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/planner/core",
"//pkg/planner/core/operator/logicalop",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
Expand Down Expand Up @@ -87,7 +88,7 @@ go_test(
"//pkg/expression",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/planner/core",
"//pkg/planner/core/operator/logicalop",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/join/base_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -498,7 +498,7 @@ func isKeyMatched(keyMode keyMode, serializedKey []byte, rowStart unsafe.Pointer
}

// NewJoinProbe create a join probe used for hash join v2
func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType core.JoinType, keyIndex []int, joinedColumnTypes, probeKeyTypes []*types.FieldType, rightAsBuildSide bool) ProbeV2 {
func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType, keyIndex []int, joinedColumnTypes, probeKeyTypes []*types.FieldType, rightAsBuildSide bool) ProbeV2 {
base := baseJoinProbe{
ctx: ctx,
workID: workID,
Expand Down Expand Up @@ -540,11 +540,11 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType core.JoinType, keyIn
base.rowIndexInfos = make([]*matchedRowInfo, 0, chunk.InitialCapacity)
}
switch joinType {
case core.InnerJoin:
case logicalop.InnerJoin:
return &innerJoinProbe{base}
case core.LeftOuterJoin:
case logicalop.LeftOuterJoin:
return newOuterJoinProbe(base, !rightAsBuildSide, rightAsBuildSide)
case core.RightOuterJoin:
case logicalop.RightOuterJoin:
return newOuterJoinProbe(base, rightAsBuildSide, rightAsBuildSide)
default:
panic("unsupported join type")
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/join/hash_join_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -55,7 +55,7 @@ type hashJoinCtxBase struct {
finished atomic.Bool
IsNullEQ []bool
buildFinished chan error
JoinType plannercore.JoinType
JoinType logicalop.JoinType
IsNullAware bool
memTracker *memory.Tracker // track memory usage.
diskTracker *disk.Tracker // track disk usage.
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/join/hash_join_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/unionexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -216,7 +216,7 @@ func (e *HashJoinV1Exec) fetchAndProbeHashTable(ctx context.Context) {
defer trace.StartRegion(ctx, "HashJoinProbeSideFetcher").End()
e.ProbeSideTupleFetcher.fetchProbeSideChunks(ctx, e.MaxChunkSize(), func() bool {
return e.ProbeSideTupleFetcher.RowContainer.Len() == uint64(0)
}, e.ProbeSideTupleFetcher.JoinType == plannercore.InnerJoin || e.ProbeSideTupleFetcher.JoinType == plannercore.SemiJoin,
}, e.ProbeSideTupleFetcher.JoinType == logicalop.InnerJoin || e.ProbeSideTupleFetcher.JoinType == logicalop.SemiJoin,
false, e.ProbeSideTupleFetcher.IsOuterJoin, &e.ProbeSideTupleFetcher.hashJoinCtxBase)
}, e.ProbeSideTupleFetcher.handleProbeSideFetcherPanic)

Expand Down Expand Up @@ -737,8 +737,8 @@ func (w *ProbeWorkerV1) joinNAASJMatchProbeSideRow2Chunk(probeKey uint64, probeK
// For NA-AntiLeftOuterSemiJoin, we couldn't match null-bucket first, because once y set has a same key x and null
// key, we should return the result as left side row appended with a scalar value 0 which is from same key matching failure.
func (w *ProbeWorkerV1) joinNAAJMatchProbeSideRow2Chunk(probeKey uint64, probeKeyNullBits *bitmap.ConcurrentBitmap, probeSideRow chunk.Row, hCtx *HashContext, joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) {
naAntiSemiJoin := w.HashJoinCtx.JoinType == plannercore.AntiSemiJoin && w.HashJoinCtx.IsNullAware
naAntiLeftOuterSemiJoin := w.HashJoinCtx.JoinType == plannercore.AntiLeftOuterSemiJoin && w.HashJoinCtx.IsNullAware
naAntiSemiJoin := w.HashJoinCtx.JoinType == logicalop.AntiSemiJoin && w.HashJoinCtx.IsNullAware
naAntiLeftOuterSemiJoin := w.HashJoinCtx.JoinType == logicalop.AntiLeftOuterSemiJoin && w.HashJoinCtx.IsNullAware
if naAntiSemiJoin {
return w.joinNAASJMatchProbeSideRow2Chunk(probeKey, probeKeyNullBits, probeSideRow, hCtx, joinResult)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/mysql"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/channel"
Expand Down Expand Up @@ -335,10 +335,10 @@ func (e *HashJoinV2Exec) Open(ctx context.Context) error {
}

func (fetcher *ProbeSideTupleFetcherV2) shouldLimitProbeFetchSize() bool {
if fetcher.JoinType == plannercore.LeftOuterJoin && fetcher.RightAsBuildSide {
if fetcher.JoinType == logicalop.LeftOuterJoin && fetcher.RightAsBuildSide {
return true
}
if fetcher.JoinType == plannercore.RightOuterJoin && !fetcher.RightAsBuildSide {
if fetcher.JoinType == logicalop.RightOuterJoin && !fetcher.RightAsBuildSide {
return true
}
return false
Expand Down Expand Up @@ -374,13 +374,13 @@ func (w *BuildWorkerV2) splitPartitionAndAppendToRowTable(typeCtx types.Context,

func (e *HashJoinV2Exec) canSkipProbeIfHashTableIsEmpty() bool {
switch e.JoinType {
case plannercore.InnerJoin:
case logicalop.InnerJoin:
return true
case plannercore.LeftOuterJoin:
case logicalop.LeftOuterJoin:
return !e.RightAsBuildSide
case plannercore.RightOuterJoin:
case logicalop.RightOuterJoin:
return e.RightAsBuildSide
case plannercore.SemiJoin:
case logicalop.SemiJoin:
return e.RightAsBuildSide
default:
return false
Expand Down
Loading

0 comments on commit 2abd334

Please sign in to comment.