From 1a3a0d05204efcf59d0c204fc5fefe3dce087ff6 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sat, 30 Jun 2018 23:31:12 -0500 Subject: [PATCH 01/19] Journal fixes (#5) * fixed bug with recovery stage in journal * added SerializerSpecs and fixed more issues with the journal * made significnat progress on fixing Journal * have all but 1 spec passing now --- .../SerializerHelperSpecs.cs | 35 +++++ .../Journal/AzureTableStorageJournal.cs | 126 +++++++++++++----- .../SerializationHelper.cs | 3 +- 3 files changed, 129 insertions(+), 35 deletions(-) create mode 100644 src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs diff --git a/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs b/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs new file mode 100644 index 0000000..e485366 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Text; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Azure.Tests +{ + public class SerializerHelperSpecs : Akka.TestKit.Xunit2.TestKit + { + private readonly SerializationHelper _helper; + + public SerializerHelperSpecs(ITestOutputHelper helper) : base(output: helper) + { + // force Akka.Persistence serializers to be loaded + AzurePersistence.Get(Sys); + _helper = new SerializationHelper(Sys); + } + + [Fact] + public void ShouldSerializeAndDeserializePersistentRepresentation() + { + var persistentRepresentation = new Persistent("hi", 1L, "aaron"); + var bytes = _helper.PersistentToBytes(persistentRepresentation); + var deserialized = _helper.PersistentFromBytes(bytes); + + deserialized.Payload.Should().Be(persistentRepresentation.Payload); + deserialized.Manifest.Should().Be(persistentRepresentation.Manifest); + deserialized.SequenceNr.Should().Be(persistentRepresentation.SequenceNr); + deserialized.PersistenceId.Should().Be(persistentRepresentation.PersistenceId); + deserialized.IsDeleted.Should().BeFalse(); + } + } +} diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index 4fc8dbd..255590c 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -82,30 +82,62 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per Action recoveryCallback) { #if DEBUG - _log.Debug("Entering method ReplayMessagesAsync for persistentId [{0}]", persistenceId); + _log.Debug("Entering method ReplayMessagesAsync for persistentId [{0}] from seqNo range [{1}, {2}] and taking up to max [{3}]", persistenceId, fromSequenceNr, toSequenceNr, max); #endif if (max == 0) return; - var replayQuery = ReplayQuery(persistenceId, fromSequenceNr, toSequenceNr, (int)max); + var replayQuery = ReplayQuery(persistenceId, fromSequenceNr, toSequenceNr); var nextTask = Table.ExecuteQuerySegmentedAsync(replayQuery, null); - + var count = 0L; while (nextTask != null) { var tableQueryResult = await nextTask; - // we have results - if (tableQueryResult.Results.Count > 0) +#if DEBUG + if (_log.IsDebugEnabled && _settings.VerboseLogging) + { + _log.Debug("Recovered [{0}] messages for entity [{1}]", tableQueryResult.Results.Count, persistenceId); + } +#endif + + if (tableQueryResult.ContinuationToken != null) + { + if (_log.IsDebugEnabled && _settings.VerboseLogging) + { + _log.Debug("Have additional messages to download for entity [{0}]", persistenceId); + } + // start the next query while we process the results of this one + nextTask = Table.ExecuteQuerySegmentedAsync(replayQuery, tableQueryResult.ContinuationToken); + } + else + { + if (_log.IsDebugEnabled && _settings.VerboseLogging) + { + _log.Debug("Completed download of messages for entity [{0}]", persistenceId); + } + + // terminates the loop + nextTask = null; + } + + foreach (var savedEvent in tableQueryResult.Results) { - // and we have more data waiting on the wire - if (tableQueryResult.ContinuationToken != null) - nextTask = Table.ExecuteQuerySegmentedAsync(replayQuery, tableQueryResult.ContinuationToken); - else - nextTask = null; // query is finished - - foreach (var savedEvent in tableQueryResult.Results) - recoveryCallback(_serialization.PersistentFromBytes(savedEvent.Payload)); + // check if we've hit max recovery + if (count >= max) + return; + ++count; + + var deserialized = _serialization.PersistentFromBytes(savedEvent.Payload); +#if DEBUG + if (_log.IsDebugEnabled && _settings.VerboseLogging) + { + _log.Debug("Recovering [{0}] for entity [{1}].", deserialized, savedEvent.PartitionKey); + } +#endif + recoveryCallback(deserialized); + } } @@ -115,7 +147,7 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per } private static TableQuery ReplayQuery(string persistentId, long fromSequenceNumber, - long toSequenceNumber, int max) + long toSequenceNumber) { return new TableQuery().Where( TableQuery.CombineFilters( @@ -125,7 +157,7 @@ private static TableQuery ReplayQuery(string persistentI TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.GreaterThanOrEqual, fromSequenceNumber.ToJournalRowKey())), TableOperators.And, TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.LessThanOrEqual, - toSequenceNumber.ToJournalRowKey()))).Take(max); + toSequenceNumber.ToJournalRowKey()))); } public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) @@ -133,23 +165,34 @@ public override async Task ReadHighestSequenceNrAsync(string persistenceId #if DEBUG _log.Debug("Entering method ReadHighestSequenceNrAsync"); #endif - var result = await Table.ExecuteQuerySegmentedAsync( - new TableQuery() - .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, persistenceId)) - .Take(1), - null); + var sequenceNumberQuery = GenerateHighestSequenceNumberQuery(persistenceId, fromSequenceNr); + var result = await Table.ExecuteQuerySegmentedAsync(sequenceNumberQuery, null); + long seqNo = 0L; + + do + { + if (result.Results.Count > 0) + seqNo = Math.Max(seqNo, result.Results.Max(x => x.SeqNo)); + if(result.ContinuationToken != null) + result = await Table.ExecuteQuerySegmentedAsync(sequenceNumberQuery, result.ContinuationToken); + } while (result.ContinuationToken != null); - long seqNo = 0L; - if (result.Results.Count > 0) - seqNo = result.Results.First().SeqNo; #if DEBUG - _log.Debug("Leaving method ReadHighestSequenceNrAsync with SeqNo [{0}] for PersistentId [{1}}]", seqNo, persistenceId); + _log.Debug("Leaving method ReadHighestSequenceNrAsync with SeqNo [{0}] for PersistentId [{1}]", seqNo, persistenceId); #endif return seqNo; } + private static TableQuery GenerateHighestSequenceNumberQuery(string persistenceId, long fromSequenceNr) + { + return new TableQuery() + .Where(TableQuery.CombineFilters(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, persistenceId), + TableOperators.And, + TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.GreaterThanOrEqual, fromSequenceNr.ToJournalRowKey()))); + } + protected override async Task> WriteMessagesAsync(IEnumerable messages) { try @@ -184,7 +227,7 @@ protected override async Task> WriteMessagesAsync(IEnu try { var results = await Table.ExecuteBatchAsync(batch, - new TableRequestOptions {MaximumExecutionTime = _settings.RequestTimeout}, + new TableRequestOptions { MaximumExecutionTime = _settings.RequestTimeout }, new OperationContext()); if (_log.IsDebugEnabled && _settings.VerboseLogging) @@ -234,11 +277,28 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t TableOperators.And, TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.LessThanOrEqual, toSequenceNr.ToJournalRowKey()))) - .Select(new[] { "PartitionKey", "RowKey" }); + ; + + var nextQuery = Table.ExecuteQuerySegmentedAsync(deleteQuery, null); - async Task DeleteRows(Task> queryTask) + while (nextQuery != null) { - var queryResults = await queryTask; + var queryResults = await nextQuery; + + if (_log.IsDebugEnabled && _settings.VerboseLogging) + { + _log.Debug("Have [{0}] messages to delete for entity [{1}]", queryResults.Results.Count, persistenceId); + } + + if (queryResults.ContinuationToken != null) // more data on the wire + { + nextQuery = Table.ExecuteQuerySegmentedAsync(deleteQuery, queryResults.ContinuationToken); + } + else + { + nextQuery = null; + } + if (queryResults.Results.Count > 0) { var tableBatchOperation = new TableBatchOperation(); @@ -246,17 +306,15 @@ async Task DeleteRows(Task> queryTask) tableBatchOperation.Delete(toBeDeleted); var deleteTask = Table.ExecuteBatchAsync(tableBatchOperation); - if (queryResults.ContinuationToken != null) // more data on the wire - { - var nextQuery = Table.ExecuteQuerySegmentedAsync(deleteQuery, queryResults.ContinuationToken); - await DeleteRows(nextQuery); - } + + await deleteTask; } } + - await DeleteRows(Table.ExecuteQuerySegmentedAsync(deleteQuery, null)); + #if DEBUG _log.Debug("Leaving method DeleteMessagesToAsync for persistentId [{0}] and up to seqNo [{1}]", persistenceId, toSequenceNr); diff --git a/src/Akka.Persistence.Azure/SerializationHelper.cs b/src/Akka.Persistence.Azure/SerializationHelper.cs index 79c2f20..6b78543 100644 --- a/src/Akka.Persistence.Azure/SerializationHelper.cs +++ b/src/Akka.Persistence.Azure/SerializationHelper.cs @@ -44,7 +44,8 @@ public IPersistentRepresentation PersistentFromBytes(byte[] bytes) */ var serializer = _actorSystem.Serialization.FindSerializerForType(_persistentRepresentation); - return serializer.FromBinary(bytes); + var msg = serializer.FromBinary(bytes); + return msg; } public IPersistentRepresentation PersistentFromBytesWithManifest(byte[] bytes, string manifest) From be1961513ae75efccf7c885af1a4818882e1b792 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sun, 1 Jul 2018 12:54:42 -0500 Subject: [PATCH 02/19] dealt with NBench version (#6) * dealt with NBench version * added real journal benchmarks * fix benchmarks * fixed first round of benchmarks to execute correctly * swap to PersistAsync * disabled logging during benchmarks * fixed termination logic * fixed log levels * turned DEBUG mode back on * more tuning --- .gitignore | 1 + build.fsx | 4 +- build.ps1 | 2 +- .../DbUtils.cs | 16 +++ ...Persistence.Azure.Tests.Performance.csproj | 1 + .../AzureJournalPerfSpecs.cs | 131 ++++++++++++++++++ .../PersistentBenchmarkMsgs.cs | 36 +++++ .../PersistentJournalBenchmarkActor.cs | 76 ++++++++++ .../UnitTest1.cs | 31 ----- .../AzureTableJournalSpec.cs | 8 +- 10 files changed, 268 insertions(+), 38 deletions(-) create mode 100644 src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs create mode 100644 src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs create mode 100644 src/Akka.Persistence.Azure.Tests.Performance/PersistentBenchmarkMsgs.cs create mode 100644 src/Akka.Persistence.Azure.Tests.Performance/PersistentJournalBenchmarkActor.cs delete mode 100644 src/Akka.Persistence.Azure.Tests.Performance/UnitTest1.cs diff --git a/.gitignore b/.gitignore index 3ae1c36..4f6922e 100644 --- a/.gitignore +++ b/.gitignore @@ -46,6 +46,7 @@ _site/ # MSTest test Results [Tt]est[Rr]esult*/ +[Pp]erf[Rr]esult*/ [Bb]uild[Ll]og.* # NUNIT diff --git a/build.fsx b/build.fsx index 77e638a..6b59050 100644 --- a/build.fsx +++ b/build.fsx @@ -122,7 +122,7 @@ Target "NBench" <| fun _ -> let nbenchTestPath = findToolInSubPath "NBench.Runner.exe" (toolsDir @@ "NBench.Runner*") printfn "Using NBench.Runner: %s" nbenchTestPath - let nbenchTestAssemblies = !! "./src/**/*Tests.Performance.dll" // doesn't support .NET Core at the moment + let nbenchTestAssemblies = !! "./src/Akka.Persistence.Azure.Tests.Performance/bin/Release/**/*Tests.Performance.dll" // doesn't support .NET Core at the moment let runNBench assembly = let includes = getBuildParam "include" @@ -264,7 +264,7 @@ Target "Nuget" DoNothing // all "BuildRelease" ==> "All" "RunTests" ==> "All" -//"NBench" ==> "All" +"NBench" ==> "All" "Nuget" ==> "All" RunTargetOrDefault "Help" \ No newline at end of file diff --git a/build.ps1 b/build.ps1 index f2180fc..f5a04cd 100644 --- a/build.ps1 +++ b/build.ps1 @@ -30,7 +30,7 @@ Param( ) $FakeVersion = "4.61.2" -$NBenchVersion = "1.0.1" +$NBenchVersion = "1.0.4" $DotNetChannel = "LTS"; $DotNetVersion = "2.0.0"; $DotNetInstallerUri = "https://raw.githubusercontent.com/dotnet/cli/v$DotNetVersion/scripts/obtain/dotnet-install.ps1"; diff --git a/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs b/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs new file mode 100644 index 0000000..531ace1 --- /dev/null +++ b/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs @@ -0,0 +1,16 @@ +using System.Threading.Tasks; +using Akka.Actor; +using Microsoft.WindowsAzure.Storage; + +namespace Akka.Persistence.Azure.TestHelpers +{ + public class DbUtils + { + public static Task CleanupCloudTable(string connectionString, string tableName) + { + var account = CloudStorageAccount.Parse(connectionString); + var table = account.CreateCloudTableClient().GetTableReference(tableName); + return table.DeleteIfExistsAsync(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj b/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj index f91b156..003863b 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj +++ b/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj @@ -12,6 +12,7 @@ + diff --git a/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs new file mode 100644 index 0000000..69025ea --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs @@ -0,0 +1,131 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015 - 2018 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Azure.TestHelpers; +using Akka.Util.Internal; +using NBench; + +namespace Akka.Persistence.Azure.Tests.Performance +{ + public class AzureJournalPerfSpecs + { + public const string RecoveryCounterName = "MsgRecovered"; + private Counter _recoveryCounter; + + public const string WriteCounterName = "MsgPersisted"; + private Counter _writeCounter; + + public static AtomicCounter TableVersionCounter = new AtomicCounter(0); + public static string TableName { get; private set; } + + public const int PersistentActorCount = 200; + public const int PersistedMessageCount = 100; + + public static Config JournalConfig() + { + if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"))) + return JournalConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")); + + return JournalConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + } + + public static Config JournalConfig(string connectionString) + { + TableName = "TestTable" + TableVersionCounter.IncrementAndGet(); + + return ConfigurationFactory.ParseString( + @"akka.loglevel = DEBUG + akka.log-config-on-start = on + akka.persistence.journal.azure-table.class = ""Akka.Persistence.Azure.Journal.AzureTableStorageJournal, Akka.Persistence.Azure"" + akka.persistence.journal.plugin = ""akka.persistence.journal.azure-table"" + akka.persistence.journal.azure-table.connection-string=""" + connectionString + @""" + akka.persistence.journal.azure-table.verbose-logging = on") + .WithFallback("akka.persistence.journal.azure-table.table-name=" + TableName); + } + + private ActorSystem ActorSystem { get; set; } + + private Dictionary _persistentActors = new Dictionary(); + + [PerfSetup] + public void Setup(BenchmarkContext context) + { + _recoveryCounter = context.GetCounter(RecoveryCounterName); + _writeCounter = context.GetCounter(WriteCounterName); + + + ActorSystem = Actor.ActorSystem.Create(nameof(AzureJournalPerfSpecs) + TableVersionCounter.Current, JournalConfig()); + Console.WriteLine(ActorSystem.Settings.Config.ToString()); + foreach (var i in Enumerable.Range(0, PersistentActorCount)) + { + var id = "persistent" + Guid.NewGuid(); + var actorRef = + ActorSystem.ActorOf( + Props.Create(() => new PersistentJournalBenchmarkActor(id, _recoveryCounter, _writeCounter)), + id); + + _persistentActors[id] = actorRef; + } + } + + [PerfBenchmark(NumberOfIterations = 5, RunMode = RunMode.Iterations, + Description = "Write performance spec by 200 persistent actors", SkipWarmups = true)] + [CounterMeasurement(RecoveryCounterName)] + [CounterMeasurement(WriteCounterName)] + [GcMeasurement(GcMetric.TotalCollections, GcGeneration.AllGc)] + [MemoryMeasurement(MemoryMetric.TotalBytesAllocated)] + public void BatchJournalWriteSpec(BenchmarkContext context) + { + foreach (var i in Enumerable.Range(0, PersistedMessageCount)) + { + foreach (var actor in _persistentActors) + { + actor.Value.Tell(i); + } + } + + using (var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1))) + { + var tasks = new List(); + foreach (var actor in _persistentActors) + { + tasks.Add(actor.Value.Ask(r => + new PersistentBenchmarkMsgs.NotifyWhenCounterHits(PersistedMessageCount, r), null, cts.Token)); + } + + try + { + Task.WaitAll(tasks.ToArray(), cts.Token); + } + catch(Exception ex) + { + context.Trace.Error(ex, "Failed to process results after 1 minute"); + return; + } + } + + } + + [PerfCleanup] + public void CleanUp() + { + ActorSystem.Terminate().Wait(); + + try + { + DbUtils.CleanupCloudTable(AzurePersistence.Get(ActorSystem).TableSettings.ConnectionString, TableName).Wait(TimeSpan.FromSeconds(3)); + } + catch { } + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests.Performance/PersistentBenchmarkMsgs.cs b/src/Akka.Persistence.Azure.Tests.Performance/PersistentBenchmarkMsgs.cs new file mode 100644 index 0000000..1286880 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests.Performance/PersistentBenchmarkMsgs.cs @@ -0,0 +1,36 @@ +using Akka.Actor; + +namespace Akka.Persistence.Azure.Tests.Performance +{ + /// + /// Messages used for working with benchmark actors + /// + public static class PersistentBenchmarkMsgs + { + public sealed class RecoveryComplete : INoSerializationVerificationNeeded + { + public static readonly RecoveryComplete Instance = new RecoveryComplete(); + private RecoveryComplete() { } + } + + public sealed class ResetCounter : INoSerializationVerificationNeeded + { + public static readonly ResetCounter Instance = new ResetCounter(); + + private ResetCounter() { } + } + + public sealed class NotifyWhenCounterHits + { + public NotifyWhenCounterHits(int target, IActorRef subscriber) + { + Target = target; + Subscriber = subscriber; + } + + public int Target { get; } + + public IActorRef Subscriber { get; } + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests.Performance/PersistentJournalBenchmarkActor.cs b/src/Akka.Persistence.Azure.Tests.Performance/PersistentJournalBenchmarkActor.cs new file mode 100644 index 0000000..86043f5 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests.Performance/PersistentJournalBenchmarkActor.cs @@ -0,0 +1,76 @@ +using Akka.Actor; +using Akka.Event; +using NBench; + +namespace Akka.Persistence.Azure.Tests.Performance +{ + public class PersistentJournalBenchmarkActor : ReceivePersistentActor + { + private readonly Counter _recoveredMessageCounter; + private readonly Counter _msgWriteCounter; + + private readonly ILoggingAdapter _log = Context.GetLogger(); + + private PersistentBenchmarkMsgs.NotifyWhenCounterHits _target; + + /// + /// Our stored value + /// + private int TotalCount { get; set; } + + public PersistentJournalBenchmarkActor(string persistenceId, Counter recoveredMessageCounter, Counter msgWriteCounter) + { + PersistenceId = persistenceId; + _recoveredMessageCounter = recoveredMessageCounter; + _msgWriteCounter = msgWriteCounter; + + Recover(offer => + { + if (offer.Snapshot is int i) + { + TotalCount = i; + } + _recoveredMessageCounter.Increment(); + }); + + Recover(i => + { + TotalCount += i; + _recoveredMessageCounter.Increment(); + }); + + Command(i => + { + Persist(i, i1 => + { + _msgWriteCounter.Increment(); + TotalCount += i1; + TellTargetWhenReady(); + + }); + }); + + Command(n => + { + _target = n; + TellTargetWhenReady(); + }); + + Command(r => + { + Sender.Tell(r); + }); + } + + private void TellTargetWhenReady() + { + if (_target != null && _target.Target <= TotalCount) + { + _log.Info("Notifying that we have hit or exceeded requested target of [{0}] with actual target of [{1}]", _target.Target, TotalCount); + _target.Subscriber.Tell(TotalCount); + } + } + + public override string PersistenceId { get; } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests.Performance/UnitTest1.cs b/src/Akka.Persistence.Azure.Tests.Performance/UnitTest1.cs deleted file mode 100644 index 80276d4..0000000 --- a/src/Akka.Persistence.Azure.Tests.Performance/UnitTest1.cs +++ /dev/null @@ -1,31 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2015 - 2018 Petabridge, LLC -// -// ----------------------------------------------------------------------- - -using NBench; - -namespace Akka.Persistence.Azure.Tests.Performance -{ - public class UnitTest1 - { - public const string CounterName = "Operations"; - private Counter _opsCounter; - - [PerfSetup] - public void Setup(BenchmarkContext context) - { - _opsCounter = context.GetCounter(CounterName); - } - - [PerfBenchmark(NumberOfIterations = 5, RunMode = RunMode.Throughput, RunTimeMilliseconds = 1000)] - [CounterMeasurement(CounterName)] - [GcMeasurement(GcMetric.TotalCollections, GcGeneration.AllGc)] - [MemoryMeasurement(MemoryMetric.TotalBytesAllocated)] - public void TestMethod1() - { - _opsCounter.Increment(); - } - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs index 3502001..4cf4ed9 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs @@ -5,11 +5,14 @@ // ----------------------------------------------------------------------- using System; +using System.Threading.Tasks; +using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.TCK.Journal; using Akka.Util.Internal; using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Table; using Xunit; using Xunit.Abstractions; @@ -52,10 +55,7 @@ public static Config JournalConfig(string connectionString) protected override void Dispose(bool disposing) { base.Dispose(disposing); - var connectionString = AzurePersistence.Get(Sys).TableSettings.ConnectionString; - var account = CloudStorageAccount.Parse(connectionString); - var table = account.CreateCloudTableClient().GetTableReference(TableName); - if (table.DeleteIfExistsAsync().Wait(TimeSpan.FromSeconds(3))) + if (DbUtils.CleanupCloudTable(AzurePersistence.Get(Sys).TableSettings.ConnectionString, TableName).Wait(TimeSpan.FromSeconds(3))) { Log.Info("Successfully deleted table [{0}] after test run.", TableName); } From e84201ddc959c42a60d41299556899b6b9e69a43 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sun, 1 Jul 2018 23:52:51 -0500 Subject: [PATCH 03/19] upgraded to NBench v1.1 (#9) --- .../Akka.Persistence.Azure.Tests.Performance.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj b/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj index 003863b..6d698db 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj +++ b/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj @@ -8,7 +8,7 @@ - + From 4a0c5ae5ef4d019d5583c38304b1fce708b1eb75 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 2 Jul 2018 00:57:50 -0500 Subject: [PATCH 04/19] came up with a better way of creating load on the journal (#8) * came up with a better way of creating load on the jounrnal * break up the load generation into smaller chunks * working on redesigning the benchmark to work a bit more like the Sqlite one from Akka.NET * fixed config * more fixes to benchmarks * gave NBench tables a unique naming schem to help avoid conflicts per #10 * waiting indefinitely * cleaned up some of the recording code * pumped up the number of messages per actor --- .../AzureJournalPerfSpecs.cs | 63 +++++++++---------- .../PersistentBenchmarkMsgs.cs | 44 +++++++++---- .../PersistentJournalBenchmarkActor.cs | 57 +++++------------ .../AzureTableJournalSpec.cs | 1 + .../Journal/AzureTableStorageJournal.cs | 4 +- 5 files changed, 80 insertions(+), 89 deletions(-) diff --git a/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs index 69025ea..300a9f8 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs +++ b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs @@ -29,7 +29,7 @@ public class AzureJournalPerfSpecs public static string TableName { get; private set; } public const int PersistentActorCount = 200; - public const int PersistedMessageCount = 100; + public const int PersistedMessageCount = 20; public static Config JournalConfig() { @@ -41,21 +41,20 @@ public static Config JournalConfig() public static Config JournalConfig(string connectionString) { - TableName = "TestTable" + TableVersionCounter.IncrementAndGet(); - + TableName = "PerfTestTable" + TableVersionCounter.IncrementAndGet(); + return ConfigurationFactory.ParseString( - @"akka.loglevel = DEBUG - akka.log-config-on-start = on + @"akka.loglevel = INFO akka.persistence.journal.azure-table.class = ""Akka.Persistence.Azure.Journal.AzureTableStorageJournal, Akka.Persistence.Azure"" akka.persistence.journal.plugin = ""akka.persistence.journal.azure-table"" akka.persistence.journal.azure-table.connection-string=""" + connectionString + @""" - akka.persistence.journal.azure-table.verbose-logging = on") + akka.persistence.journal.azure-table.verbose-logging = off") .WithFallback("akka.persistence.journal.azure-table.table-name=" + TableName); } private ActorSystem ActorSystem { get; set; } - private Dictionary _persistentActors = new Dictionary(); + private List _persistentActors = new List(PersistentActorCount); [PerfSetup] public void Setup(BenchmarkContext context) @@ -63,57 +62,53 @@ public void Setup(BenchmarkContext context) _recoveryCounter = context.GetCounter(RecoveryCounterName); _writeCounter = context.GetCounter(WriteCounterName); - + ActorSystem = Actor.ActorSystem.Create(nameof(AzureJournalPerfSpecs) + TableVersionCounter.Current, JournalConfig()); - Console.WriteLine(ActorSystem.Settings.Config.ToString()); + foreach (var i in Enumerable.Range(0, PersistentActorCount)) { var id = "persistent" + Guid.NewGuid(); var actorRef = ActorSystem.ActorOf( - Props.Create(() => new PersistentJournalBenchmarkActor(id, _recoveryCounter, _writeCounter)), + Props.Create(() => new PersistentJournalBenchmarkActor(id)), id); - _persistentActors[id] = actorRef; + _persistentActors.Add(actorRef); } + + // force the system to initialize + Task.WaitAll(_persistentActors.Select(a => a.Ask(PersistentBenchmarkMsgs.Init.Instance)).Cast().ToArray()); } - [PerfBenchmark(NumberOfIterations = 5, RunMode = RunMode.Iterations, + [PerfBenchmark(NumberOfIterations = 5, RunMode = RunMode.Iterations, Description = "Write performance spec by 200 persistent actors", SkipWarmups = true)] [CounterMeasurement(RecoveryCounterName)] [CounterMeasurement(WriteCounterName)] [GcMeasurement(GcMetric.TotalCollections, GcGeneration.AllGc)] [MemoryMeasurement(MemoryMetric.TotalBytesAllocated)] + [TimingMeasurement] public void BatchJournalWriteSpec(BenchmarkContext context) { - foreach (var i in Enumerable.Range(0, PersistedMessageCount)) - { - foreach (var actor in _persistentActors) + for (int i = 0; i < PersistedMessageCount; i++) + for (int j = 0; j < PersistentActorCount; j++) { - actor.Value.Tell(i); + _persistentActors[j].Tell(new PersistentBenchmarkMsgs.Store(1)); } - } - using (var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1))) + var finished = new Task[PersistentActorCount]; + for (int i = 0; i < PersistentActorCount; i++) { - var tasks = new List(); - foreach (var actor in _persistentActors) - { - tasks.Add(actor.Value.Ask(r => - new PersistentBenchmarkMsgs.NotifyWhenCounterHits(PersistedMessageCount, r), null, cts.Token)); - } + var task = _persistentActors[i] + .Ask(PersistentBenchmarkMsgs.Finish.Instance); - try - { - Task.WaitAll(tasks.ToArray(), cts.Token); - } - catch(Exception ex) - { - context.Trace.Error(ex, "Failed to process results after 1 minute"); - return; - } + finished[i] = task; + } + + Task.WaitAll(finished.Cast().ToArray()); + foreach (var task in finished.Where(x => x.IsCompleted)) + { + _writeCounter.Increment(task.Result.State); } - } [PerfCleanup] diff --git a/src/Akka.Persistence.Azure.Tests.Performance/PersistentBenchmarkMsgs.cs b/src/Akka.Persistence.Azure.Tests.Performance/PersistentBenchmarkMsgs.cs index 1286880..8efb5c7 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/PersistentBenchmarkMsgs.cs +++ b/src/Akka.Persistence.Azure.Tests.Performance/PersistentBenchmarkMsgs.cs @@ -7,30 +7,50 @@ namespace Akka.Persistence.Azure.Tests.Performance /// public static class PersistentBenchmarkMsgs { - public sealed class RecoveryComplete : INoSerializationVerificationNeeded + public sealed class Init { - public static readonly RecoveryComplete Instance = new RecoveryComplete(); - private RecoveryComplete() { } + public static readonly Init Instance = new Init(); + private Init() { } } - public sealed class ResetCounter : INoSerializationVerificationNeeded + public sealed class Finish { - public static readonly ResetCounter Instance = new ResetCounter(); + public static readonly Finish Instance = new Finish(); + private Finish() { } + } + public sealed class Done + { + public static readonly Done Instance = new Done(); + private Done() { } + } + public sealed class Finished + { + public readonly long State; - private ResetCounter() { } + public Finished(long state) + { + State = state; + } } - public sealed class NotifyWhenCounterHits + public sealed class Store { - public NotifyWhenCounterHits(int target, IActorRef subscriber) + public readonly int Value; + + public Store(int value) { - Target = target; - Subscriber = subscriber; + Value = value; } + } - public int Target { get; } + public sealed class Stored + { + public readonly int Value; - public IActorRef Subscriber { get; } + public Stored(int value) + { + Value = value; + } } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests.Performance/PersistentJournalBenchmarkActor.cs b/src/Akka.Persistence.Azure.Tests.Performance/PersistentJournalBenchmarkActor.cs index 86043f5..bafa2cb 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/PersistentJournalBenchmarkActor.cs +++ b/src/Akka.Persistence.Azure.Tests.Performance/PersistentJournalBenchmarkActor.cs @@ -1,76 +1,53 @@ +using System.Linq; using Akka.Actor; using Akka.Event; using NBench; +using static Akka.Persistence.Azure.Tests.Performance.PersistentBenchmarkMsgs; namespace Akka.Persistence.Azure.Tests.Performance { public class PersistentJournalBenchmarkActor : ReceivePersistentActor { - private readonly Counter _recoveredMessageCounter; - private readonly Counter _msgWriteCounter; - private readonly ILoggingAdapter _log = Context.GetLogger(); - private PersistentBenchmarkMsgs.NotifyWhenCounterHits _target; - /// /// Our stored value /// private int TotalCount { get; set; } - public PersistentJournalBenchmarkActor(string persistenceId, Counter recoveredMessageCounter, Counter msgWriteCounter) + public PersistentJournalBenchmarkActor(string persistenceId) { PersistenceId = persistenceId; - _recoveredMessageCounter = recoveredMessageCounter; - _msgWriteCounter = msgWriteCounter; - - Recover(offer => - { - if (offer.Snapshot is int i) - { - TotalCount = i; - } - _recoveredMessageCounter.Increment(); - }); - Recover(i => + Recover(i => { - TotalCount += i; - _recoveredMessageCounter.Increment(); + TotalCount += i.Value; }); - Command(i => + Command(store => { - Persist(i, i1 => + Persist(new Stored(store.Value), s => { - _msgWriteCounter.Increment(); - TotalCount += i1; - TellTargetWhenReady(); - + TotalCount += s.Value; }); }); - Command(n => + Command(i => { - _target = n; - TellTargetWhenReady(); + var sender = Sender; + Persist(new Stored(0), s => + { + TotalCount += s.Value; + sender.Tell(PersistentBenchmarkMsgs.Done.Instance); + }); }); - Command(r => + Command(r => { - Sender.Tell(r); + Sender.Tell(new Finished(TotalCount)); }); } - private void TellTargetWhenReady() - { - if (_target != null && _target.Target <= TotalCount) - { - _log.Info("Notifying that we have hit or exceeded requested target of [{0}] with actual target of [{1}]", _target.Target, TotalCount); - _target.Subscriber.Tell(TotalCount); - } - } - public override string PersistenceId { get; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs index 4cf4ed9..1920787 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs @@ -45,6 +45,7 @@ public static Config JournalConfig(string connectionString) return ConfigurationFactory.ParseString( @"akka.loglevel = DEBUG + akka.log-config-on-start = off akka.persistence.journal.plugin = ""akka.persistence.journal.azure-table"" akka.persistence.journal.azure-table.connection-string=""" + connectionString + @""" akka.persistence.journal.azure-table.verbose-logging = on diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index 255590c..70fb348 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -226,9 +226,7 @@ protected override async Task> WriteMessagesAsync(IEnu try { - var results = await Table.ExecuteBatchAsync(batch, - new TableRequestOptions { MaximumExecutionTime = _settings.RequestTimeout }, - new OperationContext()); + var results = await Table.ExecuteBatchAsync(batch); if (_log.IsDebugEnabled && _settings.VerboseLogging) foreach (var r in results) From 274cf844855ed3277ac3871c76f493515cae8f34 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 6 Jul 2018 07:25:54 -0500 Subject: [PATCH 05/19] added ability to log outgoing batch size prior to write (#13) --- .../AzureJournalPerfSpecs.cs | 4 ++-- .../Journal/AzureTableStorageJournal.cs | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs index 300a9f8..08c738a 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs +++ b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs @@ -44,11 +44,11 @@ public static Config JournalConfig(string connectionString) TableName = "PerfTestTable" + TableVersionCounter.IncrementAndGet(); return ConfigurationFactory.ParseString( - @"akka.loglevel = INFO + @"akka.loglevel = DEBUG akka.persistence.journal.azure-table.class = ""Akka.Persistence.Azure.Journal.AzureTableStorageJournal, Akka.Persistence.Azure"" akka.persistence.journal.plugin = ""akka.persistence.journal.azure-table"" akka.persistence.journal.azure-table.connection-string=""" + connectionString + @""" - akka.persistence.journal.azure-table.verbose-logging = off") + akka.persistence.journal.azure-table.verbose-logging = on") .WithFallback("akka.persistence.journal.azure-table.table-name=" + TableName); } diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index 70fb348..6058d58 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -208,12 +208,9 @@ protected override async Task> WriteMessagesAsync(IEnu Debug.Assert(atomicWrites.Current != null, "atomicWrites.Current != null"); var batch = new TableBatchOperation(); - using (var persistentMsgs = atomicWrites.Current.Payload - .AsInstanceOf>().GetEnumerator()) + foreach(var currentMsg in atomicWrites.Current.Payload + .AsInstanceOf>()) { - while (persistentMsgs.MoveNext()) - { - var currentMsg = persistentMsgs.Current; Debug.Assert(currentMsg != null, nameof(currentMsg) + " != null"); @@ -221,11 +218,14 @@ protected override async Task> WriteMessagesAsync(IEnu new PersistentJournalEntry(currentMsg.PersistenceId, currentMsg.SequenceNr, _serialization.PersistentToBytes(currentMsg), currentMsg.Manifest)); - } + } try { + if (_log.IsDebugEnabled && _settings.VerboseLogging) + _log.Debug("Attempting to write batch of {0} messages to Azure storage", batch.Count); + var results = await Table.ExecuteBatchAsync(batch); if (_log.IsDebugEnabled && _settings.VerboseLogging) From 6d0e5bdc89b41a77e6a31d29bae9b1ecf18b2a31 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 11 Jul 2018 11:29:12 -0500 Subject: [PATCH 06/19] installed NBench v1.2.0 and am trying to execute it via the runner (#18) * installed NBench v1.2.0 and am trying to execute it via the runner * Added NBench v1.2.1 * Added timeouts to the Azure spec * added expontential backoff for starting Azure table storage journal --- build.fsx | 50 +++++++------------ build.ps1 | 15 ------ .../Akka.Persistence.Azure.TestHelpers.csproj | 2 +- ...Persistence.Azure.Tests.Performance.csproj | 5 +- .../AzureJournalPerfSpecs.cs | 9 ++-- .../Akka.Persistence.Azure.csproj | 8 +-- .../Journal/AzureTableStorageJournal.cs | 45 ++++++++++++----- 7 files changed, 63 insertions(+), 71 deletions(-) diff --git a/build.fsx b/build.fsx index 6b59050..debeb97 100644 --- a/build.fsx +++ b/build.fsx @@ -119,38 +119,26 @@ Target "RunTests" (fun _ -> ) Target "NBench" <| fun _ -> - let nbenchTestPath = findToolInSubPath "NBench.Runner.exe" (toolsDir @@ "NBench.Runner*") - printfn "Using NBench.Runner: %s" nbenchTestPath - - let nbenchTestAssemblies = !! "./src/Akka.Persistence.Azure.Tests.Performance/bin/Release/**/*Tests.Performance.dll" // doesn't support .NET Core at the moment - - let runNBench assembly = - let includes = getBuildParam "include" - let excludes = getBuildParam "exclude" - let teamcityStr = (getBuildParam "teamcity") - let enableTeamCity = - match teamcityStr with - | null -> false - | "" -> false - | _ -> bool.Parse teamcityStr - - let args = StringBuilder() - |> append assembly - |> append (sprintf "output-directory=\"%s\"" outputPerfTests) - |> append (sprintf "concurrent=\"%b\"" true) - |> append (sprintf "trace=\"%b\"" true) - |> append (sprintf "teamcity=\"%b\"" enableTeamCity) - |> appendIfNotNullOrEmpty includes "include=" - |> appendIfNotNullOrEmpty excludes "include=" - |> toText - - let result = ExecProcess(fun info -> - info.FileName <- nbenchTestPath - info.WorkingDirectory <- (Path.GetDirectoryName (FullName nbenchTestPath)) - info.Arguments <- args) (System.TimeSpan.FromMinutes 45.0) (* Reasonably long-running task. *) - if result <> 0 then failwithf "NBench.Runner failed. %s %s" nbenchTestPath args + let projects = + match (isWindows) with + | true -> !! "./src/**/*.Tests.Performance.csproj" + | _ -> !! "./src/**/*.Tests.Performance.csproj" // if you need to filter specs for Linux vs. Windows, do it here + + + let runSingleProject project = + let arguments = + match (hasTeamCity) with + | true -> (sprintf "nbench --nobuild --teamcity --concurrent true --trace true --output %s" (outputPerfTests)) + | false -> (sprintf "nbench --nobuild --concurrent true --trace true --output %s" (outputPerfTests)) + + let result = ExecProcess(fun info -> + info.FileName <- "dotnet" + info.WorkingDirectory <- (Directory.GetParent project).FullName + info.Arguments <- arguments) (TimeSpan.FromMinutes 30.0) + + ResultHandling.failBuildIfXUnitReportedError TestRunnerErrorLevel.DontFailBuild result - nbenchTestAssemblies |> Seq.iter runNBench + projects |> Seq.iter runSingleProject //-------------------------------------------------------------------------------- diff --git a/build.ps1 b/build.ps1 index f5a04cd..1d75c51 100644 --- a/build.ps1 +++ b/build.ps1 @@ -30,7 +30,6 @@ Param( ) $FakeVersion = "4.61.2" -$NBenchVersion = "1.0.4" $DotNetChannel = "LTS"; $DotNetVersion = "2.0.0"; $DotNetInstallerUri = "https://raw.githubusercontent.com/dotnet/cli/v$DotNetVersion/scripts/obtain/dotnet-install.ps1"; @@ -115,20 +114,6 @@ if (!(Test-Path $FakeExePath)) { } } -########################################################################### -# INSTALL NBench Runner -########################################################################### - -# Make sure NBench Runner has been installed. -$NBenchDllPath = Join-Path $ToolPath "NBench.Runner/lib/net45/NBench.Runner.exe" -if (!(Test-Path $NBenchDllPath)) { - Write-Host "Installing NBench..." - Invoke-Expression "&`"$NugetPath`" install NBench.Runner -ExcludeVersion -Version $NBenchVersion -OutputDirectory `"$ToolPath`"" | Out-Null; - if ($LASTEXITCODE -ne 0) { - Throw "An error occured while restoring NBench.Runner from NuGet." - } -} - ########################################################################### # Docfx ########################################################################### diff --git a/src/Akka.Persistence.Azure.TestHelpers/Akka.Persistence.Azure.TestHelpers.csproj b/src/Akka.Persistence.Azure.TestHelpers/Akka.Persistence.Azure.TestHelpers.csproj index 4ec53a5..95113b8 100644 --- a/src/Akka.Persistence.Azure.TestHelpers/Akka.Persistence.Azure.TestHelpers.csproj +++ b/src/Akka.Persistence.Azure.TestHelpers/Akka.Persistence.Azure.TestHelpers.csproj @@ -1,7 +1,7 @@  - netstandard2.0 + netstandard1.6 false diff --git a/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj b/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj index 6d698db..2ae3739 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj +++ b/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj @@ -3,12 +3,13 @@ - net461 + net461;netcoreapp2.0 - + + diff --git a/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs index 08c738a..e855e6f 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs +++ b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs @@ -31,6 +31,8 @@ public class AzureJournalPerfSpecs public const int PersistentActorCount = 200; public const int PersistedMessageCount = 20; + public static readonly TimeSpan MaxTimeout = TimeSpan.FromMinutes(6); + public static Config JournalConfig() { if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"))) @@ -44,11 +46,12 @@ public static Config JournalConfig(string connectionString) TableName = "PerfTestTable" + TableVersionCounter.IncrementAndGet(); return ConfigurationFactory.ParseString( - @"akka.loglevel = DEBUG + @"akka.loglevel = INFO + akka.persistence.max-concurrent-recoveries = 25 # since Azure seems to have some trouble at 50 akka.persistence.journal.azure-table.class = ""Akka.Persistence.Azure.Journal.AzureTableStorageJournal, Akka.Persistence.Azure"" akka.persistence.journal.plugin = ""akka.persistence.journal.azure-table"" akka.persistence.journal.azure-table.connection-string=""" + connectionString + @""" - akka.persistence.journal.azure-table.verbose-logging = on") + akka.persistence.journal.azure-table.verbose-logging = off") .WithFallback("akka.persistence.journal.azure-table.table-name=" + TableName); } @@ -99,7 +102,7 @@ public void BatchJournalWriteSpec(BenchmarkContext context) for (int i = 0; i < PersistentActorCount; i++) { var task = _persistentActors[i] - .Ask(PersistentBenchmarkMsgs.Finish.Instance); + .Ask(PersistentBenchmarkMsgs.Finish.Instance, MaxTimeout); finished[i] = task; } diff --git a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj index 8eaaf72..4d84f98 100644 --- a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj +++ b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj @@ -3,16 +3,10 @@ - netstandard2.0 + netstandard1.6 Akka.Persistence support for Windows Azure Table storage and Azure blob storage. - - - - - - diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index 6058d58..2d5120c 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -43,25 +43,46 @@ public AzureTableStorageJournal() _serialization = new SerializationHelper(Context.System); _storageAccount = CloudStorageAccount.Parse(_settings.ConnectionString); - _tableStorage = new Lazy(() => InitCloudStorage().Result); + _tableStorage = new Lazy(() => InitCloudStorage(5).Result); } public CloudTable Table => _tableStorage.Value; - private async Task InitCloudStorage() + private static readonly Dictionary RetryInterval = new Dictionary() { - var tableClient = _storageAccount.CreateCloudTableClient(); - var tableRef = tableClient.GetTableReference(_settings.TableName); - var op = new OperationContext(); - using (var cts = new CancellationTokenSource(_settings.ConnectTimeout)) + { 5, TimeSpan.FromMilliseconds(100) }, + { 4, TimeSpan.FromMilliseconds(500) }, + { 3, TimeSpan.FromMilliseconds(1000) }, + { 2, TimeSpan.FromMilliseconds(2000) }, + { 1, TimeSpan.FromMilliseconds(4000) }, + { 0, TimeSpan.FromMilliseconds(8000) }, + }; + + private async Task InitCloudStorage(int remainingTries) + { + try { - if (await tableRef.CreateIfNotExistsAsync(new TableRequestOptions(), op, cts.Token)) - _log.Info("Created Azure Cloud Table", _settings.TableName); - else - _log.Info("Successfully connected to existing table", _settings.TableName); - } + var tableClient = _storageAccount.CreateCloudTableClient(); + var tableRef = tableClient.GetTableReference(_settings.TableName); + var op = new OperationContext(); + using (var cts = new CancellationTokenSource(_settings.ConnectTimeout)) + { + if (await tableRef.CreateIfNotExistsAsync(new TableRequestOptions(), op, cts.Token)) + _log.Info("Created Azure Cloud Table", _settings.TableName); + else + _log.Info("Successfully connected to existing table", _settings.TableName); + } - return tableRef; + return tableRef; + } + catch (Exception ex) + { + _log.Error(ex, "[{0}] more tries to initialize table storage remaining...", remainingTries); + if (remainingTries == 0) + throw; + await Task.Delay(RetryInterval[remainingTries]); + return await InitCloudStorage(remainingTries - 1); + } } protected override void PreStart() From 365d8035f52d1cfdd5577b9856117043b9b7938f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 16 Jul 2018 11:01:49 -0500 Subject: [PATCH 07/19] Update README.md (#19) --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b25dd21..31e4493 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # Akka.Persistence.Azure -Update this readme file with your details. +**Currently not released** + +We're still in the process of developing a high-throughput journal and snapshot store system around Azure Table and Blob storage. We expect this to be released soon, however. ## Building this solution To run the build script associated with this solution, execute the following: @@ -49,4 +51,4 @@ If you add any new projects to the solution created with this template, be sure ``` -``` \ No newline at end of file +``` From f3e4952e6c659026591d532396052621ce8fffd5 Mon Sep 17 00:00:00 2001 From: Isaac Z <46451831+izavala@users.noreply.github.com> Date: Tue, 9 Apr 2019 14:28:27 -0500 Subject: [PATCH 08/19] Update Azure Pipeline (#23) --- build-system/README.md | 8 +++++ build-system/azure-pipeline.template.yaml | 43 +++++++++++++++++++++++ build-system/linux-pr-validation.yaml | 22 ++++++++++++ build-system/windows-pr-validation.yaml | 22 ++++++++++++ build-system/windows-release.yaml | 38 ++++++++++++++++++++ 5 files changed, 133 insertions(+) create mode 100644 build-system/README.md create mode 100644 build-system/azure-pipeline.template.yaml create mode 100644 build-system/linux-pr-validation.yaml create mode 100644 build-system/windows-pr-validation.yaml create mode 100644 build-system/windows-release.yaml diff --git a/build-system/README.md b/build-system/README.md new file mode 100644 index 0000000..699c2d4 --- /dev/null +++ b/build-system/README.md @@ -0,0 +1,8 @@ +# Azure Pipelines Build Files +These `.yaml` files are used by Windows Azure DevOps Pipelines to help execute the following types of builds: + +- Pull request validation on Linux (Mono / .NET Core) +- Pull request validation on Windows (.NET Framework / .NET Core) +- NuGet releases with automatic release notes posted to a Github Release repository. + +**NOTE**: you will need to change some of the pipeline variables inside the `windows-release.yaml` for your specific project and you will also want to create variable groups with your signing and NuGet push information. \ No newline at end of file diff --git a/build-system/azure-pipeline.template.yaml b/build-system/azure-pipeline.template.yaml new file mode 100644 index 0000000..6bd6870 --- /dev/null +++ b/build-system/azure-pipeline.template.yaml @@ -0,0 +1,43 @@ +parameters: + name: '' + vmImage: '' + scriptFileName: '' + scriptArgs: 'all' + timeoutInMinutes: 120 + +jobs: + - job: ${{ parameters.name }} + timeoutInMinutes: ${{ parameters.timeoutInMinutes }} + pool: + vmImage: ${{ parameters.vmImage }} + steps: + - checkout: self # self represents the repo where the initial Pipelines YAML file was found + clean: false # whether to fetch clean each time + submodules: recursive # set to 'true' for a single level of submodules or 'recursive' to get submodules of submodules + persistCredentials: true + # Linux or macOS + - task: Bash@3 + displayName: Linux / OSX Build + inputs: + filePath: ${{ parameters.scriptFileName }} + arguments: ${{ parameters.scriptArgs }} + continueOnError: true + condition: in( variables['Agent.OS'], 'Linux', 'Darwin' ) + # Windows + - task: BatchScript@1 + displayName: Windows Build + inputs: + filename: ${{ parameters.scriptFileName }} + arguments: ${{ parameters.scriptArgs }} + continueOnError: true + condition: eq( variables['Agent.OS'], 'Windows_NT' ) + - task: PublishTestResults@2 + inputs: + testRunner: VSTest + testResultsFiles: '**/*.trx' #TestResults folder usually + testRunTitle: ${{ parameters.name }} + mergeTestResults: true + - script: 'echo 1>&2' + failOnStderr: true + displayName: 'If above is partially succeeded, then fail' + condition: eq(variables['Agent.JobStatus'], 'SucceededWithIssues') \ No newline at end of file diff --git a/build-system/linux-pr-validation.yaml b/build-system/linux-pr-validation.yaml new file mode 100644 index 0000000..61f748e --- /dev/null +++ b/build-system/linux-pr-validation.yaml @@ -0,0 +1,22 @@ +# Pull request validation for Linux against the `dev` and `master` branches +# See https://docs.microsoft.com/en-us/azure/devops/pipelines/yaml-schema for reference +trigger: + branches: + include: + - dev + - master + +name: $(Year:yyyy).$(Month).$(DayOfMonth)$(Rev:.r) + +pr: + autoCancel: true # indicates whether additional pushes to a PR should cancel in-progress runs for the same PR. Defaults to true + branches: + include: [ dev, master ] # branch names which will trigger a build + +jobs: +- template: azure-pipeline.template.yaml + parameters: + name: Ubuntu + vmImage: 'ubuntu-16.04' + scriptFileName: ./build.sh + scriptArgs: all \ No newline at end of file diff --git a/build-system/windows-pr-validation.yaml b/build-system/windows-pr-validation.yaml new file mode 100644 index 0000000..47f6ea3 --- /dev/null +++ b/build-system/windows-pr-validation.yaml @@ -0,0 +1,22 @@ +# Pull request validation for Windows against the `dev` and `master` branches +# See https://docs.microsoft.com/en-us/azure/devops/pipelines/yaml-schema for reference +trigger: + branches: + include: + - dev + - master + +pr: + autoCancel: true # indicates whether additional pushes to a PR should cancel in-progress runs for the same PR. Defaults to true + branches: + include: [ dev, master ] # branch names which will trigger a build + +name: $(Year:yyyy).$(Month).$(DayOfMonth)$(Rev:.r) + +jobs: +- template: azure-pipeline.template.yaml + parameters: + name: Windows + vmImage: 'vs2017-win2016' + scriptFileName: build.cmd + scriptArgs: all \ No newline at end of file diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml new file mode 100644 index 0000000..d9cb531 --- /dev/null +++ b/build-system/windows-release.yaml @@ -0,0 +1,38 @@ +# Release task for PbLib projects +# See https://docs.microsoft.com/en-us/azure/devops/pipelines/yaml-schema for reference + +pool: + vmImage: vs2017-win2016 + demands: Cmd + +trigger: + branches: + include: + - refs/tags/* + +variables: + - group: signingSecrets #create this group with SECRET variables `signingUsername` and `signingPassword` + - group: nugetKeys #create this group with SECRET variables `nugetKey` + - name: githubConnectionName + value: yourConnection #replace this + - name: projectName + value: yourProjectName #replace this + - name: githubRepositoryName + value: yourOrganization/yourRepo #replace this + +steps: +- task: BatchScript@1 + displayName: 'FAKE Build' + inputs: + filename: build.cmd + arguments: 'All SignClientUser=$(signingUsername) SignClientSecret=$(signingPassword) nugetpublishurl=https://www.nuget.org/api/v2/package nugetkey=$(nugetKey)' + +- task: GitHubRelease@0 + displayName: 'GitHub release (create)' + inputs: + gitHubConnection: $(githubConnectionName) + repositoryName: $(githubRepositoryName) + title: '$(projectName) v$(Build.SourceBranchName)' + releaseNotesFile: 'RELEASE_NOTES.md' + assets: | + bin\nuget\*.nupkg \ No newline at end of file From ebd20c5cf25475cc7dc80084605ab3d3b72f0a4f Mon Sep 17 00:00:00 2001 From: Isaac Z <46451831+izavala@users.noreply.github.com> Date: Tue, 9 Apr 2019 16:48:58 -0500 Subject: [PATCH 09/19] Update_Azure_Connection_String (#24) * Test * Update secrete key --- build-system/README.md | 2 +- build-system/azure-pipeline.template.yaml | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/build-system/README.md b/build-system/README.md index 699c2d4..d6368f7 100644 --- a/build-system/README.md +++ b/build-system/README.md @@ -5,4 +5,4 @@ These `.yaml` files are used by Windows Azure DevOps Pipelines to help execute t - Pull request validation on Windows (.NET Framework / .NET Core) - NuGet releases with automatic release notes posted to a Github Release repository. -**NOTE**: you will need to change some of the pipeline variables inside the `windows-release.yaml` for your specific project and you will also want to create variable groups with your signing and NuGet push information. \ No newline at end of file +**NOTE**: you will need to change some of the pipeline variables inside the `windows-release.yaml` for your specific project and you will also want to create variable groups with your signing and NuGet push information. diff --git a/build-system/azure-pipeline.template.yaml b/build-system/azure-pipeline.template.yaml index 6bd6870..dd59d85 100644 --- a/build-system/azure-pipeline.template.yaml +++ b/build-system/azure-pipeline.template.yaml @@ -23,6 +23,8 @@ jobs: arguments: ${{ parameters.scriptArgs }} continueOnError: true condition: in( variables['Agent.OS'], 'Linux', 'Darwin' ) + env: + AZURE_CONNECTION_STR: $(azureConnectionString) # Windows - task: BatchScript@1 displayName: Windows Build @@ -31,6 +33,8 @@ jobs: arguments: ${{ parameters.scriptArgs }} continueOnError: true condition: eq( variables['Agent.OS'], 'Windows_NT' ) + env: + AZURE_CONNECTION_STR: $(azureConnectionString) - task: PublishTestResults@2 inputs: testRunner: VSTest From e46d10fc0a081c3d5cf3b7831885f71915dd67b2 Mon Sep 17 00:00:00 2001 From: Isaac Z <46451831+izavala@users.noreply.github.com> Date: Fri, 12 Apr 2019 15:54:48 -0500 Subject: [PATCH 10/19] Updated Build.fx (#26) * Updated Build.fx * Updated spacing * UPdate build file * Updated dotnet version * SDK version * Added signing package code * Added call to clean --- build.fsx | 91 ++++++++++++++++++++++++++++++++++++++++++++--------- build.ps1 | 4 +-- build.sh | 4 +-- global.json | 2 +- 4 files changed, 82 insertions(+), 19 deletions(-) diff --git a/build.fsx b/build.fsx index debeb97..61bab3f 100644 --- a/build.fsx +++ b/build.fsx @@ -18,11 +18,16 @@ let description = "Your description here" let tags = ["";] let configuration = "Release" +// Metadata used when signing packages and DLLs +let signingName = "Akka.Persistence.Azure" +let signingDescription = "Akka.Persistence.Azure library for use with Akka.NET." +let signingUrl = "https://devops.petabridge.com/" + // Read release notes and version let solutionFile = FindFirstMatchingFile "*.sln" __SOURCE_DIRECTORY__ // dynamically look up the solution let buildNumber = environVarOrDefault "BUILD_NUMBER" "0" let hasTeamCity = (not (buildNumber = "0")) // check if we have the TeamCity environment variable for build # set -let preReleaseVersionSuffix = (if (not (buildNumber = "0")) then (buildNumber) else "") + "-beta" +let preReleaseVersionSuffix = "beta" + (if (not (buildNumber = "0")) then (buildNumber) else DateTime.UtcNow.Ticks.ToString()) let versionSuffix = match (getBuildParam "nugetprerelease") with | "dev" -> preReleaseVersionSuffix @@ -45,6 +50,9 @@ let workingDir = output @@ "build" let nugetExe = FullName @"./tools/nuget.exe" Target "Clean" (fun _ -> + ActivateFinalTarget "KillCreatedProcesses" + + CleanDir output CleanDir outputTests CleanDir outputPerfTests @@ -57,13 +65,6 @@ Target "AssemblyInfo" (fun _ -> XmlPokeInnerText "./src/common.props" "//Project/PropertyGroup/PackageReleaseNotes" (releaseNotes.Notes |> String.concat "\n") ) -Target "RestorePackages" (fun _ -> - DotNetCli.Restore - (fun p -> - { p with - Project = solutionFile - NoCache = false }) -) Target "Build" (fun _ -> DotNetCli.Build @@ -104,8 +105,8 @@ Target "RunTests" (fun _ -> let runSingleProject project = let arguments = match (hasTeamCity) with - | true -> (sprintf "xunit -c Release -nobuild -parallel none -teamcity -xml %s_xunit.xml" (outputTests @@ fileNameWithoutExt project)) - | false -> (sprintf "xunit -c Release -nobuild -parallel none -xml %s_xunit.xml" (outputTests @@ fileNameWithoutExt project)) + | true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --results-directory %s -- -parallel none -teamcity" (outputTests)) + | false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --results-directory %s -- -parallel none" (outputTests)) let result = ExecProcess(fun info -> info.FileName <- "dotnet" @@ -140,6 +141,53 @@ Target "NBench" <| fun _ -> projects |> Seq.iter runSingleProject +//-------------------------------------------------------------------------------- +// Code signing targets +//-------------------------------------------------------------------------------- +Target "SignPackages" (fun _ -> + let canSign = hasBuildParam "SignClientSecret" && hasBuildParam "SignClientUser" + if(canSign) then + log "Signing information is available." + + let assemblies = !! (outputNuGet @@ "*.nupkg") + + let signPath = + let globalTool = tryFindFileOnPath "SignClient.exe" + match globalTool with + | Some t -> t + | None -> if isWindows then findToolInSubPath "SignClient.exe" "tools/signclient" + elif isMacOS then findToolInSubPath "SignClient" "tools/signclient" + else findToolInSubPath "SignClient" "tools/signclient" + + let signAssembly assembly = + let args = StringBuilder() + |> append "sign" + |> append "--config" + |> append (__SOURCE_DIRECTORY__ @@ "appsettings.json") + |> append "-i" + |> append assembly + |> append "-r" + |> append (getBuildParam "SignClientUser") + |> append "-s" + |> append (getBuildParam "SignClientSecret") + |> append "-n" + |> append signingName + |> append "-d" + |> append signingDescription + |> append "-u" + |> append signingUrl + |> toText + + let result = ExecProcess(fun info -> + info.FileName <- signPath + info.WorkingDirectory <- __SOURCE_DIRECTORY__ + info.Arguments <- args) (System.TimeSpan.FromMinutes 5.0) (* Reasonably long-running task. *) + if result <> 0 then failwithf "SignClient failed.%s" args + + assemblies |> Seq.iter (signAssembly) + else + log "SignClientSecret not available. Skipping signing" +) //-------------------------------------------------------------------------------- // Nuget targets @@ -209,6 +257,19 @@ Target "DocFx" (fun _ -> DocFxJson = docsPath @@ "docfx.json" }) ) +//-------------------------------------------------------------------------------- +// Cleanup +//-------------------------------------------------------------------------------- + +FinalTarget "KillCreatedProcesses" (fun _ -> + log "Shutting down dotnet build-server" + let result = ExecProcess(fun info -> + info.FileName <- "dotnet" + info.WorkingDirectory <- __SOURCE_DIRECTORY__ + info.Arguments <- "build-server shutdown") (System.TimeSpan.FromMinutes 2.0) + if result <> 0 then failwithf "dotnet build-server shutdown failed" +) + //-------------------------------------------------------------------------------- // Help //-------------------------------------------------------------------------------- @@ -221,6 +282,7 @@ Target "Help" <| fun _ -> " Targets for building:" " * Build Builds" " * Nuget Create and optionally publish nugets packages" + " * SignPackages Signs all NuGet packages, provided that the following arguments are passed into the script: SignClientSecret={secret} and SignClientUser={username}" " * RunTests Runs tests" " * All Builds, run tests, creates and optionally publish nuget packages" " * DocFx Creates a DocFx-based website for this solution" @@ -238,16 +300,17 @@ Target "All" DoNothing Target "Nuget" DoNothing // build dependencies -"Clean" ==> "RestorePackages" ==> "AssemblyInfo" ==> "Build" ==> "BuildRelease" +"Clean" ==> "AssemblyInfo" ==> "Build" ==> "BuildRelease" // tests dependencies +"Clean" ==> "Build" ==> "RunTests" // nuget dependencies -"Clean" ==> "RestorePackages" ==> "Build" ==> "CreateNuget" -"CreateNuget" ==> "PublishNuget" ==> "Nuget" +"Clean" ==> "Build" ==> "CreateNuget" +"CreateNuget" ==> "SignPackages" ==> "PublishNuget" ==> "Nuget" // docs -"BuildRelease" ==> "Docfx" +"Clean" ==> "BuildRelease" ==> "Docfx" // all "BuildRelease" ==> "All" diff --git a/build.ps1 b/build.ps1 index 1d75c51..69d6b00 100644 --- a/build.ps1 +++ b/build.ps1 @@ -31,12 +31,12 @@ Param( $FakeVersion = "4.61.2" $DotNetChannel = "LTS"; -$DotNetVersion = "2.0.0"; +$DotNetVersion = "2.1.500"; $DotNetInstallerUri = "https://raw.githubusercontent.com/dotnet/cli/v$DotNetVersion/scripts/obtain/dotnet-install.ps1"; $NugetVersion = "4.1.0"; $NugetUrl = "https://dist.nuget.org/win-x86-commandline/v$NugetVersion/nuget.exe" $ProtobufVersion = "3.2.0" -$DocfxVersion = "2.21.1" +$DocfxVersion = "2.40.5" # Make sure tools folder exists $PSScriptRoot = Split-Path $MyInvocation.MyCommand.Path -Parent diff --git a/build.sh b/build.sh index 8e70878..402525e 100644 --- a/build.sh +++ b/build.sh @@ -10,8 +10,8 @@ NUGET_EXE=$TOOLS_DIR/nuget.exe NUGET_URL=https://dist.nuget.org/win-x86-commandline/v4.0.0/nuget.exe FAKE_VERSION=4.61.2 FAKE_EXE=$TOOLS_DIR/FAKE/tools/FAKE.exe -DOTNET_VERSION=2.0.0 -DOTNET_INSTALLER_URL=https://raw.githubusercontent.com/dotnet/cli/2.0.0/scripts/obtain/dotnet-install.sh +DOTNET_VERSION=2.1.500 +DOTNET_INSTALLER_URL=https://raw.githubusercontent.com/dotnet/cli/v$DOTNET_VERSION/scripts/obtain/dotnet-install.sh # Define default arguments. TARGET="Default" CONFIGURATION="Release" diff --git a/global.json b/global.json index 4d61027..21e27ce 100644 --- a/global.json +++ b/global.json @@ -1,5 +1,5 @@ { "sdk": { - "version": "2.0.0" + "version": "2.1.500" } } \ No newline at end of file From 9444d834cd47a3f208aff560e62417511d38ad0e Mon Sep 17 00:00:00 2001 From: Daniel Tandberg Abrahamsen Date: Fri, 21 Jun 2019 11:29:33 +0200 Subject: [PATCH 11/19] Bugfix: When filter blob on timestamp, ticks should be equal or bigger than criteria's min timestamp (#27) --- src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs index 06a88de..eb3b3d6 100644 --- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs +++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs @@ -234,7 +234,7 @@ private static bool FilterBlobTimestamp(SnapshotSelectionCriteria criteria, Clou { var ticks = FetchBlobTimestamp(x); return ticks <= criteria.MaxTimeStamp.Ticks && - (!criteria.MinTimestamp.HasValue || criteria.MinTimestamp.Value.Ticks >= ticks); + (!criteria.MinTimestamp.HasValue || ticks >= criteria.MinTimestamp.Value.Ticks); } private static long FetchBlobTimestamp(CloudBlob x) From 67d42ec232652768201251ade5a1cfeaa24e91fd Mon Sep 17 00:00:00 2001 From: Daniel Tandberg Abrahamsen Date: Sat, 6 Jul 2019 00:41:48 +0200 Subject: [PATCH 12/19] Bugfix: ReadHighestSequenceNumber: Include results from the last request (when continuationToken is null) (#22) --- .../Journal/AzureTableStorageJournal.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index 2d5120c..3871337 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -187,16 +187,18 @@ public override async Task ReadHighestSequenceNrAsync(string persistenceId _log.Debug("Entering method ReadHighestSequenceNrAsync"); #endif var sequenceNumberQuery = GenerateHighestSequenceNumberQuery(persistenceId, fromSequenceNr); - var result = await Table.ExecuteQuerySegmentedAsync(sequenceNumberQuery, null); + TableQuerySegment result = null; long seqNo = 0L; do { + result = await Table.ExecuteQuerySegmentedAsync(sequenceNumberQuery, result?.ContinuationToken); + if (result.Results.Count > 0) + { seqNo = Math.Max(seqNo, result.Results.Max(x => x.SeqNo)); + } - if(result.ContinuationToken != null) - result = await Table.ExecuteQuerySegmentedAsync(sequenceNumberQuery, result.ContinuationToken); } while (result.ContinuationToken != null); #if DEBUG From a4ad0c783bcc77d7e9f1a91728236d927b2ecda4 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 5 Jul 2019 18:15:26 -0500 Subject: [PATCH 13/19] disable NBench specs (#34) --- build.fsx | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/build.fsx b/build.fsx index 61bab3f..ede477d 100644 --- a/build.fsx +++ b/build.fsx @@ -11,11 +11,6 @@ open Fake.DocFxHelper // Information about the project for Nuget and Assembly info files let product = "Akka.Persistence.Azure" -let authors = [ "Your name here" ] -let copyright = "Copyright © 2017" -let company = "Your name here" -let description = "Your description here" -let tags = ["";] let configuration = "Release" // Metadata used when signing packages and DLLs @@ -315,7 +310,7 @@ Target "Nuget" DoNothing // all "BuildRelease" ==> "All" "RunTests" ==> "All" -"NBench" ==> "All" +//"NBench" ==> "All" "Nuget" ==> "All" RunTargetOrDefault "Help" \ No newline at end of file From 9c3cea5decad0bffda4d2d82a8c786ec8422b4f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20F=C3=B8lstad?= <32196030+ef-computas@users.noreply.github.com> Date: Sat, 6 Jul 2019 01:20:50 +0200 Subject: [PATCH 14/19] Added validation for table and container names (#3) (#21) --- .../AzurePersistenceConfigSpec.cs | 33 +++++++++++++++++++ .../AzureTableStorageJournalSettings.cs | 13 ++++++++ .../AzureBlobSnapshotStoreSettings.cs | 2 ++ 3 files changed, 48 insertions(+) diff --git a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs index 4b33929..d2df43e 100644 --- a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs @@ -9,6 +9,7 @@ using Akka.Persistence.Azure.Journal; using Akka.Persistence.Azure.Snapshot; using FluentAssertions; +using FluentAssertions.Equivalency; using Xunit; namespace Akka.Persistence.Azure.Tests @@ -58,5 +59,37 @@ public void ShouldParseTableConfig() tableSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3)); tableSettings.VerboseLogging.Should().BeFalse(); } + + [Theory] + [InlineData("fo", "Invalid table name length")] + [InlineData("1foo", "Invalid table name")] + [InlineData("tables", "Reserved table name")] + public void ShouldThrowArgumentExceptionForIllegalTableNames(string tableName, string reason) + { + Action createJournalSettings = () => AzureTableStorageJournalSettings.Create( + ConfigurationFactory.ParseString(@"akka.persistence.journal.azure-table{ + connection-string = foo + table-name = " + tableName + @" + }").WithFallback(AzurePersistence.DefaultConfig) + .GetConfig("akka.persistence.journal.azure-table")); + + createJournalSettings.Should().Throw(reason); + } + + [Theory] + [InlineData("ba", "Invalid container name length")] + [InlineData("bar--table", "Invalid container name")] + public void ShouldThrowArgumentExceptionForIllegalContainerNames(string containerName, string reason) + { + Action createSnapshotSettings = () => + AzureBlobSnapshotStoreSettings.Create( + ConfigurationFactory.ParseString(@"akka.persistence.snapshot-store.azure-blob-store{ + connection-string = foo + container-name = " + containerName + @" + }").WithFallback(AzurePersistence.DefaultConfig) + .GetConfig("akka.persistence.snapshot-store.azure-blob-store")); + + createSnapshotSettings.Should().Throw(reason); + } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs index fcb1646..10e3ae4 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs @@ -5,7 +5,9 @@ // ----------------------------------------------------------------------- using System; +using System.Linq; using Akka.Configuration; +using Microsoft.WindowsAzure.Storage; namespace Akka.Persistence.Azure.Journal { @@ -14,9 +16,20 @@ namespace Akka.Persistence.Azure.Journal /// public sealed class AzureTableStorageJournalSettings { + + private static readonly string[] ReservedTableNames = {"tables"}; + public AzureTableStorageJournalSettings(string connectionString, string tableName, TimeSpan connectTimeout, TimeSpan requestTimeout, bool verboseLogging) { + NameValidator.ValidateTableName(tableName); + + if (ReservedTableNames.Contains(tableName)) + { + throw new ArgumentException( + "Reserved table name. Check MSDN for more information about valid table naming", nameof(tableName)); + } + ConnectionString = connectionString; TableName = tableName; ConnectTimeout = connectTimeout; diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs index 326d612..8cf2512 100644 --- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs +++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs @@ -6,6 +6,7 @@ using System; using Akka.Configuration; +using Microsoft.WindowsAzure.Storage; namespace Akka.Persistence.Azure.Snapshot { @@ -18,6 +19,7 @@ public sealed class AzureBlobSnapshotStoreSettings public AzureBlobSnapshotStoreSettings(string connectionString, string containerName, TimeSpan connectTimeout, TimeSpan requestTimeout, bool verboseLogging) { + NameValidator.ValidateContainerName(containerName); ConnectionString = connectionString; ContainerName = containerName; RequestTimeout = requestTimeout; From f205cb4db62adfdfbb0b2cd508db078f7f2ae18f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 12 Jul 2019 15:24:35 -0500 Subject: [PATCH 15/19] added windows azure release pipeline (#35) --- build-system/windows-release.yaml | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml index d9cb531..0bbb027 100644 --- a/build-system/windows-release.yaml +++ b/build-system/windows-release.yaml @@ -11,21 +11,22 @@ trigger: - refs/tags/* variables: - - group: signingSecrets #create this group with SECRET variables `signingUsername` and `signingPassword` - - group: nugetKeys #create this group with SECRET variables `nugetKey` + - group: Signing Keys #create this group with SECRET variables `signingUsername` and `signingPassword` + - group: NuGet Keys #create this group with SECRET variables `nugetKey` - name: githubConnectionName - value: yourConnection #replace this + value: petabridge #replace this - name: projectName - value: yourProjectName #replace this + value: Akka.Persistence.Azure #replace this - name: githubRepositoryName - value: yourOrganization/yourRepo #replace this + value: petabridge/Akka.Persistence.Azure #replace this steps: - task: BatchScript@1 displayName: 'FAKE Build' inputs: filename: build.cmd - arguments: 'All SignClientUser=$(signingUsername) SignClientSecret=$(signingPassword) nugetpublishurl=https://www.nuget.org/api/v2/package nugetkey=$(nugetKey)' + arguments: 'All SignClientUser=$(signingUsername) SignClientSecret=$(signingPassword) nugetpublishurl=https://www.nuget.org/api/v2/package nugetkey=$(Akka.NET Tools NuGet +)' - task: GitHubRelease@0 displayName: 'GitHub release (create)' From ff0d5c738d9b2d8f8775c0683a3f3e6462e2c9d5 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 12 Jul 2019 15:30:40 -0500 Subject: [PATCH 16/19] Update windows-release.yaml for Azure Pipelines --- build-system/windows-release.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml index 0bbb027..6df1594 100644 --- a/build-system/windows-release.yaml +++ b/build-system/windows-release.yaml @@ -14,7 +14,7 @@ variables: - group: Signing Keys #create this group with SECRET variables `signingUsername` and `signingPassword` - group: NuGet Keys #create this group with SECRET variables `nugetKey` - name: githubConnectionName - value: petabridge #replace this + value: Releases #replace this - name: projectName value: Akka.Persistence.Azure #replace this - name: githubRepositoryName From 3b8cefc6d57a1d58b29943afc7d3fb95d5e01027 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 12 Jul 2019 15:32:15 -0500 Subject: [PATCH 17/19] Update windows-release.yaml for Azure Pipelines --- build-system/windows-release.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml index 6df1594..0f40d97 100644 --- a/build-system/windows-release.yaml +++ b/build-system/windows-release.yaml @@ -25,8 +25,7 @@ steps: displayName: 'FAKE Build' inputs: filename: build.cmd - arguments: 'All SignClientUser=$(signingUsername) SignClientSecret=$(signingPassword) nugetpublishurl=https://www.nuget.org/api/v2/package nugetkey=$(Akka.NET Tools NuGet -)' + arguments: 'All SignClientUser=$(signingUsername) SignClientSecret=$(signingPassword) nugetpublishurl=https://www.nuget.org/api/v2/package nugetkey=$(Akka.NET Tools NuGet)' - task: GitHubRelease@0 displayName: 'GitHub release (create)' From 8f94dc8bc7a73e312d49f43629be4d2c9f86e162 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 12 Jul 2019 15:32:59 -0500 Subject: [PATCH 18/19] updated release notes (#36) --- RELEASE_NOTES.md | 4 ++-- src/common.props | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index a66b26a..2603334 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,2 +1,2 @@ -#### 0.1.0 August 14 2017 #### -First release \ No newline at end of file +#### 0.1.0 July 12 2019 #### +**Beta Release of Akka.Persistence.Azure** \ No newline at end of file diff --git a/src/common.props b/src/common.props index d4475b5..a06d731 100644 --- a/src/common.props +++ b/src/common.props @@ -3,7 +3,7 @@ Copyright © 2017 Your Company Your Authors 0.1.0 - First release + Beta Release of Akka.Persistence.Azure** From fcbbc0706f65bdbf54df3f29fb26966000f5cc23 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 12 Jul 2019 15:34:00 -0500 Subject: [PATCH 19/19] =?UTF-8?q?provided=20valid=20names=20for=20all=20Az?= =?UTF-8?q?ure=20containers=20and=20snapshot=20stores=20in=20=E2=80=A6=20(?= =?UTF-8?q?#32)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * provided valid names for all Azure containers and snapshot stores in all tests * fixing randomly generated names to be unique and in compliance with Azure standards * get rid of dashes from container names --- .../AzureBlobStorageSpec.cs | 16 ++++------ .../AzureStorageConfigHelper.cs | 29 +++++++++++++++++++ .../AzureTableJournalSpec.cs | 24 ++++----------- 3 files changed, 39 insertions(+), 30 deletions(-) create mode 100644 src/Akka.Persistence.Azure.Tests/AzureStorageConfigHelper.cs diff --git a/src/Akka.Persistence.Azure.Tests/AzureBlobStorageSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureBlobStorageSpec.cs index 2772fb6..9b8db3b 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureBlobStorageSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureBlobStorageSpec.cs @@ -10,32 +10,26 @@ using Akka.Persistence.TCK.Snapshot; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests { [Collection("AzureSnapshot")] public class AzureBlobStorageSpec : SnapshotStoreSpec { - public AzureBlobStorageSpec(ITestOutputHelper output) : base(SnapshotStoreConfig(), + public AzureBlobStorageSpec(ITestOutputHelper output) : base(Config(), nameof(AzureTableJournalSpec), output) { AzurePersistence.Get(Sys); Initialize(); } - public static Config SnapshotStoreConfig() + public static Config Config() { if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"))) - return SnapshotStoreConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")); + return AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")); - return SnapshotStoreConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - } - - public static Config SnapshotStoreConfig(string connectionString) - { - return ConfigurationFactory.ParseString( - @"akka.persistence.snapshot-store.azure-blob-store.connection-string=""+ connectionString +""") - .WithFallback("akka.persistence.snapshot-store.azure-blob-store.container-name=" + Guid.NewGuid()); + return AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/AzureStorageConfigHelper.cs b/src/Akka.Persistence.Azure.Tests/AzureStorageConfigHelper.cs new file mode 100644 index 0000000..d265b32 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/AzureStorageConfigHelper.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Akka.Configuration; + +namespace Akka.Persistence.Azure.Tests +{ + public static class AzureStorageConfigHelper + { + public static Config AzureConfig(string connectionString) + { + var tableName = "t" + Guid.NewGuid().ToString().Replace("-", ""); + var containerName = "testcontainer" + Guid.NewGuid(); + + return ConfigurationFactory.ParseString( + @"akka.loglevel = DEBUG + akka.log-config-on-start = off + akka.persistence.journal.plugin = ""akka.persistence.journal.azure-table"" + akka.persistence.journal.azure-table.connection-string=""" + connectionString + @""" + akka.persistence.snapshot-store.azure-blob-store.connection-string=""" + connectionString + @""" + akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.azure-blob-store"" + akka.persistence.journal.azure-table.verbose-logging = on + akka.test.single-expect-default = 3s") + .WithFallback("akka.persistence.journal.azure-table.table-name=" + tableName) + .WithFallback("akka.persistence.snapshot-store.azure-blob-store.container-name=" + containerName); + } + + } +} diff --git a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs index 1920787..213da45 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs @@ -15,42 +15,28 @@ using Microsoft.WindowsAzure.Storage.Table; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests { [Collection("AzureJournal")] public class AzureTableJournalSpec : JournalSpec { - public static AtomicCounter TableVersionCounter = new AtomicCounter(0); public static string TableName { get; private set; } - public AzureTableJournalSpec(ITestOutputHelper output) : base(JournalConfig(), nameof(AzureTableJournalSpec), + public AzureTableJournalSpec(ITestOutputHelper output) : base(Config(), nameof(AzureTableJournalSpec), output) { AzurePersistence.Get(Sys); Initialize(); } - public static Config JournalConfig() + public static Config Config() { if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"))) - return JournalConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")); + return AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")); - return JournalConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - } - - public static Config JournalConfig(string connectionString) - { - TableName = "TestTable" + TableVersionCounter.IncrementAndGet(); - - return ConfigurationFactory.ParseString( - @"akka.loglevel = DEBUG - akka.log-config-on-start = off - akka.persistence.journal.plugin = ""akka.persistence.journal.azure-table"" - akka.persistence.journal.azure-table.connection-string=""" + connectionString + @""" - akka.persistence.journal.azure-table.verbose-logging = on - akka.test.single-expect-default = 3s") - .WithFallback("akka.persistence.journal.azure-table.table-name=" + TableName); + return AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); } protected override void Dispose(bool disposing)