diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 9d37e659..98008be0 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerinax" name = "kafka" -version = "3.10.0" +version = "3.10.1" authors = ["Ballerina"] keywords = ["kafka", "event streaming", "network", "messaging"] repository = "https://github.com/ballerina-platform/module-ballerinax-kafka" @@ -15,8 +15,8 @@ graalvmCompatible = true [[platform.java17.dependency]] groupId = "io.ballerina.stdlib" artifactId = "kafka-native" -version = "3.10.0" -path = "../native/build/libs/kafka-native-3.10.0.jar" +version = "3.10.1" +path = "../native/build/libs/kafka-native-3.10.1-SNAPSHOT.jar" [[platform.java17.dependency]] groupId = "org.apache.kafka" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index 5d6bff77..4aa1e9e6 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "kafka-compiler-plugin" class = "io.ballerina.stdlib.kafka.plugin.KafkaCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.0.jar" +path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 934f04a5..5d89a3ce 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -23,7 +23,7 @@ dependencies = [ [[package]] org = "ballerina" name = "cache" -version = "3.7.0" +version = "3.7.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "constraint"}, @@ -71,7 +71,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.10.0" +version = "2.10.12" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -283,7 +283,7 @@ dependencies = [ [[package]] org = "ballerina" name = "observe" -version = "1.2.0" +version = "1.2.2" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -325,6 +325,7 @@ modules = [ org = "ballerina" name = "time" version = "2.4.0" +scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -389,7 +390,7 @@ modules = [ [[package]] org = "ballerinax" name = "kafka" -version = "3.10.0" +version = "3.10.1" dependencies = [ {org = "ballerina", name = "constraint"}, {org = "ballerina", name = "crypto"}, diff --git a/ballerina/kafka_records.bal b/ballerina/kafka_records.bal index 5416cca7..c09844cf 100644 --- a/ballerina/kafka_records.bal +++ b/ballerina/kafka_records.bal @@ -197,6 +197,7 @@ public type TopicPartition record {| # + value - Record content # + timestamp - Timestamp of the record, in milliseconds since epoch # + offset - Topic partition position in which the consumed record is stored +# + headers - Map of headers included with the record # # Deprecated # Usage of this record is deprecated. Use subtypes of AnydataConsumerRecord # instead to support data-binding @@ -206,6 +207,7 @@ public type ConsumerRecord record {| byte[] value; int timestamp; PartitionOffset offset; + map headers; |}; # Type related to anydata consumer record. 
@@ -214,19 +216,23 @@ public type ConsumerRecord record {| # + value - Anydata record content # + timestamp - Timestamp of the record, in milliseconds since epoch # + offset - Topic partition position in which the consumed record is stored +# + headers - Map of headers included with the record public type AnydataConsumerRecord record {| anydata key?; anydata value; int timestamp; PartitionOffset offset; + map<byte[]|byte[][]|string|string[]> headers; |}; # Subtype related to `kafka:AnydataConsumerRecord` record. # # + value - Record content in bytes +# + headers - Headers as a byte[] or byte[][] public type BytesConsumerRecord record {| *AnydataConsumerRecord; byte[] value; + map<byte[]|byte[][]> headers; |}; # Details related to the producer record. @@ -236,6 +242,7 @@ public type BytesConsumerRecord record {| # + value - Record content # + timestamp - Timestamp of the record, in milliseconds since epoch # + partition - Partition to which the record should be sent +# + headers - Map of headers to be included with the record # # Deprecated # Usage of this record is deprecated. Use subtypes of AnydataProducerRecord # instead to support data-binding @@ -246,29 +253,34 @@ public type ProducerRecord record {| byte[] value; int timestamp?; int partition?; + map<byte[]|byte[][]|string|string[]> headers?; |}; # Details related to the anydata producer record. # -# + topic - Topic to which the record will be appended -# + key - Key that is included in the record -# + value - Anydata record content -# + timestamp - Timestamp of the record, in milliseconds since epoch -# + partition - Partition to which the record should be sent +# + topic - Topic to which the record will be appended +# + key - Key that is included in the record +# + value - Anydata record content +# + timestamp - Timestamp of the record, in milliseconds since epoch +# + partition - Partition to which the record should be sent +# + headers - Map of headers to be included with the record public type AnydataProducerRecord record {| string topic; anydata key?; anydata value; int timestamp?; int partition?; + map<byte[]|byte[][]|string|string[]> headers?; |}; # Subtype related to `kafka:AnydataProducerRecord` record. # # + value - Record content in bytes +# + headers - Headers as a byte[] or byte[][] public type BytesProducerRecord record {| *AnydataProducerRecord; byte[] value; + map<byte[]|byte[][]> headers?; |}; // Producer-related records diff --git a/ballerina/producer.bal b/ballerina/producer.bal index 20353fa5..f10c9452 100644 --- a/ballerina/producer.bal +++ b/ballerina/producer.bal @@ -116,7 +116,30 @@ public client isolated class Producer { } else if anydataKey !is () { key = anydataKey.toJsonString().toBytes(); } - return sendByteArrayValues(self, value, producerRecord.topic, key, + return sendByteArrayValues(self, value, producerRecord.topic, self.getHeaderValueAsByteArrayList(producerRecord?.headers), key, producerRecord?.partition, producerRecord?.timestamp, self.keySerializerType); } + + private isolated function getHeaderValueAsByteArrayList(map<byte[]|byte[][]|string|string[]>?
headers) returns [string, byte[]][] { + [string, byte[]][] bytesHeaderList = []; + if headers is map { + foreach string key in headers.keys() { + byte[]|byte[][]|string|string[] values = headers.get(key); + if values is byte[] { + bytesHeaderList.push([key, values]); + } else if values is byte[][] { + foreach byte[] headerValue in values { + bytesHeaderList.push([key, headerValue]); + } + } else if values is string { + bytesHeaderList.push([key, values.toBytes()]); + } else if values is string[] { + foreach string headerValue in values { + bytesHeaderList.push([key, headerValue.toBytes()]); + } + } + } + } + return bytesHeaderList; + } } diff --git a/ballerina/producer_utils.bal b/ballerina/producer_utils.bal index 8ddcc559..4f5f1f6f 100644 --- a/ballerina/producer_utils.bal +++ b/ballerina/producer_utils.bal @@ -16,14 +16,14 @@ import ballerina/jballerina.java; -isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, anydata? key, int? partition, +isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, [string, byte[]][] headers, anydata? key, int? partition, int? timestamp, string keySerializerType) returns Error? { if key is () { - return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp); + return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp, headers); } if keySerializerType == SER_BYTE_ARRAY { if key is byte[] { - return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp); + return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp, headers); } panic getKeyTypeMismatchError(BYTE_ARRAY); } @@ -31,13 +31,13 @@ isolated function sendByteArrayValues(Producer producer, byte[] value, string to //Send byte[] values with different types of keys isolated function sendByteArrayValuesNilKeys(Producer producer, byte[] value, string topic, int? partition = (), - int? timestamp = ()) returns Error? = + int? timestamp = (), [string, byte[]][] headers = []) returns Error? = @java:Method { 'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues" } external; isolated function sendByteArrayValuesByteArrayKeys(Producer producer, byte[] value, string topic, byte[] key, - int? partition = (), int? timestamp = ()) returns Error? = + int? partition = (), int? timestamp = (), [string, byte[]][] headers = []) returns Error? = @java:Method { 'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues" } external; diff --git a/ballerina/tests/consumer_client_tests.bal b/ballerina/tests/consumer_client_tests.bal index 2c4cce8f..62dea8a5 100644 --- a/ballerina/tests/consumer_client_tests.bal +++ b/ballerina/tests/consumer_client_tests.bal @@ -99,6 +99,7 @@ ProducerConfiguration producerConfiguration = { requestTimeout: 2, retryCount: 3 }; + Producer producer = check new (DEFAULT_URL, producerConfiguration); @test:Config {enable: true} @@ -1483,6 +1484,6 @@ function commitOffsetWithPolledOffsetValue() returns error? { check consumer->close(); } -function sendMessage(anydata message, string topic, anydata? key = ()) returns error? { - return producer->send({ topic: topic, value: message, key }); +function sendMessage(anydata message, string topic, anydata? key = (), map? headers = ()) returns error? 
{ + return producer->send({ topic: topic, value: message, key, headers }); } diff --git a/ballerina/tests/consumer_constraint_tests.bal b/ballerina/tests/consumer_constraint_tests.bal index da448fff..813ca26c 100644 --- a/ballerina/tests/consumer_constraint_tests.bal +++ b/ballerina/tests/consumer_constraint_tests.bal @@ -32,14 +32,13 @@ public type StringConstraintConsumerRecord record {| |}; public type IntConstraintConsumerRecord record {| + *AnydataConsumerRecord; int key?; @constraint:Int { maxValue: 100, minValue: 10 } int value; - int timestamp; - PartitionOffset offset; |}; @constraint:Float { diff --git a/ballerina/tests/consumer_header_tests.bal b/ballerina/tests/consumer_header_tests.bal new file mode 100644 index 00000000..b5f4bb8b --- /dev/null +++ b/ballerina/tests/consumer_header_tests.bal @@ -0,0 +1,239 @@ +// Copyright (c) 2024 WSO2 LLC. (http://www.wso2.com). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +type StringHeaderConsumerRecord record {| + *AnydataConsumerRecord; + map headers; + string value; +|}; + +@test:Config {enable: true} +function consumerReadStringHeadersTest() returns error? { + string topic = "consumer-read-string-headers-test-topic"; + kafkaTopics.push(topic); + map? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers); + ConsumerConfiguration consumerConfiguration = { + topics: [topic], + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "consumer-read-string-headers-test-group", + clientId: "test-consumer-61" + }; + Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); + StringHeaderConsumerRecord[] consumerRecords = check consumer->poll(5); + test:assertEquals(consumerRecords.length(), 1); + map receivedHeaders = consumerRecords[0].headers; + test:assertEquals(receivedHeaders, {"key1": "header1", "key2": "header3"}); + check consumer->close(); +} + +type StringArrayHeaderConsumerRecord record {| + *AnydataConsumerRecord; + map headers; + string value; +|}; + +@test:Config {enable: true} +function consumerReadStringArrayHeadersTest() returns error? { + string topic = "consumer-read-string-array-headers-test-topic"; + kafkaTopics.push(topic); + map? 
headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers); + ConsumerConfiguration consumerConfiguration = { + topics: [topic], + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "consumer-read-string-array-headers-test-group", + clientId: "test-consumer-61" + }; + Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); + StringArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5); + test:assertEquals(consumerRecords.length(), 1); + map receivedHeaders = consumerRecords[0].headers; + test:assertEquals(receivedHeaders, {"key1": ["header1", "header2"], "key2": ["header3"]}); + check consumer->close(); +} + +type ByteHeaderConsumerRecord record {| + *AnydataConsumerRecord; + map headers; + string value; +|}; + +@test:Config {enable: true} +function consumerReadByteHeadersTest() returns error? { + string topic = "consumer-read-byte-headers-test-topic"; + kafkaTopics.push(topic); + map? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers); + ConsumerConfiguration consumerConfiguration = { + topics: [topic], + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "consumer-read-byte-headers-test-group", + clientId: "test-consumer-61" + }; + Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); + ByteHeaderConsumerRecord[] consumerRecords = check consumer->poll(5); + test:assertEquals(consumerRecords.length(), 1); + map receivedHeaders = consumerRecords[0].headers; + test:assertEquals(receivedHeaders, {"key1": "header1".toBytes(), "key2": "header3".toBytes()}); + check consumer->close(); +} + +type ByteArrayHeaderConsumerRecord record {| + *AnydataConsumerRecord; + map headers; + string value; +|}; + +@test:Config {enable: true} +function consumerReadByteArrayHeadersTest() returns error? { + string topic = "consumer-read-byte-array-headers-test-topic"; + kafkaTopics.push(topic); + map? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers); + ConsumerConfiguration consumerConfiguration = { + topics: [topic], + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "consumer-read-byte-array-headers-test-group", + clientId: "test-consumer-61" + }; + Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); + ByteArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5); + test:assertEquals(consumerRecords.length(), 1); + map receivedHeaders = consumerRecords[0].headers; + test:assertEquals(receivedHeaders, {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": ["header3".toBytes()]}); + check consumer->close(); +} + +type StringAndStringArrayHeaderConsumerRecord record {| + *AnydataConsumerRecord; + map headers; + string value; +|}; + +@test:Config {enable: true} +function consumerReadStringAndStringArrayHeadersTest() returns error? { + string topic = "consumer-read-string-string-array-headers-test-topic"; + kafkaTopics.push(topic); + map? 
headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers); + ConsumerConfiguration consumerConfiguration = { + topics: [topic], + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "consumer-read-string-string-array-headers-test-group", + clientId: "test-consumer-61" + }; + Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); + StringAndStringArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5); + test:assertEquals(consumerRecords.length(), 1); + map receivedHeaders = consumerRecords[0].headers; + test:assertEquals(receivedHeaders, {"key1": ["header1", "header2"], "key2": "header3"}); + check consumer->close(); +} + +type StringAndByteArrayHeaderConsumerRecord record {| + *AnydataConsumerRecord; + map headers; + string value; +|}; + +@test:Config {enable: true} +function consumerReadStringAndByteArrayHeadersTest() returns error? { + string topic = "consumer-read-string-and-byte-headers-test-topic"; + kafkaTopics.push(topic); + map? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers); + ConsumerConfiguration consumerConfiguration = { + topics: [topic], + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "consumer-read-string-and-byte-headers-test-group", + clientId: "test-consumer-61" + }; + Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); + StringAndByteArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5); + test:assertEquals(consumerRecords.length(), 1); + map receivedHeaders = consumerRecords[0].headers; + test:assertEquals(receivedHeaders, {"key1": "header1".toBytes(), "key2": "header3"}); + check consumer->close(); +} + +type ByteAndStringArrayHeaderConsumerRecord record {| + *AnydataConsumerRecord; + map headers; + string value; +|}; + +@test:Config {enable: true} +function consumerReadByteAndStringArrayHeadersTest() returns error? { + string topic = "consumer-read-byte-and-string-array-headers-test-topic"; + kafkaTopics.push(topic); + map? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers); + ConsumerConfiguration consumerConfiguration = { + topics: [topic], + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "consumer-read-byte-and-string-array-headers-test-group", + clientId: "test-consumer-61" + }; + Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); + ByteAndStringArrayHeaderConsumerRecord[] consumerRecords = check consumer->poll(5); + test:assertEquals(consumerRecords.length(), 1); + map receivedHeaders = consumerRecords[0].headers; + test:assertEquals(receivedHeaders, {"key1": ["header1", "header2"], "key2": "header3".toBytes()}); + check consumer->close(); +} + +@test:Config {enable: true} +function consumerReadByteAndByteArrayHeadersTest() returns error? { + string topic = "consumer-read-byte-and-byte-array-headers-test-topic"; + kafkaTopics.push(topic); + map? 
headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers); + ConsumerConfiguration consumerConfiguration = { + topics: [topic], + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "consumer-read-byte-and-byte-array-headers-test-group", + clientId: "test-consumer-61" + }; + Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); + test:assertEquals(consumerRecords.length(), 1); + map receivedHeaders = consumerRecords[0].headers; + test:assertEquals(receivedHeaders, {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}); + check consumer->close(); +} + +@test:Config {enable: true} +function consumerReadAllSupportedTypesHeadersTest() returns error? { + string topic = "consumer-read-all-types-headers-test-topic"; + kafkaTopics.push(topic); + map? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers); + ConsumerConfiguration consumerConfiguration = { + topics: [topic], + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "consumer-read-all-types-headers-test-group", + clientId: "test-consumer-61" + }; + Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); + AnydataConsumerRecord[] consumerRecords = check consumer->poll(5); + test:assertEquals(consumerRecords.length(), 1); + map receivedHeaders = consumerRecords[0].headers; + test:assertEquals(receivedHeaders, {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}); + check consumer->close(); +} diff --git a/ballerina/tests/listener_client_tests.bal b/ballerina/tests/listener_client_tests.bal index 9bdeab17..cdb2594e 100644 --- a/ballerina/tests/listener_client_tests.bal +++ b/ballerina/tests/listener_client_tests.bal @@ -30,6 +30,7 @@ string detachMsg1 = ""; string detachMsg2 = ""; string incorrectEndpointMsg = ""; string receivedTimeoutConfigValue = ""; +map receivedHeaders = {}; int receivedMsgCount = 0; @@ -986,3 +987,33 @@ function listenerWithPollTimeoutConfigTest() returns error? { check configListener.gracefulStop(); test:assertEquals(receivedTimeoutConfigValue, TEST_MESSAGE); } + +@test:Config {enable: true} +function listenerWithConsumerHeadersTest() returns error? { + string topic = "listener-consumer-headers-test-topic"; + kafkaTopics.push(topic); + map? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()}; + check sendMessage(TEST_MESSAGE, topic, (), headers); + + Service headersService = + service object { + remote function onConsumerRecord(BytesConsumerRecord[] records) returns error? { + foreach int i in 0 ... 
records.length() - 1 { + receivedHeaders = records[i].headers; + } + } + }; + + ConsumerConfiguration consumerConfiguration = { + topics: topic, + offsetReset: OFFSET_RESET_EARLIEST, + groupId: "test-listener-group-29", + clientId: "test-listener-29" + }; + Listener headersListener = check new (DEFAULT_URL, consumerConfiguration); + check headersListener.attach(headersService); + check headersListener.'start(); + runtime:sleep(3); + check headersListener.gracefulStop(); + test:assertEquals(receivedHeaders, headers); +} diff --git a/ballerina/tests/listener_data_binding_tests.bal b/ballerina/tests/listener_data_binding_tests.bal index 4085e762..b5b57a48 100644 --- a/ballerina/tests/listener_data_binding_tests.bal +++ b/ballerina/tests/listener_data_binding_tests.bal @@ -91,6 +91,7 @@ public type IntConsumerRecord record {| int value; int timestamp; PartitionOffset offset; + map headers; |}; public type FloatConsumerRecord record {| @@ -139,6 +140,7 @@ public type TableConsumerRecord record {| table value; int timestamp; PartitionOffset offset; + map headers; |}; public type JsonConsumerRecord record {| @@ -146,6 +148,7 @@ public type JsonConsumerRecord record {| json key?; int timestamp; json value; + map headers; |}; public type PayloadConsumerRecord record {| @@ -159,6 +162,7 @@ public type PayloadConsumerRecord record {| int partition; |} partition; |} offset; + map headers?; |}; PayloadConsumerRecord payloadConsumerRecord = { diff --git a/ballerina/tests/producer_client_tests.bal b/ballerina/tests/producer_client_tests.bal index 83d9069c..76e9c4ec 100644 --- a/ballerina/tests/producer_client_tests.bal +++ b/ballerina/tests/producer_client_tests.bal @@ -102,7 +102,7 @@ function producerKeyTypeMismatchErrorTest() returns error? { string topic = "key-type-mismatch-error-test-topic"; Producer producer = check new (DEFAULT_URL, producerConfiguration); string message = "Hello, Ballerina"; - error? result = trap sendByteArrayValues(producer, message.toBytes(), topic, MESSAGE_KEY, 0, (), SER_BYTE_ARRAY); + error? result = trap sendByteArrayValues(producer, message.toBytes(), topic, [], MESSAGE_KEY, 0, (), SER_BYTE_ARRAY); if result is error { string expectedErr = "Invalid type found for Kafka key. 
Expected key type: 'byte[]'."; test:assertEquals(result.message(), expectedErr); diff --git a/changelog.md b/changelog.md index 189bf729..6567a393 100644 --- a/changelog.md +++ b/changelog.md @@ -5,14 +5,25 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## Unreleased +### Added + +- [Added header support for Consumer and Producer](https://github.com/ballerina-platform/ballerina-library/issues/6196) + +## [3.10.0] - 2023-09-18 + ### Changed -- [Changed disallowing service level annotations in the compiler plugin](https://github.com/ballerina-platform/ballerina-standard-library/issues/4731) - [Updated Apache Kafka client version to `3.5.1`](https://github.com/ballerina-platform/ballerina-standard-library/issues/4752) ### Fixed - [Removed `client` keyword from the `kafka:Listener`](https://github.com/ballerina-platform/ballerina-standard-library/issues/4750) +## [3.9.1] - 2023-08-18 + +### Changed + +- [Changed disallowing service level annotations in the compiler plugin](https://github.com/ballerina-platform/ballerina-standard-library/issues/4731) + ## [3.8.0] - 2023-06-01 ### Fixed diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_22/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_22/service.bal index 9160d17e..dd0ba46e 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_22/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_22/service.bal @@ -74,6 +74,7 @@ public type PayloadConsumerRecord record {| int partition; |} partition; |} offset; + map headers; |}; public type PayloadConsumerRecordWithTypeReference record {| @@ -81,6 +82,7 @@ public type PayloadConsumerRecordWithTypeReference record {| string value; int timestamp; kafka:PartitionOffset offset; + map headers; |}; public type Person record {| diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_11/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_11/service.bal index 90ebd41b..ea266510 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_11/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_11/service.bal @@ -162,6 +162,7 @@ public type PayloadConsumerRecord record {| int partition; |} partition; |} offset; + map headers; |}; public type PayloadConsumerRecordWithTypeReference record {| @@ -169,6 +170,7 @@ public type PayloadConsumerRecordWithTypeReference record {| string value; int timestamp; kafka:PartitionOffset offset; + map headers; |}; public type Person record {| diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_12/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_12/service.bal index af7505bc..e2fd3253 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_12/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_12/service.bal @@ -204,6 +204,7 @@ public type IntConsumerRecord record {| int value; int timestamp; kafka:PartitionOffset offset; + map headers; |}; public type FloatConsumerRecord record {| @@ -252,6 +253,7 @@ public type TableConsumerRecord record {| table value; int timestamp; kafka:PartitionOffset offset; + map headers; |}; public type JsonConsumerRecord record {| @@ -259,4 +261,5 @@ public type JsonConsumerRecord record {| json key?; int
timestamp; json value; + map headers; |}; diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaFunctionValidator.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaFunctionValidator.java index 4421bf27..80c92a5b 100644 --- a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaFunctionValidator.java +++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaFunctionValidator.java @@ -87,6 +87,7 @@ import static io.ballerina.compiler.syntax.tree.SyntaxKind.UNION_TYPE_DESC; import static io.ballerina.compiler.syntax.tree.SyntaxKind.XML_TYPE_DESC; import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CALLER; +import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CONSUMER_RECORD_HEADERS; import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CONSUMER_RECORD_KEY; import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CONSUMER_RECORD_OFFSET; import static io.ballerina.stdlib.kafka.plugin.PluginConstants.CONSUMER_RECORD_PARTITION; @@ -393,10 +394,11 @@ private boolean isConsumerRecordType(TypeSymbol typeSymbol) { } private boolean validateConsumerRecordFields(Map fieldDescriptors) { - if (fieldDescriptors.size() != 4 || !fieldDescriptors.containsKey(CONSUMER_RECORD_KEY) || + if (fieldDescriptors.size() != 5 || !fieldDescriptors.containsKey(CONSUMER_RECORD_KEY) || !fieldDescriptors.containsKey(CONSUMER_RECORD_VALUE) || !fieldDescriptors.containsKey(CONSUMER_RECORD_TIMESTAMP) || - !fieldDescriptors.containsKey(CONSUMER_RECORD_OFFSET)) { + !fieldDescriptors.containsKey(CONSUMER_RECORD_OFFSET) || + !fieldDescriptors.containsKey(CONSUMER_RECORD_HEADERS)) { return false; } if (fieldDescriptors.get(CONSUMER_RECORD_TIMESTAMP).typeDescriptor().typeKind() != INT) { @@ -406,6 +408,9 @@ private boolean validateConsumerRecordFields(Map fiel fieldDescriptors.get(CONSUMER_RECORD_OFFSET).typeDescriptor().typeKind() != RECORD) { return false; } + if (fieldDescriptors.get(CONSUMER_RECORD_HEADERS).typeDescriptor().typeKind() != MAP) { + return false; + } if (!validateOffsetField(fieldDescriptors.get(CONSUMER_RECORD_OFFSET).typeDescriptor())) { return false; } diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java index 83651da3..c96c80bd 100644 --- a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java +++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java @@ -42,6 +42,7 @@ public class PluginConstants { public static final String CONSUMER_RECORD_OFFSET = "offset"; public static final String CONSUMER_RECORD_TOPIC = "topic"; public static final String CONSUMER_RECORD_PARTITION = "partition"; + public static final String CONSUMER_RECORD_HEADERS = "headers"; // return types error or nil public static final String ERROR = "error"; diff --git a/docs/spec/spec.md b/docs/spec/spec.md index 4991c612..f5248240 100644 --- a/docs/spec/spec.md +++ b/docs/spec/spec.md @@ -219,6 +219,8 @@ public type AnydataProducerRecord record {| int timestamp?; # Partition to which the record should be sent int partition?; + # Map of headers to be included with the record + map headers?; |}; ``` * `kafka:BytesProducerRecord` defines the subtype of `kafka:AnydataProducerRecord` where the value is a `byte[]`; @@ -418,6 +420,8 @@ public type AnydataConsumerRecord record {| int timestamp; # Topic partition position in which the consumed 
record is stored PartitionOffset offset; + # Map of headers included with the record + map<byte[]|byte[][]|string|string[]> headers; |}; ``` * `kafka:BytesConsumerRecord` defines the subtype of `kafka:AnydataConsumerRecord` where the value is a `byte[]`. diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java index 73925fbc..93cbc606 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/SendByteArrayValues.java @@ -23,9 +23,14 @@ import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + import static io.ballerina.stdlib.kafka.utils.KafkaConstants.ALIAS_PARTITION; import static io.ballerina.stdlib.kafka.utils.KafkaUtils.getIntValue; import static io.ballerina.stdlib.kafka.utils.KafkaUtils.getLongValue; @@ -44,22 +49,34 @@ public class SendByteArrayValues extends Send { // ballerina byte[] public static Object sendByteArrayValuesNilKeys(Environment env, BObject producer, BArray value, BString topic, - Object partition, Object timestamp) { + Object partition, Object timestamp, BArray headerList) { Integer partitionValue = getIntValue(partition, ALIAS_PARTITION, logger); Long timestampValue = getLongValue(timestamp); + List<Header>
headers = getHeadersFromBHeaders(headerList); ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic.getValue(), partitionValue, timestampValue, - null, value.getBytes()); + null, value.getBytes(), headers); return sendKafkaRecord(env, kafkaRecord, producer); } // ballerina byte[] and ballerina byte[] public static Object sendByteArrayValuesByteArrayKeys(Environment env, BObject producer, BArray value, BString topic, BArray key, Object partition, - Object timestamp) { + Object timestamp, BArray headerList) { Integer partitionValue = getIntValue(partition, ALIAS_PARTITION, logger); Long timestampValue = getLongValue(timestamp); + List<Header>
headers = getHeadersFromBHeaders(headerList); ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic.getValue(), partitionValue, - timestampValue, key.getBytes(), value.getBytes()); + timestampValue, key.getBytes(), value.getBytes(), headers); return sendKafkaRecord(env, kafkaRecord, producer); } + + private static List<Header>
getHeadersFromBHeaders(BArray headerList) { + List<Header>
headers = new ArrayList<>(); + for (int i = 0; i < headerList.size(); i++) { + BArray headerItem = (BArray) headerList.get(i); + headers.add(new RecordHeader(headerItem.getBString(0).getValue(), + ((BArray) headerItem.get(1)).getByteArray())); + } + return headers; + } } diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java index 0fcabd38..30033d11 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java @@ -66,8 +66,9 @@ private KafkaConstants() { public static final String KAFKA_RESOURCE_IS_ANYDATA_CONSUMER_RECORD = "isAnydataConsumerRecord"; public static final String KAFKA_RECORD_KEY = "key"; public static final String KAFKA_RECORD_VALUE = "value"; - public static final String KAFKA_RECORD_TIMESTAMP = "timestamp"; - public static final String KAFKA_RECORD_PARTITION_OFFSET = "offset"; + public static final BString KAFKA_RECORD_TIMESTAMP = StringUtils.fromString("timestamp"); + public static final BString KAFKA_RECORD_PARTITION_OFFSET = StringUtils.fromString("offset"); + public static final BString KAFKA_RECORD_HEADERS = StringUtils.fromString("headers"); public static final String PARAM_ANNOTATION_PREFIX = "$param$."; public static final BString PARAM_PAYLOAD_ANNOTATION_NAME = StringUtils.fromString( diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java index 2977ebdd..8ef497d8 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java @@ -26,6 +26,7 @@ import io.ballerina.runtime.api.types.ArrayType; import io.ballerina.runtime.api.types.Field; import io.ballerina.runtime.api.types.IntersectionType; +import io.ballerina.runtime.api.types.MapType; import io.ballerina.runtime.api.types.MethodType; import io.ballerina.runtime.api.types.ObjectType; import io.ballerina.runtime.api.types.RecordType; @@ -57,6 +58,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import org.slf4j.Logger; import java.io.IOException; @@ -75,7 +78,6 @@ import static io.ballerina.runtime.api.TypeTags.ARRAY_TAG; import static io.ballerina.runtime.api.TypeTags.BYTE_TAG; import static io.ballerina.runtime.api.TypeTags.INTERSECTION_TAG; -import static io.ballerina.runtime.api.TypeTags.RECORD_TYPE_TAG; import static io.ballerina.runtime.api.TypeTags.STRING_TAG; import static io.ballerina.runtime.api.TypeTags.UNION_TAG; import static io.ballerina.runtime.api.TypeTags.XML_TAG; @@ -86,6 +88,7 @@ import static io.ballerina.stdlib.kafka.utils.KafkaConstants.CONSUMER_ENABLE_AUTO_COMMIT_CONFIG; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.CONSUMER_ENABLE_AUTO_SEEK_CONFIG; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_ERROR; +import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_RECORD_HEADERS; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_RECORD_KEY; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_RECORD_PARTITION_OFFSET; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.KAFKA_RECORD_TIMESTAMP; @@ -124,14 +127,6 @@ public 
static Properties processKafkaConsumerConfig(Object bootStrapServers, BMa addDeserializerConfigs(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, properties); addDeserializerConfigs(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, properties); - // TODO: Disabled as the custom SerDes support is to be revisited and improved. - // Fix once the design for that is completed. -// addCustomDeserializer(KafkaConstants.CONSUMER_KEY_DESERIALIZER_CONFIG, -// KafkaConstants.CONSUMER_KEY_DESERIALIZER_TYPE_CONFIG, properties, -// configurations); -// addCustomDeserializer(KafkaConstants.CONSUMER_VALUE_DESERIALIZER_CONFIG, -// KafkaConstants.CONSUMER_VALUE_DESERIALIZER_TYPE_CONFIG, properties, -// configurations); addStringParamIfPresent(KafkaConstants.SCHEMA_REGISTRY_URL, configurations, properties, KafkaConstants.CONSUMER_SCHEMA_REGISTRY_URL); @@ -235,11 +230,6 @@ public static Properties processKafkaProducerConfig(Object bootstrapServers, BMa addSerializerTypeConfigs(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties); addSerializerTypeConfigs(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties); - // TODO: Disabled as the custom SerDes support is to be revisited and improved. - // Fix once the design for that is completed. -// addCustomKeySerializer(properties, configurations); -// addCustomValueSerializer(properties, configurations); - addIntParamIfPresent(ProducerConfig.BUFFER_MEMORY_CONFIG, configurations, properties, KafkaConstants.PRODUCER_BUFFER_MEMORY_CONFIG); addIntParamIfPresent(ProducerConfig.RETRIES_CONFIG, configurations, @@ -399,75 +389,6 @@ private static void addDeserializerConfigs(String paramName, Properties configPa configParams.put(paramName, KafkaConstants.BYTE_ARRAY_DESERIALIZER); } - // TODO: Disabled as the SerDes support is to be revisited and improved. Fix once the design for that is completed. 
-// private static void addCustomKeySerializer(Properties properties, BMap configurations) { -// Object serializer = configurations.get(KafkaConstants.PRODUCER_KEY_SERIALIZER_CONFIG); -// String serializerType = -// configurations.getStringValue(KafkaConstants.PRODUCER_KEY_SERIALIZER_TYPE_CONFIG).getValue(); -// if (Objects.nonNull(serializer) && KafkaConstants.SERDES_CUSTOM.equals(serializerType)) { -// properties.put(KafkaConstants.PRODUCER_KEY_SERIALIZER_CONFIG.getValue(), -// configurations.get(KafkaConstants.PRODUCER_KEY_SERIALIZER_CONFIG)); -// } -// } -// -// private static void addCustomValueSerializer(Properties properties, BMap configurations) { -// Object serializer = configurations.get(KafkaConstants.PRODUCER_VALUE_SERIALIZER_CONFIG); -// String serializerType = -// configurations.getStringValue(KafkaConstants.PRODUCER_VALUE_SERIALIZER_TYPE_CONFIG).getValue(); -// if (Objects.nonNull(serializer) && KafkaConstants.SERDES_CUSTOM.equals(serializerType)) { -// properties.put(KafkaConstants.PRODUCER_VALUE_SERIALIZER_CONFIG.getValue(), -// configurations.get(KafkaConstants.PRODUCER_VALUE_SERIALIZER_CONFIG)); -// } -// } - -// private static void addCustomDeserializer(BString configParam, BString typeConfig, Properties properties, -// BMap configurations) { -// Object deserializer = configurations.get(configParam); -// String deserializerType = configurations.getStringValue(typeConfig).getValue(); -// if (Objects.nonNull(deserializer) && KafkaConstants.SERDES_CUSTOM.equals(deserializerType)) { -// properties.put(configParam.getValue(), configurations.get(configParam)); -// properties.put(KafkaConstants.BALLERINA_STRAND, Runtime.getCurrentRuntime()); -// } -// } - -// private static String getSerializerType(String value) { -// switch (value) { -// case KafkaConstants.SERDES_BYTE_ARRAY: -// return KafkaConstants.BYTE_ARRAY_SERIALIZER; -// case KafkaConstants.SERDES_STRING: -// return KafkaConstants.STRING_SERIALIZER; -// case KafkaConstants.SERDES_INT: -// return KafkaConstants.INT_SERIALIZER; -// case KafkaConstants.SERDES_FLOAT: -// return KafkaConstants.FLOAT_SERIALIZER; -// case KafkaConstants.SERDES_AVRO: -// return KafkaConstants.AVRO_SERIALIZER; -// case KafkaConstants.SERDES_CUSTOM: -// return KafkaConstants.CUSTOM_SERIALIZER; -// default: -// return value; -// } -// } -// -// private static String getDeserializerValue(String value) { -// switch (value) { -// case KafkaConstants.SERDES_BYTE_ARRAY: -// return KafkaConstants.BYTE_ARRAY_DESERIALIZER; -// case KafkaConstants.SERDES_STRING: -// return KafkaConstants.STRING_DESERIALIZER; -// case KafkaConstants.SERDES_INT: -// return KafkaConstants.INT_DESERIALIZER; -// case KafkaConstants.SERDES_FLOAT: -// return KafkaConstants.FLOAT_DESERIALIZER; -// case KafkaConstants.SERDES_AVRO: -// return KafkaConstants.AVRO_DESERIALIZER; -// case KafkaConstants.SERDES_CUSTOM: -// return KafkaConstants.CUSTOM_DESERIALIZER; -// default: -// return value; -// } -// } - private static void addStringParamIfPresent(String paramName, BMap configs, Properties configParams, @@ -614,18 +535,100 @@ public static BMap populateConsumerRecord(ConsumerRecord record Object value = getValueWithIntendedType(valueType, (byte[]) record.value(), record, autoSeek); BMap topicPartition = ValueCreator.createRecordValue(getTopicPartitionRecord(), record.topic(), (long) record.partition()); + MapType headerType = (MapType) getReferredType(fieldMap.get(KAFKA_RECORD_HEADERS.getValue()).getFieldType()); + BMap bHeaders = getBHeadersFromConsumerRecord(record.headers(), 
headerType.getConstrainedType()); BMap<BString, Object> consumerRecord = ValueCreator.createRecordValue(recordType); consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_KEY), key); consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_VALUE), value); - consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_TIMESTAMP), record.timestamp()); - consumerRecord.put(StringUtils.fromString(KAFKA_RECORD_PARTITION_OFFSET), ValueCreator.createRecordValue( + consumerRecord.put(KAFKA_RECORD_TIMESTAMP, record.timestamp()); + consumerRecord.put(KAFKA_RECORD_PARTITION_OFFSET, ValueCreator.createRecordValue( getPartitionOffsetRecord(), topicPartition, record.offset())); + consumerRecord.put(KAFKA_RECORD_HEADERS, bHeaders); if (validateConstraints) { validateConstraints(consumerRecord, ValueCreator.createTypedescValue(recordType), record, autoSeek); } return consumerRecord; } + private static BMap<BString, Object> getBHeadersFromConsumerRecord(Headers headers, Type headerType) { + HashMap<String, ArrayList<byte[]>> headerMap = new HashMap<>(); + for (Header header : headers) { + if (headerMap.containsKey(header.key())) { + ArrayList<byte[]> headerList = headerMap.get(header.key()); + headerList.add(header.value()); + } else { + ArrayList<byte[]> headerList = new ArrayList<byte[]>(); + headerList.add(header.value()); + headerMap.put(header.key(), headerList); + } + } + BMap<BString, Object> bHeaderMap = ValueCreator.createMapValue(); + headerMap.forEach((key, valueList) -> { + if (headerType instanceof UnionType unionType) { + Type appropriateType = getMostAppropriateTypeFromUnionType(unionType.getMemberTypes(), + valueList.size()); + handleSupportedTypesForHeaders(key, valueList, appropriateType, bHeaderMap); + } else { + handleSupportedTypesForHeaders(key, valueList, headerType, bHeaderMap); + } + }); + return bHeaderMap; + } + + private static void handleSupportedTypesForHeaders(String key, ArrayList<byte[]> list, Type appropriateType, + BMap<BString, Object> bHeaderMap) { + if (appropriateType instanceof ArrayType arrayType) { + handleHeaderValuesWithArrayType(key, list, arrayType, bHeaderMap); + } else if (appropriateType.getTag() == STRING_TAG) { + bHeaderMap.put(StringUtils.fromString(key), StringUtils.fromString(new String(list.get(0), + StandardCharsets.UTF_8))); + } + } + + private static void handleHeaderValuesWithArrayType(String key, ArrayList<byte[]> list, ArrayType arrayType, + BMap<BString, Object> bHeaderMap) { + Type elementType = arrayType.getElementType(); + if (elementType.getTag() == ARRAY_TAG) { + BArray valueArray = ValueCreator.createArrayValue(arrayType); + for (int i = 0; i < list.size(); i++) { + valueArray.add(i, ValueCreator.createArrayValue(list.get(i))); + } + bHeaderMap.put(StringUtils.fromString(key), valueArray); + } else if (elementType.getTag() == STRING_TAG) { + BArray valueArray = ValueCreator.createArrayValue(arrayType); + for (int i = 0; i < list.size(); i++) { + valueArray.add(i, StringUtils.fromString(new String(list.get(i), StandardCharsets.UTF_8))); + } + bHeaderMap.put(StringUtils.fromString(key), valueArray); + } else if (elementType.getTag() == BYTE_TAG) { + bHeaderMap.put(StringUtils.fromString(key), ValueCreator.createArrayValue(list.get(0))); + } + } + + private static Type getMostAppropriateTypeFromUnionType(List<Type> memberTypes, int size) { + Type firstType = memberTypes.get(0); + if (memberTypes.size() == 1) { + return firstType; + } + if (size > 1) { + if (firstType instanceof ArrayType arrayType) { + if (arrayType.getElementType().getTag() == BYTE_TAG) { + return getMostAppropriateTypeFromUnionType(memberTypes.subList(1, memberTypes.size()), size); + } + return arrayType; + } + return
getMostAppropriateTypeFromUnionType(memberTypes.subList(1, memberTypes.size()), size); + } else { + if (firstType instanceof ArrayType arrayType) { + if (arrayType.getElementType().getTag() == BYTE_TAG) { + return arrayType; + } + return getMostAppropriateTypeFromUnionType(memberTypes.subList(1, memberTypes.size()), size); + } + return firstType; + } + } + public static BArray getConsumerRecords(ConsumerRecords records, RecordType recordType, boolean readonly, boolean validateConstraints, boolean autoCommit, KafkaConsumer consumer, boolean autoSeek) { @@ -709,11 +712,8 @@ public static Object getValueWithIntendedType(Type type, byte[] value, ConsumerR case ANYDATA_TAG: intendedValue = ValueCreator.createArrayValue(value); break; - case RECORD_TYPE_TAG: - intendedValue = ValueUtils.convert(JsonUtils.parse(strValue), type); - break; case UNION_TAG: - if (hasStringType((UnionType) type)) { + if (hasExpectedType((UnionType) type, STRING_TAG)) { intendedValue = StringUtils.fromString(strValue); break; } @@ -737,9 +737,9 @@ public static Object getValueWithIntendedType(Type type, byte[] value, ConsumerR return intendedValue; } - private static boolean hasStringType(UnionType type) { + private static boolean hasExpectedType(UnionType type, int typeTag) { return type.getMemberTypes().stream().anyMatch(memberType -> { - if (memberType.getTag() == STRING_TAG) { + if (memberType.getTag() == typeTag) { return true; } return false;
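
Note on producer usage: per `getHeaderValueAsByteArrayList` in `producer.bal` above, each header key on a producer record can carry a `byte[]`, `byte[][]`, `string`, or `string[]` value, and string values are encoded to bytes before the record is handed to the native `send` functions. A minimal usage sketch based on the tests added in this patch; the bootstrap URL, topic name, and header keys/values are illustrative, and the exact `headers` field type is the one reconstructed above:

```ballerina
import ballerinax/kafka;

public function main() returns error? {
    kafka:Producer producer = check new ("localhost:9092");
    // Each header key maps to a single value or a list of values.
    // String values are converted to bytes by the producer before sending.
    check producer->send({
        topic: "orders",
        value: "New order received".toBytes(),
        headers: {"trace-id": "abc-123", "tags": ["urgent".toBytes(), "retail".toBytes()]}
    });
    check producer->close();
}
```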
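Note on consumer data binding: `getBHeadersFromConsumerRecord` and `getMostAppropriateTypeFromUnionType` above pick the header representation that matches the `headers` field type of the bound consumer record; array-valued bindings (`string[]`, `byte[][]`) collect every value published for a key, while scalar bindings (`string`, `byte[]`) keep only the first value. A sketch mirroring the added `consumer_header_tests.bal`; the record name, topic, group id, and printed contents are illustrative:

```ballerina
import ballerina/io;
import ballerinax/kafka;

type OrderConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    // Binding to `map<string[]>` gathers all values per header key;
    // `map<string>` would retain only the first value for each key.
    map<string[]> headers;
    string value;
|};

public function main() returns error? {
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: "order-group",
        topics: ["orders"],
        offsetReset: kafka:OFFSET_RESET_EARLIEST
    };
    kafka:Consumer consumer = check new ("localhost:9092", consumerConfiguration);
    OrderConsumerRecord[] records = check consumer->poll(5);
    foreach OrderConsumerRecord orderRecord in records {
        // e.g. {"trace-id": ["abc-123"], "tags": ["urgent", "retail"]}
        io:println(orderRecord.headers);
    }
    check consumer->close();
}
```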
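Note on listener services: the compiler-plugin change above (`validateConsumerRecordFields` now requires a fifth, map-typed `headers` field) means custom consumer-record types used in `onConsumerRecord` must declare a `headers` field as well, while the built-in record types already include it. A sketch based on the added `listenerWithConsumerHeadersTest`; the listener name, topic, and group id are illustrative:

```ballerina
import ballerina/io;
import ballerinax/kafka;

listener kafka:Listener orderListener = new ("localhost:9092", {
    groupId: "order-listener-group",
    topics: "orders",
    offsetReset: kafka:OFFSET_RESET_EARLIEST
});

service on orderListener {
    remote function onConsumerRecord(kafka:BytesConsumerRecord[] records) returns error? {
        foreach kafka:BytesConsumerRecord orderRecord in records {
            // Header values arrive as byte[] or byte[][], keyed by header name.
            io:println(orderRecord.headers.keys());
        }
    }
}
```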