This project provides a Solace PubSub+ Event Broker to Kafka Source Connector (adapter) that makes use of the Kafka Connect API.
Note: there is also a PubSub+ Kafka Sink Connector available from the Solace PubSub+ Connector for Kafka: Sink GitHub repository.
Contents:
The PubSub+ Kafka Source Connector consumes PubSub+ event broker real-time queue or topic data events and streams them to a Kafka topic as Source Records.
The connector was created using PubSub+ high performance Java API to move PubSub+ data events to the Kafka Broker.
Unlike many other message brokers, the Solace PubSub+ Event Broker supports transparent protocol and API messaging transformations.
As the following diagram shows, any message that reaches the PubSub+ broker via the many supported message transports or language/API (examples can include an iPhone via C API, a REST POST, an AMQP, JMS or MQTT message) can be moved to a Topic (Key or not Keyed) on the Kafka broker via the single PubSub+ Source Connector.
The PubSub+ Event Mesh is a clustered group of PubSub+ Event Brokers, which appears to individual services (consumers or producers of data events) to be a single transparent event broker. The Event Mesh routes data events in real-time to any of its client services. The Solace PubSub+ brokers can be any of the three categories: dedicated extreme performance hardware appliances, high performance software brokers that are deployed as software images (deployable under most Hypervisors, Cloud IaaS and PaaS layers and in Docker) or provided as a fully-managed Cloud MaaS (Messaging as a Service).
Simply by having the PubSub+ Source Connector register interest in receiving events, the entire Event Mesh becomes aware of the registration request and will know how to securely route the appropriate events generated by other service on the Event Mesh to the PubSub+ Source Connector. The PubSub+ Source Connector takes those event messages and sends them as Kafka Source Records to the Kafka broker for storage in a Kafka Topic, regardless where in the Service Mesh the service is located that generated the event.
The PubSub+ Source Connector eliminates the complexity and overhead of maintaining separate Source Connectors for each and every upstream service that generates data events that Kafka may wish to consume. There is the added benefit of access to services where there is no Kafka Source Connector available, thereby eliminating the need to create and maintain a new connector for services from which Kafka may wish to store the data.
The PubSub+ Kafka Source Connector is available as a ZIP or TAR package from the downloads page.
The package includes jar libraries, documentation with license information and sample property files. Download and extract it into a directory that is on the plugin.path
of your connect-standalone
or connect-distributed
properties file.
This example demonstrates an end-to-end scenario similar to the Protocol and API messaging transformations use case, using the WebSocket API to publish a message to the PubSub+ event broker.
It builds on the open source Apache Kafka Quickstart tutorial and walks through getting started in a standalone environment for development purposes. For setting up a distributed environment for production purposes, refer to the User Guide section.
Note: The steps are similar if using Confluent Kafka; there may be difference in the root directory where the Kafka binaries (bin
) and properties (etc/kafka
) are located.
Steps
-
Install Kafka. Follow the Apache tutorial to download the Kafka release code, start the Zookeeper and Kafka servers in separate command line sessions, then create a topic named
test
and verify it exists. -
Install PubSub+ Source Connector. Designate and create a directory for the PubSub+ Source Connector - assuming it is named
connectors
. Editconfig/connect-standalone.properties
and ensure theplugin.path
parameter value includes the absolute path of theconnectors
directory. Download and extract the PubSub+ Source Connector into theconnectors
directory. -
Acquire access to a PubSub+ message broker. If you don't already have one available, the easiest option is to get a free-tier service in a few minutes in PubSub+ Cloud , following the instructions in Creating Your First Messaging Service.
-
Configure the PubSub+ Source Connector:
a) Locate the following connection information of your messaging service for the "Solace Java API" (this is what the connector is using inside):
- Username
- Password
- Message VPN
- one of the Host URIs
b) edit the PubSub+ Source Connector properties file located at
connectors/pubsubplus-connector-kafka-source-<version>/etc/solace_source.properties
updating following respective parameters so the connector can access the PubSub+ event broker:sol.username
sol.password
sol.vpn_name
sol.host
Note: In the configured source and destination information, the
sol.topics
parameter specifies the ingress topic on PubSub+ (sourcetest
) andkafka.topic
is the Kafka destination topic (test
), created in Step 1. -
Start the connector in standalone mode. In a command line session run:
bin/connect-standalone.sh \ config/connect-standalone.properties \ connectors/pubsubplus-connector-kafka-source-<version>/etc/solace_source.properties
After startup, the logs will eventually contain following line:
================Session is Connected
-
Start to watch messages arriving to Kafka. See the instructions in the Kafka tutorial to start a consumer on the
test
topic. -
Demo time! To generate an event into PubSub+, we use the "Try Me!" test service of the browser-based administration console to publish test messages to the
sourcetest
topic. Behind the scenes, "Try Me!" uses the JavaScript WebSocket API.-
If you are using PubSub+ Cloud for your messaging service follow the instructions in Trying Out Your Messaging Service.
-
If you are using an existing event broker, log in to its PubSub+ Manager admin console and follow the instructions in How to Send and Receive Test Messages.
In both cases, ensure to set the topic to
sourcetest
, which the connector is listening to.The Kafka consumer from Step 6 should now display the new message arriving to Kafka through the PubSub+ Kafka Source Connector:
Hello world!
-
The Connector parameters consist of Kafka-defined parameters and PubSub+ connector-specific parameters.
Refer to the in-line documentation of the sample PubSub+ Kafka Source Connector properties file and additional information in the Configuration section.
The PubSub+ Source Connector deployment has been tested on Apache Kafka 3.5 and Confluent Kafka 7.4 platforms. The Kafka software is typically placed under the root directory: /opt/<provider>/<kafka or confluent-version>
.
Kafka distributions may be available as install bundles, Docker images, Kubernetes deployments, etc. They all support Kafka Connect which includes the scripts, tools and sample properties for Kafka connectors.
Kafka provides two options for connector deployment: standalone mode and distributed mode.
-
In standalone mode, recommended for development or testing only, configuration is provided together in the Kafka
connect-standalone.properties
and in the PubSub+ Source Connectorsolace_source.properties
files and passed to theconnect-standalone
Kafka shell script running on a single worker node (machine), as seen in the Quick Start. -
In distributed mode, Kafka configuration is provided in
connect-distributed.properties
and passed to theconnect-distributed
Kafka shell script, which is started on each worker node. Thegroup.id
parameter identifies worker nodes belonging the same group. The script starts a REST server on each worker node and PubSub+ Source Connector configuration is passed to any one of the worker nodes in the group through REST requests in JSON format.
To deploy the Connector, for each target machine, download and extract the PubSub+ Source Connector into a directory and ensure the plugin.path
parameter value in the connect-*.properties
includes the absolute path to that directory. Note that Kafka Connect, i.e., the connect-standalone
or connect-distributed
Kafka shell scripts, must be restarted (or equivalent action from a Kafka console is required) if the PubSub+ Source Connector deployment is updated.
Some PubSub+ Source Connector configurations may require the deployment of additional specific files like keystores, truststores, Kerberos config files, etc. It does not matter where these additional files are located, but they must be available on all Kafka Connect Cluster nodes and placed in the same location on all the nodes because they are referenced by absolute location and configured only once through one REST request for all.
First test to confirm the PubSub+ Source Connector is available for use in distributed mode with the command:
curl http://18.218.82.209:8083/connector-plugins | jq
In this case the IP address is one of the nodes running the distributed mode worker process, the port defaults to 8083 or as specified in the rest.port
property in connect-distributed.properties
. If the connector is loaded correctly, you should see a response similar to:
{
"class": "com.solace.connector.kafka.connect.source.SolaceSourceConnector",
"type": "source",
"version": "3.1.0"
},
At this point, it is now possible to start the connector in distributed mode with a command similar to:
curl -X POST -H "Content-Type: application/json" \
-d @solace_source_properties.json \
http://18.218.82.209:8083/connectors
The connector's JSON configuration file, in this case, is called solace_source_properties.json
. A sample is available here, which can be extended with the same properties as described in the Parameters section.
Determine if the Source Connector is running with the following command:
curl 18.218.82.209:8083/connectors/solaceSourceConnector/status | jq
If there was an error in starting, the details are returned with this command.
In standalone mode, the connect logs are written to the console. If you do not want to send the output to the console, simply add the "-daemon" option to have all output directed to the logs directory.
In distributed mode, the logs location is determined by the connect-log4j.properties
located at the config
directory in the Apache Kafka distribution or under etc/kafka/
in the Confluent distribution.
If logs are redirected to the standard output, here is a sample log4j.properties snippet to direct them to a file:
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/kafka/connect.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true
To troubleshoot PubSub+ connection issues, increase logging level to DEBUG by adding following line:
log4j.logger.com.solacesystems.jcsmp=DEBUG
Ensure that you set it back to INFO or WARN for production.
There are many ways to map PubSub+ messages to Kafka topics, partitions, keys, and values, depending on the application behind the events.
The PubSub+ Source Connector comes with two sample message processors that can be used as-is, or as a starting point to develop a customized message processor.
- SimpleMessageProcessor: Takes the PubSub+ message as a binary payload and creates a Kafka Source record with a Binary Schema for the value (from the PubSub+ message payload).
- SampleKeyedMessageProcessor: A more complex sample that allows the flexibility of changing the source record Key Schema and which value from the PubSub+ message to use as a key. The option of no key in the record is also possible.
The desired message processor is loaded at runtime based on the configuration of the JSON or properties configuration file, for example:
sol.message_processor_class=com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor
It is possible to create more custom message processors based on your Kafka record requirements for keying and/or value serialization and the desired format of the PubSub+ event message. Simply add the new message processor classes to the project. The desired message processor is installed at run time based on the configuration file.
Refer to the Developers Guide for more information about building the Source Connector and extending message processors.
By default, the Source Connector will process live events from the PubSub+ event broker. If replay of past recorded events is required for later consumption, it also is possible to use the PubSub+ Message Replay feature, initiated through PubSub+ management.
The event broker uses a "best effort" approach to deliver the events from PubSub+ Topics to the Source Connector. If the connector is down, or messages are constantly generated at a rate faster than can be written to Kafka, there is a potential for data loss. When Kafka is configured for its highest throughput, it is susceptible to loss and, obviously, the connector cannot add records if the Kafka broker is unavailable.
When a Kafka Topic is configured for high throughput the use of topics to receive data event messages is acceptable and recommended.
The connector can ingest using a list of topic subscriptions, where each can be a wild-card subscription.
It is also possible to have the PubSub+ Source Connector attract data events from a PubSub+ Queue. A queue guarantees order of delivery, provides High Availability and Disaster Recovery (depending on the setup of the PubSub+ brokers) and provides an acknowledgment to the message producer (in this case the PubSub+ event producer application) when the event is stored in all HA and DR members and flushed to disk. This is a higher guarantee than is provided by Kafka even for Kafka idempotent delivery.
When a Kafka Topic is configured with its highest quality-of-service with respect to record loss or duplication, it results in a large reduction in record processing throughput. However, in some application requirements this QoS is required. In this case, the PubSub+ Source Connector should use Queues for the consumption of events from the Event Mesh.
Note that one connector can ingest from only one queue.
When the connector is consuming from a PubSub+ queue, a timed Kafka Connect process commits the source records and offset to disk on the Kafka broker and calls the connector to acknowledge the messages that were processed so far, which removes these event messages from the event broker queue.
If the Kafka Connect API or the Kafka broker goes down, unacknowledged messages are not lost; they will be retransmitted as soon as the Connect API or Kafka broker is restarted. It is important to note that while the Connect API or the Kafka Broker are off-line, the PubSub+ queue continues to add event messages, so there is no loss of new data from the PubSub+ Event Mesh.
The commit time interval is configurable via the offset.flush.interval.ms
parameter (default 60,000 ms) in the worker's connect-distributed.properties
configuration file. If high message rate is expected the parameter shall be tuned, taking into consideration that each task (in case of Multiple Workers) shall not allow excessively large (for example, 10,000 or more) amount of unacknowledged messages.
If the throughput through the Connect API is not high enough, and messages are starting to accumulate in the PubSub+ Queue, scaling of the Connector is recommended as discussed below.
If the Source Connector has not been scaled to a sufficient level to deal with bursts, the Queue can act as a "shock absorber" to deal with micro-bursts or sustained periods of heavy event generation in the Event Mesh so that data events are no longer lost due to an unforeseen event.
The Topic-to-Queue Mapping is the simple process of configuring a PubSub+ Queue to attract PubSub+ Topic data event. These data events are immediately available via the queue with a protection against record loss and with the "shock absorber" advantage that use of a Queue provides.
Topic-to-Queue Mapping, just like any PubSub+ topic subscriptions, allows wild-card subscriptions to multiple topics.
The PubSub+ broker supports far greater throughput than can be afforded through a single instance of the Connect API. The Kafka Broker can also process records at a rate far greater than available through a single instance of the Connector. Therefore, multiple instances of the Source Connector can increase throughput from the Kafka broker to the Solace PubSub+ broker.
You can deploy and automatically and spread multiple connector tasks across all available Connect API workers simply by indicating the number of desired tasks in the connector configuration file.
PubSub+ queue or topic subscriptions must be configured properly to support distributed consumption, so events are automatically load balanced between the multiple workers:
-
If the ingestion source is a Queue, it must be configured as non-exclusive, which permits multiple consumers to receive messages in a round-robin fashion.
-
By the nature of Topics, if there are multiple subscribers to a topic, all subscribers receive all of the same topic data event messages. Load balancing can be achieved by applying Shared Subscriptions, which ensures that messages are delivered to only one active subscriber at a time.
Note that Shared Subscriptions may need to be administratively enabled in the event broker Client Profile. Also note that Shared Subscriptions are not available on older versions of the event broker. The deprecated DTO (Deliver-To-One) feature can be used instead.
The security setup and operation between the PubSub+ broker and the Source Connector and Kafka broker and the Source Connector operate completely independently.
The Source Connector supports both PKI and Kerberos for more secure authentication beyond the simple user name/password, when connecting to the PubSub+ event broker.
The security setup between the Source Connector and the Kafka brokers is controlled by the Kafka Connect libraries. These are exposed in the configuration file as parameters based on the Kafka-documented parameters and configuration. Please refer to the Kafka documentation for details on securing the Source Connector to the Kafka brokers for both PKI/TLS and Kerberos.
The PKI/TLS support is well documented in the Solace Documentation, and will not be repeated here. All the PKI required configuration parameters are part of the configuration variable for the Solace session and transport as referenced above in the Parameters section. Sample parameters are found in the included properties file.
Kerberos authentication support requires a bit more configuration than PKI since it is not defined as part of the Solace session or transport.
Typical Kerberos client applications require details about the Kerberos configuration and details for the authentication. Since the Source Connector is a server application (i.e. no direct user interaction) a Kerberos keytab file is required as part of the authentication, on each Kafka Connect Cluster worker node where the connector is deployed.
The enclosed krb5.conf and login.conf configuration files are samples that allow automatic Kerberos authentication for the Source Connector when it is deployed to the Connect Cluster. Together with the keytab file, they must be also available on all Kafka Connect cluster nodes and placed in the same (any) location on all the nodes. The files are then referenced in the Source Connector properties, for example:
sol.kerberos.login.conf=/opt/kerberos/login.conf
sol.kerberos.krb5.conf=/opt/kerberos/krb5.conf
The following property entry is also required to specify Kerberos Authentication:
sol.authentication_scheme=AUTHENTICATION_SCHEME_GSS_KRB
Kerberos has some very specific requirements to operate correctly. Some additional tips are as follows:
- DNS must be operating correctly both in the Kafka brokers and on the Solace PS+ broker.
- Time services are recommended for use with the Kafka Cluster nodes and the Solace PS+ broker. If there is too much drift in the time between the nodes, Kerberos will fail.
- You must use the DNS name and not the IP address in the Solace PS+ host URI in the Connector configuration file
- You must use the full Kerberos user name (including the Realm) in the configuration property; obviously, no password is required.
JDK 11 or higher is required for this project.
- First, clone this GitHub repo:
git clone https://github.com/SolaceProducts/pubsubplus-connector-kafka-source.git cd pubsubplus-connector-kafka-source
- Install the test support module:
git submodule update --init --recursive cd solace-integration-test-support ./mvnw clean install -DskipTests -Dchangelist= cd ..
- Then run the build script:
gradlew clean build
This script creates artifacts in the build
directory, including the deployable packaged PubSub+ Source Connector archives under build\distributions
.
An integration test suite is also included, which spins up a Docker-based deployment environment that includes a PubSub+ event broker, Zookeeper, Kafka broker, Kafka Connect. It deploys the connector to Kafka Connect and runs end-to-end tests.
- Run the tests:
./gradlew clean test integrationTest
The processing of the Solace message to create a Kafka source record is handled by SolaceMessageProcessorIF
. This is a simple interface that creates the Kafka source records from the PubSub+ messages.
To get started, import the following dependency into your project:
Maven
<dependency>
<groupId>com.solace.connector.kafka.connect</groupId>
<artifactId>pubsubplus-connector-kafka-source</artifactId>
<version>3.0.0</version>
</dependency>
Gradle
compile "com.solace.connector.kafka.connect:pubsubplus-connector-kafka-source:3.0.0"
Now you can implement your custom SolMessageProcessorIF
.
For reference, this project includes two examples which you can use as starting points for implementing your own custom message processors:
Above two processors by default won't map/forward the Solace message user properties and Solace standard properties. If you want to map/forward them as Kafka record headers set below two properties to true
in connector configuration. Refer sample here and Parameters section section for details.
sol.message_processor.map_user_properties=true
sol.message_processor.map_solace_standard_properties=true
Once you've built the jar file for your custom message processor project, place it into the same directory as this connector, and update the connector's sol.message_processor_class
config to point to the class of your new message processor.
More information on Kafka source connector development can be found here:
For additional information, use cases and explanatory videos, please visit the PubSub+/Kafka Integration Guide.
Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.
See the list of contributors who participated in this project.
This project is licensed under the Apache License, Version 2.0. See the LICENSE file for details.
For more information about Solace technology in general, please visit these resources:
- How to Setup and Use
- The Solace Developers website
- Understanding Solace technology
- Ask the Solace Community