diff --git a/cmd/rollouts-controller/main.go b/cmd/rollouts-controller/main.go index 9e1a482e32..b7ce0b241a 100644 --- a/cmd/rollouts-controller/main.go +++ b/cmd/rollouts-controller/main.go @@ -79,6 +79,7 @@ func newCommand() *cobra.Command { informers.WithNamespace(namespace)) controller := controller.NewController(kubeClient, rolloutClient, kubeInformerFactory.Apps().V1().ReplicaSets(), + kubeInformerFactory.Core().V1().Services(), rolloutInformerFactory.Argoproj().V1alpha1().Rollouts(), resyncDuration, metricsPort) diff --git a/controller/controller.go b/controller/controller.go index d8067c732d..d12fea6ae2 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -17,13 +17,16 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" appslisters "k8s.io/client-go/listers/apps/v1" + v1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/cmd/kubeadm/app/util" "k8s.io/kubernetes/pkg/controller" "github.com/argoproj/argo-rollouts/controller/metrics" @@ -39,19 +42,6 @@ import ( const controllerAgentName = "rollouts-controller" const ( - // SuccessSynced is used as part of the Event 'reason' when a Rollout is synced - SuccessSynced = "Synced" - // ErrResourceExists is used as part of the Event 'reason' when a Rollout fails - // to sync due to a Replica of the same name already existing. - ErrResourceExists = "ErrResourceExists" - - // MessageResourceExists is the message used for Events when a resource - // fails to sync due to a Replica already existing - MessageResourceExists = "Resource %q already exists and is not managed by Rollout" - // MessageResourceSynced is the message used for an Event fired when a Rollout - // is synced successfully - MessageResourceSynced = "Rollout synced successfully" - // DefaultRolloutResyncPeriod Default time in seconds for rollout resync period DefaultRolloutResyncPeriod = 15 * 60 @@ -73,6 +63,9 @@ type Controller struct { replicaSetSynced cache.InformerSynced rolloutsLister listers.RolloutLister rolloutsSynced cache.InformerSynced + rolloutsIndexer cache.Indexer + servicesSynced cache.InformerSynced + servicesLister v1.ServiceLister metricsServer *metrics.MetricsServer // used for unit testing @@ -84,7 +77,8 @@ type Controller struct { // means we can ensure we only process a fixed amount of resources at a // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface + rolloutWorkqueue workqueue.RateLimitingInterface + serviceWorkqueue workqueue.RateLimitingInterface // recorder is an event recorder for recording Event resources to the // Kubernetes API. recorder record.EventRecorder @@ -96,6 +90,7 @@ func NewController( kubeclientset kubernetes.Interface, rolloutsclientset clientset.Interface, replicaSetInformer appsinformers.ReplicaSetInformer, + servicesInformer coreinformers.ServiceInformer, rolloutsInformer informers.RolloutInformer, resyncPeriod time.Duration, metricsPort int) *Controller { @@ -121,9 +116,13 @@ func NewController( replicaSetControl: replicaSetControl, replicaSetLister: replicaSetInformer.Lister(), replicaSetSynced: replicaSetInformer.Informer().HasSynced, + rolloutsIndexer: rolloutsInformer.Informer().GetIndexer(), rolloutsLister: rolloutsInformer.Lister(), rolloutsSynced: rolloutsInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts"), + servicesSynced: servicesInformer.Informer().HasSynced, + servicesLister: servicesInformer.Lister(), + rolloutWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts"), + serviceWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services"), recorder: recorder, resyncPeriod: resyncPeriod, metricsServer: metrics.NewMetricsServer(metricsAddr, rolloutsInformer.Lister()), @@ -131,6 +130,15 @@ func NewController( controller.enqueueRollout = controller.enqueueRateLimited controller.enqueueRolloutAfter = controller.enqueueAfter + util.CheckErr(rolloutsInformer.Informer().AddIndexers(cache.Indexers{ + serviceIndexName: func(obj interface{}) (strings []string, e error) { + if rollout, ok := obj.(*v1alpha1.Rollout); ok { + return getRolloutServiceKeys(rollout), nil + } + return []string{}, nil + }, + })) + log.Info("Setting up event handlers") // Set up an event handler for when rollout resources change rolloutsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -138,7 +146,15 @@ func NewController( UpdateFunc: func(old, new interface{}) { controller.enqueueRollout(new) }, + DeleteFunc: func(obj interface{}) { + if r, ok := obj.(*v1alpha1.Rollout); ok { + for _, s := range getRolloutServiceKeys(r) { + controller.serviceWorkqueue.AddRateLimited(s) + } + } + }, }) + replicaSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.handleObject, UpdateFunc: func(old, new interface{}) { @@ -153,6 +169,13 @@ func NewController( }, DeleteFunc: controller.handleObject, }) + servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueService, + UpdateFunc: func(oldObj, newObj interface{}) { + controller.enqueueService(newObj) + }, + DeleteFunc: controller.enqueueService, + }) return controller } @@ -162,22 +185,25 @@ func NewController( // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() - defer c.workqueue.ShutDown() + defer c.rolloutWorkqueue.ShutDown() + defer c.serviceWorkqueue.ShutDown() // Start the informer factories to begin populating the informer caches log.Info("Starting Rollout controller") // Wait for the caches to be synced before starting workers log.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.rolloutsSynced); !ok { + if ok := cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.servicesSynced, c.rolloutsSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } log.Info("Starting workers") - // Launch two workers to process Rollout resources + for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(func() { c.runWorker(c.rolloutWorkqueue, logutil.RolloutKey, c.syncHandler) }, time.Second, stopCh) + go wait.Until(func() { c.runWorker(c.serviceWorkqueue, logutil.ServiceKey, c.syncService) }, time.Second, stopCh) } log.Info("Started workers") @@ -198,15 +224,15 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { // runWorker is a long-running function that will continually call the // processNextWorkItem function in order to read and process a message on the // workqueue. -func (c *Controller) runWorker() { - for c.processNextWorkItem() { +func (c *Controller) runWorker(workqueue workqueue.RateLimitingInterface, objType string, syncHandler func(string) error) { + for c.processNextWorkItem(workqueue, objType, syncHandler) { } } // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() +func (c *Controller) processNextWorkItem(workqueue workqueue.RateLimitingInterface, objType string, syncHandler func(string) error) bool { + obj, shutdown := workqueue.Get() if shutdown { return false @@ -220,7 +246,7 @@ func (c *Controller) processNextWorkItem() bool { // not call Forget if a transient error occurs, instead the item is // put back on the workqueue and attempted again after a back-off // period. - defer c.workqueue.Done(obj) + defer workqueue.Done(obj) var key string var ok bool // We expect strings to come off the workqueue. These are of the @@ -232,13 +258,13 @@ func (c *Controller) processNextWorkItem() bool { // As the item in the workqueue is actually invalid, we call // Forget here else we'd go into a loop of attempting to // process a work item that is invalid. - c.workqueue.Forget(obj) + workqueue.Forget(obj) runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } // Run the syncHandler, passing it the namespace/name string of the // Rollout resource to be synced. - if err := c.syncHandler(key); err != nil { + if err := syncHandler(key); err != nil { err := fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) namespace, name, splitErr := cache.SplitMetaNamespaceKey(key) if splitErr != nil { @@ -246,13 +272,13 @@ func (c *Controller) processNextWorkItem() bool { } c.metricsServer.IncError(namespace, name) // Put the item back on the workqueue to handle any transient errors. - c.workqueue.AddRateLimited(key) + workqueue.AddRateLimited(key) return err } // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. - c.workqueue.Forget(obj) - log.WithField(logutil.RolloutKey, key).Infof("Successfully synced") + workqueue.Forget(obj) + log.WithField(objType, key).Infof("Successfully synced") return nil }(obj) @@ -364,7 +390,7 @@ func (c *Controller) enqueue(obj interface{}) { runtime.HandleError(err) return } - c.workqueue.Add(key) + c.rolloutWorkqueue.Add(key) } func (c *Controller) enqueueRateLimited(obj interface{}) { @@ -374,7 +400,7 @@ func (c *Controller) enqueueRateLimited(obj interface{}) { runtime.HandleError(err) return } - c.workqueue.AddRateLimited(key) + c.rolloutWorkqueue.AddRateLimited(key) } func (c *Controller) enqueueAfter(obj interface{}, duration time.Duration) { @@ -384,7 +410,7 @@ func (c *Controller) enqueueAfter(obj interface{}, duration time.Duration) { runtime.HandleError(err) return } - c.workqueue.AddAfter(key, duration) + c.rolloutWorkqueue.AddAfter(key, duration) } // handleObject will take any resource implementing metav1.Object and attempt diff --git a/controller/controller_test.go b/controller/controller_test.go index 829a692a08..214fb1e211 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -309,6 +309,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share c := NewController(f.kubeclient, f.client, k8sI.Apps().V1().ReplicaSets(), + k8sI.Core().V1().Services(), i.Argoproj().V1alpha1().Rollouts(), resync(), DefaultMetricsPort) diff --git a/controller/service.go b/controller/service.go index 11a878db7b..9c40d2b9c9 100644 --- a/controller/service.go +++ b/controller/service.go @@ -8,13 +8,17 @@ import ( errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" patchtypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/argoproj/argo-rollouts/utils/conditions" logutil "github.com/argoproj/argo-rollouts/utils/log" + log "github.com/sirupsen/logrus" ) const ( + serviceIndexName = "byService" switchSelectorPatch = `{ "spec": { "selector": { @@ -22,6 +26,7 @@ const ( } } }` + removeSelectorPatch = `[{ "op": "remove", "path": "/spec/selector/%s" }]` ) // switchSelector switch the selector on an existing service to a new value @@ -161,3 +166,79 @@ func (c *Controller) getRolloutSelectorLabel(svc *corev1.Service) (string, bool) currentSelectorValue, ok := svc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey] return currentSelectorValue, ok } + +func (c *Controller) enqueueService(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + runtime.HandleError(err) + return + } + c.serviceWorkqueue.AddRateLimited(key) +} + +func (c *Controller) syncService(key string) error { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + svc, err := c.servicesLister.Services(namespace).Get(name) + if errors.IsNotFound(err) { + log.WithField(logutil.ServiceKey, key).Infof("Service %v has been deleted", key) + return nil + } + if err != nil { + return err + } + + if rollouts, err := c.getRolloutsByService(svc.Namespace, svc.Name); err == nil { + for i := range rollouts { + c.enqueueRollout(rollouts[i]) + } + + if _, hasHashSelector := svc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]; hasHashSelector && len(rollouts) == 0 { + updatedSvc := svc.DeepCopy() + delete(updatedSvc.Spec.Selector, v1alpha1.DefaultRolloutUniqueLabelKey) + patch := fmt.Sprintf(removeSelectorPatch, v1alpha1.DefaultRolloutUniqueLabelKey) + _, err := c.kubeclientset.CoreV1().Services(updatedSvc.Namespace).Patch(updatedSvc.Name, patchtypes.JSONPatchType, []byte(patch)) + if errors.IsNotFound(err) { + return nil + } + return err + } + } + return nil +} + +// getRolloutsByService returns all rollouts which are referencing specified service +func (c *Controller) getRolloutsByService(namespace string, serviceName string) ([]*v1alpha1.Rollout, error) { + objs, err := c.rolloutsIndexer.ByIndex(serviceIndexName, fmt.Sprintf("%s/%s", namespace, serviceName)) + if err != nil { + return nil, err + } + var rollouts []*v1alpha1.Rollout + for i := range objs { + if r, ok := objs[i].(*v1alpha1.Rollout); ok { + rollouts = append(rollouts, r) + } + } + return rollouts, nil +} + +// getRolloutServiceKeys returns services keys (namespace/serviceName) which are referenced by specified rollout +func getRolloutServiceKeys(rollout *v1alpha1.Rollout) []string { + servicesSet := make(map[string]bool) + if rollout.Spec.Strategy.BlueGreenStrategy != nil { + if rollout.Spec.Strategy.BlueGreenStrategy.ActiveService != "" { + servicesSet[fmt.Sprintf("%s/%s", rollout.Namespace, rollout.Spec.Strategy.BlueGreenStrategy.ActiveService)] = true + } + if rollout.Spec.Strategy.BlueGreenStrategy.PreviewService != "" { + servicesSet[fmt.Sprintf("%s/%s", rollout.Namespace, rollout.Spec.Strategy.BlueGreenStrategy.PreviewService)] = true + } + } + var services []string + for svc := range servicesSet { + services = append(services, svc) + } + return services +} diff --git a/controller/service_test.go b/controller/service_test.go index ed513427af..1519f62013 100644 --- a/controller/service_test.go +++ b/controller/service_test.go @@ -244,3 +244,31 @@ func TestPreviewServiceNotFound(t *testing.T) { _, pausedCondition := newProgressingCondition(conditions.ServiceNotFoundReason, notUsedPreviewSvc) assert.Equal(t, calculatePatch(r, fmt.Sprintf(expectedPatch, pausedCondition)), patch) } + +func TestGetRolloutServiceKeysForCanary(t *testing.T) { + keys := getRolloutServiceKeys(&v1alpha1.Rollout{ + Spec: v1alpha1.RolloutSpec{ + Strategy: v1alpha1.RolloutStrategy{ + CanaryStrategy: &v1alpha1.CanaryStrategy{}, + }, + }, + }) + assert.Empty(t, keys) +} + +func TestGetRolloutServiceKeysForBlueGreen(t *testing.T) { + keys := getRolloutServiceKeys(&v1alpha1.Rollout{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + }, + Spec: v1alpha1.RolloutSpec{ + Strategy: v1alpha1.RolloutStrategy{ + BlueGreenStrategy: &v1alpha1.BlueGreenStrategy{ + PreviewService: "preview-service", + ActiveService: "active-service", + }, + }, + }, + }) + assert.ElementsMatch(t, keys, []string{"default/preview-service", "default/active-service"}) +} diff --git a/utils/log/log.go b/utils/log/log.go index 8b70c9329c..cd732774be 100644 --- a/utils/log/log.go +++ b/utils/log/log.go @@ -9,6 +9,8 @@ import ( const ( // RolloutKey defines the key for the rollout field RolloutKey = "rollout" + // ServiceKey defines the key for the service field + ServiceKey = "service" // NamespaceKey defines the key for the namespace field NamespaceKey = "namespace" )