feat: new script to translate ascii files
also generalise existing functions for reuse
fix: formatting


feat: write output so it doesn't get removed with tmpdir


fix: needs credentials


fix: remove unnecessary code


fix: tidy and move to util folder


fix: rebase fix


fix: format
MDavidson17 committed Aug 10, 2023
1 parent 2dfe650 commit e96c88a
Showing 6 changed files with 103 additions and 24 deletions.
4 changes: 2 additions & 2 deletions scripts/collection_from_items.py
@@ -7,7 +7,7 @@
from linz_logger import get_log

from scripts.cli.cli_helper import coalesce_multi_single
from scripts.files.fs_s3 import bucket_name_from_path, get_object_parallel_multithreading, list_json_in_uri
from scripts.files.fs_s3 import bucket_name_from_path, get_object_parallel_multithreading, list_uri
from scripts.logging.time_helper import time_in_ms
from scripts.stac.imagery.collection import ImageryCollection
from scripts.stac.imagery.provider import Provider, ProviderRole
@@ -54,7 +54,7 @@ def main() -> None:

s3_client = client("s3")

files_to_read = list_json_in_uri(uri, s3_client)
files_to_read = list_uri(uri, s3_client, extension=".json")

start_time = time_in_ms()
for key, result in get_object_parallel_multithreading(
51 changes: 35 additions & 16 deletions scripts/files/fs.py
@@ -1,6 +1,6 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from typing import List, Optional

import ulid
from linz_logger import get_log
@@ -51,21 +51,34 @@ def exists(path: str) -> bool:
return fs_local.exists(path)


def _download_tiff_and_sidecar(target: str, file: str) -> str:
def _download_file_and_sidecar(
file: str, target: str, preserve_name: bool = False, sidecars: Optional[List[str]] = None
) -> str:
"""
Download a tiff file and some of its sidecar files if they exist to the target dir.
Download a specified file, and its sidecar files if they exist, to the target dir.
Args:
file (str): source file to download
target (str): target folder to write to
preserve_name (bool): if True, the output filename matches the input filename; otherwise a ULID is used
sidecars (list): sidecar extensions to attempt to download
Returns:
downloaded file path
"""
download_path = os.path.join(target, f"{ulid.ULID()}.tiff")
if preserve_name:
download_path = os.path.join(target, f"{file.split('/')[-1]}")
else:
ext = f".{file.split('.')[-1]}"
download_path = os.path.join(target, f"{ulid.ULID()}{ext}")

get_log().info("Download File", path=file, target_path=download_path)
write(download_path, read(file))
for ext in [".prj", ".tfw"]:

if not sidecars:
return download_path

for ext in sidecars:
try:
write(f"{target.split('.')[0]}{ext}", read(f"{file.split('.')[0]}{ext}"))
get_log().info(
@@ -76,28 +89,34 @@ def _download_tiff_and_sidecar(target: str, file: str) -> str:
return download_path


def download_tiffs_parallel_multithreaded(inputs: List[str], target: str, concurrency: int = 10) -> List[str]:
def download_files_parallel_multithreaded(
inputs: List[str], target: str, preserve_name: bool = False, sidecars: Optional[List[str]] = None, concurrency: int = 10
) -> List[str]:
"""
Download list of tiffs to target destination using multithreading.
Download list of files to target destination using multithreading.
Args:
inputs (list): list of tiffs to download
inputs (list): list of files to download
target (str): target folder to write to
preserve_name (bool): if True, the output filename matches the input filename; otherwise a ULID is used
sidecars (list): sidecar extensions to attempt to download
concurrency (int): maximum number of files to download concurrently
Returns:
list of downloaded file paths
"""
downloaded_tiffs: List[str] = []
downloaded_files: List[str] = []
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futuress = {executor.submit(_download_tiff_and_sidecar, target, input): input for input in inputs}
futures = {
executor.submit(_download_file_and_sidecar, input, target, preserve_name, sidecars): input for input in inputs
}
for future in as_completed(futures):
if future.exception():
get_log().warn("Failed Download", error=future.exception())
else:
downloaded_tiffs.append(future.result())
downloaded_files.append(future.result())

if len(inputs) != len(downloaded_tiffs):
get_log().error("Missing Files", missing_file_count=len(inputs) - len(downloaded_tiffs))
if len(inputs) != len(downloaded_files):
get_log().error("Missing Files", missing_file_count=len(inputs) - len(downloaded_files))
raise Exception("Not all source files were downloaded")
return downloaded_tiffs
return downloaded_files
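For context, a minimal usage sketch of the generalised download helper — a sketch only; the bucket and file paths are illustrative, not from this commit:

from scripts.files.fs import download_files_parallel_multithreaded

# Illustrative source paths, not from this commit.
inputs = ["s3://example-bucket/tiles/a.asc", "s3://example-bucket/tiles/b.asc"]

# preserve_name=True keeps the original filenames instead of generating ULIDs;
# sidecars lists extensions to attempt alongside each file (missing ones are skipped).
downloaded = download_files_parallel_multithreaded(
    inputs, "/tmp/work", preserve_name=True, sidecars=[".prj"]
)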
7 changes: 3 additions & 4 deletions scripts/files/fs_s3.py
@@ -7,7 +7,6 @@
from linz_logger import get_log

from scripts.aws.aws_helper import get_session, parse_path
from scripts.files.files_helper import is_json
from scripts.logging.time_helper import time_in_ms


@@ -164,7 +163,7 @@ def prefix_from_path(path: str) -> str:
return path.replace(f"s3://{bucket_name}/", "")


def list_json_in_uri(uri: str, s3_client: Optional[boto3.client]) -> List[str]:
def list_uri(uri: str, s3_client: Optional[boto3.client], extension: Optional[str] = None) -> List[str]:
"""Get the `JSON` files from a s3 path
Args:
@@ -183,8 +182,8 @@ def list_json_in_uri(uri: str, s3_client: Optional[boto3.client]) -> List[str]:
for response in response_iterator:
for contents_data in response["Contents"]:
key = contents_data["Key"]
if not is_json(key):
get_log().trace("skipping file not json", file=key, action="collection_from_items", reason="skip")
if extension and not key.endswith(extension):
get_log().trace("skipping file without specified extension", file=key, extension=extension, reason="skip")
continue
files.append(key)
get_log().info("Files Listed", number_of_files=len(files))
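A usage sketch of the generalised listing helper, mirroring the two call sites in this commit (the bucket path is illustrative):

from boto3 import client

from scripts.files.fs_s3 import list_uri

s3_client = client("s3")

# Returns only keys ending in ".json"; omit `extension` to list every object.
files_to_read = list_uri("s3://example-bucket/collection/", s3_client, extension=".json")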
4 changes: 2 additions & 2 deletions scripts/standardising.py
Expand Up @@ -9,7 +9,7 @@
from scripts.aws.aws_helper import is_s3
from scripts.cli.cli_helper import TileFiles
from scripts.files.file_tiff import FileTiff, FileTiffType
from scripts.files.fs import download_tiffs_parallel_multithreaded, exists, read, write
from scripts.files.fs import download_files_parallel_multithreaded, exists, read, write
from scripts.gdal.gdal_bands import get_gdal_band_offset
from scripts.gdal.gdal_helper import get_gdal_version, run_gdal
from scripts.gdal.gdal_preset import (
@@ -128,7 +128,7 @@ def standardising(
# Download any needed file from S3 ["/foo/bar.tiff", "s3://foo"] => "/tmp/bar.tiff", "/tmp/foo.tiff"
with tempfile.TemporaryDirectory() as tmp_path:
standardized_working_path = os.path.join(tmp_path, standardized_file_name)
source_tiffs = download_tiffs_parallel_multithreaded(files.input, tmp_path)
source_tiffs = download_files_parallel_multithreaded(files.input, tmp_path, sidecars=[".prj", ".tfw"])  # keep fetching .prj/.tfw sidecars as before

vrt_add_alpha = True

7 changes: 7 additions & 0 deletions scripts/util/README.md
@@ -0,0 +1,7 @@
# Util

This folder contains utility scripts used to fix datasets; they are not expected to be regularly required.

## 1. translate_ascii.py

10/08/2023 - [TDE-814](https://toitutewhenua.atlassian.net/browse/TDE-814?atlOrigin=eyJpIjoiMDRhODBiOTcyMTQ2NDM2NDk4ZjJkZmY5ODg3MDdlZDUiLCJwIjoiaiJ9)
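Translates ASCII grid files in an S3 path to GeoTIFFs. Usage (the bucket paths are placeholders):

    python translate_ascii.py --source s3://bucket/input-path/ --target s3://bucket/output-path/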
54 changes: 54 additions & 0 deletions scripts/util/translate_ascii.py
@@ -0,0 +1,54 @@
import argparse
import os
import tempfile

from scripts.aws.aws_helper import get_session
from scripts.files.fs import download_files_parallel_multithreaded, read, write
from scripts.files.fs_s3 import bucket_name_from_path, list_uri
from scripts.gdal.gdal_helper import run_gdal


def main() -> None:
"""- Downloads all ascii files in a given s3 path,
- Translates the files to tiffs,
- Writes them to the specified target.
Arguments:
--source - s3 path to source data
--target - local or s3 path to write converted tiffs
example:
python translate_ascii.py --source s3://bucket/input-path/ -- target s3://bucket/output-path/
"""

parser = argparse.ArgumentParser()
parser.add_argument("--source", dest="source", required=True, help="S3 path to the input ascii files")
parser.add_argument("--target", dest="target", required=True, help="Output location path")
arguments = parser.parse_args()

uri = arguments.source

if not uri.startswith("s3://"):
msg = f"uri is not a s3 path: {uri}"
raise argparse.ArgumentTypeError(msg)

# list ascii files
s3 = get_session(uri).client("s3")
files = list_uri(uri, s3, extension=".asc")
# reformat output to full s3 path
files = [f"s3://{bucket_name_from_path(uri)}/{file}" for file in files]

# download ascii files
with tempfile.TemporaryDirectory() as tmp_path:
source_ascii = download_files_parallel_multithreaded(inputs=files, target=tmp_path, preserve_name=True)

# translate from ascii to geotiff
for asc in source_ascii:
file_name = f"{asc.split('/')[-1].split('.')[0]}.tif"
output = os.path.join(tmp_path, file_name)
command = ["gdal_translate", "-of", "GTiff", asc, output]
run_gdal(command)
write(os.path.join(arguments.target, file_name), read(output))


if __name__ == "__main__":
main()
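For reference, a standalone sketch of the per-file translation step outside this repo's run_gdal wrapper — a sketch only: it assumes the gdal_translate binary is on PATH and uses illustrative paths:

import subprocess

# Same command the script builds: convert an ASCII grid to a GeoTIFF.
subprocess.run(["gdal_translate", "-of", "GTiff", "/tmp/example.asc", "/tmp/example.tif"], check=True)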
