## SeA-SnaP tools used in pipelines
## version: 0.9.5
## author: J.P.Pett ([email protected])
import sys, os, re, shutil, hashlib, itertools, yaml, pandas as pd
from collections.abc import Mapping, Iterable
from collections import namedtuple, OrderedDict
from types import SimpleNamespace
from contextlib import contextmanager
from copy import deepcopy
from time import strftime
from warnings import warn
import warnings
from pathlib import Path
from glob import iglob, glob
# from snakemake.io import glob_wildcards
yaml.add_representer(OrderedDict, lambda dumper, data: dumper.represent_dict(dict(data)))
warnings.simplefilter("always")
##################################################################################################################################
# ---------------------------------------------- base class for handling file paths ----------------------------------------------#
##################################################################################################################################
class PipelinePathHandler:
"""
Generates file paths from patterns defined in config.
Loads and checks paths from config file upon initialization and provides
methods to fill and expand the wildcards.
"""
allowed_wildcards = ["step", "extension"]
required_wildcards_out_log = ["step", "extension"]
required_wildcards_in = []
wildcard_fix_values = {}
def __init__(self, workflow, test_config=False, test_allowed_wildcards=True):
self.test_config = self._load_test_config(test_config)
if self.test_config:
self._test_config_general(workflow.config, self.test_config)
self.snakemake_workflow = workflow
self.out_path_pattern = self.snakemake_workflow.config["pipeline_param"]["out_path_pattern"]
self.log_path_pattern = self.snakemake_workflow.config["pipeline_param"]["log_path_pattern"]
self.in_path_pattern = self.snakemake_workflow.config["pipeline_param"]["in_path_pattern"]
self.wildcard_constraints = self._prepare_inpathpattern()
self.out_dir_pattern = "/".join(self.out_path_pattern.split("/")[:-1])
self.out_path_wildcards = self._get_wildcard_list(self.out_path_pattern)
self.log_path_wildcards = self._get_wildcard_list(self.log_path_pattern)
self.in_path_wildcards = self._get_wildcard_list(self.in_path_pattern)
self.outdir_path_wildcards = self._get_wildcard_list(self.out_dir_pattern)
self.input_choice = self._set_input_choice(self.snakemake_workflow.config)
# check consistency of loaded input and output patterns
if self.test_config:
self._test_config_input(test_allowed_wildcards)
# wildcard values
self.wildcard_values = self._get_wildcard_values_from_input(self.in_path_pattern)
# neutral replacement for optional wildcards
self.opt_wildcard_placeholders = {w: "{{{}}}".format(w) for w in
set(self.in_path_wildcards) - set(self.required_wildcards_in)}
# path pattern of out directory may miss some wildcards
placeholders_in_outdir_path = set(self.outdir_path_wildcards) & set(self.opt_wildcard_placeholders.keys())
self.opt_wildc_placeh_outdir = {key: self.opt_wildcard_placeholders[key] for key in placeholders_in_outdir_path}
# ---------------------------------------------------- helper methods ----------------------------------------------------#
def _load_test_config(self, test_config):
""" load test_config for validity check of config values """
if test_config:
if type(test_config) is dict:
return test_config
elif isinstance(test_config, str):
with open(test_config, "r") as stream:
try:
return yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
else:
raise TypeError("Wrong type of argument test_config: must be dict or str.")
else:
return None
def _test_config_input(self, test_allowed_wildcards):
""" test whether wildcards in path patterns are valid """
if test_allowed_wildcards:
# test if all wildcards allowed
if not all(x in self.allowed_wildcards for x in
self.out_path_wildcards + self.log_path_wildcards + self.in_path_wildcards):
raise ValueError(
"Error in config file: unknown wildcards. allowed wildcards: {}".format(self.allowed_wildcards))
# test for required wildcards in all patterns
if not all(
x in self.out_path_wildcards and x in self.log_path_wildcards for x in self.required_wildcards_out_log):
raise ValueError("Error in config file: 'step', 'extension', 'sample' and 'name' must be wildcards "
"in out_path_pattern and log_path_pattern")
if not all(x in self.in_path_wildcards for x in self.required_wildcards_in):
raise ValueError("Error in config file: 'step', 'extension', 'sample' and 'name' must be wildcards "
"in out_path_pattern and log_path_pattern")
def _test_config_general(self, base_dict, check_dict):
""" test whether values set in the config are valid """
for key, val in check_dict.items():
## key test
# if wildcard
if key == "__any__":
for glob_key in base_dict:
if glob_key not in check_dict:
self._test_config_general(base_dict, {glob_key: val})
# if reference
elif key == "__any_other__":
if isinstance(val, str):
ref_dict = self.test_config
for r_key in val.split(":"): ref_dict = ref_dict[r_key]
elif isinstance(val, dict):
ref_dict = val
for opt_key in ref_dict:
if opt_key in base_dict:
self._test_config_general(base_dict, {opt_key: ref_dict[opt_key]})
# if missing
elif key not in base_dict and key not in ["__num__", "__opt__"]:
raise KeyError("Error in config file: The required key '{}' was not defined!".format(key))
## value test
# if string
elif isinstance(val, str):
if isinstance(base_dict[key], str):
if not re.fullmatch(val, base_dict[key], re.DOTALL):
raise ValueError("Error in config file: value of '{}' ('{}') does not match '{}'!".format(key,
base_dict[
key],
val))
else:
raise TypeError(
"Error in config file: value of '{}' should be a string! got: {}".format(key, base_dict[key]))
# if list
elif isinstance(val, list):
if isinstance(base_dict[key], list):
for item in base_dict[key]:
self._test_config_general({key: item}, {key: val[0]})
else:
raise TypeError(
"Error in config file: value of '{}' should be a list! got: {}".format(key, base_dict[key]))
# if None
elif val is None:
if base_dict[key] is not None:
raise TypeError(
"Error in config file: value of '{}' should be None! got: {}".format(key, base_dict[key]))
# if bool
elif isinstance(val, bool):
if not isinstance(base_dict[key], bool):
raise TypeError(
"Error in config file: value of '{}' should be bool! got: {}".format(key, base_dict[key]))
# if dict
elif isinstance(val, dict):
# if number range
if len(val) == 1 and list(val)[0] == "__num__":
if not isinstance(base_dict[key], (int, float)):
raise TypeError("Error in config file: value of '{}' must be a number!".format(key))
num_range = list(val.values())[0]
assert len(num_range) == 2
                    if num_range[0] is not None and not base_dict[key] >= num_range[0]:
                        raise ValueError("Error in config file: value of '{}' must be >={}!".format(key, num_range[0]))
                    if num_range[1] is not None and not base_dict[key] <= num_range[1]:
                        raise ValueError("Error in config file: value of '{}' must be <={}!".format(key, num_range[1]))
else:
others = {}
for v_key, v_val in val.items():
# if option list (OR)
if v_key == "__opt__":
options, error_num = v_val, 0
for option in options:
try:
self._test_config_general(base_dict, {key: option})
except (KeyError, ValueError, TypeError) as err:
if not re.match("\"?Error in config file", str(err)):
raise
error_num += 1
except:
raise
if error_num == len(options):
raise ValueError("Error in config file: no valid option for '{}' ('{}')! "
"must be one of: {}".format(key, base_dict[key], options))
else:
others[v_key] = v_val
# others (AND)
if others:
if isinstance(base_dict[key], dict):
self._test_config_general(base_dict[key], others)
else:
raise TypeError("Error in config file: value of '{}' must be a dict!".format(key))
def _set_input_choice(self, config):
""" create input choice dictionary from config ensuring a standardized structure """
input_choice = {}
if "input_choice" in config["pipeline_param"] and isinstance(config["pipeline_param"]["input_choice"], dict):
for key, val in config["pipeline_param"]["input_choice"].items():
if val:
if isinstance(val, str):
input_choice[key] = [val]
elif isinstance(val, list):
input_choice[key] = val
else:
raise TypeError("Error in config: input_choice elements must have dict values "
"str or list, not {}!".format(type(val)))
return input_choice
def _get_wildcard_list(self, pattern):
return re.findall("{([^}]+)}", pattern)
def _make_name(self, description):
return re.sub("\.|/|-|\s", "_", description)
@staticmethod
def _md5(fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
@staticmethod
def _is_older(file1, file2):
file1, file2 = Path(file1), Path(file2)
assert file1.is_file()
if file2.is_file():
return file1.stat().st_mtime < file2.stat().st_mtime
elif file2.is_dir():
            # find the oldest modification time inside the directory
            oldest_mtime = float("inf")
            for p in file2.rglob("*"):
                if p.stat().st_mtime < oldest_mtime:
                    oldest_mtime = p.stat().st_mtime
            return file1.stat().st_mtime < oldest_mtime
        else:
            raise TypeError(f"Path {file2} is not a file or directory!")
@staticmethod
def _get_names_values(data):
if isinstance(data, Mapping):
data_values = list(data.values())
data_keys = list(data.keys())
elif isinstance(data, Iterable):
data_values = list(data)
data_keys = None
else:
raise TypeError(f"Cannot get names and values for type {type(data)}!")
return data_keys, data_values
def _get_wildcard_combinations(self, wildcard_values):
""" go through wildcard values and get combinations """
combinations = []
WildcardComb = namedtuple("WildcardComb", [s for s in wildcard_values])
wildcard_comb_num = len(wildcard_values["sample"])
assert all([len(val) == wildcard_comb_num for val in wildcard_values.values()])
for index in range(wildcard_comb_num):
combinations.append(WildcardComb(**{key: wildcard_values[key][index] for key in wildcard_values}))
return combinations
def _prepare_inpathpattern(self):
""" read and remove wildcard constraints from in_path_pattern """
wildcards = re.findall("{([^{}]+)}", self.in_path_pattern)
wildcard_constraints = {}
for wildcard in wildcards:
comp = wildcard.split(",")
if len(comp) > 1:
wildcard_constraints[comp[0]] = comp[1] + "|" + self.wildcard_fix_values[comp[0]]
self.in_path_pattern = self.in_path_pattern.replace(wildcard, comp[0])
return wildcard_constraints
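    # A short sketch of the constraint syntax handled above (hypothetical pattern): a wildcard
    # constraint is written into the in_path_pattern as {name,regex}, e.g.
    #   "input/{sample,[A-Za-z0-9]+}/{lane}"
    # This method strips it to "input/{sample}/{lane}" and stores
    #   {"sample": "[A-Za-z0-9]+|all_samples"}
    # in self.wildcard_constraints (the wildcard's fix value, e.g. "all_samples" in the mapping
    # handler, is OR-ed in so that paths with fixed wildcards still match).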
def _get_wildcard_fix_values(self, fix):
inv = ["!", "-"] # invert selection if these characters precede wildcard name
if isinstance(fix, str):
if fix == "all":
return self.wildcard_fix_values
else:
fix = [fix]
if isinstance(fix, list):
include = set([w for w in fix if w[0] not in inv])
exclude = set([w[1:] for w in fix if w[0] in inv])
if exclude and not include:
include = set(self.wildcard_fix_values) # all wildcards
for wcd in include | exclude:
if wcd not in self.allowed_wildcards:
raise ValueError(f"Unknown wildcard: '{wcd}'!")
use_wcd = include - exclude
return {w: self.wildcard_fix_values[w] for w in use_wcd}
else:
raise ValueError(f"Wrong argument for 'fix': {fix}.")
def _get_wildcard_values_from_input(self, input_pattern, unix_style=True, verbose=False):
""" go through files in input path and get values matching the wildcards """
glob_pattern = re.sub("{[^}./]+}", "*", input_pattern)
wildcards = re.findall("{([^}./]+)}", input_pattern)
input_files = glob(glob_pattern + ("*" if glob_pattern[-1] != "*" else ""), recursive=True)
match_tup = self._get_match_pattern_and_wildcards(input_pattern, unix_style)
if verbose:
print("\ninput files:\n{}".format("\n".join(input_files)))
print(f"\nmatch pattern:\n{match_tup[0]}")
wildcard_values = {w: [] for w in wildcards}
for inp in input_files:
self._get_wildcard_values_from_file_path(inp, match_tup=match_tup, wildc_val=wildcard_values)
return wildcard_values
def _wildc_replace(self, matchobj):
""" method used with re.sub to generate match pattern from path pattern """
wildc_name = matchobj.group(1)
if wildc_name in self.wildcard_constraints:
return "({})".format(self.wildcard_constraints[wildc_name].replace("//", "/"))
elif wildc_name == "extension":
return "([^}/]+)"
else:
return "([^}./]+)"
def _get_wildcard_values_from_file_path(
self, filename, input_pattern=None, match_tup=None, wildc_val=None, unix_style=True, verbose=False
):
"""
get values matching wildcards from given file path
:param match_tup: tuple of (match_pattern, wildcards)
"""
if not input_pattern and not match_tup:
raise ValueError("Need either 'input_pattern' or 'match_tup' as argument.")
if match_tup:
match_pattern, wildcards = match_tup
else:
match_pattern, wildcards = self._get_match_pattern_and_wildcards(input_pattern, unix_style)
if verbose:
print(f"\nmatch pattern:\n{match_pattern}")
wildcard_values = wildc_val if wildc_val else {w: [] for w in wildcards}
match_obj = re.match(match_pattern, filename)
if not match_obj:
            warn(f"Skipping file '{filename}' in working dir: it does not match the pattern "
                 f"'{match_pattern}' and will not be tracked.")
found = False
else:
matches = match_obj.groups()
assert len(matches) == len(wildcards)
seen = set()
for index, wildc in enumerate(wildcards):
                if wildc not in seen:
wildcard_values[wildc].append(matches[index])
seen.add(wildc)
found = True
return found, wildcard_values
def _get_match_pattern_and_wildcards(self, input_pattern, unix_style):
match_pattern = re.sub("\\\\{([^}./]+)\\\\}", self._wildc_replace, re.escape(input_pattern))
if unix_style:
match_pattern = re.sub(r"\\\*\\\*", "[^{}]*", match_pattern)
match_pattern = re.sub(r"(?<!\[\^{}\]\*)\\\*", "[^{}./]*", match_pattern)
wildcards = re.findall("{([^}./]+)}", input_pattern)
return match_pattern, wildcards
def _collect_generated_files(self, path_pattern=None):
if not path_pattern: path_pattern = self.out_path_pattern
input_files = iglob(re.sub("{[^}./]+}", "*", path_pattern))
wildcards = re.findall("{([^}./]+)}", path_pattern)
table_cols = {w: [] for w in wildcards}
table_cols["filename"] = []
for inp in input_files:
found, _ = self._get_wildcard_values_from_file_path(inp, input_pattern=path_pattern, wildc_val=table_cols)
if found:
table_cols["filename"].append(inp)
assert all([len(val) == len(table_cols["filename"]) for val in table_cols.values()])
return table_cols
def _choose_input(self, wildcards, choice_name, options, func):
""" called by wrapper choose_input() """
if wildcards and hasattr(wildcards, choice_name):
input_from = getattr(wildcards, choice_name)
elif choice_name in self.input_choice:
inp_choice = self.input_choice[choice_name]
if isinstance(inp_choice, str):
input_from = inp_choice
elif isinstance(inp_choice, list):
input_from = inp_choice[0]
else:
raise TypeError(
"Error choosing input: {} is not a valid type for input choice! "
"(input_choice in config for {})".format(type(inp_choice), choice_name)
)
else:
raise KeyError(
"Error choosing input: no wildcard '{}' passed and no input choice for '{}' "
"specified in config!".format(choice_name, choice_name)
)
for inp in options:
inp = {**dict(wildcards.items()), **inp}
if input_from == inp["step"]:
if func == "file_path":
return self.file_path(**inp)
elif func == "out_dir_name":
return self.out_dir_name(**inp)
elif callable(func):
return func(**inp)
else:
raise ValueError("Error choosing input: wrong func argument: {}".format(str(func)))
raise ValueError(
"Error choosing input: no valid mapping input type specified! (got: {})".format(input_from)
)
def load_config_from_path(self, path, path_handler=None):
if not path_handler:
path_handler = self
config_file = path_handler.file_path("pipeline_report", "yaml", path_pattern=path, fix="all")
        with open(config_file, "r") as stream:
            try:
                config_dict = yaml.safe_load(stream)
            except yaml.YAMLError as exc:
                # re-raise after printing, otherwise config_dict would be unbound below
                print(exc)
                raise
        return config_dict
#-------------------------------------------- methods used in snakemake file --------------------------------------------#
def choose_input(self, choice_name, options, func="file_path"):
"""
choose rule to import from based on wildcard or config
One option is that a wildcard with name choice_name is passed, giving the name of a rule that should be imported from.
This is useful, when the pipeline should compute results for both inputs in one run. Then the wildcard in the ouput file
paths distinguishes these runs.
The other option is that the choice of input is specified in the config file (under pipeline_params.input_choice).
This is useful, if only one of the possible inputs should be used in one run.
All parameters for both options should be provided and the behaviour of the pipeline will depend on, whether a corresponding
wildcard was provided in the output path pattern, computing results for all or just one input.
In the config file under pipeline_params.input_choice for each choice_name a list of input rules can be specified. In the wildcard
mode (when a respective wildcard was provided in the output path pattern) expansion will happen over the whole list to compute
results. In the config mode (when the wildcard was not included) only the first element of the list will be used.
:param options: wildcard arguments to generate different inputs (list of dicts)
:param choice_name: name of the wildcard (string) used for the decision
:param wildcards: wildcards object from snakemake
:returns: a function that is used by snakemake to obtain the input file path
"""
return lambda wildcards: self._choose_input(wildcards, choice_name, options, func=func)
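    # A hypothetical usage sketch in a Snakefile (rule and step names are examples only):
    #
    #   rule count:
    #       input:
    #           ph.choose_input(choice_name="mapping",
    #                           options=[dict(step="star", extension="bam"),
    #                                    dict(step="salmon", extension="sf")])
    #
    # Snakemake calls the returned lambda with the rule's wildcards; the input is then taken
    # from the step selected via a {mapping} wildcard or via pipeline_param.input_choice.mapping.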
def wildcard_values_from(self, filepath, in_path_pattern=True):
"""
Parse a file path and return a dict of wildcard names to values.
:param filepath: a file path (string)
:param in_path_pattern: use in_path_pattern to extract wildcards, if False use out_path_pattern
:returns: a dict of wildcard names to values
"""
if in_path_pattern:
return self._get_wildcard_values_from_file_path(filepath, input_pattern=self.in_path_pattern)[1]
else:
return self._get_wildcard_values_from_file_path(filepath, input_pattern=self.out_path_pattern)[1]
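    # A short sketch (hypothetical pattern and path): with
    #   in_path_pattern = "input/{sample}/{sample}.{lane}"
    # a call like ph.wildcard_values_from("input/S1/S1.L001") would return
    #   {"sample": ["S1"], "lane": ["L001"]}
    # (repeated wildcards are recorded only once per file path).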
@staticmethod
def get_r_repr(data, to_type=None, round_float=None):
"""
Translate python data structure into R representation for vector (i.e. c(...)) or list (i.e. list(...)).
If to_type==None, choose c() or list() automatically. Named vector/list is created if data is of type
Mapping (e.g. a dictionary).
:param data: data to convert into R representation
:param to_type: "vector" or "list"; if None, choose automatically
:param round_float: round floats to N digits; if None do not round
:returns: a string with R representation of data
"""
if not isinstance(data, Iterable):
# single value
if isinstance(data, bool):
return "TRUE" if data else "FALSE"
            elif isinstance(data, (int, float)):
                return str(round(data, round_float) if round_float is not None else data)
elif data is None:
return "NULL"
elif isinstance(data, str):
# string
return '"' + data + '"'
else:
# other Iterable
data_keys, data_values = PipelinePathHandler._get_names_values(data)
# auto-determine target type to_type
same_type = all(isinstance(i, type(data_values[0])) for i in data_values)
            no_iter_inside = (
                not isinstance(data_values[0], Iterable) or isinstance(data_values[0], str)
            ) if data_values else True
use_type = "vector" if same_type and no_iter_inside else "list"
if to_type:
if to_type == "vector" and to_type != use_type:
warn("Cannot use 'vector' for all Iterables, using 'list'!")
else:
use_type = to_type
# set substitute string
if use_type == "vector":
subs = "c({})"
elif use_type == "list":
subs = "list({})"
else:
raise ValueError(f"Wrong value {use_type} for to_type!")
# fill values
data_values = (
PipelinePathHandler.get_r_repr(value, round_float=round_float, to_type=to_type)
for value in data_values
)
if data_keys:
return subs.format(", ".join(f'"{k}"={v}' for k, v in zip(data_keys, data_values)))
else:
return subs.format(", ".join(data_values))
raise TypeError(f"No R representation set for {type(data)}!")
def file_path(self, *args, **kwargs):
pass
def out_dir_name(self, *args, **kwargs):
pass
def expand_path(self, *args, **kwargs):
pass
def log(self, out_log, script, step, extension, fix=None, **kwargs):
"""
create various log files and return a path to the script file for execution
:param out_log: the main log file of rule 'step' that will contain output and error messages
:param script: the script with wildcards filled by snakemake
:param step: the rule for which logs shall be generated
:param extension: the extension of the script file, e.g. '.R' or '.sh'
:param fix: fix some wildcards; e.g. fix=["sample"] has the same effect as passing sample="all_samples", fix="all" fixes all wildcards
:returns: path to a file containing the script
"""
script_file = self.file_path(step, extension, log=True, fix=fix, **kwargs)
config_yaml = self.file_path(step, "config.yaml", log=True, fix=fix, **kwargs)
conda_list = self.file_path(step, "conda_list.txt", log=True, fix=fix, **kwargs)
conda_info = self.file_path(step, "conda_info.txt", log=True, fix=fix, **kwargs)
conda_env = self.file_path(step, "conda_env.yaml", log=True, fix=fix, **kwargs)
# write script to file
with open(script_file, "w") as f: f.write(script)
# dump merged config
with open(config_yaml, "w") as f: yaml.dump(self.snakemake_workflow.config, f, default_flow_style=False)
# write conda logs
os.system("conda list > {}".format(conda_list))
os.system("conda info > {}".format(conda_info))
os.system("conda env export > {}".format(conda_env))
# git and snakefile info to out_log
git_dir = self.snakemake_workflow.basedir
out_log_abs = str(Path(out_log).resolve())
os.system('echo "--------------- git info ---------------" > {}'.format(out_log))
os.system('cd {}; echo "name:" $(git rev-parse --show-toplevel) >> {}'.format(git_dir, out_log_abs))
os.system('cd {}; echo "branch:" $(git symbolic-ref HEAD) >> {}'.format(git_dir, out_log_abs))
os.system('cd {}; echo "hash:" $(git rev-parse HEAD) >> {}'.format(git_dir, out_log_abs))
os.system('cd {}; {{ echo "status:"; git status -sb; }} >> {}'.format(git_dir, out_log_abs))
os.system('echo "--------------- snakefile --------------" >> {}'.format(out_log))
os.system('echo "snakefile name: {}" >> {}'.format(self.snakemake_workflow.snakefile, out_log))
os.system('echo "snakefile md5 : {}" >> {}'.format(self._md5(self.snakemake_workflow.snakefile), out_log))
os.system('echo "----------------------------------------" >> "{l}"; echo >> "{l}"'.format(l=out_log))
return script_file
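    # A hypothetical usage sketch inside a Snakemake rule (names are examples only):
    #
    #   script = "echo 'running step'"
    #   script_file = ph.log(log.out, script, step="star", extension="sh", sample=wildcards.sample)
    #   shell("bash {script_file} >> {log.out} 2>&1")
    #
    # Besides writing the script file, this dumps the merged config, the conda environment
    # information and git/snakefile metadata next to the main log.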
def export(self, config_key="export"):
"""
export selected results into a separate folder structure (as configured in config file)
"""
export_spec = self.snakemake_workflow.config[config_key]
blueprint = export_spec["blueprint"]
        # a context manager yielding None, used when no blueprint file shall be written
        none_context = contextmanager(lambda: iter([None]))()
with (open(blueprint["file"], "w") if blueprint and blueprint["file"] else none_context) as bp_out:
for pattern in export_spec["path_pattern"]:
# --- go through path patterns (that specify which location to copy to)
pat = strftime(pattern).replace(
"{GENOME}",
self.snakemake_workflow.config["organism"]["genome_version"]
)
wildcards = self._get_wildcard_list(pat)
key_wcs = [wc for wc in wildcards if wc[:6] == "files:"]
assert len(key_wcs) == 1 # no nested keys for now
key = key_wcs[0].split(":")[1:]
assert len(key) > 0
pat = pat.replace("{{{}}}".format(key_wcs[0]), key[0])
# --- read specification for fetching files
for opt_dct in export_spec["_".join(key)]:
# --- read options
if "files" in opt_dct:
mode = "files"
elif "dir" in opt_dct:
mode = "dir"
else:
raise ValueError("Error in export: no mode (files or dir) specfied in config!")
compress = opt_dct["compress"] if "compress" in opt_dct else None
compress_list = opt_dct["compress_list"] if "compress_list" in opt_dct else []
exclude = opt_dct["exclude"] if "exclude" in opt_dct else []
tar_excl = " ".join(f'--exclude="{i}"' for i in exclude)
arg_dct = opt_dct[mode]
suffix = opt_dct["suffix"] if "suffix" in opt_dct else None
# --- extract files
extra_wcs = set(wildcards) - set(key_wcs) - (set(arg_dct) & set(wildcards))
assert len(extra_wcs) <= 1 # only either 'sample' (mapping) or 'contrast' (DE) for now
if extra_wcs:
extra_wc = list(extra_wcs)[0]
wc_in_dct = {k: v for k, v in arg_dct.items() if k in wildcards}
search_pat = self.out_path_pattern if "log" not in arg_dct else self.log_path_pattern
if mode == "files":
sourcef = self.expand_path(**arg_dct)
else:
sourcef = self.file_path(**{**arg_dct, "extension": "{extension}"})
if compress and not compress_list:
list_dir = Path(sourcef).parent / ""
if suffix: list_dir = list_dir / suffix
sourcef = glob(str(list_dir).replace(f"{{{extra_wc}}}", "*"))
else:
source_tmp = []
list_dir = Path(sourcef).parent
if suffix: list_dir = list_dir / suffix
dirs = glob(str(list_dir).replace(f"{{{extra_wc}}}", "*"))
for d in dirs:
if Path(d).is_file():
source_tmp.append(str(d))
else:
for fp in Path(d).iterdir():
f = str(fp)
if os.path.isdir(f) and str(fp.name) not in compress_list:
source_tmp.extend(
glob(str(Path(f) / "**"), recursive=True)
)
# elif fp.with_suffix("").stem not in compress_list or os.path.isdir(f):
else:
source_tmp.append(f)
sourcef = source_tmp
search_pat = str(Path(search_pat).parent)
get_wc = self._get_wildcard_values_from_file_path
target = [
pat.format(**{**wc_in_dct, extra_wc: get_wc(src, input_pattern=search_pat)[1][extra_wc][0]})
for src in sourcef
]
else:
sourcef = [self.file_path(**arg_dct)]
target = [pat.format(**{k: v for k, v in arg_dct.items() if k in wildcards})]
# --- copy files or write blueprint
assert len(sourcef) == len(target)
for i in range(len(sourcef)):
if mode == "dir" and (not compress or compress_list):
target[i] = str(Path(target[i]) / Path(sourcef[i]).name)
f_src, f_trg = Path(sourcef[i]).resolve(), Path(target[i])
if f_src.exists() and f_src.suffix != ".md5":
print("\n...copy {} to {} ...\n".format(sourcef[i], target[i]))
if not compress_list or f_src.name in compress_list:
if compress == "zip":
to_zip = str(f_src.with_suffix('.zip'))
if not Path(to_zip).exists() or self._is_older(to_zip, f_src):
print(f"compression: create {to_zip} ...")
os.system(f"cd {str(f_src.parent)}; zip -r {to_zip} {str(f_src.name)}")
sourcef[i], f_src = to_zip, Path(to_zip)
if mode == "dir" and compress_list:
zip_trg = f_trg.with_suffix(f_trg.suffix + '.zip')
target[i], f_trg = str(zip_trg), zip_trg
elif compress == "tar":
to_tar = str(f_src.with_suffix('.tar.gz'))
if not Path(to_tar).exists() or self._is_older(to_tar, f_src):
print(f"compression: create {to_tar} ...")
if extra_wcs:
print(extra_wc)
extra_wc_val = self._get_wildcard_values_from_file_path(
sourcef[i],
input_pattern=os.path.join(self.out_dir_pattern, "**"),
unix_style=True
)[1]
print(extra_wc_val)
tar_excl_upd = tar_excl.replace(f"{{{extra_wc}}}", extra_wc_val[extra_wc][0])
else:
tar_excl_upd = tar_excl
os.system(f"cd {str(f_src.parent)}; tar -czf {to_tar} {tar_excl_upd} {str(f_src.name)}")
sourcef[i], f_src = to_tar, Path(to_tar)
if mode == "dir" and compress_list:
tar_trg = f_trg.with_suffix(f_trg.suffix + '.tar.gz')
target[i], f_trg = str(tar_trg), tar_trg
if bp_out:
print(blueprint["command"].format(
src=f_src,
dest=target[i]
), file=bp_out)
# -- create md5 sum
md5_path = Path(sourcef[i] + ".md5")
if not md5_path.exists() or self._is_older(md5_path, sourcef[i]):
md5_path.write_text(self._md5(sourcef[i]))
print(blueprint["command"].format(
src=md5_path.resolve(),
dest=f_trg.with_suffix(f_trg.suffix + ".md5")
), file=bp_out)
else:
Path(target[i]).parent.mkdir(exist_ok=True, parents=True)
shutil.copy2(sourcef[i], target[i])
Path(target[i] + ".md5").write_text(self._md5(target[i]))
elif not f_src.exists():
warn(f"Source file {str(f_src)} does not exist!")
##################################################################################################################################
# ----------------------------------------------- child class for mapping pipeline -----------------------------------------------#
##################################################################################################################################
class MappingPipelinePathHandler(PipelinePathHandler):
""" path handler for mapping pipeline """
allowed_wildcards = ["step", "extension", "sample", "mate", "batch", "flowcell", "lane", "library"]
required_wildcards_out_log = ["step", "extension", "sample"]
required_wildcards_in = ["sample"]
wildcard_fix_values = dict(sample="all_samples", mate="all_mates", batch="all_batches", flowcell="all_flowcells",
lane="all_lanes", library="all_libraries")
def __init__(self, workflow, test_config=False, **kwargs):
super().__init__(workflow, test_config, **kwargs)
self.samples = self.snakemake_workflow.config["sample_info"]
self.sample_ids = list(self.samples.keys())
self._test_config_samples()
# wildcard value-combinations parsed from input directory
self.wildcard_combs = self._get_wildcard_combinations_per_sample(self.wildcard_values)
# paths to static files
self.data_paths = self.snakemake_workflow.config["organism"]
# test if all sample IDs can be found in input path
if test_config and not set(self.sample_ids).issubset(set(self.wildcard_values["sample"])):
raise ValueError("Error in config file: not all specified samples could be found using given input path pattern")
# write log with parsed values to file for debugging
self._write_log()
#---------------------------------------------------- helper methods ----------------------------------------------------#
def _test_config_input(self, test_allowed_wildcards):
super()._test_config_input(test_allowed_wildcards)
# test whether wildcards used consistently across patterns
if not set(self.out_path_wildcards) == set(self.log_path_wildcards) == set(self.in_path_wildcards) | set(
self.required_wildcards_out_log) - set(self.required_wildcards_in):
raise ValueError(
"Error in config file: out_path_pattern, log_path_pattern and in_path_pattern "
"do not contain the same wildcards. out: {}, log: {}, in: {}".format(
set(self.out_path_wildcards),
set(self.log_path_wildcards),
set(self.in_path_wildcards)
)
)
# test if all wildcards used in outdir path pattern
if not set(self.out_path_wildcards) == set(self.outdir_path_wildcards) | {"extension"}:
raise ValueError(
"Error in config file: all wildcards of out and log dir should be used in folder names (exception: {{extension}}), otherwise "
"different rules might compute output in the same folder, which can lead to mixed or deleted intermediate files. "
"in folder name: {}, all: {}".format(set(self.outdir_path_wildcards), set(self.out_path_wildcards))
)
def _test_config_samples(self):
# test if mate wildcard used if multiple read extensions are used
if all([
any(len(val["paired_end_extensions"]) > 1 for _, val in self.samples.items()),
"mate" not in self.in_path_wildcards,
]
):
            raise ValueError(
                "Error in config file: some samples have more than one 'paired end extension' "
                "set in the sample info file, but no {mate} wildcard is set in the in_path_pattern."
            )
def _get_wildcard_combinations_per_sample(self, wildcard_values):
""" go through wildcard values and get combinations per sample """
wildcard_comb_num = len(wildcard_values["sample"])
assert all([len(val) == wildcard_comb_num for val in wildcard_values.values()])
WildcardComb = namedtuple("WildcardComb", [s for s in wildcard_values])
per_sample_comb = {s: [] for s in wildcard_values["sample"]}
for index in range(wildcard_comb_num):
sample = wildcard_values["sample"][index]
per_sample_comb[sample].append(
WildcardComb(**{key: wildcard_values[key][index] for key in wildcard_values}))
return per_sample_comb
def _write_log(self, **kwargs):
filename = self.file_path(step="MappingPipelinePathHandler", extension="log", log=True, fix="all", **kwargs)
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, "w") as f:
f.write(
"pattern input:\n in path pattern: {}\n out path pattern: {}\n log path pattern: {}\n\n"
"parsed wildcards:\n in path wildcards: {}\n out/log path wildcards: {}\n out dir wildcards: {}\n\n"
"samples:\n sample IDs: {}\n sample info: {}\n\n"
"wildcard values:\n per wildcard: {}\n combinations: {}\n\n"
"neutral replacement of optional wildcards:\n out/log: {}\n out dir: {}".format(
self.in_path_pattern, self.out_path_pattern, self.log_path_pattern,
self.in_path_wildcards, self.out_path_wildcards, self.outdir_path_wildcards, self.sample_ids,
self.samples, self.wildcard_values, self.wildcard_combs, self.opt_wildcard_placeholders,
self.opt_wildc_placeh_outdir
)
)
#-------------------------------------------- methods used in snakemake file --------------------------------------------#
def get_fastq_pairs(self, wildcards, mate=0, mate_key=""):
"""
Generate paths to one or more .fastq input files for a given name extension (e.g. paired end extension).
:param wildcards: rule wildcards
:param mate: index of extension type to return (based on mate_key)
:param mate_key: key to config dict where different extensions to a sample ID are defined (e.g. paired end extensions)
:returns: a list with paths to specified input files, [] if extension at given index (mate) does not exist
"""
kwargs_out = {key: getattr(wildcards, key, val) for key, val in self.opt_wildcard_placeholders.items()}
pattern_list = []
seen = set()
for comb in self.wildcard_combs[wildcards.sample]:
# TODO: case of ignored wildcards? meant for e.g. allFlowcell
kwargs_filled = {key: getattr(comb, key) if ("{" in val or val not in self.wildcard_values[
key]) and key != "mate" else val for key, val in kwargs_out.items()}
kwargs_id_tup = tuple(kwargs_filled[key] for key in sorted(kwargs_filled))
if kwargs_id_tup not in seen:
seen.add(kwargs_id_tup)
if mate_key:
mate_lst = self.samples[wildcards.sample][mate_key]
kwargs_filled["mate"] = mate_lst[mate] if len(mate_lst) > mate else mate_lst[0]
pattern = self.in_path_pattern.format(sample = wildcards.sample, **kwargs_filled)
pattern += self.samples[wildcards.sample]["read_extension"]
else:
if "mate" not in kwargs_filled or mate == "*": kwargs_filled["mate"] = "*"
pattern = self.in_path_pattern.format(sample=wildcards.sample, **kwargs_filled) + \
self.samples[wildcards.sample]["read_extension"]
pattern_list.append(pattern)
paths = [path for pat in pattern_list for path in iglob(pat, recursive=True)]
paths.sort()
return paths
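    # A hypothetical usage sketch in a Snakefile (sample info values are examples only): with
    #   {"S1": {"paired_end_extensions": ["_R1", "_R2"], "read_extension": ".fastq.gz"}}
    # in the sample info, a rule can request each mate separately:
    #
    #   input:
    #       fastq1 = lambda wc: ph.get_fastq_pairs(wc, mate=0, mate_key="paired_end_extensions"),
    #       fastq2 = lambda wc: ph.get_fastq_pairs(wc, mate=1, mate_key="paired_end_extensions"),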
def file_path(self, step, extension, sample="{sample}", log=False, fix=None, path_pattern=None, **kwargs):
"""
Generate single path for intermediate and output or log files.
:param step: Snakemake rule for which the paths are generated
:param extension: file extension for the generated file path
:param sample: sample ID to be included in the file path
:param log: generate path to logfile if log==True otherwise generate path to output/intermediate file
:param fix: fix some wildcards; e.g. fix=["sample"] has the same effect as passing sample="all_samples", fix="all" fixes all wildcards
:param path_pattern: explicitly set the path pattern (string)
:param **kwargs: if used specify replacement for {batch}, {flowcell}, {lane}, etc. ...
"""
if fix:
fixed_wildcards = self._get_wildcard_fix_values(fix)
if sample == "{sample}" and "sample" in fixed_wildcards:
sample = fixed_wildcards["sample"]
kwargs = {**{k: v for k, v in fixed_wildcards.items() if k != "sample"}, **kwargs}
if not path_pattern: path_pattern = self.log_path_pattern if log else self.out_path_pattern
kwargs_out = {key: kwargs[key] if key in kwargs else val for key, val in self.opt_wildcard_placeholders.items()}
return path_pattern.format(step=step, extension=extension, sample=sample, **kwargs_out)
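    # A short sketch (hypothetical pattern): with
    #   out_path_pattern = "mapping/{step}/{sample}/out.{extension}"
    # calls resolve as follows:
    #   ph.file_path("star", "bam", sample="S1")  -> "mapping/star/S1/out.bam"
    #   ph.file_path("star", "log", log=True)     -> built from log_path_pattern instead
    #   ph.file_path("star", "bam", fix="all")    -> wildcards set to their fix values, e.g. sample="all_samples"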
def out_dir_name(self, step, sample="{sample}", fix=None, **kwargs):
"""
Generate single path to intermediate and output file directory.
:param step: Snakemake rule for which the paths are generated
:param sample: sample ID to be included in the file path
:param fix: fix some wildcards; e.g. fix=["sample"] has the same effect as passing sample="all_samples", fix="all" fixes all wildcards
"""
if fix:
fixed_wildcards = self._get_wildcard_fix_values(fix)
if sample == "{sample}" and "sample" in fixed_wildcards:
sample = fixed_wildcards["sample"]
kwargs = {**{k: v for k, v in fixed_wildcards.items() if k != "sample"}, **kwargs}
kwargs_out = {key: kwargs[key] if key in kwargs else val for key, val in self.opt_wildc_placeh_outdir.items()}
return self.out_dir_pattern.format(step=step, sample=sample, **kwargs_out)
def expand_path(self, step, extension="", fix=None, **kwargs):
"""
Generate multiple paths for intermediate and output files corresponding to different sample IDs (and e.g. paired end extensions).
        Always generates paths over all sample IDs. Optional wildcards that are not set via keyword arguments
        (kwargs) are expanded over their parsed value combinations; a kwarg like flowcell="all_flowcells" instead
        fills the given (arbitrary) string into the generated paths in place of the real flowcell IDs.
        Only keyword arguments that correspond to wildcards in the config path pattern (with the same name) are used.
:param step: Snakemake rule name (string) for which the paths are generated
:param extension: file extension for the generated file path; '' to generate paths to directories
:param fix: fix some wildcards; e.g. fix=["sample"] has the same effect as passing sample="all_samples", fix="all" fixes all wildcards
:param **kwargs: replacement strings for optional wildcards, e.g. batch, flowcell, lane (see description)
:returns: list of paths
"""
if fix:
fixed_wildcards = self._get_wildcard_fix_values(fix)
kwargs = {**{k: v for k, v in fixed_wildcards.items() if k != "sample"}, **kwargs}
kwargs_out = {key: kwargs[key] if key in kwargs else val for key, val in self.opt_wildcard_placeholders.items()}
paths = []
for sample in self.sample_ids:
seen = set()
for comb in self.wildcard_combs[sample]:
kwargs_filled = {key: getattr(comb, key) if "{" in val else val for key, val in kwargs_out.items()}
kwargs_id_tup = tuple(kwargs_filled[key] for key in sorted(kwargs_filled))
if kwargs_id_tup not in seen:
seen.add(kwargs_id_tup)
if extension:
out_path_pattern = self.file_path(step, extension, sample=sample, **kwargs_filled)
else:
out_path_pattern = self.out_dir_name(step, sample=sample, **kwargs_filled)
paths.append(out_path_pattern)
return paths
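    # A short sketch (hypothetical parsed input with samples S1/S2, lanes L001/L002 and an
    # out_path_pattern containing a {lane} wildcard):
    #   ph.expand_path("star", "bam")
    # yields one path per parsed wildcard combination, e.g.
    #   ["mapping/star/S1/L001/out.bam", "mapping/star/S1/L002/out.bam",
    #    "mapping/star/S2/L001/out.bam", ...]
    # while ph.expand_path("star", "bam", lane="all_lanes") collapses the lane wildcard into
    # the literal string "all_lanes".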
def link_index(self, step, sample="{sample}", entry="nopath", subdir="", add_done=False, fix=None, **kwargs):
"""
Generate symbolic link to index folder (if provided in config).
        A key-value pair is assumed to be present in the organism config file, with the name of the step
        (e.g. "star_index") as key and the path to the corresponding index/input file as value. If no index
        should be linked, keep the key in the config file, but leave the value empty.
:param step: Snakemake rule for which the paths are generated
:param sample: sample ID to be included in the file path
:param entry: set the path to be linked explicitly
:param fix: fix some wildcards; e.g. fix=["sample"] has the same effect as passing sample="all_samples", fix="all" fixes all wildcards
:param **kwargs: if used specify replacement for {batch}, {flowcell}, {lane}, etc. ...
"""
if fix:
fixed_wildcards = self._get_wildcard_fix_values(fix)
if sample == "{sample}" and "sample" in fixed_wildcards:
sample = fixed_wildcards["sample"]
kwargs = {**{k: v for k, v in fixed_wildcards.items() if k != "sample"}, **kwargs}
loc = Path(self.out_dir_name(step, sample, **kwargs)) / subdir
if entry != "nopath":
index = entry
elif step in self.data_paths:
index = self.data_paths[step]
else:
raise ValueError("Error linking index: no keyword provided in config to set index for {}!".format(step))
if index:
ind = Path(index)
loc.parent.mkdir(exist_ok=True, parents=True)
if not loc.exists():
# create
loc.symlink_to(ind.resolve())
if add_done:
Path(self.file_path(step, extension="done", sample=sample, **kwargs)).touch()
elif not loc.samefile(index):
# update
loc.unlink()
loc.symlink_to(ind.resolve())
if add_done:
Path(self.file_path(step, extension="done", sample=sample, **kwargs)).touch()
def log_generated_files(self, save_to="", path_pattern=None, **kwargs):
"""
Scan all generated output/intermediate files and list them in a file with their corresponding wildcard values.
This file can be used e.g. by the pipeline Rmd report to identify paths to files without the handler.
"""
table_cols = self._collect_generated_files(path_pattern=path_pattern)
table = pd.DataFrame(table_cols)
if not save_to:
save_to = self.file_path(
step="MappingPipelinePathHandler", extension="tsv", fix="all", path_pattern=path_pattern, **kwargs
)
table.to_csv(save_to, sep="\t", index=False)
##################################################################################################################################
# ------------------------------------------------- child class for DE pipeline --------------------------------------------------#
##################################################################################################################################
class DEPipelinePathHandler(PipelinePathHandler):
""" path handler for differential expression pipeline """
allowed_wildcards = ["step", "extension", "sample", "mate", "batch", "flowcell", "lane", "contrast", "mapping",
"library"]
required_wildcards_out_log = ["step", "extension", "contrast"]
required_wildcards_in = ["step", "extension", "sample"]
wildcard_fix_values = dict(contrast="all")
def __init__(self, workflow, test_config=False, **kwargs):