Skip to content

Commit

Permalink
Merge #40645
Browse files Browse the repository at this point in the history
40645: storage: don't panic on absent store in StorePool.getStoreListFromIDs r=nvanbenschoten a=nvanbenschoten

Fixes #40598.

The store ID filter is passed to the StorePool from outside of the lock,
so it's possible that the removal of the store from the StorePool raced
with the goroutine calling getStoreListFromIDs. This should be handled
by ignoring the StoreID, which is already the case if a store is found
with the status `storeStatusDead`, `storeStatusUnknown`, or
`storeStatusDecommissioning`.

Will need to be backported to 19.1 and 2.1.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Sep 11, 2019
2 parents fc9eb68 + 4963b60 commit 4146821
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 7 deletions.
8 changes: 6 additions & 2 deletions pkg/storage/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,11 @@ func (sp *StorePool) getStoreListFromIDsRLocked(
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)

for _, storeID := range storeIDs {
detail := sp.detailsMu.storeDetails[storeID]
detail, ok := sp.detailsMu.storeDetails[storeID]
if !ok {
// Do nothing; this store is not in the StorePool.
continue
}
switch s := detail.status(now, timeUntilStoreDead, rangeID, sp.nodeLivenessFn); s {
case storeStatusThrottled:
aliveStoreCount++
Expand All @@ -652,7 +656,7 @@ func (sp *StorePool) getStoreListFromIDsRLocked(
aliveStoreCount++
storeDescriptors = append(storeDescriptors, *detail.desc)
case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning:
// Do nothing; this node cannot be used.
// Do nothing; this store cannot be used.
default:
panic(fmt.Sprintf("unknown store status: %d", s))
}
Expand Down
69 changes: 64 additions & 5 deletions pkg/storage/store_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,21 @@ func TestStorePoolGossipUpdate(t *testing.T) {
func verifyStoreList(
sp *StorePool,
constraints []config.Constraints,
storeIDs roachpb.StoreIDSlice, // optional
rangeID roachpb.RangeID,
expected []int,
filter storeFilter,
expected []int,
expectedAliveStoreCount int,
expectedThrottledStoreCount int,
) error {
var actual []int
sl, aliveStoreCount, throttled := sp.getStoreList(rangeID, filter)
var sl StoreList
var aliveStoreCount int
var throttled throttledStoreReasons
if storeIDs == nil {
sl, aliveStoreCount, throttled = sp.getStoreList(rangeID, filter)
} else {
sl, aliveStoreCount, throttled = sp.getStoreListFromIDs(storeIDs, rangeID, filter)
}
throttledStoreCount := len(throttled)
sl = sl.filter(constraints)
if aliveStoreCount != expectedAliveStoreCount {
Expand All @@ -167,6 +174,7 @@ func verifyStoreList(
return errors.Errorf("expected ThrottledStoreCount %d does not match actual %d",
expectedThrottledStoreCount, throttledStoreCount)
}
var actual []int
for _, store := range sl.stores {
actual = append(actual, int(store.StoreID))
}
Expand Down Expand Up @@ -235,6 +243,11 @@ func TestStorePoolGetStoreList(t *testing.T) {
Node: roachpb.NodeDescriptor{NodeID: 6},
Attrs: roachpb.Attributes{Attrs: required},
}
absentStore := roachpb.StoreDescriptor{
StoreID: 7,
Node: roachpb.NodeDescriptor{NodeID: 7},
Attrs: roachpb.Attributes{Attrs: required},
}

const rangeID = roachpb.RangeID(1)

Expand All @@ -246,6 +259,7 @@ func TestStorePoolGetStoreList(t *testing.T) {
&emptyStore,
&deadStore,
&declinedStore,
// absentStore is purposefully not gossiped.
}, t)
for i := 1; i <= 7; i++ {
mnl.setNodeStatus(roachpb.NodeID(i), storagepb.NodeLivenessStatus_LIVE)
Expand All @@ -258,36 +272,81 @@ func TestStorePoolGetStoreList(t *testing.T) {
sp.detailsMu.storeDetails[declinedStore.StoreID].throttledUntil = sp.clock.Now().GoTime().Add(time.Hour)
sp.detailsMu.Unlock()

// No filter or limited set of store IDs.
if err := verifyStoreList(
sp,
constraints,
nil, /* storeIDs */
rangeID,
storeFilterNone,
[]int{
int(matchingStore.StoreID),
int(supersetStore.StoreID),
int(declinedStore.StoreID),
},
storeFilterNone,
/* expectedAliveStoreCount */ 5,
/* expectedThrottledStoreCount */ 1,
); err != nil {
t.Error(err)
}

// Filter out throttled stores but don't limit the set of store IDs.
if err := verifyStoreList(
sp,
constraints,
nil, /* storeIDs */
rangeID,
storeFilterThrottled,
[]int{
int(matchingStore.StoreID),
int(supersetStore.StoreID),
},
storeFilterThrottled,
/* expectedAliveStoreCount */ 5,
/* expectedThrottledStoreCount */ 1,
); err != nil {
t.Error(err)
}

limitToStoreIDs := roachpb.StoreIDSlice{
matchingStore.StoreID,
declinedStore.StoreID,
absentStore.StoreID,
}

// No filter but limited to limitToStoreIDs.
// Note that supersetStore is not included.
if err := verifyStoreList(
sp,
constraints,
limitToStoreIDs,
rangeID,
storeFilterNone,
[]int{
int(matchingStore.StoreID),
int(declinedStore.StoreID),
},
/* expectedAliveStoreCount */ 2,
/* expectedThrottledStoreCount */ 1,
); err != nil {
t.Error(err)
}

// Filter out throttled stores and limit to limitToStoreIDs.
// Note that supersetStore is not included.
if err := verifyStoreList(
sp,
constraints,
limitToStoreIDs,
rangeID,
storeFilterThrottled,
[]int{
int(matchingStore.StoreID),
},
/* expectedAliveStoreCount */ 2,
/* expectedThrottledStoreCount */ 1,
); err != nil {
t.Error(err)
}
}

// TestStoreListFilter ensures that the store list constraint filtering works
Expand Down

0 comments on commit 4146821

Please sign in to comment.