Mitigate races in accessing `m_initiated` and `m_finalized` in various REALM_ASSERTs by jbreams · Pull Request #7338 · realm/realm-core



Merged 6 commits on Feb 14, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,8 @@
* Fixed crash in fulltext index using prefix search with no matches ([#7309](https://github.com/realm/realm-core/issues/7309), since v13.18.0)
* Fix compilation and some warnings when building with Xcode 15.3 ([PR #7297](https://github.com/realm/realm-core/pull/7297)).
* util::make_dir_recursive() attempted to create a directory named "\0" if the path did not have a trailing slash ([PR #7329](https://github.com/realm/realm-core/pull/7329)).
* Fixed a crash with `Assertion failed: m_initiated` during sync session startup ([#7074](https://github.com/realm/realm-core/issues/7074), since v10.0.0).
* Fixed a TSAN violation where the user thread could race to read `m_finalized` with the sync event loop ([#6844](https://github.com/realm/realm-core/issues/6844), since v13.15.1)

### Breaking changes
* None.
122 changes: 68 additions & 54 deletions src/realm/sync/client.cpp
@@ -127,6 +127,18 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener

std::string get_appservices_connection_id();

protected:
friend class ClientImpl;

// m_initiated/m_abandoned is used to check that we aren't trying to update immutable properties like the progress
// handler or connection state listener after we've bound the session. We read the variable a bunch in
// REALM_ASSERTS on the event loop and on the user's thread, but we only set it once and while we're registering
// the session wrapper to be actualized. This function gets called from
// ClientImpl::register_unactualized_session_wrapper() to synchronize updating this variable on the main thread
// with reading the variable on the event loop.
void mark_initiated();
void mark_abandoned();

private:
ClientImpl& m_client;
DBRef m_db;
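
The comment on `mark_initiated()` above describes setting the flag under the same mutex that guards the hand-off of the wrapper to the event loop. A minimal sketch of that idea follows, assuming hypothetical `Wrapper` and `Registry` types that stand in for `SessionWrapper` and `ClientImpl`; it is not the realm-core implementation. Because the flag is written while the mutex is held, and the worker only obtains the pointer after locking the same mutex, the worker's read of the plain `bool` is ordered after the write.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for SessionWrapper: a plain, non-atomic flag.
struct Wrapper {
    bool initiated = false;
};

// Hypothetical stand-in for ClientImpl: a mutex-guarded hand-off queue.
class Registry {
public:
    // Called on the user thread. The flag is set while holding the mutex
    // that also publishes the wrapper to the worker thread.
    void register_wrapper(Wrapper* w)
    {
        std::lock_guard<std::mutex> lock{m_mutex};
        assert(!w->initiated);
        w->initiated = true;
        m_pending.push_back(w);
    }

    // Called on the worker thread. Taking the same mutex makes every write
    // done before the matching push_back visible here.
    std::vector<Wrapper*> take_pending()
    {
        std::vector<Wrapper*> out;
        std::lock_guard<std::mutex> lock{m_mutex};
        out.swap(m_pending);
        return out;
    }

private:
    std::mutex m_mutex;
    std::vector<Wrapper*> m_pending;
};

// Registers `count` wrappers from the calling thread and returns how many of
// them the worker thread observed with `initiated == true`.
int run_demo(int count)
{
    Registry registry;
    std::vector<Wrapper> wrappers(count);
    int seen = 0;
    std::thread worker([&] {
        int taken = 0;
        while (taken < count) {
            for (Wrapper* w : registry.take_pending()) {
                if (w->initiated)
                    ++seen;
                ++taken;
            }
            std::this_thread::yield();
        }
    });
    for (Wrapper& w : wrappers)
        registry.register_wrapper(&w);
    worker.join();
    return seen;
}
```

In this sketch, setting the flag after `register_wrapper()` returns, which is the ordering the old `initiate()` used for `m_initiated`, would let the worker read the flag before it is written.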
@@ -200,6 +212,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener

bool m_suspended = false;

// Set when the session has been abandoned, but before it's been finalized.
bool m_abandoned = false;
// Has the SessionWrapper been finalized?
bool m_finalized = false;

@@ -310,7 +324,6 @@ SessionWrapperStack::~SessionWrapperStack()

// ################ ClientImpl ################


ClientImpl::~ClientImpl()
{
// Since no other thread is allowed to be accessing this client or any of
@@ -511,51 +524,37 @@ void ClientImpl::shutdown() noexcept
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
{
// Thread safety required.

std::lock_guard lock{m_mutex};
REALM_ASSERT(m_actualize_and_finalize);
m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
bool retrigger = !m_actualize_and_finalize_needed;
m_actualize_and_finalize_needed = true;
// The conditional triggering needs to happen before releasing the mutex,
[Review comment, Collaborator] I see this guarantee as an optimization, but I wonder whether it has other implications (I cannot see any).

// because if two threads call register_unactualized_session_wrapper()
// roughly concurrently, then only the first one is guaranteed to be asked
// to retrigger, but that retriggering must have happened before the other
// thread returns from register_unactualized_session_wrapper().
//
// Note that a similar argument applies when two threads call
// register_abandoned_session_wrapper(), and when one thread calls one of
// them and another thread call the other.
if (retrigger)
m_actualize_and_finalize->trigger();
{
[Review comment, Contributor Author] This just moves the lock_guard into its own scope. Calling m_actualize_and_finalize->trigger() wakes up the event loop thread, which will immediately try to acquire m_mutex and wait on a condition variable, so releasing the lock first should reduce the likelihood that the event loop has to block on the mutex.

std::lock_guard lock{m_mutex};
REALM_ASSERT(m_actualize_and_finalize);
wrapper->mark_initiated();
m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
}
m_actualize_and_finalize->trigger();
}


void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
// Thread safety required.

std::lock_guard lock{m_mutex};
REALM_ASSERT(m_actualize_and_finalize);

// If the session wrapper has not yet been actualized (on the event loop
// thread), it can be immediately finalized. This ensures that we will
// generally not actualize a session wrapper that has already been
// abandoned.
auto i = m_unactualized_session_wrappers.find(wrapper.get());
if (i != m_unactualized_session_wrappers.end()) {
m_unactualized_session_wrappers.erase(i);
wrapper->finalize_before_actualization();
return;
{
[Review comment, Contributor Author] This just moves the lock_guard into its own scope. Calling m_actualize_and_finalize->trigger() wakes up the event loop thread, which will immediately try to acquire m_mutex and wait on a condition variable, so releasing the lock first should reduce the likelihood that the event loop has to block on the mutex.

std::lock_guard lock{m_mutex};
REALM_ASSERT(m_actualize_and_finalize);
wrapper->mark_abandoned();

// If the session wrapper has not yet been actualized (on the event loop
// thread), it can be immediately finalized. This ensures that we will
// generally not actualize a session wrapper that has already been
// abandoned.
auto i = m_unactualized_session_wrappers.find(wrapper.get());
if (i != m_unactualized_session_wrappers.end()) {
m_unactualized_session_wrappers.erase(i);
wrapper->finalize_before_actualization();
return;
}
m_abandoned_session_wrappers.push(std::move(wrapper));
}
m_abandoned_session_wrappers.push(std::move(wrapper));
bool retrigger = !m_actualize_and_finalize_needed;
m_actualize_and_finalize_needed = true;
// The conditional triggering needs to happen before releasing the
// mutex. See implementation of register_unactualized_session_wrapper() for
// details.
if (retrigger)
m_actualize_and_finalize->trigger();
m_actualize_and_finalize->trigger();
}
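
The author's review comments in this hunk describe releasing the mutex before waking the event loop. The pattern can be sketched as follows, assuming a hypothetical `WorkQueue` that uses a `std::condition_variable` in place of realm-core's trigger object; the names are illustrative only. The producer updates shared state inside a nested scope, lets the `lock_guard` go out of scope, and only then notifies, so the woken consumer does not immediately contend with the producer for the mutex.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical queue illustrating "unlock, then wake the consumer".
class WorkQueue {
public:
    void push(int value)
    {
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_items.push(value);
        } // The mutex is released here, before the wake-up.
        m_cv.notify_one();
    }

    int pop()
    {
        std::unique_lock<std::mutex> lock{m_mutex};
        // The predicate is checked under the mutex, so a notification sent
        // after the unlock in push() cannot be lost.
        m_cv.wait(lock, [this] { return !m_items.empty(); });
        int value = m_items.front();
        m_items.pop();
        return value;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<int> m_items;
};

// Pushes 1..n from the calling thread and sums them on a consumer thread.
int sum_through_queue(int n)
{
    WorkQueue queue;
    int total = 0;
    std::thread consumer([&] {
        for (int i = 0; i < n; ++i)
            total += queue.pop();
    });
    for (int i = 1; i <= n; ++i)
        queue.push(i);
    consumer.join();
    return total;
}
```

Notifying while still holding the lock is also correct; the nested scope is only an attempt to shorten the time the consumer may spend blocked, which matches the hedged wording of the review comment.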


@@ -567,7 +566,6 @@ void ClientImpl::actualize_and_finalize_session_wrappers()
bool stopped;
{
std::lock_guard lock{m_mutex};
m_actualize_and_finalize_needed = false;
swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
stopped = m_stopped;
@@ -1279,6 +1277,21 @@ MigrationStore* SessionWrapper::get_migration_store()
return m_migration_store.get();
}

inline void SessionWrapper::mark_initiated()
{
REALM_ASSERT(!m_initiated);
REALM_ASSERT(!m_abandoned);
m_initiated = true;
}


inline void SessionWrapper::mark_abandoned()
{
REALM_ASSERT(!m_abandoned);
m_abandoned = true;
}


inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
REALM_ASSERT(!m_initiated);
@@ -1296,10 +1309,8 @@ SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<Connec

void SessionWrapper::initiate()
{
REALM_ASSERT(!m_initiated);
ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode};
m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
m_initiated = true;
m_db->add_commit_listener(this);
}

@@ -1309,10 +1320,6 @@ void SessionWrapper::on_commit(version_type new_version)
// Thread safety required
REALM_ASSERT(m_initiated);

if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
return;
}

util::bind_ptr<SessionWrapper> self{this};
m_client.post([self = std::move(self), new_version](Status status) {
if (status == ErrorCodes::OperationAborted)
@@ -1321,6 +1328,10 @@ void SessionWrapper::on_commit(version_type new_version)
throw Exception(status);

REALM_ASSERT(self->m_actualized);
if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) {
return;
}

if (REALM_UNLIKELY(!self->m_sess))
return; // Already finalized
SessionImpl& sess = *self->m_sess;
@@ -1336,10 +1347,6 @@ void SessionWrapper::cancel_reconnect_delay()
// Thread safety required
REALM_ASSERT(m_initiated);

if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
return;
}

util::bind_ptr<SessionWrapper> self{this};
m_client.post([self = std::move(self)](Status status) {
if (status == ErrorCodes::OperationAborted)
@@ -1348,6 +1355,10 @@ void SessionWrapper::cancel_reconnect_delay()
throw Exception(status);

REALM_ASSERT(self->m_actualized);
if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) {
return;
}

if (REALM_UNLIKELY(!self->m_sess))
return; // Already finalized
SessionImpl& sess = *self->m_sess;
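
The `on_commit()` and `cancel_reconnect_delay()` hunks above move the `m_finalized || m_force_closed` check from the calling thread into the posted callback. A minimal sketch of why that removes the race, assuming a hypothetical single-threaded `EventLoop` and `Session` (neither is realm-core's API): when a flag is only read and written inside tasks that run on the loop thread, no other thread touches it and no extra synchronization is needed.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical minimal event loop: tasks run in FIFO order on one thread.
class EventLoop {
public:
    EventLoop() : m_thread([this] { run(); }) {}
    ~EventLoop() { stop(); }

    void post(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_tasks.push_back(std::move(task));
        }
        m_cv.notify_one();
    }

    // Lets the loop drain all queued tasks, then joins the loop thread.
    void stop()
    {
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_stopping = true;
        }
        m_cv.notify_one();
        if (m_thread.joinable())
            m_thread.join();
    }

private:
    void run()
    {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock{m_mutex};
                m_cv.wait(lock, [this] { return m_stopping || !m_tasks.empty(); });
                if (m_tasks.empty())
                    return; // Stopping and fully drained.
                task = std::move(m_tasks.front());
                m_tasks.pop_front();
            }
            task();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<std::function<void()>> m_tasks;
    bool m_stopping = false;
    std::thread m_thread; // Declared last so it starts after the other members exist.
};

// Hypothetical session whose state is owned by the loop thread.
struct Session {
    bool finalized = false; // Only touched inside posted tasks.
    int commits_seen = 0;   // Only touched inside posted tasks.

    void on_commit(EventLoop& loop)
    {
        // No read of `finalized` on the caller's thread.
        loop.post([this] {
            if (finalized)
                return;
            ++commits_seen;
        });
    }

    void finalize(EventLoop& loop)
    {
        loop.post([this] { finalized = true; });
    }
};

int run_demo()
{
    Session session;
    EventLoop loop;
    session.on_commit(loop);
    session.on_commit(loop);
    session.finalize(loop);
    session.on_commit(loop); // Posted after finalize, so the task does nothing.
    loop.stop();
    return session.commits_seen;
}
```

The cost of this design is that a call made after finalization still posts a task that turns out to be a no-op, in exchange for keeping the flag single-threaded.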
@@ -1362,7 +1373,6 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple
{
REALM_ASSERT(upload_completion || download_completion);
REALM_ASSERT(m_initiated);
REALM_ASSERT(!m_finalized);

util::bind_ptr<SessionWrapper> self{this};
m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
@@ -1405,7 +1415,7 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
{
// Thread safety required
REALM_ASSERT(m_initiated);
REALM_ASSERT(!m_finalized);
REALM_ASSERT(!m_abandoned);

std::int_fast64_t target_mark;
{
@@ -1421,6 +1431,7 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
throw Exception(status);

REALM_ASSERT(self->m_actualized);
REALM_ASSERT(!self->m_finalized);
// The session wrapper may already have been finalized. This can only
// happen if it was abandoned, but in that case, the call of
// wait_for_upload_complete_or_client_stopped() must have returned
@@ -1449,7 +1460,7 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped()
{
// Thread safety required
REALM_ASSERT(m_initiated);
REALM_ASSERT(!m_finalized);
REALM_ASSERT(!m_abandoned);

std::int_fast64_t target_mark;
{
@@ -1465,6 +1476,7 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped()
throw Exception(status);

REALM_ASSERT(self->m_actualized);
REALM_ASSERT(!self->m_finalized);
// The session wrapper may already have been finalized. This can only
// happen if it was abandoned, but in that case, the call of
// wait_for_download_complete_or_client_stopped() must have returned
@@ -1493,7 +1505,7 @@ void SessionWrapper::refresh(std::string signed_access_token)
{
// Thread safety required
REALM_ASSERT(m_initiated);
REALM_ASSERT(!m_finalized);
REALM_ASSERT(!m_abandoned);

m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) {
if (status == ErrorCodes::OperationAborted)
@@ -1527,6 +1539,7 @@ inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noex
// Must be called from event loop thread
void SessionWrapper::actualize(ServerEndpoint endpoint)
{
REALM_ASSERT_DEBUG(m_initiated);
[Review comment, Contributor Author] Adding this assert reproduced the bug immediately.

REALM_ASSERT(!m_actualized);
REALM_ASSERT(!m_sess);
// Cannot be actualized if it's already been finalized or force closed
@@ -1617,6 +1630,7 @@ void SessionWrapper::force_close()
void SessionWrapper::finalize()
{
REALM_ASSERT(m_actualized);
REALM_ASSERT(m_abandoned);

// Already finalized?
if (m_finalized) {
1 change: 0 additions & 1 deletion src/realm/sync/noinst/client_impl_base.hpp
@@ -292,7 +292,6 @@ class ClientImpl {

bool m_stopped = false; // Protected by `m_mutex`
bool m_sessions_terminated = false; // Protected by `m_mutex`
bool m_actualize_and_finalize_needed = false; // Protected by `m_mutex`

// The set of session wrappers that are not yet wrapping a session object,
// and are not yet abandoned (still referenced by the application).