diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index 22cad5c56d5a..d924f85a92e1 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -642,7 +642,8 @@ func TestOutboxStreamIDPropagation(t *testing.T) { roachpb.NodeID(0), execinfrapb.FlowID{UUID: uuid.MakeV4()}, outboxStreamID, - nil, + nil, /* cancelFn */ + 0, /* connectionTimeout */ ) outboxDone <- struct{}{} }() diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 85d13ba946c7..ec4acab0d34f 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -15,21 +15,21 @@ import ( "context" "io" "sync/atomic" + "time" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/colserde" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" - "google.golang.org/grpc" ) // flowStreamClient is a utility interface used to mock out the RPC layer. @@ -39,13 +39,6 @@ type flowStreamClient interface { CloseSend() error } -// Dialer is used for dialing based on node IDs. It extracts out the single -// method that Outbox.Run needs from nodedialer.Dialer so that we can mock it -// in tests outside of this package. -type Dialer interface { - Dial(context.Context, roachpb.NodeID, rpc.ConnectionClass) (*grpc.ClientConn, error) -} - // Outbox is used to push data from local flows to a remote endpoint. 
Run may // be called with the necessary information to establish a connection to a // given remote endpoint. @@ -130,11 +123,12 @@ func (o *Outbox) close(ctx context.Context) { // Outbox goes through the same steps as 1). func (o *Outbox) Run( ctx context.Context, - dialer Dialer, + dialer execinfra.Dialer, nodeID roachpb.NodeID, flowID execinfrapb.FlowID, streamID execinfrapb.StreamID, cancelFn context.CancelFunc, + connectionTimeout time.Duration, ) { o.runnerCtx = ctx ctx = logtags.AddTag(ctx, "streamID", streamID) @@ -142,7 +136,7 @@ func (o *Outbox) Run( var stream execinfrapb.DistSQL_FlowStreamClient if err := func() error { - conn, err := dialer.Dial(ctx, nodeID, rpc.DefaultClass) + conn, err := execinfra.GetConnForOutbox(ctx, dialer, nodeID, connectionTimeout) if err != nil { log.Warningf( ctx, diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 52d824164f63..fa71db13ad23 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -586,7 +586,15 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream( // derive a separate child context for each outbox. 
var outboxCancelFn context.CancelFunc ctx, outboxCancelFn = context.WithCancel(ctx) - outbox.Run(ctx, s.nodeDialer, stream.TargetNodeID, s.flowID, stream.StreamID, outboxCancelFn) + outbox.Run( + ctx, + s.nodeDialer, + stream.TargetNodeID, + s.flowID, + stream.StreamID, + outboxCancelFn, + flowinfra.SettingFlowStreamTimeout.Get(&flowCtx.Cfg.Settings.SV), + ) currentOutboxes := atomic.AddInt32(&s.numOutboxes, -1) // When the last Outbox on this node exits, we want to make sure that // everything is shutdown; namely, we need to call cancelFn if: diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index 6fe06540055e..e167e1e15169 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -258,7 +258,15 @@ func TestVectorizedFlowShutdown(t *testing.T) { require.NoError(t, err) wg.Add(1) go func(id int) { - outbox.Run(ctx, dialer, execinfra.StaticNodeID, flowID, execinfrapb.StreamID(id), cancelFn) + outbox.Run( + ctx, + dialer, + execinfra.StaticNodeID, + flowID, + execinfrapb.StreamID(id), + cancelFn, + 0, /* connectionTimeout */ + ) wg.Done() }(id) diff --git a/pkg/sql/execinfra/outboxbase.go b/pkg/sql/execinfra/outboxbase.go new file mode 100644 index 000000000000..d0ce09c6626f --- /dev/null +++ b/pkg/sql/execinfra/outboxbase.go @@ -0,0 +1,53 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package execinfra + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "google.golang.org/grpc" +) + +// Dialer is used for dialing based on node IDs. It extracts out the single +// method that outboxes need from nodedialer.Dialer so that we can mock it +// in tests outside of this package. +type Dialer interface { + DialNoBreaker(context.Context, roachpb.NodeID, rpc.ConnectionClass) (*grpc.ClientConn, error) +} + +// GetConnForOutbox is a shared function between the rowexec and colexec +// outboxes. It dials the destination ignoring the breaker, retrying failed +// attempts until timeout has elapsed, and returns the connection or an error. +// This connection attempt is retried since failure results in a query error. In +// the past, we have seen cases where a gateway node, n1, would send a flow +// request to n2, but n2 would be unable to connect back to n1 due to this +// connection attempt failing. +// Retrying here alleviates these flakes and causes no impact to the end +// user, since the receiver at the other end will hang for +// SettingFlowStreamTimeout waiting for a successful connection attempt. 
+func GetConnForOutbox( + ctx context.Context, dialer Dialer, nodeID roachpb.NodeID, timeout time.Duration, +) (conn *grpc.ClientConn, err error) { + firstConnectionAttempt := timeutil.Now() + for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { + conn, err = dialer.DialNoBreaker(ctx, nodeID, rpc.DefaultClass) + if err == nil || timeutil.Since(firstConnectionAttempt) > timeout { + break + } + } + return +} diff --git a/pkg/sql/execinfrapb/testutils.go b/pkg/sql/execinfrapb/testutils.go index ec2504119bc1..eda740d7cb4a 100644 --- a/pkg/sql/execinfrapb/testutils.go +++ b/pkg/sql/execinfrapb/testutils.go @@ -134,8 +134,8 @@ type MockDialer struct { } } -// Dial establishes a grpc connection once. -func (d *MockDialer) Dial( +// DialNoBreaker establishes a grpc connection once. +func (d *MockDialer) DialNoBreaker( context.Context, roachpb.NodeID, rpc.ConnectionClass, ) (*grpc.ClientConn, error) { d.mu.Lock() diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index b92af740271b..1525082dd8da 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -18,7 +18,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -29,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" opentracing "github.com/opentracing/opentracing-go" - "google.golang.org/grpc" ) const outboxBufRows = 16 @@ -221,9 +219,9 @@ func (m *Outbox) mainLoop(ctx context.Context) error { }() if m.stream == nil { - var conn *grpc.ClientConn - var err error - conn, err = m.flowCtx.Cfg.NodeDialer.DialNoBreaker(ctx, m.nodeID, rpc.DefaultClass) + conn, err := execinfra.GetConnForOutbox( + ctx, m.flowCtx.Cfg.NodeDialer, m.nodeID, 
SettingFlowStreamTimeout.Get(&m.flowCtx.Cfg.Settings.SV), + ) if err != nil { // Log any Dial errors. This does not have a verbosity check due to being // a critical part of query execution: if this step doesn't work, the