Skip to content

Commit

Permalink
Fix partition assignment logic error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeshLK committed Dec 12, 2024
1 parent de0e9d7 commit c2dc7c3
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions examples/kafka-hub/hub/modules/connections/connections.bal
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public isolated function createMessageConsumer(string topicName, string groupNam

kafka:TopicPartition[] parititionsWithoutCmtdOffsets = [];
foreach kafka:TopicPartition partition in kafkaTopicPartitions {
kafka:PartitionOffset|error? offset = consumerEp->getCommittedOffset(partition);
if offset is error {
kafka:PartitionOffset|kafka:Error? offset = consumerEp->getCommittedOffset(partition);
if offset is kafka:Error {
log:printError("Error occurred while retrieving the commited offsets for the topic-partition", offset);
return offset;
}
Expand All @@ -141,7 +141,7 @@ public isolated function createMessageConsumer(string topicName, string groupNam
}

if offset is kafka:PartitionOffset {
kafka:Error? kafkaSeekErr = check consumerEp->seek(offset);
kafka:Error? kafkaSeekErr = consumerEp->seek(offset);
if kafkaSeekErr is error {
log:printError("Error occurred while assigning seeking partitions for the consumer", kafkaSeekErr);
return kafkaSeekErr;
Expand All @@ -150,8 +150,8 @@ public isolated function createMessageConsumer(string topicName, string groupNam
}

if parititionsWithoutCmtdOffsets.length() > 0 {
kafka:Error? kafkaSeekErr = check consumerEp->seekToBeginning(parititionsWithoutCmtdOffsets);
if kafkaSeekErr is error {
kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(parititionsWithoutCmtdOffsets);
if kafkaSeekErr is kafka:Error {
log:printError("Error occurred while assigning seeking partitions (for paritions without committed offsets) for the consumer", kafkaSeekErr);
return kafkaSeekErr;
}
Expand Down

0 comments on commit c2dc7c3

Please sign in to comment.