Skip to content

Commit

Permalink
EH/SB remove Message.LinkName (Azure#20520)
Browse files Browse the repository at this point in the history
One more change from go-amqp to pick up.
  • Loading branch information
jhendrixMSFT authored Mar 29, 2023
1 parent 3ae758d commit 9b0eb84
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 42 deletions.
13 changes: 2 additions & 11 deletions sdk/messaging/azeventhubs/internal/go-amqp/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ type Message struct {
// encryption details).
Footer Annotations

rcvr *Receiver // the receiving link
deliveryID uint32 // used when sending disposition
settled bool // whether transfer was settled by sender
deliveryID uint32 // used when sending disposition
settled bool // whether transfer was settled by sender
}

// NewMessage returns a *Message with data as the payload.
Expand All @@ -127,14 +126,6 @@ func (m *Message) GetData() []byte {
return m.Data[0]
}

// LinkName returns the receiving link name or the empty string.
func (m *Message) LinkName() string {
if m.rcvr != nil {
return m.rcvr.l.key.name
}
return ""
}

// MarshalBinary encodes the message into binary form.
func (m *Message) MarshalBinary() ([]byte, error) {
buf := &buffer.Buffer{}
Expand Down
2 changes: 0 additions & 2 deletions sdk/messaging/azeventhubs/internal/go-amqp/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (r *Receiver) Prefetched() *Message {
}

debug.Log(3, "RX (Receiver): prefetched delivery ID %d", msg.deliveryID)
msg.rcvr = r

if msg.settled {
r.onSettlement(1)
Expand Down Expand Up @@ -121,7 +120,6 @@ func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message,
msg := q.Dequeue()
debug.Assert(msg != nil)
debug.Log(3, "RX (Receiver): received delivery ID %d", msg.deliveryID)
msg.rcvr = r
r.messagesQ.Release(q)
if msg.settled {
r.onSettlement(1)
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/amqp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func copyAnnotations(src map[any]any) amqp.Annotations {
return dest
}

func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage {
func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message, receivingLinkName string) *AMQPAnnotatedMessage {
var header *AMQPAnnotatedMessageHeader

if goAMQPMessage.Header != nil {
Expand Down Expand Up @@ -273,7 +273,7 @@ func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage
DeliveryTag: goAMQPMessage.DeliveryTag,
Footer: footer,
Header: header,
linkName: goAMQPMessage.LinkName(),
linkName: receivingLinkName,
Properties: properties,
inner: goAMQPMessage,
}
Expand Down
13 changes: 2 additions & 11 deletions sdk/messaging/azservicebus/internal/go-amqp/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ type Message struct {
// encryption details).
Footer Annotations

rcvr *Receiver // the receiving link
deliveryID uint32 // used when sending disposition
settled bool // whether transfer was settled by sender
deliveryID uint32 // used when sending disposition
settled bool // whether transfer was settled by sender
}

// NewMessage returns a *Message with data as the payload.
Expand All @@ -127,14 +126,6 @@ func (m *Message) GetData() []byte {
return m.Data[0]
}

// LinkName returns the receiving link name or the empty string.
func (m *Message) LinkName() string {
if m.rcvr != nil {
return m.rcvr.l.key.name
}
return ""
}

// MarshalBinary encodes the message into binary form.
func (m *Message) MarshalBinary() ([]byte, error) {
buf := &buffer.Buffer{}
Expand Down
2 changes: 0 additions & 2 deletions sdk/messaging/azservicebus/internal/go-amqp/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (r *Receiver) Prefetched() *Message {
}

debug.Log(3, "RX (Receiver): prefetched delivery ID %d", msg.deliveryID)
msg.rcvr = r

if msg.settled {
r.onSettlement(1)
Expand Down Expand Up @@ -121,7 +120,6 @@ func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message,
msg := q.Dequeue()
debug.Assert(msg != nil)
debug.Log(3, "RX (Receiver): received delivery ID %d", msg.deliveryID)
msg.rcvr = r
r.messagesQ.Release(q)
if msg.settled {
r.onSettlement(1)
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ func (m *Message) toAMQPMessage() *amqp.Message {
// newReceivedMessage creates a received message from an AMQP message.
// NOTE: this converter assumes that the Body of this message will be the first
// serialized byte array in the Data section of the messsage.
func newReceivedMessage(amqpMsg *amqp.Message) *ReceivedMessage {
func newReceivedMessage(amqpMsg *amqp.Message, receivingLinkName string) *ReceivedMessage {
msg := &ReceivedMessage{
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg),
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg, receivingLinkName),
State: MessageStateActive,
}

Expand Down
18 changes: 9 additions & 9 deletions sdk/messaging/azservicebus/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestMessageUnitTest(t *testing.T) {
func TestAMQPMessageToReceivedMessage(t *testing.T) {
t.Run("empty_message", func(t *testing.T) {
// nothing should blow up.
rm := newReceivedMessage(&amqp.Message{})
rm := newReceivedMessage(&amqp.Message{}, "receiving_link")
require.NotNil(t, rm)
})

Expand All @@ -73,7 +73,7 @@ func TestAMQPMessageToReceivedMessage(t *testing.T) {
},
}

receivedMessage := newReceivedMessage(amqpMessage)
receivedMessage := newReceivedMessage(amqpMessage, "receiving_link")

require.Equal(t, []byte("hello"), receivedMessage.Body)
require.EqualValues(t, lockedUntil, *receivedMessage.LockedUntil)
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestAMQPMessageToMessage(t *testing.T) {
Data: [][]byte{[]byte("foo")},
}

msg := newReceivedMessage(amqpMsg)
msg := newReceivedMessage(amqpMsg, "receiving_link")

require.EqualValues(t, msg.MessageID, amqpMsg.Properties.MessageID, "messageID")
require.EqualValues(t, msg.SessionID, amqpMsg.Properties.GroupID, "groupID")
Expand Down Expand Up @@ -179,40 +179,40 @@ func TestMessageState(t *testing.T) {
Annotations: amqp.Annotations{
messageStateAnnotation: td.PropValue,
},
})
}, "receiving_link")
require.EqualValues(t, td.Expected, m.State)
})
}

t.Run("NoAnnotations", func(t *testing.T) {
m := newReceivedMessage(&amqp.Message{
Annotations: nil,
})
}, "receiving_link")
require.EqualValues(t, MessageStateActive, m.State)
})
}

func TestMessageWithIncorrectBody(t *testing.T) {
// these are cases where the simple ReceivedMessage can't represent the AMQP message's
// payload.
message := newReceivedMessage(&amqp.Message{})
message := newReceivedMessage(&amqp.Message{}, "receiving_link")
require.Nil(t, message.Body)

message = newReceivedMessage(&amqp.Message{
Value: "hello",
})
}, "receiving_link")
require.Nil(t, message.Body)

message = newReceivedMessage(&amqp.Message{
Sequence: [][]any{},
})
}, "receiving_link")
require.Nil(t, message.Body)

message = newReceivedMessage(&amqp.Message{
Data: [][]byte{
[]byte("hello"),
[]byte("world"),
},
})
}, "receiving_link")
require.Nil(t, message.Body)
}
6 changes: 3 additions & 3 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers
}

for _, amqpMsg := range amqpMessages {
receivedMsg := newReceivedMessage(amqpMsg)
receivedMsg := newReceivedMessage(amqpMsg, lwid.Receiver.LinkName())
receivedMsg.deferred = true

receivedMessages = append(receivedMessages, receivedMsg)
Expand Down Expand Up @@ -279,7 +279,7 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option
receivedMessages = make([]*ReceivedMessage, len(messages))

for i := 0; i < len(messages); i++ {
receivedMessages[i] = newReceivedMessage(messages[i])
receivedMessages[i] = newReceivedMessage(messages[i], links.Receiver.LinkName())
}

if len(receivedMessages) > 0 && updateInternalSequenceNumber {
Expand Down Expand Up @@ -440,7 +440,7 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt
var receivedMessages []*ReceivedMessage

for _, msg := range result.Messages {
receivedMessages = append(receivedMessages, newReceivedMessage(msg))
receivedMessages = append(receivedMessages, newReceivedMessage(msg, linksWithID.Receiver.LinkName()))
}

return receivedMessages, nil
Expand Down

0 comments on commit 9b0eb84

Please sign in to comment.