Skip to content

Commit

Permalink
grpcclient: Support custom gRPC compressors (#583)
Browse files Browse the repository at this point in the history
* grpcclient: Support custom gRPC compressors

---------

Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 authored Sep 19, 2024
1 parent 931a021 commit 9102f24
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@
* [ENHANCEMENT] Added new ring methods to expose number of writable instances with tokens per zone, and overall. #560 #562
* [ENHANCEMENT] `services.FailureWatcher` can now be closed, which unregisters all service and manager listeners, and closes channel used to receive errors. #564
* [ENHANCEMENT] Runtimeconfig: support gzip-compressed files with `.gz` extension. #571
* [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
Expand Down
26 changes: 20 additions & 6 deletions grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package grpcclient

import (
"flag"
"slices"
"strings"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -40,6 +42,9 @@ type Config struct {

Middleware []grpc.UnaryClientInterceptor `yaml:"-"`
StreamMiddleware []grpc.StreamClientInterceptor `yaml:"-"`

// CustomCompressors allows configuring custom compressors.
CustomCompressors []string `yaml:"-"`
}

// RegisterFlags registers flags.
Expand All @@ -55,9 +60,19 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.InitialStreamWindowSize = defaultInitialWindowSize
cfg.InitialConnectionWindowSize = defaultInitialWindowSize

var supportedCompressors strings.Builder
supportedCompressors.WriteString("Use compression when sending messages. Supported values are: 'gzip', 'snappy'")
for _, cmp := range cfg.CustomCompressors {
supportedCompressors.WriteString(", ")
supportedCompressors.WriteString("'")
supportedCompressors.WriteString(cmp)
supportedCompressors.WriteString("'")
}
supportedCompressors.WriteString(" and '' (disable compression)")

f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 100<<20, "gRPC client max send message size (bytes).")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", supportedCompressors.String())
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit rate limits.")
Expand All @@ -74,11 +89,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
}

func (cfg *Config) Validate() error {
switch cfg.GRPCCompression {
case gzip.Name, snappy.Name, "":
// valid
default:
return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression)
supportedCompressors := []string{gzip.Name, snappy.Name, ""}
supportedCompressors = append(supportedCompressors, cfg.CustomCompressors...)
if !slices.Contains(supportedCompressors, cfg.GRPCCompression) {
return errors.Errorf("unsupported compression type: %q", cfg.GRPCCompression)
}
return nil
}
Expand Down
46 changes: 46 additions & 0 deletions grpcclient/grpcclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package grpcclient

import (
"flag"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

func TestConfig(t *testing.T) {
t.Run("custom compressors", func(t *testing.T) {
const comp = "custom"
cfg := Config{
CustomCompressors: []string{comp},
}
fs := flag.NewFlagSet("test", flag.PanicOnError)
cfg.RegisterFlagsWithPrefix("test", fs)
f := fs.Lookup("test.grpc-compression")
require.NotNil(t, f)
require.Equal(t, "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'custom' and '' (disable compression)", f.Usage)

t.Run("valid compressor", func(t *testing.T) {
cfg.GRPCCompression = comp

require.NoError(t, cfg.Validate())
opts := cfg.CallOptions()

var compressorOpt grpc.CompressorCallOption
for _, o := range opts {
co, ok := o.(grpc.CompressorCallOption)
if ok {
compressorOpt = co
break
}
}
require.Equal(t, comp, compressorOpt.CompressorType)
})

t.Run("invalid compressor", func(t *testing.T) {
cfg.GRPCCompression = "invalid"

require.EqualError(t, cfg.Validate(), `unsupported compression type: "invalid"`)
})
})
}

0 comments on commit 9102f24

Please sign in to comment.