Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel blast improvements #59

Merged
merged 8 commits into from
Aug 31, 2015
5 changes: 5 additions & 0 deletions bio_pieces/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
except ImportError:
from ordereddict import OrderedDict

try:
from __builtin__ import open
except ImportError:
from builtins import open

# Tests directory
from os.path import dirname
THIS = dirname(__file__)
91 changes: 60 additions & 31 deletions bio_pieces/parallel_blast.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,19 @@
import shlex
import subprocess
import sys
try:
from __builtin__ import open
except ImportError:
from builtins import open

try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from bio_pieces.compat import OrderedDict, open

import sh

# Staticly set options for blast
MAX_TARGET_SEQS = 10
BLAST_FORMAT = "6 qseqid sseqid pident length mismatch gapopen qstart qend sstart send evalue bitscore"
# Users cannot have these in the other args passed
STATIC_BLAST_ARGS = [
'-num_threads', '-db', '-query'
]

# Users cannot have these in the other args passed
STATIC_DIAMOND_ARGS = [
'-t', '--threads', '-d', '--db', '-q', '--query', '--daa', '-a'
]

def parse_args():
parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -73,23 +71,24 @@ def parallel_blast(inputfile, outfile, ninst, db, blasttype, task, blastoptions)
None if blastx/blastp
:param str blastoptions: other options to pass to blast
'''
if set(STATIC_BLAST_ARGS).intersection(shlex.split(blastoptions)):
raise ValueError("You cannot supply any of the arguments inside of {0} as" \
" optional arguments to blast".format(STATIC_BLAST_ARGS))
blast_path = sh.which(blasttype)
if blast_path is None:
raise ValueError("{0} is not in your path(Maybe not installed?)".format(
blasttype
))
args = ['-u', '--pipe', '--block', '10', '--recstart', '>']
args = ['-u', '--pipe', '--block', '100k', '--recstart', '>']
args += generate_sshlogins(ninst)
args += [blast_path]
if task is not None:
args += ['-task', task]
args += ['-db', db, '-max_target_seqs', str(MAX_TARGET_SEQS),
'-outfmt', '"'+BLAST_FORMAT+'"'
]
args += ['-db', db,]
args += shlex.split(blastoptions)
args += ['-query', '-']
cmd = sh.Command('parallel')
run(cmd, args, inputfile, outfile)
run(cmd, *args, _in=open(inputfile), _out=open(outfile,'w'))

def parallel_diamond(inputfile, outfile, ninst, db, task, diamondoptions):
'''
Expand All @@ -98,17 +97,21 @@ def parallel_diamond(inputfile, outfile, ninst, db, task, diamondoptions):
Will not run more than 1 diamond process per host as diamond utilizes
threads better than blast

Since diamond v0.7.9 produces a daa file, diamond view is required to output
the tsv format that is similar to blast's output format. diamond view is
automatically called on the produced .daa file so that GNU Parallel can combine
all output into a single stream.

:param str inputfile: Input fasta path
:param str outfile: Output file path
:param int ninst: number of threads to use if not in PBS or SGE job
:param str db: Database path to blast against
:param str task: blastx or blastp
:param str diamondoptions: other options to pass to blast
'''
'''
diamond -task blastx -compress 0 -db /path/nt -o outfile -query inputfile -o outfile
my $cmd = "$type $task_option $options -q $query -d $db -o $out";
'''
if set(STATIC_DIAMOND_ARGS).intersection(shlex.split(diamondoptions)):
raise ValueError("You cannot supply any of the arguments inside of {0} as" \
" optional arguments to diamond".format(STATIC_DIAMOND_ARGS))
# This seems kinda stupid that we are just replacing cpu count for each
# node with 1, but it is easier than refactoring other code to be better
sshlogins = generate_sshlogins(ninst)
Expand All @@ -118,19 +121,45 @@ def parallel_diamond(inputfile, outfile, ninst, db, task, diamondoptions):
dmnd_path = sh.which('diamond')
if dmnd_path is None:
raise ValueError("diamond is not in your path(Maybe not installed?)")
args = ['-u', '--pipe', '--block', '10', '--recstart', '>', '--cat']
args += sshlogins
args += [
# Diamond base command arguments
# parallel replaces {} with the temporary file it is using
# and replaces {#} with the current file segment it is using
# After diamond is finished, diamond view will be used to output the tsv
# format of the file
diamond_cmd = [
dmnd_path, task, '--threads', str(ninst), '--db', db, '--query', '{}',
'--compress', '0', '-a', outfile
] + shlex.split(diamondoptions)
cmd = sh.Command('parallel')
run(cmd, args, inputfile, outfile)
'--daa', '{}.{#}', ';', dmnd_path, 'view', '--daa', '{}.{#}.daa'
]
if len(sshlogins) > 2:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why > 2 rather than > 1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because sshlogins is a list of ['--sshlogin', 'something'] so running local the list looks like ['--sshlogins', '1/:']

args = ['-u', '--pipe', '--block', '10', '--recstart', '>', '--cat']
args += sshlogins
diamond_cmd_str = ' '.join(diamond_cmd) + diamondoptions
args += [diamond_cmd_str]
cmd = sh.Command('parallel')
run(cmd, *args, _in=open(inputfile), _out=open(outfile,'w'))
else:
dcmd = sh.Command('diamond')
args = [task]
if diamondoptions:
args += shlex.split(diamondoptions)
p = run(
dcmd, *args, threads=ninst, db=db, query=inputfile, daa=outfile
)
p = run(
dcmd, 'view', daa=outfile+'.daa', _out=open(outfile,'w')
)

def run(cmd, args, infile, outfile):
print("[cmd] {0} {1}".format(cmd._path, ' '.join(args)))
def run(cmd, *args, **kwargs):
'''
Runs and prints what is being run to stdout
'''
kwargsignore = ['_in', '_out']
kwargs_str = ' '.join(['--'+a+' '+str(v) for a,v in kwargs.items()
if a not in kwargsignore])
args_str = ' '.join(args)
print("[cmd] {0} {1} {2}".format(cmd._path, args_str, kwargs_str))
try:
p = cmd(*args, _in=open(infile), _out=open(outfile,'w'))
p = cmd(*args, **kwargs)
print(p)
except sh.ErrorReturnCode as e:
print("There was an error")
Expand Down
9 changes: 7 additions & 2 deletions docs/scripts/parallel_blast.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ Running diamond
+++++++++++++++

Diamond v0.7.9 is the version that was tested with parallel_blast. As diamond is
still in development the options may change and thus parallel_blast may not run
it correctly.
still in development the options may change in future versions and parallel_blast
may not run them correctly. Please submit a new issue if you find any issues.

.. code-block:: bash

Expand All @@ -57,6 +57,11 @@ it correctly.
Notice how even though we specified ``--ninst 4`` that ``--sshlogin 1/:`` was used
and ``--threads 4`` was set instead.

**Note** In recent versions of diamond, diamond outputs a daa binary file instead
of a tab separated file. parallel_blast automatically converts the diamond output
from daa to tab format for you but leaves the daa file behind(Same name as the
output file you specify, but with the extension .daa)

Command that is run
+++++++++++++++++++

Expand Down
89 changes: 70 additions & 19 deletions tests/test_parallel_blast.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import print_function
import tempfile
import os

Expand Down Expand Up @@ -176,10 +177,6 @@ def test_command_string_is_correct(self):
self.assertIn('/path/db/nt', blastcmd)
self.assertIn('-otherblast', blastcmd)
self.assertIn('arg', blastcmd)
self.assertIn('-max_target_seqs', blastcmd)
self.assertIn(str(parallel_blast.MAX_TARGET_SEQS), blastcmd)
self.assertIn('-outfmt', blastcmd)
self.assertIn('"{0}"'.format(parallel_blast.BLAST_FORMAT), blastcmd)

def test_localhost(self):
self.mock_sh_which.return_value = '/path/to/foon'
Expand Down Expand Up @@ -215,6 +212,16 @@ def test_cannot_find_exe_raises_exception(self):
self.infile, self.outfile, 5, '/path/to/db', 'blastn', 'foox', '-bar foo'
)

def test_passing_other_options_that_are_static_options_raises_exception(self):
self.mock_sh_which.return_value = '/path/to/blast'
for arg in parallel_blast.STATIC_BLAST_ARGS:
self.assertRaises(
ValueError,
parallel_blast.parallel_blast,
self.infile, self.outfile, 5, '/path/to/blast', 'blastn', 'foox',
arg + ' foo'
)

class TestParallelDiamond(MockSH):
def test_correct_inputoutput_handling(self):
self.mock_sh_which.return_value = '/path/to/diamond'
Expand All @@ -226,7 +233,6 @@ def test_correct_inputoutput_handling(self):
# It seems that parallel needs
self.assertEqual(r[1]['_in'], self.mock_open.return_value)
self.assertEqual(r[1]['_out'], self.mock_open.return_value)
self.assertIn('--pipe', r[0])

def test_cannot_find_exe_raises_exception(self):
self.mock_sh_which.return_value = None
Expand All @@ -236,24 +242,30 @@ def test_cannot_find_exe_raises_exception(self):
self.infile, self.outfile, 5, '/path/to/dmd', 'foox', '-bar foo'
)

def test_correct_command_string(self):
def test_local_runs_diamond_cmd_directly(self):
self.mock_sh_which.return_value = '/path/to/diamond'
parallel_blast.parallel_diamond(
self.infile, self.outfile, 5, '/path/to/dmd', 'foox', '-bar foo'
)
r = self.mock_sh_cmd.return_value.call_args[0]
self.assertIn('foox', r)
self.assertIn('--threads', r)
self.assertIn('5', r)
self.assertIn('--db', r)
self.assertIn('/path/to/dmd', r)
self.assertIn('--query', r)
self.assertIn('{}', r)
self.assertIn('--cat', r)
self.assertIn('--sshlogin', r)
self.assertIn('1/:', r)

def test_each_remote_host_has_one_instance(self):
# blastx call then view call
r1,r2 = self.mock_sh_cmd.return_value.call_args_list
print(r1)

r1a,r1k = r1
self.assertEqual(5, r1k['threads'])
self.assertEqual(self.infile, r1k['query'])
self.assertEqual('/path/to/dmd', r1k['db'])
self.assertEqual(self.outfile, r1k['daa'])
self.assertEqual('foox', r1a[0])
self.assertEqual(('-bar','foo'), r1a[1:])

r2a,r2k = r2
self.assertEqual('view', r2a[0])
self.assertEqual(self.outfile+'.daa', r2k['daa'])
self.assertEqual(self.mock_open.return_value, r2k['_out'])
self.mock_open.assert_called_once_with(self.outfile,'w')

def test_each_remote_host_has_one_instance_and_runs_parallel(self):
self.mock_sh_which.return_value = '/path/to/diamond'
self.mock_open.return_value.__enter__.return_value = PBS_MACHINEFILE.splitlines()
with mock.patch.dict('bio_pieces.parallel_blast.os.environ', {'PBS_NODEFILE': self.hostfile}):
Expand All @@ -265,6 +277,45 @@ def test_each_remote_host_has_one_instance(self):
self.assertIn('1/node1.localhost', r)
self.assertIn('1/node2.localhost', r)
self.assertIn('1/node3.localhost', r)
self.assertIn('10', r)

def test_passing_other_options_that_are_static_options_raises_exception(self):
self.mock_sh_which.return_value = '/path/to/diamond'
for arg in parallel_blast.STATIC_DIAMOND_ARGS:
self.assertRaises(
ValueError,
parallel_blast.parallel_diamond,
self.infile, self.outfile, 5, '/path/to/dmd', 'foox', arg + ' foo'
)

@mock.patch('bio_pieces.parallel_blast.sys.stdout')
class TestRun(MockSH):
def setUp(self):
super(TestRun, self).setUp()
self.args = ['foo', '-bar']
self.kwargs = {'foo':'bar'}
self.cmd = mock.Mock()
self.cmd.return_value.cmd = ['/path/to/x', 'foo', '-bar', '--foo', 'bar']
self.cmd._path = '/path/to/x'

def test_passes_args_kwargs_to_cmd(self, mock_sout):
_in = StringIO('test')
_out = StringIO()
self.kwargs['_in'] = _in
self.kwargs['_out'] = _out
r = parallel_blast.run(self.cmd, *self.args, **self.kwargs)
self.cmd.called_once_with(*self.args, **self.kwargs)
sout = mock_sout.write.call_args_list[0][0][0]
self.assertNotIn('_in', sout)
self.assertNotIn('_out', sout)

def test_prints_cmd_to_stdout(self, mock_sout):
r = parallel_blast.run(self.cmd, *self.args, **self.kwargs)
sout = mock_sout.write.call_args_list
self.assertEqual(
'[cmd] {0}'.format(' '.join(self.cmd.return_value.cmd)),
mock_sout.write.call_args_list[0][0][0]
)

@mock.patch('bio_pieces.parallel_blast.parallel_blast')
@mock.patch('bio_pieces.parallel_blast.parallel_diamond')
Expand Down