Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP just for testing CI #5363

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion test/src/unit-ReadCellSlabIter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ void set_result_tile_dim(
std::nullopt,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, std::nullopt},
{nullptr, std::nullopt},
{nullptr, std::nullopt}};
result_tile.init_coord_tile(
constants::format_version,
array_schema,
Expand Down
20 changes: 16 additions & 4 deletions test/src/unit-result-tile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, std::nullopt},
{nullptr, std::nullopt},
{nullptr, std::nullopt}};
rt.init_coord_tile(
constants::format_version,
array_schema,
Expand All @@ -230,7 +233,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, std::nullopt},
{nullptr, std::nullopt},
{nullptr, std::nullopt}};
rt.init_coord_tile(
constants::format_version,
array_schema,
Expand Down Expand Up @@ -326,7 +332,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, std::nullopt},
{nullptr, std::nullopt},
{nullptr, std::nullopt}};
rt.init_coord_tile(
constants::format_version,
array_schema,
Expand All @@ -343,7 +352,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, std::nullopt},
{nullptr, std::nullopt},
{nullptr, std::nullopt}};
rt.init_coord_tile(
constants::format_version,
array_schema,
Expand Down
1 change: 1 addition & 0 deletions tiledb/common/thread_pool/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace tiledb::common {
class ThreadPool {
public:
using Task = std::future<Status>;
using SharedTask = std::shared_future<Status>;

/* ********************************* */
/* CONSTRUCTORS & DESTRUCTORS */
Expand Down
3 changes: 2 additions & 1 deletion tiledb/sm/filter/test/filter_test_support.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ Tile create_tile_for_unfiltering(
tile->cell_size() * nelts,
tile->filtered_buffer().data(),
tile->filtered_buffer().size(),
tracker};
tracker,
nullptr};
}

void run_reverse(
Expand Down
3 changes: 2 additions & 1 deletion tiledb/sm/filter/test/tile_data_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class TileDataGenerator {
original_tile_size(),
filtered_buffer.data(),
filtered_buffer.size(),
memory_tracker);
memory_tracker,
nullptr);
}

/** Returns the size of the original unfiltered data. */
Expand Down
9 changes: 6 additions & 3 deletions tiledb/sm/metadata/test/unit_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ TEST_CASE(
tile1->size(),
tile1->filtered_buffer().data(),
tile1->filtered_buffer().size(),
tracker);
tracker,
nullptr);
memcpy(metadata_tiles[0]->data(), tile1->data(), tile1->size());

metadata_tiles[1] = tdb::make_shared<Tile>(
Expand All @@ -135,7 +136,8 @@ TEST_CASE(
tile2->size(),
tile2->filtered_buffer().data(),
tile2->filtered_buffer().size(),
tracker);
tracker,
nullptr);
memcpy(metadata_tiles[1]->data(), tile2->data(), tile2->size());

metadata_tiles[2] = tdb::make_shared<Tile>(
Expand All @@ -147,7 +149,8 @@ TEST_CASE(
tile3->size(),
tile3->filtered_buffer().data(),
tile3->filtered_buffer().size(),
tracker);
tracker,
nullptr);
memcpy(metadata_tiles[2]->data(), tile3->data(), tile3->size());

meta = Metadata::deserialize(metadata_tiles);
Expand Down
47 changes: 30 additions & 17 deletions tiledb/sm/query/readers/filtered_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ class FilteredDataBlock {
offset + size <= offset_ + size_;
}

/**
 * Stores the I/O task associated with this data block.
 *
 * @param task Shared task (a `std::shared_future<Status>`) to keep with the
 *     block. It is taken by value and moved into the member.
 */
void set_io_task(ThreadPool::SharedTask task) {
io_task_ = std::move(task);
}

/**
 * @return A copy of the I/O task stored for this data block, i.e. the task
 *     to block on before accessing the block's data.
 *
 * NOTE(review): this accessor does not modify the block, so it could
 * probably be const-qualified -- confirm against callers.
 */
ThreadPool::SharedTask io_task() {
return io_task_;
}

private:
/* ********************************* */
/* PRIVATE ATTRIBUTES */
Expand All @@ -139,6 +147,9 @@ class FilteredDataBlock {

/** Data for the data block. */
tdb::pmr::unique_ptr<std::byte> filtered_data_;

/** IO Task to block on for data access. */
ThreadPool::SharedTask io_task_;
};

/**
Expand Down Expand Up @@ -187,7 +198,6 @@ class FilteredData {
const bool var_sized,
const bool nullable,
const bool validity_only,
std::vector<ThreadPool::Task>& read_tasks,
shared_ptr<MemoryTracker> memory_tracker)
: resources_(resources)
, memory_tracker_(memory_tracker)
Expand All @@ -200,8 +210,7 @@ class FilteredData {
, name_(name)
, fragment_metadata_(fragment_metadata)
, var_sized_(var_sized)
, nullable_(nullable)
, read_tasks_(read_tasks) {
, nullable_(nullable) {
if (result_tiles.size() == 0) {
return;
}
Expand Down Expand Up @@ -325,12 +334,14 @@ class FilteredData {
* @param rt Result tile.
* @return Fixed filtered data pointer.
*/
inline void* fixed_filtered_data(
const FragmentMetadata* fragment, const ResultTile* rt) {
inline std::tuple<void*, std::optional<ThreadPool::SharedTask>>
fixed_filtered_data(const FragmentMetadata* fragment, const ResultTile* rt) {
auto offset{
fragment->loaded_metadata()->file_offset(name_, rt->tile_idx())};
ensure_data_block_current(TileType::FIXED, fragment, rt, offset);
return current_data_block(TileType::FIXED)->data_at(offset);
return {
current_data_block(TileType::FIXED)->data_at(offset),
current_data_block(TileType::FIXED)->io_task()};
}

/**
Expand All @@ -340,16 +351,18 @@ class FilteredData {
* @param rt Result tile.
* @return Var filtered data pointer.
*/
inline void* var_filtered_data(
const FragmentMetadata* fragment, const ResultTile* rt) {
inline std::tuple<void*, std::optional<ThreadPool::SharedTask>>
var_filtered_data(const FragmentMetadata* fragment, const ResultTile* rt) {
if (!var_sized_) {
return nullptr;
return {nullptr, std::nullopt};
}

auto offset{
fragment->loaded_metadata()->file_var_offset(name_, rt->tile_idx())};
ensure_data_block_current(TileType::VAR, fragment, rt, offset);
return current_data_block(TileType::VAR)->data_at(offset);
return {
current_data_block(TileType::VAR)->data_at(offset),
current_data_block(TileType::VAR)->io_task()};
}

/**
Expand All @@ -359,16 +372,19 @@ class FilteredData {
* @param rt Result tile.
* @return Nullable filtered data pointer.
*/
inline void* nullable_filtered_data(
inline std::tuple<void*, std::optional<ThreadPool::SharedTask>>
nullable_filtered_data(
const FragmentMetadata* fragment, const ResultTile* rt) {
if (!nullable_) {
return nullptr;
return {nullptr, std::nullopt};
}

auto offset{fragment->loaded_metadata()->file_validity_offset(
name_, rt->tile_idx())};
ensure_data_block_current(TileType::NULLABLE, fragment, rt, offset);
return current_data_block(TileType::NULLABLE)->data_at(offset);
return {
current_data_block(TileType::NULLABLE)->data_at(offset),
current_data_block(TileType::NULLABLE)->io_task()};
}

private:
Expand Down Expand Up @@ -398,7 +414,7 @@ class FilteredData {
throw_if_not_ok(resources_.vfs().read(uri, offset, data, size, false));
return Status::Ok();
});
read_tasks_.push_back(std::move(task));
block.set_io_task(task.share());
}

/** @return Data blocks corresponding to the tile type. */
Expand Down Expand Up @@ -634,9 +650,6 @@ class FilteredData {

/** Is the attribute nullable? */
const bool nullable_;

/** Read tasks. */
std::vector<ThreadPool::Task>& read_tasks_;
};

} // namespace tiledb::sm
Expand Down
14 changes: 4 additions & 10 deletions tiledb/sm/query/readers/reader_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,6 @@ std::list<FilteredData> ReaderBase::read_tiles(
}

uint64_t num_tiles_read{0};
std::vector<ThreadPool::Task> read_tasks;

// Run all attributes independently.
for (auto n : names) {
Expand All @@ -721,7 +720,6 @@ std::list<FilteredData> ReaderBase::read_tiles(
var_sized,
nullable,
val_only,
read_tasks,
memory_tracker_);

// Go through each tile and create the attribute tiles.
Expand Down Expand Up @@ -749,12 +747,14 @@ std::list<FilteredData> ReaderBase::read_tiles(
// 'TileData' objects should be returned by this function and passed into
// 'unfilter_tiles' so that the filter pipeline can stop using the
// 'ResultTile' object to get access to the filtered data.
std::tuple<void*, std::optional<ThreadPool::SharedTask>> n = {
nullptr, std::nullopt};
ResultTile::TileData tile_data{
val_only ?
nullptr :
n :
filtered_data.back().fixed_filtered_data(fragment.get(), tile),
val_only ?
nullptr :
n :
filtered_data.back().var_filtered_data(fragment.get(), tile),
filtered_data.back().nullable_filtered_data(fragment.get(), tile)};

Expand All @@ -779,12 +779,6 @@ std::list<FilteredData> ReaderBase::read_tiles(

stats_->add_counter("num_tiles_read", num_tiles_read);

// Wait for the read tasks to finish.
auto statuses{resources_.io_tp().wait_all_status(read_tasks)};
for (const auto& st : statuses) {
throw_if_not_ok(st);
}

return filtered_data;
}

Expand Down
54 changes: 45 additions & 9 deletions tiledb/sm/query/readers/result_tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,18 @@ class ResultTile {
/* CONSTRUCTORS & DESTRUCTORS */
/* ********************************* */
TileData(
void* fixed_filtered_data,
void* var_filtered_data,
void* validity_filtered_data)
: fixed_filtered_data_(fixed_filtered_data)
, var_filtered_data_(var_filtered_data)
, validity_filtered_data_(validity_filtered_data) {
std::tuple<void*, std::optional<ThreadPool::SharedTask>>
fixed_filtered_data,
std::tuple<void*, std::optional<ThreadPool::SharedTask>>
var_filtered_data,
std::tuple<void*, std::optional<ThreadPool::SharedTask>>
validity_filtered_data)
: fixed_filtered_data_(std::get<0>(fixed_filtered_data))
, var_filtered_data_(std::get<0>(var_filtered_data))
, validity_filtered_data_(std::get<0>(validity_filtered_data))
, fixed_filtered_data_task_(std::get<1>(fixed_filtered_data))
, var_filtered_data_task_(std::get<1>(var_filtered_data))
, validity_filtered_data_task_(std::get<1>(validity_filtered_data)) {
}

/* ********************************* */
Expand All @@ -240,6 +246,24 @@ class ResultTile {
return validity_filtered_data_;
}

/**
 * @return The I/O task that was paired with the fixed filtered data pointer
 *     at construction, or `std::nullopt` if no task was supplied. Returned
 *     by value (a copy of the stored optional).
 */
inline std::optional<ThreadPool::SharedTask> fixed_filtered_data_task()
const {
return fixed_filtered_data_task_;
}

/**
 * @return The I/O task that was paired with the var filtered data pointer
 *     at construction, or `std::nullopt` if no task was supplied. Returned
 *     by value (a copy of the stored optional).
 */
inline std::optional<ThreadPool::SharedTask> var_filtered_data_task()
const {
return var_filtered_data_task_;
}

/**
 * @return The I/O task that was paired with the validity filtered data
 *     pointer at construction, or `std::nullopt` if no task was supplied.
 *     Returned by value (a copy of the stored optional).
 */
inline std::optional<ThreadPool::SharedTask> validity_filtered_data_task()
const {
return validity_filtered_data_task_;
}

private:
/* ********************************* */
/* PRIVATE ATTRIBUTES */
Expand All @@ -253,6 +277,15 @@ class ResultTile {

/** Stores the validity filtered data pointer. */
void* validity_filtered_data_;

/** Stores the fixed filtered data I/O task. */
std::optional<ThreadPool::SharedTask> fixed_filtered_data_task_;

/** Stores the var filtered data I/O task. */
std::optional<ThreadPool::SharedTask> var_filtered_data_task_;

/** Stores the validity filtered data I/O task. */
std::optional<ThreadPool::SharedTask> validity_filtered_data_task_;
};

/**
Expand Down Expand Up @@ -286,7 +319,8 @@ class ResultTile {
tile_sizes.tile_size(),
tile_data.fixed_filtered_data(),
tile_sizes.tile_persisted_size(),
memory_tracker_) {
memory_tracker_,
tile_data.fixed_filtered_data_task()) {
if (tile_sizes.has_var_tile()) {
auto type = array_schema.type(name);
var_tile_.emplace(
Expand All @@ -297,7 +331,8 @@ class ResultTile {
tile_sizes.tile_var_size(),
tile_data.var_filtered_data(),
tile_sizes.tile_var_persisted_size(),
memory_tracker_);
memory_tracker_,
tile_data.var_filtered_data_task());
}

if (tile_sizes.has_validity_tile()) {
Expand All @@ -309,7 +344,8 @@ class ResultTile {
tile_sizes.tile_validity_size(),
tile_data.validity_filtered_data(),
tile_sizes.tile_validity_persisted_size(),
memory_tracker_);
memory_tracker_,
tile_data.validity_filtered_data_task());
}
}

Expand Down
Loading
Loading