diff --git a/snappy_pipeline/workflows/somatic_variant_calling/Snakefile b/snappy_pipeline/workflows/somatic_variant_calling/Snakefile index ce83c1937..5a8bfb139 100644 --- a/snappy_pipeline/workflows/somatic_variant_calling/Snakefile +++ b/snappy_pipeline/workflows/somatic_variant_calling/Snakefile @@ -72,58 +72,57 @@ rule somatic_variant_calling_mutect_run: # Run MuTect 2 ---------------------------------------------------------------- - -rule somatic_variant_calling_mutect2_pileup_normal: - input: - unpack(wf.get_input_files("mutect2", "pileup_normal")), - output: - **wf.get_output_files("mutect2", "pileup_normal"), - threads: wf.get_resource("mutect2", "pileup_normal", "threads") - resources: - time=wf.get_resource("mutect2", "pileup_normal", "time"), - memory=wf.get_resource("mutect2", "pileup_normal", "memory"), - partition=wf.get_resource("mutect2", "pileup_normal", "partition"), - tmpdir=wf.get_resource("mutect2", "pileup_normal", "tmpdir"), - log: - **wf.get_log_file("mutect2", "pileup_normal"), - params: - normal_lib_name=wf.substep_getattr("mutect2", "get_normal_lib_name"), - wrapper: - wf.wrapper_path("mutect2/pileup") - - -rule somatic_variant_calling_mutect2_pileup_tumor: - input: - unpack(wf.get_input_files("mutect2", "pileup_tumor")), - output: - **wf.get_output_files("mutect2", "pileup_tumor"), - threads: wf.get_resource("mutect2", "pileup_tumor", "threads") - resources: - time=wf.get_resource("mutect2", "pileup_tumor", "time"), - memory=wf.get_resource("mutect2", "pileup_tumor", "memory"), - partition=wf.get_resource("mutect2", "pileup_tumor", "partition"), - tmpdir=wf.get_resource("mutect2", "pileup_tumor", "tmpdir"), - log: - **wf.get_log_file("mutect2", "pileup_tumor"), - wrapper: - wf.wrapper_path("mutect2/pileup") - - -rule somatic_variant_calling_mutect2_contamination: - input: - unpack(wf.get_input_files("mutect2", "contamination")), - output: - **wf.get_output_files("mutect2", "contamination"), - threads: wf.get_resource("mutect2", "contamination", "threads") - resources: - time=wf.get_resource("mutect2", "contamination", "time"), - memory=wf.get_resource("mutect2", "contamination", "memory"), - partition=wf.get_resource("mutect2", "contamination", "partition"), - tmpdir=wf.get_resource("mutect2", "contamination", "tmpdir"), - log: - **wf.get_log_file("mutect2", "contamination"), - wrapper: - wf.wrapper_path("mutect2/contamination") +if config["step_config"]["somatic_variant_calling"]["mutect2"]["common_variants"]: + + rule somatic_variant_calling_mutect2_pileup_normal: + input: + unpack(wf.get_input_files("mutect2", "pileup_normal")), + output: + **wf.get_output_files("mutect2", "pileup_normal"), + threads: wf.get_resource("mutect2", "pileup_normal", "threads") + resources: + time=wf.get_resource("mutect2", "pileup_normal", "time"), + memory=wf.get_resource("mutect2", "pileup_normal", "memory"), + partition=wf.get_resource("mutect2", "pileup_normal", "partition"), + tmpdir=wf.get_resource("mutect2", "pileup_normal", "tmpdir"), + log: + **wf.get_log_file("mutect2", "pileup_normal"), + params: + normal_lib_name=wf.substep_getattr("mutect2", "get_normal_lib_name"), + wrapper: + wf.wrapper_path("mutect2/pileup") + + rule somatic_variant_calling_mutect2_pileup_tumor: + input: + unpack(wf.get_input_files("mutect2", "pileup_tumor")), + output: + **wf.get_output_files("mutect2", "pileup_tumor"), + threads: wf.get_resource("mutect2", "pileup_tumor", "threads") + resources: + time=wf.get_resource("mutect2", "pileup_tumor", "time"), + memory=wf.get_resource("mutect2", 
"pileup_tumor", "memory"), + partition=wf.get_resource("mutect2", "pileup_tumor", "partition"), + tmpdir=wf.get_resource("mutect2", "pileup_tumor", "tmpdir"), + log: + **wf.get_log_file("mutect2", "pileup_tumor"), + wrapper: + wf.wrapper_path("mutect2/pileup") + + rule somatic_variant_calling_mutect2_contamination: + input: + unpack(wf.get_input_files("mutect2", "contamination")), + output: + **wf.get_output_files("mutect2", "contamination"), + threads: wf.get_resource("mutect2", "contamination", "threads") + resources: + time=wf.get_resource("mutect2", "contamination", "time"), + memory=wf.get_resource("mutect2", "contamination", "memory"), + partition=wf.get_resource("mutect2", "contamination", "partition"), + tmpdir=wf.get_resource("mutect2", "contamination", "tmpdir"), + log: + **wf.get_log_file("mutect2", "contamination"), + wrapper: + wf.wrapper_path("mutect2/contamination") rule somatic_variant_calling_mutect2_run: diff --git a/snappy_pipeline/workflows/somatic_variant_calling/__init__.py b/snappy_pipeline/workflows/somatic_variant_calling/__init__.py index 89fd8032f..92b5bed42 100644 --- a/snappy_pipeline/workflows/somatic_variant_calling/__init__.py +++ b/snappy_pipeline/workflows/somatic_variant_calling/__init__.py @@ -102,7 +102,7 @@ #: Names of the files to create for the extension EXT_NAMES = ("vcf", "vcf_tbi", "vcf_md5", "vcf_tbi_md5") -EXTS_MATCHED = { +EXT_MATCHED = { "mutect": { "vcf": ".vcf.gz", "vcf_md5": ".vcf.gz.md5", @@ -127,6 +127,7 @@ "full_vcf_tbi": ".full.vcf.gz.tbi", "full_vcf_tbi_md5": ".full.vcf.gz.tbi.md5", "tar": ".tar.gz", + "tar_md5": ".tar.gz.md5", }, "mutect2": { "vcf": ".vcf.gz", @@ -137,10 +138,6 @@ "full_md5": ".full.vcf.gz.md5", "full_vcf_tbi": ".full.vcf.gz.tbi", "full_vcf_tbi_md5": ".full.vcf.gz.tbi.md5", - "stats": ".vcf.stats", - "stats_md5": ".vcf.stats.md5", - "f1r2": ".f1r2_tar.tar.gz", - "f1r2_md5": ".f1r2_tar.tar.gz.md5", }, } @@ -220,8 +217,15 @@ # Configuration for MuTect 2 mutect2: panel_of_normals: '' # Set path to panel of normals vcf if required - germline_resource: REQUIRED # Germline variants resource (same as panel of normals) - common_variants: REQUIRED # Common germline variants for contamination estimation + germline_resource: '' # Germline variants resource (same as panel of normals) + common_variants: '' # Common germline variants for contamination estimation + extra_arguments: [] # List additional Mutect2 arguments + # Each additional argument xust be in the form: + # "-- " + # For example, to filter reads prior to calling & to + # add annotations to the output vcf: + # - "--read-filter CigarContainsNoNOperator" + # - "--annotation AssemblyComplexity BaseQuality" # Parallelization configuration num_cores: 2 # number of cores to use locally window_length: 50000000 # split input into windows of this size, each triggers a job @@ -440,7 +444,7 @@ def check_config(self): def get_output_files(self, action): output_files = {} - for k, v in EXTS_MATCHED[self.name].items(): + for k, v in EXT_MATCHED[self.name].items(): output_files[k] = self.base_path_out.format(var_caller=self.name, ext=v) return output_files @@ -480,7 +484,7 @@ class Mutect2StepPart(MutectBaseStepPart): name = "mutect2" #: Class available actions - actions = ("run", "filter", "contamination", "pileup_normal", "pileup_tumor") + actions = ["run", "filter"] # "contamination", "pileup_normal", "pileup_tumor") #: Class resource usage dictionary. Key: action (string); Value: resource (ResourceUsage). 
resource_usage_dict = { @@ -521,6 +525,8 @@ def check_config(self): ("static_data_config", "reference", "path"), "Path to reference FASTA not configured but required for %s" % (self.name,), ) + if self.config[self.name]["common_variants"]: + self.actions.extend(["contamination", "pileup_normal", "pileup_tumor"]) def get_input_files(self, action): """Return input function for Mutect2 rules. @@ -563,8 +569,7 @@ def _get_input_files_run(self, wildcards): "tumor_bai": ngs_mapping(tumor_base_path + ".bam.bai"), } - @staticmethod - def _get_input_files_filter(wildcards): + def _get_input_files_filter(self, wildcards): """Get input files for rule ``filter``. :param wildcards: Snakemake wildcards associated with rule, namely: 'mapper' (e.g., 'bwa') @@ -578,13 +583,15 @@ def _get_input_files_filter(wildcards): **wildcards ) ) - return { + input_files = { "raw": base_path + ".raw.vcf.gz", "stats": base_path + ".raw.vcf.stats", "f1r2": base_path + ".raw.f1r2_tar.tar.gz", - "table": base_path + ".contamination.tbl", - "segments": base_path + ".segments.tbl", } + if "contamination" in self.actions: + input_files["table"] = base_path + ".contamination.tbl" + input_files["segments"] = base_path + ".segments.tbl" + return input_files def _get_input_files_pileup_normal(self, wildcards): """Get input files for rule ``pileup_normal``. @@ -766,6 +773,7 @@ def get_output_files(self, action): somatic_ext_names = expand("full_{name}", name=EXT_NAMES) somatic_ext_values = expand(".full{ext}", ext=EXT_VALUES) result["tar"] = self.base_path_out.format(var_caller="scalpel", ext=".tar.gz") + result["tar_md5"] = self.base_path_out.format(var_caller="scalpel", ext=".tar.gz.md5") result.update( dict( zip( @@ -1130,25 +1138,26 @@ def get_result_files(self): We will process all NGS libraries of all bio samples in all sample sheets. 
""" name_pattern = "{mapper}.{caller}.{tumor_library.name}" - yield from self._yield_result_files_matched( - os.path.join("output", name_pattern, "out", name_pattern + "{ext}"), - mapper=self.w_config["step_config"]["ngs_mapping"]["tools"]["dna"], - caller=set(self.config["tools"]) & set(SOMATIC_VARIANT_CALLERS_MATCHED), - ext=EXT_VALUES, - ) - yield from self._yield_result_files_matched( - os.path.join("output", name_pattern, "log", name_pattern + "{ext}"), - mapper=self.w_config["step_config"]["ngs_mapping"]["tools"]["dna"], - caller=set(self.config["tools"]) & set(SOMATIC_VARIANT_CALLERS_MATCHED), - ext=( - ".log", - ".log.md5", - ".conda_info.txt", - ".conda_info.txt.md5", - ".conda_list.txt", - ".conda_list.txt.md5", - ), - ) + for caller in set(self.config["tools"]) & set(SOMATIC_VARIANT_CALLERS_MATCHED): + yield from self._yield_result_files_matched( + os.path.join("output", name_pattern, "out", name_pattern + "{ext}"), + mapper=self.w_config["step_config"]["ngs_mapping"]["tools"]["dna"], + caller=caller, + ext=EXT_MATCHED[caller].values() if caller in EXT_MATCHED else EXT_VALUES, + ) + yield from self._yield_result_files_matched( + os.path.join("output", name_pattern, "log", name_pattern + "{ext}"), + mapper=self.w_config["step_config"]["ngs_mapping"]["tools"]["dna"], + caller=caller, + ext=( + ".log", + ".log.md5", + ".conda_info.txt", + ".conda_info.txt.md5", + ".conda_list.txt", + ".conda_list.txt.md5", + ), + ) # Panel of normals # joint calling name_pattern = "{mapper}.{caller}.{donor.name}" diff --git a/snappy_wrappers/wrapper_parallel.py b/snappy_wrappers/wrapper_parallel.py index 4a2ddb02b..00f56837e 100644 --- a/snappy_wrappers/wrapper_parallel.py +++ b/snappy_wrappers/wrapper_parallel.py @@ -1085,3 +1085,429 @@ def construct_parallel_rules(self): wrapper: '{wrapper_prefix}/snappy_wrappers/wrappers/{inner_wrapper}' """ ).format(**vals).lstrip() + + +class ParallelMutect2BaseWrapper(ParallelBaseWrapper): + """Base class for parallel wrapper class + + Extends ParalleleBaseWrapper for: + - Increase of resource usage in case of job failure + - Multiple output files + """ + + merge_block_size = 1000 + + def __init__(self, snakemake): + super().__init__(snakemake) + + def get_all_output(self): + """Return dict with all output""" + return { + key: os.path.join(self.main_cwd, relative_path) + for key, relative_path in dict(self.snakemake.output).items() + } + + def get_all_log(self): + """Return dict with all log""" + return { + key: os.path.join(self.main_cwd, relative_path) + for key, relative_path in dict(self.snakemake.log).items() + } + + def allow_resources_increase(self): + """Returns true if chunks restarted with increased resources upon fail""" + raise NotImplementedError("Override me!") + + def merge_code_level_one(self): + """Code to merge all chunk outputs & logs at level one""" + raise NotImplementedError("Override me!") + + def merge_code_final(self): + """Code to merge all chunk or level one merge outputs & logs""" + raise NotImplementedError("Override me!") + + def construct_preamble(self): + """Return a preamble that redefines resource_chunk_{threads,memory} to + define functions as "scaling up" with the number of attempts. 
+ """ + return textwrap.dedent( + "\n".join( + ParallelMutect2BaseWrapper._construct_preamble(self.allow_resources_increase()) + ).format( + all_output=self.get_all_output(), + chunk_resources_threads=repr(self.job_resources.threads), + chunk_resources_time=repr(self.job_resources.time), + chunk_resources_memory=repr(self.job_resources.memory), + chunk_resources_partition=repr(self.job_resources.partition), + merge_resources_threads=repr(self.merge_resources.threads), + merge_resources_time=repr(self.merge_resources.time), + merge_resources_memory=repr(self.merge_resources.memory), + merge_resources_partition=repr(self.merge_resources.partition), + ) + ) + + @classmethod + def _construct_preamble(cls, allow_resources_increase): + yield r""" + shell.executable("/bin/bash") + shell.prefix("set -ex;") + + configfile: 'config.json' + + localrules: all + """ + + if allow_resources_increase: + yield r""" + def multiply_time(day_time_str, factor): + # Check if time contains day, ex: '1-00:00:00' + if "-" in day_time_str: + arr_ = day_time_str.split("-") + days = int(arr_[0]) + time_str = arr_[1] + else: + days = 0 + time_str = day_time_str + + # Process based on time structure + arr_ = time_str.split(":") + if time_str.count(":") == 2: # hours:minutes:seconds + seconds = int(arr_[0]) * 60 * 60 + int(arr_[1]) * 60 + int(arr_[2]) + elif time_str.count(":") == 1: # minutes:seconds + seconds = int(arr_[0]) * 60 + int(arr_[1]) + elif time_str.count(":") == 0: # minutes + seconds = int(time_str) * 60 + else: + raise ValueError(f"Invalid time: {{day_time_str}}") + # Add days to second + seconds += days * 86400 + + # Apply factor + seconds = int(seconds * factor) + + # Normalise time + (norm_days, remainder) = divmod(seconds, 86400) + (hours, remainder) = divmod(remainder, 3600) + (minutes, seconds) = divmod(remainder, 60) + + # Fill string - example hour '7' -> '07' + h_str = str(hours).zfill(2) + m_str = str(minutes).zfill(2) + s_str = str(seconds).zfill(2) + + return "%d-%s:%s:%s" % (norm_days, h_str, m_str, s_str) + + + def multiply_memory(memory_str, factor): + memory_mb = None + suffixes = ( + ("k", 1e-3), + ("M", 1), + ("G", 1e3), + ("T", 1e6), + ) + for (suffix, mult) in suffixes: + if memory_str.endswith(suffix): + memory_mb = float(memory_str[:-1]) * mult + break + # No match, assume no suffix int + if not memory_mb: + memory_mb = float(memory_str) + return int(memory_mb * factor) + """ + + yield r""" + def resource_chunk_threads(wildcards): + '''Return the number of threads to use for running one chunk.''' + return {chunk_resources_threads} + + def resource_chunk_partition(wildcards): + '''Return the partition to use for running one chunk.''' + return {chunk_resources_partition} + + def resource_merge_threads(wildcards): + '''Return the number of threads to use for running merging.''' + return {merge_resources_threads} + + def resource_merge_memory(wildcards): + '''Return the memory to use for running merging.''' + return {merge_resources_memory} + + def resource_merge_time(wildcards): + '''Return the time to use for running merging.''' + return {merge_resources_time} + + def resource_merge_partition(wildcards): + '''Return the partition to use for running merging.''' + return {merge_resources_partition} + """ + + if allow_resources_increase: + yield r""" + def resource_chunk_memory(wildcards, attempt): + '''Return the memory to use for running one chunk.''' + return multiply_memory({chunk_resources_memory}, attempt) + + def resource_chunk_time(wildcards, attempt): + '''Return the time to use 
for running one chunk.'''
+            return multiply_time({chunk_resources_time}, attempt)
+        """
+        else:
+            yield r"""
+        def resource_chunk_memory(wildcards):
+            '''Return the memory to use for running one chunk.'''
+            return {chunk_resources_memory}
+
+        def resource_chunk_time(wildcards):
+            '''Return the time to use for running one chunk.'''
+            return {chunk_resources_time}
+        """
+
+        yield r"""
+        rule all:
+            input: **{all_output}
+        """
+
+    def construct_parallel_rules(self):
+        """Construct the rules for parallel processing to generate."""
+        for jobno, region in enumerate(self.get_regions()):
+            params = dict(self.snakemake.params)
+            params.setdefault("args", {}).update({"intervals": [region.human_readable()]})
+            output = {
+                key: "job_out.{jobno}.d/out/tmp_{jobno}.{fn}".format(
+                    jobno=jobno, fn=os.path.basename(fn)
+                )
+                for key, fn in dict(self.snakemake.output).items()
+            }
+            log = {
+                key: "job_out.{jobno}.d/log/tmp_{jobno}.{fn}".format(
+                    jobno=jobno, fn=os.path.basename(fn)
+                )
+                for key, fn in dict(self.snakemake.log).items()
+            }
+            vals = {
+                "input": repr(dict(self.snakemake.input)),
+                "jobno": jobno,
+                "params": repr(params),
+                "output": repr(output),
+                "log": repr(log),
+                "wrapper_prefix": "file://" + self.wrapper_base_dir,
+                "inner_wrapper": self.inner_wrapper,
+            }
+            yield textwrap.dedent(
+                r"""
+                rule chunk_{jobno}:
+                    input:
+                        **{input}
+                    output:
+                        **{output}
+                    log:
+                        **{log}
+                    threads: resource_chunk_threads
+                    resources:
+                        time=resource_chunk_time,
+                        memory=resource_chunk_memory,
+                        partition=resource_chunk_partition,
+                    params:
+                        **{params}
+                    wrapper: '{wrapper_prefix}/snappy_wrappers/wrappers/{inner_wrapper}'
+            """
+            ).format(**vals).lstrip()
+
+    def construct_merge_rule(self):
+        """Join the overall result files.
+
+        The aim of this class is to enable writing wrappers agnostic of parallelisation.
+        The wrappers should only test if there is a `snakemake.params[args][intervals]`,
+        and restrict the computation to those intervals when present.
+        Otherwise, the wrapper should generate output & logs regardless of parallelisation.
+
+        When the user-defined interval length is very small, the number of parallel
+        chunks is very large. When it exceeds `merge_block_size`, the merging is done
+        in two passes: merging blocks of `merge_block_size` chunks, and then merging
+        the level one blocks just merged to generate the final output.
+
+        The output files in `snakemake.output` are used to generate the output in all
+        chunk rules. This is also used to generate input for merge rules (level one & final).
+        The log files of each chunk are put in a tar file at each merging step, to ensure
+        that the complete log of each chunk is kept.
+        TODO: The tar file of the complete log is generated in `work`, but it is not linked
+        to `output`, as for the other files. It means that an automatic upload of complete
+        logs to SODAR is not yet possible, if the upload is based on the contents of `output`.
+        TODO: This blurb should be expanded and moved to a developer's documentation.
+        """
+        # Extract all outputs from snakemake object
+        final_input = {}
+        merge_rules = []
+        chunk_logs = None
+        outputs = dict(self.snakemake.output)
+
+        # Distinguish cases of more than two levels, two levels, and one level for merging.
+        n_jobs = len(self.get_regions())
+        if n_jobs > self.merge_block_size * self.merge_block_size:
+            # Too many files (>1M with merge_block_size of 1k)
+            raise Exception("Number of output files requires more than two merging steps!")
+        elif n_jobs > self.merge_block_size:
+            # We need two merge passes.
+ # Find the number of level one merge jobs + n_merge = (n_jobs + self.merge_block_size - 1) // self.merge_block_size + # Loop creating level one merge jobs + final_input = {key: [] for key in outputs.keys()} + for i_merge in range(n_merge): + # Create level one merge rule & append it to list + merge_input, merge_output, merge_log = self._prepare_level_one_merge( + i_merge, n_jobs + ) + merge_rules.append( + self._construct_level_one_merge_rule( + i_merge, + merge_input, + merge_output, + merge_log, + "merge_out.{jobno}.d/log/merge.tar.gz".format(jobno=i_merge), + ) + ) + for key in final_input.keys(): + final_input[key].append(merge_output[key]) + # Create the list of all merge level 1 logs + chunk_logs = [ + "merge_out.{jobno}.d/log/*".format(jobno=i_merge) for i_merge in range(n_merge) + ] + else: + # We can do with one merge pass, all chunks output as final merge input. + for key, fn in outputs.items(): + final_input[key] = [ + "job_out.{jobno}.d/out/tmp_{jobno}.{fn}".format( + jobno=i_job, fn=os.path.basename(fn) + ) + for i_job in range(n_jobs) + ] + chunk_logs = ["job_out.{jobno}.d/log/*".format(jobno=i_job) for i_job in range(n_jobs)] + + # Create final merge rule & append to list + merge_rules.append( + self._construct_final_merge_rule( + final_input, + outputs, + chunk_logs, + os.path.join(self.main_cwd, self.snakemake.log.log + ".merge.tar.gz"), + ) + ) + + return "\n\n".join(merge_rules) + + def _prepare_level_one_merge(self, i_merge, n_jobs): + """Create input, output & logs for a level one merge job""" + outputs = dict(self.snakemake.output) + # List of jobs in merge + first_job = i_merge * self.merge_block_size + last_job = (i_merge + 1) * self.merge_block_size + if last_job > n_jobs: + last_job = n_jobs + + # Collect all chunks outputs as input for this merge + merge_input = {} + for key, fn in outputs.items(): + merge_input[key] = [ + "job_out.{jobno}.d/out/tmp_{jobno}.{fn}".format( + jobno=i_job, fn=os.path.basename(fn) + ) + for i_job in range(first_job, last_job) + ] + + # Create merge outputs & logs + merge_output = {} + for key, fn in outputs.items(): + path = "merge_out.{jobno}.d/out/merge_{jobno}.{fn}".format( + jobno=i_merge, fn=os.path.basename(fn) + ) + merge_output[key] = path + + merge_log = [ + "job_out.{jobno}.d/log/*".format(jobno=i_job) for i_job in range(first_job, last_job) + ] + + return (merge_input, merge_output, merge_log) + + def _construct_level_one_merge_rule( + self, chunk_no, merge_input, merge_output, chunk_logs, merge_log + ): + return ( + textwrap.dedent( + r""" + rule merge_chunk_{chunk_no}: + input: **{chunk_input} + output: **{chunk_output} + threads: resource_merge_threads + resources: + time=resource_merge_time, + memory=resource_merge_memory, + partition=resource_merge_partition, + shell: + r''' + set -euo pipefail # Unofficial Bash strict mode + + # Merge chunks output ------------------------------------------ + {merge_code} + + # Save chunk logs in tarball ----------------------------------- + mkdir -p $(dirname {merge_log}) + tar -zcvf {merge_log} {chunk_logs} + pushd $(dirname {merge_log}) + f=$(basename {merge_log}) + md5sum $f > $f.md5 + popd + ''' + """ + ) + .lstrip() + .format( + chunk_no=chunk_no, + chunk_input=repr(merge_input), + chunk_output=repr(merge_output), + merge_code=self.merge_code_level_one(), + chunk_logs=" ".join(chunk_logs), + merge_log=merge_log, + ) + ) + + def _construct_final_merge_rule(self, merge_input, merge_output, chunk_logs, merge_log): + return ( + textwrap.dedent( + r""" + rule merge_all: + input: 
**{all_input} + output: **{all_output} + log: **{all_log} + threads: resource_merge_threads + resources: + time=resource_merge_time, + memory=resource_merge_memory, + partition=resource_merge_partition, + shell: + r''' + set -euo pipefail # Unofficial Bash strict mode + + # Merge chunks output ------------------------------------------ + {merge_code} + + # Save chunk logs in tarball ----------------------------------- + mkdir -p $(dirname {merge_log}) + tar -zcvf {merge_log} {chunk_logs} + pushd $(dirname {merge_log}) + f=$(basename {merge_log}) + md5sum $f > $f.md5 + popd + ''' + """ + ) + .lstrip() + .format( + all_input=repr(merge_input), + all_output=self.get_all_output(), + all_log=self.get_all_log(), + merge_code=self.merge_code_final(), + chunk_logs=" ".join(chunk_logs), + merge_log=merge_log, + ) + ) diff --git a/snappy_wrappers/wrappers/mutect2/contamination/wrapper.py b/snappy_wrappers/wrappers/mutect2/contamination/wrapper.py index fb8091c6f..30ce60469 100644 --- a/snappy_wrappers/wrappers/mutect2/contamination/wrapper.py +++ b/snappy_wrappers/wrappers/mutect2/contamination/wrapper.py @@ -18,25 +18,27 @@ export LD_LIBRARY_PATH=$(dirname $(which bgzip))/../lib # Also pipe everything to log file -if [[ -n "{snakemake.log}" ]]; then +if [[ -n "{snakemake.log.log}" ]]; then if [[ "$(set +e; tty; set -e)" != "" ]]; then - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - exec &> >(tee -a "{snakemake.log}" >&2) + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + exec &> >(tee -a "{snakemake.log.log}" >&2) else - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - echo "No tty, logging disabled" >"{snakemake.log}" + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + echo "No tty, logging disabled" >"{snakemake.log.log}" fi fi -# TODO: add through shell.prefix -export TMPDIR=/fast/users/$USER/scratch/tmp +# Write out information about conda installation. +conda list >{snakemake.log.conda_list} +conda info >{snakemake.log.conda_info} +md5sum {snakemake.log.conda_list} >{snakemake.log.conda_list_md5} +md5sum {snakemake.log.conda_info} >{snakemake.log.conda_info_md5} -# Setup auto-cleaned TMPDIR -export TMPDIR=$(mktemp -d) -trap "rm -rf $TMPDIR" EXIT -mkdir -p $TMPDIR/out +# Setup auto-cleaned tmpdir +export tmpdir=$(mktemp -d) +trap "rm -rf $tmpdir" EXIT -out_base=$TMPDIR/out/$(basename {snakemake.output.table} .contamination.tbl) +out_base=$tmpdir/$(basename {snakemake.output.table} .contamination.tbl) gatk --java-options '-Xms4000m -Xmx8000m' CalculateContamination \ --input {snakemake.input.tumor} \ @@ -44,7 +46,7 @@ --tumor-segmentation ${{out_base}}.segments.tbl \ --output ${{out_base}}.contamination.tbl -pushd $TMPDIR && \ +pushd $tmpdir && \ for f in $out_base.*; do \ md5sum $f >$f.md5; \ done && \ @@ -53,3 +55,10 @@ mv $out_base.* $(dirname {snakemake.output.table}) """ ) + +# Compute MD5 sums of logs. 
+shell( + r""" +md5sum {snakemake.log.log} >{snakemake.log.log_md5} +""" +) diff --git a/snappy_wrappers/wrappers/mutect2/create_panel/wrapper.py b/snappy_wrappers/wrappers/mutect2/create_panel/wrapper.py index fbc5393af..8c3742c00 100644 --- a/snappy_wrappers/wrappers/mutect2/create_panel/wrapper.py +++ b/snappy_wrappers/wrappers/mutect2/create_panel/wrapper.py @@ -43,50 +43,50 @@ fi fi -# Setup auto-cleaned TMPDIR -export TMPDIR=$(mktemp -d) -trap "rm -rf ${{TMPDIR}}" EXIT -mkdir -p ${{TMPDIR}}/out -mkdir -p ${{TMPDIR}}/vcfs +# Setup auto-cleaned tmpdir +export tmpdir=$(mktemp -d) +trap "rm -rf ${{tmpdir}}" EXIT +mkdir -p ${{tmpdir}}/out +mkdir -p ${{tmpdir}}/vcfs -out_base=${{TMPDIR}}/out/$(basename {snakemake.output.vcf} .vcf.gz) +out_base=${{tmpdir}}/out/$(basename {snakemake.output.vcf} .vcf.gz) vcfs=$(echo "{snakemake.input.normals}" | tr ' ' '\n') # Create a file with the list of contigs & vcf list for GenomicsDBImport -rm -f ${{TMPDIR}}/contigs.txt +rm -f ${{tmpdir}}/contigs.txt cmd="" for vcf in ${{vcfs}} do bcftools view -h ${{vcf}} \ | grep "^##contig=<" \ | sed -re "s/.*ID=([^,>]+),length=([^,>]+).*/\1:1-\2/" \ - >> ${{TMPDIR}}/contigs_all.list + >> ${{tmpdir}}/contigs_all.list cmd="$cmd -V ${{vcf}} " done -sort ${{TMPDIR}}/contigs_all.list | uniq > ${{TMPDIR}}/contigs.list +sort ${{tmpdir}}/contigs_all.list | uniq > ${{tmpdir}}/contigs.list # Create the genomicsdb rm -rf pon_db gatk --java-options '-Xms10000m -Xmx20000m' GenomicsDBImport \ - --tmp-dir ${{TMPDIR}} \ + --tmp-dir ${{tmpdir}} \ --reference {snakemake.config[static_data_config][reference][path]} \ - --genomicsdb-workspace-path ${{TMPDIR}}/pon_db \ - --intervals ${{TMPDIR}}/contigs.list \ + --genomicsdb-workspace-path ${{tmpdir}}/pon_db \ + --intervals ${{tmpdir}}/contigs.list \ $cmd # Create the panel of normals vcf gatk CreateSomaticPanelOfNormals \ - --tmp-dir ${{TMPDIR}} \ + --tmp-dir ${{tmpdir}} \ --reference {snakemake.config[static_data_config][reference][path]} \ --germline-resource "{snakemake.config[step_config][panel_of_normals][mutect2][germline_resource]}" \ - --variant gendb://${{TMPDIR}}/pon_db \ + --variant gendb://${{tmpdir}}/pon_db \ --output ${{out_base}}.vcf bgzip ${{out_base}}.vcf tabix -f ${{out_base}}.vcf.gz -pushd $TMPDIR && \ +pushd $tmpdir && \ for f in ${{out_base}}.*; do \ md5sum $f >$f.md5; \ done && \ diff --git a/snappy_wrappers/wrappers/mutect2/filter/wrapper.py b/snappy_wrappers/wrappers/mutect2/filter/wrapper.py index 9dab72091..2b7b8e186 100644 --- a/snappy_wrappers/wrappers/mutect2/filter/wrapper.py +++ b/snappy_wrappers/wrappers/mutect2/filter/wrapper.py @@ -8,6 +8,15 @@ reference = snakemake.config["static_data_config"]["reference"]["path"] +segments = ( + " --tumor-segmentation {} ".format(snakemake.input.segments) + if "segments" in snakemake.input + else "" +) +table = ( + " --contamination-table {} ".format(snakemake.input.table) if "table" in snakemake.input else "" +) + shell.executable("/bin/bash") shell( @@ -18,38 +27,42 @@ export LD_LIBRARY_PATH=$(dirname $(which bgzip))/../lib # Also pipe everything to log file -if [[ -n "{snakemake.log}" ]]; then +if [[ -n "{snakemake.log.log}" ]]; then if [[ "$(set +e; tty; set -e)" != "" ]]; then - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - exec &> >(tee -a "{snakemake.log}" >&2) + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + exec &> >(tee -a "{snakemake.log.log}" >&2) else - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - echo "No tty, logging disabled" 
>"{snakemake.log}" + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + echo "No tty, logging disabled" >"{snakemake.log.log}" fi fi -# TODO: add through shell.prefix -export TMPDIR=/fast/users/$USER/scratch/tmp -# Setup auto-cleaned TMPDIR -export TMPDIR=$(mktemp -d) -trap "rm -rf $TMPDIR" EXIT +# Write out information about conda installation. +conda list >{snakemake.log.conda_list} +conda info >{snakemake.log.conda_info} +md5sum {snakemake.log.conda_list} >{snakemake.log.conda_list_md5} +md5sum {snakemake.log.conda_info} >{snakemake.log.conda_info_md5} + +# Setup auto-cleaned tmpdir +export tmpdir=$(mktemp -d) +trap "rm -rf $tmpdir" EXIT # Extract orientation stats for all chunks -mkdir -p $TMPDIR/f1r2 -rel_path=$(realpath --relative-to=$TMPDIR/f1r2 {snakemake.input.f1r2}) -pushd $TMPDIR/f1r2 +mkdir -p $tmpdir/f1r2 +rel_path=$(realpath --relative-to=$tmpdir/f1r2 {snakemake.input.f1r2}) +pushd $tmpdir/f1r2 tar -zxvf ${{rel_path}} popd # Create command line list of all f1r2 stats -chunks=$(ls $TMPDIR/f1r2/*.tar.gz) +chunks=$(ls $tmpdir/f1r2/*.tar.gz) cmd=$(echo "$chunks" | tr '\n' ' ' | sed -e "s/ *$//" | sed -e "s/ / -I /g") # Create orientation model gatk --java-options '-Xms4000m -Xmx8000m' LearnReadOrientationModel \ -I $cmd \ - -O $TMPDIR/read-orientation-model.tar.gz + -O $tmpdir/read-orientation-model.tar.gz # Workaround problem with bcftools merging inserting missing values (.) in MPOS zcat {snakemake.input.raw} \ @@ -62,16 +75,15 @@ print $0; }} }}' \ - > $TMPDIR/in.vcf + > $tmpdir/in.vcf # Filter calls gatk --java-options '-Xms4000m -Xmx8000m' FilterMutectCalls \ --reference {reference} \ - --tumor-segmentation {snakemake.input.segments} \ - --contamination-table {snakemake.input.table} \ - --ob-priors $TMPDIR/read-orientation-model.tar.gz \ + {segments} {table} \ + --ob-priors $tmpdir/read-orientation-model.tar.gz \ --stats {snakemake.input.stats} \ - --variant $TMPDIR/in.vcf \ + --variant $tmpdir/in.vcf \ --output {snakemake.output.full} # Index & move to final dest @@ -93,3 +105,10 @@ popd """ ) + +# Compute MD5 sums of logs. +shell( + r""" +md5sum {snakemake.log.log} >{snakemake.log.log_md5} +""" +) diff --git a/snappy_wrappers/wrappers/mutect2/pileup/wrapper.py b/snappy_wrappers/wrappers/mutect2/pileup/wrapper.py index 88ea8d8fa..595b10cf8 100644 --- a/snappy_wrappers/wrappers/mutect2/pileup/wrapper.py +++ b/snappy_wrappers/wrappers/mutect2/pileup/wrapper.py @@ -21,25 +21,27 @@ export LD_LIBRARY_PATH=$(dirname $(which bgzip))/../lib # Also pipe everything to log file -if [[ -n "{snakemake.log}" ]]; then +if [[ -n "{snakemake.log.log}" ]]; then if [[ "$(set +e; tty; set -e)" != "" ]]; then - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - exec &> >(tee -a "{snakemake.log}" >&2) + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + exec &> >(tee -a "{snakemake.log.log}" >&2) else - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - echo "No tty, logging disabled" >"{snakemake.log}" + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + echo "No tty, logging disabled" >"{snakemake.log.log}" fi fi -# TODO: add through shell.prefix -export TMPDIR=/fast/users/$USER/scratch/tmp +# Write out information about conda installation. 
+conda list >{snakemake.log.conda_list} +conda info >{snakemake.log.conda_info} +md5sum {snakemake.log.conda_list} >{snakemake.log.conda_list_md5} +md5sum {snakemake.log.conda_info} >{snakemake.log.conda_info_md5} -# Setup auto-cleaned TMPDIR -export TMPDIR=$(mktemp -d) -trap "rm -rf $TMPDIR" EXIT -mkdir -p $TMPDIR/out +# Setup auto-cleaned tmpdir +export tmpdir=$(mktemp -d) +trap "rm -rf $tmpdir" EXIT -out_base=$TMPDIR/out/$(basename {snakemake.output.pileup} .pileup) +out_base=$tmpdir/$(basename {snakemake.output.pileup} .pileup) gatk --java-options '-Xms4000m -Xmx8000m' GetPileupSummaries \ --input {snakemake.input.bam} \ @@ -48,7 +50,7 @@ --intervals {common_variants} \ --output $out_base.pileup -pushd $TMPDIR && \ +pushd $tmpdir && \ for f in $out_base.*; do \ md5sum $f >$f.md5; \ done && \ @@ -57,3 +59,10 @@ mv $out_base.* $(dirname {snakemake.output.pileup}) """ ) + +# Compute MD5 sums of logs. +shell( + r""" +md5sum {snakemake.log.log} >{snakemake.log.log_md5} +""" +) diff --git a/snappy_wrappers/wrappers/mutect2/prepare_panel/wrapper.py b/snappy_wrappers/wrappers/mutect2/prepare_panel/wrapper.py index f1bad6698..874674680 100644 --- a/snappy_wrappers/wrappers/mutect2/prepare_panel/wrapper.py +++ b/snappy_wrappers/wrappers/mutect2/prepare_panel/wrapper.py @@ -16,48 +16,55 @@ export LD_LIBRARY_PATH=$(dirname $(which bgzip))/../lib # Also pipe everything to log file -if [[ -n "{snakemake.log}" ]]; then +if [[ -n "{snakemake.log.log}" ]]; then if [[ "$(set +e; tty; set -e)" != "" ]]; then - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - exec &> >(tee -a "{snakemake.log}" >&2) + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + exec &> >(tee -a "{snakemake.log.log}" >&2) else - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - echo "No tty, logging disabled" >"{snakemake.log}" + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + echo "No tty, logging disabled" >"{snakemake.log.log}" fi fi -# TODO: add through shell.prefix -export TMPDIR=/fast/users/$USER/scratch/tmp +# Write out information about conda installation. 
+conda list >{snakemake.log.conda_list} +conda info >{snakemake.log.conda_info} +md5sum {snakemake.log.conda_list} >{snakemake.log.conda_list_md5} +md5sum {snakemake.log.conda_info} >{snakemake.log.conda_info_md5} -# Setup auto-cleaned TMPDIR -export TMPDIR=$(mktemp -d) -trap "rm -rf $TMPDIR" EXIT -mkdir -p $TMPDIR/out +# Setup auto-cleaned tmpdir +tmpdir=$(mktemp -d) +trap "rm -rf $tmpdir" EXIT -out_base=$TMPDIR/out/$(basename {snakemake.output.vcf} .vcf.gz) +vcf=$(basename --suffix=.gz {snakemake.output.vcf}) gatk --java-options "-Xmx6g" Mutect2 \ - --tmp-dir ${{TMPDIR}} \ + --tmp-dir ${{tmpdir}} \ --reference {snakemake.config[static_data_config][reference][path]} \ - --input {snakemake.input} \ + --input {snakemake.input.normal_bam} \ --max-mnp-distance 0 \ - --output ${{out_base}}.vcf \ + --output $tmpdir/$vcf \ $(if [[ -n "{snakemake.params.args[intervals]}" ]]; then for itv in "{snakemake.params.args[intervals]}"; do \ echo -n "--intervals $itv "; \ done; \ fi) -bgzip ${{out_base}}.vcf +bgzip $tmpdir/$vcf +tabix $tmpdir/$vcf.gz -gatk IndexFeatureFile --input ${{out_base}}.vcf.gz +pushd $tmpdir +md5sum $vcf.gz > $vcf.gz.md5 +md5sum $vcf.gz.tbi > $vcf.gz.tbi.md5 +popd -pushd $TMPDIR && \ - for f in ${{out_base}}.*; do \ - md5sum $f >$f.md5; \ - done && \ - popd +mv $tmpdir/$vcf.gz $tmpdir/$vcf.gz.md5 $tmpdir/$vcf.gz.tbi $tmpdir/$vcf.gz.tbi.md5 $(dirname {snakemake.output.vcf}) +""" +) -mv ${{out_base}}.* $(dirname {snakemake.output.vcf}) +# Compute MD5 sums of logs. +shell( + r""" +md5sum {snakemake.log.log} >{snakemake.log.log_md5} """ ) diff --git a/snappy_wrappers/wrappers/mutect2/run/wrapper.py b/snappy_wrappers/wrappers/mutect2/run/wrapper.py index f72104c43..363fa9e1c 100644 --- a/snappy_wrappers/wrappers/mutect2/run/wrapper.py +++ b/snappy_wrappers/wrappers/mutect2/run/wrapper.py @@ -7,12 +7,9 @@ __author__ = "Manuel Holtgrewe " reference = snakemake.config["static_data_config"]["reference"]["path"] -germline_resource = snakemake.config["step_config"]["somatic_variant_calling"]["mutect2"][ - "germline_resource" -] -panel_of_normals = snakemake.config["step_config"]["somatic_variant_calling"]["mutect2"][ - "panel_of_normals" -] +config = snakemake.config["step_config"]["somatic_variant_calling"]["mutect2"] + +extra_arguments = " ".join(config["extra_arguments"]) shell.executable("/bin/bash") @@ -24,25 +21,27 @@ export LD_LIBRARY_PATH=$(dirname $(which bgzip))/../lib # Also pipe everything to log file -if [[ -n "{snakemake.log}" ]]; then +if [[ -n "{snakemake.log.log}" ]]; then if [[ "$(set +e; tty; set -e)" != "" ]]; then - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - exec &> >(tee -a "{snakemake.log}" >&2) + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + exec &> >(tee -a "{snakemake.log.log}" >&2) else - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - echo "No tty, logging disabled" >"{snakemake.log}" + rm -f "{snakemake.log.log}" && mkdir -p $(dirname {snakemake.log.log}) + echo "No tty, logging disabled" >"{snakemake.log.log}" fi fi -# TODO: add through shell.prefix -export TMPDIR=/fast/users/$USER/scratch/tmp +# Write out information about conda installation. 
+conda list >{snakemake.log.conda_list} +conda info >{snakemake.log.conda_info} +md5sum {snakemake.log.conda_list} >{snakemake.log.conda_list_md5} +md5sum {snakemake.log.conda_info} >{snakemake.log.conda_info_md5} -# Setup auto-cleaned TMPDIR -export TMPDIR=$(mktemp -d) -trap "rm -rf $TMPDIR" EXIT -mkdir -p $TMPDIR/out +# Setup auto-cleaned tmpdir +export tmpdir=$(mktemp -d) +trap "rm -rf $tmpdir" EXIT -out_base=$TMPDIR/out/$(basename {snakemake.output.raw} .vcf.gz) +out_base=$tmpdir/$(basename {snakemake.output.raw} .vcf.gz) # Add intervals if required intervals="" @@ -54,23 +53,22 @@ done fi -# Add panel of normals if required -pon="" -if [[ "X{panel_of_normals}" != "X" ]] -then - pon=" --panel-of-normals {panel_of_normals}" -fi - gatk Mutect2 \ - --tmp-dir $TMPDIR \ + --tmp-dir $tmpdir \ --input {snakemake.input.normal_bam} \ --input {snakemake.input.tumor_bam} \ --normal "{snakemake.params.normal_lib_name}" \ --reference {reference} \ - --germline-resource {germline_resource} \ - --f1r2-tar-gz ${{out_base}}.f1r2.tar.gz \ --output $out_base.vcf \ - $intervals $pon + --f1r2-tar-gz ${{out_base}}.f1r2.tar.gz \ + $(if [[ -n "{config[germline_resource]}" ]]; then \ + echo --germline-resource {config[germline_resource]} + fi) \ + $(if [[ -n "{config[panel_of_normals]}" ]]; then \ + echo --panel-of-normals {config[panel_of_normals]} + fi) \ + $intervals \ + {extra_arguments} rm -f $out_base.vcf.idx @@ -83,14 +81,19 @@ tar -zcvf ${{out_base}}.f1r2_tar.tar.gz --directory ${{dir_base}} ${{file_base}}.f1r2.tar.gz -pushd $TMPDIR && \ - for f in $out_base.*; do \ - md5sum $f >$f.md5; \ - done && \ - popd +pushd $tmpdir +for f in $out_base.*; do + md5sum $f >$f.md5 +done +popd mv $out_base.* $(dirname {snakemake.output.raw}) -# d=$(dirname {snakemake.output.raw}) -# cp /fast/scratch/users/blance_c/tmp/tmpqd1wusadsomatic_variant_calling_mutect2/$d/* $(dirname {snakemake.output.raw}) +""" +) + +# Compute MD5 sums of logs. +shell( + r""" +md5sum {snakemake.log.log} >{snakemake.log.log_md5} """ ) diff --git a/snappy_wrappers/wrappers/mutect2/select_panel/wrapper.py b/snappy_wrappers/wrappers/mutect2/select_panel/wrapper.py deleted file mode 100644 index cc2689e37..000000000 --- a/snappy_wrappers/wrappers/mutect2/select_panel/wrapper.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- -"""CUBI+Snakemake wrapper code for MuTect 2: Snakemake wrapper.py -""" - -from snakemake import shell - -__author__ = "Manuel Holtgrewe " - -shell.executable("/bin/bash") - -shell( - r""" -set -x - -export JAVA_HOME=$(dirname $(which gatk))/.. 
-export LD_LIBRARY_PATH=$(dirname $(which bgzip))/../lib - -# Also pipe everything to log file -if [[ -n "{snakemake.log}" ]]; then - if [[ "$(set +e; tty; set -e)" != "" ]]; then - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - exec &> >(tee -a "{snakemake.log}" >&2) - else - rm -f "{snakemake.log}" && mkdir -p $(dirname {snakemake.log}) - echo "No tty, logging disabled" >"{snakemake.log}" - fi -fi - -# TODO: add through shell.prefix -export TMPDIR=/fast/users/$USER/scratch/tmp - -# Setup auto-cleaned TMPDIR -export TMPDIR=$(mktemp -d) -trap "rm -rf $TMPDIR" EXIT -mkdir -p $TMPDIR/out - -out_base=$TMPDIR/out/$(basename {snakemake.output.vcf} .vcf.gz) - -gatk -Xmx16g GenomicsDBImport \ - --tmp-dir ${{TMPDIR}} \ - --reference {snakemake.config[static_data_config][reference][path]} \ - --input {snakemake.input.normal_bam} \ - --output ${{out_base}}.vcf.gz \ - -tabix -f ${{out_base}}.vcf.gz - -rm -f ${{out_base}}.vcf.idx - -tabix -f ${{out_base}}.vcf.gz - -pushd $TMPDIR && \ - for f in ${{out_base}}.*; do \ - md5sum $f >$f.md5; \ - done && \ - popd - -mv ${{out_base}}.* $(dirname {snakemake.output.vcf}) -""" -) diff --git a/snappy_wrappers/wrappers/mutect2_par/prepare_panel/parallel_prepare_panel.py b/snappy_wrappers/wrappers/mutect2_par/prepare_panel/parallel_prepare_panel.py index 8b2e6980c..f8cd535ff 100644 --- a/snappy_wrappers/wrappers/mutect2_par/prepare_panel/parallel_prepare_panel.py +++ b/snappy_wrappers/wrappers/mutect2_par/prepare_panel/parallel_prepare_panel.py @@ -16,14 +16,13 @@ from snappy_wrappers.resource_usage import ResourceUsage # noqa: E402 from snappy_wrappers.wrapper_parallel import ( # noqa: E402 - ParallelVariantCallingBaseWrapper, gib_to_string, hours, + ParallelMutect2BaseWrapper, ) -class ParallelMutect2Wrapper(ParallelVariantCallingBaseWrapper): - """Parallel execution of MuTect 2""" +class ParallelMutect2Wrapper(ParallelMutect2BaseWrapper): inner_wrapper = "mutect2/prepare_panel" step_name = "panel_of_normals" @@ -44,158 +43,36 @@ def __init__(self, snakemake): partition=os.getenv("SNAPPY_PIPELINE_PARTITION"), ) - def construct_preamble(self): - """Return a preamble that redefines resource_chunk_{threads,memory} to - define functions as "scaling up" with the number of attempts. 
- """ - return ( - textwrap.dedent( - r""" - shell.executable("/bin/bash") - shell.prefix("set -ex;") - - configfile: 'config.json' - - localrules: all - - def multiply_time(day_time_str, factor): - # Check if time contains day, ex: '1-00:00:00' - if "-" in day_time_str: - arr_ = day_time_str.split("-") - days = int(arr_[0]) - time_str = arr_[1] - else: - days = 0 - time_str = day_time_str - - # Process based on time structure - arr_ = time_str.split(":") - if time_str.count(":") == 2: # hours:minutes:seconds - seconds = int(arr_[0]) * 60 * 60 + int(arr_[1]) * 60 + int(arr_[2]) - elif time_str.count(":") == 1: # minutes:seconds - seconds = int(arr_[0]) * 60 + int(arr_[1]) - elif time_str.count(":") == 0: # minutes - seconds = int(time_str) * 60 - else: - raise ValueError(f"Invalid time: {{day_time_str}}") - # Add days to second - seconds += days * 86400 - - # Apply factor - seconds = int(seconds * factor) - - # Normalise time - (norm_days, remainder) = divmod(seconds, 86400) - (hours, remainder) = divmod(remainder, 3600) - (minutes, seconds) = divmod(remainder, 60) - - # Fill string - example hour '7' -> '07' - h_str = str(hours).zfill(2) - m_str = str(minutes).zfill(2) - s_str = str(seconds).zfill(2) - - return "%d-%s:%s:%s" % (norm_days, h_str, m_str, s_str) - - - def multiply_memory(memory_str, factor): - memory_mb = None - suffixes = ( - ("k", 1e-3), - ("M", 1), - ("G", 1e3), - ("T", 1e6), - ) - for (suffix, mult) in suffixes: - if memory_str.endswith(suffix): - memory_mb = float(memory_str[:-1]) * mult - break - # No match, assume no suffix int - if not memory_mb: - memory_mb = float(memory_str) - return int(memory_mb * factor) - - def resource_chunk_threads(wildcards): - '''Return the number of threads to use for running one chunk.''' - return {chunk_resources_threads} - - def resource_chunk_memory(wildcards, attempt): - '''Return the memory to use for running one chunk.''' - return multiply_memory({chunk_resources_memory}, attempt) - - def resource_chunk_time(wildcards, attempt): - '''Return the time to use for running one chunk.''' - return multiply_time({chunk_resources_time}, attempt) - - def resource_chunk_partition(wildcards): - '''Return the partition to use for running one chunk.''' - return {chunk_resources_partition} - - def resource_merge_threads(wildcards): - '''Return the number of threads to use for running merging.''' - return {merge_resources_threads} - - def resource_merge_memory(wildcards): - '''Return the memory to use for running merging.''' - return {merge_resources_memory} - - def resource_merge_time(wildcards): - '''Return the time to use for running merging.''' - return {merge_resources_time} - - def resource_merge_partition(wildcards): - '''Return the partition to use for running merging.''' - return {merge_resources_partition} - - rule all: - input: **{all_output} - """ - ) - .lstrip() - .format( - all_output=repr(self.get_all_output()), - chunk_resources_threads=repr(self.job_resources.threads), - chunk_resources_time=repr(self.job_resources.time), - chunk_resources_memory=repr(self.job_resources.memory), - chunk_resources_partition=repr(self.job_resources.partition), - merge_resources_threads=repr(self.merge_resources.threads), - merge_resources_time=repr(self.merge_resources.time), - merge_resources_memory=repr(self.merge_resources.memory), - merge_resources_partition=repr(self.merge_resources.partition), - ) + @classmethod + def allow_resources_increase(cls): + return True + + @classmethod + def merge_code_level_one(cls): + return cls._merge_code() + + 
@classmethod + def merge_code_final(cls): + return cls._merge_code() + + @classmethod + def _merge_code(cls): + return textwrap.dedent( + r""" + # Concatenate raw calls vcfs & index result ---------------------- + bcftools concat \ + --allow-overlaps \ + -d none \ + -o {output.vcf} \ + -O z \ + {input.vcf} + tabix -f {output.vcf} + + # Compute md5 sums ----------------------------------------------- + pushd $(dirname {output.vcf}) + f=$(basename {output.vcf}) + md5sum $f > $f.md5 + md5sum $f.tbi > $f.tbi.md5 + popd + """ ) - - def construct_parallel_rules(self): - """Construct the rules for parallel processing to generate.""" - for jobno, region in enumerate(self.get_regions()): - params = dict(self.snakemake.params) - params.setdefault("args", {}).update({"intervals": [region.human_readable()]}) - output = { - key: "job_out.{jobno}.d/out/tmp_{jobno}.{ext}".format(jobno=jobno, ext=ext) - for key, ext in self.key_ext.items() - } - vals = { - "input_bam": repr(self.snakemake.input.normal_bam), - "jobno": jobno, - "params": repr(params), - "output": repr(output), - "wrapper_prefix": "file://" + self.wrapper_base_dir, - "inner_wrapper": self.inner_wrapper, - } - yield textwrap.dedent( - r""" - rule chunk_{jobno}: - input: - {input_bam}, - output: - touch("job_out.{jobno}.d/.done"), - **{output} - threads: resource_chunk_threads - resources: - time=resource_chunk_time, - memory=resource_chunk_memory, - partition=resource_chunk_partition, - params: - **{params} - wrapper: '{wrapper_prefix}/snappy_wrappers/wrappers/{inner_wrapper}' - """ - ).format(**vals).lstrip() diff --git a/snappy_wrappers/wrappers/mutect2_par/prepare_panel/wrapper.py b/snappy_wrappers/wrappers/mutect2_par/prepare_panel/wrapper.py index 2fedaadfc..6c46f6f51 100644 --- a/snappy_wrappers/wrappers/mutect2_par/prepare_panel/wrapper.py +++ b/snappy_wrappers/wrappers/mutect2_par/prepare_panel/wrapper.py @@ -15,10 +15,3 @@ # Kick off execution using the wrapper class defined above. ParallelMutect2Wrapper(snakemake).run().shutdown_logging() - -# Compute MD5 sums of logs. 
-shell( - r""" -md5sum {snakemake.log.log} >{snakemake.log.log_md5} -""" -) diff --git a/snappy_wrappers/wrappers/mutect2_par/run/parallel_mutect2.py b/snappy_wrappers/wrappers/mutect2_par/run/parallel_mutect2.py index 716ae8df3..d2713eaa1 100644 --- a/snappy_wrappers/wrappers/mutect2_par/run/parallel_mutect2.py +++ b/snappy_wrappers/wrappers/mutect2_par/run/parallel_mutect2.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- """Definition for Mutect2 variant caller in parallel, genome is split into windows + +isort:skip_file """ import os @@ -14,40 +16,18 @@ from snappy_wrappers.resource_usage import ResourceUsage # noqa: E402 from snappy_wrappers.wrapper_parallel import ( # noqa: E402 - ParallelSomaticVariantCallingBaseWrapper, gib_to_string, hours, + ParallelMutect2BaseWrapper, ) -class ParallelMutect2Wrapper(ParallelSomaticVariantCallingBaseWrapper): - """Parallel execution of MuTect 2""" +class ParallelMutect2Wrapper(ParallelMutect2BaseWrapper): inner_wrapper = "mutect2/run" step_name = "somatic_variant_calling" tool_name = "mutect2" - realpath_output_keys = ( - "raw", - "raw_md5", - "raw_tbi", - "raw_tbi_md5", - "stats", - "stats_md5", - "f1r2", - "f1r2_md5", - ) - key_ext = { - "raw": "vcf.gz", - "raw_md5": "vcf.gz.md5", - "raw_tbi": "vcf.gz.tbi", - "raw_tbi_md5": "vcf.gz.tbi.md5", - "stats": "vcf.stats", - "stats_md5": "vcf.stats.md5", - "f1r2": "f1r2_tar.tar.gz", - "f1r2_md5": "f1r2_tar.tar.gz.md5", - } - def __init__(self, snakemake): super().__init__(snakemake) self.job_resources = ResourceUsage( @@ -63,279 +43,50 @@ def __init__(self, snakemake): partition=os.getenv("SNAPPY_PIPELINE_PARTITION"), ) - def construct_preamble(self): - """Return a preamble that redefines resource_chunk_{threads,memory} to - define functions as "scaling up" with the number of attempts. 
- """ - return ( - textwrap.dedent( - r""" - shell.executable("/bin/bash") - shell.prefix("set -ex;") - - configfile: 'config.json' - - localrules: all - - def multiply_time(day_time_str, factor): - # Check if time contains day, ex: '1-00:00:00' - if "-" in day_time_str: - arr_ = day_time_str.split("-") - days = int(arr_[0]) - time_str = arr_[1] - else: - days = 0 - time_str = day_time_str - - # Process based on time structure - arr_ = time_str.split(":") - if time_str.count(":") == 2: # hours:minutes:seconds - seconds = int(arr_[0]) * 60 * 60 + int(arr_[1]) * 60 + int(arr_[2]) - elif time_str.count(":") == 1: # minutes:seconds - seconds = int(arr_[0]) * 60 + int(arr_[1]) - elif time_str.count(":") == 0: # minutes - seconds = int(time_str) * 60 - else: - raise ValueError(f"Invalid time: {{day_time_str}}") - # Add days to second - seconds += days * 86400 - - # Apply factor - seconds = int(seconds * factor) - - # Normalise time - (norm_days, remainder) = divmod(seconds, 86400) - (hours, remainder) = divmod(remainder, 3600) - (minutes, seconds) = divmod(remainder, 60) - - # Fill string - example hour '7' -> '07' - h_str = str(hours).zfill(2) - m_str = str(minutes).zfill(2) - s_str = str(seconds).zfill(2) - - return "%d-%s:%s:%s" % (norm_days, h_str, m_str, s_str) - - - def multiply_memory(memory_str, factor): - memory_mb = None - suffixes = ( - ("k", 1e-3), - ("M", 1), - ("G", 1e3), - ("T", 1e6), - ) - for (suffix, mult) in suffixes: - if memory_str.endswith(suffix): - memory_mb = float(memory_str[:-1]) * mult - break - # No match, assume no suffix int - if not memory_mb: - memory_mb = float(memory_str) - return int(memory_mb * factor) - - def resource_chunk_threads(wildcards): - '''Return the number of threads to use for running one chunk.''' - return {chunk_resources_threads} - - def resource_chunk_memory(wildcards, attempt): - '''Return the memory to use for running one chunk.''' - return multiply_memory({chunk_resources_memory}, attempt) - - def resource_chunk_time(wildcards, attempt): - '''Return the time to use for running one chunk.''' - return multiply_time({chunk_resources_time}, attempt) - - def resource_chunk_partition(wildcards): - '''Return the partition to use for running one chunk.''' - return {chunk_resources_partition} - - def resource_merge_threads(wildcards): - '''Return the number of threads to use for running merging.''' - return {merge_resources_threads} - - def resource_merge_memory(wildcards): - '''Return the memory to use for running merging.''' - return {merge_resources_memory} - - def resource_merge_time(wildcards): - '''Return the time to use for running merging.''' - return {merge_resources_time} - - def resource_merge_partition(wildcards): - '''Return the partition to use for running merging.''' - return {merge_resources_partition} - - rule all: - input: **{all_output} - """ - ) - .lstrip() - .format( - all_output=repr(self.get_all_output()), - chunk_resources_threads=repr(self.job_resources.threads), - chunk_resources_time=repr(self.job_resources.time), - chunk_resources_memory=repr(self.job_resources.memory), - chunk_resources_partition=repr(self.job_resources.partition), - merge_resources_threads=repr(self.merge_resources.threads), - merge_resources_time=repr(self.merge_resources.time), - merge_resources_memory=repr(self.merge_resources.memory), - merge_resources_partition=repr(self.merge_resources.partition), - ) - ) - - def _construct_level_one_merge_rule(self, chunk_no, merge_input): - return ( - textwrap.dedent( - r""" - rule merge_chunk_{chunk_no}: - 
input: {chunk_input} - output: - vcf='merge_out.{chunk_no}.d/out/out.vcf.gz', - tbi='merge_out.{chunk_no}.d/out/out.vcf.gz.tbi', - stats='merge_out.{chunk_no}.d/out/out.vcf.stats', - f1r2='merge_out.{chunk_no}.d/out/out.f1r2_tar.tar.gz' - threads: resources_chunk_threads - resources: - time=resources_chunk_time, - memory=resources_chunk_memory, - partition=resources_chunk_partition, - shell: - r''' - set -euo pipefail # Unofficial Bash strict mode - - # Concatenate VCF files ----------------------------------------------- - - bcftools concat \ - --allow-overlaps \ - -d none \ - -o {{output.vcf}} \ - -O z \ - {{input}} - - tabix -f {{output.vcf}} - - # Concatenate stats --------------------------------------------------- - - chunks=$(echo "{{input}}" | sed -e "s/\.vcf\.gz/.vcf.stats/g" | sed -e "s/ / -stats /") - gatk MergeMutectStats -stats $chunks -O {output.stats} - - # Concatenate f1r2 tar files ------------------------------------------ - - tar_dir=$(dirname "{{output.vcf}}") - tar_dir="${{tar_dir}}/out.f1r2_tar" - - mkdir -p $tar_dir - pushd $tar_dir - - chunks=$(echo "{{input}}" | sed -e "s/\.vcf\.gz/.f1r2_tar.tar.gz/" | sed -re "s/job_out\.([0-9]+)\.d/..\/..\/job_out.\1.d/g") - for chunk in $chunks - do - tar -zxvf $chunk - done - tar -zcvf ../out.f1r2_tar.tar.gz * - - popd - rm -rf $tar_dir - ''' - """ - ) - .lstrip() - .format( - chunk_no=chunk_no, - chunk_input=repr(merge_input), - ) - ) - - def _construct_final_merge_rule(self, merge_input): - return ( - textwrap.dedent( - r""" - rule merge_all: - input: {all_input} - output: **{all_output} - threads: resource_merge_threads - resources: - time=resource_merge_time, - memory=resource_merge_memory, - partition=resource_merge_partition, - log: **{all_log} - shell: - r''' - set -euo pipefail # Unofficial Bash strict mode - - # Initialize output directory ----------------------------------------- - - outdir=$(basename {{output.raw}}) - - mkdir -p output - - # Concatenate VCF files ----------------------------------------------- - - bcftools concat \ - --allow-overlaps \ - -d none \ - -o output/out.vcf.gz \ - -O z \ - {{input}} - - # Concatenate stats --------------------------------------------------- - - chunks=$(echo "{{input}}" | sed -e "s/\.vcf\.gz/.vcf.stats/g" | sed -e "s/ / -stats /g") - stats="output/out.vcf.stats" - gatk MergeMutectStats -stats $chunks -O $stats - - # Concatenate f1r2 tar files ------------------------------------------ - - tar_dir="output/out.f1r2_tar" - - mkdir -p $tar_dir - pushd $tar_dir - - chunks=$(echo "{{input}}" | sed -e "s/\.vcf\.gz/.f1r2_tar.tar.gz/g" | sed -re "s/(job|merge)_out\.([0-9]+)\.d/..\/..\/\1_out.\2.d/g") - for chunk in $chunks - do - tar -zxvf $chunk - done - tar -zcvf ../out.f1r2_tar.tar.gz * - - popd - rm -rf $tar_dir - - # tabix index & md5 checksums ----------------------------------------- - - tabix -f output/out.vcf.gz - - pushd output - for f in *; do md5sum $f >$f.md5; done - popd - - # Move to output directory -------------------------------------------- - - mkdir -p $(dirname {{output.raw}}) - mv output/out.vcf.gz {{output.raw}} - mv output/out.vcf.gz.md5 {{output.raw_md5}} - mv output/out.vcf.gz.tbi {{output.raw_tbi}} - mv output/out.vcf.gz.tbi.md5 {{output.raw_tbi_md5}} - mv output/out.vcf.stats {{output.stats}} - mv output/out.vcf.stats.md5 {{output.stats_md5}} - mv output/out.f1r2_tar.tar.gz {{output.f1r2}} - mv output/out.f1r2_tar.tar.gz.md5 {{output.f1r2_md5}} - - # Write out information about conda installation. 
-                        conda list >{{log.conda_list}}
-                        conda info >{{log.conda_info}}
-
-                        pushd $(dirname {{log.conda_list}})
-                        md5sum $(basename {{log.conda_list}}) >$(basename {{log.conda_list}}).md5
-                        md5sum $(basename {{log.conda_info}}) >$(basename {{log.conda_info}}).md5
-                        popd
-                    '''
-                """
-            )
-            .lstrip()
-            .format(
-                all_input=repr(merge_input),
-                all_output=repr(self.get_all_output()),
-                all_log=repr(self.get_all_log_files()),
-            )
+    @classmethod
+    def allow_resources_increase(cls):
+        return True
+
+    @classmethod
+    def merge_code_level_one(cls):
+        return cls._merge_code()
+
+    @classmethod
+    def merge_code_final(cls):
+        return cls._merge_code()
+
+    @classmethod
+    def _merge_code(cls):
+        return textwrap.dedent(
+            r"""
+            # Concatenate raw calls vcfs & index result ----------------------
+            bcftools concat \
+                --allow-overlaps \
+                -d none \
+                -o {output.raw} \
+                -O z \
+                {input.raw}
+            tabix -f {output.raw}
+
+            # Concatenate stats with GATK tool -------------------------------
+            stats=$(echo "{input.stats}" | sed -e "s/ / -stats /g")
+            gatk MergeMutectStats -stats $stats -O {output.stats}
+
+            # Concatenate orientation tar files ------------------------------
+            tmpdir=$(mktemp -d)
+            for tar_file in {input.f1r2}
+            do
+                abs_path=$(realpath $tar_file)
+                pushd $tmpdir
+                tar -zxvf $abs_path
+                popd
+            done
+            tar -zcvf {output.f1r2} -C $tmpdir .
+            rm -rf $tmpdir
+
+            # Compute md5 sums -----------------------------------------------
+            pushd $(dirname {output.raw})
+            for f in *; do md5sum $f > $f.md5; done
+            popd
+            """
         )
diff --git a/snappy_wrappers/wrappers/mutect2_par/run/wrapper.py b/snappy_wrappers/wrappers/mutect2_par/run/wrapper.py
index 952008eeb..c008f5cf0 100644
--- a/snappy_wrappers/wrappers/mutect2_par/run/wrapper.py
+++ b/snappy_wrappers/wrappers/mutect2_par/run/wrapper.py
@@ -6,10 +6,3 @@
 
 # Kick off execution using the wrapper class defined above.
 ParallelMutect2Wrapper(snakemake).run().shutdown_logging()
-
-# Compute MD5 sums of logs.
-shell( - r""" -md5sum {snakemake.log.log} > {snakemake.log.log_md5} -""" -) diff --git a/tests/snappy_pipeline/workflows/test_workflows_somatic_variant_calling.py b/tests/snappy_pipeline/workflows/test_workflows_somatic_variant_calling.py index 19d7b19e4..6795e9e7b 100644 --- a/tests/snappy_pipeline/workflows/test_workflows_somatic_variant_calling.py +++ b/tests/snappy_pipeline/workflows/test_workflows_somatic_variant_calling.py @@ -43,8 +43,11 @@ def minimal_config(): tools: - mutect - scalpel + - mutect2 scalpel: path_target_regions: /path/to/target/regions.bed + mutect2: + common_variants: /path/to/common_variants.vcf data_sets: first_batch: @@ -501,6 +504,7 @@ def test_scalpel_step_part_get_output_files(somatic_variant_calling_workflow): "full_vcf": base_name_out + ".full.vcf.gz", "full_vcf_md5": base_name_out + ".full.vcf.gz.md5", "tar": base_name_out + ".tar.gz", + "tar_md5": base_name_out + ".tar.gz.md5", "vcf_tbi": base_name_out + ".vcf.gz.tbi", "vcf_tbi_md5": base_name_out + ".vcf.gz.tbi.md5", "vcf": base_name_out + ".vcf.gz", @@ -936,9 +940,31 @@ def test_somatic_variant_calling_workflow(somatic_variant_calling_workflow): expected = [ tpl.format(mapper=mapper, var_caller=var_caller, i=i, t=t, ext=ext) for i, t in ((1, 1), (2, 1), (2, 2)) - for ext in ("vcf.gz", "vcf.gz.md5", "vcf.gz.tbi", "vcf.gz.tbi.md5") + for ext in ( + "vcf.gz", + "vcf.gz.md5", + "vcf.gz.tbi", + "vcf.gz.tbi.md5", + "full.vcf.gz", + "full.vcf.gz.md5", + "full.vcf.gz.tbi", + "full.vcf.gz.tbi.md5", + ) + for mapper in ("bwa",) + for var_caller in ("mutect", "scalpel", "mutect2") + ] + # add special cases + expected += [ + tpl.format(mapper=mapper, var_caller="mutect", i=i, t=t, ext=ext) + for i, t in ((1, 1), (2, 1), (2, 2)) + for ext in ("txt", "txt.md5", "wig", "wig.md5") + for mapper in ("bwa",) + ] + expected += [ + tpl.format(mapper=mapper, var_caller="scalpel", i=i, t=t, ext=ext) + for i, t in ((1, 1), (2, 1), (2, 2)) + for ext in ("tar.gz", "tar.gz.md5") for mapper in ("bwa",) - for var_caller in ("mutect", "scalpel") ] # add log files tpl = ( @@ -957,7 +983,7 @@ def test_somatic_variant_calling_workflow(somatic_variant_calling_workflow): "log.md5", ) for mapper in ("bwa",) - for var_caller in ("mutect", "scalpel") + for var_caller in ("mutect", "scalpel", "mutect2") ] expected = list(sorted(expected)) actual = list(sorted(somatic_variant_calling_workflow.get_result_files())) diff --git a/tests/snappy_wrappers/wrappers/data/mutect2_par_prepare_panel.snakemake b/tests/snappy_wrappers/wrappers/data/mutect2_par_prepare_panel.snakemake new file mode 100644 index 000000000..1e7436654 --- /dev/null +++ b/tests/snappy_wrappers/wrappers/data/mutect2_par_prepare_panel.snakemake @@ -0,0 +1,316 @@ +# PREAMBLE + + +shell.executable("/bin/bash") +shell.prefix("set -ex;") + +configfile: 'config.json' + +localrules: all + + +def multiply_time(day_time_str, factor): + # Check if time contains day, ex: '1-00:00:00' + if "-" in day_time_str: + arr_ = day_time_str.split("-") + days = int(arr_[0]) + time_str = arr_[1] + else: + days = 0 + time_str = day_time_str + + # Process based on time structure + arr_ = time_str.split(":") + if time_str.count(":") == 2: # hours:minutes:seconds + seconds = int(arr_[0]) * 60 * 60 + int(arr_[1]) * 60 + int(arr_[2]) + elif time_str.count(":") == 1: # minutes:seconds + seconds = int(arr_[0]) * 60 + int(arr_[1]) + elif time_str.count(":") == 0: # minutes + seconds = int(time_str) * 60 + else: + raise ValueError(f"Invalid time: {day_time_str}") + # Add days to second + seconds += days * 86400 
+ + # Apply factor + seconds = int(seconds * factor) + + # Normalise time + (norm_days, remainder) = divmod(seconds, 86400) + (hours, remainder) = divmod(remainder, 3600) + (minutes, seconds) = divmod(remainder, 60) + + # Fill string - example hour '7' -> '07' + h_str = str(hours).zfill(2) + m_str = str(minutes).zfill(2) + s_str = str(seconds).zfill(2) + + return "%d-%s:%s:%s" % (norm_days, h_str, m_str, s_str) + + +def multiply_memory(memory_str, factor): + memory_mb = None + suffixes = ( + ("k", 1e-3), + ("M", 1), + ("G", 1e3), + ("T", 1e6), + ) + for (suffix, mult) in suffixes: + if memory_str.endswith(suffix): + memory_mb = float(memory_str[:-1]) * mult + break + # No match, assume no suffix int + if not memory_mb: + memory_mb = float(memory_str) + return int(memory_mb * factor) + + +def resource_chunk_threads(wildcards): + '''Return the number of threads to use for running one chunk.''' + return 1 + +def resource_chunk_partition(wildcards): + '''Return the partition to use for running one chunk.''' + return 'medium' + +def resource_merge_threads(wildcards): + '''Return the number of threads to use for running merging.''' + return 1 + +def resource_merge_memory(wildcards): + '''Return the memory to use for running merging.''' + return '8G' + +def resource_merge_time(wildcards): + '''Return the time to use for running merging.''' + return '20:00:00' + +def resource_merge_partition(wildcards): + '''Return the partition to use for running merging.''' + return 'medium' + + +def resource_chunk_memory(wildcards, attempt): + '''Return the memory to use for running one chunk.''' + return multiply_memory('28G', attempt) + +def resource_chunk_time(wildcards, attempt): + '''Return the time to use for running one chunk.''' + return multiply_time('12:00:00', attempt) + + +rule all: + input: **{'vcf': '/work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': '/work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': '/work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'vcf_tbi_md5': '/work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} + + +# PARALLEL WORK + +rule chunk_0: + input: + **{'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'vcf': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'vcf_tbi_md5': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} + log: + **{'conda_info': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.log', 'log_md5': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + params: + **{'args': {'intervals': ['1:1-50,000,000']}} + + +rule chunk_1: + input: + **{'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 
'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'vcf': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'vcf_tbi_md5': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} + log: + **{'conda_info': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.log', 'log_md5': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + params: + **{'args': {'intervals': ['1:50,000,001-100,000,000']}} + + +rule chunk_2: + input: + **{'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'vcf': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'vcf_tbi_md5': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} + log: + **{'conda_info': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.log', 'log_md5': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + params: + **{'args': {'intervals': ['1:100,000,001-150,000,000']}} + + +rule chunk_3: + input: + **{'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'vcf': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'vcf_tbi_md5': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} + log: + **{'conda_info': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.log', 'log_md5': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + 
params: + **{'args': {'intervals': ['1:150,000,001-200,000,000']}} + + +rule chunk_4: + input: + **{'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'vcf': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'vcf_tbi_md5': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} + log: + **{'conda_info': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.log', 'log_md5': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + params: + **{'args': {'intervals': ['1:200,000,001-249,250,621']}} + + +# JOIN PARALLEL RESULTS + +rule merge_chunk_0: + input: **{'vcf': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz'], 'vcf_md5': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5'], 'vcf_tbi': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi'], 'vcf_tbi_md5': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5']} + output: **{'vcf': 'merge_out.0.d/out/merge_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': 'merge_out.0.d/out/merge_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': 'merge_out.0.d/out/merge_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'vcf_tbi_md5': 'merge_out.0.d/out/merge_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} + threads: resource_merge_threads + resources: + time=resource_merge_time, + memory=resource_merge_memory, + partition=resource_merge_partition, + shell: + r''' + set -euo pipefail # Unofficial Bash strict mode + + # Merge chunks output ------------------------------------------ + +# Concatenate raw calls vcfs & index result ---------------------- +bcftools concat \ + --allow-overlaps \ + -d none \ + -o {output.vcf} \ + -O z \ + {input.vcf} +tabix -f {output.vcf} + +# Compute md5 sums ----------------------------------------------- +pushd $(dirname {output.vcf}) +f=$(basename {output.vcf}) +md5sum $f > $f.md5 +md5sum $f.tbi > $f.tbi.md5 +popd + + + # Save chunk logs in tarball ----------------------------------- + mkdir -p $(dirname merge_out.0.d/log/merge.tar.gz) + tar -zcvf merge_out.0.d/log/merge.tar.gz job_out.0.d/log/* job_out.1.d/log/* job_out.2.d/log/* + pushd $(dirname merge_out.0.d/log/merge.tar.gz) + f=$(basename merge_out.0.d/log/merge.tar.gz) + md5sum $f > 
$f.md5 + popd + ''' + + +rule merge_chunk_1: + input: **{'vcf': ['job_out.3.d/out/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz'], 'vcf_md5': ['job_out.3.d/out/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5'], 'vcf_tbi': ['job_out.3.d/out/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi'], 'vcf_tbi_md5': ['job_out.3.d/out/tmp_3.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5']} + output: **{'vcf': 'merge_out.1.d/out/merge_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': 'merge_out.1.d/out/merge_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': 'merge_out.1.d/out/merge_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'vcf_tbi_md5': 'merge_out.1.d/out/merge_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} + threads: resource_merge_threads + resources: + time=resource_merge_time, + memory=resource_merge_memory, + partition=resource_merge_partition, + shell: + r''' + set -euo pipefail # Unofficial Bash strict mode + + # Merge chunks output ------------------------------------------ + +# Concatenate raw calls vcfs & index result ---------------------- +bcftools concat \ + --allow-overlaps \ + -d none \ + -o {output.vcf} \ + -O z \ + {input.vcf} +tabix -f {output.vcf} + +# Compute md5 sums ----------------------------------------------- +pushd $(dirname {output.vcf}) +f=$(basename {output.vcf}) +md5sum $f > $f.md5 +md5sum $f.tbi > $f.tbi.md5 +popd + + + # Save chunk logs in tarball ----------------------------------- + mkdir -p $(dirname merge_out.1.d/log/merge.tar.gz) + tar -zcvf merge_out.1.d/log/merge.tar.gz job_out.3.d/log/* job_out.4.d/log/* + pushd $(dirname merge_out.1.d/log/merge.tar.gz) + f=$(basename merge_out.1.d/log/merge.tar.gz) + md5sum $f > $f.md5 + popd + ''' + + +rule merge_all: + input: **{'vcf': ['merge_out.0.d/out/merge_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'merge_out.1.d/out/merge_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz'], 'vcf_md5': ['merge_out.0.d/out/merge_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'merge_out.1.d/out/merge_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5'], 'vcf_tbi': ['merge_out.0.d/out/merge_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'merge_out.1.d/out/merge_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi'], 'vcf_tbi_md5': ['merge_out.0.d/out/merge_0.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5', 'merge_out.1.d/out/merge_1.bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5']} + output: **{'vcf': '/work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': '/work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': '/work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi', 'vcf_tbi_md5': '/work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} + log: **{'conda_info': '/work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': '/work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': '/work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': '/work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.conda_list.txt.md5', 'log': '/work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.log', 'log_md5': '/work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.log.md5'} + threads: resource_merge_threads + resources: + time=resource_merge_time, + memory=resource_merge_memory, + 
partition=resource_merge_partition, + shell: + r''' + set -euo pipefail # Unofficial Bash strict mode + + # Merge chunks output ------------------------------------------ + +# Concatenate raw calls vcfs & index result ---------------------- +bcftools concat \ + --allow-overlaps \ + -d none \ + -o {output.vcf} \ + -O z \ + {input.vcf} +tabix -f {output.vcf} + +# Compute md5 sums ----------------------------------------------- +pushd $(dirname {output.vcf}) +f=$(basename {output.vcf}) +md5sum $f > $f.md5 +md5sum $f.tbi > $f.tbi.md5 +popd + + + # Save chunk logs in tarball ----------------------------------- + mkdir -p $(dirname /work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.log.merge.tar.gz) + tar -zcvf /work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.log.merge.tar.gz merge_out.0.d/log/* merge_out.1.d/log/* + pushd $(dirname /work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.log.merge.tar.gz) + f=$(basename /work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1.log.merge.tar.gz) + md5sum $f > $f.md5 + popd + ''' + + +# EPILOGUE + diff --git a/tests/snappy_wrappers/wrappers/data/mutect2_par_prepare_panel_chunk.snakemake b/tests/snappy_wrappers/wrappers/data/mutect2_par_prepare_panel_chunk.snakemake deleted file mode 100644 index 43ec94572..000000000 --- a/tests/snappy_wrappers/wrappers/data/mutect2_par_prepare_panel_chunk.snakemake +++ /dev/null @@ -1,13 +0,0 @@ -rule chunk_0: - input: - 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam', - output: - touch("job_out.0.d/.done"), - **{'vcf': 'job_out.0.d/out/tmp_0.vcf.gz', 'vcf_md5': 'job_out.0.d/out/tmp_0.vcf.gz.md5', 'vcf_tbi': 'job_out.0.d/out/tmp_0.vcf.gz.tbi', 'vcf_tbi_md5': 'job_out.0.d/out/tmp_0.vcf.gz.tbi.md5'} - threads: resource_chunk_threads - resources: - time=resource_chunk_time, - memory=resource_chunk_memory, - partition=resource_chunk_partition, - params: - **{'args': {'intervals': ['1:1-50,010,000']}} diff --git a/tests/snappy_wrappers/wrappers/data/mutect2_par_prepare_panel_preamble.snakemake b/tests/snappy_wrappers/wrappers/data/mutect2_par_prepare_panel_preamble.snakemake deleted file mode 100644 index 36931db36..000000000 --- a/tests/snappy_wrappers/wrappers/data/mutect2_par_prepare_panel_preamble.snakemake +++ /dev/null @@ -1,49 +0,0 @@ -rule merge_all: - input: ['job_out.0.d/out/tmp_0.vcf.gz', 'job_out.1.d/out/tmp_1.vcf.gz', 'job_out.2.d/out/tmp_2.vcf.gz', 'job_out.3.d/out/tmp_3.vcf.gz', 'job_out.4.d/out/tmp_4.vcf.gz'] - output: **{'vcf': '/work/bwa.mutect2.prepare_panel/out/P001-N1-DNA1-WGS1.vcf.gz', 'vcf_md5': '/work/bwa.mutect2.prepare_panel/out/P001-N1-DNA1-WGS1.vcf.gz.md5', 'vcf_tbi': '/work/bwa.mutect2.prepare_panel/out/P001-N1-DNA1-WGS1.vcf.tbi.gz', 'vcf_tbi_md5': '/work/bwa.mutect2.prepare_panel/out/P001-N1-DNA1-WGS1.vcf.gz.tbi.md5'} - threads: resource_merge_threads - resources: - time=resource_merge_time, - memory=resource_merge_memory, - partition=resource_merge_partition, - log: **{'conda_info': '/work/bwa.mutect2.prepare_panel/log/P001-N1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': '/work/bwa.mutect2.prepare_panel/log/P001-N1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': '/work/bwa.mutect2.prepare_panel/log/P001-N1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': '/work/bwa.mutect2.prepare_panel/log/P001-N1-DNA1-WGS1.conda_list.txt.md5'} - shell: - r''' - set -euo pipefail # Unofficial Bash strict mode - - # Initialize output directory ----------------------------------------- - - outdir=$(basename {output.vcf}) - - mkdir -p output - - # Concatenate files 
--------------------------------------------------- - bcftools concat \ - --allow-overlaps \ - -d none \ - -o output/out.vcf.gz \ - -O z \ - {input} - - tabix -f output/out.vcf.gz - - pushd output - for f in *; do md5sum $f >$f.md5; done - popd - - # Move to output directory -------------------------------------------- - mkdir -p $(dirname {output.vcf}) - mv output/out.vcf.gz {output.vcf} - mv output/out.vcf.gz.md5 {output.vcf_md5} - mv output/out.vcf.gz.tbi {output.vcf_tbi} - mv output/out.vcf.gz.tbi.md5 {output.vcf_tbi_md5} - - # Write out information about conda installation. - conda list >{log.conda_list} - conda info >{log.conda_info} - - pushd $(dirname {log.conda_list}) - md5sum $(basename {log.conda_list}) >$(basename {log.conda_list}).md5 - md5sum $(basename {log.conda_info}) >$(basename {log.conda_info}).md5 - popd - ''' diff --git a/tests/snappy_wrappers/wrappers/data/mutect2_par_run.snakemake b/tests/snappy_wrappers/wrappers/data/mutect2_par_run.snakemake index 0c00a1c92..558131679 100644 --- a/tests/snappy_wrappers/wrappers/data/mutect2_par_run.snakemake +++ b/tests/snappy_wrappers/wrappers/data/mutect2_par_run.snakemake @@ -1,80 +1,248 @@ -rule merge_all: - input: ['job_out.0.d/out/tmp_0.vcf.gz', 'job_out.1.d/out/tmp_1.vcf.gz', 'job_out.2.d/out/tmp_2.vcf.gz', 'job_out.3.d/out/tmp_3.vcf.gz', 'job_out.4.d/out/tmp_4.vcf.gz'] - output: **{'raw': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'raw_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'raw_tbi': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'raw_tbi_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'stats': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'stats_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'f1r2': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'f1r2_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5'} - threads: resource_merge_threads - resources: - time=resource_merge_time, - memory=resource_merge_memory, - partition=resource_merge_partition, - log: **{'conda_info': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt.md5'} - shell: - r''' - set -euo pipefail # Unofficial Bash strict mode +# PREAMBLE - # Initialize output directory ----------------------------------------- - outdir=$(basename {output.raw}) +shell.executable("/bin/bash") +shell.prefix("set -ex;") - mkdir -p output +configfile: 'config.json' - # Concatenate VCF files ----------------------------------------------- +localrules: all - bcftools concat \ - --allow-overlaps \ - -d none \ - -o output/out.vcf.gz \ - -O z \ - {input} - # Concatenate stats --------------------------------------------------- +def multiply_time(day_time_str, factor): + # Check if time contains day, ex: '1-00:00:00' + if "-" in day_time_str: + arr_ = day_time_str.split("-") + days = int(arr_[0]) + time_str = arr_[1] + else: + days = 0 + time_str = day_time_str + + # 
Process based on time structure + arr_ = time_str.split(":") + if time_str.count(":") == 2: # hours:minutes:seconds + seconds = int(arr_[0]) * 60 * 60 + int(arr_[1]) * 60 + int(arr_[2]) + elif time_str.count(":") == 1: # minutes:seconds + seconds = int(arr_[0]) * 60 + int(arr_[1]) + elif time_str.count(":") == 0: # minutes + seconds = int(time_str) * 60 + else: + raise ValueError(f"Invalid time: {day_time_str}") + # Add days to second + seconds += days * 86400 + + # Apply factor + seconds = int(seconds * factor) + + # Normalise time + (norm_days, remainder) = divmod(seconds, 86400) + (hours, remainder) = divmod(remainder, 3600) + (minutes, seconds) = divmod(remainder, 60) + + # Fill string - example hour '7' -> '07' + h_str = str(hours).zfill(2) + m_str = str(minutes).zfill(2) + s_str = str(seconds).zfill(2) + + return "%d-%s:%s:%s" % (norm_days, h_str, m_str, s_str) + + +def multiply_memory(memory_str, factor): + memory_mb = None + suffixes = ( + ("k", 1e-3), + ("M", 1), + ("G", 1e3), + ("T", 1e6), + ) + for (suffix, mult) in suffixes: + if memory_str.endswith(suffix): + memory_mb = float(memory_str[:-1]) * mult + break + # No match, assume no suffix int + if not memory_mb: + memory_mb = float(memory_str) + return int(memory_mb * factor) + + +def resource_chunk_threads(wildcards): + '''Return the number of threads to use for running one chunk.''' + return 1 + +def resource_chunk_partition(wildcards): + '''Return the partition to use for running one chunk.''' + return 'medium' + +def resource_merge_threads(wildcards): + '''Return the number of threads to use for running merging.''' + return 1 + +def resource_merge_memory(wildcards): + '''Return the memory to use for running merging.''' + return '64G' + +def resource_merge_time(wildcards): + '''Return the time to use for running merging.''' + return '5:00:00' + +def resource_merge_partition(wildcards): + '''Return the partition to use for running merging.''' + return 'medium' + + +def resource_chunk_memory(wildcards, attempt): + '''Return the memory to use for running one chunk.''' + return multiply_memory('28G', attempt) + +def resource_chunk_time(wildcards, attempt): + '''Return the time to use for running one chunk.''' + return multiply_time('3:00:00', attempt) + - chunks=$(echo "{input}" | sed -e "s/\.vcf\.gz/.vcf.stats/g" | sed -e "s/ / -stats /g") - stats="output/out.vcf.stats" - gatk MergeMutectStats -stats $chunks -O $stats - - # Concatenate f1r2 tar files ------------------------------------------ +rule all: + input: **{'raw': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'raw_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'raw_tbi': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'raw_tbi_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'stats': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'stats_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'f1r2': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'f1r2_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5'} + + +# PARALLEL WORK + +rule chunk_0: + input: + **{'tumor_bai': 'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam.bai', 'tumor_bam': 
'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam', 'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'raw': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'raw_md5': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'raw_tbi': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'raw_tbi_md5': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'stats': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'stats_md5': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'f1r2': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'f1r2_md5': 'job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5'} + log: + **{'conda_info': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.log', 'log_md5': 'job_out.0.d/log/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + params: + **{'normal_lib_name': 'P001-N1-DNA1-WGS1', 'args': {'intervals': ['1:1-50,000,000']}} + + +rule chunk_1: + input: + **{'tumor_bai': 'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam.bai', 'tumor_bam': 'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam', 'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'raw': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'raw_md5': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'raw_tbi': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'raw_tbi_md5': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'stats': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'stats_md5': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'f1r2': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'f1r2_md5': 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5'} + log: + **{'conda_info': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.log', 'log_md5': 'job_out.1.d/log/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + params: + **{'normal_lib_name': 'P001-N1-DNA1-WGS1', 'args': {'intervals': ['1:50,000,001-100,000,000']}} + + +rule chunk_2: + input: + **{'tumor_bai': 
'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam.bai', 'tumor_bam': 'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam', 'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'raw': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'raw_md5': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'raw_tbi': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'raw_tbi_md5': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'stats': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'stats_md5': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'f1r2': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'f1r2_md5': 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5'} + log: + **{'conda_info': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.log', 'log_md5': 'job_out.2.d/log/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + params: + **{'normal_lib_name': 'P001-N1-DNA1-WGS1', 'args': {'intervals': ['1:100,000,001-150,000,000']}} + + +rule chunk_3: + input: + **{'tumor_bai': 'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam.bai', 'tumor_bam': 'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam', 'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'raw': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'raw_md5': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'raw_tbi': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'raw_tbi_md5': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'stats': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'stats_md5': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'f1r2': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'f1r2_md5': 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5'} + log: + **{'conda_info': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.log', 'log_md5': 'job_out.3.d/log/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + params: + **{'normal_lib_name': 'P001-N1-DNA1-WGS1', 
'args': {'intervals': ['1:150,000,001-200,000,000']}} + + +rule chunk_4: + input: + **{'tumor_bai': 'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam.bai', 'tumor_bam': 'NGS_MAPPING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1.bam', 'normal_bai': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam.bai', 'normal_bam': 'NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam'} + output: + **{'raw': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'raw_md5': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'raw_tbi': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'raw_tbi_md5': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'stats': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'stats_md5': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'f1r2': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'f1r2_md5': 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5'} + log: + **{'conda_info': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt.md5', 'log': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.log', 'log_md5': 'job_out.4.d/log/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.log.md5'} + threads: resource_chunk_threads + resources: + time=resource_chunk_time, + memory=resource_chunk_memory, + partition=resource_chunk_partition, + params: + **{'normal_lib_name': 'P001-N1-DNA1-WGS1', 'args': {'intervals': ['1:200,000,001-249,250,621']}} - tar_dir="output/out.f1r2_tar" - mkdir -p $tar_dir - pushd $tar_dir +# JOIN PARALLEL RESULTS - chunks=$(echo "{input}" | sed -e "s/\.vcf\.gz/.f1r2_tar.tar.gz/g" | sed -re "s/(job|merge)_out\.([0-9]+)\.d/..\/..\/\1_out.\2.d/g") - for chunk in $chunks - do - tar -zxvf $chunk - done - tar -zcvf ../out.f1r2_tar.tar.gz * +rule merge_all: + input: **{'raw': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz'], 'raw_md5': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5'], 'raw_tbi': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi'], 'raw_tbi_md5': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 
'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5'], 'stats': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats'], 'stats_md5': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5'], 'f1r2': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz'], 'f1r2_md5': ['job_out.0.d/out/tmp_0.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5', 'job_out.1.d/out/tmp_1.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5', 'job_out.2.d/out/tmp_2.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5', 'job_out.3.d/out/tmp_3.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5', 'job_out.4.d/out/tmp_4.bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5']} + output: **{'raw': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz', 'raw_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.md5', 'raw_tbi': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi', 'raw_tbi_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.gz.tbi.md5', 'stats': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats', 'stats_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.vcf.stats.md5', 'f1r2': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz', 'f1r2_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/out/bwa.mutect2.P001-T1-DNA1-WGS1.raw.f1r2_tar.tar.gz.md5'} + log: **{'conda_info': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.conda_list.txt.md5', 'log': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.log', 'log_md5': '/work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.log.md5'} + threads: resource_merge_threads + resources: + time=resource_merge_time, + memory=resource_merge_memory, + partition=resource_merge_partition, + shell: + r''' + set -euo pipefail # Unofficial Bash strict mode + # Merge chunks output ------------------------------------------ + +# Concatenate raw calls vcfs & index result ---------------------- +bcftools concat \ + --allow-overlaps \ + -d none \ + -o {output.raw} \ + -O z \ + {input.raw} 
+tabix -f {output.raw}
+
+# Concatenate stats with GATK tool -------------------------------
+stats=$(echo "{input.stats}" | sed -e "s/ / -stats /g")
+gatk MergeMutectStats -stats $stats -O {output.stats}
+
+# Concatenate orientation tar files ------------------------------
+tmpdir=$(mktemp -d)
+for tar_file in {input.f1r2}
+do
+    abs_path=$(realpath $tar_file)
+    pushd $tmpdir
+    tar -zxvf $abs_path
+    popd
+done
+tar -zcvf {output.f1r2} -C $tmpdir .
+rm -rf $tmpdir
+
+# Compute md5 sums -----------------------------------------------
+pushd $(dirname {output.raw})
+for f in *; do md5sum $f > $f.md5; done
+popd
+
+
+        # Save chunk logs in tarball -----------------------------------
+        mkdir -p $(dirname /work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.log.merge.tar.gz)
+        tar -zcvf /work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.log.merge.tar.gz job_out.0.d/log/* job_out.1.d/log/* job_out.2.d/log/* job_out.3.d/log/* job_out.4.d/log/*
+        pushd $(dirname /work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.log.merge.tar.gz)
+        f=$(basename /work/bwa.mutect2.P001-T1-DNA1-WGS1/log/bwa.mutect2.P001-T1-DNA1-WGS1.log.merge.tar.gz)
+        md5sum $f > $f.md5
         popd
-        rm -rf $tar_dir
+    '''
 
-        # tabix index & md5 checksums -----------------------------------------
 
-        tabix -f output/out.vcf.gz
+# EPILOGUE
 
-        pushd output
-        for f in *; do md5sum $f >$f.md5; done
-        popd
-
-        # Move to output directory --------------------------------------------
-
-        mkdir -p $(dirname {output.raw})
-        mv output/out.vcf.gz {output.raw}
-        mv output/out.vcf.gz.md5 {output.raw_md5}
-        mv output/out.vcf.gz.tbi {output.raw_tbi}
-        mv output/out.vcf.gz.tbi.md5 {output.raw_tbi_md5}
-        mv output/out.vcf.stats {output.stats}
-        mv output/out.vcf.stats.md5 {output.stats_md5}
-        mv output/out.f1r2_tar.tar.gz {output.f1r2}
-        mv output/out.f1r2_tar.tar.gz.md5 {output.f1r2_md5}
-
-        # Write out information about conda installation.
- conda list >{log.conda_list} - conda info >{log.conda_info} - - pushd $(dirname {log.conda_list}) - md5sum $(basename {log.conda_list}) >$(basename {log.conda_list}).md5 - md5sum $(basename {log.conda_info}) >$(basename {log.conda_info}).md5 - popd - ''' diff --git a/tests/snappy_wrappers/wrappers/test_mutect2_par_prepare_panel.py b/tests/snappy_wrappers/wrappers/test_mutect2_par_prepare_panel.py index c7ada0a72..2ce13cb39 100644 --- a/tests/snappy_wrappers/wrappers/test_mutect2_par_prepare_panel.py +++ b/tests/snappy_wrappers/wrappers/test_mutect2_par_prepare_panel.py @@ -3,6 +3,7 @@ import importlib.machinery import os from pathlib import Path +import re import tempfile import textwrap import types @@ -85,10 +86,10 @@ def minimal_config(): def snakemake_output_dict(): """Returns dictionary that defined snakemake.output""" return { - "vcf": "work/{mapper}.mutect2.prepare_panel/out/{normal_library}.vcf.gz", - "vcf_md5": "work/{mapper}.mutect2.prepare_panel/out/{normal_library}.vcf.gz.md5", - "vcf_tbi": "work/{mapper}.mutect2.prepare_panel/out/{normal_library}.vcf.tbi.gz", - "vcf_tbi_md5": "work/{mapper}.mutect2.prepare_panel/out/{normal_library}.vcf.gz.tbi.md5", + "vcf": "work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz", + "vcf_md5": "work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.md5", + "vcf_tbi": "work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi", + "vcf_tbi_md5": "work/bwa.mutect2/out/bwa.mutect2.P001-N1-DNA1-WGS1.vcf.gz.tbi.md5", } @@ -96,7 +97,7 @@ def snakemake_output_dict(): def snakemake_obj(minimal_config, snakemake_output_dict): """Returns Snakemake object.""" # Define helper variables - rule_name = "somatic_variant_calling_mutect2_run" + rule_name = "panel_of_normals_mutect2_prepare_panel" threads = 2 bench_iteration = 2 script_dir = "/work" @@ -105,7 +106,7 @@ def snakemake_obj(minimal_config, snakemake_output_dict): "normal_bam": "NGS_MAPPING/output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam", } - log_base_name = "work/{mapper}.mutect2.prepare_panel/log/{normal_library}" + log_base_name = "work/bwa.mutect2/log/bwa.mutect2.P001-N1-DNA1-WGS1" log_dict = { "conda_info": log_base_name + ".conda_info.txt", "conda_info_md5": log_base_name + ".conda_info.txt.md5", @@ -179,50 +180,25 @@ def construct_preamble_module( return module -def test_mutect2_wrapper_prepare_panel_construct_parallel_rules( - snakemake_obj, variant_caller_fake_fs, mocker -): - """Tests ParallelMutect2Wrapper.construct_merge_rule()""" +def test_mutect2_wrapper_prepare_panel_joint_chunks(snakemake_obj, variant_caller_fake_fs, mocker): + """Tests ParallelMutect2Wrapper.joint_chunks()""" # Patch out file-system patch_module_fs("snappy_wrappers.wrapper_parallel", variant_caller_fake_fs, mocker) wrapper_par = ParallelMutect2Wrapper(snakemake=snakemake_obj) + # Force level_one + wrapper_par.merge_block_size = 3 # Define expected - data_path = (Path(__file__).parent / "data/mutect2_par_prepare_panel_chunk.snakemake").resolve() - with open(data_path, "r", encoding="utf8") as f: - expected = f.read() - # Get actual and assert if `rule chunk_0` is correct - # Note: It is not feasible to test all chunks as the `wrapper` will be set to a local file - _tmp_actual = list(wrapper_par.construct_parallel_rules())[0] - actual = ( - "\n".join( - [line for line in _tmp_actual.split("\n") if not ("wrapper" in line or line == "")] - ) - + "\n" - ) - assert actual == expected - - -def test_mutect2_wrapper_run_construct_merge_rule(snakemake_obj, variant_caller_fake_fs, mocker): - """Tests 
ParallelMutect2Wrapper.construct_merge_rule()""" - # Patch out file-system - patch_module_fs("snappy_wrappers.wrapper_parallel", variant_caller_fake_fs, mocker) - wrapper_par = ParallelMutect2Wrapper(snakemake=snakemake_obj) - # Define expected - data_path = ( - Path(__file__).parent / "data/mutect2_par_prepare_panel_preamble.snakemake" - ).resolve() + data_path = (Path(__file__).parent / "data/mutect2_par_prepare_panel.snakemake").resolve() with open(data_path, "r", encoding="utf8") as f: expected = f.read() + # Remove wrapper lines that contain path to script + remove_wrapper = re.compile(" +wrapper: [^\n]+\n") # Get actual and assert - actual = ( - wrapper_par.construct_merge_rule() - .replace("{mapper}", snakemake_obj.wildcards["mapper"]) - .replace("{normal_library}", snakemake_obj.wildcards["normal_library"]) - ) + actual = remove_wrapper.sub("", wrapper_par.joint_chunks()) assert actual == expected -def test_mutect2_wrapper_run_preamble_multiply_time(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_multiply_time(construct_preamble_module): """Tests Parallel Preamble multiply_time()""" # Constant factor factor = 10 @@ -250,7 +226,7 @@ def test_mutect2_wrapper_run_preamble_multiply_time(construct_preamble_module): construct_preamble_module.multiply_time(day_time_str="_not_a_valid_time", factor=factor) -def test_mutect2_wrapper_run_preamble_multiply_memory(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_multiply_memory(construct_preamble_module): """Tests Parallel Preamble multiply_memory()""" # Constant factor factor = 10 @@ -274,14 +250,14 @@ def test_mutect2_wrapper_run_preamble_multiply_memory(construct_preamble_module) assert actual == _expected, msg -def test_mutect2_wrapper_run_preamble_resource_chunk_threads(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_resource_chunk_threads(construct_preamble_module): """Tests Parallel Preamble resource_chunk_threads() - Chunks always get a single thread""" expected = 1 actual = construct_preamble_module.resource_chunk_threads(wildcards=None) assert actual == expected -def test_mutect2_wrapper_run_preamble_resource_chunk_memory(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_resource_chunk_memory(construct_preamble_module): """ Tests Parallel Preamble resource_chunk_memory() - Baseline memory defined in Snakemake object """ @@ -302,7 +278,7 @@ def test_mutect2_wrapper_run_preamble_resource_chunk_memory(construct_preamble_m assert actual == _expected, msg -def test_mutect2_wrapper_run_preamble_resource_chunk_time(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_resource_chunk_time(construct_preamble_module): """ Tests Parallel Preamble resource_chunk_time() - Baseline time defined in Snakemake object """ @@ -323,35 +299,35 @@ def test_mutect2_wrapper_run_preamble_resource_chunk_time(construct_preamble_mod assert actual == _expected, msg -def test_mutect2_wrapper_run_preamble_resource_chunk_partition(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_resource_chunk_partition(construct_preamble_module): """Tests Parallel Preamble resource_chunk_partition()""" expected = "medium" actual = construct_preamble_module.resource_chunk_partition(wildcards=None) assert actual == expected -def test_mutect2_wrapper_run_preamble_resource_merge_threads(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_resource_merge_threads(construct_preamble_module): """Tests 
Parallel Preamble resource_merge_threads() - Merge always get a single thread""" expected = 1 actual = construct_preamble_module.resource_merge_threads(wildcards=None) assert actual == expected -def test_mutect2_wrapper_run_preamble_resource_merge_memory(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_resource_merge_memory(construct_preamble_module): """Tests Parallel Preamble resource_merge_memory() - Merge always get the same value""" expected = "8G" actual = construct_preamble_module.resource_merge_memory(wildcards=None) assert actual == expected -def test_mutect2_wrapper_run_preamble_resource_merge_time(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_resource_merge_time(construct_preamble_module): """Tests Parallel Preamble resource_merge_time() - Merge always get the same value""" expected = "20:00:00" actual = construct_preamble_module.resource_merge_time(wildcards=None) assert actual == expected -def test_mutect2_wrapper_run_preamble_resource_merge_partition(construct_preamble_module): +def test_mutect2_wrapper_prepare_panel_preamble_resource_merge_partition(construct_preamble_module): """Tests Parallel Preamble resource_merge_partition()""" expected = "medium" actual = construct_preamble_module.resource_merge_partition(wildcards=None) diff --git a/tests/snappy_wrappers/wrappers/test_mutect2_par_run.py b/tests/snappy_wrappers/wrappers/test_mutect2_par_run.py index cd22ec5a4..023e51bcd 100644 --- a/tests/snappy_wrappers/wrappers/test_mutect2_par_run.py +++ b/tests/snappy_wrappers/wrappers/test_mutect2_par_run.py @@ -3,6 +3,7 @@ import importlib.machinery import os from pathlib import Path +import re import tempfile import textwrap import types @@ -44,8 +45,8 @@ def minimal_config(): - mutect2 mutect2: panel_of_normals: '' # Set path to panel of normals vcf if required - germline_resource: REQUIRED # Germline variants resource (same as panel of normals) - common_variants: REQUIRED # Common germline variants for contamination estimation + germline_resource: '' # Germline variants resource (same as panel of normals) + common_variants: '' # Common germline variants for contamination estimation # Parallelization configuration num_cores: 2 # number of cores to use locally window_length: 50000000 # split input into windows of this size, each triggers a job @@ -180,8 +181,8 @@ def construct_preamble_module(snakemake_obj, variant_caller_fake_fs, mocker): return module -def test_mutect2_wrapper_run_construct_merge_rule(snakemake_obj, variant_caller_fake_fs, mocker): - """Tests ParallelMutect2Wrapper.construct_merge_rule()""" +def test_mutect2_wrapper_run_joint_chunks(snakemake_obj, variant_caller_fake_fs, mocker): + """Tests ParallelMutect2Wrapper.joint_chunks()""" # Patch out file-system patch_module_fs("snappy_wrappers.wrapper_parallel", variant_caller_fake_fs, mocker) wrapper_par = ParallelMutect2Wrapper(snakemake=snakemake_obj) @@ -189,8 +190,10 @@ def test_mutect2_wrapper_run_construct_merge_rule(snakemake_obj, variant_caller_ data_path = (Path(__file__).parent / "data/mutect2_par_run.snakemake").resolve() with open(data_path, "r", encoding="utf8") as f: expected = f.read() + # Remove wrapper lines that contain path to script + remove_wrapper = re.compile(" +wrapper: [^\n]+\n") # Get actual and assert - actual = wrapper_par.construct_merge_rule() + actual = remove_wrapper.sub("", wrapper_par.joint_chunks()) assert actual == expected
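
Note on the comparison used by the reworked wrapper tests above (an illustrative, self-contained sketch; not part of the patch): the tests now render the whole parallel snakefile via ParallelMutect2Wrapper.joint_chunks() and compare it to a checked-in .snakemake data file, after stripping every `wrapper:` line, since its value is a path into the local checkout and therefore differs between machines. The snippet below replays that normalization; the `generated` rule text is made up for the example.

import re
import textwrap

# Same pattern as in the updated tests: drop indented "wrapper:" lines,
# including their trailing newline.
remove_wrapper = re.compile(" +wrapper: [^\n]+\n")

# Hypothetical stand-in for the snakefile text produced by joint_chunks().
generated = textwrap.dedent(
    """\
    rule chunk_0:
        input: 'example.bam'
        wrapper: /tmp/checkout/snappy_wrappers/wrappers/mutect2/run
    """
)

# After the substitution the machine-specific line is gone and the remainder
# can be compared verbatim against the expected .snakemake data file.
normalized = remove_wrapper.sub("", generated)
assert "wrapper:" not in normalized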