-
Notifications
You must be signed in to change notification settings - Fork 178
Mitigate races in accessing m_initated
and m_finalized
in various REALM_ASSERTs
#7338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
83b2722
16d7963
f2797ad
fcd5a41
28ff4aa
7e44c7f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -127,6 +127,18 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener | |
|
||
std::string get_appservices_connection_id(); | ||
|
||
protected: | ||
friend class ClientImpl; | ||
|
||
// m_initiated/m_abandoned is used to check that we aren't trying to update immutable properties like the progress | ||
// handler or connection state listener after we've bound the session. We read the variable a bunch in | ||
// REALM_ASSERTS on the event loop and on the user's thread, but we only set it once and while we're registering | ||
// the session wrapper to be actualized. This function gets called from | ||
// ClientImpl::register_unactualized_session_wrapper() to synchronize updating this variable on the main thread | ||
// with reading the variable on the event loop. | ||
void mark_initiated(); | ||
void mark_abandoned(); | ||
|
||
private: | ||
ClientImpl& m_client; | ||
DBRef m_db; | ||
|
@@ -200,6 +212,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener | |
|
||
bool m_suspended = false; | ||
|
||
// Set when the session has been abandoned, but before it's been finalized. | ||
bool m_abandoned = false; | ||
// Has the SessionWrapper been finalized? | ||
bool m_finalized = false; | ||
|
||
|
@@ -310,7 +324,6 @@ SessionWrapperStack::~SessionWrapperStack() | |
|
||
// ################ ClientImpl ################ | ||
|
||
|
||
ClientImpl::~ClientImpl() | ||
{ | ||
// Since no other thread is allowed to be accessing this client or any of | ||
|
@@ -511,51 +524,37 @@ void ClientImpl::shutdown() noexcept | |
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint) | ||
{ | ||
// Thread safety required. | ||
|
||
std::lock_guard lock{m_mutex}; | ||
REALM_ASSERT(m_actualize_and_finalize); | ||
m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws | ||
bool retrigger = !m_actualize_and_finalize_needed; | ||
m_actualize_and_finalize_needed = true; | ||
// The conditional triggering needs to happen before releasing the mutex, | ||
// because if two threads call register_unactualized_session_wrapper() | ||
// roughly concurrently, then only the first one is guaranteed to be asked | ||
// to retrigger, but that retriggering must have happened before the other | ||
// thread returns from register_unactualized_session_wrapper(). | ||
// | ||
// Note that a similar argument applies when two threads call | ||
// register_abandoned_session_wrapper(), and when one thread calls one of | ||
// them and another thread call the other. | ||
if (retrigger) | ||
m_actualize_and_finalize->trigger(); | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This just moves the lock_guard into its own scope, since calling |
||
std::lock_guard lock{m_mutex}; | ||
REALM_ASSERT(m_actualize_and_finalize); | ||
wrapper->mark_initiated(); | ||
m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws | ||
} | ||
m_actualize_and_finalize->trigger(); | ||
} | ||
|
||
|
||
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept | ||
{ | ||
// Thread safety required. | ||
|
||
std::lock_guard lock{m_mutex}; | ||
REALM_ASSERT(m_actualize_and_finalize); | ||
|
||
// If the session wrapper has not yet been actualized (on the event loop | ||
// thread), it can be immediately finalized. This ensures that we will | ||
// generally not actualize a session wrapper that has already been | ||
// abandoned. | ||
auto i = m_unactualized_session_wrappers.find(wrapper.get()); | ||
if (i != m_unactualized_session_wrappers.end()) { | ||
m_unactualized_session_wrappers.erase(i); | ||
wrapper->finalize_before_actualization(); | ||
return; | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This just moves the lock_guard into its own scope, since calling |
||
std::lock_guard lock{m_mutex}; | ||
REALM_ASSERT(m_actualize_and_finalize); | ||
wrapper->mark_abandoned(); | ||
|
||
// If the session wrapper has not yet been actualized (on the event loop | ||
// thread), it can be immediately finalized. This ensures that we will | ||
// generally not actualize a session wrapper that has already been | ||
// abandoned. | ||
auto i = m_unactualized_session_wrappers.find(wrapper.get()); | ||
if (i != m_unactualized_session_wrappers.end()) { | ||
m_unactualized_session_wrappers.erase(i); | ||
wrapper->finalize_before_actualization(); | ||
return; | ||
} | ||
m_abandoned_session_wrappers.push(std::move(wrapper)); | ||
} | ||
m_abandoned_session_wrappers.push(std::move(wrapper)); | ||
bool retrigger = !m_actualize_and_finalize_needed; | ||
m_actualize_and_finalize_needed = true; | ||
// The conditional triggering needs to happen before releasing the | ||
// mutex. See implementation of register_unactualized_session_wrapper() for | ||
// details. | ||
if (retrigger) | ||
m_actualize_and_finalize->trigger(); | ||
m_actualize_and_finalize->trigger(); | ||
} | ||
|
||
|
||
|
@@ -567,7 +566,6 @@ void ClientImpl::actualize_and_finalize_session_wrappers() | |
bool stopped; | ||
{ | ||
std::lock_guard lock{m_mutex}; | ||
m_actualize_and_finalize_needed = false; | ||
swap(m_unactualized_session_wrappers, unactualized_session_wrappers); | ||
swap(m_abandoned_session_wrappers, abandoned_session_wrappers); | ||
stopped = m_stopped; | ||
|
@@ -1279,6 +1277,21 @@ MigrationStore* SessionWrapper::get_migration_store() | |
return m_migration_store.get(); | ||
} | ||
|
||
inline void SessionWrapper::mark_initiated() | ||
{ | ||
REALM_ASSERT(!m_initiated); | ||
REALM_ASSERT(!m_abandoned); | ||
m_initiated = true; | ||
} | ||
|
||
|
||
inline void SessionWrapper::mark_abandoned() | ||
{ | ||
REALM_ASSERT(!m_abandoned); | ||
m_abandoned = true; | ||
} | ||
|
||
|
||
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler) | ||
{ | ||
REALM_ASSERT(!m_initiated); | ||
|
@@ -1296,10 +1309,8 @@ SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<Connec | |
|
||
void SessionWrapper::initiate() | ||
{ | ||
REALM_ASSERT(!m_initiated); | ||
ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode}; | ||
m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws | ||
m_initiated = true; | ||
m_db->add_commit_listener(this); | ||
} | ||
|
||
|
@@ -1309,10 +1320,6 @@ void SessionWrapper::on_commit(version_type new_version) | |
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
|
||
if (REALM_UNLIKELY(m_finalized || m_force_closed)) { | ||
return; | ||
} | ||
|
||
util::bind_ptr<SessionWrapper> self{this}; | ||
m_client.post([self = std::move(self), new_version](Status status) { | ||
if (status == ErrorCodes::OperationAborted) | ||
|
@@ -1321,6 +1328,10 @@ void SessionWrapper::on_commit(version_type new_version) | |
throw Exception(status); | ||
|
||
REALM_ASSERT(self->m_actualized); | ||
if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) { | ||
return; | ||
} | ||
|
||
if (REALM_UNLIKELY(!self->m_sess)) | ||
return; // Already finalized | ||
SessionImpl& sess = *self->m_sess; | ||
|
@@ -1336,10 +1347,6 @@ void SessionWrapper::cancel_reconnect_delay() | |
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
|
||
if (REALM_UNLIKELY(m_finalized || m_force_closed)) { | ||
return; | ||
} | ||
|
||
util::bind_ptr<SessionWrapper> self{this}; | ||
m_client.post([self = std::move(self)](Status status) { | ||
if (status == ErrorCodes::OperationAborted) | ||
|
@@ -1348,6 +1355,10 @@ void SessionWrapper::cancel_reconnect_delay() | |
throw Exception(status); | ||
|
||
REALM_ASSERT(self->m_actualized); | ||
if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) { | ||
return; | ||
} | ||
|
||
if (REALM_UNLIKELY(!self->m_sess)) | ||
return; // Already finalized | ||
SessionImpl& sess = *self->m_sess; | ||
|
@@ -1362,7 +1373,6 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple | |
{ | ||
REALM_ASSERT(upload_completion || download_completion); | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
|
||
util::bind_ptr<SessionWrapper> self{this}; | ||
m_client.post([self = std::move(self), handler = std::move(handler), upload_completion, | ||
|
@@ -1405,7 +1415,7 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() | |
{ | ||
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
REALM_ASSERT(!m_abandoned); | ||
|
||
std::int_fast64_t target_mark; | ||
{ | ||
|
@@ -1421,6 +1431,7 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() | |
throw Exception(status); | ||
|
||
REALM_ASSERT(self->m_actualized); | ||
REALM_ASSERT(!self->m_finalized); | ||
// The session wrapper may already have been finalized. This can only | ||
// happen if it was abandoned, but in that case, the call of | ||
// wait_for_upload_complete_or_client_stopped() must have returned | ||
|
@@ -1449,7 +1460,7 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() | |
{ | ||
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
REALM_ASSERT(!m_abandoned); | ||
|
||
std::int_fast64_t target_mark; | ||
{ | ||
|
@@ -1465,6 +1476,7 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() | |
throw Exception(status); | ||
|
||
REALM_ASSERT(self->m_actualized); | ||
REALM_ASSERT(!self->m_finalized); | ||
// The session wrapper may already have been finalized. This can only | ||
// happen if it was abandoned, but in that case, the call of | ||
// wait_for_download_complete_or_client_stopped() must have returned | ||
|
@@ -1493,7 +1505,7 @@ void SessionWrapper::refresh(std::string signed_access_token) | |
{ | ||
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
REALM_ASSERT(!m_abandoned); | ||
|
||
m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) { | ||
if (status == ErrorCodes::OperationAborted) | ||
|
@@ -1527,6 +1539,7 @@ inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noex | |
// Must be called from event loop thread | ||
void SessionWrapper::actualize(ServerEndpoint endpoint) | ||
{ | ||
REALM_ASSERT_DEBUG(m_initiated); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding this assert reproduced the bug immediately. |
||
REALM_ASSERT(!m_actualized); | ||
REALM_ASSERT(!m_sess); | ||
// Cannot be actualized if it's already been finalized or force closed | ||
|
@@ -1617,6 +1630,7 @@ void SessionWrapper::force_close() | |
void SessionWrapper::finalize() | ||
{ | ||
REALM_ASSERT(m_actualized); | ||
REALM_ASSERT(m_abandoned); | ||
|
||
// Already finalized? | ||
if (m_finalized) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see this guarantee as an optimization, but I wonder if it has other implications (cannot see any)