Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type annotations for consensus.py #21

Merged
merged 10 commits into from
Mar 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 51 additions & 30 deletions bioframework/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,40 @@
from functools import partial
from itertools import ifilter, imap, groupby, takewhile, repeat, starmap, izip_longest
import os, sys
from typing import Tuple, Dict, List, Iterator, Iterable, Any, Callable
import collections

from typing import Tuple, Dict, List, Iterator, Iterable, Any, Callable, NamedTuple, BinaryIO

from Bio import SeqIO #done
from Bio.SeqRecord import SeqRecord #done
import vcf #done
from vcf.model import _Record
import sh #todo
#from toolz import compose
from toolz.dicttoolz import merge, dissoc, merge_with, valfilter #todo
from toolz.dicttoolz import merge, dissoc, merge_with, valfilter #done
from docopt import docopt #ignore
from schema import Schema, Use #ignore
from contracts import contract, new_contract #can ignore
#from contracts import contract, new_contract #can ignore
#from mypy.types import VCFRow
#############
# Constants #
#############

AMBIGUITY_TABLE = { 'A': 'A', 'T': 'T', 'G': 'G', 'C': 'C', 'N': 'N',
'AC': 'M', 'AG': 'R', 'AT': 'W', 'CG': 'S', 'CT':
'Y', 'GT': 'K', 'ACG': 'V', 'ACT': 'H', 'AGT': 'D',
'CGT': 'B', 'ACGT': 'N' }
VCFRow = NamedTuple("VCFRow",
[('ref', str),
('AO', List[int]),
('DP', int),
('chrom',str),
('pos', int),
('alt', List[str])])
AMBIGUITY_TABLE = { 'A': 'A', 'T': 'T', 'G': 'G', 'C': 'C', 'N': 'N', 'AC': 'M', 'AG': 'R', 'AT': 'W', 'CG': 'S', 'CT': 'Y', 'GT': 'K', 'ACG': 'V', 'ACT': 'H', 'AGT': 'D', 'CGT': 'B', 'ACGT': 'N' }

MAJORITY_PERCENTAGE = 80
MIN_DEPTH = 10
Mut = Tuple[str, str, int]
###########
# Reducer #
###########
@contract(reference='string', muts='list(tuple(string, string, int))' )
#@contract(reference='string', muts='list(tuple(string, string, int))' )
def make_consensus(reference, muts):
# type: (str, List[Mut]) -> Tuple[str, List[Mut]]
''' Actually builds a consensus string by recursively applying
Expand Down Expand Up @@ -98,10 +105,10 @@ def call_base_multi_alts(min_depth, majority_percentage, dp, alts, ref):

#@contract(min_depth='number,>=0', majority_percentage='number,>=0,<=100', rec='dict', returns='tuple(string, string, int)')
def call_many(min_depth, majority_percentage, rec):
# type: (int, int, Dict) -> Mut
# type: (int, int, VCFRow) -> Mut
#TODO: switch to generators
muts = zip(rec['AO'], rec['alt'])
ref, dp, pos = rec['ref'], rec['DP'], rec['pos']
muts = zip(rec.AO, rec.alt)
ref, dp, pos = rec.ref, rec.DP, rec.pos
longest_len = max(map(lambda x: len(x[-1]), muts))
longest_len = max(longest_len, len(ref))
def fill_gap(r):
Expand All @@ -115,39 +122,40 @@ def seq_count(acc, ao_and_nts):
return map(merge_sum, acc, [{nt:ao} for nt in nts])
# create a list of {base : count}, where the index matches the position
mut_dicts = reduce(seq_count, xs, [{}])
base_caller = partial(call_base_multi_alts, min_depth, majority_percentage, dp) # type: Callable[[Dict[Any,Any], str], str]
base_caller = lambda m,r: call_base_multi_alts(min_depth, majority_percentage, dp, m, r) # # # ?Callable[[Dict[Any,Any], str], str]
res = map(base_caller, mut_dicts, ref)
# trim None values at the end, (which indicate deletion)
result = takewhile(bool, res)
return (ref, ''.join(result), pos)

@contract(rec='dict',returns='dict')
#@contract(rec='dict',returns='dict')
def flatten_vcf_record(rec):
# type: (_Record) -> Dict[str, Any]
# type: (_Record) -> VCFRow
_rec = merge({
'alt' : rec.ALT, 'ref' : rec.REF,
'pos' : rec.POS, 'chrom' : rec.CHROM},
rec.INFO)
if not hasattr(_rec['alt'], '__iter__'): #TODO: put this somewhere else
return merge(_rec, dict(alt=[_rec['alt']], AO=[_rec['AO']]))
else: return _rec
d = merge(_rec, dict(alt=[_rec['alt']], AO=[_rec['AO']]))
else: d = _rec
return VCFRow(**d)

##############
# Group By #
##############
#NOTE: could possibly drop lists, use fn.Stream all the time,
# and write a Stream instance for contracts like:
# https://github.com/AndreaCensi/contracts/blob/831ec7a5260ceb8960540ba0cb6cc26370cf2d82/src/contracts/library/lists.py
@contract(references='list[N]($SeqRecord),N>0', muts='list(dict)',returns='tuple(list(dict))')
#@contract(references='list[N]($SeqRecord),N>0', muts='list(dict)',returns='tuple(list(dict))')
def group_muts_by_refs(references, muts):
# type: (List[SeqRecord], List[Dict[Any, Any]]) -> Iterable[List[Dict]]
# type: (List[SeqRecord], List[VCFRow]) -> List[List[VCFRow]]
'''group and sort the mutations so that they match the order of the references.'''
#NOTE: muts will already be "sorted" in that they are grouped together in the vcf
#fix the groupby so it doesn't incidentally drain the first object of the group
unzip = lambda x: zip(*x)
chroms, groups = unzip(map(lambda kv: (kv[0], list(kv[1])), groupby(muts, get('chrom'))))
@contract(key='tuple(string,list)')
def index_of_ref(key):
chroms, groups = unzip(map(lambda kv: (kv[0], list(kv[1])), groupby(muts, lambda x: x.chrom)))
#@contract(key='tuple(string,list)')
def index_of_ref(key): # type: (Tuple[str, List[SeqRecord]]) -> int
chrom=key[0]
index_of_chrom = map(lambda x: x.id, references).index(chrom)
return index_of_chrom
Expand All @@ -162,13 +170,15 @@ def index_of_ref(key):

#@contract(references='SeqRecord', muts='seq(dict)', mind=int, majority=int)
def all_consensuses(references, muts, mind, majority):
# type: (Iterable[SeqRecord], Iterable[Dict[Any,Any]], int, int) -> Tuple[List[str], Iterator[Tuple[str, List[Mut]]]]
# type: (List[SeqRecord], List[VCFRow], int, int) -> Tuple[List[SeqRecord], Iterable[Tuple[str, List[Mut]]]]
''' generates conesnsuses, including for flu and other mult-reference VCFs.
applies filters and base callers to the mutations.
then builds the consensus using these calls and `make_consensus`'''
muts_by_ref = group_muts_by_refs(references, muts)
def single_consensus(muts, ref):
the_muts = map(partial(call_many, mind, majority), muts)
# type: (List[VCFRow], SeqRecord) -> Tuple[str, List[Mut]]
#the_muts = map(partial(call_many, mind, majority), muts)
the_muts = map(lambda x: call_many(mind, majority, x), muts)
ref_and_alt_differ = lambda x: x[0] != x[1]
# vcf is index-starting-at-1
#real_muts = map(lambda (a,b,pos): (a,b,pos-1), filter(ref_and_alt_differ, the_muts))
Expand All @@ -183,16 +193,27 @@ def single_consensus(muts, ref):
def consensus_str(ref, consensus): # type: (SeqRecord, str) -> str
    """Render one consensus sequence as a FASTA entry titled by the reference id."""
    header_id = ref.id
    template = ">{0}:Consensus\n{1}"
    return template.format(header_id, consensus)

def zero_coverage_positions(bam_file, ref_file): # type: (str, str) -> Iterable[int]
    """Stream the position column of an mpileup run over `bam_file` against `ref_file`.

    NOTE(review): sh.Command('mpileup') resolves a bare `mpileup` executable on
    PATH; this tool is normally invoked as `samtools mpileup` -- confirm the
    command name.  Also note mpileup emits one line per *covered* position, which
    appears to conflict with this function's name -- verify caller expectations.
    """
    pileup = sh.Command('mpileup')(bam_file, f=ref_file, _iter=True)
    # Field 1 of each whitespace-separated pileup line is the (1-based) position.
    get_pos = lambda x: int(x.split()[1]) # type: Callable[[str],int]
    return imap(get_pos, pileup)

#TODO: is pileup 0-based or 1-based index?
def trim_ref(ref, positions): # type: (str, Iterator[int]) -> str
    """Mask `ref` with '-' outside the span bounded by the first and last
    values yielded by `positions`.

    `positions` is assumed to be an ascending one-shot iterator of indices
    (e.g. produced by `zero_coverage_positions`) -- TODO confirm ordering
    and whether the index is 0- or 1-based (see TODO above).

    Raises StopIteration if `positions` is empty.
    """
    start = next(positions)
    # deque(maxlen=1) retains only the final element -- a cheap way to get
    # the last value of a one-shot iterator.  If the iterator held exactly
    # one value, nothing remains after next(); fall back to start so a
    # single position yields an empty kept span instead of an IndexError.
    tail = collections.deque(positions, 1)
    end = tail[0] if tail else start
    # BUG FIX: was `ref[:start:end]`, i.e. a prefix of length `start`
    # stepped by `end`; the intended slice is the span [start, end).
    return '-' * start + ref[start:end] + '-' * (len(ref) - end)



#@contract(ref_fasta=str, vcf=str, mind=int, majority=int)
def run(ref_fasta, freebayes_vcf, outfile, mind, majority):
# type: (str, str, str, int, int) -> int
refs = SeqIO.parse(ref_fasta, 'fasta')
# type: (str, str, BinaryIO, int, int) -> int
_refs = SeqIO.parse(ref_fasta, 'fasta')
with open(freebayes_vcf, 'r') as vcf_handle:
muts = imap(flatten_vcf_record, vcf.Reader(vcf_handle))
refs, muts = list(refs), list(muts)
refs, seqs_and_muts = all_consensuses(refs, muts, mind, majority)
strings = imap(consensus_str, refs, imap(get(0), seqs_and_muts))
_muts = map(flatten_vcf_record, vcf.Reader(vcf_handle))
refs, muts = list(_refs), list(_muts)
the_refs, seqs_and_muts = all_consensuses(refs, muts, mind, majority)
strings = imap(consensus_str, the_refs, imap(get(0), seqs_and_muts))
result = '\n'.join(strings)
outfile.write(result)
outfile.close()
Expand Down
5 changes: 5 additions & 0 deletions mypy/Bio/SeqIO.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from Bio.SeqRecord import SeqRecord
from typing import Generator, Any, Iterator

def parse(*anything): # type: (*Any) -> Iterator[SeqRecord]
pass
6 changes: 6 additions & 0 deletions mypy/Bio/SeqRecord.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# from Bio.SeqIO import SeqIO
from typing import NamedTuple
class Stringable(object):
    """Structural stand-in for a Biopython sequence: anything usable via str()."""
    def __str__(self): # type: () -> str
        pass
# Minimal SeqRecord stub for type checking: only .id and .seq are modelled.
SeqRecord = NamedTuple('SeqRecord', [('id', str), ('seq', Stringable)])
Empty file added mypy/Bio/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions mypy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
1. Install [mypy](https://github.com/python/mypy#quick-start)

2. Run mypy: `MYPYPATH=$PWD/mypy:$PWD/mypy/out mypy --py2 bioframework/consensus.py`

If needed, use `stubgen` to generate more stub files for other libraries.

Empty file added mypy/__init__.py
Empty file.
44 changes: 44 additions & 0 deletions mypy/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import List, Dict, Generator, Iterator, Iterable, Tuple
from Bio import SeqIO
from itertools import imap
from Bio.SeqRecord import SeqRecord
def test_long(): # type: () -> int
return 11999999L
def test_seqIO_map_fails(s): # type: (str) -> List[SeqRecord]
return map(lambda x: x.id, SeqIO.parse(s))

#def test_seqIO_map_fails2(s): # type: (str) -> Iterator[SeqRecord]
# return map(lambda x: x.id, SeqIO.parse(s))
def test_seqIO_map_passes(s): # type: (str) -> Iterable[str]
return imap(lambda x: x.id, SeqIO.parse(s))

def test_seqIO(s): # type: (str) -> Iterator[SeqRecord]
return SeqIO.parse(s)
def test_list_seqIO(s): # type: (str) -> List[SeqRecord]
return list(SeqIO.parse(s))
def test_seqIO_fails(s): # type: (str) -> List[str]
return SeqIO.parse(s)
def test_should_pass(s): # type: (SeqRecord) -> str
return s.id
def test_should_fail(s): # type: (SeqRecord) -> int
return s.id
#def test_should_fail(): # type: () -> List[SeqRecord]
# return 3

#a = test_should_fail()
def test_ordered_dict(od): # type: (Dict[str,int]) -> Dict[str,int]
return 1 #type error 1
#
#a = test_ordered_dict(1) #type error 2
#
#def test_me():
# a = test_ordered_dict(1) # type error 3 is not reported

####def test_ordered_dict(od: typing.Dict[str,int]) -> typing.Dict[str,int]:
#### return 1 #type error 1
####
####a = test_ordered_dict(1) #type error 2
####
####def test_me():
#### a = test_ordered_dict(1) # type error 3 is not reported
###
76 changes: 76 additions & 0 deletions mypy/out/docopt.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Stubs for docopt (Python 2)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

from typing import Any

class DocoptLanguageError(Exception): ...

class DocoptExit(SystemExit):
usage = ... # type: Any
def __init__(self, message=''): ...

class Pattern:
def __eq__(self, other): ...
def __hash__(self): ...
def fix(self): ...
def fix_identities(self, uniq=None): ...
def fix_repeating_arguments(self): ...
@property
def either(self): ...

class ChildPattern(Pattern):
name = ... # type: Any
value = ... # type: Any
def __init__(self, name, value=None): ...
def flat(self, *types): ...
def match(self, left, collected=None): ...

class ParentPattern(Pattern):
children = ... # type: Any
def __init__(self, *children): ...
def flat(self, *types): ...

class Argument(ChildPattern):
def single_match(self, left): ...
@classmethod
def parse(class_, source): ...

class Command(Argument):
name = ... # type: Any
value = ... # type: Any
def __init__(self, name, value=False): ...
def single_match(self, left): ...

class Option(ChildPattern):
value = ... # type: Any
def __init__(self, short=None, long=None, argcount=0, value=False): ...
@classmethod
def parse(class_, option_description): ...
def single_match(self, left): ...
@property
def name(self): ...

class Required(ParentPattern):
def match(self, left, collected=None): ...

class Optional(ParentPattern):
def match(self, left, collected=None): ...

class AnyOptions(Optional): ...

class OneOrMore(ParentPattern):
def match(self, left, collected=None): ...

class Either(ParentPattern):
def match(self, left, collected=None): ...

class TokenStream(list):
error = ... # type: Any
def __init__(self, source, error): ...
def move(self): ...
def current(self): ...

class Dict(dict): ...

def docopt(doc, argv=None, help=True, version=None, options_first=False): ...
8 changes: 8 additions & 0 deletions mypy/out/hypothesis/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Stubs for hypothesis (Python 2)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

from hypothesis._settings import settings as settings, Verbosity as Verbosity
from hypothesis.version import __version_info__ as __version_info__, __version__ as __version__
from hypothesis.control import assume as assume, note as note, reject as reject
from hypothesis.core import given as given, find as find, example as example, seed as seed
31 changes: 31 additions & 0 deletions mypy/out/schema.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Stubs for schema (Python 2)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

from typing import Any

class SchemaError(Exception):
autos = ... # type: Any
errors = ... # type: Any
def __init__(self, autos, errors): ...
@property
def code(self): ...

class And:
def __init__(self, *args, **kw): ...
def validate(self, data): ...

class Or(And):
def validate(self, data): ...

class Use:
def __init__(self, callable_, error=None): ...
def validate(self, data): ...

def priority(s): ...

class Schema:
def __init__(self, schema, error=None): ...
def validate(self, data): ...

class Optional(Schema): ...
3 changes: 3 additions & 0 deletions mypy/sh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from typing import Callable, Any, Union, List, Iterator
def Command(s): # type: (str) -> Callable[...,Union[List[str],Iterator[str]]]
    """Type stub for sh.Command: wraps executable name `s` as a callable."""
    pass
Empty file added mypy/toolz/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions mypy/toolz/dicttoolz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Dict, Any, Callable, TypeVar
K = TypeVar('K')
V = TypeVar('V')
V2 = TypeVar('V2')
V3 = TypeVar('V3')
def merge(d1, d2): # type: (Dict[K,V], Dict[K,V]) -> Dict[K,V]
    """Stub for toolz.dicttoolz.merge: right-biased union of two dicts."""
    pass

def dissoc(d, k): # type: (Dict[K,V], K) -> Dict[K,V]
    """Stub for toolz.dicttoolz.dissoc: copy of `d` without key `k`."""
    pass

def merge_with(f, d1, d2): # type: (Callable[[V,V2], V3], Dict[K,V], Dict[K,V2]) -> Dict[K,V3]
    """Stub for toolz.dicttoolz.merge_with: merge, combining shared-key values with `f`."""
    pass

def valfilter(f, d): # type: (Callable[[V], bool], Dict[K,V]) -> Dict[K,V]
    """Stub for toolz.dicttoolz.valfilter: keep entries whose value satisfies `f`."""
    pass



#from typing import Dict, Any, Callable, TypeVar
#T = TypeVar('T')
#def merge(d1, d2): # type: (Dict[Any,Any], Dict[Any,Any]) -> Dict[Any,Any]
# pass
#
#def dissoc(d, k): # type: (Dict[Any,Any], Any) -> Dict[Any,Any]
# pass
#
#def merge_with(f, d1, d2): # type: (Callable, Dict[Any,Any], Dict[Any,Any]) -> Dict[Any,Any]
# pass
#
#def valfilter(f, d): # type: (Callable, Dict[Any,Any]) -> Dict[Any,Any]
# pass
11 changes: 11 additions & 0 deletions mypy/vcf/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Union, Dict, List, NamedTuple, Iterator, BinaryIO
from vcf.model import _Record

#fields = [("ALT", Union[str, List[str]]), ("REF", str), ("POS", int), ("CHROM", str), ("INFO", Dict[str, Union[int, List[int]]])]
#
#VCFRecord = NamedTuple('VCFRecord', fields)

VCFRecord = NamedTuple('VCFRecord', [("ALT", Union[str, List[str]]), ("REF", str), ("POS", int), ("CHROM", str), ("INFO", Dict[str, Union[int, List[int]]])]
)
def Reader(s): # type: (BinaryIO) -> Iterator[_Record]
pass
Loading