Skip to content

Commit

Permalink
✨Autoscaling: automatically clean up nodes from the docker swarm (#3617)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Nov 30, 2022
1 parent 9152bf8 commit 936bac0
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 37 deletions.
1 change: 0 additions & 1 deletion services/autoscaling/requirements/_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,3 @@ pytest-mock
pytest-runner
python-dotenv
respx
types-aiobotocore[s3] # s3 storage
19 changes: 0 additions & 19 deletions services/autoscaling/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ botocore==1.27.59
# boto3
# moto
# s3transfer
botocore-stubs==1.29.16
# via
# -c requirements/_base.txt
# types-aiobotocore
certifi==2022.9.24
# via
# -c requirements/_base.txt
Expand Down Expand Up @@ -279,23 +275,8 @@ tomli==2.0.1
# via
# coverage
# pytest
types-aiobotocore==2.4.0.post1
# via
# -c requirements/_base.txt
# -r requirements/_test.in
types-aiobotocore-s3==2.4.0.post1
# via types-aiobotocore
types-awscrt==0.15.3
# via
# -c requirements/_base.txt
# botocore-stubs
types-toml==0.10.8.1
# via responses
typing-extensions==4.4.0
# via
# -c requirements/_base.txt
# types-aiobotocore
# types-aiobotocore-s3
urllib3==1.26.13
# via
# -c requirements/../../../requirements/constraints.txt
Expand Down
1 change: 0 additions & 1 deletion services/autoscaling/requirements/_tools.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ tomlkit==0.11.6
typing-extensions==4.4.0
# via
# -c requirements/_base.txt
# -c requirements/_test.txt
# astroid
# black
# pylint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ async def check_dynamic_resources(app: FastAPI) -> None:
"""
app_settings: ApplicationSettings = app.state.settings
assert app_settings.AUTOSCALING_NODES_MONITORING # nosec

# 1. get monitored nodes information and resources
monitored_nodes = await utils_docker.get_monitored_nodes(
node_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
)

cluster_total_resources = await utils_docker.compute_cluster_total_resources(
monitored_nodes
)
logger.info("%s", f"{cluster_total_resources=}")
cluster_used_resources = await utils_docker.compute_cluster_used_resources(
monitored_nodes
)
logger.info("%s", f"{cluster_used_resources=}")

# 2. Remove nodes that are gone
await utils_docker.remove_monitored_down_nodes(monitored_nodes)

# 3. Scale up nodes if there are pending tasks
pending_tasks = await utils_docker.pending_service_tasks_with_insufficient_resources(
service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS
)
Expand All @@ -37,19 +56,6 @@ async def check_dynamic_resources(app: FastAPI) -> None:
f"{app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS}",
)

monitored_nodes = await utils_docker.get_monitored_nodes(
node_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
)

cluster_total_resources = await utils_docker.compute_cluster_total_resources(
monitored_nodes
)
logger.info("current %s", f"{cluster_total_resources=}")
cluster_used_resources = await utils_docker.compute_cluster_used_resources(
monitored_nodes
)
logger.info("current %s", f"{cluster_used_resources=}")

assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
list_of_ec2_instances = await utils_aws.get_ec2_instance_capabilities(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@

import aiodocker
from models_library.docker import DockerLabelKey
from models_library.generated_models.docker_rest_api import Node, Task, TaskState
from models_library.generated_models.docker_rest_api import (
Node,
NodeState,
Task,
TaskState,
)
from pydantic import ByteSize, parse_obj_as
from servicelib.logging_utils import log_context
from servicelib.utils import logged_gather
Expand Down Expand Up @@ -46,6 +51,32 @@ async def get_monitored_nodes(node_labels: list[DockerLabelKey]) -> list[Node]:
return nodes


async def remove_monitored_down_nodes(nodes: list[Node]) -> list[Node]:
    """removes docker nodes that are in the down state"""

    # states for which a node is considered gone and safe to remove from the swarm
    _removable_states = (
        NodeState.down,
        NodeState.disconnected,
        NodeState.unknown,
    )

    def _is_removable(node: Node) -> bool:
        status = node.Status
        if not status or not status.State:
            logger.warning(
                "%s has no Status/State! This is unexpected and shall be checked",
                f"{node=}",
            )
            # we do not remove a node that has a weird state, let it be done by someone smarter.
            return False
        return status.State in _removable_states

    removable_nodes = [node for node in nodes if _is_removable(node)]
    async with aiodocker.Docker() as docker_client:
        for candidate in removable_nodes:
            assert candidate.ID  # nosec
            with log_context(logger, logging.INFO, msg=f"remove {candidate.ID=}"):
                await docker_client.nodes.remove(node_id=candidate.ID)
    return removable_nodes


async def pending_service_tasks_with_insufficient_resources(
service_labels: list[DockerLabelKey],
) -> list[Task]:
Expand Down
4 changes: 3 additions & 1 deletion services/autoscaling/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ async def _creator(
return service

yield _creator

await asyncio.gather(
*(async_docker_client.services.delete(s["ID"]) for s in created_services)
)
Expand All @@ -244,9 +245,10 @@ async def _check_service_task_gone(service: Mapping[str, Any]) -> None:
f"--> checking if service {service['ID']}:{service['Spec']['Name']} is really gone..."
)
assert not await async_docker_client.containers.list(
all=True,
filters={
"label": [f"com.docker.swarm.service.id={service['ID']}"],
}
},
)
print(f"<-- service {service['ID']}:{service['Spec']['Name']} is gone.")

Expand Down
51 changes: 50 additions & 1 deletion services/autoscaling/tests/unit/test_utils_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@
import pytest
from deepdiff import DeepDiff
from faker import Faker
from models_library.generated_models.docker_rest_api import Availability, Task
from models_library.generated_models.docker_rest_api import (
Availability,
NodeState,
Task,
)
from pydantic import ByteSize, parse_obj_as
from pytest_mock.plugin import MockerFixture
from simcore_service_autoscaling.models import Resources
from simcore_service_autoscaling.utils_docker import (
Node,
Expand All @@ -23,6 +28,7 @@
get_max_resources_from_docker_task,
get_monitored_nodes,
pending_service_tasks_with_insufficient_resources,
remove_monitored_down_nodes,
tag_node,
wait_for_node,
)
Expand Down Expand Up @@ -122,6 +128,49 @@ async def test_get_monitored_nodes_with_valid_label(
)


async def test_remove_monitored_down_nodes_with_empty_list_does_nothing():
    # an empty node list shall lead to no removal at all
    removed_nodes = await remove_monitored_down_nodes([])
    assert removed_nodes == []


async def test_remove_monitored_down_nodes_of_non_down_node_does_nothing(
    host_node: Node,
):
    # the host node is up and running, so nothing shall be removed
    removed_nodes = await remove_monitored_down_nodes([host_node])
    assert removed_nodes == []


@pytest.fixture
def fake_docker_node(host_node: Node, faker: Faker) -> Node:
    """returns a deep copy of the host node carrying a random, distinct ID"""
    copied_node = host_node.copy(deep=True)
    copied_node.ID = faker.uuid4()
    assert (
        host_node.ID != copied_node.ID
    ), "this should never happen, or you are really unlucky"
    return copied_node


async def test_remove_monitored_down_nodes_of_down_node(
    fake_docker_node: Node, mocker: MockerFixture
):
    # patch the docker client so that no real swarm API call happens
    docker_mock = mocker.patch("aiodocker.Docker", autospec=True)

    # put the fake node into the down state
    assert fake_docker_node.Status
    fake_docker_node.Status.State = NodeState.down
    assert fake_docker_node.Status.State == NodeState.down

    removed_nodes = await remove_monitored_down_nodes([fake_docker_node])
    assert removed_nodes == [fake_docker_node]

    # NOTE: this is the same as calling with aiodocker.Docker() as docker: docker.nodes.remove()
    mocked_remove = docker_mock.return_value.__aenter__.return_value.nodes.remove
    mocked_remove.assert_called_once_with(node_id=fake_docker_node.ID)


async def test_remove_monitored_down_node_with_unexpected_state_does_nothing(
    fake_docker_node: Node,
):
    # drop the whole Status to simulate a node with an unexpected state
    assert fake_docker_node.Status
    fake_docker_node.Status = None
    assert not fake_docker_node.Status

    removed_nodes = await remove_monitored_down_nodes([fake_docker_node])
    assert removed_nodes == []


async def test_pending_service_task_with_insufficient_resources_with_no_service(
host_node: Node,
):
Expand Down

0 comments on commit 936bac0

Please sign in to comment.