Skip to content

Commit

Permalink
Treat completing a client reset as receiving a MARK message
Browse files Browse the repository at this point in the history
  • Loading branch information
tgoyne committed Aug 5, 2024
1 parent bf62783 commit 329c1c6
Show file tree
Hide file tree
Showing 22 changed files with 414 additions and 303 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.
* Automatic client reset handling now reports download completion as soon as all changes from the newly downloaded file have been applied to the main Realm file rather than at an inconsistent time afterwards ([PR #7921](https://github.com/realm/realm-core/pull/7921)).
* Cycle detection for automatic client reset handling would sometimes consider two client resets in a row to be a cycle even when the first reset did not recover any changes and so could not have triggered the second. ([PR #7921](https://github.com/realm/realm-core/pull/7921)).

### Breaking changes
* None.
Expand Down
59 changes: 5 additions & 54 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
// Can be called from any thread.
util::Future<std::string> send_test_command(std::string body);

void handle_pending_client_reset_acknowledgement();

// Can be called from any thread.
std::string get_appservices_connection_id();

Expand Down Expand Up @@ -779,14 +777,6 @@ void SessionImpl::on_resumed()
}
}

void SessionImpl::handle_pending_client_reset_acknowledgement()
{
// Ignore the call if the session is not active
if (m_state == State::Active) {
m_wrapper.handle_pending_client_reset_acknowledgement();
}
}

bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
{
// Ignore the message if the session is not active or a steady state message
Expand Down Expand Up @@ -1354,8 +1344,12 @@ void SessionWrapper::actualize()
}
}

if (!m_client_reset_config)
if (!m_client_reset_config) {
check_progress(); // Throws
if (auto pending_reset = PendingResetStore::has_pending_reset(*m_db->start_frozen())) {
m_sess->logger.info(util::LogCategory::reset, "Found pending client reset tracker: %1", *pending_reset);
}
}
}

void SessionWrapper::force_close()
Expand Down Expand Up @@ -1651,49 +1645,6 @@ util::Future<std::string> SessionWrapper::send_test_command(std::string body)
return m_sess->send_test_command(std::move(body));
}

void SessionWrapper::handle_pending_client_reset_acknowledgement()
{
REALM_ASSERT(!m_finalized);

auto has_pending_reset = PendingResetStore::has_pending_reset(*m_db->start_frozen());
if (!has_pending_reset) {
return; // nothing to do
}

m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);

// Now that the client reset merge is complete, wait for the changes to synchronize with the server
async_wait_for(
true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
if (status == ErrorCodes::OperationAborted) {
return;
}
auto& logger = self->m_sess->logger;
if (!status.is_ok()) {
logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
status);
return;
}

logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);

auto tr = self->m_db->start_write();
auto cur_pending_reset = PendingResetStore::has_pending_reset(*tr);
if (!cur_pending_reset) {
logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
return;
}
if (*cur_pending_reset == pending_reset) {
logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
}
else {
logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
}
PendingResetStore::clear_pending_reset(*tr);
tr->commit();
});
}

std::string SessionWrapper::get_appservices_connection_id()
{
auto pf = util::make_promise_future<std::string>();
Expand Down
2 changes: 0 additions & 2 deletions src/realm/sync/network/default_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
initiate_resolve();
}

virtual ~DefaultWebSocketImpl() = default;

void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
{
m_websocket.async_write_binary(data.data(), data.size(),
Expand Down
28 changes: 21 additions & 7 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <realm/sync/instruction_replication.hpp>
#include <realm/sync/noinst/client_reset.hpp>
#include <realm/sync/noinst/client_reset_recovery.hpp>
#include <realm/sync/noinst/pending_reset_store.hpp>
#include <realm/transaction.hpp>
#include <realm/util/compression.hpp>
#include <realm/util/features.h>
Expand Down Expand Up @@ -335,16 +336,15 @@ void ClientHistory::set_client_file_ident(SaltedFileIdent client_file_ident, boo
}


// Overriding member function in realm::sync::ClientHistoryBase
void ClientHistory::set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes,
VersionInfo& version_info)
VersionInfo& version_info, util::Logger& logger)
{
TransactionRef wt = m_db->start_write(); // Throws
version_type local_version = wt->get_version();
ensure_updated(local_version); // Throws
prepare_for_write(); // Throws

update_sync_progress(progress, downloadable_bytes); // Throws
update_sync_progress(progress, downloadable_bytes, logger); // Throws

// Note: This transaction produces an empty changeset. Empty changesets are
// not uploaded to the server.
Expand Down Expand Up @@ -489,17 +489,17 @@ void ClientHistory::integrate_server_changesets(
// During the bootstrap phase in flexible sync, the server sends multiple download messages with the same
// synthetic server version that represents synthetic changesets generated from state on the server.
if (batch_state == DownloadBatchState::LastInBatch && changesets_to_integrate.empty()) {
update_sync_progress(progress, downloadable_bytes); // Throws
update_sync_progress(progress, downloadable_bytes, logger); // Throws
}
// Always update progress for download messages from steady state.
else if (batch_state == DownloadBatchState::SteadyState && !changesets_to_integrate.empty()) {
auto partial_progress = progress;
partial_progress.download.server_version = last_changeset.remote_version;
partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version;
update_sync_progress(partial_progress, downloadable_bytes); // Throws
update_sync_progress(partial_progress, downloadable_bytes, logger); // Throws
}
else if (batch_state == DownloadBatchState::SteadyState && changesets_to_integrate.empty()) {
update_sync_progress(progress, downloadable_bytes); // Throws
update_sync_progress(progress, downloadable_bytes, logger); // Throws
}
if (run_in_write_tr) {
run_in_write_tr(*transact, changesets_for_cb);
Expand Down Expand Up @@ -876,7 +876,8 @@ void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
}


void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes)
void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes,
util::Logger& logger)
{
Array& root = m_arrays->root;

Expand Down Expand Up @@ -947,6 +948,19 @@ void ClientHistory::update_sync_progress(const SyncProgress& progress, Downloada
root.set(s_progress_uploaded_bytes_iip,
RefOrTagged::make_tagged(uploaded_bytes)); // Throws

if (previous_upload_client_version < progress.upload.client_version) {
// This is part of the client reset cycle detection.
// A client reset operation will write a flag to an internal table indicating that
// the changes there are a result of a successful reset. However, it is not possible to
// know if a recovery has been successful until the changes have been acknowledged by the
// server. The situation we want to avoid is that a recovery itself causes another reset
// which creates a reset cycle. However, at this point, upload progress has been made
// and we can remove the cycle detection flag if there is one.
if (PendingResetStore::clear_pending_reset(*m_group)) {
logger.info(util::LogCategory::reset, "Clearing pending reset tracker after upload completion.");
}
}

m_progress_download = progress.download;

trim_sync_history(); // Throws
Expand Down
5 changes: 3 additions & 2 deletions src/realm/sync/noinst/client_history_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ class ClientHistory final : public _impl::History, public TransformHistory {
/// \param downloadable_bytes If specified, and if the implementation cares
/// about byte-level progress, this function updates the persistent record
/// of the estimate of the number of remaining bytes to be downloaded.
void set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, VersionInfo&);
void set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, VersionInfo&,
util::Logger& logger);

/// \brief Scan through the history for changesets to be uploaded.
///
Expand Down Expand Up @@ -421,7 +422,7 @@ class ClientHistory final : public _impl::History, public TransformHistory {
void prepare_for_write();
Replication::version_type add_changeset(BinaryData changeset, BinaryData sync_changeset);
void add_sync_history_entry(const HistoryEntry&);
void update_sync_progress(const SyncProgress&, DownloadableProgress downloadable_bytes);
void update_sync_progress(const SyncProgress&, DownloadableProgress downloadable_bytes, util::Logger& logger);
void trim_ct_history();
void trim_sync_history();
void do_trim_sync_history(std::size_t n);
Expand Down
27 changes: 13 additions & 14 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,8 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
if (m_websocket_error_received)
return;

m_sending_session = sess;
m_sending = true;
m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
if (sentinel->destroyed) {
return;
Expand All @@ -991,8 +993,6 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
}
handle_write_message(); // Throws
}); // Throws
m_sending_session = sess;
m_sending = true;
}


Expand Down Expand Up @@ -1571,7 +1571,7 @@ void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast6
"received empty download message that was not the last in batch",
ProtocolError::bad_progress);
}
history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
history.set_sync_progress(progress, downloadable_bytes, version_info, logger); // Throws
return;
}

Expand Down Expand Up @@ -1718,9 +1718,6 @@ void Session::activate()
catch (...) {
on_integration_failure(IntegrationException(exception_to_status()));
}

// Checks if there is a pending client reset
handle_pending_client_reset_acknowledgement();
}


Expand Down Expand Up @@ -2270,16 +2267,18 @@ bool Session::client_reset_if_needed()
m_progress.download.last_integrated_client_version);
REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);

m_upload_progress = m_progress.upload;
m_download_progress = m_progress.download;
// Reset the cached values which are used to calculate progress since the
// last time sync completed
init_progress_handler();
// In recovery mode, there may be new changesets to upload and nothing left to download.
// In FLX DiscardLocal mode, there may be new commits due to subscription handling.
// For both, we want to allow uploads again without needing external changes to download first.
m_delay_uploads = false;

// Checks if there is a pending client reset
handle_pending_client_reset_acknowledgement();
// Update the download progress to match what it would have been if we'd
// received a MARK message from the server (as the fresh Realm which we used
// as the source data for the reset did).
m_upload_progress = m_progress.upload;
m_download_progress = m_progress.download;
m_server_version_at_last_download_mark = m_progress.download.server_version;
m_last_download_mark_received = m_last_download_mark_sent = m_target_download_mark;
check_for_download_completion();

// If a migration or rollback is in progress, mark it complete when client reset is completed.
if (auto migration_store = get_migration_store()) {
Expand Down
1 change: 0 additions & 1 deletion src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,6 @@ class ClientImpl::Session {
void process_pending_flx_bootstrap();

bool client_reset_if_needed();
void handle_pending_client_reset_acknowledgement();

void gather_pending_compensating_writes(util::Span<Changeset> changesets, std::vector<ProtocolErrorInfo>* out);

Expand Down
8 changes: 8 additions & 0 deletions src/realm/sync/noinst/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,14 @@ bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, ut
}
}

// If there was nothing to recover or recovery was disabled then immediately
// mark the client reset as successfully complete
if (recovered.empty()) {
logger.info(util::LogCategory::reset,
"Immediately removing client reset tracker as there are no recovered changesets to upload.");
sync::PendingResetStore::clear_pending_reset(*wt_local);
}

wt_local->commit_and_continue_as_read();

VersionID new_version_local = wt_local->get_version_of_current_transaction();
Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/noinst/migration_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ bool MigrationStore::load_data(bool read_only)

auto tr = m_db->start_read();
// Start with a reader so it doesn't try to write until we are ready
SyncMetadataSchemaVersionsReader schema_versions_reader(tr);
SyncMetadataSchemaVersionsReader schema_versions_reader(*tr);
if (auto schema_version =
schema_versions_reader.get_version_for(tr, internal_schema_groups::c_flx_migration_store)) {
schema_versions_reader.get_version_for(*tr, internal_schema_groups::c_flx_migration_store)) {
if (*schema_version != c_schema_version) {
throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion,
"Invalid schema version for flexible sync migration store metadata");
Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/noinst/pending_bootstrap_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger,

auto tr = m_db->start_read();
// Start with a reader so it doesn't try to write until we are ready
SyncMetadataSchemaVersionsReader schema_versions_reader(tr);
SyncMetadataSchemaVersionsReader schema_versions_reader(*tr);
if (auto schema_version =
schema_versions_reader.get_version_for(tr, internal_schema_groups::c_pending_bootstraps)) {
schema_versions_reader.get_version_for(*tr, internal_schema_groups::c_pending_bootstraps)) {
if (*schema_version != c_schema_version) {
throw RuntimeError(ErrorCodes::SchemaVersionMismatch,
"Invalid schema version for FLX sync pending bootstrap table group");
Expand Down
4 changes: 3 additions & 1 deletion src/realm/sync/noinst/pending_reset_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ constexpr static std::string_view s_reset_action_col_name("action");
constexpr static std::string_view s_reset_error_code_col_name("error_code");
constexpr static std::string_view s_reset_error_msg_col_name("error_msg");

void PendingResetStore::clear_pending_reset(Group& group)
bool PendingResetStore::clear_pending_reset(Group& group)
{
if (auto table = group.get_table(s_meta_reset_table_name); table && !table->is_empty()) {
table->clear();
return true;
}
return false;
}

std::optional<PendingReset> PendingResetStore::has_pending_reset(const Group& group)
Expand Down
3 changes: 2 additions & 1 deletion src/realm/sync/noinst/pending_reset_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class PendingResetStore {
static void track_reset(Group& group, ClientResyncMode mode, PendingReset::Action action, Status error);
// Clear the pending reset tracking information, if it exists
// Requires a writable transaction and changes must be committed manually
static void clear_pending_reset(Group& group);
// Returns true if there was anything to remove
static bool clear_pending_reset(Group& group);
static std::optional<PendingReset> has_pending_reset(const Group& group);

static int64_t from_reset_action(PendingReset::Action action);
Expand Down
Loading

0 comments on commit 329c1c6

Please sign in to comment.