From ced0fe8a81a724a91fc825c96d24af54f03df19e Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 29 Sep 2020 10:28:58 +0200 Subject: [PATCH 1/8] kvserver: group options to NodeLiveness The signature of NewNodeLiveness is now a single options struct. Release note: None --- pkg/kv/kvserver/client_test.go | 18 +++++---- pkg/kv/kvserver/node_liveness.go | 39 ++++++++++--------- pkg/server/server.go | 20 +++++----- .../localtestcluster/local_test_cluster.go | 20 +++++----- 4 files changed, 51 insertions(+), 46 deletions(-) diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 1942ff5e613c..eb167ca56d8b 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -906,10 +906,11 @@ func (m *multiTestContext) addStore(idx int) { ambient := log.AmbientContext{Tracer: cfg.Settings.Tracer} m.populateDB(idx, cfg.Settings, stopper) nlActive, nlRenewal := cfg.NodeLivenessDurations() - m.nodeLivenesses[idx] = kvserver.NewNodeLiveness( - ambient, m.clocks[idx], m.dbs[idx], m.gossips[idx], - nlActive, nlRenewal, cfg.Settings, metric.TestSampleInterval, - ) + m.nodeLivenesses[idx] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{ + AmbientCtx: ambient, Clock: m.clocks[idx], DB: m.dbs[idx], Gossip: m.gossips[idx], + LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings, + HistogramWindowInterval: metric.TestSampleInterval, + }) m.populateStorePool(idx, cfg, m.nodeLivenesses[idx]) cfg.DB = m.dbs[idx] cfg.NodeLiveness = m.nodeLivenesses[idx] @@ -1095,10 +1096,11 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) { cfg := m.makeStoreConfig(i) m.populateDB(i, m.storeConfig.Settings, stopper) nlActive, nlRenewal := cfg.NodeLivenessDurations() - m.nodeLivenesses[i] = kvserver.NewNodeLiveness( - log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}, m.clocks[i], m.dbs[i], - m.gossips[i], nlActive, nlRenewal, cfg.Settings, metric.TestSampleInterval, - ) + m.nodeLivenesses[i] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{ + AmbientCtx: log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}, Clock: m.clocks[i], DB: m.dbs[i], + Gossip: m.gossips[i], LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings, + HistogramWindowInterval: metric.TestSampleInterval, + }) m.populateStorePool(i, cfg, m.nodeLivenesses[i]) cfg.DB = m.dbs[i] cfg.NodeLiveness = m.nodeLivenesses[i] diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 670050169394..79b0e0c8cf73 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -235,27 +235,30 @@ type LivenessRecord struct { raw []byte } +// NoveLivenessOptions is the input to NewNodeLiveness. +type NodeLivenessOptions struct { + AmbientCtx log.AmbientContext + Settings *cluster.Settings + Gossip *gossip.Gossip + Clock *hlc.Clock + DB *kv.DB + LivenessThreshold time.Duration + RenewalDuration time.Duration + HistogramWindowInterval time.Duration +} + // NewNodeLiveness returns a new instance of NodeLiveness configured // with the specified gossip instance. -func NewNodeLiveness( - ambient log.AmbientContext, - clock *hlc.Clock, - db *kv.DB, - g *gossip.Gossip, - livenessThreshold time.Duration, - renewalDuration time.Duration, - st *cluster.Settings, - histogramWindow time.Duration, -) *NodeLiveness { +func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness { nl := &NodeLiveness{ - ambientCtx: ambient, - clock: clock, - db: db, - gossip: g, - livenessThreshold: livenessThreshold, - heartbeatInterval: livenessThreshold - renewalDuration, + ambientCtx: opts.AmbientCtx, + clock: opts.Clock, + db: opts.DB, + gossip: opts.Gossip, + livenessThreshold: opts.LivenessThreshold, + heartbeatInterval: opts.LivenessThreshold - opts.RenewalDuration, selfSem: make(chan struct{}, 1), - st: st, + st: opts.Settings, otherSem: make(chan struct{}, 1), heartbeatToken: make(chan struct{}, 1), } @@ -265,7 +268,7 @@ func NewNodeLiveness( HeartbeatSuccesses: metric.NewCounter(metaHeartbeatSuccesses), HeartbeatFailures: metric.NewCounter(metaHeartbeatFailures), EpochIncrements: metric.NewCounter(metaEpochIncrements), - HeartbeatLatency: metric.NewLatency(metaHeartbeatLatency, histogramWindow), + HeartbeatLatency: metric.NewLatency(metaHeartbeatLatency, opts.HistogramWindowInterval), } nl.mu.nodes = make(map[roachpb.NodeID]LivenessRecord) nl.heartbeatToken <- struct{}{} diff --git a/pkg/server/server.go b/pkg/server/server.go index 69f089afdd0c..bef8c26d45ed 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -379,16 +379,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { nlActive, nlRenewal := cfg.NodeLivenessDurations() - nodeLiveness := kvserver.NewNodeLiveness( - cfg.AmbientCtx, - clock, - db, - g, - nlActive, - nlRenewal, - st, - cfg.HistogramWindowInterval(), - ) + nodeLiveness := kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{ + AmbientCtx: cfg.AmbientCtx, + Clock: clock, + DB: db, + Gossip: g, + LivenessThreshold: nlActive, + RenewalDuration: nlRenewal, + Settings: st, + HistogramWindowInterval: cfg.HistogramWindowInterval(), + }) registry.AddMetricStruct(nodeLiveness.Metrics()) storePool := kvserver.NewStorePool( diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 258139bf321f..01234abd1fb5 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -162,16 +162,16 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto cfg.Gossip = ltc.Gossip cfg.HistogramWindowInterval = metric.TestSampleInterval active, renewal := cfg.NodeLivenessDurations() - cfg.NodeLiveness = kvserver.NewNodeLiveness( - cfg.AmbientCtx, - cfg.Clock, - cfg.DB, - cfg.Gossip, - active, - renewal, - cfg.Settings, - cfg.HistogramWindowInterval, - ) + cfg.NodeLiveness = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{ + AmbientCtx: cfg.AmbientCtx, + Clock: cfg.Clock, + DB: cfg.DB, + Gossip: cfg.Gossip, + LivenessThreshold: active, + RenewalDuration: renewal, + Settings: cfg.Settings, + HistogramWindowInterval: cfg.HistogramWindowInterval, + }) kvserver.TimeUntilStoreDead.Override(&cfg.Settings.SV, kvserver.TestTimeUntilStoreDead) cfg.StorePool = kvserver.NewStorePool( cfg.AmbientCtx, From 1671c7771570241aa56c50295e92f87dcc6adf88 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 29 Sep 2020 10:50:46 +0200 Subject: [PATCH 2/8] kvserver: rename Liveness.Start{Heartbeat,} The bulk of the options for node liveness will shift to the Start method which was renamed to make this a more idiomatic pattern. We want to pass the options at Start time because by that time the caller is usually fully set up to provide its callbacks, etc. At construction time, it is early in the server lifecycle and in particular the `storage.Engine`s are not set up yet. But we want to use them in an upcoming callback, which will be possible once the callbacks are added at Start() time. Release note: None --- pkg/kv/kvserver/client_test.go | 4 +-- pkg/kv/kvserver/node_liveness.go | 30 +++++++++---------- pkg/kv/kvserver/store_bootstrap.go | 4 +-- pkg/server/server.go | 2 +- .../localtestcluster/local_test_cluster.go | 2 +- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index eb167ca56d8b..1bd90b7ee850 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -1019,7 +1019,7 @@ func (m *multiTestContext) addStore(idx int) { m.t.Fatal(err) } } - m.nodeLivenesses[idx].StartHeartbeat(ctx, stopper, m.engines[idx:idx+1], func(ctx context.Context) { + m.nodeLivenesses[idx].Start(ctx, stopper, m.engines[idx:idx+1], func(ctx context.Context) { now := clock.Now() if err := store.WriteLastUpTimestamp(ctx, now); err != nil { log.Warningf(ctx, "%v", err) @@ -1117,7 +1117,7 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) { m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.DefaultClass).Reset() m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.SystemClass).Reset() m.mu.Unlock() - cfg.NodeLiveness.StartHeartbeat(ctx, stopper, m.engines[i:i+1], func(ctx context.Context) { + cfg.NodeLiveness.Start(ctx, stopper, m.engines[i:i+1], func(ctx context.Context) { now := m.clocks[i].Now() if err := store.WriteLastUpTimestamp(ctx, now); err != nil { log.Warningf(ctx, "%v", err) diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 79b0e0c8cf73..c6efd8bd4ad7 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -182,7 +182,7 @@ type NodeLiveness struct { mu struct { syncutil.RWMutex - callbacks []IsLiveCallback + onIsLive []IsLiveCallback // nodes is an in-memory cache of liveness records that NodeLiveness // knows about (having learnt of them through gossip or through KV). // It's a look-aside cache, and is accessed primarily through @@ -205,7 +205,7 @@ type NodeLiveness struct { // - Update the liveness record in KV // - Add the updated record into this cache (see `maybeUpdate`) // - // (See `StartHeartbeat` for an example of this pattern.) + // (See `Start` for an example of this pattern.) // // What we want instead is a bit simpler: // @@ -502,7 +502,7 @@ type livenessUpdate struct { // given node ID. This is typically used when adding a new node to a running // cluster, or when bootstrapping a cluster through a given node. // -// This is a pared down version of StartHeartbeat; it exists only to durably +// This is a pared down version of Start; it exists only to durably // persist a liveness to record the node's existence. Nodes will heartbeat their // records after starting up, and incrementing to epoch=1 when doing so, at // which point we'll set an appropriate expiration timestamp, gossip the @@ -622,12 +622,12 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { return liveness.IsLive(nl.clock.Now().GoTime()), nil } -// StartHeartbeat starts a periodic heartbeat to refresh this node's last +// Start starts a periodic heartbeat to refresh this node's last // heartbeat in the node liveness table. The optionally provided -// HeartbeatCallback will be invoked whenever this node updates its own -// liveness. The slice of engines will be written to before each heartbeat to -// avoid maintaining liveness in the presence of disk stalls. -func (nl *NodeLiveness) StartHeartbeat( +// HeartbeatCallback will be invoked whenever this node updates its +// own liveness. The slice of engines will be written to before each +// heartbeat to avoid maintaining liveness in the presence of disk stalls. +func (nl *NodeLiveness) Start( ctx context.Context, stopper *stop.Stopper, engines []storage.Engine, alive HeartbeatCallback, ) { log.VEventf(ctx, 1, "starting liveness heartbeat") @@ -729,7 +729,7 @@ func (nl *NodeLiveness) PauseHeartbeatLoopForTest() func() { } // PauseSynchronousHeartbeatsForTest disables all node liveness -// heartbeats triggered from outside the normal StartHeartbeat loop. +// heartbeats triggered from outside the normal Start loop. // Returns a closure to call to re-enable synchronous heartbeats. Only // safe for use in tests. func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() { @@ -742,7 +742,7 @@ func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() { } // PauseAllHeartbeatsForTest disables all node liveness heartbeats, -// including those triggered from outside the normal StartHeartbeat +// including those triggered from outside the normal Start // loop. Returns a closure to call to re-enable heartbeats. Only safe // for use in tests. func (nl *NodeLiveness) PauseAllHeartbeatsForTest() func() { @@ -772,7 +772,7 @@ var errNodeAlreadyLive = errors.New("node already live") // TODO(bdarnell): Fix error semantics here. // // This method is rarely called directly; heartbeats are normally sent -// by the StartHeartbeat loop. +// by the Start loop. // TODO(bdarnell): Should we just remove this synchronous heartbeat completely? func (nl *NodeLiveness) Heartbeat(ctx context.Context, liveness kvserverpb.Liveness) error { return nl.heartbeatInternal(ctx, liveness, false /* increment epoch */) @@ -1116,7 +1116,7 @@ func (nl *NodeLiveness) Metrics() LivenessMetrics { func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) { nl.mu.Lock() defer nl.mu.Unlock() - nl.mu.callbacks = append(nl.mu.callbacks, cb) + nl.mu.onIsLive = append(nl.mu.onIsLive, cb) } // updateLiveness does a conditional put on the node liveness record for the @@ -1264,10 +1264,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness) } - var callbacks []IsLiveCallback + var onIsLive []IsLiveCallback if shouldReplace { nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec - callbacks = append(callbacks, nl.mu.callbacks...) + onIsLive = append(onIsLive, nl.mu.onIsLive...) } nl.mu.Unlock() @@ -1277,7 +1277,7 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness now := nl.clock.Now().GoTime() if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) { - for _, fn := range callbacks { + for _, fn := range onIsLive { fn(newLivenessRec.Liveness) } } diff --git a/pkg/kv/kvserver/store_bootstrap.go b/pkg/kv/kvserver/store_bootstrap.go index facd19a247aa..a9a09e8ffed5 100644 --- a/pkg/kv/kvserver/store_bootstrap.go +++ b/pkg/kv/kvserver/store_bootstrap.go @@ -114,8 +114,8 @@ func WriteInitialClusterData( // We start off at epoch=0; when nodes heartbeat their liveness records for // the first time it'll get incremented to epoch=1 [2]. // - // [1]: See `CreateLivenessRecord` and usages for where that happens. - // [2]: See `StartHeartbeat` for where that happens. + // [1]: See `(*NodeLiveness).CreateLivenessRecord` and usages for where that happens. + // [2]: See `(*NodeLiveness).Start` for where that happens. livenessRecord := kvserverpb.Liveness{NodeID: 1, Epoch: 0} if err := livenessVal.SetProto(&livenessRecord); err != nil { return err diff --git a/pkg/server/server.go b/pkg/server/server.go index bef8c26d45ed..8621ea923271 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1581,7 +1581,7 @@ func (s *Server) PreStart(ctx context.Context) error { // Begin the node liveness heartbeat. Add a callback which records the local // store "last up" timestamp for every store whenever the liveness record is // updated. - s.nodeLiveness.StartHeartbeat(ctx, s.stopper, s.engines, func(ctx context.Context) { + s.nodeLiveness.Start(ctx, s.stopper, s.engines, func(ctx context.Context) { now := s.clock.Now() if err := s.node.stores.VisitStores(func(s *kvserver.Store) error { return s.WriteLastUpTimestamp(ctx, now) diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 01234abd1fb5..c72b288991e2 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -229,7 +229,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto } if !ltc.DisableLivenessHeartbeat { - cfg.NodeLiveness.StartHeartbeat(ctx, ltc.stopper, []storage.Engine{ltc.Eng}, nil /* alive */) + cfg.NodeLiveness.Start(ctx, ltc.stopper, []storage.Engine{ltc.Eng}, nil /* alive */) } if err := ltc.Store.Start(ctx, ltc.stopper); err != nil { From 150689e43074ed57e90808b398c292d1ef94c6fb Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 29 Sep 2020 11:05:41 +0200 Subject: [PATCH 3/8] kvserver: give options struct treatment to NodeLiveness.Start We'll want to pass more parameters to `Start()`, so pull them out into an options struct similar to how we did it for the constructor a few commits prior. Release note: None --- pkg/kv/kvserver/client_test.go | 36 ++++++++++------ pkg/kv/kvserver/node_liveness.go | 43 ++++++++++++------- pkg/server/server.go | 18 +++++--- .../localtestcluster/local_test_cluster.go | 3 +- 4 files changed, 63 insertions(+), 37 deletions(-) diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 1bd90b7ee850..1758d26a686b 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -1019,15 +1019,19 @@ func (m *multiTestContext) addStore(idx int) { m.t.Fatal(err) } } - m.nodeLivenesses[idx].Start(ctx, stopper, m.engines[idx:idx+1], func(ctx context.Context) { - now := clock.Now() - if err := store.WriteLastUpTimestamp(ctx, now); err != nil { - log.Warningf(ctx, "%v", err) - } - ran.Do(func() { - close(ran.ch) - }) - }) + m.nodeLivenesses[idx].Start(ctx, + kvserver.NodeLivenessStartOptions{ + Stopper: stopper, + Engines: m.engines[idx : idx+1], + OnSelfLive: func(ctx context.Context) { + now := clock.Now() + if err := store.WriteLastUpTimestamp(ctx, now); err != nil { + log.Warningf(ctx, "%v", err) + } + ran.Do(func() { + close(ran.ch) + }) + }}) store.WaitForInit() @@ -1117,11 +1121,15 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) { m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.DefaultClass).Reset() m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.SystemClass).Reset() m.mu.Unlock() - cfg.NodeLiveness.Start(ctx, stopper, m.engines[i:i+1], func(ctx context.Context) { - now := m.clocks[i].Now() - if err := store.WriteLastUpTimestamp(ctx, now); err != nil { - log.Warningf(ctx, "%v", err) - } + cfg.NodeLiveness.Start(ctx, kvserver.NodeLivenessStartOptions{ + Stopper: stopper, + Engines: m.engines[i : i+1], + OnSelfLive: func(ctx context.Context) { + now := m.clocks[i].Now() + if err := store.WriteLastUpTimestamp(ctx, now); err != nil { + log.Warningf(ctx, "%v", err) + } + }, }) } diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index c6efd8bd4ad7..a2f950136495 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -216,8 +216,8 @@ type NodeLiveness struct { // // More concretely, we want `getLivenessRecordFromKV` to be tucked away // within `getLivenessLocked`. - nodes map[roachpb.NodeID]LivenessRecord - heartbeatCallback HeartbeatCallback + nodes map[roachpb.NodeID]LivenessRecord + onSelfLive HeartbeatCallback // Before heartbeating, we write to each of these engines to avoid // maintaining liveness when a local disks is stalled. engines []storage.Engine @@ -235,7 +235,13 @@ type LivenessRecord struct { raw []byte } -// NoveLivenessOptions is the input to NewNodeLiveness. +// NodeLivenessOptions is the input to NewNodeLiveness. +// +// Note that there is yet another struct, NodeLivenessStartOptions, which +// is supplied when the instance is started. This is necessary as during +// server startup, some inputs can only be constructed at Start time. The +// separation has grown organically and various options could in principle +// be moved back and forth. type NodeLivenessOptions struct { AmbientCtx log.AmbientContext Settings *cluster.Settings @@ -622,32 +628,39 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { return liveness.IsLive(nl.clock.Now().GoTime()), nil } +// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`. +type NodeLivenessStartOptions struct { + Stopper *stop.Stopper + Engines []storage.Engine + // OnSelfLive is invoked after every successful heartbeat + // of the local liveness instance's heartbeat loop. + OnSelfLive HeartbeatCallback +} + // Start starts a periodic heartbeat to refresh this node's last // heartbeat in the node liveness table. The optionally provided // HeartbeatCallback will be invoked whenever this node updates its // own liveness. The slice of engines will be written to before each // heartbeat to avoid maintaining liveness in the presence of disk stalls. -func (nl *NodeLiveness) Start( - ctx context.Context, stopper *stop.Stopper, engines []storage.Engine, alive HeartbeatCallback, -) { +func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions) { log.VEventf(ctx, 1, "starting liveness heartbeat") retryOpts := base.DefaultRetryOptions() - retryOpts.Closer = stopper.ShouldQuiesce() + retryOpts.Closer = opts.Stopper.ShouldQuiesce() - if len(engines) == 0 { + if len(opts.Engines) == 0 { // Avoid silently forgetting to pass the engines. It happened before. log.Fatalf(ctx, "must supply at least one engine") } nl.mu.Lock() - nl.mu.heartbeatCallback = alive - nl.mu.engines = engines + nl.mu.onSelfLive = opts.OnSelfLive + nl.mu.engines = opts.Engines nl.mu.Unlock() - stopper.RunWorker(ctx, func(context.Context) { + opts.Stopper.RunWorker(ctx, func(context.Context) { ambient := nl.ambientCtx ambient.AddLogTag("liveness-hb", nil) - ctx, cancel := stopper.WithCancelOnStop(context.Background()) + ctx, cancel := opts.Stopper.WithCancelOnStop(context.Background()) defer cancel() ctx, sp := ambient.AnnotateCtxWithSpan(ctx, "liveness heartbeat loop") defer sp.Finish() @@ -658,7 +671,7 @@ func (nl *NodeLiveness) Start( for { select { case <-nl.heartbeatToken: - case <-stopper.ShouldStop(): + case <-opts.Stopper.ShouldStop(): return } // Give the context a timeout approximately as long as the time we @@ -695,7 +708,7 @@ func (nl *NodeLiveness) Start( nl.heartbeatToken <- struct{}{} select { case <-ticker.C: - case <-stopper.ShouldStop(): + case <-opts.Stopper.ShouldStop(): return } } @@ -1240,7 +1253,7 @@ func (nl *NodeLiveness) updateLivenessAttempt( } nl.mu.RLock() - cb := nl.mu.heartbeatCallback + cb := nl.mu.onSelfLive nl.mu.RUnlock() if cb != nil { cb(ctx) diff --git a/pkg/server/server.go b/pkg/server/server.go index 8621ea923271..d5a8b640c6d6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1581,13 +1581,17 @@ func (s *Server) PreStart(ctx context.Context) error { // Begin the node liveness heartbeat. Add a callback which records the local // store "last up" timestamp for every store whenever the liveness record is // updated. - s.nodeLiveness.Start(ctx, s.stopper, s.engines, func(ctx context.Context) { - now := s.clock.Now() - if err := s.node.stores.VisitStores(func(s *kvserver.Store) error { - return s.WriteLastUpTimestamp(ctx, now) - }); err != nil { - log.Warningf(ctx, "writing last up timestamp: %v", err) - } + s.nodeLiveness.Start(ctx, kvserver.NodeLivenessStartOptions{ + Stopper: s.stopper, + Engines: s.engines, + OnSelfLive: func(ctx context.Context) { + now := s.clock.Now() + if err := s.node.stores.VisitStores(func(s *kvserver.Store) error { + return s.WriteLastUpTimestamp(ctx, now) + }); err != nil { + log.Warningf(ctx, "writing last up timestamp: %v", err) + } + }, }) // Begin recording status summaries. diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index c72b288991e2..ac742c457057 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -229,7 +229,8 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto } if !ltc.DisableLivenessHeartbeat { - cfg.NodeLiveness.Start(ctx, ltc.stopper, []storage.Engine{ltc.Eng}, nil /* alive */) + cfg.NodeLiveness.Start(ctx, + kvserver.NodeLivenessStartOptions{Stopper: ltc.stopper, Engines: []storage.Engine{ltc.Eng}}) } if err := ltc.Store.Start(ctx, ltc.stopper); err != nil { From 41946c53cf582a9bcf7a37a100c6de58aaa59f7d Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 7 Oct 2020 10:24:38 +0200 Subject: [PATCH 4/8] server: create engines in NewServer We were jumping through a number of hoops to create the engines only in `(*Server).Start` since that seems to be the "idiomatic" place to start moving parts. However, it creates a lot of complexity since various callbacks have to be registered with access to engines. Move engine creation to `NewServer`. This unblocks #54936. Release note: None --- pkg/server/server.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index d5a8b640c6d6..899466606f95 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -123,6 +123,7 @@ type Server struct { mux http.ServeMux clock *hlc.Clock rpcContext *rpc.Context + engines Engines // The gRPC server on which the different RPC handlers will be registered. grpc *grpcServer gossip *gossip.Gossip @@ -158,9 +159,7 @@ type Server struct { externalStorageBuilder *externalStorageBuilder // The following fields are populated at start time, i.e. in `(*Server).Start`. - startTime time.Time - engines Engines } // externalStorageBuilder is a wrapper around the ExternalStorage factory @@ -597,12 +596,19 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { debugServer := debug.NewServer(st, sqlServer.pgServer.HBADebugFn()) node.InitLogger(sqlServer.execCfg) + engines, err := cfg.CreateEngines(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to create engines") + } + stopper.AddCloser(&engines) + *lateBoundServer = Server{ nodeIDContainer: nodeIDContainer, cfg: cfg, st: st, clock: clock, rpcContext: rpcContext, + engines: engines, grpc: grpcServer, gossip: g, nodeDialer: nodeDialer, @@ -1048,12 +1054,6 @@ func (s *Server) PreStart(ctx context.Context) error { return err } - s.engines, err = s.cfg.CreateEngines(ctx) - if err != nil { - return errors.Wrap(err, "failed to create engines") - } - s.stopper.AddCloser(&s.engines) - // Initialize the external storage builders configuration params now that the // engines have been created. The object can be used to create ExternalStorage // objects hereafter. From 68c765bf6f42f92b6264c70d5c941b0b931244c4 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 29 Sep 2020 11:32:12 +0200 Subject: [PATCH 5/8] kvserver: add Nodeliveness.StartOptions.OnNodeDecommissionedCallback This callback is invoked whenever a node is permanently marked as removed from the cluster. We use it in `Server.Start`, though only to call the optionally provided testing knob to establish a unit test. The plan is to hook this up to a component that keeps track of the decommissioned nodes across restarts, to provide best-effort (but tight) guarantees that these nodes won't be able to reconnect to the cluster. This is required to uphold the invariants for long- running migrations. Release note: None --- pkg/kv/kvserver/node_liveness.go | 49 +++++++++++++------- pkg/kv/kvserver/node_liveness_test.go | 64 +++++++++++++++++++++++++++ pkg/server/server.go | 5 +++ pkg/server/testing_knobs.go | 4 ++ 4 files changed, 105 insertions(+), 17 deletions(-) diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index a2f950136495..a75d872b9f5f 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -176,13 +176,14 @@ type NodeLiveness struct { // heartbeatPaused contains an atomically-swapped number representing a bool // (1 or 0). heartbeatToken is a channel containing a token which is taken // when heartbeating or when pausing the heartbeat. Used for testing. - heartbeatPaused uint32 - heartbeatToken chan struct{} - metrics LivenessMetrics + heartbeatPaused uint32 + heartbeatToken chan struct{} + metrics LivenessMetrics + onNodeDecommissioned func(kvserverpb.Liveness) // noop if nil mu struct { syncutil.RWMutex - onIsLive []IsLiveCallback + onIsLive []IsLiveCallback // see NodeLivenessOptions.OnSelfLive // nodes is an in-memory cache of liveness records that NodeLiveness // knows about (having learnt of them through gossip or through KV). // It's a look-aside cache, and is accessed primarily through @@ -217,10 +218,10 @@ type NodeLiveness struct { // More concretely, we want `getLivenessRecordFromKV` to be tucked away // within `getLivenessLocked`. nodes map[roachpb.NodeID]LivenessRecord - onSelfLive HeartbeatCallback + onSelfLive HeartbeatCallback // set in Start() // Before heartbeating, we write to each of these engines to avoid // maintaining liveness when a local disks is stalled. - engines []storage.Engine + engines []storage.Engine // set in Start() } } @@ -251,22 +252,28 @@ type NodeLivenessOptions struct { LivenessThreshold time.Duration RenewalDuration time.Duration HistogramWindowInterval time.Duration + // OnNodeDecommissioned is invoked whenever the instance learns that a + // node was permanently removed from the cluster. This method must be + // idempotent as it may be invoked multiple times and defaults to a + // noop. + OnNodeDecommissioned func(kvserverpb.Liveness) } // NewNodeLiveness returns a new instance of NodeLiveness configured // with the specified gossip instance. func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness { nl := &NodeLiveness{ - ambientCtx: opts.AmbientCtx, - clock: opts.Clock, - db: opts.DB, - gossip: opts.Gossip, - livenessThreshold: opts.LivenessThreshold, - heartbeatInterval: opts.LivenessThreshold - opts.RenewalDuration, - selfSem: make(chan struct{}, 1), - st: opts.Settings, - otherSem: make(chan struct{}, 1), - heartbeatToken: make(chan struct{}, 1), + ambientCtx: opts.AmbientCtx, + clock: opts.Clock, + db: opts.DB, + gossip: opts.Gossip, + livenessThreshold: opts.LivenessThreshold, + heartbeatInterval: opts.LivenessThreshold - opts.RenewalDuration, + selfSem: make(chan struct{}, 1), + st: opts.Settings, + otherSem: make(chan struct{}, 1), + heartbeatToken: make(chan struct{}, 1), + onNodeDecommissioned: opts.OnNodeDecommissioned, } nl.metrics = LivenessMetrics{ LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes), @@ -279,6 +286,11 @@ func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness { nl.mu.nodes = make(map[roachpb.NodeID]LivenessRecord) nl.heartbeatToken <- struct{}{} + // NB: we should consider moving this registration to .Start() once we + // have ensured that nobody uses the server's KV client (kv.DB) before + // nl.Start() is invoked. At the time of writing this invariant does + // not hold (which is a problem, since the node itself won't be live + // at this point, and requests routed to it will hang). livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix) nl.gossip.RegisterCallback(livenessRegex, nl.livenessGossipUpdate) @@ -643,7 +655,7 @@ type NodeLivenessStartOptions struct { // own liveness. The slice of engines will be written to before each // heartbeat to avoid maintaining liveness in the presence of disk stalls. func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions) { - log.VEventf(ctx, 1, "starting liveness heartbeat") + log.VEventf(ctx, 1, "starting node liveness instance") retryOpts := base.DefaultRetryOptions() retryOpts.Closer = opts.Stopper.ShouldQuiesce() @@ -1294,6 +1306,9 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness fn(newLivenessRec.Liveness) } } + if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil { + nl.onNodeDecommissioned(newLivenessRec.Liveness) + } } // shouldReplaceLiveness checks to see if the new liveness is in fact newer diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index 5c94867097e0..0ea368497c94 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -15,6 +15,7 @@ import ( "fmt" "reflect" "sort" + "strings" "sync/atomic" "testing" "time" @@ -41,6 +42,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/proto" + "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -1190,3 +1192,65 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) { t.Fatal("no change committed") } } + +func TestNodeLivenessDecommissionedCallback(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + var cb struct { + syncutil.Mutex + m map[roachpb.NodeID]bool // id -> decommissioned + } + + tArgs := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + OnDecommissionedCallback: func(rec kvserverpb.Liveness) { + cb.Lock() + if cb.m == nil { + cb.m = map[roachpb.NodeID]bool{} + } + cb.m[rec.NodeID] = rec.Membership == kvserverpb.MembershipStatus_DECOMMISSIONED + cb.Unlock() + + }, + }, + }, + } + args := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // for speed + ServerArgs: tArgs, + } + tc := testcluster.NewTestCluster(t, 3, args) + tc.Start(t) + defer tc.Stopper().Stop(ctx) + + nl1 := tc.Servers[0].NodeLiveness().(*kvserver.NodeLiveness) + + // Make sure the callback doesn't fire willy-nilly... + func() { + chg, err := nl1.SetMembershipStatus(ctx, 2, kvserverpb.MembershipStatus_DECOMMISSIONING) + require.NoError(t, err) + require.True(t, chg) + cb.Lock() + defer cb.Unlock() + require.Zero(t, cb.m) + }() + + // ... but only when a node actually gets decommissioned. + { + chg, err := nl1.SetMembershipStatus(ctx, 2, kvserverpb.MembershipStatus_DECOMMISSIONED) + require.NoError(t, err) + require.True(t, chg) + testutils.SucceedsSoon(t, func() error { + cb.Lock() + sl := pretty.Diff(map[roachpb.NodeID]bool{2: true}, cb.m) + cb.Unlock() + if len(sl) > 0 { + return errors.Errorf("diff(exp,act) = %s", strings.Join(sl, "\n")) + } + return nil + }) + + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 899466606f95..cde7e0d954c3 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -387,6 +387,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { RenewalDuration: nlRenewal, Settings: st, HistogramWindowInterval: cfg.HistogramWindowInterval(), + OnNodeDecommissioned: func(liveness kvserverpb.Liveness) { + if knobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs.OnDecommissionedCallback != nil { + knobs.OnDecommissionedCallback(liveness) + } + }, }) registry.AddMetricStruct(nodeLiveness.Metrics()) diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index 29001ae95b89..a34b7b0a38ae 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -14,6 +14,7 @@ import ( "net" "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/diagnosticspb" @@ -73,6 +74,9 @@ type TestingKnobs struct { // TODO(irfansharif): Update users of this testing knob to use the // appropriate clusterversion.Handle instead. BinaryVersionOverride roachpb.Version + // An (additional) callback invoked whenever a + // node is permanently removed from the cluster. + OnDecommissionedCallback func(kvserverpb.Liveness) } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. From 9e54089be0138f3b74936ef580a49a88b8828c96 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 29 Sep 2020 11:54:19 +0200 Subject: [PATCH 6/8] rpc: rename some PingRequest variables for clarity Make it obvious which ones apply to the origin (the maker of the PingRequest) and which ones to the destination (the receiver of PingRequest). We are going to add `origin_node_id` here separately later. Release note: None --- pkg/rpc/context.go | 10 +-- pkg/rpc/heartbeat.go | 10 +-- pkg/rpc/heartbeat.pb.go | 125 +++++++++++++++++++------------------- pkg/rpc/heartbeat.proto | 10 +-- pkg/rpc/heartbeat_test.go | 10 +-- 5 files changed, 83 insertions(+), 82 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index e2c637c899c6..dd5febcba05b 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -1133,11 +1133,11 @@ func (ctx *Context) runHeartbeat( // We re-mint the PingRequest to pick up any asynchronous update to clusterID. clusterID := ctx.ClusterID.Get() request := &PingRequest{ - Addr: ctx.Config.Addr, - MaxOffsetNanos: maxOffsetNanos, - ClusterID: &clusterID, - NodeID: conn.remoteNodeID, - ServerVersion: ctx.Settings.Version.BinaryVersion(), + OriginAddr: ctx.Config.Addr, + OriginMaxOffsetNanos: maxOffsetNanos, + ClusterID: &clusterID, + TargetNodeID: conn.remoteNodeID, + ServerVersion: ctx.Settings.Version.BinaryVersion(), } var response *PingResponse diff --git a/pkg/rpc/heartbeat.go b/pkg/rpc/heartbeat.go index 90532161af07..8d8c6e1c0335 100644 --- a/pkg/rpc/heartbeat.go +++ b/pkg/rpc/heartbeat.go @@ -123,7 +123,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR if hs.nodeID != nil { nodeID = hs.nodeID.Get() } - if args.NodeID != 0 && (!hs.testingAllowNamedRPCToAnonymousServer || nodeID != 0) && args.NodeID != nodeID { + if args.TargetNodeID != 0 && (!hs.testingAllowNamedRPCToAnonymousServer || nodeID != 0) && args.TargetNodeID != nodeID { // If nodeID != 0, the situation is clear (we are checking that // the other side is talking to the right node). // @@ -133,7 +133,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR // however we can still serve connections that don't need a node // ID, e.g. during initial gossip. return nil, errors.Errorf( - "client requested node ID %d doesn't match server node ID %d", args.NodeID, nodeID) + "client requested node ID %d doesn't match server node ID %d", args.TargetNodeID, nodeID) } // Check version compatibility. @@ -146,16 +146,16 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR // This check is ignored if either offset is set to 0 (for unittests). // Note that we validated this connection already. Different clusters // could very well have different max offsets. - mo, amo := hs.clock.MaxOffset(), time.Duration(args.MaxOffsetNanos) + mo, amo := hs.clock.MaxOffset(), time.Duration(args.OriginMaxOffsetNanos) if mo != 0 && amo != 0 && mo != amo { panic(fmt.Sprintf("locally configured maximum clock offset (%s) "+ - "does not match that of node %s (%s)", mo, args.Addr, amo)) + "does not match that of node %s (%s)", mo, args.OriginAddr, amo)) } serverOffset := args.Offset // The server offset should be the opposite of the client offset. serverOffset.Offset = -serverOffset.Offset - hs.remoteClockMonitor.UpdateOffset(ctx, args.Addr, serverOffset, 0 /* roundTripLatency */) + hs.remoteClockMonitor.UpdateOffset(ctx, args.OriginAddr, serverOffset, 0 /* roundTripLatency */) return &PingResponse{ Pong: args.Ping, ServerTime: hs.clock.PhysicalNow(), diff --git a/pkg/rpc/heartbeat.pb.go b/pkg/rpc/heartbeat.pb.go index 44f6468102da..792627f37df5 100644 --- a/pkg/rpc/heartbeat.pb.go +++ b/pkg/rpc/heartbeat.pb.go @@ -49,7 +49,7 @@ type RemoteOffset struct { func (m *RemoteOffset) Reset() { *m = RemoteOffset{} } func (*RemoteOffset) ProtoMessage() {} func (*RemoteOffset) Descriptor() ([]byte, []int) { - return fileDescriptor_heartbeat_b9adbf29944dc273, []int{0} + return fileDescriptor_heartbeat_c48a0290adb87462, []int{0} } func (m *RemoteOffset) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -82,21 +82,21 @@ type PingRequest struct { // The last offset the client measured with the server. Offset RemoteOffset `protobuf:"bytes,2,opt,name=offset" json:"offset"` // The address of the client. - Addr string `protobuf:"bytes,3,opt,name=addr" json:"addr"` + OriginAddr string `protobuf:"bytes,3,opt,name=origin_addr,json=originAddr" json:"origin_addr"` // The configured maximum clock offset (in nanoseconds) on the server. - MaxOffsetNanos int64 `protobuf:"varint,4,opt,name=max_offset_nanos,json=maxOffsetNanos" json:"max_offset_nanos"` + OriginMaxOffsetNanos int64 `protobuf:"varint,4,opt,name=origin_max_offset_nanos,json=originMaxOffsetNanos" json:"origin_max_offset_nanos"` // Cluster ID to prevent connections between nodes in different clusters. - ClusterID *github_com_cockroachdb_cockroach_pkg_util_uuid.UUID `protobuf:"bytes,5,opt,name=cluster_id,json=clusterId,customtype=github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" json:"cluster_id,omitempty"` + ClusterID *github_com_cockroachdb_cockroach_pkg_util_uuid.UUID `protobuf:"bytes,5,opt,name=origin_cluster_id,json=originClusterId,customtype=github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" json:"origin_cluster_id,omitempty"` ServerVersion roachpb.Version `protobuf:"bytes,6,opt,name=server_version,json=serverVersion" json:"server_version"` // Node ID to prevent connections from being misrouted to an invalid node inside the cluster. - NodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,7,opt,name=node_id,json=nodeId,customtype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"node_id"` + TargetNodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,7,opt,name=target_node_id,json=targetNodeId,customtype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"target_node_id"` } func (m *PingRequest) Reset() { *m = PingRequest{} } func (m *PingRequest) String() string { return proto.CompactTextString(m) } func (*PingRequest) ProtoMessage() {} func (*PingRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_heartbeat_b9adbf29944dc273, []int{1} + return fileDescriptor_heartbeat_c48a0290adb87462, []int{1} } func (m *PingRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -137,7 +137,7 @@ func (m *PingResponse) Reset() { *m = PingResponse{} } func (m *PingResponse) String() string { return proto.CompactTextString(m) } func (*PingResponse) ProtoMessage() {} func (*PingResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_heartbeat_b9adbf29944dc273, []int{2} + return fileDescriptor_heartbeat_c48a0290adb87462, []int{2} } func (m *PingResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -392,11 +392,11 @@ func (m *PingRequest) MarshalTo(dAtA []byte) (int, error) { i += n1 dAtA[i] = 0x1a i++ - i = encodeVarintHeartbeat(dAtA, i, uint64(len(m.Addr))) - i += copy(dAtA[i:], m.Addr) + i = encodeVarintHeartbeat(dAtA, i, uint64(len(m.OriginAddr))) + i += copy(dAtA[i:], m.OriginAddr) dAtA[i] = 0x20 i++ - i = encodeVarintHeartbeat(dAtA, i, uint64(m.MaxOffsetNanos)) + i = encodeVarintHeartbeat(dAtA, i, uint64(m.OriginMaxOffsetNanos)) if m.ClusterID != nil { dAtA[i] = 0x2a i++ @@ -417,7 +417,7 @@ func (m *PingRequest) MarshalTo(dAtA []byte) (int, error) { i += n3 dAtA[i] = 0x38 i++ - i = encodeVarintHeartbeat(dAtA, i, uint64(m.NodeID)) + i = encodeVarintHeartbeat(dAtA, i, uint64(m.TargetNodeID)) return i, nil } @@ -497,16 +497,16 @@ func (m *PingRequest) Size() (n int) { n += 1 + l + sovHeartbeat(uint64(l)) l = m.Offset.Size() n += 1 + l + sovHeartbeat(uint64(l)) - l = len(m.Addr) + l = len(m.OriginAddr) n += 1 + l + sovHeartbeat(uint64(l)) - n += 1 + sovHeartbeat(uint64(m.MaxOffsetNanos)) + n += 1 + sovHeartbeat(uint64(m.OriginMaxOffsetNanos)) if m.ClusterID != nil { l = m.ClusterID.Size() n += 1 + l + sovHeartbeat(uint64(l)) } l = m.ServerVersion.Size() n += 1 + l + sovHeartbeat(uint64(l)) - n += 1 + sovHeartbeat(uint64(m.NodeID)) + n += 1 + sovHeartbeat(uint64(m.TargetNodeID)) return n } @@ -737,7 +737,7 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 3: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Addr", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field OriginAddr", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -762,13 +762,13 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Addr = string(dAtA[iNdEx:postIndex]) + m.OriginAddr = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 4: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field MaxOffsetNanos", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field OriginMaxOffsetNanos", wireType) } - m.MaxOffsetNanos = 0 + m.OriginMaxOffsetNanos = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowHeartbeat @@ -778,7 +778,7 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.MaxOffsetNanos |= (int64(b) & 0x7F) << shift + m.OriginMaxOffsetNanos |= (int64(b) & 0x7F) << shift if b < 0x80 { break } @@ -847,9 +847,9 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 7: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field TargetNodeID", wireType) } - m.NodeID = 0 + m.TargetNodeID = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowHeartbeat @@ -859,7 +859,7 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.NodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift + m.TargetNodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift if b < 0x80 { break } @@ -1167,46 +1167,47 @@ var ( ErrIntOverflowHeartbeat = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("rpc/heartbeat.proto", fileDescriptor_heartbeat_b9adbf29944dc273) } +func init() { proto.RegisterFile("rpc/heartbeat.proto", fileDescriptor_heartbeat_c48a0290adb87462) } -var fileDescriptor_heartbeat_b9adbf29944dc273 = []byte{ - // 594 bytes of a gzipped FileDescriptorProto +var fileDescriptor_heartbeat_c48a0290adb87462 = []byte{ + // 616 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x41, 0x4f, 0xd4, 0x40, - 0x14, 0x6e, 0xd9, 0xb2, 0xb8, 0xb3, 0x0b, 0x31, 0xa3, 0x21, 0xcd, 0x62, 0xba, 0xb8, 0x09, 0xba, - 0xa7, 0xd6, 0xe0, 0x49, 0x6f, 0x2c, 0x18, 0xdd, 0x98, 0x2c, 0x66, 0x05, 0x0e, 0x1e, 0x6c, 0x66, - 0x3b, 0x8f, 0x32, 0x81, 0xce, 0xd4, 0xe9, 0x94, 0xe0, 0xd1, 0x7f, 0x60, 0x3c, 0x79, 0xf4, 0x77, - 0xf8, 0x0b, 0x38, 0x72, 0x24, 0x1e, 0x88, 0x2e, 0x7f, 0xc4, 0x4c, 0xa7, 0xbb, 0x14, 0xe4, 0x60, - 0xb8, 0xbd, 0xbe, 0xf7, 0xbd, 0xf7, 0xbe, 0x6f, 0xbe, 0x57, 0xf4, 0x40, 0xa6, 0x51, 0x70, 0x00, - 0x44, 0xaa, 0x31, 0x10, 0xe5, 0xa7, 0x52, 0x28, 0x81, 0x17, 0x23, 0x11, 0x1d, 0x4a, 0x41, 0xa2, - 0x03, 0x5f, 0xa6, 0x51, 0x7b, 0xb9, 0x08, 0xd3, 0x71, 0x90, 0x80, 0x22, 0x94, 0x28, 0x62, 0x60, - 0xed, 0x87, 0xb1, 0x88, 0x45, 0x11, 0x06, 0x3a, 0x32, 0xd9, 0xee, 0x17, 0x1b, 0xb5, 0x46, 0x90, - 0x08, 0x05, 0xdb, 0xfb, 0xfb, 0x19, 0x28, 0xfc, 0x08, 0xd5, 0x45, 0x11, 0xb9, 0xf6, 0xaa, 0xdd, - 0xab, 0xf5, 0x9d, 0xd3, 0x8b, 0x8e, 0x35, 0x2a, 0x73, 0xf8, 0x09, 0x6a, 0xe6, 0x3c, 0x02, 0xa9, - 0x08, 0xe3, 0xea, 0xb3, 0x3b, 0x57, 0x81, 0x54, 0x0b, 0x78, 0x0d, 0x35, 0x13, 0x20, 0x59, 0x2e, - 0x81, 0x86, 0x44, 0xb9, 0xb5, 0x0a, 0x0e, 0x4d, 0x0b, 0x1b, 0xea, 0xa5, 0xf3, 0xfd, 0x47, 0xc7, - 0xea, 0xfe, 0xac, 0xa1, 0xe6, 0x3b, 0xc6, 0xe3, 0x11, 0x7c, 0xca, 0x21, 0x53, 0xd8, 0x45, 0x4e, - 0xca, 0x78, 0x5c, 0x10, 0x68, 0x94, 0x5d, 0x45, 0x06, 0xbf, 0x98, 0x91, 0xd3, 0x9b, 0x9b, 0xeb, - 0x2b, 0xfe, 0x35, 0xed, 0x7e, 0x55, 0xc9, 0x0d, 0xe6, 0x2e, 0x72, 0x08, 0xa5, 0xb2, 0xa0, 0x32, - 0x1b, 0xaa, 0x33, 0xd8, 0x47, 0xf7, 0x13, 0x72, 0x12, 0x1a, 0x5c, 0xc8, 0x09, 0x17, 0x99, 0xeb, - 0x54, 0x08, 0x2f, 0x25, 0xe4, 0xc4, 0x8c, 0x1c, 0xea, 0x1a, 0x8e, 0x10, 0x8a, 0x8e, 0xf2, 0x4c, - 0x81, 0x0c, 0x19, 0x75, 0xe7, 0x57, 0xed, 0x5e, 0xab, 0xbf, 0xf5, 0xeb, 0xa2, 0xf3, 0x3c, 0x66, - 0xea, 0x20, 0x1f, 0xfb, 0x91, 0x48, 0x82, 0x19, 0x2d, 0x3a, 0xbe, 0x8a, 0x83, 0xf4, 0x30, 0x0e, - 0x72, 0xc5, 0x8e, 0x82, 0x3c, 0x67, 0xd4, 0xdf, 0xdd, 0x1d, 0x6c, 0x4d, 0x2e, 0x3a, 0x8d, 0x4d, - 0x33, 0x6c, 0xb0, 0x35, 0x6a, 0x94, 0x73, 0x07, 0x14, 0xbf, 0x46, 0x4b, 0x19, 0xc8, 0x63, 0x90, - 0xe1, 0x31, 0xc8, 0x8c, 0x09, 0xee, 0xd6, 0x0b, 0xc5, 0xed, 0xaa, 0x62, 0x63, 0xb4, 0xbf, 0x67, - 0x10, 0x25, 0xdd, 0x45, 0xd3, 0x57, 0x26, 0xf1, 0x47, 0xb4, 0xc0, 0x05, 0x05, 0x4d, 0x75, 0x61, - 0xd5, 0xee, 0xcd, 0xf7, 0x5f, 0x69, 0xd4, 0x7f, 0xd3, 0x9d, 0xee, 0x18, 0x0a, 0x0a, 0x05, 0xdd, - 0xba, 0x89, 0x46, 0x75, 0x3d, 0x75, 0x40, 0xbb, 0xdf, 0xe6, 0x50, 0xcb, 0x98, 0x97, 0xa5, 0x82, - 0x67, 0x50, 0xb8, 0x27, 0xfe, 0x71, 0x4f, 0xf0, 0x58, 0x1f, 0x45, 0xa9, 0x49, 0xb1, 0x04, 0xae, - 0x1d, 0x0f, 0x32, 0x85, 0x1d, 0x96, 0xc0, 0x2d, 0xd2, 0x6b, 0x77, 0x93, 0xfe, 0x14, 0xb5, 0xa6, - 0x46, 0x71, 0x92, 0x40, 0x61, 0xea, 0x94, 0x51, 0xb3, 0xac, 0x0c, 0x49, 0x02, 0x78, 0x1b, 0x3d, - 0xa6, 0x2c, 0x23, 0xe3, 0x23, 0x08, 0xab, 0x0d, 0x7a, 0x3f, 0xdb, 0x67, 0x11, 0x51, 0x9a, 0x84, - 0x36, 0xfa, 0x5e, 0xd9, 0xed, 0x95, 0xf0, 0xcd, 0xab, 0x21, 0x7b, 0x15, 0xec, 0xfa, 0x10, 0x35, - 0xde, 0x4c, 0xff, 0x52, 0xbc, 0x81, 0x1c, 0xfd, 0x40, 0xb8, 0x7d, 0xe3, 0x58, 0x2b, 0x27, 0xdf, - 0x5e, 0xb9, 0xb5, 0x66, 0x5e, 0xb4, 0x6b, 0xad, 0x03, 0x5a, 0xde, 0x81, 0x4c, 0x31, 0x1e, 0xcf, - 0xc6, 0xbe, 0x57, 0x12, 0x48, 0x82, 0xdf, 0x22, 0xa4, 0xb1, 0xe5, 0xd7, 0xdd, 0x57, 0xf4, 0xec, - 0x67, 0x76, 0x7f, 0xed, 0xf4, 0x8f, 0x67, 0x9d, 0x4e, 0x3c, 0xfb, 0x6c, 0xe2, 0xd9, 0xe7, 0x13, - 0xcf, 0xfe, 0x3d, 0xf1, 0xec, 0xaf, 0x97, 0x9e, 0x75, 0x76, 0xe9, 0x59, 0xe7, 0x97, 0x9e, 0xf5, - 0xa1, 0x26, 0xd3, 0xe8, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x08, 0x36, 0xe1, 0x9a, 0x86, 0x04, - 0x00, 0x00, + 0x14, 0x6e, 0xd9, 0x05, 0x65, 0xb6, 0x60, 0x1c, 0x09, 0x36, 0x8b, 0xe9, 0xe2, 0x26, 0xe8, 0x9e, + 0x5a, 0x83, 0x27, 0xf5, 0xc4, 0x42, 0xa2, 0x84, 0xb8, 0x98, 0x15, 0x38, 0x78, 0x69, 0x66, 0x3b, + 0x8f, 0x32, 0x81, 0xce, 0x94, 0xe9, 0x94, 0xe0, 0xd1, 0x7f, 0x60, 0x3c, 0x79, 0xf4, 0xe7, 0x70, + 0xe4, 0x48, 0x3c, 0x10, 0x5d, 0xfe, 0x82, 0x3f, 0xc0, 0x4c, 0xa7, 0x85, 0x82, 0x1c, 0x0c, 0xb7, + 0x37, 0xef, 0x7d, 0xef, 0xbd, 0xef, 0xbd, 0xef, 0x0d, 0x7a, 0x24, 0xd3, 0x28, 0xd8, 0x03, 0x22, + 0xd5, 0x08, 0x88, 0xf2, 0x53, 0x29, 0x94, 0xc0, 0x33, 0x91, 0x88, 0xf6, 0xa5, 0x20, 0xd1, 0x9e, + 0x2f, 0xd3, 0xa8, 0x3d, 0x5f, 0x98, 0xe9, 0x28, 0x48, 0x40, 0x11, 0x4a, 0x14, 0x31, 0xb0, 0xf6, + 0x5c, 0x2c, 0x62, 0x51, 0x98, 0x81, 0xb6, 0x8c, 0xb7, 0xfb, 0xc5, 0x46, 0xce, 0x10, 0x12, 0xa1, + 0x60, 0x73, 0x77, 0x37, 0x03, 0x85, 0x9f, 0xa0, 0x29, 0x51, 0x58, 0xae, 0xbd, 0x68, 0xf7, 0x1a, + 0xfd, 0xe6, 0xc9, 0x79, 0xc7, 0x1a, 0x96, 0x3e, 0xfc, 0x0c, 0xb5, 0x72, 0x1e, 0x81, 0x54, 0x84, + 0x71, 0xf5, 0xd9, 0x9d, 0xa8, 0x41, 0xea, 0x01, 0xbc, 0x84, 0x5a, 0x09, 0x90, 0x2c, 0x97, 0x40, + 0x43, 0xa2, 0xdc, 0x46, 0x0d, 0x87, 0xaa, 0xc0, 0x8a, 0x7a, 0xdd, 0xfc, 0xfe, 0xa3, 0x63, 0x75, + 0xff, 0x34, 0x50, 0xeb, 0x03, 0xe3, 0xf1, 0x10, 0x0e, 0x73, 0xc8, 0x14, 0x76, 0x51, 0x33, 0x65, + 0x3c, 0x2e, 0x08, 0x4c, 0x97, 0x59, 0x85, 0x07, 0xbf, 0xba, 0x24, 0xa7, 0x3b, 0xb7, 0x96, 0x17, + 0xfc, 0x6b, 0xb3, 0xfb, 0xf5, 0x49, 0x6e, 0x30, 0x5f, 0x42, 0x2d, 0x21, 0x59, 0xcc, 0x78, 0x48, + 0x28, 0x95, 0x05, 0xa3, 0xaa, 0x36, 0x32, 0x81, 0x15, 0x4a, 0x25, 0x7e, 0x83, 0x1e, 0x97, 0xb0, + 0x84, 0x1c, 0x87, 0x26, 0x37, 0xe4, 0x84, 0x8b, 0xcc, 0x6d, 0xd6, 0x86, 0x98, 0x33, 0xa0, 0xf7, + 0xe4, 0xd8, 0x34, 0x1b, 0x68, 0x04, 0x4e, 0xd1, 0xc3, 0x32, 0x39, 0x3a, 0xc8, 0x33, 0x05, 0x32, + 0x64, 0xd4, 0x9d, 0x5c, 0xb4, 0x7b, 0x4e, 0x7f, 0xed, 0xe7, 0x79, 0xe7, 0x65, 0xcc, 0xd4, 0x5e, + 0x3e, 0xf2, 0x23, 0x91, 0x04, 0x97, 0xbc, 0xe9, 0xe8, 0xca, 0x0e, 0xd2, 0xfd, 0x38, 0xc8, 0x15, + 0x3b, 0x08, 0xf2, 0x9c, 0x51, 0x7f, 0x7b, 0x7b, 0x7d, 0x6d, 0x7c, 0xde, 0x99, 0x5e, 0x35, 0xc5, + 0xd6, 0xd7, 0x86, 0x0f, 0x4c, 0xf9, 0xca, 0x41, 0xf1, 0x5b, 0x34, 0x9b, 0x81, 0x3c, 0x02, 0x19, + 0x1e, 0x81, 0xcc, 0x98, 0xe0, 0xee, 0x54, 0xb1, 0x98, 0x76, 0x7d, 0x31, 0xe6, 0x1e, 0xfc, 0x1d, + 0x83, 0x28, 0x27, 0x98, 0x31, 0x79, 0xa5, 0x13, 0x1f, 0xa2, 0x59, 0x45, 0x64, 0xac, 0x87, 0x15, + 0x14, 0x34, 0xef, 0x7b, 0x8b, 0x76, 0x6f, 0xb2, 0xbf, 0xa1, 0xc1, 0xff, 0xcd, 0xbd, 0x6a, 0x35, + 0x10, 0x14, 0x0a, 0xee, 0xce, 0x56, 0x51, 0xd4, 0xbc, 0x87, 0x8e, 0xba, 0x7a, 0xd1, 0xee, 0xb7, + 0x09, 0xe4, 0x18, 0xd9, 0xb3, 0x54, 0xf0, 0x0c, 0x0a, 0xdd, 0xc5, 0x3f, 0xba, 0x0b, 0x1e, 0x6b, + 0xf1, 0xca, 0x31, 0x15, 0x4b, 0xe0, 0xda, 0xd9, 0x21, 0x13, 0xd8, 0x62, 0x09, 0xdc, 0xb2, 0x8d, + 0xc6, 0xdd, 0xb6, 0xf1, 0x1c, 0x39, 0x95, 0x82, 0x9c, 0x24, 0x50, 0x48, 0x5f, 0x31, 0x6a, 0x95, + 0x91, 0x01, 0x49, 0x00, 0x6f, 0xa2, 0xa7, 0x94, 0x65, 0x64, 0x74, 0x00, 0x61, 0x3d, 0x41, 0xf7, + 0x67, 0xbb, 0x2c, 0x22, 0x4a, 0x93, 0xd0, 0x17, 0x70, 0xbf, 0xcc, 0xf6, 0x4a, 0xf8, 0xea, 0x55, + 0x91, 0x9d, 0x1a, 0x76, 0x79, 0x80, 0xa6, 0xdf, 0x55, 0xff, 0x1b, 0xaf, 0xa0, 0xa6, 0x5e, 0x10, + 0x6e, 0xdf, 0x38, 0xf3, 0xda, 0x67, 0x69, 0x2f, 0xdc, 0x1a, 0x33, 0x1b, 0xed, 0x5a, 0xcb, 0x80, + 0xe6, 0xb7, 0x20, 0x53, 0x8c, 0xc7, 0x97, 0x65, 0x3f, 0x2a, 0x09, 0x24, 0xc1, 0x1b, 0x08, 0x69, + 0x6c, 0xf9, 0xba, 0x7b, 0x8b, 0x9e, 0xfd, 0xc2, 0xee, 0x2f, 0x9d, 0xfc, 0xf6, 0xac, 0x93, 0xb1, + 0x67, 0x9f, 0x8e, 0x3d, 0xfb, 0x6c, 0xec, 0xd9, 0xbf, 0xc6, 0x9e, 0xfd, 0xf5, 0xc2, 0xb3, 0x4e, + 0x2f, 0x3c, 0xeb, 0xec, 0xc2, 0xb3, 0x3e, 0x35, 0x64, 0x1a, 0xfd, 0x0d, 0x00, 0x00, 0xff, 0xff, + 0xfb, 0xa1, 0xf8, 0xf1, 0xc0, 0x04, 0x00, 0x00, } diff --git a/pkg/rpc/heartbeat.proto b/pkg/rpc/heartbeat.proto index 0427588a6154..6b7050c9c47d 100644 --- a/pkg/rpc/heartbeat.proto +++ b/pkg/rpc/heartbeat.proto @@ -42,18 +42,18 @@ message PingRequest { // The last offset the client measured with the server. optional RemoteOffset offset = 2 [(gogoproto.nullable) = false]; // The address of the client. - optional string addr = 3 [(gogoproto.nullable) = false]; + optional string origin_addr = 3 [(gogoproto.nullable) = false]; // The configured maximum clock offset (in nanoseconds) on the server. - optional int64 max_offset_nanos = 4 [(gogoproto.nullable) = false]; + optional int64 origin_max_offset_nanos = 4 [(gogoproto.nullable) = false]; // Cluster ID to prevent connections between nodes in different clusters. - optional bytes cluster_id = 5 [ + optional bytes origin_cluster_id = 5 [ (gogoproto.customname) = "ClusterID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; optional roachpb.Version server_version = 6 [(gogoproto.nullable) = false]; // Node ID to prevent connections from being misrouted to an invalid node inside the cluster. - optional int32 node_id = 7 [ + optional int32 target_node_id = 7 [ (gogoproto.nullable) = false, - (gogoproto.customname) = "NodeID", + (gogoproto.customname) = "TargetNodeID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; } diff --git a/pkg/rpc/heartbeat_test.go b/pkg/rpc/heartbeat_test.go index 6a3293e337a3..0b905bc771f8 100644 --- a/pkg/rpc/heartbeat_test.go +++ b/pkg/rpc/heartbeat_test.go @@ -172,10 +172,10 @@ func TestClockOffsetMismatch(t *testing.T) { hs.clusterID.Set(ctx, uuid.Nil) request := &PingRequest{ - Ping: "testManual", - Addr: "test", - MaxOffsetNanos: (500 * time.Millisecond).Nanoseconds(), - ServerVersion: st.Version.BinaryVersion(), + Ping: "testManual", + OriginAddr: "test", + OriginMaxOffsetNanos: (500 * time.Millisecond).Nanoseconds(), + ServerVersion: st.Version.BinaryVersion(), } response, err := hs.Ping(context.Background(), request) t.Fatalf("should not have reached but got response=%v err=%v", response, err) @@ -257,7 +257,7 @@ func TestNodeIDCompare(t *testing.T) { heartbeat.nodeID.Reset(td.serverNodeID) request := &PingRequest{ Ping: "testPing", - NodeID: td.clientNodeID, + TargetNodeID: td.clientNodeID, ServerVersion: st.Version.BinaryVersion(), } _, err := heartbeat.Ping(context.Background(), request) From 0ef988f4c02e085f99ca93e853bb453b4c056366 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 29 Sep 2020 12:14:25 +0200 Subject: [PATCH 7/8] rpc: add and populate PingRequest.OriginNodeID Upcoming commits will augment Ping() to check the sender's node ID against a blocklist of decommissioned nodes. Release note: None --- pkg/rpc/context.go | 1 + pkg/rpc/heartbeat.pb.go | 119 ++++++++++++++++++++++++---------------- pkg/rpc/heartbeat.proto | 8 ++- 3 files changed, 81 insertions(+), 47 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index dd5febcba05b..a014e6e57498 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -1133,6 +1133,7 @@ func (ctx *Context) runHeartbeat( // We re-mint the PingRequest to pick up any asynchronous update to clusterID. clusterID := ctx.ClusterID.Get() request := &PingRequest{ + OriginNodeID: ctx.NodeID.Get(), OriginAddr: ctx.Config.Addr, OriginMaxOffsetNanos: maxOffsetNanos, ClusterID: &clusterID, diff --git a/pkg/rpc/heartbeat.pb.go b/pkg/rpc/heartbeat.pb.go index 792627f37df5..5d854c93443c 100644 --- a/pkg/rpc/heartbeat.pb.go +++ b/pkg/rpc/heartbeat.pb.go @@ -49,7 +49,7 @@ type RemoteOffset struct { func (m *RemoteOffset) Reset() { *m = RemoteOffset{} } func (*RemoteOffset) ProtoMessage() {} func (*RemoteOffset) Descriptor() ([]byte, []int) { - return fileDescriptor_heartbeat_c48a0290adb87462, []int{0} + return fileDescriptor_heartbeat_565332a8a713c8ea, []int{0} } func (m *RemoteOffset) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -88,15 +88,18 @@ type PingRequest struct { // Cluster ID to prevent connections between nodes in different clusters. ClusterID *github_com_cockroachdb_cockroach_pkg_util_uuid.UUID `protobuf:"bytes,5,opt,name=origin_cluster_id,json=originClusterId,customtype=github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" json:"origin_cluster_id,omitempty"` ServerVersion roachpb.Version `protobuf:"bytes,6,opt,name=server_version,json=serverVersion" json:"server_version"` - // Node ID to prevent connections from being misrouted to an invalid node inside the cluster. + // NodeID the originator of the request wishes to connect to. + // This helps prevent connections from being misrouted when addresses are reused. TargetNodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,7,opt,name=target_node_id,json=targetNodeId,customtype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"target_node_id"` + // NodeID of the originator of the PingRequest. + OriginNodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,8,opt,name=origin_node_id,json=originNodeId,customtype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"origin_node_id"` } func (m *PingRequest) Reset() { *m = PingRequest{} } func (m *PingRequest) String() string { return proto.CompactTextString(m) } func (*PingRequest) ProtoMessage() {} func (*PingRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_heartbeat_c48a0290adb87462, []int{1} + return fileDescriptor_heartbeat_565332a8a713c8ea, []int{1} } func (m *PingRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -137,7 +140,7 @@ func (m *PingResponse) Reset() { *m = PingResponse{} } func (m *PingResponse) String() string { return proto.CompactTextString(m) } func (*PingResponse) ProtoMessage() {} func (*PingResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_heartbeat_c48a0290adb87462, []int{2} + return fileDescriptor_heartbeat_565332a8a713c8ea, []int{2} } func (m *PingResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -418,6 +421,9 @@ func (m *PingRequest) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x38 i++ i = encodeVarintHeartbeat(dAtA, i, uint64(m.TargetNodeID)) + dAtA[i] = 0x40 + i++ + i = encodeVarintHeartbeat(dAtA, i, uint64(m.OriginNodeID)) return i, nil } @@ -507,6 +513,7 @@ func (m *PingRequest) Size() (n int) { l = m.ServerVersion.Size() n += 1 + l + sovHeartbeat(uint64(l)) n += 1 + sovHeartbeat(uint64(m.TargetNodeID)) + n += 1 + sovHeartbeat(uint64(m.OriginNodeID)) return n } @@ -864,6 +871,25 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { break } } + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field OriginNodeID", wireType) + } + m.OriginNodeID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHeartbeat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.OriginNodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipHeartbeat(dAtA[iNdEx:]) @@ -1167,47 +1193,48 @@ var ( ErrIntOverflowHeartbeat = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("rpc/heartbeat.proto", fileDescriptor_heartbeat_c48a0290adb87462) } +func init() { proto.RegisterFile("rpc/heartbeat.proto", fileDescriptor_heartbeat_565332a8a713c8ea) } -var fileDescriptor_heartbeat_c48a0290adb87462 = []byte{ - // 616 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x41, 0x4f, 0xd4, 0x40, - 0x14, 0x6e, 0xd9, 0x05, 0x65, 0xb6, 0x60, 0x1c, 0x09, 0x36, 0x8b, 0xe9, 0xe2, 0x26, 0xe8, 0x9e, - 0x5a, 0x83, 0x27, 0xf5, 0xc4, 0x42, 0xa2, 0x84, 0xb8, 0x98, 0x15, 0x38, 0x78, 0x69, 0x66, 0x3b, - 0x8f, 0x32, 0x81, 0xce, 0x94, 0xe9, 0x94, 0xe0, 0xd1, 0x7f, 0x60, 0x3c, 0x79, 0xf4, 0xe7, 0x70, - 0xe4, 0x48, 0x3c, 0x10, 0x5d, 0xfe, 0x82, 0x3f, 0xc0, 0x4c, 0xa7, 0x85, 0x82, 0x1c, 0x0c, 0xb7, - 0x37, 0xef, 0x7d, 0xef, 0xbd, 0xef, 0xbd, 0xef, 0x0d, 0x7a, 0x24, 0xd3, 0x28, 0xd8, 0x03, 0x22, - 0xd5, 0x08, 0x88, 0xf2, 0x53, 0x29, 0x94, 0xc0, 0x33, 0x91, 0x88, 0xf6, 0xa5, 0x20, 0xd1, 0x9e, - 0x2f, 0xd3, 0xa8, 0x3d, 0x5f, 0x98, 0xe9, 0x28, 0x48, 0x40, 0x11, 0x4a, 0x14, 0x31, 0xb0, 0xf6, - 0x5c, 0x2c, 0x62, 0x51, 0x98, 0x81, 0xb6, 0x8c, 0xb7, 0xfb, 0xc5, 0x46, 0xce, 0x10, 0x12, 0xa1, - 0x60, 0x73, 0x77, 0x37, 0x03, 0x85, 0x9f, 0xa0, 0x29, 0x51, 0x58, 0xae, 0xbd, 0x68, 0xf7, 0x1a, - 0xfd, 0xe6, 0xc9, 0x79, 0xc7, 0x1a, 0x96, 0x3e, 0xfc, 0x0c, 0xb5, 0x72, 0x1e, 0x81, 0x54, 0x84, - 0x71, 0xf5, 0xd9, 0x9d, 0xa8, 0x41, 0xea, 0x01, 0xbc, 0x84, 0x5a, 0x09, 0x90, 0x2c, 0x97, 0x40, - 0x43, 0xa2, 0xdc, 0x46, 0x0d, 0x87, 0xaa, 0xc0, 0x8a, 0x7a, 0xdd, 0xfc, 0xfe, 0xa3, 0x63, 0x75, - 0xff, 0x34, 0x50, 0xeb, 0x03, 0xe3, 0xf1, 0x10, 0x0e, 0x73, 0xc8, 0x14, 0x76, 0x51, 0x33, 0x65, - 0x3c, 0x2e, 0x08, 0x4c, 0x97, 0x59, 0x85, 0x07, 0xbf, 0xba, 0x24, 0xa7, 0x3b, 0xb7, 0x96, 0x17, - 0xfc, 0x6b, 0xb3, 0xfb, 0xf5, 0x49, 0x6e, 0x30, 0x5f, 0x42, 0x2d, 0x21, 0x59, 0xcc, 0x78, 0x48, - 0x28, 0x95, 0x05, 0xa3, 0xaa, 0x36, 0x32, 0x81, 0x15, 0x4a, 0x25, 0x7e, 0x83, 0x1e, 0x97, 0xb0, - 0x84, 0x1c, 0x87, 0x26, 0x37, 0xe4, 0x84, 0x8b, 0xcc, 0x6d, 0xd6, 0x86, 0x98, 0x33, 0xa0, 0xf7, - 0xe4, 0xd8, 0x34, 0x1b, 0x68, 0x04, 0x4e, 0xd1, 0xc3, 0x32, 0x39, 0x3a, 0xc8, 0x33, 0x05, 0x32, - 0x64, 0xd4, 0x9d, 0x5c, 0xb4, 0x7b, 0x4e, 0x7f, 0xed, 0xe7, 0x79, 0xe7, 0x65, 0xcc, 0xd4, 0x5e, - 0x3e, 0xf2, 0x23, 0x91, 0x04, 0x97, 0xbc, 0xe9, 0xe8, 0xca, 0x0e, 0xd2, 0xfd, 0x38, 0xc8, 0x15, - 0x3b, 0x08, 0xf2, 0x9c, 0x51, 0x7f, 0x7b, 0x7b, 0x7d, 0x6d, 0x7c, 0xde, 0x99, 0x5e, 0x35, 0xc5, - 0xd6, 0xd7, 0x86, 0x0f, 0x4c, 0xf9, 0xca, 0x41, 0xf1, 0x5b, 0x34, 0x9b, 0x81, 0x3c, 0x02, 0x19, - 0x1e, 0x81, 0xcc, 0x98, 0xe0, 0xee, 0x54, 0xb1, 0x98, 0x76, 0x7d, 0x31, 0xe6, 0x1e, 0xfc, 0x1d, - 0x83, 0x28, 0x27, 0x98, 0x31, 0x79, 0xa5, 0x13, 0x1f, 0xa2, 0x59, 0x45, 0x64, 0xac, 0x87, 0x15, - 0x14, 0x34, 0xef, 0x7b, 0x8b, 0x76, 0x6f, 0xb2, 0xbf, 0xa1, 0xc1, 0xff, 0xcd, 0xbd, 0x6a, 0x35, - 0x10, 0x14, 0x0a, 0xee, 0xce, 0x56, 0x51, 0xd4, 0xbc, 0x87, 0x8e, 0xba, 0x7a, 0xd1, 0xee, 0xb7, - 0x09, 0xe4, 0x18, 0xd9, 0xb3, 0x54, 0xf0, 0x0c, 0x0a, 0xdd, 0xc5, 0x3f, 0xba, 0x0b, 0x1e, 0x6b, - 0xf1, 0xca, 0x31, 0x15, 0x4b, 0xe0, 0xda, 0xd9, 0x21, 0x13, 0xd8, 0x62, 0x09, 0xdc, 0xb2, 0x8d, - 0xc6, 0xdd, 0xb6, 0xf1, 0x1c, 0x39, 0x95, 0x82, 0x9c, 0x24, 0x50, 0x48, 0x5f, 0x31, 0x6a, 0x95, - 0x91, 0x01, 0x49, 0x00, 0x6f, 0xa2, 0xa7, 0x94, 0x65, 0x64, 0x74, 0x00, 0x61, 0x3d, 0x41, 0xf7, - 0x67, 0xbb, 0x2c, 0x22, 0x4a, 0x93, 0xd0, 0x17, 0x70, 0xbf, 0xcc, 0xf6, 0x4a, 0xf8, 0xea, 0x55, - 0x91, 0x9d, 0x1a, 0x76, 0x79, 0x80, 0xa6, 0xdf, 0x55, 0xff, 0x1b, 0xaf, 0xa0, 0xa6, 0x5e, 0x10, - 0x6e, 0xdf, 0x38, 0xf3, 0xda, 0x67, 0x69, 0x2f, 0xdc, 0x1a, 0x33, 0x1b, 0xed, 0x5a, 0xcb, 0x80, - 0xe6, 0xb7, 0x20, 0x53, 0x8c, 0xc7, 0x97, 0x65, 0x3f, 0x2a, 0x09, 0x24, 0xc1, 0x1b, 0x08, 0x69, - 0x6c, 0xf9, 0xba, 0x7b, 0x8b, 0x9e, 0xfd, 0xc2, 0xee, 0x2f, 0x9d, 0xfc, 0xf6, 0xac, 0x93, 0xb1, - 0x67, 0x9f, 0x8e, 0x3d, 0xfb, 0x6c, 0xec, 0xd9, 0xbf, 0xc6, 0x9e, 0xfd, 0xf5, 0xc2, 0xb3, 0x4e, - 0x2f, 0x3c, 0xeb, 0xec, 0xc2, 0xb3, 0x3e, 0x35, 0x64, 0x1a, 0xfd, 0x0d, 0x00, 0x00, 0xff, 0xff, - 0xfb, 0xa1, 0xf8, 0xf1, 0xc0, 0x04, 0x00, 0x00, +var fileDescriptor_heartbeat_565332a8a713c8ea = []byte{ + // 635 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x41, 0x4f, 0xd4, 0x40, + 0x14, 0x6e, 0xd9, 0x05, 0x61, 0x76, 0xc1, 0x38, 0x12, 0x6c, 0x16, 0xd3, 0xc5, 0x4d, 0xd0, 0x3d, + 0xb5, 0x06, 0x4f, 0xea, 0x89, 0x85, 0x44, 0x09, 0x71, 0x31, 0x2b, 0x70, 0xf0, 0xd2, 0xcc, 0x76, + 0x1e, 0x65, 0x02, 0xed, 0x94, 0xe9, 0x94, 0xe0, 0xd1, 0x7f, 0x60, 0x3c, 0x79, 0xf4, 0xe7, 0x70, + 0xe4, 0x48, 0x3c, 0x10, 0x5d, 0x0e, 0xfe, 0x0d, 0x33, 0x9d, 0x29, 0x5b, 0x90, 0x83, 0x21, 0xde, + 0xde, 0xbc, 0xf7, 0xbd, 0xf7, 0x7d, 0xef, 0xed, 0xd7, 0x45, 0x0f, 0x45, 0x1a, 0xfa, 0xfb, 0x40, + 0x84, 0x1c, 0x02, 0x91, 0x5e, 0x2a, 0xb8, 0xe4, 0x78, 0x36, 0xe4, 0xe1, 0x81, 0xe0, 0x24, 0xdc, + 0xf7, 0x44, 0x1a, 0xb6, 0x16, 0x8a, 0x30, 0x1d, 0xfa, 0x31, 0x48, 0x42, 0x89, 0x24, 0x1a, 0xd6, + 0x9a, 0x8f, 0x78, 0xc4, 0x8b, 0xd0, 0x57, 0x91, 0xce, 0x76, 0x3e, 0xdb, 0xa8, 0x39, 0x80, 0x98, + 0x4b, 0xd8, 0xda, 0xdb, 0xcb, 0x40, 0xe2, 0xc7, 0x68, 0x8a, 0x17, 0x91, 0x63, 0x2f, 0xd9, 0xdd, + 0x5a, 0xaf, 0x7e, 0x7a, 0xd1, 0xb6, 0x06, 0x26, 0x87, 0x9f, 0xa2, 0x46, 0x9e, 0x84, 0x20, 0x24, + 0x61, 0x89, 0xfc, 0xe4, 0x4c, 0x54, 0x20, 0xd5, 0x02, 0x5e, 0x46, 0x8d, 0x18, 0x48, 0x96, 0x0b, + 0xa0, 0x01, 0x91, 0x4e, 0xad, 0x82, 0x43, 0x65, 0x61, 0x55, 0xbe, 0xaa, 0x7f, 0xfb, 0xde, 0xb6, + 0x3a, 0xbf, 0xeb, 0xa8, 0xf1, 0x9e, 0x25, 0xd1, 0x00, 0x8e, 0x72, 0xc8, 0x24, 0x76, 0x50, 0x3d, + 0x65, 0x49, 0x54, 0x08, 0x98, 0x31, 0x5d, 0x45, 0x06, 0xbf, 0xbc, 0x12, 0xa7, 0x98, 0x1b, 0x2b, + 0x8b, 0xde, 0xb5, 0xdd, 0xbd, 0xea, 0x26, 0x37, 0x94, 0x2f, 0xa3, 0x06, 0x17, 0x2c, 0x62, 0x49, + 0x40, 0x28, 0x15, 0x85, 0xa2, 0x72, 0x36, 0xd2, 0x85, 0x55, 0x4a, 0x05, 0x7e, 0x8d, 0x1e, 0x19, + 0x58, 0x4c, 0x4e, 0x02, 0xdd, 0x1b, 0x24, 0x24, 0xe1, 0x99, 0x53, 0xaf, 0x2c, 0x31, 0xaf, 0x41, + 0xef, 0xc8, 0x89, 0x26, 0xeb, 0x2b, 0x04, 0x4e, 0xd1, 0x03, 0xd3, 0x1c, 0x1e, 0xe6, 0x99, 0x04, + 0x11, 0x30, 0xea, 0x4c, 0x2e, 0xd9, 0xdd, 0x66, 0x6f, 0xfd, 0xc7, 0x45, 0xfb, 0x45, 0xc4, 0xe4, + 0x7e, 0x3e, 0xf4, 0x42, 0x1e, 0xfb, 0x57, 0xba, 0xe9, 0x70, 0x1c, 0xfb, 0xe9, 0x41, 0xe4, 0xe7, + 0x92, 0x1d, 0xfa, 0x79, 0xce, 0xa8, 0xb7, 0xb3, 0xb3, 0xb1, 0x3e, 0xba, 0x68, 0xcf, 0xac, 0xe9, + 0x61, 0x1b, 0xeb, 0x83, 0xfb, 0x7a, 0x7c, 0x99, 0xa0, 0xf8, 0x0d, 0x9a, 0xcb, 0x40, 0x1c, 0x83, + 0x08, 0x8e, 0x41, 0x64, 0x8c, 0x27, 0xce, 0x54, 0x71, 0x98, 0x56, 0xf5, 0x30, 0xda, 0x0f, 0xde, + 0xae, 0x46, 0x98, 0x0d, 0x66, 0x75, 0x9f, 0x49, 0xe2, 0x23, 0x34, 0x27, 0x89, 0x88, 0xd4, 0xb2, + 0x9c, 0x82, 0xd2, 0x7d, 0x6f, 0xc9, 0xee, 0x4e, 0xf6, 0x36, 0x15, 0xf8, 0x9f, 0xb5, 0x97, 0x54, + 0x7d, 0x4e, 0xa1, 0xd0, 0xde, 0xdc, 0x2e, 0x86, 0xea, 0xf7, 0xa0, 0x29, 0xc7, 0x2f, 0xaa, 0x28, + 0xcd, 0xb5, 0x4a, 0xca, 0xe9, 0xff, 0x42, 0xb9, 0x55, 0x0c, 0x2d, 0x29, 0xf9, 0xf8, 0x45, 0x3b, + 0x5f, 0x27, 0x50, 0x53, 0x3b, 0x2d, 0x4b, 0x79, 0x92, 0x41, 0x61, 0x35, 0xfe, 0x97, 0xd5, 0x78, + 0x12, 0x29, 0xbf, 0x98, 0xcb, 0x4a, 0x16, 0xc3, 0x35, 0xa7, 0x23, 0x5d, 0xd8, 0x66, 0x31, 0xdc, + 0xf2, 0x03, 0xd4, 0xee, 0xf6, 0x03, 0x3c, 0x43, 0xcd, 0xd2, 0x34, 0x09, 0x89, 0xa1, 0x70, 0x5b, + 0xa9, 0xa8, 0x61, 0x2a, 0x7d, 0x12, 0x03, 0xde, 0x42, 0x4f, 0x28, 0xcb, 0xc8, 0xf0, 0x10, 0x82, + 0x6a, 0x83, 0xe2, 0x67, 0x7b, 0x2c, 0x24, 0x52, 0x89, 0x50, 0xa6, 0x9b, 0x36, 0xdd, 0xae, 0x81, + 0xaf, 0x8d, 0x87, 0xec, 0x56, 0xb0, 0x2b, 0x7d, 0x34, 0xf3, 0xb6, 0xfc, 0x4b, 0xc1, 0xab, 0xa8, + 0xae, 0x0e, 0x84, 0x5b, 0x37, 0xbe, 0xac, 0xca, 0xf7, 0xd9, 0x5a, 0xbc, 0xb5, 0xa6, 0x2f, 0xda, + 0xb1, 0x56, 0x00, 0x2d, 0x6c, 0x43, 0x26, 0x59, 0x12, 0x5d, 0x8d, 0xfd, 0x20, 0x05, 0x90, 0x18, + 0x6f, 0x22, 0xa4, 0xb0, 0xe6, 0x75, 0x77, 0x8a, 0xae, 0xfd, 0xdc, 0xee, 0x2d, 0x9f, 0xfe, 0x72, + 0xad, 0xd3, 0x91, 0x6b, 0x9f, 0x8d, 0x5c, 0xfb, 0x7c, 0xe4, 0xda, 0x3f, 0x47, 0xae, 0xfd, 0xe5, + 0xd2, 0xb5, 0xce, 0x2e, 0x5d, 0xeb, 0xfc, 0xd2, 0xb5, 0x3e, 0xd6, 0x44, 0x1a, 0xfe, 0x09, 0x00, + 0x00, 0xff, 0xff, 0x72, 0x69, 0xb3, 0x22, 0x33, 0x05, 0x00, 0x00, } diff --git a/pkg/rpc/heartbeat.proto b/pkg/rpc/heartbeat.proto index 6b7050c9c47d..7da9f6f4810d 100644 --- a/pkg/rpc/heartbeat.proto +++ b/pkg/rpc/heartbeat.proto @@ -50,11 +50,17 @@ message PingRequest { (gogoproto.customname) = "ClusterID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; optional roachpb.Version server_version = 6 [(gogoproto.nullable) = false]; - // Node ID to prevent connections from being misrouted to an invalid node inside the cluster. + // NodeID the originator of the request wishes to connect to. + // This helps prevent connections from being misrouted when addresses are reused. optional int32 target_node_id = 7 [ (gogoproto.nullable) = false, (gogoproto.customname) = "TargetNodeID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + // NodeID of the originator of the PingRequest. + optional int32 origin_node_id = 8 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "OriginNodeID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; } // A PingResponse contains the echoed ping request string. From 8552175a907be30c5a41982eedc838854fc4b378 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 29 Sep 2020 13:09:47 +0200 Subject: [PATCH 8/8] rpc: introduce Context.On{Handle,Send}Ping These two methods are invoked before sending and while handling PingRequests. They'll be used to prevent decommissioned nodes from communicating with the cluster. Release note: None --- pkg/rpc/context.go | 50 +++++++++++++++++++++++++++++--------- pkg/rpc/context_test.go | 54 +++++++++++++++++++++++++++++++++++++++++ pkg/rpc/heartbeat.go | 8 ++++++ pkg/server/server.go | 9 ++++++- 4 files changed, 109 insertions(+), 12 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index a014e6e57498..b74495c354d3 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -233,16 +233,7 @@ func NewServer(ctx *Context, opts ...ServerOption) *grpc.Server { grpcOpts = append(grpcOpts, grpc.ChainStreamInterceptor(streamInterceptor...)) s := grpc.NewServer(grpcOpts...) - RegisterHeartbeatServer(s, &HeartbeatService{ - clock: ctx.Clock, - remoteClockMonitor: ctx.RemoteClocks, - clusterName: ctx.ClusterName(), - disableClusterNameVerification: ctx.Config.DisableClusterNameVerification, - clusterID: &ctx.ClusterID, - nodeID: &ctx.NodeID, - settings: ctx.Settings, - testingAllowNamedRPCToAnonymousServer: ctx.TestingAllowNamedRPCToAnonymousServer, - }) + RegisterHeartbeatServer(s, ctx.NewHeartbeatService()) return s } @@ -385,6 +376,14 @@ type ContextOptions struct { Clock *hlc.Clock Stopper *stop.Stopper Settings *cluster.Settings + // OnHandlePing is called when handling a PingRequest, after + // preliminary checks but before recording clock offset information. + // + // It can inject an error. + OnHandlePing func(*PingRequest) error + // OnSendPing intercepts outgoing PingRequests. It may inject an + // error. + OnSendPing func(*PingRequest) error Knobs ContextTestingKnobs } @@ -404,6 +403,11 @@ func (c ContextOptions) validate() error { if c.Settings == nil { return errors.New("Settings must be set") } + + // NB: OnSendPing and OnHandlePing default to noops. + // This is used both for testing and the cli. + _, _ = c.OnSendPing, c.OnHandlePing + return nil } @@ -1141,11 +1145,20 @@ func (ctx *Context) runHeartbeat( ServerVersion: ctx.Settings.Version.BinaryVersion(), } + interceptor := func(*PingRequest) error { return nil } + if fn := ctx.OnSendPing; fn != nil { + interceptor = fn + } + var response *PingResponse sendTime := ctx.Clock.PhysicalTime() - ping := func(goCtx context.Context) (err error) { + ping := func(goCtx context.Context) error { // NB: We want the request to fail-fast (the default), otherwise we won't // be notified of transport failures. + if err := interceptor(request); err != nil { + return err + } + var err error response, err = heartbeatClient.Ping(goCtx, request) return err } @@ -1219,3 +1232,18 @@ func (ctx *Context) runHeartbeat( heartbeatTimer.Reset(ctx.Config.RPCHeartbeatInterval) } } + +// NewHeartbeatService returns a HeartbeatService initialized from the Context. +func (ctx *Context) NewHeartbeatService() *HeartbeatService { + return &HeartbeatService{ + clock: ctx.Clock, + remoteClockMonitor: ctx.RemoteClocks, + clusterName: ctx.ClusterName(), + disableClusterNameVerification: ctx.Config.DisableClusterNameVerification, + clusterID: &ctx.ClusterID, + nodeID: &ctx.NodeID, + settings: ctx.Settings, + onHandlePing: ctx.OnHandlePing, + testingAllowNamedRPCToAnonymousServer: ctx.TestingAllowNamedRPCToAnonymousServer, + } +} diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 9c93120de8d5..3069e47c5f1f 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -162,6 +162,60 @@ func TestHeartbeatCB(t *testing.T) { }) } +// TestPingInterceptors checks that OnSendPing and OnHandlePing can inject errors. +func TestPingInterceptors(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + const ( + blockedTargetNodeID = 5 + blockedOriginNodeID = 123 + ) + + errBoomSend := errors.Handled(errors.New("boom due to onSendPing")) + errBoomRecv := status.Error(codes.FailedPrecondition, "boom due to onHandlePing") + opts := ContextOptions{ + TenantID: roachpb.SystemTenantID, + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Config: testutils.NewNodeTestBaseContext(), + Clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond), + Stopper: stop.NewStopper(), + Settings: cluster.MakeTestingClusterSettings(), + OnSendPing: func(req *PingRequest) error { + if req.TargetNodeID == blockedTargetNodeID { + return errBoomSend + } + return nil + }, + OnHandlePing: func(req *PingRequest) error { + if req.OriginNodeID == blockedOriginNodeID { + return errBoomRecv + } + return nil + }, + } + defer opts.Stopper.Stop(ctx) + + rpcCtx := NewContext(opts) + { + _, err := rpcCtx.GRPCDialNode("unused:1234", 5, SystemClass).Connect(ctx) + require.Equal(t, errBoomSend, errors.Cause(err)) + } + + s := newTestServer(t, rpcCtx) + RegisterHeartbeatServer(s, rpcCtx.NewHeartbeatService()) + rpcCtx.NodeID.Set(ctx, blockedOriginNodeID) + ln, err := netutil.ListenAndServeGRPC(rpcCtx.Stopper, s, util.TestAddr) + if err != nil { + t.Fatal(err) + } + remoteAddr := ln.Addr().String() + { + _, err := rpcCtx.GRPCDialNode(remoteAddr, blockedOriginNodeID, SystemClass).Connect(ctx) + require.Equal(t, errBoomRecv, errors.Cause(err)) + } +} + var _ roachpb.InternalServer = &internalServer{} type internalServer struct{} diff --git a/pkg/rpc/heartbeat.go b/pkg/rpc/heartbeat.go index 8d8c6e1c0335..10df41746f8f 100644 --- a/pkg/rpc/heartbeat.go +++ b/pkg/rpc/heartbeat.go @@ -53,6 +53,8 @@ type HeartbeatService struct { clusterName string disableClusterNameVerification bool + onHandlePing func(*PingRequest) error // see ContextOptions.OnHandlePing + // TestingAllowNamedRPCToAnonymousServer, when defined (in tests), // disables errors in case a heartbeat requests a specific node ID but // the remote node doesn't have a node ID yet. This testing knob is @@ -152,6 +154,12 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR "does not match that of node %s (%s)", mo, args.OriginAddr, amo)) } + if fn := hs.onHandlePing; fn != nil { + if err := fn(args); err != nil { + return nil, err + } + } + serverOffset := args.Offset // The server offset should be the opposite of the client offset. serverOffset.Offset = -serverOffset.Offset diff --git a/pkg/server/server.go b/pkg/server/server.go index cde7e0d954c3..56cd46508aa6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -264,7 +264,14 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { Clock: clock, Stopper: stopper, Settings: cfg.Settings, - } + OnSendPing: func(req *rpc.PingRequest) error { + // TODO(tbg): hook this up to a check for decommissioned nodes. + return nil + }, + OnHandlePing: func(req *rpc.PingRequest) error { + // TODO(tbg): hook this up to a check for decommissioned nodes. + return nil + }} if knobs := cfg.TestingKnobs.Server; knobs != nil { serverKnobs := knobs.(*TestingKnobs) rpcCtxOpts.Knobs = serverKnobs.ContextTestingKnobs