Skip to content

Commit

Permalink
Improve journal startup code (#189)
Browse files Browse the repository at this point in the history
* Improve journal startup code

* Add comments

---------

Co-authored-by: Aaron Stannard <[email protected]>
  • Loading branch information
Arkatufus and Aaronontheweb authored Mar 30, 2023
1 parent c4e6b59 commit 62a0698
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 91 deletions.
3 changes: 3 additions & 0 deletions src/Akka.Persistence.Sql/Db/AkkaDataConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using LinqToDB;
using LinqToDB.Data;
using LinqToDB.Data.RetryPolicy;
using LinqToDB.DataProvider;
using LinqToDB.SchemaProvider;

namespace Akka.Persistence.Sql.Db
Expand All @@ -36,6 +37,8 @@ public AkkaDataConnection(

public bool UseDateTime { get; }

public IDataProvider DataProvider => _connection.DataProvider;

public IRetryPolicy RetryPolicy
{
get => _connection.RetryPolicy;
Expand Down
59 changes: 34 additions & 25 deletions src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using System.Linq.Expressions;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
Expand Down Expand Up @@ -72,6 +73,7 @@ public abstract class BaseByteArrayJournalDao : BaseJournalDaoWithReadMessages,
private static readonly Expression<Func<PersistenceIdAndSequenceNumber, long>> SequenceNumberSelector =
r => r.SequenceNumber;

protected readonly CancellationToken ShutdownToken;
private readonly Flow<JournalRow, Util.Try<ReplayCompletion>, NotUsed> _deserializeFlowMapped;
private readonly TagMode _tagWriteMode;
protected readonly JournalConfig JournalConfig;
Expand All @@ -86,14 +88,16 @@ protected BaseByteArrayJournalDao(
AkkaPersistenceDataConnectionFactory connectionFactory,
JournalConfig config,
ByteArrayJournalSerializer serializer,
ILoggingAdapter logger)
ILoggingAdapter logger,
CancellationToken shutdownToken)
: base(scheduler, materializer, connectionFactory)
{
Logger = logger;
JournalConfig = config;
Serializer = serializer;
_deserializeFlowMapped = Serializer.DeserializeFlow().Select(MessageWithBatchMapper());
_tagWriteMode = JournalConfig.PluginConfig.TagMode;
ShutdownToken = shutdownToken;

// Due to C# rules we have to initialize WriteQueue here
// Keeping it here vs init function prevents accidental moving of init
Expand Down Expand Up @@ -152,7 +156,7 @@ public async Task Delete(string persistenceId, long maxSequenceNr)
{
await using var connection = ConnectionFactory.GetConnection();

var transaction = await connection.BeginTransactionAsync();
var transaction = await connection.BeginTransactionAsync(ShutdownToken);

try
{
Expand All @@ -162,10 +166,10 @@ await connection
r.PersistenceId == persistenceId &&
r.SequenceNumber <= maxSequenceNr)
.Set(r => r.Deleted, true)
.UpdateAsync();
.UpdateAsync(ShutdownToken);

var maxMarkedDeletion =
await MaxMarkedForDeletionMaxPersistenceIdQuery(connection, persistenceId).FirstOrDefaultAsync();
await MaxMarkedForDeletionMaxPersistenceIdQuery(connection, persistenceId).FirstOrDefaultAsync(ShutdownToken);

if (JournalConfig.DaoConfig.SqlCommonCompatibilityMode)
{
Expand All @@ -181,7 +185,8 @@ await connection.GetTable<JournalMetaData>()
{
PersistenceId = persistenceId,
SequenceNumber = maxMarkedDeletion
});
},
token: ShutdownToken);
}

await connection
Expand All @@ -190,7 +195,7 @@ await connection
r.PersistenceId == persistenceId &&
r.SequenceNumber <= maxSequenceNr &&
r.SequenceNumber < maxMarkedDeletion)
.DeleteAsync();
.DeleteAsync(token: ShutdownToken);

if (JournalConfig.DaoConfig.SqlCommonCompatibilityMode)
{
Expand All @@ -199,7 +204,7 @@ await connection
.Where(r =>
r.PersistenceId == persistenceId &&
r.SequenceNumber < maxMarkedDeletion)
.DeleteAsync();
.DeleteAsync(token: ShutdownToken);
}

if (JournalConfig.PluginConfig.TagMode != TagMode.Csv)
Expand All @@ -209,18 +214,18 @@ await connection
.Where(r =>
r.SequenceNumber <= maxSequenceNr &&
r.PersistenceId == persistenceId)
.DeleteAsync();
.DeleteAsync(token: ShutdownToken);
}

await transaction.CommitAsync();
await transaction.CommitAsync(ShutdownToken);
}
catch (Exception ex)
{
Logger.Error(ex, "Error on delete!");

try
{
await transaction.RollbackAsync();
await transaction.RollbackAsync(ShutdownToken);
}
catch (Exception exception)
{
Expand Down Expand Up @@ -252,7 +257,7 @@ await connection
r.PersistenceId == persistenceId &&
r.SequenceNumber == write.SequenceNr)
.Set(r => r.Message, serialize.Get().Message)
.UpdateAsync();
.UpdateAsync(token: ShutdownToken);

return Done.Instance;
}
Expand All @@ -261,7 +266,7 @@ public async Task<long> HighestSequenceNr(string persistenceId, long fromSequenc
{
await using var connection = ConnectionFactory.GetConnection();

return (await MaxSeqNumberForPersistenceIdQuery(connection, persistenceId, fromSequenceNr).MaxAsync())
return (await MaxSeqNumberForPersistenceIdQuery(connection, persistenceId, fromSequenceNr).MaxAsync(token: ShutdownToken))
.GetValueOrDefault(0);
}

Expand Down Expand Up @@ -295,7 +300,7 @@ public async Task<long> HighestSequenceNr(string persistenceId, long fromSequenc
query = query.Take((int)max);

return Source
.FromTask(query.ToListAsync())
.FromTask(query.ToListAsync(token: ShutdownToken))
.SelectMany(r => r)
.Via(_deserializeFlowMapped);
}
Expand Down Expand Up @@ -348,7 +353,7 @@ private async Task WriteJournalRows(Seq<JournalRow> xs)
// If we are writing a single row,
// we don't need to worry about transactions.
await using var connection = ConnectionFactory.GetConnection();
await connection.InsertAsync(xs.Head);
await connection.InsertAsync(xs.Head, ShutdownToken);
break;
}

Expand All @@ -362,13 +367,13 @@ private async Task InsertMultiple(Seq<JournalRow> xs)
{
await using var connection = ConnectionFactory.GetConnection();

await using var transaction = await connection.BeginTransactionAsync(IsolationLevel.ReadCommitted);
await using var transaction = await connection.BeginTransactionAsync(IsolationLevel.ReadCommitted, ShutdownToken);

try
{
if (_tagWriteMode == TagMode.Csv)
{
await BulkInsertNoTagTableTags(connection, xs, JournalConfig.DaoConfig);
await BulkInsertNoTagTableTags(connection, xs, JournalConfig.DaoConfig, ShutdownToken);
}
else
{
Expand All @@ -378,21 +383,21 @@ private async Task InsertMultiple(Seq<JournalRow> xs)
{
(var noTags, tail) = tail.Span(r => r.TagArr.Length == 0);
if (noTags.Count > 0)
await BulkInsertNoTagTableTags(connection, noTags, config);
await BulkInsertNoTagTableTags(connection, noTags, config, ShutdownToken);

(var hasTags, tail) = tail.Span(r => r.TagArr.Length > 0);
if (hasTags.Count > 0)
await InsertWithOrderingAndBulkInsertTags(connection, hasTags, config);
await InsertWithOrderingAndBulkInsertTags(connection, hasTags, config, ShutdownToken);
}
}

await connection.CommitTransactionAsync();
await connection.CommitTransactionAsync(ShutdownToken);
}
catch (Exception e1)
{
try
{
await connection.RollbackTransactionAsync();
await connection.RollbackTransactionAsync(ShutdownToken);
}
catch (Exception e2)
{
Expand All @@ -407,7 +412,8 @@ private async Task InsertMultiple(Seq<JournalRow> xs)
private static async Task InsertWithOrderingAndBulkInsertTags(
AkkaDataConnection connection,
Seq<JournalRow> xs,
BaseByteArrayJournalDaoConfig config)
BaseByteArrayJournalDaoConfig config,
CancellationToken token)
{
var tagsToInsert = new List<JournalTagRow>(xs.Count);

Expand All @@ -416,7 +422,7 @@ private static async Task InsertWithOrderingAndBulkInsertTags(
// We're forced to insert the rows one by one.
foreach (var journalRow in xs)
{
var dbId = await connection.InsertWithInt64IdentityAsync(journalRow);
var dbId = await connection.InsertWithInt64IdentityAsync(journalRow, token);

tagsToInsert.AddRange(
journalRow.TagArr.Select(
Expand All @@ -436,14 +442,16 @@ await connection
.WithBulkCopyType(BulkCopyType.MultipleRows)
.WithUseParameters(config.PreferParametersOnMultiRowInsert)
.WithMaxBatchSize(config.DbRoundTripTagBatchSize),
tagsToInsert);
tagsToInsert,
cancellationToken: token);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static async Task BulkInsertNoTagTableTags(
AkkaDataConnection connection,
Seq<JournalRow> xs,
BaseByteArrayJournalDaoConfig config)
BaseByteArrayJournalDaoConfig config,
CancellationToken token)
=> await connection
.GetTable<JournalRow>()
.BulkCopyAsync(
Expand All @@ -454,7 +462,8 @@ private static async Task BulkInsertNoTagTableTags(
: BulkCopyType.MultipleRows)
.WithUseParameters(config.PreferParametersOnMultiRowInsert)
.WithMaxBatchSize(config.DbRoundTripBatchSize),
xs);
xs,
cancellationToken: token);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static Guid NextUuid()
Expand Down
69 changes: 39 additions & 30 deletions src/Akka.Persistence.Sql/Journal/Dao/ByteArrayJournalDao.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
// -----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.Sql.Config;
Expand All @@ -23,7 +26,8 @@ public ByteArrayJournalDao(
AkkaPersistenceDataConnectionFactory connection,
JournalConfig journalConfig,
Akka.Serialization.Serialization serializer,
ILoggingAdapter logger)
ILoggingAdapter logger,
CancellationToken shutdownToken)
: base(
scheduler: scheduler,
materializer: mat,
Expand All @@ -33,45 +37,50 @@ public ByteArrayJournalDao(
journalConfig,
serializer,
journalConfig.PluginConfig.TagSeparator),
logger: logger) { }
logger: logger,
shutdownToken: shutdownToken) { }

// TODO: change this to async
public void InitializeTables()
public async Task InitializeTables(CancellationToken token)
{
using var connection = ConnectionFactory.GetConnection();
await using var connection = ConnectionFactory.GetConnection();

try
{
connection.CreateTable<JournalRow>();
if (JournalConfig.PluginConfig.TagMode is not TagMode.Csv)
connection.CreateTable<JournalTagRow>();
}
catch (Exception e)
{
if (JournalConfig.WarnOnAutoInitializeFail)
{
Logger.Warning(
e,
$"Could not Create Journal Table {JournalConfig.TableConfig.EventJournalTable.Name} as requested by config.");
}
}

if (JournalConfig.DaoConfig.SqlCommonCompatibilityMode)
// MS Sqlite does not support schema, we have to blindly try and create the tables
if (connection.DataProvider.Name is ProviderName.SQLiteMS)
{
try
{
connection.CreateTable<JournalMetaData>();
await connection.CreateTableAsync<JournalRow>(token);
}
catch (Exception e)
{
if (JournalConfig.WarnOnAutoInitializeFail)
catch { /* no-op */ }

if (JournalConfig.PluginConfig.TagMode is not TagMode.Csv)
try
{
Logger.Warning(
e,
$"Could not Create Journal Metadata Table {JournalConfig.TableConfig.MetadataTable.Name} as requested by config.");
await connection.CreateTableAsync<JournalTagRow>(token);
}
}
catch { /* no-op */ }

if (JournalConfig.DaoConfig.SqlCommonCompatibilityMode)
try
{
await connection.CreateTableAsync<JournalMetaData>(token);
}
catch { /* no-op */ }

return;
}

var schema = connection.GetSchema();
if(schema.Tables.All(t => t.TableName != JournalConfig.TableConfig.EventJournalTable.Name))
await connection.CreateTableAsync<JournalRow>(token);

if (JournalConfig.PluginConfig.TagMode is not TagMode.Csv)
if(schema.Tables.All(t => t.TableName != JournalConfig.TableConfig.TagTable.Name))
await connection.CreateTableAsync<JournalTagRow>(token);

if (JournalConfig.DaoConfig.SqlCommonCompatibilityMode)
if(schema.Tables.All(t => t.TableName != JournalConfig.TableConfig.MetadataTable.Name))
await connection.CreateTableAsync<JournalMetaData>(token);
}
}
}
Loading

0 comments on commit 62a0698

Please sign in to comment.