Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execinfra: add retries to outbox DialNoBreaker attempts #52624

Merged
merged 1 commit into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,8 @@ func TestOutboxStreamIDPropagation(t *testing.T) {
roachpb.NodeID(0),
execinfrapb.FlowID{UUID: uuid.MakeV4()},
outboxStreamID,
nil,
nil, /* cancelFn */
0, /* connectionTimeout */
)
outboxDone <- struct{}{}
}()
Expand Down
16 changes: 5 additions & 11 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ import (
"context"
"io"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/colserde"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"google.golang.org/grpc"
)

// flowStreamClient is a utility interface used to mock out the RPC layer.
Expand All @@ -39,13 +39,6 @@ type flowStreamClient interface {
CloseSend() error
}

// Dialer is used for dialing based on node IDs. It extracts out the single
// method that Outbox.Run needs from nodedialer.Dialer so that we can mock it
// in tests outside of this package.
type Dialer interface {
Dial(context.Context, roachpb.NodeID, rpc.ConnectionClass) (*grpc.ClientConn, error)
}

// Outbox is used to push data from local flows to a remote endpoint. Run may
// be called with the necessary information to establish a connection to a
// given remote endpoint.
Expand Down Expand Up @@ -130,19 +123,20 @@ func (o *Outbox) close(ctx context.Context) {
// Outbox goes through the same steps as 1).
func (o *Outbox) Run(
ctx context.Context,
dialer Dialer,
dialer execinfra.Dialer,
nodeID roachpb.NodeID,
flowID execinfrapb.FlowID,
streamID execinfrapb.StreamID,
cancelFn context.CancelFunc,
connectionTimeout time.Duration,
) {
o.runnerCtx = ctx
ctx = logtags.AddTag(ctx, "streamID", streamID)
log.VEventf(ctx, 2, "Outbox Dialing %s", nodeID)

var stream execinfrapb.DistSQL_FlowStreamClient
if err := func() error {
conn, err := dialer.Dial(ctx, nodeID, rpc.DefaultClass)
conn, err := execinfra.GetConnForOutbox(ctx, dialer, nodeID, connectionTimeout)
if err != nil {
log.Warningf(
ctx,
Expand Down
10 changes: 9 additions & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,15 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream(
// derive a separate child context for each outbox.
var outboxCancelFn context.CancelFunc
ctx, outboxCancelFn = context.WithCancel(ctx)
outbox.Run(ctx, s.nodeDialer, stream.TargetNodeID, s.flowID, stream.StreamID, outboxCancelFn)
outbox.Run(
ctx,
s.nodeDialer,
stream.TargetNodeID,
s.flowID,
stream.StreamID,
outboxCancelFn,
flowinfra.SettingFlowStreamTimeout.Get(&flowCtx.Cfg.Settings.SV),
)
currentOutboxes := atomic.AddInt32(&s.numOutboxes, -1)
// When the last Outbox on this node exits, we want to make sure that
// everything is shutdown; namely, we need to call cancelFn if:
Expand Down
10 changes: 9 additions & 1 deletion pkg/sql/colflow/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,15 @@ func TestVectorizedFlowShutdown(t *testing.T) {
require.NoError(t, err)
wg.Add(1)
go func(id int) {
outbox.Run(ctx, dialer, execinfra.StaticNodeID, flowID, execinfrapb.StreamID(id), cancelFn)
outbox.Run(
ctx,
dialer,
execinfra.StaticNodeID,
flowID,
execinfrapb.StreamID(id),
cancelFn,
0, /* connectionTimeout */
)
wg.Done()
}(id)

Expand Down
53 changes: 53 additions & 0 deletions pkg/sql/execinfra/outboxbase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package execinfra

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"google.golang.org/grpc"
)

// Dialer is used for dialing based on node IDs. It extracts out the single
// method that outboxes need from nodedialer.Dialer so that we can mock it
// in tests outside of this package.
type Dialer interface {
DialNoBreaker(context.Context, roachpb.NodeID, rpc.ConnectionClass) (*grpc.ClientConn, error)
}

// GetConnForOutbox is a shared function between the rowexec and colexec
// outboxes. It attempts to dial the destination ignoring the breaker, up to the
// given timeout and returns the connection or an error.
// This connection attempt is retried since failure results in a query error. In
// the past, we have seen cases where a gateway node, n1, would send a flow
// request to n2, but n2 would be unable to connect back to n1 due to this
// connection attempt failing.
// Retrying here alleviates these flakes and causes no impact to the end
// user, since the receiver at the other end will hang for
// SettingFlowStreamTimeout waiting for a successful connection attempt.
func GetConnForOutbox(
ctx context.Context, dialer Dialer, nodeID roachpb.NodeID, timeout time.Duration,
) (conn *grpc.ClientConn, err error) {
firstConnectionAttempt := timeutil.Now()
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
conn, err = dialer.DialNoBreaker(ctx, nodeID, rpc.DefaultClass)
if err == nil || timeutil.Since(firstConnectionAttempt) > timeout {
break
}
}
return
}
4 changes: 2 additions & 2 deletions pkg/sql/execinfrapb/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ type MockDialer struct {
}
}

// Dial establishes a grpc connection once.
func (d *MockDialer) Dial(
// DialNoBreaker establishes a grpc connection once.
func (d *MockDialer) DialNoBreaker(
context.Context, roachpb.NodeID, rpc.ConnectionClass,
) (*grpc.ClientConn, error) {
d.mu.Lock()
Expand Down
8 changes: 3 additions & 5 deletions pkg/sql/flowinfra/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand All @@ -29,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
opentracing "github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
)

const outboxBufRows = 16
Expand Down Expand Up @@ -221,9 +219,9 @@ func (m *Outbox) mainLoop(ctx context.Context) error {
}()

if m.stream == nil {
var conn *grpc.ClientConn
var err error
conn, err = m.flowCtx.Cfg.NodeDialer.DialNoBreaker(ctx, m.nodeID, rpc.DefaultClass)
conn, err := execinfra.GetConnForOutbox(
ctx, m.flowCtx.Cfg.NodeDialer, m.nodeID, SettingFlowStreamTimeout.Get(&m.flowCtx.Cfg.Settings.SV),
)
if err != nil {
// Log any Dial errors. This does not have a verbosity check due to being
// a critical part of query execution: if this step doesn't work, the
Expand Down