Skip to content

Commit

Permalink
Split timestamp_end_opened_at_ into 2 separate timestamps (#3633)
Browse files Browse the repository at this point in the history
The private `Array` variable `timestamp_end_opened_at_` is being used
for two different use cases:

1. Specify end of timestamp range for fragments to load from file.
2. Specify timestamp to use for new fragments, metadata, etc.

Split the private variable into two variables to make this explicit.

---
TYPE:  IMPROVEMENT
DESC: Internal clean-up of array timestamps
  • Loading branch information
jp-dark authored Nov 4, 2023
1 parent f055b0f commit 78cd951
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 106 deletions.
155 changes: 95 additions & 60 deletions tiledb/sm/array/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ Array::Array(
, encryption_key_(make_shared<EncryptionKey>(HERE()))
, is_open_(false)
, is_opening_or_closing_(false)
, timestamp_start_(0)
, timestamp_end_(UINT64_MAX)
, timestamp_end_opened_at_(UINT64_MAX)
, array_dir_timestamp_start_(0)
, user_set_timestamp_end_(nullopt)
, array_dir_timestamp_end_(UINT64_MAX)
, new_component_timestamp_(nullopt)
, storage_manager_(storage_manager)
, resources_(storage_manager_->resources())
, config_(resources_.config())
Expand Down Expand Up @@ -245,8 +246,8 @@ Status Array::open(
uint32_t key_length) {
return Array::open(
query_type,
timestamp_start_,
timestamp_end_,
array_dir_timestamp_start_,
user_set_timestamp_end_.value_or(UINT64_MAX),
encryption_type,
encryption_key,
key_length);
Expand All @@ -265,16 +266,17 @@ Status Array::open(
// Checks
if (is_open()) {
return LOG_STATUS(
Status_ArrayError("Cannot open array; Array already open"));
Status_ArrayError("Cannot open array; Array already open."));
}

metadata_.clear();
metadata_loaded_ = false;
non_empty_domain_computed_ = false;
timestamp_start_ = timestamp_start;
timestamp_end_opened_at_ = timestamp_end;
query_type_ = query_type;

set_timestamps(
timestamp_start, timestamp_end, query_type_ == QueryType::READ);

/* Note: the open status MUST be exception safe. If anything interrupts the
* opening process, it will throw and the array will be set as closed. */
try {
Expand Down Expand Up @@ -338,19 +340,6 @@ Status Array::open(
throw StatusException(st);
}

if (timestamp_end_opened_at_ == UINT64_MAX) {
if (query_type == QueryType::READ) {
timestamp_end_opened_at_ = utils::time::timestamp_now_ms();
} else if (
query_type == QueryType::WRITE ||
query_type == QueryType::MODIFY_EXCLUSIVE ||
query_type == QueryType::DELETE || query_type == QueryType::UPDATE) {
timestamp_end_opened_at_ = 0;
} else {
throw ArrayException("Cannot open array; Unsupported query type.");
}
}

if (remote_) {
auto rest_client = resources_.rest_client();
if (rest_client == nullptr) {
Expand All @@ -360,9 +349,7 @@ Status Array::open(
if (!use_refactored_array_open()) {
auto&& [st, array_schema_latest] =
rest_client->get_array_schema_from_rest(array_uri_);
if (!st.ok()) {
throw StatusException(st);
}
throw_if_not_ok(st);
array_schema_latest_ = array_schema_latest.value();
} else {
auto st = rest_client->post_array_from_rest(
Expand All @@ -376,7 +363,10 @@ Status Array::open(
auto timer_se =
resources_.stats().start_timer("array_open_read_load_directory");
array_dir_ = ArrayDirectory(
resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_);
resources_,
array_uri_,
array_dir_timestamp_start_,
array_dir_timestamp_end_);
}
std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) =
open_for_reads();
Expand All @@ -389,8 +379,8 @@ Status Array::open(
array_dir_ = ArrayDirectory(
resources_,
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
ArrayDirectoryMode::SCHEMA_ONLY);
}
auto&& [st, array_schema_latest, array_schemas] = open_for_writes();
Expand All @@ -400,7 +390,9 @@ Status Array::open(
array_schema_latest_ = array_schema_latest.value();
array_schemas_all_ = array_schemas.value();

metadata_.reset(timestamp_end_opened_at_);
// Reset the metadata using the timestamp for new components.
metadata_.reset(timestamp_for_new_component());

} else if (
query_type == QueryType::DELETE || query_type == QueryType::UPDATE) {
{
Expand All @@ -409,8 +401,8 @@ Status Array::open(
array_dir_ = ArrayDirectory(
resources_,
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
ArrayDirectoryMode::READ);
}
auto&& [st, array_schema_latest, array_schemas] = open_for_writes();
Expand Down Expand Up @@ -440,7 +432,8 @@ Status Array::open(
return LOG_STATUS(Status_ArrayError(err.str()));
}

metadata_.reset(timestamp_end_opened_at_);
// Updates the timestamp to use for metadata.
metadata_.reset(timestamp_for_new_component());
} else {
throw ArrayException("Cannot open array; Unsupported query type.");
}
Expand Down Expand Up @@ -480,13 +473,15 @@ Status Array::close() {
// metadata it won't trigger a deadlock
metadata_loaded_ = true;
auto rest_client = resources_.rest_client();
if (rest_client == nullptr)
if (rest_client == nullptr) {
throw Status_ArrayError(
"Error closing array; remote array with no REST client.");
st = rest_client->post_array_metadata_to_rest(
array_uri_, timestamp_start_, timestamp_end_opened_at_, this);
if (!st.ok())
throw StatusException(st);
}
throw_if_not_ok(rest_client->post_array_metadata_to_rest(
array_uri_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
this));
}

// Storage manager does not own the array schema for remote arrays.
Expand All @@ -510,7 +505,6 @@ Status Array::close() {
array_schemas_all_.clear();
metadata_.clear();
metadata_loaded_ = false;

} catch (std::exception& e) {
is_opening_or_closing_ = false;
throw Status_ArrayError(e.what());
Expand Down Expand Up @@ -618,8 +612,8 @@ std::vector<shared_ptr<const Enumeration>> Array::get_enumerations(

loaded = rest_client->post_enumerations_from_rest(
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
this,
names_to_load);
} else {
Expand Down Expand Up @@ -820,43 +814,58 @@ const EncryptionKey& Array::get_encryption_key() const {
}

Status Array::reopen() {
if (timestamp_end_ == timestamp_end_opened_at_) {
// The user has not set `timestamp_end_` since it was last opened.
// In this scenario, re-open at the current timestamp.
return reopen(timestamp_start_, utils::time::timestamp_now_ms());
// Note: Array will only reopen for reads. This is why we are checking the
// timestamp for the array directory and not new components. This needs to be
// updated if non-read reopens are allowed.
if (!user_set_timestamp_end_.has_value() ||
user_set_timestamp_end_.value() == array_dir_timestamp_end_) {
// Either the user has not set an end timestamp, or `user_set_timestamp_end_`
// still equals the end timestamp the array directory was last loaded with
// (`array_dir_timestamp_end_`). In both cases, re-open at the current
// timestamp.
return reopen(array_dir_timestamp_start_, utils::time::timestamp_now_ms());
} else {
// The user has set `timestamp_end_`. Reopen at that time stamp.
return reopen(timestamp_start_, timestamp_end_);
// The user has changed the end timestamp. Reopen at the user set timestamp.
return reopen(array_dir_timestamp_start_, user_set_timestamp_end_.value());
}
}

Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) {
auto timer = resources_.stats().start_timer("array_reopen");
// Check the array was opened already in READ mode.
if (!is_open_) {
return LOG_STATUS(
Status_ArrayError("Cannot reopen array; Array is not open"));
}

if (query_type_ != QueryType::READ) {
return LOG_STATUS(
Status_ArrayError("Cannot reopen array; Array was "
"not opened in read mode"));
return LOG_STATUS(Status_ArrayError(
"Cannot reopen array; Array was not opened in read mode"));
}

// Update the user set timestamp and the timestamp range to pass to the array
// directory.
if (timestamp_end == UINT64_MAX) {
user_set_timestamp_end_ = nullopt;
array_dir_timestamp_end_ = utils::time::timestamp_now_ms();

} else {
user_set_timestamp_end_ = timestamp_end;
array_dir_timestamp_end_ = timestamp_end;
}
array_dir_timestamp_start_ = timestamp_start;

// Reset the last max buffer sizes.
clear_last_max_buffer_sizes();

timestamp_start_ = timestamp_start;
timestamp_end_opened_at_ = timestamp_end;
// Reopen metadata.
fragment_metadata_.clear();
metadata_.clear();
metadata_loaded_ = false;

// Reset the non-empty domain - may be different.
non_empty_domain_.clear();
non_empty_domain_computed_ = false;

if (timestamp_end_opened_at_ == UINT64_MAX) {
timestamp_end_opened_at_ = utils::time::timestamp_now_ms();
}

// Use open to reopen a remote array.
if (remote_) {
try {
set_array_closed();
Expand All @@ -873,21 +882,23 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) {
encryption_key_->key().size());
}

// Reload the array directory in READ mode (reopen only supports reads).
try {
{
auto timer_se = resources_.stats().start_timer("array_reopen_directory");
array_dir_ = ArrayDirectory(
resources_,
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
query_type_ == QueryType::READ ? ArrayDirectoryMode::READ :
ArrayDirectoryMode::SCHEMA_ONLY);
}
} catch (const std::logic_error& le) {
return LOG_STATUS(Status_ArrayDirectoryError(le.what()));
}

// Reopen the array and update private variables.
std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) =
open_for_reads();

Expand Down Expand Up @@ -1132,6 +1143,19 @@ bool Array::serialize_metadata() const {
return serialize_metadata_array_open;
}

/**
 * Stores the timestamp range used for the array directory and the timestamp
 * used for new components.
 *
 * @param timestamp_start Start of the array directory timestamp range.
 * @param timestamp_end End of the array directory timestamp range. A value of
 *     `UINT64_MAX` is replaced by the current time when `set_current_time` is
 *     true. A value of `0` or `UINT64_MAX` leaves the new component timestamp
 *     unset.
 * @param set_current_time Whether an end of `UINT64_MAX` should be replaced
 *     by the value of `utils::time::timestamp_now_ms()`.
 */
void Array::set_timestamps(
    uint64_t timestamp_start, uint64_t timestamp_end, bool set_current_time) {
  const bool end_is_max = (timestamp_end == UINT64_MAX);

  // Range handed to the array directory.
  array_dir_timestamp_start_ = timestamp_start;
  if (set_current_time && end_is_max) {
    array_dir_timestamp_end_ = utils::time::timestamp_now_ms();
  } else {
    array_dir_timestamp_end_ = timestamp_end;
  }

  // Only an explicit end timestamp (neither 0 nor UINT64_MAX) is kept as the
  // timestamp for new components.
  if (!end_is_max && timestamp_end != 0) {
    new_component_timestamp_ = timestamp_end;
  } else {
    new_component_timestamp_ = nullopt;
  }
}

bool Array::use_refactored_array_open() const {
auto found = false;
auto refactored_array_open = false;
Expand All @@ -1146,6 +1170,10 @@ bool Array::use_refactored_array_open() const {
return refactored_array_open || use_refactored_query_submit();
}

/**
 * Returns the timestamp to use for new components (for example, the value
 * passed to `metadata_.reset()` when the array is opened).
 *
 * @return The value of `new_component_timestamp_` when it is set; otherwise
 *     the value of `utils::time::timestamp_now_ms()`.
 */
uint64_t Array::timestamp_for_new_component() const {
  // `value_or(timestamp_now_ms())` evaluates its argument eagerly, so the clock
  // would be queried even when a fixed timestamp is set. Query it only on the
  // fallback path.
  if (new_component_timestamp_.has_value()) {
    return new_component_timestamp_.value();
  }
  return utils::time::timestamp_now_ms();
}

bool Array::use_refactored_query_submit() const {
auto found = false;
auto refactored_query_submit = false;
Expand Down Expand Up @@ -1503,7 +1531,10 @@ Status Array::load_metadata() {
"Cannot load metadata; remote array with no REST client."));
}
RETURN_NOT_OK(rest_client->get_array_metadata_from_rest(
array_uri_, timestamp_start_, timestamp_end_opened_at_, this));
array_uri_,
array_dir_timestamp_start_,
timestamp_end_opened_at(),
this));
} else {
do_load_metadata();
}
Expand All @@ -1519,7 +1550,7 @@ Status Array::load_remote_non_empty_domain() {
"Cannot load metadata; remote array with no REST client."));
}
RETURN_NOT_OK(rest_client->get_array_non_empty_domain(
this, timestamp_start_, timestamp_end_opened_at_));
this, array_dir_timestamp_start_, timestamp_end_opened_at()));
non_empty_domain_computed_ = true;
}
return Status::Ok();
Expand All @@ -1541,7 +1572,11 @@ ArrayDirectory& Array::load_array_directory() {
ArrayDirectoryMode::READ;

array_dir_ = ArrayDirectory(
resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_, mode);
resources_,
array_uri_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
mode);

return array_dir_;
}
Expand Down
Loading

0 comments on commit 78cd951

Please sign in to comment.