Separate envelope writing from marshalling #586

Closed
wants to merge 10 commits into from

Conversation

emcfarlane
Contributor

This PR adds a new mechanism for working with envelope messages. It introduces utility functions such as writeAll, readAll, writeEnvelope, and readEnvelope, which the protocol implementations call directly to write messages to and read messages from the stream. Each protocol can then handle its specific flags independently, without relying on the sentinel errSpecialEnvelope to eject from the envelope marshallers.

This PR is a prelude to the fix for #541, which adds new methods on duplexHTTPCall to write messages, based on SendEnvelope and WriteEnvelope, and changes the ownership of buffers taken from the buffer pool. Removing the envelope marshallers means we can change the signature for the client without changing the server.

- BenchmarkConnect/unary-8                     609           1967440 ns/op         5840173 B/op        230 allocs/op
+ BenchmarkConnect/unary-8                     606           1986667 ns/op         5230730 B/op        218 allocs/op

Rework of #546. This removes makeEnvelope and uses the method signature for enveloping.

@emcfarlane emcfarlane self-assigned this Sep 12, 2023
@@ -20,7 +20,7 @@ import (
)

const (
- initialBufferSize = 512
+ initialBufferSize = bytes.MinRead
Contributor Author

bytes.MinRead == 512, so the value is unchanged. This is more about showing intent: we want to avoid another allocation on the first read.

@@ -78,7 +78,10 @@ func newCompressionPool(
}
}

- func (c *compressionPool) Decompress(dst *bytes.Buffer, src *bytes.Buffer, readMaxBytes int64) *Error {
+ func (c *compressionPool) Decompress(pool *bufferPool, src *bytes.Buffer, readMaxBytes int64) *Error {
Contributor Author

Decompress now takes a pool and swaps the decompressed buffer with the source on success.

- expectCode: connect.CodeInvalidArgument,
- expectMsg: fmt.Sprintf("invalid_argument: protocol error: promised %d bytes in enveloped message, got %d bytes", len(payload), len(payload)-1),
+ expectCode: connect.CodeInternal,
+ expectMsg: "internal: incomplete envelope: unexpected EOF",
Contributor Author

Removed invalid_argument; I don't think it should be raised by the framework, based on the doc string: https://pkg.go.dev/google.golang.org/grpc/codes#Code

Member

Can you check what grpc-go does here? I think I changed this to CodeInvalidArgument specifically to match them, but I may be wrong.

Contributor Author

Member

What direction is this? Returning "internal" from the server, when the client has sent an invalid stream, seems very wrong. Internal should be reserved for internal server errors, not errors in the client (akin to 5xx vs 4xx). You generally want exception reporting/alerting for internal server errors since they indicate a likely bug in your code; you do not want any such alerting for client errors, because then a client can induce alerts in your system w/ carefully crafted (invalid) requests.

So I agree that this should be "internal" only in cases where it's the server that sent the invalid message.
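The rule argued for in this thread can be sketched as a small helper. This is only an illustration under assumptions: the Code type is a local stand-in rather than connect.Code, and codeForMalformedEnvelope is a hypothetical name that does not appear in the PR.

```go
package main

import "fmt"

// Code is a local stand-in for an RPC status code (assumption, not connect.Code).
type Code string

const (
	CodeInvalidArgument Code = "invalid_argument"
	CodeInternal        Code = "internal"
)

// codeForMalformedEnvelope (hypothetical) picks a code for a malformed
// envelope based on which side is reading it. A server reading a bad request
// blames the client; a client reading a bad response blames the server.
func codeForMalformedEnvelope(readerIsServer bool) Code {
	if readerIsServer {
		return CodeInvalidArgument
	}
	return CodeInternal
}

func main() {
	fmt.Println("server reading request:", codeForMalformedEnvelope(true))
	fmt.Println("client reading response:", codeForMalformedEnvelope(false))
}
```

Keeping client-caused failures out of "internal" means alerting on internal errors cannot be triggered by crafted requests.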

pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
)

func TestBuffers_ReadAllAllocs(t *testing.T) {
Contributor Author

@emcfarlane emcfarlane Sep 12, 2023

These are small tests that show allocs, happy to remove.

Member

These don't seem too useful to me. I think benchmarks are more valuable than this sort of test. ¯\_(ツ)_/¯

@@ -40,7 +40,7 @@ const (
headerUserAgent = "User-Agent"
headerTrailer = "Trailer"

- discardLimit = 1024 * 1024 * 4 // 4MiB
+ discardLimit = 1024 * 1024 * 8 // 8MiB
Contributor Author

Test was failing now that discard is used. I've upped this limit to avoid changing the test.

Member

Which test? I think it might be better to change the test. While 8mb is not a big number, it can be bad combined with a slow client. And until we solve the "read timeout" thing (discussed in separate threads), we should be more conservative. 4mb is already generous since most RPC payloads are single digit kilobytes, so doubling this seems like the wrong play.

Member

@jhump jhump left a comment

Hey, Ed, sorry it took me so long to get you feedback. I've left numerous comments on the diff, but overall I'm really worried about merging a change like this.

This change does too much in a single branch. It's very large and dense. And it's hard to review because the changes involve lots of code motion and re-abstraction/de-abstraction. I think we'd need a very clear motivation to merge such a change, and I haven't heard or seen it. Maybe you can elaborate more on why you think this sort of pre-factoring is needed for the upcoming changes you are trying to make? Maybe there are less intrusive ways to accomplish the same thing.

The changes in abstractions make the protocol implementations more verbose and more complicated. I'm not sure that's really desirable -- I know you stated the explicitness makes them easier to change in the future, but I think we could still do better towards code re-use/DRY-ness than what's in this branch. The change as is feels a lot like "moving spaghetti around on the plate", changing where certain things are done, but not in a way that is obviously an improvement to readability/maintainability.

Re-factoring changes are best when they are mostly mechanical. But this branch also has some subtle behavioral changes, which are definitely not mechanical. That makes it harder to review. We should separate these aspects so that the behavioral changes can be reviewed/discussed separately.

I think we may need to chat about what you're really needing to accomplish and come up with a less radical formulation. I hope that all makes sense.

func (e *envelope) IsSet(flag uint8) bool {
return e.Flags&flag == flag
}
var errEOF = errorf(CodeInternal, "%w", io.EOF)
Member

Why "internal"? Looks like it previously was "unknown", which I think is more fitting.

// Instead, replace it with the larger, newly-allocated slice. This
// allocates, but it's a small, constant-size allocation.
*buffer = *bytes.NewBuffer(raw)
if dst.Cap() < len(raw) {
Member

Why not the same condition as above, cap(raw) > dst.Cap()?

As is, in one case, we hang on to the bigger buffer, and the other we might hang on to the smaller one.

Also, this logic to swap dst and raw is in here 2x. Maybe extract to a helper function so there's only one place that warrants the context comments you have above but omitted here.

compressionPool *compressionPool
bufferPool *bufferPool
readMaxBytes int
func read(dst *bytes.Buffer, src io.Reader) (int, error) {
Member

I don't understand why we've reinvented *bytes.Buffer.ReadFrom and io.LimitReader between this and readAll.

I played around with this locally, and using those primitives from the stdlib doesn't have any meaningful impact on the benchmarks, but it improves readability and makes the code a little more concise, since there's no need for this read function. (And your unit tests to verify the number of allocations also still pass.)

Comment on lines +132 to +133
case size < 0:
return 0, errorf(CodeInvalidArgument, "message size %d overflowed uint32", size)
Member

Hmmm. Four bytes cannot overflow uint32 and BigEndian.Uint32 returns a uint32, which is again not possible for it to overflow.

If size is negative here, it simply means we're on a 32-bit architecture and size overflowed 2^31, since we've converted from unsigned to signed. But a size greater than 2^31 is allowed and should not cause an error (unless it causes an allocation error because the data is too big to fit in memory of 32-bit address; but that will manifest as bytes.ErrTooLarge from the call to dst.ReadFrom below, so I don't think we should bother trying to handle it here).

If you instead convert everything to int64 (instead of int), then it's not possible for any comparisons to overflow.

return 0, errorf(CodeInternal, "incomplete envelope: %w", err)
} else if readN != int64(size) {
err = io.ErrUnexpectedEOF
return 0, errorf(CodeInternal, "incomplete envelope: %w", err)
Member

@jhump jhump Oct 6, 2023

This seems very wrong to put internal on everything (including in writeAll below). For one, we shouldn't be adding a code at all unless we know the code for the error. And many of these (like line 151) are I/O errors where it should be "unknown" if anything.

As mentioned in a previous comment, this is definitely the wrong code if this is a server reading a request or writing a response. In that case "invalid argument" makes so much more sense when reading the request. And when writing the response, I don't see why we'd even bother wrapping the error with a code at all. That seems like it should just propagate out as is (which would likely eventually turn into an "unknown", which is more appropriate than "internal").

On the client side, if reading a malformed response, then internal does make sense -- but only when we know the response is malformed. The case above (line 151 again, which means an I/O error reading) should not be marked internal. And neither should errors from the client. (The framework should probably wrap it with an "unknown" higher up the stack, just before it gets back to the caller of stream.Send or a unary client caller.)

return prefix[0], nil
}

func writeAll(dst io.Writer, src *bytes.Buffer) *Error {
Member

Per comment above, I don't think we need this at all. Just use src.WriteTo(dst). Marking every write error as an internal error is wrong.

Comment on lines +298 to +301
if wroteN == limit {
// Ensure we error if we hit the limit.
err = io.ErrShortBuffer
}
Member

Technically, the limit and the payload could be the same size, in which case this error would be spurious. You'd need to either set the limit reader to discardLimit+1 or follow up with an attempt to read a single byte to decide on if this error is appropriate.

unmarshaler connectUnaryUnmarshaler
responseHeader http.Header
responseTrailer http.Header
*connectClient
Member

Why embed? Does this add any additional exported symbols to connectUnaryClientConn? If so, that seems bad since a caller could type-assert to gain access to them.
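To illustrate the concern with a standalone sketch (the types inner, embedded, and named are made up and unrelated to the PR's types): embedding promotes the inner type's exported methods into the outer type's method set, so a type assertion can reach them, while a named field does not.

```go
package main

import "fmt"

// inner is a made-up type with one exported method.
type inner struct{}

func (*inner) Exported() string { return "reachable" }

// embedded embeds *inner, so Exported is promoted to embedded's method set.
type embedded struct{ *inner }

// named holds *inner in an ordinary field, so nothing is promoted.
type named struct{ in *inner }

// hasExported reports whether v exposes Exported via a type assertion.
func hasExported(v any) bool {
	_, ok := v.(interface{ Exported() string })
	return ok
}

func main() {
	fmt.Println(hasExported(&embedded{inner: &inner{}}))
	fmt.Println(hasExported(&named{in: &inner{}}))
}
```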

@emcfarlane
Contributor Author

emcfarlane commented Oct 9, 2023

@jhump thanks for the review. The issue I was trying to solve is the abstraction of the envelopeWriter over an io.Writer here.

Connect clients write to requests by exposing an io.Writer which is passed to a unary or streaming call. For GetBody we want to pass around message buffers instead. This work was based on your feedback for removing the writer interfaces.

I still think the current envelope abstraction leads to more bugs and unnecessary complexity than it helps. These are the bugs/almost bugs I know of:

I'll try to address the feedback. Would be great to chat through a solution that's more surgical, less spaghetti.

@jhump
Member

jhump commented Oct 9, 2023

Connect clients write to requests by exposing an io.Writer which is passed to a unary or streaming call. For GetBody we want to pass around message buffers instead.

While duplexHTTPCall does expose a Write method, that's not the same thing in my mind as "exposing an io.Writer" to which Connect clients write requests. In fact, the whole connect client flow -- both unary and stream calls -- actually already speak in terms of messages. So I'm pretty sure that can be translated to message buffers in a much more surgical way.

This work was based on your feedback for removing the writer interfaces.

What I was actually proposing in the comments on that previous PR was something much more surgical that just replaces the io.Pipe and the calls to its reader and writer.

Also, I thought we decided to punt on supporting GetBody for streaming calls for now and focus on unary, by separating it from the duplexHTTPCall. So I'm failing to see how a change like this is a needed pre-factor for the step of further special-casing unary calls.

These are the bugs/almost bugs I know of:

All of those can be addressed by much smaller, targeted fixes instead of a large refactor that happens to touch on all of them at once. Small, targeted changes are easier to write, review, and test, and are inherently lower risk. While it can sometimes be difficult to fix something in a small, targeted way, such as when a fix really requires a significant architecture change, I don't think that is the case for any of these.

Would be great to chat through a solution that's more surgical, less spaghetti.

I'll schedule some time to chat more.

@emcfarlane
Contributor Author

So I'm failing to see how a change like this is a needed pre-factor for the step of further special-casing unary calls.

For Connect, unary calls already have access to the duplexWriter, so this isn't needed. gRPC and gRPC-Web both envelope their messages and go through the envelopeWriter. The changes I had to support GetBody would expose two methods on duplexHTTPCall:

func (d *duplexHTTPCall) Send(buf *bytes.Buffer) error
func (d *duplexHTTPCall) SendEnvelope(buf *bytes.Buffer, flags uint8) error

Or something similar to this effect.

I'm happy to go with something better.

@emcfarlane emcfarlane closed this Oct 9, 2023