diff --git a/include/silo/common/table_reader.h b/include/silo/common/table_reader.h index 12763b78a..380478318 100644 --- a/include/silo/common/table_reader.h +++ b/include/silo/common/table_reader.h @@ -35,8 +35,8 @@ class TableReader { std::string order_by_clause; std::unique_ptr query_result; std::unique_ptr current_chunk; - size_t current_row; - size_t current_row_in_chunk; + size_t current_start_of_chunk = 0; + size_t current_chunk_size = 0; public: explicit TableReader( @@ -51,12 +51,8 @@ class TableReader { size_t read(); private: - std::optional nextKey(); - std::string getTableQuery(); - void advanceRow(); - void loadTable(); }; } // namespace silo diff --git a/src/silo/common/table_reader.cpp b/src/silo/common/table_reader.cpp index 9c4848947..8002d5630 100644 --- a/src/silo/common/table_reader.cpp +++ b/src/silo/common/table_reader.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -33,25 +34,38 @@ silo::TableReader::TableReader( where_clause(where_clause), order_by_clause(order_by_clause) {} -std::optional silo::TableReader::nextKey() { - if (!current_chunk) { - return std::nullopt; - } - - return current_chunk->GetValue(0, current_row_in_chunk).GetValue(); -} - size_t silo::TableReader::read() { loadTable(); assert(query_result->ColumnCount() == column_functions.size() + 1); - while (nextKey()) { - for (size_t column_idx = 0; column_idx < column_functions.size(); column_idx++) { - column_functions.at(column_idx) - .function(current_row, current_chunk->GetValue(column_idx + 1, current_row_in_chunk)); + while (true) { + current_chunk = query_result->Fetch(); + if (!current_chunk) { + break; } - advanceRow(); - } - return current_row; + if (current_chunk->size() == 0) { + continue; + } + current_chunk_size = current_chunk->size(); + + tbb::parallel_for( + tbb::blocked_range(0, column_functions.size()), + [&](const auto& local) { + for (size_t column_idx = local.begin(); column_idx != local.end(); column_idx++) { + auto& column = current_chunk->data[column_idx + 1]; + for (size_t row_in_chunk = 0; row_in_chunk < current_chunk_size; row_in_chunk++) { + column_functions.at(column_idx) + .function( + current_start_of_chunk + row_in_chunk, + column.GetValue(current_start_of_chunk + row_in_chunk) + ); + } + } + } + ); + current_start_of_chunk += current_chunk_size; + }; + + return current_start_of_chunk; } std::string silo::TableReader::getTableQuery() { @@ -87,23 +101,4 @@ void silo::TableReader::loadTable() { "Error when executing SQL " + query_result->GetError() ); } - current_chunk = query_result->Fetch(); - current_row = 0; - current_row_in_chunk = 0; - - while (current_chunk && current_chunk->size() == 0) { - current_chunk = query_result->Fetch(); - } -} - -void silo::TableReader::advanceRow() { - current_row++; - current_row_in_chunk++; - if (current_row_in_chunk == current_chunk->size()) { - current_row_in_chunk = 0; - current_chunk = query_result->Fetch(); - while (current_chunk && current_chunk->size() == 0) { - current_chunk = query_result->Fetch(); - } - } } diff --git a/src/silo/preprocessing/preprocessor.cpp b/src/silo/preprocessing/preprocessor.cpp index 25b95ad64..c027991bc 100644 --- a/src/silo/preprocessing/preprocessor.cpp +++ b/src/silo/preprocessing/preprocessor.cpp @@ -666,8 +666,18 @@ void Preprocessor::buildMetadataStore( fmt::format("partition_id = {}", partition_id), order_by_clause ); - const size_t number_of_rows = table_reader.read(); - database.partitions.at(partition_id).sequence_count += number_of_rows; + + int64_t fill_time; + { + const silo::common::BlockTimer timer(fill_time); + const size_t number_of_rows = table_reader.read(); + database.partitions.at(partition_id).sequence_count += number_of_rows; + } + SPDLOG_DEBUG( + "build - finished fill columns for partition {} in {} microseconds", + partition_id, + fill_time + ); SPDLOG_INFO("build - finished columns for partition {}", partition_id); } }