Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iupac #32

Merged
merged 4 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed ExampleData/advntr_example.vcf.gz
Binary file not shown.
1 change: 1 addition & 0 deletions ExampleData/command.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
EnsembleTR --vcfs eh_example_smaller.vcf.gz,hipstr_example_smaller.vcf.gz,gangstr_example_smaller.vcf.gz --out ensemble_example.vcf --ref hg38.analysisSet.fa
Binary file removed ExampleData/eh_example.vcf.gz
Binary file not shown.
Binary file added ExampleData/eh_example_smaller.vcf.gz
Binary file not shown.
Binary file removed ExampleData/gangstr_example.vcf.gz
Binary file not shown.
Binary file added ExampleData/gangstr_example_smaller.vcf.gz
Binary file not shown.
Binary file removed ExampleData/hipstr_example.vcf.gz
Binary file not shown.
Binary file added ExampleData/hipstr_example_smaller.vcf.gz
Binary file not shown.
24 changes: 12 additions & 12 deletions ensembletr/recordcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

import trtools.utils.tr_harmonizer as trh
from trtools.utils.utils import GetCanonicalMotif
from collections import defaultdict
from enum import Enum
import networkx as nx
Expand Down Expand Up @@ -39,7 +38,7 @@ def __init__(self, rec, vcf_type, vcf_samples):
self.pos = self.hm_record.pos
if vcf_type.name == 'advntr' or vcf_type.name == 'eh':
self.pos += 1 # AdVNTR call is 0-based, should change it to 1-based
self.canonical_motif = GetCanonicalMotif(self.hm_record.motif)
self.motif = self.hm_record.motif
self.prepend_seq = ''
self.append_seq = ''
self.vcf_samples = vcf_samples
Expand Down Expand Up @@ -152,22 +151,22 @@ class RecordCluster:
list of record objects to be merged
ref_genome : pyfaidx.Fasta
reference genome
canon_motif : str
canonical repeat motif
motif : str
repeat motif
samples : list of str
List of samples to analyze

"""
def __init__(self, recobjs, ref_genome, canon_motif, samples):
self.canonical_motif = canon_motif
def __init__(self, recobjs, ref_genome, motif, samples):
self.motif = motif
self.vcf_types = [False] * len(convert_type_to_idx.keys())
self.samples = samples
self.fasta = ref_genome
self.record_objs = recobjs
self.first_pos = -1
self.last_pos = -1
self.chrom = recobjs[0].cyvcf2_record.CHROM
self.update()
self.update(motif)
self.hipstr_allele_frequency = {}
for ro in self.record_objs:
if ro.vcf_type.name == "hipstr":
Expand All @@ -183,7 +182,7 @@ def GetHipSTR_freqs(self, ro):
freqs[call[1]] += 1
return freqs

def AppendRecordObject(self, ro):
def AppendRecordObject(self, ro, mutual_motif):
r"""
Add a record object to the RecordCluster
and update associated metadata
Expand All @@ -194,9 +193,9 @@ def AppendRecordObject(self, ro):
the Record Object to be added
"""
self.record_objs.append(ro)
self.update()
self.update(mutual_motif)

def update(self):
def update(self, mutual_motif):
r"""
Update prepend/append sequences
of individual record objects so they
Expand All @@ -206,6 +205,7 @@ def update(self):
"""
self.first_pos = min([rec.pos for rec in self.record_objs])
self.last_end = max([rec.cyvcf2_record.end for rec in self.record_objs])
self.motif = mutual_motif.upper()

print(
f"Analysing record cluster ranged in {self.record_objs[0].cyvcf2_record.CHROM}:{self.first_pos}-{self.last_end}.")
Expand Down Expand Up @@ -291,10 +291,10 @@ class OverlappingRegion:
def __init__(self, rcs):
self.RecordClusters = rcs

def GetCanonicalMotifs(self):
def GetMotifs(self):
ret = []
for rc in self.RecordClusters:
ret.append(rc.canonical_motif)
ret.append(rc.motif)
return ret

class Allele:
Expand Down
26 changes: 23 additions & 3 deletions ensembletr/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
from .. import utils
import pytest
from ensembletr import utils

def test_EHScore():
score = utils.GetEHScore("14-14/14-14", "14/14", 2)
assert(score == 1)
assert(score == 1)

def test_motif_equality():
assert(utils.MotifEquality("AGRG", 'AGGG') == True)
assert (utils.MotifEquality("AGG", 'GGR') == True)
assert (utils.MotifEquality("GBG", 'GGG') == True)
assert (utils.MotifEquality("GHG", 'GGG') == False)
assert (utils.MotifEquality("NHG", 'GAC') == True)
assert (utils.MotifEquality("NHG", 'HAC') == True)
assert (utils.MotifEquality("MHG", 'AGG') == False)

def test_all_possible_seqs():
assert(utils.GetAllPossibleSequences("R") == ['A', 'G'])
assert (utils.GetAllPossibleSequences("M") == ['A', 'C'])
assert (utils.GetAllPossibleSequences("TM") == ['TA', 'TC'])
assert (sorted(utils.GetAllPossibleSequences("BG")) == sorted(['CG', 'GG','TG']))
assert (len(utils.GetAllPossibleSequences("BGRR")) == 12)
assert (len(utils.GetAllPossibleSequences("BGBB")) == 27)
assert ('CGGA' in utils.GetAllPossibleSequences("BGDH"))
assert (utils.GetAllPossibleSequences('AGTT') == ['AGTT'])

test_all_possible_seqs()
67 changes: 67 additions & 0 deletions ensembletr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
"""

import math
import trtools.utils.utils as utils

IUPAC_map_dict = {'R': ['A', 'G'], 'Y': ['C', 'T'], 'S': ['G', 'C'],
'W': ['A', 'T'], 'K': ['G', 'T'], 'M': ['A', 'C'], 'B': ['G', 'C', 'T'],
'D': ['A', 'G', 'T'], 'H': ['A', 'C', 'T'], 'V': ['A', 'C', 'G'],
'N': ['A', 'C', 'G', 'T']}

def GetEHScore(conf_invs, CNs, ru_len):
r"""
Expand Down Expand Up @@ -57,3 +63,64 @@ def CalcEHAlleleScore(conf_inv, allele):
if allele == 0:
return 1/math.exp(4 * (dist))
return 1/math.exp(4 * (dist) / int(allele))


def MotifEquality(motif_1, motif_2):
r"""
Compute if two motifs are equal to each other, considering IUPAC characters
This code computes the list of all possible sequences that each motif can be
then returns True if there is any overlap between the set of sequences for motif_1
and motif_2.

Parameters
----------
motif_1 : str
first motif
motif_2 : str
second motif

Returns
-------
Equality : bool, mutual motif
"""
if len(motif_1) != len(motif_2):
return False,None
potential_sequences_1 = [utils.GetCanonicalMotif(motif) for motif in GetAllPossibleSequences(motif_1)]
potential_sequences_2 = [utils.GetCanonicalMotif(motif) for motif in GetAllPossibleSequences(motif_2)]
Comment on lines +88 to +89
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be slightly faster to use sets instead of lists?

Suggested change
potential_sequences_1 = [utils.GetCanonicalMotif(motif) for motif in GetAllPossibleSequences(motif_1)]
potential_sequences_2 = [utils.GetCanonicalMotif(motif) for motif in GetAllPossibleSequences(motif_2)]
potential_sequences_1 = {utils.GetCanonicalMotif(motif) for motif in GetAllPossibleSequences(motif_1)}
potential_sequences_2 = {utils.GetCanonicalMotif(motif) for motif in GetAllPossibleSequences(motif_2)}

for seq1 in potential_sequences_1:
if seq1 in potential_sequences_2:
return True, seq1
return False, None


def GetAllPossibleSequences(motif):
r"""
Computing all the possible sequences that a motif can be considering IUPAC characters.
For example, a motif with sequence RGG can be both AGG and GGG. Divide and conquer method is
used to form all possible sequences.

Parameters
----------
motif : str
motif

Returns
-------
All possible sequences : list of str
"""
possible_seqs = []
if len(motif) < 1:
return []
elif len(motif) == 1:
if motif[0] in IUPAC_map_dict:
return IUPAC_map_dict[motif[0]]
else:
return motif[0]
else:
first_part = GetAllPossibleSequences(motif[0:int(len(motif)/2)])
second_part = GetAllPossibleSequences(motif[int(len(motif)/2):])
for seq1 in first_part:
for seq2 in second_part:
possible_seqs.append(seq1 + seq2)
return possible_seqs
Comment on lines +96 to +125
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you memoize this function to get a speedup? or would that require too much memory?

Suggested change
def GetAllPossibleSequences(motif):
r"""
Computing all the possible sequences that a motif can be considering IUPAC characters.
For example, a motif with sequence RGG can be both AGG and GGG. Divide and conquer method is
used to form all possible sequences.
Parameters
----------
motif : str
motif
Returns
-------
All possible sequences : list of str
"""
possible_seqs = []
if len(motif) < 1:
return []
elif len(motif) == 1:
if motif[0] in IUPAC_map_dict:
return IUPAC_map_dict[motif[0]]
else:
return motif[0]
else:
first_part = GetAllPossibleSequences(motif[0:int(len(motif)/2)])
second_part = GetAllPossibleSequences(motif[int(len(motif)/2):])
for seq1 in first_part:
for seq2 in second_part:
possible_seqs.append(seq1 + seq2)
return possible_seqs
@lru_cache(maxsize=None)
def GetAllPossibleSequences(motif):
r"""
Computing all the possible sequences that a motif can be considering IUPAC characters.
For example, a motif with sequence RGG can be both AGG and GGG. Divide and conquer method is
used to form all possible sequences.
Parameters
----------
motif : str
motif
Returns
-------
All possible sequences : set of str
"""
if len(motif) == 1:
return IUPAC_map_dict.get(motif, [motif])
first_part = GetAllPossibleSequences(motif[:len(motif)//2])
second_part = GetAllPossibleSequences(motif[len(motif)//2:])
return {f1 + f2 for f1 in first_part for f2 in second_part}

you might need to add this at the top:

from functools import lru_cache


16 changes: 9 additions & 7 deletions ensembletr/vcfio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import trtools.utils.common as common
import trtools.utils.utils as utils
from . import utils as ensembleutils
import trtools.utils.mergeutils as mergeutils
import trtools.utils.tr_harmonizer as trh
import cyvcf2
Expand Down Expand Up @@ -215,7 +216,7 @@ def getOverlapMinRecords(self):
def getMergableCalls(self):
r"""
Determine which calls are mergeable
Make one list of record clusters for each canonical motif
Make one list of record clusters for each motif

Returns
-------
Expand All @@ -227,15 +228,16 @@ def getMergableCalls(self):
if self.is_overlap_min[i] and self.current_tr_records[i] is not None:
curr_ro = recordcluster.RecordObj(self.current_tr_records[i].vcfrecord, self.vcfwrappers[i].vcftype,
self.samples_list[i])
canon_motif = utils.GetCanonicalMotif(curr_ro.hm_record.motif)
motif = curr_ro.hm_record.motif
added = False
for rc in record_cluster_list:
if rc.canonical_motif == canon_motif:
rc.AppendRecordObject(curr_ro)
equal, mutual_motif = ensembleutils.MotifEquality(motif, rc.motif)
if equal:
rc.AppendRecordObject(curr_ro, mutual_motif)
added = True
if not added:
record_cluster_list.append(recordcluster.RecordCluster([curr_ro], self.ref_genome,
canon_motif, self.samples))
motif, self.samples))
ov_region = recordcluster.OverlappingRegion(record_cluster_list)
return ov_region

Expand Down Expand Up @@ -342,8 +344,8 @@ def WriteRecord(self, rcres):
FILTER = "."
INFO_DICT = {'START': rcres.record_cluster.first_pos,
'END': rcres.record_cluster.first_pos + len(REF) - 1,
'PERIOD': len(rcres.record_cluster.canonical_motif),
'RU': rcres.record_cluster.canonical_motif,
'PERIOD': len(rcres.record_cluster.motif),
'RU': rcres.record_cluster.motif,
'METHODS': "|".join([str(int(item)) for item in rcres.record_cluster.vcf_types])}
INFO = ";".join(["%s=%s"%(key, INFO_DICT[key]) for key in INFO_DICT])
FORMAT = ['GT','GB', 'NCOPY','EXP','SCORE','GTS','ALS','INPUTS']
Expand Down
Loading