From a2712f7a7d6f01353e4c9bc3b1bdf6821b5d38fc Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Mon, 13 Mar 2023 17:42:53 +0800 Subject: [PATCH 01/13] add generated column in proto --- proto/plan_common.proto | 7 ++ src/common/src/catalog/column.rs | 10 +- src/common/src/catalog/mod.rs | 1 + src/common/src/catalog/test_utils.rs | 1 + src/compute/tests/integration_tests.rs | 1 + src/connector/src/parser/avro/util.rs | 1 + .../src/parser/debezium/avro_parser.rs | 32 +---- src/connector/src/parser/protobuf/parser.rs | 1 + src/connector/src/source/manager.rs | 1 + src/frontend/src/binder/expr/mod.rs | 2 + .../src/catalog/system_catalog/mod.rs | 1 + src/frontend/src/catalog/table_catalog.rs | 19 +-- src/frontend/src/handler/create_source.rs | 7 +- src/frontend/src/handler/create_table.rs | 109 ++++++++++-------- src/prost/build.rs | 1 + src/source/src/source_desc.rs | 1 + src/sqlparser/src/ast/ddl.rs | 3 + src/sqlparser/src/parser.rs | 2 + .../hummock_sdk/src/filter_key_extractor.rs | 43 +++---- 19 files changed, 123 insertions(+), 120 deletions(-) diff --git a/proto/plan_common.proto b/proto/plan_common.proto index b378dd90e5365..469bd36ab2be8 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -4,6 +4,7 @@ package plan_common; import "common.proto"; import "data.proto"; +import "expr.proto"; option java_package = "com.risingwave.proto"; option optimize_for = SPEED; @@ -26,6 +27,8 @@ message ColumnDesc { // For example, when the type is created from a protobuf schema file, // this field will store the message name. string type_name = 5; + // The the column is a generated column. + GeneratedColumnDesc generated_column = 7; } message ColumnCatalog { @@ -33,6 +36,10 @@ message ColumnCatalog { bool is_hidden = 2; } +message GeneratedColumnDesc { + expr.ExprNode expr = 1; +} + message StorageTableDesc { uint32 table_id = 1; repeated ColumnDesc columns = 2; diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 63235b643c585..9489de8b0fd59 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -15,7 +15,7 @@ use std::borrow::Cow; use itertools::Itertools; -use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc}; +use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc, GeneratedColumnDesc}; use super::row_id_column_desc; use crate::catalog::{Field, ROW_ID_COLUMN_ID}; @@ -88,6 +88,7 @@ pub struct ColumnDesc { pub name: String, pub field_descs: Vec, pub type_name: String, + pub generated_column: Option, } impl ColumnDesc { @@ -98,6 +99,7 @@ impl ColumnDesc { name: String::new(), field_descs: vec![], type_name: String::new(), + generated_column: None, } } @@ -114,6 +116,7 @@ impl ColumnDesc { .map(|f| f.to_protobuf()) .collect_vec(), type_name: self.type_name.clone(), + generated_column: self.generated_column.clone(), } } @@ -156,6 +159,7 @@ impl ColumnDesc { name: name.to_string(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, } } @@ -175,6 +179,7 @@ impl ColumnDesc { name: name.to_string(), field_descs: fields, type_name: type_name.to_string(), + generated_column: None, } } @@ -189,6 +194,7 @@ impl ColumnDesc { .map(Self::from_field_without_column_id) .collect_vec(), type_name: field.type_name.clone(), + generated_column: None, } } @@ -210,6 +216,7 @@ impl From for ColumnDesc { name: prost.name, type_name: prost.type_name, field_descs, + generated_column: prost.generated_column, } } } @@ -228,6 +235,7 @@ impl From<&ColumnDesc> for PbColumnDesc { name: c.name.clone(), field_descs: 
c.field_descs.iter().map(ColumnDesc::to_protobuf).collect(), type_name: c.type_name.clone(), + generated_column: c.generated_column.clone(), } } } diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 4812f05b35cb2..0b8e25b208e73 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -88,6 +88,7 @@ pub fn row_id_column_desc() -> ColumnDesc { name: row_id_column_name(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, } } diff --git a/src/common/src/catalog/test_utils.rs b/src/common/src/catalog/test_utils.rs index 54251062b7908..e0df76709ad4c 100644 --- a/src/common/src/catalog/test_utils.rs +++ b/src/common/src/catalog/test_utils.rs @@ -56,6 +56,7 @@ impl ColumnDescTestExt for ColumnDesc { name: name.to_string(), type_name: type_name.to_string(), field_descs: fields, + generated_column: None, } } } diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 9224c4433095f..a642d16530478 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -157,6 +157,7 @@ async fn test_table_materialize() -> StreamResult<()> { name: field.name, field_descs: vec![], type_name: "".to_string(), + generated_column: None, }) .collect_vec(); let (barrier_tx, barrier_rx) = unbounded_channel(); diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index 4761855bd7970..b92c9f1d4ad8b 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -50,6 +50,7 @@ pub(crate) fn avro_field_to_column_desc( name: name.to_owned(), field_descs: vec_column, type_name: schema_name.to_string(), + generated_column: None, }) } _ => { diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 0946a32e20868..c617c8ac6545d 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -382,46 +382,22 @@ mod tests { assert_eq!(columns.len(), 4); assert_eq!( - CatColumnDesc { - data_type: DataType::Int32, - column_id: 1.into(), - name: "id".to_owned(), - field_descs: Vec::new(), - type_name: "".to_owned() - }, + CatColumnDesc::new_atomic(DataType::Int32, "id", 1), columns[0] ); assert_eq!( - CatColumnDesc { - data_type: DataType::Varchar, - column_id: 2.into(), - name: "first_name".to_owned(), - field_descs: Vec::new(), - type_name: "".to_owned() - }, + CatColumnDesc::new_atomic(DataType::Varchar, "first_name", 2), columns[1] ); assert_eq!( - CatColumnDesc { - data_type: DataType::Varchar, - column_id: 3.into(), - name: "last_name".to_owned(), - field_descs: Vec::new(), - type_name: "".to_owned() - }, + CatColumnDesc::new_atomic(DataType::Varchar, "last_name", 3), columns[2] ); assert_eq!( - CatColumnDesc { - data_type: DataType::Varchar, - column_id: 4.into(), - name: "email".to_owned(), - field_descs: Vec::new(), - type_name: "".to_owned() - }, + CatColumnDesc::new_atomic(DataType::Varchar, "email", 4), columns[3] ); } diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 0fa3ea870a3cf..775241ea6de51 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -154,6 +154,7 @@ impl ProtobufParserConfig { column_type: Some(field_type.to_protobuf()), field_descs, type_name: m.full_name().to_string(), + generated_column: None, }) } else { *index += 1; diff --git 
a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs index 25afb4f55917c..ce0eb74064ad0 100644 --- a/src/connector/src/source/manager.rs +++ b/src/connector/src/source/manager.rs @@ -72,6 +72,7 @@ impl From<&SourceColumnDesc> for ColumnDesc { name: s.name.clone(), field_descs: s.fields.clone(), type_name: "".to_string(), + generated_column: None, } } } diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 2f315f6491673..43bd037aa950f 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -447,6 +447,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result { name: f.name.real_value(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, }) }) .collect::>>()? @@ -459,6 +460,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result { name: column_def.name.real_value(), field_descs, type_name: "".to_string(), + generated_column: None, }) } diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 8bdd8c5fbebe2..b2ef50bf96585 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -131,6 +131,7 @@ macro_rules! def_sys_catalog { name: col.1.to_string(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, }, is_hidden: false, }) diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index b5b2664954e53..0b043aac89c79 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -548,22 +548,11 @@ mod tests { column_id: ColumnId::new(1), name: "country".to_string(), field_descs: vec![ - ColumnDesc { - data_type: DataType::Varchar, - column_id: ColumnId::new(2), - name: "address".to_string(), - field_descs: vec![], - type_name: String::new(), - }, - ColumnDesc { - data_type: DataType::Varchar, - column_id: ColumnId::new(3), - name: "zipcode".to_string(), - field_descs: vec![], - type_name: String::new(), - } + ColumnDesc::new_atomic(DataType::Varchar, "address", 2), + ColumnDesc::new_atomic(DataType::Varchar, "zipcode", 3), ], - type_name: ".test.Country".to_string() + type_name: ".test.Country".to_string(), + generated_column: None, }, is_hidden: false } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index b5ed06085086f..f9af28ca25974 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -41,7 +41,7 @@ use risingwave_sqlparser::ast::{ SourceWatermark, }; -use super::create_table::bind_sql_table_constraints; +use super::create_table::bind_sql_table_column_constraints; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::ColumnId; @@ -460,6 +460,7 @@ fn check_and_add_timestamp_column( name: KAFKA_TIMESTAMP_COLUMN_NAME.to_string(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, }; column_descs.push(kafka_timestamp_column); } @@ -590,7 +591,7 @@ pub async fn handle_create_source( check_and_add_timestamp_column(&with_properties, &mut column_descs, &mut col_id_gen); - let (mut columns, mut pk_column_ids, mut row_id_index) = bind_sql_table_constraints( + let (mut columns, mut pk_column_ids, mut row_id_index) = bind_sql_table_column_constraints( column_descs.clone(), pk_column_id_from_columns, stmt.constraints, @@ -615,6 +616,8 @@ pub async fn handle_create_source( 
debug_assert!(is_column_ids_dedup(&columns)); + + let watermark_descs = bind_source_watermark(&session, name.clone(), stmt.source_watermarks, &columns)?; // TODO(yuhao): allow multiple watermark on source. diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 5276ae8c8fe56..f43f37858b5fb 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -29,7 +29,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_sqlparser::ast::{ ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, SourceWatermark, - TableConstraint, + TableConstraint, ColumnOptionDef, }; use super::create_source::resolve_source_schema; @@ -42,6 +42,7 @@ use crate::handler::HandlerArgs; use crate::optimizer::plan_node::LogicalSource; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot}; +use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; use crate::{Binder, TableCatalog, WithOptions}; @@ -145,55 +146,69 @@ pub fn bind_sql_columns( ) .into()); } - for option_def in options { - match option_def.option { - ColumnOption::Unique { is_primary: true } => { - if pk_column_id.is_some() { - return Err(ErrorCode::BindError( - "multiple primary keys are not allowed".into(), - ) - .into()); - } - pk_column_id = Some(column_id); - } - _ => { - return Err(ErrorCode::NotImplemented( - format!("column constraints \"{}\"", option_def), - None.into(), - ) - .into()) - } - } - } - check_valid_column_name(&name.real_value())?; - let field_descs = if let AstDataType::Struct(fields) = &data_type { - fields - .iter() - .map(bind_struct_field) - .collect::>>()? - } else { - vec![] - }; - column_descs.push(ColumnDesc { - data_type: bind_data_type(&data_type)?, - column_id, - name: name.real_value(), - field_descs, - type_name: "".to_string(), - }); + } Ok((column_descs, pk_column_id)) } -/// Binds table constraints given the binding results from column definitions. +/// Binds constraits that can be only specified in column definitions. +pub fn bind_sql_column_constraints(session: &SessionImpl, table_name: String, column_catalogs: Vec, column_constrains: Vec) { + let binder = Binder::new(session); + binder.bind_columns_to_context(table_name, column_catalogs); + for option_def in column_constrains { + match option_def.option { + ColumnOption::GeneratedColumns(expr) => { + + } + ColumnOption::Unique { is_primary: true } => { + // Bind primary key in `bind_sql_table_column_constraints` + } + _ => { + return Err(ErrorCode::NotImplemented( + format!("column constraints \"{}\"", option_def), + None.into(), + ) + .into()) + } + } + } +} + +/// Binds constraints that can be specified in both column definitions and table definition. +/// /// It returns the columns together with `pk_column_ids`, and an optional row id column index if /// added. 
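/// For example, `CREATE TABLE t (a INT PRIMARY KEY, b INT)` yields `pk_column_ids == [a's id]`
/// and no row id index, while `CREATE TABLE t (a INT, b INT)` declares no primary key, so a
/// hidden `_row_id` column is appended and its index is returned.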
-pub fn bind_sql_table_constraints( +pub fn bind_sql_table_column_constraints( column_descs: Vec, - pk_column_id_from_columns: Option, - constraints: Vec, + column_constrains: Vec, + table_constraints: Vec, ) -> Result<(Vec, Vec, Option)> { + let mut pk_column_id_from_columns = None; + for option_def in options { + match option_def.option { + ColumnOption::Unique { is_primary: true } => { + if pk_column_id_from_columns.is_some() { + return Err(ErrorCode::BindError( + "multiple primary keys are not allowed".into(), + ) + .into()); + } + pk_column_id_from_columns = Some(column_id); + } + ColumnOption::GeneratedColumns(_) => { + // Bind generated columns in `bind_sql_column_constraints` + } + _ => { + return Err(ErrorCode::NotImplemented( + format!("column constraints \"{}\"", option_def), + None.into(), + ) + .into()) + } + } + } + let mut pk_column_names = vec![]; for constraint in constraints { match constraint { @@ -291,14 +306,15 @@ pub(crate) async fn gen_create_table_plan_with_source( mut col_id_gen: ColumnIdGenerator, append_only: bool, ) -> Result<(PlanRef, Option, ProstTable)> { + let session = context.session_ctx(); let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?; let properties = context.with_options().inner().clone().into_iter().collect(); let (mut columns, mut pk_column_ids, mut row_id_index) = - bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?; + bind_pk_constraints(column_descs, pk_column_id_from_columns, constraints)?; let watermark_descs = bind_source_watermark( - context.session_ctx(), + session, table_name.real_value(), source_watermarks, &columns, @@ -372,7 +388,7 @@ pub(crate) fn gen_create_table_plan_without_bind( version: Option, ) -> Result<(PlanRef, Option, ProstTable)> { let (columns, pk_column_ids, row_id_index) = - bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?; + bind_sql_table_column_constraints(column_descs, pk_column_id_from_columns, constraints)?; let watermark_descs = bind_source_watermark( context.session_ctx(), @@ -705,6 +721,7 @@ mod tests { Err("column \"v3\" named in key does not exist"), ), ] { + let session = SessionImpl::mock(); let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap(); let risingwave_sqlparser::ast::Statement::CreateTable { columns, @@ -713,8 +730,8 @@ mod tests { } = ast.remove(0) else { panic!("test case should be create table") }; let actual: Result<_> = (|| { let (column_descs, pk_column_id_from_columns) = - bind_sql_columns(columns, &mut ColumnIdGenerator::new_initial())?; - let (_, pk_column_ids, _) = bind_sql_table_constraints( + bind_sql_columns(&session, columns, &mut ColumnIdGenerator::new_initial())?; + let (_, pk_column_ids, _) = bind_sql_table_column_constraints( column_descs, pk_column_id_from_columns, constraints, diff --git a/src/prost/build.rs b/src/prost/build.rs index bdf1702f56712..7142327d4bda5 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -64,6 +64,7 @@ fn main() -> Result<(), Box> { .type_attribute("expr.FunctionCall", "#[derive(Eq, Hash)]") .type_attribute("expr.UserDefinedFunction", "#[derive(Eq, Hash)]") .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]") + .type_attribute("plan_common.GeneratedColumnDesc", "#[derive(Eq, Hash)]") .out_dir(out_dir.as_path()) .compile(&protos, &[proto_dir.to_string()]) .expect("Failed to compile grpc!"); diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index d6c0036b6fb28..ca92334e3565c 100644 
--- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -214,6 +214,7 @@ pub mod test_utils { name: f.name.clone(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, } .to_protobuf(), ), diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 21f24bb67e4cb..4a6dfb6ba9bbe 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -371,6 +371,8 @@ pub enum ColumnOption { /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT` /// - ... DialectSpecific(Vec), + /// AS ( )` + GeneratedColumns(Expr), } impl fmt::Display for ColumnOption { @@ -403,6 +405,7 @@ impl fmt::Display for ColumnOption { } Check(expr) => write!(f, "CHECK ({})", expr), DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")), + GeneratedColumns(expr) => write!(f, "AS {}", expr), } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 2b6654d0cc4df..904d22ef32026 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2170,6 +2170,8 @@ impl Parser { let expr = self.parse_expr()?; self.expect_token(&Token::RParen)?; Ok(Some(ColumnOption::Check(expr))) + } else if self.parse_keyword(Keyword::AS) { + Ok(Some(ColumnOption::GeneratedColumns(self.parse_expr()?))) } else { Ok(None) } diff --git a/src/storage/hummock_sdk/src/filter_key_extractor.rs b/src/storage/hummock_sdk/src/filter_key_extractor.rs index 1d9817646a58e..6f948de089923 100644 --- a/src/storage/hummock_sdk/src/filter_key_extractor.rs +++ b/src/storage/hummock_sdk/src/filter_key_extractor.rs @@ -385,53 +385,40 @@ mod tests { columns: vec![ ProstColumnCatalog { column_desc: Some( - (&ColumnDesc { - data_type: DataType::Int64, - column_id: ColumnId::new(0), - name: "_row_id".to_string(), - field_descs: vec![], - type_name: "".to_string(), - }) + ( + &ColumnDesc::new_atomic(DataType::Int64, "_row_id", 0) + ) .into(), ), is_hidden: true, }, ProstColumnCatalog { column_desc: Some( - (&ColumnDesc { - data_type: DataType::Int64, - column_id: ColumnId::new(0), - name: "col_1".to_string(), - field_descs: vec![], - type_name: "Int64".to_string(), - }) + ( + &ColumnDesc::new_atomic(DataType::Int64, "col_1", 0) + ) .into(), + ), is_hidden: false, }, ProstColumnCatalog { column_desc: Some( - (&ColumnDesc { - data_type: DataType::Float64, - column_id: ColumnId::new(0), - name: "col_2".to_string(), - field_descs: vec![], - type_name: "Float64".to_string(), - }) + ( + &ColumnDesc::new_atomic(DataType::Float64, "col_2", 0) + ) .into(), + ), is_hidden: false, }, ProstColumnCatalog { column_desc: Some( - (&ColumnDesc { - data_type: DataType::Varchar, - column_id: ColumnId::new(0), - name: "col_3".to_string(), - field_descs: vec![], - type_name: "Varchar".to_string(), - }) + ( + &ColumnDesc::new_atomic(DataType::Varchar, "col_3", 0) + ) .into(), + ), is_hidden: false, }, From f3e387c38e918fe2761e882a8e4a508faf92345d Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 15 Mar 2023 13:34:52 +0800 Subject: [PATCH 02/13] refactor --- src/common/src/catalog/column.rs | 2 +- src/frontend/src/handler/create_source.rs | 22 +- src/frontend/src/handler/create_table.rs | 196 ++++++++++-------- src/frontend/src/handler/create_table_as.rs | 2 +- src/frontend/src/optimizer/mod.rs | 28 ++- .../src/scheduler/distributed/query.rs | 24 +-- .../hummock_sdk/src/filter_key_extractor.rs | 25 +-- src/stream/src/executor/lookup/tests.rs | 34 +-- 8 files changed, 162 insertions(+), 171 deletions(-) diff --git a/src/common/src/catalog/column.rs 
b/src/common/src/catalog/column.rs index 9489de8b0fd59..36fe2e864e67c 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -15,7 +15,7 @@ use std::borrow::Cow; use itertools::Itertools; -use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc, GeneratedColumnDesc}; +use risingwave_pb::plan_common::{GeneratedColumnDesc, PbColumnCatalog, PbColumnDesc}; use super::row_id_column_desc; use crate::catalog::{Field, ROW_ID_COLUMN_ID}; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 1d5f32440d5b3..5af137d9a09ad 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -46,7 +46,7 @@ use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::ColumnId; use crate::expr::Expr; -use crate::handler::create_table::{bind_sql_columns, ColumnIdGenerator}; +use crate::handler::create_table::{bind_sql_columns, ColumnIdGenerator, bind_sql_column_constraints}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::KAFKA_TIMESTAMP_COLUMN_NAME; use crate::session::SessionImpl; @@ -594,17 +594,19 @@ pub async fn handle_create_source( .collect(); let mut col_id_gen = ColumnIdGenerator::new_initial(); - - let (mut column_descs, pk_column_id_from_columns) = - bind_sql_columns(stmt.columns, &mut col_id_gen)?; + + let mut column_descs = + bind_sql_columns(stmt.columns.clone(), &mut col_id_gen)?; check_and_add_timestamp_column(&with_properties, &mut column_descs, &mut col_id_gen); let (mut columns, mut pk_column_ids, mut row_id_index) = bind_sql_table_column_constraints( - column_descs.clone(), - pk_column_id_from_columns, - stmt.constraints, - )?; + column_descs, + stmt.columns.clone(), + stmt.constraints, + )?; + + if row_id_index.is_none() { return Err(ErrorCode::InvalidInputSyntax( "Source does not support PRIMARY KEY constraint, please use \"CREATE TABLE\" instead" @@ -625,13 +627,13 @@ pub async fn handle_create_source( debug_assert!(is_column_ids_dedup(&columns)); - - let watermark_descs = bind_source_watermark(&session, name.clone(), stmt.source_watermarks, &columns)?; // TODO(yuhao): allow multiple watermark on source. 
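// Only a single watermark per source is supported for now. As an illustrative shape (the
// exact clause is whatever `bind_source_watermark` above accepted), this is what reaches here:
//   CREATE SOURCE s (v INT, ts TIMESTAMPTZ, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) ...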
assert!(watermark_descs.len() <= 1); + bind_sql_column_constraints(&session, name.clone(), &mut columns, stmt.columns)?; + let row_id_index = row_id_index.map(|index| index as _); let pk_column_ids = pk_column_ids.into_iter().map(Into::into).collect(); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index a4c45d887b3e4..ad92c541760a3 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -26,10 +26,11 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::catalog::{ Source as ProstSource, StreamSourceInfo, Table as ProstTable, WatermarkDesc, }; +use risingwave_pb::plan_common::GeneratedColumnDesc; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_sqlparser::ast::{ ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, SourceWatermark, - TableConstraint, ColumnOptionDef, + TableConstraint, }; use super::create_source::resolve_source_schema; @@ -37,6 +38,7 @@ use super::RwPgResponse; use crate::binder::{bind_data_type, bind_struct_field}; use crate::catalog::table_catalog::TableVersion; use crate::catalog::{check_valid_column_name, ColumnId}; +use crate::expr::Expr; use crate::handler::create_source::{bind_source_watermark, UPSTREAM_SOURCE_KEY}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::LogicalSource; @@ -120,9 +122,7 @@ impl ColumnIdGenerator { pub fn bind_sql_columns( columns: Vec, col_id_gen: &mut ColumnIdGenerator, -) -> Result<(Vec, Option)> { - // In `ColumnDef`, pk can contain only one column. So we use `Option` rather than `Vec`. - let mut pk_column_id = None; +) -> Result> { let mut column_descs = Vec::with_capacity(columns.len()); for column in columns { @@ -134,8 +134,9 @@ pub fn bind_sql_columns( name, data_type, collation, - options, + .. } = column; + let data_type = data_type.ok_or(ErrorCode::InvalidInputSyntax( "data type is not specified".into(), ))?; @@ -147,32 +148,56 @@ pub fn bind_sql_columns( .into()); } + check_valid_column_name(&name.real_value())?; + + let field_descs = if let AstDataType::Struct(fields) = &data_type { + fields + .iter() + .map(bind_struct_field) + .collect::>>()? + } else { + vec![] + }; + column_descs.push(ColumnDesc { + data_type: bind_data_type(&data_type)?, + column_id, + name: name.real_value(), + field_descs, + type_name: "".to_string(), + generated_column: None, + }); } - Ok((column_descs, pk_column_id)) + Ok(column_descs) } /// Binds constraits that can be only specified in column definitions. 
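/// Today this means generated columns: for a definition like `v2 INT AS v1 + 1`, the `AS`
/// expression is bound against the other columns and stored into the column catalog as a
/// `GeneratedColumnDesc`.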
-pub fn bind_sql_column_constraints(session: &SessionImpl, table_name: String, column_catalogs: Vec, column_constrains: Vec) { - let binder = Binder::new(session); - binder.bind_columns_to_context(table_name, column_catalogs); - for option_def in column_constrains { - match option_def.option { - ColumnOption::GeneratedColumns(expr) => { - - } - ColumnOption::Unique { is_primary: true } => { - // Bind primary key in `bind_sql_table_column_constraints` - } - _ => { - return Err(ErrorCode::NotImplemented( - format!("column constraints \"{}\"", option_def), - None.into(), - ) - .into()) +pub fn bind_sql_column_constraints(session: &SessionImpl, table_name: String, column_catalogs: &mut[ColumnCatalog], columns: Vec) -> Result<()> { + let mut binder = Binder::new(session); + binder.bind_columns_to_context(table_name.clone(), column_catalogs.to_vec())?; + for column in columns { + + for option_def in column.options { + match option_def.option { + ColumnOption::GeneratedColumns(expr) => { + let idx = binder.get_column_binding_index(table_name.clone(), &column.name.real_value())?; + let expr_node = binder.bind_expr(expr)?.to_expr_proto(); + column_catalogs[idx].column_desc.generated_column = Some(GeneratedColumnDesc{expr: Some(expr_node)}); + } + ColumnOption::Unique { is_primary: true } => { + // Bind primary key in `bind_sql_table_column_constraints` + } + _ => { + return Err(ErrorCode::NotImplemented( + format!("column constraints \"{}\"", option_def), + None.into(), + ) + .into()) + } } } } + Ok(()) } /// Binds constraints that can be specified in both column definitions and table definition. @@ -180,37 +205,46 @@ pub fn bind_sql_column_constraints(session: &SessionImpl, table_name: String, co /// It returns the columns together with `pk_column_ids`, and an optional row id column index if /// added. pub fn bind_sql_table_column_constraints( - column_descs: Vec, - column_constrains: Vec, + columns_descs: Vec, + columns_defs: Vec, table_constraints: Vec, ) -> Result<(Vec, Vec, Option)> { - let mut pk_column_id_from_columns = None; - for option_def in options { - match option_def.option { - ColumnOption::Unique { is_primary: true } => { - if pk_column_id_from_columns.is_some() { - return Err(ErrorCode::BindError( - "multiple primary keys are not allowed".into(), + let mut pk_column_names = vec![]; + // Mapping from column name to column id. + let name_to_id = columns_descs + .iter() + .map(|c| (c.name.as_str(), c.column_id)) + .collect::>(); + + // Bind column constraints + for column in columns_defs { + for option_def in column.options { + match option_def.option { + ColumnOption::Unique { is_primary: true } => { + if !pk_column_names.is_empty() { + return Err(ErrorCode::BindError( + "multiple primary keys are not allowed".into(), + ) + .into()); + } + pk_column_names.push(column.name.real_value()); + } + ColumnOption::GeneratedColumns(_) => { + // Bind generated columns in `bind_sql_column_constraints` + } + _ => { + return Err(ErrorCode::NotImplemented( + format!("column constraints \"{}\"", option_def), + None.into(), ) - .into()); + .into()) } - pk_column_id_from_columns = Some(column_id); - } - ColumnOption::GeneratedColumns(_) => { - // Bind generated columns in `bind_sql_column_constraints` - } - _ => { - return Err(ErrorCode::NotImplemented( - format!("column constraints \"{}\"", option_def), - None.into(), - ) - .into()) } } } - let mut pk_column_names = vec![]; - for constraint in constraints { + // Bind table constraints. 
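+ // e.g. a table-level `PRIMARY KEY (a, b)` is handled here, while a column-level
+ // `a INT PRIMARY KEY` was already collected into `pk_column_names` above.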
+ for constraint in table_constraints { match constraint { TableConstraint::Unique { name: _, @@ -223,7 +257,7 @@ pub fn bind_sql_table_column_constraints( ) .into()); } - pk_column_names = columns; + pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec(); } _ => { return Err(ErrorCode::NotImplemented( @@ -234,36 +268,19 @@ pub fn bind_sql_table_column_constraints( } } } - let mut pk_column_ids = match (pk_column_id_from_columns, pk_column_names.is_empty()) { - (Some(_), false) => { - return Err(ErrorCode::BindError("multiple primary keys are not allowed".into()).into()) - } - (None, true) => { - // We don't have a pk column now, so we need to add row_id column as the pk column - // later. - vec![] - } - (Some(cid), true) => vec![cid], - (None, false) => { - let name_to_id = column_descs - .iter() - .map(|c| (c.name.as_str(), c.column_id)) - .collect::>(); - pk_column_names - .iter() - .map(|ident| { - let name = ident.real_value(); - name_to_id.get(name.as_str()).copied().ok_or_else(|| { - ErrorCode::BindError(format!( - "column \"{name}\" named in key does not exist" - )) - }) - }) - .try_collect()? - } - }; - let mut columns_catalog = column_descs + let mut pk_column_ids: Vec<_> = pk_column_names + .iter() + .map(|name| { + name_to_id.get(name.as_str()).copied().ok_or_else(|| { + ErrorCode::BindError(format!( + "column \"{name}\" named in key does not exist" + )) + }) + }) + .try_collect()?; + + let mut columns_catalog = columns_descs .into_iter() .map(|c| { // All columns except `_row_id` or starts with `_rw` should be visible. @@ -299,7 +316,7 @@ pub fn bind_sql_table_column_constraints( pub(crate) async fn gen_create_table_plan_with_source( context: OptimizerContext, table_name: ObjectName, - columns: Vec, + column_defs: Vec, constraints: Vec, source_schema: SourceSchema, source_watermarks: Vec, @@ -307,11 +324,11 @@ pub(crate) async fn gen_create_table_plan_with_source( append_only: bool, ) -> Result<(PlanRef, Option, ProstTable)> { let session = context.session_ctx(); - let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?; + let column_descs = bind_sql_columns(column_defs.clone(), &mut col_id_gen)?; let mut properties = context.with_options().inner().clone().into_iter().collect(); let (mut columns, mut pk_column_ids, mut row_id_index) = - bind_pk_constraints(column_descs, pk_column_id_from_columns, constraints)?; + bind_sql_table_column_constraints(column_descs, column_defs.clone(), constraints)?; let watermark_descs = bind_source_watermark( session, @@ -334,6 +351,8 @@ pub(crate) async fn gen_create_table_plan_with_source( ) .await?; + bind_sql_column_constraints(session, table_name.real_value(), &mut columns, column_defs)?; + gen_table_plan_inner( context.into(), table_name, @@ -361,14 +380,14 @@ pub(crate) fn gen_create_table_plan( append_only: bool, ) -> Result<(PlanRef, Option, ProstTable)> { let definition = context.normalized_sql().to_owned(); - let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?; + let column_descs = bind_sql_columns(columns.clone(), &mut col_id_gen)?; let properties = context.with_options().inner().clone().into_iter().collect(); gen_create_table_plan_without_bind( context, table_name, column_descs, - pk_column_id_from_columns, + columns, constraints, properties, definition, @@ -383,7 +402,7 @@ pub(crate) fn gen_create_table_plan_without_bind( context: OptimizerContext, table_name: ObjectName, column_descs: Vec, - pk_column_id_from_columns: Option, + column_defs: 
Vec, constraints: Vec, properties: HashMap, definition: String, @@ -391,8 +410,8 @@ pub(crate) fn gen_create_table_plan_without_bind( append_only: bool, version: Option, ) -> Result<(PlanRef, Option, ProstTable)> { - let (columns, pk_column_ids, row_id_index) = - bind_sql_table_column_constraints(column_descs, pk_column_id_from_columns, constraints)?; + let (mut columns, pk_column_ids, row_id_index) = + bind_sql_table_column_constraints(column_descs, column_defs.clone(), constraints)?; let watermark_descs = bind_source_watermark( context.session_ctx(), @@ -401,6 +420,8 @@ pub(crate) fn gen_create_table_plan_without_bind( &columns, )?; + bind_sql_column_constraints(context.session_ctx(), table_name.real_value(), &mut columns, column_defs)?; + gen_table_plan_inner( context.into(), table_name, @@ -727,7 +748,6 @@ mod tests { Err("column \"v3\" named in key does not exist"), ), ] { - let session = SessionImpl::mock(); let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap(); let risingwave_sqlparser::ast::Statement::CreateTable { columns, @@ -735,11 +755,11 @@ mod tests { .. } = ast.remove(0) else { panic!("test case should be create table") }; let actual: Result<_> = (|| { - let (column_descs, pk_column_id_from_columns) = - bind_sql_columns(&session, columns, &mut ColumnIdGenerator::new_initial())?; + let column_descs = + bind_sql_columns(columns.clone(), &mut ColumnIdGenerator::new_initial())?; let (_, pk_column_ids, _) = bind_sql_table_column_constraints( column_descs, - pk_column_id_from_columns, + columns, constraints, )?; Ok(pk_column_ids) diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 4db3dbd5cfa60..488dffe8729c7 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -97,7 +97,7 @@ pub async fn handle_create_as( context, table_name.clone(), column_descs, - None, + vec![], vec![], properties, "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 91dac433c14ce..d1b0c6893fac4 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -38,6 +38,7 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_pb::catalog::WatermarkDesc; +use risingwave_pb::plan_common::GeneratedColumnDesc; use self::heuristic_optimizer::ApplyOrder; use self::plan_node::{ @@ -50,7 +51,7 @@ use self::plan_visitor::InputRefValidator; use self::property::RequiredDist; use self::rule::*; use crate::catalog::table_catalog::{TableType, TableVersion}; -use crate::expr::InputRef; +use crate::expr::{InputRef, ExprRewriter}; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, }; @@ -368,6 +369,31 @@ impl PlanRoot { Ok(plan) } + pub fn gen_optional_generated_column_project(columns: Vec) -> Option { + let col_mapping = { + let mut mapping = Vec::with_capacity(columns.len()); + let mut cur = 0; + for (idx, column) in columns.iter().enumerate() { + if column.column_desc.generated_column.is_some() { + mapping[idx] = Some(cur); + cur += 1; + } else { + mapping[idx] = None; + } + } + ColIndexMapping::new(mapping) + }; + for column in columns { + let GeneratedColumnDesc{expr} = column.column_desc.generated_column.unwrap(); + if let Some(expr) = expr { + + } + + } + None + + } + /// 
Optimize and generate a create table plan. #[allow(clippy::too_many_arguments)] pub fn gen_table_plan( diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 4689f2bdd8748..8bef1cd57e6ac 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -519,27 +519,9 @@ pub(crate) mod tests { stream_key: vec![], pk: vec![], columns: vec![ - ColumnDesc { - data_type: DataType::Int32, - column_id: 0.into(), - name: "a".to_string(), - type_name: String::new(), - field_descs: vec![], - }, - ColumnDesc { - data_type: DataType::Float64, - column_id: 1.into(), - name: "b".to_string(), - type_name: String::new(), - field_descs: vec![], - }, - ColumnDesc { - data_type: DataType::Int64, - column_id: 2.into(), - name: "_row_id".to_string(), - type_name: String::new(), - field_descs: vec![], - }, + ColumnDesc::new_atomic(DataType::Int32, "a", 0), + ColumnDesc::new_atomic(DataType::Float64, "b", 1), + ColumnDesc::new_atomic(DataType::Int64, "c", 2), ], distribution_key: vec![2], append_only: false, diff --git a/src/storage/hummock_sdk/src/filter_key_extractor.rs b/src/storage/hummock_sdk/src/filter_key_extractor.rs index 12bbaae1ca72a..1279fe562d629 100644 --- a/src/storage/hummock_sdk/src/filter_key_extractor.rs +++ b/src/storage/hummock_sdk/src/filter_key_extractor.rs @@ -338,7 +338,7 @@ mod tests { use bytes::{BufMut, BytesMut}; use itertools::Itertools; - use risingwave_common::catalog::{ColumnDesc, ColumnId}; + use risingwave_common::catalog::ColumnDesc; use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; @@ -383,40 +383,25 @@ mod tests { columns: vec![ ProstColumnCatalog { column_desc: Some( - ( - &ColumnDesc::new_atomic(DataType::Int64, "_row_id", 0) - ) - .into(), + (&ColumnDesc::new_atomic(DataType::Int64, "_row_id", 0)).into(), ), is_hidden: true, }, ProstColumnCatalog { column_desc: Some( - ( - &ColumnDesc::new_atomic(DataType::Int64, "col_1", 0) - ) - .into(), - + (&ColumnDesc::new_atomic(DataType::Int64, "col_1", 0)).into(), ), is_hidden: false, }, ProstColumnCatalog { column_desc: Some( - ( - &ColumnDesc::new_atomic(DataType::Float64, "col_2", 0) - ) - .into(), - + (&ColumnDesc::new_atomic(DataType::Float64, "col_2", 0)).into(), ), is_hidden: false, }, ProstColumnCatalog { column_desc: Some( - ( - &ColumnDesc::new_atomic(DataType::Varchar, "col_3", 0) - ) - .into(), - + (&ColumnDesc::new_atomic(DataType::Varchar, "col_3", 0)).into(), ), is_hidden: false, }, diff --git a/src/stream/src/executor/lookup/tests.rs b/src/stream/src/executor/lookup/tests.rs index 66898dbd0d48a..6c15acf04fa0d 100644 --- a/src/stream/src/executor/lookup/tests.rs +++ b/src/stream/src/executor/lookup/tests.rs @@ -20,7 +20,7 @@ use futures::StreamExt; use itertools::Itertools; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::array::StreamChunk; -use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId}; +use risingwave_common::catalog::{ColumnDesc, ConflictBehavior, Field, Schema, TableId}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_storage::memory::MemoryStateStore; @@ -35,20 +35,8 @@ use crate::executor::{ fn arrangement_col_descs() -> Vec { vec![ - ColumnDesc { - data_type: DataType::Int64, - column_id: ColumnId::new(0), - name: 
"rowid_column".to_string(), - field_descs: vec![], - type_name: "".to_string(), - }, - ColumnDesc { - data_type: DataType::Int64, - column_id: ColumnId::new(1), - name: "join_column".to_string(), - field_descs: vec![], - type_name: "".to_string(), - }, + ColumnDesc::new_atomic(DataType::Int64, "rowid_column", 0), + ColumnDesc::new_atomic(DataType::Int64, "join_column", 1), ] } @@ -152,20 +140,8 @@ async fn create_arrangement( /// | b | | | 3 -> 4 | fn create_source() -> Box { let columns = vec![ - ColumnDesc { - data_type: DataType::Int64, - column_id: ColumnId::new(1), - name: "join_column".to_string(), - field_descs: vec![], - type_name: "".to_string(), - }, - ColumnDesc { - data_type: DataType::Int64, - column_id: ColumnId::new(2), - name: "rowid_column".to_string(), - field_descs: vec![], - type_name: "".to_string(), - }, + ColumnDesc::new_atomic(DataType::Int64, "join_column", 1), + ColumnDesc::new_atomic(DataType::Int64, "rowid_column", 2), ]; // Prepare source chunks. From e5710ea6b4c38588415195f52e2d0c3c37a9cb89 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 15 Mar 2023 13:50:42 +0800 Subject: [PATCH 03/13] new for stream --- src/frontend/src/binder/expr/function.rs | 4 ++-- src/frontend/src/binder/mod.rs | 8 ++++---- src/frontend/src/handler/create_sink.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 040ba156da5b6..3c565bb809eac 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -295,7 +295,7 @@ impl Binder { fn now() -> Handle { Box::new(move |binder, mut inputs| { binder.ensure_now_function_allowed()?; - if !binder.in_create_mv { + if !binder.in_streaming { inputs.push(ExprImpl::from(Literal::new( Some(ScalarImpl::Int64((binder.bind_timestamp_ms * 1000) as i64)), DataType::Timestamptz, @@ -627,7 +627,7 @@ impl Binder { } fn ensure_now_function_allowed(&self) -> Result<()> { - if self.in_create_mv + if self.in_streaming && !matches!( self.context.clause, Some(Clause::Where) | Some(Clause::Having) diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index b52c2b79d6d1a..09242981f1930 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -85,15 +85,15 @@ pub struct Binder { next_share_id: ShareId, search_path: SearchPath, - /// Whether the Binder is binding an MV. - in_create_mv: bool, + /// Whether the Binder is binding an MV/SINK. + in_streaming: bool, /// `ShareId`s identifying shared views. shared_views: HashMap, } impl Binder { - fn new_inner(session: &SessionImpl, in_create_mv: bool) -> Binder { + fn new_inner(session: &SessionImpl, in_streaming: bool) -> Binder { let now_ms = session .env() .hummock_snapshot_manager() @@ -112,7 +112,7 @@ impl Binder { next_values_id: 0, next_share_id: 0, search_path: session.config().get_search_path(), - in_create_mv, + in_streaming, shared_views: HashMap::new(), } } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index b7aad90ac95b7..d0746db170e50 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -79,7 +79,7 @@ pub fn gen_sink_plan( let definition = context.normalized_sql().to_owned(); let bound = { - let mut binder = Binder::new(session); + let mut binder = Binder::new_for_stream(session); binder.bind_query(*query)? 
}; From c03eae4696cf52ab3fa6b6cb1b9f691fc6157c9b Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 22 Mar 2023 03:10:33 +0800 Subject: [PATCH 04/13] work --- src/batch/src/executor/insert.rs | 1 - src/batch/src/executor/values.rs | 1 - src/common/src/catalog/column.rs | 9 ++ src/frontend/src/binder/insert.rs | 36 +++++-- src/frontend/src/catalog/table_catalog.rs | 14 +++ src/frontend/src/handler/create_source.rs | 17 ++- src/frontend/src/handler/create_table.rs | 100 +++++++++++++----- src/frontend/src/optimizer/mod.rs | 46 +++----- .../src/optimizer/plan_node/generic/agg.rs | 2 +- .../src/optimizer/plan_node/logical_source.rs | 75 ++++++++++++- src/frontend/src/planner/relation.rs | 3 +- src/frontend/src/utils/rewrite_index.rs | 8 +- src/source/src/table.rs | 4 +- 13 files changed, 235 insertions(+), 81 deletions(-) diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index 00388a054f704..0d78da1fe3aaa 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -126,7 +126,6 @@ impl InsertExecutor { let row_id_col = I64Array::from_iter(repeat(None).take(cap)); columns.insert(row_id_index, row_id_col.into()) } - let stream_chunk = StreamChunk::new(vec![Op::Insert; cap], columns, vis.into_visibility()); diff --git a/src/batch/src/executor/values.rs b/src/batch/src/executor/values.rs index c6cc8bf83c611..38c1478d232ef 100644 --- a/src/batch/src/executor/values.rs +++ b/src/batch/src/executor/values.rs @@ -94,7 +94,6 @@ impl ValuesExecutor { .collect(); let chunk = DataChunk::new(columns, chunk_size); - yield chunk } } diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 7f9b56cfda3fb..29e0d7db7cb13 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -206,6 +206,10 @@ impl ColumnDesc { pub fn from_field_without_column_id(field: &Field) -> Self { Self::from_field_with_column_id(field, 0) } + + pub fn is_generated_column(&self) -> bool { + self.generated_column.is_some() + } } impl From for ColumnDesc { @@ -257,6 +261,11 @@ impl ColumnCatalog { self.is_hidden } + /// If the column is a generated column + pub fn is_generated(&self) -> bool { + self.column_desc.generated_column.is_some() + } + /// Get a reference to the column desc's data type. pub fn data_type(&self) -> &DataType { &self.column_desc.data_type diff --git a/src/frontend/src/binder/insert.rs b/src/frontend/src/binder/insert.rs index a0cfe457183d1..5bb95ac31062e 100644 --- a/src/frontend/src/binder/insert.rs +++ b/src/frontend/src/binder/insert.rs @@ -81,19 +81,41 @@ impl Binder { let table_id = table_catalog.id; let owner = table_catalog.owner; let table_version_id = table_catalog.version_id().expect("table must be versioned"); - let columns_to_insert = table_catalog - .columns - .clone() - .into_iter() - .filter(|c| !c.is_hidden()) - .collect_vec(); - let row_id_index = table_catalog.row_id_index; + let columns_to_insert = table_catalog.columns_to_insert().cloned().collect_vec(); let expected_types: Vec = columns_to_insert .iter() .map(|c| c.data_type().clone()) .collect(); + let generated_column_names: HashSet<_> = table_catalog.generated_column_names().collect(); + for query_col in &columns { + let query_col_name = query_col.real_value(); + if generated_column_names.contains(query_col_name.as_str()) { + return Err(RwError::from(ErrorCode::BindError(format!( + "cannot insert a non-DEFAULT value into column \"{0}\". 
Column \"{0}\" is a generated column.", + &query_col_name + )))); + } + } + + // TODO(yuhao): refine this if row_id is always the last column. + // + // `row_id_index` in bin insert operation should rule out generated column + let row_id_index = { + if let Some(row_id_index) = table_catalog.row_id_index { + let mut cnt = 0; + for col in table_catalog.columns().iter().take(row_id_index + 1) { + if col.is_generated() { + cnt += 1; + } + } + Some(row_id_index - cnt) + } else { + None + } + }; + // When the column types of `source` query do not match `expected_types`, casting is // needed. // diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index ad262515c8767..5a461120ac68b 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -372,6 +372,20 @@ impl TableCatalog { handle_pk_conflict_behavior: self.conflict_behavior_type, } } + + /// Get columns excluding hidden columns and generated golumns. + pub fn columns_to_insert(&self) -> impl Iterator { + self.columns + .iter() + .filter(|c| !c.is_hidden() && !c.is_generated()) + } + + pub fn generated_column_names(&self) -> impl Iterator { + self.columns + .iter() + .filter(|c| c.is_generated()) + .map(|c| c.name()) + } } impl From for TableCatalog { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 5af137d9a09ad..06c3b73727ea4 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -46,7 +46,9 @@ use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::ColumnId; use crate::expr::Expr; -use crate::handler::create_table::{bind_sql_columns, ColumnIdGenerator, bind_sql_column_constraints}; +use crate::handler::create_table::{ + bind_sql_column_constraints, bind_sql_columns, ColumnIdGenerator, +}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::KAFKA_TIMESTAMP_COLUMN_NAME; use crate::session::SessionImpl; @@ -594,18 +596,13 @@ pub async fn handle_create_source( .collect(); let mut col_id_gen = ColumnIdGenerator::new_initial(); - - let mut column_descs = - bind_sql_columns(stmt.columns.clone(), &mut col_id_gen)?; - check_and_add_timestamp_column(&with_properties, &mut column_descs, &mut col_id_gen); + let mut column_descs = bind_sql_columns(stmt.columns.clone(), &mut col_id_gen)?; - let (mut columns, mut pk_column_ids, mut row_id_index) = bind_sql_table_column_constraints( - column_descs, - stmt.columns.clone(), - stmt.constraints, - )?; + check_and_add_timestamp_column(&with_properties, &mut column_descs, &mut col_id_gen); + let (mut columns, mut pk_column_ids, mut row_id_index) = + bind_sql_table_column_constraints(column_descs, stmt.columns.clone(), stmt.constraints)?; if row_id_index.is_none() { return Err(ErrorCode::InvalidInputSyntax( diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index ad92c541760a3..1ac5a9eb0991a 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -38,7 +38,7 @@ use super::RwPgResponse; use crate::binder::{bind_data_type, bind_struct_field}; use crate::catalog::table_catalog::TableVersion; use crate::catalog::{check_valid_column_name, ColumnId}; -use crate::expr::Expr; +use crate::expr::{Expr, ExprImpl}; use crate::handler::create_source::{bind_source_watermark, UPSTREAM_SOURCE_KEY}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::LogicalSource; @@ -147,9 +147,9 @@ pub fn 
bind_sql_columns( ) .into()); } - + check_valid_column_name(&name.real_value())?; - + let field_descs = if let AstDataType::Struct(fields) = &data_type { fields .iter() @@ -171,18 +171,68 @@ pub fn bind_sql_columns( Ok(column_descs) } -/// Binds constraits that can be only specified in column definitions. -pub fn bind_sql_column_constraints(session: &SessionImpl, table_name: String, column_catalogs: &mut[ColumnCatalog], columns: Vec) -> Result<()> { - let mut binder = Binder::new(session); +fn check_generated_column_constraints( + column_name: &String, + expr: &ExprImpl, + column_catalogs: &[ColumnCatalog], + generated_column_names: &[String], +) -> Result<()> { + let input_refs = expr.collect_input_refs(column_catalogs.len()); + for idx in input_refs.ones() { + let referred_generated_column = &column_catalogs[idx].column_desc.name; + if generated_column_names + .iter() + .any(|c| c == referred_generated_column) + { + return Err(ErrorCode::BindError( + format!("Generated can not reference another generated column, but here generated column \"{}\" referenced another generated column \"{}\"", column_name, referred_generated_column), + ) + .into()); + } + } + Ok(()) +} + +/// Binds constraints that can be only specified in column definitions. +pub fn bind_sql_column_constraints( + session: &SessionImpl, + table_name: String, + column_catalogs: &mut [ColumnCatalog], + columns: Vec, +) -> Result<()> { + let generated_column_names = { + let mut names = vec![]; + for column in &columns { + for option_def in &column.options { + if let ColumnOption::GeneratedColumns(_) = option_def.option { + names.push(column.name.real_value()); + break; + } + } + } + names + }; + + let mut binder = Binder::new_for_stream(session); binder.bind_columns_to_context(table_name.clone(), column_catalogs.to_vec())?; for column in columns { - for option_def in column.options { match option_def.option { ColumnOption::GeneratedColumns(expr) => { - let idx = binder.get_column_binding_index(table_name.clone(), &column.name.real_value())?; - let expr_node = binder.bind_expr(expr)?.to_expr_proto(); - column_catalogs[idx].column_desc.generated_column = Some(GeneratedColumnDesc{expr: Some(expr_node)}); + let idx = binder + .get_column_binding_index(table_name.clone(), &column.name.real_value())?; + let expr_impl = binder.bind_expr(expr)?; + + check_generated_column_constraints( + &column.name.real_value(), + &expr_impl, + column_catalogs, + &generated_column_names, + )?; + + column_catalogs[idx].column_desc.generated_column = Some(GeneratedColumnDesc { + expr: Some(expr_impl.to_expr_proto()), + }); } ColumnOption::Unique { is_primary: true } => { // Bind primary key in `bind_sql_table_column_constraints` @@ -201,7 +251,7 @@ pub fn bind_sql_column_constraints(session: &SessionImpl, table_name: String, co } /// Binds constraints that can be specified in both column definitions and table definition. -/// +/// /// It returns the columns together with `pk_column_ids`, and an optional row id column index if /// added. 
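/// Generated columns are only recognized and skipped here; their expressions are bound
/// separately in `bind_sql_column_constraints` once the column catalogs are finalized.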
pub fn bind_sql_table_column_constraints( @@ -270,15 +320,13 @@ pub fn bind_sql_table_column_constraints( } let mut pk_column_ids: Vec<_> = pk_column_names - .iter() - .map(|name| { - name_to_id.get(name.as_str()).copied().ok_or_else(|| { - ErrorCode::BindError(format!( - "column \"{name}\" named in key does not exist" - )) + .iter() + .map(|name| { + name_to_id.get(name.as_str()).copied().ok_or_else(|| { + ErrorCode::BindError(format!("column \"{name}\" named in key does not exist")) + }) }) - }) - .try_collect()?; + .try_collect()?; let mut columns_catalog = columns_descs .into_iter() @@ -420,7 +468,12 @@ pub(crate) fn gen_create_table_plan_without_bind( &columns, )?; - bind_sql_column_constraints(context.session_ctx(), table_name.real_value(), &mut columns, column_defs)?; + bind_sql_column_constraints( + context.session_ctx(), + table_name.real_value(), + &mut columns, + column_defs, + )?; gen_table_plan_inner( context.into(), @@ -757,11 +810,8 @@ mod tests { let actual: Result<_> = (|| { let column_descs = bind_sql_columns(columns.clone(), &mut ColumnIdGenerator::new_initial())?; - let (_, pk_column_ids, _) = bind_sql_table_column_constraints( - column_descs, - columns, - constraints, - )?; + let (_, pk_column_ids, _) = + bind_sql_table_column_constraints(column_descs, columns, constraints)?; Ok(pk_column_ids) })(); match (expected, actual) { diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index d1b0c6893fac4..8424c9f5d40a9 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -38,12 +38,11 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_pb::catalog::WatermarkDesc; -use risingwave_pb::plan_common::GeneratedColumnDesc; use self::heuristic_optimizer::ApplyOrder; use self::plan_node::{ - BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamProject, - StreamRowIdGen, StreamSink, StreamWatermarkFilter, + BatchProject, Convention, LogicalProject, LogicalSource, StreamDml, StreamMaterialize, + StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter, }; use self::plan_visitor::has_batch_exchange; #[cfg(debug_assertions)] @@ -51,7 +50,7 @@ use self::plan_visitor::InputRefValidator; use self::property::RequiredDist; use self::rule::*; use crate::catalog::table_catalog::{TableType, TableVersion}; -use crate::expr::{InputRef, ExprRewriter}; +use crate::expr::InputRef; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, }; @@ -369,31 +368,6 @@ impl PlanRoot { Ok(plan) } - pub fn gen_optional_generated_column_project(columns: Vec) -> Option { - let col_mapping = { - let mut mapping = Vec::with_capacity(columns.len()); - let mut cur = 0; - for (idx, column) in columns.iter().enumerate() { - if column.column_desc.generated_column.is_some() { - mapping[idx] = Some(cur); - cur += 1; - } else { - mapping[idx] = None; - } - } - ColIndexMapping::new(mapping) - }; - for column in columns { - let GeneratedColumnDesc{expr} = column.column_desc.generated_column.unwrap(); - if let Some(expr) = expr { - - } - - } - None - - } - /// Optimize and generate a create table plan. 
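/// The streaming plan assembled below is roughly `StreamDml -> StreamProject (generated
/// columns, if any) -> StreamWatermarkFilter (if any) -> ... -> StreamMaterialize`.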
#[allow(clippy::too_many_arguments)] pub fn gen_table_plan( @@ -412,10 +386,22 @@ impl PlanRoot { stream_plan = StreamDml::new( stream_plan, append_only, - columns.iter().map(|c| c.column_desc.clone()).collect(), + columns + .iter() + .filter_map(|c| (!c.is_generated()).then_some(c.column_desc.clone())) + .collect(), ) .into(); + // Add generated columns. + let exprs = LogicalSource::gen_optional_generated_column_project_exprs( + columns.iter().map(|c| c.column_desc.clone()).collect(), + )?; + if let Some(exprs) = exprs { + let logical_project = LogicalProject::new(stream_plan, exprs); + stream_plan = StreamProject::new(logical_project).into(); + } + // Add WatermarkFilter node. if !watermark_descs.is_empty() { stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into(); diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index a0322817b4aea..944fb2b60ee05 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -550,7 +550,7 @@ impl PlanAggCall { }); // modify filter - let mut rewriter = IndexRewriter { mapping }; + let mut rewriter = IndexRewriter::new(mapping); self.filter.conjunctions.iter_mut().for_each(|x| { *x = rewriter.rewrite_expr(x.clone()); }); diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 5300db3ca2610..94abbe37b9f78 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -21,6 +21,7 @@ use std::rc::Rc; use risingwave_common::catalog::{ColumnDesc, Schema}; use risingwave_common::error::Result; use risingwave_connector::source::DataType; +use risingwave_pb::plan_common::GeneratedColumnDesc; use super::stream_watermark_filter::StreamWatermarkFilter; use super::{ @@ -29,12 +30,12 @@ use super::{ }; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::ColumnId; -use crate::expr::{Expr, ExprImpl, ExprType}; +use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, InputRef}; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::utils::{ColIndexMapping, Condition}; +use crate::utils::{ColIndexMapping, Condition, IndexRewriter}; use crate::TableCatalog; /// For kafka source, we attach a hidden column [`KAFKA_TIMESTAMP_COLUMN_NAME`] to it, so that we @@ -83,6 +84,76 @@ impl LogicalSource { } } + pub fn gen_optional_generated_column_project_exprs( + column_descs: Vec, + ) -> Result>> { + if !column_descs.iter().any(|c| c.generated_column.is_some()) { + return Ok(None); + } + + let col_mapping = { + let mut mapping = vec![None; column_descs.len()]; + let mut cur = 0; + for (idx, column_desc) in column_descs.iter().enumerate() { + if column_desc.generated_column.is_none() { + mapping[idx] = Some(cur); + cur += 1; + } else { + mapping[idx] = None; + } + } + ColIndexMapping::new(mapping) + }; + + let mut rewriter = IndexRewriter::new(col_mapping); + let mut exprs = Vec::with_capacity(column_descs.len()); + let mut cur = 0; + for column_desc in column_descs { + let ret_data_type = column_desc.data_type.clone(); + if let Some(generated_column) = column_desc.generated_column { + let GeneratedColumnDesc { expr } = generated_column; + // TODO(yuhao): avoid this `from_expr_proto`. 
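+ // The generated expression was serialized into the catalog as an `ExprNode` proto by
+ // `bind_sql_column_constraints`; it is deserialized here and its input refs are shifted
+ // with `IndexRewriter`, since generated columns do not exist in the upstream chunk.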
+                let proj_expr = rewriter.rewrite_expr(ExprImpl::from_expr_proto(&expr.unwrap())?);
+                exprs.push(proj_expr);
+            } else {
+                let input_ref = InputRef {
+                    data_type: ret_data_type,
+                    index: cur,
+                };
+                cur += 1;
+                exprs.push(ExprImpl::InputRef(Box::new(input_ref)));
+            }
+        }
+
+        Ok(Some(exprs))
+    }
+
+    pub fn create(
+        source_catalog: Option<Rc<SourceCatalog>>,
+        column_descs: Vec<ColumnDesc>,
+        pk_col_ids: Vec<ColumnId>,
+        row_id_index: Option<usize>,
+        gen_row_id: bool,
+        for_table: bool,
+        ctx: OptimizerContextRef,
+    ) -> Result<PlanRef> {
+        let source = Self::new(
+            source_catalog,
+            column_descs.clone(),
+            pk_col_ids,
+            row_id_index,
+            gen_row_id,
+            for_table,
+            ctx,
+        );
+        let exprs = Self::gen_optional_generated_column_project_exprs(column_descs)?;
+        if let Some(exprs) = exprs {
+            Ok(LogicalProject::new(source.into(), exprs).into())
+        } else {
+            Ok(source.into())
+        }
+    }
+
     pub(super) fn column_names(&self) -> Vec<String> {
         self.schema()
             .fields()
diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs
index f3f1946d1f51c..1103d30322e03 100644
--- a/src/frontend/src/planner/relation.rs
+++ b/src/frontend/src/planner/relation.rs
@@ -86,7 +86,7 @@ impl Planner {
         let pk_col_ids = source.catalog.pk_col_ids.clone();
         let row_id_index = source.catalog.row_id_index;
         let gen_row_id = source.catalog.append_only;
-        Ok(LogicalSource::new(
+        LogicalSource::create(
             Some(Rc::new(source.catalog)),
             column_descs,
             pk_col_ids,
@@ -95,7 +95,6 @@
             false,
             self.ctx(),
         )
-        .into())
     }
 
     pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
diff --git a/src/frontend/src/utils/rewrite_index.rs b/src/frontend/src/utils/rewrite_index.rs
index 2107cb55d5219..74b4bf3986f73 100644
--- a/src/frontend/src/utils/rewrite_index.rs
+++ b/src/frontend/src/utils/rewrite_index.rs
@@ -16,7 +16,13 @@ use super::ColIndexMapping;
 use crate::expr::{ExprImpl, ExprRewriter, InputRef};
 
 pub struct IndexRewriter {
-    pub mapping: ColIndexMapping,
+    mapping: ColIndexMapping,
+}
+
+impl IndexRewriter {
+    pub fn new(mapping: ColIndexMapping) -> Self {
+        Self { mapping }
+    }
 }
 
 impl ExprRewriter for IndexRewriter {
diff --git a/src/source/src/table.rs b/src/source/src/table.rs
index 1e1b9e16f5d70..778b33846d525 100644
--- a/src/source/src/table.rs
+++ b/src/source/src/table.rs
@@ -100,7 +100,9 @@ impl TableDmlHandle {
 
         #[cfg(debug_assertions)]
         risingwave_common::util::schema_check::schema_check(
-            self.column_descs.iter().map(|c| &c.data_type),
+            self.column_descs
+                .iter()
+                .filter_map(|c| (!c.is_generated_column()).then_some(&c.data_type)),
             chunk.columns(),
         )
         .expect("table source write chunk schema check failed");

From 78887f67dc1e44fc4fc89ef79d36cf31ac1d6bc3 Mon Sep 17 00:00:00 2001
From: Yuhao Su
Date: Wed, 22 Mar 2023 03:32:14 +0800
Subject: [PATCH 05/13] dashboard

---
 dashboard/proto/gen/plan_common.ts | 37 +++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/dashboard/proto/gen/plan_common.ts b/dashboard/proto/gen/plan_common.ts
index 384b8d11554c4..ae12ba8f709a0 100644
--- a/dashboard/proto/gen/plan_common.ts
+++ b/dashboard/proto/gen/plan_common.ts
@@ -1,6 +1,7 @@
 /* eslint-disable */
 import { ColumnOrder } from "./common";
 import { DataType } from "./data";
+import { ExprNode } from "./expr";
 
 export const protobufPackage = "plan_common";
 
@@ -202,6 +203,8 @@ export interface ColumnDesc {
    * this field will store the message name.
    */
   typeName: string;
+  /** The column is a generated column.
*/ + generatedColumn: GeneratedColumnDesc | undefined; } export interface ColumnCatalog { @@ -209,6 +212,10 @@ export interface ColumnCatalog { isHidden: boolean; } +export interface GeneratedColumnDesc { + expr: ExprNode | undefined; +} + export interface StorageTableDesc { tableId: number; columns: ColumnDesc[]; @@ -255,7 +262,7 @@ export const Field = { }; function createBaseColumnDesc(): ColumnDesc { - return { columnType: undefined, columnId: 0, name: "", fieldDescs: [], typeName: "" }; + return { columnType: undefined, columnId: 0, name: "", fieldDescs: [], typeName: "", generatedColumn: undefined }; } export const ColumnDesc = { @@ -266,6 +273,7 @@ export const ColumnDesc = { name: isSet(object.name) ? String(object.name) : "", fieldDescs: Array.isArray(object?.fieldDescs) ? object.fieldDescs.map((e: any) => ColumnDesc.fromJSON(e)) : [], typeName: isSet(object.typeName) ? String(object.typeName) : "", + generatedColumn: isSet(object.generatedColumn) ? GeneratedColumnDesc.fromJSON(object.generatedColumn) : undefined, }; }, @@ -281,6 +289,8 @@ export const ColumnDesc = { obj.fieldDescs = []; } message.typeName !== undefined && (obj.typeName = message.typeName); + message.generatedColumn !== undefined && + (obj.generatedColumn = message.generatedColumn ? GeneratedColumnDesc.toJSON(message.generatedColumn) : undefined); return obj; }, @@ -293,6 +303,9 @@ export const ColumnDesc = { message.name = object.name ?? ""; message.fieldDescs = object.fieldDescs?.map((e) => ColumnDesc.fromPartial(e)) || []; message.typeName = object.typeName ?? ""; + message.generatedColumn = (object.generatedColumn !== undefined && object.generatedColumn !== null) + ? GeneratedColumnDesc.fromPartial(object.generatedColumn) + : undefined; return message; }, }; @@ -327,6 +340,28 @@ export const ColumnCatalog = { }, }; +function createBaseGeneratedColumnDesc(): GeneratedColumnDesc { + return { expr: undefined }; +} + +export const GeneratedColumnDesc = { + fromJSON(object: any): GeneratedColumnDesc { + return { expr: isSet(object.expr) ? ExprNode.fromJSON(object.expr) : undefined }; + }, + + toJSON(message: GeneratedColumnDesc): unknown { + const obj: any = {}; + message.expr !== undefined && (obj.expr = message.expr ? ExprNode.toJSON(message.expr) : undefined); + return obj; + }, + + fromPartial, I>>(object: I): GeneratedColumnDesc { + const message = createBaseGeneratedColumnDesc(); + message.expr = (object.expr !== undefined && object.expr !== null) ? ExprNode.fromPartial(object.expr) : undefined; + return message; + }, +}; + function createBaseStorageTableDesc(): StorageTableDesc { return { tableId: 0, From 087caf59e980ea9581381cc37a032eb2471fac6f Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 23 Mar 2023 02:19:15 +0800 Subject: [PATCH 06/13] fix --- proto/plan_common.proto | 2 +- src/batch/src/executor/insert.rs | 1 + src/batch/src/executor/values.rs | 1 + src/frontend/src/handler/create_table.rs | 12 ++------- .../src/optimizer/plan_node/generic/source.rs | 25 ++++++++++++++----- 5 files changed, 24 insertions(+), 17 deletions(-) diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 8508a7e34949f..84b7ffd76553a 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -28,7 +28,7 @@ message ColumnDesc { // this field will store the message name. string type_name = 5; // The the column is a generated column. 
- GeneratedColumnDesc generated_column = 7; + GeneratedColumnDesc generated_column = 6; } message ColumnCatalog { diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index 67972b3e7777e..52fb724d96a28 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -125,6 +125,7 @@ impl InsertExecutor { let row_id_col = SerialArray::from_iter(repeat(None).take(cap)); columns.insert(row_id_index, row_id_col.into()) } + let stream_chunk = StreamChunk::new(vec![Op::Insert; cap], columns, vis.into_visibility()); diff --git a/src/batch/src/executor/values.rs b/src/batch/src/executor/values.rs index 429fbc183b9ea..106af69b57a62 100644 --- a/src/batch/src/executor/values.rs +++ b/src/batch/src/executor/values.rs @@ -94,6 +94,7 @@ impl ValuesExecutor { .collect(); let chunk = DataChunk::new(columns, chunk_size); + yield chunk } } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 846db91ed3a95..d6d8b22a817a6 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -540,21 +540,13 @@ fn gen_table_plan_inner( ) .into(); - let mut required_cols = FixedBitSet::with_capacity(source_node.schema().len()); - required_cols.toggle_range(..); - let mut out_names = source_node.schema().names(); - - if let Some(row_id_index) = row_id_index { - required_cols.toggle(row_id_index); - out_names.remove(row_id_index); - } - + let required_cols = FixedBitSet::with_capacity(source_node.schema().len()); let mut plan_root = PlanRoot::new( source_node, RequiredDist::Any, Order::any(), required_cols, - out_names, + vec![], ); if append_only && row_id_index.is_none() { diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index b1beb588e8337..8a650007f3458 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -49,15 +49,20 @@ pub struct Source { impl GenericPlanNode for Source { fn schema(&self) -> Schema { - let fields = self.column_descs.iter().map(Into::into).collect(); + let fields = self.non_generated_columns().map(Into::into).collect(); + // let fields = self.column_descs.iter().map(Into::into).collect(); Schema { fields } } fn logical_pk(&self) -> Option> { let mut id_to_idx = HashMap::new(); - self.column_descs.iter().enumerate().for_each(|(idx, c)| { - id_to_idx.insert(c.column_id, idx); - }); + // self.column_descs.iter().filter(|c| !c.is_generated_column()).enumerate().for_each(|(idx, + // c)| { + self.non_generated_columns() + .enumerate() + .for_each(|(idx, c)| { + id_to_idx.insert(c.column_id, idx); + }); self.pk_col_ids .iter() .map(|c| id_to_idx.get(c).copied()) @@ -70,11 +75,12 @@ impl GenericPlanNode for Source { fn functional_dependency(&self) -> FunctionalDependencySet { let pk_indices = self.logical_pk(); + let non_generated_columns_count = self.non_generated_columns().count(); match pk_indices { Some(pk_indices) => { - FunctionalDependencySet::with_key(self.column_descs.len(), &pk_indices) + FunctionalDependencySet::with_key(non_generated_columns_count, &pk_indices) } - None => FunctionalDependencySet::new(self.column_descs.len()), + None => FunctionalDependencySet::new(non_generated_columns_count), } } } @@ -108,4 +114,11 @@ impl Source { builder.build(vec![], 1) } + + /// Non-generated columns + fn non_generated_columns(&self) -> impl Iterator { + self.column_descs + .iter() + .filter(|c| 
!c.is_generated_column()) + } } From 28ff462758bf483a34e80d31c0f7a8d6b9740e83 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 23 Mar 2023 17:52:46 +0800 Subject: [PATCH 07/13] add tests; ban detele and create source with generated columns --- batch-ddl-debug-junit.xml | 7 + e2e_test/ddl/table.slt | 263 +----------------- e2e_test/ddl/table/generated_columns.slt.part | 37 +++ e2e_test/ddl/table/table.slt.part | 261 +++++++++++++++++ .../tests/testdata/generated_columns.yaml | 11 + src/frontend/src/binder/delete.rs | 9 +- src/frontend/src/catalog/table_catalog.rs | 4 + src/frontend/src/handler/create_source.rs | 7 + 8 files changed, 337 insertions(+), 262 deletions(-) create mode 100644 batch-ddl-debug-junit.xml create mode 100644 e2e_test/ddl/table/generated_columns.slt.part create mode 100644 e2e_test/ddl/table/table.slt.part create mode 100644 src/frontend/planner_test/tests/testdata/generated_columns.yaml diff --git a/batch-ddl-debug-junit.xml b/batch-ddl-debug-junit.xml new file mode 100644 index 0000000000000..3f1e6d54e2d90 --- /dev/null +++ b/batch-ddl-debug-junit.xml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/e2e_test/ddl/table.slt b/e2e_test/ddl/table.slt index 525982f2c579d..f892974cbcd60 100644 --- a/e2e_test/ddl/table.slt +++ b/e2e_test/ddl/table.slt @@ -1,261 +1,2 @@ -# Create a table. -statement ok -create table ddl_t (v1 int); - -statement ok -explain select v1 from ddl_t; - -# Create another table with duplicated name. -statement error -create table ddl_t (v2 int); - -# Create a table using a empty string. -statement error -create table ""(v2 int); - -statement ok -create table if not exists ddl_t (v2 int); - -# Drop the table. -statement ok -drop table ddl_t; - -# Drop it again. -statement error -drop table ddl_t; - -# Create another table with the same name. -statement ok -create table ddl_t (v2 int); - -statement ok -explain select v2 from ddl_t; - -# Create a mview on top of it. -statement ok -create materialized view ddl_mv as select v2 from ddl_t; - -statement ok -explain select v2 from ddl_t; - -statement ok -explain create sink sink_t from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' ); - -statement ok -explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' ); - -# Create a mview with duplicated name. -statement error -create materialized view ddl_mv as select v2 from ddl_t; - -# Drop the table before dropping the mview. -statement error -drop table ddl_t; - -# We're not allowed to drop the mview using `DROP TABLE`. -statement error -drop table ddl_mv; - -# Drop the mview. -statement ok -drop materialized view ddl_mv; - -# Drop it again. -statement error -drop materialized view ddl_mv; - -# We're not allowed to drop the table using `DROP MATERIALIZED VIEW`. -statement error -drop materialized view ddl_t; - -# Now, we can drop the base table. -statement ok -drop table ddl_t; - -# Create table concludes struct column. -statement ok -create table st (v1 int, v2 struct>); - -statement ok -drop table st - -# We test the case sensitivity of table name and column name. -statement ok -create table t1 (v1 int); - -statement ok -drop table T1; - -statement ok -create table T1 (v1 int); - -statement ok -drop table t1; - -statement ok -create table "T1" (v1 int); - -# Since we have not really bound the columns in the insert statement -# this test case cannot be enabled. 
-# statement error -# insert into "T1" ("V1") values (1); - -statement error -drop table t1; - -statement error -drop table T1; - -statement ok -drop table "T1"; - -statement ok -create table "T2" ("V1" int); - -# Since we have not really bound the columns in the insert statement -# this test case cannot be enabled. -# statement error -# insert into "T2" (V1) values (1); - -statement ok -insert into "T2" ("V1") values (1); - -statement ok -drop table "T2" - -statement error -create table C1 (c1 varchar(5)); - -statement error -create table t (v1 int not null); - -statement error -create table t (v1 varchar collate "en_US"); - -# Test create-table-as -statement ok -create table t as select 1; - -statement ok -drop table t; - -statement error -create table t as select 1,2; - -statement ok -create table t as select 1 as a, 2 as b; - -statement ok -drop table t; - -statement ok -create table t(v1) as select 1; - -statement ok -drop table t; - -statement ok -create table t (v1 int,v2 int); - -statement ok -insert into t values (1,1); - -statement ok -insert into t values (1,1); - -statement ok -insert into t values (1,1); - -statement ok -flush - -statement ok -create table t1 as select * from t; - -statement ok -flush; - -query I -select * from t1; ----- -1 1 -1 1 -1 1 - -statement ok -drop table t1; - -statement ok -drop table t; - -statement ok -create table t AS SELECT * FROM generate_series(0, 5,1) tbl(i); - -statement ok -flush; - -query I -select * from t order by i; ----- -0 -1 -2 -3 -4 -5 - -statement ok -drop table t; - -statement ok -create table t (v1 int); - -statement ok -insert into t values (1); - -statement ok -insert into t values (2); - -statement ok -create table n1 as select sum(v1) from t; - -statement ok -flush; - -query I -select * from n1; ----- -3 - -statement error -create table n1 (v2 int); - -statement error -create table n1 as select * from t; - -statement ok -create table if not exists n1 (v2 int); - -statement ok -drop table n1; - -statement ok -drop table t; - -statement ok -create table t (v1 int,v2 int); - -statement ok -create table t1(a,b) as select v1,v2 from t; - -statement ok -create table t2(a) as select v1,v2 from t; - -statement ok -drop table t; - -statement ok -drop table t1; - -statement ok -drop table t2; +include ./table/*.slt.part +include ./table/generated_columns.slt.part \ No newline at end of file diff --git a/e2e_test/ddl/table/generated_columns.slt.part b/e2e_test/ddl/table/generated_columns.slt.part new file mode 100644 index 0000000000000..04aa4878486a3 --- /dev/null +++ b/e2e_test/ddl/table/generated_columns.slt.part @@ -0,0 +1,37 @@ +# Create a table with generated columns. +statement ok +create table t1 (v1 int as v2-1, v2 int, v3 int as v2+1); + +statement ok +insert into t1 (v2) values (1), (2); + +statement ok +flush; + +query III +select * from t1; +---- +0 1 2 +1 2 3 + +statement ok +drop table t1; + +# Create a table with generated columns. +statement ok +create table t2 (v1 int, v2 int as v1+1); + +statement ok +insert into t2 values (1), (2); + +statement ok +flush; + +query II +select * from t2; +---- +1 2 +2 3 + +statement ok +drop table t2; \ No newline at end of file diff --git a/e2e_test/ddl/table/table.slt.part b/e2e_test/ddl/table/table.slt.part new file mode 100644 index 0000000000000..525982f2c579d --- /dev/null +++ b/e2e_test/ddl/table/table.slt.part @@ -0,0 +1,261 @@ +# Create a table. 
+statement ok +create table ddl_t (v1 int); + +statement ok +explain select v1 from ddl_t; + +# Create another table with duplicated name. +statement error +create table ddl_t (v2 int); + +# Create a table using a empty string. +statement error +create table ""(v2 int); + +statement ok +create table if not exists ddl_t (v2 int); + +# Drop the table. +statement ok +drop table ddl_t; + +# Drop it again. +statement error +drop table ddl_t; + +# Create another table with the same name. +statement ok +create table ddl_t (v2 int); + +statement ok +explain select v2 from ddl_t; + +# Create a mview on top of it. +statement ok +create materialized view ddl_mv as select v2 from ddl_t; + +statement ok +explain select v2 from ddl_t; + +statement ok +explain create sink sink_t from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' ); + +statement ok +explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' ); + +# Create a mview with duplicated name. +statement error +create materialized view ddl_mv as select v2 from ddl_t; + +# Drop the table before dropping the mview. +statement error +drop table ddl_t; + +# We're not allowed to drop the mview using `DROP TABLE`. +statement error +drop table ddl_mv; + +# Drop the mview. +statement ok +drop materialized view ddl_mv; + +# Drop it again. +statement error +drop materialized view ddl_mv; + +# We're not allowed to drop the table using `DROP MATERIALIZED VIEW`. +statement error +drop materialized view ddl_t; + +# Now, we can drop the base table. +statement ok +drop table ddl_t; + +# Create table concludes struct column. +statement ok +create table st (v1 int, v2 struct>); + +statement ok +drop table st + +# We test the case sensitivity of table name and column name. +statement ok +create table t1 (v1 int); + +statement ok +drop table T1; + +statement ok +create table T1 (v1 int); + +statement ok +drop table t1; + +statement ok +create table "T1" (v1 int); + +# Since we have not really bound the columns in the insert statement +# this test case cannot be enabled. +# statement error +# insert into "T1" ("V1") values (1); + +statement error +drop table t1; + +statement error +drop table T1; + +statement ok +drop table "T1"; + +statement ok +create table "T2" ("V1" int); + +# Since we have not really bound the columns in the insert statement +# this test case cannot be enabled. 
+# statement error +# insert into "T2" (V1) values (1); + +statement ok +insert into "T2" ("V1") values (1); + +statement ok +drop table "T2" + +statement error +create table C1 (c1 varchar(5)); + +statement error +create table t (v1 int not null); + +statement error +create table t (v1 varchar collate "en_US"); + +# Test create-table-as +statement ok +create table t as select 1; + +statement ok +drop table t; + +statement error +create table t as select 1,2; + +statement ok +create table t as select 1 as a, 2 as b; + +statement ok +drop table t; + +statement ok +create table t(v1) as select 1; + +statement ok +drop table t; + +statement ok +create table t (v1 int,v2 int); + +statement ok +insert into t values (1,1); + +statement ok +insert into t values (1,1); + +statement ok +insert into t values (1,1); + +statement ok +flush + +statement ok +create table t1 as select * from t; + +statement ok +flush; + +query I +select * from t1; +---- +1 1 +1 1 +1 1 + +statement ok +drop table t1; + +statement ok +drop table t; + +statement ok +create table t AS SELECT * FROM generate_series(0, 5,1) tbl(i); + +statement ok +flush; + +query I +select * from t order by i; +---- +0 +1 +2 +3 +4 +5 + +statement ok +drop table t; + +statement ok +create table t (v1 int); + +statement ok +insert into t values (1); + +statement ok +insert into t values (2); + +statement ok +create table n1 as select sum(v1) from t; + +statement ok +flush; + +query I +select * from n1; +---- +3 + +statement error +create table n1 (v2 int); + +statement error +create table n1 as select * from t; + +statement ok +create table if not exists n1 (v2 int); + +statement ok +drop table n1; + +statement ok +drop table t; + +statement ok +create table t (v1 int,v2 int); + +statement ok +create table t1(a,b) as select v1,v2 from t; + +statement ok +create table t2(a) as select v1,v2 from t; + +statement ok +drop table t; + +statement ok +drop table t1; + +statement ok +drop table t2; diff --git a/src/frontend/planner_test/tests/testdata/generated_columns.yaml b/src/frontend/planner_test/tests/testdata/generated_columns.yaml new file mode 100644 index 0000000000000..48785276e175e --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/generated_columns.yaml @@ -0,0 +1,11 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- name: table with generated columns + sql: | + explain create table t1 (v1 int as v2-1, v2 int, v3 int as v2+1); + explain_output: | + StreamMaterialize { columns: [v1, v2, v3, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "overwrite" } + └─StreamExchange { dist: HashShard(_row_id) } + └─StreamRowIdGen { row_id_index: 3 } + └─StreamProject { exprs: [(v2 - 1:Int32) as $expr1, v2, (v2 + 1:Int32) as $expr2, _row_id] } + └─StreamDml { columns: [v2, _row_id] } + └─StreamSource diff --git a/src/frontend/src/binder/delete.rs b/src/frontend/src/binder/delete.rs index 2de1a7e0e182d..62a268e94bab5 100644 --- a/src/frontend/src/binder/delete.rs +++ b/src/frontend/src/binder/delete.rs @@ -13,7 +13,7 @@ // limitations under the License. 
 use risingwave_common::catalog::{Schema, TableVersionId};
-use risingwave_common::error::Result;
+use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_sqlparser::ast::{Expr, ObjectName, SelectItem};
 
 use super::statement::RewriteExprsRecursive;
@@ -77,6 +77,13 @@ impl Binder {
         let owner = table_catalog.owner;
         let table_version_id = table_catalog.version_id().expect("table must be versioned");
 
+        // TODO(yuhao): delete from table with generated columns
+        if table_catalog.has_generated_column() {
+            return Err(RwError::from(ErrorCode::BindError(
+                "Deleting from a table with generated columns has not been implemented.".to_string(),
+            )));
+        }
+
         let table = self.bind_table(schema_name, &table_name, None)?;
         let (returning_list, fields) = self.bind_returning_list(returning_items)?;
         let returning = !returning_list.is_empty();
diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs
index 8ee3ec5f0f6d6..9fc4073c48d67 100644
--- a/src/frontend/src/catalog/table_catalog.rs
+++ b/src/frontend/src/catalog/table_catalog.rs
@@ -384,6 +384,10 @@ impl TableCatalog {
             .filter(|c| c.is_generated())
             .map(|c| c.name())
     }
+
+    pub fn has_generated_column(&self) -> bool {
+        self.columns.iter().any(|c| c.is_generated())
+    }
 }
 
 impl From<PbTable> for TableCatalog {
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index aaa4c9c4189d0..3c0f316f86b84 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -631,6 +631,13 @@ pub async fn handle_create_source(
 
     bind_sql_column_constraints(&session, name.clone(), &mut columns, stmt.columns)?;
 
+    if columns.iter().any(|c| c.is_generated()) {
+        // TODO(yuhao): allow generated columns on source
+        return Err(RwError::from(ErrorCode::BindError(
+            "Generated columns on a source have not been implemented.".to_string(),
+        )));
+    }
+
     let row_id_index = row_id_index.map(|index| index as _);
     let pk_column_ids = pk_column_ids.into_iter().map(Into::into).collect();
 

From 83816270deaf1f8f67986f8d6fb911ea30b470ac Mon Sep 17 00:00:00 2001
From: Yuhao Su
Date: Thu, 23 Mar 2023 17:57:24 +0800
Subject: [PATCH 08/13] new line

---
 e2e_test/ddl/table.slt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/e2e_test/ddl/table.slt b/e2e_test/ddl/table.slt
index f892974cbcd60..a1c6581480a89 100644
--- a/e2e_test/ddl/table.slt
+++ b/e2e_test/ddl/table.slt
@@ -1,2 +1,2 @@
 include ./table/*.slt.part
-include ./table/generated_columns.slt.part
\ No newline at end of file
+include ./table/generated_columns.slt.part

From cb843d6a6dad9a650a5d4d2615f52746f40cc7d2 Mon Sep 17 00:00:00 2001
From: Yuhao Su
Date: Thu, 23 Mar 2023 17:58:40 +0800
Subject: [PATCH 09/13] what file

---
 batch-ddl-debug-junit.xml | 7 -------
 1 file changed, 7 deletions(-)
 delete mode 100644 batch-ddl-debug-junit.xml

diff --git a/batch-ddl-debug-junit.xml b/batch-ddl-debug-junit.xml
deleted file mode 100644
index 3f1e6d54e2d90..0000000000000
--- a/batch-ddl-debug-junit.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-
-
-
-
-
-
-

From 618c84d505bf6ea06e0f1fbabc38565f57779e03 Mon Sep 17 00:00:00 2001
From: Yuhao Su
Date: Thu, 23 Mar 2023 18:00:32 +0800
Subject: [PATCH 10/13] new line

---
 e2e_test/ddl/table/generated_columns.slt.part | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/e2e_test/ddl/table/generated_columns.slt.part b/e2e_test/ddl/table/generated_columns.slt.part
index 04aa4878486a3..ec8d74b7f422b 100644
--- a/e2e_test/ddl/table/generated_columns.slt.part
+++ b/e2e_test/ddl/table/generated_columns.slt.part
@@ -34,4 +34,4 @@ select * from t2;
 2 3
 
 statement ok
-drop table t2;
\ No newline at end of file
+drop table t2;

From eb2e7e0b89e16545b466a07c97c762c1e94e5e8c Mon Sep 17 00:00:00 2001
From: Yuhao Su
Date: Fri, 24 Mar 2023 15:14:43 +0800
Subject: [PATCH 11/13] fix

---
 e2e_test/ddl/table/generated_columns.slt.part          | 4 ++++
 proto/plan_common.proto                                | 2 +-
 src/frontend/src/optimizer/mod.rs                      | 2 +-
 src/frontend/src/optimizer/plan_node/logical_source.rs | 4 ++++
 4 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/e2e_test/ddl/table/generated_columns.slt.part b/e2e_test/ddl/table/generated_columns.slt.part
index ec8d74b7f422b..f6c0a18067838 100644
--- a/e2e_test/ddl/table/generated_columns.slt.part
+++ b/e2e_test/ddl/table/generated_columns.slt.part
@@ -35,3 +35,7 @@ select * from t2;
 
 statement ok
 drop table t2;
+
+# Generated column references another generated column
+statement error
+create table t2 (v1 int as v2+1, v2 int, v3 int as v1-1);
diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index 84b7ffd76553a..98d4db458a487 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -27,7 +27,7 @@ message ColumnDesc {
   // For example, when the type is created from a protobuf schema file,
   // this field will store the message name.
   string type_name = 5;
-  // The the column is a generated column.
+  // Optional description for the generated column.
   GeneratedColumnDesc generated_column = 6;
 }
 
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index 8424c9f5d40a9..11fc45ab0f4cf 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -388,7 +388,7 @@ impl PlanRoot {
             append_only,
             columns
                 .iter()
-                .filter_map(|c| (!c.is_generated()).then_some(c.column_desc.clone()))
+                .filter_map(|c| (!c.is_generated()).then(|| c.column_desc.clone()))
                 .collect(),
         )
         .into();
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 94abbe37b9f78..f582bc9eb2bfc 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -18,6 +18,7 @@ use std::ops::Bound;
 use std::ops::Bound::{Excluded, Included, Unbounded};
 use std::rc::Rc;
 
+use risingwave_common::bail;
 use risingwave_common::catalog::{ColumnDesc, Schema};
 use risingwave_common::error::Result;
 use risingwave_connector::source::DataType;
@@ -114,6 +115,9 @@ impl LogicalSource {
                 let GeneratedColumnDesc { expr } = generated_column;
                 // TODO(yuhao): avoid this `from_expr_proto`.
                 let proj_expr = rewriter.rewrite_expr(ExprImpl::from_expr_proto(&expr.unwrap())?);
+                if proj_expr.return_type() != column_desc.data_type {
+                    bail!("Expression return type should match the type specified for the column");
+                }
                 exprs.push(proj_expr);
             } else {
                 let input_ref = InputRef {

From 656f4ec800da5244b6c5a861b28360d82baecdfd Mon Sep 17 00:00:00 2001
From: Yuhao Su
Date: Fri, 24 Mar 2023 15:49:12 +0800
Subject: [PATCH 12/13] ban alter table with generated column

---
 src/frontend/src/handler/alter_table_column.rs | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index bd61c83bbe83e..6e3168d983608 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -15,7 +15,7 @@
 use anyhow::Context;
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::error::{ErrorCode, Result};
+use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_pb::catalog::Table;
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
@@ -70,6 +70,13 @@ pub async fn handle_alter_table_column(
         table.clone()
     };
 
+    // TODO(yuhao): alter table with generated columns.
+    if original_catalog.has_generated_column() {
+        return Err(RwError::from(ErrorCode::BindError(
+            "Altering a table with generated columns has not been implemented.".to_string(),
+        )));
+    }
+
     // Retrieve the original table definition and parse it to AST.
     let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition)
         .context("unable to parse original table definition")?

From 8faa3876f78eb01ca237ee1516cb9c8e2646dd23 Mon Sep 17 00:00:00 2001
From: Yuhao Su
Date: Fri, 24 Mar 2023 15:56:28 +0800
Subject: [PATCH 13/13] fix

---
 e2e_test/ddl/table.slt | 1 -
 1 file changed, 1 deletion(-)

diff --git a/e2e_test/ddl/table.slt b/e2e_test/ddl/table.slt
index a1c6581480a89..4c076b979fc25 100644
--- a/e2e_test/ddl/table.slt
+++ b/e2e_test/ddl/table.slt
@@ -1,2 +1 @@
 include ./table/*.slt.part
-include ./table/generated_columns.slt.part
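
Taken together, the series gives the following user-visible behavior. This is a quick sketch distilled from `generated_columns.slt.part` and the new `BindError` paths above; `t_bad` and the exact `alter table` statement are illustrative and not taken verbatim from the tests:

    -- Accepted: generated columns defined from ordinary columns, in any order.
    create table t1 (v1 int as v2 - 1, v2 int, v3 int as v2 + 1);

    -- DML only supplies the non-generated columns; v1 and v3 are computed by
    -- the StreamProject node placed on top of StreamDml in the table plan.
    insert into t1 (v2) values (1), (2);
    select * from t1;    -- 0 1 2 / 1 2 3

    -- Rejected: a generated column referencing another generated column.
    create table t_bad (v1 int as v2 + 1, v2 int, v3 int as v1 - 1);

    -- Also rejected for now, each with a BindError:
    delete from t1;                       -- deleting from a table with generated columns
    alter table t1 add column v4 int;     -- altering a table with generated columns
    -- ... and generated columns in CREATE SOURCE.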