feat: delay temporal filter && optimize dyn filter with always relax condition by st1page · Pull Request #13985 · risingwavelabs/risingwave · GitHub

feat: delay temporal filter && optimize dyn filter with always relax condition #13985


Merged · 17 commits · Dec 18, 2023
12 changes: 7 additions & 5 deletions proto/stream_plan.proto
@@ -429,11 +429,13 @@ message DynamicFilterNode {
catalog.Table left_table = 3;
// Right table stores single value from RHS of predicate.
catalog.Table right_table = 4;
// It is true when the right side of the inequality predicate is monotonically:
// - decreasing for <, <=, increasing for >, >=
// bool is_monotonic = 10;
@kwannoel (Contributor) commented on Dec 18, 2023:

What about the case where we have non-monotonic input? 🤔

For now I suppose it's not supported. But I guess that was the intention of having the is_monotonic flag previously.

Perhaps we should just keep that field commented out instead of removing it.

I don't think condition_always_relax can handle it.
Because when condition_always_relax is false, I think the current intention is that a change on the right-hand side always makes the condition more constrained. But that only holds for a monotonic RHS.

With no monotonicity property, the opposite of the flag actually implies that a change on the right side sometimes makes the condition more relaxed and sometimes more constrained.

@st1page (Contributor, Author) replied:

When we need it, we can add it back. I think such a field should be added on demand, when an executor needs it for a specific optimization.

Contributor replied:

In that case I think we should still add a comment, because DynamicFilter's RHS may not always be monotonic.

So for condition_always_relax we should add a comment saying that we assume the RHS always changes monotonically.
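
A tiny, self-contained illustration of the point above (the values are made up and the snippet is not part of this PR): for the predicate `lhs < rhs`, a non-monotonic RHS produces a mix of relaxing and constraining changes, so a single boolean flag cannot describe it.

// Hypothetical RHS values; not taken from any real test.
fn main() {
    let rhs_values = [10, 20, 15];
    for w in rhs_values.windows(2) {
        let (old, new) = (w[0], w[1]);
        let effect = if new > old {
            "relaxes the condition (more LHS rows satisfy `lhs < rhs`)"
        } else {
            "constrains the condition (fewer LHS rows satisfy `lhs < rhs`)"
        };
        println!("rhs {old} -> {new}: {effect}");
    }
}
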

// the output indices of current node
// repeated uint32 output_indices = 11;
// Whether the right side's change always makes the condition more relaxed,
// in other words, makes more records on the left side satisfy the condition.
// If this is true, we need to store the LHS records which do not match the condition in the internal table.
// When the condition changes, we will tell downstream to insert the LHS records which now match the condition.
// If this is false, we need to store the LHS records which match the condition in the internal table.
// When the condition changes, we will tell downstream to delete the LHS records which no longer match the condition.
bool condition_always_relax = 5;
}

// Delta join with two indexes. This is a pseudo plan node generated on frontend. On meta
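
To make the two storage modes described in the field comment concrete, below is a minimal sketch of how an executor could react to an RHS change under each setting of condition_always_relax. The types and the on_rhs_change helper are hypothetical (this is not the actual DynamicFilterExecutor), and both branches assume a monotonically changing RHS, as discussed in the review thread above.

// Hypothetical sketch for the predicate `lhs.ts < rhs`; not the real executor.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
struct Ts(i64);

struct LhsRow {
    ts: Ts,
}

/// In-memory stand-in for the internal state table.
struct State {
    buffered: Vec<LhsRow>,
}

enum Output {
    Insert(Ts),
    Delete(Ts),
}

/// Called when the RHS value moves to `new_rhs`.
fn on_rhs_change(state: &mut State, new_rhs: Ts, condition_always_relax: bool) -> Vec<Output> {
    let mut out = Vec::new();
    if condition_always_relax {
        // State holds LHS rows that do NOT yet satisfy the condition;
        // a relaxed condition can only turn some of them into matches.
        state.buffered.retain(|row| {
            if row.ts < new_rhs {
                out.push(Output::Insert(row.ts)); // now matches: emit downstream
                false // drop from state; it will never be retracted
            } else {
                true // keep waiting for the condition to relax further
            }
        });
    } else {
        // State holds LHS rows that DO satisfy the condition;
        // a tightened condition can only turn some of them into non-matches.
        state.buffered.retain(|row| {
            if row.ts < new_rhs {
                true // still matches: keep it
            } else {
                out.push(Output::Delete(row.ts)); // no longer matches: retract downstream
                false
            }
        });
    }
    out
}

In the relax-only branch, rows that have been emitted are never stored or retracted, which is why the plan can keep the append-only property when the left input is append-only (see stream_dynamic_filter.rs below).
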
7 changes: 6 additions & 1 deletion src/common/src/util/stream_graph_visitor.rs
@@ -88,7 +88,12 @@ fn visit_stream_node_tables_inner<F>(
always!(node.right_degree_table, "HashJoinDegreeRight");
}
NodeBody::DynamicFilter(node) => {
always!(node.left_table, "DynamicFilterLeft");
if node.condition_always_relax {
always!(node.left_table, "DynamicFilterLeftNotSatisfy");
Contributor commented:

Do we test this new branch somehow?

@st1page (Contributor, Author) replied:

The current e2e tests cover this branch.

} else {
always!(node.left_table, "DynamicFilterLeft");
}

always!(node.right_table, "DynamicFilterRight");
}

@@ -16,12 +16,20 @@
select * from t1 where now() - interval '15 minutes' < ts + additional_time_to_live * 1.5;
expected_outputs:
- stream_plan
- name: Temporal filter fails without `now()` in lower bound
- name: Temporal filter with `now()` in upper bound
sql: |-
create table t1 (ts timestamp with time zone);
select * from t1 where now() - interval '15 minutes' > ts;
expected_outputs:
- stream_error
- stream_plan
- stream_dist_plan
- name: Temporal filter with `now()` in upper bound on append only table
sql: |-
create table t1 (ts timestamp with time zone) APPEND ONLY;
select * from t1 where now() - interval '15 minutes' > ts;
expected_outputs:
- stream_plan
- stream_dist_plan
- name: Temporal filter reorders now expressions correctly
sql: |
create table t1 (ts timestamp with time zone);
119 changes: 112 additions & 7 deletions src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml
@@ -36,18 +36,121 @@
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [SubtractWithTimeZone(now, '00:15:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] }
└─StreamNow { output: [now] }
- name: Temporal filter fails without `now()` in lower bound
- name: Temporal filter with `now()` in upper bound
sql: |-
create table t1 (ts timestamp with time zone);
select * from t1 where now() - interval '15 minutes' > ts;
stream_error: All `now()` exprs were valid, but the condition must have at least one now expr as a lower bound.
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck }
└─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [SubtractWithTimeZone(now, '00:15:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└─StreamNow { output: [now] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
├── left table: 0
├── right table: 1
├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ ├── state table: 2
│ ├── Upstream
│ └── BatchPlanNode
└── StreamExchange Broadcast from 1

Fragment 1
StreamProject { exprs: [SubtractWithTimeZone(now, '00:15:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└── StreamNow { output: [now] } { state table: 3 }

Table 0
├── columns: [ t1_ts, t1__row_id ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 0, 1 ]
├── distribution key: [ 1 ]
└── read pk prefix len hint: 1

Table 1 { columns: [ $expr1 ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 }

Table 2
├── columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0

Table 3 { columns: [ now ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 }

Table 4294967294
├── columns: [ ts, t1._row_id ]
├── primary key: [ $1 ASC ]
├── value indices: [ 0, 1 ]
├── distribution key: [ 1 ]
└── read pk prefix len hint: 1

- name: Temporal filter with `now()` in upper bound on append only table
sql: |-
create table t1 (ts timestamp with time zone) APPEND ONLY;
select * from t1 where now() - interval '15 minutes' > ts;
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck }
└─StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [SubtractWithTimeZone(now, '00:15:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└─StreamNow { output: [now] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
├── left table: 0
├── right table: 1
├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ ├── state table: 2
│ ├── Upstream
│ └── BatchPlanNode
└── StreamExchange Broadcast from 1

Fragment 1
StreamProject { exprs: [SubtractWithTimeZone(now, '00:15:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└── StreamNow { output: [now] } { state table: 3 }

Table 0
├── columns: [ t1_ts, t1__row_id ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 0, 1 ]
├── distribution key: [ 1 ]
└── read pk prefix len hint: 1

Table 1 { columns: [ $expr1 ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 }

Table 2
├── columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0

Table 3 { columns: [ now ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 }

Table 4294967294
├── columns: [ ts, t1._row_id ]
├── primary key: [ $1 ASC ]
├── value indices: [ 0, 1 ]
├── distribution key: [ 1 ]
└── read pk prefix len hint: 1

- name: Temporal filter reorders now expressions correctly
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts < now() - interval '1 hour' and ts >= now() - interval '2 hour';
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck }
└─StreamDynamicFilter { predicate: (t1.ts < $expr2), output: [t1.ts, t1._row_id] }
└─StreamDynamicFilter { predicate: (t1.ts < $expr2), output: [t1.ts, t1._row_id], condition_always_relax: true }
├─StreamDynamicFilter { predicate: (t1.ts >= $expr1), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
@@ -60,7 +163,9 @@
Fragment 0
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamDynamicFilter { predicate: (t1.ts < $expr2), output: [t1.ts, t1._row_id] } { left table: 0, right table: 1 }
└── StreamDynamicFilter { predicate: (t1.ts < $expr2), output: [t1.ts, t1._row_id], condition_always_relax: true }
├── left table: 0
├── right table: 1
├── StreamDynamicFilter { predicate: (t1.ts >= $expr1), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true }
│ ├── left table: 2
│ ├── right table: 3
@@ -126,7 +231,7 @@
└─StreamExchange { dist: HashShard(t1.a, t1._row_id, t2._row_id) }
└─StreamHashJoin { type: Inner, predicate: t1.a = t2.b, output: [t1.a, t1.ta, t2.b, t2.tb, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.a) }
│ └─StreamDynamicFilter { predicate: (t1.ta < $expr2), output: [t1.a, t1.ta, t1._row_id] }
│ └─StreamDynamicFilter { predicate: (t1.ta < $expr2), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true }
│ ├─StreamDynamicFilter { predicate: (t1.ta >= $expr1), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true }
│ │ ├─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ │ └─StreamExchange { dist: Broadcast }
@@ -157,7 +262,7 @@
├─StreamExchange { dist: HashShard(t2.b) }
│ └─StreamTableScan { table: t2, columns: [t2.b, t2.tb, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
└─StreamExchange { dist: HashShard(t1.a) }
└─StreamDynamicFilter { predicate: (t1.ta < $expr2), output: [t1.a, t1.ta, t1._row_id] }
└─StreamDynamicFilter { predicate: (t1.ta < $expr2), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true }
├─StreamDynamicFilter { predicate: (t1.ta >= $expr1), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
@@ -186,7 +291,7 @@
├─StreamExchange { dist: HashShard(t1.a) }
│ └─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.b) }
└─StreamDynamicFilter { predicate: (t2.tb < $expr2), output: [t2.b, t2.tb, t2._row_id] }
└─StreamDynamicFilter { predicate: (t2.tb < $expr2), output: [t2.b, t2.tb, t2._row_id], condition_always_relax: true }
├─StreamDynamicFilter { predicate: (t2.tb >= $expr1), output_watermarks: [t2.tb], output: [t2.b, t2.tb, t2._row_id], cleaned_by_watermark: true }
│ ├─StreamTableScan { table: t2, columns: [t2.b, t2.tb, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
│ └─StreamExchange { dist: Broadcast }
53 changes: 47 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
@@ -20,11 +20,14 @@ use risingwave_pb::stream_plan::DynamicFilterNode;
use super::generic::{DynamicFilter, GenericPlanRef};
use super::stream::prelude::*;
use super::stream::StreamPlanRef;
use super::utils::{childless_record, column_names_pretty, watermark_pretty, Distill};
use super::{generic, ExprRewritable};
use super::utils::{
childless_record, column_names_pretty, plan_node_name, watermark_pretty, Distill,
};
use super::{generic, ExprRewritable, PlanTreeNodeUnary};
use crate::expr::{Expr, ExprImpl};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode};
use crate::optimizer::property::Distribution;
use crate::optimizer::PlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;

@@ -33,18 +36,42 @@ pub struct StreamDynamicFilter {
pub base: PlanBase<Stream>,
core: generic::DynamicFilter<PlanRef>,
cleaned_by_watermark: bool,
condition_always_relax: bool,
}

impl StreamDynamicFilter {
pub fn new(core: DynamicFilter<PlanRef>) -> Self {
let watermark_columns = core.watermark_columns(core.right().watermark_columns()[0]);

// TODO: derive from input
// TODO(st1page): here we just check if RHS
// is a `StreamNow`. It will be generalized to more cases
// by introducing monotonically increasing property of the node in https://github.com/risingwavelabs/risingwave/pull/13984.
let right_monotonically_increasing = {
if let Some(e) = core.right().as_stream_exchange() && *e.distribution() == Distribution::Broadcast {
if let Some(proj) = e.input().as_stream_project() {
proj.input().as_stream_now().is_some()
} else {
false
}
} else {
false
}
};
let condition_always_relax = right_monotonically_increasing
&& matches!(
core.comparator(),
ExprType::LessThan | ExprType::LessThanOrEqual
);

let append_only = if condition_always_relax {
core.left().append_only()
} else {
false
};
let base = PlanBase::new_stream_with_core(
&core,
core.left().distribution().clone(),
false, /* we can have a new abstraction for append only and monotonically increasing
* in the future */
append_only,
false, // TODO(rc): decide EOWC property
watermark_columns,
);
@@ -53,6 +80,7 @@ impl StreamDynamicFilter {
base,
core,
cleaned_by_watermark,
condition_always_relax,
}
}

@@ -95,7 +123,19 @@ impl Distill for StreamDynamicFilter {
Pretty::display(&self.cleaned_by_watermark),
));
}
childless_record("StreamDynamicFilter", vec)
if self.condition_always_relax {
vec.push((
"condition_always_relax",
Pretty::display(&self.condition_always_relax),
));
}
childless_record(
plan_node_name!(
"StreamDynamicFilter",
{ "append_only", self.append_only() },
),
vec,
)
}
}

@@ -136,6 +176,7 @@ impl StreamNode for StreamDynamicFilter {
condition,
left_table: Some(left_table.to_internal_table_prost()),
right_table: Some(right_table.to_internal_table_prost()),
condition_always_relax: self.condition_always_relax,
})
}
}
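
The check in StreamDynamicFilter::new above only recognizes the Exchange(Broadcast) -> Project -> Now pattern, i.e. a monotonically increasing RHS combined with < or <=. The general rule it instantiates can be summarized by the following sketch; the enums and helper are illustrative and not part of the code base.

// Illustrative mapping from comparator + RHS monotonicity to "always relax";
// the enum and function names here are hypothetical.
#[derive(Clone, Copy)]
enum RhsMonotonicity {
    Increasing,
    Decreasing,
    Unknown,
}

#[derive(Clone, Copy)]
enum Comparator {
    LessThan,
    LessThanOrEqual,
    GreaterThan,
    GreaterThanOrEqual,
}

fn condition_always_relax(rhs: RhsMonotonicity, cmp: Comparator) -> bool {
    use Comparator::*;
    use RhsMonotonicity::*;
    match (rhs, cmp) {
        // `lhs < rhs` / `lhs <= rhs` admits more rows as rhs grows
        // (e.g. rhs = now() - interval, the temporal filter upper bound).
        (Increasing, LessThan | LessThanOrEqual) => true,
        // `lhs > rhs` / `lhs >= rhs` admits more rows as rhs shrinks.
        (Decreasing, GreaterThan | GreaterThanOrEqual) => true,
        // Anything else may tighten the condition, or the direction is unknown.
        _ => false,
    }
}
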
@@ -62,13 +62,8 @@ impl Rule for FilterWithNowToJoinRule {
// increasing)
now_filters.sort_by_key(|l| rank_cmp(l.func_type()));

// Ignore no now filter & forbid now filters that do not create a watermark
if now_filters.is_empty()
|| !matches!(
now_filters[0].func_type(),
Type::GreaterThan | Type::GreaterThanOrEqual
)
{
// Bail out if there is no now filter
if now_filters.is_empty() {
return None;
}
let mut new_plan = plan.inputs()[0].clone();