Skip to content

Commit

Permalink
Merge pull request #179 from rhysnewell/ISS-174
Browse files Browse the repository at this point in the history
threading issues and rosella version
  • Loading branch information
rhysnewell authored Dec 6, 2023
2 parents a341fe4 + c1fe024 commit 16e1c3e
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 62 deletions.
14 changes: 5 additions & 9 deletions aviary/aviary.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,14 @@ def main():

base_group.add_argument(
'-t', '--max-threads', '--max_threads',
help='Maximum number of threads given to any particular process',
help='Maximum number of threads given to any particular process. If max_threads > n_cores then n_cores will be bumped up to max_threads. Useful if you want more fine grain control over the number of threads used by each process.',
dest='max_threads',
default=8,
)

base_group.add_argument(
'-p', '--pplacer-threads', '--pplacer_threads',
help=argparse.SUPPRESS,
dest='pplacer_threads',
default=8,
)

base_group.add_argument(
'-n', '--n-cores', '--n_cores',
help='Maximum number of cores available for use. Must be >= to max_threads',
help='Maximum number of cores available for use. Setting to multiples of max_threads will allow for multiple processes to be run in parallel.',
dest='n_cores',
default=16,
)
Expand Down Expand Up @@ -1261,6 +1254,9 @@ def main():

# else:
args = manage_env_vars(args)
if int(args.max_threads) > int(args.n_cores):
args.n_cores = args.max_threads

prefix = args.output
if not os.path.exists(prefix):
os.makedirs(prefix)
Expand Down
6 changes: 3 additions & 3 deletions aviary/modules/annotation/annotation.smk
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ rule checkm2:
mag_extension = config['mag_extension'],
checkm2_db_path = config["checkm2_db_folder"]
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 8*60*attempt,
Expand Down Expand Up @@ -175,7 +175,7 @@ rule eggnog:
output:
done = 'data/eggnog/done'
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand Down Expand Up @@ -206,7 +206,7 @@ rule gtdbtk:
conda:
"../../envs/gtdbtk.yaml"
threads:
min(config["max_threads"], 32)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 256*1024*attempt),
runtime = lambda wildcards, attempt: 12*60*attempt,
Expand Down
24 changes: 12 additions & 12 deletions aviary/modules/assembly/assembly.smk
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ rule flye_assembly:
params:
long_read_type = config["long_read_type"]
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt,
Expand Down Expand Up @@ -84,7 +84,7 @@ rule polish_metagenome_flye:
illumina = False,
coassemble = config["coassemble"]
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand All @@ -111,7 +111,7 @@ rule generate_pilon_sort:
bam = temp("data/pilon.sort.bam"),
bai = temp("data/pilon.sort.bam.bai")
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand Down Expand Up @@ -160,7 +160,7 @@ rule polish_meta_racon_ill:
fasta = "data/assembly.pol.fin.fasta",
paf = temp("data/polishing/alignment.racon_ill.0.paf")
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand Down Expand Up @@ -302,7 +302,7 @@ rule filter_illumina_assembly:
conda:
"../../envs/minimap2.yaml"
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand Down Expand Up @@ -352,7 +352,7 @@ rule spades_assembly:
fasta = "data/spades_assembly.fasta",
spades_folder = temp(directory("data/spades_assembly/"))
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 256*1024*attempt),
runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt,
Expand Down Expand Up @@ -414,7 +414,7 @@ rule assemble_short_reads:
tmpdir = config["tmpdir"],
final_assembly = True
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt,
Expand Down Expand Up @@ -451,7 +451,7 @@ rule spades_assembly_coverage:
bam = temp("data/short_vs_mega.bam"),
bai = temp("data/short_vs_mega.bam.bai")
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand All @@ -478,7 +478,7 @@ rule metabat_binning_short:
conda:
"../binning/envs/metabat2.yaml"
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand All @@ -503,7 +503,7 @@ rule map_long_mega:
bam = temp("data/long_vs_mega.bam"),
bai = temp("data/long_vs_mega.bam.bai")
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand Down Expand Up @@ -546,7 +546,7 @@ rule get_read_pools:
conda:
"envs/mfqe.yaml"
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 12*60*attempt,
Expand All @@ -565,7 +565,7 @@ rule assemble_pools:
fasta = "data/spades_assembly.fasta",
metabat_done = "data/metabat_bins/done"
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt,
Expand Down
50 changes: 25 additions & 25 deletions aviary/modules/binning/binning.smk
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ rule prepare_binning_files:
conda:
"../../envs/coverm.yaml"
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt,
Expand Down Expand Up @@ -89,7 +89,7 @@ rule maxbin2:
params:
min_contig_size = config["min_contig_size"]
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt,
Expand All @@ -116,7 +116,7 @@ rule concoct:
params:
min_contig_size = config["min_contig_size"]
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 72*60 + 24*60*attempt,
Expand Down Expand Up @@ -175,7 +175,7 @@ rule vamb:
min_contig_size = config["min_contig_size"],
vamb_threads = min(int(config["max_threads"]), 16) // 2 # vamb use double the threads you give it
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand Down Expand Up @@ -215,7 +215,7 @@ rule metabat2:
conda:
"envs/metabat2.yaml"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 12*60*attempt,
Expand Down Expand Up @@ -246,7 +246,7 @@ rule metabat_spec:
benchmark:
"benchmarks/metabat_spec.benchmark.txt"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand All @@ -272,7 +272,7 @@ rule metabat_sspec:
benchmark:
"benchmarks/metabat_sspec.benchmark.txt"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand All @@ -298,7 +298,7 @@ rule metabat_sens:
benchmark:
"benchmarks/metabat_sens.benchmark.txt"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand All @@ -324,7 +324,7 @@ rule metabat_ssens:
benchmark:
"benchmarks/metabat_ssens.benchmark.txt"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand All @@ -350,7 +350,7 @@ rule rosella:
conda:
"envs/rosella.yaml"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 24*60*attempt,
Expand All @@ -375,7 +375,7 @@ rule semibin:
output:
done = "data/semibin_bins/done"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 24*60 + 48*60*(attempt-1),
Expand Down Expand Up @@ -408,7 +408,7 @@ rule checkm_rosella:
conda:
"../../envs/checkm2.yaml"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 8*60*attempt,
Expand All @@ -433,7 +433,7 @@ rule checkm_metabat2:
conda:
"../../envs/checkm2.yaml"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 8*60*attempt,
Expand All @@ -458,7 +458,7 @@ rule checkm_semibin:
conda:
"../../envs/checkm2.yaml"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 8*60*attempt,
Expand Down Expand Up @@ -491,7 +491,7 @@ rule refine_rosella:
final_refining = False,
bin_prefix = "rosella"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 48*60 + 24*60*attempt,
Expand All @@ -512,7 +512,7 @@ rule refine_metabat2:
output:
'data/metabat2_refined/done'
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 48*60 + 24*60*attempt,
Expand Down Expand Up @@ -544,7 +544,7 @@ rule refine_semibin:
fasta = ancient(config["fasta"]),
# kmers = "data/rosella_bins/kmer_frequencies.tsv"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 48*60 + 24*60*attempt,
Expand Down Expand Up @@ -624,7 +624,7 @@ rule das_tool:
semibin_done = [] if "semibin" in config["skip_binners"] else "data/semibin_refined/done",
vamb_done = [] if "vamb" in config["skip_binners"] else "data/vamb_bins/done",
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 12*60*attempt,
Expand Down Expand Up @@ -664,7 +664,7 @@ rule refine_dastool:
fasta = ancient(config["fasta"]),
# kmers = "data/rosella_bins/kmer_frequencies.tsv"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 48*60 + 24*60*attempt,
Expand Down Expand Up @@ -695,7 +695,7 @@ rule get_abundances:
input:
"bins/checkm.out"
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt,
Expand Down Expand Up @@ -732,7 +732,7 @@ rule checkm_das_tool:
conda:
"../../envs/checkm.yaml"
threads:
min(config["max_threads"], 16)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 128*1024*attempt),
runtime = lambda wildcards, attempt: 8*60*attempt,
Expand All @@ -749,7 +749,7 @@ rule checkm_das_tool:
rule singlem_pipe_reads:
output:
"data/singlem_out/metagenome.combined_otu_table.csv"
threads: 1
threads: min(config["max_threads"], 48)
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 8*1024*attempt),
runtime = lambda wildcards, attempt: 12*60*attempt,
Expand All @@ -770,7 +770,7 @@ rule singlem_appraise:
params:
pplacer_threads = config['pplacer_threads'],
fasta = config['fasta']
threads: 1
threads: min(config["max_threads"], 48)
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 8*1024*attempt),
runtime = lambda wildcards, attempt: 12*60*attempt,
Expand Down Expand Up @@ -844,7 +844,7 @@ rule dereplicate_and_get_abundances_paired:
final_bins = 'bins/final_bins',
derep_ani = 0.97
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt,
Expand All @@ -865,7 +865,7 @@ rule dereplicate_and_get_abundances_interleaved:
final_bins = 'bins/final_bins',
derep_ani = 0.97
threads:
min(config["max_threads"], 64)
config["max_threads"]
resources:
mem_mb = lambda wildcards, attempt: min(int(config["max_memory"])*1024, 512*1024*attempt),
runtime = lambda wildcards, attempt: 24*60 + 24*60*attempt,
Expand Down
Loading

0 comments on commit 16e1c3e

Please sign in to comment.