Skip to content

Commit

Permalink
wait for node unjoined (#35)
Browse files Browse the repository at this point in the history
* wait for node unjoined

* better tests

* logic fix

* Update rollingupgrade_controller.go
  • Loading branch information
eytan-avisror authored and shrinandj committed Dec 10, 2019
1 parent 96e2f50 commit a7662f6
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 0 deletions.
40 changes: 40 additions & 0 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/go-logr/logr"
log "github.com/keikoproj/upgrade-manager/pkg/log"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -76,6 +77,13 @@ const (
ShellBinary = "/bin/sh"
)

var (
	// TerminationSleepIntervalSeconds is the number of seconds to pause between
	// successive checks for whether a node object has unjoined.
	TerminationSleepIntervalSeconds = 30
	// TerminationTimeoutSeconds is the maximum number of seconds to keep waiting
	// for a node object to unjoin before giving up.
	TerminationTimeoutSeconds = 3600
)

var DefaultRetryer = awsclient.DefaultRetryer{
NumMaxRetries: 250,
MinThrottleDelay: time.Second * 5,
Expand Down Expand Up @@ -222,6 +230,28 @@ func (r *RollingUpgradeReconciler) CallKubectlDrain(ctx context.Context, nodeNam
errChan <- nil
}

// WaitForTermination polls nodeInterface until the node object named nodeName
// is no longer found, or until TerminationTimeoutSeconds has elapsed.
//
// It returns true as soon as a Get reports NotFound (the node has unjoined),
// and false when the timeout is reached first. The returned error is always
// nil: lookup failures other than NotFound are logged and retried on the next
// poll so that a transient API error does not abort the caller.
func (r *RollingUpgradeReconciler) WaitForTermination(nodeName string, nodeInterface v1.NodeInterface) (bool, error) {
	var (
		timeout  = time.Duration(TerminationTimeoutSeconds) * time.Second
		interval = time.Duration(TerminationSleepIntervalSeconds) * time.Second
		started  = time.Now()
	)

	for {
		// The deadline is checked before each lookup, so a timeout that has
		// already expired returns without querying the API again.
		if time.Since(started) >= timeout {
			log.Println("WaitForTermination timed out while waiting for node to unjoin")
			return false, nil
		}

		_, err := nodeInterface.Get(nodeName, metav1.GetOptions{})
		switch {
		case v1errors.IsNotFound(err):
			log.Printf("node %s is unjoined from cluster, upgrade will proceed", nodeName)
			return true, nil
		case err != nil:
			// Any other failure tells us nothing about the node's state;
			// report it instead of claiming the node is still joined.
			log.Printf("failed to look up node %s, will wait %vs and retry: %v", nodeName, TerminationSleepIntervalSeconds, err)
		default:
			log.Printf("node %s is still joined to cluster, will wait %vs and retry", nodeName, TerminationSleepIntervalSeconds)
		}

		time.Sleep(interval)
	}
}

// TerminateNode actually terminates the given node.
func (r *RollingUpgradeReconciler) TerminateNode(ruObj *upgrademgrv1alpha1.RollingUpgrade, instanceID string, svc autoscalingiface.AutoScalingAPI) error {

Expand Down Expand Up @@ -761,6 +791,16 @@ func (r *RollingUpgradeReconciler) UpdateInstance(ctx *context.Context,
return
}

unjoined, err := r.WaitForTermination(nodeName, r.generatedClient.CoreV1().Nodes())
if err != nil {
ch <- err
return
}

if !unjoined {
log.Warnf("termination waiter completed but %s is still joined, will proceed with upgrade", nodeName)
}

ruObj.Status.NodesProcessed = ruObj.Status.NodesProcessed + 1
r.Update(*ctx, ruObj)

Expand Down
39 changes: 39 additions & 0 deletions controllers/rollingupgrade_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
Expand Down Expand Up @@ -1928,6 +1929,44 @@ func TestRunRestackNoNodeInAsg(t *testing.T) {
g.Expect(err).To(gomega.BeNil())
}

// TestWaitForTermination checks both outcomes of WaitForTermination against a
// fake clientset: it reports false (timed out) while the node object exists,
// and true once the node object has been deleted.
func TestWaitForTermination(t *testing.T) {
	g := gomega.NewGomegaWithT(t)

	// Shrink the package-level timings so the timeout path finishes quickly,
	// and restore them afterwards so other tests are not affected.
	originalTimeout, originalInterval := TerminationTimeoutSeconds, TerminationSleepIntervalSeconds
	defer func() {
		TerminationTimeoutSeconds = originalTimeout
		TerminationSleepIntervalSeconds = originalInterval
	}()
	TerminationTimeoutSeconds = 1
	TerminationSleepIntervalSeconds = 1

	mockNodeName := "node-123"
	mockNode := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: mockNodeName,
		},
	}
	kubernetesClient := fake.NewSimpleClientset()
	nodeInterface := kubernetesClient.CoreV1().Nodes()

	mgr, err := manager.New(cfg, manager.Options{})
	g.Expect(err).NotTo(gomega.HaveOccurred())

	rcRollingUpgrade := &RollingUpgradeReconciler{
		Client:          mgr.GetClient(),
		generatedClient: kubernetes.NewForConfigOrDie(mgr.GetConfig()),
		admissionMap:    sync.Map{},
		ruObjNameToASG:  sync.Map{},
		ClusterState:    NewClusterState(),
	}

	// Fail fast if the fixture cannot be set up; otherwise a failed Create
	// would show up as a misleading "unjoined" assertion failure below.
	_, err = nodeInterface.Create(mockNode)
	g.Expect(err).NotTo(gomega.HaveOccurred())

	// Node exists: the waiter must hit the timeout and report false.
	unjoined, err := rcRollingUpgrade.WaitForTermination(mockNodeName, nodeInterface)
	g.Expect(err).NotTo(gomega.HaveOccurred())
	g.Expect(unjoined).To(gomega.BeFalse())

	err = nodeInterface.Delete(mockNodeName, &metav1.DeleteOptions{})
	g.Expect(err).NotTo(gomega.HaveOccurred())

	// Node is gone: the waiter must report true.
	unjoined, err = rcRollingUpgrade.WaitForTermination(mockNodeName, nodeInterface)
	g.Expect(err).NotTo(gomega.HaveOccurred())
	g.Expect(unjoined).To(gomega.BeTrue())
}

func TestRunRestackWithNodesLessThanMaxUnavailable(t *testing.T) {
g := gomega.NewGomegaWithT(t)

Expand Down

0 comments on commit a7662f6

Please sign in to comment.