From c5d79f1bd18dd8782da5297259ba0fd678f4d2c5 Mon Sep 17 00:00:00 2001 From: joeybloggs Date: Fri, 11 Dec 2015 15:33:00 -0500 Subject: [PATCH 1/2] Add Consumer Hook * now can register ConsumerHook function that will be run while firing up the consumer routines and that return value will be set/passed to each job. This is particularily usefull when creating a saving pool so a the consumer hook would create a database connection for each job to reuse instead of creating an additional one for each job. --- pool.go | 46 +++++++++++++++++++++++++++++++++++++--------- pool_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/pool.go b/pool.go index 04e8022..7d46262 100644 --- a/pool.go +++ b/pool.go @@ -11,6 +11,12 @@ const ( errRecoveryString = "recovering from panic: %+v\nStack Trace:\n %s" ) +// ConsumerHook type is a function that is called during the consumer startup +// and the return value is added to each Job just prior to firing off the job. +// This is good for say creating a database connection for every job to use but +// not having more than there are consumers. +type ConsumerHook func() interface{} + // ErrRecovery contains the error when a consumer goroutine needed to be recovers type ErrRecovery struct { s string @@ -23,12 +29,13 @@ func (e *ErrRecovery) Error() string { // Pool Contains all information for the pool instance type Pool struct { - jobs chan *Job - results chan interface{} - cancel chan struct{} - wg *sync.WaitGroup - cancelled bool - cancelLock sync.RWMutex + jobs chan *Job + results chan interface{} + cancel chan struct{} + wg *sync.WaitGroup + cancelled bool + cancelLock sync.RWMutex + consumerHook ConsumerHook } // JobFunc is the consumable function/job you wish to run @@ -36,9 +43,16 @@ type JobFunc func(job *Job) // Job contains all information to run a job type Job struct { - fn JobFunc - params []interface{} - pool *Pool + fn JobFunc + params []interface{} + hookParam interface{} + pool *Pool +} + +// HookParam returns the value, if any, set by the ConsumerHook. +// Example a database connection. +func (j *Job) HookParam() interface{} { + return j.hookParam } // Params returns an array of the params that were passed in during the Queueing of the funciton @@ -82,6 +96,12 @@ func NewPool(consumers int, jobs int) *Pool { } }(p) + var consumerParm interface{} + + if p.consumerHook != nil { + consumerParm = p.consumerHook() + } + for { select { case j := <-p.jobs: @@ -89,6 +109,7 @@ func NewPool(consumers int, jobs int) *Pool { return } + j.hookParam = consumerParm j.fn(j) p.wg.Done() case <-p.cancel: @@ -100,6 +121,13 @@ func NewPool(consumers int, jobs int) *Pool { return p } +// AddConsumerHook registers a Consumer Hook function to be called by every consumer +// and setting the return value on every job prior to running. Use case is for +// reusing database connections. +func (p *Pool) AddConsumerHook(fn ConsumerHook) { + p.consumerHook = fn +} + func (p *Pool) cancelJobs() { for range p.jobs { p.wg.Done() diff --git a/pool_test.go b/pool_test.go index c7a75e5..3c9215d 100644 --- a/pool_test.go +++ b/pool_test.go @@ -52,6 +52,34 @@ func TestPool(t *testing.T) { Equal(t, count, 4) } +func TestConsumerHook(t *testing.T) { + + pool := NewPool(4, 4) + pool.AddConsumerHook(func() interface{} { return 1 }) + + fn := func(job *Job) { + + j := job.HookParam().(int) + job.Return(j) + } + + for i := 0; i < 4; i++ { + pool.Queue(fn) + } + + var count int + + for v := range pool.Results() { + count++ + + val, ok := v.(int) + Equal(t, ok, true) + Equal(t, val, 1) + } + + Equal(t, count, 4) +} + func TestCancel(t *testing.T) { pool := NewPool(2, 4) From 4d4c8230d2a5b28a5127b51744db8ec852e5ec87 Mon Sep 17 00:00:00 2001 From: joeybloggs Date: Fri, 11 Dec 2015 15:39:55 -0500 Subject: [PATCH 2/2] Update README --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index 1e8af54..dac5f64 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,10 @@ type resultStruct struct { func main() { p := pool.NewPool(4, 16) + // can add a consumer hook for each consumer routine to get a value from + // such as a database connection which each job can reuse via job.HookParam() + // p.AddConsumerHook(func() interface{}{ return db connection or whatever}) + fn := func(job *pool.Job) { i := job.Params()[0].(int) @@ -117,6 +121,10 @@ import ( func main() { p := pool.NewPool(4, 16) + // can add a consumer hook for each consumer routine to get a value from + // such as a database connection which each job can reuse via job.HookParam() + // p.AddConsumerHook(func() interface{}{ return db connection or whatever}) + fn := func(job *pool.Job) { i := job.Params()[0].(int)