diff --git a/examples/kafka-hub/hub/modules/connections/connections.bal b/examples/kafka-hub/hub/modules/connections/connections.bal index 318cb12f..48fae53c 100644 --- a/examples/kafka-hub/hub/modules/connections/connections.bal +++ b/examples/kafka-hub/hub/modules/connections/connections.bal @@ -130,8 +130,8 @@ public isolated function createMessageConsumer(string topicName, string groupNam kafka:TopicPartition[] parititionsWithoutCmtdOffsets = []; foreach kafka:TopicPartition partition in kafkaTopicPartitions { - kafka:PartitionOffset|error? offset = consumerEp->getCommittedOffset(partition); - if offset is error { + kafka:PartitionOffset|kafka:Error? offset = consumerEp->getCommittedOffset(partition); + if offset is kafka:Error { log:printError("Error occurred while retrieving the commited offsets for the topic-partition", offset); return offset; } @@ -141,7 +141,7 @@ public isolated function createMessageConsumer(string topicName, string groupNam } if offset is kafka:PartitionOffset { - kafka:Error? kafkaSeekErr = check consumerEp->seek(offset); + kafka:Error? kafkaSeekErr = consumerEp->seek(offset); if kafkaSeekErr is error { log:printError("Error occurred while assigning seeking partitions for the consumer", kafkaSeekErr); return kafkaSeekErr; @@ -150,8 +150,8 @@ public isolated function createMessageConsumer(string topicName, string groupNam } if parititionsWithoutCmtdOffsets.length() > 0 { - kafka:Error? kafkaSeekErr = check consumerEp->seekToBeginning(parititionsWithoutCmtdOffsets); - if kafkaSeekErr is error { + kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(parititionsWithoutCmtdOffsets); + if kafkaSeekErr is kafka:Error { log:printError("Error occurred while assigning seeking partitions (for paritions without committed offsets) for the consumer", kafkaSeekErr); return kafkaSeekErr; }