Skip to content

Commit

Permalink
moving ec2 client to its own module
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Dec 6, 2022
1 parent a1876ab commit 35fa357
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ._meta import VERSION
from .core.errors import Ec2InstanceNotFoundError
from .core.settings import ApplicationSettings
from .modules.ec2 import get_ec2_client
from .utils import rabbitmq

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -61,8 +62,9 @@ async def check_dynamic_resources(app: FastAPI) -> None:

assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
ec2_client = get_ec2_client(app)
list_of_ec2_instances = await utils_aws.get_ec2_instance_capabilities(
app_settings.AUTOSCALING_EC2_ACCESS, app_settings.AUTOSCALING_EC2_INSTANCES
ec2_client, app_settings.AUTOSCALING_EC2_INSTANCES
)

for task in pending_tasks:
Expand All @@ -85,7 +87,7 @@ async def check_dynamic_resources(app: FastAPI) -> None:

logger.debug("%s", f"{ec2_instances_needed[0]=}")
new_instance_dns_name = await utils_aws.start_aws_instance(
app_settings.AUTOSCALING_EC2_ACCESS,
ec2_client,
app_settings.AUTOSCALING_EC2_INSTANCES,
instance_type=parse_obj_as(
InstanceTypeType, ec2_instances_needed[0].name
Expand Down
Empty file.
37 changes: 22 additions & 15 deletions services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,13 @@


@dataclass
class AutoscalingEC2Client:
session: aioboto3.Session
class AutoscalingEC2:
client: EC2Client
session: aioboto3.Session
exit_stack: contextlib.AsyncExitStack


def setup(app: FastAPI) -> None:
async def on_startup() -> None:
app.state.ec2_client = None
settings: Optional[EC2Settings] = app.state.settings.EC2Settings
if not settings:
logger.warning("EC2 client is de-activated in the settings")
return

@classmethod
async def create(cls, settings: EC2Settings) -> "AutoscalingEC2":
session = aioboto3.Session()
session_client = session.client(
"ec2",
Expand All @@ -41,20 +35,33 @@ async def on_startup() -> None:
ec2_client = cast(
EC2Client, await exit_stack.enter_async_context(session_client)
)
return cls(ec2_client, session, exit_stack)

async def close(self) -> None:
await self.exit_stack.aclose()


def setup(app: FastAPI) -> None:
async def on_startup() -> None:
app.state.ec2_client = None
settings: Optional[EC2Settings] = app.state.settings.EC2Settings
if not settings:
logger.warning("EC2 client is de-activated in the settings")
return

app.state.ec2_client = ec2_client
app.state.ec2_client = await AutoscalingEC2.create(settings)

async def on_shutdown() -> None:
if app.state.ec2_client:
await app.state.ec2_client.close()
await cast(AutoscalingEC2, app.state.ec2_client).close()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


def get_ec2_client(app: FastAPI) -> RabbitMQClient:
def get_ec2_client(app: FastAPI) -> EC2Client:
if not app.state.ec2_client:
raise ConfigurationError(
msg="EC2 client is not available. Please check the configuration."
)
return cast(RabbitMQClient, app.state.ec2_client)
return cast(AutoscalingEC2, app.state.ec2_client).client
176 changes: 77 additions & 99 deletions services/autoscaling/src/simcore_service_autoscaling/utils_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
import logging
from collections import OrderedDict
from textwrap import dedent
from typing import AsyncIterator, Callable, cast
from typing import Callable, cast

import aioboto3
from pydantic import ByteSize, parse_obj_as
from servicelib.logging_utils import log_context
from types_aiobotocore_ec2 import EC2Client
Expand All @@ -20,39 +19,21 @@
Ec2InstanceNotFoundError,
Ec2TooManyInstancesError,
)
from .core.settings import EC2InstancesSettings, EC2Settings
from .core.settings import EC2InstancesSettings
from .models import EC2Instance, Resources

logger = logging.getLogger(__name__)


@contextlib.asynccontextmanager
async def ec2_client(settings: EC2Settings) -> AsyncIterator[EC2Client]:
ec2 = None
try:
session = aioboto3.Session()
async with session.client(
"ec2",
endpoint_url=settings.EC2_ENDPOINT,
aws_access_key_id=settings.EC2_ACCESS_KEY_ID,
aws_secret_access_key=settings.EC2_SECRET_ACCESS_KEY,
region_name=settings.EC2_REGION_NAME,
) as ec2:
yield ec2
finally:
if ec2:
await ec2.close()


async def get_ec2_instance_capabilities(
settings: EC2Settings, instance_settings: EC2InstancesSettings
ec2_client: EC2Client,
instance_settings: EC2InstancesSettings,
) -> list[EC2Instance]:
async with ec2_client(settings) as ec2:
instance_types = await ec2.describe_instance_types(
InstanceTypes=cast(
list[InstanceTypeType], instance_settings.EC2_INSTANCES_ALLOWED_TYPES
)
instance_types = await ec2_client.describe_instance_types(
InstanceTypes=cast(
list[InstanceTypeType], instance_settings.EC2_INSTANCES_ALLOWED_TYPES
)
)

list_instances: list[EC2Instance] = []
for instance in instance_types.get("InstanceTypes", []):
Expand Down Expand Up @@ -127,7 +108,7 @@ def _is_ec2_instance_running(instance: ReservationTypeDef):


async def start_aws_instance(
settings: EC2Settings,
ec2_client: EC2Client,
instance_settings: EC2InstancesSettings,
instance_type: InstanceTypeType,
tags: dict[str, str],
Expand All @@ -138,76 +119,73 @@ async def start_aws_instance(
logging.INFO,
msg=f"launching AWS instance {instance_type} with {tags=}",
):
async with ec2_client(settings) as client:
# first check the max amount is not already reached
if current_instances := await client.describe_instances(
Filters=[
{"Name": "tag-key", "Values": [tag_key]} for tag_key in tags.keys()
]
# first check the max amount is not already reached
if current_instances := await ec2_client.describe_instances(
Filters=[
{"Name": "tag-key", "Values": [tag_key]} for tag_key in tags.keys()
]
):
if (
len(current_instances.get("Reservations", []))
>= instance_settings.EC2_INSTANCES_MAX_INSTANCES
) and all(
_is_ec2_instance_running(instance)
for instance in current_instances["Reservations"]
):
if (
len(current_instances.get("Reservations", []))
>= instance_settings.EC2_INSTANCES_MAX_INSTANCES
) and all(
_is_ec2_instance_running(instance)
for instance in current_instances["Reservations"]
):
raise Ec2TooManyInstancesError(
num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES
)

instances = await client.run_instances(
ImageId=instance_settings.EC2_INSTANCES_AMI_ID,
MinCount=1,
MaxCount=1,
InstanceType=instance_type,
InstanceInitiatedShutdownBehavior="terminate",
KeyName=instance_settings.EC2_INSTANCES_KEY_NAME,
SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID,
TagSpecifications=[
{
"ResourceType": "instance",
"Tags": [
{"Key": tag_key, "Value": tag_value}
for tag_key, tag_value in tags.items()
],
}
],
UserData=_compose_user_data(startup_script),
SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS,
)
instance_id = instances["Instances"][0]["InstanceId"]
logger.info(
"New instance launched: %s, waiting for it to start now...", instance_id
)
# wait for the instance to be in a running state
# NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html
waiter = client.get_waiter("instance_exists")
await waiter.wait(InstanceIds=[instance_id])
logger.info(
"instance %s exists now, waiting for running state...", instance_id
)

waiter = client.get_waiter("instance_running")
await waiter.wait(InstanceIds=[instance_id])
logger.info("instance %s is now running", instance_id)

# NOTE: this is currently disactivated as this makes starting an instance
# take between 2-4 minutes more and it seems to be responsive much before
# nevertheless if we get weird errors, this should be activated again!

# waiter = client.get_waiter("instance_status_ok")
# await waiter.wait(InstanceIds=[instance_id])
# logger.info("instance %s status is OK...", instance_id)
raise Ec2TooManyInstancesError(
num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES
)

# get the private IP
instances = await client.describe_instances(InstanceIds=[instance_id])
private_dns_name: str = instances["Reservations"][0]["Instances"][0][
"PrivateDnsName"
]
logger.info(
"instance %s is available on %s, happy computing!!",
instance_id,
private_dns_name,
)
return private_dns_name
instances = await ec2_client.run_instances(
ImageId=instance_settings.EC2_INSTANCES_AMI_ID,
MinCount=1,
MaxCount=1,
InstanceType=instance_type,
InstanceInitiatedShutdownBehavior="terminate",
KeyName=instance_settings.EC2_INSTANCES_KEY_NAME,
SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID,
TagSpecifications=[
{
"ResourceType": "instance",
"Tags": [
{"Key": tag_key, "Value": tag_value}
for tag_key, tag_value in tags.items()
],
}
],
UserData=_compose_user_data(startup_script),
SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS,
)
instance_id = instances["Instances"][0]["InstanceId"]
logger.info(
"New instance launched: %s, waiting for it to start now...", instance_id
)
# wait for the instance to be in a running state
# NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html
waiter = ec2_client.get_waiter("instance_exists")
await waiter.wait(InstanceIds=[instance_id])
logger.info("instance %s exists now, waiting for running state...", instance_id)

waiter = ec2_client.get_waiter("instance_running")
await waiter.wait(InstanceIds=[instance_id])
logger.info("instance %s is now running", instance_id)

# NOTE: this is currently disactivated as this makes starting an instance
# take between 2-4 minutes more and it seems to be responsive much before
# nevertheless if we get weird errors, this should be activated again!

# waiter = client.get_waiter("instance_status_ok")
# await waiter.wait(InstanceIds=[instance_id])
# logger.info("instance %s status is OK...", instance_id)

# get the private IP
instances = await ec2_client.describe_instances(InstanceIds=[instance_id])
private_dns_name: str = instances["Reservations"][0]["Instances"][0][
"PrivateDnsName"
]
logger.info(
"instance %s is available on %s, happy computing!!",
instance_id,
private_dns_name,
)
return private_dns_name
12 changes: 8 additions & 4 deletions services/autoscaling/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
from simcore_service_autoscaling.core.application import create_app
from simcore_service_autoscaling.core.settings import ApplicationSettings, EC2Settings
from simcore_service_autoscaling.models import SimcoreServiceDockerLabelKeys
from simcore_service_autoscaling.modules.ec2 import AutoscalingEC2
from simcore_service_autoscaling.utils_aws import EC2Client
from simcore_service_autoscaling.utils_aws import ec2_client as autoscaling_ec2_client
from tenacity import retry
from tenacity._asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
Expand Down Expand Up @@ -512,10 +512,14 @@ async def aws_ami_id(


@pytest.fixture
async def ec2_client() -> AsyncIterator[EC2Client]:
async def ec2_client(
app_environment: EnvVarsDict,
) -> AsyncIterator[EC2Client]:
settings = EC2Settings.create_from_envs()
async with autoscaling_ec2_client(settings) as client:
yield client
ec2 = await AutoscalingEC2.create(settings)
assert ec2
yield ec2.client
await ec2.close()


@pytest.fixture
Expand Down
41 changes: 41 additions & 0 deletions services/autoscaling/tests/unit/test_ec2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=unused-variable

import botocore.exceptions
import pytest
from pytest_simcore.helpers.utils_envs import EnvVarsDict
from simcore_service_autoscaling.core.settings import EC2Settings
from simcore_service_autoscaling.modules.ec2 import AutoscalingEC2
from types_aiobotocore_ec2 import EC2Client


@pytest.fixture
def ec2_settings(
app_environment: EnvVarsDict,
) -> EC2Settings:
return EC2Settings.create_from_envs()


async def test_ec2_client_lifespan(ec2_settings: EC2Settings):
ec2 = await AutoscalingEC2.create(settings=ec2_settings)
assert ec2
assert ec2.client
assert ec2.exit_stack
assert ec2.session

await ec2.close()


async def test_ec2_client_raises_when_no_connection_available(ec2_client: EC2Client):
with pytest.raises(
botocore.exceptions.ClientError, match=r".+ AWS was not able to validate .+"
):
await ec2_client.describe_account_attributes(DryRun=True)


async def test_ec2_client_with_mock_server(
mocked_aws_server_envs: None, ec2_client: EC2Client
):
# passes without exception
await ec2_client.describe_account_attributes(DryRun=True)
Loading

0 comments on commit 35fa357

Please sign in to comment.