From 2c048784efb0e807ca8fb720813ab8c810cfb074 Mon Sep 17 00:00:00 2001 From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com> Date: Tue, 16 Jul 2024 21:46:14 +0300 Subject: [PATCH] support consolidation with max_frag_size in Dense --- test/src/unit-cppapi-max-fragment-size.cc | 92 +++++++++++++++++++ tiledb/sm/fragment/fragment_metadata.cc | 4 +- .../sm/query/writers/global_order_writer.cc | 57 +++++++++++- tiledb/sm/query/writers/global_order_writer.h | 15 +++ 4 files changed, 165 insertions(+), 3 deletions(-) diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc index b86a2a1937dc..2725f1e58e0c 100644 --- a/test/src/unit-cppapi-max-fragment-size.cc +++ b/test/src/unit-cppapi-max-fragment-size.cc @@ -504,3 +504,95 @@ TEST_CASE( array.close(); } + +TEST_CASE( + "Setting max_fragment_size in Dense consolidation", + "[global-order-writer]") { + std::string array_name = "cpp_max_fragment_size_bug"; + Context ctx; + + auto cleanup = [&]() { + auto obj = Object::object(ctx, array_name); + if (obj.type() == Object::Type::Array) { + Object::remove(ctx, array_name); + } + }; + + cleanup(); + + // Remove the array at the end of this test. + ScopedExecutor deferred(cleanup); + + // Create an array with exactly 9 tiles and tile extend 1 + Domain domain(ctx); + ArraySchema schema(ctx, TILEDB_DENSE); + auto d1 = tiledb::Dimension::create(ctx, "d1", {{0, 2}}, 1); + auto d2 = tiledb::Dimension::create(ctx, "d2", {{0, 2}}, 1); + domain.add_dimension(d1); + domain.add_dimension(d2); + + auto a1 = tiledb::Attribute::create(ctx, "a"); + schema.add_attribute(a1); + + schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}}); + schema.set_domain(domain); + + Array::create(array_name, schema); + + // Populate array with data from 1 to 9 + int value = 0; + for (int i = 0; i < 3; i++) { + Array array(ctx, array_name, TILEDB_WRITE); + Query query(ctx, array); + query.set_layout(TILEDB_ROW_MAJOR); + tiledb::Subarray sub(ctx, array); + sub.set_subarray({i, i, 0, 2}); + query.set_subarray(sub); + std::vector data = {++value, ++value, ++value}; + query.set_data_buffer("a", data); + query.submit(); + array.close(); + } + + // Read data to validate write and num of fragments. + CHECK(tiledb::test::num_fragments(array_name) == 3); + Array array(ctx, array_name, TILEDB_READ); + const std::vector subarray = {0, 2, 0, 2}; + std::vector a(9); + Query query(ctx, array, TILEDB_READ); + query.set_subarray(subarray) + .set_layout(TILEDB_ROW_MAJOR) + .set_data_buffer("a", a); + query.submit(); + array.close(); + + for (int i = 0; i < 9; i++) { + CHECK(a[i] == i + 1); + } + + // Consolidate with a size limitation for the fragment. This will result in + // the creation of two new fragments. + tiledb::Config cfg; + cfg.set("sm.consolidation.max_fragment_size", "150"); + ctx = Context(cfg); + Array::consolidate(ctx, array_name); + Array::vacuum(ctx, array_name); + + // Check that we now have 2 fragments instead of 3 + CHECK(tiledb::test::num_fragments(array_name) == 2); + + // Read data to validate correctness + Array array2(ctx, array_name, TILEDB_READ); + const std::vector subarray2 = {0, 2, 0, 2}; + std::vector a2(9); + Query query2(ctx, array2, TILEDB_READ); + query2.set_subarray(subarray2) + .set_layout(TILEDB_ROW_MAJOR) + .set_data_buffer("a", a2); + query2.submit(); + array2.close(); + + for (int i = 0; i < 9; i++) { + CHECK(a2[i] == i + 1); + } +} diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc index dcb9bc08f790..836c6307e803 100644 --- a/tiledb/sm/fragment/fragment_metadata.cc +++ b/tiledb/sm/fragment/fragment_metadata.cc @@ -695,8 +695,8 @@ void FragmentMetadata::init_domain(const NDRange& non_empty_domain) { // Sanity check assert(!non_empty_domain.empty()); - assert(non_empty_domain_.empty()); - assert(domain_.empty()); + // assert(non_empty_domain_.empty()); todo, this might cause problems + // assert(domain_.empty()); // Set non-empty domain for dense arrays (for sparse it will be calculated // via the MBRs) diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc index cff8f31b69af..785d352b8a47 100644 --- a/tiledb/sm/query/writers/global_order_writer.cc +++ b/tiledb/sm/query/writers/global_order_writer.cc @@ -38,6 +38,7 @@ #include "tiledb/sm/array_schema/array_schema.h" #include "tiledb/sm/array_schema/dimension.h" #include "tiledb/sm/consolidator/consolidator.h" +#include "tiledb/sm/enums/array_type.h" #include "tiledb/sm/fragment/fragment_metadata.h" #include "tiledb/sm/misc/comparators.h" #include "tiledb/sm/misc/hilbert.h" @@ -92,7 +93,8 @@ GlobalOrderWriter::GlobalOrderWriter( fragment_name) , processed_conditions_(processed_conditions) , fragment_size_(fragment_size) - , current_fragment_size_(0) { + , current_fragment_size_(0) + , rows_written_(0) { // Check the layout is global order. if (layout_ != Layout::GLOBAL_ORDER) { throw GlobalOrderWriterException( @@ -782,6 +784,13 @@ Status GlobalOrderWriter::global_write() { // Compute the number of tiles that will fit in this fragment. auto num = num_tiles_to_write(idx, tile_num, tiles); + if (tile_num != num && array_schema_.array_type() == ArrayType::DENSE) { + // if it is a dense array and not all tiles can fit in the current + // fragment then we need to split the domain + NDRange new_nd = ndranges_after_split(num); + frag_meta->init_domain(new_nd); + } + // If we're resuming a fragment write and the first tile doesn't fit into // the previous fragment, we need to start a new fragment and recalculate // the number of tiles to write. @@ -1423,6 +1432,52 @@ uint64_t GlobalOrderWriter::num_tiles_to_write( return tile_num - start; } +uint64_t GlobalOrderWriter::num_tiles_per_row() { + auto dim_num = array_schema_.dim_num(); + uint64_t ret = 1; + for (unsigned d = 1; d < dim_num; ++d) { + // skip first dim. todo Explain + auto dim{array_schema_.dimension_ptr(d)}; + auto dim_dom = dim->domain(); + ret *= dim->domain_range(dim_dom); + } + return ret; +} + +NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) { + uint64_t tiles_per_row = num_tiles_per_row(); + auto dim_num = array_schema_.dim_num(); + NDRange nd; + nd.reserve(dim_num); + + if (num % tiles_per_row != 0) { + throw GlobalOrderWriterException( + "This fragment target size is not possible please try something else "); // todo fix + } + + // Calculate how many rows we will write in the current fragment + uint64_t rows_to_write = num / tiles_per_row; + + // Create the range for the index dim (first). + int start = rows_written_; + int end = start + rows_to_write - 1; + Range range(&start, &end, sizeof(int)); + nd.emplace_back(range); + + // Use the domain as ranges for the rest of the dims + for (unsigned d = 1; d < dim_num; ++d) { + // begin from second dim + auto dim{array_schema_.dimension_ptr(d)}; + auto dim_dom = dim->domain(); + nd.emplace_back(dim_dom); + } + + // add rows written to the cache + rows_written_ += rows_to_write; + + return nd; +} + Status GlobalOrderWriter::start_new_fragment() { auto frag_meta = global_write_state_->frag_meta_; auto& uri = frag_meta->fragment_uri(); diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h index c15b81f67c8d..cf6cc417e6c0 100644 --- a/tiledb/sm/query/writers/global_order_writer.h +++ b/tiledb/sm/query/writers/global_order_writer.h @@ -215,6 +215,12 @@ class GlobalOrderWriter : public WriterBase { */ uint64_t current_fragment_size_; + /** + * Counter for the number of rows written. This is used only when the + * consolidation produces more than one fragment in Dense arrays + */ + uint64_t rows_written_; + /* ********************************* */ /* PRIVATE METHODS */ /* ********************************* */ @@ -385,6 +391,15 @@ class GlobalOrderWriter : public WriterBase { uint64_t tile_num, tdb::pmr::unordered_map& tiles); + /** + * Return the number of tiles a single row can hold + * + * @return Number of tiles. + */ + NDRange ndranges_after_split(uint64_t num); + + uint64_t num_tiles_per_row(); + /** * Close the current fragment and start a new one. The closed fragment will * be added to `frag_uris_to_commit_` so that all fragments in progress can