feat(sink): enable user-defined primary key for upsert sink by xx01cyx · Pull Request #8610 · risingwavelabs/risingwave
Merged · 12 commits · Mar 17, 2023
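In short, sinks now take a type option ('append-only' or 'upsert', replacing the old format / sink.mode spellings), and an upsert sink can declare its own key through primary_key. A minimal sketch of the resulting DDL, assuming the Kafka connector accepts the same primary_key option that the iceberg test below exercises (table name, topic, and broker address are illustrative):

    CREATE TABLE t1 (v1 int, v2 int);

    -- Upsert sink keyed on a user-chosen column instead of a key
    -- derived from the query plan.
    CREATE SINK s1 FROM t1 WITH (
        connector = 'kafka',
        properties.bootstrap.server = '127.0.0.1:29092',
        topic = 'sink_target',
        type = 'upsert',
        primary_key = 'v1'
    );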
34 changes: 17 additions & 17 deletions ci/scripts/e2e-sink-test.sh
@@ -88,29 +88,29 @@ cargo make ci-start ci-1cn-1fe

echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
-# sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
+sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
sleep 1

# check sink destination postgres
-# sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt'
-# sleep 1
-# sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
-# sleep 1
+sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt'
+sleep 1
+sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
+sleep 1

# check sink destination mysql using shell
-# if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{
-# if ($1 == 1 && $2 == "Alex") c1++;
-# if ($1 == 3 && $2 == "Carl") c2++;
-# if ($1 == 4 && $2 == "Doris") c3++;
-# if ($1 == 5 && $2 == "Eve") c4++;
-# if ($1 == 6 && $2 == "Frank") c5++; }
-# END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then
-# echo "mysql sink check passed"
-# else
-# echo "The output is not as expected."
-# exit 1
-# fi
+if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{
+if ($1 == 1 && $2 == "Alex") c1++;
+if ($1 == 3 && $2 == "Carl") c2++;
+if ($1 == 4 && $2 == "Doris") c3++;
+if ($1 == 5 && $2 == "Eve") c4++;
+if ($1 == 6 && $2 == "Frank") c5++; }
+END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then
+echo "mysql sink check passed"
+else
+echo "The output is not as expected."
+exit 1
+fi

echo "--- Kill cluster"
pkill -f connector-node
31 changes: 16 additions & 15 deletions dashboard/proto/gen/catalog.ts

Some generated files are not rendered by default.

28 changes: 14 additions & 14 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion e2e_test/batch/explain.slt
@@ -5,7 +5,7 @@ statement ok
explain create index i on t(v);

statement ok
-explain create sink sink_t from t with ( connector = 'kafka', format = 'append_only' )
+explain create sink sink_t from t with ( connector = 'kafka', type = 'append-only' )

statement ok
drop table t;
4 changes: 2 additions & 2 deletions e2e_test/ddl/table.slt
@@ -39,10 +39,10 @@ statement ok
explain select v2 from ddl_t;

statement ok
-explain create sink sink_t from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );
+explain create sink sink_t from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );

statement ok
-explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );
+explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );

# Create a mview with duplicated name.
statement error
38 changes: 7 additions & 31 deletions e2e_test/sink/append_only_sink.slt
@@ -1,35 +1,20 @@
statement ok
-create table t1 (v1 int, v2 int);
-
-statement error No primary key for the upsert sink
-create sink s1 from t1 with (connector = 'console');
-
-statement ok
-create sink s1 as select v1, v2, _row_id from t1 with (connector = 'console');
-
-statement ok
-create table t2 (v1 int, v2 int primary key);
-
-statement ok
-create sink s2 from t2 with (connector = 'console');
-
-statement error No primary key for the upsert sink
-create sink s3 as select avg(v1) from t2 with (connector = 'console');
+create table t (v1 int, v2 int);

statement ok
-create sink s3 as select avg(v1) from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
+create sink s1 from t with (connector = 'console');

statement ok
-create sink s4 as select avg(v1), v2 from t2 group by v2 with (connector = 'console');
+create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'console');

statement error The sink cannot be append-only
-create sink s5 from t2 with (connector = 'console', format = 'append_only');
+create sink s3 from t with (connector = 'console', type = 'append-only');

statement ok
-create sink s5 from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
+create sink s3 from t with (connector = 'console', type = 'append-only', force_append_only = 'true');

statement error Cannot force the sink to be append-only
-create sink s6 from t2 with (connector = 'console', format = 'upsert', force_append_only = 'true');
+create sink s4 from t with (connector = 'console', type = 'upsert', force_append_only = 'true');

statement ok
drop sink s1
@@ -41,13 +26,4 @@ statement ok
drop sink s3

statement ok
-drop sink s4
-
-statement ok
-drop sink s5
-
-statement ok
-drop table t1
-
-statement ok
-drop table t2
+drop table t
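Restated outside the test harness: with the new options, a sink over a query that produces updates is rejected as append-only ("The sink cannot be append-only") unless force_append_only = 'true' explicitly opts in to dropping updates and deletes, and force_append_only cannot be combined with type = 'upsert' ("Cannot force the sink to be append-only"). A sketch of the accepted form, mirroring the s3 case above (sink name is illustrative):

    CREATE TABLE t (v1 int, v2 int);

    -- type = 'append-only' alone would fail here, since rows in t can be
    -- updated; forcing it discards update and delete events at the sink.
    CREATE SINK s_forced FROM t WITH (
        connector = 'console',
        type = 'append-only',
        force_append_only = 'true'
    );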
3 changes: 2 additions & 1 deletion e2e_test/sink/iceberg_sink.slt
@@ -7,7 +7,8 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
-sink.mode='append-only',
+type = 'upsert',
+primary_key = 'v1',
A review thread on the primary_key line:

Contributor: I am not sure whether we should use primary.key or primary_key? primary.key seems more consistent with other configuration.

Contributor Author: IMO a.b implies that b is a subcategory (or sub-configuration) under a, so I prefer primary_key. Remember we also have a force_append_only option. cc. @tabVersion @st1page @fuyufjh

warehouse.path = 's3://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
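The flat primary_key spelling from the thread above is what the tests settled on. For a multi-column key, the value is presumably a comma-separated column list; a sketch under that assumption, with the storage options copied from the test above and the rest elided as in the diff:

    CREATE SINK s6_composite AS
    SELECT mv6.v1 AS v1, mv6.v2 AS v2, mv6.v3 AS v3 FROM mv6 WITH (
        connector = 'iceberg',
        type = 'upsert',
        -- Assumed spelling for a composite key: comma-separated column names.
        primary_key = 'v1,v2',
        warehouse.path = 's3://iceberg',
        s3.endpoint = 'http://127.0.0.1:9301',
        s3.access.key = 'hummockadmin'
        -- remaining s3 / table options as in iceberg_sink.slt above
    );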
2 changes: 1 addition & 1 deletion e2e_test/source/basic/kafka.slt
@@ -79,7 +79,7 @@ from
s5 with (
properties.bootstrap.server = '127.0.0.1:29092',
topic = 'sink_target',
-format = 'append_only',
+type = 'append-only',
connector = 'kafka'
)

2 changes: 1 addition & 1 deletion integration_tests/iceberg-sink/create_sink.sql
@@ -2,7 +2,7 @@ CREATE SINK bhv_iceberg_sink
FROM
bhv_mv WITH (
connector = 'iceberg',
-sink.mode='upsert',
+type = 'upsert',
warehouse.path = 's3://hummock001/iceberg-data',
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'hummockadmin',
@@ -102,8 +102,6 @@ public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchem
.collect(Collectors.toList()));
}

-/** @deprecated pk here is from Risingwave, it may not match the pk in the database */
-@Deprecated
public List<String> getPrimaryKeys() {
return primaryKeys;
}
4 changes: 2 additions & 2 deletions java/connector-node/python-client/integration_tests.py
@@ -160,7 +160,7 @@ def test_print_sink(input_file):

def test_iceberg_sink(input_file):
test_sink("iceberg",
{"sink.mode":"append-only",
{"type":"append-only",
"warehouse.path":"s3a://bucket",
"s3.endpoint": "http://127.0.0.1:9000",
"s3.access.key": "minioadmin",
@@ -171,7 +171,7 @@ def test_iceberg_sink(input_file):

def test_upsert_iceberg_sink(input_file):
test_upsert_sink("iceberg",
{"sink.mode":"upsert",
{"type":"upsert",
"warehouse.path":"s3a://bucket",
"s3.endpoint": "http://127.0.0.1:9000",
"s3.access.key": "minioadmin",
@@ -39,7 +39,7 @@ public class IcebergSinkFactory implements SinkFactory {

private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class);

-public static final String SINK_MODE_PROP = "sink.mode";
+public static final String SINK_TYPE_PROP = "type";
public static final String WAREHOUSE_PATH_PROP = "warehouse.path";
public static final String DATABASE_NAME_PROP = "database.name";
public static final String TABLE_NAME_PROP = "table.name";
@@ -58,7 +58,7 @@ public class IcebergSinkFactory implements SinkFactory {

@Override
public SinkBase create(TableSchema tableSchema, Map<String, String> tableProperties) {
-String mode = tableProperties.get(SINK_MODE_PROP);
+String mode = tableProperties.get(SINK_TYPE_PROP);
String warehousePath = getWarehousePath(tableProperties);
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
String tableName = tableProperties.get(TABLE_NAME_PROP);
@@ -93,22 +93,22 @@ public SinkBase create(TableSchema tableSchema, Map<String, String> tablePropert
@Override
public void validate(
TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
-if (!tableProperties.containsKey(SINK_MODE_PROP) // only append-only, upsert
+if (!tableProperties.containsKey(SINK_TYPE_PROP) // only append-only, upsert
|| !tableProperties.containsKey(WAREHOUSE_PATH_PROP)
|| !tableProperties.containsKey(DATABASE_NAME_PROP)
|| !tableProperties.containsKey(TABLE_NAME_PROP)) {
throw INVALID_ARGUMENT
.withDescription(
String.format(
"%s, %s, %s or %s is not specified",
-SINK_MODE_PROP,
+SINK_TYPE_PROP,
WAREHOUSE_PATH_PROP,
DATABASE_NAME_PROP,
TABLE_NAME_PROP))
.asRuntimeException();
}

-String mode = tableProperties.get(SINK_MODE_PROP);
+String mode = tableProperties.get(SINK_TYPE_PROP);
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
String tableName = tableProperties.get(TABLE_NAME_PROP);
String warehousePath = getWarehousePath(tableProperties);
@@ -64,7 +64,7 @@ public void testCreate() throws IOException {
sinkFactory.create(
TableSchema.getMockTableSchema(),
Map.of(
-IcebergSinkFactory.SINK_MODE_PROP,
+IcebergSinkFactory.SINK_TYPE_PROP,
sinkMode,
IcebergSinkFactory.WAREHOUSE_PATH_PROP,
warehousePath,