一个支持多数据源、使用api集成、依赖spark及flink大数据计算平台完成数据流的批量及实时同步的数据引擎,目前支持的功能有:数据库预览、 数据库同步及同步任务回调的功能,支持的数据源包括:Mysql、达梦、oracle、starrocks、ftp/sftp等。
依赖 | 版本号 | 说明 |
---|---|---|
java | 1.8 | 初个版本为了兼容之前老项目,后续会逐步升级 |
spark | 3.4.3 | 用于历史数据的批量处理 |
starrocks-spark-connector | 3.4._2.12 | spark同步starrocks依赖 |
flink | 1.9.1 | 实时数据流处理基础依赖 |
flink-cdc | 3.3.0 | 利用其flink-cdc实现数据实时更新同步 |
flink-connector-starrocks | 1.2.10 | flink同步starrocks的依赖 |
jsch | 0.1.55 | 处理SFTP文件协议 |
antlr | 4.9.3 | 数据库sql解析与自定义映射 |
common-nets | 3.8.0 | 处理FTP文件协议 |
mysql-connector-java | 8.0.28 | mysql连接驱动 |
ojdbc | 21.1.0.0 | oracle连接驱动 |
DmJdbcDeriver | 8.1.3.140 | 达梦数据库连接驱动 |
<parent>
<groupId>com.ybt.seaotter</groupId>
<artifactId>core</artifactId>
<version>1.0</version>
</parent>
SeaOtterConfig seaOtterConfig = SeaOtterConfig.builder()
.sparkOptions(new SparkOptions("<host>", 6066))
.flinkOptions(new FlinkOptions("<host>", 8081))
.callback("<callbackUrl>")
.build();
seaOtter = SeaOtter.config(seaOtterConfig);
private final SourceConnector source = new MysqlConnector()
.setHost("<host>")
.setPort(3306)
.setUsername("<username>")
.setPassword("<pwd>")
.setDatabase("<db>")
.setTable("<table>");
private final SourceConnector sink = new StarrocksConnector()
.setHost("<host>")
.setHttpPort(8080)
.setRpcPort(9030)
.setUsername("<username>")
.setPassword("<pwd>")
.setDatabase("<db>")
.setTable("<table>");
// 批处理全量同步
seaOtter.job()
.from(source).to(sink)
.batchMode(TransmissionMode.OVERWRITE)
.submit()
// 实时同步
seaOtter.job().from(source).to(sink).CDCMode().submit()
// 查询所有数据库
List<String> databases = seaOtter.db(source).databases();
// 查询指定数据库下的所有表
List<String> databases = seaOtter.db(source).database("<db>").tables();
...
// 文件系统操作
List<FileObject> files = seaOtter.file(source).list("/", Lists.newArrayList("txt, csv"));
...