
Commit

Fix some logic error in processing script
supermarkion committed Oct 11, 2024
1 parent 0440464 commit 70921eb
Showing 1 changed file with 64 additions and 47 deletions.
111 changes: 64 additions & 47 deletions scripts/stacking_processing_script.py
@@ -1,3 +1,7 @@
import logging
import sys

import click
import rioxarray
import s3fs
import xarray as xr
@@ -6,52 +10,64 @@

from dea_burn_cube import bc_io, helper

# Set up logging configurations
logging.getLogger("botocore.credentials").setLevel(logging.WARNING)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
logger = logging.getLogger(__name__)


def logging_setup():
"""Set up logging."""
"""
Set up the logging configuration to display logs from all modules except certain libraries.
"""
# Collect all loggers except those from specific libraries
loggers = [
logging.getLogger(name)
for name in logging.root.manager.loggerDict
if not name.startswith("sqlalchemy") and not name.startswith("boto")
]

# Set up a stream handler to direct logs to stdout
stdout_hdlr = logging.StreamHandler(sys.stdout)
for logger in loggers:
logger.addHandler(stdout_hdlr)
logger.propagate = False
logger.propagate = False # Prevent logs from propagating to the root logger


def process_files(match_products, region_id, output_folder):
"""
Search for corresponding files from the match_products based on region id
"""
Processes the files for the given products and region ID by fetching data from S3,
applying product weights, and combining the results.
Parameters:
- match_products (list): List of product information including name, weight, and file extension.
- region_id (str): The ID of the region to process.
- output_folder (str): The base folder path where output files are stored.
Returns:
- xarray.DataArray or None: Returns a combined summary of the processed files or None if no files are found.
"""
pair_files = []

# Collect matching files from each product
# Initialize the S3 filesystem with anonymous access
fs = s3fs.S3FileSystem(anon=True)

# Collect matching files for each product based on region and file extension
for match_product in match_products:
# Build the target folder path dynamically
# Let us assume all NBIC product under the same project folder:
# s3://dea-public-data-dev/projects/burn_cube/derivative
# Build the target folder path dynamically for each product
target_folder = (
f"{output_folder/match_product['product_name']}/3-0-0/{region_id}/"
f"{output_folder}/{match_product['product_name']}/3-0-0/{region_id}/"
)

# List files in the specific target folder
# List files in the target folder using S3 file system
all_files = fs.glob(target_folder + "**")

# Filter matching files by extension and store with product weight
# Filter the list to only include files with the specified extension
matching_files = [
file for file in all_files if file.endswith(match_product["extension_name"])
]

# If matching files are found, store the first file and its product weight
if matching_files:
pair_files.append(
{
@@ -60,23 +76,23 @@ def process_files(match_products, region_id, output_folder):
                }
            )

    # If no files matched the criteria, return None to indicate no further processing is needed
    if not pair_files:
        return None

    # Open and process all matching files, applying their respective weights
    da_list = []
    for pair_file in pair_files:
        # Open raster data from S3 using rioxarray
        da = rioxarray.open_rasterio(f"s3://{pair_file['file_path']}")
        # Multiply the data array by the product weight
        da_list.append(da * pair_file["product_weight"])

    # Combine the weighted data arrays along a new dimension and sum them to get the summary
    combined = xr.concat(da_list, dim="variable")
    sum_summary = combined.sum(dim="variable")

    # Add the Coordinate Reference System (CRS) attribute to the output data array
    sum_summary.attrs["crs"] = geometry.CRS("EPSG:3577")

    return sum_summary
@@ -87,59 +103,60 @@ def process_files(match_products, region_id, output_folder):
"--region-id",
"-r",
type=str,
default=None,
help="REQUIRED. Region id AU-30 Grid.",
required=True,
help="REQUIRED. Region ID for the processing, e.g., AU-30 Grid.",
)
@click.option(
"--process-cfg-url",
"-p",
type=str,
default=None,
help="REQUIRED. The Path URL to Burn Cube process cfg file as YAML format.",
required=True,
help="REQUIRED. URL to the Stacking process configuration file (YAML format).",
)
@click.option(
"--overwrite/--no-overwrite",
default=False,
help="Rerun scenes that have already been processed.",
help="Whether to rerun scenes that have already been processed.",
)
def stacking_processing(region_id, process_cfg_url, overwrite):
    """
    Load and process satellite imagery data to generate a stacking result saved as a GeoTIFF file.

    Parameters:
    - region_id (str): Region ID to identify the area of interest.
    - process_cfg_url (str): URL of the YAML configuration file for process settings.
    - overwrite (bool): Flag to determine whether to overwrite existing files.
    """
    logging_setup()  # Initialize the logging setup

    # Load the process configuration from the provided YAML URL
    process_cfg = helper.load_yaml_remote(process_cfg_url)

    match_products = process_cfg["match_products"]
    output_folder = process_cfg["output_folder"]

    output_product_name = process_cfg["product"]["name"]

    # Process files based on the region and products information
    sum_summary = process_files(match_products, region_id, output_folder)

    if sum_summary is not None:  # a DataArray cannot be truth-tested directly
        # Define the output GeoTIFF file name pattern
        pred_tif = f"dea_nbic_stacking_{region_id.replace('/', '')}_2020.tif"

        # Write the result to a Cloud Optimized GeoTIFF (COG) file
        write_cog(geo_im=sum_summary, fname=pred_tif, overwrite=overwrite, nodata=-999)

        logger.info(f"Saved result as: {pred_tif}")

        # Construct the S3 file URI for the output file
        s3_file_uri = f"{output_folder}/{output_product_name}/3-0-0/{region_id[:3]}/{region_id[3:]}/{pred_tif}"

        # Activate AWS credentials from the attached service account
        helper.get_and_set_aws_credentials()

        # Upload the output GeoTIFF to the specified S3 location
        bc_io.upload_object_to_s3(pred_tif, s3_file_uri)
        logger.info(f"Uploaded to S3: {s3_file_uri}")


if __name__ == "__main__":
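For orientation, stacking_processing above reads three keys from the configuration loaded by helper.load_yaml_remote (match_products, output_folder, and the name under product), and process_files expects each match_products entry to carry product_name, extension_name, and product_weight. A minimal sketch of that structure, written as the Python dict the loader would return; every value below is illustrative, not taken from a real configuration file:

# Hypothetical shape of the dictionary returned by helper.load_yaml_remote();
# only the keys the script reads are shown, and all values are examples.
process_cfg = {
    "output_folder": "s3://dea-public-data-dev/projects/burn_cube/derivative",  # base prefix (example)
    "product": {"name": "dea_nbic_stacking"},  # example output product name
    "match_products": [
        {
            "product_name": "dea_burn_cube",  # example sub-product folder name
            "extension_name": ".tif",         # suffix matched against the S3 listing
            "product_weight": 0.5,            # weight applied before summing
        },
        {
            "product_name": "dea_rf",
            "extension_name": ".tif",
            "product_weight": 0.5,
        },
    ],
}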

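The core of process_files is the weighted stack: each product raster is multiplied by its weight, the weighted arrays are concatenated along a new "variable" dimension, and the result is summed across that dimension. A tiny self-contained sketch of the same pattern, using synthetic arrays instead of rasters read from S3:

import numpy as np
import xarray as xr

# Two small single-band arrays standing in for the per-product rasters
da_a = xr.DataArray(np.ones((1, 2, 2)), dims=("band", "y", "x"))
da_b = xr.DataArray(np.full((1, 2, 2), 2.0), dims=("band", "y", "x"))

# Apply per-product weights, stack along a new "variable" dimension, then sum
weighted = [da_a * 0.6, da_b * 0.4]
combined = xr.concat(weighted, dim="variable")
sum_summary = combined.sum(dim="variable")  # every cell equals 0.6 * 1 + 0.4 * 2 = 1.4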
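Because the options are defined with click, the command can also be exercised from Python through click's test runner, assuming the collapsed lines of the diff register stacking_processing as a click command (the @click.option decorators imply this). The region ID, configuration URL, and import path below are placeholders:

from click.testing import CliRunner

from stacking_processing_script import stacking_processing  # assumes scripts/ is importable

runner = CliRunner()
result = runner.invoke(
    stacking_processing,
    [
        "--region-id", "x17y23",  # hypothetical AU-30 grid cell
        "--process-cfg-url", "https://example.com/stacking_cfg.yaml",  # hypothetical config URL
        "--no-overwrite",
    ],
)
print(result.exit_code)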