Skip to content

Commit

Permalink
Update azeventhubs to latest go-amqp (#20387)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhendrixMSFT authored Mar 29, 2023
1 parent 880fad4 commit b6583f3
Show file tree
Hide file tree
Showing 42 changed files with 2,646 additions and 2,361 deletions.
32 changes: 22 additions & 10 deletions sdk/messaging/azeventhubs/consumer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@ import (
)

func TestConsumerClient_UsingWebSockets(t *testing.T) {
// NOTE: This error is coming from the `nhooyr.io/websocket` package. There's an
// open discussion here:
// https://github.com/nhooyr/websocket/discussions/380
//
// The frame it's waiting for (at this point) is the other half of the websocket CLOSE handshake.
// I wireshark'd this and confirmed that the frame does arrive, it's just not read by the local
// package. In this context, since the connection has already shut down, this is harmless.
var expectedWSErr = "failed to close WebSocket: failed to read frame header: EOF"
const (
// NOTE: This error is coming from the `nhooyr.io/websocket` package. There's an
// open discussion here:
// https://github.com/nhooyr/websocket/discussions/380
//
// The frame it's waiting for (at this point) is the other half of the websocket CLOSE handshake.
// I wireshark'd this and confirmed that the frame does arrive, it's just not read by the local
// package. In this context, since the connection has already shut down, this is harmless.
expectedWSErr1 = "failed to close WebSocket: failed to read frame header: EOF"

// in addition, the returned error on close doesn't implement net.ErrClosed so we can also see this.
// https://github.com/nhooyr/websocket/issues/286
expectedWSErr2 = "failed to read: WebSocket closed: sent close frame: status = StatusNormalClosure and reason = \"\""
)

newWebSocketConnFn := func(ctx context.Context, args azeventhubs.WebSocketConnParams) (net.Conn, error) {
opts := &websocket.DialOptions{
Expand All @@ -53,7 +59,10 @@ func TestConsumerClient_UsingWebSockets(t *testing.T) {

defer func() {
err := producerClient.Close(context.Background())
require.EqualError(t, err, expectedWSErr)
require.Error(t, err)
if es := err.Error(); es != expectedWSErr1 && es != expectedWSErr2 {
t.Fatalf("unexpected error %v", err)
}
}()

partProps, err := producerClient.GetPartitionProperties(context.Background(), "0", nil)
Expand All @@ -79,7 +88,10 @@ func TestConsumerClient_UsingWebSockets(t *testing.T) {

defer func() {
err := consumerClient.Close(context.Background())
require.EqualError(t, err, expectedWSErr)
require.Error(t, err)
if es := err.Error(); es != expectedWSErr1 && es != expectedWSErr2 {
t.Fatalf("unexpected error %v", err)
}
}()

partClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
Expand Down
14 changes: 7 additions & 7 deletions sdk/messaging/azeventhubs/internal/amqp_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ type FakeAMQPReceiver struct {
amqpwrap.AMQPReceiverCloser

// ActiveCredits are incremented and decremented by IssueCredit and Receive.
ActiveCredits uint32
ActiveCredits int32

// IssuedCredit just accumulates, so we can get an idea of how many credits we issued overall.
IssuedCredit []uint32

// CreditsSetFromOptions is similar to issuedCredit, but only tracks credits added in via the LinkOptions.Credit
// field (ie, enabling prefetch).
CreditsSetFromOptions uint32
CreditsSetFromOptions int32

// ManualCreditsSetFromOptions is the value of the LinkOptions.ManualCredits value.
ManualCreditsSetFromOptions bool
Expand Down Expand Up @@ -71,10 +71,10 @@ func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQ

func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error) {
sess.NS.NewReceiverCalled++
sess.NS.Receiver.ManualCreditsSetFromOptions = opts.ManualCredits
sess.NS.Receiver.ManualCreditsSetFromOptions = opts.Credit == -1
sess.NS.Receiver.CreditsSetFromOptions = opts.Credit

if !opts.ManualCredits {
if opts.Credit > 0 {
sess.NS.Receiver.ActiveCredits = opts.Credit
}

Expand All @@ -92,11 +92,11 @@ func (sess *FakeAMQPSession) Close(ctx context.Context) error {
}

func (r *FakeAMQPReceiver) Credits() uint32 {
return r.ActiveCredits
return uint32(r.ActiveCredits)
}

func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error {
r.ActiveCredits += credit
r.ActiveCredits += int32(credit)
r.IssuedCredit = append(r.IssuedCredit, credit)
return nil
}
Expand All @@ -105,7 +105,7 @@ func (r *FakeAMQPReceiver) LinkName() string {
return r.NameForLink
}

func (r *FakeAMQPReceiver) Receive(ctx context.Context) (*amqp.Message, error) {
func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) {
if len(r.Messages) > 0 {
r.ActiveCredits--
m := r.Messages[0]
Expand Down
10 changes: 5 additions & 5 deletions sdk/messaging/azeventhubs/internal/amqpwrap/amqpwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// AMQPReceiver is implemented by *amqp.Receiver
type AMQPReceiver interface {
IssueCredit(credit uint32) error
Receive(ctx context.Context) (*amqp.Message, error)
Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
Prefetched() *amqp.Message

// settlement functions
Expand All @@ -40,7 +40,7 @@ type AMQPReceiverCloser interface {

// AMQPSender is implemented by *amqp.Sender
type AMQPSender interface {
Send(ctx context.Context, msg *amqp.Message) error
Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error
MaxMessageSize() uint64
LinkName() string
}
Expand Down Expand Up @@ -84,7 +84,7 @@ type RPCResponse struct {
// It exists only so we can return AMQPSession, which itself only exists so we can
// return interfaces for AMQPSender and AMQPReceiver from AMQPSession.
type AMQPClientWrapper struct {
Inner *amqp.Client
Inner *amqp.Conn
}

func (w *AMQPClientWrapper) Close() error {
Expand Down Expand Up @@ -150,8 +150,8 @@ func (rw *AMQPReceiverWrapper) IssueCredit(credit uint32) error {
return err
}

func (rw *AMQPReceiverWrapper) Receive(ctx context.Context) (*amqp.Message, error) {
message, err := rw.inner.Receive(ctx)
func (rw *AMQPReceiverWrapper) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) {
message, err := rw.inner.Receive(ctx, o)

if err != nil {
return nil, err
Expand Down
49 changes: 25 additions & 24 deletions sdk/messaging/azeventhubs/internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TransformError(err error) error {
// there are a few errors that all boil down to "bad creds or unauthorized"
var amqpErr *amqp.Error

if errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrorUnauthorizedAccess {
if errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrCondUnauthorizedAccess {
return exported.NewError(exported.ErrorCodeUnauthorizedAccess, err)
}

Expand Down Expand Up @@ -97,7 +97,7 @@ func IsQuickRecoveryError(err error) bool {
return false
}

var de *amqp.DetachError
var de *amqp.LinkError
return errors.As(err, &de)
}

Expand All @@ -122,30 +122,30 @@ func IsDrainingError(err error) bool {
return strings.Contains(err.Error(), "link is currently draining")
}

const errorConditionLockLost = amqp.ErrorCondition("com.microsoft:message-lock-lost")
const errorConditionLockLost = amqp.ErrCond("com.microsoft:message-lock-lost")

var amqpConditionsToRecoveryKind = map[amqp.ErrorCondition]RecoveryKind{
var amqpConditionsToRecoveryKind = map[amqp.ErrCond]RecoveryKind{
// no recovery needed, these are temporary errors.
amqp.ErrorCondition("com.microsoft:server-busy"): RecoveryKindNone,
amqp.ErrorCondition("com.microsoft:timeout"): RecoveryKindNone,
amqp.ErrorCondition("com.microsoft:operation-cancelled"): RecoveryKindNone,
amqp.ErrCond("com.microsoft:server-busy"): RecoveryKindNone,
amqp.ErrCond("com.microsoft:timeout"): RecoveryKindNone,
amqp.ErrCond("com.microsoft:operation-cancelled"): RecoveryKindNone,

// Link recovery needed
amqp.ErrorDetachForced: RecoveryKindLink, // "amqp:link:detach-forced"
amqp.ErrorTransferLimitExceeded: RecoveryKindLink, // "amqp:link:transfer-limit-exceeded"
amqp.ErrCondDetachForced: RecoveryKindLink, // "amqp:link:detach-forced"
amqp.ErrCondTransferLimitExceeded: RecoveryKindLink, // "amqp:link:transfer-limit-exceeded"

// Connection recovery needed
amqp.ErrorConnectionForced: RecoveryKindConn, // "amqp:connection:forced"
amqp.ErrorInternalError: RecoveryKindConn, // "amqp:internal-error"
amqp.ErrCondConnectionForced: RecoveryKindConn, // "amqp:connection:forced"
amqp.ErrCondInternalError: RecoveryKindConn, // "amqp:internal-error"

// No recovery possible - this operation is non retriable.
amqp.ErrorMessageSizeExceeded: RecoveryKindFatal, // "amqp:link:message-size-exceeded"
amqp.ErrorUnauthorizedAccess: RecoveryKindFatal, // creds are bad
amqp.ErrorNotFound: RecoveryKindFatal, // "amqp:not-found"
amqp.ErrorNotAllowed: RecoveryKindFatal, // "amqp:not-allowed"
amqp.ErrorCondition("com.microsoft:entity-disabled"): RecoveryKindFatal, // entity is disabled in the portal
amqp.ErrorCondition("com.microsoft:session-cannot-be-locked"): RecoveryKindFatal,
errorConditionLockLost: RecoveryKindFatal,
amqp.ErrCondMessageSizeExceeded: RecoveryKindFatal, // "amqp:link:message-size-exceeded"
amqp.ErrCondUnauthorizedAccess: RecoveryKindFatal, // creds are bad
amqp.ErrCondNotFound: RecoveryKindFatal, // "amqp:not-found"
amqp.ErrCondNotAllowed: RecoveryKindFatal, // "amqp:not-allowed"
amqp.ErrCond("com.microsoft:entity-disabled"): RecoveryKindFatal, // entity is disabled in the portal
amqp.ErrCond("com.microsoft:session-cannot-be-locked"): RecoveryKindFatal,
errorConditionLockLost: RecoveryKindFatal,
}

// GetRecoveryKind determines the recovery type for non-session based links.
Expand Down Expand Up @@ -192,15 +192,16 @@ func GetRecoveryKind(err error) RecoveryKind {
}

// check the "special" AMQP errors that aren't condition-based.
if errors.Is(err, amqp.ErrLinkClosed) || IsQuickRecoveryError(err) {
if IsQuickRecoveryError(err) {
return RecoveryKindLink
}

var connErr *amqp.ConnectionError
var connErr *amqp.ConnError
var sessionErr *amqp.SessionError

if errors.As(err, &connErr) ||
// session closures appear to leak through when the connection itself is going down.
errors.Is(err, amqp.ErrSessionClosed) {
errors.As(err, &sessionErr) {
return RecoveryKindConn
}

Expand Down Expand Up @@ -333,18 +334,18 @@ func IsNotAllowedError(err error) bool {
var e *amqp.Error

return errors.As(err, &e) &&
e.Condition == amqp.ErrorNotAllowed
e.Condition == amqp.ErrCondNotAllowed
}

func (e ErrConnectionClosed) Error() string {
return fmt.Sprintf("the connection has been closed: %s", string(e))
}

func IsOwnershipLostError(err error) bool {
var de *amqp.DetachError
var de *amqp.LinkError

if errors.As(err, &de) {
return de.RemoteError != nil && de.RemoteError.Condition == "amqp:link:stolen"
return de.RemoteErr != nil && de.RemoteErr.Condition == "amqp:link:stolen"
}

return false
Expand Down
26 changes: 13 additions & 13 deletions sdk/messaging/azeventhubs/internal/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
)

func TestOwnershipLost(t *testing.T) {
detachErr := &amqp.DetachError{
RemoteError: &amqp.Error{
Condition: amqp.ErrorCondition("amqp:link:stolen"),
detachErr := &amqp.LinkError{
RemoteErr: &amqp.Error{
Condition: amqp.ErrCond("amqp:link:stolen"),
},
}

Expand All @@ -32,15 +32,15 @@ func TestOwnershipLost(t *testing.T) {
require.ErrorAs(t, transformedErr, &err)
require.Equal(t, exported.ErrorCodeOwnershipLost, err.Code)

require.False(t, IsOwnershipLostError(&amqp.DetachError{}))
require.False(t, IsOwnershipLostError(&amqp.ConnectionError{}))
require.False(t, IsOwnershipLostError(&amqp.LinkError{}))
require.False(t, IsOwnershipLostError(&amqp.ConnError{}))
require.False(t, IsOwnershipLostError(errors.New("definitely not an ownership lost error")))
}

func TestGetRecoveryKind(t *testing.T) {
require.Equal(t, GetRecoveryKind(nil), RecoveryKindNone)
require.Equal(t, GetRecoveryKind(errConnResetNeeded), RecoveryKindConn)
require.Equal(t, GetRecoveryKind(&amqp.DetachError{}), RecoveryKindLink)
require.Equal(t, GetRecoveryKind(&amqp.LinkError{}), RecoveryKindLink)
require.Equal(t, GetRecoveryKind(context.Canceled), RecoveryKindFatal)
require.Equal(t, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusUnauthorized}}), RecoveryKindFatal)
require.Equal(t, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusNotFound}}), RecoveryKindFatal)
Expand All @@ -49,9 +49,9 @@ func TestGetRecoveryKind(t *testing.T) {
func Test_TransformError(t *testing.T) {
var asExportedErr *exported.Error

err := TransformError(&amqp.DetachError{
RemoteError: &amqp.Error{
Condition: amqp.ErrorCondition("amqp:link:stolen"),
err := TransformError(&amqp.LinkError{
RemoteErr: &amqp.Error{
Condition: amqp.ErrCond("amqp:link:stolen"),
},
})
require.ErrorAs(t, err, &asExportedErr)
Expand All @@ -61,7 +61,7 @@ func Test_TransformError(t *testing.T) {
require.ErrorAs(t, err, &asExportedErr)
require.Equal(t, exported.ErrorCodeUnauthorizedAccess, asExportedErr.Code)

err = TransformError(&amqp.Error{Condition: amqp.ErrorUnauthorizedAccess})
err = TransformError(&amqp.Error{Condition: amqp.ErrCondUnauthorizedAccess})
require.ErrorAs(t, err, &asExportedErr)
require.Equal(t, exported.ErrorCodeUnauthorizedAccess, asExportedErr.Code)

Expand All @@ -75,14 +75,14 @@ func Test_TransformError(t *testing.T) {
require.False(t, errors.As(err, &asExportedErr))

// sanity check, an RPCError but it's not a azservicebus.Code type error.
err = TransformError(&amqp.Error{Condition: amqp.ErrorNotFound})
err = TransformError(&amqp.Error{Condition: amqp.ErrCondNotFound})
require.False(t, errors.As(err, &asExportedErr))

err = TransformError(amqp.ErrLinkClosed)
err = TransformError(&amqp.LinkError{})
require.ErrorAs(t, err, &asExportedErr)
require.Equal(t, exported.ErrorCodeConnectionLost, asExportedErr.Code)

err = TransformError(&amqp.ConnectionError{})
err = TransformError(&amqp.ConnError{})
require.ErrorAs(t, err, &asExportedErr)
require.Equal(t, exported.ErrorCodeConnectionLost, asExportedErr.Code)

Expand Down
Loading

0 comments on commit b6583f3

Please sign in to comment.