Skip to content

Commit

Permalink
Configurable max gRPC message size (flyteorg#339)
Browse files Browse the repository at this point in the history
* Configurable max gRPC message size

Signed-off-by: Katrina Rogan <[email protected]>

* derp

Signed-off-by: Katrina Rogan <[email protected]>

* generate

Signed-off-by: Katrina Rogan <[email protected]>

* both insecure & secure http, grpc too

Signed-off-by: Katrina Rogan <[email protected]>

* regen

Signed-off-by: Katrina Rogan <[email protected]>

* Review comments

Signed-off-by: Katrina Rogan <[email protected]>

* review comments

Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan authored Feb 17, 2022
1 parent 4af6bd5 commit 6cf09d1
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 12 deletions.
25 changes: 21 additions & 4 deletions cmd/entrypoints/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func newGRPCServer(ctx context.Context, cfg *config.ServerConfig, authCtx interf
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(chainedUnaryInterceptors),
}
if cfg.GrpcConfig.MaxMessageSizeBytes > 0 {
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(cfg.GrpcConfig.MaxMessageSizeBytes))
}
serverOpts = append(serverOpts, opts...)
grpcServer := grpc.NewServer(serverOpts...)
grpcPrometheus.Register(grpcServer)
Expand All @@ -125,7 +128,7 @@ func newGRPCServer(ctx context.Context, cfg *config.ServerConfig, authCtx interf
healthServer := health.NewServer()
healthServer.SetServingStatus("flyteadmin", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
if cfg.GrpcServerReflection {
if cfg.GrpcConfig.ServerReflection || cfg.GrpcServerReflection {
reflection.Register(grpcServer)
}
return grpcServer, nil
Expand Down Expand Up @@ -263,8 +266,15 @@ func serveGatewayInsecure(ctx context.Context, cfg *config.ServerConfig, authCfg
}()

logger.Infof(ctx, "Starting HTTP/1 Gateway server on %s", cfg.GetHostAddress())
httpServer, err := newHTTPServer(ctx, cfg, authCfg, authCtx, cfg.GetGrpcHostAddress(), grpc.WithInsecure(),
grpc.WithMaxHeaderListSize(common.MaxResponseStatusBytes))
grpcOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithMaxHeaderListSize(common.MaxResponseStatusBytes),
}
if cfg.GrpcConfig.MaxMessageSizeBytes > 0 {
grpcOptions = append(grpcOptions,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.GrpcConfig.MaxMessageSizeBytes)))
}
httpServer, err := newHTTPServer(ctx, cfg, authCfg, authCtx, cfg.GetGrpcHostAddress(), grpcOptions...)
if err != nil {
return err
}
Expand Down Expand Up @@ -351,7 +361,14 @@ func serveGatewaySecure(ctx context.Context, cfg *config.ServerConfig, authCfg *
ServerName: cfg.GetHostAddress(),
RootCAs: certPool,
})
httpServer, err := newHTTPServer(ctx, cfg, authCfg, authCtx, cfg.GetHostAddress(), grpc.WithTransportCredentials(dialCreds))
serverOpts := []grpc.DialOption{
grpc.WithTransportCredentials(dialCreds),
}
if cfg.GrpcConfig.MaxMessageSizeBytes > 0 {
serverOpts = append(serverOpts,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.GrpcConfig.MaxMessageSizeBytes)))
}
httpServer, err := newHTTPServer(ctx, cfg, authCfg, authCtx, cfg.GetHostAddress(), serverOpts...)
if err != nil {
return err
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@ const SectionKey = "server"

type ServerConfig struct {
HTTPPort int `json:"httpPort" pflag:",On which http port to serve admin"`
GrpcPort int `json:"grpcPort" pflag:",On which grpc port to serve admin"`
GrpcServerReflection bool `json:"grpcServerReflection" pflag:",Enable GRPC Server Reflection"`
GrpcPort int `json:"grpcPort" pflag:",deprecated"`
GrpcServerReflection bool `json:"grpcServerReflection" pflag:",deprecated"`
KubeConfig string `json:"kube-config" pflag:",Path to kubernetes client config file, default is empty, useful for incluster config."`
Master string `json:"master" pflag:",The address of the Kubernetes API server."`
Security ServerSecurityOptions `json:"security"`

GrpcConfig GrpcConfig `json:"grpc"`
// Deprecated: please use auth.AppAuth.ThirdPartyConfig instead.
DeprecatedThirdPartyConfig authConfig.ThirdPartyConfigOptions `json:"thirdPartyConfig" pflag:",Deprecated please use auth.appAuth.thirdPartyConfig instead."`
}

type GrpcConfig struct {
Port int `json:"port" pflag:",On which grpc port to serve admin"`
ServerReflection bool `json:"serverReflection" pflag:",Enable GRPC Server Reflection"`
MaxMessageSizeBytes int `json:"maxMessageSizeBytes" pflag:",The max size in bytes for incoming gRPC messages"`
}

type ServerSecurityOptions struct {
Secure bool `json:"secure"`
Ssl SslOptions `json:"ssl"`
Expand All @@ -48,14 +54,17 @@ type SslOptions struct {
}

var defaultServerConfig = &ServerConfig{
HTTPPort: 8088,
GrpcPort: 8089,
GrpcServerReflection: true,
HTTPPort: 8088,
KubeConfig: "$HOME/.kube/config",
Security: ServerSecurityOptions{
AllowCors: true,
AllowedHeaders: []string{"Content-Type", "flyte-authorization"},
AllowedOrigins: []string{"*"},
},
GrpcConfig: GrpcConfig{
Port: 8089,
ServerReflection: true,
},
}
var serverConfig = config.MustRegisterSection(SectionKey, defaultServerConfig)

Expand All @@ -78,6 +87,9 @@ func (s ServerConfig) GetHostAddress() string {
}

func (s ServerConfig) GetGrpcHostAddress() string {
if s.GrpcConfig.Port >= 0 {
return fmt.Sprintf(":%d", s.GrpcConfig.Port)
}
return fmt.Sprintf(":%d", s.GrpcPort)
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/config/serverconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions pkg/config/serverconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6cf09d1

Please sign in to comment.