feat: delay temporal filter && optimize dyn filter with always relax condition #13985
Codecov Report
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #13985      +/-   ##
==========================================
+ Coverage   68.04%   68.06%   +0.02%
==========================================
  Files        1536     1536
  Lines      265364   265521     +157
==========================================
+ Hits       180557   180727     +170
+ Misses      84807    84794      -13
Flags with carried forward coverage won't be shown.
// TODO(st1page): the condition is wrong, introduce monotonically increasing property of the node
// TODO(st1page): https://github.com/risingwavelabs/risingwave/pull/13984
let right_monotonically_increasing = {
    if let Some(e) = core.right().as_stream_exchange() && *e.distribution() == Distribution::Broadcast {
        if let Some(proj) = e.input().as_stream_project() {
            proj.input().as_stream_now().is_some()
        } else {
            false
        }
    } else {
        false
    }
};
This is a hack for the temporal filter; I will fix it with the monotonic property later. #13984 is too big and I'd like to merge this PR first. Because we only use the now executor for the temporal filter, the hack is correct.
// TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/13998
if self.condition_always_relax && !self.cleaned_by_watermark {
    to_delete_rows.push(row.clone());
}
@@ -88,7 +88,12 @@ fn visit_stream_node_tables_inner<F>(
            always!(node.right_degree_table, "HashJoinDegreeRight");
        }
        NodeBody::DynamicFilter(node) => {
            always!(node.left_table, "DynamicFilterLeft");
            if node.condition_always_relax {
                always!(node.left_table, "DynamicFilterLeftNotSatisfy");
Do we test this new branch somehow?
The current e2e test includes the branch.
@@ -60,6 +60,11 @@ pub struct DynamicFilterExecutor<S: StateStore, const USE_WATERMARK_CACHE: bool>
    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time.
    chunk_size: usize,
    /// If the right side's change always make the condition more relaxed.
    /// In other words, make more record in the left side satisfy the condition.
    /// In that case, there are only records which does not satisfy the condition in the table.
IIUC, this means we store records which do not satisfy the condition,
so if the condition is something like:
LHS < NOW()
We only store
LHS >= NOW()
But why do we store records which do not satisfy the condition in the table?
Oh I got it.
Consider the following case, with these records in upstream:
1
2
3
4
And the current RHS value of the filter (LHS < NOW()) is:
3
The internal state only stores the values which are excluded:
3
4
When RHS increases to 5, we can now emit 3, 4 to downstream.
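The example above can be sketched as a small standalone model. This is only an illustration, not the executor's actual code: `RelaxingFilter`, `insert_left`, and `update_right` are hypothetical names, rows are plain `i64` values, the predicate is assumed to be `lhs < rhs`, and the right side is assumed to never decrease.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified model of a dynamic filter for `lhs < rhs`
/// where `rhs` never decreases, so the condition only ever relaxes.
struct RelaxingFilter {
    /// Current right-hand side value.
    rhs: i64,
    /// Only rows that do NOT yet satisfy `lhs < rhs` are stored.
    /// (A set is used for brevity, so duplicate rows are collapsed.)
    pending: BTreeSet<i64>,
}

impl RelaxingFilter {
    fn new(rhs: i64) -> Self {
        Self { rhs, pending: BTreeSet::new() }
    }

    /// A new left-side row is emitted right away if it passes the filter;
    /// otherwise it is remembered until the right side catches up.
    fn insert_left(&mut self, lhs: i64) -> Option<i64> {
        if lhs < self.rhs {
            Some(lhs)
        } else {
            self.pending.insert(lhs);
            None
        }
    }

    /// A new right-side value releases every stored row that now passes.
    /// Assumption: `new_rhs >= self.rhs`.
    fn update_right(&mut self, new_rhs: i64) -> Vec<i64> {
        assert!(new_rhs >= self.rhs, "sketch assumes a non-decreasing rhs");
        self.rhs = new_rhs;
        // `split_off` returns the rows >= new_rhs, which stay pending.
        let still_pending = self.pending.split_off(&new_rhs);
        let released = std::mem::replace(&mut self.pending, still_pending);
        released.into_iter().collect()
    }
}

fn main() {
    let mut filter = RelaxingFilter::new(3);
    let passed: Vec<i64> = vec![1, 2, 3, 4]
        .into_iter()
        .filter_map(|v| filter.insert_left(v))
        .collect();
    println!("emitted immediately: {:?}", passed);
    println!("released when rhs becomes 5: {:?}", filter.update_right(5));
}
```

Because rows that already passed can never fail again under this assumption, they do not need to be kept in state at all.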
@@ -429,11 +429,9 @@ message DynamicFilterNode {
  catalog.Table left_table = 3;
  // Right table stores single value from RHS of predicate.
  catalog.Table right_table = 4;
  // It is true when the right side of the inequality predicate is monotonically:
  // - decreasing for <, <=, increasing for >, >=
  // bool is_monotonic = 10;
What about the case where we have non-monotonic input? 🤔
For now I suppose it's not supported. But I guess that was the intention of having the is_monotonic flag previously.
Perhaps we should just keep that field commented out instead of removing it.
I don't think condition_always_relax can handle it.
Because when condition_always_relax is false, I think the current intention is for it to mean that the right-hand side's change always makes the condition more constrained. But this only holds for a monotonic RHS.
With no monotonicity property, the opposite of it actually implies that the right side's change sometimes makes the condition more relaxed, sometimes more constrained.
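To make the three cases concrete, here is a minimal sketch that classifies a sequence of RHS values for a given comparator. `Cmp`, `RhsEffect`, and `classify` are hypothetical names introduced for illustration only; nothing here is taken from the planner or the executor.

```rust
/// Hypothetical comparator of the predicate `lhs <cmp> rhs`.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum Cmp {
    Lt,
    Le,
    Gt,
    Ge,
}

/// How a sequence of RHS changes affects the set of passing rows.
#[derive(Debug, PartialEq)]
enum RhsEffect {
    AlwaysRelax,
    AlwaysConstrain,
    Mixed,
}

fn classify(cmp: Cmp, rhs_values: &[i64]) -> RhsEffect {
    let non_decreasing = rhs_values.windows(2).all(|w| w[0] <= w[1]);
    let non_increasing = rhs_values.windows(2).all(|w| w[0] >= w[1]);
    // For `lhs < rhs` and `lhs <= rhs`, a larger rhs admits more rows.
    let relax_on_increase = matches!(cmp, Cmp::Lt | Cmp::Le);
    let relaxes = if relax_on_increase { non_decreasing } else { non_increasing };
    let constrains = if relax_on_increase { non_increasing } else { non_decreasing };
    if relaxes {
        // A constant rhs also lands here: it never tightens the condition.
        RhsEffect::AlwaysRelax
    } else if constrains {
        RhsEffect::AlwaysConstrain
    } else {
        RhsEffect::Mixed
    }
}

fn main() {
    println!("{:?}", classify(Cmp::Lt, &[3, 5, 8]));
    println!("{:?}", classify(Cmp::Lt, &[8, 5, 3]));
    println!("{:?}", classify(Cmp::Lt, &[3, 5, 4]));
}
```

The `Mixed` case is the one a single boolean cannot express: "not always relaxing" does not imply "always constraining".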
When we need it, we can add it back. I think the field should be added on demand, when the executor needs it for a specific optimization.
In that case I think we should still add some comment, because DynamicFilter's RHS may not always be monotonic.
So for condition_always_relax we should add a comment which says that we assume the input to always be monotonically changing.
LGTM, some minor comments.
Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Co-authored-by: Eric Fu <eric@singularity-data.com>
…condition (#13985) Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Co-authored-by: Eric Fu <eric@singularity-data.com>
LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
#13916
The condition_always_relax flag means the right side's change always makes the condition more relaxed. In this case, only records which do not satisfy the condition are stored in the table.
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
Please ping me and I will write the doc for the usage of the delay temporal filter, thanks