Allow sync tests to control their event loop lifecycles by jbreams · Pull Request #6244 · realm/realm-core · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Allow sync tests to control their event loop lifecycles #6244

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ ClientImpl::~ClientImpl()
// Since no other thread is allowed to be accessing this client or any of
// its subobjects at this time, no mutex locking is necessary.

// Event Loop TODO: Until the event loop can be free-running, wait for the
// thread to exit before tearing down the client.
m_socket_provider->stop(true);

Expand Down Expand Up @@ -1842,12 +1841,6 @@ void Client::stop() noexcept
}


/// Post `handler` onto the client's event loop by forwarding it to the
/// implementation's post(). Intended for tests only (see the declaration
/// comment in client.hpp).
/// NOTE(review): this change removes this function; the test fixture now
/// posts directly through its DefaultSocketProvider instead.
void Client::post_for_testing(SyncSocketProvider::FunctionHandler&& handler)
{
m_impl->post(std::move(handler));
}


void Client::cancel_reconnect_delay()
{
m_impl->cancel_reconnect_delay();
Expand Down
3 changes: 0 additions & 3 deletions src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ class Client {
Client(Client&&) noexcept;
~Client() noexcept;

/// Used by testing to post a function handler onto the event loop
void post_for_testing(SyncSocketProvider::FunctionHandler&& handler);

/// Run the internal event-loop of the client. At most one thread may
/// execute run() at any given time. The call will not return until somebody
/// calls stop().
Expand Down
106 changes: 57 additions & 49 deletions src/realm/sync/network/default_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ void DefaultWebSocketImpl::initiate_websocket_handshake()
///

DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger,
const std::string user_agent)
const std::string user_agent, AutoStart auto_start)
: m_logger_ptr{logger}
, m_service{}
, m_random{}
Expand All @@ -471,18 +471,19 @@ DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>
REALM_ASSERT(m_logger_ptr); // Make sure the logger is valid
util::seed_prng_nondeterministically(m_random); // Throws
start_keep_running_timer(); // TODO: Update service so this timer is not needed
start();
if (auto_start) {
start();
}
}

DefaultSocketProvider::~DefaultSocketProvider()
{
// Wait for the thread to stop
stop(true);
m_logger_ptr->trace("Default event loop teardown");
if (m_keep_running_timer)
m_keep_running_timer->cancel();
if (m_thread.joinable()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think stop() should be the thing that joins the worker thread

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But only if wait_for_stop is true. Sometimes client.stop(), which (for now) calls socket_provider.stop(), is called on the event loop thread itself, and a thread cannot join itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but we're calling stop(true) here. What I meant is: stop(true) passes wait_for_stop == true, and stop() should join the thread whenever wait_for_stop is true, so we don't need to join again here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words, we should either just detach the thread so that joining it doesn't matter, or we should join it when stop is called with wait_for_stop==true.

m_thread.join();
}

// Wait for the thread to stop
stop(true);
// Shutting down - no need to lock mutex before check
REALM_ASSERT(m_state == State::Stopped);
}
Expand All @@ -491,70 +492,76 @@ void DefaultSocketProvider::start()
{
std::unique_lock<std::mutex> lock(m_mutex);
// Has the thread already been started or is running
if (m_state == State::Starting || m_state == State::Started || m_state == State::Running)
if (m_state == State::Starting || m_state == State::Running)
return; // early return

// If the thread has been previously run, make sure it has been joined first
if (m_state == State::Stopping || m_state == State::Stopped) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had these lines here in case stop() was called without waiting and then start() was called, but we never do that in the code. I'm also working on changing how stop() behaves in general, so removing them should be fine.

if (m_thread.joinable()) {
lock.unlock();
m_thread.join();
lock.lock();
}
if (m_state == State::Stopping) {
state_wait_for(lock, State::Stopped);
}

m_logger_ptr->trace("Default event loop: start()");
REALM_ASSERT(m_state == State::Stopped);
do_state_update(State::Starting);
do_state_update(lock, State::Starting);
m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
lock.unlock();
// Wait for the thread to start before continuing
state_wait_for(State::Started);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was actually causing some subtle races: we'd wait for the thread to reach the "Started" state, but it wouldn't actually be ready to serve work until it reached the "Running" state. Since the Started and Running states should always be only microseconds apart, I removed the "Started" state and made it so we don't enter the "Running" state until we're actually inside the event loop.

state_wait_for(lock, State::Running);
}

void DefaultSocketProvider::event_loop()
{
m_logger_ptr->trace("Default event loop: thread running");
std::unique_lock<std::mutex> lock(m_mutex);
// The thread has started - don't return early due to stop() until after did_create_thread()
if (m_state == State::Starting) {
do_state_update(State::Started);
}
lock.unlock();

auto will_destroy_thread = util::make_scope_exit([&]() noexcept {
m_logger_ptr->trace("Default event loop: thread exiting");
if (g_binding_callback_thread_observer)
g_binding_callback_thread_observer->will_destroy_thread();

std::lock_guard<std::mutex> lock(m_mutex);
std::unique_lock<std::mutex> lock(m_mutex);
// Did we get here due to an unhandled exception?
if (m_state != State::Stopping) {
m_logger_ptr->error("Default event loop: thread exited unexpectedly");
}
do_state_update(State::Stopped);
m_state = State::Stopped;
std::notify_all_at_thread_exit(m_state_cv, std::move(lock));
});

if (g_binding_callback_thread_observer)
g_binding_callback_thread_observer->did_create_thread();

lock.lock();
if (m_state != State::Started) {
// Stop has already been requested - exit early
lock.unlock(); // make sure the mutex is unloaded before exiting
return; // early return
{
std::lock_guard<std::mutex> lock(m_mutex);
REALM_ASSERT(m_state == State::Starting);
}
do_state_update(State::Running);
lock.unlock();

m_logger_ptr->trace("Default event loop: service run");
// We update the state to Running from inside the event loop so that start() is blocked until
// the event loop is actually ready to receive work.
m_service.post([this, my_generation = ++m_event_loop_generation](Status status) {
if (status == ErrorCodes::OperationAborted) {
return;
}

REALM_ASSERT(status.is_ok());

std::unique_lock<std::mutex> lock(m_mutex);
// This is a callback from a previous generation
if (m_event_loop_generation != my_generation) {
return;
}
if (m_state == State::Stopping) {
return;
}
m_logger_ptr->trace("Default event loop: service run");
REALM_ASSERT(m_state == State::Starting);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if this were ever false then we should have gotten a Status of OperationAborted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above

do_state_update(lock, State::Running);
});

try {
Copy link
Contributor
@michael-wb michael-wb Jan 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to add a check here to confirm the state is still Starting before calling m_service.run() — if stop() was called before this point, you would hit the REALM_ASSERT() in the posted handler.

m_service.run(); // Throws
}
catch (const std::exception& e) {
lock.lock();
std::unique_lock<std::mutex> lock(m_mutex);
// Service is no longer running, event loop thread is stopping
do_state_update(State::Stopping);
do_state_update(lock, State::Stopping);
lock.unlock();
m_logger_ptr->error("Default event loop exception: ", e.what());
if (g_binding_callback_thread_observer)
Expand All @@ -569,38 +576,39 @@ void DefaultSocketProvider::stop(bool wait_for_stop)
std::unique_lock<std::mutex> lock(m_mutex);

// Do nothing if the thread is not started or running or stop has already been called
if (m_state == State::Starting || m_state == State::Started || m_state == State::Running) {
if (m_state == State::Starting || m_state == State::Running) {
m_logger_ptr->trace("Default event loop: stop()");
do_state_update(State::Stopping);
do_state_update(lock, State::Stopping);
// Updating state to Stopping will free a start() if it is waiting for the thread to
// start and may cause the thread to exit early before calling service.run()
m_service.stop(); // Unblocks m_service.run()
}

// Wait until the thread is stopped (exited) if requested
if (wait_for_stop && m_state == State::Stopping) {
lock.unlock();
if (wait_for_stop) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't think of a way we'd have a state other than State::Stopping or State::Stopped here, so we shouldn't need the extra check.

m_logger_ptr->trace("Default event loop: wait for stop");
state_wait_for(State::Stopped);
state_wait_for(lock, State::Stopped);
if (m_thread.joinable()) {
m_thread.join();
}
}
}

// +--------------------------------------------------+
// \/ |
// State Machine: Stopped -> Starting -> Started -> Running -> Stopping -+
// | | ^
// +-----------+---------------------+
// +---------------------------------------+
// \/ |
// State Machine: Stopped -> Starting -> Running -> Stopping -+
// | | ^
// +----------------------+

void DefaultSocketProvider::do_state_update(State new_state)
void DefaultSocketProvider::do_state_update(std::unique_lock<std::mutex>&, State new_state)
{
// m_state_mutex should already be locked...
m_state = new_state;
m_state_cv.notify_all(); // Let any waiters check the state
}

void DefaultSocketProvider::state_wait_for(State expected_state)
void DefaultSocketProvider::state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state)
{
std::unique_lock<std::mutex> lock(m_mutex);
// Check for condition already met or superseded
if (m_state >= expected_state)
return;
Expand Down
21 changes: 14 additions & 7 deletions src/realm/sync/network/default_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <realm/sync/network/network.hpp>
#include <realm/util/future.hpp>
#include <realm/util/random.hpp>
#include <realm/util/tagged_bool.hpp>

namespace realm::sync::network {
class Service;
Expand Down Expand Up @@ -44,13 +45,21 @@ class DefaultSocketProvider : public SyncSocketProvider {
network::DeadlineTimer m_timer;
};

DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger, const std::string user_agent);
/// Tag type for AutoStart (util::TaggedBool<AutoStartTag>, declared below);
/// presumably it exists only to make the auto-start flag a distinct type
/// rather than a plain bool. Passing AutoStart{false} to the constructor skips
/// its start() call, leaving the caller to call start() explicitly.
struct AutoStartTag {
};

using AutoStart = util::TaggedBool<AutoStartTag>;
DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger, const std::string user_agent,
AutoStart auto_start = AutoStart{true});

// Don't allow move or copy constructor
DefaultSocketProvider(DefaultSocketProvider&&) = delete;

~DefaultSocketProvider();

// Start the event loop if it is not started already. Otherwise, do nothing.
void start();

/// Temporary workaround until client shutdown has been updated in a separate PR - these functions
/// will be handled internally when this happens.
/// Stops the internal event loop (provided by network::Service)
Expand All @@ -72,15 +81,12 @@ class DefaultSocketProvider : public SyncSocketProvider {
}

private:
enum class State { Starting, Started, Running, Stopping, Stopped };

// Start the event loop
void start();
enum class State { Starting, Running, Stopping, Stopped };

/// Block until the state reaches the expected or later state - return true if state matches expected state
void state_wait_for(State expected_state);
void state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state);
/// Internal function for updating the state and signaling the wait_for_state condvar
void do_state_update(State new_state);
void do_state_update(std::unique_lock<std::mutex>&, State new_state);
/// The execution code for the event loop thread
void event_loop();

Expand All @@ -100,6 +106,7 @@ class DefaultSocketProvider : public SyncSocketProvider {
const std::string m_user_agent;
SyncTimer m_keep_running_timer;
std::mutex m_mutex;
uint64_t m_event_loop_generation = 0;
State m_state; // protected by m_mutex
std::condition_variable m_state_cv; // uses m_mutex
std::thread m_thread; // protected by m_mutex
Expand Down
24 changes: 21 additions & 3 deletions test/sync_fixtures.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <vector>
#include <sstream>

#include <realm/sync/network/default_socket.hpp>
#include <realm/sync/network/http.hpp>
#include <realm/sync/network/network.hpp>
#include <realm/string_data.hpp>
Expand Down Expand Up @@ -534,6 +535,10 @@ class MultiClientServerFixture {
m_clients.resize(num_clients);
for (int i = 0; i < num_clients; ++i) {
Client::Config config_2;

m_client_socket_providers.push_back(std::make_shared<websocket::DefaultSocketProvider>(
m_client_loggers[i], "", websocket::DefaultSocketProvider::AutoStart{false}));
config_2.socket_provider = m_client_socket_providers.back();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very clever!

config_2.user_agent_application_info = "TestFixture/" REALM_VERSION_STRING;
config_2.logger = m_client_loggers[i];
config_2.reconnect_mode = ReconnectMode::testing;
Expand Down Expand Up @@ -562,6 +567,7 @@ class MultiClientServerFixture {
{
unit_test::TestContext& test_context = m_test_context;
stop();
m_client_socket_providers.clear();
for (int i = 0; i < m_num_servers; ++i) {
if (m_server_threads[i].joinable())
CHECK(!m_server_threads[i].join());
Expand Down Expand Up @@ -599,7 +605,7 @@ class MultiClientServerFixture {
// Post the new simulated error rate
using sf = _impl::SimulatedFailure;
// Post it onto the event loop to update the event loop thread
m_clients[client_index]->post_for_testing([sim = std::move(sim)](Status) {
m_client_socket_providers[client_index]->post([sim = std::move(sim)](Status) {
sf::prime_random(sf::sync_client__read_head, sim.first, sim.second,
random_int<uint_fast64_t>()); // Seed from global generator
});
Expand All @@ -618,6 +624,16 @@ class MultiClientServerFixture {
m_server_threads[i].start([this, i] {
run_server(i);
});

for (int i = 0; i < m_num_clients; ++i) {
m_client_socket_providers[i]->start();
}
}

/// Start the event loop of the socket provider belonging to client `index`.
/// The fixture creates its client socket providers with AutoStart{false}, so a
/// client's event loop does not run until this (or the fixture method that
/// starts all servers and clients) is called. Calling start() on a provider
/// that is already Starting or Running returns early.
void start_client(int index)
{
REALM_ASSERT(index >= 0 && index < m_num_clients);
m_client_socket_providers[index]->start();
}

// Use either the methods below or `start()`.
Expand Down Expand Up @@ -647,18 +663,19 @@ class MultiClientServerFixture {
if (sim.first != 0) {
using sf = _impl::SimulatedFailure;
// If we're using a simulated failure, clear it by posting onto the event loop
client.post_for_testing([](Status) mutable {
m_client_socket_providers[index]->post([](Status) mutable {
sf::unprime(sf::sync_client__read_head); // Clear the sim failure set when started
});
}
// We can't wait for clearing the simulated failure since some tests stop the client early
client.stop();
m_client_socket_providers[index]->stop(true);
}

void stop()
{
for (int i = 0; i < m_num_clients; ++i)
stop_client(i);
m_clients[i]->stop();
for (int i = 0; i < m_num_servers; ++i)
m_servers[i]->stop();
}
Expand Down Expand Up @@ -797,6 +814,7 @@ class MultiClientServerFixture {
std::vector<std::function<ConnectionStateChangeListener>> m_connection_state_change_listeners;
std::vector<port_type> m_server_ports;
std::vector<ThreadWrapper> m_server_threads;
std::vector<std::shared_ptr<websocket::DefaultSocketProvider>> m_client_socket_providers;
std::vector<std::pair<int, int>> m_simulated_server_error_rates;
std::vector<std::pair<int, int>> m_simulated_client_error_rates;
std::vector<uint_least64_t> m_allow_server_errors;
Expand Down
Loading
0