diff --git a/src/SqlPersistence.Tests/APIApprovals.Approve.approved.txt b/src/SqlPersistence.Tests/APIApprovals.Approve.approved.txt index 52e8e563d..e3be49652 100644 --- a/src/SqlPersistence.Tests/APIApprovals.Approve.approved.txt +++ b/src/SqlPersistence.Tests/APIApprovals.Approve.approved.txt @@ -133,7 +133,11 @@ namespace NServiceBus public Oracle() { } } } - public class SqlDialectSettings + public abstract class SqlDialectSettings + { + protected SqlDialectSettings(NServiceBus.SqlDialect dialect) { } + } + public class SqlDialectSettings : NServiceBus.SqlDialectSettings where T : NServiceBus.SqlDialect, new () { public SqlDialectSettings() { } @@ -142,6 +146,7 @@ namespace NServiceBus { public static void ConnectionBuilder(this NServiceBus.PersistenceExtensions configuration, System.Func connectionBuilder) { } public static void DisableInstaller(this NServiceBus.PersistenceExtensions configuration) { } + public static void DoNotUseSqlServerTransportConnection(this NServiceBus.SqlDialectSettings dialectSettings) { } public static NServiceBus.Persistence.Sql.SagaSettings SagaSettings(this NServiceBus.PersistenceExtensions configuration) { } [System.ObsoleteAttribute("Use `persistence.SqlDialect().Schema(\"schema_name\")` inst" + "ead. The member currently throws a NotImplementedException. Will be removed in v" + diff --git a/src/SqlPersistence.Tests/Integration/SqlTransportIntegrationTests.cs b/src/SqlPersistence.Tests/Integration/SqlTransportIntegrationTests.cs index 484c5c1a9..092910fa9 100644 --- a/src/SqlPersistence.Tests/Integration/SqlTransportIntegrationTests.cs +++ b/src/SqlPersistence.Tests/Integration/SqlTransportIntegrationTests.cs @@ -1,4 +1,6 @@ -using System; +//TODO: add tests for the new option to not reuse the connection + +using System; using System.Data.SqlClient; using System.Threading; using System.Threading.Tasks; diff --git a/src/SqlPersistence/Config/SqlDialectSettings.cs b/src/SqlPersistence/Config/SqlDialectSettings.cs index d90d100c2..73ba9e2cc 100644 --- a/src/SqlPersistence/Config/SqlDialectSettings.cs +++ b/src/SqlPersistence/Config/SqlDialectSettings.cs @@ -3,16 +3,32 @@ namespace NServiceBus /// /// Exposes settings options available for the selected database engine. /// - public class SqlDialectSettings where T : SqlDialect, new() + public abstract class SqlDialectSettings + { + internal SqlDialect Dialect; + + /// + /// Exposes settings options available for the selected database engine. + /// + protected SqlDialectSettings(SqlDialect dialect) + { + this.Dialect = dialect; + } + } + + /// + /// Exposes settings options available for the selected database engine. + /// + public class SqlDialectSettings : SqlDialectSettings + where T : SqlDialect, new() { /// /// Exposes settings options available for the selected database engine. /// - public SqlDialectSettings() + public SqlDialectSettings() : base(new T()) { - Settings = new T(); } - internal T Settings { get; } + internal T TypedDialect => (T)Dialect; } } \ No newline at end of file diff --git a/src/SqlPersistence/Config/SqlDialect_SqlServer.cs b/src/SqlPersistence/Config/SqlDialect_MsSqlServer.cs similarity index 100% rename from src/SqlPersistence/Config/SqlDialect_SqlServer.cs rename to src/SqlPersistence/Config/SqlDialect_MsSqlServer.cs diff --git a/src/SqlPersistence/Config/SqlPersistenceConfig_SqlDialect.cs b/src/SqlPersistence/Config/SqlPersistenceConfig_SqlDialect.cs index 97b635bf9..3d8ea6c05 100644 --- a/src/SqlPersistence/Config/SqlPersistenceConfig_SqlDialect.cs +++ b/src/SqlPersistence/Config/SqlPersistenceConfig_SqlDialect.cs @@ -13,9 +13,9 @@ public static partial class SqlPersistenceConfig internal static SqlDialect GetSqlDialect(this ReadOnlySettings settings) { - if (settings.TryGet("SqlPersistence.SqlDialect", out SqlDialect value)) + if (settings.TryGet("SqlPersistence.SqlDialect", out SqlDialectSettings value)) { - return value; + return value.Dialect; } throw new Exception("Must specify SQL dialect using persistence.SqlDialect() method."); } @@ -28,11 +28,17 @@ public static SqlDialectSettings SqlDialect(this PersistenceExtensions dialectSettings; + if (settings.TryGet("SqlPersistence.SqlDialect", out dialectSettings)) + { + return dialectSettings; + } + var type = typeof(SqlDialectSettings<>).MakeGenericType(typeof(T)); - var instance = (SqlDialectSettings)Activator.CreateInstance(type); - settings.Set("SqlPersistence.SqlDialect", instance.Settings); - return instance; + dialectSettings = (SqlDialectSettings)Activator.CreateInstance(type); + settings.Set("SqlPersistence.SqlDialect", dialectSettings); + return dialectSettings; } } } \ No newline at end of file diff --git a/src/SqlPersistence/Config/SqlPersistenceConfig_Schema.cs b/src/SqlPersistence/Config/SqlPersistenceConfig_SqlDialect_MsSqlServer.cs similarity index 52% rename from src/SqlPersistence/Config/SqlPersistenceConfig_Schema.cs rename to src/SqlPersistence/Config/SqlPersistenceConfig_SqlDialect_MsSqlServer.cs index 399df1b00..d65ae8f30 100644 --- a/src/SqlPersistence/Config/SqlPersistenceConfig_Schema.cs +++ b/src/SqlPersistence/Config/SqlPersistenceConfig_SqlDialect_MsSqlServer.cs @@ -11,8 +11,16 @@ public static void Schema(this SqlDialectSettings dialec Guard.AgainstNull(nameof(dialectSettings), dialectSettings); Guard.AgainstNullAndEmpty(nameof(schema), schema); Guard.AgainstSqlDelimiters(nameof(schema), schema); - dialectSettings.Settings.Schema = schema; + dialectSettings.TypedDialect.Schema = schema; } + /// + /// Instructs the persistence to not use the connection established by the SQL Server transport. + /// + public static void DoNotUseSqlServerTransportConnection(this SqlDialectSettings dialectSettings) + { + Guard.AgainstNull(nameof(dialectSettings), dialectSettings); + dialectSettings.TypedDialect.DoNotUseTransportConnection = true; + } } } \ No newline at end of file diff --git a/src/SqlPersistence/SynchronizedStorage/SqlDialect.cs b/src/SqlPersistence/SynchronizedStorage/SqlDialect.cs new file mode 100644 index 000000000..33c168df4 --- /dev/null +++ b/src/SqlPersistence/SynchronizedStorage/SqlDialect.cs @@ -0,0 +1,14 @@ +namespace NServiceBus +{ + using System; + using System.Data.Common; + using System.Threading.Tasks; + using Extensibility; + using Persistence; + using Transport; + + public partial class SqlDialect + { + internal abstract Task TryAdaptTransportConnection(TransportTransaction transportTransaction, ContextBag context, Func connectionBuilder, Func storageSessionFactory); + } +} \ No newline at end of file diff --git a/src/SqlPersistence/SynchronizedStorage/SqlDialect_MsSqlServer.cs b/src/SqlPersistence/SynchronizedStorage/SqlDialect_MsSqlServer.cs new file mode 100644 index 000000000..2c54b9087 --- /dev/null +++ b/src/SqlPersistence/SynchronizedStorage/SqlDialect_MsSqlServer.cs @@ -0,0 +1,54 @@ +namespace NServiceBus +{ + using System; + using System.Data.Common; + using System.Threading.Tasks; + using System.Transactions; + using Extensibility; + using Persistence; + using Transport; + + public partial class SqlDialect + { + public partial class MsSqlServer + { + internal override async Task TryAdaptTransportConnection(TransportTransaction transportTransaction, ContextBag context, Func connectionBuilder, Func storageSessionFactory) + { + if (DoNotUseTransportConnection) + { + return null; + } + + //SQL server transport in native TX mode + if (transportTransaction.TryGet("System.Data.SqlClient.SqlConnection", out DbConnection existingSqlConnection) && + transportTransaction.TryGet("System.Data.SqlClient.SqlTransaction", out DbTransaction existingSqlTransaction)) + { + return storageSessionFactory(existingSqlConnection, existingSqlTransaction, false); + } + + // Transport supports DTC and uses TxScope owned by the transport + var scopeTx = Transaction.Current; + if (transportTransaction.TryGet(out Transaction transportTx) && + scopeTx != null && + transportTx != scopeTx) + { + throw new Exception("A TransactionScope has been opened in the current context overriding the one created by the transport. " + + "This setup can result in inconsistent data because operations done via connections enlisted in the context scope won't be committed " + + "atomically with the receive transaction. To manually control the TransactionScope in the pipeline switch the transport transaction mode " + + $"to values lower than '{nameof(TransportTransactionMode.TransactionScope)}'."); + } + var ambientTransaction = transportTx ?? scopeTx; + if (ambientTransaction == null) + { + //Other modes handled by creating a new session. + return null; + } + var connection = await connectionBuilder.OpenConnection().ConfigureAwait(false); + connection.EnlistTransaction(ambientTransaction); + return storageSessionFactory(connection, null, true); + } + + internal bool DoNotUseTransportConnection { get; set; } + } + } +} \ No newline at end of file diff --git a/src/SqlPersistence/SynchronizedStorage/SqlDialect_MySql.cs b/src/SqlPersistence/SynchronizedStorage/SqlDialect_MySql.cs new file mode 100644 index 000000000..aafcc98ea --- /dev/null +++ b/src/SqlPersistence/SynchronizedStorage/SqlDialect_MySql.cs @@ -0,0 +1,23 @@ +namespace NServiceBus +{ + using System; + using System.Data.Common; + using System.Threading.Tasks; + using Extensibility; + using Persistence; + using Transport; + + public partial class SqlDialect + { + public partial class MySql + { + static Task result = Task.FromResult((CompletableSynchronizedStorageSession)null); + + internal override Task TryAdaptTransportConnection(TransportTransaction transportTransaction, ContextBag context, Func connectionBuilder, Func storageSessionFactory) + { + // MySQL does not support DTC so we should not enlist if transport has such a transaction. + return result; + } + } + } +} \ No newline at end of file diff --git a/src/SqlPersistence/SynchronizedStorage/SqlDialect_Oracle.cs b/src/SqlPersistence/SynchronizedStorage/SqlDialect_Oracle.cs new file mode 100644 index 000000000..3cc2e2833 --- /dev/null +++ b/src/SqlPersistence/SynchronizedStorage/SqlDialect_Oracle.cs @@ -0,0 +1,40 @@ +namespace NServiceBus +{ + using System; + using System.Data.Common; + using System.Threading.Tasks; + using System.Transactions; + using Extensibility; + using Persistence; + using Transport; + + public partial class SqlDialect + { + public partial class Oracle + { + internal override async Task TryAdaptTransportConnection(TransportTransaction transportTransaction, ContextBag context, Func connectionBuilder, Func storageSessionFactory) + { + // Oracle supports DTC so we should enlist in the transport's TransactionScope if present + var scopeTx = Transaction.Current; + if (transportTransaction.TryGet(out Transaction transportTx) && + scopeTx != null && + transportTx != scopeTx) + { + throw new Exception("A TransactionScope has been opened in the current context overriding the one created by the transport. " + + "This setup can result in inconsistent data because operations done via connections enlisted in the context scope won't be committed " + + "atomically with the receive transaction. To manually control the TransactionScope in the pipeline switch the transport transaction mode " + + $"to values lower than '{nameof(TransportTransactionMode.TransactionScope)}'."); + } + var ambientTransaction = transportTx ?? scopeTx; + if (ambientTransaction == null) + { + //Other modes handled by creating a new session. + return null; + } + var connection = await connectionBuilder.OpenConnection().ConfigureAwait(false); + connection.EnlistTransaction(ambientTransaction); + return storageSessionFactory(connection, null, true); + } + } + } +} \ No newline at end of file diff --git a/src/SqlPersistence/SynchronizedStorage/StorageAdapter.cs b/src/SqlPersistence/SynchronizedStorage/StorageAdapter.cs index 6d29a42f6..585ff6d52 100644 --- a/src/SqlPersistence/SynchronizedStorage/StorageAdapter.cs +++ b/src/SqlPersistence/SynchronizedStorage/StorageAdapter.cs @@ -1,7 +1,6 @@ using System; using System.Data.Common; using System.Threading.Tasks; -using System.Transactions; using NServiceBus; using NServiceBus.Extensibility; using NServiceBus.Outbox; @@ -13,12 +12,14 @@ class StorageAdapter : ISynchronizedStorageAdapter static Task EmptyResultTask = Task.FromResult(default(CompletableSynchronizedStorageSession)); SagaInfoCache infoCache; + SqlDialect dialect; Func connectionBuilder; - public StorageAdapter(Func connectionBuilder, SagaInfoCache infoCache) + public StorageAdapter(Func connectionBuilder, SagaInfoCache infoCache, SqlDialect dialect) { this.connectionBuilder = connectionBuilder; this.infoCache = infoCache; + this.dialect = dialect; } public Task TryAdapt(OutboxTransaction transaction, ContextBag context) @@ -32,35 +33,9 @@ public Task TryAdapt(OutboxTransaction tr return Task.FromResult(session); } - public async Task TryAdapt(TransportTransaction transportTransaction, ContextBag context) + public Task TryAdapt(TransportTransaction transportTransaction, ContextBag context) { - //SQL server transport in native TX mode - if (transportTransaction.TryGet("System.Data.SqlClient.SqlConnection", out DbConnection existingSqlConnection) && - transportTransaction.TryGet("System.Data.SqlClient.SqlTransaction", out DbTransaction existingSqlTransaction)) - { - return new StorageSession(existingSqlConnection, existingSqlTransaction, false, infoCache); - } - - // Transport supports DTC and uses TxScope owned by the transport - var scopeTx = Transaction.Current; - if (transportTransaction.TryGet(out Transaction transportTx) && - scopeTx != null && - transportTx != scopeTx) - { - throw new Exception("A TransactionScope has been opened in the current context overriding the one created by the transport. " - + "This setup can result in inconsistent data because operations done via connections enlisted in the context scope won't be committed " - + "atomically with the receive transaction. To manually control the TransactionScope in the pipeline switch the transport transaction mode " - + $"to values lower than '{nameof(TransportTransactionMode.TransactionScope)}'."); - } - var ambientTransaction = transportTx ?? scopeTx; - if (ambientTransaction == null) - { - //Other modes handled by creating a new session. - return null; - } - var connection = await connectionBuilder.OpenConnection().ConfigureAwait(false); - connection.EnlistTransaction(ambientTransaction); - return new StorageSession(connection, null, true, infoCache); - + return dialect.TryAdaptTransportConnection(transportTransaction, context, connectionBuilder, + (conn, trans, ownsTx) => new StorageSession(conn, trans, ownsTx, infoCache)); } } \ No newline at end of file diff --git a/src/SqlPersistence/SynchronizedStorage/StorageSessionFeature.cs b/src/SqlPersistence/SynchronizedStorage/StorageSessionFeature.cs index d38329fe3..cd80c575d 100644 --- a/src/SqlPersistence/SynchronizedStorage/StorageSessionFeature.cs +++ b/src/SqlPersistence/SynchronizedStorage/StorageSessionFeature.cs @@ -27,7 +27,7 @@ protected override void Setup(FeatureConfigurationContext context) { infoCache = BuildSagaInfoCache(sqlDialect, settings); container.ConfigureComponent(() => new SynchronizedStorage(connectionBuilder, infoCache), DependencyLifecycle.SingleInstance); - container.ConfigureComponent(() => new StorageAdapter(connectionBuilder, infoCache), DependencyLifecycle.SingleInstance); + container.ConfigureComponent(() => new StorageAdapter(connectionBuilder, infoCache, sqlDialect), DependencyLifecycle.SingleInstance); } if (isSagasEnabledForSqlPersistence) {