-
Notifications
You must be signed in to change notification settings - Fork 2k
/
OrleansTaskScheduler.cs
321 lines (280 loc) · 13.9 KB
/
OrleansTaskScheduler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
namespace Orleans.Runtime.Scheduler
{
internal class OrleansTaskScheduler
{
// Cached delegate that executes a work item; QueueWorkItem passes it to ThreadPool.QueueUserWorkItem in NETCOREAPP builds.
private static readonly Action<IWorkItem> ExecuteWorkItemAction = workItem => workItem.Execute();
// WaitCallback form of the same operation, used by QueueWorkItem in builds without NETCOREAPP; the state object is cast back to IWorkItem.
private static readonly WaitCallback ExecuteWorkItemCallback = obj => ((IWorkItem)obj).Execute();
// Logger for the scheduler itself.
private readonly ILogger logger;
// Statistics group and options; both are handed to every WorkItemGroup created by RegisterWorkContext.
private readonly SchedulerStatisticsGroup schedulerStatistics;
private readonly IOptions<StatisticsOptions> statisticsOptions;
// Maps each registered grain context to its work item group.
private readonly ConcurrentDictionary<IGrainContext, WorkItemGroup> workgroupDirectory = new();
// Loggers created once in the constructor and passed to every WorkItemGroup.
private readonly ILogger<WorkItemGroup> workItemGroupLogger;
private readonly ILogger<ActivationTaskScheduler> activationTaskSchedulerLogger;
// Its token is passed to every WorkItemGroup; Stop() cancels it.
private readonly CancellationTokenSource cancellationTokenSource;
// Set by StopApplicationTurns(); while true, work queued for non-system groups is dropped.
private bool applicationTurnsStopped;
// Static: the constructor overwrites it with the value from the scheduling options it was given.
internal static TimeSpan TurnWarningLengthThreshold { get; set; }
// This is the maximum number of pending work items for a single activation before we write a warning log.
internal int MaxPendingItemsSoftLimit { get; private set; }
/// <summary>
/// Initializes the scheduler from the supplied options and registers its statistics.
/// </summary>
/// <param name="options">Scheduling options; supplies the turn-length warning threshold, the pending work item soft limit and the stopped-activation warning interval.</param>
/// <param name="loggerFactory">Factory used to create the scheduler, work item group and activation task scheduler loggers.</param>
/// <param name="schedulerStatistics">Statistics group passed on to work item groups; its CollectShedulerQueuesStats flag decides whether the per-queue statistics are registered.</param>
/// <param name="statisticsOptions">Statistics options passed on to work item groups.</param>
public OrleansTaskScheduler(
    IOptions<SchedulingOptions> options,
    ILoggerFactory loggerFactory,
    SchedulerStatisticsGroup schedulerStatistics,
    IOptions<StatisticsOptions> statisticsOptions)
{
    const string averageCategory = "Scheduler.LevelTwo.Average";
    const string sumCategory = "Scheduler.LevelTwo.Sum";

    this.schedulerStatistics = schedulerStatistics;
    this.statisticsOptions = statisticsOptions;

    logger = loggerFactory.CreateLogger<OrleansTaskScheduler>();
    workItemGroupLogger = loggerFactory.CreateLogger<WorkItemGroup>();
    activationTaskSchedulerLogger = loggerFactory.CreateLogger<ActivationTaskScheduler>();
    cancellationTokenSource = new CancellationTokenSource();

    // Resolve the options once and reuse the same instance for every derived setting.
    var schedulingOptions = options.Value;
    SchedulingOptions = schedulingOptions;
    applicationTurnsStopped = false;
    TurnWarningLengthThreshold = schedulingOptions.TurnWarningLengthThreshold;
    MaxPendingItemsSoftLimit = schedulingOptions.MaxPendingWorkItemsSoftLimit;
    StoppedWorkItemGroupWarningInterval = schedulingOptions.StoppedActivationWarningInterval;

    // The work item group count is always reported.
    IntValueStatistic.FindOrCreate(StatisticNames.SCHEDULER_WORKITEMGROUP_COUNT, workgroupDirectory.LongCount);

    // The per-queue averages and sums are only registered when queue statistics collection is enabled.
    if (schedulerStatistics.CollectShedulerQueuesStats)
    {
        FloatValueStatistic.FindOrCreate(new StatisticName(StatisticNames.QUEUES_QUEUE_SIZE_AVERAGE_PER_QUEUE, averageCategory), AverageRunQueueLengthLevelTwo);
        FloatValueStatistic.FindOrCreate(new StatisticName(StatisticNames.QUEUES_ENQUEUED_PER_QUEUE, averageCategory), AverageEnqueuedLevelTwo);
        FloatValueStatistic.FindOrCreate(new StatisticName(StatisticNames.QUEUES_AVERAGE_ARRIVAL_RATE_PER_QUEUE, averageCategory), AverageArrivalRateLevelTwo);
        FloatValueStatistic.FindOrCreate(new StatisticName(StatisticNames.QUEUES_QUEUE_SIZE_AVERAGE_PER_QUEUE, sumCategory), SumRunQueueLengthLevelTwo);
        FloatValueStatistic.FindOrCreate(new StatisticName(StatisticNames.QUEUES_ENQUEUED_PER_QUEUE, sumCategory), SumEnqueuedLevelTwo);
        FloatValueStatistic.FindOrCreate(new StatisticName(StatisticNames.QUEUES_AVERAGE_ARRIVAL_RATE_PER_QUEUE, sumCategory), SumArrivalRateLevelTwo);
    }
}
/// <summary>
/// Warning interval copied from <c>StoppedActivationWarningInterval</c> of the scheduling options in the constructor.
/// </summary>
public TimeSpan StoppedWorkItemGroupWarningInterval { get; }
/// <summary>
/// The scheduling options instance (<c>options.Value</c>) this scheduler was constructed with.
/// </summary>
public SchedulingOptions SchedulingOptions { get; }
// Per-group averages over every registered work item group; each returns 0 when no group is registered.
private float AverageRunQueueLengthLevelTwo()
{
    return Average(group => group.AverageQueueLength);
}

private float AverageEnqueuedLevelTwo()
{
    return Average(group => group.NumEnqueuedRequests);
}

private float AverageArrivalRateLevelTwo()
{
    return Average(group => group.ArrivalRate);
}

// Totals of the same measurements across every registered work item group.
private float SumRunQueueLengthLevelTwo()
{
    return workgroupDirectory.Sum(entry => entry.Value.AverageQueueLength);
}

private float SumEnqueuedLevelTwo()
{
    return workgroupDirectory.Sum(entry => entry.Value.NumEnqueuedRequests);
}

private float SumArrivalRateLevelTwo()
{
    return workgroupDirectory.Sum(entry => entry.Value.ArrivalRate);
}
/// <summary>
/// Computes the arithmetic mean of <paramref name="stat"/> over all registered work item groups.
/// </summary>
/// <param name="stat">Selects the measurement to average from a work item group.</param>
/// <returns>The mean value, or 0 when no work item group is registered.</returns>
private float Average(Func<WorkItemGroup, float> stat)
{
    // Accumulate in double precision; only the final result is narrowed to float.
    var total = 0d;
    var groups = 0;
    foreach (var entry in workgroupDirectory)
    {
        total += stat(entry.Value);
        groups++;
    }

    if (groups == 0)
    {
        return 0;
    }

    return (float)(total / groups);
}
/// <summary>
/// Marks application turns as stopped and stops every work item group that is not a system group.
/// </summary>
public void StopApplicationTurns()
{
#if DEBUG
    logger.Debug("StopApplicationTurns");
#endif
    // Do not RunDown the application run queue, since it is still used by low priority system targets.
    applicationTurnsStopped = true;

    foreach (var entry in workgroupDirectory)
    {
        var group = entry.Value;
        if (group.IsSystemGroup)
        {
            // System groups keep running until Stop() is called.
            continue;
        }

        group.Stop();
    }
}
/// <summary>
/// Stops the remaining work item groups and cancels the scheduler's cancellation token.
/// </summary>
public void Stop()
{
    // When application turns were already stopped, only the system groups are stopped here;
    // otherwise every group is stopped.
    var systemGroupsOnly = this.applicationTurnsStopped;
    foreach (var entry in workgroupDirectory)
    {
        var group = entry.Value;
        if (systemGroupsOnly && !group.IsSystemGroup)
        {
            continue;
        }

        group.Stop();
    }

    cancellationTokenSource.Cancel();
}
// Cached delegates used by QueueAction when the action is run on the thread pool:
// the generic form for NETCOREAPP builds and the WaitCallback form for all other builds.
private static readonly Action<Action> ExecuteActionCallback = obj => obj.Invoke();
private static readonly WaitCallback ExecuteAction = obj => ((Action)obj).Invoke();

/// <summary>
/// Schedules <paramref name="action"/> on the work item group of <paramref name="context"/>,
/// or on the thread pool when no group scheduler is available.
/// </summary>
/// <param name="action">The action to run.</param>
/// <param name="context">The grain context whose work item group should run the action; may be null.</param>
public void QueueAction(Action action, IGrainContext context)
{
#if DEBUG
    if (logger.IsEnabled(LogLevel.Trace))
    {
        logger.LogTrace("ScheduleTask on {Context}", context);
    }
#endif
    var group = GetWorkItemGroup(context);

    // Drop the task on the floor if it's an application work item and application turns are stopped
    if (applicationTurnsStopped && group is { IsSystemGroup: false })
    {
        logger.LogWarning((int)ErrorCode.SchedulerAppTurnsStopped_1, "Dropping task item {Task} on context {Context} because application turns are stopped", action, context);
        return;
    }

    if (group?.TaskScheduler is not TaskScheduler scheduler)
    {
        // Note that we do not use UnsafeQueueUserWorkItem here because we typically want to propagate execution context,
        // which includes async locals.
#if NETCOREAPP
        ThreadPool.QueueUserWorkItem(ExecuteActionCallback, action, preferLocal: true);
#else
        ThreadPool.QueueUserWorkItem(ExecuteAction, action);
#endif
        return;
    }

    // This will make sure the TaskScheduler.Current is set correctly on any task that is created implicitly in the execution of this workItem.
    // We must wrap any work item in Task and enqueue it as a task to the right scheduler via Task.Start.
    var task = new Task(action);
    task.Start(scheduler);
}
/// <summary>
/// Enqueues a work item on the work item group of its grain context,
/// or on the thread pool when no group scheduler is available.
/// </summary>
/// <param name="workItem">The work item to run; its <c>GrainContext</c> selects the target work item group.</param>
public void QueueWorkItem(IWorkItem workItem)
{
#if DEBUG
    // Structured template (same style as QueueAction) instead of eager string concatenation.
    if (logger.IsEnabled(LogLevel.Trace)) logger.LogTrace("QueueWorkItem {WorkItem}", workItem);
#endif
    var workItemGroup = GetWorkItemGroup(workItem.GrainContext);
    if (applicationTurnsStopped && (workItemGroup != null) && !workItemGroup.IsSystemGroup)
    {
        // Drop the task on the floor if it's an application work item and application turns are stopped.
        // The message is passed as a template so it is only formatted when the warning is actually written.
        logger.LogWarning((int)ErrorCode.SchedulerAppTurnsStopped_1, "Dropping work item {WorkItem} because application turns are stopped", workItem);
        return;
    }

    if (workItemGroup?.TaskScheduler is TaskScheduler scheduler)
    {
        // This will make sure the TaskScheduler.Current is set correctly on any task that is created implicitly in the execution of this workItem.
        // We must wrap any work item in Task and enqueue it as a task to the right scheduler via Task.Start.
        Task t = TaskSchedulerUtils.WrapWorkItemAsTask(workItem);
        t.Start(scheduler);
    }
    else
    {
        // Note that we do not use UnsafeQueueUserWorkItem here because we typically want to propagate execution context,
        // which includes async locals.
#if NETCOREAPP
        ThreadPool.QueueUserWorkItem(ExecuteWorkItemAction, workItem, preferLocal: true);
#else
        ThreadPool.QueueUserWorkItem(ExecuteWorkItemCallback, workItem);
#endif
    }
}
// Only required if you have work groups flagged by a context that is not a WorkGroupingContext
/// <summary>
/// Creates a work item group for <paramref name="context"/> and registers it in the directory.
/// Does nothing when <paramref name="context"/> is null. When the context is already registered,
/// the existing registration is left untouched and the newly created group is stopped.
/// </summary>
/// <param name="context">The grain context to register; may be null.</param>
public void RegisterWorkContext(IGrainContext context)
{
    if (context is null)
    {
        return;
    }

    var wg = new WorkItemGroup(
        this,
        context,
        this.workItemGroupLogger,
        this.activationTaskSchedulerLogger,
        this.cancellationTokenSource.Token,
        this.schedulerStatistics,
        this.statisticsOptions);

    // Add to the directory before touching the context. If the context is already registered,
    // the new group is discarded and the context must keep pointing at its live group rather
    // than at the group stopped here.
    if (!workgroupDirectory.TryAdd(context, wg))
    {
        wg.Stop();
        return;
    }

    // Cache the group on the context so GetWorkItemGroup can return it without a directory lookup.
    if (context is SystemTarget systemTarget)
    {
        systemTarget.WorkItemGroup = wg;
    }

    if (context is ActivationData activation)
    {
        activation.WorkItemGroup = wg;
    }
}
// Only required if you have work groups flagged by a context that is not a WorkGroupingContext
/// <summary>
/// Removes the work item group registered for <paramref name="context"/>, stops it if one was found,
/// and clears the group cached on the context. Does nothing when <paramref name="context"/> is null.
/// </summary>
/// <param name="context">The grain context to unregister; may be null.</param>
public void UnregisterWorkContext(IGrainContext context)
{
    if (context is null) return;

    if (workgroupDirectory.TryRemove(context, out var removedGroup))
    {
        removedGroup.Stop();
    }

    // The cached reference is cleared even when the directory held no entry for the context.
    if (context is SystemTarget target)
    {
        target.WorkItemGroup = null;
    }

    if (context is ActivationData activationData)
    {
        activationData.WorkItemGroup = null;
    }
}
// public for testing only -- should be private, otherwise
/// <summary>
/// Resolves the work item group for <paramref name="context"/>.
/// </summary>
/// <param name="context">The grain context to look up; may be null.</param>
/// <returns>
/// Null for a null context; otherwise the group cached on the context when present,
/// or the group found in the directory.
/// </returns>
/// <exception cref="InvalidSchedulingContextException">
/// The context is non-null and no work item group is cached on it or registered for it.
/// </exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public WorkItemGroup GetWorkItemGroup(IGrainContext context)
{
    if (context is null)
    {
        return null;
    }

    // Prefer the group cached directly on the context.
    if (context is SystemTarget systemTarget && systemTarget.WorkItemGroup is WorkItemGroup systemGroup)
    {
        return systemGroup;
    }

    if (context is ActivationData activation && activation.WorkItemGroup is WorkItemGroup activationGroup)
    {
        return activationGroup;
    }

    // Otherwise fall back to the directory.
    if (this.workgroupDirectory.TryGetValue(context, out var registeredGroup))
    {
        return registeredGroup;
    }

    this.ThrowNoWorkItemGroup(context);
    return null;
}
/// <summary>
/// Logs and throws the error raised when a non-null context has no work item group.
/// Marked NoInlining, which keeps this throw path out of callers' inlined code.
/// </summary>
/// <param name="context">The context for which no work item group was found.</param>
/// <exception cref="InvalidSchedulingContextException">Always thrown.</exception>
[MethodImpl(MethodImplOptions.NoInlining)]
private void ThrowNoWorkItemGroup(IGrainContext context)
{
    var message = string.Format(
        "QueueWorkItem was called on a non-null context {0} but there is no valid WorkItemGroup for it.",
        context);

    logger.Error(ErrorCode.SchedulerQueueWorkItemWrongContext, message);
    throw new InvalidSchedulingContextException(message);
}
/// <summary>
/// Verifies that <paramref name="context"/> is non-null and has a work item group.
/// </summary>
/// <param name="context">The scheduling context to validate.</param>
/// <exception cref="InvalidSchedulingContextException">
/// The context is null, or no work item group exists for it.
/// </exception>
[MethodImpl(MethodImplOptions.NoInlining)]
internal void CheckSchedulingContextValidity(IGrainContext context)
{
    if (context is null)
    {
        // Each fragment ends with a space so the concatenated sentences do not run together.
        throw new InvalidSchedulingContextException(
            "CheckSchedulingContextValidity was called on a null SchedulingContext. "
            + "Please make sure you are not trying to create a Timer from outside Orleans Task Scheduler, "
            + "which will be the case if you create it inside Task.Run.");
    }

    GetWorkItemGroup(context); // GetWorkItemGroup throws for Invalid context
}
/// <summary>
/// Writes the current state of the scheduler and of every registered work item group to the log.
/// </summary>
/// <param name="alwaysOutput">
/// When false, nothing is written unless the Debug log level is enabled.
/// </param>
internal void DumpSchedulerStatus(bool alwaysOutput = true)
{
    var shouldOutput = alwaysOutput || logger.IsEnabled(LogLevel.Debug);
    if (!shouldOutput)
    {
        return;
    }

    // Work on a snapshot so both reports describe the same set of groups.
    var snapshot = workgroupDirectory.ToList();
    var groupCount = snapshot.Count;

    if (logger.IsEnabled(LogLevel.Information))
    {
        var groupsByName = snapshot.Select(entry => entry.Value).OrderBy(group => group.Name);
        var stats = Utils.EnumerableToString(groupsByName, group => string.Format("--{0}", group.DumpStatus()), Environment.NewLine);
        if (stats.Length > 0)
        {
            logger.Info(ErrorCode.SchedulerStatistics,
                "OrleansTaskScheduler.PrintStatistics(): WorkItems={0}, Directory:" + Environment.NewLine + "{1}", groupCount, stats);
        }
    }

    var report = new StringBuilder();
    report.AppendLine("Dump of current OrleansTaskScheduler status:");
    report.AppendFormat(
        "CPUs={0} WorkItems={1} {2}",
        Environment.ProcessorCount,
        groupCount,
        applicationTurnsStopped ? "STOPPING" : "");
    report.AppendLine();

    // todo: either remove or support. At the time of writing this is used only in tests
    // sb.AppendLine("RunQueue:");
    // RunQueue.DumpStatus(sb); - won't work without additional costs
    // Pool.DumpStatus(sb);

    foreach (var entry in snapshot)
    {
        report.AppendLine(entry.Value.DumpStatus());
    }

    logger.Info(ErrorCode.SchedulerStatus, report.ToString());
}
}
}