Skip to content

Commit

Permalink
ARTEMIS-4185 - Revision on sending already compressed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonRoskvist authored and clebertsuconic committed Jan 22, 2024
1 parent fbfe81e commit f56595b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ private void handleCompressedMessage(final ClientMessageInternal clMessage) thro
qbuff.readBytes(body);
largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
currentLargeMessageController.addPacket(body, body.length, false);
largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);

handleRegularMessage(largeMessage);
}
Expand Down Expand Up @@ -674,6 +675,7 @@ public synchronized void handleLargeMessage(final ClientLargeMessageInternal cli

if (clientLargeMessage.isCompressed()) {
clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
clientLargeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
} else {
clientLargeMessage.setLargeMessageController(currentLargeMessageController);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,6 @@ private void largeMessageSendStreamed(final boolean sendBlocking,
deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
deflaterReader.setLevel(session.getCompressionLevel());
input = deflaterReader;
} else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
//This needs to be false if we do not intend to compress the message
//and the header already exists
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
}

long totalSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.zip.Deflater;

import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
Expand Down Expand Up @@ -518,52 +519,59 @@ public void testLargeMessageCompressionRestartAndCheckSize() throws Exception {
public void testPreviouslyCompressedMessageCleanup() throws Exception {
final int messageSize = 1024 * 1024;

byte[] payload = new byte[messageSize];
byte[] response = new byte[messageSize];

ActiveMQServer server = createServer(true, isNetty());
server.start();

ClientSessionFactory sf1 = createSessionFactory(locator);
ClientSession session1 = addClientSession(sf1.createSession(false, true, true));
session1.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session1.createProducer(ADDRESS);

ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0");
locator2.setCompressLargeMessage(false);
ClientSessionFactory sf2 = locator2.createSessionFactory();
ClientSession session2 = sf2.createSession(false, true, true);
ClientConsumer consumer = session2.createConsumer(ADDRESS);
ClientProducer producer2 = session2.createProducer(ADDRESS);
session2.start();

byte[] payload = new byte[messageSize];
byte[] response = new byte[messageSize];
server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));

for (int i = 0; i < payload.length; i++) {
payload[i] = RandomUtil.randomByte();
}

ClientMessage message = session1.createMessage(true);
message.getBodyBuffer().writeBytes(payload);
producer.send(message);
try (ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(true, true);
ClientProducer producer = session.createProducer(ADDRESS)) {

message = consumer.receive();
assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(payload);
assertNull(message.getAnnotation(Message.HDR_LARGE_COMPRESSED));

message.getBodyBuffer().readBytes(response);
message.getBodyBuffer().writeBytes(response);
producer2.send(message);
producer.send(message);
}

message = consumer.receive();
assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
ServerLocator locator2 = createFactory(isNetty());
locator2.setCompressLargeMessage(false);
locator2.setMinLargeMessageSize(1024);

message.getBodyBuffer().readBytes(payload);
message.getBodySize();
assertTrue(Arrays.equals(payload, response));
try (ClientSessionFactory sf = locator2.createSessionFactory();
ClientSession session = sf.createSession(true, true);
ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientProducer producer = session.createProducer(ADDRESS)) {

ClientMessage message = session.createMessage(true);
ICoreMessage serverMessage = server.locateQueue(ADDRESS).browserIterator().next().getMessage().copy().toCore();

message.moveHeadersAndProperties(serverMessage);
message.getBodyBuffer().writeBytes(serverMessage.getReadOnlyBodyBuffer(), serverMessage.getBodyBufferSize());

assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));

producer.send(message);
session.start();

for (int i = 0; i < 2; i++) {
message = consumer.receive(2000);
assertNotNull(message);

message.getBodyBuffer().readBytes(response);
assertTrue(Arrays.equals(payload, response));
message.acknowledge();
}
}

session1.close();
session2.close();
sf1.close();
locator.close();
sf2.close();
locator2.close();
}

Expand Down

0 comments on commit f56595b

Please sign in to comment.