Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add End-to-End encryption support #216

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/DotPulsar/Abstractions/ICryptoKeyReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Abstractions;

using DotPulsar.Internal.Encryption;
using DotPulsar.Internal.PulsarApi;

public interface ICryptoKeyReader
{
/// <summary>
/// Return the encryption (public) key corresponding to the key name in the argument.
/// <p>
/// This method should be implemented to return the key in byte array. This method will be
/// called at the time of producer creation as well as consumer receiving messages.
/// Hence, application should not make any blocking calls within the implementation.
/// </p>
/// </summary>
/// <param name="keyName">Unique name to identify the key</param>
/// <returns>byte array of the public key value</returns>
EncryptionKeyInfo GetPublicKey(String keyName);

/// <summary>
/// Return the decryption (private) key corresponding to the key name in the argument.
/// </summary>
/// <param name="keyName">Unique name to identify the key</param>
/// <returns>byte array of the private key value</returns>
(byte[], List<KeyValue>) GetPrivateKey(String keyName);
}
10 changes: 10 additions & 0 deletions src/DotPulsar/Abstractions/IProducerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ public interface IProducerBuilder<TMessage>
/// </summary>
IProducerBuilder<TMessage> CompressionType(CompressionType compressionType);

/// <summary>
/// Set the crypto key reader.
/// </summary>
IProducerBuilder<TMessage> CryptoKeyReader(ICryptoKeyReader cryptoKeyReader);

/// <summary>
/// Add the name of the public key used to encrypt the session key. There can be multiple public keys.
/// </summary>
IProducerBuilder<TMessage> AddEncryptionKey(string keyName);

/// <summary>
/// Set the initial sequence id. The default is 0.
/// </summary>
Expand Down
39 changes: 39 additions & 0 deletions src/DotPulsar/ConsumerCryptoFailureAction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar;

/// <summary>
/// Possible actions that the consumer can take when a message is encrypted and the consumer fails to decrypt the message.
/// </summary>
public enum ConsumerCryptoFailureAction
{
/// <summary>
/// Message is not acknowledged and the operation fails.
/// </summary>
Fail = 0,

/// <summary>
/// Message is silently acknowledged and not delivered to the application.
/// </summary>
Discard = 1,

/// <summary>
/// Deliver the encrypted message to the application. It's the application's
/// responsibility to decrypt the message. If message is also compressed,
/// decompression will fail. If message contain batch messages, client will
/// not be able to retrieve individual messages in the batch.
/// </summary>
Consume = 2
}
33 changes: 33 additions & 0 deletions src/DotPulsar/ConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ public sealed class ConsumerOptions<TMessage>
/// </summary>
public static readonly int DefaultPriorityLevel = 0;

/// <summary>
/// The default encryption keys.
/// </summary>
public static readonly List<string> DefaultEncryptionKeys = new();

/// <summary>
/// The default crypto key reader.
/// </summary>
public static readonly ICryptoKeyReader DefaultCryptoKeyReader = null;

/// <summary>
/// The default action to take when message decryption fails.
/// </summary>
public static readonly ConsumerCryptoFailureAction DefaultCryptoFailureAction = ConsumerCryptoFailureAction.Fail;

/// <summary>
/// The default of whether to read compacted.
/// </summary>
Expand All @@ -59,6 +74,9 @@ public ConsumerOptions(string subscriptionName, string topic, ISchema<TMessage>
InitialPosition = DefaultInitialPosition;
PriorityLevel = DefaultPriorityLevel;
MessagePrefetchCount = DefaultMessagePrefetchCount;
EncryptionKeys = DefaultEncryptionKeys;
CryptoKeyReader = DefaultCryptoKeyReader;
CryptoFailureAction = DefaultCryptoFailureAction;
ReadCompacted = DefaultReadCompacted;
ReplicateSubscriptionState = DefaultReplicateSubscriptionState;
SubscriptionType = DefaultSubscriptionType;
Expand Down Expand Up @@ -88,6 +106,21 @@ public ConsumerOptions(string subscriptionName, string topic, ISchema<TMessage>
/// </summary>
public int PriorityLevel { get; set; }

/// <summary>
/// Set the encryption keys.
/// </summary>
public List<string> EncryptionKeys { get; set; }

/// <summary>
/// Set the crypto key reader
/// </summary>
public ICryptoKeyReader? CryptoKeyReader { get; set; }

/// <summary>
/// Set the action to take when a crypto operation fails. The default is 'ConsumerCryptoFailureAction.Fail'.
/// </summary>
public ConsumerCryptoFailureAction CryptoFailureAction { get; set; }

/// <summary>
/// Whether to read from the compacted topic. The default is 'false'.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/DotPulsar/DotPulsar.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net6.0;net7.0;net8.0</TargetFrameworks>
<TargetFrameworks>netstandard2.1;net6.0;net7.0;net8.0</TargetFrameworks>
<LangVersion>12.0</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
Expand Down
25 changes: 25 additions & 0 deletions src/DotPulsar/Exceptions/CryptoException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Exceptions;

/// <summary>
/// Error while trying to encrypt or decrypt a message.
/// </summary>
public class CryptoException : DotPulsarException
{
public CryptoException(string message) : base(message) { }

public CryptoException(string message, Exception innerException) : base(message, innerException) { }
}
12 changes: 12 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IMessageCrypto.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace DotPulsar.Internal.Abstractions;

using DotPulsar.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System.Buffers;

public interface IMessageCrypto
{
ReadOnlySequence<byte> Decrypt(ReadOnlySequence<byte> data);

(ReadOnlySequence<byte>, byte[], List<EncryptionKeys>) Encrypt(List<string> encryptionKeyNames, ICryptoKeyReader cryptoKeyReader, ReadOnlySequence<byte> data);
}
18 changes: 17 additions & 1 deletion src/DotPulsar/Internal/ConsumerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Encryption;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;

Expand All @@ -29,6 +30,7 @@ public sealed class ConsumerChannel<TMessage> : IConsumerChannel<TMessage>
private readonly CommandFlow _cachedCommandFlow;
private readonly IMessageFactory<TMessage> _messageFactory;
private readonly IDecompress?[] _decompressors;
private readonly IMessageCrypto _messageCrypto;
private readonly AsyncLock _lock;
private readonly string _topic;
private uint _sendWhenZero;
Expand All @@ -52,12 +54,13 @@ public ConsumerChannel(
_topic = topic;

_decompressors = new IDecompress[5];

foreach (var decompressorFactory in decompressorFactories)
{
_decompressors[(int) decompressorFactory.CompressionType] = decompressorFactory.Create();
}

_messageCrypto = new MessageCrypto();

_lock = new AsyncLock();

_cachedCommandFlow = new CommandFlow
Expand Down Expand Up @@ -98,6 +101,19 @@ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellatio
var metadata = messagePackage.ExtractMetadata(metadataSize);
var data = messagePackage.ExtractData(metadataSize);

if (metadata.EncryptionKeys.Count > 0)
{
try
{
data = _messageCrypto.Decrypt(data);
}
catch
{
await RejectPackage(messagePackage, CommandAck.ValidationErrorType.DecryptionError, cancellationToken).ConfigureAwait(false);
continue;
}
}

if (metadata.Compression != CompressionType.None)
{
var decompressor = _decompressors[(int) metadata.Compression];
Expand Down
Loading