Upload unsynced changes before updating to FLX native #6498
// Create a subscription set used as sentinel. No-op if not in 'Migrated' state.
// This method is idempotent (i.e., at most one subscription set can be created during the lifetime of a
// migration)
void create_sentinel_subscription_set(const SubscriptionStore& subs_store);
For lack of better names... I'm open to suggestions.
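The contract stated in that comment is: do nothing unless the store is in the Migrated state, and create at most one sentinel per migration. Here is a minimal sketch of it. `MigrationStoreSketch`, `SubscriptionStoreSketch` and `make_subscription_set()` are hypothetical stand-ins, not realm-core types. The store parameter is also non-const here, unlike the real signature, so the fake can hand out versions.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical, simplified stand-ins; realm-core's MigrationStore and
// SubscriptionStore have different, richer interfaces.
enum class MigrationState { NotMigrated, InProgress, Migrated };

struct SubscriptionStoreSketch {
    std::int64_t next_version = 1;
    // Creates a new, empty subscription set and returns its version.
    std::int64_t make_subscription_set() { return next_version++; }
};

class MigrationStoreSketch {
public:
    explicit MigrationStoreSketch(MigrationState state) : m_state(state) {}

    // No-op unless in the Migrated state; creates at most one sentinel per migration.
    void create_sentinel_subscription_set(SubscriptionStoreSketch& subs_store)
    {
        if (m_state != MigrationState::Migrated || m_sentinel_version)
            return; // wrong state, or sentinel already created: idempotent
        m_sentinel_version = subs_store.make_subscription_set();
    }

    std::optional<std::int64_t> get_sentinel_subscription_set_version() const
    {
        return m_sentinel_version;
    }

private:
    MigrationState m_state;
    std::optional<std::int64_t> m_sentinel_version;
};
```

Calling the method twice in the Migrated state consumes only one subscription-set version.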
m_flx_subscription_store->get_by_version(*sentinel_query_version)
    .get_state_change_notification(sync::SubscriptionSet::State::Complete)
    .get_async([=](StatusWith<sync::SubscriptionSet::State> s) {
        if (s.is_ok()) {
Should we also check that the state here matches Complete?
I think it's safe the way it is. If anything, we could assert that the state matches Complete.
auto migration_table = tr->get_table(m_migration_table);
REALM_ASSERT(!migration_table->is_empty());
auto migration_store_obj = migration_table->get_object(0);
migration_store_obj.set(m_sentinel_query_version, *m_sentinel_subscription_set_version);
I think this is actually all you need? Just a write that sets this flag, and then you can use the snapshot version of that write as a limit, the way we use the snapshot version of a subscription set as a limit. Do we actually need to create a new subscription set?
A flag would not be enough. We would need the snapshot version in case the session gets closed before uploading everything. This is the reason the sentinel version is stored.
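Here is why a persisted version works where a bare flag would not. If the session closes partway through the upload, a reopened session can still compare its acknowledged upload progress against the stored boundary. This is a hypothetical sketch. `PersistedMigrationState`, `can_switch_to_native_flx`, and the `>=` comparison against an "acked upload version" are assumptions made for illustration; they are not realm-core code.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical model of the persisted migration state, as it would be
// re-read after the session (or the app) is closed and reopened.
struct PersistedMigrationState {
    bool migrated = false;
    // Snapshot version of the sentinel subscription set; empty until created.
    std::optional<std::uint64_t> sentinel_snapshot_version;
};

// Assumption: every change committed up to the sentinel's snapshot version was
// made in migrated mode and must be uploaded and acknowledged before the
// client may drop migrated mode.
bool can_switch_to_native_flx(const PersistedMigrationState& state, std::uint64_t acked_upload_version)
{
    if (!state.migrated)
        return true; // nothing to wait for
    if (!state.sentinel_snapshot_version)
        return false; // boundary not recorded yet
    return acked_upload_version >= *state.sentinel_snapshot_version;
}
```

A flag alone would only say "stop uploading in migrated mode at some point". It could not say *where* that point was once the original session is gone.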
return true;
}

auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
Am I correct that this sentinel will always be set before a sync::Session is created and will remain constant throughout the lifetime of the session? For example, we create the sentinel in the constructor of SyncSession before m_session (i.e., this class) is created. So could we just set a member variable of sync::Session while it's being constructed and keep the sentinel query version for the lifetime of the session? That would let us avoid opening a separate transaction every time we want to send an upload.
There is no transaction involved here. get_sentinel_subscription_set_version returns a cached value.
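A "cached value" getter of this kind might look like the sketch below. The version is read once when the store loads its data and is served from memory afterwards, so no transaction is opened per upload. This is hypothetical. `MigrationStoreCacheSketch` and `load_count()` are invented for illustration, and the mutex is an assumption for the case where the getter is called from more than one thread.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <optional>

// Hypothetical sketch: the sentinel version is loaded once and then cached.
class MigrationStoreCacheSketch {
public:
    // Stands in for load_data(): copies the persisted value into memory.
    void load_data(std::optional<std::int64_t> persisted_version)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_sentinel_query_version = persisted_version;
        ++m_load_count;
    }

    // No transaction here: just returns a copy of the cached value.
    std::optional<std::int64_t> get_sentinel_subscription_set_version() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_sentinel_query_version;
    }

    int load_count() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_load_count;
    }

private:
    mutable std::mutex m_mutex;
    std::optional<std::int64_t> m_sentinel_query_version;
    int m_load_count = 0;
};
```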
Couldn't the version marked by the sentinel subscription be a moving target if a session is opened but isn't able to connect and then more pending changes are added?
I don't think so. Even if the session cannot connect to the server, the user may commit changes as native FLX. The subscription set is the limit between migrated FLX and native FLX (and this limit is only set once). There may be a problem if the user switches back to FLX (after connecting with native FLX), but I don't think we should account for that.
You're right - any pending changes after natively migrating to FLX "shouldn't" need to worry about the partition value.
.get_async([=](StatusWith<sync::SubscriptionSet::State> s) {
    if (s.is_ok()) {
        m_migration_store->cancel_migration();
        restart_session();
Ah, is the sentinel subscription set necessary so we can use this state change notification machinery to trigger a session restart here?
Is this safe? If you're restarting your session here, there could be other I/O queued up in the network event loop before the finalize_and_actualize trigger (the one that actually tears down and restarts the session) runs in response to this restart_session() call.
I think so. restart_session is essentially pause followed by resume, so I assume those work correctly. Is restarting here any different than elsewhere?
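The ordering concern raised above can be modelled with a FIFO event loop. If the restart is posted as a "pause" handler followed by a "resume" handler, any work already queued runs first. This is a toy model under that assumption. `EventLoopSketch` and `SessionSketch` are hypothetical and do not describe how SyncSession actually implements restart_session.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded event loop: handlers run in FIFO order.
class EventLoopSketch {
public:
    void post(std::function<void()> handler) { m_queue.push_back(std::move(handler)); }

    // Runs handlers in posting order, including ones posted while running.
    void run()
    {
        while (!m_queue.empty()) {
            auto handler = std::move(m_queue.front());
            m_queue.pop_front();
            handler();
        }
    }

private:
    std::deque<std::function<void()>> m_queue;
};

// Hypothetical session whose restart is "pause followed by resume",
// with each step posted to the loop rather than executed inline.
class SessionSketch {
public:
    SessionSketch(EventLoopSketch& loop, std::vector<std::string>& events)
        : m_loop(loop)
        , m_events(events)
    {
    }

    void restart_session()
    {
        m_loop.post([this] { m_events.push_back("pause"); });
        m_loop.post([this] { m_events.push_back("resume"); });
    }

private:
    EventLoopSketch& m_loop;
    std::vector<std::string>& m_events;
};
```

In this model, an upload queued before restart_session() still runs before the teardown. That is exactly the window the reviewer is asking about.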
// message.
if (server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX ||
    !m_migration_store->is_migration_in_progress()) {
.then([=, weak_self = weak_from_this()](sync::SubscriptionSet::State state) {
💯
…to create_sync_session so weak_from_this() is valid
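The commit note about weak_from_this() being valid reflects a standard C++17 rule. weak_from_this() yields an empty weak_ptr until the object is owned by a std::shared_ptr, so calling it from a constructor gives a callback nothing to lock. Here is a minimal sketch of the safe pattern. `SyncSessionSketch`, `create()` and `make_completion_handler()` are hypothetical names, not realm-core's SyncSession API.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical session type illustrating weak_from_this() semantics (C++17).
class SyncSessionSketch : public std::enable_shared_from_this<SyncSessionSketch> {
public:
    // Factory: the object is shared-owned before any callback captures it.
    static std::shared_ptr<SyncSessionSketch> create()
    {
        return std::shared_ptr<SyncSessionSketch>(new SyncSessionSketch());
    }

    // Returns a handler that only touches the session while it is still alive.
    std::function<bool()> make_completion_handler()
    {
        return [weak_self = weak_from_this()] {
            auto self = weak_self.lock();
            if (!self)
                return false; // session already destroyed: do nothing
            ++self->completions;
            return true;
        };
    }

    bool weak_was_empty_in_constructor = false;
    int completions = 0;

private:
    // Inside the constructor no shared_ptr owns *this yet, so this is empty.
    SyncSessionSketch()
        : weak_was_empty_in_constructor(weak_from_this().expired())
    {
    }
};
```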
logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4",
             latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
There are 4 additional parameters, but only 3 in the string.
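Mismatches between the positional placeholders (`%1`, `%2`, ...) in a log format string and the number of arguments passed are easy to miss in review. A small check like the one below can catch them. It is a hypothetical helper, not part of realm-core's logger. It only compares the highest placeholder index with the argument count, so it does not detect gaps such as `%1 %3`, and it ignores escaped percent signs.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <string_view>

// Returns the highest positional placeholder index (%1, %2, ...) used in fmt.
std::size_t max_placeholder_index(std::string_view fmt)
{
    std::size_t max_index = 0;
    std::size_t i = 0;
    while (i < fmt.size()) {
        if (fmt[i] != '%') {
            ++i;
            continue;
        }
        ++i; // skip '%'
        std::size_t index = 0;
        while (i < fmt.size() && std::isdigit(static_cast<unsigned char>(fmt[i]))) {
            index = index * 10 + static_cast<std::size_t>(fmt[i] - '0');
            ++i;
        }
        if (index > max_index)
            max_index = index;
    }
    return max_index;
}

// True if the highest placeholder index equals the number of arguments passed.
template <typename... Args>
bool placeholder_count_matches(std::string_view fmt, const Args&...)
{
    return max_placeholder_index(fmt) == sizeof...(Args);
}
```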
LGTM - just one tiny question.
@@ -51,6 +55,7 @@ bool MigrationStore::load_data(bool read_only)
{&m_migration_state, c_flx_migration_state, type_Int},
Should m_migration_completed_at be optional, too? E.g.:
{&m_migration_completed_at, c_flx_migration_completed_at, type_Timestamp, true},
It could well be. Right now it's not really used, so it doesn't affect anything. I'll make the change.
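The trailing `true` in the suggested entry marks the column as nullable. A "completed at" timestamp has no meaningful value until a migration finishes, so modelling it as optional avoids inventing a placeholder date. The sketch below mirrors that idea with hypothetical types. `ColumnSpec`, `MigrationRow`, `row_is_valid` and the column names are invented for illustration and are not realm-core's schema API.

```cpp
#include <cassert>
#include <chrono>
#include <optional>
#include <vector>

enum class DataType { Int, Timestamp };

// Hypothetical mirror of the {column, name, type, nullable} entries in load_data().
struct ColumnSpec {
    const char* name;
    DataType type;
    bool is_optional = false; // required unless the 4th field is given
};

// A nullable timestamp maps naturally onto std::optional: "not completed yet" is empty.
struct MigrationRow {
    long long state = 0;
    std::optional<std::chrono::system_clock::time_point> completed_at;
};

// Rejects rows that leave a required timestamp column empty.
bool row_is_valid(const MigrationRow& row, const std::vector<ColumnSpec>& schema)
{
    for (const auto& col : schema) {
        if (col.type == DataType::Timestamp && !col.is_optional && !row.completed_at)
            return false;
    }
    return true;
}
```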
What, How & Why?
If a migrated client app has pending changes to be uploaded when the client app has been updated to natively use FLX, there is a potential to receive compensating writes, since these changes will not be assigned the partition value when they are uploaded and therefore will no longer be in view.
To prevent this, we hold off updating to native FLX until all changes are uploaded and ack'd by the server. A sentinel subscription set is used to achieve this.
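The mechanism can be pictured as a completion notification keyed by the sentinel's version. Migrated mode ends only when that specific subscription set reaches Complete, and that event is what cancels the migration and restarts the session. This is a simplified, hypothetical model. `SubscriptionNotifierSketch` and `on_complete` are invented names. Unlike realm-core's get_state_change_notification, this model fires only for the exact version and ignores error and superseded states.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>
#include <vector>

enum class SubState { Pending, Complete };

// Hypothetical registry: "call me when subscription set N reaches Complete".
class SubscriptionNotifierSketch {
public:
    void on_complete(std::int64_t version, std::function<void()> callback)
    {
        m_waiters[version].push_back(std::move(callback));
    }

    // Fires (and forgets) the callbacks registered for this version.
    void set_state(std::int64_t version, SubState state)
    {
        if (state != SubState::Complete)
            return;
        auto it = m_waiters.find(version);
        if (it == m_waiters.end())
            return;
        auto callbacks = std::move(it->second);
        m_waiters.erase(it);
        for (auto& callback : callbacks)
            callback();
    }

private:
    std::map<std::int64_t, std::vector<std::function<void()>>> m_waiters;
};
```

Usage mirrors the PR's flow. Register a callback for the sentinel version that clears the migrated flag (standing in for cancel_migration()) and counts a restart (standing in for restart_session()).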
Fixes #6460.
☑️ ToDos
📝 Changelog update
C-API, if public C++ API changed.