diff --git a/.gitignore b/.gitignore
index ae6e8d1..dbbbf78 100644
--- a/.gitignore
+++ b/.gitignore
@@ -160,4 +160,7 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
 
-*.pickle
\ No newline at end of file
+*.pickle
+
+# BLAT query file directory
+mavedb_mapping/tmp/*
diff --git a/mavedb_mapping/align.py b/mavedb_mapping/align.py
index 791be4d..09b1bdd 100644
--- a/mavedb_mapping/align.py
+++ b/mavedb_mapping/align.py
@@ -1 +1,127 @@
-"""Align MaveDB target sequences to the human genome."""
+"""Align MaveDB target sequences to a human reference genome."""
+import subprocess
+from pathlib import Path
+from typing import List, Optional
+
+from Bio.SearchIO import read as read_blat
+from Bio.SearchIO._model import QueryResult
+from Bio.SearchIO._model.hsp import HSP
+
+from mavedb_mapping.resources import get_mapping_tmp_dir, get_ref_genome_file
+from mavedb_mapping.schemas import (
+    ScoresetMetadata,
+    TargetSequenceType,
+)
+
+
+class AlignmentError(Exception):
+    """Raise when errors are encountered during alignment."""
+
+
+def _build_query_file(scoreset_metadata: ScoresetMetadata) -> Path:
+    """Construct BLAT query file.
+
+    :param scoreset_metadata: MaveDB scoreset metadata object
+    :return: path to constructed file
+    """
+    query_file = get_mapping_tmp_dir() / "blat_query.fa"
+    with open(query_file, "w") as f:
+        f.write(">query\n")
+        f.write(scoreset_metadata.target_sequence + "\n")
+    return query_file
+
+
+def _run_blat(scoreset_metadata: ScoresetMetadata) -> QueryResult:
+    """Run a BLAT query and return the output.
+
+    How to handle the BLAT binary is a big TODO. There are some newish Python packages
+    that wrap it/provide bindings -- might be best to look there.
+
+    :param scoreset_metadata: object containing scoreset attributes
+    :return: BLAT query result
+    :raise AlignmentError: if BLAT subprocess returns error code
+    """
+    query_file = _build_query_file(scoreset_metadata)
+    reference_genome_file = get_ref_genome_file()
+    min_score = len(scoreset_metadata.target_sequence) // 2  # minimum match 50%
+    out_file = get_mapping_tmp_dir() / "blat_out.psl"
+
+    if scoreset_metadata.target_sequence_type == TargetSequenceType.PROTEIN:
+        command = f"blat {reference_genome_file} -q=prot -t=dnax -minScore={min_score} {query_file.absolute()} {out_file.absolute()}"
+    else:
+        # missing `-t=dnax`?
+        command = f"blat {reference_genome_file} -minScore={min_score} {query_file.absolute()} {out_file.absolute()}"
+    process = subprocess.run(
+        command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
+    )
+    if process.returncode != 0:
+        raise AlignmentError(
+            f"BLAT process returned error code {process.returncode}: {command}"
+        )
+
+    # seems like maybe this hits an error resolved by adding -q=dnax sometimes?
+    # investigate, refer to older code if it comes up
+    output = read_blat(str(out_file.absolute()), "blat-psl")
+    return output
+
+
+def _get_hit_starts(output: QueryResult, hit_index: int) -> List[int]:
+    """Get start coordinates of each HSP for a given hit.
+
+    :param output: BLAT result object
+    :param hit_index: index of the hit to examine
+    :return: start coordinates of the hit's HSPs
+    """
+    return [hsp.hit_start for hsp in output[hit_index]]
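+
+
+# NOTE: _get_hsp below picks a single "best" alignment: the hit containing
+# the highest-scoring HSP, then the HSP within that hit whose start
+# coordinate is largest. Restated here for clarity; see the refactoring note
+# in its docstring.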
+
+
+def _get_hsp(output: QueryResult) -> HSP:
+    """Obtain the top high-scoring pair (HSP) for the query sequence.
+
+    (I am pretty sure this should be refactored)
+
+    :param output: BLAT result object
+    :return: highest-scoring HSP
+    """
+    hit_dict = {}
+    for c in range(len(output)):
+        max_hit = output[c][0].score
+        for e in range(len(output[c])):
+            if output[c][e].score > max_hit:
+                max_hit = output[c][e].score
+        hit_dict[c] = max_hit
+
+    # Using top scoring hit
+    hit = max(hit_dict, key=hit_dict.get)
+    hit_starts = _get_hit_starts(output, hit)
+    hsp = output[hit][hit_starts.index(max(hit_starts))]
+    return hsp
+
+
+def align(scoreset_metadata: ScoresetMetadata) -> Optional[HSP]:
+    """Align target sequence to a reference genome.
+
+    :param scoreset_metadata: object containing scoreset metadata
+    :return: high-scoring pair (HSP) object, if alignment succeeds
+    """
+    try:
+        blat_output = _run_blat(scoreset_metadata)
+    except AlignmentError:
+        return None
+    return _get_hsp(blat_output)
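+
+
+# Usage sketch (the URN below is hypothetical):
+#
+#     from mavedb_mapping.resources import get_scoreset_metadata
+#
+#     metadata = get_scoreset_metadata("urn:mavedb:00000000-a-1")
+#     hsp = align(metadata)
+#     if hsp is not None:
+#         print(hsp.hit_id, hsp.hit_start, hsp.hit_end)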
diff --git a/mavedb_mapping/cache.py b/mavedb_mapping/cache.py
index 2c222fd..05530ba 100644
--- a/mavedb_mapping/cache.py
+++ b/mavedb_mapping/cache.py
@@ -2,6 +2,8 @@
 import os
 from pathlib import Path
-LOCAL_STORE_PATH = Path(os.environ.get("MAVEDB_STORAGE_DIR", "~/.mave_db/storage"))
+LOCAL_STORE_PATH = Path(
+    os.environ.get("MAVEDB_STORAGE_DIR", Path.home() / ".mave_db/storage")
+)
 
 if not LOCAL_STORE_PATH.exists():
     LOCAL_STORE_PATH.mkdir(exist_ok=True, parents=True)
diff --git a/mavedb_mapping/fetch.py b/mavedb_mapping/fetch.py
deleted file mode 100644
index 8d0bdcf..0000000
--- a/mavedb_mapping/fetch.py
+++ /dev/null
@@ -1,127 +0,0 @@
-"""Acquire MaveDB experiment and scoreset metadata."""
-import csv
-import logging
-from pathlib import Path
-from typing import List, Optional, Set
-
-import requests
-from pydantic import ValidationError
-
-from mavedb_mapping.cache import LOCAL_STORE_PATH
-from mavedb_mapping.schemas import ScoreRow, ScoresetMetadata
-
-_logger = logging.getLogger("mavedb_mapping")
-
-
-def get_all_scoreset_urns() -> Set[str]:
-    """Fetch all scoreset URNs. Since species is annotated at the scoreset target level,
-    we can't yet filter on anything like `homo sapien`.
-
-    :return: set of URN strings
-    """
-    r = requests.get("https://api.mavedb.org/api/v1/experiments/")
-    r.raise_for_status()
-    scoreset_urn_lists = [
-        experiment["scoreSetUrns"]
-        for experiment in r.json()
-        if experiment.get("scoreSetUrns")
-    ]
-    return set([urn for urns in scoreset_urn_lists for urn in urns])
-
-
-def get_all_human_scoreset_urns() -> List[str]:
-    """Fetch all human scoreset URNs. Pretty slow, shouldn't be used frequently because
-    it requires requesting every single scoreset.
-
-    :return: list of human scoreset URNs
-    """
-    scoreset_urns = get_all_scoreset_urns()
-    human_scoresets: List[str] = []
-    for urn in scoreset_urns:
-        r = requests.get(f"https://api.mavedb.org/api/v1/score-sets/{urn}")
-        try:
-            r.raise_for_status()
-        except requests.exceptions.HTTPError:
-            _logger.info(f"Unable to retrieve scoreset data for URN {urn}")
-            continue
-        data = r.json()
-
-        ref_maps = data.get("targetGene", {}).get("referenceMaps", [])
-        if ref_maps:
-            for ref_map in ref_maps:
-                if ref_map.get("genome", {}).get("organismName", "") == "Homo sapiens":
-                    human_scoresets.append(urn)
-    return human_scoresets
-
-
-def get_scoreset_metadata(scoreset_urn: str) -> Optional[ScoresetMetadata]:
-    """Acquire metadata for scoreset.
-
-    Runs an HTTP request every time. Ideally, we should allow the option to cache this,
-    for users who want to work from a stable trove of data (e.g. for reproducibility) or
-    to lessen pressure on the MaveDB API.
-
-    :param scoreset_urn: URN for scoreset
-    :return: Object containing salient metadata
-    """
-    r = requests.get(f"https://api.mavedb.org/api/v1/score-sets/{scoreset_urn}")
-    r.raise_for_status()
-    data = r.json()
-    try:
-        structured_data = ScoresetMetadata(
-            urn=data["urn"],
-            target_gene_name=data["targetGene"]["name"],
-            target_gene_category=data["targetGene"]["category"],
-            target_sequence=data["targetGene"]["wtSequence"]["sequence"],
-            target_sequence_type=data["targetGene"]["wtSequence"]["sequenceType"],
-        )
-    except (KeyError, ValidationError):
-        _logger.warning(
-            f"Unable to extract metadata from API response for scoreset {scoreset_urn}"
-        )
-        return None
-    return structured_data
-
-
-def _get_scores_csv(scoreset_urn: str) -> Optional[Path]:
-    """Acquire scoreset CSV file.
-
-    Currently, if a local version is available, we'll use it. In the future, we should
-    think about options to force fetch from MaveDB, or check if the cached version is
-    invalidated.
-
-    :param scoreset_urn: URN for scoreset
-    :return: Path to downloaded CSV if acquisition succeeds
-    """
-    filename = f"{scoreset_urn.replace(':', ' ')}_scores.csv"
-    outfile_path = LOCAL_STORE_PATH / filename
-    if not outfile_path.exists():
-        url = f"https://api.mavedb.org/api/v1/score-sets/{scoreset_urn}/scores"
-        with requests.get(url, stream=True) as r:
-            r.raise_for_status()
-            with open(outfile_path, "wb") as h:
-                for chunk in r.iter_content(chunk_size=8192):
-                    if chunk:
-                        h.write(chunk)
-    return outfile_path
-
-
-def get_scores_data(scoreset_urn: str) -> List[ScoreRow]:
-    """Get scoreset records.
-
-    :param scoreset_urn: URN for scoreset
-    :return: Array of individual ScoreRow objects, containing information like protein
-        changes and scores
-    """
-    scores_csv = _get_scores_csv(scoreset_urn)
-    if not scores_csv:
-        raise Exception(f"Failed to acquire scores CSV for scoreset {scoreset_urn}")
-
-    scores_data: List[ScoreRow] = []
-
-    with open(scores_csv, "r") as csvfile:
-        reader = csv.DictReader(csvfile)
-        for row in reader:
-            scores_data.append(ScoreRow(**row))
-
-    return scores_data
diff --git a/mavedb_mapping/resources.py b/mavedb_mapping/resources.py
new file mode 100644
index 0000000..9f29ef9
--- /dev/null
+++ b/mavedb_mapping/resources.py
@@ -0,0 +1,206 @@
+"""Manage external data resources.
+
+This module is responsible for handling requests for MaveDB data, such as scoresets
+or scoreset metadata. It should also instantiate any external resources needed for
+tasks like transcript selection.
+
+This isn't a priority, but eventually, methods that send data requests to
+remote APIs should first check the local mavedb_mapping cache, and the
+:py:mod:`cache` module should be built out to support cache invalidation, remote
+syncing, etc.
+"""
+import csv
+import logging
+from importlib import resources as impresources
+from pathlib import Path
+from typing import List, Set
+
+import requests
+from pydantic import ValidationError
+from tqdm import tqdm
+
+from mavedb_mapping.cache import LOCAL_STORE_PATH
+from mavedb_mapping.schemas import ScoreRow, ScoresetMetadata
+
+_logger = logging.getLogger("mavedb_mapping")
+
+
+class ResourceAcquisitionError(Exception):
+    """Raise when resource acquisition fails."""
+
+
+def _http_download(url: str, out_path: Path, show_progress: bool = False) -> Path:
+    """Download a file via HTTP.
+
+    :param url: location of file to retrieve
+    :param out_path: location to save file to
+    :param show_progress: show TQDM progress bar if true
+    :return: Path if download successful
+    :raise requests.HTTPError: if request is unsuccessful
+    """
+    with requests.get(url, stream=True) as r:
+        r.raise_for_status()
+        total_size = int(r.headers.get("content-length", 0))
+        with open(out_path, "wb") as h:
+            if show_progress:
+                with tqdm(
+                    total=total_size,
+                    unit="B",
+                    unit_scale=True,
+                    desc=f"Downloading {out_path.name}",
+                    ncols=80,
+                ) as progress_bar:
+                    for chunk in r.iter_content(chunk_size=8192):
+                        if chunk:
+                            h.write(chunk)
+                            progress_bar.update(len(chunk))
+            else:
+                for chunk in r.iter_content(chunk_size=8192):
+                    if chunk:
+                        h.write(chunk)
+    return out_path
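+
+
+# _http_download streams responses in 8 KiB chunks rather than reading whole
+# files into memory; the optional tqdm bar tracks progress against the
+# Content-Length header when the server provides one.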
+
+
+def fetch_all_scoreset_urns() -> Set[str]:
+    """Fetch all scoreset URNs. Since species is annotated at the scoreset target
+    level, we can't yet filter on anything like `Homo sapiens`.
+
+    :return: set of URN strings
+    """
+    r = requests.get("https://api.mavedb.org/api/v1/experiments/")
+    r.raise_for_status()
+    scoreset_urn_lists = [
+        experiment["scoreSetUrns"]
+        for experiment in r.json()
+        if experiment.get("scoreSetUrns")
+    ]
+    return {urn for urns in scoreset_urn_lists for urn in urns}
+
+
+def fetch_all_human_scoreset_urns() -> List[str]:
+    """Fetch all human scoreset URNs. Pretty slow; shouldn't be used frequently,
+    because it requires requesting every single scoreset.
+
+    :return: list of human scoreset URNs
+    """
+    scoreset_urns = fetch_all_scoreset_urns()
+    human_scoresets: List[str] = []
+    for urn in scoreset_urns:
+        r = requests.get(f"https://api.mavedb.org/api/v1/score-sets/{urn}")
+        try:
+            r.raise_for_status()
+        except requests.exceptions.HTTPError:
+            _logger.info(f"Unable to retrieve scoreset data for URN {urn}")
+            continue
+        data = r.json()
+
+        ref_maps = data.get("targetGene", {}).get("referenceMaps", [])
+        if ref_maps:
+            for ref_map in ref_maps:
+                if ref_map.get("genome", {}).get("organismName", "") == "Homo sapiens":
+                    human_scoresets.append(urn)
+    return human_scoresets
+
+
+def get_scoreset_metadata(scoreset_urn: str) -> ScoresetMetadata:
+    """Acquire metadata for scoreset.
+
+    Currently makes an API call every time. In the future, could be cached.
+
+    :param scoreset_urn: URN for scoreset
+    :return: object containing salient metadata
+    :raise ResourceAcquisitionError: if unable to acquire metadata
+    """
+    url = f"https://api.mavedb.org/api/v1/score-sets/{scoreset_urn}"
+    r = requests.get(url)
+    try:
+        r.raise_for_status()
+    except requests.HTTPError:
+        _logger.error(f"Received HTTPError from {url}")
+        raise ResourceAcquisitionError(f"Metadata for scoreset {scoreset_urn}")
+    metadata = r.json()
+    try:
+        structured_data = ScoresetMetadata(
+            urn=metadata["urn"],
+            target_gene_name=metadata["targetGene"]["name"],
+            target_gene_category=metadata["targetGene"]["category"],
+            target_sequence=metadata["targetGene"]["wtSequence"]["sequence"],
+            target_sequence_type=metadata["targetGene"]["wtSequence"]["sequenceType"],
+        )
+    except (KeyError, ValidationError) as e:
+        _logger.error(
+            f"Unable to extract metadata from API response for scoreset {scoreset_urn}: {e}"
+        )
+        raise ResourceAcquisitionError(f"Metadata for scoreset {scoreset_urn}")
+    return structured_data
+
+
+def get_scoreset_records(scoreset_urn: str) -> List[ScoreRow]:
+    """Get scoreset records.
+
+    Only hit the MaveDB API if the file is unavailable locally. In the future, we
+    should use caching utilities and function args to allow invalidation or force
+    fetching from remote.
+
+    :param scoreset_urn: URN for scoreset
+    :return: Array of individual ScoreRow objects, containing information like protein
+        changes and scores
+    :raise ResourceAcquisitionError: if unable to fetch file
+    """
+    scores_csv = LOCAL_STORE_PATH / f"{scoreset_urn.replace(':', ' ')}_scores.csv"
+    if not scores_csv.exists():
+        url = f"https://api.mavedb.org/api/v1/score-sets/{scoreset_urn}/scores"
+        try:
+            _http_download(url, scores_csv)
+        except requests.HTTPError:
+            _logger.error(f"HTTPError when fetching scores CSV from {url}")
+            raise ResourceAcquisitionError(f"Scores CSV for scoreset {scoreset_urn}")
+
+    scores_data: List[ScoreRow] = []
+    with open(scores_csv, "r") as csvfile:
+        reader = csv.DictReader(csvfile)
+        for row in reader:
+            scores_data.append(ScoreRow(**row))
+    return scores_data
+
+
+def get_ref_genome_file(
+    url: str = "https://hgdownload.cse.ucsc.edu/goldenpath/hg38/bigZips/hg38.2bit",
+) -> Path:
+    """Acquire reference genome file. This file shouldn't change, so no need
+    to worry about cache handling once it's fetched.
+
+    :param url: URL to fetch reference file from. By default, points to the UCSC-hosted
+        hg38 file in the 2bit file format.
+    :return: path to acquired file
+    :raise ResourceAcquisitionError: if unable to acquire file
+    """
+    genome_file = LOCAL_STORE_PATH / "hg38.2bit"
+    if not genome_file.exists():
+        try:
+            _http_download(url, genome_file, show_progress=True)
+        except requests.HTTPError:
+            _logger.error(f"HTTPError when fetching reference genome file from {url}")
+            raise ResourceAcquisitionError(f"Reference genome file at {url}")
+    return genome_file
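+
+
+# Note: BLAT reads .2bit archives directly, so the reference genome never
+# needs to be expanded to FASTA on disk.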
+
+
+def get_mapping_tmp_dir() -> Path:
+    """Acquire app-specific "tmp" directory. It's not actually temporary, because it's
+    manually maintained, but we need a slightly more durable file location than what
+    the system tmp directory can provide.
+
+    :return: path to app-specific tmp directory
+    """
+    tmp: Path = impresources.files("mavedb_mapping") / "tmp"  # type: ignore
+    tmp.mkdir(exist_ok=True)
+    return tmp
diff --git a/mavedb_mapping/schemas.py b/mavedb_mapping/schemas.py
index 7177121..e009cd0 100644
--- a/mavedb_mapping/schemas.py
+++ b/mavedb_mapping/schemas.py
@@ -5,11 +5,17 @@
 class TargetGeneCategory(str, Enum):
-    """Define target gene category options"""
+    """Define target gene category options. Add more definitions as needed."""
 
     PROTEIN_CODING = "Protein coding"
 
 
+class TargetSequenceType(str, Enum):
+    """Define target sequence type. Add more definitions as needed."""
+
+    PROTEIN = "protein"
+
+
 class ScoresetMetadata(BaseModel):
     """Salient metadata for a scoreset."""
 
@@ -17,7 +23,7 @@
     target_gene_name: str
     target_gene_category: TargetGeneCategory
     target_sequence: str
-    target_sequence_type: str
+    target_sequence_type: TargetSequenceType
 
 
 class ScoreRow(BaseModel):
@@ -25,5 +31,5 @@
 
     hgvs_pro: str
     hgvs_nt: str
-    scores: str
+    score: str
     accession: str
diff --git a/pyproject.toml b/pyproject.toml
index 108a7d3..6f2acf6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,7 +20,7 @@ classifiers = [
     'Programming Language :: Python :: 3.11',
 ]
 requires-python = ">=3.8"
-dependencies = ["requests", "pydantic"]
+dependencies = ["requests", "pydantic", "biopython", "tqdm"]
 
 [build-system]
 requires = ["setuptools"]