From 70921eb8f6f9e61366b750203ce6433e36afd071 Mon Sep 17 00:00:00 2001
From: Sai Ma
Date: Fri, 11 Oct 2024 03:07:06 +0000
Subject: [PATCH] Fix logic errors in the processing script

---
 scripts/stacking_processing_script.py | 111 +++++++++++++++-----------
 1 file changed, 64 insertions(+), 47 deletions(-)

diff --git a/scripts/stacking_processing_script.py b/scripts/stacking_processing_script.py
index 28fd85b..d9f3595 100644
--- a/scripts/stacking_processing_script.py
+++ b/scripts/stacking_processing_script.py
@@ -1,3 +1,7 @@
+import logging
+import sys
+
+import click
 import rioxarray
 import s3fs
 import xarray as xr
@@ -6,52 +10,64 @@
 from dea_burn_cube import bc_io, helper


-# Initialize the S3 filesystem (using anonymous access)
-fs = s3fs.S3FileSystem(anon=True)
-
+# Set up logging configurations
 logging.getLogger("botocore.credentials").setLevel(logging.WARNING)
 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

 logger = logging.getLogger(__name__)


 def logging_setup():
-    """Set up logging."""
+    """
+    Set up the logging configuration to display logs from all modules except certain libraries.
+    """
+    # Collect all loggers except those from specific libraries
     loggers = [
         logging.getLogger(name)
         for name in logging.root.manager.loggerDict
         if not name.startswith("sqlalchemy") and not name.startswith("boto")
     ]

+    # Set up a stream handler to direct logs to stdout
     stdout_hdlr = logging.StreamHandler(sys.stdout)
     for logger in loggers:
         logger.addHandler(stdout_hdlr)
-        logger.propagate = False
+        logger.propagate = False  # Prevent logs from propagating to the root logger


 def process_files(match_products, region_id, output_folder):
     """
-    Search for corresponding files from the match_products based on region id
-    """
+    Processes the files for the given products and region ID by fetching data from S3,
+    applying product weights, and combining the results.
+
+    Parameters:
+    - match_products (list): List of product information including name, weight, and file extension.
+    - region_id (str): The ID of the region to process.
+    - output_folder (str): The base folder path where output files are stored.
+    Returns:
+    - xarray.DataArray or None: A combined summary of the processed files, or None if no files are found.
+    """
     pair_files = []

-    # Collect matching files from each product
+    # Initialize the S3 filesystem with anonymous access
+    fs = s3fs.S3FileSystem(anon=True)
+
+    # Collect matching files for each product based on region and file extension
     for match_product in match_products:
-        # Build the target folder path dynamically
-        # Let us assume all NBIC product under the same project folder:
-        # s3://dea-public-data-dev/projects/burn_cube/derivative
+        # Build the target folder path dynamically for each product
         target_folder = (
-            f"{output_folder/match_product['product_name']}/3-0-0/{region_id}/"
+            f"{output_folder}/{match_product['product_name']}/3-0-0/{region_id}/"
         )

-        # List files in the specific target folder
+        # List files in the target folder using S3 file system
         all_files = fs.glob(target_folder + "**")

-        # Filter matching files by extension and store with product weight
+        # Filter the list to only include files with the specified extension
         matching_files = [
             file for file in all_files if file.endswith(match_product["extension_name"])
         ]

+        # If matching files are found, store the first file and its product weight
         if matching_files:
             pair_files.append(
                 {
@@ -60,23 +76,23 @@ def process_files(match_products, region_id, output_folder):
                 }
             )

-    # If no files matched, skip further processing
+    # If no files matched the criteria, return None to indicate no further processing is needed
     if not pair_files:
         return None

-    # Open and process all matching files
+    # Open and process all matching files, applying their respective weights
     da_list = []
     for pair_file in pair_files:
-        # Open raster data from S3
+        # Open raster data from S3 using rioxarray
        da = rioxarray.open_rasterio(f"s3://{pair_file['file_path']}")
-        # Apply product weight
+        # Multiply the data array by the product weight
        da_list.append(da * pair_file["product_weight"])

-    # Concatenate along a new "variable" dimension and sum across it
+    # Combine the weighted data arrays along a new dimension and sum them to get the summary
     combined = xr.concat(da_list, dim="variable")
     sum_summary = combined.sum(dim="variable")

-    # Add CRS (Coordinate Reference System) to the output
+    # Add the Coordinate Reference System (CRS) attribute to the output data array
     sum_summary.attrs["crs"] = geometry.CRS("EPSG:3577")

     return sum_summary
@@ -87,59 +103,60 @@
     "--region-id",
     "-r",
     type=str,
-    default=None,
-    help="REQUIRED. Region id AU-30 Grid.",
+    required=True,
+    help="REQUIRED. Region ID for the processing, e.g., AU-30 Grid.",
 )
 @click.option(
     "--process-cfg-url",
     "-p",
     type=str,
-    default=None,
-    help="REQUIRED. The Path URL to Burn Cube process cfg file as YAML format.",
+    required=True,
+    help="REQUIRED. URL to the Stacking process configuration file (YAML format).",
 )
 @click.option(
     "--overwrite/--no-overwrite",
     default=False,
-    help="Rerun scenes that have already been processed.",
+    help="Whether to rerun scenes that have already been processed.",
 )
-def stacking_processing(
-    region_id,
-    process_cfg_url,
-    overwrite,
-):
-    """
-    Simple program to load all sub-products (e.g., burn cube, DEA RF and DEA RBR) to get
-    Stacking result as GeoTIFF file.
+def stacking_processing(region_id, process_cfg_url, overwrite):
     """
+    Load and process satellite imagery data to generate a stacking result saved as a GeoTIFF file.

-    logging_setup()
+    Parameters:
+    - region_id (str): Region ID to identify the area of interest.
+    - process_cfg_url (str): URL of the YAML configuration file for process settings.
+    - overwrite (bool): Flag to determine whether to overwrite existing files.
+    """
+    logging_setup()  # Initialize the logging setup

+    # Load the process configuration from the provided YAML URL
     process_cfg = helper.load_yaml_remote(process_cfg_url)
     match_products = process_cfg["match_products"]
     output_folder = process_cfg["output_folder"]
     output_product_name = process_cfg["product"]["name"]

-    # activate AWS credential from attached service account
-    helper.get_and_set_aws_credentials()
+    # Process files based on the region and products information
+    sum_summary = process_files(match_products, region_id, output_folder)

-    # Limit processing to the first 4 files for efficiency (or batch processing)
-    result = process_files(match_products, region_id, output_folder)
+    if sum_summary is not None:
+        # Define the output GeoTIFF file name pattern
+        pred_tif = f"dea_nbic_stacking_{region_id.replace('/', '')}_2020.tif"

-    if result:
-        sum_summary, pattern = result
-        # Write the result to a COG file
+        # Write the result to a Cloud Optimized GeoTIFF (COG) file
+        write_cog(geo_im=sum_summary, fname=pred_tif, overwrite=overwrite, nodata=-999)

-        pred_tif = ("dea_nbic_stacking_" + pattern.replace("/", "") + "_2020.tif",)
-
-        write_cog(geo_im=sum_summary, fname=pred_tif, overwrite=True, nodata=-999)
-
-        logger.info("Save result as: " + str(pred_tif))
+        logger.info(f"Saved result as: {pred_tif}")

+        # Construct the S3 file URI for the output file
         s3_file_uri = f"{output_folder}/{output_product_name}/3-0-0/{region_id[:3]}/{region_id[3:]}/{pred_tif}"

+        # Activate AWS credentials from the service account attached
+        helper.get_and_set_aws_credentials()
+
+        # Upload the output GeoTIFF to the specified S3 location
         bc_io.upload_object_to_s3(pred_tif, s3_file_uri)
+        logger.info(f"Uploaded to S3: {s3_file_uri}")


 if __name__ == "__main__":
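
For context, below is a minimal standalone sketch of the weighted-stacking step that process_files performs after this patch. It uses two hypothetical local GeoTIFF paths and made-up weights in place of the S3 file discovery and the YAML-configured product list, so it illustrates the computation rather than the real data flow.

import rioxarray
import xarray as xr

# Hypothetical inputs: the real script lists files on S3 with s3fs and reads
# each product's weight from the process configuration YAML.
pair_files = [
    {"file_path": "dea_rf_example.tif", "product_weight": 0.6},
    {"file_path": "dea_rbr_example.tif", "product_weight": 0.4},
]

da_list = []
for pair_file in pair_files:
    # Open each raster and scale it by its product weight
    da = rioxarray.open_rasterio(pair_file["file_path"])
    da_list.append(da * pair_file["product_weight"])

# Stack the weighted rasters along a new "variable" dimension and sum them
combined = xr.concat(da_list, dim="variable")
sum_summary = combined.sum(dim="variable")
print(sum_summary.shape)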