Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: better error when stores are throttled #35308

Merged
merged 1 commit into from
Mar 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (a *Allocator) AllocateTarget(
existing []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
) (*roachpb.StoreDescriptor, string, error) {
sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled)
sl, aliveStoreCount, throttled := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled)

target, details := a.allocateTargetFromList(
ctx, sl, zone, existing, rangeInfo, a.scorerOptions())
Expand All @@ -411,14 +411,16 @@ func (a *Allocator) AllocateTarget(

// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
if len(throttled) > 0 {
return nil, "", errors.Errorf(
"%d matching stores are currently throttled: %v", len(throttled), throttled,
)
}
return nil, "", &allocatorError{
constraints: zone.Constraints,
existingReplicas: len(existing),
aliveStores: aliveStoreCount,
throttledStores: throttledStoreCount,
throttledStores: len(throttled),
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ func (r *Replica) HasQuorum() bool {
// GetStoreList exposes getStoreList for testing only, but with a hardcoded
// storeFilter of storeFilterNone.
func (sp *StorePool) GetStoreList(rangeID roachpb.RangeID) (StoreList, int, int) {
return sp.getStoreList(rangeID, storeFilterNone)
list, available, throttled := sp.getStoreList(rangeID, storeFilterNone)
return list, available, len(throttled)
}

// Stores returns a copy of sl.stores.
Expand Down
28 changes: 17 additions & 11 deletions pkg/storage/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ type storeDetail struct {
// throttledUntil is when a throttled store can be considered available again
// due to a failed or declined snapshot.
throttledUntil time.Time
// throttledBecause is set to the most recent reason for which a store was
// marked as throttled.
throttledBecause string
// lastUpdatedTime is set when a store is first consulted and every time
// gossip arrives for a store.
lastUpdatedTime time.Time
Expand Down Expand Up @@ -575,14 +578,16 @@ const (
storeFilterThrottled
)

type throttledStoreReasons []string

// getStoreList returns a storeList that contains all active stores that contain
// the required attributes and their associated stats. The storeList is filtered
// according to the provided storeFilter. It also returns the total number of
// alive and throttled stores. The passed in rangeID is used to check for
// corrupted replicas.
func (sp *StorePool) getStoreList(
rangeID roachpb.RangeID, filter storeFilter,
) (StoreList, int, int) {
) (StoreList, int, throttledStoreReasons) {
sp.detailsMu.RLock()
defer sp.detailsMu.RUnlock()

Expand All @@ -597,7 +602,7 @@ func (sp *StorePool) getStoreList(
// from the subset of passed in store IDs.
func (sp *StorePool) getStoreListFromIDs(
storeIDs roachpb.StoreIDSlice, rangeID roachpb.RangeID, filter storeFilter,
) (StoreList, int, int) {
) (StoreList, int, throttledStoreReasons) {
sp.detailsMu.RLock()
defer sp.detailsMu.RUnlock()
return sp.getStoreListFromIDsRLocked(storeIDs, rangeID, filter)
Expand All @@ -607,15 +612,15 @@ func (sp *StorePool) getStoreListFromIDs(
// that the detailsMU read lock is held.
func (sp *StorePool) getStoreListFromIDsRLocked(
storeIDs roachpb.StoreIDSlice, rangeID roachpb.RangeID, filter storeFilter,
) (StoreList, int, int) {
) (StoreList, int, throttledStoreReasons) {
if sp.deterministic {
sort.Sort(storeIDs)
} else {
shuffle.Shuffle(storeIDs)
}

var aliveStoreCount int
var throttledStoreCount int
var throttled throttledStoreReasons
var storeDescriptors []roachpb.StoreDescriptor

now := sp.clock.PhysicalTime()
Expand All @@ -626,7 +631,7 @@ func (sp *StorePool) getStoreListFromIDsRLocked(
switch s := detail.status(now, timeUntilStoreDead, rangeID, sp.nodeLivenessFn); s {
case storeStatusThrottled:
aliveStoreCount++
throttledStoreCount++
throttled = append(throttled, detail.throttledBecause)
if filter != storeFilterThrottled {
storeDescriptors = append(storeDescriptors, *detail.desc)
}
Expand All @@ -641,7 +646,7 @@ func (sp *StorePool) getStoreListFromIDsRLocked(
panic(fmt.Sprintf("unknown store status: %d", s))
}
}
return makeStoreList(storeDescriptors), aliveStoreCount, throttledStoreCount
return makeStoreList(storeDescriptors), aliveStoreCount, throttled
}

type throttleReason int
Expand All @@ -657,10 +662,11 @@ const (
// for up-replication or rebalancing until after the configured timeout period
// has elapsed. Declined being true indicates that the remote store explicitly
// declined a snapshot.
func (sp *StorePool) throttle(reason throttleReason, storeID roachpb.StoreID) {
func (sp *StorePool) throttle(reason throttleReason, why string, storeID roachpb.StoreID) {
sp.detailsMu.Lock()
defer sp.detailsMu.Unlock()
detail := sp.getStoreDetailLocked(storeID)
detail.throttledBecause = why

// If a snapshot is declined, be it due to an error or because it was
// rejected, we mark the store detail as having been declined so it won't
Expand All @@ -672,16 +678,16 @@ func (sp *StorePool) throttle(reason throttleReason, storeID roachpb.StoreID) {
detail.throttledUntil = sp.clock.PhysicalTime().Add(timeout)
if log.V(2) {
ctx := sp.AnnotateCtx(context.TODO())
log.Infof(ctx, "snapshot declined, s%d will be throttled for %s until %s",
storeID, timeout, detail.throttledUntil)
log.Infof(ctx, "snapshot declined (%s), s%d will be throttled for %s until %s",
why, storeID, timeout, detail.throttledUntil)
}
case throttleFailed:
timeout := FailedReservationsTimeout.Get(&sp.st.SV)
detail.throttledUntil = sp.clock.PhysicalTime().Add(timeout)
if log.V(2) {
ctx := sp.AnnotateCtx(context.TODO())
log.Infof(ctx, "snapshot failed, s%d will be throttled for %s until %s",
storeID, timeout, detail.throttledUntil)
log.Infof(ctx, "snapshot failed (%s), s%d will be throttled for %s until %s",
why, storeID, timeout, detail.throttledUntil)
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/store_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func verifyStoreList(
expectedThrottledStoreCount int,
) error {
var actual []int
sl, aliveStoreCount, throttledStoreCount := sp.getStoreList(rangeID, filter)
sl, aliveStoreCount, throttled := sp.getStoreList(rangeID, filter)
throttledStoreCount := len(throttled)
sl = sl.filter(constraints)
if aliveStoreCount != expectedAliveStoreCount {
return errors.Errorf("expected AliveStoreCount %d does not match actual %d",
Expand Down Expand Up @@ -705,8 +706,8 @@ func TestStorePoolDefaultState(t *testing.T) {
if alive != 0 {
t.Errorf("expected no live stores; got a live count of %d", alive)
}
if throttled != 0 {
t.Errorf("expected no live stores; got a throttled count of %d", throttled)
if len(throttled) != 0 {
t.Errorf("expected no live stores; got throttled %v", throttled)
}
}

Expand All @@ -723,7 +724,7 @@ func TestStorePoolThrottle(t *testing.T) {

{
expected := sp.clock.Now().GoTime().Add(DeclinedReservationsTimeout.Get(&sp.st.SV))
sp.throttle(throttleDeclined, 1)
sp.throttle(throttleDeclined, "", 1)

sp.detailsMu.Lock()
detail := sp.getStoreDetailLocked(1)
Expand All @@ -736,7 +737,7 @@ func TestStorePoolThrottle(t *testing.T) {

{
expected := sp.clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.st.SV))
sp.throttle(throttleFailed, 1)
sp.throttle(throttleFailed, "", 1)

sp.detailsMu.Lock()
detail := sp.getStoreDetailLocked(1)
Expand Down
21 changes: 12 additions & 9 deletions pkg/storage/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func sendSnapshotError(stream incomingSnapshotStream, err error) error {

// SnapshotStorePool narrows StorePool to make sendSnapshot easier to test.
type SnapshotStorePool interface {
throttle(reason throttleReason, toStoreID roachpb.StoreID)
throttle(reason throttleReason, why string, toStoreID roachpb.StoreID)
}

// rebalanceSnapshotRate is the rate at which preemptive snapshots can be sent.
Expand Down Expand Up @@ -741,32 +741,35 @@ func sendSnapshot(
// Wait until we get a response from the server.
resp, err := stream.Recv()
if err != nil {
storePool.throttle(throttleFailed, to.StoreID)
storePool.throttle(throttleFailed, err.Error(), to.StoreID)
return err
}
switch resp.Status {
case SnapshotResponse_DECLINED:
if header.CanDecline {
storePool.throttle(throttleDeclined, to.StoreID)
declinedMsg := "reservation rejected"
if len(resp.Message) > 0 {
declinedMsg = resp.Message
}
return &benignError{errors.Errorf("%s: remote declined %s: %s", to, snap, declinedMsg)}
err := &benignError{errors.Errorf("%s: remote declined %s: %s", to, snap, declinedMsg)}
storePool.throttle(throttleDeclined, err.Error(), to.StoreID)
return err
}
storePool.throttle(throttleFailed, to.StoreID)
return errors.Errorf("%s: programming error: remote declined required %s: %s",
err := errors.Errorf("%s: programming error: remote declined required %s: %s",
to, snap, resp.Message)
storePool.throttle(throttleFailed, err.Error(), to.StoreID)
return err
case SnapshotResponse_ERROR:
storePool.throttle(throttleFailed, to.StoreID)
storePool.throttle(throttleFailed, resp.Message, to.StoreID)
return errors.Errorf("%s: remote couldn't accept %s with error: %s",
to, snap, resp.Message)
case SnapshotResponse_ACCEPTED:
// This is the response we're expecting. Continue with snapshot sending.
default:
storePool.throttle(throttleFailed, to.StoreID)
return errors.Errorf("%s: server sent an invalid status while negotiating %s: %s",
err := errors.Errorf("%s: server sent an invalid status while negotiating %s: %s",
to, snap, resp.Status)
storePool.throttle(throttleFailed, err.Error(), to.StoreID)
return err
}

log.Infof(ctx, "sending %s", snap)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2965,7 +2965,7 @@ type fakeStorePool struct {
failedThrottles int
}

func (sp *fakeStorePool) throttle(reason throttleReason, toStoreID roachpb.StoreID) {
func (sp *fakeStorePool) throttle(reason throttleReason, why string, toStoreID roachpb.StoreID) {
switch reason {
case throttleDeclined:
sp.declinedThrottles++
Expand Down