diff --git a/bioframework/consensus.py b/bioframework/consensus.py index 05a1970..84bfdf4 100755 --- a/bioframework/consensus.py +++ b/bioframework/consensus.py @@ -14,25 +14,32 @@ from functools import partial from itertools import ifilter, imap, groupby, takewhile, repeat, starmap, izip_longest import os, sys -from typing import Tuple, Dict, List, Iterator, Iterable, Any, Callable +import collections + +from typing import Tuple, Dict, List, Iterator, Iterable, Any, Callable, NamedTuple, BinaryIO from Bio import SeqIO #done from Bio.SeqRecord import SeqRecord #done import vcf #done from vcf.model import _Record +import sh #todo #from toolz import compose -from toolz.dicttoolz import merge, dissoc, merge_with, valfilter #todo +from toolz.dicttoolz import merge, dissoc, merge_with, valfilter #done from docopt import docopt #ignore from schema import Schema, Use #ignore -from contracts import contract, new_contract #can ignore +#from contracts import contract, new_contract #can ignore +#from mypy.types import VCFRow ############# # Constants # ############# - -AMBIGUITY_TABLE = { 'A': 'A', 'T': 'T', 'G': 'G', 'C': 'C', 'N': 'N', - 'AC': 'M', 'AG': 'R', 'AT': 'W', 'CG': 'S', 'CT': - 'Y', 'GT': 'K', 'ACG': 'V', 'ACT': 'H', 'AGT': 'D', - 'CGT': 'B', 'ACGT': 'N' } +VCFRow = NamedTuple("VCFRow", + [('ref', str), + ('AO', List[int]), + ('DP', int), + ('chrom',str), + ('pos', int), + ('alt', List[str])]) +AMBIGUITY_TABLE = { 'A': 'A', 'T': 'T', 'G': 'G', 'C': 'C', 'N': 'N', 'AC': 'M', 'AG': 'R', 'AT': 'W', 'CG': 'S', 'CT': 'Y', 'GT': 'K', 'ACG': 'V', 'ACT': 'H', 'AGT': 'D', 'CGT': 'B', 'ACGT': 'N' } MAJORITY_PERCENTAGE = 80 MIN_DEPTH = 10 @@ -40,7 +47,7 @@ ########### # Reducer # ########### -@contract(reference='string', muts='list(tuple(string, string, int))' ) +#@contract(reference='string', muts='list(tuple(string, string, int))' ) def make_consensus(reference, muts): # type: (str, List[Mut]) -> Tuple[str, List[Mut]] ''' Actually builds a consensus string by recursively applying @@ -98,10 +105,10 @@ def call_base_multi_alts(min_depth, majority_percentage, dp, alts, ref): #@contract(min_depth='number,>=0', majority_percentage='number,>=0,<=100', rec='dict', returns='tuple(string, string, int)') def call_many(min_depth, majority_percentage, rec): - # type: (int, int, Dict) -> Mut + # type: (int, int, VCFRow) -> Mut #TODO: switch to generators - muts = zip(rec['AO'], rec['alt']) - ref, dp, pos = rec['ref'], rec['DP'], rec['pos'] + muts = zip(rec.AO, rec.alt) + ref, dp, pos = rec.ref, rec.DP, rec.pos longest_len = max(map(lambda x: len(x[-1]), muts)) longest_len = max(longest_len, len(ref)) def fill_gap(r): @@ -115,22 +122,23 @@ def seq_count(acc, ao_and_nts): return map(merge_sum, acc, [{nt:ao} for nt in nts]) # create a list of {base : count}, where the index matches the position mut_dicts = reduce(seq_count, xs, [{}]) - base_caller = partial(call_base_multi_alts, min_depth, majority_percentage, dp) # type: Callable[[Dict[Any,Any], str], str] + base_caller = lambda m,r: call_base_multi_alts(min_depth, majority_percentage, dp, m, r) # # # ?Callable[[Dict[Any,Any], str], str] res = map(base_caller, mut_dicts, ref) # trim None values at the end, (which indicate deletion) result = takewhile(bool, res) return (ref, ''.join(result), pos) -@contract(rec='dict',returns='dict') +#@contract(rec='dict',returns='dict') def flatten_vcf_record(rec): - # type: (_Record) -> Dict[str, Any] + # type: (_Record) -> VCFRow _rec = merge({ 'alt' : rec.ALT, 'ref' : rec.REF, 'pos' : rec.POS, 'chrom' : rec.CHROM}, rec.INFO) if not hasattr(_rec['alt'], '__iter__'): #TODO: put this somewhere else - return merge(_rec, dict(alt=[_rec['alt']], AO=[_rec['AO']])) - else: return _rec + d = merge(_rec, dict(alt=[_rec['alt']], AO=[_rec['AO']])) + else: d = _rec + return VCFRow(**d) ############## # Group By # @@ -138,16 +146,16 @@ def flatten_vcf_record(rec): #NOTE: could possibly drop lists, use fn.Stream all the time, # and write a Stream instance for contracts like: # https://github.com/AndreaCensi/contracts/blob/831ec7a5260ceb8960540ba0cb6cc26370cf2d82/src/contracts/library/lists.py -@contract(references='list[N]($SeqRecord),N>0', muts='list(dict)',returns='tuple(list(dict))') +#@contract(references='list[N]($SeqRecord),N>0', muts='list(dict)',returns='tuple(list(dict))') def group_muts_by_refs(references, muts): - # type: (List[SeqRecord], List[Dict[Any, Any]]) -> Iterable[List[Dict]] + # type: (List[SeqRecord], List[VCFRow]) -> List[List[VCFRow]] '''group and sort the mutations so that they match the order of the references.''' #NOTE: muts will already be "sorted" in that they are grouped together in the vcf #fix the groupby so it doesn't incidentally drain the first object of the group unzip = lambda x: zip(*x) - chroms, groups = unzip(map(lambda kv: (kv[0], list(kv[1])), groupby(muts, get('chrom')))) - @contract(key='tuple(string,list)') - def index_of_ref(key): + chroms, groups = unzip(map(lambda kv: (kv[0], list(kv[1])), groupby(muts, lambda x: x.chrom))) + #@contract(key='tuple(string,list)') + def index_of_ref(key): # type: (Tuple[str, List[SeqRecord]]) -> int chrom=key[0] index_of_chrom = map(lambda x: x.id, references).index(chrom) return index_of_chrom @@ -162,13 +170,15 @@ def index_of_ref(key): #@contract(references='SeqRecord', muts='seq(dict)', mind=int, majority=int) def all_consensuses(references, muts, mind, majority): - # type: (Iterable[SeqRecord], Iterable[Dict[Any,Any]], int, int) -> Tuple[List[str], Iterator[Tuple[str, List[Mut]]]] + # type: (List[SeqRecord], List[VCFRow], int, int) -> Tuple[List[SeqRecord], Iterable[Tuple[str, List[Mut]]]] ''' generates conesnsuses, including for flu and other mult-reference VCFs. applies filters and base callers to the mutations. then builds the consensus using these calls and `make_consensus`''' muts_by_ref = group_muts_by_refs(references, muts) def single_consensus(muts, ref): - the_muts = map(partial(call_many, mind, majority), muts) + # type: (List[VCFRow], SeqRecord) -> Tuple[str, List[Mut]] + #the_muts = map(partial(call_many, mind, majority), muts) + the_muts = map(lambda x: call_many(mind, majority, x), muts) ref_and_alt_differ = lambda x: x[0] != x[1] # vcf is index-starting-at-1 #real_muts = map(lambda (a,b,pos): (a,b,pos-1), filter(ref_and_alt_differ, the_muts)) @@ -183,16 +193,27 @@ def single_consensus(muts, ref): def consensus_str(ref, consensus): # type: (SeqRecord, str) -> str return ">{0}:Consensus\n{1}".format(ref.id, consensus) +def zero_coverage_positions(bam_file, ref_file): # type: (str, str) -> Iterable[int] + pileup = sh.Command('mpileup')(bam_file, f=ref_file, _iter=True) + get_pos = lambda x: int(x.split()[1]) # type: Callable[[str],int] + return imap(get_pos, pileup) + +#TODO: is pileup 0-based or 1-based index? +def trim_ref(ref, positions): # type: (str, Iterator[int]) -> str + start, end = next(positions), collections.deque(positions, 1)[0] + return '-'*start + ref[:start:end] + '-'*(len(ref) - end) + + #@contract(ref_fasta=str, vcf=str, mind=int, majority=int) def run(ref_fasta, freebayes_vcf, outfile, mind, majority): - # type: (str, str, str, int, int) -> int - refs = SeqIO.parse(ref_fasta, 'fasta') + # type: (str, str, BinaryIO, int, int) -> int + _refs = SeqIO.parse(ref_fasta, 'fasta') with open(freebayes_vcf, 'r') as vcf_handle: - muts = imap(flatten_vcf_record, vcf.Reader(vcf_handle)) - refs, muts = list(refs), list(muts) - refs, seqs_and_muts = all_consensuses(refs, muts, mind, majority) - strings = imap(consensus_str, refs, imap(get(0), seqs_and_muts)) + _muts = map(flatten_vcf_record, vcf.Reader(vcf_handle)) + refs, muts = list(_refs), list(_muts) + the_refs, seqs_and_muts = all_consensuses(refs, muts, mind, majority) + strings = imap(consensus_str, the_refs, imap(get(0), seqs_and_muts)) result = '\n'.join(strings) outfile.write(result) outfile.close() diff --git a/mypy/Bio/SeqIO.pyi b/mypy/Bio/SeqIO.pyi new file mode 100644 index 0000000..baaf8a9 --- /dev/null +++ b/mypy/Bio/SeqIO.pyi @@ -0,0 +1,5 @@ +from Bio.SeqRecord import SeqRecord +from typing import Generator, Any, Iterator + +def parse(*anything): # type: (*Any) -> Iterator[SeqRecord] + pass diff --git a/mypy/Bio/SeqRecord.py b/mypy/Bio/SeqRecord.py new file mode 100644 index 0000000..8bdc6c6 --- /dev/null +++ b/mypy/Bio/SeqRecord.py @@ -0,0 +1,6 @@ +# from Bio.SeqIO import SeqIO +from typing import NamedTuple +class Stringable(object): + def __str__(self): # type: () -> str + pass +SeqRecord = NamedTuple('SeqRecord', [('id', str), ('seq', Stringable)]) diff --git a/mypy/Bio/__init__.py b/mypy/Bio/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mypy/README.md b/mypy/README.md new file mode 100644 index 0000000..26906fb --- /dev/null +++ b/mypy/README.md @@ -0,0 +1,6 @@ +1. Install [mypy](https://github.com/python/mypy#quick-start) + +2. Run mypy: `MYPYPATH=$PWD/mypy:$PWD/mypy/out mypy --py2 bioframework/consensus.py` + +If needed, uses `stubgen` to generate more stub files for other libraries. + diff --git a/mypy/__init__.py b/mypy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mypy/example.py b/mypy/example.py new file mode 100644 index 0000000..4f9bd9f --- /dev/null +++ b/mypy/example.py @@ -0,0 +1,44 @@ +from typing import List, Dict, Generator, Iterator, Iterable, Tuple +from Bio import SeqIO +from itertools import imap +from Bio.SeqRecord import SeqRecord +def test_long(): # type: () -> int + return 11999999L +def test_seqIO_map_fails(s): # type: (str) -> List[SeqRecord] + return map(lambda x: x.id, SeqIO.parse(s)) + +#def test_seqIO_map_fails2(s): # type: (str) -> Iterator[SeqRecord] +# return map(lambda x: x.id, SeqIO.parse(s)) +def test_seqIO_map_passes(s): # type: (str) -> Iterable[str] + return imap(lambda x: x.id, SeqIO.parse(s)) + +def test_seqIO(s): # type: (str) -> Iterator[SeqRecord] + return SeqIO.parse(s) +def test_list_seqIO(s): # type: (str) -> List[SeqRecord] + return list(SeqIO.parse(s)) +def test_seqIO_fails(s): # type: (str) -> List[str] + return SeqIO.parse(s) +def test_should_pass(s): # type: (SeqRecord) -> str + return s.id +def test_should_fail(s): # type: (SeqRecord) -> int + return s.id +#def test_should_fail(): # type: () -> List[SeqRecord] +# return 3 + +#a = test_should_fail() +def test_ordered_dict(od): # type: (Dict[str,int]) -> Dict[str,int] + return 1 #type error 1 +# +#a = test_ordered_dict(1) #type error 2 +# +#def test_me(): +# a = test_ordered_dict(1) # type error 3 is not reported + +####def test_ordered_dict(od: typing.Dict[str,int]) -> typing.Dict[str,int]: +#### return 1 #type error 1 +#### +####a = test_ordered_dict(1) #type error 2 +#### +####def test_me(): +#### a = test_ordered_dict(1) # type error 3 is not reported +### diff --git a/mypy/out/docopt.pyi b/mypy/out/docopt.pyi new file mode 100644 index 0000000..6f9431c --- /dev/null +++ b/mypy/out/docopt.pyi @@ -0,0 +1,76 @@ +# Stubs for docopt (Python 2) +# +# NOTE: This dynamically typed stub was automatically generated by stubgen. + +from typing import Any + +class DocoptLanguageError(Exception): ... + +class DocoptExit(SystemExit): + usage = ... # type: Any + def __init__(self, message=''): ... + +class Pattern: + def __eq__(self, other): ... + def __hash__(self): ... + def fix(self): ... + def fix_identities(self, uniq=None): ... + def fix_repeating_arguments(self): ... + @property + def either(self): ... + +class ChildPattern(Pattern): + name = ... # type: Any + value = ... # type: Any + def __init__(self, name, value=None): ... + def flat(self, *types): ... + def match(self, left, collected=None): ... + +class ParentPattern(Pattern): + children = ... # type: Any + def __init__(self, *children): ... + def flat(self, *types): ... + +class Argument(ChildPattern): + def single_match(self, left): ... + @classmethod + def parse(class_, source): ... + +class Command(Argument): + name = ... # type: Any + value = ... # type: Any + def __init__(self, name, value=False): ... + def single_match(self, left): ... + +class Option(ChildPattern): + value = ... # type: Any + def __init__(self, short=None, long=None, argcount=0, value=False): ... + @classmethod + def parse(class_, option_description): ... + def single_match(self, left): ... + @property + def name(self): ... + +class Required(ParentPattern): + def match(self, left, collected=None): ... + +class Optional(ParentPattern): + def match(self, left, collected=None): ... + +class AnyOptions(Optional): ... + +class OneOrMore(ParentPattern): + def match(self, left, collected=None): ... + +class Either(ParentPattern): + def match(self, left, collected=None): ... + +class TokenStream(list): + error = ... # type: Any + def __init__(self, source, error): ... + def move(self): ... + def current(self): ... + +class Dict(dict): ... + +def docopt(doc, argv=None, help=True, version=None, options_first=False): ... diff --git a/mypy/out/hypothesis/__init__.pyi b/mypy/out/hypothesis/__init__.pyi new file mode 100644 index 0000000..764f9f7 --- /dev/null +++ b/mypy/out/hypothesis/__init__.pyi @@ -0,0 +1,8 @@ +# Stubs for hypothesis (Python 2) +# +# NOTE: This dynamically typed stub was automatically generated by stubgen. + +from hypothesis._settings import settings as settings, Verbosity as Verbosity +from hypothesis.version import __version_info__ as __version_info__, __version__ as __version__ +from hypothesis.control import assume as assume, note as note, reject as reject +from hypothesis.core import given as given, find as find, example as example, seed as seed diff --git a/mypy/out/schema.pyi b/mypy/out/schema.pyi new file mode 100644 index 0000000..3eb2140 --- /dev/null +++ b/mypy/out/schema.pyi @@ -0,0 +1,31 @@ +# Stubs for schema (Python 2) +# +# NOTE: This dynamically typed stub was automatically generated by stubgen. + +from typing import Any + +class SchemaError(Exception): + autos = ... # type: Any + errors = ... # type: Any + def __init__(self, autos, errors): ... + @property + def code(self): ... + +class And: + def __init__(self, *args, **kw): ... + def validate(self, data): ... + +class Or(And): + def validate(self, data): ... + +class Use: + def __init__(self, callable_, error=None): ... + def validate(self, data): ... + +def priority(s): ... + +class Schema: + def __init__(self, schema, error=None): ... + def validate(self, data): ... + +class Optional(Schema): ... diff --git a/mypy/sh.py b/mypy/sh.py new file mode 100644 index 0000000..ee8e4e9 --- /dev/null +++ b/mypy/sh.py @@ -0,0 +1,3 @@ +from typing import Callable, Any, Union, List, Iterator +def Command(s): # type: (str) -> Callable[...,Union[List[str],Iterator[str]]] + pass diff --git a/mypy/toolz/__init__.py b/mypy/toolz/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mypy/toolz/dicttoolz.py b/mypy/toolz/dicttoolz.py new file mode 100644 index 0000000..6f7178e --- /dev/null +++ b/mypy/toolz/dicttoolz.py @@ -0,0 +1,32 @@ +from typing import Dict, Any, Callable, TypeVar +K = TypeVar('K') +V = TypeVar('V') +V2 = TypeVar('V2') +V3 = TypeVar('V3') +def merge(d1, d2): # type: (Dict[K,V], Dict[K,V]) -> Dict[K,V] + pass + +def dissoc(d, k): # type: (Dict[K,V], K) -> Dict[K,V] + pass + +def merge_with(f, d1, d2): # type: (Callable[[V,V2], V3], Dict[K,V], Dict[K,V2]) -> Dict[K,V3] + pass + +def valfilter(f, d): # type: (Callable[[V], bool], Dict[K,V]) -> Dict[K,V] + pass + + + +#from typing import Dict, Any, Callable, TypeVar +#T = TypeVar('T') +#def merge(d1, d2): # type: (Dict[Any,Any], Dict[Any,Any]) -> Dict[Any,Any] +# pass +# +#def dissoc(d, k): # type: (Dict[Any,Any], Any) -> Dict[Any,Any] +# pass +# +#def merge_with(f, d1, d2): # type: (Callable, Dict[Any,Any], Dict[Any,Any]) -> Dict[Any,Any] +# pass +# +#def valfilter(f, d): # type: (Callable, Dict[Any,Any]) -> Dict[Any,Any] +# pass diff --git a/mypy/vcf/__init__.py b/mypy/vcf/__init__.py new file mode 100644 index 0000000..29c9cc8 --- /dev/null +++ b/mypy/vcf/__init__.py @@ -0,0 +1,11 @@ +from typing import Union, Dict, List, NamedTuple, Iterator, BinaryIO +from vcf.model import _Record + +#fields = [("ALT", Union[str, List[str]]), ("REF", str), ("POS", int), ("CHROM", str), ("INFO", Dict[str, Union[int, List[int]]])] +# +#VCFRecord = NamedTuple('VCFRecord', fields) + +VCFRecord = NamedTuple('VCFRecord', [("ALT", Union[str, List[str]]), ("REF", str), ("POS", int), ("CHROM", str), ("INFO", Dict[str, Union[int, List[int]]])] +) +def Reader(s): # type: (BinaryIO) -> Iterator[_Record] + pass diff --git a/mypy/vcf/model.py b/mypy/vcf/model.py new file mode 100644 index 0000000..9266869 --- /dev/null +++ b/mypy/vcf/model.py @@ -0,0 +1,3 @@ +from typing import Union, Dict, List, NamedTuple, Iterator +_Record = NamedTuple('_Record', [("ALT", Union[str, List[str]]), ("REF", str), ("POS", int), ("CHROM", str), ("INFO", Dict[str, Union[int, List[int]]])] +) diff --git a/requirements.txt b/requirements.txt index dbdcd86..8e01b72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ pycontracts toolz pyvcf +typing +docopt +schema diff --git a/tests/test_consensus.py b/tests/test_consensus.py index 55f81f5..955e086 100644 --- a/tests/test_consensus.py +++ b/tests/test_consensus.py @@ -6,7 +6,7 @@ from hypothesis import strategies as st from hypothesis import given, assume from operator import itemgetter as get -from bioframework.consensus import call_many, all_consensuses, make_consensus +from bioframework.consensus import call_many, all_consensuses, make_consensus, VCFRow import string import itertools import unittest @@ -15,7 +15,7 @@ st.integers(min_value=1), st.text(alphabet='ACTGN', min_size=1, max_size=6)) \ .flatmap(lambda tup:\ - vcf_dict_strategy_factory(*tup)) + vcf_dict_strategy_factory(*tup)).map(lambda d: VCFRow(**d)) pos_int = st.integers(min_value=0) #TODO: these 10, 80 for trhesh and majority_percentage should be factored out and possibly be strategies themselves @@ -24,23 +24,23 @@ def just_ref(*args): class CallBaseHypothesisTest(unittest.TestCase): @given(simple_vcf_dict_strategy, pos_int) def test_under_mind_is_N(self, mut, mind): - assume(mut['DP'] < mind) + assume(mut.DP < mind) result = call_many(mind, 80, mut)[1] self.assertTrue(all(map(lambda x: x == 'N', result))) @given(simple_vcf_dict_strategy) def test_ao_under_minority_is_ref(self, mut): - assume(sum(mut['AO']) / mut['DP'] < 0.2) + assume(sum(mut.AO) / mut.DP < 0.2) result = call_many(0, 80, mut)[1] - self.assertEquals(result, mut['ref']) + self.assertEquals(result, mut.ref) @given(simple_vcf_dict_strategy) def test_over_majority_is_alt(self, mut): #TODO: this is slow - assume(sum(mut['AO']) / mut['DP'] > 0.8) - assume(len(mut['alt']) == 1) + assume(sum(mut.AO) / mut.DP > 0.8) + assume(len(mut.alt) == 1) result = call_many(0, 80, mut)[1] - self.assertEquals(result, mut['alt'][0]) + self.assertEquals(result, mut.alt[0]) #Commented out because it's not actually always true, # e.g. mut={'ref': u'AA', 'pos': 1, 'AO': [784313725491], 'alt': [u'A'], @@ -48,9 +48,9 @@ def test_over_majority_is_alt(self, mut): # should result in AA # @given(simple_vcf_dict_strategy) # def test_over_minoriy_is_not_ref(self, mut): -# assume(sum(mut['AO']) / mut['DP'] > 0.2) +# assume(sum(mut.AO) / mut.DP > 0.2) # result = call_many(0, 80, mut)[1] -# self.assertNotEquals(result, mut['ref']) +# self.assertNotEquals(result, mut.ref) class ConsesusExampleTest(unittest.TestCase): def test_make_consensus_example(self): @@ -62,7 +62,7 @@ def test_make_consensus_example(self): self.assertEquals(expected, actual) def test_single_example(self): - muts = [{ + raw_muts = [{ 'pos' : 2, 'ref' : 'CG', 'alt' : ['TT'], @@ -78,19 +78,20 @@ def test_single_example(self): 'DP' : 150, 'chrom' : 'X' }] + muts = map(lambda d: VCFRow(**d), raw_muts) ref = make_seqrec('X', 'ACGTACGT') expected = 'ATTTAAGT' result = just_ref([ref], muts, 10, 80) self.assertEquals(expected, result) ref_with_vcf_dicts_strategy = ref_with_vcf_dicts_strategy_factory().map( - lambda (r, muts): (make_seqrec(muts[0]['chrom'], r), muts)) + lambda (r, muts): (make_seqrec(muts[0]['chrom'], r), map(lambda d: VCFRow(**d), muts))) from collections import Counter countof = lambda c: lambda x: Counter(x).get(c, 0) def run_cons(*args): _, alt_and_cons = all_consensuses(*args) cons, alts = zip(*alt_and_cons) return cons[0], alts[0] -class ConsensusHypothesisTest(unittest.TestCase): +class ConsensusHypothesisTest(unittest.TestCase): #ref_and_muts=(SeqRecord(seq=Seq(u'AAAAAAAAAA', IUPACAmbiguousDNA()), id=u'', name='', description='', dbxrefs=[]), [ # {'ref': u'A', 'pos': 1, 'AO': [479, 777, 119, 604], 'alt': [u'G', u'C', u'G', u'TG'], 'chrom': u'', 'DP': 2635}, # {'ref': u'A', 'pos': 3, 'AO': [291, 241, 583, 420], 'alt': [u'CTG', u'C', u'G', u'C'], 'chrom': u'', 'DP': 1627}]), rand=random.seed(0)) @@ -100,11 +101,13 @@ class ConsensusHypothesisTest(unittest.TestCase): def test_n_count(self, ref_and_muts, rand): ref, muts = ref_and_muts originalNs = countof('N')(ref) - alts = map(get('alt'), muts) - assume(not any(map(lambda x: 'N' in x, itertools.chain(*alts)))) + alts = map(lambda x: x.alt, muts) + refs = map(lambda x: x.ref, muts) + assume(not filter(lambda x: 'N' in x, itertools.chain(*alts))) + assume(not filter(lambda x: len(x) > 1, itertools.chain(*alts))) + assume(not filter(lambda x: len(x) > 1, refs)) # needed because ACGT -> N - assume(not filter(lambda x: len(x) > 3, alts)) - expectedNs = len(filter(lambda x: x['DP'] < 10, muts)) + originalNs + expectedNs = len(filter(lambda x: x.DP < 10, muts)) + originalNs result = just_ref([ref], muts, 10, 80) self.assertEquals(countof('N')(result), expectedNs) @@ -119,7 +122,7 @@ def test_less_or_equal_length_when_no_inserts(self, ref_and_muts): def assume_greater_or_equal_length_when_no_deletions(self, ref_and_muts): ref, muts = ref_and_muts def has_deletion(mut): - filter(lambda x: len(x) < mut['ref'], mut['alt']) + filter(lambda x: len(x) < mut.ref, mut.alt) assume(not any(map(has_deletion, muts))) result = just_ref([ref], muts, 10, 80) self.assertLesserEqual(len(ref), len(result)) @@ -133,20 +136,20 @@ def test_more_or_equal_ns_with_lower_threshold(self, ref_and_muts, n1, n2): cons1 = just_ref([ref], muts, n1, 80) cons2 = just_ref([ref], muts, n2, 80) nsCount1, nsCount2 = countof('N')(cons1), countof('N')(cons2) - self.assertLessEqual(nsCount1, nsCount2) + self.assertLessEqual(nsCount1, nsCount2) @given(ref_with_vcf_dicts_strategy) def test_consensus_from_consensus_contains_more_alts(self, ref_and_muts): ref, muts = ref_and_muts - assume(not any(map(lambda x: len(x['alt']) > 1, muts))) + assume(not any(map(lambda x: len(x.alt) > 1, muts))) n1 = 10 cons1, alts = run_cons([ref], muts, n1, 80) assume(not any(map(lambda x: len(x[0]) > len(x[1]), alts))) - cons2, _ = run_cons([make_seqrec(muts[0]['chrom'], cons1)], muts, n1, 80) + cons2, _ = run_cons([make_seqrec(muts[0].chrom, cons1)], muts, n1, 80) picked_alts = map(get(1), alts) altCounts1 = sum(map(lambda f: f(cons1), map(countof, picked_alts))) altCounts2 = sum(map(lambda f: f(cons2), map(countof, picked_alts))) - self.assertLessEqual(altCounts1, altCounts2) + self.assertLessEqual(altCounts1, altCounts2) #NOTE: the below test appears to be meaningless, @@ -156,7 +159,7 @@ def test_consensus_from_consensus_contains_more_alts(self, ref_and_muts): def test_lower_majority_required_contains_more_alts(self, ref_and_muts, p1, p2): ref, muts = ref_and_muts assume(p1 < p2) - assume(not any(map(lambda x: len(x['alt']) > 1, muts))) + assume(not any(map(lambda x: len(x.alt) > 1, muts))) n1 = 10 cons1, alts = run_cons([ref], muts, n1, p1) assume(not any(map(lambda x: len(x[0]) > len(x[1]), alts)))