Skip to content

Commit

Permalink
Merge branch 'dev' into #108_Azure_does_not_work_with_Akka.Cluster.Sh…
Browse files Browse the repository at this point in the history
…arding

# Conflicts:
#	src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs
#	src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs
#	src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs
#	src/Akka.Persistence.Azure/reference.conf
  • Loading branch information
Arkatufus committed Jul 16, 2020
2 parents 5905773 + 6fc7fad commit 9047a51
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 75 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@

Akka.Persistence implementation that uses Windows Azure table and blob storage.

## Configuration

Here is a default configuration used by this plugin: https://github.com/petabridge/Akka.Persistence.Azure/blob/dev/src/Akka.Persistence.Azure/reference.conf

You will need to provide connection string and Azure Table name for journal, and connection string with container name for Azure Blob Store:
```
# Need to enable plugin
akka.persistence.journal.plugin = akka.persistence.journal.azure-table
akka.persistence.snapshot-store.plugin = akka.persistence.snapshot-store.azure-blob-store
# Configure journal
akka.persistence.journal.azure-table.connection-string = "Your Azure Storage connection string"
akka.persistence.journal.azure-table.table-name = "Your table name"
# Configure snapshots
akka.persistence.snapshot-store.azure-blob-store.connection-string = "Your Azure Storage connection string"
akka.persistence.snapshot-store.azure-blob-store.container-name = "Your container name"
```

## Building this solution
To run the build script associated with this solution, execute the following:

Expand Down
14 changes: 7 additions & 7 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#### 0.6.0-rc2 March 10 2020 ####
**Beta Release of Akka.Persistence.Azure**
#### 0.6.1 July 10 2020 ####
**Release of Akka.Persistence.Azure**

Upgraded Akka.Persistence.Azure v0.5.0 to target [the new Akka.NET v1.4 interfaces](https://getakka.net/community/whats-new/akkadotnet-v1.4.html).
- Default configuration and documentation improvements
- Fixed Akka.Cluster.Sharding support (see https://github.com/petabridge/Akka.Persistence.Azure/issues/98)

----
#### 0.6.0 March 12 2020 ####
**Release of Akka.Persistence.Azure**

Akka.Persistence.Azure v0.5.0 is a major leap forward ahead of v0.1.0. It fully implements Akka.Persistence.Query and fully implements Akka.NET v1.4.0-compatible serialization techniques.

There are still some issues with respect to ordering and result sets from Akka.Persistence.Query and those will be addressed in a future release of Akka.Persistence.Azure.
Updates Akka version to 1.4.1
2 changes: 1 addition & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Target "RunTests" (fun _ ->
info.WorkingDirectory <- (Directory.GetParent project).FullName
info.Arguments <- arguments) (TimeSpan.FromMinutes 30.0)

ResultHandling.failBuildIfXUnitReportedError TestRunnerErrorLevel.DontFailBuild result
ResultHandling.failBuildIfXUnitReportedError TestRunnerErrorLevel.Error result

projects |> Seq.iter (log)
projects |> Seq.iter (runSingleProject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

<ItemGroup>
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="5.10.2" />
<PackageReference Include="FluentAssertions" Version="5.10.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
25 changes: 25 additions & 0 deletions src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ public void ShouldParseDefaultSnapshotConfig()
blobSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3));
blobSettings.VerboseLogging.Should().BeFalse();
}

[Fact]
public void ShouldProvideDefaultContainerNameValue()
{
var blobSettings =
AzureBlobSnapshotStoreSettings.Create(
ConfigurationFactory.ParseString(@"akka.persistence.snapshot-store.azure-blob-store{
connection-string = foo
}").WithFallback(AzurePersistence.DefaultConfig)
.GetConfig("akka.persistence.snapshot-store.azure-blob-store"));

blobSettings.ContainerName.Should().Be("akka-persistence-default-container");
}

[Fact]
public void ShouldParseTableConfig()
Expand All @@ -61,6 +74,18 @@ public void ShouldParseTableConfig()
tableSettings.VerboseLogging.Should().BeFalse();
}

[Fact]
public void ShouldProvideDefaultTableNameValue()
{
var tableSettings =
AzureTableStorageJournalSettings.Create(
ConfigurationFactory.ParseString(@"akka.persistence.journal.azure-table{
connection-string = foo
}").WithFallback(AzurePersistence.DefaultConfig)
.GetConfig("akka.persistence.journal.azure-table"));
tableSettings.TableName.Should().Be("AkkaPersistenceDefaultTable");
}

[Theory]
[InlineData("fo", "Invalid table name length")]
[InlineData("1foo", "Invalid table name")]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Reflection;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Azure.TestHelpers;
using Akka.Persistence.TCK;
using Akka.Persistence.TCK.Journal;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper;

namespace Akka.Persistence.Azure.Tests
{
public class AzureTableJournalEscapePersistentIdSpec : AzureTableJournalSpec, IClassFixture<WindowsAzureStorageEmulatorFixture>
{
public AzureTableJournalEscapePersistentIdSpec(ITestOutputHelper output) : base(output)
{
}

/// <inheritdoc />
protected override void PreparePersistenceId(string pid)
{
base.PreparePersistenceId(pid);

// Before storage is initialized, let's set Pid to the value that needs to be encoded
var persistenceIdUsedForTests = typeof(PluginSpec).GetField($"<{nameof(Pid)}>k__BackingField", BindingFlags.Instance | BindingFlags.NonPublic);
var currentValue = persistenceIdUsedForTests.GetValue(this).ToString();
persistenceIdUsedForTests.SetValue(this, $"some/path/to/encode/{currentValue}");
}
}
}
24 changes: 24 additions & 0 deletions src/Akka.Persistence.Azure.Tests/PartitionKeyEscapeHelperSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Akka.Actor;
using Akka.Persistence.Azure.Util;
using FluentAssertions;
using Xunit;

namespace Akka.Persistence.Azure.Tests
{
public class PartitionKeyEscapeHelperSpecs
{
[Theory]
[InlineData("/system/sharding/areaCoordinator/singleton/coordinator")]
[InlineData("/system/sha$rding/areaCoordinator/single$ton/coordinator$")]
[InlineData("/system/sha$$$rding/areaCoord/inator$$/single$ton/coord$inator$")]
public void Should_escape_correctly(string partitionKey)
{
var escapedKey = PartitionKeyEscapeHelper.Escape(partitionKey);
escapedKey.Should().NotContain("/");
var originalKey = PartitionKeyEscapeHelper.Unescape(escapedKey);
originalKey.Should().Be(partitionKey);
}
}

class A : ReceiveActor{ }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public AzureTableQueryEdgeCaseSpecs(ITestOutputHelper output)
/// <summary>
/// Reproduction spec for https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/61
/// </summary>
[Fact]
[Fact(Skip = "Need to fix this in https://github.com/petabridge/Akka.Persistence.Azure/issues/107")]
public async Task Bug61_Events_Recovered_By_Id_Should_Match_Tag()
{
var actor = Sys.ActorOf(TagActor.Props("x"));
Expand All @@ -67,7 +67,7 @@ public async Task Bug61_Events_Recovered_By_Id_Should_Match_Tag()
var eventsByTag = await ReadJournal.CurrentEventsByTag(typeof(RealMsg).Name)
.RunAggregate(ImmutableHashSet<EventEnvelope>.Empty, (agg, e) => agg.Add(e), Materializer);

eventsByTag.Count.Should().Be(MessageCount);
eventsByTag.Count.Should().Be(MessageCount, "All events should be loaded by tag");

eventsById.All(x => x.Event is RealMsg).Should().BeTrue("Expected all events by id to be RealMsg");
eventsByTag.All(x => x.Event is RealMsg).Should().BeTrue("Expected all events by tag to be RealMsg");
Expand All @@ -76,7 +76,7 @@ public async Task Bug61_Events_Recovered_By_Id_Should_Match_Tag()
/// <summary>
/// Reproduction spec for https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80
/// </summary>
[Fact]
[Fact(Skip = "Need to fix this in https://github.com/petabridge/Akka.Persistence.Azure/issues/107")]
public void Bug80_CurrentEventsByTag_should_Recover_until_end()
{
var actor = Sys.ActorOf(TagActor.Props("y"));
Expand All @@ -93,7 +93,7 @@ public void Bug80_CurrentEventsByTag_should_Recover_until_end()
/// <summary>
/// Making sure EventsByTag didn't break during implementation of https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80
/// </summary>
[Fact]
[Fact(Skip = "Need to fix this in https://github.com/petabridge/Akka.Persistence.Azure/issues/107")]
public void Bug80_AllEventsByTag_should_Recover_all_messages()
{
var actor = Sys.ActorOf(TagActor.Props("y"));
Expand Down
25 changes: 17 additions & 8 deletions src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public override async Task ReplayMessagesAsync(
new Persistent(
deserialized.Payload,
deserialized.SequenceNr,
deserialized.PersistenceId,
PartitionKeyEscapeHelper.Unescape(deserialized.PersistenceId),
deserialized.Manifest,
deserialized.IsDeleted,
ActorRefs.NoSender,
Expand Down Expand Up @@ -366,6 +366,12 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
x => batchItems = batchItems.Add(
new HighestSequenceNrEntry(x.Key, x.Value)));

// Encode partition keys for writing
foreach (var tableEntity in batchItems)
{
tableEntity.PartitionKey = PartitionKeyEscapeHelper.Escape(tableEntity.PartitionKey);
}

batchItems.ForEach(x => persistenceBatch.InsertOrReplace(x));

if (_log.IsDebugEnabled && _settings.VerboseLogging)
Expand All @@ -388,9 +394,11 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
{
var allPersistenceIdsBatch = new TableBatchOperation();

highSequenceNumbers.ForEach(
x => allPersistenceIdsBatch.InsertOrReplace(
new AllPersistenceIdsEntry(x.Key)));
highSequenceNumbers.ForEach(x =>
{
var encodedKey = PartitionKeyEscapeHelper.Escape(x.Key);
allPersistenceIdsBatch.InsertOrReplace(new AllPersistenceIdsEntry(encodedKey));
});

var allPersistenceResults = await Table.ExecuteBatchAsync(allPersistenceIdsBatch);

Expand All @@ -413,6 +421,7 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(

foreach (var item in kvp.Value)
{
item.PartitionKey = PartitionKeyEscapeHelper.Escape(item.PartitionKey);
eventTagsBatch.InsertOrReplace(item);
}

Expand Down Expand Up @@ -471,7 +480,7 @@ private static TableQuery<HighestSequenceNrEntry> GenerateHighestSequenceNumberQ
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
persistenceId),
PartitionKeyEscapeHelper.Escape(persistenceId)),
TableOperators.And,
TableQuery.GenerateFilterCondition(
"RowKey",
Expand All @@ -492,7 +501,7 @@ private static TableQuery<PersistentJournalEntry> GeneratePersistentJournalEntry
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
persistenceId);
PartitionKeyEscapeHelper.Escape(persistenceId));

var highestSequenceNrFilter =
TableQuery.GenerateFilterCondition(
Expand Down Expand Up @@ -584,7 +593,7 @@ private static TableQuery<PersistentJournalEntry> GeneratePersistentJournalEntry
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
persistentId);
PartitionKeyEscapeHelper.Escape(persistentId));

var highestSequenceNrFilter =
TableQuery.GenerateFilterCondition(
Expand Down Expand Up @@ -634,7 +643,7 @@ private static TableQuery<EventTagEntry> GenerateTaggedMessageQuery(
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
EventTagEntry.GetPartitionKey(replay.Tag));
PartitionKeyEscapeHelper.Escape(EventTagEntry.GetPartitionKey(replay.Tag)));

var utcTicksTRowKeyFilter =
TableQuery.CombineFilters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace Akka.Persistence.Azure.Journal
/// </summary>
public sealed class AzureTableStorageJournalSettings
{

private static readonly string[] ReservedTableNames = {"tables"};

public AzureTableStorageJournalSettings(
Expand Down
60 changes: 60 additions & 0 deletions src/Akka.Persistence.Azure/Util/PartitionKeyEscapeHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;

namespace Akka.Persistence.Azure.Util
{
/// <summary>
/// PartitionKeyEscapeHelper
/// </summary>
public static class PartitionKeyEscapeHelper
{
/// <summary>
/// Sequence we need to escape
/// </summary>
private const string InvalidSequence = "/";
/// <summary>
/// Sequence we use to escape invalid chars
/// </summary>
/// <remarks>
/// Using $ here to resolve https://github.com/petabridge/Akka.Persistence.Azure/issues/98
/// Actor names never start with $ sign, which helps to decode encoded keys
/// </remarks>
private const string EscapeSequence = "$";

/// <summary>
/// Escape special characters in partition key
/// </summary>
/// <returns>Escaped partition key</returns>
public static string Escape(string partitionKey)
{
var escapedKey = partitionKey;
// First, replate escape sequence in case if it is used in original key
escapedKey = escapedKey.Replace(EscapeSequence, EscapeSequence + EscapeSequence);

// Now escape special chars
escapedKey = escapedKey.Replace(InvalidSequence, EscapeSequence);

return escapedKey;
}

/// <summary>
/// Unescape previously escaped partition key
/// </summary>
/// <returns>Original, unescaped partition key</returns>
public static string Unescape(string escapedKey)
{
var originalKey = escapedKey;
var uuid = Guid.NewGuid().ToString("N");
// Do not touch duplicated escape sequence - we will replace them later
originalKey = originalKey.Replace(EscapeSequence + EscapeSequence, uuid);

// Restore escaped characters
originalKey = originalKey.Replace(EscapeSequence, InvalidSequence);

// Now it is safe to decode duplicated sequences
originalKey = originalKey.Replace(uuid, EscapeSequence);

return originalKey;
}
}
}
Loading

0 comments on commit 9047a51

Please sign in to comment.