PLUGIN-80 Confluent Cloud realtime Source and Sink plugins #70

Open · wants to merge 1 commit into base: develop
16 changes: 16 additions & 0 deletions README.md
@@ -26,6 +26,22 @@ To build this plugin:
The build will create a .jar and .json file under the ``target`` directory.
These files can be used to deploy your plugins.

Run tests
-----
To run all tests:

```
mvn clean test
```

System properties required for tests (see the example command below):

* **test.kafka_server** - Kafka broker instance address.
* **test.cluster_api_key** - Confluent API key.
* **test.cluster_api_secret** - Confluent API secret.
* **test.schema_registry_url** - Schema Registry URL.
* **test.schema_registry_api_key** - Schema Registry API key.
* **test.schema_registry_api_secret** - Schema Registry API secret.
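
Since these are standard JVM system properties, one way to supply them is with `-D` flags on the Maven command line. The values below are placeholders, not real endpoints or credentials:

```
mvn clean test \
  -Dtest.kafka_server=<broker-host>:9092 \
  -Dtest.cluster_api_key=<cluster-api-key> \
  -Dtest.cluster_api_secret=<cluster-api-secret> \
  -Dtest.schema_registry_url=https://<schema-registry-host> \
  -Dtest.schema_registry_api_key=<sr-api-key> \
  -Dtest.schema_registry_api_secret=<sr-api-secret>
```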

Deployment
----------
You can deploy your plugins using the CDAP CLI:
77 changes: 77 additions & 0 deletions confluent-kafka-plugins/docs/Confluent-sparksink.md
@@ -0,0 +1,77 @@
# Confluent Streaming Sink


Description
-----------
This sink writes data to Confluent.
It sends one message to the specified Kafka topic for each received record. It can also be
configured to partition the events being written to Kafka based on a configurable key.
The sink can also be configured to operate in sync or async mode and to apply different
compression types to events.
It can be used with a self-managed Confluent Platform or with Confluent Cloud, and supports Schema Registry.


Properties
----------
**Reference Name:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc.

**Kafka Brokers:** List of Kafka brokers specified in host1:port1,host2:port2 form. (Macro-enabled)

**Kafka Topic:** The Kafka topic to read from. (Macro-enabled)

**Async:** Specifies whether an acknowledgment is required from the broker that the message was received. Defaults to No.

**Compression Type:** Compression type to be applied on message.

**Time Field:** Optional name of the field containing the read time of the message.
If this is not set, the message will be sent with the current timestamp.
If set, this field must be present in the input schema and must be a long.

**Key Field:** Optional name of the field containing the message key.
If this is not set, the message will be sent without a key.
If set, this field must be present in the schema property and must be of type bytes.

**Partition Field:** Optional name of the field containing the partition the message should be written to.
If this is not set, the default partition will be used for all messages.
If set, this field must be present in the schema property and must be an int.
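
For illustration only, assuming **Time Field** is set to `ts`, **Key Field** to `key`, and **Partition Field** to `partition`, a hypothetical input schema satisfying the three optional properties above might look like the following Avro-style record (the field names are examples chosen for this sketch, not plugin defaults):

```json
{
  "type": "record",
  "name": "input",
  "fields": [
    {"name": "ts", "type": "long"},
    {"name": "key", "type": "bytes"},
    {"name": "partition", "type": "int"},
    {"name": "message", "type": "string"}
  ]
}
```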

**Message Format:** Optional format that structured records should be converted to.
Required if the plugin is used without Schema Registry.

**Additional Kafka Producer Properties:** Additional Kafka producer properties to set.

**Cluster API Key:** The Confluent API Key used for the sink.

**Cluster API Secret:** The Confluent API Secret used for the sink.

**Schema Registry URL:** The Schema Registry endpoint URL.

**Schema Registry API Key:** The Schema Registry API Key.

**Schema Registry API Secret:** The Schema Registry API Secret.

Example
-------
This example writes structured records to the Kafka topic 'alarm' in an asynchronous manner
using the compression type 'gzip'. The events are written in CSV format to the brokers
host1.example.com:9092 and host2.example.com:9092. The Kafka partition is decided based on the provided key field 'message'.
Additional properties such as the number of acknowledgements and the client id can also be provided.

```json
{
  "name": "Confluent",
  "type": "batchsink",
  "properties": {
    "referenceName": "Kafka",
    "brokers": "host1.example.com:9092,host2.example.com:9092",
    "topic": "alarm",
    "async": "true",
    "compressionType": "gzip",
    "format": "CSV",
    "kafkaProperties": "acks:2,client.id:myclient",
    "key": "message",
    "clusterApiKey": "",
    "clusterApiSecret": ""
  }
}
```
132 changes: 132 additions & 0 deletions confluent-kafka-plugins/docs/Confluent-streamingsource.md
@@ -0,0 +1,132 @@
# Confluent Streaming Source


Description
-----------
This source reads data from Confluent.
It emits one record per message from the specified Kafka topic.
It can be used with a self-managed Confluent Platform or with Confluent Cloud, and supports Schema Registry.

The source can be configured to parse message values in the following ways:
1. User-defined format. Use the **Message Format** field to choose any format supported by CDAP.
1. Schema Registry. Requires credentials for Schema Registry to be specified.
Uses Avro schemas to deserialize Kafka messages. Use the **Get Schema** button to fetch the key and value schemas from the registry.
1. Binary format. Used if neither a message format nor Schema Registry credentials are provided.


Properties
----------
**Reference Name:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.

**Kafka Brokers:** List of Kafka brokers specified in host1:port1,host2:port2 form. (Macro-enabled)

**Kafka Topic:** The Kafka topic to read from. (Macro-enabled)

**Topic Partitions:** List of topic partitions to read from. If not specified, all partitions will be read. (Macro-enabled)

**Default Initial Offset:** The default initial offset for all topic partitions.
An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Defaults to -1.
Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read.
If you wish to set different initial offsets for different partitions, use the initialPartitionOffsets property. (Macro-enabled)

**Initial Partition Offsets:** The initial offset for each topic partition. If this is not specified,
all partitions will use the same initial offset, which is determined by the defaultInitialOffset property.
Any partitions specified in the partitions property, but not in this property will use the defaultInitialOffset.
An offset of -2 means the smallest offset. An offset of -1 means the latest offset.
Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. (Macro-enabled)

**Time Field:** Optional name of the field containing the read time of the batch.
If this is not set, no time field will be added to output records.
If set, this field must be present in the schema property and must be a long.

**Key Field:** Optional name of the field containing the message key.
If this is not set, no key field will be added to output records.
If set, this field must be present in the schema property and must be bytes.

**Partition Field:** Optional name of the field containing the partition the message was read from.
If this is not set, no partition field will be added to output records.
If set, this field must be present in the schema property and must be an int.

**Offset Field:** Optional name of the field containing the partition offset the message was read from.
If this is not set, no offset field will be added to output records.
If set, this field must be present in the schema property and must be a long.

**Message Format:** Optional format of the Kafka event message. Any format supported by CDAP is supported.
For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values.
If no format is given, Kafka message payloads will be treated as bytes.

**Max Rate Per Partition:** Maximum number of records to read per second per partition. Defaults to 1000.

**Additional Kafka Consumer Properties:** Additional Kafka consumer properties to set.

**Cluster API Key:** The Confluent API Key used for the source.

**Cluster API Secret:** The Confluent API Secret used for the source.

**Schema Registry URL:** The Schema Registry endpoint URL.

**Schema Registry API Key:** The Schema Registry API Key.

**Schema Registry API Secret:** The Schema Registry API Secret.

**Value Field:** The name of the field containing the message value. Required to fetch schema from Schema Registry.

**Schema:** Output schema of the source. If you would like the output records to contain a field with the
Kafka message key, the schema must include a field of type bytes/nullable bytes or string/nullable string, and you must
set the **Key Field** property to that field's name. Similarly, if you would like the output records to contain a field
with the timestamp of when the record was read, the schema must include a field of type long or nullable long, and you
must set the **Time Field** property to that field's name. Any field that is not the **Time Field** or **Key Field**
will be used in conjunction with the format to parse Kafka message payloads. If used with Schema Registry, the schema
should be fetched using the **Get Schema** button.

Example
-------
***Example 1:*** Read from the 'purchases' topic of a Kafka instance running
on brokers host1.example.com:9092 and host2.example.com:9092. The source will add
a time field named 'readTime' that contains a timestamp corresponding to the micro
batch in which the record was read, as well as a field named 'key' containing
the message key. It parses the Kafka messages using the 'csv' format,
with 'user', 'item', 'count', and 'price' as the message schema.

```json
{
  "name": "Confluent",
  "type": "streamingsource",
  "properties": {
    "topics": "purchases",
    "brokers": "host1.example.com:9092,host2.example.com:9092",
    "format": "csv",
    "timeField": "readTime",
    "keyField": "key",
    "clusterApiKey": "",
    "clusterApiSecret": "",
    "defaultInitialOffset": "-2",
    "schema": "{
      \"type\":\"record\",
      \"name\":\"purchase\",
      \"fields\":[
        {\"name\":\"readTime\",\"type\":\"long\"},
        {\"name\":\"key\",\"type\":\"bytes\"},
        {\"name\":\"user\",\"type\":\"string\"},
        {\"name\":\"item\",\"type\":\"string\"},
        {\"name\":\"count\",\"type\":\"int\"},
        {\"name\":\"price\",\"type\":\"double\"}
      ]
    }"
  }
}
```

For each Kafka message read, it will output a record with the schema:

| field name | type |
| ----------- | ---------------- |
| readTime | long |
| key | bytes |
| user | string |
| item | string |
| count | int |
| price | double |

Note that the readTime field is not derived from the Kafka message, but from the time that the
message was read.