Skip to content

Commit

Permalink
Merge pull request #128 from sarahmish/block-execution-time
Browse files — browse the repository at this point in the history
Record block execution time
  • Loading branch information
csala authored Sep 16, 2020
2 parents 7337ee1 + 5e44993 commit dbb6477
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 17 deletions.
79 changes: 68 additions & 11 deletions mlblocks/mlpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import warnings
from collections import Counter, OrderedDict, defaultdict
from copy import deepcopy
from datetime import datetime

import numpy as np
from graphviz import Digraph
Expand Down Expand Up @@ -610,19 +611,28 @@ def _update_outputs(self, variable_name, output_variables, outputs, value):
index = output_variables.index(variable_name)
outputs[index] = deepcopy(value)

def _fit_block(self, block, block_name, context):
def _fit_block(self, block, block_name, context, debug=None):
"""Get the block args from the context and fit the block."""
LOGGER.debug("Fitting block %s", block_name)
try:
fit_args = self._get_block_args(block_name, block.fit_args, context)
start = datetime.utcnow()
block.fit(**fit_args)
elapsed = datetime.utcnow() - start

if debug is not None:
debug["fit"][block_name] = {
"elapsed": elapsed.total_seconds(),
"input": fit_args
}

except Exception:
if self.verbose:
LOGGER.exception("Exception caught fitting MLBlock %s", block_name)

raise

def _produce_block(self, block, block_name, context, output_variables, outputs):
def _produce_block(self, block, block_name, context, output_variables, outputs, debug=None):
"""Get the block args from the context and produce the block.
Afterwards, set the block outputs back into the context and update
Expand All @@ -631,7 +641,9 @@ def _produce_block(self, block, block_name, context, output_variables, outputs):
LOGGER.debug("Producing block %s", block_name)
try:
produce_args = self._get_block_args(block_name, block.produce_args, context)
start = datetime.utcnow()
block_outputs = block.produce(**produce_args)
elapsed = datetime.utcnow() - start

outputs_dict = self._extract_outputs(block_name, block_outputs, block.produce_output)
context.update(outputs_dict)
Expand All @@ -644,13 +656,25 @@ def _produce_block(self, block, block_name, context, output_variables, outputs):
variable_name = '{}.{}'.format(block_name, key)
self._update_outputs(variable_name, output_variables, outputs, value)

if debug is not None:
record = {
"elapsed": elapsed.total_seconds(),
"input": produce_args,
"output": outputs_dict
}

if "fit" in debug.keys():
debug["produce"][block_name] = record
else:
debug[block_name] = record

except Exception:
if self.verbose:
LOGGER.exception("Exception caught producing MLBlock %s", block_name)

raise

def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
def fit(self, X=None, y=None, output_=None, start_=None, debug=False, **kwargs):
"""Fit the blocks of this pipeline.
Sequentially call the ``fit`` and the ``produce`` methods of each block,
Expand Down Expand Up @@ -680,6 +704,10 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
If given, the execution of the pipeline will start on the specified block,
and all the blocks before that one will be skipped.
debug (boolean):
Debug mode, if True a dictionary containing the block names as keys and
the execution time in seconds, input, output as values is returned.
**kwargs:
Any additional keyword arguments will be directly added
to the context dictionary and available for the blocks.
Expand Down Expand Up @@ -707,6 +735,10 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
if isinstance(start_, int):
start_ = self._get_block_name(start_)

debug_info = None
if debug:
debug_info = defaultdict(dict)

for block_name, block in self.blocks.items():
if start_:
if block_name == start_:
Expand All @@ -715,10 +747,11 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
LOGGER.debug("Skipping block %s fit", block_name)
continue

self._fit_block(block, block_name, context)
self._fit_block(block, block_name, context, debug_info)

if (block_name != self._last_block_name) or (block_name in output_blocks):
self._produce_block(block, block_name, context, output_variables, outputs)
self._produce_block(
block, block_name, context, output_variables, outputs, debug_info)

# We already captured the output from this block
if block_name in output_blocks:
Expand All @@ -728,15 +761,23 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
# outputs we are done.
if output_variables is not None and not output_blocks:
if len(outputs) > 1:
return tuple(outputs)
result = tuple(outputs)
else:
return outputs[0]
result = outputs[0]

if debug:
return result, debug_info

return result

if debug:
return debug_info

if start_:
# We skipped all the blocks up to the end
raise ValueError('Unknown block name: {}'.format(start_))

def predict(self, X=None, output_='default', start_=None, **kwargs):
def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs):
"""Produce predictions using the blocks of this pipeline.
Sequentially call the ``produce`` method of each block, capturing the
Expand All @@ -762,6 +803,10 @@ def predict(self, X=None, output_='default', start_=None, **kwargs):
If given, the execution of the pipeline will start on the specified block,
and all the blocks before that one will be skipped.
debug (boolean):
Debug mode, if True a dictionary containing the block names as keys and
the execution time in seconds, input, output as values is returned.
**kwargs:
Any additional keyword arguments will be directly added
to the context dictionary and available for the blocks.
Expand All @@ -780,6 +825,10 @@ def predict(self, X=None, output_='default', start_=None, **kwargs):
if isinstance(start_, int):
start_ = self._get_block_name(start_)

debug_info = None
if debug:
debug_info = dict()

for block_name, block in self.blocks.items():
if start_:
if block_name == start_:
Expand All @@ -788,7 +837,7 @@ def predict(self, X=None, output_='default', start_=None, **kwargs):
LOGGER.debug("Skipping block %s produce", block_name)
continue

self._produce_block(block, block_name, context, output_variables, outputs)
self._produce_block(block, block_name, context, output_variables, outputs, debug_info)

# We already captured the output from this block
if block_name in output_blocks:
Expand All @@ -798,9 +847,17 @@ def predict(self, X=None, output_='default', start_=None, **kwargs):
# outputs we are done.
if not output_blocks:
if len(outputs) > 1:
return tuple(outputs)
result = tuple(outputs)
else:
return outputs[0]
result = outputs[0]

if debug:
return result, debug_info

return result

if debug:
return debug_info

if start_:
# We skipped all the blocks up to the end
Expand Down
10 changes: 5 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
tests_require = [
'pytest>=3.4.2',
'pytest-cov>=2.6.0',
'mlprimitives>=0.2,<0.3',
'mlprimitives>=0.2.4.dev0',
'setuptools>=41.0.0',
'numpy<1.17',
'rundoc>=0.4.3',
Expand All @@ -48,17 +48,17 @@
'watchdog>=0.8.3',

# docs
'm2r>=0.2.0',
'Sphinx>=1.7.1',
'm2r>=0.2.0,<0.3',
'Sphinx>=1.7.1,<3',
'sphinx_rtd_theme>=0.2.4',
'ipython>=6.5.0',
'matplotlib>=2.2.3',
'autodocsumm>=0.1.10',
'docutils<0.15,>=0.10', # botocore incompatibility with 0.15

# style check
'flake8>=3.5.0',
'isort>=4.3.4',
'flake8>=3.5.0,<3.8',
'isort>=4.3.4,<5',

# fix style issues
'autoflake>=1.2', # keep this after flake8 to avoid
Expand Down
Loading

0 comments on commit dbb6477

Please sign in to comment.