Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for parallel message-delivery #1059

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ public final kafka:Consumer websubEventsConsumer = check new (config:KAFKA_URL,
# + groupName - The consumer group name
# + return - `kafka:Consumer` if succcessful or else `error`
public isolated function createMessageConsumer(string topicName, string groupName) returns kafka:Consumer|error {
// Messages are distributed to subscribers in parallel.
// In this scenario, manually committing offsets is unnecessary because
// the next message polling starts as soon as the worker begins delivering messages to the subscribers.
// Therefore, auto-commit is enabled to handle offset management automatically.
// Related issue: https://github.com/ballerina-platform/ballerina-library/issues/7376
kafka:ConsumerConfiguration consumerConfiguration = {
groupId: groupName,
topics: [topicName],
autoCommit: false,
autoCommit: true,
secureSocket: secureSocketConfig,
securityProtocol: kafka:PROTOCOL_SSL,
maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS
Expand Down
23 changes: 9 additions & 14 deletions examples/kafka-hub/hub/websub_subscribers.bal
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc
});
do {
while true {
kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
readonly & kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
if !isValidConsumer(subscription.hubTopic, subscriberId) {
fail error(string `Subscriber with Id ${subscriberId} or topic ${subscription.hubTopic} is invalid`);
}
_ = check notifySubscribers(records, clientEp, consumerEp);
_ = start notifySubscribers(records, clientEp);
}
} on fail var e {
util:logError("Error occurred while sending notification to subscriber", e);
Expand All @@ -110,19 +110,14 @@ isolated function isValidSubscription(string subscriberId) returns boolean {
}
}

isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp, kafka:Consumer consumerEp) returns error? {
foreach var kafkaRecord in records {
var message = deSerializeKafkaRecord(kafkaRecord);
if message is websubhub:ContentDistributionMessage {
var response = clientEp->notifyContentDistribution(message);
if response is websubhub:ContentDistributionSuccess {
_ = check consumerEp->commit();
return;
}
return response;
} else {
log:printError("Error occurred while retrieving message data", err = message.message());
isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
do {
foreach var kafkaRecord in records {
websubhub:ContentDistributionMessage message = check deSerializeKafkaRecord(kafkaRecord);
_ = check clientEp->notifyContentDistribution(message);
}
} on fail error e {
log:printError("Error occurred while delivering messages to the subscriber", err = e.message());
}
}

Expand Down
Loading