diff --git a/Akka.Persistence.Azure.sln b/Akka.Persistence.Azure.sln index bcd72c8..057760c 100644 --- a/Akka.Persistence.Azure.sln +++ b/Akka.Persistence.Azure.sln @@ -17,6 +17,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{320BFA6C build.sh = build.sh EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Azure.Hosting", "src\Akka.Persistence.Azure.Hosting\Akka.Persistence.Azure.Hosting.csproj", "{64C6B877-9262-456B-8A1C-60C4F272DA19}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -35,6 +37,14 @@ Global {CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}.Debug|Any CPU.Build.0 = Debug|Any CPU {CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}.Release|Any CPU.ActiveCfg = Release|Any CPU {CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}.Release|Any CPU.Build.0 = Release|Any CPU + {FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Release|Any CPU.Build.0 = Release|Any CPU + {64C6B877-9262-456B-8A1C-60C4F272DA19}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {64C6B877-9262-456B-8A1C-60C4F272DA19}.Debug|Any CPU.Build.0 = Debug|Any CPU + {64C6B877-9262-456B-8A1C-60C4F272DA19}.Release|Any CPU.ActiveCfg = Release|Any CPU + {64C6B877-9262-456B-8A1C-60C4F272DA19}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/README.md b/README.md index 918827e..5680bf7 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,47 @@ Akka.Persistence implementation that uses Windows Azure table and blob storage. ## Configuration +### Easy Mode: Using Akka.Hosting + +[Akka.Hosting](https://github.com/akkadotnet/Akka.Hosting) can make configuring Akka.Persistence.Azure trivially easy and HOCON-less. + +First, install the `Akka.Persistence.Azure.Hosting` NuGet package: + +```shell +PS> install-package Akka.Persistence.Azure.Hosting + +``` + +Next, add the `WithAzurePersistence` method calls to your `AkkaConfigurationBuilder` (from Akka.Hosting): + +```csharp +var conn = Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"); +var host = new HostBuilder() + .ConfigureServices(collection => + { + collection.AddAkka("MyActorSys", builder => + { + // enables both journal and snapshot store + builder.WithAzurePersistence(conn); + builder.StartActors((system, registry) => + { + var myActor = system.ActorOf(Props.Create(() => new MyPersistenceActor("ac1")), "actor1"); + registry.Register(myActor); + }); + }); + }).Build(); + +await host.StartAsync(); +return host; +``` + +You can also call the following methods to activate the journal / snapshot stores independently: + +* ` WithAzureTableJournal` +* `WithAzureBlobsSnapshotStore` + +### Custom Mode: HOCON + Here is a default configuration used by this plugin: https://github.com/petabridge/Akka.Persistence.Azure/blob/dev/src/Akka.Persistence.Azure/reference.conf You will need to provide connection string and Azure Table name for journal, and connection string with container name for Azure Blob Store: diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 064b44f..8a5428b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,41 @@ +#### 0.9.0 July 21 2022 #### +Added [Akka.Hosting](https://github.com/akkadotnet/Akka.Hosting) support to Akka.Persistence.Azure, which you can activate via the following: + +First, install the `Akka.Persistence.Azure.Hosting` NuGet package: + +```shell +PS> install-package Akka.Persistence.Azure.Hosting + +``` + +Next, add the `WithAzurePersistence` method calls to your `AkkaConfigurationBuilder` (from Akka.Hosting): + +```csharp +var conn = Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"); +var host = new HostBuilder() + .ConfigureServices(collection => + { + collection.AddAkka("MyActorSys", builder => + { + // enables both journal and snapshot store + builder.WithAzurePersistence(conn); + builder.StartActors((system, registry) => + { + var myActor = system.ActorOf(Props.Create(() => new MyPersistenceActor("ac1")), "actor1"); + registry.Register(myActor); + }); + }); + }).Build(); + +await host.StartAsync(); +return host; +``` + +You can also call the following methods to activate the journal / snapshot stores independently: + +* ` WithAzureTableJournal` +* `WithAzureBlobsSnapshotStore` + #### 0.8.4 June 2 2022 #### * Upgraded to [Akka.NET 1.4.39](https://github.com/akkadotnet/akka.net/releases/tag/1.4.39) * [Update Azure.Identity to 1.6.0](https://github.com/petabridge/Akka.Persistence.Azure/pull/205) diff --git a/src/Akka.Persistence.Azure.Hosting/Akka.Persistence.Azure.Hosting.csproj b/src/Akka.Persistence.Azure.Hosting/Akka.Persistence.Azure.Hosting.csproj new file mode 100644 index 0000000..abb7b33 --- /dev/null +++ b/src/Akka.Persistence.Azure.Hosting/Akka.Persistence.Azure.Hosting.csproj @@ -0,0 +1,17 @@ + + + + $(NetStandardLibVersion) + Akka.Hosting support for Akka.Persistence.Azure. + 8.0 + + + + + + + + + + + diff --git a/src/Akka.Persistence.Azure.Hosting/AzurePersistenceExtensions.cs b/src/Akka.Persistence.Azure.Hosting/AzurePersistenceExtensions.cs new file mode 100644 index 0000000..a9fd2aa --- /dev/null +++ b/src/Akka.Persistence.Azure.Hosting/AzurePersistenceExtensions.cs @@ -0,0 +1,102 @@ +using System; +using Akka.Actor; +using Akka.Configuration; +using Akka.Hosting; +using Akka.Persistence.Azure.Query; +using Akka.Persistence.Hosting; + +namespace Akka.Persistence.Azure.Hosting +{ + /// + /// Extension methods for Akka.Hosting and Akka.Azure.Persistence + /// + public static class AzurePersistenceExtensions + { + public const string DefaultTableName = "AkkaPersistenceDefaultTable"; + public const string DefaultBlobContainerName = "akka-persistence-default-container"; + + private static string ToHocon(bool b) + { + return b ? "on" : "off"; + } + + + public static AkkaConfigurationBuilder WithAzureTableJournal(this AkkaConfigurationBuilder builder, + string connectionString, bool autoInitialize = true, string tableName = DefaultTableName, Action configurator = null) + { + Config journalConfiguration = @$" + akka.persistence {{ + journal {{ + plugin = ""akka.persistence.journal.azure-table"" + azure-table {{ + class = ""Akka.Persistence.Azure.Journal.AzureTableStorageJournal, Akka.Persistence.Azure"" + connection-string = ""{connectionString}"" + # the name of the Windows Azure Table used to persist journal events + table-name = ""{tableName}"" + auto-initialize = {ToHocon(autoInitialize)} + }} + }} + }}"; + + var finalConfig = journalConfiguration; + builder.AddHocon(finalConfig, HoconAddMode.Prepend); + + // PUSH DEFAULT CONFIG TO END + builder.AddHocon(AzurePersistence.DefaultConfig, HoconAddMode.Append); + + if (configurator != null) // configure event adapters + { + builder.WithJournal("azure-table", configurator); + } + + return builder; + } + + public static AkkaConfigurationBuilder WithAzureBlobsSnapshotStore(this AkkaConfigurationBuilder builder, + string connectionString, bool autoInitialize = true, string containerName = DefaultBlobContainerName) + { + Config journalConfiguration = @$" + akka.persistence {{ + snapshot-store {{ + plugin = ""akka.persistence.snapshot-store.azure-blob-store"" + azure-blob-store {{ + class = ""Akka.Persistence.Azure.Snapshot.AzureBlobSnapshotStore, Akka.Persistence.Azure"" + connection-string = ""{connectionString}"" + # the name of the Windows Azure Table used to persist journal events + container-name = ""{containerName}"" + auto-initialize = {ToHocon(autoInitialize)} + }} + }} + }}"; + + var finalConfig = journalConfiguration; + builder.AddHocon(finalConfig, HoconAddMode.Prepend); + + // PUSH DEFAULT CONFIG TO END + builder.AddHocon(AzurePersistence.DefaultConfig, HoconAddMode.Append); + + return builder; + } + + /// + /// Adds both AzureTableStorage journal and AzureBlobStorage snapshot-store as the default Akka.Persistence + /// implementations for a given . + /// + /// + /// + /// + /// + /// + /// + /// + public static AkkaConfigurationBuilder WithAzurePersistence(this AkkaConfigurationBuilder builder, + string connectionString, bool autoInitialize = true, string containerName = DefaultBlobContainerName, + string tableName = DefaultTableName, Action configurator = null) + { + builder.WithAzureTableJournal(connectionString, autoInitialize, tableName, configurator); + builder.WithAzureBlobsSnapshotStore(connectionString, autoInitialize, containerName); + + return builder; + } + } +} diff --git a/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj b/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj index b03985a..82c3a98 100644 --- a/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj +++ b/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj @@ -7,7 +7,7 @@ - + all @@ -16,6 +16,7 @@ + diff --git a/src/Akka.Persistence.Azure.Tests/Hosting/AzurePersistenceHostingSanityCheck.cs b/src/Akka.Persistence.Azure.Tests/Hosting/AzurePersistenceHostingSanityCheck.cs new file mode 100644 index 0000000..289ea86 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Hosting/AzurePersistenceHostingSanityCheck.cs @@ -0,0 +1,127 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Event; +using Akka.Hosting; +using Akka.Persistence.Azure.Hosting; +using Akka.TestKit.Xunit2.Internals; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Azure.Tests.Hosting +{ + public class AzurePersistenceHostingSanityCheck + { + public static async Task StartHost(Action testSetup) + { + var conn = Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"); + var host = new HostBuilder() + .ConfigureServices(collection => + { + collection.AddAkka("MyActorSys", builder => + { + builder.WithAzurePersistence(conn); + testSetup(builder); + }); + }).Build(); + + await host.StartAsync(); + return host; + } + + public sealed class MyPersistenceActor : ReceivePersistentActor + { + private List _values = new List(); + + public MyPersistenceActor(string persistenceId) + { + PersistenceId = persistenceId; + + Recover(offer => + { + if (offer.Snapshot is IEnumerable ints) + { + _values = new List(ints); + } + }); + + Recover(i => { _values.Add(i); }); + + Command(i => + { + Persist(i, i1 => + { + _values.Add(i); + if (LastSequenceNr % 2 == 0) + { + SaveSnapshot(_values); + } + + Sender.Tell("ACK"); + }); + }); + + Command(str => str.Equals("getall"), s => { Sender.Tell(_values.ToArray()); }); + + Command(s => { }); + } + + public override string PersistenceId { get; } + } + + private readonly ITestOutputHelper _output; + + public AzurePersistenceHostingSanityCheck(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task ShouldLaunchAzurePersistence() + { + // arrange + using var host = await StartHost(builder => { + + builder.StartActors((system, registry) => + { + var myActor = system.ActorOf(Props.Create(() => new MyPersistenceActor("ac1")), "actor1"); + registry.Register(myActor); + }) + .WithActors((system, registry) => + { + var extSystem = (ExtendedActorSystem)system; + var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(_output)), "log-test"); + logger.Tell(new InitializeLogger(system.EventStream)); + }); + }); + + var actorSystem = host.Services.GetRequiredService(); + var actorRegistry = host.Services.GetRequiredService(); + var myPersistentActor = actorRegistry.Get(); + + // act + var resp1 = await myPersistentActor.Ask(1, TimeSpan.FromSeconds(3)); + var resp2 = await myPersistentActor.Ask(2, TimeSpan.FromSeconds(3)); + var snapshot = await myPersistentActor.Ask("getall", TimeSpan.FromSeconds(3)); + + // assert + snapshot.Should().BeEquivalentTo(new[] {1, 2}); + + // kill + recreate actor with same PersistentId + await myPersistentActor.GracefulStop(TimeSpan.FromSeconds(3)); + var myPersistentActor2 = actorSystem.ActorOf(Props.Create(() => new MyPersistenceActor("ac1")), "actor1a"); + + var snapshot2 = await myPersistentActor2.Ask("getall", TimeSpan.FromSeconds(3)); + snapshot2.Should().BeEquivalentTo(new[] {1, 2}); + + // validate configs + var config = actorSystem.Settings.Config; + config.GetString("akka.persistence.journal.plugin").Should().Be("akka.persistence.journal.azure-table"); + config.GetString("akka.persistence.snapshot-store.plugin").Should().Be("akka.persistence.snapshot-store.azure-blob-store"); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj index 8a21670..46446b5 100644 --- a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj +++ b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index bb69159..1891569 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -369,7 +369,7 @@ protected override async Task> WriteMessagesAsync(IEnu if (_log.IsDebugEnabled && _settings.VerboseLogging) foreach (var r in allPersistenceResponse.Value) - _log.Debug("Azure table storage wrote entity with status code [{1}]", r.Status); + _log.Debug("Azure table storage wrote entity with status code [{0}]", r.Status); if (HasPersistenceIdSubscribers || HasAllPersistenceIdSubscribers) { @@ -391,7 +391,7 @@ protected override async Task> WriteMessagesAsync(IEnu if (_log.IsDebugEnabled && _settings.VerboseLogging) foreach (var r in eventTagsResponse.Value) - _log.Debug("Azure table storage wrote entity with status code [{1}]", r.Status); + _log.Debug("Azure table storage wrote entity with status code [{0}]", r.Status); if (HasTagSubscribers && taggedEntries.Count != 0) { diff --git a/src/common.props b/src/common.props index 0894d8f..35f12cf 100644 --- a/src/common.props +++ b/src/common.props @@ -2,10 +2,11 @@ Copyright © 2017-2020 Petabridge Petabridge - 0.7.1 - Release of Akka.Persistence.Azure** -- Upgraded to [Akka.NET v1.4.12](https://github.com/akkadotnet/akka.net/releases/tag/1.4.12) -- Added backoff/retry mechanism for `AzureSnapshotStore` + 0.8.4 + Upgraded to [Akka.NET 1.4.39](https://github.com/akkadotnet/akka.net/releases/tag/1.4.39) +[Update Azure.Identity to 1.6.0](https://github.com/petabridge/Akka.Persistence.Azure/pull/205) +[Update System.Linq.Async to 6.0.1](https://github.com/petabridge/Akka.Persistence.Azure/pull/198) +[Upgrade `Microsoft.Azure.Consmos.Table` to `Azure.Data.Tables` 12.5.0](https://github.com/petabridge/Akka.Persistence.Azure/pull/207)