diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 1942ff5e613c..1758d26a686b 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -906,10 +906,11 @@ func (m *multiTestContext) addStore(idx int) { ambient := log.AmbientContext{Tracer: cfg.Settings.Tracer} m.populateDB(idx, cfg.Settings, stopper) nlActive, nlRenewal := cfg.NodeLivenessDurations() - m.nodeLivenesses[idx] = kvserver.NewNodeLiveness( - ambient, m.clocks[idx], m.dbs[idx], m.gossips[idx], - nlActive, nlRenewal, cfg.Settings, metric.TestSampleInterval, - ) + m.nodeLivenesses[idx] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{ + AmbientCtx: ambient, Clock: m.clocks[idx], DB: m.dbs[idx], Gossip: m.gossips[idx], + LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings, + HistogramWindowInterval: metric.TestSampleInterval, + }) m.populateStorePool(idx, cfg, m.nodeLivenesses[idx]) cfg.DB = m.dbs[idx] cfg.NodeLiveness = m.nodeLivenesses[idx] @@ -1018,15 +1019,19 @@ func (m *multiTestContext) addStore(idx int) { m.t.Fatal(err) } } - m.nodeLivenesses[idx].StartHeartbeat(ctx, stopper, m.engines[idx:idx+1], func(ctx context.Context) { - now := clock.Now() - if err := store.WriteLastUpTimestamp(ctx, now); err != nil { - log.Warningf(ctx, "%v", err) - } - ran.Do(func() { - close(ran.ch) - }) - }) + m.nodeLivenesses[idx].Start(ctx, + kvserver.NodeLivenessStartOptions{ + Stopper: stopper, + Engines: m.engines[idx : idx+1], + OnSelfLive: func(ctx context.Context) { + now := clock.Now() + if err := store.WriteLastUpTimestamp(ctx, now); err != nil { + log.Warningf(ctx, "%v", err) + } + ran.Do(func() { + close(ran.ch) + }) + }}) store.WaitForInit() @@ -1095,10 +1100,11 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) { cfg := m.makeStoreConfig(i) m.populateDB(i, m.storeConfig.Settings, stopper) nlActive, nlRenewal := cfg.NodeLivenessDurations() - m.nodeLivenesses[i] = kvserver.NewNodeLiveness( - 
log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}, m.clocks[i], m.dbs[i], - m.gossips[i], nlActive, nlRenewal, cfg.Settings, metric.TestSampleInterval, - ) + m.nodeLivenesses[i] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{ + AmbientCtx: log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}, Clock: m.clocks[i], DB: m.dbs[i], + Gossip: m.gossips[i], LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings, + HistogramWindowInterval: metric.TestSampleInterval, + }) m.populateStorePool(i, cfg, m.nodeLivenesses[i]) cfg.DB = m.dbs[i] cfg.NodeLiveness = m.nodeLivenesses[i] @@ -1115,11 +1121,15 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) { m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.DefaultClass).Reset() m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.SystemClass).Reset() m.mu.Unlock() - cfg.NodeLiveness.StartHeartbeat(ctx, stopper, m.engines[i:i+1], func(ctx context.Context) { - now := m.clocks[i].Now() - if err := store.WriteLastUpTimestamp(ctx, now); err != nil { - log.Warningf(ctx, "%v", err) - } + cfg.NodeLiveness.Start(ctx, kvserver.NodeLivenessStartOptions{ + Stopper: stopper, + Engines: m.engines[i : i+1], + OnSelfLive: func(ctx context.Context) { + now := m.clocks[i].Now() + if err := store.WriteLastUpTimestamp(ctx, now); err != nil { + log.Warningf(ctx, "%v", err) + } + }, }) } diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 670050169394..a75d872b9f5f 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -176,13 +176,14 @@ type NodeLiveness struct { // heartbeatPaused contains an atomically-swapped number representing a bool // (1 or 0). heartbeatToken is a channel containing a token which is taken // when heartbeating or when pausing the heartbeat. Used for testing. 
- heartbeatPaused uint32 - heartbeatToken chan struct{} - metrics LivenessMetrics + heartbeatPaused uint32 + heartbeatToken chan struct{} + metrics LivenessMetrics + onNodeDecommissioned func(kvserverpb.Liveness) // noop if nil mu struct { syncutil.RWMutex - callbacks []IsLiveCallback + onIsLive []IsLiveCallback // see RegisterCallback // nodes is an in-memory cache of liveness records that NodeLiveness // knows about (having learnt of them through gossip or through KV). // It's a look-aside cache, and is accessed primarily through @@ -205,7 +206,7 @@ type NodeLiveness struct { // - Update the liveness record in KV // - Add the updated record into this cache (see `maybeUpdate`) // - // (See `StartHeartbeat` for an example of this pattern.) + // (See `Start` for an example of this pattern.) // // What we want instead is a bit simpler: // @@ -216,11 +217,11 @@ type NodeLiveness struct { // // More concretely, we want `getLivenessRecordFromKV` to be tucked away // within `getLivenessLocked`. - nodes map[roachpb.NodeID]LivenessRecord - heartbeatCallback HeartbeatCallback + nodes map[roachpb.NodeID]LivenessRecord + onSelfLive HeartbeatCallback // set in Start() // Before heartbeating, we write to each of these engines to avoid // maintaining liveness when a local disks is stalled. - engines []storage.Engine + engines []storage.Engine // set in Start() } } @@ -235,29 +236,44 @@ type LivenessRecord struct { raw []byte } +// NodeLivenessOptions is the input to NewNodeLiveness. +// +// Note that there is yet another struct, NodeLivenessStartOptions, which +// is supplied when the instance is started. This is necessary as during +// server startup, some inputs can only be constructed at Start time. The +// separation has grown organically and various options could in principle +// be moved back and forth. 
+type NodeLivenessOptions struct { + AmbientCtx log.AmbientContext + Settings *cluster.Settings + Gossip *gossip.Gossip + Clock *hlc.Clock + DB *kv.DB + LivenessThreshold time.Duration + RenewalDuration time.Duration + HistogramWindowInterval time.Duration + // OnNodeDecommissioned is invoked whenever the instance learns that a + // node was permanently removed from the cluster. This method must be + // idempotent as it may be invoked multiple times and defaults to a + // noop. + OnNodeDecommissioned func(kvserverpb.Liveness) +} + // NewNodeLiveness returns a new instance of NodeLiveness configured // with the specified gossip instance. -func NewNodeLiveness( - ambient log.AmbientContext, - clock *hlc.Clock, - db *kv.DB, - g *gossip.Gossip, - livenessThreshold time.Duration, - renewalDuration time.Duration, - st *cluster.Settings, - histogramWindow time.Duration, -) *NodeLiveness { +func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness { nl := &NodeLiveness{ - ambientCtx: ambient, - clock: clock, - db: db, - gossip: g, - livenessThreshold: livenessThreshold, - heartbeatInterval: livenessThreshold - renewalDuration, - selfSem: make(chan struct{}, 1), - st: st, - otherSem: make(chan struct{}, 1), - heartbeatToken: make(chan struct{}, 1), + ambientCtx: opts.AmbientCtx, + clock: opts.Clock, + db: opts.DB, + gossip: opts.Gossip, + livenessThreshold: opts.LivenessThreshold, + heartbeatInterval: opts.LivenessThreshold - opts.RenewalDuration, + selfSem: make(chan struct{}, 1), + st: opts.Settings, + otherSem: make(chan struct{}, 1), + heartbeatToken: make(chan struct{}, 1), + onNodeDecommissioned: opts.OnNodeDecommissioned, } nl.metrics = LivenessMetrics{ LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes), @@ -265,11 +281,16 @@ func NewNodeLiveness( HeartbeatSuccesses: metric.NewCounter(metaHeartbeatSuccesses), HeartbeatFailures: metric.NewCounter(metaHeartbeatFailures), EpochIncrements: metric.NewCounter(metaEpochIncrements), - HeartbeatLatency: 
metric.NewLatency(metaHeartbeatLatency, histogramWindow), + HeartbeatLatency: metric.NewLatency(metaHeartbeatLatency, opts.HistogramWindowInterval), } nl.mu.nodes = make(map[roachpb.NodeID]LivenessRecord) nl.heartbeatToken <- struct{}{} + // NB: we should consider moving this registration to .Start() once we + // have ensured that nobody uses the server's KV client (kv.DB) before + // nl.Start() is invoked. At the time of writing this invariant does + // not hold (which is a problem, since the node itself won't be live + // at this point, and requests routed to it will hang). livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix) nl.gossip.RegisterCallback(livenessRegex, nl.livenessGossipUpdate) @@ -499,7 +520,7 @@ type livenessUpdate struct { // given node ID. This is typically used when adding a new node to a running // cluster, or when bootstrapping a cluster through a given node. // -// This is a pared down version of StartHeartbeat; it exists only to durably +// This is a pared down version of Start; it exists only to durably // persist a liveness to record the node's existence. Nodes will heartbeat their // records after starting up, and incrementing to epoch=1 when doing so, at // which point we'll set an appropriate expiration timestamp, gossip the @@ -619,32 +640,39 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { return liveness.IsLive(nl.clock.Now().GoTime()), nil } -// StartHeartbeat starts a periodic heartbeat to refresh this node's last +// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`. +type NodeLivenessStartOptions struct { + Stopper *stop.Stopper + Engines []storage.Engine + // OnSelfLive is invoked after every successful heartbeat + // of the local liveness instance's heartbeat loop. + OnSelfLive HeartbeatCallback +} + +// Start starts a periodic heartbeat to refresh this node's last // heartbeat in the node liveness table. 
The optionally provided -// HeartbeatCallback will be invoked whenever this node updates its own -// liveness. The slice of engines will be written to before each heartbeat to -// avoid maintaining liveness in the presence of disk stalls. -func (nl *NodeLiveness) StartHeartbeat( - ctx context.Context, stopper *stop.Stopper, engines []storage.Engine, alive HeartbeatCallback, -) { - log.VEventf(ctx, 1, "starting liveness heartbeat") +// HeartbeatCallback will be invoked whenever this node updates its +// own liveness. The slice of engines will be written to before each +// heartbeat to avoid maintaining liveness in the presence of disk stalls. +func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions) { + log.VEventf(ctx, 1, "starting node liveness instance") retryOpts := base.DefaultRetryOptions() - retryOpts.Closer = stopper.ShouldQuiesce() + retryOpts.Closer = opts.Stopper.ShouldQuiesce() - if len(engines) == 0 { + if len(opts.Engines) == 0 { // Avoid silently forgetting to pass the engines. It happened before. 
log.Fatalf(ctx, "must supply at least one engine") } nl.mu.Lock() - nl.mu.heartbeatCallback = alive - nl.mu.engines = engines + nl.mu.onSelfLive = opts.OnSelfLive + nl.mu.engines = opts.Engines nl.mu.Unlock() - stopper.RunWorker(ctx, func(context.Context) { + opts.Stopper.RunWorker(ctx, func(context.Context) { ambient := nl.ambientCtx ambient.AddLogTag("liveness-hb", nil) - ctx, cancel := stopper.WithCancelOnStop(context.Background()) + ctx, cancel := opts.Stopper.WithCancelOnStop(context.Background()) defer cancel() ctx, sp := ambient.AnnotateCtxWithSpan(ctx, "liveness heartbeat loop") defer sp.Finish() @@ -655,7 +683,7 @@ func (nl *NodeLiveness) StartHeartbeat( for { select { case <-nl.heartbeatToken: - case <-stopper.ShouldStop(): + case <-opts.Stopper.ShouldStop(): return } // Give the context a timeout approximately as long as the time we @@ -692,7 +720,7 @@ func (nl *NodeLiveness) StartHeartbeat( nl.heartbeatToken <- struct{}{} select { case <-ticker.C: - case <-stopper.ShouldStop(): + case <-opts.Stopper.ShouldStop(): return } } @@ -726,7 +754,7 @@ func (nl *NodeLiveness) PauseHeartbeatLoopForTest() func() { } // PauseSynchronousHeartbeatsForTest disables all node liveness -// heartbeats triggered from outside the normal StartHeartbeat loop. +// heartbeats triggered from outside the normal Start loop. // Returns a closure to call to re-enable synchronous heartbeats. Only // safe for use in tests. func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() { @@ -739,7 +767,7 @@ func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() { } // PauseAllHeartbeatsForTest disables all node liveness heartbeats, -// including those triggered from outside the normal StartHeartbeat +// including those triggered from outside the normal Start // loop. Returns a closure to call to re-enable heartbeats. Only safe // for use in tests. 
func (nl *NodeLiveness) PauseAllHeartbeatsForTest() func() { @@ -769,7 +797,7 @@ var errNodeAlreadyLive = errors.New("node already live") // TODO(bdarnell): Fix error semantics here. // // This method is rarely called directly; heartbeats are normally sent -// by the StartHeartbeat loop. +// by the Start loop. // TODO(bdarnell): Should we just remove this synchronous heartbeat completely? func (nl *NodeLiveness) Heartbeat(ctx context.Context, liveness kvserverpb.Liveness) error { return nl.heartbeatInternal(ctx, liveness, false /* increment epoch */) @@ -1113,7 +1141,7 @@ func (nl *NodeLiveness) Metrics() LivenessMetrics { func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) { nl.mu.Lock() defer nl.mu.Unlock() - nl.mu.callbacks = append(nl.mu.callbacks, cb) + nl.mu.onIsLive = append(nl.mu.onIsLive, cb) } // updateLiveness does a conditional put on the node liveness record for the @@ -1237,7 +1265,7 @@ func (nl *NodeLiveness) updateLivenessAttempt( } nl.mu.RLock() - cb := nl.mu.heartbeatCallback + cb := nl.mu.onSelfLive nl.mu.RUnlock() if cb != nil { cb(ctx) @@ -1261,10 +1289,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness) } - var callbacks []IsLiveCallback + var onIsLive []IsLiveCallback if shouldReplace { nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec - callbacks = append(callbacks, nl.mu.callbacks...) + onIsLive = append(onIsLive, nl.mu.onIsLive...) 
} nl.mu.Unlock() @@ -1274,10 +1302,13 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness now := nl.clock.Now().GoTime() if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) { - for _, fn := range callbacks { + for _, fn := range onIsLive { fn(newLivenessRec.Liveness) } } + if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil { + nl.onNodeDecommissioned(newLivenessRec.Liveness) + } } // shouldReplaceLiveness checks to see if the new liveness is in fact newer diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index 5c94867097e0..0ea368497c94 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -15,6 +15,7 @@ import ( "fmt" "reflect" "sort" + "strings" "sync/atomic" "testing" "time" @@ -41,6 +42,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/proto" + "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -1190,3 +1192,65 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) { t.Fatal("no change committed") } } + +func TestNodeLivenessDecommissionedCallback(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + var cb struct { + syncutil.Mutex + m map[roachpb.NodeID]bool // id -> decommissioned + } + + tArgs := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + OnDecommissionedCallback: func(rec kvserverpb.Liveness) { + cb.Lock() + if cb.m == nil { + cb.m = map[roachpb.NodeID]bool{} + } + cb.m[rec.NodeID] = rec.Membership == kvserverpb.MembershipStatus_DECOMMISSIONED + cb.Unlock() + + }, + }, + }, + } + args := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // for speed + ServerArgs: tArgs, + } + tc := testcluster.NewTestCluster(t, 3, args) + tc.Start(t) + defer tc.Stopper().Stop(ctx) + + nl1 := 
tc.Servers[0].NodeLiveness().(*kvserver.NodeLiveness) + + // Make sure the callback doesn't fire willy-nilly... + func() { + chg, err := nl1.SetMembershipStatus(ctx, 2, kvserverpb.MembershipStatus_DECOMMISSIONING) + require.NoError(t, err) + require.True(t, chg) + cb.Lock() + defer cb.Unlock() + require.Zero(t, cb.m) + }() + + // ... but only when a node actually gets decommissioned. + { + chg, err := nl1.SetMembershipStatus(ctx, 2, kvserverpb.MembershipStatus_DECOMMISSIONED) + require.NoError(t, err) + require.True(t, chg) + testutils.SucceedsSoon(t, func() error { + cb.Lock() + sl := pretty.Diff(map[roachpb.NodeID]bool{2: true}, cb.m) + cb.Unlock() + if len(sl) > 0 { + return errors.Errorf("diff(exp,act) = %s", strings.Join(sl, "\n")) + } + return nil + }) + + } +} diff --git a/pkg/kv/kvserver/store_bootstrap.go b/pkg/kv/kvserver/store_bootstrap.go index facd19a247aa..a9a09e8ffed5 100644 --- a/pkg/kv/kvserver/store_bootstrap.go +++ b/pkg/kv/kvserver/store_bootstrap.go @@ -114,8 +114,8 @@ func WriteInitialClusterData( // We start off at epoch=0; when nodes heartbeat their liveness records for // the first time it'll get incremented to epoch=1 [2]. // - // [1]: See `CreateLivenessRecord` and usages for where that happens. - // [2]: See `StartHeartbeat` for where that happens. + // [1]: See `(*NodeLiveness).CreateLivenessRecord` and usages for where that happens. + // [2]: See `(*NodeLiveness).Start` for where that happens. livenessRecord := kvserverpb.Liveness{NodeID: 1, Epoch: 0} if err := livenessVal.SetProto(&livenessRecord); err != nil { return err diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index c4076fa5fade..9bfbd132532e 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -233,16 +233,7 @@ func NewServer(ctx *Context, opts ...ServerOption) *grpc.Server { grpcOpts = append(grpcOpts, grpc.ChainStreamInterceptor(streamInterceptor...)) s := grpc.NewServer(grpcOpts...) 
- RegisterHeartbeatServer(s, &HeartbeatService{ - clock: ctx.Clock, - remoteClockMonitor: ctx.RemoteClocks, - clusterName: ctx.ClusterName(), - disableClusterNameVerification: ctx.Config.DisableClusterNameVerification, - clusterID: &ctx.ClusterID, - nodeID: &ctx.NodeID, - settings: ctx.Settings, - testingAllowNamedRPCToAnonymousServer: ctx.TestingAllowNamedRPCToAnonymousServer, - }) + RegisterHeartbeatServer(s, ctx.NewHeartbeatService()) return s } @@ -385,6 +376,14 @@ type ContextOptions struct { Clock *hlc.Clock Stopper *stop.Stopper Settings *cluster.Settings + // OnHandlePing is called when handling a PingRequest, after + // preliminary checks but before recording clock offset information. + // + // It can inject an error. + OnHandlePing func(*PingRequest) error + // OnSendPing intercepts outgoing PingRequests. It may inject an + // error. + OnSendPing func(*PingRequest) error Knobs ContextTestingKnobs } @@ -404,6 +403,11 @@ func (c ContextOptions) validate() error { if c.Settings == nil { return errors.New("Settings must be set") } + + // NB: OnSendPing and OnHandlePing default to noops. + // This is used both for testing and the cli. + _, _ = c.OnSendPing, c.OnHandlePing + return nil } @@ -1141,18 +1145,28 @@ func (ctx *Context) runHeartbeat( // We re-mint the PingRequest to pick up any asynchronous update to clusterID. 
clusterID := ctx.ClusterID.Get() request := &PingRequest{ - Addr: ctx.Config.Addr, - MaxOffsetNanos: maxOffsetNanos, - ClusterID: &clusterID, - NodeID: conn.remoteNodeID, - ServerVersion: ctx.Settings.Version.BinaryVersion(), + OriginNodeID: ctx.NodeID.Get(), + OriginAddr: ctx.Config.Addr, + OriginMaxOffsetNanos: maxOffsetNanos, + ClusterID: &clusterID, + TargetNodeID: conn.remoteNodeID, + ServerVersion: ctx.Settings.Version.BinaryVersion(), + } + + interceptor := func(*PingRequest) error { return nil } + if fn := ctx.OnSendPing; fn != nil { + interceptor = fn } var response *PingResponse sendTime := ctx.Clock.PhysicalTime() - ping := func(goCtx context.Context) (err error) { + ping := func(goCtx context.Context) error { // NB: We want the request to fail-fast (the default), otherwise we won't // be notified of transport failures. + if err := interceptor(request); err != nil { + return err + } + var err error response, err = heartbeatClient.Ping(goCtx, request) return err } @@ -1226,3 +1240,18 @@ func (ctx *Context) runHeartbeat( heartbeatTimer.Reset(ctx.Config.RPCHeartbeatInterval) } } + +// NewHeartbeatService returns a HeartbeatService initialized from the Context. +func (ctx *Context) NewHeartbeatService() *HeartbeatService { + return &HeartbeatService{ + clock: ctx.Clock, + remoteClockMonitor: ctx.RemoteClocks, + clusterName: ctx.ClusterName(), + disableClusterNameVerification: ctx.Config.DisableClusterNameVerification, + clusterID: &ctx.ClusterID, + nodeID: &ctx.NodeID, + settings: ctx.Settings, + onHandlePing: ctx.OnHandlePing, + testingAllowNamedRPCToAnonymousServer: ctx.TestingAllowNamedRPCToAnonymousServer, + } +} diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 9c93120de8d5..3069e47c5f1f 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -162,6 +162,60 @@ func TestHeartbeatCB(t *testing.T) { }) } +// TestPingInterceptors checks that OnSendPing and OnHandlePing can inject errors. 
+func TestPingInterceptors(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + const ( + blockedTargetNodeID = 5 + blockedOriginNodeID = 123 + ) + + errBoomSend := errors.Handled(errors.New("boom due to onSendPing")) + errBoomRecv := status.Error(codes.FailedPrecondition, "boom due to onHandlePing") + opts := ContextOptions{ + TenantID: roachpb.SystemTenantID, + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Config: testutils.NewNodeTestBaseContext(), + Clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond), + Stopper: stop.NewStopper(), + Settings: cluster.MakeTestingClusterSettings(), + OnSendPing: func(req *PingRequest) error { + if req.TargetNodeID == blockedTargetNodeID { + return errBoomSend + } + return nil + }, + OnHandlePing: func(req *PingRequest) error { + if req.OriginNodeID == blockedOriginNodeID { + return errBoomRecv + } + return nil + }, + } + defer opts.Stopper.Stop(ctx) + + rpcCtx := NewContext(opts) + { + _, err := rpcCtx.GRPCDialNode("unused:1234", 5, SystemClass).Connect(ctx) + require.Equal(t, errBoomSend, errors.Cause(err)) + } + + s := newTestServer(t, rpcCtx) + RegisterHeartbeatServer(s, rpcCtx.NewHeartbeatService()) + rpcCtx.NodeID.Set(ctx, blockedOriginNodeID) + ln, err := netutil.ListenAndServeGRPC(rpcCtx.Stopper, s, util.TestAddr) + if err != nil { + t.Fatal(err) + } + remoteAddr := ln.Addr().String() + { + _, err := rpcCtx.GRPCDialNode(remoteAddr, blockedOriginNodeID, SystemClass).Connect(ctx) + require.Equal(t, errBoomRecv, errors.Cause(err)) + } +} + var _ roachpb.InternalServer = &internalServer{} type internalServer struct{} diff --git a/pkg/rpc/heartbeat.go b/pkg/rpc/heartbeat.go index 90532161af07..10df41746f8f 100644 --- a/pkg/rpc/heartbeat.go +++ b/pkg/rpc/heartbeat.go @@ -53,6 +53,8 @@ type HeartbeatService struct { clusterName string disableClusterNameVerification bool + onHandlePing func(*PingRequest) error // see ContextOptions.OnHandlePing + // 
TestingAllowNamedRPCToAnonymousServer, when defined (in tests), // disables errors in case a heartbeat requests a specific node ID but // the remote node doesn't have a node ID yet. This testing knob is @@ -123,7 +125,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR if hs.nodeID != nil { nodeID = hs.nodeID.Get() } - if args.NodeID != 0 && (!hs.testingAllowNamedRPCToAnonymousServer || nodeID != 0) && args.NodeID != nodeID { + if args.TargetNodeID != 0 && (!hs.testingAllowNamedRPCToAnonymousServer || nodeID != 0) && args.TargetNodeID != nodeID { // If nodeID != 0, the situation is clear (we are checking that // the other side is talking to the right node). // @@ -133,7 +135,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR // however we can still serve connections that don't need a node // ID, e.g. during initial gossip. return nil, errors.Errorf( - "client requested node ID %d doesn't match server node ID %d", args.NodeID, nodeID) + "client requested node ID %d doesn't match server node ID %d", args.TargetNodeID, nodeID) } // Check version compatibility. @@ -146,16 +148,22 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR // This check is ignored if either offset is set to 0 (for unittests). // Note that we validated this connection already. Different clusters // could very well have different max offsets. 
- mo, amo := hs.clock.MaxOffset(), time.Duration(args.MaxOffsetNanos) + mo, amo := hs.clock.MaxOffset(), time.Duration(args.OriginMaxOffsetNanos) if mo != 0 && amo != 0 && mo != amo { panic(fmt.Sprintf("locally configured maximum clock offset (%s) "+ - "does not match that of node %s (%s)", mo, args.Addr, amo)) + "does not match that of node %s (%s)", mo, args.OriginAddr, amo)) + } + + if fn := hs.onHandlePing; fn != nil { + if err := fn(args); err != nil { + return nil, err + } } serverOffset := args.Offset // The server offset should be the opposite of the client offset. serverOffset.Offset = -serverOffset.Offset - hs.remoteClockMonitor.UpdateOffset(ctx, args.Addr, serverOffset, 0 /* roundTripLatency */) + hs.remoteClockMonitor.UpdateOffset(ctx, args.OriginAddr, serverOffset, 0 /* roundTripLatency */) return &PingResponse{ Pong: args.Ping, ServerTime: hs.clock.PhysicalNow(), diff --git a/pkg/rpc/heartbeat.pb.go b/pkg/rpc/heartbeat.pb.go index 44f6468102da..5d854c93443c 100644 --- a/pkg/rpc/heartbeat.pb.go +++ b/pkg/rpc/heartbeat.pb.go @@ -49,7 +49,7 @@ type RemoteOffset struct { func (m *RemoteOffset) Reset() { *m = RemoteOffset{} } func (*RemoteOffset) ProtoMessage() {} func (*RemoteOffset) Descriptor() ([]byte, []int) { - return fileDescriptor_heartbeat_b9adbf29944dc273, []int{0} + return fileDescriptor_heartbeat_565332a8a713c8ea, []int{0} } func (m *RemoteOffset) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -82,21 +82,24 @@ type PingRequest struct { // The last offset the client measured with the server. Offset RemoteOffset `protobuf:"bytes,2,opt,name=offset" json:"offset"` // The address of the client. - Addr string `protobuf:"bytes,3,opt,name=addr" json:"addr"` + OriginAddr string `protobuf:"bytes,3,opt,name=origin_addr,json=originAddr" json:"origin_addr"` // The configured maximum clock offset (in nanoseconds) on the server. 
- MaxOffsetNanos int64 `protobuf:"varint,4,opt,name=max_offset_nanos,json=maxOffsetNanos" json:"max_offset_nanos"` + OriginMaxOffsetNanos int64 `protobuf:"varint,4,opt,name=origin_max_offset_nanos,json=originMaxOffsetNanos" json:"origin_max_offset_nanos"` // Cluster ID to prevent connections between nodes in different clusters. - ClusterID *github_com_cockroachdb_cockroach_pkg_util_uuid.UUID `protobuf:"bytes,5,opt,name=cluster_id,json=clusterId,customtype=github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" json:"cluster_id,omitempty"` + ClusterID *github_com_cockroachdb_cockroach_pkg_util_uuid.UUID `protobuf:"bytes,5,opt,name=origin_cluster_id,json=originClusterId,customtype=github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" json:"origin_cluster_id,omitempty"` ServerVersion roachpb.Version `protobuf:"bytes,6,opt,name=server_version,json=serverVersion" json:"server_version"` - // Node ID to prevent connections from being misrouted to an invalid node inside the cluster. - NodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,7,opt,name=node_id,json=nodeId,customtype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"node_id"` + // NodeID the originator of the request wishes to connect to. + // This helps prevent connections from being misrouted when addresses are reused. + TargetNodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,7,opt,name=target_node_id,json=targetNodeId,customtype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"target_node_id"` + // NodeID of the originator of the PingRequest. 
+ OriginNodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,8,opt,name=origin_node_id,json=originNodeId,customtype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"origin_node_id"` } func (m *PingRequest) Reset() { *m = PingRequest{} } func (m *PingRequest) String() string { return proto.CompactTextString(m) } func (*PingRequest) ProtoMessage() {} func (*PingRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_heartbeat_b9adbf29944dc273, []int{1} + return fileDescriptor_heartbeat_565332a8a713c8ea, []int{1} } func (m *PingRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -137,7 +140,7 @@ func (m *PingResponse) Reset() { *m = PingResponse{} } func (m *PingResponse) String() string { return proto.CompactTextString(m) } func (*PingResponse) ProtoMessage() {} func (*PingResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_heartbeat_b9adbf29944dc273, []int{2} + return fileDescriptor_heartbeat_565332a8a713c8ea, []int{2} } func (m *PingResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -392,11 +395,11 @@ func (m *PingRequest) MarshalTo(dAtA []byte) (int, error) { i += n1 dAtA[i] = 0x1a i++ - i = encodeVarintHeartbeat(dAtA, i, uint64(len(m.Addr))) - i += copy(dAtA[i:], m.Addr) + i = encodeVarintHeartbeat(dAtA, i, uint64(len(m.OriginAddr))) + i += copy(dAtA[i:], m.OriginAddr) dAtA[i] = 0x20 i++ - i = encodeVarintHeartbeat(dAtA, i, uint64(m.MaxOffsetNanos)) + i = encodeVarintHeartbeat(dAtA, i, uint64(m.OriginMaxOffsetNanos)) if m.ClusterID != nil { dAtA[i] = 0x2a i++ @@ -417,7 +420,10 @@ func (m *PingRequest) MarshalTo(dAtA []byte) (int, error) { i += n3 dAtA[i] = 0x38 i++ - i = encodeVarintHeartbeat(dAtA, i, uint64(m.NodeID)) + i = encodeVarintHeartbeat(dAtA, i, uint64(m.TargetNodeID)) + dAtA[i] = 0x40 + i++ + i = encodeVarintHeartbeat(dAtA, i, uint64(m.OriginNodeID)) return i, nil } @@ -497,16 +503,17 @@ func (m *PingRequest) Size() (n int) { n += 1 + l + sovHeartbeat(uint64(l)) l = 
m.Offset.Size() n += 1 + l + sovHeartbeat(uint64(l)) - l = len(m.Addr) + l = len(m.OriginAddr) n += 1 + l + sovHeartbeat(uint64(l)) - n += 1 + sovHeartbeat(uint64(m.MaxOffsetNanos)) + n += 1 + sovHeartbeat(uint64(m.OriginMaxOffsetNanos)) if m.ClusterID != nil { l = m.ClusterID.Size() n += 1 + l + sovHeartbeat(uint64(l)) } l = m.ServerVersion.Size() n += 1 + l + sovHeartbeat(uint64(l)) - n += 1 + sovHeartbeat(uint64(m.NodeID)) + n += 1 + sovHeartbeat(uint64(m.TargetNodeID)) + n += 1 + sovHeartbeat(uint64(m.OriginNodeID)) return n } @@ -737,7 +744,7 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 3: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Addr", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field OriginAddr", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -762,13 +769,13 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Addr = string(dAtA[iNdEx:postIndex]) + m.OriginAddr = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 4: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field MaxOffsetNanos", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field OriginMaxOffsetNanos", wireType) } - m.MaxOffsetNanos = 0 + m.OriginMaxOffsetNanos = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowHeartbeat @@ -778,7 +785,7 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.MaxOffsetNanos |= (int64(b) & 0x7F) << shift + m.OriginMaxOffsetNanos |= (int64(b) & 0x7F) << shift if b < 0x80 { break } @@ -847,9 +854,28 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 7: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field TargetNodeID", wireType) + } + m.TargetNodeID = 0 + for shift := uint(0); ; 
shift += 7 { + if shift >= 64 { + return ErrIntOverflowHeartbeat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TargetNodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field OriginNodeID", wireType) } - m.NodeID = 0 + m.OriginNodeID = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowHeartbeat @@ -859,7 +885,7 @@ func (m *PingRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.NodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift + m.OriginNodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift if b < 0x80 { break } @@ -1167,46 +1193,48 @@ var ( ErrIntOverflowHeartbeat = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("rpc/heartbeat.proto", fileDescriptor_heartbeat_b9adbf29944dc273) } +func init() { proto.RegisterFile("rpc/heartbeat.proto", fileDescriptor_heartbeat_565332a8a713c8ea) } -var fileDescriptor_heartbeat_b9adbf29944dc273 = []byte{ - // 594 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x41, 0x4f, 0xd4, 0x40, - 0x14, 0x6e, 0xd9, 0xb2, 0xb8, 0xb3, 0x0b, 0x31, 0xa3, 0x21, 0xcd, 0x62, 0xba, 0xb8, 0x09, 0xba, - 0xa7, 0xd6, 0xe0, 0x49, 0x6f, 0x2c, 0x18, 0xdd, 0x98, 0x2c, 0x66, 0x05, 0x0e, 0x1e, 0x6c, 0x66, - 0x3b, 0x8f, 0x32, 0x81, 0xce, 0xd4, 0xe9, 0x94, 0xe0, 0xd1, 0x7f, 0x60, 0x3c, 0x79, 0xf4, 0x77, - 0xf8, 0x0b, 0x38, 0x72, 0x24, 0x1e, 0x88, 0x2e, 0x7f, 0xc4, 0x4c, 0xa7, 0xbb, 0x14, 0xe4, 0x60, - 0xb8, 0xbd, 0xbe, 0xf7, 0xbd, 0xf7, 0xbe, 0x6f, 0xbe, 0x57, 0xf4, 0x40, 0xa6, 0x51, 0x70, 0x00, - 0x44, 0xaa, 0x31, 0x10, 0xe5, 0xa7, 0x52, 0x28, 0x81, 0x17, 0x23, 0x11, 0x1d, 0x4a, 0x41, 0xa2, - 0x03, 0x5f, 0xa6, 0x51, 0x7b, 0xb9, 0x08, 0xd3, 0x71, 0x90, 0x80, 0x22, 0x94, 0x28, 0x62, 
0x60, - 0xed, 0x87, 0xb1, 0x88, 0x45, 0x11, 0x06, 0x3a, 0x32, 0xd9, 0xee, 0x17, 0x1b, 0xb5, 0x46, 0x90, - 0x08, 0x05, 0xdb, 0xfb, 0xfb, 0x19, 0x28, 0xfc, 0x08, 0xd5, 0x45, 0x11, 0xb9, 0xf6, 0xaa, 0xdd, - 0xab, 0xf5, 0x9d, 0xd3, 0x8b, 0x8e, 0x35, 0x2a, 0x73, 0xf8, 0x09, 0x6a, 0xe6, 0x3c, 0x02, 0xa9, - 0x08, 0xe3, 0xea, 0xb3, 0x3b, 0x57, 0x81, 0x54, 0x0b, 0x78, 0x0d, 0x35, 0x13, 0x20, 0x59, 0x2e, - 0x81, 0x86, 0x44, 0xb9, 0xb5, 0x0a, 0x0e, 0x4d, 0x0b, 0x1b, 0xea, 0xa5, 0xf3, 0xfd, 0x47, 0xc7, - 0xea, 0xfe, 0xac, 0xa1, 0xe6, 0x3b, 0xc6, 0xe3, 0x11, 0x7c, 0xca, 0x21, 0x53, 0xd8, 0x45, 0x4e, - 0xca, 0x78, 0x5c, 0x10, 0x68, 0x94, 0x5d, 0x45, 0x06, 0xbf, 0x98, 0x91, 0xd3, 0x9b, 0x9b, 0xeb, - 0x2b, 0xfe, 0x35, 0xed, 0x7e, 0x55, 0xc9, 0x0d, 0xe6, 0x2e, 0x72, 0x08, 0xa5, 0xb2, 0xa0, 0x32, - 0x1b, 0xaa, 0x33, 0xd8, 0x47, 0xf7, 0x13, 0x72, 0x12, 0x1a, 0x5c, 0xc8, 0x09, 0x17, 0x99, 0xeb, - 0x54, 0x08, 0x2f, 0x25, 0xe4, 0xc4, 0x8c, 0x1c, 0xea, 0x1a, 0x8e, 0x10, 0x8a, 0x8e, 0xf2, 0x4c, - 0x81, 0x0c, 0x19, 0x75, 0xe7, 0x57, 0xed, 0x5e, 0xab, 0xbf, 0xf5, 0xeb, 0xa2, 0xf3, 0x3c, 0x66, - 0xea, 0x20, 0x1f, 0xfb, 0x91, 0x48, 0x82, 0x19, 0x2d, 0x3a, 0xbe, 0x8a, 0x83, 0xf4, 0x30, 0x0e, - 0x72, 0xc5, 0x8e, 0x82, 0x3c, 0x67, 0xd4, 0xdf, 0xdd, 0x1d, 0x6c, 0x4d, 0x2e, 0x3a, 0x8d, 0x4d, - 0x33, 0x6c, 0xb0, 0x35, 0x6a, 0x94, 0x73, 0x07, 0x14, 0xbf, 0x46, 0x4b, 0x19, 0xc8, 0x63, 0x90, - 0xe1, 0x31, 0xc8, 0x8c, 0x09, 0xee, 0xd6, 0x0b, 0xc5, 0xed, 0xaa, 0x62, 0x63, 0xb4, 0xbf, 0x67, - 0x10, 0x25, 0xdd, 0x45, 0xd3, 0x57, 0x26, 0xf1, 0x47, 0xb4, 0xc0, 0x05, 0x05, 0x4d, 0x75, 0x61, - 0xd5, 0xee, 0xcd, 0xf7, 0x5f, 0x69, 0xd4, 0x7f, 0xd3, 0x9d, 0xee, 0x18, 0x0a, 0x0a, 0x05, 0xdd, - 0xba, 0x89, 0x46, 0x75, 0x3d, 0x75, 0x40, 0xbb, 0xdf, 0xe6, 0x50, 0xcb, 0x98, 0x97, 0xa5, 0x82, - 0x67, 0x50, 0xb8, 0x27, 0xfe, 0x71, 0x4f, 0xf0, 0x58, 0x1f, 0x45, 0xa9, 0x49, 0xb1, 0x04, 0xae, - 0x1d, 0x0f, 0x32, 0x85, 0x1d, 0x96, 0xc0, 0x2d, 0xd2, 0x6b, 0x77, 0x93, 0xfe, 0x14, 0xb5, 0xa6, - 0x46, 0x71, 0x92, 0x40, 0x61, 
0xea, 0x94, 0x51, 0xb3, 0xac, 0x0c, 0x49, 0x02, 0x78, 0x1b, 0x3d, - 0xa6, 0x2c, 0x23, 0xe3, 0x23, 0x08, 0xab, 0x0d, 0x7a, 0x3f, 0xdb, 0x67, 0x11, 0x51, 0x9a, 0x84, - 0x36, 0xfa, 0x5e, 0xd9, 0xed, 0x95, 0xf0, 0xcd, 0xab, 0x21, 0x7b, 0x15, 0xec, 0xfa, 0x10, 0x35, - 0xde, 0x4c, 0xff, 0x52, 0xbc, 0x81, 0x1c, 0xfd, 0x40, 0xb8, 0x7d, 0xe3, 0x58, 0x2b, 0x27, 0xdf, - 0x5e, 0xb9, 0xb5, 0x66, 0x5e, 0xb4, 0x6b, 0xad, 0x03, 0x5a, 0xde, 0x81, 0x4c, 0x31, 0x1e, 0xcf, - 0xc6, 0xbe, 0x57, 0x12, 0x48, 0x82, 0xdf, 0x22, 0xa4, 0xb1, 0xe5, 0xd7, 0xdd, 0x57, 0xf4, 0xec, - 0x67, 0x76, 0x7f, 0xed, 0xf4, 0x8f, 0x67, 0x9d, 0x4e, 0x3c, 0xfb, 0x6c, 0xe2, 0xd9, 0xe7, 0x13, - 0xcf, 0xfe, 0x3d, 0xf1, 0xec, 0xaf, 0x97, 0x9e, 0x75, 0x76, 0xe9, 0x59, 0xe7, 0x97, 0x9e, 0xf5, - 0xa1, 0x26, 0xd3, 0xe8, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x08, 0x36, 0xe1, 0x9a, 0x86, 0x04, - 0x00, 0x00, +var fileDescriptor_heartbeat_565332a8a713c8ea = []byte{ + // 635 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x41, 0x4f, 0xd4, 0x40, + 0x14, 0x6e, 0xd9, 0x05, 0x61, 0x76, 0xc1, 0x38, 0x12, 0x6c, 0x16, 0xd3, 0xc5, 0x4d, 0xd0, 0x3d, + 0xb5, 0x06, 0x4f, 0xea, 0x89, 0x85, 0x44, 0x09, 0x71, 0x31, 0x2b, 0x70, 0xf0, 0xd2, 0xcc, 0x76, + 0x1e, 0x65, 0x02, 0xed, 0x94, 0xe9, 0x94, 0xe0, 0xd1, 0x7f, 0x60, 0x3c, 0x79, 0xf4, 0xe7, 0x70, + 0xe4, 0x48, 0x3c, 0x10, 0x5d, 0x0e, 0xfe, 0x0d, 0x33, 0x9d, 0x29, 0x5b, 0x90, 0x83, 0x21, 0xde, + 0xde, 0xbc, 0xf7, 0xbd, 0xf7, 0x7d, 0xef, 0xed, 0xd7, 0x45, 0x0f, 0x45, 0x1a, 0xfa, 0xfb, 0x40, + 0x84, 0x1c, 0x02, 0x91, 0x5e, 0x2a, 0xb8, 0xe4, 0x78, 0x36, 0xe4, 0xe1, 0x81, 0xe0, 0x24, 0xdc, + 0xf7, 0x44, 0x1a, 0xb6, 0x16, 0x8a, 0x30, 0x1d, 0xfa, 0x31, 0x48, 0x42, 0x89, 0x24, 0x1a, 0xd6, + 0x9a, 0x8f, 0x78, 0xc4, 0x8b, 0xd0, 0x57, 0x91, 0xce, 0x76, 0x3e, 0xdb, 0xa8, 0x39, 0x80, 0x98, + 0x4b, 0xd8, 0xda, 0xdb, 0xcb, 0x40, 0xe2, 0xc7, 0x68, 0x8a, 0x17, 0x91, 0x63, 0x2f, 0xd9, 0xdd, + 0x5a, 0xaf, 0x7e, 0x7a, 0xd1, 0xb6, 0x06, 0x26, 
0x87, 0x9f, 0xa2, 0x46, 0x9e, 0x84, 0x20, 0x24, + 0x61, 0x89, 0xfc, 0xe4, 0x4c, 0x54, 0x20, 0xd5, 0x02, 0x5e, 0x46, 0x8d, 0x18, 0x48, 0x96, 0x0b, + 0xa0, 0x01, 0x91, 0x4e, 0xad, 0x82, 0x43, 0x65, 0x61, 0x55, 0xbe, 0xaa, 0x7f, 0xfb, 0xde, 0xb6, + 0x3a, 0xbf, 0xeb, 0xa8, 0xf1, 0x9e, 0x25, 0xd1, 0x00, 0x8e, 0x72, 0xc8, 0x24, 0x76, 0x50, 0x3d, + 0x65, 0x49, 0x54, 0x08, 0x98, 0x31, 0x5d, 0x45, 0x06, 0xbf, 0xbc, 0x12, 0xa7, 0x98, 0x1b, 0x2b, + 0x8b, 0xde, 0xb5, 0xdd, 0xbd, 0xea, 0x26, 0x37, 0x94, 0x2f, 0xa3, 0x06, 0x17, 0x2c, 0x62, 0x49, + 0x40, 0x28, 0x15, 0x85, 0xa2, 0x72, 0x36, 0xd2, 0x85, 0x55, 0x4a, 0x05, 0x7e, 0x8d, 0x1e, 0x19, + 0x58, 0x4c, 0x4e, 0x02, 0xdd, 0x1b, 0x24, 0x24, 0xe1, 0x99, 0x53, 0xaf, 0x2c, 0x31, 0xaf, 0x41, + 0xef, 0xc8, 0x89, 0x26, 0xeb, 0x2b, 0x04, 0x4e, 0xd1, 0x03, 0xd3, 0x1c, 0x1e, 0xe6, 0x99, 0x04, + 0x11, 0x30, 0xea, 0x4c, 0x2e, 0xd9, 0xdd, 0x66, 0x6f, 0xfd, 0xc7, 0x45, 0xfb, 0x45, 0xc4, 0xe4, + 0x7e, 0x3e, 0xf4, 0x42, 0x1e, 0xfb, 0x57, 0xba, 0xe9, 0x70, 0x1c, 0xfb, 0xe9, 0x41, 0xe4, 0xe7, + 0x92, 0x1d, 0xfa, 0x79, 0xce, 0xa8, 0xb7, 0xb3, 0xb3, 0xb1, 0x3e, 0xba, 0x68, 0xcf, 0xac, 0xe9, + 0x61, 0x1b, 0xeb, 0x83, 0xfb, 0x7a, 0x7c, 0x99, 0xa0, 0xf8, 0x0d, 0x9a, 0xcb, 0x40, 0x1c, 0x83, + 0x08, 0x8e, 0x41, 0x64, 0x8c, 0x27, 0xce, 0x54, 0x71, 0x98, 0x56, 0xf5, 0x30, 0xda, 0x0f, 0xde, + 0xae, 0x46, 0x98, 0x0d, 0x66, 0x75, 0x9f, 0x49, 0xe2, 0x23, 0x34, 0x27, 0x89, 0x88, 0xd4, 0xb2, + 0x9c, 0x82, 0xd2, 0x7d, 0x6f, 0xc9, 0xee, 0x4e, 0xf6, 0x36, 0x15, 0xf8, 0x9f, 0xb5, 0x97, 0x54, + 0x7d, 0x4e, 0xa1, 0xd0, 0xde, 0xdc, 0x2e, 0x86, 0xea, 0xf7, 0xa0, 0x29, 0xc7, 0x2f, 0xaa, 0x28, + 0xcd, 0xb5, 0x4a, 0xca, 0xe9, 0xff, 0x42, 0xb9, 0x55, 0x0c, 0x2d, 0x29, 0xf9, 0xf8, 0x45, 0x3b, + 0x5f, 0x27, 0x50, 0x53, 0x3b, 0x2d, 0x4b, 0x79, 0x92, 0x41, 0x61, 0x35, 0xfe, 0x97, 0xd5, 0x78, + 0x12, 0x29, 0xbf, 0x98, 0xcb, 0x4a, 0x16, 0xc3, 0x35, 0xa7, 0x23, 0x5d, 0xd8, 0x66, 0x31, 0xdc, + 0xf2, 0x03, 0xd4, 0xee, 0xf6, 0x03, 0x3c, 0x43, 0xcd, 0xd2, 0x34, 0x09, 0x89, 0xa1, 
0x70, 0x5b, + 0xa9, 0xa8, 0x61, 0x2a, 0x7d, 0x12, 0x03, 0xde, 0x42, 0x4f, 0x28, 0xcb, 0xc8, 0xf0, 0x10, 0x82, + 0x6a, 0x83, 0xe2, 0x67, 0x7b, 0x2c, 0x24, 0x52, 0x89, 0x50, 0xa6, 0x9b, 0x36, 0xdd, 0xae, 0x81, + 0xaf, 0x8d, 0x87, 0xec, 0x56, 0xb0, 0x2b, 0x7d, 0x34, 0xf3, 0xb6, 0xfc, 0x4b, 0xc1, 0xab, 0xa8, + 0xae, 0x0e, 0x84, 0x5b, 0x37, 0xbe, 0xac, 0xca, 0xf7, 0xd9, 0x5a, 0xbc, 0xb5, 0xa6, 0x2f, 0xda, + 0xb1, 0x56, 0x00, 0x2d, 0x6c, 0x43, 0x26, 0x59, 0x12, 0x5d, 0x8d, 0xfd, 0x20, 0x05, 0x90, 0x18, + 0x6f, 0x22, 0xa4, 0xb0, 0xe6, 0x75, 0x77, 0x8a, 0xae, 0xfd, 0xdc, 0xee, 0x2d, 0x9f, 0xfe, 0x72, + 0xad, 0xd3, 0x91, 0x6b, 0x9f, 0x8d, 0x5c, 0xfb, 0x7c, 0xe4, 0xda, 0x3f, 0x47, 0xae, 0xfd, 0xe5, + 0xd2, 0xb5, 0xce, 0x2e, 0x5d, 0xeb, 0xfc, 0xd2, 0xb5, 0x3e, 0xd6, 0x44, 0x1a, 0xfe, 0x09, 0x00, + 0x00, 0xff, 0xff, 0x72, 0x69, 0xb3, 0x22, 0x33, 0x05, 0x00, 0x00, } diff --git a/pkg/rpc/heartbeat.proto b/pkg/rpc/heartbeat.proto index 0427588a6154..7da9f6f4810d 100644 --- a/pkg/rpc/heartbeat.proto +++ b/pkg/rpc/heartbeat.proto @@ -42,18 +42,24 @@ message PingRequest { // The last offset the client measured with the server. optional RemoteOffset offset = 2 [(gogoproto.nullable) = false]; // The address of the client. - optional string addr = 3 [(gogoproto.nullable) = false]; + optional string origin_addr = 3 [(gogoproto.nullable) = false]; // The configured maximum clock offset (in nanoseconds) on the server. - optional int64 max_offset_nanos = 4 [(gogoproto.nullable) = false]; + optional int64 origin_max_offset_nanos = 4 [(gogoproto.nullable) = false]; // Cluster ID to prevent connections between nodes in different clusters. 
- optional bytes cluster_id = 5 [ + optional bytes origin_cluster_id = 5 [ (gogoproto.customname) = "ClusterID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; optional roachpb.Version server_version = 6 [(gogoproto.nullable) = false]; - // Node ID to prevent connections from being misrouted to an invalid node inside the cluster. - optional int32 node_id = 7 [ + // NodeID the originator of the request wishes to connect to. + // This helps prevent connections from being misrouted when addresses are reused. + optional int32 target_node_id = 7 [ (gogoproto.nullable) = false, - (gogoproto.customname) = "NodeID", + (gogoproto.customname) = "TargetNodeID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + // NodeID of the originator of the PingRequest. + optional int32 origin_node_id = 8 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "OriginNodeID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; } diff --git a/pkg/rpc/heartbeat_test.go b/pkg/rpc/heartbeat_test.go index 6a3293e337a3..0b905bc771f8 100644 --- a/pkg/rpc/heartbeat_test.go +++ b/pkg/rpc/heartbeat_test.go @@ -172,10 +172,10 @@ func TestClockOffsetMismatch(t *testing.T) { hs.clusterID.Set(ctx, uuid.Nil) request := &PingRequest{ - Ping: "testManual", - Addr: "test", - MaxOffsetNanos: (500 * time.Millisecond).Nanoseconds(), - ServerVersion: st.Version.BinaryVersion(), + Ping: "testManual", + OriginAddr: "test", + OriginMaxOffsetNanos: (500 * time.Millisecond).Nanoseconds(), + ServerVersion: st.Version.BinaryVersion(), } response, err := hs.Ping(context.Background(), request) t.Fatalf("should not have reached but got response=%v err=%v", response, err) @@ -257,7 +257,7 @@ func TestNodeIDCompare(t *testing.T) { heartbeat.nodeID.Reset(td.serverNodeID) request := &PingRequest{ Ping: "testPing", - NodeID: td.clientNodeID, + TargetNodeID: td.clientNodeID, ServerVersion: st.Version.BinaryVersion(), } _, 
err := heartbeat.Ping(context.Background(), request) diff --git a/pkg/server/server.go b/pkg/server/server.go index f01142cda58c..bb9e002c2aa4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -264,7 +264,14 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { Clock: clock, Stopper: stopper, Settings: cfg.Settings, - } + OnSendPing: func(req *rpc.PingRequest) error { + // TODO(tbg): hook this up to a check for decommissioned nodes. + return nil + }, + OnHandlePing: func(req *rpc.PingRequest) error { + // TODO(tbg): hook this up to a check for decommissioned nodes. + return nil + }} if knobs := cfg.TestingKnobs.Server; knobs != nil { serverKnobs := knobs.(*TestingKnobs) rpcCtxOpts.Knobs = serverKnobs.ContextTestingKnobs @@ -378,16 +385,21 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { nlActive, nlRenewal := cfg.NodeLivenessDurations() - nodeLiveness := kvserver.NewNodeLiveness( - cfg.AmbientCtx, - clock, - db, - g, - nlActive, - nlRenewal, - st, - cfg.HistogramWindowInterval(), - ) + nodeLiveness := kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{ + AmbientCtx: cfg.AmbientCtx, + Clock: clock, + DB: db, + Gossip: g, + LivenessThreshold: nlActive, + RenewalDuration: nlRenewal, + Settings: st, + HistogramWindowInterval: cfg.HistogramWindowInterval(), + OnNodeDecommissioned: func(liveness kvserverpb.Liveness) { + if knobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs.OnDecommissionedCallback != nil { + knobs.OnDecommissionedCallback(liveness) + } + }, + }) registry.AddMetricStruct(nodeLiveness.Metrics()) storePool := kvserver.NewStorePool( @@ -1586,13 +1598,17 @@ func (s *Server) PreStart(ctx context.Context) error { // Begin the node liveness heartbeat. Add a callback which records the local // store "last up" timestamp for every store whenever the liveness record is // updated. 
- s.nodeLiveness.StartHeartbeat(ctx, s.stopper, s.engines, func(ctx context.Context) { - now := s.clock.Now() - if err := s.node.stores.VisitStores(func(s *kvserver.Store) error { - return s.WriteLastUpTimestamp(ctx, now) - }); err != nil { - log.Warningf(ctx, "writing last up timestamp: %v", err) - } + s.nodeLiveness.Start(ctx, kvserver.NodeLivenessStartOptions{ + Stopper: s.stopper, + Engines: s.engines, + OnSelfLive: func(ctx context.Context) { + now := s.clock.Now() + if err := s.node.stores.VisitStores(func(s *kvserver.Store) error { + return s.WriteLastUpTimestamp(ctx, now) + }); err != nil { + log.Warningf(ctx, "writing last up timestamp: %v", err) + } + }, }) // Begin recording status summaries. diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index 29001ae95b89..a34b7b0a38ae 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -14,6 +14,7 @@ import ( "net" "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/diagnosticspb" @@ -73,6 +74,9 @@ type TestingKnobs struct { // TODO(irfansharif): Update users of this testing knob to use the // appropriate clusterversion.Handle instead. BinaryVersionOverride roachpb.Version + // An (additional) callback invoked whenever a + // node is permanently removed from the cluster. + OnDecommissionedCallback func(kvserverpb.Liveness) } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. 
diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 258139bf321f..ac742c457057 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -162,16 +162,16 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto cfg.Gossip = ltc.Gossip cfg.HistogramWindowInterval = metric.TestSampleInterval active, renewal := cfg.NodeLivenessDurations() - cfg.NodeLiveness = kvserver.NewNodeLiveness( - cfg.AmbientCtx, - cfg.Clock, - cfg.DB, - cfg.Gossip, - active, - renewal, - cfg.Settings, - cfg.HistogramWindowInterval, - ) + cfg.NodeLiveness = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{ + AmbientCtx: cfg.AmbientCtx, + Clock: cfg.Clock, + DB: cfg.DB, + Gossip: cfg.Gossip, + LivenessThreshold: active, + RenewalDuration: renewal, + Settings: cfg.Settings, + HistogramWindowInterval: cfg.HistogramWindowInterval, + }) kvserver.TimeUntilStoreDead.Override(&cfg.Settings.SV, kvserver.TestTimeUntilStoreDead) cfg.StorePool = kvserver.NewStorePool( cfg.AmbientCtx, @@ -229,7 +229,8 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto } if !ltc.DisableLivenessHeartbeat { - cfg.NodeLiveness.StartHeartbeat(ctx, ltc.stopper, []storage.Engine{ltc.Eng}, nil /* alive */) + cfg.NodeLiveness.Start(ctx, + kvserver.NodeLivenessStartOptions{Stopper: ltc.stopper, Engines: []storage.Engine{ltc.Eng}}) } if err := ltc.Store.Start(ctx, ltc.stopper); err != nil {