diff --git a/aviary/aviary.py b/aviary/aviary.py index c1c62eff..6d8a4f99 100644 --- a/aviary/aviary.py +++ b/aviary/aviary.py @@ -138,21 +138,14 @@ def main(): base_group.add_argument( '-t', '--max-threads', '--max_threads', - help='Maximum number of threads given to any particular process', + help='Maximum number of threads given to any particular process. If max_threads > n_cores then n_cores will be bumped up to max_threads. Useful if you want more fine grain control over the number of threads used by each process.', dest='max_threads', default=8, ) - base_group.add_argument( - '-p', '--pplacer-threads', '--pplacer_threads', - help=argparse.SUPPRESS, - dest='pplacer_threads', - default=8, - ) - base_group.add_argument( '-n', '--n-cores', '--n_cores', - help='Maximum number of cores available for use. Must be >= to max_threads', + help='Maximum number of cores available for use. Setting to multiples of max_threads will allow for multiple processes to be run in parallel.', dest='n_cores', default=16, ) @@ -1261,6 +1254,9 @@ def main(): # else: args = manage_env_vars(args) + if int(args.max_threads) > int(args.n_cores): + args.n_cores = args.max_threads + prefix = args.output if not os.path.exists(prefix): os.makedirs(prefix) diff --git a/aviary/modules/annotation/annotation.smk b/aviary/modules/annotation/annotation.smk index 0498c928..6b2ca96a 100644 --- a/aviary/modules/annotation/annotation.smk +++ b/aviary/modules/annotation/annotation.smk @@ -147,7 +147,7 @@ rule checkm2: mag_extension = config['mag_extension'], checkm2_db_path = config["checkm2_db_folder"] threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 8*60*attempt, @@ -175,7 +175,7 @@ rule eggnog: output: done = 'data/eggnog/done' threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -206,7 +206,7 @@ rule gtdbtk: conda: "../../envs/gtdbtk.yaml" threads: - min(config["max_threads"], 32) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 256*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, diff --git a/aviary/modules/assembly/assembly.smk b/aviary/modules/assembly/assembly.smk index db92da54..00b1f258 100644 --- a/aviary/modules/assembly/assembly.smk +++ b/aviary/modules/assembly/assembly.smk @@ -56,7 +56,7 @@ rule flye_assembly: params: long_read_type = config["long_read_type"] threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt, @@ -84,7 +84,7 @@ rule polish_metagenome_flye: illumina = False, coassemble = config["coassemble"] threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -111,7 +111,7 @@ rule generate_pilon_sort: bam = temp("data/pilon.sort.bam"), bai = temp("data/pilon.sort.bam.bai") threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -160,7 +160,7 @@ rule polish_meta_racon_ill: fasta = "data/assembly.pol.fin.fasta", paf = temp("data/polishing/alignment.racon_ill.0.paf") threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -302,7 +302,7 @@ rule filter_illumina_assembly: conda: "../../envs/minimap2.yaml" threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -352,7 +352,7 @@ rule spades_assembly: fasta = "data/spades_assembly.fasta", spades_folder = temp(directory("data/spades_assembly/")) threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 256*1024*attempt), runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt, @@ -414,7 +414,7 @@ rule assemble_short_reads: tmpdir = config["tmpdir"], final_assembly = True threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt, @@ -451,7 +451,7 @@ rule spades_assembly_coverage: bam = temp("data/short_vs_mega.bam"), bai = temp("data/short_vs_mega.bam.bai") threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -478,7 +478,7 @@ rule metabat_binning_short: conda: "../binning/envs/metabat2.yaml" threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -503,7 +503,7 @@ rule map_long_mega: bam = temp("data/long_vs_mega.bam"), bai = temp("data/long_vs_mega.bam.bai") threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -546,7 +546,7 @@ rule get_read_pools: conda: "envs/mfqe.yaml" threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -565,7 +565,7 @@ rule assemble_pools: fasta = "data/spades_assembly.fasta", metabat_done = "data/metabat_bins/done" threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt, diff --git a/aviary/modules/binning/binning.smk b/aviary/modules/binning/binning.smk index b1048524..9bbdfe1d 100644 --- a/aviary/modules/binning/binning.smk +++ b/aviary/modules/binning/binning.smk @@ -58,7 +58,7 @@ rule prepare_binning_files: conda: "../../envs/coverm.yaml" threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt, @@ -89,7 +89,7 @@ rule maxbin2: params: min_contig_size = config["min_contig_size"] threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt, @@ -116,7 +116,7 @@ rule concoct: params: min_contig_size = config["min_contig_size"] threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt, @@ -175,7 +175,7 @@ rule vamb: min_contig_size = config["min_contig_size"], vamb_threads = min(int(config["max_threads"]), 16) // 2 # vamb use double the threads you give it threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -215,7 +215,7 @@ rule metabat2: conda: "envs/metabat2.yaml" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -246,7 +246,7 @@ rule metabat_spec: benchmark: "benchmarks/metabat_spec.benchmark.txt" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -272,7 +272,7 @@ rule metabat_sspec: benchmark: "benchmarks/metabat_sspec.benchmark.txt" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -298,7 +298,7 @@ rule metabat_sens: benchmark: "benchmarks/metabat_sens.benchmark.txt" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -324,7 +324,7 @@ rule metabat_ssens: benchmark: "benchmarks/metabat_ssens.benchmark.txt" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -350,7 +350,7 @@ rule rosella: conda: "envs/rosella.yaml" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 24*60*attempt, @@ -375,7 +375,7 @@ rule semibin: output: done = "data/semibin_bins/done" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 24*60 + 48*60*(attempt-1), @@ -408,7 +408,7 @@ rule checkm_rosella: conda: "../../envs/checkm2.yaml" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 8*60*attempt, @@ -433,7 +433,7 @@ rule checkm_metabat2: conda: "../../envs/checkm2.yaml" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 8*60*attempt, @@ -458,7 +458,7 @@ rule checkm_semibin: conda: "../../envs/checkm2.yaml" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 8*60*attempt, @@ -491,7 +491,7 @@ rule refine_rosella: final_refining = False, bin_prefix = "rosella" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 48*60 + 24*60*attempt, @@ -512,7 +512,7 @@ rule refine_metabat2: output: 'data/metabat2_refined/done' threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 48*60 + 24*60*attempt, @@ -544,7 +544,7 @@ rule refine_semibin: fasta = ancient(config["fasta"]), # kmers = "data/rosella_bins/kmer_frequencies.tsv" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 48*60 + 24*60*attempt, @@ -624,7 +624,7 @@ rule das_tool: semibin_done = [] if "semibin" in config["skip_binners"] else "data/semibin_refined/done", vamb_done = [] if "vamb" in config["skip_binners"] else "data/vamb_bins/done", threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -664,7 +664,7 @@ rule refine_dastool: fasta = ancient(config["fasta"]), # kmers = "data/rosella_bins/kmer_frequencies.tsv" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 48*60 + 24*60*attempt, @@ -695,7 +695,7 @@ rule get_abundances: input: "bins/checkm.out" threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt, @@ -732,7 +732,7 @@ rule checkm_das_tool: conda: "../../envs/checkm.yaml" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 8*60*attempt, @@ -749,7 +749,7 @@ rule checkm_das_tool: rule singlem_pipe_reads: output: "data/singlem_out/metagenome.combined_otu_table.csv" - threads: 1 + threads: min(config["max_threads"], 48) resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 8*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -770,7 +770,7 @@ rule singlem_appraise: params: pplacer_threads = config['pplacer_threads'], fasta = config['fasta'] - threads: 1 + threads: min(config["max_threads"], 48) resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 8*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -844,7 +844,7 @@ rule dereplicate_and_get_abundances_paired: final_bins = 'bins/final_bins', derep_ani = 0.97 threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt, @@ -865,7 +865,7 @@ rule dereplicate_and_get_abundances_interleaved: final_bins = 'bins/final_bins', derep_ani = 0.97 threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt, diff --git a/aviary/modules/binning/envs/rosella.yaml b/aviary/modules/binning/envs/rosella.yaml index cdf362be..ee398172 100644 --- a/aviary/modules/binning/envs/rosella.yaml +++ b/aviary/modules/binning/envs/rosella.yaml @@ -7,7 +7,7 @@ dependencies: - python >= 3.8, <= 3.10 - gcc - cxx-compiler - - rosella >= 0.5.1 + - rosella >= 0.5.2 - numba >= 0.53, <= 0.57 - numpy <= 1.24 - joblib >= 1.1.0, <= 1.3 diff --git a/aviary/modules/quality_control/qc.smk b/aviary/modules/quality_control/qc.smk index d7c6de69..4fe15324 100755 --- a/aviary/modules/quality_control/qc.smk +++ b/aviary/modules/quality_control/qc.smk @@ -28,7 +28,7 @@ rule qc_short_reads: conda: "../../envs/minimap2.yaml" threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 8*60*attempt, @@ -53,7 +53,7 @@ rule qc_long_reads: reference_filter = [] if config["reference_filter"] == "none" else config["reference_filter"], skip_qc = config["skip_qc"] threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -77,7 +77,7 @@ rule fastqc: benchmark: "benchmarks/fastqc_short.benchmark.txt" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -96,7 +96,7 @@ rule fastqc_long: benchmark: "benchmarks/fastqc_long.benchmark.txt" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -116,7 +116,7 @@ rule nanoplot: benchmark: "benchmarks/nanoplot.benchmark.txt" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -136,7 +136,7 @@ rule metaquast: output: "www/metaquast/report.html" threads: - min(config["max_threads"], 16) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt), runtime = lambda wildcards, attempt: 12*60*attempt, @@ -158,7 +158,7 @@ rule read_fraction_recovered: conda: "../../envs/coverm.yaml" threads: - min(config["max_threads"], 64) + config["max_threads"] resources: mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt), runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt, diff --git a/aviary/scripts/singlem_reads.py b/aviary/scripts/singlem_reads.py index b1736180..1321f713 100644 --- a/aviary/scripts/singlem_reads.py +++ b/aviary/scripts/singlem_reads.py @@ -7,7 +7,7 @@ def run_singlem( long_reads, short_reads_1, short_reads_2, - pplacer_threads: int, + threads: int, log: str, ): try: @@ -18,17 +18,17 @@ def run_singlem( singlem_output_list = [] if long_reads != "none": - singlem_pipe_cmd = f"singlem pipe --threads {pplacer_threads} --sequences {' '.join(long_reads)} --otu-table data/singlem_out/metagenome.longread_otu_table.csv".split() + singlem_pipe_cmd = f"singlem pipe --threads {threads} --sequences {' '.join(long_reads)} --otu-table data/singlem_out/metagenome.longread_otu_table.csv".split() with open(log, "a") as logf: run(singlem_pipe_cmd, stdout=logf, stderr=STDOUT) if short_reads_2 != "none": - singlem_pipe_cmd = f"singlem pipe --threads {pplacer_threads} --forward {' '.join(short_reads_1)} --reverse {' '.join(short_reads_2)} --otu-table data/singlem_out/metagenome.shortread_otu_table.csv".split() + singlem_pipe_cmd = f"singlem pipe --threads {threads} --forward {' '.join(short_reads_1)} --reverse {' '.join(short_reads_2)} --otu-table data/singlem_out/metagenome.shortread_otu_table.csv".split() with open(log, "a") as logf: run(singlem_pipe_cmd, stdout=logf, stderr=STDOUT) elif short_reads_1 != "none": - singlem_pipe_cmd = f"singlem pipe --threads {pplacer_threads} --sequences {' '.join(short_reads_1)} --otu-table data/singlem_out/metagenome.shortread_otu_table.csv".split() + singlem_pipe_cmd = f"singlem pipe --threads {threads} --sequences {' '.join(short_reads_1)} --otu-table data/singlem_out/metagenome.shortread_otu_table.csv".split() with open(log, "a") as logf: run(singlem_pipe_cmd, stdout=logf, stderr=STDOUT) @@ -57,7 +57,7 @@ def run_singlem( long_reads = snakemake.config['long_reads'] short_reads_1 = snakemake.config['short_reads_1'] short_reads_2 = snakemake.config['short_reads_2'] - pplacer_threads = snakemake.config["pplacer_threads"] + pplacer_threads = snakemake.threads log = snakemake.log[0] with open(log, "w") as logf: pass