diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..854069f0 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,3 @@ +# Binary files (no line-ending conversions), diff using hexdump +*.bin binary diff=hex + diff --git a/client.go b/client.go index ed9a8c57..99c1a368 100644 --- a/client.go +++ b/client.go @@ -3,6 +3,7 @@ package amqp import ( "bytes" "context" + "encoding/binary" "fmt" "math" "math/rand" @@ -619,6 +620,7 @@ type link struct { senderSettleMode *SenderSettleMode receiverSettleMode *ReceiverSettleMode maxMessageSize uint64 + filters map[symbol]interface{} // source filters sent during attach peerMaxMessageSize uint64 detachSent bool // detach frame has been sent detachReceived bool @@ -684,6 +686,7 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) { attach.Source = &source{ Address: l.address, Dynamic: l.dynamicAddr, + Filter: l.filters, } } else { attach.Role = roleSender @@ -1085,6 +1088,23 @@ func LinkReceiverSettle(mode ReceiverSettleMode) LinkOption { } } +// LinkSelectorFilter sets a selector filter (apache.org:selector-filter:string) on the link source. +func LinkSelectorFilter(filter string) LinkOption { + // + const name = symbol("apache.org:selector-filter:string") + code := binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x46, 0x8C, 0x00, 0x00, 0x00, 0x04}) + return func(l *link) error { + if l.filters == nil { + l.filters = make(map[symbol]interface{}) + } + l.filters[name] = describedType{ + descriptor: code, + value: filter, + } + return nil + } +} + // Receiver receives messages on a single AMQP link. type Receiver struct { link *link // underlying link diff --git a/decode.go b/decode.go index d11e73f6..8e1add49 100644 --- a/decode.go +++ b/decode.go @@ -703,12 +703,16 @@ func readComposite(r reader) (interface{}, error) { return nil, err } + var iface interface{} + + // check if known composite type construct := compositeTypes[amqpType(v)] - if construct == nil { - return nil, errorErrorf("unmarshaling composite %0x not implemented", v) + if construct != nil { + iface = construct() + } else { + iface = new(describedType) // try as described type } - iface := construct() _, err = unmarshal(r, iface) return iface, err } @@ -726,6 +730,42 @@ var compositeTypes = [256]func() interface{}{ typeCodeStateReceived: func() interface{} { return new(stateReceived) }, typeCodeStateRejected: func() interface{} { return new(stateRejected) }, typeCodeStateReleased: func() interface{} { return new(stateReleased) }, + + typeCodeOpen: notImplementedConstructor("typeCodeOpen"), + typeCodeBegin: notImplementedConstructor("typeCodeBegin"), + typeCodeAttach: notImplementedConstructor("typeCodeAttach"), + typeCodeFlow: notImplementedConstructor("typeCodeFlow"), + typeCodeTransfer: notImplementedConstructor("typeCodeTransfer"), + typeCodeDisposition: notImplementedConstructor("typeCodeDisposition"), + typeCodeDetach: notImplementedConstructor("typeCodeDetach"), + typeCodeEnd: notImplementedConstructor("typeCodeEnd"), + typeCodeClose: notImplementedConstructor("typeCodeClose"), + typeCodeSource: notImplementedConstructor("typeCodeSource"), + typeCodeTarget: notImplementedConstructor("typeCodeTarget"), + typeCodeMessageHeader: notImplementedConstructor("typeCodeMessageHeader"), + typeCodeDeliveryAnnotations: notImplementedConstructor("typeCodeDeliveryAnnotations"), + typeCodeMessageAnnotations: notImplementedConstructor("typeCodeMessageAnnotations"), + typeCodeMessageProperties: notImplementedConstructor("typeCodeMessageProperties"), + typeCodeApplicationProperties: notImplementedConstructor("typeCodeApplicationProperties"), + typeCodeApplicationData: notImplementedConstructor("typeCodeApplicationData"), + typeCodeAMQPSequence: notImplementedConstructor("typeCodeAMQPSequence"), + typeCodeAMQPValue: notImplementedConstructor("typeCodeAMQPValue"), + typeCodeFooter: notImplementedConstructor("typeCodeFooter"), + typeCodeSASLMechanism: notImplementedConstructor("typeCodeSASLMechanism"), + typeCodeSASLInit: notImplementedConstructor("typeCodeSASLInit"), + typeCodeSASLChallenge: notImplementedConstructor("typeCodeSASLChallenge"), + typeCodeSASLResponse: notImplementedConstructor("typeCodeSASLResponse"), + typeCodeSASLOutcome: notImplementedConstructor("typeCodeSASLOutcome"), +} + +type notImplemented string + +func (ni notImplemented) unmarshal(r reader) error { + return errorNew("readComposite unmarshal not implemented for " + string(ni)) +} + +func notImplementedConstructor(s string) func() interface{} { + return func() interface{} { return notImplemented(s) } } func readTimestamp(r reader) (time.Time, error) { diff --git a/encode.go b/encode.go index d78d8a2b..184d3d08 100644 --- a/encode.go +++ b/encode.go @@ -298,7 +298,7 @@ func writeSymbolArray(wr writer, symbols []symbol) error { defer bufPool.Put(buf) for _, symbol := range symbols { - err := writeSymbol(buf, symbol, ofType) + err := writeSymbolType(buf, symbol, ofType) if err != nil { return err } @@ -313,7 +313,16 @@ func writeSymbolArray(wr writer, symbols []symbol) error { return err } -func writeSymbol(wr writer, sym symbol, typ amqpType) error { +func writeSymbol(wr writer, sym symbol) error { + ofType := typeCodeSym8 + if len(sym) > math.MaxUint8 { + ofType = typeCodeSym32 + } + + return writeSymbolType(wr, sym, ofType) +} + +func writeSymbolType(wr writer, sym symbol, typ amqpType) error { if !utf8.ValidString(string(sym)) { return errorNew("not a valid UTF-8 string") } diff --git a/fuzz/marshal/corpus/describedType.bin b/fuzz/marshal/corpus/describedType.bin new file mode 100644 index 00000000..77c4059c Binary files /dev/null and b/fuzz/marshal/corpus/describedType.bin differ diff --git a/marshal_test.go b/marshal_test.go index f73814a1..6a54717a 100644 --- a/marshal_test.go +++ b/marshal_test.go @@ -2,6 +2,7 @@ package amqp import ( "bytes" + "encoding/binary" "fmt" "io/ioutil" "math" @@ -328,6 +329,10 @@ func TestMarshalUnmarshal(t *testing.T) { uint16(math.MaxUint16), uint32(math.MaxUint32), uint64(math.MaxUint64), + describedType{ + descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x46, 0x8C, 0x00, 0x00, 0x00, 0x04}), + value: "amqp.annotation.x-opt-offset > '312'", + }, } for _, typ := range types { @@ -352,7 +357,7 @@ func TestMarshalUnmarshal(t *testing.T) { newTyp := reflect.New(reflect.TypeOf(typ)) _, err = unmarshal(&buf, newTyp.Interface()) if err != nil { - t.Error(fmt.Sprintf("%v", err)) + t.Error(fmt.Sprintf("%+v", err)) return } diff --git a/types.go b/types.go index 5247df0e..6dbeeb23 100644 --- a/types.go +++ b/types.go @@ -106,7 +106,6 @@ const ( typeCodeSASLResponse amqpType = 0x43 typeCodeSASLOutcome amqpType = 0x44 - // Lifetime Policies typeCodeDeleteOnClose amqpType = 0x2b typeCodeDeleteOnNoLinks amqpType = 0x2c typeCodeDeleteOnNoMessages amqpType = 0x2d @@ -761,6 +760,24 @@ func (s *source) unmarshal(r reader) error { }...) } +func (s source) String() string { + return fmt.Sprintf("source{Address: %s, Durable: %d, ExpiryPolicy: %s, Timeout: %d, "+ + "Dynamic: %t, DynamicNodeProperties: %v, DistributionMode: %s, Filter: %v, DefaultOutcome: %v"+ + "Outcomes: %v, Capabilities: %v}", + s.Address, + s.Durable, + s.ExpiryPolicy, + s.Timeout, + s.Dynamic, + s.DynamicNodeProperties, + s.DistributionMode, + s.Filter, + s.DefaultOutcome, + s.Outcomes, + s.Capabilities, + ) +} + /* @@ -882,6 +899,19 @@ func (t *target) unmarshal(r reader) error { }...) } +func (t target) String() string { + return fmt.Sprintf("source{Address: %s, Durable: %d, ExpiryPolicy: %s, Timeout: %d, "+ + "Dynamic: %t, DynamicNodeProperties: %v, Capabilities: %v}", + t.Address, + t.Durable, + t.ExpiryPolicy, + t.Timeout, + t.Dynamic, + t.DynamicNodeProperties, + t.Capabilities, + ) +} + /* @@ -2263,12 +2293,16 @@ func (s symbol) marshal(wr writer) error { var err error switch { - // List8 + // Sym8 case l < 256: _, err = wr.Write(append([]byte{byte(typeCodeSym8), byte(l)}, []byte(s)...)) - // List32 + // Sym32 case l < math.MaxUint32: + err = wr.WriteByte(uint8(typeCodeSym32)) + if err != nil { + return err + } err = binary.Write(wr, binary.BigEndian, uint32(l)) if err != nil { return err @@ -2545,3 +2579,40 @@ func (m *ReceiverSettleMode) unmarshal(r reader) error { _, err := unmarshal(r, (*uint8)(m)) return err } + +type describedType struct { + descriptor interface{} + value interface{} +} + +func (t describedType) marshal(wr writer) error { + err := wr.WriteByte(0x0) // descriptor constructor + if err != nil { + return err + } + err = marshal(wr, t.descriptor) + if err != nil { + return err + } + return marshal(wr, t.value) +} + +func (t *describedType) unmarshal(r reader) error { + b, err := r.ReadByte() + if b != 0x0 { + return errorErrorf("invalid described type header %0x", b) + } + _, err = unmarshal(r, &t.descriptor) + if err != nil { + return err + } + _, err = unmarshal(r, &t.value) + return err +} + +func (t describedType) String() string { + return fmt.Sprintf("describedType{descriptor: %v, value: %v}", + t.descriptor, + t.value, + ) +}