Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Stream Revision Echoed from the Server #73

Merged
merged 4 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/EventStore.Client.Common/protos/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,26 @@ message AppendResp {
}

message WrongExpectedVersion {
oneof current_revision_option_20_6_0 {
uint64 current_revision_20_6_0 = 1;
event_store.client.Empty no_stream_20_6_0 = 2;
}
oneof expected_revision_option_20_6_0 {
uint64 expected_revision_20_6_0 = 3;
event_store.client.Empty any_20_6_0 = 4;
event_store.client.Empty stream_exists_20_6_0 = 5;
}
oneof current_revision_option {
uint64 current_revision = 1;
event_store.client.Empty no_stream = 2;
uint64 current_revision = 6;
event_store.client.Empty current_no_stream = 7;
}
oneof expected_revision_option {
uint64 expected_revision = 3;
event_store.client.Empty any = 4;
event_store.client.Empty stream_exists = 5;
uint64 expected_revision = 8;
event_store.client.Empty expected_any = 9;
event_store.client.Empty expected_stream_exists = 10;
event_store.client.Empty expected_no_stream = 11;
}

}
}

Expand Down
58 changes: 33 additions & 25 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading.Tasks;
using EventStore.Client.Streams;
using Google.Protobuf;
using Grpc.Core;
using Microsoft.Extensions.Logging;

#nullable enable
Expand Down Expand Up @@ -125,9 +124,8 @@ await call.RequestStream.WriteAsync(new AppendReq {
var response = await call.ResponseAsync.ConfigureAwait(false);

if (response.Success != null) {
writeResult = new SuccessResult(
response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
writeResult = new SuccessResult(response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision),
response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
Expand All @@ -138,40 +136,50 @@ await call.RequestStream.WriteAsync(new AppendReq {
header.Options.StreamIdentifier, writeResult.LogPosition, writeResult.NextExpectedStreamRevision);
} else {
if (response.WrongExpectedVersion != null) {
var currentRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.NoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier, new StreamRevision(header.Options.Revision), currentRevision);

header.Options.StreamIdentifier, new StreamRevision(header.Options.Revision),
actualStreamRevision);

if (operationOptions.ThrowOnAppendFailure) {
if (header.Options.ExpectedStreamRevisionCase ==
AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.Revision) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(header.Options.StreamIdentifier,
new StreamRevision(header.Options.Revision),
currentRevision);
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision);
}

var streamState = header.Options.ExpectedStreamRevisionCase switch {
AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.Any =>
StreamState.Any,
AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.NoStream =>
StreamState.NoStream,
AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.StreamExists =>
StreamState.StreamExists,
var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedStreamExists =>
StreamState.StreamExists,
_ => throw new InvalidOperationException()
};

throw new WrongExpectedVersionException(header.Options.StreamIdentifier,
streamState, currentRevision);
expectedStreamState, actualStreamRevision);
}

if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier,
shaan1337 marked this conversation as resolved.
Show resolved Hide resolved
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision);
} else {
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier,
StreamRevision.None,
actualStreamRevision);
}

writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier, currentRevision);
} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
}
Expand Down
15 changes: 15 additions & 0 deletions src/EventStore.Client.Streams/WrongExpectedVersionResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,20 @@ public WrongExpectedVersionResult(string streamName, StreamRevision nextExpected
NextExpectedStreamRevision = nextExpectedStreamRevision;
LogPosition = default;
}

/// <summary>
/// Construct a new <see cref="WrongExpectedVersionResult"/>.
/// </summary>
/// <param name="streamName"></param>
/// <param name="nextExpectedStreamRevision"></param>
/// <param name="actualStreamRevision"></param>
public WrongExpectedVersionResult(string streamName, StreamRevision nextExpectedStreamRevision,
StreamRevision actualStreamRevision) {
StreamName = streamName;
ActualVersion = actualStreamRevision.ToInt64();
NextExpectedVersion = nextExpectedStreamRevision.ToInt64();
NextExpectedStreamRevision = nextExpectedStreamRevision;
LogPosition = default;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
using System.Threading;
using System.Threading.Tasks;
using Ductus.FluentDocker.Builders;
using Ductus.FluentDocker.Commands;
using Ductus.FluentDocker.Model.Builders;
using Ductus.FluentDocker.Services;
using Ductus.FluentDocker.Services.Extensions;
using Polly;
using Serilog;
using Serilog.Events;
Expand Down