-
Notifications
You must be signed in to change notification settings - Fork 0
/
Orchestrator.cs
127 lines (106 loc) · 4.54 KB
/
Orchestrator.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
using eOperator.CommunicationHub.EventStore;
using EsdbEvents;
using EventStore.Client;
using Timeout = System.Threading.Timeout;
namespace EsdbTestProject1;
/// <summary>
/// Hosted service that walks through a numbered sequence of streams, one stream at a time,
/// driven by a 10 ms timer.
/// In master mode it publishes a <c>PingEvent</c> to the current stream and subscribes until a
/// <c>PongEvent</c> arrives; in slave mode it subscribes to the current stream and answers a
/// <c>PingEvent</c> with a <c>PongEvent</c>. In both modes, handling the expected event advances
/// the stream counter and lets the next timer tick start the following stream.
/// </summary>
public class Orchestrator : IHostedService, IDisposable
{
    /// <summary>Stream number at which the orchestrator stops its timer.</summary>
    private const int NumberOfStreams = 100000;

    // Values of _previousStreamIsDone. An int is used (instead of bool) so the flag can be
    // claimed atomically with Interlocked.CompareExchange.
    private const int StreamInFlight = 0;
    private const int StreamDone = 1;

    private static readonly TimeSpan TickInterval = TimeSpan.FromMilliseconds(10);
    private static readonly TimeSpan StreamTimeout = TimeSpan.FromMinutes(30);

    private readonly bool _isSlave;
    private readonly ILogger<Orchestrator> _logger;
    private readonly string _runId;
    private readonly EventStorePublisher _eventStorePublisher;
    private readonly IServiceProvider _provider;

    private int _currentStreamNumber;
    private int _previousStreamIsDone = StreamDone;
    private Timer? _timer;

    /// <summary>Creates the orchestrator.</summary>
    /// <param name="eventStorePublisher">Publisher used to append ping/pong events to a stream.</param>
    /// <param name="provider">Service provider used to resolve a fresh <c>SubscriptionHandler</c> per stream.</param>
    /// <param name="logger">Logger for progress and failures.</param>
    /// <param name="runId">Prefix of every stream name (<c>{runId}-{streamNumber}</c>).</param>
    /// <param name="isSlave"><c>true</c> to answer pings (slave); <c>false</c> to send them (master).</param>
    public Orchestrator(EventStorePublisher eventStorePublisher, IServiceProvider provider, ILogger<Orchestrator> logger, string runId, bool isSlave = true)
    {
        _isSlave = isSlave;
        _logger = logger;
        _runId = runId;
        _eventStorePublisher = eventStorePublisher;
        _provider = provider;
    }

    /// <summary>Starts the timer that ticks the master or slave callback every 10 ms.</summary>
    /// <param name="cancellationToken">Not used; starting is synchronous.</param>
    /// <returns>A completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stream Orchestartor is running.");

        TimerCallback callback = _isSlave ? RunNextStreamSlave : RunNextStreamMaster;

        // Guard against a leaked timer if StartAsync is ever called more than once.
        _timer?.Dispose();
        _timer = new Timer(callback, null, TimeSpan.Zero, TickInterval);

        return Task.CompletedTask;
    }

    /// <summary>Stops the timer from firing further ticks. A stream that is already in flight is not interrupted.</summary>
    /// <param name="cancellationToken">Not used; stopping is synchronous.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stream Orchestartor is stopping.");
        _timer?.Change(Timeout.Infinite, Timeout.Infinite);
        return Task.CompletedTask;
    }

    /// <summary>Releases the underlying timer.</summary>
    public void Dispose()
    {
        _timer?.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Timer callback for master mode. It is <c>async void</c> only because <see cref="TimerCallback"/>
    /// requires a void signature; all exceptions are handled inside <see cref="RunNextStreamAsync"/>.
    /// </summary>
    public async void RunNextStreamMaster(object? _)
    {
        await RunNextStreamAsync(StartMasterStreamAsync);
    }

    /// <summary>
    /// Timer callback for slave mode. It is <c>async void</c> only because <see cref="TimerCallback"/>
    /// requires a void signature; all exceptions are handled inside <see cref="RunNextStreamAsync"/>.
    /// </summary>
    public async void RunNextStreamSlave(object? _)
    {
        await RunNextStreamAsync(StartSlaveStreamAsync);
    }

    /// <summary>
    /// Shared tick logic: claims the "no stream in flight" slot, stops the timer once the stream limit
    /// is reached, and otherwise starts the next stream via <paramref name="startStream"/>.
    /// </summary>
    private async Task RunNextStreamAsync(Func<string, Task> startStream)
    {
        // Timer callbacks run on thread-pool threads and may overlap, so the check-and-set of the
        // flag has to be a single atomic step; otherwise two ticks could start the same stream.
        if (Interlocked.CompareExchange(ref _previousStreamIsDone, StreamInFlight, StreamDone) != StreamDone)
        {
            return;
        }

        // Read after the claim: the counter only changes while a stream is in flight, so it is stable here.
        var streamNumber = _currentStreamNumber;
        try
        {
            if (NumberOfStreams <= streamNumber)
            {
                // The slot stays claimed on purpose so that ticks already queued cannot start another stream.
                await StopAsync(CancellationToken.None);
                return;
            }

            await startStream(CreateStreamName());
        }
        catch (Exception ex) when (Volatile.Read(ref _currentStreamNumber) == streamNumber)
        {
            // The stream never completed. An exception escaping an async void callback would take the
            // process down, so log it and release the slot; the next tick retries the same stream.
            _logger.LogError(ex, "Orchestrator tick for stream number {streamNumber} failed; it will be retried.", streamNumber);
            Volatile.Write(ref _previousStreamIsDone, StreamDone);
        }
        catch (Exception ex)
        {
            // The stream had already completed (counter advanced), e.g. the subscription call ended
            // because its token was cancelled. The slot was released by the handler; nothing to do.
            _logger.LogDebug(ex, "Subscription for stream number {streamNumber} ended with an exception after completion.", streamNumber);
        }
    }

    /// <summary>Publishes a ping to <paramref name="streamName"/> and subscribes until the matching pong is seen.</summary>
    private async Task StartMasterStreamAsync(string streamName)
    {
        _logger.LogInformation("Master is creating a new stream {streamNr}", streamName);
        await _eventStorePublisher.PublishAsync(streamName, new PingEvent { StreamNr = _currentStreamNumber });

        // NOTE(review): not disposed here because it is unclear from this file whether
        // SubscribeToStream returns before the subscription ends; the handler still needs the source.
        var streamCancellationTokenSource = new CancellationTokenSource(StreamTimeout);

        // A new SubscriptionHandler is resolved every time to simulate how it would work with MassTransit consumers.
        SubscriptionHandler subscriptionHandler = _provider.GetRequiredService<SubscriptionHandler>();
        await subscriptionHandler.SubscribeToStream(streamName, (ResolvedEvent evnt, StreamEvent aggregate) =>
        {
            if (evnt.Event.EventType == "PongEvent")
            {
                CompleteStream(aggregate.StreamNr + 1, streamCancellationTokenSource);
            }

            return Task.CompletedTask;
        },
        streamCancellationTokenSource.Token);
    }

    /// <summary>Subscribes to <paramref name="streamName"/> and answers the first ping with a pong.</summary>
    private async Task StartSlaveStreamAsync(string streamName)
    {
        _logger.LogInformation("Slave is subscribing a new stream {streamNr}", streamName);

        // NOTE(review): not disposed here because it is unclear from this file whether
        // SubscribeToStream returns before the subscription ends; the handler still needs the source.
        var streamCancellationTokenSource = new CancellationTokenSource(StreamTimeout);

        // A new SubscriptionHandler is resolved every time to simulate how it would work with MassTransit consumers.
        SubscriptionHandler subscriptionHandler = _provider.GetRequiredService<SubscriptionHandler>();
        await subscriptionHandler.SubscribeToStream(streamName, async (ResolvedEvent evnt, StreamEvent aggregate) =>
        {
            if (evnt.Event.EventType == "PingEvent")
            {
                await _eventStorePublisher.PublishAsync(streamName, new PongEvent { StreamNr = _currentStreamNumber });
                CompleteStream(aggregate.StreamNr + 1, streamCancellationTokenSource);
            }
        },
        streamCancellationTokenSource.Token);
    }

    /// <summary>
    /// Records that the current stream is finished: advances the counter, releases the slot for the
    /// next tick, and cancels the stream's subscription token.
    /// </summary>
    private void CompleteStream(int nextStreamNumber, CancellationTokenSource streamCancellationTokenSource)
    {
        _currentStreamNumber = nextStreamNumber;

        // Written after the counter so a tick that claims the slot always sees the new stream number.
        Volatile.Write(ref _previousStreamIsDone, StreamDone);
        streamCancellationTokenSource.Cancel();
    }

    /// <summary>Builds the name of the current stream as <c>{runId}-{currentStreamNumber}</c>.</summary>
    private string CreateStreamName()
    {
        return $"{_runId}-{_currentStreamNumber}";
    }
}