-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.go
82 lines (70 loc) · 1.42 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package engine
import (
"context"
"fmt"
uuid "github.com/satori/go.uuid"
)
type work struct {
Executable
success chan bool
}
// worker - a unit task executor
type worker struct {
id string
ctx context.Context
stop chan struct{}
input chan work
pool chan chan work
}
// NewWorker - initializes a new worker
func NewWorker(ctx context.Context, pool chan chan work) *worker {
return &worker{
id: fmt.Sprintf("%s-%s", "worker", uuid.NewV4().String()),
ctx: ctx,
pool: pool,
input: make(chan work),
stop: make(chan struct{}),
}
}
// Start - readies worker for execution
func (w *worker) Start() {
log(w.id).Debugf("starting...")
go w.work()
}
// Stop - stops the worker routine
func (w *worker) Stop() {
close(w.stop)
}
func (w *worker) execute(work work) {
if !work.IsCompleted() {
if err := work.Execute(); err != nil {
log(w.id).Errorf("error while executing: %+v", work)
work.OnFailure(err)
go func() {
work.success <- false
close(work.success)
}()
return
}
log(w.id).Infof("completed executing: %+v", work)
work.OnSuccess()
}
go func() {
work.success <- true
close(work.success)
}()
}
func (w *worker) work() {
for {
select {
case w.pool <- w.input:
log(w.id).Debugf("back In queue")
case execute := <-w.input:
log(w.id).Debugf("executing: %+v", execute)
w.execute(execute)
case <-w.stop:
log(w.id).Debugf("stopping...")
return
}
}
}