Skip to content

Commit

Permalink
Merge pull request #194 from thefringeninja/rfc-032
Browse files Browse the repository at this point in the history
Implement RFC 032 gRPC Client Deadlines
  • Loading branch information
timothycoleman authored Feb 18, 2022
2 parents 24fa50c + cf2c284 commit 786a246
Show file tree
Hide file tree
Showing 166 changed files with 900 additions and 987 deletions.
4 changes: 2 additions & 2 deletions samples/appending-events/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ private static async Task AppendWithConcurrencyCheck(EventStoreClient client) {
await client.AppendToStreamAsync("concurrency-stream", StreamRevision.None,
new[] {new EventData(Uuid.NewUuid(), "-", ReadOnlyMemory<byte>.Empty)});
#region append-with-concurrency-check

var clientOneRead = client.ReadStreamAsync(
Direction.Forwards,
"concurrency-stream",
StreamPosition.Start,
configureOperationOptions: options => options.ThrowOnAppendFailure = false);
StreamPosition.Start);
var clientOneRevision = (await clientOneRead.LastAsync()).Event.EventNumber.ToUInt64();

var clientTwoRead = client.ReadStreamAsync(Direction.Forwards, "concurrency-stream", StreamPosition.Start);
Expand Down
8 changes: 4 additions & 4 deletions samples/persistent-subscriptions/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ await client.CreateAsync(
"test-stream",
"subscription-group",
settings,
userCredentials);
userCredentials: userCredentials);
#endregion create-persistent-subscription-to-stream
}

Expand Down Expand Up @@ -56,7 +56,7 @@ await client.CreateToAllAsync(
"subscription-group",
filter,
settings,
userCredentials);
userCredentials: userCredentials);
#endregion create-persistent-subscription-to-all
}

Expand Down Expand Up @@ -101,7 +101,7 @@ await client.UpdateAsync(
"test-stream",
"subscription-group",
settings,
userCredentials);
userCredentials: userCredentials);
#endregion update-persistent-subscription
}

Expand All @@ -111,7 +111,7 @@ static async Task DeletePersistentSubscription(EventStorePersistentSubscriptions
await client.DeleteAsync(
"test-stream",
"subscription-group",
userCredentials);
userCredentials: userCredentials);
#endregion delete-persistent-subscription
}

Expand Down
19 changes: 15 additions & 4 deletions src/EventStore.Client.Common/EventStoreCallOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@
using System.Threading;
using Grpc.Core;
using Timeout_ = System.Threading.Timeout;

#nullable enable
namespace EventStore.Client {
internal static class EventStoreCallOptions {
public static CallOptions Create(EventStoreClientSettings settings,
EventStoreClientOperationOptions operationOptions, UserCredentials? userCredentials,
CancellationToken cancellationToken) => new(
// deadline falls back to infinity
public static CallOptions CreateStreaming(EventStoreClientSettings settings,
TimeSpan? deadline = null, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
Create(settings, deadline, userCredentials, cancellationToken);

// deadline falls back to connection DefaultDeadline
public static CallOptions CreateNonStreaming(EventStoreClientSettings settings, TimeSpan? deadline,
UserCredentials? userCredentials, CancellationToken cancellationToken) => Create(settings,
deadline ?? settings.DefaultDeadline, userCredentials, cancellationToken);

private static CallOptions Create(EventStoreClientSettings settings, TimeSpan? deadline,
UserCredentials? userCredentials, CancellationToken cancellationToken) => new(
cancellationToken: cancellationToken,
deadline: DeadlineAfter(operationOptions.TimeoutAfter),
deadline: DeadlineAfter(deadline),
headers: new Metadata {
{
Constants.Headers.RequiresLeader,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client.Operations;
Expand All @@ -10,82 +11,93 @@ public partial class EventStoreOperationsClient {
/// <summary>
/// Shuts down the EventStoreDB node.
/// </summary>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task ShutdownAsync(
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).ShutdownAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Initiates an index merge operation.
/// </summary>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task MergeIndexesAsync(
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).MergeIndexesAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Resigns a node.
/// </summary>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task ResignNodeAsync(
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).ResignNodeAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Sets the node priority.
/// </summary>
/// <param name="nodePriority"></param>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task SetNodePriorityAsync(int nodePriority,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).SetNodePriorityAsync(
new SetNodePriorityReq {Priority = nodePriority},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Restart persistent subscriptions
/// </summary>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task RestartPersistentSubscriptions(UserCredentials? userCredentials = null,
public async Task RestartPersistentSubscriptions(
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).RestartPersistentSubscriptionsAsync(
EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ public partial class EventStoreOperationsClient {
/// </summary>
/// <param name="threadCount"></param>
/// <param name="startFromChunk"></param>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public async Task<DatabaseScavengeResult> StartScavengeAsync(
int threadCount = 1,
int startFromChunk = 0,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
if (threadCount <= 0) {
Expand All @@ -37,8 +39,7 @@ public async Task<DatabaseScavengeResult> StartScavengeAsync(
StartFromChunk = startFromChunk
}
},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
var result = await call.ResponseAsync.ConfigureAwait(false);

return result.ScavengeResult switch {
Expand All @@ -53,11 +54,13 @@ public async Task<DatabaseScavengeResult> StartScavengeAsync(
/// Stops a scavenge operation.
/// </summary>
/// <param name="scavengeId"></param>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<DatabaseScavengeResult> StopScavengeAsync(
string scavengeId,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
Expand All @@ -66,7 +69,7 @@ public async Task<DatabaseScavengeResult> StopScavengeAsync(
Options = new StopScavengeReq.Types.Options {
ScavengeId = scavengeId
}
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}, EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));

return result.ScavengeResult switch {
ScavengeResp.Types.ScavengeResult.Started => DatabaseScavengeResult.Started(result.ScavengeId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System;
using System.Net.Http;
using EventStore.Client;
using EventStore.Client.Operations;
using Grpc.Core.Interceptors;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,54 @@ private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position posi
/// <param name="streamName"></param>
/// <param name="groupName"></param>
/// <param name="settings"></param>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public async Task CreateAsync(string streamName, string groupName,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
public async Task CreateAsync(string streamName, string groupName, PersistentSubscriptionSettings settings,
TimeSpan? deadline = null, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(
streamName: streamName,
groupName: groupName,
eventFilter: null,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
await CreateInternalAsync(streamName, groupName, null, settings, deadline, userCredentials,
cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Creates a filtered persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="eventFilter"></param>
/// <param name="settings"></param>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CreateToAllAsync(string groupName, IEventFilter eventFilter,
PersistentSubscriptionSettings settings, TimeSpan? deadline = null, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(SystemStreams.AllStream, groupName, eventFilter, settings, deadline,
userCredentials, cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Creates a persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="settings"></param>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CreateToAllAsync(string groupName, PersistentSubscriptionSettings settings,
TimeSpan? deadline = null, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(SystemStreams.AllStream, groupName, null, settings, deadline, userCredentials,
cancellationToken)
.ConfigureAwait(false);

private async Task CreateInternalAsync(string streamName, string groupName, IEventFilter? eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
PersistentSubscriptionSettings settings, TimeSpan? deadline, UserCredentials? userCredentials,
CancellationToken cancellationToken) {
if (streamName == null) {
throw new ArgumentNullException(nameof(streamName));
}
Expand All @@ -154,13 +183,13 @@ private async Task CreateInternalAsync(string streamName, string groupName, IEve
}

if (streamName != SystemStreams.AllStream && settings.StartFrom != null &&
!(settings.StartFrom is StreamPosition)) {
settings.StartFrom is not StreamPosition) {
throw new ArgumentException(
$"{nameof(settings.StartFrom)} must be of type '{nameof(StreamPosition)}' when subscribing to a stream");
}

if (streamName == SystemStreams.AllStream && settings.StartFrom != null &&
!(settings.StartFrom is Position)) {
settings.StartFrom is not Position) {
throw new ArgumentException(
$"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}");
}
Expand Down Expand Up @@ -214,49 +243,8 @@ private async Task CreateInternalAsync(string streamName, string groupName, IEve
ReadBatchSize = settings.ReadBatchSize
}
}
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}, EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Creates a filtered persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="eventFilter"></param>
/// <param name="settings"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CreateToAllAsync(string groupName, IEventFilter eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: eventFilter,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Creates a persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="settings"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CreateToAllAsync(string groupName,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: null,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}
Loading

0 comments on commit 786a246

Please sign in to comment.