diff --git a/README.md b/README.md index b18528e7..033c41e3 100644 --- a/README.md +++ b/README.md @@ -2,13 +2,13 @@ A Cross-SQL-DB Engine Akka.Persistence plugin with broad database compatibility thanks to Linq2Db. -This is a Fairly-Naive port of the amazing akka-persistence-jdbc package from Scala to C#. +This is a port of the amazing akka-persistence-jdbc package from Scala, with a few improvements based on C# as well as our choice of data library. Please read the documentation carefully. Some features may be specific to use case and have trade-offs (namely, compatibility modes) ## Status -- Usable for basic Read/Writes + - Implements the following for `Akka.Persistence.Query`: - IPersistenceIdsQuery - ICurrentPersistenceIdsQuery @@ -20,9 +20,7 @@ Please read the documentation carefully. Some features may be specific to use ca - ICurrentAllEventsQuery - Snapshot Store Support -#### This is still a WORK IN PROGRESS - -**Pull Requests are Welcome** but please note this is still considered 'work in progress' and only used if one understands the risks. While the TCK Specs pass you should still test in a 'safe' non-production environment carefully before deciding to fully deploy. +See something you want to add or improve? **Pull Requests are Welcome!** Working: @@ -102,27 +100,8 @@ Compatibility with existing Providers is partially implemented via `table-compat # Performance -Tests based on i-7 8750H, 32GB Ram, 2TB SSD, Windows 10 version Version 10.0.19041.630. -Databases running on Docker WSL2. - -All numbers are in msg/sec. - -|Test |SqlServer (normal) | SqlServer Batching | Linq2Db |vs Normal| vs Batching| -|:------------- |:------------- | :----------: | -----------: | -----------: | -----------: | -|Persist | 164|427| 235|143.29%|55.04%| -|PersistAll | 782|875| 5609|717.26%|641.03%| -|PersistAsync | 630|846| 16099|2555.40%|1902.96%| -|PersistAllAsync | 2095|902| 15681|748.50%|1738.47%| -|PersistGroup10 | 590|680| 1069|181.19%|157.21%| -|PersistGroup100 | 607|965| 5537|912.19%|573.78%| -|PersistGroup200 | 628|1356| 7966|1268.47%|587.46%| -|PersistGroup25 | 629|675| 2189|348.01%|324.30%| -|PersistGroup400 | 612|1011| 7237|1182.52%|715.83%| -|PersistGroup50 | 612|654| 3867|631.86%|591.28%| -|Recovering | 41903|38766| 42592|101.64%|109.87%| -|Recovering8 | 75466|65515| 63960|84.75%|97.63%| -|RecoveringFour | 59259|51355| 58437|98.61%|113.79%| -|RecoveringTwo | 41745|35512| 41108|98.47%|115.76%| +Updated Performance numbers pending. + ## Sql.Common Compatibility modes - Delete Compatibility mode is available. diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj index 506f3ae7..24982456 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj +++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Akka.Persistence.Linq2Db.Benchmark.DockerTests.csproj @@ -13,12 +13,12 @@ - - + + - + diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj index 378a9226..fe771662 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Akka.Persistence.Linq2Db.Benchmark.Tests.csproj @@ -16,12 +16,12 @@ - - + + - + diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs index be9d7899..8bb31b23 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/DupJournalPerfSpec.cs @@ -136,6 +136,7 @@ internal void MeasureGroup(Func msg, Action block, int numMsg, { var measurements = new List(MeasurementIterations); + block(); block(); //warm-up int i = 0; @@ -550,7 +551,7 @@ public void PersistenceActor_performance_must_measure_Recovering8() FeedAndExpectLastSpecific(p6, "p", Commands); FeedAndExpectLastSpecific(p7, "p", Commands); FeedAndExpectLastSpecific(p8, "p", Commands); - MeasureGroup(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms", () => + MeasureGroup(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms , {(EventsCount*8 / d.TotalMilliseconds) * 1000} total msg/sec", () => { var task1 = Task.Run(()=> { diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj index bbb72821..61f8693a 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj @@ -12,8 +12,8 @@ - - + + diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj index 68626e8d..a9124902 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj @@ -9,8 +9,8 @@ - - + + diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs index 97789cc8..c442f370 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs @@ -50,6 +50,7 @@ public async Task Can_Recover_SqlCommon_Journal() persistRef.Tell(new SomeEvent(){EventName = "rec-test", Guid = ourGuid, Number = 1}); Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result); await persistRef.GracefulStop(TimeSpan.FromSeconds(5)); + await Task.Delay(1000); persistRef = sys1.ActorOf(Props.Create(() => new JournalCompatActor(NewJournal, "p-1")), "test-recover-1"); @@ -90,6 +91,7 @@ public async Task SqlCommon_Journal_Can_Recover_L2Db_Journal() persistRef.Tell(new SomeEvent(){EventName = "rec-test", Guid = ourGuid, Number = 1}); Assert.True(persistRef.Ask(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result); await persistRef.GracefulStop(TimeSpan.FromSeconds(5)); + await Task.Delay(1000); persistRef = sys1.ActorOf(Props.Create(() => new JournalCompatActor(OldJournal, "p-3")), "test-recover-2"); diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs index 92d9649d..43a97dfc 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs @@ -51,12 +51,13 @@ class = ""{typeof(Linq2DbSnapshotStore).AssemblyQualifiedName}"" provider-name = """ + LinqToDB.ProviderName.SQLiteMS + $@""" #use-clone-connection = true table-compatibility-mode = sqlite + table-name = ""{tablename}"" tables {{ snapshot {{ auto-init = true warn-on-auto-init-fail = false - table-name = ""{tablename}"" + #table-name = ""{tablename}"" }} }} }} @@ -92,11 +93,12 @@ class = ""{typeof(Linq2DbWriteJournal).AssemblyQualifiedName}"" #connection-string = ""FullUri=file:test.db&cache=shared"" provider-name = ""{LinqToDB.ProviderName.SQLiteMS}"" parallelism = 3 + table-name = ""{tablename}"" table-compatibility-mode = ""sqlite"" tables.journal {{ auto-init = true warn-on-auto-init-fail = false - table-name = ""{tablename}"" + #table-name = ""{tablename}"" metadata-table-name = ""{metadatatablename}"" }} diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj index 476d40ec..39c48968 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/Akka.Persistence.Linq2Db.Journal.Query.Tests.csproj @@ -8,10 +8,10 @@ - - - - + + + + diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj index fc69ab38..5e0ec9d1 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Akka.Persistence.Sql.Linq2Db.DockerTests.csproj @@ -12,16 +12,16 @@ - - - - - - + + + + + + - + - + diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs index dd9f6b94..130c5c86 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs @@ -15,7 +15,8 @@ namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker /// public class PostgreSQLFixture : IAsyncLifetime { - protected readonly string PostgreSqlImageName = $"PostgreSQL-{Guid.NewGuid():N}"; + protected readonly string PostgresContainerName = $"postgresSqlServer-{Guid.NewGuid():N}"; + //protected readonly string PostgreSqlImageName = $"PostgreSQL-{Guid.NewGuid():N}"; protected DockerClient Client; public PostgreSQLFixture() @@ -31,34 +32,33 @@ public PostgreSQLFixture() Client = config.CreateClient(); } - protected string PostgreSQLImageName - { - get - { - //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - // return "microsoft/mssql-server-windows-express"; - return "postgres"; - } - } + + protected string ImageName => "postgres"; + protected string Tag => "latest"; - protected string SqlServerImageTag - { - get - { - //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - // return "2017-latest"; - return "latest"; - } - } + protected string PostgresImageName => $"{ImageName}:{Tag}"; public string ConnectionString { get; private set; } public async Task InitializeAsync() { - var images = await Client.Images.ListImagesAsync(new ImagesListParameters {MatchName = PostgreSQLImageName}); + var images = await Client.Images.ListImagesAsync(new ImagesListParameters + { + Filters = new Dictionary> + { + { + "reference", + new Dictionary + { + {PostgresImageName, true} + } + } + } + }); + if (images.Count == 0) await Client.Images.CreateImageAsync( - new ImagesCreateParameters {FromImage = PostgreSQLImageName, Tag = "latest"}, null, + new ImagesCreateParameters { FromImage = ImageName, Tag = Tag }, null, new Progress(message => { Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage) @@ -66,13 +66,13 @@ await Client.Images.CreateImageAsync( : $"{message.ID} {message.Status} {message.ProgressMessage}"); })); - var postgresPort = ThreadLocalRandom.Current.Next(9000, 10000); + var sqlServerHostPort = ThreadLocalRandom.Current.Next(9000, 10000); // create the container await Client.Containers.CreateContainerAsync(new CreateContainerParameters { - Image = PostgreSQLImageName, - Name = PostgreSqlImageName, + Image = PostgresImageName, + Name = PostgresContainerName, Tty = true, ExposedPorts = new Dictionary { @@ -88,37 +88,44 @@ await Client.Containers.CreateContainerAsync(new CreateContainerParameters { new PortBinding { - HostPort = $"{postgresPort}" + HostPort = $"{sqlServerHostPort}" } } } } }, - Env = new[] {"POSTGRES_PASSWORD=l0lTh1sIsOpenSource"} + Env = new[] + { + "POSTGRES_PASSWORD=postgres", + "POSTGRES_USER=postgres" + } }); // start the container - await Client.Containers.StartContainerAsync(PostgreSqlImageName, new ContainerStartParameters()); + await Client.Containers.StartContainerAsync(PostgresContainerName, new ContainerStartParameters()); - // Provide a 30 second startup delay - await Task.Delay(TimeSpan.FromSeconds(30)); + // Provide a 10 second startup delay + await Task.Delay(TimeSpan.FromSeconds(10)); - var connectionString = new NpgsqlConnectionStringBuilder() - { - Host = "localhost", Password = "l0lTh1sIsOpenSource", - Username = "postgres", Database = "postgres", - Port = postgresPort - }; + ConnectionString = $"Server=127.0.0.1;Port={sqlServerHostPort};" + + "Database=postgres;User Id=postgres;Password=postgres"; - ConnectionString = connectionString.ToString(); + //var connectionString = new NpgsqlConnectionStringBuilder() + //{ + // Host = "localhost", Password = "l0lTh1sIsOpenSource", + // Username = "postgres", Database = "postgres", + // Port = sqlServerHostPort + //}; + // + //ConnectionString = connectionString.ToString(); } public async Task DisposeAsync() { if (Client != null) { - await Client.Containers.StopContainerAsync(PostgreSqlImageName, new ContainerStopParameters()); - await Client.Containers.RemoveContainerAsync(PostgreSqlImageName, + await Client.Containers.StopContainerAsync(PostgresContainerName, new ContainerStopParameters()); + await Client.Containers.RemoveContainerAsync(PostgresContainerName, new ContainerRemoveParameters {Force = true}); Client.Dispose(); } diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs index 1737bc23..e04f9267 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerFixture.cs @@ -31,34 +31,35 @@ public SqlServerFixture() Client = config.CreateClient(); } - protected string SqlServerImageName - { - get - { - //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - // return "microsoft/mssql-server-windows-express"; - return "mcr.microsoft.com/mssql/server"; - } - } - - protected string SqlServerImageTag - { - get - { - //if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - // return "2017-latest"; - return "2017-latest-ubuntu"; - } - } + protected string ImageName => + //RuntimeInformation.IsOSPlatform(OSPlatform.Windows) + // ? "microsoft/mssql-server-windows-express" + // : + "mcr.microsoft.com/mssql/server"; + protected string Tag => "2017-latest"; + protected string SqlServerImageName => $"{ImageName}:{Tag}"; public string ConnectionString { get; private set; } public async Task InitializeAsync() { - var images = await Client.Images.ListImagesAsync(new ImagesListParameters {MatchName = SqlServerImageName}); + var images = await Client.Images.ListImagesAsync(new ImagesListParameters + { + Filters = new Dictionary> + { + { + "reference", + new Dictionary + { + {SqlServerImageName, true} + } + } + } + }); + if (images.Count == 0) await Client.Images.CreateImageAsync( - new ImagesCreateParameters {FromImage = SqlServerImageName, Tag = "latest"}, null, + new ImagesCreateParameters {FromImage = ImageName, Tag = Tag}, null, new Progress(message => { Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage) @@ -107,7 +108,7 @@ await Client.Containers.CreateContainerAsync(new CreateContainerParameters var connectionString = new DbConnectionStringBuilder { ConnectionString = - "data source=.;database=akka_persistence_tests;user id=sa;password=l0lTh1sIsOpenSource;" + "data source=.;database=akka_persistence_tests;user id=sa;password=l0lTh1sIsOpenSource" }; connectionString["Data Source"] = $"localhost,{sqlServerHostPort}"; diff --git a/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj b/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj index 2cacaa68..7369e828 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj +++ b/src/Akka.Persistence.Sql.Linq2Db.Tests/Akka.Persistence.Sql.Linq2Db.Tests.csproj @@ -11,16 +11,16 @@ - - - - - - + + + + + + - + - + diff --git a/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj b/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj index ea42a61f..25ab378a 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj +++ b/src/Akka.Persistence.Sql.Linq2Db/Akka.Persistence.Sql.Linq2Db.csproj @@ -3,18 +3,18 @@ - 8 + default $(LibraryFramework) An Akka Persistence Module for SQL Databases using Linq2Db. - - - - + + + + - + diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs index e3dbce24..0410df3d 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs @@ -9,6 +9,10 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config) BufferSize = config.GetInt("buffer-size", 5000); BatchSize = config.GetInt("batch-size", 100); + DbRoundTripBatchSize = config.GetInt("db-round-trip-max-batch-size", 1000); + PreferParametersOnMultiRowInsert = + config.GetBoolean("prefer-parameters-on-multirow-insert", + false); ReplayBatchSize = config.GetInt("replay-batch-size", 1000); Parallelism = config.GetInt("parallelism", 2); LogicalDelete = config.GetBoolean("logical-delete", false); @@ -17,6 +21,10 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config) config.GetBoolean("delete-compatibility-mode", true); } + public bool PreferParametersOnMultiRowInsert { get; set; } + + public int DbRoundTripBatchSize { get; set; } + /// /// Specifies the batch size at which point /// will switch to 'Default' instead of 'MultipleRows'. For smaller sets diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs index ac4246ae..1b39ddda 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs @@ -9,12 +9,11 @@ public class SnapshotTableConfiguration { public SnapshotTableConfiguration(Configuration.Config config) { - config = - config.SafeWithFallback(Linq2DbSnapshotStore - .DefaultConfiguration); - var localcfg = config.GetConfig("tables.snapshot"); + + var localcfg = config.GetConfig("tables.snapshot") + .SafeWithFallback(config).SafeWithFallback(Configuration.Config.Empty); ColumnNames= new SnapshotTableColumnNames(config); - TableName = localcfg.GetString("table-name", "snapshot"); + TableName = config.GetString("table-name", localcfg.GetString("table-name", "snapshot")); SchemaName = localcfg.GetString("schema-name", null); AutoInitialize = localcfg.GetBoolean("auto-init", false); WarnOnAutoInitializeFail = diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs index 3afe256c..0ef35926 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Collections.Immutable; +using System.Data; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -37,6 +38,7 @@ public abstract class BaseByteArrayJournalDao : public bool logicalDelete; protected readonly ILoggingAdapter _logger; private Flow, long)>, NotUsed> deserializeFlow; + private Flow, NotUsed> deserializeFlowMapped; protected BaseByteArrayJournalDao(IAdvancedScheduler sched, IMaterializer materializerr, @@ -49,6 +51,7 @@ protected BaseByteArrayJournalDao(IAdvancedScheduler sched, logicalDelete = _journalConfig.DaoConfig.LogicalDelete; Serializer = serializer; deserializeFlow = Serializer.DeserializeFlow(); + deserializeFlowMapped = Serializer.DeserializeFlow().Select(MessageWithBatchMapper()); //Due to C# rules we have to initialize WriteQueue here //Keeping it here vs init function prevents accidental moving of init //to where variables aren't set yet. @@ -132,13 +135,38 @@ private async Task QueueWriteJournalRows(Seq xs) private async Task WriteJournalRows(Seq xs) { - using (var db = _connectionFactory.GetConnection()) { //hot path: //If we only have one row, penalty for BulkCopy //Isn't worth it due to insert caching/etc. if (xs.Count > 1) { + await InsertMultiple(xs); + } + else if (xs.Count > 0) + { + await InsertSingle(xs); + } + } + + } + + private async Task InsertSingle(Seq xs) + { + using (var db = _connectionFactory.GetConnection()) + { + await db.InsertAsync(xs.Head); + } + } + + private async Task InsertMultiple(Seq xs) + { + using (var db = _connectionFactory.GetConnection()) + { + try + { + await db.BeginTransactionAsync(IsolationLevel + .ReadCommitted); await db.GetTable() .BulkCopyAsync( new BulkCopyOptions() @@ -148,30 +176,50 @@ await db.GetTable() .MaxRowByRowSize ? BulkCopyType.Default : BulkCopyType.MultipleRows, - UseInternalTransaction = true + //TODO: When Parameters are allowed, + //Make a Config Option + //Or default to true + UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert, + MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize }, xs); + await db.CommitTransactionAsync(); } - else if (xs.Count > 0) + catch (Exception e) { - await db.InsertAsync(xs.Head); + try + { + await db.RollbackTransactionAsync(); + } + catch (Exception exception) + { + throw e; + } + + throw; } } - } - + public async Task> AsyncWriteMessages( IEnumerable messages, long timeStamp = 0) { - var serializedTries = Serializer.Serialize(messages, timeStamp); //Just a little bit of magic here; //.ToList() keeps it all working later for whatever reason //while still keeping our allocations in check. + + /*var trySet = new List(); + foreach (var serializedTry in serializedTries) + { + trySet.AddRange(serializedTry.Success.GetOrElse(new List(0))); + } + + var rows = Seq(trySet);*/ var rows = Seq(serializedTries.SelectMany(serializedTry => serializedTry.Success.GetOrElse(new List(0))) .ToList()); - + // return await QueueWriteJournalRows(rows).ContinueWith(task => @@ -400,30 +448,27 @@ public override { query = query.Take((int) max); } - - var runninng = query.ToListAsync(); - return Source.FromTask(runninng).SelectMany(r => r) - .Via(deserializeFlow.Select(MessageWithBatchMapper())); + + + return + Source.FromTask(query.ToListAsync()) + .SelectMany(r => r) + .Via(deserializeFlowMapped); //return AsyncSource.FromEnumerable(query,async q=>await q.ToListAsync()) // .Via( // deserializeFlow).Select(MessageWithBatchMapper()); } } - private static Func, long)>, Util.Try> MessageWithBatchMapper() - { - return sertry => + private static Func, long)>, Util.Try> MessageWithBatchMapper() => + sertry => { if (sertry.IsSuccess) { - var success = sertry.Success.Value; return new Util.Try( - new ReplayCompletion() - { - repr = success.Item1, - Ordering = success.Item3 - }); + new ReplayCompletion( sertry.Success.Value + )); } else { @@ -432,6 +477,5 @@ public override sertry.Failure.Value); } }; - } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs index 2ff59c08..78871b38 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs @@ -61,7 +61,7 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec, Util.Option lastSeq = Util.Option.None; if (lastMsg != null && lastMsg.IsSuccess) { - lastSeq = lastMsg.Success.Select(r => r.repr.SequenceNr); + lastSeq = lastMsg.Success.Select(r => r.Repr.SequenceNr); } else if (lastMsg != null && lastMsg.Failure.HasValue) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs index 16e418b0..21ec4de9 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs @@ -28,41 +28,67 @@ public ByteArrayJournalSerializer(IProviderConfig journalCon _separator = separator; _separatorArray = new[] {_separator}; } + + /// + /// Concatenates a set of tags using a provided separator. + /// + /// + /// + /// + private static string StringSep(IImmutableSet tags, + string separator) + { + if (tags.Count == 0) + { + return ""; + } + + return tags.Aggregate((tl, tr) => + tl + separator + tr); + } + protected override Try Serialize(IPersistentRepresentation persistentRepr, IImmutableSet tTags, long timeStamp = 0) { try { - var serializer = _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer); // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - string manifest = ""; - var binary = Akka.Serialization.Serialization.WithTransport(_serializer.System, () => - { - - if (serializer is SerializerWithStringManifest stringManifest) - { - manifest = - stringManifest.Manifest(persistentRepr.Payload); - } - else - { - if (serializer.IncludeManifest) + return Akka.Serialization.Serialization.WithTransport( + _serializer.System, (persistentRepr + , _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer), + StringSep(tTags,_separator), + timeStamp + ), + state => { - manifest = persistentRepr.Payload.GetType().TypeQualifiedName(); - } - } - - return serializer.ToBinary(persistentRepr.Payload); - }); - return new Try(new JournalRow() - { - manifest = manifest, - message = binary, - persistenceId = persistentRepr.PersistenceId, - tags = tTags.Any()? tTags.Aggregate((tl, tr) => tl + _separator + tr) : "", - Identifier = serializer.Identifier, - sequenceNumber = persistentRepr.SequenceNr, - Timestamp = persistentRepr.Timestamp==0? timeStamp: persistentRepr.Timestamp - }); + var (_persistentRepr, serializer,tags,ts) = state; + string thisManifest = ""; + if (serializer is SerializerWithStringManifest withStringManifest) + { + thisManifest = + withStringManifest.Manifest(_persistentRepr.Payload); + } + else + { + if (serializer.IncludeManifest) + { + thisManifest = _persistentRepr.Payload + .GetType().TypeQualifiedName(); + } + } + return new Try(new JournalRow() + { + message = + serializer.ToBinary(_persistentRepr.Payload), + manifest = thisManifest, + persistenceId = _persistentRepr.PersistenceId, + tags = tags, + Identifier = serializer.Identifier, + sequenceNumber = _persistentRepr.SequenceNr, + Timestamp = _persistentRepr.Timestamp == 0 + ? ts + : _persistentRepr.Timestamp + }); + }); } catch (Exception e) { @@ -74,34 +100,44 @@ protected override Try Serialize(IPersistentRepresentation persisten { try { - object deserialized = null; - if (t.Identifier.HasValue == false) + //object deserialized = null; + var identifierMaybe = t.Identifier; + if (identifierMaybe.HasValue == false) { var type = System.Type.GetType(t.manifest, true); - var deserializer = - _serializer.FindSerializerForType(type, null); + // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - deserialized = - Akka.Serialization.Serialization.WithTransport( - _serializer.System, - () => deserializer.FromBinary(t.message, type)); + + return new Try<(IPersistentRepresentation, IImmutableSet + , long)>(( + new Persistent( Akka.Serialization.Serialization.WithTransport( + _serializer.System, (_serializer.FindSerializerForType(type,_journalConfig.DefaultSerializer),t.message,type), + (state) => + { + return state.Item1.FromBinary( + state.message, state.type); + }), t.sequenceNumber, + t.persistenceId, + t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), + t.tags?.Split(_separatorArray, + StringSplitOptions.RemoveEmptyEntries) + .ToImmutableHashSet() ?? ImmutableHashSet.Empty, + t.ordering)); } else { - var serializerId = t.Identifier.Value; + return new Try<(IPersistentRepresentation, IImmutableSet + , long)>(( + new Persistent(_serializer.Deserialize(t.message, + identifierMaybe.Value,t.manifest), t.sequenceNumber, + t.persistenceId, + t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), + t.tags?.Split(_separatorArray, + StringSplitOptions.RemoveEmptyEntries) + .ToImmutableHashSet() ?? ImmutableHashSet.Empty, + t.ordering)); // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - deserialized = _serializer.Deserialize(t.message, - serializerId,t.manifest); } - - return ( - new Persistent(deserialized, t.sequenceNumber, - t.persistenceId, - t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), - t.tags?.Split(_separatorArray, - StringSplitOptions.RemoveEmptyEntries) - .ToImmutableHashSet() ?? ImmutableHashSet.Empty, - t.ordering); } catch (Exception e) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs index ffbbb005..2a575137 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs @@ -4,16 +4,28 @@ public class FlowControl { public class Continue : FlowControl { + private Continue() + { + } + public static Continue Instance = new Continue(); } public class ContinueDelayed : FlowControl { + private ContinueDelayed() + { + } + public static ContinueDelayed Instance = new ContinueDelayed(); } public class Stop : FlowControl { + private Stop() + { + } + public static Stop Instance = new Stop(); } } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs index f78cd947..7093bf17 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs @@ -133,7 +133,7 @@ await _journal.MessagesWithBatch(persistenceId, fromSequenceNr, t.Failure.Value)) .RunForeach(r => { - recoveryCallback(r.repr); + recoveryCallback(r.Repr); }, _mat); } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs index 80b771cc..4574e9d6 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs @@ -5,6 +5,10 @@ namespace Akka.Persistence.Sql.Linq2Db.Journal.Types { public sealed class JournalRow { + public JournalRow() + { + + } public long ordering { get; set; } public long Timestamp { get; set; } = 0; diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs index c5e105f6..28cad0ff 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/ReplayCompletion.cs @@ -1,8 +1,23 @@ -namespace Akka.Persistence.Sql.Linq2Db.Journal.Types +using System.Collections.Immutable; + +namespace Akka.Persistence.Sql.Linq2Db.Journal.Types { public class ReplayCompletion { - public IPersistentRepresentation repr { get; set; } - public long Ordering { get; set; } + public ReplayCompletion(IPersistentRepresentation repr, long ordering) + { + Repr = repr; + Ordering = ordering; + } + + public readonly IPersistentRepresentation Repr; + public readonly long Ordering; + + public ReplayCompletion((IPersistentRepresentation, IImmutableSet, long) success) + { + //(Repr, _, Ordering) = success; + Repr = success.Item1; + Ordering = success.Item3; + } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs index 28afb73d..9d1ff11f 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs @@ -156,11 +156,10 @@ private Flow perfectlyMatchTag( DataConnection dc, string persistenceId, long fromSequenceNr, long toSequenceNr, long max) { - var toTake = MaxTake(max); return AsyncSource.FromEnumerable( new { - dc, persistenceId, fromSequenceNr, toSequenceNr, toTake, + dc, persistenceId, fromSequenceNr, toSequenceNr,toTake= MaxTake(max), includeDeleted }, async (state) => @@ -180,10 +179,8 @@ await baseQueryStatic(state.dc, state.includeDeleted) { var val = t.Get(); return new Akka.Util.Try( - new ReplayCompletion() - { - repr = val.Item1, Ordering = val.Item3 - }); + new ReplayCompletion(val.Item1,val.Item3) + ); } catch (Exception e) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs index e03f9f33..03bca12e 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Linq2DbReadJournal.cs @@ -154,8 +154,8 @@ private Source _eventsByPersistenceIdSource( toSequenceNr, batchSize, refreshInterval) .SelectAsync, ReplayCompletion, NotUsed>(1, reprAndOrdNr => Task.FromResult(reprAndOrdNr.Get())) - .SelectMany((ReplayCompletion r) => _adaptEvents(r.repr) - .Select(p => new {repr = r.repr, ordNr = r.Ordering})) + .SelectMany((ReplayCompletion r) => _adaptEvents(r.Repr) + .Select(p => new {repr = r.Repr, ordNr = r.Ordering})) .Select(r => new EventEnvelope(new Sequence(r.ordNr), r.repr.PersistenceId, r.repr.SequenceNr, r.repr.Payload,r.repr.Timestamp)); } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs index ad6892cf..24c67720 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs @@ -25,23 +25,10 @@ public bool InRange(long number) public IEnumerable ToEnumerable() { - var itemCount = until - from; - List returnList; - if (itemCount < Int32.MaxValue) - { - returnList = new List(); - } - else - { - returnList = new List(); - } - for (long i = from; i < until; i++) { - returnList.Add(i); + yield return i; } - - return returnList; } public IEnumerator GetEnumerator() diff --git a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs index a897c59a..c2b1e347 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs @@ -44,6 +44,7 @@ public abstract class PersistentReprSerializer if (opt.HasValue) { retList.Add(opt.Value); + return new Util.Try>(retList); } else { @@ -65,43 +66,16 @@ public abstract class PersistentReprSerializer return new Util.Try>(ser.Failure.Value); } } + + return new Util.Try>(retList); } - return new Util.Try>(retList); + //return new Util.Try>(retList); }).ToList(); } - public Seq>> SerializeSeq( - IEnumerable messages) - { - return messages.Select(aw => - { - return Util.Try>.From(() => - { - var serialized = - (aw.Payload as IEnumerable) - .Select(p=> Serialize(p).Get()); - return serialized.ToSeq(); - }); - }).ToSeq(); - - //return messages.Select(aw => - //{ - // var serialized = (aw.Payload as IEnumerable) - // .Select(Serialize). - //}) - //return Seq(messages.Select(aw => - //{ - // var serialized = - // (aw.Payload as IEnumerable) - // .Select(Serialize); - // return TrySeq.SequenceSeq(serialized); - //})); - } - - public Akka.Util.Try Serialize(IPersistentRepresentation persistentRepr, long timeStamp = 0) { switch (persistentRepr.Payload) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs index e7275493..90372dbf 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Snapshot/ByteArraySnapshotSerializer.cs @@ -45,8 +45,11 @@ protected object GetSnapshot(SnapshotRow reader) { var type = Type.GetType(manifest, true); // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - var serializer = _serialization.FindSerializerForType(type, _config.DefaultSerializer); - obj = Akka.Serialization.Serialization.WithTransport(_serialization.System, () => serializer.FromBinary(binary, type)); + obj = Akka.Serialization.Serialization.WithTransport( + _serialization.System, + (serializer: _serialization.FindSerializerForType(type, _config.DefaultSerializer), + binary, type), + (state) => state.serializer.FromBinary(state.binary, state.type)); } else { @@ -60,8 +63,8 @@ private SnapshotRow ToSnapshotEntry(SnapshotMetadata metadata, object snapshot) { var snapshotType = snapshot.GetType(); var serializer = _serialization.FindSerializerForType(snapshotType, _config.DefaultSerializer); - var binary = Akka.Serialization.Serialization.WithTransport(_serialization.System, - () => serializer.ToBinary(snapshot)); + var binary = Akka.Serialization.Serialization.WithTransport(_serialization.System,(serializer, snapshot), + (state ) => state.serializer.ToBinary(state.snapshot)); string manifest = ""; if (serializer is SerializerWithStringManifest) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf index 0efd8e7b..b0000294 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf +++ b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf @@ -18,7 +18,9 @@ # Rather than actual physical deletions logical-delete = false - # If true, journal_metadata is created + # If true, journal_metadata is created and used for deletes + # and max sequence number queries. + # note that there is a performance penalty for using this. delete-compatibility-mode = true # If "sqlite" or "sqlserver", default column names are compatible with @@ -32,11 +34,25 @@ buffer-size = 5000 #Batch size refers to the number of items included in a batch to DB + # (In cases where an AtomicWrite is greater than batch-size, + # The Atomic write will still be handled in a single batch.) #JDBC Default is/was 400 but testing against scenarios indicates #100 is better for overall latency. That said, #larger batches may be better if you have A fast/local DB. batch-size = 100 + #This batch size controls the maximum number of rows that will be sent + #In a single round trip to the DB. This is different than the -actual- batch size, + #And intentionally set larger than batch-size, + #to help atomicwrites be faster + #Note that Linq2Db may use a lower number per round-trip in some cases. + db-round-trip-max-batch-size = 1000 + + #Linq2Db by default will use a built string for multi-row inserts + #Somewhat counterintuitively, this is faster than using parameters in most cases, + #But if you would prefer parameters, you can set this to true. + prefer-parameters-on-multirow-insert = true + # Denotes the number of messages retrieved # Per round-trip to DB on recovery. # This is to limit both size of dataset from DB (possibly lowering locking requirements) diff --git a/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf b/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf index d30aa2d8..40ca63b0 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf +++ b/src/Akka.Persistence.Sql.Linq2Db/snapshot.conf @@ -23,8 +23,7 @@ # Column names will be compatible with Akka.Persistence.Sql # You still must set your table name! table-compatibility-mode = false - tables - { + tables. snapshot { schema-name = null @@ -69,7 +68,6 @@ serializerId: "serializer_id", } } - } } } } \ No newline at end of file