Skip to content

Commit

Permalink
Merge #96161
Browse files Browse the repository at this point in the history
96161: ccl/sqlproxyccl: fix possible NPE within the connector r=JeffSwenson a=jaylim-crl

Related: https://github.com/cockroachlabs/support/issues/2040

Previously, an invariant is violated where it was possible to return err=nil when the infinite retry dial loop exits. When that happens, callers would attempt to read from the net.Conn object, which is nil, leading to a panic.

The invariant is violated whenever the context that was passed down to `dialTenantCluster` gets cancelled or expires. In particular, this can happen in two cases:
1. when the main stopper stops
2. when a connection migration process hits a timeout (of 15 seconds)

The first case is rare since this has to happen in concert with a transient failure to dial the SQL server.

Here's one example for the second case:
1. we block while dialing the SQL server
2. while we're waiting for (1), transfer hits a timeout, so context gets cancelled
3. (1) gets unblocked due to a timeout
4. err from (1) gets replaced with the error from ReportFailure
5. retry loop checks for context cancellation, and exits
6. we end up returning `nil, errors.Mark(nil, ctx.Err())` = `nil, nil`

The root cause of this issue is that the error from ReportFailure replaced the original error, and usually ReportFailure suceeds. This commit fixes that issue by not reusing the same error variable for ReportFailure.

Epic: none

Release note: None

Co-authored-by: Jay <[email protected]>
  • Loading branch information
craig[bot] and jaylim-crl committed Jan 31, 2023
2 parents 639decf + bdc365c commit ac65603
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 4 deletions.
17 changes: 13 additions & 4 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,20 @@ func (c *connector) dialTenantCluster(
// Report the failure to the directory cache so that it can
// refresh any stale information that may have caused the
// problem.
if err = reportFailureToDirectoryCache(
if reportErr := reportFailureToDirectoryCache(
ctx, c.TenantID, serverAssignment.Addr(), c.DirectoryCache,
); err != nil {
); reportErr != nil {
reportFailureErrs++
if reportFailureErr.ShouldLog() {
log.Ops.Errorf(ctx,
"report failure (%d errors skipped): %v",
reportFailureErrs,
err,
reportErr,
)
reportFailureErrs = 0
}
// nolint:errwrap
err = errors.Wrapf(err, "reporting failure: %s", reportErr.Error())
}
continue
}
Expand All @@ -275,7 +277,14 @@ func (c *connector) dialTenantCluster(
// a bounded number of times. In our case, since we retry infinitely, the
// only possibility is when ctx's Done channel is closed (which implies that
// ctx.Err() != nil).
//
if err == nil || ctx.Err() == nil {
// nolint:errwrap
return nil, errors.AssertionFailedf(
"unexpected retry loop exit, err=%v, ctxErr=%v",
err,
ctx.Err(),
)
}
// If the error is already marked, just return that.
if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) {
return nil, err
Expand Down
84 changes: 84 additions & 0 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,90 @@ func TestConnector_dialTenantCluster(t *testing.T) {
require.Nil(t, conn)
})

t.Run("context canceled after dial fails", func(t *testing.T) {
// This is a short test, and is expected to finish within ms.
ctx, cancel := context.WithTimeout(bgCtx, 2*time.Second)
defer cancel()

stopper := stop.NewStopper()
defer stopper.Stop(ctx)

c := &connector{
TenantID: roachpb.MustMakeTenantID(42),
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
}
dc := &testTenantDirectoryCache{}
c.DirectoryCache = dc
b, err := balancer.NewBalancer(
ctx,
stopper,
balancer.NewMetrics(),
c.DirectoryCache,
balancer.NoRebalanceLoop(),
)
require.NoError(t, err)
c.Balancer = b

var dialSQLServerCount int
c.testingKnobs.lookupAddr = func(ctx context.Context) (string, error) {
return "127.0.0.10:42", nil
}
c.testingKnobs.dialSQLServer = func(serverAssignment *balancer.ServerAssignment) (net.Conn, error) {
require.Equal(t, serverAssignment.Addr(), "127.0.0.10:42")
dialSQLServerCount++

// Cancel context to trigger loop exit on next retry.
cancel()
return nil, markAsRetriableConnectorError(errors.New("bar"))
}

var reportFailureFnCount int

// Invoke dial tenant with a success to ReportFailure.
// ---------------------------------------------------
dc.reportFailureFn = func(fnCtx context.Context, tenantID roachpb.TenantID, addr string) error {
reportFailureFnCount++
require.Equal(t, ctx, fnCtx)
require.Equal(t, c.TenantID, tenantID)
require.Equal(t, "127.0.0.10:42", addr)
return nil
}
conn, err := c.dialTenantCluster(ctx, nil /* requester */)
require.EqualError(t, err, "bar")
require.True(t, errors.Is(err, context.Canceled))
require.Nil(t, conn)

// Assert existing calls.
require.Equal(t, 1, dialSQLServerCount)
require.Equal(t, 1, reportFailureFnCount)
require.Equal(t, c.DialTenantLatency.TotalCount(), int64(1))
require.Equal(t, c.DialTenantRetries.Count(), int64(0))

// Invoke dial tenant with a failure to ReportFailure. Final error
// should include the secondary failure.
// ---------------------------------------------------------------
dc.reportFailureFn = func(fnCtx context.Context, tenantID roachpb.TenantID, addr string) error {
reportFailureFnCount++
require.Equal(t, ctx, fnCtx)
require.Equal(t, c.TenantID, tenantID)
require.Equal(t, "127.0.0.10:42", addr)
return errors.New("failure to report")
}
conn, err = c.dialTenantCluster(ctx, nil /* requester */)
require.EqualError(t, err, "reporting failure: failure to report: bar")
require.True(t, errors.Is(err, context.Canceled))
require.Nil(t, conn)

// Assert existing calls.
require.Equal(t, 2, dialSQLServerCount)
require.Equal(t, 2, reportFailureFnCount)
require.Equal(t, c.DialTenantLatency.TotalCount(), int64(2))
require.Equal(t, c.DialTenantRetries.Count(), int64(0))
})

t.Run("non-transient error", func(t *testing.T) {
// This is a short test, and is expected to finish within ms.
ctx, cancel := context.WithTimeout(bgCtx, 2*time.Second)
Expand Down

0 comments on commit ac65603

Please sign in to comment.