Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand service config support #1165

Merged
merged 31 commits into from
May 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
50d4175
Add client and service side apis for limiting the send/recv msg size.…
lyuxuan Apr 3, 2017
13b5f12
merge master
lyuxuan Apr 3, 2017
a0b902a
fix missing import
lyuxuan Apr 3, 2017
ad16b94
fix gofmt goimports go tool vet errors
lyuxuan Apr 3, 2017
a66f923
Make initial service config non-blocking in Dial()
lyuxuan Apr 5, 2017
f02290b
Merge branch 'master' into service_config_pr
lyuxuan Apr 5, 2017
f1bb70f
gofmt
lyuxuan Apr 5, 2017
6f8b553
fix the testMaxMsgSizeServerAPI failure
lyuxuan Apr 6, 2017
fa29686
Merge branch 'master' into service_config_pr
lyuxuan Apr 6, 2017
c6a3937
fix send response error case
lyuxuan Apr 6, 2017
8788b75
merge master resolve conflicts
lyuxuan Apr 13, 2017
cb02ab4
change error message from InvalidArgument to ResourceExhausted
lyuxuan Apr 13, 2017
bab6b61
merge master
lyuxuan Apr 21, 2017
983d837
update the merge of client api and sc
lyuxuan Apr 21, 2017
9c5f260
make max size a pointer type and initialize function a CallOption
lyuxuan Apr 26, 2017
eaa9ccb
minor comment change
lyuxuan Apr 26, 2017
ecbc34a
move server defaults, delete defer cancel() in stream.go
lyuxuan Apr 27, 2017
ea230c7
update
lyuxuan Apr 27, 2017
3ea2870
Merge branch 'master' into service_config_pr
lyuxuan May 7, 2017
d926544
remove unessary nil in return statement
lyuxuan May 8, 2017
59426b3
gofmt
lyuxuan May 8, 2017
bdf9a64
add timeout test, add check or pointer filed in callOption, fix minor…
lyuxuan May 15, 2017
35d77ea
merge master, resolve conflicts
lyuxuan May 15, 2017
4d2b4b5
fix minor typo
lyuxuan May 15, 2017
7505481
comments added
lyuxuan May 15, 2017
504db8e
merge master
lyuxuan May 18, 2017
d19bbe8
change max message size functions name
lyuxuan May 19, 2017
ed64d51
remove unnecessary ok
lyuxuan May 19, 2017
27ae147
remove some todo comments
lyuxuan May 19, 2017
cb64938
fix minor issues
lyuxuan May 19, 2017
4a7b4d0
minor fix
lyuxuan May 19, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
}
}
for {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil {
if c.maxReceiveMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil {
if err == io.EOF {
break
}
Expand All @@ -93,7 +96,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
}

// sendRequest writes out various information of an RPC such as Context and Message.
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) {
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) {
defer func() {
if err != nil {
// If err is connection error, t will be closed, no need to close stream here.
Expand All @@ -118,6 +121,12 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
if err != nil {
return Errorf(codes.Internal, "grpc: %v", err)
}
if c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(outBuf) > *c.maxSendMessageSize {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ideally should be pushed into encode() instead. Otherwise, we will still be allocating more memory than desired. encode() gets the size before it allocates the buffer, so we can safe the work of encoding the object if we check it there after we get the size.

return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize)
}
err = t.Write(stream, outBuf, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
Expand Down Expand Up @@ -145,14 +154,18 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli

func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo
if mc, ok := cc.getMethodConfig(method); ok {
c.failFast = !mc.WaitForReady
if mc.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
defer cancel()
}
mc := cc.GetMethodConfig(method)
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
}

if mc.Timeout != nil && *mc.Timeout >= 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
defer cancel()
}

opts = append(cc.dopts.callOptions, opts...)
for _, o := range opts {
if err := o.before(&c); err != nil {
return toRPCErr(err)
Expand All @@ -163,6 +176,10 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
o.after(&c)
}
}()

c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)

if EnableTracing {
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
defer c.traceInfo.tr.Finish()
Expand Down Expand Up @@ -260,7 +277,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, stream, t, args, topts)
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
if err != nil {
if put != nil {
updateRPCInfoInContext(ctx, rpcInfo{
Expand Down
82 changes: 56 additions & 26 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ package grpc
import (
"errors"
"fmt"
"math"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -87,22 +87,25 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
codec Codec
cp Compressor
dc Decompressor
bs backoffStrategy
balancer Balancer
block bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
copts transport.ConnectOptions
maxMsgSize int
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
codec Codec
cp Compressor
dc Decompressor
bs backoffStrategy
balancer Balancer
block bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
copts transport.ConnectOptions
callOptions []CallOption
}

const defaultClientMaxMsgSize = math.MaxInt32
const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = 1024 * 1024 * 4
)

// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
Expand All @@ -123,10 +126,15 @@ func WithInitialConnWindowSize(s int32) DialOption {
}
}

// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
func WithMaxMsgSize(s int) DialOption {
return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
}

// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
func WithDefaultCallOptions(cos ...CallOption) DialOption {
return func(o *dialOptions) {
o.maxMsgSize = s
o.callOptions = append(o.callOptions, cos...)
}
}

Expand Down Expand Up @@ -321,7 +329,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.dopts.maxMsgSize = defaultClientMaxMsgSize

for _, opt := range opts {
opt(&cc.dopts)
}
Expand Down Expand Up @@ -359,15 +367,16 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()

scSet := false
if cc.dopts.scChan != nil {
// Wait for the initial service config.
// Try to get an initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
scSet = true
}
case <-ctx.Done():
return nil, ctx.Err()
default:
}
}
// Set defaults.
Expand Down Expand Up @@ -430,7 +439,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return nil, err
}
}

if cc.dopts.scChan != nil && !scSet {
// Blocking wait for the initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
Expand Down Expand Up @@ -640,12 +659,23 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
return nil
}

// TODO: Avoid the locking here.
func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) {
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e /service/). If there is a default MethodConfig for
// the serivce, we return it.
// Otherwise, we return an empty MethodConfig.
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
// TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
m, ok = cc.sc.Methods[method]
return
m, ok := cc.sc.Methods[method]
if !ok {
i := strings.LastIndex(method, "/")
m, _ = cc.sc.Methods[method[:i+1]]
}
return m
}

func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
Expand Down
79 changes: 59 additions & 20 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,14 @@ func (d *gzipDecompressor) Type() string {

// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
peer *peer.Peer
traceInfo traceInfo // in trace.go
creds credentials.PerRPCCredentials
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
peer *peer.Peer
traceInfo traceInfo // in trace.go
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
}

var defaultCallInfo = callInfo{failFast: true}
Expand Down Expand Up @@ -217,6 +219,22 @@ func FailFast(failFast bool) CallOption {
})
}

// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive.
func MaxCallRecvMsgSize(s int) CallOption {
return beforeCall(func(o *callInfo) error {
o.maxReceiveMessageSize = &s
return nil
})
}

// MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send.
func MaxCallSendMsgSize(s int) CallOption {
return beforeCall(func(o *callInfo) error {
o.maxSendMessageSize = &s
return nil
})
}

// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
// for a call.
func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
Expand Down Expand Up @@ -259,7 +277,7 @@ type parser struct {
// No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible
// error.
func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) {
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
return 0, nil, err
}
Expand All @@ -270,8 +288,8 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
if length == 0 {
return pf, nil, nil
}
if length > uint32(maxMsgSize) {
return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize)
if length > uint32(maxReceiveMessageSize) {
return 0, nil, Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
Expand Down Expand Up @@ -314,7 +332,7 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl
length = uint(len(b))
}
if length > math.MaxUint32 {
return nil, Errorf(codes.InvalidArgument, "grpc: message too large (%d bytes)", length)
return nil, Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", length)
}

const (
Expand Down Expand Up @@ -355,8 +373,8 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
return nil
}

func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
pf, d, err := p.recvMsg(maxMsgSize)
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload) error {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
return err
}
Expand All @@ -372,10 +390,10 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
}
if len(d) > maxMsgSize {
if len(d) > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize)
return Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
}
if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
Expand Down Expand Up @@ -501,24 +519,22 @@ type MethodConfig struct {
// WaitForReady indicates whether RPCs sent to this method should wait until
// the connection is ready by default (!failfast). The value specified via the
// gRPC client API will override the value set here.
WaitForReady bool
WaitForReady *bool
// Timeout is the default timeout for RPCs sent to this method. The actual
// deadline used will be the minimum of the value specified here and the value
// set by the application via the gRPC client API. If either one is not set,
// then the other will be used. If neither is set, then the RPC has no deadline.
Timeout time.Duration
Timeout *time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a
// stream (client->server) in bytes. The size which is measured is the serialized
// payload after per-message compression (but before stream compression) in bytes.
// The actual value used is the minumum of the value specified here and the value set
// by the application via the gRPC client API. If either one is not set, then the other
// will be used. If neither is set, then the built-in default is used.
// TODO: support this.
MaxReqSize uint32
MaxReqSize *int
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
// TODO: support this.
MaxRespSize uint32
MaxRespSize *int
}

// ServiceConfig is provided by the service provider and contains parameters for how
Expand All @@ -529,9 +545,32 @@ type ServiceConfig struct {
// via grpc.WithBalancer will override this.
LB Balancer
// Methods contains a map for the methods in this service.
// If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
// If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
// Otherwise, the method has no MethodConfig to use.
Methods map[string]MethodConfig
}

func min(a, b *int) *int {
if *a < *b {
return a
}
return b
}

func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
if mcMax == nil && doptMax == nil {
return &defaultVal
}
if mcMax != nil && doptMax != nil {
return min(mcMax, doptMax)
}
if mcMax != nil {
return mcMax
}
return doptMax
}

// SupportPackageIsVersion4 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package.
//
Expand Down
Loading