Skip to content

Commit

Permalink
[cdc-connector][oceanbase] Add option for obcdc extra configs (#2543)
Browse files Browse the repository at this point in the history
This closes #2543.
  • Loading branch information
whhe authored Dec 5, 2023
1 parent 0eae214 commit a10e07f
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 8 deletions.
7 changes: 7 additions & 0 deletions docs/content/connectors/oceanbase-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,13 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td>String</td>
<td>传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'</td>
</tr>
<tr>
<td>obcdc.properties.*</td>
<td></td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>传递参数到<code>libobcdc</code>,如 'obcdc.properties.sort_trans_participants' = '1'。更多参数信息见 <a href="https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000000221094">obcdc 配置项说明</a></td>
</tr>
</tbody>
</table>
</div>
Expand Down
7 changes: 7 additions & 0 deletions docs/content/connectors/oceanbase-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,13 @@ The OceanBase CDC Connector contains some options for both sql and stream api as
<td>String</td>
<td>Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'.</td>
</tr>
<tr>
<td>obcdc.properties.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Option to pass custom configurations to the <code>libobcdc</code>, eg: 'obcdc.properties.sort_trans_participants' = '1'. Please refer to <a href="https://en.oceanbase.com/docs/common-oceanbase-database-10000000000872541">obcdc parameters</a> for more details.</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -69,6 +71,7 @@ public static class Builder<T> {
private String rsList;
private String configUrl;
private String workingMode;
private Properties obcdcProperties;

private OceanBaseDeserializationSchema<T> deserializer;

Expand Down Expand Up @@ -177,6 +180,11 @@ public Builder<T> workingMode(String workingMode) {
return this;
}

public Builder<T> obcdcProperties(Properties obcdcProperties) {
this.obcdcProperties = obcdcProperties;
return this;
}

public Builder<T> deserializer(OceanBaseDeserializationSchema<T> deserializer) {
this.deserializer = deserializer;
return this;
Expand Down Expand Up @@ -261,6 +269,12 @@ public SourceFunction<T> build() {
obReaderConfig.setStartTimestamp(startupTimestamp);
obReaderConfig.setTimezone(serverTimeZone);

if (obcdcProperties != null && !obcdcProperties.isEmpty()) {
Map<String, String> extraConfigs = new HashMap<>();
obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
obReaderConfig.setExtraConfigs(extraConfigs);
}

return new OceanBaseRichSourceFunction<>(
StartupMode.INITIAL.equals(startupMode),
username,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMet
private final String rsList;
private final String configUrl;
private final String workingMode;
private final Properties obcdcProperties;

// --------------------------------------------------------------------------------------------
// Mutable attributes
Expand Down Expand Up @@ -103,7 +104,8 @@ public OceanBaseTableSource(
Long startupTimestamp,
String rsList,
String configUrl,
String workingMode) {
String workingMode,
Properties obcdcProperties) {
this.physicalSchema = physicalSchema;
this.startupMode = checkNotNull(startupMode);
this.username = checkNotNull(username);
Expand All @@ -126,6 +128,7 @@ public OceanBaseTableSource(
this.rsList = rsList;
this.configUrl = configUrl;
this.workingMode = workingMode;
this.obcdcProperties = obcdcProperties;

this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
Expand Down Expand Up @@ -174,6 +177,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
.rsList(rsList)
.configUrl(configUrl)
.workingMode(workingMode)
.obcdcProperties(obcdcProperties)
.deserializer(deserializer);
return SourceFunctionProvider.of(builder.build(), false);
}
Expand Down Expand Up @@ -233,7 +237,8 @@ public DynamicTableSource copy() {
startupTimestamp,
rsList,
configUrl,
workingMode);
workingMode,
obcdcProperties);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
Expand Down Expand Up @@ -270,6 +275,7 @@ public boolean equals(Object o) {
&& Objects.equals(this.rsList, that.rsList)
&& Objects.equals(this.configUrl, that.configUrl)
&& Objects.equals(this.workingMode, that.workingMode)
&& Objects.equals(this.obcdcProperties, that.obcdcProperties)
&& Objects.equals(this.producedDataType, that.producedDataType)
&& Objects.equals(this.metadataKeys, that.metadataKeys);
}
Expand Down Expand Up @@ -299,6 +305,7 @@ public int hashCode() {
rsList,
configUrl,
workingMode,
obcdcProperties,
producedDataType,
metadataKeys);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@

import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;

/** Factory for creating configured instance of {@link OceanBaseTableSource}. */
Expand Down Expand Up @@ -173,11 +175,13 @@ public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory {
.withDescription(
"The working mode of 'obcdc', can be `storage` (default value, supported from `obcdc` 3.1.3) or `memory`.");

public static final String OBCDC_PROPERTIES_PREFIX = "obcdc.properties.";

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(JdbcUrlUtils.PROPERTIES_PREFIX);
helper.validateExcept(JdbcUrlUtils.PROPERTIES_PREFIX, OBCDC_PROPERTIES_PREFIX);

ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();

Expand Down Expand Up @@ -233,7 +237,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
startupTimestamp,
rsList,
configUrl,
workingMode);
workingMode,
getProperties(context.getCatalogTable().getOptions(), OBCDC_PROPERTIES_PREFIX));
}

@Override
Expand Down Expand Up @@ -295,4 +300,17 @@ private void validate(ReadableConfig config) {
}
}
}

private Properties getProperties(Map<String, String> tableOptions, String prefix) {
Properties properties = new Properties();
tableOptions.keySet().stream()
.filter(key -> key.startsWith(prefix))
.forEach(
key -> {
final String value = tableOptions.get(key);
final String subKey = key.substring(prefix.length());
properties.put(subKey, value);
});
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ public void testAllDataTypes() throws Exception {
+ " 'logproxy.port' = '%s',"
+ " 'rootserver-list' = '%s',"
+ " 'working-mode' = 'memory',"
+ " 'jdbc.properties.useSSL' = 'false'"
+ " 'jdbc.properties.useSSL' = 'false',"
+ " 'obcdc.properties.sort_trans_participants' = '1'"
+ ")",
getUsername(),
getPassword(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void testCommonProperties() {
null,
RS_LIST,
null,
WORKING_MODE);
WORKING_MODE,
new Properties());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -165,7 +166,8 @@ public void testOptionalProperties() {
null,
RS_LIST,
null,
WORKING_MODE);
WORKING_MODE,
new Properties());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -207,7 +209,8 @@ public void testMetadataColumns() {
null,
RS_LIST,
null,
WORKING_MODE);
WORKING_MODE,
new Properties());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "tenant_name", "database_name", "table_name");
Expand Down

0 comments on commit a10e07f

Please sign in to comment.