-
-
Notifications
You must be signed in to change notification settings - Fork 287
/
ResourceReaper.cs
465 lines (386 loc) · 17.6 KB
/
ResourceReaper.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
namespace DotNet.Testcontainers.Containers
{
using System;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Clients;
using DotNet.Testcontainers.Configurations;
using DotNet.Testcontainers.Images;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
/// <summary>
/// The Resource Reaper takes care of the remaining Docker resources and removes them: https://dotnet.testcontainers.org/api/resource-reaper/.
/// </summary>
[PublicAPI]
public sealed class ResourceReaper : IAsyncDisposable
{
/// <summary>
/// The label key of the filter that is sent to Ryuk; the label value is the session id.
/// </summary>
public const string ResourceReaperSessionLabel = TestcontainersClient.TestcontainersLabel + ".resource-reaper-session";
/// <summary>
/// The container port that the constructor binds and that is resolved to the mapped public port before connecting to Ryuk.
/// </summary>
private const ushort RyukPort = 8080;
/// <summary>
/// 60 seconds connection timeout.
/// </summary>
private const int ConnectionTimeoutInSeconds = 60;
/// <summary>
/// 2 seconds retry timeout.
/// </summary>
private const int RetryTimeoutInSeconds = 2;
/// <summary>
/// The image the default instance falls back to when <c>TestcontainersSettings.ResourceReaperImage</c> is <c>null</c>.
/// </summary>
private static readonly IImage RyukImage = new DockerImage("testcontainers/ryuk:0.6.0");
/// <summary>
/// Serializes the creation of the default instance (initial count 1, maximum count 1).
/// </summary>
private static readonly SemaphoreSlim DefaultLock = new SemaphoreSlim(1, 1);
/// <summary>
/// Linger option (enabled, zero seconds) that is assigned to every <see cref="TcpClient" /> connecting to Ryuk.
/// </summary>
private static readonly LingerOption DiscardAllPendingData = new LingerOption(true, 0);
/// <summary>
/// The default instance; <c>null</c> until the default Resource Reaper has been started for the first time.
/// </summary>
private static ResourceReaper _defaultInstance;
/// <summary>
/// Cancelled (and afterwards disposed) on disposal to stop the task that maintains the connection to Ryuk.
/// </summary>
private readonly CancellationTokenSource _maintainConnectionCts = new CancellationTokenSource();
/// <summary>
/// The Ryuk container that the constructor configures and builds.
/// </summary>
private readonly IContainer _resourceReaperContainer;
/// <summary>
/// The task that maintains the connection to Ryuk; a completed task until the connection loop has been started.
/// </summary>
private Task _maintainConnectionTask = Task.CompletedTask;
/// <summary>
/// Set on the first disposal; further disposals return immediately and a disposed default instance gets replaced.
/// </summary>
private bool _disposed;
// Intentionally empty static constructor.
// NOTE(review): presumably declared explicitly so the compiler does not mark the type as beforefieldinit,
// which would make the static field initializers run no earlier than the first access to the type - confirm before removing.
static ResourceReaper()
{
}
/// <summary>
/// Initializes a new instance of the <see cref="ResourceReaper" /> class.
/// </summary>
/// <remarks>
/// The Ryuk container is only configured and built here; it is not started.
/// </remarks>
/// <param name="sessionId">The session id; it becomes part of the container name.</param>
/// <param name="dockerEndpointAuthConfig">The Docker endpoint authentication configuration.</param>
/// <param name="resourceReaperImage">The Resource Reaper image.</param>
/// <param name="dockerSocket">The Docker socket mount.</param>
/// <param name="logger">The logger.</param>
/// <param name="requiresPrivilegedMode">True if the container requires privileged mode, otherwise false.</param>
private ResourceReaper(Guid sessionId, IDockerEndpointAuthenticationConfiguration dockerEndpointAuthConfig, IImage resourceReaperImage, IMount dockerSocket, ILogger logger, bool requiresPrivilegedMode)
{
  SessionId = sessionId;

  // Identity and lifecycle: the container removes itself and is excluded from the regular clean-up.
  var ryukBuilder = new ContainerBuilder()
    .WithName($"testcontainers-ryuk-{sessionId:D}")
    .WithDockerEndpoint(dockerEndpointAuthConfig)
    .WithImage(resourceReaperImage)
    .WithPrivileged(requiresPrivilegedMode)
    .WithAutoRemove(true)
    .WithCleanUp(false);

  // Connectivity: publish the Ryuk port on the configured host port and mount the Docker socket.
  var publicHostPort = TestcontainersSettings.ResourceReaperPublicHostPort.Invoke(dockerEndpointAuthConfig);

  _resourceReaperContainer = ryukBuilder
    .WithPortBinding(publicHostPort, RyukPort)
    .WithMount(dockerSocket)
    .WithLogger(logger)
    .Build();
}
/// <summary>
/// Occurs when a Resource Reaper state has changed.
/// </summary>
/// <remarks>
/// It emits state changes to uninitialized instances too.
/// The event is static and is raised with a <c>null</c> sender; the affected instance is passed to the <see cref="ResourceReaperStateEventArgs" /> constructor.
/// </remarks>
[PublicAPI]
public static event EventHandler<ResourceReaperStateEventArgs> StateChanged;
/// <summary>
/// Gets the default <see cref="ResourceReaper" /> session id.
/// </summary>
/// <remarks>
/// The default <see cref="ResourceReaper" /> will start either on <see cref="GetAndStartDefaultAsync(IDockerEndpointAuthenticationConfiguration, ILogger, bool, CancellationToken)" />
/// or if a <see cref="IContainer" /> is configured with <see cref="IAbstractBuilder{TBuilderEntity, TContainerEntity, TCreateResourceEntity}.WithCleanUp" />.
/// The value is generated by the property initializer with <see cref="Guid.NewGuid" /> and is used as the session id of the default instance.
/// </remarks>
[PublicAPI]
public static Guid DefaultSessionId { get; }
= Guid.NewGuid();
/// <summary>
/// Gets the <see cref="ResourceReaper" /> session id.
/// </summary>
/// <remarks>
/// Assigned in the constructor. The id is part of the Ryuk container name and of the label filter that is sent to Ryuk.
/// </remarks>
[PublicAPI]
public Guid SessionId { get; }
/// <summary>
/// Returns the default <see cref="ResourceReaper" /> instance and starts it first if necessary.
/// </summary>
/// <remarks>
/// An existing default instance is reused as long as it has not been disposed. Returns <c>null</c> without starting anything if <paramref name="isWindowsEngineEnabled" /> is <c>true</c>.
/// </remarks>
/// <param name="dockerEndpointAuthConfig">The Docker endpoint authentication configuration.</param>
/// <param name="logger">The logger.</param>
/// <param name="isWindowsEngineEnabled">Determines whether the Windows engine is enabled or not.</param>
/// <param name="ct">The cancellation token to cancel the <see cref="ResourceReaper" /> initialization.</param>
/// <returns>Task that completes when the <see cref="ResourceReaper" /> has been started.</returns>
[PublicAPI]
public static async Task<ResourceReaper> GetAndStartDefaultAsync(IDockerEndpointAuthenticationConfiguration dockerEndpointAuthConfig, ILogger logger, bool isWindowsEngineEnabled = false, CancellationToken ct = default)
{
  bool HasUsableDefaultInstance()
  {
    return _defaultInstance != null && !_defaultInstance._disposed;
  }

  if (isWindowsEngineEnabled)
  {
    return null;
  }

  // Fast path: no need to wait for the lock if a usable instance already exists.
  if (HasUsableDefaultInstance())
  {
    return _defaultInstance;
  }

  await DefaultLock.WaitAsync(ct)
    .ConfigureAwait(false);

  try
  {
    // Another caller may have created the instance while we were waiting for the lock.
    if (!HasUsableDefaultInstance())
    {
      var image = TestcontainersSettings.ResourceReaperImage ?? RyukImage;

      var privileged = TestcontainersSettings.ResourceReaperPrivilegedModeEnabled;

      _defaultInstance = await GetAndStartNewAsync(DefaultSessionId, dockerEndpointAuthConfig, image, new UnixSocketMount(dockerEndpointAuthConfig.Endpoint), logger, privileged, ct: ct)
        .ConfigureAwait(false);
    }

    return _defaultInstance;
  }
  finally
  {
    DefaultLock.Release();
  }
}
/// <inheritdoc />
/// <remarks>
/// Only the first call has an effect. It cancels the task that maintains the connection to Ryuk, waits for that task to finish
/// and then disposes the Ryuk container. The container is disposed even if waiting for the task throws.
/// </remarks>
[PublicAPI]
public async ValueTask DisposeAsync()
{
  if (_disposed)
  {
    return;
  }

  _disposed = true;

  try
  {
    try
    {
      _maintainConnectionCts.Cancel();

      // Close connection before disposing Resource Reaper.
      await _maintainConnectionTask
        .ConfigureAwait(false);
    }
    finally
    {
      _maintainConnectionCts.Dispose();
    }
  }
  finally
  {
    // Runs in a finally block so that a faulted maintenance task (or a throwing cancellation callback)
    // cannot skip the disposal of the Ryuk container.
    if (_resourceReaperContainer != null)
    {
      await _resourceReaperContainer.DisposeAsync()
        .ConfigureAwait(false);
    }
  }
}
/// <summary>
/// Starts and returns a new <see cref="ResourceReaper" /> instance that uses a freshly generated session id.
/// </summary>
/// <remarks>
/// Forwards all arguments unchanged to the overload that takes an explicit session id.
/// </remarks>
/// <param name="dockerEndpointAuthConfig">The Docker endpoint authentication configuration.</param>
/// <param name="resourceReaperImage">The Resource Reaper image.</param>
/// <param name="dockerSocket">The Docker socket.</param>
/// <param name="logger">The logger.</param>
/// <param name="requiresPrivilegedMode">True if the container requires privileged mode, otherwise false.</param>
/// <param name="initTimeout">The timeout to initialize the Ryuk connection (Default: <inheritdoc cref="ConnectionTimeoutInSeconds" />).</param>
/// <param name="ct">The cancellation token to cancel the <see cref="ResourceReaper" /> initialization.</param>
/// <returns>Task that completes when the <see cref="ResourceReaper" /> has been started.</returns>
[PublicAPI]
private static Task<ResourceReaper> GetAndStartNewAsync(IDockerEndpointAuthenticationConfiguration dockerEndpointAuthConfig, IImage resourceReaperImage, IMount dockerSocket, ILogger logger, bool requiresPrivilegedMode = false, TimeSpan initTimeout = default, CancellationToken ct = default)
{
  var newSessionId = Guid.NewGuid();

  return GetAndStartNewAsync(
    newSessionId,
    dockerEndpointAuthConfig,
    resourceReaperImage,
    dockerSocket,
    logger,
    requiresPrivilegedMode,
    initTimeout,
    ct);
}
/// <summary>
/// Starts and returns a new <see cref="ResourceReaper" /> instance.
/// </summary>
/// <remarks>
/// Starts the Ryuk container, starts the task that maintains the connection to Ryuk and waits until that task reports
/// a successful initialization. If any step fails, is cancelled or exceeds <paramref name="initTimeout" />, the new
/// instance is disposed and the exception is rethrown.
/// </remarks>
/// <param name="sessionId">The session id.</param>
/// <param name="dockerEndpointAuthConfig">The Docker endpoint authentication configuration.</param>
/// <param name="resourceReaperImage">The Resource Reaper image.</param>
/// <param name="dockerSocket">The Docker socket.</param>
/// <param name="logger">The logger.</param>
/// <param name="requiresPrivilegedMode">True if the container requires privileged mode, otherwise false.</param>
/// <param name="initTimeout">The timeout to initialize the Ryuk connection (Default: <inheritdoc cref="ConnectionTimeoutInSeconds" />).</param>
/// <param name="ct">The cancellation token to cancel the <see cref="ResourceReaper" /> initialization.</param>
/// <returns>Task that completes when the <see cref="ResourceReaper" /> has been started.</returns>
[PublicAPI]
private static async Task<ResourceReaper> GetAndStartNewAsync(Guid sessionId, IDockerEndpointAuthenticationConfiguration dockerEndpointAuthConfig, IImage resourceReaperImage, IMount dockerSocket, ILogger logger, bool requiresPrivilegedMode = false, TimeSpan initTimeout = default, CancellationToken ct = default)
{
  // The task source is completed from within the connection-maintenance task. Without RunContinuationsAsynchronously
  // the continuation of the await below (and everything the caller does next) would run inline inside SetResult /
  // SetException, i.e. on the maintenance task, and delay its keep-alive loop.
  var ryukInitializedTaskSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

  var resourceReaper = new ResourceReaper(sessionId, dockerEndpointAuthConfig, resourceReaperImage, dockerSocket, logger, requiresPrivilegedMode);

  // A default (zero) timeout means "not specified"; fall back to the default connection timeout.
  initTimeout = TimeSpan.Equals(default, initTimeout) ? TimeSpan.FromSeconds(ConnectionTimeoutInSeconds) : initTimeout;

  try
  {
    StateChanged?.Invoke(null, new ResourceReaperStateEventArgs(resourceReaper, ResourceReaperState.Created));

    await resourceReaper._resourceReaperContainer.StartAsync(ct)
      .ConfigureAwait(false);

    StateChanged?.Invoke(null, new ResourceReaperStateEventArgs(resourceReaper, ResourceReaperState.InitializingConnection));

    using (var initTimeoutCts = new CancellationTokenSource())
    {
      // The initialization is cancelled by whichever comes first: the caller's token or the timeout.
      using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(initTimeoutCts.Token, ct))
      {
        resourceReaper._maintainConnectionTask = resourceReaper.MaintainRyukConnection(ryukInitializedTaskSource, linkedCts.Token);

        initTimeoutCts.CancelAfter(initTimeout);

        await ryukInitializedTaskSource.Task
          .ConfigureAwait(false);
      }
    }

    StateChanged?.Invoke(null, new ResourceReaperStateEventArgs(resourceReaper, ResourceReaperState.MaintainingConnection));
  }
  catch (Exception)
  {
    // Do not leave a half-initialized instance (running container, connection task) behind.
    await resourceReaper.DisposeAsync()
      .ConfigureAwait(false);

    throw;
  }

  return resourceReaper;
}
/// <summary>
/// Tries to resolve the host and the mapped public port of the Ryuk container.
/// </summary>
/// <param name="host">The container hostname, or <c>null</c> if the endpoint could not be resolved.</param>
/// <param name="port">The public port mapped to the Ryuk port, or <c>0</c> if the endpoint could not be resolved.</param>
/// <returns>True if both values were resolved; false if resolving threw (the exception is logged).</returns>
private bool TryGetEndpoint(out string host, out ushort port)
{
  string resolvedHost;
  ushort resolvedPort;
  bool resolved;

  try
  {
    resolvedHost = _resourceReaperContainer.Hostname;
    resolvedPort = _resourceReaperContainer.GetMappedPublicPort(RyukPort);
    resolved = true;
  }
  catch (Exception e)
  {
    _resourceReaperContainer.Logger.CanNotGetResourceReaperEndpoint(SessionId, e);
    resolvedHost = null;
    resolvedPort = 0;
    resolved = false;
  }

  host = resolvedHost;
  port = resolvedPort;
  return resolved;
}
/// <summary>
/// Establishes and maintains the connection to the running Ryuk container.
///
/// This is split into two phases:
///
/// 1) Initialization
/// Tries to establish a connection to Ryuk. After establishment, sends a Docker resource filter to Ryuk. Ryuk will acknowledge the reception of the filter.
/// On termination, Ryuk will delete all Docker resources matching the filter. You can cancel the initialization with <paramref name="ct" />.
/// Failed attempts are retried after the retry timeout until the initialization succeeds or is cancelled.
///
/// 2) Maintenance
/// After initialization, we keep the connection to Ryuk open by reading from it until this instance is disposed.
/// </summary>
/// <remarks>
/// The connect loop only repeats while the initialization has not completed. If the connection is lost or closed by Ryuk
/// after a successful initialization, the loss is logged, <see cref="ResourceReaperState.ConnectionTerminated" /> is raised
/// and the method returns; it does not reconnect.
/// If the initialization does not complete, <paramref name="ryukInitializedTaskSource" /> is completed with a <see cref="ResourceReaperException" />.
/// </remarks>
/// <param name="ryukInitializedTaskSource">The task that completes after the initialization.</param>
/// <param name="ct">The cancellation token to cancel the <see cref="ResourceReaper" /> initialization. This will not cancel the maintained connection.</param>
private async Task MaintainRyukConnection(TaskCompletionSource<bool> ryukInitializedTaskSource, CancellationToken ct)
{
  connect_to_ryuk: while (!_maintainConnectionCts.IsCancellationRequested && !ct.IsCancellationRequested && !ryukInitializedTaskSource.Task.IsCompleted)
  {
    if (!TryGetEndpoint(out var host, out var port))
    {
      await Task.Delay(TimeSpan.FromSeconds(RetryTimeoutInSeconds), CancellationToken.None)
        .ConfigureAwait(false);

      continue;
    }

    using (var tcpClient = new TcpClient())
    {
      tcpClient.LingerState = DiscardAllPendingData;

      try
      {
#if NET6_0_OR_GREATER
        await tcpClient.ConnectAsync(host, port, ct)
          .ConfigureAwait(false);
#else
        await tcpClient.ConnectAsync(host, port)
          .ConfigureAwait(false);
#endif

        var stream = tcpClient.GetStream();

        var filter = $"label={ResourceReaperSessionLabel}={SessionId:D}\n";

        var sendBytes = Encoding.ASCII.GetBytes(filter);

        var readBytes = new byte[tcpClient.ReceiveBufferSize];

        if (!ryukInitializedTaskSource.Task.IsCompleted)
        {
          using (var messageBuffer = new MemoryStream())
          {
#if NETSTANDARD2_0
            await stream.WriteAsync(sendBytes, 0, sendBytes.Length, ct)
              .ConfigureAwait(false);
#else
            await stream.WriteAsync(sendBytes, ct)
              .ConfigureAwait(false);
#endif

            await stream.FlushAsync(ct)
              .ConfigureAwait(false);

            bool hasAcknowledge;

            do
            {
#if NETSTANDARD2_0
              var numberOfBytes = await stream.ReadAsync(readBytes, 0, readBytes.Length, ct)
                .ConfigureAwait(false);
#else
              var numberOfBytes = await stream.ReadAsync(readBytes, ct)
                .ConfigureAwait(false);
#endif

              if (numberOfBytes == 0)
              {
                // Even if there is no listening socket behind the bound port, the TcpClient establishes a connection.
                // If we do not receive any data, the socket is not ready yet.
                await Task.Delay(TimeSpan.FromSeconds(RetryTimeoutInSeconds), ct)
                  .ConfigureAwait(false);

#pragma warning disable S907
                goto connect_to_ryuk;
#pragma warning restore S907
              }

              // Only search the bytes received by this read. The buffer is reused, so everything beyond
              // numberOfBytes is stale data from an earlier read (or zeros) and must not be interpreted.
              var indexOfNewLine = Array.IndexOf(readBytes, (byte)'\n', 0, numberOfBytes);

              if (indexOfNewLine == -1)
              {
                // We have not received the entire message yet. Read from stream again.
                await messageBuffer.WriteAsync(readBytes, 0, numberOfBytes, ct)
                  .ConfigureAwait(false);

                hasAcknowledge = false;
              }
              else
              {
                await messageBuffer.WriteAsync(readBytes, 0, indexOfNewLine, ct)
                  .ConfigureAwait(false);

                hasAcknowledge = "ack".Equals(Encoding.ASCII.GetString(messageBuffer.ToArray()), StringComparison.OrdinalIgnoreCase);

                // Start over with an empty buffer in case the line was not the acknowledgement.
                messageBuffer.SetLength(0);
              }
            }
            while (!hasAcknowledge);

            ryukInitializedTaskSource.SetResult(true);
          }
        }

        while (!_maintainConnectionCts.IsCancellationRequested)
        {
          // Keep the connection to Ryuk up.
#if NETSTANDARD2_0
          var keepAliveBytes = await stream.ReadAsync(readBytes, 0, readBytes.Length, _maintainConnectionCts.Token)
            .ConfigureAwait(false);
#else
          var keepAliveBytes = await stream.ReadAsync(readBytes, _maintainConnectionCts.Token)
            .ConfigureAwait(false);
#endif

          if (keepAliveBytes == 0)
          {
            // A zero-byte read means the remote side closed the connection. Every further read would return
            // immediately as well and turn this loop into a busy loop, so treat it as a lost connection.
            throw new EndOfStreamException("The Resource Reaper closed the connection.");
          }
        }
      }
      catch (OperationCanceledException)
      {
        // Ignore cancellation.
      }
      catch (SocketException e)
      {
        _resourceReaperContainer.Logger.CanNotConnectToResourceReaper(SessionId, host, port, e);

        await Task.Delay(TimeSpan.FromSeconds(RetryTimeoutInSeconds), CancellationToken.None)
          .ConfigureAwait(false);
      }
      catch (Exception e)
      {
        _resourceReaperContainer.Logger.LostConnectionToResourceReaper(SessionId, host, port, e);

        await Task.Delay(TimeSpan.FromSeconds(RetryTimeoutInSeconds), CancellationToken.None)
          .ConfigureAwait(false);
      }
    }
  }

  // The initialization succeeded earlier, so leaving the loop means the maintained connection has ended.
  if (ryukInitializedTaskSource.Task.IsCompleted)
  {
    StateChanged?.Invoke(null, new ResourceReaperStateEventArgs(this, ResourceReaperState.ConnectionTerminated));
    return;
  }

  // The initialization never completed: report the reason to the awaiting caller.
  if (ct.IsCancellationRequested)
  {
    ryukInitializedTaskSource.SetException(new ResourceReaperException("Initialization has been cancelled."));
  }
  else
  {
    ryukInitializedTaskSource.SetException(new ResourceReaperException("Initialization failed."));
  }
}
/// <summary>
/// Read-only bind mount that makes the Docker socket of the host available inside the Ryuk container.
/// </summary>
private sealed class UnixSocketMount : IMount
{
  private const string DockerSocketFilePath = "/var/run/docker.sock";

  /// <summary>
  /// Initializes a new instance of the <see cref="UnixSocketMount" /> class.
  /// </summary>
  /// <param name="dockerEndpoint">The Docker endpoint the socket path is derived from.</param>
  public UnixSocketMount([NotNull] Uri dockerEndpoint)
  {
    // A Unix socket endpoint carries the socket path in the URI; any other scheme falls back to the default path.
    string endpointSocketPath;

    if ("unix".Equals(dockerEndpoint.Scheme, StringComparison.OrdinalIgnoreCase))
    {
      endpointSocketPath = dockerEndpoint.AbsolutePath;
    }
    else
    {
      endpointSocketPath = DockerSocketFilePath;
    }

    // A user-specified override wins over the path derived from the endpoint.
    var socketOverride = TestcontainersSettings.DockerSocketOverride;

    Source = string.IsNullOrEmpty(socketOverride) ? endpointSocketPath : socketOverride;
    Target = DockerSocketFilePath;
  }

  /// <summary>
  /// Gets the mount type, which is always a bind mount.
  /// </summary>
  public MountType Type
  {
    get { return MountType.Bind; }
  }

  /// <summary>
  /// Gets the access mode, which is always read-only.
  /// </summary>
  public AccessMode AccessMode
  {
    get { return AccessMode.ReadOnly; }
  }

  /// <summary>
  /// Gets the socket path on the host.
  /// </summary>
  public string Source { get; }

  /// <summary>
  /// Gets the socket path inside the container.
  /// </summary>
  public string Target { get; }

  /// <summary>
  /// Does nothing; returns a completed task.
  /// </summary>
  public Task CreateAsync(CancellationToken ct = default)
    => Task.CompletedTask;

  /// <summary>
  /// Does nothing; returns a completed task.
  /// </summary>
  public Task DeleteAsync(CancellationToken ct = default)
    => Task.CompletedTask;
}
}
}