Skip to content

Commit

Permalink
gRPC transport initial work with crossdock testing (#863)
Browse files Browse the repository at this point in the history
gRPC transport initial work with crossdock testing (#863)
  • Loading branch information
bufdev authored Apr 18, 2017
1 parent ccddb89 commit 36f9f63
Show file tree
Hide file tree
Showing 53 changed files with 2,867 additions and 96 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.crossdock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:1.8.1

EXPOSE 8080-8088
EXPOSE 8080-8090
ENV SUPPRESS_DOCKER 1
WORKDIR /go/src/go.uber.org/yarpc
ADD dockercrossdockdeps.mk /go/src/go.uber.org/yarpc/
Expand Down
3 changes: 2 additions & 1 deletion build/local.mk
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Paths besides auto-detected generated files that should be excluded from
# lint results.
LINT_EXCLUDES_EXTRAS =
LINT_EXCLUDES_EXTRAS = \
transport/x/grpc/handler.go

# Regex for 'go vet' rules to ignore
GOVET_IGNORE_RULES = \
Expand Down
12 changes: 9 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ services:
- AXIS_HTTPSERVER=go
- AXIS_CLIENT_ONEWAY=go
- AXIS_SERVER_ONEWAY=go
- AXIS_TRANSPORT_ONEWAY=http,redis,cherami
- AXIS_TRANSPORT_ONEWAY=grpc,http,redis,cherami
- AXIS_TRANSPORT_ONEWAY_CTXPROPAGATION=http,redis,cherami
- AXIS_GO_ENCODING=raw,json,thrift,protobuf
- AXIS_GO_CLIENT=go
- AXIS_GO_SERVER=go

# Transports available to the ctxpropagation behavior for multihop
# requests.
Expand All @@ -45,6 +49,8 @@ services:
- BEHAVIOR_JSON=client,server,transport
- SKIP_JSON=client:java+transport:tchannel,server:java+transport:tchannel
- BEHAVIOR_THRIFT=client,server,transport
- BEHAVIOR_PROTOBUF=go_client,go_server,transport
- BEHAVIOR_GRPC=go_client,go_server,go_encoding
- SKIP_THRIFT=client:java+transport:tchannel,server:java+transport:tchannel
- BEHAVIOR_HEADERS=client,server,transport,encoding
- SKIP_HEADERS=client:java+transport:tchannel,server:java+transport:tchannel
Expand All @@ -70,7 +76,7 @@ services:
- BEHAVIOR_CTXPROPAGATION=ctxclient,ctxserver,transport,ctxavailabletransports
- BEHAVIOR_APACHETHRIFT=apachethriftclient,apachethriftserver
- BEHAVIOR_ONEWAY=client_oneway,server_oneway,transport_oneway,encoding
- BEHAVIOR_ONEWAY_CTXPROPAGATION=client_oneway,server_oneway,transport_oneway
- BEHAVIOR_ONEWAY_CTXPROPAGATION=client_oneway,server_oneway,transport_oneway_ctxpropagation

- REPORT=compact

Expand All @@ -83,7 +89,7 @@ services:
context: .
dockerfile: Dockerfile.crossdock
ports:
- "8080-8088"
- "8080-8090"
environment:
- REDIS=enabled
- CHERAMI=enabled
Expand Down
19 changes: 18 additions & 1 deletion encoding/x/protobuf/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func (u *unaryHandler) Handle(ctx context.Context, transportRequest *transport.R
}
}
response, appErr := u.handle(ctx, request)
if appErr != nil {
responseWriter.SetApplicationError()
}
if err := call.WriteToResponse(responseWriter); err != nil {
return err
}
Expand All @@ -78,9 +81,23 @@ func (u *unaryHandler) Handle(ctx context.Context, transportRequest *transport.R
}
responseData = protoBuffer.Bytes()
}
// We have to detect if our transport requires a raw response
// It is not possible to propagate this information on ctx with the current API
// we we attach this in the relevant transport (currently only gRPC) on the headers
// If we are sending a raw response back to a YARPC client, it needs to understand
// this is happening, so we attach the headers on the response as well
// Other clients (namely the existing gRPC clients outside of YARPC) understand
// that the response is the raw response.
if isRawResponse(transportRequest.Headers) {
responseWriter.AddHeaders(getRawResponseHeaders())
_, err := responseWriter.Write(responseData)
if err != nil {
return err
}
return appErr
}
var wireError *wirepb.Error
if appErr != nil {
responseWriter.SetApplicationError()
wireError = &wirepb.Error{
appErr.Error(),
}
Expand Down
9 changes: 9 additions & 0 deletions encoding/x/protobuf/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ func (c *client) Call(
if responseData == nil {
return nil, nil
}
// TODO: the error from Call will be the application error, we might
// also have a response returned however
if isRawResponse(transportResponse.Headers) {
response := newResponse()
if err := proto.Unmarshal(responseData, response); err != nil {
return nil, encoding.ResponseBodyDecodeError(transportRequest, err)
}
return response, nil
}
wireResponse := &wirepb.Response{}
if err := proto.Unmarshal(responseData, wireResponse); err != nil {
return nil, encoding.ResponseBodyDecodeError(transportRequest, err)
Expand Down
11 changes: 8 additions & 3 deletions encoding/x/protobuf/protoc-gen-yarpc-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ package main
import (
"fmt"
"log"
"strings"
"text/template"

"go.uber.org/yarpc/internal/protoplugin"
Expand Down Expand Up @@ -79,7 +80,7 @@ type {{$service.GetName}}YarpcClient interface {
// New{{$service.GetName}}YarpcClient builds a new yarpc client for the {{$service.GetName}} service.
func New{{$service.GetName}}YarpcClient(clientConfig transport.ClientConfig) {{$service.GetName}}YarpcClient {
return &_{{$service.GetName}}YarpcCaller{protobuf.NewClient("{{$service.GetName}}", clientConfig)}
return &_{{$service.GetName}}YarpcCaller{protobuf.NewClient("{{trimPrefixPeriod $service.FQSN}}", clientConfig)}
}
// {{$service.GetName}}YarpcServer is the yarpc server-side interface for the {{$service.GetName}} service.
Expand All @@ -94,7 +95,7 @@ type {{$service.GetName}}YarpcServer interface {
func Build{{$service.GetName}}YarpcProcedures(server {{$service.GetName}}YarpcServer) []transport.Procedure {
handler := &_{{$service.GetName}}YarpcHandler{server}
return protobuf.BuildProcedures(
"{{$service.GetName}}",
"{{trimPrefixPeriod $service.FQSN}}",
map[string]transport.UnaryHandler{
{{range $method := unaryMethods $service}}"{{$method.GetName}}": protobuf.NewUnaryHandler(handler.{{$method.GetName}}, new{{$service.GetName}}_{{$method.GetName}}YarpcRequest),
{{end}}
Expand Down Expand Up @@ -181,7 +182,7 @@ var (
{{end}}
`

var funcMap = template.FuncMap{"unaryMethods": unaryMethods, "onewayMethods": onewayMethods}
var funcMap = template.FuncMap{"unaryMethods": unaryMethods, "onewayMethods": onewayMethods, "trimPrefixPeriod": trimPrefixPeriod}

func main() {
if err := protoplugin.Run(
Expand Down Expand Up @@ -230,3 +231,7 @@ func onewayMethods(service *protoplugin.Service) ([]*protoplugin.Method, error)
}
return methods, nil
}

func trimPrefixPeriod(s string) string {
return strings.TrimPrefix(s, ".")
}
4 changes: 2 additions & 2 deletions encoding/x/protobuf/testing/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func benchmarkIntegrationForTransportType(b *testing.B, transportType testutils.
transportType,
keyValueYarpcServer,
sinkYarpcServer,
func(keyValueYarpcClient examplepb.KeyValueYarpcClient, sinkYarpcClient examplepb.SinkYarpcClient) error {
benchmarkIntegration(b, keyValueYarpcClient, sinkYarpcClient, keyValueYarpcServer, sinkYarpcServer)
func(clients *example.Clients) error {
benchmarkIntegration(b, clients.KeyValueYarpcClient, clients.SinkYarpcClient, keyValueYarpcServer, sinkYarpcServer)
return nil
},
)
Expand Down
5 changes: 5 additions & 0 deletions encoding/x/protobuf/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@
// THE SOFTWARE.

package testing

import (
// this is to make sure scripts/cover.sh picks this up with .Deps
_ "go.uber.org/yarpc/internal/examples/protobuf/example"
)
56 changes: 38 additions & 18 deletions encoding/x/protobuf/testing/testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"testing"
"time"

"go.uber.org/yarpc/encoding/x/protobuf"
"go.uber.org/yarpc/internal/examples/protobuf/example"
"go.uber.org/yarpc/internal/examples/protobuf/examplepb"
"go.uber.org/yarpc/internal/testutils"
Expand All @@ -50,8 +49,8 @@ func testIntegrationForTransportType(t *testing.T, transportType testutils.Trans
transportType,
keyValueYarpcServer,
sinkYarpcServer,
func(keyValueYarpcClient examplepb.KeyValueYarpcClient, sinkYarpcClient examplepb.SinkYarpcClient) error {
testIntegration(t, keyValueYarpcClient, sinkYarpcClient, keyValueYarpcServer, sinkYarpcServer)
func(clients *example.Clients) error {
testIntegration(t, clients, keyValueYarpcServer, sinkYarpcServer)
return nil
},
),
Expand All @@ -60,37 +59,41 @@ func testIntegrationForTransportType(t *testing.T, transportType testutils.Trans

func testIntegration(
t *testing.T,
keyValueYarpcClient examplepb.KeyValueYarpcClient,
sinkYarpcClient examplepb.SinkYarpcClient,
clients *example.Clients,
keyValueYarpcServer *example.KeyValueYarpcServer,
sinkYarpcServer *example.SinkYarpcServer,
) {
_, err := getValue(keyValueYarpcClient, "foo")
_, err := getValue(clients.KeyValueYarpcClient, "foo")
assert.Error(t, err)
_, err = getValueGRPC(clients.KeyValueGRPCClient, "foo")
assert.Error(t, err)
assert.NotNil(t, protobuf.GetApplicationError(err))

assert.NoError(t, setValue(keyValueYarpcClient, "foo", "bar"))
value, err := getValue(keyValueYarpcClient, "foo")
assert.NoError(t, setValue(clients.KeyValueYarpcClient, "foo", "bar"))
value, err := getValue(clients.KeyValueYarpcClient, "foo")
assert.NoError(t, err)
assert.Equal(t, "bar", value)

assert.NoError(t, setValue(keyValueYarpcClient, "foo", ""))
_, err = getValue(keyValueYarpcClient, "foo")
assert.NoError(t, setValueGRPC(clients.KeyValueGRPCClient, "foo", "barGRPC"))
value, err = getValueGRPC(clients.KeyValueGRPCClient, "foo")
assert.NoError(t, err)
assert.Equal(t, "barGRPC", value)

assert.NoError(t, setValue(clients.KeyValueYarpcClient, "foo", ""))
_, err = getValue(clients.KeyValueYarpcClient, "foo")
assert.Error(t, err)
assert.NotNil(t, protobuf.GetApplicationError(err))

assert.NoError(t, setValue(keyValueYarpcClient, "foo", "baz"))
assert.NoError(t, setValue(keyValueYarpcClient, "baz", "bat"))
value, err = getValue(keyValueYarpcClient, "foo")
assert.NoError(t, setValue(clients.KeyValueYarpcClient, "foo", "baz"))
assert.NoError(t, setValue(clients.KeyValueYarpcClient, "baz", "bat"))
value, err = getValue(clients.KeyValueYarpcClient, "foo")
assert.NoError(t, err)
assert.Equal(t, "baz", value)
value, err = getValue(keyValueYarpcClient, "baz")
value, err = getValue(clients.KeyValueYarpcClient, "baz")
assert.NoError(t, err)
assert.Equal(t, "bat", value)

assert.NoError(t, fire(sinkYarpcClient, "foo"))
assert.NoError(t, fire(clients.SinkYarpcClient, "foo"))
assert.NoError(t, sinkYarpcServer.WaitFireDone())
assert.NoError(t, fire(sinkYarpcClient, "bar"))
assert.NoError(t, fire(clients.SinkYarpcClient, "bar"))
assert.NoError(t, sinkYarpcServer.WaitFireDone())
assert.Equal(t, []string{"foo", "bar"}, sinkYarpcServer.Values())
}
Expand All @@ -112,6 +115,23 @@ func setValue(keyValueYarpcClient examplepb.KeyValueYarpcClient, key string, val
return err
}

func getValueGRPC(keyValueGRPCClient examplepb.KeyValueClient, key string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
response, err := keyValueGRPCClient.GetValue(ctx, &examplepb.GetValueRequest{key})
if err != nil {
return "", err
}
return response.Value, nil
}

func setValueGRPC(keyValueGRPCClient examplepb.KeyValueClient, key string, value string) error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_, err := keyValueGRPCClient.SetValue(ctx, &examplepb.SetValueRequest{key, value})
return err
}

func fire(sinkYarpcClient examplepb.SinkYarpcClient, value string) error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expand Down
36 changes: 27 additions & 9 deletions encoding/x/protobuf/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,26 @@ import (
"github.com/gogo/protobuf/proto"
)

// Encoding is the name of this encoding.
const Encoding transport.Encoding = "protobuf"
const (
// Encoding is the name of this encoding.
Encoding transport.Encoding = "protobuf"

// GetApplicationError returns the application error from the server, if present.
rawResponseHeaderKey = "yarpc-protobuf-raw-response"
)

// SetRawResponse will set rawResponseHeaderKey to "true".
//
// TODO: this has overlap with IsApplicationError
func GetApplicationError(err error) error {
if applicationError, ok := err.(*applicationError); ok {
return applicationError
}
return nil
// rawResponseHeaderKey is a header key attached to either a request or
// response that signals a UnaryHandler to not encode an application error
// inside a wirepb.Response object, instead marshalling the actual response.
//
// Note per the documentation on transport.Headers#With, the returned Header
// may not be the same as the input header, so the caller should always
// update the header with:
//
// header = protobuf.SetRawResponse(header)
func SetRawResponse(headers transport.Headers) transport.Headers {
return headers.With(rawResponseHeaderKey, "1")
}

// ***all below functions should only be called by generated code***
Expand Down Expand Up @@ -118,3 +127,12 @@ func NewOnewayHandler(
func CastError(expectedType proto.Message, actualType proto.Message) error {
return fmt.Errorf("expected proto.Message to have type %T but had type %T", expectedType, actualType)
}

func isRawResponse(headers transport.Headers) bool {
rawResponse, ok := headers.Get(rawResponseHeaderKey)
return ok && rawResponse == "1"
}

func getRawResponseHeaders() transport.Headers {
return SetRawResponse(transport.Headers{})
}
38 changes: 35 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 36f9f63

Please sign in to comment.