diff --git a/deea/equilibration.py b/deea/equilibration.py index 60d1e5c..b5d2358 100644 --- a/deea/equilibration.py +++ b/deea/equilibration.py @@ -330,263 +330,3 @@ def detect_equilibration_window( # Return the equilibration time (index), statistical inefficiency, and ESS. return equil_time, equil_g, equil_ess - - -def get_paired_t_p_timeseries( - data: _np.ndarray, - times: _Optional[_np.ndarray] = None, - fractional_block_size: float = 0.125, - fractional_test_end: float = 0.5, - initial_block_size: float = 0.1, - final_block_size: float = 0.5, - t_test_sidedness: str = "two-sided", -) -> _Tuple[_np.ndarray, _np.ndarray]: - """ - Get a timeseries of the p-values from a paired t-test on the differences - between sample means between intial and final portions of the data. The timeseries - is obtained by repeatedly discarding more data from the time series between - calculations of the p-value. - - Parameters - ---------- - data : np.ndarray - The time series data. This should have shape - (n_runs, n_samples) and must have at least two - runs. - - times : np.ndarray, optional - The times at which the data was sampled. If this is - not provided, the indices of the data will be used. - - fractional_block_size : float, optional - The fraction of data to discard between repeats. The default is 0.125. - - fractional_test_end : float, optional - The fraction of the time series to use in the final test. The default is 0.5. - - initial_block_size : float, optional - The fraction of the truncated time series to use for the "before" portion - of the paired t-test. The default is 0.1. - - final_block_size : float, optional - The fraction of the truncated time series to use for the "after" portion - of the paired t-test. The default is 0.5. - - t_test_sidedness : str, optional - The sidedness of the paired t-test. This can be either 'two-sided', 'less', - or 'greater'. The default is 'two-sided'. - - Returns - ------- - np.ndarray - The p-values of the paired t-test. - - np.ndarray - The times at which the p-values were calculated. - """ - # Check that the data is valid. - data = check_data(data, one_dim_allowed=False) - n_runs, n_samples = data.shape - - # Convert times to indices if necessary. - if times is None: - times = _np.arange(n_samples) - - # Check that times is match the number of samples. - if n_samples != len(times): - raise InvalidInputError( - "Times must have the same length as the number of samples." - ) - - # Check that user inputs are valid. - if fractional_block_size <= 0 or fractional_block_size > 1: - raise InvalidInputError("fractional_block_size must be between 0 and 1.") - - if fractional_test_end <= 0 or fractional_test_end > 1: - raise InvalidInputError("fractional_test_end must be between 0 and 1.") - - if round(fractional_test_end / fractional_block_size) < 2: - raise InvalidInputError( - "fractional_test_end must be at least twice the fractional_block_size." - ) - - # Check that fractional test end is a multiple of fractional block size. - if round((fractional_test_end / fractional_block_size) % 1.0, 3) != 0: - raise InvalidInputError( - "fractional_test_end must be a multiple of fractional_block_size." - ) - - if initial_block_size <= 0 or initial_block_size > 1: - raise InvalidInputError("initial_block_size must be between 0 and 1.") - - if final_block_size <= 0 or final_block_size > 1: - raise InvalidInputError("final_block_size must be between 0 and 1.") - - t_test_sidedness = t_test_sidedness.lower() - if t_test_sidedness not in ["two-sided", "less", "greater"]: - raise InvalidInputError( - "t_test_sidedness must be either 'two-sided', 'less', or 'greater'." - ) - - # Calculate the number of repeats. - n_repeats = ( - round(fractional_test_end / fractional_block_size) + 1 - ) # + 1 for the initial block - - # Calculate the number of samples to discard between repeats. - n_discard = round(n_samples * fractional_block_size) - - # Calculate the p values with their indices. - p_vals = _np.zeros(n_repeats) - p_val_indices = _np.zeros(n_repeats, dtype=int) - time_vals = _np.zeros(n_repeats) - - # Loop over and calculate the p values. - for i in range(n_repeats): - # Truncate the data. - idx = n_discard * i - truncated_data = data[:, idx:] - # Get the number of samples in the truncated data. - n_truncated_samples = truncated_data.shape[1] - # Get the initial and final blocks. - initial_block = truncated_data[ - :, : round(n_truncated_samples * initial_block_size) - ].mean(axis=1) - final_block = truncated_data[ - :, -round(n_truncated_samples * final_block_size) : - ].mean(axis=1) - # Compute the paired t-test. - p_vals[i] = ttest_rel(initial_block, final_block, alternative=t_test_sidedness)[ - 1 - ] - p_val_indices[i] = idx - time_vals[i] = times[idx] - - return p_vals, time_vals - - -def detect_equilibration_paired_t_test( - data: _np.ndarray, - times: _Optional[_np.ndarray] = None, - p_threshold: float = 0.05, - fractional_block_size: float = 0.125, - fractional_test_end: float = 0.5, - initial_block_size: float = 0.1, - final_block_size: float = 0.5, - t_test_sidedness: str = "two-sided", - plot: bool = False, - plot_name: _Union[str, Path] = "equilibration_paired_t_test.png", - time_units: str = "ns", - data_y_label: str = r"$\Delta G$ / kcal mol$^{-1}$", -) -> int: - r""" - Detect the equilibration time of a time series by performing a paired - t-test between initial and final portions of the time series. This is repeated - , discarding more data from the time series between repeats. If the p-value - is greater than the threshold, there is no significant evidence that the data is - no equilibrated and the timeseries is taken to be equilibrated at this time - point. This test may be useful when we care only about systematic bias in the - data, and do not care about detecting inter-run differences. - - Parameters - ---------- - data : np.ndarray - The time series data. This should have shape - (n_runs, n_samples) and must have at least two - runs. - - times : np.ndarray, optional - The times at which the data was sampled. If this is - not provided, the indices of the data will be used. - - p_threshold : float, optional - The p-value threshold to use. The default is 0.05. - - fractional_block_size : float, optional - The fraction of data to discard between repeats. The default is 0.125. - - fractional_test_end : float, optional - The fraction of the time series to use in the final test. The default is 0.5. - - initial_block_size : float, optional - The fraction of the truncated time series to use for the "before" portion - of the paired t-test. The default is 0.1. - - final_block_size : float, optional - The fraction of the truncated time series to use for the "after" portion - of the paired t-test. The default is 0.5. - - t_test_sidedness : str, optional - The sidedness of the paired t-test. This can be either 'two-sided', 'less', - or 'greater'. The default is 'two-sided'. - - plot : bool, optional - Whether to plot the p-values. The default is False. - - plot_name : str | Path, optional - The name of the plot file. The default is 'equilibration_paired_t_test.png'. - - time_units : str, optional - The units of time. The default is "ns". - - data_y_label : str, optional - The y-axis label for the time series data. The default is - "$\Delta G$ / kcal mol$^{-1}$". - - Returns - ------- - int - The time point at which the time series is equilibrated. - """ - # Validate dtata. - data = check_data(data, one_dim_allowed=False) - n_runs, n_samples = data.shape - - # If times is None, units of time are indices. - if times is None: - time_units = "index" - # Convert times to indices. - times = _np.arange(n_samples) - - # Check that user options (not checked in get_paired_t_p_timeseries) are valid. - if p_threshold <= 0 or p_threshold > 0.7: - raise InvalidInputError("p_threshold must be between 0 and 0.7.") - - # Get the p value timeseries and n_discard. - p_vals, times_used = get_paired_t_p_timeseries( - data=data, - times=times, - fractional_block_size=fractional_block_size, - fractional_test_end=fractional_test_end, - initial_block_size=initial_block_size, - final_block_size=final_block_size, - t_test_sidedness=t_test_sidedness, - ) - - # Get the index of the first p value that is greater than the threshold. - meets_threshold = p_vals > p_threshold - if not any(meets_threshold): - raise EquilibrationNotDetectedError( - f"No p values are greater than the threshold of {p_threshold}." - ) - equil_time = times_used[_np.argmax(meets_threshold)] - - # Plot the p values. - if plot: - fig = plt.figure(figsize=(6, 4)) - gridspec_obj = gridspec.GridSpec(1, 1, figure=fig) - plot_equilibration_paired_t_test( - fig=fig, - subplot_spec=gridspec_obj[0], - data=data, - p_values=p_vals, - data_times=times, - p_times=times_used, - p_threshold=p_threshold, - time_units=time_units, - data_y_label=data_y_label, - ) - - fig.savefig(str(plot_name), dpi=300, bbox_inches="tight") - - return equil_time diff --git a/deea/tests/test_equilibration.py b/deea/tests/test_equilibration.py index 651e0a0..6a1ecfb 100644 --- a/deea/tests/test_equilibration.py +++ b/deea/tests/test_equilibration.py @@ -205,24 +205,3 @@ def test_detect_equilibration_paired_t(gaussian_noise, example_timeseries): steeply_decaying_timeseries = np.tile(steeply_decaying_timeseries, (5, 1)) with pytest.raises(EquilibrationNotDetectedError): detect_equilibration_paired_t_test(steeply_decaying_timeseries) - - -def test_plots_paired_t(example_timeseries, example_times, tmpdir): - """Check that the plots are created.""" - tmp_output = Path(tmpdir) / "test_plots_paired_t" - detect_equilibration_paired_t_test( - example_timeseries, example_times, plot=True, plot_name=tmp_output - ) - assert tmp_output.with_suffix(".png").exists() - - -def test_p_values(example_timeseries, example_times): - """Tests on the p-values series generator which can't be run indirectly - through the equilibration detection functions.""" - - with pytest.raises(InvalidInputError): - get_paired_t_p_timeseries(example_timeseries, times=example_times[:-2]) - - # Check that this works on indices if no times passed. - _, times = get_paired_t_p_timeseries(example_timeseries) - assert times[-1] == 1312