8000 GitHub - xiagit000/sea-otter: A data collection tool that relies on spark's batch processing capabilities and flink streaming capabilities
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

A data collection tool that relies on spark's batch processing capabilities and flink streaming capabilities

License

Notifications You must be signed in to change notification settings

xiagit000/sea-otter

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

19 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

SeaOtter(海獭)-企业级多源异构数据集成引擎

介绍

一个支持多数据源、使用api集成、依赖spark及flink大数据计算平台完成数据流的批量及实时同步的数据引擎,目前支持的功能有:数据库预览、 数据库同步及同步任务回调的功能,支持的数据源包括:Mysql、达梦、oracle、starrocks、ftp/sftp等。

环境要求

依赖 版本号 说明
java 1.8 初个版本为了兼容之前老项目,后续会逐步升级
spark 3.4.3 用于历史数据的批量处理
starrocks-spark-connector 3.4._2.12 spark同步starrocks依赖
flink 1.9.1 实时数据流处理基础依赖
flink-cdc 3.3.0 利用其flink-cdc实现数据实时更新同步
flink-connector-starrocks 1.2.10 flink同步starrocks的依赖
jsch 0.1.55 处理SFTP文件协议
antlr 4.9.3 数据库sql解析与自定义映射
common-nets 3.8.0 处理FTP文件协议
mysql-connector-java 8.0.28 mysql连接驱动
ojdbc 21.1.0.0 oracle连接驱动
DmJdbcDeriver 8.1.3.140 达梦数据库连接驱动

使用示例

导入核心依赖

<parent>
    <groupId>com.ybt.seaotter</groupId>
    <artifactId>core</artifactId>
    <version>1.0</version>
</parent>

初始化sdk

SeaOtterConfig seaOtterConfig = SeaOtterConfig.builder()
    .sparkOptions(new SparkOptions("<host>", 6066))
    .flinkOptions(new FlinkOptions("<host>", 8081))
    .callback("<callbackUrl>")
    .build();
    seaOtter = SeaOtter.config(seaOtterConfig);

定义数据源

private final SourceConnector source = new MysqlConnector()
        .setHost("<host>")
        .setPort(3306)
        .setUsername("<username>")
        .setPassword("<pwd>")
        .setDatabase("<db>")
        .setTable("<table>");

private final SourceConnector sink = new StarrocksConnector()
        .setHost("<host>")
        .setHttpPort(8080)
        .setRpcPort(9030)
        .setUsername("<username>")
        .setPassword("<pwd>")
        .setDatabase("<db>")
        .setTable("<table>");

执行任务

// 批处理全量同步
seaOtter.job()
    .from(source).to(sink)
    .batchMode(TransmissionMode.OVERWRITE)
    .submit()

// 实时同步
seaOtter.job().from(source).to(sink).CDCMode().submit()

数据源操作

// 查询所有数据库
List<String> databases = seaOtter.db(source).databases();

// 查询指定数据库下的所有表
List<String> databases = seaOtter.db(source).database("<db>").tables();

...

// 文件系统操作
List<FileObject> files = seaOtter.file(source).list("/", Lists.newArrayList("txt, csv"));

...

计划支持的数据库

  1. MySQL
  2. PrestoSQL
  3. PostgreSQL
  4. Sql Server
  5. StarRocks
  6. Oracle
  7. OceanBase
  8. Flink SQL / Flink CDC SQL

About

A data collection tool that relies on spark's batch processing capabilities and flink streaming capabilities

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

0