Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opt-out from reusing SQL Server transport's connection #148

Merged
merged 2 commits into from
Aug 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/SqlPersistence.Tests/APIApprovals.Approve.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ namespace NServiceBus
public Oracle() { }
}
}
public class SqlDialectSettings<T>
public abstract class SqlDialectSettings
{
protected SqlDialectSettings(NServiceBus.SqlDialect dialect) { }
}
public class SqlDialectSettings<T> : NServiceBus.SqlDialectSettings
where T : NServiceBus.SqlDialect, new ()
{
public SqlDialectSettings() { }
Expand All @@ -142,6 +146,7 @@ namespace NServiceBus
{
public static void ConnectionBuilder(this NServiceBus.PersistenceExtensions<NServiceBus.Persistence.Sql.SqlPersistence> configuration, System.Func<System.Data.Common.DbConnection> connectionBuilder) { }
public static void DisableInstaller(this NServiceBus.PersistenceExtensions<NServiceBus.Persistence.Sql.SqlPersistence> configuration) { }
public static void DoNotUseSqlServerTransportConnection(this NServiceBus.SqlDialectSettings<NServiceBus.SqlDialect.MsSqlServer> dialectSettings) { }
public static NServiceBus.Persistence.Sql.SagaSettings SagaSettings(this NServiceBus.PersistenceExtensions<NServiceBus.Persistence.Sql.SqlPersistence> configuration) { }
[System.ObsoleteAttribute("Use `persistence.SqlDialect<SqlDialect.DialectType>().Schema(\"schema_name\")` inst" +
"ead. The member currently throws a NotImplementedException. Will be removed in v" +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
//TODO: add tests for the new option to not reuse the connection

using System;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
Expand Down
24 changes: 20 additions & 4 deletions src/SqlPersistence/Config/SqlDialectSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,32 @@ namespace NServiceBus
/// <summary>
/// Exposes settings options available for the selected database engine.
/// </summary>
public class SqlDialectSettings<T> where T : SqlDialect, new()
public abstract class SqlDialectSettings
{
internal SqlDialect Dialect;

/// <summary>
/// Exposes settings options available for the selected database engine.
/// </summary>
protected SqlDialectSettings(SqlDialect dialect)
{
this.Dialect = dialect;
}
}

/// <summary>
/// Exposes settings options available for the selected database engine.
/// </summary>
public class SqlDialectSettings<T> : SqlDialectSettings
where T : SqlDialect, new()
{
/// <summary>
/// Exposes settings options available for the selected database engine.
/// </summary>
public SqlDialectSettings()
public SqlDialectSettings() : base(new T())
{
Settings = new T();
}

internal T Settings { get; }
internal T TypedDialect => (T)Dialect;
}
}
18 changes: 12 additions & 6 deletions src/SqlPersistence/Config/SqlPersistenceConfig_SqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ public static partial class SqlPersistenceConfig

internal static SqlDialect GetSqlDialect(this ReadOnlySettings settings)
{
if (settings.TryGet("SqlPersistence.SqlDialect", out SqlDialect value))
if (settings.TryGet("SqlPersistence.SqlDialect", out SqlDialectSettings value))
{
return value;
return value.Dialect;
}
throw new Exception("Must specify SQL dialect using persistence.SqlDialect<T>() method.");
}
Expand All @@ -28,11 +28,17 @@ public static SqlDialectSettings<T> SqlDialect<T>(this PersistenceExtensions<Sql
where T : SqlDialect, new()
{
var settings = configuration.GetSettings();


SqlDialectSettings<T> dialectSettings;
if (settings.TryGet("SqlPersistence.SqlDialect", out dialectSettings))
{
return dialectSettings;
}

var type = typeof(SqlDialectSettings<>).MakeGenericType(typeof(T));
var instance = (SqlDialectSettings<T>)Activator.CreateInstance(type);
settings.Set("SqlPersistence.SqlDialect", instance.Settings);
return instance;
dialectSettings = (SqlDialectSettings<T>)Activator.CreateInstance(type);
settings.Set("SqlPersistence.SqlDialect", dialectSettings);
return dialectSettings;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,16 @@ public static void Schema(this SqlDialectSettings<SqlDialect.MsSqlServer> dialec
Guard.AgainstNull(nameof(dialectSettings), dialectSettings);
Guard.AgainstNullAndEmpty(nameof(schema), schema);
Guard.AgainstSqlDelimiters(nameof(schema), schema);
dialectSettings.Settings.Schema = schema;
dialectSettings.TypedDialect.Schema = schema;
}

/// <summary>
/// Instructs the persistence to not use the connection established by the SQL Server transport.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking how weird this is going to look to someone who isn't also using the SQL Transport. The comment here needs to tell the user why this would be necessary, and the answer is only if you were using a different connection string between SQL Transport and SQL Persistence, right?

So that got me thinking - can't we just look at the connection strings and enlist only if they match?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DavidBoike that used to be an option with NHibernate but since SQL persistence uses Func<DbConnection> it would mean that for each message we need to create the persistence connection using the factory and check if connection string's match.

Alternatively we can assume that the connection factory always returns connection with the same connection string and check only once but I am not sure if we can make such an assumption.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, we also can't detect when they have it misconfigured and throw an appropriate exception, because we don't even try creating a connection until we've determined we can't enlist, right?

I'm not sure I like that very much. I wouldn't want to wait around for an "invalid object name" exception with no further clue to what's really going on.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about trying to create the connection on strartup to capture the connection string there? We do that in SQL transport if user provides a func:

https://github.com/Particular/NServiceBus.SqlServer/blob/develop/src/NServiceBus.SqlServer/SqlServerTransport.cs#L64

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SzymonPobiega I like the connection string capture on startup idea.

/// </summary>
public static void DoNotUseSqlServerTransportConnection(this SqlDialectSettings<SqlDialect.MsSqlServer> dialectSettings)
{
Guard.AgainstNull(nameof(dialectSettings), dialectSettings);
dialectSettings.TypedDialect.DoNotUseTransportConnection = true;
}
}
}
14 changes: 14 additions & 0 deletions src/SqlPersistence/SynchronizedStorage/SqlDialect.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace NServiceBus
{
using System;
using System.Data.Common;
using System.Threading.Tasks;
using Extensibility;
using Persistence;
using Transport;

public partial class SqlDialect
{
internal abstract Task<CompletableSynchronizedStorageSession> TryAdaptTransportConnection(TransportTransaction transportTransaction, ContextBag context, Func<DbConnection> connectionBuilder, Func<DbConnection, DbTransaction, bool, StorageSession> storageSessionFactory);
}
}
54 changes: 54 additions & 0 deletions src/SqlPersistence/SynchronizedStorage/SqlDialect_MsSqlServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace NServiceBus
{
using System;
using System.Data.Common;
using System.Threading.Tasks;
using System.Transactions;
using Extensibility;
using Persistence;
using Transport;

public partial class SqlDialect
{
public partial class MsSqlServer
{
internal override async Task<CompletableSynchronizedStorageSession> TryAdaptTransportConnection(TransportTransaction transportTransaction, ContextBag context, Func<DbConnection> connectionBuilder, Func<DbConnection, DbTransaction, bool, StorageSession> storageSessionFactory)
{
if (DoNotUseTransportConnection)
{
return null;
}

//SQL server transport in native TX mode
if (transportTransaction.TryGet("System.Data.SqlClient.SqlConnection", out DbConnection existingSqlConnection) &&
transportTransaction.TryGet("System.Data.SqlClient.SqlTransaction", out DbTransaction existingSqlTransaction))
{
return storageSessionFactory(existingSqlConnection, existingSqlTransaction, false);
}

// Transport supports DTC and uses TxScope owned by the transport
var scopeTx = Transaction.Current;
if (transportTransaction.TryGet(out Transaction transportTx) &&
scopeTx != null &&
transportTx != scopeTx)
{
throw new Exception("A TransactionScope has been opened in the current context overriding the one created by the transport. "
+ "This setup can result in inconsistent data because operations done via connections enlisted in the context scope won't be committed "
+ "atomically with the receive transaction. To manually control the TransactionScope in the pipeline switch the transport transaction mode "
+ $"to values lower than '{nameof(TransportTransactionMode.TransactionScope)}'.");
}
var ambientTransaction = transportTx ?? scopeTx;
if (ambientTransaction == null)
{
//Other modes handled by creating a new session.
return null;
}
var connection = await connectionBuilder.OpenConnection().ConfigureAwait(false);
connection.EnlistTransaction(ambientTransaction);
return storageSessionFactory(connection, null, true);
}

internal bool DoNotUseTransportConnection { get; set; }
}
}
}
23 changes: 23 additions & 0 deletions src/SqlPersistence/SynchronizedStorage/SqlDialect_MySql.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace NServiceBus
{
using System;
using System.Data.Common;
using System.Threading.Tasks;
using Extensibility;
using Persistence;
using Transport;

public partial class SqlDialect
{
public partial class MySql
{
static Task<CompletableSynchronizedStorageSession> result = Task.FromResult((CompletableSynchronizedStorageSession)null);

internal override Task<CompletableSynchronizedStorageSession> TryAdaptTransportConnection(TransportTransaction transportTransaction, ContextBag context, Func<DbConnection> connectionBuilder, Func<DbConnection, DbTransaction, bool, StorageSession> storageSessionFactory)
{
// MySQL does not support DTC so we should not enlist if transport has such a transaction.
return result;
}
}
}
}
40 changes: 40 additions & 0 deletions src/SqlPersistence/SynchronizedStorage/SqlDialect_Oracle.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace NServiceBus
{
using System;
using System.Data.Common;
using System.Threading.Tasks;
using System.Transactions;
using Extensibility;
using Persistence;
using Transport;

public partial class SqlDialect
{
public partial class Oracle
{
internal override async Task<CompletableSynchronizedStorageSession> TryAdaptTransportConnection(TransportTransaction transportTransaction, ContextBag context, Func<DbConnection> connectionBuilder, Func<DbConnection, DbTransaction, bool, StorageSession> storageSessionFactory)
{
// Oracle supports DTC so we should enlist in the transport's TransactionScope if present
var scopeTx = Transaction.Current;
if (transportTransaction.TryGet(out Transaction transportTx) &&
scopeTx != null &&
transportTx != scopeTx)
{
throw new Exception("A TransactionScope has been opened in the current context overriding the one created by the transport. "
+ "This setup can result in inconsistent data because operations done via connections enlisted in the context scope won't be committed "
+ "atomically with the receive transaction. To manually control the TransactionScope in the pipeline switch the transport transaction mode "
+ $"to values lower than '{nameof(TransportTransactionMode.TransactionScope)}'.");
}
var ambientTransaction = transportTx ?? scopeTx;
if (ambientTransaction == null)
{
//Other modes handled by creating a new session.
return null;
}
var connection = await connectionBuilder.OpenConnection().ConfigureAwait(false);
connection.EnlistTransaction(ambientTransaction);
return storageSessionFactory(connection, null, true);
}
}
}
}
37 changes: 6 additions & 31 deletions src/SqlPersistence/SynchronizedStorage/StorageAdapter.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Data.Common;
using System.Threading.Tasks;
using System.Transactions;
using NServiceBus;
using NServiceBus.Extensibility;
using NServiceBus.Outbox;
Expand All @@ -13,12 +12,14 @@ class StorageAdapter : ISynchronizedStorageAdapter
static Task<CompletableSynchronizedStorageSession> EmptyResultTask = Task.FromResult(default(CompletableSynchronizedStorageSession));

SagaInfoCache infoCache;
SqlDialect dialect;
Func<DbConnection> connectionBuilder;

public StorageAdapter(Func<DbConnection> connectionBuilder, SagaInfoCache infoCache)
public StorageAdapter(Func<DbConnection> connectionBuilder, SagaInfoCache infoCache, SqlDialect dialect)
{
this.connectionBuilder = connectionBuilder;
this.infoCache = infoCache;
this.dialect = dialect;
}

public Task<CompletableSynchronizedStorageSession> TryAdapt(OutboxTransaction transaction, ContextBag context)
Expand All @@ -32,35 +33,9 @@ public Task<CompletableSynchronizedStorageSession> TryAdapt(OutboxTransaction tr
return Task.FromResult(session);
}

public async Task<CompletableSynchronizedStorageSession> TryAdapt(TransportTransaction transportTransaction, ContextBag context)
public Task<CompletableSynchronizedStorageSession> TryAdapt(TransportTransaction transportTransaction, ContextBag context)
{
//SQL server transport in native TX mode
if (transportTransaction.TryGet("System.Data.SqlClient.SqlConnection", out DbConnection existingSqlConnection) &&
transportTransaction.TryGet("System.Data.SqlClient.SqlTransaction", out DbTransaction existingSqlTransaction))
{
return new StorageSession(existingSqlConnection, existingSqlTransaction, false, infoCache);
}

// Transport supports DTC and uses TxScope owned by the transport
var scopeTx = Transaction.Current;
if (transportTransaction.TryGet(out Transaction transportTx) &&
scopeTx != null &&
transportTx != scopeTx)
{
throw new Exception("A TransactionScope has been opened in the current context overriding the one created by the transport. "
+ "This setup can result in inconsistent data because operations done via connections enlisted in the context scope won't be committed "
+ "atomically with the receive transaction. To manually control the TransactionScope in the pipeline switch the transport transaction mode "
+ $"to values lower than '{nameof(TransportTransactionMode.TransactionScope)}'.");
}
var ambientTransaction = transportTx ?? scopeTx;
if (ambientTransaction == null)
{
//Other modes handled by creating a new session.
return null;
}
var connection = await connectionBuilder.OpenConnection().ConfigureAwait(false);
connection.EnlistTransaction(ambientTransaction);
return new StorageSession(connection, null, true, infoCache);

return dialect.TryAdaptTransportConnection(transportTransaction, context, connectionBuilder,
(conn, trans, ownsTx) => new StorageSession(conn, trans, ownsTx, infoCache));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ protected override void Setup(FeatureConfigurationContext context)
{
infoCache = BuildSagaInfoCache(sqlDialect, settings);
container.ConfigureComponent(() => new SynchronizedStorage(connectionBuilder, infoCache), DependencyLifecycle.SingleInstance);
container.ConfigureComponent(() => new StorageAdapter(connectionBuilder, infoCache), DependencyLifecycle.SingleInstance);
container.ConfigureComponent(() => new StorageAdapter(connectionBuilder, infoCache, sqlDialect), DependencyLifecycle.SingleInstance);
}
if (isSagasEnabledForSqlPersistence)
{
Expand Down