Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Separate source and target addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Menzhinsky authored and vcabbage committed Jan 30, 2018
1 parent d62cd7a commit 156a96c
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 27 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
{
// Create a sender
sender, err := session.NewSender(
amqp.LinkAddress("/queue-name"),
amqp.LinkTargetAddress("/queue-name"),
)
if err != nil {
log.Fatal("Creating sender link:", err)
Expand All @@ -87,7 +87,7 @@ func main() {
{
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkAddress("/queue-name"),
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(10),
)
if err != nil {
Expand Down
72 changes: 54 additions & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,10 @@ func (s *Sender) Send(ctx context.Context, msg *Message) error {

// Address returns the link's address.
func (s *Sender) Address() string {
return s.link.address
if s.link.target == nil {
return ""
}
return s.link.target.Address
}

// Close closes the Sender and AMQP link.
Expand Down Expand Up @@ -617,7 +620,6 @@ type link struct {
name string // our name
handle uint32 // our handle
remoteHandle uint32 // remote's handle
address string // address sent during attach
dynamicAddr bool // request a dynamic link address from the server
rx chan frameBody // sessions sends frames for this link on this channel
transfers chan performTransfer // sender uses for send; receiver uses for receive
Expand All @@ -627,6 +629,8 @@ type link struct {
doneOnce sync.Once
session *Session // parent session
receiver *Receiver // allows link options to modify Receiver
source *source
target *target

// "The delivery-count is initialized by the sender when a link endpoint is created,
// and is incremented whenever a message is sent. Only the sender MAY independently
Expand All @@ -639,7 +643,6 @@ type link struct {
senderSettleMode *SenderSettleMode
receiverSettleMode *ReceiverSettleMode
maxMessageSize uint64
filters map[symbol]interface{} // source filters sent during attach
peerMaxMessageSize uint64
detachSent bool // detach frame has been sent
detachReceived bool
Expand Down Expand Up @@ -698,21 +701,22 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
ReceiverSettleMode: l.receiverSettleMode,
SenderSettleMode: l.senderSettleMode,
MaxMessageSize: l.maxMessageSize,
Source: l.source,
Target: l.target,
}

if isReceiver {
attach.Role = roleReceiver
attach.Source = &source{
Address: l.address,
Dynamic: l.dynamicAddr,
Filter: l.filters,
if attach.Source == nil {
attach.Source = new(source)
}
attach.Source.Dynamic = l.dynamicAddr
} else {
attach.Role = roleSender
attach.Target = &target{
Address: l.address,
Dynamic: l.dynamicAddr,
if attach.Target == nil {
attach.Target = new(target)
}
attach.Target.Dynamic = l.dynamicAddr
}

// send Attach frame
Expand Down Expand Up @@ -743,7 +747,7 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
if isReceiver {
// if dynamic address requested, copy assigned name to address
if l.dynamicAddr && resp.Source != nil {
l.address = resp.Source.Address
l.source.Address = resp.Source.Address
}
// deliveryCount is a sequence number, must initialize to sender's initial sequence number
l.deliveryCount = resp.InitialDeliveryCount
Expand All @@ -755,7 +759,7 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
} else {
// if dynamic address requested, copy assigned name to address
if l.dynamicAddr && resp.Target != nil {
l.address = resp.Target.Address
l.target.Address = resp.Target.Address
}
l.transfers = make(chan performTransfer)
if resp.ReceiverSettleMode != nil {
Expand Down Expand Up @@ -1026,9 +1030,35 @@ type LinkOption func(*link) error
//
// For a Receiver this configures the source address.
// For a Sender this configures the target address.
//
// Deprecated: use LinkSourceAddress or LinkTargetAddress instead.
func LinkAddress(source string) LinkOption {
return func(l *link) error {
l.address = source
if l.receiver != nil {
return LinkSourceAddress(source)(l)
}
return LinkTargetAddress(source)(l)
}
}

// LinkSourceAddress sets the source address.
func LinkSourceAddress(addr string) LinkOption {
return func(l *link) error {
if l.source == nil {
l.source = new(source)
}
l.source.Address = addr
return nil
}
}

// LinkTargetAddress sets the target address.
func LinkTargetAddress(addr string) LinkOption {
return func(l *link) error {
if l.target == nil {
l.target = new(target)
}
l.target.Address = addr
return nil
}
}
Expand Down Expand Up @@ -1113,10 +1143,13 @@ func LinkSelectorFilter(filter string) LinkOption {
const name = symbol("apache.org:selector-filter:string")
code := binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x46, 0x8C, 0x00, 0x00, 0x00, 0x04})
return func(l *link) error {
if l.filters == nil {
l.filters = make(map[symbol]interface{})
if l.source == nil {
l.source = new(source)
}
l.filters[name] = describedType{
if l.source.Filter == nil {
l.source.Filter = make(map[symbol]interface{})
}
l.source.Filter[name] = describedType{
descriptor: code,
value: filter,
}
Expand All @@ -1127,7 +1160,7 @@ func LinkSelectorFilter(filter string) LinkOption {
// Receiver receives messages on a single AMQP link.
type Receiver struct {
link *link // underlying link
buf bytes.Buffer // resable buffer for decoding multi frame messages
buf bytes.Buffer // reusable buffer for decoding multi frame messages
batching bool // enable batching of message dispositions
batchMaxAge time.Duration // maximum time between the start n batch and sending the batch to the server
dispositions chan messageDisposition // message dispositions are sent on this channel when batching is enabled
Expand Down Expand Up @@ -1204,7 +1237,10 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) {

// Address returns the link's address.
func (r *Receiver) Address() string {
return r.link.address
if r.link.source == nil {
return ""
}
return r.link.source.Address
}

// Close closes the Receiver and AMQP link.
Expand Down
4 changes: 2 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Example() {
{
// Create a sender
sender, err := session.NewSender(
amqp.LinkAddress("/queue-name"),
amqp.LinkTargetAddress("/queue-name"),
)
if err != nil {
log.Fatal("Creating sender link:", err)
Expand All @@ -55,7 +55,7 @@ func Example() {
{
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkAddress("/queue-name"),
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(10),
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func FuzzConn(data []byte) int {
return 0
}

r, err := s.NewReceiver(LinkAddress("source"), LinkCredit(2))
r, err := s.NewReceiver(LinkSourceAddress("source"), LinkCredit(2))
if err != nil {
return 0
}
Expand Down Expand Up @@ -57,7 +57,7 @@ func FuzzConn(data []byte) int {
return 0
}

sender, err := s.NewSender(LinkAddress("source"), LinkCredit(2))
sender, err := s.NewSender(LinkTargetAddress("source"), LinkCredit(2))
if err != nil {
return 0
}
Expand Down
6 changes: 3 additions & 3 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestIntegrationRoundTrip(t *testing.T) {

// Create a sender
sender, err := session.NewSender(
amqp.LinkAddress(queueName),
amqp.LinkTargetAddress(queueName),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestIntegrationRoundTrip(t *testing.T) {

// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkAddress(queueName),
amqp.LinkSourceAddress(queueName),
amqp.LinkCredit(10),
amqp.LinkBatching(false),
)
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestIntegrationSend(t *testing.T) {

// Create a sender
sender, err := session.NewSender(
amqp.LinkAddress(queueName),
amqp.LinkTargetAddress(queueName),
)
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 156a96c

Please sign in to comment.