Skip to content

Commit

Permalink
Added discard() when exception is thrown from RawBatchMessageContaine…
Browse files Browse the repository at this point in the history
…rImpl
  • Loading branch information
heesung-sn committed Nov 7, 2022
1 parent a53b13c commit b4ee527
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ private ByteBuf encrypt(ByteBuf compressedPayload) {
msgCrypto.encrypt(encryptionKeys, cryptoKeyReader, () -> messageMetadata,
compressedPayload.nioBuffer(), targetBuffer);
} catch (PulsarClientException e) {
encryptedPayload.release();
compressedPayload.release();
discard(e);
throw new RuntimeException("Failed to encrypt payload", e);
}
encryptedPayload.writerIndex(targetBuffer.remaining());
Expand Down Expand Up @@ -99,17 +102,23 @@ public ByteBuf toByteBuf() {
EncryptionContext encryptionContext = (EncryptionContext) lastMessage.getEncryptionCtx().get();

if (cryptoKeyReader == null) {
throw new IllegalStateException("Messages are encrypted but no cryptoKeyReader is provided.");
IllegalStateException ex =
new IllegalStateException("Messages are encrypted but no cryptoKeyReader is provided.");
discard(ex);
throw ex;
}

this.encryptionKeys = encryptionContext.getKeys().keySet();
this.msgCrypto =
new MessageCryptoBc(String.format(
"[%s] [%s]", topicName, "RawBatchMessageContainer"), true);
try {
msgCrypto.addPublicKeyCipher(encryptionKeys, cryptoKeyReader);
} catch (PulsarClientException.CryptoException e) {
throw new IllegalArgumentException("Failed to set encryption keys", e);
encryptionKeys = encryptionContext.getKeys().keySet();
if (msgCrypto == null) {
msgCrypto =
new MessageCryptoBc(String.format(
"[%s] [%s]", topicName, "RawBatchMessageContainer"), true);
try {
msgCrypto.addPublicKeyCipher(encryptionKeys, cryptoKeyReader);
} catch (PulsarClientException.CryptoException e) {
discard(e);
throw new IllegalArgumentException("Failed to set encryption keys", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ <T> CompletableFuture<Boolean> addToCompactedLedger(
CompletableFuture<Boolean> bkf = new CompletableFuture<>();
if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
if (batchMessageContainer.getNumMessagesInBatch() > 0) {
ByteBuf serialized = batchMessageContainer.toByteBuf();
try {
ByteBuf serialized = batchMessageContainer.toByteBuf();
outstanding.acquire();
mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
long start = System.nanoTime();
Expand All @@ -419,6 +419,8 @@ <T> CompletableFuture<Boolean> addToCompactedLedger(
}, null);

} catch (Throwable t) {
log.error("Failed to add entry", t);
batchMessageContainer.discard((Exception) t);
return FutureUtil.failedFuture(t);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.client.api.CryptoKeyReader;
Expand Down Expand Up @@ -241,12 +242,42 @@ public void testCreateOpSendMsg() {
container.createOpSendMsg();
}

@Test(expectedExceptions = IllegalStateException.class)
@Test
public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
setEncryptionAndCompression(true, false);
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
container.toByteBuf();
Assert.assertEquals(container.getNumMessagesInBatch(), 1);
Throwable e = null;
try {
container.toByteBuf();
} catch (IllegalStateException ex){
e = ex;
}
Assert.assertEquals(e.getClass(), IllegalStateException.class);
Assert.assertEquals(container.getNumMessagesInBatch(), 0);
Assert.assertEquals(container.batchedMessageMetadataAndPayload, null);
}

@Test
public void testToByteBufWithEncryptionWithInvalidEncryptKeys() {
setEncryptionAndCompression(true, false);
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
container.setCryptoKeyReader(cryptoKeyReader);
encryptKeys = new HashMap<>();
encryptKeys.put(null, null);
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
Assert.assertEquals(container.getNumMessagesInBatch(), 1);
Throwable e = null;
try {
container.toByteBuf();
} catch (IllegalArgumentException ex){
e = ex;
}
Assert.assertEquals(e.getClass(), IllegalArgumentException.class);
Assert.assertEquals(container.getNumMessagesInBatch(), 0);
Assert.assertEquals(container.batchedMessageMetadataAndPayload, null);
}
}

0 comments on commit b4ee527

Please sign in to comment.