From 3062ce3dcd239450bb9f6c1c1f9ee754dee19bf3 Mon Sep 17 00:00:00 2001
From: mdtanker
Date: Thu, 18 Jul 2024 15:08:52 -0600
Subject: [PATCH] refactor: deprecate `cross_validation.grav_optimal_parameter`
 in favor of new Optuna-based function
 `optimization.optimize_inversion_damping`

BREAKING CHANGE: please switch to the new function
---
 src/invert4geom/cross_validation.py |  29 +++-
 src/invert4geom/inversion.py        |  72 ++++----
 src/invert4geom/optimization.py     | 260 ++++++++++++++++++++--------
 tests/test_cross_validation.py      |   4 +
 4 files changed, 252 insertions(+), 113 deletions(-)

diff --git a/src/invert4geom/cross_validation.py b/src/invert4geom/cross_validation.py
index c2408749..b5dc5f7f 100644
--- a/src/invert4geom/cross_validation.py
+++ b/src/invert4geom/cross_validation.py
@@ -7,6 +7,7 @@
 import random
 import typing
 
+import deprecation
 import numpy as np
 import pandas as pd
 import verde as vd
@@ -208,6 +209,16 @@ def grav_cv_score(
     return score
 
 
+@deprecation.deprecated(  # type: ignore[misc]
+    deprecated_in="0.8.0",
+    removed_in="0.14.0",
+    current_version=invert4geom.__version__,
+    details=(
+        "Use the new function `optimization.optimize_inversion_damping()` "
+        "instead, which uses Optuna for optimization. If you would still like to "
+        "conduct a grid search, set `grid_search=True` in the new function."
+    ),
+)
 def grav_optimal_parameter(
     training_data: pd.DataFrame,
     testing_data: pd.DataFrame,
@@ -274,11 +285,17 @@ def grav_optimal_parameter(
 
     # run inversions and collect scores
     scores = []
-    pbar = tqdm(
-        param_values,
-        desc=f"{param_name} values",
-        disable=not progressbar,
-    )
+    if progressbar is True:
+        pbar = tqdm(
+            param_values,
+            desc=f"{param_name} values",
+        )
+    elif progressbar is False:
+        pbar = param_values
+    else:
+        msg = "progressbar must be a boolean"  # type: ignore[unreachable]
+        raise ValueError(msg)
+
     for i, value in enumerate(pbar):
         # update parameter value in kwargs
         kwargs[param_name] = value
@@ -300,7 +317,7 @@ def grav_optimal_parameter(
         )
         logging.warning(msg)
     if verbose:
-        # set Python's logging level to get information about the inversion\s progress
+        # set Python's logging level to get information about the inversion's progress
         logger = logging.getLogger()
         logger.setLevel(logging.INFO)
diff --git a/src/invert4geom/inversion.py b/src/invert4geom/inversion.py
index 7f40ad8d..c1d9ae1b 100644
--- a/src/invert4geom/inversion.py
+++ b/src/invert4geom/inversion.py
@@ -1133,7 +1133,7 @@ def run_inversion_workflow(  # equivalent to monte_carlo_full_workflow
         in grav_df, if True, must provide`regional_grav_kwargs`, by default False
     run_damping_cv : bool, optional
         Choose whether to run cross validation for damping, if True, must supplied
-        damping values with kwarg `damping_values`, by default False
+        damping limits with kwarg `damping_limits`, by default False
     run_zref_or_density_cv : bool, optional
         Choose whether to run cross validation for zref or density, if True, must
         provide zref values, density values, or both with kwargs `zref_values` or `
@@ -1313,7 +1313,7 @@ def run_inversion_workflow(  # equivalent to monte_carlo_full_workflow
     if calculate_gravity_misfit is False:
         if "misfit" not in grav_df:
             msg = (
-                "'misfit' must be a column of `grav_df` if calculate_starting_misfit"
+                "'misfit' must be a column of `grav_df` if calculate_gravity_misfit"
                 " is False"
             )
             raise ValueError(msg)
@@ -1330,7 +1330,7 @@ def run_inversion_workflow(  # equivalent to monte_carlo_full_workflow
     # Regional Component of Misfit
     if calculate_regional_misfit is False:
-        if "reg" not in grav_df:
+        if ("reg" not in grav_df) & (run_zref_or_density_cv is False):
             msg = (
                 "'reg' must be a column of `grav_df` if calculate_regional_misfit is"
                 " False"
             )
@@ -1345,7 +1345,7 @@ def run_inversion_workflow(  # equivalent to monte_carlo_full_workflow
                 " calculate_regional_misfit is True"
             )
             logging.warning(msg)
-        regional_grav_kwargs = kwargs.get("regional_grav_kwargs", None).copy()
+        regional_grav_kwargs = kwargs.get("regional_grav_kwargs", None)
         if regional_grav_kwargs is None:
             msg = (
                 "regional_grav_kwargs must be provided if calculate_regional_misfit"
@@ -1360,8 +1360,6 @@ def run_inversion_workflow(  # equivalent to monte_carlo_full_workflow
             **regional_grav_kwargs,
         )
 
-    grav_df["res"] = grav_df["misfit"] - grav_df["reg"]
-
     inversion_kwargs = {
         key: value
         for key, value in kwargs.items()
@@ -1374,17 +1372,24 @@ def run_inversion_workflow(  # equivalent to monte_carlo_full_workflow
                 "starting_grav_kwargs",
                 "regional_grav_kwargs",
                 "grav_spacing",
-                "damping_values",
-                "zref_values",
-                "density_contrast_values",
+                "damping_limits",
+                "zref_limits",
+                "density_contrast_limits",
                 "constraints_df",
                 "inversion_region",
+                "damping_cv_fname",
+                "damping_cv_trials",
+                "zref_density_cv_trials",
+                "zref_density_cv_fname",
+                "score_as_median",
             ]
         }
 
     # run only the inversion with specified damping, density, and zref values
     if (run_damping_cv is False) & (run_zref_or_density_cv is False):
-        return run_inversion(
+        grav_df["res"] = grav_df["misfit"] - grav_df["reg"]
+
+        inversion_results = run_inversion(
             grav_df=grav_df,
             prism_layer=starting_prisms,
             progressbar=False,
@@ -1399,38 +1404,33 @@ def run_inversion_workflow(  # equivalent to monte_carlo_full_workflow
         )
         return inversion_results
 
     if run_damping_cv is True:
-        # set logging level
-        logger = logging.getLogger()
-        logger.setLevel(logging.WARNING)
+        grav_df["res"] = grav_df["misfit"] - grav_df["reg"]
 
         # set which damping parameters to include
-        damping_values = kwargs.get("damping_values", None)
-
-        inv_results, best_damping, _, _, scores = (
-            cross_validation.grav_optimal_parameter(
-                training_data=grav_df[grav_df.test == False],  # noqa: E712 pylint: disable=singleton-comparison
-                testing_data=grav_df[grav_df.test == True],  # noqa: E712 pylint: disable=singleton-comparison
-                param_to_test=("solver_damping", damping_values),
-                progressbar=True,
-                plot_grids=False,
-                plot_cv=False,
-                verbose=True,
-                prism_layer=starting_prisms,
-                **inversion_kwargs,
-            )
+        damping_limits = kwargs.get("damping_limits", (0.001, 1))
+        if damping_limits is None:
+            msg = "must provide damping_limits if run_damping_cv is True"
+            raise ValueError(msg)
+
+        study, inversion_results = optimization.optimize_inversion_damping(
+            training_df=grav_df[grav_df.test == False],  # noqa: E712 pylint: disable=singleton-comparison
+            testing_df=grav_df[grav_df.test == True],  # noqa: E712 pylint: disable=singleton-comparison
+            damping_limits=damping_limits,
+            n_trials=kwargs.get("damping_cv_trials", 20),
+            grid_search=kwargs.get("grid_search", False),
+            plot_grids=False,
+            fname=kwargs.get("damping_cv_fname", f"{fname}_damping_cv"),
+            prism_layer=starting_prisms,
+            score_as_median=kwargs.get("score_as_median", False),
+            plot_cv=plot_cv,
+            **inversion_kwargs,
         )
 
+        best_damping = study.best_params.get("damping")
+
         # use the best damping parameter
         inversion_kwargs["solver_damping"] = best_damping
 
-        if plot_cv is True:
-            plotting.plot_cv_scores(
-                scores,
-                damping_values,
-                param_name="Damping",
-                logx=True,
-                logy=True,
-            )
     if run_zref_or_density_cv is False:
         if fname is not None:
@@ -1474,3 +1474,5 @@ def run_inversion_workflow(  # equivalent to monte_carlo_full_workflow
         # save results to pickle
         with pathlib.Path(f"{fname}.pickle").open("wb") as f:
             pickle.dump(inversion_results, f)
+
+    return inversion_results
diff --git a/src/invert4geom/optimization.py b/src/invert4geom/optimization.py
index c287788f..11925406 100644
--- a/src/invert4geom/optimization.py
+++ b/src/invert4geom/optimization.py
@@ -323,40 +323,24 @@ def optuna_1job_per_core(
     )
 
 
-class OptimalEqSourceParams:
+class OptimalInversionDamping:
     """
-    a class for finding the optimal depth and damping parameters to best fit a set of
-    equivalent sources to the gravity data.
+    Objective function to use in an Optuna optimization for finding the optimal damping
+    regularization value for a gravity inversion. Used within function
+    `optimize_inversion_damping()`.
     """
 
     def __init__(
         self,
-        coordinates: tuple[
-            pd.Series | NDArray, pd.Series | NDArray, pd.Series | NDArray
-        ],
-        data: pd.Series | NDArray,
         damping_limits: tuple[float, float],
-        depth_limits: tuple[float, float],
+        fname: str,
+        plot_grids: bool = False,
         **kwargs: typing.Any,
     ) -> None:
-        """
-        Parameters
-        ----------
-        coordinates : tuple[ pd.Series | NDArray, pd.Series | NDArray,
-            pd.Series | NDArray ]
-            easting, northing, and upward coordinates of the data
-        data : pd.Series | NDArray
-            gravity values
-        damping_limits : tuple[float, float]
-            lower and upper bounds for the damping parameter
-        depth_limits : tuple[float, float]
-            lower and upper bounds for the depth of the sources
-        """
-        self.coordinates = coordinates
-        self.data = data
+        self.fname = fname
         self.damping_limits = damping_limits
-        self.depth_limits = depth_limits
         self.kwargs = kwargs
+        self.plot_grids = plot_grids
 
     def __call__(self, trial: optuna.trial) -> float:
         """
@@ -374,75 +358,207 @@ def __call__(self, trial: optuna.trial) -> float:
             "damping",
             self.damping_limits[0],
             self.damping_limits[1],
-        )
-        depth = trial.suggest_float(
-            "depth",
-            self.depth_limits[0],
-            self.depth_limits[1],
+            log=True,
         )
 
-        return utils.eq_sources_score(
-            params={"damping": damping, "depth": depth},
-            coordinates=self.coordinates,
-            data=self.data,
-            **self.kwargs,
+        new_kwargs = {
+            key: value
+            for key, value in self.kwargs.items()
+            if key
+            not in [
+                "solver_damping",
+                "progressbar",
+                "results_fname",
+            ]
+        }
+
+        trial.set_user_attr("fname", f"{self.fname}_trial_{trial.number}")
+
+        return cross_validation.grav_cv_score(
+            solver_damping=damping,
+            progressbar=False,
+            results_fname=trial.user_attrs.get("fname"),
+            plot=self.plot_grids,
+            **new_kwargs,
         )
 
 
-def optimize_eq_source_params(
-    coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray, pd.Series | NDArray],
-    data: pd.Series | NDArray,
-    n_trials: int = 0,
-    damping_limits: tuple[float, float] = (0, 10**3),
-    depth_limits: tuple[float, float] = (0, 10e6),
+def optimize_inversion_damping(
+    training_df: pd.DataFrame,
+    testing_df: pd.DataFrame,
+    n_trials: int,
+    damping_limits: tuple[float, float],
+    score_as_median: bool = False,
     sampler: optuna.samplers.BaseSampler | None = None,
-    parallel: bool = False,
+    grid_search: bool = False,
     fname: str | None = None,
-    use_existing: bool = False,
-    plot: bool = False,
-    **eq_kwargs: typing.Any,
-) -> tuple[pd.DataFrame, hm.EquivalentSources]:
+    plot_cv: bool = True,
+    plot_grids: bool = False,
+    logx: bool = True,
+    logy: bool = True,
+    **kwargs: typing.Any,
+) -> tuple[
+    optuna.study, tuple[pd.DataFrame, pd.DataFrame, dict[str, typing.Any], float]
+]:
     """
-    find the best parameter values for fitting equivalent sources to a set of gravity
-    data.
+    Use Optuna to find the optimal damping regularization parameter for a gravity
+    inversion. The optimization aims to minimize the cross-validation score,
+    represented by the root mean (or median) squared error (RMSE), between the testing
+    gravity data and the predicted gravity data after an inversion. Follows methods of
+    :footcite:t:`uiedafast2017`.
+
+    Provide lower and upper damping limits and the number of trials to run, and choose
+    whether to let Optuna pick each trial's damping value or to perform a grid search.
+    The best inversion results and the study are saved to pickle files.
 
     Parameters
     ----------
-    coordinates : tuple[pd.Series | NDArray, pd.Series | NDArray,
-        pd.Series | NDArray]
-        easting, northing, and upwards coordinates of gravity data
-    data : pd.Series | NDArray
-        gravity data values
-    n_trials : int, optional
-        number of trials to perform / set of parameters to test, by default 0
-    damping_limits : tuple[float, float], optional
-        lower and upper bounds of damping parameter, by default (0, 10**3)
-    depth_limits : tuple[float, float], optional
-        lower and upper bounds of depth parameter, by default (0, 10e6)
+    training_df : pd.DataFrame
+        rows of the gravity data frame which are just the training data
+    testing_df : pd.DataFrame
+        rows of the gravity data frame which are just the testing data
+    n_trials : int
+        number of damping values to try
+    damping_limits : tuple[float, float]
+        lower and upper limits for the damping parameter
+    score_as_median : bool, optional
+        if True, changes the scoring from the root mean square to the root median
+        square, by default False
     sampler : optuna.samplers.BaseSampler | None, optional
-        type of sampler to use, by default None
-    parallel : bool, optional
-        if True, will run the trials in parallel, by default False
-    fname : str, optional
-        path and filename to save the study results, by default "tmp" with a random
-        number attached
-    use_existing : bool, optional
-        if True, will continue a previously starting optimization, by default False
+        customize the optuna sampler, by default BoTorchSampler, or GridSampler if
+        grid_search is True
+    grid_search : bool, optional
+        search the entire parameter space between damping_limits in n_trials steps, by
+        default False
+    fname : str | None, optional
+        file name to save both study and inversion results to as pickle files, by
+        default in format `tmp_{random.randint(0,999)}`.
+    plot_cv : bool, optional
+        plot the cross-validation results, by default True
+    plot_grids : bool, optional
+        for each damping value, plot comparison of predicted and testing gravity data,
+        by default False
+    logx : bool, optional
+        make x axis of CV result plot on log scale, by default True
+    logy : bool, optional
+        make y axis of CV result plot on log scale, by default True
 
     Returns
     -------
-    tuple[pd.DataFrame, hm.EquivalentSources]
-        gives a dataframe of the tested parameter sets and associated scores, and the
-        best resulting fitted equivalent sources.
+    tuple[optuna.study, tuple[pd.DataFrame, pd.DataFrame, dict[str, typing.Any], float]]
+        a tuple of the completed optuna study and a tuple of the inversion results:
+        topography dataframe, gravity dataframe, parameter values and elapsed time.
     """
-    if optuna is None:
-        msg = "Missing optional dependency 'optuna' required for optimization."
-        raise ImportError(msg)
+
+    optuna.logging.set_verbosity(optuna.logging.WARN)
+
+    # if sampler not provided, use BoTorch as default unless grid_search is True
+    if sampler is None:
+        if grid_search is True:
+            if n_trials < 4:
+                msg = (
+                    "if grid_search is True, n_trials must be at least 4, "
+                    "resetting n_trials to 4 now."
+                )
+                logging.warning(msg)
+                n_trials = 4
+            space = np.logspace(
+                np.log10(damping_limits[0]), np.log10(damping_limits[1]), n_trials
+            )
+            # omit first and last since they will be enqueued separately
+            space = space[1:-1]
+            sampler = optuna.samplers.GridSampler(
+                search_space={"damping": space},
+                seed=10,
+            )
+        else:
+            with warnings.catch_warnings():
+                warnings.filterwarnings("ignore", message="BoTorch")
+                sampler = optuna.integration.BoTorchSampler(
+                    n_startup_trials=int(n_trials / 4),
+                    seed=10,
+                )
 
     # set file name for saving results with random number between 0 and 999
     if fname is None:
         fname = f"tmp_{random.randint(0,999)}"
 
+    study = optuna.create_study(
+        direction="minimize",
+        sampler=sampler,
+        load_if_exists=False,
+    )
+
+    # explicitly add the limits as trials
+    study.enqueue_trial({"damping": damping_limits[0]}, skip_if_exists=True)
+    study.enqueue_trial({"damping": damping_limits[1]}, skip_if_exists=True)
+
+    # run optimization
+    with warnings.catch_warnings():
+        warnings.filterwarnings(
+            "ignore", message="logei_candidates_func is experimental"
+        )
+        study.optimize(
+            OptimalInversionDamping(
+                damping_limits=damping_limits,
+                rmse_as_median=score_as_median,
+                training_data=training_df,
+                testing_data=testing_df,
+                fname=fname,
+                plot_grids=plot_grids,
+                **kwargs,
+            ),
+            n_trials=n_trials,
+            callbacks=[warn_limits_better_than_trial_1_param],
+            show_progress_bar=True,
+        )
+
+    best_trial = study.best_trial
+
+    if best_trial.params.get("damping") in damping_limits:
+        logging.warning(
+            "Best damping value (%s) is at the limit of provided values "
+            "(%s) and thus is likely not a global minimum, expand the range of "
+            "values tested to ensure the best parameter value is found.",
+            best_trial.params.get("damping"),
+            damping_limits,
+        )
+
+    logging.info("Trial with lowest score: ")
+    logging.info("\ttrial number: %s", best_trial.number)
+    logging.info("\tparameter: %s", best_trial.params)
+    logging.info("\tscores: %s", best_trial.values)
+
+    # get best inversion result of each set
+    with pathlib.Path(f"{fname}_trial_{best_trial.number}.pickle").open("rb") as f:
+        inv_results = pickle.load(f)
+
+    # delete other inversion results
+    for i in range(n_trials):
+        if i == best_trial.number:
+            pass
+        else:
+            pathlib.Path(f"{fname}_trial_{i}.pickle").unlink(missing_ok=True)
+
+    # remove if exists
+    pathlib.Path(fname).unlink(missing_ok=True)
+
+    # save study to pickle
+    with pathlib.Path(f"{fname}.pickle").open("wb") as f:
+        pickle.dump(study, f)
+
+    if plot_cv is True:
+        plotting.plot_cv_scores(
+            study.trials_dataframe().value.values,
+            study.trials_dataframe().params_damping.values,
+            param_name="Damping",
+            logx=logx,
+            logy=logy,
+        )
+
+    return study, inv_results
+
+
 class OptimalInversionZrefDensity:
     """
     Objective function to use in an Optuna optimization for finding the optimal values
diff --git a/tests/test_cross_validation.py b/tests/test_cross_validation.py
index 5782b086..cb139460 100644
--- a/tests/test_cross_validation.py
+++ b/tests/test_cross_validation.py
@@ -8,3 +8,7 @@
 @deprecation.fail_if_not_removed
 def test_zref_density_optimal_parameter():
     cross_validation.zref_density_optimal_parameter()
+
+@deprecation.fail_if_not_removed
+def test_grav_optimal_parameter():
+    cross_validation.grav_optimal_parameter()
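
Migration sketch for downstream users of the deprecated grid search. The
gravity dataframe `grav_df` (with a boolean `test` column marking the testing
split), the `starting_prisms` layer, and the helper `pick_damping()` are
hypothetical placeholders; extra keyword arguments such as `prism_layer` are
forwarded through `**kwargs` to `cross_validation.grav_cv_score()`:

    import pandas as pd

    from invert4geom import optimization


    def pick_damping(grav_df: pd.DataFrame, starting_prisms, n_trials: int = 10):
        """Replace the removed grid search: let Optuna pick damping values."""
        study, inv_results = optimization.optimize_inversion_damping(
            training_df=grav_df[grav_df.test == False],  # noqa: E712
            testing_df=grav_df[grav_df.test == True],  # noqa: E712
            n_trials=n_trials,
            damping_limits=(0.001, 1),  # workflow default used in this patch
            grid_search=False,  # True reproduces the old grid-search behaviour
            prism_layer=starting_prisms,
        )
        # the objective suggests the parameter under the name "damping"
        return study.best_params["damping"], inv_results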