Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to repro transaction problems #1200

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions src/SqlPersistence.PersistenceTests/OutboxStorageAdditionalTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
namespace NServiceBus.PersistenceTesting.Outbox
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Extensibility;
using NServiceBus.Outbox;
using NUnit.Framework;
using Transport;
using TransportOperation = NServiceBus.Outbox.TransportOperation;

[TestFixtureSource(typeof(PersistenceTestsConfiguration), nameof(PersistenceTestsConfiguration.OutboxVariants))]
class OutboxStorageAdditionalTests
{
public OutboxStorageAdditionalTests(TestVariant param)
{
this.param = param.DeepCopy();
variant = (SqlTestVariant)param.Values[0];
}

[OneTimeSetUp]
public async Task OneTimeSetUp()
{
configuration = new PersistenceTestsConfiguration(param);
await configuration.Configure();
}

[OneTimeTearDown]
public async Task OneTimeTearDown()
{
await configuration.Cleanup();
}

[Test]
public async Task Should_()
{
configuration.RequiresOutboxSupport();
variant.RequiresOutboxPessimisticConcurrencySupport();

var messageId = Guid.NewGuid().ToString();
var storage = configuration.OutboxStorage;

var firstSessionBeginDone = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

ContextBag GetContextBagForOutbox(string incomingMessageId)
{
var contextBag = new ContextBag();
contextBag.Set(new IncomingMessage(incomingMessageId, new Dictionary<string, string>(), Array.Empty<byte>()));
return contextBag;
};

async Task FirstSession()
{
var firstSessionContextBag = GetContextBagForOutbox(messageId);
var outboxMessage = await storage.Get(messageId, firstSessionContextBag);
Assert.Null(outboxMessage);

Console.WriteLine("First session begin transaction");
using var transactionA = await storage.BeginTransaction(firstSessionContextBag);
firstSessionBeginDone.SetResult(true);
Console.WriteLine("First session began transaction");

Console.WriteLine("First session store");
await storage.Store(new OutboxMessage(messageId, Array.Empty<TransportOperation>()), transactionA, firstSessionContextBag);
Console.WriteLine("First session stored");
Console.WriteLine("First session commit");

await Task.Delay(TimeSpan.FromMinutes(2));

await transactionA.Commit();
Console.WriteLine("First session committed");
}

async Task SecondSession()
{
var secondSessionContextBag = GetContextBagForOutbox(messageId);
await firstSessionBeginDone.Task;
var outboxMessage = await storage.Get(messageId, secondSessionContextBag);
Assert.Null(outboxMessage);

Console.WriteLine("Second session begin transaction");
using var transactionA = await storage.BeginTransaction(secondSessionContextBag);
Console.WriteLine("Second session began transaction");

Console.WriteLine("Second session store");
await storage.Store(new OutboxMessage(messageId, Array.Empty<TransportOperation>()), transactionA, secondSessionContextBag);
Console.WriteLine("Second session stored");
Console.WriteLine("Second session commit");
await transactionA.Commit();
Console.WriteLine("Second session committed");
}

await Task.WhenAll(SecondSession(), FirstSession());

var message = await storage.Get(messageId, configuration.GetContextBagForOutbox());

Assert.That(message, Is.Not.Null);
CollectionAssert.IsEmpty(message.TransportOperations);
}

IPersistenceTestsConfiguration configuration;
TestVariant param;
readonly SqlTestVariant variant;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Task Configure(CancellationToken cancellationToken = default)
var connectionManager = new ConnectionManager(connectionFactory);
SagaIdGenerator = new DefaultSagaIdGenerator();
SagaStorage = new SagaPersister(infoCache, dialect);
OutboxStorage = CreateOutboxPersister(connectionManager, dialect, false, false);
OutboxStorage = CreateOutboxPersister(connectionManager, dialect, pessimisticMode, false);
SupportsPessimisticConcurrency = pessimisticMode;
CreateStorageSession = () => new StorageSession(connectionManager, infoCache, dialect);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace NServiceBus.PersistenceTesting
using System.Data.Common;
using Npgsql;
using NpgsqlTypes;
using NUnit.Framework;
using Persistence.Sql.ScriptBuilder;

static class SqlTestVariantExtensions
Expand Down Expand Up @@ -33,5 +34,13 @@ public static DbConnection Open(this SqlTestVariant variant)
$"{nameof(SqlTestVariant.BuildDialect)} '{variant.BuildDialect}' is not supported yet as a test variant.")
};
}

public static void RequiresOutboxPessimisticConcurrencySupport(this SqlTestVariant variant)
{
if (!variant.UsePessimisticMode)
{
Assert.Ignore("Ignoring this test because it requires pessimistic concurrency support from persister.");
}
}
}
}