-
Notifications
You must be signed in to change notification settings - Fork 649
fix: poll stream consumer for triggering callback func #21900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
// poll consumer to trigger callback functions | ||
// We don't need to wait for the result, because recv may block and wait for next new message | ||
let _ = tokio::time::timeout(Duration::from_millis(1000), consumer.recv()).await; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this read a new message and ignore it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after adjusting the code position, the recv
call will not dismiss the first message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4d50e8d from #21868 also do the same change. already contained in image v2.2.6-patch-kafka-poll-log-level-3
@@ -100,6 +100,10 @@ impl SplitReader for KafkaSplitReader { | |||
.await | |||
.context("failed to create kafka consumer")?; | |||
|
|||
// poll consumer to trigger callback functions | |||
// We don't need to wait for the result, because recv may block and wait for next new message | |||
let _ = tokio::time::timeout(Duration::from_millis(100), consumer.recv()).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it because we call recv
before assign
(in L155) that recv
becomes a function purely for side-effect (trigger the callback) without fetching any message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, IIUC consumer.recv
will be blocked indefinitely here. how do we make sure that the callback function must be triggered before the 100ms timeout?
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Checklist
Documentation
Release note