[dbnode] Fix concurrency granularity of seekerManager.UpdateOpenLease #2790

Merged: 6 commits, Oct 27, 2020
34 changes: 19 additions & 15 deletions src/dbnode/persist/fs/seek_manager.go
@@ -91,9 +91,9 @@ type seekerManager struct {
bytesPool pool.CheckedBytesPool
filePathPrefix string

status seekerManagerStatus
shardSet sharding.ShardSet
isUpdatingLease bool
status seekerManagerStatus
shardSet sharding.ShardSet
updateOpenLeasesInProgress map[block.HashableLeaseDescriptor]struct{}
Contributor: +1 on consistent naming w/ what's in block/lease.go


cacheShardIndicesWorkers xsync.WorkerPool

@@ -205,6 +205,7 @@ func NewSeekerManager(
logger: opts.InstrumentOptions().Logger(),
openCloseLoopDoneCh: make(chan struct{}),
reusableSeekerResourcesPool: reusableSeekerResourcesPool,
updateOpenLeasesInProgress: make(map[block.HashableLeaseDescriptor]struct{}),
}
m.openAnyUnopenSeekersFn = m.openAnyUnopenSeekers
m.newOpenSeekerFn = m.newOpenSeeker
@@ -466,23 +467,25 @@ func (m *seekerManager) markBorrowedSeekerAsReturned(seekers *seekersAndBloom, s
// the currently borrowed "inactive" seekers (if any) to be returned.
// 4. Every call to Return() for an "inactive" seeker will check if it's the last borrowed inactive seeker,
// and if so, will close all the inactive seekers and call wg.Done() which will notify the goroutine
// running the UpdateOpenlease() function that all inactive seekers have been returned and closed at
// running the UpdateOpenLease() function that all inactive seekers have been returned and closed at
// which point the function will return successfully.
func (m *seekerManager) UpdateOpenLease(
descriptor block.LeaseDescriptor,
state block.LeaseState,
) (block.UpdateOpenLeaseResult, error) {
noop, err := m.startUpdateOpenLease(descriptor)
hashableDescriptor := block.NewHashableLeaseDescriptor(descriptor)
noop, err := m.startUpdateOpenLease(descriptor.Namespace, hashableDescriptor)
if err != nil {
return 0, err
}
if noop {
return block.NoOpenLease, nil
}

defer func() {
m.Lock()
// Was already set to true by startUpdateOpenLease().
m.isUpdatingLease = false
// Was added by startUpdateOpenLease().
delete(m.updateOpenLeasesInProgress, hashableDescriptor)
m.Unlock()
}()

@@ -500,25 +503,26 @@ func (m *seekerManager) UpdateOpenLease(
return updateLeaseResult, nil
}

func (m *seekerManager) startUpdateOpenLease(descriptor block.LeaseDescriptor) (bool, error) {
func (m *seekerManager) startUpdateOpenLease(
namespace ident.ID,
hashableDescriptor block.HashableLeaseDescriptor,
) (bool, error) {
m.Lock()
defer m.Unlock()
Contributor: nit: would we want to release this lock and perhaps use a sync.Map like we do in block/lease.go for tracking open leases? Though performance-wise this probably wouldn't make a difference at all.

Collaborator (author): Using sync.Map (as I did in block/lease.go) was the initial approach I tried, but then I realized that the lock is held for the whole duration of startUpdateOpenLease, and there is a namespace check between the in-progress check and the update, so I went with a simple map under this lock (to better preserve the original semantics).


if m.status != seekerManagerOpen {
return false, errUpdateOpenLeaseSeekerManagerNotOpen
}
if m.isUpdatingLease {
// This guard is a little overly aggressive. In practice, the algorithm remains correct even in the presence
// of concurrent UpdateOpenLease() calls as long as they are for different shard/blockStart combinations.
// However, the calling code currently has no need to call this method concurrently at all so use the
// simpler check for now.
if _, ok := m.updateOpenLeasesInProgress[hashableDescriptor]; ok {
// Prevent UpdateOpenLease() calls from happening concurrently
// (at the granularity of block.LeaseDescriptor).
return false, errConcurrentUpdateOpenLeaseNotAllowed
}
if !m.namespace.Equal(descriptor.Namespace) {
if !m.namespace.Equal(namespace) {
return true, nil
}

m.isUpdatingLease = true
m.updateOpenLeasesInProgress[hashableDescriptor] = struct{}{}

return false, nil
}
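To make the change in concurrency granularity concrete, the sketch below distills the guard pattern the diff above adopts: instead of a single isUpdatingLease boolean that rejects any concurrent UpdateOpenLease() call, a plain map keyed by the hashable descriptor is checked and updated under the manager's existing lock, so only a second call for the same namespace/shard/blockStart combination is rejected. This is an illustrative, self-contained rewrite, not the PR's code; the standalone guard type, its method names, and the error value are invented for the example.

```go
// Illustrative only: a per-descriptor in-progress guard using a plain map
// under a mutex, mirroring what startUpdateOpenLease/UpdateOpenLease now do.
package main

import (
	"errors"
	"fmt"
	"sync"

	"github.com/m3db/m3/src/dbnode/storage/block"
)

var errConcurrentUpdateNotAllowed = errors.New(
	"update open lease already in progress for this descriptor")

type updateGuard struct {
	sync.Mutex
	inProgress map[block.HashableLeaseDescriptor]struct{}
}

func newUpdateGuard() *updateGuard {
	return &updateGuard{
		inProgress: make(map[block.HashableLeaseDescriptor]struct{}),
	}
}

// start reserves the descriptor, rejecting a second concurrent update for the
// same namespace/shard/blockStart while allowing updates for other descriptors.
func (g *updateGuard) start(d block.LeaseDescriptor) (block.HashableLeaseDescriptor, error) {
	key := block.NewHashableLeaseDescriptor(d)
	g.Lock()
	defer g.Unlock()
	if _, ok := g.inProgress[key]; ok {
		return key, errConcurrentUpdateNotAllowed
	}
	g.inProgress[key] = struct{}{}
	return key, nil
}

// finish releases the descriptor so a later update for it may proceed.
func (g *updateGuard) finish(key block.HashableLeaseDescriptor) {
	g.Lock()
	defer g.Unlock()
	delete(g.inProgress, key)
}

func main() {
	g := newUpdateGuard()
	// A zero-value descriptor is fine here: NewHashableLeaseDescriptor accepts a nil namespace.
	key, err := g.start(block.LeaseDescriptor{})
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	defer g.finish(key)
	// ... perform the lease update for this descriptor ...
}
```

The review thread above also considered a sync.Map for this guard; a sketch of that alternative appears after the block/lease.go diff below.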
88 changes: 83 additions & 5 deletions src/dbnode/persist/fs/seek_manager_test.go
@@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"

"github.com/fortytw2/leaktest"
@@ -101,7 +102,7 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
defer leaktest.CheckTimeout(t, 1*time.Minute)()

var (
ctrl = gomock.NewController(t)
ctrl = xtest.NewController(t)
shards = []uint32{2, 5, 9, 478, 1023}
m = NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
)
@@ -143,7 +144,7 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
sharding.DefaultHashFn(1),
)
require.NoError(t, err)
// Pick a start time thats within retention so the background loop doesn't close
// Pick a start time that's within retention so the background loop doesn't close
// the seeker.
blockStart := time.Now().Truncate(metadata.Options().RetentionOptions().BlockSize())
require.NoError(t, m.Open(metadata, shardSet))
@@ -216,12 +217,89 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
require.NoError(t, m.Close())
}

func TestSeekerManagerUpdateOpenLeaseConcurrentNotAllowed(t *testing.T) {
defer leaktest.CheckTimeout(t, 1*time.Minute)()

var (
ctrl = xtest.NewController(t)
shards = []uint32{1, 2}
m = NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
metadata = testNs1Metadata(t)
// Pick a start time that's within retention so the background loop doesn't close the seeker.
blockStart = time.Now().Truncate(metadata.Options().RetentionOptions().BlockSize())
)
defer ctrl.Finish()

descriptor1 := block.LeaseDescriptor{
Namespace: metadata.ID(),
Shard: 1,
BlockStart: blockStart,
}

m.newOpenSeekerFn = func(
shard uint32,
blockStart time.Time,
volume int,
) (DataFileSetSeeker, error) {
if volume == 1 {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// Call UpdateOpenLease while within another UpdateOpenLease call.
_, err := m.UpdateOpenLease(descriptor1, block.LeaseState{Volume: 2})
if shard == 1 {
// Concurrent call is made with the same shard id (and other values).
require.Equal(t, errConcurrentUpdateOpenLeaseNotAllowed, err)
} else {
// The in-flight outer call is for a different shard (2), so this call (shard 1) should pass.
require.NoError(t, err)
}
}()
wg.Wait()
}
mock := NewMockDataFileSetSeeker(ctrl)
mock.EXPECT().ConcurrentClone().Return(mock, nil).AnyTimes()
mock.EXPECT().Close().AnyTimes()
mock.EXPECT().ConcurrentIDBloomFilter().Return(nil).AnyTimes()
return mock, nil
}
m.sleepFn = func(_ time.Duration) {
time.Sleep(time.Millisecond)
}

shardSet, err := sharding.NewShardSet(
sharding.NewShards(shards, shard.Available),
sharding.DefaultHashFn(1),
)
require.NoError(t, err)
require.NoError(t, m.Open(metadata, shardSet))

for _, shardID := range shards {
seeker, err := m.Borrow(shardID, blockStart)
require.NoError(t, err)
require.NoError(t, m.Return(shardID, blockStart, seeker))
}

updateResult, err := m.UpdateOpenLease(descriptor1, block.LeaseState{Volume: 1})
require.NoError(t, err)
require.Equal(t, block.UpdateOpenLease, updateResult)

descriptor2 := descriptor1
descriptor2.Shard = 2
updateResult, err = m.UpdateOpenLease(descriptor2, block.LeaseState{Volume: 1})
require.NoError(t, err)
require.Equal(t, block.UpdateOpenLease, updateResult)

require.NoError(t, m.Close())
}

// TestSeekerManagerBorrowOpenSeekersLazy tests that the Borrow() method will
// open seekers lazily if they're not already open.
func TestSeekerManagerBorrowOpenSeekersLazy(t *testing.T) {
defer leaktest.CheckTimeout(t, 1*time.Minute)()

ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)

shards := []uint32{2, 5, 9, 478, 1023}
m := NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
@@ -271,7 +349,7 @@ func TestSeekerManagerBorrowOpenSeekersLazy(t *testing.T) {
func TestSeekerManagerOpenCloseLoop(t *testing.T) {
defer leaktest.CheckTimeout(t, 1*time.Minute)()

ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)
m := NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
clockOpts := m.opts.ClockOptions()
now := clockOpts.NowFn()()
@@ -450,7 +528,7 @@ func TestSeekerManagerAssignShardSet(t *testing.T) {
defer leaktest.CheckTimeout(t, 1*time.Minute)()

var (
ctrl = gomock.NewController(t)
ctrl = xtest.NewController(t)
shards = []uint32{1, 2}
m = NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
)
21 changes: 1 addition & 20 deletions src/dbnode/storage/block/lease.go
@@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"sync"
"time"
)

var (
@@ -147,7 +146,7 @@ func (m *leaseManager) UpdateOpenLeases(
// return before being released.
m.Unlock()

hashableDescriptor := newHashableLeaseDescriptor(descriptor)
hashableDescriptor := NewHashableLeaseDescriptor(descriptor)
if _, ok := m.updateOpenLeasesInProgress.LoadOrStore(hashableDescriptor, struct{}{}); ok {
// Prevent UpdateOpenLeases() calls from happening concurrently (since the lock
// is not held for the duration) to ensure that Leaser's receive all updates
@@ -192,21 +191,3 @@ func (m *leaseManager) isRegistered(leaser Leaser) bool {
}
return false
}

type hashableLeaseDescriptor struct {
namespace string
shard uint32
blockStart time.Time
}

func newHashableLeaseDescriptor(descriptor LeaseDescriptor) hashableLeaseDescriptor {
ns := ""
if descriptor.Namespace != nil {
ns = descriptor.Namespace.String()
}
return hashableLeaseDescriptor{
namespace: ns,
shard: descriptor.Shard,
blockStart: descriptor.BlockStart,
}
}
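For comparison, the reviewer's suggestion above (and leaseManager's own approach, visible in the LoadOrStore call in this file) relies on sync.Map so the in-progress marker can be taken without holding the manager lock for the duration of the update. A rough sketch of that pattern, with invented type and method names, might look like this:

```go
// Illustrative only: a sync.Map-based single-flight guard in the style of
// leaseManager.UpdateOpenLeases; only the pattern matches the real code.
package main

import (
	"errors"
	"fmt"
	"sync"

	"github.com/m3db/m3/src/dbnode/storage/block"
)

var errAlreadyInProgress = errors.New(
	"update open leases already in progress for this descriptor")

type openLeaseTracker struct {
	updatesInProgress sync.Map // block.HashableLeaseDescriptor -> struct{}
}

// begin atomically records that an update is in flight for the descriptor,
// without requiring any other lock to be held while the update runs.
func (t *openLeaseTracker) begin(d block.LeaseDescriptor) (block.HashableLeaseDescriptor, error) {
	key := block.NewHashableLeaseDescriptor(d)
	if _, loaded := t.updatesInProgress.LoadOrStore(key, struct{}{}); loaded {
		return key, errAlreadyInProgress
	}
	return key, nil
}

// end clears the in-flight marker once the update completes.
func (t *openLeaseTracker) end(key block.HashableLeaseDescriptor) {
	t.updatesInProgress.Delete(key)
}

func main() {
	var t openLeaseTracker
	key, err := t.begin(block.LeaseDescriptor{})
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	defer t.end(key)
	// ... notify registered leasers of the new lease state ...
}
```

As the author notes in the review thread, seekerManager keeps the plain map instead because startUpdateOpenLease holds the manager lock anyway for the namespace check that sits between the in-progress check and the update, so a sync.Map would not buy anything there.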
20 changes: 20 additions & 0 deletions src/dbnode/storage/block/types.go
@@ -481,6 +481,26 @@ type LeaseDescriptor struct {
BlockStart time.Time
}

// HashableLeaseDescriptor is a lease descriptor that can be used as a map key.
type HashableLeaseDescriptor struct {
namespace string
shard uint32
blockStart time.Time
}

// NewHashableLeaseDescriptor transforms a LeaseDescriptor into a HashableLeaseDescriptor.
func NewHashableLeaseDescriptor(descriptor LeaseDescriptor) HashableLeaseDescriptor {
ns := ""
if descriptor.Namespace != nil {
ns = descriptor.Namespace.String()
}
return HashableLeaseDescriptor{
namespace: ns,
shard: descriptor.Shard,
blockStart: descriptor.BlockStart,
}
}

// LeaseState is the current state of a lease which can be
// requested to be updated.
type LeaseState struct {
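A note on why the conversion exists at all: LeaseDescriptor holds an ident.ID, which is an interface, so two descriptors built from distinct ID instances with the same contents would not generally compare equal, and the struct is not a reliable map key. Flattening the namespace to a string (alongside the comparable uint32 and time.Time fields) gives the key value semantics. A small hypothetical example (the variable names and block size are invented):

```go
// Hypothetical illustration (not part of the PR): two descriptors built from
// distinct ident.ID instances collapse to the same map key once the namespace
// is flattened to a string by NewHashableLeaseDescriptor.
package main

import (
	"fmt"
	"time"

	"github.com/m3db/m3/src/dbnode/storage/block"
	"github.com/m3db/m3/src/x/ident"
)

func main() {
	// Truncate also drops the monotonic clock reading, so the time.Time field
	// compares equal by value in both keys.
	blockStart := time.Now().Truncate(2 * time.Hour)

	a := block.NewHashableLeaseDescriptor(block.LeaseDescriptor{
		Namespace:  ident.StringID("metrics"),
		Shard:      1,
		BlockStart: blockStart,
	})
	b := block.NewHashableLeaseDescriptor(block.LeaseDescriptor{
		Namespace:  ident.StringID("metrics"), // a different ident.ID instance
		Shard:      1,
		BlockStart: blockStart,
	})

	inProgress := map[block.HashableLeaseDescriptor]struct{}{a: {}}
	_, ok := inProgress[b]
	fmt.Println(ok) // true: the flattened keys compare equal by value
}
```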