Skip to content

Commit

Permalink
Update Akka.Net version, Clean up dead code in `ByteArrayJournalSeria…
Browse files Browse the repository at this point in the history
…lizer`, remove perf numbers till we make new ones
  • Loading branch information
to11mtm committed May 2, 2021
1 parent 9251484 commit 3748fcf
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 222 deletions.
31 changes: 5 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

A Cross-SQL-DB Engine Akka.Persistence plugin with broad database compatibility thanks to Linq2Db.

This is a Fairly-Naive port of the amazing akka-persistence-jdbc package from Scala to C#.
This is a port of the amazing akka-persistence-jdbc package from Scala, with a few improvements based on C# as well as our choice of data library.

Please read the documentation carefully. Some features may be specific to use case and have trade-offs (namely, compatibility modes)

## Status

- Usable for basic Read/Writes

- Implements the following for `Akka.Persistence.Query`:
- IPersistenceIdsQuery
- ICurrentPersistenceIdsQuery
Expand All @@ -20,9 +20,7 @@ Please read the documentation carefully. Some features may be specific to use ca
- ICurrentAllEventsQuery
- Snapshot Store Support

#### This is still a WORK IN PROGRESS

**Pull Requests are Welcome** but please note this is still considered 'work in progress' and only used if one understands the risks. While the TCK Specs pass you should still test in a 'safe' non-production environment carefully before deciding to fully deploy.
See something you want to add or improve? **Pull Requests are Welcome!**

Working:

Expand Down Expand Up @@ -102,27 +100,8 @@ Compatibility with existing Providers is partially implemented via `table-compat

# Performance

Tests based on i-7 8750H, 32GB Ram, 2TB SSD, Windows 10 version Version 10.0.19041.630.
Databases running on Docker WSL2.

All numbers are in msg/sec.

|Test |SqlServer (normal) | SqlServer Batching | Linq2Db |vs Normal| vs Batching|
|:------------- |:------------- | :----------: | -----------: | -----------: | -----------: |
|Persist | 164|445| 265|143.29%|55.04%|
|PersistAll | 782|958| 6364|717.26%|641.03%|
|PersistAsync | 630|917| 16463|2555.40%|1902.96%|
|PersistAllAsync | 2095|908| 16168|748.50%|1738.47%|
|PersistGroup10 | 590|633| 1239|181.19%|157.21%|
|PersistGroup100 | 607|930| 5504|912.19%|573.78%|
|PersistGroup200 | 628|1353| 8498|1268.47%|587.46%|
|PersistGroup25 | 629|632| 2473|348.01%|324.30%|
|PersistGroup400 | 612|1785| 8136|1182.52%|715.83%|
|PersistGroup50 | 612|651| 4075|631.86%|591.28%|
|Recovering | 41903|45161| 43442|101.64%|109.87%|
|Recovering8 | 75466|69575| 66186|84.75%|97.63%|
|RecoveringFour | 59259|51979| 58230|98.61%|113.79%|
|RecoveringTwo | 41745|37487| 41525|98.47%|115.76%|
Updated Performance numbers pending.

## Sql.Common Compatibility modes

- Delete Compatibility mode is available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.18" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.4.17" />
<PackageReference Include="Akka.Persistence.Redis" Version="1.4.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.18" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.4.17" />
<PackageReference Include="Akka.Persistence.Redis" Version="1.4.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.18" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.4.17" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="xunit" Version="2.4.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.18" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.PostgreSql" Version="1.4.17" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="xunit" Version="2.4.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.18" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.18" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.18" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.4" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.19" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.18" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.18" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.SqlServer" Version="1.4.16" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.18" />
<PackageReference Include="Akka.Persistence.TestKit" Version="1.4.18" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.4.18" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.TestKit" Version="1.4.19" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.4.19" />
<PackageReference Include="Docker.DotNet" Version="3.125.4" />
<PackageReference Include="Hyperion" Version="0.9.17" />
<PackageReference Include="Hyperion" Version="0.10.1" />
<PackageReference Include="JetBrains.dotMemoryUnit" Version="3.1.20200127.214830" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.4" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="Npgsql" Version="4.1.4" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.2" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.4.18" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.18" />
<PackageReference Include="Akka" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.SqlServer" Version="1.4.16" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.18" />
<PackageReference Include="Akka.Persistence.TestKit" Version="1.4.18" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.4.18" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.4.19" />
<PackageReference Include="Akka.Persistence.TestKit" Version="1.4.19" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.4.19" />
<PackageReference Include="Docker.DotNet" Version="3.125.4" />
<PackageReference Include="Hyperion" Version="0.9.17" />
<PackageReference Include="Hyperion" Version="0.10.1" />
<PackageReference Include="JetBrains.dotMemoryUnit" Version="3.1.20200127.214830" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.4" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="5.0.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="Npgsql" Version="4.1.4" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.2" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,180 +28,6 @@ public ByteArrayJournalSerializer(IProviderConfig<JournalTableConfig> journalCon
_separator = separator;
_separatorArray = new[] {_separator};
}
/*
protected Try<JournalRow> SerializeNew(IPersistentRepresentation persistentRepr, IImmutableSet<string> tTags, long timeStamp = 0)
{
try
{
var ser = _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var binary = toBinary(persistentRepr, ser, out var manifest);
return new Try<JournalRow>(new JournalRow()
{
manifest = manifest,
message = binary,
persistenceId = persistentRepr.PersistenceId,
tags = tTags.Any()? tTags.Aggregate((tl, tr) => tl + _separator + tr) : "",
Identifier = ser.Identifier,
sequenceNumber = persistentRepr.SequenceNr,
Timestamp = persistentRepr.Timestamp==0? timeStamp: persistentRepr.Timestamp
});
}
catch (Exception e)
{
return new Try<JournalRow>(e);
}
}
protected Try<JournalRow> SerializeNew2(IPersistentRepresentation persistentRepr, IImmutableSet<string> tTags, long timeStamp = 0)
{
try
{
var ser = _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var (binary, manifest) = toBinary2(persistentRepr.Payload, ser);
//var binary = toBinary(persistentRepr, ser, out var manifest);
return new Try<JournalRow>(new JournalRow()
{
manifest = manifest,
message = binary,
persistenceId = persistentRepr.PersistenceId,
tags = tTags.Any()? tTags.Aggregate((tl, tr) => tl + _separator + tr) : "",
Identifier = ser.Identifier,
sequenceNumber = persistentRepr.SequenceNr,
Timestamp = persistentRepr.Timestamp==0? timeStamp: persistentRepr.Timestamp
});
}
catch (Exception e)
{
return new Try<JournalRow>(e);
}
}
private byte[] toBinary(IPersistentRepresentation persistentRepr,
Serializer ser, out string manifest)
{
byte[] binary;
(binary, manifest) = Akka.Serialization.Serialization.WithTransport(
_serializer.System, (persistentRepr.Payload, ser),
static state =>
{
var (payload, serializer) = state;
string thisManifest = "";
if (serializer is SerializerWithStringManifest
stringManifest)
{
thisManifest =
stringManifest.Manifest(payload);
}
else
{
if (serializer.IncludeManifest)
{
thisManifest = payload
.GetType().TypeQualifiedName();
}
}
return (serializer.ToBinary(payload),
thisManifest);
});
return binary;
}
private (byte[],string) toBinary2(object payload,
Serializer ser)
{
return Akka.Serialization.Serialization.WithTransport(
_serializer.System, (payload, ser),
static state =>
{
var (payload, serializer) = state;
string thisManifest = "";
if (serializer is SerializerWithStringManifest
stringManifest)
{
thisManifest =
stringManifest.Manifest(payload);
}
else
{
if (serializer.IncludeManifest)
{
thisManifest = payload
.GetType().TypeQualifiedName();
}
}
return (serializer.ToBinary(payload),
thisManifest);
});
}
private JournalRow toBinary3(object payload)
{
return Akka.Serialization.Serialization.WithTransport(
_serializer.System, (payload,
_serializer.FindSerializerForType(
payload.GetType(),
_journalConfig.DefaultSerializer)),
static state =>
{
var (payload, serializer) = state;
if (serializer is SerializerWithStringManifest
stringManifest)
{
return new JournalRow()
{
message =serializer.ToBinary(payload),
manifest = stringManifest.Manifest(payload),
Identifier = serializer.Identifier
};
}
if (serializer.IncludeManifest)
{
return new JournalRow()
{
message =serializer.ToBinary(payload),
manifest = payload
.GetType().TypeQualifiedName(),
Identifier = serializer.Identifier
};
}
return new JournalRow()
{
message =serializer.ToBinary(payload),
manifest = String.Empty,
Identifier = serializer.Identifier
};
});
}
protected Try<JournalRow> SerializeN(IPersistentRepresentation persistentRepr, IImmutableSet<string> tTags, long timeStamp = 0)
{
try
{
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var buildingPayload = toBinary3(persistentRepr.Payload);
//var binary = toBinary(persistentRepr, ser, out var manifest);
buildingPayload.persistenceId = persistentRepr.PersistenceId;
buildingPayload.tags = tTags.Count == 0
? ""
: tTags.Aggregate((tl, tr) => tl + _separator + tr);
buildingPayload.sequenceNumber = persistentRepr.SequenceNr;
buildingPayload.Timestamp = persistentRepr.Timestamp == 0
? timeStamp
: persistentRepr.Timestamp;
return buildingPayload;
}
catch (Exception e)
{
return new Try<JournalRow>(e);
}
}*/

/// <summary>
/// Concatenates a set of tags using a provided separator.
Expand Down

0 comments on commit 3748fcf

Please sign in to comment.