From 37dcb8b49d9a0cf45dcc6d0ce3011a4d6016e9e7 Mon Sep 17 00:00:00 2001 From: Alice Fage Date: Tue, 16 Jul 2024 09:40:38 +1200 Subject: [PATCH] fix: concurrency file order TDE-1213 (#998) #### Motivation When supplying multiple source locations to use for standardising, if there are overlapping source images, the VRT created does not honour the priority order for the supplied datasets. This Pull Request is to retain the priority of the images for merging. #### Modification Move away from using `as_completed` for parallel processing so that the output file list is in the same order as the input file list. #### Checklist _If not applicable, provide explanation of why._ - [x] Tests updated - [x] Docs updated - [x] Issue linked in Title --- scripts/files/fs.py | 48 +++++++++++++++++----------------- scripts/files/tests/fs_test.py | 16 ++++++++++++ 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/scripts/files/fs.py b/scripts/files/fs.py index 5c6e01654..7981a6a90 100644 --- a/scripts/files/fs.py +++ b/scripts/files/fs.py @@ -1,5 +1,5 @@ import os -from concurrent.futures import Future, ThreadPoolExecutor, as_completed +from concurrent.futures import Future, ThreadPoolExecutor from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING @@ -105,14 +105,17 @@ def write_all(inputs: list[str], target: str, concurrency: int | None = 4, gener Returns: list of written file paths """ + results: list[Future] = [] # type: ignore written_tiffs: list[str] = [] with ThreadPoolExecutor(max_workers=concurrency) as executor: - futuress = {write_file(executor, input_, target, generate_name): input_ for input_ in inputs} - for future in as_completed(futuress): - if future.exception(): - get_log().warn("Failed Read-Write", error=future.exception()) - else: - written_tiffs.append(future.result()) + for input_ in inputs: + results.append(executor.submit(write_file, input_, target, generate_name)) + + for future in results: + if future.exception(): + get_log().warn("Failed Read-Write", error=future.exception()) + else: + written_tiffs.append(future.result()) if len(inputs) != len(written_tiffs): get_log().error("Missing Files", count=len(inputs) - len(written_tiffs)) @@ -130,26 +133,28 @@ def write_sidecars(inputs: list[str], target: str, concurrency: int | None = 4) target: target folder to write to concurrency: max thread pool workers """ + results: list[Future] = [] # type: ignore with ThreadPoolExecutor(max_workers=concurrency) as executor: - results = {write_file(executor, input_, target): input_ for input_ in inputs} - for future in as_completed(results): - future_ex = future.exception() - if isinstance(future_ex, NoSuchFileError): - get_log().info("No sidecar file found; skipping", path=future_ex.path) - else: - get_log().info("wrote_sidecar_file", path=future.result()) + for input_ in inputs: + results.append(executor.submit(write_file, input_, target)) + + for future in results: + future_ex = future.exception() + if isinstance(future_ex, NoSuchFileError): + get_log().info("No sidecar file found; skipping", error=future.exception()) + else: + get_log().info("wrote_sidecar_file", path=future.result()) -def write_file(executor: ThreadPoolExecutor, input_: str, target: str, generate_name: bool | None = True) -> Future[str]: +def write_file(input_: str, target: str, generate_name: bool | None = True) -> str: """Read a file from a path and write it to a target path. Args: - executor: A ThreadPoolExecutor instance. - input_: A path to a file to read. + input: A path to a file to read. target: A path to write the file to. generate_name: create a target file name based on multihash the source filename Returns: - Future[str]: The result of the execution. + str: Target file name. """ get_log().info(f"Trying write from file: {input_}") @@ -159,12 +164,7 @@ def write_file(executor: ThreadPoolExecutor, input_: str, target: str, generate_ else: target_file_name = os.path.basename(input_) - try: - return executor.submit(copy, input_, os.path.join(target, target_file_name)) - except NoSuchFileError as nsfe: - future: Future[str] = Future() - future.set_exception(nsfe) - return future + return copy(input_, os.path.join(target, target_file_name)) class NoSuchFileError(Exception): diff --git a/scripts/files/tests/fs_test.py b/scripts/files/tests/fs_test.py index 71d55e8e9..120d597e6 100644 --- a/scripts/files/tests/fs_test.py +++ b/scripts/files/tests/fs_test.py @@ -89,6 +89,22 @@ def test_write_sidecars_one_found(capsys: CaptureFixture[str], subtests: SubTest rmtree(target) +def test_write_all_in_order(setup: str) -> None: + inputs: list[str] = [] + file_contents = "a" * 1000 * 1000 + i = 0 + while i < 10: + path = Path(os.path.join(setup, str(i))) + if i % 2 == 0: + path.write_text(file_contents, encoding="utf-8") # 1MB + else: + path.touch() + inputs.append(path.as_posix()) + i += 1 + written_files = write_all(inputs=inputs, target=setup, generate_name=False) + assert written_files == inputs + + @mock_aws def test_should_get_s3_object_modified_datetime() -> None: bucket_name = "any-bucket-name"