Skip to content

Commit

Permalink
Update azservicebus to latest go-amqp (#20386)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhendrixMSFT authored Mar 29, 2023
1 parent bcbe34f commit 880fad4
Show file tree
Hide file tree
Showing 53 changed files with 2,756 additions and 2,476 deletions.
28 changes: 19 additions & 9 deletions sdk/messaging/azservicebus/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,25 @@ func TestNewClientWithWebsockets(t *testing.T) {
}, nil)
require.NoError(t, err)

// NOTE: This error is coming from the `nhooyr.io/websocket` package. There's an
// open discussion here:
// https://github.com/nhooyr/websocket/discussions/380
//
// The frame it's waiting for (at this point) is the other half of the websocket CLOSE handshake.
// I wireshark'd this and confirmed that the frame does arrive, it's just not read by the local
// package. In this context, since the connection has already shut down, this is harmless.
var expectedErr = "failed to close WebSocket: failed to read frame header: EOF"
require.EqualError(t, client.Close(context.Background()), expectedErr)
const (
// NOTE: This error is coming from the `nhooyr.io/websocket` package. There's an
// open discussion here:
// https://github.com/nhooyr/websocket/discussions/380
//
// The frame it's waiting for (at this point) is the other half of the websocket CLOSE handshake.
// I wireshark'd this and confirmed that the frame does arrive, it's just not read by the local
// package. In this context, since the connection has already shut down, this is harmless.
expectedWSErr1 = "failed to close WebSocket: failed to read frame header: EOF"

// in addition, the returned error on close doesn't implement net.ErrClosed so we can also see this.
// https://github.com/nhooyr/websocket/issues/286
expectedWSErr2 = "failed to read: WebSocket closed: sent close frame: status = StatusNormalClosure and reason = \"\""
)
err = client.Close(context.Background())
require.Error(t, err)
if es := err.Error(); es != expectedWSErr1 && es != expectedWSErr2 {
t.Fatalf("unexpected error %v", err)
}
}

func TestNewClientUsingSharedAccessSignature(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/amqpLinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (l *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operatio
lastID = linksWithVersion.ID

if err := fn(ctx, linksWithVersion, args); err != nil {
if args.I == 0 && !didQuickRetry && IsDetachError(err) {
if args.I == 0 && !didQuickRetry && IsLinkError(err) {
// go-amqp will asynchronously handle detaches. This means errors that you get
// back from Send(), for instance, can actually be from much earlier in time
// depending on the last time you called into Send().
Expand Down
91 changes: 44 additions & 47 deletions sdk/messaging/azservicebus/internal/amqpLinks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ func assertFailedLinks[T error, T2 error](t *testing.T, lwid *LinksWithID, expec
Data: [][]byte{
{0},
},
})
}, nil)

require.True(t, errors.Is(err, expectedErr) || errors.As(err, &expectedErr))
require.ErrorIs(t, err, expectedErr)

_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1)
require.True(t, errors.Is(err, expectedRPCError) || errors.As(err, &expectedRPCError))

msg, err := lwid.Receiver.Receive(context.TODO())
require.ErrorIs(t, err, expectedErr)
msg, err := lwid.Receiver.Receive(context.TODO(), nil)
require.True(t, errors.Is(err, expectedErr) || errors.As(err, &expectedErr))
require.ErrorIs(t, err, expectedErr)
require.Nil(t, msg)

}
Expand All @@ -60,14 +60,14 @@ func assertLinks(t *testing.T, lwid *LinksWithID) {
Data: [][]byte{
{0},
},
})
}, nil)
require.NoError(t, err)

_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1)
require.NoError(t, err)

require.NoError(t, lwid.Receiver.IssueCredit(1))
msg, err := lwid.Receiver.Receive(context.TODO())
msg, err := lwid.Receiver.Receive(context.TODO(), nil)
require.NoError(t, err)
require.NotNil(t, msg)
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestAMQPLinksLive(t *testing.T) {
}()

require.EqualValues(t, 0, createLinksCalled)
require.NoError(t, links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnectionError{}))
require.NoError(t, links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnError{}))
require.EqualValues(t, 1, createLinksCalled)

lwr, err := links.Get(context.Background())
Expand All @@ -142,10 +142,10 @@ func TestAMQPLinksLive(t *testing.T) {
require.NoError(t, amqpClient.Close())

// all the links are dead because the connection is dead.
assertFailedLinks(t, lwr, &amqp.ConnectionError{}, &amqp.ConnectionError{})
assertFailedLinks(t, lwr, &amqp.ConnError{}, &amqp.ConnError{})

// now we'll recover, which should recreate everything
require.NoError(t, links.RecoverIfNeeded(context.Background(), lwr.ID, &amqp.ConnectionError{}))
require.NoError(t, links.RecoverIfNeeded(context.Background(), lwr.ID, &amqp.ConnError{}))
require.EqualValues(t, 2, createLinksCalled)

lwr, err = links.Get(context.Background())
Expand All @@ -160,12 +160,12 @@ func TestAMQPLinksLive(t *testing.T) {
_ = actualLinks.Receiver.Close(context.Background())
_ = actualLinks.RPCLink.Close(context.Background())

assertFailedLinks(t, lwr, amqp.ErrLinkClosed, context.Canceled)
assertFailedLinks(t, lwr, &amqp.LinkError{}, context.Canceled)

lwr, err = links.Get(context.Background())
require.NoError(t, err)

require.NoError(t, links.RecoverIfNeeded(context.Background(), lwr.ID, amqp.ErrLinkClosed))
require.NoError(t, links.RecoverIfNeeded(context.Background(), lwr.ID, &amqp.LinkError{}))
require.EqualValues(t, 3, createLinksCalled)

lwr, err = links.Get(context.Background())
Expand Down Expand Up @@ -253,13 +253,13 @@ func TestAMQPLinksLiveRecoverLink(t *testing.T) {
}()

require.EqualValues(t, 0, createLinksCalled)
require.NoError(t, links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnectionError{}))
require.NoError(t, links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnError{}))
require.EqualValues(t, 1, createLinksCalled)

lwr, err := links.Get(context.Background())
require.NoError(t, err)

require.NoError(t, links.RecoverIfNeeded(context.Background(), lwr.ID, amqp.ErrLinkClosed))
require.NoError(t, links.RecoverIfNeeded(context.Background(), lwr.ID, &amqp.LinkError{}))
require.EqualValues(t, 2, createLinksCalled)
}

Expand Down Expand Up @@ -296,7 +296,7 @@ func TestAMQPLinksLiveRace(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnectionError{})
err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnError{})
require.NoError(t, err)
}()
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestAMQPLinksLiveRaceLink(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.DetachError{})
err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.LinkError{})
require.NoError(t, err)
}()
}
Expand Down Expand Up @@ -388,7 +388,7 @@ func TestAMQPLinksRetry(t *testing.T) {

err = links.Retry(context.Background(), log.Event("NotUsed"), "NotUsed", func(ctx context.Context, lwid *LinksWithID, args *utils.RetryFnArgs) error {
// force recoveries
return &amqp.ConnectionError{}
return &amqp.ConnError{}
}, exported.RetryOptions{
MaxRetries: 2,
// note: omitting MaxRetries just to give a sanity check that
Expand All @@ -397,7 +397,7 @@ func TestAMQPLinksRetry(t *testing.T) {
MaxRetryDelay: time.Millisecond,
})

var connErr *amqp.ConnectionError
var connErr *amqp.ConnError
require.ErrorAs(t, err, &connErr)
require.EqualValues(t, 3, createLinksCalled)
}
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestAMQPLinksMultipleWithSameConnection(t *testing.T) {

go func() {
defer wg.Done()
err = links.RecoverIfNeeded(context.Background(), lwr.ID, &amqp.DetachError{})
err = links.RecoverIfNeeded(context.Background(), lwr.ID, &amqp.LinkError{})
require.NoError(t, err)
}()

Expand All @@ -469,7 +469,7 @@ func TestAMQPLinksMultipleWithSameConnection(t *testing.T) {
go func() {
defer wg.Done()

err := links2.RecoverIfNeeded(context.Background(), lwr2.ID, &amqp.DetachError{})
err := links2.RecoverIfNeeded(context.Background(), lwr2.ID, &amqp.LinkError{})
require.NoError(t, err)
}()

Expand Down Expand Up @@ -556,7 +556,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
_, err := links.Get(context.Background())
require.NoError(t, err)

rk := links.CloseIfNeeded(context.Background(), amqp.ErrLinkClosed)
rk := links.CloseIfNeeded(context.Background(), &amqp.LinkError{})
require.Equal(t, RecoveryKindLink, rk)
require.Equal(t, 1, receiver.Closed)
require.Equal(t, 1, sender.Closed)
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
_, err := links.Get(context.Background())
require.NoError(t, err)

rk := links.CloseIfNeeded(context.Background(), &amqp.ConnectionError{})
rk := links.CloseIfNeeded(context.Background(), &amqp.ConnError{})
require.Equal(t, RecoveryKindConn, rk)
require.Equal(t, 1, receiver.Closed)
require.Equal(t, 1, sender.Closed)
Expand Down Expand Up @@ -644,7 +644,7 @@ func TestAMQPLinksRetriesUnit(t *testing.T) {
{Err: nil, Attempts: []int32{0}},

// connection related or unknown failures happen, all attempts exhausted
{Err: &amqp.ConnectionError{}, Attempts: []int32{0, 1, 2, 3}},
{Err: &amqp.ConnError{}, Attempts: []int32{0, 1, 2, 3}},
{Err: errors.New("unknown error"), Attempts: []int32{0, 1, 2, 3}},

// fatal errors don't retry at all.
Expand All @@ -654,7 +654,7 @@ func TestAMQPLinksRetriesUnit(t *testing.T) {
// retry for attempt '0', to avoid sleeping if the error was stale. This mostly happens
// in situations like sending, where you might have long times in between sends and your
// link is closed due to idling.
{Err: &amqp.DetachError{}, Attempts: []int32{0, 0, 1, 2, 3}, ExpectReset: true},
{Err: &amqp.LinkError{}, Attempts: []int32{0, 0, 1, 2, 3}, ExpectReset: true},
}

for _, testData := range tests {
Expand Down Expand Up @@ -733,13 +733,13 @@ func TestAMQPLinks_Logging(t *testing.T) {
endCapture := test.CaptureLogsForTest()
defer endCapture()

err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.DetachError{})
err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.LinkError{})
require.NoError(t, err)

messages := endCapture()

require.Equal(t, []string{
"[azsb.Conn] Recovering link for error link detached, reason: *Error(nil)",
"[azsb.Conn] Recovering link for error amqp: link closed",
"[azsb.Conn] Recovering link only",
"[azsb.Conn] Recovered links",
}, messages)
Expand All @@ -765,7 +765,7 @@ func TestAMQPLinks_Logging(t *testing.T) {
endCapture := test.CaptureLogsForTest()
defer endCapture()

err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnectionError{})
err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnError{})
require.NoError(t, err)

messages := endCapture()
Expand Down Expand Up @@ -806,14 +806,14 @@ func TestAMQPLinksCreditTracking(t *testing.T) {
t.Run("credits are decremented when messages are amqpReceiver.Receive()'d", func(t *testing.T) {
err = lwr.Sender.Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("Received")},
})
}, nil)
require.NoError(t, err)

err = lwr.Receiver.IssueCredit(1)
require.NoError(t, err)
require.Equal(t, uint32(1), lwr.Receiver.Credits())

message, err := lwr.Receiver.Receive(context.Background())
message, err := lwr.Receiver.Receive(context.Background(), nil)
require.NoError(t, err)
require.Equal(t, [][]byte{[]byte("Received")}, message.Data)
require.Equal(t, uint32(0), lwr.Receiver.Credits())
Expand All @@ -825,7 +825,7 @@ func TestAMQPLinksCreditTracking(t *testing.T) {
t.Run("credits are decremented when messages are amqpReceiver.Prefetched()", func(t *testing.T) {
err = lwr.Sender.Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("Received")},
})
}, nil)
require.NoError(t, err)

err = lwr.Receiver.IssueCredit(1)
Expand Down Expand Up @@ -859,7 +859,7 @@ func TestAMQPLinksCreditTracking(t *testing.T) {
// this won't touch the credit since nothing is actually received.
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err = lwr.Receiver.Receive(ctx)
_, err = lwr.Receiver.Receive(ctx, nil)
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, uint32(0), lwr.Receiver.Credits())

Expand Down Expand Up @@ -894,10 +894,9 @@ func TestAMQPCloseLinkTimeout_Receiver_ExternalCancellation(t *testing.T) {

createLinkFn := func(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error) {
receiver, err := session.NewReceiver(ctx, "entity path", &amqp.ReceiverOptions{
SettlementMode: amqp.ModeFirst.Ptr(),
ManualCredits: true,
Credit: 2048,
RequestedSenderSettleMode: amqp.ModeSettled.Ptr(),
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
Credit: -1,
RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(),
})

return nil, receiver, err
Expand Down Expand Up @@ -931,7 +930,7 @@ func TestAMQPCloseLinkTimeout_Receiver_ExternalCancellation(t *testing.T) {

// now close the links. We've made it so the receiver will cancel the context, as if the user
// interrupted the close. This will end up closing the connection as well.
rk := links.CloseIfNeeded(userCtx, &amqp.DetachError{})
rk := links.CloseIfNeeded(userCtx, &amqp.LinkError{})

require.Contains(t, getLogs(), "[azsb.Conn] Connection closed instead. Link closing has timed out.")

Expand Down Expand Up @@ -992,10 +991,9 @@ func TestAMQPCloseLinkTimeout_Receiver_RecoverIfNeeded(t *testing.T) {

createLinkFn := func(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error) {
receiver, err := session.NewReceiver(ctx, "entity path", &amqp.ReceiverOptions{
SettlementMode: amqp.ModeFirst.Ptr(),
ManualCredits: true,
Credit: 2048,
RequestedSenderSettleMode: amqp.ModeSettled.Ptr(),
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
Credit: -1,
RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(),
})

return nil, receiver, err
Expand Down Expand Up @@ -1029,7 +1027,7 @@ func TestAMQPCloseLinkTimeout_Receiver_RecoverIfNeeded(t *testing.T) {

// now close the links. We've made it so the receiver will cancel the context, as if the user
// interrupted the close. This will end up closing the connection as well.
recoveryErr := links.RecoverIfNeeded(context.WithValue(userCtx, keyType("close"), "cancel"), lwid.ID, &amqp.DetachError{})
recoveryErr := links.RecoverIfNeeded(context.WithValue(userCtx, keyType("close"), "cancel"), lwid.ID, &amqp.LinkError{})

require.Contains(t, getLogs(), "[azsb.Conn] Connection reset for recovery instead of link. Link closing has timed out.")
require.ErrorIs(t, recoveryErr, errConnResetNeeded)
Expand All @@ -1039,7 +1037,7 @@ func TestAMQPCloseLinkTimeout_Receiver_RecoverIfNeeded(t *testing.T) {

nonCancelledCtx, cancel := context.WithCancel(context.Background())
defer cancel()
recoveryErr = links.RecoverIfNeeded(nonCancelledCtx, lwid.ID, &amqp.DetachError{})
recoveryErr = links.RecoverIfNeeded(nonCancelledCtx, lwid.ID, &amqp.LinkError{})

require.NotContains(t, getLogs(), "[azsb.Conn] Connection reset for recovery instead of link. Link closing has timed out.")
require.NoError(t, recoveryErr)
Expand Down Expand Up @@ -1076,7 +1074,7 @@ func TestAMQPCloseLinkTimeout_Sender(t *testing.T) {
}

createLinkFn := func(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error) {
sender, err := session.NewSender(ctx, "entity path", &amqp.SenderOptions{SettlementMode: amqp.ModeMixed.Ptr(), RequestedReceiverSettleMode: amqp.ModeFirst.Ptr()})
sender, err := session.NewSender(ctx, "entity path", &amqp.SenderOptions{SettlementMode: amqp.SenderSettleModeMixed.Ptr(), RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr()})
return sender, nil, err
}

Expand Down Expand Up @@ -1106,7 +1104,7 @@ func TestAMQPCloseLinkTimeout_Sender(t *testing.T) {

// now close the links. We've made it so the receiver will cancel the context, as if the user
// interrupted the close. This will end up closing the connection as well.
rk := links.CloseIfNeeded(userCtx, &amqp.DetachError{})
rk := links.CloseIfNeeded(userCtx, &amqp.LinkError{})

require.Contains(t, getLogs(), "[azsb.Conn] Connection closed instead. Link closing has timed out.")

Expand Down Expand Up @@ -1164,9 +1162,8 @@ func newAMQPLinksForTest(t *testing.T, mockDataOptions emulation.MockDataOptions
// we use when we create them with the azservicebus.Receiver/Sender.
func newLinksForAMQPLinksTest(entityPath string, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error) {
receiverOpts := &amqp.ReceiverOptions{
SettlementMode: amqp.ModeSecond.Ptr(),
ManualCredits: true,
Credit: 1000,
SettlementMode: amqp.ReceiverSettleModeSecond.Ptr(),
Credit: -1,
}

receiver, err := session.NewReceiver(context.Background(), entityPath, receiverOpts)
Expand All @@ -1179,8 +1176,8 @@ func newLinksForAMQPLinksTest(entityPath string, session amqpwrap.AMQPSession) (
context.Background(),
entityPath,
&amqp.SenderOptions{
SettlementMode: amqp.ModeMixed.Ptr(),
RequestedReceiverSettleMode: amqp.ModeFirst.Ptr(),
SettlementMode: amqp.SenderSettleModeMixed.Ptr(),
RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(),
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/amqp_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (r *FakeAMQPReceiver) Prefetched() *amqp.Message {

// Receive returns the next result from ReceiveResults or, if the ReceiveResults
// is empty, will block on ctx.Done().
func (r *FakeAMQPReceiver) Receive(ctx context.Context) (*amqp.Message, error) {
func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) {
r.ReceiveCalled++

select {
Expand Down
Loading

0 comments on commit 880fad4

Please sign in to comment.