Skip to content

Commit

Permalink
Update report.report_bam and rename to report.bam_exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
akikuno committed Mar 29, 2024
1 parent f9b9382 commit 011b21a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
from __future__ import annotations

import random
from collections import defaultdict
from itertools import groupby
from pathlib import Path

import midsv
import pysam

from DAJIN2.utils import sam_handler
from DAJIN2.utils import io, sam_handler


def realign(sam: list[list[str]], GENOME_COODINATES: dict) -> list[str]:
def recalculate_sam_coodinates_to_reference(sam: list[list[str]], GENOME_COODINATES: dict) -> list[str]:
"""Recalculate SAM genomic coordinates with the reference genome, not with the FASTA_ALLELE"""
sam_headers = [s for s in sam if s[0].startswith("@")]
sam_contents = [s for s in sam if not s[0].startswith("@")]
for s in sam_headers:
Expand All @@ -29,31 +28,44 @@ def realign(sam: list[list[str]], GENOME_COODINATES: dict) -> list[str]:
return sam_headers + sam_contents


def convert_pos_to_one_indexed(sam_lines: list[list[str]]) -> list[list[str]]:
    """Convert SAM POS from 0-indexed to 1-indexed.

    Header lines (starting with "@") are left untouched; alignment rows
    whose POS field (column 4) is "0" are bumped to "1" in place.
    Returns a new list holding the same row objects.
    """
    converted: list[list[str]] = []
    for record in sam_lines:
        is_alignment = not record[0].startswith("@")
        if is_alignment and record[3] == "0":
            record[3] = "1"
        converted.append(record)
    return converted


def group_by_name(sam_contents: list[list[str]], clust_sample: list[dict]) -> dict[str, list[list[str]]]:
    """Group alignments in map-ont.sam by allele name (NAME).

    Sorts both inputs by QNAME and walks them in lockstep (merge-join),
    matching each alignment to its clustering record in one linear pass.

    Args:
        sam_contents: SAM alignment rows (no header lines); each row is
            the tab-split fields of one alignment with QNAME first.
        clust_sample: clustering results; each dict has at least the
            keys "QNAME" and "NAME".

    Returns:
        Mapping of allele name -> list of alignment rows. The last field
        of each row (the CS tag) is discarded to reduce file size.

    Note:
        ``sam_contents`` is sorted in place as a side effect.
    """
    sam_contents.sort()
    clust_sample_sorted = sorted(clust_sample, key=lambda x: x["QNAME"])

    qnames: set[str] = {c["QNAME"] for c in clust_sample_sorted}

    sam_groups = defaultdict(list)
    idx_sam = 0
    idx_clust = 0
    while idx_sam < len(sam_contents) and idx_clust < len(clust_sample_sorted):
        alignment_sam = sam_contents[idx_sam][:-1]  # Discard CS tags to reduce file size
        clust_record = clust_sample_sorted[idx_clust]
        qname_sam = alignment_sam[0]

        if qname_sam not in qnames:
            # This alignment has no clustering record at all: skip it.
            idx_sam += 1
            continue

        if qname_sam == clust_record["QNAME"]:
            sam_groups[clust_record["NAME"]].append(alignment_sam)
            idx_sam += 1
        else:
            idx_clust += 1

    return dict(sam_groups)


###############################################################################
Expand All @@ -67,13 +79,11 @@ def subset_qnames(RESULT_SAMPLE, readnum: int = 100) -> dict[set[str]]:
group = list(group)
qnames = [res["QNAME"] for res in group[:readnum]]
qnames_by_name[name] = set(qnames)
return qnames_by_name
return dict(qnames_by_name)


def subset_reads(name, sam_content, qnames_by_name):
qnames = qnames_by_name[name]
sam_subset = [sam for sam in sam_content if sam[0] in qnames]
return sam_subset
def subset_reads(sam_content: list[str], qnames: set[str]) -> list[str]:
    """Return only the alignment rows whose QNAME (first field) is in `qnames`."""
    selected = []
    for record in sam_content:
        if record[0] in qnames:
            selected.append(record)
    return selected


###############################################################################
Expand All @@ -89,31 +99,34 @@ def write_sam_to_bam(sam: list[list[str]], path_sam: str | Path, path_bam: str |


def update_sam(sam: list, GENOME_COODINATES: dict | None = None) -> list:
    """Clean up SAM records for reporting and fix their coordinates.

    Removes overlapped reads and microhomology artifacts, then either
    remaps coordinates onto the reference genome (when genome coordinates
    are available) or converts POS from 0-indexed to 1-indexed.

    Args:
        sam: SAM rows (headers and alignments) as lists of fields.
        GENOME_COODINATES: genome coordinate info; a truthy "genome" key
            triggers remapping onto the reference. Defaults to no genome.

    Returns:
        The updated SAM rows; the input list is not modified.
    """
    # Use a None sentinel instead of a mutable default argument.
    GENOME_COODINATES = GENOME_COODINATES or {}
    sam_records = sam.copy()
    sam_records = sam_handler.remove_overlapped_reads(sam_records)
    sam_records = sam_handler.remove_microhomology(sam_records)
    # .get avoids a KeyError when the "genome" key is absent (e.g. empty dict).
    if GENOME_COODINATES.get("genome"):
        return recalculate_sam_coodinates_to_reference(sam_records, GENOME_COODINATES)
    return convert_pos_to_one_indexed(sam_records)


def export_to_bam(TEMPDIR, NAME, GENOME_COODINATES, THREADS, RESULT_SAMPLE=None, is_control=False) -> None:
randomnum = random.randint(100_000, 999_999)
def export_to_bam(TEMPDIR, NAME, GENOME_COODINATES, THREADS, UUID, RESULT_SAMPLE=None, is_control=False) -> None:
path_sam_input = Path(TEMPDIR, NAME, "sam", "map-ont_control.sam")
sam = list(midsv.read_sam(path_sam_input))
sam_records = list(io.read_sam(path_sam_input))

# Update sam
sam_update = update_sam(sam, GENOME_COODINATES)
sam_updated = update_sam(sam_records, GENOME_COODINATES)

# Output SAM and BAM
path_sam_output = Path(TEMPDIR, "report", "BAM", f"tmp{randomnum}_{NAME}_control.sam")
path_sam_output = Path(TEMPDIR, "report", "BAM", f"temp_{UUID}_{NAME}_control.sam")
path_bam_output = Path(TEMPDIR, "report", "BAM", NAME, f"{NAME}.bam")
write_sam_to_bam(sam_update, path_sam_output, path_bam_output, THREADS)
write_sam_to_bam(sam_updated, path_sam_output, path_bam_output, THREADS)

# Prepare SAM headers and contents
sam_headers = [s for s in sam_update if s[0].startswith("@")]
sam_contents = [s for s in sam_update if not s[0].startswith("@")]
sam_headers = [s for s in sam_updated if s[0].startswith("@")]
sam_contents = [s for s in sam_updated if not s[0].startswith("@")]
if is_control:
qnames = set(list(set(s[0] for s in sam_contents[:10000]))[:100])
sam_subset = [s for s in sam_update if s[0] in qnames]
path_sam_output = Path(TEMPDIR, "report", "BAM", f"tmp{randomnum}_{NAME}_control_cache.sam")
qnames: set[str] = set(list(set(s[0] for s in sam_contents[:10000]))[:100])
sam_subset = [s for s in sam_updated if s[0] in qnames]
path_sam_output = Path(TEMPDIR, "report", "BAM", f"temp_{UUID}_{NAME}_control_cache.sam")
path_bam_output = Path(TEMPDIR, "cache", ".igvjs", NAME, "control.bam")
write_sam_to_bam(sam_headers + sam_subset, path_sam_output, path_bam_output, THREADS)
else:
Expand All @@ -122,14 +135,15 @@ def export_to_bam(TEMPDIR, NAME, GENOME_COODINATES, THREADS, RESULT_SAMPLE=None,
# Output SAM and BAM
for name, sam_content in sam_groups.items():
# BAM
path_sam_output = Path(TEMPDIR, "report", "bam", f"tmp{randomnum}_{name}.sam")
path_sam_output = Path(TEMPDIR, "report", "BAM", f"temp_{UUID}_{name}.sam")
path_bam_output = Path(TEMPDIR, "report", "BAM", NAME, f"{NAME}_{name}.bam")
write_sam_to_bam(sam_headers + sam_content, path_sam_output, path_bam_output, THREADS)
# igvjs
sam_subset = subset_reads(name, sam_content, qnames_by_name)
path_sam_output = Path(TEMPDIR, "report", "bam", f"tmp{randomnum}_{name}_subset.sam")
sam_subset = subset_reads(sam_content, qnames_by_name[name])
path_sam_output = Path(TEMPDIR, "report", "BAM", f"temp_{UUID}_{name}_subset.sam")
path_bam_output = Path(TEMPDIR, "report", ".igvjs", NAME, f"{name}.bam")
write_sam_to_bam(sam_headers + sam_subset, path_sam_output, path_bam_output, THREADS)

# Remove temporary files
sam_temp = Path(TEMPDIR, "report", "BAM").glob(f"tmp{randomnum}*.sam")
sam_temp = Path(TEMPDIR, "report", "BAM").glob(f"temp_{UUID}*.sam")
[s.unlink() for s in sam_temp]
36 changes: 36 additions & 0 deletions tests/src/report/test_bam_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

import pytest
from pathlib import Path
from unittest.mock import patch
from src.DAJIN2.core.report import bam_exporter

###############################################################################
# Output
###############################################################################


@pytest.fixture
def mock_pysam_sort_and_index():
    """Patch pysam.sort and pysam.index, yielding both mocks as a pair."""
    with patch("pysam.sort") as mock_sort, patch("pysam.index") as mock_index:
        yield mock_sort, mock_index


def test_write_sam_to_bam(mock_pysam_sort_and_index):
    """write_sam_to_bam writes the rows as tab-separated SAM lines and calls
    pysam.sort / pysam.index once each (pysam is mocked, so no real BAM is made)."""
    mock_sort, mock_index = mock_pysam_sort_and_index
    # Minimal fake SAM rows: one list of fields per alignment.
    sam_data = [["r1", "seq1", "+", "chr1", "1000"], ["r2", "seq2", "-", "chr2", "2000"]]
    path_sam = "test.sam"
    path_bam = "test.bam"
    bam_exporter.write_sam_to_bam(sam_data, path_sam, path_bam)

    mock_sort.assert_called_once()
    mock_index.assert_called_once()

    with open(path_sam) as f:
        content = f.read()
    expected_content = "r1\tseq1\t+\tchr1\t1000\nr2\tseq2\t-\tchr2\t2000\n"
    assert content == expected_content

    # Clean up after the test
    Path(path_sam).unlink()

0 comments on commit 011b21a

Please sign in to comment.