From 78cd95102cfc787ab6d6ef5662795575534f8410 Mon Sep 17 00:00:00 2001 From: Julia Dark <24235303+jp-dark@users.noreply.github.com> Date: Sat, 4 Nov 2023 05:55:25 -0400 Subject: [PATCH] Split `timestamp_end_opened_at_` into 2 separate timestamps (#3633) The private `Array` variable `timestamp_end_opened_at_` is being used for two different use cases: 1. Specify end of timestamp range for fragments to load from file. 2. Specify timestamp to use for new fragments, metadata, etc. Split the private variable into two variables to make this explicit. --- TYPE: IMPROVEMENT DESC: Internal clean-up of array timestamps --- tiledb/sm/array/array.cc | 155 +++++++++++++++++++------------ tiledb/sm/array/array.h | 107 +++++++++++++++------ tiledb/sm/serialization/array.cc | 28 ++---- 3 files changed, 184 insertions(+), 106 deletions(-) diff --git a/tiledb/sm/array/array.cc b/tiledb/sm/array/array.cc index 1a140954676..5f9350ad0f2 100644 --- a/tiledb/sm/array/array.cc +++ b/tiledb/sm/array/array.cc @@ -91,9 +91,10 @@ Array::Array( , encryption_key_(make_shared(HERE())) , is_open_(false) , is_opening_or_closing_(false) - , timestamp_start_(0) - , timestamp_end_(UINT64_MAX) - , timestamp_end_opened_at_(UINT64_MAX) + , array_dir_timestamp_start_(0) + , user_set_timestamp_end_(nullopt) + , array_dir_timestamp_end_(UINT64_MAX) + , new_component_timestamp_(nullopt) , storage_manager_(storage_manager) , resources_(storage_manager_->resources()) , config_(resources_.config()) @@ -245,8 +246,8 @@ Status Array::open( uint32_t key_length) { return Array::open( query_type, - timestamp_start_, - timestamp_end_, + array_dir_timestamp_start_, + user_set_timestamp_end_.value_or(UINT64_MAX), encryption_type, encryption_key, key_length); @@ -265,16 +266,17 @@ Status Array::open( // Checks if (is_open()) { return LOG_STATUS( - Status_ArrayError("Cannot open array; Array already open")); + Status_ArrayError("Cannot open array; Array already open.")); } metadata_.clear(); metadata_loaded_ = false; non_empty_domain_computed_ = false; - timestamp_start_ = timestamp_start; - timestamp_end_opened_at_ = timestamp_end; query_type_ = query_type; + set_timestamps( + timestamp_start, timestamp_end, query_type_ == QueryType::READ); + /* Note: the open status MUST be exception safe. If anything interrupts the * opening process, it will throw and the array will be set as closed. */ try { @@ -338,19 +340,6 @@ Status Array::open( throw StatusException(st); } - if (timestamp_end_opened_at_ == UINT64_MAX) { - if (query_type == QueryType::READ) { - timestamp_end_opened_at_ = utils::time::timestamp_now_ms(); - } else if ( - query_type == QueryType::WRITE || - query_type == QueryType::MODIFY_EXCLUSIVE || - query_type == QueryType::DELETE || query_type == QueryType::UPDATE) { - timestamp_end_opened_at_ = 0; - } else { - throw ArrayException("Cannot open array; Unsupported query type."); - } - } - if (remote_) { auto rest_client = resources_.rest_client(); if (rest_client == nullptr) { @@ -360,9 +349,7 @@ Status Array::open( if (!use_refactored_array_open()) { auto&& [st, array_schema_latest] = rest_client->get_array_schema_from_rest(array_uri_); - if (!st.ok()) { - throw StatusException(st); - } + throw_if_not_ok(st); array_schema_latest_ = array_schema_latest.value(); } else { auto st = rest_client->post_array_from_rest( @@ -376,7 +363,10 @@ Status Array::open( auto timer_se = resources_.stats().start_timer("array_open_read_load_directory"); array_dir_ = ArrayDirectory( - resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_); + resources_, + array_uri_, + array_dir_timestamp_start_, + array_dir_timestamp_end_); } std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) = open_for_reads(); @@ -389,8 +379,8 @@ Status Array::open( array_dir_ = ArrayDirectory( resources_, array_uri_, - timestamp_start_, - timestamp_end_opened_at_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, ArrayDirectoryMode::SCHEMA_ONLY); } auto&& [st, array_schema_latest, array_schemas] = open_for_writes(); @@ -400,7 +390,9 @@ Status Array::open( array_schema_latest_ = array_schema_latest.value(); array_schemas_all_ = array_schemas.value(); - metadata_.reset(timestamp_end_opened_at_); + // Set the timestamp + metadata_.reset(timestamp_for_new_component()); + } else if ( query_type == QueryType::DELETE || query_type == QueryType::UPDATE) { { @@ -409,8 +401,8 @@ Status Array::open( array_dir_ = ArrayDirectory( resources_, array_uri_, - timestamp_start_, - timestamp_end_opened_at_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, ArrayDirectoryMode::READ); } auto&& [st, array_schema_latest, array_schemas] = open_for_writes(); @@ -440,7 +432,8 @@ Status Array::open( return LOG_STATUS(Status_ArrayError(err.str())); } - metadata_.reset(timestamp_end_opened_at_); + // Updates the timestamp to use for metadata. + metadata_.reset(timestamp_for_new_component()); } else { throw ArrayException("Cannot open array; Unsupported query type."); } @@ -480,13 +473,15 @@ Status Array::close() { // metadata it won't trigger a deadlock metadata_loaded_ = true; auto rest_client = resources_.rest_client(); - if (rest_client == nullptr) + if (rest_client == nullptr) { throw Status_ArrayError( "Error closing array; remote array with no REST client."); - st = rest_client->post_array_metadata_to_rest( - array_uri_, timestamp_start_, timestamp_end_opened_at_, this); - if (!st.ok()) - throw StatusException(st); + } + throw_if_not_ok(rest_client->post_array_metadata_to_rest( + array_uri_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, + this)); } // Storage manager does not own the array schema for remote arrays. @@ -510,7 +505,6 @@ Status Array::close() { array_schemas_all_.clear(); metadata_.clear(); metadata_loaded_ = false; - } catch (std::exception& e) { is_opening_or_closing_ = false; throw Status_ArrayError(e.what()); @@ -618,8 +612,8 @@ std::vector> Array::get_enumerations( loaded = rest_client->post_enumerations_from_rest( array_uri_, - timestamp_start_, - timestamp_end_opened_at_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, this, names_to_load); } else { @@ -820,43 +814,58 @@ const EncryptionKey& Array::get_encryption_key() const { } Status Array::reopen() { - if (timestamp_end_ == timestamp_end_opened_at_) { - // The user has not set `timestamp_end_` since it was last opened. - // In this scenario, re-open at the current timestamp. - return reopen(timestamp_start_, utils::time::timestamp_now_ms()); + // Note: Array will only reopen for reads. This is why we are checking the + // timestamp for the array directory and not new components. This needs to be + // updated if non-read reopens are allowed. + if (!user_set_timestamp_end_.has_value() || + user_set_timestamp_end_.value() == array_dir_timestamp_end_) { + // The user has not set `timestamp_end_` since it was last opened or set it + // to use the current timestamp. In this scenario, re-open at the current + // timestamp. + return reopen(array_dir_timestamp_start_, utils::time::timestamp_now_ms()); } else { - // The user has set `timestamp_end_`. Reopen at that time stamp. - return reopen(timestamp_start_, timestamp_end_); + // The user has changed the end timestamp. Reopen at the user set timestamp. + return reopen(array_dir_timestamp_start_, user_set_timestamp_end_.value()); } } Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) { auto timer = resources_.stats().start_timer("array_reopen"); + // Check the array was opened already in READ mode. if (!is_open_) { return LOG_STATUS( Status_ArrayError("Cannot reopen array; Array is not open")); } - if (query_type_ != QueryType::READ) { - return LOG_STATUS( - Status_ArrayError("Cannot reopen array; Array was " - "not opened in read mode")); + return LOG_STATUS(Status_ArrayError( + "Cannot reopen array; Array was not opened in read mode")); + } + + // Update the user set timestamp and the timestamp range to pass to the array + // directory. + if (timestamp_end == UINT64_MAX) { + user_set_timestamp_end_ = nullopt; + array_dir_timestamp_end_ = utils::time::timestamp_now_ms(); + + } else { + user_set_timestamp_end_ = timestamp_end; + array_dir_timestamp_end_ = timestamp_end; } + array_dir_timestamp_start_ = timestamp_start; + // Reset the last max buffer sizes. clear_last_max_buffer_sizes(); - timestamp_start_ = timestamp_start; - timestamp_end_opened_at_ = timestamp_end; + // Reopen metadata. fragment_metadata_.clear(); metadata_.clear(); metadata_loaded_ = false; + + // Reset the non-empty domain - may be different. non_empty_domain_.clear(); non_empty_domain_computed_ = false; - if (timestamp_end_opened_at_ == UINT64_MAX) { - timestamp_end_opened_at_ = utils::time::timestamp_now_ms(); - } - + // Use open to reopen a remote array. if (remote_) { try { set_array_closed(); @@ -873,14 +882,15 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) { encryption_key_->key().size()); } + // Reload the array directory in READ mode (reopen only supports reads). try { { auto timer_se = resources_.stats().start_timer("array_reopen_directory"); array_dir_ = ArrayDirectory( resources_, array_uri_, - timestamp_start_, - timestamp_end_opened_at_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, query_type_ == QueryType::READ ? ArrayDirectoryMode::READ : ArrayDirectoryMode::SCHEMA_ONLY); } @@ -888,6 +898,7 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) { return LOG_STATUS(Status_ArrayDirectoryError(le.what())); } + // Reopen the array and update private variables. std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) = open_for_reads(); @@ -1132,6 +1143,19 @@ bool Array::serialize_metadata() const { return serialize_metadata_array_open; } +void Array::set_timestamps( + uint64_t timestamp_start, uint64_t timestamp_end, bool set_current_time) { + array_dir_timestamp_start_ = timestamp_start; + array_dir_timestamp_end_ = (set_current_time && timestamp_end == UINT64_MAX) ? + utils::time::timestamp_now_ms() : + timestamp_end; + if (timestamp_end == 0 || timestamp_end == UINT64_MAX) { + new_component_timestamp_ = nullopt; + } else { + new_component_timestamp_ = timestamp_end; + } +} + bool Array::use_refactored_array_open() const { auto found = false; auto refactored_array_open = false; @@ -1146,6 +1170,10 @@ bool Array::use_refactored_array_open() const { return refactored_array_open || use_refactored_query_submit(); } +uint64_t Array::timestamp_for_new_component() const { + return new_component_timestamp_.value_or(utils::time::timestamp_now_ms()); +} + bool Array::use_refactored_query_submit() const { auto found = false; auto refactored_query_submit = false; @@ -1503,7 +1531,10 @@ Status Array::load_metadata() { "Cannot load metadata; remote array with no REST client.")); } RETURN_NOT_OK(rest_client->get_array_metadata_from_rest( - array_uri_, timestamp_start_, timestamp_end_opened_at_, this)); + array_uri_, + array_dir_timestamp_start_, + timestamp_end_opened_at(), + this)); } else { do_load_metadata(); } @@ -1519,7 +1550,7 @@ Status Array::load_remote_non_empty_domain() { "Cannot load metadata; remote array with no REST client.")); } RETURN_NOT_OK(rest_client->get_array_non_empty_domain( - this, timestamp_start_, timestamp_end_opened_at_)); + this, array_dir_timestamp_start_, timestamp_end_opened_at())); non_empty_domain_computed_ = true; } return Status::Ok(); @@ -1541,7 +1572,11 @@ ArrayDirectory& Array::load_array_directory() { ArrayDirectoryMode::READ; array_dir_ = ArrayDirectory( - resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_, mode); + resources_, + array_uri_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, + mode); return array_dir_; } diff --git a/tiledb/sm/array/array.h b/tiledb/sm/array/array.h index 8023c5efc9e..37a0f535354 100644 --- a/tiledb/sm/array/array.h +++ b/tiledb/sm/array/array.h @@ -341,36 +341,75 @@ class Array { */ Status reopen(uint64_t timestamp_start, uint64_t timestamp_end); - /** Returns the start timestamp. */ + /** Returns the start timestamp used to load the array directory. */ inline uint64_t timestamp_start() const { - return timestamp_start_; + return array_dir_timestamp_start_; } - /** Returns the end timestamp. */ + /** + * Returns the end timestamp as set by the user. + * + * This may differ from the actual timestamp in use if the array has not yet + * been opened, the user has changed this value, or if using the sentinel + * value of `UINT64_MAX`. + */ inline uint64_t timestamp_end() const { - return timestamp_end_; + return user_set_timestamp_end_.value_or(UINT64_MAX); } - /** Returns the timestamp at which the array was opened. */ + /** + * Returns the timestamp at which the array was opened. + * + * WARNING: This is a legacy function that is needed to support the current + * API and REST calls. Do not use in new code. + */ inline uint64_t timestamp_end_opened_at() const { - return timestamp_end_opened_at_; + return query_type_ == QueryType::READ ? + array_dir_timestamp_end_ : + new_component_timestamp_.value_or(0); } + /** + * Returns the timestamp to use when writing components (fragment, + * metadata, etc.) + * + * If set to use the lastest time, this will get the time when called. + */ + uint64_t timestamp_for_new_component() const; + /** Directly set the timestamp start value. */ inline void set_timestamp_start(uint64_t timestamp_start) { - timestamp_start_ = timestamp_start; + array_dir_timestamp_start_ = timestamp_start; } /** Directly set the timestamp end value. */ inline void set_timestamp_end(uint64_t timestamp_end) { - timestamp_end_ = timestamp_end; + if (timestamp_end == UINT64_MAX) { + user_set_timestamp_end_ = nullopt; + } else { + user_set_timestamp_end_ = timestamp_end; + } } - /** Directly set the timestamp end opened at value. */ - inline void set_timestamp_end_opened_at( - const uint64_t timestamp_end_opened_at) { - timestamp_end_opened_at_ = timestamp_end_opened_at; - } + /** + * Set the internal timestamps. + * + * Note for sentinel values for `timestamp_end`: + * * `timestamp_end == UINT64_MAX`: + * The array directory end timestamp will be set to the current time if + * ``set_current_time=True``. New components will use the time at query + * submission. + * * `timestamp_end` == 0: + * The new component timestamp will use the time at query submission. + * + * @param timestamp_start The starting timestamp for opening the array + * directory. + * @param timstamp_end The ending timestamp for opening the array directory + * and setting new components. See above comments for sentinel values `0` and + * `UINT64_MAX`. + */ + void set_timestamps( + uint64_t timetamp_start, uint64_t timestamp_end, bool set_current_time); /** Directly set the array config. * @@ -631,28 +670,40 @@ class Array { QueryType query_type_ = QueryType::READ; /** - * The starting timestamp between to open `open_array_` at. - * In TileDB, timestamps are in ms elapsed since - * 1970-01-01 00:00:00 +0000 (UTC). + * Starting timestamp to open fragments between. + * + * Timestamps are ms elapsed since 1970-01-01 00:00:00 +0000 (UTC). */ - uint64_t timestamp_start_; + uint64_t array_dir_timestamp_start_; /** - * The ending timestamp between to open `open_array_` at. - * In TileDB, timestamps are in ms elapsed since - * 1970-01-01 00:00:00 +0000 (UTC). A value of UINT64_T - * will be interpretted as the current timestamp. + * Timestamp set by the user. + * + * This is used when setting the end timestamp for loading the array directory + * and the timestamp to use when creating fragments, metadata, etc. This may + * be changed by the user at any time. + * + * Timestamps are ms elapsed since 1970-01-01 00:00:00 +0000 (UTC). If set to + * `nullopt`, use the current time. */ - uint64_t timestamp_end_; + optional user_set_timestamp_end_; /** - * The ending timestamp that the array was last opened - * at. This is useful when `timestamp_end_` has been - * set to UINT64_T. In this scenario, this variable will - * store the timestamp for the time that the array was - * opened. + * Ending timestamp to open fragments between. + * + * Timestamps are ms elapsed since 1970-01-01 00:00:00 +0000 (UTC). Set to a + * sentinel value of UINT64_MAX before the array is opened. + */ + uint64_t array_dir_timestamp_end_; + + /** + * The timestamp to use when creating fragments, delete/update commits, + * metadata, etc. + * + * Timestamps are ms elapsed since 1970-01-01 00:00:00 +0000 (UTC). If set to + * `nullopt`, use the current time. */ - uint64_t timestamp_end_opened_at_; + optional new_component_timestamp_; /** TileDB storage manager. */ StorageManager* storage_manager_; diff --git a/tiledb/sm/serialization/array.cc b/tiledb/sm/serialization/array.cc index 812430e97ac..b0d3f3c93b7 100644 --- a/tiledb/sm/serialization/array.cc +++ b/tiledb/sm/serialization/array.cc @@ -240,8 +240,6 @@ Status array_from_capnp( if (array_reader.hasUri()) { array->set_uri_serialized(array_reader.getUri().cStr()); } - array->set_timestamp_start(array_reader.getStartTimestamp()); - array->set_timestamp_end(array_reader.getEndTimestamp()); if (array_reader.hasQueryType()) { auto query_type_str = array_reader.getQueryType(); @@ -252,22 +250,16 @@ Status array_from_capnp( array->set_serialized_array_open(); } - array->set_timestamp_end_opened_at(array_reader.getOpenedAtEndTimestamp()); - if (array->timestamp_end_opened_at() == UINT64_MAX) { - if (query_type == QueryType::READ) { - array->set_timestamp_end_opened_at( - tiledb::sm::utils::time::timestamp_now_ms()); - } else if ( - query_type == QueryType::WRITE || - query_type == QueryType::MODIFY_EXCLUSIVE || - query_type == QueryType::DELETE || query_type == QueryType::UPDATE) { - array->set_timestamp_end_opened_at(0); - } else { - throw StatusException(Status_SerializationError( - "Cannot open array; Unsupported query type.")); - } - } - } + array->set_timestamps( + array_reader.getStartTimestamp(), + array_reader.getEndTimestamp(), + query_type == QueryType::READ); + } else { + array->set_timestamps( + array_reader.getStartTimestamp(), + array_reader.getEndTimestamp(), + false); + }; if (array_reader.hasArraySchemasAll()) { std::unordered_map> all_schemas;