Skip to content

Commit

Permalink
Add filtered persistent subscriptions (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
hayley-jean authored Jun 8, 2021
1 parent 5c97e6d commit a5a483d
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ message CreateReq {
event_store.client.Empty start = 2;
event_store.client.Empty end = 3;
}
oneof filter_option {
event_store.client.FilterOptions filter = 4;
event_store.client.Empty no_filter = 5;
}
}

message Position {
Expand Down
17 changes: 17 additions & 0 deletions src/EventStore.Client.Common/protos/shared.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,20 @@ message StreamIdentifier {
reserved 1 to 2;
bytes streamName = 3;
}

message FilterOptions {
oneof filter {
Expression stream_name = 1;
Expression event_type = 2;
}
oneof window {
uint32 max = 3;
event_store.client.Empty count = 4;
}
uint32 checkpointIntervalMultiplier = 5;

message Expression {
string regex = 1;
repeated string prefix = 2;
}
}
17 changes: 1 addition & 16 deletions src/EventStore.Client.Common/protos/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message ReadReq {
SubscriptionOptions subscription = 6;
}
oneof filter_option {
FilterOptions filter = 7;
event_store.client.FilterOptions filter = 7;
event_store.client.Empty no_filter = 8;
}
UUIDOption uuid_option = 9;
Expand Down Expand Up @@ -56,22 +56,7 @@ message ReadReq {
uint64 commit_position = 1;
uint64 prepare_position = 2;
}
message FilterOptions {
oneof filter {
Expression stream_name = 1;
Expression event_type = 2;
}
oneof window {
uint32 max = 3;
event_store.client.Empty count = 4;
}
uint32 checkpointIntervalMultiplier = 5;

message Expression {
string regex = 1;
repeated string prefix = 2;
}
}
message UUIDOption {
oneof content {
event_store.client.Empty structured = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,84 @@ private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string
};
}

private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position) {
private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position, IEventFilter? filter) {
var allFilter = GetFilterOptions(filter);
CreateReq.Types.AllOptions allOptions;
if (position == Position.Start) {
return new CreateReq.Types.AllOptions {
Start = new Empty()
allOptions = new CreateReq.Types.AllOptions {
Start = new Empty(),
};
}

if (position == Position.End) {
return new CreateReq.Types.AllOptions {
else if (position == Position.End) {
allOptions = new CreateReq.Types.AllOptions {
End = new Empty()
};
} else {
allOptions = new CreateReq.Types.AllOptions {
Position = new CreateReq.Types.Position {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
}
};
}

return new CreateReq.Types.AllOptions {
Position = new CreateReq.Types.Position {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
}
if (allFilter is null) {
allOptions.NoFilter = new Empty();
} else {
allOptions.Filter = allFilter;
}

return allOptions;
}

private static FilterOptions? GetFilterOptions(IEventFilter? filter) {
if (filter == null) {
return null;
}

var options = filter switch {
StreamFilter _ => new FilterOptions {
StreamName = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
}
},
EventTypeFilter _ => new FilterOptions {
EventType = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
}
},
_ => throw new InvalidOperationException()
};

if (filter.MaxSearchWindow.HasValue) {
options.Max = filter.MaxSearchWindow.Value;
} else {
options.Count = new Empty();
}

return options;
}


Expand All @@ -68,6 +127,18 @@ private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position posi
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public async Task CreateAsync(string streamName, string groupName,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(
streamName: streamName,
groupName: groupName,
eventFilter: null,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

private async Task CreateInternalAsync(string streamName, string groupName, IEventFilter? eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
if (streamName == null) {
Expand All @@ -90,13 +161,17 @@ public async Task CreateAsync(string streamName, string groupName,
throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}");
}

if (eventFilter != null && streamName != SystemStreams.AllStream) {
throw new ArgumentException($"Filters are only supported when subscribing to {SystemStreams.AllStream}");
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(new CreateReq {
Options = new CreateReq.Types.Options {
Stream = streamName != SystemStreams.AllStream ?
StreamOptionsForCreateProto(streamName, (StreamPosition)(settings.StartFrom ?? StreamPosition.End)) : null,
All = streamName == SystemStreams.AllStream ?
AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End)) : null,
AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End), eventFilter) : null,
#pragma warning disable 612
StreamIdentifier = streamName != SystemStreams.AllStream ? streamName : string.Empty, /*for backwards compatibility*/
#pragma warning restore 612
Expand All @@ -123,22 +198,44 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(ne
}

/// <summary>
/// Creates a persistent subscription to $all.
/// Creates a filtered persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="eventFilter"></param>
/// <param name="settings"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CreateToAllAsync(string groupName,
public async Task CreateToAllAsync(string groupName, IEventFilter eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateAsync(
await CreateInternalAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: eventFilter,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Creates a persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="settings"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CreateToAllAsync(string groupName,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: null,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}
14 changes: 7 additions & 7 deletions src/EventStore.Client.Streams/EventStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin
_log = Settings.LoggerFactory?.CreateLogger<EventStoreClient>() ?? new NullLogger<EventStoreClient>();
}

private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
private static FilterOptions? GetFilterOptions(
SubscriptionFilterOptions? filterOptions) {
if (filterOptions == null) {
return null;
Expand All @@ -71,33 +71,33 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin
var filter = filterOptions.Filter;

var options = filter switch {
StreamFilter _ => new ReadReq.Types.Options.Types.FilterOptions {
StreamFilter _ => new FilterOptions {
StreamName = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
new ReadReq.Types.Options.Types.FilterOptions.Types.Expression
new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
new ReadReq.Types.Options.Types.FilterOptions.Types.Expression {
new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
}
},
EventTypeFilter _ => new ReadReq.Types.Options.Types.FilterOptions {
EventTypeFilter _ => new FilterOptions {
EventType = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
new ReadReq.Types.Options.Types.FilterOptions.Types.Expression
new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
new ReadReq.Types.Options.Types.FilterOptions.Types.Expression {
new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit a5a483d

Please sign in to comment.