From d0b94bfa7bdb1cbaf42d43ab3b120371fd9d1e9b Mon Sep 17 00:00:00 2001 From: Panciera Date: Wed, 23 Mar 2016 13:47:12 -0400 Subject: [PATCH 01/31] ngs_filter now uses generators --- ngs_mapper/nfilter.py | 47 ++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index aaddab15..431bf996 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -18,7 +18,7 @@ from Bio import SeqIO from docopt import docopt from schema import Schema, Use, Optional, Or -from itertools import ifilterfalse, izip, chain +from itertools import ifilterfalse, izip, chain, izip_longest import re import os import sys @@ -119,7 +119,6 @@ def is_valid(fn): def idx_filter(read, idxread, thresh): ''' AT or ABOVE threshold.''' return min(idxread._per_letter_annotations['phred_quality']) >= thresh - formats={ 'sff' : 'sff', 'fq' : 'fastq', 'fastq' : 'fastq', 'fa' : 'fasta', 'fasta' : 'fasta' } extension = lambda s: s.split('.')[-1] compose = lambda f, g: lambda x: f(g(x)) @@ -136,25 +135,41 @@ def make_filtered(readpath, idxQualMin, dropNs): format = getformat(readpath) # right now this silently skips fq_open = partial(SeqIO.parse, format=format) - reads = [] + def filterRead((total, badIndex, hadNCount, keptReads), (read, idxRead)): + if idxRead: + indexIsBad = min(idxRead._per_letter_annotations['phred_quality']) < idxQualMin + badIndex += int(indexIsBad) + hasN = False + if dropNs: + hasN = 'N' in str(read.seq).upper() + hadNCount += int(hasN) + dropRead = hasN or indexIsBad + total += 1 + if not dropRead: + keptReads = chain([read], keptReads) + return (total, badIndex, hadNCount, keptReads) try: - reads = list(fq_open(readpath)) + indexReads = [] if not index else fq_open(index) + reads = fq_open(readpath) except AssertionError, E: print "skipping biopython assertion error" print readpath #sys.stderr.write(str(E)) - if index and idxQualMin: - idxreads = fq_open(index) - intermediate = [r for r, idx in izip(reads, idxreads) if idx_filter(r, idx, idxQualMin) ] - else: - intermediate = reads - if dropNs: - hasNs = lambda rec: 'N' in str(rec.seq).upper() - filtered = list(ifilterfalse(hasNs, intermediate)) - else: - filtered = intermediate - total, badIndex, hadN = len(reads), len(reads) - len(intermediate), \ - len(intermediate) - len(filtered) + readsWithMaybeIndex = izip_longest(reads, indexReads, fillvalue=None) + total, badIndex, hadN, filtered = reduce(filterRead, readsWithMaybeIndex, (0, 0, 0, [])) + return filtered, total, badIndex, hadN +# if index and idxQualMin: +# idxreads = fq_open(index) +# intermediate = [r for r, idx in izip(reads, idxreads) if idx_filter(r, idx, idxQualMin) ] +# else: +# intermediate = reads +# if dropNs: +# hasNs = lambda rec: 'N' in str(rec.seq).upper() +# filtered = list(ifilterfalse(hasNs, intermediate)) +# else: +# filtered = intermediate +# total, badIndex, hadN = len(reads), len(reads) - len(intermediate), \ +# len(intermediate) - len(filtered) return filtered, total, badIndex, hadN def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): From 92dc4c7cdf31e452711a3304c01213f2d4d80b23 Mon Sep 17 00:00:00 2001 From: Panciera Date: Wed, 23 Mar 2016 13:48:13 -0400 Subject: [PATCH 02/31] remove tuple argument --- ngs_mapper/nfilter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index 431bf996..afc8d7cc 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -135,7 +135,8 @@ def make_filtered(readpath, idxQualMin, dropNs): format = getformat(readpath) # right now this silently skips fq_open = partial(SeqIO.parse, format=format) - def filterRead((total, badIndex, hadNCount, keptReads), (read, idxRead)): + def filterRead(acc, elem): + (total, badIndex, hadNCount, keptReads), (read, idxRead) = (acc, elem) if idxRead: indexIsBad = min(idxRead._per_letter_annotations['phred_quality']) < idxQualMin badIndex += int(indexIsBad) From a1797d505c29da8a3e35e5ee8b5ac53978b2b4d4 Mon Sep 17 00:00:00 2001 From: Panciera Date: Wed, 23 Mar 2016 14:13:14 -0400 Subject: [PATCH 03/31] fix missing variable definition --- ngs_mapper/nfilter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index afc8d7cc..048af4ec 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -137,6 +137,7 @@ def make_filtered(readpath, idxQualMin, dropNs): fq_open = partial(SeqIO.parse, format=format) def filterRead(acc, elem): (total, badIndex, hadNCount, keptReads), (read, idxRead) = (acc, elem) + indexIsBad = False if idxRead: indexIsBad = min(idxRead._per_letter_annotations['phred_quality']) < idxQualMin badIndex += int(indexIsBad) From 36cc00d21ac826bf82b4760cce740e8218b2579e Mon Sep 17 00:00:00 2001 From: Mike Panciera Date: Thu, 24 Mar 2016 15:18:37 -0400 Subject: [PATCH 04/31] deleting some comments --- ngs_mapper/nfilter.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index 048af4ec..cbe4a971 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -98,8 +98,6 @@ def map_to_dir(readsdir, func, platforms, parallel=False): #947__1__TI86__2012_08_06__Unk.sff #947__2__TI86__2012_08_06__Unk.sff -# def is_valid(fn): -# return is_fastq(fn) and not is_index(fn) def is_valid(fn): return not is_index(fn) and is_fastq(fn) files = filter(is_valid, plat_files) @@ -160,19 +158,6 @@ def filterRead(acc, elem): readsWithMaybeIndex = izip_longest(reads, indexReads, fillvalue=None) total, badIndex, hadN, filtered = reduce(filterRead, readsWithMaybeIndex, (0, 0, 0, [])) return filtered, total, badIndex, hadN -# if index and idxQualMin: -# idxreads = fq_open(index) -# intermediate = [r for r, idx in izip(reads, idxreads) if idx_filter(r, idx, idxQualMin) ] -# else: -# intermediate = reads -# if dropNs: -# hasNs = lambda rec: 'N' in str(rec.seq).upper() -# filtered = list(ifilterfalse(hasNs, intermediate)) -# else: -# filtered = intermediate -# total, badIndex, hadN = len(reads), len(reads) - len(intermediate), \ -# len(intermediate) - len(filtered) - return filtered, total, badIndex, hadN def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): '''write the results to the new directory. From bda87ee3c9e22793ef6563bcfbeb89a36f41cf2c Mon Sep 17 00:00:00 2001 From: Panciera Date: Fri, 25 Mar 2016 15:26:11 -0400 Subject: [PATCH 05/31] moved from reduce to cytoolz.accumulate --- ngs_mapper/nfilter.py | 31 ++++++++++++++++++------------- requirements.txt | 1 + 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index 048af4ec..8cf4827f 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -25,7 +25,7 @@ import warnings from data import reads_by_plat from ngs_mapper.config import load_config - +from cytoolz.itertoolz import accumulate ALLPLATFORMS = 'Roche454','IonTorrent','MiSeq', 'Sanger' #TODO: should guarantee that there is an index file or allow there not to be one STATSFILE_NAME='ngs_filter_stats.txt' @@ -136,7 +136,7 @@ def make_filtered(readpath, idxQualMin, dropNs): # right now this silently skips fq_open = partial(SeqIO.parse, format=format) def filterRead(acc, elem): - (total, badIndex, hadNCount, keptReads), (read, idxRead) = (acc, elem) + (total, badIndex, hadNCount, _), (read, idxRead) = (acc, elem) indexIsBad = False if idxRead: indexIsBad = min(idxRead._per_letter_annotations['phred_quality']) < idxQualMin @@ -147,9 +147,9 @@ def filterRead(acc, elem): hadNCount += int(hasN) dropRead = hasN or indexIsBad total += 1 - if not dropRead: - keptReads = chain([read], keptReads) - return (total, badIndex, hadNCount, keptReads) + if dropRead: + read = None + return (total, badIndex, hadNCount, read) try: indexReads = [] if not index else fq_open(index) reads = fq_open(readpath) @@ -158,8 +158,9 @@ def filterRead(acc, elem): print readpath #sys.stderr.write(str(E)) readsWithMaybeIndex = izip_longest(reads, indexReads, fillvalue=None) - total, badIndex, hadN, filtered = reduce(filterRead, readsWithMaybeIndex, (0, 0, 0, [])) - return filtered, total, badIndex, hadN + #total, badIndex, hadN, filtered = accumulate(filterRead, readsWithMaybeIndex, (0, 0, 0, [])) + return accumulate(filterRead, readsWithMaybeIndex, (0, 0, 0, None)) + #return filtered, total, badIndex, hadN # if index and idxQualMin: # idxreads = fq_open(index) # intermediate = [r for r, idx in izip(reads, idxreads) if idx_filter(r, idx, idxQualMin) ] @@ -172,18 +173,19 @@ def filterRead(acc, elem): # filtered = intermediate # total, badIndex, hadN = len(reads), len(reads) - len(intermediate), \ # len(intermediate) - len(filtered) - return filtered, total, badIndex, hadN def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): '''write the results to the new directory. Also writes a stats file to outdir/ngs_filter_stats.txt, with basic information about how many reads were filtered.''' - results, total, badIndex, hadN = make_filtered(readpath, idxQualMin, dropNs) + results = make_filtered(readpath, idxQualMin, dropNs) outpath = name_filtered(readpath, outdir) - msg = '\n'.join( [stat_header.format(total, readpath, badIndex + hadN), - stat_body.format(readpath, badIndex, idxQualMin, hadN) ]) - print msg try: - num_written = SeqIO.write(results, outpath, 'fastq') + num_written = 0 + with open(outpath, 'w') as outfile: + for total, badIndex, hadN, read in results: + if read: + SeqIO.write(read, outfile, 'fastq') + num_written += 1 print "filtered reads from %s will be written to %s" % (readpath, outpath) print "%s reads left after filtering." % num_written if num_written <= 0: @@ -191,6 +193,9 @@ def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): except AssertionError, E: print "skipping biopython assertion error" #sys.stderr.write(str(E)) + msg = '\n'.join( [stat_header.format(total, readpath, badIndex + hadN), + stat_body.format(readpath, badIndex, idxQualMin, hadN) ]) + print msg with open(os.path.join(outdir, STATSFILE_NAME), 'w') as statfile: statfile.write(msg) return outpath diff --git a/requirements.txt b/requirements.txt index 1caa765f..8f75c54e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,3 +19,4 @@ argparse counter schema docopt +cytoolz From cd828bf1240454dbd463eabbcf0abea2485055a7 Mon Sep 17 00:00:00 2001 From: Panciera Date: Fri, 25 Mar 2016 15:46:45 -0400 Subject: [PATCH 06/31] explicit generator instead of `accumulate` --- ngs_mapper/nfilter.py | 36 +++++++++++++++++++----------------- requirements.txt | 1 - 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index 8cf4827f..3b5d3dc5 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -25,7 +25,6 @@ import warnings from data import reads_by_plat from ngs_mapper.config import load_config -from cytoolz.itertoolz import accumulate ALLPLATFORMS = 'Roche454','IonTorrent','MiSeq', 'Sanger' #TODO: should guarantee that there is an index file or allow there not to be one STATSFILE_NAME='ngs_filter_stats.txt' @@ -135,21 +134,23 @@ def make_filtered(readpath, idxQualMin, dropNs): format = getformat(readpath) # right now this silently skips fq_open = partial(SeqIO.parse, format=format) - def filterRead(acc, elem): - (total, badIndex, hadNCount, _), (read, idxRead) = (acc, elem) - indexIsBad = False - if idxRead: - indexIsBad = min(idxRead._per_letter_annotations['phred_quality']) < idxQualMin - badIndex += int(indexIsBad) - hasN = False - if dropNs: - hasN = 'N' in str(read.seq).upper() - hadNCount += int(hasN) - dropRead = hasN or indexIsBad - total += 1 - if dropRead: - read = None - return (total, badIndex, hadNCount, read) + def filterReads(readsWithMaybeIndex): + total = badIndex = hadNCount = 0 + read, idxRead = None, None + for read, idxRead in readsWithMaybeIndex: + indexIsBad = False + if idxRead: + indexIsBad = min(idxRead._per_letter_annotations['phred_quality']) < idxQualMin + badIndex += int(indexIsBad) + hasN = False + if dropNs: + hasN = 'N' in str(read.seq).upper() + hadNCount += int(hasN) + dropRead = hasN or indexIsBad + total += 1 + if dropRead: + read = None + yield (total, badIndex, hadNCount, read) try: indexReads = [] if not index else fq_open(index) reads = fq_open(readpath) @@ -158,8 +159,9 @@ def filterRead(acc, elem): print readpath #sys.stderr.write(str(E)) readsWithMaybeIndex = izip_longest(reads, indexReads, fillvalue=None) + return filterReads(readsWithMaybeIndex) #total, badIndex, hadN, filtered = accumulate(filterRead, readsWithMaybeIndex, (0, 0, 0, [])) - return accumulate(filterRead, readsWithMaybeIndex, (0, 0, 0, None)) + #return accumulate(filterRead, readsWithMaybeIndex, (0, 0, 0, None)) #return filtered, total, badIndex, hadN # if index and idxQualMin: # idxreads = fq_open(index) diff --git a/requirements.txt b/requirements.txt index 8f75c54e..1caa765f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,4 +19,3 @@ argparse counter schema docopt -cytoolz From 719e7a04c294ae24630e5d8a273ac3719283ab60 Mon Sep 17 00:00:00 2001 From: Panciera Date: Mon, 28 Mar 2016 11:19:23 -0400 Subject: [PATCH 07/31] print statements -> logger --- ngs_mapper/nfilter.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index dda2be47..fddc400d 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -25,6 +25,9 @@ import warnings from data import reads_by_plat from ngs_mapper.config import load_config +import log +logger = log.setup_logger(__name__, log.get_config()) + ALLPLATFORMS = 'Roche454','IonTorrent','MiSeq', 'Sanger' #TODO: should guarantee that there is an index file or allow there not to be one STATSFILE_NAME='ngs_filter_stats.txt' @@ -75,13 +78,11 @@ def map_to_dir(readsdir, func, platforms, parallel=False): plat_files_dict = reads_by_plat(readsdir) #_nested_files = map(plat_files_dict.get, platforms) _nested_files = filter(None, map(plat_files_dict.get, platforms)) - print _nested_files if not _nested_files: raise ValueError("No fastq files found in directory %s matching platforms. \ Files %s that were not within chosen platforms %s" % ( readsdir, os.listdir(readsdir), platforms) ) #plat_files = list(chain(*chain(*_nested_files))) plat_files =flatten(_nested_files) - print plat_files is_index = partial(re.search, r'_I[12]_') # also this currently skips ab1 files # The problem seems to be that of the below reads, only uses _R1_. @@ -110,7 +111,7 @@ def is_valid(fn): pool.join() return outpaths else: - print "mapping filters over read files %s in directory %s" % (files, readsdir) + logger.debug("mapping filters over read files %s in directory %s" % (files, readsdir)) return map(func, files) def idx_filter(read, idxread, thresh): @@ -153,9 +154,7 @@ def filterReads(readsWithMaybeIndex): indexReads = [] if not index else fq_open(index) reads = fq_open(readpath) except AssertionError, E: - print "skipping biopython assertion error" - print readpath - #sys.stderr.write(str(E)) + logger.debug("skipping biopython assertion error in file %s " % readpath) readsWithMaybeIndex = izip_longest(reads, indexReads, fillvalue=None) return filterReads(readsWithMaybeIndex) @@ -171,16 +170,16 @@ def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): if read: SeqIO.write(read, outfile, 'fastq') num_written += 1 - print "filtered reads from %s will be written to %s" % (readpath, outpath) - print "%s reads left after filtering." % num_written + logger.info("filtered reads from %s will be written to %s" % (readpath, outpath)) + logger.info("%s reads left after filtering." % num_written) if num_written <= 0: + logger.warn("No reads left after filtering! Quality controls eliminated all reads. Drop-Ns was set to %s; maybe try again with lower quality min than %s. " %(dropNs, idxQualMin)) warnings.warn("No reads left after filtering! Quality controls eliminated all reads. Drop-Ns was set to %s; maybe try again with lower quality min than %s. " %(dropNs, idxQualMin)) except AssertionError, E: - print "skipping biopython assertion error" + logger.debug("skipping biopython assertion error") #sys.stderr.write(str(E)) msg = '\n'.join( [stat_header.format(total, readpath, badIndex + hadN), stat_body.format(readpath, badIndex, idxQualMin, hadN) ]) - print msg with open(os.path.join(outdir, STATSFILE_NAME), 'w') as statfile: statfile.write(msg) return outpath @@ -218,7 +217,6 @@ def main(): raw_args = docopt(__doc__, version='Version 1.0') args = scheme.validate(raw_args) - print args mkdir_p(args['--outdir']) if args['--config']: run_from_config(args[''], args['--outdir'], args['--config'], args['--parallel']) @@ -227,7 +225,6 @@ def main(): minmin, minmax = 1, 50 if not (dropNs or (minmin <= idxMin <=minmax)): raise ValueError("No filter specified, drop Ns:%s, Index Quality Min:%s" % (dropNs, idxMin)) - status = "\nfiltering with specifications, drop Ns:%s, Index Quality Min:%s\nfrom folder %s to folder %s" % (dropNs, idxMin, args[''], args['--outdir']) - print status - outpaths = write_post_filter(args[''], idxMin, dropNs, args['--platforms'], args['--outdir'], args['--parallel']) + outpaths = write_post_filter(args[''], idxMin, dropNs, + args['--platforms'], args['--outdir'], args['--parallel']) return 0 From 951bac31beddac04edacdc9ac3109f2fd244f421 Mon Sep 17 00:00:00 2001 From: Panciera Date: Mon, 28 Mar 2016 12:01:48 -0400 Subject: [PATCH 08/31] filter now symlinks if qual=0 and dropNs=False --- ngs_mapper/nfilter.py | 8 +++++--- ngs_mapper/tests/test_filter.py | 6 ++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index fddc400d..62869a9b 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -163,6 +163,10 @@ def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): Also writes a stats file to outdir/ngs_filter_stats.txt, with basic information about how many reads were filtered.''' results = make_filtered(readpath, idxQualMin, dropNs) outpath = name_filtered(readpath, outdir) + if not idxQualMin and not dropNs: + os.symlink(readpath, outpath) + logger.warn("Index Quality was %s and dropNs was set to %s, so file %s was copied to %s without filtering" % (idxQualMin, dropNs, readpath, outpath)) + return outpath try: num_written = 0 with open(outpath, 'w') as outfile: @@ -222,9 +226,7 @@ def main(): run_from_config(args[''], args['--outdir'], args['--config'], args['--parallel']) return 0 dropNs, idxMin = args['--drop-ns'], args['--index-min'] - minmin, minmax = 1, 50 - if not (dropNs or (minmin <= idxMin <=minmax)): - raise ValueError("No filter specified, drop Ns:%s, Index Quality Min:%s" % (dropNs, idxMin)) + minmin, minmax = 0, 50 outpaths = write_post_filter(args[''], idxMin, dropNs, args['--platforms'], args['--outdir'], args['--parallel']) return 0 diff --git a/ngs_mapper/tests/test_filter.py b/ngs_mapper/tests/test_filter.py index 7b407508..01ebc9c5 100644 --- a/ngs_mapper/tests/test_filter.py +++ b/ngs_mapper/tests/test_filter.py @@ -73,11 +73,9 @@ def test_filter_raises_error_on_empty_filtered_result(self): self.assertEquals(len(w), 1) #with self.assertRaises(ValueError): - def test_stat_file_none_filtered(self): + def test_symlink_file_none_filtered(self): write_filtered(self.inputfn, 0, False, outdir=self.outdir) - expected = '''ngs_filter found %s reads in file %s, and filtered out %s reads.''' % (4, self.inputfn, 0) - actual = open(self.statsfile).readlines()[0].strip() - self.assertEquals(actual, expected) + self.assertTrue(os.path.islink(self.actualfn)) def test_stat_file_two_filtered(self): write_post_filter(self.inputdir, 32, True, ['Sanger'], self.outdir) From 3197f2fdb849c5fe946f26f6137e96bb6b3bba40 Mon Sep 17 00:00:00 2001 From: Panciera Date: Mon, 28 Mar 2016 12:02:41 -0400 Subject: [PATCH 09/31] filter minquality is now 0 to avoid wasted runtime --- ngs_mapper/config.yaml.default | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngs_mapper/config.yaml.default b/ngs_mapper/config.yaml.default index c37ef789..eae9e746 100644 --- a/ngs_mapper/config.yaml.default +++ b/ngs_mapper/config.yaml.default @@ -10,7 +10,7 @@ ngs_filter: default: False help: 'Drop all reads that have Ns.' indexQualityMin: - default: 1 + default: 0 help: 'The index for each associated MiSeq read must be equal or above this value.' platforms: choices: From 1397dc31e0b88ef78bcbe89fd3f4cc21215ce97d Mon Sep 17 00:00:00 2001 From: Panciera Date: Mon, 28 Mar 2016 13:51:13 -0400 Subject: [PATCH 10/31] `nfilter` now uses `threads` argument/config for parallelism --- ngs_mapper/config.yaml.default | 3 +++ ngs_mapper/nfilter.py | 41 +++++++++++++++++++++++---------- ngs_mapper/tests/test_filter.py | 5 ++-- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/ngs_mapper/config.yaml.default b/ngs_mapper/config.yaml.default index eae9e746..3ee81c20 100644 --- a/ngs_mapper/config.yaml.default +++ b/ngs_mapper/config.yaml.default @@ -12,6 +12,9 @@ ngs_filter: indexQualityMin: default: 0 help: 'The index for each associated MiSeq read must be equal or above this value.' + threads: + default: 1 + help: 'How many threads to use for bwa[Default: %(default)s]' platforms: choices: - MiSeq diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index 62869a9b..5db90b03 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -1,10 +1,10 @@ ''' -Usage: ngs_filter [--parallel] [--drop-ns ] [--index-min=] [--platforms ] [--outdir ] [--config ] +Usage: ngs_filter [--threads=] [--drop-ns ] [--index-min=] [--platforms ] [--outdir ] [--config ] Options: --outdir=,-o= outupt directory [Default: filtered] --config=,-c= Derive options from provided YAML file instead of commandline - --parallel Use python's multiprocessing to run on multiple cores + --threads= Number of files to filter in parallel. [Default: 1] --platforms= Only accept reads from specified machines. Choices: 'Roche454','IonTorrent','MiSeq', 'Sanger', 'All', [Default: All] Help: @@ -71,7 +71,7 @@ def flatten(container): yield j else: yield i -def map_to_dir(readsdir, func, platforms, parallel=False): +def map_to_dir(readsdir, idxQualMin, dropNs, platforms, outdir, threads): '''maps *func* to all fastq/sff files which are not indexes. fetch the fastqs and indexes of the directory and write the filtered results.''' #no_index_fqs = fqs_excluding_indices(readsdir) @@ -104,14 +104,25 @@ def is_valid(fn): msg= "Skipped files %s that were not within chosen platforms %s" % ( plat_files, platforms) if not files: raise ValueError("No fastq or sff files found in directory %s" % readsdir + '\n' + msg) - if parallel: + if threads > 1: + iter_func = partial(write_groups, idxQualMin=idxQualMin, + dropNs=dropNs, outdir=outdir) + def make_groups(numGroups, seq): + groups = [[] for _ in xrange(numGroups)] + i = 0 + for x in seq: + groups[i] = [x] + groups[i] + i = (i + 1) % numGroups + return groups + groups = make_groups(threads, files) pool = multiprocessing.Pool() - outpaths = pool.map(func, files) + outpaths = list(chain(*pool.map(iter_func, groups))) pool.close() pool.join() return outpaths else: logger.debug("mapping filters over read files %s in directory %s" % (files, readsdir)) + func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir) return map(func, files) def idx_filter(read, idxread, thresh): @@ -188,10 +199,14 @@ def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): statfile.write(msg) return outpath -def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, parallel=False): +def write_groups(paths, idxQualMin, dropNs, outdir): + func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir) + return map(func, paths) + +def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, threads=0): '''execute write_filtered on the whole directory''' - write_filters = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)#, parallel=parallel) - return map_to_dir(readsdir, write_filters, platforms, parallel) + return map_to_dir(readsdir, idxQualMin=idxQualMin, dropNs=dropNs, + platforms=platforms, outdir=outdir, threads=threads)#, parallel=parallel) def mkdir_p(dir): ''' emulate bash command $ mkdir -p ''' @@ -203,10 +218,12 @@ def picked_platforms(rawarg): return [ p for p in ALLPLATFORMS if p.lower() in rawarg.lower()] -def run_from_config(readsdir, outdir, config_path, parallel): +def run_from_config(readsdir, outdir, config_path): _config = load_config(config_path) defaults = _config['ngs_filter'] - return write_post_filter(readsdir, defaults['indexQualityMin']['default'], defaults['dropNs']['default'], defaults['platforms']['default'], outdir, parallel) + return write_post_filter(readsdir, defaults['indexQualityMin']['default'], + defaults['dropNs']['default'], defaults['platforms']['default'], + outdir, defaults['threads']['default']) def main(): scheme = Schema( @@ -223,10 +240,10 @@ def main(): args = scheme.validate(raw_args) mkdir_p(args['--outdir']) if args['--config']: - run_from_config(args[''], args['--outdir'], args['--config'], args['--parallel']) + run_from_config(args[''], args['--outdir'], args['--config']) return 0 dropNs, idxMin = args['--drop-ns'], args['--index-min'] minmin, minmax = 0, 50 outpaths = write_post_filter(args[''], idxMin, dropNs, - args['--platforms'], args['--outdir'], args['--parallel']) + args['--platforms'], args['--outdir'], args['--threads']) return 0 diff --git a/ngs_mapper/tests/test_filter.py b/ngs_mapper/tests/test_filter.py index 01ebc9c5..eb90ce9e 100644 --- a/ngs_mapper/tests/test_filter.py +++ b/ngs_mapper/tests/test_filter.py @@ -98,10 +98,11 @@ def test_with_config(self, mload_config): mfig = { 'ngs_filter' : {'platforms' : { 'default' : ['Sanger'] }, 'dropNs' : { 'default' : True }, - 'indexQualityMin' : {'default' : 32}} + 'indexQualityMin' : {'default' : 32}, + 'threads' : {'default' : 2}} } mload_config.return_value = mfig - run_from_config(self.inputdir,self.outdir, '_', False) + run_from_config(self.inputdir,self.outdir, '_') actual = open(self.actualfn) expected = open(self.expectedfn) self.assertFilesEqual(expected, actual) From ff63261a3027264e07f9eafe148f3238b325689e Mon Sep 17 00:00:00 2001 From: Panciera Date: Mon, 28 Mar 2016 14:16:50 -0400 Subject: [PATCH 11/31] fixed arg schema --- ngs_mapper/nfilter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index 5db90b03..1f4e8538 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -229,7 +229,7 @@ def main(): scheme = Schema( { '' : os.path.isdir, Optional('--drop-ns') : bool, - Optional('--parallel') : bool, + Optional('--threads') : Use(int), Optional('--index-min') : Use(lambda x: int(x) if x else x, error="--index-min expects an integer"), Optional('--platforms') : Use(picked_platforms), Optional('--config') : Or(str, lambda x: x is None), From 0f85f470fe370d611cb07e2c84f96e35205403ee Mon Sep 17 00:00:00 2001 From: Mike Panciera Date: Mon, 28 Mar 2016 14:18:34 -0400 Subject: [PATCH 12/31] fixed missing `=` --- doc/source/config.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/config.rst b/doc/source/config.rst index c00a4373..1c466997 100644 --- a/doc/source/config.rst +++ b/doc/source/config.rst @@ -90,7 +90,7 @@ You will probably want to be able to run an entire samplesheet with a custom con .. code-block:: bash - $> RUNSAMPLEOPTIONS"-c config.yaml" runsamplesheet.sh /path/to/NGSData/ReadsBySample samplesheet.tsv + $> RUNSAMPLEOPTIONS="-c config.yaml" runsamplesheet.sh /path/to/NGSData/ReadsBySample samplesheet.tsv Editing config.yaml =================== From 3510db4466a1566a43653863ff6af5869d2cfb7d Mon Sep 17 00:00:00 2001 From: Panciera Date: Mon, 28 Mar 2016 15:03:00 -0400 Subject: [PATCH 13/31] fix 'w'/'a' bug in stats file, where it got overwritten each read --- ngs_mapper/nfilter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index 1f4e8538..3f40baa6 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -195,7 +195,7 @@ def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): #sys.stderr.write(str(E)) msg = '\n'.join( [stat_header.format(total, readpath, badIndex + hadN), stat_body.format(readpath, badIndex, idxQualMin, hadN) ]) - with open(os.path.join(outdir, STATSFILE_NAME), 'w') as statfile: + with open(os.path.join(outdir, STATSFILE_NAME), 'a') as statfile: statfile.write(msg) return outpath From 999dc98d60dba05fe482ccadd51040e90a5d5020 Mon Sep 17 00:00:00 2001 From: Tyghe Vallard Date: Tue, 29 Mar 2016 00:05:07 -0400 Subject: [PATCH 14/31] Closes #221 --- ngs_mapper/tagreads.py | 2 +- ngs_mapper/tests/test_tagreads.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ngs_mapper/tagreads.py b/ngs_mapper/tagreads.py index 3c2950fe..6e36195b 100644 --- a/ngs_mapper/tagreads.py +++ b/ngs_mapper/tagreads.py @@ -22,7 +22,7 @@ class HeaderExists(Exception): pass ID_MAP = ( re.compile( '[0-9A-Z]{14}' ), re.compile( '[A-Z0-9]{5}:\d{1,}:\d{1,}' ), - re.compile( 'M[0-9]{5}:\d+:\d{9}-[A-Z0-9]{5}:\d:\d{4}:\d{4,5}:\d{4,5}' ), + re.compile( 'M[0-9]{5}:\d+:[\w\d-]+:\d:\d{4}:\d{4,5}:\d{4,5}' ), re.compile( '.*' ) ) # Read Group Template diff --git a/ngs_mapper/tests/test_tagreads.py b/ngs_mapper/tests/test_tagreads.py index b019651b..7728a503 100644 --- a/ngs_mapper/tests/test_tagreads.py +++ b/ngs_mapper/tests/test_tagreads.py @@ -86,7 +86,8 @@ def test_miseq( self ): 'M02261:4:000000000-A6FWH:1:2106:7558:24138', 'M02261:4:000000000-A6FWH:1:2106:75581:24138', 'M02261:4:000000000-A6FWH:1:2106:7558:241381', - 'M02261:14:000000000-A6481:1:1101:21903:18036' + 'M02261:14:000000000-A6481:1:1101:21903:18036', + 'M01119:15:AARB3:1:1111:23912:25198', ) for r in rn: read = self.mock_read() From bf18479d470f8264ef81d24c7f4259db6843f949 Mon Sep 17 00:00:00 2001 From: Tyghe Vallard Date: Tue, 29 Mar 2016 00:16:35 -0400 Subject: [PATCH 15/31] attempting to simplify threads for multiprocessing pool --- ngs_mapper/config.yaml.default | 11 +++++++---- ngs_mapper/nfilter.py | 32 +++++++++++--------------------- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/ngs_mapper/config.yaml.default b/ngs_mapper/config.yaml.default index 3ee81c20..602e51ff 100644 --- a/ngs_mapper/config.yaml.default +++ b/ngs_mapper/config.yaml.default @@ -2,6 +2,9 @@ # Does not need the trailing / NGSDATA: &NGSDATA /path/to/NGSDATA +# Default threads to use for any stage that supports it +THREADS: &THREADS 1 + # All scripts by name should be top level items # Sub items then are the option names(the dest portion of the add_arugment for the script) # Each option needs to define the default as well as the help message @@ -13,8 +16,8 @@ ngs_filter: default: 0 help: 'The index for each associated MiSeq read must be equal or above this value.' threads: - default: 1 - help: 'How many threads to use for bwa[Default: %(default)s]' + default: *THREADS + help: 'How many threads to use[Default: %(default)s]' platforms: choices: - MiSeq @@ -69,7 +72,7 @@ run_bwa_on_samplename: default: False help: 'Flag to indicate that you want the temporary files kept instead of removing them[Default: %(default)s]' threads: - default: 1 + default: *THREADS help: 'How many threads to use for bwa[Default: %(default)s]' tagreads: SM: @@ -101,7 +104,7 @@ base_caller: default: 10 help: 'What factor to bias high quality bases by. Must be an integer >= 1[Default: %(default)s]' threads: - default: 1 + default: *THREADS help: 'How many threads to use when running base_caller.py[Default: %(default)s]' miseq_sync: ngsdata: diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index 3f40baa6..a57d4896 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -104,26 +104,16 @@ def is_valid(fn): msg= "Skipped files %s that were not within chosen platforms %s" % ( plat_files, platforms) if not files: raise ValueError("No fastq or sff files found in directory %s" % readsdir + '\n' + msg) - if threads > 1: - iter_func = partial(write_groups, idxQualMin=idxQualMin, - dropNs=dropNs, outdir=outdir) - def make_groups(numGroups, seq): - groups = [[] for _ in xrange(numGroups)] - i = 0 - for x in seq: - groups[i] = [x] + groups[i] - i = (i + 1) % numGroups - return groups - groups = make_groups(threads, files) - pool = multiprocessing.Pool() - outpaths = list(chain(*pool.map(iter_func, groups))) - pool.close() - pool.join() - return outpaths - else: - logger.debug("mapping filters over read files %s in directory %s" % (files, readsdir)) - func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir) - return map(func, files) + logger.debug( + "Using {0} threads to map filters over read files {1} in directory {2}" + .format(threads, files, readsdir) + ) + func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir) + pool = multiprocessing.Pool(threads) + outpaths = list(chain(*pool.map(iter_func, groups))) + pool.close() + pool.join() + return outpaths def idx_filter(read, idxread, thresh): ''' AT or ABOVE threshold.''' @@ -203,7 +193,7 @@ def write_groups(paths, idxQualMin, dropNs, outdir): func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir) return map(func, paths) -def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, threads=0): +def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, threads=1): '''execute write_filtered on the whole directory''' return map_to_dir(readsdir, idxQualMin=idxQualMin, dropNs=dropNs, platforms=platforms, outdir=outdir, threads=threads)#, parallel=parallel) From 3c2f363f12e1648285623a599f63f5096624b7b4 Mon Sep 17 00:00:00 2001 From: Tyghe Vallard Date: Tue, 29 Mar 2016 00:23:47 -0400 Subject: [PATCH 16/31] fixed incorrect function name in pool.map --- ngs_mapper/nfilter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index a57d4896..2a9341fa 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -110,7 +110,7 @@ def is_valid(fn): ) func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir) pool = multiprocessing.Pool(threads) - outpaths = list(chain(*pool.map(iter_func, groups))) + outpaths = pool.map(func) pool.close() pool.join() return outpaths From 1efc9cde8545136b2d5db84f4f732dda8c2cf109 Mon Sep 17 00:00:00 2001 From: Tyghe Vallard Date: Tue, 29 Mar 2016 00:39:15 -0400 Subject: [PATCH 17/31] fixing map call even good-er --- ngs_mapper/nfilter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index 2a9341fa..aa27de9f 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -110,7 +110,7 @@ def is_valid(fn): ) func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir) pool = multiprocessing.Pool(threads) - outpaths = pool.map(func) + outpaths = pool.map(func, files) pool.close() pool.join() return outpaths From da98cc730f5dfcce4bf0a92912b170747debd738 Mon Sep 17 00:00:00 2001 From: Panciera Date: Tue, 29 Mar 2016 12:58:04 -0400 Subject: [PATCH 18/31] fixed symlink by using abspath --- ngs_mapper/nfilter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index aa27de9f..65c32ec3 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -165,7 +165,7 @@ def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): results = make_filtered(readpath, idxQualMin, dropNs) outpath = name_filtered(readpath, outdir) if not idxQualMin and not dropNs: - os.symlink(readpath, outpath) + os.symlink(os.path.abspath(readpath), os.path.abspath(outpath)) logger.warn("Index Quality was %s and dropNs was set to %s, so file %s was copied to %s without filtering" % (idxQualMin, dropNs, readpath, outpath)) return outpath try: From c4702bda4adcffe463a0a5a3fd6198876eda27ff Mon Sep 17 00:00:00 2001 From: Tyghe Vallard Date: Tue, 29 Mar 2016 23:50:42 -0400 Subject: [PATCH 19/31] Closes #229. Moved qsub argument parsing into parse_args so they can parse the rest of unparsed args if they exist and --qsub_ exist so error can be raised if wrong args exist --- ngs_mapper/runsample.py | 61 +++++++++++++------------ ngs_mapper/tests/test_runsample.py | 73 ++++++++++++++++++------------ 2 files changed, 78 insertions(+), 56 deletions(-) diff --git a/ngs_mapper/runsample.py b/ngs_mapper/runsample.py index a4c19fda..3ab32bc0 100644 --- a/ngs_mapper/runsample.py +++ b/ngs_mapper/runsample.py @@ -243,7 +243,32 @@ def parse_args( args=sys.argv[1:] ): args, rest = parser.parse_known_args(args) args.config = configfile - return args,rest + + # Parse qsub args if found + if rest and rest[0].startswith('--qsub'): + qsub_parser = argparse.ArgumentParser(add_help=False) + qsub_parser.add_argument( + '--qsub-help', + default=False, + action='store_true' + ) + qsub_parser.add_argument( + '--qsub_l', + default='nodes=1:ppn=1', + ) + qsub_parser.add_argument( + '--qsub_M', + default=None + ) + + qsub_args = qsub_parser.parse_args(rest) + + if qsub_args.qsub_help: + qsub_parser.print_help() + sys.exit(1) + rest = qsub_args + + return args, rest def make_project_repo( projpath ): ''' @@ -274,10 +299,9 @@ def run_cmd( cmdstr, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr, scri raise MissingCommand( "{0} is not an executable?".format(cmd[0]) ) def main(): - args,rest = parse_args() + args,qsubargs = parse_args() # Qsub job? - if rest and rest[0].startswith('--qsub'): - args, qsubargs = split_args(' '.join(sys.argv[1:])) + if qsubargs: print pbs_job(args, qsubargs) sys.exit(1) # So we can set the global logger @@ -479,37 +503,18 @@ def pbs_job(runsampleargs, pbsargs): :param string runsampleargs: args that are for runsample that originaly came from sys.argv(any non --qsub\_) - :param string pbsargs: args for qsub(any --qsub\_) + :param Namespace pbsargs: parsed --qsub_* args :return: pbs job file string ''' - qsub_parser = argparse.ArgumentParser(add_help=False) - qsub_parser.add_argument( - '--qsub-help', - default=False, - action='store_true' - ) - qsub_parser.add_argument( - '--qsub_l', - default='nodes=1:ppn=1', - ) - qsub_parser.add_argument( - '--qsub_M', - default=None - ) - qsub_args = qsub_parser.parse_args(pbsargs) - - if qsub_args.qsub_help: - qsub_parser.print_help() - return '' - + runsampleargs, _ = split_args(' '.join(sys.argv[1:])) samplename = runsampleargs[2] template = '#!/bin/bash\n' \ '#PBS -N {samplename}-ngs_mapper\n' \ '#PBS -j oe\n' \ '#PBS -l {qsub_l}\n' - if qsub_args.qsub_M is not None: + if pbsargs.qsub_M is not None: template += '#PBS -m abe\n' \ - '#PBS -M ' + qsub_args.qsub_M + '\n' + '#PBS -M ' + pbsargs.qsub_M + '\n' template += '\n' \ 'cd $PBS_O_WORKDIR\n' \ @@ -517,7 +522,7 @@ def pbs_job(runsampleargs, pbsargs): return template.format( samplename=samplename, - qsub_l=qsub_args.qsub_l, + qsub_l=pbsargs.qsub_l, runsampleargs=' '.join(runsampleargs) ) diff --git a/ngs_mapper/tests/test_runsample.py b/ngs_mapper/tests/test_runsample.py index ccc45f6c..fb3b7581 100644 --- a/ngs_mapper/tests/test_runsample.py +++ b/ngs_mapper/tests/test_runsample.py @@ -32,34 +32,6 @@ def check_git_repo( self, path ): print e.output return False -@attr('current') -class TestUnitArgs(Base): - functionname = 'parse_args' - - def test_defaults( self ): - args = ['ReadsBySample','Reference.fasta','Sample1'] - res = self._C( args ) - eq_( 'ReadsBySample', res[0].readsdir ) - eq_( 'Reference.fasta', res[0].reference ) - eq_( 'Sample1', res[0].prefix ) - eq_( os.getcwd(), res[0].outdir ) - - def test_set_outdir( self ): - args = ['ReadsBySample','Reference.fasta','Sample1','-od','outdir'] - res = self._C( args ) - eq_( 'ReadsBySample', res[0].readsdir ) - eq_( 'Reference.fasta', res[0].reference ) - eq_( 'Sample1', res[0].prefix ) - eq_( 'outdir', res[0].outdir ) - - def test_parses_only_known(self): - args = [ - 'ReadsBySample','Reference.fasta','Sample1', - '-od','outdir', '--qsub_X', 'foo' - ] - res = self._C(args) - eq_(['--qsub_X','foo'], res[1]) - @patch('ngs_mapper.runsample.logger',Mock()) class TestUnitRunCMD(object): from ngs_mapper import runsample @@ -270,6 +242,8 @@ def test_ensure_proper_log( self ): import mock import unittest +import sh +from nose import tools from ngs_mapper import runsample @attr('current') @@ -285,3 +259,46 @@ def test_splits_correctly(self): r[1], ['--qsub_l','foo','--qsub-help'] ) + +@attr('current') +class TestUnitArgs(unittest.TestCase): + + def test_defaults( self ): + args = ['ReadsBySample','Reference.fasta','Sample1'] + res = runsample.parse_args( args ) + eq_( 'ReadsBySample', res[0].readsdir ) + eq_( 'Reference.fasta', res[0].reference ) + eq_( 'Sample1', res[0].prefix ) + eq_( os.getcwd(), res[0].outdir ) + + def test_set_outdir( self ): + args = ['ReadsBySample','Reference.fasta','Sample1','-od','outdir'] + res = runsample.parse_args( args ) + eq_( 'ReadsBySample', res[0].readsdir ) + eq_( 'Reference.fasta', res[0].reference ) + eq_( 'Sample1', res[0].prefix ) + eq_( 'outdir', res[0].outdir ) + + def test_parses_qsub_args(self): + args = [ + 'ReadsBySample','Reference.fasta','Sample1', + '-od','outdir', '--qsub_M', 'foo' + ] + res = runsample.parse_args(args) + args, qsub_args = res + eq_(args.outdir, 'outdir') + eq_(qsub_args.qsub_M, 'foo') + + def test_parser_exception_when_incorrect_argument_given(self): + args = [ + 'ReadsBySample','Reference.fasta','Sample1', + '-od','outdir', '--qsub_missing' + ] + self.assertRaises(SystemExit, runsample.parse_args, args) + + def test_qsub_help_exits_and_displays_help(self): + args = [ + 'ReadsBySample','Reference.fasta','Sample1', + '-od','outdir', '--qsub-help' + ] + self.assertRaises(SystemExit, runsample.parse_args, args) From c9feddfdd319699182657dcca9b73a21feaf015d Mon Sep 17 00:00:00 2001 From: Tyghe Vallard Date: Tue, 29 Mar 2016 23:57:42 -0400 Subject: [PATCH 20/31] fixes drop-ns. Closes #229 --- ngs_mapper/runsample.py | 1 + ngs_mapper/tests/test_runsample.py | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/ngs_mapper/runsample.py b/ngs_mapper/runsample.py index 3ab32bc0..39f9a179 100644 --- a/ngs_mapper/runsample.py +++ b/ngs_mapper/runsample.py @@ -221,6 +221,7 @@ def parse_args( args=sys.argv[1:] ): parser.add_argument( '--drop-ns', dest='drop_ns', + action='store_true', default=_config['ngs_filter']['dropNs']['default'], help=_config['ngs_filter']['dropNs']['help'], ) diff --git a/ngs_mapper/tests/test_runsample.py b/ngs_mapper/tests/test_runsample.py index fb3b7581..846e2b50 100644 --- a/ngs_mapper/tests/test_runsample.py +++ b/ngs_mapper/tests/test_runsample.py @@ -302,3 +302,12 @@ def test_qsub_help_exits_and_displays_help(self): '-od','outdir', '--qsub-help' ] self.assertRaises(SystemExit, runsample.parse_args, args) + + def test_drop_ns_bool(self): + args = [ + 'ReadsBySample','Reference.fasta','Sample1', + '-od','outdir', '--drop-ns' + ] + res = runsample.parse_args(args) + args, qsub_args = res + eq_(args.drop_ns, True) From b99c16b94429c23775dd1bdb85b72b6f193ec49c Mon Sep 17 00:00:00 2001 From: Tyghe Vallard Date: Wed, 30 Mar 2016 01:09:22 -0400 Subject: [PATCH 21/31] unit tests folder...didn't see them before --- ngs_mapper/runsample.py | 4 ++-- ngs_mapper/tests/unit/test_runsample.py | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/ngs_mapper/runsample.py b/ngs_mapper/runsample.py index 39f9a179..9a6f33ed 100644 --- a/ngs_mapper/runsample.py +++ b/ngs_mapper/runsample.py @@ -303,7 +303,8 @@ def main(): args,qsubargs = parse_args() # Qsub job? if qsubargs: - print pbs_job(args, qsubargs) + runsampleargs, _ = split_args(' '.join(sys.argv[1:])) + print pbs_job(runsampleargs, qsubargs) sys.exit(1) # So we can set the global logger global logger @@ -507,7 +508,6 @@ def pbs_job(runsampleargs, pbsargs): :param Namespace pbsargs: parsed --qsub_* args :return: pbs job file string ''' - runsampleargs, _ = split_args(' '.join(sys.argv[1:])) samplename = runsampleargs[2] template = '#!/bin/bash\n' \ '#PBS -N {samplename}-ngs_mapper\n' \ diff --git a/ngs_mapper/tests/unit/test_runsample.py b/ngs_mapper/tests/unit/test_runsample.py index bca5c737..f87dd5c2 100644 --- a/ngs_mapper/tests/unit/test_runsample.py +++ b/ngs_mapper/tests/unit/test_runsample.py @@ -10,10 +10,11 @@ def setUp(self): '-trim_qual', '25', '-head_crop', '0', '-minth', '0', '--CN', 'bar', '-od', 'outdir' ] - self.qsub_args = self.qa = [ - '--qsub_M', 'email@example.com', - '--qsub_l', 'nodes=1:ppn=1' - ] + class Namespace(object): + pass + self.qsub_args = self.qa = Namespace() + self.qa.qsub_M = 'email@example.com' + self.qa.qsub_l = 'nodes=1:ppn=1' self.args = [ self.runsample_args, self.qsub_args @@ -51,7 +52,7 @@ def test_sets_correct_pbs_directives(self): ) def test_omits_email_if_not_in_args(self): - self.args[1] = ['--qsub_l', 'nodes=1:ppn=1'] + self.args[1].qsub_M = None r = runsample.pbs_job(*self.args) self.assertNotIn('#PBS -M', r) self.assertNotIn('#PBS -m', r) From 61cc534c880d578fd2d003f89cc36db682756ee4 Mon Sep 17 00:00:00 2001 From: Mike Panciera Date: Wed, 30 Mar 2016 11:06:54 -0400 Subject: [PATCH 22/31] include docs link and build steps --- doc/source/install.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/doc/source/install.rst b/doc/source/install.rst index a44bf861..722acb38 100644 --- a/doc/source/install.rst +++ b/doc/source/install.rst @@ -118,3 +118,16 @@ You can pseudo test the installation of the pipeline by running the functional t .. code-block:: bash ngs_mapper/tests/slow_tests.sh + +Documentation +------------- +The documentation is available to view online at http://ngs-mapper.readthedocs.org/en/latest/ By default this site always shows the latest documentation; you can select your verion by clicking `v:latest` in the bottom left menu, then selecting your version number from `Versions.` + +If for any reason you need to use the documentation locally, you can build it: + +.. code-block:: bash + + cd doc + make clean && make html + firefox build/html/install.html#build-and-view-complete-documentation + cd .. From eaacb53f5fc2b8180d405ecf7d7d6149b50b246f Mon Sep 17 00:00:00 2001 From: Mike Panciera Date: Wed, 30 Mar 2016 11:09:17 -0400 Subject: [PATCH 23/31] add documentation link to readme --- README.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.rst b/README.rst index 44b6fb65..24ad764c 100644 --- a/README.rst +++ b/README.rst @@ -21,6 +21,14 @@ Installation `install `_ + +Documentation +------------ + + +`Documentation `_ + + Upgrading --------- From 3a25c808d477dd3494b86e581c8f9f71f9e191b1 Mon Sep 17 00:00:00 2001 From: michaelpanciera Date: Wed, 6 Apr 2016 10:47:05 -0400 Subject: [PATCH 24/31] convert_formats now spits out to a new directory --- ngs_mapper/file_formats.py | 31 +++++++++++++++++-------------- ngs_mapper/runsample.py | 9 ++++++--- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/ngs_mapper/file_formats.py b/ngs_mapper/file_formats.py index afbd0e6d..31815426 100644 --- a/ngs_mapper/file_formats.py +++ b/ngs_mapper/file_formats.py @@ -16,10 +16,12 @@ drop_ext = lambda s: '.'.join(s.split('.')[:-1]) swap_ext = lambda ext: lambda s: drop_ext(s) + '.' + ext find_ext = lambda ext: lambda dir: glob("%s/*%s" % (dir, ext)) +swap_dir = lambda dir: lambda p: os.path.join(dir, basename(p)) -def convert_sff(dir): +def convert_sff(dir, outdir): sff_paths = find_ext('sff')(dir) outnames = map(swap_ext('fastq'), sff_paths) + outnames = map(swap_dir(outdir), outnames) def wrapped_conv(a, b): logger.info('Converting {0} to {1}'.format(a, b)) n = 0 @@ -31,37 +33,38 @@ def wrapped_conv(a, b): return n return sum(map(wrapped_conv, sff_paths, outnames)) -def convert_ab1(dir): +def convert_ab1(dir, outdir): for abi in find_ext('ab1')(dir): dest = swap_ext('fastq')(abi) + dest = swap_dir('outdir')(dest) logger.info('Converting {0} to {1}'.format(abi, dest)) SeqIO.convert(abi, 'abi', dest, 'fastq') -def convert_gzips(dir): +def convert_gzips(dir, outdir): for gz in find_ext('gz')(dir): - dest = drop_ext(gz) + dest = swap_dir(outdir)(drop_ext(gz)) with gzip.open( gz, 'rb' ) as input: with open(dest, 'w') as output: logger.info('Unpacking {0} to {1}'.format(gz, dest)) output.write(input.read()) -def convert_formats(dir): - convert_gzips(dir) - convert_ab1(dir) - convert_sff(dir) +def convert_formats(dir, outdir): + convert_gzips(dir, outdir) + convert_ab1(dir, outdir) + convert_sff(dir, outdir) -def get_dir_arg(): - if os.path.isdir(sys.argv[1]): - return sys.argv[1] +def get_dir_args(): + if os.path.isdir(sys.argv[1]) and os.path.isdir(sys.argv[2]): + return sys.argv[1], sys.argv[2] else: - raise ValueError("Path %s is not a directory" % sys.argv[1]) + raise ValueError("Path %s or %s is not a directory" % (sys.argv[1], sys.argv[2])) def main_convert_formats(): - convert_formats(get_dir_arg()) + convert_formats(*get_dir_args()) def main_sff_convert(): - convert_sff(get_dir_arg()) + convert_sff(*get_dir_args()) # sff_names = filter(lambda x: x.endswith('sff'), os.listdir(dir)) # sff_paths = map(partial(os.path.join, dir), sff_names) diff --git a/ngs_mapper/runsample.py b/ngs_mapper/runsample.py index 9a6f33ed..f723d5e9 100644 --- a/ngs_mapper/runsample.py +++ b/ngs_mapper/runsample.py @@ -380,20 +380,23 @@ def select_keys(d, keys): return dict( ((k, v) for k, v in d.items() if k in keys)) #convert sffs to fastq + convert_dir = os.path.join(tdir,'converted') - print sh.convert_formats(cmd_args['readsdir'], _out=sys.stdout, _err=sys.stderr) + print sh.convert_formats(cmd_args['readsdir'], convert_dir, _out=sys.stdout, _err=sys.stderr) #print sh.sff_to_fastq(cmd_args['readsdir'], _out=sys.stdout, _err=sys.stderr) try: if cmd_args['config']: - __result = sh.ngs_filter(cmd_args['readsdir'], config=cmd_args['config'], outdir=cmd_args['filtered_dir']) + __result = sh.ngs_filter(convert_dir, config=cmd_args['config'], outdir=cmd_args['filtered_dir']) else: filter_args = select_keys(cmd_args, ["drop_ns", "platforms", "index_min"]) - __result = sh.ngs_filter(cmd_args['readsdir'], outdir=cmd_args['filtered_dir'], **filter_args) + __result = sh.ngs_filter(convert_dir, outdir=cmd_args['filtered_dir'], **filter_args) logger.debug( 'ngs_filter: %s' % __result ) except sh.ErrorReturnCode, e: logger.error(e.stderr) sys.exit(1) + sh.rm(convert_dir, r=True) + #Trim reads cmd = 'trim_reads {filtered_dir} -q {trim_qual} -o {trim_outdir} --head-crop {head_crop}' if cmd_args['config']: From 25e46b41e31184c4a366a0648651ff49b42d9983 Mon Sep 17 00:00:00 2001 From: michaelpanciera Date: Wed, 6 Apr 2016 11:28:20 -0400 Subject: [PATCH 25/31] outdir won't exist yet --- ngs_mapper/file_formats.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ngs_mapper/file_formats.py b/ngs_mapper/file_formats.py index 31815426..a97c47ae 100644 --- a/ngs_mapper/file_formats.py +++ b/ngs_mapper/file_formats.py @@ -54,8 +54,9 @@ def convert_formats(dir, outdir): convert_sff(dir, outdir) def get_dir_args(): - if os.path.isdir(sys.argv[1]) and os.path.isdir(sys.argv[2]): - return sys.argv[1], sys.argv[2] + if os.path.isdir(sys.argv[1]): + indir, outdir = sys.argv[1], sys.argv[2] + os.mkdir(outdir) else: raise ValueError("Path %s or %s is not a directory" % (sys.argv[1], sys.argv[2])) @@ -63,8 +64,8 @@ def main_convert_formats(): convert_formats(*get_dir_args()) -def main_sff_convert(): - convert_sff(*get_dir_args()) +def main_sff_convert(): + convert_sff(*get_dir_args()) # sff_names = filter(lambda x: x.endswith('sff'), os.listdir(dir)) # sff_paths = map(partial(os.path.join, dir), sff_names) From f96264669f6479ff2d2e0c32d09c33e4878e80df Mon Sep 17 00:00:00 2001 From: michaelpanciera Date: Wed, 6 Apr 2016 11:54:32 -0400 Subject: [PATCH 26/31] fix trivial errors --- ngs_mapper/file_formats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ngs_mapper/file_formats.py b/ngs_mapper/file_formats.py index a97c47ae..addc9c76 100644 --- a/ngs_mapper/file_formats.py +++ b/ngs_mapper/file_formats.py @@ -16,7 +16,7 @@ drop_ext = lambda s: '.'.join(s.split('.')[:-1]) swap_ext = lambda ext: lambda s: drop_ext(s) + '.' + ext find_ext = lambda ext: lambda dir: glob("%s/*%s" % (dir, ext)) -swap_dir = lambda dir: lambda p: os.path.join(dir, basename(p)) +swap_dir = lambda dir: lambda p: os.path.join(dir, os.path.basename(p)) def convert_sff(dir, outdir): sff_paths = find_ext('sff')(dir) @@ -57,6 +57,7 @@ def get_dir_args(): if os.path.isdir(sys.argv[1]): indir, outdir = sys.argv[1], sys.argv[2] os.mkdir(outdir) + return indir, outdir else: raise ValueError("Path %s or %s is not a directory" % (sys.argv[1], sys.argv[2])) From a1deff997cabe4848a8777830100b7e7560761a9 Mon Sep 17 00:00:00 2001 From: michaelpanciera Date: Wed, 6 Apr 2016 12:02:09 -0400 Subject: [PATCH 27/31] now symlinks fastq files --- ngs_mapper/file_formats.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ngs_mapper/file_formats.py b/ngs_mapper/file_formats.py index addc9c76..658dec0e 100644 --- a/ngs_mapper/file_formats.py +++ b/ngs_mapper/file_formats.py @@ -47,11 +47,16 @@ def convert_gzips(dir, outdir): with open(dest, 'w') as output: logger.info('Unpacking {0} to {1}'.format(gz, dest)) output.write(input.read()) +def link_fastqs(dir, outdir): + for fq in find_ext('fastq')(dir): + dest = swap_dir(outdir)(fq) + os.symlink(os.path.abspath(fq), os.path.abspath(dest)) def convert_formats(dir, outdir): convert_gzips(dir, outdir) convert_ab1(dir, outdir) convert_sff(dir, outdir) + link_fastqs(dir, outdir) def get_dir_args(): if os.path.isdir(sys.argv[1]): From a43120e4c2ba93a01441d3d403d27c4a3c727fc6 Mon Sep 17 00:00:00 2001 From: michaelpanciera Date: Wed, 6 Apr 2016 12:12:22 -0400 Subject: [PATCH 28/31] no longer remove convert dir (ngs_filter symlinks) --- ngs_mapper/runsample.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngs_mapper/runsample.py b/ngs_mapper/runsample.py index f723d5e9..53797c36 100644 --- a/ngs_mapper/runsample.py +++ b/ngs_mapper/runsample.py @@ -395,7 +395,7 @@ def select_keys(d, keys): logger.error(e.stderr) sys.exit(1) - sh.rm(convert_dir, r=True) + #sh.rm(convert_dir, r=True) #Trim reads cmd = 'trim_reads {filtered_dir} -q {trim_qual} -o {trim_outdir} --head-crop {head_crop}' From 187c38c452aca6cbcc2d3f63906911ec35451392 Mon Sep 17 00:00:00 2001 From: michaelpanciera Date: Wed, 6 Apr 2016 12:18:55 -0400 Subject: [PATCH 29/31] filter application never gets called if no filter options --- ngs_mapper/runsample.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/ngs_mapper/runsample.py b/ngs_mapper/runsample.py index 53797c36..2073eb0f 100644 --- a/ngs_mapper/runsample.py +++ b/ngs_mapper/runsample.py @@ -384,16 +384,19 @@ def select_keys(d, keys): print sh.convert_formats(cmd_args['readsdir'], convert_dir, _out=sys.stdout, _err=sys.stderr) #print sh.sff_to_fastq(cmd_args['readsdir'], _out=sys.stdout, _err=sys.stderr) - try: - if cmd_args['config']: - __result = sh.ngs_filter(convert_dir, config=cmd_args['config'], outdir=cmd_args['filtered_dir']) - else: - filter_args = select_keys(cmd_args, ["drop_ns", "platforms", "index_min"]) - __result = sh.ngs_filter(convert_dir, outdir=cmd_args['filtered_dir'], **filter_args) - logger.debug( 'ngs_filter: %s' % __result ) - except sh.ErrorReturnCode, e: - logger.error(e.stderr) - sys.exit(1) + if cmd_args["drop_ns"] or cm_args["index_min"]: + try: + if cmd_args['config']: + __result = sh.ngs_filter(convert_dir, config=cmd_args['config'], outdir=cmd_args['filtered_dir']) + else: + filter_args = select_keys(cmd_args, ["drop_ns", "platforms", "index_min"]) + __result = sh.ngs_filter(convert_dir, outdir=cmd_args['filtered_dir'], **filter_args) + logger.debug( 'ngs_filter: %s' % __result ) + except sh.ErrorReturnCode, e: + logger.error(e.stderr) + sys.exit(1) + else: + cmd_args['filtered_dir'] = convert_dir #sh.rm(convert_dir, r=True) From 22366edefdf35fd4a47b1f1370d80ad75b66ffeb Mon Sep 17 00:00:00 2001 From: michaelpanciera Date: Wed, 6 Apr 2016 12:26:21 -0400 Subject: [PATCH 30/31] fastq reads get gnuzipped after they're used --- ngs_mapper/runsample.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ngs_mapper/runsample.py b/ngs_mapper/runsample.py index 2073eb0f..549e1a9f 100644 --- a/ngs_mapper/runsample.py +++ b/ngs_mapper/runsample.py @@ -373,6 +373,7 @@ def main(): shutil.copy( args.reference, cmd_args['reference'] ) # Return code list + zipfile = sh.gnuzip rets = [] logger.debug(cmd_args) #Filter @@ -395,6 +396,7 @@ def select_keys(d, keys): except sh.ErrorReturnCode, e: logger.error(e.stderr) sys.exit(1) + zipfile(convert_dir) else: cmd_args['filtered_dir'] = convert_dir @@ -412,6 +414,8 @@ def select_keys(d, keys): if rets[-1] != 0: logger.critical( "{0} did not exit sucessfully".format(cmd.format(**cmd_args)) ) + zipfile(cmd_args['filtered_dir']) + # Filter on index quality and Ns # Mapping @@ -477,6 +481,8 @@ def select_keys(d, keys): logger.critical( "{0} did not exit sucessfully".format(cmd) ) rets.append( r ) + zipfile(cmd_args['trim_outdir']) + # Consensus cmd = 'vcf_consensus {vcf} -i {samplename} -o {consensus}' p = run_cmd( cmd.format(**cmd_args), stdout=lfile, stderr=subprocess.STDOUT ) From 36381957c6d9c2749aa2a121984981fc11722270 Mon Sep 17 00:00:00 2001 From: Mike Panciera Date: Wed, 6 Apr 2016 12:31:06 -0400 Subject: [PATCH 31/31] accidentally quoted variable --- ngs_mapper/file_formats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngs_mapper/file_formats.py b/ngs_mapper/file_formats.py index 658dec0e..dea37eee 100644 --- a/ngs_mapper/file_formats.py +++ b/ngs_mapper/file_formats.py @@ -36,7 +36,7 @@ def wrapped_conv(a, b): def convert_ab1(dir, outdir): for abi in find_ext('ab1')(dir): dest = swap_ext('fastq')(abi) - dest = swap_dir('outdir')(dest) + dest = swap_dir(outdir)(dest) logger.info('Converting {0} to {1}'.format(abi, dest)) SeqIO.convert(abi, 'abi', dest, 'fastq')