Skip to content

Commit

Permalink
[azeventhubs,azservicebus] More recovery fixes (Azure#20528)
Browse files Browse the repository at this point in the history
Fixes for both EH and SB in recovery and also in how some errors are surfaced.

The main changes:
- go-amqp no longer handles sessions or sender/receiver creations being cancelled, so that has shifted into SB and EH. If NewSession, NewReceiver or NewSender or (Session|Receiver|Sender).Close() are cancelled then we consider that a connection level issue and reset it immediately (or at the next recovery point)
- Timeouts have been added around closing of old links. Combined with fixes from go-amqp we shouldn't have hangs on close.
- Connection-level clients (ProducerClient, ConsumerClient and azservicebus.Client) no longer return errors if the failure is in amqp.Conn.Close(). The errors were misleading at best or useless since the connection they applied to had been closed. We do log it, but it's no longer returned in the API.
  • Loading branch information
richardpark-msft authored Apr 1, 2023
1 parent d0dc9e9 commit a6b6039
Show file tree
Hide file tree
Showing 24 changed files with 2,331 additions and 585 deletions.
2 changes: 2 additions & 0 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
### Bugs Fixed

- Authentication errors could cause unnecessary retries, making calls taking longer to fail. (PR#20450)
- Recovery now includes internal timeouts and also handles restarting a connection if AMQP primitives aren't closed cleanly.
- Latest go-amqp changes have been merged in with fixes for robustness.

### Other Changes

Expand Down
31 changes: 2 additions & 29 deletions sdk/messaging/azeventhubs/consumer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,6 @@ import (
)

func TestConsumerClient_UsingWebSockets(t *testing.T) {
const (
// NOTE: This error is coming from the `nhooyr.io/websocket` package. There's an
// open discussion here:
// https://github.com/nhooyr/websocket/discussions/380
//
// The frame it's waiting for (at this point) is the other half of the websocket CLOSE handshake.
// I wireshark'd this and confirmed that the frame does arrive, it's just not read by the local
// package. In this context, since the connection has already shut down, this is harmless.
expectedWSErr1 = "failed to close WebSocket: failed to read frame header: EOF"

// in addition, the returned error on close doesn't implement net.ErrClosed so we can also see this.
// https://github.com/nhooyr/websocket/issues/286
expectedWSErr2 = "failed to read: WebSocket closed: sent close frame: status = StatusNormalClosure and reason = \"\""
)

newWebSocketConnFn := func(ctx context.Context, args azeventhubs.WebSocketConnParams) (net.Conn, error) {
opts := &websocket.DialOptions{
Subprotocols: []string{"amqp"},
Expand All @@ -57,13 +42,7 @@ func TestConsumerClient_UsingWebSockets(t *testing.T) {
})
require.NoError(t, err)

defer func() {
err := producerClient.Close(context.Background())
require.Error(t, err)
if es := err.Error(); es != expectedWSErr1 && es != expectedWSErr2 {
t.Fatalf("unexpected error %v", err)
}
}()
defer test.RequireClose(t, producerClient)

partProps, err := producerClient.GetPartitionProperties(context.Background(), "0", nil)
require.NoError(t, err)
Expand All @@ -86,13 +65,7 @@ func TestConsumerClient_UsingWebSockets(t *testing.T) {
})
require.NoError(t, err)

defer func() {
err := consumerClient.Close(context.Background())
require.Error(t, err)
if es := err.Error(); es != expectedWSErr1 && es != expectedWSErr2 {
t.Fatalf("unexpected error %v", err)
}
}()
defer test.RequireClose(t, consumerClient)

partClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: getStartPosition(partProps),
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azeventhubs/internal/cbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClie

token, err := provider.GetToken(audience)
if err != nil {
azlog.Writef(exported.EventAuth, "Failed to get token from provider")
azlog.Writef(exported.EventAuth, "Failed to get token from provider: %s", err)
return closeLink(ctx, err)
}

Expand All @@ -77,7 +77,7 @@ func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClie
}

if _, err := link.RPC(ctx, msg); err != nil {
azlog.Writef(exported.EventAuth, "Failed to send/receive RPC message")
azlog.Writef(exported.EventAuth, "Failed to send/receive RPC message: %s", err)
return closeLink(ctx, err)
}

Expand Down
20 changes: 13 additions & 7 deletions sdk/messaging/azeventhubs/internal/links.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,22 @@ func (l *Links[LinkT]) RecoverIfNeeded(ctx context.Context, partitionID string,
ctx, cancel := l.contextWithTimeoutFn(ctx, defaultCloseTimeout)
defer cancel()

err := l.closePartitionLinkIfMatch(ctx, partitionID, lwid.Link.LinkName())
if err := l.closePartitionLinkIfMatch(ctx, partitionID, lwid.Link.LinkName()); err != nil {
azlog.Writef(exported.EventConn, "(%s) Error when cleaning up old link for link recovery: %s", lwid.String(), err)

if err != nil {
if rk := GetRecoveryKind(err); rk == RecoveryKindConn {
if GetRecoveryKind(err) == RecoveryKindConn {
log.Writef(exported.EventConn, "Upgrading to connection reset for recovery instead of link")

if err := l.ns.Recover(ctx, lwid.ConnID); err != nil {
log.Writef(exported.EventConn, "failed to recover connection: %s", err.Error())

// we still need the next recovery to attempt a connection level recovery
return amqpwrap.ErrConnResetNeeded
}
} else {
log.Writef(exported.EventConn, "failed to recreate link: %s", err.Error())
return err
}

// we don't need to propagate this error - it'll just be the link detach error or whatever
// caused the link to detach (for instance, if the Event Hub itself has been Disabled).
azlog.Writef(exported.EventConn, "(%s) Error when cleaning up old link for link recovery: %s", lwid.String(), err)
}

return nil
Expand Down
192 changes: 192 additions & 0 deletions sdk/messaging/azeventhubs/internal/links_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package internal
import (
"context"
"fmt"
"net"
"testing"
"time"

Expand Down Expand Up @@ -65,3 +66,194 @@ func TestLinksCBSLinkStillOpen(t *testing.T) {
require.NoError(t, err)
require.Equal(t, oldConnID+1, lwid.ConnID, "Connection gets incremented since it had to be reset")
}

func TestLinksRecoverLinkWithConnectionFailure(t *testing.T) {
ns, links := newLinksForTest(t)
defer test.RequireClose(t, links)
defer test.RequireNSClose(t, ns)

oldLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

// cause a connection level failure by closing the connection out from underneath
// this.
origConn, _, err := ns.GetAMQPClientImpl(context.Background())
require.NoError(t, err)
err = origConn.Close()
require.NoError(t, err)

err = oldLWID.Link.Send(context.Background(), &amqp.Message{}, nil)
require.Error(t, err)
require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))

// now recover like normal
err = links.RecoverIfNeeded(context.Background(), "0", oldLWID, err)
require.NoError(t, err)

newLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

requireNewLinkNewConn(t, oldLWID, newLWID)

err = newLWID.Link.Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("hello world")},
}, nil)
require.NoError(t, err)
}

// TestLinksRecoverLinkWithConnectionFailureAndExpiredContext checks that we're able to recover
// after a "partial" recovery, where the user or the passed in context was already cancelled. The
// recovery, in those cases, should leave us in a state that the next call to GetLinks()
// will reinstantiate everything.
func TestLinksRecoverLinkWithConnectionFailureAndExpiredContext(t *testing.T) {
ns, links := newLinksForTest(t)
defer test.RequireClose(t, links)
defer test.RequireNSClose(t, ns)

oldLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

// cause a connection level failure by closing the connection out from underneath
// this.
origConn, _, err := ns.GetAMQPClientImpl(context.Background())
require.NoError(t, err)
err = origConn.Close()
require.NoError(t, err)

err = oldLWID.Link.Send(context.Background(), &amqp.Message{}, nil)
require.Error(t, err)
require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))

// Try to recover, but using an expired context. We'll get a network error (not enough time to resolve or
// create a connection), which would normally be a connection level recovery event.
cancelledCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
defer cancel()

err = links.RecoverIfNeeded(cancelledCtx, "0", oldLWID, err)
var netErr net.Error
require.ErrorAs(t, err, &netErr)

// now recover like normal
err = links.RecoverIfNeeded(context.Background(), "0", oldLWID, err)
require.NoError(t, err)

newLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

requireNewLinkNewConn(t, oldLWID, newLWID)

err = newLWID.Link.Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("hello world")},
}, nil)
require.NoError(t, err)
}

func TestLinkFailureUpgradedToConnectionError(t *testing.T) {
ns, links := newLinksForTest(t)
defer test.RequireClose(t, links)
defer test.RequireNSClose(t, ns)

oldLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

// cause a connection level failure by closing the connection out from underneath
// this.
origConn, _, err := ns.GetAMQPClientImpl(context.Background())
require.NoError(t, err)
err = origConn.Close()
require.NoError(t, err)

err = oldLWID.Link.Send(context.Background(), &amqp.Message{}, nil)
require.Error(t, err)
require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))

getLogsFn := test.CaptureLogsForTest()

// NOTE: this is the key diff between the connection level test and ours - we induce a failure at the connection level _but_ we pretend to recover a link level error.
err = links.RecoverIfNeeded(context.Background(), "0", oldLWID, &amqp.LinkError{})
require.NoError(t, err)

logs := getLogsFn()
require.Contains(t, logs, "[azeh.Conn] Upgrading to connection reset for recovery instead of link")

newLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

requireNewLinkNewConn(t, oldLWID, newLWID)

err = newLWID.Link.Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("hello world")},
}, nil)
require.NoError(t, err)
}

func TestLinkFailure(t *testing.T) {
ns, links := newLinksForTest(t)
defer test.RequireClose(t, links)
defer test.RequireNSClose(t, ns)

oldLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

// close the Receiver out from under the Links
err = oldLWID.Link.Close(context.Background())
require.NoError(t, err)

err = oldLWID.Link.Send(context.Background(), &amqp.Message{Value: "hello"}, nil)
require.Error(t, err)
require.Equal(t, RecoveryKindLink, GetRecoveryKind(err))

// we only close the link here, it actually opens up on the next time we call links.Get()
cancelledCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
defer cancel()

err = links.RecoverIfNeeded(cancelledCtx, "0", oldLWID, err)
require.NoError(t, err)

newLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

requireNewLinkSameConn(t, oldLWID, newLWID)
}

func requireNewLinkSameConn(t *testing.T, oldLWID *LinkWithID[AMQPSenderCloser], newLWID *LinkWithID[AMQPSenderCloser]) {
t.Helper()
require.NotEqual(t, oldLWID.Link.LinkName(), newLWID.Link.LinkName(), "Link should have a new ID because it was recreated")
require.Equal(t, oldLWID.ConnID, newLWID.ConnID, "Connection ID should be the same since recreation wasn't needed")
}

func requireNewLinkNewConn(t *testing.T, oldLWID *LinkWithID[AMQPSenderCloser], newLWID *LinkWithID[AMQPSenderCloser]) {
t.Helper()
require.NotEqual(t, oldLWID.Link.LinkName(), newLWID.Link.LinkName(), "Link should have a new ID because it was recreated")
require.Equal(t, oldLWID.ConnID+1, newLWID.ConnID, "Connection ID should be recreated")
}

func newLinksForTest(t *testing.T) (*Namespace, *Links[amqpwrap.AMQPSenderCloser]) {
testParams := test.GetConnectionParamsForTest(t)
ns, err := NewNamespace(NamespaceWithConnectionString(testParams.ConnectionString))
require.NoError(t, err)

links := NewLinks(ns, fmt.Sprintf("%s/$management", testParams.EventHubName), func(partitionID string) string {
return fmt.Sprintf("%s/Partitions/%s", testParams.EventHubName, partitionID)
}, func(ctx context.Context, session amqpwrap.AMQPSession, entityPath string) (AMQPSenderCloser, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return session.NewSender(ctx, entityPath, &amqp.SenderOptions{
SettlementMode: to.Ptr(amqp.SenderSettleModeMixed),
RequestedReceiverSettleMode: to.Ptr(amqp.ReceiverSettleModeFirst),
})
}
})

err = links.Retry(context.Background(), exported.EventConn, "test", "0", exported.RetryOptions{
RetryDelay: -1,
MaxRetryDelay: time.Millisecond,
}, func(ctx context.Context, innerLWID LinkWithID[AMQPSenderCloser]) error {
return nil
})
require.NoError(t, err)

return ns, links
}
Loading

0 comments on commit a6b6039

Please sign in to comment.