From 3adb3571ab5361326925304d4a95f89c26fd0ee0 Mon Sep 17 00:00:00 2001 From: Karthik K Date: Thu, 28 Jun 2018 17:47:56 +0530 Subject: [PATCH 1/4] refactor: use standardized redis uri for dialer arguments --- redis_broker.go | 31 +++---------------------------- redis_pool.go | 27 +++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 28 deletions(-) create mode 100644 redis_pool.go diff --git a/redis_broker.go b/redis_broker.go index 7a95d7c..f6b836a 100644 --- a/redis_broker.go +++ b/redis_broker.go @@ -17,35 +17,10 @@ type RedisCeleryBroker struct { workWG sync.WaitGroup } -// NewRedisPool creates pool of redis connections -func NewRedisPool(host, pass string) *redis.Pool { - return &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", host) - if err != nil { - return nil, err - } - if pass != "" { - if _, err = c.Do("AUTH", pass); err != nil { - c.Close() - return nil, err - } - } - return c, err - }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - _, err := c.Do("PING") - return err - }, - } -} - -// NewRedisCeleryBroker creates new RedisCeleryBroker -func NewRedisCeleryBroker(host, pass string) *RedisCeleryBroker { +// NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri +func NewRedisCeleryBroker(uri string) *RedisCeleryBroker { return &RedisCeleryBroker{ - Pool: NewRedisPool(host, pass), + Pool: NewRedisPool(uri), queueName: "celery", } } diff --git a/redis_pool.go b/redis_pool.go new file mode 100644 index 0000000..fa2c583 --- /dev/null +++ b/redis_pool.go @@ -0,0 +1,27 @@ +package gocelery + +import ( + "time" + + "github.com/gomodule/redigo/redis" +) + + +// NewRedisPoolURI creates pool of redis connections from given uri +func NewRedisPoolURI(uri string) *redis.Pool { + return &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.DialURL(uri) + if err != nil { + return nil, err + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } +} From 8109b38ee71b0efd038cde1fd67ae50f6660d18e Mon Sep 17 00:00:00 2001 From: Karthik K Date: Thu, 28 Jun 2018 18:03:38 +0530 Subject: [PATCH 2/4] fix: test cases --- README.md | 8 ++++---- backend_test.go | 6 +++--- broker_test.go | 6 +++--- gocelery_test.go | 4 ++-- redis_backend.go | 6 +++--- redis_broker.go | 21 ++++++++++++++++++++- worker_test.go | 4 ++-- 7 files changed, 37 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index b891ca4..1ca3d72 100644 --- a/README.md +++ b/README.md @@ -54,8 +54,8 @@ func add(a int, b int) int { func main() { // create broker and backend - celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "") - celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "") + celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379") + celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379") // use AMQP instead // celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://") @@ -154,8 +154,8 @@ Submit Task from Go Client ```go func main() { // create broker and backend - celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "") - celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "") + celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379") + celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379") // use AMQP instead // celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://") diff --git a/backend_test.go b/backend_test.go index 3228e55..4f1ea4f 100644 --- a/backend_test.go +++ b/backend_test.go @@ -10,14 +10,14 @@ import ( func getBackends() []CeleryBackend { return []CeleryBackend{ - NewRedisCeleryBackend("localhost:6379", ""), + NewRedisCeleryBackend("redis://localhost:6379"), NewAMQPCeleryBackend("amqp://"), } } // TestGetResult is Redis specific test to get result from backend func TestGetResult(t *testing.T) { - backend := NewRedisCeleryBackend("localhost:6379", "") + backend := NewRedisCeleryBackend("redis://localhost:6379") taskID := generateUUID() // value must be float64 for testing due to json limitation @@ -46,7 +46,7 @@ func TestGetResult(t *testing.T) { // TestSetResult is Redis specific test to set result to backend func TestSetResult(t *testing.T) { - backend := NewRedisCeleryBackend("localhost:6379", "") + backend := NewRedisCeleryBackend("redis://localhost:6379") taskID := generateUUID() value := reflect.ValueOf(rand.Float64()) resultMessage := getReflectionResultMessage(&value) diff --git a/broker_test.go b/broker_test.go index d4a273b..d9fce14 100644 --- a/broker_test.go +++ b/broker_test.go @@ -21,14 +21,14 @@ func makeCeleryMessage() (*CeleryMessage, error) { // test all brokers func getBrokers() []CeleryBroker { return []CeleryBroker{ - NewRedisCeleryBroker("localhost:6379", ""), + NewRedisCeleryBroker("redis://localhost:6379"), //NewAMQPCeleryBroker("amqp://"), } } // TestSend is Redis specific test that sets CeleryMessage to queue func TestSend(t *testing.T) { - broker := NewRedisCeleryBroker("localhost:6379", "") + broker := NewRedisCeleryBroker("redis://localhost:6379") celeryMessage, err := makeCeleryMessage() if err != nil || celeryMessage == nil { t.Errorf("failed to construct celery message: %v", err) @@ -58,7 +58,7 @@ func TestSend(t *testing.T) { // TestGet is Redis specific test that gets CeleryMessage from queue func TestGet(t *testing.T) { - broker := NewRedisCeleryBroker("localhost:6379", "") + broker := NewRedisCeleryBroker("redis://localhost:6379") celeryMessage, err := makeCeleryMessage() if err != nil || celeryMessage == nil { t.Errorf("failed to construct celery message: %v", err) diff --git a/gocelery_test.go b/gocelery_test.go index e43e756..931b9af 100644 --- a/gocelery_test.go +++ b/gocelery_test.go @@ -52,8 +52,8 @@ func getAMQPClient() (*CeleryClient, error) { } func getRedisClient() (*CeleryClient, error) { - redisBroker := NewRedisCeleryBroker("localhost:6379", "") - redisBackend := NewRedisCeleryBackend("localhost:6379", "") + redisBroker := NewRedisCeleryBroker("redis://localhost:6379") + redisBackend := NewRedisCeleryBackend("redis://localhost:6379") return NewCeleryClient(redisBroker, redisBackend, 1) } diff --git a/redis_backend.go b/redis_backend.go index 7af477d..6914ed7 100644 --- a/redis_backend.go +++ b/redis_backend.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) // RedisCeleryBackend is CeleryBackend for Redis @@ -13,9 +13,9 @@ type RedisCeleryBackend struct { } // NewRedisCeleryBackend creates new RedisCeleryBackend -func NewRedisCeleryBackend(host, pass string) *RedisCeleryBackend { +func NewRedisCeleryBackend(uri string) *RedisCeleryBackend { return &RedisCeleryBackend{ - Pool: NewRedisPool(host, pass), + Pool: NewRedisPool(uri), } } diff --git a/redis_broker.go b/redis_broker.go index f6b836a..d9776bf 100644 --- a/redis_broker.go +++ b/redis_broker.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) // RedisCeleryBroker is CeleryBroker for Redis @@ -17,6 +17,25 @@ type RedisCeleryBroker struct { workWG sync.WaitGroup } +// NewRedisPool creates pool of redis connections from given uri +func NewRedisPool(uri string) *redis.Pool { + return &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.DialURL(uri) + if err != nil { + return nil, err + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } +} + // NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri func NewRedisCeleryBroker(uri string) *RedisCeleryBroker { return &RedisCeleryBroker{ diff --git a/worker_test.go b/worker_test.go index 06222f0..5754b43 100644 --- a/worker_test.go +++ b/worker_test.go @@ -13,8 +13,8 @@ func add(a int, b int) int { // newCeleryWorker creates celery worker func newCeleryWorker(numWorkers int) *CeleryWorker { - broker := NewRedisCeleryBroker("localhost:6379", "") - backend := NewRedisCeleryBackend("localhost:6379", "") + broker := NewRedisCeleryBroker("redis://localhost:6379") + backend := NewRedisCeleryBackend("redis://localhost:6379") celeryWorker := NewCeleryWorker(broker, backend, numWorkers) return celeryWorker } From a1fb8f8a8de87aae080d5527a50cdefe956333a3 Mon Sep 17 00:00:00 2001 From: Karthik K Date: Thu, 28 Jun 2018 18:04:49 +0530 Subject: [PATCH 3/4] refactor: remove redis_pool --- redis_pool.go | 27 --------------------------- 1 file changed, 27 deletions(-) delete mode 100644 redis_pool.go diff --git a/redis_pool.go b/redis_pool.go deleted file mode 100644 index fa2c583..0000000 --- a/redis_pool.go +++ /dev/null @@ -1,27 +0,0 @@ -package gocelery - -import ( - "time" - - "github.com/gomodule/redigo/redis" -) - - -// NewRedisPoolURI creates pool of redis connections from given uri -func NewRedisPoolURI(uri string) *redis.Pool { - return &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.DialURL(uri) - if err != nil { - return nil, err - } - return c, err - }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - _, err := c.Do("PING") - return err - }, - } -} From 8a490e4fdfb6156546d3c9259f103abb59aaa889 Mon Sep 17 00:00:00 2001 From: Karthik K Date: Thu, 28 Jun 2018 18:09:36 +0530 Subject: [PATCH 4/4] refactor: main programs to reflect the new API --- Gopkg.toml | 4 ++-- example/client/main.go | 4 ++-- example/worker/main.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Gopkg.toml b/Gopkg.toml index 3334b6e..265a781 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -4,8 +4,8 @@ name = "github.com/satori/go.uuid" [[constraint]] - name = "github.com/garyburd/redigo" - version = "1.6.0" + name = "github.com/gomodule/redigo" + version = "2.0.0" [[constraint]] branch = "master" diff --git a/example/client/main.go b/example/client/main.go index 22fd0b7..dbf2a60 100644 --- a/example/client/main.go +++ b/example/client/main.go @@ -15,8 +15,8 @@ import ( func main() { // create broker and backend - celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "") - celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "") + celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379") + celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379") // AMQP example //celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://") diff --git a/example/worker/main.go b/example/worker/main.go index 40449a2..1349a79 100644 --- a/example/worker/main.go +++ b/example/worker/main.go @@ -49,8 +49,8 @@ func (a *AddTask) RunTask() (interface{}, error) { func main() { // create broker and backend - celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "") - celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "") + celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379") + celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379") // AMQP example //celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")