Skip to content

Commit

Permalink
refactor: reading of column metadata from duckdb in parallel (#483)
Browse files Browse the repository at this point in the history
Co-authored-by: David Gichev <[email protected]>
  • Loading branch information
davidgicev and David Gichev authored Jun 20, 2024
1 parent 388ede7 commit b971df0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 42 deletions.
8 changes: 2 additions & 6 deletions include/silo/common/table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class TableReader {
std::string order_by_clause;
std::unique_ptr<duckdb::MaterializedQueryResult> query_result;
std::unique_ptr<duckdb::DataChunk> current_chunk;
size_t current_row;
size_t current_row_in_chunk;
size_t current_start_of_chunk = 0;
size_t current_chunk_size = 0;

public:
explicit TableReader(
Expand All @@ -51,12 +51,8 @@ class TableReader {
size_t read();

private:
std::optional<std::string> nextKey();

std::string getTableQuery();

void advanceRow();

void loadTable();
};
} // namespace silo
63 changes: 29 additions & 34 deletions src/silo/common/table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <vector>

#include <fmt/format.h>
#include <oneapi/tbb/parallel_for.h>
#include <spdlog/spdlog.h>
#include <duckdb.hpp>

Expand Down Expand Up @@ -33,25 +34,38 @@ silo::TableReader::TableReader(
where_clause(where_clause),
order_by_clause(order_by_clause) {}

/// Returns the key of the row the reader is currently positioned on.
///
/// The key is read from column 0 of the current chunk at the chunk-local row
/// position. When no chunk is loaded, `std::nullopt` is returned.
std::optional<std::string> silo::TableReader::nextKey() {
   if (current_chunk) {
      const auto key_value = current_chunk->GetValue(0, current_row_in_chunk);
      return key_value.GetValue<std::string>();
   }
   return std::nullopt;
}

size_t silo::TableReader::read() {
loadTable();
assert(query_result->ColumnCount() == column_functions.size() + 1);
while (nextKey()) {
for (size_t column_idx = 0; column_idx < column_functions.size(); column_idx++) {
column_functions.at(column_idx)
.function(current_row, current_chunk->GetValue(column_idx + 1, current_row_in_chunk));
while (true) {
current_chunk = query_result->Fetch();
if (!current_chunk) {
break;
}
advanceRow();
}
return current_row;
if (current_chunk->size() == 0) {
continue;
}
current_chunk_size = current_chunk->size();

tbb::parallel_for(
tbb::blocked_range<uint32_t>(0, column_functions.size()),
[&](const auto& local) {
for (size_t column_idx = local.begin(); column_idx != local.end(); column_idx++) {
auto& column = current_chunk->data[column_idx + 1];
for (size_t row_in_chunk = 0; row_in_chunk < current_chunk_size; row_in_chunk++) {
column_functions.at(column_idx)
.function(
current_start_of_chunk + row_in_chunk,
column.GetValue(current_start_of_chunk + row_in_chunk)
);
}
}
}
);
current_start_of_chunk += current_chunk_size;
};

return current_start_of_chunk;
}

std::string silo::TableReader::getTableQuery() {
Expand Down Expand Up @@ -87,23 +101,4 @@ void silo::TableReader::loadTable() {
"Error when executing SQL " + query_result->GetError()
);
}
current_chunk = query_result->Fetch();
current_row = 0;
current_row_in_chunk = 0;

while (current_chunk && current_chunk->size() == 0) {
current_chunk = query_result->Fetch();
}
}

/// Moves the reader forward by one row.
///
/// Both the overall row counter and the chunk-local row counter are
/// incremented. When the chunk-local counter reaches the size of the current
/// chunk, it is reset to zero and chunks are fetched until a non-empty chunk
/// is found or the result yields no further chunk.
void silo::TableReader::advanceRow() {
   ++current_row;
   ++current_row_in_chunk;
   if (current_row_in_chunk != current_chunk->size()) {
      return;
   }

   current_row_in_chunk = 0;
   do {
      current_chunk = query_result->Fetch();
   } while (current_chunk && current_chunk->size() == 0);
}
14 changes: 12 additions & 2 deletions src/silo/preprocessing/preprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,18 @@ void Preprocessor::buildMetadataStore(
fmt::format("partition_id = {}", partition_id),
order_by_clause
);
const size_t number_of_rows = table_reader.read();
database.partitions.at(partition_id).sequence_count += number_of_rows;

int64_t fill_time;
{
const silo::common::BlockTimer timer(fill_time);
const size_t number_of_rows = table_reader.read();
database.partitions.at(partition_id).sequence_count += number_of_rows;
}
SPDLOG_DEBUG(
"build - finished fill columns for partition {} in {} microseconds",
partition_id,
fill_time
);
SPDLOG_INFO("build - finished columns for partition {}", partition_id);
}
}
Expand Down

0 comments on commit b971df0

Please sign in to comment.