diff --git a/src/connector/src/sink/encoder/template.rs b/src/connector/src/sink/encoder/template.rs index faf16bc32f485..fe257e015ee3c 100644 --- a/src/connector/src/sink/encoder/template.rs +++ b/src/connector/src/sink/encoder/template.rs @@ -28,6 +28,7 @@ pub enum TemplateEncoder { String(TemplateStringEncoder), RedisGeoKey(TemplateRedisGeoKeyEncoder), RedisGeoValue(TemplateRedisGeoValueEncoder), + RedisPubSubKey(TemplateRedisPubSubKeyEncoder), } impl TemplateEncoder { pub fn new_string(schema: Schema, col_indices: Option>, template: String) -> Self { @@ -55,6 +56,17 @@ impl TemplateEncoder { TemplateRedisGeoKeyEncoder::new(schema, col_indices, member_name, template)?, )) } + + pub fn new_pubsub_key( + schema: Schema, + col_indices: Option>, + channel: Option, + channel_column: Option, + ) -> Result { + Ok(TemplateEncoder::RedisPubSubKey( + TemplateRedisPubSubKeyEncoder::new(schema, col_indices, channel, channel_column)?, + )) + } } impl RowEncoder for TemplateEncoder { type Output = TemplateEncoderOutput; @@ -64,6 +76,7 @@ impl RowEncoder for TemplateEncoder { TemplateEncoder::String(encoder) => &encoder.schema, TemplateEncoder::RedisGeoValue(encoder) => &encoder.schema, TemplateEncoder::RedisGeoKey(encoder) => &encoder.key_encoder.schema, + TemplateEncoder::RedisPubSubKey(encoder) => &encoder.schema, } } @@ -72,6 +85,7 @@ impl RowEncoder for TemplateEncoder { TemplateEncoder::String(encoder) => encoder.col_indices.as_deref(), TemplateEncoder::RedisGeoValue(encoder) => encoder.col_indices.as_deref(), TemplateEncoder::RedisGeoKey(encoder) => encoder.key_encoder.col_indices.as_deref(), + TemplateEncoder::RedisPubSubKey(encoder) => encoder.col_indices.as_deref(), } } @@ -86,6 +100,7 @@ impl RowEncoder for TemplateEncoder { )), TemplateEncoder::RedisGeoValue(encoder) => encoder.encode_cols(row, col_indices), TemplateEncoder::RedisGeoKey(encoder) => encoder.encode_cols(row, col_indices), + TemplateEncoder::RedisPubSubKey(encoder) => encoder.encode_cols(row, col_indices), } } } @@ -259,6 +274,73 @@ impl TemplateRedisGeoKeyEncoder { } } +pub enum TemplateRedisPubSubKeyEncoderInner { + PubSubName(String), + PubSubColumnIndex(usize), +} +pub struct TemplateRedisPubSubKeyEncoder { + inner: TemplateRedisPubSubKeyEncoderInner, + schema: Schema, + col_indices: Option>, +} + +impl TemplateRedisPubSubKeyEncoder { + pub fn new( + schema: Schema, + col_indices: Option>, + channel: Option, + channel_column: Option, + ) -> Result { + if let Some(channel) = channel { + return Ok(Self { + inner: TemplateRedisPubSubKeyEncoderInner::PubSubName(channel), + schema, + col_indices, + }); + } + if let Some(channel_column) = channel_column { + let channel_column_index = schema + .names_str() + .iter() + .position(|name| name == &channel_column) + .ok_or_else(|| { + SinkError::Redis(format!( + "Can't find pubsub column({}) in schema", + channel_column + )) + })?; + return Ok(Self { + inner: TemplateRedisPubSubKeyEncoderInner::PubSubColumnIndex(channel_column_index), + schema, + col_indices, + }); + } + Err(SinkError::Redis( + "`channel` or `channel_column` must be set".to_owned(), + )) + } + + pub fn encode_cols( + &self, + row: impl Row, + _col_indices: impl Iterator, + ) -> Result { + match &self.inner { + TemplateRedisPubSubKeyEncoderInner::PubSubName(channel) => { + Ok(TemplateEncoderOutput::RedisPubSubKey(channel.clone())) + } + TemplateRedisPubSubKeyEncoderInner::PubSubColumnIndex(pubsub_col) => { + let pubsub_key = row + .datum_at(*pubsub_col) + .ok_or_else(|| SinkError::Redis("pubsub_key is null".to_owned()))? + .to_text() + .clone(); + Ok(TemplateEncoderOutput::RedisPubSubKey(pubsub_key)) + } + } + } +} + pub enum TemplateEncoderOutput { // String formatted according to the template String(String), @@ -266,6 +348,8 @@ pub enum TemplateEncoderOutput { RedisGeoValue((String, String)), // The key of redis's geospatial, including redis's key and member RedisGeoKey((String, String)), + + RedisPubSubKey(String), } impl TemplateEncoderOutput { @@ -278,6 +362,7 @@ impl TemplateEncoderOutput { TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode( "RedisGeoVelue can't convert to string".to_owned(), )), + TemplateEncoderOutput::RedisPubSubKey(s) => Ok(s), } } } @@ -292,11 +377,13 @@ impl SerTo for TemplateEncoderOutput { TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode( "RedisGeoVelue can't convert to string".to_owned(), )), + TemplateEncoderOutput::RedisPubSubKey(s) => Ok(s), } } } /// The enum of inputs to `RedisSinkPayloadWriter` +#[derive(Debug)] pub enum RedisSinkPayloadWriterInput { // Json and String will be convert to string String(String), @@ -304,6 +391,7 @@ pub enum RedisSinkPayloadWriterInput { RedisGeoValue((String, String)), // The key of redis's geospatial, including redis's key and member RedisGeoKey((String, String)), + RedisPubSubKey(String), } impl SerTo for TemplateEncoderOutput { @@ -316,6 +404,9 @@ impl SerTo for TemplateEncoderOutput { TemplateEncoderOutput::RedisGeoValue((key, member)) => { Ok(RedisSinkPayloadWriterInput::RedisGeoValue((key, member))) } + TemplateEncoderOutput::RedisPubSubKey(s) => { + Ok(RedisSinkPayloadWriterInput::RedisPubSubKey(s)) + } } } } diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 92ad7a025241a..94fa8e644eb07 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -37,8 +37,8 @@ use super::encoder::{ TimestamptzHandlingMode, }; use super::redis::{ - KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE, REDIS_VALUE_TYPE_GEO, - REDIS_VALUE_TYPE_STRING, VALUE_FORMAT, + CHANNEL, CHANNEL_COLUMN, KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE, + REDIS_VALUE_TYPE_GEO, REDIS_VALUE_TYPE_PUBSUB, REDIS_VALUE_TYPE_STRING, VALUE_FORMAT, }; use crate::sink::encoder::{ AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode, @@ -328,6 +328,30 @@ impl EncoderBuild for TemplateEncoder { TemplateEncoder::new_geo_value(b.schema, pk_indices, lat_name, lon_name) } }, + REDIS_VALUE_TYPE_PUBSUB => match pk_indices { + Some(_) => { + let channel = b.format_desc.options.get(CHANNEL).cloned(); + let channel_column = b.format_desc.options.get(CHANNEL_COLUMN).cloned(); + if (channel.is_none() && channel_column.is_none()) + || (channel.is_some() && channel_column.is_some()) + { + return Err(SinkError::Config(anyhow!( + "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set" + ))); + } + TemplateEncoder::new_pubsub_key(b.schema, pk_indices, channel, channel_column) + } + None => { + let template = b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!("Cannot find '{VALUE_FORMAT}',please set it.")) + })?; + Ok(TemplateEncoder::new_string( + b.schema, + pk_indices, + template.clone(), + )) + } + }, _ => Err(SinkError::Config(anyhow!( "The value type {} is not supported", redis_value_type diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 67db7261933ae..8676a8350e6fc 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -46,9 +46,12 @@ pub const VALUE_FORMAT: &str = "value_format"; pub const REDIS_VALUE_TYPE: &str = "redis_value_type"; pub const REDIS_VALUE_TYPE_STRING: &str = "string"; pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial"; +pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub"; pub const LON_NAME: &str = "longitude"; pub const LAT_NAME: &str = "latitude"; pub const MEMBER_NAME: &str = "member"; +pub const CHANNEL: &str = "channel"; +pub const CHANNEL_COLUMN: &str = "channel_column"; #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct RedisCommon { @@ -100,6 +103,12 @@ impl RedisPipe { ) => { pipe.geo_add(key, (lon, lat, member)); } + ( + RedisSinkPayloadWriterInput::RedisPubSubKey(key), + RedisSinkPayloadWriterInput::String(v), + ) => { + pipe.publish(key, v); + } _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())), }, RedisPipe::Single(pipe) => match (k, v) { @@ -115,6 +124,12 @@ impl RedisPipe { ) => { pipe.geo_add(key, (lon, lat, member)); } + ( + RedisSinkPayloadWriterInput::RedisPubSubKey(key), + RedisSinkPayloadWriterInput::String(v), + ) => { + pipe.publish(key, v); + } _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())), }, }; @@ -280,12 +295,6 @@ impl Sink for RedisSink { self.format_desc.encode, super::catalog::SinkEncode::Template ) { - let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| { - SinkError::Config(anyhow!( - "Cannot find '{KEY_FORMAT}', please set it or use JSON" - )) - })?; - TemplateStringEncoder::check_string_format(key_format, &pk_map)?; match self .format_desc .options @@ -294,6 +303,12 @@ impl Sink for RedisSink { { // if not set, default to string Some(REDIS_VALUE_TYPE_STRING) | None => { + let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find '{KEY_FORMAT}', please set it or use JSON" + )) + })?; + TemplateStringEncoder::check_string_format(key_format, &pk_map)?; let value_format = self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { SinkError::Config(anyhow!( @@ -303,6 +318,13 @@ impl Sink for RedisSink { TemplateStringEncoder::check_string_format(value_format, &all_map)?; } Some(REDIS_VALUE_TYPE_GEO) => { + let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find '{KEY_FORMAT}', please set it or use JSON" + )) + })?; + TemplateStringEncoder::check_string_format(key_format, &pk_map)?; + let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| { SinkError::Config(anyhow!( "Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`" @@ -350,9 +372,35 @@ impl Sink for RedisSink { ))); } } + Some(REDIS_VALUE_TYPE_PUBSUB) => { + let channel = self.format_desc.options.get(CHANNEL); + let channel_column = self.format_desc.options.get(CHANNEL_COLUMN); + if (channel.is_none() && channel_column.is_none()) + || (channel.is_some() && channel_column.is_some()) + { + return Err(SinkError::Config(anyhow!( + "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set" + ))); + } + + if let Some(channel_column) = channel_column + && let Some(channel_column_type) = all_map.get(channel_column) + && (channel_column_type != &DataType::Varchar) + { + return Err(SinkError::Config(anyhow!( + "`{CHANNEL_COLUMN}` must be set to `varchar`" + ))); + } + + let value_format = + self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`")) + })?; + TemplateStringEncoder::check_string_format(value_format, &all_map)?; + } _ => { return Err(SinkError::Config(anyhow!( - "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}`" + "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}`" ))); } }