Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support rabbitmq headers #997

Merged
merged 7 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "rabbitmq"
version = "3.1.0"
version = "3.1.1"
authors = ["Ballerina"]
keywords = ["service", "client", "messaging", "network", "pubsub"]
repository = "https://github.com/ballerina-platform/module-ballerinax-rabbitmq"
Expand All @@ -18,8 +18,8 @@ path = "./lib/amqp-client-5.18.0.jar"
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "rabbitmq-native"
version = "3.1.0"
path = "../native/build/libs/rabbitmq-native-3.1.0.jar"
version = "3.1.1"
path = "../native/build/libs/rabbitmq-native-3.1.1-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "rabbitmq-compiler-plugin"
class = "io.ballerina.stdlib.rabbitmq.plugin.RabbitmqCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.1.0.jar"
path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.1.1-SNAPSHOT.jar"
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "rabbitmq"
version = "3.1.0"
version = "3.1.1"
dependencies = [
{org = "ballerina", name = "constraint"},
{org = "ballerina", name = "crypto"},
Expand Down
2 changes: 2 additions & 0 deletions ballerina/rabbitmq_commons.bal
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ public const TOPIC_EXCHANGE = "topic";
# + contentType - The content type of the message
# + contentEncoding - The content encoding of the message
# + correlationId - The client-specific ID that can be used to mark or identify messages between clients
# + headers - A map of additional arbitrary headers to be included in the message
public type BasicProperties record {|
string replyTo?;
string contentType?;
string contentEncoding?;
string correlationId?;
map<anydata> headers?;
|};

# Additional configurations used to declare a queue.
Expand Down
43 changes: 40 additions & 3 deletions ballerina/tests/rabbitmq_client_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,43 @@ public isolated function testProducerTransactional() returns error? {
return;
}

@test:Config {
dependsOn: [testClient],
groups: ["rabbitmq"]
}
public isolated function testProducerWithHeaders() returns error? {
string queue = "testProducerWithHeaders";
string message = "Test producing with headers";
Client newClient = check new (DEFAULT_HOST, DEFAULT_PORT);
map<string> headers = {
"header1": "value1",
"header2": "value2"
};
check newClient->queueDeclare(queue);
check newClient->publishMessage({content: message.toBytes(), routingKey: queue, properties: {headers: headers}});
BytesMessage|Error consumeResult = newClient->consumeMessage(queue, false);
if consumeResult is BytesMessage {
string messageContent = check 'string:fromBytes(consumeResult.content);
BasicProperties? properties = consumeResult.properties;

if properties is BasicProperties {
map<anydata>? receivedHeaders = properties.headers;
if receivedHeaders is () {
test:assertFail("No headers received.");
} else {
test:assertEquals(receivedHeaders["header1"], "value1", msg = "Header1 mismatch.");
test:assertEquals(receivedHeaders["header2"], "value2", msg = "Header2 mismatch.");
}
}
log:printInfo("The message received: " + messageContent);
test:assertEquals(messageContent, message, msg = "Message received does not match.");
} else {
test:assertFail("Error when trying to consume messages using client.");
}
check newClient->close();
return;
}

@test:Config {
dependsOn: [testClient],
groups: ["rabbitmq"]
Expand Down Expand Up @@ -418,7 +455,7 @@ public isolated function testDeclareQueueWithArgsNegative() returns error? {
string expectedError = "Error occurred while declaring the queue: Unsupported type in arguments map passed "
+ "while declaring a queue.";
test:assertEquals(result.message(), expectedError,
msg = "Error message mismatch in declaring queue with invalid args.");
msg = "Error message mismatch in declaring queue with invalid args.");
}
return newClient->close();
}
Expand Down Expand Up @@ -594,11 +631,11 @@ public function testListenerQueueDeclareDuplicate() returns error? {
}

@test:Config {
dependsOn: [testListener, testSyncConsumer,testListenerQueueDeclareDuplicate],
dependsOn: [testListener, testSyncConsumer, testListenerQueueDeclareDuplicate],
groups: ["rabbitmq"]
}
public function testListenerQueueDeclareDuplicateError() returns error? {
Listener channelListener = check new(DEFAULT_HOST, DEFAULT_PORT);
Listener channelListener = check new (DEFAULT_HOST, DEFAULT_PORT);
if channelListener is Listener {
error? result = channelListener.attach(queueConfigDuplicateError);
test:assertTrue(result is error, msg = "Error expected when declaring same queue with different properties.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class PluginConstants {
public static final String MESSAGE_CONTENT_TYPE = "contentType";
public static final String MESSAGE_CONTENT_ENCODING = "contentEncoding";
public static final String MESSAGE_CORRELATION_ID = "correlationId";
public static final String MESSAGE_HEADERS = "headers";

// return types error or nil
public static final String ERROR = "error";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_CORRELATION_ID;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_DELIVERY_TAG;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_EXCHANGE;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_HEADERS;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_PROPERTIES;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_REPLY_TO;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_ROUTING_KEY;
Expand Down Expand Up @@ -403,10 +404,11 @@ private boolean validatePropertiesField(TypeSymbol propertiesTypeSymbol) {
propertiesRecordSymbol = (RecordTypeSymbol) propertiesTypeSymbol;
}
Map<String, RecordFieldSymbol> propertiesFieldDescriptors = propertiesRecordSymbol.fieldDescriptors();
if (propertiesFieldDescriptors.size() != 4 || !propertiesFieldDescriptors.containsKey(MESSAGE_REPLY_TO) ||
if (propertiesFieldDescriptors.size() != 5 || !propertiesFieldDescriptors.containsKey(MESSAGE_REPLY_TO) ||
!propertiesFieldDescriptors.containsKey(MESSAGE_CONTENT_TYPE) ||
!propertiesFieldDescriptors.containsKey(MESSAGE_CONTENT_ENCODING) ||
!propertiesFieldDescriptors.containsKey(MESSAGE_CORRELATION_ID)) {
!propertiesFieldDescriptors.containsKey(MESSAGE_CORRELATION_ID) ||
!propertiesFieldDescriptors.containsKey(MESSAGE_HEADERS)) {
return false;
}
if (propertiesFieldDescriptors.get(MESSAGE_REPLY_TO).typeDescriptor().typeKind() != STRING) {
Expand All @@ -421,6 +423,9 @@ private boolean validatePropertiesField(TypeSymbol propertiesTypeSymbol) {
if (propertiesFieldDescriptors.get(MESSAGE_CORRELATION_ID).typeDescriptor().typeKind() != STRING) {
return false;
}
if (propertiesFieldDescriptors.get(MESSAGE_HEADERS).typeDescriptor().typeKind() != MAP) {
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public class RabbitMQConstants {
public static final BString ALIAS_CONTENT_TYPE = StringUtils.fromString("contentType");
public static final BString ALIAS_CONTENT_ENCODING = StringUtils.fromString("contentEncoding");
public static final BString ALIAS_CORRELATION_ID = StringUtils.fromString("correlationId");
public static final BString ALIAS_HEADERS = StringUtils.fromString("headers");

private RabbitMQConstants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Map;

import static io.ballerina.runtime.api.TypeTags.INTERSECTION_TAG;
import static io.ballerina.runtime.api.TypeTags.STRING_TAG;
Expand Down Expand Up @@ -138,14 +139,24 @@ public static BMap<BString, Object> createAndPopulateMessageRecord(byte[] messag
String contentType = properties.getContentType();
String contentEncoding = properties.getContentEncoding();
String correlationId = properties.getCorrelationId();
Map<String, Object> headersMap = properties.getHeaders();
BMap<BString, Object> headers = ValueCreator.createMapValue();

if (headersMap != null) {
headersMap.forEach((key, value) -> headers.put(StringUtils.fromString(key), StringUtils.fromString(
value.toString())));
}

BMap<BString, Object> basicProperties =
ValueCreator.createRecordValue(getModule(),
RabbitMQConstants.RECORD_BASIC_PROPERTIES);
Object[] propValues = new Object[4];
Object[] propValues = new Object[5];
propValues[0] = replyTo;
propValues[1] = contentType;
propValues[2] = contentEncoding;
propValues[3] = correlationId;
propValues[4] = headers;

messageRecord.put(StringUtils.fromString(MESSAGE_PROPERTIES_FIELD), ValueCreator
.createRecordValue(basicProperties, propValues));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ public static Object publishNative(Environment environment, BObject channelObj,
String contentType = null;
String contentEncoding = null;
String correlationId = null;
Map<String, Object> headers = new HashMap<>();
if (basicPropsMap.containsKey(RabbitMQConstants.ALIAS_REPLY_TO)) {
replyTo = basicPropsMap.getStringValue(RabbitMQConstants.ALIAS_REPLY_TO).getValue();
}
Expand All @@ -295,6 +296,14 @@ public static Object publishNative(Environment environment, BObject channelObj,
if (basicPropsMap.containsKey(RabbitMQConstants.ALIAS_CORRELATION_ID)) {
correlationId = basicPropsMap.getStringValue(RabbitMQConstants.ALIAS_CORRELATION_ID).getValue();
}
if (basicPropsMap.containsKey(RabbitMQConstants.ALIAS_HEADERS)) {
@SuppressWarnings(RabbitMQConstants.UNCHECKED)
BMap<BString, BString> headersMap = (BMap<BString, BString>) basicPropsMap
.getMapValue(RabbitMQConstants.ALIAS_HEADERS);
headersMap.entrySet()
.forEach(entry -> headers.put(entry.getKey().getValue(),
headersMap.getStringValue(entry.getKey()).getValue()));
}
if (replyTo != null) {
builder.replyTo(replyTo);
}
Expand All @@ -307,6 +316,9 @@ public static Object publishNative(Environment environment, BObject channelObj,
if (correlationId != null) {
builder.correlationId(correlationId);
}
if (!headers.isEmpty()) {
builder.headers(headers);
}
}
if (TransactionResourceManager.getInstance().isInTransaction()) {
RabbitMQUtils.handleTransaction(channelObj);
Expand Down
Loading