diff --git a/CHANGELOG.md b/CHANGELOG.md index 95f65b2..792a629 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## [Unreleased] - Add `catalog.include` and `catalog.exclude` options. +- Allow to add `proctime` column to tables. ## [0.1.0] - 2023-12-13 diff --git a/README.md b/README.md index 02b653c..6b15e33 100644 --- a/README.md +++ b/README.md @@ -58,16 +58,13 @@ separating them with a comma `,` sign. The Flink tables created this way can also be partitioned just as other Flink tables by providing default catalog scan options or table specific scan options. -### Timeattributes -It is possible to declare `timeattributes` for tables in the catalog by specifying additional -We can either use processing time by specifying how to name a processing time column: -- `properties.timeattribute.{tablename}.proctime.column` +### Time attributes -or column and delay for watermarking: -- `properties.timeattribute.{tablename}.watermark.column` -- `properties.timeattribute.{tablename}.watermark.delay` +It is possible to add `proctime` column to each catalog table. -It is possible to specify watermark properties for catalog tables. +```properties +catalog.add-proctime-column=true +``` --- diff --git a/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalog.java b/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalog.java index 73b609c..4b3484e 100644 --- a/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalog.java +++ b/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalog.java @@ -46,6 +46,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -81,7 +82,7 @@ public class ElasticCatalog extends AbstractJdbcCatalog { private final String catalogDefaultScanPartitionColumnName; private final String catalogDefaultScanPartitionCapacity; private final Map scanPartitionProperties; - private final Map timeAttributeProperties; + private final boolean addProctimeColumn; private final List indexPatterns; private final IndexFilterResolver indexFilterResolver; @@ -92,18 +93,7 @@ public ElasticCatalog(ClassLoader userClassLoader, String password, String baseUrl) { this(userClassLoader, catalogName, defaultDatabase, username, password, baseUrl, - IndexFilterResolver.acceptAll(), new HashMap<>()); - } - - public ElasticCatalog(ClassLoader userClassLoader, - String catalogName, - String defaultDatabase, - String username, - String password, - String baseUrl, - IndexFilterResolver indexFilterResolver) { - this(userClassLoader, catalogName, defaultDatabase, username, password, baseUrl, indexFilterResolver, - new HashMap<>()); + false, IndexFilterResolver.acceptAll(), Collections.emptyMap()); } public ElasticCatalog(ClassLoader userClassLoader, @@ -112,6 +102,7 @@ public ElasticCatalog(ClassLoader userClassLoader, String username, String password, String baseUrl, + boolean addProctimeColumn, IndexFilterResolver indexFilterResolver, Map properties) { super(userClassLoader, catalogName, defaultDatabase, username, password, baseUrl); @@ -121,7 +112,7 @@ public ElasticCatalog(ClassLoader userClassLoader, this.catalogDefaultScanPartitionColumnName = catalogDefaultScanProperties[0]; this.catalogDefaultScanPartitionCapacity = catalogDefaultScanProperties[1]; this.scanPartitionProperties = extractScanTablePartitionProperties(properties); - this.timeAttributeProperties = extractTimeAttributeProperties(properties); + this.addProctimeColumn = addProctimeColumn; this.indexFilterResolver = indexFilterResolver; } @@ -185,50 +176,6 @@ private Map extractScanTablePartitionProperties return scanPartitionProperties; } - private Map extractTimeAttributeProperties(Map properties) { - Map timeAttributeProperties = new HashMap<>(); - - for (Map.Entry entry : properties.entrySet()) { - String key = entry.getKey(); - if (!key.startsWith("properties.timeattribute.") || - !( - key.endsWith(".watermark.column") || - key.endsWith(".watermark.delay") || - key.endsWith(".proctime.column") - ) - ) { - continue; - } - String tableName = key.replace("properties.timeattribute.", ""). - replace(".watermark.column", ""). - replace(".watermark.delay", ""). - replace(".proctime.column", ""); - boolean timeAttributePropertiesForTableFound = timeAttributeProperties.containsKey(tableName); - TimeAttributeProperties timeAttributePropertiesForTable = timeAttributePropertiesForTableFound - ? timeAttributeProperties.get(tableName) - : new TimeAttributeProperties(); - - if (entry.getKey().endsWith(".watermark.column")) { - if (timeAttributePropertiesForTable.getWatermarkColumn() == null) { - timeAttributePropertiesForTable.setWatermarkColumn(entry.getValue()); - } - } else if (entry.getKey().endsWith(".watermark.delay")) { - if (timeAttributePropertiesForTable.getWatermarkDelay() == null) { - timeAttributePropertiesForTable.setWatermarkDelay(entry.getValue()); - } - } else if (entry.getKey().endsWith(".proctime.column")) { - if (timeAttributePropertiesForTable.getProctimeColumn() == null) { - timeAttributePropertiesForTable.setProctimeColumn(entry.getValue()); - } - } - // Adding a new watermarkProperties to the map - if (!timeAttributePropertiesForTableFound) { - timeAttributeProperties.put(tableName, timeAttributePropertiesForTable); - } - } - return timeAttributeProperties; - } - @Override public void open() throws CatalogException { // load the Driver use userClassLoader explicitly, see FLINK-15635 for more detail @@ -310,9 +257,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep DataType[] types = columns.values().toArray(new DataType[0]); String tableName = getSchemaTableName(tablePath); - TimeAttributeProperties tableTimeAttributeProperties = timeAttributeProperties.get(tableName); - checkTimeAttributeProperties(tableTimeAttributeProperties, tableName); - Schema tableSchema = buildSchema(columnNames, types, primaryKey, tableTimeAttributeProperties); + Schema tableSchema = buildSchema(columnNames, types, primaryKey); ScanPartitionProperties properties = scanPartitionProperties.get(tableName); @@ -382,27 +327,6 @@ private void checkScanPartitionNumber(int partitionNumber) { } } - private void checkTimeAttributeProperties(TimeAttributeProperties timeAttributeProperties, String tableName) { - if (timeAttributeProperties == null) { - return; - } - - String proctimeColumn = timeAttributeProperties.getProctimeColumn(); - String watermarkDelay = timeAttributeProperties.getWatermarkDelay(); - String watermarkColumn = timeAttributeProperties.getWatermarkColumn(); - - if (proctimeColumn != null && ( - watermarkDelay != null || - watermarkColumn != null)) { - throw new IllegalArgumentException("Either proctime or watermark properties should be specified for a table " - + tableName + "."); - } - if (proctimeColumn == null && (watermarkDelay == null || watermarkColumn == null)) { - throw new IllegalArgumentException("You should specify both watermarkDelay and watermarkColumn properties when using watermark for a table " - + tableName + "."); - } - } - private DataType retrievePartitionColumnDataType(String[] columnNames, DataType[] types, String partitionColumnName, String tableName) { for (int columnIndex = 0; columnIndex < columnNames.length; columnIndex++) { if (Objects.equals(columnNames[columnIndex], partitionColumnName)) { @@ -441,22 +365,12 @@ private ResultSetMetaData retrieveResultSetMetaData(Connection conn, ObjectPath } @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - private Schema buildSchema(String[] columnNames, DataType[] types, Optional primaryKey, - TimeAttributeProperties timeAttributeProperties) { + private Schema buildSchema(String[] columnNames, DataType[] types, Optional primaryKey) { Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types); primaryKey.ifPresent(pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns())); - if (timeAttributeProperties != null) { - // using proctime - if (timeAttributeProperties.getProctimeColumn() != null) { - schemaBuilder.columnByExpression(timeAttributeProperties.getProctimeColumn(), "PROCTIME()"); - } else { // using watermarking - schemaBuilder.watermark( - timeAttributeProperties.getWatermarkColumn(), - timeAttributeProperties.getWatermarkColumn() + - " - INTERVAL " + - timeAttributeProperties.getWatermarkDelay()); - } + if (addProctimeColumn) { + schemaBuilder.columnByExpression("proctime", "PROCTIME()"); } return schemaBuilder.build(); } @@ -620,40 +534,4 @@ public String toString() { ", scanPartitionUpperBound=" + scanPartitionUpperBound; } } - - static class TimeAttributeProperties { - private String watermarkColumn; - private String watermarkDelay; - private String proctimeColumn; - - public String getWatermarkColumn() { - return this.watermarkColumn; - } - - public String getWatermarkDelay() { - return this.watermarkDelay; - } - - public String getProctimeColumn() { - return this.proctimeColumn; - } - - public void setWatermarkColumn(String watermarkColumn) { - this.watermarkColumn = watermarkColumn; - } - - public void setWatermarkDelay(String watermarkDelay) { - this.watermarkDelay = watermarkDelay; - } - - public void setProctimeColumn(String proctimeColumn) { - this.proctimeColumn = proctimeColumn; - } - - public String toString() { - return "watermarkColumn=" + watermarkColumn + - ", watermarkDelay=" + watermarkDelay + - ", proctimeColumn=" + proctimeColumn; - } - } } diff --git a/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactory.java b/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactory.java index c2b2dfe..09b35cb 100644 --- a/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactory.java +++ b/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactory.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static com.getindata.flink.connector.jdbc.catalog.ElasticJdbcCatalogFactoryOptions.ADD_PROCTIME_COLUMN; import static com.getindata.flink.connector.jdbc.catalog.ElasticJdbcCatalogFactoryOptions.BASE_URL; import static com.getindata.flink.connector.jdbc.catalog.ElasticJdbcCatalogFactoryOptions.DEFAULT_DATABASE; import static com.getindata.flink.connector.jdbc.catalog.ElasticJdbcCatalogFactoryOptions.DEFAULT_SCAN_PARTITION_COLUMN_NAME; @@ -61,6 +62,7 @@ public Set> optionalOptions() { options.add(DEFAULT_SCAN_PARTITION_COLUMN_NAME); options.add(DEFAULT_SCAN_PARTITION_SIZE); options.add(PROPERTIES_INDEX_PATTERNS); + options.add(ADD_PROCTIME_COLUMN); options.add(EXCLUDE); options.add(INCLUDE); return options; @@ -80,6 +82,7 @@ public Catalog createCatalog(Context context) { helper.getOptions().get(USERNAME), helper.getOptions().get(PASSWORD), helper.getOptions().get(BASE_URL), + Boolean.getBoolean(context.getOptions().get(ADD_PROCTIME_COLUMN)), IndexFilterResolver.of(helper.getOptions().get(INCLUDE), helper.getOptions().get(EXCLUDE)), context.getOptions() ); @@ -90,16 +93,11 @@ private void validateDynamicOptions(Map options) { for (Map.Entry entry : scanOptions.entrySet()) { String key = entry.getKey(); if (!(key.startsWith("properties.scan.") && key.endsWith("partition.column.name")) && - !(key.startsWith("properties.scan.") && key.endsWith("partition.number")) && - !(key.startsWith("properties.watermark.") && key.endsWith("interval")) && - !(key.startsWith("properties.watermark.") && key.endsWith("unit"))) { - throw new IllegalArgumentException("Parameter " + entry.getKey() + " is not supported. We support" + - " properties.scan..partition.column.name, " + - " properties.scan..partition.number, " + - " properties.timeattribute..watermark.column, " + - " properties.timeattribute..watermark.delay, " + - " properties.timeattribute..proctime.column " + - "dynamic properties only." + !(key.startsWith("properties.scan.") && key.endsWith("partition.number"))) { + throw new IllegalArgumentException("Parameter " + entry.getKey() + " is not supported." + + " We support only the following dynamic properties:\n" + + " properties.scan..partition.column.name\n" + + " properties.scan..partition.number" ); } } diff --git a/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactoryOptions.java b/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactoryOptions.java index 805bca0..256d30c 100644 --- a/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactoryOptions.java +++ b/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactoryOptions.java @@ -72,6 +72,12 @@ public class ElasticJdbcCatalogFactoryOptions { .withDescription( "Index patterns."); + public static final ConfigOption ADD_PROCTIME_COLUMN = + ConfigOptions.key("catalog.add-proctime-column") + .booleanType() + .defaultValue(false) + .withDescription("Indicates if proctime column should be added to all tables."); + public static final ConfigOption EXCLUDE = ConfigOptions.key("catalog.exclude") .stringType() diff --git a/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogBuilder.java b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogBuilder.java new file mode 100644 index 0000000..36c67c1 --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogBuilder.java @@ -0,0 +1,74 @@ +package com.getindata.flink.connector.jdbc.catalog; + +import java.util.Map; + +public final class ElasticCatalogBuilder { + private ClassLoader userClassLoader; + private String username; + private String password; + private String baseUrl; + private boolean addProctimeColumn; + private IndexFilterResolver indexFilterResolver; + private String catalogName; + private String defaultDatabase; + + private Map properties; + + private ElasticCatalogBuilder() { + } + + public static ElasticCatalogBuilder anElasticCatalog() { + return new ElasticCatalogBuilder(); + } + + public ElasticCatalogBuilder userClassLoader(ClassLoader userClassLoader) { + this.userClassLoader = userClassLoader; + return this; + } + + public ElasticCatalogBuilder username(String username) { + this.username = username; + return this; + } + + public ElasticCatalogBuilder password(String password) { + this.password = password; + return this; + } + + public ElasticCatalogBuilder baseUrl(String baseUrl) { + this.baseUrl = baseUrl; + return this; + } + + public ElasticCatalogBuilder addProctimeColumn(boolean addProctimeColumn) { + this.addProctimeColumn = addProctimeColumn; + return this; + } + + public ElasticCatalogBuilder indexFilterResolver(IndexFilterResolver indexFilterResolver) { + this.indexFilterResolver = indexFilterResolver; + return this; + } + + public ElasticCatalogBuilder catalogName(String catalogName) { + this.catalogName = catalogName; + return this; + } + + public ElasticCatalogBuilder defaultDatabase(String defaultDatabase) { + this.defaultDatabase = defaultDatabase; + return this; + } + + + public ElasticCatalogBuilder properties(Map properties) { + this.properties = properties; + return this; + } + + public ElasticCatalog build() { + return new ElasticCatalog(userClassLoader, catalogName, defaultDatabase, username, password, baseUrl, + addProctimeColumn, indexFilterResolver, properties); + } +} diff --git a/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogITCase.java b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogITCase.java index e59e096..7e03ad2 100644 --- a/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogITCase.java +++ b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogITCase.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -84,6 +85,19 @@ public static void beforeAll() throws Exception { TimeUnit.SECONDS.sleep(1); } + private ElasticCatalogBuilder catalogBuilder() { + return ElasticCatalogBuilder.anElasticCatalog() + .userClassLoader(this.getClass().getClassLoader()) + .catalogName("test-catalog") + .defaultDatabase("test-database") + .username(USERNAME) + .password(PASSWORD) + .baseUrl(url) + .addProctimeColumn(false) + .indexFilterResolver(IndexFilterResolver.acceptAll()) + .properties(Collections.emptyMap()); + } + @Test public void testListDatabases() throws DatabaseNotExistException, TableNotExistException { // given @@ -91,8 +105,7 @@ public void testListDatabases() throws DatabaseNotExistException, TableNotExistE container.getElasticPort()); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", - USERNAME, PASSWORD, url); + ElasticCatalog catalog = catalogBuilder().build(); // then List databases = catalog.listDatabases(); @@ -107,8 +120,7 @@ public void testListTables() throws DatabaseNotExistException, InterruptedExcept container.getElasticPort()); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", - USERNAME, PASSWORD, url); + ElasticCatalog catalog = catalogBuilder().build(); // then List tables = catalog.listTables("docker-cluster"); @@ -133,8 +145,9 @@ public void testTableFiltering() throws DatabaseNotExistException { container.getElasticPort()); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", - USERNAME, PASSWORD, url, IndexFilterResolver.of("test_m.*", "test_mi.*")); + ElasticCatalog catalog = catalogBuilder() + .indexFilterResolver(IndexFilterResolver.of("test_m.*", "test_mi.*")) + .build(); // then List tables = catalog.listTables("docker-cluster"); @@ -152,8 +165,7 @@ public void testTableExists() throws DatabaseNotExistException { container.getElasticPort()); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", - USERNAME, PASSWORD, url); + ElasticCatalog catalog = catalogBuilder().build(); // then assertTrue(catalog.tableExists(new ObjectPath("docker-cluster", "test_single_record_table"))); @@ -173,8 +185,7 @@ public void testTableNotExists() { container.getElasticPort()); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", - USERNAME, PASSWORD, url); + ElasticCatalog catalog = catalogBuilder().build(); // then assertFalse(catalog.tableExists(new ObjectPath("docker-cluster", "nonexisting_table"))); @@ -187,8 +198,7 @@ public void testGetNonPartitionedTable() throws TableNotExistException { container.getElasticPort()); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", - USERNAME, PASSWORD, url); + ElasticCatalog catalog = catalogBuilder().build(); CatalogBaseTable table = catalog.getTable(new ObjectPath( "docker-cluster", "test_multiple_records_table")); @@ -211,8 +221,7 @@ public void testGetTablePartitionedByTimestamp() throws TableNotExistException { properties.put("properties.scan.test_multiple_records_table.partition.number", "10"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath( "docker-cluster", "test_multiple_records_table")); @@ -236,8 +245,7 @@ public void testGetTablePartitionedByInteger() throws TableNotExistException { properties.put("properties.scan.test_multiple_records_table.partition.number", "10"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath( "docker-cluster", "test_multiple_records_table")); @@ -261,8 +269,7 @@ public void testGetTableDefaultScanOptionsZeroRecords() throws TableNotExistExce properties.put("catalog.default.scan.partition.size", "100"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath("docker-cluster", "test_empty_table")); // then @@ -286,8 +293,7 @@ public void testFailNoPartitionColumnProvided() throws TableNotExistException { properties.put("properties.scan.test_multiple_records_table.partition.number", "10"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); try { catalog.getTable(new ObjectPath("docker-cluster", "test_multiple_records_table")); @@ -307,8 +313,7 @@ public void testFailNoPartitionNumberProvided() throws TableNotExistException { properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); try { catalog.getTable(new ObjectPath("docker-cluster", "test_multiple_records_table")); @@ -329,8 +334,7 @@ public void testFailNoPartitionColumnInTable() throws TableNotExistException { properties.put("properties.scan.test_missing_date_col_table.partition.number", "10"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); try { catalog.getTable(new ObjectPath("docker-cluster", "test_missing_date_col_table")); @@ -351,8 +355,7 @@ public void testFailPartitionColumnNotSupported() throws TableNotExistException properties.put("properties.scan.test_single_record_table.partition.number", "10"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); try { catalog.getTable(new ObjectPath("docker-cluster", "test_single_record_table")); @@ -373,8 +376,7 @@ public void testFailInappropriatePartitionNumber() throws TableNotExistException properties.put("properties.scan.test_multiple_records_table.partition.number", "0"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); try { catalog.getTable(new ObjectPath("docker-cluster", "test_multiple_records_table")); @@ -393,8 +395,7 @@ public void testUnsupportedDataTypeInTable() throws TableNotExistException { container.getElasticPort()); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url); + ElasticCatalog catalog = catalogBuilder().build(); try { catalog.getTable(new ObjectPath("docker-cluster", "test_unsupported_data_type_table")); @@ -415,8 +416,7 @@ public void testGetTableDefaultCatalogScanPartitionProperties() throws TableNotE properties.put("catalog.default.scan.partition.size", "5"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath("docker-cluster", "test_multiple_records_table")); // then @@ -441,8 +441,7 @@ public void testGetTableOverwriteCatalogScanProperties() throws TableNotExistExc properties.put("catalog.default.scan.partition.size", "5"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath("docker-cluster", "test_multiple_records_table")); // then @@ -463,8 +462,7 @@ public void testGetTableIndexPattern() throws TableNotExistException, DatabaseNo properties.put("properties.index.patterns", "test_*_record_table"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath("docker-cluster", "test_*_record_table")); // then @@ -525,8 +523,7 @@ public void testGetMultipleIndexPatternPartitionedTables() throws TableNotExistE properties.put("properties.index.patterns", "test_*_record*_table,test_partial_schema_table_*"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath("docker-cluster", "test_*_record*_table")); CatalogBaseTable table2 = catalog.getTable(new ObjectPath("docker-cluster", "test_partial_schema_table_*")); @@ -560,8 +557,7 @@ public void testGetTableDuplicatedIndexPattern() throws TableNotExistException, properties.put("properties.index.patterns", "test_partial_schema_table_*, test_partial_schema_table_*"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); catalog.getTable(new ObjectPath("docker-cluster", "test_partial_schema_table_*")); // then @@ -581,8 +577,7 @@ public void testGetTableIndexPatternDifferentTableSchamas() throws TableNotExist properties.put("properties.index.patterns", "test_partial_schema_table_*"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath("docker-cluster", "test_partial_schema_table_*")); Schema expectedSchema = Schema.newBuilder().fromFields( @@ -619,8 +614,7 @@ public void testGetTablePartitionBySpecialCharacterColumn() throws TableNotExist properties.put("properties.scan.test_special_character_column_names_table.partition.number", "10"); // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); + ElasticCatalog catalog = catalogBuilder().properties(properties).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath("docker-cluster", "test_special_character_column_names_table")); // then @@ -662,20 +656,9 @@ public void testGetTableTimeAttributesProctime() throws TableNotExistException { // given String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), container.getElasticPort()); - Map properties = new HashMap(); - properties.put("properties.timeattribute.test_multiple_records_table.proctime.column", "my_proctime"); // when - ElasticCatalog catalog = new ElasticCatalog( - this.getClass().getClassLoader(), - "test-catalog", - "test-database", - USERNAME, - PASSWORD, - url, - IndexFilterResolver.acceptAll(), - properties - ); + ElasticCatalog catalog = catalogBuilder().addProctimeColumn(true).build(); CatalogBaseTable table = catalog.getTable(new ObjectPath( "docker-cluster", "test_multiple_records_table" @@ -689,74 +672,9 @@ public void testGetTableTimeAttributesProctime() throws TableNotExistException { List columns = schema.getColumns(); UnresolvedColumn actualColumn = columns.get(columns.size() - 1); UnresolvedComputedColumn expectedColumn = (UnresolvedComputedColumn) Schema.newBuilder().columnByExpression( - "my_proctime", + "proctime", "PROCTIME()" ).build().getColumns().get(0); assertEquals(actualColumn, expectedColumn); } - - @Test - public void testGetTableTimeAttributesWatermark() throws TableNotExistException { - // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); - properties.put("properties.timeattribute.test_multiple_records_table.watermark.column", "date_col"); - properties.put("properties.timeattribute.test_multiple_records_table.watermark.delay", "'5' SECOND"); - - // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); - CatalogBaseTable table = catalog.getTable(new ObjectPath("docker-cluster", "test_multiple_records_table")); - - // then - Schema schema = table.getUnresolvedSchema(); - - assertEquals(1, schema.getWatermarkSpecs().size()); - assertEquals("[date_col - INTERVAL '5' SECOND]", schema.getWatermarkSpecs().get(0).getWatermarkExpression().toString()); - } - - @Test - public void testFailGetTableTimeAttributesProctimeAndWatermarkProvided() throws TableNotExistException { - // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); - properties.put("properties.timeattribute.test_multiple_records_table.proctime.column", "my_proctime"); - properties.put("properties.timeattribute.test_multiple_records_table.watermark.column", "date_col"); - properties.put("properties.timeattribute.test_multiple_records_table.watermark.delay", "'5' SECOND"); - - // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); - try { - catalog.getTable(new ObjectPath("docker-cluster", "test_multiple_records_table")); - - // then - fail("Should have thrown CatalogException"); - } catch (CatalogException e) { - assertTrue(e.getCause().getMessage().contains("Either proctime or watermark properties should be specified for a table test_multiple_records_table.")); - } - } - - @Test - public void testFailGetTableWithWatermarkMissingProperty() throws TableNotExistException { - // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); - properties.put("properties.timeattribute.test_multiple_records_table.watermark.column", "date_col"); - - // when - ElasticCatalog catalog = new ElasticCatalog(this.getClass().getClassLoader(), "test-catalog", "test-database", USERNAME, - PASSWORD, url, IndexFilterResolver.acceptAll(), properties); - try { - catalog.getTable(new ObjectPath("docker-cluster", "test_multiple_records_table")); - - // then - fail("Should have thrown CatalogException"); - } catch (CatalogException e) { - assertTrue(e.getCause().getMessage().contains("You should specify both watermarkDelay and watermarkColumn properties when using watermark for a table test_multiple_records_table.")); - } - } }