feat(sink): support redis sink with pubsub format (#20991) by github-actions[bot] · Pull Request #21663 · risingwavelabs/risingwave · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(sink): support redis sink with pubsub format (#20991) #21663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions src/connector/src/sink/encoder/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub enum TemplateEncoder {
String(TemplateStringEncoder),
RedisGeoKey(TemplateRedisGeoKeyEncoder),
RedisGeoValue(TemplateRedisGeoValueEncoder),
RedisPubSubKey(TemplateRedisPubSubKeyEncoder),
}
impl TemplateEncoder {
pub fn new_string(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
Expand Down Expand Up @@ -55,6 +56,17 @@ impl TemplateEncoder {
TemplateRedisGeoKeyEncoder::new(schema, col_indices, member_name, template)?,
))
}

/// Builds a [`TemplateEncoder`] that yields, for each row, the Redis pubsub channel.
///
/// * `channel` — a fixed channel name used for every row; if set it takes precedence.
/// * `channel_column` — the name of a column in `schema` whose value is the channel.
///
/// # Errors
/// Propagates any error from [`TemplateRedisPubSubKeyEncoder::new`], e.g. when neither
/// option is set or `channel_column` does not name a column of `schema`.
pub fn new_pubsub_key(
    schema: Schema,
    col_indices: Option<Vec<usize>>,
    channel: Option<String>,
    channel_column: Option<String>,
) -> Result<Self> {
    TemplateRedisPubSubKeyEncoder::new(schema, col_indices, channel, channel_column)
        .map(TemplateEncoder::RedisPubSubKey)
}
}
impl RowEncoder for TemplateEncoder {
type Output = TemplateEncoderOutput;
Expand All @@ -64,6 +76,7 @@ impl RowEncoder for TemplateEncoder {
TemplateEncoder::String(encoder) => &encoder.schema,
TemplateEncoder::RedisGeoValue(encoder) => &encoder.schema,
TemplateEncoder::RedisGeoKey(encoder) => &encoder.key_encoder.schema,
TemplateEncoder::RedisPubSubKey(encoder) => &encoder.schema,
}
}

Expand All @@ -72,6 +85,7 @@ impl RowEncoder for TemplateEncoder {
TemplateEncoder::String(encoder) => encoder.col_indices.as_deref(),
TemplateEncoder::RedisGeoValue(encoder) => encoder.col_indices.as_deref(),
TemplateEncoder::RedisGeoKey(encoder) => encoder.key_encoder.col_indices.as_deref(),
TemplateEncoder::RedisPubSubKey(encoder) => encoder.col_indices.as_deref(),
}
}

Expand All @@ -86,6 +100,7 @@ impl RowEncoder for TemplateEncoder {
)),
TemplateEncoder::RedisGeoValue(encoder) => encoder.encode_cols(row, col_indices),
TemplateEncoder::RedisGeoKey(encoder) => encoder.encode_cols(row, col_indices),
TemplateEncoder::RedisPubSubKey(encoder) => encoder.encode_cols(row, col_indices),
}
}
}
Expand Down Expand Up @@ -259,13 +274,82 @@ impl TemplateRedisGeoKeyEncoder {
}
}

/// Where a `TemplateRedisPubSubKeyEncoder` takes the pubsub channel from.
pub enum TemplateRedisPubSubKeyEncoderInner {
// A fixed channel name, used unchanged for every row.
PubSubName(String),
// Position (in the encoder's schema) of the column whose value is the channel.
PubSubColumnIndex(usize),
}
/// Produces the channel name used as the key of a Redis `PUBLISH` in the pubsub sink.
pub struct TemplateRedisPubSubKeyEncoder {
// How the channel is resolved: a fixed name, or a column index into `schema`.
inner: TemplateRedisPubSubKeyEncoderInner,
// Schema used to resolve `channel_column` to an index; also exposed via `RowEncoder::schema`.
schema: Schema,
// Exposed via `RowEncoder::col_indices`; `encode_cols` itself does not read it.
col_indices: Option<Vec<usize>>,
}

impl TemplateRedisPubSubKeyEncoder {
    /// Creates a pubsub channel encoder.
    ///
    /// * `channel` — a fixed channel name used for every row. If set, it takes precedence
    ///   over `channel_column` (the sink formatter already rejects configs setting both).
    /// * `channel_column` — the name of a column in `schema` whose value is used as the
    ///   channel for each row.
    ///
    /// # Errors
    /// Returns `SinkError::Redis` if `channel_column` is not a column of `schema`, or if
    /// neither `channel` nor `channel_column` is set.
    pub fn new(
        schema: Schema,
        col_indices: Option<Vec<usize>>,
        channel: Option<String>,
        channel_column: Option<String>,
    ) -> Result<Self> {
        let inner = match (channel, channel_column) {
            (Some(channel), _) => TemplateRedisPubSubKeyEncoderInner::PubSubName(channel),
            (None, Some(channel_column)) => {
                let channel_column_index = schema
                    .names_str()
                    .iter()
                    .position(|name| name == &channel_column)
                    .ok_or_else(|| {
                        SinkError::Redis(format!(
                            "Can't find pubsub column({}) in schema",
                            channel_column
                        ))
                    })?;
                TemplateRedisPubSubKeyEncoderInner::PubSubColumnIndex(channel_column_index)
            }
            (None, None) => {
                return Err(SinkError::Redis(
                    "`channel` or `channel_column` must be set".to_owned(),
                ));
            }
        };
        Ok(Self {
            inner,
            schema,
            col_indices,
        })
    }

    /// Returns the pubsub channel this row should be published to.
    ///
    /// `_col_indices` is ignored: the channel is either fixed or read from the configured
    /// column of `row`.
    ///
    /// # Errors
    /// Returns `SinkError::Redis` if the channel column is NULL in this row.
    pub fn encode_cols(
        &self,
        row: impl Row,
        _col_indices: impl Iterator<Item = usize>,
    ) -> Result<TemplateEncoderOutput> {
        let channel = match &self.inner {
            TemplateRedisPubSubKeyEncoderInner::PubSubName(channel) => channel.clone(),
            TemplateRedisPubSubKeyEncoderInner::PubSubColumnIndex(pubsub_col) => row
                .datum_at(*pubsub_col)
                .ok_or_else(|| SinkError::Redis("pubsub_key is null".to_owned()))?
                // `to_text` already yields an owned `String`; cloning it again only
                // added a per-row allocation.
                .to_text(),
        };
        Ok(TemplateEncoderOutput::RedisPubSubKey(channel))
    }
}

/// Output produced by `TemplateEncoder` for one row.
pub enum TemplateEncoderOutput {
// String formatted according to the template
String(String),
// The value of redis's geospatial, including longitude and latitude
RedisGeoValue((String, String)),
// The key of redis's geospatial, including redis's key and member
RedisGeoKey((String, String)),

// The channel of redis's pubsub; it becomes the channel argument of `PUBLISH`
RedisPubSubKey(String),
}

impl TemplateEncoderOutput {
Expand All @@ -278,6 +362,7 @@ impl TemplateEncoderOutput {
TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
"RedisGeoVelue can't convert to string".to_owned(),
)),
TemplateEncoderOutput::RedisPubSubKey(s) => Ok(s),
}
}
}
Expand All @@ -292,18 +377,21 @@ impl SerTo<String> for TemplateEncoderOutput {
TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
"RedisGeoVelue can't convert to string".to_owned(),
)),
TemplateEncoderOutput::RedisPubSubKey(s) => Ok(s),
}
}
}

/// The enum of inputs to `RedisSinkPayloadWriter`
#[derive(Debug)]
pub enum RedisSinkPayloadWriterInput {
// Json and String will be converted to string
String(String),
// The value of redis's geospatial, including longitude and latitude
RedisGeoValue((String, String)),
// The key of redis's geospatial, including redis's key and member
RedisGeoKey((String, String)),
// The channel of redis's pubsub; paired with a `String` value it is written via `PUBLISH`
RedisPubSubKey(String),
}

impl SerTo<RedisSinkPayloadWriterInput> for TemplateEncoderOutput {
Expand All @@ -316,6 +404,9 @@ impl SerTo<RedisSinkPayloadWriterInput> for TemplateEncoderOutput {
TemplateEncoderOutput::RedisGeoValue((key, member)) => {
Ok(RedisSinkPayloadWriterInput::RedisGeoValue((key, member)))
}
TemplateEncoderOutput::RedisPubSubKey(s) => {
Ok(RedisSinkPayloadWriterInput::RedisPubSubKey(s))
}
}
}
}
Expand Down
28 changes: 26 additions & 2 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use super::encoder::{
TimestamptzHandlingMode,
};
use super::redis::{
KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE, REDIS_VALUE_TYPE_GEO,
REDIS_VALUE_TYPE_STRING, VALUE_FORMAT,
CHANNEL, CHANNEL_COLUMN, KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE,
REDIS_VALUE_TYPE_GEO, REDIS_VALUE_TYPE_PUBSUB, REDIS_VALUE_TYPE_STRING, VALUE_FORMAT,
};
use crate::sink::encoder::{
AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode,
Expand Down Expand Up @@ -328,6 +328,30 @@ impl EncoderBuild for TemplateEncoder {
TemplateEncoder::new_geo_value(b.schema, pk_indices, lat_name, lon_name)
}
},
REDIS_VALUE_TYPE_PUBSUB => match pk_indices {
Some(_) => {
let channel = b.format_desc.options.get(CHANNEL).cloned();
let channel_column = b.format_desc.options.get(CHANNEL_COLUMN).cloned();
if (channel.is_none() && channel_column.is_none())
|| (channel.is_some() && channel_column.is_some())
{
return Err(SinkError::Config(anyhow!(
"`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
)));
}
TemplateEncoder::new_pubsub_key(b.schema, pk_indices, channel, channel_column)
}
None => {
let template = b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!("Cannot find '{VALUE_FORMAT}',please set it."))
})?;
Ok(TemplateEncoder::new_string(
b.schema,
pk_indices,
template.clone(),
))
}
},
_ => Err(SinkError::Config(anyhow!(
"The value type {} is not supported",
redis_value_type
Expand Down
62 changes: 55 additions & 7 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ pub const VALUE_FORMAT: &str = "value_format";
/// Option key selecting how rows are written to Redis; when unset, `string` is used.
pub const REDIS_VALUE_TYPE: &str = "redis_value_type";
/// `redis_value_type` for plain string key/value writes (the default).
pub const REDIS_VALUE_TYPE_STRING: &str = "string";
/// `redis_value_type` for geospatial writes (`GEOADD`).
pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial";
/// `redis_value_type` for publishing each row to a pubsub channel (`PUBLISH`).
pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub";
pub const LON_NAME: &str = "longitude";
pub const LAT_NAME: &str = "latitude";
pub const MEMBER_NAME: &str = "member";
/// Option key: fixed pubsub channel name used for every row.
/// Exactly one of this and `CHANNEL_COLUMN` must be set.
pub const CHANNEL: &str = "channel";
/// Option key: name of a `varchar` column whose value is used as the pubsub channel
/// for each row. Exactly one of this and `CHANNEL` must be set.
pub const CHANNEL_COLUMN: &str = "channel_column";

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct RedisCommon {
Expand Down Expand Up @@ -100,6 +103,12 @@ impl RedisPipe {
) => {
pipe.geo_add(key, (lon, lat, member));
}
(
RedisSinkPayloadWriterInput::RedisPubSubKey(key),
RedisSinkPayloadWriterInput::String(v),
) => {
pipe.publish(key, v);
}
_ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
},
RedisPipe::Single(pipe) => match (k, v) {
Expand All @@ -115,6 +124,12 @@ impl RedisPipe {
) => {
pipe.geo_add(key, (lon, lat, member));
}
(
RedisSinkPayloadWriterInput::RedisPubSubKey(key),
RedisSinkPayloadWriterInput::String(v),
) => {
pipe.publish(key, v);
}
_ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
},
};
Expand Down Expand Up @@ -280,12 +295,6 @@ impl Sink for RedisSink {
self.format_desc.encode,
super::catalog::SinkEncode::Template
) {
let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find '{KEY_FORMAT}', please set it or use JSON"
))
})?;
TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
match self
.format_desc
.options
Expand All @@ -294,6 +303,12 @@ impl Sink for RedisSink {
{
// if not set, default to string
Some(REDIS_VALUE_TYPE_STRING) | None => {
let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find '{KEY_FORMAT}', please set it or use JSON"
))
})?;
TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
let value_format =
self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
Expand All @@ -303,6 +318,13 @@ impl Sink for RedisSink {
TemplateStringEncoder::check_string_format(value_format, &all_map)?;
}
Some(REDIS_VALUE_TYPE_GEO) => {
let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find '{KEY_FORMAT}', please set it or use JSON"
))
})?;
TemplateStringEncoder::check_string_format(key_format, &pk_map)?;

let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
Expand Down Expand Up @@ -350,9 +372,35 @@ impl Sink for RedisSink {
)));
}
}
Some(REDIS_VALUE_TYPE_PUBSUB) => {
let channel = self.format_desc.options.get(CHANNEL);
let channel_column = self.format_desc.options.get(CHANNEL_COLUMN);
if (channel.is_none() && channel_column.is_none())
|| (channel.is_some() && channel_column.is_some())
{
return Err(SinkError::Config(anyhow!(
"`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
)));
}

if let Some(channel_column) = channel_column
&& let Some(channel_column_type) = all_map.get(channel_column)
&& (channel_column_type != &DataType::Varchar)
{
return Err(SinkError::Config(anyhow!(
"`{CHANNEL_COLUMN}` must be set to `varchar`"
)));
}

let value_format =
self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
})?;
TemplateStringEncoder::check_string_format(value_format, &all_map)?;
}
_ => {
return Err(SinkError::Config(anyhow!(
"`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}`"
"`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}`"
)));
}
}
Expand Down
0