refactor(meta): avoid using frontend generated `ColumnIndexMapping` when constructing new dispatcher for replacement by BugenZhao · Pull Request #21499 · risingwavelabs/risingwave · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

refactor(meta): avoid using frontend generated ColumnIndexMapping when constructing new dispatcher for replacement #21499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions e2e_test/ddl/alter_table_column_type.slt
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ NULL

# If a column is currently being referenced by any downstream job, the column cannot be altered.
# TODO: support altering referenced columns
statement ok
create materialized view my_mv_a as select a from my_table;
# TODO: supporting in progress, temporarily disable this test
# statement ok
# create materialized view my_mv_a as select a from my_table;

statement error unable to drop or alter the column due to being referenced by downstream materialized views or sinks
alter table my_table alter column a type struct<v double, w varchar, x int>;
# statement error unable to drop or alter the column due to being referenced by downstream materialized views or sinks
# alter table my_table alter column a type struct<v double, w varchar, x int>;

statement ok
drop table my_table cascade;
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ psql -c "
ALTER TABLE shipments1 DROP COLUMN destination;
"

statement error unable to drop or alter the column due to being referenced by downstream materialized views or sinks
statement error unable to drop the column due to being referenced by downstream materialized views or sinks
ALTER TABLE pg_shipments DROP COLUMN destination;

# wait alter ddl
Expand Down
23 changes: 0 additions & 23 deletions src/common/src/util/column_index_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::vec;

use itertools::Itertools;
use risingwave_pb::catalog::PbColIndexMapping;
use risingwave_pb::stream_plan::DispatchStrategy;

/// `ColIndexMapping` is a partial mapping from usize to usize.
///
Expand Down Expand Up @@ -322,28 +321,6 @@ impl ColIndexMapping {
}
}

impl ColIndexMapping {
/// Rewrite the dist-key indices and output indices in the given dispatch strategy. Returns
/// `None` if any of the indices is not mapped to the target.
pub fn rewrite_dispatch_strategy(
&self,
strategy: &DispatchStrategy,
) -> Option<DispatchStrategy> {
let map = |index: &[u32]| -> Option<Vec<u32>> {
index
.iter()
.map(|i| self.try_map(*i as usize).map(|i| i as u32))
.collect()
};

Some(DispatchStrategy {
r#type: strategy.r#type,
dist_key_indices: map(&strategy.dist_key_indices)?,
output_indices: map(&strategy.output_indices)?,
})
}
}

impl Debug for ColIndexMapping {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down
32 changes: 14 additions & 18 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::{Binder, TableCatalog};

/// Used in auto schema change process
Expand Down Expand Up @@ -117,8 +116,14 @@ pub async fn get_replace_table_plan(
.await?;

// Calculate the mapping from the original columns to the new columns.
// This will be used to map the output of the table in the dispatcher to make
// existing downstream jobs work correctly.
//
// Note: Previously, this will be used to map the output of the table in the dispatcher to make
// existing downstream jobs work correctly. This is no longer the case. We will generate mapping
// directly in the meta service by checking the new schema of this table and all downstream jobs,
// which simplifies handling `ALTER TABLE ALTER COLUMN TYPE`.
//
// TODO: However, we still generate this mapping and use it for rewriting downstream indexes'
// `index_item`. We should consider completely removing this in future works.
let col_index_mapping = ColIndexMapping::new(
old_catalog
.columns()
Expand All @@ -128,26 +133,17 @@ pub async fn get_replace_table_plan(
let new_c = new_c.get_column_desc().unwrap();

// We consider both the column ID and the data type.
// If either of them does not match, we will treat it as a new column.
// If either of them does not match, we will treat it as a different column.
//
// TODO: Since we've succeeded in assigning column IDs in the step above,
// the new data type is actually _compatible_ with the old one.
// Theoretically, it's also possible to do some sort of mapping for
// the downstream job to work correctly. However, the current impl
// only supports simple column projection, which we may improve in
// future works.
// However, by treating it as a new column, we can at least reject
// the case where the column with type change is referenced by any
// downstream jobs (because the original column is considered dropped).
// Note that this does not hurt the ability for `ALTER TABLE ALTER COLUMN TYPE`,
// because we don't rely on this mapping in dispatcher. However, if there's an
// index on the column, currently it will fail to rewrite as the column is
// considered as if it's dropped.
let id_matches = || new_c.column_id == old_c.column_id().get_id();
let type_matches = || {
let original_data_type = old_c.data_type();
let new_data_type = DataType::from(new_c.column_type.as_ref().unwrap());
let matches = original_data_type == &new_data_type;
if !matches {
notice_to_user(format!("the data type of column \"{}\" has changed, treating as a new column", old_c.name()));
}
matches
original_data_type == &new_data_type
};

id_matches() && type_matches()
Expand Down
16 changes: 5 additions & 11 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
DispatchStrategy, PbDispatcherType, PbFragmentTypeFlag, PbStreamContext, PbStreamNode,
PbStreamScanType, StreamScanType,
PbDispatcherType, PbFragmentTypeFlag, PbStreamContext, PbStreamNode, PbStreamScanType,
StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
Expand Down Expand Up @@ -1559,7 +1559,7 @@ impl CatalogController {
pub async fn get_downstream_fragments(
&self,
job_id: ObjectId,
) -> MetaResult<(Vec<(DispatchStrategy, Fragment)>, Vec<(ActorId, WorkerId)>)> {
) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

let inner = self.inner.read().await;
Expand All @@ -1578,8 +1578,6 @@ impl CatalogController {
for fragment_relation::Model {
target_fragment_id: fragment_id,
dispatcher_type,
dist_key_indices,
output_indices,
..
} in downstream_fragment_relations
{
Expand All @@ -1592,13 +1590,9 @@ impl CatalogController {
}
assert_eq!(fragment_actors.len(), 1);
let (fragment, actors) = fragment_actors.pop().unwrap();
let dispatch_strategy = DispatchStrategy {
r#type: PbDispatcherType::from(dispatcher_type) as _,
dist_key_indices: dist_key_indices.into_u32_array(),
output_indices: output_indices.into_u32_array(),
};
let dispatch_type = PbDispatcherType::from(dispatcher_type);
let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
downstream_fragments.push((dispatch_strategy, fragment));
downstream_fragments.push((dispatch_type, fragment));
}

Ok((downstream_fragments, actors))
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamScanType};
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tokio::time::{Instant, sleep};
Expand Down Expand Up @@ -506,7 +506,7 @@ impl MetadataManager {
&self,
job_id: u32,
) -> MetaResult<(
Vec<(PbDispatchStrategy, Fragment)>,
Vec<(PbDispatcherType, Fragment)>,
HashMap<ActorId, WorkerId>,
)> {
let (fragments, actors) = self
Expand Down
26 changes: 4 additions & 22 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ impl DdlController {
fragment_graph: StreamFragmentGraph,
) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
let (mut replace_table_ctx, mut stream_job_fragments) = self
.build_replace_job(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _)
.build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _)
.await?;

let target_table = streaming_job.table().unwrap();
Expand Down Expand Up @@ -1407,13 +1407,7 @@ impl DdlController {
let mut drop_table_connector_ctx = None;
let result: MetaResult<_> = try {
let (mut ctx, mut stream_job_fragments) = self
.build_replace_job(
ctx,
&streaming_job,
fragment_graph,
col_index_mapping.as_ref(),
tmp_id as _,
)
.build_replace_job(ctx, &streaming_job, fragment_graph, tmp_id as _)
.await?;
drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();

Expand Down Expand Up @@ -1845,7 +1839,6 @@ impl DdlController {
stream_ctx: StreamContext,
stream_job: &StreamingJob,
mut fragment_graph: StreamFragmentGraph,
col_index_mapping: Option<&ColIndexMapping>,
tmp_job_id: TableId,
) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
match &stream_job {
Expand Down Expand Up @@ -1904,20 +1897,9 @@ impl DdlController {

let job_type = StreamingJobType::from(stream_job);

// Map the column indices in the dispatchers with the given mapping.
let (mut downstream_fragments, downstream_actor_location) =
// Extract the downstream fragments from the fragment graph.
let (downstream_fragments, downstream_actor_location) =
self.metadata_manager.get_downstream_fragments(id).await?;
if let Some(mapping) = &col_index_mapping {
for (d, _f) in &mut downstream_fragments {
*d = mapping.rewrite_dispatch_strategy(d).ok_or_else(|| {
// The `rewrite` only fails if some column is dropped (missing) or altered (type changed).
// TODO: support altering referenced columns
MetaError::invalid_parameter(
"unable to drop or alter the column due to being referenced by downstream materialized views or sinks",
)
})?;
}
}

// build complete graph based on the table job type
let complete_graph = match &job_type {
Expand Down
Loading
Loading
0