Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialization pipeline Fixes/improvements #22

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 5 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

A Cross-SQL-DB Engine Akka.Persistence plugin with broad database compatibility thanks to Linq2Db.

This is a Fairly-Naive port of the amazing akka-persistence-jdbc package from Scala to C#.
This is a port of the amazing akka-persistence-jdbc package from Scala, with a few improvements based on C# as well as our choice of data library.

Please read the documentation carefully. Some features may be specific to use case and have trade-offs (namely, compatibility modes)

## Status

- Usable for basic Read/Writes

- Implements the following for `Akka.Persistence.Query`:
- IPersistenceIdsQuery
- ICurrentPersistenceIdsQuery
Expand All @@ -20,9 +20,7 @@ Please read the documentation carefully. Some features may be specific to use ca
- ICurrentAllEventsQuery
- Snapshot Store Support

#### This is still a WORK IN PROGRESS

**Pull Requests are Welcome** but please note this is still considered 'work in progress' and only used if one understands the risks. While the TCK Specs pass you should still test in a 'safe' non-production environment carefully before deciding to fully deploy.
See something you want to add or improve? **Pull Requests are Welcome!**

Working:

Expand Down Expand Up @@ -102,27 +100,8 @@ Compatibility with existing Providers is partially implemented via `table-compat

# Performance

Tests based on i-7 8750H, 32GB Ram, 2TB SSD, Windows 10 version Version 10.0.19041.630.
Databases running on Docker WSL2.

All numbers are in msg/sec.

|Test |SqlServer (normal) | SqlServer Batching | Linq2Db |vs Normal| vs Batching|
|:------------- |:------------- | :----------: | -----------: | -----------: | -----------: |
|Persist | 164|427| 235|143.29%|55.04%|
|PersistAll | 782|875| 5609|717.26%|641.03%|
|PersistAsync | 630|846| 16099|2555.40%|1902.96%|
|PersistAllAsync | 2095|902| 15681|748.50%|1738.47%|
|PersistGroup10 | 590|680| 1069|181.19%|157.21%|
|PersistGroup100 | 607|965| 5537|912.19%|573.78%|
|PersistGroup200 | 628|1356| 7966|1268.47%|587.46%|
|PersistGroup25 | 629|675| 2189|348.01%|324.30%|
|PersistGroup400 | 612|1011| 7237|1182.52%|715.83%|
|PersistGroup50 | 612|654| 3867|631.86%|591.28%|
|Recovering | 41903|38766| 42592|101.64%|109.87%|
|Recovering8 | 75466|65515| 63960|84.75%|97.63%|
|RecoveringFour | 59259|51355| 58437|98.61%|113.79%|
|RecoveringTwo | 41745|35512| 41108|98.47%|115.76%|
Updated Performance numbers pending.

## Sql.Common Compatibility modes

- Delete Compatibility mode is available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.14" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.3.9" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.4.17" />
<PackageReference Include="Akka.Persistence.Redis" Version="1.4.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="redis-inside" Version="3.3.0" />
<PackageReference Include="StackExchange.Redis" Version="2.1.58" />
<PackageReference Include="StackExchange.Redis" Version="2.2.4" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
<PackageReference Include="coverlet.collector" Version="1.2.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.14" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.3.9" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.4.17" />
<PackageReference Include="Akka.Persistence.Redis" Version="1.4.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="redis-inside" Version="3.3.0" />
<PackageReference Include="StackExchange.Redis" Version="2.1.58" />
<PackageReference Include="StackExchange.Redis" Version="2.2.4" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
<PackageReference Include="coverlet.collector" Version="1.2.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ internal void MeasureGroup(Func<TimeSpan, string> msg, Action block, int numMsg,
{
var measurements = new List<TimeSpan>(MeasurementIterations);

block();
block(); //warm-up

int i = 0;
Expand Down Expand Up @@ -550,7 +551,7 @@ public void PersistenceActor_performance_must_measure_Recovering8()
FeedAndExpectLastSpecific(p6, "p", Commands);
FeedAndExpectLastSpecific(p7, "p", Commands);
FeedAndExpectLastSpecific(p8, "p", Commands);
MeasureGroup(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms", () =>
MeasureGroup(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms , {(EventsCount*8 / d.TotalMilliseconds) * 1000} total msg/sec", () =>
{
var task1 = Task.Run(()=>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.14" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.3.9" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.4.17" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.14" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.3.9" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.4.17" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public async Task Can_Recover_SqlCommon_Journal()
persistRef.Tell(new SomeEvent(){EventName = "rec-test", Guid = ourGuid, Number = 1});
Assert.True(persistRef.Ask<bool>(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result);
await persistRef.GracefulStop(TimeSpan.FromSeconds(5));
await Task.Delay(1000);
persistRef = sys1.ActorOf(Props.Create(() =>
new JournalCompatActor(NewJournal,
"p-1")), "test-recover-1");
Expand Down Expand Up @@ -90,6 +91,7 @@ public async Task SqlCommon_Journal_Can_Recover_L2Db_Journal()
persistRef.Tell(new SomeEvent(){EventName = "rec-test", Guid = ourGuid, Number = 1});
Assert.True(persistRef.Ask<bool>(new ContainsEvent(){Guid = ourGuid}, TimeSpan.FromSeconds(5)).Result);
await persistRef.GracefulStop(TimeSpan.FromSeconds(5));
await Task.Delay(1000);
persistRef = sys1.ActorOf(Props.Create(() =>
new JournalCompatActor(OldJournal,
"p-3")), "test-recover-2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ class = ""{typeof(Linq2DbSnapshotStore).AssemblyQualifiedName}""
provider-name = """ + LinqToDB.ProviderName.SQLiteMS + $@"""
#use-clone-connection = true
table-compatibility-mode = sqlite
table-name = ""{tablename}""
tables
{{
snapshot {{
auto-init = true
warn-on-auto-init-fail = false
table-name = ""{tablename}""
#table-name = ""{tablename}""
}}
}}
}}
Expand Down Expand Up @@ -92,11 +93,12 @@ class = ""{typeof(Linq2DbWriteJournal).AssemblyQualifiedName}""
#connection-string = ""FullUri=file:test.db&cache=shared""
provider-name = ""{LinqToDB.ProviderName.SQLiteMS}""
parallelism = 3
table-name = ""{tablename}""
table-compatibility-mode = ""sqlite""
tables.journal {{
auto-init = true
warn-on-auto-init-fail = false
table-name = ""{tablename}""
#table-name = ""{tablename}""
metadata-table-name = ""{metadatatablename}""

}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.14" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.14" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.14" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.1" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.19" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.14" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.14" />
<PackageReference Include="Akka.Persistence.SqlServer" Version="1.4.13" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.14" />
<PackageReference Include="Akka.Persistence.TestKit" Version="1.4.14" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.4.14" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.SqlServer" Version="1.4.16" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.TestKit" Version="1.4.19" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.4.19" />
<PackageReference Include="Docker.DotNet" Version="3.125.4" />
<PackageReference Include="Hyperion" Version="0.9.16" />
<PackageReference Include="Hyperion" Version="0.10.1" />
<PackageReference Include="JetBrains.dotMemoryUnit" Version="3.1.20200127.214830" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.1" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="Npgsql" Version="4.1.4" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.2" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker
/// </summary>
public class PostgreSQLFixture : IAsyncLifetime
{
protected readonly string PostgreSqlImageName = $"PostgreSQL-{Guid.NewGuid():N}";
protected readonly string PostgresContainerName = $"postgresSqlServer-{Guid.NewGuid():N}";
//protected readonly string PostgreSqlImageName = $"PostgreSQL-{Guid.NewGuid():N}";
protected DockerClient Client;

public PostgreSQLFixture()
Expand All @@ -31,48 +32,47 @@ public PostgreSQLFixture()
Client = config.CreateClient();
}

protected string PostgreSQLImageName
{
get
{
//if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
// return "microsoft/mssql-server-windows-express";
return "postgres";
}
}

protected string ImageName => "postgres";
protected string Tag => "latest";

protected string SqlServerImageTag
{
get
{
//if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
// return "2017-latest";
return "latest";
}
}
protected string PostgresImageName => $"{ImageName}:{Tag}";

public string ConnectionString { get; private set; }

public async Task InitializeAsync()
{
var images = await Client.Images.ListImagesAsync(new ImagesListParameters {MatchName = PostgreSQLImageName});
var images = await Client.Images.ListImagesAsync(new ImagesListParameters
{
Filters = new Dictionary<string, IDictionary<string, bool>>
{
{
"reference",
new Dictionary<string, bool>
{
{PostgresImageName, true}
}
}
}
});

if (images.Count == 0)
await Client.Images.CreateImageAsync(
new ImagesCreateParameters {FromImage = PostgreSQLImageName, Tag = "latest"}, null,
new ImagesCreateParameters { FromImage = ImageName, Tag = Tag }, null,
new Progress<JSONMessage>(message =>
{
Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage)
? message.ErrorMessage
: $"{message.ID} {message.Status} {message.ProgressMessage}");
}));

var postgresPort = ThreadLocalRandom.Current.Next(9000, 10000);
var sqlServerHostPort = ThreadLocalRandom.Current.Next(9000, 10000);

// create the container
await Client.Containers.CreateContainerAsync(new CreateContainerParameters
{
Image = PostgreSQLImageName,
Name = PostgreSqlImageName,
Image = PostgresImageName,
Name = PostgresContainerName,
Tty = true,
ExposedPorts = new Dictionary<string, EmptyStruct>
{
Expand All @@ -88,37 +88,44 @@ await Client.Containers.CreateContainerAsync(new CreateContainerParameters
{
new PortBinding
{
HostPort = $"{postgresPort}"
HostPort = $"{sqlServerHostPort}"
}
}
}
}
},
Env = new[] {"POSTGRES_PASSWORD=l0lTh1sIsOpenSource"}
Env = new[]
{
"POSTGRES_PASSWORD=postgres",
"POSTGRES_USER=postgres"
}
});

// start the container
await Client.Containers.StartContainerAsync(PostgreSqlImageName, new ContainerStartParameters());
await Client.Containers.StartContainerAsync(PostgresContainerName, new ContainerStartParameters());

// Provide a 30 second startup delay
await Task.Delay(TimeSpan.FromSeconds(30));
// Provide a 10 second startup delay
await Task.Delay(TimeSpan.FromSeconds(10));

var connectionString = new NpgsqlConnectionStringBuilder()
{
Host = "localhost", Password = "l0lTh1sIsOpenSource",
Username = "postgres", Database = "postgres",
Port = postgresPort
};
ConnectionString = $"Server=127.0.0.1;Port={sqlServerHostPort};" +
"Database=postgres;User Id=postgres;Password=postgres";

ConnectionString = connectionString.ToString();
//var connectionString = new NpgsqlConnectionStringBuilder()
//{
// Host = "localhost", Password = "l0lTh1sIsOpenSource",
// Username = "postgres", Database = "postgres",
// Port = sqlServerHostPort
//};
//
//ConnectionString = connectionString.ToString();
}

public async Task DisposeAsync()
{
if (Client != null)
{
await Client.Containers.StopContainerAsync(PostgreSqlImageName, new ContainerStopParameters());
await Client.Containers.RemoveContainerAsync(PostgreSqlImageName,
await Client.Containers.StopContainerAsync(PostgresContainerName, new ContainerStopParameters());
await Client.Containers.RemoveContainerAsync(PostgresContainerName,
new ContainerRemoveParameters {Force = true});
Client.Dispose();
}
Expand Down
Loading