Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GCSToGCSOperator copying list of objects without wildcard #28111

Merged
merged 1 commit into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ def _copy_source_without_wildcard(self, hook, prefix):
Example 1:


The following Operator would copy all the files from ``a/``folder
(i.e a/a.csv, a/b.csv, a/c.csv)in ``data`` bucket to the ``b/`` folder in
The following Operator would copy all the files from ``a/`` folder
(i.e a/a.csv, a/b.csv, a/c.csv) in ``data`` bucket to the ``b/`` folder in
the ``data_backup`` bucket (b/a.csv, b/b.csv, b/c.csv) ::

copy_files = GCSToGCSOperator(
Expand All @@ -314,8 +314,8 @@ def _copy_source_without_wildcard(self, hook, prefix):
Example 2:


The following Operator would copy all avro files from ``a/``folder
(i.e a/a.avro, a/b.avro, a/c.avro)in ``data`` bucket to the ``b/`` folder in
The following Operator would copy all avro files from ``a/`` folder
(i.e a/a.avro, a/b.avro, a/c.avro) in ``data`` bucket to the ``b/`` folder in
the ``data_backup`` bucket (b/a.avro, b/b.avro, b/c.avro) ::

copy_files = GCSToGCSOperator(
Expand All @@ -327,14 +327,30 @@ def _copy_source_without_wildcard(self, hook, prefix):
delimiter='.avro',
gcp_conn_id=google_cloud_conn_id
)

Example 3:


The following Operator would copy files (a/file_1.txt, a/file_2.csv, a/file_3.avro)
in ``data`` bucket to the ``b/`` folder in
the ``data_backup`` bucket (b/file_1.txt, b/file_2.csv, b/file_3.avro) ::

copy_files = GCSToGCSOperator(
task_id='copy_files_without_wildcard',
source_bucket='data',
source_objects=['a/file_1.txt', 'a/file_2.csv', 'a/file_3.avro'],
destination_bucket='data_backup',
destination_object='b/',
gcp_conn_id=google_cloud_conn_id
)
"""
objects = hook.list(self.source_bucket, prefix=prefix, delimiter=self.delimiter)

if not self.replace:
# If we are not replacing, ignore files that already exist in the destination bucket
objects = self._ignore_existing_files(hook, prefix, objects=objects, delimiter=self.delimiter)

# If objects is empty and we have prefix, let's check if prefix is a blob
# If objects is empty, and we have prefix, let's check if prefix is a blob
# and copy directly
if len(objects) == 0 and prefix:
if hook.exists(self.source_bucket, prefix):
Expand All @@ -346,13 +362,31 @@ def _copy_source_without_wildcard(self, hook, prefix):
self.log.warning(msg)
raise AirflowException(msg)

for source_obj in objects:
if len(objects) == 1 and objects[0][-1] != "/":
self._copy_file(hook=hook, source_object=objects[0])
elif len(objects):
self._copy_directory(hook=hook, source_objects=objects, prefix=prefix)

def _copy_file(self, hook, source_object):
    """Copy a single source object to its destination.

    The destination name is resolved as follows:

    * ``self.destination_object`` unset (``None`` or empty): the object keeps
      its source name.
    * ``self.destination_object`` ends with ``/``: it is treated as a folder
      and the source object's base name is appended to it.
    * otherwise ``self.destination_object`` is used verbatim.

    :param hook: hook object forwarded unchanged to ``_copy_single_object``.
    :param source_object: full name of the object to copy.
    """
    destination_object = self.destination_object or source_object
    # Guard against a missing/empty destination before inspecting its last
    # character; indexing ``None`` or ``""`` would raise instead of falling
    # back to the source object name chosen above.
    if self.destination_object and self.destination_object.endswith("/"):
        file_name = source_object.split("/")[-1]
        destination_object += file_name
    self._copy_single_object(
        hook=hook, source_object=source_object, destination_object=destination_object
    )

def _copy_directory(self, hook, source_objects, prefix):
_prefix = prefix.rstrip("/") + "/"
for source_obj in source_objects:
if self.exact_match and (source_obj != prefix or not source_obj.endswith(prefix)):
continue
if self.destination_object is None:
destination_object = source_obj
else:
destination_object = source_obj.replace(prefix, self.destination_object, 1)
file_name_postfix = source_obj.replace(_prefix, "", 1)
destination_object = self.destination_object.rstrip("/") + "/" + file_name_postfix

self._copy_single_object(
hook=hook, source_object=source_obj, destination_object=destination_object
)
Expand Down
73 changes: 42 additions & 31 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@
TASK_ID = "test-gcs-to-gcs-operator"
TEST_BUCKET = "test-bucket"
PREFIX = "TEST"
SOURCE_FOLDER = "test_object"
SOURCE_OBJECTS_NO_FILE = [""]
SOURCE_OBJECTS_TWO_EMPTY_STRING = ["", ""]
SOURCE_OBJECTS_SINGLE_FILE = ["test_object/file1.txt"]
SOURCE_OBJECTS_MULTIPLE_FILES = ["test_object/file1.txt", "test_object/file2.txt"]
SOURCE_OBJECTS_LIST = ["test_object/file1.txt", "test_object/file2.txt", "test_object/file3.json"]
SOURCE_OBJECTS_SINGLE_FILE = [f"{SOURCE_FOLDER}/file1.txt"]
SOURCE_OBJECTS_MULTIPLE_FILES = [f"{SOURCE_FOLDER}/file1.txt", f"{SOURCE_FOLDER}/file2.txt"]
SOURCE_OBJECTS_LIST = [
f"{SOURCE_FOLDER}/file1.txt",
f"{SOURCE_FOLDER}/file2.txt",
f"{SOURCE_FOLDER}/file3.json",
]

SOURCE_OBJECT_WILDCARD_PREFIX = "*test_object"
SOURCE_OBJECT_WILDCARD_SUFFIX = "test_object*"
Expand All @@ -42,13 +47,8 @@
SOURCE_OBJECT_NO_WILDCARD = "test_object.txt"
SOURCE_OBJECT_MULTIPLE_WILDCARDS = "csv/*/test_*.csv"
DESTINATION_BUCKET = "archive"
DESTINATION_OBJECT = "foo/bar"
DESTINATION_OBJECT = "foo/bar/"
DESTINATION_OBJECT_PREFIX = "foo/bar"
SOURCE_FILES_LIST = [
"test_object/file1.txt",
"test_object/file2.txt",
"test_object/file3.json",
]
DELIMITER = ".json"

MOD_TIME_1 = datetime(2016, 1, 1)
Expand Down Expand Up @@ -107,6 +107,7 @@ def test_execute_wildcard_with_replace_flag_false(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):
mock_hook.return_value.list.side_effect = [[], [SOURCE_OBJECT_NO_WILDCARD]]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -117,8 +118,8 @@ def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):

operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
mock.call(DESTINATION_BUCKET, prefix="test_object.txt", delimiter=None),
mock.call(TEST_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None),
mock.call(DESTINATION_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None),
]
mock_hook.return_value.list.assert_has_calls(mock_calls)

Expand Down Expand Up @@ -159,7 +160,7 @@ def test_execute_prefix_and_suffix(self, mock_hook):
# copy with wildcard
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_wildcard_with_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -177,7 +178,7 @@ def test_execute_wildcard_with_destination_object(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_wildcard_with_destination_object_retained_prefix(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -199,7 +200,7 @@ def test_execute_wildcard_with_destination_object_retained_prefix(self, mock_hoo

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_wildcard_without_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -216,7 +217,7 @@ def test_execute_wildcard_without_destination_object(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_wildcard_empty_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -234,7 +235,7 @@ def test_execute_wildcard_empty_destination_object(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_last_modified_time(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -252,7 +253,7 @@ def test_execute_last_modified_time(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_wc_with_last_modified_time_with_all_true_cond(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
mock_hook.return_value.is_updated_after.side_effect = [True, True, True]
operator = GCSToGCSOperator(
task_id=TASK_ID,
Expand All @@ -271,7 +272,7 @@ def test_wc_with_last_modified_time_with_all_true_cond(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_wc_with_last_modified_time_with_one_true_cond(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
mock_hook.return_value.is_updated_after.side_effect = [True, False, False]
operator = GCSToGCSOperator(
task_id=TASK_ID,
Expand All @@ -288,7 +289,7 @@ def test_wc_with_last_modified_time_with_one_true_cond(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_wc_with_no_last_modified_time(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand Down Expand Up @@ -318,7 +319,7 @@ def test_no_prefix_with_last_modified_time_with_true_cond(self, mock_hook):

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
Expand All @@ -335,7 +336,7 @@ def test_no_prefix_with_maximum_modified_time_with_true_cond(self, mock_hook):

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
Expand All @@ -353,7 +354,7 @@ def test_exe_last_modified_time_and_maximum_modified_time_with_true_cond(self, m

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
Expand All @@ -369,7 +370,7 @@ def test_execute_no_prefix_with_no_last_modified_time(self, mock_hook):

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
Expand Down Expand Up @@ -403,12 +404,12 @@ def test_executes_with_is_older_than_with_true_cond(self, mock_hook):

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_more_than_1_wildcard(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -426,7 +427,7 @@ def test_execute_more_than_1_wildcard(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_with_empty_destination_bucket(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand Down Expand Up @@ -500,7 +501,9 @@ def test_executes_with_a_delimiter(self, mock_hook):
# COPY
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_executes_with_delimiter_and_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = ["test_object/file3.json"]
mock_hook.return_value.list.side_effect = [[], [], [SOURCE_OBJECTS_LIST[2]]]
mock_hook.return_value.exists.return_value = False

operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -512,13 +515,17 @@ def test_executes_with_delimiter_and_destination_object(self, mock_hook):

operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, "test_object/file3.json", DESTINATION_BUCKET, DESTINATION_OBJECT),
mock.call(
TEST_BUCKET, SOURCE_OBJECTS_LIST[2], DESTINATION_BUCKET, DESTINATION_OBJECT + "file3.json"
),
]
mock_hook.return_value.rewrite.assert_has_calls(mock_calls)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_executes_with_different_delimiter_and_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = ["test_object/file1.txt", "test_object/file2.txt"]
mock_hook.return_value.list.side_effect = [[SOURCE_OBJECTS_LIST[0]], [SOURCE_OBJECTS_LIST[1]], []]
mock_hook.return_value.exists.return_value = False

operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -530,8 +537,12 @@ def test_executes_with_different_delimiter_and_destination_object(self, mock_hoo

operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, "test_object/file1.txt", DESTINATION_BUCKET, "test_object/file1.txt"),
mock.call(TEST_BUCKET, "test_object/file2.txt", DESTINATION_BUCKET, "test_object/file2.txt"),
mock.call(
TEST_BUCKET, SOURCE_OBJECTS_LIST[0], DESTINATION_BUCKET, DESTINATION_OBJECT + "file1.txt"
),
mock.call(
TEST_BUCKET, SOURCE_OBJECTS_LIST[1], DESTINATION_BUCKET, DESTINATION_OBJECT + "file2.txt"
),
]
mock_hook.return_value.rewrite.assert_has_calls(mock_calls)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@

chain(
# TEST SETUP
generate_random_file,
[create_bucket_src, create_bucket_dst],
[upload_file_src, upload_file_src_sub],
[upload_file_dst, upload_file_dst_sub],
Expand Down