diff --git a/build/deploy/Dockerfile b/build/deploy/Dockerfile index 7d9c62b0dde7..6579a9aeb55e 100644 --- a/build/deploy/Dockerfile +++ b/build/deploy/Dockerfile @@ -1,14 +1,12 @@ -FROM debian:9.12-slim +FROM registry.access.redhat.com/ubi8/ubi -# For deployment, we need -# libc6 - dynamically linked by cockroach binary +# For deployment, we need the following installed (they are installed +# by default in the Red Hat UBI standard image): +# glibc - dynamically linked by cockroach binary # ca-certificates - to authenticate TLS connections for telemetry and # bulk-io with S3/GCS/Azure # tzdata - for time zone functions -RUN apt-get update && \ - apt-get -y upgrade && \ - apt-get install -y libc6 ca-certificates tzdata && \ - rm -rf /var/lib/apt/lists/* +RUN yum update --disablerepo=* --enablerepo=ubi-8-appstream --enablerepo=ubi-8-baseos -y && rm -rf /var/cache/yum # Install GEOS libraries. RUN mkdir /usr/local/lib/cockroach @@ -16,13 +14,13 @@ COPY libgeos.so libgeos_c.so /usr/local/lib/cockroach/ RUN mkdir -p /cockroach COPY cockroach.sh cockroach /cockroach/ + # Set working directory so that relative paths # are resolved appropriately when passed as args. WORKDIR /cockroach/ -# Include the directory into the path -# to make it easier to invoke commands -# via Docker +# Include the directory in the path to make it easier to invoke +# commands via Docker ENV PATH=/cockroach:$PATH ENV COCKROACH_CHANNEL=official-docker diff --git a/pkg/cli/node.go b/pkg/cli/node.go index 8ed8f735362b..3d7fd3863cac 100644 --- a/pkg/cli/node.go +++ b/pkg/cli/node.go @@ -337,7 +337,17 @@ func runDecommissionNode(cmd *cobra.Command, args []string) error { } c := serverpb.NewAdminClient(conn) - return runDecommissionNodeImpl(ctx, c, nodeCtx.nodeDecommissionWait, nodeIDs) + if err := runDecommissionNodeImpl(ctx, c, nodeCtx.nodeDecommissionWait, nodeIDs); err != nil { + cause := errors.UnwrapAll(err) + if s, ok := status.FromError(cause); ok && s.Code() == codes.NotFound { + // Are we trying to decommission a node that does not + // exist? See Server.Decommission for where this specific grpc error + // code is generated. + return errors.New("node does not exist") + } + return err + } + return nil } func handleNodeDecommissionSelf( @@ -566,13 +576,21 @@ func runRecommissionNode(cmd *cobra.Command, args []string) error { } resp, err := c.Decommission(ctx, req) if err != nil { + cause := errors.UnwrapAll(err) // If it's a specific illegal membership transition error, we try to // surface a more readable message to the user. See // ValidateLivenessTransition in kvserverpb/liveness.go for where this // error is generated. - if s, ok := status.FromError(err); ok && s.Code() == codes.FailedPrecondition { + if s, ok := status.FromError(cause); ok && s.Code() == codes.FailedPrecondition { return errors.Newf("%s", s.Message()) } + if s, ok := status.FromError(cause); ok && s.Code() == codes.NotFound { + // Are we trying to recommission a node that does not + // exist? See Server.Decommission for where this specific grpc error + // code is generated. + fmt.Fprintln(stderr) + return errors.New("node does not exist") + } return err } return printDecommissionStatus(*resp) diff --git a/pkg/jobs/helpers.go b/pkg/jobs/helpers.go index 9fe6f2e245eb..9c17ede4b962 100644 --- a/pkg/jobs/helpers.go +++ b/pkg/jobs/helpers.go @@ -65,14 +65,14 @@ func (*FakeNodeLiveness) ModuleTestingKnobs() {} // Self implements the implicit storage.NodeLiveness interface. It uses NodeID // as the node ID.
On every call, a nonblocking send is performed over nl.ch to // allow tests to execute a callback. -func (nl *FakeNodeLiveness) Self() (kvserverpb.Liveness, error) { +func (nl *FakeNodeLiveness) Self() (kvserverpb.Liveness, bool) { select { case nl.SelfCalledCh <- struct{}{}: default: } nl.mu.Lock() defer nl.mu.Unlock() - return *nl.mu.livenessMap[FakeNodeID.Get()], nil + return *nl.mu.livenessMap[FakeNodeID.Get()], true } // GetLivenesses implements the implicit storage.NodeLiveness interface. diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 940808aaf36d..9d5c34ed8453 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -784,10 +784,10 @@ func (r *Registry) maybeCancelJobsDeprecated( // https://github.com/cockroachdb/cockroach/issues/47892 return } - liveness, err := nl.Self() - if err != nil { + liveness, ok := nl.Self() + if !ok { if nodeLivenessLogLimiter.ShouldLog() { - log.Warningf(ctx, "unable to get node liveness: %s", err) + log.Warning(ctx, "own liveness record not found") } // Conservatively assume our lease has expired. Abort all jobs. r.deprecatedCancelAll(ctx) diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 15424fbe9526..1942ff5e613c 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -1435,11 +1435,12 @@ func (m *multiTestContext) heartbeatLiveness(ctx context.Context, store int) err m.mu.RLock() nl := m.nodeLivenesses[store] m.mu.RUnlock() - l, err := nl.Self() - if err != nil { - return err + l, ok := nl.Self() + if !ok { + return errors.New("liveness not found") } + var err error for r := retry.StartWithCtx(ctx, retry.Options{MaxRetries: 5}); r.Next(); { if err = nl.Heartbeat(ctx, l); !errors.Is(err, kvserver.ErrEpochIncremented) { break diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index a3a61b766b5b..a359b65909cb 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -514,12 +514,9 @@ func (nl *NodeLiveness) SetDrainingInternal( } func (nl *NodeLiveness) SetDecommissioningInternal( - ctx context.Context, - nodeID roachpb.NodeID, - liveness LivenessRecord, - targetStatus kvserverpb.MembershipStatus, + ctx context.Context, oldLivenessRec LivenessRecord, targetStatus kvserverpb.MembershipStatus, ) (changeCommitted bool, err error) { - return nl.setMembershipStatusInternal(ctx, nodeID, liveness, targetStatus) + return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus) } // GetCircuitBreaker returns the circuit breaker controlling diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 80a08f2695c1..670050169394 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -40,9 +40,14 @@ import ( ) var ( - // ErrNoLivenessRecord is returned when asking for liveness information - // about a node for which nothing is known. - ErrNoLivenessRecord = errors.New("node not in the liveness table") + // ErrMissingLivenessRecord is returned when asking for liveness information + // about a node for which nothing is known. This happens when attempting to + // {d,r}ecommission a non-existent node. + ErrMissingLivenessRecord = errors.New("missing liveness record") + + // errLivenessRecordCacheMiss is returned when asking for the liveness + // record of a given node and it is not found in the in-memory cache. 
+ errLivenessRecordCacheMiss = errors.New("liveness record not found in cache") // errChangeMembershipStatusFailed is returned when we're not able to // conditionally write the target membership status. It's safe to retry @@ -177,7 +182,40 @@ type NodeLiveness struct { mu struct { syncutil.RWMutex - callbacks []IsLiveCallback + callbacks []IsLiveCallback + // nodes is an in-memory cache of liveness records that NodeLiveness + // knows about (having learnt of them through gossip or through KV). + // It's a look-aside cache, and is accessed primarily through + // `getLivenessLocked` and callers. + // + // TODO(irfansharif): The caching story for NodeLiveness is a bit + // complicated. This can be attributed to the fact that pre-20.2, we + // weren't always guaranteed to have liveness records for every given + // node. Because of this it wasn't possible to have a + // look-through cache (it wouldn't know where to fetch from if a record + // was found to be missing). + // + // Now that we're always guaranteed to have a liveness record present, + // we should change this out to be a look-through cache instead (it can + // fall back to KV when a given record is missing). This would help + // simplify our current structure, where we do the following: + // + // - Consult this cache to find an existing liveness record + // - If missing, fetch the record from KV + // - Update the liveness record in KV + // - Add the updated record into this cache (see `maybeUpdate`) + // + // (See `StartHeartbeat` for an example of this pattern.) + // + // What we want instead is a bit simpler: + // + // - Consult this cache to find an existing liveness record + // - If missing, fetch the record from KV, update and return from cache + // - Update the liveness record in KV + // - Add the updated record into this cache + // + // More concretely, we want `getLivenessRecordFromKV` to be tucked away + // within `getLivenessLocked`. nodes map[roachpb.NodeID]LivenessRecord heartbeatCallback HeartbeatCallback // Before heartbeating, we write to each of these engines to avoid @@ -256,22 +294,32 @@ func (nl *NodeLiveness) sem(nodeID roachpb.NodeID) chan struct{} { // pkg/server/drain.go for details. func (nl *NodeLiveness) SetDraining( ctx context.Context, drain bool, reporter func(int, redact.SafeString), -) { +) error { ctx = nl.ambientCtx.AnnotateCtx(ctx) for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { - oldLivenessRec, err := nl.SelfEx() - if err != nil && !errors.Is(err, ErrNoLivenessRecord) { - log.Errorf(ctx, "unexpected error getting liveness: %+v", err) + oldLivenessRec, ok := nl.SelfEx() + if !ok { + // There was a cache miss; let's now fetch the record from KV + // directly.
+ nodeID := nl.gossip.NodeID.Get() + livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID) + if err != nil { + return err + } + oldLivenessRec = livenessRec } - err = nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter) - if err != nil { + if err := nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter); err != nil { if log.V(1) { log.Infof(ctx, "attempting to set liveness draining status to %v: %v", drain, err) } continue } - return + return nil } + if err := ctx.Err(); err != nil { + return err + } + return errors.New("failed to drain self") } // SetMembershipStatus changes the liveness record to reflect the target @@ -332,7 +380,8 @@ func (nl *NodeLiveness) SetMembershipStatus( return false, errors.Wrap(err, "unable to get liveness") } if kv.Value == nil { - return false, ErrNoLivenessRecord + // We must be trying to decommission a node that does not exist. + return false, ErrMissingLivenessRecord } if err := kv.Value.GetProto(&oldLiveness); err != nil { return false, errors.Wrap(err, "invalid liveness record") @@ -347,8 +396,8 @@ func (nl *NodeLiveness) SetMembershipStatus( // Offer it to make sure that when we actually try to update the // liveness, the previous view is correct. This, too, is required to // de-flake TestNodeLivenessDecommissionAbsent. - nl.maybeUpdate(oldLivenessRec) - return nl.setMembershipStatusInternal(ctx, nodeID, oldLivenessRec, targetStatus) + nl.maybeUpdate(ctx, oldLivenessRec) + return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus) } for { @@ -379,18 +428,14 @@ func (nl *NodeLiveness) setDrainingInternal( <-sem }() - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) { - // Liveness record didn't previously exist, so we create one. - newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 1, - } - } else { - newLiveness = oldLivenessRec.Liveness + return errors.AssertionFailedf("invalid old liveness record; found to be empty") } + // Let's compute what our new liveness record should be. We start off with a + // copy of our existing liveness record. + newLiveness := oldLivenessRec.Liveness + if reporter != nil && drain && !newLiveness.Draining { // Report progress to the Drain RPC. reporter(1, "liveness record") @@ -404,7 +449,7 @@ func (nl *NodeLiveness) setDrainingInternal( ignoreCache: true, } written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { - nl.maybeUpdate(actual) + nl.maybeUpdate(ctx, actual) if actual.Draining == update.newLiveness.Draining { return errNodeDrainingSet @@ -421,7 +466,7 @@ func (nl *NodeLiveness) setDrainingInternal( return err } - nl.maybeUpdate(written) + nl.maybeUpdate(ctx, written) return nil } @@ -501,29 +546,15 @@ func (nl *NodeLiveness) CreateLivenessRecord(ctx context.Context, nodeID roachpb } func (nl *NodeLiveness) setMembershipStatusInternal( - ctx context.Context, - nodeID roachpb.NodeID, - oldLivenessRec LivenessRecord, - targetStatus kvserverpb.MembershipStatus, + ctx context.Context, oldLivenessRec LivenessRecord, targetStatus kvserverpb.MembershipStatus, ) (statusChanged bool, err error) { - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) { - // Liveness record didn't previously exist, so we create one. - // - // TODO(irfansharif): The above is now no longer possible. 
We always - // create one (see CreateLivenessRecord, WriteInitialClusterData) when - // adding a node to the cluster. We should clean up all this logic that - // tries to work around the liveness record possibly not existing. - newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 1, - } - } else { - // We start off with a copy of our existing liveness record. - newLiveness = oldLivenessRec.Liveness + return false, errors.AssertionFailedf("invalid old liveness record; found to be empty") } + // Let's compute what our new liveness record should be. We start off with a + // copy of our existing liveness record. + newLiveness := oldLivenessRec.Liveness newLiveness.Membership = targetStatus if oldLivenessRec.Membership == newLiveness.Membership { // No-op. Return early. @@ -535,11 +566,8 @@ func (nl *NodeLiveness) setMembershipStatusInternal( return false, nil } - if oldLivenessRec.Liveness != (kvserverpb.Liveness{}) { - err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness) - if err != nil { - return false, err - } + if err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness); err != nil { + return false, err } update := livenessUpdate{ @@ -577,9 +605,14 @@ func (nl *NodeLiveness) GetLivenessThreshold() time.Duration { // whether or not its liveness has expired regardless of the liveness status. It // is an error if the specified node is not in the local liveness table. func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { - liveness, err := nl.GetLiveness(nodeID) - if err != nil { - return false, err + liveness, ok := nl.GetLiveness(nodeID) + if !ok { + // TODO(irfansharif): We only expect callers to supply us with node IDs + // they learnt through existing liveness records, which implies we + // should never find ourselves here. We should clean up this conditional + // once we re-visit the caching structure used within NodeLiveness; + // we should be able to return ErrMissingLivenessRecord instead. + return false, errLivenessRecordCacheMiss } // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to // consider clock signals from other nodes. @@ -631,9 +664,15 @@ func (nl *NodeLiveness) StartHeartbeat( func(ctx context.Context) error { // Retry heartbeat in the event the conditional put fails. for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { - oldLiveness, err := nl.Self() - if err != nil && !errors.Is(err, ErrNoLivenessRecord) { - log.Errorf(ctx, "unexpected error getting liveness: %+v", err) + oldLiveness, ok := nl.Self() + if !ok { + nodeID := nl.gossip.NodeID.Get() + liveness, err := nl.getLivenessFromKV(ctx, nodeID) + if err != nil { + log.Infof(ctx, "unable to get liveness record from KV: %s", err) + continue + } + oldLiveness = liveness } if err := nl.heartbeatInternal(ctx, oldLiveness, incrementEpoch); err != nil { if errors.Is(err, ErrEpochIncremented) { @@ -787,73 +826,19 @@ func (nl *NodeLiveness) heartbeatInternal( // would hit a ConditionFailedError and return a errNodeAlreadyLive down // below. if !incrementEpoch { - curLiveness, err := nl.Self() - if err == nil && minExpiration.Less(curLiveness.Expiration) { + curLiveness, ok := nl.Self() + if ok && minExpiration.Less(curLiveness.Expiration) { return nil } } - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness - if oldLiveness != (kvserverpb.Liveness{}) { - // Start off with our existing view of liveness. 
- newLiveness = oldLiveness - } else { - // We haven't seen our own liveness record yet[1]. This happens when - // we're heartbeating for the very first time. Let's retrieve it from KV - // before proceeding. - // - // [1]: Elsewhere we maintain the invariant that there always exist a - // liveness record for every given node. See the join RPC and - // WriteInitialClusterData for where that's done. - kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID)) - if err != nil { - return errors.Wrap(err, "unable to get liveness") - } - - if kv.Value != nil { - // This is the happy path. Let's unpack the liveness record we found - // within KV, and use that to inform what our new liveness should - // be. - if err := kv.Value.GetProto(&oldLiveness); err != nil { - return errors.Wrap(err, "invalid liveness record") - } - - oldLivenessRec := LivenessRecord{ - Liveness: oldLiveness, - raw: kv.Value.TagAndDataBytes(), - } - - // Update our cache with the liveness record we just found. - nl.maybeUpdate(oldLivenessRec) - - newLiveness = oldLiveness - } else { - // This is a "should basically never happen" scenario given our - // invariant around always persisting liveness records on node - // startup. But that was a change we added in 20.2. Though unlikely, - // it's possible to get into the following scenario: - // - // - v20.1 node gets added to v20.1 cluster, and is quickly removed - // before being able to persist its liveness record. - // - The cluster is upgraded to v20.2. - // - The node from earlier is rolled into v20.2, and re-added to the - // cluster. - // - It's never able to successfully heartbeat (it didn't join - // through the join rpc, bootstrap, or gossip). Welp. - // - // Given this possibility, we'll just fall back to creating the - // liveness record here as we did in v20.1 code. - // - // TODO(irfansharif): Remove this once v20.2 is cut. - log.Warningf(ctx, "missing liveness record for n%d; falling back to creating it in-place", nodeID) - newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 0, // incremented to epoch=1 below as needed - } - } + if oldLiveness == (kvserverpb.Liveness{}) { + return errors.AssertionFailedf("invalid old liveness record; found to be empty") } + // Let's compute what our new liveness record should be. Start off with our + // existing view of things. + newLiveness := oldLiveness if incrementEpoch { newLiveness.Epoch++ newLiveness.Draining = false // clear draining field @@ -878,7 +863,7 @@ func (nl *NodeLiveness) heartbeatInternal( } written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { // Update liveness to actual value on mismatch. - nl.maybeUpdate(actual) + nl.maybeUpdate(ctx, actual) // If the actual liveness is different than expected, but is // considered live, treat the heartbeat as a success. This can @@ -919,26 +904,26 @@ func (nl *NodeLiveness) heartbeatInternal( } log.VEventf(ctx, 1, "heartbeat %+v", written.Expiration) - nl.maybeUpdate(written) + nl.maybeUpdate(ctx, written) nl.metrics.HeartbeatSuccesses.Inc(1) return nil } -// Self returns the liveness record for this node. ErrNoLivenessRecord +// Self returns the liveness record for this node. A false `ok` value // is returned in the event that the node has neither heartbeat its // liveness record successfully, nor received a gossip message containing // a former liveness update on restart.
-func (nl *NodeLiveness) Self() (kvserverpb.Liveness, error) { - rec, err := nl.SelfEx() - if err != nil { - return kvserverpb.Liveness{}, err +func (nl *NodeLiveness) Self() (_ kvserverpb.Liveness, ok bool) { + rec, ok := nl.SelfEx() + if !ok { + return kvserverpb.Liveness{}, false } - return rec.Liveness, nil + return rec.Liveness, true } // SelfEx is like Self, but returns the raw, encoded value that the database has // for this liveness record in addition to the decoded liveness proto. -func (nl *NodeLiveness) SelfEx() (LivenessRecord, error) { +func (nl *NodeLiveness) SelfEx() (_ LivenessRecord, ok bool) { nl.mu.RLock() defer nl.mu.RUnlock() return nl.getLivenessLocked(nl.gossip.NodeID.Get()) @@ -989,22 +974,66 @@ func (nl *NodeLiveness) GetLivenesses() []kvserverpb.Liveness { return livenesses } -// GetLiveness returns the liveness record for the specified nodeID. -// ErrNoLivenessRecord is returned in the event that nothing is yet known about -// nodeID via liveness gossip. The record returned also includes the raw, -// encoded value that the database has for this liveness record in addition to -// the decoded liveness proto. -func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (LivenessRecord, error) { +// GetLiveness returns the liveness record for the specified nodeID. If the +// liveness record is not found (due to gossip propagation delays or due to the +// node not existing), we surface that to the caller. The record returned also +// includes the raw, encoded value that the database has for this liveness +// record in addition to the decoded liveness proto. +func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (_ LivenessRecord, ok bool) { nl.mu.RLock() defer nl.mu.RUnlock() return nl.getLivenessLocked(nodeID) } -func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (LivenessRecord, error) { +// getLivenessLocked returns the liveness record for the specified nodeID, +// consulting the in-memory cache. If nothing is found (could happen due to +// gossip propagation delays or the node not existing), we surface that to the +// caller. +func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (_ LivenessRecord, ok bool) { if l, ok := nl.mu.nodes[nodeID]; ok { - return l, nil + return l, true } - return LivenessRecord{}, ErrNoLivenessRecord + return LivenessRecord{}, false +} + +// getLivenessFromKV fetches the liveness record from KV for a given node, and +// updates the internal in-memory cache when doing so. +func (nl *NodeLiveness) getLivenessFromKV( + ctx context.Context, nodeID roachpb.NodeID, +) (kvserverpb.Liveness, error) { + livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID) + if err != nil { + return kvserverpb.Liveness{}, err + } + return livenessRec.Liveness, nil +} + +// getLivenessRecordFromKV is like getLivenessFromKV, but returns the raw, +// encoded value that the database has for this liveness record in addition to +// the decoded liveness proto. 
+func (nl *NodeLiveness) getLivenessRecordFromKV( + ctx context.Context, nodeID roachpb.NodeID, +) (LivenessRecord, error) { + kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID)) + if err != nil { + return LivenessRecord{}, errors.Wrap(err, "unable to get liveness") + } + if kv.Value == nil { + return LivenessRecord{}, errors.AssertionFailedf("missing liveness record") + } + var liveness kvserverpb.Liveness + if err := kv.Value.GetProto(&liveness); err != nil { + return LivenessRecord{}, errors.Wrap(err, "invalid liveness record") + } + + livenessRec := LivenessRecord{ + Liveness: liveness, + raw: kv.Value.TagAndDataBytes(), + } + + // Update our cache with the liveness record we just found. + nl.maybeUpdate(ctx, livenessRec) + return livenessRec, nil } // IncrementEpoch is called to attempt to revoke another node's @@ -1054,7 +1083,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb. update.newLiveness.Epoch++ written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { - nl.maybeUpdate(actual) + nl.maybeUpdate(ctx, actual) if actual.Epoch > liveness.Epoch { return ErrEpochAlreadyIncremented @@ -1068,7 +1097,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb. } log.Infof(ctx, "incremented n%d liveness epoch to %d", written.NodeID, written.Epoch) - nl.maybeUpdate(written) + nl.maybeUpdate(ctx, written) nl.metrics.EpochIncrements.Inc(1) return nil } @@ -1153,11 +1182,14 @@ func (nl *NodeLiveness) updateLivenessAttempt( log.Fatalf(ctx, "unexpected oldRaw when ignoreCache not specified") } - l, err := nl.GetLiveness(update.newLiveness.NodeID) - if err != nil && !errors.Is(err, ErrNoLivenessRecord) { - return LivenessRecord{}, err + l, ok := nl.GetLiveness(update.newLiveness.NodeID) + if !ok { + // TODO(irfansharif): See TODO in `NodeLiveness.IsLive`, the same + // applies to this conditional. We probably want to be able to + // return ErrMissingLivenessRecord here instead. + return LivenessRecord{}, errLivenessRecordCacheMiss } - if err == nil && l.Liveness != update.oldLiveness { + if l.Liveness != update.oldLiveness { return LivenessRecord{}, handleCondFailed(l) } oldRaw = l.raw @@ -1215,40 +1247,44 @@ func (nl *NodeLiveness) updateLivenessAttempt( // maybeUpdate replaces the liveness (if it appears newer) and invokes the // registered callbacks if the node became live in the process. -func (nl *NodeLiveness) maybeUpdate(new LivenessRecord) { - // An empty new means that we haven't updated anything. - if new.Liveness == (kvserverpb.Liveness{}) { - return +func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec LivenessRecord) { + if newLivenessRec.Liveness == (kvserverpb.Liveness{}) { + log.Fatal(ctx, "invalid new liveness record; found to be empty") } + var shouldReplace bool nl.mu.Lock() - old := nl.mu.nodes[new.NodeID] + oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID) + if !ok { + shouldReplace = true + } else { + shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness) + } - should := shouldReplaceLiveness(old.Liveness, new.Liveness) var callbacks []IsLiveCallback - if should { - nl.mu.nodes[new.NodeID] = new + if shouldReplace { + nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec callbacks = append(callbacks, nl.mu.callbacks...) 
} nl.mu.Unlock() - if !should { + if !shouldReplace { return } now := nl.clock.Now().GoTime() - if !old.IsLive(now) && new.IsLive(now) { + if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) { for _, fn := range callbacks { - fn(new.Liveness) + fn(newLivenessRec.Liveness) } } } // shouldReplaceLiveness checks to see if the new liveness is in fact newer // than the old liveness. -func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool { +func shouldReplaceLiveness(ctx context.Context, old, new kvserverpb.Liveness) bool { if (old == kvserverpb.Liveness{}) { - return true + log.Fatal(ctx, "invalid old liveness record; found to be empty") } // Compare liveness information. If old < new, replace. @@ -1272,12 +1308,13 @@ func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool { // in-memory liveness info up to date. func (nl *NodeLiveness) livenessGossipUpdate(_ string, content roachpb.Value) { var liveness kvserverpb.Liveness + ctx := context.TODO() if err := content.GetProto(&liveness); err != nil { - log.Errorf(context.TODO(), "%v", err) + log.Errorf(ctx, "%v", err) return } - nl.maybeUpdate(LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()}) + nl.maybeUpdate(ctx, LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()}) } // numLiveNodes is used to populate a metric that tracks the number of live @@ -1289,8 +1326,6 @@ func (nl *NodeLiveness) livenessGossipUpdate(_ string, content roachpb.Value) { // nodes reporting the metric, so it's simplest to just have all live nodes // report it. func (nl *NodeLiveness) numLiveNodes() int64 { - ctx := nl.ambientCtx.AnnotateCtx(context.Background()) - selfID := nl.gossip.NodeID.Get() if selfID == 0 { return 0 @@ -1299,12 +1334,8 @@ func (nl *NodeLiveness) numLiveNodes() int64 { nl.mu.RLock() defer nl.mu.RUnlock() - self, err := nl.getLivenessLocked(selfID) - if errors.Is(err, ErrNoLivenessRecord) { - return 0 - } - if err != nil { - log.Warningf(ctx, "looking up own liveness: %+v", err) + self, ok := nl.getLivenessLocked(selfID) + if !ok { return 0 } now := nl.clock.Now().GoTime() @@ -1328,9 +1359,9 @@ func (nl *NodeLiveness) numLiveNodes() int64 { func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn { return func(nodeID roachpb.NodeID) (hlc.Timestamp, ctpb.Epoch, error) { now := nl.clock.Now() - liveness, err := nl.GetLiveness(nodeID) - if err != nil { - return hlc.Timestamp{}, 0, err + liveness, ok := nl.GetLiveness(nodeID) + if !ok { + return hlc.Timestamp{}, 0, errLivenessRecordCacheMiss } if !liveness.IsLive(now.GoTime()) { return hlc.Timestamp{}, 0, errLiveClockNotLive diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index 4fc9aab9d70f..5c94867097e0 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -108,10 +109,8 @@ func TestNodeLiveness(t *testing.T) { } // Trigger a manual heartbeat and verify liveness is reestablished. for _, nl := range mtc.nodeLivenesses { - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) for { err := nl.Heartbeat(context.Background(), l) if err == nil { @@ -145,10 +144,8 @@ func TestNodeLivenessInitialIncrement(t *testing.T) { // Verify liveness of all nodes for all nodes. 
verifyLiveness(t, mtc) - liveness, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) - if err != nil { - t.Fatal(err) - } + liveness, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + assert.True(t, ok) if liveness.Epoch != 1 { t.Errorf("expected epoch to be set to 1 initially; got %d", liveness.Epoch) } @@ -184,10 +181,8 @@ func TestNodeLivenessAppearsAtStart(t *testing.T) { t.Fatalf("node %d not live", nodeID) } - livenessRec, err := nl.GetLiveness(nodeID) - if err != nil { - t.Fatal(err) - } + livenessRec, ok := nl.GetLiveness(nodeID) + assert.True(t, ok) if livenessRec.NodeID != nodeID { t.Fatalf("expected node ID %d, got %d", nodeID, livenessRec.NodeID) } @@ -204,9 +199,9 @@ func TestNodeLivenessAppearsAtStart(t *testing.T) { func verifyEpochIncremented(t *testing.T, mtc *multiTestContext, nodeIdx int) { testutils.SucceedsSoon(t, func() error { - liveness, err := mtc.nodeLivenesses[nodeIdx].GetLiveness(mtc.gossips[nodeIdx].NodeID.Get()) - if err != nil { - return err + liveness, ok := mtc.nodeLivenesses[nodeIdx].GetLiveness(mtc.gossips[nodeIdx].NodeID.Get()) + if !ok { + return errors.New("liveness not found") } if liveness.Epoch < 2 { return errors.Errorf("expected epoch to be >=2 on restart but was %d", liveness.Epoch) @@ -234,8 +229,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) { nl.PauseHeartbeatLoopForTest() enableSync := nl.PauseSynchronousHeartbeatsForTest() - liveness, err := nl.Self() - require.NoError(t, err) + liveness, ok := nl.Self() + assert.True(t, ok) hbBefore := nl.Metrics().HeartbeatSuccesses.Count() require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value()) @@ -250,10 +245,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) { if err := nl.Heartbeat(ctx, liveness); err != nil { return err } - livenessAfter, err := nl.Self() - if err != nil { - return err - } + livenessAfter, found := nl.Self() + assert.True(t, found) exp := livenessAfter.Expiration minExp := hlc.LegacyTimestamp(before.Add(nlActive.Nanoseconds(), 0)) if exp.Less(minExp) { @@ -283,8 +276,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) { require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value()) // Send one more heartbeat. Should update liveness record. - liveness, err = nl.Self() - require.NoError(t, err) + liveness, ok = nl.Self() + require.True(t, ok) require.NoError(t, nl.Heartbeat(ctx, liveness)) require.Equal(t, hbBefore+2, nl.Metrics().HeartbeatSuccesses.Count()) require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value()) @@ -316,10 +309,8 @@ func TestNodeIsLiveCallback(t *testing.T) { // Trigger a manual heartbeat and verify callbacks for each node ID are invoked. for _, nl := range mtc.nodeLivenesses { - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) if err := nl.Heartbeat(context.Background(), l); err != nil { t.Fatal(err) } @@ -375,10 +366,8 @@ func TestNodeHeartbeatCallback(t *testing.T) { // store. mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) for _, nl := range mtc.nodeLivenesses { - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) if err := nl.Heartbeat(context.Background(), l); err != nil { t.Fatal(err) } @@ -409,10 +398,8 @@ func TestNodeLivenessEpochIncrement(t *testing.T) { // First try to increment the epoch of a known-live node. 
deadNodeID := mtc.gossips[1].NodeID.Get() - oldLiveness, err := mtc.nodeLivenesses[0].GetLiveness(deadNodeID) - if err != nil { - t.Fatal(err) - } + oldLiveness, ok := mtc.nodeLivenesses[0].GetLiveness(deadNodeID) + assert.True(t, ok) if err := mtc.nodeLivenesses[0].IncrementEpoch( ctx, oldLiveness.Liveness, ); !testutils.IsError(err, "cannot increment epoch on live node") { @@ -427,9 +414,9 @@ func TestNodeLivenessEpochIncrement(t *testing.T) { // Verify that the epoch has been advanced. testutils.SucceedsSoon(t, func() error { - newLiveness, err := mtc.nodeLivenesses[0].GetLiveness(deadNodeID) - if err != nil { - return err + newLiveness, ok := mtc.nodeLivenesses[0].GetLiveness(deadNodeID) + if !ok { + return errors.New("liveness not found") } if newLiveness.Epoch != oldLiveness.Epoch+1 { return errors.Errorf("expected epoch to increment") @@ -481,9 +468,10 @@ func TestNodeLivenessRestart(t *testing.T) { // seeing the liveness record properly gossiped at store startup. var expKeys []string for _, g := range mtc.gossips { - key := gossip.MakeNodeLivenessKey(g.NodeID.Get()) + nodeID := g.NodeID.Get() + key := gossip.MakeNodeLivenessKey(nodeID) expKeys = append(expKeys, key) - if err := g.AddInfoProto(key, &kvserverpb.Liveness{}, 0); err != nil { + if err := g.AddInfoProto(key, &kvserverpb.Liveness{NodeID: nodeID}, 0); err != nil { t.Fatal(err) } } @@ -543,9 +531,12 @@ func TestNodeLivenessSelf(t *testing.T) { // callback. var liveness kvserver.LivenessRecord testutils.SucceedsSoon(t, func() error { - var err error - liveness, err = mtc.nodeLivenesses[0].GetLiveness(g.NodeID.Get()) - return err + l, ok := mtc.nodeLivenesses[0].GetLiveness(g.NodeID.Get()) + if !ok { + return errors.New("liveness not found") + } + liveness = l + return nil }) if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { t.Fatal(err) @@ -573,13 +564,11 @@ func TestNodeLivenessSelf(t *testing.T) { // Self should not see the fake liveness, but have kept the real one. l := mtc.nodeLivenesses[0] - lGetRec, err := l.GetLiveness(g.NodeID.Get()) - require.NoError(t, err) + lGetRec, ok := l.GetLiveness(g.NodeID.Get()) + require.True(t, ok) lGet := lGetRec.Liveness - lSelf, err := l.Self() - if err != nil { - t.Fatal(err) - } + lSelf, ok := l.Self() + assert.True(t, ok) if !reflect.DeepEqual(lGet, lSelf) { t.Errorf("expected GetLiveness() to return same value as Self(): %+v != %+v", lGet, lSelf) } @@ -612,7 +601,15 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) { // Advance the clock but only heartbeat node 0. mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) - liveness, _ := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + var liveness kvserver.LivenessRecord + testutils.SucceedsSoon(t, func() error { + livenessRec, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + if !ok { + return errors.New("liveness not found") + } + liveness = livenessRec + return nil + }) testutils.SucceedsSoon(t, func() error { if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { @@ -668,7 +665,15 @@ func TestNodeLivenessGetLivenesses(t *testing.T) { // Advance the clock but only heartbeat node 0. 
mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) - liveness, _ := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + var liveness kvserver.LivenessRecord + testutils.SucceedsSoon(t, func() error { + livenessRec, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + if !ok { + return errors.New("liveness not found") + } + liveness = livenessRec + return nil + }) if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { t.Fatal(err) } @@ -711,10 +716,8 @@ func TestNodeLivenessConcurrentHeartbeats(t *testing.T) { // Advance clock past the liveness threshold & concurrently heartbeat node. nl := mtc.nodeLivenesses[0] mtc.manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1) - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) errCh := make(chan error, concurrency) for i := 0; i < concurrency; i++ { go func() { @@ -745,10 +748,8 @@ func TestNodeLivenessConcurrentIncrementEpochs(t *testing.T) { // Advance the clock and this time increment epoch concurrently for node 1. nl := mtc.nodeLivenesses[0] mtc.manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1) - l, err := nl.GetLiveness(mtc.gossips[1].NodeID.Get()) - if err != nil { - t.Fatal(err) - } + l, ok := nl.GetLiveness(mtc.gossips[1].NodeID.Get()) + assert.True(t, ok) errCh := make(chan error, concurrency) for i := 0; i < concurrency; i++ { go func() { @@ -791,12 +792,16 @@ func TestNodeLivenessSetDraining(t *testing.T) { // Verify success on failed update of a liveness record that already has the // given draining setting. if err := mtc.nodeLivenesses[drainingNodeIdx].SetDrainingInternal( - ctx, kvserver.LivenessRecord{}, false, + ctx, kvserver.LivenessRecord{Liveness: kvserverpb.Liveness{ + NodeID: drainingNodeID, + }}, false, ); err != nil { t.Fatal(err) } - mtc.nodeLivenesses[drainingNodeIdx].SetDraining(ctx, true /* drain */, nil /* reporter */) + if err := mtc.nodeLivenesses[drainingNodeIdx].SetDraining(ctx, true /* drain */, nil /* reporter */); err != nil { + t.Fatal(err) + } // Draining node disappears from store lists. { @@ -893,10 +898,8 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) { verifyLiveness(t, mtc) nl := mtc.nodeLivenesses[0] - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) // And again on manual heartbeat. injectError.Store(true) @@ -1077,8 +1080,10 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { // Verify success on failed update of a liveness record that already has the // given decommissioning setting. + oldLivenessRec, ok := callerNodeLiveness.GetLiveness(nodeID) + assert.True(t, ok) if _, err := callerNodeLiveness.SetDecommissioningInternal( - ctx, nodeID, kvserver.LivenessRecord{}, kvserverpb.MembershipStatus_ACTIVE, + ctx, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE, ); err != nil { t.Fatal(err) } @@ -1135,7 +1140,7 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) { // When the node simply never existed, expect an error. 
if _, err := mtc.nodeLivenesses[0].SetMembershipStatus( ctx, goneNodeID, kvserverpb.MembershipStatus_DECOMMISSIONING, - ); !errors.Is(err, kvserver.ErrNoLivenessRecord) { + ); !errors.Is(err, kvserver.ErrMissingLivenessRecord) { t.Fatal(err) } diff --git a/pkg/kv/kvserver/node_liveness_unit_test.go b/pkg/kv/kvserver/node_liveness_unit_test.go index 17fc439995dc..9b780ace0777 100644 --- a/pkg/kv/kvserver/node_liveness_unit_test.go +++ b/pkg/kv/kvserver/node_liveness_unit_test.go @@ -11,6 +11,7 @@ package kvserver import ( + "context" "fmt" "testing" @@ -58,8 +59,8 @@ func TestShouldReplaceLiveness(t *testing.T) { }{ { // Epoch update only. - kvserverpb.Liveness{}, l(1, hlc.Timestamp{}, false, "active"), + l(2, hlc.Timestamp{}, false, "active"), yes, }, { @@ -106,7 +107,7 @@ func TestShouldReplaceLiveness(t *testing.T) { }, } { t.Run("", func(t *testing.T) { - if act := shouldReplaceLiveness(test.old, test.new); act != test.exp { + if act := shouldReplaceLiveness(context.Background(), test.old, test.new); act != test.exp { t.Errorf("unexpected update: %+v", test) } }) diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go index 6ae6c71e3d4d..b5a475aaa4ee 100644 --- a/pkg/kv/kvserver/replica_gc_queue.go +++ b/pkg/kv/kvserver/replica_gc_queue.go @@ -165,8 +165,7 @@ func (rgcq *replicaGCQueue) shouldQueue( // dormant ranges. Make sure NodeLiveness isn't nil because it can be in // tests/benchmarks. if repl.store.cfg.NodeLiveness != nil { - if liveness, err := repl.store.cfg.NodeLiveness.Self(); err == nil && - !liveness.Membership.Active() { + if liveness, ok := repl.store.cfg.NodeLiveness.Self(); ok && !liveness.Membership.Active() { return true, replicaGCPriorityDefault } } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 2a6f4c379516..b4ab6989c3ab 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -224,12 +224,12 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( *reqLease.Expiration = status.Timestamp.Add(int64(p.repl.store.cfg.RangeLeaseActiveDuration()), 0) } else { // Get the liveness for the next lease holder and set the epoch in the lease request. - liveness, err := p.repl.store.cfg.NodeLiveness.GetLiveness(nextLeaseHolder.NodeID) - if err != nil { + liveness, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(nextLeaseHolder.NodeID) + if !ok { llHandle.resolve(roachpb.NewError(&roachpb.LeaseRejectedError{ Existing: status.Lease, Requested: reqLease, - Message: fmt.Sprintf("couldn't request lease for %+v: %v", nextLeaseHolder, err), + Message: fmt.Sprintf("couldn't request lease for %+v: %v", nextLeaseHolder, errLivenessRecordCacheMiss), })) return llHandle } @@ -543,17 +543,17 @@ func (r *Replica) leaseStatus( if lease.Type() == roachpb.LeaseExpiration { expiration = lease.GetExpiration() } else { - l, err := r.store.cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID) + l, ok := r.store.cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID) status.Liveness = l.Liveness - if err != nil || status.Liveness.Epoch < lease.Epoch { + if !ok || status.Liveness.Epoch < lease.Epoch { // If lease validity can't be determined (e.g. gossip is down // and liveness info isn't available for owner), we can neither // use the lease nor do we want to attempt to acquire it. 
- if err != nil { + if !ok { if leaseStatusLogLimiter.ShouldLog() { ctx = r.AnnotateCtx(ctx) log.Warningf(ctx, "can't determine lease status of %s due to node liveness error: %+v", - lease.Replica, err) + lease.Replica, errLivenessRecordCacheMiss) } } status.State = kvserverpb.LeaseState_ERROR diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index 4a3b25b79a7e..77c59440f0e8 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -103,8 +103,8 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *NodeLiveness) NodeLivenessFunc return func( nodeID roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration, ) kvserverpb.NodeLivenessStatus { - liveness, err := nodeLiveness.GetLiveness(nodeID) - if err != nil { + liveness, ok := nodeLiveness.GetLiveness(nodeID) + if !ok { return kvserverpb.NodeLivenessStatus_UNAVAILABLE } return LivenessStatus(liveness.Liveness, now, timeUntilStoreDead) diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 59a7dd331da6..ccb233435218 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1368,9 +1368,9 @@ func (s *adminServer) checkReadinessForHealthCheck() error { // TODO(knz): update this code when progress is made on // https://github.com/cockroachdb/cockroach/issues/45123 - l, err := s.server.nodeLiveness.GetLiveness(s.server.NodeID()) - if err != nil { - return s.serverError(err) + l, ok := s.server.nodeLiveness.GetLiveness(s.server.NodeID()) + if !ok { + return status.Error(codes.Unavailable, "liveness record not found") } if !l.IsLive(s.server.clock.Now().GoTime()) { return status.Errorf(codes.Unavailable, "node is not healthy") @@ -1706,9 +1706,9 @@ func (s *adminServer) DecommissionStatus( var res serverpb.DecommissionStatusResponse for nodeID := range replicaCounts { - l, err := s.server.nodeLiveness.GetLiveness(nodeID) - if err != nil { - return nil, errors.Wrapf(err, "unable to get liveness for %d", nodeID) + l, ok := s.server.nodeLiveness.GetLiveness(nodeID) + if !ok { + return nil, errors.Newf("unable to get liveness for %d", nodeID) } nodeResp := serverpb.DecommissionStatusResponse_Status{ NodeID: l.NodeID, diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go index 1a0dac8f3bf6..6fe67c3d4b42 100644 --- a/pkg/server/admin_test.go +++ b/pkg/server/admin_test.go @@ -1365,10 +1365,8 @@ func TestHealthAPI(t *testing.T) { // server's clock. ts := s.(*TestServer) defer ts.nodeLiveness.PauseAllHeartbeatsForTest()() - self, err := ts.nodeLiveness.Self() - if err != nil { - t.Fatal(err) - } + self, ok := ts.nodeLiveness.Self() + assert.True(t, ok) s.Clock().Update(hlc.Timestamp(self.Expiration).Add(1, 0)) var resp serverpb.HealthResponse diff --git a/pkg/server/drain.go b/pkg/server/drain.go index 971dc69daba5..4af61016797f 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -229,6 +229,8 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf // drainNode initiates the draining mode for the node, which // starts draining range leases. 
func (s *Server) drainNode(ctx context.Context, reporter func(int, redact.SafeString)) error { - s.nodeLiveness.SetDraining(ctx, true /* drain */, reporter) + if err := s.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil { + return err + } return s.node.SetDraining(true /* drain */, reporter) } diff --git a/pkg/server/server.go b/pkg/server/server.go index 65542bcf2f3f..69f089afdd0c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -86,7 +86,9 @@ import ( gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/opentracing/opentracing-go" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + grpcstatus "google.golang.org/grpc/status" ) var ( @@ -2000,6 +2002,9 @@ func (s *Server) Decommission( for _, nodeID := range nodeIDs { statusChanged, err := s.nodeLiveness.SetMembershipStatus(ctx, nodeID, targetStatus) if err != nil { + if errors.Is(err, kvserver.ErrMissingLivenessRecord) { + return grpcstatus.Error(codes.NotFound, kvserver.ErrMissingLivenessRecord.Error()) + } return err } if statusChanged { diff --git a/pkg/sql/optionalnodeliveness/node_liveness.go b/pkg/sql/optionalnodeliveness/node_liveness.go index 1cb5e07039a7..37892a4e164a 100644 --- a/pkg/sql/optionalnodeliveness/node_liveness.go +++ b/pkg/sql/optionalnodeliveness/node_liveness.go @@ -18,7 +18,7 @@ import ( // Interface is the interface used in Container. type Interface interface { - Self() (kvserverpb.Liveness, error) + Self() (kvserverpb.Liveness, bool) GetLivenesses() []kvserverpb.Liveness IsLive(roachpb.NodeID) (bool, error) }
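For reference, a minimal sketch (not part of the patch) of how a caller inside the kvserver package is expected to combine the new (Liveness, bool) getters with the KV fallback introduced above, following the same look-aside-cache-then-KV pattern that StartHeartbeat and SetDraining now use. The helper name exampleGetLiveness is hypothetical; GetLiveness, getLivenessFromKV, and LivenessRecord are the identifiers added or modified in this diff, and the import paths are assumed to match this branch.

package kvserver

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// exampleGetLiveness is a hypothetical helper illustrating the intended
// lookup pattern: consult the in-memory, look-aside cache first, and fall
// back to a KV read on a cache miss.
func exampleGetLiveness(
	ctx context.Context, nl *NodeLiveness, nodeID roachpb.NodeID,
) (kvserverpb.Liveness, error) {
	// Fast path: the cache, which is populated via gossip and via maybeUpdate.
	if rec, ok := nl.GetLiveness(nodeID); ok {
		return rec.Liveness, nil
	}
	// Cache miss: read the record from KV. getLivenessFromKV also refreshes
	// the cache, so subsequent lookups take the fast path.
	return nl.getLivenessFromKV(ctx, nodeID)
}

Folding this fallback into getLivenessLocked itself, as the TODO on the nodes field suggests, would turn the cache into a true look-through cache and make helpers like this unnecessary.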