diff --git a/red/red/tests/test_equilibration.py b/red/red/tests/test_equilibration.py deleted file mode 100644 index 9cd599c..0000000 --- a/red/red/tests/test_equilibration.py +++ /dev/null @@ -1,224 +0,0 @@ -"""Tests for equilibration detection functions.""" - -from pathlib import Path - -import numpy as np -import pytest - -from red.equilibration import ( - detect_equilibration_init_seq, - detect_equilibration_paired_t_test, - detect_equilibration_window, - get_paired_t_p_timeseries, -) - -from .._exceptions import EquilibrationNotDetectedError, InvalidInputError -from . import example_times, example_timeseries, gaussian_noise, tmpdir - - -def test_detect_equilibration_init_seq(example_timeseries, example_times, tmpdir): - """ - Test equilibration detection based on the minimum sum of squared errors, - using initial sequence methods to estimate the variance. - """ - # Use the mean time to make this faster. - example_timeseries = example_timeseries.mean(axis=0) - - # Compute the equilibration index. - equil_idx, equil_g, equil_ess = detect_equilibration_init_seq( - data=example_timeseries - ) - - # Check that the equilibration index is correct. - assert equil_idx == 398 - assert equil_g == pytest.approx(4.292145845594654, abs=1e-4) - assert equil_ess == pytest.approx(518.85468949889, abs=1e-4) - - # Try also supplying the times. Plot in a temporary directory. - tmp_output = Path(tmpdir) / "test_plots_min_sse_init_seq" - # tmp_output = "./test_plots_min_sse_init_seq" - equil_time, equil_g, equil_ess = detect_equilibration_init_seq( - data=example_timeseries, - times=example_times, - method="min_sse", - plot=True, - plot_name=tmp_output, - sequence_estimator="initial_convex", - smooth_lag_times=True, # To speed up the test. - ) - assert equil_time == 0.3312 - assert equil_g == pytest.approx(4.206494270439359, abs=1e-4) - assert equil_ess == pytest.approx(525.8535630357488, abs=1e-4) - # Check that test_plots_min_sse.png exists. - assert tmp_output.with_suffix(".png").exists() - - -def test_detect_equilibration_init_seq_max_ess(example_timeseries, example_times): - """ - Test equilibration detection based on the maximum effective sample size, - using initial sequence methods to estimate the variance. - """ - # Compute the equilibration index. - equil_idx, equil_g, equil_ess = detect_equilibration_init_seq( - data=example_timeseries, method="max_ess", smooth_lag_times=True - ) - - # Check that the equilibration index is correct. - assert equil_idx == 486 - assert equil_g == pytest.approx(6.8237419281169265, abs=1e-4) - assert equil_ess == pytest.approx(1567.321875982989, abs=1e-4) - - -def test_detect_equilibration_init_seq_raises(example_timeseries): - """ - Test that invalid inputs raise errors. - """ - with pytest.raises(InvalidInputError): - detect_equilibration_init_seq( - method="non_existent", - data=example_timeseries, - sequence_estimator="positive", - ) - - -def test_detect_equilibration_window(example_timeseries, example_times, tmpdir): - """ - Test equilibration detection based on the minimum sum of squared errors, - using window methods to estimate the variance. - """ - # Use the mean time to make this faster. - example_timeseries = example_timeseries.mean(axis=0) - - # Compute the equilibration index. - equil_idx, equil_g, equil_ess = detect_equilibration_window(data=example_timeseries) - - # Check that the equilibration index is correct. - assert equil_idx == 428 - assert equil_g == pytest.approx(2.755411021809227, abs=1e-4) - assert equil_ess == pytest.approx(797.3402089962718, abs=1e-4) - - # Try also supplying the times. Plot in a temporary directory. - tmp_output = Path(tmpdir) / "test_plots_min_sse_window" - # tmp_output = "./test_plots_min_sse_window" - equil_time, equil_g, equil_ess = detect_equilibration_window( - data=example_timeseries, - times=example_times, - plot=True, - plot_name=tmp_output, - plot_window_size=True, - ) - assert equil_time == 0.3432 - assert equil_g == pytest.approx(2.755411021809227, abs=1e-4) - assert equil_ess == pytest.approx(797.3402089962718, abs=1e-4) - # Check that test_plots_min_sse.png exists. - assert tmp_output.with_suffix(".png").exists() - - -def test_detect_equilibration_window_max_ess(example_timeseries, example_times): - """ - Test equilibration detection based on the maximum effective sample size, - using window methods to estimate the variance. - """ - # Compute the equilibration index. - equil_idx, equil_g, equil_ess = detect_equilibration_window( - data=example_timeseries, method="max_ess" - ) - - # Check that the equilibration index is correct. - assert equil_idx == 624 - assert equil_g == pytest.approx(9.498261999314913, abs=1e-4) - assert equil_ess == pytest.approx(1053.350602533562, abs=1e-4) - - -def test_compare_pymbar(example_timeseries): - """Check that we get the same result as pymbar when equivalent - methods are used.""" - example_timeseries = example_timeseries.mean(axis=0) - - # Results below were obtained with: - # ( equil_idx_chod, - # equil_g_chod, - # equil_ess_chod, - # ) = pymbar.timeseries.detect_equilibration(example_timeseries, fast=False, nskip=1) - equil_idx_chod = 877 - equil_g_chod = 4.111825 - # equil_ess_chod = 425.35858 - equil_idx, equil_g, _ = detect_equilibration_init_seq( - example_timeseries, method="max_ess", sequence_estimator="positive" - ) - assert equil_idx == equil_idx_chod - assert equil_g == pytest.approx(equil_g_chod, abs=1e-4) - - -def test_detect_equilibration_paired_t(gaussian_noise, example_timeseries): - """ - Test equilibration detection based on the paired t-test. - """ - # Check that it fails for a single run. - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test(gaussian_noise[0]) - - # Check the index for the correlated timeseries. - idx = detect_equilibration_paired_t_test(example_timeseries) - assert idx == 328 - - # Try stupid inputs and check that we get input errors. - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test(gaussian_noise, p_threshold=0) - - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test(gaussian_noise, p_threshold=1) - - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test(gaussian_noise, fractional_block_size=0) - - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test(gaussian_noise, fractional_block_size=1) - - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test(gaussian_noise, t_test_sidedness="asd") - - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test(gaussian_noise, fractional_test_end=0) - - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test( - gaussian_noise, fractional_test_end=0.3, initial_block_size=0.5 - ) - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test( - gaussian_noise, fractional_test_end=0.3, initial_block_size=0.3 - ) - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test(gaussian_noise, initial_block_size=1.2) - - with pytest.raises(InvalidInputError): - detect_equilibration_paired_t_test(gaussian_noise, final_block_size=1.2) - - # Generate steeply decaying timeseries and check that it fails to equilibrate. - # Generate 5 very similar exponential decays. - steeply_decaying_timeseries = np.exp(-np.arange(1000)) - steeply_decaying_timeseries = np.tile(steeply_decaying_timeseries, (5, 1)) - with pytest.raises(EquilibrationNotDetectedError): - detect_equilibration_paired_t_test(steeply_decaying_timeseries) - - -def test_plots_paired_t(example_timeseries, example_times, tmpdir): - """Check that the plots are created.""" - tmp_output = Path(tmpdir) / "test_plots_paired_t" - detect_equilibration_paired_t_test( - example_timeseries, example_times, plot=True, plot_name=tmp_output - ) - assert tmp_output.with_suffix(".png").exists() - - -def test_p_values(example_timeseries, example_times): - """Tests on the p-values series generator which can't be run indirectly - through the equilibration detection functions.""" - - with pytest.raises(InvalidInputError): - get_paired_t_p_timeseries(example_timeseries, times=example_times[:-2]) - - # Check that this works on indices if no times passed. - _, times = get_paired_t_p_timeseries(example_timeseries) - assert times[-1] == 1312 diff --git a/red/red/tests/test_plot.py b/red/red/tests/test_plot.py deleted file mode 100644 index af9f999..0000000 --- a/red/red/tests/test_plot.py +++ /dev/null @@ -1,183 +0,0 @@ -"""Tests for the plotting functions.""" - -import matplotlib.pyplot as plt -import numpy as np -import pytest -from matplotlib import gridspec - -from red.equilibration import get_paired_t_p_timeseries - -from .._exceptions import InvalidInputError -from ..equilibration import get_sse_series_init_seq -from ..plot import ( - plot_equilibration_min_sse, - plot_equilibration_paired_t_test, - plot_p_values, - plot_sse, - plot_timeseries, -) -from . import example_times, example_timeseries - -# Set to True to save the plots. -SAVE_PLOTS = False - - -def test_plot_timeseries(example_timeseries, example_times): - """Test plotting a timeseries.""" - # Create a figure. - fig, ax = plt.subplots() - - # Plot the timeseries. - plot_timeseries(ax, example_timeseries, example_times) - # fig.savefig("test_plot_timeseries.png") - - # Check that it works for a single run. - fig, ax = plt.subplots() - plot_timeseries(ax, example_timeseries[0], example_times) - - if SAVE_PLOTS: - fig.savefig("test_plot_timeseries_single.png") - - # Try with n_blocks == 0. - plot_timeseries(ax, example_timeseries, example_times, n_blocks=0) - - # Test that invalid input raises an error. - with pytest.raises(InvalidInputError): - plot_timeseries(ax, example_timeseries, example_times, n_blocks=-1) - - with pytest.raises(InvalidInputError): - plot_timeseries(ax, example_timeseries, list(example_times)) - - with pytest.raises(InvalidInputError): - plot_timeseries(ax, example_timeseries, example_times[:-2]) - - with pytest.raises(InvalidInputError): - plot_timeseries(ax, example_timeseries, example_timeseries) - - -def test_plot_sse(example_timeseries, example_times): - """Test plotting the sum of squared errors.""" - # Create a figure. - fig, ax = plt.subplots() - - # Compute the SSE for the example timeseries. - sse_vals, lag_times = get_sse_series_init_seq( - data=example_timeseries, smooth_lag_times=True - ) - times = example_times[: len(sse_vals)] - - # Plot the SSE. - plot_sse(ax=ax, sse=sse_vals, max_lags=lag_times, window_sizes=None, times=times) - - if SAVE_PLOTS: - fig.savefig("test_plot_sse.png") - - # Check that invalid input raises an error. - with pytest.raises(InvalidInputError): - plot_sse(ax, sse_vals, max_lags=None, window_sizes=None, times=times[:-2]) - - with pytest.raises(InvalidInputError): - # Make sse_vals a 2D array. - plot_sse( - ax, - np.array([sse_vals, sse_vals]), - max_lags=lag_times, - window_sizes=lag_times, - times=times, - ) - - with pytest.raises(InvalidInputError): - plot_sse( - ax, - list(sse_vals), - max_lags=None, - window_sizes=None, - times=times, - ) - - with pytest.raises(InvalidInputError): - plot_sse( - ax, - sse_vals, - max_lags=lag_times, - window_sizes=lag_times, - times=times, - ) - - -def test_plot_equilibration_min_sse(example_timeseries, example_times): - """Test plotting the equilibration detection based on the minimum SSE.""" - # Take mean to speed things up, but plot the original data to - # test this works. - example_timeseries_mean = example_timeseries.mean(axis=0) - - # Create a figure. - fig = plt.figure(figsize=(6, 4)) - gridspec_obj = gridspec.GridSpec(1, 1, figure=fig) - - # Compute the SSE for the example timeseries. - sse_vals, _ = get_sse_series_init_seq( - data=example_timeseries_mean, smooth_lag_times=True - ) - times = example_times[: len(sse_vals)] - - # Plot the equilibration detection. - plot_equilibration_min_sse( - fig=fig, - subplot_spec=gridspec_obj[0], - data=example_timeseries, - sse_series=sse_vals, - max_lag_series=None, - data_times=example_times, - sse_times=times, - ) - - if SAVE_PLOTS: - fig.savefig("test_plot_equilibration_min_sse.png", bbox_inches="tight") - - -def test_plot_p_values(example_timeseries, example_times): - """Test plotting the p-values of the paired t-test.""" - # Create a figure. - fig, ax = plt.subplots() - _, n_samples = example_timeseries.shape - - # Compute the p-values for the example timeseries. - p_values, times_used = get_paired_t_p_timeseries( - example_timeseries, example_times, t_test_sidedness="two-sided" - ) - - # Plot the p-values. - plot_p_values(ax, p_values, times_used) - - if SAVE_PLOTS: - fig.savefig("test_plot_p_values.png") - - # Check that invalid input raises an error. - with pytest.raises(InvalidInputError): - plot_p_values(ax, p_values, example_times[:-2]) - - with pytest.raises(InvalidInputError): - plot_p_values(ax, list(p_values), example_timeseries) - - with pytest.raises(InvalidInputError): - # Make p_values a 2D array. - plot_p_values(ax, np.array([p_values, p_values]), example_timeseries) - - -def test_plot_equilibration_paired_t_test(example_timeseries, example_times): - """Test plotting the equilibration detection based on the paired t-test.""" - # Create a figure. - fig = plt.figure(figsize=(6, 4)) - gridspec_obj = gridspec.GridSpec(1, 1, figure=fig) - - # Compute the p-values for the example timeseries. - p_values, p_times = get_paired_t_p_timeseries(example_timeseries, example_times) - - # Plot the equilibration detection. - plot_equilibration_paired_t_test( - fig, gridspec_obj[0], example_timeseries, p_values, example_times, p_times - ) - - if SAVE_PLOTS: - fig.savefig("test_plot_equilibration_paired_t_test.png", bbox_inches="tight") diff --git a/red/red/tests/test_sse.py b/red/red/tests/test_sse.py deleted file mode 100644 index cdf566e..0000000 --- a/red/red/tests/test_sse.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Tests for the red.sse module, which computes the squared standard error series.""" - -import pytest - -from red.sse import get_sse_series_init_seq, get_sse_series_window - -from . import example_timeseries - - -def test_get_sse_series_init_seq(example_timeseries): - """Test the get_sse_series_init_seq function.""" - # Get the squared standard error series. - sse_series, _ = get_sse_series_init_seq(example_timeseries, smooth_lag_times=True) - - assert sse_series[0] == pytest.approx(2.972160655979027, abs=1e-4) - assert sse_series[1] == pytest.approx(2.948013765819789, abs=1e-4) - assert sse_series[-1] == pytest.approx(3.042444011451404, abs=1e-4) - - -def test_get_sse_series_window(example_timeseries): - """Test the get_sse_series_window function.""" - # Get the squared standard error series. - sse_series, _ = get_sse_series_window(example_timeseries) - - assert sse_series[0] == pytest.approx(3.549012057748636, abs=1e-4) - assert sse_series[1] == pytest.approx(3.5341649172948486, abs=1e-4) - assert sse_series[-1] == pytest.approx(4.1203730796278455, abs=1e-4) diff --git a/red/red/tests/test_variance.py b/red/red/tests/test_variance.py deleted file mode 100644 index a4e2fcc..0000000 --- a/red/red/tests/test_variance.py +++ /dev/null @@ -1,416 +0,0 @@ -""" -Unit and regression test for the variance module. -""" - -import numpy as np -import numpy.testing as npt -import pytest -from statsmodels.tsa.stattools import acovf - -from red._exceptions import AnalysisError, InvalidInputError -from red.variance import ( - _get_autocovariance, - _get_autocovariance_window, - _get_gamma_cap, - _smoothen_max_lag_times, - get_variance_initial_sequence, - get_variance_series_initial_sequence, - get_variance_series_window, - get_variance_window, - inter_run_variance, - intra_run_variance, - lugsail_variance, - replicated_batch_means_variance, -) - -from . import example_times, example_timeseries, gaussian_noise, gaussian_noise_offset - - -def test_get_autocovariance(example_timeseries): - """Test the _get_autocovariance function.""" - example_timeseries = example_timeseries.mean(axis=0) - example_timeseries_demeaned = example_timeseries - example_timeseries.mean() - - # Compute the autocovariance function with no truncation. - autocovariance = _get_autocovariance(example_timeseries) - autocovariance_statsmodels = acovf(example_timeseries_demeaned, fft=False) - assert autocovariance == pytest.approx(autocovariance_statsmodels, abs=0.01) - - # Test with max lag time. - autocovariance = _get_autocovariance(example_timeseries, max_lag=10) - autocovariance_statsmodels = acovf(example_timeseries_demeaned, fft=False, nlag=10) - assert autocovariance == pytest.approx(autocovariance_statsmodels, abs=0.01) - assert len(autocovariance) == 11 - - # Test with different mean. - autocovariance = _get_autocovariance( - example_timeseries, mean=50 - ) # Actual mean 54.9 - assert autocovariance[:3] == pytest.approx( - np.array([342.65922098, 132.30372161, 120.22912133]), abs=0.01 - ) - - -def test_gamma_cap(example_timeseries): - """Test the _get_gamma_cap function.""" - example_timeseries = example_timeseries.mean(axis=0) - - # Compute the autocovariance function with no truncation. - autocovariance = _get_autocovariance(example_timeseries) - - # Compute the gamma function. - gamma_cap = _get_gamma_cap(autocovariance) - - # Check that we get the right lengths of results. - assert len(gamma_cap) == len(autocovariance) // 2 - - # Check that the gamma function result has not regressed. - assert gamma_cap[:3] == pytest.approx( - np.array([427.04712998, 193.80629572, 183.46922874]), abs=0.01 - ) - assert gamma_cap[-3:] == pytest.approx( - np.array([0.54119574, 0.04336649, 0.68390851]), abs=0.01 - ) - - # Check that an analysis error is raised if input sequence too short. - with pytest.raises(AnalysisError): - _get_gamma_cap(example_timeseries[:1]) - - -def test_variance_initial_sequence(example_timeseries): - """Test the get_variance_initial_sequence function.""" - # Take mean of timeseries. - example_timeseries = example_timeseries.mean(axis=0) - - # Get variances with different methods. - methods = [ - "positive", - "initial_positive", - "initial_monotone", - "initial_convex", - ] - res = { - method: get_variance_initial_sequence(np.array([example_timeseries]), method) - for method in methods - } - - # Geyer results obtained from R package mcmc. - geyer_res = { - "initial_positive": 27304.776049686046, - "initial_monotone": 24216.32288181667, - "initial_convex": 21904.973713096544, # Actual MCMC result is 21898.6178149005, but very close. - } - - # Check that the results are correct. - for method in geyer_res: - assert res[method][0] == pytest.approx(geyer_res[method], abs=0.01) - - # Check that the results decrease in the correct order. - assert ( - res["initial_positive"][0] - > res["positive"][0] - > res["initial_monotone"][0] - > res["initial_convex"][0] - ) - - -def test_variance_initial_sequence_multiple_runs(example_timeseries): - """Regression test for the get_variance_initial_sequence function - with multiple runs.""" - # Expected results. - expected = { - "positive": 49723.19149920802, - "initial_positive": 52014.07839470464, - "initial_monotone": 43587.36425899603, - "initial_convex": 39009.608609724724, - } - for method in expected: - var = get_variance_initial_sequence(example_timeseries, method)[0] - assert var == pytest.approx(expected[method], abs=0.01) - - -def test_get_variance_series_initial_sequence(example_timeseries): - """Test the get_variance_series_initial_sequence function.""" - # Take mean of timeseries. - example_timeseries = example_timeseries.mean(axis=0) - - var_seq = get_variance_series_initial_sequence( - example_timeseries, sequence_estimator="initial_positive", min_max_lag_time=0 - )[0] - assert var_seq[0] == pytest.approx(27304.77604968605, abs=0.01) - assert var_seq[1] == pytest.approx(27121.16749972886, abs=0.01) - assert var_seq[-1] == pytest.approx(819.4266803627752, abs=0.01) - - -def test_get_variance_initial_sequence_raises(example_timeseries): - """Test that get_variance_initial_sequence raises the errors - if the input is not valid.""" - with pytest.raises(InvalidInputError): - get_variance_series_initial_sequence( - example_timeseries, sequence_estimator="not_implemented", min_max_lag_time=1 - ) - - with pytest.raises(InvalidInputError): - get_variance_series_initial_sequence( - example_timeseries, - sequence_estimator="initial_positive", - min_max_lag_time=-1, - ) - - with pytest.raises(InvalidInputError): - get_variance_initial_sequence( - example_timeseries, - sequence_estimator="initial_positive", - min_max_lag_time=10_000, - ) - - for max_max_lag_time in [-1, 10_000, 3]: - with pytest.raises(InvalidInputError): - get_variance_initial_sequence( - example_timeseries, - sequence_estimator="initial_positive", - min_max_lag_time=5, - max_max_lag_time=max_max_lag_time, - ) - - for autocov in [4, np.array([[1, 2, 3]]), np.array([1])]: - with pytest.raises(InvalidInputError): - get_variance_initial_sequence( - example_timeseries, - sequence_estimator="initial_positive", - autocov=autocov, - ) - - with pytest.raises(AnalysisError): - get_variance_initial_sequence( - np.zeros((10)), - sequence_estimator="initial_positive", - ) - - with pytest.raises(InvalidInputError): - get_variance_initial_sequence( - example_timeseries, - sequence_estimator="initial_positive", - max_max_lag_time=4, - autocov=np.array([1, 2, 3]), - ) - - -def test_get_variance_series_initial_sequence_raises(example_timeseries): - """Test that get_variance_series_initial_sequence raises the errors - if the input is not valid.""" - for frac_pad in [-0.1, 1.1]: - with pytest.raises(InvalidInputError): - get_variance_series_initial_sequence( - example_timeseries, - sequence_estimator="initial_positive", - frac_padding=frac_pad, - ) - - with pytest.warns(UserWarning): - get_variance_series_initial_sequence( - example_timeseries, sequence_estimator="initial_positive", frac_padding=0.6 - ) - - -def test_smoothen_lags(): - """Test the _smoothen_max_lag_times function.""" - rough_lags = np.array([10, 11, 6, 5, 4, 4, 4, 3]) - smooth_lags = _smoothen_max_lag_times(rough_lags) - # Compare the arrays of lists using pytest. Expect answer is - # monotonic decreasing and interpolated between change points. - npt.assert_array_equal(smooth_lags, np.array([10, 8, 6, 5, 4, 4, 3, 3])) - - -def test_get_variance_series_initial_sequence_smooth_lags(example_timeseries): - """Regression test for the get_variance_series_initial_sequence function - with smoothened lags.""" - # Take mean of timeseries. - example_timeseries = example_timeseries.mean(axis=0) - - var_seq = get_variance_series_initial_sequence( - example_timeseries, sequence_estimator="initial_positive", smooth_lag_times=True - )[0] - assert var_seq[0] == pytest.approx(27304.77604968605, abs=0.01) - assert var_seq[1] == pytest.approx(27118.973970536812, abs=0.01) - assert var_seq[20] == pytest.approx(22105.33941568437, abs=0.01) - assert var_seq[-1] == pytest.approx(690.7015384062945, abs=0.01) - - # Try with a sequence where the max lag will be the end of the sequence. - smooth_seq = np.array([1.0, -1.0, 1.0, -1.0, 1.0, -1.0, 1.0, -1.0]) - var_seq = get_variance_series_initial_sequence( - smooth_seq, sequence_estimator="initial_positive", smooth_lag_times=True - )[0] - assert var_seq[0] == pytest.approx(1, abs=0.01) - - -def test_get_autocovariance_window(): - """Check that an error is raised if the window size is too large.""" - with pytest.raises(InvalidInputError): - _get_autocovariance_window(np.array([[1, 2, 3]]), window_size=4) - - -def test_get_variance_window(example_timeseries): - """Test the get_variance_window function.""" - # Take mean of timeseries. - example_timeseries = np.array([example_timeseries.mean(axis=0)]) - - # First, try with a window size of 1 and check that we get the same result as - # np.var. - variance = get_variance_window(example_timeseries, window_size=1) - assert variance == pytest.approx(np.var(example_timeseries), abs=0.01) - - # Now try with a window size of 5. - variance = get_variance_window(example_timeseries, window_size=5) - assert variance == pytest.approx(721.2492651825239, abs=0.01) - - -def test_get_variance_window_raises(example_timeseries): - """Check that errors are raised if the input is not valid.""" - with pytest.raises(InvalidInputError): - get_variance_window(example_timeseries, kernel=4) - - for window_size in [-1, 0, 10_000]: - with pytest.raises(InvalidInputError): - get_variance_window(example_timeseries, window_size=window_size) - - with pytest.raises(AnalysisError): - get_variance_window(np.zeros((10, 10)), window_size=1) - - -def test_get_variance_window_multiple_runs(example_timeseries): - """Regression test for the get_variance_window function with multiple runs.""" - # Expected results. - expected = { - 1: 1345.5038988334522, - 5: 2220.1371817657955, - 10: 3182.492111191889, - 100: 14670.359227619572, - } - for window_size in expected: - var = get_variance_window(example_timeseries, window_size=window_size) - assert var == pytest.approx(expected[window_size], abs=0.01) - - -def test_get_variance_series_window(example_timeseries): - """Test the get_variance_series_window function.""" - # Take mean of timeseries. - example_timeseries = example_timeseries.mean(axis=0) - - var_seq = get_variance_series_window( - example_timeseries, window_size=1, window_size_fn=None - )[0] - assert var_seq[0] == pytest.approx(318.6396575172195, abs=0.01) - assert var_seq[1] == pytest.approx(318.3955761337024, abs=0.01) - assert var_seq[-1] == pytest.approx(243.41472988044396, abs=0.01) - - # Now try with the window size function. - var_seq = get_variance_series_window( - example_timeseries, window_size=None, window_size_fn=lambda x: round(x**0.5) - )[0] - assert var_seq[0] == pytest.approx(4262.823818412766, abs=0.01) - assert var_seq[1] == pytest.approx(22701.2073132481, abs=0.01) - assert var_seq[-1] == pytest.approx(664.5874784139719, abs=0.01) - - -def test_replicated_batch_means_variance(gaussian_noise): - """Test the replicated batch means variance.""" - # Compute the replicated batch means variance. - variance = replicated_batch_means_variance(gaussian_noise, 1) - - # Check that the variance is correct. - assert variance == pytest.approx(0.1, abs=0.01) - - # Make the batch 100 times larger and check that the variance estimate does not change. - variance = replicated_batch_means_variance(gaussian_noise, 10) - assert variance == pytest.approx(0.1, abs=0.01) - - # Check that it works for a single run. - variance = replicated_batch_means_variance(gaussian_noise[0], 1) - assert variance == pytest.approx(0.1, abs=0.01) - - # Make sure errors are raised if the batch size is too small or large. - with pytest.raises(InvalidInputError): - replicated_batch_means_variance(gaussian_noise, 0) - - with pytest.raises(InvalidInputError): - replicated_batch_means_variance(gaussian_noise, 100_000) - - -def test_lugsail_variance(gaussian_noise): - """Test the lugsail variance.""" - # Compute the lugsail variance. - variance = lugsail_variance(gaussian_noise) - - # Check that the variance is correct. - assert variance == pytest.approx(0.1, abs=0.02) - - # Check that it works for a single run. - variance = lugsail_variance(gaussian_noise[0]) - assert variance == pytest.approx( - 0.1, abs=0.05 - ) # Wide tolerance due to higher variance from single run. - - # Make sure errors are raised if n_pow is unreasonable. - with pytest.raises(InvalidInputError): - lugsail_variance(gaussian_noise, 0) - - with pytest.raises(InvalidInputError): - lugsail_variance(gaussian_noise, 100_000) - - with pytest.raises(AnalysisError): - lugsail_variance(gaussian_noise, 0.1) - - -def test_get_variance_series_window_raises(example_timeseries): - """Check that get_variance_series_window raises the errors - if the input is not valid.""" - with pytest.raises(InvalidInputError): - get_variance_series_window( - example_timeseries, window_size=1, window_size_fn=lambda x: x**0.5 - ) - - with pytest.raises(InvalidInputError): - get_variance_series_window( - example_timeseries, window_size=None, window_size_fn=None - ) - - with pytest.raises(InvalidInputError): - get_variance_series_window( - example_timeseries, window_size=None, window_size_fn=4 - ) - - for frac_pad in [-0.1, 1.1]: - with pytest.raises(InvalidInputError): - get_variance_series_window(example_timeseries, frac_padding=frac_pad) - - with pytest.warns(UserWarning): - get_variance_series_window(example_timeseries, frac_padding=0.6) - - -def test_inter_run_variance(gaussian_noise): - """Test the inter-run variance.""" - # Compute the inter-run variance. - variance = inter_run_variance(gaussian_noise) - - # Check that the variance is correct. - assert variance == pytest.approx( - 0.1, abs=0.5 - ) # Wide tolerance due to high variance. - - # Check that it raises an invalid input error if the data is one dimensional. - with pytest.raises(InvalidInputError): - inter_run_variance(gaussian_noise[0]) - - -def test_intra_run_variance(gaussian_noise): - """Test the intra-run variance.""" - # Compute the intra-run variance. - variance = intra_run_variance(gaussian_noise) - - # Check that the variance is correct. - assert variance == pytest.approx(0.1, abs=0.01) - - # Check that it runs for a single run. - variance = intra_run_variance(gaussian_noise[0]) - assert variance == pytest.approx(0.1, abs=0.01)