Skip to content

Commit

Permalink
Merge pull request #52 from thefringeninja/channel-options
Browse files Browse the repository at this point in the history
Use gRPC Auth Pipeline Instead of Metadata
  • Loading branch information
shaan1337 authored Jul 1, 2020
2 parents 65edf78 + 27118d1 commit 540fd9f
Show file tree
Hide file tree
Showing 20 changed files with 101 additions and 74 deletions.
34 changes: 34 additions & 0 deletions src/EventStore.Client.Common/EventStoreClientSettingsExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.Threading;
using Grpc.Core;

#nullable enable
namespace EventStore.Client {
internal static class EventStoreCallOptions {
public static CallOptions Create(EventStoreClientSettings settings,
EventStoreClientOperationOptions operationOptions, UserCredentials? userCredentials,
CancellationToken cancellationToken) => new CallOptions(
cancellationToken: cancellationToken,
deadline: DeadlineAfter(operationOptions.TimeoutAfter),
headers: new Metadata(),
credentials: CallCredentials.FromInterceptor(async (context, metadata) => {
var credentials = settings.DefaultCredentials ?? userCredentials;
if (credentials == null) {
return;
}
var authorizationHeader = await settings.OperationOptions
.GetAuthenticationHeaderValue(credentials, CancellationToken.None)
.ConfigureAwait(false);
metadata.Add(Constants.Headers.Authorization, authorizationHeader);
})
);

private static DateTime? DeadlineAfter(TimeSpan? timeoutAfter) => !timeoutAfter.HasValue
? new DateTime?()
: timeoutAfter.Value == TimeSpan.MaxValue || timeoutAfter.Value == Timeout.InfiniteTimeSpan
? DateTime.MaxValue
: DateTime.UtcNow.Add(timeoutAfter.Value);
}
}
13 changes: 0 additions & 13 deletions src/EventStore.Client.Common/RequestMetadata.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,29 @@ public partial class EventStoreOperationsClient {
public async Task ShutdownAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await _client.ShutdownAsync(EmptyResult, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
cancellationToken: cancellationToken);
await _client.ShutdownAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}

public async Task MergeIndexesAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await _client.MergeIndexesAsync(EmptyResult, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
cancellationToken: cancellationToken);
await _client.MergeIndexesAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}

public async Task ResignNodeAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await _client.ResignNodeAsync(EmptyResult, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
cancellationToken: cancellationToken);
await _client.ResignNodeAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}

public async Task SetNodePriorityAsync(int nodePriority,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await _client.SetNodePriorityAsync(new SetNodePriorityReq {Priority = nodePriority},
RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
cancellationToken: cancellationToken);
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ public async Task<DatabaseScavengeResult> StartScavengeAsync(
}

var result = await _client.StartScavengeAsync(new StartScavengeReq {
Options = new StartScavengeReq.Types.Options {
ThreadCount = threadCount,
StartFromChunk = startFromChunk
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
cancellationToken: cancellationToken);
Options = new StartScavengeReq.Types.Options {
ThreadCount = threadCount,
StartFromChunk = startFromChunk
}
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));

return result.ScavengeResult switch {
ScavengeResp.Types.ScavengeResult.Started => DatabaseScavengeResult.Started(result.ScavengeId),
Expand All @@ -40,11 +39,10 @@ public async Task<DatabaseScavengeResult> StopScavengeAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var result = await _client.StopScavengeAsync(new StopScavengeReq {
Options = new StopScavengeReq.Types.Options {
ScavengeId = scavengeId
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
cancellationToken: cancellationToken);
Options = new StopScavengeReq.Types.Options {
ScavengeId = scavengeId
}
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));

return result.ScavengeResult switch {
ScavengeResp.Types.ScavengeResult.Started => DatabaseScavengeResult.Started(result.ScavengeId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ await _client.CreateAsync(new CreateReq {
ReadBatchSize = settings.ReadBatchSize
}
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ await _client.DeleteAsync(new DeleteReq {
StreamIdentifier = streamName,
GroupName = groupName
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public Task<PersistentSubscription> SubscribeAsync(string streamName, string gro
throw new ArgumentOutOfRangeException(nameof(bufferSize));
}

var call = _client.Read(RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
var call = _client.Read(EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));

return PersistentSubscription.Confirm(call, new ReadReq.Types.Options {
BufferSize = bufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ await _client.UpdateAsync(new UpdateReq {
ReadBatchSize = settings.ReadBatchSize
}
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public async Task EnableAsync(string name, UserCredentials? userCredentials = nu
Options = new EnableReq.Types.Options {
Name = name
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

Expand All @@ -30,14 +30,14 @@ private async Task DisableInternalAsync(string name, bool writeCheckpoint, UserC
Name = name,
WriteCheckpoint = writeCheckpoint
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

public async Task RestartSubsystemAsync(UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await _client.RestartSubsystemAsync(new Empty(), RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
cancellationToken: cancellationToken);
await _client.RestartSubsystemAsync(new Empty(),
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public async Task CreateOneTimeAsync(string query, UserCredentials? userCredenti
OneTime = new Empty(),
Query = query
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

Expand All @@ -26,7 +26,7 @@ public async Task CreateContinuousAsync(string name, string query, bool trackEmi
},
Query = query
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

Expand All @@ -39,7 +39,7 @@ public async Task CreateTransientAsync(string name, string query, UserCredential
},
Query = query
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private async ValueTask<Value> GetResultInternalAsync(string name, string? parti
Name = name,
Partition = partition ?? string.Empty
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));

var response = await call.ResponseAsync.ConfigureAwait(false);
return response.Result;
Expand Down Expand Up @@ -89,7 +89,7 @@ private async ValueTask<Value> GetStateInternalAsync(string name, string? partit
Name = name,
Partition = partition ?? string.Empty
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));

var response = await call.ResponseAsync.ConfigureAwait(false);
return response.State;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private async IAsyncEnumerable<ProjectionDetails> ListInternalAsync(StatisticsRe
[EnumeratorCancellation] CancellationToken cancellationToken) {
using var call = _client.Statistics(new StatisticsReq {
Options = options
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials));
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));

await foreach (var projectionDetails in call.ResponseStream
.ReadAllAsync(cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ public async Task UpdateAsync(string name, string query, bool? emitEnabled = nul
}

using var call = _client.UpdateAsync(new UpdateReq {
Options = options
},
RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
Options = options
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));

await call.ResponseAsync.ConfigureAwait(false);
}
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private async Task<IWriteResult> AppendToStreamInternal(
EventStoreClientOperationOptions operationOptions,
UserCredentials? userCredentials,
CancellationToken cancellationToken) {
using var call = _client.Append(RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
DeadLine.After(operationOptions.TimeoutAfter), cancellationToken);
using var call = _client.Append(EventStoreCallOptions.Create(Settings, operationOptions,
userCredentials, cancellationToken));

IWriteResult writeResult;
try {
Expand Down
7 changes: 4 additions & 3 deletions src/EventStore.Client.Streams/EventStoreClient.Delete.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ private Task<DeleteResult> SoftDeleteAsync(
}
}, operationOptions, userCredentials, cancellationToken);

private async Task<DeleteResult> DeleteInternal(DeleteReq request, EventStoreClientOperationOptions operationOptions,
private async Task<DeleteResult> DeleteInternal(DeleteReq request,
EventStoreClientOperationOptions operationOptions,
UserCredentials? userCredentials,
CancellationToken cancellationToken) {
_log.LogDebug("Deleting stream {streamName}.", request.Options.StreamIdentifier);
var result = await _client.DeleteAsync(request, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
deadline: DeadLine.After(operationOptions.TimeoutAfter), cancellationToken);
var result = await _client.DeleteAsync(request,
EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken));

return new DeleteResult(new Position(result.Position.CommitPosition, result.Position.PreparePosition));
}
Expand Down
14 changes: 7 additions & 7 deletions src/EventStore.Client.Streams/EventStoreClient.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private ReadStreamResult ReadStreamAsync(
Count = (ulong)maxCount
}
},
Settings,
operationOptions,
userCredentials,
cancellationToken);
Expand Down Expand Up @@ -130,6 +131,7 @@ public class ReadStreamResult : IAsyncEnumerable<ResolvedEvent>, IAsyncEnumerato
public ReadStreamResult(
Streams.Streams.StreamsClient client,
ReadReq request,
EventStoreClientSettings settings,
EventStoreClientOperationOptions operationOptions,
UserCredentials? userCredentials, CancellationToken cancellationToken) {
if (request.Options.CountOptionCase == ReadReq.Types.Options.CountOptionOneofCase.Count &&
Expand All @@ -145,10 +147,9 @@ public ReadStreamResult(

request.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()};
_moved = false;
_call = client.Read(
request, RequestMetadata.Create(userCredentials),
deadline: DeadLine.After(operationOptions.TimeoutAfter), cancellationToken).ResponseStream
.ReadAllAsync().GetAsyncEnumerator();
_call = client.Read(request,
EventStoreCallOptions.Create(settings, operationOptions, userCredentials, cancellationToken))
.ResponseStream.ReadAllAsync().GetAsyncEnumerator();

ReadState = GetStateInternal();

Expand Down Expand Up @@ -231,9 +232,8 @@ private static Exception ExceptionFromState(ReadState state, string streamName)

request.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()};

using var call = _client.Read(
request, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
deadline: DeadLine.After(operationOptions.TimeoutAfter), cancellationToken);
using var call = _client.Read(request,
EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken));

await foreach (var e in call.ResponseStream
.ReadAllAsync(cancellationToken)
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Client.Streams/EventStoreClient.Tombstone.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ private async Task<DeleteResult> TombstoneInternal(TombstoneReq request,
CancellationToken cancellationToken) {
_log.LogDebug("Tombstoning stream {streamName}.", request.Options.StreamIdentifier);

var result = await _client.TombstoneAsync(request, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
deadline: DeadLine.After(operationOptions.TimeoutAfter), cancellationToken);
var result = await _client.TombstoneAsync(request,
EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken));

return new DeleteResult(new Position(result.Position.CommitPosition, result.Position.PreparePosition));
}
Expand Down
Loading

0 comments on commit 540fd9f

Please sign in to comment.