Skip to content

Commit

Permalink
Merge pull request #104 from petabridge/dev
Browse files Browse the repository at this point in the history
v0.6.1 Release
  • Loading branch information
Aaronontheweb authored Jul 10, 2020
2 parents a8ba723 + dc609bf commit 420e9e4
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 55 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@

Akka.Persistence implementation that uses Windows Azure table and blob storage.

## Configuration

Here is a default configuration used by this plugin: https://github.com/petabridge/Akka.Persistence.Azure/blob/dev/src/Akka.Persistence.Azure/reference.conf

You will need to provide connection string and Azure Table name for journal, and connection string with container name for Azure Blob Store:
```
# Need to enable plugin
akka.persistence.journal.plugin = akka.persistence.journal.azure-table
akka.persistence.snapshot-store.plugin = akka.persistence.snapshot-store.azure-blob-store
# Configure journal
akka.persistence.journal.azure-table.connection-string = "Your Azure Storage connection string"
akka.persistence.journal.azure-table.table-name = "Your table name"
# Configure snapshots
akka.persistence.snapshot-store.azure-blob-store.connection-string = "Your Azure Storage connection string"
akka.persistence.snapshot-store.azure-blob-store.container-name = "Your container name"
```

## Building this solution
To run the build script associated with this solution, execute the following:

Expand Down
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
#### 0.6.1 July 10 2020 ####
**Release of Akka.Persistence.Azure**

- Default configuration and documentation improvements
- Fixed Akka.Cluster.Sharding support (see https://github.com/petabridge/Akka.Persistence.Azure/issues/98)

#### 0.6.0 March 12 2020 ####
**Release of Akka.Persistence.Azure**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

<ItemGroup>
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="5.10.2" />
<PackageReference Include="FluentAssertions" Version="5.10.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
25 changes: 25 additions & 0 deletions src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ public void ShouldParseDefaultSnapshotConfig()
blobSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3));
blobSettings.VerboseLogging.Should().BeFalse();
}

[Fact]
public void ShouldProvideDefaultContainerNameValue()
{
var blobSettings =
AzureBlobSnapshotStoreSettings.Create(
ConfigurationFactory.ParseString(@"akka.persistence.snapshot-store.azure-blob-store{
connection-string = foo
}").WithFallback(AzurePersistence.DefaultConfig)
.GetConfig("akka.persistence.snapshot-store.azure-blob-store"));

blobSettings.ContainerName.Should().Be("akka-persistence-default-container");
}

[Fact]
public void ShouldParseTableConfig()
Expand All @@ -61,6 +74,18 @@ public void ShouldParseTableConfig()
tableSettings.VerboseLogging.Should().BeFalse();
}

[Fact]
public void ShouldProvideDefaultTableNameValue()
{
var tableSettings =
AzureTableStorageJournalSettings.Create(
ConfigurationFactory.ParseString(@"akka.persistence.journal.azure-table{
connection-string = foo
}").WithFallback(AzurePersistence.DefaultConfig)
.GetConfig("akka.persistence.journal.azure-table"));
tableSettings.TableName.Should().Be("AkkaPersistenceDefaultTable");
}

[Fact]
public void ShouldParseQueryConfig()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Reflection;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Azure.TestHelpers;
using Akka.Persistence.TCK;
using Akka.Persistence.TCK.Journal;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper;

namespace Akka.Persistence.Azure.Tests
{
public class AzureTableJournalEscapePersistentIdSpec : AzureTableJournalSpec, IClassFixture<WindowsAzureStorageEmulatorFixture>
{
public AzureTableJournalEscapePersistentIdSpec(ITestOutputHelper output) : base(output)
{
}

/// <inheritdoc />
protected override void PreparePersistenceId(string pid)
{
base.PreparePersistenceId(pid);

// Before storage is initialized, let's set Pid to the value that needs to be encoded
var persistenceIdUsedForTests = typeof(PluginSpec).GetField($"<{nameof(Pid)}>k__BackingField", BindingFlags.Instance | BindingFlags.NonPublic);
var currentValue = persistenceIdUsedForTests.GetValue(this).ToString();
persistenceIdUsedForTests.SetValue(this, $"some/path/to/encode/{currentValue}");
}
}
}
24 changes: 24 additions & 0 deletions src/Akka.Persistence.Azure.Tests/PartitionKeyEscapeHelperSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Akka.Actor;
using Akka.Persistence.Azure.Util;
using FluentAssertions;
using Xunit;

namespace Akka.Persistence.Azure.Tests
{
public class PartitionKeyEscapeHelperSpecs
{
[Theory]
[InlineData("/system/sharding/areaCoordinator/singleton/coordinator")]
[InlineData("/system/sha$rding/areaCoordinator/single$ton/coordinator$")]
[InlineData("/system/sha$$$rding/areaCoord/inator$$/single$ton/coord$inator$")]
public void Should_escape_correctly(string partitionKey)
{
var escapedKey = PartitionKeyEscapeHelper.Escape(partitionKey);
escapedKey.Should().NotContain("/");
var originalKey = PartitionKeyEscapeHelper.Unescape(escapedKey);
originalKey.Should().Be(partitionKey);
}
}

class A : ReceiveActor{ }
}
25 changes: 17 additions & 8 deletions src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public override async Task ReplayMessagesAsync(
new Persistent(
deserialized.Payload,
deserialized.SequenceNr,
deserialized.PersistenceId,
PartitionKeyEscapeHelper.Unescape(deserialized.PersistenceId),
deserialized.Manifest,
deserialized.IsDeleted,
ActorRefs.NoSender,
Expand Down Expand Up @@ -359,6 +359,12 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
x => batchItems = batchItems.Add(
new HighestSequenceNrEntry(x.Key, x.Value)));

// Encode partition keys for writing
foreach (var tableEntity in batchItems)
{
tableEntity.PartitionKey = PartitionKeyEscapeHelper.Escape(tableEntity.PartitionKey);
}

batchItems.ForEach(x => persistenceBatch.InsertOrReplace(x));

if (_log.IsDebugEnabled && _settings.VerboseLogging)
Expand All @@ -381,9 +387,11 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
{
var allPersistenceIdsBatch = new TableBatchOperation();

highSequenceNumbers.ForEach(
x => allPersistenceIdsBatch.InsertOrReplace(
new AllPersistenceIdsEntry(x.Key)));
highSequenceNumbers.ForEach(x =>
{
var encodedKey = PartitionKeyEscapeHelper.Escape(x.Key);
allPersistenceIdsBatch.InsertOrReplace(new AllPersistenceIdsEntry(encodedKey));
});

var allPersistenceResults = await Table.ExecuteBatchAsync(allPersistenceIdsBatch);

Expand All @@ -406,6 +414,7 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(

foreach (var item in kvp.Value)
{
item.PartitionKey = PartitionKeyEscapeHelper.Escape(item.PartitionKey);
eventTagsBatch.InsertOrReplace(item);
}

Expand Down Expand Up @@ -464,7 +473,7 @@ private static TableQuery<HighestSequenceNrEntry> GenerateHighestSequenceNumberQ
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
persistenceId),
PartitionKeyEscapeHelper.Escape(persistenceId)),
TableOperators.And,
TableQuery.GenerateFilterCondition(
"RowKey",
Expand All @@ -485,7 +494,7 @@ private static TableQuery<PersistentJournalEntry> GeneratePersistentJournalEntry
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
persistenceId);
PartitionKeyEscapeHelper.Escape(persistenceId));

var highestSequenceNrFilter =
TableQuery.GenerateFilterCondition(
Expand Down Expand Up @@ -577,7 +586,7 @@ private static TableQuery<PersistentJournalEntry> GeneratePersistentJournalEntry
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
persistentId);
PartitionKeyEscapeHelper.Escape(persistentId));

var highestSequenceNrFilter =
TableQuery.GenerateFilterCondition(
Expand Down Expand Up @@ -627,7 +636,7 @@ private static TableQuery<EventTagEntry> GenerateTaggedMessageQuery(
TableQuery.GenerateFilterCondition(
"PartitionKey",
QueryComparisons.Equal,
EventTagEntry.GetPartitionKey(replay.Tag));
PartitionKeyEscapeHelper.Escape(EventTagEntry.GetPartitionKey(replay.Tag)));

var utcTicksTRowKeyFilter =
TableQuery.CombineFilters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace Akka.Persistence.Azure.Journal
/// </summary>
public sealed class AzureTableStorageJournalSettings
{

private static readonly string[] ReservedTableNames = {"tables"};

public AzureTableStorageJournalSettings(
Expand Down Expand Up @@ -79,7 +78,6 @@ public static AzureTableStorageJournalSettings Create(Config config)
var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3));
var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3));
var verbose = config.GetBoolean("verbose-logging", false);

return new AzureTableStorageJournalSettings(
connectionString,
tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ public AzureBlobSnapshotStoreSettings(string connectionString, string containerN
public static AzureBlobSnapshotStoreSettings Create(Config config)
{
var connectionString = config.GetString("connection-string");
var containerName = config.GetString("container-name");
var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3));
var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3));
var verbose = config.GetBoolean("verbose-logging", false);
return new AzureBlobSnapshotStoreSettings(connectionString, containerName, connectTimeout, requestTimeout,
var containerName = config.GetString("container-name");
return new AzureBlobSnapshotStoreSettings(
connectionString,
containerName,
connectTimeout,
requestTimeout,
verbose);
}
}
Expand Down
60 changes: 60 additions & 0 deletions src/Akka.Persistence.Azure/Util/PartitionKeyEscapeHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;

namespace Akka.Persistence.Azure.Util
{
/// <summary>
/// PartitionKeyEscapeHelper
/// </summary>
public static class PartitionKeyEscapeHelper
{
/// <summary>
/// Sequence we need to escape
/// </summary>
private const string InvalidSequence = "/";
/// <summary>
/// Sequence we use to escape invalid chars
/// </summary>
/// <remarks>
/// Using $ here to resolve https://github.com/petabridge/Akka.Persistence.Azure/issues/98
/// Actor names never start with $ sign, which helps to decode encoded keys
/// </remarks>
private const string EscapeSequence = "$";

/// <summary>
/// Escape special characters in partition key
/// </summary>
/// <returns>Escaped partition key</returns>
public static string Escape(string partitionKey)
{
var escapedKey = partitionKey;
// First, replate escape sequence in case if it is used in original key
escapedKey = escapedKey.Replace(EscapeSequence, EscapeSequence + EscapeSequence);

// Now escape special chars
escapedKey = escapedKey.Replace(InvalidSequence, EscapeSequence);

return escapedKey;
}

/// <summary>
/// Unescape previously escaped partition key
/// </summary>
/// <returns>Original, unescaped partition key</returns>
public static string Unescape(string escapedKey)
{
var originalKey = escapedKey;
var uuid = Guid.NewGuid().ToString("N");
// Do not touch duplicated escape sequence - we will replace them later
originalKey = originalKey.Replace(EscapeSequence + EscapeSequence, uuid);

// Restore escaped characters
originalKey = originalKey.Replace(EscapeSequence, InvalidSequence);

// Now it is safe to decode duplicated sequences
originalKey = originalKey.Replace(uuid, EscapeSequence);

return originalKey;
}
}
}
Loading

0 comments on commit 420e9e4

Please sign in to comment.