
Merge pull request #22 from water3d/Adam_dev
Small changes to comments and docstrings to be consistent, as well as small clean-ups
nickrsan authored Sep 12, 2023
2 parents c503cd6 + d538bc2 commit 7d4a4a1
Showing 9 changed files with 102 additions and 75 deletions.
2 changes: 1 addition & 1 deletion eedl/__init__.py
@@ -1 +1 @@
__version__ = "2023.08.31"
__version__ = "2023.09.12"
16 changes: 8 additions & 8 deletions eedl/google_cloud.py
@@ -29,10 +29,10 @@ def get_public_export_urls(bucket_name: str, prefix: str = "") -> List[str]:
# get the content of the bucket (it needs to be public)
listing = requests.get(search_url).text

- # comes back as an XML listing - don't need to parse the XML, just need the values of the Key elements
+ # Comes back as an XML listing - don't need to parse the XML, just need the values of the Key elements
pattern = re.compile("<Key>(.*?)</Key>")
items = pattern.findall(listing)
- # make them into full URLs with the bucket URL at the front and check if the files have the prefix specific
+ # Make them into full URLs with the bucket URL at the front and check if the files have the prefix specific
filtered = [f"{request_url}{item}" for item in items if item.startswith(prefix)]

return filtered
@@ -49,17 +49,17 @@ def download_public_export(bucket_name: str, output_folder: Union[str, Path], pr
:type prefix: str
:return: None.
"""
- # get the urls of items in the bucket with the specified prefix
+ # Get the urls of items in the bucket with the specified prefix
urls = get_public_export_urls(bucket_name, prefix)

os.makedirs(output_folder, exist_ok=True)

for url in urls:
- filename = url.split("/")[-1] # get the filename
- output_path = Path(output_folder) / filename # construct the output path
- # get the data - this could be a problem if it's larger than fits in RAM - I believe requests has a way to operate as a streambuffer - not looking into that at this moment
+ filename = url.split("/")[-1] # Get the filename
+ output_path = Path(output_folder) / filename # Construct the output path
+ # Get the data - this could be a problem if it's larger than fits in RAM - I believe requests has a way to operate as a streambuffer - not looking into that at this moment
response = requests.get(url)
- output_path.write_bytes(response.content) # write it to a file
+ output_path.write_bytes(response.content) # Write it to a file


def download_export(bucket_name: str,
@@ -68,7 +68,7 @@ def download_export(bucket_name: str,
delimiter: str = "/",
autodelete: bool = True) -> None:

"""Downloads a blob from the bucket.
"""Downloads a blob from the specified bucket.
Modified from Google Cloud sample documentation at
https://cloud.google.com/storage/docs/samples/storage-download-file#storage_download_file-python
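Note on the hunk above: the unchanged comment still flags that requests.get(url) pulls the whole export into RAM before write_bytes runs. For reference, a minimal sketch of the streaming approach the comment alludes to - the download_streamed name, chunk size, and timeout are illustrative assumptions, not part of this commit:

    import requests
    from pathlib import Path

    def download_streamed(url: str, output_path: Path, chunk_size: int = 1024 * 1024) -> None:
        # Stream the response body in chunks so the full file never has to fit in memory.
        with requests.get(url, stream=True, timeout=120) as response:
            response.raise_for_status()
            with open(output_path, "wb") as output_file:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    output_file.write(chunk)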
2 changes: 1 addition & 1 deletion eedl/helpers.py
@@ -85,7 +85,7 @@ def _single_item_extract(self, image, task_registry, zonal_features, aoi_attr, e
export_image.zonal_inject_constants = zonal_inject_constants

filename_suffix = f"{aoi_attr}_{image_date}"
- if self.skip_existing and export_image.check_mosaic_exists(aoi_download_folder, self.export_folder, f"{filename_description}_{filename_suffix}"):
+ if self.skip_existing and export_image.check_mosaic_exists(aoi_download_folder, self.export_folder, f"{self.filename_description}_{filename_suffix}"):
print(f"Image {filename_suffix} exists and skip_existing=True. Skipping")
return

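Note on the hunk above: this one is a bug fix rather than a comment cleanup. The bare name filename_description is not defined inside the method (unless an unrelated outer-scope variable happens to share the name) - the value lives on the instance - so the existing-mosaic check would raise a NameError before it could skip anything. A minimal illustration with a hypothetical class, not from the repo:

    class Exporter:
        def __init__(self) -> None:
            self.filename_description = "valley_et"

        def prefix_broken(self, suffix: str) -> str:
            return f"{filename_description}_{suffix}"  # NameError: name 'filename_description' is not defined

        def prefix_fixed(self, suffix: str) -> str:
            return f"{self.filename_description}_{suffix}"  # "valley_et_..." as intended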
81 changes: 39 additions & 42 deletions eedl/image.py
@@ -40,11 +40,11 @@ def download_images_in_folder(source_location: Union[str, Path], download_locati
"""
Handles pulling data from Google Drive over to a local location, filtering by a filename prefix and folder
- :param source_location: Directory to search for files
+ :param source_location: Directory to search for files.
:type source_location: Union[str, Path]
- :param download_location: Destination for files with the specified prefix
+ :param download_location: Destination for files with the specified prefix.
:type download_location: Union[str, Path]
- :param prefix: A prefix to use to filter items in the folder - only files where the name matches this prefix will be moved
+ :param prefix: A prefix to use to filter items in the folder - only files where the name matches this prefix will be moved.
:type prefix: str
:return: None
"""
@@ -57,12 +57,12 @@ def download_images_in_folder(source_location: Union[str, Path], download_locati
os.makedirs(download_location, exist_ok=True)

for filename in files:
- shutil.move(os.path.join(folder_search_path, filename), os.path.join(download_location, filename))
+ shutil.move(str(os.path.join(folder_search_path, filename)), str(os.path.join(download_location, filename)))


class TaskRegistry:
"""
- The TaskRegistry class makes it convent to manage arbitrarily many Earth Engine images that are in varying states of being downloaded.
+ The TaskRegistry class makes it convenient to manage arbitrarily many Earth Engine images that are in varying states of being downloaded.
"""
INCOMPLETE_STATUSES = ("READY", "UNSUBMITTED", "RUNNING")
COMPLETE_STATUSES = ["COMPLETED"]
@@ -81,8 +81,7 @@ def __init__(self) -> None:

def add(self, image: ee.image.Image) -> None:
"""
- Adds an Earth Engine image to the list of Earth Engine images
+ Adds an Earth Engine image to the list of Earth Engine images.
:param image: Earth Engine image to be added to the list of images
:type image: ee.image.Image
:return: None
Expand All @@ -92,9 +91,8 @@ def add(self, image: ee.image.Image) -> None:
@property
def incomplete_tasks(self) -> List[ee.image.Image]:
"""
- List of Earth Engine images that have not been completed yet
- :return: List of Earth Engine images that have not been completed yet
+ List of Earth Engine images that have not been completed yet.
+ :return: List of Earth Engine images that have not been completed yet.
:rtype: List[ee.image.Image]
"""
initial_tasks = [image for image in self.images if image.last_task_status['state'] in self.INCOMPLETE_STATUSES]
@@ -106,30 +104,32 @@ def incomplete_tasks(self) -> List[ee.image.Image]:
@property
def complete_tasks(self) -> List[ee.image.Image]:
"""
- List of Earth Engine images
- :return: List of Earth Engine images
+ List of Earth Engine images.
+ :return: List of Earth Engine images.
:rtype: List[ee.image.Image]
"""
return [image for image in self.images if image.last_task_status['state'] in self.COMPLETE_STATUSES + self.FAILED_STATUSES]

@property
def failed_tasks(self) -> List[ee.image.Image]:
"""
List of Earth Engine images that have either been cancelled or that have failed
"""
return [image for image in self.images if image.last_task_status['state'] in self.FAILED_STATUSES]

@property
def downloadable_tasks(self) -> List[ee.image.Image]:
"""
- List of Earth Engine images that have successfully been downloaded
- :return: List of Earth Engine images that have successfully been downloaded
+ List of Earth Engine images that have not been cancelled or have failed.
+ :return: List of Earth Engine images that have not been cancelled or have failed.
:rtype: List[ee.image.Image]
"""
return [image for image in self.complete_tasks if image.task_data_downloaded is False and image.last_task_status['state'] not in self.FAILED_STATUSES]

def download_ready_images(self, download_location: Union[str, Path]) -> None:
"""
- :param download_location: Destination for downloaded files
+ Downloads all images that are ready to be downloaded.
+ :param download_location: Destination for downloaded files.
:type download_location: Union[str, Path]
:return: None
"""
@@ -178,7 +178,6 @@ def wait_for_images(self,
on_failure: str = "log") -> None:
"""
Blocker until there are no more incomplete or downloadable tasks left.
:param download_location: Destination for downloaded files.
:type download_location: Union[str, Path]
:param sleep_time: Time between checking if the disk is full in seconds. Defaults to 10 seconds.
Expand Down Expand Up @@ -235,9 +234,9 @@ class EEDLImage:
:param crs: Coordinate Reference System to use for exports in a format Earth Engine understands, such as "EPSG:3310"
:type crs: Optional[str]
- :param tile_size: the number of pixels per side of tiles to export
+ :param tile_size: The number of pixels per side of tiles to export
:type tile_size: Optional[int]
- :param export_folder: the name of the folder in the chosen export location that will be created for the export
+ :param export_folder: The name of the folder in the chosen export location that will be created for the export
:type export_folder: Optional[Union[str, Path]]
This docstring needs to be checked to ensure it's in a standard format that Sphinx will render
@@ -273,24 +272,24 @@ def __init__(self, **kwargs) -> None:
self.zonal_inject_constants: dict = dict()
self.zonal_nodata_value: int = -9999

- # set the defaults here - this is a nice strategy where we get to define constants near the top that aren't buried in code, then apply them here
+ # Set the defaults here - this is a nice strategy where we get to define constants near the top that aren't buried in code, then apply them here.
for key in DEFAULTS:
setattr(self, key.lower(), DEFAULTS[key])

- for key in kwargs: # now apply any provided keyword arguments over the top of the defaults.
+ for key in kwargs: # Now apply any provided keyword arguments over the top of the defaults.
setattr(self, key, kwargs[key])

self._last_task_status = {"state": "UNSUBMITTED"}
- # this will be the default status initially, so always assume it's UNSUBMITTED if we haven't gotten anything
- # from the server. "None" would work too, but then we couldn't just check the status
+ # This will be the default status initially, so always assume it's UNSUBMITTED if we haven't gotten anything.
+ # From the server. "None" would work too, but then we couldn't just check the status.
self.task_data_downloaded = False
- self.export_type = "Drive" # other option is "Cloud"
+ self.export_type = "Drive" # The other option is "Cloud".

def _set_names(self, filename_suffix: str = "") -> None:
"""
:param filename_suffix: Suffix used to later identify files.
- :type filename_suffix: Str
+ :type filename_suffix: str
:return: None
"""
self.description = filename_suffix
@@ -300,15 +299,14 @@ def _set_names(self, filename_suffix: str = "") -> None:
def _initialize() -> None:
"""
Handles the initialization and potentially the authentication of Earth Engine
:return: None
"""
- try: # try just a basic discardable operation used in their docs so that we don't initialize if we don't need to
+ try: # Try just a basic discard-able operation used in their docs so that we don't initialize if we don't need to.
_ = ee.Image("NASA/NASADEM_HGT/001")
- except EEException: # if it fails, try just running initialize
+ except EEException: # If it fails, try just running initialize.
try:
ee.Initialize()
- except EEException: # if that still fails, try authenticating first
+ except EEException: # If that still fails, try authenticating first.
ee.Authenticate()
ee.Initialize()

@@ -327,7 +325,7 @@ def last_task_status(self, new_status: Dict[str, str]) -> None:
Sets the value of the private variable "_last_task_status" to a specified value. Realistically, this shouldn't
be used as the value should only be set from within the object, but it's here in case it's needed.
- :param new_status: Updated status
+ :param new_status: Status to update the _last_task_status to.
:type new_status: Dict[str, str]
:return: None
"""
@@ -343,7 +341,6 @@ def export(self,
**export_kwargs: Unpack[EEExportDict]) -> None:
"""
Handles the exporting of an image
:param image: Image for export
:type image: ee.image.Image
:param filename_suffix: The unique identifier used internally to identify images.
@@ -363,6 +360,7 @@
"""

if not isinstance(image, ee.image.Image):

raise ValueError("Invalid image provided for export - please provide a single image (not a collection or another object) of class ee.image.Image for export")

if export_type.lower() == "drive" and \
@@ -372,7 +370,7 @@
raise NotADirectoryError("The provided path for the Google Drive export folder is not a valid directory but"
" Drive export was specified. Either change the export type to use Google Cloud"
" and set that up properly (with a bucket, etc), or set the drive_root_folder"
" to a valid folder")
" to a valid folder.")
elif export_type.lower() == "drive":
if drive_root_folder:
self.drive_root_folder = drive_root_folder
@@ -449,15 +447,15 @@ def check_mosaic_exists(download_location: Union[str, Path], export_folder: Unio

def download_results(self, download_location: Union[str, Path], callback: Optional[str] = None, drive_wait: int = 15) -> None:
"""
- :param download_location: The directory where the results should be downloaded to
+ :param download_location: The directory where the results should be downloaded to. Expects a string path or a Pathlib Path object.
:type download_location: Union[str, Path]
- :param callback: The callback function called once the image is downloaded
+ :param callback: The callback function is called once the image has been downloaded.
:type callback: Optional[str]
:param drive_wait: The amount of time in seconds to wait to allow for files that Earth Engine reports have been exported to actually populate. Default is 15 seconds.
:type drive_wait: int
:return: None
"""
- # need an event loop that checks self.task.status(), which
- # will get the current state of the task
+ # Need an event loop that checks self.task.status(), which will get the current state of the task.

# state options
# == "CANCELLED", "CANCEL_REQUESTED", "COMPLETED",
@@ -536,14 +534,13 @@ def zonal_stats(self,
:param stats:
:type stats: Tuple[str, ...]
:param report_threshold: After how many iterations should it print out the feature number it's on. Defaults to 1000.
- Set to None to disable
+ Set to None to disable.
:type report_threshold: int
- :param write_batch_size: How many zones should we store up before writing to the disk? Defaults to 2000
+ :param write_batch_size: How many zones should we store up before writing to the disk? Defaults to 2000.
:type write_batch_size: int
:param use_points:
:type use_points: bool
:return: None
"""

self.zonal_output_filepath = zonal.zonal_stats(
@@ -564,7 +561,7 @@ def _check_task_status(self) -> Dict[str, Union[Dict[str, str], bool]]:
"""
Updates the status is it needs to be changed
- :return: Returns a dictionary of the most up-to-date status and whether it was changed
+ :return: Returns a dictionary of the most up-to-date status and whether that status was changed
:rtype: Dict[str, Union[Dict[str, str], bool]]
"""

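Taken together, the image.py docstrings sketch the intended workflow: build an EEDLImage, export it, register it with a TaskRegistry, and block until everything downloads. A rough usage sketch assembled only from the signatures visible in this diff - argument values are placeholders, and anything not shown above (such as the full export keyword set) is an assumption:

    import ee
    from eedl.image import EEDLImage, TaskRegistry

    ee.Initialize()  # per the docstring above, EEDLImage's _initialize() can also handle this lazily

    registry = TaskRegistry()

    image = EEDLImage(crs="EPSG:3310", export_folder="demo_export")
    image.export(
        ee.Image("NASA/NASADEM_HGT/001"),  # the same image _initialize() uses as its test operation
        filename_suffix="demo_2023_09_12",
        export_type="Drive",  # the default; "Cloud" is the other option
        drive_root_folder="G:/My Drive",
    )

    registry.add(image)
    registry.wait_for_images("downloads/", sleep_time=10, on_failure="log")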
15 changes: 7 additions & 8 deletions eedl/merge.py
@@ -16,7 +16,6 @@ def merge_outputs(file_mapping,
"""
Makes output zonal stats files into a data frame and adds a datetime field. Merges all inputs into one DF, and
can optionally insert into a sqlite database.
:param file_mapping: A set of tuples with a path to a file and a time value (string or datetime) to associate with it.
:type file_mapping:
:param date_field: Defaults to "et_date".
@@ -38,7 +37,7 @@
df.loc[:, date_field] = time_value
dfs.append(df)

- # merge all the data frames together
+ # Merge all the data frames together
final_df = pandas.concat(dfs)
final_df.reset_index(inplace=True)

@@ -51,16 +50,16 @@

def plot_merged(df: pandas.DataFrame, et_field: str, date_field: str = "et_date", uniqueid: str = "UniqueID") -> so.Plot:
"""
- :param df: Data source for the plot
+ Creates a seaborn plot of the
+ :param df: Data source for the plot.
:type df: pandas.DataFrame
- :param et_field: Name of the variable on the x-axis
+ :param et_field: Name of the variable on the x-axis.
:type et_field: str
- :param date_field: Name of the variable on the y-axis. Default is "et_date"
+ :param date_field: Name of the variable on the y-axis. Default is "et_date".
:type date_field: str
- :param uniqueid: Defines additional data subsets that transforms should operate on independently. Default is "UniqueID"
+ :param uniqueid: Defines additional data subsets that transforms should operate on independently. Default is "UniqueID".
:type uniqueid: str
- :return: Returns a seaborn object plot
+ :return: Returns a seaborn object plot.
:rtype: so.Plot
"""
return (
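For context on merge_outputs: file_mapping pairs each zonal-stats output file with the time value it represents, and date_field names the column that value lands in. A small sketch consistent with the docstrings above - the paths are invented, and the assumption that merge_outputs returns the merged frame is inferred from the final_df lines in this hunk:

    import datetime
    from eedl.merge import merge_outputs, plot_merged

    file_mapping = [
        ("stats/et_2023_06.csv", datetime.date(2023, 6, 1)),
        ("stats/et_2023_07.csv", datetime.date(2023, 7, 1)),
    ]

    merged = merge_outputs(file_mapping, date_field="et_date")
    plot = plot_merged(merged, et_field="mean_et", date_field="et_date")  # et_field name is a placeholder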
10 changes: 5 additions & 5 deletions eedl/mosaic_rasters.py
@@ -10,9 +10,9 @@
def mosaic_folder(folder_path: Union[str, Path], output_path: Union[str, Path], prefix: str = "") -> None:
"""
- :param folder_path: Location of the folder
+ :param folder_path: Location of the folder.
:type folder_path: Union[str, Path]
- :param output_path: Output destination
+ :param output_path: Output destination.
:type output_path: Union[str, Path]
:param prefix: Used to find the files of interest.
:type prefix: Str
@@ -49,11 +49,11 @@ def mosaic_rasters(raster_paths: Sequence[Union[str, Path]],
vrt_options = gdal.BuildVRTOptions(resampleAlg='nearest', resolution="highest")
my_vrt = gdal.BuildVRT(vrt_path, raster_paths, options=vrt_options)
# my_vrt = None
- my_vrt.FlushCache() # write the VRT out
+ my_vrt.FlushCache() # Write the VRT out
print(f"VRT at {vrt_path}")

- # now let's export it to the output_path as a geotiff
- driver = gdal.GetDriverByName("GTIFF") # we'll use VRT driver.CreateCopy
+ # Now let's export it to the output_path as a geotiff
+ driver = gdal.GetDriverByName("GTIFF") # We'll use VRT driver.CreateCopy
vrt_data = gdal.Open(vrt_path)
output = driver.CreateCopy(output_path, vrt_data, 0, ["COMPRESS=DEFLATE", ])
output.FlushCache()
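The pattern in this final hunk - build a VRT index of the tiles, then materialize it as a compressed GeoTIFF with CreateCopy - also works standalone. A minimal sketch of the same sequence with placeholder paths (GDAL's canonical driver name is "GTiff"; the "GTIFF" spelling in the source also resolves because the lookup is case-insensitive):

    from osgeo import gdal

    tile_paths = ["tile_1.tif", "tile_2.tif"]  # placeholder inputs

    vrt_options = gdal.BuildVRTOptions(resampleAlg="nearest", resolution="highest")
    vrt = gdal.BuildVRT("mosaic.vrt", tile_paths, options=vrt_options)
    vrt.FlushCache()  # write the VRT to disk before reopening it

    driver = gdal.GetDriverByName("GTiff")
    vrt_data = gdal.Open("mosaic.vrt")
    output = driver.CreateCopy("mosaic.tif", vrt_data, 0, ["COMPRESS=DEFLATE"])
    output.FlushCache()  # flush the GeoTIFF to disk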
(Diffs for the remaining three changed files are not shown.)