Allow sync tests to control their event loop lifecycles #6244
Changes from all commits: 3650624, a575487, 87e25b6, cf89c88, 09ff6a8, 68f8bb7, ee238f5, 81565a3
@@ -458,7 +458,7 @@ void DefaultWebSocketImpl::initiate_websocket_handshake()
///

DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger,
                                             const std::string user_agent)
                                             const std::string user_agent, AutoStart auto_start)
    : m_logger_ptr{logger}
    , m_service{}
    , m_random{}
@@ -471,18 +471,19 @@ DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>
    REALM_ASSERT(m_logger_ptr);                     // Make sure the logger is valid
    util::seed_prng_nondeterministically(m_random); // Throws
    start_keep_running_timer(); // TODO: Update service so this timer is not needed
    start();
    if (auto_start) {
        start();
    }
}

DefaultSocketProvider::~DefaultSocketProvider()
{
    // Wait for the thread to stop
    stop(true);
    m_logger_ptr->trace("Default event loop teardown");
    if (m_keep_running_timer)
        m_keep_running_timer->cancel();
    if (m_thread.joinable()) {
        m_thread.join();
    }

    // Wait for the thread to stop
    stop(true);
    // Shutting down - no need to lock mutex before check
    REALM_ASSERT(m_state == State::Stopped);
}

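The definition of AutoStart is not part of this diff; from the way it is used (brace-constructed from a bool and tested with if (auto_start)) it reads like a tagged-bool wrapper. A minimal standalone sketch of that idiom follows, with hypothetical names rather than realm's actual type:

#include <cstdio>

// Illustrative stand-in for the AutoStart parameter: a bool wrapped in its own type, so
// call sites read DefaultSocketProvider{logger, agent, AutoStart{false}} instead of a bare
// `false` whose meaning is unclear. Names here are hypothetical, not realm's.
template <typename Tag>
class TaggedBool {
public:
    constexpr explicit TaggedBool(bool value) noexcept
        : m_value(value)
    {
    }
    constexpr explicit operator bool() const noexcept
    {
        return m_value;
    }

private:
    bool m_value;
};

using AutoStart = TaggedBool<class AutoStartTag>;

// Mirrors the constructor change above: only spin up the loop when asked to.
struct Provider {
    explicit Provider(AutoStart auto_start)
    {
        if (auto_start)
            start();
    }
    void start() { std::puts("event loop started"); }
};

int main()
{
    Provider manual{AutoStart{false}}; // a test can call manual.start() later
    Provider automatic{AutoStart{true}};
    manual.start();
}
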
@@ -491,70 +492,76 @@ void DefaultSocketProvider::start()
{
    std::unique_lock<std::mutex> lock(m_mutex);
    // Has the thread already been started or is running
    if (m_state == State::Starting || m_state == State::Started || m_state == State::Running)
    if (m_state == State::Starting || m_state == State::Running)
        return; // early return

    // If the thread has been previously run, make sure it has been joined first
    if (m_state == State::Stopping || m_state == State::Stopped) {

Review comment: I had these lines in here in case …

    if (m_thread.joinable()) {
        lock.unlock();
        m_thread.join();
        lock.lock();
    }
    if (m_state == State::Stopping) {
        state_wait_for(lock, State::Stopped);
    }

    m_logger_ptr->trace("Default event loop: start()");
    REALM_ASSERT(m_state == State::Stopped);
    do_state_update(State::Starting);
    do_state_update(lock, State::Starting);
    m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
    lock.unlock();
    // Wait for the thread to start before continuing
    state_wait_for(State::Started);

Review comment: I think this was actually causing some subtle races where we'd wait for the thread to reach the "Started" state, but it wouldn't actually be ready to serve work until the "Running" state. Since the Started/Running states should always be a matter of microseconds apart, I just removed the "Started" state and made it so we don't enter the "Running" state until we're actually inside the event loop.

    state_wait_for(lock, State::Running);
}

void DefaultSocketProvider::event_loop()
{
    m_logger_ptr->trace("Default event loop: thread running");
    std::unique_lock<std::mutex> lock(m_mutex);
    // The thread has started - don't return early due to stop() until after did_create_thread()
    if (m_state == State::Starting) {
        do_state_update(State::Started);
    }
    lock.unlock();

    auto will_destroy_thread = util::make_scope_exit([&]() noexcept {
        m_logger_ptr->trace("Default event loop: thread exiting");
        if (g_binding_callback_thread_observer)
            g_binding_callback_thread_observer->will_destroy_thread();

        std::lock_guard<std::mutex> lock(m_mutex);
        std::unique_lock<std::mutex> lock(m_mutex);
        // Did we get here due to an unhandled exception?
        if (m_state != State::Stopping) {
            m_logger_ptr->error("Default event loop: thread exited unexpectedly");
        }
        do_state_update(State::Stopped);
        m_state = State::Stopped;
        std::notify_all_at_thread_exit(m_state_cv, std::move(lock));
    });

    if (g_binding_callback_thread_observer)
        g_binding_callback_thread_observer->did_create_thread();

    lock.lock();
    if (m_state != State::Started) {
        // Stop has already been requested - exit early
        lock.unlock(); // make sure the mutex is unlocked before exiting
        return; // early return
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        REALM_ASSERT(m_state == State::Starting);
    }
    do_state_update(State::Running);
    lock.unlock();

    m_logger_ptr->trace("Default event loop: service run");
    // We update the state to Running from inside the event loop so that start() is blocked until
    // the event loop is actually ready to receive work.
    m_service.post([this, my_generation = ++m_event_loop_generation](Status status) {
        if (status == ErrorCodes::OperationAborted) {
            return;
        }

        REALM_ASSERT(status.is_ok());

        std::unique_lock<std::mutex> lock(m_mutex);
        // This is a callback from a previous generation
        if (m_event_loop_generation != my_generation) {
            return;
        }
        if (m_state == State::Stopping) {
            return;
        }
        m_logger_ptr->trace("Default event loop: service run");
        REALM_ASSERT(m_state == State::Starting);

Review comment: I think if this were ever false then we should have gotten a Status of OperationAborted.

Review comment: See above.

        do_state_update(lock, State::Running);
    });

    try {

Review comment: You might want to add a check here to confirm the state is still starting before it calls …

        m_service.run(); // Throws
    }
    catch (const std::exception& e) {
        lock.lock();
        std::unique_lock<std::mutex> lock(m_mutex);
        // Service is no longer running, event loop thread is stopping
        do_state_update(State::Stopping);
        do_state_update(lock, State::Stopping);
        lock.unlock();
        m_logger_ptr->error("Default event loop exception: ", e.what());
        if (g_binding_callback_thread_observer)

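The notable change in event_loop() is that the transition to Running now happens inside a task posted onto the service itself (guarded by a generation counter so a callback left over from an earlier start/stop cycle is ignored), which is what lets start() block on state_wait_for(lock, State::Running) until the loop is genuinely serving work. A self-contained sketch of that handshake, using a hand-rolled task queue and hypothetical names instead of realm's network::Service:

#include <condition_variable>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Minimal stand-in for the handshake above: start() only returns after a task that the
// worker posted to itself has run, i.e. the loop is demonstrably serving work.
class MiniLoop {
public:
    enum class State { Stopped, Starting, Running, Stopping };

    void start()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_state = State::Starting;
        m_thread = std::thread([this] { run(); });
        // Equivalent of state_wait_for(lock, State::Running).
        m_cv.wait(lock, [&] { return m_state == State::Running; });
    }

    void post(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push_back(std::move(task));
        }
        m_cv.notify_all();
    }

    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_state = State::Stopping;
        }
        m_cv.notify_all();
        if (m_thread.joinable())
            m_thread.join();
    }

private:
    void run()
    {
        // Flip to Running from inside the loop, like the m_service.post() above.
        post([this] {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_state = State::Running;
            m_cv.notify_all();
        });
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [&] { return !m_tasks.empty() || m_state == State::Stopping; });
                if (m_state == State::Stopping)
                    return;
                task = std::move(m_tasks.front());
                m_tasks.pop_front();
            }
            task();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<std::function<void()>> m_tasks;
    State m_state = State::Stopped;
    std::thread m_thread;
};

int main()
{
    MiniLoop loop;
    loop.start(); // blocks until the loop thread is actually processing tasks
    loop.post([] { std::puts("ran on the event loop thread"); });
    loop.stop();
}
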
@@ -569,38 +576,39 @@ void DefaultSocketProvider::stop(bool wait_for_stop)
    std::unique_lock<std::mutex> lock(m_mutex);

    // Do nothing if the thread is not started or running or stop has already been called
    if (m_state == State::Starting || m_state == State::Started || m_state == State::Running) {
    if (m_state == State::Starting || m_state == State::Running) {
        m_logger_ptr->trace("Default event loop: stop()");
        do_state_update(State::Stopping);
        do_state_update(lock, State::Stopping);
        // Updating state to Stopping will free a start() if it is waiting for the thread to
        // start and may cause the thread to exit early before calling service.run()
        m_service.stop(); // Unblocks m_service.run()
    }

    // Wait until the thread is stopped (exited) if requested
    if (wait_for_stop && m_state == State::Stopping) {
        lock.unlock();
    if (wait_for_stop) {

Review comment: I couldn't think of a way we'd have a state != State::Stopping || State::Stopped here, so we shouldn't need the extra check.

        m_logger_ptr->trace("Default event loop: wait for stop");
        state_wait_for(State::Stopped);
        state_wait_for(lock, State::Stopped);
        if (m_thread.joinable()) {
            m_thread.join();
        }
    }
}

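stop(true) can wait for State::Stopped and then join the thread because the worker publishes that final state with std::notify_all_at_thread_exit in the scope-exit block of event_loop() above. A standalone demonstration of that standard-library facility:

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

// The waiter is only released once the exiting thread has finished running its
// thread-local destructors, so observing the flag also means the worker is
// effectively done.
std::mutex g_mutex;
std::condition_variable g_cv;
bool g_stopped = false;

int main()
{
    std::thread worker([] {
        std::unique_lock<std::mutex> lock(g_mutex);
        g_stopped = true;
        // The unlock and the notification are deferred until this thread exits.
        std::notify_all_at_thread_exit(g_cv, std::move(lock));
        std::puts("worker: exiting");
    });

    {
        std::unique_lock<std::mutex> lock(g_mutex);
        g_cv.wait(lock, [] { return g_stopped; });
        std::puts("main: observed Stopped");
    }
    worker.join();
}
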
//                      +--------------------------------------------------+
//                      \/                                                  |
// State Machine: Stopped -> Starting -> Started -> Running -> Stopping -+
//                              |            |                           ^
//                              +-----------+---------------------+
//                      +---------------------------------------+
//                      \/                                      |
// State Machine: Stopped -> Starting -> Running -> Stopping -+
//                              |          |                  ^
//                              +----------------------+

void DefaultSocketProvider::do_state_update(State new_state)
void DefaultSocketProvider::do_state_update(std::unique_lock<std::mutex>&, State new_state)
{
    // m_state_mutex should already be locked...
    m_state = new_state;
    m_state_cv.notify_all(); // Let any waiters check the state
}

void DefaultSocketProvider::state_wait_for(State expected_state)
void DefaultSocketProvider::state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state)
{
    std::unique_lock<std::mutex> lock(m_mutex);
    // Check for condition already met or superseded
    if (m_state >= expected_state)
        return;
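do_state_update() and state_wait_for() now take the caller's std::unique_lock by reference instead of relying on the comment that the mutex "should already be locked". A small standalone sketch of that idiom, with illustrative names rather than realm's:

#include <cassert>
#include <mutex>

// A private helper takes the caller's std::unique_lock by reference, so every call site
// is forced to have a lock in hand and the helper can assert it guards the right mutex.
class StateHolder {
public:
    void set_ready()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        do_update(lock, true);
    }

private:
    void do_update(std::unique_lock<std::mutex>& lock, bool ready)
    {
        assert(lock.owns_lock() && lock.mutex() == &m_mutex); // caller must hold m_mutex
        m_ready = ready;
    }

    std::mutex m_mutex;
    bool m_ready = false;
};

int main()
{
    StateHolder holder;
    holder.set_ready();
}
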
@@ -4,6 +4,7 @@
#include <vector>
#include <sstream>

#include <realm/sync/network/default_socket.hpp>
#include <realm/sync/network/http.hpp>
#include <realm/sync/network/network.hpp>
#include <realm/string_data.hpp>

@@ -534,6 +535,10 @@ class MultiClientServerFixture {
        m_clients.resize(num_clients);
        for (int i = 0; i < num_clients; ++i) {
            Client::Config config_2;

            m_client_socket_providers.push_back(std::make_shared<websocket::DefaultSocketProvider>(
                m_client_loggers[i], "", websocket::DefaultSocketProvider::AutoStart{false}));
            config_2.socket_provider = m_client_socket_providers.back();

Review comment: Very clever!

            config_2.user_agent_application_info = "TestFixture/" REALM_VERSION_STRING;
            config_2.logger = m_client_loggers[i];
            config_2.reconnect_mode = ReconnectMode::testing;

@@ -562,6 +567,7 @@ class MultiClientServerFixture {
    {
        unit_test::TestContext& test_context = m_test_context;
        stop();
        m_client_socket_providers.clear();
        for (int i = 0; i < m_num_servers; ++i) {
            if (m_server_threads[i].joinable())
                CHECK(!m_server_threads[i].join());

@@ -599,7 +605,7 @@ class MultiClientServerFixture {
        // Post the new simulated error rate
        using sf = _impl::SimulatedFailure;
        // Post it onto the event loop to update the event loop thread
        m_clients[client_index]->post_for_testing([sim = std::move(sim)](Status) {
        m_client_socket_providers[client_index]->post([sim = std::move(sim)](Status) {
            sf::prime_random(sf::sync_client__read_head, sim.first, sim.second,
                             random_int<uint_fast64_t>()); // Seed from global generator
        });

@@ -618,6 +624,16 @@ class MultiClientServerFixture {
            m_server_threads[i].start([this, i] {
                run_server(i);
            });

        for (int i = 0; i < m_num_clients; ++i) {
            m_client_socket_providers[i]->start();
        }
    }

    void start_client(int index)
    {
        REALM_ASSERT(index >= 0 && index < m_num_clients);
        m_client_socket_providers[index]->start();
    }

    // Use either the methods below or `start()`.

@@ -647,18 +663,19 @@ class MultiClientServerFixture {
        if (sim.first != 0) {
            using sf = _impl::SimulatedFailure;
            // If we're using a simulated failure, clear it by posting onto the event loop
            client.post_for_testing([](Status) mutable {
            m_client_socket_providers[index]->post([](Status) mutable {
                sf::unprime(sf::sync_client__read_head); // Clear the sim failure set when started
            });
        }
        // We can't wait for clearing the simulated failure since some tests stop the client early
        client.stop();
        m_client_socket_providers[index]->stop(true);
    }

    void stop()
    {
        for (int i = 0; i < m_num_clients; ++i)
            stop_client(i);
            m_clients[i]->stop();
        for (int i = 0; i < m_num_servers; ++i)
            m_servers[i]->stop();
    }

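Putting the fixture changes together, each test client's socket provider now follows roughly the lifecycle below. This is a hedged sketch: the DefaultSocketProvider calls (the AutoStart{false} constructor, start(), post(), stop(true)) are the ones used in this diff, while the exact headers, namespaces, and logger setup are assumptions.

#include <memory>

#include <realm/status.hpp>
#include <realm/sync/network/default_socket.hpp>
#include <realm/util/logger.hpp>

// Rough sketch of the lifecycle the fixture now drives explicitly; assumes the
// realm::sync::websocket namespace and a ready-made logger.
void example_lifecycle(const std::shared_ptr<realm::util::Logger>& logger)
{
    using Provider = realm::sync::websocket::DefaultSocketProvider;

    // Created stopped - no event loop thread exists yet.
    auto provider = std::make_shared<Provider>(logger, "", Provider::AutoStart{false});

    // Start the event loop once the test (or the fixture's start()) is ready.
    provider->start();

    // Work that must run on the event loop thread, e.g. priming simulated failures.
    provider->post([](realm::Status) {
        // ... test-specific setup ...
    });

    // Request stop and, because wait_for_stop is true, join the event loop thread.
    provider->stop(true);
}
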
@@ -797,6 +814,7 @@ class MultiClientServerFixture {
    std::vector<std::function<ConnectionStateChangeListener>> m_connection_state_change_listeners;
    std::vector<port_type> m_server_ports;
    std::vector<ThreadWrapper> m_server_threads;
    std::vector<std::shared_ptr<websocket::DefaultSocketProvider>> m_client_socket_providers;
    std::vector<std::pair<int, int>> m_simulated_server_error_rates;
    std::vector<std::pair<int, int>> m_simulated_client_error_rates;
    std::vector<uint_least64_t> m_allow_server_errors;

Review comment: I think stop() should be the thing that joins the worker thread.

Review comment: But only if wait_for_stop is true. Sometimes client.stop(), which calls socket_provider.stop() (for now), is called on the event loop thread.

Review comment: Right, but we're calling stop(true) here. I guess my comment there means "we're calling stop(true), which calls join if wait_for_stop is true, which it is, so we don't need to join here."

Review comment: In other words, we should either just detach the thread so that joining it doesn't matter, or we should join it when stop is called with wait_for_stop==true.