From 8d275dce1b5476d7c4224629d0dfda79c488392c Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Fri, 15 May 2020 16:43:41 -0400
Subject: [PATCH] kvserver: don't use marshaled proto for liveness CPut

Fixes #38308

Updates to the liveness records are done as CPuts. Before this patch,
the CPuts' expected value was the previous version of the proto,
re-marshaled. As #38308 explains, that's a bad practice since it
prevents the proto's encoding from changing in any way (e.g. fields
can't be removed).

Instead, this patch keeps track of the raw bytes read from the DB
alongside the unmarshaled liveness protos, and uses those raw bytes as
the expected values.

Release note: None
---
 pkg/kv/batch.go                           |  37 +--
 pkg/kv/kvserver/below_raft_protos_test.go |   8 -
 pkg/kv/kvserver/helpers_test.go           |   4 +-
 pkg/kv/kvserver/node_liveness.go          | 279 ++++++++++++++--------
 pkg/kv/kvserver/node_liveness_test.go     |  38 +--
 pkg/kv/kvserver/replica_range_lease.go    |   4 +-
 pkg/kv/kvserver/storagepb/liveness.go     |   9 +
 pkg/kv/kvserver/storagepb/liveness.pb.go  |  78 +++---
 pkg/kv/kvserver/storagepb/liveness.proto  |   8 +-
 pkg/kv/kvserver/store_pool.go             |   2 +-
 pkg/kv/kvserver/track_raft_protos.go      |   8 -
 11 files changed, 263 insertions(+), 212 deletions(-)

diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go
index 4ca3a632e842..f6f330c8cbfc 100644
--- a/pkg/kv/batch.go
+++ b/pkg/kv/batch.go
@@ -410,25 +410,12 @@ func (b *Batch) PutInline(key, value interface{}) {
 // and Result.Err will indicate success or failure.
 //
 // key can be either a byte slice or a string. value can be any key type, a
-// protoutil.Message or any Go primitive type (bool, int, etc).
+// protoutil.Message, a *roachpb.Value, or any Go primitive type (bool, int,
+// etc).
 func (b *Batch) CPut(key, value interface{}, expValue *roachpb.Value) {
 	b.cputInternal(key, value, expValue, false)
 }
 
-// CPutDeprecated conditionally sets the value for a key if the existing value is equal
-// to expValue. To conditionally set a value only if there is no existing entry
-// pass nil for expValue. Note that this must be an interface{}(nil), not a
-// typed nil value (e.g. []byte(nil)).
-//
-// A new result will be appended to the batch which will contain a single row
-// and Result.Err will indicate success or failure.
-//
-// key can be either a byte slice or a string. value can be any key type, a
-// protoutil.Message or any Go primitive type (bool, int, etc).
-func (b *Batch) CPutDeprecated(key, value, expValue interface{}) {
-	b.cputInternalDeprecated(key, value, expValue, false)
-}
-
 // CPutAllowingIfNotExists is like CPut except it also allows the Put when the
 // existing entry does not exist -- i.e. it succeeds if there is no existing
 // entry or the existing entry has the expected value.
@@ -459,26 +446,6 @@ func (b *Batch) cputInternal(key, value interface{}, expValue *roachpb.Value, al
 	b.initResult(1, 1, notRaw, nil)
 }
 
-func (b *Batch) cputInternalDeprecated(key, value, expValue interface{}, allowNotExist bool) {
-	k, err := marshalKey(key)
-	if err != nil {
-		b.initResult(0, 1, notRaw, err)
-		return
-	}
-	v, err := marshalValue(value)
-	if err != nil {
-		b.initResult(0, 1, notRaw, err)
-		return
-	}
-	ev, err := marshalValue(expValue)
-	if err != nil {
-		b.initResult(0, 1, notRaw, err)
-		return
-	}
-	b.appendReqs(roachpb.NewConditionalPut(k, v, ev, allowNotExist))
-	b.initResult(1, 1, notRaw, nil)
-}
-
 // InitPut sets the first value for a key to value.
An ConditionFailedError is // reported if a value already exists for the key and it's not equal to the // value passed in. If failOnTombstones is set to true, tombstones will return diff --git a/pkg/kv/kvserver/below_raft_protos_test.go b/pkg/kv/kvserver/below_raft_protos_test.go index 891548ea2872..05543e06104f 100644 --- a/pkg/kv/kvserver/below_raft_protos_test.go +++ b/pkg/kv/kvserver/below_raft_protos_test.go @@ -17,7 +17,6 @@ import ( "reflect" "testing" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storagepb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -92,13 +91,6 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{ emptySum: 13621293256077144893, populatedSum: 13375098491754757572, }, - reflect.TypeOf(&storagepb.Liveness{}): { - populatedConstructor: func(r *rand.Rand) protoutil.Message { - return storagepb.NewPopulatedLiveness(r, false) - }, - emptySum: 892800390935990883, - populatedSum: 16231745342114354146, - }, // This is used downstream of Raft only to write it into unreplicated keyspace // as part of VersionUnreplicatedRaftTruncatedState. // However, it has been sent through Raft for a long time, as part of diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index ebb1d41e157d..1bf78e7e345c 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -500,13 +500,13 @@ func (r *Replica) ReadProtectedTimestamps(ctx context.Context) { } func (nl *NodeLiveness) SetDrainingInternal( - ctx context.Context, liveness storagepb.Liveness, drain bool, + ctx context.Context, liveness LivenessRecord, drain bool, ) error { return nl.setDrainingInternal(ctx, liveness, drain, nil /* reporter */) } func (nl *NodeLiveness) SetDecommissioningInternal( - ctx context.Context, nodeID roachpb.NodeID, liveness storagepb.Liveness, decommission bool, + ctx context.Context, nodeID roachpb.NodeID, liveness LivenessRecord, decommission bool, ) (changeCommitted bool, err error) { return nl.setDecommissioningInternal(ctx, nodeID, liveness, decommission) } diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index aac527b1df23..593e9592c8ce 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -167,7 +167,7 @@ type NodeLiveness struct { mu struct { syncutil.RWMutex callbacks []IsLiveCallback - nodes map[roachpb.NodeID]storagepb.Liveness + nodes map[roachpb.NodeID]LivenessRecord heartbeatCallback HeartbeatCallback // Before heartbeating, we write to each of these engines to avoid // maintaining liveness when a local disks is stalled. @@ -175,6 +175,17 @@ type NodeLiveness struct { } } +// LivenessRecord is a liveness record that has been read from the database, +// together with its database encoding. The encoding is useful for CPut-ing an +// update to the liveness record: the raw value will act as the expected value. +// This way the proto's encoding can change without the CPut failing. +type LivenessRecord struct { + storagepb.Liveness + // raw represents the raw bytes read from the database. Nil if the value + // doesn't exist in the DB. + raw *roachpb.Value +} + // NewNodeLiveness returns a new instance of NodeLiveness configured // with the specified gossip instance. 
func NewNodeLiveness( @@ -206,7 +217,7 @@ func NewNodeLiveness( EpochIncrements: metric.NewCounter(metaEpochIncrements), HeartbeatLatency: metric.NewLatency(metaHeartbeatLatency, histogramWindow), } - nl.mu.nodes = map[roachpb.NodeID]storagepb.Liveness{} + nl.mu.nodes = make(map[roachpb.NodeID]LivenessRecord) nl.heartbeatToken <- struct{}{} livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix) @@ -234,7 +245,7 @@ func (nl *NodeLiveness) sem(nodeID roachpb.NodeID) chan struct{} { func (nl *NodeLiveness) SetDraining(ctx context.Context, drain bool, reporter func(int, string)) { ctx = nl.ambientCtx.AnnotateCtx(ctx) for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { - liveness, err := nl.Self() + liveness, err := nl.SelfEx() if err != nil && !errors.Is(err, ErrNoLivenessRecord) { log.Errorf(ctx, "unexpected error getting liveness: %+v", err) } @@ -300,20 +311,29 @@ func (nl *NodeLiveness) SetDecommissioning( // gossip instead? (I have vague concerns about concurrent reads // and timestamp cache pushes causing problems here) var oldLiveness storagepb.Liveness - if err := nl.db.GetProto(ctx, keys.NodeLivenessKey(nodeID), &oldLiveness); err != nil { + kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID)) + if err != nil { return false, errors.Wrap(err, "unable to get liveness") } - if (oldLiveness == storagepb.Liveness{}) { + if kv.Value == nil { return false, ErrNoLivenessRecord } + if err := kv.Value.GetProto(&oldLiveness); err != nil { + return false, errors.Wrap(err, "invalid liveness record") + } + + rec := LivenessRecord{ + Liveness: oldLiveness, + raw: kv.Value, + } // We may have discovered a Liveness not yet received via Gossip. Offer it // to make sure that when we actually try to update the liveness, the // previous view is correct. This, too, is required to de-flake // TestNodeLivenessDecommissionAbsent. - nl.maybeUpdate(oldLiveness) + nl.maybeUpdate(rec) - return nl.setDecommissioningInternal(ctx, nodeID, oldLiveness, decommission) + return nl.setDecommissioningInternal(ctx, nodeID, rec, decommission) } for { @@ -326,7 +346,7 @@ func (nl *NodeLiveness) SetDecommissioning( } func (nl *NodeLiveness) setDrainingInternal( - ctx context.Context, liveness storagepb.Liveness, drain bool, reporter func(int, string), + ctx context.Context, liveness LivenessRecord, drain bool, reporter func(int, string), ) error { nodeID := nl.gossip.NodeID.Get() sem := nl.sem(nodeID) @@ -341,28 +361,32 @@ func (nl *NodeLiveness) setDrainingInternal( }() update := livenessUpdate{ - Liveness: storagepb.Liveness{ + updated: storagepb.Liveness{ NodeID: nodeID, Epoch: 1, }, + old: liveness.Liveness, + ignoreCache: true, + oldRaw: liveness.raw, } - if liveness != (storagepb.Liveness{}) { - update.Liveness = liveness + if liveness.Liveness != (storagepb.Liveness{}) { + update.updated = liveness.Liveness } - if reporter != nil && drain && !update.Draining { + + if reporter != nil && drain && !update.updated.Draining { // Report progress to the Drain RPC. 
 		reporter(1, "liveness record")
 	}
-	update.Draining = drain
-	update.ignoreCache = true
+	update.updated.Draining = drain
 
-	if err := nl.updateLiveness(ctx, update, liveness, func(actual storagepb.Liveness) error {
+	written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error {
 		nl.maybeUpdate(actual)
-		if actual.Draining == update.Draining {
+		if actual.Draining == update.updated.Draining {
 			return errNodeDrainingSet
 		}
-		return errors.New("failed to update liveness record")
-	}); err != nil {
+		return errors.New("failed to update liveness record because record has changed")
+	})
+	if err != nil {
 		if log.V(1) {
 			log.Infof(ctx, "updating liveness record: %v", err)
 		}
@@ -371,38 +395,56 @@ func (nl *NodeLiveness) setDrainingInternal(
 		}
 		return err
 	}
-	nl.maybeUpdate(update.Liveness)
+	nl.maybeUpdate(written)
 	return nil
 }
 
+// livenessUpdate contains the information for CPutting a new version of a
+// liveness record. It has both the new and the old version of the proto.
 type livenessUpdate struct {
-	storagepb.Liveness
+	updated storagepb.Liveness
+	old     storagepb.Liveness
 	// When ignoreCache is set, we won't assume that our in-memory cached version
 	// of the liveness record is accurate and will use a CPut on the liveness
-	// table with whatever the client supplied. This is used for operations that
-	// don't want to deal with the inconsistencies of using the cache.
+	// table with the old value supplied by the client (oldRaw). This is used for
+	// operations that don't want to deal with the inconsistencies of using the
+	// cache.
+	//
+	// When ignoreCache is not set, the state of the cache is checked against old and,
+	// if they don't correspond, the CPut is considered to have failed.
+	//
+	// When ignoreCache is set, oldRaw needs to be set as well.
 	ignoreCache bool
+	// oldRaw is the raw value from which `old` was decoded. Used for CPuts as the
+	// existing value. Note that we don't simply marshal `old` as that would break
+	// if unmarshaling/marshaling doesn't round-trip. Nil means that a liveness
+	// record for the respective node is not expected to exist in the database.
+	//
+	// oldRaw must not be set when ignoreCache is not set.
+ oldRaw *roachpb.Value } func (nl *NodeLiveness) setDecommissioningInternal( - ctx context.Context, nodeID roachpb.NodeID, liveness storagepb.Liveness, decommission bool, + ctx context.Context, nodeID roachpb.NodeID, liveness LivenessRecord, decommission bool, ) (changeCommitted bool, err error) { update := livenessUpdate{ - Liveness: storagepb.Liveness{ + updated: storagepb.Liveness{ NodeID: nodeID, Epoch: 1, }, + old: liveness.Liveness, + ignoreCache: true, + oldRaw: liveness.raw, } - if liveness != (storagepb.Liveness{}) { - update.Liveness = liveness + if liveness.Liveness != (storagepb.Liveness{}) { + update.updated = liveness.Liveness } - update.Decommissioning = decommission - update.ignoreCache = true + update.updated.Decommissioning = decommission var conditionFailed bool - if err := nl.updateLiveness(ctx, update, liveness, func(actual storagepb.Liveness) error { + if _, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { conditionFailed = true - if actual.Decommissioning == update.Decommissioning { + if actual.Decommissioning == update.updated.Decommissioning { return nil } return errChangeDecommissioningFailed @@ -585,33 +627,34 @@ func (nl *NodeLiveness) heartbeatInternal( }() update := livenessUpdate{ - Liveness: storagepb.Liveness{ + updated: storagepb.Liveness{ NodeID: nodeID, Epoch: 1, }, + old: liveness, } if liveness != (storagepb.Liveness{}) { - update.Liveness = liveness + update.updated = liveness if incrementEpoch { - update.Epoch++ + update.updated.Epoch++ // Clear draining field. - update.Draining = false + update.updated.Draining = false } } // We need to add the maximum clock offset to the expiration because it's // used when determining liveness for a node. { - update.Expiration = hlc.LegacyTimestamp( + update.updated.Expiration = hlc.LegacyTimestamp( nl.clock.Now().Add((nl.livenessThreshold).Nanoseconds(), 0)) // This guards against the system clock moving backwards. As long // as the cockroach process is running, checks inside hlc.Clock // will ensure that the clock never moves backwards, but these // checks don't work across process restarts. - if update.Expiration.Less(liveness.Expiration) { + if update.updated.Expiration.Less(liveness.Expiration) { return errors.Errorf("proposed liveness update expires earlier than previous record") } } - if err := nl.updateLiveness(ctx, update, liveness, func(actual storagepb.Liveness) error { + written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { // Update liveness to actual value on mismatch. nl.maybeUpdate(actual) // If the actual liveness is different than expected, but is @@ -631,7 +674,8 @@ func (nl *NodeLiveness) heartbeatInternal( } // Otherwise, return error. return ErrEpochIncremented - }); err != nil { + }) + if err != nil { if errors.Is(err, errNodeAlreadyLive) { nl.metrics.HeartbeatSuccesses.Inc(1) return nil @@ -640,8 +684,8 @@ func (nl *NodeLiveness) heartbeatInternal( return err } - log.VEventf(ctx, 1, "heartbeat %+v", update.Expiration) - nl.maybeUpdate(update.Liveness) + log.VEventf(ctx, 1, "heartbeat %+v", written.Expiration) + nl.maybeUpdate(written) nl.metrics.HeartbeatSuccesses.Inc(1) return nil } @@ -651,6 +695,16 @@ func (nl *NodeLiveness) heartbeatInternal( // liveness record successfully, nor received a gossip message containing // a former liveness update on restart. 
func (nl *NodeLiveness) Self() (storagepb.Liveness, error) { + rec, err := nl.SelfEx() + if err != nil { + return storagepb.Liveness{}, err + } + return rec.Liveness, err +} + +// SelfEx is like Self, but returns the raw, encoded value that the database has +// for this liveness record in addition to the decoded liveness proto. +func (nl *NodeLiveness) SelfEx() (LivenessRecord, error) { nl.mu.RLock() defer nl.mu.RUnlock() return nl.getLivenessLocked(nl.gossip.NodeID.Get()) @@ -696,25 +750,27 @@ func (nl *NodeLiveness) GetLivenesses() []storagepb.Liveness { defer nl.mu.RUnlock() livenesses := make([]storagepb.Liveness, 0, len(nl.mu.nodes)) for _, l := range nl.mu.nodes { - livenesses = append(livenesses, l) + livenesses = append(livenesses, l.Liveness) } return livenesses } // GetLiveness returns the liveness record for the specified nodeID. -// ErrNoLivenessRecord is returned in the event that nothing is yet -// known about nodeID via liveness gossip. -func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (storagepb.Liveness, error) { +// ErrNoLivenessRecord is returned in the event that nothing is yet known about +// nodeID via liveness gossip. The record returned also includes the raw, +// encoded value that the database has for this liveness record in addition to +// the decoded liveness proto. +func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (LivenessRecord, error) { nl.mu.RLock() defer nl.mu.RUnlock() return nl.getLivenessLocked(nodeID) } -func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (storagepb.Liveness, error) { +func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (LivenessRecord, error) { if l, ok := nl.mu.nodes[nodeID]; ok { return l, nil } - return storagepb.Liveness{}, ErrNoLivenessRecord + return LivenessRecord{}, ErrNoLivenessRecord } // IncrementEpoch is called to attempt to revoke another node's @@ -756,9 +812,13 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness storagepb.L if liveness.IsLive(nl.clock.Now().GoTime()) { return errors.Errorf("cannot increment epoch on live node: %+v", liveness) } - update := livenessUpdate{Liveness: liveness} - update.Epoch++ - if err := nl.updateLiveness(ctx, update, liveness, func(actual storagepb.Liveness) error { + + update := livenessUpdate{ + updated: liveness, + old: liveness, + } + update.updated.Epoch++ + written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { defer nl.maybeUpdate(actual) if actual.Epoch > liveness.Epoch { return ErrEpochAlreadyIncremented @@ -766,12 +826,13 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness storagepb.L return errors.Errorf("unexpected liveness epoch %d; expected >= %d", actual.Epoch, liveness.Epoch) } return errors.Errorf("mismatch incrementing epoch for %+v; actual is %+v", liveness, actual) - }); err != nil { + }) + if err != nil { return err } - log.Infof(ctx, "incremented n%d liveness epoch to %d", update.NodeID, update.Epoch) - nl.maybeUpdate(update.Liveness) + log.Infof(ctx, "incremented n%d liveness epoch to %d", written.NodeID, written.Epoch) + nl.maybeUpdate(written) nl.metrics.EpochIncrements.Inc(1) return nil } @@ -790,28 +851,29 @@ func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) { nl.mu.callbacks = append(nl.mu.callbacks, cb) } -// updateLiveness does a conditional put on the node liveness record for the -// node specified by nodeID. 
In the event that the conditional put fails, and -// the handleCondFailed callback is not nil, it's invoked with the actual node -// liveness record and nil is returned for an error. If handleCondFailed is nil, -// any conditional put failure is returned as an error to the caller. The -// conditional put is done as a 1PC transaction with a ModifiedSpanTrigger which -// indicates the node liveness record that the range leader should gossip on -// commit. +// updateLiveness does a conditional put on a node liveness record. In the event +// that the conditional put fails, and the handleCondFailed callback is not nil, +// it's invoked with the actual node liveness record and nil is returned for an +// error. If handleCondFailed is nil, any conditional put failure is returned as +// an error to the caller. The conditional put is done as a 1PC transaction with +// a ModifiedSpanTrigger which indicates the node liveness record that the range +// leader should gossip on commit. // // updateLiveness terminates certain errors that are expected to occur // sporadically, such as TransactionStatusError (due to the 1PC requirement of // the liveness txn, and ambiguous results). +// +// If the CPut is successful (i.e. no error is returned and handleCondFailed is +// not called), the value that has been written is returned as a LivenessRecord. +// This includes the encoded bytes, and it can be used to update the local +// cache. func (nl *NodeLiveness) updateLiveness( - ctx context.Context, - update livenessUpdate, - oldLiveness storagepb.Liveness, - handleCondFailed func(actual storagepb.Liveness) error, -) error { + ctx context.Context, update livenessUpdate, handleCondFailed func(actual LivenessRecord) error, +) (LivenessRecord, error) { for { // Before each attempt, ensure that the context has not expired. if err := ctx.Err(); err != nil { - return err + return LivenessRecord{}, err } nl.mu.RLock() @@ -823,52 +885,67 @@ func (nl *NodeLiveness) updateLiveness( // shifted to other nodes. A slow/stalled disk would block here and cause // the node to lose its leases. if err := storage.WriteSyncNoop(ctx, eng); err != nil { - return errors.Wrapf(err, "couldn't update node liveness because disk write failed") + return LivenessRecord{}, errors.Wrapf(err, "couldn't update node liveness because disk write failed") } } - if err := nl.updateLivenessAttempt(ctx, update, oldLiveness, handleCondFailed); err != nil { + written, err := nl.updateLivenessAttempt(ctx, update, handleCondFailed) + if err != nil { // Intentionally don't errors.Cause() the error, or we'd hop past errRetryLiveness. if errors.HasType(err, (*errRetryLiveness)(nil)) { log.Infof(ctx, "retrying liveness update after %s", err) continue } - return err + return LivenessRecord{}, err } - return nil + return written, nil } } func (nl *NodeLiveness) updateLivenessAttempt( - ctx context.Context, - update livenessUpdate, - oldLiveness storagepb.Liveness, - handleCondFailed func(actual storagepb.Liveness) error, -) error { - // First check the existing liveness map to avoid known conditional - // put failures. - if !update.ignoreCache { - l, err := nl.GetLiveness(update.NodeID) + ctx context.Context, update livenessUpdate, handleCondFailed func(actual LivenessRecord) error, +) (LivenessRecord, error) { + var oldRaw *roachpb.Value + if update.ignoreCache { + // If ignoreCache is set, the caller is manually providing the previous + // value in update.oldRaw. 
+ oldRaw = update.oldRaw + } else { + // Check the existing liveness map to avoid known conditional put + // failures. The raw value from the map also helps us in doing the CPut. + if update.oldRaw != nil { + log.Fatalf(ctx, "unexpected oldRaw when ignoreCache not specified") + } + + l, err := nl.GetLiveness(update.updated.NodeID) if err != nil && !errors.Is(err, ErrNoLivenessRecord) { - return err + return LivenessRecord{}, err } - if err == nil && l != oldLiveness { - return handleCondFailed(l) + if err == nil && l.Liveness != update.old { + return LivenessRecord{}, handleCondFailed(l) } + oldRaw = l.raw } + v := new(roachpb.Value) if err := nl.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { b := txn.NewBatch() - key := keys.NodeLivenessKey(update.NodeID) - val := update.Liveness - if oldLiveness == (storagepb.Liveness{}) { - b.CPut(key, &val, nil) - } else { - expVal := oldLiveness - // TODO(andrei): Plumb along oldLiveness as the raw bytes we read from the - // database, not as a proto, so that the proto's encoding can change. See - // #38308. If we do that, we can remove Liveness from belowRaftProtos. - b.CPutDeprecated(key, &val, &expVal) + key := keys.NodeLivenessKey(update.updated.NodeID) + if err := v.SetProto(&update.updated); err != nil { + log.Fatalf(ctx, "failed to marshall proto: %s", err) + } + // Make a copy of the expected value. We can't pass oldRaw because b.CPut() + // modifies it. + var valCpy *roachpb.Value + if oldRaw != nil { + var err error + rawBytes, err := oldRaw.GetBytes() + if err != nil { + log.Fatal(ctx, err.Error()) + } + valCpy = new(roachpb.Value) + valCpy.SetBytes(rawBytes) } + b.CPut(key, v, valCpy) // Use a trigger on EndTxn to indicate that node liveness should be // re-gossiped. Further, require that this transaction complete as a one // phase commit to eliminate the possibility of leaving write intents. @@ -889,19 +966,19 @@ func (nl *NodeLiveness) updateLivenessAttempt( if tErr := (*roachpb.ConditionFailedError)(nil); errors.As(err, &tErr) { if handleCondFailed != nil { if tErr.ActualValue == nil { - return handleCondFailed(storagepb.Liveness{}) + return LivenessRecord{}, handleCondFailed(LivenessRecord{}) } var actualLiveness storagepb.Liveness if err := tErr.ActualValue.GetProto(&actualLiveness); err != nil { - return errors.Wrapf(err, "couldn't update node liveness from CPut actual value") + return LivenessRecord{}, errors.Wrapf(err, "couldn't update node liveness from CPut actual value") } - return handleCondFailed(actualLiveness) + return LivenessRecord{}, handleCondFailed(LivenessRecord{Liveness: actualLiveness, raw: tErr.ActualValue}) } } else if errors.HasType(err, (*roachpb.TransactionStatusError)(nil)) || errors.HasType(err, (*roachpb.AmbiguousResultError)(nil)) { - return &errRetryLiveness{err} + return LivenessRecord{}, &errRetryLiveness{err} } - return err + return LivenessRecord{}, err } nl.mu.RLock() @@ -910,16 +987,16 @@ func (nl *NodeLiveness) updateLivenessAttempt( if cb != nil { cb(ctx) } - return nil + return LivenessRecord{Liveness: update.updated, raw: v}, nil } // maybeUpdate replaces the liveness (if it appears newer) and invokes the // registered callbacks if the node became live in the process. -func (nl *NodeLiveness) maybeUpdate(new storagepb.Liveness) { +func (nl *NodeLiveness) maybeUpdate(new LivenessRecord) { nl.mu.Lock() // Note that this works fine even if `old` is empty. 
old := nl.mu.nodes[new.NodeID] - should := shouldReplaceLiveness(old, new) + should := shouldReplaceLiveness(old.Liveness, new.Liveness) var callbacks []IsLiveCallback if should { nl.mu.nodes[new.NodeID] = new @@ -972,7 +1049,7 @@ func (nl *NodeLiveness) livenessGossipUpdate(key string, content roachpb.Value) return } - nl.maybeUpdate(liveness) + nl.maybeUpdate(LivenessRecord{Liveness: liveness, raw: &content}) } // numLiveNodes is used to populate a metric that tracks the number of live diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index bf016fbf7509..4066ff737060 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -268,6 +268,7 @@ func TestNodeHeartbeatCallback(t *testing.T) { // live. func TestNodeLivenessEpochIncrement(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() mtc := &multiTestContext{} defer mtc.Stop() mtc.Start(t, 2) @@ -282,13 +283,14 @@ func TestNodeLivenessEpochIncrement(t *testing.T) { t.Fatal(err) } if err := mtc.nodeLivenesses[0].IncrementEpoch( - context.Background(), oldLiveness); !testutils.IsError(err, "cannot increment epoch on live node") { + ctx, oldLiveness.Liveness, + ); !testutils.IsError(err, "cannot increment epoch on live node") { t.Fatalf("expected error incrementing a live node: %+v", err) } // Advance clock past liveness threshold & increment epoch. mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) - if err := mtc.nodeLivenesses[0].IncrementEpoch(context.Background(), oldLiveness); err != nil { + if err := mtc.nodeLivenesses[0].IncrementEpoch(ctx, oldLiveness.Liveness); err != nil { t.Fatalf("unexpected error incrementing a non-live node: %+v", err) } @@ -316,14 +318,17 @@ func TestNodeLivenessEpochIncrement(t *testing.T) { } // Verify error on incrementing an already-incremented epoch. - if err := mtc.nodeLivenesses[0].IncrementEpoch(context.Background(), oldLiveness); !errors.Is(err, kvserver.ErrEpochAlreadyIncremented) { + if err := mtc.nodeLivenesses[0].IncrementEpoch( + ctx, oldLiveness.Liveness, + ); !errors.Is(err, kvserver.ErrEpochAlreadyIncremented) { t.Fatalf("unexpected error incrementing a non-live node: %+v", err) } // Verify error incrementing with a too-high expectation for liveness epoch. oldLiveness.Epoch = 3 if err := mtc.nodeLivenesses[0].IncrementEpoch( - context.Background(), oldLiveness); !testutils.IsError(err, "unexpected liveness epoch 2; expected >= 3") { + ctx, oldLiveness.Liveness, + ); !testutils.IsError(err, "unexpected liveness epoch 2; expected >= 3") { t.Fatalf("expected error incrementing with a too-high expected epoch: %+v", err) } } @@ -403,13 +408,13 @@ func TestNodeLivenessSelf(t *testing.T) { // Verify liveness is properly initialized. This needs to be wrapped in a // SucceedsSoon because node liveness gets initialized via an async gossip // callback. - var liveness storagepb.Liveness + var liveness kvserver.LivenessRecord testutils.SucceedsSoon(t, func() error { var err error liveness, err = mtc.nodeLivenesses[0].GetLiveness(g.NodeID.Get()) return err }) - if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness); err != nil { + if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { t.Fatal(err) } @@ -435,10 +440,9 @@ func TestNodeLivenessSelf(t *testing.T) { // Self should not see the fake liveness, but have kept the real one. 
l := mtc.nodeLivenesses[0] - lGet, err := l.GetLiveness(g.NodeID.Get()) - if err != nil { - t.Fatal(err) - } + lGetRec, err := l.GetLiveness(g.NodeID.Get()) + require.NoError(t, err) + lGet := lGetRec.Liveness lSelf, err := l.Self() if err != nil { t.Fatal(err) @@ -474,7 +478,7 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) { liveness, _ := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) testutils.SucceedsSoon(t, func() error { - if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness); err != nil { + if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { if errors.Is(err, kvserver.ErrEpochIncremented) { return err } @@ -524,7 +528,7 @@ func TestNodeLivenessGetLivenesses(t *testing.T) { // Advance the clock but only heartbeat node 0. mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) liveness, _ := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) - if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness); err != nil { + if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { t.Fatal(err) } @@ -605,7 +609,7 @@ func TestNodeLivenessConcurrentIncrementEpochs(t *testing.T) { errCh := make(chan error, concurrency) for i := 0; i < concurrency; i++ { go func() { - errCh <- nl.IncrementEpoch(context.Background(), l) + errCh <- nl.IncrementEpoch(context.Background(), l.Liveness) }() } for i := 0; i < concurrency; i++ { @@ -642,7 +646,9 @@ func TestNodeLivenessSetDraining(t *testing.T) { // Verify success on failed update of a liveness record that already has the // given draining setting. - if err := mtc.nodeLivenesses[drainingNodeIdx].SetDrainingInternal(ctx, storagepb.Liveness{}, false); err != nil { + if err := mtc.nodeLivenesses[drainingNodeIdx].SetDrainingInternal( + ctx, kvserver.LivenessRecord{}, false, + ); err != nil { t.Fatal(err) } @@ -924,7 +930,9 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { // Verify success on failed update of a liveness record that already has the // given decommissioning setting. - if _, err := callerNodeLiveness.SetDecommissioningInternal(ctx, nodeID, storagepb.Liveness{}, false); err != nil { + if _, err := callerNodeLiveness.SetDecommissioningInternal( + ctx, nodeID, kvserver.LivenessRecord{}, false, + ); err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 681b1babc661..5f0c84996df6 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -543,8 +543,8 @@ func (r *Replica) leaseStatus( if lease.Type() == roachpb.LeaseExpiration { expiration = lease.GetExpiration() } else { - var err error - status.Liveness, err = r.store.cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID) + l, err := r.store.cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID) + status.Liveness = l.Liveness if err != nil || status.Liveness.Epoch < lease.Epoch { // If lease validity can't be determined (e.g. 
gossip is down // and liveness info isn't available for owner), we can neither diff --git a/pkg/kv/kvserver/storagepb/liveness.go b/pkg/kv/kvserver/storagepb/liveness.go index c73f37ef6e9c..3c338a7fe06f 100644 --- a/pkg/kv/kvserver/storagepb/liveness.go +++ b/pkg/kv/kvserver/storagepb/liveness.go @@ -11,6 +11,7 @@ package storagepb import ( + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -39,3 +40,11 @@ func (l *Liveness) IsDead(now time.Time, threshold time.Duration) bool { deadAsOf := expiration.Add(threshold) return !now.Before(deadAsOf) } + +func (l Liveness) String() string { + var extra string + if l.Draining || l.Decommissioning { + extra = fmt.Sprintf(" drain:%t decom:%t", l.Draining, l.Decommissioning) + } + return fmt.Sprintf("liveness(nid:%d epo:%d exp:%s%s)", l.NodeID, l.Epoch, l.Expiration, extra) +} diff --git a/pkg/kv/kvserver/storagepb/liveness.pb.go b/pkg/kv/kvserver/storagepb/liveness.pb.go index a7e15bc8e0a7..6c60f99a0591 100644 --- a/pkg/kv/kvserver/storagepb/liveness.pb.go +++ b/pkg/kv/kvserver/storagepb/liveness.pb.go @@ -67,13 +67,14 @@ func (x NodeLivenessStatus) String() string { return proto.EnumName(NodeLivenessStatus_name, int32(x)) } func (NodeLivenessStatus) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_liveness_ef3e281353e7c6bb, []int{0} + return fileDescriptor_liveness_be892b291ea2d19e, []int{0} } // Liveness holds information about a node's latest heartbeat and epoch. // -// NOTE: Care must be taken when changing the encoding of this proto -// because it is used as part of conditional put operations. +// NOTE: 20.1 encodes this proto and uses it for CPut operations, so its +// encoding can't change until 21.1. 20.2 has moved away from the bad practice. +// In 21.1 we should replace the LegacyTimestamp field with a regular Timestamp. type Liveness struct { NodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,1,opt,name=node_id,json=nodeId,proto3,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"node_id,omitempty"` // Epoch is a monotonically-increasing value for node liveness. It @@ -90,16 +91,17 @@ type Liveness struct { // is large in comparison to the max offset, and that nodes heartbeat their // liveness records well in advance of this expiration, so the optimism or // pessimism of a checker does not matter very much. + // + // TODO(andrei): Change this to a regular Timestamp field in 21.1. 
Expiration hlc.LegacyTimestamp `protobuf:"bytes,3,opt,name=expiration,proto3" json:"expiration"` Draining bool `protobuf:"varint,4,opt,name=draining,proto3" json:"draining,omitempty"` Decommissioning bool `protobuf:"varint,5,opt,name=decommissioning,proto3" json:"decommissioning,omitempty"` } -func (m *Liveness) Reset() { *m = Liveness{} } -func (m *Liveness) String() string { return proto.CompactTextString(m) } -func (*Liveness) ProtoMessage() {} +func (m *Liveness) Reset() { *m = Liveness{} } +func (*Liveness) ProtoMessage() {} func (*Liveness) Descriptor() ([]byte, []int) { - return fileDescriptor_liveness_ef3e281353e7c6bb, []int{0} + return fileDescriptor_liveness_be892b291ea2d19e, []int{0} } func (m *Liveness) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -584,37 +586,37 @@ var ( ) func init() { - proto.RegisterFile("kv/kvserver/storagepb/liveness.proto", fileDescriptor_liveness_ef3e281353e7c6bb) + proto.RegisterFile("kv/kvserver/storagepb/liveness.proto", fileDescriptor_liveness_be892b291ea2d19e) } -var fileDescriptor_liveness_ef3e281353e7c6bb = []byte{ - // 438 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x51, 0xcb, 0x6a, 0xdb, 0x40, - 0x14, 0xd5, 0xd8, 0xb2, 0xe3, 0x8e, 0xa1, 0x16, 0xd3, 0x2c, 0x8c, 0x17, 0x92, 0x69, 0xbb, 0x10, - 0x2d, 0x68, 0xa0, 0xd9, 0x75, 0x27, 0x57, 0xa2, 0x88, 0x2a, 0x32, 0xd8, 0x4d, 0x0a, 0xd9, 0x04, - 0x69, 0x34, 0x48, 0x83, 0x1e, 0x23, 0xa4, 0xb1, 0x68, 0xff, 0xa2, 0xd0, 0x1f, 0xe8, 0x67, 0xf4, - 0x13, 0xbc, 0xcc, 0x32, 0x2b, 0xd3, 0xca, 0x7f, 0xd1, 0x55, 0x91, 0x5c, 0x3b, 0x21, 0xbb, 0x73, - 0xcf, 0x9c, 0x7b, 0xe7, 0x9e, 0x7b, 0xe0, 0xeb, 0xa4, 0xc6, 0x49, 0x5d, 0xd1, 0xb2, 0xa6, 0x25, - 0xae, 0x04, 0x2f, 0xfd, 0x88, 0x16, 0x01, 0x4e, 0x59, 0x4d, 0x73, 0x5a, 0x55, 0x46, 0x51, 0x72, - 0xc1, 0x91, 0x46, 0x38, 0x49, 0x4a, 0xee, 0x93, 0xd8, 0x48, 0x6a, 0xe3, 0xa8, 0x37, 0x4e, 0xfa, - 0x99, 0xb6, 0x11, 0x2c, 0xc5, 0x71, 0x4a, 0x70, 0x4a, 0x23, 0x9f, 0x7c, 0xbb, 0x15, 0x2c, 0xa3, - 0x95, 0xf0, 0xb3, 0xe2, 0x30, 0x61, 0x76, 0x1e, 0xf1, 0x88, 0x77, 0x10, 0xb7, 0xe8, 0xc0, 0xbe, - 0xfc, 0xd1, 0x83, 0x23, 0xf7, 0xff, 0x57, 0xe8, 0x06, 0x9e, 0xe5, 0x3c, 0xa4, 0xb7, 0x2c, 0x9c, - 0x82, 0x39, 0xd0, 0x07, 0x0b, 0xb3, 0xd9, 0x69, 0x43, 0x8f, 0x87, 0xd4, 0xb1, 0xfe, 0xee, 0xb4, - 0x8b, 0x88, 0x89, 0x78, 0x13, 0x18, 0x84, 0x67, 0xf8, 0xb4, 0x4e, 0x18, 0x3c, 0x60, 0x5c, 0x24, - 0x11, 0xee, 0x50, 0x11, 0x18, 0x87, 0xb6, 0xd5, 0xb0, 0x9d, 0xe8, 0x84, 0xe8, 0x1c, 0x0e, 0x68, - 0xc1, 0x49, 0x3c, 0xed, 0xcd, 0x81, 0xde, 0x5f, 0x1d, 0x0a, 0xe4, 0x40, 0x48, 0xbf, 0x16, 0xac, - 0xf4, 0x05, 0xe3, 0xf9, 0xb4, 0x3f, 0x07, 0xfa, 0xf8, 0xdd, 0x2b, 0xe3, 0xc1, 0x6b, 0x6b, 0xca, - 0x88, 0x53, 0x62, 0xb8, 0x9d, 0xa9, 0xcf, 0x47, 0x4f, 0x0b, 0x79, 0xbb, 0xd3, 0xa4, 0xd5, 0xa3, - 0x66, 0x34, 0x83, 0xa3, 0xb0, 0xf4, 0x59, 0xce, 0xf2, 0x68, 0x2a, 0xcf, 0x81, 0x3e, 0x5a, 0x9d, - 0x6a, 0xa4, 0xc3, 0x49, 0x48, 0x09, 0xcf, 0x32, 0x56, 0x55, 0x8c, 0x77, 0x92, 0x41, 0x27, 0x79, - 0x4a, 0xbf, 0x97, 0x7f, 0xfd, 0xd4, 0xc0, 0x1b, 0x0e, 0x51, 0xbb, 0xfe, 0xf1, 0x30, 0x6b, 0xe1, - 0x8b, 0x4d, 0x85, 0xc6, 0xf0, 0xec, 0xca, 0xfb, 0xe4, 0x2d, 0xbf, 0x78, 0x8a, 0x84, 0x46, 0x50, - 0xb6, 0x6c, 0xd3, 0x52, 0x00, 0x9a, 0xc0, 0xf1, 0x95, 0x67, 0x5e, 0x9b, 0x8e, 0x6b, 0x2e, 0x5c, - 0x5b, 0xe9, 0xb5, 0x4f, 0xae, 0x73, 0x6d, 0x2b, 0x7d, 0xf4, 0x02, 0x4e, 0x2c, 0xfb, 0xc3, 0xf2, - 0xf2, 0xd2, 0x59, 0xaf, 0x9d, 0xa5, 0xe7, 0x78, 0x1f, 0x15, 0x19, 0x21, 0xf8, 0xfc, 0x31, 0x69, - 0x5b, 0xca, 0x60, 0xf1, 0x76, 0xfb, 0x47, 0x95, 0xb6, 0x8d, 0x0a, 
0xee, 0x1a, 0x15, 0xdc, 0x37, - 0x2a, 0xf8, 0xdd, 0xa8, 0xe0, 0xfb, 0x5e, 0x95, 0xee, 0xf6, 0xaa, 0x74, 0xbf, 0x57, 0xa5, 0x9b, - 0x67, 0xa7, 0xa8, 0x83, 0x61, 0x17, 0xdd, 0xc5, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x19, 0x0a, - 0x8d, 0x78, 0x3a, 0x02, 0x00, 0x00, +var fileDescriptor_liveness_be892b291ea2d19e = []byte{ + // 443 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x91, 0xbd, 0x8e, 0x9b, 0x4e, + 0x14, 0xc5, 0x19, 0x7f, 0x2d, 0xff, 0xb1, 0xf4, 0x37, 0x9a, 0x6c, 0x61, 0xb9, 0x00, 0x2b, 0x49, + 0x81, 0x12, 0x89, 0x91, 0xb2, 0x5d, 0x3a, 0x1c, 0x50, 0x84, 0xc2, 0x62, 0xc9, 0xce, 0x6e, 0xa4, + 0x6d, 0x56, 0x30, 0x8c, 0x60, 0xc4, 0xc7, 0x20, 0x18, 0xa3, 0xe4, 0x2d, 0xd2, 0x25, 0xe5, 0x3e, + 0x46, 0x1e, 0xc1, 0xe5, 0x96, 0x5b, 0x59, 0x09, 0x7e, 0x8b, 0x54, 0x11, 0x38, 0xf6, 0xae, 0xd2, + 0x9d, 0x7b, 0xe6, 0xdc, 0x3b, 0xf3, 0x9b, 0x0b, 0x5f, 0x26, 0x35, 0x4e, 0xea, 0x8a, 0x96, 0x35, + 0x2d, 0x71, 0x25, 0x78, 0xe9, 0x47, 0xb4, 0x08, 0x70, 0xca, 0x6a, 0x9a, 0xd3, 0xaa, 0x32, 0x8a, + 0x92, 0x0b, 0x8e, 0x34, 0xc2, 0x49, 0x52, 0x72, 0x9f, 0xc4, 0x46, 0x52, 0x1b, 0xc7, 0xbc, 0x71, + 0xca, 0xcf, 0xb4, 0x8d, 0x60, 0x29, 0x8e, 0x53, 0x82, 0x53, 0x1a, 0xf9, 0xe4, 0xcb, 0xad, 0x60, + 0x19, 0xad, 0x84, 0x9f, 0x15, 0x87, 0x09, 0xb3, 0xf3, 0x88, 0x47, 0xbc, 0x93, 0xb8, 0x55, 0x07, + 0xf7, 0xf9, 0xb7, 0x1e, 0x94, 0xdd, 0xbf, 0x57, 0xa1, 0x1b, 0x78, 0x96, 0xf3, 0x90, 0xde, 0xb2, + 0x70, 0x0a, 0xe6, 0x40, 0x1f, 0x2e, 0xcc, 0x66, 0xa7, 0x8d, 0x3c, 0x1e, 0x52, 0xc7, 0xfa, 0xbd, + 0xd3, 0x2e, 0x22, 0x26, 0xe2, 0x4d, 0x60, 0x10, 0x9e, 0xe1, 0xd3, 0x73, 0xc2, 0xe0, 0x51, 0xe3, + 0x22, 0x89, 0x70, 0xa7, 0x8a, 0xc0, 0x38, 0xb4, 0xad, 0x46, 0xed, 0x44, 0x27, 0x44, 0xe7, 0x70, + 0x48, 0x0b, 0x4e, 0xe2, 0x69, 0x6f, 0x0e, 0xf4, 0xfe, 0xea, 0x50, 0x20, 0x07, 0x42, 0xfa, 0xb9, + 0x60, 0xa5, 0x2f, 0x18, 0xcf, 0xa7, 0xfd, 0x39, 0xd0, 0xc7, 0x6f, 0x5e, 0x18, 0x8f, 0xac, 0x2d, + 0x94, 0x11, 0xa7, 0xc4, 0x70, 0x3b, 0xa8, 0x8f, 0x47, 0xa6, 0xc5, 0x60, 0xbb, 0xd3, 0xa4, 0xd5, + 0x93, 0x66, 0x34, 0x83, 0x72, 0x58, 0xfa, 0x2c, 0x67, 0x79, 0x34, 0x1d, 0xcc, 0x81, 0x2e, 0xaf, + 0x4e, 0x35, 0xd2, 0xe1, 0x24, 0xa4, 0x84, 0x67, 0x19, 0xab, 0x2a, 0xc6, 0xbb, 0xc8, 0xb0, 0x8b, + 0xfc, 0x6b, 0xbf, 0x95, 0xbf, 0xdf, 0x69, 0xd2, 0x8f, 0x3b, 0x0d, 0xbc, 0xe2, 0x10, 0xb5, 0x08, + 0xc7, 0xcf, 0x59, 0x0b, 0x5f, 0x6c, 0x2a, 0x34, 0x86, 0x67, 0x57, 0xde, 0x07, 0x6f, 0xf9, 0xc9, + 0x53, 0x24, 0x24, 0xc3, 0x81, 0x65, 0x9b, 0x96, 0x02, 0xd0, 0x04, 0x8e, 0xaf, 0x3c, 0xf3, 0xda, + 0x74, 0x5c, 0x73, 0xe1, 0xda, 0x4a, 0xaf, 0x3d, 0x72, 0x9d, 0x6b, 0x5b, 0xe9, 0xa3, 0x67, 0x70, + 0x62, 0xd9, 0xef, 0x96, 0x97, 0x97, 0xce, 0x7a, 0xed, 0x2c, 0x3d, 0xc7, 0x7b, 0xaf, 0x0c, 0x10, + 0x82, 0xff, 0x3f, 0x35, 0x6d, 0x4b, 0x19, 0x2e, 0x5e, 0x6f, 0x7f, 0xa9, 0xd2, 0xb6, 0x51, 0xc1, + 0x7d, 0xa3, 0x82, 0x87, 0x46, 0x05, 0x3f, 0x1b, 0x15, 0x7c, 0xdd, 0xab, 0xd2, 0xfd, 0x5e, 0x95, + 0x1e, 0xf6, 0xaa, 0x74, 0xf3, 0xdf, 0x69, 0xdd, 0xc1, 0xa8, 0x5b, 0xdf, 0xc5, 0x9f, 0x00, 0x00, + 0x00, 0xff, 0xff, 0x44, 0x54, 0x3d, 0x44, 0x3e, 0x02, 0x00, 0x00, } diff --git a/pkg/kv/kvserver/storagepb/liveness.proto b/pkg/kv/kvserver/storagepb/liveness.proto index 878067108bba..c097564b2b95 100644 --- a/pkg/kv/kvserver/storagepb/liveness.proto +++ b/pkg/kv/kvserver/storagepb/liveness.proto @@ -17,10 +17,12 @@ import "gogoproto/gogo.proto"; // Liveness holds information about a node's latest heartbeat and epoch. 
// -// NOTE: Care must be taken when changing the encoding of this proto -// because it is used as part of conditional put operations. +// NOTE: 20.1 encodes this proto and uses it for CPut operations, so its +// encoding can't change until 21.1. 20.2 has moved away from the bad practice. +// In 21.1 we should replace the LegacyTimestamp field with a regular Timestamp. message Liveness { option (gogoproto.populate) = true; + option (gogoproto.goproto_stringer) = false; int32 node_id = 1 [(gogoproto.customname) = "NodeID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; @@ -38,6 +40,8 @@ message Liveness { // is large in comparison to the max offset, and that nodes heartbeat their // liveness records well in advance of this expiration, so the optimism or // pessimism of a checker does not matter very much. + // + // TODO(andrei): Change this to a regular Timestamp field in 21.1. util.hlc.LegacyTimestamp expiration = 3 [(gogoproto.nullable) = false]; bool draining = 4; bool decommissioning = 5; diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index 0b6e629db604..9d8bb5fa8506 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -107,7 +107,7 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *NodeLiveness) NodeLivenessFunc if err != nil { return storagepb.NodeLivenessStatus_UNAVAILABLE } - return LivenessStatus(liveness, now, timeUntilStoreDead) + return LivenessStatus(liveness.Liveness, now, timeUntilStoreDead) } } diff --git a/pkg/kv/kvserver/track_raft_protos.go b/pkg/kv/kvserver/track_raft_protos.go index a1a878c43513..ebe36a984837 100644 --- a/pkg/kv/kvserver/track_raft_protos.go +++ b/pkg/kv/kvserver/track_raft_protos.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/compactor" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storagepb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -60,13 +59,6 @@ func TrackRaftProtos() func() []reflect.Type { inner: make(map[reflect.Type]struct{}), } - // Hard-coded protos for which we don't want to change the encoding. These - // are not "below raft" in the normal sense, but instead are used as part of - // conditional put operations. This is a bad practice - see #38308. - belowRaftProtos.Lock() - belowRaftProtos.inner[reflect.TypeOf(&storagepb.Liveness{})] = struct{}{} - belowRaftProtos.Unlock() - protoutil.Interceptor = func(pb protoutil.Message) { t := reflect.TypeOf(pb)