mongo: handle large mongo document #3108
Conversation
@@ -25,12 +25,14 @@ func ItemsToJSON(items Items) (string, error) {

 // encoding/gob cannot encode unexported fields
 type RecordItems struct {
-	ColToVal map[string]types.QValue
+	ColToVal   map[string]types.QValue
+	NoTruncate bool
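For context, a minimal sketch of how the constructor could thread the new flag through, assuming the two-argument NewRecordItems(capacity, noTruncate) signature visible at the call sites below (the body is an assumption, not code from this PR):

```go
// Sketch only: assumes the two-argument constructor used at the call
// sites in this PR; the actual implementation may differ.
func NewRecordItems(capacity int, noTruncate bool) RecordItems {
	return RecordItems{
		ColToVal:   make(map[string]types.QValue, capacity),
		NoTruncate: noTruncate,
	}
}
```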
This could be a size limit instead of a bool. Let's go larger than 15MB for non-SF connectors.
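As a hedged sketch, the suggested variant could look like this (the field name and zero-means-unlimited convention are assumptions, not code from this PR):

```go
// Size-limit variant: a per-connector byte budget instead of an
// all-or-nothing bool. A zero budget means "never truncate".
type RecordItems struct {
	ColToVal     map[string]types.QValue
	MaxValueSize int // e.g. 15 * 1024 * 1024 for Snowflake; larger elsewhere
}
```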
In this PR, I only set the call site in Mongo to true; everything else is false, to maintain backwards compatibility. If we go larger than 15MB for all non-SF connectors, we'll break backwards compatibility (in the sense that data that was previously truncated becomes available). Is this okay?
@@ -18,7 +18,7 @@ func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStr
 	}
 	output.SetSchema(schema)
 	for record := range stream.Records {
-		row := model.NewRecordItems(len(record))
+		row := model.NewRecordItems(len(record), false)
This can be true; it makes it clearer that the target stream is responsible.
At each of the call sites for NewRecordItems, the information about the target DWH is not always available. Is there an easy way to wire this information through to the call site?
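One hypothetical way to wire it, sketched under the assumption that rows are materialized somewhere that can see the destination connector (none of these names exist in the codebase; they are illustrative only):

```go
// Hypothetical: let each destination connector advertise a truncation
// policy, and consult it wherever RecordItems are materialized.
type TruncationPolicy interface {
	// MaxJSONSize returns the largest JSON value the destination accepts,
	// or 0 if values should never be truncated.
	MaxJSONSize() int
}

func newRowForDestination(record []types.QValue, dst TruncationPolicy) model.RecordItems {
	return model.NewRecordItems(len(record), dst.MaxJSONSize() == 0)
}
```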
This PR handles two issues w.r.t. large events (separated by commits; I recommend reviewing them separately):
With this information I was able to generate a document in the test that is exactly the maximum size we can write to MongoDB -- and we should support up to this size in our CDC connector without errors.
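A minimal sketch of how a document of an exact BSON size can be generated with the official Go driver (the field name and padding approach are assumptions for illustration, not the test code from this PR):

```go
import (
	"fmt"
	"strings"

	"go.mongodb.org/mongo-driver/bson"
)

// buildDocOfSize returns a document whose marshaled BSON size is exactly
// target bytes: marshal an empty payload to measure the fixed overhead,
// then pad the string field with the remaining bytes.
func buildDocOfSize(target int) (bson.D, error) {
	base := bson.D{{Key: "data", Value: ""}}
	raw, err := bson.Marshal(base)
	if err != nil {
		return nil, err
	}
	overhead := len(raw)
	if target < overhead {
		return nil, fmt.Errorf("target %d is smaller than overhead %d", target, overhead)
	}
	return bson.D{{Key: "data", Value: strings.Repeat("a", target-overhead)}}, nil
}
```

Calling buildDocOfSize(16 * 1024 * 1024) yields a document at MongoDB's documented 16MiB maxBsonObjectSize.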
Currently we seem to be imposing Snowflake's 15MB limit in the shared MarshalJSONWithOptions function, so the e2e test added in this PR was failing because the JSON got set to "{}". With this PR, Mongo is made an exception (although I think the current implementation is sloppy, since it's based on the source data type rather than the destination data type). Looking for feedback on a better solution, or on whether we can get rid of the 15MB limit altogether in MarshalJSONWithOptions, given QValueToAvro seems to handle Snowflake's edge cases already -- but I'm not sure if I'm missing something.

A large document can appear more than once in a single change event (once in the fullDocument field and once in the updateDescription field, not to mention it could also be present in the fullDocumentBeforeChange field if available). So the PR:

- sets fullDocumentBeforeChange to off always, which we are not using
- drops the updateDescription field, which we are not using

Intentionally not using $changeStreamSplitLargeEvent for now, as it seems quite discouraged by the Mongo docs -- we can support it if the current implementation is no longer sufficient.

Test: e2e test passing with this change.
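For readers following the 15MB discussion above, a hedged sketch of where the escape hatch could sit (the real MarshalJSONWithOptions certainly differs; the names and the empty-object fallback shown here are assumptions based on the "{}" behavior described above):

```go
import "encoding/json"

// Snowflake's variant size limit referenced in the discussion above.
const defaultMaxJSONSize = 15 * 1024 * 1024

// Sketch only: oversized values collapse to "{}" (the pre-existing
// behavior that broke the Mongo e2e test) unless noTruncate is set.
func marshalWithLimit(v any, noTruncate bool) ([]byte, error) {
	buf, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	if !noTruncate && len(buf) > defaultMaxJSONSize {
		return []byte("{}"), nil
	}
	return buf, nil
}
```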