diff --git a/src/plotman/_tests/archive_test.py b/src/plotman/_tests/archive_test.py index 4b98ac90..e30116b6 100755 --- a/src/plotman/_tests/archive_test.py +++ b/src/plotman/_tests/archive_test.py @@ -1,13 +1,13 @@ from plotman import archive, configuration, job, manager +import pytest def test_compute_priority(): assert (archive.compute_priority( job.Phase(major=3, minor=1), 1000, 10) > archive.compute_priority( job.Phase(major=3, minor=6), 1000, 10) ) -def test_rsync_dest(): - arch_dir = '/plotdir/012' - arch_cfg = configuration.Archive( +def _archive_legacy(): + return configuration.Archive( rsyncd_module='plots_mod', rsyncd_path='/plotdir', rsyncd_host='thehostname', @@ -15,11 +15,74 @@ def test_rsync_dest(): rsyncd_bwlimit=80000 ) +def test_arch_dest(): + arch_dir = '/plotdir/012' + arch_cfg = _archive_legacy() + # Normal usage assert ('rsync://theusername@thehostname:12000/plots_mod/012' == - archive.rsync_dest(arch_cfg, arch_dir)) + archive.arch_dest(arch_cfg, arch_dir)) # Usage for constructing just the prefix, for scanning process tables # for matching jobs. assert ('rsync://theusername@thehostname:12000/' == - archive.rsync_dest(arch_cfg, '/')) + archive.arch_dest(arch_cfg, '/')) + + +def test_archive_legacy_default(): + arch_cfg = _archive_legacy() + assert arch_cfg.mode == 'legacy' + +def _archive_badmode(): + return configuration.Archive( + rsyncd_module='plots_mod', + rsyncd_path='/plotdir', + rsyncd_host='thehostname', + rsyncd_user='theusername', + rsyncd_bwlimit=80000, + mode='thismodedoesntexist' + ) + +def test_archive_bad_mode(): + arch_cfg = _archive_badmode() + assert arch_cfg.mode == 'thismodedoesntexist' + + +def test_archive_bad_mode_load(): + arch_cfg = _archive_badmode() + with pytest.raises(AttributeError): + getattr(arch_cfg, arch_cfg.mode) + + +def _archive_emptymode(): + return configuration.Archive( + rsyncd_module='plots_mod', + rsyncd_path='/plotdir', + rsyncd_host='thehostname', + rsyncd_user='theusername', + rsyncd_bwlimit=80000, + mode='local' + ) + +def test_archive_local_mode_absent(): + arch_cfg = _archive_emptymode() + arch_cfg_local = getattr(arch_cfg, arch_cfg.mode) + assert not arch_cfg_local + +def _archive_localmode(): + return configuration.Archive( + rsyncd_module='plots_mod', + rsyncd_path='/plotdir', + rsyncd_host='thehostname', + rsyncd_user='theusername', + rsyncd_bwlimit=80000, + mode='local', + local=configuration.ArchiveLocal( + path='/farm' + ) + ) + +def test_archive_local_mode_load(): + arch_cfg = _archive_localmode() + arch_cfg_local = getattr(arch_cfg, arch_cfg.mode) + assert isinstance(arch_cfg_local, configuration.ArchiveLocal) diff --git a/src/plotman/archive.py b/src/plotman/archive.py index 641ae1d1..3660d5e6 100644 --- a/src/plotman/archive.py +++ b/src/plotman/archive.py @@ -87,8 +87,12 @@ def compute_priority(phase, gb_free, n_plots): def get_archdir_freebytes(arch_cfg): archdir_freebytes = {} - df_cmd = ('ssh %s@%s df -aBK | grep " %s/"' % - (arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, posixpath.normpath(arch_cfg.rsyncd_path)) ) + if arch_cfg.mode == 'legacy': + df_cmd = ('ssh %s@%s df -aBK | grep " %s/"' % + (arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, posixpath.normpath(arch_cfg.rsyncd_path)) ) + else: + arch_cfg_custom = getattr(arch_cfg, arch_cfg.mode) + df_cmd = (arch_cfg_custom.df_cmd.format(arch_cfg_custom.path)) with subprocess.Popen(df_cmd, shell=True, stdout=subprocess.PIPE) as proc: for line in proc.stdout.readlines(): fields = line.split() @@ -100,23 +104,30 @@ def get_archdir_freebytes(arch_cfg): archdir_freebytes[archdir] = freebytes return archdir_freebytes -def rsync_dest(arch_cfg, arch_dir): - rsync_path = arch_dir.replace(arch_cfg.rsyncd_path, arch_cfg.rsyncd_module) - if rsync_path.startswith('/'): - rsync_path = rsync_path[1:] # Avoid dup slashes. TODO use path join? - rsync_url = 'rsync://%s@%s:12000/%s' % ( - arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, rsync_path) - return rsync_url +def arch_dest(arch_cfg, arch_dir): + if arch_cfg.mode == 'legacy': + rsync_path = arch_dir.replace(arch_cfg.rsyncd_path, arch_cfg.rsyncd_module) + if rsync_path.startswith('/'): + rsync_path = rsync_path[1:] # Avoid dup slashes. TODO use path join? + return 'rsync://%s@%s:12000/%s' % ( + arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, rsync_path) + else: + arch_cfg_custom = getattr(arch_cfg, arch_cfg.mode) + return arch_cfg_custom.path # TODO: maybe consolidate with similar code in job.py? def get_running_archive_jobs(arch_cfg): '''Look for running rsync jobs that seem to match the pattern we use for archiving them. Return a list of PIDs of matching jobs.''' jobs = [] - dest = rsync_dest(arch_cfg, '/') + dest = arch_dest(arch_cfg, '/') for proc in psutil.process_iter(['pid', 'name']): with contextlib.suppress(psutil.NoSuchProcess): - if proc.name() == 'rsync': + if arch_cfg.mode == 'legacy': + proc_name = 'rsync' + else: + proc_name = getattr(arch_cfg, arch_cfg.mode).archive_tool + if proc.name() == proc_name: args = proc.cmdline() for arg in args: if arg.startswith(dest): @@ -169,10 +180,17 @@ def archive(dir_cfg, all_jobs): return(False, 'No archive directories found with enough free space') msg = 'Found %s with ~%d GB free' % (archdir, freespace / plot_util.GB) - - bwlimit = dir_cfg.archive.rsyncd_bwlimit - throttle_arg = ('--bwlimit=%d' % bwlimit) if bwlimit else '' - cmd = ('rsync %s --compress-level=0 --remove-source-files -P %s %s' % - (throttle_arg, chosen_plot, rsync_dest(dir_cfg.archive, archdir))) - + if dir_cfg.archive.mode == 'legacy': + bwlimit = dir_cfg.archive.rsyncd_bwlimit + throttle_arg = ('--bwlimit=%d' % bwlimit) if bwlimit else '' + cmd = ('rsync %s --compress-level=0 --remove-source-files -P %s %s' % + (throttle_arg, chosen_plot, arch_dest(dir_cfg.archive, archdir))) + else: + arch_cfg_custom = getattr(dir_cfg.archive, dir_cfg.archive.mode) + cmd = arch_cfg_custom.archive_cmd.format( + arch_cfg_custom.archive_tool, + arch_cfg_custom.parameters, + chosen_plot, + arch_dest(dir_cfg.archive, archdir) + ) return (True, cmd) diff --git a/src/plotman/configuration.py b/src/plotman/configuration.py index f081e75d..f6b15fae 100644 --- a/src/plotman/configuration.py +++ b/src/plotman/configuration.py @@ -48,6 +48,14 @@ def get_validated_configs(config_text, config_path): # Data models used to deserializing/formatting plotman.yaml files. +@attr.frozen +class ArchiveLocal: + path: str + df_cmd: str = 'df -BK | grep " {}/"' + archive_tool: str = 'rsync' + archive_cmd: str = '{} {} {} {}' + parameters: str = '--bwlimit=80000 --no-compress --remove-source-files -P' + @attr.frozen class Archive: rsyncd_module: str @@ -56,6 +64,13 @@ class Archive: rsyncd_host: str rsyncd_user: str index: int = 0 # If not explicit, "index" will default to 0 + mode: str = desert.ib( + default='legacy', + marshmallow_field=marshmallow.fields.String( + validate=marshmallow.validate.OneOf(choices=['legacy', 'local']) + ), + ) + local: Optional[ArchiveLocal] = None @attr.frozen class TmpOverrides: diff --git a/src/plotman/resources/plotman.yaml b/src/plotman/resources/plotman.yaml index 060989b8..8db6bfb1 100644 --- a/src/plotman/resources/plotman.yaml +++ b/src/plotman/resources/plotman.yaml @@ -83,7 +83,13 @@ directories: # have four plotters, you could set this to 0, 1, 2, and 3, on # the 4 machines, or 0, 1, 0, 1. # index: 0 - + # Optional switch to enable local archiving (defaults to remote if absent). + # Note: rsyncd_module, rsyncd_host and rsyncd_user are ignored in local mode. + mode: legacy # legacy (config above) or local + # + # Local mode: + # local: + # path: '/farm' # Plotting scheduling parameters scheduling: