Skip to content

Commit

Permalink
[ior] MAISTRA-1400 Add IOR to Pilot
Browse files Browse the repository at this point in the history
* [MAISTRA-1089][MAISTRA-1400][MAISTRA-1744][MAISTRA-1811]: Add IOR to Pilot (maistra#135) (maistra#240)

* MAISTRA-1400: Add IOR to Pilot (maistra#135)

* MAISTRA-1400: Add IOR to Pilot

* [MAISTRA-1744] Add route annotation propagation (maistra#158)

* MAISTRA-1811 Store resourceVersion of reconciled Gateway resource (maistra#190)

* MAISTRA-1089 Add support for IOR routes in all namespaces (maistra#193)

* MAISTRA-2131: ior: honor Gateway's httpsRedirect (maistra#276)

If Gateway's httpsRedirect is set to true, create the OpenShift Route
with Insecure Policy set to `Redirect`.

Manual cherrypick from maistra#269.

* MAISTRA-2149: Make IOR robust in multiple replicas (maistra#282)

In scenarios where multiple replicas of istiod are running,
only one IOR should be in charge of keeping routes in sync
with Istio Gateways. We achieve this by making sure IOR only
runs in the leader replica.

Also, because leader election is not 100% acurate, meaning
that for a small window of time there might be two instances
being the leader - which could lead to duplicated routes
being created if a new gateway is created in that time frame -
we also change the way the Route name is created: Instead of
having a generateName field, we now explicitly pass a name to
the Route object to be created. Being deterministic, it allows
the Route creation to fail when there's already a Route object
with the same name (created by the other leader in that time frame).

Use an exclusive leader ID for IOR

* Manual cherrypick of maistra#275

* MAISTRA-1813: Add unit tests for IOR (maistra#286)

* MAISTRA-2051 fixes for maistra install

* MAISTRA-2164: Refactor IOR internals (maistra#295)

Instead of doing lots of API calls on every event - this
does not scale well with lots of namespaces - keep the state
in memory, by doing an initial synchronization on start up and
updating it when receiving events.

The initial synchronization is more complex, as we have to deal with
asynchronous events (e.g., we have to wait for the Gateway store to
be warmed up). Once it's initialized, handling events as they arrive
becomes trivial.

Tests that make sure we do not make more calls to the API server than
the necessary were added, to avoid regressions.

* MAISTRA-2205: Add an option to opt-out for automatic route creation

If the Istio Gateway contains the annotation `maistra.io/manageRoute: false`
then IOR ignores it and doesn't attempt to create or manage route(s) for
this Gateway.

Also, ignore Gateways with the annotation `istio: egressgateway` as
these are not meant to have routes.
  • Loading branch information
brian-avery authored and luksa committed Feb 7, 2022
1 parent ab46cf7 commit 50d0575
Show file tree
Hide file tree
Showing 24 changed files with 5,949 additions and 0 deletions.
29 changes: 29 additions & 0 deletions pilot/pkg/bootstrap/configcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"istio.io/istio/pilot/pkg/config/kube/gateway"
"istio.io/istio/pilot/pkg/config/kube/ingress"
ingressv1 "istio.io/istio/pilot/pkg/config/kube/ingressv1"
"istio.io/istio/pilot/pkg/config/kube/ior"
"istio.io/istio/pilot/pkg/config/memory"
configmonitor "istio.io/istio/pilot/pkg/config/monitor"
"istio.io/istio/pilot/pkg/controller/workloadentry"
Expand Down Expand Up @@ -142,6 +143,8 @@ func (s *Server) initConfigController(args *PilotArgs) error {
// Create the config store.
s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)

s.startIOR(args)

// Defer starting the controller until after the service is created.
s.addStartFunc(func(stop <-chan struct{}) error {
go s.configController.Run(stop)
Expand All @@ -151,6 +154,32 @@ func (s *Server) initConfigController(args *PilotArgs) error {
return nil
}

// startIOR tries to start IOR, if it's enabled. If it encounters any failure, it logs an error and continue
func (s *Server) startIOR(args *PilotArgs) {
if !features.EnableIOR {
return
}

routerClient, err := ior.NewRouterClient()
if err != nil {
ior.IORLog.Errorf("error creating an openshift router client: %v", err)
return
}

iorKubeClient := ior.NewKubeClient(s.kubeClient)

s.addStartFunc(func(stop <-chan struct{}) error {
go leaderelection.
NewLeaderElection(args.Namespace, args.PodName, leaderelection.IORController, args.Revision, s.kubeClient).
AddRunFunction(func(stop <-chan struct{}) {
if err := ior.Register(iorKubeClient, routerClient, s.configController, args.Namespace, s.kubeClient.GetMemberRoll(), stop, nil); err != nil {
ior.IORLog.Error(err)
}
}).Run(stop)
return nil
})
}

func (s *Server) initK8SConfigStore(args *PilotArgs) error {
if s.kubeClient == nil {
return nil
Expand Down
59 changes: 59 additions & 0 deletions pilot/pkg/config/kube/ior/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright Red Hat, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ior

import (
"strings"
"time"

"k8s.io/client-go/kubernetes"
)

// KubeClient is an extension of `kubernetes.Interface` with auxiliary functions for IOR
type KubeClient interface {
IsRouteSupported() bool
GetActualClient() kubernetes.Interface
GetHandleEventTimeout() time.Duration
}

type kubeClient struct {
client kubernetes.Interface
}

// NewKubeClient creates the IOR version of KubeClient
func NewKubeClient(client kubernetes.Interface) KubeClient {
return &kubeClient{client: client}
}

func (c *kubeClient) IsRouteSupported() bool {
_, s, _ := c.client.Discovery().ServerGroupsAndResources()
// This may fail if any api service is down, but the result will still be populated, so we skip the error
for _, res := range s {
for _, api := range res.APIResources {
if api.Kind == "Route" && strings.HasPrefix(res.GroupVersion, "route.openshift.io/") {
return true
}
}
}
return false
}

func (c *kubeClient) GetActualClient() kubernetes.Interface {
return c.client
}

func (c *kubeClient) GetHandleEventTimeout() time.Duration {
return 10 * time.Second
}
249 changes: 249 additions & 0 deletions pilot/pkg/config/kube/ior/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Copyright Red Hat, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ior

import (
"fmt"
"sync"
"time"

v1 "github.com/openshift/api/route/v1"
routev1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1"
"golang.org/x/net/context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"istio.io/istio/pkg/servicemesh/controller"
)

// FakeRouter implements routev1.RouteInterface
type FakeRouter struct {
routes map[string]*v1.Route
routesLock sync.Mutex
}

// FakeRouterClient implements routev1.RouteV1Interface
type FakeRouterClient struct {
routesByNamespace map[string]routev1.RouteInterface
routesByNamespaceLock sync.Mutex
}

type fakeKubeClient struct {
client kubernetes.Interface
}

// NewFakeKubeClient creates a new FakeKubeClient
func NewFakeKubeClient(client kubernetes.Interface) KubeClient {
return &fakeKubeClient{client: client}
}

func (c *fakeKubeClient) IsRouteSupported() bool {
return true
}

func (c *fakeKubeClient) GetActualClient() kubernetes.Interface {
return c.client
}

func (c *fakeKubeClient) GetHandleEventTimeout() time.Duration {
return time.Millisecond
}

// NewFakeRouterClient creates a new FakeRouterClient
func NewFakeRouterClient() routev1.RouteV1Interface {
return &FakeRouterClient{
routesByNamespace: make(map[string]routev1.RouteInterface),
}
}

// NewFakeRouter creates a new FakeRouter
func NewFakeRouter() routev1.RouteInterface {
return &FakeRouter{
routes: make(map[string]*v1.Route),
}
}

// RESTClient implements routev1.RouteV1Interface
func (rc *FakeRouterClient) RESTClient() rest.Interface {
panic("not implemented")
}

// Routes implements routev1.RouteV1Interface
func (rc *FakeRouterClient) Routes(namespace string) routev1.RouteInterface {
rc.routesByNamespaceLock.Lock()
defer rc.routesByNamespaceLock.Unlock()

if _, ok := rc.routesByNamespace[namespace]; !ok {
rc.routesByNamespace[namespace] = NewFakeRouter()
}

countCallsIncrement("routes")
return rc.routesByNamespace[namespace]
}

var generatedHostNumber int

// Create implements routev1.RouteInterface
func (fk *FakeRouter) Create(ctx context.Context, route *v1.Route, opts metav1.CreateOptions) (*v1.Route, error) {
fk.routesLock.Lock()
defer fk.routesLock.Unlock()

if route.Spec.Host == "" {
generatedHostNumber++
route.Spec.Host = fmt.Sprintf("generated-host%d.com", generatedHostNumber)
}

fk.routes[route.Name] = route

countCallsIncrement("create")
return route, nil
}

// Update implements routev1.RouteInterface
func (fk *FakeRouter) Update(ctx context.Context, route *v1.Route, opts metav1.UpdateOptions) (*v1.Route, error) {
panic("not implemented")
}

// UpdateStatus implements routev1.RouteInterface
func (fk *FakeRouter) UpdateStatus(ctx context.Context, route *v1.Route, opts metav1.UpdateOptions) (*v1.Route, error) {
panic("not implemented")
}

// Delete implements routev1.RouteInterface
func (fk *FakeRouter) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
fk.routesLock.Lock()
defer fk.routesLock.Unlock()

if _, ok := fk.routes[name]; !ok {
return fmt.Errorf("route %s not found", name)
}

delete(fk.routes, name)

countCallsIncrement("delete")
return nil
}

// DeleteCollection implements routev1.RouteInterface
func (fk *FakeRouter) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error {
panic("not implemented")
}

// Get implements routev1.RouteInterface
func (fk *FakeRouter) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Route, error) {
panic("not implemented")
}

// List implements routev1.RouteInterface
func (fk *FakeRouter) List(ctx context.Context, opts metav1.ListOptions) (*v1.RouteList, error) {
fk.routesLock.Lock()
defer fk.routesLock.Unlock()

var items []v1.Route
for _, route := range fk.routes {
items = append(items, *route)
}
result := &v1.RouteList{Items: items}

countCallsIncrement("list")
return result, nil
}

// Watch Create implements routev1.RouteInterface
func (fk *FakeRouter) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
panic("not implemented")
}

// Patch implements routev1.RouteInterface
func (fk *FakeRouter) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions,
subresources ...string) (result *v1.Route, err error) {
panic("not implemented")
}

// fakeMemberRollController implements controller.MemberRollController
type fakeMemberRollController struct {
listeners []controller.MemberRollListener
namespaces []string
lock sync.Mutex
}

func newFakeMemberRollController() *fakeMemberRollController {
return &fakeMemberRollController{}
}

// Register implements controller.MemberRollController
func (fk *fakeMemberRollController) Register(listener controller.MemberRollListener, name string) {
fk.lock.Lock()
defer fk.lock.Unlock()

if listener == nil {
return
}
fk.listeners = append(fk.listeners, listener)
}

// Start implements controller.MemberRollController
func (fk *fakeMemberRollController) Start(stopCh <-chan struct{}) {
panic("not implemented")
}

func (fk *fakeMemberRollController) addNamespaces(namespaces ...string) {
fk.namespaces = append(fk.namespaces, namespaces...)
fk.invokeListeners()
}

func (fk *fakeMemberRollController) setNamespaces(namespaces ...string) {
fk.namespaces = namespaces
fk.invokeListeners()
}

func (fk *fakeMemberRollController) invokeListeners() {
fk.lock.Lock()
defer fk.lock.Unlock()

for _, l := range fk.listeners {
l.SetNamespaces(fk.namespaces...)
}
}

var (
countCalls map[string]int = map[string]int{}
countCallsLock sync.Mutex
)

func countCallsReset() {
countCallsLock.Lock()
defer countCallsLock.Unlock()
countCalls = map[string]int{}
}

func countCallsGet(k string) int {
countCallsLock.Lock()
defer countCallsLock.Unlock()
v, ok := countCalls[k]
if !ok {
v = 0
}
return v
}

func countCallsIncrement(k string) {
countCallsLock.Lock()
defer countCallsLock.Unlock()
countCalls[k]++
}
Loading

0 comments on commit 50d0575

Please sign in to comment.