Skip to content

Commit

Permalink
Merge pull request #31 from rollandf/leader-migrate
Browse files Browse the repository at this point in the history
Run migration logic with leader election
  • Loading branch information
ykulazhenkov authored Sep 19, 2023
2 parents 1ee40f5 + efd152a commit 11f148c
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 15 deletions.
18 changes: 14 additions & 4 deletions cmd/ipam-controller/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,25 @@ func RunController(ctx context.Context, config *rest.Config, opts *options.Optio
os.Exit(1)
}

if err := migrator.Migrate(ctx, k8sClient, opts.IPPoolsNamespace); err != nil {
logger.Error(err, fmt.Sprintf("failed to migrate NV-IPAM config from ConfigMap, "+
"set %s env variable to disable migration", migrator.EnvDisableMigration))
return err
migrationChan := make(chan struct{})
m := migrator.Migrator{
IPPoolsNamespace: opts.IPPoolsNamespace,
K8sClient: k8sClient,
MigrationCh: migrationChan,
LeaderElection: opts.EnableLeaderElection,
Logger: logger.WithName("Migrator"),
}
err = mgr.Add(&m)
if err != nil {
logger.Error(err, "failed to add Migrator to the Manager")
os.Exit(1)
}

nodeEventCH := make(chan event.GenericEvent, 1)

if err = (&nodectrl.NodeReconciler{
NodeEventCh: nodeEventCH,
MigrationCh: migrationChan,
PoolsNamespace: opts.IPPoolsNamespace,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -158,6 +167,7 @@ func RunController(ctx context.Context, config *rest.Config, opts *options.Optio
PoolsNamespace: opts.IPPoolsNamespace,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller", "controller", "IPPool")
return err
Expand Down
8 changes: 5 additions & 3 deletions cmd/ipam-controller/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ var _ = Describe("App", func() {

go func() {
Expect(app.RunController(logr.NewContext(ctrlCtx, klog.NewKlogr()), cfg, &options.Options{
MetricsAddr: "0", // disable
ProbeAddr: "0", // disable
IPPoolsNamespace: TestNamespace,
MetricsAddr: "0", // disable
ProbeAddr: "0", // disable
IPPoolsNamespace: TestNamespace,
EnableLeaderElection: true,
LeaderElectionNamespace: TestNamespace,
})).NotTo(HaveOccurred())
close(controllerStopped)
}()
Expand Down
6 changes: 6 additions & 0 deletions pkg/ipam-controller/controllers/ippool/ippool.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ import (
type IPPoolReconciler struct {
PoolsNamespace string
NodeEventCh chan event.GenericEvent
MigrationCh chan struct{}
client.Client
Scheme *runtime.Scheme
recorder record.EventRecorder
}

// Reconcile contains logic to sync IPPool objects
func (r *IPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLog := log.FromContext(ctx)
if req.Namespace != r.PoolsNamespace {
// this should never happen because of the watcher configuration of the manager from controller-runtime pkg
Expand Down
7 changes: 7 additions & 0 deletions pkg/ipam-controller/controllers/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package controllers

import (
"context"
"fmt"

apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,12 +34,18 @@ import (
type NodeReconciler struct {
PoolsNamespace string
NodeEventCh chan event.GenericEvent
MigrationCh chan struct{}
client.Client
Scheme *runtime.Scheme
}

// Reconcile contains logic to sync Node objects
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLog := log.FromContext(ctx)
reqLog.Info("Notification on Node", "name", req.Name)
node := &corev1.Node{}
Expand Down
29 changes: 27 additions & 2 deletions pkg/ipam-controller/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,35 @@ const (
DefaultConfigMapName = "nvidia-k8s-ipam-config"
)

// Migrator migrate from CM config to IPPool CR
type Migrator struct {
K8sClient client.Client
IPPoolsNamespace string
MigrationCh chan struct{}
LeaderElection bool
Logger logr.Logger
}

// Implements manager.Runnable
func (m *Migrator) Start(ctx context.Context) error {
err := Migrate(ctx, m.Logger, m.K8sClient, m.IPPoolsNamespace)
if err != nil {
m.Logger.Error(err, fmt.Sprintf("failed to migrate NV-IPAM config from ConfigMap, "+
"set %s env variable to disable migration", EnvDisableMigration))
return err
}
close(m.MigrationCh)
return nil
}

// Implements manager.NeedLeaderElection
func (m *Migrator) NeedLeaderElection() bool {
return m.LeaderElection
}

// Migrate reads the ConfigMap with the IPAM configuration, reads the allocations
// from the Nodes annotation, create IPPool CRs and delete the ConfigMap and annotations
func Migrate(ctx context.Context, c client.Client, poolNamespace string) error {
logger := logr.FromContextOrDiscard(ctx).WithName("migrator")
func Migrate(ctx context.Context, logger logr.Logger, c client.Client, poolNamespace string) error {
if os.Getenv(EnvDisableMigration) != "" {
logger.Info(fmt.Sprintf("%s set, skip controller migration", EnvDisableMigration))
return nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/ipam-controller/migrator/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-controller/config"
Expand Down Expand Up @@ -145,7 +146,7 @@ var _ = Describe("Controller Migrator", func() {
Expect(updateNode(node2))

By("Run migrator")
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).NotTo(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).NotTo(HaveOccurred())

By("Verify Pool1 Spec")
pool1 := &ipamv1alpha1.IPPool{}
Expand Down Expand Up @@ -183,7 +184,7 @@ var _ = Describe("Controller Migrator", func() {

It("No ConfigMap", func() {
By("Run migrator")
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).NotTo(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).NotTo(HaveOccurred())
})

Context("Negative flows", func() {
Expand All @@ -197,12 +198,12 @@ var _ = Describe("Controller Migrator", func() {
}
Expect(k8sClient.Create(ctx, cm)).NotTo(HaveOccurred())
By("Run migrator - should fail")
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred())

By("Create invalid cfg - not a json data")
updateConfigMap("{{")
By("Run migrator - should fail")
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred())

By("Create invalid cfg - Gateway not in subnet")
var inValidConfig = `
Expand All @@ -212,7 +213,7 @@ var _ = Describe("Controller Migrator", func() {
}
}`
updateConfigMap(inValidConfig)
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred())

By("Create valid cfg - IPPool exists with different spec")
updateConfigMap(validConfig)
Expand All @@ -228,7 +229,7 @@ var _ = Describe("Controller Migrator", func() {
},
}
Expect(k8sClient.Create(ctx, pool1)).NotTo(HaveOccurred())
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred())
})
})
})

0 comments on commit 11f148c

Please sign in to comment.