Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #71 - Add back service informer to handle Service recreations quicker #95

Merged
merged 1 commit into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func newCommand() *cobra.Command {
informers.WithNamespace(namespace))
controller := controller.NewController(kubeClient, rolloutClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
rolloutInformerFactory.Argoproj().V1alpha1().Rollouts(),
resyncDuration,
metricsPort)
Expand Down
90 changes: 58 additions & 32 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/controller"

"github.com/argoproj/argo-rollouts/controller/metrics"
Expand All @@ -39,19 +42,6 @@ import (
const controllerAgentName = "rollouts-controller"

const (
// SuccessSynced is used as part of the Event 'reason' when a Rollout is synced
SuccessSynced = "Synced"
// ErrResourceExists is used as part of the Event 'reason' when a Rollout fails
// to sync due to a Replica of the same name already existing.
ErrResourceExists = "ErrResourceExists"

// MessageResourceExists is the message used for Events when a resource
// fails to sync due to a Replica already existing
MessageResourceExists = "Resource %q already exists and is not managed by Rollout"
// MessageResourceSynced is the message used for an Event fired when a Rollout
// is synced successfully
MessageResourceSynced = "Rollout synced successfully"

// DefaultRolloutResyncPeriod Default time in seconds for rollout resync period
DefaultRolloutResyncPeriod = 15 * 60

Expand All @@ -73,6 +63,9 @@ type Controller struct {
replicaSetSynced cache.InformerSynced
rolloutsLister listers.RolloutLister
rolloutsSynced cache.InformerSynced
rolloutsIndexer cache.Indexer
servicesSynced cache.InformerSynced
servicesLister v1.ServiceLister
metricsServer *metrics.MetricsServer

// used for unit testing
Expand All @@ -84,7 +77,8 @@ type Controller struct {
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
rolloutWorkqueue workqueue.RateLimitingInterface
serviceWorkqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
Expand All @@ -96,6 +90,7 @@ func NewController(
kubeclientset kubernetes.Interface,
rolloutsclientset clientset.Interface,
replicaSetInformer appsinformers.ReplicaSetInformer,
servicesInformer coreinformers.ServiceInformer,
rolloutsInformer informers.RolloutInformer,
resyncPeriod time.Duration,
metricsPort int) *Controller {
Expand All @@ -121,24 +116,45 @@ func NewController(
replicaSetControl: replicaSetControl,
replicaSetLister: replicaSetInformer.Lister(),
replicaSetSynced: replicaSetInformer.Informer().HasSynced,
rolloutsIndexer: rolloutsInformer.Informer().GetIndexer(),
rolloutsLister: rolloutsInformer.Lister(),
rolloutsSynced: rolloutsInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts"),
servicesSynced: servicesInformer.Informer().HasSynced,
servicesLister: servicesInformer.Lister(),
rolloutWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts"),
serviceWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services"),
recorder: recorder,
resyncPeriod: resyncPeriod,
metricsServer: metrics.NewMetricsServer(metricsAddr, rolloutsInformer.Lister()),
}
controller.enqueueRollout = controller.enqueueRateLimited
controller.enqueueRolloutAfter = controller.enqueueAfter

util.CheckErr(rolloutsInformer.Informer().AddIndexers(cache.Indexers{
serviceIndexName: func(obj interface{}) (strings []string, e error) {
if rollout, ok := obj.(*v1alpha1.Rollout); ok {
return getRolloutServiceKeys(rollout), nil
}
return []string{}, nil
},
}))

log.Info("Setting up event handlers")
// Set up an event handler for when rollout resources change
rolloutsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueRollout,
UpdateFunc: func(old, new interface{}) {
controller.enqueueRollout(new)
},
DeleteFunc: func(obj interface{}) {
if r, ok := obj.(*v1alpha1.Rollout); ok {
for _, s := range getRolloutServiceKeys(r) {
controller.serviceWorkqueue.AddRateLimited(s)
}
}
},
})

replicaSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
Expand All @@ -153,6 +169,13 @@ func NewController(
},
DeleteFunc: controller.handleObject,
})
servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueService,
UpdateFunc: func(oldObj, newObj interface{}) {
controller.enqueueService(newObj)
},
DeleteFunc: controller.enqueueService,
})

return controller
}
Expand All @@ -162,22 +185,25 @@ func NewController(
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {

defer runtime.HandleCrash()
defer c.workqueue.ShutDown()
defer c.rolloutWorkqueue.ShutDown()
defer c.serviceWorkqueue.ShutDown()

// Start the informer factories to begin populating the informer caches
log.Info("Starting Rollout controller")

// Wait for the caches to be synced before starting workers
log.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.rolloutsSynced); !ok {
if ok := cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.servicesSynced, c.rolloutsSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

log.Info("Starting workers")
// Launch threadiness workers for each of the Rollout and Service workqueues

for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(func() { c.runWorker(c.rolloutWorkqueue, logutil.RolloutKey, c.syncHandler) }, time.Second, stopCh)
go wait.Until(func() { c.runWorker(c.serviceWorkqueue, logutil.ServiceKey, c.syncService) }, time.Second, stopCh)
}

log.Info("Started workers")
Expand All @@ -198,15 +224,15 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
func (c *Controller) runWorker(workqueue workqueue.RateLimitingInterface, objType string, syncHandler func(string) error) {
for c.processNextWorkItem(workqueue, objType, syncHandler) {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
func (c *Controller) processNextWorkItem(workqueue workqueue.RateLimitingInterface, objType string, syncHandler func(string) error) bool {
obj, shutdown := workqueue.Get()

if shutdown {
return false
Expand All @@ -220,7 +246,7 @@ func (c *Controller) processNextWorkItem() bool {
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
defer workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
Expand All @@ -232,27 +258,27 @@ func (c *Controller) processNextWorkItem() bool {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// resource (Rollout or Service) to be synced.
if err := c.syncHandler(key); err != nil {
if err := syncHandler(key); err != nil {
err := fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
namespace, name, splitErr := cache.SplitMetaNamespaceKey(key)
if splitErr != nil {
return errors.Wrapf(err, "Error splitting key %s: %s", key, splitErr.Error())
}
c.metricsServer.IncError(namespace, name)
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
workqueue.AddRateLimited(key)
return err
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
log.WithField(logutil.RolloutKey, key).Infof("Successfully synced")
workqueue.Forget(obj)
log.WithField(objType, key).Infof("Successfully synced")
return nil
}(obj)

Expand Down Expand Up @@ -364,7 +390,7 @@ func (c *Controller) enqueue(obj interface{}) {
runtime.HandleError(err)
return
}
c.workqueue.Add(key)
c.rolloutWorkqueue.Add(key)
}

func (c *Controller) enqueueRateLimited(obj interface{}) {
Expand All @@ -374,7 +400,7 @@ func (c *Controller) enqueueRateLimited(obj interface{}) {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
c.rolloutWorkqueue.AddRateLimited(key)
}

func (c *Controller) enqueueAfter(obj interface{}, duration time.Duration) {
Expand All @@ -384,7 +410,7 @@ func (c *Controller) enqueueAfter(obj interface{}, duration time.Duration) {
runtime.HandleError(err)
return
}
c.workqueue.AddAfter(key, duration)
c.rolloutWorkqueue.AddAfter(key, duration)
}

// handleObject will take any resource implementing metav1.Object and attempt
Expand Down
1 change: 1 addition & 0 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share

c := NewController(f.kubeclient, f.client,
k8sI.Apps().V1().ReplicaSets(),
k8sI.Core().V1().Services(),
i.Argoproj().V1alpha1().Rollouts(),
resync(),
DefaultMetricsPort)
Expand Down
81 changes: 81 additions & 0 deletions controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@ import (
errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
patchtypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/conditions"
logutil "github.com/argoproj/argo-rollouts/utils/log"
log "github.com/sirupsen/logrus"
)

const (
// serviceIndexName is the name of the rollouts informer index whose keys are
// "namespace/serviceName" strings; it is used to look up the Rollouts that
// reference a given Service.
serviceIndexName = "byService"
// switchSelectorPatch is a patch body template that sets a single key under
// spec.selector. The first %s is the selector key, the second %s its value.
// NOTE(review): presumably sent as a merge-style patch by switchSelector,
// which is not visible in this hunk — confirm.
switchSelectorPatch = `{
"spec": {
"selector": {
"%s": "%s"
}
}
}`
// removeSelectorPatch is a JSON Patch document template that removes a single
// key from spec.selector. The %s is the selector key and is inserted into the
// path verbatim.
// NOTE(review): a key containing '/' or '~' would need JSON Pointer escaping
// before being formatted into this template.
removeSelectorPatch = `[{ "op": "remove", "path": "/spec/selector/%s" }]`
)

// switchSelector switch the selector on an existing service to a new value
Expand Down Expand Up @@ -161,3 +166,79 @@ func (c *Controller) getRolloutSelectorLabel(svc *corev1.Service) (string, bool)
currentSelectorValue, ok := svc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
return currentSelectorValue, ok
}

// enqueueService derives the namespace/name key of a Service event object and
// adds it, rate limited, to the service workqueue.
//
// The key is computed with cache.DeletionHandlingMetaNamespaceKeyFunc rather
// than cache.MetaNamespaceKeyFunc because this function is also used as a
// delete handler: on a missed delete the informer delivers a
// cache.DeletedFinalStateUnknown tombstone instead of the Service itself, and
// the deletion-handling variant unwraps that tombstone before computing the
// key. Objects for which no key can be derived are reported through
// runtime.HandleError and dropped.
func (c *Controller) enqueueService(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	c.serviceWorkqueue.AddRateLimited(key)
}

// syncService reconciles the Service identified by key (namespace/name).
//
// Every Rollout that references the Service is enqueued for reconciliation.
// When no Rollout references the Service but its selector still contains the
// rollout unique-label key, that selector entry is removed with a JSON patch.
//
// It returns nil when the Service is absent from the lister cache or when the
// patch reports NotFound. All other errors are returned to the caller,
// including a failed index lookup, which was previously swallowed.
func (c *Controller) syncService(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	svc, err := c.servicesLister.Services(namespace).Get(name)
	if errors.IsNotFound(err) {
		log.WithField(logutil.ServiceKey, key).Infof("Service %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	rollouts, err := c.getRolloutsByService(svc.Namespace, svc.Name)
	if err != nil {
		// Surface the lookup failure instead of reporting a successful sync.
		return err
	}
	for i := range rollouts {
		c.enqueueRollout(rollouts[i])
	}
	if len(rollouts) > 0 {
		// The referencing Rollouts own the selector; nothing to clean up here.
		return nil
	}

	if _, hasHashSelector := svc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]; !hasHashSelector {
		return nil
	}

	// The Service is orphaned but still carries the rollout selector entry:
	// remove it. Only the namespace and name of the cached object are read, so
	// no copy of the Service is needed.
	patch := fmt.Sprintf(removeSelectorPatch, v1alpha1.DefaultRolloutUniqueLabelKey)
	_, err = c.kubeclientset.CoreV1().Services(svc.Namespace).Patch(svc.Name, patchtypes.JSONPatchType, []byte(patch))
	if errors.IsNotFound(err) {
		// The Service disappeared between the cache read and the patch.
		return nil
	}
	return err
}

// getRolloutsByService queries the rollouts indexer for every Rollout that
// references the given service in the given namespace. An error from the index
// lookup is returned unchanged; indexed objects that are not Rollouts are
// skipped.
func (c *Controller) getRolloutsByService(namespace string, serviceName string) ([]*v1alpha1.Rollout, error) {
	serviceKey := fmt.Sprintf("%s/%s", namespace, serviceName)
	indexed, err := c.rolloutsIndexer.ByIndex(serviceIndexName, serviceKey)
	if err != nil {
		return nil, err
	}

	var matches []*v1alpha1.Rollout
	for _, obj := range indexed {
		rollout, isRollout := obj.(*v1alpha1.Rollout)
		if !isRollout {
			continue
		}
		matches = append(matches, rollout)
	}
	return matches, nil
}

// getRolloutServiceKeys lists the "namespace/serviceName" keys of the services
// named by the rollout's blue-green strategy (active and preview). Empty
// service names are ignored and duplicate keys are collapsed. The result is
// nil when no service is referenced, and its order is not defined.
func getRolloutServiceKeys(rollout *v1alpha1.Rollout) []string {
	unique := map[string]bool{}
	if blueGreen := rollout.Spec.Strategy.BlueGreenStrategy; blueGreen != nil {
		if active := blueGreen.ActiveService; active != "" {
			unique[fmt.Sprintf("%s/%s", rollout.Namespace, active)] = true
		}
		if preview := blueGreen.PreviewService; preview != "" {
			unique[fmt.Sprintf("%s/%s", rollout.Namespace, preview)] = true
		}
	}

	var keys []string
	for key := range unique {
		keys = append(keys, key)
	}
	return keys
}
Loading