Skip to content

Commit

Permalink
implement RFC 574
Browse files Browse the repository at this point in the history
- set grpc.max_receive_message_length to 17MB
- force rediscovery only when lost connection or unknown error
  • Loading branch information
thefringeninja committed Feb 14, 2022
1 parent 374da1b commit 34e797d
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 10 deletions.
7 changes: 6 additions & 1 deletion src/EventStore.Client/ChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#nullable enable
namespace EventStore.Client {
internal static class ChannelFactory {
private const int MaxReceiveMessageLength = 17 * 1024 * 1024;

public static TChannel CreateChannel(EventStoreClientSettings settings, EndPoint endPoint, bool https) =>
CreateChannel(settings, endPoint.ToUri(https));

Expand All @@ -32,7 +34,8 @@ public static TChannel CreateChannel(EventStoreClientSettings settings, Uri? add
},
LoggerFactory = settings.LoggerFactory,
Credentials = settings.ChannelCredentials,
DisposeHttpClient = true
DisposeHttpClient = true,
MaxReceiveMessageSize = MaxReceiveMessageLength
});

HttpMessageHandler CreateHandler() {
Expand All @@ -56,6 +59,8 @@ IEnumerable<ChannelOption> GetChannelOptions() {

yield return new ChannelOption("grpc.keepalive_timeout_ms",
GetValue((int)settings.ConnectivitySettings.KeepAliveTimeout.TotalMilliseconds));

yield return new ChannelOption("grpc.max_receive_message_length", MaxReceiveMessageLength);
}

static int GetValue(int value) => value switch {
Expand Down
7 changes: 5 additions & 2 deletions src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
private void ReportNewLeader<TResponse>(Task<TResponse> task) {
if (task.Exception?.InnerException is NotLeaderException ex) {
_onError(ex.LeaderEndpoint);
} else if (task.Exception?.InnerException?.InnerException is RpcException rpcException &&
rpcException.StatusCode == StatusCode.Unavailable) {
} else if (task.Exception?.InnerException is RpcException {
StatusCode: StatusCode.Unavailable or
// StatusCode.Unknown or TODO: use RPC exceptions on server
StatusCode.Aborted
}) {
_onError(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,27 @@

namespace EventStore.Client.Interceptors {
public class ReportLeaderInterceptorTests {
private static readonly Marshaller<object> _marshaller =
new Marshaller<object>(_ => Array.Empty<byte>(), _ => new object());

public delegate Task GrpcCall(Interceptor interceptor, Task<object> response = null);

private static readonly Marshaller<object> _marshaller = new(_ => Array.Empty<byte>(), _ => new object());

private static readonly StatusCode[] ForcesRediscoveryStatusCodes = {
StatusCode.Aborted,
//StatusCode.Unknown, TODO: use RPC exceptions on server
StatusCode.Unavailable
};


private static IEnumerable<GrpcCall> GrpcCalls() {
yield return MakeUnaryCall;
yield return MakeClientStreamingCall;
yield return MakeDuplexStreamingCall;
yield return MakeServerStreamingCall;
}

public static IEnumerable<object[]> TestCases() => GrpcCalls().Select(call => new object[] {call});
public static IEnumerable<object[]> ReportsNewLeaderCases() => GrpcCalls().Select(call => new object[] {call});

[Theory, MemberData(nameof(TestCases))]
[Theory, MemberData(nameof(ReportsNewLeaderCases))]
public async Task ReportsNewLeader(GrpcCall call) {
EndPoint actual = default;
var sut = new ReportLeaderInterceptor(ep => actual = ep);
Expand All @@ -34,6 +40,43 @@ public async Task ReportsNewLeader(GrpcCall call) {
Assert.Equal(result.LeaderEndpoint, actual);
}

public static IEnumerable<object[]> ForcesRediscoveryCases() => from call in GrpcCalls()
from statusCode in ForcesRediscoveryStatusCodes
select new object[] {call, statusCode};

[Theory, MemberData(nameof(ForcesRediscoveryCases))]
public async Task ForcesRediscovery(GrpcCall call, StatusCode statusCode) {
EndPoint actual = default;
bool invoked = false;

var sut = new ReportLeaderInterceptor(ep => {
invoked = true;
actual = ep;
});

var result = await Assert.ThrowsAsync<RpcException>(() => call(sut,
Task.FromException<object>(new RpcException(new Status(statusCode, "oops")))));
Assert.Null(actual);
Assert.True(invoked);
}

public static IEnumerable<object[]> DoesNotForceRediscoveryCases() => from call in GrpcCalls()
from statusCode in Enum.GetValues(typeof(StatusCode))
.OfType<StatusCode>()
.Except(ForcesRediscoveryStatusCodes)
select new object[] {call, statusCode};

[Theory, MemberData(nameof(DoesNotForceRediscoveryCases))]
public async Task DoesNotForceRediscovery(GrpcCall call, StatusCode statusCode) {
bool invoked = false;
var sut = new ReportLeaderInterceptor(ep => invoked = true);

var result = await Assert.ThrowsAsync<RpcException>(() => call(sut,
Task.FromException<object>(new RpcException(new Status(statusCode, "oops")))));
Assert.False(invoked);
}


private static async Task MakeUnaryCall(Interceptor interceptor, Task<object> response = null) {
using var call = interceptor.AsyncUnaryCall(new object(),
CreateClientInterceptorContext(MethodType.Unary),
Expand Down Expand Up @@ -73,8 +116,7 @@ private static async Task MakeDuplexStreamingCall(Interceptor interceptor, Task<
private static void OnDispose() { }

private static ClientInterceptorContext<object, object> CreateClientInterceptorContext(MethodType methodType) =>
new ClientInterceptorContext<object, object>(
new Method<object, object>(methodType, string.Empty, string.Empty, _marshaller, _marshaller),
new(new Method<object, object>(methodType, string.Empty, string.Empty, _marshaller, _marshaller),
null, new CallOptions(new Metadata()));

private class TestAsyncStreamReader : IAsyncStreamReader<object> {
Expand Down

0 comments on commit 34e797d

Please sign in to comment.