
Aggregates: use tile metadata.
This allows the readers to use the tile metadata to do aggregation. The tiles still get loaded into memory even when not required, but that will be addressed in a follow-up change.

---
TYPE: IMPROVEMENT
DESC: Aggregates: use tile metadata.
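
To illustrate the idea behind the change (using hypothetical names, not TileDB's actual classes): each written tile already carries precomputed min/max/sum/null-count metadata in its fragment, so an aggregate over a tile that contributes all of its cells to the query (single fragment domain, no range cropping, no query-condition filtering) can be folded directly from that metadata without touching the cell data. A minimal sketch of the per-tile dispatch:

```cpp
// Illustrative sketch only; the type and function names below are hypothetical
// and do not correspond to TileDB's real API.
#include <cstdint>
#include <optional>
#include <vector>

// Precomputed per-tile metadata, written alongside the fragment.
struct TileMetadata {
  int64_t sum;        // sum of all cell values in the tile
  int64_t min;        // minimum cell value in the tile
  int64_t max;        // maximum cell value in the tile
  uint64_t cell_num;  // number of cells in the tile
};

// A tile whose metadata is always available but whose cell data may or may
// not have been loaded into memory.
struct Tile {
  TileMetadata md;
  std::optional<std::vector<int64_t>> cells;  // loaded lazily, only if needed
  bool fully_covered;  // single fragment domain, no range crop, no QC filtering
};

struct SumAggregate {
  int64_t sum = 0;
  uint64_t count = 0;

  // Fast path: fold the precomputed tile metadata directly.
  void aggregate_with_metadata(const TileMetadata& md) {
    sum += md.sum;
    count += md.cell_num;
  }

  // Slow path: visit every cell of the tile.
  void aggregate_cells(const std::vector<int64_t>& cells) {
    for (auto v : cells) {
      sum += v;
      ++count;
    }
  }
};

// Per-tile dispatch: use metadata when the whole tile contributes,
// otherwise fall back to the cell data.
void process_tile(SumAggregate& agg, const Tile& tile) {
  if (tile.fully_covered) {
    agg.aggregate_with_metadata(tile.md);
  } else if (tile.cells.has_value()) {
    agg.aggregate_cells(*tile.cells);
  }
}
```

In this commit, the fast path is guarded by `can_aggregate_tile_with_frag_md()` in `dense_reader.h`, the metadata itself is loaded up front by `ReaderBase::load_tile_metadata()`, and query-condition filtering is tracked per space tile via `ResultSpaceTile::set_qc_filtered_results()`.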
KiterLuc committed Oct 28, 2023
1 parent f04b453 commit 4f3d499
Showing 14 changed files with 283 additions and 96 deletions.
2 changes: 1 addition & 1 deletion format_spec/FORMAT_SPEC.md
@@ -4,7 +4,7 @@ title: Format Specification

**Notes:**

* The current TileDB format version number is **20** (`uint32_t`).
* The current TileDB format version number is **21** (`uint32_t`).
* Data written by TileDB and referenced in this document is **little-endian**
with the following exceptions:

6 changes: 3 additions & 3 deletions test/src/test-cppapi-aggregates.cc
@@ -620,9 +620,9 @@ void CppAggregatesFx<T>::create_array_and_write_fragments() {
write_sparse({0, 1, 2, 3}, {1, 1, 1, 2}, {1, 2, 4, 3}, 1, validity_values);
write_sparse({4, 5, 6, 7}, {2, 2, 3, 3}, {2, 4, 2, 3}, 3, validity_values);
write_sparse(
{8, 9, 10, 11}, {2, 1, 3, 4}, {1, 3, 1, 1}, 4, validity_values);
write_sparse(
{12, 13, 14, 15}, {4, 3, 3, 4}, {2, 3, 4, 4}, 6, validity_values);
{8, 9, 10, 11}, {2, 1, 3, 4}, {1, 3, 1, 1}, 5, validity_values);
write_sparse({12, 13}, {4, 3}, {2, 3}, 7, validity_values);
write_sparse({14, 15}, {3, 4}, {4, 4}, 9, validity_values);
}
}

2 changes: 1 addition & 1 deletion tiledb/sm/misc/constants.cc
@@ -676,7 +676,7 @@ const int32_t library_version[3] = {
TILEDB_VERSION_MAJOR, TILEDB_VERSION_MINOR, TILEDB_VERSION_PATCH};

/** The TileDB serialization base format version number. */
const format_version_t base_format_version = 20;
const format_version_t base_format_version = 21;

/**
* The TileDB serialization format version number.
96 changes: 50 additions & 46 deletions tiledb/sm/query/readers/dense_reader.cc
@@ -350,6 +350,8 @@ Status DenseReader::dense_read() {
read_state_.partitioner_.subarray().relevant_fragments(), var_names);
load_tile_offsets(
read_state_.partitioner_.subarray().relevant_fragments(), names);
load_tile_metadata(
read_state_.partitioner_.subarray().relevant_fragments(), names);

uint64_t t_start = 0;
uint64_t t_end = 0;
@@ -756,15 +758,12 @@ tuple<uint64_t, std::vector<ResultTile*>> DenseReader::compute_result_tiles(
bool done = false;
while (!done && t_end < tile_coords.size()) {
const DimType* tc = (DimType*)&tile_coords[t_end][0];
auto it = result_space_tiles.find(tc);
if (it == result_space_tiles.end()) {
throw DenseReaderStatusException("Tile coordinates not found");
}
auto& result_space_tile = result_space_tiles.at(tc);

// Compute the required memory to load the query condition tiles for the
// current result space tile.
uint64_t condition_memory = 0;
for (const auto& result_tile : it->second.result_tiles()) {
for (const auto& result_tile : result_space_tile.result_tiles()) {
auto& rt = result_tile.second;
for (uint64_t n = 0; n < condition_names.size(); n++) {
condition_memory +=
@@ -785,7 +784,7 @@ tuple<uint64_t, std::vector<ResultTile*>> DenseReader::compute_result_tiles(
// current space tile.
for (uint64_t n = condition_names.size(); n < names.size(); n++) {
uint64_t tile_memory = 0;
for (const auto& result_tile : it->second.result_tiles()) {
for (const auto& result_tile : result_space_tile.result_tiles()) {
auto& rt = result_tile.second;
tile_memory +=
get_attribute_tile_size(names[n], rt.frag_idx(), rt.tile_idx());
@@ -808,7 +807,7 @@ tuple<uint64_t, std::vector<ResultTile*>> DenseReader::compute_result_tiles(

// Add the result tiles for this space tile to the returned list to
// process.
for (const auto& result_tile : it->second.result_tiles()) {
for (const auto& result_tile : result_space_tile.result_tiles()) {
result_tiles.push_back(const_cast<ResultTile*>(&result_tile.second));
}

@@ -911,21 +910,17 @@ Status DenseReader::apply_query_condition(
[&](uint64_t t, uint64_t range_thread_idx) {
// Find out result space tile and tile subarray.
const DimType* tc = (DimType*)&tile_coords[t][0];
auto it = result_space_tiles.find(tc);
if (it == result_space_tiles.end()) {
throw DenseReaderStatusException(
"Tile coordinates not found");
}
auto& result_space_tile = result_space_tiles.at(tc);

// Iterate over all coordinates, retrieved in cell slab.
const auto& frag_domains = it->second.frag_domains();
const auto& frag_domains = result_space_tile.frag_domains();
TileCellSlabIter<DimType> iter(
range_thread_idx,
num_range_threads,
subarray,
tile_subarrays[t],
tile_extents,
it->second.start_coords(),
result_space_tile.start_coords(),
range_info,
cell_order);

@@ -965,13 +960,22 @@ Status DenseReader::apply_query_condition(
*(fragment_metadata_[frag_domains[i].fid()]
->array_schema()
.get()),
it->second.result_tile(frag_domains[i].fid()),
result_space_tile.result_tile(frag_domains[i].fid()),
start,
end - start + 1,
iter.pos_in_tile(),
stride,
iter.cell_slab_coords().data(),
dest_ptr));

// If any cell doesn't match the query condition, signal
// it in the space tile.
for (uint64_t c = start; c <= end; c++) {
if (dest_ptr[c] == 0) {
result_space_tile.set_qc_filtered_results();
break;
}
}
}
}

@@ -1085,16 +1089,13 @@ Status DenseReader::copy_attribute(
[&](uint64_t t, uint64_t range_thread_idx) {
// Find out result space tile and tile subarray.
const DimType* tc = (DimType*)&tile_coords[t][0];
auto it = result_space_tiles.find(tc);
if (it == result_space_tiles.end()) {
throw DenseReaderStatusException("Tile coordinates not found");
}
auto& result_space_tile = result_space_tiles.at(tc);

// Copy the tile offsets.
return copy_offset_tiles<DimType, OffType>(
name,
tile_extents,
it->second,
result_space_tile,
subarray,
tile_subarrays[t],
subarray_start_cell,
@@ -1148,15 +1149,12 @@ Status DenseReader::copy_attribute(
[&](uint64_t t, uint64_t range_thread_idx) {
// Find out result space tile and tile subarray.
const DimType* tc = (DimType*)&tile_coords[t][0];
auto it = result_space_tiles.find(tc);
if (it == result_space_tiles.end()) {
throw DenseReaderStatusException("Tile coordinates not found");
}
auto& result_space_tile = result_space_tiles.at(tc);

return copy_var_tiles<DimType, OffType>(
name,
tile_extents,
it->second,
result_space_tile,
subarray,
tile_subarrays[t],
subarray_start_cell,
@@ -1193,16 +1191,13 @@ Status DenseReader::copy_attribute(
[&](uint64_t t, uint64_t range_thread_idx) {
// Find out result space tile and tile subarray.
const DimType* tc = (DimType*)&tile_coords[t][0];
auto it = result_space_tiles.find(tc);
if (it == result_space_tiles.end()) {
throw DenseReaderStatusException("Tile coordinates not found");
}
auto& result_space_tile = result_space_tiles.at(tc);

// Copy the tile fixed values.
RETURN_NOT_OK(copy_fixed_tiles(
name,
tile_extents,
it->second,
result_space_tile,
subarray,
tile_subarrays[t],
global_order ? tile_offsets[t] : 0,
@@ -1298,24 +1293,33 @@ Status DenseReader::process_aggregates(
[&](uint64_t t, uint64_t range_thread_idx) {
// Find out result space tile and tile subarray.
const DimType* tc = (DimType*)&tile_coords[t][0];
auto it = result_space_tiles.find(tc);
if (it == result_space_tiles.end()) {
throw DenseReaderStatusException("Tile coordinates not found");
auto& result_space_tile = result_space_tiles.at(tc);
if (can_aggregate_tile_with_frag_md(
name, result_space_tile, tile_subarrays[t])) {
if (range_thread_idx == 0) {
auto& rt = result_space_tile.single_result_tile();
auto tile_idx = rt.tile_idx();
auto& frag_md = fragment_metadata_[rt.frag_idx()];
auto md = frag_md->get_tile_metadata(name, tile_idx);
auto& aggregates = aggregates_[name];
for (auto& aggregate : aggregates) {
aggregate->aggregate_tile_with_frag_md(md);
}
}
} else {
RETURN_NOT_OK(aggregate_tiles(
name,
tile_extents,
result_space_tile,
subarray,
tile_subarrays[t],
global_order ? tile_offsets[t] : 0,
range_info,
aggregate_bitmap,
range_thread_idx,
num_range_threads));
}

// Copy the tile fixed values.
RETURN_NOT_OK(aggregate_tiles(
name,
tile_extents,
it->second,
subarray,
tile_subarrays[t],
global_order ? tile_offsets[t] : 0,
range_info,
aggregate_bitmap,
range_thread_idx,
num_range_threads));

return Status::Ok();
});
RETURN_NOT_OK(status);
48 changes: 48 additions & 0 deletions tiledb/sm/query/readers/dense_reader.h
@@ -267,6 +267,54 @@ class DenseReader : public ReaderBase, public IQueryStrategy {
ResultTile::TileTuple* tile_tuple,
optional<void*> bitmap_data);

/**
* Returns whether or not we can aggregate the tile with only the fragment
* metadata.
*
* @param name Name of the field to process.
* @param rst Result space tile.
* @param tile_subarray Tile subarray.
* @return Whether the aggregation can be done with the fragment metadata.
*/
template <class DimType>
inline bool can_aggregate_tile_with_frag_md(
const std::string& name,
ResultSpaceTile<DimType>& rst,
const Subarray& tile_subarray) {
// Make sure no results were filtered out by the query condition and that
// there is only one fragment domain for this tile. Having more fragment
// domains for a tile means we would have to merge data from multiple
// sources, so we cannot aggregate the full tile.
if (rst.qc_filtered_results() || rst.frag_domains().size() != 1) {
return false;
}

// Now we can get the fragment metadata of the only result tile in this
// space tile.
const auto& rt = rst.single_result_tile();
const auto frag_md = fragment_metadata_[rt.frag_idx()];

// Make sure this tile isn't cropped by ranges and that the fragment metadata
// has tile metadata.
if (tile_subarray.cell_num() != rt.cell_num() ||
!frag_md->has_tile_metadata()) {
return false;
}

// Fixed size nullable strings had incorrect min/max metadata until
// version 20.
const auto type = array_schema_.type(name);
if ((type == Datatype::STRING_ASCII || type == Datatype::CHAR) &&
array_schema_.cell_val_num(name) != constants::var_num &&
array_schema_.is_nullable(name)) {
if (frag_md->version() <= 20) {
return false;
}
}

return true;
}

/** Process aggregates for a given field. */
template <class DimType, class OffType>
Status process_aggregates(
38 changes: 38 additions & 0 deletions tiledb/sm/query/readers/reader_base.cc
@@ -505,6 +505,44 @@ void ReaderBase::load_tile_var_sizes(
}));
}

void ReaderBase::load_tile_metadata(
const RelevantFragments& relevant_fragments,
const std::vector<std::string>& names) {
auto timer_se = stats_->start_timer("load_tile_metadata");
const auto encryption_key = array_->encryption_key();

throw_if_not_ok(parallel_for(
storage_manager_->compute_tp(),
0,
relevant_fragments.size(),
[&](const uint64_t i) {
auto frag_idx = relevant_fragments[i];
auto& fragment = fragment_metadata_[frag_idx];

// Generate the list of names with aggregates.
const auto& schema = fragment->array_schema();
std::vector<std::string> to_load;
for (auto& n : names) {
// Not a member of the array schema: this field was added by array
// schema evolution, so ignore it for this fragment's tile metadata.
if (!schema->is_field(n)) {
continue;
}

if (aggregates_.count(n) != 0) {
to_load.emplace_back(n);
}
}

fragment->load_tile_max_values(*encryption_key, to_load);
fragment->load_tile_min_values(*encryption_key, to_load);
fragment->load_tile_sum_values(*encryption_key, to_load);
fragment->load_tile_null_count_values(*encryption_key, to_load);

return Status::Ok();
}));
}

void ReaderBase::load_processed_conditions() {
auto timer_se = stats_->start_timer("load_processed_conditions");
const auto encryption_key = array_->encryption_key();
12 changes: 12 additions & 0 deletions tiledb/sm/query/readers/reader_base.h
@@ -431,6 +431,18 @@ class ReaderBase : public StrategyBase {
const RelevantFragments& relevant_fragments,
const std::vector<std::string>& names);

/**
* Loads tile metadata for each attribute/dimension name into
* its associated element in `fragment_metadata_`. This is only done for
* attributes with aggregates.
*
* @param relevant_fragments List of relevant fragments.
* @param names The attribute/dimension names.
*/
void load_tile_metadata(
const RelevantFragments& relevant_fragments,
const std::vector<std::string>& names);

/**
* Loads processed conditions from fragment metadata.
*
30 changes: 30 additions & 0 deletions tiledb/sm/query/readers/result_space_tile.h
@@ -152,6 +152,31 @@ class ResultSpaceTile {
result_tiles_ == rst.result_tiles_;
}

/** The query condition filtered a result for this tile. */
inline void set_qc_filtered_results() {
qc_filtered_results_ = true;
}

/** Returns whether the query condition filtered any results for this tile.
*/
inline bool qc_filtered_results() const {
return qc_filtered_results_;
}

/**
* Returns the only result tile in this space tile or throws if there is more
* than one.
*/
inline ResultTile& single_result_tile() {
if (result_tiles_.size() != 1) {
throw std::runtime_error(
"Shouldn't call single_result_tile on tiles with more than one "
"fragment domain.");
}

return result_tiles_[frag_domains_[0].fid()];
}

private:
/** The (global) coordinates of the first cell in the space tile. */
std::vector<T> start_coords_;
@@ -168,6 +193,11 @@
* `(fragment id) -> (result tile)`.
*/
std::map<unsigned, ResultTile> result_tiles_;

/**
* Whether the query condition filtered any results for this space tile.
*/
bool qc_filtered_results_ = false;
};

} // namespace sm
