Skip to content

Commit

Permalink
Merge #54936
Browse files Browse the repository at this point in the history
54936: kvserver,rpc: set the stage for maintaining a local blocklist of permanently removed nodes r=irfansharif a=tbg

As part of long-running migrations (#54903) we need to make sure, to the best of our abilities, that decommissioned nodes can not return to the cluster. We will achieve this by persisting this knowledge locally (in an eventually consistent way, via gossip) and ading checks in the RPC layer.

This PR stops short of adding the actual storage but sets the stage for doing so.

First seven commits are from another PR and should be ignored here.

Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Oct 14, 2020
2 parents 05826e3 + 8552175 commit cf805a5
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 203 deletions.
54 changes: 32 additions & 22 deletions pkg/kv/kvserver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,10 +906,11 @@ func (m *multiTestContext) addStore(idx int) {
ambient := log.AmbientContext{Tracer: cfg.Settings.Tracer}
m.populateDB(idx, cfg.Settings, stopper)
nlActive, nlRenewal := cfg.NodeLivenessDurations()
m.nodeLivenesses[idx] = kvserver.NewNodeLiveness(
ambient, m.clocks[idx], m.dbs[idx], m.gossips[idx],
nlActive, nlRenewal, cfg.Settings, metric.TestSampleInterval,
)
m.nodeLivenesses[idx] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{
AmbientCtx: ambient, Clock: m.clocks[idx], DB: m.dbs[idx], Gossip: m.gossips[idx],
LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings,
HistogramWindowInterval: metric.TestSampleInterval,
})
m.populateStorePool(idx, cfg, m.nodeLivenesses[idx])
cfg.DB = m.dbs[idx]
cfg.NodeLiveness = m.nodeLivenesses[idx]
Expand Down Expand Up @@ -1018,15 +1019,19 @@ func (m *multiTestContext) addStore(idx int) {
m.t.Fatal(err)
}
}
m.nodeLivenesses[idx].StartHeartbeat(ctx, stopper, m.engines[idx:idx+1], func(ctx context.Context) {
now := clock.Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warningf(ctx, "%v", err)
}
ran.Do(func() {
close(ran.ch)
})
})
m.nodeLivenesses[idx].Start(ctx,
kvserver.NodeLivenessStartOptions{
Stopper: stopper,
Engines: m.engines[idx : idx+1],
OnSelfLive: func(ctx context.Context) {
now := clock.Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warningf(ctx, "%v", err)
}
ran.Do(func() {
close(ran.ch)
})
}})

store.WaitForInit()

Expand Down Expand Up @@ -1095,10 +1100,11 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) {
cfg := m.makeStoreConfig(i)
m.populateDB(i, m.storeConfig.Settings, stopper)
nlActive, nlRenewal := cfg.NodeLivenessDurations()
m.nodeLivenesses[i] = kvserver.NewNodeLiveness(
log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}, m.clocks[i], m.dbs[i],
m.gossips[i], nlActive, nlRenewal, cfg.Settings, metric.TestSampleInterval,
)
m.nodeLivenesses[i] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{
AmbientCtx: log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}, Clock: m.clocks[i], DB: m.dbs[i],
Gossip: m.gossips[i], LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings,
HistogramWindowInterval: metric.TestSampleInterval,
})
m.populateStorePool(i, cfg, m.nodeLivenesses[i])
cfg.DB = m.dbs[i]
cfg.NodeLiveness = m.nodeLivenesses[i]
Expand All @@ -1115,11 +1121,15 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) {
m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.DefaultClass).Reset()
m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.SystemClass).Reset()
m.mu.Unlock()
cfg.NodeLiveness.StartHeartbeat(ctx, stopper, m.engines[i:i+1], func(ctx context.Context) {
now := m.clocks[i].Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warningf(ctx, "%v", err)
}
cfg.NodeLiveness.Start(ctx, kvserver.NodeLivenessStartOptions{
Stopper: stopper,
Engines: m.engines[i : i+1],
OnSelfLive: func(ctx context.Context) {
now := m.clocks[i].Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warningf(ctx, "%v", err)
}
},
})
}

Expand Down
139 changes: 85 additions & 54 deletions pkg/kv/kvserver/node_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,14 @@ type NodeLiveness struct {
// heartbeatPaused contains an atomically-swapped number representing a bool
// (1 or 0). heartbeatToken is a channel containing a token which is taken
// when heartbeating or when pausing the heartbeat. Used for testing.
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics LivenessMetrics
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics LivenessMetrics
onNodeDecommissioned func(kvserverpb.Liveness) // noop if nil

mu struct {
syncutil.RWMutex
callbacks []IsLiveCallback
onIsLive []IsLiveCallback // see NodeLivenessOptions.OnSelfLive
// nodes is an in-memory cache of liveness records that NodeLiveness
// knows about (having learnt of them through gossip or through KV).
// It's a look-aside cache, and is accessed primarily through
Expand All @@ -205,7 +206,7 @@ type NodeLiveness struct {
// - Update the liveness record in KV
// - Add the updated record into this cache (see `maybeUpdate`)
//
// (See `StartHeartbeat` for an example of this pattern.)
// (See `Start` for an example of this pattern.)
//
// What we want instead is a bit simpler:
//
Expand All @@ -216,11 +217,11 @@ type NodeLiveness struct {
//
// More concretely, we want `getLivenessRecordFromKV` to be tucked away
// within `getLivenessLocked`.
nodes map[roachpb.NodeID]LivenessRecord
heartbeatCallback HeartbeatCallback
nodes map[roachpb.NodeID]LivenessRecord
onSelfLive HeartbeatCallback // set in Start()
// Before heartbeating, we write to each of these engines to avoid
// maintaining liveness when a local disks is stalled.
engines []storage.Engine
engines []storage.Engine // set in Start()
}
}

Expand All @@ -235,41 +236,61 @@ type LivenessRecord struct {
raw []byte
}

// NodeLivenessOptions is the input to NewNodeLiveness.
//
// Note that there is yet another struct, NodeLivenessStartOptions, which
// is supplied when the instance is started. This is necessary as during
// server startup, some inputs can only be constructed at Start time. The
// separation has grown organically and various options could in principle
// be moved back and forth.
type NodeLivenessOptions struct {
AmbientCtx log.AmbientContext
Settings *cluster.Settings
Gossip *gossip.Gossip
Clock *hlc.Clock
DB *kv.DB
LivenessThreshold time.Duration
RenewalDuration time.Duration
HistogramWindowInterval time.Duration
// OnNodeDecommissioned is invoked whenever the instance learns that a
// node was permanently removed from the cluster. This method must be
// idempotent as it may be invoked multiple times and defaults to a
// noop.
OnNodeDecommissioned func(kvserverpb.Liveness)
}

// NewNodeLiveness returns a new instance of NodeLiveness configured
// with the specified gossip instance.
func NewNodeLiveness(
ambient log.AmbientContext,
clock *hlc.Clock,
db *kv.DB,
g *gossip.Gossip,
livenessThreshold time.Duration,
renewalDuration time.Duration,
st *cluster.Settings,
histogramWindow time.Duration,
) *NodeLiveness {
func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
nl := &NodeLiveness{
ambientCtx: ambient,
clock: clock,
db: db,
gossip: g,
livenessThreshold: livenessThreshold,
heartbeatInterval: livenessThreshold - renewalDuration,
selfSem: make(chan struct{}, 1),
st: st,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
ambientCtx: opts.AmbientCtx,
clock: opts.Clock,
db: opts.DB,
gossip: opts.Gossip,
livenessThreshold: opts.LivenessThreshold,
heartbeatInterval: opts.LivenessThreshold - opts.RenewalDuration,
selfSem: make(chan struct{}, 1),
st: opts.Settings,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
onNodeDecommissioned: opts.OnNodeDecommissioned,
}
nl.metrics = LivenessMetrics{
LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
HeartbeatsInFlight: metric.NewGauge(metaHeartbeatsInFlight),
HeartbeatSuccesses: metric.NewCounter(metaHeartbeatSuccesses),
HeartbeatFailures: metric.NewCounter(metaHeartbeatFailures),
EpochIncrements: metric.NewCounter(metaEpochIncrements),
HeartbeatLatency: metric.NewLatency(metaHeartbeatLatency, histogramWindow),
HeartbeatLatency: metric.NewLatency(metaHeartbeatLatency, opts.HistogramWindowInterval),
}
nl.mu.nodes = make(map[roachpb.NodeID]LivenessRecord)
nl.heartbeatToken <- struct{}{}

// NB: we should consider moving this registration to .Start() once we
// have ensured that nobody uses the server's KV client (kv.DB) before
// nl.Start() is invoked. At the time of writing this invariant does
// not hold (which is a problem, since the node itself won't be live
// at this point, and requests routed to it will hang).
livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix)
nl.gossip.RegisterCallback(livenessRegex, nl.livenessGossipUpdate)

Expand Down Expand Up @@ -499,7 +520,7 @@ type livenessUpdate struct {
// given node ID. This is typically used when adding a new node to a running
// cluster, or when bootstrapping a cluster through a given node.
//
// This is a pared down version of StartHeartbeat; it exists only to durably
// This is a pared down version of Start; it exists only to durably
// persist a liveness to record the node's existence. Nodes will heartbeat their
// records after starting up, and incrementing to epoch=1 when doing so, at
// which point we'll set an appropriate expiration timestamp, gossip the
Expand Down Expand Up @@ -619,32 +640,39 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) {
return liveness.IsLive(nl.clock.Now().GoTime()), nil
}

// StartHeartbeat starts a periodic heartbeat to refresh this node's last
// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
type NodeLivenessStartOptions struct {
Stopper *stop.Stopper
Engines []storage.Engine
// OnSelfLive is invoked after every successful heartbeat
// of the local liveness instance's heartbeat loop.
OnSelfLive HeartbeatCallback
}

// Start starts a periodic heartbeat to refresh this node's last
// heartbeat in the node liveness table. The optionally provided
// HeartbeatCallback will be invoked whenever this node updates its own
// liveness. The slice of engines will be written to before each heartbeat to
// avoid maintaining liveness in the presence of disk stalls.
func (nl *NodeLiveness) StartHeartbeat(
ctx context.Context, stopper *stop.Stopper, engines []storage.Engine, alive HeartbeatCallback,
) {
log.VEventf(ctx, 1, "starting liveness heartbeat")
// HeartbeatCallback will be invoked whenever this node updates its
// own liveness. The slice of engines will be written to before each
// heartbeat to avoid maintaining liveness in the presence of disk stalls.
func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions) {
log.VEventf(ctx, 1, "starting node liveness instance")
retryOpts := base.DefaultRetryOptions()
retryOpts.Closer = stopper.ShouldQuiesce()
retryOpts.Closer = opts.Stopper.ShouldQuiesce()

if len(engines) == 0 {
if len(opts.Engines) == 0 {
// Avoid silently forgetting to pass the engines. It happened before.
log.Fatalf(ctx, "must supply at least one engine")
}

nl.mu.Lock()
nl.mu.heartbeatCallback = alive
nl.mu.engines = engines
nl.mu.onSelfLive = opts.OnSelfLive
nl.mu.engines = opts.Engines
nl.mu.Unlock()

stopper.RunWorker(ctx, func(context.Context) {
opts.Stopper.RunWorker(ctx, func(context.Context) {
ambient := nl.ambientCtx
ambient.AddLogTag("liveness-hb", nil)
ctx, cancel := stopper.WithCancelOnStop(context.Background())
ctx, cancel := opts.Stopper.WithCancelOnStop(context.Background())
defer cancel()
ctx, sp := ambient.AnnotateCtxWithSpan(ctx, "liveness heartbeat loop")
defer sp.Finish()
Expand All @@ -655,7 +683,7 @@ func (nl *NodeLiveness) StartHeartbeat(
for {
select {
case <-nl.heartbeatToken:
case <-stopper.ShouldStop():
case <-opts.Stopper.ShouldStop():
return
}
// Give the context a timeout approximately as long as the time we
Expand Down Expand Up @@ -692,7 +720,7 @@ func (nl *NodeLiveness) StartHeartbeat(
nl.heartbeatToken <- struct{}{}
select {
case <-ticker.C:
case <-stopper.ShouldStop():
case <-opts.Stopper.ShouldStop():
return
}
}
Expand Down Expand Up @@ -726,7 +754,7 @@ func (nl *NodeLiveness) PauseHeartbeatLoopForTest() func() {
}

// PauseSynchronousHeartbeatsForTest disables all node liveness
// heartbeats triggered from outside the normal StartHeartbeat loop.
// heartbeats triggered from outside the normal Start loop.
// Returns a closure to call to re-enable synchronous heartbeats. Only
// safe for use in tests.
func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() {
Expand All @@ -739,7 +767,7 @@ func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() {
}

// PauseAllHeartbeatsForTest disables all node liveness heartbeats,
// including those triggered from outside the normal StartHeartbeat
// including those triggered from outside the normal Start
// loop. Returns a closure to call to re-enable heartbeats. Only safe
// for use in tests.
func (nl *NodeLiveness) PauseAllHeartbeatsForTest() func() {
Expand Down Expand Up @@ -769,7 +797,7 @@ var errNodeAlreadyLive = errors.New("node already live")
// TODO(bdarnell): Fix error semantics here.
//
// This method is rarely called directly; heartbeats are normally sent
// by the StartHeartbeat loop.
// by the Start loop.
// TODO(bdarnell): Should we just remove this synchronous heartbeat completely?
func (nl *NodeLiveness) Heartbeat(ctx context.Context, liveness kvserverpb.Liveness) error {
return nl.heartbeatInternal(ctx, liveness, false /* increment epoch */)
Expand Down Expand Up @@ -1113,7 +1141,7 @@ func (nl *NodeLiveness) Metrics() LivenessMetrics {
func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
nl.mu.Lock()
defer nl.mu.Unlock()
nl.mu.callbacks = append(nl.mu.callbacks, cb)
nl.mu.onIsLive = append(nl.mu.onIsLive, cb)
}

// updateLiveness does a conditional put on the node liveness record for the
Expand Down Expand Up @@ -1237,7 +1265,7 @@ func (nl *NodeLiveness) updateLivenessAttempt(
}

nl.mu.RLock()
cb := nl.mu.heartbeatCallback
cb := nl.mu.onSelfLive
nl.mu.RUnlock()
if cb != nil {
cb(ctx)
Expand All @@ -1261,10 +1289,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness
shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness)
}

var callbacks []IsLiveCallback
var onIsLive []IsLiveCallback
if shouldReplace {
nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec
callbacks = append(callbacks, nl.mu.callbacks...)
onIsLive = append(onIsLive, nl.mu.onIsLive...)
}
nl.mu.Unlock()

Expand All @@ -1274,10 +1302,13 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness

now := nl.clock.Now().GoTime()
if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) {
for _, fn := range callbacks {
for _, fn := range onIsLive {
fn(newLivenessRec.Liveness)
}
}
if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil {
nl.onNodeDecommissioned(newLivenessRec.Liveness)
}
}

// shouldReplaceLiveness checks to see if the new liveness is in fact newer
Expand Down
Loading

0 comments on commit cf805a5

Please sign in to comment.