-
Notifications
You must be signed in to change notification settings - Fork 178
Fix session multiplexing #6320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix session multiplexing #6320
Changes from all commits
e8b52d0
5cf5b22
f11a811
d711388
b144140
a179585
bafcb43
fcd4d26
9e54db2
cc0a75c
86a03d3
65f0bd8
e833eee
82f53e8
4e944b5
47bdad3
a2b9245
7f55d0a
f2436ee
37abe4c
86f2127
28d65ed
15fbc42
c9def4c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -775,6 +775,13 @@ void SyncSession::handle_error(sync::SessionErrorInfo error) | |
} | ||
} | ||
|
||
// If the websocket was closed cleanly or if the socket disappeared, don't notify the user as an error | ||
// since the sync client will retry. | ||
if (error_code == sync::websocket::WebSocketError::websocket_read_error || | ||
error_code == sync::websocket::WebSocketError::websocket_write_error) { | ||
return; | ||
} | ||
Comment on lines
+780
to
+783
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't clear out the WebSocket object in the Connection when the normal websocket close code is received. This is done for voluntary and involuntary disconnects initiated by the client. Should it also be done in this case? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Doesn't that happen at a lower level? All this does is make it so we don't notify the user of an error if the error is just the websocket getting closed. These get hit when the websocket is closed unexpectedly without a close code. |
||
|
||
// Surface a simplified websocket error to the user. | ||
auto simplified_error = sync::websocket::get_simplified_websocket_error( | ||
static_cast<sync::websocket::WebSocketError>(error_code.value())); | ||
|
@@ -896,6 +903,7 @@ void SyncSession::create_sync_session() | |
|
||
sync::Session::Config session_config; | ||
session_config.signed_user_token = sync_config.user->access_token(); | ||
session_config.user_id = sync_config.user->identity(); | ||
session_config.realm_identifier = sync_config.partition_value; | ||
session_config.verify_servers_ssl_certificate = sync_config.client_validate_ssl; | ||
session_config.ssl_trust_certificate_path = sync_config.ssl_trust_certificate_path; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -220,6 +220,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac | |
|
||
void handle_pending_client_reset_acknowledgement(); | ||
|
||
std::string get_appservices_connection_id(); | ||
|
||
private: | ||
ClientImpl& m_client; | ||
DBRef m_db; | ||
|
@@ -228,6 +230,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac | |
const ProtocolEnvelope m_protocol_envelope; | ||
const std::string m_server_address; | ||
const port_type m_server_port; | ||
const std::string m_user_id; | ||
const SyncServerMode m_sync_mode; | ||
const std::string m_authorization_header_name; | ||
const std::map<std::string, std::string> m_custom_http_headers; | ||
const bool m_verify_servers_ssl_certificate; | ||
|
@@ -327,8 +331,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac | |
std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0; | ||
std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0; | ||
|
||
void do_initiate(ProtocolEnvelope, std::string server_address, port_type server_port); | ||
|
||
void on_sync_progress(); | ||
void on_upload_completion(); | ||
void on_download_completion(); | ||
|
@@ -446,6 +448,52 @@ void ClientImpl::cancel_reconnect_delay() | |
} | ||
|
||
|
||
// Requests a voluntary disconnect on every connection that is not already in the | ||
// disconnected state, then blocks the caller until that work has finished. | ||
// The work runs inside a handler handed to post(); the caller waits on the future | ||
// paired with the promise that the handler owns. | ||
// NOTE(review): because the caller blocks in future.get(), this presumably must not be | ||
// invoked from whatever context executes post() handlers - confirm. | ||
void ClientImpl::voluntary_disconnect_all_connections() | ||
{ | ||
auto done_pf = util::make_promise_future<void>(); | ||
post([this, promise = std::move(done_pf.promise)](Status status) mutable { | ||
// Aborted handlers leave without fulfilling or failing the promise. | ||
// NOTE(review): confirm that destroying the unfulfilled promise releases future.get() below. | ||
if (status == ErrorCodes::OperationAborted) { | ||
return; | ||
} | ||
|
||
REALM_ASSERT(status.is_ok()); | ||
|
||
try { | ||
for (auto& p : m_server_slots) { | ||
ServerSlot& slot = p.second; | ||
// With one connection per session, the slot's connections are held in alt_connections | ||
// and slot.connection is asserted to be unset. | ||
if (m_one_connection_per_session) { | ||
REALM_ASSERT(!slot.connection); | ||
// The inner loop variable `p` shadows the outer `p` declared above. | ||
for (const auto& p : slot.alt_connections) { | ||
ClientImpl::Connection& conn = *p.second; | ||
if (conn.get_state() == ConnectionState::disconnected) { | ||
continue; | ||
} | ||
conn.voluntary_disconnect(); | ||
} | ||
} | ||
// Otherwise the slot holds at most one connection in slot.connection and | ||
// alt_connections is asserted to be empty. | ||
else { | ||
REALM_ASSERT(slot.alt_connections.empty()); | ||
if (!slot.connection) { | ||
continue; | ||
} | ||
ClientImpl::Connection& conn = *slot.connection; | ||
if (conn.get_state() == ConnectionState::disconnected) { | ||
continue; | ||
} | ||
conn.voluntary_disconnect(); | ||
} | ||
} | ||
} | ||
// Any exception is converted to a status and reported through the promise; the return | ||
// below keeps the handler from also calling emplace_value() on the same promise. | ||
catch (...) { | ||
promise.set_error(exception_to_status()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need a return here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, we do! |
||
return; | ||
} | ||
promise.emplace_value(); | ||
}); | ||
done_pf.future.get(); | ||
} | ||
|
||
|
||
bool ClientImpl::wait_for_session_terminations_or_client_stopped() | ||
{ | ||
// Thread safety required | ||
|
@@ -617,12 +665,13 @@ void ClientImpl::actualize_and_finalize_session_wrappers() | |
} | ||
|
||
|
||
ClientImpl::Connection& | ||
ClientImpl::get_connection(ServerEndpoint endpoint, const std::string& authorization_header_name, | ||
const std::map<std::string, std::string>& custom_http_headers, | ||
bool verify_servers_ssl_certificate, Optional<std::string> ssl_trust_certificate_path, | ||
std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback, | ||
Optional<ProxyConfig> proxy_config, SyncServerMode sync_mode, bool& was_created) | ||
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint, | ||
const std::string& authorization_header_name, | ||
const std::map<std::string, std::string>& custom_http_headers, | ||
bool verify_servers_ssl_certificate, | ||
Optional<std::string> ssl_trust_certificate_path, | ||
std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback, | ||
Optional<ProxyConfig> proxy_config, bool& was_created) | ||
{ | ||
ServerSlot& server_slot = m_server_slots[endpoint]; // Throws | ||
|
||
|
@@ -639,8 +688,7 @@ ClientImpl::get_connection(ServerEndpoint endpoint, const std::string& authoriza | |
std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>( | ||
*this, ident, std::move(endpoint), authorization_header_name, custom_http_headers, | ||
verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback), | ||
std::move(proxy_config), server_slot.reconnect_info, | ||
sync_mode); // Throws | ||
std::move(proxy_config), server_slot.reconnect_info); // Throws | ||
ClientImpl::Connection& conn = *conn_2; | ||
if (!m_one_connection_per_session) { | ||
server_slot.connection = std::move(conn_2); | ||
|
@@ -982,6 +1030,10 @@ SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data | |
REALM_ASSERT(!err_processing_err); | ||
return SyncClientHookAction::EarlyReturn; | ||
} | ||
case realm::SyncClientHookAction::TriggerReconnect: { | ||
get_connection().voluntary_disconnect(); | ||
return SyncClientHookAction::EarlyReturn; | ||
} | ||
default: | ||
return action; | ||
} | ||
|
@@ -1082,6 +1134,8 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub | |
, m_protocol_envelope{config.protocol_envelope} | ||
, m_server_address{std::move(config.server_address)} | ||
, m_server_port{config.server_port} | ||
, m_user_id(std::move(config.user_id)) | ||
, m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS) | ||
, m_authorization_header_name{config.authorization_header_name} | ||
, m_custom_http_headers{config.custom_http_headers} | ||
, m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate} | ||
|
@@ -1254,7 +1308,10 @@ SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<Connec | |
|
||
// Registers this wrapper with the client as an unactualized session wrapper and marks | ||
// it as initiated; asserts that it has not been initiated before. | ||
// The endpoint passed to the client is built from the protocol envelope, server address, | ||
// server port, user id and sync mode members. | ||
// NOTE(review): this is a diff rendering without +/- markers. The hunk header | ||
// (-1254,7 +1308,10) indicates the do_initiate() row is the pre-change line and the four | ||
// rows after it are its replacement - only one of the two variants exists in a real file. | ||
inline void SessionWrapper::initiate() | ||
{ | ||
do_initiate(m_protocol_envelope, m_server_address, m_server_port); // Throws | ||
REALM_ASSERT(!m_initiated); | ||
ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode}; | ||
m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws | ||
m_initiated = true; | ||
} | ||
|
||
|
||
|
@@ -1471,13 +1528,12 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) | |
REALM_ASSERT(!m_actualized); | ||
REALM_ASSERT(!m_sess); | ||
m_db->claim_sync_agent(); | ||
|
||
SyncServerMode sync_mode = m_flx_subscription_store ? SyncServerMode::FLX : SyncServerMode::PBS; | ||
auto sync_mode = endpoint.server_mode; | ||
|
||
bool was_created = false; | ||
ClientImpl::Connection& conn = m_client.get_connection( | ||
std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate, | ||
m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config, sync_mode, | ||
m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config, | ||
was_created); // Throws | ||
try { | ||
// FIXME: This only makes sense when each session uses a separate connection. | ||
|
@@ -1589,13 +1645,6 @@ inline void SessionWrapper::report_sync_transact(VersionID old_version, VersionI | |
m_sync_transact_handler(old_version, new_version); // Throws | ||
} | ||
|
||
// Pre-change helper: builds a ServerEndpoint from the given protocol, address and port, | ||
// registers this wrapper with the client as unactualized, and sets m_initiated. | ||
// Asserts that the wrapper has not been initiated before. | ||
// NOTE(review): the hunk header (-1589,13 +1645,6) drops seven rows here, which matches | ||
// this entire definition being removed by the change. | ||
void SessionWrapper::do_initiate(ProtocolEnvelope protocol, std::string server_address, port_type server_port) | ||
{ | ||
REALM_ASSERT(!m_initiated); | ||
ServerEndpoint server_endpoint{protocol, std::move(server_address), server_port}; | ||
m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws | ||
m_initiated = true; | ||
} | ||
|
||
inline void SessionWrapper::on_sync_progress() | ||
{ | ||
|
@@ -1776,6 +1825,29 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement() | |
}); | ||
} | ||
|
||
// Returns the value of get_active_appservices_connection_id() for the connection used by | ||
// this wrapper's session. The lookup runs inside a handler posted to the client, and the | ||
// caller blocks on the paired future until the handler has delivered a value or an error. | ||
// Asserts that the wrapper has been initiated. | ||
// NOTE(review): future.get() presumably surfaces a promise error to the caller (e.g. by | ||
// throwing) - confirm against the future implementation. | ||
std::string SessionWrapper::get_appservices_connection_id() | ||
{ | ||
auto pf = util::make_promise_future<std::string>(); | ||
REALM_ASSERT(m_initiated); | ||
|
||
// The handler captures a bind_ptr to this wrapper rather than a raw `this`. | ||
util::bind_ptr<SessionWrapper> self(this); | ||
get_client().post([self, promise = std::move(pf.promise)](Status status) mutable { | ||
// A non-OK status is forwarded to the caller unchanged. | ||
if (!status.is_ok()) { | ||
promise.set_error(status); | ||
return; | ||
} | ||
|
||
// Without a session object there is no connection to query. | ||
if (!self->m_sess) { | ||
promise.set_error({ErrorCodes::RuntimeError, "session already finalized"}); | ||
return; | ||
} | ||
|
||
promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id()); | ||
}); | ||
|
||
return pf.future.get(); | ||
} | ||
|
||
// ################ ClientImpl::Connection ################ | ||
|
||
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint, | ||
|
@@ -1784,20 +1856,15 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide | |
bool verify_servers_ssl_certificate, | ||
Optional<std::string> ssl_trust_certificate_path, | ||
std::function<SSLVerifyCallback> ssl_verify_callback, | ||
Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info, | ||
SyncServerMode sync_mode) | ||
Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info) | ||
: logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(ident), client.logger_ptr)} // Throws | ||
, logger{*logger_ptr} | ||
, m_client{client} | ||
, m_protocol_envelope{std::get<0>(endpoint)} | ||
, m_address{std::get<1>(endpoint)} | ||
, m_port{std::get<2>(endpoint)} | ||
, m_verify_servers_ssl_certificate{verify_servers_ssl_certificate} // DEPRECATED | ||
, m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED | ||
, m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED | ||
, m_proxy_config{std::move(proxy_config)} // DEPRECATED | ||
, m_reconnect_info{reconnect_info} | ||
, m_sync_mode(sync_mode) | ||
, m_ident{ident} | ||
, m_server_endpoint{std::move(endpoint)} | ||
, m_authorization_header_name{authorization_header_name} // DEPRECATED | ||
|
@@ -1921,6 +1988,10 @@ void Client::cancel_reconnect_delay() | |
m_impl->cancel_reconnect_delay(); | ||
} | ||
|
||
// Forwards to the implementation object's voluntary_disconnect_all_connections(). | ||
void Client::voluntary_disconnect_all_connections() | ||
{ | ||
m_impl->voluntary_disconnect_all_connections(); | ||
} | ||
|
||
bool Client::wait_for_session_terminations_or_client_stopped() | ||
{ | ||
|
@@ -2028,6 +2099,11 @@ util::Future<std::string> Session::send_test_command(std::string body) | |
return m_impl->send_test_command(std::move(body)); | ||
} | ||
|
||
// Forwards to the implementation object's get_appservices_connection_id() and returns its result. | ||
std::string Session::get_appservices_connection_id() | ||
{ | ||
return m_impl->get_appservices_connection_id(); | ||
} | ||
|
||
const std::error_category& client_error_category() noexcept | ||
{ | ||
return g_error_category; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't multiplexing enabled by default? Do we need the extra variant with the env_var and CMAKE setting? And if we keep it, should it also run the sync or object store tests instead of simply compiling?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this PR just fixes it, but doesn't turn it on by default. My plan was to release this and do a separate, much smaller PR that enables it by default; this variant will then be switched to test that sync works with multiplexing turned off, so we don't regress any of those code paths the way multiplexing regressed.