server: react to decommissioning nodes by proactively enqueuing their replicas #80993

Merged
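Summary of what the hunks below show: when a node is detected to be entering the decommissioning state, the server reacts by proactively enqueuing replicas for processing instead of waiting for the periodic replica scanner to find them. Two hooks make this possible: NodeLiveness gains an OnNodeDecommissioning callback, fired from maybeUpdate whenever an accepted liveness update carries a decommissioning membership (pkg/kv/kvserver/liveness/liveness.go), and replica queues gain an unconditional AddAsync alongside MaybeAddAsync (pkg/kv/kvserver/scanner.go). Store.ManuallyEnqueue is also renamed to Store.Enqueue and grows an async parameter, which is why every updated test call site passes an extra false /* async */ argument. The sketch below is a minimal, self-contained illustration of the liveness callback pattern using toy types; it is not the PR's actual server-side wiring, which is not part of the hunks shown on this page.

```go
package main

import "fmt"

// Toy illustration only: these types mirror the shape of the liveness change
// in this PR (an OnNodeDecommissioning callback fired from maybeUpdate), not
// the real cockroach types or the real server wiring.

type NodeID int

type Membership int

const (
	Active Membership = iota
	Decommissioning
	Decommissioned
)

type Liveness struct {
	NodeID     NodeID
	Membership Membership
}

// OnNodeDecommissionCallback mirrors the callback type added in liveness.go:
// it receives only the ID of the node that is decommissioning.
type OnNodeDecommissionCallback func(NodeID)

type NodeLiveness struct {
	records               map[NodeID]Liveness
	onNodeDecommissioning OnNodeDecommissionCallback // noop if nil
}

// maybeUpdate sketches the tail of NodeLiveness.maybeUpdate: after the cached
// record is replaced, fire the decommissioning callback. Note that the
// callback fires on every accepted update that carries a decommissioning
// membership (and at least once after a restart, when the cache is empty), so
// whatever reacts to it must tolerate duplicate invocations.
func (nl *NodeLiveness) maybeUpdate(newRec Liveness) {
	nl.records[newRec.NodeID] = newRec
	if newRec.Membership == Decommissioning && nl.onNodeDecommissioning != nil {
		nl.onNodeDecommissioning(newRec.NodeID)
	}
}

func main() {
	nl := &NodeLiveness{
		records: map[NodeID]Liveness{},
		onNodeDecommissioning: func(id NodeID) {
			// In the PR, the server's reaction here is to push the node's
			// replicas into the relevant queues (see the AddAsync addition in
			// scanner.go) rather than waiting for the scanner to visit them.
			fmt.Printf("n%d is decommissioning: proactively enqueue its replicas\n", id)
		},
	}
	nl.maybeUpdate(Liveness{NodeID: 3, Membership: Decommissioning})
	nl.maybeUpdate(Liveness{NodeID: 3, Membership: Decommissioning}) // duplicate update: callback fires again
}
```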
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/utils_test.go
@@ -31,7 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -565,7 +565,7 @@ WHERE start_pretty LIKE '%s' ORDER BY start_key ASC`, startPretty)).Scan(&startK
lhServer := tc.Server(int(l.Replica.NodeID) - 1)
s, repl := getFirstStoreReplica(t, lhServer, startKey)
testutils.SucceedsSoon(t, func() error {
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
require.NoError(t, err)
return checkGCTrace(trace.String())
})
5 changes: 3 additions & 2 deletions pkg/ccl/multiregionccl/datadriven_test.go
@@ -316,8 +316,9 @@ SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.5s'
return errors.New(`could not find replica`)
}
for _, queueName := range []string{"split", "replicate", "raftsnapshot"} {
_, processErr, err := store.ManuallyEnqueue(ctx, queueName, repl,
true /* skipShouldQueue */)
_, processErr, err := store.Enqueue(
ctx, queueName, repl, true /* skipShouldQueue */, false, /* async */
)
if processErr != nil {
return processErr
}
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/client_lease_test.go
@@ -708,7 +708,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
return nil
})
_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
ManuallyEnqueue(ctx, "replicate", repl, true)
Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)

require.NoError(t, enqueueError)

@@ -907,7 +907,9 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {
repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key))
require.NotNil(t, repl)
// We don't know who the leaseholder might be, so ignore errors.
_, _, _ = tc.GetFirstStoreFromServer(t, i).ManuallyEnqueue(ctx, "replicate", repl, true)
_, _, _ = tc.GetFirstStoreFromServer(t, i).Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
}
}

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_migration_test.go
@@ -180,7 +180,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
repl, err := store.GetReplica(desc.RangeID)
require.NoError(t, err)
testutils.SucceedsSoon(t, func() error {
trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false /* async */)
if err != nil {
return err
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/client_protectedts_test.go
@@ -156,7 +156,7 @@ func TestProtectedTimestamps(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
upsertUntilBackpressure()
s, repl := getStoreAndReplica()
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
@@ -200,13 +200,13 @@ func TestProtectedTimestamps(t *testing.T) {
s, repl := getStoreAndReplica()
// The protectedts record will prevent us from aging the MVCC garbage bytes
// past the oldest record so shouldQueue should be false. Verify that.
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.Regexp(t, "(?s)shouldQueue=false", trace.String())

// If we skipShouldQueue then gc will run but it should only run up to the
// timestamp of our record at the latest.
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */)
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.Regexp(t, "(?s)done with GC evaluation for 0 keys", trace.String())
thresh := thresholdFromTrace(trace)
@@ -258,7 +258,7 @@ func TestProtectedTimestamps(t *testing.T) {
// happens up to the protected timestamp of the new record.
require.NoError(t, ptsWithDB.Release(ctx, nil, ptsRec.ID.GetUUID()))
testutils.SucceedsSoon(t, func() error {
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
50 changes: 33 additions & 17 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -196,11 +196,12 @@ type NodeLiveness struct {
// heartbeatPaused contains an atomically-swapped number representing a bool
// (1 or 0). heartbeatToken is a channel containing a token which is taken
// when heartbeating or when pausing the heartbeat. Used for testing.
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics Metrics
onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
engineSyncs singleflight.Group
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics Metrics
onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
onNodeDecommissioning OnNodeDecommissionCallback // noop if nil
engineSyncs singleflight.Group

mu struct {
syncutil.RWMutex
@@ -279,24 +280,28 @@ type NodeLivenessOptions struct {
// idempotent as it may be invoked multiple times and defaults to a
// noop.
OnNodeDecommissioned func(livenesspb.Liveness)
// OnNodeDecommissioning is invoked when a node is detected to be
// decommissioning.
OnNodeDecommissioning OnNodeDecommissionCallback
}

// NewNodeLiveness returns a new instance of NodeLiveness configured
// with the specified gossip instance.
func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
nl := &NodeLiveness{
ambientCtx: opts.AmbientCtx,
stopper: opts.Stopper,
clock: opts.Clock,
db: opts.DB,
gossip: opts.Gossip,
livenessThreshold: opts.LivenessThreshold,
renewalDuration: opts.RenewalDuration,
selfSem: make(chan struct{}, 1),
st: opts.Settings,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
onNodeDecommissioned: opts.OnNodeDecommissioned,
ambientCtx: opts.AmbientCtx,
stopper: opts.Stopper,
clock: opts.Clock,
db: opts.DB,
gossip: opts.Gossip,
livenessThreshold: opts.LivenessThreshold,
renewalDuration: opts.RenewalDuration,
selfSem: make(chan struct{}, 1),
st: opts.Settings,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
onNodeDecommissioned: opts.OnNodeDecommissioned,
onNodeDecommissioning: opts.OnNodeDecommissioning,
}
nl.metrics = Metrics{
LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
@@ -696,6 +701,10 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
!liveness.Draining
}

// OnNodeDecommissionCallback is a callback that is invoked when a node is
// detected to be decommissioning.
type OnNodeDecommissionCallback func(nodeID roachpb.NodeID)

// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
type NodeLivenessStartOptions struct {
Engines []storage.Engine
@@ -1397,6 +1406,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)

var shouldReplace bool
nl.mu.Lock()

// NB: shouldReplace will always be true right after a node restarts since the
// `nodes` map will be empty. This means that the callbacks called below will
// always be invoked at least once after node restarts.
oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID)
if !ok {
shouldReplace = true
@@ -1424,6 +1437,9 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil {
nl.onNodeDecommissioned(newLivenessRec.Liveness)
}
if newLivenessRec.Membership.Decommissioning() && nl.onNodeDecommissioning != nil {
nl.onNodeDecommissioning(newLivenessRec.NodeID)
}
}

// shouldReplaceLiveness checks to see if the new liveness is in fact newer
40 changes: 30 additions & 10 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -555,8 +555,12 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
// Manually enqueue the leaseholder replica into its store's raft snapshot
// queue. We expect it to pick up on the fact that the non-voter on its range
// needs a snapshot.
recording, pErr, err := leaseholderStore.ManuallyEnqueue(
ctx, "raftsnapshot", leaseholderRepl, false, /* skipShouldQueue */
recording, pErr, err := leaseholderStore.Enqueue(
ctx,
"raftsnapshot",
leaseholderRepl,
false, /* skipShouldQueue */
false, /* async */
)
if pErr != nil {
return pErr
@@ -751,7 +755,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
{
require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
_, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
_, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
@@ -769,7 +775,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
ltk.withStopAfterJointConfig(func() {
desc := tc.RemoveVotersOrFatal(t, scratchStartKey, tc.Target(2))
require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
@@ -808,7 +816,9 @@ func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) {
// Run the replicaGC queue.
checkNoGC := func() roachpb.RangeDescriptor {
store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
const msg = `not gc'able, replica is still in range descriptor: (n2,s2):`
@@ -868,7 +878,9 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
// raft to figure out that the replica needs a snapshot.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
testutils.SucceedsSoon(t, func() error {
trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false, /* async */
)
if err != nil {
return err
}
@@ -1004,7 +1016,9 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
queue1ErrCh := make(chan error, 1)
go func() {
queue1ErrCh <- func() error {
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
if err != nil {
return err
}
@@ -1484,7 +1498,9 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
// ensure that the merge correctly notices that there is a snapshot in
// flight and ignores the range.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
_, processErr, enqueueErr := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
_, processErr, enqueueErr := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
require.True(t, kvserver.IsReplicationChangeInProgressError(processErr))
return nil
@@ -1529,7 +1545,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
})

store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
@@ -1564,7 +1582,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
checkTransitioningOut := func() {
t.Helper()
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
7 changes: 2 additions & 5 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -1204,11 +1204,8 @@ func TestReplicateQueueShouldQueueNonVoter(t *testing.T) {
// because we know that it is the leaseholder (since it is the only voting
// replica).
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
recording, processErr, err := store.ManuallyEnqueue(
ctx,
"replicate",
repl,
false, /* skipShouldQueue */
recording, processErr, err := store.Enqueue(
ctx, "replicate", repl, false /* skipShouldQueue */, false, /* async */
)
if err != nil {
log.Errorf(ctx, "err: %s", err.Error())
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/scanner.go
@@ -35,6 +35,9 @@ type replicaQueue interface {
// the queue's inclusion criteria and the queue is not already
// too full, etc.
MaybeAddAsync(context.Context, replicaInQueue, hlc.ClockTimestamp)
// AddAsync adds the replica to the queue without checking whether the replica
// meets the queue's inclusion criteria.
AddAsync(context.Context, replicaInQueue, float64)
// MaybeRemove removes the replica from the queue if it is present.
MaybeRemove(roachpb.RangeID)
// Name returns the name of the queue.
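The interface comments in the scanner.go hunk above draw the distinction: MaybeAddAsync consults the queue's inclusion criteria (shouldQueue, queue fullness), while the new AddAsync adds the replica unconditionally at a caller-supplied priority, which is what a decommission reaction needs once it already knows the replica must be processed. A minimal toy sketch of that distinction, assuming a simple in-memory queue rather than the real kvserver queue machinery:

```go
package main

import (
	"context"
	"fmt"
)

// Toy queue illustrating MaybeAddAsync vs. AddAsync as documented in the
// replicaQueue interface above; the real kvserver queues are more involved
// (purgatory, size limits, per-queue shouldQueue logic, true asynchrony).
type replica struct {
	rangeID  int
	priority float64
}

type toyQueue struct {
	shouldQueue func(replica) (bool, float64) // the queue's inclusion criteria
	items       []replica
}

// MaybeAddAsync only enqueues the replica if the inclusion criteria say so.
func (q *toyQueue) MaybeAddAsync(ctx context.Context, r replica) {
	if ok, prio := q.shouldQueue(r); ok {
		r.priority = prio
		q.items = append(q.items, r)
	}
}

// AddAsync enqueues unconditionally at the caller-supplied priority: the
// caller has already decided the replica needs processing (for example,
// because its node is decommissioning).
func (q *toyQueue) AddAsync(ctx context.Context, r replica, prio float64) {
	r.priority = prio
	q.items = append(q.items, r)
}

func main() {
	q := &toyQueue{
		// Inclusion criteria that rejects everything, to show the contrast.
		shouldQueue: func(replica) (bool, float64) { return false, 0 },
	}
	ctx := context.Background()
	q.MaybeAddAsync(ctx, replica{rangeID: 1})  // dropped by shouldQueue
	q.AddAsync(ctx, replica{rangeID: 2}, 1e5)  // enqueued regardless
	fmt.Println(q.items)                       // [{2 100000}]
}
```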
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/scanner_test.go
@@ -159,6 +159,17 @@ func (tq *testQueue) MaybeAddAsync(
}
}

// NB: AddAsync on a testQueue is actually synchronous.
func (tq *testQueue) AddAsync(ctx context.Context, replI replicaInQueue, prio float64) {
repl := replI.(*Replica)

tq.Lock()
defer tq.Unlock()
if index := tq.indexOf(repl.RangeID); index == -1 {
tq.ranges = append(tq.ranges, repl)
}
}

func (tq *testQueue) MaybeRemove(rangeID roachpb.RangeID) {
tq.Lock()
defer tq.Unlock()