ccl/sqlproxyccl: fix possible NPE within the connector
Previously, an invariant could be violated: the infinite retry dial loop could
exit while returning err=nil. When that happened, callers would attempt to
read from the returned net.Conn object, which is nil, leading to a panic.
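
To make the failure mode concrete, here is a minimal, self-contained sketch
(a hypothetical caller shape, not the actual proxy code) of why a `(nil, nil)`
return panics:

```go
package main

import "net"

// dialTenantCluster is a hypothetical stand-in for the buggy method: the
// retry loop exits on context cancellation and returns (nil, nil).
func dialTenantCluster() (net.Conn, error) {
	return nil, nil
}

func main() {
	conn, err := dialTenantCluster()
	if err != nil {
		return // callers only guard against non-nil errors
	}
	buf := make([]byte, 1)
	// conn is a nil interface value, so this call panics with a
	// nil-pointer dereference.
	conn.Read(buf)
}
```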

The invariant is violated whenever the context that was passed down to
`dialTenantCluster` gets cancelled or expires. In particular, this can happen
in two cases:
1. when the main stopper stops
2. when a connection migration process hits a timeout (of 15 seconds)

The first case is rare since this has to happen in concert with a transient
failure to dial the SQL server.

Here's one example of the second case:
1. we block while dialing the SQL server
2. while we're waiting for (1), the transfer hits its timeout, so the context
   gets cancelled
3. (1) gets unblocked due to a timeout
4. the err from (1) gets overwritten with the result of ReportFailure, which
   is usually nil
5. the retry loop checks for context cancellation, and exits
6. we end up returning `nil, errors.Mark(nil, ctx.Err())` = `nil, nil` (see
   the sketch below)
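
Step 6 relies on a property of the cockroachdb/errors package: marking a nil
error yields nil. A quick standalone check:

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/errors"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // ctx.Err() is now context.Canceled

	// Marking a nil error returns nil, so the dial loop's
	// `return nil, errors.Mark(nil, ctx.Err())` yields (nil, nil).
	fmt.Println(errors.Mark(nil, ctx.Err()) == nil) // true
}
```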

The root cause of this issue is that the result of ReportFailure replaced the
original dial error, and ReportFailure usually succeeds. This commit fixes the
issue by not reusing the same error variable for ReportFailure.
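
For reference, here is a distilled sketch of the buggy shape, with
hypothetical helpers standing in for the real dial and directory-cache calls
(this is not the actual connector code):

```go
package main

import (
	"context"
	"fmt"
	"net"

	"github.com/cockroachdb/errors"
)

var cancel context.CancelFunc

func dial() (net.Conn, error) {
	cancel()                      // step 2: the transfer timeout fires mid-dial
	return nil, errors.New("bar") // step 3: the dial unblocks with an error
}

func reportFailure() error { return nil } // step 4: reporting usually succeeds

func dialLoop(ctx context.Context) (net.Conn, error) {
	var err error
	for ctx.Err() == nil { // simplified stand-in for the retry loop
		var conn net.Conn
		if conn, err = dial(); err != nil {
			// The bug: err is reused for the report result, so a successful
			// report erases the dial error before the loop re-checks ctx.
			if err = reportFailure(); err != nil {
				fmt.Println("report failure:", err)
			}
			continue // step 5: the loop condition now sees ctx.Err() != nil
		}
		return conn, nil
	}
	// Step 6: err is nil here, and marking a nil error keeps it nil.
	return nil, errors.Mark(err, ctx.Err())
}

func main() {
	var ctx context.Context
	ctx, cancel = context.WithCancel(context.Background())
	conn, err := dialLoop(ctx)
	fmt.Println(conn == nil, err == nil) // true true
}
```

Giving the report result its own variable, as the fix below does with
`reportErr`, keeps the dial error intact so it can be wrapped instead of
replaced.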

Epic: none

Release note: None
jaylim-crl committed Jan 30, 2023
1 parent 69dd453 commit c58c5e5
Showing 2 changed files with 95 additions and 4 deletions.
15 changes: 11 additions & 4 deletions pkg/ccl/sqlproxyccl/connector.go
@@ -251,18 +251,19 @@ func (c *connector) dialTenantCluster(
 			// Report the failure to the directory cache so that it can
 			// refresh any stale information that may have caused the
 			// problem.
-			if err = reportFailureToDirectoryCache(
+			if reportErr := reportFailureToDirectoryCache(
 				ctx, c.TenantID, serverAssignment.Addr(), c.DirectoryCache,
-			); err != nil {
+			); reportErr != nil {
 				reportFailureErrs++
 				if reportFailureErr.ShouldLog() {
 					log.Ops.Errorf(ctx,
 						"report failure (%d errors skipped): %v",
 						reportFailureErrs,
-						err,
+						reportErr,
 					)
 					reportFailureErrs = 0
 				}
+				err = errors.Wrap(err, reportErr.Error())
 			}
 			continue
 		}
@@ -275,7 +276,13 @@ func (c *connector) dialTenantCluster(
 	// a bounded number of times. In our case, since we retry infinitely, the
 	// only possibility is when ctx's Done channel is closed (which implies that
 	// ctx.Err() != nil).
-	//
+	if err == nil || ctx.Err() == nil {
+		return nil, errors.AssertionFailedf(
+			"unexpected retry loop exit, err=%v, ctxErr=%v",
+			err,
+			ctx.Err(),
+		)
+	}
 	// If the error is already marked, just return that.
 	if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) {
 		return nil, err
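
As a side note, the new `errors.Wrap(err, reportErr.Error())` keeps the dial
error in the chain while prefixing the report error's message, and the
`errors.Mark(err, ctx.Err())` call further down (referenced in the commit
message but outside this hunk) is what lets callers detect cancellation. A
standalone sketch of those two cockroachdb/errors behaviors:

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/errors"
)

func main() {
	dialErr := errors.New("bar")

	// Wrap prefixes a message but keeps the dial error in the chain,
	// so the original cause is not lost.
	err := errors.Wrap(dialErr, "failure to report")
	fmt.Println(err)                     // failure to report: bar
	fmt.Println(errors.Is(err, dialErr)) // true

	// Marking with the context error lets callers detect cancellation
	// via errors.Is without changing the message.
	err = errors.Mark(err, context.Canceled)
	fmt.Println(errors.Is(err, context.Canceled)) // true
}
```
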
84 changes: 84 additions & 0 deletions pkg/ccl/sqlproxyccl/connector_test.go
@@ -370,6 +370,90 @@ func TestConnector_dialTenantCluster(t *testing.T) {
 		require.Nil(t, conn)
 	})
 
+	t.Run("context canceled after dial fails", func(t *testing.T) {
+		// This is a short test, and is expected to finish within ms.
+		ctx, cancel := context.WithTimeout(bgCtx, 2*time.Second)
+		defer cancel()
+
+		stopper := stop.NewStopper()
+		defer stopper.Stop(ctx)
+
+		c := &connector{
+			TenantID: roachpb.MustMakeTenantID(42),
+			DialTenantLatency: metric.NewHistogram(
+				metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
+			),
+			DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
+		}
+		dc := &testTenantDirectoryCache{}
+		c.DirectoryCache = dc
+		b, err := balancer.NewBalancer(
+			ctx,
+			stopper,
+			balancer.NewMetrics(),
+			c.DirectoryCache,
+			balancer.NoRebalanceLoop(),
+		)
+		require.NoError(t, err)
+		c.Balancer = b
+
+		var dialSQLServerCount int
+		c.testingKnobs.lookupAddr = func(ctx context.Context) (string, error) {
+			return "127.0.0.10:42", nil
+		}
+		c.testingKnobs.dialSQLServer = func(serverAssignment *balancer.ServerAssignment) (net.Conn, error) {
+			require.Equal(t, serverAssignment.Addr(), "127.0.0.10:42")
+			dialSQLServerCount++
+
+			// Cancel the context to trigger the loop exit on the next retry.
+			cancel()
+			return nil, markAsRetriableConnectorError(errors.New("bar"))
+		}
+
+		var reportFailureFnCount int
+
+		// Invoke dialTenantCluster with a ReportFailure that succeeds.
+		// ------------------------------------------------------------
+		dc.reportFailureFn = func(fnCtx context.Context, tenantID roachpb.TenantID, addr string) error {
+			reportFailureFnCount++
+			require.Equal(t, ctx, fnCtx)
+			require.Equal(t, c.TenantID, tenantID)
+			require.Equal(t, "127.0.0.10:42", addr)
+			return nil
+		}
+		conn, err := c.dialTenantCluster(ctx, nil /* requester */)
+		require.EqualError(t, err, "bar")
+		require.True(t, errors.Is(err, context.Canceled))
+		require.Nil(t, conn)
+
+		// Assert the expected calls.
+		require.Equal(t, 1, dialSQLServerCount)
+		require.Equal(t, 1, reportFailureFnCount)
+		require.Equal(t, c.DialTenantLatency.TotalCount(), int64(1))
+		require.Equal(t, c.DialTenantRetries.Count(), int64(0))
+
+		// Invoke dialTenantCluster with a ReportFailure that fails. The
+		// final error should include the secondary failure.
+		// --------------------------------------------------------------
+		dc.reportFailureFn = func(fnCtx context.Context, tenantID roachpb.TenantID, addr string) error {
+			reportFailureFnCount++
+			require.Equal(t, ctx, fnCtx)
+			require.Equal(t, c.TenantID, tenantID)
+			require.Equal(t, "127.0.0.10:42", addr)
+			return errors.New("failure to report")
+		}
+		conn, err = c.dialTenantCluster(ctx, nil /* requester */)
+		require.EqualError(t, err, "failure to report: bar")
+		require.True(t, errors.Is(err, context.Canceled))
+		require.Nil(t, conn)
+
+		// Assert the expected calls.
+		require.Equal(t, 2, dialSQLServerCount)
+		require.Equal(t, 2, reportFailureFnCount)
+		require.Equal(t, c.DialTenantLatency.TotalCount(), int64(2))
+		require.Equal(t, c.DialTenantRetries.Count(), int64(0))
+	})
+
 	t.Run("non-transient error", func(t *testing.T) {
 		// This is a short test, and is expected to finish within ms.
 		ctx, cancel := context.WithTimeout(bgCtx, 2*time.Second)
