Skip to content

Commit

Permalink
distsql: base extraction for hash and merge join
Browse files Browse the repository at this point in the history
Pulls out the common structure shared by the hashJoiner and mergeJoiner
implementations, in a similar fashion to what was done with readerBase for
the table reader and join reader.
  • Loading branch information
irfansharif committed Nov 3, 2016
1 parent 7fbd396 commit d4b9d17
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 159 deletions.
123 changes: 38 additions & 85 deletions pkg/sql/distsql/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,41 @@ package distsql
import (
"sync"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// TODO(irfansharif): Document this.
// bucket holds the set of left-stream rows that share a given group key (the
// encoding of the equality columns specified by the join constraints).
// 'seen' records whether a row with the same group key was encountered in the
// opposite (right) stream while probing.
type bucket struct {
// rows are the left rows accumulated for this group key during the build
// phase.
rows sqlbase.EncDatumRows
// seen is set once a right row probes this bucket; buckets left unseen are
// rendered afterwards with a nil right row.
seen bool
}

// TODO(irfansharif): It's trivial to use the grace hash join algorithm by using
// hashrouters and hash joiners to parallelize hash joins. We would begin by
// 'partitioning' both tables via a hash function. Given the 'partitions' are
// formed by hashing on the join key any join output tuples must belong to the
// same 'partition', each 'partition' would then undergo the standard build and
// probe hash join algorithm, the computation of the partial joins is
// parallelizable.
type bucket struct {
rows sqlbase.EncDatumRows
seen bool
}

//
// HashJoiner performs hash join, it has two input streams and one output.
//
// It works by reading the entire left stream and putting it in a hash
// table. Thus, there is no guarantee on the ordering of results that stem only
// from the left input (in the case of LEFT OUTER, FULL OUTER). However, it is
// guaranteed that results that involve the right stream preserve the ordering;
// i.e. all results that stem from right row (i) precede results that stem from
// right row (i+1).
type hashJoiner struct {
left RowSource
right RowSource
output RowReceiver
ctx context.Context
joinType joinType
filter exprHelper
joinerBase

leftEqCols columns
rightEqCols columns
outputCols columns
buckets map[string]bucket

emptyRight sqlbase.EncDatumRow
emptyLeft sqlbase.EncDatumRow
combinedRow sqlbase.EncDatumRow
rowAlloc sqlbase.EncDatumRowAlloc
datumAlloc sqlbase.DatumAlloc
}

Expand All @@ -65,27 +63,13 @@ func newHashJoiner(
flowCtx *FlowCtx, spec *HashJoinerSpec, inputs []RowSource, output RowReceiver,
) (*hashJoiner, error) {
h := &hashJoiner{
left: inputs[0],
right: inputs[1],
output: output,
ctx: log.WithLogTag(flowCtx.Context, "Hash Joiner", nil),
leftEqCols: columns(spec.LeftEqColumns),
rightEqCols: columns(spec.RightEqColumns),
outputCols: columns(spec.OutputColumns),
joinType: joinType(spec.Type),
buckets: make(map[string]bucket),
emptyLeft: make(sqlbase.EncDatumRow, len(spec.LeftTypes)),
emptyRight: make(sqlbase.EncDatumRow, len(spec.RightTypes)),
}

for i := range h.emptyLeft {
h.emptyLeft[i].Datum = parser.DNull
}
for i := range h.emptyRight {
h.emptyRight[i].Datum = parser.DNull
}

err := h.filter.init(spec.Expr, append(spec.LeftTypes, spec.RightTypes...), flowCtx.evalCtx)
err := h.joinerBase.init(flowCtx, inputs, output, spec.OutputColumns,
spec.Type, spec.LeftTypes, spec.RightTypes, spec.Expr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -117,10 +101,13 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
}
}

// buildPhase constructs our internal hash map of rows seen. This is done
// entirely from the left stream, with the encoding/group key generated using
// the left equality columns.
func (h *hashJoiner) buildPhase() error {
var scratch []byte
for {
lrow, err := h.left.NextRow()
lrow, err := h.inputs[0].NextRow()
if err != nil || lrow == nil {
return err
}
Expand All @@ -129,19 +116,24 @@ func (h *hashJoiner) buildPhase() error {
if err != nil {
return err
}

b, _ := h.buckets[string(encoded)]
b.rows = append(b.rows, lrow)
h.buckets[string(encoded)] = b

scratch = encoded[:0]
}
return nil
}

// probePhase uses the hash map constructed from the left stream: for each row
// retrieved from the right stream we probe the map and, if matched, output the
// merging of the two rows. Outer joins behave as expected, e.g. for RIGHT
// OUTER joins, if no corresponding left row is seen, the right row is emitted
// merged with an all-DNull left row instead.
func (h *hashJoiner) probePhase() error {
var scratch []byte
for {
rrow, err := h.right.NextRow()
rrow, err := h.inputs[1].NextRow()
if err != nil {
return err
}
Expand All @@ -156,22 +148,22 @@ func (h *hashJoiner) probePhase() error {

b, ok := h.buckets[string(encoded)]
if !ok {
r, err := h.render(nil, rrow)
row, err := h.render(nil, rrow)
if err != nil {
return err
}
if !h.output.PushRow(r) {
if !h.output.PushRow(row) {
return nil
}
} else {
b.seen = true
h.buckets[string(encoded)] = b
for _, lrow := range b.rows {
r, err := h.render(lrow, rrow)
row, err := h.render(lrow, rrow)
if err != nil {
return err
}
if r != nil && !h.output.PushRow(r) {
if row != nil && !h.output.PushRow(row) {
return nil
}
}
Expand All @@ -186,11 +178,11 @@ func (h *hashJoiner) probePhase() error {
for _, b := range h.buckets {
if !b.seen {
for _, lrow := range b.rows {
r, err := h.render(lrow, nil)
row, err := h.render(lrow, nil)
if err != nil {
return err
}
if r != nil && !h.output.PushRow(r) {
if row != nil && !h.output.PushRow(row) {
return nil
}
}
Expand All @@ -214,42 +206,3 @@ func (h *hashJoiner) encode(
}
return appendTo, nil
}

// render merges a left and a right row into a single output row. Either side
// may be nil when there was no explicit "join" match; depending on the join
// type the missing side is replaced by an all-null row (outer joins) or the
// pair is dropped (inner join). The filter is evaluated on the concatenation
// of both rows, and only the columns listed in outputCols are copied into the
// returned row. A nil row with a nil error means "nothing to emit".
func (h *hashJoiner) render(lrow, rrow sqlbase.EncDatumRow) (sqlbase.EncDatumRow, error) {
	jt := h.joinType
	switch {
	case jt == innerJoin && (lrow == nil || rrow == nil):
		// Inner joins never emit unmatched rows.
		return nil, nil
	case jt == fullOuter && lrow == nil:
		lrow = h.emptyLeft
	case jt == fullOuter && rrow == nil:
		rrow = h.emptyRight
	case jt == leftOuter && rrow == nil:
		rrow = h.emptyRight
	case jt == rightOuter && lrow == nil:
		lrow = h.emptyLeft
	}

	// Reuse the scratch buffer: left columns first, then right columns.
	merged := append(h.combinedRow[:0], lrow...)
	merged = append(merged, rrow...)
	h.combinedRow = merged

	passed, err := h.filter.evalFilter(merged)
	if err != nil {
		return nil, err
	}
	if !passed {
		return nil, nil
	}

	out := h.rowAlloc.AllocRow(len(h.outputCols))
	for idx := range h.outputCols {
		out[idx] = merged[h.outputCols[idx]]
	}
	return out, nil
}
108 changes: 108 additions & 0 deletions pkg/sql/distsql/joinerbase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Irfan Sharif ([email protected])

package distsql

import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// joinerBase holds the state and helpers shared by the joiner processors
// (it is embedded by hashJoiner). It is populated by init and provides render
// for producing output rows.
type joinerBase struct {
// inputs are the row sources being joined; hashJoiner reads inputs[0] as the
// left stream and inputs[1] as the right stream.
inputs []RowSource
// output receives the rendered rows.
output RowReceiver
// ctx is the flow context annotated with the "Joiner" log tag.
ctx context.Context

// joinType determines how render treats a missing (nil) left or right row.
joinType joinType
// outputCols are indices into the combined (left ++ right) row selecting the
// columns copied into each output row.
outputCols columns
// filter is evaluated against the combined row; rows it rejects are dropped.
filter exprHelper
// rowAlloc allocates the output rows returned by render.
rowAlloc sqlbase.EncDatumRowAlloc
// emptyLeft is a row of DNull datums, one per left column type, substituted
// for a missing left row.
emptyLeft sqlbase.EncDatumRow
// emptyRight is a row of DNull datums, one per right column type,
// substituted for a missing right row.
emptyRight sqlbase.EncDatumRow
// combinedRow is a scratch buffer reused across render calls to hold the
// left row followed by the right row.
combinedRow sqlbase.EncDatumRow
}

// init populates the state shared by the joiner processors.
//
// It records the inputs and output, tags the flow context with "Joiner",
// stores the output column indices and join type, builds the all-DNull
// placeholder rows used for the missing side of outer joins (one datum per
// entry of leftTypes / rightTypes), and initializes the filter expression
// against the combined left-then-right column types. The error returned is
// whatever the filter initialization reports.
//
// Typical use:
//
//	err := jb.init(flowCtx, inputs, output, spec.OutputColumns,
//		spec.Type, spec.LeftTypes, spec.RightTypes, spec.Expr)
func (jb *joinerBase) init(
	flowCtx *FlowCtx,
	inputs []RowSource,
	output RowReceiver,
	outputCols []uint32,
	jType JoinType,
	leftTypes []sqlbase.ColumnType_Kind,
	rightTypes []sqlbase.ColumnType_Kind,
	expr Expression,
) error {
	jb.inputs = inputs
	jb.output = output
	jb.ctx = log.WithLogTag(flowCtx.Context, "Joiner", nil)
	jb.outputCols = columns(outputCols)
	jb.joinType = joinType(jType)

	jb.emptyLeft = make(sqlbase.EncDatumRow, len(leftTypes))
	for i := range jb.emptyLeft {
		jb.emptyLeft[i].Datum = parser.DNull
	}

	jb.emptyRight = make(sqlbase.EncDatumRow, len(rightTypes))
	for i := range jb.emptyRight {
		jb.emptyRight[i].Datum = parser.DNull
	}

	// Build the combined type list in a fresh slice. Calling
	// append(leftTypes, rightTypes...) directly would write into leftTypes'
	// backing array whenever it has spare capacity, silently modifying memory
	// owned by the caller (the processor spec).
	types := make([]sqlbase.ColumnType_Kind, 0, len(leftTypes)+len(rightTypes))
	types = append(types, leftTypes...)
	types = append(types, rightTypes...)

	return jb.filter.init(expr, types, flowCtx.evalCtx)
}

// render produces the output row for a (left, right) pair. A nil lrow or rrow
// signals that there was no explicit "join" match on that side: inner joins
// drop such pairs, while outer joins substitute the all-null placeholder row
// for the missing side where the join type allows it. The filter is then
// evaluated on the combined row (left columns followed by right columns), and
// if it passes, the columns named by outputCols are copied into a freshly
// allocated row. A nil row with a nil error means there is nothing to emit.
func (jb *joinerBase) render(lrow, rrow sqlbase.EncDatumRow) (sqlbase.EncDatumRow, error) {
	leftMissing, rightMissing := lrow == nil, rrow == nil

	switch jb.joinType {
	case innerJoin:
		if leftMissing || rightMissing {
			return nil, nil
		}
	case fullOuter:
		// Only one side is padded per call; the left side takes precedence.
		if leftMissing {
			lrow = jb.emptyLeft
		} else if rightMissing {
			rrow = jb.emptyRight
		}
	case leftOuter:
		if rightMissing {
			rrow = jb.emptyRight
		}
	case rightOuter:
		if leftMissing {
			lrow = jb.emptyLeft
		}
	}

	// Reuse the scratch buffer to hold left ++ right.
	jb.combinedRow = jb.combinedRow[:0]
	jb.combinedRow = append(jb.combinedRow, lrow...)
	jb.combinedRow = append(jb.combinedRow, rrow...)

	keep, err := jb.filter.evalFilter(jb.combinedRow)
	if err != nil {
		return nil, err
	}
	if !keep {
		return nil, nil
	}

	result := jb.rowAlloc.AllocRow(len(jb.outputCols))
	for pos := range jb.outputCols {
		result[pos] = jb.combinedRow[jb.outputCols[pos]]
	}
	return result, nil
}
Loading

0 comments on commit d4b9d17

Please sign in to comment.