10000 add compressionType,enableChunking for pulsar client , upgrade dependencies for security by zhangjiayin · Pull Request #5191 · alibaba/canal · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

add compressionType,enableChunking for pulsar client , upgrade dependencies for security #5191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.5.4</version>
<version>2.5.15</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand All @@ -48,7 +48,12 @@
<dependency>
<groupId>io.ebean</groupId>
<artifactId>ebean</artifactId>
<version>11.41.1</version>
<version>11.45.1</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.0</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down
2 changes: 1 addition & 1 deletion client-adapter/launcher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.5.4</version>
<version>2.5.15</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
4 changes: 2 additions & 2 deletions client-adapter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<version>2.5.4</version>
<version>2.5.15</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
Expand Down Expand Up @@ -238,7 +238,7 @@
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.29</version>
<version>2.0</version>
</dependency>
<!-- test -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,18 @@ public class PulsarMQConstants {
public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl";

/**
* Pulsar admin服务器地址
* Pulsar 监听器名字
*/
public static final String PULSARMQ_LISTENER_NAME = ROOT + "." + "listenerName";


/**
* Pulsar 开启chunking
*/
public static final String PULSARMQ_ENABLE_CHUNKING = ROOT + "." + "enableChunking";

/**
* Pulsar 压缩算法
*/
public static final String PULSARMQ_COMPRESSION_TYPE = ROOT + "." + "compressionType";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ public class PulsarMQProducerConfig extends MQProperties {
*/
private String listenerName;

/**
* enableChunking
*/
private boolean enableChunking;

/**
* compressionType
*/
private String compressionType;

public String getServerUrl() {
return serverUrl;
}
Expand Down Expand Up @@ -81,4 +91,18 @@ public String getListenerName() {
public void setListenerName(String listenerName) {
this.listenerName = listenerName;
}

public void setEnableChunking(boolean enableChunking) {
this.enableChunking = enableChunking;
}
public boolean getEnableChunking() {
return this.enableChunking;
}

public void setCompressionType(String compressionType) {
this.compressionType = compressionType;
}
public String getCompressionType() {
return this.compressionType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ private void loadPulsarMQProperties(Properties properties) {
tmpProperties.setListenerName(listenerName);
}

String enableChunkingStr = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ENABLE_CHUNKING);
if (!StringUtils.isEmpty(enableChunkingStr)) {
tmpProperties.setEnableChunking(Boolean.parseBoolean(enableChunkingStr));
}

String compressionType = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_COMPRESSION_TYPE);
if (!StringUtils.isEmpty(compressionType)) {
tmpProperties.setCompressionType(compressionType);
}


if (logger.isDebugEnabled()) {
logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
}
Expand Down Expand Up @@ -408,11 +419,34 @@ private Producer<byte[]> getProducer(String topic) {
}

// 创建指定topic的生产者
producer = client.newProducer()
.topic(fullTopic)
ProducerBuilder producerBuilder = client.newProducer();
if(pulsarMQProperties.getEnableChunking()){
producerBuilder.enableChunking(true);
producerBuilder.enableBatching(false);
}

if(!StringUtils.isEmpty(pulsarMQProperties.getCompressionType())) {
switch(pulsarMQProperties.getCompressionType().toLowerCase()) {
case "lz4":
producerBuilder.compressionType(CompressionType.LZ4);
break;
case "zlib":
producerBuilder.compressionType(CompressionType.ZLIB);
break;
case "zstd":
producerBuilder.compressionType(CompressionType.ZSTD);
break;
case "snappy":
producerBuilder.compressionType(CompressionType.SNAPPY);
break;
}
}

producer = producerBuilder.topic(fullTopic)
// 指定路由器
.messageRouter(new MessageRouterImpl(topic))
.create();

// 放入缓存
PRODUCERS.put(topic, producer);
}
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@
<java_target_version>1.8</java_target_version>
<file_encoding>UTF-8</file_encoding>
<javadoc_skip>true</javadoc_skip>
<spring_version>5.3.9</spring_version>
<spring_version>5.3.26</spring_version>
<log4j_version>2.17.0</log4j_version>
<rocketmq_version>4.8.0</rocketmq_version>
<rocketmq_version>4.9.8</rocketmq_version>
<rabbitmq_version>5.18.0</rabbitmq_version>
<mq_amqp_client>1.0.3</mq_amqp_client>
<kafka_version>2.4.0</kafka_version>
<pulsar_version>2.8.1</pulsar_version>
<pulsar_version>2.11.4</pulsar_version>
<mysql_driver_version>5.1.48</mysql_driver_version>
<maven-jacoco-plugin.version>0.8.3</maven-jacoco-plugin.version>
<maven-surefire.version>2.22.1</maven-surefire.version>
Expand Down
0