Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Snappy Framing to the Encoder #5172

Merged
merged 10 commits into from
Mar 23, 2020
80 changes: 59 additions & 21 deletions beacon-chain/p2p/encoder/ssz.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package encoder

import (
"bytes"
"fmt"
"io"

Expand All @@ -18,26 +19,21 @@ type SszNetworkEncoder struct {
}

func (e SszNetworkEncoder) doEncode(msg interface{}) ([]byte, error) {
b, err := ssz.Marshal(msg)
if err != nil {
return nil, err
}
if e.UseSnappyCompression {
b = snappy.Encode(nil /*dst*/, b)
}
return b, nil
return ssz.Marshal(msg)
}

// Encode the proto message to the io.Writer.
func (e SszNetworkEncoder) Encode(w io.Writer, msg interface{}) (int, error) {
if msg == nil {
return 0, nil
}

b, err := e.doEncode(msg)
if err != nil {
return 0, err
}
if e.UseSnappyCompression {
return writeSnappyBuffer(w, b)
}
return w.Write(b)
}

Expand All @@ -51,7 +47,14 @@ func (e SszNetworkEncoder) EncodeWithLength(w io.Writer, msg interface{}) (int,
if err != nil {
return 0, err
}
b = append(proto.EncodeVarint(uint64(len(b))), b...)
// write varint first
_, err = w.Write(proto.EncodeVarint(uint64(len(b))))
if err != nil {
return 0, err
}
if e.UseSnappyCompression {
return writeSnappyBuffer(w, b)
}
return w.Write(b)
}

Expand All @@ -68,21 +71,34 @@ func (e SszNetworkEncoder) EncodeWithMaxLength(w io.Writer, msg interface{}, max
if uint64(len(b)) > maxSize {
return 0, fmt.Errorf("size of encoded message is %d which is larger than the provided max limit of %d", len(b), maxSize)
}
b = append(proto.EncodeVarint(uint64(len(b))), b...)
// write varint first
_, err = w.Write(proto.EncodeVarint(uint64(len(b))))
if err != nil {
return 0, err
}
if e.UseSnappyCompression {
return writeSnappyBuffer(w, b)
}
return w.Write(b)
}

func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error {
return ssz.Unmarshal(b, to)
}

// Decode the bytes to the protobuf message provided.
func (e SszNetworkEncoder) Decode(b []byte, to interface{}) error {
if e.UseSnappyCompression {
var err error
b, err = snappy.Decode(nil /*dst*/, b)
newBuffer := bytes.NewBuffer(b)
r := snappy.NewReader(newBuffer)
newObj := make([]byte, len(b))
numOfBytes, err := r.Read(newObj)
if err != nil {
return err
}
return e.doDecode(newObj[:numOfBytes], to)
}

return ssz.Unmarshal(b, to)
return e.doDecode(b, to)
}

// DecodeWithLength the bytes from io.Reader to the protobuf message provided.
Expand All @@ -91,12 +107,15 @@ func (e SszNetworkEncoder) DecodeWithLength(r io.Reader, to interface{}) error {
if err != nil {
return err
}
b := make([]byte, msgLen)
_, err = r.Read(b)
if e.UseSnappyCompression {
r = snappy.NewReader(r)
}
b := make([]byte, e.MaxLength(int(msgLen)))
numOfBytes, err := r.Read(b)
if err != nil {
return err
}
return e.Decode(b, to)
return e.doDecode(b[:numOfBytes], to)
}

// DecodeWithMaxLength the bytes from io.Reader to the protobuf message provided.
Expand All @@ -106,15 +125,18 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}, maxS
if err != nil {
return err
}
if e.UseSnappyCompression {
r = snappy.NewReader(r)
}
if msgLen > maxSize {
return fmt.Errorf("size of decoded message is %d which is larger than the provided max limit of %d", msgLen, maxSize)
}
b := make([]byte, msgLen)
_, err = r.Read(b)
b := make([]byte, e.MaxLength(int(msgLen)))
numOfBytes, err := r.Read(b)
if err != nil {
return err
}
return e.Decode(b, to)
return e.doDecode(b[:numOfBytes], to)
}

// ProtocolSuffix returns the appropriate suffix for protocol IDs.
Expand All @@ -124,3 +146,19 @@ func (e SszNetworkEncoder) ProtocolSuffix() string {
}
return "/ssz"
}

// MaxLength specifies the maximum possible length of an encoded
// chunk of data.
func (e SszNetworkEncoder) MaxLength(length int) int {
if e.UseSnappyCompression {
return snappy.MaxEncodedLen(length)
}
return length
}

// Writes a bytes value through a snappy buffered writer.
func writeSnappyBuffer(w io.Writer, b []byte) (int, error) {
bufWriter := snappy.NewBufferedWriter(w)
defer bufWriter.Close()
return bufWriter.Write(b)
}