metadataKeys) {
+ this.metadataKeys = metadataKeys;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * List of metadata that can be read with this format.
+ */
+ public enum ReadableMetadata {
+
+ SCHEMA(
+ "schema",
+ DataTypes.STRING().nullable(),
+ false,
+ DataTypes.FIELD("schema", DataTypes.STRING()),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+
+ INGESTION_TIMESTAMP(
+ "ingestion-timestamp",
+ DataTypes.BIGINT().nullable(),
+ true,
+ DataTypes.FIELD("ts_ms", DataTypes.BIGINT()),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getLong(pos);
+ }
+ }),
+
+ SOURCE_TIMESTAMP(
+ "source.timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ true,
+ DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ final StringData timestamp =
+ (StringData) readProperty(row, pos, KEY_SOURCE_TIMESTAMP);
+ if (timestamp == null) {
+ return null;
+ }
+ return TimestampData.fromEpochMillis(Long.parseLong(timestamp.toString()));
+ }
+ }),
+
+ SOURCE_DATABASE(
+ "source.database",
+ DataTypes.STRING().nullable(),
+ true,
+ DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return readProperty(row, pos, KEY_SOURCE_DATABASE);
+ }
+ }),
+
+ SOURCE_SCHEMA(
+ "source.schema",
+ DataTypes.STRING().nullable(),
+ true,
+ DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return readProperty(row, pos, KEY_SOURCE_SCHEMA);
+ }
+ }),
+
+ SOURCE_TABLE(
+ "source.table",
+ DataTypes.STRING().nullable(),
+ true,
+ DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return readProperty(row, pos, KEY_SOURCE_TABLE);
+ }
+ }),
+
+ SOURCE_PROPERTIES(
+ "source.properties",
+ // key and value of the map are nullable to make handling easier in queries
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
+ .nullable(),
+ true,
+ DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getMap(pos);
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final boolean isJsonPayload;
+
+ final DataTypes.Field requiredJsonField;
+
+ final MetadataConverter converter;
+
+ ReadableMetadata(
+ String key,
+ DataType dataType,
+ boolean isJsonPayload,
+ DataTypes.Field requiredJsonField,
+ MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.isJsonPayload = isJsonPayload;
+ this.requiredJsonField = requiredJsonField;
+ this.converter = converter;
+ }
+
+ public String getKey() {
+ return key;
+ }
+ }
+
+ private static final StringData KEY_SOURCE_TIMESTAMP = StringData.fromString("ts_ms");
+
+ private static final StringData KEY_SOURCE_DATABASE = StringData.fromString("db");
+
+ private static final StringData KEY_SOURCE_SCHEMA = StringData.fromString("schema");
+
+ private static final StringData KEY_SOURCE_TABLE = StringData.fromString("table");
+
+ private static Object readProperty(GenericRowData row, int pos, StringData key) {
+ final GenericMapData map = (GenericMapData) row.getMap(pos);
+ if (map == null) {
+ return null;
+ }
+ return map.get(key);
+ }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java
new file mode 100644
index 00000000000..a53d6c99654
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.debezium;
+
+import org.apache.inlong.sort.formats.json.MysqlBinLogData;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.apache.inlong.sort.formats.json.debezium.DebeziumUtils.getMysqlMetadataKey;
+
+/**
+ * Copied from the Apache Flink project with a little change.
+ *
+ * Deserialization schema from Debezium JSON to Flink Table/SQL internal data structure {@link
+ * RowData}. The deserialization schema knows Debezium's schema definition and can extract the
+ * database data and convert into {@link RowData} with {@link RowKind}.
+ *
+ * Deserializes a byte[] message as a JSON object and reads the specified fields.
+ *
+ * Failures during deserialization are forwarded as wrapped IOExceptions.
+ *
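+ * A minimal, hypothetical example of the change event handled by this schema (shown without the
+ * optional "schema" envelope; field values are made up for illustration):
+ *
+ * <pre>{@code
+ * {
+ *   "before": null,
+ *   "after": {"id": 101, "name": "scooter"},
+ *   "source": {"db": "inventory", "table": "products", "ts_ms": "1589355606100"},
+ *   "op": "c",
+ *   "ts_ms": 1589355606100
+ * }
+ * }</pre>
+ *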
+ * @see <a href="https://debezium.io/">Debezium</a>
+ */
+@Internal
+public final class DebeziumJsonDeserializationSchema implements DeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String OP_READ = "r"; // snapshot read
+ private static final String OP_CREATE = "c"; // insert
+ private static final String OP_UPDATE = "u"; // update
+ private static final String OP_DELETE = "d"; // delete
+
+ private static final int BEFORE_POS = 0;
+ private static final int AFTER_POS = 1;
+ private static final int OP_POS = 2;
+
+ private static final String REPLICA_IDENTITY_EXCEPTION =
+ "The \"before\" field of %s message is null, "
+ + "if you are using Debezium Postgres Connector, "
+ + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+ /** The deserializer to deserialize Debezium JSON data. */
+ private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+ /** Flag that indicates that an additional projection is required for metadata. */
+ private final boolean hasMetadata;
+
+ /** Metadata to be extracted for every record. */
+ private final MetadataConverter[] metadataConverters;
+
+ private final List<ReadableMetadata> requestedMetadata;
+
+ /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
+ private final TypeInformation<RowData> producedTypeInfo;
+
+ /**
+ * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
+ * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
+ * information, but we just ignore "schema" and extract data from "payload".
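+ *
+ * <p>For example (illustrative only), with schemas enabled a message looks like
+ * {@code {"schema": {...}, "payload": {...}}} and only the "payload" part is parsed.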
+ */
+ private final boolean schemaInclude;
+
+ /**
+ * Flag indicating whether to emit update before row.
+ */
+ private final boolean updateBeforeInclude;
+
+ /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+ private final boolean ignoreParseErrors;
+
+ private final boolean isMigrateAll;
+
+ /**
+ * Constructor of DebeziumJsonDeserializationSchema.
+ */
+ public DebeziumJsonDeserializationSchema(
+ DataType physicalDataType,
+ List<ReadableMetadata> requestedMetadata,
+ TypeInformation<RowData> producedTypeInfo,
+ boolean schemaInclude,
+ boolean updateBeforeInclude,
+ boolean ignoreParseErrors,
+ TimestampFormat timestampFormat,
+ boolean isMigrateAll) {
+ this.isMigrateAll = isMigrateAll;
+ final RowType jsonRowType =
+ createJsonRowType(physicalDataType, requestedMetadata, schemaInclude, isMigrateAll);
+ this.jsonDeserializer =
+ new JsonRowDataDeserializationSchema(
+ jsonRowType,
+ // the result type is never used, so it's fine to pass in the produced type
+ // info
+ producedTypeInfo,
+ false, // ignoreParseErrors already contains the functionality of
+ // failOnMissingField
+ ignoreParseErrors,
+ timestampFormat);
+ this.hasMetadata = requestedMetadata.size() > 0;
+ this.metadataConverters =
+ createMetadataConverters(jsonRowType, requestedMetadata, schemaInclude);
+ this.requestedMetadata = requestedMetadata;
+ this.producedTypeInfo = producedTypeInfo;
+ this.schemaInclude = schemaInclude;
+ this.updateBeforeInclude = updateBeforeInclude;
+ this.ignoreParseErrors = ignoreParseErrors;
+ }
+
+ @Override
+ public RowData deserialize(byte[] message) {
+ throw new RuntimeException(
+ "Please invoke DeserializationSchema#deserialize(byte[], Collector) instead.");
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+ if (message == null || message.length == 0) {
+ // skip tombstone messages
+ return;
+ }
+ try {
+ GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
+ GenericRowData payload;
+ if (schemaInclude) {
+ payload = (GenericRowData) row.getField(0);
+ } else {
+ payload = row;
+ }
+
+ GenericRowData before;
+ GenericRowData after;
+ if (isMigrateAll) {
+ before = GenericRowData.of(payload.getField(BEFORE_POS));
+ after = GenericRowData.of(payload.getField(AFTER_POS));
+ } else {
+ before = (GenericRowData) payload.getField(BEFORE_POS);
+ after = (GenericRowData) payload.getField(AFTER_POS);
+ }
+
+ String op = payload.getField(OP_POS).toString();
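+ // Map the Debezium operation ("r"/"c"/"u"/"d") to the corresponding RowKind(s) and emit rows.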
+ if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
+ after.setRowKind(RowKind.INSERT);
+ emitRow(row, after, out);
+ } else if (OP_UPDATE.equals(op)) {
+ if (before == null) {
+ throw new IllegalStateException(
+ String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
+ }
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ if (updateBeforeInclude) {
+ emitRow(row, before, out);
+ }
+ emitRow(row, after, out);
+ } else if (OP_DELETE.equals(op)) {
+ if (before == null) {
+ throw new IllegalStateException(
+ String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
+ }
+ before.setRowKind(RowKind.DELETE);
+ emitRow(row, before, out);
+ } else {
+ if (!ignoreParseErrors) {
+ throw new IOException(
+ format(
+ "Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'",
+ op, new String(message)));
+ }
+ }
+ } catch (Throwable t) {
+ // a big try catch to protect the processing.
+ if (!ignoreParseErrors) {
+ throw new IOException(
+ format("Corrupt Debezium JSON message '%s'.", new String(message)), t);
+ }
+ }
+ }
+
+ private void emitRow(GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
+ int physicalArity = physicalRow.getArity();
+ if (isMigrateAll) {
+ physicalArity = 0;
+ }
+ final int metadataArity = metadataConverters.length;
+
+ final GenericRowData producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + 1);
+
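+ // Position 0 of the produced row is reserved for the metadata map; physical fields shift by one.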
+ for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+ producedRow.setField(physicalPos + 1, physicalRow.getField(physicalPos));
+ }
+
+ // Put metadata in the first field of the emitted RowData
+ Map<StringData, StringData> metadataMap = new HashMap<>();
+ metadataMap.put(
+ StringData.fromString(MysqlBinLogData.MYSQL_METADATA_IS_DDL),
+ StringData.fromString("false"));
+
+ for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+ metadataMap.put(
+ StringData.fromString(getMysqlMetadataKey(requestedMetadata.get(metadataPos))),
+ StringData.fromString(metadataConverters[metadataPos].convert(rootRow).toString()));
+ }
+
+ if (isMigrateAll) {
+ metadataMap.put(
+ StringData.fromString(MysqlBinLogData.MYSQL_METADATA_DATA),
+ (StringData) physicalRow.getField(0));
+ }
+
+ producedRow.setField(0, new GenericMapData(metadataMap));
+
+ out.collect(producedRow);
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DebeziumJsonDeserializationSchema that = (DebeziumJsonDeserializationSchema) o;
+ return Objects.equals(jsonDeserializer, that.jsonDeserializer)
+ && hasMetadata == that.hasMetadata
+ && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+ && schemaInclude == that.schemaInclude
+ && ignoreParseErrors == that.ignoreParseErrors;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ jsonDeserializer, hasMetadata, producedTypeInfo, schemaInclude, ignoreParseErrors);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static RowType createJsonRowType(
+ DataType physicalDataType,
+ List<ReadableMetadata> readableMetadata,
+ boolean schemaInclude,
+ boolean isMigrateAll) {
+
+ DataType dataTypeForDataFields = physicalDataType;
+ if (isMigrateAll) {
+ dataTypeForDataFields = DataTypes.STRING();
+ }
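+ // In migrate-all mode the whole before/after image is carried as a single JSON string field.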
+
+ DataType payload = DataTypes.ROW(
+ DataTypes.FIELD("before", dataTypeForDataFields),
+ DataTypes.FIELD("after", dataTypeForDataFields),
+ DataTypes.FIELD("op", DataTypes.STRING()));
+
+ // append fields that are required for reading metadata in the payload
+ final List<DataTypes.Field> payloadMetadataFields =
+ readableMetadata.stream()
+ .filter(m -> m.isJsonPayload)
+ .map(m -> m.requiredJsonField)
+ .distinct()
+ .collect(Collectors.toList());
+ payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);
+
+ DataType root = payload;
+ if (schemaInclude) {
+ // when Debezium Kafka connect enables "value.converter.schemas.enable",
+ // the JSON will contain "schema" information and we need to extract data from
+ // "payload".
+ root = DataTypes.ROW(DataTypes.FIELD("payload", payload));
+ }
+
+ // append fields that are required for reading metadata in the root
+ final List<DataTypes.Field> rootMetadataFields =
+ readableMetadata.stream()
+ .filter(m -> !m.isJsonPayload)
+ .map(m -> m.requiredJsonField)
+ .distinct()
+ .collect(Collectors.toList());
+ root = DataTypeUtils.appendRowFields(root, rootMetadataFields);
+
+ return (RowType) root.getLogicalType();
+ }
+
+ private static MetadataConverter[] createMetadataConverters(
+ RowType jsonRowType, List<ReadableMetadata> requestedMetadata, boolean schemaInclude) {
+ return requestedMetadata.stream()
+ .map(
+ m -> {
+ if (m.isJsonPayload) {
+ return convertInPayload(jsonRowType, m, schemaInclude);
+ } else {
+ return convertInRoot(jsonRowType, m);
+ }
+ })
+ .toArray(MetadataConverter[]::new);
+ }
+
+ private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
+ final int pos = findFieldPos(metadata, jsonRowType);
+ return new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData root, int unused) {
+ return metadata.converter.convert(root, pos);
+ }
+ };
+ }
+
+ private static MetadataConverter convertInPayload(
+ RowType jsonRowType, ReadableMetadata metadata, boolean schemaInclude) {
+ if (schemaInclude) {
+ final int pos = findFieldPos(metadata, (RowType) jsonRowType.getChildren().get(0));
+ return new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData root, int unused) {
+ final GenericRowData payload = (GenericRowData) root.getField(0);
+ return metadata.converter.convert(payload, pos);
+ }
+ };
+ }
+ return convertInRoot(jsonRowType, metadata);
+ }
+
+ private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
+ return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Converter that extracts a metadata field from the row (root or payload) that comes out of the
+ * JSON schema and converts it to the desired data type.
+ */
+ interface MetadataConverter extends Serializable {
+
+ // Method for top-level access.
+ default Object convert(GenericRowData row) {
+ return convert(row, -1);
+ }
+
+ Object convert(GenericRowData row, int pos);
+ }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumUtils.java b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumUtils.java
new file mode 100644
index 00000000000..f95a3bef4ac
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.debezium;
+
+import org.apache.inlong.sort.formats.json.MysqlBinLogData;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
+
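+/**
+ * Utilities for mapping Debezium {@link ReadableMetadata} keys to MySQL binlog metadata keys.
+ */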
+public class DebeziumUtils {
+
+ public static String getMysqlMetadataKey(ReadableMetadata readableMetadata) {
+ switch (readableMetadata) {
+ case SOURCE_DATABASE:
+ return MysqlBinLogData.MYSQL_METADATA_DATABASE;
+ case SOURCE_TABLE:
+ return MysqlBinLogData.MYSQL_METADATA_TABLE;
+ case INGESTION_TIMESTAMP:
+ return MysqlBinLogData.MYSQL_METADATA_EVENT_TIME;
+ default:
+ throw new IllegalArgumentException("Not supported yet");
+ }
+ }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
new file mode 100644
index 00000000000..8223129964b
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.utils;
+
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.Map;
+
+import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
+import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
+import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.ORACLE_TIMESTAMP_TIME_ZONE;
+
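+/**
+ * Utilities for mapping JDBC/Debezium type names to Flink {@link LogicalType}s and for building
+ * {@link RowDataToJsonConverter}s.
+ */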
+public class FormatJsonUtil {
+
+ public static final Map<Integer, LogicalType> SQL_TYPE_2_FLINK_TYPE_MAPPING =
+ ImmutableMap.<Integer, LogicalType>builder()
+ .put(java.sql.Types.CHAR, new CharType())
+ .put(java.sql.Types.VARCHAR, new VarCharType())
+ .put(java.sql.Types.SMALLINT, new SmallIntType())
+ .put(java.sql.Types.INTEGER, new IntType())
+ .put(java.sql.Types.BIGINT, new BigIntType())
+ .put(java.sql.Types.REAL, new FloatType())
+ .put(java.sql.Types.DOUBLE, new DoubleType())
+ .put(java.sql.Types.FLOAT, new FloatType())
+ .put(java.sql.Types.DECIMAL, new DecimalType(DEFAULT_DECIMAL_PRECISION, DEFAULT_DECIMAL_SCALE))
+ .put(java.sql.Types.NUMERIC, new DecimalType(DEFAULT_DECIMAL_PRECISION, DEFAULT_DECIMAL_SCALE))
+ .put(java.sql.Types.BIT, new BooleanType())
+ .put(java.sql.Types.TIME, new TimeType())
+ .put(java.sql.Types.TIME_WITH_TIMEZONE, new TimeType())
+ .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new LocalZonedTimestampType())
+ .put(ORACLE_TIMESTAMP_TIME_ZONE, new LocalZonedTimestampType())
+ .put(java.sql.Types.TIMESTAMP, new TimestampType())
+ .put(java.sql.Types.BINARY, new BinaryType())
+ .put(java.sql.Types.VARBINARY, new VarBinaryType())
+ .put(java.sql.Types.BLOB, new VarBinaryType())
+ .put(java.sql.Types.CLOB, new VarBinaryType())
+ .put(java.sql.Types.DATE, new DateType())
+ .put(java.sql.Types.BOOLEAN, new BooleanType())
+ .put(java.sql.Types.LONGNVARCHAR, new VarCharType())
+ .put(java.sql.Types.LONGVARBINARY, new VarCharType())
+ .put(java.sql.Types.LONGVARCHAR, new VarCharType())
+ .put(java.sql.Types.ARRAY, new VarCharType())
+ .put(java.sql.Types.NCHAR, new CharType())
+ .put(java.sql.Types.NCLOB, new VarBinaryType())
+ .put(java.sql.Types.TINYINT, new TinyIntType())
+ .put(java.sql.Types.OTHER, new VarCharType())
+ .build();
+ public static final Map<Integer, LogicalType> SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING =
+ ImmutableMap.<Integer, LogicalType>builder()
+ .put(java.sql.Types.CHAR, new CharType())
+ .put(java.sql.Types.VARCHAR, new VarCharType())
+ .put(java.sql.Types.SMALLINT, new SmallIntType())
+ .put(java.sql.Types.INTEGER, new IntType())
+ .put(java.sql.Types.BIGINT, new BigIntType())
+ .put(java.sql.Types.REAL, new FloatType())
+ .put(java.sql.Types.DOUBLE, new DoubleType())
+ .put(java.sql.Types.FLOAT, new FloatType())
+ .put(java.sql.Types.DECIMAL, new DecimalType(DEFAULT_DECIMAL_PRECISION, DEFAULT_DECIMAL_SCALE))
+ .put(java.sql.Types.NUMERIC, new DecimalType(DEFAULT_DECIMAL_PRECISION, DEFAULT_DECIMAL_SCALE))
+ .put(java.sql.Types.BIT, new BooleanType())
+ .put(java.sql.Types.TIME, new VarCharType())
+ .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new LocalZonedTimestampType())
+ .put(ORACLE_TIMESTAMP_TIME_ZONE, new LocalZonedTimestampType())
+ .put(java.sql.Types.TIMESTAMP, new LocalZonedTimestampType())
+ .put(java.sql.Types.BINARY, new BinaryType())
+ .put(java.sql.Types.VARBINARY, new VarBinaryType())
+ .put(java.sql.Types.BLOB, new VarBinaryType())
+ .put(java.sql.Types.DATE, new DateType())
+ .put(java.sql.Types.BOOLEAN, new BooleanType())
+ .put(java.sql.Types.LONGNVARCHAR, new VarCharType())
+ .put(java.sql.Types.LONGVARBINARY, new VarCharType())
+ .put(java.sql.Types.LONGVARCHAR, new VarCharType())
+ .put(java.sql.Types.ARRAY, new VarCharType())
+ .put(java.sql.Types.NCHAR, new CharType())
+ .put(java.sql.Types.NCLOB, new VarBinaryType())
+ .put(java.sql.Types.TINYINT, new TinyIntType())
+ .put(java.sql.Types.OTHER, new VarCharType())
+ .build();
+ public static final Map<String, LogicalType> DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING =
+ ImmutableMap.<String, LogicalType>builder()
+ .put("BOOLEAN", new BooleanType())
+ .put("INT8", new TinyIntType())
+ .put("INT16", new SmallIntType())
+ .put("INT32", new IntType())
+ .put("INT64", new BigIntType())
+ .put("FLOAT32", new FloatType())
+ .put("FLOAT64", new DoubleType())
+ .put("STRING", new VarCharType())
+ .put("BYTES", new VarBinaryType())
+ .build();
+
+ public static RowDataToJsonConverter rowDataToJsonConverter(DataType physicalRowDataType) {
+ return rowDataToJsonConverter(TimestampFormat.SQL, null, physicalRowDataType);
+ }
+
+ public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+ String mapNullKeyLiteral,
+ DataType physicalRowDataType) {
+ return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, mapNullKeyLiteral, physicalRowDataType);
+ }
+
+ public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+ MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral, DataType physicalRowDataType) {
+ return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
+ .createConverter(physicalRowDataType.getLogicalType());
+ }
+
+ public static RowDataToJsonConverter rowDataToJsonConverter(LogicalType rowType) {
+ return rowDataToJsonConverter(TimestampFormat.SQL, null, rowType);
+ }
+
+ public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+ String mapNullKeyLiteral,
+ LogicalType rowType) {
+ return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, mapNullKeyLiteral, rowType);
+ }
+
+ public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+ MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral, LogicalType rowType) {
+ return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
+ .createConverter(rowType);
+ }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000000..7ce31e31ed4
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedFormatFactory
diff --git a/inlong-sort/sort-formats/format-row/format-json-v1.18/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactoryTest.java b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactoryTest.java
new file mode 100644
index 00000000000..fe54e39e1ab
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-json-v1.18/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactoryTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+public class CanalJsonEnhancedFormatFactoryTest {
+
+ private static final InternalTypeInfo<RowData> ROW_TYPE_INFO =
+ InternalTypeInfo.of(PHYSICAL_TYPE);
+
+ @Test
+ public void testUserDefinedOptions() {
+ final Map<String, String> tableOptions =
+ getModifiedOptions(opts -> {
+ opts.put("canal-json-inlong.map-null-key.mode", "LITERAL");
+ opts.put("canal-json-inlong.map-null-key.literal", "nullKey");
+ opts.put("canal-json-inlong.ignore-parse-errors", "true");
+ opts.put("canal-json-inlong.timestamp-format.standard", "ISO-8601");
+ opts.put("canal-json-inlong.database.include", "mydb");
+ opts.put("canal-json-inlong.table.include", "mytable");
+ opts.put("canal-json-inlong.map-null-key.mode", "LITERAL");
+ opts.put("canal-json-inlong.map-null-key.literal", "nullKey");
+ opts.put("canal-json-inlong.encode.decimal-as-plain-number", "true");
+ });
+
+ // test Deser
+ CanalJsonEnhancedDeserializationSchema expectedDeser =
+ CanalJsonEnhancedDeserializationSchema.builder(
+ PHYSICAL_DATA_TYPE, Collections.emptyList(), ROW_TYPE_INFO)
+ .setIgnoreParseErrors(true)
+ .setTimestampFormat(TimestampFormat.ISO_8601)
+ .setDatabase("mydb")
+ .setTable("mytable")
+ .build();
+ DeserializationSchema<RowData> actualDeser = createDeserializationSchema(tableOptions);
+ assertEquals(expectedDeser, actualDeser);
+
+ // test Ser
+ CanalJsonEnhancedSerializationSchema expectedSer =
+ new CanalJsonEnhancedSerializationSchema(
+ PHYSICAL_DATA_TYPE,
+ new ArrayList<>(),
+ TimestampFormat.ISO_8601,
+ JsonFormatOptions.MapNullKeyMode.LITERAL,
+ "nullKey",
+ true);
+ SerializationSchema<RowData> actualSer = createSerializationSchema(tableOptions);
+ assertEquals(expectedSer, actualSer);
+ }
+
+ // ------------------------------------------------------------------------
+ // Public Tools
+ // ------------------------------------------------------------------------
+
+ public static DeserializationSchema<RowData> createDeserializationSchema(
+ Map<String, String> options) {
+ DynamicTableSource source = createTableSource(SCHEMA, options);
+
+ assert source instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+ TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) source;
+
+ return scanSourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
+ }
+
+ public static SerializationSchema<RowData> createSerializationSchema(
+ Map<String, String> options) {
+ DynamicTableSink sink = createTableSink(SCHEMA, options);
+
+ assert sink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) sink;
+
+ return sinkMock.valueFormat.createRuntimeEncoder(
+ new SinkRuntimeProviderContext(false), PHYSICAL_DATA_TYPE);
+ }
+
+ /**
+ * Returns the full options modified by the given consumer {@code optionModifier}.
+ *
+ * @param optionModifier Consumer to modify the options
+ */
+ public static Map<String, String> getModifiedOptions(Consumer