Skip to content

Commit

Permalink
Merge pull request #2 from anantadwi13/dev
Browse files Browse the repository at this point in the history
refactor JobFunc parameters
  • Loading branch information
anantadwi13 authored Apr 8, 2022
2 parents 851d22b + 17d58fb commit 90f2990
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 29 deletions.
6 changes: 6 additions & 0 deletions .github/.codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
coverage:
status:
project:
default:
target: 90%
threshold: 5%
26 changes: 12 additions & 14 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (
)

type simpleJob struct {
id string
jobFunc JobFunc
requestData interface{}
resultChan chan *Result
id string
jobFunc SimpleJobFunc
params []interface{}

status Status
statusLock sync.RWMutex
Expand All @@ -24,19 +23,18 @@ type Result struct {
Error error
}

type JobFunc func(isCanceled chan ChanSignal, requestData interface{}, result chan *Result)
type SimpleJobFunc func(isCanceled chan ChanSignal, params ...interface{})

func NewJobSimple(
jobFunc JobFunc, requestData interface{}, result chan *Result,
jobFunc SimpleJobFunc, params ...interface{},
) Job {
return &simpleJob{
id: uuid.NewString(),
jobFunc: jobFunc,
requestData: requestData,
resultChan: result,
status: StatusCreated,
cancelChan: make(chan ChanSignal, 2),
doneChan: make(chan ChanSignal),
id: uuid.NewString(),
jobFunc: jobFunc,
params: params,
status: StatusCreated,
cancelChan: make(chan ChanSignal, 2),
doneChan: make(chan ChanSignal),
}
}

Expand All @@ -53,7 +51,7 @@ func (s *simpleJob) Do(ctx context.Context) {
defer s.setStatus(StatusStopped)

go func() {
s.jobFunc(s.cancelChan, s.requestData, s.resultChan)
s.jobFunc(s.cancelChan, s.params...)
close(s.doneChan)
}()

Expand Down
30 changes: 21 additions & 9 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ func TestSimpleJobNormal(t *testing.T) {
ctx = context.Background()
)

job := NewJobSimple(func(isCanceled chan ChanSignal, requestData interface{}, result chan *Result) {
job := NewJobSimple(func(isCanceled chan ChanSignal, params ...interface{}) {
if len(params) != 1 {
return
}

select {
case <-time.After(10 * time.Millisecond):
atomic.AddInt32(requestData.(*int32), 1)
atomic.AddInt32(params[0].(*int32), 1)
case <-isCanceled:
}
}, &data, nil)
}, &data)

assert.NotEmpty(t, job.Id())
assert.Equal(t, StatusCreated, job.Status())
Expand Down Expand Up @@ -56,13 +60,17 @@ func TestSimpleJobCancellation(t *testing.T) {
ctx = context.Background()
)

job := NewJobSimple(func(isCanceled chan ChanSignal, requestData interface{}, result chan *Result) {
job := NewJobSimple(func(isCanceled chan ChanSignal, params ...interface{}) {
if len(params) != 1 {
return
}

select {
case <-time.After(20 * time.Millisecond):
atomic.AddInt32(requestData.(*int32), 1)
atomic.AddInt32(params[0].(*int32), 1)
case <-isCanceled:
}
}, &data, nil)
}, &data)

assert.NotEmpty(t, job.Id())
assert.Equal(t, StatusCreated, job.Status())
Expand Down Expand Up @@ -110,13 +118,17 @@ func TestSimpleJobDeadline(t *testing.T) {
ctx = context.Background()
)

job := NewJobSimple(func(isCanceled chan ChanSignal, requestData interface{}, result chan *Result) {
job := NewJobSimple(func(isCanceled chan ChanSignal, params ...interface{}) {
if len(params) != 1 {
return
}

select {
case <-time.After(20 * time.Millisecond):
atomic.AddInt32(requestData.(*int32), 1)
atomic.AddInt32(params[0].(*int32), 1)
case <-isCanceled:
}
}, &data, nil)
}, &data)

assert.NotEmpty(t, job.Id())
assert.Equal(t, StatusCreated, job.Status())
Expand Down
25 changes: 19 additions & 6 deletions worker_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@ type JobWrapper struct {
}

func generateDummyJob(timeout time.Duration, number *int32) Job {
return NewJobSimple(func(isCanceled chan ChanSignal, requestData interface{}, result chan *Result) {
return NewJobSimple(func(isCanceled chan ChanSignal, params ...interface{}) {
if len(params) != 1 {
return
}

num, ok := params[0].(*int32)
if !ok {
return
}

select {
case <-time.After(timeout):
atomic.AddInt32(number, 1)
atomic.AddInt32(num, 1)
case <-isCanceled:
}
}, nil, nil)
}, number)
}

func TestWorkerPoolValidation(t *testing.T) {
Expand Down Expand Up @@ -51,11 +60,15 @@ func TestWorkerPoolMainFlow(t *testing.T) {

value := 0

job := NewJobSimple(func(isCanceled chan ChanSignal, requestData interface{}, result chan *Result) {
val, ok := requestData.(*int)
job := NewJobSimple(func(isCanceled chan ChanSignal, params ...interface{}) {
if len(params) != 1 {
return
}

val, ok := params[0].(*int)
assert.True(t, ok)
*val++
}, &value, nil)
}, &value)

assert.NotEmpty(t, job.Id())

Expand Down

0 comments on commit 90f2990

Please sign in to comment.