Integrate PBS->FLX client migration into protocol by michael-wb · Pull Request #6355 · realm/realm-core · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Integrate PBS->FLX client migration into protocol #6355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f20cb46
First cut of flx migration protocol handling
Mar 2, 2023
73b47d8
Updated SyncSession realm config initialization
Mar 2, 2023
d446180
Updated changelog
Mar 2, 2023
47635b2
Updates from review
Mar 3, 2023
39f8e4d
Updated use of migration_store, since it can be null
Mar 3, 2023
4abe097
Reverted update_configuration
Mar 3, 2023
c4e3f5a
Can't easily use static func for clear()
Mar 3, 2023
fe059a6
Merge branch 'master' of github.com:realm/realm-core into mwb/migrati…
Mar 3, 2023
6ba259a
Updated protocol handling to prevent releasing session waiters
Mar 15, 2023
3a62c93
Merge branch 'master' of github.com:realm/realm-core into mwb/migrati…
Mar 15, 2023
ac00b73
updated changelog after release
Mar 15, 2023
072f429
Updates to fix tsan errors
Mar 16, 2023
3f56270
Merge branch 'master' of github.com:realm/realm-core into mwb/migrati…
Mar 16, 2023
2af6539
Updates from review
Mar 16, 2023
52e7c8b
Merge branch 'master' of github.com:realm/realm-core into mwb/migrati…
Mar 16, 2023
cb1bfce
additional updates from review
Mar 16, 2023
380a82a
Merge branch 'master' of github.com:realm/realm-core into mwb/migrati…
Mar 16, 2023
f14d561
Updates from review - added read-only support for metadata schema ver…
Mar 16, 2023
7f9d1b3
Updated SyncSession handle_error to take a SessionErrorInfo
Mar 17, 2023
6348ec4
Merge branch 'master' of github.com:realm/realm-core into mwb/migrati…
Mar 17, 2023
a0dd509
updates from review - added test for SyncMetadataSchemaVersions
Mar 18, 2023
b4f888a
Added a few more SyncMetadataSchemaVersions tests
Mar 18, 2023
f8e64c6
converted some util::Optionals to std::optional
Mar 18, 2023
538284e
Updated some comments
Mar 19, 2023
6533ab2
Removed is_initialized and util::Optional to std::optional
Mar 19, 2023
48a71d3
Updated error if using FLX after server rolled back
Mar 20, 2023
84757c4
Merge branch 'master' of github.com:realm/realm-core into mwb/migrati…
Mar 20, 2023
ee32db7
More updates from review
Mar 21, 2023
cee89f8
Removed transaction locking for updating history validator
Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

### Internals
* Add admin api and test for performing the PBS->FLX migration and roll back on the server. (PR [#6366](https://github.com/realm/realm-core/pull/6366))
* Integrate protocol support for PBS->FLX client migration ([PR #6355](https://github.com/realm/realm-core/pull/6355))

----------------------------------------------

Expand Down
2 changes: 2 additions & 0 deletions src/realm.h
Original file line number Diff line number Diff line change
Expand Up @@ -3369,6 +3369,8 @@ typedef enum realm_sync_error_action {
RLM_SYNC_ERROR_ACTION_DELETE_REALM,
RLM_SYNC_ERROR_ACTION_CLIENT_RESET,
RLM_SYNC_ERROR_ACTION_CLIENT_RESET_NO_RECOVERY,
RLM_SYNC_ERROR_ACTION_MIGRATE_TO_FLX,
RLM_SYNC_ERROR_ACTION_REVERT_TO_PBS,
} realm_sync_error_action_e;

typedef struct realm_sync_session realm_sync_session_t;
Expand Down
2 changes: 2 additions & 0 deletions src/realm/error_codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ typedef enum realm_sync_errno_session {
RLM_SYNC_ERR_SESSION_INITIAL_SYNC_NOT_COMPLETED = 229,
RLM_SYNC_ERR_SESSION_WRITE_NOT_ALLOWED = 230,
RLM_SYNC_ERR_SESSION_COMPENSATING_WRITE = 231,
RLM_SYNC_ERR_SESSION_MIGRATE_TO_FLX = 232,
RLM_SYNC_ERR_SESSION_BAD_PROGRESS = 233,
RLM_SYNC_ERR_SESSION_REVERT_TO_PBS = 234,
} realm_sync_errno_session_e;

typedef enum realm_web_socket_errno {
Expand Down
6 changes: 5 additions & 1 deletion src/realm/object-store/c_api/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::ClientReset)
RLM_SYNC_ERROR_ACTION_CLIENT_RESET);
static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::ClientResetNoRecovery) ==
RLM_SYNC_ERROR_ACTION_CLIENT_RESET_NO_RECOVERY);
static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::MigrateToFLX) ==
RLM_SYNC_ERROR_ACTION_MIGRATE_TO_FLX);
static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::RevertToPBS) ==
RLM_SYNC_ERROR_ACTION_REVERT_TO_PBS);

static_assert(realm_flx_sync_subscription_set_state_e(SubscriptionSet::State::Pending) ==
RLM_SYNC_SUBSCRIPTION_PENDING);
Expand Down Expand Up @@ -878,7 +882,7 @@ RLM_API void realm_sync_session_handle_error_for_testing(const realm_sync_sessio
error_message};
std::error_code err;
sync_error_to_error_code(sync_error, &err);
SyncSession::OnlyForTesting::handle_error(*session->get(), {err, error_message, is_fatal});
SyncSession::OnlyForTesting::handle_error(*session->get(), sync::SessionErrorInfo{err, error_message, !is_fatal});
}

} // namespace realm::c_api
273 changes: 185 additions & 88 deletions src/realm/object-store/sync/sync_session.cpp

Large diffs are not rendered by default.

29 changes: 22 additions & 7 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class SyncUser;

namespace sync {
class Session;
struct SessionErrorInfo;
class MigrationStore;
}

namespace _impl {
Expand Down Expand Up @@ -227,6 +229,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
// Update the sync configuration used for this session. The new configuration must have the
// same user and reference realm url as the old configuration. The session will immediately
// disconnect (if it was active), and then attempt to connect using the new configuration.
// This is primarily intended to be used for TESTING only, even though it is used by the
// Swift SDK in `setCustomRequestHeaders` and is defined in the realm-js bindgen definitions.
void update_configuration(SyncConfig new_config)
REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);

Expand Down Expand Up @@ -254,7 +258,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
return m_server_url;
}

const std::shared_ptr<sync::SubscriptionStore>& get_flx_subscription_store();
std::shared_ptr<sync::SubscriptionStore> get_flx_subscription_store() REQUIRES(!m_state_mutex);

// Create an external reference to this session. The sync session attempts to remain active
// as long as an external reference to the session exists.
Expand Down Expand Up @@ -286,10 +290,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

// Expose some internal functionality to testing code.
struct OnlyForTesting {
static void handle_error(SyncSession& session, SyncError error)
{
session.handle_error(std::move(error));
}
static void handle_error(SyncSession& session, sync::SessionErrorInfo&& error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we actually add an overload of handle_error that takes a SyncError here? I believe SDKs actually use this for testing. They won't be able to mock out the migration query string responses that way, but I think that's okay and it'll make adopting this easier for SDKs and our existing tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming the SyncError should be converted to a SessionErrorInfo and passed through the current handle_error() function.

static void handle_error(SyncSession& session, SyncError&& error);
static void nonsync_transact_notify(SyncSession& session, VersionID::version_type version)
{
session.nonsync_transact_notify(version);
Expand Down Expand Up @@ -358,12 +360,17 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

SyncSession(_impl::SyncClient&, std::shared_ptr<DB>, const RealmConfig&, SyncManager* sync_manager);

// Initialize or tear down the subscription store based on whether or not flx_sync_requested is true
void update_subscription_store(bool flx_sync_requested) REQUIRES(m_state_mutex);
// Update the sync config after a PBS->FLX migration or FLX->PBS rollback occurs
void update_sync_config_after_migration() REQUIRES(!m_config_mutex, !m_state_mutex);

void download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action)
REQUIRES(!m_config_mutex, !m_state_mutex, !m_connection_state_mutex);
void handle_fresh_realm_downloaded(DBRef db, Status status,
sync::ProtocolErrorInfo::Action server_requests_action)
REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
void handle_error(SyncError) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
void handle_error(sync::SessionErrorInfo) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
void handle_bad_auth(const std::shared_ptr<SyncUser>& user, Status error_code, std::string_view context_message)
REQUIRES(!m_state_mutex, !m_config_mutex);
void cancel_pending_waits(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex);
Expand All @@ -387,6 +394,10 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void become_paused(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
void become_waiting_for_access_token() REQUIRES(m_state_mutex);

// do_restart_session restarts the session without freeing any of the waiters
void do_restart_session(util::CheckedUniqueLock)
REQUIRES(m_state_mutex, !m_connection_state_mutex, !m_config_mutex);

// do_become_inactive is called from both become_paused()/become_inactive() and does all the steps to
// shutdown and cleanup the sync session besides setting m_state.
void do_become_inactive(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex)
Expand Down Expand Up @@ -427,7 +438,11 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
mutable util::CheckedMutex m_config_mutex;
RealmConfig m_config GUARDED_BY(m_config_mutex);
const std::shared_ptr<DB> m_db;
const std::shared_ptr<sync::SubscriptionStore> m_flx_subscription_store;
std::shared_ptr<sync::SubscriptionStore> m_flx_subscription_store GUARDED_BY(m_state_mutex);
std::optional<bool> m_needs_subscription_store_updated GUARDED_BY(m_state_mutex);
// Original sync config for reverting back to PBS if FLX migration is rolled back
const std::shared_ptr<SyncConfig> m_original_sync_config; // does not change after construction
const std::shared_ptr<sync::MigrationStore> m_migration_store;
sync::ProtocolErrorInfo::Action
m_server_requests_action GUARDED_BY(m_state_mutex) = sync::ProtocolErrorInfo::Action::NoAction;
DBRef m_client_reset_fresh_copy GUARDED_BY(m_state_mutex);
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using ProtocolError = realm::sync::ProtocolError;
static const constexpr std::string_view s_middle(" Logs: ");

SyncError::SyncError(std::error_code error_code, std::string_view msg, bool is_fatal,
util::Optional<std::string_view> serverLog,
std::optional<std::string_view> serverLog,
std::vector<sync::CompensatingWriteErrorInfo> compensating_writes)
: SystemError(error_code, serverLog ? util::format("%1%2%3", msg, s_middle, *serverLog) : std::string(msg))
, is_fatal(is_fatal)
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct SyncError : public SystemError {
std::vector<sync::CompensatingWriteErrorInfo> compensating_writes_info;

SyncError(std::error_code error_code, std::string_view msg, bool is_fatal,
util::Optional<std::string_view> serverLog = util::none,
std::optional<std::string_view> serverLog = std::nullopt,
std::vector<sync::CompensatingWriteErrorInfo> compensating_writes = {});

static constexpr const char c_original_file_path_key[] = "ORIGINAL_FILE_PATH";
Expand Down
174 changes: 119 additions & 55 deletions src/realm/sync/noinst/migration_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace realm::sync {
namespace {
constexpr static int c_schema_version = 1;
constexpr static std::string_view c_flx_migration_table("flx_migration");
constexpr static std::string_view c_flx_migration_started_at("flx_migration_started_at");
constexpr static std::string_view c_flx_migration_completed_at("flx_migration_completed_at");
constexpr static std::string_view c_flx_migration_state("flx_migration_state");
constexpr static std::string_view c_flx_migration_query_string("flx_migration_query_string");
Expand All @@ -16,72 +15,117 @@ constexpr static std::string_view c_flx_subscription_name_prefix("flx_migrated_"

class MigrationStoreInit : public MigrationStore {
public:
explicit MigrationStoreInit(DBRef db,
std::function<void(MigrationStore::MigrationState)>&& on_migration_state_changed)
: MigrationStore(std::move(db), std::move(on_migration_state_changed))
explicit MigrationStoreInit(DBRef db)
: MigrationStore(std::move(db))
{
}
};

} // namespace

MigrationStoreRef
MigrationStore::create(DBRef db, std::function<void(MigrationStore::MigrationState)>&& on_migration_state_changed)
MigrationStoreRef MigrationStore::create(DBRef db)
{
return std::make_shared<MigrationStoreInit>(std::move(db), std::move(on_migration_state_changed));
return std::make_shared<MigrationStoreInit>(std::move(db));
}

MigrationStore::MigrationStore(DBRef db,
std::function<void(MigrationStore::MigrationState)>&& on_migration_state_changed)
MigrationStore::MigrationStore(DBRef db)
: m_db(std::move(db))
, m_on_migration_state_changed(std::move(on_migration_state_changed))
, m_state(MigrationState::NotMigrated)
, m_query_string{}
{
load_data(true); // read_only, default to NotMigrated if table is not initialized
}

bool MigrationStore::load_data(bool read_only)
{
if (m_migration_table) {
return true; // already initialized
}

std::vector<SyncMetadataTable> internal_tables{
{&m_migration_table,
c_flx_migration_table,
{
{&m_migration_started_at, c_flx_migration_started_at, type_Timestamp},
{&m_migration_started_at, c_flx_migration_completed_at, type_Timestamp},
{&m_migration_completed_at, c_flx_migration_completed_at, type_Timestamp},
{&m_migration_state, c_flx_migration_state, type_Int},
{&m_migration_query_str, c_flx_migration_query_string, type_String},
}},
};

std::optional<int64_t> schema_version;
auto tr = m_db->start_read();
SyncMetadataSchemaVersions schema_versions(tr);

if (auto schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_migration_store);
!schema_version) {
tr->promote_to_write();
schema_versions.set_version_for(tr, internal_schema_groups::c_flx_migration_store, c_schema_version);
create_sync_metadata_schema(tr, &internal_tables);
// create migration object
auto migration_store_obj = tr->get_table(m_migration_table)->create_object();
migration_store_obj.set(m_migration_state, int64_t(MigrationState::NotStarted));
tr->commit_and_continue_as_read();
m_state = MigrationState::NotStarted;
if (read_only) {
// Writing is disabled
SyncMetadataSchemaVersionsReader schema_versions(tr);
schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_migration_store);
if (!schema_version) {
return false; // Either table is not initialized or version does not exist
}
}
else {
else { // writable
SyncMetadataSchemaVersions schema_versions(tr);
schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_migration_store);
// Create the version and metadata_schema if it doesn't exist
if (!schema_version) {
tr->promote_to_write();
schema_versions.set_version_for(tr, internal_schema_groups::c_flx_migration_store, c_schema_version);
create_sync_metadata_schema(tr, &internal_tables);
tr->commit_and_continue_as_read();
}
}
// Load the metadata schema unless it was just created
if (!m_migration_table) {
if (*schema_version != c_schema_version) {
throw std::runtime_error("Invalid schema version for flexible sync migration store metadata");
}
load_sync_metadata_schema(tr, &internal_tables);
}

m_on_migration_state_changed(m_state);
// Read the migration object if exists, or default to not migrated
std::lock_guard lock(m_mutex);
if (auto migration_table = tr->get_table(m_migration_table); !migration_table->is_empty()) {
auto migration_store_obj = migration_table->get_object(0);
m_state = static_cast<MigrationState>(migration_store_obj.get<int64_t>(m_migration_state));
m_query_string = migration_store_obj.get<String>(m_migration_query_str);
}
else {
m_state = MigrationState::NotMigrated;
m_query_string = {};
}
return true;
}

// Reports whether the cached migration state is MigrationState::Migrated.
// The state is read while holding m_mutex.
bool MigrationStore::is_migrated()
{
    std::lock_guard<std::mutex> guard(m_mutex);
    const bool migrated = (m_state == MigrationState::Migrated);
    return migrated;
}

// Returns the cached migration query string, read while holding m_mutex.
// NOTE(review): the result is handed back as a view after the lock is released;
// presumably callers consume it before the stored query string is changed again
// (e.g. by migrate_to_flx() or clear()) — confirm against callers.
std::string_view MigrationStore::get_query_string()
{
    std::lock_guard<std::mutex> guard(m_mutex);
    return m_query_string;
}

std::shared_ptr<realm::SyncConfig> MigrationStore::convert_sync_config(std::shared_ptr<realm::SyncConfig> config)
{
REALM_ASSERT(config);
// If load data failed in the constructor, m_state defaults to NotMigrated

std::lock_guard lock{m_mutex};
if (config->flx_sync_requested) {
cancel_migration();
return config;
}
if (m_state == MigrationState::NotStarted) {
return config;
{
std::unique_lock lock{m_mutex};
if (config->flx_sync_requested) {
// Will need to clear the config if the user SyncConfig is FLX and the state is
// migrated, but not during download fresh realm as part of a client reset
if (m_state == MigrationState::Migrated) {
// No need to fire the notification callback here, just proceed as normal
clear(std::move(lock));
}
return config;
}
if (m_state == MigrationState::NotMigrated) {
return config;
}
}

auto flx_config = std::make_shared<realm::SyncConfig>(*config); // deep copy
Expand All @@ -91,44 +135,64 @@ std::shared_ptr<realm::SyncConfig> MigrationStore::convert_sync_config(std::shar
return flx_config;
}

void MigrationStore::migrate_to_flx(std::string rql_query_string)
void MigrationStore::migrate_to_flx(std::string_view rql_query_string)
{
REALM_ASSERT(!rql_query_string.empty());

std::unique_lock lock{m_mutex};
m_query_string = rql_query_string;
m_state = MigrationState::Completed;
lock.unlock();

auto tr = m_db->start_write();
auto migration_store_obj = tr->get_table(m_migration_table)->get_object(0);
migration_store_obj.set(m_migration_query_str, m_query_string);
migration_store_obj.set(m_migration_state, int64_t(m_state));
migration_store_obj.set(m_migration_completed_at, Timestamp{std::chrono::system_clock::now()});
tr->commit();
// Ensure the migration table has been initialized
REALM_ASSERT(load_data());

m_on_migration_state_changed(m_state);
{
std::unique_lock lock{m_mutex};
REALM_ASSERT(m_state == MigrationState::NotMigrated);
m_query_string = rql_query_string;
m_state = MigrationState::Migrated;

auto tr = m_db->start_write();
auto migration_table = tr->get_table(m_migration_table);
// This should be called in the non-migrated state, so the migration table should be empty
REALM_ASSERT(migration_table->is_empty());
auto migration_store_obj = migration_table->create_object();
migration_store_obj.set(m_migration_query_str, m_query_string);
migration_store_obj.set(m_migration_state, int64_t(m_state));
migration_store_obj.set(m_migration_completed_at, Timestamp{std::chrono::system_clock::now()});
tr->commit();
}
}

void MigrationStore::cancel_migration()
{
auto tr = m_db->start_write();
auto migration_table = tr->get_table(m_migration_table);
migration_table->clear();
tr->commit();
// Ensure the migration table has been initialized
REALM_ASSERT(load_data());

{
std::lock_guard lock{m_mutex};
m_state = MigrationState::NotStarted;
// Clear the migration state
std::unique_lock lock{m_mutex};
REALM_ASSERT(m_state == MigrationState::Migrated);
clear(std::move(lock)); // releases the lock
}

void MigrationStore::clear(std::unique_lock<std::mutex>)
{
// Make sure the migration table has been initialized before calling clear()
REALM_ASSERT(m_migration_table);

auto tr = m_db->start_read();
auto migration_table = tr->get_table(m_migration_table);
if (migration_table->is_empty()) {
return; // already cleared
}

m_on_migration_state_changed(m_state);
m_state = MigrationState::NotMigrated;
m_query_string = {};
tr->promote_to_write();
migration_table->clear();
tr->commit();
}

std::optional<Subscription> MigrationStore::make_subscription(const std::string& object_class_name)
{
std::lock_guard lock{m_mutex};
if (m_state == MigrationState::NotStarted) {
if (m_state == MigrationState::NotMigrated) {
return std::nullopt;
}
if (object_class_name.empty()) {
Expand Down
Loading
0