Skip to content

Commit

Permalink
Stored json gets corrupted on SqlServer after an update with a smalle…
Browse files Browse the repository at this point in the history
…r sized payload (#1332)

* Stored json gets corrupted on SqlServer after an update with a smaller sized payload (#1331)

* Fix issue where sql parameter optimalisation was applied for INSERT/UPDATE. Now only applied for `ExecuteReaderAsync` and `ExecuteReaderAsync`.

* Fix failing CI tests due to new test using incorrect table for testing

* Updated the test name to better express its intent

* Version still supports NETFRAMEWORK and await  using isn't supported in NETFX
  • Loading branch information
ramonsmits authored Nov 15, 2023
1 parent bb6a57a commit b610ee0
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
namespace NServiceBus.PersistenceTesting.Sagas
{
using System;
using System.Linq;
using System.Threading.Tasks;
using NUnit.Framework;

public class When_updating_saga_with_smaller_state : SagaPersisterTests
{
[Test]
public async Task It_should_truncate_the_stored_state()
{
// When updating an existing saga where the serialized state is smaller in length than the previous the column value should not have any left over data from the previous value.
// The deserializer ignores any trailing

var sqlVariant = (SqlTestVariant)param.Values[0];

if (sqlVariant.Dialect is not SqlDialect.MsSqlServer)
{
Assert.Ignore("Only relevant for SQL Server");
return; // Satisfy compiler
}

var sagaData = new SagaWithCorrelationPropertyData
{
CorrelatedProperty = Guid.NewGuid().ToString(),
Payload = "very long state"
};

await SaveSaga(sagaData);

SagaWithCorrelationPropertyData retrieved;
var context = configuration.GetContextBagForSagaStorage();
var persister = configuration.SagaStorage;

using (var completeSession = configuration.CreateStorageSession())
{
await completeSession.Open(context);

retrieved = await persister.Get<SagaWithCorrelationPropertyData>(nameof(sagaData.CorrelatedProperty), sagaData.CorrelatedProperty, completeSession, context);

retrieved.Payload = "short";

await persister.Update(retrieved, completeSession, context);
await completeSession.CompleteAsync();
}

var retrieved2 = await GetById<SagaWithCorrelationPropertyData>(sagaData.Id);

Assert.LessOrEqual(retrieved.Payload, sagaData.Payload); // No real need, but here to prevent accidental updates
Assert.AreEqual(retrieved.Payload, retrieved2.Payload);
#if NETFRAMEWORK
using var con = sqlVariant.Open();
#else
await using var con = sqlVariant.Open();
#endif
await con.OpenAsync();
var cmd = con.CreateCommand();
cmd.CommandText = $"SELECT Data FROM [PersistenceTests_SWCP] WHERE Id = '{retrieved.Id}'";
var data = (string)await cmd.ExecuteScalarAsync();

// Payload should only have a single closing bracket, if there are more that means there is trailing data
var countClosingBrackets = data.ToCharArray().Count(x => x == '}');

Assert.AreEqual(1, countClosingBrackets);
}

public class SagaWithCorrelationProperty : Saga<SagaWithCorrelationPropertyData>, IAmStartedByMessages<StartMessage>
{
public Task Handle(StartMessage message, IMessageHandlerContext context)
{
throw new NotImplementedException();
}

protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SagaWithCorrelationPropertyData> mapper)
{
mapper.ConfigureMapping<StartMessage>(msg => msg.SomeId).ToSaga(saga => saga.CorrelatedProperty);
}
}

public class SagaWithCorrelationPropertyData : ContainSagaData
{
public string CorrelatedProperty { get; set; }
public string Payload { get; set; }
}

public class StartMessage
{
public string SomeId { get; set; }
}

public When_updating_saga_with_smaller_state(TestVariant param) : base(param)
{
}
}
}
2 changes: 2 additions & 0 deletions src/SqlPersistence/CommandWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ public Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken = defa

public Task<DbDataReader> ExecuteReaderAsync(CancellationToken cancellationToken = default)
{
dialect.OptimizeForReads(command);
return command.ExecuteReaderAsync(cancellationToken);
}

public Task<DbDataReader> ExecuteReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken = default)
{
var resultingBehavior = dialect.ModifyBehavior(command.Connection, behavior);
dialect.OptimizeForReads(command);
return command.ExecuteReaderAsync(resultingBehavior, cancellationToken);
}

Expand Down
4 changes: 4 additions & 0 deletions src/SqlPersistence/Config/SqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,9 @@ internal virtual void ValidateTablePrefix(string tablePrefix)
}

internal abstract object GetCustomDialectDiagnosticsInfo();

internal virtual void OptimizeForReads(DbCommand command)
{
}
}
}
33 changes: 21 additions & 12 deletions src/SqlPersistence/Config/SqlDialect_MsSqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,7 @@ internal override void SetParameterValue(DbParameter parameter, object value)
if (value is ArraySegment<char> charSegment)
{
parameter.Value = charSegment.Array;

// Set to 4000 or -1 to improve query execution plan reuse
// Must be set when exceeding 4000 characters for nvarchar(max) https://stackoverflow.com/a/973269/199551
parameter.Size = charSegment.Count > 4000 ? -1 : 4000;
}
else if (value is string stringValue)
{
parameter.Value = stringValue;

// Set to 4000 or -1 to improve query execution plan reuse
// Must be set when exceeding 4000 characters for nvarchar(max) https://stackoverflow.com/a/973269/199551
parameter.Size = stringValue.Length > 4000 ? -1 : 4000;
parameter.Size = charSegment.Count;
}
else
{
Expand Down Expand Up @@ -84,9 +73,29 @@ internal override object GetCustomDialectDiagnosticsInfo()
};
}

internal override void OptimizeForReads(DbCommand command)
{
foreach (DbParameter parameter in command.Parameters)
{
if (parameter.Value is ArraySegment<char> charSegment)
{
// Set to 4000 or -1 to improve query execution plan reuse
// Must be set when exceeding 4000 characters for nvarchar(max) https://stackoverflow.com/a/973269/199551
parameter.Size = charSegment.Count > 4000 ? -1 : 4000;
}
else if (parameter.Value is string stringValue)
{
// Set to 4000 or -1 to improve query execution plan reuse
// Must be set when exceeding 4000 characters for nvarchar(max) https://stackoverflow.com/a/973269/199551
parameter.Size = stringValue.Length > 4000 ? -1 : 4000;
}
}
}

internal string Schema { get; set; }
bool hasConnectionBeenInspectedForEncryption;
bool isConnectionEncrypted;
}

}
}

0 comments on commit b610ee0

Please sign in to comment.