Skip to content

Commit

Permalink
sql: retry all DistSQL runner dial errors
Browse files Browse the repository at this point in the history
This commit marks the error that DistSQL runners produce when dialing
remote nodes in a special way that is now always retried-as-local. In
particular, this allows us to fix two problematic scenarios that could
occur when using secondary tenants:
- when attempting to start a pod with stale instance information
- when the port is in use by an RPC server for the same tenant, but with
a new instance ID.

This commit includes the test from Jeff that exposed the gap in the
retry-as-local mechanism.

Release note: None
  • Loading branch information
yuzefovich committed Aug 10, 2023
1 parent d819b9c commit 990dfdc
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
Expand Down
58 changes: 58 additions & 0 deletions pkg/ccl/serverccl/server_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"strings"
"testing"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -413,3 +415,59 @@ func TestSystemConfigWatcherCache(t *testing.T) {
defer leaktest.AfterTest(t)()
systemconfigwatchertest.TestSystemConfigWatcher(t, false /* skipSecondary */)
}

// TestStartTenantWithStaleInstance exercises tenant startup when the RPC
// address of a previous SQL server of the same tenant has been taken over by
// something that is not a SQL server. The sequence is:
//  1. a sql server starts up and is assigned port 'a';
//  2. the sql server shuts down and releases port 'a';
//  3. something else claims port 'a' - in this test, a plain listener. Using a
//     listener matters because it makes connections to 'a' hang rather than
//     being answered with a RESET packet;
//  4. a different server that still holds the stale instance information
//     schedules a distsql flow and attempts to dial port 'a'.
func TestStartTenantWithStaleInstance(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	hostArgs := base.TestServerArgs{
		DefaultTestTenant: base.TestControlsTenantsExplicitly,
	}
	s := serverutils.StartServerOnly(t, hostArgs)
	defer s.Stopper().Stop(ctx)

	// staleRPCAddr brings up a tenant on its own stopper, records the RPC
	// address it was given, and tears the tenant down again before returning,
	// so that the returned address is free to be claimed by someone else.
	staleRPCAddr := func() string {
		tenantStopper := stop.NewStopper()
		defer tenantStopper.Stop(ctx)
		tenant, tenantDB := serverutils.StartTenant(t, s, base.TestTenantArgs{
			Stopper:  tenantStopper,
			TenantID: serverutils.TestTenantID(),
		})
		defer tenantDB.Close()
		return tenant.RPCAddr()
	}

	// Under stress, net.Listen can occasionally fail with "address already in
	// use" because the stopped tenant hasn't released the socket yet. Each
	// retry starts (and stops) a fresh tenant and tries to grab its address.
	var listener net.Listener
	testutils.SucceedsSoon(t, func() (err error) {
		listener, err = net.Listen("tcp", staleRPCAddr())
		return err
	})
	defer func() {
		_ = listener.Close()
	}()

	// Start the same tenant again while the listener squats on the old
	// address.
	restartArgs := base.TestTenantArgs{
		TenantID: serverutils.TestTenantID(),
	}
	_, db := serverutils.StartTenant(t, s, restartArgs)
	defer func() {
		_ = db.Close()
	}()

	// Any table will do here: the query only serves to confirm that the
	// tenant is healthy.
	_, err := db.Exec("SELECT count(*) FROM system.sqlliveness")
	require.NoError(t, err)
}
23 changes: 22 additions & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ type runnerResult struct {
err error
}

// runnerDialErr is a marker wrapper around an error. Its only purpose is to
// tag the wrapped error with a distinct Go type so that it can later be
// recognized by type; it adds no information to the error message.
type runnerDialErr struct {
	// err is the underlying error being marked.
	err error
}

// Error implements the error interface. The message is exactly the message of
// the wrapped error.
func (e *runnerDialErr) Error() string {
	return e.err.Error()
}

// Cause returns the wrapped error. It is kept for error libraries that follow
// the legacy "causer" convention.
func (e *runnerDialErr) Cause() error {
	return e.err
}

// Unwrap returns the wrapped error, making the wrapper transparent to the
// standard library's errors.Is, errors.As and errors.Unwrap in addition to
// Cause-based unwrapping.
func (e *runnerDialErr) Unwrap() error {
	return e.err
}

func isDialErr(err error) bool {
return errors.HasType(err, (*runnerDialErr)(nil))
}

// run executes the request. An error, if encountered, is both sent on the
// result channel and returned.
func (req runnerRequest) run() error {
Expand All @@ -111,6 +127,9 @@ func (req runnerRequest) run() error {

conn, err := req.podNodeDialer.Dial(req.ctx, roachpb.NodeID(req.sqlInstanceID), rpc.DefaultClass)
if err != nil {
// Mark this error as special runnerDialErr so that we could retry this
// distributed query as local.
err = &runnerDialErr{err: err}
res.err = err
return err
}
Expand Down Expand Up @@ -1979,7 +1998,9 @@ func (dsp *DistSQLPlanner) PlanAndRun(
// cancellation has already occurred.
return
}
if !pgerror.IsSQLRetryableError(distributedErr) && !flowinfra.IsFlowRetryableError(distributedErr) {
if !pgerror.IsSQLRetryableError(distributedErr) &&
!flowinfra.IsFlowRetryableError(distributedErr) &&
!isDialErr(distributedErr) {
// Only re-run the query if we think there is a high chance of a
// successful local execution.
return
Expand Down

0 comments on commit 990dfdc

Please sign in to comment.