Skip to content

Commit

Permalink
feat: Allow handling of timer thread exceptions in timed buffers.
Browse files Browse the repository at this point in the history
Closes #2182
  • Loading branch information
amanda-tarafa committed Feb 2, 2023
1 parent 195e99a commit 957f4b4
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,29 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Google.Cloud.Diagnostics.AspNetCore3", "Google.Cloud.Diagnostics.AspNetCore3\Google.Cloud.Diagnostics.AspNetCore3.csproj", "{23D8A49D-947C-4CA6-B551-87B2BDECD3A1}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Google.Cloud.Diagnostics.AspNetCore3", "Google.Cloud.Diagnostics.AspNetCore3\Google.Cloud.Diagnostics.AspNetCore3.csproj", "{23D8A49D-947C-4CA6-B551-87B2BDECD3A1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Google.Cloud.Diagnostics.Common", "..\Google.Cloud.Diagnostics.Common\Google.Cloud.Diagnostics.Common\Google.Cloud.Diagnostics.Common.csproj", "{CC6B069E-A4F1-4DED-8BC7-1E670C48E3DA}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Google.Cloud.Diagnostics.Common", "..\Google.Cloud.Diagnostics.Common\Google.Cloud.Diagnostics.Common\Google.Cloud.Diagnostics.Common.csproj", "{CC6B069E-A4F1-4DED-8BC7-1E670C48E3DA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Google.Cloud.Diagnostics.AspNetCore3.IntegrationTests", "Google.Cloud.Diagnostics.AspNetCore3.IntegrationTests\Google.Cloud.Diagnostics.AspNetCore3.IntegrationTests.csproj", "{CCE7A221-2FCD-46B4-BB45-8173B3475138}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Google.Cloud.Diagnostics.AspNetCore3.IntegrationTests", "Google.Cloud.Diagnostics.AspNetCore3.IntegrationTests\Google.Cloud.Diagnostics.AspNetCore3.IntegrationTests.csproj", "{CCE7A221-2FCD-46B4-BB45-8173B3475138}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Google.Cloud.ClientTesting", "..\..\tools\Google.Cloud.ClientTesting\Google.Cloud.ClientTesting.csproj", "{48F406D2-8BEC-4FCC-A7AC-CE2A1BE9E5BF}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Google.Cloud.ClientTesting", "..\..\tools\Google.Cloud.ClientTesting\Google.Cloud.ClientTesting.csproj", "{48F406D2-8BEC-4FCC-A7AC-CE2A1BE9E5BF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Google.Cloud.Diagnostics.Common.IntegrationTests", "..\Google.Cloud.Diagnostics.Common\Google.Cloud.Diagnostics.Common.IntegrationTests\Google.Cloud.Diagnostics.Common.IntegrationTests.csproj", "{5D5E2B8B-04D4-457D-95DA-26C13351AD70}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Google.Cloud.Diagnostics.Common.IntegrationTests", "..\Google.Cloud.Diagnostics.Common\Google.Cloud.Diagnostics.Common.IntegrationTests\Google.Cloud.Diagnostics.Common.IntegrationTests.csproj", "{5D5E2B8B-04D4-457D-95DA-26C13351AD70}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Google.Cloud.Diagnostics.Common.Tests", "..\Google.Cloud.Diagnostics.Common\Google.Cloud.Diagnostics.Common.Tests\Google.Cloud.Diagnostics.Common.Tests.csproj", "{06AAB8AE-1833-44A9-92B7-6590AFEC8542}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Google.Cloud.Diagnostics.Common.Tests", "..\Google.Cloud.Diagnostics.Common\Google.Cloud.Diagnostics.Common.Tests\Google.Cloud.Diagnostics.Common.Tests.csproj", "{06AAB8AE-1833-44A9-92B7-6590AFEC8542}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Google.Cloud.Diagnostics.AspNetCore3.Snippets", "Google.Cloud.Diagnostics.AspNetCore3.Snippets\Google.Cloud.Diagnostics.AspNetCore3.Snippets.csproj", "{A1ADC7D3-BFF9-4FA2-AF06-AACA6CFF46E8}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Google.Cloud.Diagnostics.AspNetCore3.Snippets", "Google.Cloud.Diagnostics.AspNetCore3.Snippets\Google.Cloud.Diagnostics.AspNetCore3.Snippets.csproj", "{A1ADC7D3-BFF9-4FA2-AF06-AACA6CFF46E8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Google.Cloud.Diagnostics.AspNetCore3.Tests", "Google.Cloud.Diagnostics.AspNetCore3.Tests\Google.Cloud.Diagnostics.AspNetCore3.Tests.csproj", "{7E9F338F-95C6-4C2B-A3B2-979D4D34918F}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Google.Cloud.Diagnostics.AspNetCore3.Tests", "Google.Cloud.Diagnostics.AspNetCore3.Tests\Google.Cloud.Diagnostics.AspNetCore3.Tests.csproj", "{7E9F338F-95C6-4C2B-A3B2-979D4D34918F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Google.Cloud.Diagnostics.Common.Snippets", "..\Google.Cloud.Diagnostics.Common\Google.Cloud.Diagnostics.Common.Snippets\Google.Cloud.Diagnostics.Common.Snippets.csproj", "{D2D3D836-9D94-4F4E-B167-796AB085F4C8}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{23D8A49D-947C-4CA6-B551-87B2BDECD3A1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{23D8A49D-947C-4CA6-B551-87B2BDECD3A1}.Debug|Any CPU.Build.0 = Debug|Any CPU
Expand Down Expand Up @@ -60,5 +59,15 @@ Global
{7E9F338F-95C6-4C2B-A3B2-979D4D34918F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7E9F338F-95C6-4C2B-A3B2-979D4D34918F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7E9F338F-95C6-4C2B-A3B2-979D4D34918F}.Release|Any CPU.Build.0 = Release|Any CPU
{D2D3D836-9D94-4F4E-B167-796AB085F4C8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D2D3D836-9D94-4F4E-B167-796AB085F4C8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D2D3D836-9D94-4F4E-B167-796AB085F4C8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D2D3D836-9D94-4F4E-B167-796AB085F4C8}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {891EF638-28FB-453F-B9AD-6D73AC83EF93}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Google LLC
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,10 +74,32 @@ public static IHostBuilder CreateHostBuilder() =>
// End sample
}

private static class TimedBufferHandlesTimerException
{
// Sample: TimedBufferHandlesTimerException
public static IHostBuilder CreateHostBuilder() =>
new HostBuilder()
.ConfigureServices(services =>
{
// Replace ProjectId with your Google Cloud Project ID.
// Replace Service with a name or identifier for the service.
// Replace Version with a version for the service.
services.AddGoogleDiagnostics(ProjectId, Service, Version,
// Configure the three components so that exceptions thrown in the timer thread
// of timed buffers are handled by the given handler.
traceOptions: TraceOptions.Create(bufferOptions: BufferOptions.TimedBuffer().WithTimerExceptionHandler(ex => Console.WriteLine(ex))),
loggingOptions: LoggingOptions.Create(bufferOptions: BufferOptions.TimedBuffer().WithTimerExceptionHandler(ex => Console.WriteLine(ex))),
errorReportingOptions: ErrorReportingOptions.Create(bufferOptions: BufferOptions.TimedBuffer().WithTimerExceptionHandler(ex => Console.WriteLine(ex))));
// Register other services here if you need them.
});
// End sample
}

public static TheoryData<Func<IHostBuilder>> HostBuilders => new TheoryData<Func<IHostBuilder>>
{
DefaultHostBuilder.CreateHostBuilder,
NoBuffersHostBuilder.CreateHostBuilder
NoBuffersHostBuilder.CreateHostBuilder,
TimedBufferHandlesTimerException.CreateHostBuilder,
};

[Theory]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016 Google Inc. All Rights Reserved.
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Google.Api.Gax;
using Grpc.Core;
using Moq;
using System;
using System.Collections.Generic;
Expand All @@ -31,7 +29,7 @@ public class TimedBufferingConsumerTest
public void Receive()
{
var mockConsumer = new Mock<IConsumer<int>>();
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime);
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime, null);
consumer.Receive(new[] { 1, 2, 3, 4, 5 });
mockConsumer.Verify(c => c.Receive(It.IsAny<IEnumerable<int>>()), Times.Never());
}
Expand All @@ -42,7 +40,7 @@ public void Flush()
int[] intArray = { 1, 2, 3, 4 };
var mockConsumer = new Mock<IConsumer<int>>();
mockConsumer.Setup(c => c.Receive(intArray));
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime);
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime, null);

consumer.Receive(intArray);
mockConsumer.Verify(c => c.Receive(It.IsAny<IEnumerable<int>>()), Times.Never());
Expand All @@ -56,7 +54,7 @@ public void Flush_NoTraces()
{
var mockConsumer = new Mock<IConsumer<int>>();
mockConsumer.Setup(c => c.Receive(new int[] { }));
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime);
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime, null);

consumer.Receive(new int[] { });
consumer.Flush();
Expand All @@ -67,7 +65,7 @@ public void Flush_NoTraces()
public async Task ReceiveAsync()
{
var mockConsumer = new Mock<IConsumer<int>>();
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime);
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime, null);
await consumer.ReceiveAsync(new[] { 1, 2, 3, 4, 5 }, CancellationToken.None);
mockConsumer.Verify(c => c.ReceiveAsync(
It.IsAny<IEnumerable<int>>(), CancellationToken.None), Times.Never());
Expand All @@ -81,7 +79,7 @@ public async Task FlushAsync()
var mockConsumer = new Mock<IConsumer<int>>();
mockConsumer.Setup(c => c.ReceiveAsync(
intArray, CancellationToken.None)).Returns(CommonUtils.CompletedTask);
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime);
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime, null);

await consumer.ReceiveAsync(intArray, CancellationToken.None);
mockConsumer.Verify(c => c.ReceiveAsync(
Expand All @@ -97,7 +95,7 @@ public async Task FlushAsync_NoTraces()
var mockConsumer = new Mock<IConsumer<int>>();
mockConsumer.Setup(c => c.ReceiveAsync(
new int[] { }, CancellationToken.None));
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime);
var consumer = TimedBufferingConsumer<int>.Create(mockConsumer.Object, _waitTime, null);

await consumer.ReceiveAsync(new int[] { }, CancellationToken.None);
await consumer.FlushAsync();
Expand All @@ -110,7 +108,7 @@ public void Timer()
{
var mockConsumer = new Mock<IConsumer<int>>();
var fakeTimer = new FakeThreadingTimer();
var consumer = new TimedBufferingConsumer<int>(mockConsumer.Object, _waitTime, fakeTimer);
var consumer = new TimedBufferingConsumer<int>(mockConsumer.Object, _waitTime, null, fakeTimer);

var intArray = new[] { 1, 2, 3, 4, 5 };
consumer.Receive(intArray);
Expand All @@ -129,12 +127,29 @@ public void Timer_SwallowException()
{
var mockConsumer = new Mock<IConsumer<int>>();
var fakeTimer = new FakeThreadingTimer();
var consumer = new TimedBufferingConsumer<int>(mockConsumer.Object, _waitTime, fakeTimer);
var consumer = new TimedBufferingConsumer<int>(mockConsumer.Object, _waitTime, null, fakeTimer);

mockConsumer.Setup(c => c.Receive(It.IsAny<IEnumerable<int>>())).Throws(new Exception());
consumer.Receive(new[] { 1, 2, 3, 4, 5 });
fakeTimer.FullTick();
mockConsumer.Verify(c => c.Receive(It.IsAny<IEnumerable<int>>()), Times.Once());
}

[Fact]
public void Timer_HandleException()
{
var mockConsumer = new Mock<IConsumer<int>>();
var fakeTimer = new FakeThreadingTimer();
int exceptionsCaught = 0;
var consumer = new TimedBufferingConsumer<int>(mockConsumer.Object, _waitTime, HandleException, fakeTimer);

mockConsumer.Setup(c => c.Receive(It.IsAny<IEnumerable<int>>())).Throws(new Exception());
consumer.Receive(new[] { 1, 2, 3, 4, 5 });
fakeTimer.FullTick();
mockConsumer.Verify(c => c.Receive(It.IsAny<IEnumerable<int>>()), Times.Once());
Assert.Equal(1, exceptionsCaught);

void HandleException(Exception exception) => exceptionsCaught++;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016 Google Inc. All Rights Reserved.
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,21 @@ public sealed class BufferOptions
/// <summary>The time to wait before the buffer is flushed, only used for <see cref="BufferType.Timed"/></summary>
public TimeSpan BufferWaitTime { get; }

internal BufferOptions(BufferType bufferType, int? bufferSizeBytes = null, TimeSpan? bufferWaitTime = null)
/// <summary>
/// An action that will be called whenever an exception is thrown during a timed flush of the buffer.
/// May be null. Only used for <see cref="BufferType.Timed"/>.
/// </summary>
/// <remarks>
/// Throwing an exception within the timer thread risks inmediately crashing the applicaiton, so we
/// never do that, even if <see cref="RetryOptions.ExceptionHandling"/> has been set to
/// <see cref="ExceptionHandling.Propagate"/> because propagating exceptions is different than crashing
/// an application.
/// Instead, client code can specify a value for this action, that will be called when an exception is thrown
/// within the timer thread. Client code may decide what to do in such cases.
/// </remarks>
public Action<Exception> TimerExceptionHandler { get; }

internal BufferOptions(BufferType bufferType, int? bufferSizeBytes = null, TimeSpan? bufferWaitTime = null, Action<Exception> timerExceptionHandler = null)
{
BufferType = bufferType;
BufferSizeBytes = bufferSizeBytes ?? default;
Expand All @@ -73,14 +87,23 @@ internal BufferOptions(BufferType bufferType, int? bufferSizeBytes = null, TimeS
/// Create <see cref="BufferOptions"/> for <see cref="BufferType.Sized"/>
/// </summary>
/// <param name="bufferSizeBytes">Optional, The buffer size in bytes.</param>
public static BufferOptions SizedBuffer(int bufferSizeBytes = DefaultBufferSize)
=> new BufferOptions(BufferType.Sized, bufferSizeBytes);
public static BufferOptions SizedBuffer(int bufferSizeBytes = DefaultBufferSize) =>
new BufferOptions(BufferType.Sized, bufferSizeBytes);

/// <summary>
/// Create <see cref="BufferOptions"/> for <see cref="BufferType.Timed"/>
/// </summary>
/// <param name="bufferWaitTime">Optional, The minimum amount of time between flushes.</param>
public static BufferOptions TimedBuffer(TimeSpan? bufferWaitTime = null)
=> new BufferOptions(BufferType.Timed, null, bufferWaitTime ?? DefaultWaitTime);
public static BufferOptions TimedBuffer(TimeSpan? bufferWaitTime = null) =>
new BufferOptions(BufferType.Timed, null, bufferWaitTime ?? DefaultWaitTime);

/// <summary>
/// Creates a new <see cref="BufferOptions"/> instance which is identical to this one
/// but with the given timer exception handler.
/// </summary>
public BufferOptions WithTimerExceptionHandler(Action<Exception> handler) =>
BufferType == BufferType.Timed
? new BufferOptions(BufferType.Timed, null, BufferWaitTime, handler)
: throw new InvalidOperationException($"This buffer is of type {BufferType.Sized} and timer exception handlers can only be added to buffers of type {BufferType.Timed}");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016 Google Inc. All Rights Reserved.
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,7 @@ internal static IConsumer<T> GetConsumer(IConsumer<T> consumer, Func<T, int> siz
case BufferType.Sized:
return SizedBufferingConsumer<T>.Create(retryConsumer, sizer, bufferOptions.BufferSizeBytes);
case BufferType.Timed:
return TimedBufferingConsumer<T>.Create(retryConsumer, bufferOptions.BufferWaitTime);
return TimedBufferingConsumer<T>.Create(retryConsumer, bufferOptions.BufferWaitTime, bufferOptions.TimerExceptionHandler);
case BufferType.None:
return retryConsumer;
default:
Expand Down
Loading

0 comments on commit 957f4b4

Please sign in to comment.