-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix missing sync events during historical batch imports #12319
Changes from all commits
78e3df8
00f81f0
e974d6c
28880bb
446d647
e440db9
ef07fcd
0df9e93
bd87a3a
b2c6c20
9a78d14
8e319cc
77ac51c
361dd38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix a bug where an incremental `/sync` could miss events (including room state) when rejoining a room or while historical events were being backfilled. Contributed by Nick @ Beeper. | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -175,17 +175,13 @@ async def get_state_events( | |
state_filter = state_filter or StateFilter.all() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think we have two assumptions that we should probably enforce:
This also aligns with what we have documented:
The current behavior of There is a bug in how incremental I initially assumed we would want flexibility in how these endpoints sorted events according to the type of pagination token given but it's sounding like we actually want to enforce a given sort according to the endpoint. In which case, we can revert to @Fizzadar initial approach of plumbing a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Sorry for not getting to this yesterday, it sat at about the second thing on my todo list all day :/) Thanks for the clarification. There's the added bonus that
I wonder if we're just making this harder for ourselves by re-using the same query to do both SELECT event_id FROM events
WHERE room_id = ? AND stream_ordering <= ?
ORDER BY stream_ordering DESC
LIMIT 1 So it might be simpler to just have a separate Entirely separately, and I'm not suggesting we necessarily do this now, but it occurs to me that we have a Another approach that has also literally just now occurred to me is that we could use the Anyway, just wanted to record my thoughts here before I forget them There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Fizzadar sorry for going around the houses really slowly on this 😞 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No worries at all @erikjohnston - totally agree a separate function makes sense here; I've pushed that up in 9a78d14. I just wrapped an existing function Have undone the pagination ordering changes too! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! Will have a look after I've finished doing the v1.57.0rc1 release ❤️ |
||
|
||
if at_token: | ||
# FIXME this claims to get the state at a stream position, but | ||
# get_recent_events_for_room operates by topo ordering. This therefore | ||
# does not reliably give you the state at the given stream position. | ||
# (https://github.com/matrix-org/synapse/issues/3305) | ||
last_events, _ = await self.store.get_recent_events_for_room( | ||
room_id, end_token=at_token.room_key, limit=1 | ||
last_event = await self.store.get_last_event_in_room_before_stream_ordering( | ||
room_id, | ||
end_token=at_token.room_key, | ||
) | ||
|
||
if not last_events: | ||
if not last_event: | ||
raise NotFoundError("Can't find event for token %s" % (at_token,)) | ||
last_event = last_events[0] | ||
|
||
# check whether the user is in the room at that time to determine | ||
# whether they should be treated as peeking. | ||
|
@@ -204,7 +200,7 @@ async def get_state_events( | |
visible_events = await filter_events_for_client( | ||
self.storage, | ||
user_id, | ||
last_events, | ||
[last_event], | ||
filter_send_to_client=False, | ||
is_peeking=is_peeking, | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -661,16 +661,15 @@ async def get_state_at( | |
stream_position: point at which to get state | ||
state_filter: The state filter used to fetch state from the database. | ||
""" | ||
# FIXME this claims to get the state at a stream position, but | ||
# get_recent_events_for_room operates by topo ordering. This therefore | ||
# does not reliably give you the state at the given stream position. | ||
# (https://github.com/matrix-org/synapse/issues/3305) | ||
last_events, _ = await self.store.get_recent_events_for_room( | ||
room_id, end_token=stream_position.room_key, limit=1 | ||
# FIXME: This gets the state at the latest event before the stream ordering, | ||
# which might not be the same as the "current state" of the room at the time | ||
# of the stream token if there were multiple forward extremities at the time. | ||
last_event = await self.store.get_last_event_in_room_before_stream_ordering( | ||
Fizzadar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
room_id, | ||
end_token=stream_position.room_key, | ||
) | ||
Fizzadar marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
if last_events: | ||
last_event = last_events[-1] | ||
if last_event: | ||
state = await self.get_state_after_event( | ||
last_event, state_filter=state_filter or StateFilter.all() | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,9 +7,9 @@ | |
from synapse.api.constants import EventContentFields, EventTypes | ||
from synapse.appservice import ApplicationService | ||
from synapse.rest import admin | ||
from synapse.rest.client import login, register, room, room_batch | ||
from synapse.rest.client import login, register, room, room_batch, sync | ||
from synapse.server import HomeServer | ||
from synapse.types import JsonDict | ||
from synapse.types import JsonDict, RoomStreamToken | ||
from synapse.util import Clock | ||
|
||
from tests import unittest | ||
|
@@ -63,6 +63,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase): | |
room.register_servlets, | ||
register.register_servlets, | ||
login.register_servlets, | ||
sync.register_servlets, | ||
] | ||
|
||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: | ||
|
@@ -178,3 +179,123 @@ def test_same_state_groups_for_whole_historical_batch(self) -> None: | |
"Expected a single state_group to be returned by saw state_groups=%s" | ||
% (state_group_map.keys(),), | ||
) | ||
|
||
@unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
def test_sync_while_batch_importing(self) -> None:
    """
    Make sure that /sync correctly returns full room state when a user joins
    during ongoing batch backfilling.

    See: https://github.com/matrix-org/synapse/issues/12281
    """
    # Create user who will be invited & join room
    user_id = self.register_user("beep", "test")
    user_tok = self.login("beep", "test")

    time_before_room = int(self.clock.time_msec())

    # Create a room with some events
    room_id, _, _, _ = self._create_test_room()
    # Invite the user
    self.helper.invite(
        room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
    )

    # Create another room, send a bunch of events to advance the stream token
    other_room_id = self.helper.create_room_as(
        self.appservice.sender, tok=self.appservice.token
    )
    for _ in range(5):
        self.helper.send_event(
            room_id=other_room_id,
            type=EventTypes.Message,
            content={"msgtype": "m.text", "body": "C"},
            tok=self.appservice.token,
        )

    # Join the room as the normal user
    self.helper.join(room_id, user_id, tok=user_tok)

    # Create an event to hang the historical batch from. To reproduce the
    # failure originally reported in #12281, the historical batch must be
    # hung from the most recent event in the room, so that the base insertion
    # event ends up with the highest `topological_ordering` (`depth`) in the
    # room while having a negative `stream_ordering` (it is a `historical`
    # event). Previously, when assembling the `state` for the `/sync`
    # response, the buggy logic sorted by `topological_ordering` descending
    # and picked up the base insertion event, because its negative
    # `stream_ordering` is below the given pagination token. The state is now
    # taken from the last event before the token by `stream_ordering`, so
    # `historical` events (negative `stream_ordering`) sort to the very bottom
    # and are not selected here.
    response = self.helper.send_event(
        room_id=room_id,
        type=EventTypes.Message,
        content={
            "msgtype": "m.text",
            "body": "C",
        },
        tok=self.appservice.token,
    )
    event_to_hang_id = response["event_id"]

    channel = self.make_request(
        "POST",
        "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
        % (room_id, event_to_hang_id),
        content={
            "events": _create_message_events_for_batch_send_request(
                self.virtual_user_id, time_before_room, 3
            ),
            "state_events_at_start": _create_join_state_events_for_batch_send_request(
                [self.virtual_user_id], time_before_room
            ),
        },
        access_token=self.appservice.token,
    )
    self.assertEqual(channel.code, 200, channel.result)

    # Now we need to find the invite event's stream token so we can sync from it
    main_store = self.hs.get_datastores().main
    events, _ = self.get_success(
        main_store.get_recent_events_for_room(
            room_id,
            50,
            end_token=main_store.get_room_max_token(),
        ),
    )
    invite_event_position = None
    for event in events:
        # Only match the invite for *our* user, not any other invite that may
        # exist in the room.
        if (
            event.type == EventTypes.Member
            and event.state_key == user_id
            and event.content["membership"] == "invite"
        ):
            invite_event_position = self.get_success(
                main_store.get_topological_token_for_event(event.event_id)
            )
            break

    # Plain `assert` also narrows the Optional for type checkers.
    assert invite_event_position is not None, "No invite event found"

    # Remove the topological order from the token by re-creating w/stream only
    invite_event_position = RoomStreamToken(None, invite_event_position.stream)

    # Sync everything after this token
    since_token = self.get_success(invite_event_position.to_string(main_store))
    sync_response = self.make_request(
        "GET",
        f"/sync?since={since_token}",
        access_token=user_tok,
    )
    # Fail with a clear message rather than a KeyError on `json_body` below.
    self.assertEqual(sync_response.code, 200, sync_response.result)

    # Assert that, for this room, the user was considered to have joined and thus
    # receives the full state history
    state_event_types = [
        event["type"]
        for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
            "events"
        ]
    ]

    self.assertIn(
        EventTypes.Create,
        state_event_types,
        "Missing room full state in sync response",
    )
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tested and fails on |
Uh oh!
There was an error while loading. Please reload this page.