From 2e634d780b16b68e9746af039928f476b2808c86 Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Mon, 1 Oct 2018 09:44:35 -0400
Subject: [PATCH 1/2] rpc/nodedialer: reset conn breaker after successful connection

`Dialer.DialInternalClient` does not check the circuit breaker but
blindly attempts a connection. The attempt can succeed, leaving the
system in a state where there is a healthy connection to a node while
the circuit breaker used for dialing that node is open.

DistSQL checks connection health when scheduling processors, but that
health check does not examine the breaker. DistSQL will therefore
schedule a processor on a node and then be unable to use the connection
to that node, because `Dialer.Dial` returns a `breaker open` error.

The code contains a TODO to reconcile the handling of circuit breakers
across the various `Dialer` methods, but changing that handling is risky
in the short term. As a stop-gap, we reset the breaker after a
connection is successfully opened.

Fixes #29149

Release note: None
---
 pkg/rpc/nodedialer/nodedialer.go | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go
index d8762f53b634..931953e3fd77 100644
--- a/pkg/rpc/nodedialer/nodedialer.go
+++ b/pkg/rpc/nodedialer/nodedialer.go
@@ -144,8 +144,13 @@ func (n *Dialer) DialInternalClient(
 	if err != nil {
 		return nil, nil, err
 	}
-	// TODO(bdarnell): Reconcile the different health checks and circuit
-	// breaker behavior in this file
+	// TODO(bdarnell): Reconcile the different health checks and circuit breaker
+	// behavior in this file. Note that this different behavior causes problems
+	// for higher levels in the system. For example, DistSQL checks for
+	// ConnHealth when scheduling processors, but can then see attempts to send
+	// RPCs fail when dial fails due to an open breaker. Reset the breaker here
+	// as a stop-gap before the reconciliation occurs.
+	n.getBreaker(nodeID).Reset()
 	if err := grpcutil.ConnectionReady(conn); err != nil {
 		return nil, nil, err
 	}
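A note on the stop-gap above: resetting the breaker whenever a dial that
bypassed it succeeds keeps the breaker's state from drifting away from the
actual connection state. The following is a minimal, self-contained sketch of
the pattern; the `breaker` type and `dial` callbacks are toy stand-ins, not
the `circuit` package the real code uses.

    package main

    import (
        "errors"
        "fmt"
    )

    // breaker is a toy circuit breaker guarding dials to a single node.
    type breaker struct{ tripped bool }

    func (b *breaker) Ready() bool { return !b.tripped }
    func (b *breaker) Trip()       { b.tripped = true }
    func (b *breaker) Reset()      { b.tripped = false }

    var errBreakerOpen = errors.New("breaker open")

    // dialChecked is the normal path: it consults the breaker before dialing.
    func dialChecked(b *breaker, dial func() error) error {
        if !b.Ready() {
            return errBreakerOpen
        }
        if err := dial(); err != nil {
            b.Trip()
            return err
        }
        return nil
    }

    // dialUnchecked mirrors DialInternalClient: it ignores the breaker, but on
    // success resets it so the breaker again reflects reality.
    func dialUnchecked(b *breaker, dial func() error) error {
        if err := dial(); err != nil {
            return err
        }
        b.Reset() // stop-gap: a live connection implies the node is reachable
        return nil
    }

    func main() {
        b := &breaker{tripped: true} // left open by an earlier failure
        ok := func() error { return nil }

        fmt.Println(dialChecked(b, ok))   // breaker open: fails without dialing
        fmt.Println(dialUnchecked(b, ok)) // succeeds and resets the breaker
        fmt.Println(dialChecked(b, ok))   // now succeeds too
    }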
From 569aa8e3ebf498b847aa27d1414f8c71fd203132 Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Wed, 3 Oct 2018 21:41:07 -0400
Subject: [PATCH 2/2] sql: consider conn circuit breakers in distsql planning

Change `DistSQLPlanner.checkNodeHealth` so that it uses
`nodedialer.Dialer.ConnHealth` instead of `rpc.Context.ConnHealth`. The
former is the correct method for checking a node's connection health.

Refactor `DistSQLPlanner.checkNodeHealth` into a `distSQLNodeHealth`
struct. This removes the need for `DistSQLPlannerTestingKnobs`.

Enhance `nodedialer.Dialer.ConnHealth` to report a connection as
unhealthy if the node's circuit breaker is open. This prevents DistSQL
from planning processors on such nodes.

Release note: None
---
 pkg/rpc/nodedialer/nodedialer.go         | 12 ++-
 pkg/server/server.go                     |  1 -
 pkg/sql/distsql_physical_planner.go      | 98 +++++++++---------------
 pkg/sql/distsql_physical_planner_test.go | 57 ++++++++------
 pkg/sql/exec_util.go                     | 12 ---
 5 files changed, 81 insertions(+), 99 deletions(-)

diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go
index 931953e3fd77..297a818e6a28 100644
--- a/pkg/rpc/nodedialer/nodedialer.go
+++ b/pkg/rpc/nodedialer/nodedialer.go
@@ -144,6 +144,12 @@ func (n *Dialer) DialInternalClient(
 	if err != nil {
 		return nil, nil, err
 	}
+	// Check to see if the connection is in the transient failure state. This can
+	// happen if the connection already existed, but a recent heartbeat has
+	// failed and we haven't yet torn down the connection.
+	if err := grpcutil.ConnectionReady(conn); err != nil {
+		return nil, nil, err
+	}
 	// TODO(bdarnell): Reconcile the different health checks and circuit breaker
 	// behavior in this file. Note that this different behavior causes problems
 	// for higher levels in the system. For example, DistSQL checks for
@@ -151,9 +157,6 @@ func (n *Dialer) DialInternalClient(
 	// RPCs fail when dial fails due to an open breaker. Reset the breaker here
 	// as a stop-gap before the reconciliation occurs.
 	n.getBreaker(nodeID).Reset()
-	if err := grpcutil.ConnectionReady(conn); err != nil {
-		return nil, nil, err
-	}
 	return ctx, roachpb.NewInternalClient(conn), nil
 }
 
@@ -164,6 +167,9 @@ func (n *Dialer) ConnHealth(nodeID roachpb.NodeID) error {
 	if n == nil || n.resolver == nil {
 		return errors.New("no node dialer configured")
 	}
+	if !n.getBreaker(nodeID).Ready() {
+		return circuit.ErrBreakerOpen
+	}
 	addr, err := n.resolver(nodeID)
 	if err != nil {
 		return err
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 6c8f5a9f9f20..972676a3ae04 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -614,7 +614,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 			s.gossip,
 			s.stopper,
 			s.nodeLiveness,
-			sqlExecutorTestingKnobs.DistSQLPlannerKnobs,
 			s.nodeDialer,
 		),
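The ordering in the new `ConnHealth` matters: the breaker is consulted before
the resolver, so a node whose breaker is tripped reports unhealthy immediately
and DistSQL planning never considers it. A rough, self-contained sketch of
that fast-fail ordering (`breakerReady` and `resolve` are illustrative
stand-ins, not the real nodedialer API):

    package main

    import (
        "errors"
        "fmt"
    )

    var errBreakerOpen = errors.New("breaker open")

    // connHealth reports node health for planning purposes. The breaker
    // readiness check runs first so a tripped breaker fails fast, before any
    // resolver-level work.
    func connHealth(nodeID int, breakerReady func(int) bool, resolve func(int) (string, error)) error {
        if !breakerReady(nodeID) {
            return errBreakerOpen
        }
        if _, err := resolve(nodeID); err != nil {
            return err // unknown address: also unhealthy
        }
        return nil // healthy as far as planning is concerned
    }

    func main() {
        ready := func(id int) bool { return id != 2 } // n2 has a tripped breaker
        resolve := func(id int) (string, error) {
            if id == 3 {
                return "", errors.New("unknown node") // n3 has no address
            }
            return fmt.Sprintf("10.0.0.%d:26257", id), nil
        }
        for _, id := range []int{1, 2, 3} {
            fmt.Printf("n%d: %v\n", id, connHealth(id, ready, resolve))
        }
    }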
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 50ca59294fba..a9b3ec9cf2b1 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -74,11 +74,9 @@ type DistSQLPlanner struct {
 	st *cluster.Settings
 	// The node descriptor for the gateway node that initiated this query.
 	nodeDesc roachpb.NodeDescriptor
-	rpcContext   *rpc.Context
 	stopper      *stop.Stopper
 	distSQLSrv   *distsqlrun.ServerImpl
 	spanResolver distsqlplan.SpanResolver
-	testingKnobs DistSQLPlannerTestingKnobs
 
 	// metadataTestTolerance is the minimum level required to plan metadata test
 	// processors.
@@ -90,10 +88,12 @@ type DistSQLPlanner struct {
 	// gossip handle used to check node version compatibility.
 	gossip *gossip.Gossip
 
-	// liveness is used to avoid planning on down nodes.
-	liveness *storage.NodeLiveness
 
 	nodeDialer *nodedialer.Dialer
+
+	// nodeHealth encapsulates the various node health checks to avoid planning
+	// on unhealthy nodes.
+	nodeHealth distSQLNodeHealth
 }
 
 const resolverPolicy = distsqlplan.BinPackingLeaseHolderChoice
@@ -135,26 +135,36 @@ func NewDistSQLPlanner(
 	gossip *gossip.Gossip,
 	stopper *stop.Stopper,
 	liveness *storage.NodeLiveness,
-	testingKnobs DistSQLPlannerTestingKnobs,
 	nodeDialer *nodedialer.Dialer,
 ) *DistSQLPlanner {
 	if liveness == nil {
 		panic("must specify liveness")
 	}
 	dsp := &DistSQLPlanner{
-		planVersion:  planVersion,
-		st:           st,
-		nodeDesc:     nodeDesc,
-		rpcContext:   rpcCtx,
-		stopper:      stopper,
-		distSQLSrv:   distSQLSrv,
-		gossip:       gossip,
-		spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
-		liveness:     liveness,
-		testingKnobs: testingKnobs,
+		planVersion:  planVersion,
+		st:           st,
+		nodeDesc:     nodeDesc,
+		stopper:      stopper,
+		distSQLSrv:   distSQLSrv,
+		spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
+		gossip:       gossip,
+		nodeDialer:   nodeDialer,
+		nodeHealth: distSQLNodeHealth{
+			gossip:     gossip,
+			connHealth: nodeDialer.ConnHealth,
+		},
 		metadataTestTolerance: distsqlrun.NoExplain,
-		nodeDialer:            nodeDialer,
 	}
+	// NB: not all tests populate a NodeLiveness. Everything using the
+	// proper constructor NewDistSQLPlanner will, though.
+	if liveness != nil {
+		dsp.nodeHealth.isLive = liveness.IsLive
+	} else {
+		dsp.nodeHealth.isLive = func(_ roachpb.NodeID) (bool, error) {
+			return true, nil
+		}
+	}
+
 	dsp.initRunners()
 	return dsp
 }
@@ -591,40 +601,13 @@ type SpanPartition struct {
 	Spans roachpb.Spans
 }
 
-func (dsp *DistSQLPlanner) checkNodeHealth(
-	ctx context.Context, nodeID roachpb.NodeID, addr string,
-) error {
-	// NB: not all tests populate a NodeLiveness. Everything using the
-	// proper constructor NewDistSQLPlanner will, though.
-	isLive := func(_ roachpb.NodeID) (bool, error) {
-		return true, nil
-	}
-	if dsp.liveness != nil {
-		isLive = dsp.liveness.IsLive
-	}
-	return checkNodeHealth(ctx, nodeID, addr, dsp.testingKnobs, dsp.gossip, dsp.rpcContext.ConnHealth, isLive)
+type distSQLNodeHealth struct {
+	gossip     *gossip.Gossip
+	connHealth func(roachpb.NodeID) error
+	isLive     func(roachpb.NodeID) (bool, error)
 }
 
-func checkNodeHealth(
-	ctx context.Context,
-	nodeID roachpb.NodeID,
-	addr string,
-	knobs DistSQLPlannerTestingKnobs,
-	g *gossip.Gossip,
-	connHealth func(string) error,
-	isLive func(roachpb.NodeID) (bool, error),
-) error {
-	// Check if the target's node descriptor is gossiped. If it isn't, the node
-	// is definitely gone and has been for a while.
-	//
-	// TODO(tschottdorf): it's not clear that this adds anything to the liveness
-	// check below. The node descriptor TTL is an hour as of 03/2018.
-	if _, err := g.GetNodeIDAddress(nodeID); err != nil {
-		log.VEventf(ctx, 1, "not using n%d because gossip doesn't know about it. "+
-			"It might have gone away from the cluster. Gossip said: %s.", nodeID, err)
-		return err
-	}
-
+func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) error {
 	{
 		// NB: as of #22658, ConnHealth does not work as expected; see the
 		// comment within. We still keep this code for now because in
@@ -633,13 +616,7 @@ func checkNodeHealth(
 		// artifact of rpcContext's reconnection mechanism at the time of
 		// writing). This is better than having it used in 100% of cases
 		// (until the liveness check below kicks in).
-	var err error
-	if knobs.OverrideHealthCheck != nil {
-		err = knobs.OverrideHealthCheck(nodeID, addr)
-	} else {
-		err = connHealth(addr)
-	}
-
+		err := h.connHealth(nodeID)
 		if err != nil && err != rpc.ErrNotHeartbeated {
 			// This host is known to be unhealthy. Don't use it (use the gateway
 			// instead). Note: this can never happen for our nodeID (which
@@ -649,7 +626,7 @@ func checkNodeHealth(
 		}
 	}
 	{
-		live, err := isLive(nodeID)
+		live, err := h.isLive(nodeID)
 		if err == nil && !live {
 			err = errors.New("node is not live")
 		}
@@ -660,7 +637,7 @@ func checkNodeHealth(
 
 	// Check that the node is not draining.
 	drainingInfo := &distsqlrun.DistSQLDrainingInfo{}
-	if err := g.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
+	if err := h.gossip.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
 		// Because draining info has no expiration, an error
 		// implies that we have not yet received a node's
 		// draining information. Since this information is
@@ -757,7 +734,7 @@ func (dsp *DistSQLPlanner) PartitionSpans(
 			addr, inAddrMap := planCtx.NodeAddresses[nodeID]
 			if !inAddrMap {
 				addr = replInfo.NodeDesc.Address.String()
-				if err := dsp.checkNodeHealth(ctx, nodeID, addr); err != nil {
+				if err := dsp.nodeHealth.check(ctx, nodeID); err != nil {
 					addr = ""
 				}
 				if err == nil && addr != "" {
@@ -1028,15 +1005,14 @@ func (dsp *DistSQLPlanner) CheckNodeHealthAndVersion(
 	planCtx *PlanningCtx, desc *roachpb.NodeDescriptor,
 ) error {
 	nodeID := desc.NodeID
-	addr := desc.Address.String()
 	var err error
 
-	if err = dsp.checkNodeHealth(planCtx.ctx, nodeID, addr); err != nil {
+	if err = dsp.nodeHealth.check(planCtx.ctx, nodeID); err != nil {
 		err = errors.New("unhealthy")
 	} else if !dsp.nodeVersionIsCompatible(nodeID, dsp.planVersion) {
 		err = errors.New("incompatible version")
 	} else {
-		planCtx.NodeAddresses[nodeID] = addr
+		planCtx.NodeAddresses[nodeID] = desc.Address.String()
 	}
 	return err
 }
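`distSQLNodeHealth.check` now runs three gates in sequence -- connection
health (breaker-aware after the nodedialer change above), liveness, and
draining state -- returning the first failure. The sketch below mirrors that
composition with toy function fields; it omits the real code's
special-casing of `rpc.ErrNotHeartbeated` and the gossip-based draining
lookup.

    package main

    import (
        "errors"
        "fmt"
    )

    // health mirrors the shape of distSQLNodeHealth: each dependency is a
    // plain function field, so tests can inject stubs without a knobs struct.
    type health struct {
        connHealth func(nodeID int) error
        isLive     func(nodeID int) (bool, error)
        isDraining func(nodeID int) bool
    }

    // check runs the gates in order and returns the first failure.
    func (h *health) check(nodeID int) error {
        if err := h.connHealth(nodeID); err != nil {
            return err // e.g. open breaker or unknown address
        }
        live, err := h.isLive(nodeID)
        if err != nil {
            return err
        }
        if !live {
            return errors.New("node is not live")
        }
        if h.isDraining(nodeID) {
            return fmt.Errorf("not using n%d because it is draining", nodeID)
        }
        return nil
    }

    func main() {
        h := &health{
            connHealth: func(id int) error { return nil },
            isLive:     func(id int) (bool, error) { return id != 4, nil },
            isDraining: func(id int) bool { return id == 5 },
        }
        for _, id := range []int{1, 4, 5} {
            fmt.Printf("n%d: %v\n", id, h.check(id))
        }
    }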
diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go
index 487c3ccb6cf8..7bcbca9061c0 100644
--- a/pkg/sql/distsql_physical_planner_test.go
+++ b/pkg/sql/distsql_physical_planner_test.go
@@ -774,8 +774,9 @@ func TestPartitionSpans(t *testing.T) {
 			stopper:      stopper,
 			spanResolver: tsp,
 			gossip:       mockGossip,
-			testingKnobs: DistSQLPlannerTestingKnobs{
-				OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
+			nodeHealth: distSQLNodeHealth{
+				gossip: mockGossip,
+				connHealth: func(node roachpb.NodeID) error {
 					for _, n := range tc.deadNodes {
 						if int(node) == n {
 							return fmt.Errorf("test node is unhealthy")
@@ -783,6 +784,9 @@ func TestPartitionSpans(t *testing.T) {
 					}
 					return nil
 				},
+				isLive: func(nodeID roachpb.NodeID) (bool, error) {
+					return true, nil
+				},
 			},
 		}
 
@@ -954,11 +958,15 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
 		stopper:      stopper,
 		spanResolver: tsp,
 		gossip:       mockGossip,
-		testingKnobs: DistSQLPlannerTestingKnobs{
-			OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
+		nodeHealth: distSQLNodeHealth{
+			gossip: mockGossip,
+			connHealth: func(roachpb.NodeID) error {
 				// All the nodes are healthy.
 				return nil
 			},
+			isLive: func(roachpb.NodeID) (bool, error) {
+				return true, nil
+			},
 		},
 	}
 
@@ -1045,10 +1053,14 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
 		stopper:      stopper,
 		spanResolver: tsp,
 		gossip:       mockGossip,
-		testingKnobs: DistSQLPlannerTestingKnobs{
-			OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
-				// All the nodes are healthy.
-				return nil
+		nodeHealth: distSQLNodeHealth{
+			gossip: mockGossip,
+			connHealth: func(node roachpb.NodeID) error {
+				_, err := mockGossip.GetNodeIDAddress(node)
+				return err
+			},
+			isLive: func(roachpb.NodeID) (bool, error) {
+				return true, nil
 			},
 		},
 	}
 
@@ -1119,10 +1131,10 @@ func TestCheckNodeHealth(t *testing.T) {
 		return true, nil
 	}
 
-	connHealthy := func(string) error {
+	connHealthy := func(roachpb.NodeID) error {
 		return nil
 	}
-	connUnhealthy := func(string) error {
+	connUnhealthy := func(roachpb.NodeID) error {
 		return errors.New("injected conn health error")
 	}
 	_ = connUnhealthy
@@ -1138,18 +1150,19 @@ func TestCheckNodeHealth(t *testing.T) {
 
 	for _, test := range livenessTests {
 		t.Run("liveness", func(t *testing.T) {
-			if err := checkNodeHealth(
-				context.Background(), nodeID, desc.Address.AddressField,
-				DistSQLPlannerTestingKnobs{}, /* knobs */
-				mockGossip, connHealthy, test.isLive,
-			); !testutils.IsError(err, test.exp) {
+			h := distSQLNodeHealth{
+				gossip:     mockGossip,
+				connHealth: connHealthy,
+				isLive:     test.isLive,
+			}
+			if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
 				t.Fatalf("expected %v, got %v", test.exp, err)
 			}
 		})
 	}
 
 	connHealthTests := []struct {
-		connHealth func(string) error
+		connHealth func(roachpb.NodeID) error
 		exp        string
 	}{
 		{connHealthy, ""},
@@ -1158,14 +1171,14 @@ func TestCheckNodeHealth(t *testing.T) {
 
 	for _, test := range connHealthTests {
 		t.Run("connHealth", func(t *testing.T) {
-			if err := checkNodeHealth(
-				context.Background(), nodeID, desc.Address.AddressField,
-				DistSQLPlannerTestingKnobs{}, /* knobs */
-				mockGossip, test.connHealth, live,
-			); !testutils.IsError(err, test.exp) {
+			h := distSQLNodeHealth{
+				gossip:     mockGossip,
+				connHealth: test.connHealth,
+				isLive:     live,
+			}
+			if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
 				t.Fatalf("expected %v, got %v", test.exp, err)
 			}
 		})
 	}
-
 }
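Because the health checks are now plain function fields, these tests build a
`distSQLNodeHealth` literal with stubs instead of threading a knobs struct
through the planner. A minimal table-driven sketch of the same idea (toy
`health` type rather than the actual harness; save it as a `_test.go` file to
run with `go test`):

    package health

    import (
        "errors"
        "testing"
    )

    type health struct {
        connHealth func(nodeID int) error
        isLive     func(nodeID int) (bool, error)
    }

    func (h *health) check(nodeID int) error {
        if err := h.connHealth(nodeID); err != nil {
            return err
        }
        live, err := h.isLive(nodeID)
        if err != nil {
            return err
        }
        if !live {
            return errors.New("node is not live")
        }
        return nil
    }

    func TestCheck(t *testing.T) {
        healthy := func(int) error { return nil }
        unhealthy := func(int) error { return errors.New("injected conn health error") }
        live := func(int) (bool, error) { return true, nil }

        tests := []struct {
            name       string
            connHealth func(int) error
            wantErr    bool
        }{
            {"healthy", healthy, false},
            {"unhealthy", unhealthy, true},
        }
        for _, tc := range tests {
            t.Run(tc.name, func(t *testing.T) {
                // Inject the stub directly; no knobs struct needed.
                h := &health{connHealth: tc.connHealth, isLive: live}
                if err := h.check(1); (err != nil) != tc.wantErr {
                    t.Fatalf("check: got err=%v, wantErr=%v", err, tc.wantErr)
                }
            })
        }
    }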
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 54a42744236a..75d7b1f72369 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -37,7 +37,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/kv"
-	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
@@ -361,9 +360,6 @@ type ExecutorTestingKnobs struct {
 	// execution so there'll be nothing left to abort by the time the filter runs.
 	DisableAutoCommit bool
 
-	// DistSQLPlannerKnobs are testing knobs for DistSQLPlanner.
-	DistSQLPlannerKnobs DistSQLPlannerTestingKnobs
-
 	// BeforeAutoCommit is called when the Executor is about to commit the KV
 	// transaction after running a statement in an implicit transaction, allowing
 	// tests to inject errors into that commit.
@@ -378,14 +374,6 @@ type ExecutorTestingKnobs struct {
 	BeforeAutoCommit func(ctx context.Context, stmt string) error
 }
 
-// DistSQLPlannerTestingKnobs is used to control internals of the DistSQLPlanner
-// for testing purposes.
-type DistSQLPlannerTestingKnobs struct {
-	// If OverrideSQLHealthCheck is set, we use this callback to get the health of
-	// a node.
-	OverrideHealthCheck func(node roachpb.NodeID, addrString string) error
-}
-
 // databaseCacheHolder is a thread-safe container for a *databaseCache.
 // It also allows clients to block until the cache is updated to a desired
 // state.
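The deleted `DistSQLPlannerTestingKnobs` illustrates the refactoring pattern
running through this patch: rather than plumbing a test-only override through
production constructors and branching on it at every call site, the dependency
itself becomes a function-valued field that tests overwrite directly. A
schematic before/after comparison (hypothetical `planner` types, not the real
ones):

    package main

    import "fmt"

    // Before: a knobs struct is passed through the constructor and consulted
    // at each call site.
    type knobs struct {
        overrideHealthCheck func(nodeID int) error
    }

    type plannerWithKnobs struct {
        knobs      knobs
        connHealth func(nodeID int) error
    }

    func (p *plannerWithKnobs) checkNode(nodeID int) error {
        if p.knobs.overrideHealthCheck != nil {
            return p.knobs.overrideHealthCheck(nodeID)
        }
        return p.connHealth(nodeID)
    }

    // After: the dependency is a field; tests assign a stub directly and the
    // branch at the call site disappears.
    type planner struct {
        connHealth func(nodeID int) error
    }

    func (p *planner) checkNode(nodeID int) error {
        return p.connHealth(nodeID)
    }

    func main() {
        check := func(nodeID int) error { return nil } // stand-in for the real check
        p := &planner{connHealth: check}
        fmt.Println(p.checkNode(1)) // <nil>: node 1 reports healthy
    }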