From a7d3c560c973a75f011561823ce597ca5ae23504 Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Mon, 2 Sep 2024 20:52:38 +0300
Subject: [PATCH] Fully working with multi-loop writes + signed dims support

---
 test/src/unit-cppapi-max-fragment-size.cc     |  15 +++
 tiledb/sm/enums/datatype.h                    |   7 ++
 .../sm/query/writers/global_order_writer.cc   | 105 ++++++++++++------
 tiledb/sm/query/writers/global_order_writer.h |  40 ++++++-
 4 files changed, 133 insertions(+), 34 deletions(-)

diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc
index 019f3b75d71b..d1fc02f77eaf 100644
--- a/test/src/unit-cppapi-max-fragment-size.cc
+++ b/test/src/unit-cppapi-max-fragment-size.cc
@@ -519,6 +519,15 @@ TEST_CASE(
 
   cleanup();
 
+  bool more_than_one_loop = false;
+  SECTION("More than one loop write") {
+    more_than_one_loop = true;
+  }
+
+  SECTION("One loop write") {
+    more_than_one_loop = false;
+  }
+
   // Remove the array at the end of this test.
   ScopedExecutor deferred(cleanup);
 
@@ -573,6 +582,11 @@ TEST_CASE(
   // the creation of two new fragments.
   tiledb::Config cfg;
   cfg.set("sm.consolidation.max_fragment_size", "150");
+  cfg.set("sm.consolidation.buffer_size", "10000");  // speed up consolidation
+  if (more_than_one_loop) {
+    cfg.set("sm.consolidation.buffer_size", "10");
+  }
+
   ctx = Context(cfg);
   Array::consolidate(ctx, array_name);
   Array::vacuum(ctx, array_name);
@@ -664,6 +678,7 @@ TEST_CASE(
   // the creation of two new fragments.
   tiledb::Config cfg;
   cfg.set("sm.consolidation.max_fragment_size", "80");
+  cfg.set("sm.consolidation.buffer_size", "10000");  // speed up consolidation
   ctx = Context(cfg);
   Array::consolidate(ctx, array_name);
   Array::vacuum(ctx, array_name);
diff --git a/tiledb/sm/enums/datatype.h b/tiledb/sm/enums/datatype.h
index 897766241da6..ab05203fc052 100644
--- a/tiledb/sm/enums/datatype.h
+++ b/tiledb/sm/enums/datatype.h
@@ -338,6 +338,13 @@ inline bool datatype_is_string(Datatype type) {
       type == Datatype::STRING_UCS2 || type == Datatype::STRING_UCS4);
 }
 
+/** Returns true if the input datatype is an unsigned integer type. */
+inline bool datatype_is_unsigned(Datatype type) {
+  return (
+      type == Datatype::UINT8 || type == Datatype::UINT16 ||
+      type == Datatype::UINT32 || type == Datatype::UINT64);
+}
+
 /** Returns true if the input datatype is an integer type. */
 inline bool datatype_is_integer(Datatype type) {
   return (
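Aside from the patch itself: the standalone sketch below illustrates how a helper like datatype_is_unsigned() is meant to drive the signed/unsigned branching that the writer changes depend on. The Datatype enum here is a stripped-down stand-in for the real tiledb/sm/enums/datatype.h, and main() is purely illustrative:

    #include <cstdint>
    #include <iostream>

    // Stand-in for tiledb::sm::Datatype; illustration only.
    enum class Datatype : uint8_t {
      INT8, INT16, INT32, INT64, UINT8, UINT16, UINT32, UINT64
    };

    // Mirrors the helper added in tiledb/sm/enums/datatype.h.
    inline bool datatype_is_unsigned(Datatype type) {
      return (
          type == Datatype::UINT8 || type == Datatype::UINT16 ||
          type == Datatype::UINT32 || type == Datatype::UINT64);
    }

    int main() {
      // Cache a dimension bound in an unsigned or signed slot depending on
      // the dimension datatype: UINT64_MAX does not fit in int64_t, and
      // negative signed bounds do not fit in uint64_t.
      Datatype dim_type = Datatype::UINT64;
      if (datatype_is_unsigned(dim_type)) {
        uint64_t u_start = 18446744073709551615ull;  // UINT64_MAX
        std::cout << "unsigned start: " << u_start << "\n";
      } else {
        int64_t start = -5;  // signed dims may start below zero
        std::cout << "signed start: " << start << "\n";
      }
      return 0;
    }

This loss-free caching is why the writer below keeps separate u_start_/u_end_ and start_/end_ members instead of a single uint64_t pair.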
diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc
index 1d54c7eaf90a..bae43424b2dc 100644
--- a/tiledb/sm/query/writers/global_order_writer.cc
+++ b/tiledb/sm/query/writers/global_order_writer.cc
@@ -96,9 +96,14 @@ GlobalOrderWriter::GlobalOrderWriter(
     , current_fragment_size_(0)
     , rows_written_(0)
     , tiles_in_current_row_(0)
+    , tiles_written_(0)
+    , tiles_since_last_split_(0)
+    , u_start_(0)
     , start_(0)
+    , u_end_(0)
     , end_(0)
-    , nd_if_dense_split_{} {
+    , nd_if_dense_split_{}
+    , dense_with_split_(false) {
   // Check the layout is global order.
   if (layout_ != Layout::GLOBAL_ORDER) {
     throw GlobalOrderWriterException(
@@ -787,23 +792,26 @@ Status GlobalOrderWriter::global_write() {
 
   // Compute the number of tiles that will fit in this fragment.
   auto num = num_tiles_to_write(idx, tile_num, tiles);
-
-  if (array_schema_.array_type() ==
-      ArrayType::DENSE) {  //&& this is consolidation
-    // if it is a dense array and not all tiles can fit in the current
-    // fragment then we need to split the domain, otherwise if all tiles can
-    // fit it means that we are in the middle of a write
-    nd_if_dense_split_ = ndranges_after_split(num, tile_num != num);
+  bool is_dense = array_schema_.array_type() == ArrayType::DENSE;
+  bool is_last_range = false;
+
+  if (is_dense && disable_checks_consolidation_) {
+    // If it is a dense array during consolidation and not all tiles can fit
+    // in the current fragment, we need to split the domain; otherwise, if
+    // all tiles fit, we are in the middle of a write.
+    nd_if_dense_split_ = ndranges_after_split(num, tile_num, is_last_range);
   }
 
-  if (tile_num != num && !nd_if_dense_split_.empty()) {
+  if ((!nd_if_dense_split_.empty() && num != tile_num) ||
+      (is_last_range && dense_with_split_)) {
     frag_meta->init_domain(nd_if_dense_split_);
+    dense_with_split_ = true;
   }
 
   // If we're resuming a fragment write and the first tile doesn't fit into
   // the previous fragment, we need to start a new fragment and recalculate
   // the number of tiles to write.
-  if (current_fragment_size_ > 0 && num == 0) {
+  if (current_fragment_size_ > 0 && num == 0 && !dense_with_split_) {
     RETURN_CANCEL_OR_ERROR(start_new_fragment());
     num = num_tiles_to_write(idx, tile_num, tiles);
   }
@@ -1462,33 +1470,47 @@ uint64_t GlobalOrderWriter::num_tiles_per_row(const Domain& domain) {
 }
 
 NDRange GlobalOrderWriter::ndranges_after_split(
-    uint64_t num, bool reached_end_of_fragment) {
+    uint64_t num, uint64_t tile_num, bool& is_last_range) {
   // Expand domain to full tiles
+  bool reached_end_of_fragment = tile_num != num;
   auto& domain{array_schema_.domain()};
   if (disable_checks_consolidation_) {
     auto expanded_subarray = subarray_.ndrange(0);
     domain.expand_to_tiles(&expanded_subarray);
   }
 
+  tiles_written_ += num;
+  tiles_since_last_split_ += num;
+
   // Calculate how many tiles each row can hold
   uint64_t tiles_per_row = num_tiles_per_row(domain);
 
   // Calculate how many rows we will write in the current fragment
-  uint64_t rows_of_tiles_to_write =
-      (num - tiles_in_current_row_) / tiles_per_row;
+  uint64_t rows_of_tiles_to_write = 0;
+
+  if (num != 0) {
+    rows_of_tiles_to_write = (num - tiles_in_current_row_) / tiles_per_row;
+  }
+
+  // Tiles left over beyond the full rows
   uint64_t remainder_of_tiles = 0;
-  bool moved_row = true;
+
+  // Calculate how many tiles we have in the current row
   if (rows_of_tiles_to_write == 0) {
     remainder_of_tiles += num;
-    moved_row = false;
   } else {
    remainder_of_tiles = (num - tiles_in_current_row_) % tiles_per_row;
   }
-  tiles_in_current_row_ = remainder_of_tiles;
+  tiles_in_current_row_ += remainder_of_tiles;
 
-  // If we have not written a full row and we have reached the end of the
-  // fragment abort
+  if (tiles_in_current_row_ == tiles_per_row) {
+    tiles_in_current_row_ = 0;
+  }
+
+  // If we have finished the write in the middle of a row, throw
   if (tiles_in_current_row_ != 0 && reached_end_of_fragment) {
     throw GlobalOrderWriterException(
         "The target fragment size cannot be achieved. Please try using a "
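To make the row bookkeeping in ndranges_after_split() concrete, here is a hedged standalone sketch of the same arithmetic: it tracks full rows and the partial-row remainder the way the hunk above does, and flags the error case where a fragment boundary would fall in the middle of a row. RowTracker and write_tiles are illustrative names, not writer API:

    #include <cstdint>
    #include <iostream>
    #include <stdexcept>

    // Illustrative mirror of the writer's row bookkeeping.
    struct RowTracker {
      uint64_t tiles_per_row;
      uint64_t tiles_in_current_row = 0;

      // 'num' tiles are appended; 'end_of_fragment' means the current
      // fragment is now full.
      void write_tiles(uint64_t num, bool end_of_fragment) {
        uint64_t rows = 0;
        if (num != 0) {
          rows = (num - tiles_in_current_row) / tiles_per_row;
        }
        uint64_t remainder =
            (rows == 0) ? num : (num - tiles_in_current_row) % tiles_per_row;
        tiles_in_current_row += remainder;
        if (tiles_in_current_row == tiles_per_row) {
          tiles_in_current_row = 0;  // the partial row just completed
        }
        // A dense fragment must end on a row boundary.
        if (tiles_in_current_row != 0 && end_of_fragment) {
          throw std::runtime_error(
              "The target fragment size cannot be achieved");
        }
      }
    };

    int main() {
      RowTracker t{/*tiles_per_row=*/4};
      t.write_tiles(8, /*end_of_fragment=*/false);  // two full rows: fine
      try {
        t.write_tiles(9, /*end_of_fragment=*/true);  // 2 rows + 1 tile over
      } catch (const std::exception& e) {
        std::cout << e.what() << "\n";  // boundary falls mid-row
      }
      return 0;
    }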
@@ -1513,23 +1535,39 @@ NDRange GlobalOrderWriter::ndranges_after_split(
   if (rows_written_ == 0) {
     // It means that the start has not been set yet. Set it to the minimum value
     // of the expanded domain for that dim
-    auto ll = [&](auto T) {
-      auto dim_dom_data = (const decltype(T)*)dim_dom.data();
-      // todo this should be unsigned or signed
-      return static_cast<uint64_t>(dim_dom_data[0]);
+    auto ll = [&](auto T, size_t index) {
+      // Return a pair; the domain can be signed or unsigned
+      auto dim_dom_data = dim_dom.typed_data<decltype(T)>()[index];
+      int64_t ret_s = static_cast<int64_t>(dim_dom_data);
+      uint64_t ret_u = static_cast<uint64_t>(dim_dom_data);
+      return std::make_pair(ret_s, ret_u);
     };
-    start_ = apply_with_type(ll, dim->type());
-    end_ = apply_with_type(ll, dim->type());
+
+    // Based on whether the dim is signed or unsigned, assign the proper vars
+    if (datatype_is_unsigned(dim->type())) {
+      u_start_ = apply_with_type(ll, dim->type(), 0).second;
+      u_end_ = apply_with_type(ll, dim->type(), 1).second;
+    } else {
+      start_ = apply_with_type(ll, dim->type(), 0).first;
+      end_ = apply_with_type(ll, dim->type(), 1).first;
+    }
   }
 
-  uint64_t end = start_ + (rows_of_tiles_to_write * tile_extent) - 1;
-  // if there is a remainder it means we need one more row
-  if (tiles_in_current_row_ != 0 && !reached_end_of_fragment && end < end_ &&
-      moved_row) {
-    end++;
+  // Pick the cached signed or unsigned bounds depending on the dim type
+  auto start_to_use = datatype_is_unsigned(dim->type()) ? u_start_ : start_;
+  auto end_to_use = datatype_is_unsigned(dim->type()) ? u_end_ : end_;
+
+  auto end =
+      start_to_use + ((tiles_since_last_split_ / tiles_per_row) * tile_extent);
+  if (tiles_since_last_split_ % tiles_per_row == 0 && end != start_to_use) {
+    // We are at the end of a row; subtract 1 so that we don't spill into
+    // the next range
+    end--;
   }
 
+  rows_written_ = tiles_written_ / tiles_per_row;
+
   // Add range
   Range range(&start_, &end, sizeof(int));
   nd.emplace_back(range);
@@ -1543,8 +1581,13 @@ NDRange GlobalOrderWriter::ndranges_after_split(
   }
 
   // add rows written to the cache
-  rows_written_ += rows_of_tiles_to_write;
-  start_ = end + 1;
+  if (tile_num != num) {
+    start_ = end + 1;
+    end = start_;
+    tiles_since_last_split_ = 0;
+  }
+
+  is_last_range = end == end_to_use;
 
   return nd;
 }
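As a worked example of the range arithmetic above (the numbers are illustrative, not from the patch): with tiles_per_row = 4, tile_extent = 10 and a split starting at coordinate 0, writing 8 tiles gives end = 0 + (8 / 4) * 10 = 20, and since 8 % 4 == 0 the end is decremented to 19, the last coordinate of the second row. The sketch below restates that rule, unsigned-only for brevity; split_range_end is a hypothetical name:

    #include <cassert>
    #include <cstdint>

    // Illustrative restatement of the split end-coordinate rule.
    uint64_t split_range_end(
        uint64_t start,
        uint64_t tiles_since_last_split,
        uint64_t tiles_per_row,
        uint64_t tile_extent) {
      uint64_t end =
          start + (tiles_since_last_split / tiles_per_row) * tile_extent;
      if (tiles_since_last_split % tiles_per_row == 0 && end != start) {
        end--;  // stop at the last coordinate of the completed row
      }
      return end;
    }

    int main() {
      // Two complete rows: the range ends just before the next row starts.
      assert(split_range_end(0, 8, 4, 10) == 19);
      // A partial third row keeps the end on the next row boundary.
      assert(split_range_end(0, 9, 4, 10) == 20);
      return 0;
    }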
diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h
index 2f0462c5362a..5818296da916 100644
--- a/tiledb/sm/query/writers/global_order_writer.h
+++ b/tiledb/sm/query/writers/global_order_writer.h
@@ -227,23 +227,56 @@ class GlobalOrderWriter : public WriterBase {
    */
   uint64_t tiles_in_current_row_;
 
+  /**
+   * The total number of tiles written. It is only used when consolidating
+   * dense arrays whose result cannot fit into a single fragment.
+   */
+  uint64_t tiles_written_;
+
+  /**
+   * The number of tiles written since the last dense domain split. It is
+   * only used when consolidating dense arrays whose result cannot fit into
+   * a single fragment.
+   */
+  uint64_t tiles_since_last_split_;
+
+  /**
+   * This is the unsigned start of the dim range in case we need to split
+   * into multiple fragments in dense arrays.
+   */
+  uint64_t u_start_;
+
   /**
    * This is the start for the dim range in case we need to split in multiple
    * fragments in Dense arrays
    */
-  uint64_t start_;
+  int64_t start_;
+
+  /**
+   * This is the unsigned end of the dim range in case we need to split
+   * into multiple fragments in dense arrays.
+   */
+  uint64_t u_end_;
 
   /**
    * This is the start for the dim range in case we need to split in multiple
    * fragments in Dense arrays
    */
-  uint64_t end_;
+  int64_t end_;
 
   /**
    * NDRange in case we have a dense consolidation with split
    */
   NDRange nd_if_dense_split_;
 
+  /**
+   * True if we have made at least one split in the dense consolidation. By
+   * split we mean a fragment split, so consolidating n fragments does not
+   * produce a single new fragment but m fragments, where m is the number of
+   * fragments created by the consolidation.
+   */
+  bool dense_with_split_;
+
   /* ********************************* */
   /*           PRIVATE METHODS         */
   /* ********************************* */
@@ -422,7 +455,8 @@ class GlobalOrderWriter : public WriterBase {
    * current frag
    *
    */
-  NDRange ndranges_after_split(uint64_t num, bool reached_end_of_fragment);
+  NDRange ndranges_after_split(
+      uint64_t num, uint64_t tile_num, bool& is_last_range);
 
   /**
    * Return the number of tiles a single row can hold. More specifically, the
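For completeness, a hedged sketch of how the knobs exercised by the updated test fit together, using only the public TileDB C++ API. It assumes a dense array already exists at "my_dense_array" (a hypothetical URI); a small sm.consolidation.buffer_size is what forces the consolidator through more than one write loop, which is the case this patch makes work:

    #include <tiledb/tiledb>

    int main() {
      tiledb::Config cfg;
      // Cap the size of each consolidated fragment, forcing domain splits.
      cfg.set("sm.consolidation.max_fragment_size", "150");
      // A tiny buffer makes consolidation take more than one write loop.
      cfg.set("sm.consolidation.buffer_size", "10");
      tiledb::Context ctx(cfg);
      tiledb::Array::consolidate(ctx, "my_dense_array");
      tiledb::Array::vacuum(ctx, "my_dense_array");
      return 0;
    }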