From 52681e41a11edef23a627827d4933f249a4d5ff3 Mon Sep 17 00:00:00 2001
From: Tobias Schottdorf
Date: Thu, 15 Mar 2018 12:08:31 -0400
Subject: [PATCH] distsql: consult liveness during physical planning

The recent PR #22658 introduced a regression in `(*rpcContext).ConnHealth`
which caused DistSQL to continue planning on unavailable nodes for about an
hour (`ttlNodeDescriptorGossip`) if the leaseholder cache happened not to be
updated by other non-DistSQL requests.

Instead, consult node liveness and avoid planning on dead nodes. This reduces
the problem to a <10s window. The defunct `ConnHealth` mechanism still
prevents planning on dead nodes in some cases (supposedly due to a
once-per-second reconnection policy) and is retained for that reason, with
issue #23829 filed to decide its future.

NB: I'm not adding a release note since this was introduced after 1.1. We
released it in a beta, though, so it may be worth calling out there.

Touches #23601. (Not fixing it, because that issue should only be closed once
there's a roachtest.)

Release note (bug fix): NB: this fixes a regression introduced in 2.0-beta
and not present in 1.1: avoid planning DistSQL queries on unavailable nodes.
---
 pkg/rpc/context.go                       | 11 +++
 pkg/server/server.go                     |  1 +
 pkg/sql/distsql_physical_planner.go      | 80 +++++++++++++++++----
 pkg/sql/distsql_physical_planner_test.go | 90 ++++++++++++++++++++++++
 4 files changed, 167 insertions(+), 15 deletions(-)

diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go
index 78d37195014a..dc9415e9d95f 100644
--- a/pkg/rpc/context.go
+++ b/pkg/rpc/context.go
@@ -541,6 +541,17 @@ var ErrNotHeartbeated = errors.New("not yet heartbeated")
 // ConnHealth returns whether the most recent heartbeat succeeded or not.
 // This should not be used as a definite status of a node's health and just used
 // to prioritize healthy nodes over unhealthy ones.
+//
+// NB: as of #22658, this does not work as you might expect. We kick
+// connections out of the connection pool as soon as they run into an
+// error, at which point their ConnHealth will reset to
+// ErrNotConnected. ConnHealth no longer returns a sane notion of
+// "recent connection health". When it returns nil, all seems well, but
+// if it doesn't, this may mean that the node is simply refusing
+// connections (and is thus unconnected most of the time), or that the
+// node hasn't been connected to but is perfectly healthy.
+//
+// See #23829.
 func (ctx *Context) ConnHealth(target string) error {
 	if ctx.GetLocalInternalServerForAddr(target) != nil {
 		// The local server is always considered healthy.
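
The comment above has a practical consequence for callers: a nil return is
meaningful, but ErrNotConnected and ErrNotHeartbeated only mean "unknown".
As a rough sketch (an editor's illustration, not part of this patch; the
helper name connHealthSuggestsDown is made up, and the cockroach pkg/rpc
import is assumed for the sentinel errors), a caller should treat only other
errors as evidence that the node is unhealthy. This is exactly the filtering
the DistSQL planner applies further down in this patch.

    // connHealthSuggestsDown reports whether a ConnHealth result should be
    // read as "this node is known to be unhealthy". The two sentinel errors
    // merely mean that no recent heartbeat information is available.
    func connHealthSuggestsDown(connHealth func(string) error, addr string) bool {
        err := connHealth(addr)
        return err != nil && err != rpc.ErrNotConnected && err != rpc.ErrNotHeartbeated
    }
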
diff --git a/pkg/server/server.go b/pkg/server/server.go
index d681f8688ba8..0c1bb089f3f9 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -523,6 +523,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 			s.distSender,
 			s.gossip,
 			s.stopper,
+			s.nodeLiveness,
 			sqlExecutorTestingKnobs.DistSQLPlannerKnobs,
 		),
 		ExecLogger: log.NewSecondaryLogger(nil, "sql-exec", true /*enableGc*/, false /*forceSyncWrites*/),
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 407a41506992..0d0d1b2db285 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -35,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
+	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
@@ -83,6 +84,8 @@ type DistSQLPlanner struct {
 
 	// gossip handle used to check node version compatibility.
 	gossip *gossip.Gossip
+	// liveness is used to avoid planning on down nodes.
+	liveness *storage.NodeLiveness
 }
 
 const resolverPolicy = distsqlplan.BinPackingLeaseHolderChoice
@@ -123,8 +126,12 @@ func NewDistSQLPlanner(
 	distSender *kv.DistSender,
 	gossip *gossip.Gossip,
 	stopper *stop.Stopper,
+	liveness *storage.NodeLiveness,
 	testingKnobs DistSQLPlannerTestingKnobs,
 ) *DistSQLPlanner {
+	if liveness == nil {
+		panic("must specify liveness")
+	}
 	dsp := &DistSQLPlanner{
 		planVersion:  planVersion,
 		st:           st,
@@ -134,6 +141,7 @@ func NewDistSQLPlanner(
 		distSQLSrv:   distSQLSrv,
 		gossip:       gossip,
 		spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
+		liveness:     liveness,
 		testingKnobs: testingKnobs,
 	}
 	dsp.initRunners()
@@ -515,31 +523,73 @@ type spanPartition struct {
 func (dsp *DistSQLPlanner) checkNodeHealth(
 	ctx context.Context, nodeID roachpb.NodeID, addr string,
 ) error {
-	// Check if the node is still in gossip - i.e. if it hasn't been
-	// decommissioned or overridden by another node at the same address.
-	if _, err := dsp.gossip.GetNodeIDAddress(nodeID); err != nil {
+	// NB: not all tests populate a NodeLiveness. Everything using the
+	// proper constructor NewDistSQLPlanner will, though.
+	isLive := func(_ roachpb.NodeID) (bool, error) {
+		return true, nil
+	}
+	if dsp.liveness != nil {
+		isLive = dsp.liveness.IsLive
+	}
+	return checkNodeHealth(ctx, nodeID, addr, dsp.testingKnobs, dsp.gossip, dsp.rpcContext.ConnHealth, isLive)
+}
+
+func checkNodeHealth(
+	ctx context.Context,
+	nodeID roachpb.NodeID,
+	addr string,
+	knobs DistSQLPlannerTestingKnobs,
+	g *gossip.Gossip,
+	connHealth func(string) error,
+	isLive func(roachpb.NodeID) (bool, error),
+) error {
+	// Check if the target's node descriptor is gossiped. If it isn't, the node
+	// is definitely gone and has been for a while.
+	//
+	// TODO(tschottdorf): it's not clear that this adds anything to the liveness
+	// check below. The node descriptor TTL is an hour as of 03/2018.
+	if _, err := g.GetNodeIDAddress(nodeID); err != nil {
 		log.VEventf(ctx, 1, "not using n%d because gossip doesn't know about it. "+
 			"It might have gone away from the cluster. Gossip said: %s.", nodeID, err)
 		return err
 	}
 
-	var err error
-	if dsp.testingKnobs.OverrideHealthCheck != nil {
-		err = dsp.testingKnobs.OverrideHealthCheck(nodeID, addr)
-	} else {
-		err = dsp.rpcContext.ConnHealth(addr)
+	{
+		// NB: as of #22658, ConnHealth does not work as expected; see the
+		// comment within. We still keep this code for now because in
+		// practice, once the node is down it will prevent using this node
+		// 90% of the time (it gets used around once per second as an
+		// artifact of rpcContext's reconnection mechanism at the time of
+		// writing). This is better than having it used in 100% of cases
+		// (until the liveness check below kicks in).
+		var err error
+		if knobs.OverrideHealthCheck != nil {
+			err = knobs.OverrideHealthCheck(nodeID, addr)
+		} else {
+			err = connHealth(addr)
+		}
+
+		if err != nil && err != rpc.ErrNotConnected && err != rpc.ErrNotHeartbeated {
+			// This host is known to be unhealthy. Don't use it (use the gateway
+			// instead). Note: this can never happen for our nodeID (which
+			// always has its address in the nodeMap).
+			log.VEventf(ctx, 1, "marking n%d as unhealthy for this plan: %v", nodeID, err)
+			return err
+		}
 	}
 
-	if err != nil && err != rpc.ErrNotConnected && err != rpc.ErrNotHeartbeated {
-		// This host is known to be unhealthy. Don't use it (use the gateway
-		// instead). Note: this can never happen for our nodeID (which
-		// always has its address in the nodeMap).
-		log.VEventf(ctx, 1, "marking n%d as unhealthy for this plan: %v", nodeID, err)
-		return err
+	{
+		live, err := isLive(nodeID)
+		if err == nil && !live {
+			err = errors.New("node is not live")
+		}
+		if err != nil {
+			return errors.Wrapf(err, "not using n%d due to liveness", nodeID)
+		}
 	}
 
 	// Check that the node is not draining.
 	drainingInfo := &distsqlrun.DistSQLDrainingInfo{}
-	if err := dsp.gossip.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
+	if err := g.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
 		// Because draining info has no expiration, an error
 		// implies that we have not yet received a node's
 		// draining information. Since this information is
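
Because the refactored checkNodeHealth takes its gossip handle, connection
health check, and liveness check as plain parameters, it can be exercised
without a real rpc.Context or NodeLiveness. A minimal sketch follows (an
editor's illustration, not part of this patch; the helper name
checkNotLiveExample is made up, and g is assumed to be a *gossip.Gossip that
already has the target node's descriptor, as the test below sets up):

    // checkNotLiveExample stubs out connection health and liveness to show
    // how a non-live node is rejected by the health check.
    func checkNotLiveExample(ctx context.Context, g *gossip.Gossip) error {
        connHealthy := func(string) error { return nil }
        notLive := func(roachpb.NodeID) (bool, error) { return false, nil }
        // Expected to fail with "not using n5 due to liveness: node is not live".
        return checkNodeHealth(
            ctx, roachpb.NodeID(5), "testaddr",
            DistSQLPlannerTestingKnobs{}, g, connHealthy, notLive,
        )
    }

The new test below does exactly this, table-driven, for both the liveness and
the connection health inputs.
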
diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go
index f18d12e52f39..f6b8f0332cf7 100644
--- a/pkg/sql/distsql_physical_planner_test.go
+++ b/pkg/sql/distsql_physical_planner_test.go
@@ -1068,3 +1068,93 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
 		t.Errorf("expected partitions:\n %v\ngot:\n %v", expectedPartitions, resMap)
 	}
 }
+
+func TestCheckNodeHealth(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+
+	const nodeID = roachpb.NodeID(5)
+
+	mockGossip := gossip.NewTest(nodeID, nil /* rpcContext */, nil, /* grpcServer */
+		stopper, metric.NewRegistry())
+
+	desc := &roachpb.NodeDescriptor{
+		NodeID:  nodeID,
+		Address: util.UnresolvedAddr{NetworkField: "tcp", AddressField: "testaddr"},
+	}
+	if err := mockGossip.SetNodeDescriptor(desc); err != nil {
+		t.Fatal(err)
+	}
+	if err := mockGossip.AddInfoProto(
+		gossip.MakeDistSQLNodeVersionKey(nodeID),
+		&distsqlrun.DistSQLVersionGossipInfo{
+			MinAcceptedVersion: distsqlrun.MinAcceptedVersion,
+			Version:            distsqlrun.Version,
+		},
+		0, // ttl - no expiration
+	); err != nil {
+		t.Fatal(err)
+	}
+
+	errLive := func(roachpb.NodeID) (bool, error) {
+		return false, errors.New("injected liveness error")
+	}
+	notLive := func(roachpb.NodeID) (bool, error) {
+		return false, nil
+	}
+	live := func(roachpb.NodeID) (bool, error) {
+		return true, nil
+	}
+
+	connHealthy := func(string) error {
+		return nil
+	}
+	connUnhealthy := func(string) error {
+		return errors.New("injected conn health error")
+	}
+	_ = connUnhealthy
+
+	livenessTests := []struct {
+		isLive func(roachpb.NodeID) (bool, error)
+		exp    string
+	}{
+		{live, ""},
+		{errLive, "not using n5 due to liveness: injected liveness error"},
+		{notLive, "not using n5 due to liveness: node is not live"},
+	}
+
+	for _, test := range livenessTests {
+		t.Run("liveness", func(t *testing.T) {
+			if err := checkNodeHealth(
+				context.Background(), nodeID, desc.Address.AddressField,
+				DistSQLPlannerTestingKnobs{}, /* knobs */
+				mockGossip, connHealthy, test.isLive,
+			); !testutils.IsError(err, test.exp) {
+				t.Fatalf("expected %v, got %v", test.exp, err)
+			}
+		})
+	}
+
+	connHealthTests := []struct {
+		connHealth func(string) error
+		exp        string
+	}{
+		{connHealthy, ""},
+		{connUnhealthy, "injected conn health error"},
+	}
+
+	for _, test := range connHealthTests {
+		t.Run("connHealth", func(t *testing.T) {
+			if err := checkNodeHealth(
+				context.Background(), nodeID, desc.Address.AddressField,
+				DistSQLPlannerTestingKnobs{}, /* knobs */
+				mockGossip, test.connHealth, live,
+			); !testutils.IsError(err, test.exp) {
+				t.Fatalf("expected %v, got %v", test.exp, err)
+			}
+		})
+	}
+
+}
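
The two tables above exercise liveness and connection health separately.
Since checkNodeHealth consults connection health before liveness, a combined
case (an unhealthy connection to an otherwise live node) should surface the
connection error. A sketch of such an additional subtest follows (an editor's
illustration, not part of this patch; if added, it would sit inside
TestCheckNodeHealth and reuse the fixtures defined there):

    // Hypothetical extra subtest: conn health is checked before liveness,
    // so the injected conn health error wins even though the node is live.
    t.Run("connHealthBeforeLiveness", func(t *testing.T) {
        if err := checkNodeHealth(
            context.Background(), nodeID, desc.Address.AddressField,
            DistSQLPlannerTestingKnobs{}, /* knobs */
            mockGossip, connUnhealthy, live,
        ); !testutils.IsError(err, "injected conn health error") {
            t.Fatalf("expected conn health error, got %v", err)
        }
    })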