
Commit

Update utils.input_validator and preprocess.genome_fetcher to temporarily disable SSL certificate verification, allowing access to UCSC servers.
akikuno committed May 15, 2024
1 parent ff317cb commit 0392fb3
Showing 2 changed files with 27 additions and 30 deletions.
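Both files now follow the same pattern: build an unverified SSL context per request and pass it to urlopen, replacing the earlier module-level certifi-backed context. Below is a minimal standalone sketch of that pattern; the helper name and the UCSC URL are illustrative assumptions, not code from the commit.

import ssl
from urllib.request import urlopen


def fetch_text_without_verification(url: str) -> str:
    # Per-request context with certificate verification disabled; this trades away
    # protection against man-in-the-middle attacks, so it is meant as a temporary workaround.
    context = ssl._create_unverified_context()
    with urlopen(url, context=context, timeout=10) as response:
        return response.read().decode("utf-8")


# Illustrative call (URL is an example, not taken from the diff):
# fetch_text_without_verification("https://hgdownload.soe.ucsc.edu/goldenPath/hg38/bigZips/hg38.chrom.sizes")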
15 changes: 11 additions & 4 deletions src/DAJIN2/core/preprocess/genome_fetcher.py
@@ -1,11 +1,18 @@
 from __future__ import annotations

+import ssl
 from urllib.request import urlopen


+def fetch_html_without_verification(url: str) -> str:
+    context = ssl._create_unverified_context()  # Create an SSL context that temporarily disables verification
+    with urlopen(url, context=context, timeout=10) as response:
+        return response.read().decode("utf-8").split("\n")
+
+
 def fetch_seq_coordinates(genome: str, blat_url: str, seq: str) -> dict:
     url = f"{blat_url}?db={genome}&type=BLAT&userSeq={seq}"
-    records = urlopen(url).read().decode("utf8").split("\n")
+    records = fetch_html_without_verification(url)
     matches = []
     for record in records:
         if "100.0%" not in record:
@@ -43,9 +50,9 @@ def fetch_chromosome_size(genome_coordinates: dict, genome_urls: dict) -> int:
     genome = genome_coordinates["genome"]
     url = f"{genome_urls['goldenpath']}/{genome}/bigZips/{genome}.chrom.sizes"

-    response = urlopen(url).read().decode("utf8").split("\n")
-    for line in response:
-        chrom_name, size = line.split("\t")
+    records = fetch_html_without_verification(url)
+    for record in records:
+        chrom_name, size = record.split("\t")
         if chrom == chrom_name:
             return int(size)
     raise ValueError(f"Chromosome {chrom} size not found.")
42 changes: 16 additions & 26 deletions src/DAJIN2/utils/input_validator.py
@@ -2,6 +2,7 @@

 import os
 import re
+import ssl
 import hashlib

 from pathlib import Path
@@ -11,17 +12,6 @@

 import mappy

-########################################################################
-# To ensure that SSL certificate verification does not fail,
-# obtain the latest Certificate Authority (CA) certificates.
-########################################################################
-
-import ssl
-import certifi
-
-context = ssl.create_default_context(cafile=certifi.where())
-
-
 ########################################################################
 # To accommodate cases where a user might input negative values or
 # excessively large values, update the number of threads
@@ -124,13 +114,10 @@ def exists_cached_genome(genome: str, tempdir: Path, exists_cache_control: bool)
 ########################################################################


-def get_html(url: str) -> str:
-    try:
-        with urlopen(url, timeout=10, context=context) as response:
-            html = response.read().decode("utf-8")
-            return html
-    except URLError:
-        return ""
+def fetch_html_without_verification(url: str) -> str:
+    context = ssl._create_unverified_context()  # Create an SSL context that temporarily disables verification
+    with urlopen(url, context=context, timeout=10) as response:
+        return response.read().decode("utf-8")


 def format_url(key: str, url: str) -> str:
@@ -139,28 +126,31 @@ def format_url(key: str, url: str) -> str:


 def get_first_available_url(key: str, urls: list[str]) -> str | None:
     search_keys = {"blat": "BLAT Search Genome", "das": "GRCh38/hg38", "goldenpath": "bigZips"}
-    return next((url for url in urls if search_keys[key] in get_html(format_url(key, url))), None)
+    return next(
+        (url for url in urls if search_keys[key] in fetch_html_without_verification(format_url(key, url))), None
+    )


-def fetch_xml_data(url: str) -> bytes:
+def fetch_xml_without_verification(url: str) -> bytes:
     """Fetch XML data from a given URL."""
-    with urlopen(url, context=context) as response:
+    context = ssl._create_unverified_context()  # Create an SSL context that temporarily disables verification
+    with urlopen(url, context=context, timeout=10) as response:
         return response.read()


-def extract_genome_ids_from_xml(xml_data: bytes) -> set:
+def extract_genome_ids_from_xml(xml_data: bytes) -> set[str]:
     """Extract genome IDs from XML data."""
     root = ET.fromstring(xml_data)
     return {cc.attrib["id"] for child in root for cc in child if cc.tag == "SOURCE"}


-def get_genome_ids_in_ucsc(url_das: str) -> set:
+def get_genome_ids_in_ucsc(url_das: str) -> set[str]:
     """Get available genome IDs in UCSC."""
-    xml_data = fetch_xml_data(url_das)
+    xml_data = fetch_xml_without_verification(url_das)
     return extract_genome_ids_from_xml(xml_data)


-def is_genome_in_ucsc_ids(genome: str, url_das: str) -> bool:
+def is_genome_id_available_in_ucsc(genome: str, url_das: str) -> bool:
     genome_ids = get_genome_ids_in_ucsc(url_das)
     return genome in genome_ids

@@ -195,7 +185,7 @@ def validate_genome_and_fetch_urls(genome: str) -> dict[str, str]:
         if available_servers[key] is None:
             raise URLError(message)

-    if not is_genome_in_ucsc_ids(genome, available_servers["das"]):
+    if not is_genome_id_available_in_ucsc(genome, available_servers["das"]):
         raise ValueError(f"{genome} is not listed. Available genomes are in {available_servers['das']}")

     del available_servers["das"]
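And a rough sketch of how the renamed validator is exercised, assuming validate_genome_and_fetch_urls still raises URLError for unreachable mirrors and ValueError for unknown genome IDs, as the hunks above indicate; the "hg38" value is only an example.

from urllib.error import URLError

from DAJIN2.utils.input_validator import validate_genome_and_fetch_urls

try:
    # Returns the first responsive UCSC mirrors; the "das" entry is dropped
    # before returning, leaving the "blat" and "goldenpath" URLs.
    genome_urls = validate_genome_and_fetch_urls("hg38")
    print(genome_urls["blat"], genome_urls["goldenpath"])
except URLError as error:
    print(f"UCSC servers unreachable: {error}")
except ValueError as error:
    print(f"Genome not available in UCSC: {error}")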
