Skip to content

Commit

Permalink
Allow adding a proctime column to tables
Browse files Browse the repository at this point in the history
  • Loading branch information
grzegorz8 committed Jan 26, 2024
1 parent 0b1ddf5 commit ab0ff36
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 271 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## [Unreleased]

- Add `catalog.include` and `catalog.exclude` options.
- Allow adding a `proctime` column to tables.

## [0.1.0] - 2023-12-13

Expand Down
13 changes: 5 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@ separating them with a comma `,` sign.

The Flink tables created this way can also be partitioned just as other Flink tables by providing default catalog scan options or table specific scan options.

### Timeattributes
It is possible to declare `timeattributes` for tables in the catalog by specifying additional
We can either use processing time by specifying how to name a processing time column:
- `properties.timeattribute.{tablename}.proctime.column`
### Time attributes

or column and delay for watermarking:
- `properties.timeattribute.{tablename}.watermark.column`
- `properties.timeattribute.{tablename}.watermark.delay`
It is possible to add a `proctime` column to each catalog table.

This behavior is enabled with the following catalog property:
```properties
catalog.add-proctime-column=true
```

---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -81,7 +82,7 @@ public class ElasticCatalog extends AbstractJdbcCatalog {
private final String catalogDefaultScanPartitionColumnName;
private final String catalogDefaultScanPartitionCapacity;
private final Map<String, ScanPartitionProperties> scanPartitionProperties;
private final Map<String, TimeAttributeProperties> timeAttributeProperties;
private final boolean addProctimeColumn;
private final List<String> indexPatterns;
private final IndexFilterResolver indexFilterResolver;

Expand All @@ -92,18 +93,7 @@ public ElasticCatalog(ClassLoader userClassLoader,
String password,
String baseUrl) {
this(userClassLoader, catalogName, defaultDatabase, username, password, baseUrl,
IndexFilterResolver.acceptAll(), new HashMap<>());
}

public ElasticCatalog(ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String password,
String baseUrl,
IndexFilterResolver indexFilterResolver) {
this(userClassLoader, catalogName, defaultDatabase, username, password, baseUrl, indexFilterResolver,
new HashMap<>());
false, IndexFilterResolver.acceptAll(), Collections.emptyMap());
}

public ElasticCatalog(ClassLoader userClassLoader,
Expand All @@ -112,6 +102,7 @@ public ElasticCatalog(ClassLoader userClassLoader,
String username,
String password,
String baseUrl,
boolean addProctimeColumn,
IndexFilterResolver indexFilterResolver,
Map<String, String> properties) {
super(userClassLoader, catalogName, defaultDatabase, username, password, baseUrl);
Expand All @@ -121,7 +112,7 @@ public ElasticCatalog(ClassLoader userClassLoader,
this.catalogDefaultScanPartitionColumnName = catalogDefaultScanProperties[0];
this.catalogDefaultScanPartitionCapacity = catalogDefaultScanProperties[1];
this.scanPartitionProperties = extractScanTablePartitionProperties(properties);
this.timeAttributeProperties = extractTimeAttributeProperties(properties);
this.addProctimeColumn = addProctimeColumn;
this.indexFilterResolver = indexFilterResolver;
}

Expand Down Expand Up @@ -185,50 +176,6 @@ private Map<String, ScanPartitionProperties> extractScanTablePartitionProperties
return scanPartitionProperties;
}

private Map<String, TimeAttributeProperties> extractTimeAttributeProperties(Map<String, String> properties) {
Map<String, TimeAttributeProperties> timeAttributeProperties = new HashMap<>();

for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
if (!key.startsWith("properties.timeattribute.") ||
!(
key.endsWith(".watermark.column") ||
key.endsWith(".watermark.delay") ||
key.endsWith(".proctime.column")
)
) {
continue;
}
String tableName = key.replace("properties.timeattribute.", "").
replace(".watermark.column", "").
replace(".watermark.delay", "").
replace(".proctime.column", "");
boolean timeAttributePropertiesForTableFound = timeAttributeProperties.containsKey(tableName);
TimeAttributeProperties timeAttributePropertiesForTable = timeAttributePropertiesForTableFound
? timeAttributeProperties.get(tableName)
: new TimeAttributeProperties();

if (entry.getKey().endsWith(".watermark.column")) {
if (timeAttributePropertiesForTable.getWatermarkColumn() == null) {
timeAttributePropertiesForTable.setWatermarkColumn(entry.getValue());
}
} else if (entry.getKey().endsWith(".watermark.delay")) {
if (timeAttributePropertiesForTable.getWatermarkDelay() == null) {
timeAttributePropertiesForTable.setWatermarkDelay(entry.getValue());
}
} else if (entry.getKey().endsWith(".proctime.column")) {
if (timeAttributePropertiesForTable.getProctimeColumn() == null) {
timeAttributePropertiesForTable.setProctimeColumn(entry.getValue());
}
}
// Adding a new watermarkProperties to the map
if (!timeAttributePropertiesForTableFound) {
timeAttributeProperties.put(tableName, timeAttributePropertiesForTable);
}
}
return timeAttributeProperties;
}

@Override
public void open() throws CatalogException {
// load the Driver use userClassLoader explicitly, see FLINK-15635 for more detail
Expand Down Expand Up @@ -310,9 +257,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
DataType[] types = columns.values().toArray(new DataType[0]);

String tableName = getSchemaTableName(tablePath);
TimeAttributeProperties tableTimeAttributeProperties = timeAttributeProperties.get(tableName);
checkTimeAttributeProperties(tableTimeAttributeProperties, tableName);
Schema tableSchema = buildSchema(columnNames, types, primaryKey, tableTimeAttributeProperties);
Schema tableSchema = buildSchema(columnNames, types, primaryKey);

ScanPartitionProperties properties = scanPartitionProperties.get(tableName);

Expand Down Expand Up @@ -382,27 +327,6 @@ private void checkScanPartitionNumber(int partitionNumber) {
}
}

private void checkTimeAttributeProperties(TimeAttributeProperties timeAttributeProperties, String tableName) {
if (timeAttributeProperties == null) {
return;
}

String proctimeColumn = timeAttributeProperties.getProctimeColumn();
String watermarkDelay = timeAttributeProperties.getWatermarkDelay();
String watermarkColumn = timeAttributeProperties.getWatermarkColumn();

if (proctimeColumn != null && (
watermarkDelay != null ||
watermarkColumn != null)) {
throw new IllegalArgumentException("Either proctime or watermark properties should be specified for a table "
+ tableName + ".");
}
if (proctimeColumn == null && (watermarkDelay == null || watermarkColumn == null)) {
throw new IllegalArgumentException("You should specify both watermarkDelay and watermarkColumn properties when using watermark for a table "
+ tableName + ".");
}
}

private DataType retrievePartitionColumnDataType(String[] columnNames, DataType[] types, String partitionColumnName, String tableName) {
for (int columnIndex = 0; columnIndex < columnNames.length; columnIndex++) {
if (Objects.equals(columnNames[columnIndex], partitionColumnName)) {
Expand Down Expand Up @@ -441,22 +365,12 @@ private ResultSetMetaData retrieveResultSetMetaData(Connection conn, ObjectPath
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private Schema buildSchema(String[] columnNames, DataType[] types, Optional<UniqueConstraint> primaryKey,
TimeAttributeProperties timeAttributeProperties) {
private Schema buildSchema(String[] columnNames, DataType[] types, Optional<UniqueConstraint> primaryKey) {
Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
primaryKey.ifPresent(pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));

if (timeAttributeProperties != null) {
// using proctime
if (timeAttributeProperties.getProctimeColumn() != null) {
schemaBuilder.columnByExpression(timeAttributeProperties.getProctimeColumn(), "PROCTIME()");
} else { // using watermarking
schemaBuilder.watermark(
timeAttributeProperties.getWatermarkColumn(),
timeAttributeProperties.getWatermarkColumn() +
" - INTERVAL " +
timeAttributeProperties.getWatermarkDelay());
}
if (addProctimeColumn) {
schemaBuilder.columnByExpression("proctime", "PROCTIME()");
}
return schemaBuilder.build();
}
Expand Down Expand Up @@ -620,40 +534,4 @@ public String toString() {
", scanPartitionUpperBound=" + scanPartitionUpperBound;
}
}

static class TimeAttributeProperties {
private String watermarkColumn;
private String watermarkDelay;
private String proctimeColumn;

public String getWatermarkColumn() {
return this.watermarkColumn;
}

public String getWatermarkDelay() {
return this.watermarkDelay;
}

public String getProctimeColumn() {
return this.proctimeColumn;
}

public void setWatermarkColumn(String watermarkColumn) {
this.watermarkColumn = watermarkColumn;
}

public void setWatermarkDelay(String watermarkDelay) {
this.watermarkDelay = watermarkDelay;
}

public void setProctimeColumn(String proctimeColumn) {
this.proctimeColumn = proctimeColumn;
}

public String toString() {
return "watermarkColumn=" + watermarkColumn +
", watermarkDelay=" + watermarkDelay +
", proctimeColumn=" + proctimeColumn;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.getindata.flink.connector.jdbc.catalog.ElasticJdbcCatalogFactoryOptions.ADD_PROCTIME_COLUMN;
import static com.getindata.flink.connector.jdbc.catalog.ElasticJdbcCatalogFactoryOptions.BASE_URL;
import static com.getindata.flink.connector.jdbc.catalog.ElasticJdbcCatalogFactoryOptions.DEFAULT_DATABASE;
import static com.getindata.flink.connector.jdbc.catalog.ElasticJdbcCatalogFactoryOptions.DEFAULT_SCAN_PARTITION_COLUMN_NAME;
Expand Down Expand Up @@ -61,6 +62,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(DEFAULT_SCAN_PARTITION_COLUMN_NAME);
options.add(DEFAULT_SCAN_PARTITION_SIZE);
options.add(PROPERTIES_INDEX_PATTERNS);
options.add(ADD_PROCTIME_COLUMN);
options.add(EXCLUDE);
options.add(INCLUDE);
return options;
Expand All @@ -80,6 +82,7 @@ public Catalog createCatalog(Context context) {
helper.getOptions().get(USERNAME),
helper.getOptions().get(PASSWORD),
helper.getOptions().get(BASE_URL),
Boolean.getBoolean(context.getOptions().get(ADD_PROCTIME_COLUMN)),
IndexFilterResolver.of(helper.getOptions().get(INCLUDE), helper.getOptions().get(EXCLUDE)),
context.getOptions()
);
Expand All @@ -90,16 +93,11 @@ private void validateDynamicOptions(Map<String, String> options) {
for (Map.Entry<String, String> entry : scanOptions.entrySet()) {
String key = entry.getKey();
if (!(key.startsWith("properties.scan.") && key.endsWith("partition.column.name")) &&
!(key.startsWith("properties.scan.") && key.endsWith("partition.number")) &&
!(key.startsWith("properties.watermark.") && key.endsWith("interval")) &&
!(key.startsWith("properties.watermark.") && key.endsWith("unit"))) {
throw new IllegalArgumentException("Parameter " + entry.getKey() + " is not supported. We support" +
" properties.scan.<table_name>.partition.column.name, " +
" properties.scan.<table_name>.partition.number, " +
" properties.timeattribute.<table_name>.watermark.column, " +
" properties.timeattribute.<table_name>.watermark.delay, " +
" properties.timeattribute.<table_name>.proctime.column " +
"dynamic properties only."
!(key.startsWith("properties.scan.") && key.endsWith("partition.number"))) {
throw new IllegalArgumentException("Parameter " + entry.getKey() + " is not supported." +
" We support only the following dynamic properties:\n" +
" properties.scan.<table_name>.partition.column.name\n" +
" properties.scan.<table_name>.partition.number"
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public class ElasticJdbcCatalogFactoryOptions {
.withDescription(
"Index patterns.");

public static final ConfigOption<Boolean> ADD_PROCTIME_COLUMN =
ConfigOptions.key("catalog.add-proctime-column")
.booleanType()
.defaultValue(false)
.withDescription("Indicates if proctime column should be added to all tables.");

public static final ConfigOption<String> EXCLUDE =
ConfigOptions.key("catalog.exclude")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.getindata.flink.connector.jdbc.catalog;

import java.util.Map;

/**
 * Fluent builder for {@link ElasticCatalog} instances.
 *
 * <p>Collects the constructor arguments of {@link ElasticCatalog} one at a time and hands
 * them over in {@link #build()}. The builder itself performs no validation; any checking of
 * the gathered values is done by the {@link ElasticCatalog} constructor.
 *
 * <p>Not thread-safe; intended for single-threaded, one-shot use.
 */
public final class ElasticCatalogBuilder {

    // Class loader the catalog uses to load the JDBC driver explicitly.
    private ClassLoader userClassLoader;
    // Catalog identity.
    private String catalogName;
    private String defaultDatabase;
    // Connection settings.
    private String username;
    private String password;
    private String baseUrl;
    // When true, a computed "proctime" (PROCTIME()) column is added to every table schema.
    private boolean addProctimeColumn;
    // Decides which Elasticsearch indices are exposed as catalog tables.
    private IndexFilterResolver indexFilterResolver;
    // Raw catalog properties (e.g. per-table scan partition settings).
    private Map<String, String> properties;

    private ElasticCatalogBuilder() {
        // Obtain instances via anElasticCatalog().
    }

    /** Creates a fresh, empty builder. */
    public static ElasticCatalogBuilder anElasticCatalog() {
        return new ElasticCatalogBuilder();
    }

    /** Sets the class loader used to load the JDBC driver. */
    public ElasticCatalogBuilder userClassLoader(ClassLoader userClassLoader) {
        this.userClassLoader = userClassLoader;
        return this;
    }

    /** Sets the user name used when connecting to Elasticsearch. */
    public ElasticCatalogBuilder username(String username) {
        this.username = username;
        return this;
    }

    /** Sets the password used when connecting to Elasticsearch. */
    public ElasticCatalogBuilder password(String password) {
        this.password = password;
        return this;
    }

    /** Sets the JDBC base URL of the Elasticsearch cluster. */
    public ElasticCatalogBuilder baseUrl(String baseUrl) {
        this.baseUrl = baseUrl;
        return this;
    }

    /** Enables or disables adding a computed proctime column to every table. */
    public ElasticCatalogBuilder addProctimeColumn(boolean addProctimeColumn) {
        this.addProctimeColumn = addProctimeColumn;
        return this;
    }

    /** Sets the include/exclude resolver used to filter indices. */
    public ElasticCatalogBuilder indexFilterResolver(IndexFilterResolver indexFilterResolver) {
        this.indexFilterResolver = indexFilterResolver;
        return this;
    }

    /** Sets the name under which the catalog is registered. */
    public ElasticCatalogBuilder catalogName(String catalogName) {
        this.catalogName = catalogName;
        return this;
    }

    /** Sets the database the catalog uses by default. */
    public ElasticCatalogBuilder defaultDatabase(String defaultDatabase) {
        this.defaultDatabase = defaultDatabase;
        return this;
    }

    /** Sets the raw catalog properties map. */
    public ElasticCatalogBuilder properties(Map<String, String> properties) {
        this.properties = properties;
        return this;
    }

    /** Builds the catalog from the values collected so far. */
    public ElasticCatalog build() {
        return new ElasticCatalog(userClassLoader, catalogName, defaultDatabase, username, password, baseUrl,
                addProctimeColumn, indexFilterResolver, properties);
    }
}
Loading

0 comments on commit ab0ff36

Please sign in to comment.