Skip to content

Commit

Permalink
Merge branch 'INSTX-6145_fastq_Us' into 'master'
Browse files Browse the repository at this point in the history
INSTX-6145 Support for RNA fastq files

Closes INSTX-6145

See merge request machine-learning/dorado!1168
  • Loading branch information
hpendry-ont committed Sep 12, 2024
2 parents 667d160 + 302475e commit a90fbf9
Show file tree
Hide file tree
Showing 14 changed files with 820 additions and 39 deletions.
15 changes: 6 additions & 9 deletions dorado/alignment/alignment_processing_items.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "alignment_processing_items.h"

#include "utils/PostCondition.h"
#include "utils/fastq_reader.h"
#include "utils/scoped_trace_log.h"
#include "utils/stream_utils.h"
#include "utils/tty_utils.h"
Expand Down Expand Up @@ -31,22 +32,18 @@ std::set<std::string>& get_supported_compression_extensions() {
return supported_compression_extensions;
};

bool is_valid_input_file(const std::filesystem::path& input_path) {
bool is_loadable_by_htslib(const std::filesystem::path& input_path) {
dorado::HtsFilePtr hts_file(hts_open(input_path.string().c_str(), "r"));
if (!hts_file) {
return false;
}

dorado::SamHdrPtr header(sam_hdr_read(hts_file.get()));
if (!header) {
spdlog::error(
"Could not read hts header from file {} - possibly a fastq file containing U "
"bases?",
input_path.string());
return false;
}
return header != nullptr;
}

return true;
bool is_valid_input_file(const std::filesystem::path& input_path) {
return is_loadable_by_htslib(input_path) || dorado::utils::is_fastq(input_path.string());
}

fs::path replace_extension(fs::path output_path) {
Expand Down
4 changes: 2 additions & 2 deletions dorado/cli/aligner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ int aligner(int argc, char* argv[]) {
return EXIT_FAILURE;
}

spdlog::debug("> input fmt: {} aligned: {}", reader->format, reader->is_aligned);
auto header = SamHdrPtr(sam_hdr_dup(reader->header));
spdlog::debug("> input fmt: {} aligned: {}", reader->format(), reader->is_aligned);
auto header = SamHdrPtr(sam_hdr_dup(reader->header()));
utils::add_hd_header_line(header.get());
add_pg_hdr(header.get());
dorado::utils::strip_alignment_data_from_header(header.get());
Expand Down
4 changes: 2 additions & 2 deletions dorado/cli/demux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ int demuxer(int argc, char* argv[]) {

HtsReader reader(all_files[0].input, read_list);
utils::MergeHeaders hdr_merger(strip_alignment);
hdr_merger.add_header(reader.header, all_files[0].input);
hdr_merger.add_header(reader.header(), all_files[0].input);

// Fold in the headers from all the other files in the input list.
for (size_t input_idx = 1; input_idx < all_files.size(); input_idx++) {
HtsReader header_reader(all_files[input_idx].input, read_list);
auto error_msg = hdr_merger.add_header(header_reader.header, all_files[input_idx].input);
auto error_msg = hdr_merger.add_header(header_reader.header(), all_files[input_idx].input);
if (!error_msg.empty()) {
spdlog::error("Unable to combine headers from all input files: " + error_msg);
return EXIT_FAILURE;
Expand Down
2 changes: 1 addition & 1 deletion dorado/cli/trim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ int trim(int argc, char* argv[]) {
}

HtsReader reader(reads[0], read_list);
auto header = SamHdrPtr(sam_hdr_dup(reader.header));
auto header = SamHdrPtr(sam_hdr_dup(reader.header()));
cli::add_pg_hdr(header.get(), "trim", args, "cpu");
// Always remove alignment information from input header
// because at minimum the adapters are trimmed, which
Expand Down
167 changes: 151 additions & 16 deletions dorado/read_pipeline/HtsReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,174 @@

#include "read_pipeline/DefaultClientInfo.h"
#include "read_pipeline/ReadPipeline.h"
#include "read_pipeline/messages.h"
#include "utils/fastq_reader.h"
#include "utils/types.h"

#include <htslib/sam.h>
#include <spdlog/spdlog.h>

#include <filesystem>
#include <functional>
#include <memory>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_set>
#include <vector>

namespace dorado {

namespace {

const std::string HTS_FORMAT_TEXT_FASTQ{"FASTQ sequence text"};

void write_bam_aux_tag_from_string(bam1_t& record, const std::string& bam_tag_string) {
// Format TAG:TYPE:VALUE where TAG is a 2 char string, TYPE is a single char, and value
std::istringstream tag_stream{bam_tag_string};

std::string tag_id;
if (!std::getline(tag_stream, tag_id, ':') || tag_id.size() != 2) {
return;
}

// Currently we only write to the fastq header the tags RG:Z, st:Z and DS:Z.
// These these are simple to read as a std::string as they are just a string
// of printable characters including SPACE, regex: [ !-~]*
// So filter out anything apart from string fields so we don't need to worry
// about the encoding of other tags when written to a fastq text file.
std::string tag_type;
if (!std::getline(tag_stream, tag_type, ':') || tag_type != "Z") {
return;
}

std::string tag_data;
if (!std::getline(tag_stream, tag_data) || tag_data.size() == 0) {
return;
}

//int bam_aux_append(bam1_t * b, const char tag[2], char type, int len, const uint8_t* data);
bam_aux_append(&record, tag_id.data(), tag_type.at(0), static_cast<int>(tag_data.size() + 1),
reinterpret_cast<const uint8_t*>(tag_data.c_str()));
}

void write_bam_aux_tags_from_fastq(bam1_t& record, const utils::FastqRecord& fastq_record) {
for (const auto& bam_tag_string : fastq_record.get_bam_tags()) {
write_bam_aux_tag_from_string(record, bam_tag_string);
}
}

bool try_assign_bam_from_fastq(bam1_t& record, const utils::FastqRecord& fastq_record) {
std::vector<uint8_t> qscore{};
qscore.reserve(fastq_record.qstring().size());
std::transform(fastq_record.qstring().begin(), fastq_record.qstring().end(),
std::back_inserter(qscore), [](char c) { return static_cast<uint8_t>(c - 33); });
constexpr uint16_t flags = 4; // 4 = UNMAPPED
constexpr int leftmost_pos = -1; // UNMAPPED - will be written as 0
constexpr uint8_t map_q = 0; // UNMAPPED
constexpr int next_pos = -1; // UNMAPPED - will be written as 0
const auto read_id = fastq_record.read_id_view();
if (bam_set1(&record, read_id.size(), read_id.data(), flags, -1, leftmost_pos, map_q, 0,
nullptr, -1, next_pos, 0, fastq_record.sequence().size(),
fastq_record.sequence().c_str(), (char*)qscore.data(), 0) < 0) {
return false;
}

write_bam_aux_tags_from_fastq(record, fastq_record);
return true;
}

class FastqBamRecordGenerator {
utils::FastqReader m_fastq_reader;
SamHdrPtr m_header;

public:
FastqBamRecordGenerator(const std::string& filename) : m_fastq_reader(filename) {
if (!is_valid()) {
return;
}

m_header.reset(sam_hdr_init());
}

bool is_valid() const { return m_fastq_reader.is_valid(); }

sam_hdr_t* header() { return m_header.get(); }

const sam_hdr_t* header() const { return m_header.get(); }

const std::string& format() const { return HTS_FORMAT_TEXT_FASTQ; }

bool try_get_next_record(bam1_t& record) {
auto fastq_record = m_fastq_reader.try_get_next_record();
if (!fastq_record) {
return false;
}
return try_assign_bam_from_fastq(record, *fastq_record);
}
};

class HtsLibBamRecordGenerator {
HtsFilePtr m_file{};
SamHdrPtr m_header{};
std::string m_format{};

public:
HtsLibBamRecordGenerator(const std::string& filename) {
m_file.reset(hts_open(filename.c_str(), "r"));
if (!m_file) {
return;
}
// If input format is FASTX, read tags from the query name line.
hts_set_opt(m_file.get(), FASTQ_OPT_AUX, "1");
auto format = hts_format_description(hts_get_format(m_file.get()));
if (format) {
m_format = format;
hts_free(format);
}
m_header.reset(sam_hdr_read(m_file.get()));
if (!m_header) {
return;
}
}

bool is_valid() const { return m_file != nullptr && m_header != nullptr; }

sam_hdr_t* header() const { return m_header.get(); }
const std::string& format() const { return m_format; }

bool try_get_next_record(bam1_t& record) {
return sam_read1(m_file.get(), m_header.get(), &record) >= 0;
}
};

} // namespace

HtsReader::HtsReader(const std::string& filename,
std::optional<std::unordered_set<std::string>> read_list)
: m_client_info(std::make_shared<DefaultClientInfo>()), m_read_list(std::move(read_list)) {
m_file = hts_open(filename.c_str(), "r");
if (!m_file) {
if (!try_initialise_generator<HtsLibBamRecordGenerator>(filename) &&
!try_initialise_generator<FastqBamRecordGenerator>(filename)) {
throw std::runtime_error("Could not open file: " + filename);
}
// If input format is FASTX, read tags from the query name line.
hts_set_opt(m_file, FASTQ_OPT_AUX, "1");
format = hts_format_description(hts_get_format(m_file));
header = sam_hdr_read(m_file);
if (!header) {
throw std::runtime_error("Could not read header from file: " + filename);
}
is_aligned = header->n_targets > 0;
is_aligned = m_header->n_targets > 0;

record.reset(bam_init1());
}

HtsReader::~HtsReader() {
hts_free(format);
sam_hdr_destroy(header);
record.reset();
hts_close(m_file);
template <typename T>
bool HtsReader::try_initialise_generator(const std::string& filename) {
auto generator = std::make_shared<T>(filename); // shared to allow copy assignment
if (!generator->is_valid()) {
return false;
}
m_header = generator->header();
m_format = generator->format();
m_bam_record_generator = [generator = std::move(generator)](bam1_t& bam_record) {
return generator->try_get_next_record(bam_record);
};
return true;
}

void HtsReader::set_client_info(std::shared_ptr<ClientInfo> client_info) {
Expand All @@ -49,7 +180,7 @@ void HtsReader::set_record_mutator(std::function<void(BamPtr&)> mutator) {
m_record_mutator = std::move(mutator);
}

bool HtsReader::read() { return sam_read1(m_file, header, record.get()) >= 0; }
bool HtsReader::read() { return m_bam_record_generator(*record); }

bool HtsReader::has_tag(const char* tagname) {
uint8_t* tag = bam_aux_get(record.get(), tagname);
Expand Down Expand Up @@ -83,6 +214,10 @@ std::size_t HtsReader::read(Pipeline& pipeline, std::size_t max_reads) {
return num_reads;
}

sam_hdr_t* HtsReader::header() { return m_header; }

const std::string& HtsReader::format() const { return m_format; }

ReadMap read_bam(const std::string& filename, const std::unordered_set<std::string>& read_ids) {
HtsReader reader(filename, std::nullopt);

Expand Down
16 changes: 12 additions & 4 deletions dorado/read_pipeline/HtsReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <htslib/sam.h>

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
Expand All @@ -24,7 +25,6 @@ class HtsReader {
public:
HtsReader(const std::string& filename,
std::optional<std::unordered_set<std::string>> read_list);
~HtsReader();
bool read();

// If reading directly into a pipeline need to set the client info on the messages
Expand All @@ -35,17 +35,25 @@ class HtsReader {
bool has_tag(const char* tagname);
void set_record_mutator(std::function<void(BamPtr&)> mutator);

char* format{nullptr};
bool is_aligned{false};
BamPtr record{nullptr};
sam_hdr_t* header{nullptr};

sam_hdr_t* header();
const sam_hdr_t* header() const;
const std::string& format() const;

private:
htsFile* m_file{nullptr};
sam_hdr_t* m_header{nullptr}; // non-owning
std::string m_format{};
std::shared_ptr<ClientInfo> m_client_info;

std::function<void(BamPtr&)> m_record_mutator{};
std::optional<std::unordered_set<std::string>> m_read_list;

std::function<bool(bam1_t&)> m_bam_record_generator{};

template <typename T>
bool try_initialise_generator(const std::string& filename);
};

template <typename T>
Expand Down
6 changes: 3 additions & 3 deletions dorado/summary/summary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ bool SummaryData::process_file(const std::string& filename, std::ostream& writer
if (reader.is_aligned) {
m_field_flags |= ALIGNMENT_FIELDS;
}
auto read_group_exp_start_time = utils::get_read_group_info(reader.header, "DT");
auto read_group_exp_start_time = utils::get_read_group_info(reader.header(), "DT");
write_header(writer);
return write_rows_from_reader(reader, writer, read_group_exp_start_time);
}
Expand All @@ -103,7 +103,7 @@ bool SummaryData::process_tree(const std::string& folder, std::ostream& writer)
write_header(writer);
for (const auto& read_file : files) {
HtsReader reader(read_file, std::nullopt);
auto read_group_exp_start_time = utils::get_read_group_info(reader.header, "DT");
auto read_group_exp_start_time = utils::get_read_group_info(reader.header(), "DT");
bool ok = write_rows_from_reader(reader, writer, read_group_exp_start_time);
if (!ok) {
spdlog::error("File {} could not be processed. Skipping file.", read_file);
Expand Down Expand Up @@ -227,7 +227,7 @@ bool SummaryData::write_rows_from_reader(

if (reader.is_aligned && !(reader.record->core.flag & BAM_FUNMAP)) {
alignment_mapq = static_cast<int>(reader.record->core.qual);
alignment_genome = reader.header->target_name[reader.record->core.tid];
alignment_genome = reader.header()->target_name[reader.record->core.tid];

alignment_genome_start = int32_t(reader.record->core.pos);
alignment_genome_end = int32_t(bam_endpos(reader.record.get()));
Expand Down
2 changes: 2 additions & 0 deletions dorado/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ add_library(dorado_utils
crypto_utils.cpp
dev_utils.cpp
dev_utils.h
fastq_reader.cpp
fastq_reader.h
fs_utils.cpp
fs_utils.h
hts_file.cpp
Expand Down
Loading

0 comments on commit a90fbf9

Please sign in to comment.