Skip to content

Commit

Permalink
support keep alives
Browse files Browse the repository at this point in the history
  • Loading branch information
thefringeninja committed Jan 11, 2021
1 parent a63eaa1 commit e24ea8b
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 49 deletions.
33 changes: 29 additions & 4 deletions src/EventStore.Client/ChannelFactory.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
using System;
using System.Net;
using Grpc.Core;

#if !GRPC_CORE
using System.Net.Http;
using System.Threading;
using Grpc.Net.Client;
#else
using System.Collections.Generic;
#endif

#nullable enable
namespace EventStore.Client {
internal static class ChannelFactory {
Expand All @@ -16,13 +20,13 @@ public static ChannelBase CreateChannel(EventStoreClientSettings settings, Uri?
address ??= settings.ConnectivitySettings.Address;

#if !GRPC_CORE
if (address.Scheme == Uri.UriSchemeHttp ||
!settings.ConnectivitySettings.GossipOverHttps) {
if (address.Scheme == Uri.UriSchemeHttp ||!settings.ConnectivitySettings.GossipOverHttps) {
//this must be switched on before creation of the HttpMessageHandler
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
}

return GrpcChannel.ForAddress(address, new GrpcChannelOptions {
HttpClient = new HttpClient(settings.CreateHttpMessageHandler?.Invoke() ?? new SocketsHttpHandler(),
HttpClient = new HttpClient(CreateHandler(),
true) {
Timeout = Timeout.InfiniteTimeSpan,
DefaultRequestVersion = new Version(2, 0),
Expand All @@ -31,8 +35,29 @@ public static ChannelBase CreateChannel(EventStoreClientSettings settings, Uri?
Credentials = settings.ChannelCredentials,
DisposeHttpClient = true
});

HttpMessageHandler CreateHandler() {
if (settings.CreateHttpMessageHandler != null) {
return settings.CreateHttpMessageHandler.Invoke();
}

var handler = new SocketsHttpHandler();
if (settings.ConnectivitySettings.KeepAlive.HasValue) {
handler.KeepAlivePingDelay = settings.ConnectivitySettings.KeepAlive.Value;
}

return handler;
}
#else
return new Channel(address.Host, address.Port, settings.ChannelCredentials ?? ChannelCredentials.Insecure);
return new Channel(address.Host, address.Port, settings.ChannelCredentials ?? ChannelCredentials.Insecure,
GetChannelOptions());

IEnumerable<ChannelOption> GetChannelOptions() {
if (settings.ConnectivitySettings.KeepAlive.HasValue) {
yield return new ChannelOption("grpc.keepalive_time_ms",
(int)settings.ConnectivitySettings.KeepAlive.Value.TotalMilliseconds);
}
}
#endif
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/EventStore.Client/EventStoreClientConnectivitySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public class EventStoreClientConnectivitySettings {
/// </summary>
public NodePreference NodePreference { get; set; }

/// <summary>
/// The optional amount of time to wait after which a keepalive ping is sent on the transport.
/// </summary>
public TimeSpan? KeepAlive { get; set; }

/// <summary>
/// True if pointing to a single EventStoreDB node.
Expand Down
57 changes: 33 additions & 24 deletions src/EventStore.Client/EventStoreClientSettings.ConnectionString.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ public partial class EventStoreClientSettings {
/// </summary>
/// <param name="connectionString"></param>
/// <returns></returns>
public static EventStoreClientSettings Create(string connectionString) {
return ConnectionStringParser.Parse(connectionString);
}
public static EventStoreClientSettings Create(string connectionString) =>
ConnectionStringParser.Parse(connectionString);

private static class ConnectionStringParser {
private const string SchemeSeparator = "://";
Expand All @@ -28,14 +27,15 @@ private static class ConnectionStringParser {
private const string QuestionMark = "?";

private const string Tls = nameof(Tls);
private const string ConnectionName = "ConnectionName";
private const string MaxDiscoverAttempts = "MaxDiscoverAttempts";
private const string DiscoveryInterval = "DiscoveryInterval";
private const string GossipTimeout = "GossipTimeout";
private const string NodePreference = "NodePreference";
private const string TlsVerifyCert = "TlsVerifyCert";
private const string OperationTimeout = "OperationTimeout";
private const string ThrowOnAppendFailure = "ThrowOnAppendFailure";
private const string ConnectionName = nameof(ConnectionName);
private const string MaxDiscoverAttempts = nameof(MaxDiscoverAttempts);
private const string DiscoveryInterval = nameof(DiscoveryInterval);
private const string GossipTimeout = nameof(GossipTimeout);
private const string NodePreference = nameof(NodePreference);
private const string TlsVerifyCert = nameof(TlsVerifyCert);
private const string OperationTimeout = nameof(OperationTimeout);
private const string ThrowOnAppendFailure = nameof(ThrowOnAppendFailure);
private const string KeepAlive = nameof(KeepAlive);

private const string UriSchemeDiscover = "esdb+discover";

Expand All @@ -53,7 +53,8 @@ private static class ConnectionStringParser {
{Tls, typeof(bool)},
{TlsVerifyCert, typeof(bool)},
{OperationTimeout, typeof(int)},
{ThrowOnAppendFailure, typeof(bool)}
{ThrowOnAppendFailure, typeof(bool)},
{KeepAlive, typeof(int)}
};

public static EventStoreClientSettings Parse(string connectionString) {
Expand Down Expand Up @@ -159,24 +160,16 @@ private static EventStoreClientSettings CreateSettings(string scheme, (string us
useTls = (bool)tls;
}

if (typedOptions.TryGetValue(TlsVerifyCert, out object tlsVerifyCert)) {
if (!(bool)tlsVerifyCert) {
#if !GRPC_CORE
settings.CreateHttpMessageHandler = () => new SocketsHttpHandler {
SslOptions = {
RemoteCertificateValidationCallback = delegate { return true; }
}
};
#endif
}
}

if (typedOptions.TryGetValue(OperationTimeout, out object operationTimeout))
settings.OperationOptions.TimeoutAfter = TimeSpan.FromMilliseconds((int)operationTimeout);

if (typedOptions.TryGetValue(ThrowOnAppendFailure, out object throwOnAppendFailure))
settings.OperationOptions.ThrowOnAppendFailure = (bool)throwOnAppendFailure;

if (typedOptions.TryGetValue(KeepAlive, out var keepAliveMs)) {
settings.ConnectivitySettings.KeepAlive = TimeSpan.FromMilliseconds((int)keepAliveMs);
}

if (hosts.Length == 1 && scheme != UriSchemeDiscover) {
connSettings.Address = hosts[0].ToUri(useTls);
} else {
Expand All @@ -189,6 +182,22 @@ private static EventStoreClientSettings CreateSettings(string scheme, (string us
connSettings.GossipOverHttps = useTls;
}

#if !GRPC_CORE
settings.CreateHttpMessageHandler = () => {
var handler = new SocketsHttpHandler();

if (typedOptions.TryGetValue(TlsVerifyCert, out var tlsVerifyCert) && !(bool)tlsVerifyCert) {
handler.SslOptions.RemoteCertificateValidationCallback = delegate { return true; };
}

if (settings.ConnectivitySettings.KeepAlive.HasValue) {
handler.KeepAlivePingDelay = settings.ConnectivitySettings.KeepAlive.Value;
}

return handler;
};
#endif

return settings;
}

Expand Down
45 changes: 24 additions & 21 deletions test/EventStore.Client.Tests/ConnectionStringTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public void connection_string_with_duplicate_key_should_throw(string connectionS
InlineData("esdb://user:[email protected]/?discoveryInterval=abcd"),
InlineData("esdb://user:[email protected]/?gossipTimeout=defg"),
InlineData("esdb://user:[email protected]/?tlsVerifyCert=truee"),
InlineData("esdb://user:[email protected]/?nodePreference=blabla")]
InlineData("esdb://user:[email protected]/?nodePreference=blabla"),
InlineData("esdb://user:[email protected]/?keepAlive=blabla")]
public void connection_string_with_invalid_settings_should_throw(string connectionString) {
Assert.Throws<InvalidSettingException>(() => EventStoreClientSettings.Create(connectionString));
}
Expand All @@ -107,10 +108,8 @@ public void with_different_node_preferences(string nodePreference, NodePreferenc

[Fact]
public void with_valid_single_node_connection_string() {
EventStoreClientSettings settings;

settings = EventStoreClientSettings.Create(
"esdb://user:[email protected]/?maxDiscoverAttempts=13&DiscoveryInterval=37&gossipTimeout=33&nOdEPrEfErence=FoLLoWer&tlsVerifyCert=false");
var settings = EventStoreClientSettings.Create(
"esdb://user:[email protected]/?maxDiscoverAttempts=13&DiscoveryInterval=37&gossipTimeout=33&nOdEPrEfErence=FoLLoWer&tlsVerifyCert=false&keepAlive=10");
Assert.Equal("user", settings.DefaultCredentials.Username);
Assert.Equal("pass", settings.DefaultCredentials.Password);
Assert.Equal("https://127.0.0.1:2113/", settings.ConnectivitySettings.Address.ToString());
Expand All @@ -121,14 +120,15 @@ public void with_valid_single_node_connection_string() {
Assert.Equal(37, settings.ConnectivitySettings.DiscoveryInterval.TotalMilliseconds);
Assert.Equal(33, settings.ConnectivitySettings.GossipTimeout.TotalMilliseconds);
Assert.Equal(NodePreference.Follower, settings.ConnectivitySettings.NodePreference);
Assert.Equal(TimeSpan.FromMilliseconds(10), settings.ConnectivitySettings.KeepAlive);
#if !GRPC_CORE
#if !GRPC_CORE
Assert.NotNull(settings.CreateHttpMessageHandler);
#endif

#endif
settings = EventStoreClientSettings.Create(
"esdb://127.0.0.1?connectionName=test&maxDiscoverAttempts=13&DiscoveryInterval=37&nOdEPrEfErence=FoLLoWer&tls=true&tlsVerifyCert=true&operationTimeout=330&throwOnAppendFailure=faLse");
"esdb://127.0.0.1?connectionName=test&maxDiscoverAttempts=13&DiscoveryInterval=37&nOdEPrEfErence=FoLLoWer&tls=true&tlsVerifyCert=true&operationTimeout=330&throwOnAppendFailure=faLse&KEepAlive=10");
Assert.Null(settings.DefaultCredentials);
Assert.Equal("test", settings.ConnectionName);
Assert.Equal("https://127.0.0.1:2113/", settings.ConnectivitySettings.Address.ToString());
Expand All @@ -139,8 +139,9 @@ public void with_valid_single_node_connection_string() {
Assert.Equal(13, settings.ConnectivitySettings.MaxDiscoverAttempts);
Assert.Equal(37, settings.ConnectivitySettings.DiscoveryInterval.TotalMilliseconds);
Assert.Equal(NodePreference.Follower, settings.ConnectivitySettings.NodePreference);
Assert.Equal(TimeSpan.FromMilliseconds(10), settings.ConnectivitySettings.KeepAlive);
#if !GRPC_CORE
Assert.Null(settings.CreateHttpMessageHandler);
Assert.NotNull(settings.CreateHttpMessageHandler);
#endif

Assert.Equal(330, settings.OperationOptions.TimeoutAfter.Value.TotalMilliseconds);
Expand All @@ -153,15 +154,15 @@ public void with_valid_single_node_connection_string() {
Assert.Null(settings.ConnectivitySettings.IpGossipSeeds);
Assert.Null(settings.ConnectivitySettings.DnsGossipSeeds);
Assert.True(settings.ConnectivitySettings.GossipOverHttps);
Assert.Null(settings.ConnectivitySettings.KeepAlive);
#if !GRPC_CORE
Assert.Null(settings.CreateHttpMessageHandler);
Assert.NotNull(settings.CreateHttpMessageHandler);
#endif
}

[Fact]
public void with_default_settings() {
EventStoreClientSettings settings;
settings = EventStoreClientSettings.Create("esdb://hostname:4321/");
var settings = EventStoreClientSettings.Create("esdb://hostname:4321/");

Assert.Null(settings.ConnectionName);
Assert.Equal(EventStoreClientConnectivitySettings.Default.Address.Scheme,
Expand All @@ -179,18 +180,18 @@ public void with_default_settings() {
settings.ConnectivitySettings.NodePreference);
Assert.Equal(EventStoreClientConnectivitySettings.Default.GossipOverHttps,
settings.ConnectivitySettings.GossipOverHttps);
Assert.Equal(EventStoreClientOperationOptions.Default.TimeoutAfter.Value.TotalMilliseconds,
settings.OperationOptions.TimeoutAfter.Value.TotalMilliseconds);
Assert.Equal(EventStoreClientOperationOptions.Default.TimeoutAfter!.Value.TotalMilliseconds,
settings.OperationOptions.TimeoutAfter!.Value.TotalMilliseconds);
Assert.Equal(EventStoreClientOperationOptions.Default.ThrowOnAppendFailure,
settings.OperationOptions.ThrowOnAppendFailure);
Assert.Equal(EventStoreClientConnectivitySettings.Default.KeepAlive,
settings.ConnectivitySettings.KeepAlive);
}

[Fact]
public void with_valid_cluster_connection_string() {
EventStoreClientSettings settings;

settings = EventStoreClientSettings.Create(
"esdb://user:[email protected],127.0.0.2:3321,127.0.0.3/?maxDiscoverAttempts=13&DiscoveryInterval=37&nOdEPrEfErence=FoLLoWer&tlsVerifyCert=false");
var settings = EventStoreClientSettings.Create(
"esdb://user:[email protected],127.0.0.2:3321,127.0.0.3/?maxDiscoverAttempts=13&DiscoveryInterval=37&nOdEPrEfErence=FoLLoWer&tlsVerifyCert=false&KEepAlive=10");
Assert.Equal("user", settings.DefaultCredentials.Username);
Assert.Equal("pass", settings.DefaultCredentials.Password);
Assert.NotEmpty(settings.ConnectivitySettings.GossipSeeds);
Expand All @@ -207,13 +208,14 @@ public void with_valid_cluster_connection_string() {
Assert.Equal(13, settings.ConnectivitySettings.MaxDiscoverAttempts);
Assert.Equal(37, settings.ConnectivitySettings.DiscoveryInterval.TotalMilliseconds);
Assert.Equal(NodePreference.Follower, settings.ConnectivitySettings.NodePreference);
Assert.Equal(TimeSpan.FromMilliseconds(10), settings.ConnectivitySettings.KeepAlive);
#if !GRPC_CORE
Assert.NotNull(settings.CreateHttpMessageHandler);
#endif


settings = EventStoreClientSettings.Create(
"esdb://user:pass@host1,host2:3321,127.0.0.3/?tls=false&maxDiscoverAttempts=13&DiscoveryInterval=37&nOdEPrEfErence=FoLLoWer&tlsVerifyCert=false");
"esdb://user:pass@host1,host2:3321,127.0.0.3/?tls=false&maxDiscoverAttempts=13&DiscoveryInterval=37&nOdEPrEfErence=FoLLoWer&tlsVerifyCert=false&KEepAlive=10");
Assert.Equal("user", settings.DefaultCredentials.Username);
Assert.Equal("pass", settings.DefaultCredentials.Password);
Assert.NotEmpty(settings.ConnectivitySettings.GossipSeeds);
Expand All @@ -230,6 +232,7 @@ public void with_valid_cluster_connection_string() {
Assert.Equal(13, settings.ConnectivitySettings.MaxDiscoverAttempts);
Assert.Equal(37, settings.ConnectivitySettings.DiscoveryInterval.TotalMilliseconds);
Assert.Equal(NodePreference.Follower, settings.ConnectivitySettings.NodePreference);
Assert.Equal(TimeSpan.FromMilliseconds(10), settings.ConnectivitySettings.KeepAlive);
#if !GRPC_CORE
Assert.NotNull(settings.CreateHttpMessageHandler);
#endif
Expand Down Expand Up @@ -264,19 +267,19 @@ public void with_different_tls_verify_cert_settings() {
EventStoreClientSettings settings;

settings = EventStoreClientSettings.Create("esdb://127.0.0.1/");
Assert.Null(settings.CreateHttpMessageHandler);
Assert.NotNull(settings.CreateHttpMessageHandler);

settings = EventStoreClientSettings.Create("esdb://127.0.0.1/?tlsVerifyCert=TrUe");
Assert.Null(settings.CreateHttpMessageHandler);
Assert.NotNull(settings.CreateHttpMessageHandler);

settings = EventStoreClientSettings.Create("esdb://127.0.0.1/?tlsVerifyCert=FaLsE");
Assert.NotNull(settings.CreateHttpMessageHandler);

settings = EventStoreClientSettings.Create("esdb://127.0.0.1,127.0.0.2:3321,127.0.0.3/");
Assert.Null(settings.CreateHttpMessageHandler);
Assert.NotNull(settings.CreateHttpMessageHandler);

settings = EventStoreClientSettings.Create("esdb://127.0.0.1,127.0.0.2:3321,127.0.0.3/?tlsVerifyCert=true");
Assert.Null(settings.CreateHttpMessageHandler);
Assert.NotNull(settings.CreateHttpMessageHandler);

settings = EventStoreClientSettings.Create(
"esdb://127.0.0.1,127.0.0.2:3321,127.0.0.3/?tlsVerifyCert=false");
Expand Down

0 comments on commit e24ea8b

Please sign in to comment.