Skip to content

Commit

Permalink
Merge pull request #1059 from ayeshLK/kafkahub-mtls-dev2
Browse files Browse the repository at this point in the history
Add support for parallel message-delivery
  • Loading branch information
ayeshLK authored Nov 22, 2024
2 parents 7567770 + 2277ac6 commit b20a510
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
7 changes: 6 additions & 1 deletion examples/kafka-hub/hub/modules/connections/connections.bal
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ public final kafka:Consumer websubEventsConsumer = check new (config:KAFKA_URL,
# + groupName - The consumer group name
# + return - `kafka:Consumer` if successful or else `error`
public isolated function createMessageConsumer(string topicName, string groupName) returns kafka:Consumer|error {
// Messages are distributed to subscribers in parallel.
// In this scenario, manually committing offsets is unnecessary because
// the next message polling starts as soon as the worker begins delivering messages to the subscribers.
// Therefore, auto-commit is enabled to handle offset management automatically.
// Related issue: https://github.com/ballerina-platform/ballerina-library/issues/7376
kafka:ConsumerConfiguration consumerConfiguration = {
groupId: groupName,
topics: [topicName],
autoCommit: false,
autoCommit: true,
secureSocket: secureSocketConfig,
securityProtocol: kafka:PROTOCOL_SSL,
maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS
Expand Down
23 changes: 9 additions & 14 deletions examples/kafka-hub/hub/websub_subscribers.bal
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc
});
do {
while true {
kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
readonly & kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
if !isValidConsumer(subscription.hubTopic, subscriberId) {
fail error(string `Subscriber with Id ${subscriberId} or topic ${subscription.hubTopic} is invalid`);
}
_ = check notifySubscribers(records, clientEp, consumerEp);
_ = start notifySubscribers(records, clientEp);
}
} on fail var e {
util:logError("Error occurred while sending notification to subscriber", e);
Expand All @@ -110,19 +110,14 @@ isolated function isValidSubscription(string subscriberId) returns boolean {
}
}

isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp, kafka:Consumer consumerEp) returns error? {
foreach var kafkaRecord in records {
var message = deSerializeKafkaRecord(kafkaRecord);
if message is websubhub:ContentDistributionMessage {
var response = clientEp->notifyContentDistribution(message);
if response is websubhub:ContentDistributionSuccess {
_ = check consumerEp->commit();
return;
}
return response;
} else {
log:printError("Error occurred while retrieving message data", err = message.message());
# Delivers a batch of polled Kafka records to a subscriber via the hub client.
# Each record is deserialized into a content-distribution message and pushed to the
# subscriber; any failure in the batch is logged (best-effort delivery, not retried).
#
# + records - The batch of Kafka consumer records polled for this subscriber
# + clientEp - The `websubhub:HubClient` used to notify the subscriber
# + return - `error?` per the signature; delivery failures are caught and logged instead
isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
    do {
        foreach var consumerRecord in records {
            // Deserialization or delivery failure aborts the batch via `on fail`.
            websubhub:ContentDistributionMessage distributionMessage = check deSerializeKafkaRecord(consumerRecord);
            _ = check clientEp->notifyContentDistribution(distributionMessage);
        }
    } on fail error deliveryError {
        log:printError("Error occurred while delivering messages to the subscriber", err = deliveryError.message());
    }
}

Expand Down

0 comments on commit b20a510

Please sign in to comment.