Skip to content

Commit

Permalink
Merge pull request #14066 from rleungx/add-config
Browse files Browse the repository at this point in the history
embed: add `GRPCAdditionalServerOptions` config
  • Loading branch information
ahrtr authored Jun 21, 2024
2 parents 00a6097 + d9cb8b8 commit 1d13fc5
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 7 deletions.
7 changes: 7 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ type Config struct {
// before closing a non-responsive connection. 0 to disable.
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`

// GRPCAdditionalServerOptions is the additional server option hook
// for changing the default internal gRPC configuration. Note these
// additional configurations take precedence over the existing individual
// configurations if present. Please refer to
// https://github.com/etcd-io/etcd/pull/14066#issuecomment-1248682996
GRPCAdditionalServerOptions []grpc.ServerOption `json:"grpc-additional-server-options"`

// SocketOpts are socket options passed to listener config.
SocketOpts transport.SocketOpts `json:"socket-options"`

Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ func (e *Etcd) serveClients() {
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}
gopts = append(gopts, e.cfg.GRPCAdditionalServerOptions...)

splitHTTP := false
for _, sctx := range e.sctxs {
Expand Down
15 changes: 10 additions & 5 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,16 @@ type ClusterConfig struct {
QuotaBackendBytes int64
BackendBatchInterval time.Duration

MaxTxnOps uint
MaxRequestBytes uint
MaxTxnOps uint
MaxRequestBytes uint

SnapshotCount uint64
SnapshotCatchUpEntries uint64

GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
GRPCAdditionalServerOptions []grpc.ServerOption

ClientMaxCallSendMsgSize int
ClientMaxCallRecvMsgSize int
Expand Down Expand Up @@ -278,6 +280,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime,
GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval,
GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout,
GRPCAdditionalServerOptions: c.Cfg.GRPCAdditionalServerOptions,
ClientMaxCallSendMsgSize: c.Cfg.ClientMaxCallSendMsgSize,
ClientMaxCallRecvMsgSize: c.Cfg.ClientMaxCallRecvMsgSize,
UseIP: c.Cfg.UseIP,
Expand Down Expand Up @@ -603,6 +606,7 @@ type MemberConfig struct {
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
GRPCAdditionalServerOptions []grpc.ServerOption
ClientMaxCallSendMsgSize int
ClientMaxCallRecvMsgSize int
UseIP bool
Expand Down Expand Up @@ -709,6 +713,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
Timeout: mcfg.GRPCKeepAliveTimeout,
}))
}
m.GRPCServerOpts = append(m.GRPCServerOpts, mcfg.GRPCAdditionalServerOptions...)
m.ClientMaxCallSendMsgSize = mcfg.ClientMaxCallSendMsgSize
m.ClientMaxCallRecvMsgSize = mcfg.ClientMaxCallRecvMsgSize
m.UseIP = mcfg.UseIP
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/clientv3/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,14 +702,12 @@ func TestKVLargeRequests(t *testing.T) {
// without proper client-side receive size limit
// "code = ResourceExhausted desc = grpc: received message larger than max (5242929 vs. 4194304)"
{

maxRequestBytesServer: 7*1024*1024 + 512*1024,
maxCallSendBytesClient: 7 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 5 * 1024 * 1024,
expectError: nil,
},

{
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 100 * 1024 * 1024,
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,7 +1923,71 @@ func TestV3LargeRequests(t *testing.T) {
t.Errorf("#%d: range expected no error, got %v", i, err)
}
}
})
}
}

// TestV3AdditionalGRPCOptions ensures that configurable GRPCAdditionalServerOptions works as intended.
func TestV3AdditionalGRPCOptions(t *testing.T) {
integration.BeforeTest(t)
tests := []struct {
name string
maxRequestBytes uint
grpcOpts []grpc.ServerOption
valueSize int
expectError error
}{
{
name: "requests will get a gRPC error because it's larger than gRPC MaxRecvMsgSize",
maxRequestBytes: 8 * 1024 * 1024,
grpcOpts: nil,
valueSize: 9 * 1024 * 1024,
expectError: status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max"),
},
{
name: "requests will get an etcd custom gRPC error because it's larger than MaxRequestBytes",
maxRequestBytes: 8 * 1024 * 1024,
grpcOpts: []grpc.ServerOption{grpc.MaxRecvMsgSize(10 * 1024 * 1024)},
valueSize: 9 * 1024 * 1024,
expectError: rpctypes.ErrGRPCRequestTooLarge,
},
{
name: "requests size is smaller than MaxRequestBytes but larger than MaxRecvMsgSize",
maxRequestBytes: 8 * 1024 * 1024,
grpcOpts: []grpc.ServerOption{grpc.MaxRecvMsgSize(4 * 1024 * 1024)},
valueSize: 6 * 1024 * 1024,
expectError: status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: 1,
MaxRequestBytes: test.maxRequestBytes,
ClientMaxCallSendMsgSize: 12 * 1024 * 1024,
GRPCAdditionalServerOptions: test.grpcOpts,
})
defer clus.Terminate(t)
kvcli := integration.ToGRPC(clus.Client(0)).KV
reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)}
if _, err := kvcli.Put(context.TODO(), reqput); err != nil {
if _, ok := err.(rpctypes.EtcdError); ok {
if err.Error() != status.Convert(test.expectError).Message() {
t.Errorf("expected %v, got %v", status.Convert(test.expectError).Message(), err.Error())
}
} else if !strings.HasPrefix(err.Error(), test.expectError.Error()) {
t.Errorf("expected error starting with '%s', got '%s'", test.expectError.Error(), err.Error())
}
}
// request went through, expect large response back from server
if test.expectError == nil {
reqget := &pb.RangeRequest{Key: []byte("foo")}
// limit receive call size with original value + gRPC overhead bytes
_, err := kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024))
if err != nil {
t.Errorf("range expected no error, got %v", err)
}
}
})
}
}
Expand Down

0 comments on commit 1d13fc5

Please sign in to comment.