8000 add tablestore adapter by 364102729 · Pull Request #3754 · alibaba/canal · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

add tablestore adapter #3754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class CanalClientConfig {
// canal adapters 配置
private List<CanalAdapter> canalAdapters;

private Boolean terminateOnException = false;

public String getCanalServerHost() {
return canalServerHost;
}
Expand Down Expand Up @@ -222,6 +224,14 @@ public void setNamespace(String namespace) {
this.namespace = namespace;
}

public Boolean getTerminateOnException() {
return terminateOnException;
}

public void setTerminateOnException(Boolean terminateOnException) {
this.terminateOnException = terminateOnException;
}

public static class CanalAdapter {

private String instance; // 实例名
Expand Down
13 changes: 13 additions & 0 deletions client-adapter/launcher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@
<classifier>jar-with-dependencies</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>client-adapter.tablestore</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
<classifier>jar-with-dependencies</classifier>
<scope>provided</scope>
</dependency>
<!-- connector plugin -->
<dependency>
<groupId>com.alibaba.otter</groupId>
Expand Down
7 changes: 7 additions & 0 deletions client-adapter/launcher/src/main/assembly/dev.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@
<exclude>META-INF/**</exclude>
</excludes>
</fileSet>
<fileSet>
<directory>../tablestore/src/main/resources/</directory>
<outputDirectory>/conf</outputDirectory>
<excludes>
<exclude>META-INF/**</exclude>
</excludes>
</fileSet>
<fileSet>
<directory>target</directory>
<outputDirectory>logs</outputDirectory>
Expand Down
7 changes: 7 additions & 0 deletions client-adapter/launcher/src/main/assembly/release.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@
<exclude>META-INF/**</exclude>
</excludes>
</fileSet>
<fileSet>
<directory>../tablestore/src/main/resources/</directory>
<outputDirectory>/conf</outputDirectory>
<excludes>
<exclude>META-INF/**</exclude>
</excludes>
</fileSet>
<fileSet>
<directory>target</directory>
<outputDirectory>logs</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -217,8 +218,15 @@ private void process() {
canalMsgConsumer.rollback(); // 处理失败, 回滚数据
logger.error(e.getMessage() + " Error sync and rollback, execute times: " + (i + 1));
} else {
canalMsgConsumer.ack();
logger.error(e.getMessage() + " Error sync but ACK!");
if (canalClientConfig.getTerminateOnException()) {
canalMsgConsumer.rollback();
logger.error("Retry fail, turn switch off and abort data transfer.");
syncSwitch.off(canalDestination);
logger.error("finish turn off switch of destination:" + canalDestination);
} else {
canalMsgConsumer.ack();
logger.error(e.getMessage() + " Error sync but ACK!");
}
}
Thread.sleep(500);
}
Expand Down
20 changes: 20 additions & 0 deletions client-adapter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<module>escore</module>
<module>kudu</module>
<module>phoenix</module>
<module>tablestore</module>
</modules>

<licenses>
Expand Down Expand Up @@ -251,6 +252,25 @@
<version>1.9.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.10.3</version>
<classifier>jar-with-dependencies</classifier>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
</exclusion>
</exclusions>
</dependency>


</dependencies>
</dependencyManagement>

Expand Down
93 changes: 93 additions & 0 deletions client-adapter/tablestore/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>canal.client-adapter</artifactId>
<groupId>com.alibaba.otter</groupId>
<version>1.1.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.otter</groupId>
<artifactId>client-adapter.tablestore</artifactId>
<packaging>jar</packaging>
<name>canal client adapter rdb module for otter ${project.version}</name>

<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>client-adapter.common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.10.3</version>
<classifier>jar-with-dependencies</classifier>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
</exclusion>
</exclusions>
</dependency>


</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<copy todir="${project.basedir}/../launcher/target/classes/tablestore" overwrite="true">
<fileset dir="${project.basedir}/target/classes/tablestore" erroronmissingdir="true">
<include name="*.yml" />
</fileset>
</copy>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Loading
0