diff --git a/receiver/solacereceiver/messaging_service_test.go b/receiver/solacereceiver/messaging_service_test.go index b7d82caad8c3..0e24bb0ce755 100644 --- a/receiver/solacereceiver/messaging_service_test.go +++ b/receiver/solacereceiver/messaging_service_test.go @@ -22,8 +22,10 @@ import ( "net" "reflect" "runtime" + "sync/atomic" "testing" "time" + "unsafe" "github.com/Azure/go-amqp" "github.com/stretchr/testify/assert" @@ -283,10 +285,10 @@ func TestAMQPNewClientDialAndCloseCtxTimeoutFailure(t *testing.T) { service, conn := startMockedService(t) closed := false - conn.closeHandle = func() error { + conn.setCloseHandler(func() error { closed = true return nil - } + }) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() go func() { @@ -309,10 +311,10 @@ func TestAMQPNewClientDialAndCloseConnFailure(t *testing.T) { mockWriteData(conn, writeData) closed := false - conn.closeHandle = func() error { + conn.setCloseHandler(func() error { closed = true return fmt.Errorf("some error") - } + }) service.close(context.Background()) // expect conn.Close to have been called assert.True(t, closed) @@ -345,13 +347,13 @@ func TestAMQPAcknowledgeMessage(t *testing.T) { writeCalled := make(chan struct{}) // Expected accept from AMQP frame for first received message // "\x00\x00\x00\x1c\x02\x00\x00\x00\x00\x53\x15\xd0\x00\x00\x00\x0c\x00\x00\x00\x05\x41\x43\x40\x41\x00\x53\x24\x45" - conn.writeHandle = func(b []byte) (n int, err error) { + conn.setWriteHandler(func(b []byte) (n int, err error) { // assert that a disposition is written assert.Equal(t, byte(0x15), b[10]) assert.Equal(t, byte(0x24), b[26]) // 0x24 at the 27th byte in this case means accept close(writeCalled) return len(b), nil - } + }) err = service.accept(context.Background(), msg) assert.NoError(t, err) assertChannelClosed(t, writeCalled) @@ -366,13 +368,13 @@ func TestAMQPModifyMessage(t *testing.T) { writeCalled := make(chan struct{}) // Expected modify from AMQP frame for first received message // "\x00\x00\x00\x1c\x02\x00\x00\x00\x00\x53\x15\xd0\x00\x00\x00\x0c\x00\x00\x00\x05\x41\x43\x40\x41\x00\x53\x25\x45" - conn.writeHandle = func(b []byte) (n int, err error) { + conn.setWriteHandler(func(b []byte) (n int, err error) { // assert that a disposition is written assert.Equal(t, byte(0x15), b[10]) assert.Equal(t, byte(0x27), b[26]) // 0x27 at the 27th byte in this case means modify close(writeCalled) return len(b), nil - } + }) err = service.failed(context.Background(), msg) assert.NoError(t, err) select { @@ -420,10 +422,10 @@ func closeMockedAMQPService(t *testing.T, service *amqpMessagingService, conn *c mockWriteData(conn, writeData) closed := false - conn.closeHandle = func() error { + conn.setCloseHandler(func() error { closed = true return nil - } + }) service.close(context.Background()) // expect conn.Close to have been called assert.True(t, closed) @@ -480,7 +482,7 @@ func TestAMQPNewClientDialWithBadAttachResponseExpectingError(t *testing.T) { } func mockWriteData(conn *connMock, data [][]byte, callbacks ...func(sentData, receivedData []byte)) { - conn.writeHandle = func(b []byte) (n int, err error) { + conn.setWriteHandler(func(b []byte) (n int, err error) { var next []byte if len(data) != 0 { next = data[0] @@ -493,7 +495,7 @@ func mockWriteData(conn *connMock, data [][]byte, callbacks ...func(sentData, re conn.nextData <- next } return len(b), nil - } + }) } func mockDialFunc(conn *connMock) { @@ -640,11 +642,22 @@ func TestConfigAMQPAuthenticationNoDetails(t *testing.T) { // Write will call writeHandle type connMock struct { nextData chan []byte - writeHandle func([]byte) (n int, err error) - closeHandle func() error + writeHandle unsafe.Pointer // expected type: func([]byte) (n int, err error) + closeHandle unsafe.Pointer // expected type: func() error remaining *bytes.Reader } +type writeHandler func([]byte) (n int, err error) +type closeHandler func() error + +func (c *connMock) setWriteHandler(handler writeHandler) { + atomic.StorePointer(&c.writeHandle, unsafe.Pointer(&handler)) +} + +func (c *connMock) setCloseHandler(handler closeHandler) { + atomic.StorePointer(&c.closeHandle, unsafe.Pointer(&handler)) +} + func (c *connMock) Read(b []byte) (n int, err error) { if c.remaining == nil { d := <-c.nextData @@ -663,15 +676,17 @@ func (c *connMock) Read(b []byte) (n int, err error) { } func (c *connMock) Write(b []byte) (n int, err error) { - if c.writeHandle != nil { - return c.writeHandle(b) + handlerPointer := atomic.LoadPointer(&c.writeHandle) + if handlerPointer != nil { + return (*(*writeHandler)(handlerPointer))(b) } return len(b), nil } func (c *connMock) Close() error { - if c.closeHandle != nil { - return c.closeHandle() + handlerPointer := atomic.LoadPointer(&c.closeHandle) + if handlerPointer != nil { + return (*(*closeHandler)(handlerPointer))() } return nil } diff --git a/receiver/solacereceiver/receiver_test.go b/receiver/solacereceiver/receiver_test.go index 9561fa30920d..4bfb7626aafe 100644 --- a/receiver/solacereceiver/receiver_test.go +++ b/receiver/solacereceiver/receiver_test.go @@ -352,7 +352,7 @@ func TestReceiverFlowControlDelayedRetry(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { receiver, messagingService, unmarshaller := newReceiver(t) - delay := 5 * time.Millisecond + delay := 50 * time.Millisecond // Increase delay on windows due to tick granularity // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17197 if runtime.GOOS == "windows" { @@ -461,7 +461,12 @@ func TestReceiverFlowControlDelayedRetryInterrupt(t *testing.T) { func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) { receiver, messagingService, unmarshaller := newReceiver(t) // we won't wait 10 seconds since we will interrupt well before - retryInterval := 20 * time.Millisecond + retryInterval := 50 * time.Millisecond + // Increase delay on windows due to tick granularity + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/19409 + if runtime.GOOS == "windows" { + retryInterval = 500 * time.Millisecond + } var retryCount int64 = 5 receiver.config.Flow.DelayedRetry.Delay = retryInterval var err error