Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Envelope writing from marshalling #546

Closed
wants to merge 4 commits into from

Conversation

emcfarlane
Copy link
Contributor

@emcfarlane emcfarlane commented Jul 10, 2023

A new API for working with envelope messages. Envelopes are built up within a single buffer. We can now use a single write call and pass a bytes.Buffer as the full payload.

With this change marshallers no longer write to the stream. Control is given to the protocols connection handler. This allows the protocols implementation to easily customize the behaviour. However, it does duplicates some of the flow. Overall I think it's simpler to duplicate control flow into the connection handler to allow for protocol specific features. Changes:

- BenchmarkConnect/unary-8                     583           2035062 ns/op         5448544 B/op        232 allocs/op
+ BenchmarkConnect/unary-8                     580           2054063 ns/op         5190420 B/op        221 allocs/op

Separates marshalling and unmarshalling from writing to the stream.
Conn handlers are responsible for marshal -> compress -> write logic.
Let's Conn handlers use custom logic for buffering on the wire.
@emcfarlane emcfarlane requested a review from akshayjshah July 10, 2023 14:23
@emcfarlane emcfarlane self-assigned this Jul 10, 2023
@emcfarlane emcfarlane marked this pull request as ready for review July 10, 2023 14:32
@@ -364,7 +364,7 @@ func receiveUnaryResponse[T any](conn StreamingClientConn) (*Response[T], error)
// In a well-formed stream, the response message may be followed by a block
// of in-stream trailers or HTTP trailers. To ensure that we receive the
// trailers, try to read another message from the stream.
if err := conn.Receive(new(T)); err == nil {
Copy link
Contributor Author

@emcfarlane emcfarlane Jul 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this extra Receive call with this protocol drain fix: #536

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome. That'd also let us make the function non-generic, which would cut binary size a bit.

Copy link
Member

@akshayjshah akshayjshah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ... a lot to get through in one sitting 😅 Most important questions:

  • Is the new envelope type the best we can do? (Asked inline on the type.)
  • How can we break this change up into more easily-reviewed chunks? As is, it's really difficult to substantively review.

buffer_pool.go Show resolved Hide resolved
return errorf(CodeInternal, "marshal message: %w", err)
func marshal(dst *bytes.Buffer, message any, codec Codec) *Error {
if message == nil {
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't correct - nil handling is codec-specific. This works for binary protobuf but isn't correct for protojson (where we should write {}).

defer w.bufferPool.Put(buffer)
envelope := &envelope{Data: buffer}
return w.Write(envelope)
dst.Write(raw)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've allocated this []byte; we should try to put it back into the pool when we're done with it.

}
if readMaxBytes > 0 && bytesRead > int64(readMaxBytes) {
// Attempt to read to end in order to allow connection re-use
discardedBytes, err := io.Copy(io.Discard, src)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the discard function here, so we're not copying an unbounded number of bytes from a badly-behaved client?

func makeEnvelope(buffer *bytes.Buffer) envelope {
var head [5]byte
buffer.Write(head[:])
return envelope{Buffer: buffer}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With these changes, is envelope doing enough work to keep around? It doesn't seem to provide much value as a wrapper on top of bytes.Buffer, and I don't love that the unwrap method isn't safe to call twice.

Would the rest of the repository be more clear if we instead had something like this (and managed the envelope and message buffer separately)?

type envelope [5]byte

func (e *envelope) Flags() uint8 { ... }
func (e *envelope) SetFlags(uint8) { ... }
func (e *envelope) HasFlags(uint8) { ... } // check if flags are set
func (e *envelope) Size() int { ... } // read size from last 4 bytes
func (e *envelope) SetSize(int) { ... } // write size to last 4 bytes

Copy link
Contributor Author

@emcfarlane emcfarlane Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Envelope is only needed to build a new enveloped message. On read we need to unwrap it to access the message size and then only need to return flags as the message size is encoded in the return data payload.

To make it easier to build new messages and allow downstream to be agnostic to whether the message is enveloped or not we can encode the envelope message in the bytes.Buffer directly.

buffer := &bytes.Buffer{}
marshal(buffer, msg)
if buffer.Len() > min {
  compressBuffer := &bytes.Buffer{}
  compress(compressBuffer, buffer)
  buffer = compressBuffer // swap
 }
write(buffer)

Can be written in an envelope as:

envelope := makeEnvelope(&bytes.Buffer{})
marshal(envelope.Buffer, msg)
var flags uint8
if envelope.size() > min {
  compressEnvelope := makeEnvelope(&bytes.Buffer{})
  compress(compressEnvelope.Buffer, envelope.unwrap())
  flags |= flagCompressed
  envelope = compressEnvelope // swap
}
envelope.encodeSizeAndFlags(flags)
write(envelope.Buffer)

Defiantly something better could be done though. The unwrap() is used to escape the envelope and return the data for a reader (needed when we compress the payloads). Maybe:

type envelope struct {
  Buffer *bytes.Buffer
}
func (e envelope) Encode(flags uint8) 
func (e envelope) Size() int // size of data, buffer.Len() - 5
func (e envelope) Unwrap() *bytes.Buffer

I think for flags we could have a type, but easy to write the logic inplace too.

type flags uint8
func (x flags) Has(y flags) bool  { return x&y > 0 }
func (x flags) Set(y flags) flags { return x | y }

defer w.bufferPool.Put(buffer)
envelope := &envelope{Data: buffer}
return w.Write(envelope)
dst.Write(raw)
Copy link
Contributor Author

@emcfarlane emcfarlane Jul 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to append to the buffer rather than replace. (we could add the original logic, it's just more complicated as still need to append(dst.Bytes(), raw).

func makeEnvelope(buffer *bytes.Buffer) envelope {
var head [5]byte
buffer.Write(head[:])
return envelope{Buffer: buffer}
Copy link
Contributor Author

@emcfarlane emcfarlane Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Envelope is only needed to build a new enveloped message. On read we need to unwrap it to access the message size and then only need to return flags as the message size is encoded in the return data payload.

To make it easier to build new messages and allow downstream to be agnostic to whether the message is enveloped or not we can encode the envelope message in the bytes.Buffer directly.

buffer := &bytes.Buffer{}
marshal(buffer, msg)
if buffer.Len() > min {
  compressBuffer := &bytes.Buffer{}
  compress(compressBuffer, buffer)
  buffer = compressBuffer // swap
 }
write(buffer)

Can be written in an envelope as:

envelope := makeEnvelope(&bytes.Buffer{})
marshal(envelope.Buffer, msg)
var flags uint8
if envelope.size() > min {
  compressEnvelope := makeEnvelope(&bytes.Buffer{})
  compress(compressEnvelope.Buffer, envelope.unwrap())
  flags |= flagCompressed
  envelope = compressEnvelope // swap
}
envelope.encodeSizeAndFlags(flags)
write(envelope.Buffer)

Defiantly something better could be done though. The unwrap() is used to escape the envelope and return the data for a reader (needed when we compress the payloads). Maybe:

type envelope struct {
  Buffer *bytes.Buffer
}
func (e envelope) Encode(flags uint8) 
func (e envelope) Size() int // size of data, buffer.Len() - 5
func (e envelope) Unwrap() *bytes.Buffer

I think for flags we could have a type, but easy to write the logic inplace too.

type flags uint8
func (x flags) Has(y flags) bool  { return x&y > 0 }
func (x flags) Set(y flags) flags { return x | y }

@emcfarlane
Copy link
Contributor Author

Closing, will look at breaking apart!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants