From b2e3c85fe934c3ce85e0a6429062674ac4edc070 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 15 Nov 2019 12:27:30 -0600 Subject: [PATCH] added edge case specs for Akka.Persistence.Query (#60) * added edge case specs for Akka.Persistence.Query * updated the spec names --- .../Query/AzureTableQueryEdgeCaseSpecs.cs | 190 ++++++++++++++++++ .../Query/AzureTableStorageReadJournal.cs | 2 +- 2 files changed, 191 insertions(+), 1 deletion(-) create mode 100644 src/Akka.Persistence.Azure.Tests/Query/AzureTableQueryEdgeCaseSpecs.cs diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableQueryEdgeCaseSpecs.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableQueryEdgeCaseSpecs.cs new file mode 100644 index 0000000..0cae1ab --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableQueryEdgeCaseSpecs.cs @@ -0,0 +1,190 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Azure.Query; +using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Azure.Tests.Helper; +using Akka.Persistence.Journal; +using Akka.Persistence.Query; +using Akka.Streams; +using Akka.Util.Internal; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Azure.Tests.Query +{ + [Collection("AzureQuery")] + public class AzureTableQueryEdgeCaseSpecs : Akka.TestKit.Xunit2.TestKit + { + public static readonly AtomicCounter Counter = new AtomicCounter(0); + private readonly ITestOutputHelper _output; + + protected AzureTableStorageReadJournal ReadJournal { get; } + + protected IMaterializer Materializer { get; } + + public class RealMsg + { + public RealMsg(string msg) + { + Msg = msg; + } + public string Msg { get; } + } + + public const int MessageCount = 20; + + public AzureTableQueryEdgeCaseSpecs(ITestOutputHelper output) + : base(Config(), nameof(AzureTableQueryEdgeCaseSpecs), output) + { + _output = output; + Materializer = Sys.Materializer(); + ReadJournal = Sys.ReadJournalFor(AzureTableStorageReadJournal.Identifier); + } + + /// + /// Reproduction spec for https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/61 + /// + [Fact] + public async Task Bug61_Events_Recovered_By_Id_Should_Match_Tag() + { + var actor = Sys.ActorOf(TagActor.Props("x")); + + actor.Tell(MessageCount); + ExpectMsg($"{MessageCount}-done", TimeSpan.FromSeconds(20)); + + var eventsById = await ReadJournal.CurrentEventsByPersistenceId("x", 0L, long.MaxValue) + .RunAggregate(ImmutableHashSet.Empty, (agg, e) => agg.Add(e), Materializer); + + eventsById.Count.Should().Be(MessageCount); + + var eventsByTag = await ReadJournal.CurrentEventsByTag(typeof(RealMsg).Name) + .RunAggregate(ImmutableHashSet.Empty, (agg, e) => agg.Add(e), Materializer); + + eventsByTag.Count.Should().Be(MessageCount); + + eventsById.All(x => x.Event is RealMsg).Should().BeTrue("Expected all events by id to be RealMsg"); + eventsByTag.All(x => x.Event is RealMsg).Should().BeTrue("Expected all events by tag to be RealMsg"); + } + + /// + /// Reproduction spec for https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80 + /// + [Fact] + public void Bug80_CurrentEventsByTag_should_Recover_until_end() + { + var actor = Sys.ActorOf(TagActor.Props("y")); + var msgCount = 1200; + actor.Tell(msgCount); + ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20)); + + var eventsByTag = ReadJournal.CurrentEventsByTag(typeof(RealMsg).Name) + .RunForeach(e => TestActor.Tell(e), Materializer); + + ReceiveN(msgCount); + } + + /// + /// Making sure EventsByTag didn't break during implementation of https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80 + /// + [Fact] + public void Bug80_AllEventsByTag_should_Recover_all_messages() + { + var actor = Sys.ActorOf(TagActor.Props("y")); + var msgCount = 1200; + actor.Tell(msgCount); + ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20)); + + var eventsByTag = ReadJournal.EventsByTag(typeof(RealMsg).Name) + .RunForeach(e => TestActor.Tell(e), Materializer); + + // can't do this because Offset isn't IComparable + // ReceiveN(msgCount).Cast().Select(x => x.Offset).Should().BeInAscendingOrder(); + ReceiveN(msgCount); + + // should receive more messages after the fact + actor.Tell(msgCount); + ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20)); + ReceiveN(msgCount); + } + + private class TagActor : ReceivePersistentActor + { + public static Props Props(string id) + { + return Akka.Actor.Props.Create(() => new TagActor(id)); + } + + public TagActor(string id) + { + PersistenceId = id; + + Command(i => + { + var msgs = new List(); + foreach (var n in Enumerable.Range(0, i)) + { + msgs.Add(new RealMsg(i.ToString())); + } + PersistAll(msgs, m => + { + if (LastSequenceNr >= i) + { + Sender.Tell($"{i}-done"); + } + }); + }); + + Command(r => + { + Persist(r, e => + { + Sender.Tell($"{e.Msg}-done"); + }); + }); + } + + public override string PersistenceId { get; } + } + + private class EventTagger : IWriteEventAdapter + { + public string DefaultTag { get; } + + public EventTagger() + { + DefaultTag = "accounts"; + } + + public string Manifest(object evt) + { + return string.Empty; + } + + public object ToJournal(object evt) + { + return new Tagged(evt, ImmutableHashSet.Empty.Add(DefaultTag).Add(evt.GetType().Name)); + } + } + + public static string TableName { get; private set; } + + public static Config Config() + { + var azureConfig = + !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) + : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + + TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); + + return azureConfig; + } + } +} diff --git a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs index e11a592..3654c75 100644 --- a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs +++ b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs @@ -174,7 +174,7 @@ public Source EventsByTag(string tag, Offset offset = nu /// is completed immediately when it reaches the end of the "result set". Events that are /// stored after the query is completed are not included in the event stream. /// - public Source CurrentEventsByTag(string tag, Offset offset) + public Source CurrentEventsByTag(string tag, Offset offset = null) { offset = offset ?? new Sequence(0L); switch (offset)