From ef02c45708d4aa5c924bfc920836d044f6aba0de Mon Sep 17 00:00:00 2001 From: Stavros Papadopoulos Date: Wed, 27 Nov 2019 12:48:33 -0500 Subject: [PATCH] Towards addressing #93. Split writer algorithms such that the dimension coordinates are split in different buffers and handled separately. --- test/src/unit-capi-dense_array.cc | 2 +- test/src/unit-cppapi-array.cc | 21 +- tiledb/sm/array_schema/dimension.cc | 142 ++ tiledb/sm/array_schema/dimension.h | 43 + tiledb/sm/array_schema/domain.cc | 194 ++- tiledb/sm/array_schema/domain.h | 119 ++ tiledb/sm/filter/filter_pipeline.cc | 4 - tiledb/sm/fragment/fragment_metadata.cc | 2 + tiledb/sm/misc/comparators.h | 51 + tiledb/sm/misc/utils.cc | 35 + tiledb/sm/misc/utils.h | 16 + tiledb/sm/query/types.h | 65 + tiledb/sm/query/writer.cc | 1726 ++++++++++++++--------- tiledb/sm/query/writer.h | 363 +++-- tiledb/sm/tile/tile.cc | 1 + 15 files changed, 2034 insertions(+), 750 deletions(-) diff --git a/test/src/unit-capi-dense_array.cc b/test/src/unit-capi-dense_array.cc index b4eaa27ca88a..223877169556 100644 --- a/test/src/unit-capi-dense_array.cc +++ b/test/src/unit-capi-dense_array.cc @@ -4160,7 +4160,7 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( DenseArrayFx, "C API: Test dense array, check if coords exist in unordered writes", - "[capi], [dense], [dense-coords-exist-unordered]") { + "[capi][dense][coords-exist-unordered]") { SECTION("- No serialization") { serialize_query_ = false; } diff --git a/test/src/unit-cppapi-array.cc b/test/src/unit-cppapi-array.cc index 70aa347a7e1d..47645edaaef7 100644 --- a/test/src/unit-cppapi-array.cc +++ b/test/src/unit-cppapi-array.cc @@ -388,7 +388,7 @@ TEST_CASE( vfs.remove_dir(array_name_1d); } -TEST_CASE("C++ API: Read subarray with expanded domain", "[cppapi], [dense]") { +TEST_CASE("C++ API: Read subarray with expanded domain", "[cppapi][dense]") { const std::vector tile_layouts = {TILEDB_ROW_MAJOR, TILEDB_COL_MAJOR}, cell_layouts = {TILEDB_ROW_MAJOR, @@ -455,8 +455,7 @@ TEST_CASE("C++ API: Read subarray with expanded domain", "[cppapi], [dense]") { } } -TEST_CASE( - "C++ API: Consolidation of empty arrays", "[cppapi], [consolidation]") { +TEST_CASE("C++ API: Consolidation of empty arrays", "[cppapi][consolidation]") { Context ctx; VFS vfs(ctx); const std::string array_name = "cpp_unit_array"; @@ -530,7 +529,7 @@ TEST_CASE( vfs.remove_dir(array_name); } -TEST_CASE("C++ API: Encrypted array", "[cppapi], [encryption]") { +TEST_CASE("C++ API: Encrypted array", "[cppapi][encryption]") { Context ctx; VFS vfs(ctx); const std::string array_name = "cpp_unit_array"; @@ -625,9 +624,7 @@ TEST_CASE("C++ API: Encrypted array", "[cppapi], [encryption]") { vfs.remove_dir(array_name); } -TEST_CASE( - "C++ API: Encrypted array, std::string key", - "[cppapi][encryption][cppapi-encryption]") { +TEST_CASE("C++ API: Encrypted array, std::string key", "[cppapi][encryption]") { Context ctx; VFS vfs(ctx); const std::string array_name = "cpp_unit_array"; @@ -711,7 +708,7 @@ TEST_CASE( TEST_CASE( "C++ API: Open array with anonymous attribute", - "[cppapi], [cppapi-open-array-anon-attr]") { + "[cppapi][open-array-anon-attr]") { Context ctx; VFS vfs(ctx); const std::string array_name = "cppapi_open_array_anon_attr"; @@ -736,7 +733,7 @@ TEST_CASE( vfs.remove_dir(array_name); } -TEST_CASE("C++ API: Open array at", "[cppapi], [cppapi-open-array-at]") { +TEST_CASE("C++ API: Open array at", "[cppapi][open-array-at]") { Context ctx; VFS vfs(ctx); const std::string array_name = "cppapi_open_array_at"; @@ -840,8 +837,7 @@ TEST_CASE("C++ API: Open array at", "[cppapi], [cppapi-open-array-at]") { } TEST_CASE( - "C++ API: Open encrypted array at", - "[cppapi], [cppapi-open-encrypted-array-at]") { + "C++ API: Open encrypted array at", "[cppapi][open-encrypted-array-at]") { const char key[] = "0123456789abcdeF0123456789abcdeF"; uint32_t key_len = (uint32_t)strlen(key); @@ -943,7 +939,8 @@ TEST_CASE( } TEST_CASE( - "C++ API: Writing single cell with global order", "[cppapi], [sparse]") { + "C++ API: Writing single cell with global order", + "[cppapi][sparse][global]") { const std::string array_name = "cpp_unit_array"; Context ctx; VFS vfs(ctx); diff --git a/tiledb/sm/array_schema/dimension.cc b/tiledb/sm/array_schema/dimension.cc index f76d00fa536e..1476c5e290f6 100644 --- a/tiledb/sm/array_schema/dimension.cc +++ b/tiledb/sm/array_schema/dimension.cc @@ -36,6 +36,7 @@ #include #include +#include namespace tiledb { namespace sm { @@ -48,6 +49,7 @@ Dimension::Dimension() { domain_ = nullptr; tile_extent_ = nullptr; type_ = Datatype::INT32; + set_oob_func(); } Dimension::Dimension(const std::string& name, Datatype type) @@ -55,6 +57,7 @@ Dimension::Dimension(const std::string& name, Datatype type) , type_(type) { domain_ = nullptr; tile_extent_ = nullptr; + set_oob_func(); } Dimension::Dimension(const Dimension* dim) { @@ -62,6 +65,7 @@ Dimension::Dimension(const Dimension* dim) { name_ = dim->name(); type_ = dim->type_; + oob_func_ = dim->oob_func_; uint64_t type_size = datatype_size(type_); domain_ = std::malloc(2 * type_size); std::memcpy(domain_, dim->domain(), 2 * type_size); @@ -85,6 +89,67 @@ Dimension::~Dimension() { /* API */ /* ********************************* */ +uint64_t Dimension::coord_size() const { + return datatype_size(type_); +} + +std::string Dimension::coord_to_str(const void* coord) const { + std::stringstream ss; + assert(coord != nullptr); + + switch (type_) { + case Datatype::INT32: + ss << *((int32_t*)coord); + break; + case Datatype::INT64: + ss << *((int64_t*)coord); + break; + case Datatype::INT8: + ss << *((int8_t*)coord); + break; + case Datatype::UINT8: + ss << *((uint8_t*)coord); + break; + case Datatype::INT16: + ss << *((int16_t*)coord); + break; + case Datatype::UINT16: + ss << *((uint16_t*)coord); + break; + case Datatype::UINT32: + ss << *((uint32_t*)coord); + break; + case Datatype::UINT64: + ss << *((uint64_t*)coord); + break; + case Datatype::FLOAT32: + ss << *((float*)coord); + break; + case Datatype::FLOAT64: + ss << *((double*)coord); + break; + case Datatype::DATETIME_YEAR: + case Datatype::DATETIME_MONTH: + case Datatype::DATETIME_WEEK: + case Datatype::DATETIME_DAY: + case Datatype::DATETIME_HR: + case Datatype::DATETIME_MIN: + case Datatype::DATETIME_SEC: + case Datatype::DATETIME_MS: + case Datatype::DATETIME_US: + case Datatype::DATETIME_NS: + case Datatype::DATETIME_PS: + case Datatype::DATETIME_FS: + case Datatype::DATETIME_AS: + ss << *((int64_t*)coord); + break; + default: + break; + } + + return ss.str(); +} + // ===== FORMAT ===== // dimension_name_size (uint32_t) // dimension_name (string) @@ -124,6 +189,8 @@ Status Dimension::deserialize(ConstBuffer* buff, Datatype type) { RETURN_NOT_OK(buff->read(tile_extent_, datatype_size(type_))); } + set_oob_func(); + return Status::Ok(); } @@ -153,6 +220,27 @@ bool Dimension::is_anonymous() const { utils::parse::starts_with(name_, constants::default_dim_name); } +template +bool Dimension::oob( + const Dimension* dim, const void* coord, std::string* err_msg) { + auto domain = (const T*)dim->domain(); + auto coord_t = (const T*)coord; + if (*coord_t < domain[0] || *coord_t > domain[1]) { + std::stringstream ss; + ss << "Coordinate " << *coord_t << " is out of domain bounds [" << domain[0] + << ", " << domain[1] << "] on dimension '" << dim->name() << "'"; + *err_msg = ss.str(); + return true; + } + + return false; +} + +bool Dimension::oob(const void* coord, std::string* err_msg) const { + assert(oob_func_ != nullptr); + return oob_func_(this, coord, err_msg); +} + // ===== FORMAT ===== // dimension_name_size (uint32_t) // dimension_name (string) @@ -478,6 +566,60 @@ Status Dimension::check_tile_extent() const { return Status::Ok(); } +void Dimension::set_oob_func() { + // Set + switch (type_) { + case Datatype::INT32: + oob_func_ = oob; + break; + case Datatype::INT64: + oob_func_ = oob; + break; + case Datatype::INT8: + oob_func_ = oob; + break; + case Datatype::UINT8: + oob_func_ = oob; + break; + case Datatype::INT16: + oob_func_ = oob; + break; + case Datatype::UINT16: + oob_func_ = oob; + break; + case Datatype::UINT32: + oob_func_ = oob; + break; + case Datatype::UINT64: + oob_func_ = oob; + break; + case Datatype::FLOAT32: + oob_func_ = oob; + break; + case Datatype::FLOAT64: + oob_func_ = oob; + break; + case Datatype::DATETIME_YEAR: + case Datatype::DATETIME_MONTH: + case Datatype::DATETIME_WEEK: + case Datatype::DATETIME_DAY: + case Datatype::DATETIME_HR: + case Datatype::DATETIME_MIN: + case Datatype::DATETIME_SEC: + case Datatype::DATETIME_MS: + case Datatype::DATETIME_US: + case Datatype::DATETIME_NS: + case Datatype::DATETIME_PS: + case Datatype::DATETIME_FS: + case Datatype::DATETIME_AS: + oob_func_ = oob; + break; + default: + oob_func_ = nullptr; + break; + } +} + } // namespace sm } // namespace tiledb diff --git a/tiledb/sm/array_schema/dimension.h b/tiledb/sm/array_schema/dimension.h index de1e6399629b..23ff33d2df30 100644 --- a/tiledb/sm/array_schema/dimension.h +++ b/tiledb/sm/array_schema/dimension.h @@ -76,6 +76,12 @@ class Dimension { /* API */ /* ********************************* */ + /** Returns the size (in bytes) of a coordinate in this dimension. */ + uint64_t coord_size() const; + + /** Returns the input coordinate in string format. */ + std::string coord_to_str(const void* coord) const; + /** * Populates the object members from the data in the input binary buffer. * @@ -97,6 +103,33 @@ class Dimension { /** Returns true if this is an anonymous (unlabled) dimension **/ bool is_anonymous() const; + /** + * Returns true if the input coordinate is out-of-bounds with respect + * to the dimension domain. + * + * @param coord The coordinate to be checked. It will properly be + * type-cast to the dimension datatype. + * @param err_msg An error message to be retrieved in case the function + * returns true. + * @return True if the input coordinates is out-of-bounds. + */ + bool oob(const void* coord, std::string* err_msg) const; + + /** + * Returns true if the input coordinate is out-of-bounds with respect + * to the dimension domain. + * + * @param dim The dimension to apply the oob check on. + * @param coord The coordinate to be checked. It will properly be + * type-cast to the dimension datatype. + * @param err_msg An error message to be retrieved in case the function + * returns true. + * @return True if the input coordinates is out-of-bounds. + */ + template + static bool oob( + const Dimension* dim, const void* coord, std::string* err_msg); + /** * Serializes the object members into a binary buffer. * @@ -149,6 +182,13 @@ class Dimension { /** The dimension type. */ Datatype type_; + /** + * Stores the appropriate templated oob() function based on the + * dimension datatype. + */ + std::function + oob_func_; + /* ********************************* */ /* PRIVATE METHODS */ /* ********************************* */ @@ -229,6 +269,9 @@ class Dimension { */ template Status check_tile_extent() const; + + /** Sets the templated oob function. */ + void set_oob_func(); }; } // namespace sm diff --git a/tiledb/sm/array_schema/domain.cc b/tiledb/sm/array_schema/domain.cc index 0cd736389e00..0d37bd99741b 100644 --- a/tiledb/sm/array_schema/domain.cc +++ b/tiledb/sm/array_schema/domain.cc @@ -81,6 +81,8 @@ Domain::Domain(const Domain* domain) { cell_order_ = domain->cell_order_; dim_num_ = domain->dim_num_; type_ = domain->type_; + cell_order_cmp_func_ = domain->cell_order_cmp_func_; + tile_order_cmp_func_ = domain->tile_order_cmp_func_; for (auto dim : domain->dimensions_) dimensions_.emplace_back(new Dimension(dim)); @@ -501,6 +503,52 @@ int Domain::cell_order_cmp(const T* coords_a, const T* coords_b) const { return 0; } +template +int Domain::cell_order_cmp( + const Dimension* dim, + const std::vector& coord_buffs, + uint64_t a, + uint64_t b, + unsigned d) { + auto coord_buff = (unsigned char*)coord_buffs[d]; + auto coord_size = dim->coord_size(); + auto coords_a = (T*)&(coord_buff[a * coord_size]); + auto coords_b = (T*)&(coord_buff[b * coord_size]); + if (*coords_a < *coords_b) + return -1; + if (*coords_a > *coords_b) + return 1; + return 0; +} + +int Domain::cell_order_cmp( + const std::vector& coord_buffs, uint64_t a, uint64_t b) const { + if (cell_order_ == Layout::ROW_MAJOR) { + for (unsigned d = 0; d < dim_num_; ++d) { + auto dim = dimension(d); + auto res = cell_order_cmp_func_[d](dim, coord_buffs, a, b, d); + + if (res == 1 || res == -1) + return res; + // else same tile on dimension d --> continue + } + } else { // COL_MAJOR + for (unsigned d = dim_num_ - 1;; --d) { + auto dim = dimension(d); + auto res = cell_order_cmp_func_[d](dim, coord_buffs, a, b, d); + + if (res == 1 || res == -1) + return res; + // else same tile on dimension d --> continue + + if (d == 0) + break; + } + } + + return 0; +} + void Domain::crop_domain(void* domain) const { switch (type_) { case Datatype::INT32: @@ -567,6 +615,8 @@ Status Domain::deserialize(ConstBuffer* buff) { dimensions_.emplace_back(dim); } + set_tile_cell_order_cmp_funcs(); + return Status::Ok(); } @@ -845,6 +895,9 @@ Status Domain::init(Layout cell_order, Layout tile_order) { // Compute tile offsets compute_tile_offsets(); + // Compute the tile/cell order cmp functions + set_tile_cell_order_cmp_funcs(); + return Status::Ok(); } @@ -999,6 +1052,60 @@ int Domain::tile_order_cmp(const T* coords_a, const T* coords_b) const { return 0; } +template +int Domain::tile_order_cmp( + const Dimension* dim, + const std::vector& coord_buffs, + uint64_t a, + uint64_t b, + unsigned d) { + auto tile_extent = (T*)dim->tile_extent(); + assert(tile_extent != nullptr); + auto domain = (T*)dim->domain(); + auto coord_buff = (unsigned char*)coord_buffs[d]; + auto coord_size = dim->coord_size(); + auto coords_a = (T*)&(coord_buff[a * coord_size]); + auto coords_b = (T*)&(coord_buff[b * coord_size]); + auto ta = (T)((*coords_a - domain[0]) / *tile_extent); + auto tb = (T)((*coords_b - domain[0]) / *tile_extent); + if (ta < tb) + return -1; + if (ta > tb) + return 1; + return 0; +} + +int Domain::tile_order_cmp( + const std::vector& coord_buffs, uint64_t a, uint64_t b) const { + if (tile_extents_ == nullptr) + return 0; + + if (tile_order_ == Layout::ROW_MAJOR) { + for (unsigned d = 0; d < dim_num_; ++d) { + auto dim = dimension(d); + auto res = tile_order_cmp_func_[d](dim, coord_buffs, a, b, d); + + if (res == 1 || res == -1) + return res; + // else same tile on dimension d --> continue + } + } else { // COL_MAJOR + for (unsigned d = dim_num_ - 1;; --d) { + auto dim = dimension(d); + auto res = tile_order_cmp_func_[d](dim, coord_buffs, a, b, d); + + if (res == 1 || res == -1) + return res; + // else same tile on dimension d --> continue + + if (d == 0) + break; + } + } + + return 0; +} + template int Domain::tile_order_cmp_tile_coords( const T* tile_coords_a, const T* tile_coords_b) const { @@ -1158,6 +1265,81 @@ void Domain::compute_tile_domain() { } } +void Domain::set_tile_cell_order_cmp_funcs() { + tile_order_cmp_func_.resize(dim_num_); + cell_order_cmp_func_.resize(dim_num_); + for (unsigned d = 0; d < dim_num_; ++d) { + switch (type_) { + case Datatype::INT32: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::INT64: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::INT8: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::UINT8: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::INT16: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::UINT16: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::UINT32: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::UINT64: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::DATETIME_YEAR: + case Datatype::DATETIME_MONTH: + case Datatype::DATETIME_WEEK: + case Datatype::DATETIME_DAY: + case Datatype::DATETIME_HR: + case Datatype::DATETIME_MIN: + case Datatype::DATETIME_SEC: + case Datatype::DATETIME_MS: + case Datatype::DATETIME_US: + case Datatype::DATETIME_NS: + case Datatype::DATETIME_PS: + case Datatype::DATETIME_FS: + case Datatype::DATETIME_AS: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::FLOAT32: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::FLOAT64: + tile_order_cmp_func_[d] = tile_order_cmp; + cell_order_cmp_func_[d] = cell_order_cmp; + break; + case Datatype::CHAR: + case Datatype::STRING_ASCII: + case Datatype::STRING_UTF8: + case Datatype::STRING_UTF16: + case Datatype::STRING_UTF32: + case Datatype::STRING_UCS2: + case Datatype::STRING_UCS4: + case Datatype::ANY: + // Not supported domain types + assert(false); + } + } +} + template void Domain::compute_tile_domain() { if (tile_extents_ == nullptr) @@ -1293,7 +1475,8 @@ uint64_t Domain::get_cell_pos_col(const T* coords) const { uint64_t pos = 0; T coords_norm; // Normalized coordinates inside the tile - // Special-case for low dimensions to an unrolled version of the default loop. + // Special-case for low dimensions to an unrolled version of the default + // loop. switch (dim_num_) { case 1: coords_norm = (coords[0] - domain[2 * 0]); @@ -1341,7 +1524,8 @@ template uint64_t Domain::get_cell_pos_col(const T* subarray, const T* coords) const { uint64_t pos = 0; - // Special-case for low dimensions to an unrolled version of the default loop. + // Special-case for low dimensions to an unrolled version of the default + // loop. switch (dim_num_) { case 1: pos += (coords[0] - subarray[2 * 0]) * 1; @@ -1382,7 +1566,8 @@ uint64_t Domain::get_cell_pos_row(const T* coords) const { uint64_t pos = 0; T coords_norm; // Normalized coordinates inside the tile - // Special-case for low dimensions to an unrolled version of the default loop. + // Special-case for low dimensions to an unrolled version of the default + // loop. switch (dim_num_) { case 1: coords_norm = (coords[0] - domain[2 * 0]); @@ -1436,7 +1621,8 @@ template uint64_t Domain::get_cell_pos_row(const T* subarray, const T* coords) const { uint64_t pos = 0; - // Special-case for low dimensions to an unrolled version of the default loop. + // Special-case for low dimensions to an unrolled version of the default + // loop. switch (dim_num_) { case 1: pos += (coords[0] - subarray[2 * 0]) * 1; diff --git a/tiledb/sm/array_schema/domain.h b/tiledb/sm/array_schema/domain.h index 00eb64567f15..1e3e648471ed 100644 --- a/tiledb/sm/array_schema/domain.h +++ b/tiledb/sm/array_schema/domain.h @@ -224,6 +224,47 @@ class Domain { template int cell_order_cmp(const T* coords_a, const T* coords_b) const; + /** + * Checks the cell order of the input coordinates on the given dimension. + * + * @param The dimension to compare on. + * @param coord_buffs The input coordinates, given in separate buffers, + * one per dimension. The buffers are sorted in the same order of the + * dimensions as defined in the array schema. + * @param a The position of the first coordinate tuple across all buffers. + * @param b The position of the second coordinate tuple across all buffers. + * @param d The dimension index to compare on. + * @return One of the following: + * - -1 if the first coordinates precede the second on the cell order + * - 0 if the two coordinates have the same cell order + * - +1 if the first coordinates succeed the second on the cell order + */ + template + static int cell_order_cmp( + const Dimension* dim, + const std::vector& coord_buffs, + uint64_t a, + uint64_t b, + unsigned d); + + /** + * Checks the cell order of the input coordinates. + * + * @param coord_buffs The input coordinates, given n separate buffers, + * one per dimension. The buffers are sorted in the same order of the + * dimensions as defined in the array schema. + * @param a The position of the first coordinate tuple across all buffers. + * @param b The position of the second coordinate tuple across all buffers. + * @return One of the following: + * - -1 if the first coordinates precede the second on the cell order + * - 0 if the two coordinates have the same cell order + * - +1 if the first coordinates succeed the second on the cell order + */ + int cell_order_cmp( + const std::vector& coord_buffs, + uint64_t a, + uint64_t b) const; + /** * Populates the object members from the data in the input binary buffer. * @@ -607,6 +648,47 @@ class Domain { template int tile_order_cmp(const T* coords_a, const T* coords_b) const; + /** + * Checks the tile order of the input coordinates on the given dimension. + * + * @param The dimension to compare on. + * @param coord_buffs The input coordinates, given in separate buffers, + * one per dimension. The buffers are sorted in the same order of the + * dimensions as defined in the array schema. + * @param a The position of the first coordinate tuple across all buffers. + * @param b The position of the second coordinate tuple across all buffers. + * @param d The dimension index to compare on. + * @return One of the following: + * - -1 if the first coordinates precede the second on the tile order + * - 0 if the two coordinates have the same tile order + * - +1 if the first coordinates succeed the second on the tile order + */ + template + static int tile_order_cmp( + const Dimension* dim, + const std::vector& coord_buffs, + uint64_t a, + uint64_t b, + unsigned d); + + /** + * Checks the tile order of the input coordinates. + * + * @param coord_buffs The input coordinates, given n separate buffers, + * one per dimension. The buffers are sorted in the same order of the + * dimensions as defined in the array schema. + * @param a The position of the first coordinate tuple across all buffers. + * @param b The position of the second coordinate tuple across all buffers. + * @return One of the following: + * - -1 if the first coordinates precede the second on the tile order + * - 0 if the two coordinates have the same tile order + * - +1 if the first coordinates succeed the second on the tile order + */ + int tile_order_cmp( + const std::vector& coord_buffs, + uint64_t a, + uint64_t b) const; + /** * Checks the tile order of the input tile coordinates. * @@ -733,6 +815,40 @@ class Domain { /** The tile order of the array the domain belongs to. */ Layout tile_order_; + /** + * Vector of functions, one per dimension, for comparing the cell order of + * two coordinates. The inputs to the function are: + * + * - dim: The dimension to compare on. + * - coord_buffs: The coordinates, split in one buffer per dimensions. + * - a,b: The two positions of the coordinates to compare. + * - d: The dimension index to compare on. + */ + std::vector& coord_buffs, + uint64_t a, + uint64_t b, + unsigned d)> + cell_order_cmp_func_; + + /** + * Vector of functions, one per dimension, for comparing the tile order of + * two coordinates. The inputs to the function are: + * + * - dim: The dimension to compare on. + * - coord_buffs: The coordinates, split in one buffer per dimensions. + * - a,b: The two positions of the coordinates to compare. + * - d: The dimension index to compare on. + */ + std::vector& coord_buffs, + uint64_t a, + uint64_t b, + unsigned d)> + tile_order_cmp_func_; + /** The type of dimensions. */ Datatype type_; @@ -752,6 +868,9 @@ class Domain { template void compute_cell_num_per_tile(); + /** Prepares the comparator functions for each dimension. */ + void set_tile_cell_order_cmp_funcs(); + /** Computes the tile domain. */ void compute_tile_domain(); diff --git a/tiledb/sm/filter/filter_pipeline.cc b/tiledb/sm/filter/filter_pipeline.cc index 4bc2db963a58..77d848ae405f 100644 --- a/tiledb/sm/filter/filter_pipeline.cc +++ b/tiledb/sm/filter/filter_pipeline.cc @@ -355,10 +355,6 @@ Status FilterPipeline::run_forward(Tile* tile) const { current_tile_ = tile; - // Split the coords if the tile stores coordinates. - if (tile->stores_coords()) - tile->split_coordinates(); - // Compute the chunks. std::vector> chunks; RETURN_NOT_OK(compute_tile_chunks(tile, &chunks)); diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc index 75aae4f60f16..70b29adee2da 100644 --- a/tiledb/sm/fragment/fragment_metadata.cc +++ b/tiledb/sm/fragment/fragment_metadata.cc @@ -839,6 +839,8 @@ void FragmentMetadata::get_subarray_tile_domain( template Status FragmentMetadata::expand_non_empty_domain(const T* mbr) { + std::lock_guard lock(mtx_); + if (non_empty_domain_ == nullptr) { auto domain_size = 2 * array_schema_->coords_size(); non_empty_domain_ = std::malloc(domain_size); diff --git a/tiledb/sm/misc/comparators.h b/tiledb/sm/misc/comparators.h index 77f485b3e4d6..4bcdb8041258 100644 --- a/tiledb/sm/misc/comparators.h +++ b/tiledb/sm/misc/comparators.h @@ -202,6 +202,57 @@ class GlobalCmp { unsigned dim_num_; }; +/** + * Wrapper of comparison function for sorting coords on the global order + * of some domain. + */ +class GlobalCmp2 { + public: + /** + * Constructor. + * + * @param domain The array domain. + * @param coord_buffs The coordinate buffers, one per dimension, containing + * the actual values, used in positional comparisons. + */ + GlobalCmp2(const Domain* domain, const std::vector& coord_buffs) + : domain_(domain) + , coord_buffs_(coord_buffs) { + } + + /** + * Positional comparison operator. + * + * @param a The first cell position. + * @param b The second cell position. + * @return `true` if cell at `a` across all coordinate buffers precedes + * cell at `b`, and `false` otherwise. + */ + bool operator()(uint64_t a, uint64_t b) const { + // Compare tile order first + auto tile_cmp = domain_->tile_order_cmp(coord_buffs_, a, b); + + if (tile_cmp == -1) + return true; + if (tile_cmp == 1) + return false; + // else tile_cmp == 0 --> continue + + // Compare cell order + auto cell_cmp = domain_->cell_order_cmp(coord_buffs_, a, b); + return cell_cmp == -1; + } + + private: + /** The domain. */ + const Domain* domain_; + /** + * The coordinate buffers, one per dimension, sorted in the order the + * dimensions are defined in the array schema. + */ + const std::vector& coord_buffs_; +}; + } // namespace sm } // namespace tiledb diff --git a/tiledb/sm/misc/utils.cc b/tiledb/sm/misc/utils.cc index 331139b78da9..34aa80449886 100644 --- a/tiledb/sm/misc/utils.cc +++ b/tiledb/sm/misc/utils.cc @@ -675,6 +675,20 @@ void expand_mbr(T* mbr, const T* coords, unsigned int dim_num) { } } +template +void expand_mbr(const std::vector& coords, const uint64_t pos, T* mbr) { + auto dim_num = (unsigned)coords.size(); + for (unsigned int d = 0; d < dim_num; ++d) { + // Update lower bound on dimension i + if (mbr[2 * d] > coords[d][pos]) + mbr[2 * d] = coords[d][pos]; + + // Update upper bound on dimension i + if (mbr[2 * d + 1] < coords[d][pos]) + mbr[2 * d + 1] = coords[d][pos]; + } +} + template void expand_mbr_with_mbr(T* mbr_a, const T* mbr_b, unsigned int dim_num) { for (unsigned int i = 0; i < dim_num; ++i) { @@ -929,6 +943,27 @@ template void expand_mbr( template void expand_mbr( uint64_t* mbr, const uint64_t* coords, unsigned int dim_num); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, int8_t* mbr); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, uint8_t* mbr); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, int16_t* mbr); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, uint16_t* mbr); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, int32_t* mbr); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, uint32_t* mbr); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, int64_t* mbr); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, uint64_t* mbr); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, float* mbr); +template void expand_mbr( + const std::vector& coords, const uint64_t pos, double* mbr); + template void expand_mbr_with_mbr( int* mbr_a, const int* mbr_b, unsigned int dim_num); template void expand_mbr_with_mbr( diff --git a/tiledb/sm/misc/utils.h b/tiledb/sm/misc/utils.h index 90e27fe3303e..5e16ef8b9c88 100644 --- a/tiledb/sm/misc/utils.h +++ b/tiledb/sm/misc/utils.h @@ -278,6 +278,22 @@ void compute_mbr_union( template void expand_mbr(T* mbr, const T* coords, unsigned int dim_num); +/** + * Expands the input MBR with the input coordinates. The input coordinates + * are given as a vector of values, one per dimension, and a position in + * the vectors along the dimensions. The input MBR will be expanded using + * the values along the dimensions located at the input position. + * + * @tparam T The type of the MBR and coordinates. + * @param coords A vector of coordinate buffers, one per dimension. + * @param pos The position of the values in the coordinate buffers that + * will be used to expand the MBR with. + * @param mbr The input MBR to be expanded. + * @return void + */ +template +void expand_mbr(const std::vector& coords, const uint64_t pos, T* mbr); + /** * Expands `mbr_a` so that it encompasses `mbr_b`. * diff --git a/tiledb/sm/query/types.h b/tiledb/sm/query/types.h index 06012027fe43..6b5fdcec66b7 100644 --- a/tiledb/sm/query/types.h +++ b/tiledb/sm/query/types.h @@ -97,6 +97,71 @@ struct AttributeBuffer { (buffer_var_size_ != nullptr) ? *buffer_var_size : 0; } }; + +/** + * Contains the buffer(s) and buffer size(s) for the coordinates of + * a dimension. + */ +struct CoordBuffer { + /** + * The coordinate buffer. In case the dimension is var-sized, this is + * the offsets buffer. + */ + void* buffer_; + /** + * For a var-sized dimension, this is the data buffer. It is `nullptr` + * for fixed-sized dimensions. + */ + void* buffer_var_; + /** + * The size (in bytes) of `buffer_`. Note that this size may be altered by + * a read query to reflect the useful data written in the buffer. + */ + uint64_t* buffer_size_; + /** + * The size (in bytes) of `buffer_var_`. Note that this size may be altered + * by a read query to reflect the useful data written in the buffer. + */ + uint64_t* buffer_var_size_; + /** + * This is the original size (in bytes) of `buffer_` (before + * potentially altered by the query). + */ + uint64_t original_buffer_size_; + /** + * This is the original size (in bytes) of `buffer_var_` (before + * potentially altered by the query). + */ + uint64_t original_buffer_var_size_; + + /** Constructor. */ + CoordBuffer() { + buffer_ = nullptr; + buffer_var_ = nullptr; + buffer_size_ = nullptr; + buffer_var_size_ = nullptr; + original_buffer_size_ = 0; + original_buffer_var_size_ = 0; + } + + /** Constructor. */ + CoordBuffer( + void* buffer, + void* buffer_var, + uint64_t* buffer_size, + uint64_t* buffer_var_size) + : buffer_(buffer) + , buffer_var_(buffer_var) + , buffer_size_(buffer_size) + , buffer_var_size_(buffer_var_size) { + original_buffer_size_ = *buffer_size; + original_buffer_var_size_ = + (buffer_var_size_ != nullptr) ? *buffer_var_size : 0; + } +}; + } // namespace sm + } // namespace tiledb + #endif // TILEDB_TYPES_H diff --git a/tiledb/sm/query/writer.cc b/tiledb/sm/query/writer.cc index 94c8141c23ac..dcb52a9772e7 100644 --- a/tiledb/sm/query/writer.cc +++ b/tiledb/sm/query/writer.cc @@ -54,6 +54,10 @@ namespace sm { Writer::Writer() { array_ = nullptr; array_schema_ = nullptr; + coords_buffer_ = nullptr; + coords_buffer_size_ = nullptr; + coord_buffers_alloced_ = false; + coords_num_ = 0; global_write_state_.reset(nullptr); initialized_ = false; layout_ = Layout::ROW_MAJOR; @@ -63,22 +67,32 @@ Writer::Writer() { Writer::~Writer() { std::free(subarray_); + clear_coord_buffers(); } /* ****************************** */ /* API */ /* ****************************** */ +std::vector Writer::attributes() const { + std::vector ret; + for (const auto& it : attr_buffers_) + ret.push_back(it.first); + if (coords_buffer_ != nullptr) + ret.push_back(constants::coords); + return ret; +} + const ArraySchema* Writer::array_schema() const { return array_schema_; } -std::vector Writer::attributes() const { - return attributes_; -} +AttributeBuffer Writer::buffer(const std::string& name) const { + if (name == constants::coords) + return AttributeBuffer( + coords_buffer_, nullptr, coords_buffer_size_, nullptr); -AttributeBuffer Writer::buffer(const std::string& attribute) const { - auto attrbuf = attr_buffers_.find(attribute); + auto attrbuf = attr_buffers_.find(name); if (attrbuf == attr_buffers_.end()) return AttributeBuffer{}; return attrbuf->second; @@ -91,8 +105,14 @@ Status Writer::finalize() { } Status Writer::get_buffer( - const std::string& attribute, void** buffer, uint64_t** buffer_size) const { - auto it = attr_buffers_.find(attribute); + const std::string& name, void** buffer, uint64_t** buffer_size) const { + if (name == constants::coords) { + *buffer = coords_buffer_; + *buffer_size = coords_buffer_size_; + return Status::Ok(); + } + + auto it = attr_buffers_.find(name); if (it == attr_buffers_.end()) { *buffer = nullptr; *buffer_size = nullptr; @@ -105,12 +125,12 @@ Status Writer::get_buffer( } Status Writer::get_buffer( - const std::string& attribute, + const std::string& name, uint64_t** buffer_off, uint64_t** buffer_off_size, void** buffer_val, uint64_t** buffer_val_size) const { - auto it = attr_buffers_.find(attribute); + auto it = attr_buffers_.find(name); if (it == attr_buffers_.end()) { *buffer_off = nullptr; *buffer_off_size = nullptr; @@ -157,13 +177,10 @@ Status Writer::init() { "Cannot initialize query; Storage manager not set")); if (array_schema_ == nullptr) return LOG_STATUS( - Status::WriterError("Cannot initialize query; Array metadata not set")); + Status::WriterError("Cannot initialize query; Array schema not set")); if (attr_buffers_.empty()) return LOG_STATUS( Status::WriterError("Cannot initialize query; Buffers not set")); - if (attributes_.empty()) - return LOG_STATUS( - Status::WriterError("Cannot initialize query; Attributes not set")); if (subarray_ == nullptr) RETURN_NOT_OK(set_subarray(nullptr)); @@ -204,7 +221,7 @@ void Writer::set_array_schema(const ArraySchema* array_schema) { } Status Writer::set_buffer( - const std::string& attribute, void* buffer, uint64_t* buffer_size) { + const std::string& name, void* buffer, uint64_t* buffer_size) { // Check buffer if (buffer == nullptr || buffer_size == nullptr) return LOG_STATUS(Status::WriterError( @@ -216,39 +233,43 @@ Status Writer::set_buffer( Status::WriterError("Cannot set buffer; Array schema not set")); // Check that attribute exists - if (attribute != constants::coords && - array_schema_->attribute(attribute) == nullptr) + if (name != constants::coords && array_schema_->attribute(name) == nullptr) return LOG_STATUS( Status::WriterError("Cannot set buffer; Invalid attribute")); // Check that attribute is fixed-sized - bool var_size = - (attribute != constants::coords && array_schema_->var_size(attribute)); + bool var_size = (name != constants::coords && array_schema_->var_size(name)); if (var_size) return LOG_STATUS(Status::WriterError( - std::string("Cannot set buffer; Input attribute '") + attribute + + std::string("Cannot set buffer; Input attribute '") + name + "' is var-sized")); // Error if setting a new attribute after initialization - bool attr_exists = attr_buffers_.find(attribute) != attr_buffers_.end(); - if (initialized_ && !attr_exists) + bool attr_exists = attr_buffers_.find(name) != attr_buffers_.end(); + if (initialized_ && name != constants::coords && !attr_exists) return LOG_STATUS(Status::WriterError( - std::string("Cannot set buffer for new attribute '") + attribute + + std::string("Cannot set buffer for new attribute '") + name + "' after initialization")); - // Append to attributes only if buffer not set before - if (!attr_exists) - attributes_.push_back(std::string(attribute)); + // Error if setting non-existing coordinates after initialization + bool has_coords = coords_buffer_ != nullptr || !coord_buffers_.empty(); + if (initialized_ && name == constants::coords && has_coords) + return LOG_STATUS(Status::WriterError( + std::string("Cannot set coordinates after initialization"))); - // Set attribute buffer - attr_buffers_[attribute] = - AttributeBuffer(buffer, nullptr, buffer_size, nullptr); + if (name == constants::coords) { + coords_buffer_ = buffer; + coords_buffer_size_ = buffer_size; + } else { + attr_buffers_[name] = + AttributeBuffer(buffer, nullptr, buffer_size, nullptr); + } return Status::Ok(); } Status Writer::set_buffer( - const std::string& attribute, + const std::string& name, uint64_t* buffer_off, uint64_t* buffer_off_size, void* buffer_val, @@ -265,32 +286,26 @@ Status Writer::set_buffer( Status::WriterError("Cannot set buffer; Array schema not set")); // Check that attribute exists - if (attribute != constants::coords && - array_schema_->attribute(attribute) == nullptr) + if (name != constants::coords && array_schema_->attribute(name) == nullptr) return LOG_STATUS( Status::WriterError("Cannot set buffer; Invalid attribute")); // Check that attribute is var-sized - bool var_size = - (attribute != constants::coords && array_schema_->var_size(attribute)); + bool var_size = (name != constants::coords && array_schema_->var_size(name)); if (!var_size) return LOG_STATUS(Status::WriterError( - std::string("Cannot set buffer; Input attribute '") + attribute + + std::string("Cannot set buffer; Input attribute '") + name + "' is fixed-sized")); // Error if setting a new attribute after initialization - bool attr_exists = attr_buffers_.find(attribute) != attr_buffers_.end(); + bool attr_exists = attr_buffers_.find(name) != attr_buffers_.end(); if (initialized_ && !attr_exists) return LOG_STATUS(Status::WriterError( - std::string("Cannot set buffer for new attribute '") + attribute + + std::string("Cannot set buffer for new attribute '") + name + "' after initialization")); - // Append to attributes only if buffer not set before - if (!attr_exists) - attributes_.push_back(std::string(attribute)); - // Set attribute buffer - attr_buffers_[attribute] = + attr_buffers_[name] = AttributeBuffer(buffer_off, buffer_val, buffer_off_size, buffer_val_size); return Status::Ok(); @@ -366,6 +381,10 @@ void* Writer::subarray() const { Status Writer::write() { STATS_FUNC_IN(writer_write); + + // In case the user has provided a coordinates buffer + RETURN_NOT_OK(split_coords_buffer()); + if (check_coord_oob_) RETURN_NOT_OK(check_coord_oob()); @@ -398,17 +417,7 @@ void Writer::add_written_fragment_info(const URI& uri) { } Status Writer::check_attributes() { - // There should be no duplicate attributes - std::set unique_attributes; - int has_coords = 0; - for (const auto& attr : attributes_) { - unique_attributes.insert(attr); - if (attr == constants::coords) - has_coords = 1; - } - if (unique_attributes.size() != attributes_.size()) - return LOG_STATUS( - Status::WriterError("Check attributes failed; Duplicate attributes")); + bool has_coords = !coord_buffers_.empty() || coords_buffer_ != nullptr; // If the array is sparse, the coordinates must be provided if (!array_schema_->dense() && !has_coords) @@ -422,10 +431,9 @@ Status Writer::check_attributes() { "Unordered writes expect the coordinates of the cells to be written")); // All attributes must be provided - if (attributes_.size() != array_schema_->attribute_num() + has_coords) - return LOG_STATUS(Status::WriterError( - "Check attributes failed; Writes expect " - "all attributes (plus coordinates for unordered writes) to be set")); + if (attr_buffers_.size() != array_schema_->attribute_num()) + return LOG_STATUS( + Status::WriterError("Writes expect all attributes to be set")); return Status::Ok(); } @@ -438,15 +446,16 @@ Status Writer::check_buffer_sizes() const { auto cell_num = array_schema_->domain()->cell_num(subarray_); uint64_t expected_cell_num = 0; - for (const auto& attr : attributes_) { + for (const auto& it : attr_buffers_) { + auto attr = it.first; bool is_var = array_schema_->var_size(attr); - auto it = attr_buffers_.find(attr); - auto buffer_size = *it->second.buffer_size_; + auto buffer_size = *it.second.buffer_size_; if (is_var) { expected_cell_num = buffer_size / constants::cell_var_offset_size; } else { expected_cell_num = buffer_size / array_schema_->cell_size(attr); } + if (expected_cell_num != cell_num) { std::stringstream ss; ss << "Buffer sizes check failed; Invalid number of cells given for "; @@ -461,28 +470,48 @@ Status Writer::check_buffer_sizes() const { Status Writer::check_coord_dups(const std::vector& cell_pos) const { STATS_FUNC_IN(writer_check_coord_dups); - auto coords_buff_it = attr_buffers_.find(constants::coords); - if (coords_buff_it == attr_buffers_.end()) + if (coord_buffers_.empty()) { return LOG_STATUS( Status::WriterError("Cannot check for coordinate duplicates; " "Coordinates buffer not found")); + } - auto coords_buff = (unsigned char*)coords_buff_it->second.buffer_; - auto coords_size = array_schema_->coords_size(); - auto coords_num = cell_pos.size(); - - if (coords_num < 2) + if (coords_num_ < 2) return Status::Ok(); - for (uint64_t i = 1; i < coords_num; ++i) { - if (!memcmp( - coords_buff + cell_pos[i] * coords_size, - coords_buff + cell_pos[i - 1] * coords_size, - coords_size)) { + // Prepare auxiliary vector for better performance + auto dim_num = array_schema_->dim_num(); + std::vector buffs(dim_num); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + buffs[d] = (unsigned char*)coord_buffers_.find(dim_name)->second.buffer_; + } + + auto statuses = parallel_for(1, coords_num_, [&](uint64_t i) { + // Check for duplicate in adjacent cells + bool found_dup = true; + for (unsigned d = 0; d < dim_num; ++d) { + if (memcmp( + buffs[d] + cell_pos[i] * coord_sizes_[d], + buffs[d] + cell_pos[i - 1] * coord_sizes_[d], + coord_sizes_[d]) != 0) { // Not the same + found_dup = false; + break; + } + } + + // Found duplicate + if (found_dup) { return LOG_STATUS( Status::WriterError("Duplicate coordinates are not allowed")); } - } + + return Status::Ok(); + }); + + // Check all statuses + for (auto& st : statuses) + RETURN_NOT_OK(st); return Status::Ok(); STATS_FUNC_OUT(writer_check_coord_dups); @@ -491,224 +520,116 @@ Status Writer::check_coord_dups(const std::vector& cell_pos) const { Status Writer::check_coord_dups() const { STATS_FUNC_IN(writer_check_coord_dups_global); - auto coords_buff_it = attr_buffers_.find(constants::coords); - if (coords_buff_it == attr_buffers_.end()) + if (coord_buffers_.empty()) { return LOG_STATUS( Status::WriterError("Cannot check for coordinate duplicates; " "Coordinates buffer not found")); + } - auto coords_buff = (unsigned char*)coords_buff_it->second.buffer_; - auto coords_buff_size = *coords_buff_it->second.buffer_size_; - auto coords_size = array_schema_->coords_size(); - auto coords_num = coords_buff_size / coords_size; - - if (coords_num < 2) + if (coords_num_ < 2) return Status::Ok(); - for (uint64_t i = 1; i < coords_num; ++i) { - if (!memcmp( - coords_buff + i * coords_size, - coords_buff + (i - 1) * coords_size, - coords_size)) { + // Prepare auxiliary vector for better performance + auto dim_num = array_schema_->dim_num(); + std::vector buffs(dim_num); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + buffs[d] = (unsigned char*)coord_buffers_.find(dim_name)->second.buffer_; + } + + auto statuses = parallel_for(1, coords_num_, [&](uint64_t i) { + // Check for duplicate in adjacent cells + bool found_dup = true; + for (unsigned d = 0; d < dim_num; ++d) { + if (memcmp( + buffs[d] + i * coord_sizes_[d], + buffs[d] + (i - 1) * coord_sizes_[d], + coord_sizes_[d]) != 0) { // Not the same + found_dup = false; + break; + } + } + + // Found duplicate + if (found_dup) { return LOG_STATUS( Status::WriterError("Duplicate coordinates are not allowed")); } - } - return Status::Ok(); - - STATS_FUNC_OUT(writer_check_coord_dups_global); -} + return Status::Ok(); + }); -Status Writer::check_coord_oob() const { - switch (array_schema_->domain()->type()) { - case Datatype::INT8: - return check_coord_oob(); - case Datatype::UINT8: - return check_coord_oob(); - case Datatype::INT16: - return check_coord_oob(); - case Datatype::UINT16: - return check_coord_oob(); - case Datatype::INT32: - return check_coord_oob(); - case Datatype::UINT32: - return check_coord_oob(); - case Datatype::INT64: - return check_coord_oob(); - case Datatype::UINT64: - return check_coord_oob(); - case Datatype::FLOAT32: - return check_coord_oob(); - case Datatype::FLOAT64: - return check_coord_oob(); - case Datatype::DATETIME_YEAR: - case Datatype::DATETIME_MONTH: - case Datatype::DATETIME_WEEK: - case Datatype::DATETIME_DAY: - case Datatype::DATETIME_HR: - case Datatype::DATETIME_MIN: - case Datatype::DATETIME_SEC: - case Datatype::DATETIME_MS: - case Datatype::DATETIME_US: - case Datatype::DATETIME_NS: - case Datatype::DATETIME_PS: - case Datatype::DATETIME_FS: - case Datatype::DATETIME_AS: - return check_coord_oob(); - case Datatype::CHAR: - case Datatype::STRING_ASCII: - case Datatype::STRING_UTF8: - case Datatype::STRING_UTF16: - case Datatype::STRING_UTF32: - case Datatype::STRING_UCS2: - case Datatype::STRING_UCS4: - case Datatype::ANY: - // Not supported domain type - assert(false); - return LOG_STATUS( - Status::WriterError("Cannot perform out-of-bounds check on " - "coordinates; Domain type not supported")); + // Check all statuses + for (auto& st : statuses) { + if (!st.ok()) + return st; } return Status::Ok(); + + STATS_FUNC_OUT(writer_check_coord_dups_global); } -template Status Writer::check_coord_oob() const { - auto coords_it = attr_buffers_.find(constants::coords); - // Applicable only to sparse writes - exit if coordinates do not exist - if (coords_it == attr_buffers_.end()) + if (coord_buffers_.empty()) return Status::Ok(); - // Get coordinates buffer - auto coords_buff = (T*)coords_it->second.buffer_; - auto coords_buff_size = *(coords_it->second.buffer_size_); - auto coords_num = coords_buff_size / array_schema_->coords_size(); - auto dim_num = array_schema_->dim_num(); - auto domain = (T*)array_schema_->domain()->domain(); - - if (coords_num == 0) + // Exit if there are no coordinates to write + if (coords_num_ == 0) return Status::Ok(); + // Prepare auxiliary vector for better performance + auto dim_num = array_schema_->dim_num(); + std::vector buffs(dim_num); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + buffs[d] = (unsigned char*)coord_buffers_.find(dim_name)->second.buffer_; + } + // Check if all coordinates fall in the domain in parallel - auto statuses = parallel_for(0, coords_num, [&](uint64_t i) { - if (!utils::geometry::coords_in_rect( - &coords_buff[i * dim_num], domain, dim_num)) { - std::stringstream ss; - ss << "Write failed; Coordinates (" << coords_buff[i * dim_num]; - for (unsigned int j = 1; j < dim_num; ++j) - ss << "," << coords_buff[i * dim_num + j]; - ss << ") are out of bounds"; - return LOG_STATUS(Status::WriterError(ss.str())); - } - return Status::Ok(); - }); + auto statuses = + parallel_for_2d(0, coords_num_, 0, dim_num, [&](uint64_t c, unsigned d) { + auto dim = array_schema_->dimension(d); + std::string err_msg; + if (dim->oob(buffs[d] + c * coord_sizes_[d], &err_msg)) + return Status::WriterError(err_msg); + return Status::Ok(); + }); // Check all statuses - for (auto& st : statuses) { - if (!st.ok()) - return st; - } + for (auto& st : statuses) + RETURN_NOT_OK_ELSE(st, LOG_STATUS(st)); // Success return Status::Ok(); } Status Writer::check_global_order() const { - STATS_FUNC_IN(writer_check_global_order); - - switch (array_schema_->domain()->type()) { - case Datatype::INT8: - return check_global_order(); - case Datatype::UINT8: - return check_global_order(); - case Datatype::INT16: - return check_global_order(); - case Datatype::UINT16: - return check_global_order(); - case Datatype::INT32: - return check_global_order(); - case Datatype::UINT32: - return check_global_order(); - case Datatype::INT64: - return check_global_order(); - case Datatype::UINT64: - return check_global_order(); - case Datatype::FLOAT32: - return check_global_order(); - case Datatype::FLOAT64: - return check_global_order(); - case Datatype::DATETIME_YEAR: - case Datatype::DATETIME_MONTH: - case Datatype::DATETIME_WEEK: - case Datatype::DATETIME_DAY: - case Datatype::DATETIME_HR: - case Datatype::DATETIME_MIN: - case Datatype::DATETIME_SEC: - case Datatype::DATETIME_MS: - case Datatype::DATETIME_US: - case Datatype::DATETIME_NS: - case Datatype::DATETIME_PS: - case Datatype::DATETIME_FS: - case Datatype::DATETIME_AS: - return check_global_order(); - case Datatype::CHAR: - case Datatype::STRING_ASCII: - case Datatype::STRING_UTF8: - case Datatype::STRING_UTF16: - case Datatype::STRING_UTF32: - case Datatype::STRING_UCS2: - case Datatype::STRING_UCS4: - case Datatype::ANY: - // Not supported domain type - assert(false); - return LOG_STATUS( - Status::WriterError("Cannot perform global order check on " - "coordinates; Domain type not supported")); - } - - return Status::Ok(); - - STATS_FUNC_OUT(writer_check_global_order); -} - -template -Status Writer::check_global_order() const { - auto coords_it = attr_buffers_.find(constants::coords); - // Applicable only to sparse writes - exit if coordinates do not exist - if (coords_it == attr_buffers_.end()) + if (coord_buffers_.empty() || coords_num_ < 2) return Status::Ok(); - // Get coordinates buffer - auto coords_buff = (T*)coords_it->second.buffer_; - auto coords_buff_size = *(coords_it->second.buffer_size_); - auto coords_num = coords_buff_size / array_schema_->coords_size(); + // Prepare auxiliary vector for better performance auto dim_num = array_schema_->dim_num(); - auto domain = array_schema_->domain(); - - if (coords_num < 2) - return Status::Ok(); + std::vector buffs(dim_num); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + buffs[d] = coord_buffers_.find(dim_name)->second.buffer_; + } // Check if all coordinates fall in the domain in parallel - auto statuses = parallel_for(0, coords_num - 1, [&](uint64_t i) { - auto tile_cmp = domain->tile_order_cmp( - &coords_buff[i * dim_num], &coords_buff[(i + 1) * dim_num]); - auto fail = (tile_cmp > 0) || - ((tile_cmp == 0) && domain->cell_order_cmp( - &coords_buff[i * dim_num], - &coords_buff[(i + 1) * dim_num]) > 0); + auto domain = array_schema_->domain(); + auto statuses = parallel_for(0, coords_num_ - 1, [&](uint64_t i) { + auto tile_cmp = domain->tile_order_cmp(buffs, i, i + 1); + auto fail = (tile_cmp > 0) || ((tile_cmp == 0) && + domain->cell_order_cmp(buffs, i, i + 1) > 0); + if (fail) { std::stringstream ss; - ss << "Write failed; Coordinates (" << coords_buff[i * dim_num]; - for (unsigned int j = 1; j < dim_num; ++j) - ss << "," << coords_buff[i * dim_num + j]; - ss << ") succeed (" << coords_buff[(i + 1) * dim_num]; - for (unsigned int j = 1; j < dim_num; ++j) - ss << "," << coords_buff[(i + 1) * dim_num + j]; - ss << ") in the global order"; + ss << "Write failed; Coordinates " << coords_to_str(i); + ss << " succeed " << coords_to_str(i + 1); + ss << " in the global order"; return LOG_STATUS(Status::WriterError(ss.str())); } return Status::Ok(); @@ -816,13 +737,35 @@ Status Writer::check_subarray() const { return Status::Ok(); } +void Writer::clear_coord_buffers() { + // Applicable only if the coordinate buffers have been allocated by + // TileDB + if (coord_buffers_alloced_) { + for (auto& buff : coord_buffers_) { + std::free(buff.second.buffer_); + std::free(buff.second.buffer_var_); + } + coord_buffer_sizes_.clear(); + coord_buffers_.clear(); + coord_buffers_alloced_ = false; + } +} + Status Writer::close_files(FragmentMetadata* meta) const { - for (const auto& attr : attributes_) { + // Close attribute files + for (const auto& it : attr_buffers_) { + const auto& attr = it.first; RETURN_NOT_OK(storage_manager_->close_file(meta->attr_uri(attr))); if (array_schema_->var_size(attr)) RETURN_NOT_OK(storage_manager_->close_file(meta->attr_var_uri(attr))); } + // Close coordinate files + // TODO: close separate coordinate files + if (!coord_buffers_.empty()) + RETURN_NOT_OK( + storage_manager_->close_file(meta->attr_uri(constants::coords))); + return Status::Ok(); } @@ -831,23 +774,50 @@ Status Writer::compute_coord_dups( std::set* coord_dups) const { STATS_FUNC_IN(writer_compute_coord_dups); - auto coords_buff_it = attr_buffers_.find(constants::coords); - if (coords_buff_it == attr_buffers_.end()) + if (coord_buffers_.empty()) { return LOG_STATUS( Status::WriterError("Cannot check for coordinate duplicates; " "Coordinates buffer not found")); + } - auto coords_buff = (unsigned char*)coords_buff_it->second.buffer_; - auto coords_size = array_schema_->coords_size(); - auto coords_num = cell_pos.size(); + if (coords_num_ < 2) + return Status::Ok(); + + // Prepare auxiliary vector for better performance + auto dim_num = array_schema_->dim_num(); + std::vector buffs(dim_num); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + buffs[d] = (unsigned char*)coord_buffers_.find(dim_name)->second.buffer_; + } - for (uint64_t i = 1; i < coords_num; ++i) { - if (!memcmp( - coords_buff + cell_pos[i] * coords_size, - coords_buff + cell_pos[i - 1] * coords_size, - coords_size)) { + std::mutex mtx; + auto statuses = parallel_for(1, coords_num_, [&](uint64_t i) { + // Check for duplicate in adjacent cells + bool found_dup = true; + for (unsigned d = 0; d < dim_num; ++d) { + if (memcmp( + buffs[d] + cell_pos[i] * coord_sizes_[d], + buffs[d] + cell_pos[i - 1] * coord_sizes_[d], + coord_sizes_[d]) != 0) { // Not the same + found_dup = false; + break; + } + } + + // Found duplicate + if (found_dup) { + std::lock_guard lock(mtx); coord_dups->insert(cell_pos[i]); } + + return Status::Ok(); + }); + + // Check all statuses + for (auto& st : statuses) { + if (!st.ok()) + return st; } return Status::Ok(); @@ -858,24 +828,50 @@ Status Writer::compute_coord_dups( Status Writer::compute_coord_dups(std::set* coord_dups) const { STATS_FUNC_IN(writer_compute_coord_dups_global); - auto coords_buff_it = attr_buffers_.find(constants::coords); - if (coords_buff_it == attr_buffers_.end()) + if (coord_buffers_.empty()) { return LOG_STATUS( Status::WriterError("Cannot check for coordinate duplicates; " "Coordinates buffer not found")); + } - auto coords_buff = (unsigned char*)coords_buff_it->second.buffer_; - auto coords_buff_size = *coords_buff_it->second.buffer_size_; - auto coords_size = array_schema_->coords_size(); - auto coords_num = coords_buff_size / coords_size; + if (coords_num_ < 2) + return Status::Ok(); - for (uint64_t i = 1; i < coords_num; ++i) { - if (!memcmp( - coords_buff + i * coords_size, - coords_buff + (i - 1) * coords_size, - coords_size)) { + // Prepare auxiliary vector for better performance + auto dim_num = array_schema_->dim_num(); + std::vector buffs(dim_num); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + buffs[d] = (unsigned char*)coord_buffers_.find(dim_name)->second.buffer_; + } + + std::mutex mtx; + auto statuses = parallel_for(1, coords_num_, [&](uint64_t i) { + // Check for duplicate in adjacent cells + bool found_dup = true; + for (unsigned d = 0; d < dim_num; ++d) { + if (memcmp( + buffs[d] + i * coord_sizes_[d], + buffs[d] + (i - 1) * coord_sizes_[d], + coord_sizes_[d]) != 0) { // Not the same + found_dup = false; + break; + } + } + + // Found duplicate + if (found_dup) { + std::lock_guard lock(mtx); coord_dups->insert(i); } + + return Status::Ok(); + }); + + // Check all statuses + for (auto& st : statuses) { + if (!st.ok()) + return st; } return Status::Ok(); @@ -883,42 +879,97 @@ Status Writer::compute_coord_dups(std::set* coord_dups) const { STATS_FUNC_OUT(writer_compute_coord_dups_global); } +Status Writer::compute_coords_metadata( + const std::unordered_map>& tiles, + FragmentMetadata* meta) const { + auto coords_type = array_schema_->coords_type(); + switch (coords_type) { + case Datatype::INT8: + return compute_coords_metadata(tiles, meta); + case Datatype::UINT8: + return compute_coords_metadata(tiles, meta); + case Datatype::INT16: + return compute_coords_metadata(tiles, meta); + case Datatype::UINT16: + return compute_coords_metadata(tiles, meta); + case Datatype::INT32: + return compute_coords_metadata(tiles, meta); + case Datatype::UINT32: + return compute_coords_metadata(tiles, meta); + case Datatype::INT64: + return compute_coords_metadata(tiles, meta); + case Datatype::UINT64: + return compute_coords_metadata(tiles, meta); + case Datatype::FLOAT32: + assert(!array_schema_->dense()); + return compute_coords_metadata(tiles, meta); + case Datatype::FLOAT64: + assert(!array_schema_->dense()); + return compute_coords_metadata(tiles, meta); + case Datatype::DATETIME_YEAR: + case Datatype::DATETIME_MONTH: + case Datatype::DATETIME_WEEK: + case Datatype::DATETIME_DAY: + case Datatype::DATETIME_HR: + case Datatype::DATETIME_MIN: + case Datatype::DATETIME_SEC: + case Datatype::DATETIME_MS: + case Datatype::DATETIME_US: + case Datatype::DATETIME_NS: + case Datatype::DATETIME_PS: + case Datatype::DATETIME_FS: + case Datatype::DATETIME_AS: + return compute_coords_metadata(tiles, meta); + default: + return LOG_STATUS(Status::WriterError( + "Cannot compute coordinates metadata; Unsupported domain type")); + } + + return Status::Ok(); +} + template Status Writer::compute_coords_metadata( - const std::vector& tiles, FragmentMetadata* meta) const { + const std::unordered_map>& tiles, + FragmentMetadata* meta) const { STATS_FUNC_IN(writer_compute_coords_metadata); // Check if tiles are empty - if (tiles.empty()) + if (tiles.empty() || tiles.begin()->second.empty()) return Status::Ok(); // For easy reference - auto coords_size = array_schema_->coords_size(); + auto tile_num = tiles.begin()->second.size(); auto dim_num = array_schema_->dim_num(); - std::vector mbr; - mbr.resize(2 * dim_num); // Compute MBRs - for (uint64_t tile_id = 0; tile_id < tiles.size(); tile_id++) { - const auto& tile = tiles[tile_id]; + auto statuses = parallel_for(0, tile_num, [&](uint64_t t) { + std::vector mbr(2 * dim_num); + std::vector data(dim_num); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + auto tiles_it = tiles.find(dim_name); + data[d] = (T*)(tiles_it->second[t].data()); + } + // Initialize MBR with the first coords - auto data = (T*)tile.data(); - auto cell_num = tile.size() / coords_size; + auto cell_num = tiles.begin()->second[t].cell_num(); assert(cell_num > 0); - for (unsigned i = 0; i < dim_num; ++i) { - mbr[2 * i] = data[i]; - mbr[2 * i + 1] = data[i]; + for (unsigned d = 0; d < dim_num; ++d) { + mbr[2 * d] = data[d][0]; + mbr[2 * d + 1] = data[d][0]; } // Expand the MBR with the rest coords - for (uint64_t i = 1; i < cell_num; ++i) - utils::geometry::expand_mbr(&mbr[0], &data[i * dim_num], dim_num); + for (uint64_t c = 1; c < cell_num; ++c) + utils::geometry::expand_mbr(data, c, &mbr[0]); - meta->set_mbr(tile_id, &mbr[0]); - } + meta->set_mbr(t, &mbr[0]); + return Status::Ok(); + }); // Set last tile cell number - meta->set_last_tile_cell_num(tiles.back().size() / coords_size); + meta->set_last_tile_cell_num(tiles.begin()->second.back().cell_num()); return Status::Ok(); @@ -1007,6 +1058,27 @@ Status Writer::create_fragment( STATS_FUNC_OUT(writer_create_fragment); } +Status Writer::filter_attr_tiles( + std::unordered_map>* attr_tiles) const { + auto attr_num = attr_buffers_.size(); + auto statuses = parallel_for(0, attr_num, [&](uint64_t i) { + auto buff_it = attr_buffers_.begin(); + std::advance(buff_it, i); + const auto& attr = buff_it->first; + auto& tiles = (*attr_tiles)[attr]; + RETURN_CANCEL_OR_ERROR(filter_tiles(attr, &tiles)); + return Status::Ok(); + }); + + // Check all statuses + for (auto& st : statuses) { + if (!st.ok()) + return st; + } + + return Status::Ok(); +} + Status Writer::filter_tiles( const std::string& attribute, std::vector* tiles) const { STATS_FUNC_IN(writer_filter_tiles); @@ -1054,108 +1126,72 @@ Status Writer::filter_tile( return Status::Ok(); } -Status Writer::finalize_global_write_state() { - auto coords_type = array_schema_->coords_type(); - switch (coords_type) { - case Datatype::INT8: - return finalize_global_write_state(); - case Datatype::UINT8: - return finalize_global_write_state(); - case Datatype::INT16: - return finalize_global_write_state(); - case Datatype::UINT16: - return finalize_global_write_state(); - case Datatype::INT32: - return finalize_global_write_state(); - case Datatype::UINT32: - return finalize_global_write_state(); - case Datatype::INT64: - return finalize_global_write_state(); - case Datatype::UINT64: - return finalize_global_write_state(); - case Datatype::FLOAT32: - return finalize_global_write_state(); - case Datatype::FLOAT64: - return finalize_global_write_state(); - case Datatype::DATETIME_YEAR: - case Datatype::DATETIME_MONTH: - case Datatype::DATETIME_WEEK: - case Datatype::DATETIME_DAY: - case Datatype::DATETIME_HR: - case Datatype::DATETIME_MIN: - case Datatype::DATETIME_SEC: - case Datatype::DATETIME_MS: - case Datatype::DATETIME_US: - case Datatype::DATETIME_NS: - case Datatype::DATETIME_PS: - case Datatype::DATETIME_FS: - case Datatype::DATETIME_AS: - return finalize_global_write_state(); - default: - return LOG_STATUS(Status::WriterError( - "Cannot finalize global write state; Unsupported domain type")); - } - - return Status::Ok(); -} - -template Status Writer::finalize_global_write_state() { assert(layout_ == Layout::GLOBAL_ORDER); auto meta = global_write_state_->frag_meta_.get(); + auto uri = meta->fragment_uri(); // Handle last tile - Status st = global_write_handle_last_tile(); + Status st = global_write_handle_last_tile(); if (!st.ok()) { close_files(meta); - storage_manager_->vfs()->remove_dir(meta->fragment_uri()); - global_write_state_.reset(nullptr); + clean_up(uri); return st; } // Close all files - st = close_files(meta); - if (!st.ok()) { - storage_manager_->vfs()->remove_dir(meta->fragment_uri()); - global_write_state_.reset(nullptr); - return st; - } + RETURN_NOT_OK_ELSE(close_files(meta), clean_up(uri)); // Check that the same number of cells was written across attributes - for (size_t i = 1; i < attributes_.size(); ++i) { - if (global_write_state_->cells_written_[attributes_[i]] != - global_write_state_->cells_written_[attributes_[i - 1]]) { - storage_manager_->vfs()->remove_dir(meta->fragment_uri()); - global_write_state_.reset(nullptr); + // and dimensions + uint64_t cell_num = 0; + if (!coord_buffers_.empty()) { + const auto& dim_name = coord_buffers_.begin()->first; + cell_num = global_write_state_->coord_cells_written_[dim_name]; + } else if (!attr_buffers_.empty()) { + auto attr = attr_buffers_.begin()->first; + cell_num = global_write_state_->attr_cells_written_[attr]; + } + + for (const auto& it : attr_buffers_) { + auto attr = it.first; + if (global_write_state_->attr_cells_written_[attr] != cell_num) { + std::cout << global_write_state_->attr_cells_written_[attr] << "\n"; + clean_up(uri); return LOG_STATUS(Status::WriterError( "Failed to finalize global write state; Different " - "number of cells written across attributes")); + "number of cells written across attributes and coordinates")); + } + } + for (const auto& it : coord_buffers_) { + const auto& dim_name = it.first; + if (global_write_state_->coord_cells_written_[dim_name] != cell_num) { + clean_up(uri); + return LOG_STATUS(Status::WriterError( + "Failed to finalize global write state; Different " + "number of cells written across attributes and coordinates")); } } // Check if the total number of cells written is equal to the subarray size - if (!has_coords()) { - auto cells_written = global_write_state_->cells_written_[attributes_[0]]; - if (cells_written != array_schema_->domain()->cell_num((T*)subarray_)) { - storage_manager_->vfs()->remove_dir(meta->fragment_uri()); - global_write_state_.reset(nullptr); + if (coord_buffers_.empty()) { + auto expected_cell_num = array_schema_->domain()->cell_num(subarray_); + if (cell_num != expected_cell_num) { + clean_up(uri); std::stringstream ss; ss << "Failed to finalize global write state; Number " - << "of cells written (" << cells_written + << "of cells written (" << cell_num << ") is different from the number of cells expected (" - << array_schema_->domain()->cell_num((T*)subarray_) - << ") for the query subarray"; + << expected_cell_num << ") for the query subarray"; return LOG_STATUS(Status::WriterError(ss.str())); } } // Flush fragment metadata to storage - st = meta->store(array_->get_encryption_key()); - if (!st.ok()) - storage_manager_->vfs()->remove_dir(meta->fragment_uri()); + RETURN_NOT_OK_ELSE(meta->store(array_->get_encryption_key()), clean_up(uri)); // Add written fragment info - add_written_fragment_info(meta->fragment_uri()); + add_written_fragment_info(uri); // Delete global write state global_write_state_.reset(nullptr); @@ -1164,72 +1200,17 @@ Status Writer::finalize_global_write_state() { } Status Writer::global_write() { - STATS_FUNC_IN(writer_global_write); - // Applicable only to global write on dense/sparse arrays assert(layout_ == Layout::GLOBAL_ORDER); - auto coords_type = array_schema_->coords_type(); - switch (coords_type) { - case Datatype::INT8: - return global_write(); - case Datatype::UINT8: - return global_write(); - case Datatype::INT16: - return global_write(); - case Datatype::UINT16: - return global_write(); - case Datatype::INT32: - return global_write(); - case Datatype::UINT32: - return global_write(); - case Datatype::INT64: - return global_write(); - case Datatype::UINT64: - return global_write(); - case Datatype::FLOAT32: - assert(!array_schema_->dense()); - return global_write(); - case Datatype::FLOAT64: - assert(!array_schema_->dense()); - return global_write(); - case Datatype::DATETIME_YEAR: - case Datatype::DATETIME_MONTH: - case Datatype::DATETIME_WEEK: - case Datatype::DATETIME_DAY: - case Datatype::DATETIME_HR: - case Datatype::DATETIME_MIN: - case Datatype::DATETIME_SEC: - case Datatype::DATETIME_MS: - case Datatype::DATETIME_US: - case Datatype::DATETIME_NS: - case Datatype::DATETIME_PS: - case Datatype::DATETIME_FS: - case Datatype::DATETIME_AS: - return global_write(); - default: - return LOG_STATUS(Status::WriterError( - "Cannot write in global layout; Unsupported domain type")); - } - - return Status::Ok(); - - STATS_FUNC_OUT(writer_global_write); -} - -template -Status Writer::global_write() { // Initialize the global write state if this is the first invocation if (!global_write_state_) RETURN_CANCEL_OR_ERROR(init_global_write_state()); auto frag_meta = global_write_state_->frag_meta_.get(); auto uri = frag_meta->fragment_uri(); - auto num_attributes = attributes_.size(); // Check for coordinate duplicates - bool has_coords = - attr_buffers_.find(constants::coords) != attr_buffers_.end(); - if (has_coords) { + if (!coord_buffers_.empty()) { if (check_coord_dups_ && !dedup_coords_) RETURN_CANCEL_OR_ERROR(check_coord_dups()); if (check_global_order_) @@ -1241,59 +1222,73 @@ Status Writer::global_write() { if (dedup_coords_) RETURN_CANCEL_OR_ERROR(compute_coord_dups(&coord_dups)); - // Prepare tiles for all attributes - std::vector> attribute_tiles(num_attributes); - auto statuses = parallel_for(0, num_attributes, [&](uint64_t i) { - const auto& attr = attributes_[i]; - auto& full_tiles = attribute_tiles[i]; - RETURN_CANCEL_OR_ERROR(prepare_full_tiles(attr, coord_dups, &full_tiles)); + std::unordered_map> coord_tiles; + std::unordered_map> attr_tiles; + auto statuses = parallel_for(0, 2, [&](uint64_t i) { + if (i == 0) { + // Prepare coordinate tiles + RETURN_CANCEL_OR_ERROR_ELSE( + prepare_full_coord_tiles(coord_dups, &coord_tiles), clean_up(uri)); + } else { + // Prepare attribute tiles + RETURN_CANCEL_OR_ERROR_ELSE( + prepare_full_attr_tiles(coord_dups, &attr_tiles), clean_up(uri)); + } + return Status::Ok(); }); - // Check all statuses - for (auto& st : statuses) { - if (!st.ok()) { - storage_manager_->vfs()->remove_dir(uri); - global_write_state_.reset(nullptr); - return st; - } + // Check statuses + for (auto& st : statuses) + RETURN_NOT_OK(st); + + // Find number of tiles + uint64_t tile_num = 0; + if (!attr_tiles.empty()) { + auto attr = attr_buffers_.begin()->first; + auto it = attr_tiles.begin(); + tile_num = array_schema_->var_size(it->first) ? it->second.size() / 2 : + it->second.size(); + } else { + assert(!coord_tiles.empty()); + tile_num = coord_tiles[0].size(); } - statuses.clear(); - // Increment number of tiles in the fragment metadata - uint64_t num_tiles = array_schema_->var_size(attributes_[0]) ? - attribute_tiles[0].size() / 2 : - attribute_tiles[0].size(); - auto new_num_tiles = frag_meta->tile_index_base() + num_tiles; + // No cells to be written + if (tile_num == 0) + return Status::Ok(); + + // Set new number of tiles in the fragment metadata + auto new_num_tiles = frag_meta->tile_index_base() + tile_num; frag_meta->set_num_tiles(new_num_tiles); - // Filter all tiles - statuses = parallel_for(0, num_attributes, [&](uint64_t i) { - const auto& attr = attributes_[i]; - auto& full_tiles = attribute_tiles[i]; - if (attr == constants::coords) - RETURN_CANCEL_OR_ERROR(compute_coords_metadata(full_tiles, frag_meta)); - RETURN_CANCEL_OR_ERROR(filter_tiles(attr, &full_tiles)); + std::vector coords_tiles; + statuses = parallel_for(0, 2, [&](uint64_t i) { + if (i == 0) { + // Filter coordinate tiles + RETURN_CANCEL_OR_ERROR_ELSE( + compute_coords_metadata(coord_tiles, frag_meta), clean_up(uri)); + // TODO: remove and filter coordinate tiles separately + RETURN_CANCEL_OR_ERROR_ELSE( + zip_coord_tiles(coord_tiles, &coords_tiles), clean_up(uri)); + RETURN_CANCEL_OR_ERROR_ELSE( + filter_tiles(constants::coords, &coords_tiles), clean_up(uri)); + } else { + // Filter attribute tiles + RETURN_CANCEL_OR_ERROR_ELSE( + filter_attr_tiles(&attr_tiles), clean_up(uri)); + } + return Status::Ok(); }); - // Check all statuses - for (auto& st : statuses) { - if (!st.ok()) { - storage_manager_->vfs()->remove_dir(uri); - global_write_state_.reset(nullptr); - return st; - } - } - statuses.clear(); + // Check statuses + for (auto& st : statuses) + RETURN_NOT_OK(st); // Write tiles for all attributes - auto st = write_all_tiles(frag_meta, attribute_tiles); - if (!st.ok()) { - storage_manager_->vfs()->remove_dir(uri); - global_write_state_.reset(nullptr); - return st; - } + RETURN_NOT_OK_ELSE( + write_all_tiles(frag_meta, attr_tiles, coords_tiles), clean_up(uri)); // Increment the tile index base for the next global order write. frag_meta->set_tile_index_base(new_num_tiles); @@ -1301,43 +1296,64 @@ Status Writer::global_write() { return Status::Ok(); } -template Status Writer::global_write_handle_last_tile() { - // See if any last tiles are nonempty. - bool all_empty = true; - for (const auto& attr : attributes_) { - auto& last_tile = global_write_state_->last_tiles_[attr].first; - if (!last_tile.empty()) { - all_empty = false; - break; - } - } - - // Return early if there are no tiles to write. - if (all_empty) + if (all_last_tiles_empty()) return Status::Ok(); // Reserve space for the last tile in the fragment metadata auto meta = global_write_state_->frag_meta_.get(); meta->set_num_tiles(meta->tile_index_base() + 1); + const auto& uri = global_write_state_->frag_meta_->fragment_uri(); + + // Filter last tiles + std::vector coords_tiles; + std::unordered_map> attr_tiles; + auto statuses = parallel_for(0, 2, [&](uint64_t i) { + if (i == 0) { + // Filter last coordinate tiles + RETURN_NOT_OK_ELSE(filter_last_coord_tiles(&coords_tiles), clean_up(uri)); + } else { + // Filter last attribute tiles + RETURN_NOT_OK_ELSE(filter_last_attr_tiles(&attr_tiles), clean_up(uri)); + } + + return Status::Ok(); + }); + + // Check statuses + for (auto& st : statuses) + RETURN_NOT_OK(st); + + // Write the last tiles + RETURN_NOT_OK(write_all_tiles(meta, attr_tiles, coords_tiles)); + + // Increment the tile index base. + meta->set_tile_index_base(meta->tile_index_base() + 1); + + return Status::Ok(); +} + +Status Writer::filter_last_attr_tiles( + std::unordered_map>* attr_tiles) const { + // Initialize attribute tiles + for (auto it : attr_buffers_) + (*attr_tiles)[it.first] = std::vector(); - // Filter the last tiles - uint64_t num_attributes = attributes_.size(); - std::vector> attribute_tiles(num_attributes); - auto statuses = parallel_for(0, num_attributes, [&](uint64_t i) { - const auto& attr = attributes_[i]; - auto& last_tile = global_write_state_->last_tiles_[attr].first; - auto& last_tile_var = global_write_state_->last_tiles_[attr].second; + uint64_t attr_num = attr_buffers_.size(); + auto statuses = parallel_for(0, attr_num, [&](uint64_t i) { + auto buff_it = attr_buffers_.begin(); + std::advance(buff_it, i); + const auto& attr = buff_it->first; + auto& last_tile = global_write_state_->last_attr_tiles_[attr].first; + auto& last_tile_var = global_write_state_->last_attr_tiles_[attr].second; if (!last_tile.empty()) { - std::vector& tiles = attribute_tiles[i]; + std::vector& tiles = (*attr_tiles)[attr]; // Note making shallow clones here, as it's not necessary to copy the // underlying tile Buffers. tiles.push_back(last_tile.clone(false)); if (!last_tile_var.empty()) tiles.push_back(last_tile_var.clone(false)); - if (attr == constants::coords) - RETURN_NOT_OK(compute_coords_metadata(tiles, meta)); RETURN_NOT_OK(filter_tiles(attr, &tiles)); } return Status::Ok(); @@ -1347,17 +1363,62 @@ Status Writer::global_write_handle_last_tile() { for (auto& st : statuses) RETURN_NOT_OK(st); - // Write the last tiles - RETURN_NOT_OK(write_all_tiles(meta, attribute_tiles)); + return Status::Ok(); +} - // Increment the tile index base. - meta->set_tile_index_base(meta->tile_index_base() + 1); +Status Writer::filter_last_coord_tiles(std::vector* coords_tiles) const { + // Prepare coord tiles map + std::unordered_map> coord_tiles; + auto dim_num = array_schema_->dim_num(); + auto meta = global_write_state_->frag_meta_.get(); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + coord_tiles[dim_name] = std::vector(); + } + + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + auto& last_tile = global_write_state_->last_coord_tiles_[dim_name].first; + auto& last_tile_var = + global_write_state_->last_coord_tiles_[dim_name].second; + + if (!last_tile.empty()) { + auto& tiles = coord_tiles[dim_name]; + // Note making shallow clones here, as it's not necessary to copy the + // underlying tile Buffers. + tiles.push_back(last_tile.clone(false)); + if (!last_tile_var.empty()) + tiles.push_back(last_tile_var.clone(false)); + } + } + + RETURN_NOT_OK(compute_coords_metadata(coord_tiles, meta)); + // TODO: remove + RETURN_NOT_OK(zip_coord_tiles(coord_tiles, coords_tiles)); + RETURN_NOT_OK(filter_tiles(constants::coords, coords_tiles)); return Status::Ok(); } -bool Writer::has_coords() const { - return attr_buffers_.find(constants::coords) != attr_buffers_.end(); +bool Writer::all_last_tiles_empty() const { + // See if any last coordinate tiles are nonempty + auto dim_num = array_schema_->dim_num(); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + auto& last_tile = global_write_state_->last_coord_tiles_[dim_name].first; + if (!last_tile.empty()) + return false; + } + + // See if any last coordinate tiles are nonempty + for (const auto& it : attr_buffers_) { + const auto& attr = it.first; + auto& last_tile = global_write_state_->last_attr_tiles_[attr].first; + if (!last_tile.empty()) + return false; + } + + return true; } Status Writer::init_global_write_state() { @@ -1369,61 +1430,73 @@ Status Writer::init_global_write_state() { "Cannot initialize global write state; State not properly finalized")); global_write_state_.reset(new GlobalWriteState); - // Create fragments + bool has_coords = !coord_buffers_.empty(); + + // Create fragment RETURN_NOT_OK( - create_fragment(!has_coords(), &(global_write_state_->frag_meta_))); + create_fragment(!has_coords, &(global_write_state_->frag_meta_))); + auto uri = global_write_state_->frag_meta_->fragment_uri(); - Status st = Status::Ok(); - for (const auto& attr : attributes_) { + // Initialize global write state for coordinates + if (has_coords) { + auto dim_num = array_schema_->dim_num(); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + + // Initialize last tiles + auto last_tile_pair = std::pair>( + dim_name, std::pair(Tile(), Tile())); + auto it_ret = + global_write_state_->last_coord_tiles_.emplace(last_tile_pair); + + auto& last_tile = it_ret.first->second.first; + RETURN_NOT_OK_ELSE(init_coord_tile(d, &last_tile), clean_up(uri)); + + // Initialize cells written + global_write_state_->coord_cells_written_[dim_name] = 0; + } + } + + // Initialize global write state for attributes + for (const auto& it : attr_buffers_) { // Initialize last tiles + auto attr = it.first; auto last_tile_pair = std::pair>( attr, std::pair(Tile(), Tile())); - auto it_ret = global_write_state_->last_tiles_.emplace(last_tile_pair); + auto it_ret = global_write_state_->last_attr_tiles_.emplace(last_tile_pair); if (!array_schema_->var_size(attr)) { auto& last_tile = it_ret.first->second.first; - st = init_tile(attr, &last_tile); - if (!st.ok()) - break; + RETURN_NOT_OK_ELSE(init_tile(attr, &last_tile), clean_up(uri)); } else { auto& last_tile = it_ret.first->second.first; auto& last_tile_var = it_ret.first->second.second; - st = init_tile(attr, &last_tile, &last_tile_var); - if (!st.ok()) - break; + RETURN_NOT_OK_ELSE( + init_tile(attr, &last_tile, &last_tile_var), clean_up(uri)); } // Initialize cells written - global_write_state_->cells_written_[attr] = 0; - } - - // Handle error - if (!st.ok()) { - storage_manager_->vfs()->remove_dir( - global_write_state_->frag_meta_->fragment_uri()); - global_write_state_.reset(nullptr); + global_write_state_->attr_cells_written_[attr] = 0; } - return st; + return Status::Ok(); STATS_FUNC_OUT(writer_init_global_write_state); } Status Writer::init_tile(const std::string& attribute, Tile* tile) const { // For easy reference + auto has_coords = !coord_buffers_.empty(); auto domain = array_schema_->domain(); auto cell_size = array_schema_->cell_size(attribute); auto capacity = array_schema_->capacity(); auto type = array_schema_->type(attribute); - auto is_coords = (attribute == constants::coords); - auto dim_num = (is_coords) ? array_schema_->dim_num() : 0; - auto cell_num_per_tile = - (has_coords()) ? capacity : domain->cell_num_per_tile(); + auto cell_num_per_tile = has_coords ? capacity : domain->cell_num_per_tile(); auto tile_size = cell_num_per_tile * cell_size; // Initialize - RETURN_NOT_OK(tile->init( - constants::format_version, type, tile_size, cell_size, dim_num)); + RETURN_NOT_OK( + tile->init(constants::format_version, type, tile_size, cell_size, 0)); return Status::Ok(); } @@ -1431,11 +1504,11 @@ Status Writer::init_tile(const std::string& attribute, Tile* tile) const { Status Writer::init_tile( const std::string& attribute, Tile* tile, Tile* tile_var) const { // For easy reference + auto has_coords = !coord_buffers_.empty(); auto domain = array_schema_->domain(); auto capacity = array_schema_->capacity(); auto type = array_schema_->type(attribute); - auto cell_num_per_tile = - (has_coords()) ? capacity : domain->cell_num_per_tile(); + auto cell_num_per_tile = has_coords ? capacity : domain->cell_num_per_tile(); auto tile_size = cell_num_per_tile * constants::cell_var_offset_size; // Initialize @@ -1450,6 +1523,21 @@ Status Writer::init_tile( return Status::Ok(); } +Status Writer::init_coord_tile(unsigned dim_idx, Tile* tile) const { + // For easy reference + auto dim = array_schema_->dimension(dim_idx); + auto coord_size = dim->coord_size(); + auto capacity = array_schema_->capacity(); + auto type = dim->type(); + auto tile_size = capacity * coord_size; + + // Initialize + RETURN_NOT_OK( + tile->init(constants::format_version, type, tile_size, coord_size, 0)); + + return Status::Ok(); +} + template Status Writer::init_tile_dense_cell_range_iters( std::vector>* iters) const { @@ -1614,7 +1702,7 @@ Status Writer::ordered_write() { std::vector> dense_cell_range_its; RETURN_CANCEL_OR_ERROR_ELSE( init_tile_dense_cell_range_iters(&dense_cell_range_its), - storage_manager_->vfs()->remove_dir(uri)); + clean_up(uri)); auto tile_num = dense_cell_range_its.size(); if (tile_num == 0) // Nothing to write return Status::Ok(); @@ -1626,36 +1714,27 @@ Status Writer::ordered_write() { RETURN_CANCEL_OR_ERROR_ELSE( compute_write_cell_ranges( &dense_cell_range_its[i], &write_cell_ranges[i]), - storage_manager_->vfs()->remove_dir(uri)); + clean_up(uri)); dense_cell_range_its.clear(); // Set number of tiles in the fragment metadata frag_meta->set_num_tiles(tile_num); - // Prepare tiles for all attributes and filter - uint64_t num_attributes = attributes_.size(); - std::vector> attr_tiles(num_attributes); - auto statuses = parallel_for(0, num_attributes, [&](uint64_t i) { - const auto& attr = attributes_[i]; - std::vector& tiles = attr_tiles[i]; - RETURN_CANCEL_OR_ERROR(prepare_tiles(attr, write_cell_ranges, &tiles)); - RETURN_CANCEL_OR_ERROR(filter_tiles(attr, &tiles)); - return Status::Ok(); - }); - - // Check all statuses - for (auto& st : statuses) - RETURN_NOT_OK_ELSE(st, storage_manager_->vfs()->remove_dir(uri)); + // Prepare tiles and filter attribute tiles + std::unordered_map> attr_tiles; + RETURN_NOT_OK_ELSE( + prepare_and_filter_attr_tiles(write_cell_ranges, &attr_tiles), + clean_up(uri)); // Write tiles for all attributes + std::vector coords_tiles; // Will be ignored RETURN_NOT_OK_ELSE( - write_all_tiles(frag_meta.get(), attr_tiles), + write_all_tiles(frag_meta.get(), attr_tiles, coords_tiles), storage_manager_->vfs()->remove_dir(uri)); // Write the fragment metadata RETURN_CANCEL_OR_ERROR_ELSE( - frag_meta.get()->store(array_->get_encryption_key()), - storage_manager_->vfs()->remove_dir(uri)); + frag_meta->store(array_->get_encryption_key()), clean_up(uri)); // Add written fragment info add_written_fragment_info(frag_meta->fragment_uri()); @@ -1663,6 +1742,55 @@ Status Writer::ordered_write() { return Status::Ok(); } +Status Writer::prepare_and_filter_attr_tiles( + const std::vector& write_cell_ranges, + std::unordered_map>* attr_tiles) const { + // Initialize attribute tiles + for (const auto& it : attr_buffers_) + (*attr_tiles)[it.first] = std::vector(); + + uint64_t attr_num = attr_buffers_.size(); + auto statuses = parallel_for(0, attr_num, [&](uint64_t i) { + auto buff_it = attr_buffers_.begin(); + std::advance(buff_it, i); + const auto& attr = buff_it->first; + auto& tiles = (*attr_tiles)[attr]; + RETURN_CANCEL_OR_ERROR(prepare_tiles(attr, write_cell_ranges, &tiles)); + RETURN_CANCEL_OR_ERROR(filter_tiles(attr, &tiles)); + return Status::Ok(); + }); + + // Check all statuses + for (auto& st : statuses) + RETURN_NOT_OK(st); + + return Status::Ok(); +} + +Status Writer::prepare_full_attr_tiles( + const std::set& coord_dups, + std::unordered_map>* attr_tiles) const { + // Initialize attribute tiles + for (const auto& it : attr_buffers_) + (*attr_tiles)[it.first] = std::vector(); + + auto attr_num = attr_buffers_.size(); + auto statuses = parallel_for(0, attr_num, [&](uint64_t i) { + auto buff_it = attr_buffers_.begin(); + std::advance(buff_it, i); + const auto& attr = buff_it->first; + auto& full_tiles = (*attr_tiles)[attr]; + RETURN_CANCEL_OR_ERROR(prepare_full_tiles(attr, coord_dups, &full_tiles)); + return Status::Ok(); + }); + + // Check all statuses + for (auto& st : statuses) + RETURN_NOT_OK(st); + + return Status::Ok(); +} + Status Writer::prepare_full_tiles( const std::string& attribute, const std::set& coord_dups, @@ -1679,6 +1807,7 @@ Status Writer::prepare_full_tiles_fixed( STATS_FUNC_IN(writer_prepare_full_tiles_fixed); // For easy reference + auto has_coords = !coord_buffers_.empty(); auto it = attr_buffers_.find(attribute); auto buffer = (unsigned char*)it->second.buffer_; auto buffer_size = it->second.buffer_size_; @@ -1686,15 +1815,14 @@ Status Writer::prepare_full_tiles_fixed( auto cell_size = array_schema_->cell_size(attribute); auto cell_num = *buffer_size / cell_size; auto domain = array_schema_->domain(); - auto cell_num_per_tile = - (has_coords()) ? capacity : domain->cell_num_per_tile(); + auto cell_num_per_tile = has_coords ? capacity : domain->cell_num_per_tile(); // Do nothing if there are no cells to write if (cell_num == 0) return Status::Ok(); // First fill the last tile - auto& last_tile = global_write_state_->last_tiles_[attribute].first; + auto& last_tile = global_write_state_->last_attr_tiles_[attribute].first; uint64_t cell_idx = 0; if (!last_tile.empty()) { if (coord_dups.empty()) { @@ -1771,7 +1899,7 @@ Status Writer::prepare_full_tiles_fixed( } } - global_write_state_->cells_written_[attribute] += cell_num; + global_write_state_->attr_cells_written_[attribute] += cell_num; return Status::Ok(); @@ -1785,6 +1913,7 @@ Status Writer::prepare_full_tiles_var( STATS_FUNC_IN(writer_prepare_full_tiles_var); // For easy reference + auto has_coords = !coord_buffers_.empty(); auto it = attr_buffers_.find(attribute); auto buffer = (uint64_t*)it->second.buffer_; auto buffer_var = (unsigned char*)it->second.buffer_var_; @@ -1793,8 +1922,7 @@ Status Writer::prepare_full_tiles_var( auto capacity = array_schema_->capacity(); auto cell_num = *buffer_size / constants::cell_var_offset_size; auto domain = array_schema_->domain(); - auto cell_num_per_tile = - (has_coords()) ? capacity : domain->cell_num_per_tile(); + auto cell_num_per_tile = has_coords ? capacity : domain->cell_num_per_tile(); uint64_t offset, var_size; // Do nothing if there are no cells to write @@ -1802,7 +1930,7 @@ Status Writer::prepare_full_tiles_var( return Status::Ok(); // First fill the last tile - auto& last_tile_pair = global_write_state_->last_tiles_[attribute]; + auto& last_tile_pair = global_write_state_->last_attr_tiles_[attribute]; auto& last_tile = last_tile_pair.first; auto& last_tile_var = last_tile_pair.second; uint64_t cell_idx = 0; @@ -1937,13 +2065,143 @@ Status Writer::prepare_full_tiles_var( } } - global_write_state_->cells_written_[attribute] += cell_num; + global_write_state_->attr_cells_written_[attribute] += cell_num; return Status::Ok(); STATS_FUNC_OUT(writer_prepare_full_tiles_var); } +Status Writer::prepare_full_coord_tiles( + const std::set& coord_dups, + std::unordered_map>* tiles) const { + // If there are no coordinates, exit + if (coord_buffers_.empty()) + return Status::Ok(); + + // Prepare tiles map + auto dim_num = array_schema_->dim_num(); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + (*tiles)[dim_name] = std::vector(); + } + + // Prepare full coordinate tiles + auto statuses = parallel_for(0, dim_num, [&](uint64_t d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + auto& coord_tiles = (*tiles)[dim_name]; + return prepare_full_coord_tiles_fixed(d, coord_dups, &coord_tiles); + }); + + // Check all statuses + for (auto& st : statuses) { + if (!st.ok()) + return st; + } + + return Status::Ok(); +} + +Status Writer::prepare_full_coord_tiles_fixed( + unsigned dim_idx, + const std::set& coord_dups, + std::vector* tiles) const { + // For easy reference + auto dim = array_schema_->dimension(dim_idx); + const auto& dim_name = dim->name(); + auto it = coord_buffers_.find(dim_name); + assert(it != coord_buffers_.end()); + auto buffer = (unsigned char*)(it->second.buffer_); + auto capacity = array_schema_->capacity(); + auto coord_size = dim->coord_size(); + + // Do nothing if there are no cells to write + if (coords_num_ == 0) + return Status::Ok(); + + // First fill the last tile + auto& last_tile = global_write_state_->last_coord_tiles_[dim_name].first; + uint64_t cell_idx = 0; + if (!last_tile.empty()) { + if (coord_dups.empty()) { + do { + RETURN_NOT_OK( + last_tile.write(buffer + cell_idx * coord_size, coord_size)); + ++cell_idx; + } while (!last_tile.full() && cell_idx != coords_num_); + } else { + do { + if (coord_dups.find(cell_idx) == coord_dups.end()) + RETURN_NOT_OK( + last_tile.write(buffer + cell_idx * coord_size, coord_size)); + ++cell_idx; + } while (!last_tile.full() && cell_idx != coords_num_); + } + } + + // Initialize full tiles and set previous last tile as first tile + auto full_tile_num = + (coords_num_ - cell_idx) / capacity + (int)last_tile.full(); + auto cell_num_to_write = (full_tile_num - last_tile.full()) * capacity; + + if (full_tile_num > 0) { + tiles->resize(full_tile_num); + for (auto& tile : (*tiles)) + RETURN_NOT_OK(init_coord_tile(dim_idx, &tile)); + + // Handle last tile (it must be either full or empty) + if (last_tile.full()) { + (*tiles)[0] = last_tile; + last_tile.reset(); + } else { + assert(last_tile.empty()); + } + + // Write all remaining cells one by one + if (coord_dups.empty()) { + for (uint64_t tile_idx = 0, i = 0; i < cell_num_to_write;) { + if ((*tiles)[tile_idx].full()) + ++tile_idx; + + RETURN_NOT_OK((*tiles)[tile_idx].write( + buffer + cell_idx * coord_size, coord_size * capacity)); + cell_idx += capacity; + i += capacity; + } + } else { + for (uint64_t tile_idx = 0, i = 0; i < cell_num_to_write; + ++cell_idx, ++i) { + if (coord_dups.find(cell_idx) == coord_dups.end()) { + if ((*tiles)[tile_idx].full()) + ++tile_idx; + + RETURN_NOT_OK((*tiles)[tile_idx].write( + buffer + cell_idx * coord_size, coord_size)); + } + } + } + } + + // Potentially fill the last tile + assert(coords_num_ - cell_idx < capacity - last_tile.cell_num()); + if (coord_dups.empty()) { + for (; cell_idx < coords_num_; ++cell_idx) { + RETURN_NOT_OK( + last_tile.write(buffer + cell_idx * coord_size, coord_size)); + } + } else { + for (; cell_idx < coords_num_; ++cell_idx) { + if (coord_dups.find(cell_idx) == coord_dups.end()) + RETURN_NOT_OK( + last_tile.write(buffer + cell_idx * coord_size, coord_size)); + } + } + + global_write_state_->coord_cells_written_[dim_name] += coords_num_; + + return Status::Ok(); +} + Status Writer::prepare_tiles( const std::string& attribute, const std::vector& write_cell_ranges, @@ -2080,6 +2338,115 @@ Status Writer::prepare_tiles_fixed( STATS_FUNC_OUT(writer_prepare_tiles_fixed); } +Status Writer::prepare_attr_tiles( + const std::vector& cell_pos, + const std::set& coord_dups, + std::unordered_map>* attr_tiles) const { + // Initialize attribute tiles + attr_tiles->clear(); + for (const auto& it : attr_buffers_) + (*attr_tiles)[it.first] = std::vector(); + + // Prepare tiles for all attributes + auto attr_num = attr_buffers_.size(); + auto statuses = parallel_for(0, attr_num, [&](uint64_t i) { + auto buff_it = attr_buffers_.begin(); + std::advance(buff_it, i); + auto attr = buff_it->first; + auto& tiles = (*attr_tiles)[attr]; + RETURN_CANCEL_OR_ERROR(prepare_tiles(attr, cell_pos, coord_dups, &tiles)); + return Status::Ok(); + }); + + // Check all statuses + for (auto& st : statuses) + RETURN_NOT_OK(st); + + return Status::Ok(); +} + +Status Writer::prepare_coord_tiles( + const std::vector& cell_pos, + const std::set& coord_dups, + std::unordered_map>* tiles) const { + // If coord buffers are empty, there is nothing to do + if (coord_buffers_.empty()) + return Status::Ok(); + + // Prepare coordinate tiles map + auto dim_num = array_schema_->dim_num(); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + (*tiles)[dim_name] = std::vector(); + } + + // Prepare coordinate tiles in parallel + auto statuses = parallel_for(0, dim_num, [&](uint64_t d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + auto& coord_tiles = (*tiles)[dim_name]; + return prepare_coord_tiles_fixed(d, cell_pos, coord_dups, &coord_tiles); + }); + + // Check all statuses + for (auto& st : statuses) { + if (!st.ok()) + return st; + } + + return Status::Ok(); +} + +Status Writer::prepare_coord_tiles_fixed( + unsigned dim_idx, + const std::vector& cell_pos, + const std::set& coord_dups, + std::vector* tiles) const { + STATS_FUNC_IN(writer_prepare_tiles_fixed); + + // Trivial case + if (cell_pos.empty()) + return Status::Ok(); + + // For easy reference + const auto& dim_name = array_schema_->dimension(dim_idx)->name(); + auto buffer = (unsigned char*)coord_buffers_.find(dim_name)->second.buffer_; + auto capacity = array_schema_->capacity(); + auto dups_num = coord_dups.size(); + auto tile_num = utils::math::ceil(coords_num_ - dups_num, capacity); + auto coord_size = array_schema_->dimension(dim_idx)->coord_size(); + + // Initialize tiles + tiles->resize(tile_num); + for (auto& tile : (*tiles)) + RETURN_NOT_OK(init_coord_tile(dim_idx, &tile)); + + // Write all cells one by one + if (dups_num == 0) { + for (uint64_t i = 0, tile_idx = 0; i < coords_num_; ++i) { + if ((*tiles)[tile_idx].full()) + ++tile_idx; + + RETURN_NOT_OK((*tiles)[tile_idx].write( + buffer + cell_pos[i] * coord_size, coord_size)); + } + } else { + for (uint64_t i = 0, tile_idx = 0; i < coords_num_; ++i) { + if (coord_dups.find(cell_pos[i]) != coord_dups.end()) + continue; + + if ((*tiles)[tile_idx].full()) + ++tile_idx; + + RETURN_NOT_OK((*tiles)[tile_idx].write( + buffer + cell_pos[i] * coord_size, coord_size)); + } + } + + return Status::Ok(); + + STATS_FUNC_OUT(writer_prepare_tiles_fixed); +} + Status Writer::prepare_tiles_var( const std::string& attribute, const std::vector& cell_pos, @@ -2154,91 +2521,87 @@ void Writer::reset() { initialized_ = false; } -template Status Writer::sort_coords(std::vector* cell_pos) const { STATS_FUNC_IN(writer_sort_coords); // For easy reference auto domain = array_schema_->domain(); - uint64_t coords_size = array_schema_->coords_size(); - auto it = attr_buffers_.find(constants::coords); - auto buffer = (T*)it->second.buffer_; - auto buffer_size = it->second.buffer_size_; - uint64_t coords_num = *buffer_size / coords_size; + + // Prepare auxiliary vector for better performance + auto dim_num = array_schema_->dim_num(); + std::vector buffs(dim_num); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + buffs[d] = (const void*)coord_buffers_.find(dim_name)->second.buffer_; + } // Populate cell_pos - cell_pos->resize(coords_num); - for (uint64_t i = 0; i < coords_num; ++i) + cell_pos->resize(coords_num_); + for (uint64_t i = 0; i < coords_num_; ++i) (*cell_pos)[i] = i; // Sort the coordinates in global order - parallel_sort( - cell_pos->begin(), cell_pos->end(), GlobalCmp(domain, buffer)); + parallel_sort(cell_pos->begin(), cell_pos->end(), GlobalCmp2(domain, buffs)); return Status::Ok(); STATS_FUNC_OUT(writer_sort_coords); } -Status Writer::unordered_write() { - STATS_FUNC_IN(writer_unordered_write); +Status Writer::split_coords_buffer() { + // Do nothing if the coordinates buffer is not set + if (coords_buffer_ == nullptr) + return Status::Ok(); - // Applicable only to unordered write on dense/sparse arrays - assert(layout_ == Layout::UNORDERED); + // For easy reference + auto coords_size = array_schema_->coords_size(); + coords_num_ = *coords_buffer_size_ / coords_size; + auto dim_num = array_schema_->dim_num(); - auto coords_type = array_schema_->coords_type(); - switch (coords_type) { - case Datatype::INT8: - return unordered_write(); - case Datatype::UINT8: - return unordered_write(); - case Datatype::INT16: - return unordered_write(); - case Datatype::UINT16: - return unordered_write(); - case Datatype::INT32: - return unordered_write(); - case Datatype::UINT32: - return unordered_write(); - case Datatype::INT64: - return unordered_write(); - case Datatype::UINT64: - return unordered_write(); - case Datatype::FLOAT32: - assert(!array_schema_->dense()); - return unordered_write(); - case Datatype::FLOAT64: - assert(!array_schema_->dense()); - return unordered_write(); - case Datatype::DATETIME_YEAR: - case Datatype::DATETIME_MONTH: - case Datatype::DATETIME_WEEK: - case Datatype::DATETIME_DAY: - case Datatype::DATETIME_HR: - case Datatype::DATETIME_MIN: - case Datatype::DATETIME_SEC: - case Datatype::DATETIME_MS: - case Datatype::DATETIME_US: - case Datatype::DATETIME_NS: - case Datatype::DATETIME_PS: - case Datatype::DATETIME_FS: - case Datatype::DATETIME_AS: - return unordered_write(); - default: - return LOG_STATUS(Status::WriterError( - "Cannot write in unordered layout; Unsupported domain type")); + clear_coord_buffers(); + + coord_buffers_alloced_ = true; + coord_sizes_.resize(dim_num); + + // New coord buffer allocations + for (unsigned d = 0; d < dim_num; ++d) { + auto dim = array_schema_->dimension(d); + const auto& dim_name = dim->name(); + auto coord_buffer_size = coords_num_ * dim->coord_size(); + auto it = coord_buffer_sizes_.emplace(dim_name, coord_buffer_size); + CoordBuffer buff; + buff.buffer_size_ = &(it.first->second); + buff.buffer_ = std::malloc(coord_buffer_size); + if (buff.buffer_ == nullptr) + RETURN_NOT_OK(Status::WriterError( + "Cannot split coordinate buffers; memory allocation failed")); + coord_buffers_.emplace(dim_name, buff); + coord_sizes_[d] = array_schema_->dimension(d)->coord_size(); } - return Status::Ok(); + // Split coordinates + auto coord = (unsigned char*)nullptr; + for (unsigned d = 0; d < dim_num; ++d) { + auto coord_size = array_schema_->dimension(d)->coord_size(); + const auto& dim_name = array_schema_->dimension(d)->name(); + auto buff = (unsigned char*)(coord_buffers_[dim_name].buffer_); + for (uint64_t c = 0; c < coords_num_; ++c) { + coord = + &(((unsigned char*)coords_buffer_)[c * coords_size + d * coord_size]); + std::memcpy(&(buff[c * coord_size]), coord, coord_size); + } + } - STATS_FUNC_OUT(writer_unordered_write); + return Status::Ok(); } -template Status Writer::unordered_write() { + // Applicable only to unordered write on dense/sparse arrays + assert(layout_ == Layout::UNORDERED); + // Sort coordinates first std::vector cell_pos; - RETURN_CANCEL_OR_ERROR(sort_coords(&cell_pos)); + RETURN_CANCEL_OR_ERROR(sort_coords(&cell_pos)); // Check for coordinate duplicates if (check_coord_dups_ && !dedup_coords_) @@ -2252,57 +2615,73 @@ Status Writer::unordered_write() { // Create new fragment std::shared_ptr frag_meta; RETURN_CANCEL_OR_ERROR(create_fragment(false, &frag_meta)); - auto uri = frag_meta->fragment_uri(); + const auto& uri = frag_meta->fragment_uri(); + + // Prepare tiles + std::unordered_map> coord_tiles; + std::unordered_map> attr_tiles; + auto statuses = parallel_for(0, 2, [&](uint64_t i) { + if (i == 0) { + // Prepare coordinate tiles + RETURN_CANCEL_OR_ERROR_ELSE( + prepare_coord_tiles(cell_pos, coord_dups, &coord_tiles), + clean_up(uri)); + } else { + // Prepare attribute tiles + RETURN_CANCEL_OR_ERROR_ELSE( + prepare_attr_tiles(cell_pos, coord_dups, &attr_tiles), clean_up(uri)); + } - // Prepare tiles for all attributes - auto num_attributes = attributes_.size(); - std::vector> attribute_tiles(num_attributes); - auto statuses = parallel_for(0, num_attributes, [&](uint64_t i) { - const auto& attr = attributes_[i]; - auto& tiles = attribute_tiles[i]; - RETURN_CANCEL_OR_ERROR(prepare_tiles(attr, cell_pos, coord_dups, &tiles)); return Status::Ok(); }); + // Check all statuses + for (auto& st : statuses) + RETURN_NOT_OK(st); + // Clear the boolean vector for coordinate duplicates coord_dups.clear(); - // Check all statuses - for (auto& st : statuses) - RETURN_NOT_OK_ELSE(st, storage_manager_->vfs()->remove_dir(uri)); - statuses.clear(); + // No tiles + if (coord_tiles.empty() || coord_tiles.begin()->second.empty()) + return Status::Ok(); // Set the number of tiles in the metadata - uint64_t num_tiles = array_schema_->var_size(attributes_[0]) ? - attribute_tiles[0].size() / 2 : - attribute_tiles[0].size(); - frag_meta->set_num_tiles(num_tiles); + frag_meta->set_num_tiles(coord_tiles.begin()->second.size()); + + // Filter tiles + std::vector coords_tiles; + statuses = parallel_for(0, 2, [&](uint64_t i) { + if (i == 0) { + // Filter coordinate tiles + RETURN_CANCEL_OR_ERROR_ELSE( + compute_coords_metadata(coord_tiles, frag_meta.get()), clean_up(uri)); + // TODO: remove zipping and filter coordinate tiles separately + RETURN_NOT_OK_ELSE( + zip_coord_tiles(coord_tiles, &coords_tiles), clean_up(uri)); + RETURN_CANCEL_OR_ERROR_ELSE( + filter_tiles(constants::coords, &coords_tiles), clean_up(uri)); + } else { + // Filter attribute tiles + RETURN_CANCEL_OR_ERROR_ELSE( + filter_attr_tiles(&attr_tiles), clean_up(uri)); + } - // Filter all tiles - statuses = parallel_for(0, num_attributes, [&](uint64_t i) { - const auto& attr = attributes_[i]; - auto& tiles = attribute_tiles[i]; - if (attr == constants::coords) - RETURN_CANCEL_OR_ERROR( - compute_coords_metadata(tiles, frag_meta.get())); - RETURN_CANCEL_OR_ERROR(filter_tiles(attr, &tiles)); return Status::Ok(); }); // Check all statuses for (auto& st : statuses) - RETURN_NOT_OK_ELSE(st, storage_manager_->vfs()->remove_dir(uri)); - statuses.clear(); + RETURN_NOT_OK(st); - // Write tiles for all attributes + // Write tiles for all attributes and coordinates RETURN_NOT_OK_ELSE( - write_all_tiles(frag_meta.get(), attribute_tiles), - storage_manager_->vfs()->remove_dir(uri)); + write_all_tiles(frag_meta.get(), attr_tiles, coords_tiles), + clean_up(uri)); // Write the fragment metadata RETURN_CANCEL_OR_ERROR_ELSE( - frag_meta.get()->store(array_->get_encryption_key()), - storage_manager_->vfs()->remove_dir(uri)); + frag_meta->store(array_->get_encryption_key()), clean_up(uri)); // Add written fragment info add_written_fragment_info(frag_meta->fragment_uri()); @@ -2377,15 +2756,18 @@ Status Writer::write_cell_range_to_tile_var( Status Writer::write_all_tiles( FragmentMetadata* frag_meta, - const std::vector>& attribute_tiles) const { + const std::unordered_map>& attr_tiles, + const std::vector& coords_tiles) const { STATS_FUNC_IN(writer_write_all_tiles); + assert(!attr_tiles.empty() || !coords_tiles.empty()); + std::vector> tasks; - auto num_attributes = attributes_.size(); - for (uint64_t i = 0; i < num_attributes; i++) { - const auto& attr = attributes_[i]; - auto& tiles = attribute_tiles[i]; + // Attribute tiles + for (const auto& it : attr_buffers_) { + const auto& attr = it.first; + auto& tiles = attr_tiles.find(attr)->second; tasks.push_back( storage_manager_->writer_thread_pool()->enqueue([&, this]() { RETURN_CANCEL_OR_ERROR(write_tiles(attr, frag_meta, tiles)); @@ -2393,6 +2775,16 @@ Status Writer::write_all_tiles( })); } + // Coordinate tiles + if (!coord_buffers_.empty()) { + tasks.push_back( + storage_manager_->writer_thread_pool()->enqueue([&, this]() { + RETURN_CANCEL_OR_ERROR( + write_tiles(constants::coords, frag_meta, coords_tiles)); + return Status::Ok(); + })); + } + // Wait for writes and check all statuses auto statuses = storage_manager_->writer_thread_pool()->wait_all_status(tasks); @@ -2451,5 +2843,61 @@ Status Writer::write_tiles( return Status::Ok(); } +// TODO: remove +Status Writer::zip_coord_tiles( + const std::unordered_map>& coord_tiles, + std::vector* coords_tiles) const { + if (coord_tiles.empty() || coord_tiles.begin()->second.empty()) + return Status::Ok(); + + auto tile_num = coord_tiles.begin()->second.size(); + coords_tiles->clear(); + coords_tiles->resize(tile_num); + auto type = array_schema()->domain()->type(); + unsigned dim_num = array_schema()->dim_num(); + uint64_t coords_size = array_schema()->coords_size(); + for (size_t t = 0; t < tile_num; ++t) { + auto& new_tile = (*coords_tiles)[t]; + auto cell_num = coord_tiles.begin()->second[t].cell_num(); + RETURN_NOT_OK(new_tile.init( + constants::format_version, + type, + cell_num * coords_size, + coords_size, + dim_num)); + for (unsigned d = 0; d < dim_num; ++d) { + const auto& dim_name = array_schema_->dimension(d)->name(); + const auto& coord_tile = coord_tiles.find(dim_name)->second[t]; + new_tile.write(coord_tile.data(), coord_tile.size()); + } + } + + return Status::Ok(); +} + +std::string Writer::coords_to_str(uint64_t i) const { + std::string ret; + auto dim_num = array_schema_->dim_num(); + + ret = "("; + for (unsigned d = 0; d < dim_num; ++d) { + auto dim = array_schema_->dimension(d); + const auto& dim_name = dim->name(); + auto buff = (unsigned char*)coord_buffers_.find(dim_name)->second.buffer_; + auto coord = buff + i * coord_sizes_[d]; + ret += dim->coord_to_str(coord); + if (d < dim_num - 1) + ret += ", "; + } + ret += ")"; + + return ret; +} + +void Writer::clean_up(const URI& uri) { + storage_manager_->vfs()->remove_dir(uri); + global_write_state_.reset(nullptr); +} + } // namespace sm } // namespace tiledb diff --git a/tiledb/sm/query/writer.h b/tiledb/sm/query/writer.h index 8ce6f77f70ec..711e3ffd9567 100644 --- a/tiledb/sm/query/writer.h +++ b/tiledb/sm/query/writer.h @@ -69,13 +69,27 @@ class Writer { * var-sized attributes, the first tile is the offsets tile, whereas * the second tile is the values tile. */ - std::unordered_map> last_tiles_; + std::unordered_map> last_attr_tiles_; /** * Stores the number of cells written for each attribute across the * write operations. */ - std::unordered_map cells_written_; + std::unordered_map attr_cells_written_; + + /** + * Stores the last coordinate tile of each dimension for each write + * operation. For fixed-sized dimensions, the second tile is ignored. For + * var-sized dimensions, the first tile is the offsets tile, whereas + * the second tile is the values tile. + */ + std::unordered_map> last_coord_tiles_; + + /** + * Stores the number of cells written for each dimension across the + * write operations. + */ + std::unordered_map coord_cells_written_; /** The fragment metadata. */ std::shared_ptr frag_meta_; @@ -115,42 +129,34 @@ class Writer { /* API */ /* ********************************* */ + /** Returns the attributes the user has set buffers for. */ + std::vector attributes() const; + /** Returns the array schema. */ const ArraySchema* array_schema() const; - /** - * Return list of attribtues for query - * @return vector of attributes for query - */ - std::vector attributes() const; - - /** - * Fetch AttributeBuffer for attribute - * @param attribute to fetch - * @return AttributeBuffer for attribute - */ - AttributeBuffer buffer(const std::string& attribute) const; + /** Returns the query buffer with the given name. */ + AttributeBuffer buffer(const std::string& name) const; /** Finalizes the reader. */ Status finalize(); /** - * Retrieves the buffer of a fixed-sized attribute. + * Retrieves the buffer of a fixed-sized attribute/dimension. * - * @param attribute The buffer attribute. + * @param name The buffer name. * @param buffer The buffer to be retrieved. * @param buffer_size A pointer to the buffer size to be retrieved. * @return Status */ Status get_buffer( - const std::string& attribute, - void** buffer, - uint64_t** buffer_size) const; + const std::string& name, void** buffer, uint64_t** buffer_size) const; /** - * Retrieves the offsets and values buffers of a var-sized attribute. + * Retrieves the offsets and values buffers of a var-sized + * attribute/dimension. * - * @param attribute The buffer attribute. + * @param name The buffer attribute/dimension. * @param buffer_off The offsets buffer to be retrieved. * @param buffer_off_size A pointer to the offsets buffer size to be * retrieved. @@ -159,7 +165,7 @@ class Writer { * @return Status */ Status get_buffer( - const std::string& attribute, + const std::string& name, uint64_t** buffer_off, uint64_t** buffer_off_size, void** buffer_val, @@ -190,20 +196,20 @@ class Writer { void set_array_schema(const ArraySchema* array_schema); /** - * Sets the buffer for a fixed-sized attribute. + * Sets the buffer for a fixed-sized attribute/dimension. * - * @param attribute The attribute to set the buffer for. + * @param name The attribute/dimension to set the buffer for. * @param buffer The buffer that has the input data to be written. * @param buffer_size The size of `buffer` in bytes. * @return Status */ Status set_buffer( - const std::string& attribute, void* buffer, uint64_t* buffer_size); + const std::string& name, void* buffer, uint64_t* buffer_size); /** - * Sets the buffer for a var-sized attribute. + * Sets the buffer for a var-sized attribute/dimension. * - * @param attribute The attribute to set the buffer for. + * @param name The attribute/dimension to set the buffer for. * @param buffer_off The buffer that has the input data to be written, * This buffer holds the starting offsets of each cell value in * `buffer_val`. @@ -214,7 +220,7 @@ class Writer { * @return Status */ Status set_buffer( - const std::string& attribute, + const std::string& name, uint64_t* buffer_off, uint64_t* buffer_off_size, void* buffer_val, @@ -282,12 +288,40 @@ class Writer { /** The array schema. */ const ArraySchema* array_schema_; - /** The names of the attributes involved in the query. */ - std::vector attributes_; - /** Maps attribute names to their buffers. */ std::unordered_map attr_buffers_; + /** Maps dimension names to their coordinate buffers. */ + std::unordered_map coord_buffers_; + + /** The coordinates buffer potentially set by the user. */ + void* coords_buffer_; + + /** The coordinates buffer size potentially set by the user. */ + uint64_t* coords_buffer_size_; + + /** + * The coordinate sizes, one per dimension, in the order the dimensions + * are defined in the array schema. + */ + std::vector coord_sizes_; + + /** Number of coordinates provided by the user. */ + uint64_t coords_num_; + + /** + * If `true` it means that TileDB alloc'ed these buffers, not the user, and + * TileDB will free this buffers in the writer destructor. + */ + bool coord_buffers_alloced_; + + /** + * The sizes of the coordinate buffers in a map (dimension -> size). + * Needed separate storage since CoordBuffer stores a pointer to the buffer + * sizes. + */ + std::unordered_map coord_buffer_sizes_; + /** * Meaningful only when `dedup_coords_` is `false`. * If `true`, a check for duplicate coordinates will be performed upon @@ -372,16 +406,6 @@ class Writer { */ Status check_coord_oob() const; - /** - * Throws an error if there are coordinates falling out-of-bounds, i.e., - * outside the array domain. - * - * @tparam T The coordinates type. - * @return Status - */ - template - Status check_coord_oob() const; - /** * Throws an error if there are coordinate duplicates. This function * assumes that the coordinates are written in the global layout, @@ -399,16 +423,6 @@ class Writer { */ Status check_global_order() const; - /** - * Throws an error if there are coordinates that do not obey the - * global order. - * - * @tparam T The domain type. - * @return Status - */ - template - Status check_global_order() const; - /** Correctness checks for `subarray_`. */ Status check_subarray() const; @@ -416,6 +430,12 @@ class Writer { template Status check_subarray() const; + /** + * Cleans up the coordinate buffers. Applicable only if the coordinate + * buffers were allocated by TileDB (not the user) + */ + void clear_coord_buffers(); + /** Closes all attribute files, flushing their state to storage. */ Status close_files(FragmentMetadata* meta) const; @@ -452,17 +472,31 @@ class Writer { */ Status compute_coord_dups(std::set* coord_dups) const; + /** + * Computes the coordinates metadata (e.g., MBRs). + * + * @param tiles The tiles to calculate the coords metadata from. It is + * a vector of vectors, one vector of tiles per dimension. + * @param meta The fragment metadata that will store the coords metadata. + * @return Status + */ + Status compute_coords_metadata( + const std::unordered_map>& tiles, + FragmentMetadata* meta) const; + /** * Computes the coordinates metadata (e.g., MBRs). * * @tparam T The domain type. - * @param tiles The tiles to calculate the coords metadata from. + * @param tiles The tiles to calculate the coords metadata from. It is + * a vector of vectors, one vector of tiles per dimension. * @param meta The fragment metadata that will store the coords metadata. * @return Status */ template Status compute_coords_metadata( - const std::vector& tiles, FragmentMetadata* meta) const; + const std::unordered_map>& tiles, + FragmentMetadata* meta) const; /** * Computes the cell ranges to be written, derived from a @@ -488,6 +522,25 @@ class Writer { Status create_fragment( bool dense, std::shared_ptr* frag_meta) const; + /** + * Runs the input tiles for all attributes through the filter pipeline. + * The tile buffers are modified to contain the output of the pipeline. + * + * @return Status + */ + Status filter_attr_tiles( + std::unordered_map>* attr_tiles) const; + + /** + * Applicable only to global writes. Filters the last coordinate tiles + * and zips them into the input vector. + */ + Status filter_last_coord_tiles(std::vector* coords_tiles) const; + + /** Applicable only to global writes. Filters the last attribute tiles. */ + Status filter_last_attr_tiles( + std::unordered_map>* attr_tiles) const; + /** * Runs the input tiles for the input attribute through the filter pipeline. * The tile buffers are modified to contain the output of the pipeline. @@ -515,44 +568,21 @@ class Writer { /** Finalizes the global write state. */ Status finalize_global_write_state(); - /** - * Finalizes the global write state. - * - * @tparam T The domain type. - * @return Status - */ - template - Status finalize_global_write_state(); - /** * Writes in the global layout. Applicable to both dense and sparse * arrays. */ Status global_write(); - /** - * Writes in the global layout. Applicable to both dense and sparse - * arrays. - * - * @tparam T The domain type. - */ - template - Status global_write(); - /** * Applicable only to global writes. Writes the last tiles for each * attribute remaining in the state, and records the metadata for - * the coordinates attribute (if present). + * the coordinates (if present). * - * @tparam T The domain type. * @return Status */ - template Status global_write_handle_last_tile(); - /** Returns `true` if the coordinates are included in the attributes. */ - bool has_coords() const; - /** Initializes the global write state. */ Status init_global_write_state(); @@ -576,6 +606,15 @@ class Writer { Status init_tile( const std::string& attribute, Tile* tile, Tile* tile_var) const; + /** + * Initializes a fixed-sized coordinate tile. + * + * @param dim_idx The index of the dimension the tile belongs to. + * @param tile The tile to be initialized. + * @return Status + */ + Status init_coord_tile(unsigned dim_idx, Tile* tile) const; + /** * Initializes dense cell range iterators for the subarray to be writte, * one per overlapping tile. @@ -649,7 +688,25 @@ class Writer { /** * Applicable only to write in global order. It prepares only full * tiles, storing the last potentially non-full tile in - * `global_write_state->last_tiles_` as part of the state to be used in + * `global_write_state->last_attr_tiles_` as part of the state to be used in + * the next write invocation. The last tiles are written to storage + * upon `finalize`. Upon each invocation, the function first + * populates the partially full last tile from the previous + * invocation. + * + * @param attribute The attribute to prepare the tiles for. + * @param coord_dups The positions of the duplicate coordinates. + * @param attr_tiles The **full** tiles to be created. + * @return Status + */ + Status prepare_full_attr_tiles( + const std::set& coord_dups, + std::unordered_map>* attr_tiles) const; + + /** + * Applicable only to write in global order. It prepares only full + * tiles, storing the last potentially non-full tile in + * `global_write_state->last_attr_tiles_` as part of the state to be used in * the next write invocation. The last tiles are written to storage * upon `finalize`. Upon each invocation, the function first * populates the partially full last tile from the previous @@ -668,7 +725,7 @@ class Writer { /** * Applicable only to write in global order. It prepares only full * tiles, storing the last potentially non-full tile in - * `global_write_state_->last_tiles_` as part of the state to be used in + * `global_write_state_->last_attr_tiles_` as part of the state to be used in * the next write invocation. The last tiles are written to storage * upon `finalize`. Upon each invocation, the function first * populates the partially full last tile from the previous @@ -687,7 +744,7 @@ class Writer { /** * Applicable only to write in global order. It prepares only full * tiles, storing the last potentially non-full tile in - * `global_write_state_->last_tiles_` as part of the state to be used in + * `global_write_state_->last_attr_tiles_` as part of the state to be used in * the next write invocation. The last tiles are written to storage * upon `finalize`. Upon each invocation, the function first * populates the partially full last tile from the previous @@ -703,6 +760,54 @@ class Writer { const std::set& coord_dups, std::vector* tiles) const; + /** + * Applicable only to write in global order. It prepares only full + * coordinate tiles for each dimension, storing the last potentially + * non-full tiles in `global_write_state->last_coord_tiles_` as part of the + * state to be used in the next write invocation. The last tiles are written + * to storage upon `finalize`. Upon each invocation, the function first + * populates the partially full last tile from the previous + * invocation. + * + * @param coord_dups The positions of the duplicate coordinates. + * @param tiles The **full** tiles to be created, one vector per dimension. + * @return Status + */ + Status prepare_full_coord_tiles( + const std::set& coord_dups, + std::unordered_map>* tiles) const; + + /** + * Applicable only to write in global order. It prepares only full + * coordinate tiles for each dimension, storing the last potentially + * non-full tiles in `global_write_state->last_coord_tiles_` as part of the + * state to be used in the next write invocation. The last tiles are written + * to storage upon `finalize`. Upon each invocation, the function first + * populates the partially full last tile from the previous + * invocation. + * + * @param dim_idx The index of the dimension to prepare the full tiles for. + * @param coord_dups The positions of the duplicate coordinates. + * @param tiles The **full** tiles to be created. + * @return Status + */ + Status prepare_full_coord_tiles_fixed( + unsigned dim_idx, + const std::set& coord_dups, + std::vector* tiles) const; + + /** + * It prepares and filters attribute the tiles, copying from the user + * buffers into the tiles the values based on the input write cell ranges. + * + * @param write_cell_ranges The write cell ranges. + * @param tiles The tiles to be created. + * @return Status + */ + Status prepare_and_filter_attr_tiles( + const std::vector& write_cell_ranges, + std::unordered_map>* attr_tiles) const; + /** * It prepares the tiles, copying from the user buffers into the tiles * the values based on the input write cell ranges, focusing on the @@ -774,6 +879,58 @@ class Writer { const std::set& coord_dups, std::vector* tiles) const; + /** + * It prepares the attribute tiles, re-organizing the cells from the user + * buffers based on the input sorted positions. + * + * @param cell_pos The positions that resulted from sorting and + * according to which the cells must be re-arranged. + * @param coord_dups The set with the positions + * of duplicate coordinates/cells. + * @param attr_tiles The tiles to be created, one vector per attribute + * @return Status + */ + Status prepare_attr_tiles( + const std::vector& cell_pos, + const std::set& coord_dups, + std::unordered_map>* attr_tiles) const; + + /** + * It prepares the coordinate tiles, re-organizing the cells from the user + * buffers based on the input sorted positions. + * + * @param cell_pos The positions that resulted from sorting and + * according to which the cells must be re-arranged. + * @param coord_dups The set with the positions + * of duplicate coordinates/cells. + * @param tiles The tiles to be created, one vector per dimension + * @return Status + */ + Status prepare_coord_tiles( + const std::vector& cell_pos, + const std::set& coord_dups, + std::unordered_map>* tiles) const; + + /** + * It prepares the coordinate tiles, re-organizing the cells from the user + * buffers based on the input sorted positions. Applicable to fixed-sized + * coordinates + * + * @param dim_idx The index of the dimension to prepare the coordinate + * tiles for. + * @param cell_pos The positions that resulted from sorting and + * according to which the cells must be re-arranged. + * @param coord_dups The set with the positions + * of duplicate coordinates/cells. + * @param tiles The tiles to be created. + * @return Status + */ + Status prepare_coord_tiles_fixed( + unsigned dim_idx, + const std::vector& cell_pos, + const std::set& coord_dups, + std::vector* tiles) const; + /** Resets the writer object, rendering it incomplete. */ void reset(); @@ -781,26 +938,24 @@ class Writer { * Sorts the coordinates of the user buffers, creating a vector with * the sorted positions. * - * @tparam T The domain type. * @param cell_pos The sorted cell positions to be created. * @return Status */ - template Status sort_coords(std::vector* cell_pos) const; /** - * Writes in unordered layout. Applicable to both dense and sparse arrays. - * Explicit coordinates must be provided for this write. + * Splits the coordinates buffer into separate coordinate + * buffers, one per dimension. Note that this will require extra memory + * allocation, which will be cleaned up in the class destructor. + * + * @return Status */ - Status unordered_write(); + Status split_coords_buffer(); /** * Writes in unordered layout. Applicable to both dense and sparse arrays. * Explicit coordinates must be provided for this write. - * - * @tparam T The domain type. */ - template Status unordered_write(); /** @@ -861,12 +1016,14 @@ class Writer { /** * Writes all the input tiles to storage. * - * @param attribute_tiles Tiles to be written, one element per attribute. + * @param attr_tiles Attribute tiles to be written, one element per attribute. + * @param coords_tiles Coordinate tiles to be written. * @return Status */ Status write_all_tiles( FragmentMetadata* frag_meta, - const std::vector>& attribute_tiles) const; + const std::unordered_map>& attr_tiles, + const std::vector& coords_tiles) const; /** * Writes the input tiles for the input attribute to storage. @@ -880,6 +1037,32 @@ class Writer { const std::string& attribute, FragmentMetadata* frag_meta, const std::vector& tiles) const; + + // TODO: remove + // This will be removed in a subsequent PR very soon, when we will write + // the coordinate tiles in separate files and, therefore, there will be + // no need zipping the coordinates from separate buffers into a single one. + Status zip_coord_tiles( + const std::unordered_map>& coord_tiles, + std::vector* coords_tiles) const; + + /** + * Returns the i-th coordinates in the coordinate buffers in string + * format. + */ + std::string coords_to_str(uint64_t i) const; + + /** + * Invoked on error. It removes the directory of the input URI and + * resets the global write state. + */ + void clean_up(const URI& uri); + + /** + * Applicable only to global writes. Returns true if all last tiles stored + * in the global write state are empty. + */ + bool all_last_tiles_empty() const; }; } // namespace sm diff --git a/tiledb/sm/tile/tile.cc b/tiledb/sm/tile/tile.cc index d47810ba1788..d10333f78fbc 100644 --- a/tiledb/sm/tile/tile.cc +++ b/tiledb/sm/tile/tile.cc @@ -287,6 +287,7 @@ void Tile::set_size(uint64_t size) { uint64_t Tile::size() const { return (buffer_ == nullptr) ? 0 : buffer_->size(); } + void Tile::split_coordinates() { assert(dim_num_ > 0);