Skip to content

Commit

Permalink
implements feature #67 to have fallback topic name support
Browse files Browse the repository at this point in the history
  • Loading branch information
hpgrahsl committed Jan 22, 2019
1 parent 01361bb commit ef3162a
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 3 deletions.
2 changes: 1 addition & 1 deletion config/MongoDbSinkConnector.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector
#specific MongoDB sink connector props
#listed below are the defaults
mongodb.connection.uri=mongodb://localhost:27017/kafkaconnect?w=1&journal=true
mongodb.collection=kafkatopic
mongodb.collection=
mongodb.max.num.retries=3
mongodb.retries.defer.timeout=5000
mongodb.value.projection.type=none
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>at.grahsl.kafka.connect</groupId>
<artifactId>kafka-connect-mongodb</artifactId>
<version>1.2.1-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>kafka-connect-mongodb</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public enum FieldProjectionTypes {

public static final String MONGODB_CONNECTION_URI_DEFAULT = "mongodb://localhost:27017/kafkaconnect?w=1&journal=true";
public static final String MONGODB_COLLECTIONS_DEFAULT = "";
public static final String MONGODB_COLLECTION_DEFAULT = "kafkatopic";
public static final String MONGODB_COLLECTION_DEFAULT = "";
public static final int MONGODB_MAX_NUM_RETRIES_DEFAULT = 3;
public static final int MONGODB_RETRIES_DEFER_TIMEOUT_DEFAULT = 5000;
public static final String MONGODB_VALUE_PROJECTION_TYPE_DEFAULT = "none";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ Map<String, MongoDbSinkRecordBatches> createSinkRecordBatchesPerTopic(Collection
LOGGER.debug("buffering sink records into grouped topic batches");
records.forEach(r -> {
String collection = sinkConfig.getString(MongoDbSinkConnectorConfig.MONGODB_COLLECTION_CONF,r.topic());
if(collection.isEmpty()) {
LOGGER.debug("no explicit collection name mapping found for topic {} "
+ "and default collection name was empty ",r.topic());
LOGGER.debug("using topic name {} as collection name",r.topic());
collection = r.topic();
}
String namespace = database.getName()+"."+collection;
MongoCollection<BsonDocument> mongoCollection = cachedCollections.get(namespace);
if(mongoCollection == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,36 @@ void testCreateSinkRecordBatchesPerTopicWithDefaultTopicAndNoBatching() {
TopicSettingsAndResults settings = new TopicSettingsAndResults("kafkatopic","kafkatopic",50,0);
String namespace = "kafkaconnect."+settings.collection;

Map<String,String> props = new HashMap<>();
props.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTION_CONF,settings.collection);
sinkTask.start(props);

List<SinkRecord> sinkRecords = createSinkRecordList(settings);
Map<String, MongoDbSinkRecordBatches> batchesMap = sinkTask.createSinkRecordBatchesPerTopic(sinkRecords);

assertEquals(1, batchesMap.size(), "wrong number of entries in batch map");
assertNotNull(batchesMap.get(namespace), "batch map entry for "+namespace+" was null");

assertAll("verify contents of created batches map",
() -> assertEquals(1, batchesMap.get(namespace).getBufferedBatches().size(),
"wrong number of batches in map entry for "+namespace),
() -> assertEquals(settings.numRecords, batchesMap.get(namespace).getBufferedBatches().get(0).size(),
"wrong number of records in single batch of map entry for "+namespace),
() -> assertEquals(sinkRecords, batchesMap.get(namespace).getBufferedBatches().get(0),
"sink record list mismatch in single batch of map entry for"+namespace)
);

}

@Test
@DisplayName("test create sink record batches per topic with NO default topic and no batching")
void testCreateSinkRecordBatchesPerTopicWithNoDefaultTopicAndNoBatching() {

MongoDbSinkTask sinkTask = new MongoDbSinkTask();

TopicSettingsAndResults settings = new TopicSettingsAndResults("kafkaesque","kafkaesque",50,0);
String namespace = "kafkaconnect."+settings.collection;

sinkTask.start(new HashMap<>());

List<SinkRecord> sinkRecords = createSinkRecordList(settings);
Expand Down

0 comments on commit ef3162a

Please sign in to comment.