Skip to content

Commit

Permalink
Add functionality to map kafka-headers into http headers
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeshLK committed Nov 20, 2024
1 parent 27aef30 commit e8a306a
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion examples/kafka-hub/hub/websub_subscribers.bal
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,20 @@ isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) retur
json payload = check value:fromJsonString(message);
websubhub:ContentDistributionMessage distributionMsg = {
content: payload,
contentType: mime:APPLICATION_JSON
contentType: mime:APPLICATION_JSON,
headers: check getHeaders(kafkaRecord)
};
return distributionMsg;
}

isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map<string|string[]>|error {
map<string|string[]> headers = {};
foreach var ['key, value] in kafkaRecord.headers.entries().toArray() {
if value is string || value is string[] {
headers['key] = value;
} else if value is byte[] {
headers['key] = check string:fromBytes(value);
}
}
return headers;
}

0 comments on commit e8a306a

Please sign in to comment.