Merge pull request #627 from altendky/custom_archive
altendky authored Jun 5, 2021
2 parents ec8243d + d680322 commit ec01e62
Showing 11 changed files with 593 additions and 270 deletions.
21 changes: 1 addition & 20 deletions src/plotman/_tests/archive_test.py
@@ -1,25 +1,6 @@
-from plotman import archive, configuration, job, manager
+from plotman import archive, job


 def test_compute_priority():
     assert (archive.compute_priority( job.Phase(major=3, minor=1), 1000, 10) >
             archive.compute_priority( job.Phase(major=3, minor=6), 1000, 10) )
-
-def test_rsync_dest():
-    arch_dir = '/plotdir/012'
-    arch_cfg = configuration.Archive(
-        rsyncd_module='plots_mod',
-        rsyncd_path='/plotdir',
-        rsyncd_host='thehostname',
-        rsyncd_user='theusername',
-        rsyncd_bwlimit=80000
-    )
-
-    # Normal usage
-    assert ('rsync://theusername@thehostname:12000/plots_mod/012' ==
-            archive.rsync_dest(arch_cfg, arch_dir))
-
-    # Usage for constructing just the prefix, for scanning process tables
-    # for matching jobs.
-    assert ('rsync://theusername@thehostname:12000/' ==
-            archive.rsync_dest(arch_cfg, '/'))
27 changes: 17 additions & 10 deletions src/plotman/_tests/configuration_test.py
@@ -13,12 +13,19 @@ def config_text_fixture():
     return importlib.resources.read_text(plotman_resources, "plotman.yaml")


-def test_get_validated_configs__default(config_text):
+@pytest.fixture(name='target_definitions_text')
+def target_definitions_text_fixture():
+    return importlib.resources.read_text(
+        plotman_resources, "target_definitions.yaml",
+    )
+
+
+def test_get_validated_configs__default(config_text, target_definitions_text):
     """Check that get_validated_configs() works with default/example plotman.yaml file."""
-    res = configuration.get_validated_configs(config_text, '')
+    res = configuration.get_validated_configs(config_text, '', target_definitions_text)
     assert isinstance(res, configuration.PlotmanConfig)

-def test_get_validated_configs__malformed(config_text):
+def test_get_validated_configs__malformed(config_text, target_definitions_text):
     """Check that get_validated_configs() raises exception with invalid plotman.yaml contents."""
     loaded_yaml = yaml.load(config_text, Loader=yaml.SafeLoader)

@@ -27,7 +34,7 @@ def test_get_validated_configs__malformed(config_text):
     malformed_config_text = yaml.dump(loaded_yaml, Dumper=yaml.SafeDumper)

     with pytest.raises(configuration.ConfigurationException) as exc_info:
-        configuration.get_validated_configs(malformed_config_text, '/the_path')
+        configuration.get_validated_configs(malformed_config_text, '/the_path', target_definitions_text)

     assert exc_info.value.args[0] == f"Config file at: '/the_path' is malformed"

@@ -43,43 +50,43 @@ def test_get_validated_configs__missing():
     )


-def test_loads_without_user_interface(config_text):
+def test_loads_without_user_interface(config_text, target_definitions_text):
     loaded_yaml = yaml.load(config_text, Loader=yaml.SafeLoader)

     del loaded_yaml["user_interface"]

     stripped_config_text = yaml.dump(loaded_yaml, Dumper=yaml.SafeDumper)

-    reloaded_yaml = configuration.get_validated_configs(stripped_config_text, '')
+    reloaded_yaml = configuration.get_validated_configs(stripped_config_text, '', target_definitions_text)

     assert reloaded_yaml.user_interface == configuration.UserInterface()


 def test_get_dst_directories_gets_dst():
     tmp = ['/tmp']
     dst = ['/dst0', '/dst1']
-    directories = configuration.Directories(log='', tmp=tmp, dst=dst)
+    directories = configuration.Directories(tmp=tmp, dst=dst)

     assert directories.get_dst_directories() == dst


 def test_get_dst_directories_gets_tmp():
     tmp = ['/tmp']
-    directories = configuration.Directories(log='', tmp=tmp)
+    directories = configuration.Directories(tmp=tmp)

     assert directories.get_dst_directories() == tmp


 def test_dst_is_dst():
     tmp = ['/tmp']
     dst = ['/dst0', '/dst1']
-    directories = configuration.Directories(log='', tmp=tmp, dst=dst)
+    directories = configuration.Directories(tmp=tmp, dst=dst)

     assert not directories.dst_is_tmp()


 def test_dst_is_tmp():
     tmp = ['/tmp']
-    directories = configuration.Directories(log='', tmp=tmp)
+    directories = configuration.Directories(tmp=tmp)

     assert directories.dst_is_tmp()
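
Every call site of get_validated_configs() now passes the target-definitions YAML as a third argument. A minimal sketch of the updated call pattern, assuming the plotman_resources import alias matches the one used in these tests:

    import importlib.resources

    from plotman import configuration
    from plotman import resources as plotman_resources  # assumed alias, as in these tests

    config_text = importlib.resources.read_text(plotman_resources, "plotman.yaml")
    target_definitions_text = importlib.resources.read_text(
        plotman_resources, "target_definitions.yaml",
    )

    # The third argument is new in this change; '' stands in for the config
    # file path, which only appears in error messages.
    cfg = configuration.get_validated_configs(config_text, '', target_definitions_text)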
1 change: 0 additions & 1 deletion src/plotman/_tests/manager_test.py
@@ -20,7 +20,6 @@ def sched_cfg():
 @pytest.fixture
 def dir_cfg():
     return configuration.Directories(
-        log="/plots/log",
         tmp=["/var/tmp", "/tmp"],
         dst=["/mnt/dst/00", "/mnt/dst/01", "/mnt/dst/03"],
         tmp_overrides={"/mnt/tmp/04": configuration.TmpOverrides(tmpdir_max_jobs=4)}
175 changes: 121 additions & 54 deletions src/plotman/archive.py
@@ -1,5 +1,6 @@
 import argparse
 import contextlib
+import logging
 import math
 import os
 import posixpath
@@ -9,37 +10,68 @@
 import sys
 from datetime import datetime

+import pendulum
 import psutil
 import texttable as tt

-from plotman import job, manager, plot_util
+from plotman import configuration, job, manager, plot_util
+
+
+logger = logging.getLogger(__name__)

 # TODO : write-protect and delete-protect archived plots

-def spawn_archive_process(dir_cfg, all_jobs):
+def spawn_archive_process(dir_cfg, arch_cfg, log_cfg, all_jobs):
     '''Spawns a new archive process using the command created
     in the archive() function. Returns archiving status and a log message to print.'''

-    log_message = None
+    log_messages = []
     archiving_status = None

     # Look for running archive jobs. Be robust to finding more than one
     # even though the scheduler should only run one at a time.
-    arch_jobs = get_running_archive_jobs(dir_cfg.archive)
+    arch_jobs = get_running_archive_jobs(arch_cfg)

     if not arch_jobs:
-        (should_start, status_or_cmd) = archive(dir_cfg, all_jobs)
+        (should_start, status_or_cmd, archive_log_messages) = archive(dir_cfg, arch_cfg, all_jobs)
+        log_messages.extend(archive_log_messages)
         if not should_start:
             archiving_status = status_or_cmd
         else:
-            cmd = status_or_cmd
-            # TODO: do something useful with output instead of DEVNULL
-            p = subprocess.Popen(cmd,
+            args = status_or_cmd
+
+            log_file_path = log_cfg.create_transfer_log_path(time=pendulum.now())
+
+            log_messages.append(f'Starting archive: {args["args"]} ; logging to {log_file_path}')
+            # TODO: CAMPid 09840103109429840981397487498131
+            try:
+                open_log_file = open(log_file_path, 'x')
+            except FileExistsError:
+                log_messages.append(
+                    f'Archiving log file already exists, skipping attempt to start a'
+                    f' new archive transfer: {log_file_path!r}'
+                )
+                return (False, log_messages)
+            except FileNotFoundError as e:
+                message = (
+                    f'Unable to open log file. Verify that the directory exists'
+                    f' and has proper write permissions: {log_file_path!r}'
+                )
+                raise Exception(message) from e
+
+            # Preferably, do not add any code between the try block above
+            # and the with block below. IOW, this space intentionally left
+            # blank... As is, this provides a good chance that our handle
+            # of the log file will get closed explicitly while still
+            # allowing handling of just the log file opening error.
+
+            with open_log_file:
+                # start_new_sessions to make the job independent of this controlling tty.
+                p = subprocess.Popen(**args,
                     shell=True,
-                    stdout=subprocess.DEVNULL,
+                    stdout=open_log_file,
                     stderr=subprocess.STDOUT,
                     start_new_session=True)
-            log_message = 'Starting archive: ' + cmd
             # At least for now it seems that even if we get a new running
             # archive jobs list it doesn't contain the new rsync process.
             # My guess is that this is because the bash in the middle due to
@@ -51,7 +83,7 @@ def spawn_archive_process(dir_cfg, all_jobs):
     if archiving_status is None:
         archiving_status = 'pid: ' + ', '.join(map(str, arch_jobs))

-    return archiving_status, log_message
+    return archiving_status, log_messages
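
The contract change here is that archive() now returns a dict of keyword arguments for subprocess.Popen instead of a ready-made shell command string, and transfer output is captured in a per-transfer log rather than discarded. A minimal sketch of that flow, with a hypothetical script path, environment values, and log path standing in for the configured ones:

    import os
    import subprocess

    # Hypothetical stand-ins for arch_cfg.target_definition().transfer_path and
    # arch_cfg.environment(); the real values come from plotman's configuration.
    subprocess_arguments = {
        'args': '/home/user/transfer.sh',
        'env': {**os.environ, 'source': '/tmp/plot-k32.plot', 'destination': '/mnt/arch/00'},
    }

    # 'x' mode refuses to clobber an existing log, mirroring the code above.
    with open('/tmp/transfer.log', 'x') as open_log_file:
        p = subprocess.Popen(
            **subprocess_arguments,
            shell=True,
            stdout=open_log_file,
            stderr=subprocess.STDOUT,
            start_new_session=True,
        )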

def compute_priority(phase, gb_free, n_plots):
# All these values are designed around dst buffer dirs of about
@@ -86,50 +118,82 @@ def compute_priority(phase, gb_free, n_plots):
     return priority

 def get_archdir_freebytes(arch_cfg):
+    log_messages = []
+    target = arch_cfg.target_definition()
+
     archdir_freebytes = {}
-    df_cmd = ('ssh %s@%s df -aBK | grep " %s/"' %
-        (arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, posixpath.normpath(arch_cfg.rsyncd_path)) )
-    with subprocess.Popen(df_cmd, shell=True, stdout=subprocess.PIPE) as proc:
-        for line in proc.stdout.readlines():
-            fields = line.split()
-            if fields[3] == b'-':
-                # not actually mounted
+    timeout = 5
+    try:
+        completed_process = subprocess.run(
+            [target.disk_space_path],
+            env={**os.environ, **arch_cfg.environment()},
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            timeout=timeout,
+        )
+    except subprocess.TimeoutExpired as e:
+        log_messages.append(f'Disk space check timed out in {timeout} seconds')
+        if e.stdout is None:
+            stdout = ''
+        else:
+            stdout = e.stdout.decode('utf-8', errors='ignore').strip()
+        if e.stderr is None:
+            stderr = ''
+        else:
+            stderr = e.stderr.decode('utf-8', errors='ignore').strip()
+    else:
+        stdout = completed_process.stdout.decode('utf-8', errors='ignore').strip()
+        stderr = completed_process.stderr.decode('utf-8', errors='ignore').strip()
+        for line in stdout.splitlines():
+            line = line.strip()
+            split = line.split(':')
+            if len(split) != 2:
+                log_messages.append(f'Unable to parse disk script line: {line!r}')
                 continue
-            freebytes = int(fields[3][:-1]) * 1024 # Strip the final 'K'
-            archdir = (fields[5]).decode('utf-8')
-            archdir_freebytes[archdir] = freebytes
-    return archdir_freebytes
-
-def rsync_dest(arch_cfg, arch_dir):
-    rsync_path = arch_dir.replace(arch_cfg.rsyncd_path, arch_cfg.rsyncd_module)
-    if rsync_path.startswith('/'):
-        rsync_path = rsync_path[1:] # Avoid dup slashes. TODO use path join?
-    rsync_url = 'rsync://%s@%s:12000/%s' % (
-        arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, rsync_path)
-    return rsync_url
+            archdir, space = split
+            freebytes = int(space)
+            archdir_freebytes[archdir.strip()] = freebytes
+
+    for line in log_messages:
+        logger.info(line)
+
+    logger.info('stdout from disk space script:')
+    for line in stdout.splitlines():
+        logger.info(f' {line}')
+
+    logger.info('stderr from disk space script:')
+    for line in stderr.splitlines():
+        logger.info(f' {line}')
+
+    return archdir_freebytes, log_messages
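
Free-space discovery is no longer a hard-coded "ssh ... df" pipeline; it is delegated to a per-target disk-space script whose stdout must contain one path:bytes pair per line, which is exactly what the parser above expects. A minimal sketch of such a script, with illustrative mount points that are not part of the PR:

    #!/usr/bin/env python3
    # Illustrative disk-space script: emit one "path:freebytes" line per
    # archive directory, the format get_archdir_freebytes() parses above.
    import os
    import shutil

    ARCH_DIRS = ['/mnt/arch/00', '/mnt/arch/01']  # example paths

    for path in ARCH_DIRS:
        if os.path.ismount(path):
            print(f'{path}:{shutil.disk_usage(path).free}')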

# TODO: maybe consolidate with similar code in job.py?
 def get_running_archive_jobs(arch_cfg):
     '''Look for running rsync jobs that seem to match the pattern we use for archiving
     them. Return a list of PIDs of matching jobs.'''
     jobs = []
-    dest = rsync_dest(arch_cfg, '/')
-    for proc in psutil.process_iter(['pid', 'name']):
+    target = arch_cfg.target_definition()
+    variables = {**os.environ, **arch_cfg.environment()}
+    dest = target.transfer_process_argument_prefix.format(**variables)
+    proc_name = target.transfer_process_name.format(**variables)
+    for proc in psutil.process_iter():
         with contextlib.suppress(psutil.NoSuchProcess):
-            if proc.name() == 'rsync':
-                args = proc.cmdline()
-                for arg in args:
-                    if arg.startswith(dest):
-                        jobs.append(proc.pid)
+            with proc.oneshot():
+                if proc.name() == proc_name:
+                    args = proc.cmdline()
+                    for arg in args:
+                        if arg.startswith(dest):
+                            jobs.append(proc.pid)
     return jobs
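
Process matching is now template-driven: the target definition supplies a process name and an argument prefix, and both are expanded with str.format() against the merged environment. A small sketch of that expansion, using made-up template strings and values chosen to echo the old rsync behavior:

    import os

    # Hypothetical templates; the real ones come from target_definitions.yaml.
    transfer_process_name = 'rsync'
    transfer_process_argument_prefix = 'rsync://{rsyncd_user}@{rsyncd_host}:12000'

    variables = {**os.environ, 'rsyncd_user': 'theusername', 'rsyncd_host': 'thehostname'}

    proc_name = transfer_process_name.format(**variables)  # 'rsync'
    dest = transfer_process_argument_prefix.format(**variables)
    # dest == 'rsync://theusername@thehostname:12000'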

-def archive(dir_cfg, all_jobs):
+def archive(dir_cfg, arch_cfg, all_jobs):
     '''Configure one archive job. Needs to know all jobs so it can avoid IO
     contention on the plotting dstdir drives. Returns either (False, <reason>)
     if we should not execute an archive job or (True, <cmd>) with the archive
     command if we should.'''
-    if dir_cfg.archive is None:
-        return (False, "No 'archive' settings declared in plotman.yaml")
+    log_messages = []
+    if arch_cfg is None:
+        return (False, "No 'archive' settings declared in plotman.yaml", log_messages)

     dir2ph = manager.dstdirs_to_furthest_phase(all_jobs)
     best_priority = -100000000
@@ -146,33 +210,36 @@ def archive(dir_cfg, all_jobs):
     chosen_plot = dir_plots[0]

     if not chosen_plot:
-        return (False, 'No plots found')
+        return (False, 'No plots found', log_messages)

     # TODO: sanity check that archive machine is available
     # TODO: filter drives mounted RO

     #
     # Pick first archive dir with sufficient space
     #
-    archdir_freebytes = get_archdir_freebytes(dir_cfg.archive)
+    archdir_freebytes, freebytes_log_messages = get_archdir_freebytes(arch_cfg)
+    log_messages.extend(freebytes_log_messages)
     if not archdir_freebytes:
-        return(False, 'No free archive dirs found.')
+        return(False, 'No free archive dirs found.', log_messages)

     archdir = ''
     available = [(d, space) for (d, space) in archdir_freebytes.items() if
                  space > 1.2 * plot_util.get_k32_plotsize()]
     if len(available) > 0:
-        index = min(dir_cfg.archive.index, len(available) - 1)
+        index = min(arch_cfg.index, len(available) - 1)
         (archdir, freespace) = sorted(available)[index]

     if not archdir:
-        return(False, 'No archive directories found with enough free space')
-
-    msg = 'Found %s with ~%d GB free' % (archdir, freespace / plot_util.GB)
-
-    bwlimit = dir_cfg.archive.rsyncd_bwlimit
-    throttle_arg = ('--bwlimit=%d' % bwlimit) if bwlimit else ''
-    cmd = ('rsync %s --compress-level=0 --remove-source-files -P %s %s' %
-        (throttle_arg, chosen_plot, rsync_dest(dir_cfg.archive, archdir)))
-
-    return (True, cmd)
+        return(False, 'No archive directories found with enough free space', log_messages)
+
+    env = arch_cfg.environment(
+        source=chosen_plot,
+        destination=archdir,
+    )
+    subprocess_arguments = {
+        'args': arch_cfg.target_definition().transfer_path,
+        'env': {**os.environ, **env}
+    }
+
+    return (True, subprocess_arguments, log_messages)
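
The archive command itself now lives in an external transfer script, which receives the chosen plot and archive directory through the source and destination environment variables set above. A minimal sketch of such a script; the rsync invocation echoes the flags from the removed inline command and is one plausible implementation, not mandated by this code:

    #!/usr/bin/env python3
    # Illustrative transfer script: archive() exports the chosen plot and the
    # selected archive directory as the 'source' and 'destination' variables.
    import os
    import subprocess
    import sys

    source = os.environ['source']
    destination = os.environ['destination']

    result = subprocess.run(
        ['rsync', '--compress-level=0', '--remove-source-files', '-P',
         source, destination],
    )
    sys.exit(result.returncode)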