Skip to content

Commit

Permalink
[dbnode] Fix concurrency granularity of seekerManager.UpdateOpenLease
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Oct 27, 2020
1 parent 53414ba commit 0e2f31f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 40 deletions.
34 changes: 19 additions & 15 deletions src/dbnode/persist/fs/seek_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ type seekerManager struct {
bytesPool pool.CheckedBytesPool
filePathPrefix string

status seekerManagerStatus
shardSet sharding.ShardSet
isUpdatingLease bool
status seekerManagerStatus
shardSet sharding.ShardSet
updateOpenLeasesInProgress map[block.HashableLeaseDescriptor]struct{}

cacheShardIndicesWorkers xsync.WorkerPool

Expand Down Expand Up @@ -205,6 +205,7 @@ func NewSeekerManager(
logger: opts.InstrumentOptions().Logger(),
openCloseLoopDoneCh: make(chan struct{}),
reusableSeekerResourcesPool: reusableSeekerResourcesPool,
updateOpenLeasesInProgress: make(map[block.HashableLeaseDescriptor]struct{}),
}
m.openAnyUnopenSeekersFn = m.openAnyUnopenSeekers
m.newOpenSeekerFn = m.newOpenSeeker
Expand Down Expand Up @@ -466,23 +467,25 @@ func (m *seekerManager) markBorrowedSeekerAsReturned(seekers *seekersAndBloom, s
// the currently borrowed "inactive" seekers (if any) to be returned.
// 4. Every call to Return() for an "inactive" seeker will check if it's the last borrowed inactive seeker,
// and if so, will close all the inactive seekers and call wg.Done() which will notify the goroutine
// running the UpdateOpenlease() function that all inactive seekers have been returned and closed at
// running the UpdateOpenLease() function that all inactive seekers have been returned and closed at
// which point the function will return successfully.
func (m *seekerManager) UpdateOpenLease(
descriptor block.LeaseDescriptor,
state block.LeaseState,
) (block.UpdateOpenLeaseResult, error) {
noop, err := m.startUpdateOpenLease(descriptor)
hashableDescriptor := block.NewHashableLeaseDescriptor(descriptor)
noop, err := m.startUpdateOpenLease(descriptor.Namespace, hashableDescriptor)
if err != nil {
return 0, err
}
if noop {
return block.NoOpenLease, nil
}

defer func() {
m.Lock()
// Was already set to true by startUpdateOpenLease().
m.isUpdatingLease = false
// Was added by startUpdateOpenLease().
delete(m.updateOpenLeasesInProgress, hashableDescriptor)
m.Unlock()
}()

Expand All @@ -500,25 +503,26 @@ func (m *seekerManager) UpdateOpenLease(
return updateLeaseResult, nil
}

func (m *seekerManager) startUpdateOpenLease(descriptor block.LeaseDescriptor) (bool, error) {
func (m *seekerManager) startUpdateOpenLease(
namespace ident.ID,
hashableDescriptor block.HashableLeaseDescriptor,
) (bool, error) {
m.Lock()
defer m.Unlock()

if m.status != seekerManagerOpen {
return false, errUpdateOpenLeaseSeekerManagerNotOpen
}
if m.isUpdatingLease {
// This guard is a little overly aggressive. In practice, the algorithm remains correct even in the presence
// of concurrent UpdateOpenLease() calls as long as they are for different shard/blockStart combinations.
// However, the calling code currently has no need to call this method concurrently at all so use the
// simpler check for now.
if _, ok := m.updateOpenLeasesInProgress[hashableDescriptor]; ok {
// Prevent UpdateOpenLease() calls from happening concurrently
// (at the granularity of block.LeaseDescriptor).
return false, errConcurrentUpdateOpenLeaseNotAllowed
}
if !m.namespace.Equal(descriptor.Namespace) {
if !m.namespace.Equal(namespace) {
return true, nil
}

m.isUpdatingLease = true
m.updateOpenLeasesInProgress[hashableDescriptor] = struct{}{}

return false, nil
}
Expand Down
88 changes: 83 additions & 5 deletions src/dbnode/persist/fs/seek_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"

"github.com/fortytw2/leaktest"
Expand Down Expand Up @@ -101,7 +102,7 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
defer leaktest.CheckTimeout(t, 1*time.Minute)()

var (
ctrl = gomock.NewController(t)
ctrl = xtest.NewController(t)
shards = []uint32{2, 5, 9, 478, 1023}
m = NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
)
Expand Down Expand Up @@ -143,7 +144,7 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
sharding.DefaultHashFn(1),
)
require.NoError(t, err)
// Pick a start time thats within retention so the background loop doesn't close
// Pick a start time that's within retention so the background loop doesn't close
// the seeker.
blockStart := time.Now().Truncate(metadata.Options().RetentionOptions().BlockSize())
require.NoError(t, m.Open(metadata, shardSet))
Expand Down Expand Up @@ -216,12 +217,89 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
require.NoError(t, m.Close())
}

// TestSeekerManagerUpdateOpenLeaseConcurrentNotAllowed verifies that a
// concurrent UpdateOpenLease() call is rejected with
// errConcurrentUpdateOpenLeaseNotAllowed only when it targets the same
// namespace/shard/blockStart descriptor, and is allowed when it targets a
// different shard.
func TestSeekerManagerUpdateOpenLeaseConcurrentNotAllowed(t *testing.T) {
	defer leaktest.CheckTimeout(t, 1*time.Minute)()

	var (
		ctrl     = xtest.NewController(t)
		shards   = []uint32{1, 2}
		m        = NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
		metadata = testNs1Metadata(t)
		// Pick a start time that's within retention so the background loop doesn't close the seeker.
		blockStart = time.Now().Truncate(metadata.Options().RetentionOptions().BlockSize())
	)
	defer ctrl.Finish()

	// Descriptor for shard 1; descriptor2 below reuses it with shard 2.
	descriptor1 := block.LeaseDescriptor{
		Namespace:  metadata.ID(),
		Shard:      1,
		BlockStart: blockStart,
	}

	// Hook newOpenSeekerFn so that while an UpdateOpenLease(volume: 1) call is
	// still opening its new seeker, a nested UpdateOpenLease(volume: 2) is
	// issued from another goroutine — exercising the per-descriptor
	// in-progress guard while the outer call is mid-flight.
	m.newOpenSeekerFn = func(
		shard uint32,
		blockStart time.Time,
		volume int,
	) (DataFileSetSeeker, error) {
		if volume == 1 {
			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Call UpdateOpenLease while within another UpdateOpenLease call.
				_, err := m.UpdateOpenLease(descriptor1, block.LeaseState{Volume: 2})
				if shard == 1 {
					// Concurrent call is made with the same shard id (and other values).
					require.Equal(t, errConcurrentUpdateOpenLeaseNotAllowed, err)
				} else {
					// Concurrent call is made with a different shard id (2) and so it should pass.
					require.NoError(t, err)
				}
			}()
			// Block until the nested call completes so its assertion always runs
			// while the outer UpdateOpenLease is still in progress.
			wg.Wait()
		}
		mock := NewMockDataFileSetSeeker(ctrl)
		mock.EXPECT().ConcurrentClone().Return(mock, nil).AnyTimes()
		mock.EXPECT().Close().AnyTimes()
		mock.EXPECT().ConcurrentIDBloomFilter().Return(nil).AnyTimes()
		return mock, nil
	}
	// Short sleeps keep the manager's background loop responsive during the test.
	m.sleepFn = func(_ time.Duration) {
		time.Sleep(time.Millisecond)
	}

	shardSet, err := sharding.NewShardSet(
		sharding.NewShards(shards, shard.Available),
		sharding.DefaultHashFn(1),
	)
	require.NoError(t, err)
	require.NoError(t, m.Open(metadata, shardSet))

	// Borrow and return a seeker for each shard so there are open seekers
	// for UpdateOpenLease to act upon.
	for _, shardID := range shards {
		seeker, err := m.Borrow(shardID, blockStart)
		require.NoError(t, err)
		require.NoError(t, m.Return(shardID, blockStart, seeker))
	}

	// Shard 1: the nested same-descriptor call above must be rejected.
	updateResult, err := m.UpdateOpenLease(descriptor1, block.LeaseState{Volume: 1})
	require.NoError(t, err)
	require.Equal(t, block.UpdateOpenLease, updateResult)

	// Shard 2: same namespace/blockStart but a different shard, so the nested
	// call (still targeting shard 1) must succeed concurrently.
	descriptor2 := descriptor1
	descriptor2.Shard = 2
	updateResult, err = m.UpdateOpenLease(descriptor2, block.LeaseState{Volume: 1})
	require.NoError(t, err)
	require.Equal(t, block.UpdateOpenLease, updateResult)

	require.NoError(t, m.Close())
}

// TestSeekerManagerBorrowOpenSeekersLazy tests that the Borrow() method will
// open seekers lazily if they're not already open.
func TestSeekerManagerBorrowOpenSeekersLazy(t *testing.T) {
defer leaktest.CheckTimeout(t, 1*time.Minute)()

ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)

shards := []uint32{2, 5, 9, 478, 1023}
m := NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
Expand Down Expand Up @@ -271,7 +349,7 @@ func TestSeekerManagerBorrowOpenSeekersLazy(t *testing.T) {
func TestSeekerManagerOpenCloseLoop(t *testing.T) {
defer leaktest.CheckTimeout(t, 1*time.Minute)()

ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)
m := NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
clockOpts := m.opts.ClockOptions()
now := clockOpts.NowFn()()
Expand Down Expand Up @@ -450,7 +528,7 @@ func TestSeekerManagerAssignShardSet(t *testing.T) {
defer leaktest.CheckTimeout(t, 1*time.Minute)()

var (
ctrl = gomock.NewController(t)
ctrl = xtest.NewController(t)
shards = []uint32{1, 2}
m = NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
)
Expand Down
21 changes: 1 addition & 20 deletions src/dbnode/storage/block/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"sync"
"time"
)

var (
Expand Down Expand Up @@ -147,7 +146,7 @@ func (m *leaseManager) UpdateOpenLeases(
// return before being released.
m.Unlock()

hashableDescriptor := newHashableLeaseDescriptor(descriptor)
hashableDescriptor := NewHashableLeaseDescriptor(descriptor)
if _, ok := m.updateOpenLeasesInProgress.LoadOrStore(hashableDescriptor, struct{}{}); ok {
// Prevent UpdateOpenLeases() calls from happening concurrently (since the lock
// is not held for the duration) to ensure that Leaser's receive all updates
Expand Down Expand Up @@ -192,21 +191,3 @@ func (m *leaseManager) isRegistered(leaser Leaser) bool {
}
return false
}

type hashableLeaseDescriptor struct {
namespace string
shard uint32
blockStart time.Time
}

func newHashableLeaseDescriptor(descriptor LeaseDescriptor) hashableLeaseDescriptor {
ns := ""
if descriptor.Namespace != nil {
ns = descriptor.Namespace.String()
}
return hashableLeaseDescriptor{
namespace: ns,
shard: descriptor.Shard,
blockStart: descriptor.BlockStart,
}
}
20 changes: 20 additions & 0 deletions src/dbnode/storage/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,26 @@ type LeaseDescriptor struct {
BlockStart time.Time
}

// HashableLeaseDescriptor is a lease descriptor that can be used as a map key.
type HashableLeaseDescriptor struct {
	// namespace is the string form of LeaseDescriptor.Namespace ("" when nil),
	// used instead of ident.ID so the struct is comparable.
	namespace  string
	shard      uint32
	blockStart time.Time
	// NOTE(review): time.Time equality as a map key is sensitive to monotonic
	// clock readings and location; callers appear to pass truncated block
	// starts — confirm.
}

// NewHashableLeaseDescriptor transforms LeaseDescriptor into a HashableLeaseDescriptor.
func NewHashableLeaseDescriptor(descriptor LeaseDescriptor) HashableLeaseDescriptor {
	// Convert the namespace ID to a plain string (empty when nil) so the
	// resulting value is safely comparable and usable as a map key.
	ns := ""
	if descriptor.Namespace != nil {
		ns = descriptor.Namespace.String()
	}
	return HashableLeaseDescriptor{
		namespace:  ns,
		shard:      descriptor.Shard,
		blockStart: descriptor.BlockStart,
	}
}

// LeaseState is the current state of a lease which can be
// requested to be updated.
type LeaseState struct {
Expand Down

0 comments on commit 0e2f31f

Please sign in to comment.