From 5fe5832dc30bcdf1cfb04d53beeab4f2954b8843 Mon Sep 17 00:00:00 2001 From: Maksim Moiseenkov Date: Thu, 1 Dec 2022 09:09:05 +0000 Subject: [PATCH] Fix GCSToGCSOperator copying list of objects without wildcard --- .../google/cloud/transfers/gcs_to_gcs.py | 48 ++++++++++-- .../google/cloud/transfers/test_gcs_to_gcs.py | 73 +++++++++++-------- .../google/cloud/gcs/example_gcs_to_gcs.py | 1 + 3 files changed, 84 insertions(+), 38 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index b0a15c98991d9..962f16ef4ab4c 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -298,8 +298,8 @@ def _copy_source_without_wildcard(self, hook, prefix): Example 1: - The following Operator would copy all the files from ``a/``folder - (i.e a/a.csv, a/b.csv, a/c.csv)in ``data`` bucket to the ``b/`` folder in + The following Operator would copy all the files from ``a/`` folder + (i.e a/a.csv, a/b.csv, a/c.csv) in ``data`` bucket to the ``b/`` folder in the ``data_backup`` bucket (b/a.csv, b/b.csv, b/c.csv) :: copy_files = GCSToGCSOperator( @@ -314,8 +314,8 @@ def _copy_source_without_wildcard(self, hook, prefix): Example 2: - The following Operator would copy all avro files from ``a/``folder - (i.e a/a.avro, a/b.avro, a/c.avro)in ``data`` bucket to the ``b/`` folder in + The following Operator would copy all avro files from ``a/`` folder + (i.e a/a.avro, a/b.avro, a/c.avro) in ``data`` bucket to the ``b/`` folder in the ``data_backup`` bucket (b/a.avro, b/b.avro, b/c.avro) :: copy_files = GCSToGCSOperator( @@ -327,6 +327,22 @@ def _copy_source_without_wildcard(self, hook, prefix): delimiter='.avro', gcp_conn_id=google_cloud_conn_id ) + + Example 3: + + + The following Operator would copy files (a/file_1.txt, a/file_2.csv, a/file_3.avro) + in ``data`` bucket to the ``b/`` folder in + the ``data_backup`` bucket (b/file_1.txt, b/file_2.csv, b/file_3.avro) :: + + copy_files = GCSToGCSOperator( + task_id='copy_files_without_wildcard', + source_bucket='data', + source_objects=['a/file_1.txt', 'a/file_2.csv', 'a/file_3.avro'], + destination_bucket='data_backup', + destination_object='b/', + gcp_conn_id=google_cloud_conn_id + ) """ objects = hook.list(self.source_bucket, prefix=prefix, delimiter=self.delimiter) @@ -334,7 +350,7 @@ def _copy_source_without_wildcard(self, hook, prefix): # If we are not replacing, ignore files already existing in source buckets objects = self._ignore_existing_files(hook, prefix, objects=objects, delimiter=self.delimiter) - # If objects is empty and we have prefix, let's check if prefix is a blob + # If objects is empty, and we have prefix, let's check if prefix is a blob # and copy directly if len(objects) == 0 and prefix: if hook.exists(self.source_bucket, prefix): @@ -346,13 +362,31 @@ def _copy_source_without_wildcard(self, hook, prefix): self.log.warning(msg) raise AirflowException(msg) - for source_obj in objects: + if len(objects) == 1 and objects[0][-1] != "/": + self._copy_file(hook=hook, source_object=objects[0]) + elif len(objects): + self._copy_directory(hook=hook, source_objects=objects, prefix=prefix) + + def _copy_file(self, hook, source_object): + destination_object = self.destination_object or source_object + if self.destination_object[-1] == "/": + file_name = source_object.split("/")[-1] + destination_object += file_name + self._copy_single_object( + hook=hook, source_object=source_object, destination_object=destination_object + ) + + def _copy_directory(self, hook, source_objects, prefix): + _prefix = prefix.rstrip("/") + "/" + for source_obj in source_objects: if self.exact_match and (source_obj != prefix or not source_obj.endswith(prefix)): continue if self.destination_object is None: destination_object = source_obj else: - destination_object = source_obj.replace(prefix, self.destination_object, 1) + file_name_postfix = source_obj.replace(_prefix, "", 1) + destination_object = self.destination_object.rstrip("/") + "/" + file_name_postfix + self._copy_single_object( hook=hook, source_object=source_obj, destination_object=destination_object ) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index c98348ae1b99c..1c2328d4e5885 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -29,11 +29,16 @@ TASK_ID = "test-gcs-to-gcs-operator" TEST_BUCKET = "test-bucket" PREFIX = "TEST" +SOURCE_FOLDER = "test_object" SOURCE_OBJECTS_NO_FILE = [""] SOURCE_OBJECTS_TWO_EMPTY_STRING = ["", ""] -SOURCE_OBJECTS_SINGLE_FILE = ["test_object/file1.txt"] -SOURCE_OBJECTS_MULTIPLE_FILES = ["test_object/file1.txt", "test_object/file2.txt"] -SOURCE_OBJECTS_LIST = ["test_object/file1.txt", "test_object/file2.txt", "test_object/file3.json"] +SOURCE_OBJECTS_SINGLE_FILE = [f"{SOURCE_FOLDER}/file1.txt"] +SOURCE_OBJECTS_MULTIPLE_FILES = [f"{SOURCE_FOLDER}/file1.txt", f"{SOURCE_FOLDER}/file2.txt"] +SOURCE_OBJECTS_LIST = [ + f"{SOURCE_FOLDER}/file1.txt", + f"{SOURCE_FOLDER}/file2.txt", + f"{SOURCE_FOLDER}/file3.json", +] SOURCE_OBJECT_WILDCARD_PREFIX = "*test_object" SOURCE_OBJECT_WILDCARD_SUFFIX = "test_object*" @@ -42,13 +47,8 @@ SOURCE_OBJECT_NO_WILDCARD = "test_object.txt" SOURCE_OBJECT_MULTIPLE_WILDCARDS = "csv/*/test_*.csv" DESTINATION_BUCKET = "archive" -DESTINATION_OBJECT = "foo/bar" +DESTINATION_OBJECT = "foo/bar/" DESTINATION_OBJECT_PREFIX = "foo/bar" -SOURCE_FILES_LIST = [ - "test_object/file1.txt", - "test_object/file2.txt", - "test_object/file3.json", -] DELIMITER = ".json" MOD_TIME_1 = datetime(2016, 1, 1) @@ -107,6 +107,7 @@ def test_execute_wildcard_with_replace_flag_false(self, mock_hook): @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook): + mock_hook.return_value.list.side_effect = [[], [SOURCE_OBJECT_NO_WILDCARD]] operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -117,8 +118,8 @@ def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook): operator.execute(None) mock_calls = [ - mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None), - mock.call(DESTINATION_BUCKET, prefix="test_object.txt", delimiter=None), + mock.call(TEST_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None), + mock.call(DESTINATION_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None), ] mock_hook.return_value.list.assert_has_calls(mock_calls) @@ -159,7 +160,7 @@ def test_execute_prefix_and_suffix(self, mock_hook): # copy with wildcard @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_execute_wildcard_with_destination_object(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1] operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -177,7 +178,7 @@ def test_execute_wildcard_with_destination_object(self, mock_hook): @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_execute_wildcard_with_destination_object_retained_prefix(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1] operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -199,7 +200,7 @@ def test_execute_wildcard_with_destination_object_retained_prefix(self, mock_hoo @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_execute_wildcard_without_destination_object(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1] operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -216,7 +217,7 @@ def test_execute_wildcard_without_destination_object(self, mock_hook): @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_execute_wildcard_empty_destination_object(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1] operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -234,7 +235,7 @@ def test_execute_wildcard_empty_destination_object(self, mock_hook): @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_execute_last_modified_time(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1] operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -252,7 +253,7 @@ def test_execute_last_modified_time(self, mock_hook): @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_wc_with_last_modified_time_with_all_true_cond(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1] mock_hook.return_value.is_updated_after.side_effect = [True, True, True] operator = GCSToGCSOperator( task_id=TASK_ID, @@ -271,7 +272,7 @@ def test_wc_with_last_modified_time_with_all_true_cond(self, mock_hook): @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_wc_with_last_modified_time_with_one_true_cond(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1] mock_hook.return_value.is_updated_after.side_effect = [True, False, False] operator = GCSToGCSOperator( task_id=TASK_ID, @@ -288,7 +289,7 @@ def test_wc_with_last_modified_time_with_one_true_cond(self, mock_hook): @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_wc_with_no_last_modified_time(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST[:-1] operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -318,7 +319,7 @@ def test_no_prefix_with_last_modified_time_with_true_cond(self, mock_hook): operator.execute(None) mock_hook.return_value.rewrite.assert_called_once_with( - TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt" + TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD ) @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") @@ -335,7 +336,7 @@ def test_no_prefix_with_maximum_modified_time_with_true_cond(self, mock_hook): operator.execute(None) mock_hook.return_value.rewrite.assert_called_once_with( - TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt" + TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD ) @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") @@ -353,7 +354,7 @@ def test_exe_last_modified_time_and_maximum_modified_time_with_true_cond(self, m operator.execute(None) mock_hook.return_value.rewrite.assert_called_once_with( - TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt" + TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD ) @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") @@ -369,7 +370,7 @@ def test_execute_no_prefix_with_no_last_modified_time(self, mock_hook): operator.execute(None) mock_hook.return_value.rewrite.assert_called_once_with( - TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt" + TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD ) @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") @@ -403,12 +404,12 @@ def test_executes_with_is_older_than_with_true_cond(self, mock_hook): operator.execute(None) mock_hook.return_value.rewrite.assert_called_once_with( - TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt" + TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD, DESTINATION_BUCKET, SOURCE_OBJECT_NO_WILDCARD ) @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_execute_more_than_1_wildcard(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -426,7 +427,7 @@ def test_execute_more_than_1_wildcard(self, mock_hook): @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_execute_with_empty_destination_bucket(self, mock_hook): - mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + mock_hook.return_value.list.return_value = SOURCE_OBJECTS_LIST operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -500,7 +501,9 @@ def test_executes_with_a_delimiter(self, mock_hook): # COPY @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_executes_with_delimiter_and_destination_object(self, mock_hook): - mock_hook.return_value.list.return_value = ["test_object/file3.json"] + mock_hook.return_value.list.side_effect = [[], [], [SOURCE_OBJECTS_LIST[2]]] + mock_hook.return_value.exists.return_value = False + operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -512,13 +515,17 @@ def test_executes_with_delimiter_and_destination_object(self, mock_hook): operator.execute(None) mock_calls = [ - mock.call(TEST_BUCKET, "test_object/file3.json", DESTINATION_BUCKET, DESTINATION_OBJECT), + mock.call( + TEST_BUCKET, SOURCE_OBJECTS_LIST[2], DESTINATION_BUCKET, DESTINATION_OBJECT + "file3.json" + ), ] mock_hook.return_value.rewrite.assert_has_calls(mock_calls) @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_executes_with_different_delimiter_and_destination_object(self, mock_hook): - mock_hook.return_value.list.return_value = ["test_object/file1.txt", "test_object/file2.txt"] + mock_hook.return_value.list.side_effect = [[SOURCE_OBJECTS_LIST[0]], [SOURCE_OBJECTS_LIST[1]], []] + mock_hook.return_value.exists.return_value = False + operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, @@ -530,8 +537,12 @@ def test_executes_with_different_delimiter_and_destination_object(self, mock_hoo operator.execute(None) mock_calls = [ - mock.call(TEST_BUCKET, "test_object/file1.txt", DESTINATION_BUCKET, "test_object/file1.txt"), - mock.call(TEST_BUCKET, "test_object/file2.txt", DESTINATION_BUCKET, "test_object/file2.txt"), + mock.call( + TEST_BUCKET, SOURCE_OBJECTS_LIST[0], DESTINATION_BUCKET, DESTINATION_OBJECT + "file1.txt" + ), + mock.call( + TEST_BUCKET, SOURCE_OBJECTS_LIST[1], DESTINATION_BUCKET, DESTINATION_OBJECT + "file2.txt" + ), ] mock_hook.return_value.rewrite.assert_has_calls(mock_calls) diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py index e605811a8b60a..b377be2e55add 100644 --- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py +++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py @@ -213,6 +213,7 @@ chain( # TEST SETUP + generate_random_file, [create_bucket_src, create_bucket_dst], [upload_file_src, upload_file_src_sub], [upload_file_dst, upload_file_dst_sub],