Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster filter #215

Merged
merged 19 commits into from
Mar 30, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ngs_mapper/config.yaml.default
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ ngs_filter:
indexQualityMin:
default: 0
help: 'The index for each associated MiSeq read must be equal or above this value.'
threads:
default: 1
help: 'How many threads to use for bwa[Default: %(default)s]'
platforms:
choices:
- MiSeq
Expand Down
41 changes: 29 additions & 12 deletions ngs_mapper/nfilter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'''
Usage: ngs_filter <readdir> [--parallel] [--drop-ns ] [--index-min=<index_min>] [--platforms <PLATFORMS>] [--outdir <DIR>] [--config <CONFIG>]
Usage: ngs_filter <readdir> [--threads=<threads>] [--drop-ns ] [--index-min=<index_min>] [--platforms <PLATFORMS>] [--outdir <DIR>] [--config <CONFIG>]

Options:
--outdir=<DIR>,-o=<DIR> outupt directory [Default: filtered]
--config=<CONFIG>,-c=<CONFIG> Derive options from provided YAML file instead of commandline
--parallel Use python's multiprocessing to run on multiple cores
--threads=<threads> Number of files to filter in parallel. [Default: 1]
--platforms=<PLATFORMS> Only accept reads from specified machines. Choices: 'Roche454','IonTorrent','MiSeq', 'Sanger', 'All', [Default: All]

Help:
Expand Down Expand Up @@ -71,7 +71,7 @@ def flatten(container):
yield j
else:
yield i
def map_to_dir(readsdir, func, platforms, parallel=False):
def map_to_dir(readsdir, idxQualMin, dropNs, platforms, outdir, threads):
'''maps *func* to all fastq/sff files which are not indexes.
fetch the fastqs and indexes of the directory and write the filtered results.'''
#no_index_fqs = fqs_excluding_indices(readsdir)
Expand Down Expand Up @@ -104,14 +104,25 @@ def is_valid(fn):
msg= "Skipped files %s that were not within chosen platforms %s" % ( plat_files, platforms)
if not files:
raise ValueError("No fastq or sff files found in directory %s" % readsdir + '\n' + msg)
if parallel:
if threads > 1:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it would work fine, but I'm wondering why you took this approach instead of just pool = multiprocessing.Pool(threads)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about that option

iter_func = partial(write_groups, idxQualMin=idxQualMin,
dropNs=dropNs, outdir=outdir)
def make_groups(numGroups, seq):
groups = [[] for _ in xrange(numGroups)]
i = 0
for x in seq:
groups[i] = [x] + groups[i]
i = (i + 1) % numGroups
return groups
groups = make_groups(threads, files)
pool = multiprocessing.Pool()
outpaths = pool.map(func, files)
outpaths = list(chain(*pool.map(iter_func, groups)))
pool.close()
pool.join()
return outpaths
else:
logger.debug("mapping filters over read files %s in directory %s" % (files, readsdir))
func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)
return map(func, files)

def idx_filter(read, idxread, thresh):
Expand Down Expand Up @@ -188,10 +199,14 @@ def write_filtered(readpath, idxQualMin, dropNs, outdir='.'):
statfile.write(msg)
return outpath

def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, parallel=False):
def write_groups(paths, idxQualMin, dropNs, outdir):
func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)
return map(func, paths)

def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, threads=0):
'''execute write_filtered on the whole directory'''
write_filters = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)#, parallel=parallel)
return map_to_dir(readsdir, write_filters, platforms, parallel)
return map_to_dir(readsdir, idxQualMin=idxQualMin, dropNs=dropNs,
platforms=platforms, outdir=outdir, threads=threads)#, parallel=parallel)

def mkdir_p(dir):
''' emulate bash command $ mkdir -p '''
Expand All @@ -203,10 +218,12 @@ def picked_platforms(rawarg):
return [ p for p in ALLPLATFORMS if p.lower() in rawarg.lower()]


def run_from_config(readsdir, outdir, config_path, parallel):
def run_from_config(readsdir, outdir, config_path):
_config = load_config(config_path)
defaults = _config['ngs_filter']
return write_post_filter(readsdir, defaults['indexQualityMin']['default'], defaults['dropNs']['default'], defaults['platforms']['default'], outdir, parallel)
return write_post_filter(readsdir, defaults['indexQualityMin']['default'],
defaults['dropNs']['default'], defaults['platforms']['default'],
outdir, defaults['threads']['default'])

def main():
scheme = Schema(
Expand All @@ -223,10 +240,10 @@ def main():
args = scheme.validate(raw_args)
mkdir_p(args['--outdir'])
if args['--config']:
run_from_config(args['<readdir>'], args['--outdir'], args['--config'], args['--parallel'])
run_from_config(args['<readdir>'], args['--outdir'], args['--config'])
return 0
dropNs, idxMin = args['--drop-ns'], args['--index-min']
minmin, minmax = 0, 50
outpaths = write_post_filter(args['<readdir>'], idxMin, dropNs,
args['--platforms'], args['--outdir'], args['--parallel'])
args['--platforms'], args['--outdir'], args['--threads'])
return 0
5 changes: 3 additions & 2 deletions ngs_mapper/tests/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ def test_with_config(self, mload_config):
mfig = { 'ngs_filter' :
{'platforms' : { 'default' : ['Sanger'] },
'dropNs' : { 'default' : True },
'indexQualityMin' : {'default' : 32}}
'indexQualityMin' : {'default' : 32},
'threads' : {'default' : 2}}
}
mload_config.return_value = mfig
run_from_config(self.inputdir,self.outdir, '_', False)
run_from_config(self.inputdir,self.outdir, '_')
actual = open(self.actualfn)
expected = open(self.expectedfn)
self.assertFilesEqual(expected, actual)