diff --git a/ngs_mapper/config.yaml.default b/ngs_mapper/config.yaml.default
index c93ce498..795b790c 100644
--- a/ngs_mapper/config.yaml.default
+++ b/ngs_mapper/config.yaml.default
@@ -2,6 +2,9 @@
 # Does not need the trailing /
 NGSDATA: &NGSDATA /path/to/NGSDATA
 
+# Default threads to use for any stage that supports it
+THREADS: &THREADS 1
+
 # All scripts by name should be top level items
 # Sub items then are the option names(the dest portion of the add_arugment for the script)
 # Each option needs to define the default as well as the help message
@@ -10,8 +13,11 @@ ngs_filter:
         default: False
         help: 'Drop all reads that have Ns.'
     indexQualityMin:
-        default: 1
+        default: 0
         help: 'The index for each associated MiSeq read must be equal or above this value.'
+    threads:
+        default: *THREADS
+        help: 'How many threads to use[Default: %(default)s]'
     platforms:
         choices:
             - MiSeq
@@ -78,7 +84,7 @@ run_bwa_on_samplename:
         default: False
         help: 'Flag to indicate that you want the temporary files kept instead of removing them[Default: %(default)s]'
     threads:
-        default: 1
+        default: *THREADS
         help: 'How many threads to use for bwa[Default: %(default)s]'
 tagreads:
     SM:
@@ -110,7 +116,7 @@ base_caller:
         default: 10
         help: 'What factor to bias high quality bases by. Must be an integer >= 1[Default: %(default)s]'
     threads:
-        default: 1
+        default: *THREADS
         help: 'How many threads to use when running base_caller.py[Default: %(default)s]'
 miseq_sync:
     ngsdata:
diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py
index aaddab15..65c32ec3 100644
--- a/ngs_mapper/nfilter.py
+++ b/ngs_mapper/nfilter.py
@@ -1,10 +1,10 @@
 '''
-Usage: ngs_filter <readdir> [--parallel] [--drop-ns ] [--index-min=<n>] [--platforms <PLATFORMS>] [--outdir <DIR>] [--config <CONFIG>]
+Usage: ngs_filter <readdir> [--threads=<threads>] [--drop-ns ] [--index-min=<n>] [--platforms <PLATFORMS>] [--outdir <DIR>] [--config <CONFIG>]
 
 Options:
     --outdir=<DIR>,-o=<DIR>        outupt directory [Default: filtered]
     --config=<CONFIG>,-c=<CONFIG>  Derive options from provided YAML file instead of commandline
-    --parallel                     Use python's multiprocessing to run on multiple cores
+    --threads=<threads>            Number of files to filter in parallel. [Default: 1]
     --platforms=<PLATFORMS>        Only accept reads from specified machines. Choices: 'Roche454','IonTorrent','MiSeq', 'Sanger', 'All', [Default: All]
 
 Help:
@@ -18,13 +18,15 @@
 from Bio import SeqIO
 from docopt import docopt
 from schema import Schema, Use, Optional, Or
-from itertools import ifilterfalse, izip, chain
+from itertools import ifilterfalse, izip, chain, izip_longest
 import re
 import os
 import sys
 import warnings
 from data import reads_by_plat
 from ngs_mapper.config import load_config
+import log
+logger = log.setup_logger(__name__, log.get_config())
 
 ALLPLATFORMS = 'Roche454','IonTorrent','MiSeq', 'Sanger'
 #TODO: should guarantee that there is an index file or allow there not to be one
@@ -69,20 +71,18 @@ def flatten(container):
             yield j
         else:
             yield i
-def map_to_dir(readsdir, func, platforms, parallel=False):
+def map_to_dir(readsdir, idxQualMin, dropNs, platforms, outdir, threads):
     '''maps *func* to all fastq/sff files which are not indexes.
     fetch the fastqs and indexes of the directory and write the filtered results.'''
     #no_index_fqs = fqs_excluding_indices(readsdir)
     plat_files_dict = reads_by_plat(readsdir)
     #_nested_files = map(plat_files_dict.get, platforms)
     _nested_files = filter(None, map(plat_files_dict.get, platforms))
-    print _nested_files
     if not _nested_files:
         raise ValueError("No fastq files found in directory %s matching platforms. \
 Files %s that were not within chosen platforms %s" % ( readsdir, os.listdir(readsdir), platforms) )
     #plat_files = list(chain(*chain(*_nested_files)))
     plat_files =flatten(_nested_files)
-    print plat_files
     is_index = partial(re.search, r'_I[12]_')
     # also this currently skips ab1 files
     # The problem seems to be that of the below reads, only uses _R1_.
@@ -98,28 +98,26 @@ def map_to_dir(readsdir, func, platforms, parallel=False):
 #947__1__TI86__2012_08_06__Unk.sff
 #947__2__TI86__2012_08_06__Unk.sff
 
-# def is_valid(fn):
-#     return is_fastq(fn) and not is_index(fn)
     def is_valid(fn):
         return not is_index(fn) and is_fastq(fn)
     files = filter(is_valid, plat_files)
     msg= "Skipped files %s that were not within chosen platforms %s" % ( plat_files, platforms)
     if not files:
         raise ValueError("No fastq or sff files found in directory %s" % readsdir + '\n' + msg)
-    if parallel:
-        pool = multiprocessing.Pool()
-        outpaths = pool.map(func, files)
-        pool.close()
-        pool.join()
-        return outpaths
-    else:
-        print "mapping filters over read files %s in directory %s" % (files, readsdir)
-        return map(func, files)
+    logger.debug(
+        "Using {0} threads to map filters over read files {1} in directory {2}"
+        .format(threads, files, readsdir)
+    )
+    func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)
+    pool = multiprocessing.Pool(threads)
+    outpaths = pool.map(func, files)
+    pool.close()
+    pool.join()
+    return outpaths
 
 def idx_filter(read, idxread, thresh):
     ''' AT or ABOVE threshold.'''
     return min(idxread._per_letter_annotations['phred_quality']) >= thresh
-
 formats={ 'sff' : 'sff', 'fq' : 'fastq', 'fastq' : 'fastq', 'fa' : 'fasta', 'fasta' : 'fasta' }
 extension = lambda s: s.split('.')[-1]
 compose = lambda f, g: lambda x: f(g(x))
@@ -136,52 +134,69 @@ def make_filtered(readpath, idxQualMin, dropNs):
     format = getformat(readpath) # right now this silently skips
     fq_open = partial(SeqIO.parse, format=format)
-    reads = []
+    def filterReads(readsWithMaybeIndex):
+        total = badIndex = hadNCount = 0
+        read, idxRead = None, None
+        for read, idxRead in readsWithMaybeIndex:
+            indexIsBad = False
+            if idxRead:
+                indexIsBad = min(idxRead._per_letter_annotations['phred_quality']) < idxQualMin
+                badIndex += int(indexIsBad)
+            hasN = False
+            if dropNs:
+                hasN = 'N' in str(read.seq).upper()
+                hadNCount += int(hasN)
+            dropRead = hasN or indexIsBad
+            total += 1
+            if dropRead:
+                read = None
+            yield (total, badIndex, hadNCount, read)
     try:
-        reads = list(fq_open(readpath))
+        indexReads = [] if not index else fq_open(index)
+        reads = fq_open(readpath)
     except AssertionError, E:
-        print "skipping biopython assertion error"
-        print readpath
-        #sys.stderr.write(str(E))
-    if index and idxQualMin:
-        idxreads = fq_open(index)
-        intermediate = [r for r, idx in izip(reads, idxreads) if idx_filter(r, idx, idxQualMin) ]
-    else:
-        intermediate = reads
-    if dropNs:
-        hasNs = lambda rec: 'N' in str(rec.seq).upper()
-        filtered = list(ifilterfalse(hasNs, intermediate))
-    else:
-        filtered = intermediate
-    total, badIndex, hadN = len(reads), len(reads) - len(intermediate), \
-        len(intermediate) - len(filtered)
-    return filtered, total, badIndex, hadN
+        logger.debug("skipping biopython assertion error in file %s " % readpath)
+    readsWithMaybeIndex = izip_longest(reads, indexReads, fillvalue=None)
+    return filterReads(readsWithMaybeIndex)
 
 
 def write_filtered(readpath, idxQualMin, dropNs, outdir='.'):
     '''write the results to the new directory.
     Also writes a stats file to outdir/ngs_filter_stats.txt, with basic information about how many reads were filtered.'''
-    results, total, badIndex, hadN = make_filtered(readpath, idxQualMin, dropNs)
+    results = make_filtered(readpath, idxQualMin, dropNs)
     outpath = name_filtered(readpath, outdir)
-    msg = '\n'.join( [stat_header.format(total, readpath, badIndex + hadN),
-        stat_body.format(readpath, badIndex, idxQualMin, hadN) ])
-    print msg
+    if not idxQualMin and not dropNs:
+        os.symlink(os.path.abspath(readpath), os.path.abspath(outpath))
+        logger.warn("Index Quality was %s and dropNs was set to %s, so file %s was copied to %s without filtering" % (idxQualMin, dropNs, readpath, outpath))
+        return outpath
     try:
-        num_written = SeqIO.write(results, outpath, 'fastq')
-        print "filtered reads from %s will be written to %s" % (readpath, outpath)
-        print "%s reads left after filtering." % num_written
+        num_written = 0
+        with open(outpath, 'w') as outfile:
+            for total, badIndex, hadN, read in results:
+                if read:
+                    SeqIO.write(read, outfile, 'fastq')
+                    num_written += 1
+        logger.info("filtered reads from %s will be written to %s" % (readpath, outpath))
+        logger.info("%s reads left after filtering." % num_written)
         if num_written <= 0:
+            logger.warn("No reads left after filtering! Quality controls eliminated all reads. Drop-Ns was set to %s; maybe try again with lower quality min than %s. " %(dropNs, idxQualMin))
             warnings.warn("No reads left after filtering! Quality controls eliminated all reads. Drop-Ns was set to %s; maybe try again with lower quality min than %s. " %(dropNs, idxQualMin))
     except AssertionError, E:
-        print "skipping biopython assertion error"
+        logger.debug("skipping biopython assertion error")
         #sys.stderr.write(str(E))
-    with open(os.path.join(outdir, STATSFILE_NAME), 'w') as statfile:
+    msg = '\n'.join( [stat_header.format(total, readpath, badIndex + hadN),
+        stat_body.format(readpath, badIndex, idxQualMin, hadN) ])
+    with open(os.path.join(outdir, STATSFILE_NAME), 'a') as statfile:
         statfile.write(msg)
     return outpath
 
-def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, parallel=False):
+def write_groups(paths, idxQualMin, dropNs, outdir):
+    func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)
+    return map(func, paths)
+
+def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, threads=1):
     '''execute write_filtered on the whole directory'''
-    write_filters = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)#, parallel=parallel)
-    return map_to_dir(readsdir, write_filters, platforms, parallel)
+    return map_to_dir(readsdir, idxQualMin=idxQualMin, dropNs=dropNs,
+                      platforms=platforms, outdir=outdir, threads=threads)#, parallel=parallel)
 
 def mkdir_p(dir):
     ''' emulate bash command $ mkdir -p '''
@@ -193,16 +208,18 @@ def picked_platforms(rawarg):
     return [ p for p in ALLPLATFORMS if p.lower() in rawarg.lower()]
 
 
-def run_from_config(readsdir, outdir, config_path, parallel):
+def run_from_config(readsdir, outdir, config_path):
     _config = load_config(config_path)
     defaults = _config['ngs_filter']
-    return write_post_filter(readsdir, defaults['indexQualityMin']['default'], defaults['dropNs']['default'], defaults['platforms']['default'], outdir, parallel)
+    return write_post_filter(readsdir, defaults['indexQualityMin']['default'],
+                             defaults['dropNs']['default'], defaults['platforms']['default'],
+                             outdir, defaults['threads']['default'])
 
 def main():
     scheme = Schema(
         { '<readdir>' : os.path.isdir,
          Optional('--drop-ns') : bool,
-         Optional('--parallel') : bool,
+         Optional('--threads') : Use(int),
          Optional('--index-min') : Use(lambda x: int(x) if x else x, error="--index-min expects an integer"),
          Optional('--platforms') : Use(picked_platforms),
          Optional('--config') : Or(str, lambda x: x is None),
@@ -211,16 +228,12 @@ def main():
     raw_args = docopt(__doc__, version='Version 1.0')
     args = scheme.validate(raw_args)
-    print args
     mkdir_p(args['--outdir'])
     if args['--config']:
-        run_from_config(args['<readdir>'], args['--outdir'], args['--config'], args['--parallel'])
+        run_from_config(args['<readdir>'], args['--outdir'], args['--config'])
         return 0
 
     dropNs, idxMin = args['--drop-ns'], args['--index-min']
-    minmin, minmax = 1, 50
-    if not (dropNs or (minmin <= idxMin <=minmax)):
-        raise ValueError("No filter specified, drop Ns:%s, Index Quality Min:%s" % (dropNs, idxMin))
-    status = "\nfiltering with specifications, drop Ns:%s, Index Quality Min:%s\nfrom folder %s to folder %s" % (dropNs, idxMin, args['<readdir>'], args['--outdir'])
-    print status
-    outpaths = write_post_filter(args['<readdir>'], idxMin, dropNs, args['--platforms'], args['--outdir'], args['--parallel'])
+    minmin, minmax = 0, 50
+    outpaths = write_post_filter(args['<readdir>'], idxMin, dropNs,
+                                 args['--platforms'], args['--outdir'], args['--threads'])
     return 0
diff --git a/ngs_mapper/tests/test_filter.py b/ngs_mapper/tests/test_filter.py
index 7b407508..eb90ce9e 100644
--- a/ngs_mapper/tests/test_filter.py
+++ b/ngs_mapper/tests/test_filter.py
@@ -73,11 +73,9 @@ def test_filter_raises_error_on_empty_filtered_result(self):
         self.assertEquals(len(w), 1)
         #with self.assertRaises(ValueError):
 
-    def test_stat_file_none_filtered(self):
+    def test_symlink_file_none_filtered(self):
         write_filtered(self.inputfn, 0, False, outdir=self.outdir)
-        expected = '''ngs_filter found %s reads in file %s, and filtered out %s reads.''' % (4, self.inputfn, 0)
-        actual = open(self.statsfile).readlines()[0].strip()
-        self.assertEquals(actual, expected)
+        self.assertTrue(os.path.islink(self.actualfn))
 
     def test_stat_file_two_filtered(self):
         write_post_filter(self.inputdir, 32, True, ['Sanger'], self.outdir)
@@ -100,10 +98,11 @@ def test_with_config(self, mload_config):
         mfig = { 'ngs_filter' : {'platforms' : { 'default' : ['Sanger'] },
                                  'dropNs' : { 'default' : True },
-                                 'indexQualityMin' : {'default' : 32}}
+                                 'indexQualityMin' : {'default' : 32},
+                                 'threads' : {'default' : 2}}
         }
         mload_config.return_value = mfig
-        run_from_config(self.inputdir,self.outdir, '_', False)
+        run_from_config(self.inputdir,self.outdir, '_')
         actual = open(self.actualfn)
         expected = open(self.expectedfn)
         self.assertFilesEqual(expected, actual)
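
Note on the new THREADS setting: config.yaml.default now declares a YAML anchor (&THREADS) that each stage's threads option aliases with *THREADS, so a single top-level value feeds every stage that supports threading. Below is a minimal sketch of how the alias resolves at parse time, assuming PyYAML is what load_config uses under the hood (the patch only shows load_config being imported, so the loader choice here is an assumption):

    from textwrap import dedent
    import yaml

    # Trimmed stand-in for config.yaml.default, parsed with PyYAML for illustration only.
    snippet = dedent("""\
        THREADS: &THREADS 4
        ngs_filter:
            threads:
                default: *THREADS
        base_caller:
            threads:
                default: *THREADS
        """)
    config = yaml.safe_load(snippet)
    # Both aliases resolve to the single anchored value, so raising THREADS once
    # raises the default thread count for every stage that references *THREADS.
    print(config['ngs_filter']['threads']['default'])   # 4
    print(config['base_caller']['threads']['default'])  # 4

On the ngs_filter side that same default flows from run_from_config (defaults['threads']['default']) into write_post_filter and finally multiprocessing.Pool(threads) inside map_to_dir, which is also what --threads sets when the script runs without a config file.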
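
The rewritten make_filtered pairs each read with its optional index read via izip_longest(reads, indexReads, fillvalue=None), so files without an _I1_/_I2_ index FASTQ still stream through filterReads with idxRead set to None and only the N-dropping check applies. A minimal Python 2 sketch of that pairing behaviour (plain strings stand in for Bio.SeqIO records; the record names are made up):

    from itertools import izip_longest  # Python 2, matching the patch

    reads = ['read1', 'read2', 'read3']  # hypothetical stand-ins for SeqRecord objects
    index_reads = []                     # e.g. no index file was found for this platform

    # fillvalue=None pads the shorter iterator, so every read still arrives as a
    # (read, idxRead) pair; a None idxRead means the index-quality check is simply
    # skipped instead of raising.
    for read, idx_read in izip_longest(reads, index_reads, fillvalue=None):
        print read, idx_read
    # read1 None
    # read2 None
    # read3 None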