Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Persistence.Azure v0.8.0 Release #158

Merged
merged 21 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
114f346
Problem with implimentation of akka persistence contract (#143)
MattiasJakobsson Apr 3, 2021
c58a2a2
Limiting batch size for table storage (#145)
MattiasJakobsson Apr 3, 2021
ad76518
Bump Microsoft.NET.Test.Sdk from 16.8.3 to 16.9.4 (#149)
dependabot-preview[bot] Apr 3, 2021
8b318c6
Added settings for auto-initialize. Fixes #144
MattiasJakobsson Apr 6, 2021
1a0f8d6
Merge pull request #150 from MattiasJakobsson/add-options-for-auto-in…
Arkatufus Apr 9, 2021
ddf9ca3
Upgrade WindowsAzure.Storage to Microsoft.Azure.Cosmos.Table and Asur…
Arkatufus Apr 13, 2021
f5508ff
Bump FluentAssertions from 4.14.0 to 5.10.3
dependabot-preview[bot] Apr 13, 2021
a4664a9
Added support to configure blob container public access level
MattiasJakobsson Apr 13, 2021
c8ae12e
Update changes to FluentAssertions API
Arkatufus Apr 13, 2021
8e6e8e6
Remove reference to FluentAssertions, update FluentAssertions API cha…
Arkatufus Apr 13, 2021
b09655b
Merge pull request #139 from petabridge/dependabot/nuget/FluentAssert…
Arkatufus Apr 13, 2021
537c37e
Bump AkkaVersion from 1.4.14 to 1.4.18
dependabot-preview[bot] Apr 13, 2021
09c1ad0
Update FluentAssertions API call
Arkatufus Apr 13, 2021
3e822c9
Merge pull request #148 from petabridge/dependabot/nuget/AkkaVersion-…
Arkatufus Apr 13, 2021
907c1d4
Merge branch 'dev' into configure-container-access
Arkatufus Apr 13, 2021
6f8e4cc
Merge pull request #152 from MattiasJakobsson/configure-container-access
Arkatufus Apr 13, 2021
827f502
Clean up code. Add spec and backward compatibility. (#153)
Arkatufus Apr 13, 2021
9c79c55
Change the default public access type of auto-init containers to None…
Arkatufus Apr 13, 2021
e0f14d0
Add DefaultAzureIdentity support (#155)
Arkatufus Apr 15, 2021
1ea31d6
Improve README documentation (#156)
Arkatufus Apr 16, 2021
9b5559f
Update RELEASE_NOTES.md for 0.8.0 release (#157)
Arkatufus Apr 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,49 @@ akka.persistence.snapshot-store.azure-blob-store.connection-string = "Your Azure
akka.persistence.snapshot-store.azure-blob-store.container-name = "Your container name"
```

### Local development mode
You can turn local development mode by changing these two settings:
```
akka.persistence.journal.azure-table.development = on
akka.persistence.snapshot-store.azure-blob-store.development = on
```
When set, the plugin will ignore the `connection-string` setting and uses the Azure Storage Emulator default connection string of "UseDevelopmentStorage=true" instead.

### Configuring snapshots Blob Storage

#### Auto-initialize blob container

Blob container auto-initialize behaviour can be changed by changing this flag setting:
```
# Creates the required container if set
akka.persistence.snapshot-store.azure-blob-store.auto-initialize = on
```

#### Container public access type

Auto-initialized blob container public access type can be controlled by changing this setting:
```
# Public access level for the auto-initialized storage container.
# Valid values are "None", "BlobContainer" or "Blob"
akka.persistence.snapshot-store.azure-blob-store.container-public-access-type = "None"
```

#### DefaultAzureCredential

`Azure.Identity` `DefaultAzureCredential` can be used to configure the resource by using `AzureBlobSnapshotSetup`. When using `DefaultAzureCredential`, the HOCON 'connection-string' setting is ignored.

Example:
```
var blobStorageSetup = AzureBlobSnapshotSetup.Create(
new Uri("https://{account_name}.blob.core.windows.net"), // This is the blob service URI
new DefaultAzureCredential() // You can pass a DefaultAzureCredentialOption here.
// https://docs.microsoft.com/en-us/dotnet/api/azure.identity.defaultazurecredential?view=azure-dotnet
);

var bootstrap = BootstrapSetup.Create().And(blobStorageSetup);
var system = ActorSystem.Create("actorSystem", bootstrap);
```

## Using the plugin in local development environment

You can use this plugin with Azure Storage Emulator in a local development environment by setting the development flag in the configuration file:
Expand Down
13 changes: 11 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
#### 0.7.2 January 08 2021 ####
#### 0.8.0 April 16 2021 ####
**Release of Akka.Persistence.Azure**

- Upgraded to [Akka.NET v1.4.14](https://github.com/akkadotnet/akka.net/releases/tag/1.4.14)
- [Problem with implimentation of akka persistence contract](https://github.com/petabridge/Akka.Persistence.Azure/pull/143)
- [Limiting batch size for table storage](https://github.com/petabridge/Akka.Persistence.Azure/pull/145)
- [Bump AkkaVersion from 1.4.14 to 1.4.18](https://github.com/petabridge/Akka.Persistence.Azure/pull/148)
- [Added settings for auto-initialize](https://github.com/petabridge/Akka.Persistence.Azure/pull/150)
- [Upgrade WindowsAzure.Storage to Microsoft.Azure.Cosmos.Table and Azure.Storage.Blobs](https://github.com/petabridge/Akka.Persistence.Azure/pull/151)
- [Added support to configure blob container public access level](https://github.com/petabridge/Akka.Persistence.Azure/pull/152)
- [Change the default public access type of auto-init containers to None](https://github.com/petabridge/Akka.Persistence.Azure/pull/154)
- [Add DefaultAzureIdentity support for snapshot Azure Blob Storage](https://github.com/petabridge/Akka.Persistence.Azure/pull/155)

See the README for documentation on how to use the latest features in Akka.Persistence.Azure: https://github.com/petabridge/Akka.Persistence.Azure#akkapersistenceazure
2 changes: 1 addition & 1 deletion src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Threading.Tasks;
using Akka.Actor;
using Microsoft.WindowsAzure.Storage;
using Microsoft.Azure.Cosmos.Table;

namespace Akka.Persistence.Azure.TestHelpers
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

<ItemGroup>
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Akka.Persistence.Azure.Journal;
using Akka.Persistence.Azure.Query;
using Akka.Persistence.Azure.Snapshot;
using Azure.Storage.Blobs.Models;
using FluentAssertions;
using Xunit;

Expand Down Expand Up @@ -40,6 +41,7 @@ public void ShouldParseDefaultSnapshotConfig()
blobSettings.ConnectTimeout.Should().Be(TimeSpan.FromSeconds(3));
blobSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3));
blobSettings.VerboseLogging.Should().BeFalse();
blobSettings.ContainerPublicAccessType.Should().Be(PublicAccessType.None);
}

[Fact]
Expand Down Expand Up @@ -97,7 +99,7 @@ public void ShouldThrowArgumentExceptionForIllegalTableNames(string tableName, s
table-name = " + tableName + @"
}").WithFallback(AzurePersistence.DefaultConfig)
.GetConfig("akka.persistence.journal.azure-table"));
createJournalSettings.ShouldThrow<ArgumentException>(reason);
createJournalSettings.Should().Throw<ArgumentException>(reason);
}

[Theory]
Expand All @@ -113,7 +115,7 @@ public void ShouldThrowArgumentExceptionForIllegalContainerNames(string containe
}").WithFallback(AzurePersistence.DefaultConfig)
.GetConfig("akka.persistence.snapshot-store.azure-blob-store"));

createSnapshotSettings.ShouldThrow<ArgumentException>(reason);
createSnapshotSettings.Should().Throw<ArgumentException>(reason);
}
}
}
6 changes: 5 additions & 1 deletion src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<PropertyGroup>
<TargetFramework>$(NetStandardLibVersion)</TargetFramework>
<Description>Akka.Persistence support for Windows Azure Table storage and Azure blob storage.</Description>
<LangVersion>8.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand All @@ -14,7 +15,10 @@

<ItemGroup>
<PackageReference Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
<PackageReference Include="WindowsAzure.Storage" Version="9.3.3" />
<PackageReference Include="Azure.Identity" Version="1.3.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.8.1" />
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.8" />
<PackageReference Include="System.Linq.Async" Version="5.0.0" />
</ItemGroup>

</Project>
46 changes: 46 additions & 0 deletions src/Akka.Persistence.Azure/CloudTableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Table;

namespace Akka.Persistence.Azure
{
public static class CloudTableExtensions
{
private const int MaxBatchSize = 100;

public static async Task<IList<TableResult>> ExecuteBatchAsLimitedBatches(
this CloudTable table,
TableBatchOperation batch)
{
if (batch.Count < 1)
return new List<TableResult>();

if (batch.Count <= MaxBatchSize)
return await table.ExecuteBatchAsync(batch);

var result = new List<TableResult>();
var limitedBatchOperationLists = batch.ChunkBy(MaxBatchSize);

foreach (var limitedBatchOperationList in limitedBatchOperationLists)
{
var limitedBatch = CreateLimitedTableBatchOperation(limitedBatchOperationList);
var limitedBatchResult = await table.ExecuteBatchAsync(limitedBatch);
result.AddRange(limitedBatchResult);
}

return result;
}

private static TableBatchOperation CreateLimitedTableBatchOperation(
IEnumerable<TableOperation> limitedBatchOperationList)
{
var limitedBatch = new TableBatchOperation();
foreach (var limitedBatchOperation in limitedBatchOperationList)
{
limitedBatch.Add(limitedBatchOperation);
}

return limitedBatch;
}
}
}
40 changes: 30 additions & 10 deletions src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
using Akka.Persistence.Azure.Util;
using Akka.Persistence.Journal;
using Akka.Util.Internal;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration;
using Microsoft.Azure.Cosmos.Table;
using Debug = System.Diagnostics.Debug;

namespace Akka.Persistence.Azure.Journal
Expand Down Expand Up @@ -223,7 +222,7 @@ protected override async Task DeleteMessagesToAsync(
tableBatchOperation.Delete(toBeDeleted);
}

var deleteTask = Table.ExecuteBatchAsync(tableBatchOperation);
var deleteTask = Table.ExecuteBatchAsLimitedBatches(tableBatchOperation);

await deleteTask;
}
Expand Down Expand Up @@ -378,20 +377,24 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
if (_log.IsDebugEnabled && _settings.VerboseLogging)
_log.Debug("Attempting to write batch of {0} messages to Azure storage", persistenceBatch.Count);

var persistenceResults = await Table.ExecuteBatchAsync(persistenceBatch);
var persistenceResults = await Table.ExecuteBatchAsLimitedBatches(persistenceBatch);

if (_log.IsDebugEnabled && _settings.VerboseLogging)
foreach (var r in persistenceResults)
_log.Debug("Azure table storage wrote entity [{0}] with status code [{1}]", r.Etag, r.HttpStatusCode);

exceptions = exceptions.Add(null);
}
catch (Exception ex)
{
_log.Warning(ex, "Failure while writing messages to Azure table storage");

exceptions = exceptions.Add(ex);
}
}
}

if (exceptions.IsEmpty)
if (exceptions.All(ex => ex == null))
{
var allPersistenceIdsBatch = new TableBatchOperation();

Expand All @@ -401,7 +404,7 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
allPersistenceIdsBatch.InsertOrReplace(new AllPersistenceIdsEntry(encodedKey));
});

var allPersistenceResults = await Table.ExecuteBatchAsync(allPersistenceIdsBatch);
var allPersistenceResults = await Table.ExecuteBatchAsLimitedBatches(allPersistenceIdsBatch);

if (_log.IsDebugEnabled && _settings.VerboseLogging)
foreach (var r in allPersistenceResults)
Expand All @@ -426,7 +429,7 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
eventTagsBatch.InsertOrReplace(item);
}

var eventTagsResults = await Table.ExecuteBatchAsync(eventTagsBatch);
var eventTagsResults = await Table.ExecuteBatchAsLimitedBatches(eventTagsBatch);

if (_log.IsDebugEnabled && _settings.VerboseLogging)
foreach (var r in eventTagsResults)
Expand All @@ -439,7 +442,6 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
NotifyTagChange(tag);
}
}

}
}
}
Expand All @@ -451,7 +453,7 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(
*
* Either everything fails or everything succeeds is the idea I guess.
*/
return exceptions.IsEmpty ? null : exceptions;
return exceptions.Any(ex => ex != null) ? exceptions : null;
}
catch (Exception ex)
{
Expand Down Expand Up @@ -709,7 +711,7 @@ private async Task<IEnumerable<string>> GetAllPersistenceIds()
{
var query = GenerateAllPersistenceIdsQuery();

TableQuerySegment result = null;
TableQuerySegment<DynamicTableEntity> result = null;

var returnValue = ImmutableList<string>.Empty;

Expand All @@ -734,8 +736,26 @@ private async Task<CloudTable> InitCloudStorage(
var tableClient = _storageAccount.CreateCloudTableClient();
var tableRef = tableClient.GetTableReference(_settings.TableName);
var op = new OperationContext();

using (var cts = new CancellationTokenSource(_settings.ConnectTimeout))
{
if (!_settings.AutoInitialize)
{
var exists = await tableRef.ExistsAsync(null, null, cts.Token);

if (!exists)
{
remainingTries = 0;

throw new Exception(
$"Table {_settings.TableName} doesn't exist. Either create it or turn auto-initialize on");
}

_log.Info("Successfully connected to existing table", _settings.TableName);

return tableRef;
}

if (await tableRef.CreateIfNotExistsAsync(new TableRequestOptions(), op, cts.Token))
_log.Info("Created Azure Cloud Table", _settings.TableName);
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using System;
using System.Linq;
using Akka.Configuration;
using Microsoft.WindowsAzure.Storage;
using Microsoft.Azure.Cosmos.Table;

namespace Akka.Persistence.Azure.Journal
{
Expand All @@ -24,7 +24,8 @@ public AzureTableStorageJournalSettings(
TimeSpan connectTimeout,
TimeSpan requestTimeout,
bool verboseLogging,
bool development)
bool development,
bool autoInitialize)
{
if(string.IsNullOrWhiteSpace(tableName))
throw new ConfigurationException("[AzureTableStorageJournal] Table name is null or empty.");
Expand All @@ -43,6 +44,7 @@ public AzureTableStorageJournalSettings(
RequestTimeout = requestTimeout;
VerboseLogging = verboseLogging;
Development = development;
AutoInitialize = autoInitialize;
}

/// <summary>
Expand Down Expand Up @@ -71,6 +73,8 @@ public AzureTableStorageJournalSettings(
public bool VerboseLogging { get; }

public bool Development { get; }

public bool AutoInitialize { get; }

/// <summary>
/// Creates an <see cref="AzureTableStorageJournalSettings" /> instance using the
Expand All @@ -86,14 +90,16 @@ public static AzureTableStorageJournalSettings Create(Config config)
var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3));
var verbose = config.GetBoolean("verbose-logging", false);
var development = config.GetBoolean("development", false);
var autoInitialize = config.GetBoolean("auto-initialize", true);

return new AzureTableStorageJournalSettings(
connectionString,
tableName,
connectTimeout,
requestTimeout,
verbose,
development);
development,
autoInitialize);
}
}
}
3 changes: 1 addition & 2 deletions src/Akka.Persistence.Azure/Journal/HighestSequenceNrEntry.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Microsoft.Azure.Cosmos.Table;

namespace Akka.Persistence.Azure.Journal
{
Expand Down
18 changes: 18 additions & 0 deletions src/Akka.Persistence.Azure/ListExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;

namespace Akka.Persistence.Azure
{
public static class ListExtensions
{
public static IImmutableList<IEnumerable<T>> ChunkBy<T>(this IEnumerable<T> source, int chunkSize)
{
return source
.Select((x, i) => new { Index = i, Value = x })
.GroupBy(x => x.Index / chunkSize)
.Select(x => x.Select(v => v.Value).ToList().AsEnumerable())
.ToImmutableList();
}
}
}
Loading