From 7216081a809d2a3ba54bcf0cd6ca436e9ce2df2f Mon Sep 17 00:00:00 2001 From: Jan-Thorsten Peter Date: Thu, 2 Mar 2023 21:41:05 +0100 Subject: [PATCH 1/2] Replaced example folder with a simpler workflow --- example/README.md | 9 ++ example/config/__init__.py | 43 -------- example/config/dump.py | 23 ----- example/config/workflow.py | 49 +++++++++ example/data/5lines.txt | 5 - example/input/lorem_upsum.txt | 21 ++++ example/recipe/__init__.py | 0 example/recipe/direct_load.py | 19 ---- example/recipe/math.py | 64 ------------ example/recipe/parallel/__init__.py | 101 ------------------- example/recipe/pipeline/__init__.py | 137 -------------------------- example/recipe/pipeline/background.sh | 6 -- example/recipe/pipeline/external.py | 33 ------- example/recipe/pipeline/starter.sh | 4 - example/recipe/splitter/__init__.py | 32 ++++++ example/recipe/splitter/splitter.py | 24 +++++ example/recipe/tools.py | 97 ++++++++++++++++++ example/settings.py | 6 +- 18 files changed, 236 insertions(+), 437 deletions(-) create mode 100644 example/README.md delete mode 100644 example/config/__init__.py delete mode 100644 example/config/dump.py create mode 100644 example/config/workflow.py delete mode 100644 example/data/5lines.txt create mode 100644 example/input/lorem_upsum.txt delete mode 100644 example/recipe/__init__.py delete mode 100644 example/recipe/direct_load.py delete mode 100644 example/recipe/math.py delete mode 100644 example/recipe/parallel/__init__.py delete mode 100644 example/recipe/pipeline/__init__.py delete mode 100755 example/recipe/pipeline/background.sh delete mode 100644 example/recipe/pipeline/external.py delete mode 100755 example/recipe/pipeline/starter.sh create mode 100644 example/recipe/splitter/__init__.py create mode 100755 example/recipe/splitter/splitter.py create mode 100644 example/recipe/tools.py diff --git a/example/README.md b/example/README.md new file mode 100644 index 0000000..8dbae21 --- /dev/null +++ b/example/README.md @@ -0,0 +1,9 @@ +This folder contains a simple example workflow. + +You can start the workflow by running: +`sis manager config/workflow.main` + +- `config/workflow.py` contains the workflow definition, start here reading the code and comments +- `recipe/` contains the modules which define the actual jobs +- - `tools.py` continue reading here +- - `splitter/__init__.py` read this file last diff --git a/example/config/__init__.py b/example/config/__init__.py deleted file mode 100644 index 67dda87..0000000 --- a/example/config/__init__.py +++ /dev/null @@ -1,43 +0,0 @@ -from sisyphus import * -from recipe import parallel -from recipe import pipeline - - -@tk.block(cache=True) -def init(input_file): - input_file = tk.input_path(input_file) - - spliter = parallel.LineSpliter(input_file) - spliter.set_rqmt('run', rqmt={'cpu': 1, 'mem': 2, 'gpu': 1}) - return parallel.Parallel(spliter.out, pipeline.pipeline) - - -def main(): - input_data = tk.Path('data/5lines.txt', tags={'5lines'}) - tk.register_output('result', init(input_data).out, export_graph=True) - - -async def async_main(): - input_data = tk.Path('data', tags={'5lines'}) - input_data = input_data.join_right('5lines.txt') - spliter = parallel.LineSpliter(input_data) - # Test if requesting gpu works - spliter.set_rqmt('run', rqmt={'cpu': 1, 'mem': 2, 'gpu': 1}) - await tk.async_run(spliter.out) - - check_block = tk.sub_block('checker') - parallel_out = [] - for count, path in enumerate(spliter.out.get()): - with tk.block('pipeline_%i' % count): - p = pipeline.pipeline(path, check_block, input_data.tags | {'pipeline_%i' % count}) - parallel_out.append(p) - - await tk.async_run(parallel_out) - for p in parallel_out: - print(p.score.get(), p.out) - -if __name__ == '__main__': - input_data = tk.Path('data/5lines.txt', tags={'5lines'}) - output = init(input_data).out - tk.run(output) - tk.sh('cp %s myoutput' % output) diff --git a/example/config/dump.py b/example/config/dump.py deleted file mode 100644 index 9304083..0000000 --- a/example/config/dump.py +++ /dev/null @@ -1,23 +0,0 @@ -import logging -from sisyphus import * - -DUMP_DIR = 'dump' - - -def dump(obj, name): - filename = '%s/%s.pkl' % (DUMP_DIR, name) - outfile_dir = os.path.dirname(filename) - if not os.path.isdir(outfile_dir): - os.makedirs(outfile_dir) - if os.path.isfile(filename): - logging.warning("Skip dump since %s is already dump here: %s" % (name, filename)) - else: - with gzip.open(filename, 'wb') as f: - pickle.dump(obj, f) - - -def load(name): - filename = '%s/%s.pkl' % (DUMP_DIR, name) - fopen = gzip.open(filename, 'rb') if zipped(filename) else open(filename, 'rb') - with fopen as f: - return pickle.load(f) diff --git a/example/config/workflow.py b/example/config/workflow.py new file mode 100644 index 0000000..d351a5b --- /dev/null +++ b/example/config/workflow.py @@ -0,0 +1,49 @@ +# This imports all things typically used sisyphus +from sisyphus import * +from recipe import tools +from recipe import splitter + + +# Workflow example, it is possible to have multiple workflows in the same file. Since this function is named `main` +# you can call it by running sis manager config/workflow.main +async def main(): + # Initialize input data, tags are optional. The have no influence on the workflow, + # but make it easier to track which input influenced a job + input_data = tk.Path('input/lorem_upsum.txt', tags={'lorem_upsum'}) + + # Create new lines after . and remove empty spaces at the beginning and end of each line + sentences = tools.Pipeline(input_data, ["sed 's:\\.:\\n:g'", "sed 's:^ *::;s: *$::'"]).out + + # Count lines, lines is a Variable meaning once it's computed its value can be accessed using .get() + # or by converting it into a string + lines = tools.WordCount(sentences).lines + + # You can run computations on these variables even though they are not computed yet. + # The requested computation is stored in a wrapper object and only resolved when .get() is called + # or the object is converted into a string. An exception is raised if you call get() on an unfinished Variable + # which doesn't have a backup value. + middle = lines // 2 + first_half = tools.Head(sentences, middle).out + tk.register_output('first_half', first_half) + + # Tell Sisyphus that this output is needed and should be linked inside the output directory + tk.register_output('sentences', sentences) + + # Split each paragraph into a new line and again register output + paragraphs_job = splitter.ParagraphSplitter(sentences) + + # paragraphs_job.outputs() is a coroutine, the `await` keyword will make sure that the + # workflow will be stopped at this point and only continue once the paragraphs_job is finished + paragraphs = await paragraphs_job.outputs() + wc_per_paragraph = [tools.WordCount(paragraph) for paragraph in paragraphs] + + for i, wc in enumerate(wc_per_paragraph): + # You can also register variables as output: + tk.register_output('paragraph.%02i.words' % i, wc.words) + + # All jobs inside the wc_per_paragraph list will be computed after this line + await tk.async_run(wc_per_paragraph) + + # We can now get the final value of all variables + max_word_per_paragraph = max(wc.words.get() for wc in wc_per_paragraph) + print(f"The largest paragraph has {max_word_per_paragraph} words") diff --git a/example/data/5lines.txt b/example/data/5lines.txt deleted file mode 100644 index 26f9a27..0000000 --- a/example/data/5lines.txt +++ /dev/null @@ -1,5 +0,0 @@ -under intellectual property rights (other than patent or trademark) -Licensable by such Contributor to use, reproduce, make available, -modify, display, perform, distribute, and otherwise exploit its -Contributions, either on an unmodified basis, with Modifications, or -as part of a Larger Work; and diff --git a/example/input/lorem_upsum.txt b/example/input/lorem_upsum.txt new file mode 100644 index 0000000..0a023ab --- /dev/null +++ b/example/input/lorem_upsum.txt @@ -0,0 +1,21 @@ +Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. + +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. + +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. + +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. + +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis. + +At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat. + +Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. + +Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. + +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. + +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. + +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo diff --git a/example/recipe/__init__.py b/example/recipe/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/example/recipe/direct_load.py b/example/recipe/direct_load.py deleted file mode 100644 index 3cc095f..0000000 --- a/example/recipe/direct_load.py +++ /dev/null @@ -1,19 +0,0 @@ -from recipe import parallel -from recipe import pipeline - -from sisyphus import * -RelPath = tk.Path - - -@tk.block(cache=True) -def init(input_file): - input_file = tk.input_path(input_file) - - spliter = parallel.LineSpliter(input_file) - spliter.set_rqmt('run', rqmt={'cpu': 3, 'mem': 2}) - return parallel.Parallel(spliter.out, pipeline.pipeline) - - -def starter(path, tags, output): - input_data = RelPath(path, tags=tags) - tk.register_output(output, init(input_data).out, export_graph=True) diff --git a/example/recipe/math.py b/example/recipe/math.py deleted file mode 100644 index c5fa68c..0000000 --- a/example/recipe/math.py +++ /dev/null @@ -1,64 +0,0 @@ -from sisyphus import * - - -class Add(Job): - """ Simple example with an input and an output file """ - - def __init__(self, a, b): - self.a = a - self.b = b - self.out = self.output_var('result', backup='?') - - def run(self): - a = self.a - if isinstance(a, tk.Variable): - a = a.get() - b = self.b - if isinstance(b, tk.Variable): - b = b.get() - self.out.set(a + b) - - def tasks(self): - yield Task('run', mini_task=True) - - -class Multiply(Job): - """ Simple example with an input and an output file """ - - def __init__(self, a, b): - self.a = a - self.b = b - self.out = self.output_var('result', backup='?') - - def run(self): - a = self.a - if isinstance(a, tk.Variable): - a = a.get() - b = self.b - if isinstance(b, tk.Variable): - b = b.get() - self.out.set(a * b) - - def tasks(self): - yield Task('run', mini_task=True) - - -class Power(Job): - """ Simple example with an input and an output file """ - - def __init__(self, a, b): - self.a = a - self.b = b - self.out = self.output_var('result', backup='?') - - def run(self): - a = self.a - if isinstance(a, tk.Variable): - a = a.get() - b = self.b - if isinstance(b, tk.Variable): - b = b.get() - self.out.set(a**b) - - def tasks(self): - yield Task('run', mini_task=True) diff --git a/example/recipe/parallel/__init__.py b/example/recipe/parallel/__init__.py deleted file mode 100644 index 7f3eea5..0000000 --- a/example/recipe/parallel/__init__.py +++ /dev/null @@ -1,101 +0,0 @@ -import os -import gzip -import glob -import sisyphus.hash -import time - -from sisyphus import * - - -class LineSpliter(Job): - __sis_auto_cleanup__ = False # disable automatic cleanup for testing - - def __init__(self, text): - assert not tk.running_in_worker() - self.text = text - self.out_dir = self.output_path('out_dir', True) - self.out = self.output_var('out_path', pickle=True) - - def run(self): - assert tk.running_in_worker() - start_time = time.time() - waste = [] - # just waste some computation time and memory - while time.time() - start_time < 10: - waste.append(len(waste)) - - if tk.zipped(self.text): - fopen = gzip.open - else: - fopen = open - - # split file into multiple files - out_path = [] - with fopen(str(self.text)) as f: - line_nr = 0 - for line_nr, line in enumerate(f, 1): - filename = os.path.join(str(self.out_dir), 'txt.%i.gz' % line_nr) - with gzip.open(filename, 'w') as out: - print(out, filename, line, line_nr) - out.write(line.encode()) - out_path.append(self.output_path('out_dir/txt.%i.gz' % line_nr, True)) - self.out.set(out_path) - - def tasks(self): - yield Task('run', rqmt={'cpu': 1, 'gpu': 0}) - - @staticmethod - def hash(text): - return sisyphus.hash.short_hash(str(text), 18) - - -class Parallel(Job): - def __init__(self, split_path, parallel_function): - self.split_path = split_path - self.parallel_function = parallel_function - self.parallel_out = [] - self.out = self.output_path('out') - split_path.keep_value(50) - - def update(self): - if len(self.parallel_out) == 0: # only due this once - check_block = tk.sub_block('checker') - for count, path in enumerate(self.split_path.get()): - with tk.block('pipeline_%i' % count): - pipeline = self.parallel_function(path, check_block, - self.tags | {'pipeline_%i' % count}) - self.add_input(pipeline.out) - self.add_input(pipeline.score) - self.parallel_out.append(pipeline) - - # uncomment next line to test if exporting the graph warns from unexported jobs behind sets - # self.parallel_out = set(self.parallel_out) - - def run(self, pos): - self.sh('echo > run.{pos}', pos=pos) - for pipeline in self.parallel_out: - self.sh('cat {text} | gzip -d', text=pipeline.out) - self.sh('cat {score} | gzip -df', score=pipeline.score) - - def setup(self, pos): - self.sh('echo > setup.{pos}', pos=pos) - - def finalize(self): - self.sh('ls setup.*') - self.sh('ls run.*') - - max_score = -1 - max_text = None - for pipeline in self.parallel_out: - score = int(self.sh('cat {score} | gzip -df', True, score=pipeline.score)) - if score > max_score or max_text is None: - max_score = score - max_text = pipeline.out - - self.sh('echo {score} > {out}', score=max_score) - self.sh('cat {text} | gzip -d >> {out}', text=max_text) - - def tasks(self): - yield Task(start='setup', resume='setup', rqmt={}, args=range(1, 9), tries=4) - yield Task(start='run', resume='run', rqmt={}, args=range(1, 9)) - yield Task(start='finalize', resume='finalize') diff --git a/example/recipe/pipeline/__init__.py b/example/recipe/pipeline/__init__.py deleted file mode 100644 index 10f9718..0000000 --- a/example/recipe/pipeline/__init__.py +++ /dev/null @@ -1,137 +0,0 @@ -import os -from recipe.pipeline import external - -from sisyphus import * -Path = setup_path(__package__) - - -class Simple(Job): - - """ Simple example with an input and an output file """ - - def __init__(self, text): - self.text = text - self.out = self.output_path('out.gz') - - def run(self): - self.sh('echo "Pipe1" > tmp') - self.sh('cat {text} | gzip -d | cat - tmp | gzip > {out}') - - def tasks(self): - yield Task('run', rqmt={'cpu': 2}) - - -class Arguments(Job): - - """ Parallel execution of multiple arguments """ - - def __init__(self, text): - self.text = text - self.out = self.output_path('out.gz') - - def run(self, num, message): - with tk.mktemp() as self.tmp: - print(self.tmp) - self.sh('mkdir {tmp}') - self.num = num - self.sh('echo "Pipe2: {message}" > tmp.{num}', message=message) - if num == 0: - self.sh('cat tmp.{num} | gzip > {out}') - - def finalize(self, args): - self.sh('cat {text} | gzip -d > tmp') - for num, message in args: - self.sh('cat tmp.{num} >> tmp', num=num) - self.sh('cat tmp | gzip > {out}') - - def tasks(self): - args = list(enumerate(['foo', 'bar', 'code', 'abc'], 1)) - yield Task('run', args=args) - yield Task('finalize', args=[[args]]) - - -class Simple2(Job): - - def __init__(self, text): - self.text = text - self.out = self.output_path('out.gz') - - def run(self): - self.sh('echo "Pipe4" > tmp') - self.sh('cat {text} | gzip -d | cat - tmp | gzip > {out}') - - def tasks(self): - yield Task('run') - - -class FinishedParts(Job): - - def __init__(self, text): - self.text = text - self.out1 = self.output_path('out1.gz') - self.out2 = self.output_path('out2.gz') - self.out3 = self.output_path('out3.gz') - - def run(self): - self.sh('echo "FinishedParts1" > tmp') - self.sh('cat {text} | gzip -d | cat - tmp | gzip > {out1}') - self.sh('sleep 10') - self.sh('echo "FinishedParts2" > tmp') - self.sh('cat {out1} | gzip -d | cat - tmp | gzip > {out2}') - self.sh('sleep 10') - self.sh('echo "FinishedParts3" > tmp') - self.sh('cat {out2} | gzip -d | cat - tmp | gzip > {out3}') - - def tasks(self): - yield Task('run') - - def path_available(self, path): - assert isinstance(path, tk.Path) - assert path.creator == self - return os.path.isfile(str(path)) - - -class SimplePart1(Simple2): - pass # just giving it a new name - - -class SimplePart2(Simple2): - pass # just giving it a new name - - -class SimplePart3(Simple2): - pass # just giving it a new name - - -class Merger(Job): - def __init__(self, inputs): - self.inputs = inputs - self.out = self.output_path('merger.gz') - - def run(self): - self.files = ' '.join(str(i) for i in self.inputs) - self.sh('cat {files} | gzip -df | gzip > {out}') - - def tasks(self): - yield Task('run') - - -class Piepline: - - def __init__(self, text, score): - self.text = text - self.score = score - self.out = text - self.score = score - - -def pipeline(text, check_block, tags): - pipe1 = Simple(text, sis_tags=tags) - pipe2 = Arguments(pipe1.out) - with check_block: - pipe3 = external.CheckState(pipe2.out) - parts = FinishedParts(Simple2(pipe2.out).out) - pipe4 = Merger([SimplePart1(parts.out1).out, - SimplePart2(parts.out2).out, - SimplePart3(parts.out3).out]) - return Piepline(pipe4.out, pipe3.out) diff --git a/example/recipe/pipeline/background.sh b/example/recipe/pipeline/background.sh deleted file mode 100755 index 8bc9cd0..0000000 --- a/example/recipe/pipeline/background.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash - -SCORE=$1 -OUT=$2 -sleep 10 -echo $SCORE >> $OUT diff --git a/example/recipe/pipeline/external.py b/example/recipe/pipeline/external.py deleted file mode 100644 index a6eb220..0000000 --- a/example/recipe/pipeline/external.py +++ /dev/null @@ -1,33 +0,0 @@ -import random -import os -import time - -from sisyphus import * -RelPath = setup_path(__package__) - - -class CheckState(Job): - """Example how binaries that detach from the main process and run in the background could be handled""" - def __init__(self, text, binary=RelPath('starter.sh')): - self.text = text - self.binary = binary - self.out = self.output_path('score') - - def run(self): - # create 'random' number - random.seed(self.job_id() + ' run') - score = random.randint(0, 100000) - print(self.binary) - print(self.text) - print(self.out) - self.sh('{binary} {score} {out}', score=score) - time.sleep(10) - assert os.path.isfile('pid') - while True: - if int(self.sh('ps aux | awk \'{{print $2}}\' | grep -w `cat pid` | wc -l', True, pipefail=False)) == 1: - time.sleep(5) - else: - break - - def tasks(self): - yield Task('run', mini_task=True) diff --git a/example/recipe/pipeline/starter.sh b/example/recipe/pipeline/starter.sh deleted file mode 100755 index db176ef..0000000 --- a/example/recipe/pipeline/starter.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash - -nohup $(dirname $0)/background.sh $*& -echo $! > pid diff --git a/example/recipe/splitter/__init__.py b/example/recipe/splitter/__init__.py new file mode 100644 index 0000000..1f1678b --- /dev/null +++ b/example/recipe/splitter/__init__.py @@ -0,0 +1,32 @@ +from sisyphus import * +import os +# All paths created by RelPath will be relative to the current directory +# RelPath('splitter.py') points therefore at the splitter.py file inside this directory +RelPath = setup_path(__package__) + + +class ParagraphSplitter(Job): + def __init__(self, text, splitter=RelPath('splitter.py')): + assert text + assert isinstance(text, tk.Path) + + self.text = text + self.splitter = splitter + self.out_prefix = 'splitted.' + self.splitted_dir = self.output_path('out', True) + + # It's unclear how many outputs will be created by this job + # A way around it is to compute all output paths after this job has finished + async def outputs(self): + await tk.async_run(self.splitted_dir) + out = [] + for path in sorted(os.listdir(str(self.splitted_dir))): + if path.startswith(self.out_prefix): + out.append(self.output_path('out/' + path)) + return out + + def run(self): + self.sh('cat {text} | {splitter} {splitted_dir}/{out_prefix}') + + def tasks(self): + yield Task('run', 'run') diff --git a/example/recipe/splitter/splitter.py b/example/recipe/splitter/splitter.py new file mode 100755 index 0000000..786c4ac --- /dev/null +++ b/example/recipe/splitter/splitter.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +# Simple example script to split a text file into paragraphs +import sys + + +def main(): + outprefix = sys.argv[1] + counter = 0 + current_output = None + + for line in sys.stdin: + if not line.strip(): + if current_output: + current_output.close() + current_output = None + else: + if not current_output: + current_output = open("%s%03i" % (outprefix, counter), 'wt') + counter += 1 + current_output.write(line) + + +if __name__ == '__main__': + main() diff --git a/example/recipe/tools.py b/example/recipe/tools.py new file mode 100644 index 0000000..daccc49 --- /dev/null +++ b/example/recipe/tools.py @@ -0,0 +1,97 @@ +from sisyphus import * +import os +from typing import List + + +class Pipeline(Job): + """ This job takes a text file as input and pipes it through all given commands """ + + # Everything given to the constructor will be used to compute a hash for this job. + def __init__(self, text: tk.Path, pipeline: List): + # You can validating the inputs to spot errors earlier + assert text, "No text given" + assert isinstance(text, tk.Path), "Given input" + assert pipeline + + # + self.text = text + self.pipeline = pipeline + + self.out = self.output_path('out.txt') + + # Task should return a list, a generator, or something else Sisyphus can iterator over containing all + # tasks of this job. In this example the job has only one task calling the `run` function + def tasks(self): + # Getting the size of the given input file to estimate how much time we need. + # tasks() is only called when all inputs are available, we can therefore assume all input files exist. + size = os.path.getsize(self.text.get_path()) + + if size <= 1024 * 1024: + time = 1 + elif size <= 1024 * 1024 * 1024: + time = 4 + else: + time = 8 + + return [Task('run', # The first argument defines which tasks should be executed to for this task + 'run', # The second (optional) argument defines which function should be called if the job got + # interrupted, e.g. killed do too much memory usage. If no cleanup is needed this + # can be the same as the first argument. Best practices is to just write one function + # that can handle both cases on pass it in both positions. + # If no second argument is given the task will not be restarted automatically. + rqmt={'time': time, 'mem': 2, 'cpu': 2}, # Requirements needed to run this task + tries=3)] # 3 tries if pipe result is empty or pipe failed + + # This function will be call when the job is started + def run(self): + self.pipe = ' | '.join([str(i) for i in self.pipeline]) + # self.sh will run the given string in a shell. Before executing it the string format function will be called + # to replace {...} which attributes from this job. + self.sh('cat {text} | {pipe} > {out}') + # assume that we do not want empty pipe results + assert not (os.stat(str(self.out)).st_size == 0), "Pipe result was empty" + + +# Jobs are regular python classes, meaning you can just subclass an existing class to reuse it's code +class Head(Pipeline): + def __init__(self, text, length, check_output_length=True): + # tk.Delayed takes any object and converts it into a Delayed object which allows us to define operations + # which will only be computed at runtime. Here we want to delay formatting since the value of length + # isn't known before length is computed. + super().__init__(text, [tk.Delayed('head -n {}').format(length)]) + self.length = length + self.check_output_length = check_output_length + + def run(self): + super.run() + if self.check_output_length: + output_length = int(self.sh('cat {out} | wc -l', capture_output=True)) + assert self.length.get() == output_length, "Created output file is to short" + + # This is how the computed hash can be modified, since `check_output_length` does not change the output + # of this job we can exclude it from the hash computation + @classmethod + def hash(cls, parsed_args): + args = parsed_args.copy() + del args['check_output_length'] + return super().hash(args) + + +# Here is a Job with multiple Variables as output +class WordCount(Job): + def __init__(self, text): + self.text = text + self.character = self.output_var('char') + self.lines = self.output_var('lines') + self.words = self.output_var('words') + + def run(self): + line = self.sh('wc < {text}', capture_output=True) + l, w, c = line.split() + self.lines.set(int(l)) + self.words.set(int(w)) + self.character.set(int(c)) + + # Here is an example of task returning a generator + def tasks(self): + yield Task('run') diff --git a/example/settings.py b/example/settings.py index 50c944e..ec40f4b 100644 --- a/example/settings.py +++ b/example/settings.py @@ -18,8 +18,10 @@ def engine(): default_engine='long') -WAIT_PERIOD_JOB_FS_SYNC = 1 # finishing a job -WAIT_PERIOD_BETWEEN_CHECKS = 1 # checking for finished jobs +# Reducing some time outs to allow for a fast run on a local system +WAIT_PERIOD_JOB_FS_SYNC = 1 # Min wait after finishing a job +WAIT_PERIOD_MTIME_OF_INPUTS = 1 # Min wait after writing to an output file +WAIT_PERIOD_BETWEEN_CHECKS = 1 # Pause between checking for finished jobs WAIT_PERIOD_CACHE = 1 # stopping to wait for actionable jobs to appear JOB_AUTO_CLEANUP = False From ac8a4bb6a0408b95b0cec9f702b857df3ddc45f4 Mon Sep 17 00:00:00 2001 From: Jan-Thorsten Peter Date: Fri, 3 Mar 2023 13:32:08 +0100 Subject: [PATCH 2/2] Update example/recipe Co-authored-by: michelwi --- example/config/workflow.py | 6 +++--- example/recipe/splitter/__init__.py | 8 +++++--- example/recipe/tools.py | 21 ++++++++++----------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/example/config/workflow.py b/example/config/workflow.py index d351a5b..a3947f4 100644 --- a/example/config/workflow.py +++ b/example/config/workflow.py @@ -16,17 +16,17 @@ async def main(): # Count lines, lines is a Variable meaning once it's computed its value can be accessed using .get() # or by converting it into a string - lines = tools.WordCount(sentences).lines + num_lines = tools.WordCount(sentences).lines # You can run computations on these variables even though they are not computed yet. # The requested computation is stored in a wrapper object and only resolved when .get() is called # or the object is converted into a string. An exception is raised if you call get() on an unfinished Variable # which doesn't have a backup value. - middle = lines // 2 + middle = num_lines // 2 first_half = tools.Head(sentences, middle).out - tk.register_output('first_half', first_half) # Tell Sisyphus that this output is needed and should be linked inside the output directory + tk.register_output('first_half', first_half) tk.register_output('sentences', sentences) # Split each paragraph into a new line and again register output diff --git a/example/recipe/splitter/__init__.py b/example/recipe/splitter/__init__.py index 1f1678b..505c388 100644 --- a/example/recipe/splitter/__init__.py +++ b/example/recipe/splitter/__init__.py @@ -1,12 +1,14 @@ from sisyphus import * import os +from typing import Iterable, List + # All paths created by RelPath will be relative to the current directory # RelPath('splitter.py') points therefore at the splitter.py file inside this directory RelPath = setup_path(__package__) class ParagraphSplitter(Job): - def __init__(self, text, splitter=RelPath('splitter.py')): + def __init__(self, text: tk.Path, splitter: tk.Path = RelPath('splitter.py')): assert text assert isinstance(text, tk.Path) @@ -17,7 +19,7 @@ def __init__(self, text, splitter=RelPath('splitter.py')): # It's unclear how many outputs will be created by this job # A way around it is to compute all output paths after this job has finished - async def outputs(self): + async def outputs(self) -> List[tk.Path]: await tk.async_run(self.splitted_dir) out = [] for path in sorted(os.listdir(str(self.splitted_dir))): @@ -28,5 +30,5 @@ async def outputs(self): def run(self): self.sh('cat {text} | {splitter} {splitted_dir}/{out_prefix}') - def tasks(self): + def tasks(self) -> Iterable[Task]: yield Task('run', 'run') diff --git a/example/recipe/tools.py b/example/recipe/tools.py index daccc49..c3ad298 100644 --- a/example/recipe/tools.py +++ b/example/recipe/tools.py @@ -1,19 +1,18 @@ from sisyphus import * import os -from typing import List +from typing import List, Any, Dict, Iterable, Union class Pipeline(Job): """ This job takes a text file as input and pipes it through all given commands """ # Everything given to the constructor will be used to compute a hash for this job. - def __init__(self, text: tk.Path, pipeline: List): + def __init__(self, text: tk.Path, pipeline: List[Any]): # You can validating the inputs to spot errors earlier assert text, "No text given" assert isinstance(text, tk.Path), "Given input" assert pipeline - # self.text = text self.pipeline = pipeline @@ -21,7 +20,7 @@ def __init__(self, text: tk.Path, pipeline: List): # Task should return a list, a generator, or something else Sisyphus can iterator over containing all # tasks of this job. In this example the job has only one task calling the `run` function - def tasks(self): + def tasks(self) -> Iterable[Task]: # Getting the size of the given input file to estimate how much time we need. # tasks() is only called when all inputs are available, we can therefore assume all input files exist. size = os.path.getsize(self.text.get_path()) @@ -42,7 +41,7 @@ def tasks(self): rqmt={'time': time, 'mem': 2, 'cpu': 2}, # Requirements needed to run this task tries=3)] # 3 tries if pipe result is empty or pipe failed - # This function will be call when the job is started + # This function will be called when the job is started def run(self): self.pipe = ' | '.join([str(i) for i in self.pipeline]) # self.sh will run the given string in a shell. Before executing it the string format function will be called @@ -54,7 +53,7 @@ def run(self): # Jobs are regular python classes, meaning you can just subclass an existing class to reuse it's code class Head(Pipeline): - def __init__(self, text, length, check_output_length=True): + def __init__(self, text: tk.Path, length: Union[int, str, tk.Delayed], check_output_length: bool = True): # tk.Delayed takes any object and converts it into a Delayed object which allows us to define operations # which will only be computed at runtime. Here we want to delay formatting since the value of length # isn't known before length is computed. @@ -63,15 +62,15 @@ def __init__(self, text, length, check_output_length=True): self.check_output_length = check_output_length def run(self): - super.run() + super().run() if self.check_output_length: output_length = int(self.sh('cat {out} | wc -l', capture_output=True)) - assert self.length.get() == output_length, "Created output file is to short" + assert self.length.get() == output_length, "Created output file length does not match" # This is how the computed hash can be modified, since `check_output_length` does not change the output # of this job we can exclude it from the hash computation @classmethod - def hash(cls, parsed_args): + def hash(cls, parsed_args: Dict[str, Any]) -> str: args = parsed_args.copy() del args['check_output_length'] return super().hash(args) @@ -79,7 +78,7 @@ def hash(cls, parsed_args): # Here is a Job with multiple Variables as output class WordCount(Job): - def __init__(self, text): + def __init__(self, text: tk.Path): self.text = text self.character = self.output_var('char') self.lines = self.output_var('lines') @@ -93,5 +92,5 @@ def run(self): self.character.set(int(c)) # Here is an example of task returning a generator - def tasks(self): + def tasks(self) -> Iterable[Task]: yield Task('run')