diff --git a/docs/advanced_usage/pipelines.rst b/docs/advanced_usage/pipelines.rst index e87a0067..07b36c98 100644 --- a/docs/advanced_usage/pipelines.rst +++ b/docs/advanced_usage/pipelines.rst @@ -423,6 +423,42 @@ An example of this situation, where we want to reuse the output of the first blo predictions = pipeline.predict(X_test) score = compute_score(y_test, predictions) +Pipeline debugging +------------------ + +Sometimes we might be interested in debugging a pipeline execution and obtain information +about the time, the memory usage, the inputs and outputs that each step takes. This is possible +by using the argument ``debug`` with the method ``fit`` and ``predict``. This argument allows us +to retrieve critical information from the pipeline execution: + +* ``Time``: Elapsed time for the primitive and the given stage (fit or predict). +* ``Memory``: Amount of memory increase or decrease for the given primitive for that pipeline. +* ``Input``: The input values that the primitive takes for that specific step. +* ``Output``: The output produced by the primitive. + + +If the ``debug`` argument is set to ``True`` then a dictionary will be returned containing all the +elements listed previously:: + + result, debug_info = pipeline.fit(X_train, y_train, debug=True) + +In case you want to retrieve only some of the elements listed above and skip the rest, you can +pass an ``str`` to the ``debug`` argument with any combination of the following characters: + +* ``i``: To include inputs. +* ``o``: To include outputs. +* ``m``: To include used memory. +* ``t``: To include elapsed time. + +For example, if we are only interested on capturing the elapsed time and used memory during the +``fit`` process, we can call the method as follows:: + + result, debug_info = pipeline.fit(X_train, y_train, debug='tm') + +.. warning:: Bear in mind that if we use ``debug=True`` or saving the ``Input`` and ``Output``, + this will consume extra memory ram as it will create copies of the input data and + the output data for each primitive. For profiling it is recommended using the option + ``tm`` as shown in the previous example. .. _API Reference: ../api_reference.html .. _primitives: ../primitives.html diff --git a/mlblocks/mlpipeline.py b/mlblocks/mlpipeline.py index 6e0744bd..a4111bcb 100644 --- a/mlblocks/mlpipeline.py +++ b/mlblocks/mlpipeline.py @@ -4,6 +4,7 @@ import json import logging +import os import re import warnings from collections import Counter, OrderedDict, defaultdict @@ -11,6 +12,7 @@ from datetime import datetime import numpy as np +import psutil from graphviz import Digraph from mlblocks.discovery import load_pipeline @@ -110,14 +112,14 @@ def _build_blocks(self): if not block_params: block_params = self.init_params.get(primitive_name, dict()) if block_params and block_count > 1: - LOGGER.warning(("Non-numbered init_params are being used " - "for more than one block %s."), primitive_name) + LOGGER.warning(('Non-numbered init_params are being used ' + 'for more than one block %s.'), primitive_name) block = MLBlock(primitive, **block_params) blocks[block_name] = block except Exception: - LOGGER.exception("Exception caught building MLBlock %s", primitive) + LOGGER.exception('Exception caught building MLBlock %s', primitive) raise return blocks @@ -475,8 +477,8 @@ def _sanitize(cls, hyperparameters): is a dict containing a complete hyperparameter specification for that block:: { - "block_name": { - "hyperparameter_name": "hyperparameter_value", + 'block_name': { + 'hyperparameter_name': 'hyperparameter_value', ... }, ... @@ -487,7 +489,7 @@ def _sanitize(cls, hyperparameters): second one:: { - ("block_name", "hyperparameter_name"): "hyperparameter_value", + ('block_name', 'hyperparameter_name'): 'hyperparameter_value', ... } @@ -611,39 +613,52 @@ def _update_outputs(self, variable_name, output_variables, outputs, value): index = output_variables.index(variable_name) outputs[index] = deepcopy(value) - def _fit_block(self, block, block_name, context, debug=None): + def _fit_block(self, block, block_name, context, debug_info=None): """Get the block args from the context and fit the block.""" - LOGGER.debug("Fitting block %s", block_name) + LOGGER.debug('Fitting block %s', block_name) try: fit_args = self._get_block_args(block_name, block.fit_args, context) + process = psutil.Process(os.getpid()) + memory_before = process.memory_info().rss start = datetime.utcnow() block.fit(**fit_args) elapsed = datetime.utcnow() - start + memory_after = process.memory_info().rss - if debug is not None: - debug["fit"][block_name] = { - "elapsed": elapsed.total_seconds(), - "input": fit_args - } + if debug_info is not None: + debug = debug_info['debug'] + record = {} + if 't' in debug: + record['time'] = elapsed.total_seconds() + if 'm' in debug: + record['memory'] = memory_after - memory_before + if 'i' in debug: + record['input'] = deepcopy(fit_args) + + debug_info['fit'][block_name] = record except Exception: if self.verbose: - LOGGER.exception("Exception caught fitting MLBlock %s", block_name) + LOGGER.exception('Exception caught fitting MLBlock %s', block_name) raise - def _produce_block(self, block, block_name, context, output_variables, outputs, debug=None): + def _produce_block(self, block, block_name, context, output_variables, + outputs, debug_info=None): """Get the block args from the context and produce the block. Afterwards, set the block outputs back into the context and update the outputs list if necessary. """ - LOGGER.debug("Producing block %s", block_name) + LOGGER.debug('Producing block %s', block_name) try: produce_args = self._get_block_args(block_name, block.produce_args, context) + process = psutil.Process(os.getpid()) + memory_before = process.memory_info().rss start = datetime.utcnow() block_outputs = block.produce(**produce_args) elapsed = datetime.utcnow() - start + memory_after = process.memory_info().rss outputs_dict = self._extract_outputs(block_name, block_outputs, block.produce_output) context.update(outputs_dict) @@ -656,21 +671,23 @@ def _produce_block(self, block, block_name, context, output_variables, outputs, variable_name = '{}.{}'.format(block_name, key) self._update_outputs(variable_name, output_variables, outputs, value) - if debug is not None: - record = { - "elapsed": elapsed.total_seconds(), - "input": produce_args, - "output": outputs_dict - } + if debug_info is not None: + debug = debug_info['debug'] + record = {} + if 't' in debug: + record['time'] = elapsed.total_seconds() + if 'm' in debug: + record['memory'] = memory_after - memory_before + if 'i' in debug: + record['input'] = deepcopy(produce_args) + if 'o' in debug: + record['output'] = deepcopy(outputs_dict) - if "fit" in debug.keys(): - debug["produce"][block_name] = record - else: - debug[block_name] = record + debug_info['produce'][block_name] = record except Exception: if self.verbose: - LOGGER.exception("Exception caught producing MLBlock %s", block_name) + LOGGER.exception('Exception caught producing MLBlock %s', block_name) raise @@ -692,21 +709,31 @@ def fit(self, X=None, y=None, output_=None, start_=None, debug=False, **kwargs): y: Fit Data labels, which the pipeline will use to learn how to behave. - output_ (str or int or list or None): Output specification, as required by ``get_outputs``. If ``None`` is given, nothing will be returned. - start_ (str or int or None): Block index or block name to start processing from. The value can either be an integer, which will be interpreted as a block index, or the name of a block, including the conter number at the end. If given, the execution of the pipeline will start on the specified block, and all the blocks before that one will be skipped. - - debug (boolean): - Debug mode, if True a dictionary containing the block names as keys and - the execution time in seconds, input, output as values is returned. + debug (bool or str): + Debug a pipeline with the following options: + + * ``t``: + Elapsed time for the primitive and the given stage (fit or predict). + * ``m``: + Amount of memory incrase (or decrease) for the primitive. This amount + is represented in bytes. + * ``i``: + The input values that the primitive takes for that step. + * ``o``: + The output values that the primitive generates. + + If provided, return a dictionary with the ``fit`` and ``predict`` performance. + This argument can be a string containing a combination of the letters listed above, + or ``True`` which will return a complete debug. **kwargs: Any additional keyword arguments will be directly added @@ -738,13 +765,14 @@ def fit(self, X=None, y=None, output_=None, start_=None, debug=False, **kwargs): debug_info = None if debug: debug_info = defaultdict(dict) + debug_info['debug'] = debug.lower() if isinstance(debug, str) else 'tmio' for block_name, block in self.blocks.items(): if start_: if block_name == start_: start_ = False else: - LOGGER.debug("Skipping block %s fit", block_name) + LOGGER.debug('Skipping block %s fit', block_name) continue self._fit_block(block, block_name, context, debug_info) @@ -770,13 +798,13 @@ def fit(self, X=None, y=None, output_=None, start_=None, debug=False, **kwargs): return result - if debug: - return debug_info - if start_: # We skipped all the blocks up to the end raise ValueError('Unknown block name: {}'.format(start_)) + if debug: + return debug_info + def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs): """Produce predictions using the blocks of this pipeline. @@ -791,21 +819,31 @@ def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs) Args: X: Data which the pipeline will use to make predictions. - output_ (str or int or list or None): Output specification, as required by ``get_outputs``. If not specified the ``default`` output will be returned. - start_ (str or int or None): Block index or block name to start processing from. The value can either be an integer, which will be interpreted as a block index, or the name of a block, including the conter number at the end. If given, the execution of the pipeline will start on the specified block, and all the blocks before that one will be skipped. - - debug (boolean): - Debug mode, if True a dictionary containing the block names as keys and - the execution time in seconds, input, output as values is returned. + debug (bool or str): + Debug a pipeline with the following options: + + * ``t``: + Elapsed time for the primitive and the given stage (fit or predict). + * ``m``: + Amount of memory incrase (or decrease) for the primitive. This amount + is represented in bytes. + * ``i``: + The input values that the primitive takes for that step. + * ``o``: + The output values that the primitive generates. + + If ``True`` then a dictionary will be returned containing all the elements listed + previously. If a ``string`` value with the combination of letters is given for + each option, it will return a dictionary with the selected elements. **kwargs: Any additional keyword arguments will be directly added @@ -815,6 +853,9 @@ def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs) object or tuple: * If a single output is requested, it is returned alone. * If multiple outputs have been requested, a tuple is returned. + * If ``debug`` is given, a tupple will be returned where the first element + returned are the predictions and the second a dictionary containing the debug + information. """ context = kwargs.copy() if X is not None: @@ -827,14 +868,15 @@ def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs) debug_info = None if debug: - debug_info = dict() + debug_info = defaultdict(dict) + debug_info['debug'] = debug.lower() if isinstance(debug, str) else 'tmio' for block_name, block in self.blocks.items(): if start_: if block_name == start_: start_ = False else: - LOGGER.debug("Skipping block %s produce", block_name) + LOGGER.debug('Skipping block %s produce', block_name) continue self._produce_block(block, block_name, context, output_variables, outputs, debug_info) @@ -856,9 +898,6 @@ def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs) return result - if debug: - return debug_info - if start_: # We skipped all the blocks up to the end raise ValueError('Unknown block name: {}'.format(start_)) @@ -871,32 +910,32 @@ def to_dict(self): specification of the tunable_hyperparameters:: { - "primitives": [ - "a_primitive", - "another_primitive" + 'primitives': [ + 'a_primitive', + 'another_primitive' ], - "init_params": { - "a_primitive": { - "an_argument": "a_value" + 'init_params': { + 'a_primitive': { + 'an_argument': 'a_value' } }, - "hyperparameters": { - "a_primitive#1": { - "an_argument": "a_value", - "another_argument": "another_value", + 'hyperparameters': { + 'a_primitive#1': { + 'an_argument': 'a_value', + 'another_argument': 'another_value', }, - "another_primitive#1": { - "yet_another_argument": "yet_another_value" + 'another_primitive#1': { + 'yet_another_argument': 'yet_another_value' } }, - "tunable_hyperparameters": { - "another_primitive#1": { - "yet_another_argument": { - "type": "str", - "default": "a_default_value", - "values": [ - "a_default_value", - "yet_another_value" + 'tunable_hyperparameters': { + 'another_primitive#1': { + 'yet_another_argument': { + 'type': 'str', + 'default': 'a_default_value', + 'values': [ + 'a_default_value', + 'yet_another_value' ] } } @@ -926,8 +965,8 @@ def _get_simple_block_name(self, block_name): str: block name stripped of number and other modifiers. """ - full_name = block_name.split("#")[0] - simple_name = full_name.split(".")[-1] + full_name = block_name.split('#')[0] + simple_name = full_name.split('.')[-1] return simple_name def _get_context_name_from_variable(self, variable_name): @@ -942,12 +981,12 @@ def _get_context_name_from_variable(self, variable_name): str: Name of the context of the variable. """ - block_name = variable_name.split("#")[0] + block_name = variable_name.split('#')[0] rest = variable_name[len(block_name) + 1:] - block_index = rest.split(".")[0] + block_index = rest.split('.')[0] context_name = rest[len(block_index) + 1:] if len(context_name) == 0: - raise ValueError("Invalid variable name") + raise ValueError('Invalid variable name') return context_name def _get_relevant_output_variables(self, block_name, block, current_output_variables): @@ -1107,7 +1146,7 @@ def _make_diagram_inputs(self, diagram, input_variables_blocks): Dictionary of input variables of the pipeline and the set of tuples of blocks into which the variable connects and the type of arrowhead to use """ - with diagram.subgraph(name="cluster_inputs") as cluster: + with diagram.subgraph(name='cluster_inputs') as cluster: cluster.attr(tooltip='Input variables') cluster.attr('graph', rank='source', bgcolor='azure3', penwidth='0') cluster.attr('node', penwidth='0', fontsize='20') @@ -1148,7 +1187,7 @@ def _make_diagram_outputs(self, diagram, outputs): output_variables = [] outputs_vars = self.get_outputs(outputs) - with diagram.subgraph(name="cluster_outputs") as cluster: + with diagram.subgraph(name='cluster_outputs') as cluster: cluster.attr(tooltip='Output variables') cluster.attr('graph', rank='source', bgcolor='azure3', penwidth='0') cluster.attr('node', penwidth='0', fontsize='20') diff --git a/setup.py b/setup.py index 4ff3a675..d76236ae 100644 --- a/setup.py +++ b/setup.py @@ -17,12 +17,13 @@ install_requires = [ 'graphviz>=0.9,<1', 'numpy>=1.17.1,<1.19', + 'psutil>=5,<6', ] examples_require = [ 'matplotlib>=2.2.2,<3.2.2', - 'mlprimitives>=0.2.6.dev0,<0.3', + 'mlprimitives>=0.3.0.dev0,<0.4', 'boto3>=1.14,<1.14.45', 'botocore<1.17.45,>=1.17.44', 'jupyter==1.0.0', @@ -34,7 +35,7 @@ tests_require = [ 'pytest>=3.4.2', 'pytest-cov>=2.6.0', - 'mlprimitives>=0.2.6.dev0,<0.3', + 'mlprimitives>=0.3.0.dev0,<0.4', 'setuptools>=41.0.0', 'rundoc>=0.4.3', 'prompt-toolkit>=2.0,<3.0', diff --git a/tests/test_mlpipeline.py b/tests/test_mlpipeline.py index 59e11633..97c59cd0 100644 --- a/tests/test_mlpipeline.py +++ b/tests/test_mlpipeline.py @@ -696,7 +696,7 @@ def test_fit_no_debug(self): assert returned is None @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) - def test_fit_debug(self): + def test_fit_debug_bool(self): mlpipeline = MLPipeline(['a_primitive']) mlpipeline.blocks['a_primitive#1'].fit_args = [ { @@ -706,24 +706,53 @@ def test_fit_debug(self): ] expected_return = dict() - expected_return["fit"] = { - "a_primitive#1": { - "elapsed": 0, - "input": { - "whatever" - } + expected_return['debug'] = 'tmio' + expected_return['fit'] = { + 'a_primitive#1': { + 'time': 0, + 'input': { + 'whatever' + }, + 'memory': 0, } } returned = mlpipeline.fit(debug=True) - print(returned) assert isinstance(returned, dict) assert set(returned.keys()) == set(expected_return.keys()) # fit / produce - assert set(returned["fit"].keys()) == set(expected_return["fit"].keys()) # block name + assert set(returned['fit'].keys()) == set(expected_return['fit'].keys()) # block name + + for block_name, dictionary in expected_return['fit'].items(): + assert set(returned['fit'][block_name].keys()) == set(dictionary.keys()) + + @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) + def test_fit_debug_str(self): + mlpipeline = MLPipeline(['a_primitive']) + mlpipeline.blocks['a_primitive#1'].fit_args = [ + { + 'name': 'fit_input', + 'type': 'whatever' + } + ] + + expected_return = dict() + expected_return['debug'] = 'tm' + expected_return['fit'] = { + 'a_primitive#1': { + 'time': 0, + 'memory': 0, + } + } + + returned = mlpipeline.fit(debug='tm') + + assert isinstance(returned, dict) + assert set(returned.keys()) == set(expected_return.keys()) # fit / produce + assert set(returned['fit'].keys()) == set(expected_return['fit'].keys()) # block name - for block_name, dictionary in expected_return["fit"].items(): - assert set(returned["fit"][block_name].keys()) == set(dictionary.keys()) + for block_name, dictionary in expected_return['fit'].items(): + assert set(returned['fit'][block_name].keys()) == set(dictionary.keys()) @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) def test_fit_produce_debug(self): @@ -759,39 +788,104 @@ def test_fit_produce_debug(self): ] expected_return = dict() - expected_return["fit"] = { - "a_primitive#1": { - "elapsed": 0, - "input": { - "whatever" - } + expected_return['debug'] = 'tmio' + expected_return['fit'] = { + 'a_primitive#1': { + 'time': 0, + 'input': { + 'whatever' + }, + 'memory': 0, } } - expected_return["produce"] = { - "a_primitive#1": { - "elapsed": 0, - "input": { - "whatever" + expected_return['produce'] = { + 'a_primitive#1': { + 'time': 0, + 'input': { + 'whatever' }, - "output": { - "whatever" - } + 'output': { + 'whatever' + }, + 'memory': 0, } } returned, debug_returned = mlpipeline.fit(output_='default', debug=True) - assert len([returned]) == len(outputs["default"]) + assert len([returned]) == len(outputs['default']) + assert isinstance(debug_returned, dict) + assert set(debug_returned.keys()) == set(expected_return.keys()) # fit / produce + assert set(debug_returned['fit'].keys()) == set(expected_return['fit'].keys()) + assert set(debug_returned['produce'].keys()) == set(expected_return['produce'].keys()) + + for block_name, dictionary in expected_return['fit'].items(): + assert set(debug_returned['fit'][block_name].keys()) == set(dictionary.keys()) + + for block_name, dictionary in expected_return['produce'].items(): + assert set(debug_returned['produce'][block_name].keys()) == set(dictionary.keys()) + + @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) + def test_fit_produce_debug_str(self): + outputs = { + 'default': [ + { + 'name': 'a_name', + 'variable': 'a_primitive#1.a_variable', + 'type': 'a_type', + } + ] + } + mlpipeline = MLPipeline(['a_primitive'], outputs=outputs) + mlpipeline.blocks['a_primitive#1'].fit_args = [ + { + 'name': 'fit_input', + 'type': 'whatever' + } + ] + + mlpipeline.blocks['a_primitive#1'].produce_args = [ + { + 'name': 'input', + 'type': 'whatever' + } + ] + + mlpipeline.blocks['a_primitive#1'].produce_output = [ + { + 'name': 'a_name', + 'type': 'a_type' + } + ] + + expected_return = dict() + expected_return['debug'] = 'tm' + expected_return['fit'] = { + 'a_primitive#1': { + 'time': 0, + 'memory': 0, + } + } + expected_return['produce'] = { + 'a_primitive#1': { + 'time': 0, + 'memory': 0, + } + } + + returned, debug_returned = mlpipeline.fit(output_='default', debug='tm') + + assert len([returned]) == len(outputs['default']) assert isinstance(debug_returned, dict) assert set(debug_returned.keys()) == set(expected_return.keys()) # fit / produce - assert set(debug_returned["fit"].keys()) == set(expected_return["fit"].keys()) - assert set(debug_returned["produce"].keys()) == set(expected_return["produce"].keys()) + assert set(debug_returned['fit'].keys()) == set(expected_return['fit'].keys()) + assert set(debug_returned['produce'].keys()) == set(expected_return['produce'].keys()) - for block_name, dictionary in expected_return["fit"].items(): - assert set(debug_returned["fit"][block_name].keys()) == set(dictionary.keys()) + for block_name, dictionary in expected_return['fit'].items(): + assert set(debug_returned['fit'][block_name].keys()) == set(dictionary.keys()) - for block_name, dictionary in expected_return["produce"].items(): - assert set(debug_returned["produce"][block_name].keys()) == set(dictionary.keys()) + for block_name, dictionary in expected_return['produce'].items(): + assert set(debug_returned['produce'][block_name].keys()) == set(dictionary.keys()) @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) def test_predict_no_debug(self): @@ -829,9 +923,9 @@ def test_predict_no_debug(self): ] returned = mlpipeline.predict(debug=False) - assert len(returned) == len(outputs["default"]) - for returned_output, expected_output in zip(returned, outputs["default"]): - assert returned_output == expected_output["variable"] + assert len(returned) == len(outputs['default']) + for returned_output, expected_output in zip(returned, outputs['default']): + assert returned_output == expected_output['variable'] @patch('mlblocks.mlpipeline.MLBlock', new=get_mlblock_mock) def test_predict_debug(self): @@ -861,18 +955,22 @@ def test_predict_debug(self): expected_return = dict() expected_return = { - "a_primitive#1": { - "elapsed": 0, - "input": { - "whatever" + 'a_primitive#1': { + 'time': 0, + 'input': { + 'whatever' }, - "output": { - "whatever" - } + 'output': { + 'whatever' + }, + 'memory': 0 } } + returned, debug_returned = mlpipeline.predict(debug=True) - assert len([returned]) == len(outputs["default"]) + debug_returned = debug_returned['produce'] + + assert len([returned]) == len(outputs['default']) assert isinstance(debug_returned, dict) assert set(debug_returned.keys()) == set(expected_return.keys())