Skip to content

Commit

Permalink
support consolidation with max_frag_size in Dense
Browse files Browse the repository at this point in the history
  • Loading branch information
DimitrisStaratzis committed Jul 16, 2024
1 parent 91df984 commit 2c04878
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 3 deletions.
92 changes: 92 additions & 0 deletions test/src/unit-cppapi-max-fragment-size.cc
Original file line number Diff line number Diff line change
Expand Up @@ -504,3 +504,95 @@ TEST_CASE(

array.close();
}

// Checks that consolidating a dense array with
// `sm.consolidation.max_fragment_size` set produces several fragments and that
// a full read still returns the data that was originally written.
TEST_CASE(
"Setting max_fragment_size in Dense consolidation",
"[global-order-writer]") {
std::string array_name = "cpp_max_fragment_size_bug";
Context ctx;

// Removes the test array if it currently exists. `ctx` is captured by
// reference, so the lambda uses whatever context `ctx` holds when it runs.
auto cleanup = [&]() {
auto obj = Object::object(ctx, array_name);
if (obj.type() == Object::Type::Array) {
Object::remove(ctx, array_name);
}
};

// Start from a clean slate in case a previous run left the array behind.
cleanup();

// Remove the array at the end of this test.
ScopedExecutor deferred(cleanup);

// Create a 3x3 dense array with tile extent 1 in both dimensions, i.e. an
// array with exactly 9 tiles.
Domain domain(ctx);
ArraySchema schema(ctx, TILEDB_DENSE);
auto d1 = tiledb::Dimension::create<int32_t>(ctx, "d1", {{0, 2}}, 1);
auto d2 = tiledb::Dimension::create<int32_t>(ctx, "d2", {{0, 2}}, 1);
domain.add_dimension(d1);
domain.add_dimension(d2);

auto a1 = tiledb::Attribute::create<int32_t>(ctx, "a");
schema.add_attribute(a1);

schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
schema.set_domain(domain);

Array::create(array_name, schema);

// Populate the array with the values 1 to 9. Each iteration opens the array,
// writes a single row (row i, columns 0..2) and closes it again.
int value = 0;
for (int i = 0; i < 3; i++) {
Array array(ctx, array_name, TILEDB_WRITE);
Query query(ctx, array);
query.set_layout(TILEDB_ROW_MAJOR);
tiledb::Subarray sub(ctx, array);
sub.set_subarray({i, i, 0, 2});
query.set_subarray(sub);
// The elements of a braced-init-list are evaluated left to right, so this
// yields three consecutive values.
std::vector<int32_t> data = {++value, ++value, ++value};
query.set_data_buffer("a", data);
query.submit();
array.close();
}

// Read data to validate the writes and the number of fragments (one per
// write above).
CHECK(tiledb::test::num_fragments(array_name) == 3);
Array array(ctx, array_name, TILEDB_READ);
const std::vector<int32_t> subarray = {0, 2, 0, 2};
std::vector<int32_t> a(9);
Query query(ctx, array, TILEDB_READ);
query.set_subarray(subarray)
.set_layout(TILEDB_ROW_MAJOR)
.set_data_buffer("a", a);
query.submit();
array.close();

for (int i = 0; i < 9; i++) {
CHECK(a[i] == i + 1);
}

// Consolidate with a size limitation for the fragment. This will result in
// the creation of two new fragments.
tiledb::Config cfg;
cfg.set("sm.consolidation.max_fragment_size", "150");
// Replace the context with one that carries the consolidation setting; the
// remaining operations (and the deferred cleanup) use this new context.
ctx = Context(cfg);
Array::consolidate(ctx, array_name);
Array::vacuum(ctx, array_name);

// Check that we now have 2 fragments instead of 3
CHECK(tiledb::test::num_fragments(array_name) == 2);

// Read data to validate correctness: the full array must still hold 1..9 in
// row-major order.
Array array2(ctx, array_name, TILEDB_READ);
const std::vector<int32_t> subarray2 = {0, 2, 0, 2};
std::vector<int32_t> a2(9);
Query query2(ctx, array2, TILEDB_READ);
query2.set_subarray(subarray2)
.set_layout(TILEDB_ROW_MAJOR)
.set_data_buffer("a", a2);
query2.submit();
array2.close();

for (int i = 0; i < 9; i++) {
CHECK(a2[i] == i + 1);
}
}
4 changes: 2 additions & 2 deletions tiledb/sm/fragment/fragment_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ void FragmentMetadata::init_domain(const NDRange& non_empty_domain) {

// Sanity check
assert(!non_empty_domain.empty());
assert(non_empty_domain_.empty());
assert(domain_.empty());
// assert(non_empty_domain_.empty()); todo, this might cause problems
// assert(domain_.empty());

// Set non-empty domain for dense arrays (for sparse it will be calculated
// via the MBRs)
Expand Down
57 changes: 56 additions & 1 deletion tiledb/sm/query/writers/global_order_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "tiledb/sm/array_schema/array_schema.h"
#include "tiledb/sm/array_schema/dimension.h"
#include "tiledb/sm/consolidator/consolidator.h"
#include "tiledb/sm/enums/array_type.h"
#include "tiledb/sm/fragment/fragment_metadata.h"
#include "tiledb/sm/misc/comparators.h"
#include "tiledb/sm/misc/hilbert.h"
Expand Down Expand Up @@ -92,7 +93,8 @@ GlobalOrderWriter::GlobalOrderWriter(
fragment_name)
, processed_conditions_(processed_conditions)
, fragment_size_(fragment_size)
, current_fragment_size_(0) {
, current_fragment_size_(0)
, rows_written_(0) {
// Check the layout is global order.
if (layout_ != Layout::GLOBAL_ORDER) {
throw GlobalOrderWriterException(
Expand Down Expand Up @@ -782,6 +784,13 @@ Status GlobalOrderWriter::global_write() {
// Compute the number of tiles that will fit in this fragment.
auto num = num_tiles_to_write(idx, tile_num, tiles);

if (tile_num != num && array_schema_.array_type() == ArrayType::DENSE) {
// if it is a dense array and not all tiles can fit in the current
// fragment then we need to split the domain
NDRange new_nd = ndranges_after_split(num);
frag_meta->init_domain(new_nd);
}

// If we're resuming a fragment write and the first tile doesn't fit into
// the previous fragment, we need to start a new fragment and recalculate
// the number of tiles to write.
Expand Down Expand Up @@ -1423,6 +1432,52 @@ uint64_t GlobalOrderWriter::num_tiles_to_write(
return tile_num - start;
}

/**
 * Returns the product of the domain ranges of every dimension except the
 * first one. The caller treats this value as the number of tiles contained in
 * one "row", i.e. in one slab along the first dimension.
 *
 * NOTE(review): the product is built from `domain_range()` only; tile extents
 * are not consulted. Presumably this equals the tile count only when every
 * tile extent is 1 - confirm.
 *
 * @return The product described above; 1 for a single-dimension schema.
 */
uint64_t GlobalOrderWriter::num_tiles_per_row() {
  uint64_t product = 1;
  const auto ndims = array_schema_.dim_num();

  // Dimension 0 is the one that gets split into rows, so it is left out.
  unsigned idx = 1;
  while (idx < ndims) {
    auto dimension{array_schema_.dimension_ptr(idx)};
    auto full_domain = dimension->domain();
    product *= dimension->domain_range(full_domain);
    ++idx;
  }

  return product;
}

/**
 * Computes the ranges of the fragment that receives the next `num` tiles when
 * a dense global-order write has to be split over several fragments.
 *
 * The first dimension is cut into whole rows: its range starts at the number
 * of rows handed out so far (`rows_written_`) and spans
 * `num / num_tiles_per_row()` rows. Every other dimension keeps its full
 * domain. On success `rows_written_` is advanced by the rows handed out.
 *
 * NOTE(review): the first range is built from `int` values counted from 0, so
 * this presumably only works for an int32 first dimension whose domain starts
 * at 0 and whose tile extent is 1 - confirm before using it more widely.
 * NOTE(review): `num == 0` still produces an inverted first range
 * (`end == start - 1`), exactly as before - confirm the caller copes with it.
 *
 * @param num Number of tiles that will be written to the current fragment.
 * @return One range per dimension describing the fragment's domain.
 * @throws GlobalOrderWriterException if the number of tiles per row is zero,
 *     if `num` is not a multiple of the number of tiles per row, or if the
 *     resulting row range cannot be represented by the `int` row index.
 */
NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
  const uint64_t tiles_per_row = num_tiles_per_row();

  // A zero divisor would make the modulo and division below undefined
  // behaviour (presumably possible when a domain range is reported as 0).
  if (tiles_per_row == 0) {
    throw GlobalOrderWriterException(
        "Cannot split the dense fragment: the number of tiles per row could "
        "not be computed.");
  }

  // Fragments can only be cut at row boundaries.
  if (num % tiles_per_row != 0) {
    throw GlobalOrderWriterException(
        "Cannot split the dense fragment: the number of tiles that fit in the "
        "maximum fragment size is not a multiple of the number of tiles per "
        "row. Please try a different maximum fragment size.");
  }

  // Calculate how many rows we will write in the current fragment.
  const uint64_t rows_to_write = num / tiles_per_row;

  // The row range is stored as `int`; reject values that would be silently
  // truncated by the conversion instead of writing a corrupt domain.
  const uint64_t rows_after = rows_written_ + rows_to_write;
  const int start = static_cast<int>(rows_written_);
  const int end_exclusive = static_cast<int>(rows_after);
  if (rows_after < rows_written_ || start < 0 || end_exclusive < 0 ||
      static_cast<uint64_t>(start) != rows_written_ ||
      static_cast<uint64_t>(end_exclusive) != rows_after) {
    throw GlobalOrderWriterException(
        "Cannot split the dense fragment: the row range does not fit in the "
        "row index type.");
  }
  const int end = end_exclusive - 1;

  const auto dim_num = array_schema_.dim_num();
  NDRange nd;
  nd.reserve(dim_num);

  // Create the range for the index dim (first).
  Range range(&start, &end, sizeof(int));
  nd.emplace_back(range);

  // Use the full domain as the range of every remaining dimension.
  for (unsigned d = 1; d < dim_num; ++d) {
    auto dim{array_schema_.dimension_ptr(d)};
    nd.emplace_back(dim->domain());
  }

  // Remember how many rows have been handed out so far.
  rows_written_ = rows_after;

  return nd;
}

Status GlobalOrderWriter::start_new_fragment() {
auto frag_meta = global_write_state_->frag_meta_;
auto& uri = frag_meta->fragment_uri();
Expand Down
15 changes: 15 additions & 0 deletions tiledb/sm/query/writers/global_order_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ class GlobalOrderWriter : public WriterBase {
*/
uint64_t current_fragment_size_;

/**
* Counter for the number of rows (slabs along the first dimension) already
* assigned to fragments. It is advanced by `ndranges_after_split`, i.e. it is
* used only when the consolidation produces more than one fragment in Dense
* arrays.
*/
uint64_t rows_written_;

/* ********************************* */
/* PRIVATE METHODS */
/* ********************************* */
Expand Down Expand Up @@ -385,6 +391,15 @@ class GlobalOrderWriter : public WriterBase {
uint64_t tile_num,
tdb::pmr::unordered_map<std::string, WriterTileTupleVector>& tiles);

/**
* Computes the ranges of the fragment that will hold the next `num` tiles
* when a dense write is split over several fragments. The first dimension is
* restricted to the rows covered by those tiles; the other dimensions keep
* their full domain. Advances `rows_written_` by the number of rows covered.
*
* Throws a GlobalOrderWriterException if `num` is not a multiple of the
* number of tiles per row.
*
* @param num Number of tiles that will be written to the current fragment.
* @return The ranges, one per dimension.
*/
NDRange ndranges_after_split(uint64_t num);

/**
* Return the number of tiles a single row can hold
*
* @return Number of tiles.
*/
uint64_t num_tiles_per_row();

/**
* Close the current fragment and start a new one. The closed fragment will
* be added to `frag_uris_to_commit_` so that all fragments in progress can
Expand Down

0 comments on commit 2c04878

Please sign in to comment.