-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
End to End Encryption Support - Java client #731
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just did a very brief initial pass. Will look it thoroughly later today.
Have you looked at other existing crypto libraries, such as https://github.com/google/tink that support both C++ and Java and also have abstractions and hooks for key management?
@@ -34,6 +34,11 @@ message KeyValue { | |||
required string value = 2; | |||
} | |||
|
|||
message KeyByteValue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
string
and bytes
types are effectively identical. We could use the KeyValue
message type here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. One of the recommendation is to use byte array when keys are stored. Since it's encrypted key, we should be ok with string. Will change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, in both cases you get the ByteString
which is essentially a byte array
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Converting the encrypted key byte to String require it be encoded(e.g: Base64). For an encrypted key of size 256, the encoded value is 344 bytes long. Since this is sent in every message, it will unnecessarily increase the size. Unless there is a better way to convert byte array to string and back, I would prefer to use bytes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need to used base64. In protobuf wire protocol, bytes
and string
are exactly the same type and they're also exposed in the same way in the Java API through ByteString
class. ByteString
can hold both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, ByteString can hold both. However, in our proto, the the "value" in KeyValue is defined as string, which can't hold ByteString.
In this use case, the result of encrypted key is a byte array which can be easily converted to ByteString without additional bytes. However, converting to String would require additional bytes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, just checked with Andrews and the type is different in Java (string
--> String
and bytes
--> ByteString
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Matteo. Let us keep the KeyByteValue since it's efficient.
@@ -60,6 +65,12 @@ message MessageMetadata { | |||
//optional sfixed64 checksum = 10; | |||
// differentiate single and batch message metadata | |||
optional int32 num_messages_in_batch = 11 [default = 1]; | |||
// Comman separated encryption key name(s) used to encrypt data key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If encryption_keys
is effectively a map, the comment should no mention "comma-separated"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed to update the comment. Will update it.
@@ -24,6 +24,8 @@ | |||
import java.io.Serializable; | |||
import java.util.concurrent.TimeUnit; | |||
|
|||
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem to be used
@@ -110,6 +121,7 @@ enum ProtocolVersion { | |||
v8 = 8; // Added CommandConsumerStats - Client fetches broker side consumer stats | |||
v9 = 9; // Added end of topic notification | |||
v10 = 10;// Added proxy to broker | |||
v11 = 11;// Added encryption_key metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need to bump up the protocol version if only adding new fields. Unknown fields are anyway ignored.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
ClientCnx currentCnx) { | ||
|
||
if (msgMetadata.getEncryptionKeysCount() == 0) { | ||
return payload.retain(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason to retain()
payload in this method? because I don't see it released for this retain()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems unnecessary as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to put it back since encryptedPayload is released explicitly.
if (conf.getCryptoKeyReader() == null) { | ||
|
||
if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME) { | ||
log.warn("CryptoKeyReader interface is not implemented. Consuming encrypted message."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also log subscription
and consumerName
in msg?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
|
||
// Create msgCrypto if not created already | ||
if (this.msgCrypto == null) { | ||
this.msgCrypto = new MessageCrypto(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
though this method will not be called concurrently but as this method is not thread-safe, should we initialize msgCrypto
in constructor if conf.getCryptoKeyReader()
present.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is to create it on demand only when the incoming message has a key. But I agree, it's cleaner to do it in constructor.
} | ||
|
||
log.error("[{}][{}] Failed to decrypt message {}", topic, subscription, messageId); | ||
if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as this logic is duplicate, so should we create a method with this logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slightly different message printed. Since it's just 6 lines, I would leave them.
|
||
} | ||
|
||
private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageCrypto.class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Fixed.
} | ||
|
||
optional ValidationError validation_error = 4; | ||
optional ValidationError validation_error = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have to change field value here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it supposed to be set to a value outside the ValidationError enum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK - It is the field number, not the value - so needs to remain 4.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would we send BatchDeSerializeError if consumer received invalid message?
|
||
} catch (NoSuchAlgorithmException | NoSuchProviderException | NoSuchPaddingException e) { | ||
|
||
cipher = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we may not need it as cipher
is anyway null here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KeyGenerator.getInstance() exception is also handled here. At that point, cipher is already initialized.
try { | ||
addPublicKeyCipher(keyName, keyReader); | ||
} catch (CryptoException e) { | ||
throw new RuntimeException(e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should throw Checked exception else, caller of this method ProducerImpl
is not catching the exception and couldn't complete the future. So, client will keep waiting of the producer-create future.??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Fixed.
private boolean decryptDataKey(String keyName, byte[] encryptedDataKey, CryptoKeyReader keyReader) { | ||
|
||
// Prefix "private-key." into key name and read the key value using callback | ||
String privKeyName = "private-key." + keyName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any reason of adding prefix? do we have to document this convention so, CryptoKeyReader
can implement it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we need it to differentiate the public/private keys. I will update the doc.
ecParam = ECNamedCurveTable.getByOID(ecOID); | ||
if (ecParam == null) { | ||
throw new PEMException("Unable to find EC Parameter for the given curve oid: " | ||
+ ((ASN1ObjectIdentifier) pemObj).getId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can avoid duplicate casting ecOID.getId()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, will fix it.
try (PEMParser pemReader = new PEMParser(keyReader)) { | ||
X9ECParameters ecParam = null; | ||
|
||
Object pemObj = pemReader.readObject(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PEMParser. readObject()
can return null
so, should we handle null
value ?
if (keyName == null) { | ||
return false; | ||
} | ||
encryptedDataKey.remove(keyName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return encryptedDataKey.remove(keyName) == null
?
throws PulsarClientException { | ||
|
||
if (encKeys.isEmpty()) { | ||
return payload.retain(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any reason of retaining the payload here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not necessary.
try { | ||
encryptedPayload = msgCrypto.encrypt(conf.getEncryptionKeys(), msgMetadata, compressedPayload); | ||
} catch (PulsarClientException e) { | ||
// TODO: Provide option to fail the request vs. proceed with warning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we log here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added warning msg.
// Convert key from byte to PivateKey | ||
PrivateKey privateKey; | ||
try { | ||
privateKey = loadPrivateKey(keyValue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we cache this privateKey for a given key-name. Because, for a given key-name, value of the key will not be changed. Else we will end up calling loadPrivateKey
for alternate message if we have multiple producers (because each producer has different dataKey which will fail at line: 479 and it forces to recalculate dataKey
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could cache it. However, the data key is decrypted at the beginning and when the key changes, which is for every 100k/1m messages or every 4 hours(yet to add this logic).
Another reason is to be able to handle cases where the key value itself is changed at run time. If we cache it, we also have to store the key and verify if it's same.
Lastly keeping memory footprint of private key is a bad idea since memory dump could have its trace and could be a security issue.
if (dataKey != null) { | ||
ByteBuf decryptedData = decryptData(msgMetadata, payload); | ||
// If decryption succeeded, data is non null | ||
if (decryptedData != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we keep cache for dataKey
?
eg. if we have two producers and each has different dataKey then here on every msg coming from different producer will not be able to decrypt data and requires to derive dataKey again.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Implemented cache with time based expiry based on last access.
b2f6f83
to
d1faece
Compare
retest this please |
@merlimat @rdhabalia Do you have any more comments? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gave it one look - seems good so far
Will go through MessageCrypto.java again before approving.
FAIL, // This is the default option to fail consume until crypto succeeds | ||
DISCARD, // Message is silently acknowledged and not delivered to the application | ||
CONSUME // Deliver the encrypted message to the application. It's the application's | ||
// responsibility to decrypt the message. If message is also compressed, decompression will fail |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Want to mention what will happen in case of batch messages??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. It's already explained in the design.
* Returns true if encryption keys are added | ||
* | ||
*/ | ||
public boolean isEncryptionEnabled() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this method is required in Configuration class because usually we just have setters and getters here.
return decryptedData; | ||
} | ||
|
||
log.error("[{}][{}] Failed to decrypt message {}", topic, subscription, messageId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant logging-
Don't require this line if you add messageId in the log messages below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
* If batch messaging is enabled, the batched message is encrypted. | ||
* | ||
*/ | ||
public void addEncryptionKey(String key) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to mention that this option is not to be used if batch messaging is enables and add appropriate asserts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch message will work as long as proper keys are used. It's a problem only for cases with decryption failure, which is not very different from non batch message failure.
} | ||
|
||
optional ValidationError validation_error = 4; | ||
optional ValidationError validation_error = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK - It is the field number, not the value - so needs to remain 4.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - few minor suggestions
import java.security.spec.InvalidKeySpecException; | ||
import java.util.List; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executors; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few imports not needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
|
||
Reader keyReader = new StringReader(new String(keyBytes)); | ||
PublicKey publicKey = null; | ||
try (org.bouncycastle.openssl.PEMParser pemReader = new org.bouncycastle.openssl.PEMParser(keyReader)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason for full class names????
pemObj = pemReader.readObject(); | ||
} | ||
|
||
if (pemObj instanceof org.bouncycastle.cert.X509CertificateHolder) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason for full class names????
ECPublicKeySpec keySpec = new ECPublicKeySpec(((BCECPublicKey) publicKey).getQ(), ecSpec); | ||
publicKey = (PublicKey) keyFactory.generatePublic(keySpec); | ||
} | ||
} catch (IOException | NoSuchAlgorithmException | NoSuchProviderException | InvalidKeySpecException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not let this exception pass through - why to create a new one??
*/ | ||
package org.apache.pulsar.client.api; | ||
|
||
public interface CryptoKeyReader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should mention that in case of Consumer the input will be "private-key." + keyName
and "public-key." + keyName
in case of producer
Because otherwise, the user will confuse it for the key name it gives in ProducerCong.addEncryptionKey
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I have copied it from the doc here.
|
||
// Update message metadata with encrypted data key | ||
encKeys.forEach(keyName -> { | ||
msgMetadata.addEncryptionKeys(KeyByteValue.newBuilder().setKey(keyName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be add assertion to ensure the key name is in the encryptedDataKeyMap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
KeyByteValue keyByteValue = encKeys.stream().filter(kbv -> { | ||
|
||
byte[] encDataKey = kbv.getValue().toByteArray(); | ||
if (!decryptDataKey(kbv.getKey(), encDataKey, keyReader)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return decryptDataKey(kbv.getKey(), encDataKey, keyReader);
retest this please |
a73aae5
to
72a9c18
Compare
retest this please |
this.msgCrypto = new MessageCrypto(true); | ||
|
||
// Regenerate data key cipher at fixed interval | ||
keyGenExecutor = Executors.newSingleThreadScheduledExecutor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems we are not closing this executor.??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
|
||
// Regenerate data key cipher at fixed interval | ||
keyGenExecutor = Executors.newSingleThreadScheduledExecutor(); | ||
keyGenExecutor.scheduleWithFixedDelay(new Runnable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead we can directly use lambda
keyGenExecutor.scheduleWithFixedDela(()->{
//logic
}, 0L, 4L, TimeUnit.HOURS);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adressed
80d376d
to
0e36311
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor suggestions
try { | ||
encryptedPayload = msgCrypto.encrypt(conf.getEncryptionKeys(), msgMetadata, compressedPayload); | ||
} catch (PulsarClientException e) { | ||
// TODO: Provide option to fail the request vs. proceed with warning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove TODO
producerCreatedFuture.completeExceptionally(e); | ||
} | ||
} | ||
}, 0L, 4L, TimeUnit.HOURS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this 4 hour be configurable in case the client is too paranoid and want to change the symmetric key more frequently?
import io.netty.buffer.ByteBuf; | ||
import io.netty.buffer.PooledByteBufAllocator; | ||
|
||
public class MessageCrypto { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a suggestion - Have a different class for Producer and Consumer Message Crypto since except for secureRandom the two flow have nothing in common:-
a. Producer uses loadPublicKey and encrypt function
b. Consumer uses loadPrivateKey and decrypt* functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should not separate it. One provider should do both encryption and decryption. eg: CompressionCodec
, CheckSumProvider
, Encoder
.
* called at the time of producer creation as well as consumer receiving messages. | ||
* Hence, application should not make any blocking calls within the implementation. | ||
* While reading public key from the producer, the key name will have prefix "public-key." added to it. | ||
* Similarly, to get private key from consumer, prefix "private-key." is used. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a thought : instead adding prefix business, should we have two apis
byte [] getPrivateKey(String keyName);
byte [] getPublicKey(String keyName);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this proposal. @msb-at-yahoo Do you have any objection?
try { | ||
msgCrypto.addPublicKeyCipher(conf.getEncryptionKeys(), conf.getCryptoKeyReader()); | ||
} catch (CryptoException e) { | ||
if (!producerCreatedFuture.isDone()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we log warn
message, else there is no way to figure out if key stopped getting generated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added warn msg.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM.. just few last minor comments..
*/ | ||
public void addEncryptionKey(String key) { | ||
if (this.encryptionKeys == null) { | ||
this.encryptionKeys = new ConcurrentOpenHashSet<String>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by default concurrency is 16 for ConcurrentOpenHashSet
which we may not need here. should we make it this.encryptionKeys = new ConcurrentOpenHashSet<>(256, 1);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could even start with 16 entries, since most use case won't have more than one key.
public MessageCrypto(boolean keyGenNeeded) { | ||
|
||
encryptedDataKeyMap = new ConcurrentHashMap<String, byte[]>(); | ||
dataKeyCache = CacheBuilder.newBuilder().expireAfterAccess(6, TimeUnit.HOURS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if producer regenerates key at every 4 hours and should we also cache up to 4 hours only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would require few secs/mins more than 4hrs for ideal use case.
// Taking a small performance hit here if the hash collides. When it | ||
// retruns a different key decryption fails. At this point, we would | ||
// call decryptDataKey to refresh the cache and come here again to decrypt. | ||
decryptedData = decryptData(storedSecretKey, msgMetadata, payload); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if key
is not present then dataKeyCache
returns null
. so, instead decryptData(..)
with null key, can't we just return if storedSecretKey==null
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we may use Optional
if we are returning null intentionally..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null is also returned when decryption fails for other reasons. Also even if it fails with one key, we would attempt to decrypt using the remaining keys.
Motivation
Added Java client changes to encrypt messages published to pulsar. Once encrypted, it only allows the consumer with the right set of keys to be able to decrypt the original message published by the producer.
Modifications
Changes are implemented as per PIP-4.
Addresses issue #633