Skip to content

Commit

Permalink
Merge #58280
Browse files Browse the repository at this point in the history
58280: colexec: add vectorized cross joiner r=yuzefovich a=yuzefovich

**descpb: add helper method for computing output types of joins**

Release note: None

**colexec: shorten merge joiner benchmark**

This commit adjust the merge joiner benchmark to run faster (from 145s
to 54s on my laptop) by removing the smallest row options and reducing
the number of options from 5 to 4.

Release note: None

**colexec: extract cross joiner base from the merge joiner**

This commit extracts the logic of building from the buffered groups in
the merge joiner into a cross joiner base struct which will allow us to
reuse it for implementing the disk-backed vectorized cross joiner. The
logical changes were kept to minimum.

Release note: None

**colexec: add vectorized cross joiner**

This commit adds the vectorized cross joiner with the core building
logic being reused from the base struct extracted from the merge joiner.
The new operator simply needs to consume the inputs to set up the base
struct while also paying attention to possibly unmatched tuples (this
was never the case for the buffered group in the merge joiner). It also
optimizes the behavior in how the inputs are consumed depending on the
join type.

Fixes: #46205

Release note: None

**colexec: optimize cross joiner for some join types**

This commit optimizes the cross joiner a bit by taking advantage that
for some join types we need to repeat left tuples only once. It
additionally cleans up the logic around exiting from the build methods
so that we don't run into a situation where `toAppend` is 0 (which is
pointless).

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jan 14, 2021
2 parents af2a567 + f535dbe commit 80e6094
Show file tree
Hide file tree
Showing 33 changed files with 3,559 additions and 10,713 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/and_or_projection.eg.go \
pkg/sql/colexec/cast.eg.go \
pkg/sql/colexec/const.eg.go \
pkg/sql/colexec/crossjoiner.eg.go \
pkg/sql/colexec/default_cmp_expr.eg.go \
pkg/sql/colexec/default_cmp_proj_ops.eg.go \
pkg/sql/colexec/default_cmp_sel_ops.eg.go \
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/descpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/sql/parser",
"//pkg/sql/privilege",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/encoding",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/hlc",
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/catalog/descpb/join_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package descpb

import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -107,3 +108,22 @@ func (j JoinType) IsLeftAntiOrExceptAll() bool {
func (j JoinType) IsRightSemiOrRightAnti() bool {
return j == RightSemiJoin || j == RightAntiJoin
}

// MakeOutputTypes computes the output types for this join type.
func (j JoinType) MakeOutputTypes(left, right []*types.T) []*types.T {
numOutputTypes := 0
if j.ShouldIncludeLeftColsInOutput() {
numOutputTypes += len(left)
}
if j.ShouldIncludeRightColsInOutput() {
numOutputTypes += len(right)
}
outputTypes := make([]*types.T, 0, numOutputTypes)
if j.ShouldIncludeLeftColsInOutput() {
outputTypes = append(outputTypes, left...)
}
if j.ShouldIncludeRightColsInOutput() {
outputTypes = append(outputTypes, right...)
}
return outputTypes
}
2 changes: 2 additions & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"columnarizer.go",
"constants.go",
"count.go",
"crossjoiner.go",
"deselector.go",
"disk_spiller.go",
"expr.go",
Expand Down Expand Up @@ -110,6 +111,7 @@ go_test(
"columnarizer_test.go",
"const_test.go",
"count_test.go",
"crossjoiner_test.go",
"default_agg_test.go",
"default_cmp_op_test.go",
"dep_test.go",
Expand Down
164 changes: 83 additions & 81 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,6 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
if !spec.Core.HashJoiner.OnExpr.Empty() && spec.Core.HashJoiner.Type != descpb.InnerJoin {
return errors.Newf("can't plan vectorized non-inner hash joins with ON expressions")
}
leftInput, rightInput := spec.Input[0], spec.Input[1]
if len(leftInput.ColumnTypes) == 0 || len(rightInput.ColumnTypes) == 0 {
// We have a cross join of two inputs, and at least one of them has
// zero-length schema. However, the hash join operators (both
// external and in-memory) have a built-in assumption of non-empty
// inputs, so we will fallback to row execution in such cases.
// TODO(yuzefovich): implement specialized cross join operator.
return errors.Newf("can't plan vectorized hash joins with an empty input schema")
}
return nil

case spec.Core.MergeJoiner != nil:
Expand Down Expand Up @@ -905,76 +896,93 @@ func NewColOperator(
rightTypes := make([]*types.T, len(spec.Input[1].ColumnTypes))
copy(rightTypes, spec.Input[1].ColumnTypes)

hashJoinerMemMonitorName := fmt.Sprintf("hash-joiner-%d", spec.ProcessorID)
var hashJoinerMemAccount *mon.BoundAccount
var hashJoinerUnlimitedAllocator *colmem.Allocator
if useStreamingMemAccountForBuffering {
hashJoinerMemAccount = streamingMemAccount
hashJoinerUnlimitedAllocator = streamingAllocator
} else {
hashJoinerMemAccount = result.createMemAccountForSpillStrategy(
ctx, flowCtx, hashJoinerMemMonitorName,
if len(core.HashJoiner.LeftEqColumns) == 0 {
// We are performing a cross-join, so we need to plan a
// specialized operator.
crossJoinerMemMonitorName := fmt.Sprintf("cross-joiner-%d", spec.ProcessorID)
crossJoinerMemAccount := result.createBufferingUnlimitedMemAccount(ctx, flowCtx, crossJoinerMemMonitorName)
crossJoinerDiskAcc := result.createDiskAccount(ctx, flowCtx, crossJoinerMemMonitorName)
unlimitedAllocator := colmem.NewAllocator(ctx, crossJoinerMemAccount, factory)
// TODO(yuzefovich): audit all usages of GetWorkMemLimit to see
// whether we should be paying attention to ForceDiskSpill knob
// there too.
memoryLimit := execinfra.GetWorkMemLimit(flowCtx.Cfg)
if flowCtx.Cfg.TestingKnobs.ForceDiskSpill {
memoryLimit = 1
}
result.Op = colexec.NewCrossJoiner(
unlimitedAllocator,
memoryLimit,
args.DiskQueueCfg,
args.FDSemaphore,
core.HashJoiner.Type,
inputs[0], inputs[1],
leftTypes, rightTypes,
crossJoinerDiskAcc,
)
hashJoinerUnlimitedAllocator = colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, hashJoinerMemMonitorName), factory,
result.ToClose = append(result.ToClose, result.Op.(colexecbase.Closer))
} else {
hashJoinerMemMonitorName := fmt.Sprintf("hash-joiner-%d", spec.ProcessorID)
var hashJoinerMemAccount *mon.BoundAccount
var hashJoinerUnlimitedAllocator *colmem.Allocator
if useStreamingMemAccountForBuffering {
hashJoinerMemAccount = streamingMemAccount
hashJoinerUnlimitedAllocator = streamingAllocator
} else {
hashJoinerMemAccount = result.createMemAccountForSpillStrategy(
ctx, flowCtx, hashJoinerMemMonitorName,
)
hashJoinerUnlimitedAllocator = colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, hashJoinerMemMonitorName), factory,
)
}
hjSpec := colexec.MakeHashJoinerSpec(
core.HashJoiner.Type,
core.HashJoiner.LeftEqColumns,
core.HashJoiner.RightEqColumns,
leftTypes,
rightTypes,
core.HashJoiner.RightEqColumnsAreKey,
)
}
// It is valid for empty set of equality columns to be considered as
// "key" (for example, the input has at most 1 row). However, hash
// joiner, in order to handle NULL values correctly, needs to think
// that an empty set of equality columns doesn't form a key.
rightEqColsAreKey := core.HashJoiner.RightEqColumnsAreKey && len(core.HashJoiner.RightEqColumns) > 0
hjSpec := colexec.MakeHashJoinerSpec(
core.HashJoiner.Type,
core.HashJoiner.LeftEqColumns,
core.HashJoiner.RightEqColumns,
leftTypes,
rightTypes,
rightEqColsAreKey,
)

inMemoryHashJoiner := colexec.NewHashJoiner(
colmem.NewAllocator(ctx, hashJoinerMemAccount, factory),
hashJoinerUnlimitedAllocator, hjSpec, inputs[0], inputs[1],
colexec.HashJoinerInitialNumBuckets,
)
if args.TestingKnobs.DiskSpillingDisabled {
// We will not be creating a disk-backed hash joiner because
// we're running a test that explicitly asked for only in-memory
// hash joiner.
result.Op = inMemoryHashJoiner
} else {
diskAccount := result.createDiskAccount(ctx, flowCtx, hashJoinerMemMonitorName)
result.Op = colexec.NewTwoInputDiskSpiller(
inputs[0], inputs[1], inMemoryHashJoiner.(colexecbase.BufferingInMemoryOperator),
hashJoinerMemMonitorName,
func(inputOne, inputTwo colexecbase.Operator) colexecbase.Operator {
monitorNamePrefix := "external-hash-joiner"
unlimitedAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, monitorNamePrefix), factory,
)
ehj := colexec.NewExternalHashJoiner(
unlimitedAllocator,
flowCtx,
args,
hjSpec,
inputOne, inputTwo,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, monitorNamePrefix, factory),
diskAccount,
)
result.ToClose = append(result.ToClose, ehj.(colexecbase.Closer))
return ehj
},
args.TestingKnobs.SpillingCallbackFn,
inMemoryHashJoiner := colexec.NewHashJoiner(
colmem.NewAllocator(ctx, hashJoinerMemAccount, factory),
hashJoinerUnlimitedAllocator, hjSpec, inputs[0], inputs[1],
colexec.HashJoinerInitialNumBuckets,
)
if args.TestingKnobs.DiskSpillingDisabled {
// We will not be creating a disk-backed hash joiner because
// we're running a test that explicitly asked for only
// in-memory hash joiner.
result.Op = inMemoryHashJoiner
} else {
diskAccount := result.createDiskAccount(ctx, flowCtx, hashJoinerMemMonitorName)
result.Op = colexec.NewTwoInputDiskSpiller(
inputs[0], inputs[1], inMemoryHashJoiner.(colexecbase.BufferingInMemoryOperator),
hashJoinerMemMonitorName,
func(inputOne, inputTwo colexecbase.Operator) colexecbase.Operator {
monitorNamePrefix := "external-hash-joiner"
unlimitedAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, monitorNamePrefix), factory,
)
ehj := colexec.NewExternalHashJoiner(
unlimitedAllocator,
flowCtx,
args,
hjSpec,
inputOne, inputTwo,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, monitorNamePrefix, factory),
diskAccount,
)
result.ToClose = append(result.ToClose, ehj.(colexecbase.Closer))
return ehj
},
args.TestingKnobs.SpillingCallbackFn,
)
}
}
result.ColumnTypes = make([]*types.T, 0, len(leftTypes)+len(rightTypes))
if core.HashJoiner.Type.ShouldIncludeLeftColsInOutput() {
result.ColumnTypes = append(result.ColumnTypes, leftTypes...)
}
if core.HashJoiner.Type.ShouldIncludeRightColsInOutput() {
result.ColumnTypes = append(result.ColumnTypes, rightTypes...)
}

result.ColumnTypes = core.HashJoiner.Type.MakeOutputTypes(leftTypes, rightTypes)

if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type == descpb.InnerJoin {
if err = result.planAndMaybeWrapFilter(
Expand Down Expand Up @@ -1028,13 +1036,7 @@ func NewColOperator(

result.Op = mj
result.ToClose = append(result.ToClose, mj.(colexecbase.Closer))
result.ColumnTypes = make([]*types.T, 0, len(leftTypes)+len(rightTypes))
if core.MergeJoiner.Type.ShouldIncludeLeftColsInOutput() {
result.ColumnTypes = append(result.ColumnTypes, leftTypes...)
}
if core.MergeJoiner.Type.ShouldIncludeRightColsInOutput() {
result.ColumnTypes = append(result.ColumnTypes, rightTypes...)
}
result.ColumnTypes = core.MergeJoiner.Type.MakeOutputTypes(leftTypes, rightTypes)

if onExpr != nil {
if err = result.planAndMaybeWrapFilter(
Expand Down
Loading

0 comments on commit 80e6094

Please sign in to comment.