diff --git a/core/pkg/sync/builder/syncbuilder.go b/core/pkg/sync/builder/syncbuilder.go index 8e0c49338..525ad7147 100644 --- a/core/pkg/sync/builder/syncbuilder.go +++ b/core/pkg/sync/builder/syncbuilder.go @@ -166,6 +166,7 @@ func (sb *SyncBuilder) newGRPC(config sync.SourceConfig, logger *logger.Logger) ProviderID: config.ProviderID, Secure: config.TLS, Selector: config.Selector, + MaxMsgSize: config.MaxMsgSize, } } diff --git a/core/pkg/sync/builder/syncbuilder_test.go b/core/pkg/sync/builder/syncbuilder_test.go index 54120d64d..1cdb3d404 100644 --- a/core/pkg/sync/builder/syncbuilder_test.go +++ b/core/pkg/sync/builder/syncbuilder_test.go @@ -173,6 +173,26 @@ func Test_SyncsFromFromConfig(t *testing.T) { }, wantErr: false, }, + { + name: "grpc-with-msg-size", + args: args{ + logger: lg, + sources: []sync.SourceConfig{ + { + URI: "grpc://host:port", + Provider: syncProviderGrpc, + ProviderID: "myapp", + CertPath: "/tmp/ca.cert", + Selector: "source=database", + MaxMsgSize: 10, + }, + }, + }, + wantSyncs: []sync.ISync{ + &grpc.Sync{}, + }, + wantErr: false, + }, { name: "combined", injectFunc: func(builder *SyncBuilder) { diff --git a/core/pkg/sync/grpc/grpc_sync.go b/core/pkg/sync/grpc/grpc_sync.go index 02411a60b..ba89d01a2 100644 --- a/core/pkg/sync/grpc/grpc_sync.go +++ b/core/pkg/sync/grpc/grpc_sync.go @@ -48,6 +48,7 @@ type Sync struct { Secure bool Selector string URI string + MaxMsgSize int client FlagSyncServiceClient ready bool @@ -62,7 +63,17 @@ func (g *Sync) Init(_ context.Context) error { } // Derive reusable client connection - rpcCon, err := grpc.NewClient(g.URI, grpc.WithTransportCredentials(tCredentials)) + // Set MaxMsgSize if passed + var rpcCon *grpc.ClientConn + + if g.MaxMsgSize > 0 { + g.Logger.Info(fmt.Sprintf("setting max receive message size %d bytes default 4MB", g.MaxMsgSize)) + dialOptions := grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(g.MaxMsgSize)) + rpcCon, err = grpc.NewClient(g.URI, grpc.WithTransportCredentials(tCredentials), dialOptions) + } else { + rpcCon, err = grpc.NewClient(g.URI, grpc.WithTransportCredentials(tCredentials)) + } + if err != nil { err := fmt.Errorf("error initiating grpc client connection: %w", err) g.Logger.Error(err.Error()) diff --git a/core/pkg/sync/grpc/grpc_sync_test.go b/core/pkg/sync/grpc/grpc_sync_test.go index a00caea20..50e830eb1 100644 --- a/core/pkg/sync/grpc/grpc_sync_test.go +++ b/core/pkg/sync/grpc/grpc_sync_test.go @@ -19,6 +19,8 @@ import ( grpcmock "github.com/open-feature/flagd/core/pkg/sync/grpc/mock" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -78,6 +80,30 @@ func Test_InitWithMockCredentialBuilder(t *testing.T) { } } +func Test_InitWithSizeOverride(t *testing.T) { + observedZapCore, observedLogs := observer.New(zap.InfoLevel) + observedLogger := zap.New(observedZapCore) + + mockCtrl := gomock.NewController(t) + mockCredentialBulder := credendialsmock.NewMockBuilder(mockCtrl) + + mockCredentialBulder.EXPECT(). + Build(gomock.Any(), gomock.Any()). + Return(insecure.NewCredentials(), nil) + + grpcSync := Sync{ + URI: "grpc-target", + Logger: logger.NewLogger(observedLogger, false), + CredentialBuilder: mockCredentialBulder, + MaxMsgSize: 10, + } + + err := grpcSync.Init(context.Background()) + + require.Nilf(t, err, "%s: expected no error, but got non nil error", t.Name()) + require.Equal(t, "setting max receive message size 10 bytes default 4MB", observedLogs.All()[0].Message) +} + func Test_ReSyncTests(t *testing.T) { const target = "localBufCon" diff --git a/core/pkg/sync/isync.go b/core/pkg/sync/isync.go index ce8a35635..a72c86447 100644 --- a/core/pkg/sync/isync.go +++ b/core/pkg/sync/isync.go @@ -73,4 +73,5 @@ type SourceConfig struct { ProviderID string `json:"providerID,omitempty"` Selector string `json:"selector,omitempty"` Interval uint32 `json:"interval,omitempty"` + MaxMsgSize int `json:"maxMsgSize,omitempty"` } diff --git a/docs/reference/sync-configuration.md b/docs/reference/sync-configuration.md index 8882afa5c..0c3433295 100644 --- a/docs/reference/sync-configuration.md +++ b/docs/reference/sync-configuration.md @@ -38,6 +38,7 @@ Alternatively, these configurations can be passed to flagd via config file, spec | providerID | optional `string` | Value binds to grpc connection's providerID field. gRPC server implementations may use this to identify connecting flagd instance | | selector | optional `string` | Value binds to grpc connection's selector field. gRPC server implementations may use this to filter flag configurations | | certPath | optional `string` | Used for grpcs sync when TLS certificate is needed. If not provided, system certificates will be used for TLS connection | +| maxMsgSize | optional `int` | Used for gRPC sync to set max receive message size (in bytes) e.g. 5242880 for 5MB. If not provided, the default is [4MB](https://pkg.go.dev/google.golang.org#grpc#MaxCallRecvMsgSize) | The `uri` field values **do not** follow the [URI patterns](#uri-patterns). The provider type is instead derived from the `provider` field. Only exception is the remote provider where `http(s)://` is expected by default. Incorrect @@ -64,6 +65,7 @@ Startup command: {"uri":"https://secure-remote/basic-auth","provider":"http","authHeader":"Basic dXNlcjpwYXNz"}, {"uri":"default/my-flag-config","provider":"kubernetes"}, {"uri":"grpc-source:8080","provider":"grpc"}, + {"uri":"my-flag-source:8080","provider":"grpc", "maxMsgSize": 5242880}, {"uri":"my-flag-source:8080","provider":"grpc", "certPath": "/certs/ca.cert", "tls": true, "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"}]' ``` @@ -80,6 +82,9 @@ sources: provider: kubernetes - uri: my-flag-source:8080 provider: grpc + - uri: my-flag-source:8080 + provider: grpc + maxMsgSize: 5242880 - uri: my-flag-source:8080 provider: grpc certPath: /certs/ca.cert