From 7ff172db32d08df4dca895fc142448e475950293 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Fri, 30 Sep 2022 12:27:01 +0000
Subject: [PATCH] use new cluster for each dask test

---
 tests/python_package_test/test_dask.py | 119 ++++++++++++------------
 1 file changed, 58 insertions(+), 61 deletions(-)

diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py
index 6bdf3ca50b2c..4d96a2ba127e 100644
--- a/tests/python_package_test/test_dask.py
+++ b/tests/python_package_test/test_dask.py
@@ -61,18 +61,15 @@
 ]
 
 
-@pytest.fixture(scope='module')
 def cluster():
     dask_cluster = LocalCluster(n_workers=2, threads_per_worker=2, dashboard_address=None)
-    yield dask_cluster
-    dask_cluster.close()
+    return dask_cluster
 
 
-@pytest.fixture(scope='module')
-def cluster2():
-    dask_cluster = LocalCluster(n_workers=2, threads_per_worker=2, dashboard_address=None)
-    yield dask_cluster
-    dask_cluster.close()
+class ClientWrapper(Client):
+    def __exit__(self, exc_type, exc_value, traceback):
+        super().__exit__(exc_type, exc_value, traceback)
+        self.cluster.close()
 
 
 @pytest.fixture()
@@ -249,8 +246,8 @@ def _objective_logistic_regression(y_true, y_pred):
 @pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
 @pytest.mark.parametrize('boosting_type', boosting_types)
 @pytest.mark.parametrize('tree_learner', distributed_training_algorithms)
-def test_classifier(output, task, boosting_type, tree_learner, cluster):
-    with Client(cluster) as client:
+def test_classifier(output, task, boosting_type, tree_learner):
+    with ClientWrapper(cluster()) as client:
         X, y, w, _, dX, dy, dw, _ = _create_data(
             objective=task,
             output=output
@@ -344,8 +341,8 @@
 
 @pytest.mark.parametrize('output', data_output + ['scipy_csc_matrix'])
 @pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
-def test_classifier_pred_contrib(output, task, cluster):
-    with Client(cluster) as client:
+def test_classifier_pred_contrib(output, task):
+    with ClientWrapper(cluster()) as client:
         X, y, w, _, dX, dy, dw, _ = _create_data(
             objective=task,
             output=output
@@ -440,8 +437,8 @@
 
 @pytest.mark.parametrize('output', data_output)
 @pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
-def test_classifier_custom_objective(output, task, cluster):
-    with Client(cluster) as client:
+def test_classifier_custom_objective(output, task):
+    with ClientWrapper(cluster()) as client:
         X, y, w, _, dX, dy, dw, _ = _create_data(
             objective=task,
             output=output,
@@ -539,7 +536,7 @@ def test_machines_to_worker_map_unparseable_host_names():
 
 
-def test_assign_open_ports_to_workers(cluster):
-    with Client(cluster) as client:
+def test_assign_open_ports_to_workers():
+    with ClientWrapper(cluster()) as client:
         workers = client.scheduler_info()['workers'].keys()
         n_workers = len(workers)
         host_to_workers = lgb.dask._group_workers_by_host(workers)
@@ -556,7 +553,7 @@
 
 
-def test_training_does_not_fail_on_port_conflicts(cluster):
-    with Client(cluster) as client:
+def test_training_does_not_fail_on_port_conflicts():
+    with ClientWrapper(cluster()) as client:
         _, _, _, _, dX, dy, dw, _ = _create_data('binary-classification', output='array')
 
         lightgbm_default_port = 12400
@@ -581,8 +578,8 @@
 
 @pytest.mark.parametrize('output', data_output)
 @pytest.mark.parametrize('boosting_type', boosting_types)
 @pytest.mark.parametrize('tree_learner', distributed_training_algorithms)
-def test_regressor(output, boosting_type, tree_learner, cluster):
-    with Client(cluster) as client:
+def test_regressor(output, boosting_type, tree_learner):
+    with ClientWrapper(cluster()) as client:
         X, y, w, _, dX, dy, dw, _ = _create_data(
             objective='regression',
             output=output
@@ -661,8 +658,8 @@
 
 
 @pytest.mark.parametrize('output', data_output)
-def test_regressor_pred_contrib(output, cluster):
-    with Client(cluster) as client:
+def test_regressor_pred_contrib(output):
+    with ClientWrapper(cluster()) as client:
         X, y, w, _, dX, dy, dw, _ = _create_data(
             objective='regression',
             output=output
@@ -710,8 +707,8 @@
 
 @pytest.mark.parametrize('output', data_output)
 @pytest.mark.parametrize('alpha', [.1, .5, .9])
-def test_regressor_quantile(output, alpha, cluster):
-    with Client(cluster) as client:
+def test_regressor_quantile(output, alpha):
+    with ClientWrapper(cluster()) as client:
         X, y, w, _, dX, dy, dw, _ = _create_data(
             objective='regression',
             output=output
@@ -757,8 +754,8 @@
 
 
 @pytest.mark.parametrize('output', data_output)
-def test_regressor_custom_objective(output, cluster):
-    with Client(cluster) as client:
+def test_regressor_custom_objective(output):
+    with ClientWrapper(cluster()) as client:
         X, y, w, _, dX, dy, dw, _ = _create_data(
             objective='regression',
             output=output
@@ -810,8 +807,8 @@
 @pytest.mark.parametrize('group', [None, group_sizes])
 @pytest.mark.parametrize('boosting_type', boosting_types)
 @pytest.mark.parametrize('tree_learner', distributed_training_algorithms)
-def test_ranker(output, group, boosting_type, tree_learner, cluster):
-    with Client(cluster) as client:
+def test_ranker(output, group, boosting_type, tree_learner):
+    with ClientWrapper(cluster()) as client:
         if output == 'dataframe-with-categorical':
             X, y, w, g, dX, dy, dw, dg = _create_data(
                 objective='ranking',
@@ -915,8 +912,8 @@
 
 
 @pytest.mark.parametrize('output', ['array', 'dataframe', 'dataframe-with-categorical'])
-def test_ranker_custom_objective(output, cluster):
-    with Client(cluster) as client:
+def test_ranker_custom_objective(output):
+    with ClientWrapper(cluster()) as client:
         if output == 'dataframe-with-categorical':
             X, y, w, g, dX, dy, dw, dg = _create_data(
                 objective='ranking',
@@ -979,11 +976,11 @@
 @pytest.mark.parametrize('output', data_output)
 @pytest.mark.parametrize('eval_sizes', [[0.5, 1, 1.5], [0]])
 @pytest.mark.parametrize('eval_names_prefix', ['specified', None])
-def test_eval_set_no_early_stopping(task, output, eval_sizes, eval_names_prefix, cluster):
+def test_eval_set_no_early_stopping(task, output, eval_sizes, eval_names_prefix):
     if task == 'ranking' and output == 'scipy_csr_matrix':
         pytest.skip('LGBMRanker is not currently tested on sparse matrices')
 
-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
         # Use larger trainset to prevent premature stopping due to zero loss, causing num_trees() < n_estimators.
         # Use small chunk_size to avoid single-worker allocation of eval data partitions.
         n_samples = 1000
@@ -1128,8 +1125,8 @@ def test_eval_set_no_early_stopping(task, output, eval_sizes, eval_names_prefix,
 
 
 @pytest.mark.parametrize('task', ['binary-classification', 'regression', 'ranking'])
-def test_eval_set_with_custom_eval_metric(task, cluster):
-    with Client(cluster) as client:
+def test_eval_set_with_custom_eval_metric(task):
+    with ClientWrapper(cluster()) as client:
         n_samples = 1000
         n_eval_samples = int(n_samples * 0.5)
         chunk_size = 10
@@ -1200,8 +1197,8 @@
 
 
 @pytest.mark.parametrize('task', tasks)
-def test_training_works_if_client_not_provided_or_set_after_construction(task, cluster):
-    with Client(cluster) as client:
+def test_training_works_if_client_not_provided_or_set_after_construction(task):
+    with ClientWrapper(cluster()) as client:
         _, _, _, _, dX, dy, _, dg = _create_data(
             objective=task,
             output='array',
@@ -1265,9 +1262,9 @@ def test_training_works_if_client_not_provided_or_set_after_construction(task, c
 @pytest.mark.parametrize('serializer', ['pickle', 'joblib', 'cloudpickle'])
 @pytest.mark.parametrize('task', tasks)
 @pytest.mark.parametrize('set_client', [True, False])
-def test_model_and_local_version_are_picklable_whether_or_not_client_set_explicitly(serializer, task, set_client, tmp_path, cluster, cluster2):
+def test_model_and_local_version_are_picklable_whether_or_not_client_set_explicitly(serializer, task, set_client, tmp_path):
 
-    with Client(cluster) as client1:
+    with ClientWrapper(cluster()) as client1:
         # data on cluster1
         X_1, _, _, _, dX_1, dy_1, _, dg_1 = _create_data(
             objective=task,
@@ -1275,7 +1272,7 @@
             group=None
         )
 
-        with Client(cluster2) as client2:
+        with ClientWrapper(cluster()) as client2:
             # create identical data on cluster2
             X_2, _, _, _, dX_2, dy_2, _, dg_2 = _create_data(
                 objective=task,
@@ -1430,7 +1427,7 @@ def test_model_and_local_version_are_picklable_whether_or_not_client_set_explici
 
 
-def test_warns_and_continues_on_unrecognized_tree_learner(cluster):
-    with Client(cluster) as client:
+def test_warns_and_continues_on_unrecognized_tree_learner():
+    with ClientWrapper(cluster()) as client:
         X = da.random.random((1e3, 10))
         y = da.random.random((1e3, 1))
         dask_regressor = lgb.DaskLGBMRegressor(
@@ -1447,8 +1444,8 @@
 
 
 @pytest.mark.parametrize('tree_learner', ['data_parallel', 'voting_parallel'])
-def test_training_respects_tree_learner_aliases(tree_learner, cluster):
-    with Client(cluster) as client:
+def test_training_respects_tree_learner_aliases(tree_learner):
+    with ClientWrapper(cluster()) as client:
         task = 'regression'
         _, _, _, _, dX, dy, dw, dg = _create_data(objective=task, output='array')
         dask_factory = task_to_dask_factory[task]
@@ -1466,7 +1463,7 @@
 
 
-def test_error_on_feature_parallel_tree_learner(cluster):
-    with Client(cluster) as client:
+def test_error_on_feature_parallel_tree_learner():
+    with ClientWrapper(cluster()) as client:
         X = da.random.random((100, 10), chunks=(50, 10))
         y = da.random.random(100, chunks=50)
         X, y = client.persist([X, y])
@@ -1484,7 +1481,7 @@
 
 
-def test_errors(cluster):
-    with Client(cluster) as client:
+def test_errors():
+    with ClientWrapper(cluster()) as client:
         def f(part):
             raise Exception('foo')
 
@@ -1503,11 +1500,11 @@
 
 @pytest.mark.parametrize('task', tasks)
 @pytest.mark.parametrize('output', data_output)
-def test_training_succeeds_even_if_some_workers_do_not_have_any_data(task, output, cluster):
+def test_training_succeeds_even_if_some_workers_do_not_have_any_data(task, output):
     if task == 'ranking' and output == 'scipy_csr_matrix':
         pytest.skip('LGBMRanker is not currently tested on sparse matrices')
 
-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
         def collection_to_single_partition(collection):
             """Merge the parts of a Dask collection into a single partition."""
             if collection is None:
@@ -1555,8 +1552,8 @@ def collection_to_single_partition(collection):
 
 
 @pytest.mark.parametrize('task', tasks)
-def test_network_params_not_required_but_respected_if_given(task, listen_port, cluster):
-    with Client(cluster) as client:
+def test_network_params_not_required_but_respected_if_given(task, listen_port):
+    with ClientWrapper(cluster()) as client:
         _, _, _, _, dX, dy, _, dg = _create_data(
             objective=task,
             output='array',
@@ -1613,8 +1610,8 @@
 
 
 @pytest.mark.parametrize('task', tasks)
-def test_machines_should_be_used_if_provided(task, cluster):
-    with Client(cluster) as client:
+def test_machines_should_be_used_if_provided(task):
+    with ClientWrapper(cluster()) as client:
         _, _, _, _, dX, dy, _, dg = _create_data(
             objective=task,
             output='array',
@@ -1715,8 +1712,8 @@ def test_dask_methods_and_sklearn_equivalents_have_similar_signatures(methods):
 
 
 @pytest.mark.parametrize('task', tasks)
-def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(task, cluster):
-    with Client(cluster) as client:
+def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(task):
+    with ClientWrapper(cluster()) as client:
         _, _, _, _, dX, dy, dw, dg = _create_data(
             objective=task,
             output='dataframe',
@@ -1742,11 +1739,11 @@ def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(task
 
 @pytest.mark.parametrize('task', tasks)
 @pytest.mark.parametrize('output', data_output)
-def test_init_score(task, output, cluster):
+def test_init_score(task, output):
     if task == 'ranking' and output == 'scipy_csr_matrix':
         pytest.skip('LGBMRanker is not currently tested on sparse matrices')
 
-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
         _, _, _, _, dX, dy, dw, dg = _create_data(
             objective=task,
             output=output,
@@ -1794,8 +1791,8 @@ def _tested_estimators():
 
 @pytest.mark.parametrize("estimator", _tested_estimators())
 @pytest.mark.parametrize("check", sklearn_checks_to_run())
-def test_sklearn_integration(estimator, check, cluster):
-    with Client(cluster) as client:
+def test_sklearn_integration(estimator, check):
+    with ClientWrapper(cluster()) as client:
         estimator.set_params(local_listen_port=18000, time_out=5)
         name = type(estimator).__name__
         check(name, estimator)
@@ -1811,11 +1808,11 @@ def test_parameters_default_constructible(estimator):
 
 @pytest.mark.parametrize('task', tasks)
 @pytest.mark.parametrize('output', data_output)
-def test_predict_with_raw_score(task, output, cluster):
+def test_predict_with_raw_score(task, output):
     if task == 'ranking' and output == 'scipy_csr_matrix':
         pytest.skip('LGBMRanker is not currently tested on sparse matrices')
 
-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
         _, _, _, _, dX, dy, _, dg = _create_data(
             objective=task,
             output=output,
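--
Each test above now builds its own LocalCluster and hands it to ClientWrapper,
whose __exit__ closes the backing cluster right after the normal Client
shutdown, so no cluster outlives the test that created it. A minimal
standalone sketch of that lifecycle, using the same names as the patch (the
assert is illustrative only, not part of the test suite):

    from dask.distributed import Client, LocalCluster

    class ClientWrapper(Client):
        # run the usual Client teardown first, then dispose of the
        # LocalCluster the client was constructed from
        def __exit__(self, exc_type, exc_value, traceback):
            super().__exit__(exc_type, exc_value, traceback)
            self.cluster.close()

    with ClientWrapper(LocalCluster(n_workers=2, threads_per_worker=2,
                                    dashboard_address=None)) as client:
        n_workers = len(client.scheduler_info()['workers'])
        assert n_workers == 2  # a fresh two-worker cluster each time
    # both the client and its LocalCluster are closed at this point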