From 73865035c6fac86321ea86368d515d4fed068dba Mon Sep 17 00:00:00 2001 From: sarahmish Date: Tue, 28 Jul 2020 15:52:08 +0300 Subject: [PATCH 1/7] added dictionary to record block execution time --- mlblocks/mlpipeline.py | 17 +++++++++++++++++ setup.py | 1 + 2 files changed, 18 insertions(+) diff --git a/mlblocks/mlpipeline.py b/mlblocks/mlpipeline.py index dcfc8a0b..6fc789d4 100644 --- a/mlblocks/mlpipeline.py +++ b/mlblocks/mlpipeline.py @@ -8,6 +8,7 @@ import warnings from collections import Counter, OrderedDict, defaultdict from copy import deepcopy +from datetime import datetime import numpy as np @@ -223,6 +224,7 @@ def __init__(self, pipeline=None, primitives=None, init_params=None, self.set_hyperparameters(hyperparameters) self._re_block_name = re.compile(r'(^[^#]+#\d+)(\..*)?') + self.time = dict() def _get_str_output(self, output): """Get the outputs that correspond to the str specification.""" @@ -390,6 +392,18 @@ def get_output_variables(self, outputs='default'): outputs = self.get_outputs(outputs) return [output['variable'] for output in outputs] + def get_time(self): + """Get the execution time of each block. + + If called before fitting the pipeline, it will return an empty dictionary. + + Returns: + dict: + A dictionary containing the block names as keys and + the execution time in seconds as values. + """ + return self.time.copy() + def _extract_block_name(self, variable_name): return self._re_block_name.search(variable_name).group(1) @@ -616,7 +630,10 @@ def _fit_block(self, block, block_name, context): LOGGER.debug("Fitting block %s", block_name) try: fit_args = self._get_block_args(block_name, block.fit_args, context) + start = datetime.utcnow() block.fit(**fit_args) + elapsed = datetime.utcnow() - start + self.time[block_name] = elapsed.total_seconds() except Exception: if self.verbose: LOGGER.exception("Exception caught fitting MLBlock %s", block_name) diff --git a/setup.py b/setup.py index a4fcc7a3..56ab70cd 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,7 @@ install_requires = [ + 'Keras>=2.1.6,<2.4' ] From d35544ed72850f6ed4224f16e1344039b1bfb2f1 Mon Sep 17 00:00:00 2001 From: sarahmish Date: Tue, 28 Jul 2020 22:39:37 +0300 Subject: [PATCH 2/7] add debug argument for fit/predict --- mlblocks/mlpipeline.py | 90 ++++++++++++++++++++++++++++++------------ setup.py | 1 - 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/mlblocks/mlpipeline.py b/mlblocks/mlpipeline.py index 6fc789d4..8e5d0629 100644 --- a/mlblocks/mlpipeline.py +++ b/mlblocks/mlpipeline.py @@ -224,7 +224,6 @@ def __init__(self, pipeline=None, primitives=None, init_params=None, self.set_hyperparameters(hyperparameters) self._re_block_name = re.compile(r'(^[^#]+#\d+)(\..*)?') - self.time = dict() def _get_str_output(self, output): """Get the outputs that correspond to the str specification.""" @@ -392,18 +391,6 @@ def get_output_variables(self, outputs='default'): outputs = self.get_outputs(outputs) return [output['variable'] for output in outputs] - def get_time(self): - """Get the execution time of each block. - - If called before fitting the pipeline, it will return an empty dictionary. - - Returns: - dict: - A dictionary containing the block names as keys and - the execution time in seconds as values. - """ - return self.time.copy() - def _extract_block_name(self, variable_name): return self._re_block_name.search(variable_name).group(1) @@ -625,7 +612,7 @@ def _update_outputs(self, variable_name, output_variables, outputs, value): index = output_variables.index(variable_name) outputs[index] = deepcopy(value) - def _fit_block(self, block, block_name, context): + def _fit_block(self, block, block_name, context, debug=False): """Get the block args from the context and fit the block.""" LOGGER.debug("Fitting block %s", block_name) try: @@ -633,14 +620,21 @@ def _fit_block(self, block, block_name, context): start = datetime.utcnow() block.fit(**fit_args) elapsed = datetime.utcnow() - start - self.time[block_name] = elapsed.total_seconds() + + if debug: + debug_info = { + "elapsed": elapsed.total_seconds(), + "input": fit_args + } + return debug_info + except Exception: if self.verbose: LOGGER.exception("Exception caught fitting MLBlock %s", block_name) raise - def _produce_block(self, block, block_name, context, output_variables, outputs): + def _produce_block(self, block, block_name, context, output_variables, outputs, debug=False): """Get the block args from the context and produce the block. Afterwards, set the block outputs back into the context and update @@ -649,7 +643,9 @@ def _produce_block(self, block, block_name, context, output_variables, outputs): LOGGER.debug("Producing block %s", block_name) try: produce_args = self._get_block_args(block_name, block.produce_args, context) + start = datetime.utcnow() block_outputs = block.produce(**produce_args) + elapsed = datetime.utcnow() - start outputs_dict = self._extract_outputs(block_name, block_outputs, block.produce_output) context.update(outputs_dict) @@ -662,13 +658,21 @@ def _produce_block(self, block, block_name, context, output_variables, outputs): variable_name = '{}.{}'.format(block_name, key) self._update_outputs(variable_name, output_variables, outputs, value) + if debug: + debug_info = { + "elapsed": elapsed.total_seconds(), + "input": produce_args, + "output": outputs_dict + } + return debug_info + except Exception: if self.verbose: LOGGER.exception("Exception caught producing MLBlock %s", block_name) raise - def fit(self, X=None, y=None, output_=None, start_=None, **kwargs): + def fit(self, X=None, y=None, output_=None, start_=None, debug=False, **kwargs): """Fit the blocks of this pipeline. Sequentially call the ``fit`` and the ``produce`` methods of each block, @@ -698,6 +702,10 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs): If given, the execution of the pipeline will start on the specified block, and all the blocks before that one will be skipped. + debug (boolean): + Debug mode, if True a dictionary containing the block names as keys and + the execution time in seconds, input, output as values is returned. + **kwargs: Any additional keyword arguments will be directly added to the context dictionary and available for the blocks. @@ -725,6 +733,10 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs): if isinstance(start_, int): start_ = self._get_block_name(start_) + debug_info = None + if debug: + debug_info = defaultdict(dict) + for block_name, block in self.blocks.items(): if start_: if block_name == start_: @@ -733,10 +745,15 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs): LOGGER.debug("Skipping block %s fit", block_name) continue - self._fit_block(block, block_name, context) + out = self._fit_block(block, block_name, context, debug) + if debug: + debug_info["fit"][block_name] = out if (block_name != self._last_block_name) or (block_name in output_blocks): - self._produce_block(block, block_name, context, output_variables, outputs) + out = self._produce_block( + block, block_name, context, output_variables, outputs, debug) + if debug: + debug_info["produce"][block_name] = out # We already captured the output from this block if block_name in output_blocks: @@ -746,15 +763,23 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs): # outputs we are done. if output_variables is not None and not output_blocks: if len(outputs) > 1: - return tuple(outputs) + result = tuple(outputs) else: - return outputs[0] + result = outputs[0] + + if debug: + return result, debug_info + + return result + + if debug: + return debug_info if start_: # We skipped all the blocks up to the end raise ValueError('Unknown block name: {}'.format(start_)) - def predict(self, X=None, output_='default', start_=None, **kwargs): + def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs): """Produce predictions using the blocks of this pipeline. Sequentially call the ``produce`` method of each block, capturing the @@ -780,6 +805,10 @@ def predict(self, X=None, output_='default', start_=None, **kwargs): If given, the execution of the pipeline will start on the specified block, and all the blocks before that one will be skipped. + debug (boolean): + Debug mode, if True a dictionary containing the block names as keys and + the execution time in seconds, input, output as values is returned. + **kwargs: Any additional keyword arguments will be directly added to the context dictionary and available for the blocks. @@ -798,6 +827,10 @@ def predict(self, X=None, output_='default', start_=None, **kwargs): if isinstance(start_, int): start_ = self._get_block_name(start_) + debug_info = None + if debug: + debug_info = dict() + for block_name, block in self.blocks.items(): if start_: if block_name == start_: @@ -806,7 +839,9 @@ def predict(self, X=None, output_='default', start_=None, **kwargs): LOGGER.debug("Skipping block %s produce", block_name) continue - self._produce_block(block, block_name, context, output_variables, outputs) + out = self._produce_block(block, block_name, context, output_variables, outputs, debug) + if debug: + debug_info[block_name] = out # We already captured the output from this block if block_name in output_blocks: @@ -816,9 +851,14 @@ def predict(self, X=None, output_='default', start_=None, **kwargs): # outputs we are done. if not output_blocks: if len(outputs) > 1: - return tuple(outputs) + result = tuple(outputs) else: - return outputs[0] + result = outputs[0] + + if debug: + return result, debug_info + + return result if start_: # We skipped all the blocks up to the end diff --git a/setup.py b/setup.py index 56ab70cd..a4fcc7a3 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,6 @@ install_requires = [ - 'Keras>=2.1.6,<2.4' ] From f0cd86f2073e6e1c1a3efe6a0535458374bc597e Mon Sep 17 00:00:00 2001 From: sarahmish Date: Tue, 28 Jul 2020 23:03:53 +0300 Subject: [PATCH 3/7] update mlprimitive test version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a4fcc7a3..85020231 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ tests_require = [ 'pytest>=3.4.2', 'pytest-cov>=2.6.0', - 'mlprimitives>=0.2,<0.3', + 'mlprimitives>=0.2.4.dev0', 'setuptools>=41.0.0', 'numpy<1.17', 'rundoc>=0.4.3', From 2909c03289df305113eae94d41f779263d25f3f6 Mon Sep 17 00:00:00 2001 From: sarahmish Date: Wed, 29 Jul 2020 00:46:12 +0300 Subject: [PATCH 4/7] cap sphinx --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 85020231..4048cbbb 100644 --- a/setup.py +++ b/setup.py @@ -47,8 +47,8 @@ 'watchdog>=0.8.3', # docs - 'm2r>=0.2.0', - 'Sphinx>=1.7.1', + 'm2r>=0.2.0,<0.3', + 'Sphinx>=1.7.1,<3', 'sphinx_rtd_theme>=0.2.4', 'graphviz>=0.9', 'ipython>=6.5.0', From 22a955f47a60e778b50de752f232345e13aac64b Mon Sep 17 00:00:00 2001 From: sarahmish Date: Wed, 29 Jul 2020 01:18:21 +0300 Subject: [PATCH 5/7] cap isort --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 4048cbbb..57c623f0 100644 --- a/setup.py +++ b/setup.py @@ -57,8 +57,8 @@ 'docutils<0.15,>=0.10', # botocore incompatibility with 0.15 # style check - 'flake8>=3.5.0', - 'isort>=4.3.4', + 'flake8>=3.5.0,<3.8', + 'isort>=4.3.4<5', # fix style issues 'autoflake>=1.2', # keep this after flake8 to avoid From 444f301f641e03150490ade67604e4cc9a23703b Mon Sep 17 00:00:00 2001 From: sarahmish Date: Wed, 29 Jul 2020 01:53:42 +0300 Subject: [PATCH 6/7] cap isort (properly) --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 57c623f0..c5cf4015 100644 --- a/setup.py +++ b/setup.py @@ -58,7 +58,7 @@ # style check 'flake8>=3.5.0,<3.8', - 'isort>=4.3.4<5', + 'isort>=4.3.4,<5', # fix style issues 'autoflake>=1.2', # keep this after flake8 to avoid From e2b6eb3e0d41717579a2949598af411ddbad1a47 Mon Sep 17 00:00:00 2001 From: sarahmish Date: Thu, 30 Jul 2020 04:15:46 +0300 Subject: [PATCH 7/7] debug dictionary passing + added debug tests --- mlblocks/mlpipeline.py | 36 +++---- tests/test_mlpipeline.py | 198 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 216 insertions(+), 18 deletions(-) diff --git a/mlblocks/mlpipeline.py b/mlblocks/mlpipeline.py index 8e5d0629..8367b327 100644 --- a/mlblocks/mlpipeline.py +++ b/mlblocks/mlpipeline.py @@ -612,7 +612,7 @@ def _update_outputs(self, variable_name, output_variables, outputs, value): index = output_variables.index(variable_name) outputs[index] = deepcopy(value) - def _fit_block(self, block, block_name, context, debug=False): + def _fit_block(self, block, block_name, context, debug=None): """Get the block args from the context and fit the block.""" LOGGER.debug("Fitting block %s", block_name) try: @@ -621,12 +621,11 @@ def _fit_block(self, block, block_name, context, debug=False): block.fit(**fit_args) elapsed = datetime.utcnow() - start - if debug: - debug_info = { + if debug is not None: + debug["fit"][block_name] = { "elapsed": elapsed.total_seconds(), "input": fit_args } - return debug_info except Exception: if self.verbose: @@ -634,7 +633,7 @@ def _fit_block(self, block, block_name, context, debug=False): raise - def _produce_block(self, block, block_name, context, output_variables, outputs, debug=False): + def _produce_block(self, block, block_name, context, output_variables, outputs, debug=None): """Get the block args from the context and produce the block. Afterwards, set the block outputs back into the context and update @@ -658,13 +657,17 @@ def _produce_block(self, block, block_name, context, output_variables, outputs, variable_name = '{}.{}'.format(block_name, key) self._update_outputs(variable_name, output_variables, outputs, value) - if debug: - debug_info = { + if debug is not None: + record = { "elapsed": elapsed.total_seconds(), "input": produce_args, "output": outputs_dict } - return debug_info + + if "fit" in debug.keys(): + debug["produce"][block_name] = record + else: + debug[block_name] = record except Exception: if self.verbose: @@ -745,15 +748,11 @@ def fit(self, X=None, y=None, output_=None, start_=None, debug=False, **kwargs): LOGGER.debug("Skipping block %s fit", block_name) continue - out = self._fit_block(block, block_name, context, debug) - if debug: - debug_info["fit"][block_name] = out + self._fit_block(block, block_name, context, debug_info) if (block_name != self._last_block_name) or (block_name in output_blocks): - out = self._produce_block( - block, block_name, context, output_variables, outputs, debug) - if debug: - debug_info["produce"][block_name] = out + self._produce_block( + block, block_name, context, output_variables, outputs, debug_info) # We already captured the output from this block if block_name in output_blocks: @@ -839,9 +838,7 @@ def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs) LOGGER.debug("Skipping block %s produce", block_name) continue - out = self._produce_block(block, block_name, context, output_variables, outputs, debug) - if debug: - debug_info[block_name] = out + self._produce_block(block, block_name, context, output_variables, outputs, debug_info) # We already captured the output from this block if block_name in output_blocks: @@ -860,6 +857,9 @@ def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs) return result + if debug: + return debug_info + if start_: # We skipped all the blocks up to the end raise ValueError('Unknown block name: {}'.format(start_)) diff --git a/tests/test_mlpipeline.py b/tests/test_mlpipeline.py index 340a3838..25a90edb 100644 --- a/tests/test_mlpipeline.py +++ b/tests/test_mlpipeline.py @@ -681,6 +681,204 @@ def test_get_inputs_no_fit(self): assert inputs == expected + @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) + def test_fit_no_debug(self): + mlpipeline = MLPipeline(['a_primitive']) + mlpipeline.blocks['a_primitive#1'].fit_args = [ + { + 'name': 'fit_input', + 'type': 'whatever' + } + ] + + returned = mlpipeline.fit(debug=False) + + assert returned is None + + @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) + def test_fit_debug(self): + mlpipeline = MLPipeline(['a_primitive']) + mlpipeline.blocks['a_primitive#1'].fit_args = [ + { + 'name': 'fit_input', + 'type': 'whatever' + } + ] + + expected_return = dict() + expected_return["fit"] = { + "a_primitive#1": { + "elapsed": 0, + "input": { + "whatever" + } + } + } + + returned = mlpipeline.fit(debug=True) + + print(returned) + assert isinstance(returned, dict) + assert set(returned.keys()) == set(expected_return.keys()) # fit / produce + assert set(returned["fit"].keys()) == set(expected_return["fit"].keys()) # block name + + for block_name, dictionary in expected_return["fit"].items(): + assert set(returned["fit"][block_name].keys()) == set(dictionary.keys()) + + @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) + def test_fit_produce_debug(self): + outputs = { + 'default': [ + { + 'name': 'a_name', + 'variable': 'a_primitive#1.a_variable', + 'type': 'a_type', + } + ] + } + mlpipeline = MLPipeline(['a_primitive'], outputs=outputs) + mlpipeline.blocks['a_primitive#1'].fit_args = [ + { + 'name': 'fit_input', + 'type': 'whatever' + } + ] + + mlpipeline.blocks['a_primitive#1'].produce_args = [ + { + 'name': 'input', + 'type': 'whatever' + } + ] + + mlpipeline.blocks['a_primitive#1'].produce_output = [ + { + 'name': 'a_name', + 'type': 'a_type' + } + ] + + expected_return = dict() + expected_return["fit"] = { + "a_primitive#1": { + "elapsed": 0, + "input": { + "whatever" + } + } + } + expected_return["produce"] = { + "a_primitive#1": { + "elapsed": 0, + "input": { + "whatever" + }, + "output": { + "whatever" + } + } + } + + returned, debug_returned = mlpipeline.fit(output_='default', debug=True) + + assert len([returned]) == len(outputs["default"]) + assert isinstance(debug_returned, dict) + assert set(debug_returned.keys()) == set(expected_return.keys()) # fit / produce + assert set(debug_returned["fit"].keys()) == set(expected_return["fit"].keys()) + assert set(debug_returned["produce"].keys()) == set(expected_return["produce"].keys()) + + for block_name, dictionary in expected_return["fit"].items(): + assert set(debug_returned["fit"][block_name].keys()) == set(dictionary.keys()) + + for block_name, dictionary in expected_return["produce"].items(): + assert set(debug_returned["produce"][block_name].keys()) == set(dictionary.keys()) + + @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) + def test_predict_no_debug(self): + outputs = { + 'default': [ + { + 'name': 'a_name', + 'variable': 'a_primitive#1.a_variable', + 'type': 'a_type', + }, + { + 'name': 'b_name', + 'variable': 'a_primitive#1.b_variable', + 'type': 'b_type', + }, + ] + } + mlpipeline = MLPipeline(['a_primitive'], outputs=outputs) + mlpipeline.blocks['a_primitive#1'].produce_args = [ + { + 'name': 'input', + 'type': 'whatever' + } + ] + + mlpipeline.blocks['a_primitive#1'].produce_output = [ + { + 'name': 'a_name', + 'type': 'a_type' + }, + { + 'name': 'b_name', + 'type': 'b_type' + } + ] + + returned = mlpipeline.predict(debug=False) + assert len(returned) == len(outputs["default"]) + for returned_output, expected_output in zip(returned, outputs["default"]): + assert returned_output == expected_output["variable"] + + @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) + def test_predict_debug(self): + outputs = { + 'default': [ + { + 'name': 'a_name', + 'variable': 'a_primitive#1.a_variable', + 'type': 'a_type', + } + ] + } + mlpipeline = MLPipeline(['a_primitive'], outputs=outputs) + mlpipeline.blocks['a_primitive#1'].produce_args = [ + { + 'name': 'input', + 'type': 'whatever' + } + ] + + mlpipeline.blocks['a_primitive#1'].produce_output = [ + { + 'name': 'a_name', + 'type': 'a_type' + } + ] + + expected_return = dict() + expected_return = { + "a_primitive#1": { + "elapsed": 0, + "input": { + "whatever" + }, + "output": { + "whatever" + } + } + } + returned, debug_returned = mlpipeline.predict(debug=True) + assert len([returned]) == len(outputs["default"]) + assert isinstance(debug_returned, dict) + assert set(debug_returned.keys()) == set(expected_return.keys()) + + for block_name, dictionary in expected_return.items(): + assert set(debug_returned[block_name].keys()) == set(dictionary.keys()) + def test_fit(self): pass