Skip to content

Commit

Permalink
Organize EC2 classes in Amazon provider (#20157)
Browse files Browse the repository at this point in the history
* Organize EC2 classes in Amazon provider
  • Loading branch information
eladkal authored Dec 9, 2021
1 parent 0d369bd commit 985bb06
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 204 deletions.
115 changes: 115 additions & 0 deletions airflow/providers/amazon/aws/operators/ec2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from typing import Optional

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook


class EC2StartInstanceOperator(BaseOperator):
"""
Start AWS EC2 instance using boto3.
:param instance_id: id of the AWS EC2 instance
:type instance_id: str
:param aws_conn_id: aws connection to use
:type aws_conn_id: str
:param region_name: (optional) aws region name associated with the client
:type region_name: Optional[str]
:param check_interval: time in seconds that the job should wait in
between each instance state checks until operation is completed
:type check_interval: float
"""

template_fields = ("instance_id", "region_name")
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

def __init__(
self,
*,
instance_id: str,
aws_conn_id: str = "aws_default",
region_name: Optional[str] = None,
check_interval: float = 15,
**kwargs,
):
super().__init__(**kwargs)
self.instance_id = instance_id
self.aws_conn_id = aws_conn_id
self.region_name = region_name
self.check_interval = check_interval

def execute(self, context):
ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
self.log.info("Starting EC2 instance %s", self.instance_id)
instance = ec2_hook.get_instance(instance_id=self.instance_id)
instance.start()
ec2_hook.wait_for_state(
instance_id=self.instance_id,
target_state="running",
check_interval=self.check_interval,
)


class EC2StopInstanceOperator(BaseOperator):
"""
Stop AWS EC2 instance using boto3.
:param instance_id: id of the AWS EC2 instance
:type instance_id: str
:param aws_conn_id: aws connection to use
:type aws_conn_id: str
:param region_name: (optional) aws region name associated with the client
:type region_name: Optional[str]
:param check_interval: time in seconds that the job should wait in
between each instance state checks until operation is completed
:type check_interval: float
"""

template_fields = ("instance_id", "region_name")
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

def __init__(
self,
*,
instance_id: str,
aws_conn_id: str = "aws_default",
region_name: Optional[str] = None,
check_interval: float = 15,
**kwargs,
):
super().__init__(**kwargs)
self.instance_id = instance_id
self.aws_conn_id = aws_conn_id
self.region_name = region_name
self.check_interval = check_interval

def execute(self, context):
ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
self.log.info("Stopping EC2 instance %s", self.instance_id)
instance = ec2_hook.get_instance(instance_id=self.instance_id)
instance.stop()
ec2_hook.wait_for_state(
instance_id=self.instance_id,
target_state="stopped",
check_interval=self.check_interval,
)
56 changes: 8 additions & 48 deletions airflow/providers/amazon/aws/operators/ec2_start_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,14 @@
# specific language governing permissions and limitations
# under the License.
#
"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.ec2`."""

from typing import Optional
import warnings

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
from airflow.providers.amazon.aws.operators.ec2 import EC2StartInstanceOperator # noqa


class EC2StartInstanceOperator(BaseOperator):
"""
Start AWS EC2 instance using boto3.
:param instance_id: id of the AWS EC2 instance
:type instance_id: str
:param aws_conn_id: aws connection to use
:type aws_conn_id: str
:param region_name: (optional) aws region name associated with the client
:type region_name: Optional[str]
:param check_interval: time in seconds that the job should wait in
between each instance state checks until operation is completed
:type check_interval: float
"""

template_fields = ("instance_id", "region_name")
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

def __init__(
self,
*,
instance_id: str,
aws_conn_id: str = "aws_default",
region_name: Optional[str] = None,
check_interval: float = 15,
**kwargs,
):
super().__init__(**kwargs)
self.instance_id = instance_id
self.aws_conn_id = aws_conn_id
self.region_name = region_name
self.check_interval = check_interval

def execute(self, context):
ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
self.log.info("Starting EC2 instance %s", self.instance_id)
instance = ec2_hook.get_instance(instance_id=self.instance_id)
instance.start()
ec2_hook.wait_for_state(
instance_id=self.instance_id,
target_state="running",
check_interval=self.check_interval,
)
warnings.warn(
"This module is deprecated. Please use `airflow.providers.amazon.aws.operators.ec2`.",
DeprecationWarning,
stacklevel=2,
)
56 changes: 8 additions & 48 deletions airflow/providers/amazon/aws/operators/ec2_stop_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,14 @@
# specific language governing permissions and limitations
# under the License.
#
"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.ec2`."""

from typing import Optional
import warnings

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
from airflow.providers.amazon.aws.operators.ec2 import EC2StopInstanceOperator # noqa


class EC2StopInstanceOperator(BaseOperator):
"""
Stop AWS EC2 instance using boto3.
:param instance_id: id of the AWS EC2 instance
:type instance_id: str
:param aws_conn_id: aws connection to use
:type aws_conn_id: str
:param region_name: (optional) aws region name associated with the client
:type region_name: Optional[str]
:param check_interval: time in seconds that the job should wait in
between each instance state checks until operation is completed
:type check_interval: float
"""

template_fields = ("instance_id", "region_name")
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

def __init__(
self,
*,
instance_id: str,
aws_conn_id: str = "aws_default",
region_name: Optional[str] = None,
check_interval: float = 15,
**kwargs,
):
super().__init__(**kwargs)
self.instance_id = instance_id
self.aws_conn_id = aws_conn_id
self.region_name = region_name
self.check_interval = check_interval

def execute(self, context):
ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
self.log.info("Stopping EC2 instance %s", self.instance_id)
instance = ec2_hook.get_instance(instance_id=self.instance_id)
instance.stop()
ec2_hook.wait_for_state(
instance_id=self.instance_id,
target_state="stopped",
check_interval=self.check_interval,
)
warnings.warn(
"This module is deprecated. Please use `airflow.providers.amazon.aws.operators.ec2`.",
DeprecationWarning,
stacklevel=2,
)
65 changes: 65 additions & 0 deletions airflow/providers/amazon/aws/sensors/ec2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from typing import Optional

from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
from airflow.sensors.base import BaseSensorOperator


class EC2InstanceStateSensor(BaseSensorOperator):
"""
Check the state of the AWS EC2 instance until
state of the instance become equal to the target state.
:param target_state: target state of instance
:type target_state: str
:param instance_id: id of the AWS EC2 instance
:type instance_id: str
:param region_name: (optional) aws region name associated with the client
:type region_name: Optional[str]
"""

template_fields = ("target_state", "instance_id", "region_name")
ui_color = "#cc8811"
ui_fgcolor = "#ffffff"
valid_states = ["running", "stopped", "terminated"]

def __init__(
self,
*,
target_state: str,
instance_id: str,
aws_conn_id: str = "aws_default",
region_name: Optional[str] = None,
**kwargs,
):
if target_state not in self.valid_states:
raise ValueError(f"Invalid target_state: {target_state}")
super().__init__(**kwargs)
self.target_state = target_state
self.instance_id = instance_id
self.aws_conn_id = aws_conn_id
self.region_name = region_name

def poke(self, context):
ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
instance_state = ec2_hook.get_instance_state(instance_id=self.instance_id)
self.log.info("instance state: %s", instance_state)
return instance_state == self.target_state
53 changes: 8 additions & 45 deletions airflow/providers/amazon/aws/sensors/ec2_instance_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from typing import Optional

from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
from airflow.sensors.base import BaseSensorOperator


class EC2InstanceStateSensor(BaseSensorOperator):
"""
Check the state of the AWS EC2 instance until
state of the instance become equal to the target state.
:param target_state: target state of instance
:type target_state: str
:param instance_id: id of the AWS EC2 instance
:type instance_id: str
:param region_name: (optional) aws region name associated with the client
:type region_name: Optional[str]
"""
"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.sensors.ec2`."""

template_fields = ("target_state", "instance_id", "region_name")
ui_color = "#cc8811"
ui_fgcolor = "#ffffff"
valid_states = ["running", "stopped", "terminated"]
import warnings

def __init__(
self,
*,
target_state: str,
instance_id: str,
aws_conn_id: str = "aws_default",
region_name: Optional[str] = None,
**kwargs,
):
if target_state not in self.valid_states:
raise ValueError(f"Invalid target_state: {target_state}")
super().__init__(**kwargs)
self.target_state = target_state
self.instance_id = instance_id
self.aws_conn_id = aws_conn_id
self.region_name = region_name
from airflow.providers.amazon.aws.sensors.ec2 import EC2InstanceStateSensor # noqa

def poke(self, context):
ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
instance_state = ec2_hook.get_instance_state(instance_id=self.instance_id)
self.log.info("instance state: %s", instance_state)
return instance_state == self.target_state
warnings.warn(
"This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.ec2`.",
DeprecationWarning,
stacklevel=2,
)
2 changes: 2 additions & 0 deletions airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ operators:
python-modules:
- airflow.providers.amazon.aws.operators.ec2_start_instance
- airflow.providers.amazon.aws.operators.ec2_stop_instance
- airflow.providers.amazon.aws.operators.ec2
- integration-name: Amazon ECS
python-modules:
- airflow.providers.amazon.aws.operators.ecs
Expand Down Expand Up @@ -263,6 +264,7 @@ sensors:
- integration-name: Amazon EC2
python-modules:
- airflow.providers.amazon.aws.sensors.ec2_instance_state
- airflow.providers.amazon.aws.sensors.ec2
- integration-name: Amazon Elastic Kubernetes Service (EKS)
python-modules:
- airflow.providers.amazon.aws.sensors.eks
Expand Down
2 changes: 2 additions & 0 deletions dev/provider_packages/prepare_provider_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -2137,6 +2137,8 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin
"This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.",
'numpy.ufunc size changed, may indicate binary incompatibility. Expected 192 from C header,'
' got 216 from PyObject',
'This module is deprecated. Please use `airflow.providers.amazon.aws.operators.ec2`.',
'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.ec2`.',
}


Expand Down
Loading

0 comments on commit 985bb06

Please sign in to comment.