Skip to content

Commit

Permalink
Merge pull request #4292 from hashicorp/f-heartbeat
Browse files Browse the repository at this point in the history
Emit heartbeat and node registration events
  • Loading branch information
dadgar authored May 22, 2018
2 parents 774dd0f + cfc9d1f commit 0061c38
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 128 deletions.
2 changes: 1 addition & 1 deletion command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestHTTP_NodeQuery(t *testing.T) {
if len(n.Events) < 1 {
t.Fatalf("Expected node registration event to be populated: %#v", n)
}
if n.Events[0].Message != "Node Registered" {
if n.Events[0].Message != "Node registered" {
t.Fatalf("Expected node registration event to be first node event: %#v", n)
}
})
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status); err != nil {
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeStatus failed: %v", err)
return err
}
Expand Down
37 changes: 17 additions & 20 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func TestFSM_DeregisterNode(t *testing.T) {

func TestFSM_UpdateNodeStatus(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)

Expand All @@ -257,43 +258,39 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)

resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
require.Nil(resp)

// Mark an eval as blocked.
eval := mock.Eval()
eval.ClassEligibility = map[string]bool{node.ComputedClass: true}
fsm.blockedEvals.Block(eval)

event := &structs.NodeEvent{
Message: "Node ready foo",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
req2 := structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
NodeID: node.ID,
Status: structs.NodeStatusReady,
NodeEvent: event,
}
buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)

resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
require.Nil(resp)

// Verify the status is ready.
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.Node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if node.Status != structs.NodeStatusReady {
t.Fatalf("bad node: %#v", node)
}
require.NoError(err)
require.Equal(structs.NodeStatusReady, node.Status)
require.Len(node.Events, 2)
require.Equal(event.Message, node.Events[1].Message)

// Verify the eval was unblocked.
testutil.WaitForResult(func() (bool, error) {
Expand Down
9 changes: 7 additions & 2 deletions nomad/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ const (
// heartbeatNotLeader is the error string returned when the heartbeat request
// couldn't be completed since the server is not the leader.
heartbeatNotLeader = "failed to reset heartbeat since server is not leader"

// NodeHeartbeatEventMissed is the event used when the Nodes heartbeat is
// missed.
NodeHeartbeatEventMissed = "Node heartbeat missed"
)

var (
Expand Down Expand Up @@ -123,8 +127,9 @@ func (s *Server) invalidateHeartbeat(id string) {

// Make a request to update the node status
req := structs.NodeUpdateStatusRequest{
NodeID: id,
Status: structs.NodeStatusDown,
NodeID: id,
Status: structs.NodeStatusDown,
NodeEvent: structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).SetMessage(NodeHeartbeatEventMissed),
WriteRequest: structs.WriteRequest{
Region: s.config.Region,
},
Expand Down
16 changes: 6 additions & 10 deletions nomad/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,30 +140,26 @@ func TestHeartbeat_ResetHeartbeatTimerLocked_Renew(t *testing.T) {

func TestHeartbeat_InvalidateHeartbeat(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Create a node
node := mock.Node()
state := s1.fsm.State()
err := state.UpsertNode(1, node)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(state.UpsertNode(1, node))

// This should cause a status update
s1.invalidateHeartbeat(node.ID)

// Check it is updated
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !out.TerminalStatus() {
t.Fatalf("should update node: %#v", out)
}
require.NoError(err)
require.True(out.TerminalStatus())
require.Len(out.Events, 2)
require.Equal(NodeHeartbeatEventMissed, out.Events[1].Message)
}

func TestHeartbeat_ClearHeartbeatTimer(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (
// NodeEligibilityEventIneligible is used when the nodes eligiblity is marked
// ineligible
NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling"

// NodeHeartbeatEventReregistered is the message used when the node becomes
// reregistered by the heartbeat.
NodeHeartbeatEventReregistered = "Node reregistered by heartbeat"
)

// Node endpoint is used for client interactions
Expand Down Expand Up @@ -367,6 +371,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Commit this update via Raft
var index uint64
if node.Status != args.Status {
// Attach an event if we are updating the node status to ready when it
// is down via a heartbeat
if node.Status == structs.NodeStatusDown && args.NodeEvent == nil {
args.NodeEvent = structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeHeartbeatEventReregistered)
}

_, index, err = n.srv.raftApply(structs.NodeUpdateStatusRequestType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err)
Expand Down
55 changes: 53 additions & 2 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
Expand Down Expand Up @@ -508,6 +509,56 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Check that we have no client connections
require.Empty(s1.connectedNodes())

// Create the register request but make the node down
node := mock.Node()
node.Status = structs.NodeStatusDown
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.NodeUpdateResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))

// Update the status
dereg := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusInit,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeUpdateResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2))
require.NotZero(resp2.Index)

// Check for heartbeat interval
ttl := resp2.HeartbeatTTL
if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
t.Fatalf("bad: %#v", ttl)
}

// Check for the node in the FSM
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.NoError(err)
require.NotNil(out)
require.EqualValues(resp2.Index, out.ModifyIndex)
require.Len(out.Events, 2)
require.Equal(NodeHeartbeatEventReregistered, out.Events[1].Message)
}

func TestClientEndpoint_Register_GetEvals(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
Expand Down Expand Up @@ -1222,7 +1273,7 @@ func TestClientEndpoint_GetNode(t *testing.T) {
if len(resp2.Node.Events) != 1 {
t.Fatalf("Did not set node events: %#v", resp2.Node)
}
if resp2.Node.Events[0].Message != "Node Registered" {
if resp2.Node.Events[0].Message != state.NodeRegisterEventRegistered {
t.Fatalf("Did not set node register event correctly: %#v", resp2.Node)
}

Expand Down Expand Up @@ -2622,7 +2673,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {

// Node status update triggers watches
time.AfterFunc(100*time.Millisecond, func() {
errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown)
errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, nil)
})

req.MinQueryIndex = 38
Expand Down
18 changes: 8 additions & 10 deletions nomad/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package nomad
import (
"fmt"
"io/ioutil"
"log"
"os"
"path"
"strings"
"testing"
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
Expand Down Expand Up @@ -531,22 +529,22 @@ func TestServer_InvalidSchedulers(t *testing.T) {
t.Parallel()
require := require.New(t)

// Set the config to not have the core scheduler
config := DefaultConfig()
config.DevMode = true
config.LogOutput = testlog.NewWriter(t)
config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1"
logger := log.New(config.LogOutput, "", log.LstdFlags)
catalog := consul.NewMockCatalog(logger)
logger := testlog.Logger(t)
s := &Server{
config: config,
logger: logger,
}

// Set the config to not have the core scheduler
config.EnabledSchedulers = []string{"batch"}
_, err := NewServer(config, catalog, logger)
err := s.setupWorkers()
require.NotNil(err)
require.Contains(err.Error(), "scheduler not enabled")

// Set the config to have an unknown scheduler
config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"}
_, err = NewServer(config, catalog, logger)
err = s.setupWorkers()
require.NotNil(err)
require.Contains(err.Error(), "foo")
}
33 changes: 27 additions & 6 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
NodeRegisterEventRegistered = "Node registered"

// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
NodeRegisterEventReregistered = "Node re-registered"
)

// IndexEntry is used with the "index" table
// for managing the latest Raft index affecting a table.
type IndexEntry struct {
Expand Down Expand Up @@ -530,17 +540,23 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
// Retain node events that have already been set on the node
node.Events = exist.Events

// If we are transitioning from down, record the re-registration
if exist.Status == structs.NodeStatusDown && node.Status != structs.NodeStatusDown {
appendNodeEvents(index, node, []*structs.NodeEvent{
structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeRegisterEventReregistered).
SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))})
}

node.Drain = exist.Drain // Retain the drain mode
node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy
} else {
// Because this is the first time the node is being registered, we should
// also create a node registration event
nodeEvent := &structs.NodeEvent{
Message: "Node Registered",
Subsystem: "Cluster",
Timestamp: time.Unix(node.StatusUpdatedAt, 0),
}
nodeEvent := structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeRegisterEventRegistered).
SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))
node.Events = []*structs.NodeEvent{nodeEvent}
node.CreateIndex = index
node.ModifyIndex = index
Expand Down Expand Up @@ -585,7 +601,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
}

// UpdateNodeStatus is used to update the status of a node
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error {
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

Expand All @@ -602,6 +618,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()

// Add the event if given
if event != nil {
appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
}

// Update the status in the copy
copyNode.Status = status
copyNode.ModifyIndex = index
Expand Down
Loading

0 comments on commit 0061c38

Please sign in to comment.