From 94e2e290e75585648fb9b83068716ddede7d02d7 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 20 May 2025 11:37:39 -0400 Subject: [PATCH 1/2] Added logic to write history tables in a separate connector mode. --- .../clickhouse-sink-connector-lt-service.yml | 10 +- sink-connector-lightweight/docker/config.yml | 4 +- .../docker/docker-compose.yml | 1 - sink-connector-lightweight/pom.xml | 5 + .../cdc/DebeziumChangeEventCapture.java | 112 +++++++++++ .../parser/MySqlDDLParserListenerImpl.java | 18 ++ .../MySqlDDLParserListenerImplTest.java | 30 +++ sink-connector/pom.xml | 25 +++ .../connector/db/ClickHouseDbConstants.java | 135 +++++++++++-- .../sink/connector/db/DbWriter.java | 13 +- .../db/batch/PreparedStatementExecutor.java | 102 +++++++++- .../operations/ClickHouseAutoCreateTable.java | 188 ++++++++++++++++-- .../ClickHouseTableOperationsBase.java | 17 ++ .../executor/ClickHouseBatchRunnable.java | 11 + .../executor/ClickHouseBatchWriter.java | 1 + .../ClickHouseAutoCreateTableTest.java | 47 ++++- 16 files changed, 661 insertions(+), 58 deletions(-) diff --git a/sink-connector-lightweight/docker/clickhouse-sink-connector-lt-service.yml b/sink-connector-lightweight/docker/clickhouse-sink-connector-lt-service.yml index 926d6b3be..7dc71c096 100644 --- a/sink-connector-lightweight/docker/clickhouse-sink-connector-lt-service.yml +++ b/sink-connector-lightweight/docker/clickhouse-sink-connector-lt-service.yml @@ -1,11 +1,15 @@ -version: "3.8" - services: clickhouse-sink-connector-lt: image: ${CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE} entrypoint: ["sh", "-c", "java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 -Xms4g -Xmx4g -Dlog4j2.configurationFile=log4j2.xml -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=39999 -jar /app.jar /config.yml com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"] restart: "no" - network_mode: "host" + depends_on: + - clickhouse +# ports: +# - "8089:8083" +# - "5006:5005" +# - "7009:7000" + network_mode: "host" extra_hosts: - "host.docker.internal:host-gateway" volumes: diff --git a/sink-connector-lightweight/docker/config.yml b/sink-connector-lightweight/docker/config.yml index a0e789e31..05fa9bcd9 100644 --- a/sink-connector-lightweight/docker/config.yml +++ b/sink-connector-lightweight/docker/config.yml @@ -32,7 +32,7 @@ database.server.name: "ER54" database.include.list: sbtest # table.include.list An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored; -table.include.list: "" +#table.include.list: "" # Clickhouse Server URL, Specify only the hostname. 
clickhouse.server.url: "clickhouse" @@ -185,7 +185,7 @@ clickhouse.jdbc.settings: "input_format_null_as_default=0,allow_experimental_obj #Metrics (Prometheus target), required for Grafana Dashboard metrics.enable: "true" -#connection.pool.disable: "true" +connection.pool.disable: "true" #connection.pool.max.size: 500 # Skip schema history capturing, use the following configuration diff --git a/sink-connector-lightweight/docker/docker-compose.yml b/sink-connector-lightweight/docker/docker-compose.yml index fa01a7fc3..9efa424e2 100644 --- a/sink-connector-lightweight/docker/docker-compose.yml +++ b/sink-connector-lightweight/docker/docker-compose.yml @@ -1,4 +1,3 @@ -version: "3.4" # Ubuntu , set this for redpanda to start # https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index ca199fa23..aace79574 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -427,6 +427,11 @@ jackson-databind 2.12.6 + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + 2.12.6 + diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 7a72db306..69514e96a 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -8,6 +8,7 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.altinity.clickhouse.sink.connector.common.Metrics; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants; import com.altinity.clickhouse.sink.connector.db.DBMetadata; import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable; import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor; @@ -39,6 +40,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static com.altinity.clickhouse.sink.connector.config.ReplicationHistoryConfig.loadReplicationHistoryEnable; +import static com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants.*; + /** * Sets up Debezium engine with the configuration passed by the user, * and creates a separate thread pool to read the records that are @@ -145,6 +149,8 @@ public void setupDebeziumEventCapture(Properties props, ClickHouseSinkConnectorConfig config) throws IOException, ClassNotFoundException { + // System.out.println("setupDebeziumEventCapture"); + DBCredentials dbCredentials = parseDBConfiguration(config); systemDbConnection = setSystemDbConnection(dbCredentials, config); @@ -305,6 +311,7 @@ public void setup(Properties props, props.getProperty(ClickHouseSinkConnectorConfigVariables.METRICS_ENDPOINT_PORT.toString())); // Start Debezium event loop if it is requested from REST API. 
+ // System.out.println("setupProcessingThread"); if (!config.getBoolean(ClickHouseSinkConnectorConfigVariables.SKIP_REPLICA_START.toString()) || forceStart) { this.setupProcessingThread(config); @@ -413,6 +420,12 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr, while (numRetries < MAX_DDL_RETRIES) { try { executeDDL(clickHouseQuery.toString(), writer); + + if(loadReplicationHistoryEnable()){ + executeDDL(modifySqlStatement(transformTableName(clickHouseQuery.toString())), writer); + // executeDDL(transformTableName(clickHouseQuery.toString()), writer); + } + DebeziumOffsetManagement.acknowledgeRecords(recordCommitter, cdcRecord, lastRecordInBatch); break; } catch (Exception e) { @@ -496,6 +509,105 @@ private void executeDDL(String clickHouseQuery, BaseDbWriter writer) throws SQLE } } + /** + * Transforms the first occurrence of ALTER TABLE or CREATE TABLE in the + * original SQL by appending "_history" to the captured table name. + * + * @param original the original SQL statement + * @return the transformed SQL statement, or the original if no match was found + */ + private static String transformTableName(String original) { + // Build a regex that matches 'ALTER TABLE ' or 'CREATE TABLE ', + // captures the schema.table identifier, and asserts that the next character + // is either a space or an opening parenthesis. + Pattern pattern = Pattern.compile( + "((?:ALTER TABLE|CREATE TABLE)\\s+)" + // group(1): the clause + trailing whitespace + "([\\w\\.]+)" + // group(2): schema.table + "(?=\\s|\\()", // lookahead: ensure next char is space or '(' + Pattern.CASE_INSENSITIVE // allow case-insensitive matching + ); + + Matcher matcher = pattern.matcher(original); + if (matcher.find()) { + String clause = matcher.group(1); // e.g. "ALTER TABLE " or "CREATE TABLE " + String table = matcher.group(2); // e.g. "employees.contacts" + // Rebuild the prefix + table + "_history", then replace only the first occurrence + return matcher.replaceFirst(clause + table + "_history"); + } + + // No ALTER/CREATE TABLE match -> return unchanged + return original; + } + + /** + * Modifies a dynamically generated CREATE TABLE statement when creating TM history table: + * 1. Modifies the ENGINE from ReplacingMergeTree to MergeTree. + * 2. Adds new columns before the ENGINE clause: operation, database, table, _raw, _time, host, logfile, position, primary_host. + * 3. Keeps the original ORDER BY clause intact. + * 4. Only performs the transformation if the SQL starts with CREATE TABLE. + * + * @param createTableSql The original CREATE TABLE SQL statement. + * @return The modified CREATE TABLE SQL statement. + */ + private static String modifySqlStatement(String createTableSql) { + // 1. Only process if the SQL starts with CREATE TABLE + if (!createTableSql.trim().toUpperCase().startsWith("CREATE TABLE")) { + // If it's not CREATE TABLE, return the original SQL + return createTableSql; + } + + // 2. Locate the ENGINE=ReplacingMergeTree part in the SQL + Pattern enginePattern = Pattern.compile("(?i)ENGINE\\s*=\\s*ReplacingMergeTree\\(.*\\)"); + Matcher engineMatcher = enginePattern.matcher(createTableSql); + + if (engineMatcher.find()) { + // 3. 
Extract the part before ENGINE (column definitions part) + String beforeEngine = createTableSql.substring(0, engineMatcher.start()).trim(); + + // Remove the last ')' from the column definitions (before the engine part) + if (beforeEngine.endsWith(")")) { + beforeEngine = beforeEngine.substring(0, beforeEngine.length() - 1).trim(); // Remove the last ')' + } + + // 4. Locate the ORDER BY clause + Pattern orderByPattern = Pattern.compile("(?i)ORDER\\s+BY\\s+.*"); + Matcher orderByMatcher = orderByPattern.matcher(createTableSql); + String orderByClause = ""; + if (orderByMatcher.find()) { + orderByClause = createTableSql.substring(orderByMatcher.start()).trim(); // Get ORDER BY clause + } + + // 5. Add the new columns before the ENGINE part, ensuring correct placement of parentheses + String extraColumns = ", " + + OPERATION_COLUMN + " " + OPERATION_COLUMN_DATA_TYPE + ", " + + DATABASE_COLUMN + " " + DATABASE_COLUMN_DATA_TYPE + ", " + + TABLE_COLUMN + " " + TABLE_COLUMN_DATA_TYPE + ", " + + RAW_COLUMN + " " + RAW_COLUMN_DATA_TYPE + ", " + + TIME_COLUMN + " " + TIME_COLUMN_DATA_TYPE + ", " + + HOST_COLUMN + " " + HOST_COLUMN_DATA_TYPE + ", " + + LOGFILE_COLUMN + " " + LOGFILE_COLUMN_DATA_TYPE + ", " + + POSITION_COLUMN + " " + POSITION_COLUMN_DATA_TYPE + ", " + + PRIMARY_HOST_COLUMN + " " +PRIMARY_HOST_COLUMN_DATA_TYPE; + + // 6. Replace ENGINE with "ENGINE = MergeTree()" and add a closing parenthesis before ENGINE + String modifiedEngine = ") ENGINE = MergeTree()"; + + // 7. Combine the modified SQL with the new columns and the original ORDER BY clause + // Ensure the parentheses are correctly closed and combine the modified SQL + String modifiedSql = beforeEngine + extraColumns + modifiedEngine; + + // 8. Add the ORDER BY clause if it exists + if (!orderByClause.isEmpty()) { + modifiedSql += " " + orderByClause; + } + + return modifiedSql; + } else { + // Throw an exception if ENGINE=ReplacingMergeTree is not found + throw new IllegalArgumentException("The original SQL does not contain ENGINE=ReplacingMergeTree part: " + createTableSql); + } + } + /** * Updates the DDL metrics using the Metrics class. 
* diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java index 88e17640b..061ef4375 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java @@ -2,6 +2,8 @@ import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; import com.altinity.clickhouse.debezium.embedded.parser.DataTypeConverter; + +import static com.altinity.clickhouse.sink.connector.config.DefaultColumnDataTypeMappingConfig.loadDefaultColumnDataTypeMapping; import static com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants.*; import static org.apache.commons.lang3.StringUtils.containsIgnoreCase; @@ -297,6 +299,7 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr */ private Set parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder orderByColumns, StringBuilder partitionByColumns) { + List pt = ctx.children; Set columnNames = new HashSet<>(); @@ -481,6 +484,13 @@ private String getClickHouseDataType(String parsedDataType, ParseTree colDefTree chDataType = DataTypeConverter.convertToString(this.config, columnName, scale, precision, dtc, this.userProvidedTimeZone); + Map defaultColumnDataTypeMap = loadDefaultColumnDataTypeMapping(); + + // Use a single null check with optional. + if (defaultColumnDataTypeMap != null) { + chDataType = defaultColumnDataTypeMap.getOrDefault(columnName, chDataType); + } + return chDataType; } @@ -651,6 +661,14 @@ else if(columnDefChild.getText().equalsIgnoreCase(Constants.NOT_NULL)) { } } + // Call the method to load the default column data type mapping. + /*Map defaultColumnDataTypeMap = loadDefaultColumnDataTypeMapping(); + + // Use a single null check with optional. + if (defaultColumnDataTypeMap != null) { + columnType = defaultColumnDataTypeMap.getOrDefault(columnName, columnType); + }*/ + // If column name and column type are defined, append them to the query. 
if (columnName != null && columnType != null) if (isNullColumn) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java index 2304ee941..8bda48950 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java @@ -380,6 +380,36 @@ public void testAlterDatabaseAddColumnNullable() { //Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(clickhouseExpectedQuery)); } + // Add column by data type mapping from 'config_schema_override' file + @Test + public void testAlterDatabaseAddColumnDataTypeMapping() { + + String addColumnNullable = "ALTER TABLE foo_new9 ADD COLUMN gmt_time3 DATETIME"; + String clickhouseExpectedQuery = "ALTER TABLE employees.foo_new9 ADD COLUMN gmt_time3 Nullable(String)"; + StringBuffer clickHouseQuery = new StringBuffer(); + mySQLDDLParserService.parseSql(addColumnNullable, "foo_new9", clickHouseQuery); + + log.info("CLICKHOUSE QUERY" + clickHouseQuery); + + Assert.assertTrue(clickHouseQuery != null && clickHouseQuery.length() != 0); + Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(clickhouseExpectedQuery)); + } + + // Add or rename column by data type mapping from 'config_schema_override' file + @Test + public void testAlterDatabaseRenameColumnDataTypeMapping() { + + String addColumnNullable = "ALTER TABLE foo_new9 CHANGE COLUMN gmt_time3 gmt_time5 DATETIME;"; + // String clickhouseExpectedQuery = "ALTER TABLE employees.foo_new9 ADD COLUMN gmt_time3 Nullable(String)"; + StringBuffer clickHouseQuery = new StringBuffer(); + mySQLDDLParserService.parseSql(addColumnNullable, "employees", clickHouseQuery); + + log.info("CLICKHOUSE QUERY" + clickHouseQuery); + + Assert.assertTrue(clickHouseQuery != null && clickHouseQuery.length() != 0); + // Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(clickhouseExpectedQuery)); + } + // Before, After @Test public void testAlterDatabaseAddMultipleColumns1() { diff --git a/sink-connector/pom.xml b/sink-connector/pom.xml index fd1f4be07..e9d76f9d1 100644 --- a/sink-connector/pom.xml +++ b/sink-connector/pom.xml @@ -442,6 +442,31 @@ 1.18.2 + + com.alibaba + fastjson + 1.2.83 + + + + + com.fasterxml.jackson.core + jackson-core + 2.12.6 + + + + com.fasterxml.jackson.core + jackson-databind + 2.12.6 + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + 2.12.6 + + org.testcontainers testcontainers diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/ClickHouseDbConstants.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/ClickHouseDbConstants.java index f68cf7061..228983bd5 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/ClickHouseDbConstants.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/ClickHouseDbConstants.java @@ -27,6 +27,51 @@ public class ClickHouseDbConstants { */ public static final String ALTER_TABLE_DELETE_COLUMN = "delete column"; + /** + * The CREATE TABLE statement keyword. + */ + public static final String CREATE_TABLE = "CREATE TABLE"; + + /** + * Represents a nullability specification of NULL. 
+ */ + public static final String NULL = "NULL"; + + /** + * Represents a nullability specification of NOT NULL. + */ + public static final String NOT_NULL = "NOT NULL"; + + /** + * The PRIMARY KEY fragment in a CREATE TABLE or + * ALTER TABLE statement. + */ + public static final String PRIMARY_KEY = "PRIMARY KEY"; + + /** + * The ORDER BY fragment in a CREATE TABLE or + * ALTER TABLE statement. + */ + public static final String ORDER_BY = "ORDER BY"; + + /** + * A default ORDER BY tuple clause for tables not + * needing a primary key or index. + */ + public static final String ORDER_BY_TUPLE = "ORDER BY tuple()"; + + /** + * The sign column name used in certain table engines, + * such as CollapsingMergeTree. + */ + public static final String SIGN_COLUMN = "_sign"; + + /** + * The data type of the sign column, typically an + * 8-bit signed integer. + */ + public static final String SIGN_COLUMN_DATA_TYPE = "Int8"; + /** * The version column name used in ReplacingMergeTree * tables. @@ -51,50 +96,97 @@ public class ClickHouseDbConstants { */ public static final String IS_DELETED_COLUMN_DATA_TYPE = "UInt8"; + //--- New CDC binlog columns ------------------------------------------- + /** - * The sign column name used in certain table engines, - * such as CollapsingMergeTree. + * The column name for the source database. */ - public static final String SIGN_COLUMN = "_sign"; + public static final String DATABASE_COLUMN = "database"; /** - * The data type of the sign column, typically an - * 8-bit signed integer. + * The data type of the database column. */ - public static final String SIGN_COLUMN_DATA_TYPE = "Int8"; + public static final String DATABASE_COLUMN_DATA_TYPE = "String"; /** - * The CREATE TABLE statement keyword. + * The column name for the source table. */ - public static final String CREATE_TABLE = "CREATE TABLE"; + public static final String TABLE_COLUMN = "table"; /** - * Represents a nullability specification of NULL. + * The data type of the table column. */ - public static final String NULL = "NULL"; + public static final String TABLE_COLUMN_DATA_TYPE = "String"; /** - * Represents a nullability specification of NOT NULL. + * The column name for the raw event payload. */ - public static final String NOT_NULL = "NOT NULL"; + public static final String RAW_COLUMN = "_raw"; /** - * The PRIMARY KEY fragment in a CREATE TABLE or - * ALTER TABLE statement. + * The data type of the raw column. */ - public static final String PRIMARY_KEY = "PRIMARY KEY"; + public static final String RAW_COLUMN_DATA_TYPE = "String"; /** - * The ORDER BY fragment in a CREATE TABLE or - * ALTER TABLE statement. + * The column name for the event timestamp. */ - public static final String ORDER_BY = "ORDER BY"; + public static final String TIME_COLUMN = "_time"; /** - * A default ORDER BY tuple clause for tables not - * needing a primary key or index. + * The data type of the time column. */ - public static final String ORDER_BY_TUPLE = "ORDER BY tuple()"; + public static final String TIME_COLUMN_DATA_TYPE = "UInt64"; + + /** + * The column name for the operation type. + */ + public static final String OPERATION_COLUMN = "operation"; + + /** + * The data type of the operation column. + */ + public static final String OPERATION_COLUMN_DATA_TYPE = "String"; + + /** + * The column name for the host where the event originated. + */ + public static final String HOST_COLUMN = "host"; + + /** + * The data type of the host column. 
+ */ + public static final String HOST_COLUMN_DATA_TYPE = "String"; + + /** + * The column name for the binlog file name. + */ + public static final String LOGFILE_COLUMN = "logfile"; + + /** + * The data type of the logfile column. + */ + public static final String LOGFILE_COLUMN_DATA_TYPE = "String"; + + /** + * The column name for the position within the binlog file. + */ + public static final String POSITION_COLUMN = "position"; + + /** + * The data type of the position column. + */ + public static final String POSITION_COLUMN_DATA_TYPE = "UInt64"; + + /** + * The column name for the primary host in a master-slave setup. + */ + public static final String PRIMARY_HOST_COLUMN = "primary_host"; + + /** + * The data type of the primary_host column. + */ + public static final String PRIMARY_HOST_COLUMN_DATA_TYPE = "String"; /** * A SQL statement used to create the topic_offset_metadata @@ -113,4 +205,3 @@ public class ClickHouseDbConstants { public static final String CHECK_DB_EXISTS_SQL = "SELECT name from system.databases where name='%s'"; } - diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index 2381d3dd4..4e8dedb03 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -18,6 +18,7 @@ import java.util.LinkedHashMap; import java.util.Map; +import static com.altinity.clickhouse.sink.connector.config.ReplicationHistoryConfig.loadReplicationHistoryEnable; import static io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX; /** @@ -32,7 +33,7 @@ public class DbWriter extends BaseDbWriter { * Logger for this class, handling logs and error messages. 
*/ private static final Logger log = LogManager.getLogger( - ClickHouseSinkConnectorConfig.class + DbWriter.class ); /** @@ -204,6 +205,16 @@ public DbWriter( useReplicatedReplacingMergeTree, rmtDeleteColumn ); + + if(loadReplicationHistoryEnable()){ + act.createHistoryTable( + record.getPrimaryKey(), + tableName+"_history", + database, + fields, + this.conn + ); + } } catch (Exception e) { log.error(String.format( "**** Error creating table(%s), database(%s) ***", diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java index ef573ba8a..3e0a7fa0b 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java @@ -31,6 +31,8 @@ import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; +import static com.altinity.clickhouse.sink.connector.config.ReplicationHistoryConfig.loadReplicationHistoryEnable; +import static com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants.*; import static com.altinity.clickhouse.sink.connector.db.batch.CdcOperation.getCdcSectionBasedOnOperation; /** @@ -139,7 +141,6 @@ public boolean addToPreparedStatementBatch(String topicName, Map Set of records // because the data will contain a mix of SQL statements(multiple columns) - if (false == executePreparedStatement(insertQuery, topicName, entry, bmd, config, conn, tableName, columnToDataTypeMap, engine)) { log.error(String.format("**** ERROR: executing prepared statement for Database(%s), " + @@ -334,9 +335,11 @@ public void insertPreparedStatement(Map columnNameToIndexMap, colName.equalsIgnoreCase(replacingMergeTreeDeleteColumn)) { // Ignore version and sign columns } else { - log.error(String.format("********** ERROR: Database(%s), Table(%s), ClickHouse column %s not present in source ************", databaseName, tableName, colName)); - log.error(String.format("********** ERROR: Database(%s), Table(%s), Setting column %s to NULL might fail for non-nullable columns ************", databaseName, tableName, colName)); - } + if(!loadReplicationHistoryEnable()){ + log.error(String.format("********** ERROR: Database(%s), Table(%s), ClickHouse column %s not present in source ************", databaseName, tableName, colName)); + log.error(String.format("********** ERROR: Database(%s), Table(%s), Setting column %s to NULL might fail for non-nullable columns ************", databaseName, tableName, colName)); + } + } ps.setNull(index, Types.OTHER); continue; } @@ -394,6 +397,7 @@ public void insertPreparedStatement(Map columnNameToIndexMap, } // Handle Version column for REPLACING_MERGE_TREE and REPLICATED_REPLACING_MERGE_TREE engines. + Long version=SnowFlakeId.generate(record.getTs_ms(), record.getGtid(), false); if (engine != null && (engine.getEngine() == DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine() || engine.getEngine() == DBMetadata.TABLE_ENGINE.REPLICATED_REPLACING_MERGE_TREE.getEngine()) @@ -415,6 +419,96 @@ public void insertPreparedStatement(Map columnNameToIndexMap, } } + // Handle history table columns for the MERGE_TREE engine, + // using default values for all CDC metadata columns. 
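+        // Illustrative example (hypothetical values, not taken from this change):
+        // an UPDATE on employees.contacts would bind database='employees',
+        // table='contacts', operation=<CDC op code>, _time=record.getTs_ms(),
+        // logfile/position from the binlog coordinates, and host/primary_host
+        // from the source server id.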
+ if (engine != null + && engine.getEngine() == DBMetadata.TABLE_ENGINE.MERGE_TREE.getEngine() + && loadReplicationHistoryEnable()) { + // prepare default values + Integer defaultIsDeleted = 0; // default isDeleted + // database + if (columnNameToDataTypeMap.containsKey(DATABASE_COLUMN) + && columnNameToIndexMap.containsKey(DATABASE_COLUMN)) { + ps.setString(columnNameToIndexMap.get(DATABASE_COLUMN), databaseName); + } + + // table + if (columnNameToDataTypeMap.containsKey(TABLE_COLUMN) + && columnNameToIndexMap.containsKey(TABLE_COLUMN)) { + ps.setString(columnNameToIndexMap.get(TABLE_COLUMN), tableName); + } + + // raw + if (columnNameToDataTypeMap.containsKey(RAW_COLUMN) + && columnNameToIndexMap.containsKey(RAW_COLUMN)) { + TableMetaDataWriter.addRawData(struct, columnNameToIndexMap.get(RAW_COLUMN), ps); + } + + // time + if (columnNameToDataTypeMap.containsKey(TIME_COLUMN) + && columnNameToIndexMap.containsKey(TIME_COLUMN)) { + ps.setLong(columnNameToIndexMap.get(TIME_COLUMN), record.getTs_ms()); + } + + // is deleted + if (columnNameToDataTypeMap.containsKey(IS_DELETED_COLUMN) + && columnNameToIndexMap.containsKey(IS_DELETED_COLUMN)) { + if (record.getCdcOperation().getOperation().equalsIgnoreCase(ClickHouseConverter.CDC_OPERATION.DELETE.getOperation())) { + ps.setInt(columnNameToIndexMap.get(IS_DELETED_COLUMN), -1); + } else if (record.getCdcOperation().getOperation().equalsIgnoreCase(ClickHouseConverter.CDC_OPERATION.UPDATE.getOperation())) { + if (beforeSection) { + ps.setInt(columnNameToIndexMap.get(IS_DELETED_COLUMN), -1); + } else { + ps.setInt(columnNameToIndexMap.get(IS_DELETED_COLUMN), 1); + } + } else { + ps.setInt(columnNameToIndexMap.get(IS_DELETED_COLUMN), defaultIsDeleted); + } + } + + // operation type + if (columnNameToDataTypeMap.containsKey(OPERATION_COLUMN) + && columnNameToIndexMap.containsKey(OPERATION_COLUMN)) { + ps.setString(columnNameToIndexMap.get(OPERATION_COLUMN), record.getCdcOperation().getOperation()); + } + + // version + if (columnNameToDataTypeMap.containsKey(VERSION_COLUMN) + && columnNameToIndexMap.containsKey(VERSION_COLUMN)) { + ps.setLong(columnNameToIndexMap.get(VERSION_COLUMN), version); + } + + // source host + if (columnNameToDataTypeMap.containsKey(HOST_COLUMN) + && columnNameToIndexMap.containsKey(HOST_COLUMN)) { + ps.setString( + columnNameToIndexMap.get(HOST_COLUMN), + record.getServerId() != null ? record.getServerId().toString() : "" + ); + } + + // logfile + if (columnNameToDataTypeMap.containsKey(LOGFILE_COLUMN) + && columnNameToIndexMap.containsKey(LOGFILE_COLUMN)) { + ps.setString(columnNameToIndexMap.get(LOGFILE_COLUMN), record.getFile()); + } + + // position + if (columnNameToDataTypeMap.containsKey(POSITION_COLUMN) + && columnNameToIndexMap.containsKey(POSITION_COLUMN)) { + ps.setLong(columnNameToIndexMap.get(POSITION_COLUMN), record.getPos()); + } + + // primary host + if (columnNameToDataTypeMap.containsKey(PRIMARY_HOST_COLUMN) + && columnNameToIndexMap.containsKey(PRIMARY_HOST_COLUMN)) { + ps.setString( + columnNameToIndexMap.get(PRIMARY_HOST_COLUMN), + record.getServerId() != null ? record.getServerId().toString() : "" + ); + } + } + // Handle Sign column to mark deletes in ReplacingMergeTree. 
if (this.replacingMergeTreeDeleteColumn != null && columnNameToDataTypeMap.containsKey(replacingMergeTreeDeleteColumn)) { if (columnNameToIndexMap.containsKey(replacingMergeTreeDeleteColumn) && diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 7ead49f17..15508b574 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -1,5 +1,6 @@ package com.altinity.clickhouse.sink.connector.db.operations; +import com.altinity.clickhouse.sink.connector.config.SchemaOverrideConfig; import com.altinity.clickhouse.sink.connector.db.DBMetadata; import com.clickhouse.data.ClickHouseDataType; import com.google.common.annotations.VisibleForTesting; @@ -7,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; @@ -103,22 +105,38 @@ public String createTableSyntax(ArrayList primaryKey, boolean useReplicatedReplacingMergeTree, String rmtDeleteColumn) { + // Load the schema configuration from the YAML file + SchemaOverrideConfig config = new SchemaOverrideConfig(); + String filePath = "config_schema_override.yml"; // File is inside src/main/resources + try { + config.loadTableConfigs(filePath); + } catch (IOException e) { + log.error("load schema override configs error:", e); + } + + // Get the schema configuration for the table "tr_live" in database "dbo" + SchemaOverrideConfig.Table tableConfig = config.getTableConfig(databaseName, tableName); + + // Use the primaryKey from the tableConfig if it is not empty + if (tableConfig != null && tableConfig.getPrimaryKey() != null && !tableConfig.getPrimaryKey().isEmpty()) { + primaryKey = new ArrayList<>(); + primaryKey.add(tableConfig.getPrimaryKey()); // Replace with the primary key from tableConfig + } + StringBuilder createTableSyntax = new StringBuilder(); + // Start creating the CREATE TABLE statement createTableSyntax.append(CREATE_TABLE).append(" ") .append(databaseName).append(".") .append("`").append(tableName).append("`"); - if (useReplicatedReplacingMergeTree == true) { - createTableSyntax.append(" ON CLUSTER `{cluster}` "); - } + // Add columns to the SQL createTableSyntax.append("("); - for (Field f : fields) { String colName = f.name(); String dataType = columnToDataTypesMap.get(colName); boolean isNull = false; - if (f.schema().isOptional() == true) { + if (f.schema().isOptional()) { isNull = true; } createTableSyntax.append("`").append(colName).append("`") @@ -139,6 +157,7 @@ public String createTableSyntax(ArrayList primaryKey, createTableSyntax.append(","); } + // Handle the deletion column logic String isDeletedColumn = IS_DELETED_COLUMN; if (rmtDeleteColumn != null && !rmtDeleteColumn.isEmpty()) { isDeletedColumn = rmtDeleteColumn; @@ -151,16 +170,17 @@ public String createTableSyntax(ArrayList primaryKey, createTableSyntax.append("`").append(isDeletedColumn) .append("` ").append(IS_DELETED_COLUMN_DATA_TYPE); } else { - // Append sign and version columns. 
createTableSyntax.append("`").append(SIGN_COLUMN) .append("` ").append(SIGN_COLUMN_DATA_TYPE) .append(","); createTableSyntax.append("`").append(VERSION_COLUMN) .append("` ").append(VERSION_COLUMN_DATA_TYPE); } + createTableSyntax.append(")"); - createTableSyntax.append(" "); + // Add the engine type + createTableSyntax.append(" "); if (isNewReplacingMergeTreeEngine == true) { if (useReplicatedReplacingMergeTree == true) { createTableSyntax.append(String.format( @@ -181,35 +201,161 @@ public String createTableSyntax(ArrayList primaryKey, .append(VERSION_COLUMN).append(")"); } } - createTableSyntax.append(" "); - if (primaryKey != null - && isPrimaryKeyColumnPresent(primaryKey, columnToDataTypesMap)) { - createTableSyntax.append(PRIMARY_KEY).append("("); - createTableSyntax.append(primaryKey.stream() - .map(Object::toString) - .collect(Collectors.joining(","))); - createTableSyntax.append(") "); + // Add PARTITION BY if it is present + if (tableConfig != null && tableConfig.getPartitionBy() != null && !tableConfig.getPartitionBy().isEmpty()) { + createTableSyntax.append(" PARTITION BY `").append(tableConfig.getPartitionBy()).append("`"); + } + + // Handle ORDER BY clause (primary key is part of ORDER BY in ClickHouse) + createTableSyntax.append(" "); + if (primaryKey != null && isPrimaryKeyColumnPresent(primaryKey, columnToDataTypesMap)) { createTableSyntax.append(ORDER_BY).append("("); createTableSyntax.append(primaryKey.stream() .map(Object::toString) .collect(Collectors.joining(","))); createTableSyntax.append(")"); } else { - // TODO: Define a default ORDER BY clause. + // Default ORDER BY clause createTableSyntax.append(ORDER_BY_TUPLE); } + + // Add SETTINGS if they are provided (SETTINGS should be placed last) + if (tableConfig != null && tableConfig.getSettings() != null && !tableConfig.getSettings().isEmpty()) { + createTableSyntax.append(" SETTINGS ").append(tableConfig.getSettings()); + } + return createTableSyntax.toString(); } /** - * Checks if all primary key columns are present in the column-to-data - * type map. + * Creates a history table with MergeTree engine, including + * CDC metadata columns such as database, table, raw payload, + * event time, operation type, host, logfile, position, and primary host. 
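+     *
+     * <p>Illustrative shape of the generated DDL (column list abbreviated;
+     * the table and column names here are examples only):
+     * {@code CREATE TABLE db.`orders_history`(`id` Int32 NOT NULL, ...,
+     * `database` String, `table` String, `_raw` String, `_time` UInt64,
+     * `is_deleted` UInt8, `operation` String, `_version` UInt64,
+     * `host` String, `logfile` String, `position` UInt64,
+     * `primary_host` String) ENGINE = MergeTree() ORDER BY(id)}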
* - * @param primaryKeys a list of primary key column names - * @param columnToDataTypesMap a map of column names to data types - * @return true if all primary key columns are present; false otherwise + * @param primaryKey list of primary key column names + * @param historyTableName name of the history table to create + * @param databaseName name of the database in which the history table is created + * @param fields array of Kafka Connect fields + * @param connection JDBC connection to the ClickHouse database + * @throws SQLException if a SQL exception occurs during table creation */ + public void createHistoryTable(ArrayList primaryKey, + String historyTableName, + String databaseName, + Field[] fields, + Connection connection) + throws SQLException { + Map columnToDataTypesMap = + this.getColumnNameToCHDataTypeMapping(fields); + String sql = createHistoryTableSyntax( + primaryKey, historyTableName, + databaseName, fields, columnToDataTypesMap); + log.info(String.format( + "**** AUTO CREATE HISTORY TABLE for database(%s), Query :%s)", + databaseName, sql)); + DBMetadata metadata = new DBMetadata(); + metadata.executeSystemQuery(connection, sql); + } + + /** + * Builds the CREATE TABLE SQL syntax for a history table, + * adding CDC metadata columns for database, table, raw payload, + * time, operation, host, logfile, position, and primary host. + * + * @param primaryKey list of primary key column names + * @param historyTableName name of the history table to create + * @param databaseName name of the database in which the history table is created + * @param fields array of Kafka Connect fields + * @param columnToDataTypesMap map of column names to ClickHouse data types + * @return SQL statement string for creating the history table + */ + public String createHistoryTableSyntax(ArrayList primaryKey, + String historyTableName, + String databaseName, + Field[] fields, + Map columnToDataTypesMap) { + SchemaOverrideConfig config = new SchemaOverrideConfig(); + try { + config.loadTableConfigs("config_schema_override.yml"); + } catch (IOException e) { + log.error("load schema override configs error:", e); + } + SchemaOverrideConfig.Table tableConfig = + config.getTableConfig(databaseName, historyTableName.replaceAll("_history$", "")); + + // Use the primaryKey from the tableConfig if it is not empty + if (tableConfig != null && tableConfig.getPrimaryKey() != null && !tableConfig.getPrimaryKey().isEmpty()) { + primaryKey = new ArrayList<>(); + primaryKey.add(tableConfig.getPrimaryKey()); // Replace with the primary key from tableConfig + } + + StringBuilder sb = new StringBuilder(); + sb.append(CREATE_TABLE) + .append(' ').append(databaseName) + .append(".`").append(historyTableName).append("`("); + + for (Field f : fields) { + String col = f.name(); + String dt = columnToDataTypesMap.get(col); + sb.append('`').append(col).append("` ") + .append(dt) + .append(f.schema().isOptional() ? 
' ' + NULL : ' ' + NOT_NULL) + .append(','); + } + sb.append('`').append(DATABASE_COLUMN).append("` ") + .append(DATABASE_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(TABLE_COLUMN).append("` ") + .append(TABLE_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(RAW_COLUMN).append("` ") + .append(RAW_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(TIME_COLUMN).append("` ") + .append(TIME_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(IS_DELETED_COLUMN).append("` ") + .append(IS_DELETED_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(OPERATION_COLUMN).append("` ") + .append(OPERATION_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(VERSION_COLUMN).append("` ") + .append(VERSION_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(HOST_COLUMN).append("` ") + .append(HOST_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(LOGFILE_COLUMN).append("` ") + .append(LOGFILE_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(POSITION_COLUMN).append("` ") + .append(POSITION_COLUMN_DATA_TYPE).append(','); + sb.append('`').append(PRIMARY_HOST_COLUMN).append("` ") + .append(PRIMARY_HOST_COLUMN_DATA_TYPE); + sb.append(") ENGINE = MergeTree()"); + + if (tableConfig != null && + tableConfig.getPartitionBy() != null && + !tableConfig.getPartitionBy().isEmpty()) { + sb.append(" PARTITION BY `") + .append(tableConfig.getPartitionBy()) + .append("`"); + } + + sb.append(' '); + if (primaryKey != null && + isPrimaryKeyColumnPresent(primaryKey, columnToDataTypesMap)) { + sb.append(ORDER_BY) + .append("(") + .append(primaryKey.stream() + .collect(Collectors.joining(","))) + .append(")"); + } else { + sb.append(ORDER_BY_TUPLE); + } + + if (tableConfig != null && + tableConfig.getSettings() != null && + !tableConfig.getSettings().isEmpty()) { + sb.append(" SETTINGS ") + .append(tableConfig.getSettings()); + } + return sb.toString(); + } + @VisibleForTesting boolean isPrimaryKeyColumnPresent(ArrayList primaryKeys, Map columnToDataTypesMap) { diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java index 9232fd360..245106d2b 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java @@ -14,6 +14,8 @@ import java.util.HashMap; import java.util.Map; +import static com.altinity.clickhouse.sink.connector.config.DefaultColumnDataTypeMappingConfig.loadDefaultColumnDataTypeMapping; + /** * Provides base operations to handle ClickHouse table creation and data-type * mapping. This class contains logic to map Kafka Connect {@link Schema} @@ -167,6 +169,21 @@ public Map getColumnNameToCHDataTypeMapping(Field[] fields) { + type.getName() + "SCHEMA NAME:" + schemaName); } } + + // Call the method to load the default column data type mapping. 
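+        // Entries in this mapping take precedence over the inferred ClickHouse
+        // type for a column of the same name. For example (assuming a mapping
+        // entry like "gmt_time3 -> String"), a MySQL DATETIME column named
+        // gmt_time3 would be created as String instead of a DateTime type.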
+ Map defaultColumnDataTypeMap = loadDefaultColumnDataTypeMapping(); + + // Iterate over columnToDataTypesMap using entrySet for efficient access to keys and values + for (Map.Entry entry : columnToDataTypesMap.entrySet()) { + String key = entry.getKey(); // Get the current key from columnToDataTypesMap + // Check if defaultColumnDataTypeMap contains the key + if (defaultColumnDataTypeMap.containsKey(key)) { + // If defaultColumnDataTypeMap contains the key, update columnToDataTypesMap's value + // with the corresponding value from defaultColumnDataTypeMap + entry.setValue(defaultColumnDataTypeMap.get(key)); + } + } + return columnToDataTypesMap; } } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index d9a90bda8..b2b65e524 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -1,5 +1,6 @@ package com.altinity.clickhouse.sink.connector.executor; +import com.alibaba.fastjson.JSONObject; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.altinity.clickhouse.sink.connector.common.Metrics; @@ -24,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import static com.altinity.clickhouse.sink.connector.config.ReplicationHistoryConfig.loadReplicationHistoryEnable; + /** * Runnable object that will be called on a schedule to perform the * batch insert of records to ClickHouse. 
@@ -266,8 +269,16 @@ public void run() {
 
             // topic name syntax is server.database.table
             for (Map.Entry<String, List<ClickHouseStruct>> entry : topicToRecordsMap.entrySet()) {
+
                 result = processRecordsByTopic(entry.getKey(), entry.getValue());
+
+                // insert history data
+                if (loadReplicationHistoryEnable()) {
+                    processRecordsByTopic(entry.getKey() + "_history",
+                            entry.getValue());
+                }
+
                 if (result == false) {
                     log.error("Error processing records for topic: " + entry.getKey());
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java
index 860bb4687..5922ea48c 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java
@@ -187,6 +187,7 @@ private DBCredentials parseDBConfiguration() {
      * @param records a list of ClickHouseStruct records to persist
      */
     public void persistRecords(List<ClickHouseStruct> records) {
+        // System.out.println("3 persistRecords");
         log.info("****** Thread: " + Thread.currentThread().getName() +
                 " Batch Size: " + records.size() +
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java
index 4dab92eed..5ef792c06 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java
@@ -140,6 +140,19 @@ public void testCreateTableSyntax() {
         //Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE auto_create_table(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY (customerName)"));
     }
 
+    @Test
+    public void testCreateMergeTreeHistoryTableSyntax() {
+        ArrayList<String> primaryKeys = new ArrayList<>();
+        primaryKeys.add("customerName");
+
+        ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
+
+        String query = act.createHistoryTableSyntax(primaryKeys, "auto_create_table_history", "employees",
+                createFields(), this.columnToDataTypesMap);
+        System.out.println("QUERY: " + query);
+        Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE employees.`auto_create_table_history`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON NOT NULL,`max_amount` Float64 NOT NULL,`database` String,`table` String,`_raw` String,`_time` UInt64,`is_deleted` UInt8,`operation` String,`_version` UInt64,`host` String,`logfile` String,`position` UInt64,`primary_host` String) ENGINE = MergeTree() ORDER BY(customerName)"));
+    }
+
     @Test
     public void testCreateTableEmptyPrimaryKey() {
 
@@ -176,14 +189,41 @@ public void testCreateNewTable() {
         String database = "test";
         String userName = clickHouseContainer.getUsername();
         String password =
clickHouseContainer.getPassword(); - String tableName = "employees"; - + String tableName = "employees5"; String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); + DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, + new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn); + + ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); + ArrayList primaryKeys = new ArrayList<>(); + primaryKeys.add("customerName"); + + try { + act.createNewTable(primaryKeys, tableName, "test", this.createFields(), writer.getConnection(), + false, false, null); + } catch(SQLException se) { + Assert.assertTrue(false); + } + } + + @Test + @Tag("IntegrationTest") + @Disabled + public void testCreateMergeTreeHistoryTable() { + String dbHostName = clickHouseContainer.getHost(); + Integer port = clickHouseContainer.getFirstMappedPort(); + String database = "test"; + String userName = clickHouseContainer.getUsername(); + String password = clickHouseContainer.getPassword(); + String tableName = "employees5_history"; + String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn); @@ -193,8 +233,7 @@ public void testCreateNewTable() { primaryKeys.add("customerName"); try { - act.createNewTable(primaryKeys, "auto_create_table", "default", this.createFields(), writer.getConnection(), - false, false, null); + act.createHistoryTable(primaryKeys, tableName, "test", this.createFields(), writer.getConnection()); } catch(SQLException se) { Assert.assertTrue(false); } From 09f245cf26a37ce61a84fdda9dacf24d040c93a6 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 20 May 2025 12:04:24 -0400 Subject: [PATCH 2/2] Added logic to write history tables in a separate connector mode. 
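
This patch adds the configuration loaders used by the history-table mode
(ReplicationHistoryConfig, SchemaOverrideConfig,
DefaultColumnDataTypeMappingConfig), a bundled config_schema_override.yml,
and integration tests covering schema overrides and the MergeTree history
table.

A sketch of the YAML shape these loaders consume, inferred from the
accessors used in this change (getPrimaryKey, getPartitionBy, getSettings,
and the column-name-to-type map); the key names below are assumptions, and
the authoritative layout is in
sink-connector/src/main/resources/config_schema_override.yml:

    # config_schema_override.yml (illustrative, key names assumed)
    default_column_data_type_mapping:
      gmt_time3: "String"        # force this column to a ClickHouse type
    tables:
      - database: "employees"
        table: "contacts"
        primary_key: "id"        # overrides the detected primary key
        partition_by: ""         # optional PARTITION BY column
        settings: ""             # optional trailing SETTINGS clause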
--- ...ableSchemaOverrideByDataTypeMappingIT.java | 168 +++++++++++ ...ableSchemaOverrideByDataTypeMappingIT.java | 135 +++++++++ ...eeHistoryTableWithAdditionalColumnsIT.java | 274 ++++++++++++++++++ .../DefaultColumnDataTypeMappingConfig.java | 65 +++++ .../config/ReplicationHistoryConfig.java | 90 ++++++ .../config/SchemaOverrideConfig.java | 116 ++++++++ .../main/resources/config_schema_override.yml | 42 +++ ...efaultColumnDataTypeMappingConfigTest.java | 21 ++ .../config/SchemaOverrideConfigTest.java | 77 +++++ 9 files changed, 988 insertions(+) create mode 100644 sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableSchemaOverrideByDataTypeMappingIT.java create mode 100644 sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableSchemaOverrideByDataTypeMappingIT.java create mode 100644 sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MergeTreeHistoryTableWithAdditionalColumnsIT.java create mode 100644 sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/DefaultColumnDataTypeMappingConfig.java create mode 100644 sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/ReplicationHistoryConfig.java create mode 100644 sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/SchemaOverrideConfig.java create mode 100644 sink-connector/src/main/resources/config_schema_override.yml create mode 100644 sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/config/DefaultColumnDataTypeMappingConfigTest.java create mode 100644 sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/config/SchemaOverrideConfigTest.java diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableSchemaOverrideByDataTypeMappingIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableSchemaOverrideByDataTypeMappingIT.java new file mode 100644 index 000000000..128fd181b --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableSchemaOverrideByDataTypeMappingIT.java @@ -0,0 +1,168 @@ +package com.altinity.clickhouse.debezium.embedded.ddl.parser; + +import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.altinity.clickhouse.sink.connector.db.DBMetadata; +import com.altinity.clickhouse.sink.connector.db.HikariDbSource; +import org.apache.log4j.BasicConfigurator; +import org.junit.Assert; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.Map; +import java.util.Properties; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static com.altinity.clickhouse.debezium.embedded.ITCommon.connectToMySQL; +import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; + +@Testcontainers +@DisplayName("Integration test validating table alter (add column) with schema overrides by YAML data‑type mappings") +public class AlterTableSchemaOverrideByDataTypeMappingIT { + + protected MySQLContainer mySqlContainer; + + @Container + public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_it.sql") + .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + + @BeforeEach + public void startContainers() throws InterruptedException { + mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("employees").withUsername("root").withPassword("adminpass") + // .withInitScript("data_types.sql") + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + BasicConfigurator.configure(); + mySqlContainer.start(); + clickHouseContainer.start(); + Thread.sleep(25000); + } + + @AfterEach + public void tearDown() { + mySqlContainer.stop(); + clickHouseContainer.stop(); + } + + @Test + public void testMySQLAddColumnsOverridesByDataTypeMapping() throws Exception { + // Start Debezium Change Event Capture in a separate thread + AtomicReference engine = new AtomicReference<>(); + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer); + engine.set(new DebeziumChangeEventCapture()); + engine.get().setup(props, new SourceRecordParserService(), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + // Give Debezium time to initialize + Thread.sleep(30000); + + // Connect to MySQL and create the `contacts` table + Connection conn = connectToMySQL(mySqlContainer); + conn.prepareStatement( + "CREATE TABLE employees.contacts (\n" + + " id INT AUTO_INCREMENT PRIMARY KEY NOT NULL,\n" + + " first_name VARCHAR(50) NOT NULL,\n" + + " last_name VARCHAR(50) NOT NULL,\n" + + " fullname VARCHAR(101) GENERATED ALWAYS AS (CONCAT(first_name,' ',last_name)) STORED,\n" + + " email VARCHAR(100) NOT NULL,\n" + + " gmt_time DATETIME NOT NULL\n" + + ");" + ).execute(); + // Wait for the table creation to propagate + Thread.sleep(30000); + + // Alter the table to add two new DATETIME columns + conn.prepareStatement( + "ALTER TABLE employees.contacts\n" + + " ADD COLUMN gmt_time3 DATETIME NOT NULL,\n" + + " ADD COLUMN gmt_time4 DATETIME NOT NULL;" + ).execute(); + // Allow the schema change to take effect + Thread.sleep(20000); + + // Insert a row including values for the new columns + conn.prepareStatement( + "INSERT INTO employees.contacts (" + + " first_name, last_name, email, gmt_time, gmt_time3, gmt_time4" + + ") VALUES (" + + " 'John', 'Doe', 'john.doe@gmail.com'," + + " '2025-04-10 12:34:56'," + + " '2025-04-10 12:35:56'," + + " '2025-04-10 12:36:56'" + + ");" + ).execute(); + // Give Debezium time to capture 
the insert event + Thread.sleep(20000); + + // Obtain ClickHouse writer and metadata utility + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + DBMetadata dbMetadata = new DBMetadata(); + Map columnsToDataTypeMap = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "contacts", "employees" + ); + + // Assert that the two new DATETIME columns map to ClickHouse String type + + Assert.assertTrue(columnsToDataTypeMap.get("gmt_time3").equalsIgnoreCase("Nullable(String)")); + Assert.assertTrue(columnsToDataTypeMap.get("gmt_time4").equalsIgnoreCase("Nullable(String)")); + + // Query ClickHouse to verify the actual stored values (converted to UTC) + ResultSet resultSet = ITCommon.executeQueryWithResultSet( + "SELECT gmt_time, gmt_time3, gmt_time4 FROM employees.contacts", + writer.getConnection() + ); + boolean insertCheck = false; + + while (resultSet.next()) { + insertCheck = true; + String gmtTime = resultSet.getString("gmt_time"); + System.out.println(gmtTime); + + String gmtTime3 = resultSet.getString("gmt_time3"); + System.out.println(gmtTime3); + + String gmtTime4 = resultSet.getString("gmt_time4"); + System.out.println(gmtTime4); + + Assert.assertEquals("2025-04-10 07:34:56.000", resultSet.getString("gmt_time")); + Assert.assertEquals("2025-04-10 07:35:56.000", resultSet.getString("gmt_time3")); + Assert.assertEquals("2025-04-10 07:36:56.000", resultSet.getString("gmt_time4")); + } + + // Clean up resources: close connections and stop background services + writer.getConnection().close(); + if (engine.get() != null) { + engine.get().stop(); + } + executorService.shutdown(); + HikariDbSource.close(); + } +} diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableSchemaOverrideByDataTypeMappingIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableSchemaOverrideByDataTypeMappingIT.java new file mode 100644 index 000000000..2dbea4e77 --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableSchemaOverrideByDataTypeMappingIT.java @@ -0,0 +1,135 @@ +package com.altinity.clickhouse.debezium.embedded.ddl.parser; + +import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.altinity.clickhouse.sink.connector.db.DBMetadata; +import com.altinity.clickhouse.sink.connector.db.HikariDbSource; +import org.apache.log4j.BasicConfigurator; +import org.junit.Assert; +import org.junit.jupiter.api.*; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static com.altinity.clickhouse.debezium.embedded.ITCommon.connectToMySQL; +import static 
com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; + +@Testcontainers +@DisplayName("Integration test validating table creation with schema overrides by YAML data‑type mappings") +public class CreateTableSchemaOverrideByDataTypeMappingIT { + + protected MySQLContainer mySqlContainer; + + @Container + public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_it.sql") + .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + + @BeforeEach + public void startContainers() throws InterruptedException { + mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("employees").withUsername("root").withPassword("adminpass") + // .withInitScript("data_types.sql") + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + BasicConfigurator.configure(); + mySqlContainer.start(); + clickHouseContainer.start(); + Thread.sleep(25000); + } + + @AfterEach + public void tearDown() { + mySqlContainer.stop(); + clickHouseContainer.stop(); + } + + @Test + public void testMySQLGeneratedColumnsByDataTypeMapping() throws Exception { + AtomicReference engine = new AtomicReference<>(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + + Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer); + + engine.set(new DebeziumChangeEventCapture()); + engine.get().setup(props, new SourceRecordParserService(), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(30000); + + Connection conn = connectToMySQL(mySqlContainer); + + conn.prepareStatement("\n" + + "CREATE TABLE employees.contacts (id INT AUTO_INCREMENT PRIMARY KEY NOT NULL,\n" + + "first_name VARCHAR(50) NOT NULL,\n" + + "last_name VARCHAR(50) NOT NULL,\n" + + "fullname varchar(101) GENERATED ALWAYS AS (CONCAT(first_name,' ',last_name)),\n" + + "email VARCHAR(100) NOT NULL,\n" + + "gmt_time DATETIME NOT NULL);\n").execute(); + + Thread.sleep(30000); + + conn.prepareStatement("insert into contacts(first_name, last_name, email , gmt_time) values('John', 'Doe', 'john.doe@gmail.com','2025-04-10 12:34:56')").execute(); + Thread.sleep(20000); + + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + DBMetadata dbMetadata = new DBMetadata(); + Map columnsToDataTypeMap = dbMetadata.getColumnsDataTypesForTable(writer.getConnection(), "contacts", "employees"); + + Assert.assertTrue(columnsToDataTypeMap.get("id").equalsIgnoreCase("Int32")); + Assert.assertTrue(columnsToDataTypeMap.get("first_name").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("last_name").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("fullname").equalsIgnoreCase("Nullable(String)")); + Assert.assertTrue(columnsToDataTypeMap.get("email").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("gmt_time").equalsIgnoreCase("String")); + + ResultSet resultSet = ITCommon.executeQueryWithResultSet("select gmt_time from employees.contacts", writer.getConnection()); + boolean insertCheck = false; + while (resultSet.next()) { + insertCheck = 
true; + String gmtTime = resultSet.getString("gmt_time"); + System.out.println(gmtTime); + Assert.assertTrue(gmtTime.equalsIgnoreCase("2025-04-10 07:34:56.000")); + } + Thread.sleep(10000); + + Assert.assertTrue(insertCheck); + writer.getConnection().close(); + + Thread.sleep(10000); + + if(engine.get() != null) { + engine.get().stop(); + } + // Files.deleteIfExists(tmpFilePath); + executorService.shutdown(); + + HikariDbSource.close(); + } +} diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MergeTreeHistoryTableWithAdditionalColumnsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MergeTreeHistoryTableWithAdditionalColumnsIT.java new file mode 100644 index 000000000..a1b8537e4 --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MergeTreeHistoryTableWithAdditionalColumnsIT.java @@ -0,0 +1,274 @@ +package com.altinity.clickhouse.debezium.embedded.ddl.parser; + +import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.altinity.clickhouse.sink.connector.db.DBMetadata; +import com.altinity.clickhouse.sink.connector.db.HikariDbSource; +import org.apache.log4j.BasicConfigurator; +import org.junit.Assert; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static com.altinity.clickhouse.debezium.embedded.ITCommon.connectToMySQL; +import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; + +@Testcontainers +@DisplayName("Integration test adding other columns in MergeTree history table") +public class MergeTreeHistoryTableWithAdditionalColumnsIT { + + protected MySQLContainer mySqlContainer; + + @Container + public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_it.sql") + .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + + @BeforeEach + public void startContainers() throws InterruptedException { + mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("employees").withUsername("root").withPassword("adminpass") + // .withInitScript("data_types.sql") + .withExtraHost("mysql-server", "0.0.0.0") + 
.waitingFor(new HttpWaitStrategy().forPort(3306));
+
+        BasicConfigurator.configure();
+        mySqlContainer.start();
+        clickHouseContainer.start();
+        Thread.sleep(25000);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        mySqlContainer.stop();
+        clickHouseContainer.stop();
+    }
+
+    /**
+     * Verifies that the auto-created MergeTree history table includes the
+     * following additional metadata columns:
+     * database, table, _raw, _time, is_deleted, operation, _version,
+     * host, logfile, position, primary_host.
+     */
+    @Test
+    public void testCreateMergeTreeHistoryTableWithAdditionalColumns() throws Exception {
+        AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();
+
+        ExecutorService executorService = Executors.newFixedThreadPool(1);
+        executorService.execute(() -> {
+            try {
+                Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);
+
+                engine.set(new DebeziumChangeEventCapture());
+                engine.get().setup(props, new SourceRecordParserService(), false);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        Thread.sleep(30000);
+
+        Connection conn = connectToMySQL(mySqlContainer);
+
+        conn.prepareStatement("\n" +
+                "CREATE TABLE employees.contacts (id INT AUTO_INCREMENT PRIMARY KEY NOT NULL,\n" +
+                "first_name VARCHAR(50) NOT NULL,\n" +
+                "last_name VARCHAR(50) NOT NULL,\n" +
+                "fullname varchar(101) GENERATED ALWAYS AS (CONCAT(first_name,' ',last_name)),\n" +
+                "email VARCHAR(100) NOT NULL,\n" +
+                "gmt_time DATETIME NOT NULL);\n").execute();
+
+        Thread.sleep(30000);
+
+        conn.prepareStatement("insert into contacts(first_name, last_name, email , gmt_time) values('John', 'Doe', 'john.doe@gmail.com','2025-04-10 12:34:56')").execute();
+        Thread.sleep(20000);
+
+        BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer);
+        DBMetadata dbMetadata = new DBMetadata();
+        Map<String, String> columnsToDataTypeMap = dbMetadata.getColumnsDataTypesForTable(writer.getConnection(), "contacts_history", "employees");
+
+        Assert.assertTrue(columnsToDataTypeMap.get("id").equalsIgnoreCase("Int32"));
+        Assert.assertTrue(columnsToDataTypeMap.get("first_name").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("last_name").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("fullname").equalsIgnoreCase("Nullable(String)"));
+        Assert.assertTrue(columnsToDataTypeMap.get("email").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("gmt_time").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("database").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("table").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("_raw").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("_time").equalsIgnoreCase("UInt64"));
+        Assert.assertTrue(columnsToDataTypeMap.get("is_deleted").equalsIgnoreCase("UInt8"));
+        Assert.assertTrue(columnsToDataTypeMap.get("operation").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("_version").equalsIgnoreCase("UInt64"));
+        Assert.assertTrue(columnsToDataTypeMap.get("host").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("logfile").equalsIgnoreCase("String"));
+        Assert.assertTrue(columnsToDataTypeMap.get("position").equalsIgnoreCase("UInt64"));
+        Assert.assertTrue(columnsToDataTypeMap.get("primary_host").equalsIgnoreCase("String"));
+
+        ResultSet resultSet = ITCommon.executeQueryWithResultSet("select database,table,_raw,is_deleted,operation,host,logfile from
employees.contacts_history", writer.getConnection()); + boolean insertCheck = false; + while (resultSet.next()) { + insertCheck = true; + Assert.assertTrue(resultSet.getString("database").equalsIgnoreCase("employees")); + Assert.assertTrue(resultSet.getString("table").equalsIgnoreCase("contacts_history")); + Assert.assertTrue(resultSet.getString("_raw").equalsIgnoreCase("{\"last_name\":\"Doe\",\"id\":1,\"fullname\":\"John Doe\",\"first_name\":\"John\",\"gmt_time\":1744288496000,\"email\":\"john.doe@gmail.com\"}")); + Assert.assertTrue(resultSet.getString("is_deleted").equalsIgnoreCase("0")); + Assert.assertTrue(resultSet.getString("operation").equalsIgnoreCase("C")); + Assert.assertTrue(resultSet.getString("host").equalsIgnoreCase("1")); + Assert.assertTrue(resultSet.getString("logfile").equalsIgnoreCase("binlog.000003")); + } + Thread.sleep(10000); + + Assert.assertTrue(insertCheck); + writer.getConnection().close(); + + Thread.sleep(10000); + + if(engine.get() != null) { + engine.get().stop(); + } + // Files.deleteIfExists(tmpFilePath); + executorService.shutdown(); + + HikariDbSource.close(); + } + + @Test + public void testAlterMergeTreeHistoryTableWithAdditionalColumns() throws Exception { + AtomicReference engine = new AtomicReference<>(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + + Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer); + + engine.set(new DebeziumChangeEventCapture()); + engine.get().setup(props, new SourceRecordParserService(), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(30000); + + // Connect to MySQL and create the `contacts` table + Connection conn = connectToMySQL(mySqlContainer); + conn.prepareStatement( + "CREATE TABLE employees.contacts (\n" + + " id INT AUTO_INCREMENT PRIMARY KEY NOT NULL,\n" + + " first_name VARCHAR(50) NOT NULL,\n" + + " last_name VARCHAR(50) NOT NULL,\n" + + " fullname VARCHAR(101) GENERATED ALWAYS AS (CONCAT(first_name,' ',last_name)) STORED,\n" + + " email VARCHAR(100) NOT NULL,\n" + + " gmt_time DATETIME NOT NULL\n" + + ");" + ).execute(); + // Wait for the table creation to propagate + Thread.sleep(30000); + + // Alter the table to add two new DATETIME columns + conn.prepareStatement( + "ALTER TABLE employees.contacts\n" + + " ADD COLUMN gmt_time3 DATETIME NOT NULL,\n" + + " ADD COLUMN gmt_time4 DATETIME NOT NULL;" + ).execute(); + // Allow the schema change to take effect + Thread.sleep(20000); + + // Insert a row including values for the new columns + conn.prepareStatement( + "INSERT INTO employees.contacts (" + + " first_name, last_name, email, gmt_time, gmt_time3, gmt_time4" + + ") VALUES (" + + " 'John', 'Doe', 'john.doe@gmail.com'," + + " '2025-04-10 12:34:56'," + + " '2025-04-10 12:35:56'," + + " '2025-04-10 12:36:56'" + + ");" + ).execute(); + // Give Debezium time to capture the insert event + Thread.sleep(20000); + + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + DBMetadata dbMetadata = new DBMetadata(); + Map columnsToDataTypeMap = dbMetadata.getColumnsDataTypesForTable(writer.getConnection(), "contacts_history", "employees"); + + Assert.assertTrue(columnsToDataTypeMap.get("id").equalsIgnoreCase("Int32")); + Assert.assertTrue(columnsToDataTypeMap.get("first_name").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("last_name").equalsIgnoreCase("String")); + 
Assert.assertTrue(columnsToDataTypeMap.get("fullname").equalsIgnoreCase("Nullable(String)")); + Assert.assertTrue(columnsToDataTypeMap.get("email").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("gmt_time").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("database").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("table").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("_raw").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("_time").equalsIgnoreCase("UInt64")); + Assert.assertTrue(columnsToDataTypeMap.get("is_deleted").equalsIgnoreCase("UInt8")); + Assert.assertTrue(columnsToDataTypeMap.get("operation").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("_version").equalsIgnoreCase("UInt64")); + Assert.assertTrue(columnsToDataTypeMap.get("host").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("logfile").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("position").equalsIgnoreCase("UInt64")); + Assert.assertTrue(columnsToDataTypeMap.get("primary_host").equalsIgnoreCase("String")); + Assert.assertTrue(columnsToDataTypeMap.get("gmt_time3").equalsIgnoreCase("Nullable(String)")); + Assert.assertTrue(columnsToDataTypeMap.get("gmt_time4").equalsIgnoreCase("Nullable(String)")); + + ResultSet resultSet = ITCommon.executeQueryWithResultSet("select database,table,_raw,is_deleted,operation,host,logfile,gmt_time,gmt_time3,gmt_time4 from employees.contacts_history", writer.getConnection()); + boolean insertCheck = false; + while (resultSet.next()) { + insertCheck = true; + Assert.assertTrue(resultSet.getString("database").equalsIgnoreCase("employees")); + Assert.assertTrue(resultSet.getString("table").equalsIgnoreCase("contacts_history")); + Assert.assertTrue(resultSet.getString("_raw").equalsIgnoreCase("{\"last_name\":\"Doe\",\"gmt_time3\":1744288556000,\"gmt_time4\":1744288616000,\"id\":1,\"fullname\":\"John Doe\",\"first_name\":\"John\",\"gmt_time\":1744288496000,\"email\":\"john.doe@gmail.com\"}")); + Assert.assertTrue(resultSet.getString("is_deleted").equalsIgnoreCase("0")); + Assert.assertTrue(resultSet.getString("operation").equalsIgnoreCase("C")); + Assert.assertTrue(resultSet.getString("host").equalsIgnoreCase("1")); + Assert.assertTrue(resultSet.getString("logfile").equalsIgnoreCase("binlog.000003")); + Assert.assertTrue(resultSet.getString("gmt_time").equalsIgnoreCase("2025-04-10 07:34:56.000")); + Assert.assertTrue(resultSet.getString("gmt_time3").equalsIgnoreCase("2025-04-10 07:35:56.000")); + Assert.assertTrue(resultSet.getString("gmt_time4").equalsIgnoreCase("2025-04-10 07:36:56.000")); + } + Thread.sleep(10000); + + Assert.assertTrue(insertCheck); + writer.getConnection().close(); + + Thread.sleep(10000); + + if(engine.get() != null) { + engine.get().stop(); + } + // Files.deleteIfExists(tmpFilePath); + executorService.shutdown(); + + HikariDbSource.close(); + } +} diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/DefaultColumnDataTypeMappingConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/DefaultColumnDataTypeMappingConfig.java new file mode 100644 index 000000000..dcf653486 --- /dev/null +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/DefaultColumnDataTypeMappingConfig.java @@ -0,0 +1,65 @@ +package com.altinity.clickhouse.sink.connector.config; + +import 
org.yaml.snakeyaml.Yaml; +import java.io.InputStream; +import java.util.Map; + +/** + * The {@code DefaultColumnDataTypeMappingConfig} class is responsible for loading the + * "default_column_datatype_mapping" section from a YAML configuration file and returning it as a Map. + *
+ * This mapping is used to configure the default data type for columns in the ClickHouse sink connector. + * The YAML file should reside in the src/main/resources directory and be named + * config_schema_override.yml. + *
+ */ +public class DefaultColumnDataTypeMappingConfig { + + /** + * Loads the "default_column_datatype_mapping" section from config_schema_override.yml and returns it as a Map. + *
+ * This method performs the following steps: + *
<ul>
+ *   <li>Creates a new instance of the SnakeYAML {@code Yaml} class.</li>
+ *   <li>Loads the YAML file ("config_schema_override.yml") from the classpath. Ensure that the file is placed
+ * under the src/main/resources directory so that it is available at runtime.</li>
+ *   <li>Parses the entire YAML file into a generic {@code Map}.</li>
+ *   <li>Extracts the value corresponding to the key "default_column_datatype_mapping".</li>
+ *   <li>Casts the extracted object to {@code Map} if it matches the expected format.</li>
+ *   <li>If the file is not found or the extracted section is not of the expected format, a {@code RuntimeException}
+ * is thrown.</li>
+ * </ul>
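+ * <p>A minimal usage sketch, assuming the bundled config_schema_override.yml
+ * is on the classpath:</p>
+ * <pre>{@code
+ * Map<String, String> mapping = DefaultColumnDataTypeMappingConfig.loadDefaultColumnDataTypeMapping();
+ * String type = mapping.get("gmt_time3"); // "String" per the bundled YAML
+ * }</pre>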
+ * + * @return a {@code Map} that contains the default column data type mapping. + * @throws RuntimeException if the YAML file is not found or the mapping is in an unexpected format. + */ + public static Map loadDefaultColumnDataTypeMapping() { + // Create an instance of SnakeYAML. + Yaml yaml = new Yaml(); + + // Load the YAML file from the classpath. + // The file "config_schema_override.yml" should reside in the src/main/resources directory. + InputStream inputStream = DefaultColumnDataTypeMappingConfig.class.getClassLoader() + .getResourceAsStream("config_schema_override.yml"); + if (inputStream == null) { + throw new RuntimeException("Unable to find config_schema_override.yml file. Please check the resource path!"); + } + + // Parse the entire YAML file into a Map object. + // The resulting map (yamlData) contains keys corresponding to the top-level keys in the YAML file. + Map yamlData = yaml.load(inputStream); + + // Retrieve the object corresponding to the "default_column_datatype_mapping" key. + // This object should be a Map that represents the mapping of column names to their respective data types. + Object mappingObj = yamlData.get("default_column_datatype_mapping"); + if (mappingObj instanceof Map) { + @SuppressWarnings("unchecked") + Map mapping = (Map) mappingObj; + return mapping; + } else { + // If the object is not a Map, throw an exception indicating an unexpected YAML structure. + throw new RuntimeException("The 'default_column_datatype_mapping' section is not in the expected format."); + } + } +} + diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/ReplicationHistoryConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/ReplicationHistoryConfig.java new file mode 100644 index 000000000..61d62a4e2 --- /dev/null +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/ReplicationHistoryConfig.java @@ -0,0 +1,90 @@ +package com.altinity.clickhouse.sink.connector.config; + +import org.yaml.snakeyaml.Yaml; +import java.io.InputStream; +import java.util.Map; + +/** + * The {@code ReplicationHistoryConfig} class is responsible for loading the + * "replication.history.enable" value from a YAML configuration file and returning it as a boolean. + *
+ * The YAML file should reside in the src/main/resources directory
+ * (here, config_schema_override.yml; see {@code YAML_FILE}).
+ *
+ */ +public class ReplicationHistoryConfig { + + // Name of the YAML file on the classpath + private static final String YAML_FILE = "config_schema_override.yml"; // or "replication.yml" + + /** + * Loads the "replication.history.enable" flag from the YAML file. + *
+ * Steps: + *
<ol>
+ *   <li>Create a SnakeYAML {@code Yaml} instance.</li>
+ *   <li>Load the YAML file from the classpath.</li>
+ *   <li>Parse it into a {@code Map}.</li>
+ *   <li>Navigate through the nested keys: "replication" → "history" → "enable".</li>
+ *   <li>Cast the result to {@code Boolean} (or parse from String if necessary).</li>
+ * </ol>
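+ * <p>A minimal usage sketch, assuming the bundled config_schema_override.yml
+ * (which sets replication.history.enable to true) is on the classpath:</p>
+ * <pre>{@code
+ * boolean enabled = ReplicationHistoryConfig.loadReplicationHistoryEnable(); // true for the bundled YAML
+ * }</pre>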
+ *
+ * @return the boolean value of {@code replication.history.enable}.
+ * @throws RuntimeException if the file is missing or the structure is invalid.
+ */
+    public static boolean loadReplicationHistoryEnable() {
+        Yaml yaml = new Yaml();
+        InputStream in = ReplicationHistoryConfig.class.getClassLoader()
+                .getResourceAsStream(YAML_FILE);
+        if (in == null) {
+            throw new RuntimeException("Cannot find " + YAML_FILE + " in classpath");
+        }
+
+        // Parse the top-level YAML content into a Map
+        Map<String, Object> root = yaml.load(in);
+        if (root == null) {
+            throw new RuntimeException(YAML_FILE + " is empty or malformed");
+        }
+
+        // Retrieve the 'replication' section
+        Object repObj = root.get("replication");
+        if (!(repObj instanceof Map)) {
+            throw new RuntimeException("'replication' section is missing or not a map");
+        }
+        @SuppressWarnings("unchecked")
+        Map<String, Object> replicationMap = (Map<String, Object>) repObj;
+
+        // Retrieve the 'history' subsection
+        Object histObj = replicationMap.get("history");
+        if (!(histObj instanceof Map)) {
+            throw new RuntimeException("'replication.history' section is missing or not a map");
+        }
+        @SuppressWarnings("unchecked")
+        Map<String, Object> historyMap = (Map<String, Object>) histObj;
+
+        // Retrieve the 'enable' flag
+        Object enableObj = historyMap.get("enable");
+        if (enableObj == null) {
+            throw new RuntimeException("'replication.history.enable' is not defined");
+        }
+        // Return the value directly if it is already a Boolean
+        if (enableObj instanceof Boolean) {
+            return (Boolean) enableObj;
+        }
+        // Support string values "true"/"false"
+        if (enableObj instanceof String) {
+            return Boolean.parseBoolean((String) enableObj);
+        }
+
+        throw new RuntimeException(
+                "'replication.history.enable' has an unsupported type: " + enableObj.getClass().getName()
+        );
+    }
+
+    public static void main(String[] args) {
+        // Test reading the flag
+        boolean enabled = loadReplicationHistoryEnable();
+        System.out.println("replication.history.enable = " + enabled);
+    }
+}
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/SchemaOverrideConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/SchemaOverrideConfig.java
new file mode 100644
index 000000000..1dccc9640
--- /dev/null
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/config/SchemaOverrideConfig.java
@@ -0,0 +1,116 @@
+package com.altinity.clickhouse.sink.connector.config;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+/**
+ * SchemaOverrideConfig loads the schema configuration from a YAML file.
+ * It allows querying the configuration based on database and table name.
+ */
+public class SchemaOverrideConfig {
+
+    // Root of the parsed YAML: top-level key "databases" -> database name -> per-database section
+    private Map<String, Map<String, Map<String, Object>>> databases;
+
+    /**
+     * Loads the table configurations from the provided YAML file.
+     * @param filePath the path to the YAML file on the classpath (resources)
+     * @throws IOException if there is an error loading the YAML file
+     */
+    public void loadTableConfigs(String filePath) throws IOException {
+        // Load the file from the classpath
+        InputStream inputStream = getClass().getClassLoader().getResourceAsStream(filePath);
+        if (inputStream == null) {
+            throw new IOException("File not found in classpath: " + filePath);
+        }
+
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        // Load the entire structure of the YAML into the 'databases' map
+        databases = mapper.readValue(inputStream, Map.class);
+    }
+
+    /**
+     * Retrieves the schema configuration for a specific database and table.
+     * @param databaseName the name of the database
+     * @param tableName the name of the table
+     * @return the schema configuration for the table, or null if not found
+     */
+    public Table getTableConfig(String databaseName, String tableName) {
+        // Retrieve the per-database sections under the top-level "databases" key
+        Map<String, Map<String, Object>> databaseAll = databases.get("databases");
+        if (databaseAll == null) {
+            return null;
+        }
+
+        // Get the section for the specified database (contains the "tables" key)
+        Map<String, Object> tables = databaseAll.get(databaseName);
+        if (tables == null) {
+            return null;
+        }
+
+        // Round-trip through JSON so the untyped YAML structure can be bound to Table
+        String jsonString = JSONObject.toJSONString(tables);
+
+        JSONObject jsonObject = JSONObject.parseObject(jsonString);
+        if (jsonObject == null) {
+            return null;
+        }
+        JSONObject tablesObject = jsonObject.getJSONObject("tables");
+        if (tablesObject == null) {
+            return null;
+        }
+
+        // Convert the JSONObject to a Map keyed by table name
+        Map<String, Map<String, Object>> tableMap = tablesObject.toJavaObject(Map.class);
+        Map<String, Object> subTableMap = tableMap.get(tableName);
+        JSONObject jsonObjectTable = (JSONObject) JSONObject.toJSON(subTableMap);
+        if (jsonObjectTable == null) {
+            return null;
+        }
+
+        return jsonObjectTable.toJavaObject(Table.class);
+    }
+
+    /**
+     * A nested class representing the schema configuration for each table.
+     * It holds partitioning, primary key, and settings information.
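+     * <p>A minimal lookup sketch, assuming the bundled config_schema_override.yml:</p>
+     * <pre>{@code
+     * SchemaOverrideConfig config = new SchemaOverrideConfig();
+     * config.loadTableConfigs("config_schema_override.yml");
+     * SchemaOverrideConfig.Table t = config.getTableConfig("dbo", "tr_live");
+     * // t.getPartitionBy() -> "tr_date_id", t.getPrimaryKey() -> "gmt_time"
+     * }</pre>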
+ */ + public static class Table { + private String partitionBy; + private String primaryKey; + private String settings; + + // Getter methods + public String getPartitionBy() { + return partitionBy; + } + + public String getPrimaryKey() { + return primaryKey; + } + + public String getSettings() { + return settings; + } + + // Setter methods + public void setPartitionBy(String partitionBy) { + this.partitionBy = partitionBy; + } + + public void setPrimaryKey(String primaryKey) { + this.primaryKey = primaryKey; + } + + public void setSettings(String settings) { + this.settings = settings; + } + + @Override + public String toString() { + return "partition_by: " + partitionBy + ", primary_key: " + primaryKey + ", settings: " + settings; + } + } +} diff --git a/sink-connector/src/main/resources/config_schema_override.yml b/sink-connector/src/main/resources/config_schema_override.yml new file mode 100644 index 000000000..78bf06e6a --- /dev/null +++ b/sink-connector/src/main/resources/config_schema_override.yml @@ -0,0 +1,42 @@ +default_column_datatype_mapping: + # we are no longer turning Date/DateTime/Timestamp as a String + transaction_id: String + exchange_transaction_id: String + unique_transaction_id: String + account_ref: String + otm_identifier: String + tag_reserved_4: String + initiator: String + quantity: String + amount_1: String + amount: String + gmt_time2: String + gmt_time3: String + gmt_time4: String + +databases: + dbo: + tables: + tr_live: + partition_by: tr_date_id + primary_key: gmt_time + settings: allow_nullable_key=1 + test: + tables: + foo16: + partition_by: pid + primary_key: tr_date_id + settings: allow_nullable_key=1 + employee3: + partition_by: tr_date_id + primary_key: gmt_time + settings: allow_nullable_key=1 + employees9: + partition_by: customerName + primary_key: occupation + settings: allow_nullable_key=1 + +replication: + history: + enable: true + diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/config/DefaultColumnDataTypeMappingConfigTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/config/DefaultColumnDataTypeMappingConfigTest.java new file mode 100644 index 000000000..1d6179e1a --- /dev/null +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/config/DefaultColumnDataTypeMappingConfigTest.java @@ -0,0 +1,21 @@ +package com.altinity.clickhouse.sink.connector.config; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static com.altinity.clickhouse.sink.connector.config.DefaultColumnDataTypeMappingConfig.loadDefaultColumnDataTypeMapping; + +public class DefaultColumnDataTypeMappingConfigTest { + + @Test + public void testDefaultColumnDataTypeMapping(){ + + // Call the method to load the default column data type mapping. + Map mapping = loadDefaultColumnDataTypeMapping(); + + // Print the contents of the mapping to the console. 
+ System.out.println("Contents of default_column_datatype_mapping:"); + mapping.forEach((key, value) -> System.out.println(key + ": " + value)); + } +} diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/config/SchemaOverrideConfigTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/config/SchemaOverrideConfigTest.java new file mode 100644 index 000000000..00d811c9b --- /dev/null +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/config/SchemaOverrideConfigTest.java @@ -0,0 +1,77 @@ +package com.altinity.clickhouse.sink.connector.config; + +import com.altinity.clickhouse.sink.connector.config.SchemaOverrideConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * SchemaOverrideConfigTest is a test class that validates the functionality of + * SchemaOverrideConfig. It loads the configuration from a YAML file and verifies that + * the lookup by database and table name returns the correct configuration. + */ +public class SchemaOverrideConfigTest { + + private SchemaOverrideConfig config; + + /** + * Set up a new instance of SchemaOverrideConfig and load the YAML configuration + * before each test. The file is loaded from the classpath (resources). + */ + @BeforeEach + public void setUp() throws IOException { + config = new SchemaOverrideConfig(); + // Specify the path to the YAML file relative to the resources folder + String filePath = "config_schema_override.yml"; // File is inside src/main/resources + config.loadTableConfigs(filePath); + } + + /** + * Test to verify that the configuration for dbo.tr_live is loaded correctly. + */ + @Test + public void testDboTrLiveConfig() { + SchemaOverrideConfig.Table tableConfig = config.getTableConfig("test", "employee3"); + assertNotNull(tableConfig, "dbo.tr_live configuration should not be null"); + assertEquals("tr_date_id", tableConfig.getPartitionBy(), "Partition key should match"); + assertEquals("gmt_time", tableConfig.getPrimaryKey(), "Primary key should match"); + assertEquals("allow_nullable_key=1", tableConfig.getSettings(), "Settings should match"); + } + + /** + * Test to verify that the configuration for dbo.tr_live2 is loaded correctly. + */ + @Test + public void testDboTrLive2Config() { + SchemaOverrideConfig.Table tableConfig = config.getTableConfig("dbo", "tr_live2"); + assertNotNull(tableConfig, "dbo.tr_live2 configuration should not be null"); + // For dbo.tr_live2, partition_by and settings are not defined. + assertNull(tableConfig.getPartitionBy(), "Partition key should be null"); + assertEquals("time2", tableConfig.getPrimaryKey(), "Primary key should match"); + assertNull(tableConfig.getSettings(), "Settings should be null"); + } + + /** + * Test to verify that the configuration for dbo2.tr_live is loaded correctly. + */ + @Test + public void testDbo2TrLiveConfig() { + SchemaOverrideConfig.Table tableConfig = config.getTableConfig("dbo2", "tr_live"); + assertNotNull(tableConfig, "dbo2.tr_live configuration should not be null"); + assertEquals("tr_date_id", tableConfig.getPartitionBy(), "Partition key should match"); + assertEquals("gmt_time", tableConfig.getPrimaryKey(), "Primary key should match"); + assertEquals("allow_nullable_key=1", tableConfig.getSettings(), "Settings should match"); + } + + /** + * Test to verify that requesting a non-existent configuration returns null. 
+ */ + @Test + public void testNonExistentConfig() { + SchemaOverrideConfig.Table tableConfig = config.getTableConfig("nonexistent_db", "nonexistent_table"); + assertNull(tableConfig, "Non-existent configuration should return null"); + } +}