Skip to content

Commit

Permalink
Merge pull request #7 from joeybloggs/v1
Browse files Browse the repository at this point in the history
Add Consumer Hook
  • Loading branch information
deankarn committed Dec 11, 2015
2 parents 5cff051 + 4d4c823 commit 7f37f4c
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 9 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type resultStruct struct {
func main() {
p := pool.NewPool(4, 16)

// can add a consumer hook for each consumer routine to get a value from
// such as a database connection which each job can reuse via job.HookParam()
// p.AddConsumerHook(func() interface{}{ return db connection or whatever})

fn := func(job *pool.Job) {

i := job.Params()[0].(int)
Expand Down Expand Up @@ -117,6 +121,10 @@ import (
func main() {
p := pool.NewPool(4, 16)

// can add a consumer hook for each consumer routine to get a value from
// such as a database connection which each job can reuse via job.HookParam()
// p.AddConsumerHook(func() interface{}{ return db connection or whatever})

fn := func(job *pool.Job) {

i := job.Params()[0].(int)
Expand Down
46 changes: 37 additions & 9 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ const (
errRecoveryString = "recovering from panic: %+v\nStack Trace:\n %s"
)

// ConsumerHook type is a function that is called during the consumer startup
// and the return value is added to each Job just prior to firing off the job.
// This is good for say creating a database connection for every job to use but
// not having more than there are consumers.
type ConsumerHook func() interface{}

// ErrRecovery contains the error when a consumer goroutine needed to be recovers
type ErrRecovery struct {
s string
Expand All @@ -23,22 +29,30 @@ func (e *ErrRecovery) Error() string {

// Pool Contains all information for the pool instance
type Pool struct {
jobs chan *Job
results chan interface{}
cancel chan struct{}
wg *sync.WaitGroup
cancelled bool
cancelLock sync.RWMutex
jobs chan *Job
results chan interface{}
cancel chan struct{}
wg *sync.WaitGroup
cancelled bool
cancelLock sync.RWMutex
consumerHook ConsumerHook
}

// JobFunc is the consumable function/job you wish to run
type JobFunc func(job *Job)

// Job contains all information to run a job
type Job struct {
fn JobFunc
params []interface{}
pool *Pool
fn JobFunc
params []interface{}
hookParam interface{}
pool *Pool
}

// HookParam returns the value, if any, set by the ConsumerHook.
// Example a database connection.
func (j *Job) HookParam() interface{} {
return j.hookParam
}

// Params returns an array of the params that were passed in during the Queueing of the funciton
Expand Down Expand Up @@ -82,13 +96,20 @@ func NewPool(consumers int, jobs int) *Pool {
}
}(p)

var consumerParm interface{}

if p.consumerHook != nil {
consumerParm = p.consumerHook()
}

for {
select {
case j := <-p.jobs:
if reflect.ValueOf(j).IsNil() {
return
}

j.hookParam = consumerParm
j.fn(j)
p.wg.Done()
case <-p.cancel:
Expand All @@ -100,6 +121,13 @@ func NewPool(consumers int, jobs int) *Pool {
return p
}

// AddConsumerHook registers a Consumer Hook function to be called by every consumer
// and setting the return value on every job prior to running. Use case is for
// reusing database connections.
func (p *Pool) AddConsumerHook(fn ConsumerHook) {
p.consumerHook = fn
}

func (p *Pool) cancelJobs() {
for range p.jobs {
p.wg.Done()
Expand Down
28 changes: 28 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,34 @@ func TestPool(t *testing.T) {
Equal(t, count, 4)
}

func TestConsumerHook(t *testing.T) {

pool := NewPool(4, 4)
pool.AddConsumerHook(func() interface{} { return 1 })

fn := func(job *Job) {

j := job.HookParam().(int)
job.Return(j)
}

for i := 0; i < 4; i++ {
pool.Queue(fn)
}

var count int

for v := range pool.Results() {
count++

val, ok := v.(int)
Equal(t, ok, true)
Equal(t, val, 1)
}

Equal(t, count, 4)
}

func TestCancel(t *testing.T) {

pool := NewPool(2, 4)
Expand Down

0 comments on commit 7f37f4c

Please sign in to comment.