-
Notifications
You must be signed in to change notification settings - Fork 178
Add migrated state information to flexible sync client BIND message #6464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c3e7128
fd328a3
2a6bdb5
a14058f
29b6de5
57b1bf5
2f70ed9
a393310
0c52615
54bcc1b
a8aba8c
1a0593c
944d91b
e72d185
3224b0a
aebfdd1
84eed56
9fb8740
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1952,7 +1952,6 @@ void Session::send_bind_message() | |
REALM_ASSERT(m_state == Active); | ||
|
||
session_ident_type session_ident = m_ident; | ||
const std::string& path = get_virt_path(); | ||
bool need_client_file_ident = !have_client_file_ident(); | ||
const bool is_subserver = false; | ||
|
||
|
@@ -1961,9 +1960,31 @@ void Session::send_bind_message() | |
int protocol_version = m_conn.get_negotiated_protocol_version(); | ||
OutputBuffer& out = m_conn.get_output_buffer(); | ||
// Discard the token since it's ignored by the server. | ||
std::string empty_access_token{}; | ||
protocol.make_bind_message(protocol_version, out, session_ident, path, empty_access_token, need_client_file_ident, | ||
is_subserver); // Throws | ||
std::string empty_access_token; | ||
if (m_is_flx_sync_session) { | ||
nlohmann::json bind_json_data; | ||
if (auto migrated_partition = get_migration_store()->get_migrated_partition()) { | ||
bind_json_data["migratedPartition"] = *migrated_partition; | ||
} | ||
if (logger.would_log(util::Logger::Level::debug)) { | ||
std::string json_data_dump; | ||
if (!bind_json_data.empty()) { | ||
json_data_dump = bind_json_data.dump(); | ||
} | ||
logger.debug( | ||
"Sending: BIND(session_ident=%1, need_client_file_ident=%2 is_subserver=%3 json_data=\"%4\")", | ||
session_ident, need_client_file_ident, is_subserver, json_data_dump); | ||
} | ||
protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token, | ||
need_client_file_ident, is_subserver); // Throws | ||
} | ||
else { | ||
std::string server_path = get_virt_path(); | ||
logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2 is_subserver=%3 server_path=%4)", | ||
session_ident, ne 10000 ed_client_file_ident, is_subserver, server_path); | ||
protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unrelated: is there a plan, at some point, to make pbs work with a json so we don't need to treat it differently than flx? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't believe so - the JSON data was "shimmed" into the BIND message for FLX, since the existing server_path section, which is used to send the partition value in PBS, was not used by FLX. In addition, since we want to move away from PBS eventually, there likely won't be a need for adding JSON values. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. great! |
||
need_client_file_ident, is_subserver); // Throws | ||
} | ||
m_conn.initiate_write_message(out, this); // Throws | ||
|
||
m_bind_message_sent = true; | ||
|
@@ -2329,7 +2350,7 @@ std::error_code Session::receive_ident_message(SaltedFileIdent client_file_ident | |
// this point forward. | ||
auto client_reset_operation = std::move(m_client_reset_operation); | ||
util::UniqueFunction<void(int64_t)> on_flx_subscription_complete = [this](int64_t version) { | ||
this->non_sync_flx_completion(version); | ||
this->on_flx_sync_version_complete(version); | ||
}; | ||
if (!client_reset_operation->finalize(client_file_ident, get_flx_subscription_store(), | ||
std::move(on_flx_subscription_complete))) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,11 +7,11 @@ namespace realm::sync { | |
namespace { | ||
constexpr static int c_schema_version = 1; | ||
constexpr static std::string_view c_flx_migration_table("flx_migration"); | ||
constexpr static std::string_view c_flx_migration_started_at("flx_migration_started_at"); | ||
constexpr static std::string_view c_flx_migration_completed_at("flx_migration_completed_at"); | ||
constexpr static std::string_view c_flx_migration_state("flx_migration_state"); | ||
constexpr static std::string_view c_flx_migration_query_string("flx_migration_query_string"); | ||
|
||
constexpr static std::string_view c_flx_migration_started_at("started_at"); | ||
constexpr static std::string_view c_flx_migration_completed_at("completed_at"); | ||
constexpr static std::string_view c_flx_migration_state("state"); | ||
constexpr static std::string_view c_flx_migration_query_string("query_string"); | ||
constexpr static std::string_view c_flx_migration_original_partition("original_partition"); | ||
constexpr static std::string_view c_flx_subscription_name_prefix("flx_migrated_"); | ||
|
||
class MigrationStoreInit : public MigrationStore { | ||
|
@@ -32,7 +32,6 @@ MigrationStoreRef MigrationStore::create(DBRef db) | |
MigrationStore::MigrationStore(DBRef db) | ||
: m_db(std::move(db)) | ||
, m_state(MigrationState::NotMigrated) | ||
, m_query_string{} | ||
{ | ||
load_data(true); // read_only, default to NotMigrated if table is not initialized | ||
} | ||
|
@@ -51,6 +50,7 @@ bool MigrationStore::load_data(bool read_only) | |
{&m_migration_completed_at, c_flx_migration_completed_at, type_Timestamp}, | ||
{&m_migration_state, c_flx_migration_state, type_Int}, | ||
{&m_migration_query_str, c_flx_migration_query_string, type_String}, | ||
{&m_migration_partition, c_flx_migration_original_partition, type_String}, | ||
}}, | ||
}; | ||
|
||
|
@@ -91,10 +91,12 @@ bool MigrationStore::load_data(bool read_only) | |
auto migration_store_obj = migration_table->get_object(0); | ||
m_state = static_cast<MigrationState>(migration_store_obj.get<int64_t>(m_migration_state)); | ||
m_query_string = migration_store_obj.get<String>(m_migration_query_str); | ||
m_migrated_partition = migration_store_obj.get<String>(m_migration_partition); | ||
} | ||
else { | ||
m_state = MigrationState::NotMigrated; | ||
m_query_string = {}; | ||
m_query_string.reset(); | ||
m_migrated_partition.reset(); | ||
} | ||
return true; | ||
} | ||
|
@@ -132,9 +134,17 @@ void MigrationStore::complete_migration() | |
tr->commit(); | ||
} | ||
|
||
std::string MigrationStore::get_query_string() | ||
std::optional<std::string> MigrationStore::get_migrated_partition() | ||
{ | ||
std::lock_guard lock{m_mutex}; | ||
// This will be valid if migration in progress or complete | ||
return m_migrated_partition; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm just realizing that this function and get_query_string() are returning a string_view which is an unowned view of a std::string protected by a lock but don't guarantee that the lock is held after returned, so it's possible to call get_migrated_partition() and have the returned string_view pointing to free'd or otherwise invalid memory after return. I think both this and get_query_string() need to return a std::optionalstd::string |
||
} | ||
|
||
std::optional<std::string> MigrationStore::get_query_string() | ||
{ | ||
std::lock_guard lock{m_mutex}; | ||
// This will be valid if migration in progress or complete | ||
return m_query_string; | ||
} | ||
|
||
|
@@ -143,14 +153,18 @@ std::shared_ptr<realm::SyncConfig> MigrationStore::convert_sync_config(std::shar | |
REALM_ASSERT(config); | ||
// If load data failed in the constructor, m_state defaults to NotMigrated | ||
|
||
{ | ||
std::unique_lock lock{m_mutex}; | ||
if (config->flx_sync_requested) { | ||
return config; | ||
} | ||
if (m_state == MigrationState::NotMigrated) { | ||
return config; | ||
} | ||
std::unique_lock lock{m_mutex}; | ||
if (config->flx_sync_requested || m_state == MigrationState::NotMigrated) { | ||
return config; | ||
} | ||
|
||
// Once in the migrated state, the partition value cannot change for the same realm file | ||
kmorkos marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (m_state == MigrationState::Migrated && m_migrated_partition && | ||
m_migrated_partition != config->partition_value) { | ||
throw LogicError( | ||
ErrorCodes::IllegalOperation, | ||
util::format("Partition value cannot be changed for migrated realms\n - original: %1\n - config: %2", | ||
m_migrated_partition, config->partition_value)); | ||
} | ||
|
||
auto flx_config = std::make_shared<realm::SyncConfig>(*config); // deep copy | ||
|
@@ -160,7 +174,7 @@ std::shared_ptr<realm::SyncConfig> MigrationStore::convert_sync_config(std::shar | |
return flx_config; | ||
} | ||
|
||
void MigrationStore::migrate_to_flx(std::string_view rql_query_string) | ||
void MigrationStore::migrate_to_flx(std::string_view rql_query_string, std::string_view partition_value) | ||
{ | ||
REALM_ASSERT(!rql_query_string.empty()); | ||
|
||
|
@@ -171,26 +185,30 @@ void MigrationStore::migrate_to_flx(std::string_view rql_query_string) | |
std::unique_lock lock{m_mutex}; | ||
// Can call migrate_to_flx multiple times if migration has not completed. | ||
REALM_ASSERT(m_state != MigrationState::Migrated); | ||
m_query_string = rql_query_string; | ||
m_state = MigrationState::InProgress; | ||
m_query_string.emplace(rql_query_string); | ||
m_migrated_partition.emplace(partition_value); | ||
|
||
10000 auto tr = m_db->start_read(); | ||
auto migration_table = tr->get_table(m_migration_table); | ||
// A migration object may exist if the migration was started in a previous session. | ||
if (migration_table->is_empty()) { | ||
tr->promote_to_write(); | ||
auto migration_store_obj = migration_table->create_object(); | ||
migration_store_obj.set(m_migration_query_str, m_query_string); | ||
migration_store_obj.set(m_migration_query_str, *m_query_string); | ||
migration_store_obj.set(m_migration_state, int64_t(m_state)); | ||
migration_store_obj.set(m_migration_partition, *m_migrated_partition); | ||
migration_store_obj.set(m_migration_started_at, Timestamp{std::chrono::system_clock::now()}); | ||
tr->commit(); | ||
} | ||
else { | ||
auto migration_store_obj = migration_table->get_object(0); | ||
auto state = static_cast<MigrationState>(migration_store_obj.get<int64_t>(m_migration_state)); | ||
auto query_string = migration_store_obj.get<String>(m_migration_query_str); | ||
auto migrated_partition = migration_store_obj.get<String>(m_migration_partition); | ||
REALM_ASSERT(m_state == state); | ||
REALM_ASSERT(m_query_string == query_string); | ||
REALM_ASSERT(m_migrated_partition == migrated_partition); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
} | ||
} | ||
} | ||
|
@@ -218,7 +236,8 @@ void MigrationStore::clear(std::unique_lock<std::mutex>) | |
} | ||
|
||
m_state = MigrationState::NotMigrated; | ||
m_query_string = {}; | ||
m_query_string.reset(); | ||
m_migrated_partition.reset(); | ||
tr->promote_to_write(); | ||
migration_table->clear(); | ||
tr->commit(); | ||
|
@@ -239,7 +258,8 @@ void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store) | |
return; | ||
} | ||
|
||
create_subscriptions(subs_store, m_query_string); | ||
REALM_ASSERT(m_query_string); | ||
create_subscriptions(subs_store, *m_query_string); | ||
} | ||
|
||
void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store, const std::string& rql_query_string) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so
server_path
is not needed anymore for flx bind?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was not being used - the call to
get_virt_path()
returns the partition value provided in SyncConfig, which is always an empty string for FLX sync.