Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel blast improvements #59

Merged
merged 8 commits into from
Aug 31, 2015
5 changes: 5 additions & 0 deletions bio_pieces/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
except ImportError:
from ordereddict import OrderedDict

try:
from __builtin__ import open
except ImportError:
from builtins import open

# Tests directory
from os.path import dirname
THIS = dirname(__file__)
92 changes: 65 additions & 27 deletions bio_pieces/parallel_blast.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,19 @@
import shlex
import subprocess
import sys
try:
from __builtin__ import open
except ImportError:
from builtins import open

try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from bio_pieces.compat import OrderedDict, open

import sh

# Staticly set options for blast
MAX_TARGET_SEQS = 10
BLAST_FORMAT = "6 qseqid sseqid pident length mismatch gapopen qstart qend sstart send evalue bitscore"
# Users cannot have these in the other args passed
STATIC_BLAST_ARGS = [
'-num_threads', '-db', '-query'
]

# Users cannot have these in the other args passed
STATIC_DIAMOND_ARGS = [
'-t', '--threads', '-d', '--db', '-q', '--query', '--daa', '-a'
]

def parse_args():
parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -73,23 +71,39 @@ def parallel_blast(inputfile, outfile, ninst, db, blasttype, task, blastoptions)
None if blastx/blastp
:param str blastoptions: other options to pass to blast
'''
if has_duplicate_args(blastoptions, STATIC_BLAST_ARGS):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set(a).intersection(b) or filter(a.__contains__, b)
using has_duplicate_args forces me to look for that function definition elsewhere

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blastoptions is a string so for both your cases you would need to convert blastoptions to a list using shlex.split.

set(STATIC_BLAST_ARGS).intersection(shlex.split(blastoptions))

Typically I think this is frowned upon as you have too many things going on in one line making it hard to read an non-pythonic.

I'll remove the function though because it doesn't really have a name that tells what it is doing and it removes a test class though.

raise ValueError("You cannot supply any of the arguments inside of {0} as" \
" optional arguments to blast".format(STATIC_BLAST_ARGS))
blast_path = sh.which(blasttype)
if blast_path is None:
raise ValueError("{0} is not in your path(Maybe not installed?)".format(
blasttype
))
args = ['-u', '--pipe', '--block', '10', '--recstart', '>']
args = ['-u', '--pipe', '--block', '100k', '--recstart', '>']
args += generate_sshlogins(ninst)
args += [blast_path]
if task is not None:
args += ['-task', task]
args += ['-db', db, '-max_target_seqs', str(MAX_TARGET_SEQS),
'-outfmt', '"'+BLAST_FORMAT+'"'
]
args += ['-db', db,]
args += shlex.split(blastoptions)
args += ['-query', '-']
cmd = sh.Command('parallel')
run(cmd, args, inputfile, outfile)
run(cmd, *args, _in=open(inputfile), _out=open(outfile,'w'))

def has_duplicate_args(argstring, staticarglist):
'''
Ensure that none of staticarglist arguments are contained in argstring
If they are then return True otherwise false

:param str argstring: argument string for some command
:param list staticarglist: list of static args that should be checked to see
if they are contained in argstring
:returns: True or False
'''
for x in staticarglist:
if x in argstring:
return True
return False

def parallel_diamond(inputfile, outfile, ninst, db, task, diamondoptions):
'''
Expand All @@ -109,6 +123,9 @@ def parallel_diamond(inputfile, outfile, ninst, db, task, diamondoptions):
diamond -task blastx -compress 0 -db /path/nt -o outfile -query inputfile -o outfile
my $cmd = "$type $task_option $options -q $query -d $db -o $out";
'''
if has_duplicate_args(diamondoptions, STATIC_DIAMOND_ARGS):
raise ValueError("You cannot supply any of the arguments inside of {0} as" \
" optional arguments to diamond".format(STATIC_DIAMOND_ARGS))
# This seems kinda stupid that we are just replacing cpu count for each
# node with 1, but it is easier than refactoring other code to be better
sshlogins = generate_sshlogins(ninst)
Expand All @@ -118,19 +135,40 @@ def parallel_diamond(inputfile, outfile, ninst, db, task, diamondoptions):
dmnd_path = sh.which('diamond')
if dmnd_path is None:
raise ValueError("diamond is not in your path(Maybe not installed?)")
args = ['-u', '--pipe', '--block', '10', '--recstart', '>', '--cat']
args += sshlogins
args += [
diamond_cmd = [
dmnd_path, task, '--threads', str(ninst), '--db', db, '--query', '{}',
'--compress', '0', '-a', outfile
] + shlex.split(diamondoptions)
cmd = sh.Command('parallel')
run(cmd, args, inputfile, outfile)
'-a', '{}.{#}', ';', dmnd_path, 'view', '{}.{#}.daa'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little cryptic I would mention the need to do diamond view in the docstring, maybe mention what {#} is doing

]
if len(sshlogins) > 2:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why > 2 rather than > 1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because sshlogins is a list of ['--sshlogin', 'something'] so running local the list looks like ['--sshlogins', '1/:']

args = ['-u', '--pipe', '--block', '10', '--recstart', '>', '--cat']
args += sshlogins
diamond_cmd_str = ' '.join(diamond_cmd) + diamondoptions
args += [diamond_cmd_str]
cmd = sh.Command('parallel')
run(cmd, *args, _in=open(inputfile), _out=open(outfile,'w'))
else:
dcmd = sh.Command('diamond')
args = [task]
if diamondoptions:
args += shlex.split(diamondoptions)
p = run(
dcmd, *args, threads=ninst, db=db, query=inputfile, a=outfile
)
p = run(
dcmd, 'view', a=outfile+'.daa', _out=open(outfile,'w')
)

def run(cmd, args, infile, outfile):
print("[cmd] {0} {1}".format(cmd._path, ' '.join(args)))
def run(cmd, *args, **kwargs):
'''
Runs and prints what is being run to stdout
'''
kwargsignore = ['_in', '_out']
kwargs_str = ' '.join(['--'+a+' '+str(v) for a,v in kwargs.items()
if a not in kwargsignore])
args_str = ' '.join(args)
print("[cmd] {0} {1} {2}".format(cmd._path, args_str, kwargs_str))
try:
p = cmd(*args, _in=open(infile), _out=open(outfile,'w'))
p = cmd(*args, **kwargs)
print(p)
except sh.ErrorReturnCode as e:
print("There was an error")
Expand Down
9 changes: 7 additions & 2 deletions docs/scripts/parallel_blast.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ Running diamond
+++++++++++++++

Diamond v0.7.9 is the version that was tested with parallel_blast. As diamond is
still in development the options may change and thus parallel_blast may not run
it correctly.
still in development the options may change in future versions and parallel_blast
may not run them correctly. Please submit a new issue if you find any issues.

.. code-block:: bash

Expand All @@ -57,6 +57,11 @@ it correctly.
Notice how even though we specified ``--ninst 4`` that ``--sshlogin 1/:`` was used
and ``--threads 4`` was set instead.

**Note** In recent versions of diamond, diamond outputs a daa binary file instead
of a tab separated file. parallel_blast automatically converts the diamond output
from daa to tab format for you but leaves the daa file behind(Same name as the
output file you specify, but with the extension .daa)

Command that is run
+++++++++++++++++++

Expand Down
110 changes: 91 additions & 19 deletions tests/test_parallel_blast.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import print_function
import tempfile
import os

Expand Down Expand Up @@ -176,10 +177,6 @@ def test_command_string_is_correct(self):
self.assertIn('/path/db/nt', blastcmd)
self.assertIn('-otherblast', blastcmd)
self.assertIn('arg', blastcmd)
self.assertIn('-max_target_seqs', blastcmd)
self.assertIn(str(parallel_blast.MAX_TARGET_SEQS), blastcmd)
self.assertIn('-outfmt', blastcmd)
self.assertIn('"{0}"'.format(parallel_blast.BLAST_FORMAT), blastcmd)

def test_localhost(self):
self.mock_sh_which.return_value = '/path/to/foon'
Expand Down Expand Up @@ -215,6 +212,16 @@ def test_cannot_find_exe_raises_exception(self):
self.infile, self.outfile, 5, '/path/to/db', 'blastn', 'foox', '-bar foo'
)

def test_passing_other_options_that_are_static_options_raises_exception(self):
self.mock_sh_which.return_value = '/path/to/blast'
for arg in parallel_blast.STATIC_BLAST_ARGS:
self.assertRaises(
ValueError,
parallel_blast.parallel_blast,
self.infile, self.outfile, 5, '/path/to/blast', 'blastn', 'foox',
arg + ' foo'
)

class TestParallelDiamond(MockSH):
def test_correct_inputoutput_handling(self):
self.mock_sh_which.return_value = '/path/to/diamond'
Expand All @@ -226,7 +233,6 @@ def test_correct_inputoutput_handling(self):
# It seems that parallel needs
self.assertEqual(r[1]['_in'], self.mock_open.return_value)
self.assertEqual(r[1]['_out'], self.mock_open.return_value)
self.assertIn('--pipe', r[0])

def test_cannot_find_exe_raises_exception(self):
self.mock_sh_which.return_value = None
Expand All @@ -236,24 +242,30 @@ def test_cannot_find_exe_raises_exception(self):
self.infile, self.outfile, 5, '/path/to/dmd', 'foox', '-bar foo'
)

def test_correct_command_string(self):
def test_local_runs_diamond_cmd_directly(self):
self.mock_sh_which.return_value = '/path/to/diamond'
parallel_blast.parallel_diamond(
self.infile, self.outfile, 5, '/path/to/dmd', 'foox', '-bar foo'
)
r = self.mock_sh_cmd.return_value.call_args[0]
self.assertIn('foox', r)
self.assertIn('--threads', r)
self.assertIn('5', r)
self.assertIn('--db', r)
self.assertIn('/path/to/dmd', r)
self.assertIn('--query', r)
self.assertIn('{}', r)
self.assertIn('--cat', r)
self.assertIn('--sshlogin', r)
self.assertIn('1/:', r)

def test_each_remote_host_has_one_instance(self):
# blastx call then view call
r1,r2 = self.mock_sh_cmd.return_value.call_args_list
print(r1)

r1a,r1k = r1
self.assertEqual(5, r1k['threads'])
self.assertEqual(self.infile, r1k['query'])
self.assertEqual('/path/to/dmd', r1k['db'])
self.assertEqual(self.outfile, r1k['a'])
self.assertEqual('foox', r1a[0])
self.assertEqual(('-bar','foo'), r1a[1:])

r2a,r2k = r2
self.assertEqual('view', r2a[0])
self.assertEqual(self.outfile+'.daa', r2k['a'])
self.assertEqual(self.mock_open.return_value, r2k['_out'])
self.mock_open.assert_called_once_with(self.outfile,'w')

def test_each_remote_host_has_one_instance_and_runs_parallel(self):
self.mock_sh_which.return_value = '/path/to/diamond'
self.mock_open.return_value.__enter__.return_value = PBS_MACHINEFILE.splitlines()
with mock.patch.dict('bio_pieces.parallel_blast.os.environ', {'PBS_NODEFILE': self.hostfile}):
Expand All @@ -265,6 +277,66 @@ def test_each_remote_host_has_one_instance(self):
self.assertIn('1/node1.localhost', r)
self.assertIn('1/node2.localhost', r)
self.assertIn('1/node3.localhost', r)
self.assertIn('10', r)

def test_passing_other_options_that_are_static_options_raises_exception(self):
self.mock_sh_which.return_value = '/path/to/diamond'
for arg in parallel_blast.STATIC_DIAMOND_ARGS:
self.assertRaises(
ValueError,
parallel_blast.parallel_diamond,
self.infile, self.outfile, 5, '/path/to/dmd', 'foox', arg + ' foo'
)

class TestHasDuplicateArgs(unittest.TestCase):
def test_empty_argstring_edge_case(self):
self.assertFalse(
parallel_blast.has_duplicate_args('', [])
)

def test_contains_one_arg(self):
self.assertTrue(
parallel_blast.has_duplicate_args('-foo bar -baz', ['-baz'])
)

def test_contains_many_args(self):
self.assertTrue(
parallel_blast.has_duplicate_args('-foo bar -baz', ['-foo', '-baz'])
)

def test_contains_no_args(self):
self.assertFalse(
parallel_blast.has_duplicate_args('-foo bar -baz', ['-bar'])
)

@mock.patch('bio_pieces.parallel_blast.sys.stdout')
class TestRun(MockSH):
def setUp(self):
super(TestRun, self).setUp()
self.args = ['foo', '-bar']
self.kwargs = {'foo':'bar'}
self.cmd = mock.Mock()
self.cmd.return_value.cmd = ['/path/to/x', 'foo', '-bar', '--foo', 'bar']
self.cmd._path = '/path/to/x'

def test_passes_args_kwargs_to_cmd(self, mock_sout):
_in = StringIO('test')
_out = StringIO()
self.kwargs['_in'] = _in
self.kwargs['_out'] = _out
r = parallel_blast.run(self.cmd, *self.args, **self.kwargs)
self.cmd.called_once_with(*self.args, **self.kwargs)
sout = mock_sout.write.call_args_list[0][0][0]
self.assertNotIn('_in', sout)
self.assertNotIn('_out', sout)

def test_prints_cmd_to_stdout(self, mock_sout):
r = parallel_blast.run(self.cmd, *self.args, **self.kwargs)
sout = mock_sout.write.call_args_list
self.assertEqual(
'[cmd] {0}'.format(' '.join(self.cmd.return_value.cmd)),
mock_sout.write.call_args_list[0][0][0]
)

@mock.patch('bio_pieces.parallel_blast.parallel_blast')
@mock.patch('bio_pieces.parallel_blast.parallel_diamond')
Expand Down