Custom archive mode #627

Merged: 76 commits, Jun 5, 2021

Commits
fbca19a
local/remote switch for rsync
jkbecker Apr 2, 2021
a39f39a
fix local rsync dest
jkbecker Apr 2, 2021
fc7ea41
add notes to config.yaml
jkbecker Apr 2, 2021
f3281d6
Merge branch 'development' of github.com:ericaltendorf/plotman into a…
jkbecker Apr 2, 2021
ad86f13
Merge branch 'setup' into archivelocal
jkbecker Apr 4, 2021
715267b
Merge branch 'setup' of github.com:jkbecker/plotman into archivelocal
jkbecker Apr 5, 2021
6882e10
Merge branch 'development' of github.com:jkbecker/plotman into archiv…
jkbecker Apr 7, 2021
829661c
Update config.yaml
jkbecker Apr 7, 2021
aae3654
Update src/plotman/archive.py
jkbecker Apr 7, 2021
9f205df
Merge branch 'development' of github.com:ericaltendorf/plotman into a…
jkbecker Apr 28, 2021
c570f7c
Merge branch 'archivelocal' of github.com:jkbecker/plotman into archi…
jkbecker Apr 28, 2021
c7d8ce2
adapt config template
jkbecker Apr 28, 2021
68b683f
adapt error message
jkbecker Apr 28, 2021
9d5ae8a
doc fix
jkbecker Apr 28, 2021
2d917fa
whitespace, defaults
jkbecker Apr 28, 2021
43ead3d
Update src/plotman/configuration.py
jkbecker Apr 28, 2021
4aee539
Merge branch 'development' into archivelocal
altendky Apr 28, 2021
194dc6d
Update src/plotman/configuration.py
jkbecker Apr 29, 2021
c5d5ad6
Update src/plotman/archive.py
jkbecker Apr 29, 2021
59de0c6
Update src/plotman/archive.py
altendky Apr 29, 2021
7f15c43
Merge branch 'development' of github.com:ericaltendorf/plotman into a…
jkbecker May 8, 2021
11d06a7
new config file format supporting local archive mode
jkbecker May 8, 2021
6df447e
prepare for legacy vs extendable new configs
jkbecker May 8, 2021
b97c385
merging
jkbecker May 8, 2021
4c92cb1
better example value for local mode
jkbecker May 8, 2021
57ebfe5
some archive tests
jkbecker May 8, 2021
160b3f9
rsync_dest -> arch_dest refactoring
jkbecker May 8, 2021
fff7c22
Merge branch 'development' into archivelocal
altendky May 17, 2021
12a8214
custom archive
altendky May 19, 2021
c3737ee
less debuggy
altendky May 19, 2021
8978b4d
locally detects free space and transfers
altendky May 19, 2021
82b3684
rsync argument tweaks
altendky May 19, 2021
33e2760
skip malformed df-alike output
altendky May 21, 2021
239892a
Merge branch 'development' into custom_archive
altendky May 21, 2021
73fe689
switch to scripts
altendky May 22, 2021
7480d2d
correct default scripts
altendky May 22, 2021
a35959c
add a little config example
altendky May 22, 2021
28d434f
Merge branch 'development' into custom_archive
altendky May 22, 2021
5866683
just break archiving and make it all new
altendky May 22, 2021
141ba38
less path
altendky May 22, 2021
6e291bb
fix archive prefix identification
altendky May 23, 2021
3c8ead9
add preliminary explanation and examples in config
altendky May 23, 2021
0a1e96e
misc
altendky May 23, 2021
f24a372
fixup for archive subcommand as well
altendky May 23, 2021
68db1f2
links to wiki for configuration versions
altendky May 23, 2021
cc25e05
minor cleanup
altendky May 24, 2021
de73ed4
cleanup temp script files
altendky May 24, 2021
2519534
--preallocate --whole-file
altendky May 27, 2021
41d9033
correct indentation in example config
altendky May 27, 2021
7ad3405
explain disk space script output format
altendky May 27, 2021
d512f6b
lots of logging from the disk script
altendky May 27, 2021
8fa5f37
fixup
altendky May 27, 2021
816f430
slight refactor for reporting
altendky May 27, 2021
87cbdb8
less recalculation in get_running_archive_jobs()
altendky May 27, 2021
cf12e4c
hold multiple archival target definitions
altendky May 28, 2021
efbe6d0
add env with defaults and mandatories for target definitions
altendky May 28, 2021
22ce23f
add preset archiving target definitions
altendky May 28, 2021
4a4a4f3
log archive transfer activities
altendky May 30, 2021
d12ffd4
Merge branch 'development' into custom_archive
altendky May 30, 2021
50fcceb
Apply suggestions from code review
altendky May 30, 2021
c87f6e8
add set -evx to archival presets
altendky May 30, 2021
1132260
archive -> disk space in log message
altendky May 30, 2021
7ab1523
be agnostic to trailing /s on site_root (again)
altendky May 30, 2021
2d87e20
add logging (for disk space script output)
altendky May 30, 2021
2461304
allow integers as env values to avoid user confusion specifying ports
altendky May 31, 2021
4001446
Merge branch 'development' into custom_archive
altendky May 31, 2021
d7aca71
top level logging: and plots/transfers/application log path configura…
altendky May 31, 2021
33b3b97
tidy
altendky May 31, 2021
c070605
shift some IO up a layer
altendky May 31, 2021
77c5ec1
a couple more cfg.logging.plots
altendky May 31, 2021
ab0614b
Merge branch 'interactive-noauto-start' into custom_archive
altendky Jun 1, 2021
af155b2
Merge branch 'interactive-noauto-start' into custom_archive
altendky Jun 1, 2021
3abc9d2
Merge branch 'interactive-noauto-start' into custom_archive
altendky Jun 1, 2021
175aebc
Merge branch 'development' into custom_archive
altendky Jun 1, 2021
2e27644
handle some None outputs
altendky Jun 2, 2021
d680322
printf for mawk to avoid scientific notation etc
altendky Jun 3, 2021
21 changes: 1 addition & 20 deletions src/plotman/_tests/archive_test.py
@@ -1,25 +1,6 @@
from plotman import archive, configuration, job, manager
from plotman import archive, job


def test_compute_priority():
assert (archive.compute_priority( job.Phase(major=3, minor=1), 1000, 10) >
archive.compute_priority( job.Phase(major=3, minor=6), 1000, 10) )

def test_rsync_dest():
arch_dir = '/plotdir/012'
arch_cfg = configuration.Archive(
rsyncd_module='plots_mod',
rsyncd_path='/plotdir',
rsyncd_host='thehostname',
rsyncd_user='theusername',
rsyncd_bwlimit=80000
)

# Normal usage
assert ('rsync://theusername@thehostname:12000/plots_mod/012' ==
archive.rsync_dest(arch_cfg, arch_dir))

# Usage for constructing just the prefix, for scanning process tables
# for matching jobs.
assert ('rsync://theusername@thehostname:12000/' ==
archive.rsync_dest(arch_cfg, '/'))
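For reference, the `rsync_dest` helper exercised by the removed test built destination URLs roughly as follows. This is a minimal standalone sketch reconstructed from the deleted code in this diff (the hard-coded port 12000 matches the old implementation); the function signature is flattened for illustration:

```python
def rsync_dest(rsyncd_user, rsyncd_host, rsyncd_module, rsyncd_path, arch_dir):
    # Swap the rsyncd filesystem path for the rsyncd module name, then
    # build an rsync:// URL; port 12000 was hard-coded in the old code.
    rsync_path = arch_dir.replace(rsyncd_path, rsyncd_module)
    rsync_path = rsync_path.lstrip('/')  # avoid duplicate slashes
    return f'rsync://{rsyncd_user}@{rsyncd_host}:12000/{rsync_path}'

# Matches the removed test: full destination, and the bare prefix used
# when scanning process tables for matching jobs.
print(rsync_dest('theusername', 'thehostname', 'plots_mod', '/plotdir', '/plotdir/012'))
print(rsync_dest('theusername', 'thehostname', 'plots_mod', '/plotdir', '/'))
```

This PR replaces that fixed URL scheme with per-target transfer scripts, which is why the test is deleted rather than updated.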
27 changes: 17 additions & 10 deletions src/plotman/_tests/configuration_test.py
@@ -13,12 +13,19 @@ def config_text_fixture():
return importlib.resources.read_text(plotman_resources, "plotman.yaml")


def test_get_validated_configs__default(config_text):
@pytest.fixture(name='target_definitions_text')
def target_definitions_text_fixture():
return importlib.resources.read_text(
plotman_resources, "target_definitions.yaml",
)


def test_get_validated_configs__default(config_text, target_definitions_text):
"""Check that get_validated_configs() works with default/example plotman.yaml file."""
res = configuration.get_validated_configs(config_text, '')
res = configuration.get_validated_configs(config_text, '', target_definitions_text)
assert isinstance(res, configuration.PlotmanConfig)

def test_get_validated_configs__malformed(config_text):
def test_get_validated_configs__malformed(config_text, target_definitions_text):
"""Check that get_validated_configs() raises exception with invalid plotman.yaml contents."""
loaded_yaml = yaml.load(config_text, Loader=yaml.SafeLoader)

@@ -27,7 +34,7 @@ def test_get_validated_configs__malformed(config_text):
malformed_config_text = yaml.dump(loaded_yaml, Dumper=yaml.SafeDumper)

with pytest.raises(configuration.ConfigurationException) as exc_info:
configuration.get_validated_configs(malformed_config_text, '/the_path')
configuration.get_validated_configs(malformed_config_text, '/the_path', target_definitions_text)

assert exc_info.value.args[0] == f"Config file at: '/the_path' is malformed"

@@ -43,43 +50,43 @@ def test_get_validated_configs__missing():
)


def test_loads_without_user_interface(config_text):
def test_loads_without_user_interface(config_text, target_definitions_text):
loaded_yaml = yaml.load(config_text, Loader=yaml.SafeLoader)

del loaded_yaml["user_interface"]

stripped_config_text = yaml.dump(loaded_yaml, Dumper=yaml.SafeDumper)

reloaded_yaml = configuration.get_validated_configs(stripped_config_text, '')
reloaded_yaml = configuration.get_validated_configs(stripped_config_text, '', target_definitions_text)

assert reloaded_yaml.user_interface == configuration.UserInterface()


def test_get_dst_directories_gets_dst():
tmp = ['/tmp']
dst = ['/dst0', '/dst1']
directories = configuration.Directories(log='', tmp=tmp, dst=dst)
directories = configuration.Directories(tmp=tmp, dst=dst)

assert directories.get_dst_directories() == dst


def test_get_dst_directories_gets_tmp():
tmp = ['/tmp']
directories = configuration.Directories(log='', tmp=tmp)
directories = configuration.Directories(tmp=tmp)

assert directories.get_dst_directories() == tmp


def test_dst_is_dst():
tmp = ['/tmp']
dst = ['/dst0', '/dst1']
directories = configuration.Directories(log='', tmp=tmp, dst=dst)
directories = configuration.Directories(tmp=tmp, dst=dst)

assert not directories.dst_is_tmp()


def test_dst_is_tmp():
tmp = ['/tmp']
directories = configuration.Directories(log='', tmp=tmp)
directories = configuration.Directories(tmp=tmp)

assert directories.dst_is_tmp()
1 change: 0 additions & 1 deletion src/plotman/_tests/manager_test.py
@@ -20,7 +20,6 @@ def sched_cfg():
@pytest.fixture
def dir_cfg():
return configuration.Directories(
log="/plots/log",
tmp=["/var/tmp", "/tmp"],
dst=["/mnt/dst/00", "/mnt/dst/01", "/mnt/dst/03"],
tmp_overrides={"/mnt/tmp/04": configuration.TmpOverrides(tmpdir_max_jobs=4)}
175 changes: 121 additions & 54 deletions src/plotman/archive.py
@@ -1,5 +1,6 @@
import argparse
import contextlib
import logging
import math
import os
import posixpath
@@ -9,37 +10,68 @@
import sys
from datetime import datetime

import pendulum
import psutil
import texttable as tt

from plotman import job, manager, plot_util
from plotman import configuration, job, manager, plot_util


logger = logging.getLogger(__name__)

# TODO : write-protect and delete-protect archived plots

def spawn_archive_process(dir_cfg, all_jobs):
def spawn_archive_process(dir_cfg, arch_cfg, log_cfg, all_jobs):
'''Spawns a new archive process using the command created
in the archive() function. Returns archiving status and a log message to print.'''

log_message = None
log_messages = []
archiving_status = None

# Look for running archive jobs. Be robust to finding more than one
# even though the scheduler should only run one at a time.
arch_jobs = get_running_archive_jobs(dir_cfg.archive)
arch_jobs = get_running_archive_jobs(arch_cfg)

if not arch_jobs:
(should_start, status_or_cmd) = archive(dir_cfg, all_jobs)
(should_start, status_or_cmd, archive_log_messages) = archive(dir_cfg, arch_cfg, all_jobs)
log_messages.extend(archive_log_messages)
if not should_start:
archiving_status = status_or_cmd
else:
cmd = status_or_cmd
# TODO: do something useful with output instead of DEVNULL
p = subprocess.Popen(cmd,
args = status_or_cmd

log_file_path = log_cfg.create_transfer_log_path(time=pendulum.now())

log_messages.append(f'Starting archive: {args["args"]} ; logging to {log_file_path}')
# TODO: CAMPid 09840103109429840981397487498131
try:
open_log_file = open(log_file_path, 'x')
except FileExistsError:
log_messages.append(
f'Archiving log file already exists, skipping attempt to start a'
f' new archive transfer: {log_file_path!r}'
)
return (False, log_messages)
except FileNotFoundError as e:
message = (
f'Unable to open log file. Verify that the directory exists'
f' and has proper write permissions: {log_file_path!r}'
)
raise Exception(message) from e
Review comment on lines +55 to +60 (Contributor): It would be prudent to check it on startup rather than here. I added a snippet in another comment.

# Preferably, do not add any code between the try block above
# and the with block below. IOW, this space intentionally left
# blank... As is, this provides a good chance that our handle
# of the log file will get closed explicitly while still
# allowing handling of just the log file opening error.

with open_log_file:
# start_new_sessions to make the job independent of this controlling tty.
p = subprocess.Popen(**args,
shell=True,
stdout=subprocess.DEVNULL,
stdout=open_log_file,
stderr=subprocess.STDOUT,
start_new_session=True)
log_message = 'Starting archive: ' + cmd
# At least for now it seems that even if we get a new running
# archive jobs list it doesn't contain the new rsync process.
# My guess is that this is because the bash in the middle due to
@@ -51,7 +83,7 @@ def spawn_archive_process(dir_cfg, all_jobs):
if archiving_status is None:
archiving_status = 'pid: ' + ', '.join(map(str, arch_jobs))

return archiving_status, log_message
return archiving_status, log_messages

def compute_priority(phase, gb_free, n_plots):
# All these values are designed around dst buffer dirs of about
@@ -86,50 +118,82 @@ def compute_priority(phase, gb_free, n_plots):
return priority

def get_archdir_freebytes(arch_cfg):
log_messages = []
target = arch_cfg.target_definition()

archdir_freebytes = {}
df_cmd = ('ssh %s@%s df -aBK | grep " %s/"' %
(arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, posixpath.normpath(arch_cfg.rsyncd_path)) )
with subprocess.Popen(df_cmd, shell=True, stdout=subprocess.PIPE) as proc:
for line in proc.stdout.readlines():
fields = line.split()
if fields[3] == b'-':
# not actually mounted
timeout = 5
try:
completed_process = subprocess.run(
[target.disk_space_path],
env={**os.environ, **arch_cfg.environment()},
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
)
except subprocess.TimeoutExpired as e:
log_messages.append(f'Disk space check timed out in {timeout} seconds')
if e.stdout is None:
stdout = ''
else:
stdout = e.stdout.decode('utf-8', errors='ignore').strip()
if e.stderr is None:
stderr = ''
else:
stderr = e.stderr.decode('utf-8', errors='ignore').strip()
else:
stdout = completed_process.stdout.decode('utf-8', errors='ignore').strip()
stderr = completed_process.stderr.decode('utf-8', errors='ignore').strip()
for line in stdout.splitlines():
line = line.strip()
split = line.split(':')
if len(split) != 2:
log_messages.append(f'Unable to parse disk script line: {line!r}')
continue
freebytes = int(fields[3][:-1]) * 1024 # Strip the final 'K'
archdir = (fields[5]).decode('utf-8')
archdir_freebytes[archdir] = freebytes
return archdir_freebytes

def rsync_dest(arch_cfg, arch_dir):
rsync_path = arch_dir.replace(arch_cfg.rsyncd_path, arch_cfg.rsyncd_module)
if rsync_path.startswith('/'):
rsync_path = rsync_path[1:] # Avoid dup slashes. TODO use path join?
rsync_url = 'rsync://%s@%s:12000/%s' % (
arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, rsync_path)
return rsync_url
archdir, space = split
freebytes = int(space)
archdir_freebytes[archdir.strip()] = freebytes

for line in log_messages:
logger.info(line)

logger.info('stdout from disk space script:')
for line in stdout.splitlines():
logger.info(f' {line}')

logger.info('stderr from disk space script:')
for line in stderr.splitlines():
logger.info(f' {line}')

return archdir_freebytes, log_messages

# TODO: maybe consolidate with similar code in job.py?
def get_running_archive_jobs(arch_cfg):
'''Look for running rsync jobs that seem to match the pattern we use for archiving
them. Return a list of PIDs of matching jobs.'''
jobs = []
dest = rsync_dest(arch_cfg, '/')
for proc in psutil.process_iter(['pid', 'name']):
target = arch_cfg.target_definition()
variables = {**os.environ, **arch_cfg.environment()}
dest = target.transfer_process_argument_prefix.format(**variables)
proc_name = target.transfer_process_name.format(**variables)
for proc in psutil.process_iter():
with contextlib.suppress(psutil.NoSuchProcess):
if proc.name() == 'rsync':
args = proc.cmdline()
for arg in args:
if arg.startswith(dest):
jobs.append(proc.pid)
with proc.oneshot():
if proc.name() == proc_name:
args = proc.cmdline()
for arg in args:
if arg.startswith(dest):
jobs.append(proc.pid)
return jobs

def archive(dir_cfg, all_jobs):
def archive(dir_cfg, arch_cfg, all_jobs):
'''Configure one archive job. Needs to know all jobs so it can avoid IO
contention on the plotting dstdir drives. Returns either (False, <reason>)
if we should not execute an archive job or (True, <cmd>) with the archive
command if we should.'''
if dir_cfg.archive is None:
return (False, "No 'archive' settings declared in plotman.yaml")
log_messages = []
if arch_cfg is None:
return (False, "No 'archive' settings declared in plotman.yaml", log_messages)

dir2ph = manager.dstdirs_to_furthest_phase(all_jobs)
best_priority = -100000000
@@ -146,33 +210,36 @@ def archive(dir_cfg, all_jobs):
chosen_plot = dir_plots[0]

if not chosen_plot:
return (False, 'No plots found')
return (False, 'No plots found', log_messages)

# TODO: sanity check that archive machine is available
# TODO: filter drives mounted RO

#
# Pick first archive dir with sufficient space
#
archdir_freebytes = get_archdir_freebytes(dir_cfg.archive)
archdir_freebytes, freebytes_log_messages = get_archdir_freebytes(arch_cfg)
log_messages.extend(freebytes_log_messages)
if not archdir_freebytes:
return(False, 'No free archive dirs found.')
return(False, 'No free archive dirs found.', log_messages)

archdir = ''
available = [(d, space) for (d, space) in archdir_freebytes.items() if
space > 1.2 * plot_util.get_k32_plotsize()]
if len(available) > 0:
index = min(dir_cfg.archive.index, len(available) - 1)
index = min(arch_cfg.index, len(available) - 1)
(archdir, freespace) = sorted(available)[index]

if not archdir:
return(False, 'No archive directories found with enough free space')

msg = 'Found %s with ~%d GB free' % (archdir, freespace / plot_util.GB)

bwlimit = dir_cfg.archive.rsyncd_bwlimit
throttle_arg = ('--bwlimit=%d' % bwlimit) if bwlimit else ''
cmd = ('rsync %s --compress-level=0 --remove-source-files -P %s %s' %
(throttle_arg, chosen_plot, rsync_dest(dir_cfg.archive, archdir)))

return (True, cmd)
return(False, 'No archive directories found with enough free space', log_messages)

env = arch_cfg.environment(
source=chosen_plot,
destination=archdir,
)
subprocess_arguments = {
'args': arch_cfg.target_definition().transfer_path,
'env': {**os.environ, **env}
}

return (True, subprocess_arguments, log_messages)
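The rewritten `get_archdir_freebytes()` above expects the disk space script to print one `<archdir>:<free bytes>` pair per line, skipping anything it cannot parse. A minimal standalone sketch of that parsing contract (the function name and sample paths here are illustrative, not from the PR):

```python
def parse_disk_space_output(stdout):
    """Parse '<archdir>:<free bytes>' lines; skip malformed ones, as in the diff."""
    archdir_freebytes = {}
    log_messages = []
    for line in stdout.splitlines():
        line = line.strip()
        split = line.split(':')
        if len(split) != 2:
            # Mirrors the PR: malformed lines are logged and skipped,
            # not fatal, so partial df-alike output still works.
            log_messages.append(f'Unable to parse disk script line: {line!r}')
            continue
        archdir, space = split
        archdir_freebytes[archdir.strip()] = int(space)
    return archdir_freebytes, log_messages

sample = "/farm/archive/000:1099511627776\ngarbage\n/farm/archive/001:2199023255552\n"
dirs, errors = parse_disk_space_output(sample)
```

Note the later commit "printf for mawk to avoid scientific notation etc" exists because `int(space)` rejects values like `1.1e+12`, so scripts must emit plain integers.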