From 54d870daab7909c7732311433f9429e87fc5661d Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Fri, 7 May 2021 01:07:08 -0400 Subject: [PATCH 1/6] fix: rank unknown phases --- src/plotman/manager.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/plotman/manager.py b/src/plotman/manager.py index dd7444a5..dc18f644 100644 --- a/src/plotman/manager.py +++ b/src/plotman/manager.py @@ -84,8 +84,17 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg): tmp_to_all_phases = [(d, job.job_phases_for_tmpdir(d, jobs)) for d in dir_cfg.tmp] eligible = [ (d, phases) for (d, phases) in tmp_to_all_phases if phases_permit_new_job(phases, d, sched_cfg, dir_cfg) ] - rankable = [ (d, phases[0]) if phases else (d, (999, 999)) - for (d, phases) in eligible ] + + rankable = [] + for d, phases in eligible: + if len(phases) == 0: + phase = (999, 999) + else: + if None in phases[0]: + phase = (888, 888) + else: + phase = phases[0] + rankable.append((d, phase)) if not eligible: wait_reason = 'no eligible tempdirs (%ds/%ds)' % (youngest_job_age, global_stagger) From cdbb6e3e179fe70082ffceff86019c71bb975760 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sat, 8 May 2021 01:22:09 -0400 Subject: [PATCH 2/6] refactor: introduce Phase class --- src/plotman/_tests/reporting_test.py | 17 ++++++------ src/plotman/job.py | 39 ++++++++++++++++++++++++---- src/plotman/reporting.py | 9 ++++--- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/src/plotman/_tests/reporting_test.py b/src/plotman/_tests/reporting_test.py index b46dd873..2013e2a9 100644 --- a/src/plotman/_tests/reporting_test.py +++ b/src/plotman/_tests/reporting_test.py @@ -3,23 +3,24 @@ from unittest.mock import patch from plotman import reporting +from plotman import job def test_phases_str_basic(): - assert(reporting.phases_str([(1,2), (2,3), (3,4), (4,0)]) == - '1:2 2:3 3:4 4:0') + phases = job.Phase.list_from_tuples([(1,2), (2,3), (3,4), (4,0)]) + assert reporting.phases_str(phases) == '1:2 2:3 3:4 4:0' def test_phases_str_elipsis_1(): - assert(reporting.phases_str([(1,2), (2,3), (3,4), (4,0)], 3) == - '1:2 [+1] 3:4 4:0') + phases = job.Phase.list_from_tuples([(1,2), (2,3), (3,4), (4,0)]) + assert reporting.phases_str(phases, 3) == '1:2 [+1] 3:4 4:0' def test_phases_str_elipsis_2(): - assert(reporting.phases_str([(1,2), (2,3), (3,4), (4,0)], 2) == - '1:2 [+2] 4:0') + phases = job.Phase.list_from_tuples([(1,2), (2,3), (3,4), (4,0)]) + assert reporting.phases_str(phases, 2) == '1:2 [+2] 4:0' def test_phases_str_none(): - assert(reporting.phases_str([(None, None), (2, None), (3, 0)]) == - '?:? 2:? 3:0') + phases = job.Phase.list_from_tuples([(None, None), (3, 0)]) + assert reporting.phases_str(phases) == '?:? 3:0' def test_job_viz_empty(): assert(reporting.job_viz([]) == '1 2 3 4 ') diff --git a/src/plotman/job.py b/src/plotman/job.py index 8150d400..9b684c1b 100644 --- a/src/plotman/job.py +++ b/src/plotman/job.py @@ -11,6 +11,7 @@ from enum import Enum, auto from subprocess import call +import attr import click import pendulum import psutil @@ -87,6 +88,35 @@ def __init__(self, error, help, parameters): self.help = help self.parameters = parameters +@attr.frozen(order=False) +class Phase: + major: int = 0 + minor: int = 0 + known: bool = True + + def __lt__(self, other): + return ( + (self.unknown, self.major, self.minor) + < (other.unknown, other.major, other.minor) + ) + + @classmethod + def from_tuple(cls, t): + if len(t) != 2: + raise Exception(f'phase must be created from 2-tuple: {t!r}') + + if None in t and not t[0] is t[1]: + raise Exception(f'phase can not be partially known: {t!r}') + + if t[0] is None: + return cls(known=False) + + return cls(major=t[0], minor=t[1]) + + @classmethod + def list_from_tuples(cls, l): + return [cls.from_tuple(t) for t in l] + # TODO: be more principled and explicit about what we cache vs. what we look up # dynamically from the logfile class Job: @@ -98,9 +128,6 @@ class Job: plot_id = '--------' proc = None # will get a psutil.Process - # These are dynamic, cached, and need to be udpated periodically - phase = (None, None) # Phase/subphase - def get_running_jobs(logroot, cached_jobs=()): '''Return a list of running plot jobs. If a cache of preexisting jobs is provided, reuse those previous jobs without updating their information. Always look for @@ -137,6 +164,8 @@ def get_running_jobs(logroot, cached_jobs=()): def __init__(self, proc, parsed_command, logroot): '''Initialize from an existing psutil.Process object. must know logroot in order to understand open files''' self.proc = proc + # These are dynamic, cached, and need to be udpated periodically + self.phase = Phase(known=False) self.help = parsed_command.help self.args = parsed_command.parameters @@ -285,9 +314,9 @@ def set_phase_from_logfile(self): if phase_subphases: phase = max(phase_subphases.keys()) - self.phase = (phase, phase_subphases[phase]) + self.phase = Phase(major=phase, minor=phase_subphases[phase]) else: - self.phase = (0, 0) + self.phase = Phase(major=0, minor=0) def progress(self): '''Return a 2-tuple with the job phase and subphase (by reading the logfile)''' diff --git a/src/plotman/reporting.py b/src/plotman/reporting.py index 633e4c69..19f4c087 100644 --- a/src/plotman/reporting.py +++ b/src/plotman/reporting.py @@ -13,10 +13,11 @@ def abbr_path(path, putative_prefix): else: return path -def phase_str(phase_pair): - (ph, subph) = phase_pair - return ((str(ph) if ph is not None else '?') + ':' - + (str(subph) if subph is not None else '?')) +def phase_str(phase): + if not phase.known: + return '?:?' + + return f'{phase.major}:{phase.minor}' def phases_str(phases, max_num=None): '''Take a list of phase-subphase pairs and return them as a compact string''' From 792c3fcad43526a8071519ff93b333ae9f72d757 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sat, 8 May 2021 01:34:03 -0400 Subject: [PATCH 3/6] refactor: go back a bit --- src/plotman/manager.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/src/plotman/manager.py b/src/plotman/manager.py index dc18f644..20800a6a 100644 --- a/src/plotman/manager.py +++ b/src/plotman/manager.py @@ -84,17 +84,8 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg): tmp_to_all_phases = [(d, job.job_phases_for_tmpdir(d, jobs)) for d in dir_cfg.tmp] eligible = [ (d, phases) for (d, phases) in tmp_to_all_phases if phases_permit_new_job(phases, d, sched_cfg, dir_cfg) ] - - rankable = [] - for d, phases in eligible: - if len(phases) == 0: - phase = (999, 999) - else: - if None in phases[0]: - phase = (888, 888) - else: - phase = phases[0] - rankable.append((d, phase)) + rankable = [ (d, phases[0]) if phases else (d, job.Phase(known=False)) + for (d, phases) in eligible ] if not eligible: wait_reason = 'no eligible tempdirs (%ds/%ds)' % (youngest_job_age, global_stagger) From bcc2a2a958bb0190f87e8d5ad10a4478c0aac833 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sat, 8 May 2021 01:36:29 -0400 Subject: [PATCH 4/6] fix: known not unknown --- src/plotman/job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plotman/job.py b/src/plotman/job.py index 9b684c1b..205d3509 100644 --- a/src/plotman/job.py +++ b/src/plotman/job.py @@ -96,8 +96,8 @@ class Phase: def __lt__(self, other): return ( - (self.unknown, self.major, self.minor) - < (other.unknown, other.major, other.minor) + (not self.known, self.major, self.minor) + < (not other.known, other.major, other.minor) ) @classmethod From f5e4c34fb33a74f954e56ed82ff3dea54f3b8d57 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sat, 8 May 2021 01:41:20 -0400 Subject: [PATCH 5/6] fix: more phase corners --- src/plotman/archive.py | 12 ++++++------ src/plotman/manager.py | 7 +++++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/plotman/archive.py b/src/plotman/archive.py index 62059ec0..059a48e5 100644 --- a/src/plotman/archive.py +++ b/src/plotman/archive.py @@ -11,7 +11,7 @@ import psutil import texttable as tt -from plotman import manager, plot_util +from plotman import job, manager, plot_util # TODO : write-protect and delete-protect archived plots @@ -62,14 +62,14 @@ def compute_priority(phase, gb_free, n_plots): # To avoid concurrent IO, we should not touch drives that # are about to receive a new plot. If we don't know the phase, # ignore. - if (phase[0] and phase[1]): - if (phase == (3, 4)): + if (phase.known): + if (phase == job.Phase(3, 4)): priority -= 4 - elif (phase == (3, 5)): + elif (phase == job.Phase(3, 5)): priority -= 8 - elif (phase == (3, 6)): + elif (phase == job.Phase(3, 6)): priority -= 16 - elif (phase >= (3, 7)): + elif (phase >= job.Phase(3, 7)): priority -= 32 # If a drive is getting full, we should prioritize it diff --git a/src/plotman/manager.py b/src/plotman/manager.py index 20800a6a..e0ed2851 100644 --- a/src/plotman/manager.py +++ b/src/plotman/manager.py @@ -47,12 +47,15 @@ def phases_permit_new_job(phases, d, sched_cfg, dir_cfg): '''Scheduling logic: return True if it's OK to start a new job on a tmp dir with existing jobs in the provided phases.''' # Filter unknown-phase jobs - phases = [ph for ph in phases if ph[0] is not None and ph[1] is not None] + phases = [ph for ph in phases if ph.known] if len(phases) == 0: return True - milestone = (sched_cfg.tmpdir_stagger_phase_major, sched_cfg.tmpdir_stagger_phase_minor) + milestone = job.Phase( + major=sched_cfg.tmpdir_stagger_phase_major, + minor=sched_cfg.tmpdir_stagger_phase_minor, + ) # tmpdir_stagger_phase_limit default is 1, as declared in configuration.py if len([p for p in phases if p < milestone]) >= sched_cfg.tmpdir_stagger_phase_limit: return False From e37f50a05194c49c7206be3f5a43b056268bfef4 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sat, 8 May 2021 02:01:22 -0400 Subject: [PATCH 6/6] fix: and yet more --- src/plotman/_tests/archive_test.py | 6 +++--- src/plotman/_tests/manager_test.py | 21 ++++++++++++++------- src/plotman/job.py | 2 ++ 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/plotman/_tests/archive_test.py b/src/plotman/_tests/archive_test.py index 9caa2abe..4b98ac90 100755 --- a/src/plotman/_tests/archive_test.py +++ b/src/plotman/_tests/archive_test.py @@ -1,9 +1,9 @@ -from plotman import archive, configuration, manager +from plotman import archive, configuration, job, manager def test_compute_priority(): - assert (archive.compute_priority( (3, 1), 1000, 10) > - archive.compute_priority( (3, 6), 1000, 10) ) + assert (archive.compute_priority( job.Phase(major=3, minor=1), 1000, 10) > + archive.compute_priority( job.Phase(major=3, minor=6), 1000, 10) ) def test_rsync_dest(): arch_dir = '/plotdir/012' diff --git a/src/plotman/_tests/manager_test.py b/src/plotman/_tests/manager_test.py index 2f425955..cca37596 100755 --- a/src/plotman/_tests/manager_test.py +++ b/src/plotman/_tests/manager_test.py @@ -27,30 +27,37 @@ def dir_cfg(): ) def test_permit_new_job_post_milestone(sched_cfg, dir_cfg): + phases = job.Phase.list_from_tuples([ (3, 8), (4, 1) ]) assert manager.phases_permit_new_job( - [ (3, 8), (4, 1) ], '/mnt/tmp/00', sched_cfg, dir_cfg) + phases, '/mnt/tmp/00', sched_cfg, dir_cfg) def test_permit_new_job_pre_milestone(sched_cfg, dir_cfg): + phases = job.Phase.list_from_tuples([ (2, 3), (4, 1) ]) assert not manager.phases_permit_new_job( - [ (2, 3), (4, 1) ], '/mnt/tmp/00', sched_cfg, dir_cfg) + phases, '/mnt/tmp/00', sched_cfg, dir_cfg) def test_permit_new_job_too_many_jobs(sched_cfg, dir_cfg): + phases = job.Phase.list_from_tuples([ (3, 1), (3, 2), (3, 3) ]) assert not manager.phases_permit_new_job( - [ (3, 1), (3, 2), (3, 3) ], '/mnt/tmp/00', sched_cfg, dir_cfg) + phases, '/mnt/tmp/00', sched_cfg, dir_cfg) def test_permit_new_job_too_many_jobs_zerophase(sched_cfg, dir_cfg): + phases = job.Phase.list_from_tuples([ (3, 0), (3, 1), (3, 3) ]) assert not manager.phases_permit_new_job( - [ (3, 0), (3, 1), (3, 3) ], '/mnt/tmp/00', sched_cfg, dir_cfg) + phases, '/mnt/tmp/00', sched_cfg, dir_cfg) def test_permit_new_job_too_many_jobs_nonephase(sched_cfg, dir_cfg): + phases = job.Phase.list_from_tuples([ (None, None), (3, 1), (3, 3) ]) assert manager.phases_permit_new_job( - [ (None, None), (3, 1), (3, 3) ], '/mnt/tmp/00', sched_cfg, dir_cfg) + phases, '/mnt/tmp/00', sched_cfg, dir_cfg) def test_permit_new_job_override_tmp_dir(sched_cfg, dir_cfg): + phases = job.Phase.list_from_tuples([ (3, 1), (3, 2), (3, 3) ]) assert manager.phases_permit_new_job( - [ (3, 1), (3, 2), (3, 3) ], '/mnt/tmp/04', sched_cfg, dir_cfg) + phases, '/mnt/tmp/04', sched_cfg, dir_cfg) + phases = job.Phase.list_from_tuples([ (3, 1), (3, 2), (3, 3), (3, 6) ]) assert not manager.phases_permit_new_job( - [ (3, 1), (3, 2), (3, 3), (3, 6) ], '/mnt/tmp/04', sched_cfg, + phases, '/mnt/tmp/04', sched_cfg, dir_cfg) @patch('plotman.job.Job') diff --git a/src/plotman/job.py b/src/plotman/job.py index 205d3509..5116c305 100644 --- a/src/plotman/job.py +++ b/src/plotman/job.py @@ -1,6 +1,7 @@ # TODO do we use all these? import argparse import contextlib +import functools import logging import os import random @@ -88,6 +89,7 @@ def __init__(self, error, help, parameters): self.help = help self.parameters = parameters +@functools.total_ordering @attr.frozen(order=False) class Phase: major: int = 0