-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathbatch.go
131 lines (105 loc) · 3.36 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package pool
import "sync"
// Batch groups a set of WorkUnits queued on a Pool so they can be cancelled
// together and their results consumed from a single channel.
type Batch interface {
// Queue queues the work to be run in the pool and starts processing immediately.
// It also retains a reference to the queued unit for cancellation and for
// outputting it on the results channel.
// WARNING: be sure to call QueueComplete() once all work has been queued.
Queue(fn WorkFunc)
// QueueComplete lets the batch know that no more Work Units will be queued,
// so that it may close the results channel once all work is completed.
// WARNING: if this function is not called the results channel will never be
// exhausted; readers will block forever waiting for more results.
QueueComplete()
// Cancel cancels the Work Units belonging to this Batch.
Cancel()
// Results returns a Work Unit result channel that will output all
// completed units of work.
Results() <-chan WorkUnit
// WaitAll is an alternative to Results() for when you want/need to wait
// until all work has been processed but don't need to check the results,
// e.g. when individual units of work handle their own errors and logging.
WaitAll()
}
// batch contains all information for a batch run of WorkUnits.
// It is the value newBatch returns as a Batch.
type batch struct {
// pool is where Queue submits the work.
pool Pool
// m is locked around accesses to closed and units.
m sync.Mutex
// units holds every queued WorkUnit so that Cancel can reach them.
units []WorkUnit
// results receives each WorkUnit once its Wait() has returned.
results chan WorkUnit
// done is closed by QueueComplete to signal that nothing more will be queued.
done chan struct{}
// closed is set by QueueComplete; Queue ignores new work once it is true.
closed bool
// wg counts queued units that have not yet been delivered on results.
wg *sync.WaitGroup
}
// newBatch builds an empty, open batch whose work will be queued on p.
func newBatch(p Pool) Batch {
	b := new(batch)
	b.pool = p
	// Start with room for 4 units so the first few Queue calls do not
	// have to grow (and reallocate) the slice.
	b.units = make([]WorkUnit, 0, 4)
	b.results = make(chan WorkUnit) // unbuffered
	b.done = make(chan struct{})
	b.wg = &sync.WaitGroup{}
	return b
}
// Queue hands fn to the pool and remembers the resulting WorkUnit so that it
// can later be cancelled and reported on the results channel. Calls made after
// QueueComplete are ignored.
// WARNING be sure to call QueueComplete() once all work has been Queued.
func (b *batch) Queue(fn WorkFunc) {
	b.m.Lock()
	if b.closed {
		// the batch no longer accepts work
		b.m.Unlock()
		return
	}

	unit := b.pool.Queue(fn)
	b.wg.Add(1)
	b.units = append(b.units, unit) // reference kept for cancellation
	b.m.Unlock()

	// Forward the unit to the results channel once it has finished; the
	// send blocks until a reader of the results channel picks it up.
	go func() {
		unit.Wait()
		b.results <- unit
		b.wg.Done()
	}()
}
// QueueComplete lets the batch know that there will be no more Work Units Queued
// so that it may close the results channels once all work is completed.
//
// It is safe to call more than once: only the first call marks the batch as
// closed and closes the done channel; later calls are no-ops. This matters
// because Cancel calls QueueComplete itself, so a caller that has already
// called QueueComplete and then cancels would otherwise close the done
// channel twice and panic.
//
// WARNING: if this function is not called the results channel will never exhaust,
// but block forever listening for more results.
func (b *batch) QueueComplete() {
	b.m.Lock()
	defer b.m.Unlock()

	if b.closed {
		// already completed; closing b.done again would panic
		return
	}
	b.closed = true
	close(b.done)
}
// Cancel stops the batch from accepting further work and then cancels every
// Work Unit that has been queued on it.
func (b *batch) Cancel() {
	// nothing more may be added from here on
	b.QueueComplete()

	b.m.Lock()
	defer b.m.Unlock()

	// Walk from the newest unit to the oldest: units near the end are less
	// likely to have started running than those at the beginning, so this
	// order gives the best chance of cancelling as many as possible.
	for remaining := len(b.units); remaining > 0; remaining-- {
		b.units[remaining-1].Cancel()
	}
}
// Results returns a Work Unit result channel that will output all
// completed units of work.
//
// A background goroutine closes the channel once QueueComplete has been
// called (the done channel is closed) and every queued unit has been
// delivered, so ranging over the returned channel terminates.
//
// Call Results at most once per batch (WaitAll calls it too): each call
// starts its own closer goroutine, and a second one would attempt to close
// the results channel again.
func (b *batch) Results() <-chan WorkUnit {
	go func() {
		<-b.done

		// The batch mutex is deliberately NOT held while waiting. Every
		// wg.Add is performed under the mutex before done is closed, so all
		// of them are already visible here. Holding the mutex across Wait
		// would block Cancel/Queue/QueueComplete callers; if the blocked
		// caller is the goroutine draining the results channel, the pending
		// sends never complete and Wait never returns.
		b.wg.Wait()
		close(b.results)
	}()

	return b.results
}
// WaitAll blocks until every unit of work in the batch has been processed.
// Use it instead of Results() when the individual results do not need to be
// inspected, e.g. when each unit of work handles its own errors and logging.
func (b *batch) WaitAll() {
	results := b.Results()
	for {
		// discard each result; stop once the channel has been closed
		if _, open := <-results; !open {
			return
		}
	}
}