diff --git a/README.md b/README.md
index 4dd9985..b16a0ba 100644
--- a/README.md
+++ b/README.md
@@ -4,19 +4,19 @@
[![Donate](https://img.shields.io/badge/Donate-PayPal-green.svg)](https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=E3P9D3REZXTJS)
It's a basic [Apache Kafka](https://kafka.apache.org/) [Connect SinkConnector](https://kafka.apache.org/documentation/#connect) for [MongoDB](https://www.mongodb.com/).
-The connector uses the official MongoDB [Java Driver](http://mongodb.github.io/mongo-java-driver/3.6/).
-Future releases might additionally support the [asynchronous driver](http://mongodb.github.io/mongo-java-driver/3.6/driver-async/).
+The connector uses the official MongoDB [Java Driver](http://mongodb.github.io/mongo-java-driver/3.10/).
+Future releases might additionally support the [asynchronous driver](http://mongodb.github.io/mongo-java-driver/3.10/driver-async/).
### Users / Testimonials
| Company | |
|---------|---|
| [![QUDOSOFT](docs/logos/qudosoft.png)](http://www.qudosoft.de/) | "As a subsidiary of a well-established major german retailer,
Qudosoft is challenged by incorporating innovative and
performant concepts into existing workflows. At the core of
a novel event-driven architecture, Kafka has been in
experimental use since 2016, followed by Connect in 2017.
Since MongoDB is one of our databases of choice, we were
glad to discover a production-ready sink connector for it.
We use it, e.g. to persist customer contact events, making
them available to applications that aren't integrated into our
Kafka environment. Currently, this MongoDB sink connector
runs on five workers consuming approx. 50 - 200k AVRO
messages per day, which are written to a replica set." |
-| [![RUNTITLE](docs/logos/runtitle.png)](https://www.runtitle.com/) | "RunTitle.com is a data-driven start-up in the Oil & Gas space.
We curate mineral ownership data from millions of county
records and help facilitate deals between mineral owners
and buyers. We use Kafka to create an eco-system of loosely
coupled, specialized applications that share information.
We have identified the mongodb-sink-connector to be central
to our plans and were excited that it was readily enhanced to
[support our particular use-case.](https://github.com/hpgrahsl/kafka-connect-mongodb#custom-write-model-filters) The connector documentation
and code are very clean and thorough. We are extremely positive
about relying on OSS backed by such responsive curators." |
+| [![RUNTITLE](docs/logos/runtitle.png)](https://www.runtitle.com/) | "RunTitle.com is a data-driven start-up in the Oil & Gas space.
We curate mineral ownership data from millions of county
records and help facilitate deals between mineral owners
and buyers. We use Kafka to create an eco-system of loosely
coupled, specialized applications that share information.
We have identified the mongodb-sink-connector to be central
to our plans and were excited that it was readily enhanced to
[support our particular use-case.](https://github.com/hpgrahsl/kafka-connect-mongodb#custom-write-models) The connector documentation
and code are very clean and thorough. We are extremely positive
about relying on OSS backed by such responsive curators." |
### Supported Sink Record Structure
Currently the connector is able to process Kafka Connect SinkRecords with
-support for the following schema types [Schema.Type](https://kafka.apache.org/10/javadoc/org/apache/kafka/connect/data/Schema.Type.html):
+support for the following schema types [Schema.Type](https://kafka.apache.org/21/javadoc/org/apache/kafka/connect/data/Schema.Type.html):
*INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT*.
The conversion is able to generically deal with nested key or value structures - based on the supported types above - like the following example which is based on [AVRO](https://avro.apache.org/)
@@ -53,14 +53,14 @@ The conversion is able to generically deal with nested key or value structures -
```
##### Logical Types
-Besides the standard types it is possible to use [AVRO logical types](http://avro.apache.org/docs/1.8.1/spec.html#Logical+Types) in order to have field type support for
+Besides the standard types it is possible to use [AVRO logical types](http://avro.apache.org/docs/1.8.2/spec.html#Logical+Types) in order to have field type support for
* **Decimal**
* **Date**
* **Time** (millis/micros)
* **Timestamp** (millis/micros)
-The following example based on exemplary logical type definitions should make this clearer:
+The following AVRO schema snippet based on exemplary logical type definitions should make this clearer:
```json
{
@@ -137,7 +137,9 @@ The sink connector implementation is configurable in order to support
* **JSON plain** (offers JSON record structure without any attached schema)
* **RAW JSON** (string only - JSON structure not managed by Kafka connect)
-Since these settings can be independently configured, it's possible to have different settings for the key and value of record respectively.
+Since key and value settings can be independently configured, it is possible to work with different data formats for records' keys and values respectively.
+
+_NOTE: Even when using RAW JSON mode, i.e. with [StringConverter](https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/connect/storage/StringConverter.html), the expected strings have to be valid and parsable JSON._
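+
+For instance, key and value converters can be configured independently of each other. The following is a minimal sketch combining an AVRO key with a RAW JSON (string) value; the converter class names are the standard Kafka Connect / Confluent ones and the schema registry URL is only a placeholder:
+
+```properties
+# AVRO key handled by the Confluent AvroConverter (schema registry URL is a placeholder)
+key.converter=io.confluent.connect.avro.AvroConverter
+key.converter.schema.registry.url=http://localhost:8081
+# RAW JSON value passed through as a plain string - it still has to be parsable JSON
+value.converter=org.apache.kafka.connect.storage.StringConverter
+```
+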
##### Configuration example for AVRO
```properties
@@ -363,7 +365,7 @@ These settings cause:
Note the use of the **"." character** as navigational operator in both examples. It's used in order to refer to nested fields in sub documents of the record structure. The prefix at the very beginning is used as a simple convention to distinguish between the _key_ and _value_ structure of a document.
### Custom Write Models
-The default behaviour for the connector whenever documents are written to MongoDB collections is to make use of a proper [ReplaceOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/ReplaceOneModel.html) with [upsert mode](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/UpdateOptions.html) and **create the filter document based on the _id field** which results from applying the configured DocumentIdAdder in the value structure of the sink document.
+The default behaviour of the connector whenever documents are written to MongoDB collections is to use a [ReplaceOneModel](http://mongodb.github.io/mongo-java-driver/3.10/javadoc/com/mongodb/client/model/ReplaceOneModel.html) in [upsert mode](http://mongodb.github.io/mongo-java-driver/3.10/javadoc/com/mongodb/client/model/UpdateOptions.html) and to **create the filter document based on the _id field**, which results from applying the configured DocumentIdAdder to the value structure of the sink document.
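+
+In configuration terms this default corresponds to the following minimal sketch; the strategy class name matches the documented default of the _mongodb.writemodel.strategy_ property (see the configuration reference below):
+
+```properties
+# default strategy: replace the whole document, run as an upsert, filter on _id
+mongodb.writemodel.strategy=at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy
+```
+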
However, there are other use cases which need different approaches and the **customization option for generating custom write models** can support these. The configuration entry (_mongodb.writemodel.strategy_) allows for such customizations. Currently, the following strategies are implemented:
@@ -505,15 +507,15 @@ The sink connector can also be used in a different operation mode in order to ha
* [MongoDB](http://debezium.io/docs/connectors/mongodb/)
* [MySQL](http://debezium.io/docs/connectors/mysql/)
* [PostgreSQL](http://debezium.io/docs/connectors/postgresql/)
-* **Oracle** _coming soon!_ ([early preview at Debezium Project](http://debezium.io/docs/connectors/oracle/))
-* **SQL Server** ([not yet finished at Debezium Project](http://debezium.io/docs/connectors/sqlserver/))
+* **Oracle** ([incubating at Debezium Project](http://debezium.io/docs/connectors/oracle/))
+* **SQL Server** ([incubating at Debezium Project](http://debezium.io/docs/connectors/sqlserver/))
-This effectively allows to replicate all state changes within the source databases into MongoDB collections. Debezium produces very similar CDC events for MySQL and PostgreSQL. The so far addressed use cases worked fine based on the same code which is why there is only one _RdbmsHandler_ implementation to support them both at the moment. Debezium Oracle CDC format will be integrated in a future release.
+This effectively makes it possible to replicate all state changes within the source databases into MongoDB collections. Debezium produces very similar CDC events for MySQL and PostgreSQL. The use cases addressed so far worked fine with the same code, which is why there is currently only one _RdbmsHandler_ implementation supporting both. Compatibility with Debezium's Oracle & SQL Server CDC formats will be addressed in a future release.
Also note that **both serialization formats (JSON+Schema & AVRO) can be used** depending on which configuration is a better fit for your use case.
##### CDC Handler Configuration
-The sink connector configuration offers a property called *mongodb.change.data.capture.handler* which is set to the fully qualified class name of the respective CDC format handler class. These classes must extend from the provided abstract class *[CdcHandler](https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/CdcHandler.java)*. As soon as this configuration property is set the connector runs in **CDC operation mode**. Find below a JSON based configuration sample for the sink connector which uses the current default implementation that is capable to process Debezium CDC MongoDB events. This config can be posted to the [Kafka connect REST endpoint](https://docs.confluent.io/current/connect/restapi.html) in order to run the sink connector.
+The sink connector configuration offers a property called *mongodb.change.data.capture.handler* which is set to the fully qualified class name of the respective CDC format handler class. These classes must extend the provided abstract class *[CdcHandler](https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/CdcHandler.java)*. As soon as this configuration property is set, the connector runs in **CDC operation mode**. Below is a JSON-based configuration sample for the sink connector which uses the current default implementation, capable of processing Debezium CDC MongoDB events. This config can be posted to the [Kafka Connect REST endpoint](https://docs.confluent.io/current/connect/references/restapi.html) in order to run the sink connector.
```json
{
@@ -542,7 +544,7 @@ mongodb.delete.on.null.values=true
Based on this setting the sink connector tries to delete a MongoDB document from the corresponding collection using the sink record's key, or more precisely the resulting *_id* value derived from it, which is generated according to the specified [DocumentIdAdder](https://github.com/hpgrahsl/kafka-connect-mongodb#documentidadder-mandatory).
### MongoDB Persistence
-The sink records are converted to BSON documents which are in turn inserted into the corresponding MongoDB target collection. The implementation uses unorderd bulk writes. According to the chosen write model strategy either a [ReplaceOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/ReplaceOneModel.html) or an [UpdateOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/UpdateOneModel.html) - both of which are run in [upsert mode](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/UpdateOptions.html) - is used whenever inserts or updates are handled. If the connector is configured to process convention-based deletes when _null_ values of sink records are discovered then it uses a [DeleteOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/DeleteOneModel.html) respectively.
+The sink records are converted to BSON documents which are in turn inserted into the corresponding MongoDB target collection. The implementation uses unordered bulk writes. According to the chosen write model strategy either a [ReplaceOneModel](http://mongodb.github.io/mongo-java-driver/3.10/javadoc/com/mongodb/client/model/ReplaceOneModel.html) or an [UpdateOneModel](http://mongodb.github.io/mongo-java-driver/3.10/javadoc/com/mongodb/client/model/UpdateOneModel.html) - both of which are run in [upsert mode](http://mongodb.github.io/mongo-java-driver/3.10/javadoc/com/mongodb/client/model/UpdateOptions.html) - is used whenever inserts or updates are handled. If the connector is configured to process convention-based deletes when _null_ values of sink records are discovered, it uses a [DeleteOneModel](http://mongodb.github.io/mongo-java-driver/3.10/javadoc/com/mongodb/client/model/DeleteOneModel.html).
Data is written using acknowledged writes and the configured write concern level of the connection as specified in the connection URI. If the bulk write fails (totally or partially) errors are logged and a simple retry logic is in place. More robust/sophisticated failure mode handling has yet to be implemented.
@@ -550,25 +552,28 @@ Data is written using acknowledged writes and the configured write concern level
At the moment the following settings can be configured by means of the *connector.properties* file. For a config file containing default settings see [this example](https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/config/MongoDbSinkConnector.properties).
-| Name | Description | Type | Default | Valid Values | Importance |
-|-------------------------------------|----------------------------------------------------------------------------------------|---------|-------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------|------------|
-| mongodb.collection | single sink collection name to write to | string | kafkatopic | | high |
-| mongodb.connection.uri | the monogdb connection URI as supported by the offical drivers | string | mongodb://localhost:27017/kafkaconnect?w=1&journal=true | | high |
-| mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | valid fully-qualified class name which implements IdStrategy and is available on the classpath | high |
-| mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false | | medium |
-| mongodb.max.batch.size | maximum number of sink records to possibly batch together for processing | int | 0 | [0,...] | medium |
-| mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 | [0,...] | medium |
-| mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 | [0,...] | medium |
-| mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | valid fully-qualified class name which extends CdcHandler and is available on the classpath | low |
-| mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | list of valid fully-qualified class names which implement IdStrategy and are available on the classpath | low |
-| mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings | string | [] | | low |
-| mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings | string | [] | | low |
-| mongodb.key.projection.list | comma separated list of field names for key projection | string | "" | | low |
-| mongodb.key.projection.type | whether or not and which key projection to use | string | none | [none, blacklist, whitelist] | low |
-| mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | list of valid fully-qualified class names which extend PostProcessor and are available on the classpath | low |
-| mongodb.value.projection.list | comma separated list of field names for value projection | string | "" | | low |
-| mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |
-| mongodb.writemodel.strategy | how to build the write models for the sink documents | string | at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy | | low |
+| Name | Description | Type | Default | Valid Values | Importance |
+|-------------------------------------|------------------------------------------------------------------------------------------------------|---------|-------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------|------------|
+| mongodb.collection | single sink collection name to write to | string | "" | | high |
+| mongodb.connection.uri              | the MongoDB connection URI as supported by the official drivers                                       | string  | mongodb://localhost:27017/kafkaconnect?w=1&journal=true                        |                                                                                                                    | high       |
+| mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | | high |
+| mongodb.collections                 | names of sink collections to write to, for which topic-level specific properties can be defined       | string  | ""                                                                              |                                                                                                                    | medium     |
+| mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false | | medium |
+| mongodb.max.batch.size | maximum number of sink records to possibly batch together for processing | int | 0 | [0,...] | medium |
+| mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 | [0,...] | medium |
+| mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 | [0,...] | medium |
+| mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | | low |
+| mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | | low |
+| mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings | string | [] | | low |
+| mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings | string | [] | | low |
+| mongodb.key.projection.list | comma separated list of field names for key projection | string | "" | | low |
+| mongodb.key.projection.type | whether or not and which key projection to use | string | none | [none, blacklist, whitelist] | low |
+| mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | | low |
+| mongodb.rate.limiting.every.n | after how many processed batches the rate limit should trigger (NO rate limiting if n=0) | int | 0 | [0,...] | low |
+| mongodb.rate.limiting.timeout       | how long in ms processing should wait before continuing                                               | int     | 0                                                                               | [0,...]                                                                                                            | low        |
+| mongodb.value.projection.list | comma separated list of field names for value projection | string | "" | | low |
+| mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |
+| mongodb.writemodel.strategy | how to build the write models for the sink documents | string | at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy | | low |
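+
+As a hedged illustration of the retry and rate limiting settings from the table above (the values are arbitrary and only meant to show the semantics):
+
+```properties
+# retry failed bulk writes up to 3 times, deferring each retry by 5 seconds
+mongodb.max.num.retries=3
+mongodb.retries.defer.timeout=5000
+# after every 10th processed batch pause processing for 500 ms (n=0 disables rate limiting)
+mongodb.rate.limiting.every.n=10
+mongodb.rate.limiting.timeout=500
+```
+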
The above listed *connector.properties* are the 'original' (still valid / supported) way to configure the sink connector. The main drawback is that so far only a single MongoDB collection could be used to sink data from one or more Kafka topics.
@@ -596,6 +601,11 @@ mongodb.collection.blah-t=blah-c
```
+**NOTE:** In case there is no explicit mapping between Kafka topic names and MongoDB collection names, the following convention applies, as illustrated by the sketch below the list:
+
+* if the configuration property `mongodb.collection` is set to a non-empty string, this MongoDB collection name is used for any Kafka topic for which there is no defined mapping
+* if no _default name_ is configured with the above configuration property, the connector falls back to using the original Kafka topic name as the MongoDB collection name
+
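+To make this convention concrete, here is a hedged sketch using made-up topic and collection names:
+
+```properties
+topics=foo-t,blah-t,other-t
+# explicit topic -> collection mappings
+mongodb.collection.foo-t=foo-c
+mongodb.collection.blah-t=blah-c
+# optional default collection for otherwise unmapped topics: records from other-t end up in fallback-c;
+# without this setting they would go to a collection named other-t
+mongodb.collection=fallback-c
+```
+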
##### Individual Settings for each Collection
Configuration properties can then be defined specifically for any of the collections for which there is a named mapping defined. The following configuration fragments show how to apply different settings for *foo-c* and *blah-c* MongoDB sink collections.
@@ -683,7 +693,7 @@ If you like this project and want to support its further development and maintan
This project is licensed according to [Apache License Version 2.0](https://www.apache.org/licenses/LICENSE-2.0)
```
-Copyright (c) 2018. Hans-Peter Grahsl (grahslhp@gmail.com)
+Copyright (c) 2019. Hans-Peter Grahsl (grahslhp@gmail.com)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.