diff --git a/examples/kafka-hub/hub/modules/connections/connections.bal b/examples/kafka-hub/hub/modules/connections/connections.bal index 50f11e6f..96fe9b2a 100644 --- a/examples/kafka-hub/hub/modules/connections/connections.bal +++ b/examples/kafka-hub/hub/modules/connections/connections.bal @@ -96,10 +96,15 @@ public final kafka:Consumer websubEventsConsumer = check new (config:KAFKA_URL, # + groupName - The consumer group name # + return - `kafka:Consumer` if succcessful or else `error` public isolated function createMessageConsumer(string topicName, string groupName) returns kafka:Consumer|error { + // Messages are distributed to subscribers in parallel. + // In this scenario, manually committing offsets is unnecessary because + // the next message polling starts as soon as the worker begins delivering messages to the subscribers. + // Therefore, auto-commit is enabled to handle offset management automatically. + // Related issue: https://github.com/ballerina-platform/ballerina-library/issues/7376 kafka:ConsumerConfiguration consumerConfiguration = { groupId: groupName, topics: [topicName], - autoCommit: false, + autoCommit: true, secureSocket: secureSocketConfig, securityProtocol: kafka:PROTOCOL_SSL, maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal index 17de27f1..4db40497 100644 --- a/examples/kafka-hub/hub/websub_subscribers.bal +++ b/examples/kafka-hub/hub/websub_subscribers.bal @@ -82,11 +82,11 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc }); do { while true { - kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL); + readonly & kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL); if !isValidConsumer(subscription.hubTopic, subscriberId) { fail error(string `Subscriber with Id ${subscriberId} or topic ${subscription.hubTopic} is invalid`); } - _ = check notifySubscribers(records, clientEp, consumerEp); + _ = start notifySubscribers(records, clientEp); } } on fail var e { util:logError("Error occurred while sending notification to subscriber", e); @@ -110,19 +110,14 @@ isolated function isValidSubscription(string subscriberId) returns boolean { } } -isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp, kafka:Consumer consumerEp) returns error? { - foreach var kafkaRecord in records { - var message = deSerializeKafkaRecord(kafkaRecord); - if message is websubhub:ContentDistributionMessage { - var response = clientEp->notifyContentDistribution(message); - if response is websubhub:ContentDistributionSuccess { - _ = check consumerEp->commit(); - return; - } - return response; - } else { - log:printError("Error occurred while retrieving message data", err = message.message()); +isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp) returns error? { + do { + foreach var kafkaRecord in records { + websubhub:ContentDistributionMessage message = check deSerializeKafkaRecord(kafkaRecord); + _ = check clientEp->notifyContentDistribution(message); } + } on fail error e { + log:printError("Error occurred while delivering messages to the subscriber", err = e.message()); } }