Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split timestamp_end_opened_at_ into 2 separate timestamps #3633

Merged
merged 1 commit into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 95 additions & 60 deletions tiledb/sm/array/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ Array::Array(
, encryption_key_(make_shared<EncryptionKey>(HERE()))
, is_open_(false)
, is_opening_or_closing_(false)
, timestamp_start_(0)
, timestamp_end_(UINT64_MAX)
, timestamp_end_opened_at_(UINT64_MAX)
, array_dir_timestamp_start_(0)
, user_set_timestamp_end_(nullopt)
, array_dir_timestamp_end_(UINT64_MAX)
, new_component_timestamp_(nullopt)
, storage_manager_(storage_manager)
, resources_(storage_manager_->resources())
, config_(resources_.config())
Expand Down Expand Up @@ -244,8 +245,8 @@ Status Array::open(
uint32_t key_length) {
return Array::open(
query_type,
timestamp_start_,
timestamp_end_,
array_dir_timestamp_start_,
user_set_timestamp_end_.value_or(UINT64_MAX),
encryption_type,
encryption_key,
key_length);
Expand All @@ -264,16 +265,17 @@ Status Array::open(
// Checks
if (is_open()) {
return LOG_STATUS(
Status_ArrayError("Cannot open array; Array already open"));
Status_ArrayError("Cannot open array; Array already open."));
}

metadata_.clear();
metadata_loaded_ = false;
non_empty_domain_computed_ = false;
timestamp_start_ = timestamp_start;
timestamp_end_opened_at_ = timestamp_end;
query_type_ = query_type;

set_timestamps(
timestamp_start, timestamp_end, query_type_ == QueryType::READ);

/* Note: the open status MUST be exception safe. If anything interrupts the
* opening process, it will throw and the array will be set as closed. */
try {
Expand Down Expand Up @@ -343,19 +345,6 @@ Status Array::open(
throw StatusException(st);
}

if (timestamp_end_opened_at_ == UINT64_MAX) {
if (query_type == QueryType::READ) {
timestamp_end_opened_at_ = utils::time::timestamp_now_ms();
} else if (
query_type == QueryType::WRITE ||
query_type == QueryType::MODIFY_EXCLUSIVE ||
query_type == QueryType::DELETE || query_type == QueryType::UPDATE) {
timestamp_end_opened_at_ = 0;
} else {
throw ArrayException("Cannot open array; Unsupported query type.");
}
}

if (remote_) {
auto rest_client = resources_.rest_client();
if (rest_client == nullptr) {
Expand All @@ -365,9 +354,7 @@ Status Array::open(
if (!use_refactored_array_open()) {
auto&& [st, array_schema_latest] =
rest_client->get_array_schema_from_rest(array_uri_);
if (!st.ok()) {
throw StatusException(st);
}
throw_if_not_ok(st);
array_schema_latest_ = array_schema_latest.value();
} else {
auto st = rest_client->post_array_from_rest(
Expand All @@ -381,7 +368,10 @@ Status Array::open(
auto timer_se =
resources_.stats().start_timer("array_open_read_load_directory");
array_dir_ = ArrayDirectory(
resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_);
resources_,
array_uri_,
array_dir_timestamp_start_,
array_dir_timestamp_end_);
}
std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) =
open_for_reads();
Expand All @@ -394,8 +384,8 @@ Status Array::open(
array_dir_ = ArrayDirectory(
resources_,
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
ArrayDirectoryMode::SCHEMA_ONLY);
}
auto&& [st, array_schema_latest, array_schemas] = open_for_writes();
Expand All @@ -405,7 +395,9 @@ Status Array::open(
array_schema_latest_ = array_schema_latest.value();
array_schemas_all_ = array_schemas.value();

metadata_.reset(timestamp_end_opened_at_);
// Set the timestamp
metadata_.reset(timestamp_for_new_component());

} else if (
query_type == QueryType::DELETE || query_type == QueryType::UPDATE) {
{
Expand All @@ -414,8 +406,8 @@ Status Array::open(
array_dir_ = ArrayDirectory(
resources_,
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
ArrayDirectoryMode::READ);
}
auto&& [st, array_schema_latest, array_schemas] = open_for_writes();
Expand Down Expand Up @@ -445,7 +437,8 @@ Status Array::open(
return LOG_STATUS(Status_ArrayError(err.str()));
}

metadata_.reset(timestamp_end_opened_at_);
// Updates the timestamp to use for metadata.
metadata_.reset(timestamp_for_new_component());
} else {
throw ArrayException("Cannot open array; Unsupported query type.");
}
Expand Down Expand Up @@ -485,13 +478,15 @@ Status Array::close() {
// metadata it won't trigger a deadlock
metadata_loaded_ = true;
auto rest_client = resources_.rest_client();
if (rest_client == nullptr)
if (rest_client == nullptr) {
throw Status_ArrayError(
"Error closing array; remote array with no REST client.");
st = rest_client->post_array_metadata_to_rest(
array_uri_, timestamp_start_, timestamp_end_opened_at_, this);
if (!st.ok())
throw StatusException(st);
}
throw_if_not_ok(rest_client->post_array_metadata_to_rest(
array_uri_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
this));
}

// Storage manager does not own the array schema for remote arrays.
Expand All @@ -515,7 +510,6 @@ Status Array::close() {
array_schemas_all_.clear();
metadata_.clear();
metadata_loaded_ = false;

} catch (std::exception& e) {
is_opening_or_closing_ = false;
throw Status_ArrayError(e.what());
Expand Down Expand Up @@ -623,8 +617,8 @@ std::vector<shared_ptr<const Enumeration>> Array::get_enumerations(

loaded = rest_client->post_enumerations_from_rest(
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
this,
names_to_load);
} else {
Expand Down Expand Up @@ -825,43 +819,58 @@ const EncryptionKey& Array::get_encryption_key() const {
}

// Reopens the array without explicit timestamps: picks the end timestamp to
// use and delegates to `reopen(timestamp_start, timestamp_end)`.
Status Array::reopen() {
if (timestamp_end_ == timestamp_end_opened_at_) {
// The user has not set `timestamp_end_` since it was last opened.
// In this scenario, re-open at the current timestamp.
return reopen(timestamp_start_, utils::time::timestamp_now_ms());
// Note: Array will only reopen for reads. This is why we are checking the
// timestamp for the array directory and not new components. This needs to be
// updated if non-read reopens are allowed.
if (!user_set_timestamp_end_.has_value() ||
user_set_timestamp_end_.value() == array_dir_timestamp_end_) {
// The user has not set an end timestamp, or the user-set end timestamp is
// equal to the end timestamp currently used for the array directory. In this
// scenario, re-open at the current timestamp.
return reopen(array_dir_timestamp_start_, utils::time::timestamp_now_ms());
} else {
// The user has set `timestamp_end_`. Reopen at that time stamp.
return reopen(timestamp_start_, timestamp_end_);
// The user has changed the end timestamp. Reopen at the user set timestamp.
return reopen(array_dir_timestamp_start_, user_set_timestamp_end_.value());
}
}

Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) {
auto timer = resources_.stats().start_timer("array_reopen");
// Check the array was opened already in READ mode.
if (!is_open_) {
return LOG_STATUS(
Status_ArrayError("Cannot reopen array; Array is not open"));
}

if (query_type_ != QueryType::READ) {
return LOG_STATUS(
Status_ArrayError("Cannot reopen array; Array was "
"not opened in read mode"));
return LOG_STATUS(Status_ArrayError(
"Cannot reopen array; Array was not opened in read mode"));
}

// Update the user set timestamp and the timestamp range to pass to the array
// directory.
if (timestamp_end == UINT64_MAX) {
user_set_timestamp_end_ = nullopt;
array_dir_timestamp_end_ = utils::time::timestamp_now_ms();

} else {
user_set_timestamp_end_ = timestamp_end;
array_dir_timestamp_end_ = timestamp_end;
}
array_dir_timestamp_start_ = timestamp_start;

// Reset the last max buffer sizes.
clear_last_max_buffer_sizes();

timestamp_start_ = timestamp_start;
timestamp_end_opened_at_ = timestamp_end;
// Reopen metadata.
fragment_metadata_.clear();
metadata_.clear();
metadata_loaded_ = false;

// Reset the non-empty domain - may be different.
non_empty_domain_.clear();
non_empty_domain_computed_ = false;

if (timestamp_end_opened_at_ == UINT64_MAX) {
timestamp_end_opened_at_ = utils::time::timestamp_now_ms();
}

// Use open to reopen a remote array.
if (remote_) {
try {
set_array_closed();
Expand All @@ -878,21 +887,23 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) {
encryption_key_->key().size());
}

// Reload the array directory in READ mode (reopen only supports reads).
try {
{
auto timer_se = resources_.stats().start_timer("array_reopen_directory");
array_dir_ = ArrayDirectory(
resources_,
array_uri_,
timestamp_start_,
timestamp_end_opened_at_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
query_type_ == QueryType::READ ? ArrayDirectoryMode::READ :
ArrayDirectoryMode::SCHEMA_ONLY);
}
} catch (const std::logic_error& le) {
return LOG_STATUS(Status_ArrayDirectoryError(le.what()));
}

// Reopen the array and update private variables.
std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) =
open_for_reads();

Expand Down Expand Up @@ -1137,6 +1148,19 @@ bool Array::serialize_metadata() const {
return serialize_metadata_array_open;
}

/**
 * Stores the timestamp range used for the array directory and the timestamp
 * used for new components.
 *
 * @param timestamp_start Start of the array directory timestamp range.
 * @param timestamp_end End of the range. `UINT64_MAX` is treated as "not
 *     set"; both `0` and `UINT64_MAX` leave the new-component timestamp
 *     empty.
 * @param set_current_time If true and `timestamp_end` is `UINT64_MAX`, the
 *     array directory end timestamp is set to the current time in ms.
 */
void Array::set_timestamps(
    uint64_t timestamp_start, uint64_t timestamp_end, bool set_current_time) {
  const bool end_is_max = (timestamp_end == UINT64_MAX);

  array_dir_timestamp_start_ = timestamp_start;

  // Only an unset end timestamp may be replaced by the current time.
  if (set_current_time && end_is_max) {
    array_dir_timestamp_end_ = utils::time::timestamp_now_ms();
  } else {
    array_dir_timestamp_end_ = timestamp_end;
  }

  // The sentinel values 0 and UINT64_MAX carry no explicit timestamp for new
  // components.
  if (timestamp_end == 0 || end_is_max) {
    new_component_timestamp_ = nullopt;
    return;
  }
  new_component_timestamp_ = timestamp_end;
}

bool Array::use_refactored_array_open() const {
auto found = false;
auto refactored_array_open = false;
Expand All @@ -1151,6 +1175,10 @@ bool Array::use_refactored_array_open() const {
return refactored_array_open || use_refactored_query_submit();
}

/**
 * Returns the timestamp to use for a new component.
 *
 * @return The stored new-component timestamp when one is set, otherwise the
 *     current time in milliseconds.
 */
uint64_t Array::timestamp_for_new_component() const {
  // `value_or(f())` would evaluate `f()` even when a value is present; branch
  // explicitly so the clock is read only when it is needed.
  if (new_component_timestamp_.has_value()) {
    return new_component_timestamp_.value();
  }
  return utils::time::timestamp_now_ms();
}

bool Array::use_refactored_query_submit() const {
auto found = false;
auto refactored_query_submit = false;
Expand Down Expand Up @@ -1508,7 +1536,10 @@ Status Array::load_metadata() {
"Cannot load metadata; remote array with no REST client."));
}
RETURN_NOT_OK(rest_client->get_array_metadata_from_rest(
array_uri_, timestamp_start_, timestamp_end_opened_at_, this));
array_uri_,
array_dir_timestamp_start_,
timestamp_end_opened_at(),
this));
} else {
do_load_metadata();
}
Expand All @@ -1524,7 +1555,7 @@ Status Array::load_remote_non_empty_domain() {
"Cannot load metadata; remote array with no REST client."));
}
RETURN_NOT_OK(rest_client->get_array_non_empty_domain(
this, timestamp_start_, timestamp_end_opened_at_));
this, array_dir_timestamp_start_, timestamp_end_opened_at()));
non_empty_domain_computed_ = true;
}
return Status::Ok();
Expand All @@ -1546,7 +1577,11 @@ ArrayDirectory& Array::load_array_directory() {
ArrayDirectoryMode::READ;

array_dir_ = ArrayDirectory(
resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_, mode);
resources_,
array_uri_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
mode);

return array_dir_;
}
Expand Down
Loading
Loading