Skip to content

Commit

Permalink
cylc#119: update vs cylc#954
Browse files Browse the repository at this point in the history
  • Loading branch information
benfitzpatrick committed May 21, 2014
2 parents 68ff73b + 4746584 commit 8ba44d6
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 180 deletions.
90 changes: 48 additions & 42 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1125,98 +1125,104 @@ def process_graph_line( self, line, section, ttype, seq,
raise SuiteConfigError, "ERROR: time offsets are not legal on the right side of dependencies: " + rgroup

# now split on '&' (AND) and generate corresponding pairs
rights = re.split( '\s*&\s*', rgroup )
right_nodes = re.split( '\s*&\s*', rgroup )
else:
rights = [None]
right_nodes = [None]

new_rights = []
for r in rights:
if r:
new_right_nodes = []
for right_node in right_nodes:
if right_node:
# ignore output labels on the right (for chained
# tasks they are only meaningful on the left)
new_rights.append( re.sub( ':\w+', '', r ))
new_right_nodes.append( re.sub( ':\w+', '', right_node ))
else:
# retain None's in order to handle lone nodes on the left
new_rights.append( None )
new_right_nodes.append( None )

rights = new_rights
right_nodes = new_right_nodes

# extract task names from lexpression
nstr = re.sub( '[(|&)]', ' ', lexpression )
nstr = nstr.strip()
lnames = re.split( ' +', nstr )
left_nodes = re.split( ' +', nstr )

# detect and fail and self-dependence loops (foo => foo)
for r_name in rights:
if r_name in lnames:
print >> sys.stderr, "Self-dependence detected in '" + r_name + "':"
for right_node in right_nodes:
if right_node in left_nodes:
print >> sys.stderr, (
"Self-dependence detected in '" + right_node + "':")
print >> sys.stderr, " line:", line
print >> sys.stderr, " from:", orig_line
raise SuiteConfigError, "ERROR: self-dependence loop detected"

for rt in rights:
for right_node in right_nodes:
# foo => '!bar' means task bar should suicide if foo succeeds.
suicide = False
if rt and rt.startswith('!'):
r = rt[1:]
if right_node and right_node.startswith('!'):
right_name = right_node[1:]
suicide = True
else:
r = rt
right_name = right_node

pruned_lnames = list(lnames) # Create copy of LHS tasks.
pruned_left_nodes = list(left_nodes) # Create copy of LHS tasks.

asyncid_pattern = None
if ttype != 'cycling':
for n in lnames + [r]:
if not n:
for node in left_nodes + [right_name]:
if not node:
continue
try:
name = graphnode(
n, base_interval=seq.get_interval()).name
node_name = graphnode(
node, base_interval=seq.get_interval()).name
except GraphNodeError, x:
print >> sys.stderr, orig_line
raise SuiteConfigError, str(x)
if ttype == 'async_repeating':
if name not in self.async_repeating_tasks:
self.async_repeating_tasks.append(name)
if node_name not in self.async_repeating_tasks:
self.async_repeating_tasks.append(node_name)
m = re.match( '^ASYNCID:(.*)$', section )
asyncid_pattern = m.groups()[0]

if ttype == 'cycling':
for n in lnames:
for left_node in left_nodes:
try:
node = graphnode(
n, base_interval=seq.get_interval())
left_graph_node = graphnode(
left_node, base_interval=seq.get_interval())
except GraphNodeError, x:
print >> sys.stderr, orig_line
raise SuiteConfigError, str(x)
name = node.name
output = node.output
if name in tasks_to_prune or return_all_dependencies:
special_dependencies.append((name, output, r))
if name in tasks_to_prune:
pruned_lnames.remove(n)
left_name = left_graph_node.name
left_output = left_graph_node.output
if (left_name in tasks_to_prune or
return_all_dependencies):
special_dependencies.append(
(left_name, left_output, right_name))
if left_name in tasks_to_prune:
pruned_left_nodes.remove(left_node)

if not self.validation and not graphing_disabled:
# edges not needed for validation
self.generate_edges( lexpression, pruned_lnames, r, ttype,
self.generate_edges( lexpression, pruned_left_nodes,
right_name, ttype,
seq, suicide )
self.generate_taskdefs( orig_line, pruned_lnames, r, ttype,
self.generate_taskdefs( orig_line, pruned_left_nodes,
right_name, ttype,
section, asyncid_pattern,
seq.get_interval() )
self.generate_triggers( lexpression, pruned_lnames, r, seq,
asyncid_pattern, suicide )
self.generate_triggers( lexpression, pruned_left_nodes,
right_name, seq,
asyncid_pattern, suicide )
return special_dependencies


def generate_edges( self, lexpression, lnames, right, ttype, seq, suicide=False ):
def generate_edges( self, lexpression, left_nodes, right, ttype, seq, suicide=False ):
"""Add nodes from this graph section to the abstract graph edges structure."""
conditional = False
if re.search( '\|', lexpression ):
# plot conditional triggers differently
conditional = True

for left in lnames:
for left in left_nodes:
sasl = left in self.async_repeating_tasks
e = graphing.edge( left, right, seq, sasl, suicide, conditional )
if ttype == 'async_repeating':
Expand All @@ -1225,9 +1231,9 @@ def generate_edges( self, lexpression, lnames, right, ttype, seq, suicide=False
else:
self.edges.append(e)

def generate_taskdefs( self, line, lnames, right, ttype, section, asyncid_pattern,
def generate_taskdefs( self, line, left_nodes, right, ttype, section, asyncid_pattern,
base_interval ):
for node in lnames + [right]:
for node in left_nodes + [right]:
if not node:
# if right is None, lefts are lone nodes
# for which we still define the taskdefs
Expand Down Expand Up @@ -1306,7 +1312,7 @@ def generate_taskdefs( self, line, lnames, right, ttype, section, asyncid_patter
outp = outputx(msg, base_interval)
self.taskdefs[ name ].outputs.append( outp )

def generate_triggers( self, lexpression, lnames, right, seq, asyncid_pattern, suicide ):
def generate_triggers( self, lexpression, left_nodes, right, seq, asyncid_pattern, suicide ):
if not right:
# lefts are lone nodes; no more triggers to define.
return
Expand All @@ -1322,7 +1328,7 @@ def generate_triggers( self, lexpression, lnames, right, seq, asyncid_pattern, s

ctrig = {}
cname = {}
for left in lnames:
for left in left_nodes:
# (GraphNodeError checked above)
cycle_point = None
lnode = graphnode(left, base_interval=base_interval)
Expand Down
104 changes: 52 additions & 52 deletions lib/cylc/cycling/integer.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,98 +289,98 @@ def set_offset( self, i_offset ):
if self.p_stop > self.p_context_stop:
self.p_stop -= self.i_step

def is_on_sequence( self, p ):
"""Is point p on-sequence, disregarding bounds?"""
def is_on_sequence( self, point ):
"""Is point on-sequence, disregarding bounds?"""
if self.i_step:
return int( p - self.p_start ) % int(self.i_step) == 0
return int( point - self.p_start ) % int(self.i_step) == 0
else:
return p == self.p_start
return point == self.p_start

def _get_point_in_bounds( self, p ):
"""Return point p, or None if out of bounds."""
if p >= self.p_start and p <= self.p_stop:
return p
def _get_point_in_bounds( self, point ):
"""Return point, or None if out of bounds."""
if point >= self.p_start and point <= self.p_stop:
return point
else:
return None

def is_valid( self, p ):
"""Is point p on-sequence and in-bounds?"""
return self.is_on_sequence( p ) and \
p >= self.p_start and p <= self.p_stop
def is_valid( self, point ):
"""Is point on-sequence and in-bounds?"""
return self.is_on_sequence( point ) and \
point >= self.p_start and point <= self.p_stop

def get_prev_point( self, p ):
"""Return the previous point < p, or None if out of bounds."""
def get_prev_point( self, point ):
"""Return the previous point < point, or None if out of bounds."""
# Only used in computing special sequential task prerequisites.
if not self.i_step:
# implies a one-off task was declared sequential
# TODO - check this results in sensible behaviour
return None
i = int( p - self.p_start ) % int(self.i_step)
i = int( point - self.p_start ) % int(self.i_step)
if i:
p_prev = p - IntegerInterval(str(i))
prev_point = point - IntegerInterval(str(i))
else:
p_prev = p - self.i_step
return self._get_point_in_bounds( p_prev )

def get_nearest_prev_point(self, p):
"""Return the largest point < some arbitrary point p."""
if self.is_on_sequence(p):
return self.get_prev_point(p)
point = self._get_point_in_bounds( self.p_start )
prev_point = point - self.i_step
return self._get_point_in_bounds( prev_point )

def get_nearest_prev_point(self, point):
"""Return the largest point < some arbitrary point."""
if self.is_on_sequence(point):
return self.get_prev_point(point)
sequence_point = self._get_point_in_bounds( self.p_start )
prev_point = None
while point is not None:
if point > p:
while sequence_point is not None:
if sequence_point > point:
# Technically, >=, but we already test for this above.
break
prev_point = point
point = self.get_next_point(point)
prev_point = sequence_point
sequence_point = self.get_next_point(sequence_point)
return prev_point

def get_next_point( self, p ):
"""Return the next point > p, or None if out of bounds."""
def get_next_point( self, point ):
"""Return the next point > point, or None if out of bounds."""
if not self.i_step:
# this is a one-off sequence
# TODO - is this needed? if so, check it results in sensible behaviour
if p < self.p_start:
if point < self.p_start:
return self.p_start
else:
return None
i = int( p - self.p_start ) % int(self.i_step)
p_next = p + self.i_step - IntegerInterval(i)
return self._get_point_in_bounds( p_next )
i = int( point - self.p_start ) % int(self.i_step)
next_point = point + self.i_step - IntegerInterval(i)
return self._get_point_in_bounds( next_point )

def get_next_point_on_sequence( self, p ):
"""Return the next point > p assuming that p is on-sequence,
def get_next_point_on_sequence( self, point ):
"""Return the next point > point assuming that point is on-sequence,
or None if out of bounds."""
# This can be used when working with a single sequence.
if not self.i_step:
return None
p_next = p + self.i_step
return self._get_point_in_bounds( p_next )
next_point = point + self.i_step
return self._get_point_in_bounds( next_point )

def get_first_point( self, p ):
"""Return the first point >= to p, or None if out of bounds."""
def get_first_point( self, point ):
"""Return the first point >= to point, or None if out of bounds."""
# Used to find the first point >= suite initial cycle time.
if p <= self.p_start:
p = self._get_point_in_bounds( self.p_start )
elif self.is_on_sequence( p ):
p = self._get_point_in_bounds( p )
if point <= self.p_start:
point = self._get_point_in_bounds( self.p_start )
elif self.is_on_sequence( point ):
point = self._get_point_in_bounds( point )
else:
p = self.get_next_point( p )
return p
point = self.get_next_point( point )
return point

def get_stop_point( self ):
"""Return the last point in this sequence, or None if unbounded."""
return self.p_stop

def __eq__( self, q ):
if self.i_step and not q.i_step or \
not self.i_step and q.i_step:
def __eq__( self, other ):
if self.i_step and not other.i_step or \
not self.i_step and other.i_step:
return False
else:
return self.i_step == q.i_step and \
self.p_start == q.p_start and \
self.p_stop == q.p_stop
return self.i_step == other.i_step and \
self.p_start == other.p_start and \
self.p_stop == other.p_stop


def init_from_cfg(cfg):
Expand Down
Loading

0 comments on commit 8ba44d6

Please sign in to comment.