From a07d076f95a1cd7cd9234fd204ee23e949a49370 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Mon, 11 Sep 2017 18:15:34 -0400 Subject: [PATCH] storage: Add explanatory messages to preemptive snapshot rejections --- pkg/storage/store.go | 32 +++++++++++++++++----------- pkg/storage/store_test.go | 44 ++++++++++++++++++++++++++++++++------- 2 files changed, 56 insertions(+), 20 deletions(-) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index e6e8cc970edc..4c0385dc82d6 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -96,6 +96,11 @@ const ( // recovery because it is on a recently restarted node. prohibitRebalancesBehindThreshold = 1000 + // Messages that provide detail about why a preemptive snapshot was rejected. + rebalancesDisabledMsg = "rebalances disabled because node is behind" + snapshotApplySemBusyMsg = "store busy applying snapshots and/or removing replicas" + storeDrainingMsg = "store is draining" + // IntersectingSnapshotMsg is part of the error message returned from // canApplySnapshotLocked and is exposed here so testing can rely on it. IntersectingSnapshotMsg = "snapshot intersects existing range" @@ -2690,10 +2695,10 @@ func (s *Store) maybeWaitInPushTxnQueue( // reserveSnapshot throttles incoming snapshots. The returned closure is used // to cleanup the reservation and release its resources. A nil cleanup function -// and a nil error indicates the reservation was declined. +// and a non-empty rejectionMessage indicates the reservation was declined. func (s *Store) reserveSnapshot( ctx context.Context, header *SnapshotRequest_Header, -) (func(), error) { +) (_cleanup func(), _rejectionMsg string, _err error) { if header.RangeSize == 0 { // Empty snapshots are exempt from rate limits because they're so cheap to // apply. This vastly speeds up rebalancing any empty ranges created by a @@ -2701,24 +2706,24 @@ func (s *Store) reserveSnapshot( // getting stuck behind large snapshots managed by the replicate queue. } else if header.CanDecline { if atomic.LoadInt32(&s.rebalancesDisabled) == 1 { - return nil, nil + return nil, rebalancesDisabledMsg, nil } select { case s.snapshotApplySem <- struct{}{}: case <-ctx.Done(): - return nil, ctx.Err() + return nil, "", ctx.Err() case <-s.stopper.ShouldStop(): - return nil, errors.Errorf("stopped") + return nil, "", errors.Errorf("stopped") default: - return nil, nil + return nil, snapshotApplySemBusyMsg, nil } } else { select { case s.snapshotApplySem <- struct{}{}: case <-ctx.Done(): - return nil, ctx.Err() + return nil, "", ctx.Err() case <-s.stopper.ShouldStop(): - return nil, errors.Errorf("stopped") + return nil, "", errors.Errorf("stopped") } } @@ -2730,7 +2735,7 @@ func (s *Store) reserveSnapshot( if header.RangeSize != 0 { <-s.snapshotApplySem } - }, nil + }, "", nil } // HandleSnapshot reads an incoming streaming snapshot and applies it if @@ -2743,17 +2748,20 @@ func (s *Store) HandleSnapshot( if s.IsDraining() { return stream.Send(&SnapshotResponse{ Status: SnapshotResponse_DECLINED, - Message: "store is draining", + Message: storeDrainingMsg, }) } ctx := s.AnnotateCtx(stream.Context()) - cleanup, err := s.reserveSnapshot(ctx, header) + cleanup, rejectionMsg, err := s.reserveSnapshot(ctx, header) if err != nil { return err } if cleanup == nil { - return stream.Send(&SnapshotResponse{Status: SnapshotResponse_DECLINED}) + return stream.Send(&SnapshotResponse{ + Status: SnapshotResponse_DECLINED, + Message: rejectionMsg, + }) } defer cleanup() diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 29c4349be403..1a1d669bbb08 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -2597,31 +2597,56 @@ func TestReserveSnapshotThrottling(t *testing.T) { ctx := context.Background() - cleanupNonEmpty1, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ + cleanupNonEmpty1, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ RangeSize: 1, }) if err != nil { t.Fatal(err) } + if rejectionMsg != "" { + t.Fatalf("expected no rejection message, got %q", rejectionMsg) + } if n := s.ReservationCount(); n != 1 { t.Fatalf("expected 1 reservation, but found %d", n) } // Ensure we allow a concurrent empty snapshot. - cleanupEmpty, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{}) + cleanupEmpty, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{}) if err != nil { t.Fatal(err) } + if rejectionMsg != "" { + t.Fatalf("expected no rejection message, got %q", rejectionMsg) + } // Empty snapshots are not throttled and so do not increase the reservation // count. if n := s.ReservationCount(); n != 1 { - t.Fatalf("expected 1 reservations, but found %d", n) + t.Fatalf("expected 1 reservation, but found %d", n) } cleanupEmpty() - // Verify we don't allow concurrent snapshots by spawning a goroutine which - // will execute the cleanup after a short delay but only if another snapshot - // was not allowed through. + // Verify that a declinable snapshot will be declined if another is in + // progress. + cleanupNonEmpty2, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ + RangeSize: 1, + CanDecline: true, + }) + if err != nil { + t.Fatal(err) + } + if rejectionMsg != snapshotApplySemBusyMsg { + t.Fatalf("expected rejection message %q, got %q", snapshotApplySemBusyMsg, rejectionMsg) + } + if cleanupNonEmpty2 != nil { + t.Fatalf("got unexpected non-nil cleanup method") + } + if n := s.ReservationCount(); n != 1 { + t.Fatalf("expected 1 reservation, but found %d", n) + } + + // Verify we block concurrent snapshots by spawning a goroutine which will + // execute the cleanup after a short delay but only if another snapshot was + // not allowed through. var boom int32 go func() { time.Sleep(20 * time.Millisecond) @@ -2630,14 +2655,17 @@ func TestReserveSnapshotThrottling(t *testing.T) { } }() - cleanupNonEmpty2, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ + cleanupNonEmpty3, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ RangeSize: 1, }) if err != nil { t.Fatal(err) } + if rejectionMsg != "" { + t.Fatalf("expected no rejection message, got %q", rejectionMsg) + } atomic.StoreInt32(&boom, 1) - cleanupNonEmpty2() + cleanupNonEmpty3() if n := s.ReservationCount(); n != 0 { t.Fatalf("expected 0 reservations, but found %d", n)