storage: deflake TestStoreMetrics
This was a tough one. Several problems were addressed, all variations on the
same theme:

- DistSenders in multiTestContext use a shared global stopper, but they
  may be called on goroutines which belong to a Store-level task. If that
  Store wants to quiesce while the DistSender can't finish its task because
  that same Store is already quiescing, the test deadlocks.
  The unfortunate solution is to plug in a Closer channel which draws from two
  Stoppers, one of which may be quiesced and replaced multiple times (a
  simplified sketch follows this list).
- Additional deadlocks were caused by multiTestContext's transport, which
  acquires a read lock that was formerly held in write mode throughout
  mtc.stopStore(); this is circumvented by dropping the lock there while quiescing.
- verifyStats was stopping individual Stores to perform computations without
  moving parts. Stopping an individual Store is tough when its tasks may be
  stuck on other Stores yet can't complete while their own Store is already
  quiescing. Instead, verifyStats now stops *all* Stores simultaneously,
  regardless of which Store is actively being investigated.
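
To make the first fix concrete, here is a minimal sketch of the idea, not the
actual multiTestContext change further down (which re-fetches the per-store
Stopper under a lock on every iteration so that it survives store restarts):
a Closer channel that pokes waiting retry loops whenever a store-level quiesce
signal fires, and only closes for good once the global stopper quiesces. The
two channel parameters are hypothetical stand-ins for
clientStopper.ShouldQuiesce() and the per-store Stopper's ShouldQuiesce().

```go
package main

import "fmt"

// combinedCloser is a simplified stand-in for the retryOpts.Closer wiring in
// multiTestContext.addStore(): globalQuiesce stands in for
// clientStopper.ShouldQuiesce(), storeQuiesce for the per-store Stopper's
// ShouldQuiesce() channel (the real code re-fetches that Stopper under a lock
// because it is replaced whenever the store restarts).
func combinedCloser(globalQuiesce, storeQuiesce <-chan struct{}) chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch) // permanently unblocks every retry loop using ch as Closer
		for {
			select {
			case <-globalQuiesce:
				return // global shutdown
			case <-storeQuiesce:
				// Store-level quiesce: poke one waiter so its retry loop can
				// bail out, but keep serving until the global shutdown.
				select {
				case ch <- struct{}{}:
				case <-globalQuiesce:
					return
				}
			}
		}
	}()
	return ch
}

func main() {
	global, store := make(chan struct{}), make(chan struct{})
	closer := combinedCloser(global, store)

	close(store) // a store quiesces: blocked retry loops are woken
	<-closer
	fmt.Println("woken by store-level quiesce")

	close(global) // global shutdown: closer is closed for good
	<-closer
	fmt.Println("woken by global shutdown")
}
```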

Prior to these changes, the test failed within a few hundred to a few thousand
iterations (depending on how many of the above were partially addressed):

```
$ make stressrace PKG=./storage TESTS=TestStoreMetrics TESTTIMEOUT=10s STRESSFLAGS='-maxfails 1 -stderr -p 128 -timeout 15m'
15784 runs so far, 0 failures, over 8m0s
```

Fixes cockroachdb#7678.
tbg committed Jul 14, 2016
1 parent eb598b3 commit c2dd196
Showing 5 changed files with 137 additions and 51 deletions.
docs/RFCS/drain_modes.md (2 changes: 1 addition & 1 deletion)
@@ -25,7 +25,7 @@ running to `drain-clients` to both.

In our Stopper usage, we've taken to fairly ruthlessly shutting down service of
many components once `(*Stopper).Quiesce()` is called. In code, this manifests
itself through copious use of the `(*Stopper).ShouldDrain()` channel even when
itself through copious use of the `(*Stopper).ShouldQuiesce()` channel even when
not running inside of a task (or running long-running operations inside tasks).

This was motivated mainly by endless amounts of test failures around leaked
server/server.go (2 changes: 1 addition & 1 deletion)
@@ -144,7 +144,7 @@ func NewServer(ctx Context, stopper *stop.Stopper) (*Server, error) {
s.stopper,
)

// A custom RetryOptions is created which uses stopper.ShouldDrain() as
// A custom RetryOptions is created which uses stopper.ShouldQuiesce() as
// the Closer. This prevents infinite retry loops from occurring during
// graceful server shutdown
//
storage/client_metrics_test.go (114 changes: 70 additions & 44 deletions)
@@ -17,6 +17,7 @@
package storage_test

import (
"sync"
"testing"

"github.com/cockroachdb/cockroach/internal/client"
@@ -54,56 +55,83 @@ func checkCounter(t *testing.T, s *storage.Store, key string, e int64) {
}
}

func verifyStats(t *testing.T, mtc *multiTestContext, storeIdx int) {
// Get the current store at storeIdx.
s := mtc.stores[storeIdx]
// Stop the store at the given index, while keeping the reference to the
// store object. ComputeMVCCStats() still works on a stopped store (it needs
func verifyStats(t *testing.T, mtc *multiTestContext, storeIdxSlice ...int) {
var stores []*storage.Store
var wg sync.WaitGroup

mtc.mu.RLock()
numStores := len(mtc.stores)
// We need to stop the stores at the given indexes, while keeping the reference to the
// store objects. ComputeMVCCStats() still works on a stopped store (it needs
// only the engine, which is still open), and the most recent stats are still
// available on the stopped store object; however, no further information can
// be committed to the store while it is stopped, preventing any races during
// verification.
mtc.stopStore(storeIdx)
for _, storeIdx := range storeIdxSlice {
stores = append(stores, mtc.stores[storeIdx])
}
mtc.mu.RUnlock()

wg.Add(numStores)
// We actually stop *all* of the Stores. Stopping only a few is riddled
// with deadlocks since operations can span nodes, but stoppers don't
// know about this - taking all of them down at the same time is the
// only sane way of guaranteeing that nothing interesting happens, at
// least when bringing down the nodes jeopardizes majorities.
for i := 0; i < numStores; i++ {
go func(i int) {
defer wg.Done()
mtc.stopStore(i)
}(i)
}
wg.Wait()

for _, s := range stores {
fatalf := func(msg string, args ...interface{}) {
prefix := s.Ident.String() + ": "
t.Fatalf(prefix+msg, args...)
}
// Compute real total MVCC statistics from store.
realStats, err := s.ComputeMVCCStats()
if err != nil {
t.Fatal(err)
}

// Compute real total MVCC statistics from store.
realStats, err := s.ComputeMVCCStats()
if err != nil {
t.Fatal(err)
}
// Sanity regression check for bug #4624: ensure intent count is zero.
if a := realStats.IntentCount; a != 0 {
fatalf("expected intent count to be zero, was %d", a)
}

// Sanity regression check for bug #4624: ensure intent count is zero.
if a := realStats.IntentCount; a != 0 {
t.Fatalf("Expected intent count to be zero, was %d", a)
}
// Sanity check: LiveBytes is not zero (ensures we don't have
// zeroed out structures.)
if liveBytes := getGauge(t, s, "livebytes"); liveBytes == 0 {
fatalf("expected livebytes to be non-zero, was zero")
}

// Sanity check: LiveBytes is not zero (ensures we don't have
// zeroed out structures.)
if liveBytes := getGauge(t, s, "livebytes"); liveBytes == 0 {
t.Fatal("Expected livebytes to be non-zero, was zero")
// Ensure that real MVCC stats match computed stats.
checkGauge(t, s, "livebytes", realStats.LiveBytes)
checkGauge(t, s, "keybytes", realStats.KeyBytes)
checkGauge(t, s, "valbytes", realStats.ValBytes)
checkGauge(t, s, "intentbytes", realStats.IntentBytes)
checkGauge(t, s, "livecount", realStats.LiveCount)
checkGauge(t, s, "keycount", realStats.KeyCount)
checkGauge(t, s, "valcount", realStats.ValCount)
checkGauge(t, s, "intentcount", realStats.IntentCount)
checkGauge(t, s, "sysbytes", realStats.SysBytes)
checkGauge(t, s, "syscount", realStats.SysCount)
// "Ages" will be different depending on how much time has passed. Even with
// a manual clock, this can be an issue in tests. Therefore, we do not
// verify them in this test.

if t.Failed() {
fatalf("verifyStats failed, aborting test.")
}
}

// Ensure that real MVCC stats match computed stats.
checkGauge(t, s, "livebytes", realStats.LiveBytes)
checkGauge(t, s, "keybytes", realStats.KeyBytes)
checkGauge(t, s, "valbytes", realStats.ValBytes)
checkGauge(t, s, "intentbytes", realStats.IntentBytes)
checkGauge(t, s, "livecount", realStats.LiveCount)
checkGauge(t, s, "keycount", realStats.KeyCount)
checkGauge(t, s, "valcount", realStats.ValCount)
checkGauge(t, s, "intentcount", realStats.IntentCount)
checkGauge(t, s, "sysbytes", realStats.SysBytes)
checkGauge(t, s, "syscount", realStats.SysCount)
// "Ages" will be different depending on how much time has passed. Even with
// a manual clock, this can be an issue in tests. Therefore, we do not
// verify them in this test.

if t.Failed() {
t.Log(errors.Errorf("verifyStats failed, aborting test."))
t.FailNow()
// Restart all Stores.
for i := 0; i < numStores; i++ {
mtc.restartStore(i)
}

// Restart the store at the provided index.
mtc.restartStore(storeIdx)
}

func verifyRocksDBStats(t *testing.T, s *storage.Store) {
@@ -182,8 +210,7 @@ func TestStoreMetrics(t *testing.T) {
mtc.waitForValues(roachpb.Key("z"), []int64{5, 5, 5})

// Verify all stats on store 0 and 1 after addition.
verifyStats(t, mtc, 0)
verifyStats(t, mtc, 1)
verifyStats(t, mtc, 0, 1)

// Create a transaction statement that fails, but will add an entry to the
// sequence cache. Regression test for #4969.
@@ -211,8 +238,7 @@ func TestStoreMetrics(t *testing.T) {
checkCounter(t, mtc.stores[1], "replicas", 1)

// Verify all stats on store0 and store1 after range is removed.
verifyStats(t, mtc, 0)
verifyStats(t, mtc, 1)
verifyStats(t, mtc, 0, 1)

verifyRocksDBStats(t, mtc.stores[0])
verifyRocksDBStats(t, mtc.stores[1])
storage/client_test.go (68 changes: 64 additions & 4 deletions)
@@ -537,9 +537,57 @@ func (m *multiTestContext) addStore() {
),
)
}

stopper := stop.NewStopper()
if len(m.dbs) <= idx {
retryOpts := base.DefaultRetryOptions()
retryOpts.Closer = m.clientStopper.ShouldQuiesce()
retryOpts.Closer = func() chan struct{} {
ch := make(chan struct{})
// Feed the channel periodically as long as our "real" stopper
// is quiescing. Close it when clientStopper closes (which means
// we're done). This is awkward, but necessary. We allow stopping
// and restarting stores, but their DistSender needs to respect
// that as well (since they may be involved in Store tasks).
if m.clientStopper.RunAsyncTask(func() {
feed := func() bool { // true when closed (and we're done)
select {
case ch <- struct{}{}:
case <-m.clientStopper.ShouldQuiesce():
if ch != nil {
close(ch)
ch = nil
}
return true
}
return false
}

for {
var grabbedStopper *stop.Stopper
// While the stopper is nil, this store is down.
for grabbedStopper == nil {
m.mu.RLock()
if len(m.stoppers) > idx {
grabbedStopper = m.stoppers[idx]
}
m.mu.RUnlock()
if feed() {
return
}
}
select {
case <-m.clientStopper.ShouldQuiesce():
feed() // to make sure chan is closed
return
case <-grabbedStopper.ShouldQuiesce():
feed()
}
}
}) != nil {
close(ch)
}
return ch
}()
m.distSenders = append(m.distSenders,
kv.NewDistSender(&kv.DistSenderContext{
Clock: m.clock,
@@ -552,7 +600,6 @@
m.dbs = append(m.dbs, client.NewDB(sender))
}

stopper := stop.NewStopper()
ctx := m.makeContext(idx)
nodeID := roachpb.NodeID(idx + 1)
store := storage.NewStore(ctx, eng, &roachpb.NodeDescriptor{NodeID: nodeID})
@@ -631,11 +678,24 @@ func (m *multiTestContext) gossipNodeDesc(g *gossip.Gossip, nodeID roachpb.NodeI
// StopStore stops a store but leaves the engine intact.
// All stopped stores must be restarted before multiTestContext.Stop is called.
func (m *multiTestContext) stopStore(i int) {
m.mu.Lock()
// Stopping with multiple stoppers (which are not aware of each other) is
// messy.
// multiTestContextKVTransport needs a read lock to access its stopper and
// it's already in a task, so if we simply grabbed a write lock here while
// stopping we could deadlock (see #7678).
// So we initiate quiescing under a write lock, and then release the lock
// during stopping.
stopper := m.stoppers[i]
m.stoppers[i] = nil
go stopper.Quiesce()
<-stopper.ShouldQuiesce()
m.mu.Unlock()
stopper.Stop()

m.mu.Lock()
defer m.mu.Unlock()
m.senders[i].RemoveStore(m.stores[i])
m.stoppers[i].Stop()
m.stoppers[i] = nil
m.stores[i] = nil
}

storage/replica_range_lease.go (2 changes: 1 addition & 1 deletion)
@@ -69,7 +69,7 @@ func (p *pendingLeaseRequest) RequestPending() *roachpb.Lease {
// opposed to an extension, or acquiring the lease when none is held).
//
// Note: Once this function gets a context to be used for cancellation, instead
// of replica.store.Stopper().ShouldDrain(), care will be needed for cancelling
// of replica.store.Stopper().ShouldQuiesce(), care will be needed for cancelling
// the Raft command, similar to replica.addWriteCmd.
func (p *pendingLeaseRequest) InitOrJoinRequest(
replica *Replica,
