Skip to content

Commit

Permalink
feat: make partition_by field in config optional
Browse files Browse the repository at this point in the history
  • Loading branch information
Taepper committed Aug 7, 2023
1 parent 2c8184f commit 3942418
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 43 deletions.
2 changes: 1 addition & 1 deletion include/silo/config/database_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct DatabaseSchema {
std::vector<DatabaseMetadata> metadata;
std::string primary_key;
std::optional<std::string> date_to_sort_by;
std::string partition_by;
std::optional<std::string> partition_by;
};

struct DatabaseConfig {
Expand Down
9 changes: 8 additions & 1 deletion include/silo/prepare_dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ void partitionData(
const silo::preprocessing::PreprocessingConfig& preprocessing_config,
const preprocessing::Partitions& partitions,
const PangoLineageAliasLookup& alias_key,
const silo::config::DatabaseConfig& database_config,
const std::string& primary_key_field,
const std::string& partition_by_field,
const ReferenceGenomes& reference_genomes
);

// Copies the complete, unpartitioned input into the partition files addressed by (0, 0):
// the metadata file is copied row by row, and every nucleotide and amino acid sequence
// file named in `reference_genomes` is re-written through a ZstdFastaWriter.
//
// preprocessing_config: supplies the input filenames and the partition output filenames.
// primary_key_field:    forwarded to the metadata copy step, which currently does not read it.
// reference_genomes:    names and reference sequences of the nucleotide / amino acid files.
void copyDataToPartitionDirectory(
const preprocessing::PreprocessingConfig& preprocessing_config,
const std::string& primary_key_field,
const ReferenceGenomes& reference_genomes
);

Expand Down
2 changes: 1 addition & 1 deletion include/silo/preprocessing/pango_lineage_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct PangoLineageCounts {
PangoLineageCounts buildPangoLineageCounts(
const PangoLineageAliasLookup& alias_key,
const std::filesystem::path& metadata_path,
const silo::config::DatabaseConfig& database_config
const std::string& partition_by
);

} // namespace preprocessing
Expand Down
11 changes: 11 additions & 0 deletions include/silo/preprocessing/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define SILO_PARTITION_H

#include <cstdint>
#include <filesystem>
#include <functional>
#include <iosfwd>
#include <string>
Expand All @@ -17,6 +18,9 @@ class access;
namespace silo::common {
class UnaliasedPangoLineage;
}
namespace silo::config {
// Forward declaration only. The class-key must match the definition in
// silo/config/database_config.h, where DatabaseConfig is a `struct`; a mismatched
// `class` key triggers -Wmismatched-tags / MSVC C4099.
struct DatabaseConfig;
}  // namespace silo::config

namespace silo::preprocessing {

Expand Down Expand Up @@ -88,6 +92,8 @@ class Partitions {
std::unordered_map<std::string, silo::preprocessing::PartitionChunk> pango_to_chunk;

public:
// Default constructor, so that a Partitions object can be declared first and assigned
// afterwards (e.g. from different branches).
Partitions();

explicit Partitions(std::vector<Partition> partitions_);

void save(std::ostream& output_file) const;
Expand All @@ -104,6 +110,11 @@ class Partitions {

Partitions buildPartitions(const PangoLineageCounts& pango_lineage_counts, Architecture arch);

// Builds the Partitions descriptor used when the database config has no partition_by field.
// NOTE(review): the implementation is not visible from this header; judging by the caller,
// all sequences are presumably placed into one single partition — confirm.
//
// metadata_path:   path to the metadata input file.
// database_config: the database configuration of the instance being preprocessed.
Partitions createSingletonPartitions(
const std::filesystem::path& metadata_path,
const silo::config::DatabaseConfig& database_config
);

} // namespace silo::preprocessing

template <>
Expand Down
16 changes: 10 additions & 6 deletions src/silo/config/config_repository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,19 @@ void validatePartitionBy(
const DatabaseConfig& config,
std::map<std::string, ValueType>& metadata_map
) {
if (metadata_map.find(config.schema.partition_by) == metadata_map.end()) {
throw ConfigException("partition_by '" + config.schema.partition_by + "' is not in metadata");
if (config.schema.partition_by == std::nullopt) {
return;
}

const std::string partition_by = config.schema.partition_by.value();

if (metadata_map.find(partition_by) == metadata_map.end()) {
throw ConfigException("partition_by '" + partition_by + "' is not in metadata");
}

const auto& partition_by_type = metadata_map[config.schema.partition_by];
const auto& partition_by_type = metadata_map[partition_by];
if (partition_by_type != ValueType::PANGOLINEAGE) {
throw ConfigException(
"partition_by '" + config.schema.partition_by + "' must be of type PANGOLINEAGE"
);
throw ConfigException("partition_by '" + partition_by + "' must be of type PANGOLINEAGE");
}
}

Expand Down
17 changes: 13 additions & 4 deletions src/silo/config/database_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ struct convert<silo::config::DatabaseSchema> {
} else {
schema.date_to_sort_by = std::nullopt;
}
schema.partition_by = node["partitionBy"].as<std::string>();
if (node["partitionBy"].IsDefined()) {
schema.partition_by = node["partitionBy"].as<std::string>();
} else {
schema.partition_by = std::nullopt;
}

if (!node["metadata"].IsSequence()) {
return false;
Expand All @@ -112,7 +116,9 @@ struct convert<silo::config::DatabaseSchema> {
Node node;
node["instanceName"] = schema.instance_name;
node["primaryKey"] = schema.primary_key;
node["partitionBy"] = schema.partition_by;
if (schema.partition_by.has_value()) {
node["partitionBy"] = *schema.partition_by;
}
if (schema.date_to_sort_by.has_value()) {
node["dateToSortBy"] = *schema.date_to_sort_by;
}
Expand Down Expand Up @@ -219,10 +225,13 @@ DatabaseConfig DatabaseConfigReader::readConfig(const std::filesystem::path& con
) -> decltype(ctx.out()) {
return format_to(
ctx.out(),
"{{ instance_name: '{}', primary_key: '{}', partition_by: '{}', metadata: [{}] }}",
"{{ instance_name: '{}', primary_key: '{}', partition_by: {}, date_to_sort_by: {}, metadata: "
"[{}] }}",
database_schema.instance_name,
database_schema.primary_key,
database_schema.partition_by,
database_schema.partition_by.has_value() ? "'" + *database_schema.partition_by + "'" : "none",
database_schema.date_to_sort_by.has_value() ? "'" + *database_schema.date_to_sort_by + "'"
: "none",
fmt::join(database_schema.metadata, ",")
);
}
Expand Down
8 changes: 8 additions & 0 deletions src/silo/config/database_config.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,12 @@ TEST(DatabaseConfigReader, shouldReadConfigWithoutDateToSortBy) {
ASSERT_EQ(config.schema.date_to_sort_by, std::nullopt);
}

// Reading test_database_config_without_partition_by.yaml must succeed and leave
// schema.partition_by without a value.
TEST(DatabaseConfigReader, shouldReadConfigWithoutPartitionBy) {
   const DatabaseConfig parsed_config = DatabaseConfigReader().readConfig(
      "testBaseData/test_database_config_without_partition_by.yaml"
   );

   ASSERT_EQ(parsed_config.schema.partition_by, std::nullopt);
}

} // namespace
56 changes: 39 additions & 17 deletions src/silo/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,24 +528,46 @@ Database Database::preprocessing(
const ReferenceGenomes& reference_genomes =
ReferenceGenomes::readFromFile(preprocessing_config.getReferenceGenomeFilename());

SPDLOG_INFO("preprocessing - counting pango lineages");
const preprocessing::PangoLineageCounts pango_descriptor(preprocessing::buildPangoLineageCounts(
database.alias_key, preprocessing_config.getMetadataInputFilename(), database_config_
));

SPDLOG_INFO("preprocessing - calculating partitions");
const preprocessing::Partitions partition_descriptor(
preprocessing::buildPartitions(pango_descriptor, preprocessing::Architecture::MAX_PARTITIONS)
);
preprocessing::Partitions partition_descriptor;
if (database_config_.schema.partition_by.has_value()) {
SPDLOG_INFO("preprocessing - counting pango lineages");
const preprocessing::PangoLineageCounts pango_descriptor(
preprocessing::buildPangoLineageCounts(
database.alias_key,
preprocessing_config.getMetadataInputFilename(),
database_config_.schema.partition_by.value()
)
);

SPDLOG_INFO("preprocessing - partitioning data");
partitionData(
preprocessing_config,
partition_descriptor,
database.alias_key,
database_config_,
reference_genomes
);
SPDLOG_INFO("preprocessing - calculating partitions");
partition_descriptor = preprocessing::Partitions(preprocessing::buildPartitions(
pango_descriptor, preprocessing::Architecture::MAX_PARTITIONS
));

SPDLOG_INFO("preprocessing - partitioning data");
partitionData(
preprocessing_config,
partition_descriptor,
database.alias_key,
database_config_.schema.primary_key,
database_config_.schema.partition_by.value(),
reference_genomes
);

} else {
SPDLOG_INFO(
"preprocessing - skip partition merging because no partition_by key was provided, instead "
"putting all sequences into the same partition"
);

partition_descriptor = preprocessing::createSingletonPartitions(
preprocessing_config.getMetadataInputFilename(), database_config_
);

copyDataToPartitionDirectory(
preprocessing_config, database_config_.schema.primary_key, reference_genomes
);
}

if (database_config_.schema.date_to_sort_by.has_value()) {
SPDLOG_INFO("preprocessing - sorting chunks");
Expand Down
19 changes: 19 additions & 0 deletions src/silo/database.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,25 @@ TEST(DatabaseTest, shouldBuildDatabaseWithoutErrors) {
EXPECT_EQ(simple_database_info.sequence_count, 100);
}

// Runs the full preprocessing with a database config that has no partition_by field and
// checks that the resulting database is non-empty and holds the expected 100 sequences.
TEST(DatabaseTest, shouldBuildPartitionLessDatabaseWithoutErrors) {
   const silo::preprocessing::InputDirectory input_directory{"./testBaseData/"};

   // Both configs are read from the shared test data directory.
   auto preprocessing_config = silo::preprocessing::PreprocessingConfigReader().readConfig(
      input_directory.directory + "test_preprocessing_config.yaml"
   );
   const auto config_without_partition_by = silo::config::ConfigRepository().getValidatedConfig(
      input_directory.directory + "test_database_config_without_partition_by.yaml"
   );

   auto partitionless_database =
      silo::Database::preprocessing(preprocessing_config, config_without_partition_by);
   const auto info = partitionless_database.getDatabaseInfo();

   EXPECT_GT(info.total_size, 0);
   EXPECT_EQ(info.sequence_count, 100);
}

// NOLINTNEXTLINE(readability-function-cognitive-complexity)
TEST(DatabaseTest, shouldReturnCorrectDatabaseInfo) {
auto database{buildTestDatabase()};
Expand Down
96 changes: 89 additions & 7 deletions src/silo/prepare_dataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ std::unordered_map<std::string, silo::preprocessing::PartitionChunk> writeMetada
std::unordered_map<
silo::preprocessing::PartitionChunk,
std::unique_ptr<silo::preprocessing::MetadataWriter>>& chunk_to_metadata_writers,
const silo::config::DatabaseConfig& database_config
const std::string& primary_key_field,
const std::string& partition_by_field
) {
std::unordered_map<std::string, silo::preprocessing::PartitionChunk>
primary_key_to_sequence_partition_chunk;
for (auto& row : metadata_reader) {
std::this_thread::sleep_for(std::chrono::nanoseconds(2));
const std::string_view primary_key = row[database_config.schema.primary_key].get_sv();
const std::string_view primary_key = row[primary_key_field].get_sv();
const silo::common::RawPangoLineage raw_pango_lineage{
row[database_config.schema.partition_by].get<std::string>()};
row[partition_by_field].get<std::string>()};
const silo::common::UnaliasedPangoLineage pango_lineage =
alias_key.unaliasPangoLineage(raw_pango_lineage);

Expand All @@ -105,7 +106,8 @@ std::unordered_map<std::string, silo::preprocessing::PartitionChunk> partitionMe
metadata_partition_filenames,
const std::unordered_map<std::string, silo::preprocessing::PartitionChunk>& pango_to_chunk,
const silo::PangoLineageAliasLookup& alias_key,
const silo::config::DatabaseConfig& database_config
const std::string& primary_key_field,
const std::string& partition_by_field
) {
SPDLOG_INFO("partitioning metadata file {}", metadata_filename.string());

Expand All @@ -115,10 +117,27 @@ std::unordered_map<std::string, silo::preprocessing::PartitionChunk> partitionMe
getMetadataWritersForChunks(metadata_partition_filenames, metadata_reader.reader);

return writeMetadataChunks(
alias_key, pango_to_chunk, metadata_reader.reader, chunk_to_metadata_writers, database_config
alias_key,
pango_to_chunk,
metadata_reader.reader,
chunk_to_metadata_writers,
primary_key_field,
partition_by_field
);
}

void copyMetadataFile(
csv::CSVReader& metadata_reader,
silo::preprocessing::MetadataWriter& metadata_writer,
const std::string& primary_key_field
) {
metadata_writer.writeHeader(metadata_reader);
for (auto& row : metadata_reader) {
std::this_thread::sleep_for(std::chrono::nanoseconds(2));
metadata_writer.writeRow(row);
}
}

std::unordered_map<silo::preprocessing::PartitionChunk, silo::ZstdFastaWriter>
getSequenceWritersForChunks(
const std::unordered_map<silo::preprocessing::PartitionChunk, std::filesystem::path>&
Expand Down Expand Up @@ -173,11 +192,24 @@ void partitionSequenceFile(
writeSequenceChunks(sequence_in, key_to_chunk, chunk_to_seq_writer);
}

// Streams every entry of `sequence_in` to `sequence_out` in reading order.
// The loop ends as soon as the reader returns no key.
void copySequenceFile(silo::FastaReader& sequence_in, silo::ZstdFastaWriter& sequence_out) {
   std::string genome;  // buffer filled by each call to next()
   while (std::optional<std::string> key = sequence_in.next(genome)) {
      sequence_out.write(*key, genome);
   }
}

void silo::partitionData(
const preprocessing::PreprocessingConfig& preprocessing_config,
const preprocessing::Partitions& partitions,
const PangoLineageAliasLookup& alias_key,
const silo::config::DatabaseConfig& database_config,
const std::string& primary_key_field,
const std::string& partition_by_field,
const ReferenceGenomes& reference_genomes
) {
const std::filesystem::path metadata_filename = preprocessing_config.getMetadataInputFilename();
Expand All @@ -190,7 +222,8 @@ void silo::partitionData(
metadata_partition_filenames,
partitions.getPangoToChunk(),
alias_key,
database_config
primary_key_field,
partition_by_field
);

for (const auto& [nuc_name, reference_genome] : reference_genomes.raw_nucleotide_sequences) {
Expand Down Expand Up @@ -227,6 +260,55 @@ void silo::partitionData(
SPDLOG_INFO("Finished partitioning");
}

// Copies the whole, unpartitioned input into the partition files addressed by (0, 0)
// (presumably partition 0 / chunk 0 — the parameter meaning is not visible here).
//
// Steps:
//   1. copy the metadata file via copyMetadataFile,
//   2. re-write every nucleotide sequence file through a ZstdFastaWriter,
//   3. do the same for every amino acid sequence file.
//
// preprocessing_config: source of all input and partition output filenames.
// primary_key_field:    passed through to copyMetadataFile.
// reference_genomes:    provides the sequence names and the reference handed to each writer.
void silo::copyDataToPartitionDirectory(
   const preprocessing::PreprocessingConfig& preprocessing_config,
   const std::string& primary_key_field,
   const ReferenceGenomes& reference_genomes
) {
   // --- metadata ---
   const std::filesystem::path metadata_input_path =
      preprocessing_config.getMetadataInputFilename();
   preprocessing::MetadataReader metadata_source(metadata_input_path);

   auto metadata_output_path = preprocessing_config.getMetadataPartitionFilename(0, 0);
   preprocessing::MetadataWriter metadata_sink(metadata_output_path);

   copyMetadataFile(metadata_source.reader, metadata_sink, primary_key_field);

   // --- nucleotide sequences ---
   for (const auto& [segment_name, segment_reference] :
        reference_genomes.raw_nucleotide_sequences) {
      const std::filesystem::path input_path = preprocessing_config.getNucFilename(segment_name);
      FastaReader fasta_source(input_path);

      auto output_path = preprocessing_config.getNucPartitionFilename(segment_name, 0, 0);
      ZstdFastaWriter compressed_sink(output_path, segment_reference);

      SPDLOG_INFO(
         "copying and compressing nucleotide sequences from {} to {}",
         input_path.string(),
         output_path.string()
      );

      copySequenceFile(fasta_source, compressed_sink);
   }

   // --- amino acid sequences ---
   for (const auto& [gene_name, gene_reference] : reference_genomes.raw_aa_sequences) {
      const std::filesystem::path input_path = preprocessing_config.getGeneFilename(gene_name);
      FastaReader fasta_source(input_path);

      auto output_path = preprocessing_config.getGenePartitionFilename(gene_name, 0, 0);
      ZstdFastaWriter compressed_sink(output_path, gene_reference);

      SPDLOG_INFO(
         "copying and compressing amino acid sequences from {} to {}",
         input_path.string(),
         output_path.string()
      );

      copySequenceFile(fasta_source, compressed_sink);
   }

   SPDLOG_INFO("Finished copying to partition directory");
}

std::unordered_map<std::string, size_t> sortMetadataFile(
silo::preprocessing::MetadataReader& metadata_reader,
silo::preprocessing::MetadataWriter& metadata_writer,
Expand Down
Loading

0 comments on commit 3942418

Please sign in to comment.