Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add redis token refresh support #3238

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion bindings/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (r *Redis) Init(ctx context.Context, meta bindings.Metadata) (err error) {
if err != nil {
return err
}

_, err = r.client.PingResult(ctx)
if err != nil {
return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err)
Expand Down
12 changes: 6 additions & 6 deletions common/component/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ func ParseClientFromProperties(properties map[string]string, componentType metad

var c RedisClient
if settings.Failover {
c = newV8FailoverClient(settings)
c = newV8FailoverClient(settings, properties)
} else {
c = newV8Client(settings)
c = newV8Client(settings, properties)
}
version, versionErr := GetServerVersion(c)
c.Close() // close the client to avoid leaking connections
Expand All @@ -177,14 +177,14 @@ func ParseClientFromProperties(properties map[string]string, componentType metad
}
if useNewClient {
if settings.Failover {
return newV9FailoverClient(settings), settings, nil
return newV9FailoverClient(settings, properties), settings, nil
}
return newV9Client(settings), settings, nil
return newV9Client(settings, properties), settings, nil
} else {
if settings.Failover {
return newV8FailoverClient(settings), settings, nil
return newV8FailoverClient(settings, properties), settings, nil
}
return newV8Client(settings), settings, nil
return newV8Client(settings, properties), settings, nil
}
}

Expand Down
33 changes: 24 additions & 9 deletions common/component/redis/v8client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"strings"
"time"

"github.com/dapr/kit/logger"

Check failure on line 22 in common/component/redis/v8client.go

View workflow job for this annotation

GitHub Actions / Build linux_amd64 binaries

File is not `goimports`-ed with -local github.com/dapr/ (goimports)
v8 "github.com/go-redis/redis/v8"
)

Expand All @@ -27,6 +28,8 @@
writeTimeout Duration
}

var v8logger = logger.NewLogger("dapr.components.redisv9")
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved

func (p v8Pipeliner) Exec(ctx context.Context) error {
_, err := p.pipeliner.Exec(ctx)
return err
Expand All @@ -48,6 +51,7 @@
readTimeout Duration
writeTimeout Duration
dialTimeout Duration
closeCh chan struct{}
}

func (c v8Client) GetDel(ctx context.Context, key string) (string, error) {
Expand Down Expand Up @@ -316,10 +320,11 @@
return c.client.TTL(writeCtx, key).Result()
}

func newV8FailoverClient(s *Settings) RedisClient {
func newV8FailoverClient(s *Settings, properties map[string]string) RedisClient {
if s == nil {
return nil
}
closeCh := make(chan struct{})
opts := &v8.FailoverOptions{
DB: s.DB,
MasterName: s.SentinelMasterName,
Expand Down Expand Up @@ -349,27 +354,32 @@

if s.RedisType == ClusterType {
opts.SentinelAddrs = strings.Split(s.Host, ",")

client := v8.NewFailoverClusterClient(opts)
go refreshTokenRoutineForRedis(context.Background(), ClientFromV8Client(client), properties, v8logger, closeCh)
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
return v8Client{
client: v8.NewFailoverClusterClient(opts),
client: client,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
dialTimeout: s.DialTimeout,
closeCh: closeCh,
}
}

client := v8.NewFailoverClient(opts)
go refreshTokenRoutineForRedis(context.Background(), ClientFromV8Client(client), properties, v8logger, closeCh)
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
return v8Client{
client: v8.NewFailoverClient(opts),
client: client,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
dialTimeout: s.DialTimeout,
closeCh: closeCh,
}
}

func newV8Client(s *Settings) RedisClient {
func newV8Client(s *Settings, properties map[string]string) RedisClient {
if s == nil {
return nil
}
closeCh := make(chan struct{})
if s.RedisType == ClusterType {
options := &v8.ClusterOptions{
Addrs: strings.Split(s.Host, ","),
Expand All @@ -394,12 +404,15 @@
InsecureSkipVerify: s.EnableTLS,
}
}
client := v8.NewClusterClient(options)
go refreshTokenRoutineForRedis(context.Background(), ClientFromV8Client(client), properties, v8logger, closeCh)

return v8Client{
client: v8.NewClusterClient(options),
client: client,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
dialTimeout: s.DialTimeout,
closeCh: closeCh,
}
}

Expand Down Expand Up @@ -428,12 +441,14 @@
InsecureSkipVerify: s.EnableTLS,
}
}

client := v8.NewClient(options)
go refreshTokenRoutineForRedis(context.Background(), ClientFromV8Client(client), properties, v8logger, closeCh)
return v8Client{
client: v8.NewClient(options),
client: client,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
dialTimeout: s.DialTimeout,
closeCh: closeCh,
}
}

Expand Down
76 changes: 66 additions & 10 deletions common/component/redis/v9client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"

Check failure on line 22 in common/component/redis/v9client.go

View workflow job for this annotation

GitHub Actions / Build linux_amd64 binaries

File is not `goimports`-ed with -local github.com/dapr/ (goimports)
"github.com/dapr/components-contrib/common/authentication/azure"
"github.com/dapr/kit/logger"
v9 "github.com/redis/go-redis/v9"
)

Expand All @@ -27,6 +30,8 @@
writeTimeout Duration
}

var v9logger = logger.NewLogger("dapr.components.redisv9")

func (p v9Pipeliner) Exec(ctx context.Context) error {
_, err := p.pipeliner.Exec(ctx)
return err
Expand All @@ -48,6 +53,7 @@
readTimeout Duration
writeTimeout Duration
dialTimeout Duration
closeCh chan struct{}
}

func (c v9Client) GetDel(ctx context.Context, key string) (string, error) {
Expand Down Expand Up @@ -117,6 +123,7 @@
}

func (c v9Client) Close() error {
close(c.closeCh)
return c.client.Close()
}

Expand Down Expand Up @@ -317,10 +324,11 @@
return c.client.TTL(writeCtx, key).Result()
}

func newV9FailoverClient(s *Settings) RedisClient {
func newV9FailoverClient(s *Settings, properties map[string]string) RedisClient {
if s == nil {
return nil
}
closeCh := make(chan struct{})
opts := &v9.FailoverOptions{
DB: s.DB,
MasterName: s.SentinelMasterName,
Expand Down Expand Up @@ -350,27 +358,31 @@

if s.RedisType == ClusterType {
opts.SentinelAddrs = strings.Split(s.Host, ",")

client := v9.NewFailoverClusterClient(opts)
go refreshTokenRoutineForRedis(context.Background(), ClientFromV9Client(client), properties, v9logger, closeCh)
return v9Client{
client: v9.NewFailoverClusterClient(opts),
client: client,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
dialTimeout: s.DialTimeout,
closeCh: closeCh,
}
}

client := v9.NewFailoverClient(opts)
go refreshTokenRoutineForRedis(context.Background(), ClientFromV9Client(client), properties, v9logger,closeCh)

Check failure on line 372 in common/component/redis/v9client.go

View workflow job for this annotation

GitHub Actions / Build linux_amd64 binaries

File is not `gofmt`-ed with `-s` (gofmt)
return v9Client{
client: v9.NewFailoverClient(opts),
client: client,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
dialTimeout: s.DialTimeout,
}
}

func newV9Client(s *Settings) RedisClient {
func newV9Client(s *Settings, properties map[string]string) RedisClient {
if s == nil {
return nil
}
closeCh := make(chan struct{})
if s.RedisType == ClusterType {
options := &v9.ClusterOptions{
Addrs: strings.Split(s.Host, ","),
Expand All @@ -395,12 +407,14 @@
InsecureSkipVerify: s.EnableTLS,
}
}

client := v9.NewClusterClient(options)
go refreshTokenRoutineForRedis(context.Background(), ClientFromV9Client(client), properties, v9logger, closeCh)
return v9Client{
client: v9.NewClusterClient(options),
client: client,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
dialTimeout: s.DialTimeout,
closeCh: closeCh,
}
}

Expand Down Expand Up @@ -429,11 +443,53 @@
InsecureSkipVerify: s.EnableTLS,
}
}

client := v9.NewClient(options)
go refreshTokenRoutineForRedis(context.Background(), ClientFromV9Client(client), properties, v9logger, closeCh)
return v9Client{
client: v9.NewClient(options),
client: client,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
dialTimeout: s.DialTimeout,
closeCh: closeCh,
}
}

func ClientFromV9Client(client v9.UniversalClient) RedisClient {
return v9Client{client: client}
}

func refreshTokenRoutineForRedis(ctx context.Context, redisClient RedisClient, meta map[string]string, logger logger.Logger, closeCh chan struct{}) {
ticker := time.NewTicker(time.Hour)
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()

for {
select {
case <-closeCh:
return
case <-ticker.C:
env, err := azure.NewEnvironmentSettings(meta)

Check failure on line 470 in common/component/redis/v9client.go

View workflow job for this annotation

GitHub Actions / Build linux_amd64 binaries

ineffectual assignment to err (ineffassign)
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
tokenCred, err := env.GetTokenCredential()
if err != nil {
logger.Debug("Failed to get Azure AD token credential:", err)
continue
}
at, err := tokenCred.GetToken(ctx, policy.TokenRequestOptions{
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Scopes: []string{
env.Cloud.Services[azure.ServiceOSSRDBMS].Audience + "/.default",
},
})
if err != nil {
logger.Debug("Failed to get Azure AD token:", err)
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
continue
}

// Authenticate with Redis using the refreshed token
err = redisClient.(v9Client).client.Pipeline().Auth(ctx, at.Token).Err()
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logger.Debug("Failed to authenticate with Redis using refreshed Azure AD token:", err)
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
logger.Info("Successfully refreshed Azure AD token and re-authenticated Redis.")
}
}
}
Loading