diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 1b65c73..c287c63 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -22,5 +22,5 @@ jobs: - name: mypy check run: | - pip install mypy types-requests pandas-stubs - mypy -p eedl \ No newline at end of file + pip install mypy types-requests pandas-stubs typing_extensions + mypy -p eedl --enable-incomplete-feature=Unpack diff --git a/eedl/__init__.py b/eedl/__init__.py index c232bef..e72be2a 100644 --- a/eedl/__init__.py +++ b/eedl/__init__.py @@ -1 +1 @@ -__version__ = "2023.08.22" +__version__ = "2023.08.31" diff --git a/eedl/core.py b/eedl/core.py new file mode 100644 index 0000000..93afb5a --- /dev/null +++ b/eedl/core.py @@ -0,0 +1,43 @@ +import os +from pathlib import Path +from typing import Union, Dict + +import fiona + + +def _get_fiona_args(polygon_path: Union[str, Path]) -> Dict[str, Union[str, Path]]: + """ + A simple utility that detects if, maybe, we're dealing with an Esri File Geodatabase. This is the wrong way + to do this, but it'll work in many situations. + + :param polygon_path: File location of polygons. + :type polygon_path: Union[str, Path] + :return: Returns the full path and, depending on the file format, the file name in a dictionary. + :rtype: Dict[str, Union[str, Path]] + """ + + parts = os.path.split(polygon_path) + # if the folder name ends with .gdb and the "filename" doesn't have an extension, assume it's an FGDB + if (parts[0].endswith(".gdb") or parts[0].endswith(".gpkg")) and "." not in parts[1]: + return {'fp': parts[0], 'layer': parts[1]} + else: + return {'fp': polygon_path} + + +def safe_fiona_open(features_path: Union[str, Path], **extra_kwargs) -> fiona.Collection: + """ + Handles opening things in fiona in a way that is safe, even for geodatabases where we need + to open the geodatabase itself and specify a layer. The caller is responsible for + ensuring the features are closed (e.g. a try/finally block with a call to features.close() + in the finally block should immediately follow calling this function. + :param features_path: A Path object or string path to open with fiona + :param extra_kwargs: Keyword arguments to directly pass through to fiona. 
Helpful when trying to filter features, etc + :return: + """ + kwargs = _get_fiona_args(features_path) + main_file_path = kwargs['fp'] + del kwargs['fp'] + + kwargs = {**kwargs, **extra_kwargs} + + return fiona.open(main_file_path, **kwargs) diff --git a/eedl/google_cloud.py b/eedl/google_cloud.py index bb98db1..5512692 100644 --- a/eedl/google_cloud.py +++ b/eedl/google_cloud.py @@ -24,9 +24,10 @@ def get_public_export_urls(bucket_name: str, prefix: str = "") -> List[str]: base_url = "https://storage.googleapis.com/" request_url = f"{base_url}{bucket_name}/" + search_url = f"{request_url}?prefix={prefix}" # need to include the prefix here or else we get failures after having more than 1k items - # get the content of the bucket (it needs to be public - listing = requests.get(request_url).text + # get the content of the bucket (it needs to be public) + listing = requests.get(search_url).text # comes back as an XML listing - don't need to parse the XML, just need the values of the Key elements pattern = re.compile("(.*?)") @@ -51,6 +52,8 @@ def download_public_export(bucket_name: str, output_folder: Union[str, Path], pr # get the urls of items in the bucket with the specified prefix urls = get_public_export_urls(bucket_name, prefix) + os.makedirs(output_folder, exist_ok=True) + for url in urls: filename = url.split("/")[-1] # get the filename output_path = Path(output_folder) / filename # construct the output path diff --git a/eedl/helpers.py b/eedl/helpers.py new file mode 100644 index 0000000..8298111 --- /dev/null +++ b/eedl/helpers.py @@ -0,0 +1,214 @@ +import os +import itertools +import datetime + +from .core import safe_fiona_open +from .image import EEDLImage, TaskRegistry + +import ee +from ee import ImageCollection + + +class GroupedCollectionExtractor(): + + def __init__(self, **kwargs): + self.keep_image_objects = False # whether or not to store the EEDLImage objects on this class so they can be accessed when it's done. We don't just to not use the RAM on large exports + self.all_images = [] # all the exported images are saved here. They can then be operated on once the extractor is complete + self.skip_existing = True # a feature allowing it to resume from crashes. If the mosaic image exists, it skips doing any processing on the rest of it + self.on_error = "log" + + self.collection = None + self.collection_band = None + self.time_start = None + self.time_end = None + self.mosaic_by_date = True + self.areas_of_interest_path = None # the path to a spatial data file readable by Fiona/GEOS that has features defining AOIs to extract individually + + self.strict_clip = True # may be necessary for some things to behave, so keeping this as a default to True. People can disable if they know what they're doing (may be faster) + self.export_type = "drive" + self.drive_root_folder = None + self.cloud_bucket = None + self.download_folder = None # local folder name after downloading for processing + self.export_folder = None # drive/cloud export folder name + + self.zonal_run = True + self.zonal_areas_of_interest_attr = None # what is the attribute on each of the AOI polygons that tells us what items to use in the zonal extraction + self.zonal_features_path = None # what polygons to use as inputs for zonal stats + self.zonal_features_area_of_interest_attr = None # what field in the zonal features has the value that should match zonal_areas_of_interest_attr? 
+ self.zonal_features_preserve_fields = None # what fields to preserve, as a tuple - typically an ID and anything else you want + self.zonal_stats_to_calc = () # what statistics to output by zonal feature + self.zonal_use_points = False + self.zonal_inject_date: bool = False + self.zonal_inject_group_id: bool = False + self.zonal_nodata_value: int = 0 + + self.merge_sqlite = True # should we merge all outputs to a single SQLite database + self.merge_grouped_csv = True # should we merge CSV by grouped item + self.merge_final_csv = False # should we merge all output tables + + self._all_outputs = list() # for storing the paths to all output csvs + + self.max_fiona_features_load = 1000 # threshold where we switch from keeping fiona features in memory as a list to using itertools.tee to split the iterator + + for kwarg in kwargs: + setattr(self, kwarg, kwargs[kwarg]) + + def _single_item_extract(self, image, task_registry, zonal_features, aoi_attr, ee_geom, image_date, aoi_download_folder): + """ + This looks a bit silly here, but we need to construct this here so that we have access + to this method's variables since we can't pass them in and it can't be a class function. + :param image: + :param state: + :return: + """ + + filename_description = "alfalfa_et_ensemble" + export_image = EEDLImage( + task_registry=task_registry, + drive_root_folder=self.drive_root_folder, + cloud_bucket=self.cloud_bucket, + filename_description=filename_description + ) + export_image.zonal_polygons = zonal_features + export_image.zonal_use_points = self.zonal_use_points + export_image.zonal_keep_fields = self.zonal_features_preserve_fields + export_image.zonal_stats_to_calc = self.zonal_stats_to_calc + export_image.zonal_nodata_value = self.zonal_nodata_value + export_image.date_string = image_date + + zonal_inject_constants = {} + if self.zonal_inject_date: + zonal_inject_constants["date"] = image_date + if self.zonal_inject_group_id: + zonal_inject_constants["group_id"] = aoi_attr + + export_image.zonal_inject_constants = zonal_inject_constants + + filename_suffix = f"{aoi_attr}_{image_date}" + if self.skip_existing and export_image.check_mosaic_exists(aoi_download_folder, self.export_folder, f"{filename_description}_{filename_suffix}"): + print(f"Image {filename_suffix} exists and skip_existing=True. 
Skipping") + return + + export_image.export(image, + export_type=self.export_type, + filename_suffix=filename_suffix, + clip=ee_geom, + strict_clip=self.strict_clip, + folder=self.export_folder, # the folder to export to in Google Drive + ) # this all needs some work still so that + + def extract(self): + collection = self._get_and_filter_collection() + + # now we need to get each polygon to filter the bounds to and make a new collection with filterBounds for just + # that geometry + + self._all_outputs = list() + features = safe_fiona_open(self.areas_of_interest_path) + try: + num_complete = 0 + for feature in features: + print(f"Number of complete AOIs: {num_complete}") + task_registry = TaskRegistry() + + ee_geom = ee.Geometry.Polygon(feature['geometry']['coordinates'][0]) # WARNING: THIS DOESN'T CHECK CRS + aoi_collection = collection.filterBounds(ee_geom) + + # get some variables defined for use in extracting the zonal stats + aoi_attr = feature.properties[self.zonal_areas_of_interest_attr] # this is the value we'll search for in the zonal features + zonal_features_query = f"{self.zonal_features_area_of_interest_attr} = '{aoi_attr}'" + aoi_download_folder = os.path.join(self.download_folder, aoi_attr) + + fiona_zonal_features = safe_fiona_open(self.zonal_features_path) + try: + zonal_features_filtered = fiona_zonal_features.filter(where=zonal_features_query) + + image_list = aoi_collection.toList(aoi_collection.size()).getInfo() + indicies_and_dates = [(im['properties']['system:index'], im['properties']['system:time_start']) for im in image_list] + + """ + if len(zonal_features_filtered) < self.max_fiona_features_load: + # zonal_features_filtered = list(zonal_features_filtered) # this *would* be inefficient, but we're going to re-use it so many times, it's not terrible, exce + # using_tee = False + # else: + # using an itertools tee may not be more efficient than a list, but it also might, because + # even if we iterate through all features and all features remain queued for other iterations + # it may not load all attributes, etc, for each feature if fiona lazy loads anything. It won't + # be that much slower in any case, though the complexity of maintaining the code here is something + # to consider + """ + zonal_features_filtered_tee = itertools.tee(zonal_features_filtered, len(image_list)) + using_tee = True + + for i, image_info in enumerate(indicies_and_dates): + if using_tee: + zonal_features = zonal_features_filtered_tee[i - 1] + else: + zonal_features = zonal_features_filtered + + image = aoi_collection.filter(ee.Filter.eq("system:time_start", image_info[1])).first() # get the image from the collection again based on ID + timsetamp_in_seconds = int(str(image_info[1])[:-3]) # we could divide by 1000, but then we'd coerce back from a float. This is precise. + date_string = datetime.datetime.fromtimestamp(timsetamp_in_seconds, tz=datetime.timezone.utc).strftime("%Y-%m-%d") + + self._single_item_extract(image, task_registry, zonal_features, aoi_attr, ee_geom, date_string, aoi_download_folder) + + # ok, now that we have a collection for the AOI, we need to iterate through all the images + # in the collection as we normally would in a script, but also extract the features of interest for use + # in zonal stats. Right now the zonal stats code only accepts files. We might want to make it accept + # some kind of fiona iterator - can we filter fiona objects by attributes? 
+ # fiona supports SQL queries on open and zonal stats now supports receiving an open fiona object + + task_registry.setup_log(os.path.join(self.download_folder, "eedl_processing_error_log.txt")) + task_registry.wait_for_images(aoi_download_folder, sleep_time=15, callback="mosaic_and_zonal", try_again_disk_full=False, on_failure=self.on_error) + + if self.keep_image_objects: + self.all_images.extend(task_registry.images) + + finally: + fiona_zonal_features.close() + + num_complete += 1 + finally: + features.close() + + def _get_and_filter_collection(self): + collection = ImageCollection(self.collection) + + if self.time_start or self.time_end: + collection = collection.filterDate(self.time_start, self.time_end) + + if self.collection_band: + collection = collection.select(self.collection_band) + + if self.mosaic_by_date: # if we're supposed to take the images in the collection and merge them so that all images on one date are a single image + collection = mosaic_by_date(collection) + + return collection + + +def mosaic_by_date(image_collection): + """ + Adapted to Python from code found via https://gis.stackexchange.com/a/343453/1955 + :param image_collection: An image collection + :return: ee.ImageCollection + """ + image_list = image_collection.toList(image_collection.size()) + + unique_dates = image_list.map(lambda im: ee.Image(im).date().format("YYYY-MM-dd")).distinct() + + def _make_mosaicked_image(d): + d = ee.Date(d) + + image = image_collection.filterDate(d, d.advance(1, "day")).mosaic() + + image_w_props = image.set( + "system:time_start", d.millis(), + "system:id", d.format("YYYY-MM-dd"), + "system:index", d.format("YYYY-MM-dd") + ).rename(d.format("YYYY-MM-dd")), + + return image_w_props[0] + + mosaic_imlist = unique_dates.map(_make_mosaicked_image) + + return ee.ImageCollection(mosaic_imlist) diff --git a/eedl/image.py b/eedl/image.py index 6e58fc1..56e068a 100644 --- a/eedl/image.py +++ b/eedl/image.py @@ -1,8 +1,12 @@ import os +import io import shutil import time from pathlib import Path from typing import Dict, List, Optional, Tuple, Union +from typing_extensions import TypedDict, NotRequired, Unpack +import traceback +import datetime import ee from ee import EEException @@ -11,33 +15,27 @@ from . import mosaic_rasters from . import zonal + +class EEExportDict(TypedDict): + fileDimensions: Optional[int] + folder: NotRequired[Optional[Union[str, Path]]] + crs: Optional[str] + region: NotRequired[ee.geometry.Geometry] + description: str + fileNamePrefix: str + scale: Union[int, float] + maxPixels: Union[int, float] + bucket: NotRequired[Optional[str]] + + DEFAULTS = dict( CRS='EPSG:4326', - TILE_SIZE=12800, + TILE_SIZE=12800, # multiple of shardSize default 256 EXPORT_FOLDER="ee_exports" ) -def _get_fiona_args(polygon_path: Union[str, Path]) -> Dict[str, Union[str, Path]]: - """ - A simple utility that detects if, maybe, we're dealing with an Esri File Geodatabase. This is the wrong way - to do this, but it'll work in many situations. - - :param polygon_path: File location of polygons. - :type polygon_path: Union[str, Path] - :return: Returns the full path and, depending on the file format, the file name in a dictionary. - :rtype: Dict[str, Union[str, Path]] - """ - - parts = os.path.split(polygon_path) - # if the folder name ends with .gdb and the "filename" doesn't have an extension, assume it's an FGDB - if (parts[0].endswith(".gdb") or parts[0].endswith(".gpkg")) and "." 
not in parts[1]: - return {'fp': parts[0], 'layer': parts[1]} - else: - return {'fp': polygon_path} - - def download_images_in_folder(source_location: Union[str, Path], download_location: Union[str, Path], prefix: str) -> None: """ Handles pulling data from Google Drive over to a local location, filtering by a filename prefix and folder @@ -53,6 +51,9 @@ def download_images_in_folder(source_location: Union[str, Path], download_locati folder_search_path: Union[str, Path] = source_location files = [filename for filename in os.listdir(folder_search_path) if filename.startswith(prefix)] + if len(files) == 0: + print(f"Likely Error: Could not find files to download for {prefix} in {folder_search_path} - you likely have a misconfiguration in your export parameters. Future steps may fail.") + os.makedirs(download_location, exist_ok=True) for filename in files: @@ -72,8 +73,11 @@ def __init__(self) -> None: Initialized the TaskRegistry class and defaults images to "[]" and the callback function to "None" :return: None """ - self.images: List[Image] = [] + self.images: List[EEDLImage] = [] self.callback: Optional[str] = None + self.log_file_path: Optional[Union[str, Path]] = None # the path to the log file + self.log_file: Optional[io.TextIOWrapper] = None # the open log file handle + self.raise_errors: bool = True def add(self, image: ee.image.Image) -> None: """ @@ -130,15 +134,48 @@ def download_ready_images(self, download_location: Union[str, Path]) -> None: :return: None """ for image in self.downloadable_tasks: - print(f"{image.filename} is ready for download") - image.download_results(download_location=download_location, callback=self.callback) + try: + print(f"{image.filename} is ready for download") + image.download_results(download_location=download_location, callback=self.callback) + except: # noqa: E722 + # on any error raise or log it + if self.raise_errors: + raise + + error_details = traceback.format_exc() + self.log_error("local", f"Failed to process image {image.filename}. Error details: {error_details}") + + def setup_log(self, log_file_path: Union[str, Path], mode='a'): + self.log_file_path = log_file_path + self.log_file = open(self.log_file_path, 'a') + + def log_error(self, error_type: str, error_message: str): + """ + :param error_type: Options "ee", "local" to indicate whether it was an error on Earth Engine's side or on + the local processing side + :param error_message: The error message to print to the log file + """ + message = f"{error_type} Error: {error_message}" + date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") + print(message) + + if self.log_file: + self.log_file.write(f"{date_string}: {message}") + + def __del__(self): + if self.log_file is not None: + try: + self.log_file.close() + except: # noqa: E722 + # if we get any exception while closing it, don't make noise, just move on. We're just trying to be clean here where we can + pass def wait_for_images(self, download_location: Union[str, Path], sleep_time: int = 10, callback: Optional[str] = None, try_again_disk_full: bool = True, - on_failure="raise") -> None: + on_failure: str = "log") -> None: """ Blocker until there are no more incomplete or downloadable tasks left. 
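Not part of the diff, but as an illustration of how the new logging hooks fit together (paths and the callback name below are placeholders): a registry only downgrades failures from exceptions to log entries once a log file has been attached with setup_log() and on_failure is left at, or set to, "log".

# Illustrative sketch only - not shipped with this change; paths are placeholders.
from eedl.image import main_task_registry

# Attach a log file first; without one, on_failure="log" still raises locally,
# because raise_errors is only switched off when a log file is present.
main_task_registry.setup_log(r"D:\exports\eedl_processing_error_log.txt")

# Failed exports/downloads are now written to the log (and printed) instead of
# stopping the whole run; pass on_failure="raise" to keep the old behavior.
main_task_registry.wait_for_images(r"D:\exports", sleep_time=30, callback="mosaic", on_failure="log")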
@@ -153,6 +190,11 @@ def wait_for_images(self, :return: None """ + if on_failure == "raise": + self.raise_errors = True + elif on_failure == "log" and self.log_file: # if they say to log the errors and specified a log file, set raise errors to False + self.raise_errors = False + self.callback = callback while len(self.incomplete_tasks) > 0 or len(self.downloadable_tasks) > 0: try: @@ -166,18 +208,22 @@ def wait_for_images(self, time.sleep(sleep_time) - if on_failure == "raise" and len(self.failed_tasks) > 0: - raise EEException(f"{len(self.failed_tasks)} images failed to export. Example error message from first" - f" failed image \"{self.failed_tasks[0].last_task_status['description']}\" was" - f" \"{self.failed_tasks[0].last_task_status['error_message']}\"." - f" Check https://code.earthengine.google.com/tasks in your web browser to see status and" - f" messages for all export tasks.") + if len(self.failed_tasks) > 0: + message = f"{len(self.failed_tasks)} images failed to export. Example error message from first" \ + f" failed image \"{self.failed_tasks[0].last_task_status['description']}\" was" \ + f" \"{self.failed_tasks[0].last_task_status['error_message']}\"." \ + f" Check https://code.earthengine.google.com/tasks in your web browser to see status and" \ + f" messages for all export tasks." + if on_failure == "raise": + raise EEException(message) + else: + print(message) main_task_registry = TaskRegistry() -class Image: +class EEDLImage: """ The main class that does all the work. Any use of this package should instantiate this class for each export the user wants to do. As we refine this, we may be able to provide just a single function in this module named @@ -209,9 +255,23 @@ def __init__(self, **kwargs) -> None: self.export_folder: Optional[Union[str, Path]] = None self.mosaic_image: Optional[Union[str, Path]] = None self.task: Optional[ee.batch.Task] = None - self.bucket: Optional[str] = None + self.cloud_bucket: Optional[str] = None self._ee_image: Optional[ee.image.Image] = None self.output_folder: Optional[Union[str, Path]] = None + self.task_registry = main_task_registry + + self.filename_description = "" + self.date_string = "" # for items that want to store a date representation + + # these values are only used if someone calls the mosaic_and_zonal callback - we need the values defined on + # the class to do that + self.zonal_polygons: Optional[Union[str, Path]] = None + self.zonal_stats_to_calc: Optional[Tuple] = None + self.zonal_keep_fields: Optional[Tuple] = None + self.zonal_use_points: bool = False + self.zonal_output_filepath: Optional[Union[str, Path]] = None # set by self.zonal_stats + self.zonal_inject_constants: dict = dict() + self.zonal_nodata_value: int = -9999 # set the defaults here - this is a nice strategy where we get to define constants near the top that aren't buried in code, then apply them here for key in DEFAULTS: @@ -226,8 +286,6 @@ def __init__(self, **kwargs) -> None: self.task_data_downloaded = False self.export_type = "Drive" # other option is "Cloud" - self.filename_description = "" - def _set_names(self, filename_suffix: str = "") -> None: """ @@ -280,8 +338,9 @@ def export(self, filename_suffix: str, export_type: str = "drive", clip: Optional[ee.geometry.Geometry] = None, + strict_clip: Optional[bool] = False, drive_root_folder: Optional[Union[str, Path]] = None, - **export_kwargs) -> None: + **export_kwargs: Unpack[EEExportDict]) -> None: """ Handles the exporting of an image @@ -291,69 +350,102 @@ def export(self, :type filename_suffix: Str 
:param export_type: Specifies how the image should be exported. Either "cloud" or "drive". Defaults to "drive". :type export_type: Str - :param clip: Defines the clip that should be used + :param clip: Defines the region of interest for export - does not perform a strict clip, which is often slower. + Instead it uses the Earth Engine export's "region" parameter to clip the results to the bounding box of + the clip geometry. To clip to the actual geometry, set strict_clip to True :type clip: Optional[ee.geometry.Geometry] + :param strict_clip: When set to True, performs a true clip on the result so that it's not just the bounding box + but also the actual clipping geometry. Defaults to False + :type clip: Optional[bool] :param drive_root_folder: The folder for exporting if "drive" is selected :type drive_root_folder: Optional[Union[str, Path]] :return: None """ - # If "image" does not have a clip attribute, the error message is not very helpful. This allows for a custom error message: if not isinstance(image, ee.image.Image): - raise ValueError("Invalid image provided for export") + raise ValueError("Invalid image provided for export - please provide a single image (not a collection or another object) of class ee.image.Image for export") + + if export_type.lower() == "drive" and \ + (self.drive_root_folder is None or not os.path.exists(self.drive_root_folder)) and \ + (drive_root_folder is None or not os.path.exists(drive_root_folder)): - if export_type.lower() == "drive" and (drive_root_folder is None or not os.path.exists(drive_root_folder)): raise NotADirectoryError("The provided path for the Google Drive export folder is not a valid directory but" " Drive export was specified. Either change the export type to use Google Cloud" " and set that up properly (with a bucket, etc), or set the drive_root_folder" " to a valid folder") elif export_type.lower() == "drive": - self.drive_root_folder = drive_root_folder + if drive_root_folder: + self.drive_root_folder = drive_root_folder self._initialize() + if clip and not isinstance(clip, ee.geometry.Geometry): + raise ValueError("Invalid geometry provided for clipping. Export parameter `clip` must be an instance of ee.geometry.Geometry") + + if clip and strict_clip and isinstance(clip, ee.geometry.Geometry): # + image = image.clip(clip) + self._ee_image = image self._set_names(filename_suffix) - ee_kwargs = { + ee_kwargs: EEExportDict = { 'description': self.description, 'fileNamePrefix': self.filename, 'scale': 30, 'maxPixels': 1e12, - # multiple of shardSize default 256. 
Should split into about 9 tiles 'fileDimensions': self.tile_size, 'crs': self.crs } - # Get a silent error if clip is not of type ee.geometry.Geometry if isinstance(clip, ee.geometry.Geometry): ee_kwargs["region"] = clip - elif clip: - raise ValueError("Invalid geometry provided for export") # override any of these defaults with anything else provided ee_kwargs.update(export_kwargs) + if "folder" not in ee_kwargs: # if they didn't specify a folder, use the class' default or whatever they defined previously + ee_kwargs['folder'] = self.export_folder + else: + self.export_folder = ee_kwargs['folder'] # we need to persist this so we can find the image later on, and so it's picked up by cloud export code below + if export_type.lower() == "drive": - if "folder" not in ee_kwargs: - ee_kwargs['folder'] = self.export_folder self.task = ee.batch.Export.image.toDrive(self._ee_image, **ee_kwargs) elif export_type.lower() == "cloud": # add the folder to the filename here for Google Cloud ee_kwargs['fileNamePrefix'] = f"{self.export_folder}/{ee_kwargs['fileNamePrefix']}" - self.bucket = str(ee_kwargs['bucket']) + + if "bucket" not in ee_kwargs: # if we already defined the bucket on the class, use that + ee_kwargs['bucket'] = self.cloud_bucket + else: # otherwise, attempt to retrieve it from the call to this function + self.cloud_bucket = str(ee_kwargs['bucket']) + + if "folder" in ee_kwargs: # we made this part of the filename prefix above, so delete it now or it will cause an error. + del ee_kwargs["folder"] + self.task = ee.batch.Export.image.toCloudStorage(self._ee_image, **ee_kwargs) # export_type is not valid else: - raise ValueError("Invalid value for export_type. Did you mean drive or cloud?") + raise ValueError("Invalid value for export_type. Did you mean \"drive\" or \"cloud\"?") self.task.start() self.export_type = export_type - main_task_registry.add(self) + self.task_registry.add(self) + + @staticmethod + def check_mosaic_exists(download_location: Union[str, Path], export_folder: Union[str, Path], filename: str): + """ + This function isn't ideal because it duplicates information - you need to pass it in elsewhere and assume + this file format matches, rather than actually calculating the paths earlier in the process. But that's + currently necessary because the task registry sets the download location right now. So we want to be able + to check at any time if the mosaic exists so that we can skip processing - we're using this. Otherwise + we'd need to do a big refactor that's probably not worth it. + """ + output_file = os.path.join(str(download_location), str(export_folder), f"{filename}_mosaic.tif") + return os.path.exists(output_file) def download_results(self, download_location: Union[str, Path], callback: Optional[str] = None, drive_wait: int = 15) -> None: """ @@ -372,14 +464,15 @@ def download_results(self, download_location: Union[str, Path], callback: Option # "FAILED", "READY", "SUBMITTED" (maybe - double check that - it might be that it waits with UNSUBMITTED), # "RUNNING", "UNSUBMITTED" - folder_search_path = os.path.join(str(self.drive_root_folder), str(self.export_folder)) self.output_folder = os.path.join(str(download_location), str(self.export_folder)) + if self.export_type.lower() == "drive": time.sleep(drive_wait) # it seems like there's often a race condition where EE reports export complete, but no files are found. Give things a short time to sync up. 
+ folder_search_path = os.path.join(str(self.drive_root_folder), str(self.export_folder)) download_images_in_folder(folder_search_path, self.output_folder, prefix=self.filename) elif self.export_type.lower() == "cloud": - google_cloud.download_public_export(str(self.bucket), self.output_folder, f"{self.export_folder}/{self.filename}") + google_cloud.download_public_export(str(self.cloud_bucket), self.output_folder, f"{self.export_folder}/{self.filename}") else: raise ValueError("Unknown export_type (not one of 'drive', 'cloud') - can't download") @@ -399,6 +492,31 @@ def mosaic(self) -> None: self.mosaic_image = os.path.join(str(self.output_folder), f"{self.filename}_mosaic.tif") mosaic_rasters.mosaic_folder(str(self.output_folder), self.mosaic_image, prefix=self.filename) + def mosaic_and_zonal(self) -> None: + """ + A callback that takes no parameters, but runs mosaic and zonal stats. Runs zonal stats + by allowing the user to set all the zonal params on the class instance instead of passing + them as params + """ + + if not (self.zonal_polygons and self.zonal_keep_fields and self.zonal_stats_to_calc): + raise ValueError("Can't run mosaic and zonal callback without `polygons`, `keep_fields, and `stats` values" + "set on the class instance.") + + try: + use_points = self.zonal_use_points + except AttributeError: + use_points = False + + self.mosaic() + self.zonal_stats(polygons=self.zonal_polygons, + keep_fields=self.zonal_keep_fields, + stats=self.zonal_stats_to_calc, + use_points=use_points, + inject_constants=self.zonal_inject_constants, + nodata_value=self.zonal_nodata_value + ) + def zonal_stats(self, polygons: Union[str, Path], keep_fields: Tuple[str, ...] = ("UniqueID", "CLASS2"), @@ -406,6 +524,8 @@ def zonal_stats(self, report_threshold: int = 1000, write_batch_size: int = 2000, use_points: bool = False, + inject_constants: dict = dict(), + nodata_value: int = -9999 ) -> None: """ @@ -426,7 +546,8 @@ def zonal_stats(self, """ - zonal.zonal_stats(polygons, + self.zonal_output_filepath = zonal.zonal_stats( + polygons, self.mosaic_image, self.output_folder, self.filename, @@ -434,7 +555,10 @@ def zonal_stats(self, stats=stats, report_threshold=report_threshold, write_batch_size=write_batch_size, - use_points=use_points) + use_points=use_points, + inject_constants=inject_constants, + nodata_value=nodata_value + ) def _check_task_status(self) -> Dict[str, Union[Dict[str, str], bool]]: """ diff --git a/eedl/merge.py b/eedl/merge.py index 6870473..f91fc69 100644 --- a/eedl/merge.py +++ b/eedl/merge.py @@ -2,6 +2,7 @@ A tool to merge separate timeseries outputs into a single data frame or DB table """ import sqlite3 +import os from typing import Optional import pandas @@ -69,3 +70,51 @@ def plot_merged(df: pandas.DataFrame, et_field: str, date_field: str = "et_date" ).add(so.Line(linewidth=0.5, alpha=0.1), group=uniqueid) .layout(size=(8, 4)) ) + + +def merge_csvs_in_folder(folder_path, output_path, sqlite_db=None, sqlite_table=None): + if sqlite_db and not sqlite_table: + raise ValueError("Cannot insert into sqlite db without table name") + + csvs = [item for item in os.listdir(folder_path) if item.endswith(".csv")] + + dfs = [] + for csv in csvs: + print(csv) + df = pandas.read_csv(os.path.join(folder_path, csv)) + df.drop(columns="index", inplace=True, errors="ignore") + dfs.append(df) + + # merge all the data frames together + final_df = pandas.concat(dfs) + final_df.reset_index(drop=True, inplace=True) + + if output_path: + final_df.to_csv(output_path) + + if sqlite_db: + with 
sqlite3.connect(sqlite_db) as conn: + final_df.to_sql(str(sqlite_table), conn) + + return final_df + + +def merge_many(base_folder, subfolder_name="alfalfa_et"): + output_folder = os.path.join(base_folder, "merged_csvs") + os.makedirs(output_folder, exist_ok=True) + + folders = [folder for folder in os.listdir(base_folder) if os.path.isdir(os.path.join(base_folder, folder))] + for folder in folders: + if folder == "merged_csvs": + continue + print(folder) + output_file = os.path.join(output_folder, f"{folder}.csv") + input_folder = os.path.join(base_folder, folder, subfolder_name) + merge_csvs_in_folder(input_folder, output_file) + + print("Merging all CSVs") + mega_output_csv = os.path.join(output_folder, "_all_csvs_merged.csv") + mega_output_sqlite = os.path.join(output_folder, "_all_data_merged.sqlite") + sqlite_table = "merged_data" + merge_csvs_in_folder(output_folder, mega_output_csv, mega_output_sqlite, sqlite_table) + print("Done") diff --git a/eedl/mosaic_rasters.py b/eedl/mosaic_rasters.py index 7cb7f46..24d9d5d 100644 --- a/eedl/mosaic_rasters.py +++ b/eedl/mosaic_rasters.py @@ -1,4 +1,5 @@ import os +import shutil import tempfile from pathlib import Path from typing import Sequence, Union @@ -18,6 +19,11 @@ def mosaic_folder(folder_path: Union[str, Path], output_path: Union[str, Path], :return: None """ tifs = [os.path.join(folder_path, filename) for filename in os.listdir(folder_path) if filename.endswith(".tif") and filename.startswith(prefix)] + + if len(tifs) == 1: # if we only got one image back, don't both mosaicking, though this will also skip generating overviews. + shutil.move(tifs[0], output_path) # just move the output image to the "mosaic" name, then return + return + mosaic_rasters(tifs, output_path) diff --git a/eedl/zonal.py b/eedl/zonal.py index a910343..e8275f1 100644 --- a/eedl/zonal.py +++ b/eedl/zonal.py @@ -1,32 +1,15 @@ import csv import os from pathlib import Path -from typing import Dict, Iterable, Union +from typing import Iterable, Union import fiona import rasterstats +from eedl.core import safe_fiona_open -def _get_fiona_args(polygon_path: Union[str, Path]) -> Dict[str, Union[str, Path]]: - """ - A simple utility that detects if, maybe, we're dealing with an Esri File Geodatabase. This is the wrong way - to do this, but it'll work in many situations. - - :param polygon_path: File location of polygons. - :type polygon_path: Union[str, Path] - :return: Returns the full path and, depending on the file format, the file name in a dictionary. - :rtype: Dict[str, Union[str, Path]] - """ - - parts = os.path.split(polygon_path) - # if the folder name ends with .gdb and the "filename" doesn't have an extension, assume it's an FGDB - if (parts[0].endswith(".gdb") or parts[0].endswith(".gpkg")) and "." not in parts[1]: - return {'fp': parts[0], 'layer': parts[1]} - else: - return {'fp': polygon_path} - -def zonal_stats(features: Union[str, Path], +def zonal_stats(features: Union[str, Path, fiona.Collection], raster: Union[str, Path, None], output_folder: Union[str, Path, None], filename: str, @@ -35,6 +18,8 @@ def zonal_stats(features: Union[str, Path], report_threshold: int = 1000, write_batch_size: int = 2000, use_points: bool = False, + inject_constants: dict = dict(), + nodata_value: int = -9999, **kwargs) -> Union[str, Path, None]: # TODO: Make this check if raster and polys are in the same CRS - if they're not, then rasterstats doesn't # automatically align them and we just get bad output. 
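The TODO above asks for a CRS guard before features and raster are handed to rasterstats. The following is only a sketch of what that check might look like, not part of this change set; it assumes rasterio (already pulled in by rasterstats) is importable and that comparing the two CRS objects directly is good enough.

# Hypothetical helper (not in this diff): warn when the vector and raster CRS differ,
# since rasterstats won't reproject either side and the zonal output would be silently wrong.
import warnings
import rasterio
from eedl.core import safe_fiona_open

def _warn_on_crs_mismatch(features_path, raster_path) -> None:
    feats = safe_fiona_open(features_path)
    try:
        with rasterio.open(raster_path) as rast:
            vector_crs = rasterio.crs.CRS.from_wkt(feats.crs_wkt)
            if vector_crs != rast.crs:
                warnings.warn(f"CRS mismatch: features are {vector_crs} but raster is {rast.crs}")
    finally:
        feats.close()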
@@ -65,6 +50,10 @@ def zonal_stats(features: Union[str, Path], when use_points is True. Additionally, when this is True, the `stats` argument to this function is ignored as only a single value will be extracted as the attribute `value` in the output CSV. Default is False. :type use_points: Bool + :param inject_constants: A dictionary of field: value mappings to inject into every row. Useful if you plan to merge + the data later. For example, a raster may be a single variable and date, and we're extracting many rasters. So + for each zonal call, you could do something like inject_constants = {date: '2021-01-01', variable: 'et'}, which + would produce headers in the CSV for "date" and "variable" and added values in the CSV of "2021-01-01", "et". :param kwargs: Passed through to rasterstats :return: :rtype: Union[str, Path, None] @@ -73,30 +62,40 @@ def zonal_stats(features: Union[str, Path], # next line, each item isn't evaluated, which should prevent us from needing to store a geojson representation of # all the polygons at one time since we'll strip it off (it'd be bad to try to keep all of it - # A silly hack to get fiona to open GDB data by splitting it only if the input is a gdb data item, then providing - # anything else as kwargs. But fiona requires the main item to be an arg, not a kwarg - kwargs = _get_fiona_args(features) - main_file_path = kwargs['fp'] - del kwargs['fp'] - output_filepath: Union[str, None] = None - with fiona.open(main_file_path, **kwargs) as feats_open: + if not (isinstance(features, fiona.Collection) or hasattr(features, "__iter__")): # if features isn't already a fiona collection instance or something else we can iterate over + # A silly hack to get fiona to open GDB data by splitting it only if the input is a gdb data item, then providing + # anything else as kwargs. But fiona requires the main item to be an arg, not a kwarg + feats_open = safe_fiona_open(features) + _feats_opened_in_function = True + else: + feats_open = features # if it's a fiona instance, just use the open instance + _feats_opened_in_function = False # but mark that we didn't open it so we don't close it later + try: if not use_points: # If we want to do zonal, open a zonal stats generator - zstats_results_geo = rasterstats.gen_zonal_stats(feats_open, raster, stats=stats, geojson_out=True, nodata=-9999, **kwargs) + zstats_results_geo = rasterstats.gen_zonal_stats(feats_open, + raster, + stats=stats, + geojson_out=True, + nodata=nodata_value, + **kwargs + ) fieldnames = (*stats, *keep_fields) - filesuffix = "zonal_stats" + filesuffix = f"zonal_stats_nodata{nodata_value}" else: # Otherwise, open a point query generator. 
# TODO: Need to make it convert the polygons to points here, otherwise it'll get the vertex data zstats_results_geo = rasterstats.gen_point_query(feats_open, raster, geojson_out=True, # Need this to get extra attributes back - nodata=-9999, + nodata=nodata_value, interpolate="nearest", # Need this or else rasterstats uses a mix of nearby cells, even for single points **kwargs) - fieldnames = ("value", *keep_fields,) # When doing point queries, we get a field called "value" back with the raster value - filesuffix = "point_query" + fieldnames = ("value", *keep_fields) # When doing point queries, we get a field called "value" back with the raster value + filesuffix = f"point_query_nodata{nodata_value}" + + fieldnames_headers = (*fieldnames, *inject_constants.keys()) # this is separate because we use fieldnames later to pull out data - the constants are handled separately, but we need to write this to the CSV as a header # Here's a first approach that still stores a lot in memory - it's commented out because we're instead # going to just generate them one by one and write them to a file directly. @@ -110,7 +109,7 @@ def zonal_stats(features: Union[str, Path], i = 0 output_filepath = os.path.join(str(output_folder), f"{filename}_{filesuffix}.csv") with open(output_filepath, 'w', newline='') as csv_file: - writer = csv.DictWriter(csv_file, fieldnames=fieldnames) + writer = csv.DictWriter(csv_file, fieldnames=fieldnames_headers) writer.writeheader() results = [] for poly in zstats_results_geo: # Get the result for the polygon, then filter the keys with the dictionary comprehension below @@ -120,6 +119,8 @@ def zonal_stats(features: Union[str, Path], if type(result[key]) is float: result[key] = f"{result[key]:.5f}" + result = {**result, **inject_constants} # merge in the constants + i += 1 results.append(result) if i % write_batch_size == 0: @@ -132,45 +133,8 @@ def zonal_stats(features: Union[str, Path], if len(results) > 0: # Clear out any remaining items at the end writer.writerows(results) print(i) + finally: + if _feats_opened_in_function: # if we opened the fiona object here, close it. 
Otherwise, leave it open + feats_open.close() return output_filepath - - -def run_data_2018_baseline() -> None: - """ - - - :return: None - """ - datasets = [ - # dict( - # name="cv_water_balance", - # raster_folder=r"D:\ET_Summers\ee_exports_water_balance\et_exports_sseboper", - # liq=r"C:\Users\dsx\Downloads\drought_liq_2018.gdb\liq_cv_2018_3310", - # output_folder=r"D:\ET_Summers\ee_exports_water_balance\et_exports_sseboper\2018_baseline" - # ), - dict( - name="non_cv_water_balance", - raster_folder=r"D:\ET_Summers\ee_exports_water_balance_non_cv\et_exports_sseboper", - liq=r"C:\Users\dsx\Downloads\drought_liq_2018.gdb\liq_non_cv_2018_3310", - output_folder=r"D:\ET_Summers\ee_exports_water_balance_non_cv\et_exports_sseboper\2018_baseline" - ) - - ] - - skips = [r"mean_et_2022-2022-05-01--2022-08-31__water_balance_may_aug_mean_mosaic.tif"] - - for dataset in datasets: - liq = dataset["liq"] - raster_folder = dataset["raster_folder"] - output_folder = dataset["output_folder"] - # was going to do this differently, but leaving it alone - rasters = [item for item in os.listdir(raster_folder) if item.endswith(".tif") and item not in skips] - rasters_processing = [os.path.join(raster_folder, item) for item in rasters] - - print(liq) - print(rasters_processing) - for raster in rasters_processing: - print(raster) - output_name = os.path.splitext(os.path.split(raster)[1])[0] - zonal_stats(liq, raster, output_folder, output_name) diff --git a/environment.yml b/environment.yml index 29691be..60bec8d 100644 --- a/environment.yml +++ b/environment.yml @@ -14,4 +14,5 @@ dependencies: - google-cloud-storage - setuptools - requests - - pytest \ No newline at end of file + - pytest + - typing-extensions \ No newline at end of file diff --git a/examples/basic.py b/examples/basic.py index e51ddfe..ea7f5c9 100644 --- a/examples/basic.py +++ b/examples/basic.py @@ -3,7 +3,7 @@ # we should change the name of our Image class - it conflicts with the class image in the ee package, and people will # likely be using both. Let's not cause confusion -from eedl.image import Image +from eedl.image import EEDLImage import eedl @@ -13,7 +13,7 @@ def test_simple() -> None: # Adam, make sure to set the drive root folder for your own testing - we'll need to fix this, and in the future, # we can use a Google Cloud bucket for most testing this is clunky - we should make the instantiation of the image be able to take a kwarg that sets the value of image, I think. - image = Image() + image = EEDLImage() image.export(s2_image, "valley_water_s2_test_image", export_type="Drive", drive_root_folder=r"G:\My Drive", clip=geometry) # We need to make it check and report whether the export on the EE side was successful. 
This test "passed" because Earth Engine failed and there wasn't anything to download (oops) diff --git a/examples/data/test_bound_exports.gpkg b/examples/data/test_bound_exports.gpkg new file mode 100644 index 0000000..32f9d37 Binary files /dev/null and b/examples/data/test_bound_exports.gpkg differ diff --git a/examples/multi_export_helper_alfalfa.py b/examples/multi_export_helper_alfalfa.py new file mode 100644 index 0000000..0f64396 --- /dev/null +++ b/examples/multi_export_helper_alfalfa.py @@ -0,0 +1,43 @@ +import os +import sys + +import ee + +folder = os.path.dirname(os.path.abspath(__file__)) + +sys.path.append(os.path.join(os.path.dirname(folder))) + +from eedl.helpers import GroupedCollectionExtractor # noqa: E402 + +ee.Initialize() + +data_geopackage = os.path.join(folder, "data", "test_bound_exports.gpkg") +region_bounds = os.path.join(data_geopackage, "huc8_export_bounds_4326") +field_bounds = os.path.join(data_geopackage, "alfalfa_fields_with_huc8_4326") + +extractor = GroupedCollectionExtractor( + keep_image_objects=False, + collection="OpenET/ENSEMBLE/CONUS/GRIDMET/MONTHLY/v2_0", + collection_band="et_ensemble_mad", + time_start="2019-01-01", + time_end="2021-12-31", + mosaic_by_date=True, # OpenET images are already a single mosaic per date, so we'll just iterate through the images + areas_of_interest_path=region_bounds, + strict_clip=True, # we just want a rectangular BBox clip, not a geometry clip, which can be slower sometimes, but crossing UTM zones created problems without the strict clip - seems like an EE bug. Leave this as True for OpenET exports + drive_root_folder=r"G:\My Drive", + export_type="drive", + export_folder="alfalfa_et", + download_folder=r"D:\alfalfa_et", + zonal_run=True, + zonal_areas_of_interest_attr="huc8", + zonal_features_path=field_bounds, + zonal_features_area_of_interest_attr="huc8", + zonal_features_preserve_fields=("Id", "huc8",), + zonal_stats_to_calc=("min", "max", "mean", "std", "count"), + zonal_use_points=False, + zonal_inject_date=True, + zonal_inject_group_id=True, + zonal_nodata_value=0, +) + +extractor.extract() diff --git a/examples/scv_et_rework.py b/examples/scv_et_rework.py index 992ddf4..19c1a9d 100644 --- a/examples/scv_et_rework.py +++ b/examples/scv_et_rework.py @@ -5,7 +5,7 @@ # we should change the name of our Image class - it conflicts with the class image in the ee package, and people will # likely be using both. 
Let's not cause confusion -from eedl.image import Image +from eedl.image import EEDLImage import eedl ee.Initialize() @@ -34,7 +34,7 @@ def scv_data_download_for_year(year, openet_collection=r"OpenET/ENSEMBLE/CONUS/G winter_image = winter_collection_doubles.sum() # export the annual image and queue it for download - annual_export_image = Image(crs="EPSG:4326") + annual_export_image = EEDLImage(crs="EPSG:4326") annual_export_image.export(annual_image, filename_suffix=f"valley_water_ensemble_total_et_mm_{year}", export_type="Drive", @@ -43,7 +43,7 @@ def scv_data_download_for_year(year, openet_collection=r"OpenET/ENSEMBLE/CONUS/G folder="vw_et_update_2023" ) - winter_export_image = Image(crs="EPSG:4326") + winter_export_image = EEDLImage(crs="EPSG:4326") winter_export_image.export(winter_image, filename_suffix=f"valley_water_ensemble_winter_et_mm_{year}", export_type="Drive", diff --git a/requirements.txt b/requirements.txt index 2cf0772..cf826b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ seaborn google-cloud-storage setuptools requests -pytest \ No newline at end of file +pytest +typing-extensions \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 9b1f949..75fec1b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,4 +28,5 @@ install_requires = setuptools requests pytest + typing-extensions python_requires = >= 3.8 diff --git a/test.bat b/test.bat new file mode 100644 index 0000000..55b033e --- /dev/null +++ b/test.bat @@ -0,0 +1 @@ +pytest \ No newline at end of file diff --git a/tests/test_basic.py b/tests/test_basic.py index 1d85e3d..ff912b4 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -1,12 +1,12 @@ # import ee # from ee import ImageCollection import pytest # noqa -from eedl.image import Image +from eedl.image import EEDLImage # we should change the name of our Image class - it conflicts with the class image in the ee package, and people will # likely be using both. Let's not cause confusion def test_class_instantiation(): - image = Image() - assert isinstance(image, Image) + image = EEDLImage() + assert isinstance(image, EEDLImage)
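Taken together, the pieces above let an export, mosaic, and zonal-stats run be driven from a single EEDLImage instance plus the task registry. None of the following ships with this change; it is a rough sketch that reuses the OpenET collection and band from examples/multi_export_helper_alfalfa.py, with placeholder paths, clip box, and field names.

import ee
from eedl.image import EEDLImage, main_task_registry

ee.Initialize()

# Build a single monthly image from the same OpenET collection the helper example uses.
image = (
    ee.ImageCollection("OpenET/ENSEMBLE/CONUS/GRIDMET/MONTHLY/v2_0")
    .filterDate("2021-06-01", "2021-07-01")
    .select("et_ensemble_mad")
    .mosaic()
)

export = EEDLImage(crs="EPSG:4326", drive_root_folder=r"G:\My Drive", filename_description="et_demo")

# Zonal parameters now live on the instance so the parameterless
# mosaic_and_zonal callback can pick them up after download.
export.zonal_polygons = r"D:\data\demo.gpkg\field_bounds"   # opened via safe_fiona_open
export.zonal_keep_fields = ("Id",)                          # must exist on the polygon features
export.zonal_stats_to_calc = ("min", "max", "mean", "count")
export.zonal_nodata_value = 0
export.zonal_inject_constants = {"variable": "et", "date": "2021-06-01"}

export.export(image,
              filename_suffix="demo_2021_06",
              export_type="drive",
              clip=ee.Geometry.Rectangle([-121.5, 37.0, -121.0, 37.5]),
              strict_clip=True,
              folder="eedl_demo")

main_task_registry.setup_log(r"D:\eedl_demo\errors.txt")
main_task_registry.wait_for_images(r"D:\eedl_demo", callback="mosaic_and_zonal", on_failure="log")

# The CSV written by the zonal step is available afterward, e.g. for merge_csvs_in_folder:
print(export.zonal_output_filepath)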