Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Support setting source selector filter (#5)
Browse files Browse the repository at this point in the history
* Added `LinkSelectorFilter()` link option.
* Added `describedType`.
* Refactored `readComposite` to try decoding as described type when not a known composite.
* Added `describedType` to marshal tests.
* Added `String()` methods for debugging.

Note: Event Hubs was sending back `0x137000000a` as descriptor code when `LinkSelectorFilter()` was using a string as the descriptor. I haven't been able to figure out what that code refers to.
  • Loading branch information
vcabbage authored Jan 16, 2018
1 parent 92b4f0d commit 9f2af47
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Binary files (no line-ending conversions), diff using hexdump
*.bin binary diff=hex

20 changes: 20 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package amqp
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -619,6 +620,7 @@ type link struct {
senderSettleMode *SenderSettleMode
receiverSettleMode *ReceiverSettleMode
maxMessageSize uint64
filters map[symbol]interface{} // source filters sent during attach
peerMaxMessageSize uint64
detachSent bool // detach frame has been sent
detachReceived bool
Expand Down Expand Up @@ -684,6 +686,7 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
attach.Source = &source{
Address: l.address,
Dynamic: l.dynamicAddr,
Filter: l.filters,
}
} else {
attach.Role = roleSender
Expand Down Expand Up @@ -1085,6 +1088,23 @@ func LinkReceiverSettle(mode ReceiverSettleMode) LinkOption {
}
}

// LinkSelectorFilter sets a selector filter (apache.org:selector-filter:string) on the link source.
func LinkSelectorFilter(filter string) LinkOption {
// <descriptor name="apache.org:selector-filter:string" code="0x0000468C:0x00000004"/>
const name = symbol("apache.org:selector-filter:string")
code := binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x46, 0x8C, 0x00, 0x00, 0x00, 0x04})
return func(l *link) error {
if l.filters == nil {
l.filters = make(map[symbol]interface{})
}
l.filters[name] = describedType{
descriptor: code,
value: filter,
}
return nil
}
}

// Receiver receives messages on a single AMQP link.
type Receiver struct {
link *link // underlying link
Expand Down
46 changes: 43 additions & 3 deletions decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,12 +703,16 @@ func readComposite(r reader) (interface{}, error) {
return nil, err
}

var iface interface{}

// check if known composite type
construct := compositeTypes[amqpType(v)]
if construct == nil {
return nil, errorErrorf("unmarshaling composite %0x not implemented", v)
if construct != nil {
iface = construct()
} else {
iface = new(describedType) // try as described type
}

iface := construct()
_, err = unmarshal(r, iface)
return iface, err
}
Expand All @@ -726,6 +730,42 @@ var compositeTypes = [256]func() interface{}{
typeCodeStateReceived: func() interface{} { return new(stateReceived) },
typeCodeStateRejected: func() interface{} { return new(stateRejected) },
typeCodeStateReleased: func() interface{} { return new(stateReleased) },

typeCodeOpen: notImplementedConstructor("typeCodeOpen"),
typeCodeBegin: notImplementedConstructor("typeCodeBegin"),
typeCodeAttach: notImplementedConstructor("typeCodeAttach"),
typeCodeFlow: notImplementedConstructor("typeCodeFlow"),
typeCodeTransfer: notImplementedConstructor("typeCodeTransfer"),
typeCodeDisposition: notImplementedConstructor("typeCodeDisposition"),
typeCodeDetach: notImplementedConstructor("typeCodeDetach"),
typeCodeEnd: notImplementedConstructor("typeCodeEnd"),
typeCodeClose: notImplementedConstructor("typeCodeClose"),
typeCodeSource: notImplementedConstructor("typeCodeSource"),
typeCodeTarget: notImplementedConstructor("typeCodeTarget"),
typeCodeMessageHeader: notImplementedConstructor("typeCodeMessageHeader"),
typeCodeDeliveryAnnotations: notImplementedConstructor("typeCodeDeliveryAnnotations"),
typeCodeMessageAnnotations: notImplementedConstructor("typeCodeMessageAnnotations"),
typeCodeMessageProperties: notImplementedConstructor("typeCodeMessageProperties"),
typeCodeApplicationProperties: notImplementedConstructor("typeCodeApplicationProperties"),
typeCodeApplicationData: notImplementedConstructor("typeCodeApplicationData"),
typeCodeAMQPSequence: notImplementedConstructor("typeCodeAMQPSequence"),
typeCodeAMQPValue: notImplementedConstructor("typeCodeAMQPValue"),
typeCodeFooter: notImplementedConstructor("typeCodeFooter"),
typeCodeSASLMechanism: notImplementedConstructor("typeCodeSASLMechanism"),
typeCodeSASLInit: notImplementedConstructor("typeCodeSASLInit"),
typeCodeSASLChallenge: notImplementedConstructor("typeCodeSASLChallenge"),
typeCodeSASLResponse: notImplementedConstructor("typeCodeSASLResponse"),
typeCodeSASLOutcome: notImplementedConstructor("typeCodeSASLOutcome"),
}

type notImplemented string

func (ni notImplemented) unmarshal(r reader) error {
return errorNew("readComposite unmarshal not implemented for " + string(ni))
}

func notImplementedConstructor(s string) func() interface{} {
return func() interface{} { return notImplemented(s) }
}

func readTimestamp(r reader) (time.Time, error) {
Expand Down
13 changes: 11 additions & 2 deletions encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func writeSymbolArray(wr writer, symbols []symbol) error {
defer bufPool.Put(buf)

for _, symbol := range symbols {
err := writeSymbol(buf, symbol, ofType)
err := writeSymbolType(buf, symbol, ofType)
if err != nil {
return err
}
Expand All @@ -313,7 +313,16 @@ func writeSymbolArray(wr writer, symbols []symbol) error {
return err
}

func writeSymbol(wr writer, sym symbol, typ amqpType) error {
func writeSymbol(wr writer, sym symbol) error {
ofType := typeCodeSym8
if len(sym) > math.MaxUint8 {
ofType = typeCodeSym32
}

return writeSymbolType(wr, sym, ofType)
}

func writeSymbolType(wr writer, sym symbol, typ amqpType) error {
if !utf8.ValidString(string(sym)) {
return errorNew("not a valid UTF-8 string")
}
Expand Down
Binary file added fuzz/marshal/corpus/describedType.bin
Binary file not shown.
7 changes: 6 additions & 1 deletion marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package amqp

import (
"bytes"
"encoding/binary"
"fmt"
"io/ioutil"
"math"
Expand Down Expand Up @@ -328,6 +329,10 @@ func TestMarshalUnmarshal(t *testing.T) {
uint16(math.MaxUint16),
uint32(math.MaxUint32),
uint64(math.MaxUint64),
describedType{
descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x46, 0x8C, 0x00, 0x00, 0x00, 0x04}),
value: "amqp.annotation.x-opt-offset > '312'",
},
}

for _, typ := range types {
Expand All @@ -352,7 +357,7 @@ func TestMarshalUnmarshal(t *testing.T) {
newTyp := reflect.New(reflect.TypeOf(typ))
_, err = unmarshal(&buf, newTyp.Interface())
if err != nil {
t.Error(fmt.Sprintf("%v", err))
t.Error(fmt.Sprintf("%+v", err))
return
}

Expand Down
77 changes: 74 additions & 3 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ const (
typeCodeSASLResponse amqpType = 0x43
typeCodeSASLOutcome amqpType = 0x44

// Lifetime Policies
typeCodeDeleteOnClose amqpType = 0x2b
typeCodeDeleteOnNoLinks amqpType = 0x2c
typeCodeDeleteOnNoMessages amqpType = 0x2d
Expand Down Expand Up @@ -761,6 +760,24 @@ func (s *source) unmarshal(r reader) error {
}...)
}

func (s source) String() string {
return fmt.Sprintf("source{Address: %s, Durable: %d, ExpiryPolicy: %s, Timeout: %d, "+
"Dynamic: %t, DynamicNodeProperties: %v, DistributionMode: %s, Filter: %v, DefaultOutcome: %v"+
"Outcomes: %v, Capabilities: %v}",
s.Address,
s.Durable,
s.ExpiryPolicy,
s.Timeout,
s.Dynamic,
s.DynamicNodeProperties,
s.DistributionMode,
s.Filter,
s.DefaultOutcome,
s.Outcomes,
s.Capabilities,
)
}

/*
<type name="target" class="composite" source="list" provides="target">
<descriptor name="amqp:target:list" code="0x00000000:0x00000029"/>
Expand Down Expand Up @@ -882,6 +899,19 @@ func (t *target) unmarshal(r reader) error {
}...)
}

func (t target) String() string {
return fmt.Sprintf("source{Address: %s, Durable: %d, ExpiryPolicy: %s, Timeout: %d, "+
"Dynamic: %t, DynamicNodeProperties: %v, Capabilities: %v}",
t.Address,
t.Durable,
t.ExpiryPolicy,
t.Timeout,
t.Dynamic,
t.DynamicNodeProperties,
t.Capabilities,
)
}

/*
<type name="flow" class="composite" source="list" provides="frame">
<descriptor name="amqp:flow:list" code="0x00000000:0x00000013"/>
Expand Down Expand Up @@ -2263,12 +2293,16 @@ func (s symbol) marshal(wr writer) error {

var err error
switch {
// List8
// Sym8
case l < 256:
_, err = wr.Write(append([]byte{byte(typeCodeSym8), byte(l)}, []byte(s)...))

// List32
// Sym32
case l < math.MaxUint32:
err = wr.WriteByte(uint8(typeCodeSym32))
if err != nil {
return err
}
err = binary.Write(wr, binary.BigEndian, uint32(l))
if err != nil {
return err
Expand Down Expand Up @@ -2545,3 +2579,40 @@ func (m *ReceiverSettleMode) unmarshal(r reader) error {
_, err := unmarshal(r, (*uint8)(m))
return err
}

type describedType struct {
descriptor interface{}
value interface{}
}

func (t describedType) marshal(wr writer) error {
err := wr.WriteByte(0x0) // descriptor constructor
if err != nil {
return err
}
err = marshal(wr, t.descriptor)
if err != nil {
return err
}
return marshal(wr, t.value)
}

func (t *describedType) unmarshal(r reader) error {
b, err := r.ReadByte()
if b != 0x0 {
return errorErrorf("invalid described type header %0x", b)
}
_, err = unmarshal(r, &t.descriptor)
if err != nil {
return err
}
_, err = unmarshal(r, &t.value)
return err
}

func (t describedType) String() string {
return fmt.Sprintf("describedType{descriptor: %v, value: %v}",
t.descriptor,
t.value,
)
}

0 comments on commit 9f2af47

Please sign in to comment.