Skip to content

Commit

Permalink
feat(kafka): support for sasl certs
Browse files Browse the repository at this point in the history
  • Loading branch information
pogromistik committed Apr 3, 2024
1 parent 2987365 commit 1ab2b88
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/Sitko.Core.Kafka/KafkaConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using KafkaFlow;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers.DistributionStrategies;
using SecurityProtocol = KafkaFlow.Configuration.SecurityProtocol;

namespace Sitko.Core.Kafka;

Expand Down Expand Up @@ -66,6 +67,10 @@ public void Build(IKafkaConfigurationBuilder builder, KafkaModuleOptions options
information.SaslUsername = options.SaslUserName;
information.SaslMechanism = options.SaslMechanisms;
information.SecurityProtocol = options.SecurityProtocol;
if (information.SecurityProtocol == SecurityProtocol.SaslSsl)
{
information.SslCaLocation = options.GetSaslCertPath();
}
});
}
if (!ensureOffsets)
Expand Down
8 changes: 8 additions & 0 deletions src/Sitko.Core.Kafka/KafkaConsumerOffsetsEnsurer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ await adminClient.CreateTopicsAsync(new[]
consumerConfig.SaslUsername = options.SaslUserName;
consumerConfig.SaslMechanism = (SaslMechanism?)options.SaslMechanisms;
consumerConfig.SecurityProtocol = (SecurityProtocol?)options.SecurityProtocol;
if (consumerConfig.SecurityProtocol == SecurityProtocol.SaslSsl)
{
consumerConfig.SslCaLocation = options.GetSaslCertPath();
}
}
var cts = new CancellationTokenSource();
using var confluentConsumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
Expand Down Expand Up @@ -154,6 +158,10 @@ private IAdminClient GetAdminClient(KafkaModuleOptions options)
adminClientConfig.SaslUsername = options.SaslUserName;
adminClientConfig.SaslMechanism = (SaslMechanism?)options.SaslMechanisms;
adminClientConfig.SecurityProtocol = (SecurityProtocol?)options.SecurityProtocol;
if (adminClientConfig.SecurityProtocol == SecurityProtocol.SaslSsl)
{
adminClientConfig.SslCaLocation = options.GetSaslCertPath();
}
}

var adminClient = new AdminClientBuilder(adminClientConfig)
Expand Down
16 changes: 14 additions & 2 deletions src/Sitko.Core.Kafka/KafkaModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public class KafkaModuleOptions : BaseModuleOptions
public string[] Brokers { get; set; } = Array.Empty<string>();
public TimeSpan SessionTimeout { get; set; } = TimeSpan.FromSeconds(15);
public TimeSpan MaxPollInterval { get; set; } = TimeSpan.FromMinutes(5);

public bool UseSaslAuth { get; set; }
public string SaslUserName { get; set; } = "";
public string SaslPassword { get; set; } = "";
public string SaslCertBase64 { get; set; } = "";
public KafkaFlow.Configuration.SaslMechanism SaslMechanisms { get; set; } = KafkaFlow.Configuration.SaslMechanism.ScramSha512;
public SecurityProtocol? SecurityProtocol { get; set; } = KafkaFlow.Configuration.SecurityProtocol.Plaintext;
public int MaxPartitionFetchBytes { get; set; } = 5 * 1024 * 1024;
Expand All @@ -70,10 +70,22 @@ public class KafkaModuleOptions : BaseModuleOptions
public bool EnableIdempotence { get; set; } = true;
public bool SocketNagleDisable { get; set; } = true;
public Acks Acks { get; set; } = Acks.All;

public string GetSaslCertPath()
{
var cert = Convert.FromBase64String(SaslCertBase64);
var path = Path.GetTempFileName();
File.WriteAllBytes(path, cert);
return path;
}
}

public class KafkaModuleOptionsValidator : AbstractValidator<KafkaModuleOptions>
{
public KafkaModuleOptionsValidator() =>
public KafkaModuleOptionsValidator()
{
RuleFor(options => options.Brokers).NotEmpty().WithMessage("Specify Kafka brokers");
RuleFor(options => options.SaslCertBase64).NotEmpty()
.When(options => options.SecurityProtocol == SecurityProtocol.SaslSsl).WithMessage("Specify kafka sasl certificate");
}
}

0 comments on commit 1ab2b88

Please sign in to comment.