Skip to content

Commit

Permalink
[Kafka] Added possibility to add DefaultConnectionString (#4522)
Browse files Browse the repository at this point in the history
* Added possibility to add DefaultConnectionString if connection string in trigger/activity is empty

* Removed unnessecary parameter

---------

Co-authored-by: Yannick Laubscher <[email protected]>
  • Loading branch information
Snotax and yannicklaubscherswt authored Oct 16, 2023
1 parent c4d0a8e commit 4654fa0
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Elsa.Activities.Kafka.Configuration;
using Elsa.Activities.Kafka.Helpers;
using Elsa.Activities.Kafka.Services;
Expand All @@ -10,6 +7,9 @@
using Elsa.Expressions;
using Elsa.Services;
using Elsa.Services.Models;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Elsa.Activities.Kafka.Activities.SendKafkaMessage
{
Expand All @@ -22,10 +22,12 @@ namespace Elsa.Activities.Kafka.Activities.SendKafkaMessage
public class SendKafkaMessage : Activity
{
private readonly IMessageSenderClientFactory _messageSenderClientFactory;
private readonly KafkaOptions _kafkaOptions;

public SendKafkaMessage(IMessageSenderClientFactory messageSenderClientFactory)
public SendKafkaMessage(IMessageSenderClientFactory messageSenderClientFactory, KafkaOptions kafkaOptions)
{
_messageSenderClientFactory = messageSenderClientFactory;
_kafkaOptions = kafkaOptions;
}

[ActivityInput(
Expand Down Expand Up @@ -62,7 +64,7 @@ public SendKafkaMessage(IMessageSenderClientFactory messageSenderClientFactory)

protected override async ValueTask<IActivityExecutionResult> OnExecuteAsync(ActivityExecutionContext context)
{
var config = new KafkaConfiguration(ConnectionString, Topic, "", Headers, ClientId);
var config = new KafkaConfiguration(String.IsNullOrEmpty(ConnectionString) ? _kafkaOptions.DefaultConnectionString ?? "" : ConnectionString, Topic, "", Headers, ClientId);

var client = await _messageSenderClientFactory.GetSenderAsync(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,10 @@ public class KafkaOptions
/// Security protocol to be used for communication with brokers
/// </summary>
public Confluent.Kafka.SecurityProtocol SecurityProtocol { get; set; }

/// <summary>
/// If the kafka activities / triggers don't have a connection string specified (Empty string), use this one instead.
/// </summary>
public string? DefaultConnectionString { get; set; }
}
}
4 changes: 2 additions & 2 deletions src/activities/Elsa.Activities.Kafka/Services/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Task StartProcessing(string topic, string group)

var consumerConfig = new ConsumerConfig(Configuration.Headers)
{
BootstrapServers = Configuration.ConnectionString,
BootstrapServers = String.IsNullOrEmpty(Configuration.ConnectionString) ? _kafkaOptions.DefaultConnectionString ?? "" : Configuration.ConnectionString,
GroupId = group,
AutoOffsetReset = Configuration.AutoOffsetReset,
SaslMechanism = _kafkaOptions.SaslMechanism,
Expand Down Expand Up @@ -77,7 +77,7 @@ public Task StartProcessing(string topic, string group)
}
}, _cancellationToken);
}

return Task.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class WorkerManager : IWorkerManager
private readonly ILogger _logger;
private readonly IBookmarkSerializer _bookmarkSerializer;
private readonly KafkaOptions _kafkaOptions;

public WorkerManager(
IServiceProvider serviceProvider,
ILogger<WorkerManager> logger,
Expand Down Expand Up @@ -113,7 +113,7 @@ private async Task GetOrCreateWorkerAsync(string tag, KafkaConfiguration configu
var worker = _workers.FirstOrDefault(x => x.Topic == configuration.Topic && x.Group == configuration.Group);

// Create worker if not found and a topic and connectionString are provided.
if (worker is null && !IsNullOrEmpty(configuration.Topic) && !IsNullOrEmpty(configuration.ConnectionString))
if (worker is null && !IsNullOrEmpty(configuration.Topic) && (!IsNullOrEmpty(configuration.ConnectionString) || !IsNullOrEmpty(_kafkaOptions.DefaultConnectionString)))
{
worker = ActivatorUtilities.CreateInstance<Worker>(
_serviceProvider,
Expand Down

0 comments on commit 4654fa0

Please sign in to comment.