Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.6.1 Release #104

Merged
merged 9 commits into from
Jul 10, 2020
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@

Akka.Persistence implementation that uses Windows Azure table and blob storage.

## Configuration

Here is a default configuration used by this plugin: https://github.com/petabridge/Akka.Persistence.Azure/blob/dev/src/Akka.Persistence.Azure/reference.conf

You will need to provide connection string and Azure Table name for journal, and connection string with container name for Azure Blob Store:
```
# Need to enable plugin
akka.persistence.journal.plugin = akka.persistence.journal.azure-table
akka.persistence.snapshot-store.plugin = akka.persistence.snapshot-store.azure-blob-store

# Configure journal
akka.persistence.journal.azure-table.connection-string = "Your Azure Storage connection string"
akka.persistence.journal.azure-table.table-name = "Your table name"

# Configure snapshots
akka.persistence.snapshot-store.azure-blob-store.connection-string = "Your Azure Storage connection string"
akka.persistence.snapshot-store.azure-blob-store.container-name = "Your container name"
```

## Building this solution
To run the build script associated with this solution, execute the following:

Expand Down
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
#### 0.6.1 July 10 2020 ####
**Release of Akka.Persistence.Azure**

- Default configuration and documentation improvements
- Fixed Akka.Cluster.Sharding support (see https://github.com/petabridge/Akka.Persistence.Azure/issues/98)

#### 0.6.0 March 12 2020 ####
**Release of Akka.Persistence.Azure**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

<ItemGroup>
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="5.10.2" />
<PackageReference Include="FluentAssertions" Version="5.10.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
25 changes: 25 additions & 0 deletions src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ public void ShouldParseDefaultSnapshotConfig()
blobSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3));
blobSettings.VerboseLogging.Should().BeFalse();
}

[Fact]
public void ShouldProvideDefaultContainerNameValue()
{
var blobSettings =
AzureBlobSnapshotStoreSettings.Create(
ConfigurationFactory.ParseString(@"akka.persistence.snapshot-store.azure-blob-store{
connection-string = foo
}").WithFallback(AzurePersistence.DefaultConfig)
.GetConfig("akka.persistence.snapshot-store.azure-blob-store"));

blobSettings.ContainerName.Should().Be("akka-persistence-default-container");
}

[Fact]
public void ShouldParseTableConfig()
Expand All @@ -61,6 +74,18 @@ public void ShouldParseTableConfig()
tableSettings.VerboseLogging.Should().BeFalse();
}

[Fact]
public void ShouldProvideDefaultTableNameValue()
{
var tableSettings =
AzureTableStorageJournalSettings.Create(
ConfigurationFactory.ParseString(@"akka.persistence.journal.azure-table{
connection-string = foo
}").WithFallback(AzurePersistence.DefaultConfig)
.GetConfig("akka.persistence.journal.azure-table"));
tableSettings.TableName.Should().Be("AkkaPersistenceDefaultTable");
}

[Fact]
public void ShouldParseQueryConfig()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Reflection;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Azure.TestHelpers;
using Akka.Persistence.TCK;
using Akka.Persistence.TCK.Journal;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper;

namespace Akka.Persistence.Azure.Tests
{
public class AzureTableJournalEscapePersistentIdSpec : AzureTableJournalSpec, IClassFixture<WindowsAzureStorageEmulatorFixture>
{
public AzureTableJournalEscapePersistentIdSpec(ITestOutputHelper output) : base(output)
{
}

/// <inheritdoc />
protected override void PreparePersistenceId(string pid)
{
base.PreparePersistenceId(pid);

// Before storage is initialized, let's set Pid to the value that needs to be encoded
var persistenceIdUsedForTests = typeof(PluginSpec).GetField($"<{nameof(Pid)}>k__BackingField", BindingFlags.Instance | BindingFlags.NonPublic);
var currentValue = persistenceIdUsedForTests.GetValue(this).ToString();
persistenceIdUsedForTests.SetValue(this, $"some/path/to/encode/{currentValue}");
}
}
}
24 changes: 24 additions & 0 deletions src/Akka.Persistence.Azure.Tests/PartitionKeyEscapeHelperSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Akka.Actor;
using Akka.Persistence.Azure.Util;
using FluentAssertions;
using Xunit;

namespace Akka.Persistence.Azure.Tests
{
public class PartitionKeyEscapeHelperSpecs
{
[Theory]
[InlineData("/system/sharding/areaCoordinator/singleton/coordinator")]
[InlineData("/system/sha$rding/areaCoordinator/single$ton/coordinator$")]
[InlineData("/system/sha$$$rding/areaCoord/inator$$/single$ton/coord$inator$")]
public void Should_escape_correctly(string partitionKey)
{
var escapedKey = PartitionKeyEscapeHelper.Escape(partitionKey);
escapedKey.Should().NotContain("/");
var originalKey = PartitionKeyEscapeHelper.Unescape(escapedKey);
originalKey.Should().Be(partitionKey);
}
}

class A : ReceiveActor{ }
}
25 changes: 17 additions & 8 deletions src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public override async Task ReplayMessagesAsync(
new Persistent(
deserialized.Payload,
deserialized.SequenceNr,
deserialized.PersistenceId,
PartitionKeyEscapeHelper.Unescape(deserialized.PersistenceId),
deserialized.Manifest,
deserialized.IsDeleted,
ActorRefs.NoSender,
Expand Down Expand Up @@ -359,6 +359,12 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
x => batchItems = batchItems.Add(
new HighestSequenceNrEntry(x.Key, x.Value)));

// Encode partition keys for writing
foreach (var tableEntity in batchItems)
{
tableEntity.PartitionKey = PartitionKeyEscapeHelper.Escape(tableEntity.PartitionKey);
}

batchItems.ForEach(x => persistenceBatch.InsertOrReplace(x));

if (_log.IsDebugEnabled && _settings.VerboseLogging)
Expand All @@ -381,9 +387,11 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
{
var allPersistenceIdsBatch = new TableBatchOperation();

highSequenceNumbers.ForEach(
x => allPersistenceIdsBatch.InsertOrReplace(
new AllPersistenceIdsEntry(x.Key)));
highSequenceNumbers.ForEach(x =>
{
var encodedKey = PartitionKeyEscapeHelper.Escape(x.Key);
allPersistenceIdsBatch.InsertOrReplace(new AllPersistenceIdsEntry(encodedKey));
});

var allPersistenceResults = await Table.ExecuteBatchAsync(allPersistenceIdsBatch);

Expand All @@ -406,6 +414,7 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(

foreach (var item in kvp.Value)
{
item.PartitionKey = PartitionKeyEscapeHelper.Escape(item.PartitionKey);
eventTagsBatch.InsertOrReplace(item);
}

Expand Down Expand Up @@ -464,7 +473,7 @@ private static TableQuery<HighestSequenceNrEntry> GenerateHighestSequenceNumberQ
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
persistenceId),
PartitionKeyEscapeHelper.Escape(persistenceId)),
TableOperators.And,
TableQuery.GenerateFilterCondition(
"RowKey",
Expand All @@ -485,7 +494,7 @@ private static TableQuery<PersistentJournalEntry> GeneratePersistentJournalEntry
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
persistenceId);
PartitionKeyEscapeHelper.Escape(persistenceId));

var highestSequenceNrFilter =
TableQuery.GenerateFilterCondition(
Expand Down Expand Up @@ -577,7 +586,7 @@ private static TableQuery<PersistentJournalEntry> GeneratePersistentJournalEntry
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
persistentId);
PartitionKeyEscapeHelper.Escape(persistentId));

var highestSequenceNrFilter =
TableQuery.GenerateFilterCondition(
Expand Down Expand Up @@ -627,7 +636,7 @@ private static TableQuery<EventTagEntry> GenerateTaggedMessageQuery(
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
EventTagEntry.GetPartitionKey(replay.Tag));
PartitionKeyEscapeHelper.Escape(EventTagEntry.GetPartitionKey(replay.Tag)));

var utcTicksTRowKeyFilter =
TableQuery.CombineFilters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace Akka.Persistence.Azure.Journal
/// </summary>
public sealed class AzureTableStorageJournalSettings
{

private static readonly string[] ReservedTableNames = {"tables"};

public AzureTableStorageJournalSettings(
Expand Down Expand Up @@ -79,7 +78,6 @@ public static AzureTableStorageJournalSettings Create(Config config)
var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3));
var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3));
var verbose = config.GetBoolean("verbose-logging", false);

return new AzureTableStorageJournalSettings(
connectionString,
tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ public AzureBlobSnapshotStoreSettings(string connectionString, string containerN
public static AzureBlobSnapshotStoreSettings Create(Config config)
{
var connectionString = config.GetString("connection-string");
var containerName = config.GetString("container-name");
var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3));
var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3));
var verbose = config.GetBoolean("verbose-logging", false);
return new AzureBlobSnapshotStoreSettings(connectionString, containerName, connectTimeout, requestTimeout,
var containerName = config.GetString("container-name");
return new AzureBlobSnapshotStoreSettings(
connectionString,
containerName,
connectTimeout,
requestTimeout,
verbose);
}
}
Expand Down
60 changes: 60 additions & 0 deletions src/Akka.Persistence.Azure/Util/PartitionKeyEscapeHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;

namespace Akka.Persistence.Azure.Util
{
/// <summary>
/// PartitionKeyEscapeHelper
/// </summary>
public static class PartitionKeyEscapeHelper
{
/// <summary>
/// Sequence we need to escape
/// </summary>
private const string InvalidSequence = "/";
/// <summary>
/// Sequence we use to escape invalid chars
/// </summary>
/// <remarks>
/// Using $ here to resolve https://github.com/petabridge/Akka.Persistence.Azure/issues/98
/// Actor names never start with $ sign, which helps to decode encoded keys
/// </remarks>
private const string EscapeSequence = "$";

/// <summary>
/// Escape special characters in partition key
/// </summary>
/// <returns>Escaped partition key</returns>
public static string Escape(string partitionKey)
{
var escapedKey = partitionKey;
// First, replate escape sequence in case if it is used in original key
escapedKey = escapedKey.Replace(EscapeSequence, EscapeSequence + EscapeSequence);

// Now escape special chars
escapedKey = escapedKey.Replace(InvalidSequence, EscapeSequence);

return escapedKey;
}

/// <summary>
/// Unescape previously escaped partition key
/// </summary>
/// <returns>Original, unescaped partition key</returns>
public static string Unescape(string escapedKey)
{
var originalKey = escapedKey;
var uuid = Guid.NewGuid().ToString("N");
// Do not touch duplicated escape sequence - we will replace them later
originalKey = originalKey.Replace(EscapeSequence + EscapeSequence, uuid);

// Restore escaped characters
originalKey = originalKey.Replace(EscapeSequence, InvalidSequence);

// Now it is safe to decode duplicated sequences
originalKey = originalKey.Replace(uuid, EscapeSequence);

return originalKey;
}
}
}
Loading