From 167c94fb014f71dc6c03442fa940288d098cc24d Mon Sep 17 00:00:00 2001 From: Mark Gould Date: Sat, 3 Mar 2018 06:22:36 -0600 Subject: [PATCH 1/2] Fixes concurrency issue when trying to complete saga --- src/SqlPersistence/Saga/SagaPersister_Complete.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/SqlPersistence/Saga/SagaPersister_Complete.cs b/src/SqlPersistence/Saga/SagaPersister_Complete.cs index 64f12e4d4..39cdb4de4 100644 --- a/src/SqlPersistence/Saga/SagaPersister_Complete.cs +++ b/src/SqlPersistence/Saga/SagaPersister_Complete.cs @@ -1,4 +1,5 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using NServiceBus; using NServiceBus.Extensibility; using NServiceBus.Persistence; @@ -21,7 +22,11 @@ internal async Task Complete(IContainSagaData sagaData, SynchronizedStorageSessi command.Transaction = sqlSession.Transaction; command.AddParameter("Id", sagaData.Id); command.AddParameter("Concurrency", concurrency); - await command.ExecuteNonQueryAsync().ConfigureAwait(false); + var affected = await command.ExecuteNonQueryAsync().ConfigureAwait(false); + if (affected != 1) + { + throw new Exception($"Optimistic concurrency violation when trying to complete saga {sagaInfo.SagaType.FullName} {sagaData.Id}. Expected version {concurrency}."); + } } } } \ No newline at end of file From 772947da45952619df0538a0611334b5d149ed2c Mon Sep 17 00:00:00 2001 From: SzymonPobiega Date: Mon, 5 Mar 2018 12:28:13 +0100 Subject: [PATCH 2/2] Added a unit test to verify the correct Complete behavior wrt concurrency --- .../Saga/SagaPersisterTests.cs | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/SqlPersistence.Tests/Saga/SagaPersisterTests.cs b/src/SqlPersistence.Tests/Saga/SagaPersisterTests.cs index 643c55cf1..f20e7686d 100644 --- a/src/SqlPersistence.Tests/Saga/SagaPersisterTests.cs +++ b/src/SqlPersistence.Tests/Saga/SagaPersisterTests.cs @@ -228,6 +228,47 @@ public async Task Complete() } } + [Test] + public async Task CompleteFailsOnConcurrencyViolation() + { + var endpointName = nameof(Complete); + var definition = new SagaDefinition( + tableSuffix: "SagaWithCorrelation", + name: "SagaWithCorrelation", + correlationProperty: new CorrelationProperty + ( + name: "CorrelationProperty", + type: CorrelationPropertyType.String + ), + transitionalCorrelationProperty: new CorrelationProperty + ( + name: "TransitionalCorrelationProperty", + type: CorrelationPropertyType.String + ) + ); + DropAndCreate(definition, endpointName, schema); + var id = Guid.NewGuid(); + var sagaData = new SagaWithCorrelation.SagaData + { + Id = id, + OriginalMessageId = "theOriginalMessageId", + Originator = "theOriginator", + CorrelationProperty = "theCorrelationProperty" + }; + + var persister = SetUp(endpointName, schema); + + using (var connection = dbConnection()) + using (var transaction = connection.BeginTransaction()) + using (var storageSession = new StorageSession(connection, transaction, true, null)) + { + await persister.Save(sagaData, storageSession, "theProperty").ConfigureAwait(false); + + const int invalidConcurrencyVersion = 42; + Assert.ThrowsAsync(() => persister.Complete(sagaData, storageSession, invalidConcurrencyVersion)); + } + } + public class SagaWithWeirdCharactersಠ_ಠ : SqlSaga, IAmStartedByMessages