diff --git a/src/plotman/_tests/archive_test.py b/src/plotman/_tests/archive_test.py index 4b98ac90..62ec1570 100755 --- a/src/plotman/_tests/archive_test.py +++ b/src/plotman/_tests/archive_test.py @@ -1,25 +1,6 @@ -from plotman import archive, configuration, job, manager +from plotman import archive, job def test_compute_priority(): assert (archive.compute_priority( job.Phase(major=3, minor=1), 1000, 10) > archive.compute_priority( job.Phase(major=3, minor=6), 1000, 10) ) - -def test_rsync_dest(): - arch_dir = '/plotdir/012' - arch_cfg = configuration.Archive( - rsyncd_module='plots_mod', - rsyncd_path='/plotdir', - rsyncd_host='thehostname', - rsyncd_user='theusername', - rsyncd_bwlimit=80000 - ) - - # Normal usage - assert ('rsync://theusername@thehostname:12000/plots_mod/012' == - archive.rsync_dest(arch_cfg, arch_dir)) - - # Usage for constructing just the prefix, for scanning process tables - # for matching jobs. - assert ('rsync://theusername@thehostname:12000/' == - archive.rsync_dest(arch_cfg, '/')) diff --git a/src/plotman/_tests/configuration_test.py b/src/plotman/_tests/configuration_test.py index 3424aa06..7dd6e074 100644 --- a/src/plotman/_tests/configuration_test.py +++ b/src/plotman/_tests/configuration_test.py @@ -13,12 +13,19 @@ def config_text_fixture(): return importlib.resources.read_text(plotman_resources, "plotman.yaml") -def test_get_validated_configs__default(config_text): +@pytest.fixture(name='target_definitions_text') +def target_definitions_text_fixture(): + return importlib.resources.read_text( + plotman_resources, "target_definitions.yaml", + ) + + +def test_get_validated_configs__default(config_text, target_definitions_text): """Check that get_validated_configs() works with default/example plotman.yaml file.""" - res = configuration.get_validated_configs(config_text, '') + res = configuration.get_validated_configs(config_text, '', target_definitions_text) assert isinstance(res, configuration.PlotmanConfig) -def test_get_validated_configs__malformed(config_text): +def test_get_validated_configs__malformed(config_text, target_definitions_text): """Check that get_validated_configs() raises exception with invalid plotman.yaml contents.""" loaded_yaml = yaml.load(config_text, Loader=yaml.SafeLoader) @@ -27,7 +34,7 @@ def test_get_validated_configs__malformed(config_text): malformed_config_text = yaml.dump(loaded_yaml, Dumper=yaml.SafeDumper) with pytest.raises(configuration.ConfigurationException) as exc_info: - configuration.get_validated_configs(malformed_config_text, '/the_path') + configuration.get_validated_configs(malformed_config_text, '/the_path', target_definitions_text) assert exc_info.value.args[0] == f"Config file at: '/the_path' is malformed" @@ -43,14 +50,14 @@ def test_get_validated_configs__missing(): ) -def test_loads_without_user_interface(config_text): +def test_loads_without_user_interface(config_text, target_definitions_text): loaded_yaml = yaml.load(config_text, Loader=yaml.SafeLoader) del loaded_yaml["user_interface"] stripped_config_text = yaml.dump(loaded_yaml, Dumper=yaml.SafeDumper) - reloaded_yaml = configuration.get_validated_configs(stripped_config_text, '') + reloaded_yaml = configuration.get_validated_configs(stripped_config_text, '', target_definitions_text) assert reloaded_yaml.user_interface == configuration.UserInterface() @@ -58,14 +65,14 @@ def test_loads_without_user_interface(config_text): def test_get_dst_directories_gets_dst(): tmp = ['/tmp'] dst = ['/dst0', '/dst1'] - directories = configuration.Directories(log='', 
tmp=tmp, dst=dst) + directories = configuration.Directories(tmp=tmp, dst=dst) assert directories.get_dst_directories() == dst def test_get_dst_directories_gets_tmp(): tmp = ['/tmp'] - directories = configuration.Directories(log='', tmp=tmp) + directories = configuration.Directories(tmp=tmp) assert directories.get_dst_directories() == tmp @@ -73,13 +80,13 @@ def test_get_dst_directories_gets_tmp(): def test_dst_is_dst(): tmp = ['/tmp'] dst = ['/dst0', '/dst1'] - directories = configuration.Directories(log='', tmp=tmp, dst=dst) + directories = configuration.Directories(tmp=tmp, dst=dst) assert not directories.dst_is_tmp() def test_dst_is_tmp(): tmp = ['/tmp'] - directories = configuration.Directories(log='', tmp=tmp) + directories = configuration.Directories(tmp=tmp) assert directories.dst_is_tmp() diff --git a/src/plotman/_tests/manager_test.py b/src/plotman/_tests/manager_test.py index 43a3f78e..7b1d2573 100755 --- a/src/plotman/_tests/manager_test.py +++ b/src/plotman/_tests/manager_test.py @@ -20,7 +20,6 @@ def sched_cfg(): @pytest.fixture def dir_cfg(): return configuration.Directories( - log="/plots/log", tmp=["/var/tmp", "/tmp"], dst=["/mnt/dst/00", "/mnt/dst/01", "/mnt/dst/03"], tmp_overrides={"/mnt/tmp/04": configuration.TmpOverrides(tmpdir_max_jobs=4)} diff --git a/src/plotman/archive.py b/src/plotman/archive.py index 641ae1d1..1376af7d 100644 --- a/src/plotman/archive.py +++ b/src/plotman/archive.py @@ -1,5 +1,6 @@ import argparse import contextlib +import logging import math import os import posixpath @@ -9,37 +10,68 @@ import sys from datetime import datetime +import pendulum import psutil import texttable as tt -from plotman import job, manager, plot_util +from plotman import configuration, job, manager, plot_util + + +logger = logging.getLogger(__name__) # TODO : write-protect and delete-protect archived plots -def spawn_archive_process(dir_cfg, all_jobs): +def spawn_archive_process(dir_cfg, arch_cfg, log_cfg, all_jobs): '''Spawns a new archive process using the command created in the archive() function. Returns archiving status and a log message to print.''' - log_message = None + log_messages = [] archiving_status = None # Look for running archive jobs. Be robust to finding more than one # even though the scheduler should only run one at a time. - arch_jobs = get_running_archive_jobs(dir_cfg.archive) + arch_jobs = get_running_archive_jobs(arch_cfg) if not arch_jobs: - (should_start, status_or_cmd) = archive(dir_cfg, all_jobs) + (should_start, status_or_cmd, archive_log_messages) = archive(dir_cfg, arch_cfg, all_jobs) + log_messages.extend(archive_log_messages) if not should_start: archiving_status = status_or_cmd else: - cmd = status_or_cmd - # TODO: do something useful with output instead of DEVNULL - p = subprocess.Popen(cmd, + args = status_or_cmd + + log_file_path = log_cfg.create_transfer_log_path(time=pendulum.now()) + + log_messages.append(f'Starting archive: {args["args"]} ; logging to {log_file_path}') + # TODO: CAMPid 09840103109429840981397487498131 + try: + open_log_file = open(log_file_path, 'x') + except FileExistsError: + log_messages.append( + f'Archiving log file already exists, skipping attempt to start a' + f' new archive transfer: {log_file_path!r}' + ) + return (False, log_messages) + except FileNotFoundError as e: + message = ( + f'Unable to open log file. 
Verify that the directory exists' + f' and has proper write permissions: {log_file_path!r}' + ) + raise Exception(message) from e + + # Preferably, do not add any code between the try block above + # and the with block below. IOW, this space intentionally left + # blank... As is, this provides a good chance that our handle + # of the log file will get closed explicitly while still + # allowing handling of just the log file opening error. + + with open_log_file: + # start_new_sessions to make the job independent of this controlling tty. + p = subprocess.Popen(**args, shell=True, - stdout=subprocess.DEVNULL, + stdout=open_log_file, stderr=subprocess.STDOUT, start_new_session=True) - log_message = 'Starting archive: ' + cmd # At least for now it seems that even if we get a new running # archive jobs list it doesn't contain the new rsync process. # My guess is that this is because the bash in the middle due to @@ -51,7 +83,7 @@ def spawn_archive_process(dir_cfg, all_jobs): if archiving_status is None: archiving_status = 'pid: ' + ', '.join(map(str, arch_jobs)) - return archiving_status, log_message + return archiving_status, log_messages def compute_priority(phase, gb_free, n_plots): # All these values are designed around dst buffer dirs of about @@ -86,50 +118,82 @@ def compute_priority(phase, gb_free, n_plots): return priority def get_archdir_freebytes(arch_cfg): + log_messages = [] + target = arch_cfg.target_definition() + archdir_freebytes = {} - df_cmd = ('ssh %s@%s df -aBK | grep " %s/"' % - (arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, posixpath.normpath(arch_cfg.rsyncd_path)) ) - with subprocess.Popen(df_cmd, shell=True, stdout=subprocess.PIPE) as proc: - for line in proc.stdout.readlines(): - fields = line.split() - if fields[3] == b'-': - # not actually mounted + timeout = 5 + try: + completed_process = subprocess.run( + [target.disk_space_path], + env={**os.environ, **arch_cfg.environment()}, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=timeout, + ) + except subprocess.TimeoutExpired as e: + log_messages.append(f'Disk space check timed out in {timeout} seconds') + if e.stdout is None: + stdout = '' + else: + stdout = e.stdout.decode('utf-8', errors='ignore').strip() + if e.stderr is None: + stderr = '' + else: + stderr = e.stderr.decode('utf-8', errors='ignore').strip() + else: + stdout = completed_process.stdout.decode('utf-8', errors='ignore').strip() + stderr = completed_process.stderr.decode('utf-8', errors='ignore').strip() + for line in stdout.splitlines(): + line = line.strip() + split = line.split(':') + if len(split) != 2: + log_messages.append(f'Unable to parse disk script line: {line!r}') continue - freebytes = int(fields[3][:-1]) * 1024 # Strip the final 'K' - archdir = (fields[5]).decode('utf-8') - archdir_freebytes[archdir] = freebytes - return archdir_freebytes - -def rsync_dest(arch_cfg, arch_dir): - rsync_path = arch_dir.replace(arch_cfg.rsyncd_path, arch_cfg.rsyncd_module) - if rsync_path.startswith('/'): - rsync_path = rsync_path[1:] # Avoid dup slashes. TODO use path join? 
- rsync_url = 'rsync://%s@%s:12000/%s' % ( - arch_cfg.rsyncd_user, arch_cfg.rsyncd_host, rsync_path) - return rsync_url + archdir, space = split + freebytes = int(space) + archdir_freebytes[archdir.strip()] = freebytes + + for line in log_messages: + logger.info(line) + + logger.info('stdout from disk space script:') + for line in stdout.splitlines(): + logger.info(f' {line}') + + logger.info('stderr from disk space script:') + for line in stderr.splitlines(): + logger.info(f' {line}') + + return archdir_freebytes, log_messages # TODO: maybe consolidate with similar code in job.py? def get_running_archive_jobs(arch_cfg): '''Look for running rsync jobs that seem to match the pattern we use for archiving them. Return a list of PIDs of matching jobs.''' jobs = [] - dest = rsync_dest(arch_cfg, '/') - for proc in psutil.process_iter(['pid', 'name']): + target = arch_cfg.target_definition() + variables = {**os.environ, **arch_cfg.environment()} + dest = target.transfer_process_argument_prefix.format(**variables) + proc_name = target.transfer_process_name.format(**variables) + for proc in psutil.process_iter(): with contextlib.suppress(psutil.NoSuchProcess): - if proc.name() == 'rsync': - args = proc.cmdline() - for arg in args: - if arg.startswith(dest): - jobs.append(proc.pid) + with proc.oneshot(): + if proc.name() == proc_name: + args = proc.cmdline() + for arg in args: + if arg.startswith(dest): + jobs.append(proc.pid) return jobs -def archive(dir_cfg, all_jobs): +def archive(dir_cfg, arch_cfg, all_jobs): '''Configure one archive job. Needs to know all jobs so it can avoid IO contention on the plotting dstdir drives. Returns either (False, ) if we should not execute an archive job or (True, ) with the archive command if we should.''' - if dir_cfg.archive is None: - return (False, "No 'archive' settings declared in plotman.yaml") + log_messages = [] + if arch_cfg is None: + return (False, "No 'archive' settings declared in plotman.yaml", log_messages) dir2ph = manager.dstdirs_to_furthest_phase(all_jobs) best_priority = -100000000 @@ -146,7 +210,7 @@ def archive(dir_cfg, all_jobs): chosen_plot = dir_plots[0] if not chosen_plot: - return (False, 'No plots found') + return (False, 'No plots found', log_messages) # TODO: sanity check that archive machine is available # TODO: filter drives mounted RO @@ -154,25 +218,28 @@ def archive(dir_cfg, all_jobs): # # Pick first archive dir with sufficient space # - archdir_freebytes = get_archdir_freebytes(dir_cfg.archive) + archdir_freebytes, freebytes_log_messages = get_archdir_freebytes(arch_cfg) + log_messages.extend(freebytes_log_messages) if not archdir_freebytes: - return(False, 'No free archive dirs found.') + return(False, 'No free archive dirs found.', log_messages) archdir = '' available = [(d, space) for (d, space) in archdir_freebytes.items() if space > 1.2 * plot_util.get_k32_plotsize()] if len(available) > 0: - index = min(dir_cfg.archive.index, len(available) - 1) + index = min(arch_cfg.index, len(available) - 1) (archdir, freespace) = sorted(available)[index] if not archdir: - return(False, 'No archive directories found with enough free space') - - msg = 'Found %s with ~%d GB free' % (archdir, freespace / plot_util.GB) - - bwlimit = dir_cfg.archive.rsyncd_bwlimit - throttle_arg = ('--bwlimit=%d' % bwlimit) if bwlimit else '' - cmd = ('rsync %s --compress-level=0 --remove-source-files -P %s %s' % - (throttle_arg, chosen_plot, rsync_dest(dir_cfg.archive, archdir))) - - return (True, cmd) + return(False, 'No archive directories found with 
enough free space', log_messages) + + env = arch_cfg.environment( + source=chosen_plot, + destination=archdir, + ) + subprocess_arguments = { + 'args': arch_cfg.target_definition().transfer_path, + 'env': {**os.environ, **env} + } + + return (True, subprocess_arguments, log_messages) diff --git a/src/plotman/configuration.py b/src/plotman/configuration.py index 1a73491d..ad522c68 100644 --- a/src/plotman/configuration.py +++ b/src/plotman/configuration.py @@ -1,4 +1,9 @@ import contextlib +import importlib +import os +import stat +import tempfile +import textwrap from typing import Dict, List, Optional import appdirs @@ -7,6 +12,8 @@ import marshmallow import yaml +from plotman import resources as plotman_resources + class ConfigurationException(Exception): """Raised when plotman.yaml configuration is missing or malformed.""" @@ -28,7 +35,7 @@ def read_configuration_text(config_path): ) from e -def get_validated_configs(config_text, config_path): +def get_validated_configs(config_text, config_path, preset_target_definitions_text): """Return a validated instance of PlotmanConfig with data from plotman.yaml :raises ConfigurationException: Raised when plotman.yaml is either missing or malformed @@ -36,6 +43,18 @@ def get_validated_configs(config_text, config_path): schema = desert.schema(PlotmanConfig) config_objects = yaml.load(config_text, Loader=yaml.SafeLoader) + version = config_objects.get('version', (0,)) + + expected_major_version = 1 + + if version[0] != expected_major_version: + message = textwrap.dedent(f"""\ + Expected major version {expected_major_version}, found version {version} + See https://github.com/ericaltendorf/plotman/wiki/Configuration#versions + """) + + raise Exception(message) + try: loaded = schema.load(config_objects) except marshmallow.exceptions.ValidationError as e: @@ -43,32 +62,166 @@ def get_validated_configs(config_text, config_path): f"Config file at: '{config_path}' is malformed" ) from e + preset_target_objects = yaml.safe_load(preset_target_definitions_text) + preset_target_schema = desert.schema(PresetTargetDefinitions) + preset_target_definitions = preset_target_schema.load(preset_target_objects) + + loaded.archiving.target_definitions = { + **preset_target_definitions.target_definitions, + **loaded.archiving.target_definitions, + } + return loaded +class CustomStringField(marshmallow.fields.String): + def _deserialize(self, value, attr, data, **kwargs): + if isinstance(value, int): + value = str(value) + + return super()._deserialize(value, attr, data, **kwargs) # Data models used to deserializing/formatting plotman.yaml files. +# TODO: bah, mutable? bah. +@attr.mutable +class ArchivingTarget: + transfer_process_name: str + transfer_process_argument_prefix: str + # TODO: mutable attribute... + env: Dict[str, Optional[str]] = desert.ib( + factory=dict, + marshmallow_field=marshmallow.fields.Dict( + keys=marshmallow.fields.String(), + values=CustomStringField(allow_none=True), + ), + ) + disk_space_path: Optional[str] = None + disk_space_script: Optional[str] = None + transfer_path: Optional[str] = None + transfer_script: Optional[str] = None + @attr.frozen -class Archive: - rsyncd_module: str - rsyncd_path: str - rsyncd_bwlimit: int - rsyncd_host: str - rsyncd_user: str +class PresetTargetDefinitions: + target_definitions: Dict[str, ArchivingTarget] = attr.ib(factory=dict) + +# TODO: bah, mutable? bah. +@attr.mutable +class Archiving: + target: str + # TODO: mutable attribute... 
+ env: Dict[str, str] = desert.ib( + factory=dict, + marshmallow_field=marshmallow.fields.Dict( + keys=marshmallow.fields.String(), + values=CustomStringField(), + ), + ) index: int = 0 # If not explicit, "index" will default to 0 + target_definitions: Dict[str, ArchivingTarget] = attr.ib(factory=dict) + + def target_definition(self): + return self.target_definitions[self.target] + + def environment( + self, + source=None, + destination=None, + ): + target = self.target_definition() + complete = {**target.env, **self.env} + + missing_mandatory_keys = [ + key + for key, value in complete.items() + if value is None + ] + + if len(missing_mandatory_keys) > 0: + target = repr(self.target) + missing = ', '.join(repr(key) for key in missing_mandatory_keys) + message = f'Missing env options for archival target {target}: {missing}' + raise Exception(message) + + variables = {**os.environ, **complete} + complete['process_name'] = target.transfer_process_name.format(**variables) + + if source is not None: + complete['source'] = source + + if destination is not None: + complete['destination'] = destination + + return complete + + def maybe_create_scripts(self, temp): + rwx = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR + target = self.target_definition() + + if target.disk_space_path is None: + with tempfile.NamedTemporaryFile( + mode='w', + encoding='utf-8', + prefix='plotman-disk-space-script', + delete=False, + dir=temp, + ) as disk_space_script_file: + disk_space_script_file.write(target.disk_space_script) + + target.disk_space_path = disk_space_script_file.name + os.chmod(target.disk_space_path, rwx) + + if target.transfer_path is None: + with tempfile.NamedTemporaryFile( + mode='w', + encoding='utf-8', + prefix='plotman-transfer-script', + delete=False, + dir=temp, + ) as transfer_script_file: + transfer_script_file.write(target.transfer_script) + + target.transfer_path = transfer_script_file.name + os.chmod(target.transfer_path, rwx) @attr.frozen class TmpOverrides: tmpdir_max_jobs: Optional[int] = None +@attr.frozen +class Logging: + plots: str = os.path.join(appdirs.user_data_dir("plotman"), 'plots') + transfers: str = os.path.join(appdirs.user_data_dir("plotman"), 'transfers') + application: str = os.path.join(appdirs.user_log_dir("plotman"), 'plotman.log') + + def setup(self): + os.makedirs(self.plots, exist_ok=True) + os.makedirs(self.transfers, exist_ok=True) + os.makedirs(os.path.dirname(self.application), exist_ok=True) + + def create_plot_log_path(self, time): + return self._create_log_path( + time=time, + directory=self.plots, + group='plot', + ) + + def create_transfer_log_path(self, time): + return self._create_log_path( + time=time, + directory=self.transfers, + group='transfer', + ) + + def _create_log_path(self, time, directory, group): + timestamp = time.isoformat(timespec='microseconds').replace(':', '_') + return os.path.join(directory, f'{timestamp}.{group}.log') + @attr.frozen class Directories: - log: str tmp: List[str] dst: Optional[List[str]] = None tmp2: Optional[str] = None tmp_overrides: Optional[Dict[str, TmpOverrides]] = None - archive: Optional[Archive] = None def dst_is_tmp(self): return self.dst is None and self.tmp2 is None @@ -87,7 +240,6 @@ def get_dst_directories(self): return self.dst - @attr.frozen class Scheduling: global_max_jobs: int @@ -127,4 +279,19 @@ class PlotmanConfig: scheduling: Scheduling plotting: Plotting commands: Commands = attr.ib(factory=Commands) + logging: Logging = Logging() + archiving: Optional[Archiving] = None user_interface: 
UserInterface = attr.ib(factory=UserInterface) + version: List[int] = [0] + + @contextlib.contextmanager + def setup(self): + prefix = f'plotman-pid_{os.getpid()}-' + + self.logging.setup() + + with tempfile.TemporaryDirectory(prefix=prefix) as temp: + if self.archiving is not None: + self.archiving.maybe_create_scripts(temp=temp) + + yield diff --git a/src/plotman/interactive.py b/src/plotman/interactive.py index bd5e7307..acbd1248 100644 --- a/src/plotman/interactive.py +++ b/src/plotman/interactive.py @@ -63,19 +63,15 @@ def archiving_status_msg(configured, active, status): return '(not configured)' # cmd_autostart_plotting is the (optional) argument passed from the command line. May be None -def curses_main(stdscr, cmd_autostart_plotting): +def curses_main(stdscr, cmd_autostart_plotting, cfg): log = Log() - config_path = configuration.get_path() - config_text = configuration.read_configuration_text(config_path) - cfg = configuration.get_validated_configs(config_text, config_path) - if cmd_autostart_plotting is not None: plotting_active = cmd_autostart_plotting else: plotting_active = cfg.commands.interactive.autostart_plotting - archiving_configured = cfg.directories.archive is not None + archiving_configured = cfg.archiving is not None archiving_active = archiving_configured plotting_status = '' # todo rename these msg? @@ -90,7 +86,7 @@ def curses_main(stdscr, cmd_autostart_plotting): jobs_win = curses.newwin(1, 1, 1, 0) dirs_win = curses.newwin(1, 1, 1, 0) - jobs = Job.get_running_jobs(cfg.directories.log) + jobs = Job.get_running_jobs(cfg.logging.plots) last_refresh = None pressed_key = '' # For debugging @@ -112,15 +108,15 @@ def curses_main(stdscr, cmd_autostart_plotting): do_full_refresh = elapsed >= cfg.scheduling.polling_time_s if not do_full_refresh: - jobs = Job.get_running_jobs(cfg.directories.log, cached_jobs=jobs) + jobs = Job.get_running_jobs(cfg.logging.plots, cached_jobs=jobs) else: last_refresh = datetime.datetime.now() - jobs = Job.get_running_jobs(cfg.directories.log) + jobs = Job.get_running_jobs(cfg.logging.plots) if plotting_active: (started, msg) = manager.maybe_start_new_plot( - cfg.directories, cfg.scheduling, cfg.plotting + cfg.directories, cfg.scheduling, cfg.plotting, cfg.logging ) if (started): if aging_reason is not None: @@ -128,7 +124,7 @@ def curses_main(stdscr, cmd_autostart_plotting): aging_reason = None log.log(msg) plotting_status = '' - jobs = Job.get_running_jobs(cfg.directories.log, cached_jobs=jobs) + jobs = Job.get_running_jobs(cfg.logging.plots, cached_jobs=jobs) else: # If a plot is delayed for any reason other than stagger, log it if msg.find("stagger") < 0: @@ -137,11 +133,13 @@ def curses_main(stdscr, cmd_autostart_plotting): if archiving_configured: if archiving_active: - archiving_status, log_message = archive.spawn_archive_process(cfg.directories, jobs) - if log_message: + archiving_status, log_messages = archive.spawn_archive_process(cfg.directories, cfg.archiving, cfg.logging, jobs) + for log_message in log_messages: log.log(log_message) - archdir_freebytes = archive.get_archdir_freebytes(cfg.directories.archive) + archdir_freebytes, log_messages = archive.get_archdir_freebytes(cfg.archiving) + for log_message in log_messages: + log.log(log_message) # Get terminal size. 
Recommended method is stdscr.getmaxyx(), but this @@ -176,7 +174,11 @@ def curses_main(stdscr, cmd_autostart_plotting): dst_dir = cfg.directories.get_dst_directories() dst_prefix = os.path.commonpath(dst_dir) if archiving_configured: - arch_prefix = cfg.directories.archive.rsyncd_path + archive_directories = archdir_freebytes.keys() + if len(archive_directories) == 0: + arch_prefix = '' + else: + arch_prefix = os.path.commonpath(archive_directories) n_tmpdirs = len(cfg.directories.tmp) @@ -330,13 +332,13 @@ def curses_main(stdscr, cmd_autostart_plotting): else: pressed_key = key -def run_interactive(autostart_plotting = None): +def run_interactive(cfg, autostart_plotting = None): locale.setlocale(locale.LC_ALL, '') code = locale.getpreferredencoding() # Then use code as the encoding for str.encode() calls. try: - curses.wrapper(curses_main, autostart_plotting) + curses.wrapper(curses_main, autostart_plotting, cfg=cfg) except curses.error as e: raise TerminalTooSmallError( "Your terminal may be too small, try making it bigger.", diff --git a/src/plotman/manager.py b/src/plotman/manager.py index 8de33163..7e619ed3 100644 --- a/src/plotman/manager.py +++ b/src/plotman/manager.py @@ -71,8 +71,8 @@ def phases_permit_new_job(phases, d, sched_cfg, dir_cfg): return True -def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg): - jobs = job.Job.get_running_jobs(dir_cfg.log) +def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg, log_cfg): + jobs = job.Job.get_running_jobs(log_cfg.plots) wait_reason = None # If we don't start a job this iteration, this says why. @@ -111,9 +111,7 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg): else: dstdir = max(dir2ph, key=dir2ph.get) - logfile = os.path.join( - dir_cfg.log, pendulum.now().isoformat(timespec='microseconds').replace(':', '_') + '.log' - ) + log_file_path = log_cfg.create_plot_log_path(time=pendulum.now()) plot_args = ['chia', 'plots', 'create', '-k', str(plotting_cfg.k), @@ -136,10 +134,11 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg): if plotting_cfg.x: plot_args.append('-x') - logmsg = ('Starting plot job: %s ; logging to %s' % (' '.join(plot_args), logfile)) + logmsg = ('Starting plot job: %s ; logging to %s' % (' '.join(plot_args), log_file_path)) + # TODO: CAMPid 09840103109429840981397487498131 try: - open_log_file = open(logfile, 'x') + open_log_file = open(log_file_path, 'x') except FileExistsError: # The desired log file name already exists. Most likely another # plotman process already launched a new process in response to @@ -149,13 +148,13 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg): # plotting process, we'll get it at the next check cycle anyways. message = ( f'Plot log file already exists, skipping attempt to start a' - f' new plot: {logfile!r}' + f' new plot: {log_file_path!r}' ) return (False, logmsg) except FileNotFoundError as e: message = ( f'Unable to open log file. 
Verify that the directory exists' - f' and has proper write permissions: {logfile!r}' + f' and has proper write permissions: {log_file_path!r}' ) raise Exception(message) from e diff --git a/src/plotman/plotman.py b/src/plotman/plotman.py index 19e58174..a100ab4a 100755 --- a/src/plotman/plotman.py +++ b/src/plotman/plotman.py @@ -2,11 +2,14 @@ import datetime import importlib import importlib.resources +import logging +import logging.handlers import os import random from shutil import copyfile import time -import sys + +import pendulum # Plotman libraries from plotman import analyzer, archive, configuration, interactive, manager, plot_util, reporting @@ -89,6 +92,11 @@ def get_term_width(): columns = 120 # 80 is typically too narrow. TODO: make a command line arg. return columns +class Iso8601Formatter(logging.Formatter): + def formatTime(self, record, datefmt=None): + time = pendulum.from_timestamp(timestamp=record.created, tz='local') + return time.isoformat(timespec='microseconds', ) + def main(): random.seed() @@ -137,122 +145,139 @@ def main(): config_path = configuration.get_path() config_text = configuration.read_configuration_text(config_path) - cfg = configuration.get_validated_configs(config_text, config_path) - - # - # Stay alive, spawning plot jobs - # - if args.cmd == 'plot': - print('...starting plot loop') - while True: - wait_reason = manager.maybe_start_new_plot(cfg.directories, cfg.scheduling, cfg.plotting) - - # TODO: report this via a channel that can be polled on demand, so we don't spam the console - if wait_reason: - print('...sleeping %d s: %s' % (cfg.scheduling.polling_time_s, wait_reason)) - - time.sleep(cfg.scheduling.polling_time_s) - - # - # Analysis of completed jobs - # - elif args.cmd == 'analyze': - - analyzer.analyze(args.logfile, args.clipterminals, - args.bytmp, args.bybitfield) - - else: - jobs = Job.get_running_jobs(cfg.directories.log) - - # Status report - if args.cmd == 'status': - result = "{0}\n\n{1}\n\nUpdated at: {2}".format( - reporting.status_report(jobs, get_term_width()), - reporting.summary(jobs), - datetime.datetime.today().strftime("%c"), - ) - print(result) - - # Directories report - elif args.cmd == 'dirs': - print(reporting.dirs_report(jobs, cfg.directories, cfg.scheduling, get_term_width())) - - elif args.cmd == 'interactive': - interactive.run_interactive(args.autostart_plotting) - - # Start running archival - elif args.cmd == 'archive': - print('...starting archive loop') - firstit = True - while True: - if not firstit: - print('Sleeping 60s until next iteration...') - time.sleep(60) - jobs = Job.get_running_jobs(cfg.directories.log) - firstit = False + preset_target_definitions_text = importlib.resources.read_text( + plotman_resources, "target_definitions.yaml", + ) + + cfg = configuration.get_validated_configs(config_text, config_path, preset_target_definitions_text) + + with cfg.setup(): + root_logger = logging.getLogger() + handler = logging.handlers.RotatingFileHandler( + backupCount=10, + encoding='utf-8', + filename=cfg.logging.application, + maxBytes=10_000_000, + ) + formatter = Iso8601Formatter(fmt='%(asctime)s: %(message)s') + handler.setFormatter(formatter) + root_logger.addHandler(handler) + root_logger.setLevel(logging.INFO) - archiving_status, log_message = archive.spawn_archive_process(cfg.directories, jobs) - if log_message: - print(log_message) + # + # Stay alive, spawning plot jobs + # + if args.cmd == 'plot': + print('...starting plot loop') + while True: + wait_reason = 
manager.maybe_start_new_plot(cfg.directories, cfg.scheduling, cfg.plotting, cfg.logging) + # TODO: report this via a channel that can be polled on demand, so we don't spam the console + if wait_reason: + print('...sleeping %d s: %s' % (cfg.scheduling.polling_time_s, wait_reason)) - # Debugging: show the destination drive usage schedule - elif args.cmd == 'dsched': - for (d, ph) in manager.dstdirs_to_furthest_phase(jobs).items(): - print(' %s : %s' % (d, str(ph))) + time.sleep(cfg.scheduling.polling_time_s) # - # Job control commands + # Analysis of completed jobs # - elif args.cmd in [ 'details', 'files', 'kill', 'suspend', 'resume' ]: - print(args) - - selected = [] - - # TODO: clean up treatment of wildcard - if args.idprefix[0] == 'all': - selected = jobs - else: - # TODO: allow multiple idprefixes, not just take the first - selected = manager.select_jobs_by_partial_id(jobs, args.idprefix[0]) - if (len(selected) == 0): - print('Error: %s matched no jobs.' % args.idprefix[0]) - elif len(selected) > 1: - print('Error: "%s" matched multiple jobs:' % args.idprefix[0]) - for j in selected: - print(' %s' % j.plot_id) - selected = [] - - for job in selected: - if args.cmd == 'details': - print(job.status_str_long()) - - elif args.cmd == 'files': - temp_files = job.get_temp_files() - for f in temp_files: - print(' %s' % f) - - elif args.cmd == 'kill': - # First suspend so job doesn't create new files - print('Pausing PID %d, plot id %s' % (job.proc.pid, job.plot_id)) - job.suspend() - - temp_files = job.get_temp_files() - print('Will kill pid %d, plot id %s' % (job.proc.pid, job.plot_id)) - print('Will delete %d temp files' % len(temp_files)) - conf = input('Are you sure? ("y" to confirm): ') - if (conf != 'y'): - print('canceled. If you wish to resume the job, do so manually.') - else: - print('killing...') - job.cancel() - print('cleaning up temp files...') + elif args.cmd == 'analyze': + + analyzer.analyze(args.logfile, args.clipterminals, + args.bytmp, args.bybitfield) + + else: + jobs = Job.get_running_jobs(cfg.logging.plots) + + # Status report + if args.cmd == 'status': + result = "{0}\n\n{1}\n\nUpdated at: {2}".format( + reporting.status_report(jobs, get_term_width()), + reporting.summary(jobs), + datetime.datetime.today().strftime("%c"), + ) + print(result) + + # Directories report + elif args.cmd == 'dirs': + print(reporting.dirs_report(jobs, cfg.directories, cfg.scheduling, get_term_width())) + + elif args.cmd == 'interactive': + interactive.run_interactive(autostart_plotting=args.autostart_plotting, cfg=cfg) + + # Start running archival + elif args.cmd == 'archive': + print('...starting archive loop') + firstit = True + while True: + if not firstit: + print('Sleeping 60s until next iteration...') + time.sleep(60) + jobs = Job.get_running_jobs(cfg.logging.plots) + firstit = False + + archiving_status, log_messages = archive.spawn_archive_process(cfg.directories, cfg.archiving, cfg.logging, jobs) + for log_message in log_messages: + print(log_message) + + + # Debugging: show the destination drive usage schedule + elif args.cmd == 'dsched': + for (d, ph) in manager.dstdirs_to_furthest_phase(jobs).items(): + print(' %s : %s' % (d, str(ph))) + + # + # Job control commands + # + elif args.cmd in [ 'details', 'files', 'kill', 'suspend', 'resume' ]: + print(args) + + selected = [] + + # TODO: clean up treatment of wildcard + if args.idprefix[0] == 'all': + selected = jobs + else: + # TODO: allow multiple idprefixes, not just take the first + selected = 
manager.select_jobs_by_partial_id(jobs, args.idprefix[0]) + if (len(selected) == 0): + print('Error: %s matched no jobs.' % args.idprefix[0]) + elif len(selected) > 1: + print('Error: "%s" matched multiple jobs:' % args.idprefix[0]) + for j in selected: + print(' %s' % j.plot_id) + selected = [] + + for job in selected: + if args.cmd == 'details': + print(job.status_str_long()) + + elif args.cmd == 'files': + temp_files = job.get_temp_files() for f in temp_files: - os.remove(f) - - elif args.cmd == 'suspend': - print('Suspending ' + job.plot_id) - job.suspend() - elif args.cmd == 'resume': - print('Resuming ' + job.plot_id) - job.resume() + print(' %s' % f) + + elif args.cmd == 'kill': + # First suspend so job doesn't create new files + print('Pausing PID %d, plot id %s' % (job.proc.pid, job.plot_id)) + job.suspend() + + temp_files = job.get_temp_files() + print('Will kill pid %d, plot id %s' % (job.proc.pid, job.plot_id)) + print('Will delete %d temp files' % len(temp_files)) + conf = input('Are you sure? ("y" to confirm): ') + if (conf != 'y'): + print('canceled. If you wish to resume the job, do so manually.') + else: + print('killing...') + job.cancel() + print('cleaning up temp files...') + for f in temp_files: + os.remove(f) + + elif args.cmd == 'suspend': + print('Suspending ' + job.plot_id) + job.suspend() + elif args.cmd == 'resume': + print('Resuming ' + job.plot_id) + job.resume() diff --git a/src/plotman/reporting.py b/src/plotman/reporting.py index b6dd69c4..670d82ef 100644 --- a/src/plotman/reporting.py +++ b/src/plotman/reporting.py @@ -192,7 +192,7 @@ def dst_dir_report(jobs, dstdirs, width, prefix=''): return tab.draw() def arch_dir_report(archdir_freebytes, width, prefix=''): - cells = ['%s:%5dGB' % (abbr_path(d, prefix), int(int(space) / plot_util.GB)) + cells = ['%s:%5dG' % (abbr_path(d, prefix), int(int(space) / plot_util.GB)) for (d, space) in sorted(archdir_freebytes.items())] if not cells: return '' @@ -207,16 +207,18 @@ def arch_dir_report(archdir_freebytes, width, prefix=''): return tab.draw() # TODO: remove this -def dirs_report(jobs, dir_cfg, sched_cfg, width): +def dirs_report(jobs, dir_cfg, arch_cfg, sched_cfg, width): dst_dir = dir_cfg.get_dst_directories() reports = [ tmp_dir_report(jobs, dir_cfg, sched_cfg, width), dst_dir_report(jobs, dst_dir, width), ] - if dir_cfg.archive is not None: + if arch_cfg is not None: + freebytes, archive_log_messages = archive.get_archdir_freebytes(arch_cfg) reports.extend([ 'archive dirs free space:', - arch_dir_report(archive.get_archdir_freebytes(dir_cfg.archive), width), + arch_dir_report(freebytes, width), + *archive_log_messages, ]) return '\n'.join(reports) + '\n' diff --git a/src/plotman/resources/plotman.yaml b/src/plotman/resources/plotman.yaml index d37fde56..57a0edf2 100644 --- a/src/plotman/resources/plotman.yaml +++ b/src/plotman/resources/plotman.yaml @@ -1,5 +1,17 @@ # Default/example plotman.yaml configuration file +# https://github.com/ericaltendorf/plotman/wiki/Configuration#versions +version: [1] + +logging: + # One directory in which to store all plot job logs (the STDOUT/ + # STDERR of all plot jobs). In order to monitor progress, plotman + # reads these logs on a regular basis, so using a fast drive is + # recommended. + plots: /home/chia/chia/logs + # transfers: + # application: + # Options for display and rendering user_interface: # Call out to the `stty` program to determine terminal size, instead of @@ -18,12 +30,6 @@ commands: # Where to plot and log. 
directories: - # One directory in which to store all plot job logs (the STDOUT/ - # STDERR of all plot jobs). In order to monitor progress, plotman - # reads these logs on a regular basis, so using a fast drive is - # recommended. - log: /home/chia/chia/logs - # One or more directories to use as tmp dirs for plotting. The # scheduler will use all of them and distribute jobs among them. # It assumes that IO is independent for each one (i.e., that each @@ -65,32 +71,36 @@ directories: - /mnt/dst/00 - /mnt/dst/01 - # Archival configuration. Optional; if you do not wish to run the - # archiving operation, comment this section out. - # - # Currently archival depends on an rsync daemon running on the remote - # host. - # The archival also uses ssh to connect to the remote host and check - # for available directories. Set up ssh keys on the remote host to - # allow public key login from rsyncd_user. - # Complete example: https://github.com/ericaltendorf/plotman/wiki/Archiving - archive: - rsyncd_module: plots # Define this in remote rsyncd.conf. - rsyncd_path: /plots # This is used via ssh. Should match path - # defined in the module referenced above. - rsyncd_bwlimit: 80000 # Bandwidth limit in KB/s - rsyncd_host: myfarmer - rsyncd_user: chia - # Optional index. If omitted or set to 0, plotman will archive - # to the first archive dir with free space. If specified, - # plotman will skip forward up to 'index' drives (if they exist). - # This can be useful to reduce io contention on a drive on the - # archive host if you have multiple plotters (simultaneous io - # can still happen at the time a drive fills up.) E.g., if you - # have four plotters, you could set this to 0, 1, 2, and 3, on - # the 4 machines, or 0, 1, 0, 1. - # index: 0 - +# Archival configuration. Optional; if you do not wish to run the +# archiving operation, comment this section out. Almost everyone +# should be using the archival feature. It is meant to distribute +# plots among multiple disks filling them all. This can be done both +# to local and to remote disks. +# +# As of v0.4, archiving commands are highly configurable. The basic +# configuration consists of a script for checking available disk space +# and another for actually transferring plots. Each can be specified +# as either a path to an existing script or inline script contents. +# It is expected that most people will use existing recipes and will +# adjust them by specifying environment variables that will set their +# system specific values. These can be provided to the scripts via +# the `env` key. plotman will additionally provide `source` and +# `destination` environment variables to the transfer script so it +# knows the specifically selected items to process. plotman also needs +# to be able to generally detect if a transfer process is already +# running. To be able to identify externally launched transfers, the +# process name and an argument prefix to match must be provided. Note +# that variable substitution of environment variables including those +# specified in the env key can be used in both process name and process +# argument prefix elements but that they use the python substitution +# format. 
+# +# Complete example: https://github.com/ericaltendorf/plotman/wiki/Archiving +archiving: + target: local_rsync + env: + command: rsync + site_root: /farm/sites # Plotting scheduling parameters scheduling: diff --git a/src/plotman/resources/target_definitions.yaml b/src/plotman/resources/target_definitions.yaml new file mode 100644 index 00000000..b970123c --- /dev/null +++ b/src/plotman/resources/target_definitions.yaml @@ -0,0 +1,64 @@ +target_definitions: + local_rsync: + env: + command: rsync + options: --preallocate --remove-source-files --skip-compress plot --whole-file + site_root: null + + # The disk space script must return a line for each directory + # to consider archiving to with the following form. + # + # /some/path:1000000000000 + # + # That line tells plotman that it should consider archiving + # plots to files at paths such as /some/path/theplotid.plot and + # that there is 1TB of space available for use in that + # directory. + disk_space_script: | + #!/bin/bash + set -evx + site_root_stripped=$(echo "${site_root}" | sed 's;/\+$;;') + # printf with %.0f used to handle mawk such as in Ubuntu Docker images + # otherwise it saturates and you get saturated sizes like 2147483647 + df -BK | grep " ${site_root_stripped}/" | awk '{ gsub(/K$/,"",$4); printf "%s:%.0f\n", $6, $4*1024 }' + transfer_script: | + #!/bin/bash + set -evx + "${command}" ${options} "${source}" "${destination}" + transfer_process_name: "{command}" + transfer_process_argument_prefix: "{site_root}" + rsyncd: + env: + # A value of null indicates a mandatory option + command: rsync + options: --bwlimit=80000 --preallocate --remove-source-files --skip-compress plot --whole-file + rsync_port: 873 + ssh_port: 22 + user: null + host: null + site_root: null + site: null + disk_space_script: | + #!/bin/bash + set -evx + site_root_stripped=$(echo "${site_root}" | sed 's;/\+$;;') + # printf with %.0f used to handle mawk such as in Ubuntu Docker images + # otherwise it saturates and you get saturated sizes like 2147483647 + ssh -p "${ssh_port}" "${user}@${host}" "df -BK | grep \" $(echo "${site_root_stripped}" | sed 's;/\+$;;')/\" | awk '{ gsub(/K\$/,\"\",\$4); printf \"%s:%.0f\n\", \$6, \$4*1024 }'" + transfer_script: | + #!/bin/bash + set -evx + echo Launching transfer activity + relative_path=$(realpath --relative-to="${site_root}" "${destination}") + url_root="rsync://${user}@${host}:${rsync_port}/${site}" + "${command}" ${options} "${source}" "${url_root}/${relative_path}" + transfer_process_name: "{command}" + transfer_process_argument_prefix: "rsync://{user}@{host}:{rsync_port}/{site}" +# external_script: +# env: +# some_common_value_with_a_default: /a/path +# some_mandatory option: null +# disk_space_path: /home/me/my_disk_space_script.sh +# transfer_path: /home/me/my_transfer_script.sh +# transfer_process_name: rsync +# transfer_process_argument_prefix: /the/destination/directory/root
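
For reference: a remote setup that previously used the removed `archive:` settings (rsyncd_module / rsyncd_path / rsyncd_host / rsyncd_user) would now select the bundled `rsyncd` target and fill in the null-defaulted (mandatory) env keys. A minimal sketch with illustrative host/user/path values; `site` takes over the role of the old rsyncd_module and `site_root` that of the old rsyncd_path:

    archiving:
      target: rsyncd
      env:
        user: chia
        host: myfarmer
        site_root: /plots   # remote path backing the rsyncd module, used by the disk space check
        site: plots         # module name defined in the remote rsyncd.conf
        # rsync_port: 12000 # optional; the old code hardcoded port 12000, the preset defaults to 873
        # options: --bwlimit=80000 --preallocate --remove-source-files --skip-compress plot --whole-file

At transfer time plotman adds `source` and `destination` to this env, and it uses the target's transfer_process_name and transfer_process_argument_prefix (here "rsync://{user}@{host}:{rsync_port}/{site}") to detect transfers that are already running.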