mongo: handle large mongo document by jgao54 · Pull Request #3108 · PeerDB-io/peerdb · GitHub

mongo: handle large mongo document #3108

Open · jgao54 wants to merge 2 commits into main from large-event-handling

Conversation

@jgao54 jgao54 (Contributor) commented Jun 27, 2025

This PR handles two issues with respect to large events (separated by commits; I recommend reviewing them separately):

  1. Mongo limits document size to 16MB; trying to insert even one byte more results in an error:

[object to insert too large. size in bytes: xxx, max size: xxx]

With this information I was able to generate a test document that is exactly the maximum size we can write to MongoDB -- and we should support up to this size in our CDC connector without errors.

Currently we seem to be imposing Snowflake's 15MB limit in the shared MarshalJSONWithOptions function, so the e2e test added in this PR was failing because the JSON got set to "{}". With this PR, Mongo is made an exception (although I think the current implementation is sloppy, since it's based on the source data type rather than the destination data type). Looking for feedback on a better solution, or on whether we can get rid of the 15MB limit in MarshalJSONWithOptions altogether, given that QValueToAvro seems to handle Snowflake's edge cases already -- but I'm not sure if I'm missing something.
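For illustration, here is a minimal sketch of how such a maximum-size test document can be built; this is not the PR's actual test code, and the helper name, package name, and `payload` field are illustrative. Because each byte appended to a BSON string field grows the encoded document by exactly one byte, padding by the remaining budget after marshaling a skeleton lands exactly on the limit:

```go
package mongotest

import (
	"strings"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// maxBSONSize is MongoDB's documented per-document limit (16MB).
const maxBSONSize = 16 * 1024 * 1024

// maxSizeDocument builds a document whose marshaled BSON size is exactly maxBSONSize.
func maxSizeDocument() (bson.D, error) {
	doc := bson.D{
		{Key: "_id", Value: primitive.NewObjectID()},
		{Key: "payload", Value: ""},
	}
	// Marshal the skeleton to learn how many bytes of budget remain,
	// then pad the payload string by that amount.
	skeleton, err := bson.Marshal(doc)
	if err != nil {
		return nil, err
	}
	doc[1].Value = strings.Repeat("a", maxBSONSize-len(skeleton))
	return doc, nil
}
```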

  2. With the above change, insert/replace/delete events were processed successfully during CDC, but update events were still failing because they were double the size of the other events, and the change stream also has a 16MB limit on event size (the value was present at least twice in the event: once in the fullDocument field and once in the updateDescription field, not to mention it could also appear in the fullDocumentBeforeChange field if available). So the PR:
  • always sets fullDocumentBeforeChange to off, since we are not using it
  • makes the pipeline filter out the updateDescription field, which we are not using
  • restricts the stream to only the event types we care about so far (insert/update/delete/replace) and only the fields we need (operationType/clusterTime/documentKey/fullDocument/ns); see the sketch below.

Intentionally not using $changeStreamSplitLargeEvent for now, as it seems discouraged by the MongoDB docs -- we can support it if the current implementation is no longer sufficient.
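For reference, a minimal sketch of a change stream opened with such a trimmed pipeline, using the official Go driver. This is not the PR's exact code: the function name, the client-level Watch, and the updateLookup choice for post-images are assumptions.

```go
package mongocdc

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// openTrimmedChangeStream opens a change stream that keeps events small:
// it matches only the operation types we replicate and projects the event down
// to the fields the CDC path reads, dropping updateDescription entirely.
// The stream could equally be opened on a Database or Collection instead of the Client.
func openTrimmedChangeStream(ctx context.Context, client *mongo.Client) (*mongo.ChangeStream, error) {
	pipeline := mongo.Pipeline{
		{{Key: "$match", Value: bson.D{
			{Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "delete", "replace"}}}},
		}}},
		{{Key: "$project", Value: bson.D{
			{Key: "operationType", Value: 1},
			{Key: "clusterTime", Value: 1},
			{Key: "documentKey", Value: 1},
			{Key: "fullDocument", Value: 1},
			{Key: "ns", Value: 1},
		}}},
	}

	opts := options.ChangeStream().
		SetFullDocument(options.UpdateLookup).   // assumption: post-images fetched via updateLookup
		SetFullDocumentBeforeChange(options.Off) // pre-images are unused, so never request them

	return client.Watch(ctx, pipeline, opts)
}
```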

Test: e2e test passing with this change.

@jgao54 jgao54 requested a review from serprex June 27, 2025 03:59
@jgao54 jgao54 force-pushed the large-event-handling branch from b91eaa1 to 190a9c7 on June 27, 2025 04:15
@jgao54 jgao54 force-pushed the large-event-handling branch from 190a9c7 to 46f7f3e on June 27, 2025 04:18
@jgao54 jgao54 requested a review from heavycrystal June 27, 2025 06:06
@@ -25,12 +25,14 @@ func ItemsToJSON(items Items) (string, error) {

 // encoding/gob cannot encode unexported fields
 type RecordItems struct {
-	ColToVal map[string]types.QValue
+	ColToVal   map[string]types.QValue
+	NoTruncate bool
Member commented:

This could be a size limit instead of a bool. Let's go larger than 15MB for non-SF connectors.
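For concreteness, one hypothetical shape of this suggestion (the field name and the zero-value convention are illustrative, not from the PR):

```go
// encoding/gob cannot encode unexported fields
type RecordItems struct {
	ColToVal      map[string]types.QValue
	TruncateBytes int // hypothetical: 0 means no truncation; Snowflake call sites would pass 15 * 1024 * 1024
}
```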

@jgao54 jgao54 (Contributor, Author) replied Jun 27, 2025:

In this PR I only set the call site in Mongo to true and everything else to false, to maintain backwards compatibility. If we go larger than 15MB for all non-SF connectors, we'll break backwards compatibility (in the sense that data that was previously truncated would now be available) -- is this okay?

@@ -18,7 +18,7 @@ func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStr
 	}
 	output.SetSchema(schema)
 	for record := range stream.Records {
-		row := model.NewRecordItems(len(record))
+		row := model.NewRecordItems(len(record), false)
Member commented:

This can be true; it makes it clearer that the target stream is responsible.

@jgao54 jgao54 (Contributor, Author) replied:

At each of the call sites for NewRecordItems, the information about the target DWH is not always available. Is there an easy way to wire this information through to the call site?
