diff --git a/README.md b/README.md
index 54978b9..83667e6 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,7 @@ Install river-route from source using conda/mamba such as with the following commands
 
 ```bash
 git clone https://github.com/rileyhales/river-route
 cd river-route
-mamba env create -f environment.yml
+mamba env create -f environment.yaml
 mamba activate rr
 python setup.py install
 ```
@@ -25,7 +25,7 @@ import river_route as rr
 
 (
     rr
-    .Muskingum('/path/to/config.yml')
+    .MuskingumCunge('/path/to/config.yaml')
     .route()
 )
 ```
@@ -37,7 +37,7 @@ import river_route as rr
 
 (
     rr
-    .Muskingum(**{
+    .MuskingumCunge(**{
         'routing_params_file': '/path/to/routing_params.parquet',
         'connectivity_file': '/path/to/connectivity.parquet',
         'runoff_file': '/path/to/runoff.nc',
@@ -125,7 +125,7 @@ graph LR
     a[Calculate LHS & LHS-1] --> b
     b[Divide Runoff Data by \n Number of Steps] --> c
     c[Iterate On Runoff Intervals] --> d
-    d[Solving Matrix \n Muskingum] --> e
+    d[Solving Matrix \n MuskingumCunge] --> e
     e[Enforce Min and Max Q] --> f & c
     f[Write Outflows to Disk] --> g
     g[Cache Final State]
@@ -175,8 +175,8 @@ index.
 
 | Column | Data Type | Description |
 |--------|-----------|-------------|
 | rivid  | integer   | Unique ID of a river segment |
-| k      | float     | the k parameter of the Muskingum Cunge routing equation length / velocity |
-| x      | float     | the x parameter of the Muskingum Cunge routing equation. x : [0, 0.5] |
+| k      | float     | the k parameter of the MuskingumCunge routing equation: length / velocity |
+| x      | float     | the x parameter of the MuskingumCunge routing equation. x: [0, 0.5] |
 
 ### Connectivity File
diff --git a/config_files/config.json b/config_files/config.json
index 8ad23d3..baf3413 100644
--- a/config_files/config.json
+++ b/config_files/config.json
@@ -3,17 +3,15 @@
   "connectivity_file": "",
   "runoff_file": "",
   "outflow_file": "",
+  "routing": "",
   "lhs_file": "",
   "lhsinv_file": "",
   "adj_file": "",
   "dt_routing": "",
   "dt_outflows": "",
-  "min_q": "",
-  "max_q": "",
-  "qinit_file": "",
-  "rinit_file": "",
-  "qfinal_file": "",
-  "rfinal_file": "",
+  "positive_flow": true,
+  "initial_state_file": "",
+  "final_state_file": "",
   "log": false,
   "log_stream": "",
   "log_level": "",
diff --git a/config_files/config.yaml b/config_files/config.yaml
new file mode 100644
index 0000000..dc10ecb
--- /dev/null
+++ b/config_files/config.yaml
@@ -0,0 +1,21 @@
+# Required Watershed Files
+routing_params_file: ''
+connectivity_file: ''
+adj_file: ''  # Optional - if it does not exist, it will be cached at this location
+# Routing Input and Output
+runoff_file: ''
+outflow_file: ''
+# Compute Options - Optional
+routing: 'linear'
+positive_flow: True
+dt_routing: 0
+dt_outflows: 0
+# initial and final state files - Optional
+initial_state_file: ''
+final_state_file: ''
+# simulation management and debugging - Optional
+log: False
+progress_bar: False
+log_level: 'DEBUG'
+log_stream: ''
+job_name: ''
diff --git a/config_files/config.yml b/config_files/config.yml
deleted file mode 100644
index 561f62f..0000000
--- a/config_files/config.yml
+++ /dev/null
@@ -1,25 +0,0 @@
-# Required Input/Output Files
-routing_params_file: ''
-connectivity_file: ''
-runoff_file: ''
-outflow_file: ''
-# cache routing matrices - Optional
-lhs_file: ''
-lhs_inv_file: ''
-adj_file: ''
-# Compute Options
-dt_routing: 300
-dt_outflows: 0
-min_q: 0
-max_q: None
-# initial and final state files - Optional
-qinit_file: ''
-rinit_file: ''
-qfinal_file: ''
-rfinal_file: ''
-# simulation management and debugging - Optional
-log: False
-log_level: 'INFO'
-log_stream: ''
-job_name: ''
-progress_bar: True
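
The config migration is one-for-one: `routing` and `positive_flow` replace `min_q`/`max_q`, and the single `initial_state_file`/`final_state_file` pair replaces the four `qinit_file`/`rinit_file`/`qfinal_file`/`rfinal_file` paths. A minimal usage sketch with the new keys (all paths are placeholders):

```python
import river_route as rr

(
    rr
    .MuskingumCunge(**{
        'routing_params_file': '/path/to/routing_params.parquet',
        'connectivity_file': '/path/to/connectivity.parquet',
        'runoff_file': '/path/to/runoff.nc',
        'outflow_file': '/path/to/outflow.nc',
        'routing': 'linear',
        'positive_flow': True,     # replaces min_q/max_q clipping
        'initial_state_file': '',  # replaces qinit_file/rinit_file
        'final_state_file': '',    # replaces qfinal_file/rfinal_file
    })
    .route()
)
```
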
diff --git a/config_files/descriptions.csv b/config_files/descriptions.csv
index 08e1416..9be1b5b 100644
--- a/config_files/descriptions.csv
+++ b/config_files/descriptions.csv
@@ -8,13 +8,11 @@ lhsinv_file,string,Path where the LHS inverse matrix should be cached.
 adj_file,string,Path where the adjacency matrix should be cached.
 dt_routing,number,Time interval in seconds between routing computations.
 dt_outflows,number,Time interval in seconds between writing flows to disc.
-min_q,number,Minimum flow value allowed after each routing computation.
-max_q,number,Maximum flow value allowed after each routing computation.
-qinit_file,string,Path to the initial flows file.
-rinit_file,string,Path to the initial runoff file.
-qfinal_file,string,Path where the final flows file should be saved.
-rfinal_file,string,Path where the final runoff file should be saved.
-log,boolean,whether or not to display log messages defaulting to False
+routing,string,Routing method: either 'linear' or 'nonlinear'. Default: 'linear'.
+positive_flow,boolean,Force minimum flow values to be >= 0. Default: True.
+initial_state_file,string,Path to the file with initial state values.
+final_state_file,string,Path to the file with final state values.
+log,boolean,Whether to display log messages. Default: False.
 log_stream,string,the destination for logged messages: stdout stderr or a file path. default to stdout
 log_level,string,Level of logging: either 'debug' 'info' 'warning' 'error' or 'critical'.
 job_name,string,A name for this job to be printed in debug statements.
diff --git a/environment.yml b/environment.yaml
similarity index 94%
rename from environment.yml
rename to environment.yaml
index 850555e..83ea1ce 100644
--- a/environment.yml
+++ b/environment.yaml
@@ -6,6 +6,7 @@ dependencies:
   - dask
   - fastparquet
   - matplotlib
+  - natsort
   - netcdf4
   - networkx
   - numpy
diff --git a/requirements.txt b/requirements.txt
index 53a5591..58fe623 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,7 @@
 dask
 fastparquet
 matplotlib
+natsort
 netcdf4
 networkx
 numpy
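
`natsort` joins both dependency lists because the new job utilities sort VPU directories and date-stamped runoff files; a quick illustration of the difference from plain `sorted` (file names hypothetical):

```python
from natsort import natsorted

files = ['m3_vpu_10.nc', 'm3_vpu_2.nc', 'm3_vpu_1.nc']
print(sorted(files))     # ['m3_vpu_1.nc', 'm3_vpu_10.nc', 'm3_vpu_2.nc'] - lexicographic
print(natsorted(files))  # ['m3_vpu_1.nc', 'm3_vpu_2.nc', 'm3_vpu_10.nc'] - numeric-aware
```
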
diff --git a/river_route/_muskingum.py b/river_route/_MuskingumCunge.py
similarity index 65%
rename from river_route/_muskingum.py
rename to river_route/_MuskingumCunge.py
index c888ab4..f035eeb 100644
--- a/river_route/_muskingum.py
+++ b/river_route/_MuskingumCunge.py
@@ -17,25 +17,26 @@ from .tools import connectivity_to_adjacency_matrix
 from .tools import connectivity_to_digraph
 
-__all__ = ['Muskingum', ]
+__all__ = ['MuskingumCunge', ]
 
 LOG = logging.getLogger('river_route')
 LOG.disabled = True
 
 
-class Muskingum:
+class MuskingumCunge:
     # Given configs
     conf: dict
+    log: logging.Logger
 
     # Routing matrices
     A: scipy.sparse.csc_matrix
     lhs: scipy.sparse.csc_matrix
-    lhsinv: scipy.sparse.csc_matrix
+    k: np.array
+    x: np.array
     c1: np.array
     c2: np.array
     c3: np.array
-    qinit: np.array
-    rinit: np.array
+    initial_state: (np.array, np.array)  # Q init, R init
 
     # Time options
     dt_total: float
@@ -47,9 +48,6 @@ class Muskingum:
     num_runoff_steps_per_outflow: int
 
     def __init__(self, config_file: str = None, **kwargs, ):
-        """
-        Implements Matrix Muskingum routing
-        """
         self.set_configs(config_file, **kwargs)
         return
 
@@ -87,10 +85,19 @@ def set_configs(self, config_file, **kwargs) -> None:
         self.conf['log'] = bool(self.conf.get('log', False))
         self.conf['progress_bar'] = self.conf.get('progress_bar', self.conf['log'])
         self.conf['runoff_volume_var'] = self.conf.get('runoff_volume_var', 'm3_riv')
-        self.conf['dt_routing'] = self.conf.get('dt_routing', 60)
         self.conf['log_level'] = self.conf.get('log_level', 'INFO')
-        self.conf['min_q'] = self.conf.get('min_q', False)
-        self.conf['max_q'] = self.conf.get('max_q', False)
+
+        # routing and solver options - time is validated at route time
+        self.conf['routing'] = self.conf.get('routing', 'linear')
+        assert self.conf['routing'] in ['linear', 'nonlinear'], 'Routing method not recognized'
+        self.conf['positive_flow'] = self.conf.get('positive_flow', True)
+
+        # check that the routing params files are given depending on the routing method
+        if self.conf['routing'] == 'nonlinear':
+            assert 'nonlinear_routing_params_file' in self.conf, 'Nonlinear routing requires nonlinear routing params'
+            assert 'nonlinear_thresholds_file' in self.conf, 'Nonlinear routing requires nonlinear thresholds'
+        else:
+            assert 'routing_params_file' in self.conf, 'Linear routing requires linear routing params'
 
         # type and path checking on file paths
         if isinstance(self.conf['runoff_file'], str):
@@ -103,9 +110,10 @@ def set_configs(self, config_file, **kwargs) -> None:
             elif isinstance(self.conf[arg], str):
                 self.conf[arg] = os.path.abspath(self.conf[arg])
 
+        # Enable logging options
         if self.conf['log']:
             LOG.disabled = False
-            LOG.setLevel(self.conf.get('log_level', 'DEBUG'))
+            LOG.setLevel(self.conf.get('log_level', 'INFO'))
             log_destination = self.conf.get('log_stream', 'stdout')
             log_format = self.conf.get('log_format', '%(asctime)s - %(levelname)s - %(message)s')
             if log_destination == 'stdout':
@@ -124,9 +132,14 @@ def _validate_configs(self) -> None:
             'runoff_file', 'outflow_file', ]
         paths_should_exist = ['connectivity_file', ]
-        required_time_opts = ['dt_routing', ]
 
-        for arg in required_file_paths + required_time_opts:
+        if self.conf['routing'] == 'linear':
+            required_file_paths.append('routing_params_file')
+        else:
+            required_file_paths.append('nonlinear_routing_params_file')
+            required_file_paths.append('nonlinear_thresholds_file')
+
+        for arg in required_file_paths:
             if arg not in self.conf:
                 raise ValueError(f'{arg} not found in configs')
         for arg in paths_should_exist:
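
Per the assertions above, a nonlinear job must name two extra parquet files before the configs are accepted; a hypothetical construction (paths are placeholders; the expected `k_<threshold>` column layout follows from `_calculate_nonlinear_coefficients` further down):

```python
import river_route as rr

m = rr.MuskingumCunge(**{
    'routing_params_file': 'params.parquet',  # base k and x are still read from here
    'connectivity_file': 'connectivity.parquet',
    'runoff_file': 'runoff.nc',
    'outflow_file': 'outflow.nc',
    'routing': 'nonlinear',
    'nonlinear_routing_params_file': 'nonlinear_params.parquet',  # k_<threshold> columns
    'nonlinear_thresholds_file': 'thresholds.parquet',            # one column per flow threshold
})
```
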
@@ -139,46 +152,56 @@ def _validate_configs(self) -> None:
 
     def _log_configs(self) -> None:
         LOG.debug(f'river-route version: {VERSION}')
+        LOG.debug(f'Number of Rivers: {self._read_river_ids().shape[0]}')
         LOG.debug('Configs:')
         for k, v in self.conf.items():
             LOG.debug(f'\t{k}: {v}')
         return
 
-    def _read_riverids(self) -> np.array:
+    def _read_river_ids(self) -> np.array:
         """
         Reads river ids vector from parquet given in config file
         """
         return pd.read_parquet(self.conf['routing_params_file'], columns=['rivid', ]).values.flatten()
 
-    def _read_k(self) -> np.array:
+    def _read_linear_k(self) -> np.array:
         """
         Reads K vector from parquet given in config file
         """
-        return pd.read_parquet(self.conf['routing_params_file'], columns=['k', ]).values.flatten()
+        if hasattr(self, 'k'):
+            return self.k
+        self.k = pd.read_parquet(self.conf['routing_params_file'], columns=['k', ]).values.flatten()
+        return self.k
 
-    def _read_x(self) -> np.array:
+    def _read_linear_x(self) -> np.array:
         """
         Reads X vector from parquet given in config file
         """
-        return pd.read_parquet(self.conf['routing_params_file'], columns=['x', ]).values.flatten()
-
-    def _read_qinit(self) -> np.array:
-        if hasattr(self, 'qinit'):
-            return self.qinit
-
-        qinit = self.conf.get('qinit_file', None)
-        if qinit is None or qinit == '':
-            return np.zeros(self.A.shape[0])
-        return pd.read_parquet(self.conf['qinit_file']).values.flatten()
-
-    def _read_rinit(self) -> np.array:
-        if hasattr(self, 'rinit'):
-            return self.rinit
-
-        rinit = self.conf.get('rinit_file', None)
-        if rinit is None or rinit == '':
-            return np.zeros(self.A.shape[0])
-        return pd.read_parquet(self.conf['rinit_file']).values.flatten()
+        if hasattr(self, 'x'):
+            return self.x
+        self.x = pd.read_parquet(self.conf['routing_params_file'], columns=['x', ]).values.flatten()
+        return self.x
+
+    def _read_initial_state(self) -> (np.array, np.array):
+        if hasattr(self, 'initial_state'):
+            return self.initial_state
+
+        state_file = self.conf.get('initial_state_file', '')
+        if state_file == '':
+            LOG.debug('Setting initial state to zeros')
+            return np.zeros(self.A.shape[0]), np.zeros(self.A.shape[0])
+        if not os.path.exists(state_file):
+            raise FileNotFoundError(f'Initial state file not found at: {state_file}')
+        LOG.debug('Reading Initial State from Parquet')
+        initial_state = pd.read_parquet(state_file).values
+        self.initial_state = (initial_state[:, 0].flatten(), initial_state[:, 1].flatten())
+        return self.initial_state
+
+    def _write_final_state(self) -> None:
+        final_state_file = self.conf.get('final_state_file', '')
+        if final_state_file == '':
+            return
+        LOG.debug('Writing Final State to Parquet')
+        q_final, r_final = self.initial_state
+        pd.DataFrame({'Q': q_final, 'R': r_final}).to_parquet(final_state_file)
 
     def _get_digraph(self) -> nx.DiGraph:
         """
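
The two-column state file replaces the four separate q/r init/final files; a sketch of the round trip implied by `_write_final_state` and `_read_initial_state` (river count and path hypothetical):

```python
import numpy as np
import pandas as pd

n_rivers = 4
# Written the way _write_final_state does it: columns Q and R, one row per river.
pd.DataFrame({'Q': np.zeros(n_rivers), 'R': np.zeros(n_rivers)}).to_parquet('state.parquet')

# Read back the way _read_initial_state does it: column 0 -> Q init, column 1 -> R init.
values = pd.read_parquet('state.parquet').values
q_init, r_init = values[:, 0].flatten(), values[:, 1].flatten()
```
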
@@ -194,66 +217,33 @@ def _set_adjacency_matrix(self) -> None:
             return
 
         if os.path.exists(self.conf.get('adj_file', '')):
-            LOG.info('Loading adjacency matrix from file')
+            LOG.debug('Loading adjacency matrix from file')
             self.A = scipy.sparse.load_npz(self.conf['adj_file'])
             return
 
-        LOG.info('Calculating Network Adjacency Matrix (A)')
+        LOG.debug('Calculating Network Adjacency Matrix (A)')
         self.A = connectivity_to_adjacency_matrix(self.conf['connectivity_file'])
         if self.conf.get('adj_file', ''):
             LOG.info('Saving adjacency matrix to file')
             scipy.sparse.save_npz(self.conf['adj_file'], self.A)
         return
 
-    def _set_lhs_matrix(self) -> None:
-        """
-        Calculate the LHS matrix for the routing problem
-        """
-        if hasattr(self, 'lhs'):
-            return
-
-        if os.path.exists(self.conf.get('lhs_file', '')):
-            LOG.info('Loading LHS matrix from file')
-            self.lhs = scipy.sparse.load_npz(self.conf['lhs_file'])
-            return
-
-        LOG.info('Calculating LHS Matrix')
-        self.lhs = scipy.sparse.eye(self.A.shape[0]) - scipy.sparse.diags(self.c2) @ self.A
+    def _calculate_lhs_matrix(self) -> None:
+        self.lhs = scipy.sparse.eye(self.A.shape[0]) - (scipy.sparse.diags(self.c2) @ self.A)
         self.lhs = self.lhs.tocsc()
-        if self.conf.get('lhs_file', ''):
-            LOG.info('Saving LHS matrix to file')
-            scipy.sparse.save_npz(self.conf['lhs_file'], self.lhs)
-        return
-
-    def _set_lhs_inv_matrix(self) -> None:
-        """
-        Calculate the LHS matrix for the routing problem
-        """
-        if hasattr(self, 'lhsinv'):
-            return
-
-        if os.path.exists(self.conf.get('lhsinv_file', '')):
-            LOG.info('Loading LHS Inverse matrix from file')
-            self.lhsinv = scipy.sparse.load_npz(self.conf['lhsinv_file'])
-            return
-
-        self._set_lhs_matrix()
-        LOG.info('Inverting LHS Matrix')
-        self.lhsinv = scipy.sparse.csc_matrix(scipy.sparse.linalg.inv(self.lhs))
-        if self.conf.get('lhsinv_file', ''):
-            LOG.info('Saving LHS Inverse matrix to file')
-            scipy.sparse.save_npz(self.conf['lhsinv_file'], self.lhsinv)
         return
 
     def _set_time_params(self, dates: np.array):
         """
         Set time parameters for the simulation
         """
-        LOG.info('Setting and validating time parameters')
-        self.dt_routing = self.conf['dt_routing']
+        LOG.debug('Setting and validating time parameters')
         self.dt_runoff = self.conf.get('dt_runoff', (dates[1] - dates[0]).astype('timedelta64[s]').astype(int))
         self.dt_outflow = self.conf.get('dt_outflow', self.dt_runoff)
         self.dt_total = self.conf.get('dt_total', self.dt_runoff * dates.shape[0])
+        if not self.conf.get('dt_routing', 0):
+            LOG.warning('dt_routing was not provided or is 0/None, defaulting to dt_runoff')
+        self.dt_routing = self.conf.get('dt_routing', 0) or self.dt_runoff
 
         try:
             # check that time options have the correct sizes
@@ -261,7 +251,6 @@ def _set_time_params(self, dates: np.array):
             assert self.dt_total >= self.dt_outflow, 'dt_total !>= dt_outflow'
             assert self.dt_outflow >= self.dt_runoff, 'dt_outflow !>= dt_runoff'
             assert self.dt_runoff >= self.dt_routing, 'dt_runoff !>= dt_routing'
-
             # check that time options are evenly divisible
             assert self.dt_total % self.dt_runoff == 0, 'dt_total must be an integer multiple of dt_runoff'
             assert self.dt_total % self.dt_outflow == 0, 'dt_total must be an integer multiple of dt_outflow'
@@ -277,28 +266,49 @@ def _set_time_params(self, dates: np.array):
         self.num_routing_steps = int(self.dt_total / self.dt_routing)
         return
 
-    def _calculate_muskingum_coefficients(self) -> None:
+    def _calculate_muskingum_coefficients(self, k: np.ndarray = None, x: np.ndarray = None) -> None:
         """
-        Calculate the 3 Muskingum Cunge routing coefficients for each segment using given k and x
+        Calculate the 3 MuskingumCunge routing coefficients for each segment using given k and x
         """
-        LOG.info('Calculating Muskingum coefficients')
+        LOG.debug('Calculating MuskingumCunge coefficients')
+
+        if k is None:
+            k = self._read_linear_k()
+        if x is None:
+            x = self._read_linear_x()
+
+        dt_div_k = self.dt_routing / k
+        denom = dt_div_k + (2 * (1 - x))
+        _2x = 2 * x
+        self.c1 = (dt_div_k + _2x) / denom
+        self.c2 = (dt_div_k - _2x) / denom
+        self.c3 = ((2 * (1 - x)) - dt_div_k) / denom
+
+        # sum of coefficients should be 1 (or arbitrarily close) for all segments
+        assert np.allclose(self.c1 + self.c2 + self.c3, 1), 'MuskingumCunge coefficients do not sum to 1'
+        return
 
-        k = self._read_k()
-        x = self._read_x()
+    def _calculate_nonlinear_coefficients(self, q_t: np.array) -> None:
+        if not hasattr(self, 'nonlinear_thresholds'):
+            self.nonlinear_thresholds = pd.read_parquet(self.conf['nonlinear_thresholds_file'])
+        threshold_columns = sorted(self.nonlinear_thresholds.columns)
+        if not np.any(q_t > self.nonlinear_thresholds[threshold_columns[0]]):
+            return
+
+        if not hasattr(self, 'nonlinear_k_table'):
+            self.nonlinear_k_table = pd.read_parquet(self.conf['nonlinear_routing_params_file'])
 
-        dt_route_half = self.conf['dt_routing'] / 2
-        kx = k * x
-        denom = k - kx + dt_route_half
+        k = self._read_linear_k().copy()
 
-        self.c1 = (dt_route_half - kx) / denom
-        self.c2 = (dt_route_half + kx) / denom
-        self.c3 = (k - kx - dt_route_half) / denom
+        for threshold in threshold_columns:  # small to large
+            values_to_change = (q_t > self.nonlinear_thresholds[threshold]).values
+            k[values_to_change] = self.nonlinear_k_table[f'k_{threshold}'][values_to_change]
 
-        # sum of muskingum coefficients should be 1 for all segments
-        assert np.allclose(self.c1 + self.c2 + self.c3, 1), 'Muskingum coefficients do not approximately sum to 1'
+        self._calculate_muskingum_coefficients(k=k)
+        self._calculate_lhs_matrix()
         return
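
The rewritten coefficients divide by k up front: c1 = (dt/k + 2x) / d, c2 = (dt/k - 2x) / d, c3 = (2(1 - x) - dt/k) / d, with d = dt/k + 2(1 - x), so the three sum to exactly 1 for any k > 0. A quick numeric check with made-up k and x values:

```python
import numpy as np

dt = 3600.0                      # routing timestep in seconds (hypothetical)
k = np.array([7200.0, 5400.0])   # travel time = length / velocity, seconds (hypothetical)
x = np.array([0.1, 0.25])

dt_div_k = dt / k
denom = dt_div_k + (2 * (1 - x))
c1 = (dt_div_k + 2 * x) / denom
c2 = (dt_div_k - 2 * x) / denom
c3 = ((2 * (1 - x)) - dt_div_k) / denom
assert np.allclose(c1 + c2 + c3, 1)  # the identity the class asserts
```
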
 
-    def route(self, **kwargs) -> 'Muskingum':
+    def route(self, **kwargs) -> 'MuskingumCunge':
         """
         Performs time-iterative runoff routing through the river network
 
@@ -306,7 +316,7 @@ def route(self, **kwargs) -> 'Muskingum':
             **kwargs: optional keyword arguments to override and update previously calculated or given configs
 
         Returns:
-            river_route.Muskingum
+            river_route.MuskingumCunge
         """
         LOG.info(f'Beginning routing: {self.conf["job_name"]}')
         t1 = datetime.datetime.now()
@@ -318,27 +328,22 @@ def route(self, **kwargs) -> 'Muskingum':
         self._validate_configs()
         self._log_configs()
         self._set_adjacency_matrix()
-        self._calculate_muskingum_coefficients()
+        LOG.debug('Getting initial value arrays')
 
         for runoff_file, outflow_file in zip(self.conf['runoff_file'], self.conf['outflow_file']):
+            LOG.info('-' * 80)
             LOG.info(f'Reading runoff volumes file: {runoff_file}')
             with xr.open_dataset(runoff_file) as runoff_ds:
-                LOG.info('Reading time array')
+                LOG.debug('Reading time array')
                 dates = runoff_ds['time'].values.astype('datetime64[s]')
-                LOG.info('Reading runoff array')
+                LOG.debug('Reading runoff array')
                 runoffs = runoff_ds[self.conf['runoff_volume_var']].values
 
             self._set_time_params(dates)
+            self._calculate_muskingum_coefficients()
             runoffs = runoffs / self.dt_runoff  # convert volume -> volume/time
-
-            LOG.debug('Getting initial value arrays')
-            q_t = self._read_qinit()
-            r_t = self._read_rinit()
-            inflow_t = (self.A @ q_t) + r_t
-
-            # begin the analytical solution
-            self._set_lhs_inv_matrix()
-            outflow_array = self._analytical_solution(dates, runoffs, q_t, r_t, inflow_t)
+            q_t, r_t = self._read_initial_state()
+            outflow_array = self._router(dates, runoffs, q_t, r_t)
 
             if self.dt_outflow > self.dt_runoff:
                 LOG.info('Resampling dates and outflows to specified timestep')
@@ -357,27 +362,25 @@ def route(self, **kwargs) -> 'Muskingum':
             self._write_outflows(outflow_file, dates, outflow_array)
 
         # write the final state to disc
-        if self.conf.get('qfinal_file', False):
-            LOG.info('Writing Qfinal parquet')
-            pd.DataFrame(self.qinit, columns=['Q', ]).astype(float).to_parquet(self.conf['qfinal_file'])
-        if self.conf.get('rfinal_file', False):
-            LOG.info('Writing Rfinal parquet')
-            pd.DataFrame(self.rinit, columns=['R', ]).astype(float).to_parquet(self.conf['rfinal_file'])
+        self._write_final_state()
 
         t2 = datetime.datetime.now()
         LOG.info('All runoff files routed')
         LOG.info(f'Total job time: {(t2 - t1).total_seconds()}')
         return self
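
Because `route()` applies keyword overrides before validating, and runoff/outflow files are zipped pairwise, one configured instance can route several files in sequence; a usage sketch (paths are placeholders):

```python
import river_route as rr

m = rr.MuskingumCunge('/path/to/config.yaml')
# kwargs passed to route() update the configs before validation
m.route(runoff_file=['runoff_2020.nc', 'runoff_2021.nc'],
        outflow_file=['Qout_2020.nc', 'Qout_2021.nc'])
```
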
 
-    def _analytical_solution(self,
-                             dates: np.array,
-                             runoffs: np.array,
-                             q_t: np.array,
-                             r_t: np.array,
-                             inflow_t: np.array, ) -> np.array:
+    def _router(self,
+                dates: np.array,
+                runoffs: np.array,
+                q_init: np.array,
+                r_init: np.array, ) -> np.array:
+        # set initial values
+        self._calculate_lhs_matrix()
         outflow_array = np.zeros((runoffs.shape[0], self.A.shape[0]))
-
-        force_min_max = self.conf['min_q'] or self.conf['max_q']
+        q_t = np.zeros_like(q_init)
+        q_t[:] = q_init
+        r_prev = np.zeros_like(r_init)
+        r_prev[:] = r_init
 
         LOG.info('Performing routing computation iterations')
         t1 = datetime.datetime.now()
@@ -386,28 +389,32 @@ def _analytical_solution(self,
         for runoff_time_step, runoff_end_date in enumerate(dates):
             r_t = runoffs[runoff_time_step, :]
             interval_flows = np.zeros((self.num_routing_steps_per_runoff, self.A.shape[0]))
-            for routing_substep_iteration in range(self.num_routing_steps_per_runoff):
-                inflow_tnext = (self.A @ q_t) + r_t
-                q_t = self.lhsinv @ ((self.c1 * inflow_t) + (self.c2 * r_t) + (self.c3 * q_t))
-                q_t = q_t if not force_min_max else np.clip(q_t, a_min=self.conf['min_q'], a_max=self.conf['max_q'])
-                interval_flows[routing_substep_iteration, :] = q_t
-                inflow_t = inflow_tnext
-            interval_flows = np.mean(interval_flows, axis=0)
-            interval_flows = np.round(interval_flows, decimals=2)
-            outflow_array[runoff_time_step, :] = interval_flows
+            for routing_sub_iteration_num in range(self.num_routing_steps_per_runoff):
+                if self.conf['routing'] == 'nonlinear':
+                    self._calculate_nonlinear_coefficients(q_t)
+                rhs = (self.c1 * ((self.A @ q_t) + r_prev)) + (self.c2 * r_t) + (self.c3 * q_t)
+                q_t[:] = self._solver(rhs)
+                interval_flows[routing_sub_iteration_num, :] = q_t
+            outflow_array[runoff_time_step, :] = np.mean(interval_flows, axis=0)
+            r_prev[:] = r_t
+
+        if self.conf['positive_flow']:
+            outflow_array[outflow_array < 0] = 0
 
         t2 = datetime.datetime.now()
         LOG.info(f'Routing completed in {(t2 - t1).total_seconds()} seconds')
-
-        self.qinit = q_t
-        self.rinit = r_t
+        self.initial_state = (q_t, r_prev)
 
         return outflow_array
 
+    def _solver(self, rhs: np.array) -> np.array:
+        return scipy.sparse.linalg.cgs(self.lhs, rhs, x0=rhs, )[0]
+
     def _write_outflows(self, outflow_file: str, dates: np.array, outflow_array: np.array) -> None:
         reference_date = datetime.datetime.fromtimestamp(dates[0].astype(int), tz=datetime.timezone.utc)
         dates = dates[::self.num_runoff_steps_per_outflow].astype('datetime64[s]')
         dates = dates - dates[0]
 
-        with nc.Dataset(outflow_file, mode='w') as ds:
+        with nc.Dataset(outflow_file, mode='w', format='NETCDF4') as ds:
             ds.createDimension('time', size=dates.shape[0])
             ds.createDimension('rivid', size=self.A.shape[0])
 
@@ -416,7 +423,7 @@ def _write_outflows(self, outflow_file: str, dates: np.array, outflow_array: np.array) -> None:
             ds['time'][:] = dates
 
             ds.createVariable('rivid', 'i4', ('rivid',))
-            ds['rivid'][:] = self._read_riverids()
+            ds['rivid'][:] = self._read_river_ids()
 
             ds.createVariable('Qout', 'f4', ('time', 'rivid'))
             ds['Qout'][:] = outflow_array
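
Replacing the cached explicit inverse with an iterative solve avoids forming lhs^-1, which is generally dense even when lhs is sparse. A self-contained sketch of the same `cgs` call pattern on a toy three-segment chain (all values invented):

```python
import numpy as np
import scipy.sparse
import scipy.sparse.linalg

# A tiny stand-in for the routing LHS: identity minus a weighted adjacency matrix.
A = scipy.sparse.csc_matrix(np.array([[0, 0, 0], [1, 0, 0], [0, 1, 0]], dtype=float))
c2 = np.array([0.3, 0.4, 0.2])
lhs = (scipy.sparse.eye(3) - scipy.sparse.diags(c2) @ A).tocsc()

rhs = np.array([1.0, 2.0, 3.0])
q, info = scipy.sparse.linalg.cgs(lhs, rhs, x0=rhs)  # x0=rhs seeds the iteration, as in _solver
# info == 0 indicates the iteration converged
```
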
@@ -426,19 +433,19 @@ def _write_outflows(self, outflow_file: str, dates: np.array, outflow_array: np.array) -> None:
             ds['Qout'].units = 'm3 s-1'
         return
 
-    def hydrograph(self, rivid: int) -> pd.DataFrame:
+    def hydrograph(self, river_id: int) -> pd.DataFrame:
         """
         Get the hydrograph for a given river id as a pandas dataframe
 
         Args:
-            rivid: the ID of a river reach in the output files
+            river_id: the ID of a river reach in the output files
 
         Returns:
             pandas.DataFrame
         """
         with xr.open_mfdataset(self.conf['outflow_file']) as ds:
-            df = ds.Qout.sel(rivid=rivid).to_dataframe()[['Qout', ]]
-            df.columns = [rivid, ]
+            df = ds.Qout.sel(rivid=river_id).to_dataframe()[['Qout', ]]
+            df.columns = [river_id, ]
         return df
 
     def mass_balance(self, rivid: int, ancestors: list = None) -> pd.DataFrame:
@@ -465,8 +472,7 @@ def mass_balance(self, rivid: int, ancestors: list = None) -> pd.DataFrame:
                 .to_dataframe()
                 [['m3_riv', ]]
                 .reset_index()
-                .set_index('time')
-                .pivot(columns='rivid', values='m3_riv')
+                .pivot(index='time', columns='rivid', values='m3_riv')
                 .sum(axis=1)
                 .cumsum()
                 .rename('runoff_volume')
diff --git a/river_route/__init__.py b/river_route/__init__.py
index 720ded2..acd51f3 100644
--- a/river_route/__init__.py
+++ b/river_route/__init__.py
@@ -1,6 +1,6 @@
-from river_route._muskingum import Muskingum
+from river_route._MuskingumCunge import MuskingumCunge
 
-from river_route.tools import configs_from_rapid
+from river_route.tools import routing_files_from_RAPID
 from river_route.tools import connectivity_to_digraph
 from river_route.tools import connectivity_to_adjacency_matrix
 
@@ -9,9 +9,9 @@ from ._meta import __url__
 
 __all__ = [
-    'Muskingum',
+    'MuskingumCunge',
 
-    'configs_from_rapid',
+    'routing_files_from_RAPID',
     'connectivity_to_digraph',
     'connectivity_to_adjacency_matrix',
diff --git a/river_route/_cli.py b/river_route/_cli.py
index c5d240a..71da019 100644
--- a/river_route/_cli.py
+++ b/river_route/_cli.py
@@ -54,5 +54,5 @@ def main():
     kwargs = vars(args)
     kwargs.pop('config')
 
-    rr.Muskingum(args.config, **kwargs).route()
+    rr.MuskingumCunge(args.config, **kwargs).route()
     return
diff --git a/river_route/_meta.py b/river_route/_meta.py
index d914541..c9605fc 100644
--- a/river_route/_meta.py
+++ b/river_route/_meta.py
@@ -1,3 +1,3 @@
-__version__ = '0.5.0'
+__version__ = '0.6.0'
 __author__ = 'Riley Hales PhD'
 __url__ = 'https://github.com/rileyhales/river-route'
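
After the rename, the public accessor takes `river_id` and returns a single-column DataFrame labeled by that id; a usage sketch (config path and id are placeholders):

```python
import river_route as rr

m = rr.MuskingumCunge('/path/to/config.yaml')
m.route()
df = m.hydrograph(river_id=12345)  # one column of Qout values, named by the river id
print(df.head())
```
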
diff --git a/river_route/jobs.py b/river_route/jobs.py
new file mode 100644
index 0000000..1a4b7ac
--- /dev/null
+++ b/river_route/jobs.py
@@ -0,0 +1,135 @@
+from typing import Any, Dict, List, Union
+import json
+from os.path import join, basename, isdir
+import glob
+from natsort import natsorted
+import pandas as pd
+
+__all__ = [
+    'job_configs',
+    'job_files_from_directories',
+]
+
+
+def job_configs(
+        routing_params_file: str,
+        connectivity_file: str,
+        adj_file: str,
+        runoff_file: Union[str, List[str]],
+        outflow_file: Union[str, List[str]],
+        dt_routing: int = 0,
+        dt_outflows: int = 0,
+        positive_flow: bool = True,
+        routing: str = 'linear',
+        nonlinear_routing_params_file: str = '',
+        nonlinear_thresholds_file: str = '',
+        initial_state_file: str = '',
+        final_state_file: str = '',
+        log: bool = True,
+        log_stream: str = 'stdout',
+        log_level: str = 'INFO',
+        job_name: str = 'untitled_job',
+        progress_bar: bool = True,
+) -> Dict[str, Any]:
+    """
+    Constructs a dictionary with all necessary parameters for a river routing job
+
+    Args:
+        routing_params_file: (string), Path to the routing parameters file.
+        connectivity_file: (string), Path to the network connectivity file.
+        adj_file: (string), Path where the adjacency matrix should be cached.
+        runoff_file: (string or list of strings), Path(s) to the file(s) with runoff values to be routed.
+        outflow_file: (string or list of strings), Path(s) where the outflow file(s) should be saved.
+        dt_routing: (int), Time interval in seconds between routing computations.
+        dt_outflows: (int), Time interval in seconds between writing flows to disc.
+        positive_flow: (boolean), Whether to enforce positive flows.
+        routing: (string), Routing method to use: either 'linear' or 'nonlinear'.
+        nonlinear_routing_params_file: (string), Path to the nonlinear routing parameters file.
+        nonlinear_thresholds_file: (string), Path to the nonlinear routing thresholds file.
+        initial_state_file: (string), Path to the file with initial state values.
+        final_state_file: (string), Path to the file with final state values.
+        log: (boolean), Whether to display log messages. Default: True.
+        log_stream: (string), The destination for logged messages: stdout, stderr, or a file path. Default: stdout.
+        log_level: (string), Level of logging: either 'debug', 'info', 'warning', 'error', or 'critical'.
+        job_name: (string), A name for this job to be printed in debug statements.
+        progress_bar: (boolean), Whether to show a progress bar in debug statements.
+
+    Returns:
+        (dict), A dictionary with parameters to be passed to the MuskingumCunge class init.
+    """
+    if routing == 'nonlinear':
+        assert nonlinear_routing_params_file and nonlinear_thresholds_file, \
+            'Nonlinear routing requires a nonlinear routing parameters file and a nonlinear thresholds file'
+    return {
+        # Required Watershed Files
+        'routing_params_file': routing_params_file,
+        'connectivity_file': connectivity_file,
+        'adj_file': adj_file,
+        # Routing Input and Output
+        'runoff_file': runoff_file,
+        'outflow_file': outflow_file,
+        # Compute Options - Optional
+        'positive_flow': positive_flow,
+        'dt_routing': dt_routing,
+        'dt_outflows': dt_outflows,
+        # Routing Method - Optional
+        'routing': routing,
+        'nonlinear_routing_params_file': nonlinear_routing_params_file,
+        'nonlinear_thresholds_file': nonlinear_thresholds_file,
+        # initial and final state files - Optional
+        'initial_state_file': initial_state_file,
+        'final_state_file': final_state_file,
+        # simulation management and debugging - Optional
+        'log': log,
+        'progress_bar': progress_bar,
+        'log_level': log_level,
+        'log_stream': log_stream,
+        'job_name': job_name,
+    }
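+
+# Example (hypothetical paths): build one config dict and hand it to the router:
+#     import river_route
+#     config = job_configs(
+#         routing_params_file='params.parquet',
+#         connectivity_file='connectivity.parquet',
+#         adj_file='adj.npz',
+#         runoff_file='runoff.nc',
+#         outflow_file='Qout.nc',
+#     )
+#     river_route.MuskingumCunge(**config).route()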
+
+
+def job_files_from_directories(
+        vpu_dir: str, inflows_dir: str, jobs_dir: str, outputs_dir: str, states_dir: str, logs_dir: str = None,
+        initial_state_file: str = None, sim_type: str = 'sequential', dt_routing: int = 0, dt_outflows: int = 0,
+        **kwargs
+) -> List[Dict[str, Any]]:
+    """
+    Generate river-route job files for all VPUs and for all runoff files in given directories
+    """
+    vpus = natsorted(glob.glob(join(vpu_dir, '*')))
+    vpus = [basename(x) for x in vpus if isdir(x)]
+
+    jobs = []
+
+    for vpu in vpus:
+        routing_params_file = join(vpu_dir, vpu, 'params.parquet')
+        connectivity_file = join(vpu_dir, vpu, 'connectivity.parquet')
+        adj_file = join(vpu_dir, vpu, 'adj.npz')
+        runvol_files = natsorted(glob.glob(join(inflows_dir, vpu, '*.nc')))
+        output_files = [join(outputs_dir, basename(f).replace('m3_', 'Qout_')) for f in runvol_files]
+
+        if sim_type == 'sequential':
+            start_date = pd.to_datetime(basename(runvol_files[0]).split('_')[2], format='%Y%m%d').strftime('%Y%m%d')
+            final_state = join(states_dir, vpu, f'finalstate_{vpu}_{start_date}.parquet')
+            jobs.append(
+                job_configs(routing_params_file=routing_params_file, connectivity_file=connectivity_file,
+                            adj_file=adj_file, runoff_file=runvol_files, outflow_file=output_files,
+                            dt_routing=dt_routing, dt_outflows=dt_outflows, initial_state_file=initial_state_file,
+                            final_state_file=final_state, job_name=f'job_{vpu}_{start_date}_{sim_type}', **kwargs)
+            )
+
+        elif sim_type == 'ensemble':
+            for runvol_file, output_file in zip(runvol_files, output_files):
+                start_date = pd.to_datetime(basename(runvol_file).split('_')[2], format='%Y%m%d').strftime('%Y%m%d')
+                final_state = join(states_dir, vpu, f'finalstate_{vpu}_{start_date}.parquet')
+                jobs.append(
+                    job_configs(routing_params_file=routing_params_file, connectivity_file=connectivity_file,
+                                adj_file=adj_file, runoff_file=runvol_file, outflow_file=output_file,
+                                dt_routing=dt_routing, dt_outflows=dt_outflows, initial_state_file=initial_state_file,
+                                final_state_file=final_state, job_name=f'job_{vpu}_{start_date}_{sim_type}', **kwargs)
+                )
+
+    for job in jobs:
+        with open(join(jobs_dir, f'{job["job_name"]}.json'), 'w') as f:
+            json.dump(job, f)
+
+    return jobs
diff --git a/river_route/tools.py b/river_route/tools.py
index 8f6c3b6..82028ee 100644
--- a/river_route/tools.py
+++ b/river_route/tools.py
@@ -8,18 +8,18 @@
 logger = logging.getLogger(__name__)
 
 __all__ = [
-    'configs_from_rapid',
+    'routing_files_from_RAPID',
    'connectivity_to_digraph',
     'connectivity_to_adjacency_matrix',
 ]
 
 
-def configs_from_rapid(riv_bas_id: str,
-                       k: str,
-                       x: str,
-                       rapid_connect: str,
-                       out_params: str,
-                       out_connectivity: str, ) -> None:
+def routing_files_from_RAPID(riv_bas_id: str,
+                             k: str,
+                             x: str,
+                             rapid_connect: str,
+                             out_params: str,
+                             out_connectivity: str, ) -> None:
     """
     Generate river-route configuration files from input files for RAPID
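
A hypothetical end-to-end driver for the new job utilities (directory names invented; the per-VPU layout is assumed from the glob patterns in `job_files_from_directories`):

```python
import river_route as rr
from river_route.jobs import job_files_from_directories

# Writes one JSON config per VPU (sequential mode) into jobs_dir and returns the dicts.
jobs = job_files_from_directories(
    vpu_dir='configs/',      # one subdirectory per VPU holding params.parquet, connectivity.parquet
    inflows_dir='inflows/',  # per-VPU subdirectories of m3_*.nc runoff files
    jobs_dir='jobs/',
    outputs_dir='outputs/',
    states_dir='states/',
    sim_type='sequential',
)
for job in jobs:
    rr.MuskingumCunge(**job).route()
```
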