-
Notifications
You must be signed in to change notification settings - Fork 263
/
Copy pathwait_for_ready.go
312 lines (279 loc) · 11 KB
/
wait_for_ready.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wait
import (
"context"
"fmt"
"io"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"knative.dev/pkg/apis"
)
// waitForReadyConfig bundles the callbacks and configuration used while
// waiting for a resource's "Ready" condition.
type waitForReadyConfig struct {
// watchMaker creates the watch that delivers events for the resource
watchMaker WatchMaker
// conditionsExtractor pulls the status conditions out of a watched object
conditionsExtractor ConditionsExtractor
// kind names the type of the watched resource; it is used in the timeout error message
kind string
}
// waitForEvent bundles the callbacks and configuration used while waiting
// for a specific watch event.
type waitForEvent struct {
// watchMaker creates the watch that delivers events for the resource
watchMaker WatchMaker
// eventDone decides, for every received event, whether waiting is over
eventDone EventDone
// kind names the type of the watched resource; it is used in the timeout error message
kind string
}
// EventDone is a predicate evaluated on every received watch event.
// Returning true marks the awaited state as reached and stops the waiting.
type EventDone func(ev *watch.Event) bool
// Wait is the interface used for waiting until a resource with a given name
// reaches a definitive state (e.g. in its "Ready" condition, or a specific
// watch event).
type Wait interface {
// Wait blocks on the resource with the given name, starting the watch at
// initialVersion. Implementations may report intermediate event messages
// through msgCallback.
// Returns an error (if any) and the overall time it took to wait.
Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration)
}
// Options tunes the waiting behaviour. Both fields are optional; a nil
// pointer selects the built-in default (see timeoutWithDefault and
// errorWindowWithDefault).
type Options struct {
// Window for how long a ReadyCondition == false has to stay
// for being considered as an error (useful for flaky reconciliation).
ErrorWindow *time.Duration
// Timeout for how long to wait at maximum
Timeout *time.Duration
}
// WatchMaker creates the watch which is used while waiting. It receives the
// name of the resource to watch, the resource version to start from and the
// wait timeout, and returns the watch or an error if it could not be created.
type WatchMaker func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error)
// ConditionsExtractor extracts the status conditions from a runtime object,
// or returns an error if they cannot be obtained.
type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
// MessageCallback is a callback for event messages. It is invoked with the
// time elapsed since waiting started and the message of the "Ready" condition
// carried by the event.
type MessageCallback func(durationSinceState time.Duration, message string)
// NewWaitForReady creates a Wait object which waits until the "Ready"
// condition of a resource of the given kind reaches a definitive state.
// watchMaker supplies the underlying watch and extractor reads the
// conditions from the objects delivered by that watch.
func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait {
	cfg := new(waitForReadyConfig)
	cfg.kind = kind
	cfg.watchMaker = watchMaker
	cfg.conditionsExtractor = extractor
	return cfg
}
// NewWaitForEvent creates a Wait object for resources of the given kind that
// blocks until eventDone returns true for an event delivered by the watch
// which watchMaker creates.
func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait {
	waiter := new(waitForEvent)
	waiter.kind = kind
	waiter.watchMaker = watchMaker
	waiter.eventDone = eventDone
	return waiter
}
// SimpleMessageCallback returns a callback which writes one line per event to
// out: the elapsed time in seconds (millisecond precision) followed by the
// message. A message identical to the previously reported one is abbreviated
// to "...".
func SimpleMessageCallback(out io.Writer) MessageCallback {
	var previous string
	return func(duration time.Duration, message string) {
		// Collapse consecutive duplicates so the output stays readable
		shown := "..."
		if message != previous {
			shown = message
		}
		rounded := duration.Round(time.Millisecond)
		seconds := float64(rounded) / float64(time.Second)
		fmt.Fprintf(out, "%7.3fs %s\n", seconds, shown)
		previous = message
	}
}
// NoopMessageCallback returns a callback which silently discards every
// message it is given.
func NoopMessageCallback() MessageCallback {
	discard := func(time.Duration, string) {}
	return discard
}
// Wait blocks until the resource called name reports a definitive "Ready"
// condition.
//
// The watch is created through the configured watchMaker, starting at
// initialVersion. If waitForReadyCondition asks for a retry, the watch is
// re-created after a short pause; a single timeout timer spans all attempts.
// The timeout and error window come from options, with defaults applied when
// unset. msgCallback is invoked for events whose "Ready" condition carries a
// message.
//
// It returns nil once "Ready" is "True", the error reported by
// waitForReadyCondition otherwise, or a timeout error when the timeout expires
// first. The second return value is the overall time spent waiting, measured
// across all retries.
func (w *waitForReadyConfig) Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration) {
	timeout := options.timeoutWithDefault()
	timeoutTimer := time.NewTimer(timeout)
	defer timeoutTimer.Stop()

	// Taken once, before the retry loop: both the returned duration and the
	// elapsed time handed to msgCallback must cover the whole wait and not
	// just the most recent watch attempt.
	start := time.Now()
	for {
		retry, timeoutReached, err := w.waitForReadyCondition(ctx, name, initialVersion, start, timeoutTimer, options.errorWindowWithDefault(), options, msgCallback)
		if err != nil {
			return err, time.Since(start)
		}
		if timeoutReached {
			return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, int(timeout/time.Second)), time.Since(start)
		}
		if retry {
			// sleep to prevent CPU pegging and restart the loop
			// (pollInterval is declared outside of this block)
			time.Sleep(pollInterval)
			continue
		}
		return nil, time.Since(start)
	}
}
// waitForReadyCondition performs a single watch attempt. It waits until the
// status condition "Ready" is set to true (good path) or returns an error when
// the "Ready" condition is set to false and stays that way.
//
// An errorWindow can be specified which takes intermediate "false" ready
// conditions into account: before returning an error, this method waits for
// the errorWindow duration, and if a "True" or "Unknown" event for the "Ready"
// condition arrives in the meantime, the method continues to wait.
//
// Return values:
//   - retry is true when the watch ended (closed channel or event without an
//     object) and the caller should create a new watch and try again.
//   - timeoutReached is true when timeoutTimer fired before a definitive state
//     was seen; err is nil in that case.
//   - err carries the watch creation error, the context error, an error from
//     the generation check or the conditions extractor, or the "Ready == False"
//     error once the error window has elapsed.
//
// start is only used to compute the elapsed time passed to msgCallback.
func (w *waitForReadyConfig) waitForReadyCondition(ctx context.Context, name string, initialVersion string, start time.Time,
	timeoutTimer *time.Timer, errorWindow time.Duration, options Options, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {

	watcher, err := w.watchMaker(ctx, name, initialVersion, options.timeoutWithDefault())
	if err != nil {
		return false, false, err
	}
	defer watcher.Stop()

	// Channel used to transport the error from the error timer to the select
	// loop. It is buffered so that the timer's goroutine can always complete
	// its send: with an unbuffered channel that goroutine would block forever
	// (and leak) if the timer fired just as this function was returning for
	// another reason.
	errChan := make(chan error, 1)

	var errorTimer *time.Timer
	// Stop the error timer if it has been started because
	// a ConditionReady has been set to false
	defer (func() {
		if errorTimer != nil {
			errorTimer.Stop()
			errorTimer = nil
		}
	})()

	for {
		select {
		case <-ctx.Done():
			return false, false, ctx.Err()
		case <-timeoutTimer.C:
			// We reached a timeout without receiving a "Ready" == "True" event
			return false, true, nil
		case err = <-errChan:
			// The error timer fired and we have not received a recovery event ("True" / "Unknown") in the
			// meantime. So the error status is considered to be final.
			return false, false, err
		case event, ok := <-watcher.ResultChan():
			if !ok || event.Object == nil {
				// Watch has ended; let the caller re-create it
				return true, false, nil
			}

			// Skip the event if it is not a MODIFIED event, as only MODIFIED events update the condition
			// we are looking for.
			// This filters out all synthetic ADDED events that are created by the API server for
			// the initial state. See https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter
			// for details:
			// "Get State and Start at Most Recent: Start a watch at the most recent resource version,
			//  which must be consistent (i.e. served from etcd via a quorum read). To establish initial state,
			//  the watch begins with synthetic "Added" events of all resources instances that exist at the starting
			//  resource version. All following watch events are for all changes that occurred after the resource
			//  version the watch started at."
			if event.Type != watch.Modified {
				continue
			}

			// Check whether resource is in sync already (meta.generation == status.observedGeneration)
			inSync, err := generationCheck(event.Object)
			if err != nil {
				return false, false, err
			}

			// Skip events if the generations have not yet been consolidated, regardless of type.
			// Wait for the next event to come in until the generations align
			if !inSync {
				continue
			}

			conditions, err := w.conditionsExtractor(event.Object)
			if err != nil {
				return false, false, err
			}
			for _, cond := range conditions {
				if cond.Type == apis.ConditionReady {
					switch cond.Status {
					case corev1.ConditionTrue:
						// Any error timer running will be cancelled by the deferred function that has been set above
						return false, false, nil
					case corev1.ConditionFalse:
						// Fire up a timer waiting for the error window duration to still allow to reconcile
						// to a true condition even after the condition went to false. If this is not the case within
						// this window, then an error is returned.
						// If there is already a timer running, the first error is kept.
						if errorTimer == nil {
							condErr := fmt.Errorf("%s: %s", cond.Reason, cond.Message)
							errorTimer = time.AfterFunc(errorWindow, func() {
								errChan <- condErr
							})
						}
					case corev1.ConditionUnknown:
						// If an errorTimer is running because of a previous "False" event, but now
						// we received an "Unknown" event during the error window, cancel the error timer
						// to avoid receiving an error signal.
						if errorTimer != nil {
							errorTimer.Stop()
							errorTimer = nil
						}
					}
					if cond.Message != "" {
						msgCallback(time.Since(start), cond.Message)
					}
				}
			}
		}
	}
}
// Wait blocks until the configured EventDone predicate returns true for an
// event received from the watch on the resource called name.
//
// The watch is created through the configured watchMaker, starting at
// initialVersion. The timeout is taken from options (default applied when
// unset). msgCallback is not used by this implementation.
//
// It returns nil when the awaited event was seen, the watch creation error
// (with a zero duration) if the watch could not be established, ctx.Err() when
// the context is done, or a timeout error when the timeout expires first. The
// second return value is the time spent waiting.
func (w *waitForEvent) Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration) {
	watcher, err := w.watchMaker(ctx, name, initialVersion, options.timeoutWithDefault())
	if err != nil {
		return err, 0
	}
	defer watcher.Stop()

	timeout := options.timeoutWithDefault()
	start := time.Now()

	// Timer enforcing the maximum time to wait
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	events := watcher.ResultChan()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err(), time.Since(start)
		case <-timer.C:
			return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, int(timeout/time.Second)), time.Since(start)
		case event, ok := <-events:
			if !ok {
				// The watch has been closed. A closed channel is always ready
				// to receive, which would turn this loop into a busy spin over
				// zero-value events. A nil channel is never selected, so from
				// now on only cancellation or the timeout end the wait.
				events = nil
				continue
			}
			if w.eventDone(&event) {
				return nil, time.Since(start)
			}
		}
	}
}
// generationCheck reports whether metadata.generation of the given object
// equals its status.observedGeneration.
//
// It returns (false, nil) when the status carries no observedGeneration, and
// an error when the object cannot be converted to its unstructured form or
// when the metadata map, the status map or the metadata.generation field is
// missing.
func generationCheck(object runtime.Object) (bool, error) {
	content, convErr := runtime.DefaultUnstructuredConverter.ToUnstructured(object)
	if convErr != nil {
		return false, convErr
	}

	metadata, hasMetadata := content["metadata"].(map[string]interface{})
	if !hasMetadata {
		return false, fmt.Errorf("cannot extract metadata from %v", object)
	}
	statusFields, hasStatus := content["status"].(map[string]interface{})
	if !hasStatus {
		return false, fmt.Errorf("cannot extract status from %v", object)
	}

	observed, found := statusFields["observedGeneration"]
	if !found {
		// The status exists but no generation has been recorded in it yet
		return false, nil
	}
	given, found := metadata["generation"]
	if !found {
		return false, fmt.Errorf("no field 'generation' in metadata of %v", object)
	}

	inSync := given == observed
	return inSync, nil
}
// timeoutWithDefault returns the configured Timeout, falling back to
// 60 seconds when none has been set.
func (o Options) timeoutWithDefault() time.Duration {
	if o.Timeout == nil {
		return 60 * time.Second
	}
	return *o.Timeout
}
// errorWindowWithDefault returns the configured ErrorWindow, falling back to
// 2 seconds when none has been set.
func (o Options) errorWindowWithDefault() time.Duration {
	if o.ErrorWindow == nil {
		return 2 * time.Second
	}
	return *o.ErrorWindow
}