diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index aadf9a2b5bdc..bb091f9e9f4c 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -196,11 +196,12 @@ type NodeLiveness struct { // heartbeatPaused contains an atomically-swapped number representing a bool // (1 or 0). heartbeatToken is a channel containing a token which is taken // when heartbeating or when pausing the heartbeat. Used for testing. - heartbeatPaused uint32 - heartbeatToken chan struct{} - metrics Metrics - onNodeDecommissioned func(livenesspb.Liveness) // noop if nil - engineSyncs singleflight.Group + heartbeatPaused uint32 + heartbeatToken chan struct{} + metrics Metrics + onNodeDecommissioned func(livenesspb.Liveness) // noop if nil + onNodeDecommissioning OnNodeDecommissionCallback // noop if nil + engineSyncs singleflight.Group mu struct { syncutil.RWMutex @@ -279,6 +280,9 @@ type NodeLivenessOptions struct { // idempotent as it may be invoked multiple times and defaults to a // noop. OnNodeDecommissioned func(livenesspb.Liveness) + // OnNodeDecommissioning is invoked when a node is detected to be + // decommissioning. + OnNodeDecommissioning OnNodeDecommissionCallback } // NewNodeLiveness returns a new instance of NodeLiveness configured @@ -696,6 +700,10 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool { !liveness.Draining } +// OnNodeDecommissionCallback is a callback that is invoked when a node is +// detected to be decommissioning. +type OnNodeDecommissionCallback func(nodeID roachpb.NodeID) + // NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`. type NodeLivenessStartOptions struct { Engines []storage.Engine @@ -1397,6 +1405,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record) var shouldReplace bool nl.mu.Lock() + + // NB: shouldReplace will always be true right after a node restarts since the + // `nodes` map will be empty. This means that the callbacks called below will + // always be invoked at least once after node restarts. oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID) if !ok { shouldReplace = true @@ -1424,6 +1436,9 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record) if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil { nl.onNodeDecommissioned(newLivenessRec.Liveness) } + if newLivenessRec.Membership.Decommissioning() && nl.onNodeDecommissioning != nil { + nl.onNodeDecommissioning(newLivenessRec.NodeID) + } } // shouldReplaceLiveness checks to see if the new liveness is in fact newer diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 9546a9effc40..5a3cb1666ac8 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3408,6 +3408,10 @@ func (s *Store) Enqueue( ) (recording tracing.Recording, processError error, enqueueError error) { ctx = repl.AnnotateCtx(ctx) + if fn := s.TestingKnobs().EnqueueReplicaInterceptor; fn != nil { + fn(queueName, repl) + } + // Do not enqueue uninitialized replicas. The baseQueue ignores these during // normal queue scheduling, but we error here to signal to the user that the // operation was unsuccessful. @@ -3445,10 +3449,17 @@ func (s *Store) Enqueue( } if async { - // NB: 1e6 is a placeholder for now. We want to use a high enough priority - // to ensure that these replicas are priority-ordered first. + // NB: 1e5 is a placeholder for now. We want to use a high enough priority + // to ensure that these replicas are priority-ordered first (just below the + // replacement of dead replicas). + // + // TODO(aayush): Once we address + // https://github.com/cockroachdb/cockroach/issues/79266, we can consider + // removing the `AddAsync` path here and just use the `MaybeAddAsync` path, + // which will allow us to stop specifiying the priority ad-hoc. + const asyncEnqueuePriority = 1e5 if skipShouldQueue { - queue.AddAsync(ctx, repl, 1e6 /* prio */) + queue.AddAsync(ctx, repl, asyncEnqueuePriority) } else { queue.MaybeAddAsync(ctx, repl, repl.Clock().NowAsClockTimestamp()) } diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 16cb5ed51ba3..c18e374c819f 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -406,6 +406,9 @@ type StoreTestingKnobs struct { // AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the // send snapshot semaphore. AfterSendSnapshotThrottle func() + + // EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`. + EnqueueReplicaInterceptor func(queueName string, replica *Replica) } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index e9454587ce7b..ceb84c2a720f 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -393,6 +393,17 @@ func (d ReplicaSet) ConfState() raftpb.ConfState { return cs } +// HasReplicaOnNode returns true iff the given nodeID is present in the +// ReplicaSet. +func (d ReplicaSet) HasReplicaOnNode(nodeID NodeID) bool { + for _, rep := range d.wrapped { + if rep.NodeID == nodeID { + return true + } + } + return false +} + // CanMakeProgress reports whether the given descriptors can make progress at // the replication layer. This is more complicated than just counting the number // of replicas due to the existence of joint quorums. diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go index de6f653e0ce3..7eb1cb151ab5 100644 --- a/pkg/server/admin_test.go +++ b/pkg/server/admin_test.go @@ -2391,6 +2391,66 @@ func TestDecommissionSelf(t *testing.T) { } } +// TestDecommissionEnqueueReplicas tests that a decommissioning node's replicas +// are proactively enqueued into their replicateQueues by the other nodes in the +// system. +func TestDecommissionEnqueueReplicas(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t) // can't handle 7-node clusters + + ctx := context.Background() + enqueuedRangeIDs := make(chan roachpb.RangeID) + tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Insecure: true, // allows admin client without setting up certs + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + EnqueueReplicaInterceptor: func( + queueName string, repl *kvserver.Replica, + ) { + require.Equal(t, queueName, "replicate") + enqueuedRangeIDs <- repl.RangeID + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + decommissionAndCheck := func(decommissioningSrvIdx int) { + t.Logf("decommissioning n%d", tc.Target(decommissioningSrvIdx).NodeID) + // Add a scratch range's replica to a node we will decommission. + scratchKey := tc.ScratchRange(t) + decommissioningSrv := tc.Server(decommissioningSrvIdx) + tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx)) + + conn, err := decommissioningSrv.RPCContext().GRPCDialNode( + decommissioningSrv.RPCAddr(), decommissioningSrv.NodeID(), rpc.DefaultClass, + ).Connect(ctx) + require.NoError(t, err) + adminClient := serverpb.NewAdminClient(conn) + decomNodeIDs := []roachpb.NodeID{tc.Server(decommissioningSrvIdx).NodeID()} + _, err = adminClient.Decommission( + ctx, + &serverpb.DecommissionRequest{ + NodeIDs: decomNodeIDs, + TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING, + }, + ) + require.NoError(t, err) + + // Ensure that the scratch range's replica was proactively enqueued. + require.Equal(t, <-enqueuedRangeIDs, tc.LookupRangeOrFatal(t, scratchKey).RangeID) + } + + decommissionAndCheck(2 /* decommissioningSrvIdx */) + decommissionAndCheck(3 /* decommissioningSrvIdx */) + decommissionAndCheck(5 /* decommissioningSrvIdx */) +} + func TestAdminDecommissionedOperations(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/server/decommission.go b/pkg/server/decommission.go index 250cb23c2016..32e8aba1bc65 100644 --- a/pkg/server/decommission.go +++ b/pkg/server/decommission.go @@ -13,21 +13,100 @@ package server import ( "context" "sort" + "time" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" ) +// decommissioningNodeMap tracks the set of nodes that we know are +// decommissioning. This map is used to inform whether we need to proactively +// enqueue some decommissioning node's ranges for rebalancing. +type decommissioningNodeMap struct { + syncutil.RWMutex + nodes map[roachpb.NodeID]interface{} +} + +// makeOnNodeDecommissioningCallback returns a callback that enqueues the +// decommissioning node's ranges into the `stores`' replicateQueues for +// rebalancing. +func (t *decommissioningNodeMap) makeOnNodeDecommissioningCallback( + stores *kvserver.Stores, +) liveness.OnNodeDecommissionCallback { + return func(decommissioningNodeID roachpb.NodeID) { + ctx := context.Background() + t.Lock() + defer t.Unlock() + if _, ok := t.nodes[decommissioningNodeID]; ok { + // We've already enqueued this node's replicas up for processing. + // Nothing more to do. + return + } + + logLimiter := log.Every(5 * time.Second) // avoid log spam + if err := stores.VisitStores(func(store *kvserver.Store) error { + // For each range that we have a lease for, check if it has a replica + // on the decommissioning node. If so, proactively enqueue this replica + // into our local replicateQueue. + store.VisitReplicas( + func(replica *kvserver.Replica) (wantMore bool) { + shouldEnqueue := replica.Desc().Replicas().HasReplicaOnNode(decommissioningNodeID) && + // Only bother enqueuing if we own the lease for this replica. + replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp()) + if !shouldEnqueue { + return true /* wantMore */ + } + _, processErr, enqueueErr := store.Enqueue( + // NB: We elide the shouldQueue check since we _know_ that the + // range being enqueued has replicas on a decommissioning node. + // Unfortunately, until + // https://github.com/cockroachdb/cockroach/issues/79266 is fixed, + // the shouldQueue() method can return false negatives (i.e. it + // would return false when it really shouldn't). + ctx, "replicate", replica, true /* skipShouldQueue */, true, /* async */ + ) + if processErr != nil && logLimiter.ShouldLog() { + // NB: The only case where we would expect to see a processErr when + // enqueuing a replica async is if it does not have the lease. We + // are checking that above, but that check is inherently racy. + log.Warningf( + ctx, "unexpected processing error when enqueuing replica asynchronously: %v", processErr, + ) + } + if enqueueErr != nil && logLimiter.ShouldLog() { + log.Warningf(ctx, "unable to enqueue replica: %s", enqueueErr) + } + return true /* wantMore */ + }) + return nil + }); err != nil { + // We're swallowing any errors above, so this shouldn't ever happen. + log.Fatalf( + ctx, "error while nudging replicas for decommissioning node n%d", decommissioningNodeID, + ) + } + } +} + +func (t *decommissioningNodeMap) onNodeDecommissioned(nodeID roachpb.NodeID) { + t.Lock() + defer t.Unlock() + // NB: We may have already deleted this node, but that's ok. + delete(t.nodes, nodeID) +} + func getPingCheckDecommissionFn( engines Engines, ) (*nodeTombstoneStorage, func(context.Context, roachpb.NodeID, codes.Code) error) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 3113c0b05663..afbbc2d5ee64 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -86,7 +86,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" - sentry "github.com/getsentry/sentry-go" + "github.com/getsentry/sentry-go" "google.golang.org/grpc/codes" ) @@ -398,6 +398,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { return nil, err } + stores := kvserver.NewStores(cfg.AmbientCtx, clock) + + decomNodeMap := &decommissioningNodeMap{ + nodes: make(map[roachpb.NodeID]interface{}), + } nodeLiveness := liveness.NewNodeLiveness(liveness.NodeLivenessOptions{ AmbientCtx: cfg.AmbientCtx, Stopper: stopper, @@ -408,6 +413,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { RenewalDuration: nlRenewal, Settings: st, HistogramWindowInterval: cfg.HistogramWindowInterval(), + // When we learn that a node is decommissioning, we want to proactively + // enqueue the ranges we have that also have a replica on the + // decommissioning node. + OnNodeDecommissioning: decomNodeMap.makeOnNodeDecommissioningCallback(stores), OnNodeDecommissioned: func(liveness livenesspb.Liveness) { if knobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs.OnDecommissionedCallback != nil { knobs.OnDecommissionedCallback(liveness) @@ -417,6 +426,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ); err != nil { log.Fatalf(ctx, "unable to add tombstone for n%d: %s", liveness.NodeID, err) } + + decomNodeMap.onNodeDecommissioned(liveness.NodeID) }, }) registry.AddMetricStruct(nodeLiveness.Metrics()) @@ -441,7 +452,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ) ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer) - stores := kvserver.NewStores(cfg.AmbientCtx, clock) ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */) // The InternalExecutor will be further initialized later, as we create more @@ -665,10 +675,19 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ) node := NewNode( - storeCfg, recorder, registry, stopper, - txnMetrics, stores, nil /* execCfg */, cfg.ClusterIDContainer, - gcoords.Regular.GetWorkQueue(admission.KVWork), gcoords.Stores, - tenantUsage, tenantSettingsWatcher, spanConfig.kvAccessor, + storeCfg, + recorder, + registry, + stopper, + txnMetrics, + stores, + nil, + cfg.ClusterIDContainer, + gcoords.Regular.GetWorkQueue(admission.KVWork), + gcoords.Stores, + tenantUsage, + tenantSettingsWatcher, + spanConfig.kvAccessor, ) roachpb.RegisterInternalServer(grpcServer.Server, node) kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)