Skip to content

Commit

Permalink
feat: use the correct snapshot logic
Browse files Browse the repository at this point in the history
  • Loading branch information
CumpsD committed Mar 23, 2023
1 parent 26d8d4c commit 09dcdd5
Show file tree
Hide file tree
Showing 6 changed files with 351 additions and 117 deletions.
51 changes: 51 additions & 0 deletions src/Akka.Persistence.Sql/Db/AkkaDataConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// -----------------------------------------------------------------------
// <copyright file="AkkaDataConnection.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using LinqToDB;
using LinqToDB.Data;
using LinqToDB.Data.RetryPolicy;

namespace Akka.Persistence.Sql.Db
{
public class AkkaDataConnection : IDisposable, IAsyncDisposable
{
private readonly string _providerName;

public AkkaDataConnection(
string providerName,
DataConnection connection)
{
_providerName = providerName.ToLower();
Db = connection;
}

public bool UseDateTime =>
!_providerName.Contains("sqlite") &&
!_providerName.Contains("postgresql");

public DataConnection Db { get; }

public IRetryPolicy RetryPolicy
{
get => Db.RetryPolicy;
set => Db.RetryPolicy = value;
}

public ValueTask DisposeAsync()
=> Db.DisposeAsync();

public void Dispose()
=> Db.Dispose();

public AkkaDataConnection Clone()
=> new(_providerName, (DataConnection)Db.Clone());

public ITable<T> GetTable<T>() where T : class
=> Db.GetTable<T>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Persistence.Sql.Config;
using Akka.Persistence.Sql.Journal.Types;
using Akka.Persistence.Sql.Snapshot;
using LinqToDB;
using LinqToDB.Configuration;
using LinqToDB.Data;
using LinqToDB.Data.RetryPolicy;
Expand Down Expand Up @@ -371,42 +369,4 @@ public AkkaDataConnection GetConnection()
return connection;
}
}

public class AkkaDataConnection : IDisposable, IAsyncDisposable
{
private readonly string _providerName;

public AkkaDataConnection(
string providerName,
DataConnection connection)
{
_providerName = providerName.ToLower();

Db = connection;
}

public bool UseDateTime =>
!_providerName.Contains("sqlite") &&
!_providerName.Contains("postgresql");

public DataConnection Db { get; }

public IRetryPolicy RetryPolicy
{
get => Db.RetryPolicy;
set => Db.RetryPolicy = value;
}

public AkkaDataConnection Clone()
=> new(_providerName, (DataConnection)Db.Clone());

public void Dispose()
=> Db.Dispose();

public ValueTask DisposeAsync()
=> Db.DisposeAsync();

public ITable<T> GetTable<T>() where T : class
=> Db.GetTable<T>();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// -----------------------------------------------------------------------
// <copyright file="ByteArraySnapshotSerializer.cs" company="Akka.NET Project">
// <copyright file="ByteArrayDateTimeSnapshotSerializer.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
Expand All @@ -11,12 +11,14 @@

namespace Akka.Persistence.Sql.Snapshot
{
public class ByteArraySnapshotSerializer : ISnapshotSerializer<DateTimeSnapshotRow>
public class ByteArrayDateTimeSnapshotSerializer : ISnapshotSerializer<DateTimeSnapshotRow>
{
private readonly SnapshotConfig _config;
private readonly Akka.Serialization.Serialization _serialization;

public ByteArraySnapshotSerializer(Akka.Serialization.Serialization serialization, SnapshotConfig config)
public ByteArrayDateTimeSnapshotSerializer(
Akka.Serialization.Serialization serialization,
SnapshotConfig config)
{
_serialization = serialization;
_config = config;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// -----------------------------------------------------------------------
// <copyright file="ByteArrayLongSnapshotSerializer.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using Akka.Persistence.Sql.Config;
using Akka.Serialization;
using Akka.Util;

namespace Akka.Persistence.Sql.Snapshot
{
public class ByteArrayLongSnapshotSerializer : ISnapshotSerializer<LongSnapshotRow>
{
private readonly SnapshotConfig _config;
private readonly Akka.Serialization.Serialization _serialization;

public ByteArrayLongSnapshotSerializer(
Akka.Serialization.Serialization serialization,
SnapshotConfig config)
{
_serialization = serialization;
_config = config;
}

public Try<LongSnapshotRow> Serialize(SnapshotMetadata metadata, object snapshot)
=> Try<LongSnapshotRow>.From(() => ToSnapshotEntry(metadata, snapshot));

public Try<SelectedSnapshot> Deserialize(LongSnapshotRow t)
=> Try<SelectedSnapshot>.From(() => ReadSnapshot(t));

protected SelectedSnapshot ReadSnapshot(LongSnapshotRow reader)
{
var metadata = new SnapshotMetadata(
reader.PersistenceId,
reader.SequenceNumber,
new DateTime(reader.Created));

var snapshot = GetSnapshot(reader);

return new SelectedSnapshot(metadata, snapshot);
}

protected object GetSnapshot(LongSnapshotRow reader)
{
var manifest = reader.Manifest;
var binary = reader.Payload;

if (reader.SerializerId is null)
{
var type = Type.GetType(manifest, true);

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
return Akka.Serialization.Serialization.WithTransport(
system: _serialization.System,
state: (serializer: _serialization.FindSerializerForType(type, _config.DefaultSerializer), binary, type),
action: state => state.serializer.FromBinary(state.binary, state.type));
}

var serializerId = reader.SerializerId.Value;
return _serialization.Deserialize(binary, serializerId, manifest);
}

private LongSnapshotRow ToSnapshotEntry(SnapshotMetadata metadata, object snapshot)
{
var snapshotType = snapshot.GetType();
var serializer = _serialization.FindSerializerForType(snapshotType, _config.DefaultSerializer);
var binary = Akka.Serialization.Serialization.WithTransport(
system: _serialization.System,
state: (serializer, snapshot),
action: state => state.serializer.ToBinary(state.snapshot));

var manifest = serializer switch
{
SerializerWithStringManifest stringManifest => stringManifest.Manifest(snapshot),
{ IncludeManifest: true } => snapshotType.TypeQualifiedName(),
_ => string.Empty
};

return new LongSnapshotRow
{
PersistenceId = metadata.PersistenceId,
SequenceNumber = metadata.SequenceNr,
Created = metadata.Timestamp.Ticks,
Manifest = manifest,
Payload = binary,
SerializerId = serializer.Identifier
};
}
}
}
Loading

0 comments on commit 09dcdd5

Please sign in to comment.