Skip to content

Commit

Permalink
Merge pull request #258 from altendky/rank_unknown_phases
Browse files Browse the repository at this point in the history
  • Loading branch information
altendky authored May 9, 2021
2 parents 5487b05 + 97f0059 commit 78e8825
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 36 deletions.
6 changes: 3 additions & 3 deletions src/plotman/_tests/archive_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from plotman import archive, configuration, manager
from plotman import archive, configuration, job, manager


def test_compute_priority():
assert (archive.compute_priority( (3, 1), 1000, 10) >
archive.compute_priority( (3, 6), 1000, 10) )
assert (archive.compute_priority( job.Phase(major=3, minor=1), 1000, 10) >
archive.compute_priority( job.Phase(major=3, minor=6), 1000, 10) )

def test_rsync_dest():
arch_dir = '/plotdir/012'
Expand Down
21 changes: 14 additions & 7 deletions src/plotman/_tests/manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,37 @@ def dir_cfg():
)

def test_permit_new_job_post_milestone(sched_cfg, dir_cfg):
phases = job.Phase.list_from_tuples([ (3, 8), (4, 1) ])
assert manager.phases_permit_new_job(
[ (3, 8), (4, 1) ], '/mnt/tmp/00', sched_cfg, dir_cfg)
phases, '/mnt/tmp/00', sched_cfg, dir_cfg)

def test_permit_new_job_pre_milestone(sched_cfg, dir_cfg):
phases = job.Phase.list_from_tuples([ (2, 3), (4, 1) ])
assert not manager.phases_permit_new_job(
[ (2, 3), (4, 1) ], '/mnt/tmp/00', sched_cfg, dir_cfg)
phases, '/mnt/tmp/00', sched_cfg, dir_cfg)

def test_permit_new_job_too_many_jobs(sched_cfg, dir_cfg):
phases = job.Phase.list_from_tuples([ (3, 1), (3, 2), (3, 3) ])
assert not manager.phases_permit_new_job(
[ (3, 1), (3, 2), (3, 3) ], '/mnt/tmp/00', sched_cfg, dir_cfg)
phases, '/mnt/tmp/00', sched_cfg, dir_cfg)

def test_permit_new_job_too_many_jobs_zerophase(sched_cfg, dir_cfg):
phases = job.Phase.list_from_tuples([ (3, 0), (3, 1), (3, 3) ])
assert not manager.phases_permit_new_job(
[ (3, 0), (3, 1), (3, 3) ], '/mnt/tmp/00', sched_cfg, dir_cfg)
phases, '/mnt/tmp/00', sched_cfg, dir_cfg)

def test_permit_new_job_too_many_jobs_nonephase(sched_cfg, dir_cfg):
phases = job.Phase.list_from_tuples([ (None, None), (3, 1), (3, 3) ])
assert manager.phases_permit_new_job(
[ (None, None), (3, 1), (3, 3) ], '/mnt/tmp/00', sched_cfg, dir_cfg)
phases, '/mnt/tmp/00', sched_cfg, dir_cfg)

def test_permit_new_job_override_tmp_dir(sched_cfg, dir_cfg):
phases = job.Phase.list_from_tuples([ (3, 1), (3, 2), (3, 3) ])
assert manager.phases_permit_new_job(
[ (3, 1), (3, 2), (3, 3) ], '/mnt/tmp/04', sched_cfg, dir_cfg)
phases, '/mnt/tmp/04', sched_cfg, dir_cfg)
phases = job.Phase.list_from_tuples([ (3, 1), (3, 2), (3, 3), (3, 6) ])
assert not manager.phases_permit_new_job(
[ (3, 1), (3, 2), (3, 3), (3, 6) ], '/mnt/tmp/04', sched_cfg,
phases, '/mnt/tmp/04', sched_cfg,
dir_cfg)

@patch('plotman.job.Job')
Expand Down
17 changes: 9 additions & 8 deletions src/plotman/_tests/reporting_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@
from unittest.mock import patch

from plotman import reporting
from plotman import job


def test_phases_str_basic():
assert(reporting.phases_str([(1,2), (2,3), (3,4), (4,0)]) ==
'1:2 2:3 3:4 4:0')
phases = job.Phase.list_from_tuples([(1,2), (2,3), (3,4), (4,0)])
assert reporting.phases_str(phases) == '1:2 2:3 3:4 4:0'

def test_phases_str_elipsis_1():
assert(reporting.phases_str([(1,2), (2,3), (3,4), (4,0)], 3) ==
'1:2 [+1] 3:4 4:0')
phases = job.Phase.list_from_tuples([(1,2), (2,3), (3,4), (4,0)])
assert reporting.phases_str(phases, 3) == '1:2 [+1] 3:4 4:0'

def test_phases_str_elipsis_2():
assert(reporting.phases_str([(1,2), (2,3), (3,4), (4,0)], 2) ==
'1:2 [+2] 4:0')
phases = job.Phase.list_from_tuples([(1,2), (2,3), (3,4), (4,0)])
assert reporting.phases_str(phases, 2) == '1:2 [+2] 4:0'

def test_phases_str_none():
assert(reporting.phases_str([(None, None), (2, None), (3, 0)]) ==
'?:? 2:? 3:0')
phases = job.Phase.list_from_tuples([(None, None), (3, 0)])
assert reporting.phases_str(phases) == '?:? 3:0'

def test_job_viz_empty():
assert(reporting.job_viz([]) == '1 2 3 4 ')
Expand Down
12 changes: 6 additions & 6 deletions src/plotman/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import psutil
import texttable as tt

from plotman import manager, plot_util
from plotman import job, manager, plot_util

# TODO : write-protect and delete-protect archived plots

Expand Down Expand Up @@ -62,14 +62,14 @@ def compute_priority(phase, gb_free, n_plots):
# To avoid concurrent IO, we should not touch drives that
# are about to receive a new plot. If we don't know the phase,
# ignore.
if (phase[0] and phase[1]):
if (phase == (3, 4)):
if (phase.known):
if (phase == job.Phase(3, 4)):
priority -= 4
elif (phase == (3, 5)):
elif (phase == job.Phase(3, 5)):
priority -= 8
elif (phase == (3, 6)):
elif (phase == job.Phase(3, 6)):
priority -= 16
elif (phase >= (3, 7)):
elif (phase >= job.Phase(3, 7)):
priority -= 32

# If a drive is getting full, we should prioritize it
Expand Down
41 changes: 36 additions & 5 deletions src/plotman/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# TODO do we use all these?
import argparse
import contextlib
import functools
import logging
import os
import random
Expand All @@ -11,6 +12,7 @@
from enum import Enum, auto
from subprocess import call

import attr
import click
import pendulum
import psutil
Expand Down Expand Up @@ -87,6 +89,36 @@ def __init__(self, error, help, parameters):
self.help = help
self.parameters = parameters

@functools.total_ordering
@attr.frozen(order=False)
class Phase:
major: int = 0
minor: int = 0
known: bool = True

def __lt__(self, other):
return (
(not self.known, self.major, self.minor)
< (not other.known, other.major, other.minor)
)

@classmethod
def from_tuple(cls, t):
if len(t) != 2:
raise Exception(f'phase must be created from 2-tuple: {t!r}')

if None in t and not t[0] is t[1]:
raise Exception(f'phase can not be partially known: {t!r}')

if t[0] is None:
return cls(known=False)

return cls(major=t[0], minor=t[1])

@classmethod
def list_from_tuples(cls, l):
return [cls.from_tuple(t) for t in l]

# TODO: be more principled and explicit about what we cache vs. what we look up
# dynamically from the logfile
class Job:
Expand All @@ -98,9 +130,6 @@ class Job:
plot_id = '--------'
proc = None # will get a psutil.Process

# These are dynamic, cached, and need to be udpated periodically
phase = (None, None) # Phase/subphase

def get_running_jobs(logroot, cached_jobs=()):
'''Return a list of running plot jobs. If a cache of preexisting jobs is provided,
reuse those previous jobs without updating their information. Always look for
Expand Down Expand Up @@ -137,6 +166,8 @@ def get_running_jobs(logroot, cached_jobs=()):
def __init__(self, proc, parsed_command, logroot):
'''Initialize from an existing psutil.Process object. must know logroot in order to understand open files'''
self.proc = proc
# These are dynamic, cached, and need to be udpated periodically
self.phase = Phase(known=False)

self.help = parsed_command.help
self.args = parsed_command.parameters
Expand Down Expand Up @@ -285,9 +316,9 @@ def set_phase_from_logfile(self):

if phase_subphases:
phase = max(phase_subphases.keys())
self.phase = (phase, phase_subphases[phase])
self.phase = Phase(major=phase, minor=phase_subphases[phase])
else:
self.phase = (0, 0)
self.phase = Phase(major=0, minor=0)

def progress(self):
'''Return a 2-tuple with the job phase and subphase (by reading the logfile)'''
Expand Down
9 changes: 6 additions & 3 deletions src/plotman/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ def phases_permit_new_job(phases, d, sched_cfg, dir_cfg):
'''Scheduling logic: return True if it's OK to start a new job on a tmp dir
with existing jobs in the provided phases.'''
# Filter unknown-phase jobs
phases = [ph for ph in phases if ph[0] is not None and ph[1] is not None]
phases = [ph for ph in phases if ph.known]

if len(phases) == 0:
return True

milestone = (sched_cfg.tmpdir_stagger_phase_major, sched_cfg.tmpdir_stagger_phase_minor)
milestone = job.Phase(
major=sched_cfg.tmpdir_stagger_phase_major,
minor=sched_cfg.tmpdir_stagger_phase_minor,
)
# tmpdir_stagger_phase_limit default is 1, as declared in configuration.py
if len([p for p in phases if p < milestone]) >= sched_cfg.tmpdir_stagger_phase_limit:
return False
Expand Down Expand Up @@ -84,7 +87,7 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg):
tmp_to_all_phases = [(d, job.job_phases_for_tmpdir(d, jobs)) for d in dir_cfg.tmp]
eligible = [ (d, phases) for (d, phases) in tmp_to_all_phases
if phases_permit_new_job(phases, d, sched_cfg, dir_cfg) ]
rankable = [ (d, phases[0]) if phases else (d, (999, 999))
rankable = [ (d, phases[0]) if phases else (d, job.Phase(known=False))
for (d, phases) in eligible ]

if not eligible:
Expand Down
9 changes: 5 additions & 4 deletions src/plotman/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ def abbr_path(path, putative_prefix):
else:
return path

def phase_str(phase_pair):
(ph, subph) = phase_pair
return ((str(ph) if ph is not None else '?') + ':'
+ (str(subph) if subph is not None else '?'))
def phase_str(phase):
if not phase.known:
return '?:?'

return f'{phase.major}:{phase.minor}'

def phases_str(phases, max_num=None):
'''Take a list of phase-subphase pairs and return them as a compact string'''
Expand Down

0 comments on commit 78e8825

Please sign in to comment.