Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsqlrun: schedule failed streams on the gateway #17497

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ import (
"sync/atomic"
"unsafe"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -33,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -1176,14 +1174,7 @@ func (ds *DistSender) sendToReplicas(
// leader). If the original attempt merely timed out or was
// lost, then the batch will succeed and we can be assured the
// commit was applied just once.
//
// The Unavailable code is used by GRPC to indicate that a
// request fails fast and is not sent, so we can be sure there
// is no ambiguity on these errors. Note that these are common
// if a node is down.
// See https://github.com/grpc/grpc-go/blob/52f6504dc290bd928a8139ba94e3ab32ed9a6273/call.go#L182
// See https://github.com/grpc/grpc-go/blob/52f6504dc290bd928a8139ba94e3ab32ed9a6273/stream.go#L158
if haveCommit && grpc.Code(err) != codes.Unavailable {
if haveCommit && !netutil.ErrIsGRPCUnavailable(err) {
ambiguousError = err
}
log.ErrEventf(ctx, "RPC error: %s", err)
Expand Down
132 changes: 127 additions & 5 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package sql

import (
"strings"
"time"

"golang.org/x/net/context"

opentracing "github.com/opentracing/opentracing-go"
Expand All @@ -29,9 +32,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// poisonedFlowDefaultTimeout is the amount of time that a poisoned flow (a
// flow that will not actually be scheduled) lives in the FlowRegistry.
const poisonedFlowDefaultTimeout time.Duration = time.Second

// To allow queries to send out flow RPCs in parallel, we use a pool of workers
// that can issue the RPCs on behalf of the running code. The pool is shared by
// multiple queries.
Expand Down Expand Up @@ -128,6 +136,7 @@ func (dsp *distSQLPlanner) Run(

recv.resultToStreamColMap = plan.planToStreamColMap
thisNodeID := dsp.nodeDesc.NodeID
thisNodeAddr := planCtx.nodeAddresses[thisNodeID]

// DistSQL needs to initialize the Transaction proto before we put it in the
// FlowRequest's below. This is because we might not have used the txn do to
Expand All @@ -151,6 +160,10 @@ func (dsp *distSQLPlanner) Run(
// Skip this node.
continue
}

addFallbackToOutputStreams(&flowSpec, thisNodeAddr)
flows[nodeID] = flowSpec

req := &distsqlrun.SetupFlowRequest{
Version: distsqlrun.Version,
Txn: *txn.Proto(),
Expand All @@ -175,25 +188,63 @@ func (dsp *distSQLPlanner) Run(
}

var firstErr error
var flowsToRunLocally []roachpb.NodeID
anyScheduled := false
// Now wait for all the flows to be scheduled on remote nodes. Note that we
// are not waiting for the flows themselves to complete.
for i := 0; i < len(flows)-1; i++ {
res := <-resultChan
if firstErr == nil {
firstErr = res.err
if res.err != nil {
// Flows for which we've gotten an error will be run locally iff we know for
// sure that the server where we attempted to schedule them did not, in
// fact, schedule them (we don't want to schedule the same flow on two
// nodes, as that might corrupt results in cases some producers connect to
// one node and other to the other).
// Note that admission control errors are not recovered in this way - we
// don't want to overload the local node in case another node is overloaded.
// TODO(andrei, radu): this should be reconsidered when we integrate the
// local flow with local admission control.
if isVersionMismatchErr(res.err) || netutil.ErrIsGRPCUnavailable(res.err) {
// Flows that failed to schedule because of version mismatch will be run
// locally.
flowsToRunLocally = append(flowsToRunLocally, res.nodeID)
continue
}
if firstErr == nil {
firstErr = res.err
}
continue
}
// TODO(radu): accumulate the flows that we failed to set up and move them
// into the local flow.
anyScheduled = true
}
localFlow := flows[thisNodeID]
if firstErr != nil {
if anyScheduled {
// If any flows were scheduled, poison the local FlowRegistry so those
// remote flows are likely to catch an error soon.
dsp.distSQLSrv.PoisonFlow(localFlow.FlowID, poisonedFlowDefaultTimeout)
}
return firstErr
}

// Merge the flowsToRunLocally (if any) into the local flow.
if len(flowsToRunLocally) > 0 {
addressesToRewrite := make(map[string]struct{})
for _, failedNodeID := range flowsToRunLocally {
log.VEventf(ctx, 2, "scheduling locally flow for node: %d", failedNodeID)
// Copy over the processors to the local flow.
localFlow.Processors = append(localFlow.Processors, flows[failedNodeID].Processors...)
addressesToRewrite[planCtx.nodeAddresses[failedNodeID]] = struct{}{}
}
// Rewrite the streams that are now local.
rewriteStreamsToLocal(&localFlow, addressesToRewrite)
}

// Set up the flow on this node.
localReq := distsqlrun.SetupFlowRequest{
Version: distsqlrun.Version,
Txn: *txn.Proto(),
Flow: flows[thisNodeID],
Flow: localFlow,
EvalContext: evalCtxProto,
}
ctx, flow, err := dsp.distSQLSrv.SetupSyncFlow(ctx, &localReq, recv)
Expand All @@ -208,6 +259,77 @@ func (dsp *distSQLPlanner) Run(
return nil
}

// rewriteStreamToLocal rewrites all the streams in flow that are either inbound
// from or outbound to any node in addressesToRewrite (identified by the node's
// address) to local streams. This is called after the respective processors
// have been moved to run locally.
func rewriteStreamsToLocal(flow *distsqlrun.FlowSpec, addressesToRewrite map[string]struct{}) {
// Two steps:
// 1. Rewrite the outgoing endpoints, identifying them by
// addresses of failed nodes. Also collect all the stream IDs. These need
// collecting because only outgoing endpoints have addresses in the spec, but
// we also need to rewrite the incoming endpoints.
// 2. Rewrite the incoming endpoints.
streamIDs := make(map[distsqlrun.StreamID]struct{})
for i := range flow.Processors {
proc := &flow.Processors[i]
for j := range proc.Output {
output := &proc.Output[j]
for k := range output.Streams {
outputStream := &output.Streams[k]
if _, ok := addressesToRewrite[outputStream.TargetAddr]; ok {
outputStream.Type = distsqlrun.StreamEndpointSpec_LOCAL
outputStream.TargetAddr = ""
streamIDs[outputStream.StreamID] = struct{}{}
}
}
}
}

for i := range flow.Processors {
proc := &flow.Processors[i]
for j := range proc.Input {
input := &proc.Input[j]
for k := range input.Streams {
inputStream := &input.Streams[k]
if _, ok := streamIDs[inputStream.StreamID]; ok {
inputStream.Type = distsqlrun.StreamEndpointSpec_LOCAL
}
}
}
}
}

// addFallbackToOutputStreams fills in the FallbackAddr field of all the output
// streams in the flow to the specified gatewayAddr
func addFallbackToOutputStreams(flow *distsqlrun.FlowSpec, gatewayAddr string) {
for i := range flow.Processors {
proc := &flow.Processors[i]
for j := range proc.Output {
output := &proc.Output[j]
for k := range output.Streams {
outputStream := &output.Streams[k]
if outputStream.Type == distsqlrun.StreamEndpointSpec_REMOTE {
outputStream.FallbackAddr = &gatewayAddr
}
}
}
}
}

// isVersionMismatchErr checks whether the error is a DistSQL version mismatch
// error, indicating that a SetupFlow request failed because a remote node is
// incompatible.
func isVersionMismatchErr(err error) bool {
// We check both the error string and the error type. We'd like to check just
// the type, but 1.0 didn't have a typed error.
if strings.HasPrefix(err.Error(), distsqlrun.VersionMismatchErrorPrefix) {
return true
}
_, ok := err.(*distsqlrun.VersionMismatchError)
return ok
}

// distSQLReceiver is a RowReceiver that stores incoming rows in a RowContainer.
// This is where the DistSQL execution meets the SQL Session - the RowContainer
// comes from a client Session.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsqlrun/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions pkg/sql/distsqlrun/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,13 +627,17 @@ func NewError(err error) *Error {
Detail: &Error_RetryableTxnError{
RetryableTxnError: retryErr,
}}
} else {
// Anything unrecognized is an "internal error".
} else if versionMismatchError, ok := err.(*VersionMismatchError); ok {
return &Error{
Detail: &Error_PGError{
PGError: pgerror.NewError(
pgerror.CodeInternalError, err.Error())}}
Detail: &Error_VersionMismatchError{
VersionMismatchError: versionMismatchError,
}}
}
// Anything unrecognized is an "internal error".
return &Error{
Detail: &Error_PGError{
PGError: pgerror.NewError(
pgerror.CodeInternalError, err.Error())}}
}

// ErrorDetail returns the payload as a Go error.
Expand All @@ -646,6 +650,8 @@ func (e *Error) ErrorDetail() error {
return t.PGError
case *Error_RetryableTxnError:
return t.RetryableTxnError
case *Error_VersionMismatchError:
return t.VersionMismatchError
default:
panic(fmt.Sprintf("bad error detail: %+v", t))
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/distsqlrun/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package distsqlrun

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
)
Expand Down Expand Up @@ -49,3 +51,25 @@ func convertToSpecOrdering(columnOrdering sqlbase.ColumnOrdering) Ordering {
}
return specOrdering
}

// VersionMismatchErrorPrefix is a prefix of the VersionMismatchError's message.
// This can be used to check for this error even when it's coming from servers
// that were returning it as an untyped "internal error".
const VersionMismatchErrorPrefix = "version mismatch in flow request:"

// NewVersionMismatchError creates a new VersionMismatchError.
func NewVersionMismatchError(
requestedVersion uint32, serverMinVersion uint32, serverVersion uint32,
) error {
return &VersionMismatchError{
RequestedVersion: uint32(requestedVersion),
ServerMinVersion: uint32(serverMinVersion),
ServerVersion: uint32(serverVersion),
}
}

// Error implements the error interface.
func (e *VersionMismatchError) Error() string {
return fmt.Sprintf("%s %d; this node accepts %d through %d",
VersionMismatchErrorPrefix, e.RequestedVersion, e.ServerMinVersion, e.ServerVersion)
}
Loading