Skip to content

Commit

Permalink
add metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanhang1993 committed Oct 28, 2022
1 parent 95d9545 commit 19420de
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.db2.table;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;

import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/** Defines the supported metadata columns for {@link Db2TableSource}. */
public enum Db2ReadableMetaData {

/** Name of the table that contain the row. */
TABLE_NAME(
"table_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
}
}),
/** Name of the schema that contain the row. */
SCHEMA_NAME(
"schema_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(
sourceStruct.getString(AbstractSourceInfo.SCHEMA_NAME_KEY));
}
}),

/** Name of the database that contain the row. */
DATABASE_NAME(
"database_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(
sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
}
}),

/**
* It indicates the time that the change was made in the database. If the record is read from
* snapshot of the table instead of the change stream, the value is always 0.
*/
OP_TS(
"op_ts",
DataTypes.TIMESTAMP_LTZ(3).notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return TimestampData.fromEpochMillis(
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
});

private final String key;

private final DataType dataType;

private final MetadataConverter converter;

Db2ReadableMetaData(String key, DataType dataType, MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}

public String getKey() {
return key;
}

public DataType getDataType() {
return dataType;
}

public MetadataConverter getConverter() {
return converter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
Expand All @@ -30,14 +31,20 @@
import com.ververica.cdc.connectors.db2.Db2Source;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** TableSource for DB2 CDC connector. */
public class Db2TableSource implements ScanTableSource {
public class Db2TableSource implements ScanTableSource, SupportsReadingMetadata {

private final ResolvedSchema physicalSchema;
/** Data type that describes the final output of the source. */
Expand All @@ -54,6 +61,9 @@ public class Db2TableSource implements ScanTableSource {
private final StartupOptions startupOptions;
private final Properties dbzProperties;

/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;

public Db2TableSource(
ResolvedSchema physicalSchema,
int port,
Expand All @@ -78,6 +88,7 @@ public Db2TableSource(
this.dbzProperties = dbzProperties;
this.startupOptions = startupOptions;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
}

@Override
Expand All @@ -94,11 +105,13 @@ public ChangelogMode getChangelogMode() {
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
final TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(producedDataType);
DebeziumDeserializationSchema<RowData> deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setMetadataConverters(metadataConverters)
.setResultTypeInfo(typeInfo)
.setServerTimeZone(serverTimeZone)
.build();
Expand All @@ -117,20 +130,40 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
return SourceFunctionProvider.of(sourceFunction, false);
}

private MetadataConverter[] getMetadataConverters() {
if (metadataKeys.isEmpty()) {
return new MetadataConverter[0];
}

return metadataKeys.stream()
.map(
key ->
Stream.of(Db2ReadableMetaData.values())
.filter(m -> m.getKey().equals(key))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(Db2ReadableMetaData::getConverter)
.toArray(MetadataConverter[]::new);
}

@Override
public DynamicTableSource copy() {
return new Db2TableSource(
physicalSchema,
port,
hostname,
database,
schemaName,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
startupOptions);
Db2TableSource source =
new Db2TableSource(
physicalSchema,
port,
hostname,
database,
schemaName,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
startupOptions);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
}

@Override
Expand All @@ -151,7 +184,8 @@ public boolean equals(Object o) {
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(serverTimeZone, that.serverTimeZone)
&& Objects.equals(dbzProperties, that.dbzProperties);
&& Objects.equals(dbzProperties, that.dbzProperties)
&& Objects.equals(metadataKeys, that.metadataKeys);
}

@Override
Expand All @@ -166,11 +200,26 @@ public int hashCode() {
username,
password,
serverTimeZone,
dbzProperties);
dbzProperties,
metadataKeys);
}

@Override
public String asSummaryString() {
return "DB2-CDC";
}

@Override
public Map<String, DataType> listReadableMetadata() {
return Stream.of(Db2ReadableMetaData.values())
.collect(
Collectors.toMap(
Db2ReadableMetaData::getKey, Db2ReadableMetaData::getDataType));
}

@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
this.metadataKeys = metadataKeys;
this.producedDataType = producedDataType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;

/** Table source factory for DB2 CDC connector. */
public class Db2TableSourceFactory implements DynamicTableSourceFactory {
Expand Down Expand Up @@ -111,8 +110,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
String tableName = config.get(TABLE_NAME);
int port = config.get(PORT);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
ResolvedSchema physicalSchema =
getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
StartupOptions startupOptions = getStartupOptions(config);

return new Db2TableSource(
Expand Down
Loading

0 comments on commit 19420de

Please sign in to comment.