From 3cc5ec679eda5f24574266b2ce36486188532559 Mon Sep 17 00:00:00 2001 From: Matt Moore Date: Thu, 14 Jun 2018 13:57:20 +0000 Subject: [PATCH] WIP Move the Service controller to use the new controller model. Fixes: https://github.com/knative/serving/issues/1134 --- pkg/apis/serving/v1alpha1/revision_types.go | 13 +- pkg/apis/serving/v1alpha1/service_types.go | 73 +++++- pkg/controller/configuration/configuration.go | 2 +- pkg/controller/names.go | 8 + pkg/controller/service/service.go | 207 +++++++++++------- .../service/service_configuration.go | 2 +- pkg/controller/service/service_route.go | 6 +- pkg/controller/service/service_route_test.go | 10 +- 8 files changed, 224 insertions(+), 97 deletions(-) diff --git a/pkg/apis/serving/v1alpha1/revision_types.go b/pkg/apis/serving/v1alpha1/revision_types.go index 7965c5c22edf..958c976e2e21 100644 --- a/pkg/apis/serving/v1alpha1/revision_types.go +++ b/pkg/apis/serving/v1alpha1/revision_types.go @@ -398,13 +398,12 @@ func (rs *RevisionStatus) MarkContainerMissing(message string) { } func (rs *RevisionStatus) checkAndMarkReady() { - ra := rs.GetCondition(RevisionConditionResourcesAvailable) - if ra == nil || ra.Status != corev1.ConditionTrue { - return - } - ch := rs.GetCondition(RevisionConditionContainerHealthy) - if ch == nil || ch.Status != corev1.ConditionTrue { - return + rct := []RevisionConditionType{RevisionConditionContainerHealthy, RevisionConditionResourcesAvailable} + for _, cond := range rct { + c := rs.GetCondition(cond) + if c == nil || c.Status != corev1.ConditionTrue { + return + } } rs.markReady() } diff --git a/pkg/apis/serving/v1alpha1/service_types.go b/pkg/apis/serving/v1alpha1/service_types.go index 22c1a689da90..2e4afa6598ca 100644 --- a/pkg/apis/serving/v1alpha1/service_types.go +++ b/pkg/apis/serving/v1alpha1/service_types.go @@ -99,6 +99,12 @@ const ( // ServiceConditionReady is set when the service is configured // and has available backends ready to receive traffic. ServiceConditionReady ServiceConditionType = "Ready" + // ServiceConditionRouteReady is set when the service's underlying + // route has reported readiness. + ServiceConditionRouteReady ServiceConditionType = "RouteReady" + // ServiceConditionConfigurationReady is set when the service's underlying + // configuration has reported readiness. + ServiceConditionConfigurationReady ServiceConditionType = "ConfigurationReady" ) type ServiceStatus struct { @@ -183,7 +189,8 @@ func (ss *ServiceStatus) RemoveCondition(t ServiceConditionType) { } func (ss *ServiceStatus) InitializeConditions() { - sct := []ServiceConditionType{ServiceConditionReady} + sct := []ServiceConditionType{ServiceConditionReady, + ServiceConditionConfigurationReady, ServiceConditionRouteReady} for _, cond := range sct { if rc := ss.GetCondition(cond); rc == nil { ss.setCondition(&ServiceCondition{ @@ -193,3 +200,67 @@ func (ss *ServiceStatus) InitializeConditions() { } } } + +func (ss *ServiceStatus) PropagateConfiguration(cs ConfigurationStatus) { + cc := cs.GetCondition(ConfigurationConditionReady) + if cc == nil { + return + } + sct := []ServiceConditionType{ServiceConditionConfigurationReady} + // If the underlying Configuration reported failure, then bubble it up. + if cc.Status == corev1.ConditionFalse { + sct = append(sct, ServiceConditionReady) + } + for _, cond := range sct { + ss.setCondition(&ServiceCondition{ + Type: cond, + Status: cc.Status, + Reason: cc.Reason, + Message: cc.Message, + }) + } + if cc.Status == corev1.ConditionTrue { + ss.checkAndMarkReady() + } +} + +func (ss *ServiceStatus) PropagateRoute(rs RouteStatus) { + rc := rs.GetCondition(RouteConditionReady) + if rc == nil { + return + } + sct := []ServiceConditionType{ServiceConditionRouteReady} + // If the underlying Route reported failure, then bubble it up. + if rc.Status == corev1.ConditionFalse { + sct = append(sct, ServiceConditionReady) + } + for _, cond := range sct { + ss.setCondition(&ServiceCondition{ + Type: cond, + Status: rc.Status, + Reason: rc.Reason, + Message: rc.Message, + }) + } + if rc.Status == corev1.ConditionTrue { + ss.checkAndMarkReady() + } +} + +func (ss *ServiceStatus) checkAndMarkReady() { + sct := []ServiceConditionType{ServiceConditionConfigurationReady, ServiceConditionRouteReady} + for _, cond := range sct { + c := ss.GetCondition(cond) + if c == nil || c.Status != corev1.ConditionTrue { + return + } + } + ss.markReady() +} + +func (ss *ServiceStatus) markReady() { + ss.setCondition(&ServiceCondition{ + Type: ServiceConditionReady, + Status: corev1.ConditionTrue, + }) +} diff --git a/pkg/controller/configuration/configuration.go b/pkg/controller/configuration/configuration.go index 3a5ef1a4fd95..a6403d9b9c43 100644 --- a/pkg/controller/configuration/configuration.go +++ b/pkg/controller/configuration/configuration.go @@ -44,7 +44,7 @@ type Controller struct { buildClientSet buildclientset.Interface - // lister indexes properties about Configuration + // listers index properties about resources lister listers.ConfigurationLister revisionLister listers.RevisionLister } diff --git a/pkg/controller/names.go b/pkg/controller/names.go index b1d1f1f5677b..40a9d9cabe76 100644 --- a/pkg/controller/names.go +++ b/pkg/controller/names.go @@ -72,6 +72,14 @@ func GetElaK8SServiceName(u *v1alpha1.Route) string { return u.Name + "-service" } +func GetServiceConfigurationName(u *v1alpha1.Service) string { + return u.Name +} + +func GetServiceRouteName(u *v1alpha1.Service) string { + return u.Name +} + func GetElaK8SActivatorServiceName() string { return "activator-service" } diff --git a/pkg/controller/service/service.go b/pkg/controller/service/service.go index a0491baba5d6..899e13e62eb9 100644 --- a/pkg/controller/service/service.go +++ b/pkg/controller/service/service.go @@ -20,7 +20,10 @@ import ( "fmt" "reflect" + "github.com/google/go-cmp/cmp" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" @@ -32,28 +35,18 @@ import ( listers "github.com/knative/serving/pkg/client/listers/serving/v1alpha1" "github.com/knative/serving/pkg/controller" "github.com/knative/serving/pkg/logging/logkey" - "go.opencensus.io/stats" - "go.opencensus.io/tag" ) -var ( - processItemCount = stats.Int64( - "controller_service_queue_process_count", - "Counter to keep track of items in the service work queue", - stats.UnitNone) - statusTagKey tag.Key -) - -const ( - controllerAgentName = "service-controller" -) +const controllerAgentName = "service-controller" // Controller implements the controller for Service resources. type Controller struct { *controller.Base - // lister indexes properties about Services - lister listers.ServiceLister + // listers index properties about resources + lister listers.ServiceLister + configurationLister listers.ConfigurationLister + routeLister listers.RouteLister } // NewController initializes the controller and is called by the generated code @@ -65,12 +58,17 @@ func NewController( // obtain references to a shared index informer for the Services. informer := elaInformerFactory.Serving().V1alpha1().Services() + configurationInformer := elaInformerFactory.Serving().V1alpha1().Configurations() + routeInformer := elaInformerFactory.Serving().V1alpha1().Routes() - informers := []cache.SharedIndexInformer{informer.Informer()} + informers := []cache.SharedIndexInformer{informer.Informer(), + configurationInformer.Informer(), routeInformer.Informer()} controller := &Controller{ - Base: controller.NewBase(opt, controllerAgentName, "Services", informers), - lister: informer.Lister(), + Base: controller.NewBase(opt, controllerAgentName, "Services", informers), + lister: informer.Lister(), + configurationLister: configurationInformer.Lister(), + routeLister: routeInformer.Lister(), } controller.Logger.Info("Setting up event handlers") @@ -82,15 +80,45 @@ func NewController( DeleteFunc: controller.Enqueue, }) + configurationInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: isControlled, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: controller.EnqueueControllerOf, + UpdateFunc: func(old, new interface{}) { + controller.EnqueueControllerOf(new) + }, + }, + }) + + routeInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: isControlled, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: controller.EnqueueControllerOf, + UpdateFunc: func(old, new interface{}) { + controller.EnqueueControllerOf(new) + }, + }, + }) + return controller } +func isControlled(obj interface{}) bool { + if object, ok := obj.(metav1.Object); ok { + owner := metav1.GetControllerOf(object) + return owner != nil && + owner.APIVersion == v1alpha1.SchemeGroupVersion.String() && + owner.Kind == "Service" + } + return false +} + // Run will set up the event handlers for types we are interested in, as well // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - return c.RunController(threadiness, stopCh, c.updateServiceEvent, "Service") + return c.RunController(threadiness, stopCh, c.Reconcile, "Service") } // loggerWithServiceInfo enriches the logs with service name and namespace. @@ -98,10 +126,10 @@ func loggerWithServiceInfo(logger *zap.SugaredLogger, ns string, name string) *z return logger.With(zap.String(logkey.Namespace, ns), zap.String(logkey.Service, name)) } -// updateServiceEvent compares the actual state with the desired, and attempts to +// Reconcile compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Service resource // with the current status of the resource. -func (c *Controller) updateServiceEvent(key string) error { +func (c *Controller) Reconcile(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { @@ -109,52 +137,63 @@ func (c *Controller) updateServiceEvent(key string) error { return nil } + // Wrap our logger with the additional context of the configuration that we are reconciling. logger := loggerWithServiceInfo(c.Logger, namespace, name) // Get the Service resource with this namespace/name service, err := c.lister.Services(namespace).Get(name) - if err != nil { - // The resource may no longer exist, in which case we stop - // processing. - if apierrs.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("service %q in work queue no longer exists", key)) - return nil - } - + if apierrs.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + runtime.HandleError(fmt.Errorf("service %q in work queue no longer exists", key)) + return nil + } else if err != nil { return err } + // Don't modify the informers copy service = service.DeepCopy() service.Status.InitializeConditions() - // We added the Generation to avoid fighting the Configuration controller, - // which adds a Generation to avoid fighting the Revision controller. We - // shouldn't need this once k8s 1.10 lands, see: - // https://github.com/kubernetes/kubernetes/issues/58778 - // TODO(#642): Remove this. - if service.GetGeneration() == service.Status.ObservedGeneration { - logger.Infof("Skipping reconcile since already reconciled %d == %d", - service.Spec.Generation, service.Status.ObservedGeneration) - return nil - } - - logger.Infof("Running reconcile Service for %s\n%+v\n", service.Name, service) - - config := MakeServiceConfiguration(service) - if err := c.reconcileConfiguration(config); err != nil { - logger.Errorf("Failed to update Configuration for %q: %v", service.Name, err) + configName := controller.GetServiceConfigurationName(service) + config, err := c.configurationLister.Configurations(service.Namespace).Get(configName) + if errors.IsNotFound(err) { + config, err = c.createConfiguration(service) + if err != nil { + logger.Errorf("Failed to create Configuration %q: %v", configName, err) + c.Recorder.Eventf(service, corev1.EventTypeWarning, "CreationFailed", "Failed to create Configuration %q: %v", configName, err) + return err + } + } else if err != nil { + logger.Errorf("Failed to reconcile Service: %q failed to Get Configuration: %q; %v", service.Name, configName, zap.Error(err)) + return err + } else if config, err = c.reconcileConfiguration(service, config); err != nil { + logger.Errorf("Failed to reconcile Service: %q failed to reconcile Configuration: %q; %v", service.Name, configName, zap.Error(err)) return err } - // TODO: If revision is specified, check that the revision is ready before - // switching routes to it. Though route controller might just do the right thing? + // Update our Status based on the state of our underlying Configuration. + service.Status.PropagateConfiguration(config.Status) - route := MakeServiceRoute(service, config.Name) - if err := c.reconcileRoute(route); err != nil { - logger.Errorf("Failed to update Route for %q: %v", service.Name, err) + routeName := controller.GetServiceRouteName(service) + route, err := c.routeLister.Routes(service.Namespace).Get(routeName) + if errors.IsNotFound(err) { + route, err = c.createRoute(service) + if err != nil { + logger.Errorf("Failed to create Route %q: %v", routeName, err) + c.Recorder.Eventf(service, corev1.EventTypeWarning, "CreationFailed", "Failed to create Route %q: %v", routeName, err) + return err + } + } else if err != nil { + logger.Errorf("Failed to reconcile Service: %q failed to Get Route: %q", service.Name, routeName) + return err + } else if route, err = c.reconcileRoute(service, route); err != nil { + logger.Errorf("Failed to reconcile Service: %q failed to reconcile Route: %q", service.Name, routeName) return err } + // Update our Status based on the state of our underlying Route. + service.Status.PropagateRoute(route.Status) + // Update the Status of the Service with the latest generation that // we just reconciled against so we don't keep generating Revisions. // TODO(#642): Remove this. @@ -185,38 +224,50 @@ func (c *Controller) updateStatus(service *v1alpha1.Service) (*v1alpha1.Service, return existing, nil } -func (c *Controller) reconcileConfiguration(config *v1alpha1.Configuration) error { - configClient := c.ElaClientSet.ServingV1alpha1().Configurations(config.Namespace) +func (c *Controller) createConfiguration(service *v1alpha1.Service) (*v1alpha1.Configuration, error) { + configClient := c.ElaClientSet.ServingV1alpha1().Configurations(service.Namespace) + return configClient.Create(MakeServiceConfiguration(service)) +} - existing, err := configClient.Get(config.Name, metav1.GetOptions{}) - if err != nil { - if apierrs.IsNotFound(err) { - _, err := configClient.Create(config) - return err - } - return err +func (c *Controller) reconcileConfiguration(service *v1alpha1.Service, config *v1alpha1.Configuration) (*v1alpha1.Configuration, error) { + logger := loggerWithServiceInfo(c.Logger, service.Namespace, service.Name) + desiredConfig := MakeServiceConfiguration(service) + + // TODO(#642): Remove this (needed to avoid continuous updates) + desiredConfig.Spec.Generation = config.Spec.Generation + + if diff := cmp.Diff(desiredConfig.Spec, config.Spec); diff == "" { + // No differences to reconcile. + return config, nil + } else { + logger.Infof("Reconciling configuration diff (-desired,+observed): %v", diff) } - // TODO(vaikas): Perhaps only update if there are actual changes. - copy := existing.DeepCopy() - copy.Spec = config.Spec - _, err = configClient.Update(copy) - return err + // Preserve the rest of the object (e.g. ObjectMeta) + config.Spec = desiredConfig.Spec + configClient := c.ElaClientSet.ServingV1alpha1().Configurations(service.Namespace) + return configClient.Update(config) } -func (c *Controller) reconcileRoute(route *v1alpha1.Route) error { - routeClient := c.ElaClientSet.ServingV1alpha1().Routes(route.Namespace) +func (c *Controller) createRoute(service *v1alpha1.Service) (*v1alpha1.Route, error) { + routeClient := c.ElaClientSet.ServingV1alpha1().Routes(service.Namespace) + return routeClient.Create(MakeServiceRoute(service)) +} - existing, err := routeClient.Get(route.Name, metav1.GetOptions{}) - if err != nil { - if apierrs.IsNotFound(err) { - _, err := routeClient.Create(route) - return err - } - return err +func (c *Controller) reconcileRoute(service *v1alpha1.Service, route *v1alpha1.Route) (*v1alpha1.Route, error) { + logger := loggerWithServiceInfo(c.Logger, service.Namespace, service.Name) + desiredRoute := MakeServiceRoute(service) + + // TODO(#642): Remove this (needed to avoid continuous updates) + desiredRoute.Spec.Generation = route.Spec.Generation + + if diff := cmp.Diff(desiredRoute.Spec, route.Spec); diff == "" { + // No differences to reconcile. + return route, nil + } else { + logger.Infof("Reconciling route diff (-desired,+observed): %v", diff) } - // TODO(vaikas): Perhaps only update if there are actual changes. - copy := existing.DeepCopy() - copy.Spec = route.Spec - _, err = routeClient.Update(copy) - return err + // Preserve the rest of the object (e.g. ObjectMeta) + route.Spec = desiredRoute.Spec + routeClient := c.ElaClientSet.ServingV1alpha1().Routes(service.Namespace) + return routeClient.Update(route) } diff --git a/pkg/controller/service/service_configuration.go b/pkg/controller/service/service_configuration.go index f84b5c174ce3..1efb18fb62e3 100644 --- a/pkg/controller/service/service_configuration.go +++ b/pkg/controller/service/service_configuration.go @@ -27,7 +27,7 @@ import ( func MakeServiceConfiguration(service *v1alpha1.Service) *v1alpha1.Configuration { c := &v1alpha1.Configuration{ ObjectMeta: metav1.ObjectMeta{ - Name: service.Name, + Name: controller.GetServiceConfigurationName(service), Namespace: service.Namespace, OwnerReferences: []metav1.OwnerReference{ *controller.NewServiceControllerRef(service), diff --git a/pkg/controller/service/service_route.go b/pkg/controller/service/service_route.go index f020f07d8a06..6603bf3c82cf 100644 --- a/pkg/controller/service/service_route.go +++ b/pkg/controller/service/service_route.go @@ -24,10 +24,10 @@ import ( ) // MakeServiceRoute creates a Route from a Service object. -func MakeServiceRoute(service *v1alpha1.Service, configName string) *v1alpha1.Route { +func MakeServiceRoute(service *v1alpha1.Service) *v1alpha1.Route { c := &v1alpha1.Route{ ObjectMeta: metav1.ObjectMeta{ - Name: service.Name, + Name: controller.GetServiceRouteName(service), Namespace: service.Namespace, OwnerReferences: []metav1.OwnerReference{ *controller.NewServiceControllerRef(service), @@ -42,7 +42,7 @@ func MakeServiceRoute(service *v1alpha1.Service, configName string) *v1alpha1.Ro // If there's RunLatest, use the configName, otherwise pin to a specific Revision // as specified in the Pinned section of the Service spec. if service.Spec.RunLatest != nil { - tt.ConfigurationName = configName + tt.ConfigurationName = controller.GetServiceConfigurationName(service) } else { tt.RevisionName = service.Spec.Pinned.RevisionName } diff --git a/pkg/controller/service/service_route_test.go b/pkg/controller/service/service_route_test.go index e51578641ad0..2bcdbc680977 100644 --- a/pkg/controller/service/service_route_test.go +++ b/pkg/controller/service/service_route_test.go @@ -17,15 +17,13 @@ import ( "testing" "github.com/knative/serving/pkg/apis/serving" -) - -const ( - testConfigName string = "test-configuration" + "github.com/knative/serving/pkg/controller" ) func TestRouteRunLatest(t *testing.T) { s := createServiceWithRunLatest() - r := MakeServiceRoute(s, testConfigName) + testConfigName := controller.GetServiceConfigurationName(s) + r := MakeServiceRoute(s) if got, want := r.Name, testServiceName; got != want { t.Errorf("expected %q for service name got %q", want, got) } @@ -60,7 +58,7 @@ func TestRouteRunLatest(t *testing.T) { func TestRoutePinned(t *testing.T) { s := createServiceWithPinned() - r := MakeServiceRoute(s, testConfigName) + r := MakeServiceRoute(s) if got, want := r.Name, testServiceName; got != want { t.Errorf("expected %q for service name got %q", want, got) }