Skip to content

Commit

Permalink
Re-implement InitCacheFlushTimeout option and tests (#1644)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjohnsonpint authored May 6, 2022
1 parent eab8935 commit 0978be0
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 83 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Fixes

- Rework how the `InitCacheFlushTimeout` option is implemented. ([#1644](https://github.com/getsentry/sentry-dotnet/pull/1644))

## 3.17.0

**Notice:** If you are using self-hosted Sentry, this version and forward requires either Sentry version >= [21.9.0](https://github.com/getsentry/relay/blob/master/CHANGELOG.md#2190), or you must manually disable sending client reports via the `SendClientReports` option.
Expand Down
31 changes: 31 additions & 0 deletions src/Sentry/Internal/Http/CachingTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ internal class CachingTransport : ITransport, IAsyncDisposable, IDisposable
private readonly CancellationTokenSource _workerCts = new();
private Task _worker = null!;

private ManualResetEventSlim? _initCacheResetEvent;

// Inner transport exposed internally primarily for testing
internal ITransport InnerTransport => _innerTransport;

Expand Down Expand Up @@ -80,6 +82,32 @@ private void Initialize(bool startWorker)

// Start a worker, if one is needed
_worker = startWorker ? Task.Run(CachedTransportBackgroundTaskAsync) : Task.CompletedTask;

// Wait for init timeout, if configured. (Can't do this without a worker.)
if (startWorker && _options.InitCacheFlushTimeout > TimeSpan.Zero)
{
_options.LogDebug("Blocking initialization to flush the cache.");

using (_initCacheResetEvent = new ManualResetEventSlim())
{
// This will complete either when the first round of processing is done,
// or on timeout, whichever comes first.
var completed = _initCacheResetEvent.Wait(_options.InitCacheFlushTimeout);
if (completed)
{
_options.LogDebug("Completed flushing the cache. Resuming initialization.");
}
else
{
_options.LogDebug(
$"InitCacheFlushTimeout of {_options.InitCacheFlushTimeout} reached. " +
"Resuming initialization. Cache will continue flushing in the background.");
}
}

// We're done with this. Set null to avoid object disposed exceptions on future processing calls.
_initCacheResetEvent = null;
}
}

private async Task CachedTransportBackgroundTaskAsync()
Expand Down Expand Up @@ -185,6 +213,9 @@ private async Task ProcessCacheAsync(CancellationToken cancellation)
{
await InnerProcessCacheAsync(file, cancellation).ConfigureAwait(false);
}

// Signal that we can continue with initialization, if we're using _options.InitCacheFlushTimeout
_initCacheResetEvent?.Set();
}

private async Task InnerProcessCacheAsync(string file, CancellationToken cancellation)
Expand Down
51 changes: 1 addition & 50 deletions src/Sentry/Internal/SdkComposer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ private ITransport CreateTransport()
// When a cache directory path is given, wrap the transport in a caching transport.
if (!string.IsNullOrWhiteSpace(_options.CacheDirectoryPath))
{
var cachingTransport = CachingTransport.Create(transport, _options);
BlockCacheFlush(cachingTransport);
transport = cachingTransport;
transport = CachingTransport.Create(transport, _options);
}

// Always persist the transport on the options, so other places can pick it up where necessary.
Expand All @@ -52,53 +50,6 @@ private HttpTransport CreateHttpTransport()
return new HttpTransport(_options, httpClient);
}

internal void BlockCacheFlush(CachingTransport transport)
{
// If configured, flush existing cache
if (_options.InitCacheFlushTimeout > TimeSpan.Zero)
{
_options.LogDebug(
"Flushing existing cache during transport activation up to {0}.",
_options.InitCacheFlushTimeout);

try
{
// Flush cache but block on it only for a limited amount of time.
// If we don't flush it in time, then let it continue to run on the
// background but don't block the calling thread any more than set timeout.
#if NET6_0_OR_GREATER
transport.FlushAsync().WaitAsync(_options.InitCacheFlushTimeout)
// Block calling thread (Init) until either Flush or Timeout is reached
.GetAwaiter().GetResult();
#else
var timeoutTask = Task.Delay(_options.InitCacheFlushTimeout);
var flushTask = transport.FlushAsync();

// If flush finished in time, finalize the task by awaiting it to
// propagate potential exceptions.
if (Task.WhenAny(timeoutTask, flushTask).GetAwaiter().GetResult() == flushTask)
{
flushTask.GetAwaiter().GetResult();
}
// If flush timed out, log and continue
else
{
_options.LogInfo(
"Cache flushing is taking longer than the configured timeout of {0}. " +
"Continuing without waiting for the task to finish.",
_options.InitCacheFlushTimeout);
}
#endif
}
catch (Exception ex)
{
_options.LogError(
"Cache flushing failed.",
ex);
}
}
}

public IBackgroundWorker CreateBackgroundWorker()
{
if (_options.BackgroundWorker is { } worker)
Expand Down
21 changes: 16 additions & 5 deletions test/Sentry.Testing/FakeTransport.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
using System.Collections.Concurrent;

namespace Sentry.Testing;

internal class FakeTransport : ITransport, IDisposable
{
private readonly List<Envelope> _envelopes = new();
private readonly TimeSpan _artificialDelay;
private readonly ConcurrentQueue<Envelope> _envelopes = new();

public event EventHandler<Envelope> EnvelopeSent;

public virtual Task SendEnvelopeAsync(
public FakeTransport(TimeSpan artificialDelay = default)
{
_artificialDelay = artificialDelay;
}

public virtual async Task SendEnvelopeAsync(
Envelope envelope,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

_envelopes.Add(envelope);
EnvelopeSent?.Invoke(this, envelope);
if (_artificialDelay > TimeSpan.Zero)
{
await Task.Delay(_artificialDelay, CancellationToken.None);
}

return Task.CompletedTask;
_envelopes.Enqueue(envelope);
EnvelopeSent?.Invoke(this, envelope);
}

public IReadOnlyList<Envelope> GetSentEnvelopes() => _envelopes.ToArray();
Expand Down
98 changes: 70 additions & 28 deletions test/Sentry.Tests/SentrySdkTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using System.Reflection;
using Sentry.Internal.Http;
using Sentry.Internal.ScopeStack;
Expand Down Expand Up @@ -234,44 +235,85 @@ public void Init_MultipleCalls_ReplacesHubWithLatest()
second.Dispose();
}

[Fact(Skip = "Flaky (after review)")]
public async Task Init_WithCache_BlocksUntilExistingCacheIsFlushed()
[Theory]
[InlineData(true)]
[InlineData(false)]
[InlineData(null)]
public async Task Init_WithCache_BlocksUntilExistingCacheIsFlushed(bool? testDelayWorking)
{
// Arrange
using var cacheDirectory = new TempDirectory();
var cachePath = cacheDirectory.Path;

// Pre-populate cache
var initialInnerTransport = Substitute.For<ITransport>();
await using var initialTransport = CachingTransport.Create(initialInnerTransport, new SentryOptions
{
DiagnosticLogger = _logger,
Dsn = ValidDsnWithoutSecret,
CacheDirectoryPath = cachePath
}, startWorker: false);
const int numEnvelopes = 3;
for (var i = 0; i < numEnvelopes; i++)
{
using var envelope = Envelope.FromEvent(new SentryEvent());
await initialTransport.SendEnvelopeAsync(envelope);
}

// Setup the transport to be slow.
// NOTE: This must be slow enough for CI or the tests will fail. If the test becomes flaky, increase the timeout.
// We are testing the timing delay behavior, so there's no alternative that will suffice.
var processingDelayPerEnvelope = TimeSpan.FromSeconds(2);
var transport = new FakeTransport(processingDelayPerEnvelope);

// Set the timeout for the desired result
var initFlushTimeout = testDelayWorking switch
{
true => TimeSpan.FromTicks(processingDelayPerEnvelope.Ticks * (numEnvelopes + 1)),
false => TimeSpan.FromTicks((long)(processingDelayPerEnvelope.Ticks * 1.9)), // not quite 2, since we want 1 envelope
null => TimeSpan.Zero
};

// Act
SentryOptions options = null;
try
{
// Pre-populate cache
var initialInnerTransport = new FakeFailingTransport();
await using var initialTransport = CachingTransport.Create(initialInnerTransport, new SentryOptions
var stopwatch = Stopwatch.StartNew();

using var _ = SentrySdk.Init(o =>
{
DiagnosticLogger = _logger,
Dsn = ValidDsnWithoutSecret,
CacheDirectoryPath = cacheDirectory.Path
}, startWorker: false);
o.Dsn = ValidDsnWithoutSecret;
o.DiagnosticLogger = _logger;
o.CacheDirectoryPath = cachePath;
o.InitCacheFlushTimeout = initFlushTimeout;
o.Transport = transport;
options = o;
});

stopwatch.Stop();

// Assert
var actualCount = transport.GetSentEnvelopes().Count;
var expectedCount = testDelayWorking switch
{
true => numEnvelopes, // We waited long enough to have them all
false => 1, // We only waited long enough to have one
null => 0 // We shouldn't have any, as we didn't ask to flush the cache on init
};

Assert.Equal(expectedCount, actualCount);

for (var i = 0; i < 3; i++)
if (testDelayWorking is true)
{
using var envelope = Envelope.FromEvent(new SentryEvent());
await initialTransport.SendEnvelopeAsync(envelope);
Assert.True(stopwatch.Elapsed < initFlushTimeout, "Should not have waited for the entire timeout!");
}
}

// Act
using var transport = new FakeTransport();
using var _ = SentrySdk.Init(o =>
finally
{
o.Dsn = ValidDsnWithoutSecret;
o.DiagnosticLogger = _logger;
o.CacheDirectoryPath = cacheDirectory.Path;
o.InitCacheFlushTimeout = TimeSpan.FromSeconds(30);
o.Transport = transport;
});

// Assert
Directory
.EnumerateFiles(cacheDirectory.Path, "*", SearchOption.AllDirectories)
.ToArray()
.Should().BeEmpty();
// cleanup to avoid disposing/deleting the temp directory while the cache worker is still running
var cachingTransport = (CachingTransport) options!.Transport;
await cachingTransport!.StopWorkerAsync();
}
}

[Fact]
Expand Down

0 comments on commit 0978be0

Please sign in to comment.