Discussion: Use ORDER BY as part of MV's distribution key · Issue #19321 · risingwavelabs/risingwave
Open
xxchan opened this issue Nov 8, 2024 · 9 comments · Fixed by #20176
Labels
type/feature Type: New feature.
Comments

xxchan (Member) commented Nov 8, 2024

context: https://risingwave-labs.slack.com/archives/C034TRPKN1F/p1731041466238309?thread_ts=1731032250.800469&cid=C034TRPKN1F

Some notes:

  • dist_key just needs to be a subset of pk, not a prefix.
  • MV pk is order_by | stream_key
  • Before "feat(optimizer): change stream join mv distribution key" (#13022), the MV dist_key was upstream_dist_key (a subset of stream_key). That PR changed it to the whole stream_key.
  • For indexes, we support DISTRIBUTED BY. Since an index is essentially the same as an MV, there seems to be no reason not to support it for MVs as well.

xxchan added the type/feature label Nov 8, 2024
github-actions bot added this to the release-2.2 milestone Nov 8, 2024
xxchan changed the title from "Discussion: Use ORDER BY as MV's distribution key" to "Discussion: Use ORDER BY as part of MV's distribution key" Nov 8, 2024

An example to show what the dist key looks like now on main:

create table test_dist (v1 int, v2 int, v3 int, primary key (v1, v2));
create table test_dist2 (v1 int, v2 int, v3 int, primary key (v1, v2));
create table test_dist3 (v1 int, v2 int, v3 int, primary key (v1, v2));


describe test_dist;
-- primary key v1, v2
-- distribution key v1, v2

create materialized view test_dist_mv as select * from test_dist order by v1, v3;

describe test_dist_mv;
-- primary key v1, v3, v2
-- distribution key v1, v2

drop MATERIALIZED view if exists test_join_dist;

create MATERIALIZED view test_join_dist as
select t1.v3 as a, t2.v3 as b, t3.v3 as c
from test_dist as t1
join test_dist2 as t2
on t1.v1 = t2.v1
join test_dist3 as t3
on t2.v1 = t3.v1;

describe test_join_dist;
-- primary key test_dist.v1, test_dist.v2, test_dist2.v2, test_dist3.v2, test_dist2.v1
-- distribution key test_dist.v1, test_dist.v2, test_dist2.v2, test_dist3.v2, test_dist2.v1


create MATERIALIZED view test_join_dist2 as
select t1.v3 as a, t2.v3 as b, t3.v3 as c
from test_dist as t1
join test_dist2 as t2
on t1.v1 = t2.v1
join test_dist3 as t3
on t2.v1 = t3.v1
order by t1.v3, t1.v2;

describe test_join_dist2;
-- primary key test_dist.v3, test_dist.v2, test_dist.v1, test_dist2.v2, test_dist3.v2, test_dist2.v1
-- distribution key test_dist2.v1
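
The primary keys reported above are consistent with the note "MV pk is order_by | stream_key". A minimal sketch of that reading, assuming the pk is the ORDER BY columns followed by any stream key columns not already listed (the helper names `mv_pk`, `is_subset` and `is_prefix` are hypothetical and are not RisingWave APIs):

```python
def mv_pk(order_by, stream_key):
    """ORDER BY columns first, then stream key columns not already present."""
    pk = list(order_by)
    for col in stream_key:
        if col not in pk:
            pk.append(col)
    return pk


def is_subset(dist_key, key):
    """True if every dist key column also appears in `key`."""
    return set(dist_key) <= set(key)


def is_prefix(dist_key, key):
    """True if `key` starts with exactly the dist key columns, in order."""
    return list(key[:len(dist_key)]) == list(dist_key)


# test_dist_mv: order by v1, v3 over a table whose stream key is (v1, v2).
pk = mv_pk(["v1", "v3"], ["v1", "v2"])
print(pk)  # ['v1', 'v3', 'v2']

# The reported dist key (v1, v2) is a subset of that pk, but not a prefix.
print(is_subset(["v1", "v2"], pk), is_prefix(["v1", "v2"], pk))  # True False
```

The same function reproduces the pk shown for test_join_dist2 when given the ORDER BY columns and the stream key reported for test_join_dist.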

Ideally, the dist key for test_dist_mv and test_join_dist2 would be:

describe test_dist_mv;
-- primary key v1, v3, v2
-- distribution key v1, v3, ...


describe test_join_dist2;
-- primary key test_dist.v3, test_dist.v2, test_dist.v1, test_dist2.v2, test_dist3.v2, test_dist2.v1
-- distribution key test_dist.v3, test_dist.v2, ...


kwannoel (Contributor) commented May 2, 2025

I reproduced a bug from this: #21668.

I think ORDER BY can be part of MV's distribution key. But it must contain the full stream key as well, to ensure updates don't become disordered if we re-shuffle back to the original distribution.

Consider the following case of StreamTableScan after 27efddf:

       └─StreamExchange { dist: HashShard(token_mint) }
         └─StreamTableScan {
              stream_key: [owner, token_mint],
              pk: [token_mint, balance, owner],
              dist: UpstreamHashShard(token_mint, balance)

Here dist is the user's order key (token_mint, balance). Rows are initially shuffled to different actors, so the update on (0x16, 0x10) becomes disordered...

         mint   owner    timestamp                 balance
         v      v        v                         v
A) | + | 0x16 | 0x10 | 1970-01-01 03:31:04+00:00 | 3 | ----------> Actor 38
B) | + | 0x16 | 0x10 | 1970-01-01 03:45:06+00:00 | 1 | ----------> Actor 37 out of order
C) | - | 0x16 | 0x10 | 1970-01-01 03:31:04+00:00 | 3 | ----------> Actor 38 with its update insert

... and then shuffled back to the same actor with:

       └─StreamExchange { dist: HashShard(token_mint) }
         └─StreamTableScan {

So we get:

A) | + | 0x16 | 0x10 | 1970-01-01 03:31:04+00:00 | 3 | ----------> Actor 35
B) | + | 0x16 | 0x10 | 1970-01-01 03:45:06+00:00 | 1 | ----------> Actor 35
C) | - | 0x16 | 0x10 | 1970-01-01 03:31:04+00:00 | 3 | ----------> Actor 35

Where B should be after C, and we encounter a double-insert error.
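
The reordering can be reproduced in a small model. This is a sketch and not RisingWave code. It assumes that each distinct distribution key value may land on its own actor, that order is preserved only within one actor's output, and that the correct upstream order is A, C, B. All helper names are hypothetical.

```python
from itertools import permutations

STREAM_KEY = ("owner", "mint")

# Upstream emission order: insert A, then an update as delete C + insert B.
CHANGES = [
    {"op": "+", "mint": "0x16", "owner": "0x10", "balance": 3},  # A
    {"op": "-", "mint": "0x16", "owner": "0x10", "balance": 3},  # C
    {"op": "+", "mint": "0x16", "owner": "0x10", "balance": 1},  # B
]


def arrival_orders(changes, dist_cols):
    """All merge orders that keep per-partition order and nothing else."""
    part = [tuple(c[col] for col in dist_cols) for c in changes]
    n = len(changes)
    orders = []
    for perm in permutations(range(n)):
        pos = {idx: p for p, idx in enumerate(perm)}
        keeps_partition_order = all(
            pos[i] < pos[j]
            for i in range(n)
            for j in range(i + 1, n)
            if part[i] == part[j]
        )
        if keeps_partition_order:
            orders.append([changes[i] for i in perm])
    return orders


def is_consistent(order):
    """False on a double insert or on a delete of a missing stream key."""
    live = set()
    for c in order:
        key = tuple(c[col] for col in STREAM_KEY)
        if c["op"] == "+":
            if key in live:
                return False
            live.add(key)
        else:
            if key not in live:
                return False
            live.remove(key)
    return True


def count_bad(dist_cols):
    """Number of possible arrival orders that break the downstream state."""
    return sum(not is_consistent(o) for o in arrival_orders(CHANGES, dist_cols))


print(count_bad(("mint", "balance")))  # 2
print(count_bad(("mint",)))            # 0
```

With dist (mint, balance), two of the three possible arrival orders hit a double insert. With a dist key drawn only from stream key columns, all three changes share one partition and only the original order is possible.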

kwannoel (Contributor) commented May 4, 2025

> But it must contain the full stream key

If we include the stream key, like:

dist_key: | order_key | stream_key |

I think locality will also be affected by the presence of the stream key, so we cannot achieve the original objective of having data distributed purely by the order key.

It can help avoid data skew, however?
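
A rough way to see the locality effect is to count how many distinct hash inputs exist per order key value. This is a sketch with made-up rows. It assumes the order key is (token_mint, balance) and the stream key is (owner, token_mint), as in the plan above, and `hash_inputs_per_order_value` is a hypothetical helper.

```python
from collections import defaultdict

# Three owners holding the same balance of the same token (illustrative data).
ROWS = [
    {"owner": "0x10", "token_mint": "0x16", "balance": 3},
    {"owner": "0x11", "token_mint": "0x16", "balance": 3},
    {"owner": "0x12", "token_mint": "0x16", "balance": 3},
]

ORDER_KEY = ("token_mint", "balance")


def hash_inputs_per_order_value(rows, order_cols, dist_cols):
    """For each order key value, count the distinct dist key values seen."""
    groups = defaultdict(set)
    for r in rows:
        order_val = tuple(r[c] for c in order_cols)
        groups[order_val].add(tuple(r[c] for c in dist_cols))
    return {k: len(v) for k, v in groups.items()}


# Distributed purely by the order key: one hash input, so one partition.
print(hash_inputs_per_order_value(ROWS, ORDER_KEY, ORDER_KEY))
# {('0x16', 3): 1}

# Distributed by order key | stream key: three hash inputs, so rows with the
# same order key value may be spread over up to three partitions.
print(hash_inputs_per_order_value(ROWS, ORDER_KEY, ORDER_KEY + ("owner",)))
# {('0x16', 3): 3}
```

More distinct hash inputs per order key value means less locality, but also less risk that one hot order key value overloads a single partition.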

kwannoel (Contributor) commented May 4, 2025

We'd better check the dist key for index also, to see if we have the same problem.

BugenZhao (Member) commented May 5, 2025

> But it must contain the full stream key

I think this is not accurate. The fundamental rule is that records with the same stream key value must be shuffled to the same partition, and thus must have the same distribution key value.

Following this...

  • the dist key cannot contain a column that is not in the stream key (in other words, the dist key must be a subset of the stream key)
  • if we want to add a new column to the dist key, we have to add it to the stream key as well

So in the case of #21668, if we enrich the stream key to also include column balance, I think we are all done?
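
The two rules can be written as a small check. This is only a sketch using the column names from #21668 as quoted in this thread. `is_valid_dist_key` and `enrich_stream_key` are hypothetical helpers, not functions from the RisingWave optimizer.

```python
def is_valid_dist_key(dist_key, stream_key):
    """Rule 1: every dist key column must also be a stream key column."""
    return set(dist_key) <= set(stream_key)


def enrich_stream_key(stream_key, dist_key):
    """Rule 2: append dist key columns that the stream key is missing."""
    extra = [col for col in dist_key if col not in stream_key]
    return list(stream_key) + extra


stream_key = ["owner", "token_mint"]
dist_key = ["token_mint", "balance"]

print(is_valid_dist_key(dist_key, stream_key))  # False: balance is missing

enriched = enrich_stream_key(stream_key, dist_key)
print(enriched)                                 # ['owner', 'token_mint', 'balance']
print(is_valid_dist_key(dist_key, enriched))    # True
```

Once balance is part of the stream key, two records with the same stream key value necessarily agree on (token_mint, balance), so they hash to the same partition.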


It appears that the goal of this issue is correct ("as part of"), but the implementation in PR #20176 is incorrect ("as").

kwannoel (Contributor) commented May 5, 2025

> So in the case of #21668, if we enrich the stream key to also include column balance, I think we are all done?

That works

kwannoel (Contributor) commented May 5, 2025

But this issue description is not accurate as a result:

> dist_key just needs to be a subset of pk, not a prefix.

Rather than the MV pk, it needs to be a subset of the MV stream key.

kwannoel (Contributor) commented May 7, 2025

> But this issue description is not accurate as a result:
>
> > dist_key just needs to be a subset of pk, not a prefix.
>
> Rather than the MV pk, it needs to be a subset of the MV stream key.

Doesn't this just mean that the MV stream key should be equal to the MV pk, since the MV pk will be order_key | upstream_stream_key?
