Skip to content

Commit

Permalink
Add support to map HTTP headers to kafka-headers
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeshLK committed Nov 20, 2024
1 parent e8a306a commit 3550385
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions examples/kafka-hub/hub/modules/persistence/persistence.bal
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import ballerina/websubhub;
import kafkaHub.config;
import kafkaHub.connections as conn;
import ballerinax/kafka;

public isolated function addRegsiteredTopic(websubhub:TopicRegistration message) returns error? {
check updateHubState(message);
Expand Down Expand Up @@ -49,8 +50,18 @@ public isolated function addUpdateMessage(string topicName, websubhub:UpdateMess
check produceKafkaMessage(topicName, payload);
}

isolated function produceKafkaMessage(string topicName, json payload) returns error? {
byte[] serializedContent = payload.toJsonString().toBytes();
check conn:statePersistProducer->send({ topic: topicName, value: serializedContent });
isolated function produceKafkaMessage(string topicName, json payload,
map<string|string[]> headers = {}) returns error? {
kafka:AnydataProducerRecord message = getProducerMsg(topicName, payload, headers);
check conn:statePersistProducer->send(message);
check conn:statePersistProducer->'flush();
}

isolated function getProducerMsg(string topic, json payload,
map<string|string[]> headers) returns kafka:AnydataProducerRecord {
byte[] value = payload.toJsonString().toBytes();
if headers.length() == 0 {
return { topic, value };
}
return { topic, value, headers };
}

0 comments on commit 3550385

Please sign in to comment.