Skip to content

Commit

Permalink
Some more PR cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ypatia committed Dec 19, 2024
1 parent 0a97612 commit ed9b334
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 23 deletions.
12 changes: 7 additions & 5 deletions test/src/unit-sparse-unordered-with-dups-reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1065,11 +1065,13 @@ TEST_CASE_METHOD(
if (one_frag) {
CHECK(1 == loop_num->second);
}

// FIXME: This check has become unpredictable, see why the loop number is
// not consistent } else {
// CHECK(20 == loop_num->second);
// }
/**
* FIXME: The loop_num appears to be different on different architectures/build modes.
* SC-61065 to investigate why.
* } else {
* CHECK(20 == loop_num->second);
* }
*/

// Try to read multiple frags without partial tile offset reading. Should
// fail
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/query/readers/filtered_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ class FilteredData {
auto timer_se = stats_->start_timer("read");
return resources_.vfs().read(uri, offset, data, size, false);
});
// This should be changes once we use taskgraphs for modeling the data flow
// This should be changed once we use taskgraphs for modeling the data flow
block.set_io_task(task);
}

Expand Down
2 changes: 2 additions & 0 deletions tiledb/sm/query/readers/reader_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,7 @@ Status ReaderBase::unfilter_tiles(
return Status::Ok();
}

// The current threadpool design does not allow for unfiltering to happen in chunks using a parallel for within this async task as the wait_all in the end of the parallel for can deadlock.
for (uint64_t range_thread_idx = 0;
range_thread_idx < num_range_threads;
range_thread_idx++) {
Expand Down Expand Up @@ -998,6 +999,7 @@ Status ReaderBase::unfilter_tiles(
continue;
}

// Unfiltering tasks have been launched, set the tasks to wait for in the corresponding tiles. When those tasks(futures) will be ready the tile processing that depends on the unfiltered tile will get unblocked.
auto tile_tuple = result_tile->tile_tuple(name);
tile_tuple->fixed_tile().set_unfilter_data_compute_task(task);

Expand Down
25 changes: 14 additions & 11 deletions tiledb/sm/query/readers/result_tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,20 @@ class ResultTile {
}

~TileData() {
// TODO: destructor should not throw, catch any exceptions
if (fixed_filtered_data_task_.valid()) {
auto st = fixed_filtered_data_task_.wait();
}

if (var_filtered_data_task_.valid()) {
auto st = var_filtered_data_task_.wait();
}

if (validity_filtered_data_task_.valid()) {
auto st = validity_filtered_data_task_.wait();
try {
if (fixed_filtered_data_task_.valid()) {
auto st = fixed_filtered_data_task_.wait();
}

if (var_filtered_data_task_.valid()) {
auto st = var_filtered_data_task_.wait();
}

if (validity_filtered_data_task_.valid()) {
auto st = validity_filtered_data_task_.wait();
}
} catch (...) {
return;
}
}

Expand Down
1 change: 0 additions & 1 deletion tiledb/sm/tile/tile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

#include "tiledb/sm/tile/tile.h"

#include <utility>
#include "tiledb/common/exception/exception.h"
#include "tiledb/common/heap_memory.h"
#include "tiledb/common/memory_tracker.h"
Expand Down
15 changes: 10 additions & 5 deletions tiledb/sm/tile/tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ class TileBase {
return static_cast<T*>(data());
}

/** Converts the data pointer to a specific type with no check on compute
/**
* Converts the data pointer to a specific type with no check on compute
* task. This is used for getting thte data from inside the compute thread
* itself for unfiltering. */
* itself for unfiltering.
*/
template <class T>
inline T* data_as_unsafe() const {
return static_cast<T*>(data_unsafe());
Expand All @@ -134,8 +136,10 @@ class TileBase {
return data_.get();
}

/** Returns the internal buffer. This is used for getting thte data from
* inside the compute thread itself for unfiltering. */
/**
* Returns the internal buffer. This is used for getting thte data from
* inside the compute thread itself for unfiltering.
*/
inline void* data_unsafe() const {
return data_.get();
}
Expand Down Expand Up @@ -198,7 +202,8 @@ class TileBase {
/** The tile data type. */
Datatype type_;

/** Whether to block waiting for io data to be ready before accessing data()
/**
* Whether to block waiting for io data to be ready before accessing data()
*/
const bool skip_waiting_on_io_task_;

Expand Down

0 comments on commit ed9b334

Please sign in to comment.