Skip to content

Commit

Permalink
Calculate runoff volumes (#24)
Browse files Browse the repository at this point in the history
* add runoff volume calculator

* parameterize variable names
  • Loading branch information
rileyhales authored Jun 2, 2024
1 parent bdb9552 commit b8f03ae
Show file tree
Hide file tree
Showing 9 changed files with 417 additions and 106 deletions.
16 changes: 9 additions & 7 deletions config_files/config.json
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
{
"routing_params_file": "",
"connectivity_file": "",
"runoff_file": "",
"runoff_volumes_file": "",
"outflow_file": "",
"routing": "",
"lhs_file": "",
"lhsinv_file": "",
"adj_file": "",
"nonlinear_routing_params_file": "",
"nonlinear_thresholds_file": "",
"dt_routing": "",
"dt_outflows": "",
"positive_flow": true,
"initial_state_file": "",
"final_state_file": "",
"log": false,
"log_stream": "",
"log_level": "",
"progress_bar": "",
"job_name": "",
"progress_bar": ""
"log_level": "",
"log_stream": "",
"var_runoff_volume": "ro_vol",
"var_river_id": "river_id",
"var_outflow": "Q"
}
7 changes: 5 additions & 2 deletions config_files/config.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# Required Watershed Files
routing_params_file: ''
connectivity_file: ''
adj_file: '' # Optional - if it does not exist, it will be cached at this location
# Routing Input and Output
runoff_file: ''
runoff_volumes_file: ''
outflow_file: ''
# Input and Output file structure - Optional
var_runoff_volume: 'ro_vol'
var_river_id: 'river_id'
var_outflow: 'Q'
# Compute Options - Optional
routing: 'linear'
positive_flow: True
Expand Down
12 changes: 7 additions & 5 deletions config_files/descriptions.csv
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
Parameter Name,Type,Description
routing_params_file,string,Path to the routing parameters file.
connectivity_file,string,Path to the network connectivity file.
runoff_file,string,Path to the file with runoff values to be routed.
runoff_volumes_file,string,Path to the file with catchment level runoff volumes to be routed.
outflow_file,string,Path where the outflows file should be saved.
lhs_file,string,Path where the LHS matrix should be cached.
lhsinv_file,string,Path where the LHS inverse matrix should be cached.
adj_file,string,Path where the adjacency matrix should be cached.
dt_routing,number,Time interval in seconds between routing computations.
dt_outflows,number,Time interval in seconds between writing flows to disc.
routing,string,Either 'linear' or 'nonlinear' routing; default 'linear'.
nonlinear_routing_params_file,string,Path to the file with nonlinear routing parameters.
nonlinear_thresholds_file,string,Path to the file with nonlinear routing thresholds.
positive_flow,boolean,Force minimum flow value to be >= 0.
initial_state_file,string,Path to the file with initial state values.
final_state_file,string,Path to the file with final state values.
log,boolean,Whether to display log messages; defaults to False.
log_stream,string,The destination for logged messages: stdout/stderr or a file path; defaults to stdout.
log_level,string,Level of logging: either 'debug' 'info' 'warning' 'error' or 'critical'.
job_name,string,A name for this job to be printed in debug statements.
progress_bar,boolean,Indicates whether or not to show a progress bar in debug statements: true or false.
progress_bar,boolean,Indicates whether or not to show a progress bar in debug statements: true or false.
var_runoff_volume,string,Name of the variable in the runoff volumes file that contains the runoff volumes.
var_river_id,string,Name of the variable in all files that contains the river IDs.
var_outflow,string,Name of the variable in the outflows file that contains the outflows.
101 changes: 53 additions & 48 deletions river_route/_MuskingumCunge.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import xarray as xr
import yaml

from ._meta import __version__ as VERSION
from .__metadata__ import __version__ as VERSION
from .tools import connectivity_to_adjacency_matrix
from .tools import connectivity_to_digraph

Expand Down Expand Up @@ -84,8 +84,10 @@ def set_configs(self, config_file, **kwargs) -> None:
self.conf['job_name'] = self.conf.get('job_name', 'untitled_job')
self.conf['log'] = bool(self.conf.get('log', False))
self.conf['progress_bar'] = self.conf.get('progress_bar', self.conf['log'])
self.conf['runoff_volume_var'] = self.conf.get('runoff_volume_var', 'm3_riv')
self.conf['log_level'] = self.conf.get('log_level', 'INFO')
self.conf['var_runoff_volume'] = self.conf.get('var_runoff_volume', 'ro_vol')
self.conf['var_river_id'] = self.conf.get('var_river_id', 'river_id')
self.conf['var_discharge'] = self.conf.get('var_discharge', 'Q')

# routing and solver options - time is validated at route time
self.conf['routing'] = self.conf.get('routing', 'linear')
Expand All @@ -100,8 +102,8 @@ def set_configs(self, config_file, **kwargs) -> None:
assert 'routing_params_file' in self.conf, 'Linear routing requires linear routing params'

# type and path checking on file paths
if isinstance(self.conf['runoff_file'], str):
self.conf['runoff_file'] = [self.conf['runoff_file'], ]
if isinstance(self.conf['runoff_volumes_file'], str):
self.conf['runoff_volumes_file'] = [self.conf['runoff_volumes_file'], ]
if isinstance(self.conf['outflow_file'], str):
self.conf['outflow_file'] = [self.conf['outflow_file'], ]
for arg in [k for k in self.conf.keys() if 'file' in k]:
Expand Down Expand Up @@ -129,7 +131,7 @@ def set_configs(self, config_file, **kwargs) -> None:
def _validate_configs(self) -> None:
LOG.info('Validating configs file')
required_file_paths = ['connectivity_file',
'runoff_file',
'runoff_volumes_file',
'outflow_file', ]
paths_should_exist = ['connectivity_file', ]

Expand All @@ -145,7 +147,7 @@ def _validate_configs(self) -> None:
for arg in paths_should_exist:
if not os.path.exists(self.conf[arg]):
raise FileNotFoundError(f'{arg} not found at given path')
for path in self.conf['runoff_file']:
for path in self.conf['runoff_volumes_file']:
assert os.path.exists(path), FileNotFoundError(f'runoff file not found at given path: {path}')

return
Expand All @@ -162,7 +164,7 @@ def _read_river_ids(self) -> np.array:
"""
Reads river ids vector from parquet given in config file
"""
return pd.read_parquet(self.conf['routing_params_file'], columns=['rivid', ]).values.flatten()
return pd.read_parquet(self.conf['routing_params_file'], columns=[self.conf['var_river_id'], ]).values.flatten()

def _read_linear_k(self) -> np.array:
"""
Expand Down Expand Up @@ -215,17 +217,8 @@ def _set_adjacency_matrix(self) -> None:
"""
if hasattr(self, 'A'):
return

if os.path.exists(self.conf.get('adj_file', '')):
LOG.debug('Loading adjacency matrix from file')
self.A = scipy.sparse.load_npz(self.conf['adj_file'])
return

LOG.debug('Calculating Network Adjacency Matrix (A)')
self.A = connectivity_to_adjacency_matrix(self.conf['connectivity_file'])
if self.conf.get('adj_file', ''):
LOG.info('Saving adjacency matrix to file')
scipy.sparse.save_npz(self.conf['adj_file'], self.A)
return

def _calculate_lhs_matrix(self) -> None:
Expand Down Expand Up @@ -272,10 +265,10 @@ def _calculate_muskingum_coefficients(self, k: np.ndarray = None, x: np.ndarray
"""
LOG.debug('Calculating MuskingumCunge coefficients')

if not hasattr(self, 'k'):
self._read_linear_k()
if not hasattr(self, 'x'):
self._read_linear_x()
if k is None:
k = self._read_linear_k()
if x is None:
x = self._read_linear_x()

dt_div_k = self.dt_routing / k
denom = dt_div_k + (2 * (1 - x))
Expand Down Expand Up @@ -330,14 +323,14 @@ def route(self, **kwargs) -> 'MuskingumCunge':
self._set_adjacency_matrix()

LOG.debug('Getting initial value arrays')
for runoff_file, outflow_file in zip(self.conf['runoff_file'], self.conf['outflow_file']):
for runoff_file, outflow_file in zip(self.conf['runoff_volumes_file'], self.conf['outflow_file']):
LOG.info('-' * 80)
LOG.info(f'Reading runoff volumes file: {runoff_file}')
with xr.open_dataset(runoff_file) as runoff_ds:
LOG.debug('Reading time array')
dates = runoff_ds['time'].values.astype('datetime64[s]')
LOG.debug('Reading runoff array')
runoffs = runoff_ds[self.conf['runoff_volume_var']].values
runoffs = runoff_ds[self.conf['var_runoff_volume']].values

self._set_time_params(dates)
self._calculate_muskingum_coefficients()
Expand Down Expand Up @@ -416,21 +409,21 @@ def _write_outflows(self, outflow_file: str, dates: np.array, outflow_array: np.

with nc.Dataset(outflow_file, mode='w', format='NETCDF4') as ds:
ds.createDimension('time', size=dates.shape[0])
ds.createDimension('rivid', size=self.A.shape[0])
ds.createDimension(self.conf['var_river_id'], size=outflow_array.shape[1])

ds.createVariable('time', 'f8', ('time',))
ds['time'].units = f'seconds since {reference_date.strftime("%Y-%m-%d %H:%M:%S")}'
ds['time'][:] = dates
time_var = ds.createVariable('time', 'f8', ('time',))
time_var.units = f'seconds since {reference_date.strftime("%Y-%m-%d %H:%M:%S")}'
time_var[:] = dates

ds.createVariable('rivid', 'i4', ('rivid',))
ds['rivid'][:] = self._read_river_ids()
id_var = ds.createVariable(self.conf['var_river_id'], 'i4', (self.conf['var_river_id']), )
id_var[:] = self._read_river_ids()

ds.createVariable('Qout', 'f4', ('time', 'rivid'))
ds['Qout'][:] = outflow_array
ds['Qout'].long_name = 'Discharge at the outlet of each river reach'
ds['Qout'].standard_name = 'discharge'
ds['Qout'].aggregation_method = 'mean'
ds['Qout'].units = 'm3 s-1'
flow_var = ds.createVariable(self.conf['var_discharge'], 'f4', ('time', self.conf['var_river_id']))
flow_var[:] = outflow_array
flow_var.long_name = 'Discharge at catchment outlet'
flow_var.standard_name = 'discharge'
flow_var.aggregation_method = 'mean'
flow_var.units = 'm3 s-1'
return

def hydrograph(self, river_id: int) -> pd.DataFrame:
Expand All @@ -444,49 +437,61 @@ def hydrograph(self, river_id: int) -> pd.DataFrame:
pandas.DataFrame
"""
with xr.open_mfdataset(self.conf['outflow_file']) as ds:
df = ds.Qout.sel(rivid=river_id).to_dataframe()[['Qout', ]]
df = (
ds
[self.conf['var_discharge']]
.sel(**{self.conf['var_river_id']: river_id})
.to_dataframe()
[[self.conf['var_discharge'], ]]
)
df.columns = [river_id, ]
return df

def mass_balance(self, rivid: int, ancestors: list = None) -> pd.DataFrame:
def mass_balance(self, river_id: int, ancestors: list = None) -> pd.DataFrame:
"""
Get the mass balance for a given river id as a pandas dataframe
Args:
rivid: the ID of a river reach in the output files
ancestors: a list of the given rivid and all rivers upstream of that river
river_id: the ID of a river reach in the output files
ancestors: a list of the given river_id and all rivers upstream of that river
Returns:
pandas.DataFrame
"""
if type(rivid) is not int:
raise TypeError(f'rivid should be an integer ID of a river to mass balance')
if type(river_id) is not int:
raise TypeError(f'river_id should be an integer ID of a river to mass balance')
if ancestors is None:
G = connectivity_to_digraph(self.conf['connectivity_file'])
ancestors = list(nx.ancestors(G, rivid))
with xr.open_mfdataset(self.conf['runoff_file']) as ds:
ancestors = set(list(nx.ancestors(G, river_id)) + [river_id, ])
with xr.open_mfdataset(self.conf['runoff_volumes_file']) as ds:
vdf = (
ds
.sel(rivid=ancestors)
.m3_riv
.sel(**{self.conf['var_river_id']: ancestors})
[self.conf['var_runoff_volume']]
.to_dataframe()
[['m3_riv', ]]
[[self.conf['var_runoff_volume'], ]]
.reset_index()
.pivot(index='time', columns='rivid', values='m3_riv')
.pivot(index='time', columns=self.conf['var_river_id'], values=self.conf['var_runoff_volume'])
.sum(axis=1)
.cumsum()
.rename('runoff_volume')
)
with xr.open_mfdataset(self.conf['outflow_file']) as ds:
qdf = ds.sel(rivid=rivid).to_dataframe()[['Qout', ]].cumsum()
qdf = (
ds
.sel(**{self.conf['var_river_id']: river_id})
.to_dataframe()
[[self.conf['var_discharge'], ]]
.cumsum()
)
# convert to discharged volume - multiply by the time delta in seconds
qdf = qdf * (qdf.index[1] - qdf.index[0]).total_seconds()
qdf.columns = ['discharge_volume', ]

df = qdf.join(vdf)
df['runoff-discharge'] = df['runoff_volume'] - df['discharge_volume']
if not df['runoff-discharge'].gt(0).all():
LOG.warning(f'More discharge than runoff volume for river {rivid}')
LOG.warning(f'More discharge than runoff volume for river {river_id}')

return df

Expand Down
15 changes: 5 additions & 10 deletions river_route/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
import river_route.runoff
import river_route.tools
from river_route._MuskingumCunge import MuskingumCunge

from river_route.tools import routing_files_from_RAPID
from river_route.tools import connectivity_to_digraph
from river_route.tools import connectivity_to_adjacency_matrix

from ._meta import __version__
from ._meta import __author__
from ._meta import __url__
from .__metadata__ import __version__, __author__, __url__

__all__ = [
'MuskingumCunge',

'routing_files_from_RAPID',
'connectivity_to_digraph',
'connectivity_to_adjacency_matrix',
'runoff',
'tools',

'__version__',
'__author__',
Expand Down
2 changes: 1 addition & 1 deletion river_route/_meta.py → river_route/__metadata__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '0.6.0'
__version__ = '0.7.0'
__author__ = 'Riley Hales PhD'
__url__ = 'https://github.com/rileyhales/river-route'
Loading

0 comments on commit b8f03ae

Please sign in to comment.