ray_maple_workflow.py
"""
MAPLE Workflow
Main Script that runs the inference workflow pipeline.
Pre Process
1. Create water mask
2. Image Tiling
Classification / Inference
Post processing
3. Stich back to the original image dims from the tiles (2.)
Project: Permafrost Discovery Gateway: Mapping Application for Arctic Permafrost Land Environment(MAPLE)
PI : Chandi Witharana
"""
import argparse
import os
from typing import Any, Dict
import ray
import tensorflow as tf
from mpl_config import MPL_Config
import ray_image_preprocessing
import ray_infer_tiles
import ray_write_shapefiles
import ray_tile_and_stitch_util


def create_geotiff_images_dataset(config: MPL_Config) -> ray.data.Dataset:
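    """Load the GeoTIFF scenes under config.INPUT_IMAGE_DIR into a Ray dataset of raw bytes."""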
    if config.GCP_FILESYSTEM is not None:
        return ray.data.read_binary_files(
            config.INPUT_IMAGE_DIR + "/",
            filesystem=config.GCP_FILESYSTEM,
            include_paths=True,
        )
    return ray.data.read_binary_files(config.INPUT_IMAGE_DIR, include_paths=True)


def add_image_name(row: Dict[str, Any]) -> Dict[str, Any]:
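    """Derive an image name from the file path by stripping the .tif extension."""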
row["image_name"] = os.path.basename(row["path"]).split(".tif")[0]
return row
def create_directory_if_not_exists(directory_path: str):
"""Creates a directory with the specified path if it doesn't already exist."""
if not os.path.exists(directory_path):
os.makedirs(directory_path)
print(f"Directory created: {directory_path}")
else:
print(f"Directory already exists: {directory_path}")
if __name__ == "__main__":
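    # Run TensorFlow in graph (non-eager) mode for the inference step.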
    tf.compat.v1.disable_eager_execution()
    parser = argparse.ArgumentParser(
        description="Extract IWPs from satellite image scenes using MAPLE."
    )

    # Optional arguments
    parser.add_argument(
        "--image",
        required=False,
        default="test_image_01.tif",
        metavar="<image_name>",
        help="Name of the image to process.",
    )
    parser.add_argument(
        "--root_dir",
        required=False,
        default="",
        help="The directory path from where the workflow is running. If none is "
        "provided, the current working directory will be used by the workflow. "
        "If the root directory starts with gcs:// or gs://, the workflow will "
        "read from and write to Google Cloud Storage buckets.",
    )
    parser.add_argument(
        "--adc_dir",
        required=False,
        default="",
        help="The directory path for Application Default Credentials (ADC). This path must be "
        "set if you want to give Ray access to your GCS buckets when running this workflow on "
        "your *local computer*. It is necessary for service account impersonation, which is "
        "used to give this code access to your storage bucket when running locally.",
    )
    parser.add_argument(
        "--weight_file",
        required=False,
        default="hyp_best_train_weights_final.h5",
        help="The file path to where the model weights can be found. Should be "
        "relative to the root directory.",
    )
    parser.add_argument(
        "--gpus_per_core",
        required=False,
        default=1,
        type=int,
        help="Number of GPUs available per core. Used to determine how many "
        "inference processes to spin up. Set this to 0 if you want to run the "
        "workflow on a CPU.",
    )
    parser.add_argument(
        "--concurrency",
        required=False,
        default=2,
        type=int,
        help="Number of Ray workers for each map call. This parameter should be "
        "tuned based on the resources available in the Ray cluster in order to "
        "make the best use of the available compute.",
    )
    args = parser.parse_args()

    image_name = args.image
    config = MPL_Config(
        args.root_dir, args.adc_dir, args.weight_file, num_gpus_per_core=args.gpus_per_core
    )
    concurrency = args.concurrency

print("Starting MAPLE Ray pipeline...")
print("""This pipeline will:
- load geotiffs into ray dataset
- calculate a watermask for each image
- tile each watermasked image
- perform inference on each tile
- stitch the tile inference results to get the inference results for each image
- write the inference results for each image to shapefiles
""")
    # 0. Load GeoTIFFs into a Ray dataset.
    dataset = create_geotiff_images_dataset(config).map(add_image_name, concurrency=concurrency)
    # 1. Calculate a water mask for each image.
    dataset_with_water_mask = dataset.map(
        fn=ray_image_preprocessing.cal_water_mask,
        fn_kwargs={"config": config},
        concurrency=concurrency,
    )
    # 2. Tile each image.
    image_tiles_dataset = dataset_with_water_mask.flat_map(
        fn=ray_tile_and_stitch_util.tile_image,
        fn_kwargs={"config": config},
        concurrency=concurrency,
    )
    image_tiles_dataset = image_tiles_dataset.drop_columns(["mask"])
    # 3. Run inference on each tile.
    inferenced_dataset = image_tiles_dataset.map(
        fn=ray_infer_tiles.MaskRCNNPredictor,
        fn_constructor_kwargs={"config": config},
        concurrency=concurrency,
    )
    # 4. Stitch the per-tile results back together for each image.
    data_per_image = inferenced_dataset.groupby("image_name").map_groups(
        ray_tile_and_stitch_util.stitch_shapefile, concurrency=concurrency
    )
    # 5. Write shapefiles.
    # Create the output directory if it doesn't exist.
    # TODO - do something similar for GCP directories. We have the code to create
    # local dirs if they don't exist (e.g. mpl_workflow_create_dir_struct.py); it'd
    # be great to add support for creating GCP directories if they don't exist.
    if config.GCP_FILESYSTEM is None:
        create_directory_if_not_exists(config.RAY_OUTPUT_SHAPEFILES_DIR)
    shapefiles_dataset = data_per_image.map(
        fn=ray_write_shapefiles.WriteShapefiles,
        fn_constructor_kwargs={"config": config},
        concurrency=concurrency,
    )
    # Materialize the dataset so that the pipeline steps are executed.
    materialized_dataset = shapefiles_dataset.materialize()
    print("MAPLE Ray pipeline finished, done writing shapefiles.", materialized_dataset.schema())
    # Once the run completes you can inspect the output in ArcGIS (Windows) or QGIS (Linux):
    # add the image and the .shp, .shx, and .dbf files as layers.
    # See compare_shapefile_features.py for how to compare the features in two shapefiles.
    # You can also run 'ogrinfo -so -al <path to shapefile>' on the command line to examine a shapefile.