diff --git a/README.md b/README.md index 626548f2..f704fdd1 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ kafka:ConsumerConfiguration consumerConfiguration = { listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration); service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) { // processes the records ... // commits the offsets manually @@ -85,7 +85,7 @@ string key = "my-key"; check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() }); ``` ```ballerina -kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1); +kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1); foreach var kafkaRecord in records { byte[] messageContent = kafkaRecord.value; diff --git a/ballerina/Module.md b/ballerina/Module.md index 9a9b89f5..5a3a08a2 100644 --- a/ballerina/Module.md +++ b/ballerina/Module.md @@ -52,7 +52,7 @@ kafka:ConsumerConfiguration consumerConfiguration = { listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration); service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) { // processes the records ... // commits the offsets manually @@ -78,7 +78,7 @@ string key = "my-key"; check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() }); ``` ```ballerina -kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1); +kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1); foreach var kafkaRecord in records { byte[] messageContent = kafkaRecord.value; diff --git a/ballerina/Package.md b/ballerina/Package.md index 17be1588..d006d7c6 100644 --- a/ballerina/Package.md +++ b/ballerina/Package.md @@ -51,7 +51,7 @@ kafka:ConsumerConfiguration consumerConfiguration = { listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration); service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) { // processes the records ... // commits the offsets manually @@ -77,7 +77,7 @@ string key = "my-key"; check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() }); ``` ```ballerina -kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1); +kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1); foreach var kafkaRecord in records { byte[] messageContent = kafkaRecord.value; diff --git a/ballerina/consumer.bal b/ballerina/consumer.bal index d4ee2048..761a49a3 100644 --- a/ballerina/consumer.bal +++ b/ballerina/consumer.bal @@ -262,7 +262,7 @@ public client isolated class Consumer { 'class: "io.ballerina.stdlib.kafka.nativeimpl.consumer.Poll" } external; - # Polls the external broker to retrieve messages in the required data type without the `kafka:ConsumerRecord` + # Polls the external broker to retrieve messages in the required data type without the `kafka:AnydataConsumerRecord` # information. # ```ballerina # Person[] persons = check consumer->pollPayload(10); diff --git a/ballerina/kafka_records.bal b/ballerina/kafka_records.bal index c09844cf..73adf326 100644 --- a/ballerina/kafka_records.bal +++ b/ballerina/kafka_records.bal @@ -191,25 +191,6 @@ public type TopicPartition record {| int partition; |}; -# Type related to consumer record. -# -# + key - Key that is included in the record -# + value - Record content -# + timestamp - Timestamp of the record, in milliseconds since epoch -# + offset - Topic partition position in which the consumed record is stored -# + headers - Map of headers included with the record -# # Deprecated -# Usage of this record is deprecated. Use subtypes of AnydataConsumerRecord -# instead to support data-binding -@deprecated -public type ConsumerRecord record {| - byte[] key?; - byte[] value; - int timestamp; - PartitionOffset offset; - map headers; -|}; - # Type related to anydata consumer record. # # + key - Key that is included in the record @@ -235,27 +216,6 @@ public type BytesConsumerRecord record {| map headers; |}; -# Details related to the producer record. -# -# + topic - Topic to which the record will be appended -# + key - Key that is included in the record -# + value - Record content -# + timestamp - Timestamp of the record, in milliseconds since epoch -# + partition - Partition to which the record should be sent -# + headers - Map of headers to be included with the record -# # Deprecated -# Usage of this record is deprecated. Use subtypes of AnydataProducerRecord -# instead to support data-binding -@deprecated -public type ProducerRecord record {| - string topic; - byte[] key?; - byte[] value; - int timestamp?; - int partition?; - map headers?; -|}; - # Details related to the anydata producer record. # # + topic - Topic to which the record will be appended diff --git a/ballerina/tests/consumer_client_tests.bal b/ballerina/tests/consumer_client_tests.bal index 62dea8a5..400b415f 100644 --- a/ballerina/tests/consumer_client_tests.bal +++ b/ballerina/tests/consumer_client_tests.bal @@ -114,10 +114,10 @@ function consumerCloseTest() returns error? { clientId: "test-consumer-11" }; Consumer consumer = check new(DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] _ = check consumer->poll(5); + BytesConsumerRecord[] _ = check consumer->poll(5); Error? closeresult = consumer->close(); test:assertFalse(closeresult is Error, closeresult is Error ? closeresult.toString() : closeresult.toString()); - ConsumerRecord[]|Error result = consumer->poll(5); + BytesConsumerRecord[]|Error result = consumer->poll(5); test:assertTrue(result is Error); if result is Error { string expectedErr = "Failed to poll from the Kafka server: This consumer has already been closed."; @@ -136,10 +136,10 @@ function consumerCloseWithDurationTest() returns error? { clientId: "test-consumer-12" }; Consumer consumer = check new(DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] _ = check consumer->poll(5); + BytesConsumerRecord[] _ = check consumer->poll(5); Error? closeresult = consumer->close(TIMEOUT_DURATION); test:assertFalse(closeresult is Error, closeresult is Error ? closeresult.toString() : closeresult.toString()); - ConsumerRecord[]|Error result = consumer->poll(5); + BytesConsumerRecord[]|Error result = consumer->poll(5); test:assertTrue(result is Error); if result is Error { string expectedErr = "Failed to poll from the Kafka server: This consumer has already been closed."; @@ -159,10 +159,10 @@ function consumerCloseWithDefaultTimeoutTest() returns error? { defaultApiTimeout: DEFAULT_TIMEOUT }; Consumer consumer = check new(DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] _ = check consumer->poll(5); + BytesConsumerRecord[] _ = check consumer->poll(5); Error? closeresult = consumer->close(); test:assertFalse(closeresult is Error, closeresult is Error ? closeresult.toString() : closeresult.toString()); - ConsumerRecord[]|Error result = consumer->poll(5); + BytesConsumerRecord[]|Error result = consumer->poll(5); test:assertTrue(result is Error); if result is Error { string expectedErr = "Failed to poll from the Kafka server: This consumer has already been closed."; @@ -186,7 +186,7 @@ function consumerConfigTest() returns error? { }; Consumer consumer = check new(DEFAULT_URL, consumerConfiguration); check sendMessage(TEST_MESSAGE.toBytes(), topic); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -203,7 +203,7 @@ function consumerFunctionsTest() returns error? { clientId: "test-consumer-15" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); byte[] value = consumerRecords[0].value; string message = check 'string:fromBytes(value); @@ -237,7 +237,7 @@ function consumerSeekTest() returns error? { Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); check consumer->assign([topicPartition]); check consumer->seek(partitionOffset); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); Error? result = consumer->seekToBeginning([nonExistingPartition]); @@ -267,7 +267,7 @@ function consumerSeekToBeginningTest() returns error? { partition: 0 }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->seekToBeginning([topicPartition]); consumerRecords = check consumer->poll(5); @@ -300,7 +300,7 @@ function consumerSeekToEndTest() returns error? { partition: 0 }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check sendMessage(TEST_MESSAGE.toBytes(), topic); check consumer->seekToEnd([topicPartition]); @@ -368,7 +368,7 @@ function consumerPositionOffsetsTest() returns error? { test:assertEquals(partitionOffsetBefore, 0, "Expected: 0. Received: " + partitionOffsetBefore.toString()); check sendMessage(TEST_MESSAGE.toBytes(), topic); check sendMessage(TEST_MESSAGE.toBytes(), topic); - ConsumerRecord[] _ = check consumer->poll(5); + BytesConsumerRecord[] _ = check consumer->poll(5); int partitionOffsetAfter = check consumer->getPositionOffset(topicPartition, TIMEOUT_DURATION); test:assertEquals(partitionOffsetAfter, 2, "Expected: 2. Received: " + partitionOffsetAfter.toString()); check consumer->close(); @@ -381,7 +381,7 @@ function consumerPositionOffsetsTest() returns error? { consumer = check new (DEFAULT_URL, consumerConfiguration); check consumer->assign([topicPartition]); check sendMessage(TEST_MESSAGE.toBytes(), topic); - ConsumerRecord[] _ = check consumer->poll(5); + BytesConsumerRecord[] _ = check consumer->poll(5); partitionOffsetAfter = check consumer->getPositionOffset(topicPartition); test:assertEquals(partitionOffsetAfter, 3, "Expected: 3. Received: " + partitionOffsetAfter.toString()); check consumer->close(); @@ -409,11 +409,11 @@ function consumerBeginningOffsetsTest() returns error? { }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); check consumer->assign([topic1Partition, topic2Partition]); - ConsumerRecord[] _ = check consumer->poll(5); + BytesConsumerRecord[] _ = check consumer->poll(5); PartitionOffset[] partitionEndOffsets = check consumer->getBeginningOffsets([topic1Partition, topic2Partition]); test:assertEquals(partitionEndOffsets[0].offset, 0, "Expected: 0. Received: " + partitionEndOffsets[0].offset.toString()); test:assertEquals(partitionEndOffsets[1].offset, 0, "Expected: 0. Received: " + partitionEndOffsets[1].offset.toString()); - ConsumerRecord[] _ = check consumer->poll(5); + BytesConsumerRecord[] _ = check consumer->poll(5); partitionEndOffsets = check consumer->getBeginningOffsets([topic1Partition, topic2Partition], TIMEOUT_DURATION); test:assertEquals(partitionEndOffsets[0].offset, 0, "Expected: 0. Received: " + partitionEndOffsets[0].offset.toString()); test:assertEquals(partitionEndOffsets[1].offset, 0, "Expected: 0. Received: " + partitionEndOffsets[1].offset.toString()); @@ -426,7 +426,7 @@ function consumerBeginningOffsetsTest() returns error? { }; consumer = check new (DEFAULT_URL, consumerConfiguration); check consumer->assign([topic1Partition, topic2Partition]); - ConsumerRecord[] _ = check consumer->poll(5); + BytesConsumerRecord[] _ = check consumer->poll(5); partitionEndOffsets = check consumer->getBeginningOffsets([topic1Partition, topic2Partition]); test:assertEquals(partitionEndOffsets[0].offset, 0, "Expected: 0. Received: " + partitionEndOffsets[0].offset.toString()); test:assertEquals(partitionEndOffsets[1].offset, 0, "Expected: 0. Received: " + partitionEndOffsets[1].offset.toString()); @@ -543,7 +543,7 @@ function consumerPauseResumePartitionTest() returns error? { check sendMessage(TEST_MESSAGE.toBytes(), topic); Error? result = consumer->pause([topicPartition]); test:assertFalse(result is error, result is error ? result.toString() : result.toString()); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 0, "Expected: 0. Received: " + consumerRecords.length().toString()); TopicPartition[] pausedPartitions = check consumer->getPausedPartitions(); @@ -682,7 +682,7 @@ function consumerSubscribeTest() returns error? { string[] subscribedTopics = check consumer->getSubscription(); test:assertEquals(subscribedTopics.length(), 0); check consumer->subscribeWithPattern("consumer.*"); - ConsumerRecord[] _ = check consumer->poll(1); // Polling to force-update the metadata + BytesConsumerRecord[] _ = check consumer->poll(1); // Polling to force-update the metadata string[] newSubscribedTopics = check consumer->getSubscription(); test:assertEquals(newSubscribedTopics.sort(), kafkaTopics.filter(function (string topic) returns boolean { return topic.startsWith("consumer"); @@ -799,7 +799,7 @@ function manualCommitTest() returns error? { check sendMessage(count.toString().toBytes(), topic); count += 1; } - ConsumerRecord[] _ = check consumer->poll(1); + BytesConsumerRecord[] _ = check consumer->poll(1); TopicPartition topicPartition = { topic: topic, partition: 0 @@ -838,7 +838,7 @@ function manualCommitWithDurationTest() returns error? { autoCommit: false }; Consumer consumer = check new(DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] _ = check consumer->poll(1); + BytesConsumerRecord[] _ = check consumer->poll(1); int manualCommitOffset = 5; TopicPartition topicPartition = { topic: topic, @@ -870,7 +870,7 @@ function manualCommitWithDefaultTimeoutTest() returns error? { defaultApiTimeout: DEFAULT_TIMEOUT }; Consumer consumer = check new(DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] _ = check consumer->poll(1); + BytesConsumerRecord[] _ = check consumer->poll(1); int manualCommitOffset = 5; TopicPartition topicPartition = { topic: topic, @@ -925,14 +925,14 @@ function consumerOperationsWithReceivedTopicPartitionsTest() returns error? { clientId: "test-consumer-51" }; Consumer consumer = check new(DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] _ = check consumer->poll(1); + BytesConsumerRecord[] _ = check consumer->poll(1); TopicPartition[] partitions = check consumer->getAssignment(); check consumer->unsubscribe(); check sendMessage(TEST_MESSAGE.toBytes(), topic); check consumer->assign(partitions); check consumer->seekToEnd(partitions); - ConsumerRecord[] consumerRecords = check consumer->poll(1); + BytesConsumerRecord[] consumerRecords = check consumer->poll(1); test:assertEquals(consumerRecords.length(), 0, "Expected: 0. Received: " + consumerRecords.length().toString()); check sendMessage(TEST_MESSAGE.toBytes(), topic); consumerRecords = check consumer->poll(1); @@ -990,7 +990,7 @@ function saslConsumerTest() returns error? { securityProtocol: PROTOCOL_SASL_PLAINTEXT }; Consumer consumer = check new(SASL_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -1015,7 +1015,7 @@ function saslConsumerIncorrectCredentialsTest() returns error? { securityProtocol: PROTOCOL_SASL_PLAINTEXT }; Consumer consumer = check new(SASL_URL, consumerConfiguration); - ConsumerRecord[]|Error result = consumer->poll(5); + BytesConsumerRecord[]|Error result = consumer->poll(5); if result is Error { string errorMsg = "Failed to poll from the Kafka server: Authentication failed: Invalid username or password"; test:assertEquals(result.message(), errorMsg); @@ -1046,7 +1046,7 @@ function consumerAdditionalPropertiesTest() returns error? { }; Consumer consumer = check new(DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] _ = check consumer->poll(5); + BytesConsumerRecord[] _ = check consumer->poll(5); PartitionOffset? committedOffset = check consumer->getCommittedOffset(topicPartition); test:assertEquals(committedOffset, ()); check consumer->close(); @@ -1067,7 +1067,7 @@ function sslKeystoreConsumerTest() returns error? { securityProtocol: PROTOCOL_SSL }; Consumer consumer = check new (SSL_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -1100,7 +1100,7 @@ function sslCertKeyConsumerTest() returns error? { securityProtocol: PROTOCOL_SSL }; Consumer consumer = check new (SSL_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -1127,7 +1127,7 @@ function sslCertOnlyConsumerTest() returns error? { securityProtocol: PROTOCOL_SSL }; Consumer consumer = check new (SSL_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -1148,7 +1148,7 @@ function saslSslConsumerTest() returns error? { securityProtocol: PROTOCOL_SASL_SSL }; Consumer consumer = check new (SASL_SSL_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -1165,7 +1165,7 @@ function incorrectKafkaUrlTest() returns error? { clientId: "test-consumer-49" }; Consumer consumer = check new (INCORRECT_KAFKA_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 0, "Expected: 0. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -1185,7 +1185,7 @@ function plaintextToSecuredEndpointsConsumerTest() returns error? { check sendMessage(TEST_MESSAGE.toBytes(), topic); Consumer consumer = check new (SSL_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 0, "Expected: 0. Received: " + consumerRecords.length().toString()); check consumer->close(); @@ -1216,7 +1216,7 @@ function invalidSecuredEndpointsConsumerTest() returns error? { securityProtocol: PROTOCOL_SASL_PLAINTEXT }; Consumer consumer = check new (SSL_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 0, "Expected: 0. Received: " + consumerRecords.length().toString()); check consumer->close(); @@ -1278,7 +1278,7 @@ function sslIncorrectStoresConsumerTest() returns error? { securityProtocol: PROTOCOL_SSL }; Consumer consumer = check new (SSL_URL, consumerConfiguration); - ConsumerRecord[]|Error result = consumer->poll(5); + BytesConsumerRecord[]|Error result = consumer->poll(5); test:assertTrue(result is Error); if result is Error { test:assertEquals(result.message(), "Failed to poll from the Kafka server: SSL handshake failed"); @@ -1449,7 +1449,7 @@ function consumerPollFromMultipleTopicsTest() returns error? { clientId: "test-consumer-59" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 2); string message1 = check 'string:fromBytes(consumerRecords[0].value); string message2 = check 'string:fromBytes(consumerRecords[1].value); @@ -1474,7 +1474,7 @@ function commitOffsetWithPolledOffsetValue() returns error? { check sendMessage("Hello".toBytes(), topic); check sendMessage("Hello".toBytes(), topic); check sendMessage("Hello".toBytes(), topic); - ConsumerRecord[] records = check consumer->poll(3); + BytesConsumerRecord[] records = check consumer->poll(3); test:assertEquals(records.length(), 4); check consumer->commitOffset([records[2].offset]); diff --git a/ballerina/tests/listener_client_tests.bal b/ballerina/tests/listener_client_tests.bal index cdb2594e..3e025d22 100644 --- a/ballerina/tests/listener_client_tests.bal +++ b/ballerina/tests/listener_client_tests.bal @@ -299,7 +299,7 @@ function consumerServiceCommitOffsetTest() returns error? { count += 1; } Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] _ = check consumer->poll(1); + BytesConsumerRecord[] _ = check consumer->poll(1); PartitionOffset? committedOffset = check consumer->getCommittedOffset(topicPartition); test:assertTrue(committedOffset is PartitionOffset); if committedOffset is PartitionOffset { @@ -335,7 +335,7 @@ function consumerServiceCommitTest() returns error? { count += 1; } Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] _ = check consumer->poll(1); + BytesConsumerRecord[] _ = check consumer->poll(1); PartitionOffset? committedOffset = check consumer->getCommittedOffset(topicPartition); test:assertTrue(committedOffset is PartitionOffset); if committedOffset is PartitionOffset { @@ -795,7 +795,7 @@ function invalidSecurityProtocolListenerTest() returns error? { Service messageOrderService = service object { - remote function onConsumerRecord(Caller caller, ConsumerRecord[] records) returns error? { + remote function onConsumerRecord(Caller caller, BytesConsumerRecord[] records) returns error? { foreach var kafkaRecord in records { byte[] value = kafkaRecord.value; string message = check 'string:fromBytes(value); @@ -806,7 +806,7 @@ service object { Service consumerService = service object { - remote function onConsumerRecord(ConsumerRecord[] records) returns error? { + remote function onConsumerRecord(BytesConsumerRecord[] records) returns error? { foreach var kafkaRecord in records { byte[] value = kafkaRecord.value; string message = check 'string:fromBytes(value); @@ -818,7 +818,7 @@ service object { Service consumerGracefulStopService = service object { - remote function onConsumerRecord(readonly & ConsumerRecord[] records) returns error? { + remote function onConsumerRecord(readonly & BytesConsumerRecord[] records) returns error? { foreach var kafkaRecord in records { byte[] value = kafkaRecord.value; string message = check 'string:fromBytes(value); @@ -830,7 +830,7 @@ service object { Service consumerImmediateStopService = service object { - remote function onConsumerRecord(Caller caller, ConsumerRecord[] records) returns error? { + remote function onConsumerRecord(Caller caller, BytesConsumerRecord[] records) returns error? { foreach var kafkaRecord in records { byte[] value = kafkaRecord.value; string message = check 'string:fromBytes(value); @@ -842,7 +842,7 @@ service object { Service consumerServiceWithCommit = service object { - remote function onConsumerRecord(ConsumerRecord[] records, Caller caller) returns error? { + remote function onConsumerRecord(BytesConsumerRecord[] records, Caller caller) returns error? { foreach var kafkaRecord in records { byte[] value = kafkaRecord.value; string message = check 'string:fromBytes(value); @@ -855,7 +855,7 @@ service object { Service consumerServiceWithCommitOffset = service object { - remote function onConsumerRecord(readonly & ConsumerRecord[] records, Caller caller) returns error? { + remote function onConsumerRecord(readonly & BytesConsumerRecord[] records, Caller caller) returns error? { string topic = "listener-commit-offset-test-topic"; foreach var kafkaRecord in records { byte[] value = kafkaRecord.value; @@ -878,7 +878,7 @@ service object { Service consumerConfigService = service object { - remote function onConsumerRecord(ConsumerRecord[] records, Caller caller) returns error? { + remote function onConsumerRecord(BytesConsumerRecord[] records, Caller caller) returns error? { foreach var kafkaRecord in records { byte[] value = kafkaRecord.value; string message = check 'string:fromBytes(value); @@ -890,7 +890,7 @@ service object { Service saslConsumerService = service object { - remote function onConsumerRecord(ConsumerRecord[] records, Caller caller) returns error? { + remote function onConsumerRecord(BytesConsumerRecord[] records, Caller caller) returns error? { foreach var consumerRecord in records { string messageContent = check 'string:fromBytes(consumerRecord.value); log:printInfo(messageContent); @@ -902,7 +902,7 @@ service object { Service saslConsumerIncorrectCredentialsService = service object { remote function onConsumerRecord(Caller caller, - ConsumerRecord[] records) returns error? { + BytesConsumerRecord[] records) returns error? { foreach var consumerRecord in records { string messageContent = check 'string:fromBytes(consumerRecord.value); log:printInfo(messageContent); @@ -913,7 +913,7 @@ service object { Service sslConsumerService = service object { - remote function onConsumerRecord(Caller caller, ConsumerRecord[] & readonly records) returns error? { + remote function onConsumerRecord(Caller caller, BytesConsumerRecord[] & readonly records) returns error? { foreach var consumerRecord in records { string messageContent = check 'string:fromBytes(consumerRecord.value); log:printInfo(messageContent); @@ -925,7 +925,7 @@ service object { Service listenerDetachService1 = service object { remote function onConsumerRecord(Caller caller, - ConsumerRecord[] records) returns error? { + BytesConsumerRecord[] records) returns error? { foreach var consumerRecord in records { string messageContent = check 'string:fromBytes(consumerRecord.value); log:printInfo(messageContent); @@ -936,7 +936,7 @@ service object { Service listenerDetachService2 = service object { - remote function onConsumerRecord(ConsumerRecord[] records) returns error? { + remote function onConsumerRecord(BytesConsumerRecord[] records) returns error? { foreach var consumerRecord in records { string messageContent = check 'string:fromBytes(consumerRecord.value); log:printInfo(messageContent); @@ -947,7 +947,7 @@ service object { Service incorrectEndpointsService = service object { - remote function onConsumerRecord(ConsumerRecord[] records) returns error? { + remote function onConsumerRecord(BytesConsumerRecord[] records) returns error? { foreach var consumerRecord in records { string messageContent = check 'string:fromBytes(consumerRecord.value); log:printInfo(messageContent); diff --git a/ballerina/tests/producer_client_tests.bal b/ballerina/tests/producer_client_tests.bal index 76e9c4ec..ff35273e 100644 --- a/ballerina/tests/producer_client_tests.bal +++ b/ballerina/tests/producer_client_tests.bal @@ -89,7 +89,7 @@ function producerSendStringTest() returns error? { clientId: "test-consumer-46" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 2); byte[] messageValue = consumerRecords[0].value; string messageConverted = check 'string:fromBytes(messageValue); @@ -150,7 +150,7 @@ function producerFlushTest() returns error? { clientId: "test-consumer-47" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals('string:fromBytes(consumerRecords[0].value), TEST_MESSAGE); check consumer->close(); } @@ -212,7 +212,7 @@ function transactionalProducerTest() returns error? { clientId: "test-consumer-48" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 3, "Expected: 3. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -250,7 +250,7 @@ function transactionalProducerWithAbortTest() returns error? { isolationLevel: ISOLATION_COMMITTED }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 0, "Expected: 0. Received: " + consumerRecords.length().toString()); check consumer->close(); @@ -299,7 +299,7 @@ function saslProducerTest() returns error? { clientId: "test-consumer-49" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -374,7 +374,7 @@ function producerAdditionalPropertiesTest() returns error? { clientId: "test-consumer-50" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -404,7 +404,7 @@ function sslProducerTest() returns error? { clientId: "test-consumer-51" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -447,7 +447,7 @@ function sslCertKeyProducerTest() returns error? { clientId: "test-consumer-52" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -484,7 +484,7 @@ function sslCertOnlyProducerTest() returns error? { clientId: "test-consumer-53" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } @@ -555,7 +555,7 @@ function SSLWithSASLAuthProducerTest() returns error? { clientId: "test-consumer-53" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(5); + BytesConsumerRecord[] consumerRecords = check consumer->poll(5); test:assertEquals(consumerRecords.length(), 1, "Expected: 1. Received: " + consumerRecords.length().toString()); check consumer->close(); } diff --git a/ballerina/tests/producer_data_binding_tests.bal b/ballerina/tests/producer_data_binding_tests.bal index bc40929e..c0656daa 100644 --- a/ballerina/tests/producer_data_binding_tests.bal +++ b/ballerina/tests/producer_data_binding_tests.bal @@ -97,15 +97,15 @@ function intProduceTest() returns error? { clientId: "data-binding-producer-01" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 3); int receivedValue = 0; int receivedKey = 0; - consumerRecords.forEach(function(ConsumerRecord cRecord) { + consumerRecords.forEach(function(BytesConsumerRecord cRecord) { string receivedMsg = checkpanic 'string:fromBytes(cRecord.value); receivedValue += checkpanic int:fromString(receivedMsg); - string receivedKeyString = checkpanic 'string:fromBytes(cRecord.'key); + string receivedKeyString = checkpanic 'string:fromBytes(cRecord?.'key); receivedKey += checkpanic int:fromString(receivedKeyString); }); test:assertEquals(receivedValue, 30); @@ -133,15 +133,15 @@ function floatProduceTest() returns error? { clientId: "data-binding-producer-02" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 3); float receivedValue = 0; float receivedKey = 0; - consumerRecords.forEach(function(ConsumerRecord cRecord) { + consumerRecords.forEach(function(BytesConsumerRecord cRecord) { string receivedMsg = checkpanic 'string:fromBytes(cRecord.value); receivedValue = checkpanic float:fromString(receivedMsg); - string receivedKeyString = checkpanic 'string:fromBytes(cRecord.'key); + string receivedKeyString = checkpanic 'string:fromBytes(cRecord?.'key); receivedKey = checkpanic float:fromString(receivedKeyString); }); test:assertEquals(receivedValue, 100.9); @@ -169,15 +169,15 @@ function decimalProduceTest() returns error? { clientId: "data-binding-producer-03" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 3); decimal receivedValue = 0; decimal receivedKey = 0; - consumerRecords.forEach(function(ConsumerRecord cRecord) { + consumerRecords.forEach(function(BytesConsumerRecord cRecord) { string receivedMsg = checkpanic 'string:fromBytes(cRecord.value); receivedValue += checkpanic decimal:fromString(receivedMsg); - string receivedKeyString = checkpanic 'string:fromBytes(cRecord.'key); + string receivedKeyString = checkpanic 'string:fromBytes(cRecord?.'key); receivedKey += checkpanic decimal:fromString(receivedKeyString); }); test:assertEquals(receivedValue, 30.9d); @@ -205,14 +205,14 @@ function booleanProduceTest() returns error? { clientId: "data-binding-producer-04" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 3); string receivedValue = "false"; string receivedKey = ""; - consumerRecords.forEach(function(ConsumerRecord cRecord) { + consumerRecords.forEach(function(BytesConsumerRecord cRecord) { receivedValue = checkpanic 'string:fromBytes(cRecord.value); - receivedKey = checkpanic 'string:fromBytes(cRecord.'key); + receivedKey = checkpanic 'string:fromBytes(cRecord?.'key); }); test:assertEquals(receivedValue, "true"); test:assertEquals(receivedKey, "true"); @@ -239,14 +239,14 @@ function stringProduceTest() returns error? { clientId: "data-binding-producer-05" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 3); string receivedValue = ""; string receivedKey = ""; - consumerRecords.forEach(function(ConsumerRecord cRecord) { + consumerRecords.forEach(function(BytesConsumerRecord cRecord) { receivedValue = checkpanic 'string:fromBytes(cRecord.value); - receivedKey = checkpanic 'string:fromBytes(cRecord.'key); + receivedKey = checkpanic 'string:fromBytes(cRecord?.'key); }); test:assertEquals(receivedValue, TEST_MESSAGE); test:assertEquals(receivedKey, TEST_KEY); @@ -274,14 +274,14 @@ function xmlProduceTest() returns error? { clientId: "data-binding-producer-06" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 3); string[] receivedValues = []; string receivedKey = ""; - consumerRecords.forEach(function(ConsumerRecord cRecord) { + consumerRecords.forEach(function(BytesConsumerRecord cRecord) { receivedValues.push(checkpanic 'string:fromBytes(cRecord.value)); - receivedKey = checkpanic 'string:fromBytes(cRecord.'key); + receivedKey = checkpanic 'string:fromBytes(cRecord?.'key); }); test:assertEquals(receivedValues, [xmlData.toString(), xmlData.toString(), xmlData.toString()]); test:assertEquals(receivedKey, TEST_KEY); @@ -307,11 +307,11 @@ function recordProduceTest() returns error? { clientId: "data-binding-producer-07" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 3); Person[] receivedValues = []; - consumerRecords.forEach(function(ConsumerRecord cRecord) { + consumerRecords.forEach(function(BytesConsumerRecord cRecord) { Person person = checkpanic value:fromJsonStringWithType(checkpanic 'string:fromBytes(cRecord.value)); receivedValues.push(person); }); @@ -338,11 +338,11 @@ function mapProduceTest() returns error? { clientId: "data-binding-producer-08" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 3); map[] receivedValues = []; - consumerRecords.forEach(function(ConsumerRecord cRecord) { + consumerRecords.forEach(function(BytesConsumerRecord cRecord) { map personMap = checkpanic value:fromJsonStringWithType(checkpanic 'string:fromBytes(cRecord.value)); receivedValues.push(personMap); }); @@ -371,11 +371,11 @@ function tableProduceTest() returns error? { clientId: "data-binding-producer-09" }; Consumer consumer = check new (DEFAULT_URL, consumerConfiguration); - ConsumerRecord[] consumerRecords = check consumer->poll(3); + BytesConsumerRecord[] consumerRecords = check consumer->poll(3); test:assertEquals(consumerRecords.length(), 3); table[] receivedValues = []; - consumerRecords.forEach(function(ConsumerRecord cRecord) { + consumerRecords.forEach(function(BytesConsumerRecord cRecord) { table personTable = checkpanic value:fromJsonStringWithType(checkpanic 'string:fromBytes(cRecord.value)); receivedValues.push(personTable); }); diff --git a/changelog.md b/changelog.md index afe7ccd6..76a02424 100644 --- a/changelog.md +++ b/changelog.md @@ -3,6 +3,11 @@ This file contains all the notable changes done to the Ballerina Kafka package t The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Removed +- [Removed deprecated `kafka:ConsumerRecord` and `kafka:ProducerRecord`](https://github.com/ballerina-platform/ballerina-library/issues/6359) + ## [3.10.2] - 2024-03-28 ### Fixed diff --git a/compiler-plugin-tests/src/test/java/io/ballerina/stdlib/kafka/plugin/KafkaServiceValidationTest.java b/compiler-plugin-tests/src/test/java/io/ballerina/stdlib/kafka/plugin/KafkaServiceValidationTest.java index 19321586..a71aa75d 100644 --- a/compiler-plugin-tests/src/test/java/io/ballerina/stdlib/kafka/plugin/KafkaServiceValidationTest.java +++ b/compiler-plugin-tests/src/test/java/io/ballerina/stdlib/kafka/plugin/KafkaServiceValidationTest.java @@ -74,7 +74,7 @@ public void testValidService3() { Assert.assertEquals(diagnosticResult.errors().size(), 0); } - @Test(enabled = true, description = "Validating using only kafka:ConsumerRecord[]") + @Test(enabled = true, description = "Validating using only kafka:BytesConsumerRecord[]") public void testValidService4() { Package currentPackage = loadPackage("valid_service_4"); PackageCompilation compilation = currentPackage.getCompilation(); @@ -82,7 +82,7 @@ public void testValidService4() { Assert.assertEquals(diagnosticResult.errors().size(), 0); } - @Test(enabled = true, description = "Validate kafka:Caller and kafka:ConsumerRecord[]") + @Test(enabled = true, description = "Validate kafka:Caller and kafka:BytesConsumerRecord[]") public void testValidService5() { Package currentPackage = loadPackage("valid_service_5"); PackageCompilation compilation = currentPackage.getCompilation(); @@ -90,7 +90,7 @@ public void testValidService5() { Assert.assertEquals(diagnosticResult.errors().size(), 0); } - @Test(enabled = true, description = "Validating readonly kafka:ConsumerRecord[]") + @Test(enabled = true, description = "Validating readonly kafka:BytesConsumerRecord[]") public void testValidService6() { Package currentPackage = loadPackage("valid_service_6"); PackageCompilation compilation = currentPackage.getCompilation(); diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_10/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_10/service.bal index 9f3ae7c0..cebb11b2 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_10/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_10/service.bal @@ -28,7 +28,7 @@ listener kafka:Listener kafkaListener = service kafka:Service on kafkaListener { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) { + kafka:BytesConsumerRecord[] records) { } resource function get greeting() returns string { diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_12/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_12/service.bal index 1b3414ef..da81f6a5 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_12/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_12/service.bal @@ -26,11 +26,11 @@ kafka:ConsumerConfiguration consumerConfigs = { listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs); service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:ConsumerRecord[] records, readonly & kafka:Caller caller) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, readonly & kafka:Caller caller) { } } service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:ConsumerRecord[] records, readonly & kafka:Consumer consumer) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, readonly & kafka:Consumer consumer) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_14/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_14/service.bal index b9849a33..eec92e7b 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_14/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_14/service.bal @@ -30,10 +30,10 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records) { } - remote function onError(kafka:ConsumerRecord[] records) returns error|() { + remote function onError(kafka:BytesConsumerRecord[] records) returns error|() { } } @@ -42,10 +42,10 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records) { } - remote function onError(kafka:ConsumerRecord[] records) returns kafka:Error|() { + remote function onError(kafka:BytesConsumerRecord[] records) returns kafka:Error|() { } } @@ -54,7 +54,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records) { } remote function onError(kafka:Caller caller) returns kafka:Error|error? { @@ -66,7 +66,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records, kafka:Caller caller) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records, kafka:Caller caller) { } remote function onError(int value) returns kafka:Error? { @@ -75,9 +75,9 @@ service kafka:Service on kafkaListener { service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] & readonly records) { } - remote function onError(kafka:ConsumerRecord records) returns error? { + remote function onError(kafka:BytesConsumerRecord records) returns error? { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_15/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_15/service.bal index dd885664..a4f7741b 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_15/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_15/service.bal @@ -30,7 +30,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records) { } remote function onError() { diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_16/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_16/service.bal index 4786f31d..1575bc53 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_16/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_16/service.bal @@ -30,7 +30,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records) { } remote function onConsumerError(error 'error) { @@ -42,7 +42,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records, kafka:Caller caller) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records, kafka:Caller caller) { } remote function onError(kafka:Error 'error) returns kafka:Error? { diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_17/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_17/service.bal index d1a09ffa..4d7f8f7e 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_17/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_17/service.bal @@ -30,7 +30,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records) { } function onError(error 'error) { diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_18/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_18/service.bal index ab1d00a5..aec3e699 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_18/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_18/service.bal @@ -30,7 +30,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records) { } resource function get onError(kafka:Error 'error) { @@ -42,12 +42,12 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records) { } remote function onError(kafka:Error 'error) { } - resource function get testResource(kafka:ConsumerRecord[] records) { + resource function get testResource(kafka:BytesConsumerRecord[] records) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_19/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_19/service.bal index 9f6b390e..13195682 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_19/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_19/service.bal @@ -26,7 +26,7 @@ kafka:ConsumerConfiguration consumerConfigs = { listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs); service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] & readonly records, string[] data, int value) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] & readonly records, string[] data, int value) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_2/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_2/service.bal index fa723b06..25104877 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_2/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_2/service.bal @@ -28,7 +28,7 @@ listener kafka:Listener kafkaListener = service kafka:Service on kafkaListener { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) { + kafka:BytesConsumerRecord[] records) { } remote function someFunction() {} diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_20/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_20/service.bal index 551303fe..e33275a2 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_20/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_20/service.bal @@ -26,7 +26,7 @@ kafka:ConsumerConfiguration consumerConfigs = { listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs); service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] & readonly records, string & readonly data) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] & readonly records, string & readonly data) { } } @@ -41,6 +41,6 @@ service kafka:Service on kafkaListener { } service kafka:Service on kafkaListener { - remote function onConsumerRecord(int & readonly data, kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(int & readonly data, kafka:BytesConsumerRecord[] & readonly records) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_24/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_24/service.bal index 7b643b67..4ed532b4 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_24/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_24/service.bal @@ -27,16 +27,16 @@ listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs) service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] & readonly records) { } - remote function onError(kafka:Error 'error, kafka:ConsumerRecord records) returns error? { + remote function onError(kafka:Error 'error, kafka:BytesConsumerRecord records) returns error? { } } service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] & readonly records) { } remote function onError(error 'error, int records) returns error? { diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_3/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_3/service.bal index 968dc581..e53d6f54 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_3/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_3/service.bal @@ -28,6 +28,6 @@ listener kafka:Listener kafkaListener = service kafka:Service on kafkaListener { function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) { + kafka:BytesConsumerRecord[] records) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_4/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_4/service.bal index c08ff4fd..9a63643f 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_4/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_4/service.bal @@ -27,11 +27,11 @@ listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs); service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records, string data) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records, string data) { } } service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] & readonly records, string data) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] & readonly records, string data) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_5/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_5/service.bal index 75dcf1eb..b4b8fca0 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_5/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_5/service.bal @@ -27,12 +27,12 @@ listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs); service kafka:Service on kafkaListener { - remote function onConsumerRecord(string caller, kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(string caller, kafka:BytesConsumerRecord[] records) { } } service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Consumer caller, kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(kafka:Consumer caller, kafka:BytesConsumerRecord[] records) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_6/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_6/service.bal index 5c0e8ef2..31fd3216 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_6/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_6/service.bal @@ -28,7 +28,7 @@ listener kafka:Listener kafkaListener = service kafka:Service on kafkaListener { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) returns string { + kafka:BytesConsumerRecord[] records) returns string { return "Hello"; } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_8/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_8/service.bal index af040375..a34dbb7f 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_8/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_8/service.bal @@ -27,7 +27,7 @@ listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs); service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records, anydata data) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records, anydata data) { } } @@ -37,7 +37,7 @@ service kafka:Service on kafkaListener { } service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records, kafka:ConsumerRecord[] data) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records, kafka:BytesConsumerRecord[] data) { } } @@ -52,6 +52,6 @@ service kafka:Service on kafkaListener { } service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:ConsumerRecord[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:BytesConsumerRecord[] data) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_9/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_9/service.bal index 953bda6f..e8b4f278 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_9/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_service_9/service.bal @@ -30,6 +30,6 @@ listener kafka:Listener kafkaListener2 = service kafka:Service on kafkaListener1, kafkaListener2 { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) { + kafka:BytesConsumerRecord[] records) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_1/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_1/service.bal index fbf92b08..7df3b805 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_1/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_1/service.bal @@ -28,30 +28,30 @@ listener kafka:Listener kafkaListener = service kafka:Service on kafkaListener { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) { + kafka:BytesConsumerRecord[] records) { } } service kafka:Service on kafkaListener { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) returns error? { + kafka:BytesConsumerRecord[] records) returns error? { } } service kafka:Service on kafkaListener { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) returns kafka:Error? { + kafka:BytesConsumerRecord[] records) returns kafka:Error? { } } service kafka:Service on kafkaListener { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) returns ()|kafka:Error { + kafka:BytesConsumerRecord[] records) returns ()|kafka:Error { } } service kafka:Service on kafkaListener { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) returns ()|error { + kafka:BytesConsumerRecord[] records) returns ()|error { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_13/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_13/service.bal index d11dd8ea..c78ea3e3 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_13/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_13/service.bal @@ -30,7 +30,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, int[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, int[] data) { } remote function onError(kafka:Error 'error, kafka:Caller caller) returns error? { diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_14/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_14/service.bal index 78c0930f..eafe1474 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_14/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_14/service.bal @@ -33,6 +33,6 @@ service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_2/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_2/service.bal index 02f40348..02754088 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_2/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_2/service.bal @@ -27,6 +27,6 @@ listener foo:Listener kafkaListener = new (foo:DEFAULT_URL, consumerConfigs); service foo:Service on kafkaListener { - remote function onConsumerRecord(foo:Caller caller, foo:ConsumerRecord[] records) { + remote function onConsumerRecord(foo:Caller caller, foo:BytesConsumerRecord[] records) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_3/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_3/service.bal index ef8052a2..8b362b4a 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_3/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_3/service.bal @@ -31,6 +31,6 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_4/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_4/service.bal index df7734b0..377dd4e1 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_4/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_4/service.bal @@ -31,6 +31,6 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_5/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_5/service.bal index 27c6b37f..e46d51de 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_5/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_5/service.bal @@ -30,6 +30,6 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_6/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_6/service.bal index a3b45e41..a0edb087 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_6/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_6/service.bal @@ -30,7 +30,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records) { } } @@ -39,7 +39,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records) { } } @@ -48,7 +48,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records, kafka:Caller caller) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records, kafka:Caller caller) { } } @@ -57,6 +57,6 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] & readonly records) { } } diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_7/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_7/service.bal index c21c1e5e..087ebecc 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_7/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_7/service.bal @@ -30,7 +30,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records) { } remote function onError(kafka:Error 'error) { @@ -42,7 +42,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records) { } remote function onError(error 'error) { @@ -54,7 +54,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records, kafka:Caller caller) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records, kafka:Caller caller) { } remote function onError(kafka:Error 'error) { @@ -63,7 +63,7 @@ service kafka:Service on kafkaListener { service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] & readonly records) { } remote function onError(kafka:Error 'error) { diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_8/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_8/service.bal index b349930b..e45379f7 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_8/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_8/service.bal @@ -30,7 +30,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records) { } remote function onError(kafka:Error 'error) returns error|() { @@ -42,7 +42,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records) { } remote function onError(kafka:Error 'error) returns kafka:Error|() { @@ -54,7 +54,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records) { } remote function onError(error 'error) returns kafka:Error|error? { @@ -66,7 +66,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] & readonly records, kafka:Caller caller) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] & readonly records, kafka:Caller caller) { } remote function onError(kafka:Error 'error) returns kafka:Error? { @@ -75,7 +75,7 @@ service kafka:Service on kafkaListener { service kafka:Service on kafkaListener { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] & readonly records) { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] & readonly records) { } remote function onError(kafka:Error 'error) returns error? { diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_9/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_9/service.bal index 3798228e..3a07384d 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_9/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_service_9/service.bal @@ -30,7 +30,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, int[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, int[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -42,7 +42,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, byte[][] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, byte[][] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -54,7 +54,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, string[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, string[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -66,7 +66,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, Person[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, Person[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -78,7 +78,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, map[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, map[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -90,7 +90,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, table[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, table[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -102,7 +102,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, json[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, json[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -114,7 +114,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, xml[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, xml[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -126,7 +126,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, kafka:Caller caller, anydata[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller, anydata[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -138,7 +138,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, int[] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, int[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -150,7 +150,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(kafka:ConsumerRecord[] records, byte[][] data) { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, byte[][] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -210,7 +210,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records, kafka:Caller caller, json[] data) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records, kafka:Caller caller, json[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -222,7 +222,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records, kafka:Caller caller, xml[] data) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records, kafka:Caller caller, xml[] data) { } remote function onError(kafka:Error 'error) returns error|() { @@ -234,7 +234,7 @@ service kafka:Service on kafkaListener { private final string var1 = "Kafka Service"; private final int var2 = 54; - remote function onConsumerRecord(readonly & kafka:ConsumerRecord[] records, anydata[] data) { + remote function onConsumerRecord(readonly & kafka:BytesConsumerRecord[] records, anydata[] data) { } remote function onError(kafka:Error 'error) returns error|() { diff --git a/compiler-plugin-tests/src/test/resources/expected_sources/service_1/result.bal b/compiler-plugin-tests/src/test/resources/expected_sources/service_1/result.bal index 1645116e..f92be5cd 100644 --- a/compiler-plugin-tests/src/test/resources/expected_sources/service_1/result.bal +++ b/compiler-plugin-tests/src/test/resources/expected_sources/service_1/result.bal @@ -1,7 +1,7 @@ import ballerinax/kafka; service kafka:Service on new kafka:Listener("localhost:9090") { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) returns kafka:Error? { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns kafka:Error? { } } diff --git a/compiler-plugin-tests/src/test/resources/expected_sources/service_2/result.bal b/compiler-plugin-tests/src/test/resources/expected_sources/service_2/result.bal index 06d3f393..9bb4a8ba 100644 --- a/compiler-plugin-tests/src/test/resources/expected_sources/service_2/result.bal +++ b/compiler-plugin-tests/src/test/resources/expected_sources/service_2/result.bal @@ -3,7 +3,7 @@ import ballerinax/kafka; service kafka:Service on new kafka:Listener("localhost:9090") { int x = 5; string y = "xx"; - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) returns kafka:Error? { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns kafka:Error? { } } diff --git a/compiler-plugin-tests/src/test/resources/expected_sources/service_3/result.bal b/compiler-plugin-tests/src/test/resources/expected_sources/service_3/result.bal index 30408edc..78e2ff69 100644 --- a/compiler-plugin-tests/src/test/resources/expected_sources/service_3/result.bal +++ b/compiler-plugin-tests/src/test/resources/expected_sources/service_3/result.bal @@ -1,7 +1,7 @@ import ballerinax/kafka; service kafka:Service on new kafka:Listener("localhost:9090") { - remote function onConsumerRecord(kafka:ConsumerRecord[] records) returns kafka:Error? { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records) returns kafka:Error? { } } diff --git a/compiler-plugin-tests/src/test/resources/expected_sources/service_4/result.bal b/compiler-plugin-tests/src/test/resources/expected_sources/service_4/result.bal index bb31eb74..e39485ca 100644 --- a/compiler-plugin-tests/src/test/resources/expected_sources/service_4/result.bal +++ b/compiler-plugin-tests/src/test/resources/expected_sources/service_4/result.bal @@ -3,7 +3,7 @@ import ballerinax/kafka; service kafka:Service on new kafka:Listener("localhost:9090") { int x = 5; string y = "xx"; - remote function onConsumerRecord(kafka:ConsumerRecord[] records) returns kafka:Error? { + remote function onConsumerRecord(kafka:BytesConsumerRecord[] records) returns kafka:Error? { } } diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaCodeTemplateWithCallerParameter.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaCodeTemplateWithCallerParameter.java index 362cbb99..b2de4ffc 100644 --- a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaCodeTemplateWithCallerParameter.java +++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaCodeTemplateWithCallerParameter.java @@ -50,7 +50,7 @@ public class KafkaCodeTemplateWithCallerParameter implements CodeAction { private static final String REMOTE_FUNCTION_TEXT = LS + "\tremote function onConsumerRecord(kafka:Caller caller, " + - "kafka:ConsumerRecord[] records) returns kafka:Error? {" + LS + LS + "\t}" + LS; + "kafka:BytesConsumerRecord[] records) returns kafka:Error? {" + LS + LS + "\t}" + LS; @Override public List supportedDiagnosticCodes() { diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaCodeTemplateWithoutCallerParameter.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaCodeTemplateWithoutCallerParameter.java index fe600bcf..8913e0cc 100644 --- a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaCodeTemplateWithoutCallerParameter.java +++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/KafkaCodeTemplateWithoutCallerParameter.java @@ -50,7 +50,7 @@ public class KafkaCodeTemplateWithoutCallerParameter implements CodeAction { private static final String REMOTE_FUNCTION_TEXT = LS + "\tremote function onConsumerRecord(" + - "kafka:ConsumerRecord[] records) returns kafka:Error? {" + LS + LS + "\t}" + LS; + "kafka:BytesConsumerRecord[] records) returns kafka:Error? {" + LS + LS + "\t}" + LS; @Override public List supportedDiagnosticCodes() { diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java index c96c80bd..e5bf8234 100644 --- a/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java +++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/kafka/plugin/PluginConstants.java @@ -31,7 +31,6 @@ public class PluginConstants { // parameters public static final String CALLER = "Caller"; - public static final String RECORD_PARAM = "ConsumerRecord"; public static final String ERROR_PARAM = "Error"; public static final String PAYLOAD_ANNOTATION = "kafka:Payload "; diff --git a/examples/order-manager/notification-service/notification_service.bal b/examples/order-manager/notification-service/notification_service.bal index 40cc55cd..34f628c4 100644 --- a/examples/order-manager/notification-service/notification_service.bal +++ b/examples/order-manager/notification-service/notification_service.bal @@ -34,8 +34,8 @@ listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs) service kafka:Service on kafkaListener { // Listens to Kafka topic for any successful orders - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) returns error? { - foreach kafka:ConsumerRecord 'record in records { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns error? { + foreach kafka:BytesConsumerRecord 'record in records { // Convert the byte values in the Kafka record to type Order string messageContent = check string:fromBytes('record.value); diff --git a/examples/order-manager/order-processor/order_processor.bal b/examples/order-manager/order-processor/order_processor.bal index 1811513f..2084c5b6 100644 --- a/examples/order-manager/order-processor/order_processor.bal +++ b/examples/order-manager/order-processor/order_processor.bal @@ -37,7 +37,7 @@ listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs) service kafka:Service on kafkaListener { // Listens to Kafka topic for any new orders and process them - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) returns error? { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns error? { // Uses Ballerina query expressions to filter out the successful orders and publish to Kafka topic error? err = from types:Order 'order in check getOrdersFromRecords(records) where 'order.status == types:SUCCESS do { log:printInfo("Sending successful order to " + PUBLISH_TOPIC + " " + 'order.toString()); @@ -50,9 +50,9 @@ service kafka:Service on kafkaListener { } // Convert the byte values in Kafka records to type Order[] -function getOrdersFromRecords(kafka:ConsumerRecord[] records) returns types:Order[]|error { +function getOrdersFromRecords(kafka:BytesConsumerRecord[] records) returns types:Order[]|error { types:Order[] receivedOrders = []; - foreach kafka:ConsumerRecord 'record in records { + foreach kafka:BytesConsumerRecord 'record in records { string messageContent = check string:fromBytes('record.value); json jsonContent = check value:fromJsonString(messageContent); json jsonClone = jsonContent.cloneReadOnly(); diff --git a/examples/order-manager/order-processor/tests/order_processor_tests.bal b/examples/order-manager/order-processor/tests/order_processor_tests.bal index 046593fc..8b86d668 100644 --- a/examples/order-manager/order-processor/tests/order_processor_tests.bal +++ b/examples/order-manager/order-processor/tests/order_processor_tests.bal @@ -38,7 +38,7 @@ function orderProcessorTest() returns error? { topics: [PUBLISH_TOPIC] }; kafka:Consumer testConsumer = check new (kafka:DEFAULT_URL, testConsumerConfigs); - kafka:ConsumerRecord[] records = check testConsumer->poll(3); + kafka:BytesConsumerRecord[] records = check testConsumer->poll(3); test:assertEquals(records.length(), 1); diff --git a/examples/order-manager/order-service/tests/order_service_tests.bal b/examples/order-manager/order-service/tests/order_service_tests.bal index 32ab57c0..d378494f 100644 --- a/examples/order-manager/order-service/tests/order_service_tests.bal +++ b/examples/order-manager/order-service/tests/order_service_tests.bal @@ -47,7 +47,7 @@ function orderServiceTest() returns error? { }; kafka:Consumer testConsumer = check new (kafka:DEFAULT_URL, testConsumerConfigs); - kafka:ConsumerRecord[] records = check testConsumer->poll(3); + kafka:BytesConsumerRecord[] records = check testConsumer->poll(3); test:assertEquals(records.length(), 1); diff --git a/examples/secured-word-count-calculator/word-count-calculator/main.bal b/examples/secured-word-count-calculator/word-count-calculator/main.bal index 613b63aa..7011e573 100644 --- a/examples/secured-word-count-calculator/word-count-calculator/main.bal +++ b/examples/secured-word-count-calculator/word-count-calculator/main.bal @@ -92,9 +92,9 @@ service kafka:Service on new kafka:Listener(KAFKA_SECURED_URL, consumerConfigs) self.kafkaProducer = check new (KAFKA_SECURED_URL, producerConfigs); } - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) returns error? { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns error? { map countResults; - foreach kafka:ConsumerRecord 'record in records { + foreach kafka:BytesConsumerRecord 'record in records { // Process the input sentences one by one countResults = check processRecord(self.wordCountMap, 'record); } @@ -108,7 +108,7 @@ service kafka:Service on new kafka:Listener(KAFKA_SECURED_URL, consumerConfigs) } } -function processRecord(map wordCountMap, kafka:ConsumerRecord 'record) returns map|error { +function processRecord(map wordCountMap, kafka:BytesConsumerRecord 'record) returns map|error { // A temporary word count map is created to count the words in one sentence map tempWordCountMap = {}; string sentence = check string:fromBytes('record.value); diff --git a/examples/secured-word-count-calculator/word-count-calculator/tests/word_count_calculator_tests.bal b/examples/secured-word-count-calculator/word-count-calculator/tests/word_count_calculator_tests.bal index d7319389..847f1e9d 100644 --- a/examples/secured-word-count-calculator/word-count-calculator/tests/word_count_calculator_tests.bal +++ b/examples/secured-word-count-calculator/word-count-calculator/tests/word_count_calculator_tests.bal @@ -31,7 +31,7 @@ function wordCountCalculatorTest() returns error? { topics: [OUTPUT_TOPIC] }; kafka:Consumer testConsumer = check new (kafka:DEFAULT_URL, testConsumerConfigs); - kafka:ConsumerRecord[] records = check testConsumer->poll(3); + kafka:BytesConsumerRecord[] records = check testConsumer->poll(3); test:assertEquals(records.length(), 7); @@ -45,9 +45,9 @@ function wordCountCalculatorTest() returns error? { "examples": 1 }; - foreach kafka:ConsumerRecord 'record in records { + foreach kafka:BytesConsumerRecord 'record in records { string countValue = check string:fromBytes('record.value); - byte[]? key = 'record["key"]; + anydata? key = 'record["key"]; if key is byte[] { string word = check string:fromBytes(key); int? actualResult = expectedResults[word]; diff --git a/examples/twitter-filter/elasticsearch-consumer/consumer.bal b/examples/twitter-filter/elasticsearch-consumer/consumer.bal index f31a1ba0..db06ed95 100644 --- a/examples/twitter-filter/elasticsearch-consumer/consumer.bal +++ b/examples/twitter-filter/elasticsearch-consumer/consumer.bal @@ -67,7 +67,7 @@ public function main() returns error? { kafka:Service consumerService = service object { remote function onConsumerRecord(kafka:Caller caller, - kafka:ConsumerRecord[] records) returns error? { + kafka:BytesConsumerRecord[] records) returns error? { // The set of tweets received by the service are processed one by one. types:Tweet[] convertedTweets =check getTweetsFromConsumerRecords(records); @@ -89,10 +89,10 @@ service object { } }; -function getTweetsFromConsumerRecords(kafka:ConsumerRecord[] records) returns types:Tweet[]|error { +function getTweetsFromConsumerRecords(kafka:BytesConsumerRecord[] records) returns types:Tweet[]|error { types:Tweet[] tweets = []; foreach int i in 0 ..< records.length() { - kafka:ConsumerRecord kafkaRecord = records[i]; + kafka:BytesConsumerRecord kafkaRecord = records[i]; // The value should be a `byte[]` since the byte[] deserializer is used for the value. byte[] value = kafkaRecord.value; diff --git a/load-tests/simple_producer_consumer/src/main.bal b/load-tests/simple_producer_consumer/src/main.bal index 804ca55f..35cc5774 100644 --- a/load-tests/simple_producer_consumer/src/main.bal +++ b/load-tests/simple_producer_consumer/src/main.bal @@ -119,7 +119,7 @@ function startListener() returns error? { kafka:Service kafkaService = service object { - remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) returns error? { + remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns error? { foreach var consumerRecord in records { string|error messageContent = 'string:fromBytes(consumerRecord.value); if messageContent is error { diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java index 30033d11..6389d15f 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaConstants.java @@ -52,8 +52,6 @@ private KafkaConstants() { public static final String PAYLOAD_BINDING_ERROR = "PayloadBindingError"; public static final String PAYLOAD_VALIDATION_ERROR = "PayloadValidationError"; - public static final String AVRO_GENERIC_RECORD_NAME = "AvroGenericRecord"; - public static final String CONSUMER_RECORD_STRUCT_NAME = "ConsumerRecord"; public static final String CALLER_STRUCT_NAME = "Caller"; public static final String TYPE_CHECKER_OBJECT_NAME = "TypeChecker"; public static final String SERVER_CONNECTOR = "serverConnector";