Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make webhook to use local nad cache #106

Merged
merged 3 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@
package main

import (
"crypto/tls"
"context"
"crypto/tls"
"flag"
"fmt"
"net/http"
"os"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/api/errors"
"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
netcache "github.com/k8snetworkplumbingwg/network-resources-injector/pkg/tools"
"github.com/k8snetworkplumbingwg/network-resources-injector/pkg/webhook"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
defaultClientCa = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
defaultClientCa = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
userDefinedInjectionConfigMap = "nri-user-defined-injections"
)

Expand Down Expand Up @@ -90,6 +91,11 @@ func main() {
glog.Fatalf("error in setting resource name keys: %s", err.Error())
}

//initiaze webhook with cache
netAnnotationCache := netcache.Create()
netAnnotationCache.Start()
webhook.SetNetAttachDefCache(netAnnotationCache)

go func() {
/* register handlers */
var httpServer *http.Server
Expand Down Expand Up @@ -193,4 +199,7 @@ func main() {
webhook.SetCustomizedInjections(cm)
}
}

// TODO: find a way to stop cache, should we run the above block in a go routine and make main module
pperiyasamy marked this conversation as resolved.
Show resolved Hide resolved
// to respond to terminate singal ?
}
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -105,6 +106,7 @@ github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
Expand All @@ -114,6 +116,7 @@ github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTV
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
Expand Down
147 changes: 147 additions & 0 deletions pkg/tools/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) 2021 Nordix Foundation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"sync"
"sync/atomic"
"time"

"github.com/golang/glog"
cniv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
"github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
"github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

type NetAttachDefCache struct {
networkAnnotationsMap map[string]map[string]string
networkAnnotationsMapMutex *sync.Mutex
stopper chan struct{}
isRunning int32
}

type NetAttachDefCacheService interface {
Start()
Stop()
Get(namespace string, networkName string) map[string]string
}

func Create() NetAttachDefCacheService {
return &NetAttachDefCache{make(map[string]map[string]string),
&sync.Mutex{}, make(chan struct{}), 0}
}

// Start creates informer for NetworkAttachmentDefinition events and populate the local cache
func (nc *NetAttachDefCache) Start() {
factory := externalversions.NewSharedInformerFactoryWithOptions(setupNetAttachDefClient(), 0, externalversions.WithNamespace(""))
informer := factory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions().Informer()
// mutex to serialize the events.
mutex := &sync.Mutex{}

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mutex.Lock()
defer mutex.Unlock()
netAttachDef := obj.(*cniv1.NetworkAttachmentDefinition)
nc.put(netAttachDef.Namespace, netAttachDef.Name, netAttachDef.Annotations)
},
UpdateFunc: func(oldObj, newObj interface{}) {
mutex.Lock()
defer mutex.Unlock()
oldNetAttachDef := oldObj.(*cniv1.NetworkAttachmentDefinition)
newNetAttachDef := newObj.(*cniv1.NetworkAttachmentDefinition)
if oldNetAttachDef.GetResourceVersion() == newNetAttachDef.GetResourceVersion() {
glog.Infof("no change in net-attach-def %s, ignoring update event", nc.getKey(oldNetAttachDef.Namespace, newNetAttachDef.Name))
return
}
nc.remove(oldNetAttachDef.Namespace, oldNetAttachDef.Name)
nc.put(newNetAttachDef.Namespace, newNetAttachDef.Name, newNetAttachDef.Annotations)
},
DeleteFunc: func(obj interface{}) {
mutex.Lock()
defer mutex.Unlock()
netAttachDef := obj.(*cniv1.NetworkAttachmentDefinition)
nc.remove(netAttachDef.Namespace, netAttachDef.Name)
},
})
go func() {
atomic.StoreInt32(&(nc.isRunning), int32(1))
// informer Run blocks until informer is stopped
glog.Infof("starting net-attach-def informer")
informer.Run(nc.stopper)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put in a log after .Run "net-attach-def informer stopped"? It may aid debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

atomic.StoreInt32(&(nc.isRunning), int32(0))
}()
}

// Stop teardown the NetworkAttachmentDefinition informer
func (nc *NetAttachDefCache) Stop() {
close(nc.stopper)
Copy link
Member

@martinkennelly martinkennelly Jun 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the informer would force the user who is calling the "Run" function to pass a channel which would signal back to main thread when it has ended following the signal to the informer to stop here.

I was just thinking if we close this, and then if the main process ends before the informer go routine ends, then it will be killed without finishing naturally. Did you look into this? Looks like nothing you can do so I am not saying to rework this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good find! now added a code for graceful shutdown of nad cache with atomic isRunning variable, hope that's ok now.

func(limit time.Duration) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why wrap this logic in a func? I think it makes it harder to read versus a normal loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes :) done.

tEnd := time.Now().Add(limit)
for tEnd.After(time.Now()) {
if atomic.LoadInt32(&nc.isRunning) == 0 {
glog.Infof("net-attach-def informer is stopped")
return
}
time.Sleep(600 * time.Millisecond)
}
glog.Infof("net-attach-def informer is not stopped yet, proceeding with cleaning up nad cache")
}(3 * time.Second)
nc.networkAnnotationsMapMutex.Lock()
nc.networkAnnotationsMap = nil
nc.networkAnnotationsMapMutex.Unlock()
}

func (nc *NetAttachDefCache) put(namespace, networkName string, annotations map[string]string) {
nc.networkAnnotationsMapMutex.Lock()
nc.networkAnnotationsMap[nc.getKey(namespace, networkName)] = annotations
nc.networkAnnotationsMapMutex.Unlock()
}

// Get returns annotations map for the given namespace and network name, if it's not available
// return nil
func (nc *NetAttachDefCache) Get(namespace, networkName string) map[string]string {
nc.networkAnnotationsMapMutex.Lock()
defer nc.networkAnnotationsMapMutex.Unlock()
if annotationsMap, exists := nc.networkAnnotationsMap[nc.getKey(namespace, networkName)]; exists {
return annotationsMap
}
return nil
}

func (nc *NetAttachDefCache) remove(namespace, networkName string) {
nc.networkAnnotationsMapMutex.Lock()
delete(nc.networkAnnotationsMap, nc.getKey(namespace, networkName))
nc.networkAnnotationsMapMutex.Unlock()
}

func (nc *NetAttachDefCache) getKey(namespace, networkName string) string {
return namespace + "/" + networkName
}

// setupNetAttachDefClient creates K8s client for net-attach-def crd
func setupNetAttachDefClient() versioned.Interface {
config, err := rest.InClusterConfig()
if err != nil {
glog.Fatal(err)
}
clientset, err := versioned.NewForConfig(config)
if err != nil {
glog.Fatal(err)
}
return clientset
}
28 changes: 20 additions & 8 deletions pkg/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pkg/errors"
multus "gopkg.in/intel/multus-cni.v3/pkg/types"

netcache "github.com/k8snetworkplumbingwg/network-resources-injector/pkg/tools"
"github.com/k8snetworkplumbingwg/network-resources-injector/pkg/types"
"k8s.io/api/admission/v1beta1"
v1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -72,6 +73,7 @@ var (
resourceNameKeys []string
honorExistingResources bool
userDefinedInjects = &userDefinedInjections{Patchs: make(map[string]jsonPatchOperation)}
nadCache netcache.NetAttachDefCacheService
)

func prepareAdmissionReviewResponse(allowed bool, message string, ar *v1beta1.AdmissionReview) error {
Expand Down Expand Up @@ -367,18 +369,23 @@ func getNetworkAttachmentDefinition(namespace, name string) (*cniv1.NetworkAttac

func parseNetworkAttachDefinition(net *multus.NetworkSelectionElement, reqs map[string]int64, nsMap map[string]string) (map[string]int64, map[string]string, error) {
/* for each network in annotation ask API server for network-attachment-definition */
networkAttachmentDefinition, err := getNetworkAttachmentDefinition(net.Namespace, net.Name)
if err != nil {
/* if doesn't exist: deny pod */
reason := errors.Wrapf(err, "could not find network attachment definition '%s/%s'", net.Namespace, net.Name)
glog.Error(reason)
return reqs, nsMap, reason
annotationsMap := nadCache.Get(net.Namespace, net.Name)
if annotationsMap == nil {
glog.Infof("cache entry not found, retrieving network attachment definition '%s/%s' from api server", net.Namespace, net.Name)
networkAttachmentDefinition, err := getNetworkAttachmentDefinition(net.Namespace, net.Name)
if err != nil {
/* if doesn't exist: deny pod */
reason := errors.Wrapf(err, "could not find network attachment definition '%s/%s'", net.Namespace, net.Name)
glog.Error(reason)
return reqs, nsMap, reason
}
annotationsMap = networkAttachmentDefinition.GetAnnotations()
}
glog.Infof("network attachment definition '%s/%s' found", net.Namespace, net.Name)

/* network object exists, so check if it contains resourceName annotation */
for _, networkResourceNameKey := range resourceNameKeys {
if resourceName, exists := networkAttachmentDefinition.ObjectMeta.Annotations[networkResourceNameKey]; exists {
if resourceName, exists := annotationsMap[networkResourceNameKey]; exists {
/* add resource to map/increment if it was already there */
reqs[resourceName]++
glog.Infof("resource '%s' needs to be requested for network '%s/%s'", resourceName, net.Namespace, net.Name)
Expand All @@ -388,7 +395,7 @@ func parseNetworkAttachDefinition(net *multus.NetworkSelectionElement, reqs map[
}

/* parse the net-attach-def annotations for node selector label and add it to the desiredNsMap */
if ns, exists := networkAttachmentDefinition.ObjectMeta.Annotations[nodeSelectorKey]; exists {
if ns, exists := annotationsMap[nodeSelectorKey]; exists {
nsNameValue := strings.Split(ns, "=")
nsNameValueLen := len(nsNameValue)
if nsNameValueLen > 2 {
Expand Down Expand Up @@ -942,6 +949,11 @@ func MutateHandler(w http.ResponseWriter, req *http.Request) {
writeResponse(w, ar)
}

// SetNetAttachDefCache sets up the net attach def cache service
func SetNetAttachDefCache(cache netcache.NetAttachDefCacheService) {
nadCache = cache
}

// SetResourceNameKeys extracts resources from a string and add them to resourceNameKeys array
func SetResourceNameKeys(keys string) error {
if keys == "" {
Expand Down