From e96c88a3395e9ca667b2c6809282024838d56c89 Mon Sep 17 00:00:00 2001
From: Megan Davidson
Date: Thu, 10 Aug 2023 14:40:55 +1200
Subject: [PATCH] feat: new script to translate ascii files

also generalise existing functions for reuse

fix: formatting

feat: write output so it doesn't get removed with tmpdir

fix: needs credentials

fix: remove unnecessary code

fix: tidy and move to util folder

fix: rebase fix

fix: format
---
 scripts/collection_from_items.py |  4 +--
 scripts/files/fs.py              | 51 ++++++++++++++++++++----------
 scripts/files/fs_s3.py           |  7 ++---
 scripts/standardising.py         |  4 +--
 scripts/util/README.md           |  7 +++++
 scripts/util/translate_ascii.py  | 54 ++++++++++++++++++++++++++++++++
 6 files changed, 103 insertions(+), 24 deletions(-)
 create mode 100644 scripts/util/README.md
 create mode 100644 scripts/util/translate_ascii.py

diff --git a/scripts/collection_from_items.py b/scripts/collection_from_items.py
index 642697ad4..45b85a72c 100644
--- a/scripts/collection_from_items.py
+++ b/scripts/collection_from_items.py
@@ -7,7 +7,7 @@
 from linz_logger import get_log
 
 from scripts.cli.cli_helper import coalesce_multi_single
-from scripts.files.fs_s3 import bucket_name_from_path, get_object_parallel_multithreading, list_json_in_uri
+from scripts.files.fs_s3 import bucket_name_from_path, get_object_parallel_multithreading, list_uri
 from scripts.logging.time_helper import time_in_ms
 from scripts.stac.imagery.collection import ImageryCollection
 from scripts.stac.imagery.provider import Provider, ProviderRole
@@ -54,7 +54,7 @@ def main() -> None:
 
     s3_client = client("s3")
 
-    files_to_read = list_json_in_uri(uri, s3_client)
+    files_to_read = list_uri(uri, s3_client, extension=".json")
 
     start_time = time_in_ms()
     for key, result in get_object_parallel_multithreading(
diff --git a/scripts/files/fs.py b/scripts/files/fs.py
index 91ffd4e8a..f5d749477 100644
--- a/scripts/files/fs.py
+++ b/scripts/files/fs.py
@@ -1,6 +1,6 @@
 import os
 from concurrent.futures import ThreadPoolExecutor, as_completed
-from typing import List
+from typing import List, Optional
 
 import ulid
 from linz_logger import get_log
@@ -51,21 +51,34 @@ def exists(path: str) -> bool:
     return fs_local.exists(path)
 
 
-def _download_tiff_and_sidecar(target: str, file: str) -> str:
+def _download_file_and_sidecar(
+    file: str, target: str, preserve_name: bool = False, sidecars: Optional[List[str]] = None
+) -> str:
     """
-    Download a tiff file and some of its sidecar files if they exist to the target dir.
+    Download a file, and any of its sidecar files that exist, to the target dir.
 
     Args:
+        file (str): path of the file to download
         target (str): target folder to write to
-        s3_file (str): source file
+        preserve_name (bool): if True, keep the input filename; otherwise name the output with a new ulid
+        sidecars (list): sidecar extensions to attempt to download
 
     Returns:
         downloaded file path
     """
-    download_path = os.path.join(target, f"{ulid.ULID()}.tiff")
+    if preserve_name:
+        download_path = os.path.join(target, f"{file.split('/')[-1]}")
+    else:
+        ext = f".{file.split('.')[-1]}"
+        download_path = os.path.join(target, f"{ulid.ULID()}{ext}")
+
     get_log().info("Download File", path=file, target_path=download_path)
     write(download_path, read(file))
-    for ext in [".prj", ".tfw"]:
+
+    if not sidecars:
+        return download_path
+
+    for ext in sidecars:
         try:
             write(f"{target.split('.')[0]}{ext}", read(f"{file.split('.')[0]}{ext}"))
             get_log().info(
@@ -76,28 +89,34 @@ def _download_tiff_and_sidecar(target: str, file: str) -> str:
     return download_path
 
 
-def download_tiffs_parallel_multithreaded(inputs: List[str], target: str, concurrency: int = 10) -> List[str]:
+def download_files_parallel_multithreaded(
+    inputs: List[str], target: str, preserve_name: bool = False, sidecars: Optional[List[str]] = None, concurrency: int = 10
+) -> List[str]:
     """
-    Download list of tiffs to target destination using multithreading.
+    Download a list of files to the target destination using multithreading.
 
     Args:
-        inputs (list): list of tiffs to download
+        inputs (list): list of files to download
         target (str): target folder to write to
-
+        preserve_name (bool): if True, keep the input filenames; otherwise name the outputs with new ulids
+        sidecars (list): sidecar extensions to attempt to download
+        concurrency (int): maximum number of files to download concurrently
 
     Returns:
         list of downloaded file paths
     """
-    downloaded_tiffs: List[str] = []
+    downloaded_files: List[str] = []
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
-        futuress = {executor.submit(_download_tiff_and_sidecar, target, input): input for input in inputs}
+        futuress = {
+            executor.submit(_download_file_and_sidecar, input, target, preserve_name, sidecars): input for input in inputs
+        }
         for future in as_completed(futuress):
             if future.exception():
                 get_log().warn("Failed Download", error=future.exception())
             else:
-                downloaded_tiffs.append(future.result())
+                downloaded_files.append(future.result())
 
-    if len(inputs) != len(downloaded_tiffs):
-        get_log().error("Missing Files", missing_file_count=len(inputs) - len(downloaded_tiffs))
+    if len(inputs) != len(downloaded_files):
+        get_log().error("Missing Files", missing_file_count=len(inputs) - len(downloaded_files))
         raise Exception("Not all source files were downloaded")
 
-    return downloaded_tiffs
+    return downloaded_files
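
For reference, a minimal sketch of how the generalised downloader might be called under the new signature above. This is illustrative only: the bucket, tile names, and target folder are hypothetical, and the sidecar extensions shown are the ones the old hard-coded implementation used.

    from scripts.files.fs import download_files_parallel_multithreaded

    # tiff workflow: outputs are renamed to a ULID, and any .prj/.tfw
    # sidecars sitting alongside the source files are fetched if present
    tiffs = download_files_parallel_multithreaded(
        inputs=["s3://example-bucket/BX24_5000_0305.tiff"],
        target="/tmp/work",
        sidecars=[".prj", ".tfw"],
    )

    # ascii workflow: preserve_name=True keeps the source filename so the
    # translated output can mirror it
    asciis = download_files_parallel_multithreaded(
        inputs=["s3://example-bucket/BX24_5000_0305.asc"],
        target="/tmp/work",
        preserve_name=True,
    )
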
diff --git a/scripts/files/fs_s3.py b/scripts/files/fs_s3.py
index e1f4bf775..42221c129 100644
--- a/scripts/files/fs_s3.py
+++ b/scripts/files/fs_s3.py
@@ -7,7 +7,6 @@
 from linz_logger import get_log
 
 from scripts.aws.aws_helper import get_session, parse_path
-from scripts.files.files_helper import is_json
 from scripts.logging.time_helper import time_in_ms
 
 
@@ -164,8 +163,8 @@ def prefix_from_path(path: str) -> str:
     return path.replace(f"s3://{bucket_name}/", "")
 
 
-def list_json_in_uri(uri: str, s3_client: Optional[boto3.client]) -> List[str]:
-    """Get the `JSON` files from a s3 path
+def list_uri(uri: str, s3_client: Optional[boto3.client], extension: Optional[str] = None) -> List[str]:
+    """List the files in an s3 path, optionally filtered by extension
 
     Args:
@@ -183,8 +182,8 @@
     for response in response_iterator:
         for contents_data in response["Contents"]:
             key = contents_data["Key"]
-            if not is_json(key):
-                get_log().trace("skipping file not json", file=key, action="collection_from_items", reason="skip")
+            if extension and not key.endswith(extension):
+                get_log().trace("skipping file without specified extension", file=key, extension=extension, reason="skip")
                 continue
             files.append(key)
     get_log().info("Files Listed", number_of_files=len(files))
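
A hedged sketch of calling the renamed listing helper; the bucket and prefix are placeholders. Passing `extension` keeps only keys ending in that suffix, while omitting it lists every key under the prefix:

    from boto3 import client

    from scripts.files.fs_s3 import list_uri

    s3_client = client("s3")
    # hypothetical prefix; only keys ending in ".asc" are returned
    ascii_keys = list_uri("s3://example-bucket/elevation/", s3_client, extension=".asc")
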
diff --git a/scripts/standardising.py b/scripts/standardising.py
index e2a1ad67c..f2da18e32 100644
--- a/scripts/standardising.py
+++ b/scripts/standardising.py
@@ -9,7 +9,7 @@
 from scripts.aws.aws_helper import is_s3
 from scripts.cli.cli_helper import TileFiles
 from scripts.files.file_tiff import FileTiff, FileTiffType
-from scripts.files.fs import download_tiffs_parallel_multithreaded, exists, read, write
+from scripts.files.fs import download_files_parallel_multithreaded, exists, read, write
 from scripts.gdal.gdal_bands import get_gdal_band_offset
 from scripts.gdal.gdal_helper import get_gdal_version, run_gdal
 from scripts.gdal.gdal_preset import (
@@ -128,7 +128,7 @@ def standardising(
     # Download any needed file from S3 ["/foo/bar.tiff", "s3://foo"] => "/tmp/bar.tiff", "/tmp/foo.tiff"
     with tempfile.TemporaryDirectory() as tmp_path:
         standardized_working_path = os.path.join(tmp_path, standardized_file_name)
-        source_tiffs = download_tiffs_parallel_multithreaded(files.input, tmp_path)
+        source_tiffs = download_files_parallel_multithreaded(files.input, tmp_path)
 
         vrt_add_alpha = True
 
diff --git a/scripts/util/README.md b/scripts/util/README.md
new file mode 100644
index 000000000..699625c24
--- /dev/null
+++ b/scripts/util/README.md
@@ -0,0 +1,7 @@
+# Util
+
+This folder contains utility scripts used to fix datasets; they are not expected to be needed regularly.
+
+## 1. translate_ascii.py
+
+10/08/2023 - [TDE-814](https://toitutewhenua.atlassian.net/browse/TDE-814?atlOrigin=eyJpIjoiMDRhODBiOTcyMTQ2NDM2NDk4ZjJkZmY5ODg3MDdlZDUiLCJwIjoiaiJ9)
diff --git a/scripts/util/translate_ascii.py b/scripts/util/translate_ascii.py
new file mode 100644
index 000000000..553290c76
--- /dev/null
+++ b/scripts/util/translate_ascii.py
@@ -0,0 +1,54 @@
+import argparse
+import os
+import tempfile
+
+from scripts.aws.aws_helper import get_session
+from scripts.files.fs import download_files_parallel_multithreaded, read, write
+from scripts.files.fs_s3 import bucket_name_from_path, list_uri
+from scripts.gdal.gdal_helper import run_gdal
+
+
+def main() -> None:
+    """- Downloads all ascii files in a given s3 path,
+    - Translates the files to tiffs,
+    - Writes them to the specified target.
+
+    Arguments:
+        --source - s3 path to source data
+        --target - local or s3 path to write converted tiffs
+
+    example:
+        python translate_ascii.py --source s3://bucket/input-path/ --target s3://bucket/output-path/
+    """
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--source", dest="source", required=True, help="S3 path to the input ascii files")
+    parser.add_argument("--target", dest="target", required=True, help="Output location path")
+    arguments = parser.parse_args()
+
+    uri = arguments.source
+
+    if not uri.startswith("s3://"):
+        msg = f"uri is not an s3 path: {uri}"
+        raise argparse.ArgumentTypeError(msg)
+
+    # list ascii files
+    s3 = get_session(uri).client("s3")
+    files = list_uri(uri, s3, extension=".asc")
+    # reformat keys to full s3 paths
+    files = [f"s3://{bucket_name_from_path(uri)}/{file}" for file in files]
+
+    # download ascii files
+    with tempfile.TemporaryDirectory() as tmp_path:
+        source_ascii = download_files_parallel_multithreaded(inputs=files, target=tmp_path, preserve_name=True)
+
+        # translate from ascii to geotiff
+        for asc in source_ascii:
+            output = os.path.join(tmp_path, f"{asc.split('/')[-1].split('.')[0]}.tif")
+            command = ["gdal_translate", "-of", "GTiff", asc, output]
+            run_gdal(command)
+            write(os.path.join(arguments.target, f"{asc.split('/')[-1].split('.')[0]}.tif"), read(output))
+
+
+if __name__ == "__main__":
+    main()
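
A hypothetical invocation of the new script; the bucket names and prefixes are placeholders. Because the script imports from `scripts.*`, it presumably needs to be run from the repository root (or with the repository on PYTHONPATH), for example as a module:

    python -m scripts.util.translate_ascii --source s3://example-bucket/elevation-ascii/ --target s3://example-bucket/elevation-tiff/

Since each converted tiff is written to the target with `write` as soon as `gdal_translate` finishes, the outputs survive the temporary directory's cleanup, and the target can be either a local folder or an s3 path.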