Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Return ErrMessageTooLarge if a message is too big to send #20721

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions sdk/messaging/azservicebus/internal/go-amqp/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
)
if len(msg.DeliveryTag) > maxDeliveryTagLength {
return nil, fmt.Errorf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag))
return nil, &Error{
Condition: ErrCondMessageSizeExceeded,
Description: fmt.Sprintf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag)),
}
}

s.mu.Lock()
Expand All @@ -114,7 +117,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
}

if s.l.maxMessageSize != 0 && uint64(s.buf.Len()) > s.l.maxMessageSize {
return nil, fmt.Errorf("encoded message size exceeds max of %d", s.l.maxMessageSize)
return nil, &Error{
Condition: ErrCondMessageSizeExceeded,
Description: fmt.Sprintf("encoded message size exceeds max of %d", s.l.maxMessageSize),
}
}

senderSettled := senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeSettled
Expand Down
3 changes: 2 additions & 1 deletion sdk/messaging/azservicebus/message_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

// ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.Add()
var ErrMessageTooLarge = errors.New("the message could not be added because it is too large for the batch")
// or if the message is being sent on its own and is too large for the link.
var ErrMessageTooLarge = errors.New("the message is too large")

type (
// MessageBatch represents a batch of messages to send to Service Bus in a single message
Expand Down
15 changes: 12 additions & 3 deletions sdk/messaging/azservicebus/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package azservicebus

import (
"context"
"errors"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
Expand Down Expand Up @@ -33,7 +34,7 @@ type MessageBatchOptions struct {
// NewMessageBatch can be used to create a batch that contain multiple
// messages. Sending a batch of messages is more efficient than sending the
// messages one at a time.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error) {
var batch *MessageBatch

Expand Down Expand Up @@ -61,7 +62,9 @@ type SendMessageOptions struct {
}

// SendMessage sends a Message to a queue or topic.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
// If the operation fails it can return:
// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size.
// - An [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) SendMessage(ctx context.Context, message *Message, options *SendMessageOptions) error {
return s.sendMessage(ctx, message)
}
Expand All @@ -74,7 +77,9 @@ type SendAMQPAnnotatedMessageOptions struct {
// SendAMQPAnnotatedMessage sends an AMQPMessage to a queue or topic.
// Using an AMQPMessage allows for advanced use cases, like payload encoding, as well as better
// interoperability with pure AMQP clients.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
// If the operation fails it can return:
// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size.
// - An [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) SendAMQPAnnotatedMessage(ctx context.Context, message *AMQPAnnotatedMessage, options *SendAMQPAnnotatedMessageOptions) error {
return s.sendMessage(ctx, message)
}
Expand Down Expand Up @@ -171,6 +176,10 @@ func (s *Sender) sendMessage(ctx context.Context, message amqpCompatibleMessage)
return lwid.Sender.Send(ctx, message.toAMQPMessage(), nil)
}, RetryOptions(s.retryOptions))

if amqpErr := (*amqp.Error)(nil); errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrCondMessageSizeExceeded {
return ErrMessageTooLarge
}

return internal.TransformError(err)
}

Expand Down
44 changes: 44 additions & 0 deletions sdk/messaging/azservicebus/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,3 +734,47 @@ func (rm receivedMessages) Less(i, j int) bool {
func (rm receivedMessages) Swap(i, j int) {
rm[i], rm[j] = rm[j], rm[i]
}

func Test_Sender_Send_MessageTooBig(t *testing.T) {
client, cleanup, queueName := setupLiveTest(t, &liveTestOptions{
ClientOptions: &ClientOptions{
RetryOptions: RetryOptions{
// This is a purposefully ridiculous wait time but we'll never hit it
// because exceeding the max message size is NOT a retryable error.
RetryDelay: time.Hour,
},
},
QueueProperties: &admin.QueueProperties{
EnablePartitioning: to.Ptr(true),
}})
defer cleanup()

sender, err := client.NewSender(queueName, nil)
require.NoError(t, err)

hugePayload := []byte{}

for i := 0; i < 1000*1000; i++ {
hugePayload = append(hugePayload, 100)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err = sender.SendMessage(ctx, &Message{
MessageID: to.Ptr("message with a message ID"),
Body: hugePayload,
}, nil)

require.ErrorIs(t, err, ErrMessageTooLarge)

ctx, cancel = context.WithTimeout(context.Background(), time.Minute)
defer cancel()

err = sender.SendAMQPAnnotatedMessage(ctx, &AMQPAnnotatedMessage{
Body: AMQPAnnotatedMessageBody{
Data: [][]byte{hugePayload},
},
}, nil)

require.ErrorIs(t, err, ErrMessageTooLarge)
}