Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator #19665

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
580dd93
This commit adds new features to the airflow aws redshift module. Sim…
Nov 17, 2021
7a921c5
Add missing requirements for Static checks. Add license to all python…
Nov 18, 2021
0edb98d
Adding changes for pre-commit success
Nov 18, 2021
9911c72
Adding `ClusterStates` Enum to redshift hook to avoid having magic st…
Nov 20, 2021
a68cbbf
Adding documentation to existing AWS operator docs
Nov 22, 2021
b20a38e
Moving the cast to `ClusterStates` Enum to inside the `cluster_status…
Nov 22, 2021
81f2f75
Fixed logging to redshift sensor to show the actual value of the ENUM…
Nov 22, 2021
61b2935
Fixed logging to redshift sensor to show the actual value of the ENUM…
Nov 29, 2021
3b3f1e0
Adding `seealso` to Operator documentation
Nov 30, 2021
b151337
Update airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
dbarrundiag Nov 30, 2021
ec92273
Update airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
dbarrundiag Nov 30, 2021
3ce29da
Removing unused `check_interval` from operator
Nov 30, 2021
456cb73
Merge remote-tracking branch 'origin/redshift-resume-pause-cluster-op…
Nov 30, 2021
4d6e402
Update airflow/providers/amazon/aws/sensors/redshift.py
dbarrundiag Nov 30, 2021
57c4c35
Update docs/apache-airflow-providers-amazon/operators/redshift.rst
dbarrundiag Nov 30, 2021
e03031f
Update docs/apache-airflow-providers-amazon/operators/redshift.rst
dbarrundiag Nov 30, 2021
92cfcf2
Update tests/providers/amazon/aws/sensors/test_redshift.py
dbarrundiag Nov 30, 2021
06a7e3d
Rolling back changes to not use enum and move all Operators into one …
Dec 7, 2021
d20662a
Merge remote-tracking branch 'origin/redshift-resume-pause-cluster-op…
Dec 7, 2021
e72bec0
Rolling back changes to not use enum and move all Operators into one …
Dec 7, 2021
e659253
Rolling back changes to not use enum and move all Operators into one …
Dec 7, 2021
b303dc7
Rolling back changes to not use enum and move all Operators into one …
Dec 7, 2021
6c63fed
Update airflow/providers/amazon/aws/operators/redshift.py
dbarrundiag Dec 7, 2021
3ac8f33
Update airflow/providers/amazon/aws/operators/redshift.py
dbarrundiag Dec 7, 2021
50becc6
Removing no longer required subclass
Dec 7, 2021
653d6b2
Merge remote-tracking branch 'origin/redshift-resume-pause-cluster-op…
Dec 7, 2021
f012c29
Add unittests for TestPauseClusterOperator and TestResumeClusterOperator
Dec 9, 2021
a7ff6ca
Clean unittest for redshift Operators
Dec 9, 2021
f4ac7a4
Small fixups
Dec 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions airflow/providers/amazon/aws/hooks/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
# specific language governing permissions and limitations
# under the License.
"""Interact with AWS Redshift clusters."""

from typing import Dict, List, Optional, Union

try:
from functools import cached_property
except ImportError:
from cached_property import cached_property

from enum import Enum

import redshift_connector
from redshift_connector import Connection as RedshiftConnection
from sqlalchemy import create_engine
Expand All @@ -33,6 +34,21 @@
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class RedshiftClusterStates(Enum):
"""Contains the possible State values of a Redshift Cluster."""

AVAILABLE = 'available'
CREATING = 'creating'
DELETING = 'deleting'
RESUMING = 'resuming'
MODIFYING = 'modifying'
PAUSED = 'paused'
REBOOTING = 'rebooting'
RENAMING = 'renaming'
RESIZING = 'resizing'
NONEXISTENT = 'nonexistent'


class RedshiftHook(AwsBaseHook):
"""
Interact with AWS Redshift, using the boto3 library
Expand All @@ -52,7 +68,7 @@ def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)

# TODO: Wrap create_cluster_snapshot
def cluster_status(self, cluster_identifier: str) -> str:
def cluster_status(self, cluster_identifier: str) -> RedshiftClusterStates:
"""
Return status of a cluster

Expand All @@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
"""
try:
response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
return response[0]['ClusterStatus'] if response else None
return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm a little unsure about converting to enum here. You don't control the API so you can't guarantee that you will have the value in your enum. If an unexpected value appears here, you'll get an unexpected failure. I think you can use the enum for evaluating whether the returned value is the one you are looking for, but I don't think it's a good idea to immediately convert it without error handling. I suppose you could catch ValueError and return an uncategorized enum value or something.... but probably better to just leave it raw. i bet @uranusjr would have some wisdom to share here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@o-nikolas had some good thoughts here on using the Enum:
#19665 (comment)

essentially because that's what was used on the EKS operators similar to this airflow/airflow/providers/amazon/aws/hooks/eks.py

Copy link
Contributor

@dstandish dstandish Nov 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i read the comment and yeah in the 'not found' case that is the focus of the example, sure returning an enum would be fine -- it's completely in your control and a "made-up" status --- but forcing the raw api response into enum is what i feel more iffy about.

and i'd easily say keep the enum there and use it when evaluating the response (e.g. is it paused or available -- use the enum to evaluate the string against rather than hardcode). just am unsure about converting the raw response in the return value and asking for second opinion :)

Copy link
Contributor

@o-nikolas o-nikolas Nov 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good discussion folks!

IMO if there are breaking API changes and the API starts returning different states than you expect it's going to break your code whether you're using strings or an enum, so it's a moot point. I still vote for using Enums from the start here, add error handling, and if APIs change then this code and other code in the related operators will require attention like any other breaking API changes cause.

Copy link
Contributor

@dstandish dstandish Dec 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO if there are breaking API changes and the API starts returning different states than you expect it's going to break your code whether you're using strings or an enum, so it's a moot point

Not necessarily. This hook method is just returning status. If new statuses appear (or if you missed one in your research), it won't break your code, unless you force it into an enum that doesn't know the new status. And it's far from certain that a new status would break your operator. If you are waiting for your cluster to become available and there's a new status kindof-almost-available well that's still not available so your logic is unaffected -- unless of course you are forcing it into an enum and then your code breaks unnecessarily. That's the scenario that makes me wary of this approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hooks are necessarily tied to the API/service they wrap, and I agree with @dstandish -- we don't want our airflow Hook to suddley not work at all when AWS add a new status code to their API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks so much @dstandish and @ashb, should we go back to using the strings and leave cluster_not_found to avoid the breaking change?

Copy link
Contributor

@dstandish dstandish Dec 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes please let's avoid breaking

in my view you are still welcome to use the enum for the purpose of evaluating the response from the API but just don't force the raw response into Enum

thank you 🙏

Copy link
Contributor

@o-nikolas o-nikolas Dec 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @dstandish,

Yes, you would need to correctly handle the case of an unknown/new status, but there's no reason you can't correctly handle this as well as use Eunms (E.g. coerce anything we haven't seen yet to an UNKOWN Enum state, or just continue to return None like other errors cases can do in this existing code). Certainly each approach has pros and cons, but I see no breaking reason if you chose to use Enums.

Though I'm happy to disagree and commit to using strings if that's what others think as well :)

Copy link
Contributor

@dstandish dstandish Dec 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coerce anything we haven't seen yet to an UNKOWN Enum state, or just continue to return None like other errors cases can do in this existing code

Yup it's true this is an approach that would be more tolerant of the unexpected, it's just not what was done here.

But I would say there is little value in implementing such error handling in an effort to still convert in this return value. If your value conforms to the enum, then your comparisons against the enum would work even without converting in the return. And if it doesn't conform, then you either fail (current code state), or throw away information (replacing with None or UNKNOWN). So what's the point of converting it to an enum at all? I would just leave the raw values alone and use the enums for evaluation.

Though I'm happy to disagree and commit to using strings if that's what others think as well :)

I don't know that we need to make a universal proclaimation but in this case I think not converting to enum makes sense :)

except self.get_conn().exceptions.ClusterNotFoundFault:
return 'cluster_not_found'
dstandish marked this conversation as resolved.
Show resolved Hide resolved
return RedshiftClusterStates.NONEXISTENT

def delete_cluster(
self,
Expand Down
58 changes: 58 additions & 0 deletions airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook


class RedshiftPauseClusterOperator(BaseOperator):
"""
Pause an AWS Redshift Cluster using boto3.
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved

:param cluster_identifier: id of the AWS Redshift Cluster
:type cluster_identifier: str
:param aws_conn_id: aws connection to use
:type aws_conn_id: str
:param check_interval: time in seconds that the job should wait in
between each instance state checks until operation is completed
:type check_interval: float
"""

template_fields = ("cluster_identifier",)
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

def __init__(
self,
*,
cluster_identifier: str,
aws_conn_id: str = "aws_default",
check_interval: float = 15,
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
super().__init__(**kwargs)
self.cluster_identifier = cluster_identifier
self.aws_conn_id = aws_conn_id
self.check_interval = check_interval

def execute(self, context):
redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
if cluster_state == RedshiftClusterStates.AVAILABLE:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not met i would log info, or perhaps even warning because it seems that it is unexpected.

redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
58 changes: 58 additions & 0 deletions airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook


class RedshiftResumeClusterOperator(BaseOperator):
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
"""
Resume an AWS Redshift Cluster using boto3.
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved

:param cluster_identifier: id of the AWS Redshift Cluster
:type cluster_identifier: str
:param aws_conn_id: aws connection to use
:type aws_conn_id: str
:param check_interval: time in seconds that the job should wait in
between each instance state checks until operation is completed
:type check_interval: float
"""

template_fields = ("cluster_identifier",)
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

def __init__(
self,
*,
cluster_identifier: str,
aws_conn_id: str = "aws_default",
check_interval: float = 15,
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
super().__init__(**kwargs)
self.cluster_identifier = cluster_identifier
self.aws_conn_id = aws_conn_id
self.check_interval = check_interval

def execute(self, context):
redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
if cluster_state == RedshiftClusterStates.PAUSED:
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
17 changes: 12 additions & 5 deletions airflow/providers/amazon/aws/sensors/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
from typing import Optional

from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook
from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
from airflow.sensors.base import BaseSensorOperator


Expand All @@ -28,7 +28,7 @@ class AwsRedshiftClusterSensor(BaseSensorOperator):
:param cluster_identifier: The identifier for the cluster being pinged.
:type cluster_identifier: str
:param target_status: The cluster status desired.
:type target_status: str
:type target_status: RedshiftClusterStates
"""

template_fields = ('cluster_identifier', 'target_status')
Expand All @@ -37,18 +37,25 @@ def __init__(
self,
*,
cluster_identifier: str,
target_status: str = 'available',
target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
aws_conn_id: str = 'aws_default',
**kwargs,
):
super().__init__(**kwargs)
self.cluster_identifier = cluster_identifier
self.target_status = target_status
self.target_status = (
target_status
if isinstance(target_status, RedshiftClusterStates)
else RedshiftClusterStates(str(target_status))
)

dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
self.aws_conn_id = aws_conn_id
self.hook: Optional[RedshiftHook] = None

dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
def poke(self, context):
self.log.info('Poking for status : %s\nfor cluster %s', self.target_status, self.cluster_identifier)
self.log.info(
'Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
'Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier
'Checking cluster %r for status %r', self.cluster_identifier, self.target_status.value

or

Suggested change
'Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier
'Checking cluster %r\nwaiting for status %r', self.cluster_identifier, self.target_status.value

because for cluster %s is not really a grammatical expression by itself and it's on a newline so it seems that it should be.

)
return self.get_hook().cluster_status(self.cluster_identifier) == self.target_status

def get_hook(self) -> RedshiftHook:
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ operators:
- integration-name: Amazon Redshift
python-modules:
- airflow.providers.amazon.aws.operators.redshift
- airflow.providers.amazon.aws.operators.redshift_pause_cluster
- airflow.providers.amazon.aws.operators.redshift_resume_cluster

sensors:
- integration-name: Amazon Athena
Expand Down
23 changes: 23 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/redshift.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,26 @@ All together, here is our DAG:
:language: python
:start-after: [START redshift_operator_howto_guide]
:end-before: [END redshift_operator_howto_guide]


.. _howto/operator:RedshiftResumeClusterOperator:

Resume a Redshift Cluster
"""""""""""""""""""""""""""""""""""""""""""

To resume an existing AWS Redshift Cluster you can use
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
:class:`~airflow.providers.amazon.aws.operators.redshift_resume_cluster.RedshiftResumeClusterOperator`.

This Operator leverages the AWS CLI
`resume-cluster <https://docs.aws.amazon.com/cli/latest/reference/redshift/resume-cluster.html>`__ API

.. _howto/operator:RedshiftPauseClusterOperator:

Pause a Redshift Cluster
"""""""""""""""""""""""""""""""""""""""""""

To pause an existing AWS Redshift Cluster you can use
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
:class:`~airflow.providers.amazon.aws.operators.redshift_pause_cluster.RedshiftPauseClusterOperator`.

This Operator leverages the AWS CLI
`pause-cluster <https://docs.aws.amazon.com/cli/latest/reference/redshift/pause-cluster.html>`__ API
6 changes: 3 additions & 3 deletions tests/providers/amazon/aws/hooks/test_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from airflow.models import Connection
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook, RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook, RedshiftSQLHook

try:
from moto import mock_redshift
Expand Down Expand Up @@ -98,15 +98,15 @@ def test_cluster_status_returns_cluster_not_found(self):
self._create_clusters()
hook = RedshiftHook(aws_conn_id='aws_default')
status = hook.cluster_status('test_cluster_not_here')
assert status == 'cluster_not_found'
assert status == RedshiftClusterStates.NONEXISTENT

@unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
@mock_redshift
def test_cluster_status_returns_available_cluster(self):
self._create_clusters()
hook = RedshiftHook(aws_conn_id='aws_default')
status = hook.cluster_status('test_cluster')
assert status == 'available'
assert status == RedshiftClusterStates.AVAILABLE


class TestRedshiftSQLHookConn(unittest.TestCase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import unittest

import boto3

from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator

try:
from moto import mock_redshift
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
except ImportError:
mock_redshift = None


class TestPauseClusterOperator(unittest.TestCase):
dbarrundiag marked this conversation as resolved.
Show resolved Hide resolved
@staticmethod
def _create_clusters():
client = boto3.client('redshift', region_name='us-east-1')
client.create_cluster(
ClusterIdentifier='test_cluster_to_pause',
NodeType='dc1.large',
MasterUsername='admin',
MasterUserPassword='mock_password',
)
client.create_cluster(
ClusterIdentifier='test_cluster_to_resume',
NodeType='dc1.large',
MasterUsername='admin',
MasterUserPassword='mock_password',
)
if not client.describe_clusters()['Clusters']:
raise ValueError('AWS not properly mocked')

def test_init(self):
redshift_operator = RedshiftPauseClusterOperator(
task_id="task_test",
cluster_identifier="test_cluster",
aws_conn_id="aws_conn_test",
check_interval=3,
)
assert redshift_operator.task_id == "task_test"
assert redshift_operator.cluster_identifier == "test_cluster"
assert redshift_operator.aws_conn_id == "aws_conn_test"
assert redshift_operator.check_interval == 3
Loading