Skip to content

Commit

Permalink
Splits timestamp_end_opened_at_ into 2 separate timestamps
Browse files Browse the repository at this point in the history
The private `Array` variable `timestamp_end_opened_at_` is being used
for two different use cases:

1. Specify end of timestamp range for fragments to load from file.
2. Specify timestamp to use for new fragments, metadata, etc.

Split the private variable into two variables to make this explicit.
  • Loading branch information
jp-dark committed Nov 4, 2022
1 parent 68320a8 commit 0da5d8a
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 78 deletions.
142 changes: 83 additions & 59 deletions tiledb/sm/array/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ Array::Array(
, encryption_key_(make_shared<EncryptionKey>(HERE()))
, is_open_(false)
, is_opening_or_closing_(false)
, timestamp_start_(0)
, timestamp_end_(UINT64_MAX)
, timestamp_end_opened_at_(UINT64_MAX)
, array_dir_timestamp_start_(0)
, user_set_timestamp_end_(UINT64_MAX)
, array_dir_timestamp_end_(UINT64_MAX)
, new_component_timestamp_(UINT64_MAX)
, storage_manager_(storage_manager)
, config_(storage_manager_->config())
, remote_(array_uri.is_tiledb())
Expand Down Expand Up @@ -232,8 +233,8 @@ Status Array::open(
uint32_t key_length) {
return Array::open(
query_type,
timestamp_start_,
timestamp_end_,
array_dir_timestamp_start_,
user_set_timestamp_end_,
encryption_type,
encryption_key,
key_length);
Expand All @@ -256,8 +257,9 @@ Status Array::open(
metadata_.clear();
metadata_loaded_ = false;
non_empty_domain_computed_ = false;
timestamp_start_ = timestamp_start;
timestamp_end_opened_at_ = timestamp_end;
array_dir_timestamp_start_ = timestamp_start;
array_dir_timestamp_end_ = timestamp_end;
new_component_timestamp_ = timestamp_end;
query_type_ = query_type;

/* Note: the open status MUST be exception safe. If anything interrupts the
Expand Down Expand Up @@ -329,14 +331,16 @@ Status Array::open(
throw StatusException(st);
}

if (timestamp_end_opened_at_ == UINT64_MAX) {
if (timestamp_end == UINT64_MAX) {
if (query_type == QueryType::READ) {
timestamp_end_opened_at_ = utils::time::timestamp_now_ms();
array_dir_timestamp_end_ = utils::time::timestamp_now_ms();
new_component_timestamp_ = 0;
} else if (
query_type == QueryType::WRITE ||
query_type == QueryType::MODIFY_EXCLUSIVE ||
query_type == QueryType::DELETE || query_type == QueryType::UPDATE) {
timestamp_end_opened_at_ = 0;
array_dir_timestamp_end_ = UINT64_MAX;
new_component_timestamp_ = 0;
} else {
throw Status_ArrayError("Cannot open array; Unsupported query type.");
}
Expand All @@ -351,29 +355,23 @@ Status Array::open(
if (!use_refactored_array_open()) {
auto&& [st, array_schema_latest] =
rest_client->get_array_schema_from_rest(array_uri_);
if (!st.ok()) {
throw StatusException(st);
}
throw_if_not_ok(st);
array_schema_latest_ = array_schema_latest.value();
} else {
auto st = rest_client->post_array_from_rest(array_uri_, this);
if (!st.ok()) {
throw StatusException(st);
}
throw_if_not_ok(rest_client->post_array_from_rest(array_uri_, this));
}
} else if (query_type == QueryType::READ) {
array_dir_ = ArrayDirectory(
storage_manager_->vfs(),
storage_manager_->compute_tp(),
array_uri_,
timestamp_start_,
timestamp_end_opened_at_);
array_dir_timestamp_start_,
array_dir_timestamp_end_);

auto&& [st, array_schema_latest, array_schemas, fragment_metadata] =
storage_manager_->array_open_for_reads(this);
if (!st.ok()) {
throw StatusException(st);
}
throw_if_not_ok(st);

// Set schemas
array_schema_latest_ = array_schema_latest.value();
array_schemas_all_ = array_schemas.value();
Expand All @@ -385,8 +383,8 @@ Status Array::open(
storage_manager_->vfs(),
storage_manager_->compute_tp(),
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
ArrayDirectoryMode::SCHEMA_ONLY);

auto&& [st, array_schema_latest, array_schemas] =
Expand All @@ -397,15 +395,17 @@ Status Array::open(
array_schema_latest_ = array_schema_latest.value();
array_schemas_all_ = array_schemas.value();

metadata_.reset(timestamp_end_opened_at_);
// Set the timestamp
metadata_.reset(timestamp_for_new_component());

} else if (
query_type == QueryType::DELETE || query_type == QueryType::UPDATE) {
array_dir_ = ArrayDirectory(
storage_manager_->vfs(),
storage_manager_->compute_tp(),
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
ArrayDirectoryMode::READ);

auto&& [st, array_schema_latest, array_schemas] =
Expand Down Expand Up @@ -436,7 +436,8 @@ Status Array::open(
return LOG_STATUS(Status_ArrayError(err.str()));
}

metadata_.reset(timestamp_end_opened_at_);
// Updates the timestamp to use for metadata.
metadata_.reset(timestamp_for_new_component());
} else {
throw Status_ArrayError("Cannot open array; Unsupported query type.");
}
Expand Down Expand Up @@ -476,13 +477,15 @@ Status Array::close() {
// metadata it won't trigger a deadlock
metadata_loaded_ = true;
auto rest_client = storage_manager_->rest_client();
if (rest_client == nullptr)
if (rest_client == nullptr) {
throw Status_ArrayError(
"Error closing array; remote array with no REST client.");
st = rest_client->post_array_metadata_to_rest(
array_uri_, timestamp_start_, timestamp_end_opened_at_, this);
if (!st.ok())
throw StatusException(st);
}
throw_if_not_ok(rest_client->post_array_metadata_to_rest(
array_uri_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
this));
}

// Storage manager does not own the array schema for remote arrays.
Expand Down Expand Up @@ -751,42 +754,54 @@ const EncryptionKey& Array::get_encryption_key() const {
}

Status Array::reopen() {
if (timestamp_end_ == timestamp_end_opened_at_) {
// Note: Timestamp will only reopen for reads. This is why we are checking the
// timestamp for the array directory and not new components. This needs to be
// updated if non-read timestamps are added to the ``reopen(timestamp_start,
// timestamp_end)`` below.

if (user_set_timestamp_end_ == array_dir_timestamp_end_) {
// The user has not set `timestamp_end_` since it was last opened.
// In this scenario, re-open at the current timestamp.
return reopen(timestamp_start_, utils::time::timestamp_now_ms());
return reopen(array_dir_timestamp_start_, utils::time::timestamp_now_ms());
} else {
// The user has set `timestamp_end_`. Reopen at that time stamp.
return reopen(timestamp_start_, timestamp_end_);
// The user has changed the end timestamp or it is set to UINT64_MAX. Reopen
// at the user set timestamp.
return reopen(array_dir_timestamp_start_, user_set_timestamp_end_);
}
}

Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) {
// Check the array was opened already in READ mode.
if (!is_open_) {
return LOG_STATUS(
Status_ArrayError("Cannot reopen array; Array is not open"));
}

if (query_type_ != QueryType::READ) {
return LOG_STATUS(
Status_ArrayError("Cannot reopen array; Array was "
"not opened in read mode"));
return LOG_STATUS(Status_ArrayError(
"Cannot reopen array; Array was not opened in read mode"));
}

// Update the user set timestamp and the timestamp range to pass to the array
// directory.
user_set_timestamp_end_ = timestamp_end;
array_dir_timestamp_start_ = timestamp_start;
array_dir_timestamp_end_ = timestamp_end == UINT64_MAX ?
utils::time::timestamp_now_ms() :
timestamp_end;

// Reset the last max buffer sizes.
clear_last_max_buffer_sizes();

timestamp_start_ = timestamp_start;
timestamp_end_opened_at_ = timestamp_end;
// Reopen metadata.
fragment_metadata_.clear();
metadata_.clear();
metadata_loaded_ = false;

// Reset the non-empty domain - may be different.
non_empty_domain_.clear();
non_empty_domain_computed_ = false;

if (timestamp_end_opened_at_ == UINT64_MAX) {
timestamp_end_opened_at_ = utils::time::timestamp_now_ms();
}

// Use open to reopen a remote array.
if (remote_) {
return open(
query_type_,
Expand All @@ -795,23 +810,23 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) {
encryption_key_->key().size());
}

// Reload the array directory in READ mode (reopen only supports reads).
try {
array_dir_ = ArrayDirectory(
storage_manager_->vfs(),
storage_manager_->compute_tp(),
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
query_type_ == QueryType::READ ? ArrayDirectoryMode::READ :
ArrayDirectoryMode::SCHEMA_ONLY);
array_dir_timestamp_start_,
array_dir_timestamp_end_,
ArrayDirectoryMode::READ);
} catch (const std::logic_error& le) {
return LOG_STATUS(Status_ArrayDirectoryError(le.what()));
}

// Reopen the array and update private variables.
auto&& [st, array_schema_latest, array_schemas, fragment_metadata] =
storage_manager_->array_reopen(this);
RETURN_NOT_OK(st);

array_schema_latest_ = array_schema_latest.value();
array_schemas_all_ = array_schemas.value();
fragment_metadata_ = fragment_metadata.value();
Expand All @@ -820,25 +835,31 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) {
}

Status Array::set_timestamp_start(const uint64_t timestamp_start) {
timestamp_start_ = timestamp_start;
array_dir_timestamp_start_ = timestamp_start;
return Status::Ok();
}

uint64_t Array::timestamp_start() const {
return timestamp_start_;
return array_dir_timestamp_start_;
}

Status Array::set_timestamp_end(const uint64_t timestamp_end) {
timestamp_end_ = timestamp_end;
user_set_timestamp_end_ = timestamp_end;
return Status::Ok();
}

uint64_t Array::timestamp_end() const {
return timestamp_end_;
return user_set_timestamp_end_;
}

uint64_t Array::timestamp_end_opened_at() const {
return timestamp_end_opened_at_;
return query_type_ == QueryType::READ ? array_dir_timestamp_end_ :
new_component_timestamp_;
}

uint64_t Array::timestamp_for_new_component() const {
return new_component_timestamp_ == 0 ? utils::time::timestamp_now_ms() :
new_component_timestamp_;
}

Status Array::set_config(Config config) {
Expand Down Expand Up @@ -1219,7 +1240,10 @@ Status Array::load_metadata() {
"Cannot load metadata; remote array with no REST client."));
}
RETURN_NOT_OK(rest_client->get_array_metadata_from_rest(
array_uri_, timestamp_start_, timestamp_end_opened_at_, this));
array_uri_,
array_dir_timestamp_start_,
timestamp_end_opened_at(),
this));
} else {
assert(array_dir_.loaded());
storage_manager_->load_array_metadata(
Expand All @@ -1237,7 +1261,7 @@ Status Array::load_remote_non_empty_domain() {
"Cannot load metadata; remote array with no REST client."));
}
RETURN_NOT_OK(rest_client->get_array_non_empty_domain(
this, timestamp_start_, timestamp_end_opened_at_));
this, array_dir_timestamp_start_, timestamp_end_opened_at()));
non_empty_domain_computed_ = true;
}
return Status::Ok();
Expand Down
Loading

0 comments on commit 0da5d8a

Please sign in to comment.