From c1be95b8cac849759b20100401c94077f9bcaaf4 Mon Sep 17 00:00:00 2001 From: Justine Tunney Date: Thu, 17 Aug 2017 17:16:51 -0700 Subject: [PATCH] Restore event_processing APIs (#378) Many teams depend on those APIs internally. --- tensorboard/backend/application.py | 4 +- tensorboard/backend/application_test.py | 2 +- tensorboard/backend/event_processing/BUILD | 37 +- .../event_processing/event_accumulator.py | 273 +++++-- .../event_accumulator_test.py | 574 +++++++++----- .../event_processing/event_multiplexer.py | 62 +- .../event_multiplexer_test.py | 36 +- .../plugin_event_accumulator.py | 608 +++++++++++++++ .../plugin_event_accumulator_test.py | 718 ++++++++++++++++++ .../plugin_event_multiplexer.py | 448 +++++++++++ .../plugin_event_multiplexer_test.py | 325 ++++++++ .../plugins/audio/audio_plugin_test.py | 2 +- tensorboard/plugins/core/core_plugin_test.py | 2 +- .../plugins/debugger/debugger_plugin_test.py | 2 +- .../debugger/debugger_plugin_testlib.py | 2 +- .../debugger/events_writer_manager_test.py | 2 +- .../plugins/distribution/compressor.py | 51 ++ .../distribution/distributions_plugin_test.py | 4 +- tensorboard/plugins/graph/graphs_plugin.py | 2 +- .../plugins/graph/graphs_plugin_test.py | 2 +- .../histogram/histograms_plugin_test.py | 4 +- .../plugins/image/images_plugin_test.py | 2 +- tensorboard/plugins/pr_curve/summary_test.py | 12 +- .../projector/projector_plugin_test.py | 2 +- .../plugins/scalar/scalars_plugin_test.py | 4 +- tensorboard/plugins/text/text_plugin_test.py | 2 +- 26 files changed, 2904 insertions(+), 278 deletions(-) create mode 100644 tensorboard/backend/event_processing/plugin_event_accumulator.py create mode 100644 tensorboard/backend/event_processing/plugin_event_accumulator_test.py create mode 100644 tensorboard/backend/event_processing/plugin_event_multiplexer.py create mode 100644 tensorboard/backend/event_processing/plugin_event_multiplexer_test.py diff --git a/tensorboard/backend/application.py b/tensorboard/backend/application.py index 3550f07a48d..1bd836fd402 100644 --- a/tensorboard/backend/application.py +++ b/tensorboard/backend/application.py @@ -37,8 +37,8 @@ from tensorboard import db from tensorboard.backend import http_util -from tensorboard.backend.event_processing import event_accumulator -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_accumulator as event_accumulator # pylint: disable=line-too-long +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.audio import metadata as audio_metadata from tensorboard.plugins.core import core_plugin diff --git a/tensorboard/backend/application_test.py b/tensorboard/backend/application_test.py index b719f09fd48..5c133735abb 100644 --- a/tensorboard/backend/application_test.py +++ b/tensorboard/backend/application_test.py @@ -33,7 +33,7 @@ from tensorboard import main as tensorboard from tensorboard.backend import application -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index a87c80c2efd..25a7ead2282 100644 --- a/tensorboard/backend/event_processing/BUILD +++ 
b/tensorboard/backend/event_processing/BUILD @@ -74,7 +74,10 @@ py_test( py_library( name = "event_accumulator", - srcs = ["event_accumulator.py"], + srcs = [ + "event_accumulator.py", + "plugin_event_accumulator.py", + ], srcs_version = "PY2AND3", visibility = ["//visibility:public"], deps = [ @@ -103,9 +106,27 @@ py_test( ], ) +py_test( + name = "plugin_event_accumulator_test", + size = "small", + srcs = ["plugin_event_accumulator_test.py"], + srcs_version = "PY2AND3", + deps = [ + ":event_accumulator", + "//tensorboard:expect_tensorflow_installed", + "//tensorboard/plugins/audio:summary", + "//tensorboard/plugins/distribution:compressor", + "//tensorboard/plugins/image:summary", + "//tensorboard/plugins/scalar:summary", + ], +) + py_library( name = "event_multiplexer", - srcs = ["event_multiplexer.py"], + srcs = [ + "event_multiplexer.py", + "plugin_event_multiplexer.py", + ], srcs_version = "PY2AND3", visibility = ["//visibility:public"], deps = [ @@ -129,6 +150,18 @@ py_test( ], ) +py_test( + name = "plugin_event_multiplexer_test", + size = "small", + srcs = ["plugin_event_multiplexer_test.py"], + srcs_version = "PY2AND3", + deps = [ + ":event_accumulator", + ":event_multiplexer", + "//tensorboard:expect_tensorflow_installed", + ], +) + py_library( name = "plugin_asset_util", srcs = ["plugin_asset_util.py"], diff --git a/tensorboard/backend/event_processing/event_accumulator.py b/tensorboard/backend/event_processing/event_accumulator.py index 0ed89cb3cfa..81a79fbc329 100644 --- a/tensorboard/backend/event_processing/event_accumulator.py +++ b/tensorboard/backend/event_processing/event_accumulator.py @@ -23,38 +23,80 @@ import tensorflow as tf -from tensorboard import data_compat from tensorboard.backend.event_processing import directory_watcher from tensorboard.backend.event_processing import event_file_loader from tensorboard.backend.event_processing import plugin_asset_util from tensorboard.backend.event_processing import reservoir +from tensorboard.plugins.distribution import compressor namedtuple = collections.namedtuple +ScalarEvent = namedtuple('ScalarEvent', ['wall_time', 'step', 'value']) + +CompressedHistogramEvent = namedtuple('CompressedHistogramEvent', + ['wall_time', 'step', + 'compressed_histogram_values']) + +HistogramEvent = namedtuple('HistogramEvent', + ['wall_time', 'step', 'histogram_value']) + +HistogramValue = namedtuple('HistogramValue', ['min', 'max', 'num', 'sum', + 'sum_squares', 'bucket_limit', + 'bucket']) + +ImageEvent = namedtuple('ImageEvent', ['wall_time', 'step', + 'encoded_image_string', 'width', + 'height']) + +AudioEvent = namedtuple('AudioEvent', ['wall_time', 'step', + 'encoded_audio_string', 'content_type', + 'sample_rate', 'length_frames']) TensorEvent = namedtuple('TensorEvent', ['wall_time', 'step', 'tensor_proto']) ## Different types of summary events handled by the event_accumulator SUMMARY_TYPES = { + 'simple_value': '_ProcessScalar', + 'histo': '_ProcessHistogram', + 'image': '_ProcessImage', + 'audio': '_ProcessAudio', 'tensor': '_ProcessTensor', } ## The tagTypes below are just arbitrary strings chosen to pass the type ## information of the tag from the backend to the frontend +COMPRESSED_HISTOGRAMS = 'distributions' +HISTOGRAMS = 'histograms' +IMAGES = 'images' +AUDIO = 'audio' +SCALARS = 'scalars' TENSORS = 'tensors' GRAPH = 'graph' META_GRAPH = 'meta_graph' RUN_METADATA = 'run_metadata' +## Normal CDF for std_devs: (-Inf, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, Inf) +## naturally gives bands around median of width 1 std dev, 2 std dev, 3 std 
dev, +## and then the long tail. +NORMAL_HISTOGRAM_BPS = (0, 668, 1587, 3085, 5000, 6915, 8413, 9332, 10000) + DEFAULT_SIZE_GUIDANCE = { - TENSORS: 500, + COMPRESSED_HISTOGRAMS: 500, + IMAGES: 4, + AUDIO: 4, + SCALARS: 10000, + HISTOGRAMS: 1, + TENSORS: 10, } STORE_EVERYTHING_SIZE_GUIDANCE = { + COMPRESSED_HISTOGRAMS: 0, + IMAGES: 0, + AUDIO: 0, + SCALARS: 0, + HISTOGRAMS: 0, TENSORS: 0, } -_TENSOR_RESERVOIR_KEY = "." # arbitrary - def IsTensorFlowEventsFile(path): """Check the path name to see if it is probably a TF Events file. @@ -76,23 +118,31 @@ def IsTensorFlowEventsFile(path): class EventAccumulator(object): """An `EventAccumulator` takes an event generator, and accumulates the values. - The `EventAccumulator` is intended to provide a convenient Python - interface for loading Event data written during a TensorFlow run. - TensorFlow writes out `Event` protobuf objects, which have a timestamp - and step number, and often contain a `Summary`. Summaries can have - different kinds of data stored as arbitrary tensors. The Summaries - also have a tag, which we use to organize logically related data. The - `EventAccumulator` supports retrieving the `Event` and `Summary` data - by its tag. + The `EventAccumulator` is intended to provide a convenient Python interface + for loading Event data written during a TensorFlow run. TensorFlow writes out + `Event` protobuf objects, which have a timestamp and step number, and often + contain a `Summary`. Summaries can have different kinds of data like an image, + a scalar value, or a histogram. The Summaries also have a tag, which we use to + organize logically related data. The `EventAccumulator` supports retrieving + the `Event` and `Summary` data by its tag. - Calling `Tags()` gets a map from `tagType` (i.e., `tensors`) to the - associated tags for those data types. Then, the functional endpoint - (i.g., `Accumulator.Tensors(tag)`) allows for the retrieval of all - data associated with that tag. + Calling `Tags()` gets a map from `tagType` (e.g. `'images'`, + `'compressedHistograms'`, `'scalars'`, etc.) to the associated tags for those + data types. Then, various functional endpoints (e.g., + `Accumulator.Scalars(tag)`) allow for the retrieval of all data + associated with that tag. The `Reload()` method synchronously loads all of the data written so far. + Histograms, audio, and images are very large, so storing all of them is not + recommended. + Fields: + audios: A reservoir.Reservoir of audio summaries. + compressed_histograms: A reservoir.Reservoir of compressed + histogram summaries. + histograms: A reservoir.Reservoir of histogram summaries. + images: A reservoir.Reservoir of image summaries. most_recent_step: Step of last Event proto added. This should only be accessed from the thread that calls Reload. This is -1 if nothing has been loaded yet. @@ -102,9 +152,8 @@ class EventAccumulator(object): the thread that calls Reload. path: A file path to a directory containing tf events files, or a single tf events file. The accumulator will load events from this path. - tensors_by_tag: A dictionary mapping each tag name to a - reservoir.Reservoir of tensor summaries. Each such reservoir will - only use a single key, given by `_TENSOR_RESERVOIR_KEY`. + scalars: A reservoir.Reservoir of scalar summaries. + tensors: A reservoir.Reservoir of tensor summaries. 
@@Tensors """ @@ -112,7 +161,7 @@ class EventAccumulator(object): def __init__(self, path, size_guidance=None, - tensor_size_guidance=None, + compression_bps=NORMAL_HISTOGRAM_BPS, purge_orphaned_data=True): """Construct the `EventAccumulator`. @@ -125,34 +174,34 @@ def __init__(self, from a `tagType` string to an integer representing the number of items to keep per tag for items of that `tagType`. If the size is 0, all events are stored. - tensor_size_guidance: Like `size_guidance`, but allowing finer - granularity for tensor summaries. Should be a map from the - `plugin_name` field on the `PluginData` proto to an integer - representing the number of items to keep per tag. Plugins for - which there is no entry in this map will default to the value of - `size_guidance[event_accumulator.TENSORS]`. Defaults to `{}`. + compression_bps: Information on how the `EventAccumulator` should compress + histogram data for the `CompressedHistograms` tag (for details see + `ProcessCompressedHistogram`). purge_orphaned_data: Whether to discard any events that were "orphaned" by a TensorFlow restart. """ - size_guidance = dict(size_guidance or DEFAULT_SIZE_GUIDANCE) + size_guidance = size_guidance or DEFAULT_SIZE_GUIDANCE sizes = {} for key in DEFAULT_SIZE_GUIDANCE: if key in size_guidance: sizes[key] = size_guidance[key] else: sizes[key] = DEFAULT_SIZE_GUIDANCE[key] - self._size_guidance = size_guidance - self._tensor_size_guidance = dict(tensor_size_guidance or {}) self._first_event_timestamp = None + self.scalars = reservoir.Reservoir(size=sizes[SCALARS]) self._graph = None self._graph_from_metagraph = False self._meta_graph = None self._tagged_metadata = {} self.summary_metadata = {} - self.tensors_by_tag = {} - self._tensors_by_tag_lock = threading.Lock() + self.histograms = reservoir.Reservoir(size=sizes[HISTOGRAMS]) + self.compressed_histograms = reservoir.Reservoir( + size=sizes[COMPRESSED_HISTOGRAMS], always_keep_last=False) + self.images = reservoir.Reservoir(size=sizes[IMAGES]) + self.audios = reservoir.Reservoir(size=sizes[AUDIO]) + self.tensors = reservoir.Reservoir(size=sizes[TENSORS]) # Keep a mapping from plugin name to a dict mapping from tag to plugin data # content obtained from the SummaryMetadata (metadata field of Value) for @@ -166,6 +215,7 @@ def __init__(self, self.path = path self._generator = _GeneratorFromPath(path) + self._compression_bps = compression_bps self.purge_orphaned_data = purge_orphaned_data self.most_recent_step = -1 @@ -173,7 +223,8 @@ def __init__(self, self.file_version = None # The attributes that get built up by the accumulator - self.accumulated_attrs = () + self.accumulated_attrs = ('scalars', 'histograms', + 'compressed_histograms', 'images', 'audios') self._tensor_summaries = {} def Reload(self): @@ -332,8 +383,6 @@ def _ProcessEvent(self, event): self._tagged_metadata[tag] = event.tagged_run_metadata.run_metadata elif event.HasField('summary'): for value in event.summary.value: - value = data_compat.migrate_value(value) - if value.HasField('metadata'): tag = value.tag # We only store the first instance of the metadata. This check @@ -363,6 +412,7 @@ def _ProcessEvent(self, event): tag = value.node_name getattr(self, summary_func)(tag, event.wall_time, event.step, datum) + def Tags(self): """Return all tags found in the value stream. @@ -370,7 +420,12 @@ def Tags(self): A `{tagType: ['list', 'of', 'tags']}` dictionary. 
""" return { - TENSORS: list(self.tensors_by_tag.keys()), + IMAGES: self.images.Keys(), + AUDIO: self.audios.Keys(), + HISTOGRAMS: self.histograms.Keys(), + SCALARS: self.scalars.Keys(), + COMPRESSED_HISTOGRAMS: self.compressed_histograms.Keys(), + TENSORS: self.tensors.Keys(), # Use a heuristic: if the metagraph is available, but # graph is not, then we assume the metagraph contains the graph. GRAPH: self._graph is not None, @@ -378,6 +433,20 @@ def Tags(self): RUN_METADATA: list(self._tagged_metadata.keys()) } + def Scalars(self, tag): + """Given a summary tag, return all associated `ScalarEvent`s. + + Args: + tag: A string tag associated with the events. + + Raises: + KeyError: If the tag is not found. + + Returns: + An array of `ScalarEvent`s. + """ + return self.scalars.Items(tag) + def Graph(self): """Return the graph definition, if there is one. @@ -430,6 +499,62 @@ def RunMetadata(self, tag): run_metadata.ParseFromString(self._tagged_metadata[tag]) return run_metadata + def Histograms(self, tag): + """Given a summary tag, return all associated histograms. + + Args: + tag: A string tag associated with the events. + + Raises: + KeyError: If the tag is not found. + + Returns: + An array of `HistogramEvent`s. + """ + return self.histograms.Items(tag) + + def CompressedHistograms(self, tag): + """Given a summary tag, return all associated compressed histograms. + + Args: + tag: A string tag associated with the events. + + Raises: + KeyError: If the tag is not found. + + Returns: + An array of `CompressedHistogramEvent`s. + """ + return self.compressed_histograms.Items(tag) + + def Images(self, tag): + """Given a summary tag, return all associated images. + + Args: + tag: A string tag associated with the events. + + Raises: + KeyError: If the tag is not found. + + Returns: + An array of `ImageEvent`s. + """ + return self.images.Items(tag) + + def Audio(self, tag): + """Given a summary tag, return all associated audio. + + Args: + tag: A string tag associated with the events. + + Raises: + KeyError: If the tag is not found. + + Returns: + An array of `AudioEvent`s. + """ + return self.audios.Items(tag) + def Tensors(self, tag): """Given a summary tag, return all associated tensors. @@ -442,7 +567,7 @@ def Tensors(self, tag): Returns: An array of `TensorEvent`s. """ - return self.tensors_by_tag[tag].Items(_TENSOR_RESERVOIR_KEY) + return self.tensors.Items(tag) def _MaybePurgeOrphanedData(self, event): """Maybe purge orphaned data due to a TensorFlow crash. 
@@ -505,21 +630,57 @@ def _CheckForOutOfOrderStepAndMaybePurge(self, event): self.most_recent_step = event.step self.most_recent_wall_time = event.wall_time + def _ConvertHistogramProtoToTuple(self, histo): + return HistogramValue(min=histo.min, + max=histo.max, + num=histo.num, + sum=histo.sum, + sum_squares=histo.sum_squares, + bucket_limit=list(histo.bucket_limit), + bucket=list(histo.bucket)) + + def _ProcessHistogram(self, tag, wall_time, step, histo): + """Processes a proto histogram by adding it to accumulated state.""" + histo = self._ConvertHistogramProtoToTuple(histo) + histo_ev = HistogramEvent(wall_time, step, histo) + self.histograms.AddItem(tag, histo_ev) + self.compressed_histograms.AddItem(tag, histo_ev, self._CompressHistogram) + + def _CompressHistogram(self, histo_ev): + """Callback for _ProcessHistogram.""" + return CompressedHistogramEvent( + histo_ev.wall_time, + histo_ev.step, + compressor.compress_histogram_proto( + histo_ev.histogram_value, self._compression_bps)) + + def _ProcessImage(self, tag, wall_time, step, image): + """Processes an image by adding it to accumulated state.""" + event = ImageEvent(wall_time=wall_time, + step=step, + encoded_image_string=image.encoded_image_string, + width=image.width, + height=image.height) + self.images.AddItem(tag, event) + + def _ProcessAudio(self, tag, wall_time, step, audio): + """Processes an audio summary by adding it to accumulated state.""" + event = AudioEvent(wall_time=wall_time, + step=step, + encoded_audio_string=audio.encoded_audio_string, + content_type=audio.content_type, + sample_rate=audio.sample_rate, + length_frames=audio.length_frames) + self.audios.AddItem(tag, event) + + def _ProcessScalar(self, tag, wall_time, step, scalar): + """Processes a simple value by adding it to accumulated state.""" + sv = ScalarEvent(wall_time=wall_time, step=step, value=scalar) + self.scalars.AddItem(tag, sv) + def _ProcessTensor(self, tag, wall_time, step, tensor): tv = TensorEvent(wall_time=wall_time, step=step, tensor_proto=tensor) - with self._tensors_by_tag_lock: - if tag not in self.tensors_by_tag: - reservoir_size = self._GetTensorReservoirSize(tag) - self.tensors_by_tag[tag] = reservoir.Reservoir(reservoir_size) - self.tensors_by_tag[tag].AddItem(_TENSOR_RESERVOIR_KEY, tv) - - def _GetTensorReservoirSize(self, tag): - default = self._size_guidance[TENSORS] - summary_metadata = self.summary_metadata.get(tag) - if summary_metadata is None: - return default - return self._tensor_size_guidance.get( - summary_metadata.plugin_data.plugin_name, default) + self.tensors.AddItem(tag, tv) def _Purge(self, event, by_tags): """Purge all events that have occurred after the given event.step. @@ -566,14 +727,20 @@ def _ExpiredPerTag(value): def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step, - event_wall_time): + event_wall_time, num_expired_scalars, num_expired_histos, + num_expired_comp_histos, num_expired_images, + num_expired_audio): """Return the string message associated with TensorBoard purges.""" return ('Detected out of order event.step likely caused by ' 'a TensorFlow restart. Purging expired events from Tensorboard' ' display between the previous step: {} (timestamp: {}) and ' - 'current step: {} (timestamp: {}).' - ).format(most_recent_step, most_recent_wall_time, event_step, - event_wall_time) + 'current step: {} (timestamp: {}). 
Removing {} scalars, {} ' + 'histograms, {} compressed histograms, {} images, ' + 'and {} audio.').format(most_recent_step, most_recent_wall_time, + event_step, event_wall_time, + num_expired_scalars, num_expired_histos, + num_expired_comp_histos, num_expired_images, + num_expired_audio) def _GeneratorFromPath(path): diff --git a/tensorboard/backend/event_processing/event_accumulator_test.py b/tensorboard/backend/event_processing/event_accumulator_test.py index f4f2f167a5c..5d9e0dbe438 100644 --- a/tensorboard/backend/event_processing/event_accumulator_test.py +++ b/tensorboard/backend/event_processing/event_accumulator_test.py @@ -24,10 +24,8 @@ from six.moves import xrange # pylint: disable=redefined-builtin import tensorflow as tf -from tensorboard.plugins.audio import summary as audio_summary -from tensorboard.plugins.image import summary as image_summary -from tensorboard.plugins.scalar import summary as scalar_summary from tensorboard.backend.event_processing import event_accumulator as ea +from tensorboard.plugins.distribution import compressor class _EventGenerator(object): @@ -48,19 +46,71 @@ def Load(self): while self.items: yield self.items.pop(0) - def AddScalarTensor(self, tag, wall_time=0, step=0, value=0): - """Add a rank-0 tensor event. - - Note: This is not related to the scalar plugin; it's just a - convenience function to add an event whose contents aren't - important. - """ - tensor = tf.make_tensor_proto(float(value)) + def AddScalar(self, tag, wall_time=0, step=0, value=0): event = tf.Event( wall_time=wall_time, step=step, summary=tf.Summary( - value=[tf.Summary.Value(tag=tag, tensor=tensor)])) + value=[tf.Summary.Value(tag=tag, simple_value=value)])) + self.AddEvent(event) + + def AddHistogram(self, + tag, + wall_time=0, + step=0, + hmin=1, + hmax=2, + hnum=3, + hsum=4, + hsum_squares=5, + hbucket_limit=None, + hbucket=None): + histo = tf.HistogramProto( + min=hmin, + max=hmax, + num=hnum, + sum=hsum, + sum_squares=hsum_squares, + bucket_limit=hbucket_limit, + bucket=hbucket) + event = tf.Event( + wall_time=wall_time, + step=step, + summary=tf.Summary(value=[tf.Summary.Value(tag=tag, histo=histo)])) + self.AddEvent(event) + + def AddImage(self, + tag, + wall_time=0, + step=0, + encoded_image_string=b'imgstr', + width=150, + height=100): + image = tf.Summary.Image( + encoded_image_string=encoded_image_string, width=width, height=height) + event = tf.Event( + wall_time=wall_time, + step=step, + summary=tf.Summary(value=[tf.Summary.Value(tag=tag, image=image)])) + self.AddEvent(event) + + def AddAudio(self, + tag, + wall_time=0, + step=0, + encoded_audio_string=b'sndstr', + content_type='audio/wav', + sample_rate=44100, + length_frames=22050): + audio = tf.Summary.Audio( + encoded_audio_string=encoded_audio_string, + content_type=content_type, + sample_rate=sample_rate, + length_frames=length_frames) + event = tf.Event( + wall_time=wall_time, + step=step, + summary=tf.Summary(value=[tf.Summary.Value(tag=tag, audio=audio)])) self.AddEvent(event) def AddEvent(self, event): @@ -92,6 +142,11 @@ def assertTagsEqual(self, actual, expected): """ empty_tags = { + ea.IMAGES: [], + ea.AUDIO: [], + ea.SCALARS: [], + ea.HISTOGRAMS: [], + ea.COMPRESSED_HISTOGRAMS: [], ea.GRAPH: False, ea.META_GRAPH: False, ea.RUN_METADATA: [], @@ -136,38 +191,278 @@ def testEmptyAccumulator(self): x.Reload() self.assertTagsEqual(x.Tags(), {}) + def testTags(self): + """Tags should be found in EventAccumulator after adding some events.""" + gen = _EventGenerator(self) + gen.AddScalar('s1') + 
gen.AddScalar('s2') + gen.AddHistogram('hst1') + gen.AddHistogram('hst2') + gen.AddImage('im1') + gen.AddImage('im2') + gen.AddAudio('snd1') + gen.AddAudio('snd2') + acc = ea.EventAccumulator(gen) + acc.Reload() + self.assertTagsEqual(acc.Tags(), { + ea.IMAGES: ['im1', 'im2'], + ea.AUDIO: ['snd1', 'snd2'], + ea.SCALARS: ['s1', 's2'], + ea.HISTOGRAMS: ['hst1', 'hst2'], + ea.COMPRESSED_HISTOGRAMS: ['hst1', 'hst2'], + }) + def testReload(self): """EventAccumulator contains suitable tags after calling Reload.""" gen = _EventGenerator(self) acc = ea.EventAccumulator(gen) acc.Reload() self.assertTagsEqual(acc.Tags(), {}) - gen.AddScalarTensor('s1', wall_time=1, step=10, value=50) - gen.AddScalarTensor('s2', wall_time=1, step=10, value=80) + gen.AddScalar('s1') + gen.AddScalar('s2') + gen.AddHistogram('hst1') + gen.AddHistogram('hst2') + gen.AddImage('im1') + gen.AddImage('im2') + gen.AddAudio('snd1') + gen.AddAudio('snd2') acc.Reload() self.assertTagsEqual(acc.Tags(), { - ea.TENSORS: ['s1', 's2'], + ea.IMAGES: ['im1', 'im2'], + ea.AUDIO: ['snd1', 'snd2'], + ea.SCALARS: ['s1', 's2'], + ea.HISTOGRAMS: ['hst1', 'hst2'], + ea.COMPRESSED_HISTOGRAMS: ['hst1', 'hst2'], }) + def testScalars(self): + """Tests whether EventAccumulator contains scalars after adding them.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + s1 = ea.ScalarEvent(wall_time=1, step=10, value=32) + s2 = ea.ScalarEvent(wall_time=2, step=12, value=64) + gen.AddScalar('s1', wall_time=1, step=10, value=32) + gen.AddScalar('s2', wall_time=2, step=12, value=64) + acc.Reload() + self.assertEqual(acc.Scalars('s1'), [s1]) + self.assertEqual(acc.Scalars('s2'), [s2]) + + def testHistograms(self): + """Tests whether histograms are inserted into EventAccumulator.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + + val1 = ea.HistogramValue( + min=1, + max=2, + num=3, + sum=4, + sum_squares=5, + bucket_limit=[1, 2, 3], + bucket=[0, 3, 0]) + val2 = ea.HistogramValue( + min=-2, + max=3, + num=4, + sum=5, + sum_squares=6, + bucket_limit=[2, 3, 4], + bucket=[1, 3, 0]) + + hst1 = ea.HistogramEvent(wall_time=1, step=10, histogram_value=val1) + hst2 = ea.HistogramEvent(wall_time=2, step=12, histogram_value=val2) + gen.AddHistogram( + 'hst1', + wall_time=1, + step=10, + hmin=1, + hmax=2, + hnum=3, + hsum=4, + hsum_squares=5, + hbucket_limit=[1, 2, 3], + hbucket=[0, 3, 0]) + gen.AddHistogram( + 'hst2', + wall_time=2, + step=12, + hmin=-2, + hmax=3, + hnum=4, + hsum=5, + hsum_squares=6, + hbucket_limit=[2, 3, 4], + hbucket=[1, 3, 0]) + acc.Reload() + self.assertEqual(acc.Histograms('hst1'), [hst1]) + self.assertEqual(acc.Histograms('hst2'), [hst2]) + + def testCompressedHistograms(self): + """Tests compressed histograms inserted into EventAccumulator.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen, compression_bps=(0, 2500, 5000, 7500, 10000)) + + gen.AddHistogram( + 'hst1', + wall_time=1, + step=10, + hmin=1, + hmax=2, + hnum=3, + hsum=4, + hsum_squares=5, + hbucket_limit=[1, 2, 3], + hbucket=[0, 3, 0]) + gen.AddHistogram( + 'hst2', + wall_time=2, + step=12, + hmin=-2, + hmax=3, + hnum=4, + hsum=5, + hsum_squares=6, + hbucket_limit=[2, 3, 4], + hbucket=[1, 3, 0]) + acc.Reload() + + # Create the expected values after compressing hst1 + expected_vals1 = [ + compressor.CompressedHistogramValue(bp, val) + for bp, val in [(0, 1.0), (2500, 1.25), (5000, 1.5), (7500, 1.75 + ), (10000, 2.0)] + ] + expected_cmphst1 = ea.CompressedHistogramEvent( + wall_time=1, step=10, 
compressed_histogram_values=expected_vals1) + self.assertEqual(acc.CompressedHistograms('hst1'), [expected_cmphst1]) + + # Create the expected values after compressing hst2 + expected_vals2 = [ + compressor.CompressedHistogramValue(bp, val) + for bp, val in [(0, -2), + (2500, 2), + (5000, 2 + 1 / 3), + (7500, 2 + 2 / 3), + (10000, 3)] + ] + expected_cmphst2 = ea.CompressedHistogramEvent( + wall_time=2, step=12, compressed_histogram_values=expected_vals2) + self.assertEqual(acc.CompressedHistograms('hst2'), [expected_cmphst2]) + + def testImages(self): + """Tests 2 images inserted/accessed in EventAccumulator.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + im1 = ea.ImageEvent( + wall_time=1, + step=10, + encoded_image_string=b'big', + width=400, + height=300) + im2 = ea.ImageEvent( + wall_time=2, + step=12, + encoded_image_string=b'small', + width=40, + height=30) + gen.AddImage( + 'im1', + wall_time=1, + step=10, + encoded_image_string=b'big', + width=400, + height=300) + gen.AddImage( + 'im2', + wall_time=2, + step=12, + encoded_image_string=b'small', + width=40, + height=30) + acc.Reload() + self.assertEqual(acc.Images('im1'), [im1]) + self.assertEqual(acc.Images('im2'), [im2]) + + def testAudio(self): + """Tests 2 audio events inserted/accessed in EventAccumulator.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + snd1 = ea.AudioEvent( + wall_time=1, + step=10, + encoded_audio_string=b'big', + content_type='audio/wav', + sample_rate=44100, + length_frames=441000) + snd2 = ea.AudioEvent( + wall_time=2, + step=12, + encoded_audio_string=b'small', + content_type='audio/wav', + sample_rate=44100, + length_frames=44100) + gen.AddAudio( + 'snd1', + wall_time=1, + step=10, + encoded_audio_string=b'big', + content_type='audio/wav', + sample_rate=44100, + length_frames=441000) + gen.AddAudio( + 'snd2', + wall_time=2, + step=12, + encoded_audio_string=b'small', + content_type='audio/wav', + sample_rate=44100, + length_frames=44100) + acc.Reload() + self.assertEqual(acc.Audio('snd1'), [snd1]) + self.assertEqual(acc.Audio('snd2'), [snd2]) + def testKeyError(self): """KeyError should be raised when accessing non-existing keys.""" gen = _EventGenerator(self) acc = ea.EventAccumulator(gen) acc.Reload() with self.assertRaises(KeyError): - acc.Tensors('s1') + acc.Scalars('s1') + with self.assertRaises(KeyError): + acc.Scalars('hst1') + with self.assertRaises(KeyError): + acc.Scalars('im1') + with self.assertRaises(KeyError): + acc.Histograms('s1') + with self.assertRaises(KeyError): + acc.Histograms('im1') + with self.assertRaises(KeyError): + acc.Images('s1') + with self.assertRaises(KeyError): + acc.Images('hst1') + with self.assertRaises(KeyError): + acc.Audio('s1') + with self.assertRaises(KeyError): + acc.Audio('hst1') def testNonValueEvents(self): """Non-value events in the generator don't cause early exits.""" gen = _EventGenerator(self) acc = ea.EventAccumulator(gen) - gen.AddScalarTensor('s1', wall_time=1, step=10, value=20) + gen.AddScalar('s1', wall_time=1, step=10, value=20) gen.AddEvent(tf.Event(wall_time=2, step=20, file_version='nots2')) - gen.AddScalarTensor('s3', wall_time=3, step=100, value=1) + gen.AddScalar('s3', wall_time=3, step=100, value=1) + gen.AddHistogram('hst1') + gen.AddImage('im1') + gen.AddAudio('snd1') acc.Reload() self.assertTagsEqual(acc.Tags(), { - ea.TENSORS: ['s1', 's3'], + ea.IMAGES: ['im1'], + ea.AUDIO: ['snd1'], + ea.SCALARS: ['s1', 's3'], + ea.HISTOGRAMS: ['hst1'], + ea.COMPRESSED_HISTOGRAMS: ['hst1'], }) def 
testExpiredDataDiscardedAfterRestartForFileVersionLessThan2(self): @@ -180,7 +475,6 @@ def testExpiredDataDiscardedAfterRestartForFileVersionLessThan2(self): Only file versions < 2 use this out-of-order discard logic. Later versions discard events based on the step value of SessionLog.START. """ - self.skipTest("TODO: Implement event discarding for tensor events.") warnings = [] self.stubs.Set(tf.logging, 'warn', warnings.append) @@ -188,19 +482,19 @@ def testExpiredDataDiscardedAfterRestartForFileVersionLessThan2(self): acc = ea.EventAccumulator(gen) gen.AddEvent(tf.Event(wall_time=0, step=0, file_version='brain.Event:1')) - gen.AddScalarTensor('s1', wall_time=1, step=100, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=200, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=300, value=20) + gen.AddScalar('s1', wall_time=1, step=100, value=20) + gen.AddScalar('s1', wall_time=1, step=200, value=20) + gen.AddScalar('s1', wall_time=1, step=300, value=20) acc.Reload() ## Check that number of items are what they should be - self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 200, 300]) + self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200, 300]) - gen.AddScalarTensor('s1', wall_time=1, step=101, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=201, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=301, value=20) + gen.AddScalar('s1', wall_time=1, step=101, value=20) + gen.AddScalar('s1', wall_time=1, step=201, value=20) + gen.AddScalar('s1', wall_time=1, step=301, value=20) acc.Reload() ## Check that we have discarded 200 and 300 from s1 - self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 101, 201, 301]) + self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301]) def testOrphanedDataNotDiscardedIfFlagUnset(self): """Tests that events are not discarded if purge_orphaned_data is false. @@ -209,19 +503,19 @@ def testOrphanedDataNotDiscardedIfFlagUnset(self): acc = ea.EventAccumulator(gen, purge_orphaned_data=False) gen.AddEvent(tf.Event(wall_time=0, step=0, file_version='brain.Event:1')) - gen.AddScalarTensor('s1', wall_time=1, step=100, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=200, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=300, value=20) + gen.AddScalar('s1', wall_time=1, step=100, value=20) + gen.AddScalar('s1', wall_time=1, step=200, value=20) + gen.AddScalar('s1', wall_time=1, step=300, value=20) acc.Reload() ## Check that number of items are what they should be - self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 200, 300]) + self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200, 300]) - gen.AddScalarTensor('s1', wall_time=1, step=101, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=201, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=301, value=20) + gen.AddScalar('s1', wall_time=1, step=101, value=20) + gen.AddScalar('s1', wall_time=1, step=201, value=20) + gen.AddScalar('s1', wall_time=1, step=301, value=20) acc.Reload() ## Check that we have discarded 200 and 300 from s1 - self.assertEqual([x.step for x in acc.Tensors('s1')], + self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200, 300, 101, 201, 301]) def testEventsDiscardedPerTagAfterRestartForFileVersionLessThan2(self): @@ -234,7 +528,6 @@ def testEventsDiscardedPerTagAfterRestartForFileVersionLessThan2(self): Only file versions < 2 use this out-of-order discard logic. Later versions discard events based on the step value of SessionLog.START. 
""" - self.skipTest("TODO: Implement event discarding for tensor events.") warnings = [] self.stubs.Set(tf.logging, 'warn', warnings.append) @@ -242,37 +535,37 @@ def testEventsDiscardedPerTagAfterRestartForFileVersionLessThan2(self): acc = ea.EventAccumulator(gen) gen.AddEvent(tf.Event(wall_time=0, step=0, file_version='brain.Event:1')) - gen.AddScalarTensor('s1', wall_time=1, step=100, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=200, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=300, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=101, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=201, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=301, value=20) + gen.AddScalar('s1', wall_time=1, step=100, value=20) + gen.AddScalar('s1', wall_time=1, step=200, value=20) + gen.AddScalar('s1', wall_time=1, step=300, value=20) + gen.AddScalar('s1', wall_time=1, step=101, value=20) + gen.AddScalar('s1', wall_time=1, step=201, value=20) + gen.AddScalar('s1', wall_time=1, step=301, value=20) - gen.AddScalarTensor('s2', wall_time=1, step=101, value=20) - gen.AddScalarTensor('s2', wall_time=1, step=201, value=20) - gen.AddScalarTensor('s2', wall_time=1, step=301, value=20) + gen.AddScalar('s2', wall_time=1, step=101, value=20) + gen.AddScalar('s2', wall_time=1, step=201, value=20) + gen.AddScalar('s2', wall_time=1, step=301, value=20) acc.Reload() ## Check that we have discarded 200 and 300 - self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 101, 201, 301]) + self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301]) ## Check that s1 discards do not affect s2 ## i.e. check that only events from the out of order tag are discarded - self.assertEqual([x.step for x in acc.Tensors('s2')], [101, 201, 301]) + self.assertEqual([x.step for x in acc.Scalars('s2')], [101, 201, 301]) def testOnlySummaryEventsTriggerDiscards(self): """Test that file version event does not trigger data purge.""" gen = _EventGenerator(self) acc = ea.EventAccumulator(gen) - gen.AddScalarTensor('s1', wall_time=1, step=100, value=20) + gen.AddScalar('s1', wall_time=1, step=100, value=20) ev1 = tf.Event(wall_time=2, step=0, file_version='brain.Event:1') graph_bytes = tf.GraphDef().SerializeToString() ev2 = tf.Event(wall_time=3, step=0, graph_def=graph_bytes) gen.AddEvent(ev1) gen.AddEvent(ev2) acc.Reload() - self.assertEqual([x.step for x in acc.Tensors('s1')], [100]) + self.assertEqual([x.step for x in acc.Scalars('s1')], [100]) def testSessionLogStartMessageDiscardsExpiredEvents(self): """Test that SessionLog.START message discards expired events. @@ -281,31 +574,30 @@ def testSessionLogStartMessageDiscardsExpiredEvents(self): but this logic can only be used for event protos which have the SessionLog enum, which was introduced to event.proto for file_version >= brain.Event:2. 
""" - self.skipTest("TODO: Implement event discarding for tensor events.") gen = _EventGenerator(self) acc = ea.EventAccumulator(gen) gen.AddEvent(tf.Event(wall_time=0, step=1, file_version='brain.Event:2')) - gen.AddScalarTensor('s1', wall_time=1, step=100, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=200, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=300, value=20) - gen.AddScalarTensor('s1', wall_time=1, step=400, value=20) + gen.AddScalar('s1', wall_time=1, step=100, value=20) + gen.AddScalar('s1', wall_time=1, step=200, value=20) + gen.AddScalar('s1', wall_time=1, step=300, value=20) + gen.AddScalar('s1', wall_time=1, step=400, value=20) - gen.AddScalarTensor('s2', wall_time=1, step=202, value=20) - gen.AddScalarTensor('s2', wall_time=1, step=203, value=20) + gen.AddScalar('s2', wall_time=1, step=202, value=20) + gen.AddScalar('s2', wall_time=1, step=203, value=20) slog = tf.SessionLog(status=tf.SessionLog.START) gen.AddEvent(tf.Event(wall_time=2, step=201, session_log=slog)) acc.Reload() - self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 200]) - self.assertEqual([x.step for x in acc.Tensors('s2')], []) + self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200]) + self.assertEqual([x.step for x in acc.Scalars('s2')], []) def testFirstEventTimestamp(self): """Test that FirstEventTimestamp() returns wall_time of the first event.""" gen = _EventGenerator(self) acc = ea.EventAccumulator(gen) gen.AddEvent(tf.Event(wall_time=10, step=20, file_version='brain.Event:2')) - gen.AddScalarTensor('s1', wall_time=30, step=40, value=20) + gen.AddScalar('s1', wall_time=30, step=40, value=20) self.assertEqual(acc.FirstEventTimestamp(), 10) def testReloadPopulatesFirstEventTimestamp(self): @@ -332,71 +624,43 @@ def testFirstEventTimestampLoadsEvent(self): acc.Reload() self.assertEqual(acc.file_version, 2.0) - def testNewStyleScalarSummary(self): - """Verify processing of tensorboard.plugins.scalar.summary.""" + def testTFSummaryScalar(self): + """Verify processing of tf.summary.scalar.""" event_sink = _EventGenerator(self, zero_out_timestamps=True) writer = tf.summary.FileWriter(self.get_temp_dir()) writer.event_writer = event_sink with self.test_session() as sess: - step = tf.placeholder(tf.float32, shape=[]) - scalar_summary.op('accuracy', 1.0 - 1.0 / (step + tf.constant(1.0))) - scalar_summary.op('xent', 1.0 / (step + tf.constant(1.0))) + ipt = tf.placeholder(tf.float32) + tf.summary.scalar('scalar1', ipt) + tf.summary.scalar('scalar2', ipt * ipt) merged = tf.summary.merge_all() writer.add_graph(sess.graph) for i in xrange(10): - summ = sess.run(merged, feed_dict={step: float(i)}) + summ = sess.run(merged, feed_dict={ipt: i}) writer.add_summary(summ, global_step=i) accumulator = ea.EventAccumulator(event_sink) accumulator.Reload() - tags = [ - u'accuracy/scalar_summary', - u'xent/scalar_summary', + seq1 = [ea.ScalarEvent(wall_time=0, step=i, value=i) for i in xrange(10)] + seq2 = [ + ea.ScalarEvent( + wall_time=0, step=i, value=i * i) for i in xrange(10) ] self.assertTagsEqual(accumulator.Tags(), { - ea.TENSORS: tags, + ea.SCALARS: ['scalar1', 'scalar2'], ea.GRAPH: True, ea.META_GRAPH: False, }) - def testNewStyleAudioSummary(self): - """Verify processing of tensorboard.plugins.audio.summary.""" - event_sink = _EventGenerator(self, zero_out_timestamps=True) - writer = tf.summary.FileWriter(self.get_temp_dir()) - writer.event_writer = event_sink - with self.test_session() as sess: - ipt = tf.random_normal(shape=[5, 441, 2]) - with tf.name_scope('1'): - 
audio_summary.op('one', ipt, sample_rate=44100, max_outputs=1) - with tf.name_scope('2'): - audio_summary.op('two', ipt, sample_rate=44100, max_outputs=2) - with tf.name_scope('3'): - audio_summary.op('three', ipt, sample_rate=44100, max_outputs=3) - merged = tf.summary.merge_all() - writer.add_graph(sess.graph) - for i in xrange(10): - summ = sess.run(merged) - writer.add_summary(summ, global_step=i) + self.assertEqual(accumulator.Scalars('scalar1'), seq1) + self.assertEqual(accumulator.Scalars('scalar2'), seq2) + first_value = accumulator.Scalars('scalar1')[0].value + self.assertTrue(isinstance(first_value, float)) - accumulator = ea.EventAccumulator(event_sink) - accumulator.Reload() - - tags = [ - u'1/one/audio_summary', - u'2/two/audio_summary', - u'3/three/audio_summary', - ] - - self.assertTagsEqual(accumulator.Tags(), { - ea.TENSORS: tags, - ea.GRAPH: True, - ea.META_GRAPH: False, - }) - - def testNewStyleImageSummary(self): - """Verify processing of tensorboard.plugins.image.summary.""" + def testTFSummaryImage(self): + """Verify processing of tf.summary.image.""" event_sink = _EventGenerator(self, zero_out_timestamps=True) writer = tf.summary.FileWriter(self.get_temp_dir()) writer.event_writer = event_sink @@ -407,11 +671,11 @@ def testNewStyleImageSummary(self): # Using the tf node name instead allows argument re-use to the image # summary. with tf.name_scope('1'): - image_summary.op('images', ipt, max_outputs=1) + tf.summary.image('images', ipt, max_outputs=1) with tf.name_scope('2'): - image_summary.op('images', ipt, max_outputs=2) + tf.summary.image('images', ipt, max_outputs=2) with tf.name_scope('3'): - image_summary.op('images', ipt, max_outputs=3) + tf.summary.image('images', ipt, max_outputs=3) merged = tf.summary.merge_all() writer.add_graph(sess.graph) for i in xrange(10): @@ -422,13 +686,12 @@ def testNewStyleImageSummary(self): accumulator.Reload() tags = [ - u'1/images/image_summary', - u'2/images/image_summary', - u'3/images/image_summary', + u'1/images/image', u'2/images/image/0', u'2/images/image/1', + u'3/images/image/0', u'3/images/image/1', u'3/images/image/2' ] self.assertTagsEqual(accumulator.Tags(), { - ea.TENSORS: tags, + ea.IMAGES: tags, ea.GRAPH: True, ea.META_GRAPH: False, }) @@ -464,72 +727,10 @@ def testTFSummaryTensor(self): self.assertTrue(np.array_equal(vector, [1.0, 2.0, 3.0])) self.assertTrue(np.array_equal(string, six.b('foobar'))) - def _testTFSummaryTensor_SizeGuidance(self, - plugin_name, - tensor_size_guidance, - steps, - expected_count): - event_sink = _EventGenerator(self, zero_out_timestamps=True) - writer = tf.summary.FileWriter(self.get_temp_dir()) - writer.event_writer = event_sink - with self.test_session() as sess: - summary_metadata = tf.SummaryMetadata( - plugin_data=tf.SummaryMetadata.PluginData(plugin_name=plugin_name, - content='{}')) - tf.summary.tensor_summary('scalar', tf.constant(1.0), - summary_metadata=summary_metadata) - merged = tf.summary.merge_all() - for step in xrange(steps): - writer.add_summary(sess.run(merged), global_step=step) - - - accumulator = ea.EventAccumulator( - event_sink, tensor_size_guidance=tensor_size_guidance) - accumulator.Reload() - - tensors = accumulator.Tensors('scalar') - self.assertEqual(len(tensors), expected_count) - - def testTFSummaryTensor_SizeGuidance_DefaultToTensorGuidance(self): - self._testTFSummaryTensor_SizeGuidance( - plugin_name='jabberwocky', - tensor_size_guidance={}, - steps=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 1, - expected_count=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS]) - - 
def testTFSummaryTensor_SizeGuidance_UseSmallSingularPluginGuidance(self): - size = int(ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] / 2) - assert size < ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS], size - self._testTFSummaryTensor_SizeGuidance( - plugin_name='jabberwocky', - tensor_size_guidance={'jabberwocky': size}, - steps=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 1, - expected_count=size) - - def testTFSummaryTensor_SizeGuidance_UseLargeSingularPluginGuidance(self): - size = ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 5 - self._testTFSummaryTensor_SizeGuidance( - plugin_name='jabberwocky', - tensor_size_guidance={'jabberwocky': size}, - steps=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 10, - expected_count=size) - - def testTFSummaryTensor_SizeGuidance_IgnoreIrrelevantGuidances(self): - size_small = int(ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] / 3) - size_large = int(ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] / 2) - assert size_small < size_large < ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS], ( - size_small, size_large) - self._testTFSummaryTensor_SizeGuidance( - plugin_name='jabberwocky', - tensor_size_guidance={'jabberwocky': size_small, - 'wnoorejbpxl': size_large}, - steps=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 1, - expected_count=size_small) - class RealisticEventAccumulatorTest(EventAccumulatorTest): - def testTensorsRealistically(self): + def testScalarsRealistically(self): """Test accumulator by writing values and then reading them.""" def FakeScalarSummary(tag, value): @@ -569,20 +770,20 @@ def FakeScalarSummary(tag, value): acc = ea.EventAccumulator(directory) acc.Reload() self.assertTagsEqual(acc.Tags(), { - ea.TENSORS: ['id', 'sq'], + ea.SCALARS: ['id', 'sq'], ea.GRAPH: True, ea.META_GRAPH: True, ea.RUN_METADATA: ['test run'], }) - id_events = acc.Tensors('id') - sq_events = acc.Tensors('sq') + id_events = acc.Scalars('id') + sq_events = acc.Scalars('sq') self.assertEqual(30, len(id_events)) self.assertEqual(30, len(sq_events)) for i in xrange(30): self.assertEqual(i * 5, id_events[i].step) self.assertEqual(i * 5, sq_events[i].step) - self.assertEqual(i, tf.make_ndarray(id_events[i].tensor_proto).item()) - self.assertEqual(i * i, tf.make_ndarray(sq_events[i].tensor_proto).item()) + self.assertEqual(i, id_events[i].value) + self.assertEqual(i * i, sq_events[i].value) # Write a few more events to test incremental reloading for i in xrange(30, 40): @@ -594,15 +795,15 @@ def FakeScalarSummary(tag, value): # Verify we can now see all of the data acc.Reload() - id_events = acc.Tensors('id') - sq_events = acc.Tensors('sq') + id_events = acc.Scalars('id') + sq_events = acc.Scalars('sq') self.assertEqual(40, len(id_events)) self.assertEqual(40, len(sq_events)) for i in xrange(40): self.assertEqual(i * 5, id_events[i].step) self.assertEqual(i * 5, sq_events[i].step) - self.assertEqual(i, tf.make_ndarray(id_events[i].tensor_proto).item()) - self.assertEqual(i * i, tf.make_ndarray(sq_events[i].tensor_proto).item()) + self.assertEqual(i, id_events[i].value) + self.assertEqual(i * i, sq_events[i].value) self.assertProtoEquals(graph.as_graph_def(add_shapes=True), acc.Graph()) self.assertProtoEquals(meta_graph_def, acc.MetaGraph()) @@ -658,9 +859,8 @@ def _writeMetadata(self, logdir, summary_metadata, nonce=''): def testSummaryMetadata(self): logdir = self.get_temp_dir() summary_metadata = tf.SummaryMetadata( - display_name='current tagee', - summary_description='no', - plugin_data=tf.SummaryMetadata.PluginData(plugin_name='outlet')) + display_name='current tagee', summary_description='no') + summary_metadata.plugin_data.plugin_name 
= 'outlet' self._writeMetadata(logdir, summary_metadata) acc = ea.EventAccumulator(logdir) acc.Reload() diff --git a/tensorboard/backend/event_processing/event_multiplexer.py b/tensorboard/backend/event_processing/event_multiplexer.py index 171814b7c0a..37884188c38 100644 --- a/tensorboard/backend/event_processing/event_multiplexer.py +++ b/tensorboard/backend/event_processing/event_multiplexer.py @@ -70,7 +70,6 @@ class EventMultiplexer(object): def __init__(self, run_path_map=None, size_guidance=None, - tensor_size_guidance=None, purge_orphaned_data=True): """Constructor for the `EventMultiplexer`. @@ -81,9 +80,6 @@ def __init__(self, size_guidance: A dictionary mapping from `tagType` to the number of items to store for each tag of that type. See `event_accumulator.EventAccumulator` for details. - tensor_size_guidance: A dictionary mapping from `plugin_name` to - the number of items to store for each tag of that type. See - `event_accumulator.EventAccumulator` for details. purge_orphaned_data: Whether to discard any events that were "orphaned" by a TensorFlow restart. """ @@ -94,7 +90,6 @@ def __init__(self, self._reload_called = False self._size_guidance = (size_guidance or event_accumulator.DEFAULT_SIZE_GUIDANCE) - self._tensor_size_guidance = tensor_size_guidance self.purge_orphaned_data = purge_orphaned_data if run_path_map is not None: tf.logging.info('Event Multplexer doing initialization load for %s', @@ -135,7 +130,6 @@ def AddRun(self, path, name=None): accumulator = event_accumulator.EventAccumulator( path, size_guidance=self._size_guidance, - tensor_size_guidance=self._tensor_size_guidance, purge_orphaned_data=self.purge_orphaned_data) self._accumulators[name] = accumulator self._paths[name] = path @@ -324,6 +318,57 @@ def RunMetadata(self, run, tag): accumulator = self.GetAccumulator(run) return accumulator.RunMetadata(tag) + def Histograms(self, run, tag): + """Retrieve the histogram events associated with a run and tag. + + Args: + run: A string name of the run for which values are retrieved. + tag: A string name of the tag for which values are retrieved. + + Raises: + KeyError: If the run is not found, or the tag is not available for + the given run. + + Returns: + An array of `event_accumulator.HistogramEvents`. + """ + accumulator = self.GetAccumulator(run) + return accumulator.Histograms(tag) + + def CompressedHistograms(self, run, tag): + """Retrieve the compressed histogram events associated with a run and tag. + + Args: + run: A string name of the run for which values are retrieved. + tag: A string name of the tag for which values are retrieved. + + Raises: + KeyError: If the run is not found, or the tag is not available for + the given run. + + Returns: + An array of `event_accumulator.CompressedHistogramEvents`. + """ + accumulator = self.GetAccumulator(run) + return accumulator.CompressedHistograms(tag) + + def Images(self, run, tag): + """Retrieve the image events associated with a run and tag. + + Args: + run: A string name of the run for which values are retrieved. + tag: A string name of the tag for which values are retrieved. + + Raises: + KeyError: If the run is not found, or the tag is not available for + the given run. + + Returns: + An array of `event_accumulator.ImageEvents`. + """ + accumulator = self.GetAccumulator(run) + return accumulator.Images(tag) + def Audio(self, run, tag): """Retrieve the audio events associated with a run and tag. 
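`EventMultiplexer` mirrors those per-tag getters keyed by run. A minimal sketch along the same lines (run names and paths are hypothetical; the run-path-map constructor, `Reload`, `Runs`, and `Scalars(run, tag)` are all exercised by the tests in this patch):

    from tensorboard.backend.event_processing import event_multiplexer

    mux = event_multiplexer.EventMultiplexer({
        'train': '/tmp/logdir/train',  # hypothetical run directories
        'eval': '/tmp/logdir/eval',
    })
    mux.Reload()       # reloads every underlying EventAccumulator
    print(mux.Runs())  # {run_name: {tag_type: [tag, ...], ...}}
    for event in mux.Scalars('train', 'loss'):
      print(event.step, event.value)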
@@ -405,7 +450,10 @@ def Runs(self): Returns: ``` - {runName: { scalarValues: [tagA, tagB, tagC], + {runName: { images: [tag1, tag2, tag3], + scalarValues: [tagA, tagB, tagC], + histograms: [tagX, tagY, tagZ], + compressedHistograms: [tagX, tagY, tagZ], graph: true, meta_graph: true}} ``` """ diff --git a/tensorboard/backend/event_processing/event_multiplexer_test.py b/tensorboard/backend/event_processing/event_multiplexer_test.py index a2f79a34924..ccef6666eb4 100644 --- a/tensorboard/backend/event_processing/event_multiplexer_test.py +++ b/tensorboard/backend/event_processing/event_multiplexer_test.py @@ -60,7 +60,11 @@ def __init__(self, path): } def Tags(self): - return {} + return {event_accumulator.IMAGES: ['im1', 'im2'], + event_accumulator.AUDIO: ['snd1', 'snd2'], + event_accumulator.HISTOGRAMS: ['hst1', 'hst2'], + event_accumulator.COMPRESSED_HISTOGRAMS: ['cmphst1', 'cmphst2'], + event_accumulator.SCALARS: ['sv1', 'sv2']} def FirstEventTimestamp(self): return 0 @@ -70,6 +74,21 @@ def _TagHelper(self, tag_name, enum): raise KeyError return ['%s/%s' % (self._path, tag_name)] + def Scalars(self, tag_name): + return self._TagHelper(tag_name, event_accumulator.SCALARS) + + def Histograms(self, tag_name): + return self._TagHelper(tag_name, event_accumulator.HISTOGRAMS) + + def CompressedHistograms(self, tag_name): + return self._TagHelper(tag_name, event_accumulator.COMPRESSED_HISTOGRAMS) + + def Images(self, tag_name): + return self._TagHelper(tag_name, event_accumulator.IMAGES) + + def Audio(self, tag_name): + return self._TagHelper(tag_name, event_accumulator.AUDIO) + def Tensors(self, tag_name): return self._TagHelper(tag_name, event_accumulator.TENSORS) @@ -88,9 +107,9 @@ def Reload(self): def _GetFakeAccumulator(path, size_guidance=None, - tensor_size_guidance=None, + compression_bps=None, purge_orphaned_data=None): - del size_guidance, tensor_size_guidance, purge_orphaned_data # Unused. + del size_guidance, compression_bps, purge_orphaned_data # Unused. return _FakeAccumulator(path) @@ -126,6 +145,15 @@ def testReload(self): self.assertTrue(x.GetAccumulator('run1').reload_called) self.assertTrue(x.GetAccumulator('run2').reload_called) + def testScalars(self): + """Tests Scalars function returns suitable values.""" + x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'}) + + run1_actual = x.Scalars('run1', 'sv1') + run1_expected = ['path1/sv1'] + + self.assertEqual(run1_expected, run1_actual) + def testPluginRunToTagToContent(self): """Tests the method that produces the run to tag to content mapping.""" x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'}) @@ -144,7 +172,7 @@ def testExceptions(self): """KeyError should be raised when accessing non-existing keys.""" x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'}) with self.assertRaises(KeyError): - x.Tensors('sv1', 'xxx') + x.Scalars('sv1', 'xxx') def testInitialization(self): """Tests EventMultiplexer is created properly with its params.""" diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator.py b/tensorboard/backend/event_processing/plugin_event_accumulator.py new file mode 100644 index 00000000000..0ed89cb3cfa --- /dev/null +++ b/tensorboard/backend/event_processing/plugin_event_accumulator.py @@ -0,0 +1,608 @@ +# Copyright 2015 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Takes a generator of values, and accumulates them for a frontend.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import collections +import os +import threading + +import tensorflow as tf + +from tensorboard import data_compat +from tensorboard.backend.event_processing import directory_watcher +from tensorboard.backend.event_processing import event_file_loader +from tensorboard.backend.event_processing import plugin_asset_util +from tensorboard.backend.event_processing import reservoir + +namedtuple = collections.namedtuple + +TensorEvent = namedtuple('TensorEvent', ['wall_time', 'step', 'tensor_proto']) + +## Different types of summary events handled by the event_accumulator +SUMMARY_TYPES = { + 'tensor': '_ProcessTensor', +} + +## The tagTypes below are just arbitrary strings chosen to pass the type +## information of the tag from the backend to the frontend +TENSORS = 'tensors' +GRAPH = 'graph' +META_GRAPH = 'meta_graph' +RUN_METADATA = 'run_metadata' + +DEFAULT_SIZE_GUIDANCE = { + TENSORS: 500, +} + +STORE_EVERYTHING_SIZE_GUIDANCE = { + TENSORS: 0, +} + +_TENSOR_RESERVOIR_KEY = "." # arbitrary + + +def IsTensorFlowEventsFile(path): + """Check the path name to see if it is probably a TF Events file. + + Args: + path: A file path to check if it is an event file. + + Raises: + ValueError: If the path is an empty string. + + Returns: + If path is formatted like a TensorFlowEventsFile. + """ + if not path: + raise ValueError('Path must be a nonempty string') + return 'tfevents' in tf.compat.as_str_any(os.path.basename(path)) + + +class EventAccumulator(object): + """An `EventAccumulator` takes an event generator, and accumulates the values. + + The `EventAccumulator` is intended to provide a convenient Python + interface for loading Event data written during a TensorFlow run. + TensorFlow writes out `Event` protobuf objects, which have a timestamp + and step number, and often contain a `Summary`. Summaries can have + different kinds of data stored as arbitrary tensors. The Summaries + also have a tag, which we use to organize logically related data. The + `EventAccumulator` supports retrieving the `Event` and `Summary` data + by its tag. + + Calling `Tags()` gets a map from `tagType` (i.e., `tensors`) to the + associated tags for those data types. Then, the functional endpoint + (e.g., `Accumulator.Tensors(tag)`) allows for the retrieval of all + data associated with that tag. + + The `Reload()` method synchronously loads all of the data written so far. + + Fields: + most_recent_step: Step of last Event proto added. This should only + be accessed from the thread that calls Reload. This is -1 if + nothing has been loaded yet. + most_recent_wall_time: Timestamp of last Event proto added. This is + a float containing seconds from the UNIX epoch, or -1 if + nothing has been loaded yet. This should only be accessed from + the thread that calls Reload. 
+    path: A file path to a directory containing tf events files, or a single
+      tf events file. The accumulator will load events from this path.
+    tensors_by_tag: A dictionary mapping each tag name to a
+      reservoir.Reservoir of tensor summaries. Each such reservoir will
+      only use a single key, given by `_TENSOR_RESERVOIR_KEY`.
+
+  @@Tensors
+  """
+
+  def __init__(self,
+               path,
+               size_guidance=None,
+               tensor_size_guidance=None,
+               purge_orphaned_data=True):
+    """Construct the `EventAccumulator`.
+
+    Args:
+      path: A file path to a directory containing tf events files, or a single
+        tf events file. The accumulator will load events from this path.
+      size_guidance: Information on how much data the EventAccumulator should
+        store in memory. The DEFAULT_SIZE_GUIDANCE tries not to store too much
+        so as to avoid OOMing the client. The size_guidance should be a map
+        from a `tagType` string to an integer representing the number of
+        items to keep per tag for items of that `tagType`. If the size is 0,
+        all events are stored.
+      tensor_size_guidance: Like `size_guidance`, but allowing finer
+        granularity for tensor summaries. Should be a map from the
+        `plugin_name` field on the `PluginData` proto to an integer
+        representing the number of items to keep per tag. Plugins for
+        which there is no entry in this map will default to the value of
+        `size_guidance[event_accumulator.TENSORS]`. Defaults to `{}`.
+      purge_orphaned_data: Whether to discard any events that were "orphaned" by
+        a TensorFlow restart.
+    """
+    size_guidance = dict(size_guidance or DEFAULT_SIZE_GUIDANCE)
+    sizes = {}
+    for key in DEFAULT_SIZE_GUIDANCE:
+      if key in size_guidance:
+        sizes[key] = size_guidance[key]
+      else:
+        sizes[key] = DEFAULT_SIZE_GUIDANCE[key]
+    # Use the merged map so that every key in DEFAULT_SIZE_GUIDANCE is
+    # guaranteed to be present.
+    self._size_guidance = sizes
+    self._tensor_size_guidance = dict(tensor_size_guidance or {})
+
+    self._first_event_timestamp = None
+
+    self._graph = None
+    self._graph_from_metagraph = False
+    self._meta_graph = None
+    self._tagged_metadata = {}
+    self.summary_metadata = {}
+    self.tensors_by_tag = {}
+    self._tensors_by_tag_lock = threading.Lock()
+
+    # Keep a mapping from plugin name to a dict mapping from tag to plugin data
+    # content obtained from the SummaryMetadata (metadata field of Value) for
+    # that plugin (This is not the entire SummaryMetadata proto - only the
+    # content for that plugin). The SummaryWriter only keeps the content on the
+    # first event encountered per tag, so we must store that first instance of
+    # content for each tag.
+    self._plugin_to_tag_to_content = collections.defaultdict(dict)
+
+    self._generator_mutex = threading.Lock()
+    self.path = path
+    self._generator = _GeneratorFromPath(path)
+
+    self.purge_orphaned_data = purge_orphaned_data
+
+    self.most_recent_step = -1
+    self.most_recent_wall_time = -1
+    self.file_version = None
+
+    # The attributes that get built up by the accumulator
+    self.accumulated_attrs = ()
+    self._tensor_summaries = {}
+
+  def Reload(self):
+    """Loads all events added since the last call to `Reload`.
+
+    If `Reload` was never called, loads all events in the file.
+
+    Returns:
+      The `EventAccumulator`.
+    """
+    with self._generator_mutex:
+      for event in self._generator.Load():
+        self._ProcessEvent(event)
+    return self
+
+  def PluginAssets(self, plugin_name):
+    """Return a list of all plugin assets for the given plugin.
+
+    Args:
+      plugin_name: The string name of a plugin to retrieve assets for.
+
+    Returns:
+      A list of string plugin asset names, or empty list if none are available.
+      If the plugin was not registered, an empty list is returned.
+    """
+    return plugin_asset_util.ListAssets(self.path, plugin_name)
+
+  def RetrievePluginAsset(self, plugin_name, asset_name):
+    """Return the contents of a given plugin asset.
+
+    Args:
+      plugin_name: The string name of a plugin.
+      asset_name: The string name of an asset.
+
+    Returns:
+      The string contents of the plugin asset.
+
+    Raises:
+      KeyError: If the asset is not available.
+    """
+    return plugin_asset_util.RetrieveAsset(self.path, plugin_name, asset_name)
+
+  def FirstEventTimestamp(self):
+    """Returns the timestamp in seconds of the first event.
+
+    If the first event has been loaded (either by this method or by `Reload`),
+    this returns immediately. Otherwise, it will load in the first event. Note
+    that this means that calling `Reload` will cause this to block until
+    `Reload` has finished.
+
+    Returns:
+      The timestamp in seconds of the first event that was loaded.
+
+    Raises:
+      ValueError: If no events have been loaded and there were no events found
+        on disk.
+    """
+    if self._first_event_timestamp is not None:
+      return self._first_event_timestamp
+    with self._generator_mutex:
+      try:
+        event = next(self._generator.Load())
+        self._ProcessEvent(event)
+        return self._first_event_timestamp
+
+      except StopIteration:
+        raise ValueError('No event timestamp could be found')
+
+  def PluginTagToContent(self, plugin_name):
+    """Returns a dict mapping tags to content specific to that plugin.
+
+    Args:
+      plugin_name: The name of the plugin for which to fetch plugin-specific
+        content.
+
+    Raises:
+      KeyError: If the plugin name is not found.
+
+    Returns:
+      A dict mapping tags to plugin-specific content (which are always strings).
+      Those strings are often serialized protos.
+    """
+    if plugin_name not in self._plugin_to_tag_to_content:
+      raise KeyError('Plugin %r could not be found.' % plugin_name)
+    return self._plugin_to_tag_to_content[plugin_name]
+
+  def SummaryMetadata(self, tag):
+    """Given a summary tag name, return the associated metadata object.
+
+    Args:
+      tag: The name of a tag, as a string.
+
+    Raises:
+      KeyError: If the tag is not found.
+
+    Returns:
+      A `SummaryMetadata` protobuf.
+    """
+    return self.summary_metadata[tag]
+
+  def _ProcessEvent(self, event):
+    """Called whenever an event is loaded."""
+    if self._first_event_timestamp is None:
+      self._first_event_timestamp = event.wall_time
+
+    if event.HasField('file_version'):
+      new_file_version = _ParseFileVersion(event.file_version)
+      if self.file_version and self.file_version != new_file_version:
+        ## This should not happen.
+        tf.logging.warn(('Found new file_version for event.proto. This will '
+                         'affect purging logic for TensorFlow restarts. '
+                         'Old: {0} New: {1}').format(self.file_version,
+                                                     new_file_version))
+      self.file_version = new_file_version
+
+    self._MaybePurgeOrphanedData(event)
+
+    ## Process the event.
+    # GraphDef and MetaGraphDef are handled in a special way:
+    # If no graph_def Event is available, but a meta_graph_def is, and it
+    # contains a graph_def, then use the meta_graph_def.graph_def as our graph.
+    # If a graph_def Event is available, always prefer it to the graph_def
+    # inside the meta_graph_def.
+    if event.HasField('graph_def'):
+      if self._graph is not None:
+        tf.logging.warn(
+            ('Found more than one graph event per run, or there was '
+             'a metagraph containing a graph_def, as well as one or '
+             'more graph events. Overwriting the graph with the '
+             'newest event.'))
+      self._graph = event.graph_def
+      self._graph_from_metagraph = False
+    elif event.HasField('meta_graph_def'):
+      if self._meta_graph is not None:
+        tf.logging.warn(('Found more than one metagraph event per run. '
+                         'Overwriting the metagraph with the newest event.'))
+      self._meta_graph = event.meta_graph_def
+      if self._graph is None or self._graph_from_metagraph:
+        # We may have a graph_def in the metagraph. If so, and no
+        # graph_def is directly available, use this one instead.
+        meta_graph = tf.MetaGraphDef()
+        meta_graph.ParseFromString(self._meta_graph)
+        if meta_graph.graph_def:
+          if self._graph is not None:
+            tf.logging.warn(
+                ('Found multiple metagraphs containing graph_defs, '
+                 'but did not find any graph events. Overwriting the '
+                 'graph with the newest metagraph version.'))
+          self._graph_from_metagraph = True
+          self._graph = meta_graph.graph_def.SerializeToString()
+    elif event.HasField('tagged_run_metadata'):
+      tag = event.tagged_run_metadata.tag
+      if tag in self._tagged_metadata:
+        tf.logging.warn('Found more than one "run metadata" event with tag ' +
+                        tag + '. Overwriting it with the newest event.')
+      self._tagged_metadata[tag] = event.tagged_run_metadata.run_metadata
+    elif event.HasField('summary'):
+      for value in event.summary.value:
+        value = data_compat.migrate_value(value)
+
+        if value.HasField('metadata'):
+          tag = value.tag
+          # We only store the first instance of the metadata. This check
+          # is important: the `FileWriter` does strip metadata from all
+          # values except the first one per each tag, but a new
+          # `FileWriter` is created every time a training job stops and
+          # restarts. Hence, we must also ignore non-initial metadata in
+          # this logic.
+          if tag not in self.summary_metadata:
+            self.summary_metadata[tag] = value.metadata
+            plugin_data = value.metadata.plugin_data
+            if plugin_data.plugin_name:
+              self._plugin_to_tag_to_content[plugin_data.plugin_name][tag] = (
+                  plugin_data.content)
+            else:
+              tf.logging.warn(
+                  ('This summary with tag %r is oddly not associated with a '
+                   'plugin.'), tag)
+
+        for summary_type, summary_func in SUMMARY_TYPES.items():
+          if value.HasField(summary_type):
+            datum = getattr(value, summary_type)
+            tag = value.tag
+            if summary_type == 'tensor' and not tag:
+              # This tensor summary was created using the old method that used
+              # plugin assets. We must still continue to support it.
+              tag = value.node_name
+            getattr(self, summary_func)(tag, event.wall_time, event.step, datum)
+
+  def Tags(self):
+    """Return all tags found in the value stream.
+
+    Returns:
+      A `{tagType: ['list', 'of', 'tags']}` dictionary.
+    """
+    return {
+        TENSORS: list(self.tensors_by_tag.keys()),
+        # Use a heuristic: if the metagraph is available, but
+        # graph is not, then we assume the metagraph contains the graph.
+        GRAPH: self._graph is not None,
+        META_GRAPH: self._meta_graph is not None,
+        RUN_METADATA: list(self._tagged_metadata.keys())
+    }
+
+  def Graph(self):
+    """Return the graph definition, if there is one.
+
+    If the graph is stored directly, return that. If no graph is stored
+    directly but a metagraph is stored containing a graph, return that.
+
+    Raises:
+      ValueError: If there is no graph for this run.
+
+    Returns:
+      The `graph_def` proto.
+    """
+    graph = tf.GraphDef()
+    if self._graph is not None:
+      graph.ParseFromString(self._graph)
+      return graph
+    raise ValueError('There is no graph in this EventAccumulator')
+
+  def MetaGraph(self):
+    """Return the metagraph definition, if there is one.
+ + Raises: + ValueError: If there is no metagraph for this run. + + Returns: + The `meta_graph_def` proto. + """ + if self._meta_graph is None: + raise ValueError('There is no metagraph in this EventAccumulator') + meta_graph = tf.MetaGraphDef() + meta_graph.ParseFromString(self._meta_graph) + return meta_graph + + def RunMetadata(self, tag): + """Given a tag, return the associated session.run() metadata. + + Args: + tag: A string tag associated with the event. + + Raises: + ValueError: If the tag is not found. + + Returns: + The metadata in form of `RunMetadata` proto. + """ + if tag not in self._tagged_metadata: + raise ValueError('There is no run metadata with this tag name') + + run_metadata = tf.RunMetadata() + run_metadata.ParseFromString(self._tagged_metadata[tag]) + return run_metadata + + def Tensors(self, tag): + """Given a summary tag, return all associated tensors. + + Args: + tag: A string tag associated with the events. + + Raises: + KeyError: If the tag is not found. + + Returns: + An array of `TensorEvent`s. + """ + return self.tensors_by_tag[tag].Items(_TENSOR_RESERVOIR_KEY) + + def _MaybePurgeOrphanedData(self, event): + """Maybe purge orphaned data due to a TensorFlow crash. + + When TensorFlow crashes at step T+O and restarts at step T, any events + written after step T are now "orphaned" and will be at best misleading if + they are included in TensorBoard. + + This logic attempts to determine if there is orphaned data, and purge it + if it is found. + + Args: + event: The event to use as a reference, to determine if a purge is needed. + """ + if not self.purge_orphaned_data: + return + ## Check if the event happened after a crash, and purge expired tags. + if self.file_version and self.file_version >= 2: + ## If the file_version is recent enough, use the SessionLog enum + ## to check for restarts. + self._CheckForRestartAndMaybePurge(event) + else: + ## If there is no file version, default to old logic of checking for + ## out of order steps. + self._CheckForOutOfOrderStepAndMaybePurge(event) + + def _CheckForRestartAndMaybePurge(self, event): + """Check and discard expired events using SessionLog.START. + + Check for a SessionLog.START event and purge all previously seen events + with larger steps, because they are out of date. Because of supervisor + threading, it is possible that this logic will cause the first few event + messages to be discarded since supervisor threading does not guarantee + that the START message is deterministically written first. + + This method is preferred over _CheckForOutOfOrderStepAndMaybePurge which + can inadvertently discard events due to supervisor threading. + + Args: + event: The event to use as reference. If the event is a START event, all + previously seen events with a greater event.step will be purged. + """ + if event.HasField( + 'session_log') and event.session_log.status == tf.SessionLog.START: + self._Purge(event, by_tags=False) + + def _CheckForOutOfOrderStepAndMaybePurge(self, event): + """Check for out-of-order event.step and discard expired events for tags. + + Check if the event is out of order relative to the global most recent step. + If it is, purge outdated summaries for tags that the event contains. + + Args: + event: The event to use as reference. If the event is out-of-order, all + events with the same tags, but with a greater event.step will be purged. 
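+
+    For example, the intended per-tag behavior (sketched in the skipped tests
+    in plugin_event_accumulator_test.py): after seeing steps [100, 200, 300]
+    for tag 's1', an 's1' event at step 101 would purge the 's1' events at
+    steps 200 and 300 while leaving other tags intact. Note that purging is
+    not yet wired up for tensor data, since `accumulated_attrs` is empty.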
+    """
+    if event.step < self.most_recent_step and event.HasField('summary'):
+      self._Purge(event, by_tags=True)
+    else:
+      self.most_recent_step = event.step
+      self.most_recent_wall_time = event.wall_time
+
+  def _ProcessTensor(self, tag, wall_time, step, tensor):
+    tv = TensorEvent(wall_time=wall_time, step=step, tensor_proto=tensor)
+    with self._tensors_by_tag_lock:
+      if tag not in self.tensors_by_tag:
+        reservoir_size = self._GetTensorReservoirSize(tag)
+        self.tensors_by_tag[tag] = reservoir.Reservoir(reservoir_size)
+      self.tensors_by_tag[tag].AddItem(_TENSOR_RESERVOIR_KEY, tv)
+
+  def _GetTensorReservoirSize(self, tag):
+    default = self._size_guidance[TENSORS]
+    summary_metadata = self.summary_metadata.get(tag)
+    if summary_metadata is None:
+      return default
+    return self._tensor_size_guidance.get(
+        summary_metadata.plugin_data.plugin_name, default)
+
+  def _Purge(self, event, by_tags):
+    """Purge all events that have occurred after the given event.step.
+
+    If by_tags is True, purge all events that occurred after the given
+    event.step, but only for the tags that the event has. Non-sequential
+    event.steps suggest that a TensorFlow restart occurred, and we discard
+    the out-of-order events to display a consistent view in TensorBoard.
+
+    Discarding by tags is the safer method when we are unsure whether a restart
+    has occurred, given that threading in supervisor can cause events of
+    different tags to arrive with unsynchronized step values.
+
+    If by_tags is False, then purge all events with event.step greater than the
+    given event.step. This can be used when we are certain that a TensorFlow
+    restart has occurred and these events can be discarded.
+
+    Args:
+      event: The event to use as reference for the purge. All events with
+        the same tags, but with a greater event.step will be purged.
+      by_tags: Bool to dictate whether to discard all out-of-order events or
+        only those that are associated with the given reference event.
+    """
+    ## Keep data in reservoirs that has a step less than event.step
+    _NotExpired = lambda x: x.step < event.step
+
+    if by_tags:
+      def _ExpiredPerTag(value):
+        return [getattr(self, x).FilterItems(_NotExpired, value.tag)
+                for x in self.accumulated_attrs]
+
+      expired_per_tags = [_ExpiredPerTag(value)
+                          for value in event.summary.value]
+      expired_per_type = [sum(x) for x in zip(*expired_per_tags)]
+    else:
+      expired_per_type = [getattr(self, x).FilterItems(_NotExpired)
+                          for x in self.accumulated_attrs]
+
+    if sum(expired_per_type) > 0:
+      purge_msg = _GetPurgeMessage(self.most_recent_step,
+                                   self.most_recent_wall_time, event.step,
+                                   event.wall_time, *expired_per_type)
+      tf.logging.warn(purge_msg)
+
+
+def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step,
+                     event_wall_time, *unused_expired_counts):
+  """Return the string message associated with TensorBoard purges."""
+  return ('Detected out of order event.step likely caused by '
+          'a TensorFlow restart. Purging expired events from TensorBoard'
+          ' display between the previous step: {} (timestamp: {}) and '
+          'current step: {} (timestamp: {}).'
+ ).format(most_recent_step, most_recent_wall_time, event_step, + event_wall_time) + + +def _GeneratorFromPath(path): + """Create an event generator for file or directory at given path string.""" + if not path: + raise ValueError('path must be a valid string') + if IsTensorFlowEventsFile(path): + return event_file_loader.EventFileLoader(path) + else: + return directory_watcher.DirectoryWatcher( + path, event_file_loader.EventFileLoader, IsTensorFlowEventsFile) + + +def _ParseFileVersion(file_version): + """Convert the string file_version in event.proto into a float. + + Args: + file_version: String file_version from event.proto + + Returns: + Version number as a float. + """ + tokens = file_version.split('brain.Event:') + try: + return float(tokens[-1]) + except ValueError: + ## This should never happen according to the definition of file_version + ## specified in event.proto. + tf.logging.warn( + ('Invalid event.proto file_version. Defaulting to use of ' + 'out-of-order event.step logic for purging expired events.')) + return -1 diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py new file mode 100644 index 00000000000..d9a99e1e905 --- /dev/null +++ b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py @@ -0,0 +1,718 @@ +# Copyright 2015 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os + +import numpy as np +import six +from six.moves import xrange # pylint: disable=redefined-builtin +import tensorflow as tf + +from tensorboard.plugins.audio import summary as audio_summary +from tensorboard.plugins.image import summary as image_summary +from tensorboard.plugins.scalar import summary as scalar_summary +from tensorboard.backend.event_processing import plugin_event_accumulator as ea + + +class _EventGenerator(object): + """Class that can add_events and then yield them back. + + Satisfies the EventGenerator API required for the EventAccumulator. + Satisfies the EventWriter API required to create a tf.summary.FileWriter. + + Has additional convenience methods for adding test events. + """ + + def __init__(self, testcase, zero_out_timestamps=False): + self._testcase = testcase + self.items = [] + self.zero_out_timestamps = zero_out_timestamps + + def Load(self): + while self.items: + yield self.items.pop(0) + + def AddScalarTensor(self, tag, wall_time=0, step=0, value=0): + """Add a rank-0 tensor event. + + Note: This is not related to the scalar plugin; it's just a + convenience function to add an event whose contents aren't + important. 
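+
+    A typical call, as used by the tests below:
+
+    ```
+    gen = _EventGenerator(self)
+    gen.AddScalarTensor('s1', wall_time=1, step=10, value=50)
+    ```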
+ """ + tensor = tf.make_tensor_proto(float(value)) + event = tf.Event( + wall_time=wall_time, + step=step, + summary=tf.Summary( + value=[tf.Summary.Value(tag=tag, tensor=tensor)])) + self.AddEvent(event) + + def AddEvent(self, event): + if self.zero_out_timestamps: + event.wall_time = 0 + self.items.append(event) + + def add_event(self, event): # pylint: disable=invalid-name + """Match the EventWriter API.""" + self.AddEvent(event) + + def get_logdir(self): # pylint: disable=invalid-name + """Return a temp directory for asset writing.""" + return self._testcase.get_temp_dir() + + +class EventAccumulatorTest(tf.test.TestCase): + + def assertTagsEqual(self, actual, expected): + """Utility method for checking the return value of the Tags() call. + + It fills out the `expected` arg with the default (empty) values for every + tag type, so that the author needs only specify the non-empty values they + are interested in testing. + + Args: + actual: The actual Accumulator tags response. + expected: The expected tags response (empty fields may be omitted) + """ + + empty_tags = { + ea.GRAPH: False, + ea.META_GRAPH: False, + ea.RUN_METADATA: [], + ea.TENSORS: [], + } + + # Verifies that there are no unexpected keys in the actual response. + # If this line fails, likely you added a new tag type, and need to update + # the empty_tags dictionary above. + self.assertItemsEqual(actual.keys(), empty_tags.keys()) + + for key in actual: + expected_value = expected.get(key, empty_tags[key]) + if isinstance(expected_value, list): + self.assertItemsEqual(actual[key], expected_value) + else: + self.assertEqual(actual[key], expected_value) + + +class MockingEventAccumulatorTest(EventAccumulatorTest): + + def setUp(self): + super(MockingEventAccumulatorTest, self).setUp() + self.stubs = tf.test.StubOutForTesting() + self._real_constructor = ea.EventAccumulator + self._real_generator = ea._GeneratorFromPath + + def _FakeAccumulatorConstructor(generator, *args, **kwargs): + ea._GeneratorFromPath = lambda x: generator + return self._real_constructor(generator, *args, **kwargs) + + ea.EventAccumulator = _FakeAccumulatorConstructor + + def tearDown(self): + self.stubs.CleanUp() + ea.EventAccumulator = self._real_constructor + ea._GeneratorFromPath = self._real_generator + + def testEmptyAccumulator(self): + gen = _EventGenerator(self) + x = ea.EventAccumulator(gen) + x.Reload() + self.assertTagsEqual(x.Tags(), {}) + + def testReload(self): + """EventAccumulator contains suitable tags after calling Reload.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + acc.Reload() + self.assertTagsEqual(acc.Tags(), {}) + gen.AddScalarTensor('s1', wall_time=1, step=10, value=50) + gen.AddScalarTensor('s2', wall_time=1, step=10, value=80) + acc.Reload() + self.assertTagsEqual(acc.Tags(), { + ea.TENSORS: ['s1', 's2'], + }) + + def testKeyError(self): + """KeyError should be raised when accessing non-existing keys.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + acc.Reload() + with self.assertRaises(KeyError): + acc.Tensors('s1') + + def testNonValueEvents(self): + """Non-value events in the generator don't cause early exits.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + gen.AddScalarTensor('s1', wall_time=1, step=10, value=20) + gen.AddEvent(tf.Event(wall_time=2, step=20, file_version='nots2')) + gen.AddScalarTensor('s3', wall_time=3, step=100, value=1) + + acc.Reload() + self.assertTagsEqual(acc.Tags(), { + ea.TENSORS: ['s1', 's3'], + }) + + def 
testExpiredDataDiscardedAfterRestartForFileVersionLessThan2(self):
+    """Tests that events are discarded after a restart is detected.
+
+    If a step value is observed to be lower than what was previously seen,
+    this should force a discard of all previous items with the same tag
+    that are outdated.
+
+    Only file versions < 2 use this out-of-order discard logic. Later versions
+    discard events based on the step value of SessionLog.START.
+    """
+    self.skipTest("TODO: Implement event discarding for tensor events.")
+    warnings = []
+    self.stubs.Set(tf.logging, 'warn', warnings.append)
+
+    gen = _EventGenerator(self)
+    acc = ea.EventAccumulator(gen)
+
+    gen.AddEvent(tf.Event(wall_time=0, step=0, file_version='brain.Event:1'))
+    gen.AddScalarTensor('s1', wall_time=1, step=100, value=20)
+    gen.AddScalarTensor('s1', wall_time=1, step=200, value=20)
+    gen.AddScalarTensor('s1', wall_time=1, step=300, value=20)
+    acc.Reload()
+    ## Check that number of items are what they should be
+    self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 200, 300])
+
+    gen.AddScalarTensor('s1', wall_time=1, step=101, value=20)
+    gen.AddScalarTensor('s1', wall_time=1, step=201, value=20)
+    gen.AddScalarTensor('s1', wall_time=1, step=301, value=20)
+    acc.Reload()
+    ## Check that we have discarded 200 and 300 from s1
+    self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 101, 201, 301])
+
+  def testOrphanedDataNotDiscardedIfFlagUnset(self):
+    """Tests that events are not discarded if purge_orphaned_data is false.
+    """
+    gen = _EventGenerator(self)
+    acc = ea.EventAccumulator(gen, purge_orphaned_data=False)
+
+    gen.AddEvent(tf.Event(wall_time=0, step=0, file_version='brain.Event:1'))
+    gen.AddScalarTensor('s1', wall_time=1, step=100, value=20)
+    gen.AddScalarTensor('s1', wall_time=1, step=200, value=20)
+    gen.AddScalarTensor('s1', wall_time=1, step=300, value=20)
+    acc.Reload()
+    ## Check that number of items are what they should be
+    self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 200, 300])
+
+    gen.AddScalarTensor('s1', wall_time=1, step=101, value=20)
+    gen.AddScalarTensor('s1', wall_time=1, step=201, value=20)
+    gen.AddScalarTensor('s1', wall_time=1, step=301, value=20)
+    acc.Reload()
+    ## Check that nothing was discarded, since purging is disabled
+    self.assertEqual([x.step for x in acc.Tensors('s1')],
+                     [100, 200, 300, 101, 201, 301])
+
+  def testEventsDiscardedPerTagAfterRestartForFileVersionLessThan2(self):
+    """Tests that event discards after restart only affect the misordered tag.
+
+    If a step value is observed to be lower than what was previously seen,
+    this should force a discard of all previous items that are outdated, but
+    only for the out-of-order tag. Other tags should remain unaffected.
+
+    Only file versions < 2 use this out-of-order discard logic. Later versions
+    discard events based on the step value of SessionLog.START.
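+
+    Concretely, given 's1' events at steps [100, 200, 300] followed by 's1'
+    events at steps [101, 201, 301], 's1' should end up with steps
+    [100, 101, 201, 301] while 's2' keeps all of its events.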
+ """ + self.skipTest("TODO: Implement event discarding for tensor events.") + warnings = [] + self.stubs.Set(tf.logging, 'warn', warnings.append) + + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + + gen.AddEvent(tf.Event(wall_time=0, step=0, file_version='brain.Event:1')) + gen.AddScalarTensor('s1', wall_time=1, step=100, value=20) + gen.AddScalarTensor('s1', wall_time=1, step=200, value=20) + gen.AddScalarTensor('s1', wall_time=1, step=300, value=20) + gen.AddScalarTensor('s1', wall_time=1, step=101, value=20) + gen.AddScalarTensor('s1', wall_time=1, step=201, value=20) + gen.AddScalarTensor('s1', wall_time=1, step=301, value=20) + + gen.AddScalarTensor('s2', wall_time=1, step=101, value=20) + gen.AddScalarTensor('s2', wall_time=1, step=201, value=20) + gen.AddScalarTensor('s2', wall_time=1, step=301, value=20) + + acc.Reload() + ## Check that we have discarded 200 and 300 + self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 101, 201, 301]) + + ## Check that s1 discards do not affect s2 + ## i.e. check that only events from the out of order tag are discarded + self.assertEqual([x.step for x in acc.Tensors('s2')], [101, 201, 301]) + + def testOnlySummaryEventsTriggerDiscards(self): + """Test that file version event does not trigger data purge.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + gen.AddScalarTensor('s1', wall_time=1, step=100, value=20) + ev1 = tf.Event(wall_time=2, step=0, file_version='brain.Event:1') + graph_bytes = tf.GraphDef().SerializeToString() + ev2 = tf.Event(wall_time=3, step=0, graph_def=graph_bytes) + gen.AddEvent(ev1) + gen.AddEvent(ev2) + acc.Reload() + self.assertEqual([x.step for x in acc.Tensors('s1')], [100]) + + def testSessionLogStartMessageDiscardsExpiredEvents(self): + """Test that SessionLog.START message discards expired events. + + This discard logic is preferred over the out-of-order step discard logic, + but this logic can only be used for event protos which have the SessionLog + enum, which was introduced to event.proto for file_version >= brain.Event:2. 
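+
+    In the sketch below, the START event at step 201 should discard the 's1'
+    events at steps 300 and 400 and all of the 's2' events.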
+ """ + self.skipTest("TODO: Implement event discarding for tensor events.") + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + gen.AddEvent(tf.Event(wall_time=0, step=1, file_version='brain.Event:2')) + + gen.AddScalarTensor('s1', wall_time=1, step=100, value=20) + gen.AddScalarTensor('s1', wall_time=1, step=200, value=20) + gen.AddScalarTensor('s1', wall_time=1, step=300, value=20) + gen.AddScalarTensor('s1', wall_time=1, step=400, value=20) + + gen.AddScalarTensor('s2', wall_time=1, step=202, value=20) + gen.AddScalarTensor('s2', wall_time=1, step=203, value=20) + + slog = tf.SessionLog(status=tf.SessionLog.START) + gen.AddEvent(tf.Event(wall_time=2, step=201, session_log=slog)) + acc.Reload() + self.assertEqual([x.step for x in acc.Tensors('s1')], [100, 200]) + self.assertEqual([x.step for x in acc.Tensors('s2')], []) + + def testFirstEventTimestamp(self): + """Test that FirstEventTimestamp() returns wall_time of the first event.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + gen.AddEvent(tf.Event(wall_time=10, step=20, file_version='brain.Event:2')) + gen.AddScalarTensor('s1', wall_time=30, step=40, value=20) + self.assertEqual(acc.FirstEventTimestamp(), 10) + + def testReloadPopulatesFirstEventTimestamp(self): + """Test that Reload() means FirstEventTimestamp() won't load events.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + gen.AddEvent(tf.Event(wall_time=1, step=2, file_version='brain.Event:2')) + + acc.Reload() + + def _Die(*args, **kwargs): # pylint: disable=unused-argument + raise RuntimeError('Load() should not be called') + + self.stubs.Set(gen, 'Load', _Die) + self.assertEqual(acc.FirstEventTimestamp(), 1) + + def testFirstEventTimestampLoadsEvent(self): + """Test that FirstEventTimestamp() doesn't discard the loaded event.""" + gen = _EventGenerator(self) + acc = ea.EventAccumulator(gen) + gen.AddEvent(tf.Event(wall_time=1, step=2, file_version='brain.Event:2')) + + self.assertEqual(acc.FirstEventTimestamp(), 1) + acc.Reload() + self.assertEqual(acc.file_version, 2.0) + + def testNewStyleScalarSummary(self): + """Verify processing of tensorboard.plugins.scalar.summary.""" + event_sink = _EventGenerator(self, zero_out_timestamps=True) + writer = tf.summary.FileWriter(self.get_temp_dir()) + writer.event_writer = event_sink + with self.test_session() as sess: + step = tf.placeholder(tf.float32, shape=[]) + scalar_summary.op('accuracy', 1.0 - 1.0 / (step + tf.constant(1.0))) + scalar_summary.op('xent', 1.0 / (step + tf.constant(1.0))) + merged = tf.summary.merge_all() + writer.add_graph(sess.graph) + for i in xrange(10): + summ = sess.run(merged, feed_dict={step: float(i)}) + writer.add_summary(summ, global_step=i) + + accumulator = ea.EventAccumulator(event_sink) + accumulator.Reload() + + tags = [ + u'accuracy/scalar_summary', + u'xent/scalar_summary', + ] + + self.assertTagsEqual(accumulator.Tags(), { + ea.TENSORS: tags, + ea.GRAPH: True, + ea.META_GRAPH: False, + }) + + def testNewStyleAudioSummary(self): + """Verify processing of tensorboard.plugins.audio.summary.""" + event_sink = _EventGenerator(self, zero_out_timestamps=True) + writer = tf.summary.FileWriter(self.get_temp_dir()) + writer.event_writer = event_sink + with self.test_session() as sess: + ipt = tf.random_normal(shape=[5, 441, 2]) + with tf.name_scope('1'): + audio_summary.op('one', ipt, sample_rate=44100, max_outputs=1) + with tf.name_scope('2'): + audio_summary.op('two', ipt, sample_rate=44100, max_outputs=2) + with tf.name_scope('3'): + 
audio_summary.op('three', ipt, sample_rate=44100, max_outputs=3) + merged = tf.summary.merge_all() + writer.add_graph(sess.graph) + for i in xrange(10): + summ = sess.run(merged) + writer.add_summary(summ, global_step=i) + + accumulator = ea.EventAccumulator(event_sink) + accumulator.Reload() + + tags = [ + u'1/one/audio_summary', + u'2/two/audio_summary', + u'3/three/audio_summary', + ] + + self.assertTagsEqual(accumulator.Tags(), { + ea.TENSORS: tags, + ea.GRAPH: True, + ea.META_GRAPH: False, + }) + + def testNewStyleImageSummary(self): + """Verify processing of tensorboard.plugins.image.summary.""" + event_sink = _EventGenerator(self, zero_out_timestamps=True) + writer = tf.summary.FileWriter(self.get_temp_dir()) + writer.event_writer = event_sink + with self.test_session() as sess: + ipt = tf.ones([10, 4, 4, 3], tf.uint8) + # This is an interesting example, because the old tf.image_summary op + # would throw an error here, because it would be tag reuse. + # Using the tf node name instead allows argument re-use to the image + # summary. + with tf.name_scope('1'): + image_summary.op('images', ipt, max_outputs=1) + with tf.name_scope('2'): + image_summary.op('images', ipt, max_outputs=2) + with tf.name_scope('3'): + image_summary.op('images', ipt, max_outputs=3) + merged = tf.summary.merge_all() + writer.add_graph(sess.graph) + for i in xrange(10): + summ = sess.run(merged) + writer.add_summary(summ, global_step=i) + + accumulator = ea.EventAccumulator(event_sink) + accumulator.Reload() + + tags = [ + u'1/images/image_summary', + u'2/images/image_summary', + u'3/images/image_summary', + ] + + self.assertTagsEqual(accumulator.Tags(), { + ea.TENSORS: tags, + ea.GRAPH: True, + ea.META_GRAPH: False, + }) + + def testTFSummaryTensor(self): + """Verify processing of tf.summary.tensor.""" + event_sink = _EventGenerator(self, zero_out_timestamps=True) + writer = tf.summary.FileWriter(self.get_temp_dir()) + writer.event_writer = event_sink + with self.test_session() as sess: + tf.summary.tensor_summary('scalar', tf.constant(1.0)) + tf.summary.tensor_summary('vector', tf.constant([1.0, 2.0, 3.0])) + tf.summary.tensor_summary('string', tf.constant(six.b('foobar'))) + merged = tf.summary.merge_all() + summ = sess.run(merged) + writer.add_summary(summ, 0) + + accumulator = ea.EventAccumulator(event_sink) + accumulator.Reload() + + self.assertTagsEqual(accumulator.Tags(), { + ea.TENSORS: ['scalar', 'vector', 'string'], + }) + + scalar_proto = accumulator.Tensors('scalar')[0].tensor_proto + scalar = tf.make_ndarray(scalar_proto) + vector_proto = accumulator.Tensors('vector')[0].tensor_proto + vector = tf.make_ndarray(vector_proto) + string_proto = accumulator.Tensors('string')[0].tensor_proto + string = tf.make_ndarray(string_proto) + + self.assertTrue(np.array_equal(scalar, 1.0)) + self.assertTrue(np.array_equal(vector, [1.0, 2.0, 3.0])) + self.assertTrue(np.array_equal(string, six.b('foobar'))) + + def _testTFSummaryTensor_SizeGuidance(self, + plugin_name, + tensor_size_guidance, + steps, + expected_count): + event_sink = _EventGenerator(self, zero_out_timestamps=True) + writer = tf.summary.FileWriter(self.get_temp_dir()) + writer.event_writer = event_sink + with self.test_session() as sess: + summary_metadata = tf.SummaryMetadata( + plugin_data=tf.SummaryMetadata.PluginData(plugin_name=plugin_name, + content='{}')) + tf.summary.tensor_summary('scalar', tf.constant(1.0), + summary_metadata=summary_metadata) + merged = tf.summary.merge_all() + for step in xrange(steps): + 
writer.add_summary(sess.run(merged), global_step=step) + + + accumulator = ea.EventAccumulator( + event_sink, tensor_size_guidance=tensor_size_guidance) + accumulator.Reload() + + tensors = accumulator.Tensors('scalar') + self.assertEqual(len(tensors), expected_count) + + def testTFSummaryTensor_SizeGuidance_DefaultToTensorGuidance(self): + self._testTFSummaryTensor_SizeGuidance( + plugin_name='jabberwocky', + tensor_size_guidance={}, + steps=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 1, + expected_count=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS]) + + def testTFSummaryTensor_SizeGuidance_UseSmallSingularPluginGuidance(self): + size = int(ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] / 2) + assert size < ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS], size + self._testTFSummaryTensor_SizeGuidance( + plugin_name='jabberwocky', + tensor_size_guidance={'jabberwocky': size}, + steps=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 1, + expected_count=size) + + def testTFSummaryTensor_SizeGuidance_UseLargeSingularPluginGuidance(self): + size = ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 5 + self._testTFSummaryTensor_SizeGuidance( + plugin_name='jabberwocky', + tensor_size_guidance={'jabberwocky': size}, + steps=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 10, + expected_count=size) + + def testTFSummaryTensor_SizeGuidance_IgnoreIrrelevantGuidances(self): + size_small = int(ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] / 3) + size_large = int(ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] / 2) + assert size_small < size_large < ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS], ( + size_small, size_large) + self._testTFSummaryTensor_SizeGuidance( + plugin_name='jabberwocky', + tensor_size_guidance={'jabberwocky': size_small, + 'wnoorejbpxl': size_large}, + steps=ea.DEFAULT_SIZE_GUIDANCE[ea.TENSORS] + 1, + expected_count=size_small) + + +class RealisticEventAccumulatorTest(EventAccumulatorTest): + + def testTensorsRealistically(self): + """Test accumulator by writing values and then reading them.""" + + def FakeScalarSummary(tag, value): + value = tf.Summary.Value(tag=tag, simple_value=value) + summary = tf.Summary(value=[value]) + return summary + + directory = os.path.join(self.get_temp_dir(), 'values_dir') + if tf.gfile.IsDirectory(directory): + tf.gfile.DeleteRecursively(directory) + tf.gfile.MkDir(directory) + + writer = tf.summary.FileWriter(directory, max_queue=100) + + with tf.Graph().as_default() as graph: + _ = tf.constant([2.0, 1.0]) + # Add a graph to the summary writer. + writer.add_graph(graph) + meta_graph_def = tf.train.export_meta_graph(graph_def=graph.as_graph_def( + add_shapes=True)) + writer.add_meta_graph(meta_graph_def) + + run_metadata = tf.RunMetadata() + device_stats = run_metadata.step_stats.dev_stats.add() + device_stats.device = 'test device' + writer.add_run_metadata(run_metadata, 'test run') + + # Write a bunch of events using the writer. 
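+    # Steps are spaced five apart, so the thirty events per tag land at
+    # steps 0, 5, ..., 145.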
+ for i in xrange(30): + summ_id = FakeScalarSummary('id', i) + summ_sq = FakeScalarSummary('sq', i * i) + writer.add_summary(summ_id, i * 5) + writer.add_summary(summ_sq, i * 5) + writer.flush() + + # Verify that we can load those events properly + acc = ea.EventAccumulator(directory) + acc.Reload() + self.assertTagsEqual(acc.Tags(), { + ea.TENSORS: ['id', 'sq'], + ea.GRAPH: True, + ea.META_GRAPH: True, + ea.RUN_METADATA: ['test run'], + }) + id_events = acc.Tensors('id') + sq_events = acc.Tensors('sq') + self.assertEqual(30, len(id_events)) + self.assertEqual(30, len(sq_events)) + for i in xrange(30): + self.assertEqual(i * 5, id_events[i].step) + self.assertEqual(i * 5, sq_events[i].step) + self.assertEqual(i, tf.make_ndarray(id_events[i].tensor_proto).item()) + self.assertEqual(i * i, tf.make_ndarray(sq_events[i].tensor_proto).item()) + + # Write a few more events to test incremental reloading + for i in xrange(30, 40): + summ_id = FakeScalarSummary('id', i) + summ_sq = FakeScalarSummary('sq', i * i) + writer.add_summary(summ_id, i * 5) + writer.add_summary(summ_sq, i * 5) + writer.flush() + + # Verify we can now see all of the data + acc.Reload() + id_events = acc.Tensors('id') + sq_events = acc.Tensors('sq') + self.assertEqual(40, len(id_events)) + self.assertEqual(40, len(sq_events)) + for i in xrange(40): + self.assertEqual(i * 5, id_events[i].step) + self.assertEqual(i * 5, sq_events[i].step) + self.assertEqual(i, tf.make_ndarray(id_events[i].tensor_proto).item()) + self.assertEqual(i * i, tf.make_ndarray(sq_events[i].tensor_proto).item()) + self.assertProtoEquals(graph.as_graph_def(add_shapes=True), acc.Graph()) + self.assertProtoEquals(meta_graph_def, acc.MetaGraph()) + + def testGraphFromMetaGraphBecomesAvailable(self): + """Test accumulator by writing values and then reading them.""" + + directory = os.path.join(self.get_temp_dir(), 'metagraph_test_values_dir') + if tf.gfile.IsDirectory(directory): + tf.gfile.DeleteRecursively(directory) + tf.gfile.MkDir(directory) + + writer = tf.summary.FileWriter(directory, max_queue=100) + + with tf.Graph().as_default() as graph: + _ = tf.constant([2.0, 1.0]) + # Add a graph to the summary writer. + meta_graph_def = tf.train.export_meta_graph(graph_def=graph.as_graph_def( + add_shapes=True)) + writer.add_meta_graph(meta_graph_def) + + writer.flush() + + # Verify that we can load those events properly + acc = ea.EventAccumulator(directory) + acc.Reload() + self.assertTagsEqual(acc.Tags(), { + ea.GRAPH: True, + ea.META_GRAPH: True, + }) + self.assertProtoEquals(graph.as_graph_def(add_shapes=True), acc.Graph()) + self.assertProtoEquals(meta_graph_def, acc.MetaGraph()) + + def _writeMetadata(self, logdir, summary_metadata, nonce=''): + """Write to disk a summary with the given metadata. 
+ + Arguments: + logdir: a string + summary_metadata: a `SummaryMetadata` protobuf object + nonce: optional; will be added to the end of the event file name + to guarantee that multiple calls to this function do not stomp the + same file + """ + + summary = tf.Summary() + summary.value.add( + tensor=tf.make_tensor_proto(['po', 'ta', 'to'], dtype=tf.string), + tag='you_are_it', + metadata=summary_metadata) + writer = tf.summary.FileWriter(logdir, filename_suffix=nonce) + writer.add_summary(summary.SerializeToString()) + writer.close() + + def testSummaryMetadata(self): + logdir = self.get_temp_dir() + summary_metadata = tf.SummaryMetadata( + display_name='current tagee', + summary_description='no', + plugin_data=tf.SummaryMetadata.PluginData(plugin_name='outlet')) + self._writeMetadata(logdir, summary_metadata) + acc = ea.EventAccumulator(logdir) + acc.Reload() + self.assertProtoEquals(summary_metadata, + acc.SummaryMetadata('you_are_it')) + + def testSummaryMetadata_FirstMetadataWins(self): + logdir = self.get_temp_dir() + summary_metadata_1 = tf.SummaryMetadata( + display_name='current tagee', + summary_description='no', + plugin_data=tf.SummaryMetadata.PluginData(plugin_name='outlet', + content='120v')) + self._writeMetadata(logdir, summary_metadata_1, nonce='1') + acc = ea.EventAccumulator(logdir) + acc.Reload() + summary_metadata_2 = tf.SummaryMetadata( + display_name='tagee of the future', + summary_description='definitely not', + plugin_data=tf.SummaryMetadata.PluginData(plugin_name='plug', + content='110v')) + self._writeMetadata(logdir, summary_metadata_2, nonce='2') + acc.Reload() + + self.assertProtoEquals(summary_metadata_1, + acc.SummaryMetadata('you_are_it')) + + def testPluginTagToContent_PluginsCannotJumpOnTheBandwagon(self): + # If there are multiple `SummaryMetadata` for a given tag, and the + # set of plugins in the `plugin_data` of second is different from + # that of the first, then the second set should be ignored. + logdir = self.get_temp_dir() + summary_metadata_1 = tf.SummaryMetadata( + display_name='current tagee', + summary_description='no', + plugin_data=tf.SummaryMetadata.PluginData(plugin_name='outlet', + content='120v')) + self._writeMetadata(logdir, summary_metadata_1, nonce='1') + acc = ea.EventAccumulator(logdir) + acc.Reload() + summary_metadata_2 = tf.SummaryMetadata( + display_name='tagee of the future', + summary_description='definitely not', + plugin_data=tf.SummaryMetadata.PluginData(plugin_name='plug', + content='110v')) + self._writeMetadata(logdir, summary_metadata_2, nonce='2') + acc.Reload() + + self.assertEqual(acc.PluginTagToContent('outlet'), + {'you_are_it': '120v'}) + with six.assertRaisesRegex(self, KeyError, 'plug'): + acc.PluginTagToContent('plug') + +if __name__ == '__main__': + tf.test.main() diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer.py b/tensorboard/backend/event_processing/plugin_event_multiplexer.py new file mode 100644 index 00000000000..d0082afe1ff --- /dev/null +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer.py @@ -0,0 +1,448 @@ +# Copyright 2015 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Provides an interface for working with multiple event files.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import threading + +import six +import tensorflow as tf + +from tensorboard.backend.event_processing import directory_watcher +from tensorboard.backend.event_processing import plugin_event_accumulator as event_accumulator # pylint: disable=line-too-long +from tensorboard.backend.event_processing import io_wrapper + + +class EventMultiplexer(object): + """An `EventMultiplexer` manages access to multiple `EventAccumulator`s. + + Each `EventAccumulator` is associated with a `run`, which is a self-contained + TensorFlow execution. The `EventMultiplexer` provides methods for extracting + information about events from multiple `run`s. + + Example usage for loading specific runs from files: + + ```python + x = EventMultiplexer({'run1': 'path/to/run1', 'run2': 'path/to/run2'}) + x.Reload() + ``` + + Example usage for loading a directory where each subdirectory is a run + + ```python + (eg:) /parent/directory/path/ + /parent/directory/path/run1/ + /parent/directory/path/run1/events.out.tfevents.1001 + /parent/directory/path/run1/events.out.tfevents.1002 + + /parent/directory/path/run2/ + /parent/directory/path/run2/events.out.tfevents.9232 + + /parent/directory/path/run3/ + /parent/directory/path/run3/events.out.tfevents.9232 + x = EventMultiplexer().AddRunsFromDirectory('/parent/directory/path') + (which is equivalent to:) + x = EventMultiplexer({'run1': '/parent/directory/path/run1', 'run2':...} + ``` + + If you would like to watch `/parent/directory/path`, wait for it to be created + (if necessary) and then periodically pick up new runs, use + `AutoloadingMultiplexer` + @@Tensors + """ + + def __init__(self, + run_path_map=None, + size_guidance=None, + tensor_size_guidance=None, + purge_orphaned_data=True): + """Constructor for the `EventMultiplexer`. + + Args: + run_path_map: Dict `{run: path}` which specifies the + name of a run, and the path to find the associated events. If it is + None, then the EventMultiplexer initializes without any runs. + size_guidance: A dictionary mapping from `tagType` to the number of items + to store for each tag of that type. See + `event_accumulator.EventAccumulator` for details. + tensor_size_guidance: A dictionary mapping from `plugin_name` to + the number of items to store for each tag of that type. See + `event_accumulator.EventAccumulator` for details. + purge_orphaned_data: Whether to discard any events that were "orphaned" by + a TensorFlow restart. 
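+
+    For example, `tensor_size_guidance={'scalars': 1000}` (assuming a plugin
+    registered under the name 'scalars') would keep up to 1000 tensor events
+    per tag for that plugin, while other plugins fall back to the `tensors`
+    entry of `size_guidance`.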
+    """
+    tf.logging.info('Event Multiplexer initializing.')
+    self._accumulators_mutex = threading.Lock()
+    self._accumulators = {}
+    self._paths = {}
+    self._reload_called = False
+    self._size_guidance = (size_guidance or
+                           event_accumulator.DEFAULT_SIZE_GUIDANCE)
+    self._tensor_size_guidance = tensor_size_guidance
+    self.purge_orphaned_data = purge_orphaned_data
+    if run_path_map is not None:
+      tf.logging.info('Event Multiplexer doing initialization load for %s',
+                      run_path_map)
+      for (run, path) in six.iteritems(run_path_map):
+        self.AddRun(path, run)
+    tf.logging.info('Event Multiplexer done initializing')
+
+  def AddRun(self, path, name=None):
+    """Add a run to the multiplexer.
+
+    If the name is not specified, it is the same as the path.
+
+    If a run by that name exists, and we are already watching the right path,
+    do nothing. If we are watching a different path, replace the event
+    accumulator.
+
+    If `Reload` has been called, it will `Reload` the newly created
+    accumulators.
+
+    Args:
+      path: Path to the event files (or event directory) for given run.
+      name: Name of the run to add. If not provided, is set to path.
+
+    Returns:
+      The `EventMultiplexer`.
+    """
+    name = name or path
+    accumulator = None
+    with self._accumulators_mutex:
+      if name not in self._accumulators or self._paths[name] != path:
+        if name in self._paths and self._paths[name] != path:
+          # TODO(@dandelionmane) - Make it impossible to overwrite an old path
+          # with a new path (just give the new path a distinct name)
+          tf.logging.warning('Conflict for name %s: old path %s, new path %s',
+                             name, self._paths[name], path)
+        tf.logging.info('Constructing EventAccumulator for %s', path)
+        accumulator = event_accumulator.EventAccumulator(
+            path,
+            size_guidance=self._size_guidance,
+            tensor_size_guidance=self._tensor_size_guidance,
+            purge_orphaned_data=self.purge_orphaned_data)
+        self._accumulators[name] = accumulator
+        self._paths[name] = path
+    if accumulator:
+      if self._reload_called:
+        accumulator.Reload()
+    return self
+
+  def AddRunsFromDirectory(self, path, name=None):
+    """Load runs from a directory; recursively walks subdirectories.
+
+    If path doesn't exist, no-op. This ensures that it is safe to call
+    `AddRunsFromDirectory` multiple times, even before the directory is made.
+
+    If path is a directory, load event files in the directory (if any exist)
+    and recursively call AddRunsFromDirectory on any subdirectories. This
+    means you can call AddRunsFromDirectory at the root of a tree of event
+    logs and TensorBoard will load them all.
+
+    If the `EventMultiplexer` is already loaded, this will cause
+    the newly created accumulators to `Reload()`.
+
+    Args:
+      path: A string path to a directory to load runs from.
+      name: Optionally, what name to apply to the runs. If name is provided
+        and the directory contains run subdirectories, the name of each subrun
+        is the concatenation of the parent name and the subdirectory name. If
+        name is provided and the directory contains event files, then a run
+        named "name" is added with the events from the path.
+
+    Raises:
+      ValueError: If the path exists and isn't a directory.
+
+    Returns:
+      The `EventMultiplexer`.
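+
+    For example, with event files under `/logs/train` and `/logs/eval`
+    (illustrative paths), `AddRunsFromDirectory('/logs', name='exp')` adds
+    runs named 'exp/train' and 'exp/eval'.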
+ """ + tf.logging.info('Starting AddRunsFromDirectory: %s', path) + for subdir in GetLogdirSubdirectories(path): + tf.logging.info('Adding events from directory %s', subdir) + rpath = os.path.relpath(subdir, path) + subname = os.path.join(name, rpath) if name else rpath + self.AddRun(subdir, name=subname) + tf.logging.info('Done with AddRunsFromDirectory: %s', path) + return self + + def Reload(self): + """Call `Reload` on every `EventAccumulator`.""" + tf.logging.info('Beginning EventMultiplexer.Reload()') + self._reload_called = True + # Build a list so we're safe even if the list of accumulators is modified + # even while we're reloading. + with self._accumulators_mutex: + items = list(self._accumulators.items()) + + names_to_delete = set() + for name, accumulator in items: + try: + accumulator.Reload() + except (OSError, IOError) as e: + tf.logging.error("Unable to reload accumulator '%s': %s", name, e) + except directory_watcher.DirectoryDeletedError: + names_to_delete.add(name) + + with self._accumulators_mutex: + for name in names_to_delete: + tf.logging.warning("Deleting accumulator '%s'", name) + del self._accumulators[name] + tf.logging.info('Finished with EventMultiplexer.Reload()') + return self + + def PluginAssets(self, plugin_name): + """Get index of runs and assets for a given plugin. + + Args: + plugin_name: Name of the plugin we are checking for. + + Returns: + A dictionary that maps from run_name to a list of plugin + assets for that run. + """ + with self._accumulators_mutex: + # To avoid nested locks, we construct a copy of the run-accumulator map + items = list(six.iteritems(self._accumulators)) + + return {run: accum.PluginAssets(plugin_name) for run, accum in items} + + def RetrievePluginAsset(self, run, plugin_name, asset_name): + """Return the contents for a specific plugin asset from a run. + + Args: + run: The string name of the run. + plugin_name: The string name of a plugin. + asset_name: The string name of an asset. + + Returns: + The string contents of the plugin asset. + + Raises: + KeyError: If the asset is not available. + """ + accumulator = self.GetAccumulator(run) + return accumulator.RetrievePluginAsset(plugin_name, asset_name) + + def FirstEventTimestamp(self, run): + """Return the timestamp of the first event of the given run. + + This may perform I/O if no events have been loaded yet for the run. + + Args: + run: A string name of the run for which the timestamp is retrieved. + + Returns: + The wall_time of the first event of the run, which will typically be + seconds since the epoch. + + Raises: + KeyError: If the run is not found. + ValueError: If the run has no events loaded and there are no events on + disk to load. + """ + accumulator = self.GetAccumulator(run) + return accumulator.FirstEventTimestamp() + + def Scalars(self, run, tag): + """Retrieve the scalar events associated with a run and tag. + + Args: + run: A string name of the run for which values are retrieved. + tag: A string name of the tag for which values are retrieved. + + Raises: + KeyError: If the run is not found, or the tag is not available for + the given run. + + Returns: + An array of `event_accumulator.ScalarEvents`. + """ + accumulator = self.GetAccumulator(run) + return accumulator.Scalars(tag) + + def Graph(self, run): + """Retrieve the graph associated with the provided run. + + Args: + run: A string name of a run to load the graph for. + + Raises: + KeyError: If the run is not found. + ValueError: If the run does not have an associated graph. 
+ + Returns: + The `GraphDef` protobuf data structure. + """ + accumulator = self.GetAccumulator(run) + return accumulator.Graph() + + def MetaGraph(self, run): + """Retrieve the metagraph associated with the provided run. + + Args: + run: A string name of a run to load the graph for. + + Raises: + KeyError: If the run is not found. + ValueError: If the run does not have an associated graph. + + Returns: + The `MetaGraphDef` protobuf data structure. + """ + accumulator = self.GetAccumulator(run) + return accumulator.MetaGraph() + + def RunMetadata(self, run, tag): + """Get the session.run() metadata associated with a TensorFlow run and tag. + + Args: + run: A string name of a TensorFlow run. + tag: A string name of the tag associated with a particular session.run(). + + Raises: + KeyError: If the run is not found, or the tag is not available for the + given run. + + Returns: + The metadata in the form of `RunMetadata` protobuf data structure. + """ + accumulator = self.GetAccumulator(run) + return accumulator.RunMetadata(tag) + + def Audio(self, run, tag): + """Retrieve the audio events associated with a run and tag. + + Args: + run: A string name of the run for which values are retrieved. + tag: A string name of the tag for which values are retrieved. + + Raises: + KeyError: If the run is not found, or the tag is not available for + the given run. + + Returns: + An array of `event_accumulator.AudioEvents`. + """ + accumulator = self.GetAccumulator(run) + return accumulator.Audio(tag) + + def Tensors(self, run, tag): + """Retrieve the tensor events associated with a run and tag. + + Args: + run: A string name of the run for which values are retrieved. + tag: A string name of the tag for which values are retrieved. + + Raises: + KeyError: If the run is not found, or the tag is not available for + the given run. + + Returns: + An array of `event_accumulator.TensorEvent`s. + """ + accumulator = self.GetAccumulator(run) + return accumulator.Tensors(tag) + + def PluginRunToTagToContent(self, plugin_name): + """Returns a 2-layer dictionary of the form {run: {tag: content}}. + + The `content` referred above is the content field of the PluginData proto + for the specified plugin within a Summary.Value proto. + + Args: + plugin_name: The name of the plugin for which to fetch content. + + Returns: + A dictionary of the form {run: {tag: content}}. + """ + mapping = {} + for run in self.Runs(): + try: + tag_to_content = self.GetAccumulator(run).PluginTagToContent( + plugin_name) + except KeyError: + # This run lacks content for the plugin. Try the next run. + continue + mapping[run] = tag_to_content + return mapping + + def SummaryMetadata(self, run, tag): + """Return the summary metadata for the given tag on the given run. + + Args: + run: A string name of the run for which summary metadata is to be + retrieved. + tag: A string name of the tag whose summary metadata is to be + retrieved. + + Raises: + KeyError: If the run is not found, or the tag is not available for + the given run. + + Returns: + A `tf.SummaryMetadata` protobuf. + """ + accumulator = self.GetAccumulator(run) + return accumulator.SummaryMetadata(tag) + + def Runs(self): + """Return all the run names in the `EventMultiplexer`. 
+ + Returns: + ``` + {runName: { scalarValues: [tagA, tagB, tagC], + graph: true, meta_graph: true}} + ``` + """ + with self._accumulators_mutex: + # To avoid nested locks, we construct a copy of the run-accumulator map + items = list(six.iteritems(self._accumulators)) + return {run_name: accumulator.Tags() for run_name, accumulator in items} + + def RunPaths(self): + """Returns a dict mapping run names to event file paths.""" + return self._paths + + def GetAccumulator(self, run): + """Returns EventAccumulator for a given run. + + Args: + run: String name of run. + + Returns: + An EventAccumulator object. + + Raises: + KeyError: If run does not exist. + """ + with self._accumulators_mutex: + return self._accumulators[run] + + +def GetLogdirSubdirectories(path): + """Returns subdirectories with event files on path.""" + if tf.gfile.Exists(path) and not tf.gfile.IsDirectory(path): + raise ValueError('GetLogdirSubdirectories: path exists and is not a ' + 'directory, %s' % path) + + # ListRecursively just yields nothing if the path doesn't exist. + return ( + subdir + for (subdir, files) in io_wrapper.ListRecursively(path) + if list(filter(event_accumulator.IsTensorFlowEventsFile, files)) + ) diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py new file mode 100644 index 00000000000..e622a9e887a --- /dev/null +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py @@ -0,0 +1,325 @@ +# Copyright 2015 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import os.path +import shutil + +import tensorflow as tf + +from tensorboard.backend.event_processing import plugin_event_accumulator as event_accumulator # pylint: disable=line-too-long +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long + + +def _AddEvents(path): + if not tf.gfile.IsDirectory(path): + tf.gfile.MakeDirs(path) + fpath = os.path.join(path, 'hypothetical.tfevents.out') + with tf.gfile.GFile(fpath, 'w') as f: + f.write('') + return fpath + + +def _CreateCleanDirectory(path): + if tf.gfile.IsDirectory(path): + tf.gfile.DeleteRecursively(path) + tf.gfile.MkDir(path) + + +class _FakeAccumulator(object): + + def __init__(self, path): + """Constructs a fake accumulator with some fake events. + + Args: + path: The path for the run that this accumulator is for. 
+    """
+    self._path = path
+    self.reload_called = False
+    self._plugin_to_tag_to_content = {
+        'baz_plugin': {
+            'foo': 'foo_content',
+            'bar': 'bar_content',
+        }
+    }
+
+  def Tags(self):
+    return {}
+
+  def FirstEventTimestamp(self):
+    return 0
+
+  def _TagHelper(self, tag_name, enum):
+    if tag_name not in self.Tags()[enum]:
+      raise KeyError
+    return ['%s/%s' % (self._path, tag_name)]
+
+  def Tensors(self, tag_name):
+    return self._TagHelper(tag_name, event_accumulator.TENSORS)
+
+  def PluginTagToContent(self, plugin_name):
+    # We prefix the run names with the path and '_' so that we can verify
+    # that the tags are associated with the correct runs.
+    return {
+        self._path + '_' + run: content_mapping
+        for (run, content_mapping
+            ) in self._plugin_to_tag_to_content[plugin_name].items()
+    }
+
+  def Reload(self):
+    self.reload_called = True
+
+
+def _GetFakeAccumulator(path,
+                        size_guidance=None,
+                        tensor_size_guidance=None,
+                        purge_orphaned_data=None):
+  del size_guidance, tensor_size_guidance, purge_orphaned_data  # Unused.
+  return _FakeAccumulator(path)
+
+
+class EventMultiplexerTest(tf.test.TestCase):
+
+  def setUp(self):
+    super(EventMultiplexerTest, self).setUp()
+    self.stubs = tf.test.StubOutForTesting()
+
+    self.stubs.Set(event_accumulator, 'EventAccumulator', _GetFakeAccumulator)
+
+  def tearDown(self):
+    self.stubs.CleanUp()
+
+  def testEmptyLoader(self):
+    """Tests empty EventMultiplexer creation."""
+    x = event_multiplexer.EventMultiplexer()
+    self.assertEqual(x.Runs(), {})
+
+  def testRunNamesRespected(self):
+    """Tests two EventAccumulators inserted/accessed in EventMultiplexer."""
+    x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
+    self.assertItemsEqual(sorted(x.Runs().keys()), ['run1', 'run2'])
+    self.assertEqual(x.GetAccumulator('run1')._path, 'path1')
+    self.assertEqual(x.GetAccumulator('run2')._path, 'path2')
+
+  def testReload(self):
+    """EventAccumulators should Reload after EventMultiplexer calls it."""
+    x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
+    self.assertFalse(x.GetAccumulator('run1').reload_called)
+    self.assertFalse(x.GetAccumulator('run2').reload_called)
+    x.Reload()
+    self.assertTrue(x.GetAccumulator('run1').reload_called)
+    self.assertTrue(x.GetAccumulator('run2').reload_called)
+
+  def testPluginRunToTagToContent(self):
+    """Tests the method that produces the run to tag to content mapping."""
+    x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
+    self.assertDictEqual({
+        'run1': {
+            'path1_foo': 'foo_content',
+            'path1_bar': 'bar_content',
+        },
+        'run2': {
+            'path2_foo': 'foo_content',
+            'path2_bar': 'bar_content',
+        }
+    }, x.PluginRunToTagToContent('baz_plugin'))
+
+  def testExceptions(self):
+    """KeyError should be raised when accessing nonexistent keys."""
+    x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
+    with self.assertRaises(KeyError):
+      x.Tensors('sv1', 'xxx')
+
+  def testInitialization(self):
+    """Tests that EventMultiplexer is created properly with its params."""
+    x = event_multiplexer.EventMultiplexer()
+    self.assertEqual(x.Runs(), {})
+    x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
+    self.assertItemsEqual(x.Runs(), ['run1', 'run2'])
+    self.assertEqual(x.GetAccumulator('run1')._path, 'path1')
+    self.assertEqual(x.GetAccumulator('run2')._path, 'path2')
+
+  def testAddRunsFromDirectory(self):
+    """Tests AddRunsFromDirectory function.
+
+    Tests the following scenarios:
+    - When the directory does not exist.
+    - When the directory is empty.
+    - When the directory has an empty subdirectory.
+    - That proper EventAccumulators exist after events are added.
+    """
+    x = event_multiplexer.EventMultiplexer()
+    tmpdir = self.get_temp_dir()
+    join = os.path.join
+    fakedir = join(tmpdir, 'fake_accumulator_directory')
+    realdir = join(tmpdir, 'real_accumulator_directory')
+    self.assertEqual(x.Runs(), {})
+    x.AddRunsFromDirectory(fakedir)
+    self.assertEqual(x.Runs(), {}, 'loading fakedir had no effect')
+
+    _CreateCleanDirectory(realdir)
+    x.AddRunsFromDirectory(realdir)
+    self.assertEqual(x.Runs(), {}, 'loading empty directory had no effect')
+
+    path1 = join(realdir, 'path1')
+    tf.gfile.MkDir(path1)
+    x.AddRunsFromDirectory(realdir)
+    self.assertEqual(x.Runs(), {}, 'creating empty subdirectory had no effect')
+
+    _AddEvents(path1)
+    x.AddRunsFromDirectory(realdir)
+    self.assertItemsEqual(x.Runs(), ['path1'], 'loaded run: path1')
+    loader1 = x.GetAccumulator('path1')
+    self.assertEqual(loader1._path, path1, 'has the correct path')
+
+    path2 = join(realdir, 'path2')
+    _AddEvents(path2)
+    x.AddRunsFromDirectory(realdir)
+    self.assertItemsEqual(x.Runs(), ['path1', 'path2'])
+    self.assertEqual(
+        x.GetAccumulator('path1'), loader1, 'loader1 not regenerated')
+
+    path2_2 = join(path2, 'path2')
+    _AddEvents(path2_2)
+    x.AddRunsFromDirectory(realdir)
+    self.assertItemsEqual(x.Runs(), ['path1', 'path2', 'path2/path2'])
+    self.assertEqual(
+        x.GetAccumulator('path2/path2')._path, path2_2, 'loader2 path correct')
+
+  def testAddRunsFromDirectoryThatContainsEvents(self):
+    x = event_multiplexer.EventMultiplexer()
+    tmpdir = self.get_temp_dir()
+    join = os.path.join
+    realdir = join(tmpdir, 'event_containing_directory')
+
+    _CreateCleanDirectory(realdir)
+
+    self.assertEqual(x.Runs(), {})
+
+    _AddEvents(realdir)
+    x.AddRunsFromDirectory(realdir)
+    self.assertItemsEqual(x.Runs(), ['.'])
+
+    subdir = join(realdir, 'subdir')
+    _AddEvents(subdir)
+    x.AddRunsFromDirectory(realdir)
+    self.assertItemsEqual(x.Runs(), ['.', 'subdir'])
+
+  def testAddRunsFromDirectoryWithRunNames(self):
+    x = event_multiplexer.EventMultiplexer()
+    tmpdir = self.get_temp_dir()
+    join = os.path.join
+    realdir = join(tmpdir, 'event_containing_directory')
+
+    _CreateCleanDirectory(realdir)
+
+    self.assertEqual(x.Runs(), {})
+
+    _AddEvents(realdir)
+    x.AddRunsFromDirectory(realdir, 'foo')
+    self.assertItemsEqual(x.Runs(), ['foo/.'])
+
+    subdir = join(realdir, 'subdir')
+    _AddEvents(subdir)
+    x.AddRunsFromDirectory(realdir, 'foo')
+    self.assertItemsEqual(x.Runs(), ['foo/.', 'foo/subdir'])
+
+  def testAddRunsFromDirectoryWalksTree(self):
+    x = event_multiplexer.EventMultiplexer()
+    tmpdir = self.get_temp_dir()
+    join = os.path.join
+    realdir = join(tmpdir, 'event_containing_directory')
+
+    _CreateCleanDirectory(realdir)
+    _AddEvents(realdir)
+    sub = join(realdir, 'subdirectory')
+    sub1 = join(sub, '1')
+    sub2 = join(sub, '2')
+    sub1_1 = join(sub1, '1')
+    _AddEvents(sub1)
+    _AddEvents(sub2)
+    _AddEvents(sub1_1)
+    x.AddRunsFromDirectory(realdir)
+
+    self.assertItemsEqual(x.Runs(), ['.', 'subdirectory/1', 'subdirectory/2',
+                                     'subdirectory/1/1'])
+
+  def testAddRunsFromDirectoryThrowsException(self):
+    x = event_multiplexer.EventMultiplexer()
+    tmpdir = self.get_temp_dir()
+
+    filepath = _AddEvents(tmpdir)
+    with self.assertRaises(ValueError):
+      x.AddRunsFromDirectory(filepath)
+
+  def testAddRun(self):
+    x = event_multiplexer.EventMultiplexer()
+    x.AddRun('run1_path', 'run1')
+    run1 = x.GetAccumulator('run1')
+    self.assertEqual(sorted(x.Runs().keys()),
['run1']) + self.assertEqual(run1._path, 'run1_path') + + x.AddRun('run1_path', 'run1') + self.assertEqual(run1, x.GetAccumulator('run1'), 'loader not recreated') + + x.AddRun('run2_path', 'run1') + new_run1 = x.GetAccumulator('run1') + self.assertEqual(new_run1._path, 'run2_path') + self.assertNotEqual(run1, new_run1) + + x.AddRun('runName3') + self.assertItemsEqual(sorted(x.Runs().keys()), ['run1', 'runName3']) + self.assertEqual(x.GetAccumulator('runName3')._path, 'runName3') + + def testAddRunMaintainsLoading(self): + x = event_multiplexer.EventMultiplexer() + x.Reload() + x.AddRun('run1') + x.AddRun('run2') + self.assertTrue(x.GetAccumulator('run1').reload_called) + self.assertTrue(x.GetAccumulator('run2').reload_called) + + +class EventMultiplexerWithRealAccumulatorTest(tf.test.TestCase): + + def testDeletingDirectoryRemovesRun(self): + x = event_multiplexer.EventMultiplexer() + tmpdir = self.get_temp_dir() + join = os.path.join + run1_dir = join(tmpdir, 'run1') + run2_dir = join(tmpdir, 'run2') + run3_dir = join(tmpdir, 'run3') + + for dirname in [run1_dir, run2_dir, run3_dir]: + _AddEvents(dirname) + + x.AddRun(run1_dir, 'run1') + x.AddRun(run2_dir, 'run2') + x.AddRun(run3_dir, 'run3') + + x.Reload() + + # Delete the directory, then reload. + shutil.rmtree(run2_dir) + x.Reload() + self.assertNotIn('run2', x.Runs().keys()) + + +if __name__ == '__main__': + tf.test.main() diff --git a/tensorboard/plugins/audio/audio_plugin_test.py b/tensorboard/plugins/audio/audio_plugin_test.py index ec356258b19..8bc5c13a830 100644 --- a/tensorboard/plugins/audio/audio_plugin_test.py +++ b/tensorboard/plugins/audio/audio_plugin_test.py @@ -32,7 +32,7 @@ from werkzeug import wrappers from tensorboard.backend import application -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.audio import summary from tensorboard.plugins.audio import audio_plugin diff --git a/tensorboard/plugins/core/core_plugin_test.py b/tensorboard/plugins/core/core_plugin_test.py index f59cc5edbc6..e5dc0951ab0 100644 --- a/tensorboard/plugins/core/core_plugin_test.py +++ b/tensorboard/plugins/core/core_plugin_test.py @@ -28,7 +28,7 @@ from werkzeug import wrappers from tensorboard.backend import application -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.core import core_plugin diff --git a/tensorboard/plugins/debugger/debugger_plugin_test.py b/tensorboard/plugins/debugger/debugger_plugin_test.py index 71ad1c77a7c..1aaa10516fe 100644 --- a/tensorboard/plugins/debugger/debugger_plugin_test.py +++ b/tensorboard/plugins/debugger/debugger_plugin_test.py @@ -23,7 +23,7 @@ import tensorflow as tf -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.debugger import debugger_plugin from tensorboard.plugins.debugger import debugger_plugin_testlib diff --git a/tensorboard/plugins/debugger/debugger_plugin_testlib.py b/tensorboard/plugins/debugger/debugger_plugin_testlib.py index 412ba14f096..802e112aa58 100644 
--- a/tensorboard/plugins/debugger/debugger_plugin_testlib.py
+++ b/tensorboard/plugins/debugger/debugger_plugin_testlib.py
@@ -30,7 +30,7 @@
 # pylint: disable=ungrouped-imports, wrong-import-order
 from google.protobuf import json_format
 from tensorboard.backend import application
-from tensorboard.backend.event_processing import event_multiplexer
+from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer  # pylint: disable=line-too-long
 from tensorboard.plugins import base_plugin
 from tensorboard.plugins.debugger import constants
 from tensorboard.plugins.debugger import debugger_plugin
diff --git a/tensorboard/plugins/debugger/events_writer_manager_test.py b/tensorboard/plugins/debugger/events_writer_manager_test.py
index ac04db0b7bf..91c48b4d60e 100644
--- a/tensorboard/plugins/debugger/events_writer_manager_test.py
+++ b/tensorboard/plugins/debugger/events_writer_manager_test.py
@@ -23,7 +23,7 @@
 import tensorflow as tf
 
-from tensorboard.backend.event_processing import event_multiplexer
+from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer  # pylint: disable=line-too-long
 from tensorboard.plugins import base_plugin
 from tensorboard.plugins.debugger import debugger_plugin
 from tensorboard.plugins.debugger import debugger_plugin_testlib
diff --git a/tensorboard/plugins/distribution/compressor.py b/tensorboard/plugins/distribution/compressor.py
index 343bf32459a..7288a7027c8 100644
--- a/tensorboard/plugins/distribution/compressor.py
+++ b/tensorboard/plugins/distribution/compressor.py
@@ -32,6 +32,57 @@
                                        ['basis_point', 'value'])
 
 
+# TODO(@jart): Unfork these methods.
+def compress_histogram_proto(histo, bps=NORMAL_HISTOGRAM_BPS):
+  """Creates a fixed-size histogram by compressing the accumulated state.
+
+  This routine transforms a histogram at a particular step by interpolating its
+  variable number of buckets to represent their cumulative weight at a constant
+  number of compression points. This significantly reduces the size of the
+  histogram and makes it suitable for a two-dimensional area plot where the
+  output of this routine constitutes the ranges for a single x coordinate.
+
+  Args:
+    histo: A HistogramProto object.
+    bps: Compression points represented in basis points, 1/100ths of a percent.
+        Defaults to points suited to a normal distribution.
+
+  Returns:
+    List of values for each basis point.
+  """
+  # See also: Histogram::Percentile() in core/lib/histogram/histogram.cc
+  if not histo.num:
+    return [CompressedHistogramValue(b, 0.0) for b in bps]
+  bucket = np.array(histo.bucket)
+  bucket_limit = list(histo.bucket_limit)
+  weights = (bucket * bps[-1] / (bucket.sum() or 1.0)).cumsum()
+  values = []
+  j = 0
+  while j < len(bps):
+    i = np.searchsorted(weights, bps[j], side='right')
+    while i < len(weights):
+      cumsum = weights[i]
+      cumsum_prev = weights[i - 1] if i > 0 else 0.0
+      if cumsum == cumsum_prev:  # prevent lerp divide by zero
+        i += 1
+        continue
+      if not i or not cumsum_prev:
+        lhs = histo.min
+      else:
+        lhs = max(bucket_limit[i - 1], histo.min)
+      rhs = min(bucket_limit[i], histo.max)
+      weight = _lerp(bps[j], cumsum_prev, cumsum, lhs, rhs)
+      values.append(CompressedHistogramValue(bps[j], weight))
+      j += 1
+      break
+    else:
+      break
+  while j < len(bps):
+    values.append(CompressedHistogramValue(bps[j], histo.max))
+    j += 1
+  return values
+
+
 def compress_histogram(buckets, bps=NORMAL_HISTOGRAM_BPS):
   """Creates fixed size histogram by adding compression to accumulated state.
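The restored `compress_histogram_proto` above is easiest to understand by running it: each basis point in `bps` is mapped, via linear interpolation over the buckets' cumulative weights, to an approximate percentile value. A minimal sketch, assuming `tf.HistogramProto` is importable as in TF 1.x; the bucket contents are fabricated:

```
import tensorflow as tf

from tensorboard.plugins.distribution import compressor

# A fabricated three-bucket histogram holding the values 0.5, 1.5, and 2.5.
histo = tf.HistogramProto(
    min=0.0, max=3.0, num=3, sum=4.5, sum_squares=8.75,
    bucket_limit=[1.0, 2.0, 3.0],
    bucket=[1, 1, 1])

# Each result pairs a basis point (1/100th of a percent) with the value
# interpolated at that cumulative weight.
for point in compressor.compress_histogram_proto(histo):
  print(point.basis_point, point.value)
```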
diff --git a/tensorboard/plugins/distribution/distributions_plugin_test.py b/tensorboard/plugins/distribution/distributions_plugin_test.py index 546723a27d1..e2b65aed948 100644 --- a/tensorboard/plugins/distribution/distributions_plugin_test.py +++ b/tensorboard/plugins/distribution/distributions_plugin_test.py @@ -26,8 +26,8 @@ from six.moves import xrange # pylint: disable=redefined-builtin import tensorflow as tf -from tensorboard.backend.event_processing import event_accumulator -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_accumulator as event_accumulator # pylint: disable=line-too-long +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.distribution import compressor from tensorboard.plugins.distribution import distributions_plugin diff --git a/tensorboard/plugins/graph/graphs_plugin.py b/tensorboard/plugins/graph/graphs_plugin.py index 24799998d01..95d9eba540a 100644 --- a/tensorboard/plugins/graph/graphs_plugin.py +++ b/tensorboard/plugins/graph/graphs_plugin.py @@ -22,7 +22,7 @@ from tensorboard.backend import http_util from tensorboard.backend import process_graph -from tensorboard.backend.event_processing import event_accumulator +from tensorboard.backend.event_processing import plugin_event_accumulator as event_accumulator # pylint: disable=line-too-long from tensorboard.plugins import base_plugin _PLUGIN_PREFIX_ROUTE = 'graphs' diff --git a/tensorboard/plugins/graph/graphs_plugin_test.py b/tensorboard/plugins/graph/graphs_plugin_test.py index 17b96b9efab..5851bdad458 100644 --- a/tensorboard/plugins/graph/graphs_plugin_test.py +++ b/tensorboard/plugins/graph/graphs_plugin_test.py @@ -26,7 +26,7 @@ import tensorflow as tf from google.protobuf import text_format -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.graph import graphs_plugin diff --git a/tensorboard/plugins/histogram/histograms_plugin_test.py b/tensorboard/plugins/histogram/histograms_plugin_test.py index c3d180a4760..d4deb7f72db 100644 --- a/tensorboard/plugins/histogram/histograms_plugin_test.py +++ b/tensorboard/plugins/histogram/histograms_plugin_test.py @@ -26,8 +26,8 @@ from six.moves import xrange # pylint: disable=redefined-builtin import tensorflow as tf -from tensorboard.backend.event_processing import event_accumulator -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_accumulator as event_accumulator # pylint: disable=line-too-long +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.histogram import histograms_plugin from tensorboard.plugins.histogram import summary diff --git a/tensorboard/plugins/image/images_plugin_test.py b/tensorboard/plugins/image/images_plugin_test.py index 9d25c2f4430..021542ce5a2 100644 --- a/tensorboard/plugins/image/images_plugin_test.py +++ b/tensorboard/plugins/image/images_plugin_test.py @@ -32,7 +32,7 @@ from werkzeug import wrappers from tensorboard.backend import application -from 
tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.image import summary from tensorboard.plugins.image import images_plugin diff --git a/tensorboard/plugins/pr_curve/summary_test.py b/tensorboard/plugins/pr_curve/summary_test.py index 551d63ab3bc..8d6315e6157 100644 --- a/tensorboard/plugins/pr_curve/summary_test.py +++ b/tensorboard/plugins/pr_curve/summary_test.py @@ -26,7 +26,7 @@ import tensorflow as tf from google.protobuf import json_format -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer from tensorboard.plugins.pr_curve import plugin_data_pb2 from tensorboard.plugins.pr_curve import pr_curve_demo from tensorboard.plugins.pr_curve import summary @@ -53,7 +53,7 @@ def setUp(self, *args, **kwargs): def validateTensorEvent(self, expected_step, expected_value, tensor_event): """Checks that the values stored within a tensor are correct. - + Args: expected_step: The expected step. tensor_event: A TensorEvent named tuple. @@ -208,7 +208,7 @@ def testExplicitWeights(self): [0.2222222, 0.3777778, 0.8333333, 1.0, 1.0], # Precision. [1.0, 0.34, 0.1, 0.02, 0.0], # Recall. ], tensor_events[1]) - + self.validateTensorEvent(2, [ [50.0, 18.0, 6.0, 1.0, 0.0], # True positives. [175.0, 27.0, 6.0, 0.0, 0.0], # False positives. @@ -229,7 +229,7 @@ def testExplicitWeights(self): [0.4444444, 0.5819672, 0.8275862, 0.5, 1.0], # Precision. [1.0, 0.71, 0.24, 0.02, 0.0], # Recall. ], tensor_events[0]) - + self.validateTensorEvent(1, [ [100.0, 63.0, 20.0, 5.0, 0.0], # True positives. [125.0, 42.0, 7.0, 1.0, 0.0], # False positives. @@ -260,7 +260,7 @@ def testExplicitWeights(self): [0.3333333, 0.3786982, 0.5384616, 1.0, 1.0], # Precision. [1.0, 0.8533334, 0.28, 0.0666667, 0.0], # Recall. ], tensor_events[0]) - + self.validateTensorEvent(1, [ [75.0, 62.0, 21.0, 1.0, 0.0], # True positives. [150.0, 99.0, 21.0, 3.0, 0.0], # False positives. @@ -269,7 +269,7 @@ def testExplicitWeights(self): [0.3333333, 0.3850932, 0.5, 0.25, 1.0], # Precision. [1.0, 0.8266667, 0.28, 0.0133333, 0.0], # Recall. ], tensor_events[1]) - + self.validateTensorEvent(2, [ [75.0, 61.0, 16.0, 2.0, 0.0], # True positives. [150.0, 92.0, 20.0, 1.0, 0.0], # False positives. 
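The pr_curve test above reads its summaries back through the multiplexer's tensor API; condensed, that round trip looks roughly like the sketch below, where the run name, logdir, and tag are hypothetical stand-ins:

```
import tensorflow as tf

from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer  # pylint: disable=line-too-long

multiplexer = event_multiplexer.EventMultiplexer({'run1': '/tmp/logdir/run1'})
multiplexer.Reload()

# Tensors() yields TensorEvent tuples of (wall_time, step, tensor_proto);
# tf.make_ndarray decodes the proto to a numpy array (older TF 1.x releases
# expose it as tf.contrib.util.make_ndarray instead).
for event in multiplexer.Tensors('run1', 'pr_curves/example'):
  print(event.step, tf.make_ndarray(event.tensor_proto))
```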
diff --git a/tensorboard/plugins/projector/projector_plugin_test.py b/tensorboard/plugins/projector/projector_plugin_test.py index f709ed1f859..dcc47a82e74 100644 --- a/tensorboard/plugins/projector/projector_plugin_test.py +++ b/tensorboard/plugins/projector/projector_plugin_test.py @@ -32,7 +32,7 @@ from google.protobuf import text_format from tensorboard.backend import application -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.projector import projector_config_pb2 from tensorboard.plugins.projector import projector_plugin diff --git a/tensorboard/plugins/scalar/scalars_plugin_test.py b/tensorboard/plugins/scalar/scalars_plugin_test.py index 6633b4ebbcd..4a46e4dd9d0 100644 --- a/tensorboard/plugins/scalar/scalars_plugin_test.py +++ b/tensorboard/plugins/scalar/scalars_plugin_test.py @@ -27,8 +27,8 @@ from six.moves import xrange # pylint: disable=redefined-builtin import tensorflow as tf -from tensorboard.backend.event_processing import event_accumulator -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_accumulator as event_accumulator # pylint: disable=line-too-long +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.scalar import scalars_plugin from tensorboard.plugins.scalar import summary diff --git a/tensorboard/plugins/text/text_plugin_test.py b/tensorboard/plugins/text/text_plugin_test.py index 49bb3a27249..ecdb1a03f09 100644 --- a/tensorboard/plugins/text/text_plugin_test.py +++ b/tensorboard/plugins/text/text_plugin_test.py @@ -27,7 +27,7 @@ import tensorflow as tf from tensorboard import plugin_util -from tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long from tensorboard.plugins import base_plugin from tensorboard.plugins.text import text_plugin
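Every import rewritten by this patch follows the same aliasing idiom: bind the new plugin_-prefixed module to the old short name so existing call sites keep working unchanged. A minimal sketch of driving the multiplexer through that alias, using only methods shown in this patch (the logdir path is hypothetical):

```
from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer  # pylint: disable=line-too-long

multiplexer = event_multiplexer.EventMultiplexer()
multiplexer.AddRunsFromDirectory('/tmp/logdir')  # hypothetical log directory
multiplexer.Reload()

# Runs() maps each run name to the tags its accumulator has loaded.
for run, tags in multiplexer.Runs().items():
  print(run, tags)
```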