From a5c50f879626cd64e603b2384b58c013f4284aa7 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Thu, 6 Aug 2020 09:55:04 -0700 Subject: [PATCH] Redis universal client (#162) * Move to Redis Universal Client to enable different Redis Configurations * generate flags * fix logs * fix comment * Comments * Comments --- .../controller/nodes/handler/state_test.go | 25 +++++++++++ .../task/resourcemanager/config/config.go | 4 ++ .../resourcemanager/config/config_flags.go | 2 + .../config/config_flags_test.go | 44 +++++++++++++++++++ .../task/resourcemanager/redis_client.go | 16 ++++--- 5 files changed, 86 insertions(+), 5 deletions(-) create mode 100644 flytepropeller/pkg/controller/nodes/handler/state_test.go diff --git a/flytepropeller/pkg/controller/nodes/handler/state_test.go b/flytepropeller/pkg/controller/nodes/handler/state_test.go new file mode 100644 index 0000000000..0ca3a980a6 --- /dev/null +++ b/flytepropeller/pkg/controller/nodes/handler/state_test.go @@ -0,0 +1,25 @@ +package handler + +import ( + "bytes" + "encoding/base64" + "encoding/gob" + "testing" + + "github.com/lyft/flytepropeller/pkg/controller/nodes/task/k8s" + + "github.com/stretchr/testify/assert" +) + +// A test to demonstrate how to unmarshal a serialized state from a workflow CRD. +func TestDecodeTaskState(t *testing.T) { + str := `I/+DAwEBC1BsdWdpblN0YXRlAf+EAAEBAQVQaGFzZQEGAAAABf+EAQIA` + reader := base64.NewDecoder(base64.RawStdEncoding, bytes.NewReader([]byte(str))) + dec := gob.NewDecoder(reader) + st := &k8s.PluginState{} + err := dec.Decode(st) + if assert.NoError(t, err) { + t.Logf("Deserialized State: [%+v]", st) + assert.Equal(t, k8s.PluginPhaseStarted, st.Phase) + } +} diff --git a/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config.go b/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config.go index e3b49aaac6..2f19d9ea84 100644 --- a/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config.go +++ b/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config.go @@ -33,7 +33,11 @@ type Config struct { } // Specific configs for Redis resource manager +// Ref: https://redis.io/topics/sentinel for information on how to fill in these fields. type RedisConfig struct { + HostPaths []string `json:"hostPaths" pflag:",Redis hosts locations."` + PrimaryName string `json:"primaryName" pflag:",Redis primary name, fill in only if you are connecting to a redis sentinel cluster."` + // deprecated: Please use HostPaths instead HostPath string `json:"hostPath" pflag:",Redis host location"` HostKey string `json:"hostKey" pflag:",Key for local Redis access"` MaxRetries int `json:"maxRetries" pflag:",See Redis client options for more info"` diff --git a/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config_flags.go b/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config_flags.go index 624361a37d..61871d8f8b 100755 --- a/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config_flags.go +++ b/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config_flags.go @@ -43,6 +43,8 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags := pflag.NewFlagSet("Config", pflag.ExitOnError) cmdFlags.String(fmt.Sprintf("%v%v", prefix, "type"), defaultConfig.Type, "Which resource manager to use") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "resourceMaxQuota"), defaultConfig.ResourceMaxQuota, "Global limit for concurrent Qubole queries") + cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "redis.hostPaths"), []string{}, "Redis hosts locations.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "redis.primaryName"), defaultConfig.RedisConfig.PrimaryName, "Redis primary name, fill in only if you are connecting to a redis sentinel cluster.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "redis.hostPath"), defaultConfig.RedisConfig.HostPath, "Redis host location") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "redis.hostKey"), defaultConfig.RedisConfig.HostKey, "Key for local Redis access") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "redis.maxRetries"), defaultConfig.RedisConfig.MaxRetries, "See Redis client options for more info") diff --git a/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config_flags_test.go b/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config_flags_test.go index a65ba6e2f0..28a15aff02 100755 --- a/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/nodes/task/resourcemanager/config/config_flags_test.go @@ -143,6 +143,50 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_redis.hostPaths", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vStringSlice, err := cmdFlags.GetStringSlice("redis.hostPaths"); err == nil { + assert.Equal(t, []string([]string{}), vStringSlice) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := join_Config("1,1", ",") + + cmdFlags.Set("redis.hostPaths", testValue) + if vStringSlice, err := cmdFlags.GetStringSlice("redis.hostPaths"); err == nil { + testDecodeSlice_Config(t, join_Config(vStringSlice, ","), &actual.RedisConfig.HostPaths) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_redis.primaryName", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vString, err := cmdFlags.GetString("redis.primaryName"); err == nil { + assert.Equal(t, string(defaultConfig.RedisConfig.PrimaryName), vString) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("redis.primaryName", testValue) + if vString, err := cmdFlags.GetString("redis.primaryName"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.RedisConfig.PrimaryName) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_redis.hostPath", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly diff --git a/flytepropeller/pkg/controller/nodes/task/resourcemanager/redis_client.go b/flytepropeller/pkg/controller/nodes/task/resourcemanager/redis_client.go index 6bbab18e28..6261ffc175 100644 --- a/flytepropeller/pkg/controller/nodes/task/resourcemanager/redis_client.go +++ b/flytepropeller/pkg/controller/nodes/task/resourcemanager/redis_client.go @@ -27,7 +27,7 @@ type RedisClient interface { } type Redis struct { - c *redis.Client + c redis.UniversalClient } func (r *Redis) SCard(key string) (int64, error) { @@ -55,9 +55,15 @@ func (r *Redis) Ping() (string, error) { } func NewRedisClient(ctx context.Context, config config.RedisConfig) (RedisClient, error) { + // Backward compatibility + if len(config.HostPaths) == 0 && len(config.HostPath) > 0 { + config.HostPaths = []string{config.HostPath} + } + client := &Redis{ - c: redis.NewClient(&redis.Options{ - Addr: config.HostPath, + c: redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: config.HostPaths, + MasterName: config.PrimaryName, Password: config.HostKey, DB: 0, // use default DB MaxRetries: config.MaxRetries, @@ -66,10 +72,10 @@ func NewRedisClient(ctx context.Context, config config.RedisConfig) (RedisClient _, err := client.Ping() if err != nil { - logger.Errorf(ctx, "Error creating Redis client at [%s]. Error: %v", config.HostPath, err) + logger.Errorf(ctx, "Error creating Redis client at [%+v]. Error: %v", config.HostPaths, err) return nil, err } - logger.Infof(ctx, "Created Redis client with host [%s]...", config.HostPath) + logger.Infof(ctx, "Created Redis client with host [%+v]...", config.HostPaths) return client, nil }