Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring internal/db/kvs/redis package #590

Merged
merged 6 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/db/kvs/redis/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

redis "github.com/go-redis/redis/v7"
"github.com/vdaas/vald/internal/net"

"go.uber.org/goleak"
)

Expand Down
154 changes: 87 additions & 67 deletions internal/db/kvs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
)

var (
// Nil is a type alias of redis.Nil.
Nil = redis.Nil
)

// Redis is an interface to manipulate Redis server.
type Redis interface {
TxPipeline() redis.Pipeliner
Ping() *StatusCmd
Expand All @@ -42,10 +44,16 @@ type Redis interface {
Deleter
}

type Conn = redis.Conn
type IntCmd = redis.IntCmd
type StringCmd = redis.StringCmd
type StatusCmd = redis.StatusCmd
type (
// Conn is a type alias of redis.Conn.
Conn = redis.Conn
// IntCmd is a type alias of redis.IntCmd.
IntCmd = redis.IntCmd
// StringCmd is a type alias of redis.StringCmd.
StringCmd = redis.StringCmd
// StatusCmd is a type alias of redis.StatusCmd.
StatusCmd = redis.StatusCmd
)

type redisClient struct {
addrs []string
Expand Down Expand Up @@ -75,89 +83,101 @@ type redisClient struct {
routeRandomly bool
tlsConfig *tls.Config
writeTimeout time.Duration
client Redis
}

// New returns Redis implementation if no error occurs.
func New(ctx context.Context, opts ...Option) (rc Redis, err error) {
r := new(redisClient)
for _, opt := range append(defaultOpts, opts...) {
if err = opt(r); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
}
}
switch len(r.addrs) {

r, err = r.newRedisClient(ctx)
if err != nil {
return nil, err
}

return r.ping(ctx)
}

func (rc *redisClient) newRedisClient(ctx context.Context) (*redisClient, error) {
switch len(rc.addrs) {
case 0:
return nil, errors.ErrRedisAddrsNotFound
case 1:
if len(r.addrs[0]) == 0 {
if len(rc.addrs[0]) == 0 {
return nil, errors.ErrRedisAddrsNotFound
}
rc = redis.NewClient(&redis.Options{
Addr: r.addrs[0],
Password: r.password,
Dialer: r.dialer,
OnConnect: r.onConnect,
DB: r.db,
MaxRetries: r.maxRetries,
MinRetryBackoff: r.minRetryBackoff,
MaxRetryBackoff: r.maxRetryBackoff,
DialTimeout: r.dialTimeout,
ReadTimeout: r.readTimeout,
WriteTimeout: r.writeTimeout,
PoolSize: r.poolSize,
MinIdleConns: r.minIdleConns,
MaxConnAge: r.maxConnAge,
PoolTimeout: r.poolTimeout,
IdleTimeout: r.idleTimeout,
IdleCheckFrequency: r.idleCheckFrequency,
TLSConfig: r.tlsConfig,
rc.client = redis.NewClient(&redis.Options{
Addr: rc.addrs[0],
Password: rc.password,
Dialer: rc.dialer,
OnConnect: rc.onConnect,
DB: rc.db,
MaxRetries: rc.maxRetries,
MinRetryBackoff: rc.minRetryBackoff,
MaxRetryBackoff: rc.maxRetryBackoff,
DialTimeout: rc.dialTimeout,
ReadTimeout: rc.readTimeout,
WriteTimeout: rc.writeTimeout,
PoolSize: rc.poolSize,
MinIdleConns: rc.minIdleConns,
MaxConnAge: rc.maxConnAge,
PoolTimeout: rc.poolTimeout,
IdleTimeout: rc.idleTimeout,
IdleCheckFrequency: rc.idleCheckFrequency,
TLSConfig: rc.tlsConfig,
})
default:
rc = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.addrs,
Dialer: r.dialer,
MaxRedirects: r.maxRedirects,
ReadOnly: r.readOnly,
RouteByLatency: r.routeByLatency,
RouteRandomly: r.routeRandomly,
ClusterSlots: r.clusterSlots,
OnNewNode: r.onNewNode,
OnConnect: r.onConnect,
Password: r.password,
MaxRetries: r.maxRetries,
MinRetryBackoff: r.minRetryBackoff,
MaxRetryBackoff: r.maxRetryBackoff,
DialTimeout: r.dialTimeout,
ReadTimeout: r.readTimeout,
WriteTimeout: r.writeTimeout,
PoolSize: r.poolSize,
MinIdleConns: r.minIdleConns,
MaxConnAge: r.maxConnAge,
PoolTimeout: r.poolTimeout,
IdleTimeout: r.idleTimeout,
IdleCheckFrequency: r.idleCheckFrequency,
TLSConfig: r.tlsConfig,
rc.client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: rc.addrs,
Dialer: rc.dialer,
MaxRedirects: rc.maxRedirects,
ReadOnly: rc.readOnly,
RouteByLatency: rc.routeByLatency,
RouteRandomly: rc.routeRandomly,
ClusterSlots: rc.clusterSlots,
OnNewNode: rc.onNewNode,
OnConnect: rc.onConnect,
Password: rc.password,
MaxRetries: rc.maxRetries,
MinRetryBackoff: rc.minRetryBackoff,
MaxRetryBackoff: rc.maxRetryBackoff,
DialTimeout: rc.dialTimeout,
ReadTimeout: rc.readTimeout,
WriteTimeout: rc.writeTimeout,
PoolSize: rc.poolSize,
MinIdleConns: rc.minIdleConns,
MaxConnAge: rc.maxConnAge,
PoolTimeout: rc.poolTimeout,
IdleTimeout: rc.idleTimeout,
IdleCheckFrequency: rc.idleCheckFrequency,
TLSConfig: rc.tlsConfig,
}).WithContext(ctx)
}

err = func() (err error) {
pctx, cancel := context.WithTimeout(ctx, r.initialPingTimeLimit)
defer cancel()
tick := time.NewTicker(r.initialPingDuration)
for {
select {
case <-pctx.Done():
return errors.Wrap(errors.Wrap(err, errors.ErrRedisConnectionPingFailed.Error()), ctx.Err().Error())
case <-tick.C:
err = rc.Ping().Err()
if err == nil {
return nil
}
log.Error(err)
return rc, nil
}

func (rc *redisClient) ping(ctx context.Context) (r Redis, err error) {
pctx, cancel := context.WithTimeout(ctx, rc.initialPingTimeLimit)
defer cancel()
tick := time.NewTicker(rc.initialPingDuration)
for {
select {
case <-pctx.Done():
err = errors.Wrap(errors.Wrap(err, errors.ErrRedisConnectionPingFailed.Error()), pctx.Err().Error())
log.Error(err)
return nil, err
case <-tick.C:
err = rc.client.Ping().Err()
if err == nil {
return rc.client, nil
}
log.Warn(err)
}
}()
if err != nil {
return nil, err
}
return rc, nil
}
53 changes: 53 additions & 0 deletions internal/db/kvs/redis/redis_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package redis

import redis "github.com/go-redis/redis/v7"

type MockRedis struct {
TxPipelineFunc func() redis.Pipeliner
PingFunc func() *StatusCmd
CloseFunc func() error
GetFunc func(string) *redis.StringCmd
MGetFunc func(...string) *redis.SliceCmd
DelFunc func(keys ...string) *redis.IntCmd
}

var _ = (*MockRedis)(nil)

func (m *MockRedis) TxPipeline() redis.Pipeliner {
return m.TxPipelineFunc()
}

func (m *MockRedis) Ping() *StatusCmd {
return m.PingFunc()
}

func (m *MockRedis) Close() error {
return m.CloseFunc()
}

func (m *MockRedis) Get(key string) *redis.StringCmd {
return m.GetFunc(key)
}

func (m *MockRedis) MGet(keys ...string) *redis.SliceCmd {
return m.MGetFunc(keys...)
}

func (m *MockRedis) Del(keys ...string) *redis.IntCmd {
return m.DelFunc(keys...)
}
Loading