Skip to content

Commit

Permalink
[chore] [receiver/solacereceiver] Fixed a few flaky tests in receiver…
Browse files Browse the repository at this point in the history
…/solacereceiver (#20348)

Addresses a couple of test failures with two main problems:

- Addressed an issue where on windows the timer window was too small to get within a scheduling tick. Upped the timeout in this case.
- Addressed a cache coherency issue with callbacks on mock transports. Replaced callbacks with atomic unsafe.Pointers.
  • Loading branch information
mcardy authored Mar 27, 2023
1 parent da13d9c commit 7f4e051
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 20 deletions.
51 changes: 33 additions & 18 deletions receiver/solacereceiver/messaging_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"net"
"reflect"
"runtime"
"sync/atomic"
"testing"
"time"
"unsafe"

"github.com/Azure/go-amqp"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -283,10 +285,10 @@ func TestAMQPNewClientDialAndCloseCtxTimeoutFailure(t *testing.T) {
service, conn := startMockedService(t)

closed := false
conn.closeHandle = func() error {
conn.setCloseHandler(func() error {
closed = true
return nil
}
})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
go func() {
Expand All @@ -309,10 +311,10 @@ func TestAMQPNewClientDialAndCloseConnFailure(t *testing.T) {
mockWriteData(conn, writeData)

closed := false
conn.closeHandle = func() error {
conn.setCloseHandler(func() error {
closed = true
return fmt.Errorf("some error")
}
})
service.close(context.Background())
// expect conn.Close to have been called
assert.True(t, closed)
Expand Down Expand Up @@ -345,13 +347,13 @@ func TestAMQPAcknowledgeMessage(t *testing.T) {
writeCalled := make(chan struct{})
// Expected accept from AMQP frame for first received message
// "\x00\x00\x00\x1c\x02\x00\x00\x00\x00\x53\x15\xd0\x00\x00\x00\x0c\x00\x00\x00\x05\x41\x43\x40\x41\x00\x53\x24\x45"
conn.writeHandle = func(b []byte) (n int, err error) {
conn.setWriteHandler(func(b []byte) (n int, err error) {
// assert that a disposition is written
assert.Equal(t, byte(0x15), b[10])
assert.Equal(t, byte(0x24), b[26]) // 0x24 at the 27th byte in this case means accept
close(writeCalled)
return len(b), nil
}
})
err = service.accept(context.Background(), msg)
assert.NoError(t, err)
assertChannelClosed(t, writeCalled)
Expand All @@ -366,13 +368,13 @@ func TestAMQPModifyMessage(t *testing.T) {
writeCalled := make(chan struct{})
// Expected modify from AMQP frame for first received message
// "\x00\x00\x00\x1c\x02\x00\x00\x00\x00\x53\x15\xd0\x00\x00\x00\x0c\x00\x00\x00\x05\x41\x43\x40\x41\x00\x53\x25\x45"
conn.writeHandle = func(b []byte) (n int, err error) {
conn.setWriteHandler(func(b []byte) (n int, err error) {
// assert that a disposition is written
assert.Equal(t, byte(0x15), b[10])
assert.Equal(t, byte(0x27), b[26]) // 0x27 at the 27th byte in this case means modify
close(writeCalled)
return len(b), nil
}
})
err = service.failed(context.Background(), msg)
assert.NoError(t, err)
select {
Expand Down Expand Up @@ -420,10 +422,10 @@ func closeMockedAMQPService(t *testing.T, service *amqpMessagingService, conn *c
mockWriteData(conn, writeData)

closed := false
conn.closeHandle = func() error {
conn.setCloseHandler(func() error {
closed = true
return nil
}
})
service.close(context.Background())
// expect conn.Close to have been called
assert.True(t, closed)
Expand Down Expand Up @@ -480,7 +482,7 @@ func TestAMQPNewClientDialWithBadAttachResponseExpectingError(t *testing.T) {
}

func mockWriteData(conn *connMock, data [][]byte, callbacks ...func(sentData, receivedData []byte)) {
conn.writeHandle = func(b []byte) (n int, err error) {
conn.setWriteHandler(func(b []byte) (n int, err error) {
var next []byte
if len(data) != 0 {
next = data[0]
Expand All @@ -493,7 +495,7 @@ func mockWriteData(conn *connMock, data [][]byte, callbacks ...func(sentData, re
conn.nextData <- next
}
return len(b), nil
}
})
}

func mockDialFunc(conn *connMock) {
Expand Down Expand Up @@ -640,11 +642,22 @@ func TestConfigAMQPAuthenticationNoDetails(t *testing.T) {
// Write will call writeHandle
type connMock struct {
nextData chan []byte
writeHandle func([]byte) (n int, err error)
closeHandle func() error
writeHandle unsafe.Pointer // expected type: func([]byte) (n int, err error)
closeHandle unsafe.Pointer // expected type: func() error
remaining *bytes.Reader
}

type writeHandler func([]byte) (n int, err error)
type closeHandler func() error

func (c *connMock) setWriteHandler(handler writeHandler) {
atomic.StorePointer(&c.writeHandle, unsafe.Pointer(&handler))
}

func (c *connMock) setCloseHandler(handler closeHandler) {
atomic.StorePointer(&c.closeHandle, unsafe.Pointer(&handler))
}

func (c *connMock) Read(b []byte) (n int, err error) {
if c.remaining == nil {
d := <-c.nextData
Expand All @@ -663,15 +676,17 @@ func (c *connMock) Read(b []byte) (n int, err error) {
}

func (c *connMock) Write(b []byte) (n int, err error) {
if c.writeHandle != nil {
return c.writeHandle(b)
handlerPointer := atomic.LoadPointer(&c.writeHandle)
if handlerPointer != nil {
return (*(*writeHandler)(handlerPointer))(b)
}
return len(b), nil
}

func (c *connMock) Close() error {
if c.closeHandle != nil {
return c.closeHandle()
handlerPointer := atomic.LoadPointer(&c.closeHandle)
if handlerPointer != nil {
return (*(*closeHandler)(handlerPointer))()
}
return nil
}
Expand Down
9 changes: 7 additions & 2 deletions receiver/solacereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func TestReceiverFlowControlDelayedRetry(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
receiver, messagingService, unmarshaller := newReceiver(t)
delay := 5 * time.Millisecond
delay := 50 * time.Millisecond
// Increase delay on windows due to tick granularity
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17197
if runtime.GOOS == "windows" {
Expand Down Expand Up @@ -461,7 +461,12 @@ func TestReceiverFlowControlDelayedRetryInterrupt(t *testing.T) {
func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) {
receiver, messagingService, unmarshaller := newReceiver(t)
// we won't wait 10 seconds since we will interrupt well before
retryInterval := 20 * time.Millisecond
retryInterval := 50 * time.Millisecond
// Increase delay on windows due to tick granularity
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/19409
if runtime.GOOS == "windows" {
retryInterval = 500 * time.Millisecond
}
var retryCount int64 = 5
receiver.config.Flow.DelayedRetry.Delay = retryInterval
var err error
Expand Down

0 comments on commit 7f4e051

Please sign in to comment.