diff --git a/e2e_test/source_inline/connection/ddl.slt b/e2e_test/source_inline/connection/ddl.slt index 30c2587490df0..db2952d625387 100644 --- a/e2e_test/source_inline/connection/ddl.slt +++ b/e2e_test/source_inline/connection/ddl.slt @@ -70,6 +70,14 @@ insert into data_table values (1, 'a'), (2, 'b'), (3, 'c'); statement ok flush; +statement error missing field 'connector' in WITH clause +create sink sink_kafka from data_table with ( + connection = conn, + topic = 'connection_ddl_1' +) format plain encode json ( + force_append_only='true' +); + statement ok create sink sink_kafka from data_table with ( connector = 'kafka', diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 885a31cbe52ea..b411258f9825f 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -114,7 +114,12 @@ pub async fn gen_sink_plan( // if not using connection, we don't need to check connector match connection type if !matches!(connection_type, PbConnectionType::Unspecified) { - let connector = resolved_with_options.get_connector().unwrap(); + let Some(connector) = resolved_with_options.get_connector() else { + return Err(RwError::from(ErrorCode::ProtocolError(format!( + "missing field '{}' in WITH clause", + CONNECTOR_TYPE_KEY + )))); + }; check_connector_match_connection_type(connector.as_str(), &connection_type)?; } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index e489295914a4f..ff31c88356716 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -832,7 +832,12 @@ pub async fn bind_create_source_or_table_with_connector( // if not using connection, we don't need to check connector match connection type if !matches!(connection_type, PbConnectionType::Unspecified) { - let connector = with_properties.get_connector().unwrap(); + let Some(connector) = with_properties.get_connector() else { + return Err(RwError::from(ProtocolError(format!( + "missing field '{}' in WITH clause", + UPSTREAM_SOURCE_KEY + )))); + }; check_connector_match_connection_type(connector.as_str(), &connection_type)?; }