From 1918dc0f43f13bdcbd7960784242ff4f0d4466ad Mon Sep 17 00:00:00 2001 From: DaAlbrecht Date: Thu, 26 Sep 2024 00:35:38 +0200 Subject: [PATCH 1/5] feat: support rabbitmq headers --- ballerina/rabbitmq_commons.bal | 2 ++ .../ballerina/stdlib/rabbitmq/RabbitMQConstants.java | 1 + .../ballerina/stdlib/rabbitmq/util/ChannelUtils.java | 12 ++++++++++++ 3 files changed, 15 insertions(+) diff --git a/ballerina/rabbitmq_commons.bal b/ballerina/rabbitmq_commons.bal index dba58cb2..c8f80f5d 100644 --- a/ballerina/rabbitmq_commons.bal +++ b/ballerina/rabbitmq_commons.bal @@ -43,11 +43,13 @@ public const TOPIC_EXCHANGE = "topic"; # + contentType - The content type of the message # + contentEncoding - The content encoding of the message # + correlationId - The client-specific ID that can be used to mark or identify messages between clients +# + headers - A map of additional arbitrary headers to be included in the message. public type BasicProperties record {| string replyTo?; string contentType?; string contentEncoding?; string correlationId?; + map headers?; |}; # Additional configurations used to declare a queue. diff --git a/native/src/main/java/io/ballerina/stdlib/rabbitmq/RabbitMQConstants.java b/native/src/main/java/io/ballerina/stdlib/rabbitmq/RabbitMQConstants.java index f31c4056..568e1213 100644 --- a/native/src/main/java/io/ballerina/stdlib/rabbitmq/RabbitMQConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/rabbitmq/RabbitMQConstants.java @@ -144,6 +144,7 @@ public class RabbitMQConstants { public static final BString ALIAS_CONTENT_TYPE = StringUtils.fromString("contentType"); public static final BString ALIAS_CONTENT_ENCODING = StringUtils.fromString("contentEncoding"); public static final BString ALIAS_CORRELATION_ID = StringUtils.fromString("correlationId"); + public static final BString ALIAS_HEADERS = StringUtils.fromString("headers"); private RabbitMQConstants() { } diff --git a/native/src/main/java/io/ballerina/stdlib/rabbitmq/util/ChannelUtils.java b/native/src/main/java/io/ballerina/stdlib/rabbitmq/util/ChannelUtils.java index 60e11b4a..f6fd0f11 100644 --- a/native/src/main/java/io/ballerina/stdlib/rabbitmq/util/ChannelUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/rabbitmq/util/ChannelUtils.java @@ -282,6 +282,7 @@ public static Object publishNative(Environment environment, BObject channelObj, String contentType = null; String contentEncoding = null; String correlationId = null; + Map headers = new HashMap<>(); if (basicPropsMap.containsKey(RabbitMQConstants.ALIAS_REPLY_TO)) { replyTo = basicPropsMap.getStringValue(RabbitMQConstants.ALIAS_REPLY_TO).getValue(); } @@ -295,6 +296,14 @@ public static Object publishNative(Environment environment, BObject channelObj, if (basicPropsMap.containsKey(RabbitMQConstants.ALIAS_CORRELATION_ID)) { correlationId = basicPropsMap.getStringValue(RabbitMQConstants.ALIAS_CORRELATION_ID).getValue(); } + if (basicPropsMap.containsKey(RabbitMQConstants.ALIAS_HEADERS)) { + @SuppressWarnings(RabbitMQConstants.UNCHECKED) + BMap headersMap = (BMap) basicPropsMap + .getMapValue(RabbitMQConstants.ALIAS_HEADERS); + headersMap.entrySet() + .forEach(entry -> headers.put(entry.getKey().getValue(), + headersMap.getStringValue(entry.getKey()).getValue())); + } if (replyTo != null) { builder.replyTo(replyTo); } @@ -307,6 +316,9 @@ public static Object publishNative(Environment environment, BObject channelObj, if (correlationId != null) { builder.correlationId(correlationId); } + if (!headers.isEmpty()) { + builder.headers(headers); + } } if (TransactionResourceManager.getInstance().isInTransaction()) { RabbitMQUtils.handleTransaction(channelObj); From ca4f02bebfb4a7b3ad44da85e8d97f6c34948909 Mon Sep 17 00:00:00 2001 From: DaAlbrecht Date: Fri, 27 Sep 2024 01:12:52 +0200 Subject: [PATCH 2/5] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 6 +++--- ballerina/CompilerPlugin.toml | 2 +- ballerina/Dependencies.toml | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 0074f4e5..9a70295a 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerinax" name = "rabbitmq" -version = "3.1.0" +version = "3.1.1" authors = ["Ballerina"] keywords = ["service", "client", "messaging", "network", "pubsub"] repository = "https://github.com/ballerina-platform/module-ballerinax-rabbitmq" @@ -18,8 +18,8 @@ path = "./lib/amqp-client-5.18.0.jar" [[platform.java17.dependency]] groupId = "io.ballerina.stdlib" artifactId = "rabbitmq-native" -version = "3.1.0" -path = "../native/build/libs/rabbitmq-native-3.1.0.jar" +version = "3.1.1" +path = "../native/build/libs/rabbitmq-native-3.1.1-SNAPSHOT.jar" [[platform.java17.dependency]] groupId = "io.ballerina.stdlib" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index cc653dda..f058e27c 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "rabbitmq-compiler-plugin" class = "io.ballerina.stdlib.rabbitmq.plugin.RabbitmqCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.1.0.jar" +path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.1.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 0958eea6..7f49c584 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -389,7 +389,7 @@ modules = [ [[package]] org = "ballerinax" name = "rabbitmq" -version = "3.1.0" +version = "3.1.1" dependencies = [ {org = "ballerina", name = "constraint"}, {org = "ballerina", name = "crypto"}, From a6ab6b85c04cd02cbdb8c9ba701b8d20370c6d90 Mon Sep 17 00:00:00 2001 From: DaAlbrecht Date: Fri, 27 Sep 2024 02:07:51 +0200 Subject: [PATCH 3/5] feat: consume messages with headers, chore: add unit test --- ballerina/tests/rabbitmq_client_tests.bal | 43 +++++++++++++++++-- .../stdlib/rabbitmq/RabbitMQUtils.java | 13 +++++- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/ballerina/tests/rabbitmq_client_tests.bal b/ballerina/tests/rabbitmq_client_tests.bal index 8d7e3ac3..4ef45a7d 100644 --- a/ballerina/tests/rabbitmq_client_tests.bal +++ b/ballerina/tests/rabbitmq_client_tests.bal @@ -379,6 +379,43 @@ public isolated function testProducerTransactional() returns error? { return; } +@test:Config { + dependsOn: [testClient], + groups: ["rabbitmq"] +} +public isolated function testProducerWithHeaders() returns error? { + string queue = "testProducerWithHeaders"; + string message = "Test producing with headers"; + Client newClient = check new (DEFAULT_HOST, DEFAULT_PORT); + map headers = { + "header1": "value1", + "header2": "value2" + }; + check newClient->queueDeclare(queue); + check newClient->publishMessage({content: message.toBytes(), routingKey: queue, properties: {headers: headers}}); + BytesMessage|Error consumeResult = newClient->consumeMessage(queue, false); + if consumeResult is BytesMessage { + string messageContent = check 'string:fromBytes(consumeResult.content); + BasicProperties? properties = consumeResult.properties; + + if properties is BasicProperties { + map? receivedHeaders = properties.headers; + if receivedHeaders is () { + test:assertFail("No headers received."); + } else { + test:assertEquals(receivedHeaders["header1"], "value1", msg = "Header1 mismatch."); + test:assertEquals(receivedHeaders["header2"], "value2", msg = "Header2 mismatch."); + } + } + log:printInfo("The message received: " + messageContent); + test:assertEquals(messageContent, message, msg = "Message received does not match."); + } else { + test:assertFail("Error when trying to consume messages using client."); + } + check newClient->close(); + return; +} + @test:Config { dependsOn: [testClient], groups: ["rabbitmq"] @@ -418,7 +455,7 @@ public isolated function testDeclareQueueWithArgsNegative() returns error? { string expectedError = "Error occurred while declaring the queue: Unsupported type in arguments map passed " + "while declaring a queue."; test:assertEquals(result.message(), expectedError, - msg = "Error message mismatch in declaring queue with invalid args."); + msg = "Error message mismatch in declaring queue with invalid args."); } return newClient->close(); } @@ -594,11 +631,11 @@ public function testListenerQueueDeclareDuplicate() returns error? { } @test:Config { - dependsOn: [testListener, testSyncConsumer,testListenerQueueDeclareDuplicate], + dependsOn: [testListener, testSyncConsumer, testListenerQueueDeclareDuplicate], groups: ["rabbitmq"] } public function testListenerQueueDeclareDuplicateError() returns error? { - Listener channelListener = check new(DEFAULT_HOST, DEFAULT_PORT); + Listener channelListener = check new (DEFAULT_HOST, DEFAULT_PORT); if channelListener is Listener { error? result = channelListener.attach(queueConfigDuplicateError); test:assertTrue(result is error, msg = "Error expected when declaring same queue with different properties."); diff --git a/native/src/main/java/io/ballerina/stdlib/rabbitmq/RabbitMQUtils.java b/native/src/main/java/io/ballerina/stdlib/rabbitmq/RabbitMQUtils.java index 772a244c..97c408b0 100644 --- a/native/src/main/java/io/ballerina/stdlib/rabbitmq/RabbitMQUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/rabbitmq/RabbitMQUtils.java @@ -43,6 +43,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Map; import static io.ballerina.runtime.api.TypeTags.INTERSECTION_TAG; import static io.ballerina.runtime.api.TypeTags.STRING_TAG; @@ -138,14 +139,24 @@ public static BMap createAndPopulateMessageRecord(byte[] messag String contentType = properties.getContentType(); String contentEncoding = properties.getContentEncoding(); String correlationId = properties.getCorrelationId(); + Map headersMap = properties.getHeaders(); + BMap headers = ValueCreator.createMapValue(); + + if (headersMap != null) { + headersMap.forEach((key, value) -> headers.put(StringUtils.fromString(key), StringUtils.fromString( + value.toString()))); + } + BMap basicProperties = ValueCreator.createRecordValue(getModule(), RabbitMQConstants.RECORD_BASIC_PROPERTIES); - Object[] propValues = new Object[4]; + Object[] propValues = new Object[5]; propValues[0] = replyTo; propValues[1] = contentType; propValues[2] = contentEncoding; propValues[3] = correlationId; + propValues[4] = headers; + messageRecord.put(StringUtils.fromString(MESSAGE_PROPERTIES_FIELD), ValueCreator .createRecordValue(basicProperties, propValues)); } From 263d42a3b6720adc567c312b1a0b156fb050199b Mon Sep 17 00:00:00 2001 From: aashikam Date: Fri, 27 Sep 2024 21:56:18 +0530 Subject: [PATCH 4/5] Fix compiler plugin test failure --- .../stdlib/rabbitmq/plugin/PluginConstants.java | 1 + .../rabbitmq/plugin/RabbitmqFunctionValidator.java | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/rabbitmq/plugin/PluginConstants.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/rabbitmq/plugin/PluginConstants.java index 380ca20c..0b63900d 100644 --- a/compiler-plugin/src/main/java/io/ballerina/stdlib/rabbitmq/plugin/PluginConstants.java +++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/rabbitmq/plugin/PluginConstants.java @@ -44,6 +44,7 @@ public class PluginConstants { public static final String MESSAGE_CONTENT_TYPE = "contentType"; public static final String MESSAGE_CONTENT_ENCODING = "contentEncoding"; public static final String MESSAGE_CORRELATION_ID = "correlationId"; + public static final String MESSAGE_HEADERS = "headers"; // return types error or nil public static final String ERROR = "error"; diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/rabbitmq/plugin/RabbitmqFunctionValidator.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/rabbitmq/plugin/RabbitmqFunctionValidator.java index 7cd87a98..577a158a 100644 --- a/compiler-plugin/src/main/java/io/ballerina/stdlib/rabbitmq/plugin/RabbitmqFunctionValidator.java +++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/rabbitmq/plugin/RabbitmqFunctionValidator.java @@ -107,6 +107,7 @@ import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_CORRELATION_ID; import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_DELIVERY_TAG; import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_EXCHANGE; +import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_HEADERS; import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_PROPERTIES; import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_REPLY_TO; import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_ROUTING_KEY; @@ -403,10 +404,11 @@ private boolean validatePropertiesField(TypeSymbol propertiesTypeSymbol) { propertiesRecordSymbol = (RecordTypeSymbol) propertiesTypeSymbol; } Map propertiesFieldDescriptors = propertiesRecordSymbol.fieldDescriptors(); - if (propertiesFieldDescriptors.size() != 4 || !propertiesFieldDescriptors.containsKey(MESSAGE_REPLY_TO) || + if (propertiesFieldDescriptors.size() != 5 || !propertiesFieldDescriptors.containsKey(MESSAGE_REPLY_TO) || !propertiesFieldDescriptors.containsKey(MESSAGE_CONTENT_TYPE) || !propertiesFieldDescriptors.containsKey(MESSAGE_CONTENT_ENCODING) || - !propertiesFieldDescriptors.containsKey(MESSAGE_CORRELATION_ID)) { + !propertiesFieldDescriptors.containsKey(MESSAGE_CORRELATION_ID) || + !propertiesFieldDescriptors.containsKey(MESSAGE_HEADERS)) { return false; } if (propertiesFieldDescriptors.get(MESSAGE_REPLY_TO).typeDescriptor().typeKind() != STRING) { @@ -421,6 +423,9 @@ private boolean validatePropertiesField(TypeSymbol propertiesTypeSymbol) { if (propertiesFieldDescriptors.get(MESSAGE_CORRELATION_ID).typeDescriptor().typeKind() != STRING) { return false; } + if (propertiesFieldDescriptors.get(MESSAGE_HEADERS).typeDescriptor().typeKind() != MAP) { + return false; + } return true; } From bc171605874238635d4cd2d5632df7aededa5ccd Mon Sep 17 00:00:00 2001 From: DAA <42379074+DaAlbrecht@users.noreply.github.com> Date: Fri, 27 Sep 2024 20:11:57 +0200 Subject: [PATCH 5/5] Update ballerina/rabbitmq_commons.bal Co-authored-by: Arshika Mohottige --- ballerina/rabbitmq_commons.bal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballerina/rabbitmq_commons.bal b/ballerina/rabbitmq_commons.bal index c8f80f5d..6dda1ec5 100644 --- a/ballerina/rabbitmq_commons.bal +++ b/ballerina/rabbitmq_commons.bal @@ -43,7 +43,7 @@ public const TOPIC_EXCHANGE = "topic"; # + contentType - The content type of the message # + contentEncoding - The content encoding of the message # + correlationId - The client-specific ID that can be used to mark or identify messages between clients -# + headers - A map of additional arbitrary headers to be included in the message. +# + headers - A map of additional arbitrary headers to be included in the message public type BasicProperties record {| string replyTo?; string contentType?;