From 6f94e569bd768b9b2260c1de80304dc8a6d783a7 Mon Sep 17 00:00:00 2001 From: subwaystation Date: Tue, 30 Nov 2021 14:21:59 +0100 Subject: [PATCH 1/2] parallelize process wfmash, wfmash can be run exclusively --- .github/workflows/ci.yml | 2 + Dockerfile | 2 + bin/split_approx_mappings_in_chunks.py | 55 +++++++++ main.nf | 147 ++++++++++++++++++++----- nextflow.config | 2 + nextflow_schema.json | 11 +- 6 files changed, 189 insertions(+), 30 deletions(-) create mode 100644 bin/split_approx_mappings_in_chunks.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1a809ce0..6f342a0f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,3 +60,5 @@ jobs: nextflow run ${GITHUB_WORKSPACE} -profile test,docker --smoothxg consensus_spec 10,100,1000 nextflow run ${GITHUB_WORKSPACE} -profile test,docker --vcf_spec "gi|568815561:#,gi|568815567:#" nextflow run ${GITHUB_WORKSPACE} -profile test,docker --smoothxg_write_maf + nextflow run ${GITHUB_WORKSPACE} -profile test,docker --wfmash_chunks 2 + nextflow run ${GITHUB_WORKSPACE} -profile test,docker --wfmash_only diff --git a/Dockerfile b/Dockerfile index a0d2e1cb..0845e437 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,6 +9,8 @@ RUN apt-get update \ procps \ && apt-get clean -y && rm -rf /var/lib/apt/lists/* +COPY bin/split_approx_mappings_in_chunks.py / + # Install miniconda RUN wget \ https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \ diff --git a/bin/split_approx_mappings_in_chunks.py b/bin/split_approx_mappings_in_chunks.py new file mode 100644 index 00000000..869492d9 --- /dev/null +++ b/bin/split_approx_mappings_in_chunks.py @@ -0,0 +1,55 @@ +# Usage +# Run: +# python3 split_approx_mappings_in_chunks.py approximate_mappings.paf 4 +# It will generate the following files: +# approximate_mappings.paf.chunk_0.paf +# approximate_mappings.paf.chunk_1.paf +# approximate_mappings.paf.chunk_2.paf +# approximate_mappings.paf.chunk_3.paf + +import sys + +# The script that takes the approximate mappings, weighs each mapping by computing its length * (1 - estimated identity), +# then creates N new files where the mapping sets have a similar sum of weights. + +def split_chunks(l, n): + result = [[] for i in range(n)] + sums = [0] * n + i = 0 + for e in l: + result[i].append(e) + sums[i] += e[1] + i = sums.index(min(sums)) + return result + + +if __name__ == '__main__': + path_approx_mappings = sys.argv[1] + num_of_chunks = int(sys.argv[2]) + + rank_to_mapping_dict = {} + mapping_list = [] + + with open(path_approx_mappings) as f: + for rank, line in enumerate(f): + # We could avoid keeping everything in memory by reading the file again later + rank_to_mapping_dict[rank] = line + + _, _, query_start, query_end, _, _, _, target_start, target_end, _, _, _, estimated_identity = line.strip().split('\t') + + num_mapped_bases = max(int(query_end) - int(query_start), int(target_end) - int(target_start)) + estimated_identity = float(estimated_identity.split('id:f:')[1]) / 100.0 + + # High divergence makes alignment more difficult + weight = num_mapped_bases * (1 - estimated_identity) + + mapping_list.append((rank, weight)) + + # Chunk the tuples by looking at their weigths + chunk_list = split_chunks(mapping_list, num_of_chunks) + + # Collect the ranks from the tuples to generate balanced chunks + for num_chunk, element_list in enumerate(chunk_list): + with open(path_approx_mappings + f'.chunk_{num_chunk}.paf', 'w') as fw: + for rank, _ in element_list: + fw.write(rank_to_mapping_dict[rank]) diff --git a/main.nf b/main.nf index 2e2f1cdd..c4fc67ba 100644 --- a/main.nf +++ b/main.nf @@ -47,6 +47,68 @@ if (!params.smoothxg_num_haps) { n_haps = params.wfmash_n_mappings } +process wfmashMap { + publishDir "${params.outdir}/wfmash_map", mode: "${params.publish_dir_mode}" + + input: + tuple val(f), path(fasta) + + output: + tuple val(f), path("${f}.${wfmash_prefix}.map.paf") + + """ + wfmash ${wfmash_exclude_cmd} \ + -s ${params.wfmash_segment_length} \ + -l ${params.wfmash_block_length} \ + ${wfmash_merge_cmd} \ + ${wfmash_split_cmd} \ + -p ${params.wfmash_map_pct_id} \ + -n ${params.wfmash_n_mappings} \ + -k ${params.wfmash_mash_kmer} \ + -t ${task.cpus} \ + -m \ + $fasta $fasta \ + >${f}.${wfmash_prefix}.map.paf + """ +} + +process splitApproxMappingsInChunks { + publishDir "${params.outdir}/wfmash_chunks", mode: "${params.publish_dir_mode}" + + input: + tuple val(f), path(paf) + output: + path("${f}*.chunk_*.paf") + """ + python3 /split_approx_mappings_in_chunks.py $paf ${params.wfmash_chunks} + """ +} + +process wfmashAlign { + publishDir "${params.outdir}/wfmash_align", mode: "${params.publish_dir_mode}" + + input: + tuple val(f), path(fasta), path(paf) + + output: + path("${paf}.align.paf") + + """ + wfmash ${wfmash_exclude_cmd} \ + -s ${params.wfmash_segment_length} \ + -l ${params.wfmash_block_length} \ + ${wfmash_merge_cmd} \ + ${wfmash_split_cmd} \ + -p ${params.wfmash_map_pct_id} \ + -n ${params.wfmash_n_mappings} \ + -k ${params.wfmash_mash_kmer} \ + -t ${task.cpus} \ + -i $paf \ + $fasta $fasta \ + >${paf}.align.paf + """ +} + process wfmash { publishDir "${params.outdir}/wfmash", mode: "${params.publish_dir_mode}" @@ -76,17 +138,23 @@ process seqwish { input: tuple val(f), path(fasta) - path(wfmash) + path(pafs) output: tuple val(f), path("${f}${seqwish_prefix}.gfa") script: """ + if [[ \$(ls *.paf | wc -l) == 1 ]]; then + input=$pafs + else + input=\$(ls *.paf | tr '\\\n' ',') + input=\${input::-1} + fi seqwish \ -t ${task.cpus} \ -s $fasta \ - -p $wfmash \ + -p \$input \ -k ${params.seqwish_min_match_length} \ -g ${f}${seqwish_prefix}.gfa -P \ -B ${params.seqwish_transclose_batch} \ @@ -319,34 +387,52 @@ process multiQC { workflow { main: - wfmash(fasta) - seqwish(fasta, wfmash.out.collect{it[1]}) - smoothxg(seqwish.out) - gfaffix(smoothxg.out.gfa_smooth) - - odgiBuild(seqwish.out.collect{it[1]}.mix(smoothxg.out.consensus_smooth.flatten(), gfaffix.out.gfa_norm)) - odgiStats(odgiBuild.out) - - odgiVizOut = Channel.empty() - if (do_1d) { - odgiVizOut = odgiViz(odgiBuild.out.filter( ~/.*smoothxg.*/ )) - } - odgiDrawOut = Channel.empty() - if (do_2d) { - odgiLayout(odgiBuild.out.filter( ~/.*smoothxg.*/ )) - odgiDrawOut = odgiDraw(odgiLayout.out) - } - - if (params.vcf_spec != false) { - vg_deconstruct(gfaffix.out.gfa_norm) + if (params.wfmash_only) { + // TODO Once we changed the way we changed the publish_dir_mode, we have to emit the .paf file as default, else not + if (params.wfmash_chunks == 1) { + wfmash(fasta) + } else { + wfmashMap(fasta) + splitApproxMappingsInChunks(wfmashMap.out) + wfmashAlign(fasta.combine(splitApproxMappingsInChunks.out.flatten())) + } + } else { + if (params.wfmash_chunks == 1) { + wfmash(fasta) + seqwish(fasta, wfmash.out.collect{it[1]}) + } else { + wfmashMap(fasta) + splitApproxMappingsInChunks(wfmashMap.out) + wfmashAlign(fasta.combine(splitApproxMappingsInChunks.out.flatten())) + seqwish(fasta, wfmashAlign.out.collect()) + } + smoothxg(seqwish.out) + gfaffix(smoothxg.out.gfa_smooth) + + odgiBuild(seqwish.out.collect{it[1]}.mix(smoothxg.out.consensus_smooth.flatten(), gfaffix.out.gfa_norm)) + odgiStats(odgiBuild.out) + + odgiVizOut = Channel.empty() + if (do_1d) { + odgiVizOut = odgiViz(odgiBuild.out.filter( ~/.*smoothxg.*/ )) + } + odgiDrawOut = Channel.empty() + if (do_2d) { + odgiLayout(odgiBuild.out.filter( ~/.*smoothxg.*/ )) + odgiDrawOut = odgiDraw(odgiLayout.out) + } + + if (params.vcf_spec != false) { + vg_deconstruct(gfaffix.out.gfa_norm) + } + + multiQC( + odgiStats.out.collect().ifEmpty([]), + odgiVizOut.collect().ifEmpty([]), + odgiDrawOut.collect().ifEmpty([]), + ch_multiqc_config + ) } - - multiQC( - odgiStats.out.collect().ifEmpty([]), - odgiVizOut.collect().ifEmpty([]), - odgiDrawOut.collect().ifEmpty([]), - ch_multiqc_config - ) } // /* @@ -388,6 +474,9 @@ def helpMessage() { --wfmash_no_splits disable splitting of input sequences during mapping [default: OFF] --wfmash_exclude--delim [c] skip mappings between sequences with the same name prefix before the given delimiter character [default: all-vs-all and !self] + --wfmash_chunks The number of files to generate from the approximate wfmash mappings to scale across a whole cluster. It is recommended to set this to the number of available nodes. If only one machine is available, leave it at 1. [default: 1] + --wfmash_only If this parameter is set, only the wfmash alignment step of the pipeline is executed. This option is offered for users who want to use wfmash on a cluster. [default: OFF] + Seqwish options: --seqwish_min_match_length [n] ignore exact matches below this length [default: 47] --seqwish_transclose_batch [n] number of bp to use for transitive closure batch [default: 10000000] diff --git a/nextflow.config b/nextflow.config index d5d9bbd0..5ad6b56f 100644 --- a/nextflow.config +++ b/nextflow.config @@ -31,6 +31,8 @@ params { wfmash_merge_segments = false wfmash_no_splits = false wfmash_exclude_delim = false + wfmash_chunks = 1 + wfmash_only = false // Seqwish options seqwish_min_match_length = 47 diff --git a/nextflow_schema.json b/nextflow_schema.json index 0708edfd..84f105bc 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -84,6 +84,15 @@ "type": "string", "description": "skip mappings between sequences with the same name prefix before the given delimiter character", "fa_icon": "fas fa-align-center" + }, + "wfmash_chunks": { + "type": "integer", + "default": 1, + "description": "The number of files to generate from the approximate wfmash mappings to scale across a whole cluster. It is recommended to set this to the number of available nodes. If only one machine is available, leave it at 1." + }, + "wfmash_only": { + "type": "boolean", + "description": "If this parameter is set, only the wfmash alignment step of the pipeline is executed. This option is offered for users who want to use wfmash on a cluster." } } }, @@ -146,7 +155,7 @@ }, "smoothxg_block_ratio_min": { "type": "number", - "default": 0.0, + "default": 0, "description": "minimum small / large length ratio to cluster in a block" }, "smoothxg_block_id_min": { From f42f2131ef8c50fecce91ecfecde59ddb8b812d6 Mon Sep 17 00:00:00 2001 From: subwaystation Date: Tue, 30 Nov 2021 14:28:48 +0100 Subject: [PATCH 2/2] add missing reference ODGI --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 9f2fc3fc..52cc788f 100644 --- a/README.md +++ b/README.md @@ -98,4 +98,10 @@ You can cite the `nf-core` publication as follows: In addition, references of tools and data used in this pipeline are as follows: +> **ODGI: understanding pangenome graphs.** +> +> Andrea Guarracino, Simon Heumos, Sven Nahnsen, Pjotr Prins & Erik Garrison. +> +> _bioRxiv_ 2021 Nov 11 doi: [10.1101/2021.11.10.467921](https://doi.org/10.1101/2021.11.10.467921). +