Rm dupes #235

Open

wants to merge 38 commits into base: dev

Changes from all commits (38 commits)
d0b94bf
ngs_filter now uses generators
Mar 23, 2016
92dc4c7
remove tuple argument
Mar 23, 2016
a1797d5
fix missing variable definition
Mar 23, 2016
36cc00d
deleting some comments
averagehat Mar 24, 2016
bda87ee
moved from reduce to cytoolz.accumulate
Mar 25, 2016
cd828bf
explicit generator instead of `accumulate`
Mar 25, 2016
8362668
Merge branch 'faster-filter' of https://github.com/VDBWRAIR/ngs_mappe…
Mar 25, 2016
719e7a0
print statements -> logger
Mar 28, 2016
951bac3
filter now symlinks if qual=0 and dropNs=False
Mar 28, 2016
3197f2f
filter minquality is now 0 to avoid wasted runtime
Mar 28, 2016
1397dc3
`nfilter` now uses `threads` argument/config for parallelism
Mar 28, 2016
ff63261
fixed arg schema
Mar 28, 2016
0f85f47
fixed missing `=`
averagehat Mar 28, 2016
3510db4
fix 'w'/'a' bug in stats file, where it got overwritten each read
Mar 28, 2016
999dc98
Closes #221
necrolyte2 Mar 29, 2016
bf18479
attempting to simplify threads for multiprocessing pool
necrolyte2 Mar 29, 2016
3c2f363
fixed incorrect function name in pool.map
necrolyte2 Mar 29, 2016
1efc9cd
fixing map call even good-er
necrolyte2 Mar 29, 2016
03ece3f
Merge pull request #226 from VDBWRAIR/fix-221
averagehat Mar 29, 2016
b3639f1
Merge pull request #228 from VDBWRAIR/faster-filter-pool
averagehat Mar 29, 2016
3ed7521
Merge remote-tracking branch 'origin/master' into faster-filter
Mar 29, 2016
da98cc7
fixed symlink by using abspath
Mar 29, 2016
c4702bd
Closes #229. Moved qsub argument parsing into parse_args so they can …
necrolyte2 Mar 30, 2016
c9feddf
fixes drop-ns. Closes #229
necrolyte2 Mar 30, 2016
b99c16b
unit tests folder...didn't see them before
necrolyte2 Mar 30, 2016
3677d30
Merge pull request #215 from VDBWRAIR/faster-filter
necrolyte2 Mar 30, 2016
df39b43
Merge pull request #230 from VDBWRAIR/fix-229
averagehat Mar 30, 2016
61cc534
include docs link and build steps
averagehat Mar 30, 2016
eaacb53
add documentation link to readme
averagehat Mar 30, 2016
3a25c80
convert_formats now spits out to a new directory
averagehat Apr 6, 2016
25e46b4
outdir won't exist yet
averagehat Apr 6, 2016
f962646
fix trivial errors
averagehat Apr 6, 2016
a1deff9
now symlinks fastq files
averagehat Apr 6, 2016
a43120e
no longer remove convert dir (ngs_filter symlinks)
averagehat Apr 6, 2016
187c38c
filter application never gets called if no filter options
averagehat Apr 6, 2016
22366ed
fastq reads get gnuzipped after they're used
averagehat Apr 6, 2016
3638195
accidentally quoted variable
averagehat Apr 6, 2016
9a386a8
Merge branch 'convert-fix' of https://github.com/VDBWRAIR/ngs_mapper …
averagehat Apr 6, 2016
8 changes: 8 additions & 0 deletions README.rst
@@ -21,6 +21,14 @@ Installation

`install <doc/source/install.rst>`_


Documentation
------------


`Documentation <https://github.com/VDBWRAIR/ngs_mapper/blob/master/doc/source/install.rst#documentation>`_


Upgrading
---------

2 changes: 1 addition & 1 deletion doc/source/config.rst
@@ -90,7 +90,7 @@ You will probably want to be able to run an entire samplesheet with a custom con

.. code-block:: bash

$> RUNSAMPLEOPTIONS"-c config.yaml" runsamplesheet.sh /path/to/NGSData/ReadsBySample samplesheet.tsv
$> RUNSAMPLEOPTIONS="-c config.yaml" runsamplesheet.sh /path/to/NGSData/ReadsBySample samplesheet.tsv

Editing config.yaml
===================
13 changes: 13 additions & 0 deletions doc/source/install.rst
@@ -118,3 +118,16 @@ You can pseudo test the installation of the pipeline by running the functional t
.. code-block:: bash

ngs_mapper/tests/slow_tests.sh

Documentation
-------------
The documentation is available to view online at http://ngs-mapper.readthedocs.org/en/latest/. By default this site always shows the latest documentation; you can select your version by clicking `v:latest` in the bottom-left menu, then selecting your version number from `Versions`.

If for any reason you need to use the documentation locally, you can build it:

.. code-block:: bash

cd doc
make clean && make html
firefox build/html/install.html#build-and-view-complete-documentation
cd ..
12 changes: 9 additions & 3 deletions ngs_mapper/config.yaml.default
@@ -2,6 +2,9 @@
# Does not need the trailing /
NGSDATA: &NGSDATA /path/to/NGSDATA

# Default threads to use for any stage that supports it
THREADS: &THREADS 1

# All scripts by name should be top level items
# Sub items then are the option names (the dest portion of the add_argument for the script)
# Each option needs to define the default as well as the help message
@@ -10,8 +13,11 @@ ngs_filter:
default: False
help: 'Drop all reads that have Ns.'
indexQualityMin:
default: 1
default: 0
help: 'The index for each associated MiSeq read must be equal or above this value.'
threads:
default: *THREADS
help: 'How many threads to use[Default: %(default)s]'
platforms:
choices:
- MiSeq
@@ -78,7 +84,7 @@ run_bwa_on_samplename:
default: False
help: 'Flag to indicate that you want the temporary files kept instead of removing them[Default: %(default)s]'
threads:
default: 1
default: *THREADS
help: 'How many threads to use for bwa[Default: %(default)s]'
tagreads:
SM:
@@ -110,7 +116,7 @@ base_caller:
default: 10
help: 'What factor to bias high quality bases by. Must be an integer >= 1[Default: %(default)s]'
threads:
default: 1
default: *THREADS
help: 'How many threads to use when running base_caller.py[Default: %(default)s]'
miseq_sync:
ngsdata:
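The `&THREADS` / `*THREADS` pair introduced in this file is plain YAML anchor/alias syntax: the value is declared once and referenced wherever a stage needs a thread default. A minimal fragment (option names taken from the config above) sketching the mechanism:

```yaml
# Declared once at the top level...
THREADS: &THREADS 1

# ...and reused by alias; changing THREADS updates every *THREADS default.
ngs_filter:
  threads:
    default: *THREADS
base_caller:
  threads:
    default: *THREADS
```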
42 changes: 26 additions & 16 deletions ngs_mapper/file_formats.py
@@ -16,10 +16,12 @@
drop_ext = lambda s: '.'.join(s.split('.')[:-1])
swap_ext = lambda ext: lambda s: drop_ext(s) + '.' + ext
find_ext = lambda ext: lambda dir: glob("%s/*%s" % (dir, ext))
swap_dir = lambda dir: lambda p: os.path.join(dir, os.path.basename(p))

def convert_sff(dir):
def convert_sff(dir, outdir):
sff_paths = find_ext('sff')(dir)
outnames = map(swap_ext('fastq'), sff_paths)
outnames = map(swap_dir(outdir), outnames)
def wrapped_conv(a, b):
logger.info('Converting {0} to {1}'.format(a, b))
n = 0
@@ -31,37 +33,45 @@ def wrapped_conv(a, b):
return n
return sum(map(wrapped_conv, sff_paths, outnames))

def convert_ab1(dir):
def convert_ab1(dir, outdir):
for abi in find_ext('ab1')(dir):
dest = swap_ext('fastq')(abi)
dest = swap_dir(outdir)(dest)
logger.info('Converting {0} to {1}'.format(abi, dest))
SeqIO.convert(abi, 'abi', dest, 'fastq')

def convert_gzips(dir):
def convert_gzips(dir, outdir):
for gz in find_ext('gz')(dir):
dest = drop_ext(gz)
dest = swap_dir(outdir)(drop_ext(gz))
with gzip.open( gz, 'rb' ) as input:
with open(dest, 'w') as output:
logger.info('Unpacking {0} to {1}'.format(gz, dest))
output.write(input.read())

def convert_formats(dir):
convert_gzips(dir)
convert_ab1(dir)
convert_sff(dir)

def get_dir_arg():
def link_fastqs(dir, outdir):
for fq in find_ext('fastq')(dir):
dest = swap_dir(outdir)(fq)
os.symlink(os.path.abspath(fq), os.path.abspath(dest))

def convert_formats(dir, outdir):
convert_gzips(dir, outdir)
convert_ab1(dir, outdir)
convert_sff(dir, outdir)
link_fastqs(dir, outdir)

def get_dir_args():
if os.path.isdir(sys.argv[1]):
return sys.argv[1]
indir, outdir = sys.argv[1], sys.argv[2]
os.mkdir(outdir)
return indir, outdir
else:
raise ValueError("Path %s is not a directory" % sys.argv[1])
raise ValueError("Path %s or %s is not a directory" % (sys.argv[1], sys.argv[2]))

def main_convert_formats():
convert_formats(get_dir_arg())
convert_formats(*get_dir_args())


def main_sff_convert():
convert_sff(get_dir_arg())
def main_sff_convert():
convert_sff(*get_dir_args())

# sff_names = filter(lambda x: x.endswith('sff'), os.listdir(dir))
# sff_paths = map(partial(os.path.join, dir), sff_names)
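The curried helpers at the top of `file_formats.py` compose into a single path rewrite: strip the old extension, attach `.fastq`, then relocate the result into `outdir`. A small standalone sketch of that derivation (same lambda definitions as the diff; the `reads/sample1.sff` input path is hypothetical):

```python
import os

# Curried path helpers, as in file_formats.py: each returns a function,
# so they map cleanly over lists of paths.
drop_ext = lambda s: '.'.join(s.split('.')[:-1])
swap_ext = lambda ext: lambda s: drop_ext(s) + '.' + ext
swap_dir = lambda d: lambda p: os.path.join(d, os.path.basename(p))

src = 'reads/sample1.sff'
# 'reads/sample1.sff' -> 'reads/sample1.fastq' -> 'converted/sample1.fastq'
dest = swap_dir('converted')(swap_ext('fastq')(src))
assert dest == os.path.join('converted', 'sample1.fastq')
```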
131 changes: 72 additions & 59 deletions ngs_mapper/nfilter.py
@@ -1,10 +1,10 @@
'''
Usage: ngs_filter <readdir> [--parallel] [--drop-ns ] [--index-min=<index_min>] [--platforms <PLATFORMS>] [--outdir <DIR>] [--config <CONFIG>]
Usage: ngs_filter <readdir> [--threads=<threads>] [--drop-ns ] [--index-min=<index_min>] [--platforms <PLATFORMS>] [--outdir <DIR>] [--config <CONFIG>]

Options:
--outdir=<DIR>,-o=<DIR> output directory [Default: filtered]
--config=<CONFIG>,-c=<CONFIG> Derive options from provided YAML file instead of commandline
--parallel Use python's multiprocessing to run on multiple cores
--threads=<threads> Number of files to filter in parallel. [Default: 1]
--platforms=<PLATFORMS> Only accept reads from specified machines. Choices: 'Roche454','IonTorrent','MiSeq', 'Sanger', 'All', [Default: All]

Help:
@@ -18,13 +18,15 @@
from Bio import SeqIO
from docopt import docopt
from schema import Schema, Use, Optional, Or
from itertools import ifilterfalse, izip, chain
from itertools import ifilterfalse, izip, chain, izip_longest
import re
import os
import sys
import warnings
from data import reads_by_plat
from ngs_mapper.config import load_config
import log
logger = log.setup_logger(__name__, log.get_config())

ALLPLATFORMS = 'Roche454','IonTorrent','MiSeq', 'Sanger'
#TODO: should guarantee that there is an index file or allow there not to be one
@@ -69,20 +71,18 @@ def flatten(container):
yield j
else:
yield i
def map_to_dir(readsdir, func, platforms, parallel=False):
def map_to_dir(readsdir, idxQualMin, dropNs, platforms, outdir, threads):
'''maps *func* to all fastq/sff files which are not indexes.
fetch the fastqs and indexes of the directory and write the filtered results.'''
#no_index_fqs = fqs_excluding_indices(readsdir)
plat_files_dict = reads_by_plat(readsdir)
#_nested_files = map(plat_files_dict.get, platforms)
_nested_files = filter(None, map(plat_files_dict.get, platforms))
print _nested_files
if not _nested_files:
raise ValueError("No fastq files found in directory %s matching platforms. \
Files %s that were not within chosen platforms %s" % ( readsdir, os.listdir(readsdir), platforms) )
#plat_files = list(chain(*chain(*_nested_files)))
plat_files =flatten(_nested_files)
print plat_files
is_index = partial(re.search, r'_I[12]_')
# also this currently skips ab1 files
# The problem seems to be that of the below reads, only uses _R1_.
@@ -98,28 +98,26 @@ def map_to_dir(readsdir, func, platforms, parallel=False):
#947__1__TI86__2012_08_06__Unk.sff
#947__2__TI86__2012_08_06__Unk.sff

# def is_valid(fn):
# return is_fastq(fn) and not is_index(fn)
def is_valid(fn):
return not is_index(fn) and is_fastq(fn)
files = filter(is_valid, plat_files)
msg= "Skipped files %s that were not within chosen platforms %s" % ( plat_files, platforms)
if not files:
raise ValueError("No fastq or sff files found in directory %s" % readsdir + '\n' + msg)
if parallel:
pool = multiprocessing.Pool()
outpaths = pool.map(func, files)
pool.close()
pool.join()
return outpaths
else:
print "mapping filters over read files %s in directory %s" % (files, readsdir)
return map(func, files)
logger.debug(
"Using {0} threads to map filters over read files {1} in directory {2}"
.format(threads, files, readsdir)
)
func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)
pool = multiprocessing.Pool(threads)
outpaths = pool.map(func, files)
pool.close()
pool.join()
return outpaths

def idx_filter(read, idxread, thresh):
''' AT or ABOVE threshold.'''
return min(idxread._per_letter_annotations['phred_quality']) >= thresh

formats={ 'sff' : 'sff', 'fq' : 'fastq', 'fastq' : 'fastq', 'fa' : 'fasta', 'fasta' : 'fasta' }
extension = lambda s: s.split('.')[-1]
compose = lambda f, g: lambda x: f(g(x))
@@ -136,52 +134,69 @@ def make_filtered(readpath, idxQualMin, dropNs):
format = getformat(readpath)
# right now this silently skips
fq_open = partial(SeqIO.parse, format=format)
reads = []
def filterReads(readsWithMaybeIndex):
total = badIndex = hadNCount = 0
read, idxRead = None, None
for read, idxRead in readsWithMaybeIndex:
indexIsBad = False
if idxRead:
indexIsBad = min(idxRead._per_letter_annotations['phred_quality']) < idxQualMin
badIndex += int(indexIsBad)
hasN = False
if dropNs:
hasN = 'N' in str(read.seq).upper()
hadNCount += int(hasN)
dropRead = hasN or indexIsBad
total += 1
if dropRead:
read = None
yield (total, badIndex, hadNCount, read)
try:
reads = list(fq_open(readpath))
indexReads = [] if not index else fq_open(index)
reads = fq_open(readpath)
except AssertionError, E:
print "skipping biopython assertion error"
print readpath
#sys.stderr.write(str(E))
if index and idxQualMin:
idxreads = fq_open(index)
intermediate = [r for r, idx in izip(reads, idxreads) if idx_filter(r, idx, idxQualMin) ]
else:
intermediate = reads
if dropNs:
hasNs = lambda rec: 'N' in str(rec.seq).upper()
filtered = list(ifilterfalse(hasNs, intermediate))
else:
filtered = intermediate
total, badIndex, hadN = len(reads), len(reads) - len(intermediate), \
len(intermediate) - len(filtered)
return filtered, total, badIndex, hadN
logger.debug("skipping biopython assertion error in file %s " % readpath)
readsWithMaybeIndex = izip_longest(reads, indexReads, fillvalue=None)
return filterReads(readsWithMaybeIndex)

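The new `filterReads` generator pairs each read with its (possibly absent) index read via `izip_longest` and yields running tallies instead of materializing whole lists, which is what lets `make_filtered` stream large files. A simplified Python 3 sketch of the same pattern (`zip_longest` is the Python 3 spelling; the reads are stand-in strings here, not `Bio.SeqIO` records):

```python
from itertools import zip_longest

def filter_reads(reads, index_quals, min_qual, drop_ns):
    # Yield (total, bad_index, had_n, read) per input read;
    # read is None when it fails a filter, mirroring filterReads above.
    total = bad_index = had_n = 0
    for read, qual in zip_longest(reads, index_quals, fillvalue=None):
        bad = qual is not None and qual < min_qual
        bad_index += int(bad)
        has_n = drop_ns and 'N' in read.upper()
        had_n += int(has_n)
        total += 1
        yield total, bad_index, had_n, (None if bad or has_n else read)

# 'ANGT' is dropped for containing an N; 'ACGT' survives.
kept = [r for *_, r in filter_reads(['ACGT', 'ANGT'], [30, 40], 20, True) if r]
assert kept == ['ACGT']
```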
def write_filtered(readpath, idxQualMin, dropNs, outdir='.'):
'''write the results to the new directory.
Also writes a stats file to outdir/ngs_filter_stats.txt, with basic information about how many reads were filtered.'''
results, total, badIndex, hadN = make_filtered(readpath, idxQualMin, dropNs)
results = make_filtered(readpath, idxQualMin, dropNs)
outpath = name_filtered(readpath, outdir)
msg = '\n'.join( [stat_header.format(total, readpath, badIndex + hadN),
stat_body.format(readpath, badIndex, idxQualMin, hadN) ])
print msg
if not idxQualMin and not dropNs:
os.symlink(os.path.abspath(readpath), os.path.abspath(outpath))
logger.warn("Index Quality was %s and dropNs was set to %s, so file %s was copied to %s without filtering" % (idxQualMin, dropNs, readpath, outpath))
return outpath
try:
num_written = SeqIO.write(results, outpath, 'fastq')
print "filtered reads from %s will be written to %s" % (readpath, outpath)
print "%s reads left after filtering." % num_written
num_written = 0
with open(outpath, 'w') as outfile:
for total, badIndex, hadN, read in results:
if read:
SeqIO.write(read, outfile, 'fastq')
num_written += 1
logger.info("filtered reads from %s will be written to %s" % (readpath, outpath))
logger.info("%s reads left after filtering." % num_written)
if num_written <= 0:
logger.warn("No reads left after filtering! Quality controls eliminated all reads. Drop-Ns was set to %s; maybe try again with lower quality min than %s. " %(dropNs, idxQualMin))
warnings.warn("No reads left after filtering! Quality controls eliminated all reads. Drop-Ns was set to %s; maybe try again with lower quality min than %s. " %(dropNs, idxQualMin))
except AssertionError, E:
print "skipping biopython assertion error"
logger.debug("skipping biopython assertion error")
#sys.stderr.write(str(E))
with open(os.path.join(outdir, STATSFILE_NAME), 'w') as statfile:
msg = '\n'.join( [stat_header.format(total, readpath, badIndex + hadN),
stat_body.format(readpath, badIndex, idxQualMin, hadN) ])
with open(os.path.join(outdir, STATSFILE_NAME), 'a') as statfile:
statfile.write(msg)
return outpath

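The symlink shortcut in `write_filtered` (taken when `idxQualMin` is 0 and `dropNs` is false) leans on absolute paths, per the `fixed symlink by using abspath` commit: a relative source would dangle once the link lives in `outdir`. A minimal sketch of that behavior with throwaway files (filenames are hypothetical):

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'sample_R1_.fastq')
    with open(src, 'w') as fh:
        fh.write('@r1\nACGT\n+\nIIII\n')
    outdir = os.path.join(tmp, 'filtered')
    os.mkdir(outdir)
    dest = os.path.join(outdir, os.path.basename(src))
    # Absolute paths on both ends so the link resolves regardless of cwd.
    os.symlink(os.path.abspath(src), os.path.abspath(dest))
    assert os.path.islink(dest)
    assert open(dest).read().startswith('@r1')
```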
def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, parallel=False):
def write_groups(paths, idxQualMin, dropNs, outdir):
func = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)
return map(func, paths)

def write_post_filter(readsdir, idxQualMin, dropNs, platforms, outdir=None, threads=1):
'''execute write_filtered on the whole directory'''
write_filters = partial(write_filtered, idxQualMin=idxQualMin, dropNs=dropNs, outdir=outdir)#, parallel=parallel)
return map_to_dir(readsdir, write_filters, platforms, parallel)
return map_to_dir(readsdir, idxQualMin=idxQualMin, dropNs=dropNs,
platforms=platforms, outdir=outdir, threads=threads)#, parallel=parallel)

def mkdir_p(dir):
''' emulate bash command $ mkdir -p '''
@@ -193,16 +208,18 @@ def picked_platforms(rawarg):
return [ p for p in ALLPLATFORMS if p.lower() in rawarg.lower()]


def run_from_config(readsdir, outdir, config_path, parallel):
def run_from_config(readsdir, outdir, config_path):
_config = load_config(config_path)
defaults = _config['ngs_filter']
return write_post_filter(readsdir, defaults['indexQualityMin']['default'], defaults['dropNs']['default'], defaults['platforms']['default'], outdir, parallel)
return write_post_filter(readsdir, defaults['indexQualityMin']['default'],
defaults['dropNs']['default'], defaults['platforms']['default'],
outdir, defaults['threads']['default'])

def main():
scheme = Schema(
{ '<readdir>' : os.path.isdir,
Optional('--drop-ns') : bool,
Optional('--parallel') : bool,
Optional('--threads') : Use(int),
Optional('--index-min') : Use(lambda x: int(x) if x else x, error="--index-min expects an integer"),
Optional('--platforms') : Use(picked_platforms),
Optional('--config') : Or(str, lambda x: x is None),
@@ -211,16 +228,12 @@ def main():

raw_args = docopt(__doc__, version='Version 1.0')
args = scheme.validate(raw_args)
print args
mkdir_p(args['--outdir'])
if args['--config']:
run_from_config(args['<readdir>'], args['--outdir'], args['--config'], args['--parallel'])
run_from_config(args['<readdir>'], args['--outdir'], args['--config'])
return 0
dropNs, idxMin = args['--drop-ns'], args['--index-min']
minmin, minmax = 1, 50
if not (dropNs or (minmin <= idxMin <=minmax)):
raise ValueError("No filter specified, drop Ns:%s, Index Quality Min:%s" % (dropNs, idxMin))
status = "\nfiltering with specifications, drop Ns:%s, Index Quality Min:%s\nfrom folder %s to folder %s" % (dropNs, idxMin, args['<readdir>'], args['--outdir'])
print status
outpaths = write_post_filter(args['<readdir>'], idxMin, dropNs, args['--platforms'], args['--outdir'], args['--parallel'])
minmin, minmax = 0, 50
outpaths = write_post_filter(args['<readdir>'], idxMin, dropNs,
args['--platforms'], args['--outdir'], args['--threads'])
return 0
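The `--threads` change above freezes the per-file filter options with `functools.partial` so that `pool.map` varies only the file path. A sketch of that pattern, using the thread-backed `multiprocessing.dummy.Pool` (same API as `multiprocessing.Pool`, chosen here only to keep the sketch portable) and a stand-in `filter_one` in place of `write_filtered`:

```python
from functools import partial
from multiprocessing.dummy import Pool  # thread-backed, same API as multiprocessing.Pool

def filter_one(path, idx_qual_min, drop_ns):
    # Stand-in for write_filtered: report which settings were applied.
    return '%s:min=%d,dropNs=%s' % (path, idx_qual_min, drop_ns)

# Freeze the shared options; pool.map then supplies only the path.
func = partial(filter_one, idx_qual_min=0, drop_ns=True)
pool = Pool(2)
try:
    outpaths = pool.map(func, ['a.fastq', 'b.fastq'])
finally:
    pool.close()
    pool.join()
assert outpaths == ['a.fastq:min=0,dropNs=True', 'b.fastq:min=0,dropNs=True']
```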