Deprecate S3PrefixSensor and S3KeySizeSensor in favor of S3KeySensor #22737

Merged · 11 commits · Apr 12, 2022
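
For anyone migrating off the deprecated sensors, here is a minimal sketch of how both use cases map onto S3KeySensor: a trailing wildcard in bucket_key together with wildcard_match covers the S3PrefixSensor case, and a check_fn over the matched objects' metadata covers the S3KeySizeSensor case. The DAG id, bucket, and keys below are hypothetical and are not part of this PR's diff.

from datetime import datetime
from typing import List

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(
    dag_id="example_s3_key_sensor_migration",  # hypothetical DAG id
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # Replaces S3PrefixSensor: a trailing wildcard plus wildcard_match=True
    # waits for any key under the given prefix.
    wait_for_prefix = S3KeySensor(
        task_id="wait_for_prefix",
        bucket_name="my-bucket",  # hypothetical bucket
        bucket_key="incoming/*",
        wildcard_match=True,
    )

    # Replaces S3KeySizeSensor: check_fn receives the matched objects'
    # metadata (dicts with a 'Size' entry, in bytes) and decides success.
    def at_least_one_kib(files: List) -> bool:
        return all(f.get("Size", 0) > 1024 for f in files)

    wait_for_size = S3KeySensor(
        task_id="wait_for_size",
        bucket_name="my-bucket",
        bucket_key="incoming/data.csv",  # hypothetical key
        check_fn=at_least_one_kib,
    )

The example DAG diff below wires the same pattern into runnable tasks.
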
58 changes: 57 additions & 1 deletion airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -17,6 +17,7 @@

import os
from datetime import datetime
from typing import List

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
@@ -27,18 +28,38 @@
S3GetBucketTaggingOperator,
S3PutBucketTaggingOperator,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')
KEY = os.environ.get('KEY', 'key')
KEY_2 = os.environ.get('KEY_2', 'key2')
TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')


with DAG(
dag_id='example_s3_bucket',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# [START howto_sensor_s3_key_function_definition]
def check_fn(files: List) -> bool:
"""
Example of a custom check: check whether all files are larger than 1 KiB (1024 bytes)

:param files: List of S3 object attributes.
Format: [{
'Size': int
}]
:return: True if the criterion is met
:rtype: bool
"""
return all(f.get('Size', 0) > 1024 for f in files)

# [END howto_sensor_s3_key_function_definition]

# [START howto_operator_s3_create_bucket]
create_bucket = S3CreateBucketOperator(
task_id='s3_create_bucket',
@@ -69,10 +90,45 @@
)
# [END howto_operator_s3_delete_bucket_tagging]

# [START howto_sensor_s3_key_single_key]
# Check if a file exists
s3_sensor_one_key = S3KeySensor(
task_id="s3_sensor_one_key",
bucket_name=BUCKET_NAME,
bucket_key=KEY,
)
# [END howto_sensor_s3_key_single_key]

# [START howto_sensor_s3_key_multiple_keys]
# Check if both files exist
s3_sensor_two_keys = S3KeySensor(
task_id="s3_sensor_two_keys",
bucket_name=BUCKET_NAME,
bucket_key=[KEY, KEY_2],
)
# [END howto_sensor_s3_key_multiple_keys]

# [START howto_sensor_s3_key_function]
# Check if a file exists and satisfies the condition defined in check_fn
s3_sensor_key_function = S3KeySensor(
task_id="s3_sensor_key_function",
bucket_name=BUCKET_NAME,
bucket_key=KEY,
check_fn=check_fn,
)
# [END howto_sensor_s3_key_function]

# [START howto_operator_s3_delete_bucket]
delete_bucket = S3DeleteBucketOperator(
task_id='s3_delete_bucket', bucket_name=BUCKET_NAME, force_delete=True
)
# [END howto_operator_s3_delete_bucket]

- chain(create_bucket, put_tagging, get_tagging, delete_tagging, delete_bucket)
chain(
create_bucket,
put_tagging,
get_tagging,
delete_tagging,
[s3_sensor_one_key, s3_sensor_two_keys, s3_sensor_key_function],
delete_bucket,
)
59 changes: 52 additions & 7 deletions airflow/providers/amazon/aws/hooks/s3.py
@@ -348,26 +348,71 @@ def _is_in_period(input_date: datetime) -> bool:

return self._list_key_object_filter(keys, from_datetime, to_datetime)

@provide_bucket_name
def get_file_metadata(
self,
prefix: str,
bucket_name: Optional[str] = None,
page_size: Optional[int] = None,
max_items: Optional[int] = None,
) -> List:
"""
Lists metadata of objects in a bucket under a given prefix

:param prefix: a key prefix
:param bucket_name: the name of the bucket
:param page_size: pagination size
:param max_items: maximum items to return
:return: a list of metadata of objects
:rtype: list
"""
config = {
'PageSize': page_size,
'MaxItems': max_items,
}

paginator = self.get_conn().get_paginator('list_objects_v2')
response = paginator.paginate(Bucket=bucket_name, Prefix=prefix, PaginationConfig=config)

files = []
for page in response:
if 'Contents' in page:
files += page['Contents']
return files

@provide_bucket_name
@unify_bucket_name_and_key
- def check_for_key(self, key: str, bucket_name: Optional[str] = None) -> bool:
def head_object(self, key: str, bucket_name: Optional[str] = None) -> Optional[dict]:
"""
- Checks if a key exists in a bucket
Retrieves metadata of an object

:param key: S3 key that will point to the file
:param bucket_name: Name of the bucket in which the file is stored
- :return: True if the key exists and False if not.
- :rtype: bool
:return: metadata of an object
:rtype: dict
"""
try:
- self.get_conn().head_object(Bucket=bucket_name, Key=key)
- return True
return self.get_conn().head_object(Bucket=bucket_name, Key=key)
except ClientError as e:
if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
- return False
return None
else:
raise e

@provide_bucket_name
@unify_bucket_name_and_key
def check_for_key(self, key: str, bucket_name: Optional[str] = None) -> bool:
"""
Checks if a key exists in a bucket

:param key: S3 key that will point to the file
:param bucket_name: Name of the bucket in which the file is stored
:return: True if the key exists and False if not.
:rtype: bool
"""
obj = self.head_object(key, bucket_name)
return obj is not None

@provide_bucket_name
@unify_bucket_name_and_key
def get_key(self, key: str, bucket_name: Optional[str] = None) -> S3Transfer:
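
For context, a short usage sketch of the two hook methods added above (head_object and get_file_metadata); the connection id, bucket, and prefix below are hypothetical and not part of the diff.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook(aws_conn_id="aws_default")

# head_object() returns the object's metadata dict, or None when S3 answers 404.
meta = hook.head_object(key="incoming/data.csv", bucket_name="my-bucket")
if meta is not None:
    print(meta["ContentLength"])  # object size in bytes from the HeadObject response

# get_file_metadata() paginates list_objects_v2 and concatenates the 'Contents'
# entries, so each item carries 'Key', 'Size', 'LastModified', and so on.
for obj in hook.get_file_metadata(prefix="incoming/", bucket_name="my-bucket"):
    print(obj["Key"], obj["Size"])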