From a10e07fd36688d159a42ed50eb5abe0f5cd6616d Mon Sep 17 00:00:00 2001 From: He Wang Date: Tue, 5 Dec 2023 17:26:16 +0800 Subject: [PATCH] [cdc-connector][oceanbase] Add option for obcdc extra configs (#2543) This closes #2543. --- docs/content/connectors/oceanbase-cdc(ZH).md | 7 ++++++ docs/content/connectors/oceanbase-cdc.md | 7 ++++++ .../connectors/oceanbase/OceanBaseSource.java | 14 ++++++++++++ .../oceanbase/table/OceanBaseTableSource.java | 11 ++++++++-- .../table/OceanBaseTableSourceFactory.java | 22 +++++++++++++++++-- .../table/OceanBaseConnectorITCase.java | 3 ++- .../table/OceanBaseTableFactoryTest.java | 9 +++++--- 7 files changed, 65 insertions(+), 8 deletions(-) diff --git a/docs/content/connectors/oceanbase-cdc(ZH).md b/docs/content/connectors/oceanbase-cdc(ZH).md index cd1565334e6..9940d5f0a2a 100644 --- a/docs/content/connectors/oceanbase-cdc(ZH).md +++ b/docs/content/connectors/oceanbase-cdc(ZH).md @@ -311,6 +311,13 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表 String 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'。 + + obcdc.properties.* + 否 + 无 + String + 传递参数到libobcdc,如 'obcdc.properties.sort_trans_participants' = '1'。更多参数信息见 obcdc 配置项说明。 + diff --git a/docs/content/connectors/oceanbase-cdc.md b/docs/content/connectors/oceanbase-cdc.md index da44a4656ba..57ae961239f 100644 --- a/docs/content/connectors/oceanbase-cdc.md +++ b/docs/content/connectors/oceanbase-cdc.md @@ -314,6 +314,13 @@ The OceanBase CDC Connector contains some options for both sql and stream api as String Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'. + + obcdc.properties.* + optional + (none) + String + Option to pass custom configurations to the libobcdc, eg: 'obcdc.properties.sort_trans_participants' = '1'. Please refer to obcdc parameters for more details. + diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java index f741092d58f..195d5f31673 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java @@ -28,6 +28,8 @@ import org.apache.commons.lang3.StringUtils; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -69,6 +71,7 @@ public static class Builder { private String rsList; private String configUrl; private String workingMode; + private Properties obcdcProperties; private OceanBaseDeserializationSchema deserializer; @@ -177,6 +180,11 @@ public Builder workingMode(String workingMode) { return this; } + public Builder obcdcProperties(Properties obcdcProperties) { + this.obcdcProperties = obcdcProperties; + return this; + } + public Builder deserializer(OceanBaseDeserializationSchema deserializer) { this.deserializer = deserializer; return this; @@ -261,6 +269,12 @@ public SourceFunction build() { obReaderConfig.setStartTimestamp(startupTimestamp); obReaderConfig.setTimezone(serverTimeZone); + if (obcdcProperties != null && !obcdcProperties.isEmpty()) { + Map extraConfigs = new HashMap<>(); + obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString())); + obReaderConfig.setExtraConfigs(extraConfigs); + } + return new OceanBaseRichSourceFunction<>( StartupMode.INITIAL.equals(startupMode), username, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java index 752b8fe29fa..8e521b7a5c2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java @@ -70,6 +70,7 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMet private final String rsList; private final String configUrl; private final String workingMode; + private final Properties obcdcProperties; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -103,7 +104,8 @@ public OceanBaseTableSource( Long startupTimestamp, String rsList, String configUrl, - String workingMode) { + String workingMode, + Properties obcdcProperties) { this.physicalSchema = physicalSchema; this.startupMode = checkNotNull(startupMode); this.username = checkNotNull(username); @@ -126,6 +128,7 @@ public OceanBaseTableSource( this.rsList = rsList; this.configUrl = configUrl; this.workingMode = workingMode; + this.obcdcProperties = obcdcProperties; this.producedDataType = physicalSchema.toPhysicalRowDataType(); this.metadataKeys = Collections.emptyList(); @@ -174,6 +177,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { .rsList(rsList) .configUrl(configUrl) .workingMode(workingMode) + .obcdcProperties(obcdcProperties) .deserializer(deserializer); return SourceFunctionProvider.of(builder.build(), false); } @@ -233,7 +237,8 @@ public DynamicTableSource copy() { startupTimestamp, rsList, configUrl, - workingMode); + workingMode, + obcdcProperties); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -270,6 +275,7 @@ public boolean equals(Object o) { && Objects.equals(this.rsList, that.rsList) && Objects.equals(this.configUrl, that.configUrl) && Objects.equals(this.workingMode, that.workingMode) + && Objects.equals(this.obcdcProperties, that.obcdcProperties) && Objects.equals(this.producedDataType, that.producedDataType) && Objects.equals(this.metadataKeys, that.metadataKeys); } @@ -299,6 +305,7 @@ public int hashCode() { rsList, configUrl, workingMode, + obcdcProperties, producedDataType, metadataKeys); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java index 07e0d0bc805..5ffa834ca0f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java @@ -30,7 +30,9 @@ import java.time.Duration; import java.util.HashSet; +import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.Set; /** Factory for creating configured instance of {@link OceanBaseTableSource}. */ @@ -173,11 +175,13 @@ public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory { .withDescription( "The working mode of 'obcdc', can be `storage` (default value, supported from `obcdc` 3.1.3) or `memory`."); + public static final String OBCDC_PROPERTIES_PREFIX = "obcdc.properties."; + @Override public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - helper.validateExcept(JdbcUrlUtils.PROPERTIES_PREFIX); + helper.validateExcept(JdbcUrlUtils.PROPERTIES_PREFIX, OBCDC_PROPERTIES_PREFIX); ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); @@ -233,7 +237,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { startupTimestamp, rsList, configUrl, - workingMode); + workingMode, + getProperties(context.getCatalogTable().getOptions(), OBCDC_PROPERTIES_PREFIX)); } @Override @@ -295,4 +300,17 @@ private void validate(ReadableConfig config) { } } } + + private Properties getProperties(Map tableOptions, String prefix) { + Properties properties = new Properties(); + tableOptions.keySet().stream() + .filter(key -> key.startsWith(prefix)) + .forEach( + key -> { + final String value = tableOptions.get(key); + final String subKey = key.substring(prefix.length()); + properties.put(subKey, value); + }); + return properties; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java index 80fbbddf3c3..bd95e6f64fb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java @@ -374,7 +374,8 @@ public void testAllDataTypes() throws Exception { + " 'logproxy.port' = '%s'," + " 'rootserver-list' = '%s'," + " 'working-mode' = 'memory'," - + " 'jdbc.properties.useSSL' = 'false'" + + " 'jdbc.properties.useSSL' = 'false'," + + " 'obcdc.properties.sort_trans_participants' = '1'" + ")", getUsername(), getPassword(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java index f1340b2bdc5..d66ccdad28c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java @@ -123,7 +123,8 @@ public void testCommonProperties() { null, RS_LIST, null, - WORKING_MODE); + WORKING_MODE, + new Properties()); assertEquals(expectedSource, actualSource); } @@ -165,7 +166,8 @@ public void testOptionalProperties() { null, RS_LIST, null, - WORKING_MODE); + WORKING_MODE, + new Properties()); assertEquals(expectedSource, actualSource); } @@ -207,7 +209,8 @@ public void testMetadataColumns() { null, RS_LIST, null, - WORKING_MODE); + WORKING_MODE, + new Properties()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "tenant_name", "database_name", "table_name");