sql: drain instead of hard shutdown in DistSQLReceiver.Push
Previously, when the DistSQLReceiver encountered an error or received
enough rows to satisfy its consumer, it would transition to
`ConsumerClosed` status. I believe this is incorrect behavior: in
almost all cases we must transition to draining instead (we already had
several TODOs to do that). The only exception is when we're canceled
(indicated by an error on the context).

My reasoning is that there are certain types of metadata (like
LeafTxnFinalState) that the gateway must receive for correctness,
yet that metadata is only collected in the draining state. If we go
straight from `NeedMoreRows` to `ConsumerClosed`, we never get a chance
to collect that metadata.
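
To make the reasoning above concrete, here is a minimal, self-contained sketch. It is a toy model, not the actual execinfra code: `ConsumerStatus` is a simplified stand-in for `execinfra.ConsumerStatus`, and `runProducer` and `limitConsumer` are hypothetical helpers invented for illustration. It assumes a producer that sends its trailing metadata only when it finishes normally or is asked to drain.

```go
package main

import "fmt"

// ConsumerStatus is a simplified stand-in for execinfra.ConsumerStatus.
type ConsumerStatus int

const (
	NeedMoreRows ConsumerStatus = iota
	DrainRequested
	ConsumerClosed
)

// runProducer is a hypothetical producer loop. It pushes rows while the
// consumer keeps asking for more and returns the trailing metadata it was
// able to send afterwards.
func runProducer(rows int, push func(row int) ConsumerStatus) []string {
	status := NeedMoreRows
	for i := 0; i < rows && status == NeedMoreRows; i++ {
		status = push(i)
	}
	if status == ConsumerClosed {
		// Hard shutdown: the producer stops immediately and sends nothing.
		return nil
	}
	// Either all rows were sent or a drain was requested, so the trailing
	// metadata still reaches the consumer.
	return []string{"LeafTxnFinalState"}
}

// limitConsumer is a hypothetical consumer that accepts `limit` rows and
// then reports the given status.
func limitConsumer(limit int, onLimit ConsumerStatus) func(int) ConsumerStatus {
	seen := 0
	return func(int) ConsumerStatus {
		seen++
		if seen >= limit {
			return onLimit
		}
		return NeedMoreRows
	}
}

func main() {
	// Old behavior in this model: closing outright loses the metadata.
	fmt.Println(runProducer(10, limitConsumer(2, ConsumerClosed))) // []
	// New behavior in this model: draining lets the metadata through.
	fmt.Println(runProducer(10, limitConsumer(2, DrainRequested))) // [LeafTxnFinalState]
}
```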

As a concrete example, consider the way we implemented portals with
limits: some SELECT query reads some rows and, once the portal limit is
satisfied, an error is returned from `AddRow`. Currently this causes
us to perform a hard shutdown of the flow, and we end up not collecting
the spans to refresh. I believe that is no bueno.

This commit fixes this problem by transitioning to draining in all cases
except when the context has an error.

Release note: None
yuzefovich committed Apr 2, 2021
1 parent eff4eb2 commit 7bc94ea
Showing 1 changed file with 25 additions and 16 deletions.
41 changes: 25 additions & 16 deletions pkg/sql/distsql_running.go
@@ -594,6 +594,20 @@ func (r *DistSQLReceiver) SetError(err error) {
r.resultWriter.SetError(err)
}

// checkError checks whether the resultWriter has an error set and updates the
// status if so. The status is also returned for convenience.
func (r *DistSQLReceiver) checkError() execinfra.ConsumerStatus {
if r.resultWriter.Err() != nil {
// If we encountered an error, we drain unless we were canceled.
if r.ctx.Err() != nil {
r.status = execinfra.ConsumerClosed
} else {
r.status = execinfra.DrainRequested
}
}
return r.status
}

// Push is part of the RowReceiver interface.
func (r *DistSQLReceiver) Push(
row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata,
@@ -675,15 +689,12 @@ func (r *DistSQLReceiver) Push(
}
// Release the meta object. It is unsafe for use after this call.
meta.Release()
return r.status
return r.checkError()
}
if r.resultWriter.Err() == nil && r.ctx.Err() != nil {
r.resultWriter.SetError(r.ctx.Err())
}
if r.resultWriter.Err() != nil {
// TODO(andrei): We should drain here if we weren't canceled.
return execinfra.ConsumerClosed
}
r.checkError()
if r.status != execinfra.NeedMoreRows {
return r.status
}
@@ -706,7 +717,7 @@ func (r *DistSQLReceiver) Push(
// planNodeToRowSource is not set up to handle decoding the row.
if r.noColsRequired {
r.row = []tree.Datum{}
r.status = execinfra.ConsumerClosed
r.status = execinfra.DrainRequested
} else {
if r.row == nil {
r.row = make(tree.Datums, len(row))
@@ -715,18 +726,19 @@ func (r *DistSQLReceiver) Push(
err := encDatum.EnsureDecoded(r.outputTypes[i], &r.alloc)
if err != nil {
r.resultWriter.SetError(err)
r.status = execinfra.ConsumerClosed
return r.status
return r.checkError()
}
r.row[i] = encDatum.Datum
}
}
r.tracing.TraceExecRowsResult(r.ctx, r.row)
// Note that AddRow accounts for the memory used by the Datums.
if commErr := r.resultWriter.AddRow(r.ctx, r.row); commErr != nil {
// ErrLimitedResultClosed is not a real error, it is a
// signal to stop distsql and return success to the client.
if !errors.Is(commErr, ErrLimitedResultClosed) {
if errors.Is(commErr, ErrLimitedResultClosed) {
// ErrLimitedResultClosed is not a real error, it is a signal to
// stop distsql and return success to the client (that's why we
// don't set the error on the resultWriter).
r.status = execinfra.DrainRequested
} else {
// Set the error on the resultWriter too, for the convenience of some of the
// clients. If clients don't care to differentiate between communication
// errors and query execution errors, they can simply inspect
@@ -745,11 +757,8 @@ func (r *DistSQLReceiver) Push(
if !errors.Is(commErr, ErrLimitedResultNotSupported) {
r.commErr = commErr
}
r.checkError()
}
// TODO(andrei): We should drain here. Metadata from this query would be
// useful, particularly as it was likely a large query (since AddRow()
// above failed, presumably with an out-of-memory error).
r.status = execinfra.ConsumerClosed
}
return r.status
}