Skip to content

Commit

Permalink
Fix GCSToGCSOperator copying list of objects without wildcard (#28111)
Browse files Browse the repository at this point in the history
Co-authored-by: Maksim Moiseenkov <[email protected]>
  • Loading branch information
moiseenkov and moiseenkov authored Dec 5, 2022
1 parent 0ef8d93 commit 3fef462
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 38 deletions.
48 changes: 41 additions & 7 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ def _copy_source_without_wildcard(self, hook, prefix):
Example 1:
The following Operator would copy all the files from ``a/``folder
(i.e a/a.csv, a/b.csv, a/c.csv)in ``data`` bucket to the ``b/`` folder in
The following Operator would copy all the files from ``a/`` folder
(i.e a/a.csv, a/b.csv, a/c.csv) in ``data`` bucket to the ``b/`` folder in
the ``data_backup`` bucket (b/a.csv, b/b.csv, b/c.csv) ::
copy_files = GCSToGCSOperator(
Expand All @@ -314,8 +314,8 @@ def _copy_source_without_wildcard(self, hook, prefix):
Example 2:
The following Operator would copy all avro files from ``a/``folder
(i.e a/a.avro, a/b.avro, a/c.avro)in ``data`` bucket to the ``b/`` folder in
The following Operator would copy all avro files from ``a/`` folder
(i.e a/a.avro, a/b.avro, a/c.avro) in ``data`` bucket to the ``b/`` folder in
the ``data_backup`` bucket (b/a.avro, b/b.avro, b/c.avro) ::
copy_files = GCSToGCSOperator(
Expand All @@ -327,14 +327,30 @@ def _copy_source_without_wildcard(self, hook, prefix):
delimiter='.avro',
gcp_conn_id=google_cloud_conn_id
)
Example 3:
The following Operator would copy files (a/file_1.txt, a/file_2.csv, a/file_3.avro)
in ``data`` bucket to the ``b/`` folder in
the ``data_backup`` bucket (b/file_1.txt, b/file_2.csv, b/file_3.avro) ::
copy_files = GCSToGCSOperator(
task_id='copy_files_without_wildcard',
source_bucket='data',
source_objects=['a/file_1.txt', 'a/file_2.csv', 'a/file_3.avro'],
destination_bucket='data_backup',
destination_object='b/',
gcp_conn_id=google_cloud_conn_id
)
"""
objects = hook.list(self.source_bucket, prefix=prefix, delimiter=self.delimiter)

if not self.replace:
# If we are not replacing, ignore files already existing in source buckets
objects = self._ignore_existing_files(hook, prefix, objects=objects, delimiter=self.delimiter)

# If objects is empty and we have prefix, let's check if prefix is a blob
# If objects is empty, and we have prefix, let's check if prefix is a blob
# and copy directly
if len(objects) == 0 and prefix:
if hook.exists(self.source_bucket, prefix):
Expand All @@ -346,13 +362,31 @@ def _copy_source_without_wildcard(self, hook, prefix):
self.log.warning(msg)
raise AirflowException(msg)

for source_obj in objects:
if len(objects) == 1 and objects[0][-1] != "/":
self._copy_file(hook=hook, source_object=objects[0])
elif len(objects):
self._copy_directory(hook=hook, source_objects=objects, prefix=prefix)

def _copy_file(self, hook, source_object):
destination_object = self.destination_object or source_object
if self.destination_object[-1] == "/":
file_name = source_object.split("/")[-1]
destination_object += file_name
self._copy_single_object(
hook=hook, source_object=source_object, destination_object=destination_object
)

def _copy_directory(self, hook, source_objects, prefix):
_prefix = prefix.rstrip("/") + "/"
for source_obj in source_objects:
if self.exact_match and (source_obj != prefix or not source_obj.endswith(prefix)):
continue
if self.destination_object is None:
destination_object = source_obj
else:
destination_object = source_obj.replace(prefix, self.destination_object, 1)
file_name_postfix = source_obj.replace(_prefix, "", 1)
destination_object = self.destination_object.rstrip("/") + "/" + file_name_postfix

self._copy_single_object(
hook=hook, source_object=source_obj, destination_object=destination_object
)
Expand Down
73 changes: 42 additions & 31 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@
TASK_ID = "test-gcs-to-gcs-operator"
TEST_BUCKET = "test-bucket"
PREFIX = "TEST"
SOURCE_FOLDER = "test_object"
SOURCE_OBJECTS_NO_FILE = [""]
SOURCE_OBJECTS_TWO_EMPTY_STRING = ["", ""]
SOURCE_OBJECTS_SINGLE_FILE = ["test_object/file1.txt"]
SOURCE_OBJECTS_MULTIPLE_FILES = ["test_object/file1.txt", "test_object/file2.txt"]
SOURCE_OBJECTS_LIST = ["test_object/file1.txt", "test_object/file2.txt", "test_object/file3.json"]
SOURCE_OBJECTS_SINGLE_FILE = [f"{SOURCE_FOLDER}/file1.txt"]
SOURCE_OBJECTS_MULTIPLE_FILES = [f"{SOURCE_FOLDER}/file1.txt", f"{SOURCE_FOLDER}/file2.txt"]
SOURCE_OBJECTS_LIST = [
f"{SOURCE_FOLDER}/file1.txt",
f"{SOURCE_FOLDER}/file2.txt",
f"{SOURCE_FOLDER}/file3.json",
]

SOURCE_OBJECT_WILDCARD_PREFIX = "*test_object"
SOURCE_OBJECT_WILDCARD_SUFFIX = "test_object*"
Expand All @@ -42,13 +47,8 @@
SOURCE_OBJECT_NO_WILDCARD = "test_object.txt"
SOURCE_OBJECT_MULTIPLE_WILDCARDS = "csv/*/test_*.csv"
DESTINATION_BUCKET = "archive"
DESTINATION_OBJECT = "foo/bar"
DESTINATION_OBJECT = "foo/bar/"
DESTINATION_OBJECT_PREFIX = "foo/bar"
SOURCE_FILES_LIST = [
"test_object/file1.txt",
"test_object/file2.txt",
"test_object/file3.json",
]
DELIMITER = ".json"

MOD_TIME_1 = datetime(2016, 1, 1)
Expand Down Expand Up @@ -107,6 +107,7 @@ def test_execute_wildcard_with_replace_flag_false(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):
mock_hook.return_value.list.side_effect = [[], [SOURCE_OBJECT_NO_WILDCARD]]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -117,8 +118,8 @@ def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):

operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
mock.call(DESTINATION_BUCKET, prefix="test_object.txt", delimiter=None),
mock.call(TEST_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None),
mock.call(DESTINATION_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None),
]
mock_hook.return_value.list.assert_has_calls(mock_calls)

Expand Down Expand Up @@ -159,7 +160,7 @@ def test_execute_prefix_and_suffix(self, mock_hook):
# copy with wildcard
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_wildcard_with_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -177,7 +178,7 @@ def test_execute_wildcard_with_destination_object(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_wildcard_with_destination_object_retained_prefix(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -199,7 +200,7 @@ def test_execute_wildcard_with_destination_object_retained_prefix(self, mock_hoo

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_wildcard_without_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -216,7 +217,7 @@ def test_execute_wildcard_without_destination_object(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_wildcard_empty_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -234,7 +235,7 @@ def test_execute_wildcard_empty_destination_object(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_last_modified_time(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -252,7 +253,7 @@ def test_execute_last_modified_time(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_wc_with_last_modified_time_with_all_true_cond(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
mock_hook.return_value.is_updated_after.side_effect = [True, True, True]
operator = GCSToGCSOperator(
task_id=TASK_ID,
Expand All @@ -271,7 +272,7 @@ def test_wc_with_last_modified_time_with_all_true_cond(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_wc_with_last_modified_time_with_one_true_cond(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
mock_hook.return_value.is_updated_after.side_effect = [True, False, False]
operator = GCSToGCSOperator(
task_id=TASK_ID,
Expand All @@ -288,7 +289,7 @@ def test_wc_with_last_modified_time_with_one_true_cond(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_wc_with_no_last_modified_time(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1]
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand Down Expand Up @@ -318,7 +319,7 @@ def test_no_prefix_with_last_modified_time_with_true_cond(self, mock_hook):

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
Expand All @@ -335,7 +336,7 @@ def test_no_prefix_with_maximum_modified_time_with_true_cond(self, mock_hook):

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
Expand All @@ -353,7 +354,7 @@ def test_exe_last_modified_time_and_maximum_modified_time_with_true_cond(self, m

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
Expand All @@ -369,7 +370,7 @@ def test_execute_no_prefix_with_no_last_modified_time(self, mock_hook):

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
Expand Down Expand Up @@ -403,12 +404,12 @@ def test_executes_with_is_older_than_with_true_cond(self, mock_hook):

operator.execute(None)
mock_hook.return_value.rewrite.assert_called_once_with(
TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"
TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_more_than_1_wildcard(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -426,7 +427,7 @@ def test_execute_more_than_1_wildcard(self, mock_hook):

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_with_empty_destination_bucket(self, mock_hook):
mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand Down Expand Up @@ -500,7 +501,9 @@ def test_executes_with_a_delimiter(self, mock_hook):
# COPY
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_executes_with_delimiter_and_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = ["test_object/file3.json"]
mock_hook.return_value.list.side_effect = [[], [], [SOURCE_OBJECTS_LIST[2]]]
mock_hook.return_value.exists.return_value = False

operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -512,13 +515,17 @@ def test_executes_with_delimiter_and_destination_object(self, mock_hook):

operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, "test_object/file3.json", DESTINATION_BUCKET, DESTINATION_OBJECT),
mock.call(
TEST_BUCKET, SOURCE_OBJECTS_LIST[2], DESTINATION_BUCKET, DESTINATION_OBJECT + "file3.json"
),
]
mock_hook.return_value.rewrite.assert_has_calls(mock_calls)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_executes_with_different_delimiter_and_destination_object(self, mock_hook):
mock_hook.return_value.list.return_value = ["test_object/file1.txt", "test_object/file2.txt"]
mock_hook.return_value.list.side_effect = [[SOURCE_OBJECTS_LIST[0]], [SOURCE_OBJECTS_LIST[1]], []]
mock_hook.return_value.exists.return_value = False

operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
Expand All @@ -530,8 +537,12 @@ def test_executes_with_different_delimiter_and_destination_object(self, mock_hoo

operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, "test_object/file1.txt", DESTINATION_BUCKET, "test_object/file1.txt"),
mock.call(TEST_BUCKET, "test_object/file2.txt", DESTINATION_BUCKET, "test_object/file2.txt"),
mock.call(
TEST_BUCKET, SOURCE_OBJECTS_LIST[0], DESTINATION_BUCKET, DESTINATION_OBJECT + "file1.txt"
),
mock.call(
TEST_BUCKET, SOURCE_OBJECTS_LIST[1], DESTINATION_BUCKET, DESTINATION_OBJECT + "file2.txt"
),
]
mock_hook.return_value.rewrite.assert_has_calls(mock_calls)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@

chain(
# TEST SETUP
generate_random_file,
[create_bucket_src, create_bucket_dst],
[upload_file_src, upload_file_src_sub],
[upload_file_dst, upload_file_dst_sub],
Expand Down

0 comments on commit 3fef462

Please sign in to comment.