Merge pull request #21 from VDBWRAIR/types
Type annotations for consensus.py
averagehat committed Mar 4, 2016
2 parents 613e825 + 272a115 commit 875e1aa
Showing 17 changed files with 305 additions and 53 deletions.
81 changes: 51 additions & 30 deletions bioframework/consensus.py
@@ -14,33 +14,40 @@
from functools import partial
from itertools import ifilter, imap, groupby, takewhile, repeat, starmap, izip_longest
import os, sys
from typing import Tuple, Dict, List, Iterator, Iterable, Any, Callable
import collections

from typing import Tuple, Dict, List, Iterator, Iterable, Any, Callable, NamedTuple, BinaryIO

from Bio import SeqIO #done
from Bio.SeqRecord import SeqRecord #done
import vcf #done
from vcf.model import _Record
import sh #todo
#from toolz import compose
from toolz.dicttoolz import merge, dissoc, merge_with, valfilter #todo
from toolz.dicttoolz import merge, dissoc, merge_with, valfilter #done
from docopt import docopt #ignore
from schema import Schema, Use #ignore
from contracts import contract, new_contract #can ignore
#from contracts import contract, new_contract #can ignore
#from mypy.types import VCFRow
#############
# Constants #
#############

AMBIGUITY_TABLE = { 'A': 'A', 'T': 'T', 'G': 'G', 'C': 'C', 'N': 'N',
'AC': 'M', 'AG': 'R', 'AT': 'W', 'CG': 'S', 'CT':
'Y', 'GT': 'K', 'ACG': 'V', 'ACT': 'H', 'AGT': 'D',
'CGT': 'B', 'ACGT': 'N' }
VCFRow = NamedTuple("VCFRow",
[('ref', str),
('AO', List[int]),
('DP', int),
('chrom',str),
('pos', int),
('alt', List[str])])
AMBIGUITY_TABLE = { 'A': 'A', 'T': 'T', 'G': 'G', 'C': 'C', 'N': 'N', 'AC': 'M', 'AG': 'R', 'AT': 'W', 'CG': 'S', 'CT': 'Y', 'GT': 'K', 'ACG': 'V', 'ACT': 'H', 'AGT': 'D', 'CGT': 'B', 'ACGT': 'N' }

MAJORITY_PERCENTAGE = 80
MIN_DEPTH = 10
Mut = Tuple[str, str, int]
###########
# Reducer #
###########
@contract(reference='string', muts='list(tuple(string, string, int))' )
#@contract(reference='string', muts='list(tuple(string, string, int))' )
def make_consensus(reference, muts):
# type: (str, List[Mut]) -> Tuple[str, List[Mut]]
''' Actually builds a consensus string by recursively applying
@@ -98,10 +105,10 @@ def call_base_multi_alts(min_depth, majority_percentage, dp, alts, ref):

#@contract(min_depth='number,>=0', majority_percentage='number,>=0,<=100', rec='dict', returns='tuple(string, string, int)')
def call_many(min_depth, majority_percentage, rec):
# type: (int, int, Dict) -> Mut
# type: (int, int, VCFRow) -> Mut
#TODO: switch to generators
muts = zip(rec['AO'], rec['alt'])
ref, dp, pos = rec['ref'], rec['DP'], rec['pos']
muts = zip(rec.AO, rec.alt)
ref, dp, pos = rec.ref, rec.DP, rec.pos
longest_len = max(map(lambda x: len(x[-1]), muts))
longest_len = max(longest_len, len(ref))
def fill_gap(r):
@@ -115,39 +122,40 @@ def seq_count(acc, ao_and_nts):
return map(merge_sum, acc, [{nt:ao} for nt in nts])
# create a list of {base : count}, where the index matches the position
mut_dicts = reduce(seq_count, xs, [{}])
base_caller = partial(call_base_multi_alts, min_depth, majority_percentage, dp) # type: Callable[[Dict[Any,Any], str], str]
base_caller = lambda m,r: call_base_multi_alts(min_depth, majority_percentage, dp, m, r) # # # ?Callable[[Dict[Any,Any], str], str]
res = map(base_caller, mut_dicts, ref)
# trim None values at the end, (which indicate deletion)
result = takewhile(bool, res)
return (ref, ''.join(result), pos)

@contract(rec='dict',returns='dict')
#@contract(rec='dict',returns='dict')
def flatten_vcf_record(rec):
# type: (_Record) -> Dict[str, Any]
# type: (_Record) -> VCFRow
_rec = merge({
'alt' : rec.ALT, 'ref' : rec.REF,
'pos' : rec.POS, 'chrom' : rec.CHROM},
rec.INFO)
if not hasattr(_rec['alt'], '__iter__'): #TODO: put this somewhere else
return merge(_rec, dict(alt=[_rec['alt']], AO=[_rec['AO']]))
else: return _rec
d = merge(_rec, dict(alt=[_rec['alt']], AO=[_rec['AO']]))
else: d = _rec
return VCFRow(**d)

##############
# Group By #
##############
#NOTE: could possibly drop lists, use fn.Stream all the time,
# and write a Stream instance for contracts like:
# https://github.com/AndreaCensi/contracts/blob/831ec7a5260ceb8960540ba0cb6cc26370cf2d82/src/contracts/library/lists.py
@contract(references='list[N]($SeqRecord),N>0', muts='list(dict)',returns='tuple(list(dict))')
#@contract(references='list[N]($SeqRecord),N>0', muts='list(dict)',returns='tuple(list(dict))')
def group_muts_by_refs(references, muts):
# type: (List[SeqRecord], List[Dict[Any, Any]]) -> Iterable[List[Dict]]
# type: (List[SeqRecord], List[VCFRow]) -> List[List[VCFRow]]
'''group and sort the mutations so that they match the order of the references.'''
#NOTE: muts will already be "sorted" in that they are grouped together in the vcf
#fix the groupby so it doesn't incidentally drain the first object of the group
unzip = lambda x: zip(*x)
chroms, groups = unzip(map(lambda kv: (kv[0], list(kv[1])), groupby(muts, get('chrom'))))
@contract(key='tuple(string,list)')
def index_of_ref(key):
chroms, groups = unzip(map(lambda kv: (kv[0], list(kv[1])), groupby(muts, lambda x: x.chrom)))
#@contract(key='tuple(string,list)')
def index_of_ref(key): # type: (Tuple[str, List[SeqRecord]]) -> int
chrom=key[0]
index_of_chrom = map(lambda x: x.id, references).index(chrom)
return index_of_chrom
@@ -162,13 +170,15 @@ def index_of_ref(key):

#@contract(references='SeqRecord', muts='seq(dict)', mind=int, majority=int)
def all_consensuses(references, muts, mind, majority):
# type: (Iterable[SeqRecord], Iterable[Dict[Any,Any]], int, int) -> Tuple[List[str], Iterator[Tuple[str, List[Mut]]]]
# type: (List[SeqRecord], List[VCFRow], int, int) -> Tuple[List[SeqRecord], Iterable[Tuple[str, List[Mut]]]]
''' generates consensuses, including for flu and other multi-reference VCFs.
applies filters and base callers to the mutations.
then builds the consensus using these calls and `make_consensus`'''
muts_by_ref = group_muts_by_refs(references, muts)
def single_consensus(muts, ref):
the_muts = map(partial(call_many, mind, majority), muts)
# type: (List[VCFRow], SeqRecord) -> Tuple[str, List[Mut]]
#the_muts = map(partial(call_many, mind, majority), muts)
the_muts = map(lambda x: call_many(mind, majority, x), muts)
ref_and_alt_differ = lambda x: x[0] != x[1]
# vcf is index-starting-at-1
#real_muts = map(lambda (a,b,pos): (a,b,pos-1), filter(ref_and_alt_differ, the_muts))
@@ -183,16 +193,27 @@ def single_consensus(muts, ref):
def consensus_str(ref, consensus): # type: (SeqRecord, str) -> str
return ">{0}:Consensus\n{1}".format(ref.id, consensus)

def zero_coverage_positions(bam_file, ref_file): # type: (str, str) -> Iterable[int]
pileup = sh.Command('mpileup')(bam_file, f=ref_file, _iter=True)
get_pos = lambda x: int(x.split()[1]) # type: Callable[[str],int]
return imap(get_pos, pileup)

#TODO: is pileup 0-based or 1-based index?
def trim_ref(ref, positions): # type: (str, Iterator[int]) -> str
start, end = next(positions), collections.deque(positions, 1)[0]
return '-'*start + ref[start:end] + '-'*(len(ref) - end)



#@contract(ref_fasta=str, vcf=str, mind=int, majority=int)
def run(ref_fasta, freebayes_vcf, outfile, mind, majority):
# type: (str, str, str, int, int) -> int
refs = SeqIO.parse(ref_fasta, 'fasta')
# type: (str, str, BinaryIO, int, int) -> int
_refs = SeqIO.parse(ref_fasta, 'fasta')
with open(freebayes_vcf, 'r') as vcf_handle:
muts = imap(flatten_vcf_record, vcf.Reader(vcf_handle))
refs, muts = list(refs), list(muts)
refs, seqs_and_muts = all_consensuses(refs, muts, mind, majority)
strings = imap(consensus_str, refs, imap(get(0), seqs_and_muts))
_muts = map(flatten_vcf_record, vcf.Reader(vcf_handle))
refs, muts = list(_refs), list(_muts)
the_refs, seqs_and_muts = all_consensuses(refs, muts, mind, majority)
strings = imap(consensus_str, the_refs, imap(get(0), seqs_and_muts))
result = '\n'.join(strings)
outfile.write(result)
outfile.close()
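For readers skimming the diff, here is a minimal, self-contained sketch (not part of the commit) of the two ideas consensus.py now leans on: the VCFRow NamedTuple replacing raw dicts, and Python 2 comment-style annotations that `mypy --py2` can check. The helper alt_frequencies and the row values are purely illustrative.

from typing import List, NamedTuple

# Same shape as the VCFRow introduced above; the row below is made up.
VCFRow = NamedTuple("VCFRow",
                    [('ref', str), ('AO', List[int]), ('DP', int),
                     ('chrom', str), ('pos', int), ('alt', List[str])])

def alt_frequencies(row):  # type: (VCFRow) -> List[float]
    """Fraction of reads supporting each alternate allele."""
    return [float(ao) / row.DP for ao in row.AO]

row = VCFRow(ref='A', AO=[45], DP=50, chrom='seg1', pos=17, alt=['G'])
print alt_frequencies(row)  # [0.9]; attribute access replaces the old rec['AO'] lookups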
5 changes: 5 additions & 0 deletions mypy/Bio/SeqIO.pyi
@@ -0,0 +1,5 @@
from Bio.SeqRecord import SeqRecord
from typing import Generator, Any, Iterator

def parse(*anything): # type: (*Any) -> Iterator[SeqRecord]
pass
6 changes: 6 additions & 0 deletions mypy/Bio/SeqRecord.py
@@ -0,0 +1,6 @@
# from Bio.SeqIO import SeqIO
from typing import NamedTuple
class Stringable(object):
def __str__(self): # type: () -> str
pass
SeqRecord = NamedTuple('SeqRecord', [('id', str), ('seq', Stringable)])
Empty file added mypy/Bio/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions mypy/README.md
@@ -0,0 +1,6 @@
1. Install [mypy](https://github.com/python/mypy#quick-start)

2. Run mypy: `MYPYPATH=$PWD/mypy:$PWD/mypy/out mypy --py2 bioframework/consensus.py`

If needed, use `stubgen` to generate more stub files for other libraries.
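
As a quick sanity check that the stubs are picked up (the file name below is hypothetical, not part of this commit), an intentionally wrong comment annotation should be flagged by the command above:

# check_setup.py -- run: MYPYPATH=$PWD/mypy:$PWD/mypy/out mypy --py2 check_setup.py
from Bio.SeqRecord import SeqRecord

def ref_id(rec):  # type: (SeqRecord) -> int
    return rec.id  # mypy should flag this: the stub types `id` as str, not int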

Empty file added mypy/__init__.py
Empty file.
44 changes: 44 additions & 0 deletions mypy/example.py
@@ -0,0 +1,44 @@
from typing import List, Dict, Generator, Iterator, Iterable, Tuple
from Bio import SeqIO
from itertools import imap
from Bio.SeqRecord import SeqRecord
def test_long(): # type: () -> int
return 11999999L
def test_seqIO_map_fails(s): # type: (str) -> List[SeqRecord]
return map(lambda x: x.id, SeqIO.parse(s))

#def test_seqIO_map_fails2(s): # type: (str) -> Iterator[SeqRecord]
# return map(lambda x: x.id, SeqIO.parse(s))
def test_seqIO_map_passes(s): # type: (str) -> Iterable[str]
return imap(lambda x: x.id, SeqIO.parse(s))

def test_seqIO(s): # type: (str) -> Iterator[SeqRecord]
return SeqIO.parse(s)
def test_list_seqIO(s): # type: (str) -> List[SeqRecord]
return list(SeqIO.parse(s))
def test_seqIO_fails(s): # type: (str) -> List[str]
return SeqIO.parse(s)
def test_should_pass(s): # type: (SeqRecord) -> str
return s.id
def test_should_fail(s): # type: (SeqRecord) -> int
return s.id
#def test_should_fail(): # type: () -> List[SeqRecord]
# return 3

#a = test_should_fail()
def test_ordered_dict(od): # type: (Dict[str,int]) -> Dict[str,int]
return 1 #type error 1
#
#a = test_ordered_dict(1) #type error 2
#
#def test_me():
# a = test_ordered_dict(1) # type error 3 is not reported

####def test_ordered_dict(od: typing.Dict[str,int]) -> typing.Dict[str,int]:
#### return 1 #type error 1
####
####a = test_ordered_dict(1) #type error 2
####
####def test_me():
#### a = test_ordered_dict(1) # type error 3 is not reported
###
76 changes: 76 additions & 0 deletions mypy/out/docopt.pyi
@@ -0,0 +1,76 @@
# Stubs for docopt (Python 2)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

from typing import Any

class DocoptLanguageError(Exception): ...

class DocoptExit(SystemExit):
usage = ... # type: Any
def __init__(self, message=''): ...

class Pattern:
def __eq__(self, other): ...
def __hash__(self): ...
def fix(self): ...
def fix_identities(self, uniq=None): ...
def fix_repeating_arguments(self): ...
@property
def either(self): ...

class ChildPattern(Pattern):
name = ... # type: Any
value = ... # type: Any
def __init__(self, name, value=None): ...
def flat(self, *types): ...
def match(self, left, collected=None): ...

class ParentPattern(Pattern):
children = ... # type: Any
def __init__(self, *children): ...
def flat(self, *types): ...

class Argument(ChildPattern):
def single_match(self, left): ...
@classmethod
def parse(class_, source): ...

class Command(Argument):
name = ... # type: Any
value = ... # type: Any
def __init__(self, name, value=False): ...
def single_match(self, left): ...

class Option(ChildPattern):
value = ... # type: Any
def __init__(self, short=None, long=None, argcount=0, value=False): ...
@classmethod
def parse(class_, option_description): ...
def single_match(self, left): ...
@property
def name(self): ...

class Required(ParentPattern):
def match(self, left, collected=None): ...

class Optional(ParentPattern):
def match(self, left, collected=None): ...

class AnyOptions(Optional): ...

class OneOrMore(ParentPattern):
def match(self, left, collected=None): ...

class Either(ParentPattern):
def match(self, left, collected=None): ...

class TokenStream(list):
error = ... # type: Any
def __init__(self, source, error): ...
def move(self): ...
def current(self): ...

class Dict(dict): ...

def docopt(doc, argv=None, help=True, version=None, options_first=False): ...
8 changes: 8 additions & 0 deletions mypy/out/hypothesis/__init__.pyi
@@ -0,0 +1,8 @@
# Stubs for hypothesis (Python 2)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

from hypothesis._settings import settings as settings, Verbosity as Verbosity
from hypothesis.version import __version_info__ as __version_info__, __version__ as __version__
from hypothesis.control import assume as assume, note as note, reject as reject
from hypothesis.core import given as given, find as find, example as example, seed as seed
31 changes: 31 additions & 0 deletions mypy/out/schema.pyi
@@ -0,0 +1,31 @@
# Stubs for schema (Python 2)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

from typing import Any

class SchemaError(Exception):
autos = ... # type: Any
errors = ... # type: Any
def __init__(self, autos, errors): ...
@property
def code(self): ...

class And:
def __init__(self, *args, **kw): ...
def validate(self, data): ...

class Or(And):
def validate(self, data): ...

class Use:
def __init__(self, callable_, error=None): ...
def validate(self, data): ...

def priority(s): ...

class Schema:
def __init__(self, schema, error=None): ...
def validate(self, data): ...

class Optional(Schema): ...
3 changes: 3 additions & 0 deletions mypy/sh.py
@@ -0,0 +1,3 @@
from typing import Callable, Any, Union, List, Iterator
def Command(s): # type: (str) -> Callable[...,Union[List[str],Iterator[str]]]
pass
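
For context, a sketch (not part of the commit) of the call this stub is written to cover; it mirrors zero_coverage_positions() in bioframework/consensus.py. It assumes an `mpileup` executable is on PATH, and the file names are placeholders.

import sh

# Per the stub, sh.Command(...) returns a callable yielding List[str] or Iterator[str].
pileup = sh.Command('mpileup')('sample.bam', f='ref.fasta', _iter=True)
positions = [int(line.split()[1]) for line in pileup]  # second pileup column is the position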
Empty file added mypy/toolz/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions mypy/toolz/dicttoolz.py
@@ -0,0 +1,32 @@
from typing import Dict, Any, Callable, TypeVar
K = TypeVar('K')
V = TypeVar('V')
V2 = TypeVar('V2')
V3 = TypeVar('V3')
def merge(d1, d2): # type: (Dict[K,V], Dict[K,V]) -> Dict[K,V]
pass

def dissoc(d, k): # type: (Dict[K,V], K) -> Dict[K,V]
pass

def merge_with(f, d1, d2): # type: (Callable[[V,V2], V3], Dict[K,V], Dict[K,V2]) -> Dict[K,V3]
pass

def valfilter(f, d): # type: (Callable[[V], bool], Dict[K,V]) -> Dict[K,V]
pass



#from typing import Dict, Any, Callable, TypeVar
#T = TypeVar('T')
#def merge(d1, d2): # type: (Dict[Any,Any], Dict[Any,Any]) -> Dict[Any,Any]
# pass
#
#def dissoc(d, k): # type: (Dict[Any,Any], Any) -> Dict[Any,Any]
# pass
#
#def merge_with(f, d1, d2): # type: (Callable, Dict[Any,Any], Dict[Any,Any]) -> Dict[Any,Any]
# pass
#
#def valfilter(f, d): # type: (Callable, Dict[Any,Any]) -> Dict[Any,Any]
# pass
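
A small usage sketch (illustrative dicts, not part of the commit) of the signatures above; `merge` is what flatten_vcf_record in consensus.py relies on:

from toolz.dicttoolz import merge, dissoc

counts = merge({'A': 3, 'G': 1}, {'A': 2, 'C': 4})  # Dict[str, int]; the right-hand dict wins on 'A'
trimmed = dissoc(counts, 'C')                       # same dict type, without the 'C' key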
11 changes: 11 additions & 0 deletions mypy/vcf/__init__.py
@@ -0,0 +1,11 @@
from typing import Union, Dict, List, NamedTuple, Iterator, BinaryIO
from vcf.model import _Record

#fields = [("ALT", Union[str, List[str]]), ("REF", str), ("POS", int), ("CHROM", str), ("INFO", Dict[str, Union[int, List[int]]])]
#
#VCFRecord = NamedTuple('VCFRecord', fields)

VCFRecord = NamedTuple('VCFRecord', [("ALT", Union[str, List[str]]), ("REF", str), ("POS", int), ("CHROM", str), ("INFO", Dict[str, Union[int, List[int]]])]
)
def Reader(s): # type: (BinaryIO) -> Iterator[_Record]
pass
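
For orientation, a sketch (not part of the commit) of the reader loop this stub types; it matches how run() in consensus.py feeds records to flatten_vcf_record. The VCF path is a placeholder.

import vcf

with open('freebayes.vcf', 'r') as handle:
    for rec in vcf.Reader(handle):  # Iterator[_Record] per the stub
        print rec.CHROM, rec.POS, rec.REF, rec.ALT, rec.INFO.get('DP')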