[feat] Support statistics print by adding results manager object (#334)
* [feat] Support statistics print by adding results manager object

* [refactor] Make SearchResults extract run_history at __init__

Since the search results should not be kept around indefinitely,
this class now takes run_history in __init__ so that the
extraction is invoked implicitly inside the constructor.
With this change, calling the extraction from outside is no longer
recommended. It is still possible, but self.clear() is called first
to prevent the internal state from getting mixed up.
(A minimal sketch of this design follows the commit list below.)

* [fix] Separate those changes into PR#336

* [fix] Fix so that test_loss includes all the metrics

* [enhance] Strengthen the test for sprint and SearchResults

* [fix] Fix an issue in documentation

* [enhance] Increase the coverage

* [refactor] Separate the test for results_manager to organize the structure

* [test] Add the test for get_incumbent_results

* [test] Remove the previous test_get_incumbent and see the coverage

* [fix] [test] Fix reversion of metric and strengthen the test cases

* [fix] Fix flake8 issues and increase coverage

* [fix] Address Ravin's comments

* [enhance] Increase the coverage

* [fix] Fix a flake8 issue
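
As a rough illustration of the extraction-at-__init__ design described in the second commit above, here is a minimal sketch. It is not the actual autoPyTorch implementation; the class name, the stored attributes, and _extract_results_from_run_history are hypothetical.

    from typing import Any, Dict, List

    from smac.runhistory.runhistory import RunHistory


    class SearchResultsSketch:
        """Hypothetical simplification of the SearchResults idea."""

        def __init__(self, run_history: RunHistory) -> None:
            self.configs: Dict[int, Any] = {}
            self.costs: List[float] = []
            # Extraction happens implicitly at construction time,
            # so callers never need to invoke it themselves.
            self._extract_results_from_run_history(run_history)

        def clear(self) -> None:
            # Reset the internal state; called before any external
            # re-extraction to avoid mixing results from different runs.
            self.configs = {}
            self.costs = []

        def _extract_results_from_run_history(self, run_history: RunHistory) -> None:
            self.clear()  # guard against stale state
            for run_key, run_value in run_history.data.items():
                self.configs[run_key.config_id] = run_history.ids_config[run_key.config_id]
                self.costs.append(run_value.cost)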
nabenabe0928 authored Nov 21, 2021
1 parent 2d2f6d1 commit 1e06cce
Showing 15 changed files with 2,505 additions and 118 deletions.
98 changes: 72 additions & 26 deletions autoPyTorch/api/base_task.py
@@ -29,6 +29,7 @@
from smac.stats.stats import Stats
from smac.tae import StatusType

from autoPyTorch.api.results_manager import ResultsManager, SearchResults
from autoPyTorch.automl_common.common.utils.backend import Backend, create
from autoPyTorch.constants import (
REGRESSION_TASKS,
@@ -192,12 +193,13 @@ def __init__(
self.search_space: Optional[ConfigurationSpace] = None
self._dataset_requirements: Optional[List[FitRequirement]] = None
self._metric: Optional[autoPyTorchMetric] = None
self._scoring_functions: Optional[List[autoPyTorchMetric]] = None
self._logger: Optional[PicklableClientLogger] = None
self.run_history: RunHistory = RunHistory()
self.trajectory: Optional[List] = None
self.dataset_name: Optional[str] = None
self.cv_models_: Dict = {}

self._results_manager = ResultsManager()

# By default try to use the TCP logging port or get a new port
self._logger_port = logging.handlers.DEFAULT_TCP_LOGGING_PORT

@@ -240,6 +242,18 @@ def build_pipeline(self, dataset_properties: Dict[str, Any]) -> BasePipeline:
"""
raise NotImplementedError

@property
def run_history(self) -> RunHistory:
return self._results_manager.run_history

@property
def ensemble_performance_history(self) -> List[Dict[str, Any]]:
return self._results_manager.ensemble_performance_history

@property
def trajectory(self) -> Optional[List]:
return self._results_manager.trajectory

def set_pipeline_config(self, **pipeline_config_kwargs: Any) -> None:
"""
Check whether arguments are valid and
@@ -883,6 +897,12 @@ def _search(

self.pipeline_options['optimize_metric'] = optimize_metric

if all_supported_metrics:
self._scoring_functions = get_metrics(dataset_properties=dataset_properties,
all_supported_metrics=True)
else:
self._scoring_functions = [self._metric]

self.search_space = self.get_search_space(dataset)

# Incorporate budget to pipeline config
@@ -1037,12 +1057,14 @@ def _search(
pynisher_context=self._multiprocessing_context,
)
try:
run_history, self.trajectory, budget_type = \
run_history, self._results_manager.trajectory, budget_type = \
_proc_smac.run_smbo(func=tae_func)
self.run_history.update(run_history, DataOrigin.INTERNAL)
trajectory_filename = os.path.join(
self._backend.get_smac_output_directory_for_run(self.seed),
'trajectory.json')

assert self.trajectory is not None # mypy check
saveable_trajectory = \
[list(entry[:2]) + [entry[2].get_dictionary()] + list(entry[3:])
for entry in self.trajectory]
@@ -1059,7 +1081,7 @@
self._logger.info("Starting Shutdown")

if proc_ensemble is not None:
self.ensemble_performance_history = list(proc_ensemble.history)
self._results_manager.ensemble_performance_history = list(proc_ensemble.history)

if len(proc_ensemble.futures) > 0:
# Also add ensemble runs that did not finish within smac time
@@ -1068,7 +1090,7 @@
result = proc_ensemble.futures.pop().result()
if result:
ensemble_history, _, _, _ = result
self.ensemble_performance_history.extend(ensemble_history)
self._results_manager.ensemble_performance_history.extend(ensemble_history)
self._logger.info("Ensemble script finished, continue shutdown.")

# save the ensemble performance history file
@@ -1356,28 +1378,12 @@ def get_incumbent_results(
The incumbent configuration
Dict[str, Union[int, str, float]]:
Additional information about the run of the incumbent configuration.
"""
assert self.run_history is not None, "No Run History found, search has not been called."
if self.run_history.empty():
raise ValueError("Run History is empty. Something went wrong, "
"smac was not able to fit any model?")

run_history_data = self.run_history.data
if not include_traditional:
# traditional classifiers have trainer_configuration in their additional info
run_history_data = dict(
filter(lambda elem: elem[1].status == StatusType.SUCCESS and elem[1].
additional_info is not None and elem[1].
additional_info['configuration_origin'] != 'traditional',
run_history_data.items()))
run_history_data = dict(
filter(lambda elem: 'SUCCESS' in str(elem[1].status), run_history_data.items()))
sorted_runvalue_by_cost = sorted(run_history_data.items(), key=lambda item: item[1].cost)
incumbent_run_key, incumbent_run_value = sorted_runvalue_by_cost[0]
incumbent_config = self.run_history.ids_config[incumbent_run_key.config_id]
incumbent_results = incumbent_run_value.additional_info
return incumbent_config, incumbent_results

if self._metric is None:
raise RuntimeError("`search_results` is only available after a search has finished.")

return self._results_manager.get_incumbent_results(metric=self._metric, include_traditional=include_traditional)

def get_models_with_weights(self) -> List:
if self.models_ is None or len(self.models_) == 0 or \
@@ -1417,3 +1423,43 @@ def _print_debug_info_to_log(self) -> None:
self._logger.debug(' multiprocessing_context: %s', str(self._multiprocessing_context))
for key, value in vars(self).items():
self._logger.debug(f"\t{key}->{value}")

def get_search_results(self) -> SearchResults:
"""
Get the interface to obtain the search results easily.
"""
if self._scoring_functions is None or self._metric is None:
raise RuntimeError("`search_results` is only available after a search has finished.")

return self._results_manager.get_search_results(
metric=self._metric,
scoring_functions=self._scoring_functions
)

def sprint_statistics(self) -> str:
"""
Prints statistics about the SMAC search.
These statistics include:
1. Optimisation Metric
2. Best Optimisation score achieved by individual pipelines
3. Total number of target algorithm runs
4. Total number of successful target algorithm runs
5. Total number of crashed target algorithm runs
6. Total number of target algorithm runs that exceeded the time limit
7. Total number of target algorithm runs that exceeded the memory limit
Returns:
(str):
Formatted string with statistics
"""
if self._scoring_functions is None or self._metric is None:
raise RuntimeError("`search_results` is only available after a search has finished.")

assert self.dataset_name is not None  # mypy check
return self._results_manager.sprint_statistics(
dataset_name=self.dataset_name,
scoring_functions=self._scoring_functions,
metric=self._metric
)
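
For context, a usage sketch of the interface this commit introduces. The sketch is an assumption, not part of the diff: the TabularClassificationTask setup, the dataset, and the search arguments are illustrative only; only sprint_statistics(), get_search_results(), and get_incumbent_results() come from this change.

    from sklearn.datasets import load_breast_cancer
    from sklearn.model_selection import train_test_split

    from autoPyTorch.api.tabular_classification import TabularClassificationTask

    # Illustrative data and search setup (not part of this commit).
    X, y = load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

    api = TabularClassificationTask()
    api.search(
        X_train=X_train, y_train=y_train,
        X_test=X_test, y_test=y_test,
        optimize_metric='accuracy',
        total_walltime_limit=300,
    )

    # New in this commit: human-readable summary of the SMAC search.
    print(api.sprint_statistics())

    # Structured, per-run access to the search results.
    search_results = api.get_search_results()

    # Incumbent configuration and its additional run information.
    incumbent_config, incumbent_info = api.get_incumbent_results(include_traditional=False)
    print(incumbent_config)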