diff --git a/CHANGELOG.md b/CHANGELOG.md
index b589b57..39e25bb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,12 +29,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added a CHANGELOG.md
 - Ahmed body recipe
 - Documentation for SFNO, GraphCast, vortex shedding, and Ahmed body
+- Documentation for DLWP and RNN examples
 
 ### Changed
 
 - Updated the SFNO example
 - Changed the default SFNO configs
 - Header test to ignore .gitignore items
+- Sample download scripts in the DLWP example
 
 ### Deprecated
diff --git a/docs/img/gray_scott_predictions_blog_2.gif b/docs/img/gray_scott_predictions_blog_2.gif
new file mode 100644
index 0000000..bdabc1e
Binary files /dev/null and b/docs/img/gray_scott_predictions_blog_2.gif differ
diff --git a/examples/cfd/gray_scott_rnn/README.md b/examples/cfd/gray_scott_rnn/README.md
new file mode 100644
index 0000000..55c8cb2
--- /dev/null
+++ b/examples/cfd/gray_scott_rnn/README.md
@@ -0,0 +1,50 @@
+# RNN for transient 3D Gray-Scott system
+
+This example uses recurrent neural networks for spatio-temporal prediction of the
+Gray-Scott reaction-diffusion system. The architecture is inspired by the
+[Physics Informed RNN-DCT Networks for Time-Dependent Partial Differential Equations](https://arxiv.org/pdf/2202.12358.pdf)
+paper.
+
+## Problem overview
+
+Time-series prediction is a key task in many domains.
+Deep learning architectures, particularly RNNs, long short-term memory (LSTM)
+networks, and similar models, have significantly enhanced predictive capabilities
+for such tasks.
+These models are distinguished by their ability to capture temporal dependencies
+and learn complex patterns over time, making them well suited for forecasting
+time-varying relationships.
+In physics-ML, such models are critical for predicting the evolution of dynamic
+physical systems, enabling better simulations, understanding of complex natural
+phenomena, and aiding in discoveries.
+
+This problem involves predicting the next timesteps of a 3D Gray-Scott system given
+the initial condition.
+
+## Dataset
+
+This example relies on the dataset used in the [Transformers for modeling physical systems](https://www.sciencedirect.com/science/article/abs/pii/S0893608021004500)
+paper, which solves the reaction-diffusion system governed by the Gray-Scott model
+on a 3D grid. The different samples are generated using different initial
+conditions for the simulation.
+
+## Model overview and architecture
+
+The model uses convolutional GRU layers for the RNN propagation and a ResNet-type
+architecture for spatial encoding. This example uses the one-to-many variant of the
+RNN model in Modulus; a rough sketch follows the animation below.
+
+![Comparison between the 3D RNN model prediction and the
+ground truth](../../../docs/img/gray_scott_predictions_blog_2.gif)
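+
+A minimal sketch of how this one-to-many model might be constructed and called is
+shown below. The import path and argument values are assumptions for illustration,
+not the exact configuration used by the example script:
+
+```python
+import torch
+from modulus.models.rnn import One2ManyRNN
+
+# Hypothetical configuration; see the example's config files for actual values
+model = One2ManyRNN(
+    input_channels=1,  # one scalar concentration field (assumption)
+    dimension=3,       # 3D spatial grid
+    nr_tsteps=32,      # length of the predicted rollout (assumption)
+)
+
+# A single input time step: (batch, channel, time=1, x, y, z)
+invar = torch.randn(1, 1, 1, 32, 32, 32)
+out = model(invar)  # predicted rollout: (batch, channel, nr_tsteps, x, y, z)
+```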
+
+## Getting Started
+
+The example script contains code to download and pre-process the dataset.
+To get started, simply run
+
+```bash
+python gray_scott_rnn.py
+```
+
+## References
+
+- [Physics Informed RNN-DCT Networks for Time-Dependent Partial Differential Equations](https://arxiv.org/pdf/2202.12358.pdf)
+- [Transformers for modeling physical systems](https://www.sciencedirect.com/science/article/abs/pii/S0893608021004500)
diff --git a/examples/cfd/navier_stokes_rnn/README.md b/examples/cfd/navier_stokes_rnn/README.md
new file mode 100644
index 0000000..df0ede3
--- /dev/null
+++ b/examples/cfd/navier_stokes_rnn/README.md
@@ -0,0 +1,62 @@
+# RNN for transient 2D Navier-Stokes flow
+
+This example uses recurrent neural networks for spatio-temporal prediction of
+the Navier-Stokes flow. The architecture is inspired by the
+[Physics Informed RNN-DCT Networks for Time-Dependent Partial Differential Equations](https://arxiv.org/pdf/2202.12358.pdf)
+paper.
+
+## Problem overview
+
+Time-series prediction is a key task in many domains.
+Deep learning architectures, particularly RNNs, long short-term memory (LSTM)
+networks, and similar models, have significantly enhanced predictive capabilities
+for such tasks.
+These models are distinguished by their ability to capture temporal dependencies
+and learn complex patterns over time, making them well suited for forecasting
+time-varying relationships.
+In physics-ML, such models are critical for predicting the evolution of dynamic
+physical systems, enabling better simulations, understanding of complex natural
+phenomena, and aiding in discoveries.
+
+This problem involves predicting the next timesteps of a 2D Navier-Stokes flow
+given the initial condition or multiple input timesteps.
+
+## Dataset
+
+This example relies on the dataset used in the [Fourier Neural Operator paper](https://arxiv.org/pdf/2010.08895.pdf),
+which solves the Navier-Stokes equations in the vorticity form on a unit torus.
+The different samples are generated using different initial conditions for the
+simulation. The example uses 1000 training samples and 10 test samples.
+
+## Model overview and architecture
+
+The model uses convolutional GRU layers for the RNN propagation and a ResNet-type
+architecture for spatial encoding. The model has two variants: one in which
+subsequent predictions are generated from only a single input time step
+(one-to-many), and another in which multiple input time steps are used to produce
+multiple output time steps (many-to-many/seq2seq).
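+
+A minimal sketch of how the seq2seq variant might be constructed and called is
+shown below. The import path and argument values are assumptions for illustration,
+not the exact configuration used by the example script:
+
+```python
+import torch
+from modulus.models.rnn import Seq2SeqRNN
+
+# Hypothetical configuration; see the example's config files for actual values
+model = Seq2SeqRNN(
+    input_channels=1,  # vorticity field
+    dimension=2,       # 2D spatial grid
+    nr_tsteps=16,      # input/output sequence length (assumption)
+)
+
+# Multiple input time steps: (batch, channel, time, height, width)
+invar = torch.randn(1, 1, 16, 64, 64)
+out = model(invar)  # predicted sequence with the same shape
+```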
+
+## Getting Started
+
+The example script contains code to download and pre-process the dataset.
+To download the dataset, the `gdown` package is required; it can be installed with
+
+```bash
+pip install gdown
+```
+
+To get started, simply run
+
+```bash
+python navier_stokes_rnn.py
+```
+
+To run the seq2seq variant of the example, simply run
+
+```bash
+python navier_stokes_rnn.py model_type=seq2seq
+```
+
+## References
+
+- [Physics Informed RNN-DCT Networks for Time-Dependent Partial Differential Equations](https://arxiv.org/pdf/2202.12358.pdf)
+- [Fourier Neural Operator for Parametric Partial Differential Equations](https://arxiv.org/pdf/2010.08895.pdf)
diff --git a/examples/weather/dlwp/README.md b/examples/weather/dlwp/README.md
new file mode 100644
index 0000000..539cd21
--- /dev/null
+++ b/examples/weather/dlwp/README.md
@@ -0,0 +1,60 @@
+# Deep Learning Weather Prediction (DLWP) model for weather forecasting
+
+This example is an implementation of the
+[DLWP Cubed-sphere](https://agupubs.onlinelibrary.wiley.com/doi/epdf/10.1029/2021MS002502)
+model. The DLWP model can be used to predict the state of the atmosphere given a
+previous atmospheric state. You can infer a 320-member ensemble set of six-week
+forecasts at 1.4° resolution within a couple of minutes, demonstrating the
+potential of AI in developing near real-time digital twins for weather prediction.
+
+## Problem overview
+
+The goal is to train an AI model that can emulate the state of the atmosphere and
+predict global weather over a certain time span. The Deep Learning Weather
+Prediction (DLWP) model uses deep CNNs for globally gridded weather prediction.
+DLWP CNNs directly map u(t) to its future state u(t + Δt) by learning from
+historical observations of the weather, with Δt set to 6 hours.
+
+## Dataset
+
+The model is trained on a 7-channel subset of ERA5 data that is mapped onto a
+cubed-sphere grid with a resolution of 64x64 grid cells per face.
+The model uses years 1980-2015 for training, 2016-2017 for validation,
+and 2018 for out-of-sample testing. Sample scripts for downloading and processing
+the data are provided in the `data_curation` directory. A larger subset of the
+dataset is hosted at the National Energy Research Scientific Computing Center
+(NERSC). For convenience,
+[it is available to all via Globus](https://app.globus.org/file-manager?origin_id=945b3c9e-0f8c-11ed-8daf-9f359c660fbd&origin_path=%2F~%2Fdata%2F).
+You will need a Globus account and must be logged in to access the data. You may
+also need [Globus Connect](https://www.globus.org/globus-connect) to transfer
+the data.
+
+## Model overview and architecture
+
+DLWP uses convolutional neural networks (CNNs) on a cubed-sphere grid to produce
+global forecasts. The latest DLWP model leverages a U-Net architecture with skip
+connections to capture multi-scale processes. The model architecture is described
+in the following papers:
+
+- [Sub-Seasonal Forecasting With a Large Ensemble of Deep-Learning Weather Prediction Models](https://agupubs.onlinelibrary.wiley.com/doi/full/10.1029/2021MS002502)
+- [Improving Data-Driven Global Weather Prediction Using Deep Convolutional Neural Networks on a Cubed Sphere](https://agupubs.onlinelibrary.wiley.com/doi/10.1029/2020MS002109)
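+
+As a rough illustration of the model interface, the sketch below constructs a
+DLWP model and runs a forward pass on random cubed-sphere data. The import path,
+argument names, and channel counts are assumptions for illustration and may not
+match the example's actual configuration:
+
+```python
+import torch
+from modulus.models.dlwp import DLWP
+
+# Hypothetical configuration; see the example's config files for actual values
+model = DLWP(
+    nr_input_channels=14,   # e.g., 2 input time steps x 7 variables (assumption)
+    nr_output_channels=14,  # e.g., 2 output time steps x 7 variables (assumption)
+)
+
+# Cubed-sphere input: (batch, channel, face, height, width)
+invar = torch.randn(1, 14, 6, 64, 64)
+out = model(invar)  # predicted atmospheric state with the same layout
+```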
+
+## Getting Started
+
+To train the model, run
+
+```bash
+python train_dlwp.py
+```
+
+Progress can be monitored using MLflow. Open a new terminal and navigate to the
+training directory, then run:
+
+```bash
+mlflow ui -p 2458
+```
+
+View progress in a browser at <http://127.0.0.1:2458>.
+
+## References
+
+- [Sub-Seasonal Forecasting With a Large Ensemble of Deep-Learning Weather Prediction Models](https://agupubs.onlinelibrary.wiley.com/doi/full/10.1029/2021MS002502)
diff --git a/examples/weather/dlwp/data_curation/README.md b/examples/weather/dlwp/data_curation/README.md
index 0e7d419..ad0aafe 100644
--- a/examples/weather/dlwp/data_curation/README.md
+++ b/examples/weather/dlwp/data_curation/README.md
@@ -1,8 +1,4 @@
-
 # Order of running the scripts
 
-1. Run `data_download_pbss.py`.
-2. Run `remapping.py`.
-3. Run `concat.py`.
-4. Run `netcdf_to_h5py.py`.
-5. Run `get_mean_std.py`
+1. Run `data_download_simple.py`.
+2. Run `post_processing.py`.
diff --git a/examples/weather/dlwp/data_curation/concat.py b/examples/weather/dlwp/data_curation/concat.py
deleted file mode 100644
index d8d4ccd..0000000
--- a/examples/weather/dlwp/data_curation/concat.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import subprocess
-import xarray as xr
-import numpy as np
-from pathlib import Path
-
-
-def concat(base_path, start_year, end_year, variables_list):
-    modified_files_path = base_path[:-1] + "-concat/"
-    Path(modified_files_path).mkdir(parents=True, exist_ok=True)
-
-    for year in range(start_year, end_year):
-        print(f"Processing Year: {year}")
-        list_datasets = []
-        for i in range(len(variables_list)):
-            list_datasets.append(
-                xr.open_dataset(base_path + str(year) + "_" + str(i) + "_rs_cs.nc")
-            )
-
-        # list_datasets = [ch.drop("level", errors='ignore') for ch in list_datasets]
-        # channel = xr.concat(list_datasets, dim='channel').transpose("time", "channel", "face", "y", "x")
-        channel = xr.merge(list_datasets)
-        # ds = channel.to_dataset(name="fields")
-
-        combined_filename = modified_files_path + str(year) + ".nc"
-        channel.to_netcdf(combined_filename)
-
-
-concat(
-    "./train-post/",
-    1980,
-    2016,
-    ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"],
-)
-concat(
-    "./test-post/",
-    2016,
-    2018,
-    ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"],
-)
-concat(
-    "./out_of_sample-post/",
-    2018,
-    2019,
-    ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"],
-)
diff --git a/examples/weather/dlwp/data_curation/data_download_pbss.py b/examples/weather/dlwp/data_curation/data_download_pbss.py
deleted file mode 100644
index f93d631..0000000
--- a/examples/weather/dlwp/data_curation/data_download_pbss.py
+++ /dev/null
@@ -1,210 +0,0 @@
-# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import os -import intake -import datetime -import xarray -import numpy as np -import dask -import hydra -import s3fs -import boto3 -import tempfile -import h5py -import netCDF4 - -from dask.diagnostics.progress import ProgressBar -from omegaconf import DictConfig - -from e2_datasets.path import catalog_path - - -def upload_np_array_to_s3(np_array, s3_path, s3_fs): - # Upload np array to s3 - with tempfile.NamedTemporaryFile() as f: - np.save(f, np_array) - f.flush() - s3_fs.put(f.name, s3_path) - - -def subsample_era5_4_var(era5_pl, era5_sl, era5_tisr, era5_6h_acc_precip): - # This function subsamples the ERA5 data to 4 variables and is only used for testing - # Subsample ERA5 data to 4 variables - subsampled_era5_4 = [ - era5_pl["T"].sel(level=850), # 0 - era5_pl["Z"].sel(level=1000), # 1 - era5_pl["Z"].sel(level=700), # 2 - era5_pl["Z"].sel(level=500), # 3 - era5_pl["Z"].sel(level=300), # 4 - era5_sl.TCWV, # 5 - era5_sl.VAR_2T, # 6 - ] - return subsampled_era5_4 - - -def subsample_era5_34_var(era5_pl, era5_sl, era5_tisr, era5_6h_acc_precip): - # Subsample ERA5 data to 34 variables - subsampled_era5_34 = [ - era5_sl.VAR_10U, - era5_sl.VAR_10V, - era5_sl.VAR_2T, - era5_sl.SP, - era5_sl.MSL, - era5_pl["T"].sel(level=850), - era5_pl["U"].sel(level=1000), - era5_pl["V"].sel(level=1000), - era5_pl["Z"].sel(level=1000), - era5_pl["U"].sel(level=850), - era5_pl["V"].sel(level=850), - era5_pl["Z"].sel(level=850), - era5_pl["U"].sel(level=500), - era5_pl["V"].sel(level=500), - era5_pl["Z"].sel(level=500), - era5_pl["T"].sel(level=500), - era5_pl["Z"].sel(level=50), - era5_pl["R"].sel(level=500), - era5_pl["R"].sel(level=850), - era5_sl.TCWV, - era5_sl.VAR_100U, - era5_sl.VAR_100V, - era5_pl["U"].sel(level=250), - era5_pl["V"].sel(level=250), - era5_pl["Z"].sel(level=250), - era5_pl["T"].sel(level=250), - era5_pl["U"].sel(level=100), - era5_pl["V"].sel(level=100), - era5_pl["Z"].sel(level=100), - era5_pl["T"].sel(level=100), - era5_pl["U"].sel(level=900), - era5_pl["V"].sel(level=900), - era5_pl["Z"].sel(level=900), - era5_pl["T"].sel(level=900), - ] - return subsampled_era5_34 - - -def create_xarray(subsampled_era5, name): - - # Only subsample era5 data so we only take years 1980-2021 - subsampled_era5 = [ - ch.sel(time=slice("1980-01-01", "2021-01-01")) for ch in subsampled_era5 - ] - - # Chunk every xarray the same way - common_chunks = {"time": 1, "latitude": 721, "longitude": 1440} - subsampled_era5 = [ch.chunk(common_chunks) for ch in subsampled_era5] - - # Create xarray dataset - channel_name = [ch.name for ch in subsampled_era5] - channel_level = [ch.coords.get("level", np.nan) for ch in subsampled_era5] - dropped_subsampled_era5 = [ - ch.drop("level", errors="ignore") for ch in subsampled_era5 - ] - - return dropped_subsampled_era5 - - -@hydra.main(version_base="1.2", config_path="conf", config_name="config") -def main(cfg: DictConfig) -> None: - # Open ERA5 from intake catalog - catalog = intake.open_catalog(catalog_path) - with dask.config.set( - scheduler="processes", - num_workers=cfg.dask.num_workers, - threads_per_worker=cfg.dask.threads_per_worker, - ): - if cfg.debug: 
- era5_pl = catalog.pbss_era5.era5_pl.read(variables=["T", "Z"]) - era5_sl = catalog.pbss_era5.era5_sl.read(variables=["VAR_2T", "TCWV"]) - else: - era5_pl = catalog.pbss_era5.era5_pl.read( - variables=["T", "U", "V", "Z", "R"] - ) - era5_sl = catalog.pbss_era5.era5_sl.read( - variables=[ - "VAR_10U", - "VAR_10V", - "VAR_2T", - "SP", - "MSL", - "VAR_100U", - "VAR_100V", - "TCWV", - ] - ) - era5_tisr = catalog.pbss_era5.era5_tisr.to_dask() - era5_6h_acc_precip = catalog.pbss_era5.era5_6h_acc_precip.to_dask() - # Subsample ERA5 - if cfg.debug: - subsampled_era5 = subsample_era5_4_var( - era5_pl, era5_sl, era5_tisr, era5_6h_acc_precip - ) - else: - if cfg.dataset.name == "era5_34_var": - subsampled_era5 = subsample_era5_34_var( - era5_pl, era5_sl, era5_tisr, era5_6h_acc_precip - ) - else: - raise ValueError(f"Unknown dataset {cfg.dataset.name}") - - # Create xarray of subsampled ERA5 - era5_xarray = create_xarray(subsampled_era5, cfg.dataset.name) - # Subset to desired time frequency - if cfg.dataset.dt != 1: - times = era5_xarray[0].time.dt.hour % cfg.dataset.dt == 0 - for i in range(len(era5_xarray)): - print(i) - era5_xarray[i] = era5_xarray[i].sel(time=times) - - # Set years (please keep this hard set) - if cfg.debug: - year_pairs = { - "train": list(range(1980, 2016)), - "test": [2016, 2017], - "out_of_sample": [2018], - } - else: - year_pairs = { - "train": list(range(1980, 2016)), - "test": [2016, 2017], - "out_of_sample": [2018], - } - - # Make hdf5 files - for split, years in year_pairs.items(): - # Save each year - for year in years: - for i in range(len(era5_xarray)): - # HDF5 filename - filename = os.path.join(split, f"{year}_{i}.nc") - # Save year using dask - with dask.config.set( - scheduler="threads", - num_workers=cfg.dask.num_workers, - threads_per_worker=cfg.dask.threads_per_worker, - **{"array.slicing.split_large_chunks": False}, - ): - with ProgressBar(): - # Get data for the current year - year_data = era5_xarray[i].sel( - time=era5_xarray[i].time.dt.year == year - ) - # year_data = era5_xarray[i].sel(time=slice('1980-01-01', '1980-01-02')) - year_data.to_netcdf(filename, engine="h5netcdf") - - -if __name__ == "__main__": - main() diff --git a/examples/weather/dlwp/data_curation/data_download_simple.py b/examples/weather/dlwp/data_curation/data_download_simple.py new file mode 100644 index 0000000..fe46a78 --- /dev/null +++ b/examples/weather/dlwp/data_curation/data_download_simple.py @@ -0,0 +1,77 @@ +# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
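+
+# This script downloads a small sample of ERA5 data (the seven DLWP channels,
+# three days per year at 6-hourly resolution) from the Copernicus Climate Data
+# Store via the `cdsapi` package. A CDS account with a configured API key is
+# required for `cdsapi.Client()` to authenticate.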
+
+import os
+import cdsapi
+
+
+def download_data(var, year, save_path):
+    # var is ("single_level", variable) or ("pressure_level", variable, level);
+    # only January 1-3 at 6-hourly resolution is fetched, as a small sample
+    c = cdsapi.Client()
+    if var[0] == "single_level":
+        config = {
+            "product_type": "reanalysis",
+            "format": "netcdf",
+            "variable": var[1],
+            "year": year,
+            "month": "01",
+            "day": ["01", "02", "03"],
+            "time": ["00:00", "06:00", "12:00", "18:00"],
+        }
+        c.retrieve(
+            "reanalysis-era5-single-levels",
+            config,
+            save_path,
+        )
+    elif var[0] == "pressure_level":
+        config = {
+            "product_type": "reanalysis",
+            "format": "netcdf",
+            "variable": var[1],
+            "pressure_level": var[2],
+            "year": year,
+            "month": "01",
+            "day": ["01", "02", "03"],
+            "time": ["00:00", "06:00", "12:00", "18:00"],
+        }
+        c.retrieve(
+            "reanalysis-era5-pressure-levels",
+            config,
+            save_path,
+        )
+
+
+# The seven DLWP channels: T850, Z1000, Z700, Z500, Z300, TCWV, and T2m
+var_list = [
+    ("pressure_level", "temperature", "850"),
+    ("pressure_level", "geopotential", "1000"),
+    ("pressure_level", "geopotential", "700"),
+    ("pressure_level", "geopotential", "500"),
+    ("pressure_level", "geopotential", "300"),
+    ("single_level", "total_column_water"),
+    ("single_level", "2m_temperature"),
+]
+
+# Download one file per variable for the training, test, and out-of-sample years
+for i, var in enumerate(var_list):
+    os.makedirs("data/train_temp/", exist_ok=True)
+    download_data(var, "1979", "./data/train_temp/1979_" + str(i) + ".nc")
+
+for i, var in enumerate(var_list):
+    os.makedirs("data/test_temp/", exist_ok=True)
+    download_data(var, "2017", "./data/test_temp/2017_" + str(i) + ".nc")
+
+for i, var in enumerate(var_list):
+    os.makedirs("data/out_of_sample_temp/", exist_ok=True)
+    download_data(var, "2018", "./data/out_of_sample_temp/2018_" + str(i) + ".nc")
diff --git a/examples/weather/dlwp/data_curation/get_mean_std.py b/examples/weather/dlwp/data_curation/get_mean_std.py
deleted file mode 100644
index 2dd0441..0000000
--- a/examples/weather/dlwp/data_curation/get_mean_std.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- - -import xarray as xr -import numpy as np -import glob - - -def process_file(file, var): - ds = xr.open_dataset(file)[var] - mean = ds.mean().compute() - std = ds.std().compute() - var = ds.var().compute() - return mean, std, var - - -directories = [ - "./train-post-concat", - "./test-post-concat", - "./out_of_sample-post-concat", -] -files = [] -for directory in directories: - files.extend(glob.glob(f"{directory}/*.nc")) - -var_list = ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"] - -mean_list = [] -std_list = [] -for var in var_list: - file_stats = [process_file(file, var) for file in files] - - global_mean = np.mean([mean for mean, std, var in file_stats]) - global_var = np.average( - [var for mean, std, var in file_stats], - weights=[mean for mean, std, var in file_stats], - ) - global_std = np.sqrt(global_var) - - # Print the results for individual files - # for i, (mean, std, var) in enumerate(file_stats): - # print(f"File {i+1}: Mean = {mean}, Standard Deviation = {std}") - - mean_list.append(global_mean) - std_list.append(global_std) - print( - f"Var = {var}, Global Mean = {global_mean}, Global Standard Deviation = {global_std}" - ) - -mean_array = np.expand_dims( - np.expand_dims(np.expand_dims(np.stack(mean_list, axis=0), -1), -1), 0 -) -std_array = np.expand_dims( - np.expand_dims(np.expand_dims(np.stack(std_list, axis=0), -1), -1), 0 -) -with open("global_means.npy", "wb") as f: - np.save(f, mean_array) -with open("global_stds.npy", "wb") as f: - np.save(f, std_array) - -print(mean_array.shape, std_array.shape) diff --git a/examples/weather/dlwp/data_curation/latlon_grid.py b/examples/weather/dlwp/data_curation/latlon_grid.py deleted file mode 100644 index 810117c..0000000 --- a/examples/weather/dlwp/data_curation/latlon_grid.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -import xarray as xr -import numpy as np -import subprocess - -# read any arbitrary input -ds = xr.open_dataset("./train/1980_0.nc") -latgrid, longrid = np.meshgrid(ds["longitude"].values, ds["latitude"].values) - -# Create DataArrays from the 2D arrays and add dimensions -da_latgrid = xr.DataArray( - latgrid, - coords=[ds["latitude"].values, ds["longitude"].values], - dims=["latitude", "longitude"], -) -da_longrid = xr.DataArray( - longrid, - coords=[ds["latitude"].values, ds["longitude"].values], - dims=["latitude", "longitude"], -) - -# Create a Dataset and add the DataArrays -ds_new = xr.Dataset({"latgrid": da_latgrid, "longrid": da_longrid}) - -# Save the Dataset to NetCDF -ds_new.to_netcdf("latlon_grid_field.nc") - -# Remap the grid to cubed sphere -remap_cmd = "ApplyOfflineMap --in_data latlon_grid_field.nc --out_data latlon_grid_field_cs.nc --map ./map_LL721x1440_CS64.nc --var 'latgrid,longrid'" -output = subprocess.run(remap_cmd, shell=True, stdout=subprocess.DEVNULL) - -# reshape tempest remap's output to (face, res, res) shape -mapped_filename = "latlon_grid_field_cs.nc" -reshaped_mapped_filename = "latlon_grid_field_rs_cs.nc" -ds = xr.open_dataset(mapped_filename) -list_datasets = [] -for key in list(ds.keys()): - if key == "lat" or key == "lon": - pass - else: - data_var = ds[key] - col_var = ds["ncol"] - - num = 6 - res = int(np.sqrt(col_var.size / num)) - - y_coords = np.arange(res) - x_coords = np.arange(res) - - data_var_reshaped = data_var.data.reshape((num, res, res)) - - # Create a new coordinate for the 'face' dimension - face_coords = np.arange(num) - - # Create a new DataArray with the reshaped data and updated coordinates - reshaped_da = xr.DataArray( - data_var_reshaped, - dims=[ - "face", - "y", - "x", - ], - name=key, - ) - - # Add the coordinates to the reshaped DataArray - reshaped_da["face"] = ("face", face_coords) - reshaped_da["y"] = ("y", y_coords) - reshaped_da["x"] = ("x", x_coords) - - # Copy the attributes from the original data variable - reshaped_da.attrs = data_var.attrs - - list_datasets.append(reshaped_da) - -combined = xr.merge(list_datasets) -# Save the dataset to a new file -combined.to_netcdf(reshaped_mapped_filename) diff --git a/examples/weather/dlwp/data_curation/netcdf_to_h5py.py b/examples/weather/dlwp/data_curation/netcdf_to_h5py.py deleted file mode 100644 index 9b070da..0000000 --- a/examples/weather/dlwp/data_curation/netcdf_to_h5py.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -import os -import subprocess -import xarray as xr -import numpy as np -from pathlib import Path - - -def netcdf_to_h5py(base_path, start_year, end_year, variables_list): - modified_files_path = base_path[:-1] + "-h5py/" - Path(modified_files_path).mkdir(parents=True, exist_ok=True) - - for year in range(start_year, end_year): - print(f"Processing Year: {year}") - nc = xr.open_dataset(base_path + str(year) + ".nc") - channel = xr.concat( - [nc[var] for var in variables_list], dim="channel" - ).transpose("time", "channel", "face", "y", "x") - h5_filename = modified_files_path + str(year) + ".h5" - ds = channel.to_dataset(name="fields") - ds.to_netcdf(h5_filename, engine="h5netcdf") - - -netcdf_to_h5py( - "./train-post-concat/", - 1980, - 2016, - ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"], -) -netcdf_to_h5py( - "./test-post-concat/", - 2016, - 2018, - ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"], -) -netcdf_to_h5py( - "./out_of_sample-post-concat/", - 2018, - 2019, - ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"], -) diff --git a/examples/weather/dlwp/data_curation/post_processing.py b/examples/weather/dlwp/data_curation/post_processing.py new file mode 100644 index 0000000..868ddfe --- /dev/null +++ b/examples/weather/dlwp/data_curation/post_processing.py @@ -0,0 +1,72 @@ +# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
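+
+# This script remaps the downloaded lat-lon ERA5 files onto the 64x64 cubed-sphere
+# grid by applying the sparse TempestRemap map (map_LL721x1440_CS64.nc) as a
+# matrix product, then stacks the seven variables per year into an HDF5 file
+# containing a "fields" array of shape (time, channel, face, y, x).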
+
+import os
+import glob
+import numpy as np
+import xarray as xr
+import h5py
+from scipy.sparse import coo_matrix
+from collections import defaultdict
+
+
+def processor(files, dest):
+    # group files by year
+    files_by_year = defaultdict(list)
+    for file in files:
+        basename = os.path.basename(file)
+        year = basename.rsplit("_", 1)[0]
+        files_by_year[year].append(file)
+
+    # load the remap weights; row/col indices are 1-based, so shift to 0-based
+    # before building the scipy sparse matrix
+    input_map_wts = xr.open_dataset("./map_LL721x1440_CS64.nc")
+    i = input_map_wts.row.values - 1
+    j = input_map_wts.col.values - 1
+    data = input_map_wts.S.values
+    M = coo_matrix((data, (i, j)))
+
+    results = {}
+    # process files year by year
+    for year, filenames in files_by_year.items():
+        result_arrays = []
+        # sort by variable index, i.e. the digit just before the ".nc" suffix
+        filenames = sorted(filenames, key=lambda x: x[-4])
+        for filename in filenames:
+            with xr.open_dataset(filename) as ds:
+                data_var_name = list(ds.data_vars)[0]
+                # flatten the lat-lon grid, apply the sparse remap, and reshape
+                # onto the 6 cubed-sphere faces of 64x64
+                data = ds[data_var_name].values
+                num_time = data.shape[0]
+                result = np.reshape(
+                    np.reshape(data, (num_time, -1)) * M.T, (num_time, 6, 64, 64)
+                )
+                result_arrays.append(result.astype(np.float32))
+
+        # stack the variables along a new channel axis: (time, channel, face, y, x)
+        result_stack = np.stack(result_arrays, axis=1)
+        results[year] = result_stack
+
+    for year, result in results.items():
+        print(year, result.shape)
+        os.makedirs(dest, exist_ok=True)
+        output_filename = dest + f"{year}.h5"
+        print(output_filename)
+        # store the result in an HDF5 file
+        with h5py.File(output_filename, "w") as hf:
+            hf.create_dataset("fields", data=result)
+
+
+processor(glob.glob("./data/train_temp/*.nc"), "./data/train/")
+processor(glob.glob("./data/test_temp/*.nc"), "./data/test/")
+processor(glob.glob("./data/out_of_sample_temp/*.nc"), "./data/out_of_sample/")
diff --git a/examples/weather/dlwp/data_curation/remapping.py b/examples/weather/dlwp/data_curation/remapping.py
deleted file mode 100644
index e504a62..0000000
--- a/examples/weather/dlwp/data_curation/remapping.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- - -import os -import subprocess -import xarray as xr -import numpy as np -from pathlib import Path - -base_path = "./train/" - - -def remapping(base_path, start_year, end_year, variables_list, new_var_names, map_file): - # prepare directory to store the modified data - modified_files_path = base_path[:-1] + "-post/" - Path(modified_files_path).mkdir(parents=True, exist_ok=True) - - for year in range(start_year, end_year): - print(f"Processing Year: {year}") - for i, var in enumerate(variables_list): - print(f"Processing Variable: {var}") - filename = base_path + str(year) + "_" + str(i) + ".nc" - newfilename = modified_files_path + str(year) + "_" + str(i) + ".nc" - - ds = xr.open_dataset(filename) - # Clean-up the attributes to avoid errors with tempest-remap - ds.attrs["image_git_repo"] = "random" - ds.attrs["image_git_commit_sha"] = "random" - ds.attrs["SLURM_JOB_USER"] = "random" - - ds.to_netcdf(newfilename) - - cmd1 = "ncatted -a _FillValue,,m,f,9.96921e+36 " + str(newfilename) - cmd2 = "ncatted -a missing_value,,m,f,9.96921e+36 " + str(newfilename) - cmd3 = "ncatted -a number_of_significant_digits,,m,f,0 " + str(newfilename) - cmd4 = "ncatted -a ecmwf_local_table,,m,f,0 " + str(newfilename) - cmd5 = "ncatted -a ecmwf_parameter,,m,f,0 " + str(newfilename) - - subprocess.run(cmd1, shell=True) - subprocess.run(cmd2, shell=True) - subprocess.run(cmd3, shell=True) - subprocess.run(cmd4, shell=True) - subprocess.run(cmd5, shell=True) - print("pre-processing complete, applying remaps") - - # Apply remaps - mapped_filename = modified_files_path + str(year) + "_" + str(i) + "_cs.nc" - remap_cmd = ( - "ApplyOfflineMap --in_data " - + str(newfilename) - + " --out_data " - + mapped_filename - + " --map " - + str(map_file) - + " --var " - + var - ) - output = subprocess.run(remap_cmd, shell=True, stdout=subprocess.DEVNULL) - print("applying remaps complete, now reshaping") - - # reshape tempest remap's output to (face, res, res) shape - reshaped_mapped_filename = ( - modified_files_path + str(year) + "_" + str(i) + "_rs_cs.nc" - ) - ds = xr.open_dataset(mapped_filename) - list_datasets = [] - for key in list(ds.keys()): - if key == "lat" or key == "lon": - pass - else: - data_var = ds[key] - time_var = ds["time"] - col_var = ds["ncol"] - - num = 6 - res = int(np.sqrt(col_var.size / num)) - - y_coords = np.arange(res) - x_coords = np.arange(res) - - data_var_reshaped = data_var.data.reshape( - (time_var.size, num, res, res) - ) - - # Create a new coordinate for the 'face' dimension - face_coords = np.arange(num) - - # Create a new DataArray with the reshaped data and updated coordinates - reshaped_da = xr.DataArray( - data_var_reshaped, - dims=[ - "time", - "face", - "y", - "x", - ], - name=new_var_names[i], - ) - - # Add the coordinates to the reshaped DataArray - reshaped_da["time"] = ("time", ds["time"].data) - reshaped_da["face"] = ("face", face_coords) - reshaped_da["y"] = ("y", y_coords) - reshaped_da["x"] = ("x", x_coords) - - # Copy the attributes from the original data variable - reshaped_da.attrs = data_var.attrs - - list_datasets.append(reshaped_da) - - combined = xr.merge(list_datasets) - # Save the dataset to a new file - combined.to_netcdf(reshaped_mapped_filename) - print("reshaping complete") - - # Clean-up temp files - os.remove(newfilename) - os.remove(mapped_filename) - - -# remap train data -remapping( - "./train/", - 1980, - 2016, - ["T", "Z", "Z", "Z", "Z", "TCWV", "VAR_2T"], - ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"], - 
"./map_LL721x1440_CS64.nc", -) - -# remap test data -remapping( - "./test/", - 2016, - 2018, - ["T", "Z", "Z", "Z", "Z", "TCWV", "VAR_2T"], - ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"], - "./map_LL721x1440_CS64.nc", -) - -# remap out-of-sample data -remapping( - "./out_of_sample/", - 2018, - 2019, - ["T", "Z", "Z", "Z", "Z", "TCWV", "VAR_2T"], - ["VAR_T_850", "Z_1000", "Z_700", "Z_500", "Z_300", "TCWV", "VAR_T_2"], - "./map_LL721x1440_CS64.nc", -)