Skip to content

Commit

Permalink
Unregister SyncSession callbacks when SyncManager is destroyed
Browse files Browse the repository at this point in the history
  • Loading branch information
danieltabacaru committed Aug 7, 2024
1 parent 2c3c3a0 commit ac499d0
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 4 deletions.
38 changes: 36 additions & 2 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,19 @@ SyncSession::SyncSession(Private, SyncClient& client, std::shared_ptr<DB> db, co

void SyncSession::detach_from_sync_manager()
{
shutdown_and_wait();
// Unregister all callbacks when the App and SyncManager are destroyed.
{
util::CheckedLockGuard lk(m_state_mutex);
m_completion_callbacks.clear();
}
{
util::CheckedLockGuard lk(m_connection_state_mutex);
m_connection_change_notifier.remove_callbacks();
}
m_progress_notifier.unregister_callbacks();

static constexpr bool cancel_subscription_notifications = false;
shutdown_and_wait(cancel_subscription_notifications);
util::CheckedLockGuard lk(m_state_mutex);
m_sync_manager = nullptr;
}
Expand Down Expand Up @@ -1178,6 +1190,12 @@ void SyncSession::close(util::CheckedUniqueLock lock)
}

void SyncSession::shutdown_and_wait()
{
static constexpr bool cancel_subscription_notifications = true;
shutdown_and_wait(cancel_subscription_notifications);
}

void SyncSession::shutdown_and_wait(bool cancel_subscription_notifications)
{
{
// Transition immediately to `inactive` state. Calling this function must guarantee that any
Expand All @@ -1188,7 +1206,7 @@ void SyncSession::shutdown_and_wait()
// `inactive` state after the invocation of shutdown_and_wait().
util::CheckedUniqueLock lock(m_state_mutex);
if (m_state != State::Inactive && m_state != State::Paused) {
become_inactive(std::move(lock));
become_inactive(std::move(lock), Status::OK(), cancel_subscription_notifications);
}
}
m_client.wait_for_session_terminations();
Expand Down Expand Up @@ -1555,6 +1573,14 @@ void SyncProgressNotifier::unregister_callback(uint64_t token)
m_packages.erase(token);
}

void SyncProgressNotifier::unregister_callbacks()
{
std::lock_guard<std::mutex> lock(m_mutex);
m_packages.clear();
m_current_progress.reset();
m_local_transaction_version = 0;
}

void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
uint64_t snapshot_version, double download_estimate, double upload_estimate,
int64_t query_version)
Expand Down Expand Up @@ -1662,6 +1688,14 @@ void SyncSession::ConnectionChangeNotifier::remove_callback(uint64_t token)
}
}

void SyncSession::ConnectionChangeNotifier::remove_callbacks()
{
std::lock_guard<std::mutex> lock(m_callback_mutex);
m_callbacks.clear();
m_callback_count = -1;
m_callback_index = -1;
}

void SyncSession::ConnectionChangeNotifier::invoke_callbacks(ConnectionState old_state, ConnectionState new_state)
{
std::unique_lock lock(m_callback_mutex);
Expand Down
6 changes: 4 additions & 2 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class SyncProgressNotifier {
uint64_t register_callback(std::function<ProgressNotifierCallback>, NotifierType direction, bool is_streaming,
int64_t pending_query_version);
void unregister_callback(uint64_t);
void unregister_callbacks();

void set_local_version(uint64_t);
void update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
Expand Down Expand Up @@ -368,6 +369,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
public:
uint64_t add_callback(std::function<ConnectionStateChangeCallback> callback);
void remove_callback(uint64_t token);
void remove_callbacks();
void invoke_callbacks(ConnectionState old_state, ConnectionState new_state);

private:
Expand All @@ -394,8 +396,6 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
}
// }

std::shared_ptr<SyncManager> sync_manager() const REQUIRES(!m_state_mutex);

static util::UniqueFunction<void(std::optional<app::AppError>)>
handle_refresh(const std::shared_ptr<SyncSession>&, bool);

Expand Down Expand Up @@ -432,6 +432,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void did_drop_external_reference()
REQUIRES(!m_state_mutex, !m_config_mutex, !m_external_reference_mutex, !m_connection_state_mutex);
void close(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_config_mutex, !m_connection_state_mutex);
void shutdown_and_wait(bool cancel_subscription_notifications)
REQUIRES(!m_state_mutex, !m_connection_state_mutex);

void become_active() REQUIRES(m_state_mutex, !m_config_mutex);
void become_dying(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
Expand Down
20 changes: 20 additions & 0 deletions test/object-store/sync/session/connection_change_notifications.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,24 @@ TEST_CASE("sync: Connection state changes", "[sync][session][connection change]"
REQUIRE(listener1_call_cnt == 1); // Only called once before unregister
REQUIRE(listener2_called);
}

SECTION("Callback not invoked when SyncSession is detached from SyncManager") {
auto session = sync_session(
user, "/connection-state-changes-1", [](auto, auto) {}, SyncSessionStopPolicy::AfterChangesUploaded);

EventLoop::main().run_until([&] {
return sessions_are_active(*session);
});
EventLoop::main().run_until([&] {
return sessions_are_connected(*session);
});

bool listener_called = false;
session->register_connection_change_callback([&](SyncSession::ConnectionState, SyncSession::ConnectionState) {
listener_called = true;
});

session->detach_from_sync_manager();
REQUIRE_FALSE(listener_called);
}
}

0 comments on commit ac499d0

Please sign in to comment.