From 41f48683e8037129fcf3204c77ba89bc01ab267c Mon Sep 17 00:00:00 2001 From: Kale Blankenship Date: Sat, 20 Jan 2018 15:41:22 -0800 Subject: [PATCH] Reduce marshaling allocations --- decode.go | 129 ------------------ encode.go | 249 ++++++++++++++++++++++++++++------ types.go | 391 +++++++++++++++++++++++++++++++++++------------------- 3 files changed, 464 insertions(+), 305 deletions(-) diff --git a/decode.go b/decode.go index 71ed13e7..542b56c8 100644 --- a/decode.go +++ b/decode.go @@ -360,50 +360,6 @@ type unmarshalField struct { // is null. type nullHandler func() error -// required returns a nullHandler that will cause an error to -// be returned if the field is null. -func required(name string) nullHandler { - return func() error { - return errorNew(name + " is required") - } -} - -// defaultUint32 returns a nullHandler that sets n to defaultValue -// if the field is null. -func defaultUint32(n *uint32, defaultValue uint32) nullHandler { - return func() error { - *n = defaultValue - return nil - } -} - -// defaultUint16 returns a nullHandler that sets n to defaultValue -// if the field is null. -func defaultUint16(n *uint16, defaultValue uint16) nullHandler { - return func() error { - *n = defaultValue - return nil - } -} - -// defaultUint8 returns a nullHandler that sets n to defaultValue -// if the field is null. -func defaultUint8(n *uint8, defaultValue uint8) nullHandler { - return func() error { - *n = defaultValue - return nil - } -} - -// defaultSymbol returns a nullHandler that sets s to defaultValue -// if the field is null. -func defaultSymbol(s *symbol, defaultValue symbol) nullHandler { - return func() error { - *s = defaultValue - return nil - } -} - // readCompositeHeader reads and consumes the composite header from r. // // If the composite is null, errNull will be returned. @@ -1047,51 +1003,6 @@ func readUint(r reader) (value uint64, _ error) { } } -type limitReader struct { - reader - limit uint32 - read uint32 -} - -var errLimitReached = errorNew("limit reached") - -func (r *limitReader) Read(p []byte) (int, error) { - if r.read >= r.limit { - return 0, errLimitReached - } - n, err := r.reader.Read(p) - r.read += uint32(n) - return n, err -} - -type mapReader struct { - r *limitReader - count int // elements (2 * # of pairs) - read int -} - -func (mr *mapReader) pairs() int { - return mr.count / 2 -} - -func (mr *mapReader) more() bool { - return mr.read < mr.count -} - -func (mr *mapReader) next(key, value interface{}) error { - _, err := unmarshal(mr.r, key) - if err != nil { - return err - } - mr.read++ - _, err = unmarshal(mr.r, value) - if err != nil { - return err - } - mr.read++ - return nil -} - func readMapHeader(r reader) (size uint32, count uint8, _ error) { b, err := r.ReadByte() if err != nil { @@ -1123,43 +1034,3 @@ func readMapHeader(r reader) (size uint32, count uint8, _ error) { count, err = r.ReadByte() return size, count, err } - -func newMapReader(r reader) (*mapReader, error) { - b, err := r.ReadByte() - if err != nil { - return nil, err - } - - var n uint32 - switch amqpType(b) { - case typeCodeNull: - return nil, errNull - case typeCodeMap8: - bn, err := r.ReadByte() - if err != nil { - return nil, err - } - n = uint32(bn) - case typeCodeMap32: - if r.Len() < 4 { - return nil, errorNew("invalid length for type map32") - } - n = binary.BigEndian.Uint32(r.Next(4)) - default: - return nil, errorErrorf("invalid map type %x", b) - } - - if uint64(n) > uint64(r.Len()) { - return nil, errInvalidLength - } - - b, err = r.ReadByte() - if err != nil { - return nil, err - } - - return &mapReader{ - r: &limitReader{reader: r, limit: n}, - count: int(b), - }, nil -} diff --git a/encode.go b/encode.go index 184d3d08..7cf806b3 100644 --- a/encode.go +++ b/encode.go @@ -21,6 +21,7 @@ var ( type writer interface { io.Writer io.ByteWriter + WriteString(s string) (n int, err error) } // bufPool is used to reduce allocations when encoding. @@ -79,13 +80,26 @@ func marshal(wr writer, i interface{}) error { } else { err = wr.WriteByte(byte(typeCodeBoolFalse)) } + case *bool: + if *t { + err = wr.WriteByte(byte(typeCodeBoolTrue)) + } else { + err = wr.WriteByte(byte(typeCodeBoolFalse)) + } case uint: if intSize == 64 { return writeUint64(wr, uint64(t)) } return writeUint32(wr, uint32(t)) + case *uint: + if intSize == 64 { + return writeUint64(wr, uint64(*t)) + } + return writeUint32(wr, uint32(*t)) case uint64: return writeUint64(wr, t) + case *uint64: + return writeUint64(wr, *t) case uint32: return writeUint32(wr, t) case *uint32: @@ -99,48 +113,107 @@ func marshal(wr writer, i interface{}) error { if err != nil { return err } - err = binary.Write(wr, binary.BigEndian, t) + tmp := make([]byte, 2) + binary.BigEndian.PutUint16(tmp, t) + _, err = wr.Write(tmp) + case *uint16: + err = wr.WriteByte(byte(typeCodeUshort)) + if err != nil { + return err + } + tmp := make([]byte, 2) + binary.BigEndian.PutUint16(tmp, *t) + _, err = wr.Write(tmp) case uint8: - _, err = wr.Write([]byte{byte(typeCodeUbyte), t}) + err = wr.WriteByte(byte(typeCodeUbyte)) + if err != nil { + return err + } + err = wr.WriteByte(t) case *uint8: - _, err = wr.Write([]byte{byte(typeCodeUbyte), *t}) + err = wr.WriteByte(byte(typeCodeUbyte)) + if err != nil { + return err + } + err = wr.WriteByte(*t) case int: if intSize == 64 { return writeInt64(wr, int64(t)) } return writeInt32(wr, int32(t)) + case *int: + if intSize == 64 { + return writeInt64(wr, int64(*t)) + } + return writeInt32(wr, int32(*t)) case int8: err = wr.WriteByte(byte(typeCodeByte)) if err != nil { return err } - binary.Write(wr, binary.BigEndian, t) + err = wr.WriteByte(uint8(t)) + case *int8: + err = wr.WriteByte(byte(typeCodeByte)) + if err != nil { + return err + } + err = wr.WriteByte(uint8(*t)) case int16: err = wr.WriteByte(byte(typeCodeShort)) if err != nil { return err } - binary.Write(wr, binary.BigEndian, t) - case int64: - return writeInt64(wr, t) + tmp := make([]byte, 2) + binary.BigEndian.PutUint16(tmp, uint16(t)) + _, err = wr.Write(tmp) + case *int16: + err = wr.WriteByte(byte(typeCodeShort)) + if err != nil { + return err + } + tmp := make([]byte, 2) + binary.BigEndian.PutUint16(tmp, uint16(*t)) + _, err = wr.Write(tmp) case int32: return writeInt32(wr, t) + case *int32: + return writeInt32(wr, *t) + case int64: + return writeInt64(wr, t) + case *int64: + return writeInt64(wr, *t) case []symbol: err = writeSymbolArray(wr, t) + case *[]symbol: + err = writeSymbolArray(wr, *t) case string: err = writeString(wr, t) + case *string: + err = writeString(wr, *t) case []byte: err = writeBinary(wr, t) + case *[]byte: + err = writeBinary(wr, *t) case map[interface{}]interface{}: err = writeMap(wr, t) + case *map[interface{}]interface{}: + err = writeMap(wr, *t) case map[string]interface{}: err = writeMap(wr, t) + case *map[string]interface{}: + err = writeMap(wr, *t) case map[symbol]interface{}: err = writeMap(wr, t) + case *map[symbol]interface{}: + err = writeMap(wr, *t) case unsettled: err = writeMap(wr, t) + case *unsettled: + err = writeMap(wr, *t) case time.Time: err = writeTimestamp(wr, t) + case *time.Time: + err = writeTimestamp(wr, *t) default: return errorErrorf("marshal not implemented for %T", i) } @@ -149,28 +222,41 @@ func marshal(wr writer, i interface{}) error { func writeInt32(wr writer, n int32) error { if n < 128 && n >= -128 { - _, err := wr.Write([]byte{byte(typeCodeSmallint), byte(n)}) - return err + err := wr.WriteByte(byte(typeCodeSmallint)) + if err != nil { + return err + } + return wr.WriteByte(byte(n)) } err := wr.WriteByte(byte(typeCodeInt)) if err != nil { return err } - return binary.Write(wr, binary.BigEndian, n) + + tmp := make([]byte, 4) + binary.BigEndian.PutUint32(tmp, uint32(n)) + _, err = wr.Write(tmp) + return err } func writeInt64(wr writer, n int64) error { if n < 128 && n >= -128 { - _, err := wr.Write([]byte{byte(typeCodeSmalllong), byte(n)}) - return err + err := wr.WriteByte(byte(typeCodeSmalllong)) + if err != nil { + return err + } + return wr.WriteByte(byte(n)) } err := wr.WriteByte(byte(typeCodeLong)) if err != nil { return err } - return binary.Write(wr, binary.BigEndian, n) + tmp := make([]byte, 8) + binary.BigEndian.PutUint64(tmp, uint64(n)) + _, err = wr.Write(tmp) + return err } func writeUint32(wr writer, n uint32) error { @@ -179,15 +265,22 @@ func writeUint32(wr writer, n uint32) error { } if n < 256 { - _, err := wr.Write([]byte{byte(typeCodeSmallUint), byte(n)}) - return err + err := wr.WriteByte(byte(typeCodeSmallUint)) + if err != nil { + return err + } + return wr.WriteByte(byte(n)) } err := wr.WriteByte(byte(typeCodeUint)) if err != nil { return err } - return binary.Write(wr, binary.BigEndian, n) + + tmp := make([]byte, 4) + binary.BigEndian.PutUint32(tmp, n) + _, err = wr.Write(tmp) + return err } func writeUint64(wr writer, n uint64) error { @@ -196,7 +289,11 @@ func writeUint64(wr writer, n uint64) error { } if n < 256 { - _, err := wr.Write([]byte{byte(typeCodeSmallUlong), byte(n)}) + err := wr.WriteByte(byte(typeCodeSmallUlong)) + if err != nil { + return err + } + err = wr.WriteByte(byte(n)) return err } @@ -204,7 +301,11 @@ func writeUint64(wr writer, n uint64) error { if err != nil { return err } - return binary.Write(wr, binary.BigEndian, n) + + tmp := make([]byte, 8) + binary.BigEndian.PutUint64(tmp, n) + _, err = wr.Write(tmp) + return err } func writeTimestamp(wr writer, t time.Time) error { @@ -214,13 +315,20 @@ func writeTimestamp(wr writer, t time.Time) error { } ms := t.UnixNano() / int64(time.Millisecond) - return binary.Write(wr, binary.BigEndian, ms) + tmp := make([]byte, 8) + binary.BigEndian.PutUint64(tmp, uint64(ms)) + _, err = wr.Write(tmp) + return err } // marshalField is a field to be marshaled type marshalField struct { - value interface{} - omit bool // indicates that this field should be omitted (set to null) + value interface{} // value to be marshaled, use pointers to avoid interface conversion overhead + omit bool // indicates that this field should be omitted (set to null) +} +type marshalField2 struct { + marshal func(wr writer) error + omit bool // indicates that this field should be omitted (set to null) } // marshalComposite is a helper for us in a composite's marshal() function. @@ -244,7 +352,19 @@ func marshalComposite(wr writer, code amqpType, fields ...marshalField) error { // write header only if lastSetIdx == -1 { - _, err := wr.Write([]byte{0x0, byte(typeCodeSmallUlong), byte(code), byte(typeCodeList0)}) + err := wr.WriteByte(0x0) + if err != nil { + return err + } + err = wr.WriteByte(byte(typeCodeSmallUlong)) + if err != nil { + return err + } + err = wr.WriteByte(byte(code)) + if err != nil { + return err + } + err = wr.WriteByte(byte(typeCodeList0)) return err } @@ -281,8 +401,15 @@ func marshalComposite(wr writer, code amqpType, fields ...marshalField) error { } func writeDescriptor(wr writer, code amqpType) error { - _, err := wr.Write([]byte{0x0, byte(typeCodeSmallUlong), uint8(code)}) - return err + err := wr.WriteByte(0x0) + if err != nil { + return err + } + err = wr.WriteByte(byte(typeCodeSmallUlong)) + if err != nil { + return err + } + return wr.WriteByte(byte(code)) } func writeSymbolArray(wr writer, symbols []symbol) error { @@ -336,14 +463,16 @@ func writeSymbolType(wr writer, sym symbol, typ amqpType) error { return err } case typeCodeSym32: - err := binary.Write(wr, binary.BigEndian, uint32(l)) + tmp := make([]byte, 4) + binary.BigEndian.PutUint32(tmp, uint32(l)) + _, err := wr.Write(tmp) if err != nil { return err } default: return errorNew("invalid symbol type") } - _, err := wr.Write([]byte(sym)) + _, err := wr.WriteString(string(sym)) return err } @@ -356,7 +485,15 @@ func writeString(wr writer, str string) error { switch { // Str8 case l < 256: - _, err := wr.Write(append([]byte{byte(typeCodeStr8), uint8(l)}, []byte(str)...)) + err := wr.WriteByte(byte(typeCodeStr8)) + if err != nil { + return err + } + err = wr.WriteByte(byte(l)) + if err != nil { + return err + } + _, err = wr.WriteString(str) return err // Str32 @@ -365,11 +502,15 @@ func writeString(wr writer, str string) error { if err != nil { return err } - err = binary.Write(wr, binary.BigEndian, uint32(l)) + + tmp := make([]byte, 4) + binary.BigEndian.PutUint32(tmp, uint32(l)) + _, err = wr.Write(tmp) if err != nil { return err } - _, err = wr.Write([]byte(str)) + + _, err = wr.WriteString(str) return err default: @@ -383,7 +524,15 @@ func writeBinary(wr writer, bin []byte) error { switch { // List8 case l < 256: - _, err := wr.Write(append([]byte{byte(typeCodeVbin8), uint8(l)}, bin...)) + err := wr.WriteByte(byte(typeCodeVbin8)) + if err != nil { + return err + } + err = wr.WriteByte(uint8(l)) + if err != nil { + return err + } + _, err = wr.Write(bin) return err // List32 @@ -392,10 +541,14 @@ func writeBinary(wr writer, bin []byte) error { if err != nil { return err } - err = binary.Write(wr, binary.BigEndian, uint32(l)) + + tmp := make([]byte, 4) + binary.BigEndian.PutUint32(tmp, uint32(l)) + _, err = wr.Write(tmp) if err != nil { return err } + _, err = wr.Write(bin) return err @@ -429,7 +582,15 @@ func writeSlice(wr writer, isArray bool, of amqpType, numFields int, size int) e // list8 case numFields < 256 && size < 256: - _, err := wr.Write([]byte{byte(size8), uint8(size + 1), uint8(numFields)}) + err := wr.WriteByte(byte(size8)) + if err != nil { + return err + } + err = wr.WriteByte(byte(size + 1)) + if err != nil { + return err + } + err = wr.WriteByte(byte(numFields)) if err != nil { return err } @@ -440,11 +601,15 @@ func writeSlice(wr writer, isArray bool, of amqpType, numFields int, size int) e if err != nil { return err } - err = binary.Write(wr, binary.BigEndian, uint32(size+4)) + + tmp := make([]byte, 4) + binary.BigEndian.PutUint32(tmp, uint32(size+4)) + _, err = wr.Write(tmp) if err != nil { return err } - err = binary.Write(wr, binary.BigEndian, uint32(numFields)) + binary.BigEndian.PutUint32(tmp, uint32(numFields)) + _, err = wr.Write(tmp) if err != nil { return err } @@ -483,7 +648,7 @@ func writeMap(wr writer, m interface{}) error { case map[string]interface{}: length = len(m) for key, val := range m { - err := marshal(buf, key) + err := amqpString(key).marshal(buf) if err != nil { return err } @@ -495,7 +660,7 @@ func writeMap(wr writer, m interface{}) error { case map[symbol]interface{}: length = len(m) for key, val := range m { - err := marshal(buf, key) + err := key.marshal(buf) if err != nil { return err } @@ -507,7 +672,7 @@ func writeMap(wr writer, m interface{}) error { case unsettled: length = len(m) for key, val := range m { - err := marshal(buf, key) + err := amqpString(key).marshal(buf) if err != nil { return err } @@ -528,7 +693,11 @@ func writeMap(wr writer, m interface{}) error { l := buf.Len() + 1 // +1 for pairs byte switch { case l < 256: - _, err := wr.Write([]byte{byte(typeCodeMap8), byte(l)}) + err := wr.WriteByte(byte(typeCodeMap8)) + if err != nil { + return err + } + err = wr.WriteByte(byte(l)) if err != nil { return err } @@ -537,7 +706,9 @@ func writeMap(wr writer, m interface{}) error { if err != nil { return err } - err = binary.Write(wr, binary.BigEndian, uint32(l)) + tmp := make([]byte, 4) + binary.BigEndian.PutUint32(tmp, uint32(l)) + _, err = wr.Write(tmp) if err != nil { return err } diff --git a/types.go b/types.go index dd066148..10b851be 100644 --- a/types.go +++ b/types.go @@ -8,6 +8,7 @@ import ( "reflect" "strconv" "time" + "unicode/utf8" ) type amqpType uint8 @@ -205,15 +206,15 @@ func (o *performOpen) link() (uint32, bool) { func (o *performOpen) marshal(wr writer) error { return marshalComposite(wr, typeCodeOpen, []marshalField{ - {value: o.ContainerID, omit: false}, - {value: o.Hostname, omit: o.Hostname == ""}, - {value: o.MaxFrameSize, omit: o.MaxFrameSize == 0}, - {value: o.ChannelMax, omit: o.ChannelMax == 0}, - {value: milliseconds(o.IdleTimeout), omit: o.IdleTimeout == 0}, - {value: o.OutgoingLocales, omit: len(o.OutgoingLocales) == 0}, - {value: o.IncomingLocales, omit: len(o.IncomingLocales) == 0}, - {value: o.OfferedCapabilities, omit: len(o.OfferedCapabilities) == 0}, - {value: o.DesiredCapabilities, omit: len(o.DesiredCapabilities) == 0}, + {value: &o.ContainerID, omit: false}, + {value: &o.Hostname, omit: o.Hostname == ""}, + {value: &o.MaxFrameSize, omit: o.MaxFrameSize == 0}, + {value: &o.ChannelMax, omit: o.ChannelMax == 0}, + {value: (*milliseconds)(&o.IdleTimeout), omit: o.IdleTimeout == 0}, + {value: &o.OutgoingLocales, omit: len(o.OutgoingLocales) == 0}, + {value: &o.IncomingLocales, omit: len(o.IncomingLocales) == 0}, + {value: &o.OfferedCapabilities, omit: len(o.OfferedCapabilities) == 0}, + {value: &o.DesiredCapabilities, omit: len(o.DesiredCapabilities) == 0}, {value: o.Properties, omit: len(o.Properties) == 0}, }...) } @@ -305,13 +306,13 @@ func (b *performBegin) link() (uint32, bool) { func (b *performBegin) marshal(wr writer) error { return marshalComposite(wr, typeCodeBegin, []marshalField{ - {value: b.RemoteChannel, omit: b.RemoteChannel == 0}, - {value: b.NextOutgoingID, omit: false}, - {value: b.IncomingWindow, omit: false}, - {value: b.OutgoingWindow, omit: false}, - {value: b.HandleMax, omit: b.HandleMax == 0}, - {value: b.OfferedCapabilities, omit: len(b.OfferedCapabilities) == 0}, - {value: b.DesiredCapabilities, omit: len(b.DesiredCapabilities) == 0}, + {value: &b.RemoteChannel, omit: b.RemoteChannel == 0}, + {value: &b.NextOutgoingID, omit: false}, + {value: &b.IncomingWindow, omit: false}, + {value: &b.OutgoingWindow, omit: false}, + {value: &b.HandleMax, omit: b.HandleMax == 0}, + {value: &b.OfferedCapabilities, omit: len(b.OfferedCapabilities) == 0}, + {value: &b.DesiredCapabilities, omit: len(b.DesiredCapabilities) == 0}, {value: b.Properties, omit: b.Properties == nil}, }...) } @@ -319,10 +320,10 @@ func (b *performBegin) marshal(wr writer) error { func (b *performBegin) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeBegin, []unmarshalField{ {field: &b.RemoteChannel}, - {field: &b.NextOutgoingID, handleNull: required("Begin.NextOutgoingID")}, - {field: &b.IncomingWindow, handleNull: required("Begin.IncomingWindow")}, - {field: &b.OutgoingWindow, handleNull: required("Begin.OutgoingWindow")}, - {field: &b.HandleMax, handleNull: defaultUint32(&b.HandleMax, 4294967295)}, + {field: &b.NextOutgoingID, handleNull: func() error { return errorNew("Begin.NextOutgoingID is required") }}, + {field: &b.IncomingWindow, handleNull: func() error { return errorNew("Begin.IncomingWindow is required") }}, + {field: &b.OutgoingWindow, handleNull: func() error { return errorNew("Begin.OutgoingWindow is required") }}, + {field: &b.HandleMax, handleNull: func() error { b.HandleMax = 4294967295; return nil }}, {field: &b.OfferedCapabilities}, {field: &b.DesiredCapabilities}, {field: &b.Properties}, @@ -506,28 +507,28 @@ func (a *performAttach) link() (uint32, bool) { func (a *performAttach) marshal(wr writer) error { return marshalComposite(wr, typeCodeAttach, []marshalField{ - {value: a.Name, omit: false}, - {value: a.Handle, omit: false}, - {value: a.Role, omit: false}, + {value: &a.Name, omit: false}, + {value: &a.Handle, omit: false}, + {value: &a.Role, omit: false}, {value: a.SenderSettleMode, omit: a.SenderSettleMode == nil}, {value: a.ReceiverSettleMode, omit: a.ReceiverSettleMode == nil}, {value: a.Source, omit: a.Source == nil}, {value: a.Target, omit: a.Target == nil}, {value: a.Unsettled, omit: len(a.Unsettled) == 0}, - {value: a.IncompleteUnsettled, omit: !a.IncompleteUnsettled}, - {value: a.InitialDeliveryCount, omit: a.Role == roleReceiver}, - {value: a.MaxMessageSize, omit: a.MaxMessageSize == 0}, - {value: a.OfferedCapabilities, omit: len(a.OfferedCapabilities) == 0}, - {value: a.DesiredCapabilities, omit: len(a.DesiredCapabilities) == 0}, + {value: &a.IncompleteUnsettled, omit: !a.IncompleteUnsettled}, + {value: &a.InitialDeliveryCount, omit: a.Role == roleReceiver}, + {value: &a.MaxMessageSize, omit: a.MaxMessageSize == 0}, + {value: &a.OfferedCapabilities, omit: len(a.OfferedCapabilities) == 0}, + {value: &a.DesiredCapabilities, omit: len(a.DesiredCapabilities) == 0}, {value: a.Properties, omit: len(a.Properties) == 0}, }...) } func (a *performAttach) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeAttach, []unmarshalField{ - {field: &a.Name, handleNull: required("Attach.Name")}, - {field: &a.Handle, handleNull: required("Attach.Handle")}, - {field: &a.Role, handleNull: required("Attach.Role")}, + {field: &a.Name, handleNull: func() error { return errorNew("Attach.Name is required") }}, + {field: &a.Handle, handleNull: func() error { return errorNew("Attach.Handle is required") }}, + {field: &a.Role, handleNull: func() error { return errorNew("Attach.Role is required") }}, {field: &a.SenderSettleMode}, {field: &a.ReceiverSettleMode}, {field: &a.Source}, @@ -574,22 +575,25 @@ func (u unsettled) marshal(wr writer) error { } func (u *unsettled) unmarshal(r reader) error { - mr, err := newMapReader(r) + _, count, err := readMapHeader(r) if err != nil { return err } - mm := make(unsettled, mr.pairs()) - for mr.more() { - var key string + m := make(unsettled, count/2) + for i := uint8(0); i < count; i += 2 { + key, err := readString(r) + if err != nil { + return err + } var value deliveryState - err = mr.next(&key, &value) + _, err = unmarshal(r, &value) if err != nil { return err } - mm[key] = value + m[key] = value } - *u = mm + *u = m return nil } @@ -729,17 +733,17 @@ type source struct { func (s *source) marshal(wr writer) error { return marshalComposite(wr, typeCodeSource, []marshalField{ - {value: s.Address, omit: s.Address == ""}, - {value: s.Durable, omit: s.Durable == 0}, - {value: s.ExpiryPolicy, omit: s.ExpiryPolicy == ""}, - {value: s.Timeout, omit: s.Timeout == 0}, - {value: s.Dynamic, omit: !s.Dynamic}, + {value: &s.Address, omit: s.Address == ""}, + {value: &s.Durable, omit: s.Durable == 0}, + {value: &s.ExpiryPolicy, omit: s.ExpiryPolicy == ""}, + {value: &s.Timeout, omit: s.Timeout == 0}, + {value: &s.Dynamic, omit: !s.Dynamic}, {value: s.DynamicNodeProperties, omit: len(s.DynamicNodeProperties) == 0}, - {value: s.DistributionMode, omit: s.DistributionMode == ""}, + {value: &s.DistributionMode, omit: s.DistributionMode == ""}, {value: s.Filter, omit: len(s.Filter) == 0}, {value: s.DefaultOutcome, omit: s.DefaultOutcome == nil}, - {value: s.Outcomes, omit: len(s.Outcomes) == 0}, - {value: s.Capabilities, omit: len(s.Capabilities) == 0}, + {value: &s.Outcomes, omit: len(s.Outcomes) == 0}, + {value: &s.Capabilities, omit: len(s.Capabilities) == 0}, }...) } @@ -747,7 +751,7 @@ func (s *source) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeSource, []unmarshalField{ {field: &s.Address}, {field: &s.Durable}, - {field: &s.ExpiryPolicy, handleNull: defaultSymbol(&s.ExpiryPolicy, "session-end")}, + {field: &s.ExpiryPolicy, handleNull: func() error { s.ExpiryPolicy = "session-end"; return nil }}, {field: &s.Timeout}, {field: &s.Dynamic}, {field: &s.DynamicNodeProperties}, @@ -876,13 +880,13 @@ type target struct { func (t *target) marshal(wr writer) error { return marshalComposite(wr, typeCodeTarget, []marshalField{ - {value: t.Address, omit: t.Address == ""}, - {value: t.Durable, omit: t.Durable == 0}, - {value: t.ExpiryPolicy, omit: t.ExpiryPolicy == ""}, - {value: t.Timeout, omit: t.Timeout == 0}, - {value: t.Dynamic, omit: !t.Dynamic}, + {value: &t.Address, omit: t.Address == ""}, + {value: &t.Durable, omit: t.Durable == 0}, + {value: &t.ExpiryPolicy, omit: t.ExpiryPolicy == ""}, + {value: &t.Timeout, omit: t.Timeout == 0}, + {value: &t.Dynamic, omit: !t.Dynamic}, {value: t.DynamicNodeProperties, omit: len(t.DynamicNodeProperties) == 0}, - {value: t.Capabilities, omit: len(t.Capabilities) == 0}, + {value: &t.Capabilities, omit: len(t.Capabilities) == 0}, }...) } @@ -890,7 +894,7 @@ func (t *target) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeTarget, []unmarshalField{ {field: &t.Address}, {field: &t.Durable}, - {field: &t.ExpiryPolicy, handleNull: defaultSymbol(&t.ExpiryPolicy, "session-end")}, + {field: &t.ExpiryPolicy, handleNull: func() error { t.ExpiryPolicy = "session-end"; return nil }}, {field: &t.Timeout}, {field: &t.Dynamic}, {field: &t.DynamicNodeProperties}, @@ -1060,15 +1064,15 @@ func (f *performFlow) link() (uint32, bool) { func (f *performFlow) marshal(wr writer) error { return marshalComposite(wr, typeCodeFlow, []marshalField{ {value: f.NextIncomingID, omit: f.NextIncomingID == nil}, - {value: f.IncomingWindow, omit: false}, - {value: f.NextOutgoingID, omit: false}, - {value: f.OutgoingWindow, omit: false}, + {value: &f.IncomingWindow, omit: false}, + {value: &f.NextOutgoingID, omit: false}, + {value: &f.OutgoingWindow, omit: false}, {value: f.Handle, omit: f.Handle == nil}, {value: f.DeliveryCount, omit: f.DeliveryCount == nil}, {value: f.LinkCredit, omit: f.LinkCredit == nil}, {value: f.Available, omit: f.Available == nil}, - {value: f.Drain, omit: !f.Drain}, - {value: f.Echo, omit: !f.Echo}, + {value: &f.Drain, omit: !f.Drain}, + {value: &f.Echo, omit: !f.Echo}, {value: f.Properties, omit: len(f.Properties) == 0}, }...) } @@ -1076,9 +1080,9 @@ func (f *performFlow) marshal(wr writer) error { func (f *performFlow) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeFlow, []unmarshalField{ {field: &f.NextIncomingID}, - {field: &f.IncomingWindow, handleNull: required("Flow.IncomingWindow")}, - {field: &f.NextOutgoingID, handleNull: required("Flow.NextOutgoingID")}, - {field: &f.OutgoingWindow, handleNull: required("Flow.OutgoingWindow")}, + {field: &f.IncomingWindow, handleNull: func() error { return errorNew("Flow.IncomingWindow is required") }}, + {field: &f.NextOutgoingID, handleNull: func() error { return errorNew("Flow.NextOutgoingID is required") }}, + {field: &f.OutgoingWindow, handleNull: func() error { return errorNew("Flow.OutgoingWindow is required") }}, {field: &f.Handle}, {field: &f.DeliveryCount}, {field: &f.LinkCredit}, @@ -1279,17 +1283,17 @@ func (t *performTransfer) link() (uint32, bool) { func (t *performTransfer) marshal(wr writer) error { err := marshalComposite(wr, typeCodeTransfer, []marshalField{ - {value: t.Handle}, + {value: &t.Handle}, {value: t.DeliveryID, omit: t.DeliveryID == nil}, - {value: t.DeliveryTag, omit: len(t.DeliveryTag) == 0}, + {value: &t.DeliveryTag, omit: len(t.DeliveryTag) == 0}, {value: t.MessageFormat, omit: t.MessageFormat == nil}, - {value: t.Settled, omit: !t.Settled}, - {value: t.More, omit: !t.More}, + {value: &t.Settled, omit: !t.Settled}, + {value: &t.More, omit: !t.More}, {value: t.ReceiverSettleMode, omit: t.ReceiverSettleMode == nil}, {value: t.State, omit: t.State == nil}, - {value: t.Resume, omit: !t.Resume}, - {value: t.Aborted, omit: !t.Aborted}, - {value: t.Batchable, omit: !t.Batchable}, + {value: &t.Resume, omit: !t.Resume}, + {value: &t.Aborted, omit: !t.Aborted}, + {value: &t.Batchable, omit: !t.Batchable}, }...) if err != nil { return err @@ -1301,7 +1305,7 @@ func (t *performTransfer) marshal(wr writer) error { func (t *performTransfer) unmarshal(r reader) error { err := unmarshalComposite(r, typeCodeTransfer, []unmarshalField{ - {field: &t.Handle, handleNull: required("Transfer.Handle")}, + {field: &t.Handle, handleNull: func() error { return errorNew("Transfer.Handle is required") }}, {field: &t.DeliveryID}, {field: &t.DeliveryTag}, {field: &t.MessageFormat}, @@ -1388,19 +1392,19 @@ func (*performDisposition) link() (uint32, bool) { func (d *performDisposition) marshal(wr writer) error { return marshalComposite(wr, typeCodeDisposition, []marshalField{ - {value: d.Role, omit: false}, - {value: d.First, omit: false}, + {value: &d.Role, omit: false}, + {value: &d.First, omit: false}, {value: d.Last, omit: d.Last == nil}, - {value: d.Settled, omit: !d.Settled}, + {value: &d.Settled, omit: !d.Settled}, {value: d.State, omit: d.State == nil}, - {value: d.Batchable, omit: !d.Batchable}, + {value: &d.Batchable, omit: !d.Batchable}, }...) } func (d *performDisposition) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeDisposition, []unmarshalField{ - {field: &d.Role, handleNull: required("Disposition.Role")}, - {field: &d.First, handleNull: required("Disposition.First")}, + {field: &d.Role, handleNull: func() error { return errorNew("Disposition.Role is required") }}, + {field: &d.First, handleNull: func() error { return errorNew("Disposition.Handle is required") }}, {field: &d.Last}, {field: &d.Settled}, {field: &d.State}, @@ -1444,15 +1448,15 @@ func (d *performDetach) link() (uint32, bool) { func (d *performDetach) marshal(wr writer) error { return marshalComposite(wr, typeCodeDetach, []marshalField{ - {value: d.Handle, omit: false}, - {value: d.Closed, omit: !d.Closed}, + {value: &d.Handle, omit: false}, + {value: &d.Closed, omit: !d.Closed}, {value: d.Error, omit: d.Error == nil}, }...) } func (d *performDetach) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeDetach, []unmarshalField{ - {field: &d.Handle, handleNull: required("Detach.Handle")}, + {field: &d.Handle, handleNull: func() error { return errorNew("Detach.Handle is required") }}, {field: &d.Closed}, {field: &d.Error}, }...) @@ -1536,15 +1540,15 @@ type Error struct { func (e *Error) marshal(wr writer) error { return marshalComposite(wr, typeCodeError, []marshalField{ - {value: e.Condition, omit: false}, - {value: e.Description, omit: e.Description == ""}, + {value: &e.Condition, omit: false}, + {value: &e.Description, omit: e.Description == ""}, {value: e.Info, omit: len(e.Info) == 0}, }...) } func (e *Error) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeError, []unmarshalField{ - {field: &e.Condition, handleNull: required("Error.Condition")}, + {field: &e.Condition, handleNull: func() error { return errorNew("Error.Condition is required") }}, {field: &e.Description}, {field: &e.Info}, }...) @@ -1792,7 +1796,7 @@ func (m *Message) marshal(wr writer) error { if err != nil { return err } - err = marshal(wr, m.Data) + err = writeBinary(wr, m.Data) if err != nil { return err } @@ -1899,11 +1903,29 @@ func peekMessageType(buf []byte) (uint8, error) { return 0, errorErrorf("invalid composite header %0x", buf[0]) } - v, err := readUlong(&roReader{b: buf[1:]}) - if err != nil { - return 0, err + // copied from readUlong to avoid allocations + t := amqpType(buf[1]) + if t == typeCodeUlong0 { + return 0, nil + } + + if t == typeCodeSmallUlong { + if len(buf[2:]) == 0 { + errorNew("invalid ulong") + } + return uint8(buf[2]), nil } - return uint8(v), err + + if t != typeCodeUlong { + return 0, errorErrorf("invalid type for uint32 %0x", t) + } + + if len(buf[2:]) < 8 { + return 0, errorNew("invalid ulong") + } + v := binary.BigEndian.Uint64(buf[2:10]) + + return uint8(v), nil } func checkNull(buf []byte) bool { @@ -1933,18 +1955,18 @@ type MessageHeader struct { func (h *MessageHeader) marshal(wr writer) error { return marshalComposite(wr, typeCodeMessageHeader, []marshalField{ - {value: h.Durable, omit: !h.Durable}, - {value: h.Priority, omit: h.Priority == 4}, - {value: milliseconds(h.TTL), omit: h.TTL == 0}, - {value: h.FirstAcquirer, omit: !h.FirstAcquirer}, - {value: h.DeliveryCount, omit: h.DeliveryCount == 0}, + {value: &h.Durable, omit: !h.Durable}, + {value: &h.Priority, omit: h.Priority == 4}, + {value: (*milliseconds)(&h.TTL), omit: h.TTL == 0}, + {value: &h.FirstAcquirer, omit: !h.FirstAcquirer}, + {value: &h.DeliveryCount, omit: h.DeliveryCount == 0}, }...) } func (h *MessageHeader) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeMessageHeader, []unmarshalField{ {field: &h.Durable}, - {field: &h.Priority, handleNull: defaultUint8(&h.Priority, 4)}, + {field: &h.Priority, handleNull: func() error { h.Priority = 4; return nil }}, {field: (*milliseconds)(&h.TTL)}, {field: &h.FirstAcquirer}, {field: &h.DeliveryCount}, @@ -1992,18 +2014,18 @@ type MessageProperties struct { func (p *MessageProperties) marshal(wr writer) error { return marshalComposite(wr, typeCodeMessageProperties, []marshalField{ {value: p.MessageID, omit: p.MessageID == nil}, - {value: p.UserID, omit: len(p.UserID) == 0}, - {value: p.To, omit: p.To == ""}, - {value: p.Subject, omit: p.Subject == ""}, - {value: p.ReplyTo, omit: p.ReplyTo == ""}, + {value: &p.UserID, omit: len(p.UserID) == 0}, + {value: &p.To, omit: p.To == ""}, + {value: &p.Subject, omit: p.Subject == ""}, + {value: &p.ReplyTo, omit: p.ReplyTo == ""}, {value: p.CorrelationID, omit: p.CorrelationID == nil}, - {value: symbol(p.ContentType), omit: p.ContentType == ""}, - {value: symbol(p.ContentEncoding), omit: p.ContentEncoding == ""}, - {value: p.AbsoluteExpiryTime, omit: p.AbsoluteExpiryTime.IsZero()}, - {value: p.CreationTime, omit: p.CreationTime.IsZero()}, - {value: p.GroupID, omit: p.GroupID == ""}, - {value: p.GroupSequence}, - {value: p.ReplyToGroupID, omit: p.ReplyToGroupID == ""}, + {value: (*symbol)(&p.ContentType), omit: p.ContentType == ""}, + {value: (*symbol)(&p.ContentEncoding), omit: p.ContentEncoding == ""}, + {value: &p.AbsoluteExpiryTime, omit: p.AbsoluteExpiryTime.IsZero()}, + {value: &p.CreationTime, omit: p.CreationTime.IsZero()}, + {value: &p.GroupID, omit: p.GroupID == ""}, + {value: &p.GroupSequence}, + {value: &p.ReplyToGroupID, omit: p.ReplyToGroupID == ""}, }...) } @@ -2060,15 +2082,15 @@ type stateReceived struct { func (sr *stateReceived) marshal(wr writer) error { return marshalComposite(wr, typeCodeStateReceived, []marshalField{ - {value: sr.SectionNumber, omit: false}, - {value: sr.SectionOffset, omit: false}, + {value: &sr.SectionNumber, omit: false}, + {value: &sr.SectionOffset, omit: false}, }...) } func (sr *stateReceived) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeStateReceived, []unmarshalField{ - {field: &sr.SectionNumber, handleNull: required("StateReceived.SectionNumber")}, - {field: &sr.SectionOffset, handleNull: required("StateReceived.SectionOffset")}, + {field: &sr.SectionNumber, handleNull: func() error { return errorNew("StateReceiver.SectionNumber is required") }}, + {field: &sr.SectionOffset, handleNull: func() error { return errorNew("StateReceiver.SectionOffset is required") }}, }...) } @@ -2173,8 +2195,8 @@ type stateModified struct { func (sm *stateModified) marshal(wr writer) error { return marshalComposite(wr, typeCodeStateModified, []marshalField{ - {value: sm.DeliveryFailed, omit: !sm.DeliveryFailed}, - {value: sm.UndeliverableHere, omit: !sm.UndeliverableHere}, + {value: &sm.DeliveryFailed, omit: !sm.DeliveryFailed}, + {value: &sm.UndeliverableHere, omit: !sm.UndeliverableHere}, {value: sm.MessageAnnotations, omit: sm.MessageAnnotations == nil}, }...) } @@ -2212,15 +2234,15 @@ func (si *saslInit) link() (uint32, bool) { func (si *saslInit) marshal(wr writer) error { return marshalComposite(wr, typeCodeSASLInit, []marshalField{ - {value: si.Mechanism, omit: false}, - {value: si.InitialResponse, omit: len(si.InitialResponse) == 0}, - {value: si.Hostname, omit: len(si.Hostname) == 0}, + {value: &si.Mechanism, omit: false}, + {value: &si.InitialResponse, omit: len(si.InitialResponse) == 0}, + {value: &si.Hostname, omit: len(si.Hostname) == 0}, }...) } func (si *saslInit) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeSASLInit, []unmarshalField{ - {field: &si.Mechanism, handleNull: required("saslInit.Mechanism")}, + {field: &si.Mechanism, handleNull: func() error { return errorNew("saslInit.Mechanism is required") }}, {field: &si.InitialResponse}, {field: &si.Hostname}, }...) @@ -2239,13 +2261,13 @@ type saslMechanisms struct { func (sm saslMechanisms) marshal(wr writer) error { return marshalComposite(wr, typeCodeSASLMechanism, []marshalField{ - {value: sm.Mechanisms, omit: false}, + {value: &sm.Mechanisms, omit: false}, }...) } func (sm *saslMechanisms) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeSASLMechanism, - unmarshalField{field: &sm.Mechanisms, handleNull: required("SASLMechanisms.Mechanisms")}, + unmarshalField{field: &sm.Mechanisms, handleNull: func() error { return errorNew("saslMechanisms.Mechanisms is required") }}, ) } @@ -2268,14 +2290,14 @@ type saslOutcome struct { func (so saslOutcome) marshal(wr writer) error { return marshalComposite(wr, typeCodeSASLOutcome, []marshalField{ - {value: so.Code, omit: false}, - {value: so.AdditionalData, omit: len(so.AdditionalData) == 0}, + {value: &so.Code, omit: false}, + {value: &so.AdditionalData, omit: len(so.AdditionalData) == 0}, }...) } func (so *saslOutcome) unmarshal(r reader) error { return unmarshalComposite(r, typeCodeSASLOutcome, []unmarshalField{ - {field: &so.Code, handleNull: required("SASLOutcome.Code")}, + {field: &so.Code, handleNull: func() error { return errorNew("saslOutcome.AdditionalData is required") }}, {field: &so.AdditionalData}, }...) } @@ -2284,6 +2306,76 @@ func (*saslOutcome) link() (uint32, bool) { return 0, false } +type symbolSlice []symbol + +func (s symbolSlice) marshal(wr writer) error { + return writeSymbolArray(wr, s) +} + +type amqpUint16 uint16 + +func (n amqpUint16) marshal(wr writer) error { + err := wr.WriteByte(byte(typeCodeUshort)) + if err != nil { + return err + } + tmp := make([]byte, 2) + binary.BigEndian.PutUint16(tmp, uint16(n)) + _, err = wr.Write(tmp) + return err +} + +type amqpUint32 uint32 + +func (n amqpUint32) marshal(wr writer) error { + return writeUint32(wr, uint32(n)) +} + +// symbol is an AMQP symbolic string. +type amqpString string + +func (s amqpString) marshal(wr writer) error { + if !utf8.ValidString(string(s)) { + return errorNew("not a valid UTF-8 string") + } + l := len(s) + + switch { + // Str8 + case l < 256: + err := wr.WriteByte(byte(typeCodeStr8)) + if err != nil { + return err + } + err = wr.WriteByte(byte(l)) + if err != nil { + return err + } + _, err = wr.WriteString(string(s)) + return err + + // Str32 + case l < math.MaxUint32: + err := wr.WriteByte(byte(typeCodeStr32)) + if err != nil { + return err + } + + tmp := make([]byte, 4) + binary.BigEndian.PutUint32(tmp, uint32(l)) + _, err = wr.Write(tmp) + if err != nil { + return err + } + + _, err = wr.WriteString(string(s)) + return err + + default: + return errorNew("too long") + } +} + // symbol is an AMQP symbolic string. type symbol string @@ -2294,7 +2386,15 @@ func (s symbol) marshal(wr writer) error { switch { // Sym8 case l < 256: - _, err = wr.Write(append([]byte{byte(typeCodeSym8), byte(l)}, []byte(s)...)) + err = wr.WriteByte(byte(typeCodeSym8)) + if err != nil { + return err + } + err = wr.WriteByte(byte(l)) + if err != nil { + return err + } + _, err = wr.WriteString(string(s)) // Sym32 case l < math.MaxUint32: @@ -2302,11 +2402,13 @@ func (s symbol) marshal(wr writer) error { if err != nil { return err } - err = binary.Write(wr, binary.BigEndian, uint32(l)) + tmp := make([]byte, 4) + binary.BigEndian.PutUint32(tmp, uint32(l)) + _, err := wr.Write(tmp) if err != nil { return err } - _, err = wr.Write([]byte(s)) + _, err = wr.WriteString(string(s)) default: return errorNew("too long") } @@ -2335,16 +2437,18 @@ func (m mapAnyAny) marshal(wr writer) error { } func (m *mapAnyAny) unmarshal(r reader) error { - mr, err := newMapReader(r) + _, count, err := readMapHeader(r) if err != nil { return err } - mm := make(mapAnyAny, mr.pairs()) - for mr.more() { - var key interface{} - var value interface{} - err = mr.next(&key, &value) + mm := make(mapAnyAny, count/2) + for i := uint8(0); i < count; i += 2 { + key, err := readAny(r) + if err != nil { + return err + } + value, err := readAny(r) if err != nil { return err } @@ -2361,7 +2465,6 @@ func (m *mapAnyAny) unmarshal(r reader) error { mm[key] = value } *m = mm - return nil } @@ -2373,22 +2476,25 @@ func (m mapStringAny) marshal(wr writer) error { } func (m *mapStringAny) unmarshal(r reader) error { - mr, err := newMapReader(r) + _, count, err := readMapHeader(r) if err != nil { return err } - mm := make(mapStringAny, mr.pairs()) - for mr.more() { - var key string - var value interface{} - err = mr.next(&key, &value) + mm := make(mapStringAny, count/2) + for i := uint8(0); i < count; i += 2 { + key, err := readString(r) + if err != nil { + return err + } + value, err := readAny(r) if err != nil { return err } mm[key] = value } *m = mm + return nil } @@ -2464,8 +2570,19 @@ const ( ) func (p lifetimePolicy) marshal(wr writer) error { - _, err := wr.Write([]byte{0x0, byte(typeCodeSmallUlong), byte(p), byte(typeCodeList0)}) - return err + err := wr.WriteByte(byte(0x0)) + if err != nil { + return err + } + err = wr.WriteByte(byte(typeCodeSmallUlong)) + if err != nil { + return err + } + err = wr.WriteByte(byte(p)) + if err != nil { + return err + } + return wr.WriteByte(byte(typeCodeList0)) } func (p *lifetimePolicy) unmarshal(r reader) error {