Skip to content

Commit

Permalink
Issue argoproj#71 - Add back service informer to handle Service recre…
Browse files Browse the repository at this point in the history
…ations quicker
  • Loading branch information
Alexander Matyushentsev authored and Alexander Matyushentsev committed Jun 1, 2019
1 parent a6aa76d commit f764f45
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 32 deletions.
1 change: 1 addition & 0 deletions cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func newCommand() *cobra.Command {
informers.WithNamespace(namespace))
controller := controller.NewController(kubeClient, rolloutClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
rolloutInformerFactory.Argoproj().V1alpha1().Rollouts(),
resyncDuration,
metricsPort)
Expand Down
90 changes: 58 additions & 32 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/controller"

"github.com/argoproj/argo-rollouts/controller/metrics"
Expand All @@ -39,19 +42,6 @@ import (
const controllerAgentName = "rollouts-controller"

const (
// SuccessSynced is used as part of the Event 'reason' when a Rollout is synced
SuccessSynced = "Synced"
// ErrResourceExists is used as part of the Event 'reason' when a Rollout fails
// to sync due to a Replica of the same name already existing.
ErrResourceExists = "ErrResourceExists"

// MessageResourceExists is the message used for Events when a resource
// fails to sync due to a Replica already existing
MessageResourceExists = "Resource %q already exists and is not managed by Rollout"
// MessageResourceSynced is the message used for an Event fired when a Rollout
// is synced successfully
MessageResourceSynced = "Rollout synced successfully"

// DefaultRolloutResyncPeriod Default time in seconds for rollout resync period
DefaultRolloutResyncPeriod = 15 * 60

Expand All @@ -73,6 +63,9 @@ type Controller struct {
replicaSetSynced cache.InformerSynced
rolloutsLister listers.RolloutLister
rolloutsSynced cache.InformerSynced
rolloutsIndexer cache.Indexer
servicesSynced cache.InformerSynced
servicesLister v1.ServiceLister
metricsServer *metrics.MetricsServer

// used for unit testing
Expand All @@ -84,7 +77,8 @@ type Controller struct {
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
rolloutWorkqueue workqueue.RateLimitingInterface
serviceWorkqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
Expand All @@ -96,6 +90,7 @@ func NewController(
kubeclientset kubernetes.Interface,
rolloutsclientset clientset.Interface,
replicaSetInformer appsinformers.ReplicaSetInformer,
servicesInformer coreinformers.ServiceInformer,
rolloutsInformer informers.RolloutInformer,
resyncPeriod time.Duration,
metricsPort int) *Controller {
Expand All @@ -121,24 +116,45 @@ func NewController(
replicaSetControl: replicaSetControl,
replicaSetLister: replicaSetInformer.Lister(),
replicaSetSynced: replicaSetInformer.Informer().HasSynced,
rolloutsIndexer: rolloutsInformer.Informer().GetIndexer(),
rolloutsLister: rolloutsInformer.Lister(),
rolloutsSynced: rolloutsInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts"),
servicesSynced: servicesInformer.Informer().HasSynced,
servicesLister: servicesInformer.Lister(),
rolloutWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts"),
serviceWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services"),
recorder: recorder,
resyncPeriod: resyncPeriod,
metricsServer: metrics.NewMetricsServer(metricsAddr, rolloutsInformer.Lister()),
}
controller.enqueueRollout = controller.enqueueRateLimited
controller.enqueueRolloutAfter = controller.enqueueAfter

util.CheckErr(rolloutsInformer.Informer().AddIndexers(cache.Indexers{
serviceIndexName: func(obj interface{}) (strings []string, e error) {
if rollout, ok := obj.(*v1alpha1.Rollout); ok {
return getRolloutServiceKeys(rollout), nil
}
return []string{}, nil
},
}))

log.Info("Setting up event handlers")
// Set up an event handler for when rollout resources change
rolloutsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueRollout,
UpdateFunc: func(old, new interface{}) {
controller.enqueueRollout(new)
},
DeleteFunc: func(obj interface{}) {
if r, ok := obj.(*v1alpha1.Rollout); ok {
for _, s := range getRolloutServiceKeys(r) {
controller.serviceWorkqueue.AddRateLimited(s)
}
}
},
})

replicaSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
Expand All @@ -153,6 +169,13 @@ func NewController(
},
DeleteFunc: controller.handleObject,
})
servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueService,
UpdateFunc: func(oldObj, newObj interface{}) {
controller.enqueueService(newObj)
},
DeleteFunc: controller.enqueueService,
})

return controller
}
Expand All @@ -162,22 +185,25 @@ func NewController(
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {

defer runtime.HandleCrash()
defer c.workqueue.ShutDown()
defer c.rolloutWorkqueue.ShutDown()
defer c.serviceWorkqueue.ShutDown()

// Start the informer factories to begin populating the informer caches
log.Info("Starting Rollout controller")

// Wait for the caches to be synced before starting workers
log.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.rolloutsSynced); !ok {
if ok := cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.servicesSynced, c.rolloutsSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

log.Info("Starting workers")
// Launch two workers to process Rollout resources

for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(func() { c.runWorker(c.rolloutWorkqueue, logutil.RolloutKey, c.syncHandler) }, time.Second, stopCh)
go wait.Until(func() { c.runWorker(c.serviceWorkqueue, logutil.ServiceKey, c.syncService) }, time.Second, stopCh)
}

log.Info("Started workers")
Expand All @@ -198,15 +224,15 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
func (c *Controller) runWorker(workqueue workqueue.RateLimitingInterface, objType string, syncHandler func(string) error) {
for c.processNextWorkItem(workqueue, objType, syncHandler) {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
func (c *Controller) processNextWorkItem(workqueue workqueue.RateLimitingInterface, objType string, syncHandler func(string) error) bool {
obj, shutdown := workqueue.Get()

if shutdown {
return false
Expand All @@ -220,7 +246,7 @@ func (c *Controller) processNextWorkItem() bool {
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
defer workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
Expand All @@ -232,27 +258,27 @@ func (c *Controller) processNextWorkItem() bool {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Rollout resource to be synced.
if err := c.syncHandler(key); err != nil {
if err := syncHandler(key); err != nil {
err := fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
namespace, name, splitErr := cache.SplitMetaNamespaceKey(key)
if splitErr != nil {
return errors.Wrapf(err, "Error splitting key %s: %s", key, splitErr.Error())
}
c.metricsServer.IncError(namespace, name)
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
workqueue.AddRateLimited(key)
return err
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
log.WithField(logutil.RolloutKey, key).Infof("Successfully synced")
workqueue.Forget(obj)
log.WithField(objType, key).Infof("Successfully synced")
return nil
}(obj)

Expand Down Expand Up @@ -364,7 +390,7 @@ func (c *Controller) enqueue(obj interface{}) {
runtime.HandleError(err)
return
}
c.workqueue.Add(key)
c.rolloutWorkqueue.Add(key)
}

func (c *Controller) enqueueRateLimited(obj interface{}) {
Expand All @@ -374,7 +400,7 @@ func (c *Controller) enqueueRateLimited(obj interface{}) {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
c.rolloutWorkqueue.AddRateLimited(key)
}

func (c *Controller) enqueueAfter(obj interface{}, duration time.Duration) {
Expand All @@ -384,7 +410,7 @@ func (c *Controller) enqueueAfter(obj interface{}, duration time.Duration) {
runtime.HandleError(err)
return
}
c.workqueue.AddAfter(key, duration)
c.rolloutWorkqueue.AddAfter(key, duration)
}

// handleObject will take any resource implementing metav1.Object and attempt
Expand Down
1 change: 1 addition & 0 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share

c := NewController(f.kubeclient, f.client,
k8sI.Apps().V1().ReplicaSets(),
k8sI.Core().V1().Services(),
i.Argoproj().V1alpha1().Rollouts(),
resync(),
DefaultMetricsPort)
Expand Down
81 changes: 81 additions & 0 deletions controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@ import (
errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
patchtypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/conditions"
logutil "github.com/argoproj/argo-rollouts/utils/log"
log "github.com/sirupsen/logrus"
)

const (
serviceIndexName = "byService"
switchSelectorPatch = `{
"spec": {
"selector": {
"%s": "%s"
}
}
}`
removeSelectorPatch = `[{ "op": "remove", "path": "/spec/selector/%s" }]`
)

// switchSelector switch the selector on an existing service to a new value
Expand Down Expand Up @@ -161,3 +166,79 @@ func (c *Controller) getRolloutSelectorLabel(svc *corev1.Service) (string, bool)
currentSelectorValue, ok := svc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
return currentSelectorValue, ok
}

func (c *Controller) enqueueService(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
runtime.HandleError(err)
return
}
c.serviceWorkqueue.AddRateLimited(key)
}

func (c *Controller) syncService(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
svc, err := c.servicesLister.Services(namespace).Get(name)
if errors.IsNotFound(err) {
log.WithField(logutil.ServiceKey, key).Infof("Service %v has been deleted", key)
return nil
}
if err != nil {
return err
}

if rollouts, err := c.getRolloutsByService(svc.Namespace, svc.Name); err == nil {
for i := range rollouts {
c.enqueueRollout(rollouts[i])
}

if _, hasHashSelector := svc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]; hasHashSelector && len(rollouts) == 0 {
updatedSvc := svc.DeepCopy()
delete(updatedSvc.Spec.Selector, v1alpha1.DefaultRolloutUniqueLabelKey)
patch := fmt.Sprintf(removeSelectorPatch, v1alpha1.DefaultRolloutUniqueLabelKey)
_, err := c.kubeclientset.CoreV1().Services(updatedSvc.Namespace).Patch(updatedSvc.Name, patchtypes.JSONPatchType, []byte(patch))
if errors.IsNotFound(err) {
return nil
}
return err
}
}
return nil
}

// getRolloutsByService returns all rollouts which are referencing specified service
func (c *Controller) getRolloutsByService(namespace string, serviceName string) ([]*v1alpha1.Rollout, error) {
objs, err := c.rolloutsIndexer.ByIndex(serviceIndexName, fmt.Sprintf("%s/%s", namespace, serviceName))
if err != nil {
return nil, err
}
var rollouts []*v1alpha1.Rollout
for i := range objs {
if r, ok := objs[i].(*v1alpha1.Rollout); ok {
rollouts = append(rollouts, r)
}
}
return rollouts, nil
}

// getRolloutServiceKeys returns services keys (namespace/serviceName) which are referenced by specified rollout
func getRolloutServiceKeys(rollout *v1alpha1.Rollout) []string {
servicesSet := make(map[string]bool)
if rollout.Spec.Strategy.BlueGreenStrategy != nil {
if rollout.Spec.Strategy.BlueGreenStrategy.ActiveService != "" {
servicesSet[fmt.Sprintf("%s/%s", rollout.Namespace, rollout.Spec.Strategy.BlueGreenStrategy.ActiveService)] = true
}
if rollout.Spec.Strategy.BlueGreenStrategy.PreviewService != "" {
servicesSet[fmt.Sprintf("%s/%s", rollout.Namespace, rollout.Spec.Strategy.BlueGreenStrategy.PreviewService)] = true
}
}
var services []string
for svc := range servicesSet {
services = append(services, svc)
}
return services
}
Loading

0 comments on commit f764f45

Please sign in to comment.