Skip to content

Commit

Permalink
storage: better error when stores are throttled
Browse files Browse the repository at this point in the history
This is an annoying blocker during replication, and has also played a
role in test flakes such as #35307.

Release note: None
  • Loading branch information
tbg committed Mar 5, 2019
1 parent 047556f commit 2beec4e
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 31 deletions.
10 changes: 6 additions & 4 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (a *Allocator) AllocateTarget(
existing []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
) (*roachpb.StoreDescriptor, string, error) {
sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled)
sl, aliveStoreCount, throttled := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled)

target, details := a.allocateTargetFromList(
ctx, sl, zone, existing, rangeInfo, a.scorerOptions())
Expand All @@ -411,14 +411,16 @@ func (a *Allocator) AllocateTarget(

// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
if len(throttled) > 0 {
return nil, "", errors.Errorf(
"%d matching stores are currently throttled: %v", len(throttled), throttled,
)
}
return nil, "", &allocatorError{
constraints: zone.Constraints,
existingReplicas: len(existing),
aliveStores: aliveStoreCount,
throttledStores: throttledStoreCount,
throttledStores: len(throttled),
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ func (r *Replica) HasQuorum() bool {
// GetStoreList exposes getStoreList for testing only, but with a hardcoded
// storeFilter of storeFilterNone.
func (sp *StorePool) GetStoreList(rangeID roachpb.RangeID) (StoreList, int, int) {
return sp.getStoreList(rangeID, storeFilterNone)
list, available, throttled := sp.getStoreList(rangeID, storeFilterNone)
return list, available, len(throttled)
}

// Stores returns a copy of sl.stores.
Expand Down
28 changes: 17 additions & 11 deletions pkg/storage/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ type storeDetail struct {
// throttledUntil is when a throttled store can be considered available again
// due to a failed or declined snapshot.
throttledUntil time.Time
// throttledBecause is set to the most recent reason for which a store was
// marked as throttled.
throttledBecause string
// lastUpdatedTime is set when a store is first consulted and every time
// gossip arrives for a store.
lastUpdatedTime time.Time
Expand Down Expand Up @@ -575,14 +578,16 @@ const (
storeFilterThrottled
)

type throttledStoreReasons []string

// getStoreList returns a storeList that contains all active stores that contain
// the required attributes and their associated stats. The storeList is filtered
// according to the provided storeFilter. It also returns the total number of
// alive and throttled stores. The passed in rangeID is used to check for
// corrupted replicas.
func (sp *StorePool) getStoreList(
rangeID roachpb.RangeID, filter storeFilter,
) (StoreList, int, int) {
) (StoreList, int, throttledStoreReasons) {
sp.detailsMu.RLock()
defer sp.detailsMu.RUnlock()

Expand All @@ -597,7 +602,7 @@ func (sp *StorePool) getStoreList(
// from the subset of passed in store IDs.
func (sp *StorePool) getStoreListFromIDs(
storeIDs roachpb.StoreIDSlice, rangeID roachpb.RangeID, filter storeFilter,
) (StoreList, int, int) {
) (StoreList, int, throttledStoreReasons) {
sp.detailsMu.RLock()
defer sp.detailsMu.RUnlock()
return sp.getStoreListFromIDsRLocked(storeIDs, rangeID, filter)
Expand All @@ -607,15 +612,15 @@ func (sp *StorePool) getStoreListFromIDs(
// that the detailsMU read lock is held.
func (sp *StorePool) getStoreListFromIDsRLocked(
storeIDs roachpb.StoreIDSlice, rangeID roachpb.RangeID, filter storeFilter,
) (StoreList, int, int) {
) (StoreList, int, throttledStoreReasons) {
if sp.deterministic {
sort.Sort(storeIDs)
} else {
shuffle.Shuffle(storeIDs)
}

var aliveStoreCount int
var throttledStoreCount int
var throttled throttledStoreReasons
var storeDescriptors []roachpb.StoreDescriptor

now := sp.clock.PhysicalTime()
Expand All @@ -626,7 +631,7 @@ func (sp *StorePool) getStoreListFromIDsRLocked(
switch s := detail.status(now, timeUntilStoreDead, rangeID, sp.nodeLivenessFn); s {
case storeStatusThrottled:
aliveStoreCount++
throttledStoreCount++
throttled = append(throttled, detail.throttledBecause)
if filter != storeFilterThrottled {
storeDescriptors = append(storeDescriptors, *detail.desc)
}
Expand All @@ -641,7 +646,7 @@ func (sp *StorePool) getStoreListFromIDsRLocked(
panic(fmt.Sprintf("unknown store status: %d", s))
}
}
return makeStoreList(storeDescriptors), aliveStoreCount, throttledStoreCount
return makeStoreList(storeDescriptors), aliveStoreCount, throttled
}

type throttleReason int
Expand All @@ -657,10 +662,11 @@ const (
// for up-replication or rebalancing until after the configured timeout period
// has elapsed. Declined being true indicates that the remote store explicitly
// declined a snapshot.
func (sp *StorePool) throttle(reason throttleReason, storeID roachpb.StoreID) {
func (sp *StorePool) throttle(reason throttleReason, why string, storeID roachpb.StoreID) {
sp.detailsMu.Lock()
defer sp.detailsMu.Unlock()
detail := sp.getStoreDetailLocked(storeID)
detail.throttledBecause = why

// If a snapshot is declined, be it due to an error or because it was
// rejected, we mark the store detail as having been declined so it won't
Expand All @@ -672,16 +678,16 @@ func (sp *StorePool) throttle(reason throttleReason, storeID roachpb.StoreID) {
detail.throttledUntil = sp.clock.PhysicalTime().Add(timeout)
if log.V(2) {
ctx := sp.AnnotateCtx(context.TODO())
log.Infof(ctx, "snapshot declined, s%d will be throttled for %s until %s",
storeID, timeout, detail.throttledUntil)
log.Infof(ctx, "snapshot declined (%s), s%d will be throttled for %s until %s",
why, storeID, timeout, detail.throttledUntil)
}
case throttleFailed:
timeout := FailedReservationsTimeout.Get(&sp.st.SV)
detail.throttledUntil = sp.clock.PhysicalTime().Add(timeout)
if log.V(2) {
ctx := sp.AnnotateCtx(context.TODO())
log.Infof(ctx, "snapshot failed, s%d will be throttled for %s until %s",
storeID, timeout, detail.throttledUntil)
log.Infof(ctx, "snapshot failed (%s), s%d will be throttled for %s until %s",
why, storeID, timeout, detail.throttledUntil)
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/store_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func verifyStoreList(
expectedThrottledStoreCount int,
) error {
var actual []int
sl, aliveStoreCount, throttledStoreCount := sp.getStoreList(rangeID, filter)
sl, aliveStoreCount, throttled := sp.getStoreList(rangeID, filter)
throttledStoreCount := len(throttled)
sl = sl.filter(constraints)
if aliveStoreCount != expectedAliveStoreCount {
return errors.Errorf("expected AliveStoreCount %d does not match actual %d",
Expand Down Expand Up @@ -705,8 +706,8 @@ func TestStorePoolDefaultState(t *testing.T) {
if alive != 0 {
t.Errorf("expected no live stores; got a live count of %d", alive)
}
if throttled != 0 {
t.Errorf("expected no live stores; got a throttled count of %d", throttled)
if len(throttled) != 0 {
t.Errorf("expected no live stores; got throttled %v", throttled)
}
}

Expand All @@ -723,7 +724,7 @@ func TestStorePoolThrottle(t *testing.T) {

{
expected := sp.clock.Now().GoTime().Add(DeclinedReservationsTimeout.Get(&sp.st.SV))
sp.throttle(throttleDeclined, 1)
sp.throttle(throttleDeclined, "", 1)

sp.detailsMu.Lock()
detail := sp.getStoreDetailLocked(1)
Expand All @@ -736,7 +737,7 @@ func TestStorePoolThrottle(t *testing.T) {

{
expected := sp.clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.st.SV))
sp.throttle(throttleFailed, 1)
sp.throttle(throttleFailed, "", 1)

sp.detailsMu.Lock()
detail := sp.getStoreDetailLocked(1)
Expand Down
21 changes: 12 additions & 9 deletions pkg/storage/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func sendSnapshotError(stream incomingSnapshotStream, err error) error {

// SnapshotStorePool narrows StorePool to make sendSnapshot easier to test.
type SnapshotStorePool interface {
throttle(reason throttleReason, toStoreID roachpb.StoreID)
throttle(reason throttleReason, why string, toStoreID roachpb.StoreID)
}

// rebalanceSnapshotRate is the rate at which preemptive snapshots can be sent.
Expand Down Expand Up @@ -741,32 +741,35 @@ func sendSnapshot(
// Wait until we get a response from the server.
resp, err := stream.Recv()
if err != nil {
storePool.throttle(throttleFailed, to.StoreID)
storePool.throttle(throttleFailed, err.Error(), to.StoreID)
return err
}
switch resp.Status {
case SnapshotResponse_DECLINED:
if header.CanDecline {
storePool.throttle(throttleDeclined, to.StoreID)
declinedMsg := "reservation rejected"
if len(resp.Message) > 0 {
declinedMsg = resp.Message
}
return &benignError{errors.Errorf("%s: remote declined %s: %s", to, snap, declinedMsg)}
err := &benignError{errors.Errorf("%s: remote declined %s: %s", to, snap, declinedMsg)}
storePool.throttle(throttleDeclined, err.Error(), to.StoreID)
return err
}
storePool.throttle(throttleFailed, to.StoreID)
return errors.Errorf("%s: programming error: remote declined required %s: %s",
err := errors.Errorf("%s: programming error: remote declined required %s: %s",
to, snap, resp.Message)
storePool.throttle(throttleFailed, err.Error(), to.StoreID)
return err
case SnapshotResponse_ERROR:
storePool.throttle(throttleFailed, to.StoreID)
storePool.throttle(throttleFailed, resp.Message, to.StoreID)
return errors.Errorf("%s: remote couldn't accept %s with error: %s",
to, snap, resp.Message)
case SnapshotResponse_ACCEPTED:
// This is the response we're expecting. Continue with snapshot sending.
default:
storePool.throttle(throttleFailed, to.StoreID)
return errors.Errorf("%s: server sent an invalid status while negotiating %s: %s",
err := errors.Errorf("%s: server sent an invalid status while negotiating %s: %s",
to, snap, resp.Status)
storePool.throttle(throttleFailed, err.Error(), to.StoreID)
return err
}

log.Infof(ctx, "sending %s", snap)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2965,7 +2965,7 @@ type fakeStorePool struct {
failedThrottles int
}

func (sp *fakeStorePool) throttle(reason throttleReason, toStoreID roachpb.StoreID) {
func (sp *fakeStorePool) throttle(reason throttleReason, why string, toStoreID roachpb.StoreID) {
switch reason {
case throttleDeclined:
sp.declinedThrottles++
Expand Down

0 comments on commit 2beec4e

Please sign in to comment.