Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Stream Revision Echoed from the Server #73

Merged
merged 4 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/EventStore.Client.Common/protos/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,26 @@ message AppendResp {
}

message WrongExpectedVersion {
oneof current_revision_option_20_6_0 {
uint64 current_revision_20_6_0 = 1;
event_store.client.Empty no_stream_20_6_0 = 2;
}
oneof expected_revision_option_20_6_0 {
uint64 expected_revision_20_6_0 = 3;
event_store.client.Empty any_20_6_0 = 4;
event_store.client.Empty stream_exists_20_6_0 = 5;
}
oneof current_revision_option {
uint64 current_revision = 1;
event_store.client.Empty no_stream = 2;
uint64 current_revision = 6;
event_store.client.Empty current_no_stream = 7;
}
oneof expected_revision_option {
uint64 expected_revision = 3;
event_store.client.Empty any = 4;
event_store.client.Empty stream_exists = 5;
uint64 expected_revision = 8;
event_store.client.Empty expected_any = 9;
event_store.client.Empty expected_stream_exists = 10;
event_store.client.Empty expected_no_stream = 11;
}

}
}

Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.Client.Streams/ConditionalWriteResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ internal static ConditionalWriteResult FromWriteResult(IWriteResult writeResult)
};

internal static ConditionalWriteResult FromWrongExpectedVersion(WrongExpectedVersionException ex)
=> new ConditionalWriteResult(ex.ActualStreamRevision, Position.End,
=> new ConditionalWriteResult(ex.ExpectedStreamRevision, Position.End,
ConditionalWriteStatus.VersionMismatch);

/// <inheritdoc />
Expand Down
58 changes: 33 additions & 25 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading.Tasks;
using EventStore.Client.Streams;
using Google.Protobuf;
using Grpc.Core;
using Microsoft.Extensions.Logging;

#nullable enable
Expand Down Expand Up @@ -125,9 +124,8 @@ await call.RequestStream.WriteAsync(new AppendReq {
var response = await call.ResponseAsync.ConfigureAwait(false);

if (response.Success != null) {
writeResult = new SuccessResult(
response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
writeResult = new SuccessResult(response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision),
response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
Expand All @@ -138,40 +136,50 @@ await call.RequestStream.WriteAsync(new AppendReq {
header.Options.StreamIdentifier, writeResult.LogPosition, writeResult.NextExpectedStreamRevision);
} else {
if (response.WrongExpectedVersion != null) {
var currentRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.NoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier, new StreamRevision(header.Options.Revision), currentRevision);

header.Options.StreamIdentifier, new StreamRevision(header.Options.Revision),
actualStreamRevision);

if (operationOptions.ThrowOnAppendFailure) {
if (header.Options.ExpectedStreamRevisionCase ==
AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.Revision) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(header.Options.StreamIdentifier,
new StreamRevision(header.Options.Revision),
currentRevision);
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision);
}

var streamState = header.Options.ExpectedStreamRevisionCase switch {
AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.Any =>
StreamState.Any,
AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.NoStream =>
StreamState.NoStream,
AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.StreamExists =>
StreamState.StreamExists,
var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedStreamExists =>
StreamState.StreamExists,
_ => throw new InvalidOperationException()
};

throw new WrongExpectedVersionException(header.Options.StreamIdentifier,
streamState, currentRevision);
expectedStreamState, actualStreamRevision);
}

if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier,
shaan1337 marked this conversation as resolved.
Show resolved Hide resolved
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision);
} else {
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier,
StreamRevision.None,
actualStreamRevision);
}

writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier, currentRevision);
} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
}
Expand Down
15 changes: 15 additions & 0 deletions src/EventStore.Client.Streams/WrongExpectedVersionResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,20 @@ public WrongExpectedVersionResult(string streamName, StreamRevision nextExpected
NextExpectedStreamRevision = nextExpectedStreamRevision;
LogPosition = default;
}

/// <summary>
/// Construct a new <see cref="WrongExpectedVersionResult"/>.
/// </summary>
/// <param name="streamName"></param>
/// <param name="nextExpectedStreamRevision"></param>
/// <param name="actualStreamRevision"></param>
public WrongExpectedVersionResult(string streamName, StreamRevision nextExpectedStreamRevision,
StreamRevision actualStreamRevision) {
StreamName = streamName;
ActualVersion = actualStreamRevision.ToInt64();
NextExpectedVersion = nextExpectedStreamRevision.ToInt64();
NextExpectedStreamRevision = nextExpectedStreamRevision;
LogPosition = default;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class WrongExpectedVersionException : Exception {
/// </summary>
public StreamRevision ActualStreamRevision { get; }

/// <summary>
/// If available, the expected version specified for the operation that failed.
/// </summary>
public StreamRevision ExpectedStreamRevision { get; }

/// <summary>
/// Constructs a new instance of <see cref="WrongExpectedVersionException" /> with the expected and actual versions if available.
/// </summary>
Expand All @@ -38,6 +43,7 @@ public WrongExpectedVersionException(string streamName, StreamRevision expectedS
exception) {
StreamName = streamName;
ActualStreamRevision = actualStreamRevision;
ExpectedStreamRevision = expectedStreamRevision;
ExpectedVersion = expectedStreamRevision == StreamRevision.None ? new long?() : expectedStreamRevision.ToInt64();
ActualVersion = actualStreamRevision == StreamRevision.None ? new long?() : actualStreamRevision.ToInt64();
}
Expand All @@ -56,6 +62,7 @@ public WrongExpectedVersionException(string streamName, StreamState expectedStre
StreamName = streamName;
ActualStreamRevision = actualStreamRevision;
ActualVersion = actualStreamRevision == StreamRevision.None ? new long?() : actualStreamRevision.ToInt64();
ExpectedStreamRevision = StreamRevision.None;
}
}
}
2 changes: 1 addition & 1 deletion test/EventStore.Client.Streams.Tests/append_to_stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public async Task appending_with_wrong_expected_version_to_existing_stream_retur

var wrongExpectedVersionResult = (WrongExpectedVersionResult)writeResult;

Assert.Equal(StreamRevision.None, wrongExpectedVersionResult.NextExpectedStreamRevision);
Assert.Equal(new StreamRevision(1), wrongExpectedVersionResult.NextExpectedStreamRevision);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
using System.Threading;
using System.Threading.Tasks;
using Ductus.FluentDocker.Builders;
using Ductus.FluentDocker.Commands;
using Ductus.FluentDocker.Model.Builders;
using Ductus.FluentDocker.Services;
using Ductus.FluentDocker.Services.Extensions;
using Polly;
using Serilog;
using Serilog.Events;
Expand Down