From e72c5f63503c10f83ee43874b136f910af316722 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Fri, 10 Jul 2020 22:14:55 -0400 Subject: [PATCH] sql: wip fix hashjoiner error prop Release note: None --- pkg/sql/rowexec/hashjoiner.go | 37 +++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index 4cf8f238dc8d..51f7b7d9c960 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -60,6 +60,9 @@ const ( // other side. This only happens when executing a FULL OUTER, LEFT/RIGHT // OUTER and ANTI joins (depending on which side we store). hjEmittingUnmatched + + hjStateForwardRight + hjStateForwardLeft ) // hashJoiner performs a hash join. There is no guarantee on the output @@ -223,6 +226,27 @@ func (h *hashJoiner) Start(ctx context.Context) context.Context { return ctx } +func (h *hashJoiner) forward( + side joinSide, +) (hashJoinerState, sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) { + row, meta, _, err := h.receiveNext(side) + if err != nil { + h.MoveToDraining(err) + return hjStateUnknown, nil, h.DrainHelper() + } else if meta != nil { + if meta.Err != nil { + h.MoveToDraining(nil) + return hjStateUnknown, nil, meta + } + return h.runningState, nil, meta + } + if row == nil { + h.MoveToDraining(err) + return hjStateUnknown, nil, h.DrainHelper() + } + return h.runningState, nil, nil +} + // Next is part of the RowSource interface. func (h *hashJoiner) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) { for h.State == execinfra.StateRunning { @@ -239,6 +263,10 @@ func (h *hashJoiner) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) h.runningState, row, meta = h.probeRow() case hjEmittingUnmatched: h.runningState, row, meta = h.emitUnmatched() + case hjStateForwardRight: + h.runningState, row, meta = h.forward(rightSide) + case hjStateForwardLeft: + h.runningState, row, meta = h.forward(leftSide) default: log.Fatalf(h.Ctx, "unsupported state: %d", h.runningState) } @@ -317,8 +345,13 @@ func (h *hashJoiner) build() (hashJoinerState, sqlbase.EncDatumRow, *execinfrapb (h.joinType == sqlbase.InnerJoin || (h.joinType == sqlbase.LeftOuterJoin && side == leftSide) || (h.joinType == sqlbase.RightOuterJoin && side == rightSide)) { - h.MoveToDraining(nil /* err */) - return hjStateUnknown, nil, h.DrainHelper() + if side == leftSide { + return hjStateForwardRight, nil, nil + } + return hjStateForwardLeft, nil, nil + //h.MoveToDraining(nil /* err */) + //log.Warningf(h.Ctx, "hash joiner short circuiting into draining %p", h) + //return hjStateUnknown, nil, h.DrainHelper() } // We could skip hjConsumingStoredSide and move straight to // hjReadingProbeSide apart from the fact that hjConsumingStoredSide