Skip to content

Commit

Permalink
kvserver: add an async parameter to Store.ManuallyEnqueue()
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
aayushshah15 committed Jun 9, 2022
1 parent 741c574 commit a6e2f17
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 49 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -567,7 +567,7 @@ WHERE start_pretty LIKE '%s' ORDER BY start_key ASC`, startPretty)).Scan(&startK
lhServer := tc.Server(int(l.Replica.NodeID) - 1)
s, repl := getFirstStoreReplica(t, lhServer, startKey)
testutils.SucceedsSoon(t, func() error {
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
require.NoError(t, err)
return checkGCTrace(trace.String())
})
Expand Down Expand Up @@ -602,7 +602,7 @@ ORDER BY start_key ASC`, tableName, databaseName).Scan(&startKey)
lhServer := tc.Server(int(l.Replica.NodeID) - 1)
s, repl := getFirstStoreReplica(t, lhServer, startKey)
testutils.SucceedsSoon(t, func() error {
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
require.NoError(t, err)
return checkGCTrace(trace.String())
})
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/multiregionccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,9 @@ func TestMultiRegionDataDriven(t *testing.T) {
return errors.New(`could not find replica`)
}
for _, queueName := range []string{"split", "replicate", "raftsnapshot"} {
_, processErr, err := store.ManuallyEnqueue(ctx, queueName, repl,
true /* skipShouldQueue */)
_, processErr, err := store.Enqueue(
ctx, queueName, repl, true /* skipShouldQueue */, false, /* async */
)
if processErr != nil {
return processErr
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
return nil
})
_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
ManuallyEnqueue(ctx, "replicate", repl, true)
Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)

require.NoError(t, enqueueError)

Expand Down Expand Up @@ -1038,7 +1038,9 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {
repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key))
require.NotNil(t, repl)
// We don't know who the leaseholder might be, so ignore errors.
_, _, _ = tc.GetFirstStoreFromServer(t, i).ManuallyEnqueue(ctx, "replicate", repl, true)
_, _, _ = tc.GetFirstStoreFromServer(t, i).Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
repl, err := store.GetReplica(desc.RangeID)
require.NoError(t, err)
testutils.SucceedsSoon(t, func() error {
trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false /* async */)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestProtectedTimestamps(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
upsertUntilBackpressure()
s, repl := getStoreAndReplica()
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
Expand Down Expand Up @@ -200,13 +200,13 @@ func TestProtectedTimestamps(t *testing.T) {
s, repl := getStoreAndReplica()
// The protectedts record will prevent us from aging the MVCC garbage bytes
// past the oldest record so shouldQueue should be false. Verify that.
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.Regexp(t, "(?s)shouldQueue=false", trace.String())

// If we skipShouldQueue then gc will run but it should only run up to the
// timestamp of our record at the latest.
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */)
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.Regexp(t, "(?s)done with GC evaluation for 0 keys", trace.String())
thresh := thresholdFromTrace(trace)
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestProtectedTimestamps(t *testing.T) {
// happens up to the protected timestamp of the new record.
require.NoError(t, ptsWithDB.Release(ctx, nil, ptsRec.ID.GetUUID()))
testutils.SucceedsSoon(t, func() error {
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
Expand Down
40 changes: 30 additions & 10 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,12 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
// Manually enqueue the leaseholder replica into its store's raft snapshot
// queue. We expect it to pick up on the fact that the non-voter on its range
// needs a snapshot.
recording, pErr, err := leaseholderStore.ManuallyEnqueue(
ctx, "raftsnapshot", leaseholderRepl, false, /* skipShouldQueue */
recording, pErr, err := leaseholderStore.Enqueue(
ctx,
"raftsnapshot",
leaseholderRepl,
false, /* skipShouldQueue */
false, /* async */
)
if pErr != nil {
return pErr
Expand Down Expand Up @@ -582,7 +586,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
{
require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
_, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
_, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
Expand All @@ -600,7 +606,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
ltk.withStopAfterJointConfig(func() {
desc := tc.RemoveVotersOrFatal(t, scratchStartKey, tc.Target(2))
require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
Expand Down Expand Up @@ -639,7 +647,9 @@ func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) {
// Run the replicaGC queue.
checkNoGC := func() roachpb.RangeDescriptor {
store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
const msg = `not gc'able, replica is still in range descriptor: (n2,s2):`
Expand Down Expand Up @@ -699,7 +709,9 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
// raft to figure out that the replica needs a snapshot.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
testutils.SucceedsSoon(t, func() error {
trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false, /* async */
)
if err != nil {
return err
}
Expand Down Expand Up @@ -835,7 +847,9 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
queue1ErrCh := make(chan error, 1)
go func() {
queue1ErrCh <- func() error {
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
if err != nil {
return err
}
Expand Down Expand Up @@ -1233,7 +1247,9 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
// ensure that the merge correctly notices that there is a snapshot in
// flight and ignores the range.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
_, processErr, enqueueErr := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
_, processErr, enqueueErr := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
require.True(t, kvserver.IsReplicationChangeInProgressError(processErr))
return nil
Expand Down Expand Up @@ -1278,7 +1294,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
})

store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
Expand Down Expand Up @@ -1313,7 +1331,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
checkTransitioningOut := func() {
t.Helper()
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
Expand Down
7 changes: 2 additions & 5 deletions pkg/kv/kvserver/replicate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1204,11 +1204,8 @@ func TestReplicateQueueShouldQueueNonVoter(t *testing.T) {
// because we know that it is the leaseholder (since it is the only voting
// replica).
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
recording, processErr, err := store.ManuallyEnqueue(
ctx,
"replicate",
repl,
false, /* skipShouldQueue */
recording, processErr, err := store.Enqueue(
ctx, "replicate", repl, false /* skipShouldQueue */, false, /* async */
)
if err != nil {
log.Errorf(ctx, "err: %s", err.Error())
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type replicaQueue interface {
// the queue's inclusion criteria and the queue is not already
// too full, etc.
MaybeAddAsync(context.Context, replicaInQueue, hlc.ClockTimestamp)
// AddAsync adds the replica to the queue without checking whether the replica
// meets the queue's inclusion criteria.
AddAsync(context.Context, replicaInQueue, float64)
// MaybeRemove removes the replica from the queue if it is present.
MaybeRemove(roachpb.RangeID)
// Name returns the name of the queue.
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ func (tq *testQueue) MaybeAddAsync(
}
}

// AddAsync records the replica in the test queue's list of ranges unless an
// entry with the same RangeID is already present. The ctx and prio arguments
// are not used.
//
// NB: despite its name, AddAsync on a testQueue is actually synchronous; the
// replica is appended before the call returns.
func (tq *testQueue) AddAsync(ctx context.Context, replI replicaInQueue, prio float64) {
	r := replI.(*Replica)

	tq.Lock()
	defer tq.Unlock()
	if tq.indexOf(r.RangeID) != -1 {
		// Already tracked; avoid adding a duplicate entry.
		return
	}
	tq.ranges = append(tq.ranges, r)
}

func (tq *testQueue) MaybeRemove(rangeID roachpb.RangeID) {
tq.Lock()
defer tq.Unlock()
Expand Down
43 changes: 29 additions & 14 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
raft "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -3335,12 +3335,14 @@ func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracing.Rec
return collectAndFinish(), nil
}

// ManuallyEnqueue runs the given replica through the requested queue,
// returning all trace events collected along the way as well as the error
// message returned from the queue's process method, if any. Intended to help
// power an admin debug endpoint.
func (s *Store) ManuallyEnqueue(
ctx context.Context, queueName string, repl *Replica, skipShouldQueue bool,
// Enqueue runs the given replica through the requested queue. If `async` is
// true, the replica is handed to the requested queue for asynchronous
// processing and this method returns a nil recording and nil errors.
// Otherwise, it returns all trace events collected along the way as well as
// the error returned from the queue's process method, if any. Intended to help
// power the server.decommissionMonitor and an admin debug endpoint.
func (s *Store) Enqueue(
ctx context.Context, queueName string, repl *Replica, skipShouldQueue bool, async bool,
) (recording tracing.Recording, processError error, enqueueError error) {
ctx = repl.AnnotateCtx(ctx)

Expand All @@ -3351,12 +3353,14 @@ func (s *Store) ManuallyEnqueue(
return nil, nil, errors.Errorf("not enqueueing uninitialized replica %s", repl)
}

var queue queueImpl
var queue replicaQueue
var qImpl queueImpl
var needsLease bool
for _, replicaQueue := range s.scanner.queues {
if strings.EqualFold(replicaQueue.Name(), queueName) {
queue = replicaQueue.(queueImpl)
needsLease = replicaQueue.NeedsLease()
for _, q := range s.scanner.queues {
if strings.EqualFold(q.Name(), queueName) {
queue = q
qImpl = q.(queueImpl)
needsLease = q.NeedsLease()
}
}
if queue == nil {
Expand All @@ -3378,21 +3382,32 @@ func (s *Store) ManuallyEnqueue(
}
}

if async {
// NB: 1e6 is a placeholder priority for now; it only applies when the
// replica is added via AddAsync. We want to use a high enough priority to
// ensure that these replicas are priority-ordered first.
if skipShouldQueue {
queue.AddAsync(ctx, repl, 1e6 /* prio */)
} else {
queue.MaybeAddAsync(ctx, repl, repl.Clock().NowAsClockTimestamp())
}
return nil, nil, nil
}

ctx, collectAndFinish := tracing.ContextWithRecordingSpan(
ctx, s.cfg.AmbientCtx.Tracer, fmt.Sprintf("manual %s queue run", queueName))
defer collectAndFinish()

if !skipShouldQueue {
log.Eventf(ctx, "running %s.shouldQueue", queueName)
shouldQueue, priority := queue.shouldQueue(ctx, s.cfg.Clock.NowAsClockTimestamp(), repl, confReader)
shouldQueue, priority := qImpl.shouldQueue(ctx, s.cfg.Clock.NowAsClockTimestamp(), repl, confReader)
log.Eventf(ctx, "shouldQueue=%v, priority=%f", shouldQueue, priority)
if !shouldQueue {
return collectAndFinish(), nil, nil
}
}

log.Eventf(ctx, "running %s.process", queueName)
processed, processErr := queue.process(ctx, repl, confReader)
processed, processErr := qImpl.process(ctx, repl, confReader)
log.Eventf(ctx, "processed: %t (err: %v)", processed, processErr)
return collectAndFinish(), processErr, nil
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
raft "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -2336,6 +2336,10 @@ func (fq *fakeRangeQueue) MaybeAddAsync(context.Context, replicaInQueue, hlc.Clo
// Do nothing
}

// AddAsync is a no-op on fakeRangeQueue: all of its arguments are ignored.
// NOTE(review): presumably present only so that fakeRangeQueue implements the
// replicaQueue interface — confirm.
func (fq *fakeRangeQueue) AddAsync(context.Context, replicaInQueue, float64) {
	// Do nothing
}

func (fq *fakeRangeQueue) MaybeRemove(rangeID roachpb.RangeID) {
fq.maybeRemovedRngs <- rangeID
}
Expand Down Expand Up @@ -3053,7 +3057,9 @@ func TestManuallyEnqueueUninitializedReplica(t *testing.T) {
StoreID: tc.store.StoreID(),
ReplicaID: 7,
})
_, _, err := tc.store.ManuallyEnqueue(ctx, "replicaGC", repl, true)
_, _, err := tc.store.Enqueue(
ctx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
)
require.Error(t, err)
require.Contains(t, err.Error(), "not enqueueing uninitialized replica")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/stores_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *baseStore) Enqueue(
return err
}

_, processErr, enqueueErr := store.ManuallyEnqueue(ctx, queue, repl, skipShouldQueue)
_, processErr, enqueueErr := store.Enqueue(ctx, queue, repl, skipShouldQueue, false /* async */)
if processErr != nil {
return processErr
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"strings"
"time"

apd "github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -2832,7 +2832,9 @@ func (s *adminServer) enqueueRangeLocal(
queueName = "mvccGC"
}

traceSpans, processErr, err := store.ManuallyEnqueue(ctx, queueName, repl, req.SkipShouldQueue)
traceSpans, processErr, err := store.Enqueue(
ctx, queueName, repl, req.SkipShouldQueue, false, /* async */
)
if err != nil {
response.Details[0].Error = err.Error()
return response, nil
Expand Down
Loading

0 comments on commit a6e2f17

Please sign in to comment.