diff --git a/README.md b/README.md index d547fb18..d3d43593 100644 --- a/README.md +++ b/README.md @@ -564,13 +564,13 @@ We recommend _not_ to modify these options unless you are clearly aware of their but may improve running time when disk I/O is relatively slow. `--min_mapq` - Filers out all alignments with MAPQ less than this value (will also filter all secondary alignments, as they typically have MAPQ = 0). + Filters out all alignments with MAPQ less than this value (will also filter all secondary alignments, as they typically have MAPQ = 0). `--inconsistent_mapq_cutoff` - Filers out inconsistent alignments with MAPQ less than this value (works when the reference annotation is provided, default is 5). + Filters out inconsistent alignments with MAPQ less than this value (works when the reference annotation is provided, default is 5). `--simple_alignments_mapq_cutoff` - Filers out alignments with 1 or 2 exons and MAPQ less than this value (works only in annotation-free mode, default is 1). + Filters out alignments with 1 or 2 exons and MAPQ less than this value (works only in annotation-free mode, default is 1). `--normalization_method` Method for normalizing non-grouped counts into TPMs: diff --git a/src/alignment_info.py b/src/alignment_info.py index 2ec187ca..1c8b89a8 100644 --- a/src/alignment_info.py +++ b/src/alignment_info.py @@ -14,6 +14,8 @@ class AlignmentInfo: + REGION_TO_CHECK_LEN = 12 + def __init__(self, alignment): self.alignment = alignment # concat indels @@ -22,7 +24,6 @@ def __init__(self, alignment): self.aligned_pairs = None self.aligned_pairs_start_index = None self.aligned_pairs_end_index = None - self.region_to_check = 12 if not self.read_exons: return self.read_start = self.read_exons[0][0] @@ -60,14 +61,15 @@ def get_error_count(self, ref_start, ref_end, intron_index=None, left_site=True, if intron_index is None: selected_pairs = self.aligned_pairs elif left_site: - # find alinned pairs near intron start (previous exon end) - selected_pairs = self.aligned_pairs[max(0, self.aligned_pairs_end_index[intron_index] - self.region_to_check): - self.aligned_pairs_end_index[intron_index] + self.region_to_check] + # find aligned pairs near intron start (previous exon end) + selected_pairs = self.aligned_pairs[ + max(0, self.aligned_pairs_end_index[intron_index] - AlignmentInfo.REGION_TO_CHECK_LEN): + self.aligned_pairs_end_index[intron_index] + AlignmentInfo.REGION_TO_CHECK_LEN] else: # find aligned pairs near intron end (next exon start) selected_pairs = self.aligned_pairs[ - max(0, self.aligned_pairs_start_index[intron_index+1] - self.region_to_check): - self.aligned_pairs_start_index[intron_index+1] + self.region_to_check] + max(0, self.aligned_pairs_start_index[intron_index + 1] - AlignmentInfo.REGION_TO_CHECK_LEN): + self.aligned_pairs_start_index[intron_index + 1] + AlignmentInfo.REGION_TO_CHECK_LEN] indel_count = 0 mismatch_count = 0 diff --git a/src/alignment_refiner.py b/src/alignment_refiner.py deleted file mode 100644 index 5d169b62..00000000 --- a/src/alignment_refiner.py +++ /dev/null @@ -1,86 +0,0 @@ -############################################################################ -# Copyright (c) 2022-2024 University of Helsinki -# Copyright (c) 2020-2022 Saint Petersburg State University -# # All Rights Reserved -# See file LICENSE for details. -############################################################################ - -import logging - -from Bio import pairwise2 - -from .isoform_assignment import MatchEventSubtype - -logger = logging.getLogger('IsoQuant') -pairwise2.MAX_ALIGNMENTS = 1 - - -class AlignmentRefiner: - def __init__(self, gene_info, params): - self.gene_info = gene_info - self.params = params - self.delta = self.params.delta - self.scores = (2, -1, -1, -.2) - - def classify_skipped_exon(self, isoform_junctions, isoform_cregion, read_junctions, read_cregion, - total_intron_len_diff, read_introns_known, total_exon_len, alignment): - - assert len(set(read_cregion)) == 1 - - read_cpos = read_cregion[0] - read_left, read_right = read_junctions[read_cpos] - l, r = isoform_cregion - iso_left, iso_right = isoform_junctions[l][0], isoform_junctions[r][1] - exon_left, exon_right = isoform_junctions[l][1], isoform_junctions[r][0] - if total_intron_len_diff < 2 * self.delta and total_exon_len <= self.max_missed_exon_len: - if alignment is None or self.reference is None: - return MatchEventSubtype.exon_misallignment - - if iso_left - self.delta <= read_left and iso_right + self.delta >= read_right: - seq = self.get_read_sequence(alignment, (iso_left, read_left)) \ - + self.get_read_sequence(alignment, (read_right, iso_right)) - ref_seq = self.reference.fetch(alignment.reference_name, exon_left, exon_right) - if self.sequences_match(seq, ref_seq): - return MatchEventSubtype.exon_misallignment - else: - return MatchEventSubtype.exon_skipping_novel_intron - - if read_introns_known: - return MatchEventSubtype.exon_skipping_known_intron - return MatchEventSubtype.exon_skipping_novel_intron - - def classify_intron_misalignment(self, read_junctions, isoform_junctions, read_cpos, isoform_cpos, alignment=None): - - read_left, read_right = read_junctions[read_cpos] - iso_left, iso_right = isoform_junctions[isoform_cpos] - - if abs(read_left - iso_left) + abs(read_right - iso_right) <= 2 * self.delta: - return MatchEventSubtype.intron_shift - - if alignment is None or self.reference is None: - if abs(read_left - iso_left) <= self.max_intron_shift: - return MatchEventSubtype.intron_shift - return MatchEventSubtype.intron_alternation_novel - - read_region, ref_region = self.get_aligned_regions_intron(read_left, read_right, iso_left, iso_right) - seq = self.get_read_sequence(alignment, read_region) - ref_seq = self.reference.fetch(alignment.reference_name, *ref_region) - - if self.sequences_match(seq, ref_seq): - return MatchEventSubtype.intron_shift - return MatchEventSubtype.intron_alternation_novel - - def sequences_match(self, seq, ref_seq): - score, size = 0, 1 - for a in pairwise2.align.globalms(seq, ref_seq, *self.scores): - score, size = a[2], a[4] - if score > 0.7 * size: - return True - return False - - @staticmethod - def get_aligned_regions_intron(read_left, read_right, iso_left, iso_right): - if iso_left <= read_left: - return (iso_left, read_left), (iso_right, read_right) - return (read_right, iso_right), (read_left, iso_left) - diff --git a/src/common.py b/src/common.py index 9a88f549..9be377b6 100644 --- a/src/common.py +++ b/src/common.py @@ -12,6 +12,7 @@ import threading import math from collections import defaultdict +from enum import Enum logger = logging.getLogger('IsoQuant') @@ -27,6 +28,26 @@ def increment(self): return self.value +class CigarEvent(Enum): + match = 0 + insertion = 1 + deletion = 2 + skipped = 3 + soft_clipping = 4 + hard_clipping = 5 + padding = 6 + seq_match = 7 + seq_mismatch = 8 + + @classmethod + def get_match_events(cls): + return {cls.match, cls.seq_match, cls.seq_mismatch} + + @classmethod + def get_ins_del_match_events(cls): + return {cls.match, cls.insertion, cls.deletion, cls.seq_match, cls.seq_mismatch} + + # key, value def get_first_best_from_sorted(sorted_list_of_pairs): if not sorted_list_of_pairs: @@ -460,25 +481,25 @@ def concat_gapless_blocks(blocks, cigar_tuples): while cigar_index < len(cigar_tuples) and block_index < len(blocks): # init new block - cigar_event = cigar_tuples[cigar_index][0] + cigar_event = CigarEvent(cigar_tuples[cigar_index][0]) if current_block is None: # init new block from match - if cigar_event in [0, 7, 8]: + if cigar_event in CigarEvent.get_match_events(): current_block = (blocks[block_index][0] - deletions_before_block, blocks[block_index][1]) deletions_before_block = 0 block_index += 1 # keep track of deletions before matched block - elif cigar_event == 2: + elif cigar_event == CigarEvent.deletion: deletions_before_block = cigar_tuples[cigar_index][1] # found intron, add current block - elif cigar_event == 3: + elif cigar_event == CigarEvent.skipped: resulting_blocks.append(current_block) current_block = None # add deletion to block - elif cigar_event == 2: + elif cigar_event == CigarEvent.deletion: current_block = (current_block[0], current_block[1] + cigar_tuples[cigar_index][1]) # found match - merge blocks - elif cigar_event in [0, 7, 8]: + elif cigar_event in CigarEvent.get_match_events(): # if abs(current_block[1] - blocks[block_index][0]) > 1: # logger.debug("Distant blocks") # logger.debug(current_block, blocks[block_index]) @@ -500,38 +521,38 @@ def get_read_blocks(ref_start, cigar_tuples): current_ref_block_start = None current_read_block_start = None current_cigar_block_start = None - has_match=False + has_match = False ref_blocks = [] cigar_blocks = [] read_blocks = [] while cigar_index < len(cigar_tuples): - cigar_event = cigar_tuples[cigar_index][0] + cigar_event = CigarEvent(cigar_tuples[cigar_index][0]) event_len = cigar_tuples[cigar_index][1] - if current_ref_block_start is None and cigar_event in [0, 1, 2, 7, 8]: + if current_ref_block_start is None and cigar_event in CigarEvent.get_ins_del_match_events(): # init new block from match current_ref_block_start = ref_pos current_read_block_start = read_pos current_cigar_block_start = cigar_index - if cigar_event == 1: + if cigar_event == CigarEvent.insertion: read_pos += event_len - elif cigar_event == 2: + elif cigar_event == CigarEvent.deletion: ref_pos += event_len else: read_pos += event_len ref_pos += event_len has_match = True # found intron, add current block - elif cigar_event in [0, 7, 8]: + elif cigar_event in CigarEvent.get_match_events(): read_pos += event_len ref_pos += event_len has_match = True - elif cigar_event == 1: + elif cigar_event == CigarEvent.insertion: read_pos += event_len - elif cigar_event == 2: + elif cigar_event == CigarEvent.deletion: ref_pos += event_len - elif cigar_event == 3: + elif cigar_event == CigarEvent.skipped: if current_ref_block_start: if has_match: ref_blocks.append((current_ref_block_start, ref_pos - 1)) @@ -542,7 +563,7 @@ def get_read_blocks(ref_start, cigar_tuples): current_read_block_start = None current_cigar_block_start = None ref_pos += event_len - elif cigar_event == 4: + elif cigar_event == CigarEvent.soft_clipping: if current_ref_block_start: if has_match: ref_blocks.append((current_ref_block_start, ref_pos - 1)) diff --git a/tests/test_alignment_info.py b/tests/test_alignment_info.py new file mode 100644 index 00000000..a31dbe6a --- /dev/null +++ b/tests/test_alignment_info.py @@ -0,0 +1,106 @@ +import pytest + +from src.alignment_info import AlignmentInfo +from src.polya_finder import PolyAFinder +from src.polya_verification import PolyAFixer + +SEQ1 = 'CTCAAGACCAAGAAGGACGACATGACCATGGCTTAAAAGAGTCTGCTCCCCACAGCCCCCTGCGAT' \ + 'GGATGGACGGAGGAACCAGGGTCGGACGACCTCCGATGCTAAGAGCACTCCAACTGCTGCAAACCG' \ + 'AAGAAGCAGGCATCGGACDCCTTTTTTTTTTAGGCGCCAAAAAAAAAAAACCAAAAAAAAAAAAA' +TUPLES1 = [(0, 167), (4, 30)] +TUPLES2 = [(0, 167), (4, 30), (5, 5), (4, 16), (0, 30), (5, 70), (0, 201)] +PAIRS = [(0, 75897), (1, 76063), (2, 75995)] + + +class Alignment: + def __init__(self, query_name, cigartuples, seq, reference_start, reference_end, reference_name, reference_id, + aligned_pairs): + self.query_name = query_name + self.cigartuples = cigartuples + self.seq = seq + self.reference_start = reference_start + self.reference_end = reference_end + self.reference_name = reference_name + self.reference_id = reference_id + self.aligned_pairs = aligned_pairs + + def get_aligned_pairs(self): + return self.aligned_pairs + + +class TestAlignmentInfo: + + @pytest.mark.parametrize("read_exons, read_blocks, cigar_blocks", + [ + ([(75897, 76063), (76064, 76294)], [(0, 166), (213, 443)], [(0, 0), (4, 6)]) + ]) + def test_basic(self, read_exons, read_blocks, cigar_blocks): + alignment = Alignment('query', TUPLES2, SEQ1, 75896, 113577, 'reference', 789, PAIRS) + alignment_info = AlignmentInfo(alignment) + + assert alignment_info.alignment == alignment + assert alignment_info.read_start == read_exons[0][0] + assert alignment_info.read_end == read_exons[-1][-1] + assert alignment_info.read_exons == read_exons + assert alignment_info.read_blocks == read_blocks + assert alignment_info.cigar_blocks == cigar_blocks + + assert alignment_info.aligned_pairs is None + assert alignment_info.aligned_pairs_start_index is None + assert alignment_info.aligned_pairs_end_index is None + assert alignment_info.polya_info is None + assert alignment_info.combined_profile is None + assert alignment_info.exons_changed is False + assert len(alignment_info.cage_hits) == 0 + + def test_alignment_pairs(self): + alignment = Alignment('query', TUPLES1, SEQ1, 75896, 113577, 'reference', 789, PAIRS) + alignment_info = AlignmentInfo(alignment) + alignment_info.set_aligned_pairs() + + assert len(PAIRS) in alignment_info.aligned_pairs_start_index + assert len(alignment_info.aligned_pairs_start_index) == 3 + + @pytest.mark.parametrize("ref_start, ref_end, intron_index, left_site, chr_record, expected", + [ + (75999, 81000, None, None, None, (0, 0)), + (75999, 81000, 0, False, None, (0, 0)), + (75999, 81000, 0, True, None, (0, 0)) + ]) + def test_get_error_count(self, ref_start, ref_end, intron_index, left_site, chr_record, expected): + alignment = Alignment('query', TUPLES2, SEQ1, 75896, 113577, 'reference', 789, PAIRS) + alignment_info = AlignmentInfo(alignment) + + indel_count, mismatch_count = alignment_info.get_error_count(ref_start, ref_end, intron_index, left_site, + chr_record) + assert (indel_count, mismatch_count) == expected + + @pytest.mark.parametrize("read_start, read_end", + [ + (51055, 51221) + ]) + def test_add_polya_info(self, read_start, read_end): + alignment = Alignment('query', TUPLES2, SEQ1, 51054, 35000, 'reference', 789, PAIRS) + alignment_info = AlignmentInfo(alignment) + polya_finder = PolyAFinder() + polya_fixer = PolyAFixer(1) + + alignment_info.add_polya_info(polya_finder, polya_fixer) + + assert alignment_info.exons_changed is True + assert alignment_info.polya_info.internal_polya_pos == read_end + assert alignment_info.polya_info.internal_polyt_pos == -1 + assert alignment_info.polya_info.external_polya_pos == -1 + assert alignment_info.polya_info.external_polyt_pos == -1 + assert alignment_info.read_exons == [(read_start, read_end)] + assert alignment_info.read_blocks == [(0, (read_end - read_start))] + assert alignment_info.cigar_blocks == [(0, 0)] + assert alignment_info.read_start == read_start + assert alignment_info.read_end == read_end + + def test_get_seq(self): + alignment = Alignment('query', TUPLES1, SEQ1, 75896, 113577, 'reference', 789, PAIRS) + alignment_info = AlignmentInfo(alignment) + + with pytest.raises(AssertionError): + alignment_info.get_seq(76000, 81000) diff --git a/tests/test_common.py b/tests/test_common.py index 69068534..607b2d4d 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -1,4 +1,3 @@ - ############################################################################ # Copyright (c) 2022-2024 University of Helsinki # All Rights Reserved @@ -20,7 +19,7 @@ def test_get_first(self): ([1, 1, 1], 1, 2)], ids=("last", "first", "middle", "several", "all")) def test_correct(self, collection, elem, expected): - assert expected == c.rindex(collection, elem) + assert c.rindex(collection, elem) == expected @pytest.mark.parametrize("collection", [(), ("b", "c")]) def test_no_such_elem(self, collection): @@ -33,64 +32,63 @@ class TestRanges: [((1, 20), (1, 20), 0, True), ((1, 20), (2, 21), 1, True), ((1, 10), (1, 20), 5, False)], ids=("equals", "less_than_delta", "bigger_than_delta")) def test_equal(self, range1, range2, delta, expected): - assert expected == c.equal_ranges(range1, range2, delta) + assert c.equal_ranges(range1, range2, delta) == expected @pytest.mark.parametrize("range1, range2, expected", [((1, 20), (1, 20), True), ((1, 20), (20, 121), True), ((1, 10), (11, 20), False), ((111, 210), (11, 110), False), ((120, 210), (101, 121), True)]) def test_overlaps(self, range1, range2, expected): - assert expected == c.overlaps(range1, range2) + assert c.overlaps(range1, range2) == expected @pytest.mark.parametrize("range1, range2, delta, expected", [((1, 20), (20, 25), 0, True), ((18, 120), (2, 20), 3, True), ((1, 10), (6, 20), 6, False)]) def test_overlaps_at_least(self, range1, range2, delta, expected): - assert expected == c.overlaps_at_least(range1, range2, delta) + assert c.overlaps_at_least(range1, range2, delta) == expected @pytest.mark.parametrize("range1, range2, delta, expected", [((1, 25), (20, 25), 0, True), ((18, 40), (16, 41), 3, True), ((10, 27), (5, 30), 4, False)]) def test_contains_approx(self, range1, range2, delta, expected): - assert expected == c.contains_approx(range1, range2, delta) + assert c.contains_approx(range1, range2, delta) == expected @pytest.mark.parametrize("range1, range2, expected", [((1, 20), (1, 20), True), ((1, 120), (20, 101), True), ((1, 10), (11, 20), False), ((111, 210), (111, 211), False), ((120, 210), (119, 121), False)]) def test_contains(self, range1, range2, expected): - assert expected == c.contains(range1, range2) - + assert c.contains(range1, range2) == expected @pytest.mark.parametrize("range1, range2, expected", [((1, 20), (1, 20), True), ((1, 120), (20, 201), False), ((1, 10), (11, 20), False), ((111, 210), (111, 211), False), ((120, 210), (119, 201), True)]) def test_covers_start(self, range1, range2, expected): - assert expected == c.covers_start(range1, range2) + assert c.covers_start(range1, range2) == expected @pytest.mark.parametrize("range1, range2, expected", [((1, 20), (1, 20), True), ((1, 120), (20, 201), True), ((1, 10), (11, 20), False), ((111, 210), (111, 211), True), ((120, 210), (121, 211), True)]) def test_covers_end(self, range1, range2, expected): - assert expected == c.covers_end(range1, range2) + assert c.covers_end(range1, range2) == expected @pytest.mark.parametrize("range, expected", [((1, 20), 20), ((1, 1), 1)]) def test_interval_len(self, range, expected): - assert expected == c.interval_len(range) + assert c.interval_len(range) == expected @pytest.mark.parametrize("ranges, expected", [([(1, 20), (40, 50)], 31), ([(1, 1), (5, 5)], 2)]) def test_intervals_len(self, ranges, expected): - assert expected == c.intervals_total_length(ranges) + assert c.intervals_total_length(ranges) == expected @pytest.mark.parametrize("ranges, point, expected", [([(1, 20), (40, 50)], 60, 31), ([(1, 1), (5, 5)], 10, 2), ([(1, 20), (40, 50)], 39, 20), ([(1, 20), (40, 50)], 41, 21)]) def test_intervals_len_to_point(self, ranges, point, expected): - assert expected == c.sum_intervals_to_point(ranges, point) + assert c.sum_intervals_to_point(ranges, point) == expected @pytest.mark.parametrize("ranges, point, expected", [([(1, 20), (40, 50)], 1, 30), ([(1, 1), (5, 5)], 0, 2), ([(1, 20), (40, 50)], 39, 11), ([(1, 20), (40, 50)], 11, 20)]) def test_intervals_len_from_point(self, ranges, point, expected): - assert expected == c.sum_intervals_from_point(ranges, point) + assert c.sum_intervals_from_point(ranges, point) == expected class TestSimilarityFunctions: @@ -103,7 +101,7 @@ class TestSimilarityFunctions: ([(146, 155), (160, 172)], [(1, 20), (41, 50), (61, 70)], 0.0), ([(11, 20), (41, 50), (61, 70)], [(1, 4), (10, 12), (14, 16), (19, 30), (30, 34), (60, 70), (101, 149)], 0.17)]) def test_jaccard_index(self, ranges1, ranges2, expected): - assert expected == c.jaccard_similarity(ranges1, ranges2) + assert c.jaccard_similarity(ranges1, ranges2) == expected @pytest.mark.parametrize("read_exons, isoform_exons, expected", [([(1, 100)], [(31, 110)], 0.7), @@ -114,7 +112,7 @@ def test_jaccard_index(self, ranges1, ranges2, expected): ([(146, 155), (160, 172)], [(1, 20), (41, 50), (61, 70)], 0.0), ([(11, 20), (41, 50), (61, 70), (201, 220)], [(1, 4), (10, 12), (14, 16), (19, 30), (30, 34), (60, 70), (101, 109)], 0.34)]) def test_read_coverage_fraction(self, read_exons, isoform_exons, expected): - assert expected == c.read_coverage_fraction(read_exons, isoform_exons) + assert c.read_coverage_fraction(read_exons, isoform_exons) == expected @pytest.mark.parametrize("read_exons, isoform_region, expected", [([(1, 100)], (31, 110), 0.3), @@ -125,7 +123,7 @@ def test_read_coverage_fraction(self, read_exons, isoform_exons, expected): ([(1, 20), (41, 50), (61, 80)], (146, 172), 1.0), ([(11, 20), (41, 50), (61, 90)], (16, 70), 0.5)]) def test_extra_flanking(self, read_exons, isoform_region, expected): - assert expected == c.extra_exon_percentage(isoform_region, read_exons) + assert c.extra_exon_percentage(isoform_region, read_exons) == expected class TestExonFunctions: @@ -133,21 +131,21 @@ class TestExonFunctions: [([(1, 100)], []), ([(1, 20), (41, 50), (61, 70)], [(21, 40), (51, 60)])]) def test_junctions_from_blocks(self, exons, expected): - assert expected == c.junctions_from_blocks(exons) + assert c.junctions_from_blocks(exons) == expected @pytest.mark.parametrize("region, introns, intron_position, expected", [((0, 100), [(20, 80)], 0, (81, 100)), ((0, 100), [(20, 40), (60, 80)], 0, (41, 59)), ((0, 100), [(20, 40), (60, 80)], 1, (81, 100))]) def test_following_exon(self, region, introns, intron_position, expected): - assert expected == c.get_following_exon_from_junctions(region, introns, intron_position) + assert c.get_following_exon_from_junctions(region, introns, intron_position) == expected @pytest.mark.parametrize("region, introns, intron_position, expected", [((0, 100), [(20, 80)], 0, (0, 19)), ((0, 100), [(20, 40), (60, 80)], 0, (0, 19)), ((0, 100), [(20, 40), (60, 80)], 1, (41, 59))]) def test_preceding_exon(self, region, introns, intron_position, expected): - assert expected == c.get_preceding_exon_from_junctions(region, introns, intron_position) + assert c.get_preceding_exon_from_junctions(region, introns, intron_position) == expected @pytest.mark.parametrize("region, introns, exon_position, expected", [((0, 100), [(20, 80)], 0, (0, 19)), @@ -156,13 +154,13 @@ def test_preceding_exon(self, region, introns, intron_position, expected): ((0, 100), [(20, 40), (60, 80)], 1, (41, 59)), ((0, 100), [(20, 40), (60, 80)], 2, (81, 100))]) def test_get_exon(self, region, introns, exon_position, expected): - assert expected == c.get_exon(region, introns, exon_position) + assert c.get_exon(region, introns, exon_position) == expected @pytest.mark.parametrize("exons, expected", [([(1, 100)], [(2, 100)]), ([(1, 20), (41, 50), (61, 70)], [(2, 20), (42, 50), (62, 70)])]) def test_correct_bam_coords(self, exons, expected): - assert expected == c.correct_bam_coords(exons) + assert c.correct_bam_coords(exons) == expected @pytest.mark.parametrize("exons, cigar_tuples, expected", [([(1, 100)], [(0, 99)], [(1, 100)]), @@ -170,7 +168,23 @@ def test_correct_bam_coords(self, exons, expected): [(4, 10), (0, 10), (1, 2), (0, 10), (3, 11), (0, 9), (2, 1), (0, 9), (2, 1), (0, 9), (2, 2), (0, 8), (3, 20), (0, 10), (1, 3), (0, 4), (2, 3), (0, 3), (5, 10)], [(10, 30), (41, 80), (100, 120)])]) def test_concat(self, exons, cigar_tuples, expected): - assert expected == c.concat_gapless_blocks(exons, cigar_tuples) + assert c.concat_gapless_blocks(exons, cigar_tuples) == expected + + @pytest.mark.parametrize("ref_start, cigar_tuples, expected", + [(3, [(0, 4), (1, 1), (2, 1), (0, 2)], ([(4, 10)], [(0, 6)], [(0, 3)])), + (5, [(0, 2), (1, 1), (0, 5), (3, 3), (0, 3)], + ([(6, 12), (16, 18)], [(0, 7), (8, 10)], [(0, 2), (4, 4)])), + (3, [(0, 5), (2, 3), (0, 7)], ([(4, 18)], [(0, 11)], [(0, 2)])), + (2, [(0, 3), (2, 1), (0, 4), (3, 3), (1, 2), (0, 5)], + ([(3, 10), (14, 18)], [(0, 6), (7, 13)], [(0, 2), (4, 5)])), + (5, [(0, 3), (1, 1), (0, 4), (3, 3), (2, 2), (0, 5)], + ([(6, 12), (16, 22)], [(0, 7), (8, 12)], [(0, 2), (4, 5)])), + (6, [(4, 2), (0, 6), (2, 1), (0, 5), (1, 2), (0, 4)], ([(7, 22)], [(2, 18)], [(1, 5)])), + (1, [(4, 3), (0, 9), (3, 4), (0, 6), (2, 1), (0, 7), (4, 2)], + ([(2, 10), (15, 28)], [(3, 11), (12, 24)], [(1, 1), (3, 5)])) + ]) + def test_get_read_blocks(self, ref_start, cigar_tuples, expected): + assert c.get_read_blocks(ref_start, cigar_tuples) == expected class TestProfiles: @@ -179,14 +193,14 @@ class TestProfiles: ([-2, 1, -1, -2], [-1, 1, 1, 1], False), ([-2, 1, -1, -2], [-1, 1, -1, 1], True)]) def test_is_subprofile(self, profile1, profile2, expected): - assert expected == c.is_subprofile(profile1, profile2) + assert c.is_subprofile(profile1, profile2) == expected @pytest.mark.parametrize("profile1, profile2, expected", [([], [], 0), ([0], [1], 0), ([1], [1], 0), ([1, -1], [1, 1], 1), ([0], [0], 0), ([0, 1], [1, 1], 0), ([-1, 1], [1, 0], 1), ([1, 0, -1], [0, 1, 1], 1)]) def test_difference_in_present_features(self, profile1, profile2, expected): - assert expected == c.difference_in_present_features(profile1, profile2) + assert c.difference_in_present_features(profile1, profile2) == expected @pytest.mark.parametrize("profile1, profile2, profile_range, expected", [([], [], None, 0), ([0], [1], (0, 1), 0), ([1], [1], (0, 1), 0), @@ -194,68 +208,68 @@ def test_difference_in_present_features(self, profile1, profile2, expected): ([0], [0], (0, 1), 0), ([0, 1], [1, 1], (0, 1), 0), ([-1, 1], [1, 0], (0, 1), 1), ([1, 0, -1], [0, 1, 1], (2, 3), 1)]) def test_difference_in_present_features_with_range(self, profile1, profile2, profile_range, expected): - assert expected == c.difference_in_present_features(profile1, profile2, profile_range=profile_range) + assert c.difference_in_present_features(profile1, profile2, profile_range=profile_range) == expected @pytest.mark.parametrize("profile1, profile2, diff_limit, expected", [([1, -1, -1], [-1, 1, 1], 0, 1)]) def test_difference_in_present_features_with_lim(self, profile1, profile2, diff_limit, expected): - assert expected == c.difference_in_present_features(profile1, profile2, diff_limit=diff_limit) + assert c.difference_in_present_features(profile1, profile2, diff_limit=diff_limit) == expected @pytest.mark.parametrize("profile1, profile2, expected", [([0], [1], 0), ([0, 1], [1, 1], 1), ([0, 1], [1, -2], 0), ([0, 1, -1], [-2, 1, 1], 1)]) def test_count_both_present_features(self, profile1, profile2, expected): - assert expected == c.count_both_present_features(profile1, profile2) + assert c.count_both_present_features(profile1, profile2) == expected @pytest.mark.parametrize("profile1, profile2, expected", [([1], [1], True), ([1, 1], [0, 1], False), ([-1, 1, 1, -2], [0, 1, 1, -2], True), ([-2, 1, 1], [0, 1, -1], False)]) def test_all_features_present(self, profile1, profile2, expected): - assert expected == c.all_features_present(profile1, profile2) + assert c.all_features_present(profile1, profile2) == expected @pytest.mark.parametrize("profile1, profile2, expected", [([1], [1], [1]), ([0, 1], [1, 1], [0, 1]), ([0, 1, 1, 0], [-1, 1, 1, -2], [0, 1, 1, 0]), ([0, 1, -1], [-2, 1, 1], [0, 1, 0])]) def test_find_matching_positions(self, profile1, profile2, expected): - assert expected == c.find_matching_positions(profile1, profile2) + assert c.find_matching_positions(profile1, profile2) == expected @pytest.mark.parametrize("profile1, profile2, expected", [([1], [1], True), ([0, 1], [1, 1], True), ([0, 1, 1, 0], [-1, 1, 1, -2], True), ([1, -1, 0, 0], [-1, 1, 1, -2], False), ([0, 1, -1], [-2, 1, 1], True)]) def test_has_overlapping_features(self, profile1, profile2, expected): - assert expected == c.has_overlapping_features(profile1, profile2) + assert c.has_overlapping_features(profile1, profile2) == expected @pytest.mark.parametrize("profile1, profile2, expected", [([1], [1], False), ([0, 1], [1, 1], False), ([0, 1, 1, 0], [-1, 1, 1, -2], False), ([1, -1, 0, 0], [-1, 1, 1, -2], True), ([0, 1, -1], [-2, 1, 1], True)]) def test_has_inconsistent_features(self, profile1, profile2, expected): - assert expected == c.has_inconsistent_features(profile1, profile2) + assert c.has_inconsistent_features(profile1, profile2) == expected @pytest.mark.parametrize("profile1, mask, expected", [([1], [1], [1]), ([0, 1], [1, 1], [0, 1]), ([0, 1, 1, 0], [1, 1, 0, 0], [0, 1, 0, 0]), ([-2, -1, 1, 0], [1, 1, 0, 1], [-2, -1, 0, 0])]) def test_mask_profile(self, profile1, mask, expected): - assert expected == c.mask_profile(profile1, mask) + assert c.mask_profile(profile1, mask) == expected @pytest.mark.parametrize("profile1, features, expected", [([1], [(10, 20)], [(10, 20)]), ([0, 1], [(10, 20), (30, 40)], [(30, 40)]), ([0, 1, 0, 1], [(10, 20), (30, 40), (110, 120), (130, 140)], [(30, 40), (130, 140)])]) def test_get_blocks_from_profile(self, profile1, features, expected): - assert expected == c.get_blocks_from_profile(features, profile1) + assert c.get_blocks_from_profile(features, profile1) == expected @pytest.mark.parametrize("profile1, profile2, expected", [([1], [1], False), ([0, 1], [1, 1], True), ([0, 1, 1, 0], [-1, 1, 1, -2], False), ([-1, -1, 1, 0], [-1, 1, 1, -2], True), ([0, 1, -1], [-2, 1, 1], False)]) def test_left_truncated(self, profile1, profile2, expected): - assert expected == c.left_truncated(profile1, profile2) + assert c.left_truncated(profile1, profile2) == expected @pytest.mark.parametrize("profile1, profile2, expected", [([1], [1], False), ([1, 0], [1, 1], True), ([0, 1, 1, 0], [-1, 1, 1, -2], False), ([-1, 1, -1, 0], [-1, 1, 1, -2], True), ([0, 1, -1], [-2, 1, 1], True)]) def test_right_truncated(self, profile1, profile2, expected): - assert expected == c.right_truncated(profile1, profile2) + assert c.right_truncated(profile1, profile2) == expected class TestListToStr: @@ -264,5 +278,4 @@ def test_empty(self): @pytest.mark.parametrize("element_list, sep, expected", [([1], ",", "1"), ([1, 0, 2], ":", "1:0:2")]) def test_not_empty(self, element_list, sep, expected): - assert expected == c.list_to_str(element_list, sep) - + assert c.list_to_str(element_list, sep) == expected diff --git a/tests/test_long_read_assigner.py b/tests/test_long_read_assigner.py index f20a2420..10c31c7e 100644 --- a/tests/test_long_read_assigner.py +++ b/tests/test_long_read_assigner.py @@ -72,10 +72,10 @@ def test_different_length(self, read_gene_profile, isoform_profiles, hint, expec self.check(read_gene_profile, isoform_profiles, hint, expected) def check(self, read_gene_profile, isoform_profiles, hint, expected): - assert expected == self.assigner.match_profile(read_gene_profile, isoform_profiles, hint) + assert self.assigner.match_profile(read_gene_profile, isoform_profiles, hint) == expected expected = sorted([x[0] for x in expected if x[1] == 0]) - assert expected == self.assigner.find_matching_isoforms(MappedReadProfile(read_gene_profile, None, None, None), - isoform_profiles, hint) + assert self.assigner.find_matching_isoforms(MappedReadProfile(read_gene_profile, None, None, None), + isoform_profiles, hint) == expected @pytest.mark.parametrize("read_gene_profile, isoform_profiles, hint, expected", [([-1, 1, -1, 0], dict(id1=[-1, 1, -1, -1], id2=[-1, 1, -1, -2]), @@ -115,45 +115,45 @@ def test_hint(self, read_gene_profile, isoform_profiles, hint, expected): class TestCompareJunctions: gene_info = GeneInfoTuple(IntronProfiles([(50, 60), (80, 100), (80, 110), (80, 150), (200, 210)]), 0, 300) - @pytest.mark.parametrize("junctions, region, delta, expexted", + @pytest.mark.parametrize("junctions, region, delta, expected", [([(50, 60), (80, 100), (200, 210)], (1, 1), 3, [0, 1, -1, 0]), ([(50, 60), (80, 100), (200, 210)], (0, 1), 3, [1, 1, -1, 0])]) - def test_profile_for_junctions_introns(self, junctions, region, delta, expexted): + def test_profile_for_junctions_introns(self, junctions, region, delta, expected): assigner = LongReadAssigner(self.gene_info, Params(delta)) profile = assigner.intron_comparator.profile_for_junctions_introns(junctions, region) - assert expexted == profile.gene_profile + assert profile.gene_profile == expected - @pytest.mark.parametrize("junctions, region, delta, expexted", + @pytest.mark.parametrize("junctions, region, delta, expected", [([(50, 60), (82, 100), (200, 210)], (1, 1), 3, True), ([(48, 61), (80, 98), (200, 210)], (0, 1), 3, True), ([(48, 61), (80, 98), (160, 220)], (0, 1), 3, True), ([(48, 66), (80, 99), (200, 210)], (0, 1), 3, False)]) - def test_profile_for_junctions_introns(self, junctions, region, delta, expexted): + def test_profile_for_junctions_introns(self, junctions, region, delta, expected): assigner = LongReadAssigner(self.gene_info, Params(delta)) - assert expexted == assigner.intron_comparator.are_known_introns(junctions, region) + assert assigner.intron_comparator.are_known_introns(junctions, region) == expected - @pytest.mark.parametrize("read_intron_read_profile, read_region, read_introns, delta, expexted", + @pytest.mark.parametrize("read_intron_read_profile, read_region, read_introns, delta, expected", [([0, 1, 1], (0, 300), [(30, 60), (82, 100), (200, 210)], 3, MatchEventSubtype.extra_intron_flanking_left), ([0, 1, 1], (0, 300), [(5, 60), (82, 100), (200, 210)], 3, MatchEventSubtype.fake_terminal_exon_left), ([1, 1, 0], (0, 300), [(50, 60), (82, 100), (200, 210)], 3, MatchEventSubtype.extra_intron_flanking_right), ([1, 1, 0], (0, 300), [(50, 60), (82, 100), (200, 292)], 3, MatchEventSubtype.fake_terminal_exon_right)]) - def test_add_extra_out_exon_events(self, read_intron_read_profile, read_region, read_introns, delta, expexted): + def test_add_extra_out_exon_events(self, read_intron_read_profile, read_region, read_introns, delta, expected): assigner = LongReadAssigner(self.gene_info, Params(delta)) assigner.params.max_fake_terminal_exon_len = 10 events = [] assigner.intron_comparator.add_extra_out_exon_events(events, read_intron_read_profile, read_region, read_introns, 100) - assert expexted == events[0].event_type + assert events[0].event_type == expected - @pytest.mark.parametrize("read_region, junctions, region, delta, expexted", + @pytest.mark.parametrize("read_region, junctions, region, delta, expected", [((0, 300), [(50, 60), (82, 100), (200, 210)], (0, 0), 3, True), ((0, 300), [(48, 61), (80, 98), (200, 210)], (0, 1), 3, True), ((0, 300), [(38, 61), (80, 98), (160, 220)], (0, 0), 3, False), ((50, 300), [(58, 76), (80, 97), (130, 210)], (0, 1), 3, False)]) - def test_profile_for_junctions_introns(self, read_region, junctions, region, delta, expexted): + def test_profile_for_junctions_introns(self, read_region, junctions, region, delta, expected): assigner = LongReadAssigner(self.gene_info, Params(delta)) assigner.params.max_suspicious_intron_abs_len = 20 assigner.params.max_suspicious_intron_rel_len = 0.2 - assert expexted == assigner.intron_comparator.are_suspicious_introns(read_region, junctions, region) + assert assigner.intron_comparator.are_suspicious_introns(read_region, junctions, region) == expected @pytest.mark.parametrize("read_junctions, read_region, isoform_junctions, isoform_region, delta", [([], (20, 200), [], (20, 290), 1)])