diff --git a/docs/data_datacube.md b/docs/data_datacube.md
index a353ea97..ceec98e9 100644
--- a/docs/data_datacube.md
+++ b/docs/data_datacube.md
@@ -29,11 +29,11 @@ Build the docker image and push it to a ecr repository.
 ```bash
 ecr_repo_id=12345
 cd scripts/pipeline/batch
-docker build -t $ecr_repo_iud.dkr.ecr.us-east-1.amazonaws.com/fetch-and-run .
+docker build -t $ecr_repo_id.dkr.ecr.us-east-1.amazonaws.com/fetch-and-run .
 
-aws ecr get-login-password --profile clay --region us-east-1 | docker login --username AWS --password-stdin $ecr_repo_iud.dkr.ecr.us-east-1.amazonaws.com
+aws ecr get-login-password --profile clay --region us-east-1 | docker login --username AWS --password-stdin $ecr_repo_id.dkr.ecr.us-east-1.amazonaws.com
 
-docker push $ecr_repo_iud.dkr.ecr.us-east-1.amazonaws.com/fetch-and-run:latest
+docker push $ecr_repo_id.dkr.ecr.us-east-1.amazonaws.com/fetch-and-run
 ```
 
 ### Prepare AWS batch
diff --git a/docs/data_sampling.md b/docs/data_sampling.md
index 7d3bd5af..ba26437f 100644
--- a/docs/data_sampling.md
+++ b/docs/data_sampling.md
@@ -22,8 +22,12 @@ In addition to the landcover classes, we also added diversity by selecting
 500 tiles out of the 3000 tiles with the highest count of land cover classes
 present in the tile.
 
+After selecting MGRS tiles for each of these criteria, we removed duplicates.
+
 The following table summarizes the selection criteria for each class.
 
+## For model version v0.1
+
 | Class | Nr of Tiles | From highest |
 |---|---|---|
 Diversity | 500 | 3000
@@ -39,9 +43,32 @@ Bare / sparse vegetation | 50 | 500
 Snow and Ice | 50 | 500
 Permanent water bodies | 100 | 1000
 
-After selecting MGRS tiles for each of these criteria, we removed duplicates. This resulted in a sample of 1517 MGRS tiles total in our sample. The resulting sample file can be downloaded from the following link https://clay-mgrs-samples.s3.amazonaws.com/mgrs_sample.fgb
+
+## For model version v0.2
+
+| Class | Nr of Tiles | From highest |
+|---|---|---|
+Diversity | 400 | 2000
+Built-up | 300 | 300
+Built-up | 1000 | 1500
+Herbaceous wetland | 50 | 500
+Mangroves | 50 | 500
+Moss and lichen | 50 | 500
+Cropland | 800 | 3600
+Tree cover | 150 | 750
+Shrubland | 100 | 500
+Grassland | 200 | 500
+Bare / sparse vegetation | 50 | 500
+Snow and Ice | 25 | 500
+Permanent water bodies | 50 | 1000
+
+This resulted in a total of 2728 MGRS tiles in our sample.
+
+The resulting sample file can be downloaded from the following link:
+
+https://clay-mgrs-samples.s3.amazonaws.com/mgrs_sample_v02.fgb
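
For reference, the per-class picks in the v0.2 table sum to 3225 tiles before duplicates are removed, so the 2728 tiles reported after deduplication imply 497 duplicate picks. A small sketch of that arithmetic, with the counts copied from the table above:

```python
# Per-class tile counts from the v0.2 selection table.
v02_counts = {
    "Diversity": 400,
    "Built-up (from top 300)": 300,
    "Built-up (from top 1500)": 1000,
    "Herbaceous wetland": 50,
    "Mangroves": 50,
    "Moss and lichen": 50,
    "Cropland": 800,
    "Tree cover": 150,
    "Shrubland": 100,
    "Grassland": 200,
    "Bare / sparse vegetation": 50,
    "Snow and Ice": 25,
    "Permanent water bodies": 50,
}

total = sum(v02_counts.values())
print(total)         # 3225 picks before deduplication
print(total - 2728)  # 497 picks were duplicate MGRS tiles
```
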
diff --git a/scripts/pipeline/batch/submit.py b/scripts/pipeline/batch/submit.py
index 3ee87c68..cbc568ab 100644
--- a/scripts/pipeline/batch/submit.py
+++ b/scripts/pipeline/batch/submit.py
@@ -1,31 +1,40 @@
+import os
+
 import boto3
 
 batch = boto3.client("batch", region_name="us-east-1")
 
-NR_OF_TILES_IN_SAMPLE_FILE = 1517
+NR_OF_TILES_IN_SAMPLE_FILE = 2728
+
+PC_KEY = os.environ["PC_SDK_SUBSCRIPTION_KEY"]
 
-PC_KEY = "***"
 
-for i in range(NR_OF_TILES_IN_SAMPLE_FILE):
-    job = {
-        "jobName": f"fetch-and-run-{i}",
-        "jobQueue": "fetch-and-run",
-        "jobDefinition": "fetch-and-run",
-        "containerOverrides": {
-            "command": ["datacube.py", "--index", f"{i}", "--bucket", "clay-tiles-02"],
-            "environment": [
-                {"name": "BATCH_FILE_TYPE", "value": "zip"},
-                {
-                    "name": "BATCH_FILE_S3_URL",
-                    "value": "s3://clay-fetch-and-run-packages/batch-fetch-and-run.zip",
-                },
-                {"name": "PC_SDK_SUBSCRIPTION_KEY", "value": f"{PC_KEY}"},
-            ],
-            "resourceRequirements": [
-                {"type": "MEMORY", "value": "8000"},
-                {"type": "VCPU", "value": "4"},
-            ],
-        },
-    }
+job = {
+    "jobName": "fetch-and-run-datacube",
+    "jobQueue": "fetch-and-run",
+    "jobDefinition": "fetch-and-run",
+    "containerOverrides": {
+        "command": [
+            "datacube.py",
+            "--bucket",
+            "clay-tiles-04-sample-v02",
+            "--sample",
+            "https://clay-mgrs-samples.s3.amazonaws.com/mgrs_sample_v02.fgb",
+        ],
+        "environment": [
+            {"name": "BATCH_FILE_TYPE", "value": "zip"},
+            {
+                "name": "BATCH_FILE_S3_URL",
+                "value": "s3://clay-fetch-and-run-packages/batch-fetch-and-run.zip",
+            },
+            {"name": "PC_SDK_SUBSCRIPTION_KEY", "value": f"{PC_KEY}"},
+        ],
+        "resourceRequirements": [
+            {"type": "MEMORY", "value": "15500"},
+            {"type": "VCPU", "value": "4"},
+        ],
+    },
+    "arrayProperties": {"size": NR_OF_TILES_IN_SAMPLE_FILE},
+}
 
-    print(batch.submit_job(**job))
+print(batch.submit_job(**job))
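
The rewritten submit script replaces the per-tile job loop with a single AWS Batch array job: `arrayProperties.size` fans the job out into one child per MGRS tile, and Batch exposes each child's position through the `AWS_BATCH_JOB_ARRAY_INDEX` environment variable, which `datacube.py` (below) now falls back to when `--index` is not passed. A hedged sketch of reproducing one such child locally; the `--localpath` flag name and the placeholder key are assumptions for illustration, not part of this diff:

```python
# Hypothetical dry run of one array child outside AWS Batch: export the
# variables a child container would receive, then call datacube.py, which
# reads AWS_BATCH_JOB_ARRAY_INDEX when no --index is given.
import os
import subprocess

env = dict(os.environ)
env["AWS_BATCH_JOB_ARRAY_INDEX"] = "0"  # pretend to be child job 0
env["PC_SDK_SUBSCRIPTION_KEY"] = "<planetary-computer-key>"  # placeholder value

subprocess.run(
    [
        "python",
        "scripts/pipeline/datacube.py",
        "--sample",
        "https://clay-mgrs-samples.s3.amazonaws.com/mgrs_sample_v02.fgb",
        "--localpath",  # assumed flag name, matching the localpath parameter of main()
        "/tmp/clay-tiles",
    ],
    check=True,
    env=env,
)
```
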
""" +import logging +import os import random from datetime import timedelta @@ -46,8 +48,15 @@ CLOUD_COVER_PERCENTAGE = 50 NODATA_PIXEL_PERCENTAGE = 20 NODATA = 0 -S1_MATCH_ATTEMPTS = 20 -DATES_PER_LOCATION = 3 +S1_MATCH_ATTEMPTS = 40 +DATES_PER_LOCATION = 4 + +logger = logging.getLogger("datacube") +hdr = logging.StreamHandler() +formatter = logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s") +hdr.setFormatter(formatter) +logger.addHandler(hdr) +logger.setLevel(logging.INFO) def get_surrounding_days(reference, interval_days): @@ -123,7 +132,7 @@ def search_sentinel2( ) s2_items = search.item_collection() - print(f"Found {len(s2_items)} Sentinel-2 items") + logger.info(f"Found {len(s2_items)} Sentinel-2 items") if not len(s2_items): return None, None @@ -143,7 +152,7 @@ def search_sentinel2( bbox = least_clouds.geometry.bounds epsg = s2_item.properties["proj:epsg"] - print("EPSG code based on Sentinel-2 item: ", epsg) + logger.info(f"EPSG code based on Sentinel-2 item: {epsg}") return s2_item, bbox @@ -187,7 +196,7 @@ def search_sentinel1(bbox, catalog, date_range): }, ) s1_items = search.item_collection() - print(f"Found {len(s1_items)} Sentinel-1 items") + logger.info(f"Found {len(s1_items)} Sentinel-1 items") if not len(s1_items): return @@ -203,7 +212,7 @@ def search_sentinel1(bbox, catalog, date_range): s1_gdf = s1_gdf.sort_values(by="overlap", ascending=False) most_overlap_orbit = s1_gdf.iloc[0]["sat:orbit_state"] - print("Most overlapped orbit: ", most_overlap_orbit) + logger.info(f"Most overlapped orbit: {most_overlap_orbit}") selected_item_ids = [] intersection = None orbit = None @@ -246,7 +255,7 @@ def search_dem(bbox, catalog): """ search = catalog.search(collections=["cop-dem-glo-30"], bbox=bbox) dem_items = search.item_collection() - print(f"Found {len(dem_items)} DEM items") + logger.info(f"Found {len(dem_items)} DEM items") return dem_items @@ -354,7 +363,7 @@ def process( continue surrounding_days = get_surrounding_days(s2_item.datetime, interval_days=3) - print("Searching S1 in date range", surrounding_days) + logger.info(f"Searching S1 in date range {surrounding_days}") s1_items = search_sentinel1(bbox, catalog, surrounding_days) @@ -362,7 +371,7 @@ def process( break if i == S1_MATCH_ATTEMPTS - 1: - print( + logger.info( "No match for S1 scenes found for date range " f"{date_range} after {S1_MATCH_ATTEMPTS} attempts." 
             )
@@ -379,6 +388,10 @@ def process(
         resolution,
     )
 
+    if 0 in (dat.shape[0] for dat in result):
+        logger.info("S2/S1 pixel coverages do not overlap although bounds do")
+        return None, None
+
     return date, result
 
 
@@ -404,13 +417,13 @@ def convert_attrs_and_coords_objects_to_str(data):
 @click.option(
     "--sample",
     required=False,
-    default="https://clay-mgrs-samples.s3.amazonaws.com/mgrs_sample.fgb",
+    default="https://clay-mgrs-samples.s3.amazonaws.com/mgrs_sample_v02.fgb",
     help="Location of MGRS tile sample",
 )
 @click.option(
     "--index",
     required=False,
-    default=0,
+    default=None,
     help="Index of MGRS tile from sample file that should be processed",
 )
 @click.option(
@@ -441,13 +454,20 @@ def convert_attrs_and_coords_objects_to_str(data):
     type=str,
     help="Comma separated list of date ranges, each provided as YYYY-MM-DD/YYYY-MM-DD.",
 )
-def main(sample, index, subset, bucket, localpath, dateranges):
-    index = int(index)
+@click.option("-v", "--verbose", is_flag=True)
+def main(sample, index, subset, bucket, localpath, dateranges, verbose):  # noqa: PLR0913
+    if verbose:
+        logger.setLevel(logging.DEBUG)
+
+    if index is None:
+        index = int(os.environ.get("AWS_BATCH_JOB_ARRAY_INDEX", 0))
+    else:
+        index = int(index)
     tiles = gpd.read_file(sample)
     tile = tiles.iloc[index]
     mgrs = tile["name"]
-    print(f"Starting algorithm for MGRS tile {tile['name']} with index {index}")
+    logger.info(f"Starting algorithm for MGRS tile {tile['name']} with index {index}")
 
     if subset:
         subset = [int(dat) for dat in subset.split(",")]
@@ -466,7 +486,7 @@ def main(sample, index, subset, bucket, localpath, dateranges):
     match_count = 0
 
     for date_range in date_ranges:
-        print(f"Processing data for date range {date_range}")
+        logger.info(f"Processing data for date range {date_range}")
         date, pixels = process(
             tile.geometry,
             date_range,
@@ -480,7 +500,7 @@ def main(sample, index, subset, bucket, localpath, dateranges):
         match_count += 1
 
         if subset:
-            print(f"Subsetting to {subset}")
+            logger.info(f"Subsetting to {subset}")
             pixels = [
                 part[:, subset[1] : subset[3], subset[0] : subset[2]] for part in pixels
             ]
@@ -493,7 +513,7 @@ def main(sample, index, subset, bucket, localpath, dateranges):
             break
 
     if not match_count:
-        raise ValueError("No matching data found")
+        logger.info("No matching data found")
 
 
 if __name__ == "__main__":
diff --git a/scripts/pipeline/landcover.py b/scripts/pipeline/landcover.py
index 24d79163..68e4754a 100644
--- a/scripts/pipeline/landcover.py
+++ b/scripts/pipeline/landcover.py
@@ -13,9 +13,7 @@ WGS84 = CRS.from_epsg(4326)
 
 NODATA = 0
 WATER = 80
-WATER_LOWER_TH = 0.2
-WATER_UPPER_TH = 0.7
-RANDOM_SEED = 42
+RANDOM_SEED = 23
 CLASSES = {
     10: "Tree cover",
     20: "Shrubland",
@@ -156,31 +154,25 @@ def percentages(data):
 def sample(wd):
     """
     Sample the mgrs tiles based on landcover statistics.
-
-    Target: ~1000 tiles
-
     Set very small counts to zero. Exclude high latitudes.
-
-    200 samples from the 2000 most diverse
-    50 samples from the 1000 highest for all other categories except water
-    100 samples from all tiles with water between 30% an 70% (making sure we
-    capture some, but exclude only purely water so we catch coasts)
     """
     data = geopandas.read_file(Path(wd, "mgrs_stats.fgb"))
     data_norm = percentages(data.loc[:, data.columns != "count"])
     data[data_norm.columns] = data_norm
 
-    diversity = split_highest(data, "count", 500, 3000)
-    urban = split_highest(data, "Built-up", 400)
+    diversity = split_highest(data, "count", 400, 2000)
+    urban = split_highest(data, "Built-up", 300, 300)
+    urban = split_highest(data, "Built-up", 1000, 1500)
     wetland = split_highest(data, "Herbaceous wetland", 50, 500)
     mangroves = split_highest(data, "Mangroves", 50, 500)
     moss = split_highest(data, "Moss and lichen", 50, 500)
-    cropland = split_highest(data, "Cropland", 100, 500)
-    trees = split_highest(data, "Tree cover", 100, 500)
-    shrubland = split_highest(data, "Shrubland", 50, 500)
-    grassland = split_highest(data, "Grassland", 50, 500)
+    cropland = split_highest(data, "Cropland", 800, 3600)
+    trees = split_highest(data, "Tree cover", 150, 750)
+    shrubland = split_highest(data, "Shrubland", 100, 500)
+    grassland = split_highest(data, "Grassland", 200, 500)
     bare = split_highest(data, "Bare / sparse vegetation", 50, 500)
-    snow = split_highest(data, "Snow and Ice", 50, 500)
-    water = split_highest(data, "Permanent water bodies", 100, 1000)
+    snow = split_highest(data, "Snow and Ice", 25, 250)
+    water = split_highest(data, "Permanent water bodies", 50, 1000)
 
     result = pandas.concat(
         [
@@ -200,6 +192,7 @@ def sample(wd):
     )
 
     result = result.drop_duplicates(subset=["name"])
+    print(f"Found {len(result)} MGRS tiles")
 
     result.to_file(Path(wd, "mgrs_sample.fgb", driver="FlatGeobuf"))
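
`split_highest` itself is unchanged and not shown in this diff; judging from the call sites above, it takes a landcover column, the number of tiles to keep, and the size of the top-ranked pool to draw from. A rough sketch of that assumed behavior follows (the name `split_highest_sketch`, the signature, and the internals are guesses, not the implementation in landcover.py):

```python
import pandas as pd


def split_highest_sketch(data, column, n, within=None, seed=23):
    # Assumed behaviour: rank tiles by `column`, keep the `within` highest,
    # then draw a reproducible random sample of `n` of them (seed mirrors
    # RANDOM_SEED = 23 above).
    pool = data.nlargest(within or len(data), column)
    return pool.sample(n=min(n, len(pool)), random_state=seed)


# Toy example: keep 2 of the 3 tiles with the highest snow fraction.
toy = pd.DataFrame({"name": list("abcde"), "Snow and Ice": [0.9, 0.1, 0.4, 0.7, 0.3]})
print(split_highest_sketch(toy, "Snow and Ice", n=2, within=3))
```
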
""" +import logging import subprocess import tempfile @@ -18,11 +19,13 @@ from rasterio.enums import ColorInterp NODATA = 0 -TILE_SIZE = 512 +TILE_SIZE = 256 PIXELS_PER_TILE = TILE_SIZE * TILE_SIZE BAD_PIXEL_MAX_PERCENTAGE = 0.3 SCL_FILTER = [0, 1, 3, 8, 9, 10] -VERSION = "02" +VERSION = "04" + +logger = logging.getLogger("datacube") def filter_clouds_nodata(tile): @@ -37,19 +40,19 @@ def filter_clouds_nodata(tile): """ # Check for nodata pixels if int(tile.sel(band="B02").isin([NODATA]).sum()): - print("Too much no-data in B02") + logger.debug("Too much no-data in B02") return False bands_to_check = ["vv", "vh", "dem"] for band in bands_to_check: if int(np.isnan(tile.sel(band=band)).sum()): - print(f"Too much no-data in {band}") + logger.debug(f"Too much no-data in {band}") return False # Check for cloud coverage cloudy_pixel_count = int(tile.sel(band="SCL").isin(SCL_FILTER).sum()) if cloudy_pixel_count / PIXELS_PER_TILE >= BAD_PIXEL_MAX_PERCENTAGE: - print("Too much cloud coverage") + logger.debug("Too much cloud coverage") return False return True # If both conditions pass @@ -66,7 +69,7 @@ def tile_to_dir(stack, date, mgrs, bucket, dir): - mgrs (str): MGRS Tile id - bucket(str): AWS S3 bucket to write tiles to """ - print("Writing tempfiles to ", dir) + logger.debug(f"Writing tempfiles to {dir}") # Calculate the number of full tiles in x and y directions num_x_tiles = stack[0].x.size // TILE_SIZE @@ -90,8 +93,8 @@ def tile_to_dir(stack, date, mgrs, bucket, dir): tile = xr.concat(parts, dim="band").rename("tile") counter += 1 - if counter % 100 == 0: - print(f"Counted {counter} tiles") + if counter % 250 == 0: + logger.info(f"Counted {counter} tiles") if not filter_clouds_nodata(tile): continue @@ -117,21 +120,21 @@ def tile_to_dir(stack, date, mgrs, bucket, dir): with rasterio.open(name, "r+") as rst: rst.colorinterp = color rst.update_tags(date=date) - if bucket: - print(f"Syncing {dir} with s3://{bucket}/{VERSION}/{mgrs}/{date}") - subprocess.run( - [ - "aws", - "s3", - "sync", - dir, - f"s3://{bucket}/{VERSION}/{mgrs}/{date}", - "--no-progress", - ], - check=True, - ) - else: - print("No bucket specified, skipping S3 sync.") + if bucket: + logger.info(f"Syncing {dir} with s3://{bucket}/{VERSION}/{mgrs}/{date}") + subprocess.run( + [ + "aws", + "s3", + "sync", + dir, + f"s3://{bucket}/{VERSION}/{mgrs}/{date}", + "--quiet", + ], + check=True, + ) + else: + logger.info("No bucket specified, skipping S3 sync.") def tiler(stack, date, mgrs, bucket, dir):