From 393d1ee380260133fa3b860ebdc973ff51093df5 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 1 Jul 2024 23:36:35 -0400 Subject: [PATCH] kv: hook Raft StoreLiveness into storeliveness package Fixes #125242. This commit adds a `replicaRLockedStoreLiveness` adapter type to hook the raft store liveness into the storeliveness package. This is currently unused. Release note: None --- pkg/kv/kvserver/BUILD.bazel | 4 ++ pkg/kv/kvserver/replica_init.go | 3 +- pkg/kv/kvserver/replica_store_liveness.go | 75 ++++++++++++++++++++ pkg/kv/kvserver/store.go | 8 +++ pkg/raft/raftstoreliveness/store_liveness.go | 8 ++- 5 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 pkg/kv/kvserver/replica_store_liveness.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 30788d39ca78..12c45129b105 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -74,6 +74,7 @@ go_library( "replica_send.go", "replica_split_load.go", "replica_sst_snapshot_storage.go", + "replica_store_liveness.go", "replica_tscache.go", "replica_write.go", "replicate_queue.go", @@ -166,6 +167,8 @@ go_library( "//pkg/kv/kvserver/spanset", "//pkg/kv/kvserver/split", "//pkg/kv/kvserver/stateloader", + "//pkg/kv/kvserver/storeliveness", + "//pkg/kv/kvserver/storeliveness/storelivenesspb", "//pkg/kv/kvserver/tenantrate", "//pkg/kv/kvserver/tscache", "//pkg/kv/kvserver/txnrecovery", @@ -176,6 +179,7 @@ go_library( "//pkg/multitenant/tenantcostmodel", "//pkg/raft", "//pkg/raft/raftpb", + "//pkg/raft/raftstoreliveness", "//pkg/raft/tracker", "//pkg/roachpb", "//pkg/rpc", diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 1ab02c394c19..a8c72337179c 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -294,11 +294,12 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error { ctx := r.AnnotateCtx(context.Background()) rg, err := raft.NewRawNode(newRaftConfig( ctx, - raft.Storage((*replicaRaftStorage)(r)), + (*replicaRaftStorage)(r), raftpb.PeerID(r.replicaID), r.mu.state.RaftAppliedIndex, r.store.cfg, &raftLogger{ctx: ctx}, + (*replicaRLockedStoreLiveness)(r), )) if err != nil { return err diff --git a/pkg/kv/kvserver/replica_store_liveness.go b/pkg/kv/kvserver/replica_store_liveness.go new file mode 100644 index 000000000000..d6d4c4aa4291 --- /dev/null +++ b/pkg/kv/kvserver/replica_store_liveness.go @@ -0,0 +1,75 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb" + "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// replicaRLockedStoreLiveness implements the raftstoreliveness.StoreLiveness +// interface. The interface methods assume that Replica.mu is held in read mode +// by their callers. +type replicaRLockedStoreLiveness Replica + +var _ raftstoreliveness.StoreLiveness = (*replicaRLockedStoreLiveness)(nil) + +func (r *replicaRLockedStoreLiveness) getStoreIdent(replicaID uint64) (slpb.StoreIdent, bool) { + r.mu.AssertRHeld() + desc, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(replicaID)) + if !ok { + return slpb.StoreIdent{}, false + } + return slpb.StoreIdent{NodeID: desc.NodeID, StoreID: desc.StoreID}, true +} + +// SupportFor implements the raftstoreliveness.StoreLiveness interface. +func (r *replicaRLockedStoreLiveness) SupportFor( + replicaID uint64, +) (raftstoreliveness.StoreLivenessEpoch, bool) { + storeID, ok := r.getStoreIdent(replicaID) + if !ok { + return 0, false + } + epoch, ok := r.store.storeLiveness.SupportFor(storeID) + if !ok { + return 0, false + } + return raftstoreliveness.StoreLivenessEpoch(epoch), true +} + +// SupportFrom implements the raftstoreliveness.StoreLiveness interface. +func (r *replicaRLockedStoreLiveness) SupportFrom( + replicaID uint64, +) (raftstoreliveness.StoreLivenessEpoch, hlc.Timestamp, bool) { + storeID, ok := r.getStoreIdent(replicaID) + if !ok { + return 0, hlc.Timestamp{}, false + } + epoch, exp, ok := r.store.storeLiveness.SupportFrom(storeID) + if !ok { + return 0, hlc.Timestamp{}, false + } + return raftstoreliveness.StoreLivenessEpoch(epoch), exp, true +} + +// SupportFromEnabled implements the raftstoreliveness.StoreLiveness interface. +func (r *replicaRLockedStoreLiveness) SupportFromEnabled() bool { + // TODO(nvanbenschoten): hook this up to a version check and cluster setting. + return false +} + +// SupportExpired implements the raftstoreliveness.StoreLiveness interface. +func (r *replicaRLockedStoreLiveness) SupportExpired(ts hlc.Timestamp) bool { + return ts.Less(r.store.Clock().Now()) +} diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index aff99ad05f1f..c7300abaea80 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -54,6 +54,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnrecovery" @@ -61,6 +62,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -387,6 +389,7 @@ func newRaftConfig( appliedIndex kvpb.RaftIndex, storeCfg StoreConfig, logger raft.Logger, + storeLiveness raftstoreliveness.StoreLiveness, ) *raft.Config { return &raft.Config{ ID: id, @@ -402,6 +405,7 @@ func newRaftConfig( MaxInflightBytes: storeCfg.RaftMaxInflightBytes, Storage: strg, Logger: logger, + StoreLiveness: storeLiveness, // We only set this on replica initialization, so replicas without // StepDownOnRemoval may remain on 23.2 nodes until they restart. That's @@ -893,6 +897,7 @@ type Store struct { metrics *StoreMetrics intentResolver *intentresolver.IntentResolver recoveryMgr txnrecovery.Manager + storeLiveness storeliveness.Fabric syncWaiter *logstore.SyncWaiterLoop raftEntryCache *raftentry.Cache limiters batcheval.Limiters @@ -2154,6 +2159,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { ) s.metrics.registry.AddMetricStruct(s.recoveryMgr.Metrics()) + // TODO(mira): create the store liveness support manager here. + // s.storeLiveness = ... + s.rangeIDAlloc = idAlloc now := s.cfg.Clock.Now() diff --git a/pkg/raft/raftstoreliveness/store_liveness.go b/pkg/raft/raftstoreliveness/store_liveness.go index fc37bfb913de..758101ba7070 100644 --- a/pkg/raft/raftstoreliveness/store_liveness.go +++ b/pkg/raft/raftstoreliveness/store_liveness.go @@ -64,7 +64,9 @@ type StoreLiveness interface { // active or not, which is what prompts the "SupportFrom" prefix. SupportFromEnabled() bool - // SupportInPast returns whether the supplied timestamp is before the current - // time. - SupportInPast(ts hlc.Timestamp) bool + // SupportExpired returns whether the supplied expiration timestamp is before + // the present time and has therefore expired. If the method returns false, + // the timestamp is still in the future and still provides support up to that + // point in time. + SupportExpired(ts hlc.Timestamp) bool }