Skip to content

Commit

Permalink
*: fix flappy tests (#2869)
Browse files Browse the repository at this point in the history
This PR:
 - fixes a data race in p2p.Sender which was flapping in integration and compose tests
 - makes the "prepare round 2, decide round 3" QBFT internal test more reliable; verified empirically by running the test 100,000 times, whereas previously the flapping usually manifested itself after a couple of runs

category: bug
ticket: #2763

Closes #2763.
  • Loading branch information
gsora authored Feb 8, 2024
1 parent 82415b0 commit 24d8cc1
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 20 deletions.
12 changes: 6 additions & 6 deletions core/qbft/qbft_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func TestQBFT(t *testing.T) {
})
})

t.Run("prepare round 2, decide round 23", func(t *testing.T) {
t.Run("prepare round 2, decide round 3", func(t *testing.T) {
testQBFT(t, test{
Instance: 0,
CommitsAfter: 2,
ValueDelay: map[int64]time.Duration{
1: time.Second,
1: 2 * time.Second,
},
DecideRound: 3,
PreparedVal: 2,
Expand Down Expand Up @@ -433,14 +433,14 @@ func testQBFT(t *testing.T, test test) {
for _, commit := range qCommit {
// Ensure that all results are the same
for _, previous := range results {
require.EqualValues(t, previous.Value(), commit.Value())
require.EqualValues(t, previous.Value(), commit.Value(), "commit values")
}
if !test.RandomRound {
require.EqualValues(t, test.DecideRound, commit.Round())
require.EqualValues(t, test.DecideRound, commit.Round(), "wrong decide round")
if test.PreparedVal != 0 { // Check prepared value if set
require.EqualValues(t, test.PreparedVal, commit.Value())
require.EqualValues(t, test.PreparedVal, commit.Value(), "wrong prepared value")
} else { // Otherwise check that leader value was used.
require.True(t, isLeader(test.Instance, commit.Round(), commit.Value()))
require.True(t, isLeader(test.Instance, commit.Round(), commit.Value()), "not leader")
}
}
results[commit.Source()] = commit
Expand Down
60 changes: 48 additions & 12 deletions p2p/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,45 @@ var (
_ SendFunc = new(Sender).SendAsync
)

// errorBuffer holds a slice of errors and guards every access to it with a
// sync.RWMutex, so a single buffer can be used from several goroutines.
//
// Each method takes the lock on its own: a sequence such as len followed by
// get is therefore not atomic, and another goroutine may add to or trim the
// buffer in between the two calls.
type errorBuffer struct {
	store []error
	m     sync.RWMutex
}

// add appends err to the end of the buffer. err may be nil.
func (eb *errorBuffer) add(err error) {
	eb.m.Lock()
	defer eb.m.Unlock()

	eb.store = append(eb.store, err)
}

// get returns the entry stored at position idx, where 0 is the oldest entry.
// It panics if idx is outside [0, len).
func (eb *errorBuffer) get(idx int) error {
	eb.m.RLock()
	defer eb.m.RUnlock()

	return eb.store[idx]
}

// len returns the number of entries currently held in the buffer.
func (eb *errorBuffer) len() int {
	eb.m.RLock()
	defer eb.m.RUnlock()

	return len(eb.store)
}

// trim shrinks the buffer so that only its newest `by` entries remain; the
// older entries at the front are dropped.
//
// If the buffer already holds `by` entries or fewer it is left untouched, and
// a negative `by` is treated as zero (the buffer is emptied). Neither case
// panics.
func (eb *errorBuffer) trim(by int) {
	eb.m.Lock()
	defer eb.m.Unlock()

	if by < 0 {
		by = 0
	}

	// Nothing to drop: slicing with a negative low bound would panic here.
	if by >= len(eb.store) {
		return
	}

	eb.store = eb.store[len(eb.store)-by:]
}

type peerState struct {
failing bool
buffer []error
buffer errorBuffer
}

// Sender provides an API for sending libp2p messages, both synchronous and asynchronous.
Expand All @@ -59,26 +95,26 @@ type Sender struct {

// addResult adds the result of sending a p2p message to the internal state and possibly logs a status change.
func (s *Sender) addResult(ctx context.Context, peerID peer.ID, err error) {
var state peerState
state := &peerState{}
if val, ok := s.states.Load(peerID); ok {
state = val.(peerState)
state = val.(*peerState)
}

state.buffer = append(state.buffer, err)
if len(state.buffer) > senderBuffer { // Trim buffer
state.buffer = state.buffer[len(state.buffer)-senderBuffer:]
state.buffer.add(err)
if state.buffer.len() > senderBuffer { // Trim buffer
state.buffer.trim(senderBuffer)
}

failure := err != nil
success := !failure

if success && state.failing {
// See if we have senderHysteresis successes i.o.t. change state to success.
full := len(state.buffer) == senderBuffer
oldestFailure := state.buffer[0] != nil
full := state.buffer.len() == senderBuffer
oldestFailure := state.buffer.get(0) != nil
othersSuccess := true
for i := 1; i < len(state.buffer); i++ {
if state.buffer[i] != nil {
for i := 1; i < state.buffer.len(); i++ {
if state.buffer.get(i) != nil {
othersSuccess = false
break
}
Expand All @@ -88,7 +124,7 @@ func (s *Sender) addResult(ctx context.Context, peerID peer.ID, err error) {
state.failing = false
log.Info(ctx, "P2P sending recovered", z.Str("peer", PeerName(peerID)))
}
} else if failure && (len(state.buffer) == 1 || !state.failing) {
} else if failure && (state.buffer.len() == 1 || !state.failing) {
// First attempt failed or state changed to failing

if _, ok := dialErrMsgs(err); !ok { // Only log non-dial errors
Expand All @@ -98,7 +134,7 @@ func (s *Sender) addResult(ctx context.Context, peerID peer.ID, err error) {
state.failing = true
}

s.states.Store(peerID, state) // Note there is a race if two results for the same peer is added at the same time, but this isn't critical.
s.states.Store(peerID, state)
}

// SendAsync returns nil and sends a libp2p message asynchronously.
Expand Down
30 changes: 28 additions & 2 deletions p2p/sender_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func TestSenderAddResult(t *testing.T) {

assertFailing := func(t *testing.T, expect bool) {
t.Helper()
var state peerState
state := &peerState{}
if val, ok := sender.states.Load(peerID); ok {
state = val.(peerState)
state = val.(*peerState)
}
require.Equal(t, expect, state.failing)
}
Expand Down Expand Up @@ -65,6 +65,32 @@ func TestSenderAddResult(t *testing.T) {
// INFO P2P sending failing {"peer": "better-week"}
}

// TestAddResult calls Sender.addResult for the same peer from many goroutines
// at once, in order to provoke a race condition on it.
//
// The test contains no assertions of its own, so it can only fail when it is
// run with `-race` and Sender.addResult actually contains a race condition.
func TestAddResult(t *testing.T) {
	// Number of goroutines calling addResult concurrently.
	const workers = 1000

	var (
		ctx    = context.Background()
		sender = new(Sender)
		wg     sync.WaitGroup
	)

	for i := 0; i < workers; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			sender.addResult(ctx, "test", nil)
		}()
	}

	// Block until every goroutine has reported its result.
	wg.Wait()
}

func TestSenderRetry(t *testing.T) {
sender := new(Sender)
ctx := context.Background()
Expand Down

0 comments on commit 24d8cc1

Please sign in to comment.