Skip to content

Commit

Permalink
change module version and modify deprecated method
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanhang1993 committed Oct 19, 2022
1 parent e03c4ab commit 913467a
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 22 deletions.
6 changes: 3 additions & 3 deletions flink-connector-db2-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ under the License.
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.ververica</groupId>
<version>2.1-SNAPSHOT</version>
<version>2.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down Expand Up @@ -67,7 +67,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand Down Expand Up @@ -127,7 +127,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
package com.ververica.cdc.connectors.db2.table;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

Expand All @@ -40,7 +41,10 @@
/** TableSource for DB2 CDC connector. */
public class Db2TableSource implements ScanTableSource {

private final TableSchema physicalSchema;
private final ResolvedSchema physicalSchema;
/** Data type that describes the final output of the source. */
protected DataType producedDataType;

private final int port;
private final String hostname;
private final String database;
Expand All @@ -53,7 +57,7 @@ public class Db2TableSource implements ScanTableSource {
private final Properties dbzProperties;

public Db2TableSource(
TableSchema physicalSchema,
ResolvedSchema physicalSchema,
int port,
String hostname,
String database,
Expand All @@ -75,6 +79,7 @@ public Db2TableSource(
this.serverTimeZone = serverTimeZone;
this.dbzProperties = dbzProperties;
this.startupOptions = startupOptions;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
}

@Override
Expand All @@ -89,12 +94,16 @@ public ChangelogMode getChangelogMode() {

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toRowDataType());
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
final TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(producedDataType);
DebeziumDeserializationSchema<RowData> deserializer =
new RowDataDebeziumDeserializeSchema(
rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setResultTypeInfo(typeInfo)
.setServerTimeZone(serverTimeZone)
.build();
Db2Source.Builder<RowData> builder =
Db2Source.<RowData>builder()
.hostname(hostname)
Expand All @@ -107,7 +116,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.deserializer(deserializer)
.startupOptions(startupOptions);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();

return SourceFunctionProvider.of(sourceFunction, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;

import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;

/** Table source factory for DB2 CDC connector. */
public class Db2TableSourceFactory implements DynamicTableSourceFactory {
Expand Down Expand Up @@ -113,8 +113,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
String tableName = config.get(TABLE_NAME);
int port = config.get(PORT);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
ResolvedSchema physicalSchema =
getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
StartupOptions startupOptions = getStartupOptions(config);

return new Db2TableSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,5 @@
*/
public enum StartupMode {
INITIAL,

LATEST_OFFSET
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@ private StartupOptions(StartupMode startupMode) {

switch (startupMode) {
case INITIAL:

case LATEST_OFFSET:
break;

default:
throw new UnsupportedOperationException(startupMode + " mode is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.ExceptionUtils;

import org.junit.Test;
Expand All @@ -42,6 +41,7 @@
import java.util.Map;
import java.util.Properties;

import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void testCommonProperties() {
DynamicTableSource actualSource = createTableSource(properties);
Db2TableSource expectedSource =
new Db2TableSource(
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
getPhysicalSchema(SCHEMA),
50000,
MY_LOCALHOST,
MY_DATABASE,
Expand All @@ -103,7 +103,7 @@ public void testOptionalProperties() {
dbzProperties.put("snapshot.mode", "schema_only");
Db2TableSource expectedSource =
new Db2TableSource(
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
getPhysicalSchema(SCHEMA),
50000,
MY_LOCALHOST,
MY_DATABASE,
Expand Down

0 comments on commit 913467a

Please sign in to comment.