Skip to content

Commit

Permalink
Update example/recipe
Browse files Browse the repository at this point in the history
Co-authored-by: michelwi <[email protected]>
  • Loading branch information
critias and michelwi committed Mar 3, 2023
1 parent 7216081 commit 90339e6
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
6 changes: 3 additions & 3 deletions example/config/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ async def main():

# Count lines, lines is a Variable meaning once it's computed its value can be accessed using .get()
# or by converting it into a string
lines = tools.WordCount(sentences).lines
num_lines = tools.WordCount(sentences).lines

# You can run computations on these variables even though they are not computed yet.
# The requested computation is stored in a wrapper object and only resolved when .get() is called
# or the object is converted into a string. An exception is raised if you call get() on an unfinished Variable
# which doesn't have a backup value.
middle = lines // 2
middle = num_lines // 2
first_half = tools.Head(sentences, middle).out
tk.register_output('first_half', first_half)

# Tell Sisyphus that this output is needed and should be linked inside the output directory
tk.register_output('first_half', first_half)
tk.register_output('sentences', sentences)

# Split each paragraph into a new line and again register output
Expand Down
8 changes: 5 additions & 3 deletions example/recipe/splitter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from sisyphus import *
import os
from typing import Iterable, List

# All paths created by RelPath will be relative to the current directory
# RelPath('splitter.py') points therefore at the splitter.py file inside this directory
RelPath = setup_path(__package__)


class ParagraphSplitter(Job):
def __init__(self, text, splitter=RelPath('splitter.py')):
def __init__(self, text: tk.Path, splitter: tk.Path = RelPath('splitter.py')):
assert text
assert isinstance(text, tk.Path)

Expand All @@ -17,7 +19,7 @@ def __init__(self, text, splitter=RelPath('splitter.py')):

# It's unclear how many outputs will be created by this job
# A way around it is to compute all output paths after this job has finished
async def outputs(self):
async def outputs(self) -> List[tk.Path]:
await tk.async_run(self.splitted_dir)
out = []
for path in sorted(os.listdir(str(self.splitted_dir))):
Expand All @@ -28,5 +30,5 @@ async def outputs(self):
def run(self):
self.sh('cat {text} | {splitter} {splitted_dir}/{out_prefix}')

def tasks(self):
def tasks(self) -> Iterable[Task]:
yield Task('run', 'run')
15 changes: 7 additions & 8 deletions example/recipe/tools.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
from sisyphus import *
import os
from typing import List
from typing import List, Any, Dict, Iterable, Union


class Pipeline(Job):
""" This job takes a text file as input and pipes it through all given commands """

# Everything given to the constructor will be used to compute a hash for this job.
def __init__(self, text: tk.Path, pipeline: List):
def __init__(self, text: tk.Path, pipeline: List[Any]):
# You can validating the inputs to spot errors earlier
assert text, "No text given"
assert isinstance(text, tk.Path), "Given input"
assert pipeline

#
self.text = text
self.pipeline = pipeline

Expand Down Expand Up @@ -42,7 +41,7 @@ def tasks(self):
rqmt={'time': time, 'mem': 2, 'cpu': 2}, # Requirements needed to run this task
tries=3)] # 3 tries if pipe result is empty or pipe failed

# This function will be call when the job is started
# This function will be called when the job is started
def run(self):
self.pipe = ' | '.join([str(i) for i in self.pipeline])
# self.sh will run the given string in a shell. Before executing it the string format function will be called
Expand All @@ -54,7 +53,7 @@ def run(self):

# Jobs are regular python classes, meaning you can just subclass an existing class to reuse it's code
class Head(Pipeline):
def __init__(self, text, length, check_output_length=True):
def __init__(self, text: tk.Path, length: Union[int, str, tk.Delayed], check_output_length: bool = True):
# tk.Delayed takes any object and converts it into a Delayed object which allows us to define operations
# which will only be computed at runtime. Here we want to delay formatting since the value of length
# isn't known before length is computed.
Expand All @@ -71,15 +70,15 @@ def run(self):
# This is how the computed hash can be modified, since `check_output_length` does not change the output
# of this job we can exclude it from the hash computation
@classmethod
def hash(cls, parsed_args):
def hash(cls, parsed_args: Dict[str, Any]) -> str:
args = parsed_args.copy()
del args['check_output_length']
return super().hash(args)


# Here is a Job with multiple Variables as output
class WordCount(Job):
def __init__(self, text):
def __init__(self, text: tk.Path):
self.text = text
self.character = self.output_var('char')
self.lines = self.output_var('lines')
Expand All @@ -93,5 +92,5 @@ def run(self):
self.character.set(int(c))

# Here is an example of task returning a generator
def tasks(self):
def tasks(self) -> Iterable[Task]:
yield Task('run')

0 comments on commit 90339e6

Please sign in to comment.