Skip to content

Commit

Permalink
Fix error handling and logging in Kafka components
Browse files Browse the repository at this point in the history
Enhanced error handling in Worker.cs to catch consume exceptions and log warnings. Added logging to TriggerWorkflows.cs for better traceability and fixed issues with expression evaluation. Also updated predicate in ConsumerWorkflow.cs and set CanStartWorkflow to true.
  • Loading branch information
sfmskywalker committed Nov 22, 2024
1 parent b534b42 commit 44bc1d8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
4 changes: 2 additions & 2 deletions src/apps/Elsa.Server.Web/Workflows/ConsumerWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ protected override void Build(IWorkflowBuilder builder)
{
ConsumerDefinitionId = new("consumer-1"),
Topics = new(["topic-1"]),
Predicate = new(JavaScriptExpression.Create("message => message.OrderId == '1'")),
Predicate = new(JavaScriptExpression.Create("getMessage().OrderId == '1'")),
Result = new(message),
CanStartWorkflow = false
CanStartWorkflow = true
},
new WriteLine(c => JsonSerializer.Serialize(message.Get(c)))
}
Expand Down
27 changes: 20 additions & 7 deletions src/modules/Elsa.Kafka/Handlers/TriggerWorkflows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Options;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Elsa.Kafka.Handlers;
Expand All @@ -21,7 +22,8 @@ public class TriggerWorkflows(
ICorrelationStrategy correlationStrategy,
IExpressionEvaluator expressionEvaluator,
IOptions<KafkaOptions> options,
IServiceProvider serviceProvider) : INotificationHandler<TransportMessageReceived>
IServiceProvider serviceProvider,
ILogger<TriggerWorkflows> logger) : INotificationHandler<TransportMessageReceived>
{
private static readonly string MessageReceivedActivityTypeName = ActivityTypeNameHelper.GenerateTypeName<MessageReceived>();

Expand Down Expand Up @@ -137,7 +139,7 @@ private async Task<IEnumerable<BookmarkBinding>> GetMatchingBookmarkBindingsAsyn

var correlationId = GetCorrelationId(transportMessage);
var workflowInstanceId = GetWorkflowInstanceId(transportMessage);

foreach (var binding in boundBookmarks)
{
var stimulus = binding.Stimulus;
Expand Down Expand Up @@ -176,13 +178,24 @@ private async Task<bool> EvaluatePredicateAsync(KafkaTransportMessage transportM
return true;

var memory = new MemoryRegister();
var messageVariable = new Variable("message", transportMessage);
var message = transportMessage;
var transportMessageVariable = new Variable("transportMessage", transportMessage);
var messageVariable = new Variable("message", transportMessage.Value);
var expressionExecutionContext = new ExpressionExecutionContext(serviceProvider, memory, cancellationToken: cancellationToken);
messageVariable.Set(expressionExecutionContext, message);
return await expressionEvaluator.EvaluateAsync<bool>(predicate, expressionExecutionContext);

transportMessageVariable.Set(expressionExecutionContext, transportMessage);
messageVariable.Set(expressionExecutionContext, transportMessage.Value);

try
{
return await expressionEvaluator.EvaluateAsync<bool>(predicate, expressionExecutionContext);
}
catch (Exception e)
{
logger.LogWarning(e, "An error occurred while evaluating the predicate for stimulus {Stimulus}", stimulus);
return false;
}
}

private string? GetWorkflowInstanceId(KafkaTransportMessage transportMessage)
{
var key = options.Value.WorkflowInstanceIdHeaderKey;
Expand Down
36 changes: 27 additions & 9 deletions src/modules/Elsa.Kafka/Implementations/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,44 @@ private void Subscribe(IEnumerable<string> topics)
throw new InvalidOperationException("The worker is not running.");

var topicList = topics.ToHashSet();

if (topicList.SetEquals(_subscribedTopics))
return;

_subscribedTopics = topicList.ToHashSet();
consumer.Subscribe(_subscribedTopics);

logger.LogInformation("Subscribed to topics: {Topics}", string.Join(", ", _subscribedTopics));
}

private async Task RunAsync(CancellationToken cancellationToken)
{
var consumeExceptionCount = 0;
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);

if (consumeResult.IsPartitionEOF)
continue;

await ProcessMessageAsync(consumeResult, cancellationToken);
try
{
var consumeResult = consumer.Consume(cancellationToken);
consumeExceptionCount = 0;

if (consumeResult.IsPartitionEOF)
continue;

await ProcessMessageAsync(consumeResult, cancellationToken);
}
catch (ConsumeException e)
{
logger.LogWarning(e, "Error consuming message.");
consumeExceptionCount++;

if (consumeExceptionCount > 100)
throw new InvalidOperationException("Too many consume exceptions.");
}
catch (OperationCanceledException)
{
logger.LogInformation("Consumer was cancelled.");
break;
}
}

consumer.Unsubscribe();
Expand Down

0 comments on commit 44bc1d8

Please sign in to comment.