diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java index 5b4e281deb88be..43fa7d6a89aa30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java @@ -67,6 +67,9 @@ private ByteBuf encrypt(ByteBuf compressedPayload) { msgCrypto.encrypt(encryptionKeys, cryptoKeyReader, () -> messageMetadata, compressedPayload.nioBuffer(), targetBuffer); } catch (PulsarClientException e) { + encryptedPayload.release(); + compressedPayload.release(); + discard(e); throw new RuntimeException("Failed to encrypt payload", e); } encryptedPayload.writerIndex(targetBuffer.remaining()); @@ -99,17 +102,23 @@ public ByteBuf toByteBuf() { EncryptionContext encryptionContext = (EncryptionContext) lastMessage.getEncryptionCtx().get(); if (cryptoKeyReader == null) { - throw new IllegalStateException("Messages are encrypted but no cryptoKeyReader is provided."); + IllegalStateException ex = + new IllegalStateException("Messages are encrypted but no cryptoKeyReader is provided."); + discard(ex); + throw ex; } - this.encryptionKeys = encryptionContext.getKeys().keySet(); - this.msgCrypto = - new MessageCryptoBc(String.format( - "[%s] [%s]", topicName, "RawBatchMessageContainer"), true); - try { - msgCrypto.addPublicKeyCipher(encryptionKeys, cryptoKeyReader); - } catch (PulsarClientException.CryptoException e) { - throw new IllegalArgumentException("Failed to set encryption keys", e); + encryptionKeys = encryptionContext.getKeys().keySet(); + if (msgCrypto == null) { + msgCrypto = + new MessageCryptoBc(String.format( + "[%s] [%s]", topicName, "RawBatchMessageContainer"), true); + try { + msgCrypto.addPublicKeyCipher(encryptionKeys, cryptoKeyReader); + } catch (PulsarClientException.CryptoException e) { + discard(e); + throw new IllegalArgumentException("Failed to set encryption keys", e); + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java index 624998ea72ce89..92c229e6af2055 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java @@ -402,8 +402,8 @@ CompletableFuture addToCompactedLedger( CompletableFuture bkf = new CompletableFuture<>(); if (m == null || batchMessageContainer.add((MessageImpl) m, null)) { if (batchMessageContainer.getNumMessagesInBatch() > 0) { - ByteBuf serialized = batchMessageContainer.toByteBuf(); try { + ByteBuf serialized = batchMessageContainer.toByteBuf(); outstanding.acquire(); mxBean.addCompactionWriteOp(topic, serialized.readableBytes()); long start = System.nanoTime(); @@ -419,6 +419,8 @@ CompletableFuture addToCompactedLedger( }, null); } catch (Throwable t) { + log.error("Failed to add entry", t); + batchMessageContainer.discard((Exception) t); return FutureUtil.failedFuture(t); } } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java index 4e216c7356117e..9fa834a166cda1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -241,12 +242,42 @@ public void testCreateOpSendMsg() { container.createOpSendMsg(); } - @Test(expectedExceptions = IllegalStateException.class) + @Test public void testToByteBufWithEncryptionWithoutCryptoKeyReader() { setEncryptionAndCompression(true, false); RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1); String topic = "my-topic"; container.add(createMessage(topic, "hi-1", 0), null); - container.toByteBuf(); + Assert.assertEquals(container.getNumMessagesInBatch(), 1); + Throwable e = null; + try { + container.toByteBuf(); + } catch (IllegalStateException ex){ + e = ex; + } + Assert.assertEquals(e.getClass(), IllegalStateException.class); + Assert.assertEquals(container.getNumMessagesInBatch(), 0); + Assert.assertEquals(container.batchedMessageMetadataAndPayload, null); + } + + @Test + public void testToByteBufWithEncryptionWithInvalidEncryptKeys() { + setEncryptionAndCompression(true, false); + RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1); + container.setCryptoKeyReader(cryptoKeyReader); + encryptKeys = new HashMap<>(); + encryptKeys.put(null, null); + String topic = "my-topic"; + container.add(createMessage(topic, "hi-1", 0), null); + Assert.assertEquals(container.getNumMessagesInBatch(), 1); + Throwable e = null; + try { + container.toByteBuf(); + } catch (IllegalArgumentException ex){ + e = ex; + } + Assert.assertEquals(e.getClass(), IllegalArgumentException.class); + Assert.assertEquals(container.getNumMessagesInBatch(), 0); + Assert.assertEquals(container.batchedMessageMetadataAndPayload, null); } }