Skip to content

Commit

Permalink
Do more precise checking of when all recovered changesets have been u…
Browse files Browse the repository at this point in the history
…ploaded
  • Loading branch information
tgoyne committed Aug 9, 2024
1 parent 1f7c86a commit edc1174
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 197 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* `Realm::convert()` would sometimes incorrectly throw an exception claiming that there were unuploaded local changes when the source Realm is a synchronized Realm ([#7966](https://github.com/realm/realm-core/issues/7966), since v10.7.0).
* Automatic client reset handling now reports download completion as soon as all changes from the newly downloaded file have been applied to the main Realm file rather than at an inconsistent time afterwards ([PR #7921](https://github.com/realm/realm-core/pull/7921)).
* Cycle detection for automatic client reset handling would sometimes consider two client resets in a row to be a cycle even when the first reset did not recover any changes and so could not have triggered the second. ([PR #7921](https://github.com/realm/realm-core/pull/7921)).
* Cycle detection for automatic client reset handling is now more precise. Previously errors which occurred after all recovered changesets were uploaded would sometimes be incorrectly considered a cycle and skip automatic handling. ([PR #7921](https://github.com/realm/realm-core/pull/7921)).

### Breaking changes
* None.
Expand Down
5 changes: 5 additions & 0 deletions src/realm/object-store/shared_realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,11 @@ void Realm::convert(const Config& config, bool merge_into_existing)
}
}

bool Realm::has_pending_unuploaded_changes() const noexcept
{
return !m_transaction->get_history()->no_pending_local_changes(m_transaction->get_version());
}

OwnedBinaryData Realm::write_copy()
{
verify_thread();
Expand Down
2 changes: 2 additions & 0 deletions src/realm/object-store/shared_realm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ class Realm : public std::enable_shared_from_this<Realm> {
// Returns `true` if the Realm is frozen, `false` otherwise.
bool is_frozen() const;

bool has_pending_unuploaded_changes() const noexcept;

// Returns true if the Realm is either in a read or frozen transaction
bool is_in_read_transaction() const
{
Expand Down
5 changes: 1 addition & 4 deletions src/realm/sync/network/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1414,10 +1414,7 @@ class Service::Impl {
bool resolver_thread_started = m_resolver_thread.joinable();
if (resolver_thread_started)
return;
auto func = [this]() noexcept {
resolver_thread();
};
m_resolver_thread = std::thread{std::move(func)};
m_resolver_thread = std::thread{&Impl::resolver_thread, this};
}

void add_wait_oper(LendersWaitOperPtr op)
Expand Down
6 changes: 2 additions & 4 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void ClientHistory::set_history_adjustments(
for (size_t i = 0, size = m_arrays->remote_versions.size(); i < size; ++i) {
m_arrays->remote_versions.set(i, server_version.version);
version_type version = m_sync_history_base_version + i;
logger.debug("Updating %1: client_version(%2) changeset_size(%3) server_version(%4)", i, version + 1,
logger.debug("Updating %1: client_version(%2) changeset_size(%3) server_version(%4)", i, version,
m_arrays->changesets.get(i).size(), server_version.version);
}
}
Expand Down Expand Up @@ -956,9 +956,7 @@ void ClientHistory::update_sync_progress(const SyncProgress& progress, Downloada
// server. The situation we want to avoid is that a recovery itself causes another reset
// which creates a reset cycle. However, at this point, upload progress has been made
// and we can remove the cycle detection flag if there is one.
if (PendingResetStore::clear_pending_reset(*m_group)) {
logger.info(util::LogCategory::reset, "Clearing pending reset tracker after upload completion.");
}
PendingResetStore::remove_if_complete(*m_group, progress.upload.client_version, logger);
}

m_progress_download = progress.download;
Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,8 +978,6 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
if (m_websocket_error_received)
return;

m_sending_session = sess;
m_sending = true;
m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
if (sentinel->destroyed) {
return;
Expand All @@ -993,6 +991,8 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
}
handle_write_message(); // Throws
}); // Throws
m_sending_session = sess;
m_sending = true;
}


Expand Down
6 changes: 6 additions & 0 deletions src/realm/sync/noinst/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,12 @@ bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, ut
"Immediately removing client reset tracker as there are no recovered changesets to upload.");
sync::PendingResetStore::clear_pending_reset(*wt_local);
}
else {
logger.debug(util::LogCategory::reset,
"Marking %1 as the version which must be uploaded to complete client reset recovery.",
recovered.back().version);
sync::PendingResetStore::set_recovered_version(*wt_local, recovered.back().version);
}

wt_local->commit_and_continue_as_read();

Expand Down
195 changes: 120 additions & 75 deletions src/realm/sync/noinst/pending_reset_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,117 @@ bool operator==(const sync::PendingReset& lhs, const PendingReset::Action& actio
return lhs.action == action;
}

namespace {
// A table without a "class_" prefix will not generate sync instructions.
constexpr static std::string_view s_meta_reset_table_name("client_reset_metadata");
constexpr static std::string_view s_version_col_name("core_version");
constexpr static std::string_view s_timestamp_col_name("time");
constexpr static std::string_view s_reset_recovery_mode_col_name("mode");
constexpr static std::string_view s_reset_action_col_name("action");
constexpr static std::string_view s_reset_error_code_col_name("error_code");
constexpr static std::string_view s_reset_error_msg_col_name("error_msg");

bool PendingResetStore::clear_pending_reset(Group& group)
constexpr std::string_view s_meta_reset_table_name("client_reset_metadata");
constexpr std::string_view s_core_version_col_name("core_version");
constexpr std::string_view s_recovered_version_col_name("recovered_version");
constexpr std::string_view s_timestamp_col_name("time");
constexpr std::string_view s_reset_recovery_mode_col_name("mode");
constexpr std::string_view s_reset_action_col_name("action");
constexpr std::string_view s_reset_error_code_col_name("error_code");
constexpr std::string_view s_reset_error_msg_col_name("error_msg");

int64_t from_reset_action(PendingReset::Action action)
{
switch (action) {
case PendingReset::Action::ClientReset:
return 1;
case PendingReset::Action::ClientResetNoRecovery:
return 2;
case PendingReset::Action::MigrateToFLX:
return 3;
case PendingReset::Action::RevertToPBS:
return 4;
default:
throw ClientResetFailed(util::format("Unsupported client reset action: %1 for pending reset", action));
}
}

PendingReset::Action to_reset_action(int64_t action)
{
switch (action) {
case 1:
return PendingReset::Action::ClientReset;
case 2:
return PendingReset::Action::ClientResetNoRecovery;
case 3:
return PendingReset::Action::MigrateToFLX;
case 4:
return PendingReset::Action::RevertToPBS;
default:
return PendingReset::Action::NoAction;
}
}

ClientResyncMode to_resync_mode(int64_t mode)
{
// Retains compatibility with v1
// RecoverOrDiscard is treated as Recover and is not stored
switch (mode) {
case 0: // DiscardLocal
return ClientResyncMode::DiscardLocal;
case 1: // Recover
return ClientResyncMode::Recover;
default:
throw ClientResetFailed(util::format("Unsupported client reset resync mode: %1 for pending reset", mode));
}
}

int64_t from_resync_mode(ClientResyncMode mode)
{
// Retains compatibility with v1
switch (mode) {
case ClientResyncMode::DiscardLocal:
return 0; // DiscardLocal
case ClientResyncMode::RecoverOrDiscard:
[[fallthrough]]; // RecoverOrDiscard is treated as Recover
case ClientResyncMode::Recover:
return 1; // Recover
default:
throw ClientResetFailed(util::format("Unsupported client reset resync mode: %1 for pending reset", mode));
}
}
} // namespace

void PendingResetStore::clear_pending_reset(Group& group)
{
if (auto table = group.get_table(s_meta_reset_table_name); table && !table->is_empty()) {
table->clear();
return true;
}
return false;
}

void PendingResetStore::remove_if_complete(Group& group, version_type version, util::Logger& logger)
{
auto table = group.get_table(s_meta_reset_table_name);
if (!table || table->is_empty())
return;

auto reset_store = PendingResetStore::load_schema(group);
if (!reset_store) {
logger.info(util::LogCategory::reset, "Clearing pending reset tracker created by different core version.");
table->clear();
return;
}

auto reset_entry = *table->begin();
if (reset_entry.get<String>(reset_store->m_core_version) != REALM_VERSION_STRING) {
logger.info(util::LogCategory::reset, "Clearing pending reset tracker created by different core version.");
table->clear();
return;
}

auto target_version = version_type(reset_entry.get<Int>(reset_store->m_recovered_version));
if (target_version > version) {
logger.detail(util::LogCategory::reset, "Pending reset not complete: uploaded %1 but need to reach %2",
version, target_version);
return;
}

logger.info(util::LogCategory::reset,
"Clearing pending reset tracker after upload of version %1 has been acknowledged by server.",
target_version);
table->clear();
}

std::optional<PendingReset> PendingResetStore::has_pending_reset(const Group& group)
Expand All @@ -86,7 +181,7 @@ std::optional<PendingReset> PendingResetStore::has_pending_reset(const Group& gr
return std::nullopt;
}
auto reset_entry = *table->begin();
if (reset_entry.get<String>(reset_store->m_version) != REALM_VERSION_STRING) {
if (reset_entry.get<String>(reset_store->m_core_version) != REALM_VERSION_STRING) {
// Previous pending reset was written by a different version, so ignore it
return std::nullopt;
}
Expand All @@ -113,7 +208,7 @@ void PendingResetStore::track_reset(Group& group, ClientResyncMode mode, Pending
REALM_ASSERT(table);
table->clear();
table->create_object(null_key, {
{reset_store.m_version, Mixed(REALM_VERSION_STRING)},
{reset_store.m_core_version, Mixed(REALM_VERSION_STRING)},
{reset_store.m_timestamp, Timestamp(std::chrono::system_clock::now())},
{reset_store.m_recovery_mode, from_resync_mode(mode)},
{reset_store.m_action, from_reset_action(action)},
Expand All @@ -122,12 +217,23 @@ void PendingResetStore::track_reset(Group& group, ClientResyncMode mode, Pending
});
}

void PendingResetStore::set_recovered_version(Group& group, version_type version)
{
auto reset_store = PendingResetStore::load_schema(group);
REALM_ASSERT(reset_store);
auto table = group.get_table(reset_store->m_pending_reset_table);
REALM_ASSERT(table);
REALM_ASSERT(!table->is_empty());
table->begin()->set(reset_store->m_recovered_version, int64_t(version));
}

PendingResetStore::PendingResetStore(const Group& g)
: m_internal_tables{
{&m_pending_reset_table,
s_meta_reset_table_name,
{
{&m_version, s_version_col_name, type_String},
{&m_core_version, s_core_version_col_name, type_String},
{&m_recovered_version, s_recovered_version_col_name, type_Int},
{&m_timestamp, s_timestamp_col_name, type_Timestamp},
{&m_recovery_mode, s_reset_recovery_mode_col_name, type_Int},
{&m_action, s_reset_action_col_name, type_Int},
Expand Down Expand Up @@ -164,65 +270,4 @@ PendingResetStore PendingResetStore::load_or_create_schema(Group& group)
return reset_store;
}

int64_t PendingResetStore::from_reset_action(PendingReset::Action action)
{
switch (action) {
case PendingReset::Action::ClientReset:
return 1;
case PendingReset::Action::ClientResetNoRecovery:
return 2;
case PendingReset::Action::MigrateToFLX:
return 3;
case PendingReset::Action::RevertToPBS:
return 4;
default:
throw ClientResetFailed(util::format("Unsupported client reset action: %1 for pending reset", action));
}
}

PendingReset::Action PendingResetStore::to_reset_action(int64_t action)
{
switch (action) {
case 1:
return PendingReset::Action::ClientReset;
case 2:
return PendingReset::Action::ClientResetNoRecovery;
case 3:
return PendingReset::Action::MigrateToFLX;
case 4:
return PendingReset::Action::RevertToPBS;
default:
return PendingReset::Action::NoAction;
}
}

ClientResyncMode PendingResetStore::to_resync_mode(int64_t mode)
{
// Retains compatibility with v1
// RecoverOrDiscard is treated as Recover and is not stored
switch (mode) {
case 0: // DiscardLocal
return ClientResyncMode::DiscardLocal;
case 1: // Recover
return ClientResyncMode::Recover;
default:
throw ClientResetFailed(util::format("Unsupported client reset resync mode: %1 for pending reset", mode));
}
}

int64_t PendingResetStore::from_resync_mode(ClientResyncMode mode)
{
// Retains compatibility with v1
switch (mode) {
case ClientResyncMode::DiscardLocal:
return 0; // DiscardLocal
case ClientResyncMode::RecoverOrDiscard:
[[fallthrough]]; // RecoverOrDiscard is treated as Recover
case ClientResyncMode::Recover:
return 1; // Recover
default:
throw ClientResetFailed(util::format("Unsupported client reset resync mode: %1 for pending reset", mode));
}
}

} // namespace realm::sync
22 changes: 12 additions & 10 deletions src/realm/sync/noinst/pending_reset_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,30 @@ bool operator==(const sync::PendingReset& lhs, const PendingReset::Action& actio

class PendingResetStore {
public:
// Store the pending reset tracking information - it is an error if the tracking info already
// exists in the store
// Store the pending reset tracking information. Any pre-existing tracking
// will be deleted and replaced with this.
// Requires a writable transaction and changes must be committed manually
static void track_reset(Group& group, ClientResyncMode mode, PendingReset::Action action, Status error);
// Record the version of the final recovered changeset that must be uploaded
// for a client reset to be complete. Not called for DiscardLocal or if there
// was nothing to recover.
static void set_recovered_version(Group&, version_type);
// Clear the pending reset tracking information, if it exists
// Requires a writable transaction and changes must be committed manually
// Returns true if there was anything to remove
static bool clear_pending_reset(Group& group);
static void clear_pending_reset(Group& group);
// Remove the pending reset tracking information if it exists and the version
// set with set_recovered_version() is less than or equal to version.
static void remove_if_complete(Group& group, version_type version, util::Logger&);
static std::optional<PendingReset> has_pending_reset(const Group& group);

static int64_t from_reset_action(PendingReset::Action action);
static PendingReset::Action to_reset_action(int64_t action);
static ClientResyncMode to_resync_mode(int64_t mode);
static int64_t from_resync_mode(ClientResyncMode mode);

private:
// The instantiated class is only used internally
PendingResetStore(const Group& group);

std::vector<SyncMetadataTable> m_internal_tables;
TableKey m_pending_reset_table;
ColKey m_version;
ColKey m_core_version;
ColKey m_recovered_version;
ColKey m_timestamp;
ColKey m_recovery_mode;
ColKey m_action;
Expand Down
Loading

0 comments on commit edc1174

Please sign in to comment.