Skip to content

Commit

Permalink
Remove nested read lock to prevent deadlock (#2128)
Browse files Browse the repository at this point in the history
* Remove nested read lock to prevent deadlock

* Make test into prop test and make it 'big'

* Respect shardIterateBatchMinSize
  • Loading branch information
justinjc authored Jan 31, 2020
1 parent 217cfe8 commit af17524
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 8 deletions.
21 changes: 15 additions & 6 deletions src/dbnode/storage/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,14 @@ func (s *dbShard) RetrievableBlockColdVersion(blockStart time.Time) (int, error)
// BlockStatesSnapshot implements series.QueryableBlockRetriever
func (s *dbShard) BlockStatesSnapshot() series.ShardBlockStateSnapshot {
s.RLock()
bootstrapped := s.bootstrapState == Bootstrapped
snapshots := s.blockStatesSnapshotWithRLock()
s.RUnlock()

return snapshots
}

func (s *dbShard) blockStatesSnapshotWithRLock() series.ShardBlockStateSnapshot {
bootstrapped := s.bootstrapState == Bootstrapped
if !bootstrapped {
// Needs to be bootstrapped.
return series.NewShardBlockStateSnapshot(false, series.BootstrappedBlockStateSnapshot{})
Expand Down Expand Up @@ -509,7 +515,7 @@ func (s *dbShard) forEachShardEntry(entryFn dbShardEntryWorkFn) error {

func iterateBatchSize(elemsLen int) int {
if elemsLen < shardIterateBatchMinSize {
return elemsLen
return shardIterateBatchMinSize
}
t := math.Ceil(float64(shardIterateBatchPercent) * float64(elemsLen))
return int(math.Max(shardIterateBatchMinSize, t))
Expand Down Expand Up @@ -684,9 +690,11 @@ func (s *dbShard) tickAndExpire(
s.RLock()
tickSleepBatch := s.currRuntimeOptions.tickSleepSeriesBatchSize
tickSleepPerSeries := s.currRuntimeOptions.tickSleepPerSeries
// Acquire snapshot of block states here to avoid releasing the
// RLock and acquiring it right after.
blockStates := s.BlockStatesSnapshot()
// Use blockStatesSnapshotWithRLock here to prevent nested read locks.
// Nested read locks will cause deadlocks if there is write lock attempt in
// between the nested read locks, since the write lock attempt will block
// future read lock attempts.
blockStates := s.blockStatesSnapshotWithRLock()
s.RUnlock()
s.forEachShardEntryBatch(func(currEntries []*lookup.Entry) bool {
// re-using `expired` to amortize allocs, still need to reset it
Expand Down Expand Up @@ -2096,6 +2104,8 @@ func (s *dbShard) ColdFlush(
s.RUnlock()
return errShardNotBootstrappedToFlush
}
// Use blockStatesSnapshotWithRLock to avoid having to re-acquire read lock.
blockStates := s.blockStatesSnapshotWithRLock()
s.RUnlock()

resources.reset()
Expand All @@ -2106,7 +2116,6 @@ func (s *dbShard) ColdFlush(
idElementPool = resources.idElementPool
)

blockStates := s.BlockStatesSnapshot()
blockStatesSnapshot, bootstrapped := blockStates.UnwrapValue()
if !bootstrapped {
return errFlushStateIsNotInitialized
Expand Down
30 changes: 29 additions & 1 deletion src/dbnode/storage/shard_race_prop_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build big
//
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
Expand Down Expand Up @@ -110,6 +112,10 @@ var fetchBlocksMetadataV2ShardFn testShardReadFn = func(shard *dbShard) {
func propTestDatabaseShard(t *testing.T, tickBatchSize int) (*dbShard, Options) {
opts := DefaultTestOptions().SetRuntimeOptionsManager(runtime.NewOptionsManager())
shard := testDatabaseShard(t, opts)
// This sleep duration needs to be at the microsecond level because tests
// can have a high number of iterations, using a high number of series in
// combination with a small batch size, causing frequent timeouts during
// execution.
shard.currRuntimeOptions.tickSleepPerSeries = time.Microsecond
shard.currRuntimeOptions.tickSleepSeriesBatchSize = tickBatchSize
return shard, opts
Expand All @@ -127,7 +133,29 @@ func anyIDs() gopter.Gen {
}

func TestShardTickWriteRace(t *testing.T) {
shard, opts := propTestDatabaseShard(t, 10)
parameters := gopter.DefaultTestParameters()
seed := time.Now().UnixNano()
parameters.MinSuccessfulTests = 100
parameters.MaxSize = 10
parameters.Rng = rand.New(rand.NewSource(seed))
properties := gopter.NewProperties(parameters)

properties.Property("Concurrent Tick and Write doesn't deadlock", prop.ForAll(
func(tickBatchSize int) bool {
testShardTickWriteRace(t, int(tickBatchSize))
return true
},
gen.IntRange(1, 10).WithLabel("tickBatchSize"),
))

reporter := gopter.NewFormatedReporter(true, 160, os.Stdout)
if !properties.Run(reporter) {
t.Errorf("failed with initial seed: %d", seed)
}
}

func testShardTickWriteRace(t *testing.T, tickBatchSize int) {
shard, opts := propTestDatabaseShard(t, tickBatchSize)
defer func() {
shard.Close()
opts.RuntimeOptionsManager().Close()
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1753,7 +1753,7 @@ func TestShardNewEntryTakesRefToNoFinalizeID(t *testing.T) {

func TestShardIterateBatchSize(t *testing.T) {
smaller := shardIterateBatchMinSize - 1
require.Equal(t, smaller, iterateBatchSize(smaller))
require.Equal(t, shardIterateBatchMinSize, iterateBatchSize(smaller))

require.Equal(t, shardIterateBatchMinSize, iterateBatchSize(shardIterateBatchMinSize+1))

Expand Down

0 comments on commit af17524

Please sign in to comment.