diff --git a/descriptors/mongodb/debezium-mongodb-2.0.1.Final.json b/descriptors/mongodb/debezium-mongodb-2.0.1.Final.json
index 6617d6e..09de35d 100644
--- a/descriptors/mongodb/debezium-mongodb-2.0.1.Final.json
+++ b/descriptors/mongodb/debezium-mongodb-2.0.1.Final.json
@@ -162,6 +162,360 @@
"$ref" : "#/$defs/serializer"
}
}
+ },
+ "processors" : {
+ "type" : "array",
+ "items" : {
+ "oneOf" : [ {
+ "title" : "Cast Kafka message key or value",
+ "description" : "Cast an entire Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_message" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_message" : {
+ "type" : "object",
+ "required" : [ "apply_to", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "cast_to" : {
+ "title" : "Target data type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_fields" : {
+ "type" : "object",
+ "required" : [ "apply_to", "fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "A list of the field names and their target data type of the fields that you want to cast.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "field_name", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field in the payload whose value you want to cast to another data type.",
+ "type" : "string"
+ },
+ "cast_to" : {
+ "title" : "Target type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "insert_static_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_static_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field_name", "value" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Insert Kafka metadata field",
+ "description" : "Insert a field into the payload. For the field's value, select a Kafka record metadata type: topic name, partition number, or message timestamp.",
+ "type" : "object",
+ "required" : [ "insert_kafka_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_kafka_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "kafka_value", "field_name" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "kafka_value" : {
+ "title" : "Kafka record metadata type",
+ "description" : "Select the Kafka record metadata (topic name, partition number, or message timestamp) to use for the value of the inserted field.",
+ "type" : "string",
+ "enum" : [ "topic name", "partition number", "message timestamp" ]
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Filter or rename fields",
+ "description" : "Filter out (excludes) or pass (includes) only the specified fields in the payload, and/or rename field names.",
+ "type" : "object",
+ "required" : [ "filter_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "filter_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to" ],
+ "anyOf" : [ {
+ "required" : [ "includes" ]
+ }, {
+ "required" : [ "excludes" ]
+ }, {
+ "required" : [ "rename_fields" ]
+ } ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "includes" : {
+ "title" : "Included fields",
+ "description" : "The list of field names of fields to include in the payload. Fields that are not listed are removed from the payload. Note that the \"Excluded fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "excludes" : {
+ "title" : "Excluded fields",
+ "description" : "The list of field names of fields to remove from the payload. The \"Exclude fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "rename_fields" : {
+ "title" : "Rename fields",
+ "description" : "Rename fields in the payload.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "old_name", "new_name" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "old_name" : {
+ "title" : "Old field name",
+ "description" : "The original label of the field that you want to rename in the payload.",
+ "type" : "string"
+ },
+ "new_name" : {
+ "title" : "New field name",
+ "description" : "The new name of the field.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Mask fields",
+ "description" : "Mask fields with a valid null value for the field type (for example: 0, false, empty string, etc) or with a replacement value (only for numeric and string field types).",
+ "type" : "object",
+ "required" : [ "mask_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "mask_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "fields" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "The fields in the payload whose original values you want to replace with replacement
. If you do not specify a replacement value, the original value is replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "replacement" : {
+ "title" : "Replacement",
+ "description" : "When set, the values of the specified fields are replaced with this value which will be converted to the correct type. If no replacement value is specified, values are replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Extract field",
+ "description" : "Extract a field from the message's key or value payload and replace the whole key or value with the value of the extracted field.",
+ "type" : "object",
+ "required" : [ "extract_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "extract_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field" : {
+ "title" : "Field to extract",
+ "description" : "The name of the field to extract from the payload and whose value replaces the entire key or value.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Message key from fields",
+ "description" : "Replace the message key with one or more fields from the message's value payload.",
+ "type" : "object",
+ "required" : [ "key_from_value_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "key_from_value_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "fields" ],
+ "properties" : {
+ "fields" : {
+ "title" : "Fields for new key",
+ "description" : "Name(s) of the field(s) in the payload to be used to replace the original message key.",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Simple Debezium Topic Router (not enforcing message key uniqueness)",
+ "description" : "Simple Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router does not enforce message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_simple" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_simple" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression that evaluates the original destination topic name of each message that the connector emits to determine whether to route it to an alternate destination topic. The expression captures one or more groups of characters to apply in output_topic
.",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Unique Key Debezium Topic Router (enforcing message key uniqueness)",
+ "description" : "Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router enforces message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_unique_keys" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_unique_keys" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic", "key_field_name" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression on the original destination topic name of each message that determines if it should be routed to a another destination topic and captures one or more groups of characters to be applied in output_topic
",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ },
+ "key_field_name" : {
+ "title" : "Key field name",
+ "description" : "Specifies the name of the field to add to the message key. The field value identifies the name of the source table, based on the original destination topic name. Optionally, you can manipulate the key field by setting the key_field_topic_regex
and key_field_value
.",
+ "type" : "string",
+ "default" : "__dbz__physicalTableIdentifier"
+ },
+ "key_field_topic_regex" : {
+ "title" : "Key field topic regex",
+ "description" : "A regular expression that is applied to the original destination topic name to capture one or more groups of characters and then apply them in key_field_value
",
+ "type" : "string",
+ "format" : "regex",
+ "default" : "(.*)"
+ },
+ "key_field_value" : {
+ "title" : "Key field value",
+ "description" : "A regular expression replacement string that sets the value of the inserted key field based on the captured groups from key_field_topic_regex
.",
+ "type" : "string",
+ "default" : "$1"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Convert to upsert record (extract new record state)",
+ "description" : "Convert the complex Debezium data format into a flat upsert record. This transformation will remove the Debezium \"envelope\" with its metadata from the message and will only keep the updated dataset/row.",
+ "type" : "object",
+ "required" : [ "convert_to_upsert" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "convert_to_upsert" : {
+ "additionalProperties" : false,
+ "required" : [ "delete_handling_mode" ],
+ "properties" : {
+ "delete_handling_mode" : {
+ "title" : "Handle DELETE events",
+ "description" : "When set to \"tombstone\" for each DELETE operation a tombstone record will be send to Kafka. When set to \"drop\" DELETE event records will be removed from the stream and not send to Kafka.",
+ "type" : "string",
+ "default" : "tombstone",
+ "enum" : [ "tombstone", "drop" ]
+ },
+ "add_metadata_to_value" : {
+ "title" : "Add Debezium metadata to the Kafka message value",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message value as additional fields of the payload.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_value_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ },
+ "add_metadata_to_headers" : {
+ "title" : "Add Debezium metadata to Kafka message headers",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message headers as additional header fields.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_headers_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ }
+ }
+ }
+ }
+ } ]
+ }
}
},
"additionalProperties" : true,
@@ -173,6 +527,55 @@
"type" : "string",
"enum" : [ "JSON", "JSON without schema" ],
"default" : "JSON"
+ },
+ "insert_field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field to insert. Append the suffix !
for a required field, or ?
for an optional field.",
+ "type" : "string"
+ },
+ "apply_to" : {
+ "title" : "Apply to",
+ "description" : "Apply transformation to Kafka message key or message value.",
+ "type" : "string",
+ "enum" : [ "message key", "message value" ]
+ },
+ "kafka_connect_types" : {
+ "type" : "string",
+ "enum" : [ "string", "int8", "int16", "int32", "int64", "float32", "float64", "boolean" ]
+ },
+ "include_exclude_list" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "debezium_topic_router" : {
+ "type" : "object",
+ "additionalProperties" : false,
+ "required" : [ "topic", "replacement" ],
+ "properties" : {
+ "topic" : {
+ "type" : "string",
+ "format" : "regex"
+ },
+ "replacement" : {
+ "type" : "string",
+ "format" : "regex"
+ }
+ }
+ },
+ "debezium_metadata" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string",
+ "enum" : [ "op", "ts_ms", "source.ts_ms", "source.version", "source.connector", "source.db", "source.snapshot" ]
+ }
+ },
+ "debezium_metadata_prefix" : {
+ "title" : "Metadata field name prefix",
+ "description" : "Optional prefix for the added Debezium metadata fields.",
+ "type" : "string",
+ "default" : "__"
}
}
}
diff --git a/descriptors/mysql/debezium-mysql-2.0.1.Final.json b/descriptors/mysql/debezium-mysql-2.0.1.Final.json
index b931351..99429bd 100644
--- a/descriptors/mysql/debezium-mysql-2.0.1.Final.json
+++ b/descriptors/mysql/debezium-mysql-2.0.1.Final.json
@@ -192,6 +192,360 @@
"$ref" : "#/$defs/serializer"
}
}
+ },
+ "processors" : {
+ "type" : "array",
+ "items" : {
+ "oneOf" : [ {
+ "title" : "Cast Kafka message key or value",
+ "description" : "Cast an entire Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_message" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_message" : {
+ "type" : "object",
+ "required" : [ "apply_to", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "cast_to" : {
+ "title" : "Target data type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_fields" : {
+ "type" : "object",
+ "required" : [ "apply_to", "fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "A list of the field names and their target data type of the fields that you want to cast.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "field_name", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field in the payload whose value you want to cast to another data type.",
+ "type" : "string"
+ },
+ "cast_to" : {
+ "title" : "Target type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "insert_static_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_static_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field_name", "value" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Insert Kafka metadata field",
+ "description" : "Insert a field into the payload. For the field's value, select a Kafka record metadata type: topic name, partition number, or message timestamp.",
+ "type" : "object",
+ "required" : [ "insert_kafka_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_kafka_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "kafka_value", "field_name" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "kafka_value" : {
+ "title" : "Kafka record metadata type",
+ "description" : "Select the Kafka record metadata (topic name, partition number, or message timestamp) to use for the value of the inserted field.",
+ "type" : "string",
+ "enum" : [ "topic name", "partition number", "message timestamp" ]
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Filter or rename fields",
+ "description" : "Filter out (excludes) or pass (includes) only the specified fields in the payload, and/or rename field names.",
+ "type" : "object",
+ "required" : [ "filter_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "filter_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to" ],
+ "anyOf" : [ {
+ "required" : [ "includes" ]
+ }, {
+ "required" : [ "excludes" ]
+ }, {
+ "required" : [ "rename_fields" ]
+ } ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "includes" : {
+ "title" : "Included fields",
+ "description" : "The list of field names of fields to include in the payload. Fields that are not listed are removed from the payload. Note that the \"Excluded fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "excludes" : {
+ "title" : "Excluded fields",
+ "description" : "The list of field names of fields to remove from the payload. The \"Exclude fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "rename_fields" : {
+ "title" : "Rename fields",
+ "description" : "Rename fields in the payload.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "old_name", "new_name" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "old_name" : {
+ "title" : "Old field name",
+ "description" : "The original label of the field that you want to rename in the payload.",
+ "type" : "string"
+ },
+ "new_name" : {
+ "title" : "New field name",
+ "description" : "The new name of the field.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Mask fields",
+ "description" : "Mask fields with a valid null value for the field type (for example: 0, false, empty string, etc) or with a replacement value (only for numeric and string field types).",
+ "type" : "object",
+ "required" : [ "mask_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "mask_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "fields" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "The fields in the payload whose original values you want to replace with replacement
. If you do not specify a replacement value, the original value is replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "replacement" : {
+ "title" : "Replacement",
+ "description" : "When set, the values of the specified fields are replaced with this value which will be converted to the correct type. If no replacement value is specified, values are replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Extract field",
+ "description" : "Extract a field from the message's key or value payload and replace the whole key or value with the value of the extracted field.",
+ "type" : "object",
+ "required" : [ "extract_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "extract_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field" : {
+ "title" : "Field to extract",
+ "description" : "The name of the field to extract from the payload and whose value replaces the entire key or value.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Message key from fields",
+ "description" : "Replace the message key with one or more fields from the message's value payload.",
+ "type" : "object",
+ "required" : [ "key_from_value_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "key_from_value_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "fields" ],
+ "properties" : {
+ "fields" : {
+ "title" : "Fields for new key",
+ "description" : "Name(s) of the field(s) in the payload to be used to replace the original message key.",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Simple Debezium Topic Router (not enforcing message key uniqueness)",
+ "description" : "Simple Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router does not enforce message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_simple" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_simple" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression that evaluates the original destination topic name of each message that the connector emits to determine whether to route it to an alternate destination topic. The expression captures one or more groups of characters to apply in output_topic
.",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Unique Key Debezium Topic Router (enforcing message key uniqueness)",
+ "description" : "Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router enforces message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_unique_keys" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_unique_keys" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic", "key_field_name" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression on the original destination topic name of each message that determines if it should be routed to a another destination topic and captures one or more groups of characters to be applied in output_topic
",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ },
+ "key_field_name" : {
+ "title" : "Key field name",
+ "description" : "Specifies the name of the field to add to the message key. The field value identifies the name of the source table, based on the original destination topic name. Optionally, you can manipulate the key field by setting the key_field_topic_regex
and key_field_value
.",
+ "type" : "string",
+ "default" : "__dbz__physicalTableIdentifier"
+ },
+ "key_field_topic_regex" : {
+ "title" : "Key field topic regex",
+ "description" : "A regular expression that is applied to the original destination topic name to capture one or more groups of characters and then apply them in key_field_value
",
+ "type" : "string",
+ "format" : "regex",
+ "default" : "(.*)"
+ },
+ "key_field_value" : {
+ "title" : "Key field value",
+ "description" : "A regular expression replacement string that sets the value of the inserted key field based on the captured groups from key_field_topic_regex
.",
+ "type" : "string",
+ "default" : "$1"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Convert to upsert record (extract new record state)",
+ "description" : "Convert the complex Debezium data format into a flat upsert record. This transformation will remove the Debezium \"envelope\" with its metadata from the message and will only keep the updated dataset/row.",
+ "type" : "object",
+ "required" : [ "convert_to_upsert" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "convert_to_upsert" : {
+ "additionalProperties" : false,
+ "required" : [ "delete_handling_mode" ],
+ "properties" : {
+ "delete_handling_mode" : {
+ "title" : "Handle DELETE events",
+ "description" : "When set to \"tombstone\" for each DELETE operation a tombstone record will be send to Kafka. When set to \"drop\" DELETE event records will be removed from the stream and not send to Kafka.",
+ "type" : "string",
+ "default" : "tombstone",
+ "enum" : [ "tombstone", "drop" ]
+ },
+ "add_metadata_to_value" : {
+ "title" : "Add Debezium metadata to the Kafka message value",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message value as additional fields of the payload.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_value_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ },
+ "add_metadata_to_headers" : {
+ "title" : "Add Debezium metadata to Kafka message headers",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message headers as additional header fields.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_headers_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ }
+ }
+ }
+ }
+ } ]
+ }
}
},
"additionalProperties" : true,
@@ -203,6 +557,55 @@
"type" : "string",
"enum" : [ "JSON", "JSON without schema" ],
"default" : "JSON"
+ },
+ "insert_field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field to insert. Append the suffix !
for a required field, or ?
for an optional field.",
+ "type" : "string"
+ },
+ "apply_to" : {
+ "title" : "Apply to",
+ "description" : "Apply transformation to Kafka message key or message value.",
+ "type" : "string",
+ "enum" : [ "message key", "message value" ]
+ },
+ "kafka_connect_types" : {
+ "type" : "string",
+ "enum" : [ "string", "int8", "int16", "int32", "int64", "float32", "float64", "boolean" ]
+ },
+ "include_exclude_list" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "debezium_topic_router" : {
+ "type" : "object",
+ "additionalProperties" : false,
+ "required" : [ "topic", "replacement" ],
+ "properties" : {
+ "topic" : {
+ "type" : "string",
+ "format" : "regex"
+ },
+ "replacement" : {
+ "type" : "string",
+ "format" : "regex"
+ }
+ }
+ },
+ "debezium_metadata" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string",
+ "enum" : [ "op", "ts_ms", "source.ts_ms", "source.version", "source.connector", "source.db", "source.snapshot" ]
+ }
+ },
+ "debezium_metadata_prefix" : {
+ "title" : "Metadata field name prefix",
+ "description" : "Optional prefix for the added Debezium metadata fields.",
+ "type" : "string",
+ "default" : "__"
}
}
}
diff --git a/descriptors/postgres/debezium-postgres-2.0.1.Final.json b/descriptors/postgres/debezium-postgres-2.0.1.Final.json
index 38753a0..53302e4 100644
--- a/descriptors/postgres/debezium-postgres-2.0.1.Final.json
+++ b/descriptors/postgres/debezium-postgres-2.0.1.Final.json
@@ -216,6 +216,360 @@
"$ref" : "#/$defs/serializer"
}
}
+ },
+ "processors" : {
+ "type" : "array",
+ "items" : {
+ "oneOf" : [ {
+ "title" : "Cast Kafka message key or value",
+ "description" : "Cast an entire Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_message" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_message" : {
+ "type" : "object",
+ "required" : [ "apply_to", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "cast_to" : {
+ "title" : "Target data type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_fields" : {
+ "type" : "object",
+ "required" : [ "apply_to", "fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "A list of the field names and their target data type of the fields that you want to cast.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "field_name", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field in the payload whose value you want to cast to another data type.",
+ "type" : "string"
+ },
+ "cast_to" : {
+ "title" : "Target type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "insert_static_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_static_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field_name", "value" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Insert Kafka metadata field",
+ "description" : "Insert a field into the payload. For the field's value, select a Kafka record metadata type: topic name, partition number, or message timestamp.",
+ "type" : "object",
+ "required" : [ "insert_kafka_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_kafka_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "kafka_value", "field_name" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "kafka_value" : {
+ "title" : "Kafka record metadata type",
+ "description" : "Select the Kafka record metadata (topic name, partition number, or message timestamp) to use for the value of the inserted field.",
+ "type" : "string",
+ "enum" : [ "topic name", "partition number", "message timestamp" ]
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Filter or rename fields",
+ "description" : "Filter out (excludes) or pass (includes) only the specified fields in the payload, and/or rename field names.",
+ "type" : "object",
+ "required" : [ "filter_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "filter_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to" ],
+ "anyOf" : [ {
+ "required" : [ "includes" ]
+ }, {
+ "required" : [ "excludes" ]
+ }, {
+ "required" : [ "rename_fields" ]
+ } ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "includes" : {
+ "title" : "Included fields",
+ "description" : "The list of field names of fields to include in the payload. Fields that are not listed are removed from the payload. Note that the \"Excluded fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "excludes" : {
+ "title" : "Excluded fields",
+ "description" : "The list of field names of fields to remove from the payload. The \"Exclude fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "rename_fields" : {
+ "title" : "Rename fields",
+ "description" : "Rename fields in the payload.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "old_name", "new_name" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "old_name" : {
+ "title" : "Old field name",
+ "description" : "The original label of the field that you want to rename in the payload.",
+ "type" : "string"
+ },
+ "new_name" : {
+ "title" : "New field name",
+ "description" : "The new name of the field.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Mask fields",
+ "description" : "Mask fields with a valid null value for the field type (for example: 0, false, empty string, etc) or with a replacement value (only for numeric and string field types).",
+ "type" : "object",
+ "required" : [ "mask_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "mask_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "fields" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "The fields in the payload whose original values you want to replace with replacement
. If you do not specify a replacement value, the original value is replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "replacement" : {
+ "title" : "Replacement",
+ "description" : "When set, the values of the specified fields are replaced with this value which will be converted to the correct type. If no replacement value is specified, values are replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Extract field",
+ "description" : "Extract a field from the message's key or value payload and replace the whole key or value with the value of the extracted field.",
+ "type" : "object",
+ "required" : [ "extract_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "extract_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field" : {
+ "title" : "Field to extract",
+ "description" : "The name of the field to extract from the payload and whose value replaces the entire key or value.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Message key from fields",
+ "description" : "Replace the message key with one or more fields from the message's value payload.",
+ "type" : "object",
+ "required" : [ "key_from_value_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "key_from_value_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "fields" ],
+ "properties" : {
+ "fields" : {
+ "title" : "Fields for new key",
+ "description" : "Name(s) of the field(s) in the payload to be used to replace the original message key.",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Simple Debezium Topic Router (not enforcing message key uniqueness)",
+ "description" : "Simple Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router does not enforce message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_simple" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_simple" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression that evaluates the original destination topic name of each message that the connector emits to determine whether to route it to an alternate destination topic. The expression captures one or more groups of characters to apply in output_topic
.",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Unique Key Debezium Topic Router (enforcing message key uniqueness)",
+ "description" : "Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router enforces message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_unique_keys" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_unique_keys" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic", "key_field_name" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression on the original destination topic name of each message that determines if it should be routed to a another destination topic and captures one or more groups of characters to be applied in output_topic
",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ },
+ "key_field_name" : {
+ "title" : "Key field name",
+ "description" : "Specifies the name of the field to add to the message key. The field value identifies the name of the source table, based on the original destination topic name. Optionally, you can manipulate the key field by setting the key_field_topic_regex
and key_field_value
.",
+ "type" : "string",
+ "default" : "__dbz__physicalTableIdentifier"
+ },
+ "key_field_topic_regex" : {
+ "title" : "Key field topic regex",
+ "description" : "A regular expression that is applied to the original destination topic name to capture one or more groups of characters and then apply them in key_field_value
",
+ "type" : "string",
+ "format" : "regex",
+ "default" : "(.*)"
+ },
+ "key_field_value" : {
+ "title" : "Key field value",
+ "description" : "A regular expression replacement string that sets the value of the inserted key field based on the captured groups from key_field_topic_regex
.",
+ "type" : "string",
+ "default" : "$1"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Convert to upsert record (extract new record state)",
+ "description" : "Convert the complex Debezium data format into a flat upsert record. This transformation will remove the Debezium \"envelope\" with its metadata from the message and will only keep the updated dataset/row.",
+ "type" : "object",
+ "required" : [ "convert_to_upsert" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "convert_to_upsert" : {
+ "additionalProperties" : false,
+ "required" : [ "delete_handling_mode" ],
+ "properties" : {
+ "delete_handling_mode" : {
+ "title" : "Handle DELETE events",
+ "description" : "When set to \"tombstone\" for each DELETE operation a tombstone record will be send to Kafka. When set to \"drop\" DELETE event records will be removed from the stream and not send to Kafka.",
+ "type" : "string",
+ "default" : "tombstone",
+ "enum" : [ "tombstone", "drop" ]
+ },
+ "add_metadata_to_value" : {
+ "title" : "Add Debezium metadata to the Kafka message value",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message value as additional fields of the payload.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_value_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ },
+ "add_metadata_to_headers" : {
+ "title" : "Add Debezium metadata to Kafka message headers",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message headers as additional header fields.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_headers_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ }
+ }
+ }
+ }
+ } ]
+ }
}
},
"additionalProperties" : true,
@@ -227,6 +581,55 @@
"type" : "string",
"enum" : [ "JSON", "JSON without schema" ],
"default" : "JSON"
+ },
+ "insert_field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field to insert. Append the suffix !
for a required field, or ?
for an optional field.",
+ "type" : "string"
+ },
+ "apply_to" : {
+ "title" : "Apply to",
+ "description" : "Apply transformation to Kafka message key or message value.",
+ "type" : "string",
+ "enum" : [ "message key", "message value" ]
+ },
+ "kafka_connect_types" : {
+ "type" : "string",
+ "enum" : [ "string", "int8", "int16", "int32", "int64", "float32", "float64", "boolean" ]
+ },
+ "include_exclude_list" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "debezium_topic_router" : {
+ "type" : "object",
+ "additionalProperties" : false,
+ "required" : [ "topic", "replacement" ],
+ "properties" : {
+ "topic" : {
+ "type" : "string",
+ "format" : "regex"
+ },
+ "replacement" : {
+ "type" : "string",
+ "format" : "regex"
+ }
+ }
+ },
+ "debezium_metadata" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string",
+ "enum" : [ "op", "ts_ms", "source.ts_ms", "source.version", "source.connector", "source.db", "source.snapshot" ]
+ }
+ },
+ "debezium_metadata_prefix" : {
+ "title" : "Metadata field name prefix",
+ "description" : "Optional prefix for the added Debezium metadata fields.",
+ "type" : "string",
+ "default" : "__"
}
}
}
diff --git a/descriptors/sqlserver/debezium-sqlserver-2.0.1.Final.json b/descriptors/sqlserver/debezium-sqlserver-2.0.1.Final.json
index e48f8ab..8c2d287 100644
--- a/descriptors/sqlserver/debezium-sqlserver-2.0.1.Final.json
+++ b/descriptors/sqlserver/debezium-sqlserver-2.0.1.Final.json
@@ -166,6 +166,360 @@
"$ref" : "#/$defs/serializer"
}
}
+ },
+ "processors" : {
+ "type" : "array",
+ "items" : {
+ "oneOf" : [ {
+ "title" : "Cast Kafka message key or value",
+ "description" : "Cast an entire Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_message" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_message" : {
+ "type" : "object",
+ "required" : [ "apply_to", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "cast_to" : {
+ "title" : "Target data type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_fields" : {
+ "type" : "object",
+ "required" : [ "apply_to", "fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "A list of the field names and their target data type of the fields that you want to cast.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "field_name", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field in the payload whose value you want to cast to another data type.",
+ "type" : "string"
+ },
+ "cast_to" : {
+ "title" : "Target type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "insert_static_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_static_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field_name", "value" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Insert Kafka metadata field",
+ "description" : "Insert a field into the payload. For the field's value, select a Kafka record metadata type: topic name, partition number, or message timestamp.",
+ "type" : "object",
+ "required" : [ "insert_kafka_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_kafka_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "kafka_value", "field_name" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "kafka_value" : {
+ "title" : "Kafka record metadata type",
+ "description" : "Select the Kafka record metadata (topic name, partition number, or message timestamp) to use for the value of the inserted field.",
+ "type" : "string",
+ "enum" : [ "topic name", "partition number", "message timestamp" ]
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Filter or rename fields",
+ "description" : "Filter out (excludes) or pass (includes) only the specified fields in the payload, and/or rename field names.",
+ "type" : "object",
+ "required" : [ "filter_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "filter_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to" ],
+ "anyOf" : [ {
+ "required" : [ "includes" ]
+ }, {
+ "required" : [ "excludes" ]
+ }, {
+ "required" : [ "rename_fields" ]
+ } ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "includes" : {
+ "title" : "Included fields",
+ "description" : "The list of field names of fields to include in the payload. Fields that are not listed are removed from the payload. Note that the \"Excluded fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "excludes" : {
+ "title" : "Excluded fields",
+ "description" : "The list of field names of fields to remove from the payload. The \"Exclude fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "rename_fields" : {
+ "title" : "Rename fields",
+ "description" : "Rename fields in the payload.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "old_name", "new_name" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "old_name" : {
+ "title" : "Old field name",
+ "description" : "The original label of the field that you want to rename in the payload.",
+ "type" : "string"
+ },
+ "new_name" : {
+ "title" : "New field name",
+ "description" : "The new name of the field.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Mask fields",
+ "description" : "Mask fields with a valid null value for the field type (for example: 0, false, empty string, etc) or with a replacement value (only for numeric and string field types).",
+ "type" : "object",
+ "required" : [ "mask_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "mask_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "fields" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "The fields in the payload whose original values you want to replace with replacement
. If you do not specify a replacement value, the original value is replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "replacement" : {
+ "title" : "Replacement",
+ "description" : "When set, the values of the specified fields are replaced with this value which will be converted to the correct type. If no replacement value is specified, values are replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Extract field",
+ "description" : "Extract a field from the message's key or value payload and replace the whole key or value with the value of the extracted field.",
+ "type" : "object",
+ "required" : [ "extract_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "extract_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field" : {
+ "title" : "Field to extract",
+ "description" : "The name of the field to extract from the payload and whose value replaces the entire key or value.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Message key from fields",
+ "description" : "Replace the message key with one or more fields from the message's value payload.",
+ "type" : "object",
+ "required" : [ "key_from_value_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "key_from_value_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "fields" ],
+ "properties" : {
+ "fields" : {
+ "title" : "Fields for new key",
+ "description" : "Name(s) of the field(s) in the payload to be used to replace the original message key.",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Simple Debezium Topic Router (not enforcing message key uniqueness)",
+ "description" : "Simple Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router does not enforce message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_simple" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_simple" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression that evaluates the original destination topic name of each message that the connector emits to determine whether to route it to an alternate destination topic. The expression captures one or more groups of characters to apply in output_topic
.",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Unique Key Debezium Topic Router (enforcing message key uniqueness)",
+ "description" : "Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router enforces message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_unique_keys" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_unique_keys" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic", "key_field_name" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression on the original destination topic name of each message that determines if it should be routed to a another destination topic and captures one or more groups of characters to be applied in output_topic
",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ },
+ "key_field_name" : {
+ "title" : "Key field name",
+ "description" : "Specifies the name of the field to add to the message key. The field value identifies the name of the source table, based on the original destination topic name. Optionally, you can manipulate the key field by setting the key_field_topic_regex
and key_field_value
.",
+ "type" : "string",
+ "default" : "__dbz__physicalTableIdentifier"
+ },
+ "key_field_topic_regex" : {
+ "title" : "Key field topic regex",
+ "description" : "A regular expression that is applied to the original destination topic name to capture one or more groups of characters and then apply them in key_field_value
",
+ "type" : "string",
+ "format" : "regex",
+ "default" : "(.*)"
+ },
+ "key_field_value" : {
+ "title" : "Key field value",
+ "description" : "A regular expression replacement string that sets the value of the inserted key field based on the captured groups from key_field_topic_regex
.",
+ "type" : "string",
+ "default" : "$1"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Convert to upsert record (extract new record state)",
+ "description" : "Convert the complex Debezium data format into a flat upsert record. This transformation will remove the Debezium \"envelope\" with its metadata from the message and will only keep the updated dataset/row.",
+ "type" : "object",
+ "required" : [ "convert_to_upsert" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "convert_to_upsert" : {
+ "additionalProperties" : false,
+ "required" : [ "delete_handling_mode" ],
+ "properties" : {
+ "delete_handling_mode" : {
+ "title" : "Handle DELETE events",
+ "description" : "When set to \"tombstone\" for each DELETE operation a tombstone record will be send to Kafka. When set to \"drop\" DELETE event records will be removed from the stream and not send to Kafka.",
+ "type" : "string",
+ "default" : "tombstone",
+ "enum" : [ "tombstone", "drop" ]
+ },
+ "add_metadata_to_value" : {
+ "title" : "Add Debezium metadata to the Kafka message value",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message value as additional fields of the payload.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_value_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ },
+ "add_metadata_to_headers" : {
+ "title" : "Add Debezium metadata to Kafka message headers",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message headers as additional header fields.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_headers_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ }
+ }
+ }
+ }
+ } ]
+ }
}
},
"additionalProperties" : true,
@@ -177,6 +531,55 @@
"type" : "string",
"enum" : [ "JSON", "JSON without schema" ],
"default" : "JSON"
+ },
+ "insert_field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field to insert. Append the suffix !
for a required field, or ?
for an optional field.",
+ "type" : "string"
+ },
+ "apply_to" : {
+ "title" : "Apply to",
+ "description" : "Apply transformation to Kafka message key or message value.",
+ "type" : "string",
+ "enum" : [ "message key", "message value" ]
+ },
+ "kafka_connect_types" : {
+ "type" : "string",
+ "enum" : [ "string", "int8", "int16", "int32", "int64", "float32", "float64", "boolean" ]
+ },
+ "include_exclude_list" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "debezium_topic_router" : {
+ "type" : "object",
+ "additionalProperties" : false,
+ "required" : [ "topic", "replacement" ],
+ "properties" : {
+ "topic" : {
+ "type" : "string",
+ "format" : "regex"
+ },
+ "replacement" : {
+ "type" : "string",
+ "format" : "regex"
+ }
+ }
+ },
+ "debezium_metadata" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string",
+ "enum" : [ "op", "ts_ms", "source.ts_ms", "source.version", "source.connector", "source.db", "source.snapshot" ]
+ }
+ },
+ "debezium_metadata_prefix" : {
+ "title" : "Metadata field name prefix",
+ "description" : "Optional prefix for the added Debezium metadata fields.",
+ "type" : "string",
+ "default" : "__"
}
}
}
diff --git a/examples/processors.json b/examples/processors.json
new file mode 100644
index 0000000..462fc58
--- /dev/null
+++ b/examples/processors.json
@@ -0,0 +1,113 @@
+{
+ "processors": [
+ {
+ "x-step-description": "Step 01: Some optional user description what this processor/transformation step is doing",
+ "cast_message": {"apply_to": "message key", "cast_to": "string"}
+ },
+ {"cast_message": {"apply_to": "message value", "cast_to": "string"}},
+
+
+ {
+ "x-step-description": "03 Make abc field a string and the def field a float number",
+ "cast_fields": {
+ "apply_to": "message value",
+ "fields": [ {"field_name": "abc", "cast_to": "string"}, {"field_name": "def", "cast_to": "float64"} ]
+ }
+ },
+ {
+ "cast_fields": {
+ "apply_to": "message key",
+ "fields": [ {"field_name": "id", "cast_to": "string"}, {"field_name": "amount", "cast_to": "float64"} ]
+ }
+ },
+
+
+ {"insert_static_field": {"apply_to": "message key", "field_name": "hello.key.field?", "value": "hello value!"}},
+ {"insert_static_field": {"apply_to": "message value", "field_name": "hello.value.field?", "value": "hello value!"}},
+
+
+ {"insert_kafka_field": {"apply_to": "message key", "field_name": "kafka-topic-field!", "kafka_value": "topic name"}},
+ {"insert_kafka_field": {"apply_to": "message value", "field_name": "kafka-partition-field?", "kafka_value": "partition number"}},
+ {"insert_kafka_field": {"apply_to": "message value", "field_name": "kafka-timestamp-field?", "kafka_value": "message timestamp"}},
+
+
+ {
+ "x-step-description": "A short comment what happens in this transform step",
+ "filter_fields": {
+ "apply_to": "message value",
+ "includes": [ "included.hello.key.field" ]
+ }
+ },
+ {
+ "x-step-description": "Exclude all sensitive data",
+ "filter_fields": {
+ "apply_to": "message key",
+ "excludes": [ "an_excluded_field" ]
+ }
+ },
+ {
+ "filter_fields": {
+ "apply_to": "message key",
+ "rename_fields": [ {"old_name": "c2", "new_name": "address"}, {"old_name": "b2", "new_name": "street_no"} ]
+ }
+ },
+ {
+ "filter_fields": {
+ "apply_to": "message key",
+ "includes": [ "included.hello.key.field" ],
+ "excludes": [ "an_excluded_field" ],
+ "rename_fields": [ {"old_name": "c2", "new_name": "address"}, {"old_name": "b2", "new_name": "street_no"} ]
+ }
+ },
+
+
+ {"mask_fields": {"apply_to": "message key", "fields": [ "password", "signature" ]}},
+ {
+ "x-step-description": "Mask sensitive data (checksum, pw, signature) with xxx",
+ "mask_fields": {"apply_to": "message value", "fields": [ "crc", "password", "signature" ], "replacement": "xxx"}
+ },
+
+
+ {"extract_field": {"apply_to": "message value", "field": "name"}},
+
+
+ {"key_from_value_fields": {"fields": ["id", "customer_id"]}},
+
+
+ {"debezium_topic_router_simple": {"topic_regex": "(.*)customers_shard-(.*)", "output_topic": "$1customer_all_shards"}},
+
+
+ {
+ "x-step-description": "Route sharded 'customers` tables into one topic",
+ "debezium_topic_router_unique_keys": {
+ "topic_regex": "(.*)customers_shard-(.*)",
+ "output_topic": "$1customer_all_shards",
+ "key_field_name": "__shard_id"
+ }
+ },
+ {
+ "debezium_topic_router_unique_keys": {
+ "topic_regex": "(.*)customers_shard-(.*)",
+ "output_topic": "$1customer_all_shards",
+ "key_field_name": "__shard_id",
+ "key_field_topic_regex": "(.*)customers_shard(.*)",
+ "key_field_value": "$2"}
+ },
+
+
+ {
+ "x-step-description": "Use upsert records with tombstones",
+ "convert_to_upsert": { "delete_handling_mode": "tombstone" }
+ },
+ {
+ "convert_to_upsert": {
+ "delete_handling_mode": "drop",
+ "add_metadata_to_value": ["table", "ts_ms", "source.snapshot"],
+ "metadata_value_prefix": "_dbz__",
+ "add_metadata_to_headers": ["source.connector", "source.version", "source.db"],
+ "metadata_headers_prefix": "_dbz__"
+ }
+ }
+
+ ]
+}
diff --git a/gen_templates.sh b/gen_templates.sh
index 0417ec6..5d518e3 100755
--- a/gen_templates.sh
+++ b/gen_templates.sh
@@ -37,4 +37,4 @@ for D in "${CONNECTORS_DIR}"/*; do
--from-file="${CONNECTORS_DIR}/${CM_NAME}/" \
--dry-run=client \
-o yaml | sed -e 's/^/ /' >> $TEMPLATE
-done
\ No newline at end of file
+done
diff --git a/src/main/resources/additional_definitions.json b/src/main/resources/additional_definitions.json
index 9d71451..07fb38e 100644
--- a/src/main/resources/additional_definitions.json
+++ b/src/main/resources/additional_definitions.json
@@ -1,7 +1,61 @@
{
- "serializer" : {
- "type" : "string",
- "enum" : [ "JSON", "JSON without schema" ],
- "default" : "JSON"
+ "serializer": {
+ "type": "string",
+ "enum": [ "JSON", "JSON without schema" ],
+ "default": "JSON"
+ },
+ "processor_step_description": {
+ "title": "Step name or description",
+ "description": "Optionally you can give this processor step a name or a short description.",
+ "type": "string"
+ },
+ "insert_field_name": {
+ "title": "Field name",
+ "description": "The name of the field to insert. Append the suffix !
for a required field, or ?
for an optional field.",
+ "type": "string"
+ },
+ "apply_to": {
+ "title": "Apply to",
+ "description": "Apply transformation to Kafka message key or message value.",
+ "type": "string",
+ "enum": [ "message key", "message value" ]
+ },
+ "kafka_connect_types": {
+ "type": "string",
+ "enum": ["string", "int8", "int16", "int32", "int64", "float32", "float64", "boolean"]
+ },
+ "include_exclude_list": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ },
+ "debezium_topic_router": {
+ "type": "object",
+ "additionalProperties": false,
+ "required": ["topic", "replacement"],
+ "properties": {
+ "topic": {
+ "type": "string",
+ "format": "regex"
+ },
+ "replacement": {
+ "type": "string",
+ "format": "regex"
+ }
+ }
+ },
+ "debezium_metadata": {
+ "type": "array",
+ "items": {
+ "type": "string",
+ "enum": ["op", "ts_ms", "source.ts_ms", "source.version", "source.connector", "source.db", "source.snapshot"]
+ }
+ },
+ "debezium_metadata_prefix": {
+ "title": "Metadata field name prefix",
+ "description": "Optional prefix for the added Debezium metadata fields.",
+ "type": "string",
+ "default": "__"
}
}
diff --git a/src/main/resources/additional_properties.json b/src/main/resources/additional_properties.json
index d5d0169..d27af7b 100644
--- a/src/main/resources/additional_properties.json
+++ b/src/main/resources/additional_properties.json
@@ -1,22 +1,390 @@
{
- "data_shape" : {
- "type" : "object",
- "additionalProperties" : false,
- "properties" : {
- "key" : {
- "title" : "Kafka Message Key Format",
- "description" : "The serialization format for the Kafka message key.",
- "x-name" : "data_shape.key",
- "x-category" : "CONNECTOR",
- "$ref" : "#/$defs/serializer"
+ "data_shape": {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "key": {
+ "title": "Kafka Message Key Format",
+ "description": "The serialization format for the Kafka message key.",
+ "x-name": "data_shape.key",
+ "x-category": "CONNECTOR",
+ "$ref": "#/$defs/serializer"
},
- "value" : {
- "title" : "Kafka Message Value Format",
- "description" : "The serialization format for the Kafka message value.",
- "x-name" : "data_shape.value",
- "x-category" : "CONNECTOR",
- "$ref" : "#/$defs/serializer"
+ "value": {
+ "title": "Kafka Message Value Format",
+ "description": "The serialization format for the Kafka message value.",
+ "x-name": "data_shape.value",
+ "x-category": "CONNECTOR",
+ "$ref": "#/$defs/serializer"
}
}
+ },
+ "processors": {
+ "type": "array",
+ "items": {
+ "oneOf": [
+ {
+ "title": "Cast Kafka message key or value",
+ "description": "Cast an entire Kafka message key or value to another data type.",
+ "type": "object",
+ "required": ["cast_message"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "cast_message": {
+ "type": "object",
+ "required": ["apply_to", "cast_to"],
+ "additionalProperties": false,
+ "properties": {
+ "apply_to": {
+ "$ref": "#/$defs/apply_to"
+ },
+ "cast_to": {
+ "title": "Target data type",
+ "description": "The target data type to cast to.",
+ "$ref": "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Cast payload fields",
+ "description": "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type": "object",
+ "required": ["cast_fields"],
+ "additionalProperties": false,
+ "properties": {
+ "cast_fields": {
+ "type": "object",
+ "required": ["apply_to", "fields"],
+ "additionalProperties": false,
+ "properties": {
+ "apply_to": {
+ "$ref": "#/$defs/apply_to"
+ },
+ "fields": {
+ "title": "Fields",
+ "description": "A list of the field names and the target data types for the fields that you want to cast.",
+ "type": "array",
+ "items": {
+ "type": "object",
+ "required": ["field_name", "cast_to"],
+ "additionalProperties": false,
+ "properties": {
+ "field_name": {
+ "title": "Field name",
+ "description": "The name of the field in the payload whose value you want to cast to another data type.",
+ "type": "string"
+ },
+ "cast_to": {
+ "title": "Target type",
+ "description": "The target data type to cast to.",
+ "$ref": "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Insert a static field",
+ "description": "Insert a field with a static string value into the Kafka message key or value payload.",
+ "type": "object",
+ "required": ["insert_static_field"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "insert_static_field": {
+ "additionalProperties": false,
+ "required": ["apply_to", "field_name", "value"],
+ "properties": {
+ "apply_to": {
+ "$ref": "#/$defs/apply_to"
+ },
+ "field_name": {
+ "$ref": "#/$defs/insert_field_name"
+ },
+ "value": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Insert Kafka metadata field",
+ "description": "Insert a field into the payload. For the field's value, select a Kafka record metadata type: topic name, partition number, or message timestamp.",
+ "type": "object",
+ "required": ["insert_kafka_field"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "insert_kafka_field": {
+ "additionalProperties": false,
+ "required": ["apply_to", "kafka_value", "field_name"],
+ "properties": {
+ "apply_to": {
+ "$ref": "#/$defs/apply_to"
+ },
+ "kafka_value": {
+ "title": "Kafka record metadata type",
+ "description": "Select the Kafka record metadata (topic name, partition number, or message timestamp) to use for the value of the inserted field.",
+ "type": "string",
+ "enum": ["topic name", "partition number", "message timestamp"]
+ },
+ "field_name": {
+ "$ref": "#/$defs/insert_field_name"
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Filter or rename fields",
+ "description": "Filter out (excludes) or pass (includes) only the specified fields in the payload, and/or rename field names.",
+ "type": "object",
+ "required": ["filter_fields"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "filter_fields": {
+ "additionalProperties": false,
+ "required": ["apply_to"],
+ "anyOf": [{"required": ["includes"]}, {"required": ["excludes"]}, {"required": ["rename_fields"]}],
+ "properties": {
+ "apply_to": {
+ "$ref": "#/$defs/apply_to"
+ },
+ "includes": {
+ "title": "Included fields",
+ "description": "The list of field names of fields to include in the payload. Fields that are not listed are removed from the payload. Note that the \"Excluded fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref": "#/$defs/include_exclude_list"
+ },
+ "excludes": {
+ "title": "Excluded fields",
+ "description": "The list of field names of fields to remove from the payload. The \"Exclude fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref": "#/$defs/include_exclude_list"
+ },
+ "rename_fields": {
+ "title": "Rename fields",
+ "description": "Rename fields in the payload.",
+ "type": "array",
+ "items": {
+ "type": "object",
+ "required": ["old_name", "new_name"],
+ "additionalProperties": false,
+ "properties": {
+ "old_name": {
+ "title": "Old field name",
+ "description": "The original label of the field that you want to rename in the payload.",
+ "type": "string"
+ },
+ "new_name": {
+ "title": "New field name",
+ "description": "The new name of the field.",
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Mask fields",
+ "description": "Mask fields with a valid null value for the field type (for example: 0, false, empty string, etc) or with a replacement value (only for numeric and string field types).",
+ "type": "object",
+ "required": ["mask_fields"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "mask_fields": {
+ "additionalProperties": false,
+ "required": ["apply_to", "fields"],
+ "properties": {
+ "apply_to": {"$ref": "#/$defs/apply_to"},
+ "fields": {
+ "title": "Fields",
+ "description": "The fields in the payload whose original values you want to replace with replacement
. If you do not specify a replacement value, the original value is replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ },
+ "replacement": {
+ "title": "Replacement",
+ "description": "When set, the values of the specified fields are replaced with this value which will be converted to the correct type. If no replacement value is specified, values are replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type": "string"
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Extract field",
+ "description": "Extract a field from the message's key or value payload and replace the whole key or value with the value of the extracted field.",
+ "type": "object",
+ "required": ["extract_field"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "extract_field": {
+ "additionalProperties": false,
+ "required": ["apply_to", "field"],
+ "properties": {
+ "apply_to": {
+ "$ref": "#/$defs/apply_to"
+ },
+ "field": {
+ "title": "Field to extract",
+ "description": "The name of the field to extract from the payload and whose value replaces the entire key or value.",
+ "type": "string"
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Message key from fields",
+ "description": "Replace the message key with one or more fields from the message's value payload.",
+ "type": "object",
+ "required": ["key_from_value_fields"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "key_from_value_fields": {
+ "additionalProperties": false,
+ "required": ["fields"],
+ "properties": {
+ "fields": {
+ "title": "Fields for new key",
+ "description": "Name(s) of the field(s) in the payload to be used to replace the original message key.",
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Simple Debezium Topic Router (not enforcing message key uniqueness)",
+ "description": "Simple Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router does not enforce message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type": "object",
+ "required": ["debezium_topic_router_simple"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "debezium_topic_router_simple": {
+ "additionalProperties": false,
+ "required": ["topic_regex", "output_topic"],
+ "properties": {
+ "topic_regex": {
+ "title": "Input topic regex",
+ "description": "A regular expression that evaluates the original destination topic name of each message that the connector emits to determine whether to route it to an alternate destination topic. The expression captures one or more groups of characters to apply in output_topic
.",
+ "type": "string",
+ "format": "regex"
+ },
+ "output_topic": {
+ "title": "Output topic",
+ "description": "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type": "string"
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Unique Key Debezium Topic Router (enforcing message key uniqueness)",
+ "description": "Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router enforces message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type": "object",
+ "required": ["debezium_topic_router_unique_keys"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "debezium_topic_router_unique_keys": {
+ "additionalProperties": false,
+ "required": ["topic_regex", "output_topic", "key_field_name"],
+ "properties": {
+ "topic_regex": {
+ "title": "Input topic regex",
+ "description": "A regular expression on the original destination topic name of each message that determines if it should be routed to a another destination topic and captures one or more groups of characters to be applied in output_topic
",
+ "type": "string",
+ "format": "regex"
+ },
+ "output_topic": {
+ "title": "Output topic",
+ "description": "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type": "string"
+ },
+ "key_field_name": {
+ "title": "Key field name",
+ "description": "Specifies the name of the field to add to the message key. The field value identifies the name of the source table, based on the original destination topic name. Optionally, you can manipulate the key field by setting the key_field_topic_regex
and key_field_value
.",
+ "type": "string",
+ "default": "__dbz__physicalTableIdentifier"
+ },
+ "key_field_topic_regex": {
+ "title": "Key field topic regex",
+ "description": "A regular expression that is applied to the original destination topic name to capture one or more groups of characters and then apply them in key_field_value
",
+ "type": "string",
+ "format": "regex",
+ "default": "(.*)"
+ },
+ "key_field_value": {
+ "title": "Key field value",
+ "description": "A regular expression replacement string that sets the value of the inserted key field based on the captured groups from key_field_topic_regex
.",
+ "type": "string",
+ "default": "$1"
+ }
+ }
+ }
+ }
+ },
+ {
+ "title": "Convert to upsert record (extract new record state)",
+ "description": "Convert the complex Debezium data format into a flat upsert record. This transformation will remove the Debezium \"envelope\" with its metadata from the message and will only keep the updated dataset/row.",
+ "type": "object",
+ "required": ["convert_to_upsert"],
+ "additionalProperties": false,
+ "properties": {
+ "x-step-description": {"$ref": "#/$defs/processor_step_description"},
+ "convert_to_upsert": {
+ "additionalProperties": false,
+ "required": ["delete_handling_mode"],
+ "properties": {
+ "delete_handling_mode": {
+ "title": "Handle DELETE events",
+ "description": "When set to \"tombstone\" for each DELETE operation a tombstone record will be send to Kafka. When set to \"drop\" DELETE event records will be removed from the stream and not send to Kafka.",
+ "type": "string",
+ "default": "tombstone",
+ "enum": ["tombstone", "drop"]
+ },
+ "add_metadata_to_value": {
+ "title": "Add Debezium metadata to the Kafka message value",
+ "description": "Optionally select Debezium metadata that should be added to the Kafka message value as additional fields of the payload.",
+ "$ref": "#/$defs/debezium_metadata"
+ },
+ "metadata_value_prefix": {
+ "$ref": "#/$defs/debezium_metadata_prefix"
+ },
+ "add_metadata_to_headers": {
+ "title": "Add Debezium metadata to Kafka message headers",
+ "description": "Optionally select Debezium metadata that should be added to the Kafka message headers as additional header fields.",
+ "$ref": "#/$defs/debezium_metadata"
+ },
+ "metadata_headers_prefix": {
+ "$ref": "#/$defs/debezium_metadata_prefix"
+ }
+ }
+ }
+ }
+ }
+ ]
+ }
}
-}
\ No newline at end of file
+}
diff --git a/templates/cos-fleet-catalog-debezium.yaml b/templates/cos-fleet-catalog-debezium.yaml
index 8810075..a8a1671 100644
--- a/templates/cos-fleet-catalog-debezium.yaml
+++ b/templates/cos-fleet-catalog-debezium.yaml
@@ -175,6 +175,360 @@ objects:
"$ref" : "#/$defs/serializer"
}
}
+ },
+ "processors" : {
+ "type" : "array",
+ "items" : {
+ "oneOf" : [ {
+ "title" : "Cast Kafka message key or value",
+ "description" : "Cast an entire Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_message" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_message" : {
+ "type" : "object",
+ "required" : [ "apply_to", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "cast_to" : {
+ "title" : "Target data type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_fields" : {
+ "type" : "object",
+ "required" : [ "apply_to", "fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "A list of the field names and their target data type of the fields that you want to cast.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "field_name", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field in the payload whose value you want to cast to another data type.",
+ "type" : "string"
+ },
+ "cast_to" : {
+ "title" : "Target type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "insert_static_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_static_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field_name", "value" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Insert Kafka metadata field",
+ "description" : "Insert a field into the payload. For the field's value, select a Kafka record metadata type: topic name, partition number, or message timestamp.",
+ "type" : "object",
+ "required" : [ "insert_kafka_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_kafka_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "kafka_value", "field_name" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "kafka_value" : {
+ "title" : "Kafka record metadata type",
+ "description" : "Select the Kafka record metadata (topic name, partition number, or message timestamp) to use for the value of the inserted field.",
+ "type" : "string",
+ "enum" : [ "topic name", "partition number", "message timestamp" ]
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Filter or rename fields",
+ "description" : "Filter out (excludes) or pass (includes) only the specified fields in the payload, and/or rename field names.",
+ "type" : "object",
+ "required" : [ "filter_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "filter_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to" ],
+ "anyOf" : [ {
+ "required" : [ "includes" ]
+ }, {
+ "required" : [ "excludes" ]
+ }, {
+ "required" : [ "rename_fields" ]
+ } ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "includes" : {
+ "title" : "Included fields",
+ "description" : "The list of field names of fields to include in the payload. Fields that are not listed are removed from the payload. Note that the \"Excluded fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "excludes" : {
+ "title" : "Excluded fields",
+ "description" : "The list of field names of fields to remove from the payload. The \"Exclude fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "rename_fields" : {
+ "title" : "Rename fields",
+ "description" : "Rename fields in the payload.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "old_name", "new_name" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "old_name" : {
+ "title" : "Old field name",
+ "description" : "The original label of the field that you want to rename in the payload.",
+ "type" : "string"
+ },
+ "new_name" : {
+ "title" : "New field name",
+ "description" : "The new name of the field.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Mask fields",
+ "description" : "Mask fields with a valid null value for the field type (for example: 0, false, empty string, etc) or with a replacement value (only for numeric and string field types).",
+ "type" : "object",
+ "required" : [ "mask_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "mask_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "fields" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "The fields in the payload whose original values you want to replace with replacement
. If you do not specify a replacement value, the original value is replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "replacement" : {
+ "title" : "Replacement",
+ "description" : "When set, the values of the specified fields are replaced with this value which will be converted to the correct type. If no replacement value is specified, values are replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Extract field",
+ "description" : "Extract a field from the message's key or value payload and replace the whole key or value with the value of the extracted field.",
+ "type" : "object",
+ "required" : [ "extract_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "extract_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field" : {
+ "title" : "Field to extract",
+ "description" : "The name of the field to extract from the payload and whose value replaces the entire key or value.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Message key from fields",
+ "description" : "Replace the message key with one or more fields from the message's value payload.",
+ "type" : "object",
+ "required" : [ "key_from_value_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "key_from_value_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "fields" ],
+ "properties" : {
+ "fields" : {
+ "title" : "Fields for new key",
+ "description" : "Name(s) of the field(s) in the payload to be used to replace the original message key.",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Simple Debezium Topic Router (not enforcing message key uniqueness)",
+ "description" : "Simple Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router does not enforce message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_simple" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_simple" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression that evaluates the original destination topic name of each message that the connector emits to determine whether to route it to an alternate destination topic. The expression captures one or more groups of characters to apply in output_topic
.",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Unique Key Debezium Topic Router (enforcing message key uniqueness)",
+ "description" : "Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router enforces message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_unique_keys" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_unique_keys" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic", "key_field_name" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression on the original destination topic name of each message that determines if it should be routed to a another destination topic and captures one or more groups of characters to be applied in output_topic
",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ },
+ "key_field_name" : {
+ "title" : "Key field name",
+ "description" : "Specifies the name of the field to add to the message key. The field value identifies the name of the source table, based on the original destination topic name. Optionally, you can manipulate the key field by setting the key_field_topic_regex
and key_field_value
.",
+ "type" : "string",
+ "default" : "__dbz__physicalTableIdentifier"
+ },
+ "key_field_topic_regex" : {
+ "title" : "Key field topic regex",
+ "description" : "A regular expression that is applied to the original destination topic name to capture one or more groups of characters and then apply them in key_field_value
",
+ "type" : "string",
+ "format" : "regex",
+ "default" : "(.*)"
+ },
+ "key_field_value" : {
+ "title" : "Key field value",
+ "description" : "A regular expression replacement string that sets the value of the inserted key field based on the captured groups from key_field_topic_regex
.",
+ "type" : "string",
+ "default" : "$1"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Convert to upsert record (extract new record state)",
+ "description" : "Convert the complex Debezium data format into a flat upsert record. This transformation will remove the Debezium \"envelope\" with its metadata from the message and will only keep the updated dataset/row.",
+ "type" : "object",
+ "required" : [ "convert_to_upsert" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "convert_to_upsert" : {
+ "additionalProperties" : false,
+ "required" : [ "delete_handling_mode" ],
+ "properties" : {
+ "delete_handling_mode" : {
+ "title" : "Handle DELETE events",
+ "description" : "When set to \"tombstone\" for each DELETE operation a tombstone record will be send to Kafka. When set to \"drop\" DELETE event records will be removed from the stream and not send to Kafka.",
+ "type" : "string",
+ "default" : "tombstone",
+ "enum" : [ "tombstone", "drop" ]
+ },
+ "add_metadata_to_value" : {
+ "title" : "Add Debezium metadata to the Kafka message value",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message value as additional fields of the payload.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_value_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ },
+ "add_metadata_to_headers" : {
+ "title" : "Add Debezium metadata to Kafka message headers",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message headers as additional header fields.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_headers_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ }
+ }
+ }
+ }
+ } ]
+ }
}
},
"additionalProperties" : true,
@@ -186,6 +540,55 @@ objects:
"type" : "string",
"enum" : [ "JSON", "JSON without schema" ],
"default" : "JSON"
+ },
+ "insert_field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field to insert. Append the suffix !
for a required field, or ?
for an optional field.",
+ "type" : "string"
+ },
+ "apply_to" : {
+ "title" : "Apply to",
+ "description" : "Apply transformation to Kafka message key or message value.",
+ "type" : "string",
+ "enum" : [ "message key", "message value" ]
+ },
+ "kafka_connect_types" : {
+ "type" : "string",
+ "enum" : [ "string", "int8", "int16", "int32", "int64", "float32", "float64", "boolean" ]
+ },
+ "include_exclude_list" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "debezium_topic_router" : {
+ "type" : "object",
+ "additionalProperties" : false,
+ "required" : [ "topic", "replacement" ],
+ "properties" : {
+ "topic" : {
+ "type" : "string",
+ "format" : "regex"
+ },
+ "replacement" : {
+ "type" : "string",
+ "format" : "regex"
+ }
+ }
+ },
+ "debezium_metadata" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string",
+ "enum" : [ "op", "ts_ms", "source.ts_ms", "source.version", "source.connector", "source.db", "source.snapshot" ]
+ }
+ },
+ "debezium_metadata_prefix" : {
+ "title" : "Metadata field name prefix",
+ "description" : "Optional prefix for the added Debezium metadata fields.",
+ "type" : "string",
+ "default" : "__"
}
}
}
@@ -407,6 +810,360 @@ objects:
"$ref" : "#/$defs/serializer"
}
}
+ },
+ "processors" : {
+ "type" : "array",
+ "items" : {
+ "oneOf" : [ {
+ "title" : "Cast Kafka message key or value",
+ "description" : "Cast an entire Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_message" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_message" : {
+ "type" : "object",
+ "required" : [ "apply_to", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "cast_to" : {
+ "title" : "Target data type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_fields" : {
+ "type" : "object",
+ "required" : [ "apply_to", "fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "A list of the field names and their target data type of the fields that you want to cast.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "field_name", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field in the payload whose value you want to cast to another data type.",
+ "type" : "string"
+ },
+ "cast_to" : {
+ "title" : "Target type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "insert_static_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_static_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field_name", "value" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Insert Kafka metadata field",
+ "description" : "Insert a field into the payload. For the field's value, select a Kafka record metadata type: topic name, partition number, or message timestamp.",
+ "type" : "object",
+ "required" : [ "insert_kafka_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_kafka_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "kafka_value", "field_name" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "kafka_value" : {
+ "title" : "Kafka record metadata type",
+ "description" : "Select the Kafka record metadata (topic name, partition number, or message timestamp) to use for the value of the inserted field.",
+ "type" : "string",
+ "enum" : [ "topic name", "partition number", "message timestamp" ]
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Filter or rename fields",
+ "description" : "Filter out (excludes) or pass (includes) only the specified fields in the payload, and/or rename field names.",
+ "type" : "object",
+ "required" : [ "filter_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "filter_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to" ],
+ "anyOf" : [ {
+ "required" : [ "includes" ]
+ }, {
+ "required" : [ "excludes" ]
+ }, {
+ "required" : [ "rename_fields" ]
+ } ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "includes" : {
+ "title" : "Included fields",
+ "description" : "The list of field names of fields to include in the payload. Fields that are not listed are removed from the payload. Note that the \"Excluded fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "excludes" : {
+ "title" : "Excluded fields",
+ "description" : "The list of field names of fields to remove from the payload. The \"Exclude fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "rename_fields" : {
+ "title" : "Rename fields",
+ "description" : "Rename fields in the payload.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "old_name", "new_name" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "old_name" : {
+ "title" : "Old field name",
+ "description" : "The original label of the field that you want to rename in the payload.",
+ "type" : "string"
+ },
+ "new_name" : {
+ "title" : "New field name",
+ "description" : "The new name of the field.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Mask fields",
+ "description" : "Mask fields with a valid null value for the field type (for example: 0, false, empty string, etc) or with a replacement value (only for numeric and string field types).",
+ "type" : "object",
+ "required" : [ "mask_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "mask_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "fields" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "The fields in the payload whose original values you want to replace with replacement
. If you do not specify a replacement value, the original value is replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "replacement" : {
+ "title" : "Replacement",
+ "description" : "When set, the values of the specified fields are replaced with this value which will be converted to the correct type. If no replacement value is specified, values are replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Extract field",
+ "description" : "Extract a field from the message's key or value payload and replace the whole key or value with the value of the extracted field.",
+ "type" : "object",
+ "required" : [ "extract_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "extract_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field" : {
+ "title" : "Field to extract",
+ "description" : "The name of the field to extract from the payload and whose value replaces the entire key or value.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Message key from fields",
+ "description" : "Replace the message key with one or more fields from the message's value payload.",
+ "type" : "object",
+ "required" : [ "key_from_value_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "key_from_value_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "fields" ],
+ "properties" : {
+ "fields" : {
+ "title" : "Fields for new key",
+ "description" : "Name(s) of the field(s) in the payload to be used to replace the original message key.",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Simple Debezium Topic Router (not enforcing message key uniqueness)",
+ "description" : "Simple Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router does not enforce message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_simple" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_simple" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression that evaluates the original destination topic name of each message that the connector emits to determine whether to route it to an alternate destination topic. The expression captures one or more groups of characters to apply in output_topic
.",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Unique Key Debezium Topic Router (enforcing message key uniqueness)",
+ "description" : "Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router enforces message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_unique_keys" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_unique_keys" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic", "key_field_name" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression on the original destination topic name of each message that determines if it should be routed to a another destination topic and captures one or more groups of characters to be applied in output_topic
",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ },
+ "key_field_name" : {
+ "title" : "Key field name",
+ "description" : "Specifies the name of the field to add to the message key. The field value identifies the name of the source table, based on the original destination topic name. Optionally, you can manipulate the key field by setting the key_field_topic_regex
and key_field_value
.",
+ "type" : "string",
+ "default" : "__dbz__physicalTableIdentifier"
+ },
+ "key_field_topic_regex" : {
+ "title" : "Key field topic regex",
+ "description" : "A regular expression that is applied to the original destination topic name to capture one or more groups of characters and then apply them in key_field_value
",
+ "type" : "string",
+ "format" : "regex",
+ "default" : "(.*)"
+ },
+ "key_field_value" : {
+ "title" : "Key field value",
+ "description" : "A regular expression replacement string that sets the value of the inserted key field based on the captured groups from key_field_topic_regex
.",
+ "type" : "string",
+ "default" : "$1"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Convert to upsert record (extract new record state)",
+ "description" : "Convert the complex Debezium data format into a flat upsert record. This transformation will remove the Debezium \"envelope\" with its metadata from the message and will only keep the updated dataset/row.",
+ "type" : "object",
+ "required" : [ "convert_to_upsert" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "convert_to_upsert" : {
+ "additionalProperties" : false,
+ "required" : [ "delete_handling_mode" ],
+ "properties" : {
+ "delete_handling_mode" : {
+ "title" : "Handle DELETE events",
+ "description" : "When set to \"tombstone\" for each DELETE operation a tombstone record will be send to Kafka. When set to \"drop\" DELETE event records will be removed from the stream and not send to Kafka.",
+ "type" : "string",
+ "default" : "tombstone",
+ "enum" : [ "tombstone", "drop" ]
+ },
+ "add_metadata_to_value" : {
+ "title" : "Add Debezium metadata to the Kafka message value",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message value as additional fields of the payload.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_value_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ },
+ "add_metadata_to_headers" : {
+ "title" : "Add Debezium metadata to Kafka message headers",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message headers as additional header fields.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_headers_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ }
+ }
+ }
+ }
+ } ]
+ }
}
},
"additionalProperties" : true,
@@ -418,6 +1175,55 @@ objects:
"type" : "string",
"enum" : [ "JSON", "JSON without schema" ],
"default" : "JSON"
+ },
+ "insert_field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field to insert. Append the suffix !
for a required field, or ?
for an optional field.",
+ "type" : "string"
+ },
+ "apply_to" : {
+ "title" : "Apply to",
+ "description" : "Apply transformation to Kafka message key or message value.",
+ "type" : "string",
+ "enum" : [ "message key", "message value" ]
+ },
+ "kafka_connect_types" : {
+ "type" : "string",
+ "enum" : [ "string", "int8", "int16", "int32", "int64", "float32", "float64", "boolean" ]
+ },
+ "include_exclude_list" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "debezium_topic_router" : {
+ "type" : "object",
+ "additionalProperties" : false,
+ "required" : [ "topic", "replacement" ],
+ "properties" : {
+ "topic" : {
+ "type" : "string",
+ "format" : "regex"
+ },
+ "replacement" : {
+ "type" : "string",
+ "format" : "regex"
+ }
+ }
+ },
+ "debezium_metadata" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string",
+ "enum" : [ "op", "ts_ms", "source.ts_ms", "source.version", "source.connector", "source.db", "source.snapshot" ]
+ }
+ },
+ "debezium_metadata_prefix" : {
+ "title" : "Metadata field name prefix",
+ "description" : "Optional prefix for the added Debezium metadata fields.",
+ "type" : "string",
+ "default" : "__"
}
}
}
@@ -663,6 +1469,360 @@ objects:
"$ref" : "#/$defs/serializer"
}
}
+ },
+ "processors" : {
+ "type" : "array",
+ "items" : {
+ "oneOf" : [ {
+ "title" : "Cast Kafka message key or value",
+ "description" : "Cast an entire Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_message" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_message" : {
+ "type" : "object",
+ "required" : [ "apply_to", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "cast_to" : {
+ "title" : "Target data type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_fields" : {
+ "type" : "object",
+ "required" : [ "apply_to", "fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "A list of the field names and their target data type of the fields that you want to cast.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "field_name", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field in the payload whose value you want to cast to another data type.",
+ "type" : "string"
+ },
+ "cast_to" : {
+ "title" : "Target type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "insert_static_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_static_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field_name", "value" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Insert Kafka metadata field",
+ "description" : "Insert a field into the payload. For the field's value, select a Kafka record metadata type: topic name, partition number, or message timestamp.",
+ "type" : "object",
+ "required" : [ "insert_kafka_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_kafka_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "kafka_value", "field_name" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "kafka_value" : {
+ "title" : "Kafka record metadata type",
+ "description" : "Select the Kafka record metadata (topic name, partition number, or message timestamp) to use for the value of the inserted field.",
+ "type" : "string",
+ "enum" : [ "topic name", "partition number", "message timestamp" ]
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Filter or rename fields",
+ "description" : "Filter out (excludes) or pass (includes) only the specified fields in the payload, and/or rename field names.",
+ "type" : "object",
+ "required" : [ "filter_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "filter_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to" ],
+ "anyOf" : [ {
+ "required" : [ "includes" ]
+ }, {
+ "required" : [ "excludes" ]
+ }, {
+ "required" : [ "rename_fields" ]
+ } ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "includes" : {
+ "title" : "Included fields",
+ "description" : "The list of field names of fields to include in the payload. Fields that are not listed are removed from the payload. Note that the \"Excluded fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "excludes" : {
+ "title" : "Excluded fields",
+ "description" : "The list of field names of fields to remove from the payload. The \"Exclude fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "rename_fields" : {
+ "title" : "Rename fields",
+ "description" : "Rename fields in the payload.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "old_name", "new_name" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "old_name" : {
+ "title" : "Old field name",
+ "description" : "The original label of the field that you want to rename in the payload.",
+ "type" : "string"
+ },
+ "new_name" : {
+ "title" : "New field name",
+ "description" : "The new name of the field.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Mask fields",
+ "description" : "Mask fields with a valid null value for the field type (for example: 0, false, empty string, etc) or with a replacement value (only for numeric and string field types).",
+ "type" : "object",
+ "required" : [ "mask_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "mask_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "fields" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "The fields in the payload whose original values you want to replace with replacement
. If you do not specify a replacement value, the original value is replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "replacement" : {
+ "title" : "Replacement",
+ "description" : "When set, the values of the specified fields are replaced with this value which will be converted to the correct type. If no replacement value is specified, values are replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Extract field",
+ "description" : "Extract a field from the message's key or value payload and replace the whole key or value with the value of the extracted field.",
+ "type" : "object",
+ "required" : [ "extract_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "extract_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field" : {
+ "title" : "Field to extract",
+ "description" : "The name of the field to extract from the payload and whose value replaces the entire key or value.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Message key from fields",
+ "description" : "Replace the message key with one or more fields from the message's value payload.",
+ "type" : "object",
+ "required" : [ "key_from_value_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "key_from_value_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "fields" ],
+ "properties" : {
+ "fields" : {
+ "title" : "Fields for new key",
+ "description" : "Name(s) of the field(s) in the payload to be used to replace the original message key.",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Simple Debezium Topic Router (not enforcing message key uniqueness)",
+ "description" : "Simple Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router does not enforce message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_simple" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_simple" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression that evaluates the original destination topic name of each message that the connector emits to determine whether to route it to an alternate destination topic. The expression captures one or more groups of characters to apply in output_topic
.",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Unique Key Debezium Topic Router (enforcing message key uniqueness)",
+ "description" : "Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router enforces message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_unique_keys" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_unique_keys" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic", "key_field_name" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression on the original destination topic name of each message that determines if it should be routed to a another destination topic and captures one or more groups of characters to be applied in output_topic
",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ },
+ "key_field_name" : {
+ "title" : "Key field name",
+ "description" : "Specifies the name of the field to add to the message key. The field value identifies the name of the source table, based on the original destination topic name. Optionally, you can manipulate the key field by setting the key_field_topic_regex
and key_field_value
.",
+ "type" : "string",
+ "default" : "__dbz__physicalTableIdentifier"
+ },
+ "key_field_topic_regex" : {
+ "title" : "Key field topic regex",
+ "description" : "A regular expression that is applied to the original destination topic name to capture one or more groups of characters and then apply them in key_field_value
",
+ "type" : "string",
+ "format" : "regex",
+ "default" : "(.*)"
+ },
+ "key_field_value" : {
+ "title" : "Key field value",
+ "description" : "A regular expression replacement string that sets the value of the inserted key field based on the captured groups from key_field_topic_regex
.",
+ "type" : "string",
+ "default" : "$1"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Convert to upsert record (extract new record state)",
+ "description" : "Convert the complex Debezium data format into a flat upsert record. This transformation will remove the Debezium \"envelope\" with its metadata from the message and will only keep the updated dataset/row.",
+ "type" : "object",
+ "required" : [ "convert_to_upsert" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "convert_to_upsert" : {
+ "additionalProperties" : false,
+ "required" : [ "delete_handling_mode" ],
+ "properties" : {
+ "delete_handling_mode" : {
+ "title" : "Handle DELETE events",
+ "description" : "When set to \"tombstone\" for each DELETE operation a tombstone record will be send to Kafka. When set to \"drop\" DELETE event records will be removed from the stream and not send to Kafka.",
+ "type" : "string",
+ "default" : "tombstone",
+ "enum" : [ "tombstone", "drop" ]
+ },
+ "add_metadata_to_value" : {
+ "title" : "Add Debezium metadata to the Kafka message value",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message value as additional fields of the payload.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_value_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ },
+ "add_metadata_to_headers" : {
+ "title" : "Add Debezium metadata to Kafka message headers",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message headers as additional header fields.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_headers_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ }
+ }
+ }
+ }
+ } ]
+ }
}
},
"additionalProperties" : true,
@@ -674,6 +1834,55 @@ objects:
"type" : "string",
"enum" : [ "JSON", "JSON without schema" ],
"default" : "JSON"
+ },
+ "insert_field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field to insert. Append the suffix !
for a required field, or ?
for an optional field.",
+ "type" : "string"
+ },
+ "apply_to" : {
+ "title" : "Apply to",
+ "description" : "Apply transformation to Kafka message key or message value.",
+ "type" : "string",
+ "enum" : [ "message key", "message value" ]
+ },
+ "kafka_connect_types" : {
+ "type" : "string",
+ "enum" : [ "string", "int8", "int16", "int32", "int64", "float32", "float64", "boolean" ]
+ },
+ "include_exclude_list" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "debezium_topic_router" : {
+ "type" : "object",
+ "additionalProperties" : false,
+ "required" : [ "topic", "replacement" ],
+ "properties" : {
+ "topic" : {
+ "type" : "string",
+ "format" : "regex"
+ },
+ "replacement" : {
+ "type" : "string",
+ "format" : "regex"
+ }
+ }
+ },
+ "debezium_metadata" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string",
+ "enum" : [ "op", "ts_ms", "source.ts_ms", "source.version", "source.connector", "source.db", "source.snapshot" ]
+ }
+ },
+ "debezium_metadata_prefix" : {
+ "title" : "Metadata field name prefix",
+ "description" : "Optional prefix for the added Debezium metadata fields.",
+ "type" : "string",
+ "default" : "__"
}
}
}
@@ -869,6 +2078,360 @@ objects:
"$ref" : "#/$defs/serializer"
}
}
+ },
+ "processors" : {
+ "type" : "array",
+ "items" : {
+ "oneOf" : [ {
+ "title" : "Cast Kafka message key or value",
+ "description" : "Cast an entire Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_message" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_message" : {
+ "type" : "object",
+ "required" : [ "apply_to", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "cast_to" : {
+ "title" : "Target data type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "cast_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "cast_fields" : {
+ "type" : "object",
+ "required" : [ "apply_to", "fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "A list of the field names and their target data type of the fields that you want to cast.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "field_name", "cast_to" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field in the payload whose value you want to cast to another data type.",
+ "type" : "string"
+ },
+ "cast_to" : {
+ "title" : "Target type",
+ "description" : "The target data type to cast to.",
+ "$ref" : "#/$defs/kafka_connect_types"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Cast payload fields",
+ "description" : "Cast specific field(s) in a Kafka message key or value to another data type.",
+ "type" : "object",
+ "required" : [ "insert_static_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_static_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field_name", "value" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Insert Kafka metadata field",
+ "description" : "Insert a field into the payload. For the field's value, select a Kafka record metadata type: topic name, partition number, or message timestamp.",
+ "type" : "object",
+ "required" : [ "insert_kafka_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "insert_kafka_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "kafka_value", "field_name" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "kafka_value" : {
+ "title" : "Kafka record metadata type",
+ "description" : "Select the Kafka record metadata (topic name, partition number, or message timestamp) to use for the value of the inserted field.",
+ "type" : "string",
+ "enum" : [ "topic name", "partition number", "message timestamp" ]
+ },
+ "field_name" : {
+ "$ref" : "#/$defs/insert_field_name"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Filter or rename fields",
+ "description" : "Filter out (excludes) or pass (includes) only the specified fields in the payload, and/or rename field names.",
+ "type" : "object",
+ "required" : [ "filter_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "filter_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to" ],
+ "anyOf" : [ {
+ "required" : [ "includes" ]
+ }, {
+ "required" : [ "excludes" ]
+ }, {
+ "required" : [ "rename_fields" ]
+ } ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "includes" : {
+ "title" : "Included fields",
+ "description" : "The list of field names of fields to include in the payload. Fields that are not listed are removed from the payload. Note that the \"Excluded fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "excludes" : {
+ "title" : "Excluded fields",
+ "description" : "The list of field names of fields to remove from the payload. The \"Exclude fields\" list takes precedence over the \"Included fields\" list.",
+ "$ref" : "#/$defs/include_exclude_list"
+ },
+ "rename_fields" : {
+ "title" : "Rename fields",
+ "description" : "Rename fields in the payload.",
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "required" : [ "old_name", "new_name" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "old_name" : {
+ "title" : "Old field name",
+ "description" : "The original label of the field that you want to rename in the payload.",
+ "type" : "string"
+ },
+ "new_name" : {
+ "title" : "New field name",
+ "description" : "The new name of the field.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Mask fields",
+ "description" : "Mask fields with a valid null value for the field type (for example: 0, false, empty string, etc) or with a replacement value (only for numeric and string field types).",
+ "type" : "object",
+ "required" : [ "mask_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "mask_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "fields" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "fields" : {
+ "title" : "Fields",
+ "description" : "The fields in the payload whose original values you want to replace with replacement
. If you do not specify a replacement value, the original value is replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "replacement" : {
+ "title" : "Replacement",
+ "description" : "When set, the values of the specified fields are replaced with this value which will be converted to the correct type. If no replacement value is specified, values are replaced with a valid null value for the field type (for example: 0, false, empty string, etc).",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Extract field",
+ "description" : "Extract a field from the message's key or value payload and replace the whole key or value with the value of the extracted field.",
+ "type" : "object",
+ "required" : [ "extract_field" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "extract_field" : {
+ "additionalProperties" : false,
+ "required" : [ "apply_to", "field" ],
+ "properties" : {
+ "apply_to" : {
+ "$ref" : "#/$defs/apply_to"
+ },
+ "field" : {
+ "title" : "Field to extract",
+ "description" : "The name of the field to extract from the payload and whose value replaces the entire key or value.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Message key from fields",
+ "description" : "Replace the message key with one or more fields from the message's value payload.",
+ "type" : "object",
+ "required" : [ "key_from_value_fields" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "key_from_value_fields" : {
+ "additionalProperties" : false,
+ "required" : [ "fields" ],
+ "properties" : {
+ "fields" : {
+ "title" : "Fields for new key",
+ "description" : "Name(s) of the field(s) in the payload to be used to replace the original message key.",
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Simple Debezium Topic Router (not enforcing message key uniqueness)",
+ "description" : "Simple Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router does not enforce message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_simple" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_simple" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression that evaluates the original destination topic name of each message that the connector emits to determine whether to route it to an alternate destination topic. The expression captures one or more groups of characters to apply in output_topic
.",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Unique Key Debezium Topic Router (enforcing message key uniqueness)",
+ "description" : "Debezium Topic Router for sending change events from multiple physical tables to a specified Kafka topic. The router enforces message key uniqueness. See: Topic Routing section in the Debezium docs.",
+ "type" : "object",
+ "required" : [ "debezium_topic_router_unique_keys" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "debezium_topic_router_unique_keys" : {
+ "additionalProperties" : false,
+ "required" : [ "topic_regex", "output_topic", "key_field_name" ],
+ "properties" : {
+ "topic_regex" : {
+ "title" : "Input topic regex",
+ "description" : "A regular expression on the original destination topic name of each message that determines if it should be routed to a another destination topic and captures one or more groups of characters to be applied in output_topic
",
+ "type" : "string",
+ "format" : "regex"
+ },
+ "output_topic" : {
+ "title" : "Output topic",
+ "description" : "A string that defines the destination topic name. The destination topic name can be static or dynamic. To define dynamic destination topic name(s) use the capture groups matched from topic_regex
.",
+ "type" : "string"
+ },
+ "key_field_name" : {
+ "title" : "Key field name",
+ "description" : "Specifies the name of the field to add to the message key. The field value identifies the name of the source table, based on the original destination topic name. Optionally, you can manipulate the key field by setting the key_field_topic_regex
and key_field_value
.",
+ "type" : "string",
+ "default" : "__dbz__physicalTableIdentifier"
+ },
+ "key_field_topic_regex" : {
+ "title" : "Key field topic regex",
+ "description" : "A regular expression that is applied to the original destination topic name to capture one or more groups of characters and then apply them in key_field_value
",
+ "type" : "string",
+ "format" : "regex",
+ "default" : "(.*)"
+ },
+ "key_field_value" : {
+ "title" : "Key field value",
+ "description" : "A regular expression replacement string that sets the value of the inserted key field based on the captured groups from key_field_topic_regex
.",
+ "type" : "string",
+ "default" : "$1"
+ }
+ }
+ }
+ }
+ }, {
+ "title" : "Convert to upsert record (extract new record state)",
+ "description" : "Convert the complex Debezium data format into a flat upsert record. This transformation will remove the Debezium \"envelope\" with its metadata from the message and will only keep the updated dataset/row.",
+ "type" : "object",
+ "required" : [ "convert_to_upsert" ],
+ "additionalProperties" : false,
+ "properties" : {
+ "convert_to_upsert" : {
+ "additionalProperties" : false,
+ "required" : [ "delete_handling_mode" ],
+ "properties" : {
+ "delete_handling_mode" : {
+ "title" : "Handle DELETE events",
+ "description" : "When set to \"tombstone\" for each DELETE operation a tombstone record will be send to Kafka. When set to \"drop\" DELETE event records will be removed from the stream and not send to Kafka.",
+ "type" : "string",
+ "default" : "tombstone",
+ "enum" : [ "tombstone", "drop" ]
+ },
+ "add_metadata_to_value" : {
+ "title" : "Add Debezium metadata to the Kafka message value",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message value as additional fields of the payload.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_value_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ },
+ "add_metadata_to_headers" : {
+ "title" : "Add Debezium metadata to Kafka message headers",
+ "description" : "Optionally select Debezium metadata that should be added to the Kafka message headers as additional header fields.",
+ "$ref" : "#/$defs/debezium_metadata"
+ },
+ "metadata_headers_prefix" : {
+ "$ref" : "#/$defs/debezium_metadata_prefix"
+ }
+ }
+ }
+ }
+ } ]
+ }
}
},
"additionalProperties" : true,
@@ -880,6 +2443,55 @@ objects:
"type" : "string",
"enum" : [ "JSON", "JSON without schema" ],
"default" : "JSON"
+ },
+ "insert_field_name" : {
+ "title" : "Field name",
+ "description" : "The name of the field to insert. Append the suffix !
for a required field, or ?
for an optional field.",
+ "type" : "string"
+ },
+ "apply_to" : {
+ "title" : "Apply to",
+ "description" : "Apply transformation to Kafka message key or message value.",
+ "type" : "string",
+ "enum" : [ "message key", "message value" ]
+ },
+ "kafka_connect_types" : {
+ "type" : "string",
+ "enum" : [ "string", "int8", "int16", "int32", "int64", "float32", "float64", "boolean" ]
+ },
+ "include_exclude_list" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ },
+ "debezium_topic_router" : {
+ "type" : "object",
+ "additionalProperties" : false,
+ "required" : [ "topic", "replacement" ],
+ "properties" : {
+ "topic" : {
+ "type" : "string",
+ "format" : "regex"
+ },
+ "replacement" : {
+ "type" : "string",
+ "format" : "regex"
+ }
+ }
+ },
+ "debezium_metadata" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string",
+ "enum" : [ "op", "ts_ms", "source.ts_ms", "source.version", "source.connector", "source.db", "source.snapshot" ]
+ }
+ },
+ "debezium_metadata_prefix" : {
+ "title" : "Metadata field name prefix",
+ "description" : "Optional prefix for the added Debezium metadata fields.",
+ "type" : "string",
+ "default" : "__"
}
}
}