diff --git a/src/AcceptanceTestsHolder/AcceptanceTestsHolder.csproj b/src/AcceptanceTestsHolder/AcceptanceTestsHolder.csproj index 647a8de99..3d928d4d4 100644 --- a/src/AcceptanceTestsHolder/AcceptanceTestsHolder.csproj +++ b/src/AcceptanceTestsHolder/AcceptanceTestsHolder.csproj @@ -241,6 +241,7 @@ + diff --git a/src/AcceptanceTestsHolder/App_Packages/When_using_outbox_but_no_sagas.cs b/src/AcceptanceTestsHolder/App_Packages/When_using_outbox_but_no_sagas.cs new file mode 100644 index 000000000..e4721cb3a --- /dev/null +++ b/src/AcceptanceTestsHolder/App_Packages/When_using_outbox_but_no_sagas.cs @@ -0,0 +1,46 @@ +using System.Threading.Tasks; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.AcceptanceTests; +using NServiceBus.AcceptanceTests.EndpointTemplates; +using NServiceBus.Features; +using NUnit.Framework; + +[TestFixture] +public class When_using_outbox_but_no_sagas : NServiceBusAcceptanceTest +{ + [Test] + public async Task Should_be_able_to_start_the_endpoint() + { + // The EndpointsStarted flag is set by acceptance framework + var context = await Scenario.Define() + .WithEndpoint() + .Done(c => c.EndpointsStarted) + .Run() + .ConfigureAwait(false); + + Assert.True(context.EndpointsStarted); + } + + public class Context : ScenarioContext + { + } + + public class OutboxEndpointWithSagasDisabled : EndpointConfigurationBuilder + { + public OutboxEndpointWithSagasDisabled() + { + EndpointSetup(c => + { + c.DisableFeature(); + c.EnableOutbox(); + }); + } + } + + public class StartSagaMessage : IMessage + { + public string Property { get; set; } + } + +} \ No newline at end of file diff --git a/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.MsSql.approved.txt b/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.MsSql.approved.txt index 70e30ec82..961e09ec2 100644 --- a/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.MsSql.approved.txt +++ b/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.MsSql.approved.txt @@ -3,8 +3,9 @@ declare @dummy int; merge [TheSchema].[TheTablePrefixSubscriptionData] with (holdlock, tablock) as target using(select @Endpoint as Endpoint, @Subscriber as Subscriber, @MessageType as MessageType) as source on target.Subscriber = source.Subscriber - and target.MessageType = source.MessageType - and ((target.Endpoint = source.Endpoint) or (target.Endpoint is null and source.endpoint is null)) + and target.MessageType = source.MessageType +when matched and source.Endpoint is not null and (target.Endpoint is null or target.Endpoint <> source.Endpoint) then +update set Endpoint = @Endpoint, PersistenceVersion = @PersistenceVersion when not matched then insert ( diff --git a/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.MySql.approved.txt b/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.MySql.approved.txt index 65973d7dd..79b2e2b42 100644 --- a/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.MySql.approved.txt +++ b/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.MySql.approved.txt @@ -14,5 +14,5 @@ values @PersistenceVersion ) on duplicate key update - Endpoint = @Endpoint, + Endpoint = coalesce(@Endpoint, Endpoint), PersistenceVersion = @PersistenceVersion diff --git a/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.Oracle.approved.txt b/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.Oracle.approved.txt index 65bcb10e0..72b98fe44 100644 --- a/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.Oracle.approved.txt +++ b/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.Oracle.approved.txt @@ -16,6 +16,15 @@ begin ); commit; exception - when DUP_VAL_ON_INDEX - then ROLLBACK; + when DUP_VAL_ON_INDEX then + if :Endpoint is not null then + update "THETABLEPREFIXSS" set + Endpoint = :Endpoint, + PersistenceVersion = :PersistenceVersion + where + MessageType = :MessageType + and Subscriber = :Subscriber; + else + ROLLBACK; + end if; end; diff --git a/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.PostgreSql.approved.txt b/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.PostgreSql.approved.txt index 443800388..554bcc83a 100644 --- a/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.PostgreSql.approved.txt +++ b/src/SqlPersistence.Tests/Subscription/CommandTests/SubscriptionCommandTests.Subscribe.PostgreSql.approved.txt @@ -16,5 +16,5 @@ values @PersistenceVersion ) on conflict ("Id") do update - set "Endpoint" = @Endpoint, + set "Endpoint" = coalesce(@Endpoint, "public"."TheTablePrefixSubscriptionData"."Endpoint"), "PersistenceVersion" = @PersistenceVersion diff --git a/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.Subscribe_different_endpoint_name.approved.txt b/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.Subscribe_different_endpoint_name.approved.txt new file mode 100644 index 000000000..c1f5e4c73 --- /dev/null +++ b/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.Subscribe_different_endpoint_name.approved.txt @@ -0,0 +1,6 @@ +[ + { + "TransportAddress": "e@machine1", + "Endpoint": "e2" + } +] \ No newline at end of file diff --git a/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.Subscribe_should_not_downgrade.approved.txt b/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.Subscribe_should_not_downgrade.approved.txt new file mode 100644 index 000000000..daba79c86 --- /dev/null +++ b/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.Subscribe_should_not_downgrade.approved.txt @@ -0,0 +1,6 @@ +[ + { + "TransportAddress": "e@machine1", + "Endpoint": "endpoint" + } +] \ No newline at end of file diff --git a/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.Subscribe_version_migration.approved.txt b/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.Subscribe_version_migration.approved.txt new file mode 100644 index 000000000..daba79c86 --- /dev/null +++ b/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.Subscribe_version_migration.approved.txt @@ -0,0 +1,6 @@ +[ + { + "TransportAddress": "e@machine1", + "Endpoint": "endpoint" + } +] \ No newline at end of file diff --git a/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.cs b/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.cs index 25bcbec68..7d336f89c 100644 --- a/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.cs +++ b/src/SqlPersistence.Tests/Subscription/SubscriptionPersisterTests.cs @@ -223,6 +223,54 @@ public void Subscribe_duplicate_add() #endif } + [Test] + public void Subscribe_version_migration() + { + var persister = Setup(schema); + var type1 = new MessageType("type1", new Version(0, 0, 0, 0)); + //NSB 5.x: endpoint is null + persister.Subscribe(new Subscriber("e@machine1", null), type1, null).Await(); + //NSB 6.x: same subscriber now mentions endpoint + persister.Subscribe(new Subscriber("e@machine1", "endpoint"), type1, null).Await(); + var result = persister.GetSubscribers(type1).Result.ToList(); + Assert.IsNotEmpty(result); +#if NET452 + ObjectApprover.VerifyWithJson(result); +#endif + } + + [Test] + public void Subscribe_different_endpoint_name() + { + var persister = Setup(schema); + var type1 = new MessageType("type1", new Version(0, 0, 0, 0)); + //NSB 6.x: old endpoint value + persister.Subscribe(new Subscriber("e@machine1", "e1"), type1, null).Await(); + //NSB 6.x: same address, new endpoint value + persister.Subscribe(new Subscriber("e@machine1", "e2"), type1, null).Await(); + var result = persister.GetSubscribers(type1).Result.ToList(); + Assert.IsNotEmpty(result); +#if NET452 + ObjectApprover.VerifyWithJson(result); +#endif + } + + [Test] + public void Subscribe_should_not_downgrade() + { + var persister = Setup(schema); + var type1 = new MessageType("type1", new Version(0, 0, 0, 0)); + //NSB 6.x: subscriber contains endpoint + persister.Subscribe(new Subscriber("e@machine1", "endpoint"), type1, null).Await(); + //NSB 5.x: endpoint is null, don't want to remove endpoint value from table though + persister.Subscribe(new Subscriber("e@machine1", null), type1, null).Await(); + var result = persister.GetSubscribers(type1).Result.ToList(); + Assert.IsNotEmpty(result); +#if NET452 + ObjectApprover.VerifyWithJson(result); +#endif + } + [Test] public void Unsubscribe() { diff --git a/src/SqlPersistence/SqlPersistenceStorageSessionExtensions.cs b/src/SqlPersistence/SqlPersistenceStorageSessionExtensions.cs index 5e6ee66bc..9649aac71 100644 --- a/src/SqlPersistence/SqlPersistenceStorageSessionExtensions.cs +++ b/src/SqlPersistence/SqlPersistenceStorageSessionExtensions.cs @@ -46,8 +46,14 @@ public static Task GetSagaData(this SynchronizedStorageSes Guard.AgainstNull(nameof(context), context); Guard.AgainstNull(nameof(appendParameters), appendParameters); Guard.AgainstNullAndEmpty(nameof(whereClause), whereClause); + var writableContextBag = (ContextBag)context; var sqlSession = session.GetSqlStorageSession(); + + if (sqlSession.InfoCache == null) + { + throw new Exception("Cannot load saga data because the Sagas feature is disabled in the endpoint."); + } return SagaPersister.GetByWhereClause(whereClause, session, writableContextBag, appendParameters, sqlSession.InfoCache); } } diff --git a/src/SqlPersistence/Subscription/SqlDialect_MsSqlServer.cs b/src/SqlPersistence/Subscription/SqlDialect_MsSqlServer.cs index 0b18b5e39..09e5f6a73 100644 --- a/src/SqlPersistence/Subscription/SqlDialect_MsSqlServer.cs +++ b/src/SqlPersistence/Subscription/SqlDialect_MsSqlServer.cs @@ -21,8 +21,9 @@ internal override string GetSubscriptionSubscribeCommand(string tableName) merge {tableName} with (holdlock, tablock) as target using(select @Endpoint as Endpoint, @Subscriber as Subscriber, @MessageType as MessageType) as source on target.Subscriber = source.Subscriber - and target.MessageType = source.MessageType - and ((target.Endpoint = source.Endpoint) or (target.Endpoint is null and source.endpoint is null)) + and target.MessageType = source.MessageType +when matched and source.Endpoint is not null and (target.Endpoint is null or target.Endpoint <> source.Endpoint) then +update set Endpoint = @Endpoint, PersistenceVersion = @PersistenceVersion when not matched then insert ( diff --git a/src/SqlPersistence/Subscription/SqlDialect_MySql.cs b/src/SqlPersistence/Subscription/SqlDialect_MySql.cs index 296a049aa..206748055 100644 --- a/src/SqlPersistence/Subscription/SqlDialect_MySql.cs +++ b/src/SqlPersistence/Subscription/SqlDialect_MySql.cs @@ -32,7 +32,7 @@ insert into {tableName} @PersistenceVersion ) on duplicate key update - Endpoint = @Endpoint, + Endpoint = coalesce(@Endpoint, Endpoint), PersistenceVersion = @PersistenceVersion "; } diff --git a/src/SqlPersistence/Subscription/SqlDialect_Oracle.cs b/src/SqlPersistence/Subscription/SqlDialect_Oracle.cs index d7eff9022..1f68f2853 100644 --- a/src/SqlPersistence/Subscription/SqlDialect_Oracle.cs +++ b/src/SqlPersistence/Subscription/SqlDialect_Oracle.cs @@ -34,8 +34,17 @@ insert into {tableName} ); commit; exception - when DUP_VAL_ON_INDEX - then ROLLBACK; + when DUP_VAL_ON_INDEX then + if :Endpoint is not null then + update {tableName} set + Endpoint = :Endpoint, + PersistenceVersion = :PersistenceVersion + where + MessageType = :MessageType + and Subscriber = :Subscriber; + else + ROLLBACK; + end if; end; "; } diff --git a/src/SqlPersistence/Subscription/SqlDialect_PostgreSql.cs b/src/SqlPersistence/Subscription/SqlDialect_PostgreSql.cs index e2fb8e04c..a4b161d38 100644 --- a/src/SqlPersistence/Subscription/SqlDialect_PostgreSql.cs +++ b/src/SqlPersistence/Subscription/SqlDialect_PostgreSql.cs @@ -34,7 +34,7 @@ insert into {tableName} @PersistenceVersion ) on conflict (""Id"") do update - set ""Endpoint"" = @Endpoint, + set ""Endpoint"" = coalesce(@Endpoint, {tableName}.""Endpoint""), ""PersistenceVersion"" = @PersistenceVersion "; } diff --git a/src/SqlPersistence/SynchronizedStorage/StorageSessionFeature.cs b/src/SqlPersistence/SynchronizedStorage/StorageSessionFeature.cs index ce0d8f09f..27288be07 100644 --- a/src/SqlPersistence/SynchronizedStorage/StorageSessionFeature.cs +++ b/src/SqlPersistence/SynchronizedStorage/StorageSessionFeature.cs @@ -22,9 +22,14 @@ protected override void Setup(FeatureConfigurationContext context) var isOutboxEnabledForSqlPersistence = settings.IsFeatureActive(typeof(SqlOutboxFeature)); SagaInfoCache infoCache = null; - if (isOutboxEnabledForSqlPersistence || isSagasEnabledForSqlPersistence) + if (isSagasEnabledForSqlPersistence) { infoCache = BuildSagaInfoCache(sqlDialect, settings); + } + + if (isOutboxEnabledForSqlPersistence || isSagasEnabledForSqlPersistence) + { + //Info cache can be null if Outbox is enabled but Sagas are disabled. container.ConfigureComponent(() => new SynchronizedStorage(connectionBuilder, infoCache), DependencyLifecycle.SingleInstance); container.ConfigureComponent(() => new StorageAdapter(connectionBuilder, infoCache, sqlDialect), DependencyLifecycle.SingleInstance); }