diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index 75b56218431..b8a7b865467 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -646,6 +646,7 @@ private void handleCompressedMessage(final ClientMessageInternal clMessage) thro qbuff.readBytes(body); largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController)); currentLargeMessageController.addPacket(body, body.length, false); + largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false); handleRegularMessage(largeMessage); } @@ -674,6 +675,7 @@ public synchronized void handleLargeMessage(final ClientLargeMessageInternal cli if (clientLargeMessage.isCompressed()) { clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController)); + clientLargeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false); } else { clientLargeMessage.setLargeMessageController(currentLargeMessageController); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index c0a238bf2ac..f3b01a7eca7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -443,10 +443,6 @@ private void largeMessageSendStreamed(final boolean sendBlocking, deflaterReader = new DeflaterReader(inputStreamParameter, messageSize); deflaterReader.setLevel(session.getCompressionLevel()); input = deflaterReader; - } else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) { - //This needs to be false if we do not intend to compress the message - //and the header already exists - msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false); } long totalSize = 0; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index 53661c7a8e6..d85ce0a50b8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -29,6 +29,7 @@ import java.util.zip.Deflater; import io.netty.util.internal.PlatformDependent; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -518,52 +519,59 @@ public void testLargeMessageCompressionRestartAndCheckSize() throws Exception { public void testPreviouslyCompressedMessageCleanup() throws Exception { final int messageSize = 1024 * 1024; + byte[] payload = new byte[messageSize]; + byte[] response = new byte[messageSize]; + ActiveMQServer server = createServer(true, isNetty()); server.start(); - ClientSessionFactory sf1 = createSessionFactory(locator); - ClientSession session1 = addClientSession(sf1.createSession(false, true, true)); - session1.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST)); - ClientProducer producer = session1.createProducer(ADDRESS); - - ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0"); - locator2.setCompressLargeMessage(false); - ClientSessionFactory sf2 = locator2.createSessionFactory(); - ClientSession session2 = sf2.createSession(false, true, true); - ClientConsumer consumer = session2.createConsumer(ADDRESS); - ClientProducer producer2 = session2.createProducer(ADDRESS); - session2.start(); - - byte[] payload = new byte[messageSize]; - byte[] response = new byte[messageSize]; + server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST)); for (int i = 0; i < payload.length; i++) { payload[i] = RandomUtil.randomByte(); } - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeBytes(payload); - producer.send(message); + try (ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession(true, true); + ClientProducer producer = session.createProducer(ADDRESS)) { - message = consumer.receive(); - assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)); + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeBytes(payload); + assertNull(message.getAnnotation(Message.HDR_LARGE_COMPRESSED)); - message.getBodyBuffer().readBytes(response); - message.getBodyBuffer().writeBytes(response); - producer2.send(message); + producer.send(message); + } - message = consumer.receive(); - assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)); + ServerLocator locator2 = createFactory(isNetty()); + locator2.setCompressLargeMessage(false); + locator2.setMinLargeMessageSize(1024); - message.getBodyBuffer().readBytes(payload); - message.getBodySize(); - assertTrue(Arrays.equals(payload, response)); + try (ClientSessionFactory sf = locator2.createSessionFactory(); + ClientSession session = sf.createSession(true, true); + ClientConsumer consumer = session.createConsumer(ADDRESS); + ClientProducer producer = session.createProducer(ADDRESS)) { + + ClientMessage message = session.createMessage(true); + ICoreMessage serverMessage = server.locateQueue(ADDRESS).browserIterator().next().getMessage().copy().toCore(); + + message.moveHeadersAndProperties(serverMessage); + message.getBodyBuffer().writeBytes(serverMessage.getReadOnlyBodyBuffer(), serverMessage.getBodyBufferSize()); + + assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)); + + producer.send(message); + session.start(); + + for (int i = 0; i < 2; i++) { + message = consumer.receive(2000); + assertNotNull(message); + + message.getBodyBuffer().readBytes(response); + assertTrue(Arrays.equals(payload, response)); + message.acknowledge(); + } + } - session1.close(); - session2.close(); - sf1.close(); - locator.close(); - sf2.close(); locator2.close(); }