Skip to content

Commit

Permalink
Merge pull request #23834 from tschottdorf/distsql/plan-dead-nodes
Browse files Browse the repository at this point in the history
distsql: consult liveness during physical planning
  • Loading branch information
tbg authored Mar 15, 2018
2 parents ae2778a + 52681e4 commit 0c03d37
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 15 deletions.
11 changes: 11 additions & 0 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,17 @@ var ErrNotHeartbeated = errors.New("not yet heartbeated")
// ConnHealth returns whether the most recent heartbeat succeeded or not.
// This should not be used as a definite status of a node's health and just used
// to prioritize healthy nodes over unhealthy ones.
//
// NB: as of #22658, this does not work as you think. We kick
// connections out of the connection pool as soon as they run into an
// error, at which point their ConnHealth will reset to
// ErrNotConnected. ConnHealth does no more return a sane notion of
// "recent connection health". When it returns nil all seems well, but
// if it doesn't then this may mean that the node is simply refusing
// connections (and is thus unconnected most of the time), or that the
// node hasn't been connected to but is perfectly healthy.
//
// See #23829.
func (ctx *Context) ConnHealth(target string) error {
if ctx.GetLocalInternalServerForAddr(target) != nil {
// The local server is always considered healthy.
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.distSender,
s.gossip,
s.stopper,
s.nodeLiveness,
sqlExecutorTestingKnobs.DistSQLPlannerKnobs,
),
ExecLogger: log.NewSecondaryLogger(nil, "sql-exec", true /*enableGc*/, false /*forceSyncWrites*/),
Expand Down
80 changes: 65 additions & 15 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
Expand Down Expand Up @@ -83,6 +84,8 @@ type DistSQLPlanner struct {

// gossip handle used to check node version compatibility.
gossip *gossip.Gossip
// liveness is used to avoid planning on down nodes.
liveness *storage.NodeLiveness
}

const resolverPolicy = distsqlplan.BinPackingLeaseHolderChoice
Expand Down Expand Up @@ -123,8 +126,12 @@ func NewDistSQLPlanner(
distSender *kv.DistSender,
gossip *gossip.Gossip,
stopper *stop.Stopper,
liveness *storage.NodeLiveness,
testingKnobs DistSQLPlannerTestingKnobs,
) *DistSQLPlanner {
if liveness == nil {
panic("must specify liveness")
}
dsp := &DistSQLPlanner{
planVersion: planVersion,
st: st,
Expand All @@ -134,6 +141,7 @@ func NewDistSQLPlanner(
distSQLSrv: distSQLSrv,
gossip: gossip,
spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
liveness: liveness,
testingKnobs: testingKnobs,
}
dsp.initRunners()
Expand Down Expand Up @@ -515,31 +523,73 @@ type spanPartition struct {
func (dsp *DistSQLPlanner) checkNodeHealth(
ctx context.Context, nodeID roachpb.NodeID, addr string,
) error {
// Check if the node is still in gossip - i.e. if it hasn't been
// decommissioned or overridden by another node at the same address.
if _, err := dsp.gossip.GetNodeIDAddress(nodeID); err != nil {
// NB: not all tests populate a NodeLiveness. Everything using the
// proper constructor NewDistSQLPlanner will, though.
isLive := func(_ roachpb.NodeID) (bool, error) {
return true, nil
}
if dsp.liveness != nil {
isLive = dsp.liveness.IsLive
}
return checkNodeHealth(ctx, nodeID, addr, dsp.testingKnobs, dsp.gossip, dsp.rpcContext.ConnHealth, isLive)
}

func checkNodeHealth(
ctx context.Context,
nodeID roachpb.NodeID,
addr string,
knobs DistSQLPlannerTestingKnobs,
g *gossip.Gossip,
connHealth func(string) error,
isLive func(roachpb.NodeID) (bool, error),
) error {
// Check if the target's node descriptor is gossiped. If it isn't, the node
// is definitely gone and has been for a while.
//
// TODO(tschottdorf): it's not clear that this adds anything to the liveness
// check below. The node descriptor TTL is an hour as of 03/2018.
if _, err := g.GetNodeIDAddress(nodeID); err != nil {
log.VEventf(ctx, 1, "not using n%d because gossip doesn't know about it. "+
"It might have gone away from the cluster. Gossip said: %s.", nodeID, err)
return err
}

var err error
if dsp.testingKnobs.OverrideHealthCheck != nil {
err = dsp.testingKnobs.OverrideHealthCheck(nodeID, addr)
} else {
err = dsp.rpcContext.ConnHealth(addr)
{
// NB: as of #22658, ConnHealth does not work as expected; see the
// comment within. We still keep this code for now because in
// practice, once the node is down it will prevent using this node
// 90% of the time (it gets used around once per second as an
// artifact of rpcContext's reconnection mechanism at the time of
// writing). This is better than having it used in 100% of cases
// (until the liveness check below kicks in).
var err error
if knobs.OverrideHealthCheck != nil {
err = knobs.OverrideHealthCheck(nodeID, addr)
} else {
err = connHealth(addr)
}

if err != nil && err != rpc.ErrNotConnected && err != rpc.ErrNotHeartbeated {
// This host is known to be unhealthy. Don't use it (use the gateway
// instead). Note: this can never happen for our nodeID (which
// always has its address in the nodeMap).
log.VEventf(ctx, 1, "marking n%d as unhealthy for this plan: %v", nodeID, err)
return err
}
}
if err != nil && err != rpc.ErrNotConnected && err != rpc.ErrNotHeartbeated {
// This host is known to be unhealthy. Don't use it (use the gateway
// instead). Note: this can never happen for our nodeID (which
// always has its address in the nodeMap).
log.VEventf(ctx, 1, "marking n%d as unhealthy for this plan: %v", nodeID, err)
return err
{
live, err := isLive(nodeID)
if err == nil && !live {
err = errors.New("node is not live")
}
if err != nil {
return errors.Wrapf(err, "not using n%d due to liveness", nodeID)
}
}

// Check that the node is not draining.
drainingInfo := &distsqlrun.DistSQLDrainingInfo{}
if err := dsp.gossip.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
if err := g.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
// Because draining info has no expiration, an error
// implies that we have not yet received a node's
// draining information. Since this information is
Expand Down
90 changes: 90 additions & 0 deletions pkg/sql/distsql_physical_planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,3 +1068,93 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
t.Errorf("expected partitions:\n %v\ngot:\n %v", expectedPartitions, resMap)
}
}

func TestCheckNodeHealth(t *testing.T) {
defer leaktest.AfterTest(t)()

stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

const nodeID = roachpb.NodeID(5)

mockGossip := gossip.NewTest(nodeID, nil /* rpcContext */, nil, /* grpcServer */
stopper, metric.NewRegistry())

desc := &roachpb.NodeDescriptor{
NodeID: nodeID,
Address: util.UnresolvedAddr{NetworkField: "tcp", AddressField: "testaddr"},
}
if err := mockGossip.SetNodeDescriptor(desc); err != nil {
t.Fatal(err)
}
if err := mockGossip.AddInfoProto(
gossip.MakeDistSQLNodeVersionKey(nodeID),
&distsqlrun.DistSQLVersionGossipInfo{
MinAcceptedVersion: distsqlrun.MinAcceptedVersion,
Version: distsqlrun.Version,
},
0, // ttl - no expiration
); err != nil {
t.Fatal(err)
}

errLive := func(roachpb.NodeID) (bool, error) {
return false, errors.New("injected liveness error")
}
notLive := func(roachpb.NodeID) (bool, error) {
return false, nil
}
live := func(roachpb.NodeID) (bool, error) {
return true, nil
}

connHealthy := func(string) error {
return nil
}
connUnhealthy := func(string) error {
return errors.New("injected conn health error")
}
_ = connUnhealthy

livenessTests := []struct {
isLive func(roachpb.NodeID) (bool, error)
exp string
}{
{live, ""},
{errLive, "not using n5 due to liveness: injected liveness error"},
{notLive, "not using n5 due to liveness: node is not live"},
}

for _, test := range livenessTests {
t.Run("liveness", func(t *testing.T) {
if err := checkNodeHealth(
context.Background(), nodeID, desc.Address.AddressField,
DistSQLPlannerTestingKnobs{}, /* knobs */
mockGossip, connHealthy, test.isLive,
); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
}
})
}

connHealthTests := []struct {
connHealth func(string) error
exp string
}{
{connHealthy, ""},
{connUnhealthy, "injected conn health error"},
}

for _, test := range connHealthTests {
t.Run("connHealth", func(t *testing.T) {
if err := checkNodeHealth(
context.Background(), nodeID, desc.Address.AddressField,
DistSQLPlannerTestingKnobs{}, /* knobs */
mockGossip, test.connHealth, live,
); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
}
})
}

}

0 comments on commit 0c03d37

Please sign in to comment.