-
Notifications
You must be signed in to change notification settings - Fork 36
/
limiter.go
129 lines (111 loc) · 2.96 KB
/
limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package limiter
import (
"errors"
"fmt"
"sync/atomic"
)
// DefaultLimit is the concurrency limit used when NewConcurrencyLimiter is
// given a limit that is zero or negative.
const DefaultLimit = 100
// Sentinel errors used by the limiter.
var (
	// ErrorClosed is returned by Execute and ExecuteWithTicket when the
	// limiter's ticket channel has already been closed.
	ErrorClosed = errors.New("limiter closed")

	// ErrorSubroutinePanic is the error value included in the message that
	// is printed when a panic raised by a job is recovered.
	ErrorSubroutinePanic = errors.New("goroutine panic")
)
// ConcurrencyLimiter bounds how many jobs run at the same time by handing
// out a fixed set of tickets; a job holds one ticket for as long as it runs.
type ConcurrencyLimiter struct {
	// limit is the total number of tickets owned by this limiter.
	limit int

	// tickets holds the ids of the tickets that are currently free.
	tickets chan int

	// numInProgress counts jobs that hold a ticket; it is only accessed
	// through sync/atomic.
	numInProgress int32

	// RecoverPanics makes the limiter recover from panics raised inside the
	// submitted jobs (keeping the process running) instead of letting them
	// propagate.
	RecoverPanics bool
}
// NewConcurrencyLimiter builds a limiter that allows up to limit jobs to run
// at once. A limit of zero or less is replaced by DefaultLimit.
//
// The returned limiter starts with every ticket (ids 0 through limit-1)
// available.
func NewConcurrencyLimiter(limit int) *ConcurrencyLimiter {
	if limit < 1 {
		limit = DefaultLimit
	}

	// Pre-fill the pool so that the first limit calls to Execute do not block.
	pool := make(chan int, limit)
	for id := 0; id < limit; id++ {
		pool <- id
	}

	return &ConcurrencyLimiter{
		limit:   limit,
		tickets: pool,
	}
}
// Execute runs job on a new goroutine as soon as a ticket is available.
//
// If fewer than limit jobs are running, the job starts immediately;
// otherwise the call blocks until a running job gives its ticket back.
// A nil job is accepted and simply takes and releases a ticket.
//
// It returns the id of the ticket assigned to the job, or -1 and ErrorClosed
// when the limiter has already been closed.
//
// When RecoverPanics is set, a panic raised by job is recovered and reported
// on standard output; otherwise the panic propagates as usual.
func (c *ConcurrencyLimiter) Execute(job func()) (int, error) {
	ticket, ok := <-c.tickets
	if !ok {
		return -1, ErrorClosed
	}
	atomic.AddInt32(&c.numInProgress, 1)
	go func() {
		defer func() {
			// recover must be called directly by this deferred function.
			if c.RecoverPanics {
				if r := recover(); r != nil {
					err := ErrorSubroutinePanic
					fmt.Println("ERROR: PANIC: ", r, "err:", err, "- recovering")
				}
			}
			// Release the ticket last: whoever receives it (another
			// Execute call or WaitAndClose) is then guaranteed to observe
			// the decremented counter and the finished panic report.
			atomic.AddInt32(&c.numInProgress, -1)
			c.tickets <- ticket
		}()
		// execute the job
		if job != nil {
			job()
		}
	}()
	return ticket, nil
}
// ExecuteWithTicket runs job on a new goroutine as soon as a ticket is
// available, passing the id of the assigned ticket to the job.
//
// If fewer than limit jobs are running, the job starts immediately;
// otherwise the call blocks until a running job gives its ticket back.
// A nil job is accepted and simply takes and releases a ticket, matching
// Execute.
//
// It returns the id of the ticket assigned to the job, or -1 and ErrorClosed
// when the limiter has already been closed.
//
// When RecoverPanics is set, a panic raised by job is recovered and reported
// on standard output; otherwise the panic propagates as usual.
func (c *ConcurrencyLimiter) ExecuteWithTicket(job func(ticket int)) (int, error) {
	ticket, ok := <-c.tickets
	if !ok {
		return -1, ErrorClosed
	}
	atomic.AddInt32(&c.numInProgress, 1)
	go func() {
		defer func() {
			// recover must be called directly by this deferred function.
			if c.RecoverPanics {
				if r := recover(); r != nil {
					err := ErrorSubroutinePanic
					fmt.Println("ERROR: PANIC: ", r, "err:", err, "- recovering")
				}
			}
			// Release the ticket last: whoever receives it (another
			// Execute call or WaitAndClose) is then guaranteed to observe
			// the decremented counter and the finished panic report.
			atomic.AddInt32(&c.numInProgress, -1)
			c.tickets <- ticket
		}()
		// run the job; calling a nil func would panic, so guard it.
		if job != nil {
			job(ticket)
		}
	}()
	return ticket, nil
}
// WaitAndClose blocks until every previously executed job has finished, then
// closes the limiter. New jobs are not allowed afterwards: Execute and
// ExecuteWithTicket return ErrorClosed.
//
// It waits by taking all limit tickets back out of the pool. If the limiter
// turns out to be closed already (for example on a second call), it returns
// ErrorClosed instead of closing the ticket channel twice.
//
// IMPORTANT: calling WaitAndClose while other goroutines keep calling
// Execute leads to undesired race conditions.
func (c *ConcurrencyLimiter) WaitAndClose() error {
	for i := 0; i < c.limit; i++ {
		// A receive on the closed, drained channel reports open == false.
		if _, open := <-c.tickets; !open {
			return ErrorClosed
		}
	}
	return c.close()
}
// GetNumInProgress reports how many jobs currently hold a ticket.
//
// The value is an atomic snapshot of a counter that running jobs keep
// changing, so it may already be stale when the caller looks at it.
func (c *ConcurrencyLimiter) GetNumInProgress() int32 {
	inFlight := atomic.LoadInt32(&c.numInProgress)
	return inFlight
}
// close shuts the ticket channel so that later receives on it report the
// limiter as closed. It always returns nil.
func (c *ConcurrencyLimiter) close() error {
	pool := c.tickets
	close(pool)
	return nil
}