Fix session multiplexing by jbreams · Pull Request #6320 · realm/realm-core · GitHub

Fix session multiplexing #6320


Merged on Apr 21, 2023 (24 commits)
Commits
e8b52d0
refactor bind and consider user_id/sync server mode part of server en…
jbreams Feb 17, 2023
5cf5b22
Merge remote-tracking branch 'origin/master' into jbr/fix_user_sync_m…
jbreams Mar 3, 2023
f11a811
wip
jbreams Mar 6, 2023
d711388
Merge remote-tracking branch 'origin/master' into jbr/fix_user_sync_m…
jbreams Mar 23, 2023
b144140
not crashing
jbreams Mar 23, 2023
a179585
make tests that assume only one connection force reconnects
jbreams Mar 23, 2023
bafcb43
clang format
jbreams Mar 23, 2023
fcd4d26
update test error code
jbreams Mar 23, 2023
9e54db2
undo error handling bits
jbreams Mar 23, 2023
cc0a75c
fix asan failure
jbreams Mar 24, 2023
86a03d3
more asan failures
jbreams Mar 24, 2023
65f0bd8
more asan failures
jbreams Mar 24, 2023
e833eee
add evergreen variant
jbreams Mar 24, 2023
82f53e8
allow unbound responses for suspended sessions
jbreams Mar 24, 2023
4e944b5
Merge remote-tracking branch 'origin/master' into jbr/fix_user_sync_m…
jbreams Mar 27, 2023
47bdad3
ensure we only become de-activated when the SessionWrapper is finalized
jbreams Mar 29, 2023
a2b9245
Merge remote-tracking branch 'origin/master' into jbr/fix_user_sync_m…
jbreams Mar 29, 2023
7f55d0a
fixups
jbreams Mar 29, 2023
f2436ee
fixes from review
jbreams Apr 6, 2023
37abe4c
Merge remote-tracking branch 'origin/master' into jbr/fix_user_sync_m…
jbreams Apr 6, 2023
86f2127
Merge remote-tracking branch 'origin/master' into jbr/fix_user_sync_m…
jbreams Apr 20, 2023
28d65ed
fix build config typo
jbreams Apr 21, 2023
15fbc42
Merge remote-tracking branch 'origin/master' into jbr/fix_user_sync_m…
jbreams Apr 21, 2023
c9def4c
changelog
jbreams Apr 21, 2023
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -5,8 +5,9 @@
* None.

### Fixed
-* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
-* None.
+* If session multiplexing was enabled in the sync client and multiple realms for multiple users were being synchronized, a connection authenticated for the wrong user could have been used, resulting in a UserMismatch error from the server. ([PR #6320](https://github.com/realm/realm-core/pull/6320), since v10.0.0).
+* If session multiplexing was enabled and an automatic client reset failed, it could cause all sessions to fail with a fatal ProtocolError rather than just the session that failed to client reset. This would mean that no other sync session would be able to be opened for up to an hour without restarting the app. ([PR #6320](https://github.com/realm/realm-core/pull/6320), since v11.5.0)
+* If a DOWNLOAD message was received after a sync session was de-activated but before the UNBOUND message was received by the client, a use-after-free error may have occurred when the sync session tried to process the download message. So far this has only been reproducible if session multiplexing was enabled. ([PR #6320](https://github.com/realm/realm-core/pull/6320), since v12.9.0)

### Breaking changes
* None.
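
The third changelog entry describes a DOWNLOAD message arriving after a session is de-activated but before UNBOUND is received. The sketch below is not realm-core's implementation. It only illustrates, with entirely hypothetical names (`SessionTable`, `SessionState`, `MessageType`), one general way to avoid dispatching late messages to torn-down state: keep the session's entry registered until UNBOUND arrives, and drop anything else that shows up in between.

```cpp
#include <cassert>
#include <cstddef>
#include <map>

// Hypothetical names throughout; this is not realm-core's session code.
enum class SessionState { Active, Deactivating };
enum class MessageType { Download, Unbound };

class SessionTable {
public:
    void activate(int ident)
    {
        m_sessions[ident] = SessionState::Active;
    }

    // Mark the session as deactivating but keep its entry until UNBOUND arrives.
    void deactivate(int ident)
    {
        auto it = m_sessions.find(ident);
        if (it != m_sessions.end())
            it->second = SessionState::Deactivating;
    }

    // Returns true only if the message was handed to a live session.
    bool receive(int ident, MessageType type)
    {
        auto it = m_sessions.find(ident);
        if (it == m_sessions.end())
            return false; // unknown session: nothing to dispatch to
        if (type == MessageType::Unbound) {
            m_sessions.erase(it); // only now is the session forgotten
            return false;
        }
        return it->second == SessionState::Active; // late DOWNLOADs are dropped
    }

    std::size_t size() const
    {
        return m_sessions.size();
    }

private:
    std::map<int, SessionState> m_sessions;
};
```

In this sketch a DOWNLOAD received while the entry is in the `Deactivating` state is ignored, and the entry is erased only when UNBOUND is seen.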
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -246,6 +246,7 @@ option(REALM_ENABLE_MEMDEBUG "Add additional memory checks" OFF)
option(REALM_VALGRIND "Tell the test suite we are running with valgrind" OFF)
option(REALM_METRICS "Enable various metric tracking" ON)
option(REALM_INCLUDE_CERTS "Include a list of trust certificates in the build for SSL certificate verification" ${REALM_INCLUDE_CERTS_DEFAULT})
option(REALM_ENABLE_SYNC_MULTIPLEXING "Enables sync session multi-plexing by default")
set(REALM_MAX_BPNODE_SIZE "1000" CACHE STRING "Max B+ tree node size.")

# Find dependencies
19 changes: 19 additions & 0 deletions evergreen/config.yml
@@ -97,6 +97,10 @@ functions:
set_cmake_var realm_vars REALM_TEST_DURATION STRING "${long_running_test_duration}"
fi

if [ -n "${enable_sync_multiplexing|}" ]; then
set_cmake_var realm_vars REALM_ENABLE_SYNC_MULTIPLEXING BOOL On
fi

set_cmake_var realm_vars REALM_BUILD_COMMANDLINE_TOOLS BOOL On
set_cmake_var realm_vars REALM_ENABLE_ENCRYPTION BOOL On

@@ -824,6 +828,21 @@ buildvariants:
distros:
- ubuntu2004-large

- name: ubuntu2004-session-multiplexing
display_name: "Ubuntu 20.04 x86_64 (Sync Multiplexing Enabled)"
run_on: ubuntu2004-large
expansions:
clang_url: "https://s3.amazonaws.com/static.realm.io/evergreen-assets/clang%2Bllvm-11.0.0-x86_64-linux-gnu-ubuntu-20.04.tar.xz"
cmake_url: "https://s3.amazonaws.com/static.realm.io/evergreen-assets/cmake-3.20.3-linux-x86_64.tar.gz"
cmake_bindir: "./cmake_binaries/bin"
fetch_missing_dependencies: On
run_tests_against_baas: On
c_compiler: "./clang_binaries/bin/clang"
cxx_compiler: "./clang_binaries/bin/clang++"
enable_sync_multiplexing: On
Review comment (Contributor):

Isn't multiplexing enabled by default? Do we need the extra variant with the env_var and CMAKE setting? And if we keep it, should it also run the sync or object store tests instead of simply compiling?

Reply (Contributor Author):

No, this PR just fixes it; it doesn't turn it on by default. My plan was to release this and then do a separate, much smaller PR that enables it by default. At that point this variant will be switched to test that sync works with multiplexing turned off, so we don't regress any of those code paths the way multiplexing regressed.

tasks:
- name: compile_test

- name: ubuntu2004-encryption-tsan
display_name: "Ubuntu 20.04 x86_64 (Clang 11 Encryption Enabled w/TSAN)"
run_on: ubuntu2004-small
4 changes: 4 additions & 0 deletions src/realm/object-store/CMakeLists.txt
@@ -166,6 +166,10 @@ else()
target_sources(ObjectStore PRIVATE impl/generic/external_commit_helper.cpp)
endif()

if(REALM_ENABLE_SYNC AND REALM_ENABLE_SYNC_MULTIPLEXING)
target_compile_definitions(ObjectStore PUBLIC REALM_ENABLE_SYNC_MULTIPLEXING=1)
endif()

if(REALM_ENABLE_SYNC OR REALM_ENABLE_SERVER)
# needed to disable assertions in external/json
target_compile_definitions(ObjectStore PUBLIC
5 changes: 5 additions & 0 deletions src/realm/object-store/sync/impl/sync_client.hpp
@@ -99,6 +99,11 @@ struct SyncClient {
m_client.shutdown();
}

void voluntary_disconnect_all_connections()
{
m_client.voluntary_disconnect_all_connections();
}

std::unique_ptr<sync::Session> make_session(std::shared_ptr<DB> db,
std::shared_ptr<sync::SubscriptionStore> flx_sub_store,
std::shared_ptr<sync::MigrationStore> migration_store,
5 changes: 5 additions & 0 deletions src/realm/object-store/sync/sync_manager.cpp
@@ -797,3 +797,8 @@ void SyncManager::close_all_sessions()

get_sync_client().wait_for_session_terminations();
}

void SyncManager::OnlyForTesting::voluntary_disconnect_all_connections(SyncManager& mgr)
{
mgr.get_sync_client().voluntary_disconnect_all_connections();
}
10 changes: 10 additions & 0 deletions src/realm/object-store/sync/sync_manager.hpp
@@ -79,7 +79,11 @@ struct SyncClientConfig {
LoggerFactory logger_factory;
util::Logger::Level log_level = util::Logger::Level::info;
ReconnectMode reconnect_mode = ReconnectMode::normal; // For internal sync-client testing only!
#if REALM_ENABLE_SYNC_MULTIPLEXING
bool multiplex_sessions = true;
#else
bool multiplex_sessions = false;
#endif

// The SyncSocket instance used by the Sync Client for event synchronization
// and creating WebSockets. If not provided the default implementation will be used.
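
The hunk above makes the default for `multiplex_sessions` depend on a compile definition that the CMake option sets. As a minimal sketch of that pattern, assuming a hypothetical `ExampleClientConfig` stand-in and a hypothetical `with_multiplexing` helper (neither is part of realm-core), a build-time default can still be overridden per instance at run time:

```cpp
#include <cassert>

// Assumption for this sketch: treat an undefined macro as "off", mirroring how
// `#if MACRO` evaluates an undefined identifier to 0.
#ifndef REALM_ENABLE_SYNC_MULTIPLEXING
#define REALM_ENABLE_SYNC_MULTIPLEXING 0
#endif

// Hypothetical stand-in for SyncClientConfig with only the relevant field.
struct ExampleClientConfig {
    bool multiplex_sessions = (REALM_ENABLE_SYNC_MULTIPLEXING != 0);
};

// A caller can still override the build-time default per instance.
inline ExampleClientConfig with_multiplexing(bool enabled)
{
    ExampleClientConfig config;
    config.multiplex_sessions = enabled;
    return config;
}
```

With this shape, a CI variant that defines the macro exercises the multiplexed code paths by default, while individual tests remain free to force either setting.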
@@ -254,6 +258,12 @@ class SyncManager : public std::enable_shared_from_this<SyncManager> {
SyncManager(const SyncManager&) = delete;
SyncManager& operator=(const SyncManager&) = delete;

struct OnlyForTesting {
friend class TestHelper;

static void voluntary_disconnect_all_connections(SyncManager&);
};

protected:
friend class SyncUser;
friend class SyncSesson;
8 changes: 8 additions & 0 deletions src/realm/object-store/sync/sync_session.cpp
@@ -775,6 +775,13 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
}
}

// If the websocket was closed cleanly or if the socket disappeared, don't notify the user as an error
// since the sync client will retry.
if (error_code == sync::websocket::WebSocketError::websocket_read_error ||
error_code == sync::websocket::WebSocketError::websocket_write_error) {
return;
}
Review comment on lines +780 to +783 (Contributor):

We don't clear out the WebSocket object in the Connection when the normal websocket close code is received. This is done for voluntary and involuntary disconnect initiated by the client. Should it also be done in this case?

Reply (Contributor Author):

Doesn't that happen at a lower level? All this does is make it so we don't notify the user of an error if the error is just the websocket getting closed. These get hit when the websocket is closed unexpectedly without a close code.


// Surface a simplified websocket error to the user.
auto simplified_error = sync::websocket::get_simplified_websocket_error(
static_cast<sync::websocket::WebSocketError>(error_code.value()));
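
The early return discussed above filters transport-level read and write errors out of user-facing error reporting, since the sync client retries those on its own. A minimal sketch of that filtering decision follows. The enum `ExampleWebSocketError`, the value `example_fatal_error`, and the function `should_notify_user` are hypothetical; only the two read/write error names are taken from the diff.

```cpp
#include <cassert>

// Hypothetical error codes; only the read/write error names come from the diff.
enum class ExampleWebSocketError {
    websocket_read_error,
    websocket_write_error,
    example_fatal_error, // hypothetical error that should still reach the user
};

// Transport-level failures are retried by the client, so they are not reported.
inline bool should_notify_user(ExampleWebSocketError error)
{
    switch (error) {
        case ExampleWebSocketError::websocket_read_error:
        case ExampleWebSocketError::websocket_write_error:
            return false;
        default:
            return true;
    }
}
```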
@@ -896,6 +903,7 @@ void SyncSession::create_sync_session()

sync::Session::Config session_config;
session_config.signed_user_token = sync_config.user->access_token();
session_config.user_id = sync_config.user->identity();
session_config.realm_identifier = sync_config.partition_value;
session_config.verify_servers_ssl_certificate = sync_config.client_validate_ssl;
session_config.ssl_trust_certificate_path = sync_config.ssl_trust_certificate_path;
4 changes: 4 additions & 0 deletions src/realm/sync/CMakeLists.txt
@@ -91,6 +91,10 @@ set_target_properties(Sync PROPERTIES
OUTPUT_NAME "realm-sync"
)

if(REALM_ENABLE_SYNC_MULTIPLEXING)
target_compile_definitions(Sync PUBLIC REALM_ENABLE_SYNC_MULTIPLEXING=1)
endif()

target_link_libraries(Sync PUBLIC Storage)

if(APPLE AND NOT REALM_FORCE_OPENSSL)
130 changes: 103 additions & 27 deletions src/realm/sync/client.cpp
@@ -220,6 +220,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac

void handle_pending_client_reset_acknowledgement();

std::string get_appservices_connection_id();

private:
ClientImpl& m_client;
DBRef m_db;
@@ -228,6 +230,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
const ProtocolEnvelope m_protocol_envelope;
const std::string m_server_address;
const port_type m_server_port;
const std::string m_user_id;
const SyncServerMode m_sync_mode;
const std::string m_authorization_header_name;
const std::map<std::string, std::string> m_custom_http_headers;
const bool m_verify_servers_ssl_certificate;
@@ -327,8 +331,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;

-void do_initiate(ProtocolEnvelope, std::string server_address, port_type server_port);

void on_sync_progress();
void on_upload_completion();
void on_download_completion();
@@ -446,6 +448,52 @@ void ClientImpl::cancel_reconnect_delay()
}


void ClientImpl::voluntary_disconnect_all_connections()
{
auto done_pf = util::make_promise_future<void>();
post([this, promise = std::move(done_pf.promise)](Status status) mutable {
if (status == ErrorCodes::OperationAborted) {
return;
}

REALM_ASSERT(status.is_ok());

try {
for (auto& p : m_server_slots) {
ServerSlot& slot = p.second;
if (m_one_connection_per_session) {
REALM_ASSERT(!slot.connection);
for (const auto& p : slot.alt_connections) {
ClientImpl::Connection& conn = *p.second;
if (conn.get_state() == ConnectionState::disconnected) {
continue;
}
conn.voluntary_disconnect();
}
}
else {
REALM_ASSERT(slot.alt_connections.empty());
if (!slot.connection) {
continue;
}
ClientImpl::Connection& conn = *slot.connection;
if (conn.get_state() == ConnectionState::disconnected) {
continue;
}
conn.voluntary_disconnect();
}
}
}
catch (...) {
promise.set_error(exception_to_status());
Review comment (Collaborator):

Do we need a return here?

Reply (Contributor Author):

Yes, we do!

return;
}
promise.emplace_value();
});
done_pf.future.get();
}
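
The review exchange above is about returning after `set_error` so the success path does not also complete the same promise. The sketch below illustrates that control flow with a hypothetical `OneShot` class standing in for the promise. It is an assumption of this sketch, not a statement about `util::Promise`, that completing it twice is an error; `run_and_complete` is likewise a hypothetical helper.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <utility>

// Hypothetical single-shot completion slot standing in for a promise; it is
// an assumption of this sketch that completing it twice is an error.
class OneShot {
public:
    void set_value()
    {
        complete();
    }

    void set_error(std::exception_ptr error)
    {
        complete();
        m_error = std::move(error);
    }

    bool failed() const
    {
        return static_cast<bool>(m_error);
    }

private:
    void complete()
    {
        if (m_done)
            throw std::logic_error("promise completed twice");
        m_done = true;
    }

    bool m_done = false;
    std::exception_ptr m_error;
};

// Runs `work` and completes `promise` exactly once.
inline void run_and_complete(OneShot& promise, const std::function<void()>& work)
{
    try {
        work();
    }
    catch (...) {
        promise.set_error(std::current_exception());
        return; // without this, set_value() below would complete the promise twice
    }
    promise.set_value();
}
```

Dropping the `return` in the `catch` block would make the failing case fall through to `set_value()`, which in this sketch throws `std::logic_error`.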


bool ClientImpl::wait_for_session_terminations_or_client_stopped()
{
// Thread safety required
@@ -617,12 +665,13 @@ void ClientImpl::actualize_and_finalize_session_wrappers()
}


-ClientImpl::Connection&
-ClientImpl::get_connection(ServerEndpoint endpoint, const std::string& authorization_header_name,
-const std::map<std::string, std::string>& custom_http_headers,
-bool verify_servers_ssl_certificate, Optional<std::string> ssl_trust_certificate_path,
-std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
-Optional<ProxyConfig> proxy_config, SyncServerMode sync_mode, bool& was_created)
+ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
+const std::string& authorization_header_name,
+const std::map<std::string, std::string>& custom_http_headers,
+bool verify_servers_ssl_certificate,
+Optional<std::string> ssl_trust_certificate_path,
+std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
+Optional<ProxyConfig> proxy_config, bool& was_created)
{
ServerSlot& server_slot = m_server_slots[endpoint]; // Throws

@@ -639,8 +688,7 @@ ClientImpl::get_connection(ServerEndpoint endpoint, const std::string& authoriza
std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
*this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
-std::move(proxy_config), server_slot.reconnect_info,
-sync_mode); // Throws
+std::move(proxy_config), server_slot.reconnect_info); // Throws
ClientImpl::Connection& conn = *conn_2;
if (!m_one_connection_per_session) {
server_slot.connection = std::move(conn_2);
@@ -982,6 +1030,10 @@ SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data
REALM_ASSERT(!err_processing_err);
return SyncClientHookAction::EarlyReturn;
}
case realm::SyncClientHookAction::TriggerReconnect: {
get_connection().voluntary_disconnect();
return SyncClientHookAction::EarlyReturn;
}
default:
return action;
}
@@ -1082,6 +1134,8 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
, m_protocol_envelope{config.protocol_envelope}
, m_server_address{std::move(config.server_address)}
, m_server_port{config.server_port}
, m_user_id(std::move(config.user_id))
, m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
, m_authorization_header_name{config.authorization_header_name}
, m_custom_http_headers{config.custom_http_headers}
, m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
@@ -1254,7 +1308,10 @@ SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<Connec

inline void SessionWrapper::initiate()
{
-do_initiate(m_protocol_envelope, m_server_address, m_server_port); // Throws
+REALM_ASSERT(!m_initiated);
+ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode};
+m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
+m_initiated = true;
}
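
With this change the server endpoint carries the user id and sync mode in addition to envelope, address, and port, so connection lookup distinguishes users. The sketch below shows the general idea under stated assumptions: `ExampleEndpoint`, `ExampleServerMode`, and `ExampleConnectionPool` are hypothetical, the protocol envelope is omitted for brevity, and connections are represented by plain integer ids rather than real connection objects.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <tuple>

enum class ExampleServerMode { PBS, FLX };

// Hypothetical endpoint key; field names are illustrative.
struct ExampleEndpoint {
    std::string address;
    int port;
    std::string user_id;
    ExampleServerMode mode;

    // Order by every field so two users on the same host never compare equal.
    bool operator<(const ExampleEndpoint& other) const
    {
        return std::tie(address, port, user_id, mode) <
               std::tie(other.address, other.port, other.user_id, other.mode);
    }
};

class ExampleConnectionPool {
public:
    // Returns the id of the connection for this endpoint, creating one if needed.
    int get_connection(const ExampleEndpoint& endpoint)
    {
        auto result = m_connections.try_emplace(endpoint, m_next_id);
        if (result.second)
            ++m_next_id;
        return result.first->second;
    }

    std::size_t size() const
    {
        return m_connections.size();
    }

private:
    std::map<ExampleEndpoint, int> m_connections;
    int m_next_id = 1;
};
```

Because the user id and mode are part of the key, sessions for the same user and mode share one connection, while a different user or a different mode on the same host gets its own.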


@@ -1471,13 +1528,12 @@ void SessionWrapper::actualize(ServerEndpoint endpoint)
REALM_ASSERT(!m_actualized);
REALM_ASSERT(!m_sess);
m_db->claim_sync_agent();

-SyncServerMode sync_mode = m_flx_subscription_store ? SyncServerMode::FLX : SyncServerMode::PBS;
+auto sync_mode = endpoint.server_mode;

bool was_created = false;
ClientImpl::Connection& conn = m_client.get_connection(
std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
-m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config, sync_mode,
+m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
was_created); // Throws
try {
// FIXME: This only makes sense when each session uses a separate connection.
@@ -1589,13 +1645,6 @@ inline void SessionWrapper::report_sync_transact(VersionID old_version, VersionI
m_sync_transact_handler(old_version, new_version); // Throws
}

-void SessionWrapper::do_initiate(ProtocolEnvelope protocol, std::string server_address, port_type server_port)
-{
-REALM_ASSERT(!m_initiated);
-ServerEndpoint server_endpoint{protocol, std::move(server_address), server_port};
-m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
-m_initiated = true;
-}

inline void SessionWrapper::on_sync_progress()
{
@@ -1776,6 +1825,29 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement()
});
}

std::string SessionWrapper::get_appservices_connection_id()
{
auto pf = util::make_promise_future<std::string>();
REALM_ASSERT(m_initiated);

util::bind_ptr<SessionWrapper> self(this);
get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
if (!status.is_ok()) {
promise.set_error(status);
return;
}

if (!self->m_sess) {
promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
return;
}

promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
});

return pf.future.get();
}
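
`get_appservices_connection_id` posts work to the client's event loop and captures a reference-counted pointer to itself so the wrapper outlives the posted task. The sketch below shows that keep-alive idea with standard-library types only. `ExampleEventLoop` and `ExampleSession` are hypothetical, the loop is single-threaded, and the result is written through a shared pointer instead of a promise/future pair, so this illustrates the lifetime pattern rather than the actual threading model.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>

// Hypothetical single-threaded event loop: tasks run only when run_all() is called.
class ExampleEventLoop {
public:
    void post(std::function<void()> task)
    {
        m_tasks.push(std::move(task));
    }

    void run_all()
    {
        while (!m_tasks.empty()) {
            auto task = std::move(m_tasks.front());
            m_tasks.pop();
            task();
        }
    }

private:
    std::queue<std::function<void()>> m_tasks;
};

class ExampleSession : public std::enable_shared_from_this<ExampleSession> {
public:
    explicit ExampleSession(std::string connection_id)
        : m_connection_id(std::move(connection_id))
    {
    }

    // Posts a task that copies the id into *out. Capturing `self` keeps this
    // object alive until the task has run, even if the caller drops its reference.
    void request_connection_id(ExampleEventLoop& loop, std::shared_ptr<std::string> out)
    {
        loop.post([self = shared_from_this(), out = std::move(out)] {
            *out = self->m_connection_id;
        });
    }

private:
    std::string m_connection_id;
};
```

The caller may release its own reference right after posting; the session is destroyed only once the queued task has run and released the captured `self`.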

// ################ ClientImpl::Connection ################

ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
@@ -1784,20 +1856,15 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide
bool verify_servers_ssl_certificate,
Optional<std::string> ssl_trust_certificate_path,
std::function<SSLVerifyCallback> ssl_verify_callback,
-Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info,
-SyncServerMode sync_mode)
+Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
: logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(ident), client.logger_ptr)} // Throws
, logger{*logger_ptr}
, m_client{client}
-, m_protocol_envelope{std::get<0>(endpoint)}
-, m_address{std::get<1>(endpoint)}
-, m_port{std::get<2>(endpoint)}
, m_verify_servers_ssl_certificate{verify_servers_ssl_certificate} // DEPRECATED
, m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
, m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED
, m_proxy_config{std::move(proxy_config)} // DEPRECATED
, m_reconnect_info{reconnect_info}
-, m_sync_mode(sync_mode)
, m_ident{ident}
, m_server_endpoint{std::move(endpoint)}
, m_authorization_header_name{authorization_header_name} // DEPRECATED
@@ -1921,6 +1988,10 @@ void Client::cancel_reconnect_delay()
m_impl->cancel_reconnect_delay();
}

void Client::voluntary_disconnect_all_connections()
{
m_impl->voluntary_disconnect_all_connections();
}

bool Client::wait_for_session_terminations_or_client_stopped()
{
@@ -2028,6 +2099,11 @@ util::Future<std::string> Session::send_test_command(std::string body)
return m_impl->send_test_command(std::move(body));
}

std::string Session::get_appservices_connection_id()
{
return m_impl->get_appservices_connection_id();
}

const std::error_category& client_error_category() noexcept
{
return g_error_category;