diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go
index d8762f53b634..297a818e6a28 100644
--- a/pkg/rpc/nodedialer/nodedialer.go
+++ b/pkg/rpc/nodedialer/nodedialer.go
@@ -144,11 +144,19 @@ func (n *Dialer) DialInternalClient(
 	if err != nil {
 		return nil, nil, err
 	}
-	// TODO(bdarnell): Reconcile the different health checks and circuit
-	// breaker behavior in this file
+	// Check whether the connection is in the transient failure state. This can
+	// happen if the connection already existed, but a recent heartbeat has
+	// failed and we haven't yet torn down the connection.
 	if err := grpcutil.ConnectionReady(conn); err != nil {
 		return nil, nil, err
 	}
+	// TODO(bdarnell): Reconcile the different health checks and circuit breaker
+	// behavior in this file. Note that this divergence causes problems for
+	// higher levels in the system. For example, DistSQL checks ConnHealth when
+	// scheduling processors, but can then see attempts to send RPCs fail when
+	// dialing fails due to an open breaker. Reset the breaker here as a
+	// stop-gap before the reconciliation occurs.
+	n.getBreaker(nodeID).Reset()
 	return ctx, roachpb.NewInternalClient(conn), nil
 }
 
@@ -159,6 +167,9 @@ func (n *Dialer) ConnHealth(nodeID roachpb.NodeID) error {
 	if n == nil || n.resolver == nil {
 		return errors.New("no node dialer configured")
 	}
+	if !n.getBreaker(nodeID).Ready() {
+		return circuit.ErrBreakerOpen
+	}
 	addr, err := n.resolver(nodeID)
 	if err != nil {
 		return err
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 6c8f5a9f9f20..972676a3ae04 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -614,7 +614,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		s.gossip,
 		s.stopper,
 		s.nodeLiveness,
-		sqlExecutorTestingKnobs.DistSQLPlannerKnobs,
 		s.nodeDialer,
 	),
 
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 50ca59294fba..a9b3ec9cf2b1 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -74,11 +74,9 @@ type DistSQLPlanner struct {
 	st *cluster.Settings
 	// The node descriptor for the gateway node that initiated this query.
 	nodeDesc roachpb.NodeDescriptor
-	rpcContext *rpc.Context
 	stopper *stop.Stopper
 	distSQLSrv *distsqlrun.ServerImpl
 	spanResolver distsqlplan.SpanResolver
-	testingKnobs DistSQLPlannerTestingKnobs
 
 	// metadataTestTolerance is the minimum level required to plan metadata test
 	// processors.
@@ -90,10 +88,12 @@ type DistSQLPlanner struct {
 
 	// gossip handle used to check node version compatibility.
 	gossip *gossip.Gossip
-	// liveness is used to avoid planning on down nodes.
-	liveness *storage.NodeLiveness
 
 	nodeDialer *nodedialer.Dialer
+
+	// nodeHealth encapsulates the various node health checks to avoid planning
+	// on unhealthy nodes.
+	nodeHealth distSQLNodeHealth
 }
 
 const resolverPolicy = distsqlplan.BinPackingLeaseHolderChoice
 
@@ -135,26 +135,36 @@ func NewDistSQLPlanner(
 	gossip *gossip.Gossip,
 	stopper *stop.Stopper,
 	liveness *storage.NodeLiveness,
-	testingKnobs DistSQLPlannerTestingKnobs,
 	nodeDialer *nodedialer.Dialer,
) *DistSQLPlanner {
 	if liveness == nil {
 		panic("must specify liveness")
 	}
 	dsp := &DistSQLPlanner{
-		planVersion: planVersion,
-		st: st,
-		nodeDesc: nodeDesc,
-		rpcContext: rpcCtx,
-		stopper: stopper,
-		distSQLSrv: distSQLSrv,
-		gossip: gossip,
-		spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
-		liveness: liveness,
-		testingKnobs: testingKnobs,
+		planVersion: planVersion,
+		st: st,
+		nodeDesc: nodeDesc,
+		stopper: stopper,
+		distSQLSrv: distSQLSrv,
+		spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
+		gossip: gossip,
+		nodeDialer: nodeDialer,
+		nodeHealth: distSQLNodeHealth{
+			gossip: gossip,
+			connHealth: nodeDialer.ConnHealth,
+		},
 		metadataTestTolerance: distsqlrun.NoExplain,
-		nodeDialer: nodeDialer,
 	}
+	// NB: not all tests populate a NodeLiveness. Everything using the
+	// proper constructor NewDistSQLPlanner will, though.
+	if liveness != nil {
+		dsp.nodeHealth.isLive = liveness.IsLive
+	} else {
+		dsp.nodeHealth.isLive = func(_ roachpb.NodeID) (bool, error) {
+			return true, nil
+		}
+	}
+
 	dsp.initRunners()
 	return dsp
 }
@@ -591,40 +601,13 @@ type SpanPartition struct {
 	Spans roachpb.Spans
 }
 
-func (dsp *DistSQLPlanner) checkNodeHealth(
-	ctx context.Context, nodeID roachpb.NodeID, addr string,
-) error {
-	// NB: not all tests populate a NodeLiveness. Everything using the
-	// proper constructor NewDistSQLPlanner will, though.
-	isLive := func(_ roachpb.NodeID) (bool, error) {
-		return true, nil
-	}
-	if dsp.liveness != nil {
-		isLive = dsp.liveness.IsLive
-	}
-	return checkNodeHealth(ctx, nodeID, addr, dsp.testingKnobs, dsp.gossip, dsp.rpcContext.ConnHealth, isLive)
+type distSQLNodeHealth struct {
+	gossip *gossip.Gossip
+	connHealth func(roachpb.NodeID) error
+	isLive func(roachpb.NodeID) (bool, error)
 }
 
-func checkNodeHealth(
-	ctx context.Context,
-	nodeID roachpb.NodeID,
-	addr string,
-	knobs DistSQLPlannerTestingKnobs,
-	g *gossip.Gossip,
-	connHealth func(string) error,
-	isLive func(roachpb.NodeID) (bool, error),
-) error {
-	// Check if the target's node descriptor is gossiped. If it isn't, the node
-	// is definitely gone and has been for a while.
-	//
-	// TODO(tschottdorf): it's not clear that this adds anything to the liveness
-	// check below. The node descriptor TTL is an hour as of 03/2018.
-	if _, err := g.GetNodeIDAddress(nodeID); err != nil {
-		log.VEventf(ctx, 1, "not using n%d because gossip doesn't know about it. "+
-			"It might have gone away from the cluster. Gossip said: %s.", nodeID, err)
-		return err
-	}
-
+func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) error {
 	{
 		// NB: as of #22658, ConnHealth does not work as expected; see the
 		// comment within. We still keep this code for now because in
@@ -633,13 +616,7 @@ func checkNodeHealth(
 		// artifact of rpcContext's reconnection mechanism at the time of
 		// writing). This is better than having it used in 100% of cases
 		// (until the liveness check below kicks in).
-		var err error
-		if knobs.OverrideHealthCheck != nil {
-			err = knobs.OverrideHealthCheck(nodeID, addr)
-		} else {
-			err = connHealth(addr)
-		}
-
+		err := h.connHealth(nodeID)
 		if err != nil && err != rpc.ErrNotHeartbeated {
 			// This host is known to be unhealthy. Don't use it (use the gateway
 			// instead). Note: this can never happen for our nodeID (which
@@ -649,7 +626,7 @@ func checkNodeHealth(
 		}
 	}
 	{
-		live, err := isLive(nodeID)
+		live, err := h.isLive(nodeID)
 		if err == nil && !live {
 			err = errors.New("node is not live")
 		}
@@ -660,7 +637,7 @@ func checkNodeHealth(
 
 	// Check that the node is not draining.
 	drainingInfo := &distsqlrun.DistSQLDrainingInfo{}
-	if err := g.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
+	if err := h.gossip.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
 		// Because draining info has no expiration, an error
 		// implies that we have not yet received a node's
 		// draining information. Since this information is
@@ -757,7 +734,7 @@ func (dsp *DistSQLPlanner) PartitionSpans(
 			addr, inAddrMap := planCtx.NodeAddresses[nodeID]
 			if !inAddrMap {
 				addr = replInfo.NodeDesc.Address.String()
-				if err := dsp.checkNodeHealth(ctx, nodeID, addr); err != nil {
+				if err := dsp.nodeHealth.check(ctx, nodeID); err != nil {
 					addr = ""
 				}
 				if err == nil && addr != "" {
@@ -1028,15 +1005,14 @@ func (dsp *DistSQLPlanner) CheckNodeHealthAndVersion(
 	planCtx *PlanningCtx, desc *roachpb.NodeDescriptor,
) error {
 	nodeID := desc.NodeID
-	addr := desc.Address.String()
 	var err error
-	if err = dsp.checkNodeHealth(planCtx.ctx, nodeID, addr); err != nil {
+	if err = dsp.nodeHealth.check(planCtx.ctx, nodeID); err != nil {
 		err = errors.New("unhealthy")
 	} else if !dsp.nodeVersionIsCompatible(nodeID, dsp.planVersion) {
 		err = errors.New("incompatible version")
 	} else {
-		planCtx.NodeAddresses[nodeID] = addr
+		planCtx.NodeAddresses[nodeID] = desc.Address.String()
 	}
 	return err
 }
diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go
index 487c3ccb6cf8..7bcbca9061c0 100644
--- a/pkg/sql/distsql_physical_planner_test.go
+++ b/pkg/sql/distsql_physical_planner_test.go
@@ -774,8 +774,9 @@ func TestPartitionSpans(t *testing.T) {
 			stopper: stopper,
 			spanResolver: tsp,
 			gossip: mockGossip,
-			testingKnobs: DistSQLPlannerTestingKnobs{
-				OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
+			nodeHealth: distSQLNodeHealth{
+				gossip: mockGossip,
+				connHealth: func(node roachpb.NodeID) error {
 					for _, n := range tc.deadNodes {
 						if int(node) == n {
 							return fmt.Errorf("test node is unhealthy")
@@ -783,6 +784,9 @@ func TestPartitionSpans(t *testing.T) {
 					}
 					return nil
 				},
+				isLive: func(nodeID roachpb.NodeID) (bool, error) {
+					return true, nil
+				},
 			},
 		}
 
@@ -954,11 +958,15 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
 		stopper: stopper,
 		spanResolver: tsp,
 		gossip: mockGossip,
-		testingKnobs: DistSQLPlannerTestingKnobs{
-			OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
+		nodeHealth: distSQLNodeHealth{
+			gossip: mockGossip,
+			connHealth: func(roachpb.NodeID) error {
 				// All the nodes are healthy.
 				return nil
 			},
+			isLive: func(roachpb.NodeID) (bool, error) {
+				return true, nil
+			},
 		},
 	}
 
@@ -1045,10 +1053,14 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
 		stopper: stopper,
 		spanResolver: tsp,
 		gossip: mockGossip,
-		testingKnobs: DistSQLPlannerTestingKnobs{
-			OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
-				// All the nodes are healthy.
-				return nil
+		nodeHealth: distSQLNodeHealth{
+			gossip: mockGossip,
+			connHealth: func(node roachpb.NodeID) error {
+				_, err := mockGossip.GetNodeIDAddress(node)
+				return err
+			},
+			isLive: func(roachpb.NodeID) (bool, error) {
+				return true, nil
 			},
 		},
 	}
 
@@ -1119,10 +1131,10 @@ func TestCheckNodeHealth(t *testing.T) {
 		return true, nil
 	}
 
-	connHealthy := func(string) error {
+	connHealthy := func(roachpb.NodeID) error {
 		return nil
 	}
-	connUnhealthy := func(string) error {
+	connUnhealthy := func(roachpb.NodeID) error {
 		return errors.New("injected conn health error")
 	}
 	_ = connUnhealthy
@@ -1138,18 +1150,19 @@ func TestCheckNodeHealth(t *testing.T) {
 
 	for _, test := range livenessTests {
 		t.Run("liveness", func(t *testing.T) {
-			if err := checkNodeHealth(
-				context.Background(), nodeID, desc.Address.AddressField,
-				DistSQLPlannerTestingKnobs{}, /* knobs */
-				mockGossip, connHealthy, test.isLive,
-			); !testutils.IsError(err, test.exp) {
+			h := distSQLNodeHealth{
+				gossip: mockGossip,
+				connHealth: connHealthy,
+				isLive: test.isLive,
+			}
+			if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
 				t.Fatalf("expected %v, got %v", test.exp, err)
 			}
 		})
 	}
 
 	connHealthTests := []struct {
-		connHealth func(string) error
+		connHealth func(roachpb.NodeID) error
 		exp string
 	}{
 		{connHealthy, ""},
@@ -1158,14 +1171,14 @@ func TestCheckNodeHealth(t *testing.T) {
 
 	for _, test := range connHealthTests {
 		t.Run("connHealth", func(t *testing.T) {
-			if err := checkNodeHealth(
-				context.Background(), nodeID, desc.Address.AddressField,
-				DistSQLPlannerTestingKnobs{}, /* knobs */
-				mockGossip, test.connHealth, live,
-			); !testutils.IsError(err, test.exp) {
+			h := distSQLNodeHealth{
+				gossip: mockGossip,
+				connHealth: test.connHealth,
+				isLive: live,
+			}
+			if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
 				t.Fatalf("expected %v, got %v", test.exp, err)
 			}
 		})
 	}
-
 }
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 54a42744236a..75d7b1f72369 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -37,7 +37,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/kv"
-	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
@@ -361,9 +360,6 @@ type ExecutorTestingKnobs struct {
 	// execution so there'll be nothing left to abort by the time the filter runs.
 	DisableAutoCommit bool
 
-	// DistSQLPlannerKnobs are testing knobs for DistSQLPlanner.
-	DistSQLPlannerKnobs DistSQLPlannerTestingKnobs
-
 	// BeforeAutoCommit is called when the Executor is about to commit the KV
 	// transaction after running a statement in an implicit transaction, allowing
 	// tests to inject errors into that commit.
@@ -378,14 +374,6 @@ type ExecutorTestingKnobs struct {
 	BeforeAutoCommit func(ctx context.Context, stmt string) error
 }
 
-// DistSQLPlannerTestingKnobs is used to control internals of the DistSQLPlanner
-// for testing purposes.
-type DistSQLPlannerTestingKnobs struct {
-	// If OverrideSQLHealthCheck is set, we use this callback to get the health of
-	// a node.
-	OverrideHealthCheck func(node roachpb.NodeID, addrString string) error
-}
-
 // databaseCacheHolder is a thread-safe container for a *databaseCache.
 // It also allows clients to block until the cache is updated to a desired
 // state.
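
A note on the breaker stop-gap in the nodedialer change above: the patch gates `ConnHealth` on the breaker being ready and resets the breaker after a successful dial, so the health check DistSQL consults and the dial path that RPCs actually take cannot disagree for long about the same node. Below is a minimal, self-contained Go sketch of that interplay. The `breaker`, `dialer`, and `errBreakerOpen` names are hypothetical stand-ins invented for illustration, not the circuit package or rpc types used by the patch, and the breaker itself is reduced to a thread-safe boolean.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// breaker is a hypothetical stand-in for the rpc circuit breaker: it is
// "ready" unless the last dial attempt failed (or until it is reset).
type breaker struct {
	mu     sync.Mutex
	broken bool
}

func (b *breaker) Ready() bool { b.mu.Lock(); defer b.mu.Unlock(); return !b.broken }
func (b *breaker) Trip()       { b.mu.Lock(); defer b.mu.Unlock(); b.broken = true }
func (b *breaker) Reset()      { b.mu.Lock(); defer b.mu.Unlock(); b.broken = false }

var errBreakerOpen = errors.New("breaker open")

type dialer struct {
	breakers map[int]*breaker
	connect  func(node int) error // underlying dial, injected for the sketch
}

func (d *dialer) getBreaker(node int) *breaker {
	if d.breakers[node] == nil {
		d.breakers[node] = &breaker{}
	}
	return d.breakers[node]
}

// ConnHealth mirrors the change above: an open breaker short-circuits the
// health check, so planning skips nodes we already know we cannot dial.
func (d *dialer) ConnHealth(node int) error {
	if !d.getBreaker(node).Ready() {
		return errBreakerOpen
	}
	return nil
}

// Dial mirrors DialInternalClient: on success, reset the breaker as a
// stop-gap so the health check and the dial path agree about the node.
func (d *dialer) Dial(node int) error {
	br := d.getBreaker(node)
	if err := d.connect(node); err != nil {
		br.Trip()
		return err
	}
	br.Reset()
	return nil
}

func main() {
	down := true
	d := &dialer{
		breakers: map[int]*breaker{},
		connect: func(node int) error {
			if down {
				return fmt.Errorf("n%d unreachable", node)
			}
			return nil
		},
	}
	fmt.Println("dial while down:", d.Dial(1))    // trips the breaker
	fmt.Println("health check:", d.ConnHealth(1)) // breaker open: skip node
	down = false
	fmt.Println("dial after recovery:", d.Dial(1)) // succeeds, resets breaker
	fmt.Println("health check:", d.ConnHealth(1))  // healthy again
}
```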
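The other half of the patch replaces `DistSQLPlannerTestingKnobs.OverrideHealthCheck` with the `distSQLNodeHealth` struct, whose dependencies are plain function fields; tests inject fakes directly, as the updated `TestPartitionSpans` cases do. Here is a sketch of that shape under deliberately simplified assumptions: integer node IDs, no gossip or draining checks, and no tolerance for a not-yet-heartbeated connection (which the real `check` method does allow).

```go
package main

import (
	"errors"
	"fmt"
)

// nodeHealth mirrors the shape of distSQLNodeHealth: each dependency is a
// function field, so no separate testing-knobs struct is needed.
type nodeHealth struct {
	connHealth func(nodeID int) error
	isLive     func(nodeID int) (bool, error)
}

// check runs the connection health check, then the liveness check, in the
// same order as the patch; the first failure wins.
func (h *nodeHealth) check(nodeID int) error {
	if err := h.connHealth(nodeID); err != nil {
		return err
	}
	live, err := h.isLive(nodeID)
	if err != nil {
		return err
	}
	if !live {
		return errors.New("node is not live")
	}
	return nil
}

func main() {
	// Production wiring would pass real implementations (e.g. the dialer's
	// ConnHealth and liveness.IsLive); a test injects fakes instead.
	h := nodeHealth{
		connHealth: func(nodeID int) error {
			if nodeID == 2 {
				return errors.New("test node is unhealthy")
			}
			return nil
		},
		isLive: func(int) (bool, error) { return true, nil },
	}
	for _, n := range []int{1, 2} {
		fmt.Printf("n%d: %v\n", n, h.check(n))
	}
}
```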