The Google Cloud Pub/Sub Group Kafka Connector library provides Google Cloud Platform (GCP) first-party connectors for Pub/Sub products with Kafka Connect. You can use the library to transmit data from Apache Kafka to Cloud Pub/Sub or Pub/Sub Lite and vice versa.
- CloudPubSubSinkConnector is a sink connector that reads records from Kafka and publishes them to Cloud Pub/Sub.
- CloudPubSubSourceConnector is a source connector that reads messages from Cloud Pub/Sub and writes them to Kafka.
- PubSubLiteSinkConnector is a sink connector that reads records from Kafka and publishes them to Pub/Sub Lite.
- PubSubLiteSourceConnector is a source connector that reads messages from Pub/Sub Lite and writes them to Kafka.
You must have a GCP project in order to use Cloud Pub/Sub or Pub/Sub Lite.
Follow these setup steps for Pub/Sub before doing the quickstart.
Follow these setup steps for Pub/Sub Lite before doing the quickstart.
For general information on how to authenticate with GCP when using the Google Cloud Pub/Sub Group Kafka Connector library, please visit Provide credentials for Application Default Credentials.
In this quickstart, you will learn how to send data from a Kafka topic to a Pub/Sub or Pub/Sub Lite topic and vice versa, using Kafka Connect running locally in standalone mode (single process).
- Follow the Kafka quickstart to download Kafka, start the Kafka environment, and create a Kafka topic.
  Note: Use the same Kafka API major version as the connector. Otherwise, the connector may not work properly. Check the Kafka version used by the connector in pom.xml.
- Acquire the connector jar.
- Update your Kafka Connect configurations.
  Open /config/connect-standalone.properties in the Kafka download folder. Add the filepath of the downloaded connector jar to plugin.path and uncomment the line if needed. In addition, because the connector uses Kafka Connect in standalone mode, include offset.storage.file.filename with a valid filename to store offset data in.
- Create a Pub/Sub or Pub/Sub Lite topic and subscription pair.
  - CloudPubSubSinkConnector and CloudPubSubSourceConnector: create a Pub/Sub topic and subscription pair.
  - PubSubLiteSinkConnector and PubSubLiteSourceConnector: create a Pub/Sub Lite topic and subscription pair.
- Update the connector configurations.
  Open the connector configuration files at /config. Update variables labeled TODO (developer) with appropriate input.
  - CloudPubSubSinkConnector
    - Open cps-sink-connector.properties.
    - Update topics, cps.project, and cps.topic.
  - CloudPubSubSourceConnector
    - Open cps-source-connector.properties.
    - Update kafka.topic, cps.project, and cps.subscription.
  - PubSubLiteSinkConnector
    - Open pubsub-lite-sink-connector.properties.
    - Update topics, pubsublite.project, pubsublite.location, and pubsublite.topic.
  - PubSubLiteSourceConnector
    - Open pubsub-lite-source-connector.properties.
    - Update kafka.topic, pubsublite.project, pubsublite.location, and pubsublite.subscription.
- Run the following command to start the appropriate sink or source connector. You can run multiple connector tasks at the same time.
  > bin/connect-standalone.sh \
      config/connect-standalone.properties \
      path/to/pubsub/sink/connector.properties [source.connector.properties ...]
- Test the connector.
  - CloudPubSubSinkConnector
    - Follow the instructions in the Kafka quickstart to publish a message to the Kafka topic.
    - Pull the message from your Pub/Sub subscription.
  - CloudPubSubSourceConnector
    - Publish a message to your Pub/Sub topic.
    - Follow the instructions in the Kafka quickstart to read the message from your Kafka topic.
  - PubSubLiteSinkConnector
    - Follow the instructions in the Kafka quickstart to publish a message to the Kafka topic.
    - Pull the message from your Pub/Sub Lite subscription.
  - PubSubLiteSourceConnector
    - Publish a message to your Pub/Sub Lite topic.
    - Follow the instructions in the Kafka quickstart to read the message from your Kafka topic.
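As a rough sketch of the CloudPubSubSinkConnector test in the last step, the two helper functions below wrap the Kafka console producer and the gcloud CLI. The topic name, subscription name, and bootstrap address are hypothetical placeholders, and the functions assume that you run them from the Kafka download folder with gcloud installed and authenticated.

```shell
# Hypothetical names; substitute your own topic and subscription.
KAFKA_TOPIC="my-kafka-topic"
PUBSUB_SUBSCRIPTION="my-subscription"
BOOTSTRAP_SERVER="localhost:9092"   # address used by the Kafka quickstart

# Publish one record to the Kafka topic (run from the Kafka download folder).
publish_to_kafka() {
  printf '%s\n' "$1" | bin/kafka-console-producer.sh \
    --topic "$KAFKA_TOPIC" --bootstrap-server "$BOOTSTRAP_SERVER"
}

# Pull and acknowledge one message from the Pub/Sub subscription.
pull_from_pubsub() {
  gcloud pubsub subscriptions pull "$PUBSUB_SUBSCRIPTION" --auto-ack --limit=1
}
```

With the sink connector running, `publish_to_kafka "hello"` followed by `pull_from_pubsub` should show the message after a short delay.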
The connector is available from the Maven Central repository. Select the latest version of the connector, then download "jar" from the Downloads dropdown menu.
You can also build the connector from head.
Refer to the Kafka User Guide for general information on running connectors using Kafka Connect.
To run this connector using Kafka Connect in standalone mode, follow these steps:
- Copy the connector jar to where you will run Kafka Connect.
- Create a configuration file for your Kafka Connect instance. Make sure to include the filepath to the connector jar in plugin.path.
- Make a copy of the connector configuration files at /config and update the configuration options accordingly.
- Start Kafka Connect with your connector with the following command:
  > bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
- If running Kafka Connect behind a proxy, export the KAFKA_OPTS variable with options for connecting around the proxy.
  > export KAFKA_OPTS="-Dhttp.proxyHost=<host> -Dhttp.proxyPort=<port> -Dhttps.proxyHost=<host> -Dhttps.proxyPort=<port>"
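The worker configuration mentioned in the steps above could look roughly like the following. This is a minimal sketch: the jar location and offsets file are hypothetical paths, and the converter choices are an assumption that fits byte-oriented payloads. The script writes to a temporary file instead of overwriting a real connect-standalone.properties.

```shell
# Hypothetical paths; adjust to your environment.
CONNECTOR_JAR="/opt/connectors/pubsub-group-kafka-connector.jar"
WORKER_CONFIG="$(mktemp)"   # stand-in for config/connect-standalone.properties

cat > "$WORKER_CONFIG" <<EOF
bootstrap.servers=localhost:9092
# Converters are an assumption; pick ones that match your data.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Location of the connector jar.
plugin.path=$CONNECTOR_JAR
# Required in standalone mode: file in which offsets are stored.
offset.storage.file.filename=/tmp/connect.offsets
EOF

echo "Wrote worker config to $WORKER_CONFIG"
```

Pass the resulting file as the first argument to bin/connect-standalone.sh, followed by one or more connector configuration files.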
When running the connector on a Kafka cluster in distributed mode, "the connector configurations are not passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors" (Kafka User Guide).
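In distributed mode, a connector is typically created by sending a JSON definition to the Kafka Connect REST API (`POST /connectors`, port 8083 by default). The sketch below builds such a definition for the Pub/Sub sink connector. The connector name, topic names, and project are hypothetical, and the `connector.class` value is an assumption about the library's packaging that you should confirm against the sample files in /config.

```shell
CONNECT_URL="http://localhost:8083"   # default Kafka Connect REST address
PAYLOAD="$(mktemp)"

# Connector definition; all values below are placeholders.
cat > "$PAYLOAD" <<'EOF'
{
  "name": "cps-sink-example",
  "config": {
    "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    "tasks.max": "1",
    "topics": "my-kafka-topic",
    "cps.project": "my-project",
    "cps.topic": "my-pubsub-topic"
  }
}
EOF

# Send the definition to a running distributed worker.
create_connector() {
  curl -sS -X POST -H "Content-Type: application/json" \
    --data @"$1" "$CONNECT_URL/connectors"
}
```

Calling `create_connector "$PAYLOAD"` against a running worker registers the connector; the same endpoint family is used to modify and delete it.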
In addition to the Kafka Connect configurations supplied by the Kafka Connect API, the Pub/Sub connector supports the following configurations:
Source connector:

Config | Value Range | Default | Description |
---|---|---|---|
cps.subscription | String | REQUIRED (No default) | The Pub/Sub subscription ID, e.g. "baz" for subscription "/projects/bar/subscriptions/baz". |
cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub subscription, e.g. "bar" from above. |
cps.endpoint | String | "pubsub.googleapis.com:443" | The Pub/Sub endpoint to use. |
kafka.topic | String | REQUIRED (No default) | The Kafka topic which will receive messages from the Pub/Sub subscription. |
cps.maxBatchSize | Integer | 100 | The maximum number of messages per batch in a pull request to Pub/Sub. |
cps.makeOrderingKeyAttribute | Boolean | false | When true, copy the ordering key to the set of attributes set in the Kafka message. |
kafka.key.attribute | String | null | The Pub/Sub message attribute to use as a key for messages published to Kafka. If set to "orderingKey", use the message's ordering key. |
kafka.partition.count | Integer | 1 | The number of partitions in the Kafka topic to which messages will be published. NOTE: this parameter is ignored if the partition scheme is "kafka_partitioner". |
kafka.partition.scheme | round_robin, hash_key, hash_value, kafka_partitioner, ordering_key | round_robin | The scheme for assigning a message to a partition in Kafka. The "round_robin" scheme assigns partitions in a round-robin fashion, while the "hash_key" and "hash_value" schemes find the partition by hashing the message key and message value respectively. The "kafka_partitioner" scheme delegates partitioning logic to the Kafka producer, which by default detects the number of partitions automatically and performs either murmur-hash-based partition mapping or round robin, depending on whether a message key is provided. The "ordering_key" scheme uses the hash code of a message's ordering key. If no ordering key is present, "round_robin" is used. |
gcp.credentials.file.path | String | Optional | The path to the file that stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
kafka.record.headers | Boolean | false | Use Kafka record headers to store Pub/Sub message attributes. |
cps.streamingPull.enabled | Boolean | false | Whether to use streaming pull for the connector to connect to Pub/Sub. If enabled, cps.maxBatchSize is ignored. |
cps.streamingPull.flowControlMessages | Long | 1,000 | The maximum number of outstanding messages per task when using streaming pull. |
cps.streamingPull.flowControlBytes | Long | 100L * 1024 * 1024 (100 MiB) | The maximum number of outstanding message bytes per task when using streaming pull. |
cps.streamingPull.parallelStreams | Integer | 1 | The number of parallel streams to open per task when using streaming pull. |
cps.streamingPull.maxAckExtensionMs | Long | 0 | The maximum number of milliseconds the subscribe deadline will be extended to when using streaming pull. A value of 0 implies the java-pubsub library default value. |
cps.streamingPull.maxMsPerAckExtension | Long | 0 | The maximum number of milliseconds to extend the subscribe deadline for at a time when using streaming pull. A value of 0 implies the java-pubsub library default value. |
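To illustrate how several of these source options combine, here is a hedged sketch that writes a fragment of a source connector configuration to a temporary file. All values are placeholders, and the fragment is meant to be merged into a copy of cps-source-connector.properties, which supplies the remaining Kafka Connect settings.

```shell
SOURCE_CONFIG="$(mktemp)"   # stand-in for a copy of cps-source-connector.properties

cat > "$SOURCE_CONFIG" <<'EOF'
# Required settings (placeholder values).
kafka.topic=my-kafka-topic
cps.project=my-project
cps.subscription=my-subscription
# Use the Pub/Sub ordering key as the Kafka key and partition by its hash.
kafka.key.attribute=orderingKey
kafka.partition.scheme=hash_key
# Should match the partition count of the Kafka topic.
kafka.partition.count=3
# Switch to streaming pull with a tighter message limit than the default.
cps.streamingPull.enabled=true
cps.streamingPull.flowControlMessages=500
EOF

echo "Wrote source connector settings to $SOURCE_CONFIG"
```

Because streaming pull is enabled here, cps.maxBatchSize is left out; the table above notes that it is ignored in that mode.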

Sink connector:

Config | Value Range | Default | Description |
---|---|---|---|
cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
cps.endpoint | String | "pubsub.googleapis.com:443" | The Pub/Sub endpoint to use. |
maxBufferSize | Integer | 100 | The maximum number of messages that can be buffered for a topic partition before publishing them to Pub/Sub. |
maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be buffered for a topic partition before publishing them to Pub/Sub. |
maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
maxRequestTimeoutMs | Integer | 10,000 | The timeout for individual publish requests to Pub/Sub. |
maxTotalTimeoutMs | Integer | 60,000 | The total timeout for a call to publish (including retries) to Pub/Sub. |
maxShutdownTimeoutMs | Integer | 60,000 | The maximum amount of time to wait for a publisher to shutdown when stopping task in Kafka Connect. |
gcp.credentials.file.path | String | Optional | The path to the file that stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. |
gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", use a message's key as the ordering key. When set to "partition", convert the partition number to a String and use that as the ordering key. Note that "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
enableCompression | Boolean | false | When true, enable publish-side compression in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of publish request (in bytes) to compress. |
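A sink configuration that uses a few of the optional settings might look like the sketch below. The values are placeholders chosen for illustration, not recommendations, and the fragment is meant to be merged into a copy of cps-sink-connector.properties.

```shell
SINK_CONFIG="$(mktemp)"   # stand-in for a copy of cps-sink-connector.properties

cat > "$SINK_CONFIG" <<'EOF'
# Required settings (placeholder values).
topics=my-kafka-topic
cps.project=my-project
cps.topic=my-pubsub-topic
# Use the Kafka record key as the Pub/Sub ordering key.
orderingKeySource=key
# Attach Kafka topic, partition, offset, and timestamp, plus record headers, as attributes.
metadata.publish=true
headers.publish=true
# Compress publish requests of at least 1024 bytes.
enableCompression=true
compressionBytesThreshold=1024
EOF

echo "Wrote sink connector settings to $SINK_CONFIG"
```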
In addition to the Kafka Connect configurations supplied by the Kafka Connect API, the Pub/Sub Lite connector supports the following configurations:
Source connector:

Config | Value Range | Default | Description |
---|---|---|---|
pubsublite.subscription | String | REQUIRED (No default) | The Pub/Sub Lite subscription ID, e.g. "baz" for the subscription "/projects/bar/locations/europe-south7-q/subscriptions/baz". |
pubsublite.project | String | REQUIRED (No default) | The project containing the Pub/Sub Lite subscription, e.g. "bar" from above. |
pubsublite.location | String | REQUIRED (No default) | The location of the Pub/Sub Lite subscription, e.g. "europe-south7-q" from above. |
kafka.topic | String | REQUIRED (No default) | The Kafka topic which will receive messages from Pub/Sub Lite. |
pubsublite.partition_flow_control.messages | Long | Long.MAX_VALUE | The maximum number of outstanding messages per Pub/Sub Lite partition. |
pubsublite.partition_flow_control.bytes | Long | 20,000,000 | The maximum number of outstanding bytes per Pub/Sub Lite partition. |

Sink connector:

Config | Value Range | Default | Description |
---|---|---|---|
pubsublite.topic | String | REQUIRED (No default) | The Pub/Sub Lite topic ID, e.g. "foo" for topic "/projects/bar/locations/europe-south7-q/topics/foo". |
pubsublite.project | String | REQUIRED (No default) | The project containing the Pub/Sub Lite topic, e.g. "bar" from above. |
pubsublite.location | String | REQUIRED (No default) | The location of the Pub/Sub Lite topic, e.g. "europe-south7-q" from above. |
The following configurations are shared by the Pub/Sub and Pub/Sub Lite connectors.
Config | Value Range | Default | Description |
---|---|---|---|
gcp.credentials.file.path | String | Optional | The path to the file that stores GCP credentials. If not defined, the environment variable GOOGLE_APPLICATION_CREDENTIALS is used. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
The message data field of PubSubMessage
is a ByteString
object that translates well to and from the byte[]
bodies of Kafka messages.
We recommend using a converter that produces primitive data types (i.e. integer,
float, string, or bytes types) where possible to avoid deserializing and
re-serializing the same message body.
Additionally, a Pub/Sub message cannot exceed 10 MB. We recommend checking your
message.max.bytes
configuration to prevent possible errors.
The sink connector handles message conversion in the following way:
- Integer, float, string, and bytes types in Kafka messages are passed directly into the Pub/Sub message body as bytes.
- Map and struct types in Kafka messages are stored as Pub/Sub attributes. Pub/Sub attributes only support string-to-string mapping. To make the connector as versatile as possible, the toString() method is called on objects passed in as the key or value for a map and the value for a struct.
  - If you set the messageBodyName configuration to a struct field or map key, the value of the struct field or the map value (of integer, byte, float, and array type) will be stored in the Pub/Sub message body as bytes.
- Only primitive array types are supported due to potential collisions of field names or keys of a struct/map array. The connector handles arrays in a fairly predictable fashion, where values are concatenated into a ByteString object.
- The record-level key for a Kafka message is stored in Pub/Sub message attributes as a string, with the name "key".
Note: Pub/Sub message attributes have the following limitations:
- Attributes per message: 100
- Attribute key size: 256 bytes
- Attribute value size: 1024 bytes
The connector will transform Kafka record-level message headers that meet these limitations and ignore those that don't.
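The limits above can be checked ahead of time for a given header. The helper below is an illustrative sketch only, not the connector's own logic: the function name is hypothetical, and it treats the stated sizes as inclusive upper bounds measured in bytes.

```shell
# Limits quoted in the note above.
MAX_KEY_BYTES=256
MAX_VALUE_BYTES=1024

# Succeeds (exit status 0) when both the key and the value fit within the limits.
attribute_fits() {
  key_bytes=$(printf '%s' "$1" | wc -c | tr -d ' ')
  value_bytes=$(printf '%s' "$2" | wc -c | tr -d ' ')
  [ "$key_bytes" -le "$MAX_KEY_BYTES" ] && [ "$value_bytes" -le "$MAX_VALUE_BYTES" ]
}
```

For example, `attribute_fits "trace-id" "abc123"` succeeds, while a value longer than 1024 bytes makes the function return a non-zero status, which corresponds to a header the connector would ignore.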
The source connector handles the conversion from a Pub/Sub message into a Kafka SourceRecord in a similar way:
- The source connector searches the attributes of a Pub/Sub message for the attribute named by kafka.key.attribute. If found, its value is used as the Kafka message key, as a string. Otherwise, the Kafka message key is set to null.
- If a Pub/Sub message has no attributes, the message body is stored as a byte[] object for the Kafka message value.
- If a Pub/Sub message contains attributes other than kafka.key.attribute, the Kafka message value is assigned a struct schema. The message attribute keys become struct field names, and the corresponding attribute values become the values of those struct fields. The message body is transformed into a struct field named message, of type bytes.
  - To carry forward the structure of data stored in Pub/Sub message attributes, we recommend using a converter that represents a struct schema type, like JsonConverter.
Pub/Sub Lite messages have the following structure:
class Message {
  ByteString key;
  ByteString data;
  ListMultimap<String, ByteString> attributes;
  Optional<Timestamp> eventTime;
}
This table shows how each field in a Kafka SinkRecord
maps to a Pub/Sub
Lite message by the sink connector:
SinkRecord | Message |
---|---|
key{Schema} | key |
value{Schema} | data |
headers | attributes |
topic | attributes["x-goog-pubsublite-source-kafka-topic"] |
kafkaPartition | attributes["x-goog-pubsublite-source-kafka-partition"] |
kafkaOffset | attributes["x-goog-pubsublite-source-kafka-offset"] |
timestamp | eventTime |
timestampType | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
When a key, value, or header value with a schema is encoded as a ByteString, the following logic is used:
- Null schemas are treated as Schema.STRING_SCHEMA
- Top-level BYTES payloads are unmodified
- Top-level STRING payloads are encoded using copyFromUtf8
- Top-level Integral payloads are converted using copyFromUtf8(Long.toString(x.longValue()))
- Top-level Floating point payloads are converted using copyFromUtf8(Double.toString(x.doubleValue()))
- All other payloads are encoded into a protobuf Value, then converted to a ByteString
  - Nested STRING fields are encoded into a protobuf Value
  - Nested BYTES fields are encoded to a protobuf Value holding the base64 encoded bytes
  - Nested Numeric fields are encoded as a double into a protobuf Value
  - Maps with Array, Map, or Struct keys are not supported
  - BYTES keys in maps are base64 encoded
  - Integral keys are converted using Long.toString(x.longValue())
  - Floating point keys are converted using Double.toString(x.doubleValue())
The source connector performs a one-to-one mapping from SequencedMessage fields to their Kafka SourceRecord counterparts.
Pub/Sub Lite messages with an empty message.key field have their key converted to null and are assigned to Kafka partitions using the round-robin scheme. Messages with identical, non-empty keys are routed to the same Kafka partition.
SequencedMessage | SourceRecord field | SourceRecord schema |
---|---|---|
message.key | key | BYTES |
message.data | value | BYTES |
message.attributes | headers | BYTES |
<source topic> | sourcePartition["topic"] | String field in map |
<source partition> | sourcePartition["partition"] | Integer field in map |
cursor.offset | sourceOffset["offset"] | Long field in map |
message.event_time | timestamp | long milliseconds since unix epoch if present |
publish_time | timestamp | long milliseconds since unix epoch if no event_time exists |
These instructions assume you are using Maven.
- Clone this repository:
  > git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
- Package the connector jar:
  > mvn clean package -DskipTests=True
  You should see the resulting jar at target/pubsub-group-kafka-connector-${VERSION}-SNAPSHOT.jar on success.
This library follows Semantic Versioning.
Contributions to this library are always welcome and highly encouraged.
See CONTRIBUTING for more information on how to get started.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.
Apache 2.0 - See LICENSE for more information.