fix: stop unexpected rollouts
When making any node group change, we previously re-applied the
entire set of cluster resources. With this change, we instead make
only the small, targeted change to the node group itself, which
should avoid unexpected full rollouts of node groups when changing
the autoscaling min/max or resizing a cluster.

Signed-off-by: Mohammed Naser <[email protected]>
mnaser committed Mar 16, 2024
1 parent be7d18c commit e4bab1b
Showing 6 changed files with 474 additions and 62 deletions.
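
In practical terms, the driver now locates the single entry under `spec.topology.workers.machineDeployments` that corresponds to the Magnum node group and patches only that entry, rather than regenerating and re-applying the whole Cluster API `Cluster` spec. A minimal, self-contained sketch of that pattern follows; it uses plain dicts and a hypothetical `patch_machine_deployment` helper rather than the actual driver code:

```python
# Illustrative sketch only (not the driver code): patch one machineDeployment
# entry in a CAPI Cluster spec instead of re-applying the whole spec.
# `patch_machine_deployment` is a hypothetical helper name.

def patch_machine_deployment(cluster_spec: dict, nodegroup_name: str, changes: dict) -> bool:
    """Apply `changes` in place to the machineDeployment named `nodegroup_name`.

    Returns True if a matching entry was found, False otherwise.
    """
    machine_deployments = cluster_spec["spec"]["topology"]["workers"]["machineDeployments"]
    for machine_deployment in machine_deployments:
        if machine_deployment["name"] == nodegroup_name:
            machine_deployment.update(changes)
            return True
    return False


if __name__ == "__main__":
    cluster_spec = {
        "spec": {
            "topology": {
                "workers": {
                    "machineDeployments": [
                        {"name": "default-worker", "replicas": 3},
                    ]
                }
            }
        }
    }
    # Resizing touches only the matching entry; nothing else in the spec changes.
    patch_machine_deployment(cluster_spec, "default-worker", {"replicas": 5})
    print(cluster_spec["spec"]["topology"]["workers"]["machineDeployments"])
```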
4 changes: 3 additions & 1 deletion flake.nix
@@ -15,9 +15,11 @@
{
devShell = pkgs.mkShell
{
# LD_LIBRARY_PATH = "${pkgs.stdenv.cc.cc.lib}/lib";
LD_LIBRARY_PATH = "${pkgs.stdenv.cc.cc.lib}/lib";

buildInputs = with pkgs; [
bashInteractive
glibcLocales
poetry
];
};
121 changes: 95 additions & 26 deletions magnum_cluster_api/driver.py
@@ -13,6 +13,8 @@
# under the License.


from __future__ import annotations

import keystoneauth1
from magnum import objects as magnum_objects
from magnum.conductor import scale_manager
@@ -239,9 +241,6 @@ def resize_cluster(
):
utils.validate_cluster(context, cluster)

if nodegroup is None:
nodegroup = cluster.default_ng_worker

if nodes_to_remove:
machines = objects.Machine.objects(self.k8s_api).filter(
namespace="magnum-system",
@@ -260,12 +259,7 @@
] = "yes"
machine.update()

nodegroup.node_count = node_count
nodegroup.save()

resources.apply_cluster_from_magnum_cluster(
context, self.k8s_api, cluster, skip_auto_scaling_release=True
)
self._update_nodegroup(context, cluster, nodegroup)

@cluster_lock_wrapper
def upgrade_cluster(
@@ -341,10 +335,29 @@ def create_nodegroup(
nodegroup: magnum_objects.NodeGroup,
):
utils.validate_nodegroup(nodegroup, context)
resources.apply_cluster_from_magnum_cluster(
context, self.k8s_api, cluster, skip_auto_scaling_release=True

cluster_resource: objects.Cluster = objects.Cluster.objects(
self.k8s_api, namespace="magnum-system"
).get(name=cluster.stack_id)

cluster_resource.obj["spec"]["topology"]["workers"][
"machineDeployments"
].append(resources.mutate_machine_deployment(context, cluster, nodegroup))

current_generation = resources.Cluster(
context, self.k8s_api, cluster
).get_observed_generation()
cluster_resource.update()
self.wait_capi_cluster_reconciliation_start(
context, cluster, current_generation
)

nodegroup.status = fields.ClusterStatus.CREATE_IN_PROGRESS
nodegroup.save()

cluster.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
cluster.save()

def update_nodegroup_status(
self,
context,
@@ -402,18 +415,53 @@ def update_nodegroup(
cluster: magnum_objects.Cluster,
nodegroup: magnum_objects.NodeGroup,
):
# TODO
self._update_nodegroup(context, cluster, nodegroup)

# NOTE(okozachenko1203): First we save the nodegroup status because update_cluster_status()
# could be finished before update_nodegroup().
nodegroup.save()
def _update_nodegroup(
self,
context,
cluster: magnum_objects.Cluster,
nodegroup: magnum_objects.NodeGroup,
):
utils.validate_nodegroup(nodegroup, context)
resources.apply_cluster_from_magnum_cluster(
context, self.k8s_api, cluster, skip_auto_scaling_release=True

cluster_resource: objects.Cluster = objects.Cluster.objects(
self.k8s_api, namespace="magnum-system"
).get(name=cluster.stack_id)

machine_deployment_index = None
for i, machine_deployment in enumerate(
cluster_resource.obj["spec"]["topology"]["workers"]["machineDeployments"]
):
if machine_deployment["name"] == nodegroup.name:
machine_deployment_index = i
break

if machine_deployment_index is not None:
machine_deployment = cluster_resource.obj["spec"]["topology"]["workers"][
"machineDeployments"
][machine_deployment_index]

cluster_resource.obj["spec"]["topology"]["workers"]["machineDeployments"][
machine_deployment_index
] = resources.mutate_machine_deployment(
context,
cluster,
nodegroup,
machine_deployment,
)

current_generation = resources.Cluster(
context, self.k8s_api, cluster
).get_observed_generation()
cluster_resource.update()
self.wait_capi_cluster_reconciliation_start(
context, cluster, current_generation
)
# NOTE(okozachenko1203): We set the cluster status to UPDATE_IN_PROGRESS again at the end because
# update_cluster_status() could have finished and set the cluster status to
# UPDATE_COMPLETE before nodegroup_conductor.Handler.nodegroup_update finishes.

nodegroup.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
nodegroup.save()

cluster.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
cluster.save()

@@ -424,15 +472,36 @@ def delete_nodegroup(
cluster: magnum_objects.Cluster,
nodegroup: magnum_objects.NodeGroup,
):
cluster_resource: objects.Cluster = objects.Cluster.objects(
self.k8s_api, namespace="magnum-system"
).get(name=cluster.stack_id)

machine_deployment_index = None
for i, machine_deployment in enumerate(
cluster_resource.obj["spec"]["topology"]["workers"]["machineDeployments"]
):
if machine_deployment["name"] == nodegroup.name:
machine_deployment_index = i
break

if machine_deployment_index is not None:
del cluster_resource.obj["spec"]["topology"]["workers"][
"machineDeployments"
][machine_deployment_index]

current_generation = resources.Cluster(
context, self.k8s_api, cluster
).get_observed_generation()
cluster_resource.update()
self.wait_capi_cluster_reconciliation_start(
context, cluster, current_generation
)

nodegroup.status = fields.ClusterStatus.DELETE_IN_PROGRESS
nodegroup.save()

resources.apply_cluster_from_magnum_cluster(
context,
self.k8s_api,
cluster,
skip_auto_scaling_release=True,
)
cluster.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
cluster.save()

@cluster_lock_wrapper
def get_monitor(self, context, cluster: magnum_objects.Cluster):
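
All three node group paths in this driver change (create, update, delete) capture the CAPI Cluster's observed generation before calling `cluster_resource.update()` and then call `wait_capi_cluster_reconciliation_start`. The sketch below shows the general shape such a wait can take; it is an illustrative approximation rather than the actual implementation, and the timeout/interval values and the `get_observed_generation` callable are assumptions:

```python
import time


def wait_for_reconciliation_start(get_observed_generation, previous_generation: int,
                                  timeout: float = 300.0, interval: float = 1.0) -> None:
    """Block until the CAPI controller reports a newer observedGeneration.

    `get_observed_generation` is any callable returning the current
    `status.observedGeneration` of the Cluster resource; once it moves past
    `previous_generation`, the controller has started reconciling the new spec.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_observed_generation() > previous_generation:
            return
        time.sleep(interval)
    raise TimeoutError("CAPI Cluster reconciliation did not start in time")
```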
109 changes: 74 additions & 35 deletions magnum_cluster_api/resources.py
@@ -1993,36 +1993,68 @@ def create_cluster_class(
ClusterClass(api).apply()


def generate_machine_deployments_for_cluster(
context: context.RequestContext, cluster: objects.Cluster
) -> list:
def mutate_machine_deployment(
context: context.RequestContext,
cluster: objects.Cluster,
node_group: magnum_objects.NodeGroup,
machine_deployment: dict = {},
):
"""
This function either updates machine deployment fields that will not
cause a rolling update, or returns a new machine deployment if none
is provided.
"""

auto_scaling_enabled = utils.get_auto_scaling_enabled(cluster)

machine_deployments = []
for ng in cluster.nodegroups:
if ng.role == "master" or ng.status.startswith("DELETE"):
continue
machine_deployment.setdefault(
"metadata",
{
"annotations": {},
"labels": {},
},
)

# Node labels
machine_deployment["metadata"]["labels"] = {
f"node-role.kubernetes.io/{node_group.role}": "",
"node.cluster.x-k8s.io/nodegroup": node_group.name,
}

# Replicas (or min/max if auto-scaling is enabled)
if auto_scaling_enabled:
machine_deployment["replicas"] = None
machine_deployment["metadata"]["annotations"] = {
AUTOSCALE_ANNOTATION_MIN: str(
utils.get_node_group_min_node_count(node_group)
),
AUTOSCALE_ANNOTATION_MAX: str(
utils.get_node_group_max_node_count(context, node_group)
),
}
else:
machine_deployment["replicas"] = node_group.node_count
machine_deployment["metadata"]["annotations"] = {}

machine_deployment = {
# Fixes
machine_deployment["nodeVolumeDetachTimeout"] = (
CLUSTER_CLASS_NODE_VOLUME_DETACH_TIMEOUT
)

# Anything beyond this point will *NOT* be changed in the machine deployment
# for update operations (i.e. if the machine deployment already exists).
if machine_deployment.get("name") == node_group.name:
return machine_deployment

# At this point, this is all code that will be added for brand new machine
# deployments. We can bring any of this code into the above block if we
# want to change it for existing machine deployments.
machine_deployment.update(
{
"class": "default-worker",
"nodeVolumeDetachTimeout": CLUSTER_CLASS_NODE_VOLUME_DETACH_TIMEOUT,
"name": ng.name,
"metadata": {
"annotations": (
{
AUTOSCALE_ANNOTATION_MIN: f"{utils.get_node_group_min_node_count(ng)}", # noqa: E501
AUTOSCALE_ANNOTATION_MAX: f"{utils.get_node_group_max_node_count(context, ng)}", # noqa: E501
}
if auto_scaling_enabled
else {}
),
"labels": {
f"node-role.kubernetes.io/{ng.role}": "",
"node.cluster.x-k8s.io/nodegroup": ng.name,
},
},
"name": node_group.name,
"failureDomain": utils.get_node_group_label(
context, ng, "availability_zone", ""
context, node_group, "availability_zone", ""
),
"machineHealthCheck": {
"enable": utils.get_cluster_label_as_bool(
@@ -2036,45 +2068,52 @@ def generate_machine_deployments_for_cluster(
"value": {
"size": utils.get_node_group_label_as_int(
context,
ng,
node_group,
"boot_volume_size",
CONF.cinder.default_boot_volume_size,
),
"type": utils.get_node_group_label(
context,
ng,
node_group,
"boot_volume_type",
cinder.get_default_boot_volume_type(context),
),
},
},
{
"name": "flavor",
"value": ng.flavor_id,
"value": node_group.flavor_id,
},
{
"name": "imageRepository",
"value": utils.get_node_group_label(
context,
ng,
node_group,
"container_infra_prefix",
"",
),
},
{
"name": "imageUUID",
"value": utils.get_image_uuid(ng.image_id, context),
"value": utils.get_image_uuid(node_group.image_id, context),
},
],
},
}
)

return machine_deployment

# NOTE(mnaser): In order for auto-scaler and cluster class reconcilers
# to not fight each other, we only set replicas if the
# auto-scaler is disabled.
if not auto_scaling_enabled:
machine_deployment["replicas"] = ng.node_count

def generate_machine_deployments_for_cluster(
context: context.RequestContext, cluster: objects.Cluster
) -> list:
machine_deployments = []
for ng in cluster.nodegroups:
if ng.role == "master" or ng.status.startswith("DELETE"):
continue

machine_deployment = mutate_machine_deployment(context, cluster, ng)
machine_deployments.append(machine_deployment)

return machine_deployments
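
For an autoscaling-enabled node group, the in-place (update) path of `mutate_machine_deployment` only touches the node labels, the autoscaler min/max annotations, `replicas`, and `nodeVolumeDetachTimeout`; everything else in the entry is left alone so the topology reconciler has no reason to roll the nodes. An illustrative shape of such an entry after the update path runs is shown below; the annotation keys and the timeout value are placeholders standing in for `AUTOSCALE_ANNOTATION_MIN`, `AUTOSCALE_ANNOTATION_MAX`, and `CLUSTER_CLASS_NODE_VOLUME_DETACH_TIMEOUT`:

```python
# Illustrative only: the shape of a machineDeployment entry after the update
# path of mutate_machine_deployment runs for an autoscaling-enabled node group.
machine_deployment = {
    "name": "default-worker",
    "class": "default-worker",
    "metadata": {
        "labels": {
            "node-role.kubernetes.io/worker": "",
            "node.cluster.x-k8s.io/nodegroup": "default-worker",
        },
        "annotations": {
            "<AUTOSCALE_ANNOTATION_MIN>": "1",
            "<AUTOSCALE_ANNOTATION_MAX>": "5",
        },
    },
    # The autoscaler owns the replica count, so replicas stays unset.
    "replicas": None,
    "nodeVolumeDetachTimeout": "<CLUSTER_CLASS_NODE_VOLUME_DETACH_TIMEOUT>",
}
```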
25 changes: 25 additions & 0 deletions magnum_cluster_api/tests/unit/conftest.py
@@ -15,6 +15,8 @@
import pytest
from magnum.common import context as magnum_context

from magnum_cluster_api import driver


@pytest.fixture
def context():
@@ -26,3 +28,26 @@ def context():
user_id="fake_user",
is_admin=False,
)


@pytest.fixture(scope="session")
def mock_pykube(session_mocker):
session_mocker.patch("pykube.KubeConfig")
session_mocker.patch("pykube.HTTPClient")


@pytest.fixture(scope="session")
def mock_cluster_lock(session_mocker):
session_mocker.patch("kubernetes.config.load_config")
session_mocker.patch("magnum_cluster_api.sync.ClusterLock.acquire")
session_mocker.patch("magnum_cluster_api.sync.ClusterLock.release")


@pytest.fixture(scope="session")
def mock_validate_nodegroup(session_mocker):
session_mocker.patch("magnum_cluster_api.utils.validate_nodegroup")


@pytest.fixture()
def ubuntu_driver(mock_cluster_lock, mock_pykube):
yield driver.UbuntuDriver()
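
A hypothetical example of how these fixtures could be consumed in a test module; the test functions below are illustrative and not part of this commit:

```python
from unittest import mock


def test_driver_constructs_without_a_real_cluster(ubuntu_driver):
    # mock_pykube and mock_cluster_lock are pulled in transitively through the
    # ubuntu_driver fixture, so no kubeconfig or lock backend is needed here.
    assert ubuntu_driver is not None


def test_nodegroup_validation_is_stubbed(mock_validate_nodegroup, context):
    from magnum_cluster_api import utils

    # The session-scoped patch replaces the validator with a MagicMock, so
    # calling it performs no real validation against the cloud.
    utils.validate_nodegroup(mock.sentinel.nodegroup, context)
    assert isinstance(utils.validate_nodegroup, mock.MagicMock)
```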