From e7b4e85a0d70e43554b437f724a8801ad164520e Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Tue, 22 Apr 2025 14:48:01 +0800
Subject: [PATCH 1/8] refactor: deprecate frontend generating
 `ColumnIndexMapping` when altering a table

Signed-off-by: Bugen Zhao
---
 src/meta/src/stream/stream_graph/fragment.rs | 53 ++++++++++++++++++--
 1 file changed, 50 insertions(+), 3 deletions(-)

diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index a4bd6037dbbf4..72e59a83d1e71 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -26,9 +26,8 @@ use risingwave_common::catalog::{
 };
 use risingwave_common::hash::VnodeCount;
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_common::util::stream_graph_visitor;
 use risingwave_common::util::stream_graph_visitor::{
-    visit_stream_node_cont, visit_stream_node_cont_mut,
+    self, visit_stream_node, visit_stream_node_cont, visit_stream_node_cont_mut,
 };
 use risingwave_meta_model::WorkerId;
 use risingwave_pb::catalog::Table;
@@ -1141,12 +1140,60 @@ impl CompleteStreamFragmentGraph {
         for (dispatch_strategy, fragment) in &downstream_fragments {
             let id = GlobalFragmentId::new(fragment.fragment_id);
 
+            let mut res = None;
+
+            stream_graph_visitor::visit_stream_node(&fragment.nodes, |node_body| {
+                let columns = match node_body {
+                    NodeBody::StreamScan(stream_scan) => {
+                        stream_scan.upstream_column_ids.clone()
+                    }
+                    _ => return,
+                };
+                res = Some(columns);
+            });
+
+            let columns = res.context("failed to locate downstream scan")?;
+
+            let mut upstream_columns = None;
+
+            let node = graph
+                .fragments
+                .get(&table_fragment_id)
+                .unwrap()
+                .node
+                .as_ref()
+                .unwrap();
+
+            stream_graph_visitor::visit_stream_node(node, |node_body| {
+                if let NodeBody::Materialize(materialize) = node_body {
+                    let columns = materialize.column_ids().clone();
+                    upstream_columns = Some(columns);
+                }
+            });
+
+            let upstream_columns =
+                upstream_columns.context("failed to locate upstream materialize")?;
+
+            let output_indices = columns
+                .iter()
+                .map(|c| {
+                    upstream_columns
+                        .iter()
+                        .position(|&id| id == *c)
+                        .map(|i| i as u32)
+                })
+                .collect::<Option<Vec<_>>>()
+                .context("column not found in the upstream materialize")?;
+
             let edge = StreamFragmentEdge {
                 id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
                     original_upstream_fragment_id: original_table_fragment_id,
                     downstream_fragment_id: id,
                 }),
-                dispatch_strategy: dispatch_strategy.clone(),
+                dispatch_strategy: DispatchStrategy {
+                    output_indices,
+                    ..dispatch_strategy.clone()
+                },
             };
 
             extra_downstreams
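Patch 1's core mechanism is a position lookup: each downstream `StreamScan`'s `upstream_column_ids` is resolved against the column IDs exposed by the table's `Materialize` node, yielding the dispatcher's `output_indices`. A minimal standalone sketch of that lookup; the function and the driver below are illustrative, not the actual meta-service code:

```rust
/// Resolve each required column ID to its position among the upstream
/// columns; `None` means some required column no longer exists upstream.
fn output_indices(required: &[i32], upstream: &[i32]) -> Option<Vec<u32>> {
    required
        .iter()
        .map(|c| upstream.iter().position(|id| id == c).map(|i| i as u32))
        .collect()
}

fn main() {
    // The upstream mview exposes columns [1, 2, 3]; the scan needs [3, 1].
    assert_eq!(output_indices(&[3, 1], &[1, 2, 3]), Some(vec![2, 0]));
    // Column 4 was dropped upstream, so the mapping fails as a whole.
    assert_eq!(output_indices(&[3, 4], &[1, 2, 3]), None);
}
```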
From 721eccd47cf3354e15c6c11ffd247b8aa532efc3 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Tue, 22 Apr 2025 15:16:12 +0800
Subject: [PATCH 2/8] fix dist key indices & error message

Signed-off-by: Bugen Zhao
---
 src/meta/src/stream/stream_graph/fragment.rs | 54 ++++++++++----------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index 72e59a83d1e71..e7f4af3f1b4e2 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -42,12 +42,12 @@ use risingwave_pb::stream_plan::{
     StreamScanType, backfill_order_strategy,
 };
 
-use crate::MetaResult;
 use crate::barrier::SnapshotBackfillInfo;
 use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
 use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
 use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
 use crate::stream::stream_graph::schedule::Distribution;
+use crate::{MetaError, MetaResult};
 
 /// The fragment in the building phase, including the [`StreamFragment`] from the frontend and
 /// several additional helper fields.
@@ -1027,7 +1027,7 @@ impl CompleteStreamFragmentGraph {
                                 })
                                 .collect::<Option<Vec<_>>>()
                                 .context(
-                                    "column not found in the upstream materialized view",
+                                    "BUG: column not found in the upstream materialized view",
                                 )?;
                             (dist_key_indices, output_indices)
                         };
@@ -1152,11 +1152,9 @@ impl CompleteStreamFragmentGraph {
                 res = Some(columns);
             });
 
-            let columns = res.context("failed to locate downstream scan")?;
+            let output_columns = res.context("failed to locate downstream scan")?;
 
-            let mut upstream_columns = None;
-
-            let node = graph
+            let nodes = graph
                 .fragments
                 .get(&table_fragment_id)
                 .unwrap()
                 .node
@@ -1164,26 +1162,27 @@ impl CompleteStreamFragmentGraph {
                 .as_ref()
                 .unwrap();
 
-            stream_graph_visitor::visit_stream_node(node, |node_body| {
-                if let NodeBody::Materialize(materialize) = node_body {
-                    let columns = materialize.column_ids().clone();
-                    upstream_columns = Some(columns);
-                }
-            });
-
-            let upstream_columns =
-                upstream_columns.context("failed to locate upstream materialize")?;
-
-            let output_indices = columns
-                .iter()
-                .map(|c| {
-                    upstream_columns
-                        .iter()
-                        .position(|&id| id == *c)
-                        .map(|i| i as u32)
-                })
-                .collect::<Option<Vec<_>>>()
-                .context("column not found in the upstream materialize")?;
+            let (dist_key_indices, output_indices) = {
+                let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
+                let all_column_ids = mview_node.column_ids();
+                let dist_key_indices = mview_node.dist_key_indices();
+                let output_indices = output_columns
+                    .iter()
+                    .map(|c| {
+                        all_column_ids
+                            .iter()
+                            .position(|&id| id == *c)
+                            .map(|i| i as u32)
+                    })
+                    .collect::<Option<Vec<_>>>()
+                    .ok_or_else(|| {
+                        MetaError::invalid_parameter(
+                            "unable to drop or alter the column due to \
+                            being referenced by downstream materialized views or sinks",
+                        )
+                    })?;
+                (dist_key_indices, output_indices)
+            };
 
             let edge = StreamFragmentEdge {
                 id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
@@ -1192,7 +1191,8 @@ impl CompleteStreamFragmentGraph {
                 }),
                 dispatch_strategy: DispatchStrategy {
                     output_indices,
-                    ..dispatch_strategy.clone()
+                    r#type: dispatch_strategy.r#type,
+                    dist_key_indices,
                 },
             };
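The fix in patch 2 is easy to miss: `..dispatch_strategy.clone()` silently carried over the old `dist_key_indices`, which may no longer line up with the new table schema, so both index vectors are now re-derived from the `Materialize` node. A simplified illustration, with a hand-written stand-in for the protobuf `DispatchStrategy`:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Strategy {
    r#type: i32,
    dist_key_indices: Vec<u32>,
    output_indices: Vec<u32>,
}

fn main() {
    // Strategy recorded before the schema change: dist key was output column 0.
    let old = Strategy { r#type: 1, dist_key_indices: vec![0], output_indices: vec![0, 1] };
    // After `ALTER TABLE`, suppose the dist key column moved to position 1.
    let fresh_dist_key = vec![1];
    // Patch 1 style: update the outputs but silently keep the stale dist key.
    let stale = Strategy { output_indices: vec![1, 0], ..old.clone() };
    // Patch 2 style: set every field explicitly from the new mview node.
    let fixed = Strategy { r#type: old.r#type, dist_key_indices: fresh_dist_key, output_indices: vec![1, 0] };
    assert_ne!(stale, fixed);
}
```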
From 3f8017dc40d2dcddadde499a85cec071a90a459c Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Tue, 22 Apr 2025 16:32:09 +0800
Subject: [PATCH 3/8] only resolve dispatcher type

Signed-off-by: Bugen Zhao
---
 src/meta/src/controller/fragment.rs          | 16 ++++-------
 src/meta/src/manager/metadata.rs             |  4 +--
 src/meta/src/rpc/ddl_controller.rs           | 26 +++-----------------
 src/meta/src/stream/stream_graph/fragment.rs | 12 ++++-----
 4 files changed, 17 insertions(+), 41 deletions(-)

diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index e5dd061f14c2b..87a4ea3fde164 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -50,8 +50,8 @@ use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping
 use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
-    DispatchStrategy, PbDispatcherType, PbFragmentTypeFlag, PbStreamContext, PbStreamNode,
-    PbStreamScanType, StreamScanType,
+    PbDispatcherType, PbFragmentTypeFlag, PbStreamContext, PbStreamNode, PbStreamScanType,
+    StreamScanType,
 };
 use sea_orm::ActiveValue::Set;
 use sea_orm::sea_query::Expr;
@@ -1559,7 +1559,7 @@ impl CatalogController {
     pub async fn get_downstream_fragments(
         &self,
         job_id: ObjectId,
-    ) -> MetaResult<(Vec<(DispatchStrategy, Fragment)>, Vec<(ActorId, WorkerId)>)> {
+    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
         let (root_fragment, actors) = self.get_root_fragment(job_id).await?;
 
         let inner = self.inner.read().await;
@@ -1578,8 +1578,6 @@ impl CatalogController {
         for fragment_relation::Model {
             target_fragment_id: fragment_id,
             dispatcher_type,
-            dist_key_indices,
-            output_indices,
             ..
         } in downstream_fragment_relations
         {
@@ -1592,13 +1590,9 @@ impl CatalogController {
             }
             assert_eq!(fragment_actors.len(), 1);
             let (fragment, actors) = fragment_actors.pop().unwrap();
-            let dispatch_strategy = DispatchStrategy {
-                r#type: PbDispatcherType::from(dispatcher_type) as _,
-                dist_key_indices: dist_key_indices.into_u32_array(),
-                output_indices: output_indices.into_u32_array(),
-            };
+            let dispatch_type = PbDispatcherType::from(dispatcher_type);
             let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
-            downstream_fragments.push((dispatch_strategy, fragment));
+            downstream_fragments.push((dispatch_type, fragment));
         }
 
         Ok((downstream_fragments, actors))
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index c060a90bf6278..ed71bb7ba1806 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -25,7 +25,7 @@ use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
 use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
 use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
 use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
-use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamScanType};
+use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamScanType};
 use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
 use tokio::sync::oneshot;
 use tokio::time::{Instant, sleep};
@@ -506,7 +506,7 @@ impl MetadataManager {
         &self,
         job_id: u32,
     ) -> MetaResult<(
-        Vec<(PbDispatchStrategy, Fragment)>,
+        Vec<(PbDispatcherType, Fragment)>,
         HashMap<ActorId, WorkerId>,
     )> {
         let (fragments, actors) = self
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 4daca6c5841e9..df08bc8f313e3 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -726,7 +726,7 @@ impl DdlController {
         fragment_graph: StreamFragmentGraph,
     ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
         let (mut replace_table_ctx, mut stream_job_fragments) = self
-            .build_replace_job(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _)
+            .build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _)
             .await?;
 
         let target_table = streaming_job.table().unwrap();
@@ -1407,13 +1407,7 @@ impl DdlController {
         let mut drop_table_connector_ctx = None;
         let result: MetaResult<_> = try {
             let (mut ctx, mut stream_job_fragments) = self
-                .build_replace_job(
-                    ctx,
-                    &streaming_job,
-                    fragment_graph,
-                    col_index_mapping.as_ref(),
-                    tmp_id as _,
-                )
+                .build_replace_job(ctx, &streaming_job, fragment_graph, tmp_id as _)
                 .await?;
             drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
 
@@ -1845,7 +1839,6 @@ impl DdlController {
         stream_ctx: StreamContext,
         stream_job: &StreamingJob,
         mut fragment_graph: StreamFragmentGraph,
-        col_index_mapping: Option<&ColIndexMapping>,
        tmp_job_id: TableId,
     ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
         match &stream_job {
@@ -1904,20 +1897,9 @@ impl DdlController {
 
         let job_type = StreamingJobType::from(stream_job);
 
-        // Map the column indices in the dispatchers with the given mapping.
-        let (mut downstream_fragments, downstream_actor_location) =
+        // Extract the downstream fragments from the fragment graph.
+        let (downstream_fragments, downstream_actor_location) =
             self.metadata_manager.get_downstream_fragments(id).await?;
-        if let Some(mapping) = &col_index_mapping {
-            for (d, _f) in &mut downstream_fragments {
-                *d = mapping.rewrite_dispatch_strategy(d).ok_or_else(|| {
-                    // The `rewrite` only fails if some column is dropped (missing) or altered (type changed).
-                    // TODO: support altering referenced columns
-                    MetaError::invalid_parameter(
-                        "unable to drop or alter the column due to being referenced by downstream materialized views or sinks",
-                    )
-                })?;
-            }
-        }
 
         // build complete graph based on the table job type
         let complete_graph = match &job_type {
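From patch 3 on, only the `DispatcherType` survives the round-trip through the catalog; the full `DispatchStrategy` is reassembled when the replacement graph is built, as the `fragment.rs` diff below shows. A reduced sketch of that reassembly, using stand-in types and illustrative enum discriminants:

```rust
#[derive(Clone, Copy, Debug)]
enum DispatcherType { Hash = 1, NoShuffle = 4 } // discriminants illustrative

struct DispatchStrategy {
    r#type: i32,
    dist_key_indices: Vec<u32>,
    output_indices: Vec<u32>,
}

/// Rebuild a full strategy from the stored dispatcher type plus indices
/// freshly derived from the new job's `Materialize`/`Source` node.
fn rebuild(ty: DispatcherType, dist_key_indices: Vec<u32>, output_indices: Vec<u32>) -> DispatchStrategy {
    DispatchStrategy { r#type: ty as i32, dist_key_indices, output_indices }
}

fn main() {
    let edge = rebuild(DispatcherType::NoShuffle, vec![], (0..3).collect());
    assert_eq!(edge.r#type, 4);
    assert_eq!(edge.output_indices, vec![0, 1, 2]);
}
```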
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index e7f4af3f1b4e2..e4d2100d6b795 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -27,7 +27,7 @@ use risingwave_common::catalog::{
 use risingwave_common::hash::VnodeCount;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::stream_graph_visitor::{
-    self, visit_stream_node, visit_stream_node_cont, visit_stream_node_cont_mut,
+    self, visit_stream_node_cont, visit_stream_node_cont_mut,
 };
 use risingwave_meta_model::WorkerId;
 use risingwave_pb::catalog::Table;
@@ -852,7 +852,7 @@ pub struct FragmentGraphUpstreamContext {
 
 pub struct FragmentGraphDownstreamContext {
     original_root_fragment_id: FragmentId,
-    downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
+    downstream_fragments: Vec<(DispatcherType, Fragment)>,
     downstream_actor_location: HashMap<ActorId, WorkerId>,
 }
 
@@ -895,7 +895,7 @@ impl CompleteStreamFragmentGraph {
     pub fn with_downstreams(
         graph: StreamFragmentGraph,
         original_root_fragment_id: FragmentId,
-        downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
+        downstream_fragments: Vec<(DispatcherType, Fragment)>,
         existing_actor_location: HashMap<ActorId, WorkerId>,
         job_type: StreamingJobType,
     ) -> MetaResult<Self> {
@@ -917,7 +917,7 @@ impl CompleteStreamFragmentGraph {
         upstream_root_fragments: HashMap<TableId, Fragment>,
         upstream_actor_location: HashMap<ActorId, WorkerId>,
         original_root_fragment_id: FragmentId,
-        downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
+        downstream_fragments: Vec<(DispatcherType, Fragment)>,
         downstream_actor_location: HashMap<ActorId, WorkerId>,
         job_type: StreamingJobType,
     ) -> MetaResult<Self> {
@@ -1137,7 +1137,7 @@ impl CompleteStreamFragmentGraph {
 
         // Build the extra edges between the `Materialize` and the downstream `StreamScan` of the
        // existing materialized views.
-        for (dispatch_strategy, fragment) in &downstream_fragments {
+        for (dispatcher_type, fragment) in &downstream_fragments {
             let id = GlobalFragmentId::new(fragment.fragment_id);
 
@@ -1190,8 +1190,8 @@ impl CompleteStreamFragmentGraph {
                     downstream_fragment_id: id,
                 }),
                 dispatch_strategy: DispatchStrategy {
+                    r#type: *dispatcher_type as i32,
                     output_indices,
-                    r#type: dispatch_strategy.r#type,
                     dist_key_indices,
                 },
             };
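With the frontend-side `rewrite_dispatch_strategy` call gone, a column that cannot be located upstream now surfaces as an invalid-parameter error during graph building, which is what the next patch's e2e tests exercise. A condensed sketch of that error path, with a stand-in for `MetaError`:

```rust
#[derive(Debug)]
struct InvalidParameter(&'static str);

fn checked_output_indices(
    required: &[i32],
    upstream: &[i32],
) -> Result<Vec<u32>, InvalidParameter> {
    required
        .iter()
        .map(|c| upstream.iter().position(|id| id == c).map(|i| i as u32))
        .collect::<Option<Vec<_>>>()
        .ok_or(InvalidParameter(
            "unable to drop or alter the column due to being referenced by downstream materialized views or sinks",
        ))
}

fn main() {
    // Dropping column 2 while a downstream job still scans it is rejected.
    assert!(checked_output_indices(&[2], &[1, 3]).is_err());
    assert_eq!(checked_output_indices(&[3], &[1, 3]).unwrap(), vec![1]);
}
```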
From bdc6e0ea2e85205068ccec0c269a7a3acab40ea8 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Tue, 22 Apr 2025 17:32:08 +0800
Subject: [PATCH 4/8] fix source backfill

Signed-off-by: Bugen Zhao
---
 e2e_test/ddl/alter_table_column_type.slt      |   9 +-
 .../cdc_inline/alter/cdc_table_alter.slt      |   2 +-
 src/meta/src/stream/stream_graph/fragment.rs  | 217 +++++++++---------
 src/prost/src/lib.rs                          |   9 +
 4 files changed, 127 insertions(+), 110 deletions(-)

diff --git a/e2e_test/ddl/alter_table_column_type.slt b/e2e_test/ddl/alter_table_column_type.slt
index f9ce1346d8e42..033c768d96034 100644
--- a/e2e_test/ddl/alter_table_column_type.slt
+++ b/e2e_test/ddl/alter_table_column_type.slt
@@ -65,11 +65,12 @@ NULL
 
 # If a column is currently being referenced by any downstream job, the column cannot be altered.
 # TODO: support altering referenced columns
-statement ok
-create materialized view my_mv_a as select a from my_table;
+# TODO: support in progress, temporarily disable this test
+# statement ok
+# create materialized view my_mv_a as select a from my_table;
 
-statement error unable to drop or alter the column due to being referenced by downstream materialized views or sinks
-alter table my_table alter column a type struct;
+# statement error unable to drop or alter the column due to being referenced by downstream materialized views or sinks
+# alter table my_table alter column a type struct;
 
 statement ok
 drop table my_table cascade;
diff --git a/e2e_test/source_legacy/cdc_inline/alter/cdc_table_alter.slt b/e2e_test/source_legacy/cdc_inline/alter/cdc_table_alter.slt
index a837c53224dd7..cc5bf447405b4 100644
--- a/e2e_test/source_legacy/cdc_inline/alter/cdc_table_alter.slt
+++ b/e2e_test/source_legacy/cdc_inline/alter/cdc_table_alter.slt
@@ -210,7 +210,7 @@ psql -c "
 ALTER TABLE shipments1 DROP COLUMN destination;
 "
 
-statement error unable to drop or alter the column due to being referenced by downstream materialized views or sinks
+statement error unable to drop the column due to being referenced by downstream materialized views or sinks
 ALTER TABLE pg_shipments DROP COLUMN destination;
 
 # wait alter ddl
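The `fragment.rs` changes below revolve around extracting the required columns by visiting stream node bodies, now handling `StreamScan` and `SourceBackfill` uniformly. A self-contained sketch of that extraction pattern; the enum and the loop here are hand-rolled stand-ins for the protobuf types and `stream_graph_visitor::visit_stream_node`:

```rust
enum NodeBody {
    StreamScan { upstream_column_ids: Vec<i32> },
    SourceBackfill { column_ids: Vec<i32> },
    Other,
}

/// Walk the fragment's nodes and capture the columns required from upstream.
fn required_columns(nodes: &[NodeBody]) -> Option<Vec<i32>> {
    let mut res = None;
    for body in nodes {
        match body {
            NodeBody::StreamScan { upstream_column_ids } => res = Some(upstream_column_ids.clone()),
            NodeBody::SourceBackfill { column_ids } => res = Some(column_ids.clone()),
            NodeBody::Other => {}
        }
    }
    res
}

fn main() {
    let nodes = [
        NodeBody::Other,
        NodeBody::SourceBackfill { column_ids: vec![1, 2] },
    ];
    assert_eq!(required_columns(&nodes), Some(vec![1, 2]));
}
```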
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index e4d2100d6b795..1bd42bee9aed0 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -224,11 +224,7 @@ impl BuildingFragment {
                 NodeBody::SourceBackfill(backfill) => (
                     backfill.upstream_source_id.into(),
                     // FIXME: only pass required columns instead of all columns here
-                    backfill
-                        .columns
-                        .iter()
-                        .map(|c| c.column_desc.as_ref().unwrap().column_id)
-                        .collect(),
+                    backfill.column_ids(),
                 ),
                 _ => return,
             };
@@ -956,14 +952,16 @@ impl CompleteStreamFragmentGraph {
         {
             for (&id, fragment) in &mut graph.fragments {
                 let uses_shuffled_backfill = fragment.has_shuffled_backfill();
-                for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns {
-                    let (up_fragment_id, edge) = match job_type {
-                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
-                            let source_fragment = upstream_root_fragments
-                                .get(&upstream_table_id)
-                                .context("upstream source fragment not found")?;
-                            let source_job_id = GlobalFragmentId::new(source_fragment.fragment_id);
+                for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
+                    let upstream_fragment = upstream_root_fragments
+                        .get(&upstream_table_id)
+                        .context("upstream fragment not found")?;
+                    let upstream_root_fragment_id =
+                        GlobalFragmentId::new(upstream_fragment.fragment_id);
+
+                    let edge = match job_type {
+                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                             // we traverse all fragments in the graph, and we should find out the
                             // CdcFilter fragment and add an edge between upstream source fragment and it.
                             assert_ne!(
                                 (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
                                 0
                             );
 
                             tracing::debug!(
-                                ?source_job_id,
-                                ?output_columns,
+                                ?upstream_root_fragment_id,
+                                ?required_columns,
                                 identity = ?fragment.inner.get_node().unwrap().get_identity(),
                                 current_frag_id=?id,
                                 "CdcFilter with upstream source fragment"
                             );
-                            let edge = StreamFragmentEdge {
+
+                            StreamFragmentEdge {
                                 id: EdgeId::UpstreamExternal {
                                     upstream_table_id,
                                     downstream_fragment_id: id,
                                 },
                                 dispatch_strategy: DispatchStrategy {
                                     r#type: DispatcherType::NoShuffle as i32,
                                     dist_key_indices: vec![], // not used for `NoShuffle`
                                     output_indices: (0..CDC_SOURCE_COLUMN_NUM as _).collect(),
                                 },
-                            };
-
-                            (source_job_id, edge)
+                            }
                         }
+
+                        // handle MV on MV/Source
                         StreamingJobType::MaterializedView
                         | StreamingJobType::Sink
                         | StreamingJobType::Index => {
-                            // handle MV on MV/Source
-
-                            // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
-                            // of the new materialized view.
-                            let upstream_fragment = upstream_root_fragments
-                                .get(&upstream_table_id)
-                                .context("upstream materialized view fragment not found")?;
-                            let upstream_root_fragment_id =
-                                GlobalFragmentId::new(upstream_fragment.fragment_id);
-
+                            // Build the extra edges between the upstream `Materialize` and
+                            // the downstream `StreamScan` of the new job.
                             if upstream_fragment.fragment_type_mask
                                 & FragmentTypeFlag::Mview as u32
                                 != 0
                             {
                                 let (dist_key_indices, output_indices) = {
                                     let mview_node =
                                         nodes.get_node_body().unwrap().as_materialize().unwrap();
                                     let all_column_ids = mview_node.column_ids();
                                     let dist_key_indices = mview_node.dist_key_indices();
-                                    let output_indices = output_columns
-                                        .iter()
-                                        .map(|c| {
-                                            all_column_ids
-                                                .iter()
-                                                .position(|&id| id == *c)
-                                                .map(|i| i as u32)
-                                        })
-                                        .collect::<Option<Vec<_>>>()
-                                        .context(
-                                            "BUG: column not found in the upstream materialized view",
-                                        )?;
+                                    let output_indices = gen_output_indices(
+                                        required_columns,
+                                        all_column_ids,
+                                    )
+                                    .context(
+                                        "BUG: column not found in the upstream materialized view",
+                                    )?;
                                     (dist_key_indices, output_indices)
                                 };
                                 let dispatch_strategy = mv_on_mv_dispatch_strategy(
                                     uses_shuffled_backfill,
                                     dist_key_indices,
                                     output_indices,
                                 );
-                                let edge = StreamFragmentEdge {
+
+                                StreamFragmentEdge {
                                     id: EdgeId::UpstreamExternal {
                                         upstream_table_id,
                                         downstream_fragment_id: id,
                                     },
                                     dispatch_strategy,
-                                };
-
-                                (upstream_root_fragment_id, edge)
-                            } else if upstream_fragment.fragment_type_mask
+                                }
+                            }
+                            // Build the extra edges between the upstream `Source` and
+                            // the downstream `SourceBackfill` of the new job.
+                            else if upstream_fragment.fragment_type_mask
                                 & FragmentTypeFlag::Source as u32
                                 != 0
                             {
-                                let source_fragment = upstream_root_fragments
-                                    .get(&upstream_table_id)
-                                    .context("upstream source fragment not found")?;
-                                let source_job_id =
-                                    GlobalFragmentId::new(source_fragment.fragment_id);
-
                                 let output_indices = {
                                     let nodes = &upstream_fragment.nodes;
                                     let source_node =
                                         nodes.get_node_body().unwrap().as_source().unwrap();
 
                                     let all_column_ids = source_node.column_ids().unwrap();
-                                    output_columns
-                                        .iter()
-                                        .map(|c| {
-                                            all_column_ids
-                                                .iter()
-                                                .position(|&id| id == *c)
-                                                .map(|i| i as u32)
-                                        })
-                                        .collect::<Option<Vec<_>>>()
-                                        .context("column not found in the upstream source node")?
+                                    gen_output_indices(required_columns, all_column_ids).context(
+                                        "BUG: column not found in the upstream source node",
+                                    )?
                                 };
 
-                                let edge = StreamFragmentEdge {
+                                StreamFragmentEdge {
                                     id: EdgeId::UpstreamExternal {
                                         upstream_table_id,
                                         downstream_fragment_id: id,
                                     },
                                     dispatch_strategy: DispatchStrategy {
                                         r#type: DispatcherType::NoShuffle as i32,
                                         dist_key_indices: vec![], // not used for `NoShuffle`
                                         output_indices,
                                     },
-                                };
-
-                                (source_job_id, edge)
+                                }
                             } else {
                                 bail!(
                                     "the upstream fragment should be a MView or Source, got fragment type: {:b}",
                                     upstream_fragment.fragment_type_mask
                                 )
                             }
                         }
                     };
 
                     // put the edge into the extra edges
                     extra_downstreams
-                        .entry(up_fragment_id)
+                        .entry(upstream_root_fragment_id)
                         .or_insert_with(HashMap::new)
                         .try_insert(id, edge.clone())
                         .unwrap();
                     extra_upstreams
                         .entry(id)
                         .or_insert_with(HashMap::new)
-                        .try_insert(up_fragment_id, edge)
+                        .try_insert(upstream_root_fragment_id, edge)
                         .unwrap();
                 }
             }
@@ -1140,48 +1113,69 @@ impl CompleteStreamFragmentGraph {
         for (dispatcher_type, fragment) in &downstream_fragments {
             let id = GlobalFragmentId::new(fragment.fragment_id);
 
-            let mut res = None;
+            // Similar to `extract_upstream_table_columns_except_cross_db_backfill`.
+            let output_columns = || {
+                let mut res = None;
 
-            stream_graph_visitor::visit_stream_node(&fragment.nodes, |node_body| {
-                let columns = match node_body {
-                    NodeBody::StreamScan(stream_scan) => {
-                        stream_scan.upstream_column_ids.clone()
-                    }
-                    _ => return,
-                };
-                res = Some(columns);
-            });
+                stream_graph_visitor::visit_stream_node(&fragment.nodes, |node_body| {
+                    let columns = match node_body {
+                        NodeBody::StreamScan(stream_scan) => {
+                            stream_scan.upstream_column_ids.clone()
+                        }
+                        NodeBody::SourceBackfill(source_backfill) => {
+                            // FIXME: only pass required columns instead of all columns here
+                            source_backfill.column_ids()
+                        }
+                        _ => return,
+                    };
+                    res = Some(columns);
+                });
 
-            let output_columns = res.context("failed to locate downstream scan")?;
+                res.context("failed to locate downstream scan")
+            };
 
-            let nodes = graph
-                .fragments
-                .get(&table_fragment_id)
-                .unwrap()
-                .node
-                .as_ref()
-                .unwrap();
+            let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
+            let nodes = table_fragment.node.as_ref().unwrap();
 
-            let (dist_key_indices, output_indices) = {
-                let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
-                let all_column_ids = mview_node.column_ids();
-                let dist_key_indices = mview_node.dist_key_indices();
-                let output_indices = output_columns
-                    .iter()
-                    .map(|c| {
-                        all_column_ids
-                            .iter()
-                            .position(|&id| id == *c)
-                            .map(|i| i as u32)
-                    })
-                    .collect::<Option<Vec<_>>>()
-                    .ok_or_else(|| {
-                        MetaError::invalid_parameter(
-                            "unable to drop or alter the column due to \
-                            being referenced by downstream materialized views or sinks",
-                        )
-                    })?;
-                (dist_key_indices, output_indices)
+            let (dist_key_indices, output_indices) = match job_type {
+                StreamingJobType::Table(TableJobType::General) => {
+                    let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
+                    let all_column_ids = mview_node.column_ids();
+                    let dist_key_indices = mview_node.dist_key_indices();
+                    let output_indices = gen_output_indices(&output_columns()?, all_column_ids)
+                        .ok_or_else(|| {
+                            MetaError::invalid_parameter(
+                                "unable to drop the column due to \
+                                being referenced by downstream materialized views or sinks",
+                            )
+                        })?;
+                    (dist_key_indices, output_indices)
+                }
+                StreamingJobType::Table(TableJobType::SharedCdcSource) => {
+                    assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
+                    (
+                        vec![], // not used for `NoShuffle`
+                        (0..CDC_SOURCE_COLUMN_NUM as _).collect(),
+                    )
+                }
+                StreamingJobType::Source => {
+                    let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
+                    let all_column_ids = source_node.column_ids().unwrap();
+                    let output_indices = gen_output_indices(&output_columns()?, all_column_ids)
+                        .ok_or_else(|| {
+                            MetaError::invalid_parameter(
+                                "unable to drop the column due to \
+                                being referenced by downstream materialized views or sinks",
+                            )
+                        })?;
+                    assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
+                    (
+                        vec![], // not used for `NoShuffle`
+                        output_indices,
+                    )
+                }
+
+                _ => bail!("unsupported job type for replacement: {job_type:?}"),
             };
 
             let edge = StreamFragmentEdge {
@@ -1227,6 +1221,19 @@ impl CompleteStreamFragmentGraph {
     }
 }
 
+/// Generate the `output_indices` for [`DispatchStrategy`].
+fn gen_output_indices(required_columns: &Vec<i32>, upstream_columns: Vec<i32>) -> Option<Vec<u32>> {
+    required_columns
+        .iter()
+        .map(|c| {
+            upstream_columns
+                .iter()
+                .position(|&id| id == *c)
+                .map(|i| i as u32)
+        })
+        .collect()
+}
+
 fn mv_on_mv_dispatch_strategy(
     uses_shuffled_backfill: bool,
     dist_key_indices: Vec<u32>,
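The `backfill.column_ids()` and `source_backfill.column_ids()` calls above rely on an accessor that the next diff adds to the prost-generated `SourceBackfillNode`. Its pattern, reduced to a runnable sketch with hand-written stand-ins for the generated structs:

```rust
struct ColumnDesc { column_id: i32 }
struct ColumnCatalog { column_desc: Option<ColumnDesc> }
struct SourceBackfillNode { columns: Vec<ColumnCatalog> }

impl SourceBackfillNode {
    /// Flatten the column catalog into plain column IDs, mirroring the
    /// accessor added to `src/prost/src/lib.rs`.
    fn column_ids(&self) -> Vec<i32> {
        self.columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().column_id)
            .collect()
    }
}

fn main() {
    let node = SourceBackfillNode {
        columns: vec![ColumnCatalog { column_desc: Some(ColumnDesc { column_id: 7 }) }],
    };
    assert_eq!(node.column_ids(), vec![7]);
}
```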
diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs
index 8130966483e91..5e4842bcd949b 100644
--- a/src/prost/src/lib.rs
+++ b/src/prost/src/lib.rs
@@ -239,6 +239,15 @@ impl stream_plan::MaterializeNode {
     }
 }
 
+impl stream_plan::SourceBackfillNode {
+    pub fn column_ids(&self) -> Vec<i32> {
+        self.columns
+            .iter()
+            .map(|c| c.column_desc.as_ref().unwrap().column_id)
+            .collect()
+    }
+}
+
 // Encapsulating the use of parallelism.
 impl common::WorkerNode {
     pub fn compute_node_parallelism(&self) -> usize {

From bdef594e2d15707ac3a76fbc2592b620a6893974 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Tue, 22 Apr 2025 18:05:39 +0800
Subject: [PATCH 5/8] fix replace cdc table

Signed-off-by: Bugen Zhao
---
 src/meta/src/stream/stream_graph/fragment.rs | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index 1bd42bee9aed0..188aa916270de 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -1114,7 +1114,7 @@ impl CompleteStreamFragmentGraph {
             let id = GlobalFragmentId::new(fragment.fragment_id);
 
             // Similar to `extract_upstream_table_columns_except_cross_db_backfill`.
-            let output_columns = || {
+            let output_columns = {
                 let mut res = None;
 
                 stream_graph_visitor::visit_stream_node(&fragment.nodes, |node_body| {
@@ -1131,18 +1131,18 @@ impl CompleteStreamFragmentGraph {
                     res = Some(columns);
                 });
 
-                res.context("failed to locate downstream scan")
+                res.context("failed to locate downstream scan")?
}; let table_fragment = graph.fragments.get(&table_fragment_id).unwrap(); let nodes = table_fragment.node.as_ref().unwrap(); let (dist_key_indices, output_indices) = match job_type { - StreamingJobType::Table(TableJobType::General) => { + StreamingJobType::Table(_) => { let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap(); let all_column_ids = mview_node.column_ids(); let dist_key_indices = mview_node.dist_key_indices(); - let output_indices = gen_output_indices(&output_columns()?, all_column_ids) + let output_indices = gen_output_indices(&output_columns, all_column_ids) .ok_or_else(|| { MetaError::invalid_parameter( "unable to drop the column due to \ @@ -1151,17 +1151,11 @@ impl CompleteStreamFragmentGraph { })?; (dist_key_indices, output_indices) } - StreamingJobType::Table(TableJobType::SharedCdcSource) => { - assert_eq!(*dispatcher_type, DispatcherType::NoShuffle); - ( - vec![], // not used for `NoShuffle` - (0..CDC_SOURCE_COLUMN_NUM as _).collect(), - ) - } + StreamingJobType::Source => { let source_node = nodes.get_node_body().unwrap().as_source().unwrap(); let all_column_ids = source_node.column_ids().unwrap(); - let output_indices = gen_output_indices(&output_columns()?, all_column_ids) + let output_indices = gen_output_indices(&output_columns, all_column_ids) .ok_or_else(|| { MetaError::invalid_parameter( "unable to drop the column due to \ From bae2b2e45466f8fde2c262a7e2449b27a5363d2c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 29 Apr 2025 17:14:26 +0800 Subject: [PATCH 6/8] remove strategy rewrite Signed-off-by: Bugen Zhao --- src/common/src/util/column_index_mapping.rs | 23 --------------------- 1 file changed, 23 deletions(-) diff --git a/src/common/src/util/column_index_mapping.rs b/src/common/src/util/column_index_mapping.rs index cf35c5850f5d6..1f495581236a0 100644 --- a/src/common/src/util/column_index_mapping.rs +++ b/src/common/src/util/column_index_mapping.rs @@ -18,7 +18,6 @@ use std::vec; use itertools::Itertools; use risingwave_pb::catalog::PbColIndexMapping; -use risingwave_pb::stream_plan::DispatchStrategy; /// `ColIndexMapping` is a partial mapping from usize to usize. /// @@ -322,28 +321,6 @@ impl ColIndexMapping { } } -impl ColIndexMapping { - /// Rewrite the dist-key indices and output indices in the given dispatch strategy. Returns - /// `None` if any of the indices is not mapped to the target. 
-    pub fn rewrite_dispatch_strategy(
-        &self,
-        strategy: &DispatchStrategy,
-    ) -> Option<DispatchStrategy> {
-        let map = |index: &[u32]| -> Option<Vec<u32>> {
-            index
-                .iter()
-                .map(|i| self.try_map(*i as usize).map(|i| i as u32))
-                .collect()
-        };
-
-        Some(DispatchStrategy {
-            r#type: strategy.r#type,
-            dist_key_indices: map(&strategy.dist_key_indices)?,
-            output_indices: map(&strategy.output_indices)?,
-        })
-    }
-}
-
 impl Debug for ColIndexMapping {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(

From 0e3fa3bd99a638f96c3543dd52d3c933c328d56c Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Wed, 30 Apr 2025 17:31:09 +0800
Subject: [PATCH 7/8] remove warning and refine comments on generating col idx
 mapping

Signed-off-by: Bugen Zhao
---
 .../src/handler/alter_table_column.rs | 31 +++++++++----------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index b072f80edfd56..1b950f3bd7e99 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -117,8 +117,14 @@ pub async fn get_replace_table_plan(
         .await?;
 
     // Calculate the mapping from the original columns to the new columns.
-    // This will be used to map the output of the table in the dispatcher to make
-    // existing downstream jobs work correctly.
+    //
+    // Note: previously, this was used to map the output of the table in the dispatcher to make
+    // existing downstream jobs work correctly. This is no longer the case: we now generate the
+    // mapping directly in the meta service by checking the new schema of this table against all
+    // downstream jobs, which simplifies handling `ALTER TABLE ALTER COLUMN TYPE`.
+    //
+    // TODO: However, we still generate this mapping and use it for rewriting downstream indexes'
+    // `index_item`. We should consider completely removing this in future works.
     let col_index_mapping = ColIndexMapping::new(
         old_catalog
             .columns()
@@ -128,26 +134,17 @@ pub async fn get_replace_table_plan(
             let new_c = new_c.get_column_desc().unwrap();
 
             // We consider both the column ID and the data type.
-            // If either of them does not match, we will treat it as a new column.
+            // If either of them does not match, we will treat it as a different column.
             //
-            // TODO: Since we've succeeded in assigning column IDs in the step above,
-            //       the new data type is actually _compatible_ with the old one.
-            //       Theoretically, it's also possible to do some sort of mapping for
-            //       the downstream job to work correctly. However, the current impl
-            //       only supports simple column projection, which we may improve in
-            //       future works.
-            //       However, by treating it as a new column, we can at least reject
-            //       the case where the column with type change is referenced by any
-            //       downstream jobs (because the original column is considered dropped).
+            // Note that this does not hurt the ability to handle `ALTER TABLE ALTER COLUMN TYPE`,
+            // because we don't rely on this mapping in the dispatcher. However, if there's an
+            // index on the column, it will currently fail to rewrite, as the column is
+            // considered as if it's dropped.
let id_matches = || new_c.column_id == old_c.column_id().get_id(); let type_matches = || { let original_data_type = old_c.data_type(); let new_data_type = DataType::from(new_c.column_type.as_ref().unwrap()); - let matches = original_data_type == &new_data_type; - if !matches { - notice_to_user(format!("the data type of column \"{}\" has changed, treating as a new column", old_c.name())); - } - matches + original_data_type == &new_data_type }; id_matches() && type_matches() From 8de600f351321ad318e81f46e47e8407b58a5277 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 13 May 2025 15:12:49 +0800 Subject: [PATCH 8/8] fix unused import Signed-off-by: Bugen Zhao --- src/frontend/src/handler/alter_table_column.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 1b950f3bd7e99..ed95716e15017 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -41,7 +41,6 @@ use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, InputRef, Literal}; use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project}; use crate::session::SessionImpl; -use crate::session::current::notice_to_user; use crate::{Binder, TableCatalog}; /// Used in auto schema change process
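After patch 8, the frontend-generated mapping survives only to rewrite downstream indexes' `index_item`. The partial-mapping semantics it relies on, sketched with a simplified stand-in for `ColIndexMapping` (the real type lives in `src/common/src/util/column_index_mapping.rs`):

```rust
struct ColIndexMapping {
    map: Vec<Option<usize>>,
}

impl ColIndexMapping {
    fn new(map: Vec<Option<usize>>, _target_size: usize) -> Self { Self { map } }
    /// Old column position → new position; `None` if dropped or type-changed.
    fn try_map(&self, index: usize) -> Option<usize> {
        self.map.get(index).copied().flatten()
    }
}

fn main() {
    // Old table (a, b, c) replaced by (a, c') where b is dropped and c's type
    // changed: both map to `None`, so an index on them fails to rewrite.
    let mapping = ColIndexMapping::new(vec![Some(0), None, None], 2);
    assert_eq!(mapping.try_map(0), Some(0));
    assert_eq!(mapping.try_map(2), None);
}
```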