This repository has been archived by the owner on May 6, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 382
/
controller_instance.go
2658 lines (2330 loc) · 109 KB
/
controller_instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
stderrors "errors"
"fmt"
"net/url"
"sync"
"time"
"github.com/golang/glog"
osb "github.com/pmorie/go-open-service-broker-client/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/kubernetes-incubator/service-catalog/pkg/apis/servicecatalog/v1beta1"
scfeatures "github.com/kubernetes-incubator/service-catalog/pkg/features"
"github.com/kubernetes-incubator/service-catalog/pkg/pretty"
)
// Reason and message strings, plus retry-delay bounds, shared by the
// ServiceInstance controller code in this file. The *Reason / *Message pairs
// are presumably surfaced through instance conditions and events, as is done
// with startingInstanceOrphanMitigationReason/Message in
// initOrphanMitigationCondition.
const (
// Reasons/messages for operations that completed successfully.
successDeprovisionReason string = "DeprovisionedSuccessfully"
successDeprovisionMessage string = "The instance was deprovisioned successfully"
successUpdateInstanceReason string = "InstanceUpdatedSuccessfully"
successUpdateInstanceMessage string = "The instance was updated successfully"
successProvisionReason string = "ProvisionedSuccessfully"
successProvisionMessage string = "The instance was provisioned successfully"
successOrphanMitigationReason string = "OrphanMitigationSuccessful"
successOrphanMitigationMessage string = "Orphan mitigation was completed successfully"
// Reasons/messages for failed or blocked operations.
errorWithParameters string = "ErrorWithParameters"
errorProvisionCallFailedReason string = "ProvisionCallFailed"
errorErrorCallingProvisionReason string = "ErrorCallingProvision"
errorUpdateInstanceCallFailedReason string = "UpdateInstanceCallFailed"
errorErrorCallingUpdateInstanceReason string = "ErrorCallingUpdateInstance"
errorDeprovisionCalledReason string = "DeprovisionCallFailed"
errorDeprovisionBlockedByCredentialsReason string = "DeprovisionBlockedByExistingCredentials"
errorPollingLastOperationReason string = "ErrorPollingLastOperation"
errorWithOriginatingIdentity string = "Error with Originating Identity"
errorWithOngoingAsyncOperation string = "ErrorAsyncOperationInProgress"
errorWithOngoingAsyncOperationMessage string = "Another operation for this service instance is in progress. "
// Reasons/messages for references to classes, plans or brokers that do not
// exist. The cluster-scoped and namespaced variants carry the same values.
errorNonexistentClusterServiceClassReason string = "ReferencesNonexistentServiceClass"
errorNonexistentClusterServiceClassMessage string = "ReferencesNonexistentServiceClass"
errorNonexistentClusterServicePlanReason string = "ReferencesNonexistentServicePlan"
errorNonexistentClusterServiceBrokerReason string = "ReferencesNonexistentBroker"
errorNonexistentServiceClassReason string = "ReferencesNonexistentServiceClass"
errorNonexistentServiceClassMessage string = "ReferencesNonexistentServiceClass"
errorNonexistentServicePlanReason string = "ReferencesNonexistentServicePlan"
errorNonexistentServiceBrokerReason string = "ReferencesNonexistentBroker"
// Reasons/messages for references to classes or plans that were deleted.
errorDeletedClusterServiceClassReason string = "ReferencesDeletedServiceClass"
errorDeletedClusterServiceClassMessage string = "ReferencesDeletedServiceClass"
errorDeletedClusterServicePlanReason string = "ReferencesDeletedServicePlan"
errorDeletedClusterServicePlanMessage string = "ReferencesDeletedServicePlan"
errorDeletedServiceClassReason string = "ReferencesDeletedServiceClass"
errorDeletedServiceClassMessage string = "ReferencesDeletedServiceClass"
errorDeletedServicePlanReason string = "ReferencesDeletedServicePlan"
errorDeletedServicePlanMessage string = "ReferencesDeletedServicePlan"
errorFindingNamespaceServiceInstanceReason string = "ErrorFindingNamespaceForInstance"
errorOrphanMitigationFailedReason string = "OrphanMitigationFailed"
errorInvalidDeprovisionStatusReason string = "InvalidDeprovisionStatus"
errorInvalidDeprovisionStatusMessage string = "The deprovision status is invalid"
errorAmbiguousPlanReferenceScope string = "Couldn't determine if the instance refers to a Cluster or Namespaced ServiceClass/Plan"
// Reasons/messages for operations the broker is completing asynchronously.
asyncProvisioningReason string = "Provisioning"
asyncProvisioningMessage string = "The instance is being provisioned asynchronously"
asyncUpdatingInstanceReason string = "UpdatingInstance"
asyncUpdatingInstanceMessage string = "The instance is being updated asynchronously"
asyncDeprovisioningReason string = "Deprovisioning"
asyncDeprovisioningMessage string = "The instance is being deprovisioned asynchronously"
// Reasons/messages for requests that are in flight to the broker.
provisioningInFlightReason string = "ProvisionRequestInFlight"
provisioningInFlightMessage string = "Provision request for ServiceInstance in-flight to Broker"
instanceUpdatingInFlightReason string = "UpdateInstanceRequestInFlight"
instanceUpdatingInFlightMessage string = "Update request for ServiceInstance in-flight to Broker"
deprovisioningInFlightReason string = "DeprovisionRequestInFlight"
deprovisioningInFlightMessage string = "Deprovision request for ServiceInstance in-flight to Broker"
// Reason/message recorded when orphan mitigation is started for an instance.
startingInstanceOrphanMitigationReason string = "StartingInstanceOrphanMitigation"
startingInstanceOrphanMitigationMessage string = "The instance provision call failed with an ambiguous error; attempting to deprovision the instance in order to mitigate an orphaned resource"
clusterIdentifierKey string = "clusterid"
// Retry-delay bounds for broker operations (presumably the limits given to
// the retry rate limiter; confirm where it is constructed).
// maxBrokerOperationRetryDelay is also the grace period that
// purgeExpiredRetryEntries waits past an entry's retry time before removing it.
minBrokerOperationRetryDelay time.Duration = time.Second * 1
maxBrokerOperationRetryDelay time.Duration = time.Minute * 20
)
// backoffEntry is the per-instance retry record stored in
// instanceOperationBackoff.instances.
type backoffEntry struct {
// generation is the instance generation the entry was recorded for; entries
// whose generation no longer matches the instance are discarded.
generation int64
calculatedRetryTime time.Time // earliest time we should retry
dirty bool // true indicates new backoff should be calculated
}
// instanceOperationBackoff holds the retry backoff state for instance
// operations, keyed by the instance's work-queue key.
type instanceOperationBackoff struct {
// lock to be used for accessing retry map
mutex sync.RWMutex
// instances maps an instance's queue key to its backoff entry
instances map[string]backoffEntry
rateLimiter workqueue.RateLimiter // used to calculate next retry time
}
// ServiceInstance handlers and control-loop
// instanceAdd enqueues obj on the instance work queue. The queue key is
// derived with cache.DeletionHandlingMetaNamespaceKeyFunc; if no key can be
// derived the failure is logged and nothing is enqueued.
func (c *controller) instanceAdd(obj interface{}) {
	queueKey, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if keyErr == nil {
		c.instanceQueue.Add(queueKey)
		return
	}
	glog.Errorf("Couldn't get key for object %+v: %v", obj, keyErr)
}
// instanceAddAfter enqueues obj on the instance work queue once the duration
// d has elapsed. If no queue key can be derived for obj the failure is logged
// and nothing is enqueued.
func (c *controller) instanceAddAfter(obj interface{}, d time.Duration) {
	queueKey, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if keyErr == nil {
		c.instanceQueue.AddAfter(queueKey, d)
		return
	}
	glog.Errorf("Couldn't get key for object %+v: %v", obj, keyErr)
}
// instanceUpdate handles an update notification for a ServiceInstance by
// enqueueing the new object, unless the instance has an asynchronous
// operation in progress. newObj must be a *v1beta1.ServiceInstance; oldObj is
// not inspected.
func (c *controller) instanceUpdate(oldObj, newObj interface{}) {
	updated := newObj.(*v1beta1.ServiceInstance)
	if updated.Status.AsyncOpInProgress {
		// Instances with an ongoing asynchronous operation are added to the
		// polling queue by the reconciler itself. Skipping them here keeps
		// the polling rate limit effective.
		return
	}
	c.instanceAdd(newObj)
}
// instanceDelete handles a delete notification. It only logs the event; any
// obj that is not a non-nil *v1beta1.ServiceInstance is ignored silently.
func (c *controller) instanceDelete(obj interface{}) {
	if deleted, isInstance := obj.(*v1beta1.ServiceInstance); isInstance && deleted != nil {
		logCtx := pretty.NewInstanceContextBuilder(deleted)
		glog.V(4).Info(logCtx.Message("Received delete event; no further processing will occur"))
	}
}
// Async operations on instances have a somewhat convoluted flow in order to
// ensure that only a single goroutine works on an instance at any given time.
// The flow is:
//
// 1. When the controller wants to begin polling the state of an operation on
// an instance, it calls its beginPollingServiceInstance method (or
// calls continuePollingServiceInstance, an alias of that method)
// 2. begin/continuePollingServiceInstance do a rate-limited add to the polling queue
// 3. the instancePollingQueue calls requeueServiceInstanceForPoll, which adds the instance's
// key to the instance work queue
// 4. the worker servicing the instance polling queue forgets the instance's key,
// requiring the controller to call continuePollingServiceInstance if additional
// work is needed.
// 5. the instance work queue is the single work queue that actually services
// instances by calling reconcileServiceInstance
// 6. when an asynchronous operation is completed, the controller calls
// finishPollingServiceInstance to forget the instance from the polling queue
// requeueServiceInstanceForPoll puts the given instance key on the
// controller's instance work queue so that the status of an async operation
// on an instance gets polled. It is called by the worker servicing the
// instance polling queue. Once it returns, that worker forgets the key from
// the polling queue, so the controller has to call
// continuePollingServiceInstance when the instance needs further polling.
// The returned error is always nil.
func (c *controller) requeueServiceInstanceForPoll(key string) error {
	c.instanceQueue.Add(key)

	return nil
}
// beginPollingServiceInstance does a rate-limited add of the key for the given
// instance to the controller's instance polling queue.
//
// It returns nil on success. If no queue key can be derived for the instance,
// the failure is logged and returned as an error.
func (c *controller) beginPollingServiceInstance(instance *v1beta1.ServiceInstance) error {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(instance)
	if err != nil {
		pcb := pretty.NewInstanceContextBuilder(instance)
		s := fmt.Sprintf("Couldn't create a key for object %+v: %v", instance, err)
		// s embeds a %+v dump of the instance, so it must not be reused as a
		// format string: any '%' inside it would be misread as a verb.
		glog.Error(pcb.Message(s))
		return stderrors.New(s)
	}

	c.instancePollingQueue.AddRateLimited(key)

	return nil
}
// continuePollingServiceInstance is an alias of beginPollingServiceInstance:
// it does a rate-limited add of the instance's key to the controller's
// instance polling queue and returns whatever that call returns.
func (c *controller) continuePollingServiceInstance(instance *v1beta1.ServiceInstance) error {
	pollErr := c.beginPollingServiceInstance(instance)
	return pollErr
}
// finishPollingServiceInstance makes the controller's instance polling queue
// forget the instance's key.
//
// It returns nil on success. If no queue key can be derived for the instance,
// the failure is logged and returned as an error.
func (c *controller) finishPollingServiceInstance(instance *v1beta1.ServiceInstance) error {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(instance)
	if err != nil {
		pcb := pretty.NewInstanceContextBuilder(instance)
		s := fmt.Sprintf("Couldn't create a key for object %+v: %v", instance, err)
		// s embeds a %+v dump of the instance, so it must not be reused as a
		// format string: any '%' inside it would be misread as a verb.
		glog.Error(pcb.Message(s))
		return stderrors.New(s)
	}

	c.instancePollingQueue.Forget(key)

	return nil
}
// resetPollingRateLimiterForServiceInstance causes the polling queue's rate
// limiter to forget the given instance. If no queue key can be derived for
// the instance, the failure is logged and nothing else happens.
func (c *controller) resetPollingRateLimiterForServiceInstance(instance *v1beta1.ServiceInstance) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(instance)
	if err != nil {
		pcb := pretty.NewInstanceContextBuilder(instance)
		s := fmt.Sprintf("Couldn't create a key for object %+v: %v", instance, err)
		// s embeds a %+v dump of the instance, so it is logged verbatim
		// rather than being used as a format string.
		glog.Error(pcb.Message(s))
		return
	}

	c.instancePollingQueue.Forget(key)
}
// getReconciliationActionForServiceInstance decides which action the
// reconciler should take for the instance. The checks are ordered by
// priority: poll an in-progress async operation, then delete (deletion
// requested or orphan mitigation in progress), then update an already
// provisioned instance, and otherwise add.
func getReconciliationActionForServiceInstance(instance *v1beta1.ServiceInstance) ReconciliationAction {
	status := &instance.Status

	if status.AsyncOpInProgress {
		return reconcilePoll
	}
	if status.OrphanMitigationInProgress || instance.ObjectMeta.DeletionTimestamp != nil {
		return reconcileDelete
	}
	if status.ProvisionStatus == v1beta1.ServiceInstanceProvisionStatusProvisioned {
		return reconcileUpdate
	}
	// Any other provision status is treated as not yet provisioned.
	return reconcileAdd
}
// reconcileServiceInstanceKey looks up the ServiceInstance identified by the
// work-queue key ("namespace/name") and reconciles it.
//
// It returns nil when the instance is no longer in the lister, since there is
// nothing left to do. It returns an error when the key is malformed or the
// lookup fails for another reason, and otherwise returns the result of
// reconcileServiceInstance.
func (c *controller) reconcileServiceInstanceKey(key string) error {
	// For namespace-scoped resources, SplitMetaNamespaceKey splits the key
	// i.e. "namespace/name" into two separate strings
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	pcb := pretty.NewContextBuilder(pretty.ServiceInstance, namespace, name, "")
	instance, err := c.instanceLister.ServiceInstances(namespace).Get(name)
	if errors.IsNotFound(err) {
		glog.Info(pcb.Messagef("Not doing work for %v because it has been deleted", key))
		return nil
	}
	if err != nil {
		// The message already contains the error text, so log it verbatim
		// instead of passing it on as a format string.
		glog.Error(pcb.Messagef("Unable to retrieve %v from store: %v", key, err))
		return err
	}

	return c.reconcileServiceInstance(instance)
}
// reconcileServiceInstance is the control-loop for reconciling Instances. An
// error is returned to indicate that the instance has not been fully
// processed and should be resubmitted at a later time.
//
// The two status-migration steps run first. If either one updates the
// instance, this pass ends with nil. Otherwise the call is dispatched to the
// add, update, delete or poll handler chosen by
// getReconciliationActionForServiceInstance.
func (c *controller) reconcileServiceInstance(instance *v1beta1.ServiceInstance) error {
	updated, err := c.initObservedGeneration(instance)
	if err != nil {
		return err
	}
	if updated {
		// The updated instance will be automatically added back to the queue
		// and processed again
		return nil
	}

	updated, err = c.initOrphanMitigationCondition(instance)
	if err != nil {
		return err
	}
	if updated {
		// The updated instance will be automatically added back to the queue
		// and processed again
		return nil
	}

	reconciliationAction := getReconciliationActionForServiceInstance(instance)
	switch reconciliationAction {
	case reconcileAdd:
		return c.reconcileServiceInstanceAdd(instance)
	case reconcileUpdate:
		return c.reconcileServiceInstanceUpdate(instance)
	case reconcileDelete:
		return c.reconcileServiceInstanceDelete(instance)
	case reconcilePoll:
		return c.pollServiceInstance(instance)
	default:
		pcb := pretty.NewInstanceContextBuilder(instance)
		// The message is already fully formatted, so build the error from it
		// directly rather than using it as a format string.
		return stderrors.New(pcb.Messagef("Unknown reconciliation action %v", reconciliationAction))
	}
}
// initObservedGeneration migrates instances written with the older status
// API: when ObservedGeneration is unset but ReconciledGeneration is set, it
// copies the latter into the former, derives ProvisionStatus, and persists
// the status on a deep copy of the instance.
//
// It returns true when the status was updated (the current iteration is done
// and no further processing is needed), false when no migration was
// required, and an error if the status update failed.
func (c *controller) initObservedGeneration(instance *v1beta1.ServiceInstance) (bool, error) {
	needsMigration := instance.Status.ObservedGeneration == 0 && instance.Status.ReconciledGeneration != 0
	if !needsMigration {
		return false, nil
	}

	toUpdate := instance.DeepCopy()
	toUpdate.Status.ObservedGeneration = toUpdate.Status.ReconciledGeneration

	// Until https://github.com/kubernetes-incubator/service-catalog/issues/1715
	// is implemented and non-terminal errors are used, a "Failed":"True"
	// condition is the sign that provisioning failed.
	if isServiceInstanceFailed(toUpdate) {
		toUpdate.Status.ProvisionStatus = v1beta1.ServiceInstanceProvisionStatusNotProvisioned
	} else {
		toUpdate.Status.ProvisionStatus = v1beta1.ServiceInstanceProvisionStatusProvisioned
	}

	if _, updateErr := c.updateServiceInstanceStatus(toUpdate); updateErr != nil {
		return false, updateErr
	}
	return true, nil
}
// initOrphanMitigationCondition migrates instances written with the older
// status API: when the OrphanMitigationInProgress field is set but the
// OrphanMitigation condition is not, it records a warning event, sets the
// condition to true on a deep copy of the instance, and persists the status.
//
// It returns true when the status was updated (the current iteration is done
// and no further processing is needed), false when no migration was
// required, and an error if the status update failed.
func (c *controller) initOrphanMitigationCondition(instance *v1beta1.ServiceInstance) (bool, error) {
	if isServiceInstanceOrphanMitigation(instance) || !instance.Status.OrphanMitigationInProgress {
		return false, nil
	}

	toUpdate := instance.DeepCopy()
	c.recorder.Event(
		toUpdate,
		corev1.EventTypeWarning,
		startingInstanceOrphanMitigationReason,
		startingInstanceOrphanMitigationMessage,
	)
	setServiceInstanceCondition(
		toUpdate,
		v1beta1.ServiceInstanceConditionOrphanMitigation,
		v1beta1.ConditionTrue,
		startingInstanceOrphanMitigationReason,
		startingInstanceOrphanMitigationMessage,
	)

	if _, updateErr := c.updateServiceInstanceStatus(toUpdate); updateErr != nil {
		return false, updateErr
	}
	return true, nil
}
// setRetryBackoffRequired marks the specified instance/generation as needing a
// delay before the next provision/update is attempted. We always set this flag
// before attempting a provision or update operation in case we must retry. This
// will eventually be cleared by the background worker running
// purgeExpiredRetryEntries() or when the operation is successful.
//
// If no queue key can be derived for the instance, the failure is logged and
// no entry is recorded.
func (c *controller) setRetryBackoffRequired(instance *v1beta1.ServiceInstance) {
	pcb := pretty.NewInstanceContextBuilder(instance)
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(instance)
	if err != nil {
		// The message embeds a %+v dump of the instance, so log it verbatim
		// rather than using it as a format string.
		glog.Error(pcb.Messagef("Couldn't create a key for object %+v: %v", instance, err))
		return
	}

	c.instanceOperationRetryQueue.mutex.Lock()
	defer c.instanceOperationRetryQueue.mutex.Unlock()

	retryEntry, found := c.instanceOperationRetryQueue.instances[key]
	if !found || retryEntry.generation != instance.Generation {
		retryEntry.generation = instance.Generation
		// reset the backoff as the generation changed
		if found {
			c.instanceOperationRetryQueue.rateLimiter.Forget(key)
		}
	}
	// Mark the entry so the next backoffAndRequeueIfRetrying call computes a
	// fresh retry time.
	retryEntry.dirty = true
	c.instanceOperationRetryQueue.instances[key] = retryEntry
	glog.V(4).Info(pcb.Messagef("added %v generation %v to backoffBeforeRetrying map", key, instance.Generation))
}
// backoffAndRequeueIfRetrying returns true if this is a retry and a backoff
// (delay) needs to be observed before retrying. This only applies to
// Provisioning and Updating and is generation specific. If the generation has
// been bumped since the instance was added to the retry map there will be no
// backoff delay.
//
// When it returns true, the instance has already been re-added to the work
// queue for the remaining delay and a "RetryBackoff" warning event naming the
// operation has been recorded. It returns false when there is no retry entry,
// when the entry is stale or already due, or when no queue key can be derived
// for the instance.
func (c *controller) backoffAndRequeueIfRetrying(instance *v1beta1.ServiceInstance, operation string) bool {
	pcb := pretty.NewInstanceContextBuilder(instance)
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(instance)
	if err != nil {
		// The message embeds a %+v dump of the instance, so log it verbatim
		// rather than using it as a format string.
		glog.Error(pcb.Messagef("Couldn't create a key for object %+v: %v", instance, err))
		return false
	}

	c.instanceOperationRetryQueue.mutex.Lock()
	defer c.instanceOperationRetryQueue.mutex.Unlock()

	retryEntry, exists := c.instanceOperationRetryQueue.instances[key]
	if !exists {
		return false
	}

	if retryEntry.generation != instance.Generation {
		// the retry entry was on an old generation, we don't care,
		// cleanup and no delay
		delete(c.instanceOperationRetryQueue.instances, key)
		c.instanceOperationRetryQueue.rateLimiter.Forget(key)
		return false
	}

	// if there is a pending delay, calculate it and clear the dirty bit
	if retryEntry.dirty {
		// calculate earliest retry time with exponential backoff
		retryEntry.calculatedRetryTime = time.Now().Add(c.instanceOperationRetryQueue.rateLimiter.When(key))
		retryEntry.dirty = false
		c.instanceOperationRetryQueue.instances[key] = retryEntry
		glog.V(4).Info(pcb.Messagef("generation %v retryTime calculated as %v", instance.Generation, retryEntry.calculatedRetryTime))
	}

	delay := retryEntry.calculatedRetryTime.Sub(time.Now())
	if delay <= 0 {
		return false
	}

	msg := fmt.Sprintf("Delaying %s retry, next attempt will be after %s", operation, retryEntry.calculatedRetryTime)
	c.recorder.Event(instance, corev1.EventTypeWarning, "RetryBackoff", msg)
	glog.V(2).Info(pcb.Message(msg))

	// add back to worker queue to retry at the specified time
	c.instanceAddAfter(instance, delay)
	return true
}
// purgeExpiredRetryEntries drops retry-map entries whose retry time has
// expired, together with their rate-limiter state. It is invoked by a worker
// on a timer.
func (c *controller) purgeExpiredRetryEntries() {
	now := time.Now()
	c.instanceOperationRetryQueue.mutex.Lock()
	defer c.instanceOperationRetryQueue.mutex.Unlock()

	// Entries may still be acted on by retries that are sitting in queues, so
	// an entry is only removed once its retry time lies at least
	// maxBrokerOperationRetryDelay in the past. This avoids removing entries
	// prematurely.
	cutoff := now.Add(-maxBrokerOperationRetryDelay)
	removed := 0
	for instanceKey, entry := range c.instanceOperationRetryQueue.instances {
		if !cutoff.After(entry.calculatedRetryTime) {
			continue
		}
		glog.V(5).Infof("removing %s from instanceOperationRetryQueue which had retry time of %v", instanceKey, entry.calculatedRetryTime)
		delete(c.instanceOperationRetryQueue.instances, instanceKey)
		c.instanceOperationRetryQueue.rateLimiter.Forget(instanceKey)
		removed++
	}
	remaining := len(c.instanceOperationRetryQueue.instances)
	glog.V(5).Infof("purged %v expired entries from instanceOperationRetryQueue.instances, number of entries remaining: %v", removed, remaining)
}
// removeInstanceFromRetryMap removes the instance from the retry map and
// makes the retry rate limiter forget it. If no queue key can be derived for
// the instance, the failure is logged and nothing is removed.
func (c *controller) removeInstanceFromRetryMap(instance *v1beta1.ServiceInstance) {
	pcb := pretty.NewInstanceContextBuilder(instance)
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(instance)
	if err != nil {
		// The message embeds a %+v dump of the instance, so log it verbatim
		// rather than using it as a format string.
		glog.Error(pcb.Messagef("Couldn't create a key for object %+v: %v", instance, err))
		return
	}

	c.instanceOperationRetryQueue.mutex.Lock()
	defer c.instanceOperationRetryQueue.mutex.Unlock()

	delete(c.instanceOperationRetryQueue.instances, key)
	c.instanceOperationRetryQueue.rateLimiter.Forget(key)
	glog.V(4).Info(pcb.Message("removed from instanceOperationRetryQueue"))
}
// reconcileServiceInstanceAdd is responsible for handling the provisioning
// of new service instances.
//
// A single pass does at most one of the following and then returns:
//   - nothing, when the status shows there is no work to do or a retry
//     backoff is still pending (the instance is requeued in that case);
//   - resolve the class/plan references;
//   - record the start of the provision operation;
//   - send the provision request to the broker and process its outcome.
//
// A nil return means the pass completed; a non-nil error means the instance
// should be processed again.
func (c *controller) reconcileServiceInstanceAdd(instance *v1beta1.ServiceInstance) error {
	pcb := pretty.NewInstanceContextBuilder(instance)

	if isServiceInstanceProcessedAlready(instance) {
		glog.V(4).Info(pcb.Message("Not processing event because status showed there is no work to do"))
		return nil
	}

	// don't DOS the broker. If we already did a provision attempt that ended with a non-terminal
	// error wait for the exponential backoff to pass
	if c.backoffAndRequeueIfRetrying(instance, "provision") {
		return nil
	}

	instance = instance.DeepCopy()
	// Any status updates from this point should have an updated observed generation
	if instance.Status.ObservedGeneration != instance.Generation {
		c.prepareObservedGeneration(instance)
	}

	// Update references to Plan/Class if necessary.
	modified, err := c.resolveReferences(instance)
	if err != nil {
		return err
	}
	if modified {
		// resolveReferences has updated the instance, so we need to continue in the next iteration
		return nil
	}

	glog.V(4).Info(pcb.Message("Processing adding event"))

	request, inProgressProperties, err := c.prepareProvisionRequest(instance)
	if err != nil {
		return c.handleServiceInstanceReconciliationError(instance, err)
	}

	if instance.Status.CurrentOperation == "" || !isServiceInstancePropertiesStateEqual(instance.Status.InProgressProperties, inProgressProperties) {
		instance, err = c.recordStartOfServiceInstanceOperation(instance, v1beta1.ServiceInstanceOperationProvision, inProgressProperties)
		if err != nil {
			// There has been an update to the instance. Start reconciliation
			// over with a fresh view of the instance.
			return err
		}
		// recordStartOfServiceInstanceOperation has updated the instance, so we need to continue in the next iteration
		return nil
	}

	var prettyClass string
	var brokerName string
	var brokerClient osb.Client
	if instance.Spec.ClusterServiceClassSpecified() {
		var serviceClass *v1beta1.ClusterServiceClass
		serviceClass, _, brokerName, brokerClient, err = c.getClusterServiceClassPlanAndClusterServiceBroker(instance)
		if err != nil {
			// Without a successful lookup there is no usable broker client;
			// report the failure instead of calling through a nil client.
			return c.handleServiceInstanceReconciliationError(instance, err)
		}
		prettyClass = pretty.ClusterServiceClassName(serviceClass)
	} else {
		var serviceClass *v1beta1.ServiceClass
		serviceClass, _, brokerName, brokerClient, err = c.getServiceClassPlanAndServiceBroker(instance)
		if err != nil {
			// Same as the cluster-scoped branch: never proceed without a
			// broker client.
			return c.handleServiceInstanceReconciliationError(instance, err)
		}
		prettyClass = pretty.ServiceClassName(serviceClass)
	}

	glog.V(4).Info(pcb.Messagef(
		"Provisioning a new ServiceInstance of %s at Broker %q",
		prettyClass, brokerName,
	))

	// Flag the instance before calling the broker so that a retry, if one
	// turns out to be needed, observes the backoff delay.
	c.setRetryBackoffRequired(instance)

	response, err := brokerClient.ProvisionInstance(request)
	if err != nil {
		if httpErr, ok := osb.IsHTTPError(err); ok {
			msg := fmt.Sprintf(
				"Error provisioning ServiceInstance of %s at ClusterServiceBroker %q: %s",
				prettyClass, brokerName, httpErr,
			)
			readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, errorProvisionCallFailedReason, msg)

			// Depending on the specific response, we may need to initiate orphan mitigation.
			shouldMitigateOrphan := shouldStartOrphanMitigation(httpErr.StatusCode)

			if isRetriableHTTPStatus(httpErr.StatusCode) {
				return c.processTemporaryProvisionFailure(instance, readyCond, shouldMitigateOrphan)
			}
			// A failure with a given HTTP response code is treated as a terminal
			// failure.
			failedCond := newServiceInstanceFailedCondition(v1beta1.ConditionTrue, "ClusterServiceBrokerReturnedFailure", msg)
			return c.processTerminalProvisionFailure(instance, readyCond, failedCond, shouldMitigateOrphan)
		}

		reason := errorErrorCallingProvisionReason

		// A timeout error is considered a retriable error, but we
		// should initiate orphan mitigation.
		if urlErr, ok := err.(*url.Error); ok && urlErr.Timeout() {
			msg := fmt.Sprintf("Communication with the ClusterServiceBroker timed out; operation will be retried: %v", urlErr)
			readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, reason, msg)
			return c.processTemporaryProvisionFailure(instance, readyCond, true)
		}

		// All other errors should be retried, unless the
		// reconciliation retry time limit has passed.
		msg := fmt.Sprintf("The provision call failed and will be retried: Error communicating with broker for provisioning: %v", err)
		readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, reason, msg)

		if c.reconciliationRetryDurationExceeded(instance.Status.OperationStartTime) {
			msg := "Stopping reconciliation retries because too much time has elapsed"
			failedCond := newServiceInstanceFailedCondition(v1beta1.ConditionTrue, errorReconciliationRetryTimeoutReason, msg)
			return c.processTerminalProvisionFailure(instance, readyCond, failedCond, false)
		}

		return c.processServiceInstanceOperationError(instance, readyCond)
	}

	if response.Async {
		return c.processProvisionAsyncResponse(instance, response)
	}

	return c.processProvisionSuccess(instance, response.DashboardURL)
}
// reconcileServiceInstanceUpdate is responsible for handling updating the plan
// or parameters of a service instance.
func (c *controller) reconcileServiceInstanceUpdate(instance *v1beta1.ServiceInstance) error {
pcb := pretty.NewInstanceContextBuilder(instance)
if isServiceInstanceProcessedAlready(instance) {
glog.V(4).Info(pcb.Message("Not processing event because status showed there is no work to do"))
return nil
}
// don't DOS the broker. If we already did an update attempt that ended with a non-terminal
// error wait for the exponential backoff to pass
if c.backoffAndRequeueIfRetrying(instance, "update") {
return nil
}
instance = instance.DeepCopy()
// Any status updates from this point should have an updated observed generation
if instance.Status.ObservedGeneration != instance.Generation {
c.prepareObservedGeneration(instance)
}
// Update references to ClusterServicePlan / ClusterServiceClass if necessary.
modified, err := c.resolveReferences(instance)
if err != nil {
return err
}
if modified {
// resolveReferences has updated the instance, so we need to continue in the next iteration
return nil
}
glog.V(4).Info(pcb.Message("Processing updating event"))
var brokerClient osb.Client
var request *osb.UpdateInstanceRequest
if instance.Spec.ClusterServiceClassSpecified() {
serviceClass, servicePlan, brokerName, bClient, err := c.getClusterServiceClassPlanAndClusterServiceBroker(instance)
if err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
}
brokerClient = bClient
// Check if the ServiceClass or ServicePlan has been deleted. If so, do
// not allow plan upgrades, but do allow parameter changes.
if err := c.checkForRemovedClusterClassAndPlan(instance, serviceClass, servicePlan); err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
}
req, inProgressProperties, err := c.prepareUpdateInstanceRequest(instance)
if err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
}
request = req
if instance.Status.CurrentOperation == "" || !isServiceInstancePropertiesStateEqual(instance.Status.InProgressProperties, inProgressProperties) {
instance, err = c.recordStartOfServiceInstanceOperation(instance, v1beta1.ServiceInstanceOperationUpdate, inProgressProperties)
if err != nil {
// There has been an update to the instance. Start reconciliation
// over with a fresh view of the instance.
return err
}
// recordStartOfServiceInstanceOperation has updated the instance, so we need to continue in the next iteration
return nil
}
glog.V(4).Info(pcb.Messagef(
"Updating ServiceInstance of %s at ClusterServiceBroker %q",
pretty.ClusterServiceClassName(serviceClass), brokerName,
))
} else if instance.Spec.ServiceClassSpecified() {
serviceClass, servicePlan, brokerName, bClient, err := c.getServiceClassPlanAndServiceBroker(instance)
if err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
}
brokerClient = bClient
// Check if the ServiceClass or ServicePlan has been deleted. If so, do
// not allow plan upgrades, but do allow parameter changes.
if err := c.checkForRemovedClassAndPlan(instance, serviceClass, servicePlan); err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
}
req, inProgressProperties, err := c.prepareUpdateInstanceRequest(instance)
if err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
}
request = req
if instance.Status.CurrentOperation == "" || !isServiceInstancePropertiesStateEqual(instance.Status.InProgressProperties, inProgressProperties) {
instance, err = c.recordStartOfServiceInstanceOperation(instance, v1beta1.ServiceInstanceOperationUpdate, inProgressProperties)
if err != nil {
// There has been an update to the instance. Start reconciliation
// over with a fresh view of the instance.
return err
}
// recordStartOfServiceInstanceOperation has updated the instance, so we need to continue in the next iteration
return nil
}
glog.V(4).Info(pcb.Messagef(
"Updating ServiceInstance of %s at ServiceBroker %q",
pretty.ServiceClassName(serviceClass), brokerName,
))
}
c.setRetryBackoffRequired(instance)
response, err := brokerClient.UpdateInstance(request)
if err != nil {
if httpErr, ok := osb.IsHTTPError(err); ok {
msg := fmt.Sprintf("ServiceBroker returned a failure for update call; update will not be retried: %v", httpErr)
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, errorUpdateInstanceCallFailedReason, msg)
if isRetriableHTTPStatus(httpErr.StatusCode) {
return c.processTemporaryUpdateServiceInstanceFailure(instance, readyCond)
}
// A failure with a given HTTP response code is treated as a terminal
// failure.
failedCond := newServiceInstanceFailedCondition(v1beta1.ConditionTrue, errorUpdateInstanceCallFailedReason, msg)
return c.processTerminalUpdateServiceInstanceFailure(instance, readyCond, failedCond)
}
reason := errorErrorCallingUpdateInstanceReason
if urlErr, ok := err.(*url.Error); ok && urlErr.Timeout() {
msg := fmt.Sprintf("Communication with the ServiceBroker timed out; update will be retried: %v", urlErr)
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, reason, msg)
return c.processTemporaryUpdateServiceInstanceFailure(instance, readyCond)
}
msg := fmt.Sprintf("The update call failed and will be retried: Error communicating with broker for updating: %s", err)
if c.reconciliationRetryDurationExceeded(instance.Status.OperationStartTime) {
// log and record the real error, but process as a
// failure with reconciliation retry timeout
glog.Info(pcb.Message(msg))
c.recorder.Event(instance, corev1.EventTypeWarning, reason, msg)
msg = "Stopping reconciliation retries because too much time has elapsed"
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, errorReconciliationRetryTimeoutReason, msg)
failedCond := newServiceInstanceFailedCondition(v1beta1.ConditionTrue, errorReconciliationRetryTimeoutReason, msg)
return c.processTerminalUpdateServiceInstanceFailure(instance, readyCond, failedCond)
}
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, reason, msg)
return c.processServiceInstanceOperationError(instance, readyCond)
}
if utilfeature.DefaultFeatureGate.Enabled(scfeatures.UpdateDashboardURL) {
if *response.DashboardURL != "" {
instance.Status.DashboardURL = response.DashboardURL
}
}
if response.Async {
return c.processUpdateServiceInstanceAsyncResponse(instance, response)
}
return c.processUpdateServiceInstanceSuccess(instance)
}
// reconcileServiceInstanceDelete handles an instance that still carries the
// service catalog finalizer and is either being deleted (deletion timestamp
// set) or undergoing orphan mitigation (deletion timestamp unset).
//
// Depending on the instance's DeprovisionStatus it finishes the deletion
// without contacting the broker, marks the deprovision as failed, or sends a
// deprovision request to the broker and processes the response. For a regular
// deletion, the start of the deprovision operation is recorded first and the
// broker is only called on a later pass.
func (c *controller) reconcileServiceInstanceDelete(instance *v1beta1.ServiceInstance) error {
	// Nothing to do unless the service catalog finalizer is still present.
	if !sets.NewString(instance.Finalizers...).Has(v1beta1.FinalizerServiceCatalog) {
		return nil
	}

	pcb := pretty.NewInstanceContextBuilder(instance)

	// A deprovision that has already failed is left alone.
	if instance.Status.DeprovisionStatus == v1beta1.ServiceInstanceDeprovisionStatusFailed {
		glog.V(4).Info(pcb.Message("Not processing deleting event because deprovisioning has failed"))
		return nil
	}

	startMsg := "Processing deleting event"
	if instance.Status.OrphanMitigationInProgress {
		startMsg = "Performing orphan mitigation"
	}
	glog.V(4).Info(pcb.Message(startMsg))

	// Work on a copy from here on.
	instance = instance.DeepCopy()

	// Status updates past this point should carry an updated observed
	// generation. Orphan mitigation is the exception: it is considered a
	// continuation of the provisioning operation that failed earlier.
	if instance.Status.ObservedGeneration != instance.Generation && !instance.Status.OrphanMitigationInProgress {
		c.prepareObservedGeneration(instance)
	}

	switch instance.Status.DeprovisionStatus {
	case v1beta1.ServiceInstanceDeprovisionStatusNotRequired,
		v1beta1.ServiceInstanceDeprovisionStatusSucceeded:
		// Deprovisioning already succeeded or was never needed, so no
		// request to the broker is necessary.
		return c.processServiceInstanceGracefulDeletionSuccess(instance)
	case v1beta1.ServiceInstanceDeprovisionStatusRequired:
		// Continue below and deprovision at the broker.
	default:
		// Either an invalid value or a logical error in the controller:
		// set the deprovision status to Failed and bail out.
		msg := fmt.Sprintf("ServiceInstance has invalid DeprovisionStatus field: %v", instance.Status.DeprovisionStatus)
		return c.processDeprovisionFailure(
			instance,
			newServiceInstanceReadyCondition(v1beta1.ConditionUnknown, errorInvalidDeprovisionStatusReason, msg),
			newServiceInstanceFailedCondition(v1beta1.ConditionTrue, errorInvalidDeprovisionStatusReason, msg),
		)
	}

	// The instance must not be deleted while bindings are still associated
	// with it.
	if err := c.checkServiceInstanceHasExistingBindings(instance); err != nil {
		return c.handleServiceInstanceReconciliationError(instance, err)
	}

	var (
		prettyName   string
		brokerName   string
		brokerClient osb.Client
	)
	// The service class is looked up only so that its pretty name can be used
	// in the error message further down.
	switch {
	case instance.Spec.ClusterServiceClassSpecified():
		class, name, client, err := c.getClusterServiceClassAndClusterServiceBroker(instance)
		if err != nil {
			return c.handleServiceInstanceReconciliationError(instance, err)
		}
		prettyName = pretty.ClusterServiceClassName(class)
		brokerName = name
		brokerClient = client
	case instance.Spec.ServiceClassSpecified():
		class, name, client, err := c.getServiceClassAndServiceBroker(instance)
		if err != nil {
			return c.handleServiceInstanceReconciliationError(instance, err)
		}
		prettyName = pretty.ServiceClassName(class)
		brokerName = name
		brokerClient = client
	}

	request, inProgressProperties, err := c.prepareDeprovisionRequest(instance)
	if err != nil {
		return c.handleServiceInstanceReconciliationError(instance, err)
	}

	if instance.DeletionTimestamp == nil {
		// Orphan mitigation: make sure the operation has a start time so
		// that the retry-duration check below has something to measure from.
		if instance.Status.OperationStartTime == nil {
			startedAt := metav1.Now()
			instance.Status.OperationStartTime = &startedAt
		}
	} else if instance.Status.CurrentOperation != v1beta1.ServiceInstanceOperationDeprovision {
		if instance.Status.OrphanMitigationInProgress {
			// A normal deletion supersedes orphan mitigation, so stop
			// tracking the latter separately.
			removeServiceInstanceCondition(instance, v1beta1.ServiceInstanceConditionOrphanMitigation)
			instance.Status.OrphanMitigationInProgress = false
		}
		if _, err := c.recordStartOfServiceInstanceOperation(instance, v1beta1.ServiceInstanceOperationDeprovision, inProgressProperties); err != nil {
			// The instance was updated in the meantime; reconciliation
			// starts over with a fresh view of it.
			return err
		}
		// The start of the operation has been recorded on the instance; the
		// broker call happens in the next iteration.
		return nil
	}

	glog.V(4).Info(pcb.Message("Sending deprovision request to broker"))
	response, err := brokerClient.DeprovisionInstance(request)
	if err == nil {
		if response.Async {
			return c.processDeprovisionAsyncResponse(instance, response)
		}
		return c.processDeprovisionSuccess(instance)
	}

	// The broker call failed. HTTP error responses get their own message.
	var msg string
	if httpErr, ok := osb.IsHTTPError(err); ok {
		msg = fmt.Sprintf("Deprovision call failed; received error response from broker: %v", httpErr)
	} else {
		msg = fmt.Sprintf(
			`Error deprovisioning, %s at ClusterServiceBroker %q: %v`,
			prettyName, brokerName, err,
		)
	}
	readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionUnknown, errorDeprovisionCalledReason, msg)

	if !c.reconciliationRetryDurationExceeded(instance.Status.OperationStartTime) {
		return c.processServiceInstanceOperationError(instance, readyCond)
	}

	// Too much time has passed since the operation started: give up.
	failedCond := newServiceInstanceFailedCondition(
		v1beta1.ConditionTrue,
		errorReconciliationRetryTimeoutReason,
		"Stopping reconciliation retries because too much time has elapsed",
	)
	return c.processDeprovisionFailure(instance, readyCond, failedCond)
}
func (c *controller) pollServiceInstance(instance *v1beta1.ServiceInstance) error {
pcb := pretty.NewInstanceContextBuilder(instance)
glog.V(4).Info(pcb.Message("Processing poll event"))
instance = instance.DeepCopy()
var brokerClient osb.Client
var err error
if instance.Spec.ClusterServiceClassSpecified() {
_, _, _, brokerClient, err = c.getClusterServiceClassPlanAndClusterServiceBroker(instance)
} else {
_, _, _, brokerClient, err = c.getServiceClassPlanAndServiceBroker(instance)
}
if err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
}
// There are some conditions that are different depending on which
// operation we're polling for. This is more readable than checking the
// status in various places.
mitigatingOrphan := instance.Status.OrphanMitigationInProgress
provisioning := instance.Status.CurrentOperation == v1beta1.ServiceInstanceOperationProvision && !mitigatingOrphan
deleting := instance.Status.CurrentOperation == v1beta1.ServiceInstanceOperationDeprovision || mitigatingOrphan
request, err := c.prepareServiceInstanceLastOperationRequest(instance)
if err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
}
glog.V(5).Info(pcb.Message("Polling last operation"))
response, err := brokerClient.PollLastOperation(request)
if err != nil {
// If the operation was for delete and we receive a http.StatusGone,
// this is considered a success as per the spec
if osb.IsGoneError(err) && deleting {
if err := c.processDeprovisionSuccess(instance); err != nil {
return c.handleServiceInstancePollingError(instance, err)
}
return c.finishPollingServiceInstance(instance)
}
// We got some kind of error and should continue polling.
//
// The instance's Ready condition should already be False, so
// we just need to record an event.
s := fmt.Sprintf("Error polling last operation: %v", err)
glog.V(4).Info(pcb.Message(s))
c.recorder.Event(instance, corev1.EventTypeWarning, errorPollingLastOperationReason, s)
if c.reconciliationRetryDurationExceeded(instance.Status.OperationStartTime) {
return c.processServiceInstancePollingFailureRetryTimeout(instance, nil)
}
return c.continuePollingServiceInstance(instance)
}
description := "(no description provided)"
if response.Description != nil {
description = *response.Description
}
glog.V(4).Info(pcb.Messagef("Poll returned %q : %q", response.State, description))
switch response.State {
case osb.StateInProgress:
var message string
var reason string
switch {
case deleting:
reason = asyncDeprovisioningReason
message = asyncDeprovisioningMessage
case provisioning:
reason = asyncProvisioningReason
message = asyncProvisioningMessage
default:
reason = asyncUpdatingInstanceReason
message = asyncUpdatingInstanceMessage
}
if response.Description != nil {
message = fmt.Sprintf("%s (%s)", message, *response.Description)
}
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, reason, message)
if c.reconciliationRetryDurationExceeded(instance.Status.OperationStartTime) {
return c.processServiceInstancePollingFailureRetryTimeout(instance, readyCond)