feat(source): Support AWS privatelink for CREATE TABLE command by StrikeW · Pull Request #9728 · risingwavelabs/risingwave · GitHub

feat(source): Support AWS privatelink for CREATE TABLE command #9728


Merged
merged 9 commits on May 12, 2023
Changes from all commits
12 changes: 12 additions & 0 deletions e2e_test/source/basic/ddl.slt
@@ -140,6 +140,18 @@ create source s with (
connection.name = 'nonexist',
) row format json;

statement ok
CREATE TABLE mytable with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = '127.0.0.1:29092',
connection.name = 'mock'
) ROW FORMAT JSON;

statement ok
DROP TABLE mytable;


# `DEBEZIUM_MONGO_JSON` requires the source table have `_id` and `payload` columns.
statement error
create source s (
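(Context for the new test: the CREATE TABLE statement above references a connection named mock, which is presumably created earlier in this test suite. A sketch of that setup in the same slt style; the option names mirror the existing CREATE CONNECTION tests and are an assumption here, not part of this diff.)

statement ok
create connection mock with (
    type = 'privatelink',
    provider = 'mock'
);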
18 changes: 4 additions & 14 deletions src/frontend/src/handler/create_sink.rs
@@ -26,15 +26,14 @@ use risingwave_sqlparser::ast::{
use super::create_mv::get_column_names;
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::connection_catalog::resolve_private_link_connection;
use crate::handler::create_source::CONNECTION_NAME_KEY;
use crate::handler::privilege::resolve_query_privileges;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::Explain;
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::stream_fragmenter::build_graph;
use crate::utils::resolve_connection_in_with_option;
use crate::Planner;

pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result<Query> {
@@ -95,19 +94,10 @@ pub fn gen_sink_plan(
let col_names = get_column_names(&bound, session, stmt.columns)?;

let mut with_options = context.with_options().clone();
let properties = with_options.inner_mut();
let connection_id = {
if let Some(connection_name) = properties
.get(CONNECTION_NAME_KEY)
.map(|s| s.to_lowercase())
{
let conn = session.get_connection_by_name(sink_schema_name, &connection_name)?;
resolve_private_link_connection(&conn, properties)?;
tracing::debug!("Create sink with connection {:?}", conn.id);
Some(ConnectionId(conn.id))
} else {
None
}
let conn_id =
resolve_connection_in_with_option(&mut with_options, &sink_schema_name, session)?;
conn_id.map(ConnectionId)
};

let mut plan_root = Planner::new(context).plan_query(bound)?;
30 changes: 4 additions & 26 deletions src/frontend/src/handler/create_source.rs
@@ -47,7 +47,6 @@ use risingwave_sqlparser::ast::{
use super::create_table::bind_sql_table_column_constraints;
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::connection_catalog::resolve_private_link_connection;
use crate::catalog::ColumnId;
use crate::expr::Expr;
use crate::handler::create_table::{
@@ -57,6 +56,7 @@ use crate::handler::util::{get_connector, is_kafka_connector};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::KAFKA_TIMESTAMP_COLUMN_NAME;
use crate::session::SessionImpl;
use crate::utils::resolve_connection_in_with_option;
use crate::WithOptions;

pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
@@ -178,13 +178,6 @@ async fn extract_protobuf_table_schema(
.collect_vec())
}

#[inline(always)]
fn get_connection_name(with_properties: &HashMap<String, String>) -> Option<String> {
with_properties
.get(CONNECTION_NAME_KEY)
.map(|s| s.to_lowercase())
}

pub(crate) async fn resolve_source_schema(
source_schema: SourceSchema,
columns: &mut Vec<ColumnCatalog>,
@@ -791,25 +784,10 @@ pub async fn handle_create_source(

let columns = columns.into_iter().map(|c| c.to_protobuf()).collect_vec();

let connection_name = get_connection_name(&with_properties);
let is_kafka_connector = is_kafka_connector(&with_properties);
// resolve privatelink connection for Kafka source
let mut with_options = WithOptions::new(with_properties);
let connection_id = match connection_name {
Some(connection_name) => {
let connection = session
.get_connection_by_name(schema_name, &connection_name)
.map_err(|_| ErrorCode::ItemNotFound(connection_name))?;
if !is_kafka_connector {
return Err(RwError::from(ErrorCode::ProtocolError(
"Create source with connection is only supported for kafka connectors."
.to_string(),
)));
}
resolve_private_link_connection(&connection, with_options.inner_mut())?;
Some(connection.id)
}
None => None,
};
let connection_id =
resolve_connection_in_with_option(&mut with_options, &schema_name, &session)?;
let definition = handler_args.normalized_sql;

let source = PbSource {
13 changes: 10 additions & 3 deletions src/frontend/src/handler/create_table.rs
@@ -48,6 +48,7 @@ use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::SessionImpl;
use crate::stream_fragmenter::build_graph;
use crate::utils::resolve_connection_in_with_option;
use crate::{Binder, TableCatalog, WithOptions};

/// Column ID generator for a new table or a new version of an existing table to alter.
@@ -550,7 +551,13 @@ fn gen_table_plan_inner(
let session = context.session_ctx().clone();
let db_name = session.database();
let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
let (database_id, schema_id) =
session.get_database_and_schema_id_for_create(schema_name.clone())?;

// resolve privatelink connection for Table backed by Kafka source
let mut with_options = WithOptions::new(properties);
let connection_id =
resolve_connection_in_with_option(&mut with_options, &schema_name, &session)?;

let source = source_info.map(|source_info| PbSource {
id: TableId::placeholder().table_id,
@@ -563,12 +570,12 @@
.map(|column| column.to_protobuf())
.collect_vec(),
pk_column_ids: pk_column_ids.iter().map(Into::into).collect_vec(),
properties,
properties: with_options.into_inner().into_iter().collect(),
info: Some(source_info),
owner: session.user_id(),
watermark_descs: watermark_descs.clone(),
definition: "".to_string(),
connection_id: None,
connection_id,
optional_associated_table_id: Some(OptionalAssociatedTableId::AssociatedTableId(
TableId::placeholder().table_id,
)),
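A note on the hunk above: PbSource carries its properties as a protobuf map (a HashMap on the Rust side), while WithOptions keeps a sorted BTreeMap internally, which is why the plain `properties` field becomes `with_options.into_inner().into_iter().collect()`. A minimal, self-contained illustration of that conversion, using std types only; the internal representations are assumptions inferred from this diff:

use std::collections::{BTreeMap, HashMap};

fn main() {
    // Stand-in for the BTreeMap returned by WithOptions::into_inner().
    let with_options: BTreeMap<String, String> = BTreeMap::from([
        ("connector".to_string(), "kafka".to_string()),
        ("connection.name".to_string(), "mock".to_string()),
    ]);
    // Equivalent of `properties: with_options.into_inner().into_iter().collect()`.
    let properties: HashMap<String, String> = with_options.into_iter().collect();
    assert_eq!(properties["connector"], "kafka");
}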
11 changes: 9 additions & 2 deletions src/frontend/src/handler/util.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -34,7 +34,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::source::KAFKA_CONNECTOR;
use risingwave_expr::vector_op::timestamptz::timestamptz_to_string;

use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
use crate::handler::create_source::{CONNECTION_NAME_KEY, UPSTREAM_SOURCE_KEY};
use crate::session::SessionImpl;

pin_project! {
@@ -214,6 +214,13 @@ pub fn is_kafka_connector(with_properties: &HashMap<String, String>) -> bool {
connector == KAFKA_CONNECTOR
}

#[inline(always)]
pub fn get_connection_name(with_properties: &BTreeMap<String, String>) -> Option<String> {
with_properties
.get(CONNECTION_NAME_KEY)
.map(|s| s.to_lowercase())
}

#[cfg(test)]
mod tests {
use bytes::BytesMut;
42 changes: 41 additions & 1 deletion src/frontend/src/utils/with_options.rs
@@ -17,12 +17,19 @@ use std::convert::TryFrom;
use std::num::NonZeroU32;

use itertools::Itertools;
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::error::{ErrorCode, Result as RwResult, RwError};
use risingwave_connector::source::KAFKA_CONNECTOR;
use risingwave_sqlparser::ast::{
CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, SqlOption, Statement,
Value,
};

use crate::catalog::connection_catalog::resolve_private_link_connection;
use crate::catalog::ConnectionId;
use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
use crate::handler::util::get_connection_name;
use crate::session::SessionImpl;

mod options {
use risingwave_common::catalog::hummock::PROPERTIES_RETENTION_SECOND_KEY;

@@ -103,6 +110,39 @@ impl WithOptions {
}
}

#[inline(always)]
fn is_kafka_connector(with_options: &WithOptions) -> bool {
let Some(connector) = with_options.inner().get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase()) else {
return false;
};
connector == KAFKA_CONNECTOR
}

pub(crate) fn resolve_connection_in_with_option(
with_options: &mut WithOptions,
schema_name: &Option<String>,
session: &SessionImpl,
) -> RwResult<Option<ConnectionId>> {
let connection_name = get_connection_name(with_options);
let is_kafka = is_kafka_connector(with_options);
let connection_id = match connection_name {
Some(connection_name) => {
let connection = session
.get_connection_by_name(schema_name.clone(), &connection_name)
.map_err(|_| ErrorCode::ItemNotFound(connection_name))?;
if !is_kafka {
return Err(RwError::from(ErrorCode::ProtocolError(
"Connection is only supported in kafka connector".to_string(),
)));
}
resolve_private_link_connection(&connection, with_options.inner_mut())?;
Some(connection.id)
}
None => None,
};
Ok(connection_id)
}

impl TryFrom<&[SqlOption]> for WithOptions {
type Error = RwError;

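The new helper above is now the single resolution path shared by CREATE SOURCE, CREATE SINK, and CREATE TABLE. A self-contained sketch of its control flow, with the session and catalog lookups replaced by a closure; the names and the broker-rewrite detail are illustrative, not the exact frontend API:

use std::collections::BTreeMap;

type ConnectionId = u32;

fn resolve_connection_sketch(
    with_options: &mut BTreeMap<String, String>,
    lookup: impl Fn(&str) -> Option<ConnectionId>,
) -> Result<Option<ConnectionId>, String> {
    // `connection.name` is matched case-insensitively.
    let Some(name) = with_options.get("connection.name").map(|s| s.to_lowercase()) else {
        return Ok(None); // no connection referenced, nothing to resolve
    };
    // Unknown names surface as "not found", mirroring ErrorCode::ItemNotFound.
    let id = lookup(&name).ok_or_else(|| format!("connection {name} not found"))?;
    // PrivateLink connections are only meaningful for the Kafka connector.
    let is_kafka = with_options
        .get("connector")
        .map(|s| s.to_lowercase() == "kafka")
        .unwrap_or(false);
    if !is_kafka {
        return Err("Connection is only supported in kafka connector".to_string());
    }
    // Stand-in for resolve_private_link_connection(): rewrite the broker list
    // to the VPC endpoint's DNS name so clients dial through PrivateLink.
    if let Some(brokers) = with_options.get_mut("properties.bootstrap.server") {
        *brokers = format!("vpce-{name}.example.com:9092");
    }
    Ok(Some(id))
}

fn main() {
    let mut opts = BTreeMap::from([
        ("connector".to_string(), "kafka".to_string()),
        ("connection.name".to_string(), "Mock".to_string()),
        ("properties.bootstrap.server".to_string(), "127.0.0.1:29092".to_string()),
    ]);
    let id = resolve_connection_sketch(&mut opts, |name| (name == "mock").then_some(1));
    assert_eq!(id, Ok(Some(1)));
    assert!(opts["properties.bootstrap.server"].starts_with("vpce-"));
}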
42 changes: 13 additions & 29 deletions src/meta/src/manager/catalog/mod.rs
@@ -92,7 +92,8 @@ use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup};

use crate::manager::catalog::utils::{
alter_relation_rename, alter_relation_rename_refs, ReplaceTableExprRewriter,
alter_relation_rename, alter_relation_rename_refs, refcnt_dec_connection,
refcnt_inc_connection, ReplaceTableExprRewriter,
};

pub type CatalogManagerRef<S> = Arc<CatalogManager<S>>;
@@ -1294,14 +1295,7 @@ where
database_core.mark_creating(&key);
user_core.increase_ref(source.owner);
// We have validated the status of the connection before starting the procedure.
if let Some(connection_id) = source.connection_id {
if let Some(_conn) = database_core.get_connection(connection_id) {
// TODO(weili): wait for yezizp to refactor ref cnt
database_core.increase_ref_count(connection_id);
} else {
bail!("connection {} not found.", connection_id);
}
}
refcnt_inc_connection(database_core, source.connection_id)?;
Ok(())
}
}
@@ -1356,10 +1350,7 @@ where

database_core.unmark_creating(&key);
user_core.decrease_ref(source.owner);
if let Some(connection_id) = source.connection_id {
// TODO(weili): wait for yezizp to refactor ref cnt
database_core.decrease_ref_count(connection_id);
}
refcnt_dec_connection(database_core, source.connection_id);
Ok(())
}

@@ -1385,10 +1376,7 @@
commit_meta!(self, sources, users)?;

user_core.decrease_ref(source.owner);
if let Some(connection_id) = source.connection_id {
// TODO(weili): wait for yezizp to refactor ref cnt
database_core.decrease_ref_count(connection_id);
}
refcnt_dec_connection(database_core, source.connection_id);

for user in users_need_update {
self.notify_frontend(Operation::Update, Info::User(user))
@@ -1431,6 +1419,9 @@ where
ensure!(table.dependent_relations.is_empty());
// source and table
user_core.increase_ref_count(source.owner, 2);

// We have validated the status of the connection before starting the procedure.
refcnt_inc_connection(database_core, source.connection_id)?;
Ok(())
}
}
@@ -1518,6 +1509,7 @@ where
database_core.unmark_creating(&table_key);
database_core.unmark_creating_streaming_job(table.id);
user_core.decrease_ref_count(source.owner, 2); // source and table
refcnt_dec_connection(database_core, source.connection_id);
}

/// return id of streaming jobs in the database which need to be dropped by stream manager.
@@ -1610,6 +1602,8 @@ where
// Commit point
commit_meta!(self, tables, sources, indexes, users)?;

refcnt_dec_connection(database_core, source.connection_id);

indexes_removed.iter().for_each(|index| {
user_core.decrease_ref_count(index.owner, 2); // index table and index
});
@@ -1800,14 +1794,7 @@
}
user_core.increase_ref(sink.owner);
// We have validated the status of the connection before starting the procedure.
if let Some(connection_id) = sink.connection_id {
if let Some(_conn) = database_core.get_connection(connection_id) {
// TODO(siyuan): wait for yezizp to refactor ref cnt
database_core.increase_ref_count(connection_id);
} else {
bail!("connection {} not found.", connection_id);
}
}
refcnt_inc_connection(database_core, sink.connection_id)?;
Ok(())
}
}
@@ -1875,10 +1862,7 @@
database_core.decrease_ref_count(dependent_relation_id);
}
user_core.decrease_ref(sink.owner);
if let Some(connection_id) = sink.connection_id {
// TODO(siyuan): wait for yezizp to refactor ref cnt
database_core.decrease_ref_count(connection_id);
}
refcnt_dec_connection(database_core, sink.connection_id);
}

pub async fn drop_sink(
28 changes: 28 additions & 0 deletions src/meta/src/manager/catalog/utils.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::expr::{ExprNode, FunctionCall, UserDefinedFunction};
@@ -23,6 +24,33 @@ use risingwave_sqlparser::ast::{
};
use risingwave_sqlparser::parser::Parser;

use crate::manager::{ConnectionId, DatabaseManager};

pub fn refcnt_inc_connection(
database_mgr: &mut DatabaseManager,
connection_id: Option<ConnectionId>,
) -> anyhow::Result<()> {
if let Some(connection_id) = connection_id {
if let Some(_conn) = database_mgr.get_connection(connection_id) {
// TODO(weili): wait for yezizp to refactor ref cnt
database_mgr.increase_ref_count(connection_id);
} else {
bail!("connection {} not found.", connection_id);
}
}
Ok(())
}

pub fn refcnt_dec_connection(
database_mgr: &mut DatabaseManager,
connection_id: Option<ConnectionId>,
) {
if let Some(connection_id) = connection_id {
// TODO: wait for yezizp to refactor ref cnt
database_mgr.decrease_ref_count(connection_id);
}
}

/// `alter_relation_rename` renames a relation to a new name in its `Create` statement, and returns
/// the updated definition raw sql. Note that the `definition` must be a `Create` statement and the
/// `new_name` must be a valid identifier, it should be validated before calling this function. To
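Finally, a minimal sketch of the invariant the two ref-count helpers maintain: every create path that references a connection increments its count, and every matching drop or cancel path decrements it, so a connection stays undroppable while anything depends on it. The types below are stand-ins for DatabaseManager, not the meta-service API:

use std::collections::HashMap;

#[derive(Default)]
struct Db {
    // connection_id -> number of sources/sinks/tables referencing it
    ref_counts: HashMap<u32, usize>,
}

impl Db {
    fn refcnt_inc_connection(&mut self, connection_id: Option<u32>) -> Result<(), String> {
        if let Some(id) = connection_id {
            // The real helper first checks that the connection exists in the catalog.
            *self.ref_counts.entry(id).or_default() += 1;
        }
        Ok(())
    }

    fn refcnt_dec_connection(&mut self, connection_id: Option<u32>) {
        if let Some(id) = connection_id {
            if let Some(cnt) = self.ref_counts.get_mut(&id) {
                *cnt -= 1;
                if *cnt == 0 {
                    self.ref_counts.remove(&id);
                }
            }
        }
    }

    fn can_drop_connection(&self, id: u32) -> bool {
        !self.ref_counts.contains_key(&id)
    }
}

fn main() {
    let mut db = Db::default();
    db.refcnt_inc_connection(Some(42)).unwrap(); // CREATE TABLE ... WITH (connection.name = ...)
    assert!(!db.can_drop_connection(42)); // DROP CONNECTION would be rejected here
    db.refcnt_dec_connection(Some(42)); // DROP TABLE releases the reference
    assert!(db.can_drop_connection(42));
}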