From a8fe91f13a98160c8f151a557295f805d69301b0 Mon Sep 17 00:00:00 2001 From: Jeremy Arbesfeld Date: Mon, 8 Apr 2024 13:33:09 -0400 Subject: [PATCH] Work in progress, refactor part 3 --- src/dcd_mapping/lookup.py | 9 +- src/dcd_mapping/new_vrs_map.py | 373 +++++++++++++++++++++++++++++++++ src/dcd_mapping/resources.py | 22 +- src/dcd_mapping/schemas.py | 6 +- src/dcd_mapping/transcripts.py | 23 +- src/dcd_mapping/vrs_map.py | 13 +- 6 files changed, 421 insertions(+), 25 deletions(-) create mode 100644 src/dcd_mapping/new_vrs_map.py diff --git a/src/dcd_mapping/lookup.py b/src/dcd_mapping/lookup.py index 26df3bc..f50bf79 100644 --- a/src/dcd_mapping/lookup.py +++ b/src/dcd_mapping/lookup.py @@ -12,6 +12,7 @@ import polars as pl import requests +from biocommons.seqrepo import SeqRepo from cool_seq_tool.app import CoolSeqTool from cool_seq_tool.handlers.seqrepo_access import SeqRepoAccess from cool_seq_tool.schemas import TranscriptPriority @@ -55,7 +56,8 @@ def __new__(cls) -> CoolSeqTool: :return: singleton instance of CoolSeqTool """ if not hasattr(cls, "instance"): - cls.instance = CoolSeqTool() + sr = SeqRepo("/usr/local/share/seqrepo/latest", writeable = True) + cls.instance = CoolSeqTool(sr=sr) return cls.instance @@ -367,14 +369,13 @@ def get_sequence( # -------------------------------- VRS-Python -------------------------------- # -def translate_hgvs_to_vrs(hgvs: str, data_proxy: SeqRepoDataProxy) -> Allele: +def translate_hgvs_to_vrs(hgvs: str) -> Allele: """Convert HGVS variation description to VRS object. :param hgvs: MAVE-HGVS variation string - :param data_proxy: :return: Corresponding VRS allele as a Pydantic class """ - tr = TranslatorBuilder(data_proxy) + tr = TranslatorBuilder(CoolSeqToolBuilder().seqrepo_access) allele = tr.translate_from(hgvs, "hgvs") if ( diff --git a/src/dcd_mapping/new_vrs_map.py b/src/dcd_mapping/new_vrs_map.py new file mode 100644 index 0000000..f1b507d --- /dev/null +++ b/src/dcd_mapping/new_vrs_map.py @@ -0,0 +1,373 @@ +"""Map transcripts to VRS objects. + +Outstanding tasks/questions: +--------------------------- +* Make sure typed digests vs full IDs vs plain digests are all being used correctly +* What is ``vrs_ref_allele_seq`` in output? +* Add basic transcript description where available +* Add HGVS expressions to alleles where available +""" +import functools +import logging +from typing import Dict, List, Optional, Union + +import click +from Bio.Seq import Seq +from cool_seq_tool.schemas import AnnotationLayer, Strand +from ga4gh.core import ga4gh_identify, sha512t24u +from ga4gh.vrs._internal.models import ( + Allele, + Haplotype, +) +from ga4gh.vrs.dataproxy import SeqRepoDataProxy +from ga4gh.vrs.normalize import normalize + +from dcd_mapping.lookup import ( + get_chromosome_identifier, + get_seqrepo, + translate_hgvs_to_vrs, +) +from dcd_mapping.schemas import ( + AlignmentResult, + ScoreRow, + ScoresetMetadata, + TargetSequenceType, + TargetType, + TxSelectResult, + VrsMapping, + VrsMappingResult, +) + +__all__ = ["vrs_map"] + + +_logger = logging.getLogger(__name__) + + +class VrsMapError(Exception): + """Raise in case of VRS mapping errors.""" + +def _create_hgvs_strings( + alignment: AlignmentResult, raw_description: str, layer: AnnotationLayer +) -> List[str]: + chrom_ac = get_chromosome_identifier(alignment.chrom) + if '[' in raw_description: + descr_list = list(set(raw_description[1:-1].split(";"))) + hgvs_strings = [f"{chrom_ac}:{layer.value}.{d}" for d in descr_list] + else: + descr_list = [raw_description] + hgvs_strings = [f"{chrom_ac}:{d}" for d in descr_list] + return hgvs_strings + +def _map_protein_coding_pro( + row: ScoreRow, + score: float, + align_result: AlignmentResult, + sequence: str, + sequence_id: str, + transcript: TxSelectResult, +) -> Optional[VrsMapping]: + """Construct VRS object mapping for ``hgvs_pro`` variant column entry + + These arguments are a little lazy and could be pruned down later + + :param row: + :param score: + :param align_result: + :param sequence: + :param sequence_id: + :param transcript: + :return: VRS mapping object if mapping succeeds + """ + if row.hgvs_pro in {"_wt", "_sy", "p.="}: + _logger.warning( + f"Can't process Enrich2-style variant syntax {row.hgvs_nt} for {row.accession}" + ) + return None + if row.hgvs_pro.startswith("NP_009225.1:p."): # This is for experiment set 97 + vrs_variation = translate_hgvs_to_vrs(row.hgvs_pro) + return VrsMapping( + mavedb_id=row.accession, + pre_mapped=vrs_variation, + post_mapped=vrs_variation, + score=score, + ) + layer = AnnotationLayer.PROTEIN + hgvs_strings = _create_hgvs_strings(align_result, row.hgvs_pro, layer) + mapping = VrsMapping( + mavedb_id=row.accession, + score=score, + pre_mapped=_get_variation( + hgvs_strings, + layer, + sequence_id, + sequence, + align_result, + True, + ), + post_mapped=_get_variation( + hgvs_strings, + layer, + sequence_id, + sequence, + align_result, + False, + transcript.start, + ), + ) + return mapping + +def _get_allele_sequence(allele: Allele) -> str: + """Get sequence for Allele + + :param allele: VRS allele + :return: sequence + """ + sr = get_seqrepo() + start = allele.location.start # type: ignore + end = allele.location.end # type: ignore + base = sr.sr[allele.location.sequenceReference.refgetAccession] # type: ignore + selection = base[start:end] + return selection + + +def _map_protein_coding( + metadata: ScoresetMetadata, + records: List[ScoreRow], + transcript: TxSelectResult, + align_result: AlignmentResult, +) -> VrsMappingResult: + """Perform mapping on protein coding experiment results + + :param metadata: + :param records: + :param align_results: + """ + variations = VrsMappingResult(variations=[]) + if metadata.target_sequence_type == TargetSequenceType.DNA: + sequence = str(Seq(metadata.target_sequence).translate(table="1", stop_symbol="")) + else: + sequence = metadata.target_sequence + + sequence_id = f"ga4gh:SQ.{sha512t24u(sequence.encode('ascii'))}" + alias_dict_list = [{'namespace': 'ga4gh', 'alias': sequence_id}] + get_seqrepo().sr.store(sequence, nsaliases = alias_dict_list) # Add custom digest to SeqRepo + + for row in records: + score = row.score + + # hgvs_pro + hgvs_pro_mappings = _map_protein_coding_pro( + row, + score, + align_result, + sequence, + sequence_id, + transcript, + ) + if hgvs_pro_mappings: + variations.variations.append(hgvs_pro_mappings) + if row.hgvs_nt == "NA": + continue + else: + layer = AnnotationLayer.GENOMIC + hgvs_strings = _create_hgvs_strings(align_result, row.hgvs_nt[2:], layer) + variations.variations.append( + VrsMapping( + mavedb_id=row.accession, + score=score, + pre_mapped=_get_variation( + hgvs_strings, + layer, + sequence_id, + sequence, + align_result, + True, + ), + post_mapped=_get_variation( + hgvs_strings, + layer, + sequence_id, + sequence, + align_result, + False, + ), + ) + ) + return variations + +def _map_regulatory_noncoding( + metadata: ScoresetMetadata, + records: List[ScoreRow], + align_result: AlignmentResult, +) -> VrsMappingResult: + """Perform mapping on noncoding/regulatory experiment results + + :param metadata: metadata for URN + :param records: list of MAVE experiment result rows + :param align_result: + :return: TODO + """ + variations = VrsMappingResult(variations=[]) + sequence_id = f"ga4gh:SQ.{sha512t24u(metadata.target_sequence.encode('ascii'))}" + alias_dict_list = [{'namespace': 'ga4gh', 'alias': sequence_id}] + get_seqrepo().sr.store(metadata.target_sequence, nsaliases = alias_dict_list) # Add custom digest to SeqRepo + + for row in records: + if row.hgvs_nt in {"_wt", "_sy"}: + _logger.warning( + f"Can't process Enrich2-style variant syntax {row.hgvs_nt} for {metadata.urn}" + ) + continue + score = row.score + hgvs_strings = _create_hgvs_strings( + align_result, row.hgvs_nt[2:], AnnotationLayer.GENOMIC + ) + pre_map_allele = _get_variation( + hgvs_strings, + AnnotationLayer.GENOMIC, + sequence_id, + metadata.target_sequence, + align_result, + pre_map=True, + offset=0, + ) + post_map_allele = _get_variation( + hgvs_strings, + AnnotationLayer.GENOMIC, + sequence_id, + metadata.target_sequence, + align_result, + pre_map=False, + offset=0, + ) + variations.variations.append( + VrsMapping( + pre_mapped=pre_map_allele, + post_mapped=post_map_allele, + mavedb_id=row.accession, + score=score, + ) + ) + return variations + +def vrs_map( + metadata: ScoresetMetadata, + align_result: AlignmentResult, + transcript: Optional[TxSelectResult], + records: List[ScoreRow], + silent: bool = True, +) -> Optional[VrsMappingResult]: + """Given a description of a MAVE scoreset and an aligned transcript, generate + the corresponding VRS objects. + + :param metadata: salient MAVE scoreset metadata + :param transcript: output of transcript selection process + :param records: scoreset records + :param silent: + :return: TODO + """ + msg = f"Mapping {metadata.urn} to VRS..." + if not silent: + click.echo(msg) + _logger.info(msg) + if metadata.target_gene_category == TargetType.PROTEIN_CODING and transcript: + return _map_protein_coding(metadata, records, transcript, align_result) + else: + return _map_regulatory_noncoding(metadata, records, align_result) + +def _get_variation( + hgvs_strings: List[str], + layer: AnnotationLayer, + sequence_id: str, + sequence: str, + alignment: AlignmentResult, + pre_map: bool, + offset: int = 0, +) -> Union[Allele, List[Allele]]: + """Create variation (haplotype/allele). + + Outstanding questions: + --------------------- + * Make a class to shadow the seqrepo data proxy that handles custom sequences + without adding them to the system seqrepo. As it currently stands, I wouldn't + expect this code to complete successfully. + * Do we really need to go through an HGVS string/the VRS translator? Can't we just + build the object ourselves? + * trim duplicate code + * simply args + + :param hgvs_strings: + :param layer: annotation layer + :param sequence_id: target sequence digest eg ``"ga4gh:SQ.SQ.jUOcLPDjSqWFEo9kSOG8ITe1dr9QK3h6"`` + :param sequence: target sequence + :param alignment: + :param pre_map: if True, return object for pre mapping stage. Otherwise return for + post-mapping. + :param offset: + :return: + """ + if sequence_id.startswith("ga4gh:"): + sequence_id = sequence_id[6:] + alleles = [] + sequence_store = get_seqrepo() + for hgvs_string in hgvs_strings: + allele = translate_hgvs_to_vrs(hgvs_string) + if "dup" in hgvs_string: + allele.state.sequence = 2 * _get_allele_sequence(allele) # type: ignore + if pre_map: + allele.location.sequenceReference.refgetAccession = sequence_id # type: ignore + else: + if layer == AnnotationLayer.PROTEIN: + allele.location.start += offset # type: ignore + allele.location.end += offset # type: ignore + else: + start: int = allele.location.start # type: ignore + if ( + len(alignment.query_subranges) == 1 + and alignment.strand == Strand.POSITIVE + ): + subrange_start = alignment.query_subranges[0].start + diff = start - subrange_start + diff2: int = allele.location.end - start # type: ignore + allele.location.start = subrange_start + diff + allele.location.end = allele.location.start + diff2 # type: ignore + else: + for query_subrange, hit_subrange in zip( + alignment.query_subranges, alignment.hit_subranges + ): + if start >= query_subrange.start and start < query_subrange.end: + query_subrange_start = query_subrange.start + hit_subrange_start = hit_subrange.start + break + else: + raise ValueError( + "Could not find hit subrange compatible with allele position" + ) + diff = start - query_subrange_start + diff2: int = allele.location.end - start # type: ignore + if alignment.strand == Strand.POSITIVE: # positive orientation + allele.location.start = hit_subrange_start + diff + allele.location.end = allele.location.start + diff2 # type: ignore + if "dup" in hgvs_string: + allele.state.sequence = 2 * _get_allele_sequence(allele) # type: ignore + else: + allele.location.start = hit_subrange_start - diff - diff2 + allele.location.end = allele.location.start + diff2 # type: ignore + if "dup" in hgvs_string: + allele.state.sequence = 2 * _get_allele_sequence(allele) # type: ignore + allele.state.sequence = str( + Seq(str(allele.state.sequence)).reverse_complement() + ) + if allele.state.sequence == "N" and layer != AnnotationLayer.PROTEIN: + allele.state.sequence = _get_allele_sequence(allele) # type: ignore + if '=' in hgvs_string and layer == AnnotationLayer.PROTEIN: + allele.state.sequence = _get_allele_sequence(allele) + allele = normalize(allele, data_proxy=sequence_store) + allele.id = ga4gh_identify(allele) + alleles.append(allele) + + if len(alleles) == 1: + return alleles[0] + else: + return alleles \ No newline at end of file diff --git a/src/dcd_mapping/resources.py b/src/dcd_mapping/resources.py index 389031e..18805de 100644 --- a/src/dcd_mapping/resources.py +++ b/src/dcd_mapping/resources.py @@ -196,6 +196,7 @@ def get_scoreset_metadata(scoreset_urn: str) -> ScoresetMetadata: :raise ResourceAcquisitionError: if unable to acquire metadata """ metadata = get_raw_scoreset_metadata(scoreset_urn) + if not _metadata_response_is_human(metadata): raise ResourceAcquisitionError( f"Experiment for {scoreset_urn} contains no human targets" @@ -236,15 +237,18 @@ def get_scoreset_records(scoreset_urn: str, silent: bool = True) -> List[ScoreRo changes and scores :raise ResourceAcquisitionError: if unable to fetch file """ - scores_csv = LOCAL_STORE_PATH / f"{scoreset_urn}_scores.csv" - # TODO use smarter/more flexible caching methods - if not scores_csv.exists(): - url = f"https://api.mavedb.org/api/v1/score-sets/{scoreset_urn}/scores" - try: - _http_download(url, scores_csv, silent) - except requests.HTTPError: - _logger.error(f"HTTPError when fetching scores CSV from {url}") - raise ResourceAcquisitionError(f"Scores CSV for scoreset {scoreset_urn}") + if scoreset_urn == "urn:mavedb:00000053-a-1": + scores_csv = "analysis_files/00000053-a-1.scores.csv" + else: + scores_csv = LOCAL_STORE_PATH / f"{scoreset_urn}_scores.csv" + # TODO use smarter/more flexible caching methods + if not scores_csv.exists(): + url = f"https://api.mavedb.org/api/v1/score-sets/{scoreset_urn}/scores" + try: + _http_download(url, scores_csv, silent) + except requests.HTTPError: + _logger.error(f"HTTPError when fetching scores CSV from {url}") + raise ResourceAcquisitionError(f"Scores CSV for scoreset {scoreset_urn}") scores_data: List[ScoreRow] = [] with open(scores_csv, "r") as csvfile: diff --git a/src/dcd_mapping/schemas.py b/src/dcd_mapping/schemas.py index d9fba2b..10fce46 100644 --- a/src/dcd_mapping/schemas.py +++ b/src/dcd_mapping/schemas.py @@ -128,10 +128,10 @@ class VrsMapping(BaseModel): """Define pre-post mapping pair structure for VRS-structured variations.""" mavedb_id: StrictStr - pre_mapped: Union[Allele, Haplotype] - post_mapped: Union[Allele, Haplotype] + pre_mapped: Union[Allele, List[Allele]] + post_mapped: Union[Allele, List[Allele]] mapped_transcript: Optional[TranscriptDescription] = None - score: StrictFloat + score: Union[StrictFloat,str] # relation: Literal["SO:is_homologous_to"] = "SO:is_homologous_to" diff --git a/src/dcd_mapping/transcripts.py b/src/dcd_mapping/transcripts.py index 37bedb7..c138c4c 100644 --- a/src/dcd_mapping/transcripts.py +++ b/src/dcd_mapping/transcripts.py @@ -267,6 +267,10 @@ def _offset_target_sequence(metadata: ScoresetMetadata, records: List[ScoreRow]) protein_sequence = _get_protein_sequence(metadata.target_sequence) offset = 0 + + if protein_sequence in seq: + return offset + for i, base in enumerate(protein_sequence): if all( [ @@ -284,6 +288,7 @@ def _offset_target_sequence(metadata: ScoresetMetadata, records: List[ScoreRow]) offset = 0 else: offset = i + break return offset @@ -309,9 +314,15 @@ async def select_transcript( click.echo(msg) _logger.info(msg) - if metadata.urn in {"urn:mavedb:00000053-a-1", "urn:mavedb:00000053-a-2"}: - # target sequence for these scoresets is missing codon - return None + if metadata.urn.startswith("urn:mavedb:00000097"): + # Score Sets in Experiment 97 are expressed in full HGVS strings, + # so additional mapping is not needed + return TxSelectResult(nm="NM_007294.3", + np="NP_009225.1", + start=0, + is_full_match=False, + transcript_mode=TranscriptPriority.MANE_SELECT, + sequence=_get_protein_sequence(metadata.target_sequence)) if metadata.target_gene_category == TargetType.PROTEIN_CODING: transcript_reference = await _select_protein_reference(metadata, align_result) @@ -326,6 +337,12 @@ async def select_transcript( # can't provide transcripts for regulatory/noncoding scoresets return None + if (metadata.urn.startswith("urn:mavedb:00000047") + or metadata.urn.startswith("urn:mavedb:00000048")): + # Set start = 0 as there is discordance between expected and actual + # amino acid locations + transcript_reference.start = 0 + msg = "Reference selection complete." if not silent: click.echo(msg) diff --git a/src/dcd_mapping/vrs_map.py b/src/dcd_mapping/vrs_map.py index 4408343..48967ad 100644 --- a/src/dcd_mapping/vrs_map.py +++ b/src/dcd_mapping/vrs_map.py @@ -222,7 +222,6 @@ def _map_protein_coding( row.accession, ) continue - # hgvs_pro hgvs_pro_mappings = _map_protein_coding_pro( row, @@ -238,12 +237,12 @@ def _map_protein_coding( if row.hgvs_nt == "NA": continue # TODO is it correct to be skipping this? can we project down? - if "97" in metadata.urn: + if metadata.urn.startswith("urn:mavedb:00000097"): _logger.warning(f"Skipping hgvs_nt for {metadata.urn}") continue # TODO more information about this else: layer = AnnotationLayer.GENOMIC - hgvs_strings = _create_hgvs_strings(align_result, row.hgvs_nt, layer) + hgvs_strings = _create_hgvs_strings(align_result, row.hgvs_nt[2:], layer) variations.variations.append( VrsMapping( mavedb_id=row.accession, @@ -275,10 +274,10 @@ def _create_hgvs_strings( chrom_ac = get_chromosome_identifier(alignment.chrom) if chr(91) in raw_description: descr_list = list(set(raw_description[1:-1].split(";"))) + hgvs_strings = [f"{chrom_ac}:{layer.value}.{d}" for d in descr_list] else: descr_list = [raw_description] - hgvs_strings = [f"{chrom_ac}:{d}" for d in descr_list] - # hgvs_strings = [f"{chrom_ac}:{layer.value}.{d}" for d in descr_list] + hgvs_strings = [f"{chrom_ac}:{d}" for d in descr_list] return hgvs_strings @@ -333,7 +332,7 @@ def _get_variation( sequence_store = SequenceStore() sequence_store.local_sequences[sequence_id] = sequence for hgvs_string in hgvs_strings: - allele = translate_hgvs_to_vrs(hgvs_string, sequence_store) + allele = translate_hgvs_to_vrs(hgvs_string) if "dup" in hgvs_string: allele.state.sequence = 2 * _get_allele_sequence(allele) # type: ignore if pre_map: @@ -378,6 +377,8 @@ def _get_variation( ) if allele.state.sequence == "N" and layer != AnnotationLayer.PROTEIN: allele.state.sequence = _get_allele_sequence(allele) # type: ignore + if '=' in hgvs_string and layer == AnnotationLayer.PROTEIN: + allele.state.sequence = _get_allele_sequence(allele) allele = normalize(allele, data_proxy=sequence_store) allele.id = ga4gh_identify(allele) alleles.append(allele)