diff --git a/README.md b/README.md index c969d3ed..6e9b21ad 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ func main() { { // Create a sender sender, err := session.NewSender( - amqp.LinkAddress("/queue-name"), + amqp.LinkTargetAddress("/queue-name"), ) if err != nil { log.Fatal("Creating sender link:", err) @@ -87,7 +87,7 @@ func main() { { // Create a receiver receiver, err := session.NewReceiver( - amqp.LinkAddress("/queue-name"), + amqp.LinkSourceAddress("/queue-name"), amqp.LinkCredit(10), ) if err != nil { diff --git a/client.go b/client.go index 8b7210e2..f2563c0e 100644 --- a/client.go +++ b/client.go @@ -303,7 +303,10 @@ func (s *Sender) Send(ctx context.Context, msg *Message) error { // Address returns the link's address. func (s *Sender) Address() string { - return s.link.address + if s.link.target == nil { + return "" + } + return s.link.target.Address } // Close closes the Sender and AMQP link. @@ -617,7 +620,6 @@ type link struct { name string // our name handle uint32 // our handle remoteHandle uint32 // remote's handle - address string // address sent during attach dynamicAddr bool // request a dynamic link address from the server rx chan frameBody // sessions sends frames for this link on this channel transfers chan performTransfer // sender uses for send; receiver uses for receive @@ -627,6 +629,8 @@ type link struct { doneOnce sync.Once session *Session // parent session receiver *Receiver // allows link options to modify Receiver + source *source + target *target // "The delivery-count is initialized by the sender when a link endpoint is created, // and is incremented whenever a message is sent. Only the sender MAY independently @@ -639,7 +643,6 @@ type link struct { senderSettleMode *SenderSettleMode receiverSettleMode *ReceiverSettleMode maxMessageSize uint64 - filters map[symbol]interface{} // source filters sent during attach peerMaxMessageSize uint64 detachSent bool // detach frame has been sent detachReceived bool @@ -698,21 +701,22 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) { ReceiverSettleMode: l.receiverSettleMode, SenderSettleMode: l.senderSettleMode, MaxMessageSize: l.maxMessageSize, + Source: l.source, + Target: l.target, } if isReceiver { attach.Role = roleReceiver - attach.Source = &source{ - Address: l.address, - Dynamic: l.dynamicAddr, - Filter: l.filters, + if attach.Source == nil { + attach.Source = new(source) } + attach.Source.Dynamic = l.dynamicAddr } else { attach.Role = roleSender - attach.Target = &target{ - Address: l.address, - Dynamic: l.dynamicAddr, + if attach.Target == nil { + attach.Target = new(target) } + attach.Target.Dynamic = l.dynamicAddr } // send Attach frame @@ -743,7 +747,7 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) { if isReceiver { // if dynamic address requested, copy assigned name to address if l.dynamicAddr && resp.Source != nil { - l.address = resp.Source.Address + l.source.Address = resp.Source.Address } // deliveryCount is a sequence number, must initialize to sender's initial sequence number l.deliveryCount = resp.InitialDeliveryCount @@ -755,7 +759,7 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) { } else { // if dynamic address requested, copy assigned name to address if l.dynamicAddr && resp.Target != nil { - l.address = resp.Target.Address + l.target.Address = resp.Target.Address } l.transfers = make(chan performTransfer) if resp.ReceiverSettleMode != nil { @@ -1026,9 +1030,35 @@ type LinkOption func(*link) error // // For a Receiver this configures the source address. // For a Sender this configures the target address. +// +// Deprecated: use LinkSourceAddress or LinkTargetAddress instead. func LinkAddress(source string) LinkOption { return func(l *link) error { - l.address = source + if l.receiver != nil { + return LinkSourceAddress(source)(l) + } + return LinkTargetAddress(source)(l) + } +} + +// LinkSourceAddress sets the source address. +func LinkSourceAddress(addr string) LinkOption { + return func(l *link) error { + if l.source == nil { + l.source = new(source) + } + l.source.Address = addr + return nil + } +} + +// LinkTargetAddress sets the target address. +func LinkTargetAddress(addr string) LinkOption { + return func(l *link) error { + if l.target == nil { + l.target = new(target) + } + l.target.Address = addr return nil } } @@ -1113,10 +1143,13 @@ func LinkSelectorFilter(filter string) LinkOption { const name = symbol("apache.org:selector-filter:string") code := binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x46, 0x8C, 0x00, 0x00, 0x00, 0x04}) return func(l *link) error { - if l.filters == nil { - l.filters = make(map[symbol]interface{}) + if l.source == nil { + l.source = new(source) } - l.filters[name] = describedType{ + if l.source.Filter == nil { + l.source.Filter = make(map[symbol]interface{}) + } + l.source.Filter[name] = describedType{ descriptor: code, value: filter, } @@ -1127,7 +1160,7 @@ func LinkSelectorFilter(filter string) LinkOption { // Receiver receives messages on a single AMQP link. type Receiver struct { link *link // underlying link - buf bytes.Buffer // resable buffer for decoding multi frame messages + buf bytes.Buffer // reusable buffer for decoding multi frame messages batching bool // enable batching of message dispositions batchMaxAge time.Duration // maximum time between the start n batch and sending the batch to the server dispositions chan messageDisposition // message dispositions are sent on this channel when batching is enabled @@ -1204,7 +1237,10 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) { // Address returns the link's address. func (r *Receiver) Address() string { - return r.link.address + if r.link.source == nil { + return "" + } + return r.link.source.Address } // Close closes the Receiver and AMQP link. diff --git a/example_test.go b/example_test.go index b10188cd..2b10fc1f 100644 --- a/example_test.go +++ b/example_test.go @@ -31,7 +31,7 @@ func Example() { { // Create a sender sender, err := session.NewSender( - amqp.LinkAddress("/queue-name"), + amqp.LinkTargetAddress("/queue-name"), ) if err != nil { log.Fatal("Creating sender link:", err) @@ -55,7 +55,7 @@ func Example() { { // Create a receiver receiver, err := session.NewReceiver( - amqp.LinkAddress("/queue-name"), + amqp.LinkSourceAddress("/queue-name"), amqp.LinkCredit(10), ) if err != nil { diff --git a/fuzz.go b/fuzz.go index e9a0c4de..6bce8870 100644 --- a/fuzz.go +++ b/fuzz.go @@ -26,7 +26,7 @@ func FuzzConn(data []byte) int { return 0 } - r, err := s.NewReceiver(LinkAddress("source"), LinkCredit(2)) + r, err := s.NewReceiver(LinkSourceAddress("source"), LinkCredit(2)) if err != nil { return 0 } @@ -57,7 +57,7 @@ func FuzzConn(data []byte) int { return 0 } - sender, err := s.NewSender(LinkAddress("source"), LinkCredit(2)) + sender, err := s.NewSender(LinkTargetAddress("source"), LinkCredit(2)) if err != nil { return 0 } diff --git a/integration_test.go b/integration_test.go index 900552f2..569be200 100644 --- a/integration_test.go +++ b/integration_test.go @@ -105,7 +105,7 @@ func TestIntegrationRoundTrip(t *testing.T) { // Create a sender sender, err := session.NewSender( - amqp.LinkAddress(queueName), + amqp.LinkTargetAddress(queueName), ) if err != nil { t.Fatal(err) @@ -139,7 +139,7 @@ func TestIntegrationRoundTrip(t *testing.T) { // Create a receiver receiver, err := session.NewReceiver( - amqp.LinkAddress(queueName), + amqp.LinkSourceAddress(queueName), amqp.LinkCredit(10), amqp.LinkBatching(false), ) @@ -232,7 +232,7 @@ func TestIntegrationSend(t *testing.T) { // Create a sender sender, err := session.NewSender( - amqp.LinkAddress(queueName), + amqp.LinkTargetAddress(queueName), ) if err != nil { t.Fatal(err)