From e8a306a7fa7d10a34ee187775125699dfe87acaf Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Wed, 20 Nov 2024 14:16:44 +0530 Subject: [PATCH] Add functionality to map kafka-headers into http headers --- examples/kafka-hub/hub/websub_subscribers.bal | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal index 761b8b46..05042e73 100644 --- a/examples/kafka-hub/hub/websub_subscribers.bal +++ b/examples/kafka-hub/hub/websub_subscribers.bal @@ -131,7 +131,20 @@ isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) retur json payload = check value:fromJsonString(message); websubhub:ContentDistributionMessage distributionMsg = { content: payload, - contentType: mime:APPLICATION_JSON + contentType: mime:APPLICATION_JSON, + headers: check getHeaders(kafkaRecord) }; return distributionMsg; } + +isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map|error { + map headers = {}; + foreach var ['key, value] in kafkaRecord.headers.entries().toArray() { + if value is string || value is string[] { + headers['key] = value; + } else if value is byte[] { + headers['key] = check string:fromBytes(value); + } + } + return headers; +}