Skip to content

Commit

Permalink
Remame insertions_to_fasta.generate_insertions_fasta to `insertion_…
Browse files Browse the repository at this point in the history
…detector.detect_insertions` because the function is not only for generating fasta files but also for generating csv tag.
  • Loading branch information
akikuno committed Jul 28, 2024
1 parent f2d3dc4 commit 63c9d63
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 137 deletions.
4 changes: 2 additions & 2 deletions src/DAJIN2/core/preprocess/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from DAJIN2.core.preprocess.directory_manager import create_report_directories, create_temporal_directories
from DAJIN2.core.preprocess.genome_fetcher import fetch_chromosome_size, fetch_coordinates
from DAJIN2.core.preprocess.input_formatter import format_inputs
from DAJIN2.core.preprocess.insertions_to_fasta import generate_insertion_fasta
from DAJIN2.core.preprocess.inversions_to_fasta import generate_inversion_fasta
from DAJIN2.core.preprocess.insertion_detector import detect_insertions
from DAJIN2.core.preprocess.inversion_detector import detect_inversions
from DAJIN2.core.preprocess.knockin_handler import extract_knockin_loci
from DAJIN2.core.preprocess.mapping import generate_sam
from DAJIN2.core.preprocess.midsv_caller import generate_midsv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,21 @@
from sklearn.cluster import MeanShift

from DAJIN2.core.preprocess.mapping import to_sam
from DAJIN2.core.preprocess.sv_handler import add_unique_allele_keys, extract_unique_sv, save_cstag, save_fasta
from DAJIN2.utils import config, io
from DAJIN2.utils.cssplits_handler import convert_cssplits_to_cstag
from DAJIN2.utils.cssplits_handler import convert_cssplit_to_dna, convert_cssplits_to_cstag

config.set_warnings_ignore()

import cstag


def remove_non_alphabets(cssplits: str) -> str:
"""Convert a cssplits to a plain DNA sequence."""
return "".join([char for char in cssplits if char.isalpha()])


###########################################################
# Cluster insertion alleles
###########################################################


def clustering_insertions(cssplits_insertion: list[str]) -> list[int]:
seq_all = [remove_non_alphabets(seq) for seq in cssplits_insertion]
seq_all = [convert_cssplit_to_dna(seq) for seq in cssplits_insertion]
query = seq_all[0]
_, distances, _ = zip(*process.extract_iter(query, seq_all, scorer=DamerauLevenshtein.normalized_distance))

Expand Down Expand Up @@ -225,7 +220,7 @@ def subset_insertions(
sequences = random.sample(cs_insertion, num_subset)
else:
sequences = cs_insertion
sequences = tuple(remove_non_alphabets(seq) for seq in sequences)
sequences = tuple(convert_cssplit_to_dna(seq) for seq in sequences)
insertions_merged_subset[idx_grouped][sequences] = counts

return dict(insertions_merged_subset)
Expand Down Expand Up @@ -272,7 +267,7 @@ def extract_score_and_sequence(
flag_break = False
for seqs, count in insertions_merged_subset[idx_grouped].items():
for _, distance, _ in process.extract_iter(
remove_non_alphabets(cssplits[idx]), seqs, scorer=DamerauLevenshtein.normalized_distance
convert_cssplit_to_dna(cssplits[idx]), seqs, scorer=DamerauLevenshtein.normalized_distance
):
# Record the insertion sequence and count if the sample and insertion sequences are similar
if distance < 0.1:
Expand Down Expand Up @@ -482,68 +477,6 @@ def generate_fasta(cstag_insertions: dict[str, str]) -> dict[str, str]:
return fasta_insertions


def extract_unique_insertions(fasta_insertions: dict[str, str], FASTA_ALLELES: dict[str, str]) -> dict[str, str]:
"""
Extract unique insertion alleles if they are dissimilar to the FASTA_ALLELES input by the user.
"Unique insertion alleles" are defined as sequences that have a difference of more than 10 bases compared to the sequences in FASTA_ALLELES
"""
fasta_insertions_unique = fasta_insertions.copy()

# Remove insertion alleles that are similar to the FASTA_ALLELES input by the user
to_delete = set()
for key, seq in fasta_insertions_unique.items():
_, distances, _ = zip(*process.extract_iter(seq, FASTA_ALLELES.values(), scorer=DamerauLevenshtein.distance))
if any(d < 10 for d in distances):
to_delete |= {key}

# Remove insertion alleles that are similar to each other
for key, seq in fasta_insertions_unique.items():
if key in to_delete:
continue
_, distances, _ = zip(
*process.extract_iter(seq, fasta_insertions_unique.values(), scorer=DamerauLevenshtein.distance)
)
similar_index = {i if d < 10 else None for i, d in enumerate(distances) if i != key}
to_delete |= similar_index

return {k: v for k, v in fasta_insertions_unique.items() if k not in to_delete}


def update_labels(d: dict, FASTA_ALLELES: dict) -> dict:
"""
Update labels to avoid duplicating user-specified alleles.
If the allele 'insertion1' exists in FASTA_ALLELES, increment the digits.
(insertion1 -> insertion01 -> insertion001...)
"""
user_defined_alleles = set(FASTA_ALLELES)
d_values = list(d.values())
len_d = len(d_values)
digits_up = 0
while True:
digits = len(str(len_d)) + digits_up
d_updated = {f"insertion{i+1:0{digits}}": seq for i, seq in enumerate(d_values)}
if user_defined_alleles.isdisjoint(set(d_updated)):
break
digits_up += 1

return d_updated


###########################################################
# Save cstag and fasta
###########################################################


def save_fasta(TEMPDIR: Path | str, SAMPLE_NAME: str, fasta_insertions: dict[str, str]) -> None:
for header, seq in fasta_insertions.items():
Path(TEMPDIR, SAMPLE_NAME, "fasta", f"{header}.fasta").write_text(f">{header}\n{seq}\n")


def save_cstag(TEMPDIR: Path | str, SAMPLE_NAME: str, cstag_insertions: dict[str, str]) -> None:
for header, cs_tag in cstag_insertions.items():
Path(TEMPDIR, SAMPLE_NAME, "cstag", f"{header}.txt").write_text(cs_tag + "\n")


###########################################################
# main
###########################################################
Expand All @@ -554,7 +487,7 @@ def remove_temporal_files(TEMPDIR: Path, SAMPLE_NAME: str) -> None:
_ = [path.unlink() for path in Path(TEMPDIR, SAMPLE_NAME, "fasta").glob("query-*.fasta")]


def generate_insertion_fasta(TEMPDIR, SAMPLE_NAME, CONTROL_NAME, FASTA_ALLELES) -> None:
def detect_insertions(TEMPDIR, SAMPLE_NAME, CONTROL_NAME, FASTA_ALLELES) -> None:
PATH_SAMPLE = Path(TEMPDIR, SAMPLE_NAME, "midsv", "control", f"{SAMPLE_NAME}.jsonl")
PATH_CONTROL = Path(TEMPDIR, CONTROL_NAME, "midsv", "control", f"{CONTROL_NAME}.jsonl")
MUTATION_LOCI = io.load_pickle(Path(TEMPDIR, SAMPLE_NAME, "mutation_loci", "control", "mutation_loci.pickle"))
Expand Down Expand Up @@ -596,16 +529,16 @@ def generate_insertion_fasta(TEMPDIR, SAMPLE_NAME, CONTROL_NAME, FASTA_ALLELES)
index_of_insertions = extract_index_of_insertions(insertions, insertions_merged)
cstag_insertions = generate_cstag(consensus_of_insertions, index_of_insertions, FASTA_ALLELES["control"])
fasta_insertions = generate_fasta(cstag_insertions)
fasta_insertions_unique = extract_unique_insertions(fasta_insertions, FASTA_ALLELES)
fasta_insertions_unique = extract_unique_sv(fasta_insertions, FASTA_ALLELES)

if fasta_insertions_unique == {}:
remove_temporal_files(TEMPDIR, SAMPLE_NAME)
return None

# Update labels
cstag_insertions_update = {key: cstag_insertions[key] for key in fasta_insertions_unique.keys()}
cstag_insertions_update = update_labels(cstag_insertions_update, FASTA_ALLELES)
fasta_insertions_update = update_labels(fasta_insertions_unique, FASTA_ALLELES)
cstag_insertions_update = add_unique_allele_keys(cstag_insertions_update, FASTA_ALLELES, "insertion")
fasta_insertions_update = add_unique_allele_keys(fasta_insertions_unique, FASTA_ALLELES, "insertion")

save_cstag(TEMPDIR, SAMPLE_NAME, cstag_insertions_update)
save_fasta(TEMPDIR, SAMPLE_NAME, fasta_insertions_update)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,14 @@
from sklearn.cluster import MeanShift

from DAJIN2.core.consensus.consensus import call_percentage
from DAJIN2.core.preprocess.insertions_to_fasta import extract_unique_insertions
from DAJIN2.core.preprocess.sv_handler import add_unique_allele_keys, extract_unique_sv, save_cstag, save_fasta
from DAJIN2.utils import config, io
from DAJIN2.utils.cssplits_handler import convert_cssplits_to_cstag, revcomp_cssplits
from DAJIN2.utils.cssplits_handler import convert_cssplit_to_dna, convert_cssplits_to_cstag, revcomp_cssplits

config.set_warnings_ignore()

import cstag


def remove_non_alphabets(cssplit: str) -> str:
"""Convert a cssplit to a plain DNA sequence."""
return "".join([char for char in cssplit if char.isalpha()])


###########################################################
# Detect insertion sequences
###########################################################
Expand All @@ -47,7 +41,7 @@ def extract_inversions(midsv_sample: Generator[list[str]]) -> list[dict[int, int
{
"start": min_idx,
"end": max_idx,
"seq": remove_non_alphabets(",".join(cs_inversion)),
"seq": convert_cssplit_to_dna(",".join(cs_inversion)),
"CSSPLIT": m_sample["CSSPLIT"],
}
)
Expand Down Expand Up @@ -127,59 +121,12 @@ def call_consensus_of_inversion(
return consensus_of_inversions


###########################################################
# Update keys
###########################################################


def _check_duplicates_of_sets(set1: set[str], set2: set[str]) -> bool:
"""if True, there are duplicates."""
union_set = set1.union(set2)
total_elements = len(set1) + len(set2)
return len(union_set) != total_elements


def update_labels(ALLELES: dict[str, str], FASTA_ALLELES: dict[str, set], key: str) -> dict:
"""
Update labels to avoid duplicating user-specified alleles.
If the allele 'insertion1' exists in FASTA_ALLELES, increment the digits.
(insertion1 -> insertion01 -> insertion001...)
"""
user_defined_alleles = set(FASTA_ALLELES)
key_alleles = {allele for allele in user_defined_alleles if key in allele}
if key_alleles == set():
return {f"inversion{i+1}": value for i, value in enumerate(ALLELES.values())}

key_candidates = set()
digits = 2
while _check_duplicates_of_sets(key_candidates, key_alleles):
key_candidates = {f"{key}{i+1:0{digits}}" for i, _ in enumerate(ALLELES)}
digits += 1

return dict(zip(key_candidates, ALLELES.values()))


###########################################################
# Save cstag and fasta
###########################################################


def save_fasta(TEMPDIR: Path | str, SAMPLE_NAME: str, fasta_insertions: dict[str, str]) -> None:
for header, seq in fasta_insertions.items():
Path(TEMPDIR, SAMPLE_NAME, "fasta", f"{header}.fasta").write_text(f">{header}\n{seq}\n")


def save_cstag(TEMPDIR: Path | str, SAMPLE_NAME: str, cstag_insertions: dict[str, str]) -> None:
for header, cs_tag in cstag_insertions.items():
Path(TEMPDIR, SAMPLE_NAME, "cstag", f"{header}.txt").write_text(cs_tag + "\n")


###########################################################
# main
###########################################################


def generate_inversion_fasta(TEMPDIR, SAMPLE_NAME, CONTROL_NAME, FASTA_ALLELES) -> None:
def detect_inversions(TEMPDIR, SAMPLE_NAME, CONTROL_NAME, FASTA_ALLELES) -> None:
path_sample = Path(TEMPDIR, SAMPLE_NAME, "midsv", "control", f"{SAMPLE_NAME}.jsonl")
# path_control = Path(TEMPDIR, CONTROL_NAME, "midsv", "control", f"{CONTROL_NAME}.jsonl")
mutation_loci = io.load_pickle(Path(TEMPDIR, SAMPLE_NAME, "mutation_loci", "control", "mutation_loci.pickle"))
Expand All @@ -197,12 +144,12 @@ def generate_inversion_fasta(TEMPDIR, SAMPLE_NAME, CONTROL_NAME, FASTA_ALLELES)

seq_inversions = {key: d["SEQ"] for key, d in consensus_of_inversions.items()}
consensus_of_unique_inversions = {
key: consensus_of_inversions[key] for key in extract_unique_insertions(seq_inversions, FASTA_ALLELES)
key: consensus_of_inversions[key] for key in extract_unique_sv(seq_inversions, FASTA_ALLELES)
}
if consensus_of_unique_inversions == {}:
return None

consensus_of_unique_inversions = update_labels(consensus_of_unique_inversions, FASTA_ALLELES, "inversion")
consensus_of_unique_inversions = add_unique_allele_keys(consensus_of_unique_inversions, FASTA_ALLELES, "inversion")

save_fasta(TEMPDIR, SAMPLE_NAME, {key: d["SEQ"] for key, d in consensus_of_unique_inversions.items()})
save_cstag(TEMPDIR, SAMPLE_NAME, {key: d["CSTAG"] for key, d in consensus_of_unique_inversions.items()})
Expand Down
5 changes: 5 additions & 0 deletions src/DAJIN2/utils/cssplits_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def find_n_boundaries(cssplits: list[str]) -> tuple[int, int]:

return left_idx_n - 1, right_idx_n + 1

# ToDo: 点変異が扱えていない。convert_cssplits_to_cstagをinversion検出用に改変すれば、必要なくなる。
def convert_cssplit_to_dna(cssplit: str) -> str:
"""Convert a cssplit to a plain DNA sequence."""
return "".join([char for char in cssplit if char.isalpha()])


###########################################################
# reverse complement to cssplits
Expand Down

0 comments on commit 63c9d63

Please sign in to comment.