From f36563c0a43b395f6cfbb1278f5b598eb82f43a8 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Tue, 19 Mar 2024 14:25:50 +0530 Subject: [PATCH 01/13] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 6 +++--- ballerina/CompilerPlugin.toml | 2 +- ballerina/Dependencies.toml | 9 +++++---- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 9d37e659..98008be0 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerinax" name = "kafka" -version = "3.10.0" +version = "3.10.1" authors = ["Ballerina"] keywords = ["kafka", "event streaming", "network", "messaging"] repository = "https://github.com/ballerina-platform/module-ballerinax-kafka" @@ -15,8 +15,8 @@ graalvmCompatible = true [[platform.java17.dependency]] groupId = "io.ballerina.stdlib" artifactId = "kafka-native" -version = "3.10.0" -path = "../native/build/libs/kafka-native-3.10.0.jar" +version = "3.10.1" +path = "../native/build/libs/kafka-native-3.10.1-SNAPSHOT.jar" [[platform.java17.dependency]] groupId = "org.apache.kafka" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index 5d6bff77..4aa1e9e6 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "kafka-compiler-plugin" class = "io.ballerina.stdlib.kafka.plugin.KafkaCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.0.jar" +path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 934f04a5..c924fe4e 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -23,7 +23,7 @@ dependencies = [ [[package]] org = "ballerina" name = "cache" -version = "3.7.0" +version = "3.7.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "constraint"}, @@ -71,7 +71,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.10.0" +version = "2.10.10" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -283,7 +283,7 @@ dependencies = [ [[package]] org = "ballerina" name = "observe" -version = "1.2.0" +version = "1.2.2" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -325,6 +325,7 @@ modules = [ org = "ballerina" name = "time" version = "2.4.0" +scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -389,7 +390,7 @@ modules = [ [[package]] org = "ballerinax" name = "kafka" -version = "3.10.0" +version = "3.10.1" dependencies = [ {org = "ballerina", name = "constraint"}, {org = "ballerina", name = "crypto"}, From 587930ad81df55b3d35533d9f7764b16e9695fef Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Tue, 19 Mar 2024 16:24:00 +0530 Subject: [PATCH 02/13] [Automated] Update the native jar versions --- ballerina/Dependencies.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index c924fe4e..e003d8d7 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -71,7 +71,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.10.10" +version = "2.10.11" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, From dc48759a3fe95877dd6af632c0e66fe1e489496f Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Tue, 19 Mar 2024 17:15:54 +0530 Subject: [PATCH 03/13] Add header support for records --- 
 ballerina/kafka_records.bal                   | 16 +++++---
 ballerina/producer.bal                        |  2 +-
 ballerina/producer_utils.bal                  | 10 ++---
 .../tests/listener_data_binding_tests.bal     |  2 +
 ballerina/tests/producer_client_tests.bal     |  2 +-
 .../producer/SendByteArrayValues.java         | 35 +++++++++++++++--
 .../stdlib/kafka/utils/KafkaConstants.java    |  9 +++--
 .../stdlib/kafka/utils/KafkaUtils.java        | 38 +++++++++++++++++--
 8 files changed, 90 insertions(+), 24 deletions(-)

diff --git a/ballerina/kafka_records.bal b/ballerina/kafka_records.bal
index 5416cca7..90d70cce 100644
--- a/ballerina/kafka_records.bal
+++ b/ballerina/kafka_records.bal
@@ -197,6 +197,7 @@ public type TopicPartition record {|
 # + value - Record content
 # + timestamp - Timestamp of the record, in milliseconds since epoch
 # + offset - Topic partition position in which the consumed record is stored
+# + headers - Map of headers included with the record
 #
 # # Deprecated
 # Usage of this record is deprecated. Use subtypes of AnydataConsumerRecord
 # instead to support data-binding
@@ -206,6 +207,7 @@
     byte[] value;
     int timestamp;
     PartitionOffset offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 # Type related to anydata consumer record.
@@ -214,11 +216,13 @@
 # + value - Anydata record content
 # + timestamp - Timestamp of the record, in milliseconds since epoch
 # + offset - Topic partition position in which the consumed record is stored
+# + headers - Map of headers included with the record
 public type AnydataConsumerRecord record {|
     anydata key?;
     anydata value;
     int timestamp;
     PartitionOffset offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 # Subtype related to `kafka:AnydataConsumerRecord` record.
@@ -250,17 +254,19 @@ public type ProducerRecord record {|
 # Details related to the anydata producer record.
 #
-# + topic - Topic to which the record will be appended
-# + key - Key that is included in the record
-# + value - Anydata record content
-# + timestamp - Timestamp of the record, in milliseconds since epoch
-# + partition - Partition to which the record should be sent
+# + topic - Topic to which the record will be appended
+# + key - Key that is included in the record
+# + value - Anydata record content
+# + timestamp - Timestamp of the record, in milliseconds since epoch
+# + partition - Partition to which the record should be sent
+# + headers - Map of headers to be included with the record
 public type AnydataProducerRecord record {|
     string topic;
     anydata key?;
     anydata value;
     int timestamp?;
     int partition?;
+    map<byte[]|byte[][]> headers?;
 |};
 
 # Subtype related to `kafka:AnydataProducerRecord` record.
diff --git a/ballerina/producer.bal b/ballerina/producer.bal
index 20353fa5..6fc554d8 100644
--- a/ballerina/producer.bal
+++ b/ballerina/producer.bal
@@ -116,7 +116,7 @@ public client isolated class Producer {
         } else if anydataKey !is () {
             key = anydataKey.toJsonString().toBytes();
         }
-        return sendByteArrayValues(self, value, producerRecord.topic, key,
+        return sendByteArrayValues(self, value, producerRecord.topic, producerRecord.headers, key,
                 producerRecord?.partition, producerRecord?.timestamp, self.keySerializerType);
     }
 }
diff --git a/ballerina/producer_utils.bal b/ballerina/producer_utils.bal
index 8ddcc559..1d870c71 100644
--- a/ballerina/producer_utils.bal
+++ b/ballerina/producer_utils.bal
@@ -16,14 +16,14 @@
 
 import ballerina/jballerina.java;
 
-isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, anydata? key, int? partition,
+isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, map<byte[]|byte[][]>? headers, anydata? key, int? partition,
         int? timestamp, string keySerializerType) returns Error? {
     if key is () {
-        return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp);
+        return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp, headers);
     }
     if keySerializerType == SER_BYTE_ARRAY {
         if key is byte[] {
-            return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp);
+            return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp, headers);
         }
         panic getKeyTypeMismatchError(BYTE_ARRAY);
     }
}
 
 //Send byte[] values with different types of keys
 isolated function sendByteArrayValuesNilKeys(Producer producer, byte[] value, string topic, int? partition = (),
-        int? timestamp = ()) returns Error? =
+        int? timestamp = (), map<byte[]|byte[][]>? headers = ()) returns Error? =
 @java:Method {
     'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
 } external;
 
 isolated function sendByteArrayValuesByteArrayKeys(Producer producer, byte[] value, string topic, byte[] key,
-        int? partition = (), int? timestamp = ()) returns Error? =
+        int? partition = (), int? timestamp = (), map<byte[]|byte[][]>? headers = ()) returns Error? =
 @java:Method {
     'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
 } external;
diff --git a/ballerina/tests/listener_data_binding_tests.bal b/ballerina/tests/listener_data_binding_tests.bal
index 4085e762..eb923959 100644
--- a/ballerina/tests/listener_data_binding_tests.bal
+++ b/ballerina/tests/listener_data_binding_tests.bal
@@ -91,6 +91,7 @@ public type IntConsumerRecord record {|
     int value;
     int timestamp;
     PartitionOffset offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 public type FloatConsumerRecord record {|
@@ -146,6 +147,7 @@ public type JsonConsumerRecord record {|
     json key?;
     int timestamp;
     json value;
+    map<byte[]|byte[][]> headers;
 |};
 
 public type PayloadConsumerRecord record {|
diff --git a/ballerina/tests/producer_client_tests.bal b/ballerina/tests/producer_client_tests.bal
index 83d9069c..83edae29 100644
--- a/ballerina/tests/producer_client_tests.bal
+++ b/ballerina/tests/producer_client_tests.bal
@@ -102,7 +102,7 @@ function producerKeyTypeMismatchErrorTest() returns error? {
     string topic = "key-type-mismatch-error-test-topic";
     Producer producer = check new (DEFAULT_URL, producerConfiguration);
     string message = "Hello, Ballerina";
-    error? result = trap sendByteArrayValues(producer, message.toBytes(), topic, MESSAGE_KEY, 0, (), SER_BYTE_ARRAY);
+    error? result = trap sendByteArrayValues(producer, message.toBytes(), topic, (), MESSAGE_KEY, 0, (), SER_BYTE_ARRAY);
     if result is error {
         string expectedErr = "Invalid type found for Kafka key. Expected key type: 'byte[]'.";
         test:assertEquals(result.message(), expectedErr);
diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java
index 73925fbc..719ca22c 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java
@@ -20,12 +20,18 @@
 
 import io.ballerina.runtime.api.Environment;
 import io.ballerina.runtime.api.values.BArray;
+import io.ballerina.runtime.api.values.BMap;
 import io.ballerina.runtime.api.values.BObject;
 import io.ballerina.runtime.api.values.BString;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import static io.ballerina.stdlib.kafka.utils.KafkaConstants.ALIAS_PARTITION;
 import static io.ballerina.stdlib.kafka.utils.KafkaUtils.getIntValue;
 import static io.ballerina.stdlib.kafka.utils.KafkaUtils.getLongValue;
@@ -44,22 +50,43 @@ public class SendByteArrayValues extends Send {
 
     // ballerina byte[]
     public static Object sendByteArrayValuesNilKeys(Environment env, BObject producer, BArray value, BString topic,
-                                                    Object partition, Object timestamp) {
+                                                    Object partition, Object timestamp, Object bHeaders) {
         Integer partitionValue = getIntValue(partition, ALIAS_PARTITION, logger);
         Long timestampValue = getLongValue(timestamp);
+        List<Header> headers = getHeadersFromBHeaders(bHeaders);
         ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic.getValue(), partitionValue, timestampValue,
-                null, value.getBytes());
+                null, value.getBytes(), headers);
         return sendKafkaRecord(env, kafkaRecord, producer);
     }
 
     // ballerina byte[] and ballerina byte[]
     public static Object sendByteArrayValuesByteArrayKeys(Environment env, BObject producer, BArray value,
                                                           BString topic, BArray key, Object partition,
-                                                          Object timestamp) {
+                                                          Object timestamp, Object bHeaders) {
         Integer partitionValue = getIntValue(partition, ALIAS_PARTITION, logger);
         Long timestampValue = getLongValue(timestamp);
+        List<Header> headers = getHeadersFromBHeaders(bHeaders);
         ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic.getValue(), partitionValue,
-                timestampValue, key.getBytes(), value.getBytes());
+                timestampValue, key.getBytes(), value.getBytes(), headers);
         return sendKafkaRecord(env, kafkaRecord, producer);
     }
+
+    private static List<Header> getHeadersFromBHeaders(Object headerObj) {
+        List<Header> headers = new ArrayList<>();
+        if (headerObj instanceof BMap) {
+            BMap<BString, Object> bHeaders = (BMap<BString, Object>) headerObj;
+            for (BString key : (BString[]) bHeaders.getKeys()) {
+                BArray headerValues = bHeaders.getArrayValue(key);
+                if (!headerValues.isEmpty() && headerValues.get(0) instanceof BArray) {
+                    for (int i = 0; i < headerValues.size(); i++) {
+                        BArray headerValue = (BArray) headerValues.get(i);
+                        headers.add(new RecordHeader(key.getValue(), headerValue.getByteArray()));
+                    }
+                } else if (!headerValues.isEmpty()) {
+                    headers.add(new RecordHeader(key.getValue(), headerValues.getByteArray()));
+                }
+            }
+        }
+        return headers;
+    }
 }
diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java
index 0fcabd38..b04053a1 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java
@@ -64,10 +64,11 @@ private KafkaConstants() {
     public static final String KAFKA_RESOURCE_ON_RECORD = "onConsumerRecord";
     public static final String KAFKA_RESOURCE_ON_ERROR = "onError";
     public static final String KAFKA_RESOURCE_IS_ANYDATA_CONSUMER_RECORD = "isAnydataConsumerRecord";
-    public static final String KAFKA_RECORD_KEY = "key";
-    public static final String KAFKA_RECORD_VALUE = "value";
-    public static final String KAFKA_RECORD_TIMESTAMP = "timestamp";
-    public static final String KAFKA_RECORD_PARTITION_OFFSET = "offset";
+    public static final BString KAFKA_RECORD_KEY = StringUtils.fromString("key");
+    public static final BString KAFKA_RECORD_VALUE = StringUtils.fromString("value");
+    public static final BString KAFKA_RECORD_TIMESTAMP = StringUtils.fromString("timestamp");
+    public static final BString KAFKA_RECORD_PARTITION_OFFSET = StringUtils.fromString("offset");
+    public static final BString KAFKA_RECORD_HEADERS = StringUtils.fromString("headers");
 
     public static final String PARAM_ANNOTATION_PREFIX = "$param$.";
     public static final BString PARAM_PAYLOAD_ANNOTATION_NAME = StringUtils.fromString(
diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
index 2977ebdd..9c22408b 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
@@ -57,6 +57,8 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -86,6 +88,7 @@
 import static io.ballerina.stdlib.kafka.utils.KafkaConstants.CONSUMER_ENABLE_AUTO_COMMIT_CONFIG;
 import static io.ballerina.stdlib.kafka.utils.KafkaConstants.CONSUMER_ENABLE_AUTO_SEEK_CONFIG;
 import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_ERROR;
+import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_RECORD_HEADERS;
 import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_RECORD_KEY;
 import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_RECORD_PARTITION_OFFSET;
 import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_RECORD_TIMESTAMP;
@@ -614,18 +617,45 @@ public static BMap<BString, Object> populateConsumerRecord(ConsumerRecord record
         Object value = getValueWithIntendedType(valueType, (byte[]) record.value(), record, autoSeek);
         BMap<BString, Object> topicPartition = ValueCreator.createRecordValue(getTopicPartitionRecord(), record.topic(),
                 (long) record.partition());
+        BMap<BString, Object> bHeaders = getBHeadersFromRecord(record.headers());
         BMap<BString, Object> consumerRecord = ValueCreator.createRecordValue(recordType);
-        consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_KEY), key);
-        consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_VALUE), value);
-        consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_TIMESTAMP), record.timestamp());
-        consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_PARTITION_OFFSET), ValueCreator.createRecordValue(
+        consumerRecord.put(KAFKA_RECORD_KEY, key);
+        consumerRecord.put(KAFKA_RECORD_VALUE, value);
+        consumerRecord.put(KAFKA_RECORD_TIMESTAMP, record.timestamp());
+        consumerRecord.put(KAFKA_RECORD_PARTITION_OFFSET, ValueCreator.createRecordValue(
                 getPartitionOffsetRecord(), topicPartition, record.offset()));
+        consumerRecord.put(KAFKA_RECORD_HEADERS, bHeaders);
         if (validateConstraints) {
             validateConstraints(consumerRecord, ValueCreator.createTypedescValue(recordType), record, autoSeek);
         }
         return consumerRecord;
     }
 
+    private static BMap<BString, Object> getBHeadersFromRecord(Headers headers) {
+        BMap<BString, Object> bHeaderMap = ValueCreator.createMapValue();
+        HashMap<String, BArray> headerMap = new HashMap<>();
+        for (Header header : headers) {
+            if (headerMap.containsKey(header.key())) {
+                BArray values = headerMap.get(header.key());
+                values.add(values.size(), ValueCreator.createArrayValue(header.value()));
+                headerMap.put(header.key(), values);
+            } else {
+                ArrayType arrayOfByteArrayType = TypeCreator.createArrayType(TypeCreator.createArrayType(PredefinedTypes.TYPE_BYTE));
+                BArray arrayOfByteArray = ValueCreator.createArrayValue(arrayOfByteArrayType);
+                arrayOfByteArray.add(0, ValueCreator.createArrayValue(header.value()));
+                headerMap.put(header.key(), arrayOfByteArray);
+            }
+        }
+        headerMap.forEach((key, value) -> {
+            if (value.size() > 1) {
+                bHeaderMap.put(StringUtils.fromString(key), value);
+            } else {
+                bHeaderMap.put(StringUtils.fromString(key), value.get(0));
+            }
+        });
+        return bHeaderMap;
+    }
+
     public static BArray getConsumerRecords(ConsumerRecords records, RecordType recordType, boolean readonly,
                                             boolean validateConstraints, boolean autoCommit, KafkaConsumer consumer,
                                             boolean autoSeek) {

From 49d889734355d9145ea6ab678e9703f9ca7a424f Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Tue, 19 Mar 2024 17:24:45 +0530
Subject: [PATCH 04/13] Fix checkstyle

---
 .../main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
index 9c22408b..54e88e49 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
@@ -640,7 +640,8 @@ private static BMap<BString, Object> getBHeadersFromRecord(Headers headers) {
                 values.add(values.size(), ValueCreator.createArrayValue(header.value()));
                 headerMap.put(header.key(), values);
             } else {
-                ArrayType arrayOfByteArrayType = TypeCreator.createArrayType(TypeCreator.createArrayType(PredefinedTypes.TYPE_BYTE));
+                ArrayType arrayOfByteArrayType = TypeCreator.createArrayType(TypeCreator
+                        .createArrayType(PredefinedTypes.TYPE_BYTE));
                 BArray arrayOfByteArray = ValueCreator.createArrayValue(arrayOfByteArrayType);
                 arrayOfByteArray.add(0, ValueCreator.createArrayValue(header.value()));
                 headerMap.put(header.key(), arrayOfByteArray);
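For orientation before the test patches: a minimal sketch of how the `headers` field introduced above would be used end to end. The broker URL default, topic name, and group ID here are illustrative assumptions, not part of the patches:

```ballerina
import ballerinax/kafka;

public function main() returns error? {
    kafka:Producer producer = check new (kafka:DEFAULT_URL);
    // Attach headers to the outgoing record; at this stage of the series,
    // header values are byte arrays (byte[], or byte[][] for repeated keys).
    check producer->send({
        topic: "test-headers-topic",
        value: "Hello, Ballerina".toBytes(),
        headers: {"traceId": "trace-1".toBytes(), "retries": ["1".toBytes(), "2".toBytes()]}
    });

    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, {
        groupId: "header-demo-group",
        topics: ["test-headers-topic"],
        offsetReset: kafka:OFFSET_RESET_EARLIEST
    });
    kafka:BytesConsumerRecord[] records = check consumer->poll(5);
    foreach kafka:BytesConsumerRecord rec in records {
        // Single-valued keys surface as byte[]; repeated keys as byte[][].
        map<byte[]|byte[][]> headers = rec.headers;
    }
    check consumer->close();
}
```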
From 248a71b9f9ac1dd61d18a92f33181a7fc3bec82b Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Tue, 19 Mar 2024 20:12:56 +0530
Subject: [PATCH 05/13] Add testcases

---
 ballerina/tests/consumer_client_tests.bal     | 24 ++++++++++++--
 ballerina/tests/listener_client_tests.bal     | 33 +++++++++++++++++++
 .../stdlib/kafka/utils/KafkaConstants.java    |  4 +--
 .../stdlib/kafka/utils/KafkaUtils.java        |  4 +--
 4 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/ballerina/tests/consumer_client_tests.bal b/ballerina/tests/consumer_client_tests.bal
index 2c4cce8f..5322857d 100644
--- a/ballerina/tests/consumer_client_tests.bal
+++ b/ballerina/tests/consumer_client_tests.bal
@@ -1483,6 +1483,26 @@ function commitOffsetWithPolledOffsetValue() returns error? {
     check consumer->close();
 }
 
-function sendMessage(anydata message, string topic, anydata? key = ()) returns error? {
-    return producer->send({ topic: topic, value: message, key });
+@test:Config {enable: true}
+function consumerReadHeadersTest() returns error? {
+    string topic = "consumer-read-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    BytesConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<byte[]|byte[][]> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, headers);
+    check consumer->close();
+}
+
+function sendMessage(anydata message, string topic, anydata? key = (), map<byte[]|byte[][]>? headers = ()) returns error? {
+    return producer->send({ topic: topic, value: message, key, headers });
 }
diff --git a/ballerina/tests/listener_client_tests.bal b/ballerina/tests/listener_client_tests.bal
index 9bdeab17..7f8e2a21 100644
--- a/ballerina/tests/listener_client_tests.bal
+++ b/ballerina/tests/listener_client_tests.bal
@@ -30,6 +30,7 @@ string detachMsg1 = "";
 string detachMsg2 = "";
 string incorrectEndpointMsg = "";
 string receivedTimeoutConfigValue = "";
+map<byte[]|byte[][]> receivedHeaders = {};
 
 int receivedMsgCount = 0;
 
@@ -986,3 +987,35 @@ function listenerWithPollTimeoutConfigTest() returns error? {
     check configListener.gracefulStop();
     test:assertEquals(receivedTimeoutConfigValue, TEST_MESSAGE);
 }
+
+@test:Config {enable: true}
+function listenerWithConsumerHeadersTest() returns error? {
+    string topic = "listener-consumer-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE, topic, (), headers);
+
+    Service headersService =
+    service object {
+        remote function onConsumerRecord(BytesConsumerRecord[] records) returns error? {
+            foreach int i in 0 ... records.length() - 1 {
+                receivedHeaders = records[i].headers;
+            }
+        }
+    };
+
+    ConsumerConfiguration consumerConfiguration = {
+        topics: topic,
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "test-listener-group-29",
+        clientId: "test-listener-29",
+        pollingInterval: 2,
+        pollingTimeout: 1
+    };
+    Listener headersListener = check new (DEFAULT_URL, consumerConfiguration);
+    check headersListener.attach(headersService);
+    check headersListener.'start();
+    runtime:sleep(3);
+    check headersListener.gracefulStop();
+    test:assertEquals(receivedHeaders, headers);
+}
diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java
index b04053a1..30033d11 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java
@@ -64,8 +64,8 @@ private KafkaConstants() {
     public static final String KAFKA_RESOURCE_ON_RECORD = "onConsumerRecord";
     public static final String KAFKA_RESOURCE_ON_ERROR = "onError";
     public static final String KAFKA_RESOURCE_IS_ANYDATA_CONSUMER_RECORD = "isAnydataConsumerRecord";
-    public static final BString KAFKA_RECORD_KEY = StringUtils.fromString("key");
-    public static final BString KAFKA_RECORD_VALUE = StringUtils.fromString("value");
+    public static final String KAFKA_RECORD_KEY = "key";
+    public static final String KAFKA_RECORD_VALUE = "value";
     public static final BString KAFKA_RECORD_TIMESTAMP = StringUtils.fromString("timestamp");
     public static final BString KAFKA_RECORD_PARTITION_OFFSET = StringUtils.fromString("offset");
     public static final BString KAFKA_RECORD_HEADERS = StringUtils.fromString("headers");
diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
index 54e88e49..b2b5b87b 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
@@ -619,8 +619,8 @@ public static BMap<BString, Object> populateConsumerRecord(ConsumerRecord record
                 (long) record.partition());
         BMap<BString, Object> bHeaders = getBHeadersFromRecord(record.headers());
         BMap<BString, Object> consumerRecord = ValueCreator.createRecordValue(recordType);
-        consumerRecord.put(KAFKA_RECORD_KEY, key);
-        consumerRecord.put(KAFKA_RECORD_VALUE, value);
+        consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_KEY), key);
+        consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_VALUE), value);
         consumerRecord.put(KAFKA_RECORD_TIMESTAMP, record.timestamp());
         consumerRecord.put(KAFKA_RECORD_PARTITION_OFFSET, ValueCreator.createRecordValue(
                 getPartitionOffsetRecord(), topicPartition, record.offset()));

From 95881813169ac8cc37d1a044778c62599afc1b2e Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Tue, 19 Mar 2024 21:00:28 +0530
Subject: [PATCH 06/13] Update testcases

---
 ballerina/tests/consumer_constraint_tests.bal   | 3 +--
 ballerina/tests/listener_client_tests.bal       | 4 +---
 ballerina/tests/listener_data_binding_tests.bal | 2 ++
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/ballerina/tests/consumer_constraint_tests.bal b/ballerina/tests/consumer_constraint_tests.bal
index da448fff..813ca26c 100644
--- a/ballerina/tests/consumer_constraint_tests.bal
+++ b/ballerina/tests/consumer_constraint_tests.bal
@@ -32,14 +32,13 @@ public type StringConstraintConsumerRecord record {|
 |};
 
 public type IntConstraintConsumerRecord record {|
+    *AnydataConsumerRecord;
     int key?;
     @constraint:Int {
         maxValue: 100,
         minValue: 10
     }
     int value;
-    int timestamp;
-    PartitionOffset offset;
 |};
 
 @constraint:Float {
diff --git a/ballerina/tests/listener_client_tests.bal b/ballerina/tests/listener_client_tests.bal
index 7f8e2a21..cdb2594e 100644
--- a/ballerina/tests/listener_client_tests.bal
+++ b/ballerina/tests/listener_client_tests.bal
@@ -1008,9 +1008,7 @@ function listenerWithConsumerHeadersTest() returns error? {
         topics: topic,
         offsetReset: OFFSET_RESET_EARLIEST,
         groupId: "test-listener-group-29",
-        clientId: "test-listener-29",
-        pollingInterval: 2,
-        pollingTimeout: 1
+        clientId: "test-listener-29"
     };
     Listener headersListener = check new (DEFAULT_URL, consumerConfiguration);
     check headersListener.attach(headersService);
diff --git a/ballerina/tests/listener_data_binding_tests.bal b/ballerina/tests/listener_data_binding_tests.bal
index eb923959..b5b57a48 100644
--- a/ballerina/tests/listener_data_binding_tests.bal
+++ b/ballerina/tests/listener_data_binding_tests.bal
@@ -140,6 +140,7 @@ public type TableConsumerRecord record {|
     table<Person> value;
     int timestamp;
     PartitionOffset offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 public type JsonConsumerRecord record {|
@@ -161,6 +162,7 @@ public type PayloadConsumerRecord record {|
             int partition;
         |} partition;
     |} offset;
+    map<byte[]|byte[][]> headers?;
 |};
 
 PayloadConsumerRecord payloadConsumerRecord = {

From f2bb1969b9a3278c47f4dd6c888e9bd575a6c254 Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Tue, 19 Mar 2024 21:25:31 +0530
Subject: [PATCH 07/13] Update compiler plugin testcases

---
 .../ballerina_sources/invalid_service_22/service.bal    | 2 ++
 .../ballerina_sources/valid_service_11/service.bal      | 2 ++
 .../ballerina_sources/valid_service_12/service.bal      | 3 +++
 .../stdlib/kafka/plugin/KafkaFunctionValidator.java     | 9 +++++++--
 .../ballerina/stdlib/kafka/plugin/PluginConstants.java  | 1 +
 5 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_22/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_22/service.bal
index 9160d17e..dd0ba46e 100644
--- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_22/service.bal
+++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_22/service.bal
@@ -74,6 +74,7 @@ public type PayloadConsumerRecord record {|
             int partition;
         |} partition;
     |} offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 public type PayloadConsumerRecordWithTypeReference record {|
@@ -81,6 +82,7 @@
     string value;
     int timestamp;
     kafka:PartitionOffset offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 public type Person record {|
diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_11/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_11/service.bal
index 90ebd41b..ea266510 100644
--- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_11/service.bal
+++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_11/service.bal
@@ -162,6 +162,7 @@ public type PayloadConsumerRecord record {|
             int partition;
         |} partition;
     |} offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 public type PayloadConsumerRecordWithTypeReference record {|
@@ -169,6 +170,7 @@
     string value;
     int timestamp;
     kafka:PartitionOffset offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 public type Person record {|
diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_12/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_12/service.bal
index af7505bc..e2fd3253 100644
--- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_12/service.bal
+++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_12/service.bal
@@ -204,6 +204,7 @@ public type IntConsumerRecord record {|
     int value;
     int timestamp;
     kafka:PartitionOffset offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 public type FloatConsumerRecord record {|
@@ -252,6 +253,7 @@ public type TableConsumerRecord record {|
     table<Person> value;
     int timestamp;
     kafka:PartitionOffset offset;
+    map<byte[]|byte[][]> headers;
 |};
 
 public type JsonConsumerRecord record {|
@@ -259,4 +261,5 @@
     json key?;
     int timestamp;
     json value;
+    map<byte[]|byte[][]> headers;
 |};
diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaFunctionValidator.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaFunctionValidator.java
index 4421bf27..80c92a5b 100644
--- a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaFunctionValidator.java
+++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaFunctionValidator.java
@@ -87,6 +87,7 @@
 import static io.ballerina.compiler.syntax.tree.SyntaxKind.UNION_TYPE_DESC;
 import static io.ballerina.compiler.syntax.tree.SyntaxKind.XML_TYPE_DESC;
 import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CALLER;
+import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CONSUMER_RECORD_HEADERS;
 import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CONSUMER_RECORD_KEY;
 import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CONSUMER_RECORD_OFFSET;
 import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CONSUMER_RECORD_PARTITION;
@@ -393,10 +394,11 @@ private boolean isConsumerRecordType(TypeSymbol typeSymbol) {
     }
 
     private boolean validateConsumerRecordFields(Map<String, RecordFieldSymbol> fieldDescriptors) {
-        if (fieldDescriptors.size() != 4 || !fieldDescriptors.containsKey(CONSUMER_RECORD_KEY) ||
+        if (fieldDescriptors.size() != 5 || !fieldDescriptors.containsKey(CONSUMER_RECORD_KEY) ||
                 !fieldDescriptors.containsKey(CONSUMER_RECORD_VALUE) ||
                 !fieldDescriptors.containsKey(CONSUMER_RECORD_TIMESTAMP) ||
-                !fieldDescriptors.containsKey(CONSUMER_RECORD_OFFSET)) {
+                !fieldDescriptors.containsKey(CONSUMER_RECORD_OFFSET) ||
+                !fieldDescriptors.containsKey(CONSUMER_RECORD_HEADERS)) {
             return false;
         }
         if (fieldDescriptors.get(CONSUMER_RECORD_TIMESTAMP).typeDescriptor().typeKind() != INT) {
@@ -406,6 +408,9 @@
                 fieldDescriptors.get(CONSUMER_RECORD_OFFSET).typeDescriptor().typeKind() != RECORD) {
             return false;
         }
+        if (fieldDescriptors.get(CONSUMER_RECORD_HEADERS).typeDescriptor().typeKind() != MAP) {
+            return false;
+        }
         if (!validateOffsetField(fieldDescriptors.get(CONSUMER_RECORD_OFFSET).typeDescriptor())) {
             return false;
         }
diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java
index 83651da3..c96c80bd 100644
--- a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java
+++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java
@@ -42,6 +42,7 @@ public class PluginConstants {
     public static final String CONSUMER_RECORD_OFFSET = "offset";
     public static final String CONSUMER_RECORD_TOPIC = "topic";
     public static final String CONSUMER_RECORD_PARTITION = "partition";
+    public static final String CONSUMER_RECORD_HEADERS = "headers";
 
     // return types error or nil
     public static final String ERROR = "error";
From 12c8f06372e8870a60bf7c6a17ca736e3d9ae945 Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Wed, 20 Mar 2024 09:37:58 +0530
Subject: [PATCH 08/13] Update spec

---
 docs/spec/spec.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/spec/spec.md b/docs/spec/spec.md
index 4991c612..f5248240 100644
--- a/docs/spec/spec.md
+++ b/docs/spec/spec.md
@@ -219,6 +219,8 @@ public type AnydataProducerRecord record {|
     int timestamp?;
     # Partition to which the record should be sent
     int partition?;
+    # Map of headers to be included with the record
+    map<byte[]|byte[][]> headers?;
 |};
 ```
 * `kafka:BytesProducerRecord` defines the subtype of `kafka:AnydataProducerRecord` where the value is a `byte[]`;
@@ -418,6 +420,8 @@ public type AnydataConsumerRecord record {|
     int timestamp;
     # Topic partition position in which the consumed record is stored
     PartitionOffset offset;
+    # Map of headers included with the record
+    map<byte[]|byte[][]> headers;
 |};
 ```
 * `kafka:BytesConsumerRecord` defines the subtype of `kafka:AnydataConsumerRecord` where the value is a `byte[]`.

From b9e012f720a1a966d3d996d6cef72d7fa69e5290 Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Wed, 20 Mar 2024 09:47:07 +0530
Subject: [PATCH 09/13] Update changelog.md

---
 changelog.md | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/changelog.md b/changelog.md
index 189bf729..6567a393 100644
--- a/changelog.md
+++ b/changelog.md
@@ -5,14 +5,25 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 ## Unreleased
 
+### Added
+
+- [Added header support for Consumer and Producer](https://github.com/ballerina-platform/ballerina-library/issues/6196)
+
+## [3.10.0] - 2023-09-18
+
 ### Changed
 
-- [Changed disallowing service level annotations in the compiler plugin](https://github.com/ballerina-platform/ballerina-standard-library/issues/4731)
 - [Updated Apache Kafka client version to `3.5.1`](https://github.com/ballerina-platform/ballerina-standard-library/issues/4752)
 
 ### Fixed
 
 - [Removed `client` keyword from the `kafka:Listener`](https://github.com/ballerina-platform/ballerina-standard-library/issues/4750)
 
+## [3.9.1] - 2023-08-18
+
+### Changed
+
+- [Changed disallowing service level annotations in the compiler plugin](https://github.com/ballerina-platform/ballerina-standard-library/issues/4731)
+
 ## [3.8.0] - 2023-06-01
 
 ### Fixed

From 5443710e200ca6d70329bbef707130b925c51b9e Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Wed, 20 Mar 2024 11:18:37 +0530
Subject: [PATCH 10/13] Fix review comments

---
 ballerina/kafka_records.bal | 2 ++
 ballerina/producer.bal      | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/ballerina/kafka_records.bal b/ballerina/kafka_records.bal
index 90d70cce..08918ca0 100644
--- a/ballerina/kafka_records.bal
+++ b/ballerina/kafka_records.bal
@@ -240,6 +240,7 @@ public type BytesConsumerRecord record {|
 # + value - Record content
 # + timestamp - Timestamp of the record, in milliseconds since epoch
 # + partition - Partition to which the record should be sent
+# + headers - Map of headers to be included with the record
 #
 # # Deprecated
 # Usage of this record is deprecated. Use subtypes of AnydataProducerRecord
 # instead to support data-binding
@@ -250,6 +251,7 @@
     byte[] value;
     int timestamp?;
     int partition?;
+    map<byte[]|byte[][]> headers?;
 |};
 
 # Details related to the anydata producer record.
diff --git a/ballerina/producer.bal b/ballerina/producer.bal
index 6fc554d8..1f957920 100644
--- a/ballerina/producer.bal
+++ b/ballerina/producer.bal
@@ -116,7 +116,7 @@ public client isolated class Producer {
         } else if anydataKey !is () {
             key = anydataKey.toJsonString().toBytes();
         }
-        return sendByteArrayValues(self, value, producerRecord.topic, producerRecord.headers, key,
+        return sendByteArrayValues(self, value, producerRecord.topic, producerRecord?.headers, key,
                 producerRecord?.partition, producerRecord?.timestamp, self.keySerializerType);
     }
 }

From 1f38eb42de176ab1dfb51f7191d6826bb403cac9 Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Thu, 21 Mar 2024 23:43:43 +0530
Subject: [PATCH 11/13] [Automated] Update the native jar versions

---
 ballerina/Dependencies.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml
index e003d8d7..5d89a3ce 100644
--- a/ballerina/Dependencies.toml
+++ b/ballerina/Dependencies.toml
@@ -71,7 +71,7 @@ dependencies = [
 [[package]]
 org = "ballerina"
 name = "http"
-version = "2.10.11"
+version = "2.10.12"
 scope = "testOnly"
 dependencies = [
 	{org = "ballerina", name = "auth"},

From 1423f08933c5f3e47606d479f6e680f41f2fa4cb Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Fri, 22 Mar 2024 15:16:24 +0530
Subject: [PATCH 12/13] Update implementation to use multiple types

---
 ballerina/kafka_records.bal                   |  12 +-
 ballerina/producer.bal                        |  25 ++-
 ballerina/producer_utils.bal                  |   6 +-
 ballerina/tests/producer_client_tests.bal     |   2 +-
 .../producer/SendByteArrayValues.java         |  26 +--
 .../stdlib/kafka/utils/KafkaUtils.java        | 173 +++++++-----------
 6 files changed, 115 insertions(+), 129 deletions(-)

diff --git a/ballerina/kafka_records.bal b/ballerina/kafka_records.bal
index 08918ca0..c09844cf 100644
--- a/ballerina/kafka_records.bal
+++ b/ballerina/kafka_records.bal
@@ -207,7 +207,7 @@ public type ConsumerRecord record {|
     byte[] value;
     int timestamp;
     PartitionOffset offset;
-    map<byte[]|byte[][]> headers;
+    map<byte[]|byte[][]|string|string[]> headers;
 |};
 
 # Type related to anydata consumer record.
@@ -222,15 +222,17 @@ public type AnydataConsumerRecord record {|
     anydata value;
     int timestamp;
     PartitionOffset offset;
-    map<byte[]|byte[][]> headers;
+    map<byte[]|byte[][]|string|string[]> headers;
 |};
 
 # Subtype related to `kafka:AnydataConsumerRecord` record.
 #
 # + value - Record content in bytes
+# + headers - Headers as a byte[] or byte[][]
 public type BytesConsumerRecord record {|
     *AnydataConsumerRecord;
     byte[] value;
+    map<byte[]|byte[][]> headers;
 |};
 
 # Details related to the producer record.
@@ -251,7 +253,7 @@
     byte[] value;
     int timestamp?;
     int partition?;
-    map<byte[]|byte[][]> headers?;
+    map<byte[]|byte[][]|string|string[]> headers?;
 |};
 
 # Details related to the anydata producer record.
@@ -268,15 +270,17 @@
     anydata value;
     int timestamp?;
     int partition?;
-    map<byte[]|byte[][]> headers?;
+    map<byte[]|byte[][]|string|string[]> headers?;
 |};
 
 # Subtype related to `kafka:AnydataProducerRecord` record.
 #
 # + value - Record content in bytes
+# + headers - Headers as a byte[] or byte[][]
 public type BytesProducerRecord record {|
     *AnydataProducerRecord;
     byte[] value;
+    map<byte[]|byte[][]> headers?;
 |};
 
 // Producer-related records
diff --git a/ballerina/producer.bal b/ballerina/producer.bal
index 1f957920..f10c9452 100644
--- a/ballerina/producer.bal
+++ b/ballerina/producer.bal
@@ -116,7 +116,30 @@ public client isolated class Producer {
         } else if anydataKey !is () {
             key = anydataKey.toJsonString().toBytes();
         }
-        return sendByteArrayValues(self, value, producerRecord.topic, producerRecord?.headers, key,
+        return sendByteArrayValues(self, value, producerRecord.topic, self.getHeaderValueAsByteArrayList(producerRecord?.headers), key,
                 producerRecord?.partition, producerRecord?.timestamp, self.keySerializerType);
     }
+
+    private isolated function getHeaderValueAsByteArrayList(map<byte[]|byte[][]|string|string[]>? headers) returns [string, byte[]][] {
+        [string, byte[]][] bytesHeaderList = [];
+        if headers is map<byte[]|byte[][]|string|string[]> {
+            foreach string key in headers.keys() {
+                byte[]|byte[][]|string|string[] values = headers.get(key);
+                if values is byte[] {
+                    bytesHeaderList.push([key, values]);
+                } else if values is byte[][] {
+                    foreach byte[] headerValue in values {
+                        bytesHeaderList.push([key, headerValue]);
+                    }
+                } else if values is string {
+                    bytesHeaderList.push([key, values.toBytes()]);
+                } else if values is string[] {
+                    foreach string headerValue in values {
+                        bytesHeaderList.push([key, headerValue.toBytes()]);
+                    }
+                }
+            }
+        }
+        return bytesHeaderList;
+    }
 }
diff --git a/ballerina/producer_utils.bal b/ballerina/producer_utils.bal
index 1d870c71..4f5f1f6f 100644
--- a/ballerina/producer_utils.bal
+++ b/ballerina/producer_utils.bal
@@ -16,7 +16,7 @@
 
 import ballerina/jballerina.java;
 
-isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, map<byte[]|byte[][]>? headers, anydata? key, int? partition,
+isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, [string, byte[]][] headers, anydata? key, int? partition,
         int? timestamp, string keySerializerType) returns Error? {
     if key is () {
         return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp, headers);
@@ -31,13 +31,13 @@
 //Send byte[] values with different types of keys
 isolated function sendByteArrayValuesNilKeys(Producer producer, byte[] value, string topic, int? partition = (),
-        int? timestamp = (), map<byte[]|byte[][]>? headers = ()) returns Error? =
+        int? timestamp = (), [string, byte[]][] headers = []) returns Error? =
 @java:Method {
     'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
 } external;
 
 isolated function sendByteArrayValuesByteArrayKeys(Producer producer, byte[] value, string topic, byte[] key,
-        int? partition = (), int? timestamp = (), map<byte[]|byte[][]>? headers = ()) returns Error? =
+        int? partition = (), int? timestamp = (), [string, byte[]][] headers = []) returns Error? =
 @java:Method {
     'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
 } external;
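Between the producer refactor above and the test update below, a small illustration of the flattening that `getHeaderValueAsByteArrayList` performs (the values are made up): every header value, whatever its declared type, ends up as one `[string, byte[]]` pair per Kafka header.

```ballerina
// Input accepted on the producer record:
map<byte[]|byte[][]|string|string[]> headers = {
    "retry": ["1", "2"],       // string[] -> two headers under the same key
    "traceId": "abc".toBytes() // byte[]   -> passed through as-is
};
// Flattened form handed to the native layer:
// [["retry", "1".toBytes()], ["retry", "2".toBytes()], ["traceId", "abc".toBytes()]]
```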
diff --git a/ballerina/tests/producer_client_tests.bal b/ballerina/tests/producer_client_tests.bal
index 83edae29..76e9c4ec 100644
--- a/ballerina/tests/producer_client_tests.bal
+++ b/ballerina/tests/producer_client_tests.bal
@@ -102,7 +102,7 @@ function producerKeyTypeMismatchErrorTest() returns error? {
     string topic = "key-type-mismatch-error-test-topic";
     Producer producer = check new (DEFAULT_URL, producerConfiguration);
     string message = "Hello, Ballerina";
-    error? result = trap sendByteArrayValues(producer, message.toBytes(), topic, (), MESSAGE_KEY, 0, (), SER_BYTE_ARRAY);
+    error? result = trap sendByteArrayValues(producer, message.toBytes(), topic, [], MESSAGE_KEY, 0, (), SER_BYTE_ARRAY);
     if result is error {
         string expectedErr = "Invalid type found for Kafka key. Expected key type: 'byte[]'.";
         test:assertEquals(result.message(), expectedErr);
diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java
index 719ca22c..b34db402 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java
@@ -50,10 +50,10 @@ public class SendByteArrayValues extends Send {
 
     // ballerina byte[]
     public static Object sendByteArrayValuesNilKeys(Environment env, BObject producer, BArray value, BString topic,
-                                                    Object partition, Object timestamp, Object bHeaders) {
+                                                    Object partition, Object timestamp, BArray headerList) {
         Integer partitionValue = getIntValue(partition, ALIAS_PARTITION, logger);
         Long timestampValue = getLongValue(timestamp);
-        List<Header> headers = getHeadersFromBHeaders(bHeaders);
+        List<Header> headers = getHeadersFromBHeaders(headerList);
         ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic.getValue(), partitionValue, timestampValue,
                 null, value.getBytes(), headers);
         return sendKafkaRecord(env, kafkaRecord, producer);
@@ -62,30 +62,20 @@
     // ballerina byte[] and ballerina byte[]
     public static Object sendByteArrayValuesByteArrayKeys(Environment env, BObject producer, BArray value,
                                                           BString topic, BArray key, Object partition,
-                                                          Object timestamp, Object bHeaders) {
+                                                          Object timestamp, BArray headerList) {
         Integer partitionValue = getIntValue(partition, ALIAS_PARTITION, logger);
         Long timestampValue = getLongValue(timestamp);
-        List<Header> headers = getHeadersFromBHeaders(bHeaders);
+        List<Header> headers = getHeadersFromBHeaders(headerList);
         ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic.getValue(), partitionValue,
                 timestampValue, key.getBytes(), value.getBytes(), headers);
         return sendKafkaRecord(env, kafkaRecord, producer);
     }
 
-    private static List<Header> getHeadersFromBHeaders(Object headerObj) {
+    private static List<Header> getHeadersFromBHeaders(BArray headerList) {
         List<Header> headers = new ArrayList<>();
-        if (headerObj instanceof BMap) {
-            BMap<BString, Object> bHeaders = (BMap<BString, Object>) headerObj;
-            for (BString key : (BString[]) bHeaders.getKeys()) {
-                BArray headerValues = bHeaders.getArrayValue(key);
-                if (!headerValues.isEmpty() && headerValues.get(0) instanceof BArray) {
-                    for (int i = 0; i < headerValues.size(); i++) {
-                        BArray headerValue = (BArray) headerValues.get(i);
-                        headers.add(new RecordHeader(key.getValue(), headerValue.getByteArray()));
-                    }
-                } else if (!headerValues.isEmpty()) {
-                    headers.add(new RecordHeader(key.getValue(), headerValues.getByteArray()));
-                }
-            }
+        for (int i = 0; i < headerList.size(); i++) {
+            BArray headerItem = (BArray) headerList.get(i);
+            headers.add(new RecordHeader(headerItem.getBString(0).getValue(), ((BArray) headerItem.get(1)).getByteArray()));
         }
         return headers;
     }
diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
index b2b5b87b..83bffaf3 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
@@ -26,6 +26,7 @@
 import io.ballerina.runtime.api.types.ArrayType;
 import io.ballerina.runtime.api.types.Field;
 import io.ballerina.runtime.api.types.IntersectionType;
+import io.ballerina.runtime.api.types.MapType;
 import io.ballerina.runtime.api.types.MethodType;
 import io.ballerina.runtime.api.types.ObjectType;
 import io.ballerina.runtime.api.types.RecordType;
@@ -127,14 +128,6 @@ public static Properties processKafkaConsumerConfig(Object bootStrapServers, BMa
         addDeserializerConfigs(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, properties);
         addDeserializerConfigs(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, properties);
 
-        // TODO: Disabled as the custom SerDes support is to be revisited and improved.
-        // Fix once the design for that is completed.
-//        addCustomDeserializer(KafkaConstants.CONSUMER_KEY_DESERIALIZER_CONFIG,
-//                              KafkaConstants.CONSUMER_KEY_DESERIALIZER_TYPE_CONFIG, properties,
-//                              configurations);
-//        addCustomDeserializer(KafkaConstants.CONSUMER_VALUE_DESERIALIZER_CONFIG,
-//                              KafkaConstants.CONSUMER_VALUE_DESERIALIZER_TYPE_CONFIG, properties,
-//                              configurations);
 
         addStringParamIfPresent(KafkaConstants.SCHEMA_REGISTRY_URL, configurations, properties,
                 KafkaConstants.CONSUMER_SCHEMA_REGISTRY_URL);
@@ -238,11 +231,6 @@ public static Properties processKafkaProducerConfig(Object bootstrapServers, BMa
         addSerializerTypeConfigs(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties);
         addSerializerTypeConfigs(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties);
 
-        // TODO: Disabled as the custom SerDes support is to be revisited and improved.
-        // Fix once the design for that is completed.
-//        addCustomKeySerializer(properties, configurations);
-//        addCustomValueSerializer(properties, configurations);
-
         addIntParamIfPresent(ProducerConfig.BUFFER_MEMORY_CONFIG, configurations,
                 properties, KafkaConstants.PRODUCER_BUFFER_MEMORY_CONFIG);
         addIntParamIfPresent(ProducerConfig.RETRIES_CONFIG, configurations,
@@ -402,75 +390,6 @@ private static void addDeserializerConfigs(String paramName, Properties configPa
         configParams.put(paramName, KafkaConstants.BYTE_ARRAY_DESERIALIZER);
     }
 
-    // TODO: Disabled as the SerDes support is to be revisited and improved. Fix once the design for that is completed.
-//    private static void addCustomKeySerializer(Properties properties, BMap<BString, Object> configurations) {
-//        Object serializer = configurations.get(KafkaConstants.PRODUCER_KEY_SERIALIZER_CONFIG);
-//        String serializerType =
-//                configurations.getStringValue(KafkaConstants.PRODUCER_KEY_SERIALIZER_TYPE_CONFIG).getValue();
-//        if (Objects.nonNull(serializer) && KafkaConstants.SERDES_CUSTOM.equals(serializerType)) {
-//            properties.put(KafkaConstants.PRODUCER_KEY_SERIALIZER_CONFIG.getValue(),
-//                    configurations.get(KafkaConstants.PRODUCER_KEY_SERIALIZER_CONFIG));
-//        }
-//    }
-//
-//    private static void addCustomValueSerializer(Properties properties, BMap<BString, Object> configurations) {
-//        Object serializer = configurations.get(KafkaConstants.PRODUCER_VALUE_SERIALIZER_CONFIG);
-//        String serializerType =
-//                configurations.getStringValue(KafkaConstants.PRODUCER_VALUE_SERIALIZER_TYPE_CONFIG).getValue();
-//        if (Objects.nonNull(serializer) && KafkaConstants.SERDES_CUSTOM.equals(serializerType)) {
-//            properties.put(KafkaConstants.PRODUCER_VALUE_SERIALIZER_CONFIG.getValue(),
-//                    configurations.get(KafkaConstants.PRODUCER_VALUE_SERIALIZER_CONFIG));
-//        }
-//    }
-
-//    private static void addCustomDeserializer(BString configParam, BString typeConfig, Properties properties,
-//                                              BMap<BString, Object> configurations) {
-//        Object deserializer = configurations.get(configParam);
-//        String deserializerType = configurations.getStringValue(typeConfig).getValue();
-//        if (Objects.nonNull(deserializer) && KafkaConstants.SERDES_CUSTOM.equals(deserializerType)) {
-//            properties.put(configParam.getValue(), configurations.get(configParam));
-//            properties.put(KafkaConstants.BALLERINA_STRAND, Runtime.getCurrentRuntime());
-//        }
-//    }

-//    private static String getSerializerType(String value) {
-//        switch (value) {
-//            case KafkaConstants.SERDES_BYTE_ARRAY:
-//                return KafkaConstants.BYTE_ARRAY_SERIALIZER;
-//            case KafkaConstants.SERDES_STRING:
-//                return KafkaConstants.STRING_SERIALIZER;
-//            case KafkaConstants.SERDES_INT:
-//                return KafkaConstants.INT_SERIALIZER;
-//            case KafkaConstants.SERDES_FLOAT:
-//                return KafkaConstants.FLOAT_SERIALIZER;
-//            case KafkaConstants.SERDES_AVRO:
-//                return KafkaConstants.AVRO_SERIALIZER;
-//            case KafkaConstants.SERDES_CUSTOM:
-//                return KafkaConstants.CUSTOM_SERIALIZER;
-//            default:
-//                return value;
-//        }
-//    }
-//
-//    private static String getDeserializerValue(String value) {
-//        switch (value) {
-//            case KafkaConstants.SERDES_BYTE_ARRAY:
-//                return KafkaConstants.BYTE_ARRAY_DESERIALIZER;
-//            case KafkaConstants.SERDES_STRING:
-//                return KafkaConstants.STRING_DESERIALIZER;
-//            case KafkaConstants.SERDES_INT:
-//                return KafkaConstants.INT_DESERIALIZER;
-//            case KafkaConstants.SERDES_FLOAT:
-//                return KafkaConstants.FLOAT_DESERIALIZER;
-//            case KafkaConstants.SERDES_AVRO:
-//                return KafkaConstants.AVRO_DESERIALIZER;
-//            case KafkaConstants.SERDES_CUSTOM:
-//                return KafkaConstants.CUSTOM_DESERIALIZER;
-//            default:
-//                return value;
-//        }
-//    }
-
     private static void addStringParamIfPresent(String paramName,
                                                 BMap<BString, Object> configs,
                                                 Properties configParams,
@@ -617,7 +536,8 @@ public static BMap<BString, Object> populateConsumerRecord(ConsumerRecord record
         Object value = getValueWithIntendedType(valueType, (byte[]) record.value(), record, autoSeek);
         BMap<BString, Object> topicPartition = ValueCreator.createRecordValue(getTopicPartitionRecord(), record.topic(),
                 (long) record.partition());
-        BMap<BString, Object> bHeaders = getBHeadersFromRecord(record.headers());
+        MapType headerType = (MapType) getReferredType(fieldMap.get(KAFKA_RECORD_HEADERS.getValue()).getFieldType());
+        BMap<BString, Object> bHeaders = getBHeadersFromConsumerRecord(record.headers(), headerType.getConstrainedType());
         BMap<BString, Object> consumerRecord = ValueCreator.createRecordValue(recordType);
 
-    private static BMap<BString, Object> getBHeadersFromRecord(Headers headers) {
-        BMap<BString, Object> bHeaderMap = ValueCreator.createMapValue();
-        HashMap<String, BArray> headerMap = new HashMap<>();
+    private static BMap<BString, Object> getBHeadersFromConsumerRecord(Headers headers, Type headerType) {
+        HashMap<String, ArrayList<byte[]>> headerMap = new HashMap<>();
         for (Header header : headers) {
             if (headerMap.containsKey(header.key())) {
-                BArray values = headerMap.get(header.key());
-                values.add(values.size(), ValueCreator.createArrayValue(header.value()));
-                headerMap.put(header.key(), values);
+                ArrayList<byte[]> headerList = headerMap.get(header.key());
+                headerList.add(header.value());
             } else {
-                ArrayType arrayOfByteArrayType = TypeCreator.createArrayType(TypeCreator
-                        .createArrayType(PredefinedTypes.TYPE_BYTE));
-                BArray arrayOfByteArray = ValueCreator.createArrayValue(arrayOfByteArrayType);
-                arrayOfByteArray.add(0, ValueCreator.createArrayValue(header.value()));
-                headerMap.put(header.key(), arrayOfByteArray);
+                ArrayList<byte[]> headerList = new ArrayList<>();
+                headerList.add(header.value());
+                headerMap.put(header.key(), headerList);
             }
         }
-        headerMap.forEach((key, value) -> {
-            if (value.size() > 1) {
-                bHeaderMap.put(StringUtils.fromString(key), value);
+        BMap<BString, Object> bHeaderMap = ValueCreator.createMapValue();
+        headerMap.forEach((key, valueList) -> {
+            if (headerType instanceof UnionType unionType) {
+                Type appropriateType = getMostAppropriateTypeFromUnionType(unionType.getMemberTypes(), valueList.size());
+                handleSupportedTypesForHeaders(key, valueList, appropriateType, bHeaderMap);
             } else {
-                bHeaderMap.put(StringUtils.fromString(key), value.get(0));
+                handleSupportedTypesForHeaders(key, valueList, headerType, bHeaderMap);
             }
         });
         return bHeaderMap;
     }
 
+    private static void handleSupportedTypesForHeaders(String key, ArrayList<byte[]> list, Type appropriateType,
+                                                       BMap<BString, Object> bHeaderMap) {
+        if (appropriateType instanceof ArrayType arrayType) {
+            handleHeaderValuesWithArrayType(key, list, arrayType, bHeaderMap);
+        } else if (appropriateType.getTag() == STRING_TAG) {
+            bHeaderMap.put(StringUtils.fromString(key), StringUtils.fromString(new String(list.get(0))));
+        }
+    }
+
+    private static void handleHeaderValuesWithArrayType(String key, ArrayList<byte[]> list, ArrayType arrayType,
+                                                        BMap<BString, Object> bHeaderMap) {
+        Type elementType = arrayType.getElementType();
+        if (elementType.getTag() == ARRAY_TAG) {
+            BArray valueArray = ValueCreator.createArrayValue(arrayType);
+            for (int i = 0; i < list.size(); i++) {
+                valueArray.add(i, ValueCreator.createArrayValue(list.get(i)));
+            }
+            bHeaderMap.put(StringUtils.fromString(key), valueArray);
+        } else if (elementType.getTag() == STRING_TAG) {
+            BArray valueArray = ValueCreator.createArrayValue(arrayType);
+            for (int i = 0; i < list.size(); i++) {
+                valueArray.add(i, StringUtils.fromString(new String(list.get(i))));
+            }
+            bHeaderMap.put(StringUtils.fromString(key), valueArray);
+        } else if (elementType.getTag() == BYTE_TAG) {
+            bHeaderMap.put(StringUtils.fromString(key), ValueCreator.createArrayValue(list.get(0)));
+        }
+    }
+
+    private static Type getMostAppropriateTypeFromUnionType(List<Type> memberTypes, int size) {
+        Type firstType = memberTypes.get(0);
+        if (memberTypes.size() == 1) {
+            return firstType;
+        }
+        if (size > 1) {
+            if (firstType instanceof ArrayType arrayType) {
+                if (arrayType.getElementType().getTag() == BYTE_TAG) {
+                    return getMostAppropriateTypeFromUnionType(memberTypes.subList(1, memberTypes.size()), size);
+                }
+                return arrayType;
+            }
+            return getMostAppropriateTypeFromUnionType(memberTypes.subList(1, memberTypes.size()), size);
+        } else {
+            if (firstType instanceof ArrayType arrayType) {
+                if (arrayType.getElementType().getTag() == BYTE_TAG) {
+                    return arrayType;
+                }
+                return getMostAppropriateTypeFromUnionType(memberTypes.subList(1, memberTypes.size()), size);
+            }
+            return firstType;
+        }
+    }
+
     public static BArray getConsumerRecords(ConsumerRecords records, RecordType recordType, boolean readonly,
                                             boolean validateConstraints, boolean autoCommit, KafkaConsumer consumer,
                                             boolean autoSeek) {
@@ -741,10 +710,10 @@ public static Object getValueWithIntendedType(Type type, byte[] value, ConsumerR
                 intendedValue = ValueCreator.createArrayValue(value);
                 break;
             case RECORD_TYPE_TAG:
-                intendedValue = ValueUtils.convert(JsonUtils.parse(strValue), type);
+                intendedValue = getValueFromJson(type, strValue);
                 break;
             case UNION_TAG:
-                if (hasStringType((UnionType) type)) {
+                if (hasExpectedType((UnionType) type, STRING_TAG)) {
                     intendedValue = StringUtils.fromString(strValue);
                     break;
                 }
@@ -768,9 +737,9 @@
         return intendedValue;
     }
 
-    private static boolean hasStringType(UnionType type) {
+    private static boolean hasExpectedType(UnionType type, int typeTag) {
         return type.getMemberTypes().stream().anyMatch(memberType -> {
-            if (memberType.getTag() == STRING_TAG) {
+            if (memberType.getTag() == typeTag) {
                 return true;
             }
             return false;
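Before the test patch, a sketch of what the union handling above enables: the same consumed record can declare different `headers` shapes, and the runtime binds each header to the best-matching member type. Record names here are illustrative; the tests below exercise the same idea:

```ballerina
import ballerinax/kafka;

// Header values decoded to text; repeated keys would bind to string[].
type TextHeaderRecord record {|
    *kafka:AnydataConsumerRecord;
    string value;
    map<string|string[]> headers;
|};

// Header values kept as raw bytes; repeated keys would bind to byte[][].
type RawHeaderRecord record {|
    *kafka:AnydataConsumerRecord;
    string value;
    map<byte[]|byte[][]> headers;
|};
```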
From 574444f01009c353734558c07914b0050de819fb Mon Sep 17 00:00:00 2001
From: dilanSachi
Date: Fri, 22 Mar 2024 16:14:53 +0530
Subject: [PATCH 13/13] Add test cases

---
 ballerina/tests/consumer_client_tests.bal     |  23 +-
 ballerina/tests/consumer_header_tests.bal     | 239 ++++++++++++++++++
 .../producer/SendByteArrayValues.java         |   4 +-
 .../stdlib/kafka/utils/KafkaUtils.java        |  18 +-
 4 files changed, 252 insertions(+), 32 deletions(-)
 create mode 100644 ballerina/tests/consumer_header_tests.bal

diff --git a/ballerina/tests/consumer_client_tests.bal b/ballerina/tests/consumer_client_tests.bal
index 5322857d..62dea8a5 100644
--- a/ballerina/tests/consumer_client_tests.bal
+++ b/ballerina/tests/consumer_client_tests.bal
@@ -99,6 +99,7 @@ ProducerConfiguration producerConfiguration = {
     requestTimeout: 2,
     retryCount: 3
 };
+
 Producer producer = check new (DEFAULT_URL, producerConfiguration);
 
 @test:Config {enable: true}
@@ -1483,26 +1484,6 @@ function commitOffsetWithPolledOffsetValue() returns error? {
     check consumer->close();
 }
 
-@test:Config {enable: true}
-function consumerReadHeadersTest() returns error? {
-    string topic = "consumer-read-headers-test-topic";
-    kafkaTopics.push(topic);
-    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
-    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
-    ConsumerConfiguration consumerConfiguration = {
-        topics: [topic],
-        offsetReset: OFFSET_RESET_EARLIEST,
-        groupId: "consumer-read-headers-test-group",
-        clientId: "test-consumer-61"
-    };
-    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
-    BytesConsumerRecord[] consumerRecords = check consumer->poll(5);
-    test:assertEquals(consumerRecords.length(), 1);
-    map<byte[]|byte[][]> receivedHeaders = consumerRecords[0].headers;
-    test:assertEquals(receivedHeaders, headers);
-    check consumer->close();
-}
-
-function sendMessage(anydata message, string topic, anydata? key = (), map<byte[]|byte[][]>? headers = ()) returns error? {
+function sendMessage(anydata message, string topic, anydata? key = (), map<byte[]|byte[][]|string|string[]>? headers = ()) returns error? {
     return producer->send({ topic: topic, value: message, key, headers });
 }
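The widened `sendMessage` signature above accepts every supported header shape in a single map. A hypothetical call from within this test module might look as follows; the function name, topic, and values are stand-ins, and the test is kept disabled since it is illustrative only:

    @test:Config {enable: false}
    function sendMessageMixedHeadersExample() returns error? {
        // Illustrative only: string, byte[], and byte[][] header values side by side.
        map<byte[]|byte[][]|string|string[]>? mixedHeaders = {
            "stringHeader": "plain-value",
            "bytesHeader": "byte-value".toBytes(),
            "multiHeader": ["value1".toBytes(), "value2".toBytes()]
        };
        check sendMessage(TEST_MESSAGE.toBytes(), "sample-topic", (), mixedHeaders);
    }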
diff --git a/ballerina/tests/consumer_header_tests.bal b/ballerina/tests/consumer_header_tests.bal
new file mode 100644
index 00000000..b5f4bb8b
--- /dev/null
+++ b/ballerina/tests/consumer_header_tests.bal
@@ -0,0 +1,239 @@
+// Copyright (c) 2024 WSO2 LLC. (http://www.wso2.com).
+//
+// WSO2 LLC. licenses this file to you under the Apache License,
+// Version 2.0 (the "License"); you may not use this file except
+// in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import ballerina/test;
+
+type StringHeaderConsumerRecord record {|
+    *AnydataConsumerRecord;
+    map<string> headers;
+    string value;
+|};
+
+@test:Config {enable: true}
+function consumerReadStringHeadersTest() returns error? {
+    string topic = "consumer-read-string-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-string-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    StringHeaderConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<string> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, {"key1": "header1", "key2": "header3"});
+    check consumer->close();
+}
+
+type StringArrayHeaderConsumerRecord record {|
+    *AnydataConsumerRecord;
+    map<string[]> headers;
+    string value;
+|};
+
+@test:Config {enable: true}
+function consumerReadStringArrayHeadersTest() returns error? {
+    string topic = "consumer-read-string-array-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-string-array-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    StringArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<string[]> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, {"key1": ["header1", "header2"], "key2": ["header3"]});
+    check consumer->close();
+}
+
+type ByteHeaderConsumerRecord record {|
+    *AnydataConsumerRecord;
+    map<byte[]> headers;
+    string value;
+|};
+
+@test:Config {enable: true}
+function consumerReadByteHeadersTest() returns error? {
+    string topic = "consumer-read-byte-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-byte-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    ByteHeaderConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<byte[]> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, {"key1": "header1".toBytes(), "key2": "header3".toBytes()});
+    check consumer->close();
+}
+
+type ByteArrayHeaderConsumerRecord record {|
+    *AnydataConsumerRecord;
+    map<byte[][]> headers;
+    string value;
+|};
+
+@test:Config {enable: true}
+function consumerReadByteArrayHeadersTest() returns error? {
+    string topic = "consumer-read-byte-array-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-byte-array-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    ByteArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<byte[][]> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": ["header3".toBytes()]});
+    check consumer->close();
+}
+
+type StringAndStringArrayHeaderConsumerRecord record {|
+    *AnydataConsumerRecord;
+    map<string|string[]> headers;
+    string value;
+|};
+
+@test:Config {enable: true}
+function consumerReadStringAndStringArrayHeadersTest() returns error? {
+    string topic = "consumer-read-string-string-array-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-string-string-array-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    StringAndStringArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<string|string[]> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, {"key1": ["header1", "header2"], "key2": "header3"});
+    check consumer->close();
+}
+
+type StringAndByteArrayHeaderConsumerRecord record {|
+    *AnydataConsumerRecord;
+    map<string|byte[]> headers;
+    string value;
+|};
+
+@test:Config {enable: true}
+function consumerReadStringAndByteArrayHeadersTest() returns error? {
+    string topic = "consumer-read-string-and-byte-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-string-and-byte-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    StringAndByteArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<string|byte[]> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, {"key1": "header1".toBytes(), "key2": "header3"});
+    check consumer->close();
+}
+
+type ByteAndStringArrayHeaderConsumerRecord record {|
+    *AnydataConsumerRecord;
+    map<byte[]|string[]> headers;
+    string value;
+|};
+
+@test:Config {enable: true}
+function consumerReadByteAndStringArrayHeadersTest() returns error? {
+    string topic = "consumer-read-byte-and-string-array-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-byte-and-string-array-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    ByteAndStringArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<byte[]|string[]> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, {"key1": ["header1", "header2"], "key2": "header3".toBytes()});
+    check consumer->close();
+}
+
+@test:Config {enable: true}
+function consumerReadByteAndByteArrayHeadersTest() returns error? {
+    string topic = "consumer-read-byte-and-byte-array-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-byte-and-byte-array-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    BytesConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<byte[]|byte[][]> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()});
+    check consumer->close();
+}
+
+@test:Config {enable: true}
+function consumerReadAllSupportedTypesHeadersTest() returns error? {
+    string topic = "consumer-read-all-types-headers-test-topic";
+    kafkaTopics.push(topic);
+    map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+    check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+    ConsumerConfiguration consumerConfiguration = {
+        topics: [topic],
+        offsetReset: OFFSET_RESET_EARLIEST,
+        groupId: "consumer-read-all-types-headers-test-group",
+        clientId: "test-consumer-61"
+    };
+    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+    AnydataConsumerRecord[] consumerRecords = check consumer->poll(5);
+    test:assertEquals(consumerRecords.length(), 1);
+    map<byte[]|byte[][]|string|string[]> receivedHeaders = consumerRecords[0].headers;
+    test:assertEquals(receivedHeaders, {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()});
+    check consumer->close();
+}
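The tests above drive the consumer client directly; the same header binding applies when records arrive through a listener service. A sketch under assumed names (broker URL, group, topic, and the traceId key are placeholders; only the record shape comes from this patch):

    import ballerinax/kafka;

    type TracedStringRecord record {|
        *kafka:AnydataConsumerRecord;
        map<string> headers;
        string value;
    |};

    listener kafka:Listener kafkaListener = new ("localhost:9092", {
        groupId: "sketch-group",
        topics: ["sketch-topic"]
    });

    service on kafkaListener {
        remote function onConsumerRecord(TracedStringRecord[] records) returns error? {
            foreach TracedStringRecord rec in records {
                // With a map<string> constraint, only the first value per key is kept.
                string? traceId = rec.headers["traceId"];
            }
        }
    }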
diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java
index b34db402..93cbc606 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java
@@ -20,7 +20,6 @@
 
 import io.ballerina.runtime.api.Environment;
 import io.ballerina.runtime.api.values.BArray;
-import io.ballerina.runtime.api.values.BMap;
 import io.ballerina.runtime.api.values.BObject;
 import io.ballerina.runtime.api.values.BString;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -75,7 +74,8 @@ private static List<Header> getHeadersFromBHeaders(BArray headerList) {
         List<Header> headers = new ArrayList<>();
         for (int i = 0; i < headerList.size(); i++) {
             BArray headerItem = (BArray) headerList.get(i);
-            headers.add(new RecordHeader(headerItem.getBString(0).getValue(), ((BArray) headerItem.get(1)).getByteArray()));
+            headers.add(new RecordHeader(headerItem.getBString(0).getValue(),
+                    ((BArray) headerItem.get(1)).getByteArray()));
         }
         return headers;
     }
diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
index 83bffaf3..8ef497d8 100644
--- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
+++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java
@@ -78,7 +78,6 @@
 import static io.ballerina.runtime.api.TypeTags.ARRAY_TAG;
 import static io.ballerina.runtime.api.TypeTags.BYTE_TAG;
 import static io.ballerina.runtime.api.TypeTags.INTERSECTION_TAG;
-import static io.ballerina.runtime.api.TypeTags.RECORD_TYPE_TAG;
 import static io.ballerina.runtime.api.TypeTags.STRING_TAG;
 import static io.ballerina.runtime.api.TypeTags.UNION_TAG;
 import static io.ballerina.runtime.api.TypeTags.XML_TAG;
@@ -566,7 +565,8 @@ private static BMap<BString, Object> getBHeadersFromConsumerRecord(Headers headers, Type headerTy
         BMap<BString, Object> bHeaderMap = ValueCreator.createMapValue();
         headerMap.forEach((key, valueList) -> {
             if (headerType instanceof UnionType unionType) {
-                Type appropriateType = getMostAppropriateTypeFromUnionType(unionType.getMemberTypes(), valueList.size());
+                Type appropriateType = getMostAppropriateTypeFromUnionType(unionType.getMemberTypes(),
+                        valueList.size());
                 handleSupportedTypesForHeaders(key, valueList, appropriateType, bHeaderMap);
             } else {
                 handleSupportedTypesForHeaders(key, valueList, headerType, bHeaderMap);
@@ -575,15 +575,18 @@ private static BMap<BString, Object> getBHeadersFromConsumerRecord(Headers headers, Type headerTy
         return bHeaderMap;
     }
 
-    private static void handleSupportedTypesForHeaders(String key, ArrayList<byte[]> list, Type appropriateType, BMap<BString, Object> bHeaderMap) {
+    private static void handleSupportedTypesForHeaders(String key, ArrayList<byte[]> list, Type appropriateType,
+                                                       BMap<BString, Object> bHeaderMap) {
         if (appropriateType instanceof ArrayType arrayType) {
             handleHeaderValuesWithArrayType(key, list, arrayType, bHeaderMap);
         } else if (appropriateType.getTag() == STRING_TAG) {
-            bHeaderMap.put(StringUtils.fromString(key), StringUtils.fromString(new String(list.get(0))));
+            bHeaderMap.put(StringUtils.fromString(key), StringUtils.fromString(new String(list.get(0),
+                    StandardCharsets.UTF_8)));
         }
     }
 
-    private static void handleHeaderValuesWithArrayType(String key, ArrayList<byte[]> list, ArrayType arrayType, BMap<BString, Object> bHeaderMap) {
+    private static void handleHeaderValuesWithArrayType(String key, ArrayList<byte[]> list, ArrayType arrayType,
+                                                        BMap<BString, Object> bHeaderMap) {
         Type elementType = arrayType.getElementType();
         if (elementType.getTag() == ARRAY_TAG) {
             BArray valueArray = ValueCreator.createArrayValue(arrayType);
@@ -594,7 +597,7 @@ private static void handleHeaderValuesWithArrayType(String key, ArrayList