From 4135613cc37eaf3172548a9c88a55ea047a8c24c Mon Sep 17 00:00:00 2001 From: Joel Hendrix Date: Wed, 29 Mar 2023 14:31:18 -0700 Subject: [PATCH] EH/SB remove Message.LinkName One more change from go-amqp to pick up. --- .../azeventhubs/internal/go-amqp/message.go | 13 ++----------- .../azeventhubs/internal/go-amqp/receiver.go | 2 -- sdk/messaging/azservicebus/amqp_message.go | 4 ++-- .../azservicebus/internal/go-amqp/message.go | 13 ++----------- .../azservicebus/internal/go-amqp/receiver.go | 2 -- sdk/messaging/azservicebus/message.go | 4 ++-- sdk/messaging/azservicebus/message_test.go | 18 +++++++++--------- sdk/messaging/azservicebus/receiver.go | 6 +++--- 8 files changed, 20 insertions(+), 42 deletions(-) diff --git a/sdk/messaging/azeventhubs/internal/go-amqp/message.go b/sdk/messaging/azeventhubs/internal/go-amqp/message.go index a19892fbdfc5..e82d5564d57f 100644 --- a/sdk/messaging/azeventhubs/internal/go-amqp/message.go +++ b/sdk/messaging/azeventhubs/internal/go-amqp/message.go @@ -102,9 +102,8 @@ type Message struct { // encryption details). Footer Annotations - rcvr *Receiver // the receiving link - deliveryID uint32 // used when sending disposition - settled bool // whether transfer was settled by sender + deliveryID uint32 // used when sending disposition + settled bool // whether transfer was settled by sender } // NewMessage returns a *Message with data as the payload. @@ -127,14 +126,6 @@ func (m *Message) GetData() []byte { return m.Data[0] } -// LinkName returns the receiving link name or the empty string. -func (m *Message) LinkName() string { - if m.rcvr != nil { - return m.rcvr.l.key.name - } - return "" -} - // MarshalBinary encodes the message into binary form. func (m *Message) MarshalBinary() ([]byte, error) { buf := &buffer.Buffer{} diff --git a/sdk/messaging/azeventhubs/internal/go-amqp/receiver.go b/sdk/messaging/azeventhubs/internal/go-amqp/receiver.go index b6256e3d9a9a..697e258ffdb8 100644 --- a/sdk/messaging/azeventhubs/internal/go-amqp/receiver.go +++ b/sdk/messaging/azeventhubs/internal/go-amqp/receiver.go @@ -90,7 +90,6 @@ func (r *Receiver) Prefetched() *Message { } debug.Log(3, "RX (Receiver): prefetched delivery ID %d", msg.deliveryID) - msg.rcvr = r if msg.settled { r.onSettlement(1) @@ -121,7 +120,6 @@ func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message, msg := q.Dequeue() debug.Assert(msg != nil) debug.Log(3, "RX (Receiver): received delivery ID %d", msg.deliveryID) - msg.rcvr = r r.messagesQ.Release(q) if msg.settled { r.onSettlement(1) diff --git a/sdk/messaging/azservicebus/amqp_message.go b/sdk/messaging/azservicebus/amqp_message.go index bd40f0038906..c7484257994f 100644 --- a/sdk/messaging/azservicebus/amqp_message.go +++ b/sdk/messaging/azservicebus/amqp_message.go @@ -222,7 +222,7 @@ func copyAnnotations(src map[any]any) amqp.Annotations { return dest } -func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage { +func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message, receivingLinkName string) *AMQPAnnotatedMessage { var header *AMQPAnnotatedMessageHeader if goAMQPMessage.Header != nil { @@ -273,7 +273,7 @@ func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage DeliveryTag: goAMQPMessage.DeliveryTag, Footer: footer, Header: header, - linkName: goAMQPMessage.LinkName(), + linkName: receivingLinkName, Properties: properties, inner: goAMQPMessage, } diff --git a/sdk/messaging/azservicebus/internal/go-amqp/message.go b/sdk/messaging/azservicebus/internal/go-amqp/message.go index 72e21f494361..52bef7a244b7 100644 --- a/sdk/messaging/azservicebus/internal/go-amqp/message.go +++ b/sdk/messaging/azservicebus/internal/go-amqp/message.go @@ -102,9 +102,8 @@ type Message struct { // encryption details). Footer Annotations - rcvr *Receiver // the receiving link - deliveryID uint32 // used when sending disposition - settled bool // whether transfer was settled by sender + deliveryID uint32 // used when sending disposition + settled bool // whether transfer was settled by sender } // NewMessage returns a *Message with data as the payload. @@ -127,14 +126,6 @@ func (m *Message) GetData() []byte { return m.Data[0] } -// LinkName returns the receiving link name or the empty string. -func (m *Message) LinkName() string { - if m.rcvr != nil { - return m.rcvr.l.key.name - } - return "" -} - // MarshalBinary encodes the message into binary form. func (m *Message) MarshalBinary() ([]byte, error) { buf := &buffer.Buffer{} diff --git a/sdk/messaging/azservicebus/internal/go-amqp/receiver.go b/sdk/messaging/azservicebus/internal/go-amqp/receiver.go index 42be86bf9cc7..ad7a0107fe85 100644 --- a/sdk/messaging/azservicebus/internal/go-amqp/receiver.go +++ b/sdk/messaging/azservicebus/internal/go-amqp/receiver.go @@ -90,7 +90,6 @@ func (r *Receiver) Prefetched() *Message { } debug.Log(3, "RX (Receiver): prefetched delivery ID %d", msg.deliveryID) - msg.rcvr = r if msg.settled { r.onSettlement(1) @@ -121,7 +120,6 @@ func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message, msg := q.Dequeue() debug.Assert(msg != nil) debug.Log(3, "RX (Receiver): received delivery ID %d", msg.deliveryID) - msg.rcvr = r r.messagesQ.Release(q) if msg.settled { r.onSettlement(1) diff --git a/sdk/messaging/azservicebus/message.go b/sdk/messaging/azservicebus/message.go index 1bee14da7c79..b3247fd5d355 100644 --- a/sdk/messaging/azservicebus/message.go +++ b/sdk/messaging/azservicebus/message.go @@ -312,9 +312,9 @@ func (m *Message) toAMQPMessage() *amqp.Message { // newReceivedMessage creates a received message from an AMQP message. // NOTE: this converter assumes that the Body of this message will be the first // serialized byte array in the Data section of the messsage. -func newReceivedMessage(amqpMsg *amqp.Message) *ReceivedMessage { +func newReceivedMessage(amqpMsg *amqp.Message, receivingLinkName string) *ReceivedMessage { msg := &ReceivedMessage{ - RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg), + RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg, receivingLinkName), State: MessageStateActive, } diff --git a/sdk/messaging/azservicebus/message_test.go b/sdk/messaging/azservicebus/message_test.go index 87ffdfaa14de..2db45826d68c 100644 --- a/sdk/messaging/azservicebus/message_test.go +++ b/sdk/messaging/azservicebus/message_test.go @@ -50,7 +50,7 @@ func TestMessageUnitTest(t *testing.T) { func TestAMQPMessageToReceivedMessage(t *testing.T) { t.Run("empty_message", func(t *testing.T) { // nothing should blow up. - rm := newReceivedMessage(&amqp.Message{}) + rm := newReceivedMessage(&amqp.Message{}, "receiving_link") require.NotNil(t, rm) }) @@ -73,7 +73,7 @@ func TestAMQPMessageToReceivedMessage(t *testing.T) { }, } - receivedMessage := newReceivedMessage(amqpMessage) + receivedMessage := newReceivedMessage(amqpMessage, "receiving_link") require.Equal(t, []byte("hello"), receivedMessage.Body) require.EqualValues(t, lockedUntil, *receivedMessage.LockedUntil) @@ -134,7 +134,7 @@ func TestAMQPMessageToMessage(t *testing.T) { Data: [][]byte{[]byte("foo")}, } - msg := newReceivedMessage(amqpMsg) + msg := newReceivedMessage(amqpMsg, "receiving_link") require.EqualValues(t, msg.MessageID, amqpMsg.Properties.MessageID, "messageID") require.EqualValues(t, msg.SessionID, amqpMsg.Properties.GroupID, "groupID") @@ -179,7 +179,7 @@ func TestMessageState(t *testing.T) { Annotations: amqp.Annotations{ messageStateAnnotation: td.PropValue, }, - }) + }, "receiving_link") require.EqualValues(t, td.Expected, m.State) }) } @@ -187,7 +187,7 @@ func TestMessageState(t *testing.T) { t.Run("NoAnnotations", func(t *testing.T) { m := newReceivedMessage(&amqp.Message{ Annotations: nil, - }) + }, "receiving_link") require.EqualValues(t, MessageStateActive, m.State) }) } @@ -195,17 +195,17 @@ func TestMessageState(t *testing.T) { func TestMessageWithIncorrectBody(t *testing.T) { // these are cases where the simple ReceivedMessage can't represent the AMQP message's // payload. - message := newReceivedMessage(&amqp.Message{}) + message := newReceivedMessage(&amqp.Message{}, "receiving_link") require.Nil(t, message.Body) message = newReceivedMessage(&amqp.Message{ Value: "hello", - }) + }, "receiving_link") require.Nil(t, message.Body) message = newReceivedMessage(&amqp.Message{ Sequence: [][]any{}, - }) + }, "receiving_link") require.Nil(t, message.Body) message = newReceivedMessage(&amqp.Message{ @@ -213,6 +213,6 @@ func TestMessageWithIncorrectBody(t *testing.T) { []byte("hello"), []byte("world"), }, - }) + }, "receiving_link") require.Nil(t, message.Body) } diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index 14d4cb254ebc..59b5b48db698 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -229,7 +229,7 @@ func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers } for _, amqpMsg := range amqpMessages { - receivedMsg := newReceivedMessage(amqpMsg) + receivedMsg := newReceivedMessage(amqpMsg, lwid.Receiver.LinkName()) receivedMsg.deferred = true receivedMessages = append(receivedMessages, receivedMsg) @@ -279,7 +279,7 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option receivedMessages = make([]*ReceivedMessage, len(messages)) for i := 0; i < len(messages); i++ { - receivedMessages[i] = newReceivedMessage(messages[i]) + receivedMessages[i] = newReceivedMessage(messages[i], links.Receiver.LinkName()) } if len(receivedMessages) > 0 && updateInternalSequenceNumber { @@ -440,7 +440,7 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt var receivedMessages []*ReceivedMessage for _, msg := range result.Messages { - receivedMessages = append(receivedMessages, newReceivedMessage(msg)) + receivedMessages = append(receivedMessages, newReceivedMessage(msg, linksWithID.Receiver.LinkName())) } return receivedMessages, nil