Skip to content

Commit

Permalink
Fixed: data races inside sql journal engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Nov 5, 2015
1 parent dce684d commit f088f0c
Showing 1 changed file with 9 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,14 @@ public abstract class JournalDbEngine : IDisposable
/// Settings applied to journal mapped from HOCON config file.
/// </summary>
public readonly JournalSettings Settings;

/// <summary>
/// List of cancellation tokens for each of the currently pending database operations.
/// </summary>
protected readonly LinkedList<CancellationTokenSource> PendingOperations;


/// <summary>
/// Timestamp provider used for generation of timestamps for incoming persistent messages.
/// </summary>
protected readonly ITimestampProvider TimestampProvider;

private readonly ActorSystem _system;
private readonly CancellationTokenSource _pendingRequestsCancellation;

protected JournalDbEngine(ActorSystem system)
{
Expand All @@ -72,7 +68,7 @@ protected JournalDbEngine(ActorSystem system)
QueryMapper = new DefaultJournalQueryMapper(_system.Serialization);
TimestampProvider = CreateTimestampProvider();

PendingOperations = new LinkedList<CancellationTokenSource>();
_pendingRequestsCancellation = new CancellationTokenSource();
}

/// <summary>
Expand Down Expand Up @@ -118,24 +114,7 @@ public DbConnection CreateDbConnection()
/// </summary>
public void Close()
{
StopPendingOperations();
}

/// <summary>
/// Stops all currently executing database operations.
/// </summary>
protected void StopPendingOperations()
{
// stop all operations executed in the background
var node = PendingOperations.First;
while (node != null)
{
var curr = node;
node = node.Next;

curr.Value.Cancel();
PendingOperations.Remove(curr);
}
_pendingRequestsCancellation.Cancel();
}

void IDisposable.Dispose()
Expand All @@ -154,9 +133,8 @@ public async Task ReadEvents(object queryId, IEnumerable<IHint> hints, IActorRef

var sqlCommand = QueryBuilder.SelectEvents(hints);
CompleteCommand(sqlCommand, connection);

var tokenSource = GetCancellationTokenSource();
var reader = await sqlCommand.ExecuteReaderAsync(tokenSource.Token);

var reader = await sqlCommand.ExecuteReaderAsync(_pendingRequestsCancellation.Token);
try
{
while (reader.Read())
Expand All @@ -168,7 +146,6 @@ public async Task ReadEvents(object queryId, IEnumerable<IHint> hints, IActorRef
}
finally
{
PendingOperations.Remove(tokenSource);
reader.Close();
}
}
Expand All @@ -192,9 +169,8 @@ public async Task ReplayMessagesAsync(string persistenceId, long fromSequenceNr,

var sqlCommand = QueryBuilder.SelectMessages(persistenceId, fromSequenceNr, toSequenceNr, max);
CompleteCommand(sqlCommand, connection);

var tokenSource = GetCancellationTokenSource();
var reader = await sqlCommand.ExecuteReaderAsync(tokenSource.Token);

var reader = await sqlCommand.ExecuteReaderAsync(_pendingRequestsCancellation.Token);

try
{
Expand All @@ -207,7 +183,6 @@ public async Task ReplayMessagesAsync(string persistenceId, long fromSequenceNr,
}
finally
{
PendingOperations.Remove(tokenSource);
reader.Close();
}
}
Expand All @@ -224,9 +199,8 @@ public async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fr

var sqlCommand = QueryBuilder.SelectHighestSequenceNr(persistenceId);
CompleteCommand(sqlCommand, connection);
var tokenSource = GetCancellationTokenSource();

var seqNr = await sqlCommand.ExecuteScalarAsync(tokenSource.Token);
var seqNr = await sqlCommand.ExecuteScalarAsync(_pendingRequestsCancellation.Token);
return seqNr is long ? Convert.ToInt64(seqNr) : 0L;
}
}
Expand Down Expand Up @@ -290,13 +264,6 @@ private void CompleteCommand(DbCommand sqlCommand, DbConnection connection)
sqlCommand.CommandTimeout = (int)Settings.ConnectionTimeout.TotalMilliseconds;
}

private CancellationTokenSource GetCancellationTokenSource()
{
var source = new CancellationTokenSource();
PendingOperations.AddLast(source);
return source;
}

private JournalEntry ToJournalEntry(IPersistentRepresentation message)
{
var payloadType = message.Payload.GetType();
Expand Down

0 comments on commit f088f0c

Please sign in to comment.