Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specialize the unary RPC flow internally #609

Closed
jhump opened this issue Oct 13, 2023 · 21 comments
Closed

Specialize the unary RPC flow internally #609

jhump opened this issue Oct 13, 2023 · 21 comments
Labels
enhancement New feature or request

Comments

@jhump
Copy link
Member

jhump commented Oct 13, 2023

Currently, all RPCs flow through the same internal abstraction: duplexHTTPCall. This abstraction can accommodate all manner of calls, including full-duplex bidirectional calls, but it is overkill for unary RPC. Some of the machinery needed to support bidirectional streaming (synchronization mechanisms and an extra goroutine and io.Pipe per call) is pure overhead and unnecessary for simple, unary RPCs.

We can eliminate this overhead by providing a dedicated flow for unary RPCs via a new method on the protocolClient interface. Something like so:

// Invoke invokes a unary RPC. The given req is sent to the server and the reply
// is unmarshalled into resp. Returns the HTTP method used, response headers,
// trailers, and optional error.
Invoke(ctx context.Context, spec Spec, reqHdr http.Header, req, resp any) (method string hdr, tlr http.Header, error)

A very naive implementation of this new method might look like so (just to demonstrate what it conceptually does):

func (m *myClient) Invoke(ctx context.Context, spec Spec, reqHdr http.Header, req, resp any) (http.Header, http.Header, error) {
	conn := client.protocolClient.NewConn(ctx, spec, reqHdr)
	if err := conn.Send(req); err != nil && !errors.Is(err, io.EOF) {
		_ = conn.CloseRequest()
		_ = conn.CloseResponse()
		return nil, err
	}
	if err := conn.CloseRequest(); err != nil {
		_ = conn.CloseResponse()
		return nil, err
	}
	
	if err := conn.Receive(resp); err != nil {
		return nil, err
	}
	// Make sure there's no extra message and also
	// make sure we've received any trailers.
	if err := conn.Receive(resp); err == nil {
		return nil, NewError(CodeUnknown, errors.New("unary stream has multiple messages"))
	} else if err != nil && !errors.Is(err, io.EOF) {
		return nil, NewError(CodeUnknown, err)
	}
	return http.MethodPost, conn.ResponseHeader(), conn.ResponseTrailer(), conn.CloseResponse() 
}

But the value of it is that it enables a much more bespoke flow that eschews the under-the-hood complexity of the StreamingClientConn (which is implemented by way of duplexHTTPConn).

Having this in place would also make it trivial to resolve #541 for unary RPCs in a way that doesn't further complicate duplexHTTPConn.

@jhump jhump added the enhancement New feature or request label Oct 13, 2023
@jhump
Copy link
Member Author

jhump commented Oct 19, 2023

An alternative to the suggestion above would be a means to independently specialize the request-handling from the response-handling flows. This would allow the same optimizations unlocked above for unary RPCs to also be used for server-streaming RPCs, which also have a unary request.

Here's a quick sketch of what that might look like:

type protocolClient interface {
	Peer() Peer
	WriteRequestHeader(StreamType, http.Header)
    
	// Use for unary and server-stream RPCs.
	NewUnaryRequestConn(context.Context, Spec, http.Header, req any) streamingResponse
	// Use for client-stream and bidi-stream RPCs.
	NewStreamingRequestConn(context.Context, Spec, http.Header) streamingClientConn
}

type streamingResponse interface {
	Receive(any) error
	ResponseHeader() http.Header
	ResponseTrailer() http.Header
	CloseResponse() error
}

type streamingClientConn interface {
	streamingResponse

	Spec() Spec
	Peer() Peer
	Send(any) error
	RequestHeader() http.Header
	CloseRequest() error
}

func unaryResponse(str streamingResponse, resp any) error {
 	// used by unary and client-stream RPCs to extract a unary response
	// message or an error from the given response.
	// ...
}

@akshayjshah, @mattrobenolt, @emcfarlane, thoughts?

(For more context, see thread starting with #611 (comment)).

@mattrobenolt
Copy link
Contributor

Yup, that much more matches my expectations of the abstraction here. Then the client vs server conn implementations are composable to make any of the permutations, and bidi includes the extra overhead of needing a goroutine to do both sides concurrently.

@jhump
Copy link
Member Author

jhump commented Oct 19, 2023

and bidi includes the extra overhead of needing a goroutine to do both sides concurrently.

The client-side still needs this, too. The goroutine is not needed to do both concurrently but rather is necessary in order to stream the request. That's just the way the net/http interface works. The call to http.Client.Do must be started before the application can start sending request bytes. But that call blocks until the server sends a status line and response headers. So it must happen in a separate goroutine for a streaming upload so that the application can send bytes while the http.Client.Do call is blocked. It is only after the application has finished uploading the request data and half-closes the RPC that the server will then send a response and unblock that goroutine.

In the above outline, of just two methods, the latter one (NewStreamingRequestConn) must use a pipe and a goroutine.

@mattrobenolt
Copy link
Contributor

Sure that's fair. That's what I said might be more of a limitation of stdlib APIs available to us. Theoretically it wouldn't need it, but doesn't matter.

I think treating client stream with the bidi path is more than acceptable.

@jhump
Copy link
Member Author

jhump commented Oct 20, 2023

@emcfarlane, your initial approach in 9c76410 is actually pretty close to the above suggestion. Instead of having the split in logic be in the protocolClient interface, you've got it one layer lower, by splitting the HTTP call in two. So maybe that is the right direction (sorry I suggested otherwise in PR comments). Instead of calling them unaryHTTPCall and duplexHTTPCall, maybe call them unaryRequestHTTPCall and streamingRequestHTTPCall to better convey their purpose.

@emcfarlane
Copy link
Contributor

The problem is with the request body, the io.ReadCloser which is currently an io.Pipe but should be a bytes.Buffer for unary to trivially allow GetBody to be set. It surfaces on the client side because we want to use an io.Writer to send the payload but need to provide an io.Reader. On the server we have an io.Reader for request and io.Writer for response, which maps cleanly to send and receive. I think down the road we can implement a more sophisticated streaming reader to replace io.Pipe that allows GetBody but for unary and server-stream we can use the simple solution of a bytes buffer reader.

Adding a unaryRequestHTTPCall works well for connectUnaryClientConn as we can replace the duplexHTTPCall here:

duplexCall *duplexHTTPCall

For grpc and grpc-web the duplexHTTPCall can't be switched out for unaryRequestHTTPCall as easily as there's no separate grpcUnaryClientConn:
duplexCall *duplexHTTPCall

To reduce the changes needed I want to replace duplexHTTPCall with a more general httpCall struct that changes the io.Writer interface to Send(buffer *bytes.Buffer) error. duplexHTTPCall already has the streamType, so on Send if it is a unary or server-stream request we can set the body to the entire buffer and create the GetBody function. For streaming we build the io.Pipe and call the request in a go routine.

This makes the duplexHTTPCall a bit more complicated as it decides which call style to invoke but removes complications in protocol implementations having to switch behaviour on streamType. @jhump @mattrobenolt what do you think?

@mattrobenolt
Copy link
Contributor

The problem is with the request body, the io.ReadCloser which is currently an io.Pipe but should be a bytes.Buffer for unary to trivially allow GetBody to be set. It surfaces on the client side because we want to use an io.Writer to send the payload but need to provide an io.Reader. On the server we have an io.Reader for request and io.Writer for response, which maps cleanly to send and receive. I think down the road we can implement a more sophisticated streaming reader to replace io.Pipe that allows GetBody but for unary and server-stream we can use the simple solution of a bytes buffer reader.

For my understanding, why is it relevant to implement GetBody? From my understanding and experiments around here, this is used to replay a request in error, but only if a request is deemed idempotent: https://cs.opensource.google/go/go/+/refs/tags/go1.21.3:src/net/http/transport.go;l=87-94

Personally, I've dealt with this for our load balancer situation, and we explicitly do not want a GetBody, since none of our RPCs would be safe to be idempotent. Granted, Connect has awareness of idempotency, but that's bundled up with a GET request, and doesn't have a body anyways. https://cs.opensource.google/go/go/+/refs/tags/go1.21.3:src/net/http/request.go;l=1440-1454

Given that this would be for our POST path, I'm not sure it'd even be used, at least by the default stdlib http.Transport.

This makes the duplexHTTPCall a bit more complicated as it decides which call style to invoke but removes complications in protocol implementations having to switch behaviour on streamType.

I think that makes sense inside the the *HTTPCall implementation. I'd consider that closer to our "transport" and makes sense to me that it would determine how to do this.

@jhump
Copy link
Member Author

jhump commented Oct 20, 2023

@mattrobenolt, it is used. See #541. The HTTP library can also replay a request if it is certain that it was not processed. This happens, for example, in HTTP/2 when the server is doing graceful shutdown and sends a GOAWAY frame with an error code of NO_ERROR.

@mattrobenolt
Copy link
Contributor

Interesting, TIL! I had assumed if the body was read at all, this is quite unsafe. It's not a scenario I guess we've encountered.

@emcfarlane
Copy link
Contributor

I've implemented the basic unary changes needed: #611

Theres some remaining work but I'd like to break it into another PR to avoid growing this one too much. If that sounds like a good idea I can get the current work ready for review once #594 merges.

@emcfarlane
Copy link
Contributor

I'm proposing the Send method on the duplex HTTP call to replace the io.Writer:

// Send the payload to the server.
func (d *duplexHTTPCall) Send(payload messagePayload) error

// messagePayload a single message for a stream. It allows for rewinding of the underlying 
// source to support retries.
type messagePayload interface {
	io.Reader
	io.WriterTo
	// Rewind seeks the payload to the beginning.
	Rewind() 
}

With two implementations, one enveloped and the other based on a bytes buffer. Extending the *envelope methods to implement is started in #622:

type envelope struct {
	Data *bytes.Buffer
	Flags uint8
	offset uint // Offset into Data
}

And the bytes buffer could be:

type messageBuffer struct {
	Data *bytes.Buffer
	offset uint // Offset into Data
}
// TODO: methods to implement messagePayload similar to envelope.

@akshayjshah, @mattrobenolt, @jhump what do you think about this API? How should the envelopeWriter be modified to support calling this method? Or is there a better alternative?

@emcfarlane
Copy link
Contributor

For envelopeWriter I was thinking the current method would be changed to accept a destination:

func (e *envelopeWriter) Marshal(dst io.Writer, msg any) *Error

Both the client *duplexHTTPCall and the server http.ResponseWriter can use. Then to update to Send for the client add a method which returns the envelope created in Marshal (rewritting Marshal to call MarshalEnvelope then write to the destination):

func (e *envelopeWriter) MarshalEnvelope(data *bytes.Buffer, msg any) (*envelope, *Error)

func (e *envelopeWriter) Marshal(dst io.Writer, msg any) (*Error) {
	buffer := e.bufferPool.Get()
	defer e.bufferPool.Put(buffer)
	env, err := e.MarshalEnvelope(msg)
	if err != nil { return err }
	if _, err := env.WriteTo(dst); err != nil { return err }
}

@jhump
Copy link
Member Author

jhump commented Nov 7, 2023

I think you'll want another method in the messagePayload interface:

    Release(*bufferPool)

So when the message is no longer needed, it can be returned to the pool.

As I mentioned F2F, I was bothered by *envelope needing to track mutable state to implement io.Reader. An alternative might be a slightly different interface:

type messagePayload interface {
	// Creates a new reader for reading the payload data.
	NewReader() io.Reader
	// Writes the payload data to w.
	WriteTo(w io.Writer) (n int64, err error)
	// Returns any underlying *bytes.Buffer instances to pool.
	// This payload must not be used again after this is called.
	Release(pool *bufferPool)
}

This way, a message payload implementation (i.e. *envelope) is effectively immutable -- which in my mind is more intuitive since it's surprising that a thing called a "payload" might be mutable. The way it was previously specified, it also lacks symmetry/consistency with how other things work that also implement both io.Reader and io.WriterTo: if I consume some data via the io.Reader interface, that doesn't actually impact the io.WriterTo behavior (or at least it didn't in the PR you had opened, #622).

So the value returned from NewReader is the thing that tracks mutable state (offset). Then there's no need for Rewind() since the caller instead just creates a new reader. And since it doesn't implement both io.Reader and io.WriterTo, there would be no expectations about how the two methods should interact. I changed the above to actually explicitly state the WriteTo method, instead of embedding io.WriterTo, to further clarify this point.

Does that make sense? What do you think?

@emcfarlane
Copy link
Contributor

WriterTo is often called after a partial Read to sniff the content type. Separating them out doesn't make sense as WriterTo is an optimisation for Read. Consuming some Read should affect the WriterTo and always does in the std library implementations. The behaviour should match what a *bytes.Reader does with the Rewind() effectively being (*bytes.Reader).Seek(io.Start, 0).

@jhump
Copy link
Member Author

jhump commented Nov 9, 2023

WriterTo is often called after a partial Read to sniff the content type. Separating them out doesn't make sense as WriterTo is an optimisation for Read. Consuming some Read should affect the WriterTo and always does in the std library implementations

Sure. But that case will never happen here. We either need to use it as a reader (for non-streaming request messages) or to write (everything else). Seems strange to complicate the implementation of WriteTo for a non-existent use case. 🤷

@emcfarlane
Copy link
Contributor

I think we'd still want the Request body as an io.Reader to implement io.WriterTo to so we get the optimized io.Copy calls (and other internal funcs that use it). The complexity cost is very low.

@jhump
Copy link
Member Author

jhump commented Nov 10, 2023

Okay. That's fair. I guess that's really only for Connect unary, but since that is likely a significant majority of RPC traffic, that does make sense.

@emcfarlane
Copy link
Contributor

In the current GetBody change for unary we can ensure that the buffer is fully read or closed before returning to the caller on Send. This removes the need to change the ownership of buffer from the caller. I think this simplifies the changes for unary and when we want to implement for both unary and stream we can address the need to change the buffer ownership.

Currently a unary call effectively does:

rdr, wtr := io.Pipe()
req.Body = rdr
go client.Do(ctx, req)
wtr.Write(buf) // Blocks on Read/Close

The new GetBody changes:

rdr := &payloadCloser{buf: buf}
req.Body = rdr
req.GetBody = func() (io.ReadClosure, error) {
	return rdr, rdr.Rewind()
}
client.Do(ctx, req)
rdr.Wait() // Blocks on Read/Close

Both wtr.Write and rdr.Wait block on being drained fully or closed. (*payloadCloser).Wait() was added as under testing with -race there were reads occurring after recycling the buffer in the pool. client.Do returns don't guarantee that the request.Body will no longer be used, even for unary calls. To ensure that the client request cannot access the buffer Wait resets the payload closure when we know the body has been Close() or the read returns an error. If GetBody is called the state of the payload closure is rewound so Wait() will not release until the next Close or error. This lets the request be rewound as many times as needed, then when the client returns we wait for the drain event before returning to the caller. In most cases this Wait() should be immediate, the request was read during the call.

@emcfarlane
Copy link
Contributor

An alternative to the API on envelopeWriter above would be to swap the io.Writer for:

type sender interface {
	Send(messagePayload) (int64, error)
}

With a simple adapter for io.Writer so http.ResponseWriter can still be used:

type writeSender struct {
	writer io.Writer
}
func (s writeSender) Send(payload messagePayload) error {
	return payload.WriteTo(s.writer)
}

This would keep the semantics that envelopeWriter closes over the destination.

@emcfarlane
Copy link
Contributor

emcfarlane commented Nov 10, 2023

Implemented the sender API here: emcfarlane/connect-go@main...ed/sender annoyingly this increases allocations removing some of the benefits we gain. I think related to allocating the interfaces for the sender args, maybe I missed something. Are we happy with this approach?

geomean                           192.3         198.3        +3.15%

@jhump
Copy link
Member Author

jhump commented Jan 22, 2024

Resolved in #649.

@jhump jhump closed this as completed Jan 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants