diff --git a/examples/kafka-hub/hub/modules/persistence/persistence.bal b/examples/kafka-hub/hub/modules/persistence/persistence.bal index 6c098573..41e48279 100644 --- a/examples/kafka-hub/hub/modules/persistence/persistence.bal +++ b/examples/kafka-hub/hub/modules/persistence/persistence.bal @@ -17,6 +17,7 @@ import ballerina/websubhub; import kafkaHub.config; import kafkaHub.connections as conn; +import ballerinax/kafka; public isolated function addRegsiteredTopic(websubhub:TopicRegistration message) returns error? { check updateHubState(message); @@ -49,8 +50,18 @@ public isolated function addUpdateMessage(string topicName, websubhub:UpdateMess check produceKafkaMessage(topicName, payload); } -isolated function produceKafkaMessage(string topicName, json payload) returns error? { - byte[] serializedContent = payload.toJsonString().toBytes(); - check conn:statePersistProducer->send({ topic: topicName, value: serializedContent }); +isolated function produceKafkaMessage(string topicName, json payload, + map headers = {}) returns error? { + kafka:AnydataProducerRecord message = getProducerMsg(topicName, payload, headers); + check conn:statePersistProducer->send(message); check conn:statePersistProducer->'flush(); } + +isolated function getProducerMsg(string topic, json payload, + map headers) returns kafka:AnydataProducerRecord { + byte[] value = payload.toJsonString().toBytes(); + if headers.length() == 0 { + return { topic, value }; + } + return { topic, value, headers }; +}