From d8c3a7fb08d7bfe6d88d382f7280f93fa90a9adf Mon Sep 17 00:00:00 2001 From: kaleho Date: Mon, 30 Sep 2019 17:47:01 -0500 Subject: [PATCH] Completed implementation (#45) * Fixed blob storage tests Implemented HighestSequenceNrEntry to keep track of the highest sequence number separate from the journal entries themselves Updated replay query to consider the highest sequence number entry * This was the first pass at a completed implementation with the exception of the two tests that will not pass when the entire spec is run but will pass if run individually * Cleaned up table entities --- .../Akka.Persistence.Azure.Tests.csproj | 11 +- ...eSpec.cs => AzureBlobSnapshotStoreSpec.cs} | 8 +- .../AzurePersistenceConfigSpec.cs | 20 + .../AzureStorageConfigHelper.cs | 29 - .../AzureTableJournalSpec.cs | 23 +- .../Helper/AzureStorageConfigHelper.cs | 66 ++ .../Helper/JournalTestActor.cs | 45 + ...reTableCurrentEventsByPersistenceIdSpec.cs | 43 + .../Query/AzureTableCurrentEventsByTagSpec.cs | 49 + .../AzureTableCurrentPersistenceIdsSpec.cs | 86 ++ .../AzureTableEventsByPersistenceIdSpec.cs | 43 + .../Query/AzureTableEventsByTagSpec.cs | 88 ++ .../Query/AzureTablePersistenceIdsSpec.cs | 41 + .../{Util => }/SeqNoHelperSpecs.cs | 7 +- .../SerializerHelperSpecs.cs | 6 +- .../Akka.Persistence.Azure.csproj | 6 +- .../Journal/AzureTableStorageJournal.cs | 985 ++++++++++++++---- .../AzureTableStorageJournalSettings.cs | 15 +- .../Query/AzureTableStorageQuerySettings.cs | 42 + .../Query/AzureTableStorageReadJournal.cs | 194 ++++ .../AzureTableStorageReadJournalProvider.cs | 23 + .../Query/DeliveryBuffer.cs | 59 ++ src/Akka.Persistence.Azure/Query/Messages.cs | 255 +++++ .../AbstractEventsByPersistenceIdPublisher.cs | 106 ++ .../AbstractEventsByTagPublisher.cs | 92 ++ .../Publishers/AllPersistenceIdsPublisher.cs | 57 + .../CurrentEventsByPersistenceIdPublisher.cs | 39 + .../Publishers/CurrentEventsByTagPublisher.cs | 53 + .../EventsByPersistenceIdPublisher.cs | 24 + .../Query/Publishers/EventsByTagPublisher.cs | 24 + .../LiveEventsByPersistenceIdPublisher.cs | 44 + .../Publishers/LiveEventsByTagPublisher.cs | 53 + .../Snapshot/AzureBlobSnapshotStore.cs | 24 +- .../TableEntities/AllPersistenceIdsEntry.cs | 62 ++ .../TableEntities/EventTagEntry.cs | 130 +++ .../TableEntities/HighestSequenceNrEntry.cs | 71 ++ .../PersistentJournalEntry.cs | 75 +- src/Akka.Persistence.Azure/reference.conf | 24 + 38 files changed, 2733 insertions(+), 289 deletions(-) rename src/Akka.Persistence.Azure.Tests/{AzureBlobStorageSpec.cs => AzureBlobSnapshotStoreSpec.cs} (76%) delete mode 100644 src/Akka.Persistence.Azure.Tests/AzureStorageConfigHelper.cs create mode 100644 src/Akka.Persistence.Azure.Tests/Helper/AzureStorageConfigHelper.cs create mode 100644 src/Akka.Persistence.Azure.Tests/Helper/JournalTestActor.cs create mode 100644 src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByPersistenceIdSpec.cs create mode 100644 src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByTagSpec.cs create mode 100644 src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentPersistenceIdsSpec.cs create mode 100644 src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByPersistenceIdSpec.cs create mode 100644 src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByTagSpec.cs create mode 100644 src/Akka.Persistence.Azure.Tests/Query/AzureTablePersistenceIdsSpec.cs rename src/Akka.Persistence.Azure.Tests/{Util => }/SeqNoHelperSpecs.cs (84%) create mode 100644 src/Akka.Persistence.Azure/Query/AzureTableStorageQuerySettings.cs create mode 100644 src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs create mode 100644 src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalProvider.cs create mode 100644 src/Akka.Persistence.Azure/Query/DeliveryBuffer.cs create mode 100644 src/Akka.Persistence.Azure/Query/Messages.cs create mode 100644 src/Akka.Persistence.Azure/Query/Publishers/AbstractEventsByPersistenceIdPublisher.cs create mode 100644 src/Akka.Persistence.Azure/Query/Publishers/AbstractEventsByTagPublisher.cs create mode 100644 src/Akka.Persistence.Azure/Query/Publishers/AllPersistenceIdsPublisher.cs create mode 100644 src/Akka.Persistence.Azure/Query/Publishers/CurrentEventsByPersistenceIdPublisher.cs create mode 100644 src/Akka.Persistence.Azure/Query/Publishers/CurrentEventsByTagPublisher.cs create mode 100644 src/Akka.Persistence.Azure/Query/Publishers/EventsByPersistenceIdPublisher.cs create mode 100644 src/Akka.Persistence.Azure/Query/Publishers/EventsByTagPublisher.cs create mode 100644 src/Akka.Persistence.Azure/Query/Publishers/LiveEventsByPersistenceIdPublisher.cs create mode 100644 src/Akka.Persistence.Azure/Query/Publishers/LiveEventsByTagPublisher.cs create mode 100644 src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs create mode 100644 src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs create mode 100644 src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs rename src/Akka.Persistence.Azure/{Journal => TableEntities}/PersistentJournalEntry.cs (61%) diff --git a/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj b/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj index 834b8e7..4448072 100644 --- a/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj +++ b/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj @@ -6,12 +6,15 @@ - - + + - + - + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/src/Akka.Persistence.Azure.Tests/AzureBlobStorageSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSpec.cs similarity index 76% rename from src/Akka.Persistence.Azure.Tests/AzureBlobStorageSpec.cs rename to src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSpec.cs index 9b8db3b..239fd64 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureBlobStorageSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSpec.cs @@ -1,5 +1,5 @@ // ----------------------------------------------------------------------- -// +// // Copyright (C) 2015 - 2018 Petabridge, LLC // // ----------------------------------------------------------------------- @@ -10,14 +10,14 @@ using Akka.Persistence.TCK.Snapshot; using Xunit; using Xunit.Abstractions; -using static Akka.Persistence.Azure.Tests.AzureStorageConfigHelper; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests { [Collection("AzureSnapshot")] - public class AzureBlobStorageSpec : SnapshotStoreSpec + public class AzureBlobSnapshotStoreSpec : SnapshotStoreSpec { - public AzureBlobStorageSpec(ITestOutputHelper output) : base(Config(), + public AzureBlobSnapshotStoreSpec(ITestOutputHelper output) : base(Config(), nameof(AzureTableJournalSpec), output) { AzurePersistence.Get(Sys); diff --git a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs index d2df43e..cbbf699 100644 --- a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs @@ -7,6 +7,7 @@ using System; using Akka.Configuration; using Akka.Persistence.Azure.Journal; +using Akka.Persistence.Azure.Query; using Akka.Persistence.Azure.Snapshot; using FluentAssertions; using FluentAssertions.Equivalency; @@ -60,6 +61,25 @@ public void ShouldParseTableConfig() tableSettings.VerboseLogging.Should().BeFalse(); } + [Fact] + public void ShouldParseQueryConfig() + { + var querySettings = + AzureTableStorageQuerySettings.Create( + ConfigurationFactory.ParseString(@"akka.persistence.query.journal.azure-table{ + class = ""classname"" + write-plugin = foo + max-buffer-size = 100 + refresh-interval = 3s + }").WithFallback(AzurePersistence.DefaultConfig) + .GetConfig("akka.persistence.query.journal.azure-table")); + + querySettings.Class.Should().Be("classname"); + querySettings.WritePlugin.Should().Be("foo"); + querySettings.MaxBufferSize.Should().Be("100"); + querySettings.RefreshInterval.Should().Be(new TimeSpan(0, 0, 3)); + } + [Theory] [InlineData("fo", "Invalid table name length")] [InlineData("1foo", "Invalid table name")] diff --git a/src/Akka.Persistence.Azure.Tests/AzureStorageConfigHelper.cs b/src/Akka.Persistence.Azure.Tests/AzureStorageConfigHelper.cs deleted file mode 100644 index d265b32..0000000 --- a/src/Akka.Persistence.Azure.Tests/AzureStorageConfigHelper.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using Akka.Configuration; - -namespace Akka.Persistence.Azure.Tests -{ - public static class AzureStorageConfigHelper - { - public static Config AzureConfig(string connectionString) - { - var tableName = "t" + Guid.NewGuid().ToString().Replace("-", ""); - var containerName = "testcontainer" + Guid.NewGuid(); - - return ConfigurationFactory.ParseString( - @"akka.loglevel = DEBUG - akka.log-config-on-start = off - akka.persistence.journal.plugin = ""akka.persistence.journal.azure-table"" - akka.persistence.journal.azure-table.connection-string=""" + connectionString + @""" - akka.persistence.snapshot-store.azure-blob-store.connection-string=""" + connectionString + @""" - akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.azure-blob-store"" - akka.persistence.journal.azure-table.verbose-logging = on - akka.test.single-expect-default = 3s") - .WithFallback("akka.persistence.journal.azure-table.table-name=" + tableName) - .WithFallback("akka.persistence.snapshot-store.azure-blob-store.container-name=" + containerName); - } - - } -} diff --git a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs index 6d22e35..d765555 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs @@ -4,40 +4,41 @@ // // ----------------------------------------------------------------------- -using System; -using System.Threading.Tasks; -using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.TCK.Journal; +using System; using Xunit; using Xunit.Abstractions; -using static Akka.Persistence.Azure.Tests.AzureStorageConfigHelper; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests { [Collection("AzureJournal")] public class AzureTableJournalSpec : JournalSpec { - public static string TableName { get; private set; } - - public AzureTableJournalSpec(ITestOutputHelper output) : base(Config(), nameof(AzureTableJournalSpec), - output) + public AzureTableJournalSpec(ITestOutputHelper output) + : base(Config(), nameof(AzureTableJournalSpec), output) { AzurePersistence.Get(Sys); + Initialize(); + + output.WriteLine("Current table: {0}", TableName); } + public static string TableName { get; private set; } + public static Config Config() { - var config = + var azureConfig = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) ? AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) : AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - TableName = config.GetString("akka.persistence.journal.azure-table.table-name"); + TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); - return config; + return azureConfig; } protected override void Dispose(bool disposing) diff --git a/src/Akka.Persistence.Azure.Tests/Helper/AzureStorageConfigHelper.cs b/src/Akka.Persistence.Azure.Tests/Helper/AzureStorageConfigHelper.cs new file mode 100644 index 0000000..7c9a694 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Helper/AzureStorageConfigHelper.cs @@ -0,0 +1,66 @@ +using System; +using Akka.Configuration; + +namespace Akka.Persistence.Azure.Tests.Helper +{ + public static class AzureStorageConfigHelper + { + public static Config AzureConfig(string connectionString) + { + var tableName = "t" + Guid.NewGuid().ToString().Replace("-", ""); + var containerName = "testcontainer" + Guid.NewGuid(); + + return ConfigurationFactory.ParseString( + @" +akka { + loglevel = DEBUG + log-config-on-start = off + test.single-expect-default = 30s + + persistence { + publish-plugin-commands = on + + journal { + plugin = ""akka.persistence.journal.azure-table"" + + azure-table { + connection-string=""" + connectionString + @""" + connect-timeout = 3s + request-timeout = 3s + verbose-logging = on + + event-adapters { + color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK"" + } + event-adapter-bindings = { + ""System.String"" = color-tagger + } + } + } + + query { + journal { + azure-table { + write-plugin = ""akka.persistence.journal.azure-table"" + refresh-interval = 1s + max-buffer-size = 150 + } + } + } + + snapshot-store { + plugin = ""akka.persistence.snapshot-store.azure-blob-store"" + + azure-blob-store { + connection-string=""" + connectionString + @""" + request-timeout = 3s + } + } + } +}") + .WithFallback("akka.persistence.journal.azure-table.table-name=" + tableName) + .WithFallback("akka.persistence.snapshot-store.azure-blob-store.container-name=" + containerName); + } + + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Helper/JournalTestActor.cs b/src/Akka.Persistence.Azure.Tests/Helper/JournalTestActor.cs new file mode 100644 index 0000000..60a82c0 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Helper/JournalTestActor.cs @@ -0,0 +1,45 @@ +using Akka.Actor; + +namespace Akka.Persistence.Azure.Tests.Helper +{ + public class JournalTestActor : UntypedPersistentActor + { + public static Props Props(string persistenceId) => Actor.Props.Create(() => new JournalTestActor(persistenceId)); + + public sealed class DeleteCommand + { + public DeleteCommand(long toSequenceNr) + { + ToSequenceNr = toSequenceNr; + } + + public long ToSequenceNr { get; } + } + + public JournalTestActor(string persistenceId) + { + PersistenceId = persistenceId; + } + + public override string PersistenceId { get; } + + protected override void OnRecover(object message) + { + } + + protected override void OnCommand(object message) + { + switch (message) + { + case DeleteCommand delete: + DeleteMessages(delete.ToSequenceNr); + Sender.Tell($"{delete.ToSequenceNr}-deleted"); + break; + case string cmd: + var sender = Sender; + Persist(cmd, e => sender.Tell($"{e}-done")); + break; + } + } + } +} diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByPersistenceIdSpec.cs new file mode 100644 index 0000000..6597b5c --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByPersistenceIdSpec.cs @@ -0,0 +1,43 @@ +using Akka.Configuration; +using Akka.Persistence.Azure.Query; +using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Azure.Tests.Helper; +using Akka.Persistence.Query; +using Akka.Persistence.TCK.Query; +using System; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Azure.Tests.Query +{ + [Collection("AzureQuery")] + public sealed class AzureTableCurrentEventsByPersistenceIdSpec + : CurrentEventsByPersistenceIdSpec + { + public AzureTableCurrentEventsByPersistenceIdSpec(ITestOutputHelper output) + : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + { + AzurePersistence.Get(Sys); + + ReadJournal = + Sys.ReadJournalFor( + AzureTableStorageReadJournal.Identifier); + + output.WriteLine("Current table: {0}", TableName); + } + + public static string TableName { get; private set; } + + public static Config Config() + { + var azureConfig = + !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + + TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); + + return azureConfig; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByTagSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByTagSpec.cs new file mode 100644 index 0000000..9cf18cf --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByTagSpec.cs @@ -0,0 +1,49 @@ +using System; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Azure.Query; +using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Azure.Tests.Helper; +using Akka.Persistence.Query; +using Akka.Persistence.TCK.Query; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Azure.Tests.Query +{ + [Collection("AzureQuery")] + public sealed class AzureTableCurrentEventsByTagSpec + : CurrentEventsByTagSpec + { + public AzureTableCurrentEventsByTagSpec(ITestOutputHelper output) + : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + { + AzurePersistence.Get(Sys); + + ReadJournal = + Sys.ReadJournalFor( + AzureTableStorageReadJournal.Identifier); + + output.WriteLine("Current table: {0}", TableName); + + var x = Sys.ActorOf(JournalTestActor.Props("x")); + x.Tell("warm-up"); + ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10)); + + } + + public static string TableName { get; private set; } + + public static Config Config() + { + var azureConfig = + !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + + TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); + + return azureConfig; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentPersistenceIdsSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentPersistenceIdsSpec.cs new file mode 100644 index 0000000..8cb3eb3 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentPersistenceIdsSpec.cs @@ -0,0 +1,86 @@ +using System; +using System.Diagnostics; +using System.Linq; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Azure.Query; +using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Azure.Tests.Helper; +using Akka.Persistence.Query; +using Akka.Persistence.TCK.Query; +using Akka.Streams.TestKit; +using Akka.Util.Internal; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Azure.Tests.Query +{ + [Collection("AzureQuery")] + public sealed class AzureTableCurrentPersistenceIdsSpec + : CurrentPersistenceIdsSpec + { + private readonly ITestOutputHelper _output; + + public AzureTableCurrentPersistenceIdsSpec(ITestOutputHelper output) + : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + { + _output = output; + AzurePersistence.Get(Sys); + + ReadJournal = + Sys.ReadJournalFor( + AzureTableStorageReadJournal.Identifier); + + output.WriteLine("Current table: {0}", TableName); + } + + public static string TableName { get; private set; } + + public static Config Config() + { + var azureConfig = + !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + + TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); + + return azureConfig; + } + + public override void ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_complete() + { + var queries = ReadJournal.AsInstanceOf(); + + Setup("a", 1); + Setup("b", 1); + Setup("c", 1); + + var greenSrc = queries.CurrentPersistenceIds(); + var probe = greenSrc.RunWith(this.SinkProbe(), Materializer); + var firstTwo = probe.Request(2).ExpectNextN(2); + Assert.Empty(firstTwo.Except(new[] { "a", "b", "c" }).ToArray()); + + var last = new[] { "a", "b", "c" }.Except(firstTwo).First(); + Setup("d", 1); + + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + probe.Request(5) + .ExpectNext(last) + .ExpectComplete(); + } + + private IActorRef Setup(string persistenceId, int n) + { + var sw = Stopwatch.StartNew(); + var pref = Sys.ActorOf(JournalTestActor.Props(persistenceId)); + for (int i = 1; i <= n; i++) + { + pref.Tell($"{persistenceId}-{i}"); + ExpectMsg($"{persistenceId}-{i}-done", TimeSpan.FromSeconds(10), $"{persistenceId}-{i}-done"); + } + _output.WriteLine(sw.ElapsedMilliseconds.ToString()); + return pref; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByPersistenceIdSpec.cs new file mode 100644 index 0000000..f860a9a --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByPersistenceIdSpec.cs @@ -0,0 +1,43 @@ +using System; +using Akka.Configuration; +using Akka.Persistence.Azure.Query; +using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Azure.Tests.Helper; +using Akka.Persistence.Query; +using Akka.Persistence.TCK.Query; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Azure.Tests.Query +{ + [Collection("AzureQuery")] + public sealed class AzureTableEventsByPersistenceIdSpec + : EventsByPersistenceIdSpec + { + public AzureTableEventsByPersistenceIdSpec(ITestOutputHelper output) + : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + { + AzurePersistence.Get(Sys); + + ReadJournal = + Sys.ReadJournalFor( + AzureTableStorageReadJournal.Identifier); + + output.WriteLine("Current table: {0}", TableName); + } + + public static string TableName { get; private set; } + + public static Config Config() + { + var azureConfig = + !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + + TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); + + return azureConfig; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByTagSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByTagSpec.cs new file mode 100644 index 0000000..b0cfb65 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByTagSpec.cs @@ -0,0 +1,88 @@ +using System; +using System.Collections.Generic; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Azure.Query; +using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Azure.Tests.Helper; +using Akka.Persistence.Journal; +using Akka.Persistence.Query; +using Akka.Persistence.TCK.Query; +using Akka.Streams.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Azure.Tests.Query +{ + [Collection("AzureQuery")] + public sealed class AzureTableEventsByTagSpec + : EventsByTagSpec + { + public AzureTableEventsByTagSpec(ITestOutputHelper output) + : base(Config(), nameof(AzureTableEventsByTagSpec), output) + { + AzurePersistence.Get(Sys); + + ReadJournal = + Sys.ReadJournalFor( + AzureTableStorageReadJournal.Identifier); + + var x = Sys.ActorOf(JournalTestActor.Props("x")); + x.Tell("warm-up"); + ExpectMsg("warm-up-done", TimeSpan.FromSeconds(60)); + } + + public static string TableName { get; private set; } + + public static Config Config() + { + var azureConfig = + !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + + TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); + + return azureConfig; + } + + [Fact] + public void ReadJournal_should_delete_EventTags_index_items() + { + var queries = ReadJournal as IEventsByTagQuery; + + var b = Sys.ActorOf(JournalTestActor.Props("b")); + var d = Sys.ActorOf(JournalTestActor.Props("d")); + + b.Tell("a black car"); + ExpectMsg("a black car-done"); + + var blackSrc = queries.EventsByTag("black", offset: Offset.NoOffset()); + var probe = blackSrc.RunWith(this.SinkProbe(), Materializer); + probe.Request(2); + probe.ExpectNext(p => p.PersistenceId == "b" && p.SequenceNr == 1L && p.Event.Equals("a black car")); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + + d.Tell("a black dog"); + ExpectMsg("a black dog-done"); + d.Tell("a black night"); + ExpectMsg("a black night-done"); + + probe.ExpectNext(p => p.PersistenceId == "d" && p.SequenceNr == 1L && p.Event.Equals("a black dog")); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + probe.Request(10); + probe.ExpectNext(p => p.PersistenceId == "d" && p.SequenceNr == 2L && p.Event.Equals("a black night")); + + b.Tell(new JournalTestActor.DeleteCommand(1)); + AwaitAssert(() => ExpectMsg("1-deleted")); + + d.Tell(new JournalTestActor.DeleteCommand(2)); + AwaitAssert(() => ExpectMsg("2-deleted")); + + probe.Request(10); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + + probe.Cancel(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTablePersistenceIdsSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTablePersistenceIdsSpec.cs new file mode 100644 index 0000000..cd711b6 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTablePersistenceIdsSpec.cs @@ -0,0 +1,41 @@ +using Akka.Configuration; +using Akka.Persistence.Azure.Query; +using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Query; +using Akka.Persistence.TCK.Query; +using System; +using Xunit; +using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; + +namespace Akka.Persistence.Azure.Tests.Query +{ + [Collection("AzureQuery")] + public sealed class AzureTablePersistenceIdsSpec + : PersistenceIdsSpec + { + public AzureTablePersistenceIdsSpec(ITestOutputHelper output) + : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + { + AzurePersistence.Get(Sys); + + ReadJournal = + Sys.ReadJournalFor( + AzureTableStorageReadJournal.Identifier); + } + + public static string TableName { get; private set; } + + public static Config Config() + { + var azureConfig = + !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + ? AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + : AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + + TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); + + return azureConfig; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Util/SeqNoHelperSpecs.cs b/src/Akka.Persistence.Azure.Tests/SeqNoHelperSpecs.cs similarity index 84% rename from src/Akka.Persistence.Azure.Tests/Util/SeqNoHelperSpecs.cs rename to src/Akka.Persistence.Azure.Tests/SeqNoHelperSpecs.cs index f13a221..3e74989 100644 --- a/src/Akka.Persistence.Azure.Tests/Util/SeqNoHelperSpecs.cs +++ b/src/Akka.Persistence.Azure.Tests/SeqNoHelperSpecs.cs @@ -1,12 +1,9 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Linq; using Akka.Persistence.Azure.Util; using FluentAssertions; using Xunit; -namespace Akka.Persistence.Azure.Tests.Util +namespace Akka.Persistence.Azure.Tests { public class SeqNoHelperSpecs { diff --git a/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs b/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs index 20e7462..c202886 100644 --- a/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs +++ b/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs @@ -1,8 +1,10 @@ using System; using System.Collections.Generic; using System.Text; +using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Azure.Tests.Helper; using FluentAssertions; using Xunit; using Xunit.Abstractions; @@ -13,7 +15,7 @@ public class SerializerHelperSpecs : Akka.TestKit.Xunit2.TestKit { private readonly SerializationHelper _helper; - public SerializerHelperSpecs(ITestOutputHelper helper) + public SerializerHelperSpecs(ITestOutputHelper helper) : base(Config(), nameof(SerializerHelperSpecs), output: helper) { // force Akka.Persistence serializers to be loaded @@ -31,6 +33,7 @@ public static Config Config() return azureConfig; } + [Fact] public void ShouldSerializeAndDeserializePersistentRepresentation() { @@ -42,6 +45,7 @@ public void ShouldSerializeAndDeserializePersistentRepresentation() deserialized.Manifest.Should().Be(persistentRepresentation.Manifest); deserialized.SequenceNr.Should().Be(persistentRepresentation.SequenceNr); deserialized.PersistenceId.Should().Be(persistentRepresentation.PersistenceId); + deserialized.Sender.Should().Be(persistentRepresentation.Sender); deserialized.IsDeleted.Should().BeFalse(); } } diff --git a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj index 4d84f98..9fcce1b 100644 --- a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj +++ b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj @@ -13,8 +13,10 @@ - - + + + + \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index 4fe9861..d3149c7 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -1,22 +1,24 @@ -// ----------------------------------------------------------------------- +// ----------------------------------------------------------------------- // // Copyright (C) 2015 - 2018 Petabridge, LLC // // ----------------------------------------------------------------------- -using System; -using System.Collections.Generic; -using System.Collections.Immutable; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; using Akka.Actor; using Akka.Event; +using Akka.Persistence.Azure.Query; +using Akka.Persistence.Azure.TableEntities; using Akka.Persistence.Azure.Util; using Akka.Persistence.Journal; using Akka.Util.Internal; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Table; +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; using Debug = System.Diagnostics.Debug; namespace Akka.Persistence.Azure.Journal @@ -31,84 +33,89 @@ namespace Akka.Persistence.Azure.Journal /// public class AzureTableStorageJournal : AsyncWriteJournal { + private static readonly Dictionary RetryInterval = + new Dictionary() + { + { 5, TimeSpan.FromMilliseconds(100) }, + { 4, TimeSpan.FromMilliseconds(500) }, + { 3, TimeSpan.FromMilliseconds(1000) }, + { 2, TimeSpan.FromMilliseconds(2000) }, + { 1, TimeSpan.FromMilliseconds(4000) }, + { 0, TimeSpan.FromMilliseconds(8000) }, + }; + + private readonly HashSet _allPersistenceIds = new HashSet(); + private readonly HashSet _allPersistenceIdSubscribers = new HashSet(); private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly Dictionary> _persistenceIdSubscribers = new Dictionary>(); private readonly SerializationHelper _serialization; private readonly AzureTableStorageJournalSettings _settings; private readonly CloudStorageAccount _storageAccount; private readonly Lazy _tableStorage; + private readonly Dictionary> _tagSubscribers = new Dictionary>(); public AzureTableStorageJournal() { _settings = AzurePersistence.Get(Context.System).TableSettings; _serialization = new SerializationHelper(Context.System); _storageAccount = CloudStorageAccount.Parse(_settings.ConnectionString); - _tableStorage = new Lazy(() => InitCloudStorage(5).Result); } public CloudTable Table => _tableStorage.Value; - private static readonly Dictionary RetryInterval = new Dictionary() - { - { 5, TimeSpan.FromMilliseconds(100) }, - { 4, TimeSpan.FromMilliseconds(500) }, - { 3, TimeSpan.FromMilliseconds(1000) }, - { 2, TimeSpan.FromMilliseconds(2000) }, - { 1, TimeSpan.FromMilliseconds(4000) }, - { 0, TimeSpan.FromMilliseconds(8000) }, - }; + protected bool HasAllPersistenceIdSubscribers => + _allPersistenceIdSubscribers.Count != 0; - private async Task InitCloudStorage(int remainingTries) - { - try - { - var tableClient = _storageAccount.CreateCloudTableClient(); - var tableRef = tableClient.GetTableReference(_settings.TableName); - var op = new OperationContext(); - using (var cts = new CancellationTokenSource(_settings.ConnectTimeout)) - { - if (await tableRef.CreateIfNotExistsAsync(new TableRequestOptions(), op, cts.Token)) - _log.Info("Created Azure Cloud Table", _settings.TableName); - else - _log.Info("Successfully connected to existing table", _settings.TableName); - } + protected bool HasPersistenceIdSubscribers => + _persistenceIdSubscribers.Count != 0; - return tableRef; - } - catch (Exception ex) - { - _log.Error(ex, "[{0}] more tries to initialize table storage remaining...", remainingTries); - if (remainingTries == 0) - throw; - await Task.Delay(RetryInterval[remainingTries]); - return await InitCloudStorage(remainingTries - 1); - } - } + protected bool HasTagSubscribers => + _tagSubscribers.Count != 0; - protected override void PreStart() + public override async Task ReadHighestSequenceNrAsync( + string persistenceId, + long fromSequenceNr) { - _log.Debug("Initializing Azure Table Storage..."); + NotifyNewPersistenceIdAdded(persistenceId); - // forces loading of the value - var name = Table.Name; + _log.Debug("Entering method ReadHighestSequenceNrAsync"); - _log.Debug("Successfully started Azure Table Storage!"); + var sequenceNumberQuery = GenerateHighestSequenceNumberQuery(persistenceId); + TableQuerySegment result = null; + long seqNo = 0L; - // need to call the base in order to ensure Akka.Persistence starts up correctly - base.PreStart(); + do + { + result = await Table.ExecuteQuerySegmentedAsync(sequenceNumberQuery, result?.ContinuationToken); + + if (result.Results.Count > 0) + { + seqNo = Math.Max(seqNo, result.Results.Max(x => x.HighestSequenceNr)); + } + } while (result.ContinuationToken != null); + + _log.Debug("Leaving method ReadHighestSequenceNrAsync with SeqNo [{0}] for PersistentId [{1}]", seqNo, persistenceId); + + return seqNo; } - public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, - long toSequenceNr, long max, + public override async Task ReplayMessagesAsync( + IActorContext context, + string persistenceId, + long fromSequenceNr, + long toSequenceNr, + long max, Action recoveryCallback) { -#if DEBUG + NotifyNewPersistenceIdAdded(persistenceId); + _log.Debug("Entering method ReplayMessagesAsync for persistentId [{0}] from seqNo range [{1}, {2}] and taking up to max [{3}]", persistenceId, fromSequenceNr, toSequenceNr, max); -#endif + if (max == 0) return; - var replayQuery = GenerateReplayQuery(persistenceId, fromSequenceNr, toSequenceNr); + var replayQuery = GeneratePersistentJournalEntryReplayQuery(persistenceId, fromSequenceNr, toSequenceNr); var nextTask = Table.ExecuteQuerySegmentedAsync(replayQuery, null); var count = 0L; @@ -116,12 +123,10 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per { var tableQueryResult = await nextTask; -#if DEBUG if (_log.IsDebugEnabled && _settings.VerboseLogging) { _log.Debug("Recovered [{0}] messages for entity [{1}]", tableQueryResult.Results.Count, persistenceId); } -#endif if (tableQueryResult.ContinuationToken != null) { @@ -151,164 +156,242 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per ++count; var deserialized = _serialization.PersistentFromBytes(savedEvent.Payload); -#if DEBUG + + // Write the new persistent because it sets the sender as deadLetters which is not correct + var persistent = + new Persistent( + deserialized.Payload, + deserialized.SequenceNr, + deserialized.PersistenceId, + deserialized.Manifest, + deserialized.IsDeleted, + ActorRefs.NoSender, + deserialized.WriterGuid); + if (_log.IsDebugEnabled && _settings.VerboseLogging) { - _log.Debug("Recovering [{0}] for entity [{1}].", deserialized, savedEvent.PartitionKey); + _log.Debug("Recovering [{0}] for entity [{1}].", persistent, savedEvent.PartitionKey); } -#endif - recoveryCallback(deserialized); + recoveryCallback(persistent); } } -#if DEBUG _log.Debug("Leaving method ReplayMessagesAsync"); -#endif } - private static TableQuery GenerateReplayQuery(string persistentId, long fromSequenceNumber, - long toSequenceNumber) + protected override async Task DeleteMessagesToAsync( + string persistenceId, + long toSequenceNr) { - var persistenceIdFilter = - TableQuery.GenerateFilterCondition( - "PartitionKey", - QueryComparisons.Equal, - persistentId); - - var highestSequenceNrFilter = - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.NotEqual, - HighestSequenceNrEntry.RowKeyValue); + NotifyNewPersistenceIdAdded(persistenceId); - var rowKeyGreaterThanFilter = - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.GreaterThanOrEqual, - fromSequenceNumber.ToJournalRowKey()); + _log.Debug("Entering method DeleteMessagesToAsync for persistentId [{0}] and up to seqNo [{1}]", persistenceId, toSequenceNr); - var rowKeyLessThanFilter = - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.LessThanOrEqual, - toSequenceNumber.ToJournalRowKey()); + var query = GeneratePersistentJournalEntryDeleteQuery(persistenceId, toSequenceNr); - var filter = - TableQuery.CombineFilters( - TableQuery.CombineFilters( - persistenceIdFilter, - TableOperators.And, - highestSequenceNrFilter), - TableOperators.And, - TableQuery.CombineFilters( - rowKeyLessThanFilter, - TableOperators.And, - rowKeyGreaterThanFilter)); + var nextSegment = Table.ExecuteQuerySegmentedAsync(query, null); - var returnValue = new TableQuery().Where(filter); + while (nextSegment != null) + { + var queryResults = await nextSegment; - return returnValue; - } + if (_log.IsDebugEnabled && _settings.VerboseLogging) + { + _log.Debug("Have [{0}] messages to delete for entity [{1}]", queryResults.Results.Count, persistenceId); + } - public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) - { -#if DEBUG - _log.Debug("Entering method ReadHighestSequenceNrAsync"); -#endif - var sequenceNumberQuery = GenerateHighestSequenceNumberQuery(persistenceId, fromSequenceNr); - TableQuerySegment result = null; - long seqNo = 0L; + nextSegment = + queryResults.ContinuationToken != null + ? Table.ExecuteQuerySegmentedAsync(query, queryResults.ContinuationToken) + : null; - do - { - result = await Table.ExecuteQuerySegmentedAsync(sequenceNumberQuery, result?.ContinuationToken); - - if (result.Results.Count > 0) + if (queryResults.Results.Count > 0) { - seqNo = Math.Max(seqNo, result.Results.Max(x => x.HighestSequenceNr)); - } + var tableBatchOperation = new TableBatchOperation(); - } while (result.ContinuationToken != null); + foreach (var toBeDeleted in queryResults.Results) + { + tableBatchOperation.Delete(toBeDeleted); + } -#if DEBUG - _log.Debug("Leaving method ReadHighestSequenceNrAsync with SeqNo [{0}] for PersistentId [{1}]", seqNo, persistenceId); -#endif + var deleteTask = Table.ExecuteBatchAsync(tableBatchOperation); - return seqNo; + await deleteTask; + } + } + + _log.Debug("Leaving method DeleteMessagesToAsync for persistentId [{0}] and up to seqNo [{1}]", persistenceId, toSequenceNr); } - private static TableQuery GenerateHighestSequenceNumberQuery(string persistenceId, long fromSequenceNr) + protected override void PreStart() { - var filter = - TableQuery.CombineFilters( - TableQuery.GenerateFilterCondition( - "PartitionKey", - QueryComparisons.Equal, - persistenceId), - TableOperators.And, - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.Equal, - HighestSequenceNrEntry.RowKeyValue)); + _log.Debug("Initializing Azure Table Storage..."); - var returnValue = new TableQuery().Where(filter); + // forces loading of the value + var name = Table.Name; - return returnValue; + _log.Debug("Successfully started Azure Table Storage!"); + + // need to call the base in order to ensure Akka.Persistence starts up correctly + base.PreStart(); } - protected override async Task> WriteMessagesAsync(IEnumerable messages) + protected override bool ReceivePluginInternal( + object message) + { + return message.Match() + .With(replay => + { + ReplayTaggedMessagesAsync(replay) + .PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h), failure: e => new ReplayMessagesFailure(e)); + }) + .With(subscribe => + { + AddPersistenceIdSubscriber(Sender, subscribe.PersistenceId); + Context.Watch(Sender); + }) + .With(subscribe => + { + AddAllPersistenceIdSubscriber(Sender).Wait(); + Context.Watch(Sender); + }) + .With(subscribe => + { + AddTagSubscriber(Sender, subscribe.Tag); + Context.Watch(Sender); + }) + .With(terminated => RemoveSubscriber(terminated.ActorRef)) + .WasHandled; + } + + protected override async Task> WriteMessagesAsync( + IEnumerable atomicWrites) { try { -#if DEBUG - _log.Debug("Entering method WriteMessagesAsync"); -#endif + var taggedEntries = ImmutableDictionary>.Empty; + var exceptions = ImmutableList.Empty; var highSequenceNumbers = ImmutableDictionary.Empty; - using (var atomicWrites = messages.GetEnumerator()) + using (var currentWrites = atomicWrites.GetEnumerator()) { - while (atomicWrites.MoveNext()) + while (currentWrites.MoveNext()) { - Debug.Assert(atomicWrites.Current != null, "atomicWrites.Current != null"); + Debug.Assert(currentWrites.Current != null, "atomicWrites.Current != null"); - var batch = new TableBatchOperation(); - foreach(var currentMsg in atomicWrites.Current.Payload - .AsInstanceOf>()) - { + var list = currentWrites.Current.Payload.AsInstanceOf>().ToArray(); - Debug.Assert(currentMsg != null, nameof(currentMsg) + " != null"); + var batchItems = ImmutableList.Empty; - batch.Insert( + for (var i = 0; i < list.Length; i++) + { + var item = list[i]; + + Debug.Assert(item != null, nameof(item) + " != null"); + + byte[] payloadBytes = null; + + string[] tags = {}; + + // If the payload is a tagged payload, reset to a non-tagged payload + if (item.Payload is Tagged tagged) + { + item = item.WithPayload(tagged.Payload); + + payloadBytes = _serialization.PersistentToBytes(item); + + if (tagged.Tags.Count > 0) + { + tags = tagged.Tags.ToArray(); + + //foreach (var tag in tags) + //{ + // if (!taggedEntries.ContainsKey(tag)) + // { + // taggedEntries = taggedEntries.SetItem(tag, new List()); + // } + + // taggedEntries[tag].Add( + // new EventTagEntry( + // item.PersistenceId, + // tag, + // item.SequenceNr, + // payloadBytes, + // item.Manifest)); + //} + + //taggedEntries = + // taggedEntries.AddRange( + // tagged.Tags.Select( + // x => + // new EventTagEntry( + // item.PersistenceId, + // x, + // item.SequenceNr, + // payloadBytes, + // item.Manifest))); + } + } + + if (payloadBytes == null) + { + payloadBytes = _serialization.PersistentToBytes(item); + } + + var newItem = new PersistentJournalEntry( - currentMsg.PersistenceId, - currentMsg.SequenceNr, - _serialization.PersistentToBytes(currentMsg), - currentMsg.Manifest)); + item.PersistenceId, + item.SequenceNr, + payloadBytes, + item.Manifest, + tags); + + batchItems = batchItems.Add(newItem); + + foreach (var tag in tags) + { + if (!taggedEntries.ContainsKey(tag)) + { + taggedEntries = taggedEntries.SetItem(tag, new List()); + } + + taggedEntries[tag].Add( + new EventTagEntry( + newItem.PartitionKey, + tag, + newItem.SeqNo, + newItem.Payload, + newItem.Manifest, + newItem.UtcTicks)); + } highSequenceNumbers = highSequenceNumbers.SetItem( - currentMsg.PersistenceId, - currentMsg.SequenceNr); + item.PersistenceId, + item.SequenceNr); } - highSequenceNumbers.ForEach( - x => batch.InsertOrReplace( - new HighestSequenceNrEntry(x.Key, x.Value))); - try { + var persistenceBatch = new TableBatchOperation(); + + highSequenceNumbers.ForEach( + x => batchItems = batchItems.Add( + new HighestSequenceNrEntry(x.Key, x.Value))); + + batchItems.ForEach(x => persistenceBatch.InsertOrReplace(x)); + if (_log.IsDebugEnabled && _settings.VerboseLogging) - _log.Debug("Attempting to write batch of {0} messages to Azure storage", batch.Count); + _log.Debug("Attempting to write batch of {0} messages to Azure storage", persistenceBatch.Count); - var results = await Table.ExecuteBatchAsync(batch); + var persistenceResults = await Table.ExecuteBatchAsync(persistenceBatch); if (_log.IsDebugEnabled && _settings.VerboseLogging) - foreach (var r in results) - _log.Debug("Azure table storage wrote entity [{0}] with status code [{1}]", r.Etag, - r.HttpStatusCode); + foreach (var r in persistenceResults) + _log.Debug("Azure table storage wrote entity [{0}] with status code [{1}]", r.Etag, r.HttpStatusCode); } catch (Exception ex) { @@ -317,19 +400,79 @@ protected override async Task> WriteMessagesAsync(IEnu } } -#if DEBUG - _log.Debug("Leaving method WriteMessagesAsync"); - foreach (var ex in exceptions) + if (exceptions.IsEmpty) { - _log.Error(ex, "recorded exception during write"); + var allPersistenceIdsBatch = new TableBatchOperation(); + + highSequenceNumbers.ForEach( + x => allPersistenceIdsBatch.InsertOrReplace( + new AllPersistenceIdsEntry(x.Key))); + + var allPersistenceResults = await Table.ExecuteBatchAsync(allPersistenceIdsBatch); + + if (_log.IsDebugEnabled && _settings.VerboseLogging) + foreach (var r in allPersistenceResults) + _log.Debug("Azure table storage wrote entity [{0}] with status code [{1}]", r.Etag, r.HttpStatusCode); + + if (HasPersistenceIdSubscribers || HasAllPersistenceIdSubscribers) + { + highSequenceNumbers.ForEach(x => NotifyNewPersistenceIdAdded(x.Key)); + } + + if (taggedEntries.Count > 0) + { + //var groupTags = ImmutableDictionary>.Empty; + + //taggedEntries.ForEach( + // x => + // { + // if (!groupTags.ContainsKey(x.IdxTag)) + // { + // groupTags.SetItem(x.IdxTag, new List()); + // } + + // groupTags[x.IdxTag].Add(x); + // }); + + var eventTagsBatch = new TableBatchOperation(); + + foreach (var kvp in taggedEntries) + { + eventTagsBatch.Clear(); + + //taggedItem.Value.ForEach( + // x => eventTagsBatch.InsertOrReplace(x)); + + foreach (var item in kvp.Value) + { + eventTagsBatch.InsertOrReplace(item); + } + + var eventTagsResults = await Table.ExecuteBatchAsync(eventTagsBatch); + + if (_log.IsDebugEnabled && _settings.VerboseLogging) + foreach (var r in eventTagsResults) + _log.Debug("Azure table storage wrote entity [{0}] with status code [{1}]", r.Etag, r.HttpStatusCode); + + if (HasTagSubscribers && taggedEntries.Count != 0) + { + //var tags = taggedEntries.Select(x => x.IdxTag).Distinct().ToArray(); + + foreach (var tag in taggedEntries.Keys) + { + NotifyTagChange(tag); + } + } + + } + } } -#endif /* * Part of the Akka.Persistence design. - * + * * Either return null or return an exception for each failed AtomicWrite. - * + * * Either everything fails or everything succeeds is the idea I guess. */ return exceptions.IsEmpty ? null : exceptions; @@ -341,59 +484,499 @@ protected override async Task> WriteMessagesAsync(IEnu } } - protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) + private static TableQuery GenerateAllPersistenceIdsQuery() { -#if DEBUG - _log.Debug("Entering method DeleteMessagesToAsync for persistentId [{0}] and up to seqNo [{1}]", persistenceId, toSequenceNr); -#endif - var deleteQuery = new TableQuery().Where( + var filter = + TableQuery.GenerateFilterCondition( + "PartitionKey", + QueryComparisons.Equal, + AllPersistenceIdsEntry.PartitionKeyValue); + + var returnValue = new TableQuery().Where(filter).Select(new[] { "RowKey" }); + + return returnValue; + } + + private static TableQuery GenerateHighestSequenceNumberQuery( + string persistenceId) + { + var filter = + TableQuery.CombineFilters( + TableQuery.GenerateFilterCondition( + "PartitionKey", + QueryComparisons.Equal, + persistenceId), + TableOperators.And, + TableQuery.GenerateFilterCondition( + "RowKey", + QueryComparisons.Equal, + HighestSequenceNrEntry.RowKeyValue)); + + var returnValue = new TableQuery().Where(filter); + + return returnValue; + } + + //private static TableQuery GeneratePersistentJournalEntryDeleteQuery( + private static TableQuery GeneratePersistentJournalEntryDeleteQuery( + string persistenceId, + long toSequenceNr) + { + var persistenceIdFilter = + TableQuery.GenerateFilterCondition( + "PartitionKey", + QueryComparisons.Equal, + persistenceId); + + var highestSequenceNrFilter = + TableQuery.GenerateFilterCondition( + "RowKey", + QueryComparisons.NotEqual, + HighestSequenceNrEntry.RowKeyValue); + + var rowKeyLessThanFilter = + TableQuery.GenerateFilterCondition( + "RowKey", + QueryComparisons.LessThanOrEqual, + toSequenceNr.ToJournalRowKey()); + + var rowKeyFilter = + TableQuery.CombineFilters( + highestSequenceNrFilter, + TableOperators.And, + rowKeyLessThanFilter); + + var filter = + TableQuery.CombineFilters( + persistenceIdFilter, + TableOperators.And, + rowKeyFilter); + + //var returnValue = new TableQuery().Where(filter).Select(new[] { "RowKey" }); + + var returnValue = new TableQuery().Where(filter); + + return returnValue; + } + + private static TableQuery GenerateEventTagEntryDeleteQuery( + string persistenceId, + long fromSequenceNr, + long toSequenceNr) + { + var persistenceIdFilter = + TableQuery.GenerateFilterCondition( + "PartitionKey", + QueryComparisons.Equal, + EventTagEntry.PartitionKeyValue); + + var idxPartitionKeyFilter = + TableQuery.GenerateFilterCondition( + EventTagEntry.IdxPartitionKeyKeyName, + QueryComparisons.Equal, + persistenceId); + + var idxRowKeyGreaterThanFilter = + TableQuery.GenerateFilterCondition( + EventTagEntry.IdxRowKeyKeyName, + QueryComparisons.GreaterThanOrEqual, + fromSequenceNr.ToJournalRowKey()); + + var idxRowKeyLessThanFilter = + TableQuery.GenerateFilterCondition( + EventTagEntry.IdxRowKeyKeyName, + QueryComparisons.LessThanOrEqual, + toSequenceNr.ToJournalRowKey()); + + var partitionKeyFilter = + TableQuery.CombineFilters( + persistenceIdFilter, + TableOperators.And, + idxPartitionKeyFilter); + + var idxRowKeyFilter = + TableQuery.CombineFilters( + idxRowKeyGreaterThanFilter, + TableOperators.And, + idxRowKeyLessThanFilter); + + var filter = + TableQuery.CombineFilters( + partitionKeyFilter, + TableOperators.And, + idxRowKeyFilter); + + //var returnValue = new TableQuery().Where(filter).Select(new[] { "RowKey" }); + + var returnValue = new TableQuery().Where(filter); + + return returnValue; + } + + private static TableQuery GeneratePersistentJournalEntryReplayQuery( + string persistentId, + long fromSequenceNumber, + long toSequenceNumber) + { + var persistenceIdFilter = + TableQuery.GenerateFilterCondition( + "PartitionKey", + QueryComparisons.Equal, + persistentId); + + var highestSequenceNrFilter = + TableQuery.GenerateFilterCondition( + "RowKey", + QueryComparisons.NotEqual, + HighestSequenceNrEntry.RowKeyValue); + + var filter = + TableQuery.CombineFilters( + persistenceIdFilter, + TableOperators.And, + highestSequenceNrFilter); + + if (fromSequenceNumber > 0) + { + filter = TableQuery.CombineFilters( - TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, persistenceId), + filter, TableOperators.And, - TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.LessThanOrEqual, - toSequenceNr.ToJournalRowKey()))) - ; + TableQuery.GenerateFilterCondition( + "RowKey", + QueryComparisons.GreaterThanOrEqual, + fromSequenceNumber.ToJournalRowKey())); + } + + if (toSequenceNumber != long.MaxValue) + { + filter = + TableQuery.CombineFilters( + filter, + TableOperators.And, + TableQuery.GenerateFilterCondition( + "RowKey", + QueryComparisons.LessThanOrEqual, + toSequenceNumber.ToJournalRowKey())); + } + + var returnValue = new TableQuery().Where(filter); + + return returnValue; + } + + //private static TableQuery GenerateTaggedMessageQuery( + // ReplayTaggedMessages replay) + //{ + // var partitionKeyFilter = + // TableQuery.GenerateFilterCondition( + // "PartitionKey", + // QueryComparisons.Equal, + // EventTagEntry.PartitionKeyValue); + + // var highestSequenceNrFilter = + // TableQuery.GenerateFilterCondition( + // "RowKey", + // QueryComparisons.NotEqual, + // HighestSequenceNrEntry.RowKeyValue); + + // var tagRangeFilter = + // TableQuery.CombineFilters( + // TableQuery.GenerateFilterCondition( + // "RowKey", + // QueryComparisons.GreaterThanOrEqual, + // $"{replay.Tag}{EventTagEntry.Delimiter}"), + // TableOperators.And, + // TableQuery.GenerateFilterCondition( + // "RowKey", + // QueryComparisons.LessThan, + // $"{replay.Tag}{EventTagEntry.AsciiIncrementedDelimiter}")); + + // var rowKeyFilter = + // TableQuery.CombineFilters( + // highestSequenceNrFilter, + // TableOperators.And, + // tagRangeFilter); + + // var utcTicksRangeFilter = + // TableQuery.CombineFilters( + // TableQuery.GenerateFilterConditionForLong( + // EventTagEntry.UtcTicksKeyName, + // QueryComparisons.GreaterThan, + // replay.FromOffset), + // TableOperators.And, + // TableQuery.GenerateFilterConditionForLong( + // EventTagEntry.UtcTicksKeyName, + // QueryComparisons.LessThanOrEqual, + // replay.ToOffset)); + + // var filter = + // TableQuery.CombineFilters( + // TableQuery.CombineFilters( + // partitionKeyFilter, + // TableOperators.And, + // rowKeyFilter), + // TableOperators.And, + // utcTicksRangeFilter); + + // var returnValue = new TableQuery().Where(filter); + + // return returnValue; + //} + + private static TableQuery GenerateTaggedMessageQuery( + ReplayTaggedMessages replay) + { + var partitionKeyFilter = + TableQuery.GenerateFilterCondition( + "PartitionKey", + QueryComparisons.Equal, + EventTagEntry.GetPartitionKey(replay.Tag)); + + //var highestSequenceNrFilter = + // TableQuery.GenerateFilterCondition( + // "RowKey", + // QueryComparisons.NotEqual, + // HighestSequenceNrEntry.RowKeyValue); + + //var tagRangeFilter = + // TableQuery.CombineFilters( + // TableQuery.GenerateFilterCondition( + // "RowKey", + // QueryComparisons.GreaterThanOrEqual, + // $"{replay.Tag}{EventTagEntry.Delimiter}"), + // TableOperators.And, + // TableQuery.GenerateFilterCondition( + // "RowKey", + // QueryComparisons.LessThan, + // $"{replay.Tag}{EventTagEntry.AsciiIncrementedDelimiter}")); + + var utcTicksTRowKeyFilter = + TableQuery.CombineFilters( + TableQuery.GenerateFilterCondition( + "RowKey", + QueryComparisons.GreaterThan, + $"{replay.FromOffset.ToJournalRowKey()}{EventTagEntry.AsciiIncrementedDelimiter}"), + TableOperators.And, + TableQuery.GenerateFilterCondition( + "RowKey", + QueryComparisons.LessThanOrEqual, + $"{replay.ToOffset.ToJournalRowKey()}{EventTagEntry.Delimiter}")); + + var filter = + TableQuery.CombineFilters( + partitionKeyFilter, + TableOperators.And, + utcTicksTRowKeyFilter); + + //var filter = + // TableQuery.CombineFilters( + // TableQuery.CombineFilters( + // partitionKeyFilter, + // TableOperators.And, + // rowKeyFilter), + // TableOperators.And, + // utcTicksRangeFilter); + + var returnValue = new TableQuery().Where(filter); - var nextQuery = Table.ExecuteQuerySegmentedAsync(deleteQuery, null); + return returnValue; + } + + private async Task AddAllPersistenceIdSubscriber( + IActorRef subscriber) + { + lock (_allPersistenceIdSubscribers) + { + _allPersistenceIdSubscribers.Add(subscriber); + } + subscriber.Tell(new CurrentPersistenceIds(await GetAllPersistenceIds())); + } - while (nextQuery != null) + private void AddPersistenceIdSubscriber( + IActorRef subscriber, + string persistenceId) + { + if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions)) { - var queryResults = await nextQuery; + subscriptions = new HashSet(); + _persistenceIdSubscribers.Add(persistenceId, subscriptions); + } - if (_log.IsDebugEnabled && _settings.VerboseLogging) - { - _log.Debug("Have [{0}] messages to delete for entity [{1}]", queryResults.Results.Count, persistenceId); - } + subscriptions.Add(subscriber); + } - if (queryResults.ContinuationToken != null) // more data on the wire + private void AddTagSubscriber( + IActorRef subscriber, + string tag) + { + if (!_tagSubscribers.TryGetValue(tag, out var subscriptions)) + { + subscriptions = new HashSet(); + _tagSubscribers.Add(tag, subscriptions); + } + + subscriptions.Add(subscriber); + } + + private async Task> GetAllPersistenceIds() + { + var query = GenerateAllPersistenceIdsQuery(); + + TableQuerySegment result = null; + + var returnValue = ImmutableList.Empty; + + do + { + result = await Table.ExecuteQuerySegmentedAsync(query, result?.ContinuationToken); + + if (result.Results.Count > 0) { - nextQuery = Table.ExecuteQuerySegmentedAsync(deleteQuery, queryResults.ContinuationToken); + returnValue = returnValue.AddRange(result.Results.Select(x => x.RowKey)); } - else + } while (result.ContinuationToken != null); + + return returnValue; + } + + private async Task InitCloudStorage( + int remainingTries) + { + try + { + var tableClient = _storageAccount.CreateCloudTableClient(); + var tableRef = tableClient.GetTableReference(_settings.TableName); + var op = new OperationContext(); + using (var cts = new CancellationTokenSource(_settings.ConnectTimeout)) { - nextQuery = null; + if (await tableRef.CreateIfNotExistsAsync(new TableRequestOptions(), op, cts.Token)) + _log.Info("Created Azure Cloud Table", _settings.TableName); + else + _log.Info("Successfully connected to existing table", _settings.TableName); } - if (queryResults.Results.Count > 0) + return tableRef; + } + catch (Exception ex) + { + _log.Error(ex, "[{0}] more tries to initialize table storage remaining...", remainingTries); + if (remainingTries == 0) + throw; + await Task.Delay(RetryInterval[remainingTries]); + return await InitCloudStorage(remainingTries - 1); + } + } + + private void NotifyNewPersistenceIdAdded( + string persistenceId) + { + var isNew = TryAddPersistenceId(persistenceId); + if (isNew && HasAllPersistenceIdSubscribers) + { + var added = new PersistenceIdAdded(persistenceId); + foreach (var subscriber in _allPersistenceIdSubscribers) + subscriber.Tell(added); + } + } + + private void NotifyTagChange( + string tag) + { + if (_tagSubscribers.TryGetValue(tag, out var subscribers)) + { + var changed = new TaggedEventAppended(tag); + foreach (var subscriber in subscribers) + subscriber.Tell(changed); + } + } + + private void RemoveSubscriber( + IActorRef subscriber) + { + var pidSubscriptions = _persistenceIdSubscribers.Values.Where(x => x.Contains(subscriber)); + foreach (var subscription in pidSubscriptions) + subscription.Remove(subscriber); + + var tagSubscriptions = _tagSubscribers.Values.Where(x => x.Contains(subscriber)); + foreach (var subscription in tagSubscriptions) + subscription.Remove(subscriber); + + _allPersistenceIdSubscribers.Remove(subscriber); + } + + /// + /// Replays all events with given tag within provided boundaries from current database. + /// + /// TBD + /// TBD + private async Task ReplayTaggedMessagesAsync( + ReplayTaggedMessages replay) + { + var query = GenerateTaggedMessageQuery(replay); + + // While we can specify the TakeCount, the CloudTable client does + // not really respect this fact and will keep pulling results. + query.TakeCount = + replay.Max > int.MaxValue + ? int.MaxValue + : (int)replay.Max; + + // In order to actually break at the limit we ask for we have to + // keep a separate counter and track it ourselves. + var counter = 0; + + TableQuerySegment result = null; + + var maxOrderingId = 0L; + + do + { + result = await Table.ExecuteQuerySegmentedAsync(query, result?.ContinuationToken); + + foreach (var entry in result.Results.OrderBy(x => x.UtcTicks)) { - var tableBatchOperation = new TableBatchOperation(); - foreach (var toBeDeleted in queryResults.Results) - tableBatchOperation.Delete(toBeDeleted); + var deserialized = _serialization.PersistentFromBytes(entry.Payload); + + var persistent = + new Persistent( + deserialized.Payload, + deserialized.SequenceNr, + deserialized.PersistenceId, + deserialized.Manifest, + deserialized.IsDeleted, + ActorRefs.NoSender, + deserialized.WriterGuid); + + foreach (var adapted in AdaptFromJournal(persistent)) + { + _log.Debug("Sending replayed message: persistenceId:{0} - sequenceNr:{1} - event:{2}", + deserialized.PersistenceId, deserialized.SequenceNr, deserialized.Payload); - var deleteTask = Table.ExecuteBatchAsync(tableBatchOperation); + replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, replay.Tag, entry.UtcTicks), ActorRefs.NoSender); - + counter++; + } - await deleteTask; + maxOrderingId = Math.Max(maxOrderingId, entry.UtcTicks); } - } - - + if (counter >= replay.Max) + { + break; + } + } while (result.ContinuationToken != null); -#if DEBUG - _log.Debug("Leaving method DeleteMessagesToAsync for persistentId [{0}] and up to seqNo [{1}]", persistenceId, toSequenceNr); -#endif + return maxOrderingId; + } + + private bool TryAddPersistenceId( + string persistenceId) + { + lock (_allPersistenceIds) + { + return _allPersistenceIds.Add(persistenceId); + } } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs index 10e3ae4..67b47be 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs @@ -19,8 +19,12 @@ public sealed class AzureTableStorageJournalSettings private static readonly string[] ReservedTableNames = {"tables"}; - public AzureTableStorageJournalSettings(string connectionString, string tableName, TimeSpan connectTimeout, - TimeSpan requestTimeout, bool verboseLogging) + public AzureTableStorageJournalSettings( + string connectionString, + string tableName, + TimeSpan connectTimeout, + TimeSpan requestTimeout, + bool verboseLogging) { NameValidator.ValidateTableName(tableName); @@ -75,7 +79,12 @@ public static AzureTableStorageJournalSettings Create(Config config) var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3)); var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3)); var verbose = config.GetBoolean("verbose-logging", false); - return new AzureTableStorageJournalSettings(connectionString, tableName, connectTimeout, requestTimeout, + + return new AzureTableStorageJournalSettings( + connectionString, + tableName, + connectTimeout, + requestTimeout, verbose); } } diff --git a/src/Akka.Persistence.Azure/Query/AzureTableStorageQuerySettings.cs b/src/Akka.Persistence.Azure/Query/AzureTableStorageQuerySettings.cs new file mode 100644 index 0000000..7543fd5 --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/AzureTableStorageQuerySettings.cs @@ -0,0 +1,42 @@ +using Akka.Configuration; +using System; + +namespace Akka.Persistence.Azure.Query +{ + public sealed class AzureTableStorageQuerySettings + { + private AzureTableStorageQuerySettings( + string @class, + string writePlugin, + string maxBufferSize, + TimeSpan refreshInterval) + { + Class = @class; + MaxBufferSize = maxBufferSize; + RefreshInterval = refreshInterval; + WritePlugin = writePlugin; + } + + public string Class { get; } + + public string MaxBufferSize { get; } + + public TimeSpan RefreshInterval { get; } + + public string WritePlugin { get; } + + public static AzureTableStorageQuerySettings Create(Config config) + { + var @class = config.GetString("class"); + var writePlugin = config.GetString("write-plugin"); + var maxBufferSize = config.GetString("max-buffer-size"); + var refreshInterval = config.GetTimeSpan("refresh-interval"); + + return new AzureTableStorageQuerySettings( + @class, + writePlugin, + maxBufferSize, + refreshInterval); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs new file mode 100644 index 0000000..e11a592 --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs @@ -0,0 +1,194 @@ +using System; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Azure.Query.Publishers; +using Akka.Persistence.Query; +using Akka.Streams.Actors; +using Akka.Streams.Dsl; + +namespace Akka.Persistence.Azure.Query +{ + public class AzureTableStorageReadJournal : IReadJournal, + IPersistenceIdsQuery, + ICurrentPersistenceIdsQuery, + IEventsByPersistenceIdQuery, + ICurrentEventsByPersistenceIdQuery, + IEventsByTagQuery, + ICurrentEventsByTagQuery + { + public static string Identifier = "akka.persistence.query.journal.azure-table"; + + private readonly int _maxBufferSize; + private readonly TimeSpan _refreshInterval; + private readonly string _writeJournalPluginId; + + /// + /// Returns a default query configuration for akka persistence SQLite-based journals and snapshot stores. + /// + /// + public static Config DefaultConfiguration() + { + return ConfigurationFactory.FromResource("Akka.Persistence.Azure.reference.conf"); + } + + public AzureTableStorageReadJournal(ExtendedActorSystem system, Config config) + { + _maxBufferSize = config.GetInt("max-buffer-size"); + _refreshInterval = config.GetTimeSpan("refresh-interval"); + _writeJournalPluginId = config.GetString("write-plugin"); + } + + /// + /// + /// is used for retrieving all `persistenceIds` of all + /// persistent actors. + /// + /// The returned event stream is unordered and you can expect different order for multiple + /// executions of the query. + /// + /// The stream is not completed when it reaches the end of the currently used `persistenceIds`, + /// but it continues to push new `persistenceIds` when new persistent actors are created. + /// Corresponding query that is completed when it reaches the end of the currently + /// currently used `persistenceIds` is provided by . + /// + /// The SQL write journal is notifying the query side as soon as new `persistenceIds` are + /// created and there is no periodic polling or batching involved in this query. + /// + /// The stream is completed with failure if there is a failure in executing the query in the + /// backend journal. + /// + /// + public Source PersistenceIds() => + Source.ActorPublisher(AllPersistenceIdsPublisher.Props(true, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named("AllPersistenceIds") as Source; + + /// + /// Same type of query as but the stream + /// is completed immediately when it reaches the end of the "result set". Persistent + /// actors that are created after the query is completed are not included in the stream. + /// + public Source CurrentPersistenceIds() => + Source.ActorPublisher(AllPersistenceIdsPublisher.Props(false, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named("CurrentPersistenceIds") as Source; + + /// + /// is used for retrieving events for a specific + /// identified by . + /// + /// You can retrieve a subset of all events by specifying and + /// or use `0L` and respectively to retrieve all events. Note that + /// the corresponding sequence number of each event is provided in the + /// , which makes it possible to resume the + /// stream at a later point from a given sequence number. + /// + /// The returned event stream is ordered by sequence number, i.e. the same order as the + /// persisted the events. The same prefix of stream elements (in same order) + /// are returned for multiple executions of the query, except for when events have been deleted. + /// + /// The stream is not completed when it reaches the end of the currently stored events, + /// but it continues to push new events when new events are persisted. + /// Corresponding query that is completed when it reaches the end of the currently + /// stored events is provided by . + /// + /// The SQLite write journal is notifying the query side as soon as events are persisted, but for + /// efficiency reasons the query side retrieves the events in batches that sometimes can + /// be delayed up to the configured `refresh-interval`. + /// + /// The stream is completed with failure if there is a failure in executing the query in the + /// backend journal. + /// + public Source EventsByPersistenceId(string persistenceId, long fromSequenceNr, long toSequenceNr) => + Source.ActorPublisher(EventsByPersistenceIdPublisher.Props(persistenceId, fromSequenceNr, toSequenceNr, _refreshInterval, _maxBufferSize, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named("EventsByPersistenceId-" + persistenceId) as Source; + + /// + /// Same type of query as but the event stream + /// is completed immediately when it reaches the end of the "result set". Events that are + /// stored after the query is completed are not included in the event stream. + /// + public Source CurrentEventsByPersistenceId(string persistenceId, long fromSequenceNr, long toSequenceNr) => + Source.ActorPublisher(EventsByPersistenceIdPublisher.Props(persistenceId, fromSequenceNr, toSequenceNr, null, _maxBufferSize, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named("CurrentEventsByPersistenceId-" + persistenceId) as Source; + + /// + /// is used for retrieving events that were marked with + /// a given tag, e.g. all events of an Aggregate Root type. + /// + /// To tag events you create an that wraps the events + /// in a with the given `tags`. + /// + /// You can use to retrieve all events with a given tag or retrieve a subset of all + /// events by specifying a . The `offset` corresponds to an ordered sequence number for + /// the specific tag. Note that the corresponding offset of each event is provided in the + /// , which makes it possible to resume the + /// stream at a later point from a given offset. + /// + /// The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + /// in the returned stream.This means that you can use the offset that is returned in + /// as the `offset` parameter in a subsequent query. + /// + /// In addition to the the also provides `persistenceId` and `sequenceNr` + /// for each event. The `sequenceNr` is the sequence number for the persistent actor with the + /// `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique + /// identifier for the event. + /// + /// The returned event stream is ordered by the offset (tag sequence number), which corresponds + /// to the same order as the write journal stored the events. The same stream elements (in same order) + /// are returned for multiple executions of the query. Deleted events are not deleted from the + /// tagged event stream. + /// + /// The stream is not completed when it reaches the end of the currently stored events, + /// but it continues to push new events when new events are persisted. + /// Corresponding query that is completed when it reaches the end of the currently + /// stored events is provided by . + /// + /// The SQL write journal is notifying the query side as soon as tagged events are persisted, but for + /// efficiency reasons the query side retrieves the events in batches that sometimes can + /// be delayed up to the configured `refresh-interval`. + /// + /// The stream is completed with failure if there is a failure in executing the query in the + /// backend journal. + /// + public Source EventsByTag(string tag, Offset offset = null) + { + offset = offset ?? new Sequence(0L); + switch (offset) + { + case Sequence seq: + return Source.ActorPublisher(EventsByTagPublisher.Props(tag, seq.Value, long.MaxValue, _refreshInterval, _maxBufferSize, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named($"EventsByTag-{tag}"); + case NoOffset _: + return EventsByTag(tag, new Sequence(0L)); + default: + throw new ArgumentException($"{GetType().Name} does not support {offset.GetType().Name} offsets"); + } + } + + /// + /// Same type of query as but the event stream + /// is completed immediately when it reaches the end of the "result set". Events that are + /// stored after the query is completed are not included in the event stream. + /// + public Source CurrentEventsByTag(string tag, Offset offset) + { + offset = offset ?? new Sequence(0L); + switch (offset) + { + case Sequence seq: + return Source.ActorPublisher(EventsByTagPublisher.Props(tag, seq.Value, long.MaxValue, null, _maxBufferSize, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named($"CurrentEventsByTag-{tag}"); + case NoOffset _: + return CurrentEventsByTag(tag, new Sequence(0L)); + default: + throw new ArgumentException($"{GetType().Name} does not support {offset.GetType().Name} offsets"); + } + } + } + +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalProvider.cs b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalProvider.cs new file mode 100644 index 0000000..b2e309d --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalProvider.cs @@ -0,0 +1,23 @@ +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Query; + +namespace Akka.Persistence.Azure.Query +{ + public class AzureTableStorageReadJournalProvider : IReadJournalProvider + { + private readonly ExtendedActorSystem _system; + private readonly Config _config; + + public AzureTableStorageReadJournalProvider(ExtendedActorSystem system, Config config) + { + _system = system; + _config = config; + } + + public IReadJournal GetReadJournal() + { + return new AzureTableStorageReadJournal(_system, _config); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/DeliveryBuffer.cs b/src/Akka.Persistence.Azure/Query/DeliveryBuffer.cs new file mode 100644 index 0000000..3f55f3e --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/DeliveryBuffer.cs @@ -0,0 +1,59 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; + +namespace Akka.Persistence.Azure.Query +{ + internal class DeliveryBuffer + { + private readonly Action _onNext; + + public DeliveryBuffer(Action onNext) + { + _onNext = onNext; + } + + public ImmutableArray Buffer { get; private set; } = ImmutableArray.Empty; + + public bool IsEmpty => Buffer.IsEmpty; + + public int Length => Buffer.Length; + public void Add(T element) + { + Buffer = Buffer.Add(element); + } + + public void AddRange(IEnumerable elements) + { + Buffer = Buffer.AddRange(elements); + } + + public void DeliverBuffer(long demand) + { + if (!Buffer.IsEmpty && demand > 0) + { + var totalDemand = Math.Min((int)demand, Buffer.Length); + if (Buffer.Length == 1) + { + // optimize for this common case + _onNext(Buffer[0]); + Buffer = ImmutableArray.Empty; + } + else if (demand <= int.MaxValue) + { + for (var i = 0; i < totalDemand; i++) + _onNext(Buffer[i]); + + Buffer = Buffer.RemoveRange(0, totalDemand); + } + else + { + foreach (var element in Buffer) + _onNext(element); + + Buffer = ImmutableArray.Empty; + } + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/Messages.cs b/src/Akka.Persistence.Azure/Query/Messages.cs new file mode 100644 index 0000000..4fb9c42 --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Messages.cs @@ -0,0 +1,255 @@ +using Akka.Actor; +using Akka.Event; +using System; +using System.Collections.Generic; +using System.Collections.Immutable; + +namespace Akka.Persistence.Azure.Query +{ + /// + /// When message implements this interface, it indicates that such message is EventJournal read subscription command. + /// + public interface ISubscriptionCommand { } + + /// + /// TBD + /// + public sealed class CurrentPersistenceIds : IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly IEnumerable AllPersistenceIds; + + /// + /// TBD + /// + /// TBD + public CurrentPersistenceIds(IEnumerable allPersistenceIds) + { + AllPersistenceIds = allPersistenceIds.ToImmutableHashSet(); + } + } + + /// + /// TBD + /// + public sealed class EventAppended : IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly string PersistenceId; + + /// + /// TBD + /// + /// TBD + public EventAppended(string persistenceId) + { + PersistenceId = persistenceId; + } + } + + public sealed class PersistenceIdAdded : IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly string PersistenceId; + + /// + /// TBD + /// + /// TBD + public PersistenceIdAdded(string persistenceId) + { + PersistenceId = persistenceId; + } + } + + /// + /// TBD + /// + public sealed class ReplayedTaggedMessage : INoSerializationVerificationNeeded, IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly long Offset; + + /// + /// TBD + /// + public readonly IPersistentRepresentation Persistent; + + /// + /// TBD + /// + public readonly string Tag; + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + public ReplayedTaggedMessage(IPersistentRepresentation persistent, string tag, long offset) + { + Persistent = persistent; + Tag = tag; + Offset = offset; + } + } + + /// + /// TBD + /// + public sealed class ReplayTaggedMessages : IJournalRequest + { + /// + /// TBD + /// + public readonly long FromOffset; + + /// + /// TBD + /// + public readonly long Max; + + /// + /// TBD + /// + public readonly IActorRef ReplyTo; + + /// + /// TBD + /// + public readonly string Tag; + + /// + /// TBD + /// + public readonly long ToOffset; + + /// + /// Initializes a new instance of the class. + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// + /// This exception is thrown for a number of reasons. These include the following: + ///
    + ///
  • The specified is less than zero.
  • + ///
  • The specified is less than or equal to zero.
  • + ///
  • The specified is less than or equal to zero.
  • + ///
+ ///
+ /// + /// This exception is thrown when the specified is null or empty. + /// + public ReplayTaggedMessages(long fromOffset, long toOffset, long max, string tag, IActorRef replyTo) + { + if (fromOffset < 0) throw new ArgumentException("From offset may not be a negative number", nameof(fromOffset)); + if (toOffset <= 0) throw new ArgumentException("To offset must be a positive number", nameof(toOffset)); + if (max <= 0) throw new ArgumentException("Maximum number of replayed messages must be a positive number", nameof(max)); + if (string.IsNullOrEmpty(tag)) throw new ArgumentNullException(nameof(tag), "Replay tagged messages require a tag value to be provided"); + + FromOffset = fromOffset; + ToOffset = toOffset; + Max = max; + Tag = tag; + ReplyTo = replyTo; + } + } + + /// + /// Subscribe the `sender` to current and new persistenceIds. + /// Used by query-side. The journal will send one to the + /// subscriber followed by messages when new persistenceIds + /// are created. + /// + public sealed class SubscribeAllPersistenceIds : ISubscriptionCommand + { + /// + /// TBD + /// + public static readonly SubscribeAllPersistenceIds Instance = new SubscribeAllPersistenceIds(); + + private SubscribeAllPersistenceIds() + { + } + } + + /// + /// TBD + /// + /// + /// Subscribe the `sender` to changes (appended events) for a specific `persistenceId`. + /// Used by query-side. The journal will send messages to + /// the subscriber when has been called. + /// + public sealed class SubscribePersistenceId : ISubscriptionCommand + { + /// + /// TBD + /// + public readonly string PersistenceId; + + /// + /// TBD + /// + /// TBD + public SubscribePersistenceId(string persistenceId) + { + PersistenceId = persistenceId; + } + } + + /// + /// Subscribe the `sender` to changes (appended events) for a specific `tag`. + /// Used by query-side. The journal will send messages to + /// the subscriber when `asyncWriteMessages` has been called. + /// Events are tagged by wrapping in + /// via an . + /// + public sealed class SubscribeTag : ISubscriptionCommand + { + /// + /// TBD + /// + public readonly string Tag; + + /// + /// TBD + /// + /// TBD + public SubscribeTag(string tag) + { + Tag = tag; + } + } + + /// + /// TBD + /// + public sealed class TaggedEventAppended : IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly string Tag; + + /// + /// TBD + /// + /// TBD + public TaggedEventAppended(string tag) + { + Tag = tag; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/Publishers/AbstractEventsByPersistenceIdPublisher.cs b/src/Akka.Persistence.Azure/Query/Publishers/AbstractEventsByPersistenceIdPublisher.cs new file mode 100644 index 0000000..4be44c7 --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Publishers/AbstractEventsByPersistenceIdPublisher.cs @@ -0,0 +1,106 @@ +using Akka.Actor; +using Akka.Event; +using Akka.Persistence.Query; +using Akka.Streams.Actors; + +namespace Akka.Persistence.Azure.Query.Publishers +{ + internal abstract class AbstractEventsByPersistenceIdPublisher : ActorPublisher + { + private ILoggingAdapter _log; + + protected DeliveryBuffer Buffer; + protected readonly IActorRef JournalRef; + protected long CurrentSequenceNr; + + protected AbstractEventsByPersistenceIdPublisher(string persistenceId, long fromSequenceNr, long toSequenceNr, int maxBufferSize, string writeJournalPluginId) + { + PersistenceId = persistenceId; + CurrentSequenceNr = FromSequenceNr = fromSequenceNr; + ToSequenceNr = toSequenceNr; + MaxBufferSize = maxBufferSize; + WriteJournalPluginId = writeJournalPluginId; + Buffer = new DeliveryBuffer(OnNext); + + JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); + } + + protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger()); + protected string PersistenceId { get; } + protected long FromSequenceNr { get; } + protected long ToSequenceNr { get; set; } + protected int MaxBufferSize { get; } + protected string WriteJournalPluginId { get; } + + protected bool IsTimeForReplay => (Buffer.IsEmpty || Buffer.Length <= MaxBufferSize / 2) && (CurrentSequenceNr <= ToSequenceNr); + + protected abstract void ReceiveInitialRequest(); + protected abstract void ReceiveIdleRequest(); + protected abstract void ReceiveRecoverySuccess(long highestSequenceNr); + + protected override bool Receive(object message) + { + return Init(message); + } + + protected bool Init(object message) + { + return message.Match() + .With(() => { }) + .With(_ => ReceiveInitialRequest()) + .With(_ => Context.Stop(Self)) + .WasHandled; + } + + protected bool Idle(object message) + { + return message.Match() + .With(() => { + if (IsTimeForReplay) Replay(); + }) + .With(() => { + if (IsTimeForReplay) Replay(); + }) + .With(_ => ReceiveIdleRequest()) + .With(_ => Context.Stop(Self)) + .WasHandled; + } + + protected void Replay() + { + var limit = MaxBufferSize - Buffer.Length; + Log.Debug("request replay for persistenceId [{0}] from [{1}] to [{2}] limit [{3}]", PersistenceId, CurrentSequenceNr, ToSequenceNr, limit); + JournalRef.Tell(new ReplayMessages(CurrentSequenceNr, ToSequenceNr, limit, PersistenceId, Self)); + Context.Become(Replaying(limit)); + } + + protected Receive Replaying(int limit) + { + return message => message.Match() + .With(replayed => { + var seqNr = replayed.Persistent.SequenceNr; + Buffer.Add(new EventEnvelope( + offset: new Sequence(seqNr), + persistenceId: PersistenceId, + sequenceNr: seqNr, + @event: replayed.Persistent.Payload)); + CurrentSequenceNr = seqNr + 1; + Buffer.DeliverBuffer(TotalDemand); + }) + .With(success => { + Log.Debug("replay completed for persistenceId [{0}], currSeqNo [{1}]", PersistenceId, CurrentSequenceNr); + ReceiveRecoverySuccess(success.HighestSequenceNr); + }) + .With(failure => { + Log.Debug("replay failed for persistenceId [{0}], due to [{1}]", PersistenceId, failure.Cause.Message); + Buffer.DeliverBuffer(TotalDemand); + OnErrorThenStop(failure.Cause); + }) + .With(_ => Buffer.DeliverBuffer(TotalDemand)) + .With(() => { }) // skip during replay + .With(() => { }) // skip during replay + .With(_ => Context.Stop(Self)) + .WasHandled; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/Publishers/AbstractEventsByTagPublisher.cs b/src/Akka.Persistence.Azure/Query/Publishers/AbstractEventsByTagPublisher.cs new file mode 100644 index 0000000..1000874 --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Publishers/AbstractEventsByTagPublisher.cs @@ -0,0 +1,92 @@ +using Akka.Actor; +using Akka.Event; +using Akka.Persistence.Query; +using Akka.Streams.Actors; + +namespace Akka.Persistence.Azure.Query.Publishers +{ + internal abstract class AbstractEventsByTagPublisher : ActorPublisher + { + private ILoggingAdapter _log; + + protected readonly DeliveryBuffer Buffer; + protected readonly IActorRef JournalRef; + protected long CurrentOffset; + protected AbstractEventsByTagPublisher(string tag, long fromOffset, int maxBufferSize, string writeJournalPluginId) + { + Tag = tag; + CurrentOffset = FromOffset = fromOffset; + MaxBufferSize = maxBufferSize; + WriteJournalPluginId = writeJournalPluginId; + Buffer = new DeliveryBuffer(OnNext); + JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); + } + + protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger()); + protected string Tag { get; } + protected long FromOffset { get; } + protected abstract long ToOffset { get; } + protected int MaxBufferSize { get; } + protected string WriteJournalPluginId { get; } + + protected bool IsTimeForReplay => (Buffer.IsEmpty || Buffer.Length <= MaxBufferSize / 2) && (CurrentOffset <= ToOffset); + + protected abstract void ReceiveInitialRequest(); + protected abstract void ReceiveIdleRequest(); + protected abstract void ReceiveRecoverySuccess(long highestSequenceNr); + + protected override bool Receive(object message) => message.Match() + .With(_ => ReceiveInitialRequest()) + .With(() => { }) + .With(_ => Context.Stop(Self)) + .WasHandled; + + protected bool Idle(object message) => message.Match() + .With(() => { + if (IsTimeForReplay) Replay(); + }) + .With(() => { + if (IsTimeForReplay) Replay(); + }) + .With(ReceiveIdleRequest) + .With(() => Context.Stop(Self)) + .WasHandled; + + protected void Replay() + { + var limit = MaxBufferSize - Buffer.Length; + Log.Debug("request replay for tag [{0}] from [{1}] to [{2}] limit [{3}]", Tag, CurrentOffset, ToOffset, limit); + JournalRef.Tell(new ReplayTaggedMessages(CurrentOffset, ToOffset, limit, Tag, Self)); + Context.Become(Replaying(limit)); + } + + protected Receive Replaying(int limit) + { + return message => message.Match() + .With(replayed => { + Buffer.Add(new EventEnvelope( + offset: new Sequence(replayed.Offset), + persistenceId: replayed.Persistent.PersistenceId, + sequenceNr: replayed.Persistent.SequenceNr, + @event: replayed.Persistent.Payload)); + + CurrentOffset = replayed.Offset; + Buffer.DeliverBuffer(TotalDemand); + }) + .With(success => { + Log.Debug("replay completed for tag [{0}], currOffset [{1}]", Tag, CurrentOffset); + ReceiveRecoverySuccess(success.HighestSequenceNr); + }) + .With(failure => { + Log.Debug("replay failed for tag [{0}], due to [{1}]", Tag, failure.Cause.Message); + Buffer.DeliverBuffer(TotalDemand); + OnErrorThenStop(failure.Cause); + }) + .With(_ => Buffer.DeliverBuffer(TotalDemand)) + .With(() => { }) + .With(() => { }) + .With(() => Context.Stop(Self)) + .WasHandled; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/Publishers/AllPersistenceIdsPublisher.cs b/src/Akka.Persistence.Azure/Query/Publishers/AllPersistenceIdsPublisher.cs new file mode 100644 index 0000000..5f149b0 --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Publishers/AllPersistenceIdsPublisher.cs @@ -0,0 +1,57 @@ +using Akka.Actor; +using Akka.Streams.Actors; + +namespace Akka.Persistence.Azure.Query.Publishers +{ + internal sealed class AllPersistenceIdsPublisher : ActorPublisher + { + public static Props Props(bool liveQuery, string writeJournalPluginId) + { + return Actor.Props.Create(() => new AllPersistenceIdsPublisher(liveQuery, writeJournalPluginId)); + } + + private readonly bool _liveQuery; + private readonly IActorRef _journalRef; + private readonly DeliveryBuffer _buffer; + + public AllPersistenceIdsPublisher(bool liveQuery, string writeJournalPluginId) + { + _liveQuery = liveQuery; + _buffer = new DeliveryBuffer(OnNext); + _journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); + } + + + + protected override bool Receive(object message) => message.Match() + .With(_ => { + _journalRef.Tell(SubscribeAllPersistenceIds.Instance); + Become(Active); + }) + .With(_ => Context.Stop(Self)) + .WasHandled; + + private bool Active(object message) => message.Match() + .With(current => { + _buffer.AddRange(current.AllPersistenceIds); + _buffer.DeliverBuffer(TotalDemand); + + if (!_liveQuery && _buffer.IsEmpty) + OnCompleteThenStop(); + }) + .With(added => { + if (_liveQuery) + { + _buffer.Add(added.PersistenceId); + _buffer.DeliverBuffer(TotalDemand); + } + }) + .With(_ => { + _buffer.DeliverBuffer(TotalDemand); + if (!_liveQuery && _buffer.IsEmpty) + OnCompleteThenStop(); + }) + .With(_ => Context.Stop(Self)) + .WasHandled; + } +} diff --git a/src/Akka.Persistence.Azure/Query/Publishers/CurrentEventsByPersistenceIdPublisher.cs b/src/Akka.Persistence.Azure/Query/Publishers/CurrentEventsByPersistenceIdPublisher.cs new file mode 100644 index 0000000..65beaff --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Publishers/CurrentEventsByPersistenceIdPublisher.cs @@ -0,0 +1,39 @@ +using Akka.Actor; + +namespace Akka.Persistence.Azure.Query.Publishers +{ + internal sealed class CurrentEventsByPersistenceIdPublisher : AbstractEventsByPersistenceIdPublisher + { + public CurrentEventsByPersistenceIdPublisher(string persistenceId, long fromSequenceNr, long toSequenceNr, int maxBufferSize, string writeJournalPluginId) + : base(persistenceId, fromSequenceNr, toSequenceNr, maxBufferSize, writeJournalPluginId) + { + } + + protected override void ReceiveInitialRequest() + { + Replay(); + } + + protected override void ReceiveIdleRequest() + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentSequenceNr > ToSequenceNr) + OnCompleteThenStop(); + else + Self.Tell(EventsByPersistenceIdPublisher.Continue.Instance); + } + + protected override void ReceiveRecoverySuccess(long highestSequenceNr) + { + Buffer.DeliverBuffer(TotalDemand); + if (highestSequenceNr < ToSequenceNr) + ToSequenceNr = highestSequenceNr; + if (Buffer.IsEmpty && (CurrentSequenceNr > ToSequenceNr || CurrentSequenceNr == FromSequenceNr)) + OnCompleteThenStop(); + else + Self.Tell(EventsByPersistenceIdPublisher.Continue.Instance); + + Context.Become(Idle); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/Publishers/CurrentEventsByTagPublisher.cs b/src/Akka.Persistence.Azure/Query/Publishers/CurrentEventsByTagPublisher.cs new file mode 100644 index 0000000..67f9a9e --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Publishers/CurrentEventsByTagPublisher.cs @@ -0,0 +1,53 @@ +using Akka.Actor; + +namespace Akka.Persistence.Azure.Query.Publishers +{ + internal sealed class CurrentEventsByTagPublisher + : AbstractEventsByTagPublisher + { + private long _toOffset; + + public CurrentEventsByTagPublisher( + string tag, + long fromOffset, + long toOffset, + int maxBufferSize, + string writeJournalPluginId) + : base(tag, fromOffset, maxBufferSize, writeJournalPluginId) + { + _toOffset = toOffset; + } + + protected override long ToOffset => _toOffset; + + protected override void ReceiveIdleRequest() + { + Buffer.DeliverBuffer(TotalDemand); + + if (Buffer.IsEmpty && CurrentOffset > ToOffset) + OnCompleteThenStop(); + else + Self.Tell(EventsByTagPublisher.Continue.Instance); + } + + protected override void ReceiveInitialRequest() + { + Replay(); + } + + protected override void ReceiveRecoverySuccess(long highestSequenceNr) + { + Buffer.DeliverBuffer(TotalDemand); + + if (highestSequenceNr < ToOffset) + _toOffset = highestSequenceNr; + + if (Buffer.IsEmpty && CurrentOffset > ToOffset) + OnCompleteThenStop(); + else + Self.Tell(EventsByTagPublisher.Continue.Instance); + + Context.Become(Idle); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/Publishers/EventsByPersistenceIdPublisher.cs b/src/Akka.Persistence.Azure/Query/Publishers/EventsByPersistenceIdPublisher.cs new file mode 100644 index 0000000..41008de --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Publishers/EventsByPersistenceIdPublisher.cs @@ -0,0 +1,24 @@ +using System; +using Akka.Actor; + +namespace Akka.Persistence.Azure.Query.Publishers +{ + internal static class EventsByPersistenceIdPublisher + { + public sealed class Continue + { + public static readonly Continue Instance = new Continue(); + + private Continue() + { + } + } + + public static Props Props(string persistenceId, long fromSequenceNr, long toSequenceNr, TimeSpan? refreshDuration, int maxBufferSize, string writeJournalPluginId) + { + return refreshDuration.HasValue + ? Actor.Props.Create(() => new LiveEventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, maxBufferSize, writeJournalPluginId, refreshDuration.Value)) + : Actor.Props.Create(() => new CurrentEventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, maxBufferSize, writeJournalPluginId)); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/Publishers/EventsByTagPublisher.cs b/src/Akka.Persistence.Azure/Query/Publishers/EventsByTagPublisher.cs new file mode 100644 index 0000000..6e79fca --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Publishers/EventsByTagPublisher.cs @@ -0,0 +1,24 @@ +using System; +using Akka.Actor; + +namespace Akka.Persistence.Azure.Query.Publishers +{ + internal static class EventsByTagPublisher + { + public sealed class Continue + { + public static readonly Continue Instance = new Continue(); + + private Continue() + { + } + } + + public static Props Props(string tag, long fromOffset, long toOffset, TimeSpan? refreshInterval, int maxBufferSize, string writeJournalPluginId) + { + return refreshInterval.HasValue + ? Actor.Props.Create(() => new LiveEventsByTagPublisher(tag, fromOffset, toOffset, refreshInterval.Value, maxBufferSize, writeJournalPluginId)) + : Actor.Props.Create(() => new CurrentEventsByTagPublisher(tag, fromOffset, toOffset, maxBufferSize, writeJournalPluginId)); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/Publishers/LiveEventsByPersistenceIdPublisher.cs b/src/Akka.Persistence.Azure/Query/Publishers/LiveEventsByPersistenceIdPublisher.cs new file mode 100644 index 0000000..ff63ef7 --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Publishers/LiveEventsByPersistenceIdPublisher.cs @@ -0,0 +1,44 @@ +using System; +using Akka.Actor; + +namespace Akka.Persistence.Azure.Query.Publishers +{ + internal sealed class LiveEventsByPersistenceIdPublisher : AbstractEventsByPersistenceIdPublisher + { + private readonly ICancelable _tickCancelable; + + public LiveEventsByPersistenceIdPublisher(string persistenceId, long fromSequenceNr, long toSequenceNr, int maxBufferSize, string writeJournalPluginId, TimeSpan refreshInterval) + : base(persistenceId, fromSequenceNr, toSequenceNr, maxBufferSize, writeJournalPluginId) + { + _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, EventsByPersistenceIdPublisher.Continue.Instance, Self); + } + + protected override void PostStop() + { + _tickCancelable.Cancel(); + base.PostStop(); + } + + protected override void ReceiveInitialRequest() + { + JournalRef.Tell(new SubscribePersistenceId(PersistenceId)); + Replay(); + } + + protected override void ReceiveIdleRequest() + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentSequenceNr > ToSequenceNr) + OnCompleteThenStop(); + } + + protected override void ReceiveRecoverySuccess(long highestSequenceNr) + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentSequenceNr > ToSequenceNr) + OnCompleteThenStop(); + + Context.Become(Idle); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/Publishers/LiveEventsByTagPublisher.cs b/src/Akka.Persistence.Azure/Query/Publishers/LiveEventsByTagPublisher.cs new file mode 100644 index 0000000..f39a802 --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/Publishers/LiveEventsByTagPublisher.cs @@ -0,0 +1,53 @@ +using System; +using Akka.Actor; + +namespace Akka.Persistence.Azure.Query.Publishers +{ + internal sealed class LiveEventsByTagPublisher : AbstractEventsByTagPublisher + { + private readonly ICancelable _tickCancelable; + + public LiveEventsByTagPublisher( + string tag, + long fromOffset, + long toOffset, + TimeSpan refreshInterval, + int maxBufferSize, + string writeJournalPluginId) + : base(tag, fromOffset, maxBufferSize, writeJournalPluginId) + { + ToOffset = toOffset; + _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, EventsByTagPublisher.Continue.Instance, Self); + } + + protected override long ToOffset { get; } + + protected override void PostStop() + { + _tickCancelable.Cancel(); + base.PostStop(); + } + + protected override void ReceiveInitialRequest() + { + JournalRef.Tell(new SubscribeTag(Tag)); + Replay(); + } + + protected override void ReceiveIdleRequest() + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentOffset > ToOffset) + OnCompleteThenStop(); + } + + protected override void ReceiveRecoverySuccess(long highestSequenceNr) + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentOffset > ToOffset) + OnCompleteThenStop(); + + Context.Become(Idle); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs index b20902d..c1fe15e 100644 --- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs +++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs @@ -115,12 +115,16 @@ await filtered.DownloadToStreamAsync(memoryStream, AccessCondition.GenerateEmpty GenerateOptions(), new OperationContext(), cts.Token); var snapshot = _serialization.SnapshotFromBytes(memoryStream.ToArray()); - return new SelectedSnapshot( - new SnapshotMetadata( - persistenceId, - FetchBlobSeqNo(filtered), - new DateTime(FetchBlobTimestamp(filtered))), - snapshot.Data); + + var returnValue = + new SelectedSnapshot( + new SnapshotMetadata( + persistenceId, + FetchBlobSeqNo(filtered), + new DateTime(FetchBlobTimestamp(filtered))), + snapshot.Data); + + return returnValue; } } @@ -146,9 +150,13 @@ protected override async Task SaveAsync(SnapshotMetadata metadata, object snapsh */ blob.Metadata.Add(SeqNoMetaDataKey, metadata.SequenceNr.ToString()); - await blob.UploadFromByteArrayAsync(snapshotData, 0, snapshotData.Length, + await blob.UploadFromByteArrayAsync( + snapshotData, + 0, + snapshotData.Length, AccessCondition.GenerateEmptyCondition(), - GenerateOptions(), new OperationContext(), + GenerateOptions(), + new OperationContext(), cts.Token); } } diff --git a/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs b/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs new file mode 100644 index 0000000..f2f5005 --- /dev/null +++ b/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs @@ -0,0 +1,62 @@ +using System; +using System.Collections.Generic; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Table; + +namespace Akka.Persistence.Azure.TableEntities +{ + internal sealed class AllPersistenceIdsEntry + : ITableEntity + { + private const string ManifestKeyName = "manifest"; + public const string PartitionKeyValue = "allPersistenceIdsIdx"; + + // In order to use this in a TableQuery a parameterless constructor is required + public AllPersistenceIdsEntry() + { + } + + public AllPersistenceIdsEntry( + string persistenceId, + string manifest = "") + { + PartitionKey = PartitionKeyValue; + + RowKey = persistenceId; + + Manifest = manifest; + } + + public string ETag { get; set; } + + public string Manifest { get; set; } + + public string PartitionKey { get; set; } + + public string RowKey { get; set; } + + public DateTimeOffset Timestamp { get; set; } + + public void ReadEntity( + IDictionary properties, + OperationContext operationContext) + { + Manifest = + properties.ContainsKey(ManifestKeyName) + ? properties[ManifestKeyName].StringValue + : string.Empty; + } + + public IDictionary WriteEntity( + OperationContext operationContext) + { + var dict = + new Dictionary + { + [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), + }; + + return dict; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs b/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs new file mode 100644 index 0000000..02b3c31 --- /dev/null +++ b/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs @@ -0,0 +1,130 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Akka.Persistence.Azure.Util; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Table; + +namespace Akka.Persistence.Azure.TableEntities +{ + internal sealed class EventTagEntry + : ITableEntity + { + public const char Delimiter = ':'; + public const char AsciiIncrementedDelimiter = ';'; //(char)((byte)Delimiter + 1) + public const string PartitionKeyValue = "eventTagIdx"; + public const string UtcTicksKeyName = "utcTicks"; + private const string ManifestKeyName = "manifest"; + private const string PayloadKeyName = "payload"; + public const string IdxPartitionKeyKeyName = "idxPartitionKey"; + public const string IdxRowKeyKeyName = "idxRowKey"; + public const string IdxTagKeyName = "idxTag"; + + // In order to use this in a TableQuery a parameterless constructor is required + public EventTagEntry() + { + } + + public EventTagEntry( + string persistenceId, + string tag, + long seqNo, + byte[] payload, + string manifest, + long? utcTicks = null) + { + if (persistenceId.Any(x => x == Delimiter)) + { + throw new ArgumentException($"Must not contain {Delimiter}.", nameof(persistenceId)); + } + + if (tag.Any(x => x == Delimiter)) + { + throw new ArgumentException($"Must not contain {Delimiter}.", nameof(tag)); + } + + PartitionKey = GetPartitionKey(tag); + + Manifest = manifest; + + IdxPartitionKey = persistenceId; + + IdxRowKey = seqNo.ToJournalRowKey(); + + IdxTag = tag; + + UtcTicks = utcTicks ?? DateTime.UtcNow.Ticks; + + RowKey = GetRowKey(UtcTicks, IdxPartitionKey, IdxRowKey); + + Payload = payload; + } + + public string ETag { get; set; } + + public string IdxPartitionKey { get; set; } + + public string IdxRowKey { get; set; } + + public string IdxTag { get; set; } + + public string Manifest { get; set; } + + public string PartitionKey { get; set; } + + public byte[] Payload { get; set; } + + public string RowKey { get; set; } + + public DateTimeOffset Timestamp { get; set; } + + public long UtcTicks { get; set; } + + public void ReadEntity(IDictionary properties, OperationContext operationContext) + { + Manifest = + properties.ContainsKey(ManifestKeyName) + ? properties[ManifestKeyName].StringValue + : string.Empty; + + IdxPartitionKey = properties[IdxPartitionKeyKeyName].StringValue; + + IdxRowKey = properties[IdxRowKeyKeyName].StringValue; + + IdxTag = properties[IdxTagKeyName].StringValue; + + UtcTicks = properties[UtcTicksKeyName].Int64Value.Value; + + Payload = properties[PayloadKeyName].BinaryValue; + } + + public IDictionary WriteEntity(OperationContext operationContext) + { + var dict = + new Dictionary + { + [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), + [PayloadKeyName] = EntityProperty.GeneratePropertyForByteArray(Payload), + [UtcTicksKeyName] = EntityProperty.GeneratePropertyForLong(UtcTicks), + [IdxPartitionKeyKeyName] = EntityProperty.GeneratePropertyForString(IdxPartitionKey), + [IdxRowKeyKeyName] = EntityProperty.GeneratePropertyForString(IdxRowKey), + [IdxTagKeyName] = EntityProperty.GeneratePropertyForString(IdxTag) + }; + + return dict; + } + + public static string GetPartitionKey(string tag) + { + return $"{PartitionKeyValue}-{tag}"; + } + + public static string GetRowKey( + long utcTicks, + string idxPartitionKey, + string idxRowKey) + { + return $"{utcTicks.ToJournalRowKey()}{Delimiter}{idxPartitionKey}{Delimiter}{idxRowKey}"; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs b/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs new file mode 100644 index 0000000..6c3f848 --- /dev/null +++ b/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Table; + +namespace Akka.Persistence.Azure.TableEntities +{ + internal sealed class HighestSequenceNrEntry + : ITableEntity + { + public const string HighestSequenceNrKey = "highestSequenceNr"; + private const string ManifestKeyName = "manifest"; + public const string RowKeyValue = "highestSequenceNr"; + + // In order to use this in a TableQuery a parameterless constructor is required + public HighestSequenceNrEntry() + { + } + + public HighestSequenceNrEntry( + string persistenceId, + long highestSequenceNr, + string manifest = "") + { + PartitionKey = persistenceId; + + RowKey = RowKeyValue; + + HighestSequenceNr = highestSequenceNr; + + Manifest = manifest; + } + + public string ETag { get; set; } + + public long HighestSequenceNr { get; set; } + + public string Manifest { get; set; } + + public string PartitionKey { get; set; } + + public string RowKey { get; set; } + + public DateTimeOffset Timestamp { get; set; } + + public void ReadEntity( + IDictionary properties, + OperationContext operationContext) + { + Manifest = + properties.ContainsKey(ManifestKeyName) + ? properties[ManifestKeyName].StringValue + : string.Empty; + + HighestSequenceNr = properties[HighestSequenceNrKey].Int64Value.Value; + } + + public IDictionary WriteEntity( + OperationContext operationContext) + { + var dict = + new Dictionary + { + [HighestSequenceNrKey] = EntityProperty.GeneratePropertyForLong(HighestSequenceNr), + [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), + }; + + return dict; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Journal/PersistentJournalEntry.cs b/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs similarity index 61% rename from src/Akka.Persistence.Azure/Journal/PersistentJournalEntry.cs rename to src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs index 42ace36..c6bec5d 100644 --- a/src/Akka.Persistence.Azure/Journal/PersistentJournalEntry.cs +++ b/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs @@ -4,13 +4,15 @@ // // ----------------------------------------------------------------------- -using System; -using System.Collections.Generic; +using Akka.Persistence.Azure.Journal; using Akka.Persistence.Azure.Util; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Table; +using System; +using System.Collections.Generic; +using System.Linq; -namespace Akka.Persistence.Azure.Journal +namespace Akka.Persistence.Azure.TableEntities { /// /// INTERNAL API. @@ -27,34 +29,48 @@ namespace Akka.Persistence.Azure.Journal /// internal sealed class PersistentJournalEntry : ITableEntity { - private const string PayloadKeyName = "payload"; + public const string TagsKeyName = "tags"; + public const string UtcTicksKeyName = "utcTicks"; private const string ManifestKeyName = "manifest"; + private const string PayloadKeyName = "payload"; private const string SeqNoKeyName = "seqno"; public PersistentJournalEntry() { } - public PersistentJournalEntry(string persistentId, long seqNo, byte[] payload, string manifest) + public PersistentJournalEntry( + string persistentId, + long seqNo, + byte[] payload, + string manifest = "", + params string[] tags) { Payload = payload; Manifest = manifest; - + Tags = tags ?? new string[]{}; PartitionKey = persistentId; SeqNo = seqNo; RowKey = seqNo.ToJournalRowKey(); + UtcTicks = DateTime.UtcNow.Ticks; } + public string ETag { get; set; } + /// /// The serialization manifest. /// public string Manifest { get; set; } + public string PartitionKey { get; set; } + /// /// The persistent payload /// public byte[] Payload { get; set; } + public string RowKey { get; set; } + /// /// The sequence number. /// @@ -64,32 +80,49 @@ public PersistentJournalEntry(string persistentId, long seqNo, byte[] payload, s /// public long SeqNo { get; set; } + /// + /// Tags associated with this entry, if any + /// + public string[] Tags { get; set; } + + public DateTimeOffset Timestamp { get; set; } + + /// + /// Ticks of current UTC at the time the entry was created + /// + /// + /// Azure Table Storage does not index the Timestamp value so performing + /// any query against it will be extremely slow + /// + public long UtcTicks { get; set; } + public void ReadEntity(IDictionary properties, OperationContext operationContext) { - if (properties.ContainsKey(ManifestKeyName)) - Manifest = properties[ManifestKeyName].StringValue; - else - Manifest = string.Empty; + Manifest = + properties.ContainsKey(ManifestKeyName) + ? properties[ManifestKeyName].StringValue + : string.Empty; // an exception is fine here - means the data is corrupted anyway SeqNo = properties[SeqNoKeyName].Int64Value.Value; Payload = properties[PayloadKeyName].BinaryValue; + Tags = properties[TagsKeyName].StringValue.Split(new []{';'}, StringSplitOptions.RemoveEmptyEntries); + UtcTicks = properties[UtcTicksKeyName].Int64Value.Value; } public IDictionary WriteEntity(OperationContext operationContext) { - var dict = new Dictionary - { - [PayloadKeyName] = EntityProperty.GeneratePropertyForByteArray(Payload), - [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), - [SeqNoKeyName] = EntityProperty.GeneratePropertyForLong(SeqNo) - }; + var dict = + new Dictionary + { + [PayloadKeyName] = EntityProperty.GeneratePropertyForByteArray(Payload), + [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), + [SeqNoKeyName] = EntityProperty.GeneratePropertyForLong(SeqNo), + [TagsKeyName] = EntityProperty.GeneratePropertyForString(string.Join(";", Tags)), + [UtcTicksKeyName] = EntityProperty.GeneratePropertyForLong(UtcTicks) + }; + return dict; } - - public string PartitionKey { get; set; } - public string RowKey { get; set; } - public DateTimeOffset Timestamp { get; set; } - public string ETag { get; set; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/reference.conf b/src/Akka.Persistence.Azure/reference.conf index 52aaa0b..8ed299f 100644 --- a/src/Akka.Persistence.Azure/reference.conf +++ b/src/Akka.Persistence.Azure/reference.conf @@ -30,6 +30,30 @@ akka.persistence { } } + query { + journal { + azure-table { + # Implementation class of the Azure ReadJournalProvider + class = "Akka.Persistence.Azure.Query.AzureTableStorageReadJournalProvider, Akka.Persistence.Azure" + + # Absolute path to the write journal plugin configuration entry that this + # query journal will connect to. + # If undefined (or "") it will connect to the default journal as specified by the + # akka.persistence.journal.plugin property. + write-plugin = "" + + # How many events to fetch in one query (replay) and keep buffered until they + # are delivered downstreams. + max-buffer-size = 100 + + # The Azure Table write journal is notifying the query side as soon as things + # are persisted, but for efficiency reasons the query side retrieves the events + # in batches that sometimes can be delayed up to the configured `refresh-interval`. + refresh-interval = 3s + } + } + } + snapshot-store { azure-blob-store { # qualified type name of the Azure Blob Storage persistence snapshot storage actor