Skip to content

Commit

Permalink
Remove expensive plugin initialization code
Browse files Browse the repository at this point in the history
This change does some code cleanup on Beholder to fix #1107, where invoking
TensorFlow routines caused a nontrivial amount of GPU memory to be reserved.
  • Loading branch information
jart committed May 17, 2018
1 parent 0a8aaac commit 1e49774
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 126 deletions.
4 changes: 3 additions & 1 deletion tensorboard/plugins/beholder/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ py_library(
":shared_config",
"//tensorboard:expect_numpy_installed",
"//tensorboard:expect_tensorflow_installed",
"//tensorboard:util",
"//tensorboard/backend:http_util",
"//tensorboard/backend/event_processing:plugin_asset_util",
"//tensorboard/plugins:base_plugin",
Expand Down Expand Up @@ -81,12 +82,13 @@ py_library(
py_library(
name = "im_util",
srcs = ["im_util.py"],
data = ["resources"],
data = ["resources"], # TODO(jart): Don't reference directory.
srcs_version = "PY2AND3",
deps = [
":colormaps",
"//tensorboard:expect_numpy_installed",
"//tensorboard:expect_tensorflow_installed",
"//tensorboard:util",
],
)

Expand Down
45 changes: 25 additions & 20 deletions tensorboard/plugins/beholder/beholder_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
from google.protobuf import message
from werkzeug import wrappers

from tensorboard import util
from tensorboard.backend import http_util
from tensorboard.backend.event_processing import plugin_asset_util as pau
from tensorboard.plugins import base_plugin
from tensorboard.plugins.beholder import file_system_tools
from tensorboard.plugins.beholder import im_util
from tensorboard.plugins.beholder import shared_config

# Placeholder "section info" payload; used as the initial value of
# BeholderPlugin.most_recent_info before any real data has been read.
DEFAULT_INFO = [{
'name': 'Waiting for data...',
}]


class BeholderPlugin(base_plugin.TBPlugin):
"""
Expand All @@ -40,15 +45,14 @@ class BeholderPlugin(base_plugin.TBPlugin):
plugin_name = shared_config.PLUGIN_NAME

def __init__(self, context):
# NOTE(review): this span is a rendered diff — it interleaves pre-change
# and post-change statements (most_recent_frame and most_recent_info are
# each assigned twice, and both _lock and _config_file_lock are created).
# Only one version of each statement belongs in the actual source file.
self._lock = threading.Lock()
# Multiplexer and the plugin's log directory come from the TBContext.
self._MULTIPLEXER = context.multiplexer
self.PLUGIN_LOGDIR = pau.PluginDirectory(
context.logdir, shared_config.PLUGIN_NAME)
# Presumably the frames-per-second cap for the streamed feed — TODO confirm.
self.FPS = 10
self.most_recent_frame = im_util.get_image_relative_to_script('no-data.png')
self.most_recent_info = [{
'name': 'Waiting for data...',
}]
self._config_file_lock = threading.Lock()
self.most_recent_frame = None
self.most_recent_info = DEFAULT_INFO

def get_plugin_apps(self):
return {
Expand Down Expand Up @@ -96,18 +100,18 @@ def _serve_is_active(self, request):
}
return http_util.Respond(request, response, 'application/json')


def _fetch_current_frame(self):
"""Read the latest frame summary from disk, falling back to the last frame.

NOTE(review): this span is a rendered diff — both the pre-change body
(bare try/except returning the cached frame) and the post-change body
(lock-guarded, with a lazy 'no-data.png' fallback) appear below. Only
one of the two belongs in the actual source file.
"""
path = '{}/{}'.format(self.PLUGIN_LOGDIR, shared_config.SUMMARY_FILENAME)

try:
frame = file_system_tools.read_tensor_summary(path).astype(np.uint8)
self.most_recent_frame = frame
return frame

except (message.DecodeError, IOError, tf.errors.NotFoundError):
return self.most_recent_frame

with self._lock:
try:
frame = file_system_tools.read_tensor_summary(path).astype(np.uint8)
self.most_recent_frame = frame
return frame
except (message.DecodeError, IOError, tf.errors.NotFoundError):
# Missing or unreadable summary file: serve the placeholder image
# the first time, then keep serving the last successfully read frame.
if self.most_recent_frame is None:
self.most_recent_frame = im_util.get_image_relative_to_script(
'no-data.png')
return self.most_recent_frame

@wrappers.Request.application
def _serve_change_config(self, request):
Expand All @@ -132,18 +136,19 @@ def _serve_change_config(self, request):
'{}/{}'.format(self.PLUGIN_LOGDIR, shared_config.CONFIG_FILENAME))
return http_util.Respond(request, {'config': config}, 'application/json')


@wrappers.Request.application
def _serve_section_info(self, request):
"""Serve the pickled section info from disk as a JSON response.

NOTE(review): this span is a rendered diff — both the pre-change
(unlocked) and post-change (lock-guarded) reads of most_recent_info
appear below. Only one of the two belongs in the actual source file.
"""
path = '{}/{}'.format(
self.PLUGIN_LOGDIR, shared_config.SECTION_INFO_FILENAME)
info = file_system_tools.read_pickle(path, default=self.most_recent_info)
self.most_recent_info = info
with self._lock:
default = self.most_recent_info
info = file_system_tools.read_pickle(path, default=default)
# Only take the lock again when the read actually produced new info.
if info is not default:
with self._lock:
self.most_recent_info = info
return http_util.Respond(request, info, 'application/json')


def _frame_generator(self):

while True:
last_duration = 0

Expand All @@ -154,7 +159,7 @@ def _frame_generator(self):

start_time = time.time()
array = self._fetch_current_frame()
image_bytes = im_util.encode_png(array)
image_bytes = util.encode_png(array)

frame_text = b'--frame\r\n'
content_type = b'Content-Type: image/png\r\n\r\n'
Expand Down
108 changes: 4 additions & 104 deletions tensorboard/plugins/beholder/im_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
from __future__ import print_function

import os
import threading

import numpy as np
import tensorflow as tf

from tensorboard import util
from tensorboard.plugins.beholder import colormaps


Expand Down Expand Up @@ -86,82 +86,7 @@ def apply_colormap(image, colormap='magma'):
return image if cm is None else cm[image]


# Taken from https://github.com/tensorflow/tensorboard/blob/
# /28f58888ebb22e2db0f4f1f60cd96138ef72b2ef/tensorboard/util.py

# Modified by Chris Anderson to not use the GPU.
class PersistentOpEvaluator(object):
  """Evaluate a fixed TensorFlow graph repeatedly, safely, efficiently.

  Extend this class to create a particular kind of op evaluator, like an
  image encoder. In `initialize_graph`, build an appropriate TensorFlow
  graph with placeholder inputs; in `run`, evaluate that graph and return
  its result. A single graph and session are created lazily, once, and
  reused for every subsequent call, so they neither leak memory nor
  interfere with other concurrent sessions.

  A subclass of this class offers a threadsafe, highly parallel Python
  entry point for evaluating a particular TensorFlow graph.
  """

  def __init__(self):
    super(PersistentOpEvaluator, self).__init__()
    # Built lazily on first use; the lock ensures concurrent first calls
    # construct the graph and session exactly once.
    self._session = None
    self._initialization_lock = threading.Lock()

  def _lazily_initialize(self):
    """Build the graph and session if that has not yet been done."""
    with self._initialization_lock:
      if self._session:
        return
      new_graph = tf.Graph()
      with new_graph.as_default():
        self.initialize_graph()
      # Pin the session to CPU so this evaluator never reserves GPU memory.
      session_config = tf.ConfigProto(device_count={'GPU': 0})
      self._session = tf.Session(graph=new_graph, config=session_config)

  def initialize_graph(self):
    """Create the TensorFlow graph needed to compute this operation.

    This should write ops to the default graph and return `None`.
    """
    raise NotImplementedError('Subclasses must implement "initialize_graph".')

  def run(self, *args, **kwargs):
    """Evaluate the ops with the given input.

    When this function is called, the default session will have the graph
    defined by a previous call to `initialize_graph`. This function should
    evaluate any ops necessary to compute the result of the query for the
    given *args and **kwargs, likely returning the result of a call to
    `some_op.eval(...)`.
    """
    raise NotImplementedError('Subclasses must implement "run".')

  def __call__(self, *args, **kwargs):
    self._lazily_initialize()
    with self._session.as_default():
      return self.run(*args, **kwargs)


class PNGDecoder(PersistentOpEvaluator):
class PNGDecoder(util.PersistentOpEvaluator):

def __init__(self):
super(PNGDecoder, self).__init__()
Expand All @@ -181,30 +106,7 @@ def run(self, image):
})


class PNGEncoder(PersistentOpEvaluator):
  """Encode a uint8 image array to PNG bytes via a persistent TF graph."""

  def __init__(self):
    super(PNGEncoder, self).__init__()
    # Both are populated by initialize_graph on first use.
    self._image_placeholder = None
    self._encode_op = None

  def initialize_graph(self):
    """Define the placeholder and the PNG-encoding op on the default graph."""
    self._image_placeholder = tf.placeholder(dtype=tf.uint8)
    self._encode_op = tf.image.encode_png(self._image_placeholder)

  # pylint: disable=arguments-differ
  def run(self, image):
    """Return PNG bytes for `image`, adding a channel axis to 2-D input."""
    if len(image.shape) == 2:
      height, width = image.shape
      image = image.reshape([height, width, 1])
    feed = {self._image_placeholder: image}
    return self._encode_op.eval(feed_dict=feed)


class Resizer(PersistentOpEvaluator):
class Resizer(util.PersistentOpEvaluator):

def __init__(self):
super(Resizer, self).__init__()
Expand Down Expand Up @@ -233,18 +135,16 @@ def run(self, image, height, width):


# Module-level evaluator instances, shared by the helper functions below.
# NOTE(review): rendered diff — the encode_png instance was removed by this
# commit but still appears here alongside the surviving instances.
decode_png = PNGDecoder()
encode_png = PNGEncoder()
resize = Resizer()


def read_image(filename):
  """Read the PNG file at `filename` and return its pixels as a numpy array."""
  with tf.gfile.Open(filename, 'rb') as image_file:
    contents = image_file.read()
  return np.array(decode_png(contents))


def write_image(array, filename):
"""Encode `array` as PNG and write it to `filename`.

NOTE(review): rendered diff — both the pre-change write (local encode_png)
and the post-change write (util.encode_png) appear below; only one of the
two belongs in the actual source file.
"""
with tf.gfile.Open(filename, 'w') as image_file:
image_file.write(encode_png(array))
image_file.write(util.encode_png(array))


def get_image_relative_to_script(filename):
Expand Down
4 changes: 3 additions & 1 deletion tensorboard/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,9 @@ def _lazily_initialize(self):
graph = tf.Graph()
with graph.as_default():
self.initialize_graph()
self._session = tf.Session(graph=graph)
# Don't reserve GPU because libpng can't run on GPU.
config = tf.ConfigProto(device_count={'GPU': 0})
self._session = tf.Session(graph=graph, config=config)

def initialize_graph(self):
"""Create the TensorFlow graph needed to compute this operation.
Expand Down

0 comments on commit 1e49774

Please sign in to comment.