8000 Added logic to write history tables in a separate connector mode. by subkanthi · Pull Request #1063 · Altinity/clickhouse-sink-connector · GitHub

Added logic to write history tables in a separate connector mode. #1063

Open · wants to merge 2 commits into base: 2.7.0
@@ -1,11 +1,15 @@
version: "3.8"

services:
clickhouse-sink-connector-lt:
image: ${CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE}
entrypoint: ["sh", "-c", "java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 -Xms4g -Xmx4g -Dlog4j2.configurationFile=log4j2.xml -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=39999 -jar /app.jar /config.yml com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"]
restart: "no"
network_mode: "host"
depends_on:
- clickhouse
# ports:
# - "8089:8083"
# - "5006:5005"
# - "7009:7000"
network_mode: "host"
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
4 changes: 2 additions & 2 deletions sink-connector-lightweight/docker/config.yml
@@ -32,7 +32,7 @@ database.server.name: "ER54"
database.include.list: sbtest

# table.include.list An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored;
table.include.list: ""
#table.include.list: ""

# Clickhouse Server URL, Specify only the hostname.
clickhouse.server.url: "clickhouse"
@@ -185,7 +185,7 @@ clickhouse.jdbc.settings: "input_format_null_as_default=0,allow_experimental_obj
#Metrics (Prometheus target), required for Grafana Dashboard
metrics.enable: "true"

#connection.pool.disable: "true"
connection.pool.disable: "true"
#connection.pool.max.size: 500

# Skip schema history capturing, use the following configuration
1 change: 0 additions & 1 deletion sink-connector-lightweight/docker/docker-compose.yml
@@ -1,4 +1,3 @@
version: "3.4"

# Ubuntu , set this for redpanda to start
# https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm
5 changes: 5 additions & 0 deletions sink-connector-lightweight/pom.xml
@@ -427,6 +427,11 @@
<artifactId>jackson-databind</artifactId>
<version>2.12.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.12.6</version>
</dependency>
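
The new jackson-dataformat-yaml artifact matches the jackson-databind version already pinned above. Presumably it backs the new YAML-driven config loaders referenced elsewhere in this PR (ReplicationHistoryConfig, DefaultColumnDataTypeMappingConfig); their sources are not shown in this diff, so the following is only a minimal sketch of that pattern, with a hypothetical load() helper:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import java.util.Map;

public class YamlConfigSketch {
    // Hypothetical helper: parse a YAML config file into a key/value map.
    static Map<String, Object> load(String path) throws IOException {
        ObjectMapper yaml = new ObjectMapper(new YAMLFactory());
        return yaml.readValue(new File(path), Map.class);
    }
}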

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
@@ -8,6 +8,7 @@
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.common.Metrics;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants;
import com.altinity.clickhouse.sink.connector.db.DBMetadata;
import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor;
@@ -39,6 +40,9 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.altinity.clickhouse.sink.connector.config.ReplicationHistoryConfig.loadReplicationHistoryEnable;
import static com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants.*;

/**
* Sets up Debezium engine with the configuration passed by the user,
* and creates a separate thread pool to read the records that are
@@ -145,6 +149,8 @@ public void setupDebeziumEventCapture(Properties props,
ClickHouseSinkConnectorConfig config)
throws IOException, ClassNotFoundException {

// System.out.println("setupDebeziumEventCapture");

DBCredentials dbCredentials = parseDBConfiguration(config);
systemDbConnection = setSystemDbConnection(dbCredentials, config);

@@ -305,6 +311,7 @@ public void setup(Properties props,
props.getProperty(ClickHouseSinkConnectorConfigVariables.METRICS_ENDPOINT_PORT.toString()));

// Start Debezium event loop if it is requested from REST API.
// System.out.println("setupProcessingThread");
if (!config.getBoolean(ClickHouseSinkConnectorConfigVariables.SKIP_REPLICA_START.toString())
|| forceStart) {
this.setupProcessingThread(config);
@@ -413,6 +420,12 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
while (numRetries < MAX_DDL_RETRIES) {
try {
executeDDL(clickHouseQuery.toString(), writer);

if (loadReplicationHistoryEnable()) {
    executeDDL(modifySqlStatement(transformTableName(clickHouseQuery.toString())), writer);
    // executeDDL(transformTableName(clickHouseQuery.toString()), writer);
}
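// In history mode, the captured DDL is redirected to the "_history" table by
// transformTableName() and then rewritten by modifySqlStatement() (defined below)
// to use MergeTree with the extra history metadata columns.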

DebeziumOffsetManagement.acknowledgeRecords(recordCommitter, cdcRecord, lastRecordInBatch);
break;
} catch (Exception e) {
@@ -496,6 +509,105 @@ private void executeDDL(String clickHouseQuery, BaseDbWriter writer) throws SQLE
}
}

/**
 * Transforms the first occurrence of ALTER TABLE or CREATE TABLE in the
 * original SQL by appending "_history" to the captured table name.
 *
 * @param original the original SQL statement
 * @return the transformed SQL statement, or the original if no match was found
 */
private static String transformTableName(String original) {
    // Build a regex that matches 'ALTER TABLE ' or 'CREATE TABLE ',
    // captures the schema.table identifier, and asserts that the next character
    // is either a space or an opening parenthesis.
    Pattern pattern = Pattern.compile(
            "((?:ALTER TABLE|CREATE TABLE)\\s+)" + // group(1): the clause + trailing whitespace
            "([\\w\\.]+)" +                        // group(2): schema.table
            "(?=\\s|\\()",                         // lookahead: ensure next char is space or '('
            Pattern.CASE_INSENSITIVE);             // allow case-insensitive matching

    Matcher matcher = pattern.matcher(original);
    if (matcher.find()) {
        String clause = matcher.group(1); // e.g. "ALTER TABLE " or "CREATE TABLE "
        String table = matcher.group(2);  // e.g. "employees.contacts"
        // Rebuild the prefix + table + "_history", then replace only the first occurrence
        return matcher.replaceFirst(clause + table + "_history");
    }

    // No ALTER/CREATE TABLE match -> return unchanged
    return original;
}
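
// Example (hypothetical DDL, for illustration): only the first table identifier is renamed:
//   transformTableName("CREATE TABLE employees.contacts (id Int32) ENGINE = ReplacingMergeTree(_version) ORDER BY id")
//   returns "CREATE TABLE employees.contacts_history (id Int32) ENGINE = ReplacingMergeTree(_version) ORDER BY id"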

/**
 * Modifies a dynamically generated CREATE TABLE statement when creating the history table:
 * 1. Changes the ENGINE from ReplacingMergeTree to MergeTree.
 * 2. Adds new columns before the ENGINE clause: operation, database, table, _raw, _time, host, logfile, position, primary_host.
 * 3. Keeps the original ORDER BY clause intact.
 * 4. Only performs the transformation if the SQL starts with CREATE TABLE.
 *
 * @param createTableSql The original CREATE TABLE SQL statement.
 * @return The modified CREATE TABLE SQL statement.
 */
private static String modifySqlStatement(String createTableSql) {
    // 1. Only process statements that start with CREATE TABLE
    if (!createTableSql.trim().toUpperCase().startsWith("CREATE TABLE")) {
        // Not a CREATE TABLE statement: return the original SQL unchanged
        return createTableSql;
    }

    // 2. Locate the ENGINE=ReplacingMergeTree(...) part of the SQL
    Pattern enginePattern = Pattern.compile("(?i)ENGINE\\s*=\\s*ReplacingMergeTree\\(.*\\)");
    Matcher engineMatcher = enginePattern.matcher(createTableSql);

    if (engineMatcher.find()) {
        // 3. Extract the part before ENGINE (the column definitions)
        String beforeEngine = createTableSql.substring(0, engineMatcher.start()).trim();

        // Remove the trailing ')' that closes the column definitions
        if (beforeEngine.endsWith(")")) {
            beforeEngine = beforeEngine.substring(0, beforeEngine.length() - 1).trim();
        }

        // 4. Locate the ORDER BY clause
        Pattern orderByPattern = Pattern.compile("(?i)ORDER\\s+BY\\s+.*");
        Matcher orderByMatcher = orderByPattern.matcher(createTableSql);
        String orderByClause = "";
        if (orderByMatcher.find()) {
            orderByClause = createTableSql.substring(orderByMatcher.start()).trim();
        }

        // 5. Build the history metadata columns to append after the original columns
        String extraColumns = ", "
                + OPERATION_COLUMN + " " + OPERATION_COLUMN_DATA_TYPE + ", "
                + DATABASE_COLUMN + " " + DATABASE_COLUMN_DATA_TYPE + ", "
                + TABLE_COLUMN + " " + TABLE_COLUMN_DATA_TYPE + ", "
                + RAW_COLUMN + " " + RAW_COLUMN_DATA_TYPE + ", "
                + TIME_COLUMN + " " + TIME_COLUMN_DATA_TYPE + ", "
                + HOST_COLUMN + " " + HOST_COLUMN_DATA_TYPE + ", "
                + LOGFILE_COLUMN + " " + LOGFILE_COLUMN_DATA_TYPE + ", "
                + POSITION_COLUMN + " " + POSITION_COLUMN_DATA_TYPE + ", "
                + PRIMARY_HOST_COLUMN + " " + PRIMARY_HOST_COLUMN_DATA_TYPE;

        // 6. Close the column list and swap the engine for MergeTree()
        String modifiedEngine = ") ENGINE = MergeTree()";

        // 7. Combine the column definitions, the new columns, and the new engine
        String modifiedSql = beforeEngine + extraColumns + modifiedEngine;

        // 8. Re-append the ORDER BY clause if one was found
        if (!orderByClause.isEmpty()) {
            modifiedSql += " " + orderByClause;
        }

        return modifiedSql;
    } else {
        // Fail fast if the statement does not use ENGINE=ReplacingMergeTree
        throw new IllegalArgumentException("The original SQL does not contain an ENGINE=ReplacingMergeTree clause: " + createTableSql);
    }
}
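
// Example (hypothetical DDL, for illustration): given the already renamed statement
//   CREATE TABLE employees.contacts_history (id Int32, name String) ENGINE = ReplacingMergeTree(_version) ORDER BY id
// modifySqlStatement() yields (column types taken from ClickHouseDbConstants):
//   CREATE TABLE employees.contacts_history (id Int32, name String, operation ..., database ..., table ..., _raw ..., _time ..., host ..., logfile ..., position ..., primary_host ...) ENGINE = MergeTree() ORDER BY id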

/**
* Updates the DDL metrics using the Metrics class.
*
@@ -2,6 +2,8 @@

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.parser.DataTypeConverter;

import static com.altinity.clickhouse.sink.connector.config.DefaultColumnDataTypeMappingConfig.loadDefaultColumnDataTypeMapping;
import static com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants.*;
import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;

@@ -297,6 +299,7 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
*/
private Set<String> parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder orderByColumns,
StringBuilder partitionByColumns) {

List<ParseTree> pt = ctx.children;
Set<String> columnNames = new HashSet<>();

@@ -481,6 +484,13 @@ private String getClickHouseDataType(String parsedDataType, ParseTree colDefTree
chDataType = DataTypeConverter.convertToString(this.config, columnName,
scale, precision, dtc, this.userProvidedTimeZone);

Map<String, String> defaultColumnDataTypeMap = loadDefaultColumnDataTypeMapping();

// Override the inferred type when a user-provided mapping is configured for this column.
if (defaultColumnDataTypeMap != null) {
chDataType = defaultColumnDataTypeMap.getOrDefault(columnName, chDataType);
}
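// Example (hypothetical mapping, for illustration): with a configured mapping of
// {"created_at": "DateTime64(3)"}, the column "created_at" is forced to
// DateTime64(3); columns absent from the map keep the converted type.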

return chDataType;
}

@@ -651,6 +661,14 @@ else if(columnDefChild.getText().equalsIgnoreCase(Constants.NOT_NULL)) {
}
}

// Call the method to load the default column data type mapping.
/*Map<String, String> defaultColumnDataTypeMap = loadDefaultColumnDataTypeMapping();

// Use a single null check with optional.
if (defaultColumnDataTypeMap != null) {
columnType = defaultColumnDataTypeMap.getOrDefault(columnName, columnType);
}*/

// If column name and column type are defined, append them to the query.
if (columnName != null && columnType != null)
if (isNullColumn) {