diff --git a/tiledb/sm/array/array.cc b/tiledb/sm/array/array.cc index 1a140954676..5f9350ad0f2 100644 --- a/tiledb/sm/array/array.cc +++ b/tiledb/sm/array/array.cc @@ -91,9 +91,10 @@ Array::Array( , encryption_key_(make_shared(HERE())) , is_open_(false) , is_opening_or_closing_(false) - , timestamp_start_(0) - , timestamp_end_(UINT64_MAX) - , timestamp_end_opened_at_(UINT64_MAX) + , array_dir_timestamp_start_(0) + , user_set_timestamp_end_(nullopt) + , array_dir_timestamp_end_(UINT64_MAX) + , new_component_timestamp_(nullopt) , storage_manager_(storage_manager) , resources_(storage_manager_->resources()) , config_(resources_.config()) @@ -245,8 +246,8 @@ Status Array::open( uint32_t key_length) { return Array::open( query_type, - timestamp_start_, - timestamp_end_, + array_dir_timestamp_start_, + user_set_timestamp_end_.value_or(UINT64_MAX), encryption_type, encryption_key, key_length); @@ -265,16 +266,17 @@ Status Array::open( // Checks if (is_open()) { return LOG_STATUS( - Status_ArrayError("Cannot open array; Array already open")); + Status_ArrayError("Cannot open array; Array already open.")); } metadata_.clear(); metadata_loaded_ = false; non_empty_domain_computed_ = false; - timestamp_start_ = timestamp_start; - timestamp_end_opened_at_ = timestamp_end; query_type_ = query_type; + set_timestamps( + timestamp_start, timestamp_end, query_type_ == QueryType::READ); + /* Note: the open status MUST be exception safe. If anything interrupts the * opening process, it will throw and the array will be set as closed. */ try { @@ -338,19 +340,6 @@ Status Array::open( throw StatusException(st); } - if (timestamp_end_opened_at_ == UINT64_MAX) { - if (query_type == QueryType::READ) { - timestamp_end_opened_at_ = utils::time::timestamp_now_ms(); - } else if ( - query_type == QueryType::WRITE || - query_type == QueryType::MODIFY_EXCLUSIVE || - query_type == QueryType::DELETE || query_type == QueryType::UPDATE) { - timestamp_end_opened_at_ = 0; - } else { - throw ArrayException("Cannot open array; Unsupported query type."); - } - } - if (remote_) { auto rest_client = resources_.rest_client(); if (rest_client == nullptr) { @@ -360,9 +349,7 @@ Status Array::open( if (!use_refactored_array_open()) { auto&& [st, array_schema_latest] = rest_client->get_array_schema_from_rest(array_uri_); - if (!st.ok()) { - throw StatusException(st); - } + throw_if_not_ok(st); array_schema_latest_ = array_schema_latest.value(); } else { auto st = rest_client->post_array_from_rest( @@ -376,7 +363,10 @@ Status Array::open( auto timer_se = resources_.stats().start_timer("array_open_read_load_directory"); array_dir_ = ArrayDirectory( - resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_); + resources_, + array_uri_, + array_dir_timestamp_start_, + array_dir_timestamp_end_); } std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) = open_for_reads(); @@ -389,8 +379,8 @@ Status Array::open( array_dir_ = ArrayDirectory( resources_, array_uri_, - timestamp_start_, - timestamp_end_opened_at_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, ArrayDirectoryMode::SCHEMA_ONLY); } auto&& [st, array_schema_latest, array_schemas] = open_for_writes(); @@ -400,7 +390,9 @@ Status Array::open( array_schema_latest_ = array_schema_latest.value(); array_schemas_all_ = array_schemas.value(); - metadata_.reset(timestamp_end_opened_at_); + // Set the timestamp + metadata_.reset(timestamp_for_new_component()); + } else if ( query_type == QueryType::DELETE || query_type == QueryType::UPDATE) { { @@ -409,8 +401,8 @@ Status Array::open( array_dir_ = ArrayDirectory( resources_, array_uri_, - timestamp_start_, - timestamp_end_opened_at_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, ArrayDirectoryMode::READ); } auto&& [st, array_schema_latest, array_schemas] = open_for_writes(); @@ -440,7 +432,8 @@ Status Array::open( return LOG_STATUS(Status_ArrayError(err.str())); } - metadata_.reset(timestamp_end_opened_at_); + // Updates the timestamp to use for metadata. + metadata_.reset(timestamp_for_new_component()); } else { throw ArrayException("Cannot open array; Unsupported query type."); } @@ -480,13 +473,15 @@ Status Array::close() { // metadata it won't trigger a deadlock metadata_loaded_ = true; auto rest_client = resources_.rest_client(); - if (rest_client == nullptr) + if (rest_client == nullptr) { throw Status_ArrayError( "Error closing array; remote array with no REST client."); - st = rest_client->post_array_metadata_to_rest( - array_uri_, timestamp_start_, timestamp_end_opened_at_, this); - if (!st.ok()) - throw StatusException(st); + } + throw_if_not_ok(rest_client->post_array_metadata_to_rest( + array_uri_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, + this)); } // Storage manager does not own the array schema for remote arrays. @@ -510,7 +505,6 @@ Status Array::close() { array_schemas_all_.clear(); metadata_.clear(); metadata_loaded_ = false; - } catch (std::exception& e) { is_opening_or_closing_ = false; throw Status_ArrayError(e.what()); @@ -618,8 +612,8 @@ std::vector> Array::get_enumerations( loaded = rest_client->post_enumerations_from_rest( array_uri_, - timestamp_start_, - timestamp_end_opened_at_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, this, names_to_load); } else { @@ -820,43 +814,58 @@ const EncryptionKey& Array::get_encryption_key() const { } Status Array::reopen() { - if (timestamp_end_ == timestamp_end_opened_at_) { - // The user has not set `timestamp_end_` since it was last opened. - // In this scenario, re-open at the current timestamp. - return reopen(timestamp_start_, utils::time::timestamp_now_ms()); + // Note: Array will only reopen for reads. This is why we are checking the + // timestamp for the array directory and not new components. This needs to be + // updated if non-read reopens are allowed. + if (!user_set_timestamp_end_.has_value() || + user_set_timestamp_end_.value() == array_dir_timestamp_end_) { + // The user has not set `timestamp_end_` since it was last opened or set it + // to use the current timestamp. In this scenario, re-open at the current + // timestamp. + return reopen(array_dir_timestamp_start_, utils::time::timestamp_now_ms()); } else { - // The user has set `timestamp_end_`. Reopen at that time stamp. - return reopen(timestamp_start_, timestamp_end_); + // The user has changed the end timestamp. Reopen at the user set timestamp. + return reopen(array_dir_timestamp_start_, user_set_timestamp_end_.value()); } } Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) { auto timer = resources_.stats().start_timer("array_reopen"); + // Check the array was opened already in READ mode. if (!is_open_) { return LOG_STATUS( Status_ArrayError("Cannot reopen array; Array is not open")); } - if (query_type_ != QueryType::READ) { - return LOG_STATUS( - Status_ArrayError("Cannot reopen array; Array was " - "not opened in read mode")); + return LOG_STATUS(Status_ArrayError( + "Cannot reopen array; Array was not opened in read mode")); + } + + // Update the user set timestamp and the timestamp range to pass to the array + // directory. + if (timestamp_end == UINT64_MAX) { + user_set_timestamp_end_ = nullopt; + array_dir_timestamp_end_ = utils::time::timestamp_now_ms(); + + } else { + user_set_timestamp_end_ = timestamp_end; + array_dir_timestamp_end_ = timestamp_end; } + array_dir_timestamp_start_ = timestamp_start; + // Reset the last max buffer sizes. clear_last_max_buffer_sizes(); - timestamp_start_ = timestamp_start; - timestamp_end_opened_at_ = timestamp_end; + // Reopen metadata. fragment_metadata_.clear(); metadata_.clear(); metadata_loaded_ = false; + + // Reset the non-empty domain - may be different. non_empty_domain_.clear(); non_empty_domain_computed_ = false; - if (timestamp_end_opened_at_ == UINT64_MAX) { - timestamp_end_opened_at_ = utils::time::timestamp_now_ms(); - } - + // Use open to reopen a remote array. if (remote_) { try { set_array_closed(); @@ -873,14 +882,15 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) { encryption_key_->key().size()); } + // Reload the array directory in READ mode (reopen only supports reads). try { { auto timer_se = resources_.stats().start_timer("array_reopen_directory"); array_dir_ = ArrayDirectory( resources_, array_uri_, - timestamp_start_, - timestamp_end_opened_at_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, query_type_ == QueryType::READ ? ArrayDirectoryMode::READ : ArrayDirectoryMode::SCHEMA_ONLY); } @@ -888,6 +898,7 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) { return LOG_STATUS(Status_ArrayDirectoryError(le.what())); } + // Reopen the array and update private variables. std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) = open_for_reads(); @@ -1132,6 +1143,19 @@ bool Array::serialize_metadata() const { return serialize_metadata_array_open; } +void Array::set_timestamps( + uint64_t timestamp_start, uint64_t timestamp_end, bool set_current_time) { + array_dir_timestamp_start_ = timestamp_start; + array_dir_timestamp_end_ = (set_current_time && timestamp_end == UINT64_MAX) ? + utils::time::timestamp_now_ms() : + timestamp_end; + if (timestamp_end == 0 || timestamp_end == UINT64_MAX) { + new_component_timestamp_ = nullopt; + } else { + new_component_timestamp_ = timestamp_end; + } +} + bool Array::use_refactored_array_open() const { auto found = false; auto refactored_array_open = false; @@ -1146,6 +1170,10 @@ bool Array::use_refactored_array_open() const { return refactored_array_open || use_refactored_query_submit(); } +uint64_t Array::timestamp_for_new_component() const { + return new_component_timestamp_.value_or(utils::time::timestamp_now_ms()); +} + bool Array::use_refactored_query_submit() const { auto found = false; auto refactored_query_submit = false; @@ -1503,7 +1531,10 @@ Status Array::load_metadata() { "Cannot load metadata; remote array with no REST client.")); } RETURN_NOT_OK(rest_client->get_array_metadata_from_rest( - array_uri_, timestamp_start_, timestamp_end_opened_at_, this)); + array_uri_, + array_dir_timestamp_start_, + timestamp_end_opened_at(), + this)); } else { do_load_metadata(); } @@ -1519,7 +1550,7 @@ Status Array::load_remote_non_empty_domain() { "Cannot load metadata; remote array with no REST client.")); } RETURN_NOT_OK(rest_client->get_array_non_empty_domain( - this, timestamp_start_, timestamp_end_opened_at_)); + this, array_dir_timestamp_start_, timestamp_end_opened_at())); non_empty_domain_computed_ = true; } return Status::Ok(); @@ -1541,7 +1572,11 @@ ArrayDirectory& Array::load_array_directory() { ArrayDirectoryMode::READ; array_dir_ = ArrayDirectory( - resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_, mode); + resources_, + array_uri_, + array_dir_timestamp_start_, + array_dir_timestamp_end_, + mode); return array_dir_; } diff --git a/tiledb/sm/array/array.h b/tiledb/sm/array/array.h index 8023c5efc9e..37a0f535354 100644 --- a/tiledb/sm/array/array.h +++ b/tiledb/sm/array/array.h @@ -341,36 +341,75 @@ class Array { */ Status reopen(uint64_t timestamp_start, uint64_t timestamp_end); - /** Returns the start timestamp. */ + /** Returns the start timestamp used to load the array directory. */ inline uint64_t timestamp_start() const { - return timestamp_start_; + return array_dir_timestamp_start_; } - /** Returns the end timestamp. */ + /** + * Returns the end timestamp as set by the user. + * + * This may differ from the actual timestamp in use if the array has not yet + * been opened, the user has changed this value, or if using the sentinel + * value of `UINT64_MAX`. + */ inline uint64_t timestamp_end() const { - return timestamp_end_; + return user_set_timestamp_end_.value_or(UINT64_MAX); } - /** Returns the timestamp at which the array was opened. */ + /** + * Returns the timestamp at which the array was opened. + * + * WARNING: This is a legacy function that is needed to support the current + * API and REST calls. Do not use in new code. + */ inline uint64_t timestamp_end_opened_at() const { - return timestamp_end_opened_at_; + return query_type_ == QueryType::READ ? + array_dir_timestamp_end_ : + new_component_timestamp_.value_or(0); } + /** + * Returns the timestamp to use when writing components (fragment, + * metadata, etc.) + * + * If set to use the lastest time, this will get the time when called. + */ + uint64_t timestamp_for_new_component() const; + /** Directly set the timestamp start value. */ inline void set_timestamp_start(uint64_t timestamp_start) { - timestamp_start_ = timestamp_start; + array_dir_timestamp_start_ = timestamp_start; } /** Directly set the timestamp end value. */ inline void set_timestamp_end(uint64_t timestamp_end) { - timestamp_end_ = timestamp_end; + if (timestamp_end == UINT64_MAX) { + user_set_timestamp_end_ = nullopt; + } else { + user_set_timestamp_end_ = timestamp_end; + } } - /** Directly set the timestamp end opened at value. */ - inline void set_timestamp_end_opened_at( - const uint64_t timestamp_end_opened_at) { - timestamp_end_opened_at_ = timestamp_end_opened_at; - } + /** + * Set the internal timestamps. + * + * Note for sentinel values for `timestamp_end`: + * * `timestamp_end == UINT64_MAX`: + * The array directory end timestamp will be set to the current time if + * ``set_current_time=True``. New components will use the time at query + * submission. + * * `timestamp_end` == 0: + * The new component timestamp will use the time at query submission. + * + * @param timestamp_start The starting timestamp for opening the array + * directory. + * @param timstamp_end The ending timestamp for opening the array directory + * and setting new components. See above comments for sentinel values `0` and + * `UINT64_MAX`. + */ + void set_timestamps( + uint64_t timetamp_start, uint64_t timestamp_end, bool set_current_time); /** Directly set the array config. * @@ -631,28 +670,40 @@ class Array { QueryType query_type_ = QueryType::READ; /** - * The starting timestamp between to open `open_array_` at. - * In TileDB, timestamps are in ms elapsed since - * 1970-01-01 00:00:00 +0000 (UTC). + * Starting timestamp to open fragments between. + * + * Timestamps are ms elapsed since 1970-01-01 00:00:00 +0000 (UTC). */ - uint64_t timestamp_start_; + uint64_t array_dir_timestamp_start_; /** - * The ending timestamp between to open `open_array_` at. - * In TileDB, timestamps are in ms elapsed since - * 1970-01-01 00:00:00 +0000 (UTC). A value of UINT64_T - * will be interpretted as the current timestamp. + * Timestamp set by the user. + * + * This is used when setting the end timestamp for loading the array directory + * and the timestamp to use when creating fragments, metadata, etc. This may + * be changed by the user at any time. + * + * Timestamps are ms elapsed since 1970-01-01 00:00:00 +0000 (UTC). If set to + * `nullopt`, use the current time. */ - uint64_t timestamp_end_; + optional user_set_timestamp_end_; /** - * The ending timestamp that the array was last opened - * at. This is useful when `timestamp_end_` has been - * set to UINT64_T. In this scenario, this variable will - * store the timestamp for the time that the array was - * opened. + * Ending timestamp to open fragments between. + * + * Timestamps are ms elapsed since 1970-01-01 00:00:00 +0000 (UTC). Set to a + * sentinel value of UINT64_MAX before the array is opened. + */ + uint64_t array_dir_timestamp_end_; + + /** + * The timestamp to use when creating fragments, delete/update commits, + * metadata, etc. + * + * Timestamps are ms elapsed since 1970-01-01 00:00:00 +0000 (UTC). If set to + * `nullopt`, use the current time. */ - uint64_t timestamp_end_opened_at_; + optional new_component_timestamp_; /** TileDB storage manager. */ StorageManager* storage_manager_; diff --git a/tiledb/sm/serialization/array.cc b/tiledb/sm/serialization/array.cc index 812430e97ac..b0d3f3c93b7 100644 --- a/tiledb/sm/serialization/array.cc +++ b/tiledb/sm/serialization/array.cc @@ -240,8 +240,6 @@ Status array_from_capnp( if (array_reader.hasUri()) { array->set_uri_serialized(array_reader.getUri().cStr()); } - array->set_timestamp_start(array_reader.getStartTimestamp()); - array->set_timestamp_end(array_reader.getEndTimestamp()); if (array_reader.hasQueryType()) { auto query_type_str = array_reader.getQueryType(); @@ -252,22 +250,16 @@ Status array_from_capnp( array->set_serialized_array_open(); } - array->set_timestamp_end_opened_at(array_reader.getOpenedAtEndTimestamp()); - if (array->timestamp_end_opened_at() == UINT64_MAX) { - if (query_type == QueryType::READ) { - array->set_timestamp_end_opened_at( - tiledb::sm::utils::time::timestamp_now_ms()); - } else if ( - query_type == QueryType::WRITE || - query_type == QueryType::MODIFY_EXCLUSIVE || - query_type == QueryType::DELETE || query_type == QueryType::UPDATE) { - array->set_timestamp_end_opened_at(0); - } else { - throw StatusException(Status_SerializationError( - "Cannot open array; Unsupported query type.")); - } - } - } + array->set_timestamps( + array_reader.getStartTimestamp(), + array_reader.getEndTimestamp(), + query_type == QueryType::READ); + } else { + array->set_timestamps( + array_reader.getStartTimestamp(), + array_reader.getEndTimestamp(), + false); + }; if (array_reader.hasArraySchemasAll()) { std::unordered_map> all_schemas;