diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5e91c8f4..cd6afb89 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,11 @@ Version X - nfilter will now simply symlink if no options are supplied essentially skipping itself - nfilter utilizes threads from config file +- config file now has THREADS default +- fix for bug where some miseq reads were not identified correctly in tagreads +- convert functions now support output directory +- bug fix for nfilter symlinking +- fix for qsub job output from runsample Version 1.4.2 +++++++++++++ diff --git a/docs/source/index.rst b/docs/source/index.rst index b1d34c62..a652a132 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -41,3 +41,4 @@ Indices and tables * :ref:`genindex` * :ref:`modindex` * :ref:`search` + diff --git a/ngs_mapper/config.yaml.default b/ngs_mapper/config.yaml.default index 3666f237..795b790c 100644 --- a/ngs_mapper/config.yaml.default +++ b/ngs_mapper/config.yaml.default @@ -2,6 +2,9 @@ # Does not need the trailing / NGSDATA: &NGSDATA /path/to/NGSDATA +# Default threads to use for any stage that supports it +THREADS: &THREADS 1 + # All scripts by name should be top level items # Sub items then are the option names(the dest portion of the add_arugment for the script) # Each option needs to define the default as well as the help message @@ -13,8 +16,8 @@ ngs_filter: default: 0 help: 'The index for each associated MiSeq read must be equal or above this value.' threads: - default: 1 - help: 'How many threads to use for bwa[Default: %(default)s]' + default: *THREADS + help: 'How many threads to use[Default: %(default)s]' platforms: choices: - MiSeq @@ -81,7 +84,7 @@ run_bwa_on_samplename: default: False help: 'Flag to indicate that you want the temporary files kept instead of removing them[Default: %(default)s]' threads: - default: 1 + default: *THREADS help: 'How many threads to use for bwa[Default: %(default)s]' tagreads: SM: @@ -113,7 +116,7 @@ base_caller: default: 10 help: 'What factor to bias high quality bases by. Must be an integer >= 1[Default: %(default)s]' threads: - default: 1 + default: *THREADS help: 'How many threads to use when running base_caller.py[Default: %(default)s]' miseq_sync: ngsdata: diff --git a/ngs_mapper/file_formats.py b/ngs_mapper/file_formats.py index afbd0e6d..d5143cab 100644 --- a/ngs_mapper/file_formats.py +++ b/ngs_mapper/file_formats.py @@ -16,10 +16,12 @@ drop_ext = lambda s: '.'.join(s.split('.')[:-1]) swap_ext = lambda ext: lambda s: drop_ext(s) + '.' + ext find_ext = lambda ext: lambda dir: glob("%s/*%s" % (dir, ext)) +swap_dir = lambda dir: lambda p: os.path.join(dir, os.path.basename(p)) -def convert_sff(dir): +def convert_sff(dir, outdir): sff_paths = find_ext('sff')(dir) outnames = map(swap_ext('fastq'), sff_paths) + outnames = map(swap_dir(outdir), outnames) def wrapped_conv(a, b): logger.info('Converting {0} to {1}'.format(a, b)) n = 0 @@ -31,37 +33,56 @@ def wrapped_conv(a, b): return n return sum(map(wrapped_conv, sff_paths, outnames)) -def convert_ab1(dir): +def convert_ab1(dir, outdir): for abi in find_ext('ab1')(dir): dest = swap_ext('fastq')(abi) + dest = swap_dir(outdir)(dest) logger.info('Converting {0} to {1}'.format(abi, dest)) SeqIO.convert(abi, 'abi', dest, 'fastq') -def convert_gzips(dir): +def convert_gzips(dir, outdir): for gz in find_ext('gz')(dir): - dest = drop_ext(gz) + dest = swap_dir(outdir)(drop_ext(gz)) with gzip.open( gz, 'rb' ) as input: with open(dest, 'w') as output: logger.info('Unpacking {0} to {1}'.format(gz, dest)) output.write(input.read()) - -def convert_formats(dir): - convert_gzips(dir) - convert_ab1(dir) - convert_sff(dir) - -def get_dir_arg(): +def link_fastqs(dir, outdir): + for fq in find_ext('fastq')(dir): + dest = swap_dir(outdir)(fq) + src = os.path.abspath(fq) + dst = os.path.abspath(dest) + if os.path.exists(dst): + logger.warning( + 'Skipping symlink of {0} because {1} already exists.' \ + 'This can happen if you have the file compressed and also not ' \ + 'compressed in the input directory'.format( + src, dst + )) + else: + logger.debug('Symlinking {0} to {1}'.format(src, dst)) + os.symlink(src, dst) + +def convert_formats(dir, outdir): + convert_gzips(dir, outdir) + convert_ab1(dir, outdir) + convert_sff(dir, outdir) + link_fastqs(dir, outdir) + +def get_dir_args(): if os.path.isdir(sys.argv[1]): - return sys.argv[1] + indir, outdir = sys.argv[1], sys.argv[2] + os.mkdir(outdir) + return indir, outdir else: - raise ValueError("Path %s is not a directory" % sys.argv[1]) + raise ValueError("Path %s or %s is not a directory" % (sys.argv[1], sys.argv[2])) def main_convert_formats(): - convert_formats(get_dir_arg()) + convert_formats(*get_dir_args()) -def main_sff_convert(): - convert_sff(get_dir_arg()) +def main_sff_convert(): + convert_sff(*get_dir_args()) # sff_names = filter(lambda x: x.endswith('sff'), os.listdir(dir)) # sff_paths = map(partial(os.path.join, dir), sff_names) diff --git a/ngs_mapper/nfilter.py b/ngs_mapper/nfilter.py index aa27de9f..65c32ec3 100644 --- a/ngs_mapper/nfilter.py +++ b/ngs_mapper/nfilter.py @@ -165,7 +165,7 @@ def write_filtered(readpath, idxQualMin, dropNs, outdir='.'): results = make_filtered(readpath, idxQualMin, dropNs) outpath = name_filtered(readpath, outdir) if not idxQualMin and not dropNs: - os.symlink(readpath, outpath) + os.symlink(os.path.abspath(readpath), os.path.abspath(outpath)) logger.warn("Index Quality was %s and dropNs was set to %s, so file %s was copied to %s without filtering" % (idxQualMin, dropNs, readpath, outpath)) return outpath try: diff --git a/ngs_mapper/runsample.py b/ngs_mapper/runsample.py index a4c19fda..53797c36 100644 --- a/ngs_mapper/runsample.py +++ b/ngs_mapper/runsample.py @@ -221,6 +221,7 @@ def parse_args( args=sys.argv[1:] ): parser.add_argument( '--drop-ns', dest='drop_ns', + action='store_true', default=_config['ngs_filter']['dropNs']['default'], help=_config['ngs_filter']['dropNs']['help'], ) @@ -243,7 +244,32 @@ def parse_args( args=sys.argv[1:] ): args, rest = parser.parse_known_args(args) args.config = configfile - return args,rest + + # Parse qsub args if found + if rest and rest[0].startswith('--qsub'): + qsub_parser = argparse.ArgumentParser(add_help=False) + qsub_parser.add_argument( + '--qsub-help', + default=False, + action='store_true' + ) + qsub_parser.add_argument( + '--qsub_l', + default='nodes=1:ppn=1', + ) + qsub_parser.add_argument( + '--qsub_M', + default=None + ) + + qsub_args = qsub_parser.parse_args(rest) + + if qsub_args.qsub_help: + qsub_parser.print_help() + sys.exit(1) + rest = qsub_args + + return args, rest def make_project_repo( projpath ): ''' @@ -274,11 +300,11 @@ def run_cmd( cmdstr, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr, scri raise MissingCommand( "{0} is not an executable?".format(cmd[0]) ) def main(): - args,rest = parse_args() + args,qsubargs = parse_args() # Qsub job? - if rest and rest[0].startswith('--qsub'): - args, qsubargs = split_args(' '.join(sys.argv[1:])) - print pbs_job(args, qsubargs) + if qsubargs: + runsampleargs, _ = split_args(' '.join(sys.argv[1:])) + print pbs_job(runsampleargs, qsubargs) sys.exit(1) # So we can set the global logger global logger @@ -354,20 +380,23 @@ def select_keys(d, keys): return dict( ((k, v) for k, v in d.items() if k in keys)) #convert sffs to fastq + convert_dir = os.path.join(tdir,'converted') - print sh.convert_formats(cmd_args['readsdir'], _out=sys.stdout, _err=sys.stderr) + print sh.convert_formats(cmd_args['readsdir'], convert_dir, _out=sys.stdout, _err=sys.stderr) #print sh.sff_to_fastq(cmd_args['readsdir'], _out=sys.stdout, _err=sys.stderr) try: if cmd_args['config']: - __result = sh.ngs_filter(cmd_args['readsdir'], config=cmd_args['config'], outdir=cmd_args['filtered_dir']) + __result = sh.ngs_filter(convert_dir, config=cmd_args['config'], outdir=cmd_args['filtered_dir']) else: filter_args = select_keys(cmd_args, ["drop_ns", "platforms", "index_min"]) - __result = sh.ngs_filter(cmd_args['readsdir'], outdir=cmd_args['filtered_dir'], **filter_args) + __result = sh.ngs_filter(convert_dir, outdir=cmd_args['filtered_dir'], **filter_args) logger.debug( 'ngs_filter: %s' % __result ) except sh.ErrorReturnCode, e: logger.error(e.stderr) sys.exit(1) + #sh.rm(convert_dir, r=True) + #Trim reads cmd = 'trim_reads {filtered_dir} -q {trim_qual} -o {trim_outdir} --head-crop {head_crop}' if cmd_args['config']: @@ -479,37 +508,17 @@ def pbs_job(runsampleargs, pbsargs): :param string runsampleargs: args that are for runsample that originaly came from sys.argv(any non --qsub\_) - :param string pbsargs: args for qsub(any --qsub\_) + :param Namespace pbsargs: parsed --qsub_* args :return: pbs job file string ''' - qsub_parser = argparse.ArgumentParser(add_help=False) - qsub_parser.add_argument( - '--qsub-help', - default=False, - action='store_true' - ) - qsub_parser.add_argument( - '--qsub_l', - default='nodes=1:ppn=1', - ) - qsub_parser.add_argument( - '--qsub_M', - default=None - ) - qsub_args = qsub_parser.parse_args(pbsargs) - - if qsub_args.qsub_help: - qsub_parser.print_help() - return '' - samplename = runsampleargs[2] template = '#!/bin/bash\n' \ '#PBS -N {samplename}-ngs_mapper\n' \ '#PBS -j oe\n' \ '#PBS -l {qsub_l}\n' - if qsub_args.qsub_M is not None: + if pbsargs.qsub_M is not None: template += '#PBS -m abe\n' \ - '#PBS -M ' + qsub_args.qsub_M + '\n' + '#PBS -M ' + pbsargs.qsub_M + '\n' template += '\n' \ 'cd $PBS_O_WORKDIR\n' \ @@ -517,7 +526,7 @@ def pbs_job(runsampleargs, pbsargs): return template.format( samplename=samplename, - qsub_l=qsub_args.qsub_l, + qsub_l=pbsargs.qsub_l, runsampleargs=' '.join(runsampleargs) ) diff --git a/ngs_mapper/tagreads.py b/ngs_mapper/tagreads.py index 3c2950fe..6e36195b 100644 --- a/ngs_mapper/tagreads.py +++ b/ngs_mapper/tagreads.py @@ -22,7 +22,7 @@ class HeaderExists(Exception): pass ID_MAP = ( re.compile( '[0-9A-Z]{14}' ), re.compile( '[A-Z0-9]{5}:\d{1,}:\d{1,}' ), - re.compile( 'M[0-9]{5}:\d+:\d{9}-[A-Z0-9]{5}:\d:\d{4}:\d{4,5}:\d{4,5}' ), + re.compile( 'M[0-9]{5}:\d+:[\w\d-]+:\d:\d{4}:\d{4,5}:\d{4,5}' ), re.compile( '.*' ) ) # Read Group Template diff --git a/ngs_mapper/tests/test_runsample.py b/ngs_mapper/tests/test_runsample.py index ccc45f6c..50a16117 100644 --- a/ngs_mapper/tests/test_runsample.py +++ b/ngs_mapper/tests/test_runsample.py @@ -32,34 +32,6 @@ def check_git_repo( self, path ): print e.output return False -@attr('current') -class TestUnitArgs(Base): - functionname = 'parse_args' - - def test_defaults( self ): - args = ['ReadsBySample','Reference.fasta','Sample1'] - res = self._C( args ) - eq_( 'ReadsBySample', res[0].readsdir ) - eq_( 'Reference.fasta', res[0].reference ) - eq_( 'Sample1', res[0].prefix ) - eq_( os.getcwd(), res[0].outdir ) - - def test_set_outdir( self ): - args = ['ReadsBySample','Reference.fasta','Sample1','-od','outdir'] - res = self._C( args ) - eq_( 'ReadsBySample', res[0].readsdir ) - eq_( 'Reference.fasta', res[0].reference ) - eq_( 'Sample1', res[0].prefix ) - eq_( 'outdir', res[0].outdir ) - - def test_parses_only_known(self): - args = [ - 'ReadsBySample','Reference.fasta','Sample1', - '-od','outdir', '--qsub_X', 'foo' - ] - res = self._C(args) - eq_(['--qsub_X','foo'], res[1]) - @patch('ngs_mapper.runsample.logger',Mock()) class TestUnitRunCMD(object): from ngs_mapper import runsample @@ -175,6 +147,7 @@ def _expected_files( self, outdir, prefix ): efiles.append( (d,join( outdir, 'trimmed_reads' )) ) efiles.append( (f,join( outdir, prefix+'.reads.png' )) ) efiles.append( (d,(join( outdir, 'trim_stats' ))) ) + efiles.append( (d,(join( outdir, 'converted' ))) ) ibmtools = join(outdir, '.com_ibm_tools_attach') if exists(ibmtools): efiles.append((d,ibmtools)) @@ -270,6 +243,8 @@ def test_ensure_proper_log( self ): import mock import unittest +import sh +from nose import tools from ngs_mapper import runsample @attr('current') @@ -285,3 +260,55 @@ def test_splits_correctly(self): r[1], ['--qsub_l','foo','--qsub-help'] ) + +@attr('current') +class TestUnitArgs(unittest.TestCase): + + def test_defaults( self ): + args = ['ReadsBySample','Reference.fasta','Sample1'] + res = runsample.parse_args( args ) + eq_( 'ReadsBySample', res[0].readsdir ) + eq_( 'Reference.fasta', res[0].reference ) + eq_( 'Sample1', res[0].prefix ) + eq_( os.getcwd(), res[0].outdir ) + + def test_set_outdir( self ): + args = ['ReadsBySample','Reference.fasta','Sample1','-od','outdir'] + res = runsample.parse_args( args ) + eq_( 'ReadsBySample', res[0].readsdir ) + eq_( 'Reference.fasta', res[0].reference ) + eq_( 'Sample1', res[0].prefix ) + eq_( 'outdir', res[0].outdir ) + + def test_parses_qsub_args(self): + args = [ + 'ReadsBySample','Reference.fasta','Sample1', + '-od','outdir', '--qsub_M', 'foo' + ] + res = runsample.parse_args(args) + args, qsub_args = res + eq_(args.outdir, 'outdir') + eq_(qsub_args.qsub_M, 'foo') + + def test_parser_exception_when_incorrect_argument_given(self): + args = [ + 'ReadsBySample','Reference.fasta','Sample1', + '-od','outdir', '--qsub_missing' + ] + self.assertRaises(SystemExit, runsample.parse_args, args) + + def test_qsub_help_exits_and_displays_help(self): + args = [ + 'ReadsBySample','Reference.fasta','Sample1', + '-od','outdir', '--qsub-help' + ] + self.assertRaises(SystemExit, runsample.parse_args, args) + + def test_drop_ns_bool(self): + args = [ + 'ReadsBySample','Reference.fasta','Sample1', + '-od','outdir', '--drop-ns' + ] + res = runsample.parse_args(args) + args, qsub_args = res + eq_(args.drop_ns, True) diff --git a/ngs_mapper/tests/test_tagreads.py b/ngs_mapper/tests/test_tagreads.py index b019651b..7728a503 100644 --- a/ngs_mapper/tests/test_tagreads.py +++ b/ngs_mapper/tests/test_tagreads.py @@ -86,7 +86,8 @@ def test_miseq( self ): 'M02261:4:000000000-A6FWH:1:2106:7558:24138', 'M02261:4:000000000-A6FWH:1:2106:75581:24138', 'M02261:4:000000000-A6FWH:1:2106:7558:241381', - 'M02261:14:000000000-A6481:1:1101:21903:18036' + 'M02261:14:000000000-A6481:1:1101:21903:18036', + 'M01119:15:AARB3:1:1111:23912:25198', ) for r in rn: read = self.mock_read() diff --git a/ngs_mapper/tests/unit/test_runsample.py b/ngs_mapper/tests/unit/test_runsample.py index bca5c737..f87dd5c2 100644 --- a/ngs_mapper/tests/unit/test_runsample.py +++ b/ngs_mapper/tests/unit/test_runsample.py @@ -10,10 +10,11 @@ def setUp(self): '-trim_qual', '25', '-head_crop', '0', '-minth', '0', '--CN', 'bar', '-od', 'outdir' ] - self.qsub_args = self.qa = [ - '--qsub_M', 'email@example.com', - '--qsub_l', 'nodes=1:ppn=1' - ] + class Namespace(object): + pass + self.qsub_args = self.qa = Namespace() + self.qa.qsub_M = 'email@example.com' + self.qa.qsub_l = 'nodes=1:ppn=1' self.args = [ self.runsample_args, self.qsub_args @@ -51,7 +52,7 @@ def test_sets_correct_pbs_directives(self): ) def test_omits_email_if_not_in_args(self): - self.args[1] = ['--qsub_l', 'nodes=1:ppn=1'] + self.args[1].qsub_M = None r = runsample.pbs_job(*self.args) self.assertNotIn('#PBS -M', r) self.assertNotIn('#PBS -m', r)