8000 Simplify internal commit notification by tgoyne · Pull Request #7031 · realm/realm-core · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Simplify internal commit notification #7031

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

### Enhancements
* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
* None.
* Empty commits no longer trigger an extra invocation of the sync progress handler reporting the exact same information as the previous invocation ([PR #7031](https://github.com/realm/realm-core/pull/7031)).

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* Fixed a bug preventing SSL handshake from completing successfuly due to failed hostname verification when linking against BoringSSL. (PR [#7034](https://github.com/realm/realm-core/pull/7034))
* Updating subscriptions did not trigger Realm autorefreshes, sometimes resulting in async refresh hanging until another write was performed by something else ([PR #7031](https://github.com/realm/realm-core/pull/7031)).

### Breaking changes
* None.
Expand Down
21 changes: 21 additions & 0 deletions src/realm/db.cpp
< 8000 td class="blob-code blob-code-context js-file-line"> }
Original file line number Diff line number Diff line change
Expand Up @@ -2429,6 +2429,14 @@ Replication::version_type DB::do_commit(Transaction& transaction, bool commit_to
else {
low_level_commit(new_version, transaction); // Throws
}

{
std::lock_guard lock(m_commit_listener_mutex);
for (auto listener : m_commit_listeners) {
listener->on_commit(new_version);
}
}

return new_version;

Expand Down Expand Up @@ -2833,6 +2841,19 @@ void DB::end_write_on_correct_thread() noexcept
}
}

void DB::add_commit_listener(CommitListener* listener)
{
std::lock_guard lock(m_commit_listener_mutex);
m_commit_listeners.push_back(listener);
}

void DB::remove_commit_listener(CommitListener* listener)
{
std::lock_guard lock(m_commit_listener_mutex);
m_commit_listeners.erase(std::remove(m_commit_listeners.begin(), m_commit_listeners.end(), listener),
m_commit_listeners.end());
}

DisableReplication::DisableReplication(Transaction& t)
: m_tr(t)
, m_owner(t.get_db())
Expand Down
10 changes: 10 additions & 0 deletions src/realm/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,14 @@ class DB : public std::enable_shared_from_this<DB> {
/// To be used only when already holding the lock.
bool other_writers_waiting_for_lock() const;

struct CommitListener {
virtual ~CommitListener() = default;
virtual void on_commit(version_type new_version) = 0;
};

void add_commit_listener(CommitListener*);
void remove_commit_listener(CommitListener*);

protected:
explicit DB(const DBOptions& options);

Expand Down Expand Up @@ -504,6 +512,8 @@ class DB : public std::enable_shared_from_this<DB> {
std::function<void(int, int)> m_upgrade_callback;
std::unique_ptr<AsyncCommitHelper> m_commit_helper;
std::shared_ptr<util::Logger> m_logger;
std::mutex m_commit_listener_mutex;
std::vector<CommitListener*> m_commit_listeners;
bool m_is_sync_agent = false;
// Id for this DB to be used in logging. We will just use some bits from the pointer.
// The path cannot be used as this would not allow us to distinguish between two DBs opening
Expand Down
45 changes: 19 additions & 26 deletions src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ void RealmCoordinator::init_external_helpers()
// happens on background threads, so to avoid needing locking on every access
// we have to wire things up in a specific order.
#if REALM_ENABLE_SYNC
// We may have reused an existing sync session that outlived its original RealmCoordinator
// We may have reused an existing sync session that outlived its original
// RealmCoordinator. If not, we need to create a new one now.
if (m_config.sync_config && !m_sync_session)
m_sync_session = m_config.sync_config->user->sync_manager()->get_session(m_db, m_config);
#endif
Expand All @@ -548,18 +549,7 @@ void RealmCoordinator::init_external_helpers()
ex.code().value());
}
}

#if REALM_ENABLE_SYNC
if (m_sync_session) {
std::weak_ptr<RealmCoordinator> weak_self = shared_from_this();
SyncSession::Internal::set_sync_transact_callback(*m_sync_session, [weak_self](VersionID, VersionID) {
if (auto self = weak_self.lock()) {
if (self->m_notifier)
self->m_notifier->notify_others();
}
});
}
#endif
m_db->add_commit_listener(this);
}

void RealmCoordinator::close()
Expand Down Expand Up @@ -649,11 +639,17 @@ RealmCoordinator::~RealmCoordinator()
}
}
}

if (m_db) {
m_db->remove_commit_listener(this);
}

// Waits for the worker thread to join
m_notifier = nullptr;
m_notifier.reset();

// Ensure the notifiers aren't holding on to Transactions after we destroy
// the History object the DB depends on
// If there's any active NotificationTokens they'll keep the notifiers alive,
// so tell the notifiers to release their Transactions so that the DB can
// be closed immediately.
Comment on lines -655 to +652
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment was stale and the reason why we originally needed release_data() no longer applies, but it is still required for other reasons.

// No locking needed here because the worker thread is gone
for (auto& notifier : m_new_notifiers)
notifier->release_data();
Expand Down Expand Up @@ -789,16 +785,6 @@ void RealmCoordinator::commit_write(Realm& realm, bool commit_to_disk)
}
}

#if REALM_ENABLE_SYNC
// Realm could be closed in did_change. So send sync notification first before did_change.
if (m_sync_session) {
SyncSession::Internal::nonsync_transact_notify(*m_sync_session, new_version.version);
}
#endif
if (m_notifier) {
m_notifier->notify_others();
}

if (realm.m_binding_context) {
realm.m_binding_context->did_change({}, {});
}
Expand Down Expand Up @@ -867,6 +853,13 @@ void RealmCoordinator::clean_up_dead_notifiers()
swap_remove(m_new_notifiers);
}

void RealmCoordinator::on_commit(DB::version_type)
{
if (m_notifier) {
m_notifier->notify_others();
}
}

void RealmCoordinator::on_change()
{
#if REALM_ENABLE_SYNC
Expand Down
3 changes: 2 additions & 1 deletion src/realm/object-store/impl/realm_coordinator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class WeakRealmNotifier;

// RealmCoordinator manages the weak cache of Realm instances and communication
// between per-thread Realm instances for a given file
class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {
class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator>, DB::CommitListener {
public:
// Get the coordinator for the given path, creating it if neccesary
static std::shared_ptr<RealmCoordinator> get_coordinator(StringData path);
Expand Down Expand Up @@ -265,6 +265,7 @@ class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {

void set_config(const Realm::Config&) REQUIRES(m_realm_mutex, !m_schema_cache_mutex);
void init_external_helpers() REQUIRES(m_realm_mutex);
void on_commit(DB::version_type) override;
std::shared_ptr<Realm> do_get_cached_realm(Realm::Config const& config,
std::shared_ptr<util::Scheduler> scheduler = nullptr)
REQUIRES(m_realm_mutex);
Expand Down
30 changes: 1 addition & 29 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,19 +914,6 @@ void SyncSession::create_sync_session()

std::weak_ptr<SyncSession> weak_self = weak_from_this();

// Configure the sync transaction callback.
auto wrapped_callback = [weak_self](VersionID old_version, VersionID new_version) {
std::function<TransactionCallback> callback;
if (auto self = weak_self.lock()) {
util::CheckedLockGuard l(self->m_state_mutex);
callback = self->m_sync_transact_callback;
}
if (callback) {
callback(old_version, new_version);
}
};
m_session->set_sync_transact_callback(std::move(wrapped_callback));

// Set up the wrapped progress handler callback
m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
uint_fast64_t uploaded, uint_fast64_t uploadable,
Expand Down Expand Up @@ -974,12 +961,6 @@ void SyncSession::create_sync_session()
});
}

void SyncSession::set_sync_transact_callback(std::function<sync::Session::SyncTransactCallback>&& callback)
{
util::CheckedLockGuard l(m_state_mutex);
m_sync_transact_callback = std::move(callback);
}

void SyncSession::nonsync_transact_notify(sync::version_type version)
{
m_progress_notifier.set_local_version(version);
Expand Down Expand Up @@ -1383,16 +1364,7 @@ void SyncSession::create_subscription_store()
// remain valid afterwards for the life of the SyncSession, but m_flx_subscription_store
// will be reset when rolling back to PBS after a client FLX migration
if (!m_subscription_store_base) {
m_subscription_store_base = sync::SubscriptionStore::create(m_db, [this](int64_t new_version) {
util::CheckedLockGuard lk(m_state_mutex);
if (m_state != State::Active && m_state != State::WaitingForAccessToken) {
return;
}
// There may be no session yet (i.e., waiting to refresh the access token).
if (m_session) {
m_session->on_new_flx_sync_subscription(new_version);
}
});
m_subscription_store_base = sync::SubscriptionStore::create(m_db);
}

// m_subscription_store_base is always around for the life of SyncSession, but the
Expand Down
6 changes: 0 additions & 6 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,6 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
class Internal {
friend class _impl::RealmCoordinator;

static void set_sync_transact_callback(SyncSession& session, std::function<TransactionCallback>&& callback)
{
session.set_sync_transact_callback(std::move(callback));
}

static void nonsync_transact_notify(SyncSession& session, VersionID::version_type version)
{
session.nonsync_transact_notify(version);
Expand Down Expand Up @@ -394,7 +389,6 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t);
void handle_new_flx_sync_query(int64_t version);

void set_sync_transact_callback(std::function<TransactionCallback>&&) REQUIRES(!m_state_mutex);
void nonsync_transact_notify(VersionID::version_type) REQUIRES(!m_state_mutex);

void create_sync_session() REQUIRES(m_state_mutex, !m_config_mutex);
Expand Down
Loading
0