Merge pull request #9974 from olemarkus/cleanup-rolling-update-ctx
Set ctx and cluster on the rolling update struct instead of passing it around everywhere
k8s-ci-robot authored Sep 27, 2020
2 parents 2d63763 + 63f1332 commit 257f859
Showing 5 changed files with 107 additions and 148 deletions.
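
The shape of the refactor: RunDeleteInstance and RunRollingUpdateCluster previously threaded ctx context.Context and cluster *api.Cluster through every RollingUpdateCluster method; after this change the caller sets both once, as the new Ctx and Cluster struct fields, and the methods read them from the receiver. A minimal standalone sketch of the pattern, with stand-in types (the real struct lives in pkg/instancegroups/rollingupdate.go):

package main

import (
	"context"
	"fmt"
)

// Cluster is a stand-in for the kops *api.Cluster spec.
type Cluster struct{ Name string }

// RollingUpdateCluster carries the values that used to be parameters.
type RollingUpdateCluster struct {
	Ctx     context.Context // set once by the caller
	Cluster *Cluster        // likewise
}

// Methods read ctx and cluster from the receiver instead of taking them as arguments.
func (c *RollingUpdateCluster) rollingUpdate() error {
	if err := c.Ctx.Err(); err != nil {
		return err // context already cancelled
	}
	fmt.Printf("rolling update of %s\n", c.Cluster.Name)
	return nil
}

func main() {
	d := &RollingUpdateCluster{
		Ctx:     context.Background(),
		Cluster: &Cluster{Name: "example.k8s.local"},
	}
	_ = d.rollingUpdate()
}

Worth noting: the context package documentation generally advises passing a Context as an explicit first argument rather than storing it in a struct. This change trades that guideline for far less parameter plumbing, which is defensible here because a RollingUpdateCluster lives exactly as long as one command invocation.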
cmd/kops/delete_instance.go (2 additions & 1 deletion)

@@ -242,6 +242,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
 	}
 
 	d := &instancegroups.RollingUpdateCluster{
+		Ctx: ctx,
 		MasterInterval: 0,
 		NodeInterval: 0,
 		BastionInterval: 0,
@@ -270,7 +271,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
 	}
 	d.ClusterValidator = clusterValidator
 
-	return d.UpdateSingleInstance(ctx, cloudMember, options.Surge)
+	return d.UpdateSingleInstance(cloudMember, options.Surge)
 }
 
 func deleteNodeMatch(cloudMember *cloudinstances.CloudInstance, options *deleteInstanceOptions) bool {
cmd/kops/rollingupdatecluster.go (4 additions & 2 deletions)

@@ -328,6 +328,8 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
 	}
 
 	d := &instancegroups.RollingUpdateCluster{
+		Ctx: ctx,
+		Cluster: cluster,
 		MasterInterval: options.MasterInterval,
 		NodeInterval: options.NodeInterval,
 		BastionInterval: options.BastionInterval,
@@ -347,7 +349,7 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
 		ValidateSuccessDuration: 10 * time.Second,
 	}
 
-	err = d.AdjustNeedUpdate(groups, cluster, list)
+	err = d.AdjustNeedUpdate(groups, list)
 	if err != nil {
 		return err
 	}
@@ -430,5 +432,5 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
 	}
 	d.ClusterValidator = clusterValidator
 
-	return d.RollingUpdate(ctx, groups, cluster, list)
+	return d.RollingUpdate(groups, list)
 }
pkg/instancegroups/instancegroups.go (14 additions & 14 deletions)

@@ -69,7 +69,7 @@ func promptInteractive(upgradedHostID, upgradedHostName string) (stopPrompting b
 }
 
 // RollingUpdate performs a rolling update on a list of instances.
-func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, cluster *api.Cluster, group *cloudinstances.CloudInstanceGroup, sleepAfterTerminate time.Duration) (err error) {
+func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(group *cloudinstances.CloudInstanceGroup, sleepAfterTerminate time.Duration) (err error) {
 	isBastion := group.InstanceGroup.IsBastion()
 	// Do not need a k8s client if you are doing cloudonly.
 	if c.K8sClient == nil && !c.CloudOnly {
@@ -94,13 +94,13 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
 	}
 
 	if !c.CloudOnly {
-		err = c.taintAllNeedUpdate(ctx, group, update)
+		err = c.taintAllNeedUpdate(group, update)
 		if err != nil {
 			return err
 		}
 	}
 
-	settings := resolveSettings(cluster, group.InstanceGroup, numInstances)
+	settings := resolveSettings(c.Cluster, group.InstanceGroup, numInstances)
 
 	runningDrains := 0
 	maxSurge := settings.MaxSurge.IntValue()
@@ -162,7 +162,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
 
 	for uIdx, u := range update {
 		go func(m *cloudinstances.CloudInstance) {
-			terminateChan <- c.drainTerminateAndWait(ctx, m, sleepAfterTerminate)
+			terminateChan <- c.drainTerminateAndWait(m, sleepAfterTerminate)
 		}(u)
 		runningDrains++
 
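The loop in the last hunk above fans each instance's drain-and-terminate out into a goroutine and collects errors on terminateChan, with runningDrains bounding how many are in flight. A standalone sketch of that shape, with a stand-in drain function and invented instance IDs (the real loop also interleaves validation and detach handling that the diff elides):

package main

import (
	"fmt"
	"time"
)

// drain stands in for drainTerminateAndWait.
func drain(id string) error {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("drained", id)
	return nil
}

func main() {
	update := []string{"i-1", "i-2", "i-3", "i-4"}
	maxSurge := 2

	terminateChan := make(chan error, maxSurge)
	runningDrains := 0

	for _, id := range update {
		go func(id string) { terminateChan <- drain(id) }(id)
		runningDrains++

		// Once maxSurge drains are in flight, wait for one to finish
		// before starting the next.
		if runningDrains >= maxSurge {
			if err := <-terminateChan; err != nil {
				fmt.Println("drain failed:", err)
				return
			}
			runningDrains--
		}
	}

	// Collect the stragglers, as waitForPendingBeforeReturningError does in the real code.
	for runningDrains > 0 {
		if err := <-terminateChan; err != nil {
			fmt.Println("drain failed:", err)
		}
		runningDrains--
	}
}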
@@ -260,7 +260,7 @@ func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan er
 	return err
 }
 
-func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
+func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
 	var toTaint []*corev1.Node
 	for _, u := range update {
 		if u.Node != nil && !u.Node.Spec.Unschedulable {
@@ -282,7 +282,7 @@ func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cl
 	}
 	klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
 	for _, n := range toTaint {
-		if err := c.patchTaint(ctx, n); err != nil {
+		if err := c.patchTaint(n); err != nil {
 			if c.FailOnDrainError {
 				return fmt.Errorf("failed to taint node %q: %v", n, err)
 			}
@@ -293,7 +293,7 @@ func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cl
 	return nil
 }
 
-func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node) error {
+func (c *RollingUpdateCluster) patchTaint(node *corev1.Node) error {
 	oldData, err := json.Marshal(node)
 	if err != nil {
 		return err
@@ -314,14 +314,14 @@ func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node
 		return err
 	}
 
-	_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
+	_, err = c.K8sClient.CoreV1().Nodes().Patch(c.Ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
 	if apierrors.IsNotFound(err) {
 		return nil
 	}
 	return err
 }
 
-func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
+func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
 	instanceID := u.ID
 
 	nodeName := ""
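patchTaint's body is mostly elided by the diff view, but the visible pieces, the oldData marshal at the top and the strategic-merge Patch call at the bottom, bracket the standard two-way merge-patch recipe: serialize, mutate, serialize again, and diff the two into a patch. A hedged standalone reconstruction of that recipe (the taint key shown is illustrative, not confirmed by this diff):

package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	node := &corev1.Node{}

	// Serialize the node before mutating it.
	oldData, err := json.Marshal(node)
	if err != nil {
		panic(err)
	}

	// Add the taint that marks the node as scheduled for replacement.
	node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
		Key:    "kops.k8s.io/scheduled-for-update", // illustrative key
		Effect: corev1.TaintEffectPreferNoSchedule,
	})

	newData, err := json.Marshal(node)
	if err != nil {
		panic(err)
	}

	// The resulting patch contains only the delta, so it composes safely
	// with concurrent changes to unrelated fields on the same Node.
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patchBytes))
	// This is what would be sent via Nodes().Patch(c.Ctx, node.Name,
	// types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}).
}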
@@ -360,7 +360,7 @@ func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *clo
 		klog.Warningf("no kubernetes Node associated with %s, skipping node deletion", instanceID)
 	} else {
 		klog.Infof("deleting node %q from kubernetes", nodeName)
-		if err := c.deleteNode(ctx, u.Node); err != nil {
+		if err := c.deleteNode(u.Node); err != nil {
 			return fmt.Errorf("error deleting node %q: %v", nodeName, err)
 		}
 	}
@@ -551,9 +551,9 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error
 }
 
 // deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
-func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node) error {
+func (c *RollingUpdateCluster) deleteNode(node *corev1.Node) error {
 	var options metav1.DeleteOptions
-	err := c.K8sClient.CoreV1().Nodes().Delete(ctx, node.Name, options)
+	err := c.K8sClient.CoreV1().Nodes().Delete(c.Ctx, node.Name, options)
 	if err != nil {
 		if apierrors.IsNotFound(err) {
 			return nil
@@ -566,7 +566,7 @@ func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node
 }
 
 // UpdateSingeInstance performs a rolling update on a single instance
-func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstance, detach bool) error {
+func (c *RollingUpdateCluster) UpdateSingleInstance(cloudMember *cloudinstances.CloudInstance, detach bool) error {
 	if detach {
 		if cloudMember.CloudInstanceGroup.InstanceGroup.IsMaster() {
 			klog.Warning("cannot detach master instances. Assuming --surge=false")
@@ -581,5 +581,5 @@ func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMe
 		}
 	}
 
-	return c.drainTerminateAndWait(ctx, cloudMember, 0)
+	return c.drainTerminateAndWait(cloudMember, 0)
 }
pkg/instancegroups/rollingupdate.go (8 additions & 6 deletions)

@@ -33,7 +33,9 @@ import (
 
 // RollingUpdateCluster is a struct containing cluster information for a rolling update.
 type RollingUpdateCluster struct {
-	Cloud fi.Cloud
+	Ctx context.Context
+	Cluster *api.Cluster
+	Cloud fi.Cloud
 
 	// MasterInterval is the amount of time to wait after stopping a master instance
 	MasterInterval time.Duration
@@ -75,7 +77,7 @@ type RollingUpdateCluster struct {
 }
 
 // AdjustNeedUpdate adjusts the set of instances that need updating, using factors outside those known by the cloud implementation
-func (c *RollingUpdateCluster) AdjustNeedUpdate(groups map[string]*cloudinstances.CloudInstanceGroup, cluster *api.Cluster, instanceGroups *api.InstanceGroupList) error {
+func (c *RollingUpdateCluster) AdjustNeedUpdate(groups map[string]*cloudinstances.CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error {
 	for _, group := range groups {
 		if group.Ready != nil {
 			var newReady []*cloudinstances.CloudInstance
@@ -101,7 +103,7 @@ func (c *RollingUpdateCluster) AdjustNeedUpdate(groups map[string]*cloudinstance
 }
 
 // RollingUpdate performs a rolling update on a K8s Cluster.
-func (c *RollingUpdateCluster) RollingUpdate(ctx context.Context, groups map[string]*cloudinstances.CloudInstanceGroup, cluster *api.Cluster, instanceGroups *api.InstanceGroupList) error {
+func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error {
 	if len(groups) == 0 {
 		klog.Info("Cloud Instance Group length is zero. Not doing a rolling-update.")
 		return nil
@@ -139,7 +141,7 @@ func (c *RollingUpdateCluster) RollingUpdate(ctx context.Context, groups map[str
 
 			defer wg.Done()
 
-			err := c.rollingUpdateInstanceGroup(ctx, cluster, bastionGroups[k], c.BastionInterval)
+			err := c.rollingUpdateInstanceGroup(bastionGroups[k], c.BastionInterval)
 
 			resultsMutex.Lock()
 			results[k] = err
@@ -164,7 +166,7 @@ func (c *RollingUpdateCluster) RollingUpdate(ctx context.Context, groups map[str
 		// and we don't want to roll all the masters at the same time. See issue #284
 
 		for _, k := range sortGroups(masterGroups) {
-			err := c.rollingUpdateInstanceGroup(ctx, cluster, masterGroups[k], c.MasterInterval)
+			err := c.rollingUpdateInstanceGroup(masterGroups[k], c.MasterInterval)
 
 			// Do not continue update if master(s) failed, cluster is potentially in an unhealthy state
 			if err != nil {
@@ -186,7 +188,7 @@ func (c *RollingUpdateCluster) RollingUpdate(ctx context.Context, groups map[str
 	}
 
 	for _, k := range sortGroups(nodeGroups) {
-		err := c.rollingUpdateInstanceGroup(ctx, cluster, nodeGroups[k], c.NodeInterval)
+		err := c.rollingUpdateInstanceGroup(nodeGroups[k], c.NodeInterval)
 
 		results[k] = err
 
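The hunks above show RollingUpdate's three phases: bastions are updated concurrently (one goroutine per group, joined with a WaitGroup, a mutex guarding the shared results map), then masters strictly in sequence (per the issue #284 comment), then nodes in sequence. A standalone sketch of the concurrent bastion phase, with invented group names standing in for the real rollingUpdateInstanceGroup calls:

package main

import (
	"fmt"
	"sync"
)

// updateGroup stands in for rollingUpdateInstanceGroup.
func updateGroup(name string) error {
	fmt.Println("updating", name)
	return nil
}

func main() {
	bastionGroups := []string{"bastions-us-east-1a", "bastions-us-east-1b"}

	results := make(map[string]error)
	var resultsMutex sync.Mutex
	var wg sync.WaitGroup

	for _, k := range bastionGroups {
		wg.Add(1)
		go func(k string) {
			defer wg.Done()

			err := updateGroup(k)

			// The results map is shared across goroutines, so writes are serialized.
			resultsMutex.Lock()
			results[k] = err
			resultsMutex.Unlock()
		}(k)
	}
	wg.Wait()

	fmt.Println("results:", results)
}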