Skip to content

Commit

Permalink
Akka.Persistence.azure does not play nice with Akka.Cluster.Sharding (#…
Browse files Browse the repository at this point in the history
…109)

* Add support for Azure Storage Emulator development environment

* Add Config param to .ctor, so Akka.Persistence.Azure can play nicely with Akka.Cluster.Sharding

* Remove code rot

* Add Azure Storage Emulator settings to default config

* Make sure that the default query write-plugin does not point to the default akka local storage journal
  • Loading branch information
Aaronontheweb authored Jul 16, 2020
2 parents 6fc7fad + 321f8c8 commit 5616c0d
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 73 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ akka.persistence.snapshot-store.azure-blob-store.connection-string = "Your Azure
akka.persistence.snapshot-store.azure-blob-store.container-name = "Your container name"
```

## Using the plugin in local development environment

You can use this plugin with Azure Storage Emulator in a local development environment by setting the development flag in the configuration file:
```
akka.persistence.journal.azure-table.development = on
akka.persistence.snapshot-store.azure-blob-store.development = on
```

you do **not** need to provide a connection string for this to work, it is handled automatically by the Microsoft Azure SDK.

## Building this solution
To run the build script associated with this solution, execute the following:

Expand Down
19 changes: 0 additions & 19 deletions src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,6 @@ public void ShouldProvideDefaultTableNameValue()
tableSettings.TableName.Should().Be("AkkaPersistenceDefaultTable");
}

[Fact]
public void ShouldParseQueryConfig()
{
var querySettings =
AzureTableStorageQuerySettings.Create(
ConfigurationFactory.ParseString(@"akka.persistence.query.journal.azure-table{
class = ""classname""
write-plugin = foo
max-buffer-size = 100
refresh-interval = 3s
}").WithFallback(AzurePersistence.DefaultConfig)
.GetConfig("akka.persistence.query.journal.azure-table"));

querySettings.Class.Should().Be("classname");
querySettings.WritePlugin.Should().Be("foo");
querySettings.MaxBufferSize.Should().Be("100");
querySettings.RefreshInterval.Should().Be(new TimeSpan(0, 0, 3));
}

[Theory]
[InlineData("fo", "Invalid table name length")]
[InlineData("1foo", "Invalid table name")]
Expand Down
13 changes: 10 additions & 3 deletions src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration;
using Debug = System.Diagnostics.Debug;

namespace Akka.Persistence.Azure.Journal
Expand Down Expand Up @@ -54,11 +55,17 @@ public class AzureTableStorageJournal : AsyncWriteJournal
private readonly Lazy<CloudTable> _tableStorage;
private readonly Dictionary<string, ISet<IActorRef>> _tagSubscribers = new Dictionary<string, ISet<IActorRef>>();

public AzureTableStorageJournal()
public AzureTableStorageJournal(Config config = null)
{
_settings = AzurePersistence.Get(Context.System).TableSettings;
_settings = config is null ?
AzurePersistence.Get(Context.System).TableSettings :
AzureTableStorageJournalSettings.Create(config);

_serialization = new SerializationHelper(Context.System);
_storageAccount = CloudStorageAccount.Parse(_settings.ConnectionString);
_storageAccount = _settings.Development ?
CloudStorageAccount.DevelopmentStorageAccount :
CloudStorageAccount.Parse(_settings.ConnectionString);

_tableStorage = new Lazy<CloudTable>(() => InitCloudStorage(5).Result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ public AzureTableStorageJournalSettings(
string tableName,
TimeSpan connectTimeout,
TimeSpan requestTimeout,
bool verboseLogging)
bool verboseLogging,
bool development)
{
if(string.IsNullOrWhiteSpace(tableName))
throw new ConfigurationException("[AzureTableStorageJournal] Table name is null or empty.");

NameValidator.ValidateTableName(tableName);

if (ReservedTableNames.Contains(tableName))
Expand All @@ -38,6 +42,7 @@ public AzureTableStorageJournalSettings(
ConnectTimeout = connectTimeout;
RequestTimeout = requestTimeout;
VerboseLogging = verboseLogging;
Development = development;
}

/// <summary>
Expand Down Expand Up @@ -65,6 +70,8 @@ public AzureTableStorageJournalSettings(
/// </summary>
public bool VerboseLogging { get; }

public bool Development { get; }

/// <summary>
/// Creates an <see cref="AzureTableStorageJournalSettings" /> instance using the
/// `akka.persistence.journal.azure-table` HOCON configuration section.
Expand All @@ -78,12 +85,15 @@ public static AzureTableStorageJournalSettings Create(Config config)
var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3));
var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3));
var verbose = config.GetBoolean("verbose-logging", false);
var development = config.GetBoolean("development", false);

return new AzureTableStorageJournalSettings(
connectionString,
tableName,
connectTimeout,
requestTimeout,
verbose);
verbose,
development);
}
}
}
42 changes: 0 additions & 42 deletions src/Akka.Persistence.Azure/Query/AzureTableStorageQuerySettings.cs

This file was deleted.

11 changes: 8 additions & 3 deletions src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Event;
using Akka.Persistence.Azure.Util;
using Akka.Persistence.Snapshot;
Expand All @@ -32,12 +33,16 @@ public class AzureBlobSnapshotStore : SnapshotStore
private readonly AzureBlobSnapshotStoreSettings _settings;
private readonly CloudStorageAccount _storageAccount;

public AzureBlobSnapshotStore()
public AzureBlobSnapshotStore(Config config = null)
{
_settings = AzurePersistence.Get(Context.System).BlobSettings;
_serialization = new SerializationHelper(Context.System);
_storageAccount = CloudStorageAccount.Parse(_settings.ConnectionString);
_settings = config is null
? AzurePersistence.Get(Context.System).BlobSettings
: AzureBlobSnapshotStoreSettings.Create(config);

_storageAccount = _settings.Development ?
CloudStorageAccount.DevelopmentStorageAccount :
CloudStorageAccount.Parse(_settings.ConnectionString);
_container = new Lazy<CloudBlobContainer>(() => InitCloudStorage().Result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ namespace Akka.Persistence.Azure.Snapshot
public sealed class AzureBlobSnapshotStoreSettings
{
public AzureBlobSnapshotStoreSettings(string connectionString, string containerName,
TimeSpan connectTimeout, TimeSpan requestTimeout, bool verboseLogging)
TimeSpan connectTimeout, TimeSpan requestTimeout, bool verboseLogging, bool development)
{
if (string.IsNullOrWhiteSpace(containerName))
throw new ConfigurationException("[AzureBlobSnapshotStore] Container name is null or empty.");

NameValidator.ValidateContainerName(containerName);
ConnectionString = connectionString;
ContainerName = containerName;
RequestTimeout = requestTimeout;
ConnectTimeout = connectTimeout;
VerboseLogging = verboseLogging;
Development = development;
}

/// <summary>
Expand Down Expand Up @@ -52,6 +56,8 @@ public AzureBlobSnapshotStoreSettings(string connectionString, string containerN
/// </summary>
public bool VerboseLogging { get; }

public bool Development { get; }

/// <summary>
/// Creates an <see cref="AzureBlobSnapshotStoreSettings" /> instance using the
/// `akka.persistence.snapshot-store.azure-blob-store` HOCON configuration section.
Expand All @@ -61,16 +67,19 @@ public AzureBlobSnapshotStoreSettings(string connectionString, string containerN
public static AzureBlobSnapshotStoreSettings Create(Config config)
{
var connectionString = config.GetString("connection-string");
var containerName = config.GetString("container-name");
var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3));
var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3));
var verbose = config.GetBoolean("verbose-logging", false);
var containerName = config.GetString("container-name");
var development = config.GetBoolean("development", false);

return new AzureBlobSnapshotStoreSettings(
connectionString,
containerName,
connectTimeout,
requestTimeout,
verbose);
requestTimeout,
verbose,
development);
}
}
}
8 changes: 8 additions & 0 deletions src/Akka.Persistence.Azure/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ akka.persistence {

# dispatcher used to drive journal actor
plugin-dispatcher = "akka.actor.default-dispatcher"

# Support for Azure Storage Emulator for local development.
# Will ignore connection string settings if turned on.
development = off
}
}

Expand Down Expand Up @@ -78,6 +82,10 @@ akka.persistence {

# dispatcher used to drive snapshot storage actor
plugin-dispatcher = "akka.actor.default-dispatcher"

# Support for Azure Storage Emulator for local development.
# Will ignore connection string settings if turned on.
development = off
}
}
}

0 comments on commit 5616c0d

Please sign in to comment.