Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: add --detect_file_replacement flag and plumbing (redo) #5546

Merged
merged 4 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tensorboard/backend/event_processing/data_ingester.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, flags):
purge_orphaned_data=flags.purge_orphaned_data,
max_reload_threads=flags.max_reload_threads,
event_file_active_filter=_get_event_file_active_filter(flags),
detect_file_replacement=flags.detect_file_replacement,
)
self._data_provider = data_provider.MultiplexerDataProvider(
self._multiplexer, flags.logdir or flags.logdir_spec
Expand Down
2 changes: 2 additions & 0 deletions tensorboard/backend/event_processing/data_ingester_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
class FakeFlags(object):
def __init__(
self,
detect_file_replacement=None,
generic_data="auto",
logdir="",
logdir_spec="",
Expand All @@ -45,6 +46,7 @@ def __init__(
samples_per_plugin=None,
window_title="",
):
self.detect_file_replacement = detect_file_replacement
self.generic_data = generic_data
self.logdir = logdir
self.logdir_spec = logdir_spec
Expand Down
26 changes: 21 additions & 5 deletions tensorboard/backend/event_processing/plugin_event_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def __init__(
tensor_size_guidance=None,
purge_orphaned_data=True,
event_file_active_filter=None,
detect_file_replacement=None,
):
"""Construct the `EventAccumulator`.

Expand All @@ -122,6 +123,9 @@ def __init__(
event_file_active_filter: Optional predicate for determining whether an
event file latest load timestamp should be considered active. If passed,
this will enable multifile directory loading.
detect_file_replacement: Optional boolean; if True, event file loading
will try to detect when a file has been replaced with a new version
that contains additional data, by monitoring the file size.
"""
size_guidance = dict(size_guidance or DEFAULT_SIZE_GUIDANCE)
sizes = {}
Expand Down Expand Up @@ -155,7 +159,9 @@ def __init__(
self._plugin_tag_lock = threading.Lock()

self.path = path
self._generator = _GeneratorFromPath(path, event_file_active_filter)
self._generator = _GeneratorFromPath(
path, event_file_active_filter, detect_file_replacement
)
self._generator_mutex = threading.Lock()

self.purge_orphaned_data = purge_orphaned_data
Expand Down Expand Up @@ -639,23 +645,33 @@ def _GetPurgeMessage(
)


def _GeneratorFromPath(path, event_file_active_filter=None):
def _GeneratorFromPath(
path, event_file_active_filter=None, detect_file_replacement=None
):
"""Create an event generator for file or directory at given path string."""
if not path:
raise ValueError("path must be a valid string")
if io_wrapper.IsSummaryEventsFile(path):
return event_file_loader.EventFileLoader(path)
return event_file_loader.EventFileLoader(path, detect_file_replacement)
elif event_file_active_filter:
loader_factory = (
lambda path: event_file_loader.TimestampedEventFileLoader(
path, detect_file_replacement
)
)
return directory_loader.DirectoryLoader(
path,
event_file_loader.TimestampedEventFileLoader,
loader_factory,
path_filter=io_wrapper.IsSummaryEventsFile,
active_filter=event_file_active_filter,
)
else:
loader_factory = lambda path: event_file_loader.EventFileLoader(
path, detect_file_replacement
)
return directory_watcher.DirectoryWatcher(
path,
event_file_loader.EventFileLoader,
loader_factory,
io_wrapper.IsSummaryEventsFile,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(
purge_orphaned_data=True,
max_reload_threads=None,
event_file_active_filter=None,
detect_file_replacement=None,
):
"""Constructor for the `EventMultiplexer`.

Expand All @@ -98,6 +99,9 @@ def __init__(
event_file_active_filter: Optional predicate for determining whether an
event file latest load timestamp should be considered active. If passed,
this will enable multifile directory loading.
detect_file_replacement: Optional boolean; if True, event file loading
will try to detect when a file has been replaced with a new version
that contains additional data, by monitoring the file size.
"""
logger.info("Event Multiplexer initializing.")
self._accumulators_mutex = threading.Lock()
Expand All @@ -111,6 +115,7 @@ def __init__(
self.purge_orphaned_data = purge_orphaned_data
self._max_reload_threads = max_reload_threads or 1
self._event_file_active_filter = event_file_active_filter
self._detect_file_replacement = detect_file_replacement
if run_path_map is not None:
logger.info(
"Event Multiplexer doing initialization load for %s",
Expand Down Expand Up @@ -159,6 +164,7 @@ def AddRun(self, path, name=None):
tensor_size_guidance=self._tensor_size_guidance,
purge_orphaned_data=self.purge_orphaned_data,
event_file_active_filter=self._event_file_active_filter,
detect_file_replacement=self._detect_file_replacement,
)
self._accumulators[name] = accumulator
self._paths[name] = path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,9 @@ def Reload(self):

def _GetFakeAccumulator(
    path,
    **unused_kwargs,
):
    """Return a `_FakeAccumulator` for `path`, ignoring all other kwargs.

    Accepting (and discarding) arbitrary keyword arguments keeps this fake
    compatible with the real accumulator factory signature, so new keyword
    arguments added upstream don't break these tests.
    """
    del unused_kwargs  # Unused.
    return _FakeAccumulator(path)


Expand Down
27 changes: 27 additions & 0 deletions tensorboard/plugins/core/core_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,26 @@ def define_flags(self, parser):
means keep all samples of that type. For instance "scalars=500,images=0"
keeps 500 scalars and all images. Most users should not need to set this
flag.\
""",
)

parser.add_argument(
"--detect_file_replacement",
metavar="BOOL",
# Custom str-to-bool converter since regular bool() doesn't work.
type=lambda v: {"true": True, "false": False}.get(v.lower(), v),
choices=[True, False],
default=None,
help="""\
[experimental] If true, this enables experimental support for detecting when
event files are replaced with new versions that contain additional data. This is
not needed in the normal case where new data is either appended to an existing
file or written to a brand new file, but it arises, for example, when using
rsync without the --inplace option, in which new versions of the original file
are first written to a temporary file, then swapped into the final location.

This option is currently incompatible with --load_fast=true, and if passed will
disable fast-loading mode. (default: false)\
""",
)

Expand Down Expand Up @@ -683,6 +703,13 @@ def fix_flags(self, flags):
)
elif flags.host is not None and flags.bind_all:
raise FlagsError("Must not specify both --host and --bind_all.")
elif (
flags.load_fast == "true" and flags.detect_file_replacement is True
):
raise FlagsError(
"Must not specify both --load_fast=true and "
"--detect_file_replacement=true"
)

flags.path_prefix = flags.path_prefix.rstrip("/")
if flags.path_prefix and not flags.path_prefix.startswith("/"):
Expand Down
2 changes: 2 additions & 0 deletions tensorboard/plugins/core/core_plugin_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(
grpc_data_provider="",
host=None,
inspect=False,
load_fast="auto",
logdir="",
logdir_spec="",
path_prefix="",
Expand All @@ -66,6 +67,7 @@ def __init__(
self.grpc_data_provider = grpc_data_provider
self.host = host
self.inspect = inspect
self.load_fast = load_fast
self.logdir = logdir
self.logdir_spec = logdir_spec
self.path_prefix = path_prefix
Expand Down
7 changes: 7 additions & 0 deletions tensorboard/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,13 @@ def _should_use_data_server(flags):
"paths; falling back to slower Python-only load path."
)
return False
if flags.detect_file_replacement is True:
logger.info(
"Note: --detect_file_replacement=true is not supported with "
"--load_fast behavior; falling back to slower Python-only load "
"path."
)
return False
return True


Expand Down
2 changes: 2 additions & 0 deletions tensorboard/program_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def test_should_use_data_server(self):
def f(**kwargs):
kwargs.setdefault("logdir", "")
kwargs.setdefault("logdir_spec", "")
kwargs.setdefault("detect_file_replacement", None)
flags = argparse.Namespace()
for k, v in kwargs.items():
setattr(flags, k, v)
Expand All @@ -96,6 +97,7 @@ def f(**kwargs):
self.assertTrue(f(logdir="logs/mnist/"))
self.assertTrue(f(logdir="gs://logs"))
self.assertFalse(f(logdir="notgs://logs"))
self.assertFalse(f(logdir="foo", detect_file_replacement=True))


class WerkzeugServerTest(tb_test.TestCase):
Expand Down