Skip to content

Commit

Permalink
Added env variables to set replication factor and number of partition…
Browse files Browse the repository at this point in the history
…s for topics in kafka
  • Loading branch information
KaterynaSenchenko committed Apr 27, 2020
1 parent 0cb75c7 commit 62bec46
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 16 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 2020-04-27 v1.1.5
* [MODPUBSUB-83](https://issues.folio.org/browse/MODPUBSUB-83) Add env variable to set replication factor and number of partitions for topics in kafka

## 2020-04-22 v1.1.4
* Extended README documentation
* Added creating of topics on module startup
Expand Down
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ See the file "[LICENSE](LICENSE)" for more information.
* [Docker](#docker)
* [Installing the module](#installing-the-module)
* [Deploying the module](#deploying-the-module)
* [Environment variables](#environment-variables)
* [Verifying the module can connect and work with kafka](#verifying-the-module-can-connect-and-work-with-kafka)
* [Database schemas](#Database-schemas)
* [PubSub Client](#PubSub-Client)
Expand Down Expand Up @@ -97,6 +98,38 @@ curl -w '\n' -X POST -D - \
http://localhost:9130/_/proxy/tenants/<tenant_name>/modules
```

## Environment variables
Pubsub requires kafka to be running, and to ensure it can connect and interact with kafka the following environment variable should be specified on deployment:
```
{
"name": "KAFKA_HOST",
"value": "10.0.2.15"
},
{
"name": "KAFKA_PORT",
"value": "9092"
},
{
"name": "OKAPI_URL",
"value": "http://10.0.2.15:9130"
}
```
There are two additional parameters required for pubsub to create topics in kafka - number of partitions and replication factor.
The replication factor controls how many servers will replicate each message that is written. If replication factor set to 3 then up to 2 servers can fail before access to the data will be lost.
The partition count controls how many logs the topic will be sharded into.

```
{
"name": "REPLICATION_FACTOR",
"value": "3"
},
{
"name": "NUMBER_OF_PARTITIONS",
"value": "1"
}
```
If these values are not set then topics will be created with 1 partition and 1 replica.

## Verifying the module can connect and work with kafka

To verify that pubsub can successfully connect and work with kafka send the following requests:
Expand Down
12 changes: 12 additions & 0 deletions mod-pubsub-server/src/main/java/org/folio/kafka/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public class KafkaConfig {
private String kafkaPort;
@Value("${OKAPI_URL:http://10.0.2.15:9130}")
private String okapiUrl;
@Value("${REPLICATION_FACTOR:1}")
private int replicationFactor;
@Value("${NUMBER_OF_PARTITIONS:1}")
private int numberOfPartitions;

public String getKafkaHost() {
return kafkaHost;
Expand Down Expand Up @@ -52,4 +56,12 @@ public Map<String, String> getConsumerProps() {
public String getKafkaUrl() {
return kafkaHost + ":" + kafkaPort;
}

public int getReplicationFactor() {
return replicationFactor;
}

public int getNumberOfPartitions() {
return numberOfPartitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ public interface KafkaTopicService {
*
* @param eventTypes list of event types, for which topics should be created
* @param tenantId tenant id, for which topics should be created
* @param numPartitions number of partitions
* @param replicationFactor replication factor
* @return future with true if succeeded
*/
Future<Boolean> createTopics(List<String> eventTypes, String tenantId, int numPartitions, short replicationFactor);
Future<Boolean> createTopics(List<String> eventTypes, String tenantId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.vertx.kafka.admin.KafkaAdminClient;
import io.vertx.kafka.admin.NewTopic;
import org.apache.commons.lang3.StringUtils;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.PubSubConfig;
import org.folio.services.KafkaTopicService;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -21,16 +22,18 @@ public class KafkaTopicServiceImpl implements KafkaTopicService {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTopicServiceImpl.class);

private KafkaAdminClient kafkaAdminClient;
private KafkaConfig kafkaConfig;

public KafkaTopicServiceImpl(@Autowired KafkaAdminClient kafkaAdminClient) {
public KafkaTopicServiceImpl(@Autowired KafkaAdminClient kafkaAdminClient, @Autowired KafkaConfig kafkaConfig) {
this.kafkaAdminClient = kafkaAdminClient;
this.kafkaConfig = kafkaConfig;
}

@Override
public Future<Boolean> createTopics(List<String> eventTypes, String tenantId, int numPartitions, short replicationFactor) {
public Future<Boolean> createTopics(List<String> eventTypes, String tenantId) {
Promise<Boolean> promise = Promise.promise();
List<NewTopic> topics = eventTypes.stream()
.map(eventType -> new NewTopic(new PubSubConfig(tenantId, eventType).getTopicName(), numPartitions, replicationFactor))
.map(eventType -> new NewTopic(new PubSubConfig(tenantId, eventType).getTopicName(), kafkaConfig.getNumberOfPartitions(), (short) kafkaConfig.getReplicationFactor()))
.collect(Collectors.toList());
kafkaAdminClient.createTopics(topics, ar -> {
if (ar.succeeded()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ public class MessagingModuleServiceImpl implements MessagingModuleService {

private static final Logger LOGGER = LoggerFactory.getLogger(MessagingModuleServiceImpl.class);

private static final int NUMBER_OF_PARTITIONS = 1;
private static final short REPLICATION_FACTOR = 1;

private MessagingModuleDao messagingModuleDao;
private EventDescriptorDao eventDescriptorDao;
private KafkaTopicService kafkaTopicService;
Expand Down Expand Up @@ -113,7 +110,7 @@ public Future<Boolean> savePublisher(PublisherDescriptor publisherDescriptor, St

return messagingModuleDao.save(messagingModules)
.onSuccess(ar -> cache.invalidate())
.compose(ar -> kafkaTopicService.createTopics(eventTypes, tenantId, NUMBER_OF_PARTITIONS, REPLICATION_FACTOR));
.compose(ar -> kafkaTopicService.createTopics(eventTypes, tenantId));
}

@Override
Expand All @@ -133,7 +130,7 @@ public Future<Boolean> saveSubscriber(SubscriberDescriptor subscriberDescriptor,

return messagingModuleDao.save(messagingModules)
.onSuccess(ar -> cache.invalidate())
.compose(ar -> kafkaTopicService.createTopics(eventTypes, params.getTenantId(), NUMBER_OF_PARTITIONS, REPLICATION_FACTOR))
.compose(ar -> kafkaTopicService.createTopics(eventTypes, params.getTenantId()))
.compose(ar -> consumerService.subscribe(eventTypes, params));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
@Component
public class StartupServiceImpl implements StartupService {

private static final int NUMBER_OF_PARTITIONS = 1;
private static final short REPLICATION_FACTOR = 1;

private Vertx vertx;
private KafkaConfig kafkaConfig;
private MessagingModuleDao messagingModuleDao;
Expand All @@ -48,8 +45,7 @@ public void initSubscribers() {
OkapiConnectionParams params = new OkapiConnectionParams(vertx);
params.setOkapiUrl(kafkaConfig.getOkapiUrl());
params.setTenantId(messagingModule.getTenantId());
kafkaTopicService.createTopics(Collections.singletonList(messagingModule.getEventType()),
messagingModule.getTenantId(), NUMBER_OF_PARTITIONS, REPLICATION_FACTOR)
kafkaTopicService.createTopics(Collections.singletonList(messagingModule.getEventType()), messagingModule.getTenantId())
.compose(ar -> consumerService.subscribe(Collections.singletonList(messagingModule.getEventType()), params));
});
return Future.succeededFuture();
Expand Down

0 comments on commit 62bec46

Please sign in to comment.