diff --git a/go.mod b/go.mod index fa3dd960c03..45aeeedd9cd 100644 --- a/go.mod +++ b/go.mod @@ -212,7 +212,7 @@ require ( github.com/opencontainers/image-spec v1.1.0-rc3 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/prometheus/procfs v0.9.0 // indirect github.com/prometheus/prom2json v1.3.2 // indirect github.com/prometheus/statsd_exporter v0.23.0 // indirect @@ -226,7 +226,7 @@ require ( github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/stoewer/go-strcase v1.3.0 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.2 // indirect github.com/subosito/gotenv v1.4.2 // indirect github.com/vbatts/tar-split v0.11.2 // indirect github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect diff --git a/pilot/pkg/bootstrap/configcontroller.go b/pilot/pkg/bootstrap/configcontroller.go index 6020f86ffd0..9a7297aa29f 100644 --- a/pilot/pkg/bootstrap/configcontroller.go +++ b/pilot/pkg/bootstrap/configcontroller.go @@ -124,8 +124,6 @@ func (s *Server) initConfigController(args *PilotArgs) error { // Create the config store. s.environment.ConfigStore = aggregateConfigController - s.startIOR(args) - // Defer starting the controller until after the service is created. s.addStartFunc(func(stop <-chan struct{}) error { go s.configController.Run(stop) @@ -135,32 +133,6 @@ func (s *Server) initConfigController(args *PilotArgs) error { return nil } -// startIOR tries to start IOR, if it's enabled. If it encounters any failure, it logs an error and continue -func (s *Server) startIOR(args *PilotArgs) { - if !features.EnableIOR { - return - } - - routerClient, err := ior.NewRouterClient() - if err != nil { - ior.IORLog.Errorf("error creating an openshift router client: %v", err) - return - } - - iorKubeClient := ior.NewKubeClient(s.kubeClient) - - s.addStartFunc(func(stop <-chan struct{}) error { - go leaderelection. - NewLeaderElection(args.Namespace, args.PodName, leaderelection.IORController, args.Revision, s.kubeClient). - AddRunFunction(func(stop <-chan struct{}) { - if err := ior.Register(iorKubeClient, routerClient, s.configController, args.Namespace, s.kubeClient.GetMemberRollController(), stop, nil); err != nil { - ior.IORLog.Error(err) - } - }).Run(stop) - return nil - }) -} - func (s *Server) initK8SConfigStore(args *PilotArgs) error { if s.kubeClient == nil { return nil @@ -233,6 +205,16 @@ func (s *Server) initK8SConfigStore(args *PilotArgs) error { return err } } + if features.EnableIOR { + s.addTerminatingStartFunc(func(stop <-chan struct{}) error { + leaderelection. + NewLeaderElection(args.Namespace, args.PodName, leaderelection.IORController, args.Revision, s.kubeClient). + AddRunFunction(func(leaderStop <-chan struct{}) { + ior.Run(s.kubeClient, configController, leaderStop) + }).Run(stop) + return nil + }) + } s.RWConfigStore, err = configaggregate.MakeWriteableCache(s.ConfigStores, configController) if err != nil { return err diff --git a/pilot/pkg/config/kube/ior/controller.go b/pilot/pkg/config/kube/ior/controller.go new file mode 100644 index 00000000000..3ce828e4ec5 --- /dev/null +++ b/pilot/pkg/config/kube/ior/controller.go @@ -0,0 +1,477 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ior + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/hashicorp/go-multierror" + v1 "github.com/openshift/api/route/v1" + routeclient "github.com/openshift/client-go/route/clientset/versioned" + routeListerV1 "github.com/openshift/client-go/route/listers/route/v1" + "github.com/pkg/errors" + "golang.org/x/net/context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + listerv1 "k8s.io/client-go/listers/core/v1" + + networking "istio.io/api/networking/v1alpha3" + "istio.io/istio/pilot/pkg/model" + "istio.io/istio/pkg/config" + "istio.io/istio/pkg/config/schema/collections" + "istio.io/pkg/log" +) + +const ( + maistraPrefix = "maistra.io/" + generatedByLabel = maistraPrefix + "generated-by" + generatedByValue = "ior" + originalHostAnnotation = maistraPrefix + "original-host" + gatewayNameLabel = maistraPrefix + "gateway-name" + gatewayNamespaceLabel = maistraPrefix + "gateway-namespace" + gatewayResourceVersionLabel = maistraPrefix + "gateway-resourceVersion" + ShouldManageRouteAnnotation = maistraPrefix + "manageRoute" +) + +// routeController manages the integration between Istio Gateways and OpenShift Routes +type routeController struct { + store model.ConfigStoreController + + podLister listerv1.PodLister + serviceLister listerv1.ServiceLister + + routeClient routeclient.Interface + routeLister routeListerV1.RouteLister +} + +// newRouteController returns a new instance of Route object +func newRouteController(kubeClient KubeClient, store model.ConfigStoreController) *routeController { + for !kubeClient.IsRouteSupported() { + IORLog.Infof("routes are not supported in this cluster; waiting for Route resource to become available...") + time.Sleep(10 * time.Second) + } + + r := &routeController{ + store: store, + podLister: kubeClient.GetActualClient().KubeInformer().Core().V1().Pods().Lister(), + serviceLister: kubeClient.GetActualClient().KubeInformer().Core().V1().Services().Lister(), + + routeClient: kubeClient.GetActualClient().Route(), + routeLister: kubeClient.GetActualClient().RouteInformer().Route().V1().Routes().Lister(), + } + + return r +} + +func isManagedByIOR(cfg config.Config) (bool, error) { + // We don't manage egress gateways, but we can only look for the default label here. + // Users can still use generic labels (e.g. "app: my-ingressgateway" as in the istio docs) to refer to the gateway pod + gw := cfg.Spec.(*networking.Gateway) + if istioLabel, ok := gw.Selector["istio"]; ok && istioLabel == "egressgateway" { + return false, nil + } + + manageRouteValue, ok := cfg.Annotations[ShouldManageRouteAnnotation] + if !ok { + // Manage routes by default, when annotation is not found. + return true, nil + } + + manageRoute, err := strconv.ParseBool(manageRouteValue) + if err != nil { + return false, fmt.Errorf("could not parse annotation %q: %s", ShouldManageRouteAnnotation, err) + } + + return manageRoute, nil +} + +func getHost(route v1.Route) string { + if host := route.ObjectMeta.Annotations[originalHostAnnotation]; host != "" { + return host + } + return route.Spec.Host +} + +func (r *routeController) deleteRoute(route *v1.Route) error { + var immediate int64 + host := getHost(*route) + err := r.routeClient.RouteV1().Routes(route.Namespace).Delete(context.TODO(), route.ObjectMeta.Name, metav1.DeleteOptions{GracePeriodSeconds: &immediate}) + if err != nil { + return errors.Wrapf(err, "error deleting route %s/%s for the host %s", + route.ObjectMeta.Name, + route.ObjectMeta.Namespace, + host) + } + + IORLog.Infof("route %s/%s deleted for the host %s", route.ObjectMeta.Name, route.ObjectMeta.Namespace, host) + return nil +} + +func buildRoute(metadata config.Meta, originalHost string, tls *networking.ServerTLSSettings, serviceNamespace string, serviceName string) *v1.Route { + actualHost, wildcard := getActualHost(originalHost, true) + + var tlsConfig *v1.TLSConfig + targetPort := "http2" + if tls != nil { + tlsConfig = &v1.TLSConfig{Termination: v1.TLSTerminationPassthrough} + targetPort = "https" + if tls.HttpsRedirect { + tlsConfig.InsecureEdgeTerminationPolicy = v1.InsecureEdgeTerminationPolicyRedirect + } + } + + // Copy annotationMap + annotationMap := map[string]string{ + originalHostAnnotation: originalHost, + } + for keyName, keyValue := range metadata.Annotations { + if !strings.HasPrefix(keyName, "kubectl.kubernetes.io") && keyName != ShouldManageRouteAnnotation { + annotationMap[keyName] = keyValue + } + } + + // Copy labelMap + labelMap := getDefaultRouteLabelMap(metadata.Name, metadata.Namespace) + labelMap[gatewayResourceVersionLabel] = metadata.ResourceVersion + + for keyName, keyValue := range metadata.Labels { + if !strings.HasPrefix(keyName, maistraPrefix) { + labelMap[keyName] = keyValue + } + } + + return &v1.Route{ + ObjectMeta: metav1.ObjectMeta{ + Name: getRouteName(metadata.Namespace, metadata.Name, originalHost), + Namespace: serviceNamespace, + Labels: labelMap, + Annotations: annotationMap, + }, + Spec: v1.RouteSpec{ + Host: actualHost, + Port: &v1.RoutePort{ + TargetPort: intstr.IntOrString{ + Type: intstr.String, + StrVal: targetPort, + }, + }, + To: v1.RouteTargetReference{ + Name: serviceName, + }, + TLS: tlsConfig, + WildcardPolicy: wildcard, + }, + } +} + +func (r *routeController) createRoute( + metadata config.Meta, + originalHost string, + tls *networking.ServerTLSSettings, + serviceNamespace, serviceName string, +) (*v1.Route, error) { + IORLog.Debugf("creating route for hostname %s", originalHost) + + nr, err := r. + routeClient. + RouteV1(). + Routes(serviceNamespace). + Create(context.TODO(), buildRoute(metadata, originalHost, tls, serviceNamespace, serviceName), metav1.CreateOptions{}) + if err != nil { + return nil, errors.Wrapf(err, "error creating a route for the host %s from gateway: %s/%s", + originalHost, + metadata.Namespace, + metadata.Name) + } + + IORLog.Infof("route %s/%s created for hostname %s from gateway %s/%s", + nr.ObjectMeta.Namespace, nr.ObjectMeta.Name, + nr.Spec.Host, + metadata.Namespace, metadata.Name) + + return nr, nil +} + +func (r *routeController) updateRoute( + metadata config.Meta, + originalHost string, + tls *networking.ServerTLSSettings, + serviceNamespace string, serviceName string, + route *v1.Route, +) (*v1.Route, error) { + IORLog.Debugf("updating route for hostname %s", originalHost) + + curr := buildRoute(metadata, originalHost, tls, serviceNamespace, serviceName) + + curr.ResourceVersion = route.ResourceVersion + + nr, err := r. + routeClient. + RouteV1(). + Routes(serviceNamespace). + Update(context.TODO(), curr, metav1.UpdateOptions{}) + if err != nil { + return nil, errors.Wrapf(err, "error updating a route for the host %s from gateway: %s/%s", + originalHost, + metadata.Namespace, + metadata.Name) + } + + IORLog.Infof("route %s/%s updated for hostname %s from gateway %s/%s", + nr.ObjectMeta.Namespace, nr.ObjectMeta.Name, + nr.Spec.Host, + metadata.Namespace, metadata.Name) + + return nr, nil +} + +func (r *routeController) findRoutes(metadata config.Meta) ([]*v1.Route, error) { + return r.routeLister.List( + labels.SelectorFromSet( + getDefaultRouteLabelMap(metadata.Name, metadata.Namespace), + ), + ) +} + +// findService tries to find a service that matches with the given gateway selector +// Returns the namespace and service name that is a match, or an error +func (r *routeController) findService(gateway *networking.Gateway) (*model.NamespacedName, error) { + gwSelector := labels.SelectorFromSet(gateway.Selector) + + // Get the list of pods that match the gateway selector + pods, err := r.podLister.List(gwSelector) + if err != nil { + return nil, errors.Wrapf(err, "could not get the list of pods with labels %s", gwSelector.String()) + } + + IORLog.Debugf("found %d pod(s) with %s gateway selector", len(pods), gwSelector) + + // Get the list of services in this namespace + + for _, pod := range pods { + services, err := r.serviceLister.Services(pod.Namespace).List(labels.Everything()) + if err != nil { + return nil, errors.Wrapf(err, "could not get all the services in namespace %s", pod.Namespace) + } + IORLog.Debugf("found %d service(s) under %s namespace", len(services), pod.Namespace) + podLabels := labels.Set(pod.ObjectMeta.Labels) + // Look for a service whose selector matches the pod labels + for _, service := range services { + svcSelector := labels.SelectorFromSet(service.Spec.Selector) + + IORLog.Debugf("matching service selector %s against %s", svcSelector.String(), podLabels) + if svcSelector.Matches(podLabels) { + return &model.NamespacedName{Namespace: pod.Namespace, Name: service.Name}, nil + } + + } + } + + return nil, fmt.Errorf("could not find a service that matches the gateway selector '%s'", gwSelector.String()) +} + +func getRouteName(namespace, name, host string) string { + return fmt.Sprintf("%s-%s-%s", namespace, name, hostHash(host)) +} + +// getActualHost returns the actual hostname to be used in the route +// `emitWarning` should be false when this function is used internally, without user interaction +// It also returns the route's WildcardPolicy based on the hostname +func getActualHost(originalHost string, emitWarning bool) (string, v1.WildcardPolicyType) { + wildcard := v1.WildcardPolicyNone + + if strings.Contains(originalHost, "/") { + originalHost = strings.SplitN(originalHost, "/", 2)[1] + IORLog.Debugf("Hostname contains a namespace part. Ignoring it and considering the %q portion.", originalHost) + } + + actualHost := originalHost + + if originalHost == "*" { + actualHost = "" + if emitWarning { + IORLog.Warn("Hostname * is not supported at the moment. Letting OpenShift create it instead.") + } + } else if strings.HasPrefix(originalHost, "*.") { + // FIXME: Update link below to version 4.5 when it's out + // Wildcards are not enabled by default in OCP 3.x. + // See https://docs.openshift.com/container-platform/3.11/install_config/router/default_haproxy_router.html#using-wildcard-routes + // FIXME(2): Is there a way to check if OCP supports wildcard and print out a warning if not? + wildcard = v1.WildcardPolicySubdomain + actualHost = "wildcard." + strings.TrimPrefix(originalHost, "*.") + } + + return actualHost, wildcard +} + +// hostHash applies a sha256 on the host and truncate it to the first 8 bytes +// This gives enough uniqueness for a given hostname +func hostHash(name string) string { + if name == "" { + name = "star" + } + + hash := sha256.Sum256([]byte(name)) + return hex.EncodeToString(hash[:8]) +} + +func (r *routeController) reconcileGateway(config *config.Config, routes []*v1.Route) error { + gateway, ok := config.Spec.(*networking.Gateway) + + if !ok { + return fmt.Errorf("could not decode spec as Gateway from %v", config) + } + + var err error + var namespacedName *model.NamespacedName + + namespacedName, err = r.findService(gateway) + + if err != nil { + return errors.Wrapf(err, "gateway %s/%s does not specify a valid service", config.Namespace, config.Name) + } + + serviceNamespace := namespacedName.Namespace + serviceName := namespacedName.Name + + routeMap := make(map[string]*v1.Route) + + for _, v := range routes { + routeMap[v.Name] = v + } + + var result *multierror.Error + + for _, server := range gateway.Servers { + for _, host := range server.Hosts { + var err error + + name := getRouteName(config.Namespace, config.Name, host) + + route, found := routeMap[name] + + if found { + _, err = r.updateRoute(config.Meta, host, server.Tls, serviceNamespace, serviceName, route) + + // We always want to remove the route to avoid getting deleted. + delete(routeMap, name) + } else { + _, err = r.createRoute(config.Meta, host, server.Tls, serviceNamespace, serviceName) + } + + if err != nil { + result = multierror.Append(result, err) + } + } + } + + for k, v := range routeMap { + IORLog.Debugf("clean up route %s for host %s", k, getHost(*v)) + if err := r.deleteRoute(v); err != nil { + result = multierror.Append(result, err) + } + } + + return result.ErrorOrNil() +} + +func (r *routeController) processEvent(old, curr *config.Config, event model.Event) error { + if IORLog.GetOutputLevel() >= log.DebugLevel { + debugMessage := fmt.Sprintf("event %v arrived:", event) + if event == model.EventUpdate { + debugMessage += fmt.Sprintf("\told object: %v", old) + } + debugMessage += fmt.Sprintf("\tnew object: %v", curr) + + IORLog.Debug(debugMessage) + } + + isManaged, err := isManagedByIOR(*curr) + if err != nil { + return err + } + + if !isManaged { + IORLog.Debugf("skipped processing routes for gateway %s/%s, as it is annotated by user", curr.Name, curr.Namespace) + return nil + } + + config := r.store.Get(collections.Gateway.GroupVersionKind(), curr.Name, curr.Namespace) + + var routes []*v1.Route + + routes, err = r.findRoutes(curr.Meta) + + if err != nil { + return errors.Wrapf(err, "error finding routes matching gateway %s/%s", curr.Name, curr.Namespace) + } + + if config != nil { + return r.reconcileGateway(curr, routes) + } + + var result *multierror.Error + + for _, route := range routes { + if err := r.deleteRoute(route); err != nil { + result = multierror.Append(result, err) + } + } + + return result.ErrorOrNil() +} + +func (r *routeController) Run(stop <-chan struct{}) { + var aliveLock sync.Mutex + alive := true + + IORLog.Debugf("Registering IOR into SMMR broadcast") + + go func(stop <-chan struct{}) { + <-stop + aliveLock.Lock() + defer aliveLock.Unlock() + alive = false + IORLog.Info("This pod is no longer a leader. IOR stopped responding") + }(stop) + + IORLog.Debugf("Registering IOR into Gateway broadcast") + kind := collections.Gateway.GroupVersionKind() + r.store.RegisterEventHandler(kind, func(old, curr config.Config, evt model.Event) { + aliveLock.Lock() + defer aliveLock.Unlock() + if alive { + err := r.processEvent(&old, &curr, evt) + if err != nil { + IORLog.Errorf("failed to process gateway %s/%s event %s: %s", curr.Name, curr.Namespace, evt.String(), err) + } + } + }) +} + +func getDefaultRouteLabelMap(name, namespace string) map[string]string { + return map[string]string{ + generatedByLabel: generatedByValue, + gatewayNamespaceLabel: namespace, + gatewayNameLabel: name, + } +} diff --git a/pilot/pkg/config/kube/ior/fake.go b/pilot/pkg/config/kube/ior/fake.go index 8c0f7990ad2..35227e32218 100644 --- a/pilot/pkg/config/kube/ior/fake.go +++ b/pilot/pkg/config/kube/ior/fake.go @@ -15,35 +15,11 @@ package ior import ( - "fmt" - "strings" - "sync" "time" - v1 "github.com/openshift/api/route/v1" - routev1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" - "golang.org/x/net/context" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/rest" - "istio.io/istio/pkg/kube" - "istio.io/istio/pkg/servicemesh/controller" ) -// FakeRouter implements routev1.RouteInterface -type FakeRouter struct { - routes map[string]*v1.Route - routesLock sync.Mutex -} - -// FakeRouterClient implements routev1.RouteV1Interface -type FakeRouterClient struct { - routesByNamespace map[string]routev1.RouteInterface - routesByNamespaceLock sync.Mutex -} - type fakeKubeClient struct { client kube.Client } @@ -64,191 +40,3 @@ func (c *fakeKubeClient) GetActualClient() kube.Client { func (c *fakeKubeClient) GetHandleEventTimeout() time.Duration { return time.Millisecond } - -// NewFakeRouterClient creates a new FakeRouterClient -func NewFakeRouterClient() routev1.RouteV1Interface { - return &FakeRouterClient{ - routesByNamespace: make(map[string]routev1.RouteInterface), - } -} - -// NewFakeRouter creates a new FakeRouter -func NewFakeRouter() routev1.RouteInterface { - return &FakeRouter{ - routes: make(map[string]*v1.Route), - } -} - -// RESTClient implements routev1.RouteV1Interface -func (rc *FakeRouterClient) RESTClient() rest.Interface { - panic("not implemented") -} - -// Routes implements routev1.RouteV1Interface -func (rc *FakeRouterClient) Routes(namespace string) routev1.RouteInterface { - rc.routesByNamespaceLock.Lock() - defer rc.routesByNamespaceLock.Unlock() - - if _, ok := rc.routesByNamespace[namespace]; !ok { - rc.routesByNamespace[namespace] = NewFakeRouter() - } - - countCallsIncrement("routes") - return rc.routesByNamespace[namespace] -} - -var generatedHostNumber int - -// Create implements routev1.RouteInterface -func (fk *FakeRouter) Create(ctx context.Context, route *v1.Route, opts metav1.CreateOptions) (*v1.Route, error) { - fk.routesLock.Lock() - defer fk.routesLock.Unlock() - - if strings.Contains(route.Spec.Host, "/") { - return nil, fmt.Errorf("invalid hostname") - } - - if route.Spec.Host == "" { - generatedHostNumber++ - route.Spec.Host = fmt.Sprintf("generated-host%d.com", generatedHostNumber) - } - - fk.routes[route.Name] = route - - countCallsIncrement("create") - return route, nil -} - -// Update implements routev1.RouteInterface -func (fk *FakeRouter) Update(ctx context.Context, route *v1.Route, opts metav1.UpdateOptions) (*v1.Route, error) { - panic("not implemented") -} - -// UpdateStatus implements routev1.RouteInterface -func (fk *FakeRouter) UpdateStatus(ctx context.Context, route *v1.Route, opts metav1.UpdateOptions) (*v1.Route, error) { - panic("not implemented") -} - -// Delete implements routev1.RouteInterface -func (fk *FakeRouter) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { - fk.routesLock.Lock() - defer fk.routesLock.Unlock() - - if _, ok := fk.routes[name]; !ok { - return fmt.Errorf("route %s not found", name) - } - - delete(fk.routes, name) - - countCallsIncrement("delete") - return nil -} - -// DeleteCollection implements routev1.RouteInterface -func (fk *FakeRouter) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error { - panic("not implemented") -} - -// Get implements routev1.RouteInterface -func (fk *FakeRouter) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Route, error) { - panic("not implemented") -} - -// List implements routev1.RouteInterface -func (fk *FakeRouter) List(ctx context.Context, opts metav1.ListOptions) (*v1.RouteList, error) { - fk.routesLock.Lock() - defer fk.routesLock.Unlock() - - var items []v1.Route - for _, route := range fk.routes { - items = append(items, *route) - } - result := &v1.RouteList{Items: items} - - countCallsIncrement("list") - return result, nil -} - -// Watch Create implements routev1.RouteInterface -func (fk *FakeRouter) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - panic("not implemented") -} - -// Patch implements routev1.RouteInterface -func (fk *FakeRouter) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, - subresources ...string, -) (result *v1.Route, err error) { - panic("not implemented") -} - -// fakeMemberRollController implements controller.MemberRollController -type fakeMemberRollController struct { - listeners []controller.MemberRollListener - namespaces []string - lock sync.Mutex -} - -func newFakeMemberRollController() *fakeMemberRollController { - return &fakeMemberRollController{} -} - -// Register implements controller.MemberRollController -func (fk *fakeMemberRollController) Register(listener controller.MemberRollListener, name string) { - fk.lock.Lock() - defer fk.lock.Unlock() - - if listener == nil { - return - } - - // ensure that listener has no namespaces until the smmrc initializes it with the actual list of namespaces in the member roll - listener.SetNamespaces(nil) - - fk.listeners = append(fk.listeners, listener) -} - -// Start implements controller.MemberRollController -func (fk *fakeMemberRollController) Start(stopCh <-chan struct{}) { - panic("not implemented") -} - -func (fk *fakeMemberRollController) setNamespaces(namespaces ...string) { - fk.namespaces = namespaces - fk.invokeListeners() -} - -func (fk *fakeMemberRollController) invokeListeners() { - fk.lock.Lock() - defer fk.lock.Unlock() - - for _, l := range fk.listeners { - l.SetNamespaces(fk.namespaces) - } -} - -var ( - countCalls = map[string]int{} - countCallsLock sync.Mutex -) - -func countCallsReset() { - countCallsLock.Lock() - defer countCallsLock.Unlock() - countCalls = map[string]int{} -} - -func countCallsGet(k string) int { - countCallsLock.Lock() - defer countCallsLock.Unlock() - v, ok := countCalls[k] - if !ok { - v = 0 - } - return v -} - -func countCallsIncrement(k string) { - countCallsLock.Lock() - defer countCallsLock.Unlock() - countCalls[k]++ -} diff --git a/pilot/pkg/config/kube/ior/ior.go b/pilot/pkg/config/kube/ior/ior.go index 06332c511d0..65dda7cb341 100644 --- a/pilot/pkg/config/kube/ior/ior.go +++ b/pilot/pkg/config/kube/ior/ior.go @@ -15,83 +15,16 @@ package ior import ( - "fmt" - "sync" - - routev1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" - - networking "istio.io/api/networking/v1alpha3" "istio.io/istio/pilot/pkg/model" - "istio.io/istio/pkg/config" - "istio.io/istio/pkg/config/schema/collections" - "istio.io/istio/pkg/servicemesh/controller" + "istio.io/istio/pkg/kube" "istio.io/pkg/log" ) // IORLog is IOR-scoped log var IORLog = log.RegisterScope("ior", "IOR logging", 0) -// Register configures IOR component to respond to Gateway creations and removals -func Register( - k8sClient KubeClient, - routerClient routev1.RouteV1Interface, - store model.ConfigStoreController, - pilotNamespace string, - mrc controller.MemberRollController, - stop <-chan struct{}, - errorChannel chan error, -) error { - IORLog.Info("Registering IOR component") - - r := newRoute(k8sClient, routerClient, store, pilotNamespace, mrc, stop) - r.errorChannel = errorChannel - - alive := true - var aliveLock sync.Mutex - go func(stop <-chan struct{}) { - // Stop responding to events when we are no longer a leader. - // Two notes here: - // (1) There's no such method "UnregisterEventHandler()" - // (2) It might take a few seconds to this channel to be closed. So, both pods might be leader for a few seconds. - <-stop - IORLog.Info("This pod is no longer a leader. IOR stopped responding") - aliveLock.Lock() - alive = false - aliveLock.Unlock() - }(stop) - - IORLog.Debugf("Registering IOR into Istio's Gateway broadcast") - kind := collections.Gateway.GroupVersionKind() - store.RegisterEventHandler(kind, func(old, curr config.Config, event model.Event) { - aliveLock.Lock() - defer aliveLock.Unlock() - if !alive { - return - } - - // encapsulate in goroutine to not slow down processing because of waiting for mutex - go func() { - _, ok := curr.Spec.(*networking.Gateway) - if !ok { - IORLog.Errorf("could not decode object as Gateway. Object = %v", curr) - return - } - - debugMessage := fmt.Sprintf("Event %v arrived:", event) - if event == model.EventUpdate { - debugMessage += fmt.Sprintf("\tOld object: %v", old) - } - debugMessage += fmt.Sprintf("\tNew object: %v", curr) - IORLog.Debug(debugMessage) - - if err := r.handleEvent(event, curr); err != nil { - IORLog.Error(err) - if r.errorChannel != nil { - r.errorChannel <- err - } - } - }() - }) - - return nil +func Run(kubeClient kube.Client, store model.ConfigStoreController, stop <-chan struct{}) { + IORLog.Info("setting up IOR") + r := newRouteController(NewKubeClient(kubeClient), store) + r.Run(stop) } diff --git a/pilot/pkg/config/kube/ior/ior_test.go b/pilot/pkg/config/kube/ior/ior_test.go index 4bb303ddff1..9351b0f30a1 100644 --- a/pilot/pkg/config/kube/ior/ior_test.go +++ b/pilot/pkg/config/kube/ior/ior_test.go @@ -22,56 +22,70 @@ import ( "time" routeapiv1 "github.com/openshift/api/route/v1" - routev1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" - "github.com/stretchr/testify/assert" + routeclient "github.com/openshift/client-go/route/clientset/versioned" k8sioapicorev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" networking "istio.io/api/networking/v1alpha3" + istioclient "istio.io/client-go/pkg/clientset/versioned" "istio.io/istio/pilot/pkg/config/kube/crdclient" "istio.io/istio/pilot/pkg/model" "istio.io/istio/pkg/config" "istio.io/istio/pkg/config/schema/collections" "istio.io/istio/pkg/kube" - memberroll "istio.io/istio/pkg/servicemesh/controller" "istio.io/istio/pkg/test/util/retry" "istio.io/pkg/log" ) const prefixedLabel = maistraPrefix + "fake" -func initClients(t *testing.T, - stop <-chan struct{}, - errorChannel chan error, - mrc memberroll.MemberRollController, - register bool, -) (model.ConfigStoreController, KubeClient, routev1.RouteV1Interface) { +func newClients( + t *testing.T, + k8sClient kube.Client, +) ( + *crdclient.Client, + KubeClient, + routeclient.Interface, + *routeController, +) { t.Helper() - k8sClient := kube.NewFakeClient() + if k8sClient == nil { + k8sClient = kube.NewFakeClient() + } + iorKubeClient := NewFakeKubeClient(k8sClient) - routerClient := NewFakeRouterClient() store, err := crdclient.New(k8sClient, crdclient.Option{}) if err != nil { t.Fatal(err) } + r := newRouteController(iorKubeClient, store) + + return store, iorKubeClient, r.routeClient, r +} + +func runClients( + store model.ConfigStoreController, + kubeClient KubeClient, + stop <-chan struct{}, +) { go store.Run(stop) - k8sClient.RunAndWait(stop) - cache.WaitForCacheSync(stop, store.HasSynced) - retry.UntilSuccessOrFail(t, func() error { - if !store.HasSynced() { - return fmt.Errorf("store has not synced yet") - } - return nil - }, retry.Timeout(time.Second)) + kubeClient.GetActualClient().RunAndWait(stop) +} - if register { - if err := Register(iorKubeClient, routerClient, store, "istio-system", mrc, stop, errorChannel); err != nil { - t.Fatal(err) - } - } +func initClients( + t *testing.T, + stop <-chan struct{}, +) ( + model.ConfigStoreController, + KubeClient, + routeclient.Interface, +) { + store, iorKubeClient, routerClient, r := newClients(t, nil) + + r.Run(stop) + runClients(store, iorKubeClient, stop) return store, iorKubeClient, routerClient } @@ -147,16 +161,6 @@ func TestCreate(t *testing.T) { true, nil, }, - { - "Non-existing namespace", - "non-existing", - []string{"fail.com"}, - map[string]string{"istio": "ingressgateway"}, - 0, - "could not handle the ADD event for non-existing", - false, - nil, - }, { "Gateway not managed", "istio-system", @@ -241,74 +245,29 @@ func TestCreate(t *testing.T) { IORLog.SetOutputLevel(log.DebugLevel) - var stop chan struct{} - var errorChannel chan error - var store model.ConfigStoreController - var k8sClient KubeClient - var routerClient routev1.RouteV1Interface - var mrc *fakeMemberRollController controlPlaneNs := "istio-system" + stop := make(chan struct{}) + defer func() { close(stop) }() + store, k8sClient, routerClient := initClients(t, stop) - for _, testType := range []string{"initialSync", "events"} { - if testType == "events" { - stop = make(chan struct{}) - defer func() { close(stop) }() - errorChannel = make(chan error) - mrc = newFakeMemberRollController() - store, k8sClient, routerClient = initClients(t, stop, errorChannel, mrc, true) - mrc.setNamespaces(controlPlaneNs) + createIngressGateway(t, k8sClient.GetActualClient(), controlPlaneNs, map[string]string{"istio": "ingressgateway"}) - createIngressGateway(t, k8sClient.GetActualClient(), controlPlaneNs, map[string]string{"istio": "ingressgateway"}) - } + for i, c := range cases { + t.Run(c.testName, func(t *testing.T) { + gatewayName := fmt.Sprintf("gw%d", i) + createGateway(t, store, c.ns, gatewayName, c.hosts, c.gwSelector, c.tls, c.annotations) - for i, c := range cases { - t.Run(testType+"-"+c.testName, func(t *testing.T) { - if testType == "initialSync" { - stop = make(chan struct{}) - defer func() { close(stop) }() - errorChannel = make(chan error) - mrc = newFakeMemberRollController() - store, k8sClient, routerClient = initClients(t, stop, errorChannel, mrc, false) - createIngressGateway(t, k8sClient.GetActualClient(), controlPlaneNs, map[string]string{"istio": "ingressgateway"}) - if err := Register(k8sClient, routerClient, store, controlPlaneNs, mrc, stop, errorChannel); err != nil { - t.Fatal(err) - } - } - gatewayName := fmt.Sprintf("gw%d", i) - createGateway(t, store, c.ns, gatewayName, c.hosts, c.gwSelector, c.tls, c.annotations) - if testType == "initialSync" { - mrc.setNamespaces(controlPlaneNs) - } - list, _ := getRoutes(t, routerClient, controlPlaneNs, c.expectedRoutes, time.Second) - if err := getError(errorChannel); err != nil { - if c.expectedError == "" { - t.Fatal(err) - } - - if !strings.Contains(err.Error(), c.expectedError) { - t.Fatalf("expected error message containing `%s', got: %s", c.expectedError, err.Error()) - } - - // Error is expected and matches the golden string, nothing to do - } else { - if c.expectedError != "" { - t.Fatalf("expected error message containing `%s', got success", c.expectedError) - } - - // Only continue the validation if any route is expected to be created - if c.expectedRoutes > 0 { - validateRoutes(t, c.hosts, list, gatewayName, c.tls) - - // Remove the gateway and expect all routes get removed - deleteGateway(t, store, c.ns, gatewayName) - _, _ = getRoutes(t, routerClient, c.ns, 0, time.Second) - if err := getError(errorChannel); err != nil { - t.Fatal(err) - } - } - } - }) - } + list := getRoutes(t, routerClient, controlPlaneNs, c.expectedRoutes, time.Second) + + // Only continue the validation if any route is expected to be created + if c.expectedRoutes > 0 { + validateRoutes(t, c.hosts, list, gatewayName, c.tls) + + // Remove the gateway and expect all routes get removed + deleteGateway(t, k8sClient.GetActualClient().Istio(), c.ns, gatewayName) + _ = getRoutes(t, routerClient, c.ns, 0, time.Second) + } + }) } } @@ -413,189 +372,94 @@ func TestEdit(t *testing.T) { }, } + IORLog.SetOutputLevel(log.DebugLevel) + stop := make(chan struct{}) defer func() { close(stop) }() - errorChannel := make(chan error) - mrc := newFakeMemberRollController() - store, k8sClient, routerClient := initClients(t, stop, errorChannel, mrc, true) + store, k8sClient, routerClient := initClients(t, stop) controlPlane := "istio-system" createIngressGateway(t, k8sClient.GetActualClient(), controlPlane, map[string]string{"istio": "ingressgateway"}) createGateway(t, store, controlPlane, "gw", []string{"abc.com"}, map[string]string{"istio": "ingressgateway"}, false, nil) - mrc.setNamespaces("istio-system") - list, _ := getRoutes(t, routerClient, controlPlane, 1, time.Second) - if err := getError(errorChannel); err != nil { - t.Fatal(err) - } + list := getRoutes(t, routerClient, controlPlane, 1, time.Second) for i, c := range cases { t.Run(c.testName, func(t *testing.T) { editGateway(t, store, c.ns, "gw", c.hosts, c.gwSelector, c.tls, fmt.Sprintf("%d", i+2)) - list, _ = getRoutes(t, routerClient, controlPlane, c.expectedRoutes, time.Second) - if err := getError(errorChannel); err != nil { - t.Fatal(err) - } + list = getRoutes(t, routerClient, controlPlane, c.expectedRoutes, time.Second) validateRoutes(t, c.hosts, list, "gw", c.tls) }) } } -// TestPerf makes sure we are not doing more API calls than necessary -func TestPerf(t *testing.T) { - IORLog.SetOutputLevel(log.DebugLevel) - countCallsReset() - - stop := make(chan struct{}) - defer func() { close(stop) }() - errorChannel := make(chan error) - mrc := newFakeMemberRollController() - store, k8sClient, routerClient := initClients(t, stop, errorChannel, mrc, true) - - // Create a bunch of namespaces and gateways, and make sure they don't take too long to be created - createIngressGateway(t, k8sClient.GetActualClient(), "istio-system", map[string]string{"istio": "ingressgateway"}) - qty := 100 - qtyNamespaces := qty + 1 - createGateways(t, store, 1, qty) - mrc.setNamespaces(generateNamespaces(qty)...) - - // It takes ~ 2s on my laptop, it's slower on prow - _, ignore := getRoutes(t, routerClient, "istio-system", qty, time.Minute) - if err := getError(errorChannel); err != nil { - t.Fatal(err) - } - assert.Equal(t, qty, countCallsGet("create"), "wrong number of calls to client.Routes().Create()") - assert.Equal(t, 0, countCallsGet("delete"), "wrong number of calls to client.Routes().Delete()") - assert.Equal(t, qtyNamespaces, countCallsGet("list")-ignore, "wrong number of calls to client.Routes().List()") - // qty=number of Create() calls; qtyNamespaces=number of List() calls - assert.Equal(t, qty+qtyNamespaces, countCallsGet("routes")-ignore, "wrong number of calls to client.Routes()") - - // Now we have a lot of routes created, let's create one more gateway. We don't expect a lot of new API calls - countCallsReset() - createGateway(t, store, "ns1", "gw-ns1-1", []string{"instant.com"}, map[string]string{"istio": "ingressgateway"}, false, nil) - _, ignore = getRoutes(t, routerClient, "istio-system", qty+1, time.Second) - if err := getError(errorChannel); err != nil { - t.Fatal(err) - } - assert.Equal(t, 1, countCallsGet("create"), "wrong number of calls to client.Routes().Create()") - assert.Equal(t, 0, countCallsGet("delete"), "wrong number of calls to client.Routes().Delete()") - assert.Equal(t, 0, countCallsGet("list")-ignore, "wrong number of calls to client.Routes().List()") - assert.Equal(t, 1, countCallsGet("routes")-ignore, "wrong number of calls to client.Routes()") - - // Editing. We don't expect a lot of new API calls - countCallsReset() - editGateway(t, store, "ns1", "gw-ns1-1", []string{"edited.com", "edited-other.com"}, map[string]string{"istio": "ingressgateway"}, false, "2") - _, ignore = getRoutes(t, routerClient, "istio-system", qty+2, time.Second) - if err := getError(errorChannel); err != nil { - t.Fatal(err) - } - assert.Equal(t, 2, countCallsGet("create"), "wrong number of calls to client.Routes().Create()") - assert.Equal(t, 1, countCallsGet("delete"), "wrong number of calls to client.Routes().Delete()") - assert.Equal(t, 0, countCallsGet("list")-ignore, "wrong number of calls to client.Routes().List()") - assert.Equal(t, 3, countCallsGet("routes")-ignore, "wrong number of calls to client.Routes()") - - // Same for deletion. We don't expect a lot of new API calls - countCallsReset() - deleteGateway(t, store, "ns1", "gw-ns1-1") - _, ignore = getRoutes(t, routerClient, "istio-system", qty, time.Second) - if err := getError(errorChannel); err != nil { - t.Fatal(err) - } - assert.Equal(t, 0, countCallsGet("create"), "wrong number of calls to client.Routes().Create()") - assert.Equal(t, 2, countCallsGet("delete"), "wrong number of calls to client.Routes().Delete()") - assert.Equal(t, 0, countCallsGet("list")-ignore, "wrong number of calls to client.Routes().List()") - assert.Equal(t, 2, countCallsGet("routes")-ignore, "wrong number of calls to client.Routes()") -} - // TestConcurrency makes sure IOR can respond to events even when doing its initial sync func TestConcurrency(t *testing.T) { IORLog.SetOutputLevel(log.DebugLevel) stop := make(chan struct{}) defer func() { close(stop) }() - errorChannel := make(chan error) - mrc := newFakeMemberRollController() - store, k8sClient, routerClient := initClients(t, stop, errorChannel, mrc, true) + store, k8sClient, routerClient := initClients(t, stop) + + qty := 10 + runs := 10 // Create a bunch of namespaces and gateways createIngressGateway(t, k8sClient.GetActualClient(), "istio-system", map[string]string{"istio": "ingressgateway"}) - qty := 50 - createGateways(t, store, 1, qty) - mrc.setNamespaces(generateNamespaces(qty)...) // At the same time, while IOR is processing those initial `qty` gateways, create `qty` more - go func() { - mrc.setNamespaces(generateNamespaces(qty * 2)...) - createGateways(t, store, qty+1, qty*2) - }() + for i := 0; i < runs; i++ { + go func(j int) { + createGateways(t, store, (qty*j)+1, (qty*j)+qty) + }(i) + } // And expect all `qty * 2` gateways to be created - _, _ = getRoutes(t, routerClient, "istio-system", (qty * 2), time.Minute) - if err := getError(errorChannel); err != nil { - t.Fatal(err) - } + _ = getRoutes(t, routerClient, "istio-system", (qty * runs), time.Minute) } -func TestDuplicateUpdateEvents(t *testing.T) { - IORLog.SetOutputLevel(log.DebugLevel) - stop := make(chan struct{}) - defer func() { close(stop) }() - errorChannel := make(chan error) - mrc := newFakeMemberRollController() - store, k8sClient, routerClient := initClients(t, stop, errorChannel, mrc, false) - - r := newRoute(k8sClient, routerClient, store, "istio-system", mrc, stop) +func TestStatelessness(t *testing.T) { + type state struct { + name string + ns string + hosts []string + gwSelector map[string]string + expectedRoutes int + tls bool + } - mrc.setNamespaces("istio-system") - createIngressGateway(t, k8sClient.GetActualClient(), "istio-system", map[string]string{"istio": "ingressgateway"}) + watchedNamespace := "istio-system" - cfg := config.Config{ - Meta: config.Meta{ - GroupVersionKind: collections.Gateway.GroupVersionKind(), - Namespace: "istio-system", - Name: "a", - ResourceVersion: "1", - }, - Spec: &networking.Gateway{ - Servers: []*networking.Server{ - { - Hosts: []string{"a.com"}, - }, - }, - }, + initialState := state{ + "gw", + watchedNamespace, + []string{"ghi.org", "jkl.com"}, + map[string]string{"istio": "ingressgateway"}, + 2, + false, } - // Create the first router, should work just fine - err := r.handleEvent(model.EventAdd, cfg) - if err != nil { - t.Fatal(err) - } - func() { - r.gatewaysLock.Lock() - defer r.gatewaysLock.Unlock() - if len(r.gatewaysMap) != 1 { - t.Fatal("error creating the first route") - } - }() + IORLog.SetOutputLevel(log.DebugLevel) - // Simulate an UPDATE event with the same data, should be ignored - err = r.handleEvent(model.EventUpdate, cfg) - if err == nil { - t.Fatalf("expecting the error: %q, but got nothing", eventDuplicatedMessage) - } - if msg := err.Error(); msg != eventDuplicatedMessage { - t.Fatalf("expecting the error: %q, but got %q", eventDuplicatedMessage, msg) - } -} + stop := make(chan struct{}) + defer func() { close(stop) }() + iorStop := make(chan struct{}) + store, kubeClient, routerClient, r := newClients(t, nil) + r.Run(iorStop) + runClients(store, kubeClient, stop) -func generateNamespaces(qty int) []string { - var result []string + createIngressGateway(t, kubeClient.GetActualClient(), watchedNamespace, map[string]string{"istio": "ingressgateway"}) + createGateway(t, store, initialState.ns, initialState.name, initialState.hosts, map[string]string{"istio": "ingressgateway"}, initialState.tls, nil) - for i := 1; i <= qty; i++ { - result = append(result, fmt.Sprintf("ns%d", i)) - } + list := getRoutes(t, routerClient, watchedNamespace, 2, time.Second) + validateRoutes(t, initialState.hosts, list, initialState.name, initialState.tls) + + close(iorStop) - return append(result, "istio-system") + backupIOR := newRouteController(kubeClient, store) + backupIOR.Run(stop) + + store.HasSynced() } func createGateways(t *testing.T, store model.ConfigStoreController, begin, end int) { @@ -611,24 +475,9 @@ func createGateways(t *testing.T, store model.ConfigStoreController, begin, end } } -// getError tries to read an error from the error channel. -// It tries 3 times beforing returning nil, in case of there's no error in the channel, -// this is to give some time to async functions to run and fill the channel properly -func getError(errorChannel chan error) error { - for i := 1; i < 3; i++ { - select { - case err := <-errorChannel: - return err - default: - } - time.Sleep(10 * time.Millisecond) - } - return nil -} - // getRoutes is a helper function that keeps trying getting a list of routes until it gets `size` items. // It returns the list of routes itself and the number of retries it run -func getRoutes(t *testing.T, routerClient routev1.RouteV1Interface, ns string, size int, timeout time.Duration) (*routeapiv1.RouteList, int) { +func getRoutes(t *testing.T, routerClient routeclient.Interface, ns string, size int, timeout time.Duration) *routeapiv1.RouteList { var list *routeapiv1.RouteList t.Helper() @@ -637,8 +486,8 @@ func getRoutes(t *testing.T, routerClient routev1.RouteV1Interface, ns string, s retry.UntilSuccessOrFail(t, func() error { var err error - time.Sleep(time.Millisecond * 100) - list, err = routerClient.Routes(ns).List(context.TODO(), v1.ListOptions{}) + time.Sleep(time.Second) + list, err = routerClient.RouteV1().Routes(ns).List(context.TODO(), v1.ListOptions{}) count++ if err != nil { return err @@ -649,7 +498,7 @@ func getRoutes(t *testing.T, routerClient routev1.RouteV1Interface, ns string, s return nil }, retry.Timeout(timeout)) - return list, count + return list } func findRouteByHost(list *routeapiv1.RouteList, host string) *routeapiv1.Route { @@ -684,8 +533,8 @@ func createService(t *testing.T, client kube.Client, ns string, labels map[strin t.Helper() _, err := client.Kube().CoreV1().Services(ns).Create(context.TODO(), &k8sioapicorev1.Service{ - ObjectMeta: v1.ObjectMeta{ - Labels: labels, + Spec: k8sioapicorev1.ServiceSpec{ + Selector: labels, }, }, v1.CreateOptions{}) if err != nil { @@ -764,10 +613,11 @@ func editGateway(t *testing.T, store model.ConfigStoreController, ns string, nam } } -func deleteGateway(t *testing.T, store model.ConfigStoreController, ns string, name string) { +func deleteGateway(t *testing.T, istioClient istioclient.Interface, ns string, name string) { t.Helper() - err := store.Delete(collections.Gateway.GroupVersionKind(), name, ns, nil) + var immediate int64 + err := istioClient.NetworkingV1alpha3().Gateways(ns).Delete(context.TODO(), name, v1.DeleteOptions{GracePeriodSeconds: &immediate}) if err != nil { t.Fatal(err) } diff --git a/pilot/pkg/config/kube/ior/route.go b/pilot/pkg/config/kube/ior/route.go deleted file mode 100644 index 43b3680ecc7..00000000000 --- a/pilot/pkg/config/kube/ior/route.go +++ /dev/null @@ -1,627 +0,0 @@ -// Copyright Red Hat, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ior - -import ( - "crypto/sha256" - "encoding/hex" - "fmt" - "strconv" - "strings" - "sync" - "time" - - "github.com/hashicorp/go-multierror" - v1 "github.com/openshift/api/route/v1" - routev1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" - "golang.org/x/net/context" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - - networking "istio.io/api/networking/v1alpha3" - "istio.io/istio/pilot/pkg/model" - "istio.io/istio/pkg/config" - "istio.io/istio/pkg/config/schema/collections" - "istio.io/istio/pkg/kube" - "istio.io/istio/pkg/servicemesh/controller" -) - -const ( - maistraPrefix = "maistra.io/" - generatedByLabel = maistraPrefix + "generated-by" - generatedByValue = "ior" - originalHostAnnotation = maistraPrefix + "original-host" - gatewayNameLabel = maistraPrefix + "gateway-name" - gatewayNamespaceLabel = maistraPrefix + "gateway-namespace" - gatewayResourceVersionLabel = maistraPrefix + "gateway-resourceVersion" - ShouldManageRouteAnnotation = maistraPrefix + "manageRoute" - - eventDuplicatedMessage = "event UPDATE arrived but resourceVersions are the same - ignoring" -) - -type syncRoutes struct { - metadata config.Meta - gateway *networking.Gateway - routes []*v1.Route -} - -// route manages the integration between Istio Gateways and OpenShift Routes -type route struct { - pilotNamespace string - routerClient routev1.RouteV1Interface - kubeClient kubernetes.Interface - store model.ConfigStoreController - gatewaysMap map[string]*syncRoutes - gatewaysLock sync.Mutex - initialSyncRun chan struct{} - alive bool - stop <-chan struct{} - handleEventTimeout time.Duration - errorChannel chan error - - // memberroll functionality - mrc controller.MemberRollController - namespaceLock sync.Mutex - namespaces []string - gotInitialUpdate bool -} - -// NewRouterClient returns an OpenShift client for Routers -func NewRouterClient() (routev1.RouteV1Interface, error) { - config, err := kube.BuildClientConfig("", "") - if err != nil { - return nil, err - } - - client, err := routev1.NewForConfig(config) - if err != nil { - return nil, err - } - - return client, nil -} - -// newRoute returns a new instance of Route object -func newRoute( - kubeClient KubeClient, - routerClient routev1.RouteV1Interface, - store model.ConfigStoreController, - pilotNamespace string, - mrc controller.MemberRollController, - stop <-chan struct{}, -) *route { - for !kubeClient.IsRouteSupported() { - IORLog.Infof("routes are not supported in this cluster; waiting for Route resource to become available...") - time.Sleep(10 * time.Second) - } - - r := &route{} - - r.kubeClient = kubeClient.GetActualClient().Kube() - r.routerClient = routerClient - r.pilotNamespace = pilotNamespace - r.store = store - r.mrc = mrc - r.namespaces = []string{pilotNamespace} - r.stop = stop - r.initialSyncRun = make(chan struct{}) - r.handleEventTimeout = kubeClient.GetHandleEventTimeout() - - if r.mrc != nil { - IORLog.Debugf("Registering IOR into SMMR broadcast") - r.alive = true - r.mrc.Register(r, "ior") - - go func(stop <-chan struct{}) { - <-stop - r.alive = false - IORLog.Debugf("Unregistering IOR from SMMR broadcast") - }(stop) - } - - return r -} - -// initialSync runs on initialization only. -// -// It lists all Istio Gateways (source of truth) and OpenShift Routes, compares them and makes the necessary adjustments -// (creation and/or removal of routes) so that gateways and routes be in sync. -func (r *route) initialSync(initialNamespaces []string) error { - var result *multierror.Error - r.gatewaysMap = make(map[string]*syncRoutes) - - r.gatewaysLock.Lock() - defer r.gatewaysLock.Unlock() - - // List the gateways and put them into the gatewaysMap - // The store must be synced otherwise we might get an empty list - // We enforce this before calling this function in UpdateNamespaces() - configs := r.store.List(collections.Gateway.GroupVersionKind(), model.NamespaceAll) - - IORLog.Debugf("initialSync() - Got %d Gateway(s)", len(configs)) - - for i, cfg := range configs { - if err := r.ensureNamespaceExists(cfg); err != nil { - result = multierror.Append(result, err) - continue - } - manageRoute, err := isManagedByIOR(cfg) - if err != nil { - result = multierror.Append(result, err) - continue - } - if !manageRoute { - IORLog.Debugf("initialSync() - Ignoring Gateway %s/%s as it is not managed by Istiod", cfg.Namespace, cfg.Name) - continue - } - - IORLog.Debugf("initialSync() - Parsing Gateway [%d] %s/%s", i+1, cfg.Namespace, cfg.Name) - r.addNewSyncRoute(cfg) - } - - // List the routes and put them into a map. Map key is the route object name - routes := map[string]v1.Route{} - for _, ns := range initialNamespaces { - IORLog.Debugf("initialSync() - Listing routes in ns %s", ns) - routeList, err := r.routerClient.Routes(ns).List(context.TODO(), metav1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s", generatedByLabel, generatedByValue), - }) - if err != nil { - return fmt.Errorf("could not get list of Routes in namespace %s: %s", ns, err) - } - for _, route := range routeList.Items { - routes[route.Name] = route - } - } - IORLog.Debugf("initialSync() - Got %d route(s) across all %d namespace(s)", len(routes), len(initialNamespaces)) - - // Now that we have maps and routes mapped we can compare them (Gateways are the source of truth) - for _, syncRoute := range r.gatewaysMap { - for _, server := range syncRoute.gateway.Servers { - for _, host := range server.Hosts { - actualHost, _ := getActualHost(host, false) - routeName := getRouteName(syncRoute.metadata.Namespace, syncRoute.metadata.Name, actualHost) - route, ok := routes[routeName] - if ok { - // A route for this host was found, remove its entry in this map so that in the end only orphan routes are left - delete(routes, routeName) - - // Route matches, no need to create one. Put it in the gatewaysMap and move to the next one - if syncRoute.metadata.ResourceVersion == route.Labels[gatewayResourceVersionLabel] { - syncRoute.routes = append(syncRoute.routes, &route) - continue - } - - // Route does not match, remove it. - result = multierror.Append(result, r.deleteRoute(&route)) - } - - // Route is not found or was removed above because it didn't match. We need to create one now. - route2, err := r.createRoute(syncRoute.metadata, syncRoute.gateway, host, server.Tls) - if err != nil { - result = multierror.Append(result, err) - } else { - // Put it in the gatewaysMap and move to the next one - syncRoute.routes = append(syncRoute.routes, route2) - } - } - } - } - - // At this point there are routes for every hostname in every Gateway. - // The `routes` map should only contain "orphan" routes, i.e., routes that do not belong to any Gateway - // - for _, route := range routes { - result = multierror.Append(result, r.deleteRoute(&route)) - } - - return result.ErrorOrNil() -} - -func gatewaysMapKey(namespace, name string) string { - return namespace + "/" + name -} - -// addNewSyncRoute creates a new syncRoutes and adds it to the gatewaysMap -// Must be called with gatewaysLock locked -func (r *route) addNewSyncRoute(cfg config.Config) *syncRoutes { - gw := cfg.Spec.(*networking.Gateway) - syncRoute := &syncRoutes{ - metadata: cfg.Meta, - gateway: gw, - } - - r.gatewaysMap[gatewaysMapKey(cfg.Namespace, cfg.Name)] = syncRoute - return syncRoute -} - -// ensureNamespaceExists makes sure the gateway namespace is present in r.namespaces -// r.namespaces is updated by the SMMR controller, in SetNamespaces() -// This handles the case where an ADD event comes before SetNamespaces() is called and -// the unlikely case an ADD event arrives for a gateway whose namespace does not belong to the SMMR at all -func (r *route) ensureNamespaceExists(cfg config.Config) error { - timeout := time.After(r.handleEventTimeout) // production default is 10s, but test default is only 1ms - - for { - r.namespaceLock.Lock() - namespaces := r.namespaces - r.namespaceLock.Unlock() - - for _, ns := range namespaces { - if ns == cfg.Namespace { - IORLog.Debugf("Namespace %s found in SMMR", cfg.Namespace) - return nil - } - } - - select { - case <-timeout: - IORLog.Debugf("Namespace %s not found in SMMR. Aborting.", cfg.Namespace) - return fmt.Errorf("could not handle the ADD event for %s/%s: SMMR does not recognize this namespace", cfg.Namespace, cfg.Name) - default: - IORLog.Debugf("Namespace %s not found in SMMR, trying again", cfg.Namespace) - } - time.Sleep(r.handleEventTimeout / 100) - } -} - -func (r *route) handleAdd(cfg config.Config) error { - var result *multierror.Error - - if err := r.ensureNamespaceExists(cfg); err != nil { - return err - } - - r.gatewaysLock.Lock() - defer r.gatewaysLock.Unlock() - - if _, ok := r.gatewaysMap[gatewaysMapKey(cfg.Namespace, cfg.Name)]; ok { - IORLog.Infof("gateway %s/%s already exists, not creating route(s) for it", cfg.Namespace, cfg.Name) - return nil - } - - syncRoute := r.addNewSyncRoute(cfg) - - for _, server := range syncRoute.gateway.Servers { - for _, host := range server.Hosts { - route, err := r.createRoute(cfg.Meta, syncRoute.gateway, host, server.Tls) - if err != nil { - result = multierror.Append(result, err) - } else { - syncRoute.routes = append(syncRoute.routes, route) - } - } - } - - return result.ErrorOrNil() -} - -func isManagedByIOR(cfg config.Config) (bool, error) { - // We don't manage egress gateways, but we can only look for the default label here. - // Users can still use generic labels (e.g. "app: my-ingressgateway" as in the istio docs) to refer to the gateway pod - gw := cfg.Spec.(*networking.Gateway) - if istioLabel, ok := gw.Selector["istio"]; ok && istioLabel == "egressgateway" { - return false, nil - } - - manageRouteValue, ok := cfg.Annotations[ShouldManageRouteAnnotation] - if !ok { - // Manage routes by default, when annotation is not found. - return true, nil - } - - manageRoute, err := strconv.ParseBool(manageRouteValue) - if err != nil { - return false, fmt.Errorf("could not parse annotation %q: %s", ShouldManageRouteAnnotation, err) - } - - return manageRoute, nil -} - -func (r *route) handleDel(cfg config.Config) error { - var result *multierror.Error - - r.gatewaysLock.Lock() - defer r.gatewaysLock.Unlock() - - key := gatewaysMapKey(cfg.Namespace, cfg.Name) - syncRoute, ok := r.gatewaysMap[key] - if !ok { - return fmt.Errorf("could not find an internal reference to gateway %s/%s", cfg.Namespace, cfg.Name) - } - - IORLog.Debugf("The gateway %s/%s has %d route(s) associated with it. Removing them now.", cfg.Namespace, cfg.Name, len(syncRoute.routes)) - for _, route := range syncRoute.routes { - result = multierror.Append(result, r.deleteRoute(route)) - } - - delete(r.gatewaysMap, key) - - return result.ErrorOrNil() -} - -func (r *route) verifyResourceVersions(cfg config.Config) error { - r.gatewaysLock.Lock() - defer r.gatewaysLock.Unlock() - - key := gatewaysMapKey(cfg.Namespace, cfg.Name) - syncRoute, ok := r.gatewaysMap[key] - if !ok { - return fmt.Errorf("could not find an internal reference to gateway %s/%s", cfg.Namespace, cfg.Name) - } - - if syncRoute.metadata.ResourceVersion != cfg.ResourceVersion { - return nil - } - - return fmt.Errorf(eventDuplicatedMessage) -} - -func (r *route) handleEvent(event model.Event, cfg config.Config) error { - // Block until initial sync has finished - <-r.initialSyncRun - - manageRoute, err := isManagedByIOR(cfg) - if err != nil { - return err - } - if !manageRoute { - IORLog.Infof("Ignoring Gateway %s/%s as it is not managed by Istiod", cfg.Namespace, cfg.Name) - return nil - } - - switch event { - case model.EventAdd: - return r.handleAdd(cfg) - - case model.EventUpdate: - if err = r.verifyResourceVersions(cfg); err != nil { - return err - } - - var result *multierror.Error - result = multierror.Append(result, r.handleDel(cfg)) - result = multierror.Append(result, r.handleAdd(cfg)) - return result.ErrorOrNil() - - case model.EventDelete: - return r.handleDel(cfg) - } - - return fmt.Errorf("unknown event type %s", event) -} - -// Trigerred by SMMR controller when SMMR changes -func (r *route) SetNamespaces(namespaces []string) { - if !r.alive { - return - } - - if namespaces == nil { - return - } - - IORLog.Debugf("UpdateNamespaces(%v)", namespaces) - r.namespaceLock.Lock() - r.namespaces = namespaces - r.namespaceLock.Unlock() - - if r.gotInitialUpdate { - return - } - r.gotInitialUpdate = true - - // In the first update we perform an initial sync - go func() { - // But only after gateway store cache is synced - IORLog.Debug("Waiting for the Gateway store cache to sync before performing our initial sync") - if !cache.WaitForNamedCacheSync("Gateways", r.stop, r.store.HasSynced) { - IORLog.Infof("Failed to sync Gateway store cache. Not performing initial sync.") - return - } - IORLog.Debug("Gateway store cache synced. Performing our initial sync now") - - if err := r.initialSync(namespaces); err != nil { - IORLog.Error(err) - if r.errorChannel != nil { - r.errorChannel <- err - } - } - IORLog.Debug("Initial sync finished") - close(r.initialSyncRun) - }() -} - -func getHost(route v1.Route) string { - if host := route.ObjectMeta.Annotations[originalHostAnnotation]; host != "" { - return host - } - return route.Spec.Host -} - -func (r *route) deleteRoute(route *v1.Route) error { - var immediate int64 - host := getHost(*route) - err := r.routerClient.Routes(route.Namespace).Delete(context.TODO(), route.ObjectMeta.Name, metav1.DeleteOptions{GracePeriodSeconds: &immediate}) - if err != nil { - return fmt.Errorf("error deleting route %s/%s: %s", route.ObjectMeta.Namespace, route.ObjectMeta.Name, err) - } - - IORLog.Infof("Deleted route %s/%s (gateway hostname: %s)", route.ObjectMeta.Namespace, route.ObjectMeta.Name, host) - return nil -} - -func (r *route) createRoute(metadata config.Meta, gateway *networking.Gateway, originalHost string, tls *networking.ServerTLSSettings) (*v1.Route, error) { - IORLog.Debugf("Creating route for hostname %s", originalHost) - actualHost, wildcard := getActualHost(originalHost, true) - - var tlsConfig *v1.TLSConfig - targetPort := "http2" - if tls != nil { - tlsConfig = &v1.TLSConfig{Termination: v1.TLSTerminationPassthrough} - targetPort = "https" - if tls.HttpsRedirect { - tlsConfig.InsecureEdgeTerminationPolicy = v1.InsecureEdgeTerminationPolicyRedirect - } - } - - serviceNamespace, serviceName, err := r.findService(gateway) - if err != nil { - return nil, err - } - - // Copy annotations - annotations := map[string]string{ - originalHostAnnotation: originalHost, - } - for keyName, keyValue := range metadata.Annotations { - if !strings.HasPrefix(keyName, "kubectl.kubernetes.io") && keyName != ShouldManageRouteAnnotation { - annotations[keyName] = keyValue - } - } - - // Copy labels - labels := map[string]string{ - generatedByLabel: generatedByValue, - gatewayNamespaceLabel: metadata.Namespace, - gatewayNameLabel: metadata.Name, - gatewayResourceVersionLabel: metadata.ResourceVersion, - } - for keyName, keyValue := range metadata.Labels { - if !strings.HasPrefix(keyName, maistraPrefix) { - labels[keyName] = keyValue - } - } - - nr, err := r.routerClient.Routes(serviceNamespace).Create(context.TODO(), &v1.Route{ - ObjectMeta: metav1.ObjectMeta{ - Name: getRouteName(metadata.Namespace, metadata.Name, actualHost), - Namespace: serviceNamespace, - Labels: labels, - Annotations: annotations, - }, - Spec: v1.RouteSpec{ - Host: actualHost, - Port: &v1.RoutePort{ - TargetPort: intstr.IntOrString{ - Type: intstr.String, - StrVal: targetPort, - }, - }, - To: v1.RouteTargetReference{ - Name: serviceName, - }, - TLS: tlsConfig, - WildcardPolicy: wildcard, - }, - }, metav1.CreateOptions{}) - if err != nil { - return nil, fmt.Errorf("error creating a route for the host %s (gateway: %s/%s): %s", originalHost, metadata.Namespace, metadata.Name, err) - } - - IORLog.Infof("Created route %s/%s for hostname %s (gateway: %s/%s)", - nr.ObjectMeta.Namespace, nr.ObjectMeta.Name, - nr.Spec.Host, - metadata.Namespace, metadata.Name) - - return nr, nil -} - -// findService tries to find a service that matches with the given gateway selector -// Returns the namespace and service name that is a match, or an error -func (r *route) findService(gateway *networking.Gateway) (string, string, error) { - r.namespaceLock.Lock() - namespaces := r.namespaces - r.namespaceLock.Unlock() - - gwSelector := labels.SelectorFromSet(gateway.Selector) - - for _, ns := range namespaces { - // Get the list of pods that match the gateway selector - podList, err := r.kubeClient.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: gwSelector.String()}) - if err != nil { - return "", "", fmt.Errorf("could not get the list of pods in namespace %s: %v", ns, err) - } - - // Get the list of services in this namespace - svcList, err := r.kubeClient.CoreV1().Services(ns).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return "", "", fmt.Errorf("could not get the list of services in namespace %s: %v", ns, err) - } - - // Look for a service whose selector matches the pod labels - for _, pod := range podList.Items { - podLabels := labels.Set(pod.ObjectMeta.Labels) - - for _, svc := range svcList.Items { - svcSelector := labels.SelectorFromSet(svc.Spec.Selector) - if svcSelector.Matches(podLabels) { - return ns, svc.Name, nil - } - } - } - } - - return "", "", fmt.Errorf("could not find a service that matches the gateway selector `%s'. Namespaces where we looked at: %v", - gwSelector.String(), namespaces) -} - -func getRouteName(namespace, name, actualHost string) string { - return fmt.Sprintf("%s-%s-%s", namespace, name, hostHash(actualHost)) -} - -// getActualHost returns the actual hostname to be used in the route -// `emitWarning` should be false when this function is used internally, without user interaction -// It also returns the route's WildcardPolicy based on the hostname -func getActualHost(originalHost string, emitWarning bool) (string, v1.WildcardPolicyType) { - wildcard := v1.WildcardPolicyNone - - if strings.Contains(originalHost, "/") { - originalHost = strings.SplitN(originalHost, "/", 2)[1] - IORLog.Debugf("Hostname contains a namespace part. Ignoring it and considering the %q portion.", originalHost) - } - - actualHost := originalHost - - if originalHost == "*" { - actualHost = "" - if emitWarning { - IORLog.Warn("Hostname * is not supported at the moment. Letting OpenShift create it instead.") - } - } else if strings.HasPrefix(originalHost, "*.") { - // FIXME: Update link below to version 4.5 when it's out - // Wildcards are not enabled by default in OCP 3.x. - // See https://docs.openshift.com/container-platform/3.11/install_config/router/default_haproxy_router.html#using-wildcard-routes - // FIXME(2): Is there a way to check if OCP supports wildcard and print out a warning if not? - wildcard = v1.WildcardPolicySubdomain - actualHost = "wildcard." + strings.TrimPrefix(originalHost, "*.") - } - - return actualHost, wildcard -} - -// hostHash applies a sha256 on the host and truncate it to the first 8 bytes -// This gives enough uniqueness for a given hostname -func hostHash(name string) string { - if name == "" { - name = "star" - } - - hash := sha256.Sum256([]byte(name)) - return hex.EncodeToString(hash[:8]) -} diff --git a/pkg/kube/client.go b/pkg/kube/client.go index f5b71e0d60f..a0a57f036f7 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -33,7 +33,11 @@ import ( xnsgatewayapiinformer "github.com/maistra/xns-informer/pkg/generated/gatewayapi" xnsistioinformer "github.com/maistra/xns-informer/pkg/generated/istio" xnskubeinformer "github.com/maistra/xns-informer/pkg/generated/kube" + xnsrouteinformer "github.com/maistra/xns-informer/pkg/generated/openshift/route" xnsinformers "github.com/maistra/xns-informer/pkg/informers" + routeclient "github.com/openshift/client-go/route/clientset/versioned" + routefake "github.com/openshift/client-go/route/clientset/versioned/fake" + routeinformer "github.com/openshift/client-go/route/informers/externalversions" "go.uber.org/atomic" "golang.org/x/sync/errgroup" "google.golang.org/grpc/credentials" @@ -132,6 +136,8 @@ type Client interface { // Istio returns the Istio kube client. Istio() istioclient.Interface + Route() routeclient.Interface + // GatewayAPI returns the gateway-api kube client. GatewayAPI() gatewayapiclient.Interface @@ -147,6 +153,8 @@ type Client interface { // IstioInformer returns an informer for the istio client IstioInformer() istioinformer.SharedInformerFactory + RouteInformer() routeinformer.SharedInformerFactory + // GatewayAPIInformer returns an informer for the gateway-api client GatewayAPIInformer() gatewayapiinformer.SharedInformerFactory @@ -286,6 +294,9 @@ func NewFakeClient(objects ...runtime.Object) CLIClient { c.istio = istiofake.NewSimpleClientset() c.istioInformer = xnsistioinformer.NewSharedInformerFactoryWithOptions(c.istio, resyncInterval) + c.route = routefake.NewSimpleClientset() + c.routeInformer = xnsrouteinformer.NewSharedInformerFactory(c.route, resyncInterval) + c.gatewayapi = gatewayapifake.NewSimpleClientset() c.gatewayapiInformer = xnsgatewayapiinformer.NewSharedInformerFactory(c.gatewayapi, resyncInterval) @@ -367,6 +378,9 @@ type client struct { istio istioclient.Interface istioInformer xnsistioinformer.SharedInformerFactory + route routeclient.Interface + routeInformer xnsrouteinformer.SharedInformerFactory + gatewayapi gatewayapiclient.Interface gatewayapiInformer xnsgatewayapiinformer.SharedInformerFactory @@ -454,6 +468,15 @@ func newClientInternal(clientFactory *clientFactory, revision string) (*client, resyncInterval, ) + c.route, err = routeclient.NewForConfig(c.config) + if err != nil { + return nil, err + } + c.routeInformer = xnsrouteinformer.NewSharedInformerFactoryWithOptions( + c.route, + resyncInterval, + ) + if features.EnableGatewayAPI { c.gatewayapi, err = gatewayapiclient.NewForConfig(c.config) if err != nil { @@ -538,6 +561,10 @@ func (c *client) Istio() istioclient.Interface { return c.istio } +func (c *client) Route() routeclient.Interface { + return c.route +} + func (c *client) GatewayAPI() gatewayapiclient.Interface { return c.gatewayapi } @@ -558,6 +585,10 @@ func (c *client) IstioInformer() istioinformer.SharedInformerFactory { return c.istioInformer } +func (c *client) RouteInformer() routeinformer.SharedInformerFactory { + return c.routeInformer +} + func (c *client) GatewayAPIInformer() gatewayapiinformer.SharedInformerFactory { return c.gatewayapiInformer } @@ -588,6 +619,7 @@ func (c *client) AddMemberRollController(namespace, memberRollName string) (err c.memberRoll.Register(c.kubeInformer, "kubernetes-informers") c.memberRoll.Register(c.istioInformer, "istio-informers") + c.memberRoll.Register(c.routeInformer, "openshift-route-informers") c.memberRoll.Register(c.dynamicInformer, "dynamic-informers") if features.EnableGatewayAPI { c.memberRoll.Register(c.gatewayapiInformer, "service-apis-informers") @@ -617,6 +649,7 @@ func (c *client) RunAndWait(stop <-chan struct{}) { fastWaitForCacheSyncDynamic(stop, c.dynamicInformer) fastWaitForCacheSyncDynamic(stop, c.metadataInformer) fastWaitForCacheSync(stop, c.istioInformer) + fastWaitForCacheSync(stop, c.routeInformer) if features.EnableGatewayAPI { fastWaitForCacheSync(stop, c.gatewayapiInformer) } @@ -637,6 +670,7 @@ func (c *client) RunAndWait(stop <-chan struct{}) { c.dynamicInformer.WaitForCacheSync(stop) c.metadataInformer.WaitForCacheSync(stop) c.istioInformer.WaitForCacheSync(stop) + c.routeInformer.WaitForCacheSync(stop) if features.EnableGatewayAPI { c.gatewayapiInformer.WaitForCacheSync(stop) } @@ -659,6 +693,7 @@ func (c *client) startInformer(stop <-chan struct{}) { c.dynamicInformer.Start(stop) c.metadataInformer.Start(stop) c.istioInformer.Start(stop) + c.routeInformer.Start(stop) if features.EnableGatewayAPI { c.gatewayapiInformer.Start(stop) } diff --git a/pkg/kube/mock_client.go b/pkg/kube/mock_client.go index 3241fde08d3..bb6d614deb6 100644 --- a/pkg/kube/mock_client.go +++ b/pkg/kube/mock_client.go @@ -20,6 +20,8 @@ import ( "net/http" "reflect" + routeclient "github.com/openshift/client-go/route/clientset/versioned" + routeinformer "github.com/openshift/client-go/route/informers/externalversions" "google.golang.org/grpc/credentials" v1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" @@ -103,6 +105,10 @@ func (c MockClient) Istio() istioclient.Interface { panic("not used in mock") } +func (c MockClient) Route() routeclient.Interface { + panic("not used in mock") +} + func (c MockClient) GatewayAPI() serviceapisclient.Interface { panic("not used in mock") } @@ -119,6 +125,10 @@ func (c MockClient) GatewayAPIInformer() serviceapisinformer.SharedInformerFacto panic("not used in mock") } +func (c MockClient) RouteInformer() routeinformer.SharedInformerFactory { + panic("not used in mock") +} + func (c MockClient) MCSApisInformer() mcsapisinformer.SharedInformerFactory { panic("not used in mock") } diff --git a/tests/integration/servicemesh/maistra/maistra.go b/tests/integration/servicemesh/maistra/maistra.go index 4e304ddc8f4..e0087619cc3 100644 --- a/tests/integration/servicemesh/maistra/maistra.go +++ b/tests/integration/servicemesh/maistra/maistra.go @@ -258,7 +258,22 @@ func EnableIOR(ctx resource.Context, ns namespace.Instance) error { if err := waitForIstiod(kubeClient, ns, &lastSeenGeneration); err != nil { return err } - if err := patchIstiodArgs(kubeClient, ns, enableIOR); err != nil { + if err := patchIstiodArgs(kubeClient, ns, enableIORPatch); err != nil { + return err + } + if err := waitForIstiod(kubeClient, ns, &lastSeenGeneration); err != nil { + return err + } + return nil +} + +func DisableIOR(ctx resource.Context, ns namespace.Instance) error { + kubeClient := ctx.Clusters().Default().Kube() + var lastSeenGeneration int64 + if err := waitForIstiod(kubeClient, ns, &lastSeenGeneration); err != nil { + return err + } + if err := patchIstiodArgs(kubeClient, ns, disableIORPatch); err != nil { return err } if err := waitForIstiod(kubeClient, ns, &lastSeenGeneration); err != nil { @@ -296,70 +311,10 @@ func applyRolesToMemberNamespaces(c config.Factory, values map[string]string, na return nil } -const defaultMaistraSettings = `[ - { - "op": "add", - "path": "/spec/template/spec/containers/0/args/1", - "value": "--memberRollName=default" - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/args/2", - "value": "--enableCRDScan=false" - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/args/3", - "value": "--disableNodeAccess=true" - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/args/4", - "value": "--enableIngressClassName=false" - }, +const disableWebhookPatch = `[ { "op": "add", "path": "/spec/template/spec/containers/0/env/1", - "value": { - "name": "ENABLE_IOR", - "value": "false" - } - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/env/2", - "value": { - "name": "PILOT_ENABLE_GATEWAY_API", - "value": "false" - } - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/env/3", - "value": { - "name": "PILOT_ENABLE_GATEWAY_API_STATUS", - "value": "false" - } - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/env/4", - "value": { - "name": "PILOT_ENABLE_GATEWAY_API_DEPLOYMENT_CONTROLLER", - "value": "false" - } - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/env/5", - "value": { - "name": "PRIORITIZED_LEADER_ELECTION", - "value": "false" - } - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/env/6", "value": { "name": "INJECTION_WEBHOOK_CONFIG_NAME", "value": "" @@ -367,7 +322,7 @@ const defaultMaistraSettings = `[ }, { "op": "add", - "path": "/spec/template/spec/containers/0/env/7", + "path": "/spec/template/spec/containers/0/env/2", "value": { "name": "VALIDATION_WEBHOOK_CONFIG_NAME", "value": "" @@ -375,32 +330,24 @@ const defaultMaistraSettings = `[ } ]` -const disableWebhookPatch = `[ +const enableIORPatch = `[ { "op": "add", "path": "/spec/template/spec/containers/0/env/1", "value": { - "name": "INJECTION_WEBHOOK_CONFIG_NAME", - "value": "" - } - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/env/2", - "value": { - "name": "VALIDATION_WEBHOOK_CONFIG_NAME", - "value": "" + "name": "ENABLE_IOR", + "value": "true" } } ]` -const enableIOR = `[ +const disableIORPatch = `[ { - "op": "replace", + "op": "add", "path": "/spec/template/spec/containers/0/env/1", "value": { "name": "ENABLE_IOR", - "value": "true" + "value": "false" } } ]` diff --git a/tests/integration/servicemesh/managingroutes/main_test.go b/tests/integration/servicemesh/managingroutes/main_test.go index 322859092ba..66f44a8fdb3 100644 --- a/tests/integration/servicemesh/managingroutes/main_test.go +++ b/tests/integration/servicemesh/managingroutes/main_test.go @@ -19,7 +19,6 @@ package managingroutes import ( "context" - "encoding/json" "fmt" "path/filepath" "strings" @@ -27,15 +26,15 @@ import ( "time" routeapiv1 "github.com/openshift/api/route/v1" + routeversioned "github.com/openshift/client-go/route/clientset/versioned" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "istio.io/istio/pilot/pkg/config/kube/ior" "istio.io/istio/pkg/test/env" "istio.io/istio/pkg/test/framework" - "istio.io/istio/pkg/test/framework/components/istioctl" "istio.io/istio/pkg/test/framework/components/namespace" "istio.io/istio/pkg/test/util/retry" "istio.io/istio/tests/integration/servicemesh/maistra" + "istio.io/istio/tests/integration/servicemesh/router" ) var ( @@ -45,12 +44,15 @@ var ( virtualSvcTmpl = filepath.Join(env.IstioSrc, "tests/integration/servicemesh/managingroutes/testdata/virtual-service.tmpl.yaml") ) +const DefaultGatewayCount = 1 + func TestMain(m *testing.M) { // do not change order of setup functions // nolint: staticcheck framework. NewSuite(m). RequireMaxClusters(1). + Setup(router.InstallOpenShiftRouter). Setup(maistra.ApplyServiceMeshCRDs). Setup(namespace.Setup(&istioNamespace, namespace.Config{Prefix: "istio-system"})). Setup(maistra.Install(namespace.Future(&istioNamespace), nil)). @@ -65,105 +67,172 @@ func TestMain(m *testing.M) { Run() } -const gatewayName = "common-gateway" - func TestManagingGateways(t *testing.T) { framework.NewTest(t). Run(func(ctx framework.TestContext) { + gatewayName := "default-gateway" + namespaceGateway := namespace.NewOrFail(ctx, ctx, namespace.Config{Prefix: "gateway", Inject: true}).Name() namespaceA := namespace.NewOrFail(ctx, ctx, namespace.Config{Prefix: "a", Inject: true}).Name() namespaceB := namespace.NewOrFail(ctx, ctx, namespace.Config{Prefix: "b", Inject: true}).Name() - applyGatewayOrFail(ctx, namespaceGateway, "a", "b") - applyVirtualServiceOrFail(ctx, namespaceA, namespaceGateway, "a") - applyVirtualServiceOrFail(ctx, namespaceB, namespaceGateway, "b") - - if err := maistra.ApplyServiceMeshMemberRoll(ctx, istioNamespace, namespaceGateway, namespaceA); err != nil { - ctx.Fatalf("failed to create ServiceMeshMemberRoll: %s", err) - } - verifyThatIngressHasVirtualHostForMember(ctx, istioNamespace.Name(), "a") + namespaceC := namespace.NewOrFail(ctx, ctx, namespace.Config{Prefix: "c", Inject: true}).Name() + applyVirtualServiceOrFail(ctx, namespaceA, namespaceGateway, gatewayName, "a") + applyVirtualServiceOrFail(ctx, namespaceB, namespaceGateway, gatewayName, "b") + applyVirtualServiceOrFail(ctx, namespaceC, namespaceGateway, gatewayName, "c") - if err := maistra.ApplyServiceMeshMemberRoll(ctx, istioNamespace, namespaceGateway, namespaceA, namespaceB); err != nil { + if err := maistra.ApplyServiceMeshMemberRoll(ctx, istioNamespace, namespaceGateway, namespaceA, namespaceB, namespaceC); err != nil { ctx.Fatalf("failed to add member to ServiceMeshMemberRoll: %s", err) } - verifyThatIngressHasVirtualHostForMember(ctx, istioNamespace.Name(), "a", "b") - if err := maistra.ApplyServiceMeshMemberRoll(ctx, istioNamespace, namespaceGateway, namespaceB); err != nil { - ctx.Fatalf("failed to create ServiceMeshMemberRoll: %s", err) + if err := maistra.EnableIOR(ctx, istioNamespace); err != nil { + ctx.Error("failed to enable IOR: %s", err) } - verifyThatIngressHasVirtualHostForMember(ctx, istioNamespace.Name(), "b") + + ctx.Cleanup(func() { + if err := maistra.DisableIOR(ctx, istioNamespace); err != nil { + ctx.Fatalf("failed to disable IOR: %s", err) + } + }) ctx.NewSubTest("RouteCreation").Run(func(t framework.TestContext) { - if err := maistra.EnableIOR(t, istioNamespace); err != nil { - t.Fatalf("failed to enable IOR: %s", err) + labelSetA := map[string]string{ + "a": "a", + } + t.Cleanup(func() { + deleteGatewayOrFail(t, namespaceGateway, gatewayName, labelSetA, "a", "b") + ensureRoutesCleared(t) + }) + applyGatewayOrFail(ctx, namespaceGateway, gatewayName, labelSetA, "a", "b") + + verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "a.maistra.io") + verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "b.maistra.io") + }) + + ctx.NewSubTest("RouteChange").Run(func(t framework.TestContext) { + labelSetA := map[string]string{ + "a": "a", + } + t.Cleanup(func() { + deleteGatewayOrFail(t, namespaceGateway, gatewayName, labelSetA, "a", "b") + ensureRoutesCleared(t) + }) + applyGatewayOrFail(t, namespaceGateway, gatewayName, labelSetA, "a", "b", "c") + + verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "a.maistra.io") + verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "b.maistra.io") + verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "c.maistra.io") + + applyGatewayOrFail(t, namespaceGateway, gatewayName, labelSetA, "a", "b") + + verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "a.maistra.io") + verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "b.maistra.io") + verifyThatRouteIsMissingOrFail(t, namespaceGateway, gatewayName, "c.maistra.io") + }) + + ctx.NewSubTest("RouteLabelUpdate").Run(func(t framework.TestContext) { + labelSetA := map[string]string{ + "a": "a", + } + labelSetB := map[string]string{ + "b": "b", } + t.Cleanup(func() { + deleteGatewayOrFail(t, namespaceGateway, gatewayName, labelSetB, "a") + ensureRoutesCleared(t) + }) + + applyGatewayOrFail(t, namespaceGateway, gatewayName, labelSetA, "a") + verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "a.maistra.io") + + applyGatewayOrFail(t, namespaceGateway, gatewayName, labelSetB, "a") verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "a.maistra.io") + + routeClient := t.AllClusters().Default().Route() + retry.UntilSuccessOrFail(t, func() error { + route, err := findRoute(routeClient, namespaceGateway, gatewayName, "a.maistra.io") + if err != nil || route == nil { + return fmt.Errorf("failed to find route: %s", err) + } + + labels := route.GetObjectMeta().GetLabels() + val, ok := labels["b"] + + if ok && val == "b" { + return nil + } + + return fmt.Errorf("expected to find 'b' in the labels, but got %s", labels) + }, retry.BackoffDelay(500*time.Millisecond)) }) }) } -func verifyThatIngressHasVirtualHostForMember(ctx framework.TestContext, istioNamespace string, expectedMembers ...string) { - expectedGatewayRouteName := "http.8080" - expectedVirtualHostsNum := len(expectedMembers) +func verifyThatRouteExistsOrFail(ctx framework.TestContext, gatewayNamespace, gatewayName, host string) { + routeClient := ctx.AllClusters().Default().Route() retry.UntilSuccessOrFail(ctx, func() error { - podName, err := getPodName(ctx, istioNamespace, "istio-ingressgateway") - if err != nil { - return err - } - routes, err := getRoutesFromProxy(ctx, podName, istioNamespace, expectedGatewayRouteName) + route, err := findRoute(routeClient, gatewayNamespace, gatewayName, host) if err != nil { - return fmt.Errorf("failed to get routes from proxy %s: %s", podName, err) + return fmt.Errorf("failed to get Routes: %v", err) } - if len(routes) != 1 { - return fmt.Errorf("expected to find exactly 1 route '%s', got %d", expectedGatewayRouteName, len(routes)) + + if route == nil { + return fmt.Errorf("no Route found") } - virtualHostsNum := len(routes[0].VirtualHosts) - if virtualHostsNum != expectedVirtualHostsNum { - return fmt.Errorf("expected to find exactly %d virtual hosts, got %d", expectedVirtualHostsNum, virtualHostsNum) + return nil + }, retry.BackoffDelay(500*time.Millisecond)) +} + +func verifyThatRouteIsMissingOrFail(ctx framework.TestContext, gatewayNamespace, gatewayName, host string) { + routeClient := ctx.AllClusters().Default().Route() + + retry.UntilSuccessOrFail(ctx, func() error { + route, err := findRoute(routeClient, gatewayNamespace, gatewayName, host) + if err != nil { + return fmt.Errorf("failed to get Routes: %v", err) } - CheckExpectedMembersLoop: - for _, member := range expectedMembers { - expectedVirtualHostName := fmt.Sprintf("%s.maistra.io:80", member) - for _, virtualHost := range routes[0].VirtualHosts { - if virtualHost.Name == expectedVirtualHostName { - continue CheckExpectedMembersLoop - } - } - return fmt.Errorf("expected virtual host '%s' was not found", expectedVirtualHostName) + if route != nil { + return fmt.Errorf("found unexpected Route") } + return nil - }, retry.Timeout(10*time.Second)) + }, retry.BackoffDelay(500*time.Millisecond)) } -func verifyThatRouteExistsOrFail(ctx framework.TestContext, expectedGwNs, expectedGwName, expectedHost string) { - routerClient, err := ior.NewRouterClient() +func findRoute(routeClient routeversioned.Interface, gatewayNamespace, gatewayName, host string) (*routeapiv1.Route, error) { + routes, err := routeClient.RouteV1().Routes(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{}) if err != nil { - ctx.Fatalf("failed to create Router client: %s", err) + return nil, fmt.Errorf("failed to get Routes: %s", err) } - var routes *routeapiv1.RouteList + + if len(routes.Items) != 0 { + for _, route := range routes.Items { + if route.Spec.Host == host && strings.HasPrefix(route.Name, fmt.Sprintf("%s-%s-", gatewayNamespace, gatewayName)) { + return &route, nil + } + } + } + + return nil, nil +} + +func ensureRoutesCleared(ctx framework.TestContext) { + routeClient := ctx.AllClusters().Default().Route() + retry.UntilSuccessOrFail(ctx, func() error { - routes, err = routerClient.Routes("istio-system").List(context.TODO(), metav1.ListOptions{}) + routes, err := routeClient.RouteV1().Routes(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{LabelSelector: "maistra.io/generated-by=ior"}) if err != nil { return fmt.Errorf("failed to get Routes: %s", err) } - if len(routes.Items) == 0 { - return fmt.Errorf("no Routes found") - } - return nil - }, retry.Timeout(10*time.Second)) - found := false - for _, route := range routes.Items { - if route.Spec.Host == expectedHost && strings.HasPrefix(route.Name, fmt.Sprintf("%s-%s-", expectedGwNs, expectedGwName)) { - found = true - break + if count := len(routes.Items); count != DefaultGatewayCount { + return fmt.Errorf("found unexpected routes %d", count) } - } - if !found { - ctx.Fatalf("failed to find Route for host %s", expectedHost) - } + + return nil + }, retry.BackoffDelay(500*time.Millisecond)) } type RouteConfig struct { @@ -175,47 +244,26 @@ type VirtualHost struct { Name string `json:"name"` } -func getRoutesFromProxy(ctx framework.TestContext, pod, namespace, routeName string) ([]*RouteConfig, error) { - istioCtl := istioctl.NewOrFail(ctx, ctx, istioctl.Config{}) - stdout, stderr, err := istioCtl.Invoke([]string{ - "proxy-config", "routes", fmt.Sprintf("%s.%s", pod, namespace), "--name", routeName, "-o", "json", - }) - if err != nil || stderr != "" { - return nil, fmt.Errorf("failed to execute command 'istioctl proxy-config': %s: %s", stderr, err) - } - - routes := make([]*RouteConfig, 0) - if err := json.Unmarshal([]byte(stdout), &routes); err != nil { - return nil, fmt.Errorf("failed to unmarshall routes: %s", err) - } - - return routes, nil -} - -func getPodName(ctx framework.TestContext, namespace, appName string) (string, error) { - pods, err := ctx.Clusters().Default().PodsForSelector(context.TODO(), namespace, fmt.Sprintf("app=%s", appName)) - if err != nil { - return "", fmt.Errorf("failed to get %s pod from namespace %s: %v", appName, namespace, err) - } - if len(pods.Items) == 0 { - return "", fmt.Errorf("list of received %s pods from namespace %s is empty", appName, namespace) - } - return pods.Items[0].Name, nil +func applyGatewayOrFail(ctx framework.TestContext, ns, name string, labels map[string]string, hosts ...string) { + // retry because of flaky validation webhook + retry.UntilSuccessOrFail(ctx, func() error { + return ctx.ConfigIstio().EvalFile(ns, map[string]interface{}{"hosts": hosts, "name": name, "labels": labels}, gatewayTmpl).Apply() + }, retry.Timeout(3*time.Second)) } -func applyGatewayOrFail(ctx framework.TestContext, ns string, hosts ...string) { +func deleteGatewayOrFail(ctx framework.TestContext, ns, name string, labels map[string]string, hosts ...string) { // retry because of flaky validation webhook retry.UntilSuccessOrFail(ctx, func() error { - return ctx.ConfigIstio().EvalFile(ns, map[string][]string{"hosts": hosts}, gatewayTmpl).Apply() + return ctx.ConfigIstio().EvalFile(ns, map[string]interface{}{"hosts": hosts, "name": name, "labels": labels}, gatewayTmpl).Delete() }, retry.Timeout(3*time.Second)) } -func applyVirtualServiceOrFail(ctx framework.TestContext, ns, gatewayNs, virtualServiceName string) { +func applyVirtualServiceOrFail(ctx framework.TestContext, ns, gatewayNs, gatewayName, virtualServiceName string) { values := map[string]string{ - "name": virtualServiceName, - "gatewayNs": gatewayNs, + "name": virtualServiceName, + "gatewayNs": gatewayNs, + "gatewayName": gatewayName, } - // retry because of flaky validation webhook retry.UntilSuccessOrFail(ctx, func() error { return ctx.ConfigIstio().EvalFile(ns, values, virtualSvcTmpl).Apply() }, retry.Timeout(3*time.Second)) diff --git a/tests/integration/servicemesh/managingroutes/testdata/gateway.tmpl.yaml b/tests/integration/servicemesh/managingroutes/testdata/gateway.tmpl.yaml index c73fe53cbd4..903b697816d 100644 --- a/tests/integration/servicemesh/managingroutes/testdata/gateway.tmpl.yaml +++ b/tests/integration/servicemesh/managingroutes/testdata/gateway.tmpl.yaml @@ -1,7 +1,11 @@ apiVersion: networking.istio.io/v1alpha3 kind: Gateway metadata: - name: common-gateway + name: {{ .name }} + labels: + {{- range $k, $v := .labels }} + {{$k}}: {{$v}} + {{- end }} spec: selector: istio: ingressgateway diff --git a/tests/integration/servicemesh/managingroutes/testdata/virtual-service.tmpl.yaml b/tests/integration/servicemesh/managingroutes/testdata/virtual-service.tmpl.yaml index 24a8fad305a..bd05f4959a3 100644 --- a/tests/integration/servicemesh/managingroutes/testdata/virtual-service.tmpl.yaml +++ b/tests/integration/servicemesh/managingroutes/testdata/virtual-service.tmpl.yaml @@ -6,7 +6,7 @@ spec: hosts: - "{{ .name }}.maistra.io" gateways: - - {{ .gatewayNs }}/common-gateway + - {{ .gatewayNs }}/{{ .gatewayName }} http: - route: - destination: