This repository has been archived by the owner on Jul 29, 2020. It is now read-only.
forked from apid/goscaffold
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tracker.go
157 lines (144 loc) · 3.44 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package goscaffold
import (
"math"
"sync/atomic"
"time"
)
/*
values for the command channel.
*/
const (
	// startRequest is sent by "start" so that the tracker loop counts a new
	// active request.
	startRequest = iota
	// endRequest is sent by "end" so that the tracker loop counts a request
	// as finished.
	endRequest
	// shutdown is sent by the "shutdown" method to make the tracker loop
	// begin its countdown.
	shutdown
)
/*
values for the shutdown state
*/
const (
	// running is the state a tracker is created in.
	running int32 = iota
	// markedDown is the state stored by "markDown".
	markedDown
	// shutDown is the state stored by the tracker loop when it handles the
	// shutdown command.
	shutDown
)
/*
The requestTracker keeps track of HTTP requests. In normal operations it
just counts. Once the server has been marked for shutdown, however, it
counts down to zero and returns a shutdown indication when that
happens.
*/
type requestTracker struct {
	// A value will be delivered to this channel when the server can stop.
	// If "shutdown" is never called then this will never happen.
	C chan error
	// shutdownWait is the duration the tracker loop arms its grace timer
	// with when shutdown begins while requests are still active.
	shutdownWait time.Duration
	// shutdownState holds one of running, markedDown, or shutDown. It is
	// read and written with the sync/atomic functions.
	shutdownState int32
	// shutdownReason holds a *error: the reason stored by "shutdown" or
	// "markDown".
	shutdownReason *atomic.Value
	// commandChan carries startRequest, endRequest, and shutdown commands
	// to the tracker loop.
	commandChan chan int
}
/*
startRequestTracker builds a requestTracker, launches its trackerLoop
goroutine, and hands the tracker back to the caller. shutdownWait is the
longest the tracker waits, once "shutdown" has been called, for requests
that are still active before it reports the stop anyway.
*/
func startRequestTracker(shutdownWait time.Duration) *requestTracker {
	tracker := new(requestTracker)
	// One slot, so the loop can deliver the stop reason even if nobody is
	// receiving yet.
	tracker.C = make(chan error, 1)
	// Up to 100 commands can queue before a sender has to wait for the loop.
	tracker.commandChan = make(chan int, 100)
	tracker.shutdownState = running
	tracker.shutdownWait = shutdownWait
	tracker.shutdownReason = new(atomic.Value)

	go tracker.trackerLoop()
	return tracker
}
/*
start records the beginning of a request. It returns nil when "markedDown"
reports no error; in that case the request is counted and may proceed.
Otherwise it returns that error, the request is not counted, and the caller
should fail the request because the server is shutting down.
*/
func (t *requestTracker) start() error {
	if err := t.markedDown(); err != nil {
		return err
	}
	// Only requests that are allowed to proceed are counted by the loop.
	t.commandChan <- startRequest
	return nil
}
/*
end indicates that a request ended. "start" only counts a request when it
returns nil, so for the count to stay correct the caller needs to call end
exactly once for each such request.
*/
func (t *requestTracker) end() {
	// Ask the tracker loop to decrement its count of active requests.
	t.commandChan <- endRequest
}
/*
markedDown returns nil while the tracker is in the "running" state. In any
other state it returns the error most recently stored by "shutdown" or
"markDown"; that error is itself nil if "shutdown" was given a nil reason.
*/
func (t *requestTracker) markedDown() error {
	if atomic.LoadInt32(&t.shutdownState) == running {
		return nil
	}
	if why := t.shutdownReason.Load().(*error); why != nil {
		return *why
	}
	return nil
}
/*
shutdown indicates that the tracker should start counting down until
the number of running requests reaches zero or shutdownWait elapses. Once
the tracker loop has processed the command, "reason" is returned as the
result of the "start" call, and it is also the value delivered on C when
the tracker stops.
*/
func (t *requestTracker) shutdown(reason error) {
	// Store the reason before queueing the command so that the tracker loop
	// finds it when it handles the shutdown.
	t.shutdownReason.Store(&reason)
	t.commandChan <- shutdown
}
/*
markDown puts the tracker in the markedDown state. No command is sent to
the tracker loop, so no countdown begins. Afterwards "markedDown" (and
therefore "start") reports ErrMarkedDown until a later "shutdown" replaces
the stored reason. ErrMarkedDown is declared elsewhere in the package.
*/
func (t *requestTracker) markDown() {
	// Store the reason first so that it is already in place when
	// "markedDown" observes the new state.
	t.shutdownReason.Store(&ErrMarkedDown)
	atomic.StoreInt32(&t.shutdownState, markedDown)
}
/*
sendStop delivers the stored shutdown reason on C unless that has already
been done. "sent" says whether the reason was delivered earlier, and the
result is the updated value of that flag: it is false only when nothing has
been sent yet and the stored reason pointer is nil.
*/
func (t *requestTracker) sendStop(sent bool) bool {
	if sent {
		return true
	}
	why := t.shutdownReason.Load().(*error)
	if why == nil {
		return false
	}
	t.C <- *why
	return true
}
/*
trackerLoop is the goroutine started by startRequestTracker. It owns the
count of active requests and handles commands from commandChan one at a
time:

  - startRequest increments the count.
  - endRequest decrements the count and, once shutdown has begun and the
    count reaches zero, delivers the shutdown reason on C.
  - shutdown moves the tracker to the shutDown state. With no active
    requests the reason is delivered right away; otherwise the grace timer
    is armed for shutdownWait, and the reason is delivered when the last
    request ends or the timer fires, whichever happens first.

The loop returns once the reason has been delivered.
*/
func (t *requestTracker) trackerLoop() {
	activeRequests := 0
	stopping := false
	sentStop := false

	// The timer starts out with the maximum duration so that its channel can
	// sit in the select below without firing; it is only armed for real when
	// shutdown begins.
	graceTimer := time.NewTimer(time.Duration(math.MaxInt64))
	// Release the timer when the loop exits so that it does not stay
	// registered with the runtime after the tracker has finished.
	defer graceTimer.Stop()

	for !sentStop {
		select {
		case cmd := <-t.commandChan:
			switch cmd {
			case startRequest:
				activeRequests++
			case endRequest:
				activeRequests--
				if stopping && activeRequests == 0 {
					sentStop = t.sendStop(sentStop)
				}
			case shutdown:
				stopping = true
				// Publish the state so that "markedDown" starts reporting the
				// shutdown reason.
				atomic.StoreInt32(&t.shutdownState, shutDown)
				if activeRequests <= 0 {
					sentStop = t.sendStop(sentStop)
				} else {
					graceTimer.Reset(t.shutdownWait)
				}
			}
		case <-graceTimer.C:
			// The grace period ran out with requests still active.
			sentStop = t.sendStop(sentStop)
		}
	}
}