Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record block execution time #128

Merged
merged 8 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 68 additions & 11 deletions mlblocks/mlpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import warnings
from collections import Counter, OrderedDict, defaultdict
from copy import deepcopy
from datetime import datetime

import numpy as np
from graphviz import Digraph
Expand Down Expand Up @@ -610,19 +611,28 @@ def _update_outputs(self, variable_name, output_variables, outputs, value):
index = output_variables.index(variable_name)
outputs[index] = deepcopy(value)

def _fit_block(self, block, block_name, context):
def _fit_block(self, block, block_name, context, debug=None):
    """Get the block args from the context and fit the block.

    Args:
        block (MLBlock):
            The block instance to fit.
        block_name (str):
            Name of the block within the pipeline.
        context (dict):
            Current pipeline context from which the fit arguments
            are extracted.
        debug (dict or None):
            If given, record the fit timing and inputs under
            ``debug["fit"][block_name]``.

    Raises:
        Exception: any exception raised by ``block.fit`` is logged
            (when ``self.verbose``) and then re-raised.
    """
    import time  # local import keeps this fix self-contained

    LOGGER.debug("Fitting block %s", block_name)
    try:
        fit_args = self._get_block_args(block_name, block.fit_args, context)

        # Use a monotonic clock for interval measurement: wall-clock time
        # (datetime.utcnow) can jump forwards or backwards (NTP sync,
        # DST/admin changes), producing wrong or even negative durations.
        start = time.perf_counter()
        block.fit(**fit_args)
        elapsed = time.perf_counter() - start

        if debug is not None:
            # NOTE(review): assumes ``debug`` auto-creates the "fit" key
            # (e.g. a defaultdict(dict), as built by ``fit``) — a plain
            # dict without a "fit" entry would raise KeyError here.
            debug["fit"][block_name] = {
                "elapsed": elapsed,  # float seconds, same contract as before
                "input": fit_args
            }

    except Exception:
        if self.verbose:
            LOGGER.exception("Exception caught fitting MLBlock %s", block_name)

        raise

def _produce_block(self, block, block_name, context, output_variables, outputs):
def _produce_block(self, block, block_name, context, output_variables, outputs, debug=None):
"""Get the block args from the context and produce the block.

Afterwards, set the block outputs back into the context and update
Expand All @@ -631,7 +641,9 @@ def _produce_block(self, block, block_name, context, output_variables, outputs):
LOGGER.debug("Producing block %s", block_name)
try:
produce_args = self._get_block_args(block_name, block.produce_args, context)
start = datetime.utcnow()
block_outputs = block.produce(**produce_args)
elapsed = datetime.utcnow() - start

outputs_dict = self._extract_outputs(block_name, block_outputs, block.produce_output)
context.update(outputs_dict)
Expand All @@ -644,13 +656,25 @@ def _produce_block(self, block, block_name, context, output_variables, outputs):
variable_name = '{}.{}'.format(block_name, key)
self._update_outputs(variable_name, output_variables, outputs, value)

if debug is not None:
record = {
"elapsed": elapsed.total_seconds(),
"input": produce_args,
"output": outputs_dict
}

if "fit" in debug.keys():
debug["produce"][block_name] = record
else:
debug[block_name] = record

except Exception:
if self.verbose:
LOGGER.exception("Exception caught producing MLBlock %s", block_name)

raise

def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
def fit(self, X=None, y=None, output_=None, start_=None, debug=False, **kwargs):
"""Fit the blocks of this pipeline.

Sequentially call the ``fit`` and the ``produce`` methods of each block,
Expand Down Expand Up @@ -680,6 +704,10 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
If given, the execution of the pipeline will start on the specified block,
and all the blocks before that one will be skipped.

debug (boolean):
Debug mode, if True a dictionary containing the block names as keys and
the execution time in seconds, input, output as values is returned.

**kwargs:
Any additional keyword arguments will be directly added
to the context dictionary and available for the blocks.
Expand Down Expand Up @@ -707,6 +735,10 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
if isinstance(start_, int):
start_ = self._get_block_name(start_)

debug_info = None
if debug:
debug_info = defaultdict(dict)

for block_name, block in self.blocks.items():
if start_:
if block_name == start_:
Expand All @@ -715,10 +747,11 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
LOGGER.debug("Skipping block %s fit", block_name)
continue

self._fit_block(block, block_name, context)
self._fit_block(block, block_name, context, debug_info)

if (block_name != self._last_block_name) or (block_name in output_blocks):
self._produce_block(block, block_name, context, output_variables, outputs)
self._produce_block(
block, block_name, context, output_variables, outputs, debug_info)

# We already captured the output from this block
if block_name in output_blocks:
Expand All @@ -728,15 +761,23 @@ def fit(self, X=None, y=None, output_=None, start_=None, **kwargs):
# outputs we are done.
if output_variables is not None and not output_blocks:
if len(outputs) > 1:
return tuple(outputs)
result = tuple(outputs)
else:
return outputs[0]
result = outputs[0]

if debug:
return result, debug_info

return result

if debug:
return debug_info

if start_:
# We skipped all the blocks up to the end
raise ValueError('Unknown block name: {}'.format(start_))

def predict(self, X=None, output_='default', start_=None, **kwargs):
def predict(self, X=None, output_='default', start_=None, debug=False, **kwargs):
"""Produce predictions using the blocks of this pipeline.

Sequentially call the ``produce`` method of each block, capturing the
Expand All @@ -762,6 +803,10 @@ def predict(self, X=None, output_='default', start_=None, **kwargs):
If given, the execution of the pipeline will start on the specified block,
and all the blocks before that one will be skipped.

debug (boolean):
Debug mode, if True a dictionary containing the block names as keys and
the execution time in seconds, input, output as values is returned.

**kwargs:
Any additional keyword arguments will be directly added
to the context dictionary and available for the blocks.
Expand All @@ -780,6 +825,10 @@ def predict(self, X=None, output_='default', start_=None, **kwargs):
if isinstance(start_, int):
start_ = self._get_block_name(start_)

debug_info = None
if debug:
debug_info = dict()

for block_name, block in self.blocks.items():
if start_:
if block_name == start_:
Expand All @@ -788,7 +837,7 @@ def predict(self, X=None, output_='default', start_=None, **kwargs):
LOGGER.debug("Skipping block %s produce", block_name)
continue

self._produce_block(block, block_name, context, output_variables, outputs)
self._produce_block(block, block_name, context, output_variables, outputs, debug_info)

# We already captured the output from this block
if block_name in output_blocks:
Expand All @@ -798,9 +847,17 @@ def predict(self, X=None, output_='default', start_=None, **kwargs):
# outputs we are done.
if not output_blocks:
if len(outputs) > 1:
return tuple(outputs)
result = tuple(outputs)
else:
return outputs[0]
result = outputs[0]

if debug:
return result, debug_info

return result

if debug:
return debug_info

if start_:
# We skipped all the blocks up to the end
Expand Down
10 changes: 5 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
tests_require = [
'pytest>=3.4.2',
'pytest-cov>=2.6.0',
'mlprimitives>=0.2,<0.3',
'mlprimitives>=0.2.4.dev0',
'setuptools>=41.0.0',
'numpy<1.17',
'rundoc>=0.4.3',
Expand All @@ -48,17 +48,17 @@
'watchdog>=0.8.3',

# docs
'm2r>=0.2.0',
'Sphinx>=1.7.1',
'm2r>=0.2.0,<0.3',
'Sphinx>=1.7.1,<3',
'sphinx_rtd_theme>=0.2.4',
'ipython>=6.5.0',
'matplotlib>=2.2.3',
'autodocsumm>=0.1.10',
'docutils<0.15,>=0.10', # botocore incompatibility with 0.15

# style check
'flake8>=3.5.0',
'isort>=4.3.4',
'flake8>=3.5.0,<3.8',
'isort>=4.3.4,<5',

# fix style issues
'autoflake>=1.2', # keep this after flake8 to avoid
Expand Down
Loading