Skip to content

Commit

Permalink
Revert HTTP call split
Browse files Browse the repository at this point in the history
  • Loading branch information
emcfarlane committed Oct 30, 2023
1 parent 9d498db commit b027dcb
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 468 deletions.
27 changes: 9 additions & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,7 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien
// once at client creation.
unarySpec := config.newSpec(StreamTypeUnary)
unaryFunc := UnaryFunc(func(ctx context.Context, request AnyRequest) (AnyResponse, error) {

var responseMsg Res
response := &Response[Res]{
Msg: &responseMsg,
}
if err := client.protocolClient.Invoke(ctx, unarySpec, request, response); err != nil {
return nil, err
}
return response, nil

/*conn := client.protocolClient.NewConn(ctx, unarySpec, request.Header())
conn := client.protocolClient.NewConn(ctx, unarySpec, request.Header())
conn.onRequestSend(func(r *http.Request) {
request.setRequestMethod(r.Method)
})
Expand All @@ -107,7 +97,7 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien
_ = conn.CloseResponse()
return nil, err
}
return response, conn.CloseResponse()*/
return response, conn.CloseResponse()
})
if interceptor := config.Interceptor; interceptor != nil {
unaryFunc = interceptor.WrapUnary(unaryFunc)
Expand Down Expand Up @@ -145,20 +135,19 @@ func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamFo
if c.err != nil {
return &ClientStreamForClient[Req, Res]{err: c.err}
}
return &ClientStreamForClient[Req, Res]{conn: c.newConn(ctx, StreamTypeClient, nil)}
return &ClientStreamForClient[Req, Res]{conn: c.newConn(ctx, StreamTypeClient, nil /* header */, nil /* onRequestSend */)}
}

// CallServerStream calls a server streaming procedure.
func (c *Client[Req, Res]) CallServerStream(ctx context.Context, request *Request[Req]) (*ServerStreamForClient[Res], error) {
if c.err != nil {
return nil, c.err
}
conn := c.newConn(ctx, StreamTypeServer, func(r *http.Request) {
conn := c.newConn(ctx, StreamTypeServer, request.header, func(r *http.Request) {
request.method = r.Method
})
request.spec = conn.Spec()
request.peer = conn.Peer()
mergeHeaders(conn.RequestHeader(), request.header)
// Send always returns an io.EOF unless the error is from the client-side.
// We want the user to continue to call Receive in those cases to get the
// full error from the server-side.
Expand All @@ -178,12 +167,14 @@ func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForCli
if c.err != nil {
return &BidiStreamForClient[Req, Res]{err: c.err}
}
return &BidiStreamForClient[Req, Res]{conn: c.newConn(ctx, StreamTypeBidi, nil)}
return &BidiStreamForClient[Req, Res]{conn: c.newConn(ctx, StreamTypeBidi, nil /* header */, nil /* onRequestSend */)}
}

func (c *Client[Req, Res]) newConn(ctx context.Context, streamType StreamType, onRequestSend func(r *http.Request)) StreamingClientConn {
func (c *Client[Req, Res]) newConn(ctx context.Context, streamType StreamType, header http.Header, onRequestSend func(r *http.Request)) StreamingClientConn {
if header == nil {
header = make(http.Header, 8) // arbitrary power of two, prevent immediate resizing
}
newConn := func(ctx context.Context, spec Spec) StreamingClientConn {
header := make(http.Header, 8) // arbitrary power of two, prevent immediate resizing
c.protocolClient.WriteRequestHeader(streamType, header)
conn := c.protocolClient.NewConn(ctx, spec, header)
conn.onRequestSend(onRequestSend)
Expand Down
2 changes: 1 addition & 1 deletion client_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestClientPeer(t *testing.T) {
err = clientStream.Send(&pingv1.SumRequest{})
assert.Nil(t, err)
// server streaming
serverStream, err := client.CountUp(ctx, connect.NewRequest(&pingv1.CountUpRequest{}))
serverStream, err := client.CountUp(ctx, connect.NewRequest(&pingv1.CountUpRequest{Number: 1}))
t.Cleanup(func() {
assert.Nil(t, serverStream.Close())
})
Expand Down
Loading

0 comments on commit b027dcb

Please sign in to comment.