From 79739fdfd753a88d2ad0b08efa2692ce51263a1e Mon Sep 17 00:00:00 2001 From: Mark Robinson Date: Tue, 8 Jun 2021 18:57:51 -0700 Subject: [PATCH] fix: Mitigate the bug where items are re-added constantly to the workqueue. #1193 (#1243) This will prevent argo from hanging for up to 16 minutes at a time while processing a rollout. Signed-off-by: Mark Robinson --- analysis/controller_test.go | 4 ++- controller/controller.go | 12 ++++--- experiments/controller_test.go | 9 +++-- ingress/ingress_test.go | 6 ++-- .../viewcontroller/viewcontroller.go | 4 ++- rollout/controller.go | 4 +++ rollout/controller_test.go | 8 +++-- rollout/trafficrouting/istio/controller.go | 4 ++- service/service_test.go | 6 ++-- utils/controller/controller.go | 2 ++ utils/controller/controller_test.go | 34 ++++++++++--------- utils/queue/queue.go | 13 +++++++ 12 files changed, 72 insertions(+), 34 deletions(-) create mode 100644 utils/queue/queue.go diff --git a/analysis/controller_test.go b/analysis/controller_test.go index cf60053f4b..e758ba5d7c 100644 --- a/analysis/controller_test.go +++ b/analysis/controller_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/argoproj/argo-rollouts/utils/queue" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/undefinedlabs/go-mpatch" @@ -87,7 +89,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share i := informers.NewSharedInformerFactory(f.client, resync()) k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, resync()) - analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AnalysisRuns") + analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns") metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{ Addr: "localhost:8080", diff --git a/controller/controller.go b/controller/controller.go index af6add023d..997e9202a0 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + "github.com/argoproj/argo-rollouts/utils/queue" + "github.com/pkg/errors" smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned" log "github.com/sirupsen/logrus" @@ -130,11 +132,11 @@ func NewManager( K8SRequestProvider: k8sRequestProvider, }) - rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") - experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Experiments") - analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AnalysisRuns") - serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services") - ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses") + rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") + experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Experiments") + analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns") + serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services") + ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses") refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, rolloutWorkqueue, rolloutsInformer.Informer()) diff --git a/experiments/controller_test.go b/experiments/controller_test.go index e7861668d9..aed35dc86a 100644 --- a/experiments/controller_test.go +++ b/experiments/controller_test.go @@ -8,8 +8,11 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/undefinedlabs/go-mpatch" + + "github.com/argoproj/argo-rollouts/utils/queue" + + "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -316,8 +319,8 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share i := informers.NewSharedInformerFactory(f.client, resync()) k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, resync()) - rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") - experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Experiments") + rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") + experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Experiments") metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{ Addr: "localhost:8080", diff --git a/ingress/ingress_test.go b/ingress/ingress_test.go index 6153fda191..42c6edf0e4 100644 --- a/ingress/ingress_test.go +++ b/ingress/ingress_test.go @@ -4,6 +4,8 @@ import ( "sync" "testing" + "github.com/argoproj/argo-rollouts/utils/queue" + "github.com/stretchr/testify/assert" extensionsv1beta1 "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -63,8 +65,8 @@ func newFakeIngressController(ing *extensionsv1beta1.Ingress, rollout *v1alpha1. i := informers.NewSharedInformerFactory(client, 0) k8sI := kubeinformers.NewSharedInformerFactory(kubeclient, 0) - rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") - ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses") + rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") + ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses") c := NewController(ControllerConfig{ Client: kubeclient, diff --git a/pkg/kubectl-argo-rollouts/viewcontroller/viewcontroller.go b/pkg/kubectl-argo-rollouts/viewcontroller/viewcontroller.go index 2add879dd2..247ca64b39 100644 --- a/pkg/kubectl-argo-rollouts/viewcontroller/viewcontroller.go +++ b/pkg/kubectl-argo-rollouts/viewcontroller/viewcontroller.go @@ -5,6 +5,8 @@ import ( "reflect" "time" + "github.com/argoproj/argo-rollouts/utils/queue" + log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" @@ -98,7 +100,7 @@ func newViewController(namespace string, name string, kubeClient kubernetes.Inte rolloutLister: rolloutsInformerFactory.Argoproj().V1alpha1().Rollouts().Lister().Rollouts(namespace), experimentLister: rolloutsInformerFactory.Argoproj().V1alpha1().Experiments().Lister().Experiments(namespace), analysisRunLister: rolloutsInformerFactory.Argoproj().V1alpha1().AnalysisRuns().Lister().AnalysisRuns(namespace), - workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + workqueue: workqueue.NewRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter()), } controller.cacheSyncs = append(controller.cacheSyncs, diff --git a/rollout/controller.go b/rollout/controller.go index 0e9d072c9b..479c69bd4a 100644 --- a/rollout/controller.go +++ b/rollout/controller.go @@ -377,6 +377,7 @@ func (c *Controller) syncHandler(key string) error { resolveErr := c.refResolver.Resolve(r) roCtx, err := c.newRolloutContext(r) if err != nil { + logCtx.Errorf("newRolloutContext err %v", err) return err } if resolveErr != nil { @@ -388,6 +389,9 @@ func (c *Controller) syncHandler(key string) error { if roCtx.newRollout != nil { c.writeBackToInformer(roCtx.newRollout) } + if err != nil { + logCtx.Errorf("roCtx.reconcile err %v", err) + } return err } diff --git a/rollout/controller_test.go b/rollout/controller_test.go index 232a5acf3e..14a6cf2436 100644 --- a/rollout/controller_test.go +++ b/rollout/controller_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/argoproj/argo-rollouts/utils/queue" + "github.com/ghodss/yaml" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -487,9 +489,9 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share istioVirtualServiceInformer := dynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer() istioDestinationRuleInformer := dynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer() - rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") - serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services") - ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses") + rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 10*time.Second), "Rollouts") + serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services") + ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses") metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{ Addr: "localhost:8080", diff --git a/rollout/trafficrouting/istio/controller.go b/rollout/trafficrouting/istio/controller.go index 9cc442ae0a..b49132758c 100644 --- a/rollout/trafficrouting/istio/controller.go +++ b/rollout/trafficrouting/istio/controller.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/argoproj/argo-rollouts/utils/queue" + log "github.com/sirupsen/logrus" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -58,7 +60,7 @@ type IstioController struct { func NewIstioController(cfg IstioControllerConfig) *IstioController { c := IstioController{ IstioControllerConfig: cfg, - destinationRuleWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DestinationRules"), + destinationRuleWorkqueue: workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "DestinationRules"), VirtualServiceLister: dynamiclister.New(cfg.VirtualServiceInformer.GetIndexer(), istioutil.GetIstioVirtualServiceGVR()), DestinationRuleLister: dynamiclister.New(cfg.DestinationRuleInformer.GetIndexer(), istioutil.GetIstioDestinationRuleGVR()), } diff --git a/service/service_test.go b/service/service_test.go index a1a6f2b0b5..533cbd9e49 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -3,6 +3,8 @@ package service import ( "testing" + "github.com/argoproj/argo-rollouts/utils/queue" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -65,8 +67,8 @@ func newFakeServiceController(svc *corev1.Service, rollout *v1alpha1.Rollout) (* i := informers.NewSharedInformerFactory(client, 0) k8sI := kubeinformers.NewSharedInformerFactory(kubeclient, 0) - rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") - serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services") + rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") + serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services") metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{ Addr: "localhost:8080", K8SRequestProvider: &metrics.K8sRequestsCountProvider{}, diff --git a/utils/controller/controller.go b/utils/controller/controller.go index 34364b18ca..e3123d0f1c 100644 --- a/utils/controller/controller.go +++ b/utils/controller/controller.go @@ -160,6 +160,8 @@ func processNextWorkItem(workqueue workqueue.RateLimitingInterface, objType stri // Put the item back on // the workqueue to handle any transient errors. workqueue.AddRateLimited(key) + + logCtx.Infof("%s syncHandler queue retries: %v : key \"%v\"", objType, workqueue.NumRequeues(key), key) return err } // Finally, if no error occurs we Forget this item so it does not diff --git a/utils/controller/controller_test.go b/utils/controller/controller_test.go index 8630c2d962..1d26070066 100644 --- a/utils/controller/controller_test.go +++ b/utils/controller/controller_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "github.com/argoproj/argo-rollouts/utils/queue" + "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -30,7 +32,7 @@ import ( ) func TestProcessNextWorkItemHandlePanic(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") q.Add("valid/key") metricServer := metrics.NewMetricsServer(metrics.ServerConfig{ @@ -44,7 +46,7 @@ func TestProcessNextWorkItemHandlePanic(t *testing.T) { } func TestProcessNextWorkItemShutDownQueue(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") syncHandler := func(key string) error { return nil } @@ -53,7 +55,7 @@ func TestProcessNextWorkItemShutDownQueue(t *testing.T) { } func TestProcessNextWorkItemNoTStringKey(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") q.Add(1) syncHandler := func(key string) error { return nil @@ -62,7 +64,7 @@ func TestProcessNextWorkItemNoTStringKey(t *testing.T) { } func TestProcessNextWorkItemNoValidKey(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") q.Add("invalid.key") syncHandler := func(key string) error { return nil @@ -71,7 +73,7 @@ func TestProcessNextWorkItemNoValidKey(t *testing.T) { } func TestProcessNextWorkItemNormalSync(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") q.Add("valid/key") syncHandler := func(key string) error { return nil @@ -80,7 +82,7 @@ func TestProcessNextWorkItemNormalSync(t *testing.T) { } func TestProcessNextWorkItemSyncHandlerReturnError(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") q.Add("valid/key") metricServer := metrics.NewMetricsServer(metrics.ServerConfig{ Addr: "localhost:8080", @@ -93,7 +95,7 @@ func TestProcessNextWorkItemSyncHandlerReturnError(t *testing.T) { } func TestEnqueue(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") r := &v1alpha1.Rollout{ ObjectMeta: metav1.ObjectMeta{ Name: "testName", @@ -105,13 +107,13 @@ func TestEnqueue(t *testing.T) { } func TestEnqueueInvalidObj(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") Enqueue(struct{}{}, q) assert.Equal(t, 0, q.Len()) } func TestEnqueueAfter(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") r := &v1alpha1.Rollout{ ObjectMeta: metav1.ObjectMeta{ Name: "testName", @@ -125,13 +127,13 @@ func TestEnqueueAfter(t *testing.T) { } func TestEnqueueString(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") Enqueue("default/foo", q) assert.Equal(t, 1, q.Len()) } func TestEnqueueAfterInvalidObj(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") EnqueueAfter(struct{}{}, time.Duration(1), q) assert.Equal(t, 0, q.Len()) time.Sleep(2 * time.Second) @@ -139,7 +141,7 @@ func TestEnqueueAfterInvalidObj(t *testing.T) { } func TestEnqueueRateLimited(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") r := &v1alpha1.Rollout{ ObjectMeta: metav1.ObjectMeta{ Name: "testName", @@ -153,7 +155,7 @@ func TestEnqueueRateLimited(t *testing.T) { } func TestEnqueueRateLimitedInvalidObject(t *testing.T) { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") EnqueueRateLimited(struct{}{}, q) assert.Equal(t, 0, q.Len()) time.Sleep(time.Second) @@ -440,7 +442,7 @@ func TestProcessNextWatchObj(t *testing.T) { }) indexer.Add(obj) { - wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + wq := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") watchEvent := watch.Event{ Object: obj, } @@ -448,7 +450,7 @@ func TestProcessNextWatchObj(t *testing.T) { assert.Equal(t, 1, wq.Len()) } { - wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + wq := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") watchEvent := watch.Event{ Object: obj, } @@ -456,7 +458,7 @@ func TestProcessNextWatchObj(t *testing.T) { assert.Equal(t, 0, wq.Len()) } { - wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + wq := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts") invalidWatchEvent := watch.Event{} processNextWatchObj(invalidWatchEvent, wq, indexer, "testIndexer") assert.Equal(t, 0, wq.Len()) diff --git a/utils/queue/queue.go b/utils/queue/queue.go new file mode 100644 index 0000000000..a6d5c8e0e7 --- /dev/null +++ b/utils/queue/queue.go @@ -0,0 +1,13 @@ +package queue + +import ( + "time" + + "k8s.io/client-go/util/workqueue" +) + +// DefaultArgoRolloutsRateLimiter is the default queue rate limiter. +// Similar to workqueue.DefaultControllerRateLimiter() but the max limit is 10 seconds instead of 16 minutes +func DefaultArgoRolloutsRateLimiter() workqueue.RateLimiter { + return workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 10*time.Second) +}