Skip to content

Commit

Permalink
refactor: type safe checking for validation and caching of ndjson-fil…
Browse files Browse the repository at this point in the history
…e-emptiness
  • Loading branch information
Taepper committed Jun 18, 2024
1 parent 9cb5c20 commit c5b86c4
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 63 deletions.
9 changes: 5 additions & 4 deletions include/silo/preprocessing/preprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Database;
namespace preprocessing {

class SequenceInfo;
class ValidatedNdjsonFile;

class Preprocessor {
config::PreprocessingConfig preprocessing_config;
Expand Down Expand Up @@ -45,7 +46,7 @@ class Preprocessor {
static std::string makeNonNullKey(const std::string& field);
std::string getPartitionKeySelect() const;

void buildTablesFromNdjsonInput(const std::filesystem::path& file_name);
void buildTablesFromNdjsonInput(const ValidatedNdjsonFile& input_file);
void buildMetadataTableFromFile(const std::filesystem::path& metadata_filename);

void buildPartitioningTable();
Expand All @@ -58,10 +59,10 @@ class Preprocessor {
const std::string& table_name
);

void createPartitionedSequenceTablesFromNdjson(const std::filesystem::path& file_name);
void createPartitionedSequenceTablesFromNdjson(const ValidatedNdjsonFile& input_file);

void createAlignedPartitionedSequenceViews(const std::filesystem::path& file_name);
void createUnalignedPartitionedSequenceFiles(const std::filesystem::path& file_name);
void createAlignedPartitionedSequenceViews(const ValidatedNdjsonFile& input_file);
void createUnalignedPartitionedSequenceFiles(const ValidatedNdjsonFile& input_file);
void createUnalignedPartitionedSequenceFile(
const std::string& seq_name,
const std::string& table_sql
Expand Down
1 change: 0 additions & 1 deletion include/silo/preprocessing/sequence_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class SequenceInfo {

static void validateNdjsonFile(
const silo::ReferenceGenomes& reference_genomes,
duckdb::Connection& connection,
const std::filesystem::path& input_filename
);
};
Expand Down
33 changes: 33 additions & 0 deletions include/silo/preprocessing/validated_ndjson_file.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include <filesystem>

namespace silo {
class ReferenceGenomes;
}

namespace silo::config {
class DatabaseConfig;
}

namespace silo::preprocessing {

/// Handle to an ndjson input file that has gone through
/// validateFileAgainstConfig().
///
/// The constructor is private, so new instances originate from the
/// validateFileAgainstConfig() factory. The handle carries the file path
/// together with the emptiness flag computed during validation, so that
/// consumers can query isEmpty() instead of inspecting the file again.
class ValidatedNdjsonFile {
  public:
   /// Validates the file at `file_name` and wraps it in a handle.
   ///
   /// @param file_name path of the ndjson input file
   /// @param database_config configuration the metadata is validated against
   /// @param reference_genomes reference genomes the sequences are validated against
   /// @return handle holding the path and whether the file was found to be empty
   static ValidatedNdjsonFile validateFileAgainstConfig(
      const std::filesystem::path& file_name,
      const silo::config::DatabaseConfig& database_config,
      const silo::ReferenceGenomes& reference_genomes
   );

   /// Path of the validated file (returned by value).
   std::filesystem::path getFileName() const;

   /// Emptiness flag that was recorded when the handle was created.
   bool isEmpty() const;

  private:
   explicit ValidatedNdjsonFile(std::filesystem::path file_name, bool empty);

   std::filesystem::path file_name;
   bool empty;
};

} // namespace silo::preprocessing
59 changes: 21 additions & 38 deletions src/silo/preprocessing/preprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "silo/preprocessing/preprocessing_exception.h"
#include "silo/preprocessing/sequence_info.h"
#include "silo/preprocessing/sql_function.h"
#include "silo/preprocessing/validated_ndjson_file.h"
#include "silo/storage/reference_genomes.h"
#include "silo/storage/unaligned_sequence_store.h"
#include "silo/zstdfasta/zstd_decompressor.h"
Expand Down Expand Up @@ -79,15 +80,18 @@ Database Preprocessor::preprocess() {
const auto& ndjson_input_filename = preprocessing_config.getNdjsonInputFilename();
if (ndjson_input_filename.has_value()) {
SPDLOG_INFO("preprocessing - ndjson pipeline chosen");
auto input_file = ValidatedNdjsonFile::validateFileAgainstConfig(
ndjson_input_filename.value(), database_config, reference_genomes_
);
SPDLOG_DEBUG(
"preprocessing - building preprocessing tables from ndjson input '{}'",
ndjson_input_filename.value().string()
);
buildTablesFromNdjsonInput(ndjson_input_filename.value());
buildTablesFromNdjsonInput(input_file);
SPDLOG_DEBUG("preprocessing - building partitioning tables");
buildPartitioningTable();
SPDLOG_DEBUG("preprocessing - creating compressed sequence views for building SILO");
createPartitionedSequenceTablesFromNdjson(ndjson_input_filename.value());
createPartitionedSequenceTablesFromNdjson(input_file);
} else {
SPDLOG_INFO("preprocessing - classic metadata file pipeline chosen");
SPDLOG_DEBUG(
Expand Down Expand Up @@ -122,41 +126,24 @@ Database Preprocessor::preprocess() {
);
}

void Preprocessor::buildTablesFromNdjsonInput(const std::filesystem::path& file_name) {
void Preprocessor::buildTablesFromNdjsonInput(const ValidatedNdjsonFile& input_file) {
(void)preprocessing_db.query(fmt::format(
"CREATE OR REPLACE TABLE metadata_table({});",
boost::join(MetadataInfo::getMetadataSQLTypes(database_config), ",")
));

SPDLOG_DEBUG("build - checking whether the file '{}' exists: ", file_name.string());
if (!std::filesystem::exists(file_name)) {
throw silo::preprocessing::PreprocessingException(
fmt::format("The specified input file {} does not exist.", file_name.string())
);
}

SPDLOG_DEBUG("build - checking whether the file '{}' is not a directory: ", file_name.string());
if (std::filesystem::is_directory(file_name)) {
throw silo::preprocessing::PreprocessingException(
fmt::format("The specified input file {} is a directory.", file_name.string())
);
}

SPDLOG_DEBUG("build - checking whether the file '{}' is empty: ", file_name.string());
if (MetadataInfo::isNdjsonFileEmpty(file_name)) {
if (input_file.isEmpty()) {
SPDLOG_WARN(
"The specified input file {} is empty. Ignoring its content.", file_name.string()
"The specified input file {} is empty. Ignoring its content.",
input_file.getFileName().string()
);
return;
}

SPDLOG_DEBUG("build - validating metadata file '{}' with config", file_name.string());
MetadataInfo::validateNdjsonFile(file_name, database_config);

(void)preprocessing_db.query(fmt::format(
"INSERT INTO metadata_table BY NAME (SELECT {} FROM read_json_auto('{}'));",
boost::join(MetadataInfo::getMetadataSelects(database_config), ","),
file_name.string()
input_file.getFileName().string()
));

auto null_primary_key_result = preprocessing_db.query(fmt::format(
Expand All @@ -170,7 +157,7 @@ void Preprocessor::buildTablesFromNdjsonInput(const std::filesystem::path& file_
const std::string error_message = fmt::format(
"Error, there are {} primary keys that are NULL",
null_primary_key_result->RowCount(),
file_name.string()
input_file.getFileName().string()
);
SPDLOG_ERROR(error_message);
if (null_primary_key_result->RowCount() <= 10) {
Expand Down Expand Up @@ -321,20 +308,16 @@ FROM metadata_table;
);
}

void Preprocessor::createPartitionedSequenceTablesFromNdjson(const std::filesystem::path& file_name
void Preprocessor::createPartitionedSequenceTablesFromNdjson(const ValidatedNdjsonFile& input_file
) {
SequenceInfo::validateNdjsonFile(
reference_genomes_, preprocessing_db.getConnection(), file_name
);

createUnalignedPartitionedSequenceFiles(file_name);
createUnalignedPartitionedSequenceFiles(input_file);

createAlignedPartitionedSequenceViews(file_name);
createAlignedPartitionedSequenceViews(input_file);
}

void Preprocessor::createAlignedPartitionedSequenceViews(const std::filesystem::path& file_name) {
void Preprocessor::createAlignedPartitionedSequenceViews(const ValidatedNdjsonFile& input_file) {
std::string file_reader_sql;
if (MetadataInfo::isNdjsonFileEmpty(file_name)) {
if (input_file.isEmpty()) {
file_reader_sql = fmt::format(
"SELECT ''::VARCHAR AS key, 'NULL'::VARCHAR AS partition_key {} {} {} {} {} LIMIT 0",
boost::join(silo::prepend(", ''::VARCHAR AS ", prefixed_nuc_sequences), ""),
Expand Down Expand Up @@ -364,7 +347,7 @@ void Preprocessor::createAlignedPartitionedSequenceViews(const std::filesystem::
silo::tieAsString(
", metadata.\"", order_by_fields, "\" AS ", prefixed_order_by_fields, ""
),
file_name.string()
input_file.getFileName().string()
);
}

Expand Down Expand Up @@ -425,10 +408,10 @@ void Preprocessor::createAlignedPartitionedSequenceViews(const std::filesystem::
}
}

void Preprocessor::createUnalignedPartitionedSequenceFiles(const std::filesystem::path& file_name) {
void Preprocessor::createUnalignedPartitionedSequenceFiles(const ValidatedNdjsonFile& input_file) {
for (const auto& [seq_name, _] : reference_genomes_.raw_nucleotide_sequences) {
const std::string file_reader_sql =
MetadataInfo::isNdjsonFileEmpty(file_name)
input_file.isEmpty()
? fmt::format(
"SELECT ''::VARCHAR AS key, 'NULL'::VARCHAR as partition_key,"
" ''::VARCHAR AS unaligned_nuc_{} LIMIT 0",
Expand All @@ -441,7 +424,7 @@ void Preprocessor::createUnalignedPartitionedSequenceFiles(const std::filesystem
database_config.schema.primary_key,
getPartitionKeySelect(),
seq_name,
file_name.string()
input_file.getFileName().string()
);
const std::string table_sql = fmt::format(
"SELECT key, {}, partition_key_to_partition.partition_id \n"
Expand Down
7 changes: 2 additions & 5 deletions src/silo/preprocessing/sequence_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,10 @@ void validateStruct(

void SequenceInfo::validateNdjsonFile(
const silo::ReferenceGenomes& reference_genomes,
duckdb::Connection& connection,
const std::filesystem::path& input_filename
) {
if (MetadataInfo::isNdjsonFileEmpty(input_filename)) {
return;
}

duckdb::DuckDB duck_db(nullptr);
duckdb::Connection connection(duck_db);
const std::vector<std::string> nuc_sequence_names =
reference_genomes.getSequenceNames<Nucleotide>();
const std::vector<std::string> aa_sequence_names =
Expand Down
19 changes: 4 additions & 15 deletions src/silo/preprocessing/sequence_info.test.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "silo/preprocessing/sequence_info.h"

#include <gtest/gtest.h>
#include <duckdb.hpp>

#include "silo/preprocessing/preprocessing_exception.h"
#include "silo/storage/reference_genomes.h"
Expand All @@ -13,23 +12,18 @@ TEST(SequenceInfo, validatesSuccessfulOnCorrectFile) {
const auto reference_genomes = ReferenceGenomes::readFromFile(
"testBaseData/exampleDataset1000Sequences/reference_genomes.json"
);

duckdb::DuckDB duckdb;
duckdb::Connection connection(duckdb);
ASSERT_NO_THROW(SequenceInfo::validateNdjsonFile(
reference_genomes, connection, "testBaseData/exampleDataset1000Sequences/sample.ndjson.zst"
reference_genomes, "testBaseData/exampleDataset1000Sequences/sample.ndjson.zst"
));
}

TEST(SequenceInfo, failWhenTooManyGenomesInReferences) {
const auto reference_genomes =
ReferenceGenomes::readFromFile("testBaseData/exampleDataset/reference_genomes.json");

duckdb::DuckDB duckdb;
duckdb::Connection connection(duckdb);
ASSERT_THROW(
SequenceInfo::validateNdjsonFile(
reference_genomes, connection, "testBaseData/exampleDataset1000Sequences/sample.ndjson.zst"
reference_genomes, "testBaseData/exampleDataset1000Sequences/sample.ndjson.zst"
),
silo::preprocessing::PreprocessingException
);
Expand All @@ -40,11 +34,9 @@ TEST(SequenceInfo, failWhenTooManyGenomesInJson) {
"testBaseData/exampleDataset1000Sequences/reference_genomes.json"
);

duckdb::DuckDB duckdb;
duckdb::Connection connection(duckdb);
ASSERT_THROW(
SequenceInfo::validateNdjsonFile(
reference_genomes, connection, "testBaseData/ndjsonFiles/oneline_second_nuc.json.zst"
reference_genomes, "testBaseData/ndjsonFiles/oneline_second_nuc.json.zst"
),
silo::preprocessing::PreprocessingException
);
Expand All @@ -54,12 +46,9 @@ TEST(SequenceInfo, failWhenTooFewAASequencesInJson) {
const auto reference_genomes = ReferenceGenomes::readFromFile(
"testBaseData/exampleDataset1000Sequences/reference_genomes.json"
);

duckdb::DuckDB duckdb;
duckdb::Connection connection(duckdb);
ASSERT_THROW(
SequenceInfo::validateNdjsonFile(
reference_genomes, connection, "testBaseData/ndjsonFiles/oneline_without_ORF.json.zst"
reference_genomes, "testBaseData/ndjsonFiles/oneline_without_ORF.json.zst"
),
silo::preprocessing::PreprocessingException
);
Expand Down
58 changes: 58 additions & 0 deletions src/silo/preprocessing/validated_ndjson_file.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#include "silo/preprocessing/validated_ndjson_file.h"

#include <spdlog/spdlog.h>

#include "silo/config/database_config.h"
#include "silo/preprocessing/metadata_info.h"
#include "silo/preprocessing/preprocessing_exception.h"
#include "silo/preprocessing/sequence_info.h"
#include "silo/storage/reference_genomes.h"

namespace silo::preprocessing {

ValidatedNdjsonFile::ValidatedNdjsonFile(std::filesystem::path file_name, bool empty)
: file_name(std::move(file_name)),
empty(empty) {}

std::filesystem::path ValidatedNdjsonFile::getFileName() const {
return file_name;
}

bool ValidatedNdjsonFile::isEmpty() const {
return empty;
}

/// Validates an ndjson input file and returns a handle that records the result.
///
/// Steps, in order:
///   1. the path must exist,
///   2. the path must not be a directory,
///   3. the emptiness of the file is determined once and stored in the handle,
///   4. only for non-empty files: the metadata is validated against
///      `database_config` and the sequences against `reference_genomes`.
///
/// @param file_name path of the ndjson input file
/// @param database_config configuration handed to MetadataInfo::validateNdjsonFile
/// @param reference_genomes reference genomes handed to SequenceInfo::validateNdjsonFile
/// @return handle carrying `file_name` and the computed emptiness flag
/// @throws silo::preprocessing::PreprocessingException if the path does not
///         exist or is a directory. The two validateNdjsonFile helpers may
///         throw as well; their exact failure modes are defined elsewhere.
ValidatedNdjsonFile ValidatedNdjsonFile::validateFileAgainstConfig(
   const std::filesystem::path& file_name,
   const silo::config::DatabaseConfig& database_config,
   const silo::ReferenceGenomes& reference_genomes
) {
   SPDLOG_DEBUG("build - checking whether the file '{}' exists: ", file_name.string());
   if (!std::filesystem::exists(file_name)) {
      throw silo::preprocessing::PreprocessingException(
         fmt::format("The specified input file {} does not exist.", file_name.string())
      );
   }

   SPDLOG_DEBUG("build - checking whether the file '{}' is not a directory: ", file_name.string());
   if (std::filesystem::is_directory(file_name)) {
      throw silo::preprocessing::PreprocessingException(
         fmt::format("The specified input file {} is a directory.", file_name.string())
      );
   }

   // Announce the emptiness check before it runs, so the log reflects the
   // actual order of operations if the check itself fails.
   SPDLOG_DEBUG("build - checking whether the file '{}' is empty: ", file_name.string());
   const bool empty = MetadataInfo::isNdjsonFileEmpty(file_name);

   // An empty file has no rows to validate; the content checks are skipped.
   if (!empty) {
      SPDLOG_DEBUG("build - validating metadata file '{}' with config", file_name.string());
      MetadataInfo::validateNdjsonFile(file_name, database_config);

      SPDLOG_DEBUG(
         "build - validating sequences of file '{}' with reference genomes", file_name.string()
      );
      SequenceInfo::validateNdjsonFile(reference_genomes, file_name);
   }

   return ValidatedNdjsonFile(file_name, empty);
}
} // namespace silo::preprocessing

0 comments on commit c5b86c4

Please sign in to comment.