Skip to content

Commit

Permalink
[cluster] Fix deadlock in new ActiveStagedPlacement method (#2980)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Dec 4, 2020
1 parent df320b2 commit 25b1196
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 12 deletions.
11 changes: 5 additions & 6 deletions src/cluster/placement/staged_placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"errors"
"sort"
"sync"
"sync/atomic"

"go.uber.org/atomic"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/x/clock"
Expand All @@ -44,7 +45,7 @@ type activeStagedPlacement struct {
onPlacementsAddedFn OnPlacementsAddedFn
onPlacementsRemovedFn OnPlacementsRemovedFn

expiring int32
expiring atomic.Int32
closed bool
doneFn DoneFn
}
Expand Down Expand Up @@ -98,8 +99,6 @@ func (p *activeStagedPlacement) Close() error {
}

func (p *activeStagedPlacement) Version() int {
p.RLock()
defer p.RUnlock()
return p.version
}

Expand All @@ -115,7 +114,7 @@ func (p *activeStagedPlacement) activePlacementWithLock(timeNanos int64) (Placem
}
placement := p.placements[idx]
// If the placement that's in effect is not the first placment, expire the stale ones.
if idx > 0 && atomic.CompareAndSwapInt32(&p.expiring, 0, 1) {
if idx > 0 && p.expiring.CAS(0, 1) {
go p.expire()
}
return placement, nil
Expand All @@ -126,7 +125,7 @@ func (p *activeStagedPlacement) expire() {
// because this code path is triggered very infrequently.
cleanup := func() {
p.Unlock()
atomic.StoreInt32(&p.expiring, 0)
p.expiring.Store(0)
}
p.Lock()
defer cleanup()
Expand Down
87 changes: 81 additions & 6 deletions src/cluster/placement/staged_placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/m3db/m3/src/cluster/shard"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

var (
Expand Down Expand Up @@ -395,33 +396,107 @@ func TestActiveStagedPlacementExpireAlreadyClosed(t *testing.T) {
p := &activeStagedPlacement{
placements: append([]Placement{}, testActivePlacements...),
nowFn: func() time.Time { return time.Unix(0, 99999) },
expiring: 1,
closed: true,
onPlacementsRemovedFn: func(placements []Placement) {
for _, placement := range placements {
removedInstances = append(removedInstances, placement.Instances())
}
},
}
p.expiring.Store(1)
p.expire()
require.Equal(t, int32(0), p.expiring)
require.Equal(t, int32(0), p.expiring.Load())
require.Nil(t, removedInstances)
}

func TestActiveStagedPlacementVersionWhileExpiring(t *testing.T) {
for i := 0; i < 100; i++ {
// test itself is fast, unless there's a deadlock
testActiveStagedPlacementVersionWhileExpiring(t)
}
}

//nolint:gocyclo
func testActiveStagedPlacementVersionWhileExpiring(t *testing.T) {
var (
doneCh = make(chan struct{})
signalCh = make(chan struct{})
version int
ranCleanup atomic.Bool
)

p := newActiveStagedPlacement(append([]Placement{}, testActivePlacements...), 42, nil)
p.nowFn = func() time.Time {
return time.Unix(0, testActivePlacements[len(testActivePlacements)-1].CutoverNanos()+1)
}
p.onPlacementsRemovedFn = func(_ []Placement) {
ranCleanup.Store(true)
}

go func() {
defer close(doneCh)
for {
version = p.Version()
select {
case signalCh <- struct{}{}:
return
default:
}
}
}()

pl, doneFn, err := p.ActivePlacement()
require.NoError(t, err)
require.NotNil(t, pl)
require.NotNil(t, doneFn)

// active placement is not the first in the list - expiration of past
// placements must be triggered
require.Equal(t, int32(1), p.expiring.Load())

// make sure p.Version() call was attempted at least once
select {
case <-signalCh:
case <-time.After(time.Second):
t.Fatalf("test timed out, deadlock?")
}

// release placement lock to unblock expiration process
doneFn()
select {
case <-doneCh:
case <-time.After(time.Second):
t.Fatalf("test timed out, deadlock?")
}

// there's no good way to determine when expire process has been completed,
// try polling for 100ms
for i := 0; i < 100; i++ {
if ranCleanup.Load() && p.expiring.Load() == int32(0) {
break
}
time.Sleep(1 * time.Millisecond)
}

require.Equal(t, 42, version)
require.True(t, ranCleanup.Load())
require.Equal(t, int32(0), p.expiring.Load())
}

func TestActiveStagedPlacementExpireAlreadyExpired(t *testing.T) {
var removedInstances [][]Instance
p := &activeStagedPlacement{
placements: append([]Placement{}, testActivePlacements...),
nowFn: func() time.Time { return time.Unix(0, 0) },
expiring: 1,
onPlacementsRemovedFn: func(placements []Placement) {
for _, placement := range placements {
removedInstances = append(removedInstances, placement.Instances())
}
},
}
p.expiring.Store(1)
p.expire()
require.Equal(t, int32(0), p.expiring)
require.Equal(t, int32(0), p.expiring.Load())
require.Nil(t, removedInstances)
}

Expand All @@ -430,15 +505,15 @@ func TestActiveStagedPlacementExpireSuccess(t *testing.T) {
p := &activeStagedPlacement{
placements: append([]Placement{}, testActivePlacements...),
nowFn: func() time.Time { return time.Unix(0, 99999) },
expiring: 1,
onPlacementsRemovedFn: func(placements []Placement) {
for _, placement := range placements {
removedInstances = append(removedInstances, placement.Instances())
}
},
}
p.expiring.Store(1)
p.expire()
require.Equal(t, int32(0), p.expiring)
require.Equal(t, int32(0), p.expiring.Load())
require.Equal(t, [][]Instance{testActivePlacements[0].Instances()}, removedInstances)
require.Equal(t, 1, len(p.placements))
validateSnapshot(t, testActivePlacements[1], p.placements[0])
Expand Down

0 comments on commit 25b1196

Please sign in to comment.