Scylla CDC Source Connector can be paired with Elasticsearch Sink, allowing you to easily search through your database. This quickstart will show you how to achieve that.
This guide assumes that you have already set up Scylla and the CDC Source Connector - instructions for that are available in README-QUICKSTART.md.
In order for the connector to be compatible with Elasticsearch Sink (and other sinks), you need to use the `ScyllaExtractNewRecordState` transformer. This transformer is described in more detail in the "ScyllaExtractNewRecordState transformer" section of README.md.
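For intuition, the transform's effect can be sketched in plain Python: it takes a Debezium-style change envelope and keeps only the new row state. This is an illustration of the idea only, not the actual Kafka Connect implementation (the `before`/`after`/`op` field names follow Debezium's envelope convention):

```python
# Illustration only: a minimal sketch of what an "extract new record state"
# transform does to a Debezium-style change envelope. The real transform
# operates on Kafka Connect records, not plain dicts.

def extract_new_record_state(envelope):
    """Flatten a CDC envelope down to just the new row state ("after")."""
    if envelope is None:           # tombstone records pass through unchanged
        return None
    return envelope.get("after")   # drop "before", "source", "op" metadata

envelope = {
    "before": None,
    "after": {"pk": 1, "v": "hello"},
    "source": {"table": "ks.t"},
    "op": "c",
}
print(extract_new_record_state(envelope))  # {'pk': 1, 'v': 'hello'}
```

Sinks like Elasticsearch expect the record value to be the document itself, which is why the envelope has to be flattened before it reaches them.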
- Open the Confluent Control Center. By default, it is started at port `9021`.
- Click on the cluster you want to start the connector in and open the "Connect" tab.
- Add the transformation. In the "Transforms" field, add a transformation with any name. After doing that, a new section named `Transforms: YourNameHere` will appear. In that section, select the class `com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewRecordState` from the drop-down list.
- Click "Next" and then "Launch".
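If you configure the source connector through a properties file or the Kafka Connect REST API instead of Control Center, the same transform can be declared directly. A sketch, assuming the (arbitrary) transform alias `ExtractNewState`:

```properties
transforms=ExtractNewState
transforms.ExtractNewState.type=com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewRecordState
```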
Now you need to run an Elasticsearch instance. The quickest way to do this is to use Docker, as described in this guide. There is one small change required: when run with the provided command, Elasticsearch runs out of memory after a short while and crashes. Use the command below instead:

```
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "bootstrap_memory_lock=true" -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" --ulimit memlock=-1:-1 docker.elastic.co/elasticsearch/elasticsearch:7.14.0
```
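Equivalently, if you prefer Docker Compose, the same container can be declared as follows (a sketch mirroring the command above; the service name `elasticsearch` is arbitrary):

```yaml
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.14.0
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - discovery.type=single-node
      - bootstrap_memory_lock=true
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
    ulimits:
      memlock:
        soft: -1
        hard: -1
```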
Now comes the final part of setup: connecting CDC Source to Elasticsearch.
- Install the sink connector using Confluent Hub: `confluent-hub install confluentinc/kafka-connect-elasticsearch:latest`
- Open the Confluent Control Center. By default, it is started at port `9021`.
- Click on the cluster you want to start the connector in and open the "Connect" tab.
- Click "Elasticsearch Sink Connector".
- Configure the connector:
  - Input the topic name. In our example it would be `QuickstartConnectorNamespace.ks.t`.
  - Input any name you want in the "Name" field.
  - "Key converter class" must be set to `org.apache.kafka.connect.storage.StringConverter`, because Elasticsearch requires document ids to be numbers or strings. This unfortunately produces rather ugly-looking ids; you can fix that with a custom transformer.
  - "Value converter class" must be set to the same value as in the source connector.
  - Set "Connection URLs" in the "Connector" section to the proper Elasticsearch URL - in our example, `http://localhost:9200`.
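If you prefer submitting the sink configuration through the Kafka Connect REST API rather than Control Center, an equivalent configuration file might look like the sketch below. The connector name is arbitrary, and the `value.converter` value is a placeholder; set it to the same converter your source connector uses:

```json
{
  "name": "quickstart-elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "QuickstartConnectorNamespace.ks.t",
    "connection.url": "http://localhost:9200",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
```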
- Test that everything works. Insert some data into the table; you should see it appear in Elasticsearch. Delete some rows; they should disappear from Elasticsearch.
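The expected end-to-end behavior can be sketched as a toy model: each change event either upserts a document (value present) or deletes it (deletes arrive as null-valued records). This illustrates the data flow only and is not connector code; the string keys mimic the "ugly" struct-style ids mentioned above:

```python
# Toy model of the CDC -> Elasticsearch flow: each change event either
# upserts a document (value present) or deletes it (null value).

def apply_event(index, key, value):
    if value is None:
        index.pop(key, None)   # delete: row removed from Elasticsearch
    else:
        index[key] = value     # insert/update: row visible in Elasticsearch

index = {}
apply_event(index, "Struct{pk=1}", {"pk": 1, "v": "hello"})
apply_event(index, "Struct{pk=2}", {"pk": 2, "v": "world"})
apply_event(index, "Struct{pk=1}", None)   # delete row pk=1
print(index)   # {'Struct{pk=2}': {'pk': 2, 'v': 'world'}}
```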