Allow sync tests to control their event loop lifecycles #6244
Conversation
Nice! Thanks for looking into this!
// Updating state to Stopping will free a start() if it is waiting for the thread to
// start and may cause the thread to exit early before calling service.run()
m_service.stop(); // Unblocks m_service.run()
state_wait_for(lock, State::Stopping);
This will never wait since our state is already set to Stopping above...
Fixed.
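The no-op wait being discussed can be illustrated with a predicate-based condition-variable wait. This is a minimal sketch with hypothetical names, not the fixture's actual implementation: a wait whose predicate already holds returns immediately, so waiting for State::Stopping right after setting the state to Stopping never blocks.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

enum class State { Idle, Stopping, Stopped };

struct LoopState {
    std::mutex m_mutex;
    std::condition_variable m_cond;
    State m_state = State::Idle;

    void set_state(State s) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_state = s;
        m_cond.notify_all();
    }

    void state_wait_for(State target) {
        std::unique_lock<std::mutex> lock(m_mutex);
        // Returns without blocking if m_state is already `target`.
        m_cond.wait(lock, [&] { return m_state == target; });
    }
};
```

Under this sketch, setting the state first and then calling state_wait_for on the same state is an immediate return rather than a real synchronization point.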
m_client_socket_providers.push_back(std::make_shared<websocket::DefaultSocketProvider>(
    m_client_loggers[i], "", websocket::DefaultSocketProvider::AutoStart{false}));
config_2.socket_provider = m_client_socket_providers.back();
Very clever!
test/test_handshake.cpp
Outdated
@@ -285,7 +286,7 @@ void run_client_surprise_server(unit_test::TestContext& test_context, const std:
util::Logger& logger = test_context.logger;
util::PrefixLogger server_logger("Server: ", logger);
nit: You might as well make this test_context.logger as well and delete the line above.
Since we don't use this file, you can ignore...
Client::Config client_config;
client_config.logger = &client_logger;
How did this work without crashing in the past?
Oh, it looks like we don't use this file in our tests... (it's not in any of the CMakeLists.txt files)
I've just removed this file.
test/sync_fixtures.hpp
Outdated
@@ -562,6 +567,9 @@ class MultiClientServerFixture {
{
    unit_test::TestContext& test_context = m_test_context;
    stop();
    for (int i = 0; i < m_num_clients; ++i) {
        m_client_socket_providers[i]->stop(true);
Shouldn't stop() do this (and remove it from here)?
From what I saw, the behavior of the tests before was that calling stop() did not actually join the client worker thread; the worker threads were joined separately here:

realm-core/test/sync_fixtures.hpp, line 568 in 792fd5b:
CHECK(!m_client_threads[i].join());
It doesn't hurt, but it's not necessary, since both the client (currently, though this will be removed soon) and the DefaultWebsocketProvider do this in their destructors.
And stop() doesn't need to do this, since client.stop() will already start the event loop stop process. But that will change with my upcoming changes - once those are in, it shouldn't matter if/when we call stop on the event loop. I can revisit with my changes.
The client calls stop() but without stop(true), so it doesn't really do this. Maybe we could replace this with m_client_socket_providers.clear() in the future, but since we had a bunch of weird data races I think it's worth trying to keep this shutdown logic as close to the old behavior as possible.
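The stop(bool wait_for_stop) contract being discussed can be sketched as follows. This is a hedged, hypothetical sketch (not DefaultSocketProvider's real code): stop(true) requests shutdown and joins the worker thread, while stop(false) only requests shutdown, which is the safe form from the event loop thread itself, where joining would deadlock.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

class WorkerLoop {
public:
    void start() {
        m_thread = std::thread([this] {
            while (!m_stop_requested.load()) {
                std::this_thread::yield(); // stand-in for serving work
            }
        });
    }

    void stop(bool wait_for_stop) {
        m_stop_requested.store(true);
        // Only join when the caller explicitly asked to wait; joining from
        // the worker thread itself would deadlock.
        if (wait_for_stop && m_thread.joinable())
            m_thread.join();
    }

    bool joined() const { return !m_thread.joinable(); }

private:
    std::atomic<bool> m_stop_requested{false};
    std::thread m_thread;
};
```

After stop(true) returns, the caller is guaranteed the worker thread has exited, which is the property the fixture's teardown relies on here.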
if (m_keep_running_timer)
    m_keep_running_timer->cancel();
if (m_thread.joinable()) {
I think stop() should be the thing that joins the worker thread
But only if wait_for_stop is true. Sometimes client.stop(), which calls socket_provider.stop() (for now), is called on the event loop thread.
Right, but we're calling stop(true) here. I guess my comment means: we're calling stop(true), which calls join if wait_for_stop is true, which it is, so we don't need to join here.
In other words, we should either just detach the thread so that joining it doesn't matter, or we should join it when stop is called with wait_for_stop == true.
// Wait for the thread to start before continuing
state_wait_for(State::Started);
I think this was actually causing some subtle races where we'd wait for the thread to reach the "Started" state, but it wouldn't actually be ready to serve work until the "Running" state. Since the Started/Running states should always be a matter of microseconds apart, I just removed the "Started" state and made it so we don't enter the "Running" state until we're actually inside the event loop.
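The shape of that change can be sketched as follows (hypothetical names, not the provider's real code): there is no separate "Started" state; the worker posts a handler to its own service and only transitions to Running once that handler executes inside the event loop, so an observer that sees Running knows the loop is actually serving work.

```cpp
#include <cassert>
#include <deque>
#include <functional>

enum class LoopPhase { Starting, Running };

struct MiniLoop {
    LoopPhase m_state = LoopPhase::Starting;
    std::deque<std::function<void()>> m_queue;

    void post(std::function<void()> h) { m_queue.push_back(std::move(h)); }

    void run() {
        // The Running transition is itself a queued handler, so it cannot
        // be observed before the loop has begun executing handlers.
        post([this] { m_state = LoopPhase::Running; });
        while (!m_queue.empty()) {
            auto h = std::move(m_queue.front());
            m_queue.pop_front();
            h();
        }
    }
};
```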
m_service.post([this](Status status) {
    std::unique_lock<std::mutex> lock(m_mutex);
    if (status == ErrorCodes::OperationAborted) {
        REALM_ASSERT(m_state == State::Stopping || m_state == State::Stopped);
The only way I could think of for this to happen is if we created an event loop and then immediately destroyed it. I don't think this is super likely to ever happen, so just REALM_ASSERT'ing this should be okay.
The ErrorCodes::OperationAborted value for Status isn't implemented... it is always Status::OK. network::Service doesn't have a mechanism to abort anything on the queue when stop() is called. If there was nothing to run when stop() was called, it will exit immediately, but if there are items in the queue, it will continue processing them until the queue is empty.
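The drain semantics described above can be sketched like this (simplified, hypothetical names, not network::Service itself): stop() sets a flag but aborts nothing, so handlers that were already queued keep running until the queue is empty.

```cpp
#include <cassert>
#include <deque>
#include <functional>

struct DrainingService {
    std::deque<std::function<void()>> m_queue;
    bool m_stopped = false;

    void post(std::function<void()> h) {
        if (!m_stopped)
            m_queue.push_back(std::move(h));
    }

    void stop() { m_stopped = true; } // does not clear or abort m_queue

    void run() {
        // Even after stop(), previously queued handlers still execute.
        while (!m_queue.empty()) {
            auto h = std::move(m_queue.front());
            m_queue.pop_front();
            h();
        }
    }
};
```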
So is the right thing to do here to just return early if m_state == Stopping?
We could potentially add support for OperationAborted, but I'm not sure if this will break anything...
// network.hpp:2638
inline void Service::post(H handler)
{
    // m_stopped is false when Service is created and after calling reset() after stop()
    if (m_impl->m_stopped) {
        handler(Status(ErrorCodes::OperationAborted), "Event loop is not running");
        return;
    }
    do_post(&Service::post_oper_constr<H>, sizeof(PostOper<H>), &handler);
}
What happens to callbacks that have been posted, but not run, when the Service is destroyed?
They just sit in the m_completed_operations_2 queue and are destroyed when the Service is destroyed - this means stale post function handlers (with potentially invalid objects) could be called when/if run() is called again. I think it would be preferable if this queue was flushed (or each handler called with OperationAborted) when reset() is called.

I'll create a new task with this information to be looked at separately (after platform networking).
I've updated this function a bit so that if the service got reset (which I don't think it can be right now) and this callback ran, a generation count ensures we only set the state to Running if we're the correct call to run() for this event loop; otherwise we return early if m_state == Stopping. I still check for status == OperationAborted, but just return early and don't assert anything.
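The generation-count guard described above might look like this (a sketch with hypothetical names): each run() captures the generation at the time it starts, and the posted "mark running" callback only takes effect if no newer run() has begun in the meantime, so a stale callback from a previous run is ignored.

```cpp
#include <cassert>
#include <cstdint>

struct GenerationGuard {
    std::uint64_t m_generation = 0;
    bool m_running = false;

    // Called at the start of run(); the returned token identifies this run.
    std::uint64_t begin_run() { return ++m_generation; }

    // Called from the posted handler inside the event loop.
    bool mark_running(std::uint64_t token) {
        if (token != m_generation)
            return false; // a newer run() superseded this callback; ignore it
        m_running = true;
        return true;
    }
};
```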
REALM_ASSERT(status.is_ok());
m_logger_ptr->trace("Default event loop: service run");
REALM_ASSERT(m_state == State::Starting);
I think if this were ever false then we should have gotten a Status of OperationAborted.
See above
// Updating state to Stopping will free a start() if it is waiting for the thread to
// start and may cause the thread to exit early before calling service.run()
m_service.stop(); // Unblocks m_service.run()
}

// Wait until the thread is stopped (exited) if requested
if (wait_for_stop && m_state == State::Stopping) {
    lock.unlock();
if (wait_for_stop) {
I couldn't think of a way we'd have a state other than State::Stopping or State::Stopped here, so we shouldn't need the extra check.
REALM_ASSERT(m_state == State::Starting);
do_state_update(lock, State::Running);
});

try {
You might want to add a check here to confirm the state is still Starting before it calls m_service.run() - if stop() was called before this, you would hit the REALM_ASSERT() in the post function handler.
return; // early return

// If the thread has been previously run, make sure it has been joined first
if (m_state == State::Stopping || m_state == State::Stopped) {
I had these lines in here in case stop() (without waiting) and then start() was called, but we never do this in the code; and I am working to change the behavior of stop() in general, so this should be fine.
What, How & Why?
This undoes some of the changes in #6151 so that sync tests can control the lifecycle of their own event loops, including controlling when they start/stop and posting directly to them. Hopefully this fixes the TSAN issues we were seeing - I ran all the realm-sync-tests successfully with TSAN enabled locally.
☑️ ToDos
[ ] 📝 Changelog update - this is fixing changes to tests that haven't been released yet.