Skip to content

Commit

Permalink
Merge pull request cockroachdb#18439 from a-robinson/snapshotreject
Browse files Browse the repository at this point in the history
storage: Add explanatory messages to preemptive snapshot rejections
  • Loading branch information
a-robinson authored Sep 12, 2017
2 parents e4c832d + a07d076 commit 29c6854
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 20 deletions.
32 changes: 20 additions & 12 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ const (
// recovery because it is on a recently restarted node.
prohibitRebalancesBehindThreshold = 1000

// Messages that provide detail about why a preemptive snapshot was rejected.
rebalancesDisabledMsg = "rebalances disabled because node is behind"
snapshotApplySemBusyMsg = "store busy applying snapshots and/or removing replicas"
storeDrainingMsg = "store is draining"

// IntersectingSnapshotMsg is part of the error message returned from
// canApplySnapshotLocked and is exposed here so testing can rely on it.
IntersectingSnapshotMsg = "snapshot intersects existing range"
Expand Down Expand Up @@ -2690,35 +2695,35 @@ func (s *Store) maybeWaitInPushTxnQueue(

// reserveSnapshot throttles incoming snapshots. The returned closure is used
// to cleanup the reservation and release its resources. A nil cleanup function
// and a nil error indicates the reservation was declined.
// and a non-empty rejectionMessage indicates the reservation was declined.
func (s *Store) reserveSnapshot(
ctx context.Context, header *SnapshotRequest_Header,
) (func(), error) {
) (_cleanup func(), _rejectionMsg string, _err error) {
if header.RangeSize == 0 {
// Empty snapshots are exempt from rate limits because they're so cheap to
// apply. This vastly speeds up rebalancing any empty ranges created by a
// RESTORE or manual SPLIT AT, since it prevents these empty snapshots from
// getting stuck behind large snapshots managed by the replicate queue.
} else if header.CanDecline {
if atomic.LoadInt32(&s.rebalancesDisabled) == 1 {
return nil, nil
return nil, rebalancesDisabledMsg, nil
}
select {
case s.snapshotApplySem <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
return nil, "", ctx.Err()
case <-s.stopper.ShouldStop():
return nil, errors.Errorf("stopped")
return nil, "", errors.Errorf("stopped")
default:
return nil, nil
return nil, snapshotApplySemBusyMsg, nil
}
} else {
select {
case s.snapshotApplySem <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
return nil, "", ctx.Err()
case <-s.stopper.ShouldStop():
return nil, errors.Errorf("stopped")
return nil, "", errors.Errorf("stopped")
}
}

Expand All @@ -2730,7 +2735,7 @@ func (s *Store) reserveSnapshot(
if header.RangeSize != 0 {
<-s.snapshotApplySem
}
}, nil
}, "", nil
}

// HandleSnapshot reads an incoming streaming snapshot and applies it if
Expand All @@ -2743,17 +2748,20 @@ func (s *Store) HandleSnapshot(
if s.IsDraining() {
return stream.Send(&SnapshotResponse{
Status: SnapshotResponse_DECLINED,
Message: "store is draining",
Message: storeDrainingMsg,
})
}

ctx := s.AnnotateCtx(stream.Context())
cleanup, err := s.reserveSnapshot(ctx, header)
cleanup, rejectionMsg, err := s.reserveSnapshot(ctx, header)
if err != nil {
return err
}
if cleanup == nil {
return stream.Send(&SnapshotResponse{Status: SnapshotResponse_DECLINED})
return stream.Send(&SnapshotResponse{
Status: SnapshotResponse_DECLINED,
Message: rejectionMsg,
})
}
defer cleanup()

Expand Down
44 changes: 36 additions & 8 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2597,31 +2597,56 @@ func TestReserveSnapshotThrottling(t *testing.T) {

ctx := context.Background()

cleanupNonEmpty1, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{
cleanupNonEmpty1, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{
RangeSize: 1,
})
if err != nil {
t.Fatal(err)
}
if rejectionMsg != "" {
t.Fatalf("expected no rejection message, got %q", rejectionMsg)
}
if n := s.ReservationCount(); n != 1 {
t.Fatalf("expected 1 reservation, but found %d", n)
}

// Ensure we allow a concurrent empty snapshot.
cleanupEmpty, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{})
cleanupEmpty, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{})
if err != nil {
t.Fatal(err)
}
if rejectionMsg != "" {
t.Fatalf("expected no rejection message, got %q", rejectionMsg)
}
// Empty snapshots are not throttled and so do not increase the reservation
// count.
if n := s.ReservationCount(); n != 1 {
t.Fatalf("expected 1 reservations, but found %d", n)
t.Fatalf("expected 1 reservation, but found %d", n)
}
cleanupEmpty()

// Verify we don't allow concurrent snapshots by spawning a goroutine which
// will execute the cleanup after a short delay but only if another snapshot
// was not allowed through.
// Verify that a declinable snapshot will be declined if another is in
// progress.
cleanupNonEmpty2, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{
RangeSize: 1,
CanDecline: true,
})
if err != nil {
t.Fatal(err)
}
if rejectionMsg != snapshotApplySemBusyMsg {
t.Fatalf("expected rejection message %q, got %q", snapshotApplySemBusyMsg, rejectionMsg)
}
if cleanupNonEmpty2 != nil {
t.Fatalf("got unexpected non-nil cleanup method")
}
if n := s.ReservationCount(); n != 1 {
t.Fatalf("expected 1 reservation, but found %d", n)
}

// Verify we block concurrent snapshots by spawning a goroutine which will
// execute the cleanup after a short delay but only if another snapshot was
// not allowed through.
var boom int32
go func() {
time.Sleep(20 * time.Millisecond)
Expand All @@ -2630,14 +2655,17 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}
}()

cleanupNonEmpty2, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{
cleanupNonEmpty3, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{
RangeSize: 1,
})
if err != nil {
t.Fatal(err)
}
if rejectionMsg != "" {
t.Fatalf("expected no rejection message, got %q", rejectionMsg)
}
atomic.StoreInt32(&boom, 1)
cleanupNonEmpty2()
cleanupNonEmpty3()

if n := s.ReservationCount(); n != 0 {
t.Fatalf("expected 0 reservations, but found %d", n)
Expand Down

0 comments on commit 29c6854

Please sign in to comment.