Skip to content

Commit

Permalink
Node: watch IPPool CR
Browse files Browse the repository at this point in the history
Signed-off-by: Fred Rolland <[email protected]>
  • Loading branch information
rollandf committed Aug 22, 2023
1 parent 4b75b56 commit e2dcee6
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 111 deletions.
23 changes: 16 additions & 7 deletions cmd/ipam-node/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/google/renameio/v2"
"github.com/spf13/cobra"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -50,13 +49,14 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"

nodev1 "github.com/Mellanox/nvidia-k8s-ipam/api/grpc/nvidia/ipam/node/v1"
ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1"
"github.com/Mellanox/nvidia-k8s-ipam/cmd/ipam-node/app/options"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/cmdutils"
cniTypes "github.com/Mellanox/nvidia-k8s-ipam/pkg/cni/types"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/common"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/allocator"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/cleaner"
nodectrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/controllers/node"
ippoolctrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/controllers/ippool"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/grpc/middleware"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/handlers"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/migrator"
Expand Down Expand Up @@ -116,7 +116,8 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
ctrl.SetLogger(logger)

logger.Info("start IPAM node daemon",
"version", version.GetVersionString(), "node", opts.NodeName)
"version", version.GetVersionString(), "node", opts.NodeName,
"IPPools Namespace", opts.PoolsNamespace)

if err := deployShimCNI(logger, opts); err != nil {
return err
Expand All @@ -129,13 +130,19 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
return err
}

if err := ipamv1alpha1.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to register ipamv1alpha1 scheme")
return err
}

poolManager := poolPkg.NewManager()

mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
NewCache: cache.BuilderWithOptions(cache.Options{
SelectorsByObject: cache.SelectorsByObject{&corev1.Node{}: cache.ObjectSelector{
Field: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", opts.NodeName)),
SelectorsByObject: cache.SelectorsByObject{&ipamv1alpha1.IPPool{}: cache.ObjectSelector{
Field: fields.AndSelectors(
fields.ParseSelectorOrDie(fmt.Sprintf("metadata.namespace=%s", opts.PoolsNamespace))),
}},
}),
MetricsBindAddress: opts.MetricsAddr,
Expand All @@ -146,12 +153,14 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
logger.Error(err, "unable to initialize manager")
return err
}
if err = (&nodectrl.NodeReconciler{

if err = (&ippoolctrl.IPPoolReconciler{
PoolManager: poolManager,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
NodeName: opts.NodeName,
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller", "controller", "Node")
logger.Error(err, "unable to create controller", "controller", "IPPool")
return err
}

Expand Down
17 changes: 14 additions & 3 deletions cmd/ipam-node/app/app_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"

ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1"
)

var (
Expand All @@ -39,17 +42,25 @@ func TestApp(t *testing.T) {

var _ = BeforeSuite(func() {
By("bootstrapping test environment")
testEnv = &envtest.Environment{}
var err error
err = ipamv1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{"../../../deploy/crds"},
CRDInstallOptions: envtest.CRDInstallOptions{
ErrorIfPathMissing: true,
},
}

ctx, cFunc = context.WithCancel(context.Background())

var err error
// cfg is defined in this file globally.
cfg, err = testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())

k8sClient, err = client.New(cfg, client.Options{})
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
})
Expand Down
84 changes: 63 additions & 21 deletions cmd/ipam-node/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import (
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

nodev1 "github.com/Mellanox/nvidia-k8s-ipam/api/grpc/nvidia/ipam/node/v1"
ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1"
"github.com/Mellanox/nvidia-k8s-ipam/cmd/ipam-node/app"
"github.com/Mellanox/nvidia-k8s-ipam/cmd/ipam-node/app/options"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/pool"
)

const (
Expand All @@ -43,27 +44,67 @@ const (
testNamespace = "default"
)

func createTestNode() *corev1.Node {
nodeObj := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testNodeName},
}
ExpectWithOffset(1, pool.SetIPBlockAnnotation(nodeObj, map[string]*pool.IPPool{
testPoolName1: {
Name: testPoolName1,
Subnet: "192.168.0.0/16",
StartIP: "192.168.0.2",
EndIP: "192.168.0.254",
Gateway: "192.168.0.1",
func createTestPools() {
pool1 := &ipamv1alpha1.IPPool{
ObjectMeta: metav1.ObjectMeta{Name: testPoolName1, Namespace: testNamespace},
Spec: ipamv1alpha1.IPPoolSpec{
Subnet: "192.168.0.0/16",
PerNodeBlockSize: 252,
Gateway: "192.168.0.1",
},
testPoolName2: {Name: testPoolName2,
Subnet: "10.100.0.0/16",
StartIP: "10.100.0.2",
EndIP: "10.100.0.254",
Gateway: "10.100.0.1",
}
ExpectWithOffset(1, k8sClient.Create(ctx, pool1))

pool2 := &ipamv1alpha1.IPPool{
ObjectMeta: metav1.ObjectMeta{Name: testPoolName2, Namespace: testNamespace},
Spec: ipamv1alpha1.IPPoolSpec{
Subnet: "10.100.0.0/16",
PerNodeBlockSize: 252,
Gateway: "10.100.0.1",
},
})).NotTo(HaveOccurred())
ExpectWithOffset(1, k8sClient.Create(ctx, nodeObj))
return nodeObj
}
ExpectWithOffset(1, k8sClient.Create(ctx, pool2))

// Update statuses with range allocation
Eventually(func(g Gomega) error {
status := ipamv1alpha1.IPPoolStatus{
Allocations: []ipamv1alpha1.Allocation{
{
NodeName: testNodeName,
StartIP: "192.168.0.2",
EndIP: "192.168.0.254",
},
},
}
return updatePoolStatus(testPoolName1, status)
}, 30, 5).Should(Not(HaveOccurred()))

Eventually(func(g Gomega) error {
status := ipamv1alpha1.IPPoolStatus{
Allocations: []ipamv1alpha1.Allocation{
{
NodeName: testNodeName,
StartIP: "10.100.0.2",
EndIP: "10.100.0.254",
},
},
}
return updatePoolStatus(testPoolName2, status)
}, 30, 5).Should(Not(HaveOccurred()))
}

func updatePoolStatus(poolName string, status ipamv1alpha1.IPPoolStatus) error {
pool := &ipamv1alpha1.IPPool{}
err := k8sClient.Get(ctx, types.NamespacedName{Name: poolName, Namespace: testNamespace}, pool)
if err != nil {
return err
}
pool.Status = status
err = k8sClient.Status().Update(ctx, pool)
if err != nil {
return err
}
return nil
}

func createTestPod() *corev1.Pod {
Expand Down Expand Up @@ -98,6 +139,7 @@ func getOptions(testDir string) *options.Options {
opts.CNIBinDir = cniBinDir
opts.CNIConfDir = cniConfDir
opts.CNIDaemonSocket = daemonSocket
opts.PoolsNamespace = testNamespace
return opts
}

Expand All @@ -122,7 +164,7 @@ var _ = Describe("IPAM Node daemon", func() {
testDir := GinkgoT().TempDir()
opts := getOptions(testDir)

createTestNode()
createTestPools()
pod := createTestPod()

ctx = logr.NewContext(ctx, klog.NewKlogr())
Expand Down
16 changes: 11 additions & 5 deletions cmd/ipam-node/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ func New() *Options {
// Options holds command line options for controller
type Options struct {
cmdoptions.Options
MetricsAddr string
ProbeAddr string
NodeName string
BindAddress string
StoreFile string
MetricsAddr string
ProbeAddr string
NodeName string
PoolsNamespace string
BindAddress string
StoreFile string
// shim CNI parameters
CNIBinDir string
CNIBinFile string
Expand Down Expand Up @@ -91,6 +92,8 @@ func (o *Options) AddNamedFlagSets(sharedFS *cliflag.NamedFlagSets) {
o.ProbeAddr, "The address the probe endpoint binds to.")
daemonFS.StringVar(&o.NodeName, "node-name",
o.NodeName, "The name of the Node on which the daemon runs")
daemonFS.StringVar(&o.PoolsNamespace, "ippools-namespace",
o.PoolsNamespace, "The name of the namespace to watch for IPPools CRs")
daemonFS.StringVar(&o.BindAddress, "bind-address", o.BindAddress,
"GPRC server bind address. e.g.: tcp://127.0.0.1:9092, unix:///var/lib/foo")
daemonFS.StringVar(&o.StoreFile, "store-file", o.StoreFile,
Expand Down Expand Up @@ -122,6 +125,9 @@ func (o *Options) Validate() error {
if len(o.NodeName) == 0 {
return fmt.Errorf("node-name is required parameter")
}
if len(o.PoolsNamespace) == 0 {
return fmt.Errorf("ippools-namespace is required parameter")
}
_, _, err := ParseBindAddress(o.BindAddress)
if err != nil {
return fmt.Errorf("bind-address is invalid: %v", err)
Expand Down
14 changes: 13 additions & 1 deletion deploy/nv-ipam.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@ rules:
- apiGroups:
- ""
resources:
- nodes
- pods
verbs:
- get
- list
- watch
- apiGroups:
- nv-ipam.nvidia.com
resources:
- ippools
verbs:
- get
- list
- watch
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down Expand Up @@ -70,6 +77,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: IPPOOLS_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
command: [ "/ipam-node" ]
args:
- --node-name=$(NODE_NAME)
Expand All @@ -83,6 +94,7 @@ spec:
- --cni-conf-dir=/etc/cni/net.d/nv-ipam.d
- --cni-log-file=/var/log/nv-ipam-cni.log
- --cni-log-level=info # log level for shim CNI
- --ippools-namespace=$(IPPOOLS_NAMESPACE)
resources:
requests:
cpu: "100m"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,58 @@ package controllers
import (
"context"

corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/pool"
)

// NodeReconciler reconciles Node objects
type NodeReconciler struct {
// IPPoolReconciler reconciles Node objects
type IPPoolReconciler struct {
PoolManager pool.Manager
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
NodeName string
}

// Reconcile contains logic to sync Node objects
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Reconcile contains logic to sync IPPool objects
func (r *IPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reqLog := log.FromContext(ctx)
node := &corev1.Node{}
err := r.Client.Get(ctx, req.NamespacedName, node)
ipPool := &ipamv1alpha1.IPPool{}
err := r.Client.Get(ctx, req.NamespacedName, ipPool)
if err != nil {
if apiErrors.IsNotFound(err) {
reqLog.Info("Pool not found, removing from PoolManager")
r.PoolManager.RemovePool(req.Name)
return ctrl.Result{}, nil
}
reqLog.Error(err, "failed to get Pool object from the cache")
return ctrl.Result{}, err
}
if err := r.PoolManager.Update(node); err != nil {
reqLog.Info("pool config from the node object is not updated, reset pool config",
"reason", err.Error())
r.PoolManager.Reset()
} else {
reqLog.Info("pools configuration updated", "data", r.PoolManager.GetPools())
reqLog.Info("Notification on Pool", "name", ipPool.Name)
for _, alloc := range ipPool.Status.Allocations {
if alloc.NodeName == r.NodeName {
pool := &pool.IPPool{
Name: ipPool.Name,
Subnet: ipPool.Spec.Subnet,
Gateway: ipPool.Spec.Gateway,
StartIP: alloc.StartIP,
EndIP: alloc.EndIP,
}
r.PoolManager.UpdatePool(pool)
break
}
}
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *IPPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Node{}).
WithEventFilter(predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return true
}
return e.ObjectOld.GetAnnotations()[pool.IPBlocksAnnotation] !=
e.ObjectNew.GetAnnotations()[pool.IPBlocksAnnotation]
},
}).
For(&ipamv1alpha1.IPPool{}).
Complete(r)
}
Loading

0 comments on commit e2dcee6

Please sign in to comment.