From aa8f10c87797beccd284be125be22deac5a6f819 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Mon, 1 May 2023 23:11:41 +0000 Subject: [PATCH] Return an error when you try to send a message that's too large. This now works just like the message batch - you'll get an ErrMessageTooLarge if you attempt to send a message that's too large for the link's configured size. NOTE: there's a patch to `internal/go-amqp/Sender.go` to match what's in go-amqp's main so it returns a programmatically useful error when the message is too large. Fixes #20647 --- .../azservicebus/internal/go-amqp/sender.go | 10 ++++- sdk/messaging/azservicebus/message_batch.go | 3 +- sdk/messaging/azservicebus/sender.go | 15 +++++-- sdk/messaging/azservicebus/sender_test.go | 44 +++++++++++++++++++ 4 files changed, 66 insertions(+), 6 deletions(-) diff --git a/sdk/messaging/azservicebus/internal/go-amqp/sender.go b/sdk/messaging/azservicebus/internal/go-amqp/sender.go index dcfb4dc557be..afe17e7f8e1c 100644 --- a/sdk/messaging/azservicebus/internal/go-amqp/sender.go +++ b/sdk/messaging/azservicebus/internal/go-amqp/sender.go @@ -101,7 +101,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader ) if len(msg.DeliveryTag) > maxDeliveryTagLength { - return nil, fmt.Errorf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag)) + return nil, &Error{ + Condition: ErrCondMessageSizeExceeded, + Description: fmt.Sprintf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag)), + } } s.mu.Lock() @@ -114,7 +117,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha } if s.l.maxMessageSize != 0 && uint64(s.buf.Len()) > s.l.maxMessageSize { - return nil, fmt.Errorf("encoded message size exceeds max of %d", s.l.maxMessageSize) + return nil, &Error{ + Condition: ErrCondMessageSizeExceeded, + Description: fmt.Sprintf("encoded message size exceeds max of %d", s.l.maxMessageSize), + } } senderSettled := senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeSettled diff --git a/sdk/messaging/azservicebus/message_batch.go b/sdk/messaging/azservicebus/message_batch.go index fd33a0195ff5..1fde3d4bd761 100644 --- a/sdk/messaging/azservicebus/message_batch.go +++ b/sdk/messaging/azservicebus/message_batch.go @@ -12,7 +12,8 @@ import ( ) // ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.Add() -var ErrMessageTooLarge = errors.New("the message could not be added because it is too large for the batch") +// or if the message is being sent on its own and is too large for the link. +var ErrMessageTooLarge = errors.New("the message is too large") type ( // MessageBatch represents a batch of messages to send to Service Bus in a single message diff --git a/sdk/messaging/azservicebus/sender.go b/sdk/messaging/azservicebus/sender.go index 1c549c8094df..24e18ab23f58 100644 --- a/sdk/messaging/azservicebus/sender.go +++ b/sdk/messaging/azservicebus/sender.go @@ -5,6 +5,7 @@ package azservicebus import ( "context" + "errors" "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" @@ -33,7 +34,7 @@ type MessageBatchOptions struct { // NewMessageBatch can be used to create a batch that contain multiple // messages. Sending a batch of messages is more efficient than sending the // messages one at a time. -// If the operation fails it can return an *azservicebus.Error type if the failure is actionable. +// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error) { var batch *MessageBatch @@ -61,7 +62,9 @@ type SendMessageOptions struct { } // SendMessage sends a Message to a queue or topic. -// If the operation fails it can return an *azservicebus.Error type if the failure is actionable. +// If the operation fails it can return: +// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size. +// - An [*azservicebus.Error] type if the failure is actionable. func (s *Sender) SendMessage(ctx context.Context, message *Message, options *SendMessageOptions) error { return s.sendMessage(ctx, message) } @@ -74,7 +77,9 @@ type SendAMQPAnnotatedMessageOptions struct { // SendAMQPAnnotatedMessage sends an AMQPMessage to a queue or topic. // Using an AMQPMessage allows for advanced use cases, like payload encoding, as well as better // interoperability with pure AMQP clients. -// If the operation fails it can return an *azservicebus.Error type if the failure is actionable. +// If the operation fails it can return: +// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size. +// - An [*azservicebus.Error] type if the failure is actionable. func (s *Sender) SendAMQPAnnotatedMessage(ctx context.Context, message *AMQPAnnotatedMessage, options *SendAMQPAnnotatedMessageOptions) error { return s.sendMessage(ctx, message) } @@ -171,6 +176,10 @@ func (s *Sender) sendMessage(ctx context.Context, message amqpCompatibleMessage) return lwid.Sender.Send(ctx, message.toAMQPMessage(), nil) }, RetryOptions(s.retryOptions)) + if amqpErr := (*amqp.Error)(nil); errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrCondMessageSizeExceeded { + return ErrMessageTooLarge + } + return internal.TransformError(err) } diff --git a/sdk/messaging/azservicebus/sender_test.go b/sdk/messaging/azservicebus/sender_test.go index 0c0892a55b28..e3648f8f9148 100644 --- a/sdk/messaging/azservicebus/sender_test.go +++ b/sdk/messaging/azservicebus/sender_test.go @@ -734,3 +734,47 @@ func (rm receivedMessages) Less(i, j int) bool { func (rm receivedMessages) Swap(i, j int) { rm[i], rm[j] = rm[j], rm[i] } + +func Test_Sender_Send_MessageTooBig(t *testing.T) { + client, cleanup, queueName := setupLiveTest(t, &liveTestOptions{ + ClientOptions: &ClientOptions{ + RetryOptions: RetryOptions{ + // This is a purposefully ridiculous wait time but we'll never hit it + // because exceeding the max message size is NOT a retryable error. + RetryDelay: time.Hour, + }, + }, + QueueProperties: &admin.QueueProperties{ + EnablePartitioning: to.Ptr(true), + }}) + defer cleanup() + + sender, err := client.NewSender(queueName, nil) + require.NoError(t, err) + + hugePayload := []byte{} + + for i := 0; i < 1000*1000; i++ { + hugePayload = append(hugePayload, 100) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + err = sender.SendMessage(ctx, &Message{ + MessageID: to.Ptr("message with a message ID"), + Body: hugePayload, + }, nil) + + require.ErrorIs(t, err, ErrMessageTooLarge) + + ctx, cancel = context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + err = sender.SendAMQPAnnotatedMessage(ctx, &AMQPAnnotatedMessage{ + Body: AMQPAnnotatedMessageBody{ + Data: [][]byte{hugePayload}, + }, + }, nil) + + require.ErrorIs(t, err, ErrMessageTooLarge) +}