Add migrated state information to flexible sync client BIND message by michael-wb · Pull Request #6464 · realm/realm-core · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add migrated state information to flexible sync client BIND message #6464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Fix compiler warnings with MacOS Clang 14.0.3 ([PR #6467](https://github.com/realm/realm-core/pull/6467))
* Perform a client reset to migrate a sync'd realm from PBS to FLX and vice versa ([#6393](https://github.com/realm/realm-core/issues/6393))
* The following unused util headers have been deleted: call_with_tuple.hpp, get_file_size.hpp, inspect.hpp, substitute.hpp, type_list.hpp, and utf8.hpp.
* Add migrated state information to the flexible sync client BIND message for backfilling the PBS partition value in objects once the client has migrated to FLX. ([PR #6464](https://github.com/realm/realm-core/pull/6464))

----------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ tasks:
export DEVELOPER_DIR="${xcode_developer_dir}"
fi

./evergreen/install_baas.sh -w ./baas-work-dir -b b530bafb31a2c59bcf9ae48bd0ac1ba625ae130b 2>&1 | tee install_baas_output.log
./evergreen/install_baas.sh -w ./baas-work-dir -b 5d5a4f470da7f84132ff97c8b56368e98ad17489 2>&1 | tee install_baas_output.log
fi

- command: shell.exec
Expand Down
2 changes: 2 additions & 0 deletions src/realm/error_codes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ ErrorCategory ErrorCodes::error_categories(Error code)
case UserDisabled:
case UserNotFound:
case UserpassTokenInvalid:
case InvalidServerResponse:
case ValueAlreadyExists:
case ValueDuplicateName:
case ValueNotFound:
Expand Down Expand Up @@ -293,6 +294,7 @@ static const MapElem string_to_error_code[] = {
{"InvalidQueryArg", ErrorCodes::InvalidQueryArg},
{"InvalidSchemaChange", ErrorCodes::InvalidSchemaChange},
{"InvalidSchemaVersion", ErrorCodes::InvalidSchemaVersion},
{"InvalidServerResponse", ErrorCodes::InvalidServerResponse},
{"InvalidSession", ErrorCodes::InvalidSession},
{"InvalidSortDescriptor", ErrorCodes::InvalidSortDescriptor},
{"InvalidTableRef", ErrorCodes::InvalidTableRef},
Expand Down
1 change: 1 addition & 0 deletions src/realm/error_codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ typedef enum realm_errno {
RLM_ERR_APP_UNKNOWN = 4351,
RLM_ERR_MAINTENANCE_IN_PROGRESS = 4352,
RLM_ERR_USERPASS_TOKEN_INVALID = 4353,
RLM_ERR_INVALID_SERVER_RESPONSE = 4354,

RLM_ERR_WEBSOCKET_RESOLVE_FAILED_ERROR = 4400,
RLM_ERR_WEBSOCKET_CONNECTION_CLOSED_CLIENT_ERROR = 4401,
Expand Down
2 changes: 2 additions & 0 deletions src/realm/error_codes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ class ErrorCodes {
AppUnknownError = RLM_ERR_APP_UNKNOWN,
MaintenanceInProgress = RLM_ERR_MAINTENANCE_IN_PROGRESS,
UserpassTokenInvalid = RLM_ERR_USERPASS_TOKEN_INVALID,
InvalidServerResponse = RLM_ERR_INVALID_SERVER_RESPONSE,

WebSocketResolveFailedError = RLM_ERR_WEBSOCKET_RESOLVE_FAILED_ERROR,
WebSocketConnectionClosedClientError = RLM_ERR_WEBSOCKET_CONNECTION_CLOSED_CLIENT_ERROR,
WebSocketConnectionClosedServerError = RLM_ERR_WEBSOCKET_CONNECTION_CLOSED_SERVER_ERROR,
Expand Down
25 changes: 14 additions & 11 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,12 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
return util::Future<sync::SubscriptionSet::State>::make_ready(state);
}

fresh_sync_session->m_migration_store->create_subscriptions(*fresh_sub_store,
m_migration_store->get_query_string());
// fresh_sync_session is using a new realm file that doesn't have the migration_store info
// so the query string from the local migration store will need to be provided
auto query_string = m_migration_store->get_query_string();
REALM_ASSERT(query_string);
fresh_sync_session->m_migration_store->create_subscriptions(*fresh_sub_store, *query_string);

auto latest_subs = fresh_sub_store->get_latest();
{
util::CheckedLockGuard lock(m_state_mutex);
Expand Down Expand Up @@ -664,21 +668,19 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
REALM_ASSERT(!m_original_sync_config->flx_sync_requested);
REALM_ASSERT(error.migration_query_string && !error.migration_query_string->empty());
// Original config was PBS, migrating to FLX
m_migration_store->migrate_to_flx(*error.migration_query_string);
m_migration_store->migrate_to_flx(*error.migration_query_string,
m_original_sync_config->partition_value);
save_sync_config_after_migration();
download_fresh_realm(error.server_requests_action);
return;
case sync::ProtocolErrorInfo::Action::RevertToPBS:
// If the client was updated to use FLX natively, but the server was rolled back to PBS,
// propagate the error up to the user
// the server should be sending switch_to_flx_sync; throw exception if this error is not
// received.
if (m_original_sync_config->flx_sync_requested) {
// Update error to the "switch to PBS" connect error
error = sync::SessionErrorInfo(make_error_code(sync::ProtocolError::switch_to_pbs),
"Server rolled back after flexible sync migration - cannot "
"connect with flexible sync config",
false);
next_state = NextStateAfterError::error;
break;
throw LogicError(ErrorCodes::InvalidServerResponse,
"Received 'RevertToPBS' from server after rollback while client is natively "
"using FLX - expected 'SwitchToPBS'");
}
// Original config was PBS, cancel the migration
m_migration_store->cancel_migration();
Expand Down Expand Up @@ -871,6 +873,7 @@ void SyncSession::create_sync_session()
session_config.proxy_config = sync_config.proxy_config;
session_config.simulate_integration_error = sync_config.simulate_integration_error;
session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;

if (sync_config.on_sync_client_event_hook) {
session_config.on_sync_client_event_hook = [hook = sync_config.on_sync_client_event_hook,
anchor = weak_from_this()](const SyncClientHookData& data) {
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ MigrationStore* SessionImpl::get_migration_store()
return m_wrapper.get_migration_store();
}

void SessionImpl::non_sync_flx_completion(int64_t version)
void SessionImpl::on_flx_sync_version_complete(int64_t version)
{
m_wrapper.on_flx_sync_version_complete(version);
}
Expand Down
31 changes: 26 additions & 5 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,6 @@ void Session::send_bind_message()
REALM_ASSERT(m_state == Active);

session_ident_type session_ident = m_ident;
const std::string& path = get_virt_path();
bool need_client_file_ident = !have_client_file_ident();
const bool is_subserver = false;

Expand All @@ -1961,9 +1960,31 @@ void Session::send_bind_message()
int protocol_version = m_conn.get_negotiated_protocol_version();
OutputBuffer& out = m_conn.get_output_buffer();
// Discard the token since it's ignored by the server.
std::string empty_access_token{};
protocol.make_bind_message(protocol_version, out, session_ident, path, empty_access_token, need_client_file_ident,
is_subserver); // Throws
std::string empty_access_token;
if (m_is_flx_sync_session) {
nlohmann::json bind_json_data;
if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
bind_json_data["migratedPartition"] = *migrated_partition;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So server_path is no longer needed for the FLX BIND message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't being used: get_virt_path() returns the partition value provided in SyncConfig, which is always an empty string for FLX sync.

}
if (logger.would_log(util::Logger::Level::debug)) {
std::string json_data_dump;
if (!bind_json_data.empty()) {
json_data_dump = bind_json_data.dump();
}
logger.debug(
"Sending: BIND(session_ident=%1, need_client_file_ident=%2 is_subserver=%3 json_data=\"%4\")",
session_ident, need_client_file_ident, is_subserver, json_data_dump);
}
protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
need_client_file_ident, is_subserver); // Throws
}
else {
std::string server_path = get_virt_path();
logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2 is_subserver=%3 server_path=%4)",
session_ident, need_client_file_ident, is_subserver, server_path);
protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated: is there a plan, at some point, to make PBS use JSON as well, so we don't need to treat it differently from FLX?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so. The JSON data was "shimmed" into the BIND message for FLX because the existing server_path section, which PBS uses to send the partition value, was not used by FLX. In addition, since we want to move away from PBS eventually, there likely won't be a need to add JSON values to the PBS BIND message.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great!

need_client_file_ident, is_subserver); // Throws
}
m_conn.initiate_write_message(out, this); // Throws

m_bind_message_sent = true;
Expand Down Expand Up @@ -2329,7 +2350,7 @@ std::error_code Session::receive_ident_message(SaltedFileIdent client_file_ident
// this point forward.
auto client_reset_operation = std::move(m_client_reset_operation);
util::UniqueFunction<void(int64_t)> on_flx_subscription_complete = [this](int64_t version) {
this->non_sync_flx_completion(version);
this->on_flx_sync_version_complete(version);
};
if (!client_reset_operation->finalize(client_file_ident, get_flx_subscription_store(),
std::move(on_flx_subscription_complete))) {
Expand Down
5 changes: 3 additions & 2 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
#include <realm/sync/network/default_socket.hpp>
#include <realm/util/span.hpp>
#include <realm/sync/noinst/client_history_impl.hpp>
#include <realm/sync/noinst/protocol_codec.hpp>
#include <realm/sync/noinst/client_reset_operation.hpp>
#include <realm/sync/noinst/migration_store.hpp>
#include <realm/sync/noinst/protocol_codec.hpp>< BEA9 /span>
#include <realm/sync/client_base.hpp>
#include <realm/sync/history.hpp>
#include <realm/sync/protocol.hpp>
Expand Down Expand Up @@ -750,7 +751,7 @@ class ClientImpl::Session {

/// Update internal client state when a flx subscription becomes complete outside
/// of the normal sync process. This can happen during client reset.
void non_sync_flx_completion(int64_t version);
void on_flx_sync_version_complete(int64_t version);

/// \brief Callback for when a new subscription set has been created for FLX sync.
void on_new_flx_subscription_set(int64_t new_version);
Expand Down
62 changes: 41 additions & 21 deletions src/realm/sync/noinst/migration_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ namespace realm::sync {
namespace {
constexpr static int c_schema_version = 1;
constexpr static std::string_view c_flx_migration_table("flx_migration");
constexpr static std::string_view c_flx_migration_started_at("flx_migration_started_at");
constexpr static std::string_view c_flx_migration_completed_at("flx_migration_completed_at");
constexpr static std::string_view c_flx_migration_state("flx_migration_state");
constexpr static std::string_view c_flx_migration_query_string("flx_migration_query_string");

constexpr static std::string_view c_flx_migration_started_at("started_at");
constexpr static std::string_view c_flx_migration_completed_at("completed_at");
constexpr static std::string_view c_flx_migration_state("state");
constexpr static std::string_view c_flx_migration_query_string("query_string");
constexpr static std::string_view c_flx_migration_original_partition("original_partition");
constexpr static std::string_view c_flx_subscription_name_prefix("flx_migrated_");

class MigrationStoreInit : public MigrationStore {
Expand All @@ -32,7 +32,6 @@ MigrationStoreRef MigrationStore::create(DBRef db)
MigrationStore::MigrationStore(DBRef db)
: m_db(std::move(db))
, m_state(MigrationState::NotMigrated)
, m_query_string{}
{
load_data(true); // read_only, default to NotMigrated if table is not initialized
}
Expand All @@ -51,6 +50,7 @@ bool MigrationStore::load_data(bool read_only)
{&m_migration_completed_at, c_flx_migration_completed_at, type_Timestamp},
< 4D1F span class='blob-code-inner blob-code-marker ' data-code-marker=" "> {&m_migration_state, c_flx_migration_state, type_Int},
{&m_migration_query_str, c_flx_migration_query_string, type_String},
{&m_migration_partition, c_flx_migration_original_partition, type_String},
}},
};

Expand Down Expand Up @@ -91,10 +91,12 @@ bool MigrationStore::load_data(bool read_only)
auto migration_store_obj = migration_table->get_object(0);
m_state = static_cast<MigrationState>(migration_store_obj.get<int64_t>(m_migration_state));
m_query_string = migration_store_obj.get<String>(m_migration_query_str);
m_migrated_partition = migration_store_obj.get<String>(m_migration_partition);
}
else {
m_state = MigrationState::NotMigrated;
m_query_string = {};
m_query_string.reset();
m_migrated_partition.reset();
}
return true;
}
Expand Down Expand Up @@ -132,9 +134,17 @@ void MigrationStore::complete_migration()
tr->co F438 mmit();
}

std::string MigrationStore::get_query_string()
std::optional<std::string> MigrationStore::get_migrated_partition()
{
std::lock_guard lock{m_mutex};
// This will be valid if migration in progress or complete
return m_migrated_partition;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just realizing that this function and get_query_string() return a string_view, which is an unowned view of a std::string protected by a lock. Nothing guarantees the lock is still held after the function returns, so a caller of get_migrated_partition() can end up with a string_view pointing to freed or otherwise invalid memory.

I think both this and get_query_string() need to return a `std::optional<std::string>`.

}

std::optional<std::string> MigrationStore::get_query_string()
{
std::lock_guard lock{m_mutex};
// This will be valid if migration in progress or complete
return m_query_string;
}

Expand All @@ -143,14 +153,18 @@ std::shared_ptr<realm::SyncConfig> MigrationStore::convert_sync_config(std::shar
REALM_ASSERT(config);
// If load data failed in the constructor, m_state defaults to NotMigrated

{
std::unique_lock lock{m_mutex};
if (config->flx_sync_requested) {
return config;
}
if (m_state == MigrationState::NotMigrated) {
return config;
}
std::unique_lock lock{m_mutex};
if (config->flx_sync_requested || m_state == MigrationState::NotMigrated) {
return config;
}

// Once in the migrated state, the partition value cannot change for the same realm file
if (m_state == MigrationState::Migrated && m_migrated_partition &&
m_migrated_partition != config->partition_value) {
throw LogicError(
ErrorCodes::IllegalOperation,
util::format("Partition value cannot be changed for migrated realms\n - original: %1\n - config: %2",
m_migrated_partition, config->partition_value));
}

auto flx_config = std::make_shared<realm::SyncConfig>(*config); // deep copy
Expand All @@ -160,7 +174,7 @@ std::shared_ptr<realm::SyncConfig> MigrationStore::convert_sync_config(std::shar
return flx_config;
}

void MigrationStore::migrate_to_flx(std::string_view rql_query_string)
void MigrationStore::migrate_to_flx(std::string_view rql_query_string, std::string_view partition_value)
{
REALM_ASSERT(!rql_query_string.empty());

Expand All @@ -171,26 +185,30 @@ void MigrationStore::migrate_to_flx(std::string_view rql_query_string)
std::unique_lock lock{m_mutex};
// Can call migrate_to_flx multiple times if migration has not completed.
REALM_ASSERT(m_state != MigrationState::Migrated);
m_query_string = rql_query_string;
m_state = MigrationState::InProgress;
m_query_string.emplace(rql_query_string);
m_migrated_partition.emplace(partition_value);

auto tr = m_db->start_read();
auto migration_table = tr->get_table(m_migration_table);
// A migration object may exist if the migration was started in a previous session.
if (migration_table->is_empty()) {
tr->promote_to_write();
auto migration_store_obj = migration_table->create_object();
migration_store_obj.set(m_migration_query_str, m_query_string);
migration_store_obj.set(m_migration_query_str, *m_query_string);
migration_store_obj.set(m_migration_state, int64_t(m_state));
migration_store_obj.set(m_migration_partition, *m_migrated_partition);
migration_store_obj.set(m_migration_started_at, Timestamp{std::chrono::system_clock::now()});
tr->commit();
}
else {
auto migration_store_obj = migration_table->get_object(0);
auto state = static_cast<MigrationState>(migration_store_obj.get<int64_t>(m_migration_state));
auto query_string = migration_store_obj.get<String>(m_migration_query_str);
auto migrated_partition = migration_store_obj.get<String>(m_migration_partition);
REALM_ASSERT(m_state == state);
REALM_ASSERT(m_query_string == query_string);
REALM_ASSERT(m_migrated_partition == migrated_partition);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}
}
Expand Down Expand Up @@ -218,7 +236,8 @@ void MigrationStore::clear(std::unique_lock<std::mutex>)
}

m_state = MigrationState::NotMigrated;
m_query_string = {};
m_query_string.reset();
m_migrated_partition.reset();
tr->promote_to_write();
migration_table->clear();
tr->commit();
Expand All @@ -239,7 +258,8 @@ void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store)
return;
}

create_subscriptions(subs_store, m_query_string);
REALM_ASSERT(m_query_string);
create_subscriptions(subs_store, *m_query_string);
}

void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store, const std::string& rql_query_string)
Expand Down
12 changes: 9 additions & 3 deletions src/realm/sync/noinst/migration_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,21 @@ class MigrationStore : public std::enable_shared_from_this<MigrationStore> {
std::shared_ptr<realm::SyncConfig> convert_sync_config(std::shared_ptr<realm::SyncConfig> config);

// Called when the server responds with migrate to FLX and stores the FLX subscription RQL query string.
void migrate_to_flx(std::string_view rql_query_string);
void migrate_to_flx(std::string_view rql_query_string, std::string_view partition_value);

// Clear the migrated state
void cancel_migration();

// Is a client migration to FLX in progress?
bool is_migration_in_progress();
// Has the client migration to FLX completed?
bool is_migrated();

// Mark the migration complete and update the state. No-op if not in 'InProgress' state.
void complete_migration();

std::string get_query_string();
std::optional<std::string> get_migrated_partition();
std::optional<std::string> get_query_string();

// Create subscriptions for each table that does not have a subscription.
// If subscriptions are created, they are commited and a change of query is sent to the server.
Expand Down Expand Up @@ -93,12 +96,15 @@ class MigrationStore : public std::enable_shared_from_this<MigrationStore> {
ColKey m_migration_completed_at;
ColKey m_migration_state;
ColKey m_migration_query_str;
ColKey m_migration_partition;

std::mutex m_mutex;
// Current migration state
MigrationState m_state;
// RQL query string received from the server
std::string m_query_string;
std::optional<std::string> m_query_string;
// The original PBS partition string before the migration
std::optional<std::string> m_migrated_partition;
};

} // namespace realm::sync
Loading
0