Skip to content

Commit

Permalink
Add task to prepare emissions for GEFS (NOAA-EMC#2562)
Browse files Browse the repository at this point in the history
This PR:
- introduces a task to prepare emissions for a forecast into the GEFS
application.
- adds configuration, j-job, rocoto job, ex-script and the python class
for this job
- updates GEFS workflow to be able to generate the XML to call this job.
- updates the `fcst` and `efcs` job dependencies in the GEFS application
to depend on `prep_emissions` if aerosols are turned ON.
- provides a placeholder for @bbakernoaa to work on the details for
preparing emissions.

Co-authored-by: Walter Kolczynski - NOAA <[email protected]>
  • Loading branch information
aerorahul and WalterKolczynski-NOAA authored May 7, 2024
1 parent 233c188 commit 9b6f840
Show file tree
Hide file tree
Showing 15 changed files with 259 additions and 0 deletions.
4 changes: 4 additions & 0 deletions env/HERA.env
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then

export APRUN_CALCFIMS="${launcher} -n 1"

elif [[ "${step}" = "prep_emissions" ]]; then

export APRUN="${launcher} -n 1"

elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostbndpntbll" ]] || [[ "${step}" = "wavepostpnt" ]]; then

export CFP_MP="YES"
Expand Down
4 changes: 4 additions & 0 deletions env/HERCULES.env
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ case ${step} in

export APRUN_CALCFIMS="${launcher} -n 1"
;;
"prep_emissions")

export APRUN="${launcher} -n 1"
;;
"waveinit" | "waveprep" | "wavepostsbs" | "wavepostbndpnt" | "wavepostpnt" | "wavepostbndpntbll")

export CFP_MP="YES"
Expand Down
4 changes: 4 additions & 0 deletions env/JET.env
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then

export APRUN_CALCFIMS="${launcher} -n 1"

elif [[ "${step}" = "prep_emissions" ]]; then

export APRUN="${launcher} -n 1"

elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostbndpntbll" ]] || [[ "${step}" = "wavepostpnt" ]]; then

export CFP_MP="YES"
Expand Down
4 changes: 4 additions & 0 deletions env/ORION.env
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then

export APRUN_CALCFIMS="${launcher} -n 1"

elif [[ "${step}" = "prep_emissions" ]]; then

export APRUN="${launcher} -n 1"

elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || \
[[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostpnt" ]] || [[ "${step}" == "wavepostbndpntbll" ]]; then

Expand Down
4 changes: 4 additions & 0 deletions env/S4.env
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then

export APRUN_CALCFIMS="${launcher} -n 1"

elif [[ "${step}" = "prep_emissions" ]]; then

export APRUN="${launcher} -n 1"

elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostbndpntbll" ]] || [[ "${step}" = "wavepostpnt" ]]; then

export CFP_MP="YES"
Expand Down
4 changes: 4 additions & 0 deletions env/WCOSS2.env
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then

export APRUN_CALCFIMS="${launcher} -n 1"

elif [[ "${step}" = "prep_emissions" ]]; then

export APRUN="${launcher} -n 1"

elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostbndpntbll" ]] || [[ "${step}" = "wavepostpnt" ]]; then

export USE_CFP="YES"
Expand Down
35 changes: 35 additions & 0 deletions jobs/JGLOBAL_PREP_EMISSIONS
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#! /usr/bin/env bash

source "${HOMEgfs}/ush/preamble.sh"
source "${HOMEgfs}/ush/jjob_header.sh" -e "prep_emissions" -c "base prep_emissions"

##############################################
# Set variables used in the script
##############################################
# TODO: Set local variables used in this script e.g. GDATE may be needed for previous cycle

##############################################
# Begin JOB SPECIFIC work
##############################################
# Generate COM variables from templates
# TODO: Add necessary COMIN, COMOUT variables for this job

###############################################################
# Run relevant script
EXSCRIPT=${PREP_EMISSIONS_PY:-${SCRgfs}/exglobal_prep_emissions.py}
${EXSCRIPT}
status=$?
(( status != 0 )) && ( echo "FATAL ERROR: Error executing ${EXSCRIPT}, ABORT!"; exit "${status}" )

##############################################
# End JOB SPECIFIC work
##############################################

##############################################
# Final processing
##############################################
if [[ -e "${pgmout}" ]] ; then
cat "${pgmout}"
fi

exit 0
23 changes: 23 additions & 0 deletions jobs/rocoto/prep_emissions.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#! /usr/bin/env bash

source "${HOMEgfs}/ush/preamble.sh"

###############################################################
# Source UFSDA workflow modules
source "${HOMEgfs}/ush/load_fv3gfs_modules.sh"
status=$?
(( status != 0 )) && exit "${status}"

export job="prep_emissions"
export jobid="${job}.$$"

###############################################################
# setup python path for workflow utilities and tasks
PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush/python"
export PYTHONPATH

###############################################################
# Execute the JJOB
"${HOMEgfs}/jobs/JGLOBAL_PREP_EMISSIONS"
status=$?
exit "${status}"
11 changes: 11 additions & 0 deletions parm/config/gefs/config.prep_emissions
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#! /usr/bin/env bash

########## config.prep_emissions ##########
# aerosol emissions preprocessing specific

echo "BEGIN: config.prep_emissions"

# Get task specific resources
source "${EXPDIR}/config.resources" prep_emissions

echo "END: config.prep_emissions"
8 changes: 8 additions & 0 deletions parm/config/gefs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ case ${step} in
export memory_waveinit="2GB"
;;

"prep_emissions")
export wtime_prep_emissions="00:10:00"
export npe_prep_emissions=1
export nth_prep_emissions=1
export npe_node_prep_emissions=$(( npe_node_max / nth_prep_emissions ))
export memory_prep_emissions="1GB"
;;

"fcst" | "efcs")
export is_exclusive=True

Expand Down
25 changes: 25 additions & 0 deletions scripts/exglobal_prep_emissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env python3
# exglobal_prep_emissions.py
# This script creates a emissions object
# which perform the pre-processing for aerosol emissions
import os

from wxflow import Logger, cast_strdict_as_dtypedict
from pygfs import AerosolEmissions


# Initialize root logger
logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)


if __name__ == '__main__':

# Take configuration from environment and cast it as python dictionary
config = cast_strdict_as_dtypedict(os.environ)

# Instantiate the emissions pre-processing task
emissions = AerosolEmissions(config)
emissions.initialize()
emissions.configure()
emissions.execute(emissions.task_config.DATA, emissions.task_config.APRUN)
emissions.finalize()
16 changes: 16 additions & 0 deletions ush/python/pygfs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

import os

from .task.analysis import Analysis
from .task.aero_emissions import AerosolEmissions
from .task.aero_analysis import AerosolAnalysis
from .task.atm_analysis import AtmAnalysis
from .task.atmens_analysis import AtmEnsAnalysis
from .task.snow_analysis import SnowAnalysis
from .task.upp import UPP
from .task.oceanice_products import OceanIceProducts
from .task.gfs_forecast import GFSForecast

__docformat__ = "restructuredtext"
__version__ = "0.1.0"
pygfs_directory = os.path.dirname(__file__)
82 changes: 82 additions & 0 deletions ush/python/pygfs/task/aero_emissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/usr/bin/env python3

import os
from logging import getLogger
from typing import Dict, Any, Union
from pprint import pformat

from wxflow import (AttrDict,
parse_j2yaml,
FileHandler,
Jinja,
logit,
Task,
add_to_datetime, to_timedelta,
WorkflowException,
Executable, which)

logger = getLogger(__name__.split('.')[-1])


class AerosolEmissions(Task):
"""Aerosol Emissions pre-processing Task
"""

@logit(logger, name="AerosolEmissions")
def __init__(self, config: Dict[str, Any]) -> None:
"""Constructor for the Aerosol Emissions task
Parameters
----------
config : Dict[str, Any]
Incoming configuration for the task from the environment
Returns
-------
None
"""
super().__init__(config)

local_variable = "something"

localdict = AttrDict(
{'variable_used_repeatedly': local_variable}
)
self.task_config = AttrDict(**self.config, **self.runtime_config, **localdict)

@staticmethod
@logit(logger)
def initialize() -> None:
"""Initialize the work directory
"""

@staticmethod
@logit(logger)
def configure() -> None:
"""Configure the artifacts in the work directory.
Copy run specific data to run directory
"""

@staticmethod
@logit(logger)
def execute(workdir: Union[str, os.PathLike], aprun_cmd: str) -> None:
"""Run the executable (if any)
Parameters
----------
workdir : str | os.PathLike
work directory with the staged data, parm files, namelists, etc.
aprun_cmd : str
launcher command for executable.x
Returns
-------
None
"""

@staticmethod
@logit(logger)
def finalize() -> None:
"""Perform closing actions of the task.
Copy data back from the DATA/ directory to COM/
"""
6 changes: 6 additions & 0 deletions workflow/applications/gefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def _get_app_configs(self):
if self.do_ocean or self.do_ice:
configs += ['oceanice_products']

if self.do_aero:
configs += ['prep_emissions']

return configs

@staticmethod
Expand All @@ -45,6 +48,9 @@ def get_task_names(self):
if self.do_wave:
tasks += ['waveinit']

if self.do_aero:
tasks += ['prep_emissions']

tasks += ['fcst']

if self.nens > 0:
Expand Down
29 changes: 29 additions & 0 deletions workflow/rocoto/gefs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ def waveinit(self):

return task

def prep_emissions(self):
deps = []
dep_dict = {'type': 'task', 'name': f'stage_ic'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('prep_emissions')
task_name = 'prep_emissions'
task_dict = {'task_name': task_name,
'resources': resources,
'envars': self.envars,
'cycledef': 'gefs',
'command': f'{self.HOMEgfs}/jobs/rocoto/prep_emissions.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}
task = rocoto.create_task(task_dict)

return task

def fcst(self):
dependencies = []
dep_dict = {'type': 'task', 'name': f'stage_ic'}
Expand All @@ -98,6 +119,10 @@ def fcst(self):
dep_dict = {'type': 'task', 'name': f'wave_init'}
dependencies.append(rocoto.add_dependency(dep_dict))

if self.app_config.do_aero:
dep_dict = {'type': 'task', 'name': f'prep_emissions'}
dependencies.append(rocoto.add_dependency(dep_dict))

dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)

resources = self.get_resource('fcst')
Expand Down Expand Up @@ -125,6 +150,10 @@ def efcs(self):
dep_dict = {'type': 'task', 'name': f'wave_init'}
dependencies.append(rocoto.add_dependency(dep_dict))

if self.app_config.do_aero:
dep_dict = {'type': 'task', 'name': f'prep_emissions'}
dependencies.append(rocoto.add_dependency(dep_dict))

dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)

efcsenvars = self.envars.copy()
Expand Down

0 comments on commit 9b6f840

Please sign in to comment.