From d2b0d1967152598f69a4ceff46dfe94fe4672203 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Wed, 9 Nov 2016 23:11:49 +0100 Subject: [PATCH] local crdts & fixed tests --- src/Akka.sln | 1 + src/SharedAssemblyInfo.cs | 10 +- .../Akka.DistributedData.Tests/FlagSpec.cs | 1 + .../GCounterSpec.cs | 2 + .../Akka.DistributedData.Tests/GSetSpec.cs | 1 + .../LWWDictionarySpec.cs | 1 + .../LWWRegisterSpec.cs | 1 + .../LocalConcurrencySpec.cs | 1 + .../ORDictionarySpec.cs | 1 + .../ORMultiDictionarySpec.cs | 3 +- .../Akka.DistributedData.Tests/ORSetSpec.cs | 1 + .../PNCounterDictionarySpec.cs | 1 + .../PNCounterSpec.cs | 1 + .../PruningStateSpec.cs | 1 + .../ReplicatedDataSerializerSpec.cs | 22 ++- .../ReplicatorMessageSerializerSpec.cs | 3 +- .../VersionVectorSpec.cs | 1 + .../WriteAggregatorSpec.cs | 1 + .../Akka.DistributedData.csproj | 9 ++ .../cluster/Akka.DistributedData/Flag.cs | 2 + .../cluster/Akka.DistributedData/GCounter.cs | 7 +- .../cluster/Akka.DistributedData/GSet.cs | 21 ++- .../Akka.DistributedData/Internal/Internal.cs | 132 +++++++++++------- .../cluster/Akka.DistributedData/Key.cs | 2 +- .../Akka.DistributedData/LWWDictionary.cs | 71 +++++++++- .../Akka.DistributedData/LWWRegister.cs | 19 ++- .../Local/ClusterReplicatedDataExtensions.cs | 98 +++++++++++++ .../Local/LocalGCounter.cs | 80 +++++++++++ .../Local/LocalLWWDictionary.cs | 100 +++++++++++++ .../Local/LocalLWWRegister.cs | 72 ++++++++++ .../Local/LocalORDictionary.cs | 108 ++++++++++++++ .../Local/LocalORMultiDictionary.cs | 132 ++++++++++++++++++ .../Akka.DistributedData/Local/LocalORSet.cs | 91 ++++++++++++ .../Local/LocalPNCounter.cs | 110 +++++++++++++++ .../Local/LocalPNCounterDictionary.cs | 123 ++++++++++++++++ .../Akka.DistributedData/ORDictionary.cs | 126 +++++++++++------ .../Akka.DistributedData/ORMultiDictionary.cs | 59 +++++++- .../cluster/Akka.DistributedData/ORSet.cs | 34 +++-- .../cluster/Akka.DistributedData/PNCounter.cs | 5 +- .../PNCounterDictionary.cs | 68 ++++++++- .../Akka.DistributedData/PruningState.cs | 2 + .../Akka.DistributedData/Replicator.cs | 2 +- .../Akka.FSharp/Properties/AssemblyInfo.fs | 10 +- 43 files changed, 1393 insertions(+), 143 deletions(-) create mode 100644 src/contrib/cluster/Akka.DistributedData/Local/ClusterReplicatedDataExtensions.cs create mode 100644 src/contrib/cluster/Akka.DistributedData/Local/LocalGCounter.cs create mode 100644 src/contrib/cluster/Akka.DistributedData/Local/LocalLWWDictionary.cs create mode 100644 src/contrib/cluster/Akka.DistributedData/Local/LocalLWWRegister.cs create mode 100644 src/contrib/cluster/Akka.DistributedData/Local/LocalORDictionary.cs create mode 100644 src/contrib/cluster/Akka.DistributedData/Local/LocalORMultiDictionary.cs create mode 100644 src/contrib/cluster/Akka.DistributedData/Local/LocalORSet.cs create mode 100644 src/contrib/cluster/Akka.DistributedData/Local/LocalPNCounter.cs create mode 100644 src/contrib/cluster/Akka.DistributedData/Local/LocalPNCounterDictionary.cs diff --git a/src/Akka.sln b/src/Akka.sln index 14cfce1d3ea..775fc884407 100644 --- a/src/Akka.sln +++ b/src/Akka.sln @@ -245,6 +245,7 @@ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Tests.TCK", "core\Akka.Streams.Tests.TCK\Akka.Streams.Tests.TCK.csproj", "{805908F6-8792-47C0-8E4F-4DB4C5736A47}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Cluster.Sharding.Tests.MultiNode", "contrib\cluster\Akka.Cluster.Sharding.Tests.MultiNode\Akka.Cluster.Sharding.Tests.MultiNode.csproj", "{F79A5E87-1DB5-4D76-9B14-382F8C536DA6}" +EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.DistributedData", "contrib\cluster\Akka.DistributedData\Akka.DistributedData.csproj", "{59CFFC88-8A73-445D-B191-281E40BE9421}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.DistributedData.Tests", "contrib\cluster\Akka.DistributedData.Tests\Akka.DistributedData.Tests.csproj", "{0DBF3D66-6E5E-4A89-AA3D-674B92FE81B7}" diff --git a/src/SharedAssemblyInfo.cs b/src/SharedAssemblyInfo.cs index 191836bcac4..bd6ce01c650 100644 --- a/src/SharedAssemblyInfo.cs +++ b/src/SharedAssemblyInfo.cs @@ -1,11 +1,4 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2016 Lightbend Inc. -// Copyright (C) 2013-2016 Akka.NET project -// -//----------------------------------------------------------------------- - -// +// using System.Reflection; [assembly: AssemblyCompanyAttribute("Akka.NET Team")] @@ -13,4 +6,3 @@ [assembly: AssemblyTrademarkAttribute("")] [assembly: AssemblyVersionAttribute("1.1.2.0")] [assembly: AssemblyFileVersionAttribute("1.1.2.0")] - diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/FlagSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/FlagSpec.cs index cbb5fbac825..692d737c04c 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/FlagSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/FlagSpec.cs @@ -10,6 +10,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class FlagSpec { public FlagSpec(ITestOutputHelper output) diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/GCounterSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/GCounterSpec.cs index 1a6faa84b4e..94871106d9c 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/GCounterSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/GCounterSpec.cs @@ -7,11 +7,13 @@ using Akka.Cluster; using System.Numerics; +using Akka.Actor; using Xunit; using Xunit.Abstractions; namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class GCounterSpec { private readonly UniqueAddress _node1 = new UniqueAddress(new Actor.Address("akka.tcp", "Sys", "localhost", 2551), 1); diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/GSetSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/GSetSpec.cs index e786318596a..89633e1f201 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/GSetSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/GSetSpec.cs @@ -10,6 +10,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class GSetSpec { const string user1 = "{\"username\":\"john\",\"password\":\"coltrane\"}"; diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/LWWDictionarySpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/LWWDictionarySpec.cs index f33bb213a10..99681aaf88e 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/LWWDictionarySpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/LWWDictionarySpec.cs @@ -15,6 +15,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class LWWDictionarySpec { private readonly UniqueAddress _node1; diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/LWWRegisterSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/LWWRegisterSpec.cs index ef57433659d..fd6003cdf50 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/LWWRegisterSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/LWWRegisterSpec.cs @@ -14,6 +14,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class LWWRegisterSpec { private readonly UniqueAddress _node1; diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/LocalConcurrencySpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/LocalConcurrencySpec.cs index be63e06043b..c3237797f10 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/LocalConcurrencySpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/LocalConcurrencySpec.cs @@ -13,6 +13,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class LocalConcurrencySpec : Akka.TestKit.Xunit2.TestKit { public sealed class Updater : ReceiveActor, IWithUnboundedStash diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/ORDictionarySpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/ORDictionarySpec.cs index 8569046d883..2327fca6e2d 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/ORDictionarySpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/ORDictionarySpec.cs @@ -14,6 +14,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class ORDictionarySpec { private readonly UniqueAddress _node1; diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/ORMultiDictionarySpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/ORMultiDictionarySpec.cs index 92c7d7123ab..20de99a1229 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/ORMultiDictionarySpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/ORMultiDictionarySpec.cs @@ -14,6 +14,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class ORMultiDictionarySpec { private readonly UniqueAddress _node1; @@ -139,7 +140,7 @@ public void A_ORMultiDictionary_should_be_able_to_get_all_bindings_for_an_entry_ m.TryGetValue("a", out a); Assert.Equal(ImmutableHashSet.Create("A1", "A2"), a); - var m2 = m.SetItem(_node1, "a", a.Remove("A1")); + var m2 = m.SetItems(_node1, "a", a.Remove("A1")); Assert.Equal(ImmutableDictionary.CreateRange(new[] { new KeyValuePair>("a", ImmutableHashSet.Create("A2")), diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs index 9a6995e40bc..1ad792a16da 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs @@ -12,6 +12,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class ORSetSpec { private readonly string _user1 = "{\"username\":\"john\",\"password\":\"coltrane\"}"; diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/PNCounterDictionarySpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/PNCounterDictionarySpec.cs index e851eef685c..232b597ea08 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/PNCounterDictionarySpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/PNCounterDictionarySpec.cs @@ -14,6 +14,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class PNCounterDictionarySpec { readonly UniqueAddress _node1; diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/PNCounterSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/PNCounterSpec.cs index 1789d5ac34c..fb10ca0dea9 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/PNCounterSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/PNCounterSpec.cs @@ -11,6 +11,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class PNCounterSpec { readonly UniqueAddress _address1; diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/PruningStateSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/PruningStateSpec.cs index 199fd9a4753..56f75b99831 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/PruningStateSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/PruningStateSpec.cs @@ -12,6 +12,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class PruningStateSpec { private readonly UniqueAddress _node1 = new UniqueAddress(new Address("akka.tcp", "Sys", "localhost", 2551), 1); diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatedDataSerializerSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatedDataSerializerSpec.cs index e92afd2e4d2..ade124188e6 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatedDataSerializerSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatedDataSerializerSpec.cs @@ -5,6 +5,9 @@ // //----------------------------------------------------------------------- +using System; +using System.Collections; +using System.Linq; using System.Collections.Immutable; using Akka.Actor; using Akka.Cluster; @@ -15,6 +18,7 @@ namespace Akka.DistributedData.Tests.Serialization { + [Collection("DistributedDataSpec")] public class ReplicatedDataSerializerSpec : TestKit.Xunit2.TestKit { private static readonly Config BaseConfig = ConfigurationFactory.ParseString(@" @@ -159,11 +163,11 @@ public void ReplicatedDataSerializer_should_serialize_ORMultiDictionary() CheckSerialization(ORMultiDictionary.Empty.AddItem(_address1, "a", "A")); CheckSerialization(ORMultiDictionary.Empty .AddItem(_address1, "a", "A1") - .SetItem(_address2, "b", ImmutableHashSet.CreateRange(new[] { "B1", "B2", "B3" })) + .SetItems(_address2, "b", ImmutableHashSet.CreateRange(new[] { "B1", "B2", "B3" })) .AddItem(_address2, "a", "A2")); var m1 = ORMultiDictionary.Empty.AddItem(_address1, "a", "A1").AddItem(_address2, "a", "A2"); - var m2 = ORMultiDictionary.Empty.SetItem(_address2, "b", ImmutableHashSet.CreateRange(new[] { "B1", "B2", "B3" })); + var m2 = ORMultiDictionary.Empty.SetItems(_address2, "b", ImmutableHashSet.CreateRange(new[] { "B1", "B2", "B3" })); CheckSameContent(m1.Merge(m2), m2.Merge(m1)); } @@ -185,18 +189,24 @@ public void ReplicatedDataSerializer_should_serialize_VersionVector() CheckSameContent(v1.Merge(v2), v2.Merge(v1)); } - private void CheckSerialization(object expected) + private void CheckSerialization(T expected) { var serializer = Sys.Serialization.FindSerializerFor(expected); var blob = serializer.ToBinary(expected); var actual = serializer.FromBinary(blob, expected.GetType()); - - Assert.Equal(expected, actual); + + // we cannot use Assert.Equal here since ORMultiDictionary will be resolved as + // IEnumerable> and immutable sets + // fails on structural equality + Assert.True(expected.Equals(actual), $"Expected: {expected}\nActual: {actual}"); } private void CheckSameContent(object a, object b) { - Assert.Equal(a, b); + // we cannot use Assert.Equal here since ORMultiDictionary will be resolved as + // IEnumerable> and immutable sets + // fails on structural equality + Assert.True(a.Equals(b)); var serializer = Sys.Serialization.FindSerializerFor(a); var blobA = serializer.ToBinary(a); var blobB = serializer.ToBinary(b); diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatorMessageSerializerSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatorMessageSerializerSpec.cs index d2952f9e9f1..b5fd7395d9f 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatorMessageSerializerSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatorMessageSerializerSpec.cs @@ -19,6 +19,7 @@ namespace Akka.DistributedData.Tests.Serialization { + [Collection("DistributedDataSpec")] public class ReplicatorMessageSerializerSpec : TestKit.Xunit2.TestKit { private static readonly Config BaseConfig = ConfigurationFactory.ParseString(@" @@ -87,7 +88,7 @@ private void CheckSerialization(object expected) var blob = serializer.ToBinary(expected); var actual = serializer.FromBinary(blob, expected.GetType()); - Assert.Equal(expected, actual); + Assert.True(expected.Equals(actual), $"Expected: {expected}\nActual: {actual}"); } } } diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/VersionVectorSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/VersionVectorSpec.cs index 929435ea9e8..38b6179fce4 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/VersionVectorSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/VersionVectorSpec.cs @@ -12,6 +12,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class VersionVectorSpec { private readonly UniqueAddress _node1 = new UniqueAddress(new Address("akka.tcp", "Sys", "localhost", 2551), 1); diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/WriteAggregatorSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/WriteAggregatorSpec.cs index ca0c3927ec9..3911077d484 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/WriteAggregatorSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/WriteAggregatorSpec.cs @@ -17,6 +17,7 @@ namespace Akka.DistributedData.Tests { + [Collection("DistributedDataSpec")] public class WriteAggregatorSpec : Akka.TestKit.Xunit2.TestKit { internal class TestWriteAggregator : WriteAggregator diff --git a/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj b/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj index bb8ed5f0efb..1f57336e15b 100644 --- a/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj +++ b/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj @@ -54,6 +54,7 @@ + @@ -63,6 +64,13 @@ + + + + + + + @@ -78,6 +86,7 @@ + diff --git a/src/contrib/cluster/Akka.DistributedData/Flag.cs b/src/contrib/cluster/Akka.DistributedData/Flag.cs index c94e0ea5d43..10a0acd1c8a 100644 --- a/src/contrib/cluster/Akka.DistributedData/Flag.cs +++ b/src/contrib/cluster/Akka.DistributedData/Flag.cs @@ -46,6 +46,8 @@ public bool Equals(Flag other) public IReplicatedData Merge(IReplicatedData other) => Merge((Flag) other); public Flag Merge(Flag other) => other.Enabled ? other : this; public Flag SwitchOn() => Enabled ? this : new Flag(true); + + public static implicit operator bool(Flag flag) => flag.Enabled; } [Serializable] diff --git a/src/contrib/cluster/Akka.DistributedData/GCounter.cs b/src/contrib/cluster/Akka.DistributedData/GCounter.cs index 3f3772717b7..740a4549f38 100644 --- a/src/contrib/cluster/Akka.DistributedData/GCounter.cs +++ b/src/contrib/cluster/Akka.DistributedData/GCounter.cs @@ -10,6 +10,8 @@ using System.Collections.Immutable; using System.Linq; using System.Numerics; +using Akka.Actor; +using Akka.Util; namespace Akka.DistributedData { @@ -45,13 +47,14 @@ public sealed class GCounter : FastMerge, IRemovedNodePruning /// Current total value of the counter. /// - public BigInteger Value => State.Aggregate(Zero, (v, acc) => v + acc.Value); + public BigInteger Value { get; } public GCounter() : this(ImmutableDictionary.Empty) { } public GCounter(IImmutableDictionary state) { State = state; + Value = State.Aggregate(Zero, (v, acc) => v + acc.Value); } /// @@ -146,5 +149,7 @@ public int CompareTo(GCounter other) } public override string ToString() => $"GCounter({Value})"; + + public static implicit operator BigInteger(GCounter counter) => counter.Value; } } diff --git a/src/contrib/cluster/Akka.DistributedData/GSet.cs b/src/contrib/cluster/Akka.DistributedData/GSet.cs index 7807a4abdde..88d65c18bb2 100644 --- a/src/contrib/cluster/Akka.DistributedData/GSet.cs +++ b/src/contrib/cluster/Akka.DistributedData/GSet.cs @@ -6,8 +6,12 @@ //----------------------------------------------------------------------- using System; +using System.Collections; +using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Text; +using Akka.Cluster; namespace Akka.DistributedData { @@ -35,7 +39,7 @@ public static class GSet /// This class is immutable, i.e. "modifying" methods return a new instance. /// [Serializable] - public sealed class GSet : FastMerge>, IReplicatedDataSerialization, IGSet, IEquatable> + public sealed class GSet : FastMerge>, IReplicatedDataSerialization, IGSet, IEquatable>, IEnumerable { public static readonly GSet Empty = new GSet(); @@ -77,9 +81,24 @@ public bool Equals(GSet other) return Elements.SetEquals(other.Elements); } + public IEnumerator GetEnumerator() => Elements.GetEnumerator(); + public override bool Equals(object obj) => obj is GSet && Equals((GSet) obj); public override int GetHashCode() => Elements.GetHashCode(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + public override string ToString() + { + var sb = new StringBuilder("GSet("); + foreach (var element in Elements) + { + sb.Append(element).Append(','); + } + sb.Append(")"); + + return sb.ToString(); + } } internal interface IGSetKey diff --git a/src/contrib/cluster/Akka.DistributedData/Internal/Internal.cs b/src/contrib/cluster/Akka.DistributedData/Internal/Internal.cs index 36b5a8e8b8c..a1096e2cd71 100644 --- a/src/contrib/cluster/Akka.DistributedData/Internal/Internal.cs +++ b/src/contrib/cluster/Akka.DistributedData/Internal/Internal.cs @@ -9,6 +9,7 @@ using Akka.IO; using System.Collections.Immutable; using System.Linq; +using System.Text; using Akka.Actor; using Akka.Cluster; using Akka.Event; @@ -19,24 +20,24 @@ namespace Akka.DistributedData.Internal internal sealed class GossipTick { internal static readonly GossipTick Instance = new GossipTick(); - private GossipTick() { } + public override string ToString() => "GossipTick"; } [Serializable] internal class RemovedNodePruningTick { internal static readonly RemovedNodePruningTick Instance = new RemovedNodePruningTick(); - private RemovedNodePruningTick() { } + public override string ToString() => "RemovedNodePruningTick"; } [Serializable] internal class ClockTick { internal static readonly ClockTick Instance = new ClockTick(); - private ClockTick() { } + public override string ToString() => "ClockTick"; } [Serializable] @@ -65,9 +66,11 @@ public override int GetHashCode() { unchecked { - return ((Key != null ? Key.GetHashCode() : 0)*397) ^ (Envelope != null ? Envelope.GetHashCode() : 0); + return ((Key != null ? Key.GetHashCode() : 0) * 397) ^ (Envelope != null ? Envelope.GetHashCode() : 0); } } + + public override string ToString() => $"Write(key={Key}, envelope={Envelope})"; } [Serializable] @@ -76,20 +79,10 @@ internal sealed class WriteAck : IReplicatorMessage, IEquatable internal static readonly WriteAck Instance = new WriteAck(); private WriteAck() { } - public bool Equals(WriteAck other) - { - return true; - } - - public override bool Equals(object obj) - { - return obj is WriteAck; - } - - public override int GetHashCode() - { - return 1; - } + public bool Equals(WriteAck other) => true; + public override bool Equals(object obj) => obj is WriteAck; + public override int GetHashCode() => 1; + public override string ToString() => "WriteAck"; } [Serializable] @@ -107,9 +100,11 @@ public bool Equals(Read other) return other != null && Key == other.Key; } - public override bool Equals(object obj) => obj is Read && Equals((Read) obj); + public override bool Equals(object obj) => obj is Read && Equals((Read)obj); public override int GetHashCode() => Key?.GetHashCode() ?? 0; + + public override string ToString() => $"Read(key={Key})"; } [Serializable] @@ -130,9 +125,11 @@ public bool Equals(ReadResult other) return Equals(Envelope, other.Envelope); } - public override bool Equals(object obj) => obj is ReadResult && Equals((ReadResult) obj); + public override bool Equals(object obj) => obj is ReadResult && Equals((ReadResult)obj); public override int GetHashCode() => Envelope?.GetHashCode() ?? 0; + + public override string ToString() => $"ReadResult(envelope={Envelope})"; } [Serializable] @@ -155,15 +152,17 @@ public bool Equals(ReadRepair other) return Equals(Key, other.Key) && Equals(Envelope, other.Envelope); } - public override bool Equals(object obj) => obj is ReadRepair && Equals((ReadRepair) obj); + public override bool Equals(object obj) => obj is ReadRepair && Equals((ReadRepair)obj); public override int GetHashCode() { unchecked { - return ((Key?.GetHashCode() ?? 0)*397) ^ (Envelope?.GetHashCode() ?? 0); + return ((Key?.GetHashCode() ?? 0) * 397) ^ (Envelope?.GetHashCode() ?? 0); } } + + public override string ToString() => $"ReadRepair(key={Key}, envelope={Envelope})"; } [Serializable] @@ -172,6 +171,8 @@ internal sealed class ReadRepairAck public static readonly ReadRepairAck Instance = new ReadRepairAck(); private ReadRepairAck() { } + + public override string ToString() => $"ReadRepairAck"; } [Serializable] @@ -197,7 +198,7 @@ internal bool NeedPruningFrom(UniqueAddress removedNode) return r != null && r.NeedPruningFrom(removedNode); } - internal DataEnvelope InitRemovedNodePruning(UniqueAddress removed, UniqueAddress owner) => + internal DataEnvelope InitRemovedNodePruning(UniqueAddress removed, UniqueAddress owner) => new DataEnvelope(Data, Pruning.Add(removed, new PruningState(owner, new PruningInitialized(ImmutableHashSet
.Empty)))); internal DataEnvelope Prune(UniqueAddress from) @@ -243,10 +244,10 @@ internal DataEnvelope Merge(IReplicatedData otherData) private IReplicatedData Cleaned(IReplicatedData c, IImmutableDictionary p) => p.Aggregate(c, (state, kvp) => { - if (c is IRemovedNodePruning + if (c is IRemovedNodePruning && kvp.Value.Phase is PruningPerformed && ((IRemovedNodePruning)c).NeedPruningFrom(kvp.Key)) - return ((IRemovedNodePruning) c).PruningCleanup(kvp.Key); + return ((IRemovedNodePruning)c).PruningCleanup(kvp.Key); return c; }); @@ -273,15 +274,28 @@ public bool Equals(DataEnvelope other) return Data.Equals(other.Data) && pruningCountsEqual && elementsEqual; } - public override bool Equals(object obj) => obj is DataEnvelope && Equals((DataEnvelope) obj); + public override bool Equals(object obj) => obj is DataEnvelope && Equals((DataEnvelope)obj); public override int GetHashCode() { unchecked { - return ((Data != null ? Data.GetHashCode() : 0)*397) ^ (Pruning != null ? Pruning.GetHashCode() : 0); + return ((Data != null ? Data.GetHashCode() : 0) * 397) ^ (Pruning != null ? Pruning.GetHashCode() : 0); } } + + public override string ToString() + { + var sb = new StringBuilder("{"); + if (Pruning != null) + foreach (var entry in Pruning) + { + sb.Append(entry.Key).Append("->").Append(entry.Value).Append(","); + } + sb.Append('}'); + + return $"DataEnvelope(data={Data}, prunning={sb})"; + } } [Serializable] @@ -293,57 +307,63 @@ private DeletedData() { } public DeletedData Merge(DeletedData other) => this; - public IReplicatedData Merge(IReplicatedData other) => Merge((DeletedData) other); - public bool Equals(DeletedData other) - { - return true; - } + public IReplicatedData Merge(IReplicatedData other) => Merge((DeletedData)other); + public bool Equals(DeletedData other) => true; - public override bool Equals(object obj) - { - return obj is DeletedData; - } + public override bool Equals(object obj) => obj is DeletedData; - public override int GetHashCode() - { - return 1; - } + public override int GetHashCode() => 1; + + public override string ToString() => "DeletedData"; } - + [Serializable] internal sealed class Status : IReplicatorMessage, IEquatable { public IImmutableDictionary Digests { get; } public int Chunk { get; } - public int TotChunks { get; } + public int TotalChunks { get; } - public Status(IImmutableDictionary digests, int chunk, int totChunks) + public Status(IImmutableDictionary digests, int chunk, int totalChunks) { Digests = digests; Chunk = chunk; - TotChunks = totChunks; + TotalChunks = totalChunks; } public bool Equals(Status other) { if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(this, other)) return true; - - return other.Chunk.Equals(Chunk) && other.TotChunks.Equals(TotChunks) && Digests.SequenceEqual(other.Digests); + + return other.Chunk.Equals(Chunk) && other.TotalChunks.Equals(TotalChunks) && Digests.SequenceEqual(other.Digests); } - public override bool Equals(object obj) => obj is Status && Equals((Status) obj); + public override bool Equals(object obj) => obj is Status && Equals((Status)obj); public override int GetHashCode() { unchecked { var hashCode = (Digests != null ? Digests.GetHashCode() : 0); - hashCode = (hashCode*397) ^ Chunk; - hashCode = (hashCode*397) ^ TotChunks; + hashCode = (hashCode * 397) ^ Chunk; + hashCode = (hashCode * 397) ^ TotalChunks; return hashCode; } } + + public override string ToString() + { + var sb = new StringBuilder("{"); + if (Digests != null) + foreach (var entry in Digests) + { + sb.Append(entry.Key).Append("->").Append(entry.Value).Append(","); + } + sb.Append('}'); + + return $"Status(chunk={Chunk}, totalChunks={TotalChunks}, digest={sb})"; + } } [Serializable] @@ -366,7 +386,7 @@ public bool Equals(Gossip other) return other.SendBack.Equals(SendBack) && UpdatedData.SequenceEqual(other.UpdatedData); } - public override bool Equals(object obj) => obj is Gossip && Equals((Gossip) obj); + public override bool Equals(object obj) => obj is Gossip && Equals((Gossip)obj); public override int GetHashCode() { @@ -376,5 +396,17 @@ public override int GetHashCode() } } + public override string ToString() + { + var sb = new StringBuilder("{"); + if (UpdatedData != null) + foreach (var entry in UpdatedData) + { + sb.Append(entry.Key).Append("->").Append(entry.Value).Append(","); + } + sb.Append('}'); + + return $"Gossip(sendBack={SendBack}, updatedData={sb})"; + } } } diff --git a/src/contrib/cluster/Akka.DistributedData/Key.cs b/src/contrib/cluster/Akka.DistributedData/Key.cs index 247445bb65a..76ea8fa2cac 100644 --- a/src/contrib/cluster/Akka.DistributedData/Key.cs +++ b/src/contrib/cluster/Akka.DistributedData/Key.cs @@ -23,7 +23,7 @@ interface IKeyWithGenericType : IKey /// /// Key for the key-value data in . The type of the data value - /// is defined in the key. Keys are compared equal if the `id` strings are equal, + /// is defined in the key. KeySet are compared equal if the `id` strings are equal, /// i.e. use unique identifiers. /// /// Specific classes are provided for the built in data types, e.g. , diff --git a/src/contrib/cluster/Akka.DistributedData/LWWDictionary.cs b/src/contrib/cluster/Akka.DistributedData/LWWDictionary.cs index b625714a9d2..384a35710cb 100644 --- a/src/contrib/cluster/Akka.DistributedData/LWWDictionary.cs +++ b/src/contrib/cluster/Akka.DistributedData/LWWDictionary.cs @@ -6,10 +6,14 @@ //----------------------------------------------------------------------- using System; +using System.Collections; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Text; +using Akka.Actor; using Akka.Cluster; +using Akka.Util; namespace Akka.DistributedData { @@ -34,7 +38,7 @@ public static LWWDictionary Create(IEnumerable - /// Specialized with values. + /// Specialized with values. /// /// relies on synchronized clocks and should only be used when the choice of /// value is not important for concurrent updates occurring within the clock skew. @@ -50,8 +54,14 @@ public static LWWDictionary Create(IEnumerable [Serializable] public sealed class LWWDictionary : IReplicatedData>, - IRemovedNodePruning>, IReplicatedDataSerialization, IEquatable> + IRemovedNodePruning>, + IReplicatedDataSerialization, + IEquatable>, + IEnumerable> { + /// + /// An empty instance of the + /// public static readonly LWWDictionary Empty = new LWWDictionary(ORDictionary>.Empty); private readonly ORDictionary> _underlying; @@ -61,16 +71,41 @@ public LWWDictionary(ORDictionary> underlying) _underlying = underlying; } + /// + /// Returns all entries stored within current + /// public IImmutableDictionary Entries => _underlying.Entries .Select(kv => new KeyValuePair(kv.Key, kv.Value.Value)) .ToImmutableDictionary(); + /// + /// Returns collection of keys stored within current . + /// + public IEnumerable Keys => _underlying.Keys; + + /// + /// Returns collection of values stored within current . + /// + public IEnumerable Values => _underlying.Values.Select(x => x.Value); + + /// + /// Returns value stored under provided . + /// public TValue this[TKey key] => _underlying[key].Value; + /// + /// Determines current contains entry with provided . + /// public bool ContainsKey(TKey key) => _underlying.ContainsKey(key); + /// + /// Determines if current is empty. + /// public bool IsEmpty => _underlying.IsEmpty; + /// + /// Returns number of entries stored within current . + /// public int Count => _underlying.Count; /// @@ -100,6 +135,22 @@ public LWWDictionary SetItem(UniqueAddress node, TKey key, TValue public LWWDictionary Remove(UniqueAddress node, TKey key) => new LWWDictionary(_underlying.Remove(node, key)); + /// + /// Tries to return a value under provided is such value exists. + /// + public bool TryGetValue(TKey key, out TValue value) + { + LWWRegister register; + if (_underlying.TryGetValue(key, out register)) + { + value = register.Value; + return true; + } + + value = default(TValue); + return false; + } + public LWWDictionary Merge(LWWDictionary other) => new LWWDictionary(_underlying.Merge(other._underlying)); @@ -123,12 +174,24 @@ public bool Equals(LWWDictionary other) return _underlying.Equals(other._underlying); } + public IEnumerator> GetEnumerator() => + _underlying.Select(x => new KeyValuePair(x.Key, x.Value.Value)).GetEnumerator(); + public override bool Equals(object obj) => obj is LWWDictionary && Equals((LWWDictionary) obj); - public override int GetHashCode() + public override int GetHashCode() => _underlying.GetHashCode(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + public override string ToString() { - return _underlying.GetHashCode(); + var sb = new StringBuilder("LWWDictionary("); + foreach (var entry in Entries) + { + sb.Append(entry.Key).Append("->").Append(entry.Value).Append(", "); + } + sb.Append(')'); + return sb.ToString(); } } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/LWWRegister.cs b/src/contrib/cluster/Akka.DistributedData/LWWRegister.cs index 33555abe7c7..6b8e9e8920d 100644 --- a/src/contrib/cluster/Akka.DistributedData/LWWRegister.cs +++ b/src/contrib/cluster/Akka.DistributedData/LWWRegister.cs @@ -6,8 +6,12 @@ //----------------------------------------------------------------------- using System; +using System.Collections; using System.Collections.Generic; +using System.Collections.Immutable; +using Akka.Actor; using Akka.Cluster; +using Akka.Util; namespace Akka.DistributedData { @@ -27,8 +31,6 @@ public LWWRegisterKey(string id) : base(id) { } } - - public interface ILWWRegister {} /// /// Implements a 'Last Writer Wins Register' CRDT, also called a 'LWW-Register'. @@ -53,7 +55,7 @@ public interface ILWWRegister {} /// This class is immutable, i.e. "modifying" methods return a new instance. /// [Serializable] - public class LWWRegister : ILWWRegister, IReplicatedData>, IReplicatedDataSerialization, IEquatable> + public class LWWRegister : IReplicatedData>, IReplicatedDataSerialization, IEquatable> { /// /// Default clock is using max between DateTime.UtcNow.Ticks and current timestamp + 1. @@ -89,10 +91,19 @@ public LWWRegister(UniqueAddress node, T initial, Clock clock) Timestamp = clock(0L, initial); } + /// + /// Returns a timestamp used to determine predecende in current register updates. + /// public long Timestamp { get; } + /// + /// Returns value of the current register. + /// public T Value { get; } + /// + /// Returns a unique address of the last cluster node, that updated current register value. + /// public UniqueAddress UpdatedBy { get; } /// @@ -138,5 +149,7 @@ public override int GetHashCode() return hashCode; } } + + public override string ToString() => $"LWWRegister(value={Value}, timestamp={Timestamp}, updatedBy={UpdatedBy})"; } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Local/ClusterReplicatedDataExtensions.cs b/src/contrib/cluster/Akka.DistributedData/Local/ClusterReplicatedDataExtensions.cs new file mode 100644 index 00000000000..cf2bb93d909 --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Local/ClusterReplicatedDataExtensions.cs @@ -0,0 +1,98 @@ +namespace Akka.DistributedData.Local +{ + /// + /// Extensions methods container to ease building CRDTs scoped to local cluster. + /// + public static class ClusterReplicatedDataExtensions + { + /// + /// Creates a new instance of GCounter, that works within the scope of the current cluster. + /// + public static LocalGCounter GCounter(this Cluster.Cluster cluster) => + new LocalGCounter(cluster, Akka.DistributedData.GCounter.Empty); + + /// + /// Creates an instance of GCounter scoped to a current cluster. + /// + public static LocalGCounter GCounter(this Cluster.Cluster cluster, GCounter counter) => + new LocalGCounter(cluster, counter); + + /// + /// Creates a new instance of PNCounter, that works within the scope of the current cluster. + /// + public static LocalPNCounter PNCounter(this Cluster.Cluster cluster) => + new LocalPNCounter(cluster, Akka.DistributedData.PNCounter.Empty); + + /// + /// Creates an instance of PNCounter scoped to a current cluster. + /// + public static LocalPNCounter PNCounter(this Cluster.Cluster cluster, PNCounter counter) => + new LocalPNCounter(cluster, counter); + + /// + /// Creates a new instance of , that works within the scope of the current cluster. + /// + public static LocalORSet ORSet(this Cluster.Cluster cluster) => + new LocalORSet(cluster, Akka.DistributedData.ORSet.Empty); + + /// + /// Creates an instance of an ORSet scoped to a current cluster. + /// + public static LocalORSet ORSet(this Cluster.Cluster cluster, ORSet orset) => + new LocalORSet(cluster, orset); + + /// + /// Creates a new instance of an ORDictionary scoped to a current cluster. + /// + public static LocalORDictionary ORDictionary(this Cluster.Cluster cluster) where TVal : IReplicatedData => + new LocalORDictionary(cluster, Akka.DistributedData.ORDictionary.Empty); + + /// + /// Creates an instance of an ORDictionary scoped to a current cluster. + /// + public static LocalORDictionary ORDictionary(this Cluster.Cluster cluster, ORDictionary dictionary) where TVal : IReplicatedData => + new LocalORDictionary(cluster, dictionary); + + /// + /// Creates a new instance of an ORMultiDictionary scoped to a current cluster. + /// + public static LocalORMultiDictionary ORMultiDictionary(this Cluster.Cluster cluster) => + new LocalORMultiDictionary(cluster, Akka.DistributedData.ORMultiDictionary.Empty); + + /// + /// Creates an instance of an ORMultiDictionary scoped to a current cluster. + /// + public static LocalORMultiDictionary ORMultiDictionary(this Cluster.Cluster cluster, ORMultiDictionary dictionary) => + new LocalORMultiDictionary(cluster, dictionary); + + /// + /// Creates a new instance of an PNCounterDictionary scoped to a current cluster. + /// + public static LocalPNCounterDictionary PNCounterDictionary(this Cluster.Cluster cluster) => + new LocalPNCounterDictionary(cluster, Akka.DistributedData.PNCounterDictionary.Empty); + + /// + /// Creates an instance of an PNCounterDictionary scoped to a current cluster. + /// + public static LocalPNCounterDictionary PNCounterDictionary(this Cluster.Cluster cluster, PNCounterDictionary dictionary) => + new LocalPNCounterDictionary(cluster, dictionary); + + /// + /// Creates an instance of an LWWRegister scoped to a current cluster. + /// + public static LocalLWWRegister LWWRegister(this Cluster.Cluster cluster, LWWRegister register) => + new LocalLWWRegister(cluster, register); + /// + /// Creates a new instance of an LWWDictionary scoped to a current cluster. + /// + public static LocalLWWDictionary LWWDictionary(this Cluster.Cluster cluster) => + new LocalLWWDictionary(cluster, Akka.DistributedData.LWWDictionary.Empty); + + /// + /// Creates an instance of an LWWDictionary scoped to a current cluster. + /// + public static LocalLWWDictionary LWWDictionary(this Cluster.Cluster cluster, LWWDictionary dictionary) => + new LocalLWWDictionary(cluster, dictionary); + + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Local/LocalGCounter.cs b/src/contrib/cluster/Akka.DistributedData/Local/LocalGCounter.cs new file mode 100644 index 00000000000..03234f9dbd9 --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Local/LocalGCounter.cs @@ -0,0 +1,80 @@ +using System.Numerics; +using Akka.Actor; +using Akka.Cluster; +using Akka.Util; + +namespace Akka.DistributedData.Local +{ + /// + /// A wrapper around instance, that binds it's operations to a current cluster node. + /// + public struct LocalGCounter : ISurrogated + { + internal sealed class Surrogate : ISurrogate + { + private readonly GCounter _counter; + + public Surrogate(GCounter counter) + { + this._counter = counter; + } + + public ISurrogated FromSurrogate(ActorSystem system) => + new LocalGCounter(Cluster.Cluster.Get(system).SelfUniqueAddress, _counter); + } + + private readonly UniqueAddress _currentNode; + private readonly GCounter _crdt; + + internal LocalGCounter(UniqueAddress currentNode, GCounter crdt) : this() + { + _currentNode = currentNode; + _crdt = crdt; + } + + public LocalGCounter(Cluster.Cluster cluster, GCounter counter) : this(cluster.SelfUniqueAddress, counter) + { + } + + /// + /// Returns a value of the underlying GCounter. + /// + public BigInteger Value => _crdt.Value; + + /// + /// Merges data from provided into current CRDT, + /// creating new immutable instance in a result. + /// + public LocalGCounter Merge(GCounter counter) => new LocalGCounter(_currentNode, _crdt.Merge(counter)); + + /// + /// Increments current GCounter value by 1 in current cluster node context. + /// + public static LocalGCounter operator ++(LocalGCounter counter) + { + var node = counter._currentNode; + return new LocalGCounter(node, counter._crdt.Increment(node)); + } + + /// + /// Increments current GCounter value by provided in current cluster node context. + /// + public static LocalGCounter operator +(LocalGCounter counter, ulong delta) + { + var node = counter._currentNode; + return new LocalGCounter(node, counter._crdt.Increment(node, delta)); + } + + /// + /// Increments current GCounter value by provided in current cluster node context. + /// + public static LocalGCounter operator +(LocalGCounter counter, BigInteger delta) + { + var node = counter._currentNode; + return new LocalGCounter(node, counter._crdt.Increment(node, delta)); + } + + public static implicit operator GCounter(LocalGCounter counter) => counter._crdt; + public ISurrogate ToSurrogate(ActorSystem system) => new Surrogate(_crdt); + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Local/LocalLWWDictionary.cs b/src/contrib/cluster/Akka.DistributedData/Local/LocalLWWDictionary.cs new file mode 100644 index 00000000000..49941fe3786 --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Local/LocalLWWDictionary.cs @@ -0,0 +1,100 @@ +using System.Collections; +using System.Collections.Generic; +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Cluster; +using Akka.Util; + +namespace Akka.DistributedData.Local +{ + /// + /// A wrapper around that works in the context of the current cluster node. + /// + /// + /// + public struct LocalLWWDictionary : ISurrogated, IEnumerable> + { + internal sealed class Surrogate : ISurrogate + { + private readonly LWWDictionary _dictionary; + + public Surrogate(LWWDictionary dictionary) + { + _dictionary = dictionary; + } + + public ISurrogated FromSurrogate(ActorSystem system) => + new LocalLWWDictionary(Cluster.Cluster.Get(system), _dictionary); + } + + private readonly UniqueAddress _currentNode; + private readonly LWWDictionary _crdt; + + internal LocalLWWDictionary(UniqueAddress currentNode, LWWDictionary crdt) : this() + { + _currentNode = currentNode; + _crdt = crdt; + } + + public LocalLWWDictionary(Cluster.Cluster cluster, LWWDictionary crdt) : this(cluster.SelfUniqueAddress, crdt) + { + } + + /// + /// Returns collection of the elements inside the current set. + /// + public IImmutableDictionary Entries => _crdt.Entries; + + /// + /// Returns number of elements inside the current set. + /// + public int Count => _crdt.Count; + + /// + /// Determines if underlying LWWDictionary is empty. + /// + public bool IsEmpty => _crdt.IsEmpty; + + /// + /// Gets or sets provided key-valu of the underlying ORDicationary within scope of the current cluster node. + /// + public TVal this[TKey key] => _crdt[key]; + + /// + /// Gets value determining, if underlying LWWDictionary contains specified . + /// + public bool ContainsKey(TKey key) => _crdt.ContainsKey(key); + + /// + /// Tries to retrieve element stored under provided in the underlying LWWDictionary, + /// returning true if such value existed. + /// + public bool TryGetValue(TKey key, out TVal value) => _crdt.TryGetValue(key, out value); + + /// + /// Stored provided in entry with given inside the + /// underlying LWWDictionary in scope of the current node, and returning new local dictionary in result. + /// + public LocalLWWDictionary SetItem(TKey key, TVal value) => + new LocalLWWDictionary(_currentNode, _crdt.SetItem(_currentNode, key, value)); + + /// + /// Removes an entry from underlying LWWDictionary in the context of the current cluster node, given a . + /// + public LocalLWWDictionary Remove(TKey key) => + new LocalLWWDictionary(_currentNode, _crdt.Remove(_currentNode, key)); + + public ISurrogate ToSurrogate(ActorSystem system) => new Surrogate(_crdt); + public IEnumerator> GetEnumerator() => _crdt.GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// + /// Merges data from provided into current CRDT, + /// creating new immutable instance in a result. + /// + public LocalLWWDictionary Merge(LWWDictionary dictionary) => + new LocalLWWDictionary(_currentNode, _crdt.Merge(dictionary)); + + public static implicit operator LWWDictionary(LocalLWWDictionary set) => set._crdt; + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Local/LocalLWWRegister.cs b/src/contrib/cluster/Akka.DistributedData/Local/LocalLWWRegister.cs new file mode 100644 index 00000000000..26283399d27 --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Local/LocalLWWRegister.cs @@ -0,0 +1,72 @@ +using Akka.Actor; +using Akka.Cluster; +using Akka.Util; + +namespace Akka.DistributedData.Local +{ + + /// + /// A wrapper around that will work in the context of the current cluster node. + /// + /// + public struct LocalLWWRegister : ISurrogated + { + internal sealed class Surrogate : ISurrogate + { + private readonly LWWRegister register; + + public Surrogate(LWWRegister register) + { + this.register = register; + } + + public ISurrogated FromSurrogate(ActorSystem system) => + new LocalLWWRegister(Cluster.Cluster.Get(system), register); + } + + private readonly UniqueAddress _currentNode; + private readonly LWWRegister _crdt; + + internal LocalLWWRegister(UniqueAddress currentNode, LWWRegister crdt) : this() + { + _currentNode = currentNode; + _crdt = crdt; + } + + public LocalLWWRegister(Cluster.Cluster cluster, LWWRegister crdt) : this(cluster.SelfUniqueAddress, crdt) + { + } + + /// + /// Returns a timestamp used to determine predecende in current register updates. + /// + public long Timestamp => _crdt.Timestamp; + + /// + /// Returns value of the current register. + /// + public T Value => _crdt.Value; + + /// + /// Returns a unique address of the last cluster node, that updated current register value. + /// + public UniqueAddress UpdatedBy => _crdt.UpdatedBy; + + /// + /// Change the value of the undelrying register. + /// + public LocalLWWRegister WithValue(T value, Clock clock = null) => + new LocalLWWRegister(_currentNode, _crdt.WithValue(_currentNode, value, clock)); + + public ISurrogate ToSurrogate(ActorSystem system) => new Surrogate(_crdt); + + /// + /// Merges data from provided into current CRDT, + /// creating new immutable instance in a result. + /// + public LocalLWWRegister Merge(LWWRegister register) => + new LocalLWWRegister(_currentNode, _crdt.Merge(register)); + + public static implicit operator LWWRegister(LocalLWWRegister set) => set._crdt; + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Local/LocalORDictionary.cs b/src/contrib/cluster/Akka.DistributedData/Local/LocalORDictionary.cs new file mode 100644 index 00000000000..d4d85db3c2a --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Local/LocalORDictionary.cs @@ -0,0 +1,108 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Cluster; +using Akka.Util; + +namespace Akka.DistributedData.Local +{ + /// + /// A wrapper around that works in context of the current cluster. + /// + /// + /// + public struct LocalORDictionary : ISurrogated, IEnumerable> where TVal : IReplicatedData + { + internal sealed class Surrogate : ISurrogate + { + private readonly ORDictionary _dictionary; + + public Surrogate(ORDictionary dictionary) + { + _dictionary = dictionary; + } + + public ISurrogated FromSurrogate(ActorSystem system) => + new LocalORDictionary(Cluster.Cluster.Get(system), _dictionary); + } + + private readonly UniqueAddress _currentNode; + private readonly ORDictionary _crdt; + + internal LocalORDictionary(UniqueAddress currentNode, ORDictionary crdt) : this() + { + _currentNode = currentNode; + _crdt = crdt; + } + + public LocalORDictionary(Cluster.Cluster cluster, ORDictionary crdt) : this(cluster.SelfUniqueAddress, crdt) + { + } + + /// + /// Returns collection of the elements inside the current set. + /// + public IImmutableDictionary Entries => _crdt.Entries; + + /// + /// Returns number of elements inside the current set. + /// + public int Count => _crdt.Count; + + /// + /// Determines if underlying ORDictionary is empty. + /// + public bool IsEmpty => _crdt.IsEmpty; + + /// + /// Gets or sets provided key-valu of the underlying ORDicationary within scope of the current cluster node. + /// + public TVal this[TKey key] => _crdt[key]; + + /// + /// Gets value determining, if underlying ORDictionary contains specified . + /// + public bool ContainsKey(TKey key) => _crdt.ContainsKey(key); + + /// + /// Tries to retrieve element stored under provided in the underlying ORDictionary, + /// returning true if such value existed. + /// + public bool TryGetValue(TKey key, out TVal value) => _crdt.TryGetValue(key, out value); + + /// + /// Stored provided in entry with given inside the + /// underlying ORDictionary in scope of the current node, and returning new local dictionary in result. + /// + public LocalORDictionary SetItem(TKey key, TVal value) => + new LocalORDictionary(_currentNode, _crdt.SetItem(_currentNode, key, value)); + + /// + /// Adds or updated a value in entry with given using function + /// if other value existed there previously, within a constext of the current cluster node. + /// + public LocalORDictionary AddOrUpdate(TKey key, TVal value, Func modify) => + new LocalORDictionary(_currentNode, _crdt.AddOrUpdate(_currentNode, key, value, modify)); + + /// + /// Removes an entry from underlying ORDictionary in the context of the current cluster node, given a . + /// + public LocalORDictionary Remove(TKey key) => + new LocalORDictionary(_currentNode, _crdt.Remove(_currentNode, key)); + + public ISurrogate ToSurrogate(ActorSystem system) => new Surrogate(_crdt); + public IEnumerator> GetEnumerator() => _crdt.GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// + /// Merges data from provided into current CRDT, + /// creating new immutable instance in a result. + /// + public LocalORDictionary Merge(ORDictionary dictionary) => + new LocalORDictionary(_currentNode, _crdt.Merge(dictionary)); + + public static implicit operator ORDictionary(LocalORDictionary set) => set._crdt; + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Local/LocalORMultiDictionary.cs b/src/contrib/cluster/Akka.DistributedData/Local/LocalORMultiDictionary.cs new file mode 100644 index 00000000000..3bcad2665ca --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Local/LocalORMultiDictionary.cs @@ -0,0 +1,132 @@ +using System.Collections; +using System.Collections.Generic; +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Cluster; +using Akka.Util; + +namespace Akka.DistributedData.Local +{ + + /// + /// A wrapper around that works in context of the current cluster. + /// + /// + /// + public struct LocalORMultiDictionary : ISurrogated, IEnumerable>> + { + internal sealed class Surrogate : ISurrogate + { + private readonly ORMultiDictionary _dictionary; + + public Surrogate(ORMultiDictionary dictionary) + { + _dictionary = dictionary; + } + + public ISurrogated FromSurrogate(ActorSystem system) => + new LocalORMultiDictionary(Cluster.Cluster.Get(system), _dictionary); + } + + private readonly UniqueAddress _currentNode; + private readonly ORMultiDictionary _crdt; + + internal LocalORMultiDictionary(UniqueAddress currentNode, ORMultiDictionary crdt) : this() + { + _currentNode = currentNode; + _crdt = crdt; + } + + public LocalORMultiDictionary(Cluster.Cluster cluster, ORMultiDictionary crdt) : this(cluster.SelfUniqueAddress, crdt) + { + } + + /// + /// Returns collection of the elements inside the underlying ORMultiDictionary. + /// + public IImmutableDictionary> Entries => _crdt.Entries; + + /// + /// Returns all keys stored within underlying ORMultiDictionary. + /// + public IEnumerable Keys => _crdt.Keys; + + /// + /// Returns all values stored in all buckets within underlying ORMultiDictionary. + /// + public IEnumerable Values => _crdt.Values; + + /// + /// Returns number of elements inside the unterlying ORMultiDictionary. + /// + public int Count => _crdt.Count; + + /// + /// Determines if underlying ORMultiDictionary is empty. + /// + public bool IsEmpty => _crdt.IsEmpty; + + /// + /// Gets or sets provided key-valu of the underlying ORMultiDictionary within scope of the current cluster node. + /// + public IImmutableSet this[TKey key] => _crdt[key]; + + /// + /// Gets value determining, if underlying ORMultiDictionary contains specified . + /// + public bool ContainsKey(TKey key) => _crdt.ContainsKey(key); + + /// + /// Tries to retrieve element stored under provided in the underlying ORMultiDictionary, + /// returning true if such value existed. + /// + public bool TryGetValue(TKey key, out IImmutableSet value) => _crdt.TryGetValue(key, out value); + + /// + /// Stored provided in entry with given inside the + /// underlying ORMultiDictionary in scope of the current node, and returning new local dictionary in result. + /// + public LocalORMultiDictionary SetItems(TKey key, IImmutableSet value) => + new LocalORMultiDictionary(_currentNode, _crdt.SetItems(_currentNode, key, value)); + + /// + /// Adds provided into a bucket under provided + /// within the context of the current cluster. + /// + public LocalORMultiDictionary AddItem(TKey key, TVal value) => + new LocalORMultiDictionary(_currentNode, _crdt.AddItem(_currentNode, key, value)); + + /// + /// Removes provided from a bucket under provided + /// within the context of the current cluster. + /// + public LocalORMultiDictionary RemoveItem(TKey key, TVal value) => + new LocalORMultiDictionary(_currentNode, _crdt.RemoveItem(_currentNode, key, value)); + + /// + /// Replaces provided with inside + /// a bucket under provided within the context of the current cluster. + /// + public LocalORMultiDictionary ReplaceItem(TKey key, TVal oldValue, TVal newValue) => + new LocalORMultiDictionary(_currentNode, _crdt.ReplaceItem(_currentNode, key, oldValue, newValue)); + + /// + /// Removes a bucket from underlying ORMultiDictionary in the context of the current cluster node, given a . + /// + public LocalORMultiDictionary Remove(TKey key) => + new LocalORMultiDictionary(_currentNode, _crdt.Remove(_currentNode, key)); + + public ISurrogate ToSurrogate(ActorSystem system) => new Surrogate(_crdt); + public IEnumerator>> GetEnumerator() => _crdt.GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// + /// Merges data from provided into current CRDT, + /// creating new immutable instance in a result. + /// + public LocalORMultiDictionary Merge(ORMultiDictionary dictionary) => + new LocalORMultiDictionary(_currentNode, _crdt.Merge(dictionary)); + + public static implicit operator ORMultiDictionary(LocalORMultiDictionary set) => set._crdt; + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Local/LocalORSet.cs b/src/contrib/cluster/Akka.DistributedData/Local/LocalORSet.cs new file mode 100644 index 00000000000..111c7fd524c --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Local/LocalORSet.cs @@ -0,0 +1,91 @@ +using System.Collections; +using System.Collections.Generic; +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Cluster; +using Akka.Util; + +namespace Akka.DistributedData.Local +{ + + /// + /// A wrapper around instance, that binds it's operations to a current cluster node. + /// + public struct LocalORSet : ISurrogated, IEnumerable + { + internal sealed class Surrogate : ISurrogate + { + private readonly ORSet _set; + + public Surrogate(ORSet set) + { + _set = set; + } + + public ISurrogated FromSurrogate(ActorSystem system) => + new LocalORSet(Cluster.Cluster.Get(system).SelfUniqueAddress, _set); + } + + private readonly UniqueAddress _currentNode; + private readonly ORSet _crdt; + + internal LocalORSet(UniqueAddress currentNode, ORSet crdt) : this() + { + _currentNode = currentNode; + _crdt = crdt; + } + + public LocalORSet(Cluster.Cluster cluster, ORSet crdt) : this(cluster.SelfUniqueAddress, crdt) + { + } + + /// + /// Returns collection of the elements inside the current set. + /// + public IImmutableSet Elements => _crdt.Elements; + + /// + /// Returns number of elements inside the current set. + /// + public int Count => _crdt.Count; + + /// + /// Clears underlying ORSet in scope of the current cluster node. + /// + /// + public LocalORSet Clear() => new LocalORSet(_currentNode, _crdt.Clear(_currentNode)); + + /// + /// Checks if target exists within underlying ORSet. + /// + /// + /// + public bool Contains(T element) => _crdt.Contains(element); + + /// + /// Adds an to the underlying ORSet in scope of a current cluster node. + /// + /// + /// + public LocalORSet Add(T element) => new LocalORSet(_currentNode, _crdt.Add(_currentNode, element)); + + /// + /// Remose an from the underlying ORSet in scope of a current cluster node. + /// + /// + /// + public LocalORSet Remove(T element) => new LocalORSet(_currentNode, _crdt.Remove(_currentNode, element)); + + public ISurrogate ToSurrogate(ActorSystem system) => new Surrogate(_crdt); + public IEnumerator GetEnumerator() => _crdt.GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// + /// Merges data from provided into current CRDT, + /// creating new immutable instance in a result. + /// + public LocalORSet Merge(ORSet set) => new LocalORSet(_currentNode, _crdt.Merge(set)); + + public static implicit operator ORSet(LocalORSet set) => set._crdt; + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Local/LocalPNCounter.cs b/src/contrib/cluster/Akka.DistributedData/Local/LocalPNCounter.cs new file mode 100644 index 00000000000..f10c7da2810 --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Local/LocalPNCounter.cs @@ -0,0 +1,110 @@ +using System.Numerics; +using Akka.Actor; +using Akka.Cluster; +using Akka.Util; + +namespace Akka.DistributedData.Local +{ + + /// + /// A wrapper around instance, that binds it's operations to a current cluster node. + /// + public struct LocalPNCounter : ISurrogated + { + internal sealed class Surrogate : ISurrogate + { + private readonly PNCounter _counter; + + public Surrogate(PNCounter counter) + { + _counter = counter; + } + + public ISurrogated FromSurrogate(ActorSystem system) => + new LocalPNCounter(Cluster.Cluster.Get(system).SelfUniqueAddress, _counter); + } + + private readonly UniqueAddress _currentNode; + private readonly PNCounter _crdt; + + internal LocalPNCounter(UniqueAddress currentNode, PNCounter crdt) : this() + { + _currentNode = currentNode; + _crdt = crdt; + } + + public LocalPNCounter(Cluster.Cluster cluster, PNCounter counter) : this(cluster.SelfUniqueAddress, counter) + { + } + + /// + /// Returns value of the underlying PNCounter. + /// + public BigInteger Value => _crdt.Value; + + /// + /// Increments value of the underlying PNCounter by 1 in current cluster node context. + /// + public static LocalPNCounter operator ++(LocalPNCounter counter) + { + var node = counter._currentNode; + return new LocalPNCounter(node, counter._crdt.Increment(node)); + } + + /// + /// Decrements value of the underlying PNCounter by 1 in current cluster node context. + /// + public static LocalPNCounter operator --(LocalPNCounter counter) + { + var node = counter._currentNode; + return new LocalPNCounter(node, counter._crdt.Decrement(node)); + } + + /// + /// Increments value of the underlying PNCounter by provided in current cluster node context. + /// + public static LocalPNCounter operator +(LocalPNCounter counter, ulong delta) + { + var node = counter._currentNode; + return new LocalPNCounter(node, counter._crdt.Increment(node, delta)); + } + + /// + /// Increments value of the underlying PNCounter by provided in current cluster node context. + /// + public static LocalPNCounter operator +(LocalPNCounter counter, BigInteger delta) + { + var node = counter._currentNode; + return new LocalPNCounter(node, counter._crdt.Increment(node, delta)); + } + + /// + /// Decrements value of the underlying PNCounter by provided in current cluster node context. + /// + public static LocalPNCounter operator -(LocalPNCounter counter, ulong delta) + { + var node = counter._currentNode; + return new LocalPNCounter(node, counter._crdt.Decrement(node, delta)); + } + + /// + /// Decrements value of the underlying PNCounter by provided in current cluster node context. + /// + public static LocalPNCounter operator -(LocalPNCounter counter, BigInteger delta) + { + var node = counter._currentNode; + return new LocalPNCounter(node, counter._crdt.Decrement(node, delta)); + } + + public static implicit operator PNCounter(LocalPNCounter counter) => counter._crdt; + + + /// + /// Merges data from provided into current CRDT, + /// creating new immutable instance in a result. + /// + public LocalPNCounter Merge(PNCounter counter) => new LocalPNCounter(_currentNode, _crdt.Merge(counter)); + + public ISurrogate ToSurrogate(ActorSystem system) => new Surrogate(_crdt); + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Local/LocalPNCounterDictionary.cs b/src/contrib/cluster/Akka.DistributedData/Local/LocalPNCounterDictionary.cs new file mode 100644 index 00000000000..45c8f7a1cf0 --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Local/LocalPNCounterDictionary.cs @@ -0,0 +1,123 @@ +using System.Collections; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Numerics; +using Akka.Actor; +using Akka.Cluster; +using Akka.Util; + +namespace Akka.DistributedData.Local +{ + + /// + /// Wrapper around that provides + /// execution context of the current cluster node. + /// + /// + public struct LocalPNCounterDictionary : ISurrogated, IEnumerable> + { + internal sealed class Surrogate : ISurrogate + { + private readonly PNCounterDictionary _dictionary; + + public Surrogate(PNCounterDictionary dictionary) + { + _dictionary = dictionary; + } + + public ISurrogated FromSurrogate(ActorSystem system) => + new LocalPNCounterDictionary(Cluster.Cluster.Get(system), _dictionary); + } + + private readonly UniqueAddress _currentNode; + private readonly PNCounterDictionary _crdt; + + internal LocalPNCounterDictionary(UniqueAddress currentNode, PNCounterDictionary crdt) : this() + { + _currentNode = currentNode; + _crdt = crdt; + } + + public LocalPNCounterDictionary(Cluster.Cluster cluster, PNCounterDictionary crdt) : this(cluster.SelfUniqueAddress, crdt) + { + } + + /// + /// Returns collection of the elements inside the underlying PNCounterDictionary. + /// + public IImmutableDictionary Entries => _crdt.Entries; + + /// + /// Returns all keys stored within underlying PNCounterDictionary. + /// + public IEnumerable Keys => _crdt.Keys; + + /// + /// Returns all values stored in all buckets within underlying PNCounterDictionary. + /// + public IEnumerable Values => _crdt.Values; + + /// + /// Returns number of elements inside the unterlying PNCounterDictionary. + /// + public int Count => _crdt.Count; + + /// + /// Determines if underlying PNCounterDictionary is empty. + /// + public bool IsEmpty => _crdt.IsEmpty; + + /// + /// Gets or sets provided key-valu of the underlying PNCounterDictionary within scope of the current cluster node. + /// + public BigInteger this[TKey key] => _crdt[key]; + + /// + /// Gets value determining, if underlying PNCounterDictionary contains specified . + /// + public bool ContainsKey(TKey key) => _crdt.ContainsKey(key); + + /// + /// Tries to retrieve element stored under provided in the underlying PNCounterDictionary, + /// returning true if such value existed. + /// + public bool TryGetValue(TKey key, out BigInteger value) => _crdt.TryGetValue(key, out value); + + /// + /// Increment the counter with the delta specified. + /// If the delta is negative then it will decrement instead of increment. + /// + public LocalPNCounterDictionary Increment(TKey key, long delta = 1L) => + new LocalPNCounterDictionary(_currentNode, _crdt.Increment(_currentNode, key, delta)); + + /// + /// Decrement the counter with the delta specified. + /// If the delta is negative then it will increment instead of decrement. + /// + public LocalPNCounterDictionary Decrement(TKey key, long delta = 1L) => + new LocalPNCounterDictionary(_currentNode, _crdt.Decrement(_currentNode, key, delta)); + + /// + /// Removes an entry from the map. + /// Note that if there is a conflicting update on another node the entry will + /// not be removed after merge. + /// + public LocalPNCounterDictionary Remove(TKey key) => + new LocalPNCounterDictionary(_currentNode, _crdt.Remove(_currentNode, key)); + + public ISurrogate ToSurrogate(ActorSystem system) => new Surrogate(_crdt); + + public IEnumerator> GetEnumerator() => _crdt.GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// + /// Merges data from provided into current CRDT, + /// creating new immutable instance in a result. + /// + public LocalPNCounterDictionary Merge(PNCounterDictionary dictionary) => + new LocalPNCounterDictionary(_currentNode, _crdt.Merge(dictionary)); + + public static implicit operator PNCounterDictionary(LocalPNCounterDictionary set) => set._crdt; + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/ORDictionary.cs b/src/contrib/cluster/Akka.DistributedData/ORDictionary.cs index dbbd93aca3c..2ee9717e446 100644 --- a/src/contrib/cluster/Akka.DistributedData/ORDictionary.cs +++ b/src/contrib/cluster/Akka.DistributedData/ORDictionary.cs @@ -6,11 +6,15 @@ //----------------------------------------------------------------------- using System; +using System.Collections; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Text; +using Akka.Actor; using Akka.Cluster; using Akka.Pattern; +using Akka.Util; namespace Akka.DistributedData { @@ -19,17 +23,7 @@ public sealed class ORDictionaryKey : Key : IORDictionary - { - - } - + public static class ORDictionary { public static ORDictionary Create(UniqueAddress node, TKey key, TValue value) where TValue : IReplicatedData => @@ -51,32 +45,69 @@ public static ORDictionary Create(IEnumerable [Serializable] - public class ORDictionary : IReplicatedData>, IORDictionary, + public class ORDictionary : IReplicatedData>, IEnumerable>, IRemovedNodePruning>, IEquatable>, IReplicatedDataSerialization where TValue : IReplicatedData { + /// + /// An empty instance of the + /// public static readonly ORDictionary Empty = new ORDictionary(ORSet.Empty, ImmutableDictionary.Empty); - internal readonly ORSet Keys; - internal readonly IImmutableDictionary Values; + internal readonly ORSet KeySet; + internal readonly IImmutableDictionary ValueMap; - public ORDictionary(ORSet keys, IImmutableDictionary values) + /// + /// Creates a new instance of the class. + /// + /// + /// + public ORDictionary(ORSet keySet, IImmutableDictionary valueMap) { - Keys = keys; - Values = values; + KeySet = keySet; + ValueMap = valueMap; } - public IImmutableDictionary Entries => Values; + /// + /// Returns all keys stored within current + /// + public IEnumerable Keys => KeySet; + + /// + /// Returns all values stored within current + /// + public IEnumerable Values => ValueMap.Values; + + /// + /// Returns all entries stored within current + /// + public IImmutableDictionary Entries => ValueMap; - public TValue this[TKey key] => Values[key]; + /// + /// Returns an element stored under provided . + /// + public TValue this[TKey key] => ValueMap[key]; - public bool TryGetValue(TKey key, out TValue value) => Values.TryGetValue(key, out value); + /// + /// Tries to retrieve value under provided , + /// returning true if value under that key has been found. + /// + public bool TryGetValue(TKey key, out TValue value) => ValueMap.TryGetValue(key, out value); - public bool ContainsKey(TKey key) => Values.ContainsKey(key); + /// + /// Checks if provided can be found inside current + /// + public bool ContainsKey(TKey key) => ValueMap.ContainsKey(key); - public bool IsEmpty => Values.Count == 0; + /// + /// Determines if current doesn't contain any value. + /// + public bool IsEmpty => ValueMap.Count == 0; - public int Count => Values.Count; + /// + /// Returns number of entries stored within current + /// + public int Count => ValueMap.Count; /// /// Adds an entry to the map. @@ -93,10 +124,10 @@ public ORDictionary(ORSet keys, IImmutableDictionary values) /// public ORDictionary SetItem(UniqueAddress node, TKey key, TValue value) { - if (value is IORSet && Values.ContainsKey(key)) - throw new ArgumentException("ORDictionary.SetItem may not be used to replace an existing ORSet", nameof(value)); + if (value is IORSet && ValueMap.ContainsKey(key)) + throw new ArgumentException("ORDictionary.SetItems may not be used to replace an existing ORSet", nameof(value)); - return new ORDictionary(Keys.Add(node, key), Values.SetItem(key, value)); + return new ORDictionary(KeySet.Add(node, key), ValueMap.SetItem(key, value)); } /// @@ -109,9 +140,9 @@ public ORDictionary AddOrUpdate(UniqueAddress node, TKey key, TVal Func modify) { TValue value; - return Values.TryGetValue(key, out value) - ? new ORDictionary(Keys.Add(node, key), Values.SetItem(key, modify(value))) - : new ORDictionary(Keys.Add(node, key), Values.SetItem(key, modify(initial))); + return ValueMap.TryGetValue(key, out value) + ? new ORDictionary(KeySet.Add(node, key), ValueMap.SetItem(key, modify(value))) + : new ORDictionary(KeySet.Add(node, key), ValueMap.SetItem(key, modify(initial))); } /// @@ -120,17 +151,17 @@ public ORDictionary AddOrUpdate(UniqueAddress node, TKey key, TVal /// not be removed after merge. /// public ORDictionary Remove(UniqueAddress node, TKey key) => - new ORDictionary(Keys.Remove(node, key), Values.Remove(key)); + new ORDictionary(KeySet.Remove(node, key), ValueMap.Remove(key)); public ORDictionary Merge(ORDictionary other) { - var mergedKeys = Keys.Merge(other.Keys); + var mergedKeys = KeySet.Merge(other.KeySet); var mergedValues = ImmutableDictionary.Empty.ToBuilder(); foreach (var key in mergedKeys.Elements) { TValue left, right; - var leftFound = Values.TryGetValue(key, out left); - var rightFound = other.Values.TryGetValue(key, out right); + var leftFound = ValueMap.TryGetValue(key, out left); + var rightFound = other.ValueMap.TryGetValue(key, out right); if (leftFound && rightFound) mergedValues.Add(key, (TValue) left.Merge(right)); else if (leftFound) @@ -148,7 +179,7 @@ public ORDictionary Merge(ORDictionary other) public bool NeedPruningFrom(UniqueAddress removedNode) { - return Keys.NeedPruningFrom(removedNode) || Values.Any(x => + return KeySet.NeedPruningFrom(removedNode) || ValueMap.Any(x => { var data = x.Value as IRemovedNodePruning; return data != null && data.NeedPruningFrom(removedNode); @@ -157,8 +188,8 @@ public bool NeedPruningFrom(UniqueAddress removedNode) public ORDictionary Prune(UniqueAddress removedNode, UniqueAddress collapseInto) { - var prunedKeys = Keys.Prune(removedNode, collapseInto); - var prunedValues = Values.Aggregate(Values, (acc, kv) => + var prunedKeys = KeySet.Prune(removedNode, collapseInto); + var prunedValues = ValueMap.Aggregate(ValueMap, (acc, kv) => { var data = kv.Value as IRemovedNodePruning; return data != null && data.NeedPruningFrom(removedNode) @@ -171,8 +202,8 @@ public ORDictionary Prune(UniqueAddress removedNode, UniqueAddress public ORDictionary PruningCleanup(UniqueAddress removedNode) { - var pruningCleanupKeys = Keys.PruningCleanup(removedNode); - var pruningCleanupValues = Values.Aggregate(Values, (acc, kv) => + var pruningCleanupKeys = KeySet.PruningCleanup(removedNode); + var pruningCleanupValues = ValueMap.Aggregate(ValueMap, (acc, kv) => { var data = kv.Value as IRemovedNodePruning; return data != null && data.NeedPruningFrom(removedNode) @@ -188,7 +219,7 @@ public bool Equals(ORDictionary other) if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(this, other)) return true; - return Equals(Keys, other.Keys) && Values.SequenceEqual(other.Values); + return Equals(KeySet, other.KeySet) && ValueMap.SequenceEqual(other.ValueMap); } public override bool Equals(object obj) @@ -200,8 +231,23 @@ public override int GetHashCode() { unchecked { - return ((Keys.GetHashCode() *397) ^ Values.GetHashCode()); + return ((KeySet.GetHashCode() *397) ^ ValueMap.GetHashCode()); + } + } + + public IEnumerator> GetEnumerator() => ValueMap.GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + public override string ToString() + { + var sb = new StringBuilder("ORDictionary("); + foreach (var entry in Entries) + { + sb.Append(entry.Key).Append("->").Append(entry.Value).Append(", "); } + sb.Append(')'); + return sb.ToString(); } } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/ORMultiDictionary.cs b/src/contrib/cluster/Akka.DistributedData/ORMultiDictionary.cs index f72cbd5dd77..d7534e2e633 100644 --- a/src/contrib/cluster/Akka.DistributedData/ORMultiDictionary.cs +++ b/src/contrib/cluster/Akka.DistributedData/ORMultiDictionary.cs @@ -6,10 +6,14 @@ //----------------------------------------------------------------------- using System; +using System.Collections; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Text; +using Akka.Actor; using Akka.Cluster; +using Akka.Util; namespace Akka.DistributedData { @@ -31,7 +35,8 @@ public ORMultiDictionaryKey(string id) : base(id) public class ORMultiDictionary : IReplicatedData>, IRemovedNodePruning>, - IReplicatedDataSerialization, IEquatable> + IReplicatedDataSerialization, IEquatable>, + IEnumerable>> { public static readonly ORMultiDictionary Empty = new ORMultiDictionary(ORDictionary>.Empty); @@ -67,14 +72,42 @@ public bool TryGetValue(TKey key, out IImmutableSet value) public int Count => _underlying.Count; - public ORMultiDictionary SetItem(UniqueAddress node, TKey key, IImmutableSet value) + /// + /// Returns all keys stored within current ORMultiDictionary. + /// + public IEnumerable Keys => _underlying.KeySet; + + /// + /// Returns all values stored in all buckets within current ORMultiDictionary. + /// + public IEnumerable Values + { + get + { + foreach (var value in _underlying.Values) + foreach (var v in value) + { + yield return v; + } + } + } + + /// + /// Sets a of values inside current dictionary under provided + /// in the context of the provided cluster . + /// + public ORMultiDictionary SetItems(UniqueAddress node, TKey key, IImmutableSet bucket) { var newUnderlying = _underlying.AddOrUpdate(node, key, ORSet.Empty, old => - value.Aggregate(old.Clear(node), (set, element) => set.Add(node, element))); + bucket.Aggregate(old.Clear(node), (set, element) => set.Add(node, element))); return new ORMultiDictionary(newUnderlying); } + /// + /// Removes all values inside current dictionary stored under provided + /// in the context of the provided cluster . + /// public ORMultiDictionary Remove(UniqueAddress node, TKey key) => new ORMultiDictionary(_underlying.Remove(node, key)); @@ -132,9 +165,29 @@ public bool Equals(ORMultiDictionary other) return Equals(_underlying, other._underlying); } + public IEnumerator>> GetEnumerator() => + _underlying.Select(x => new KeyValuePair>(x.Key, x.Value.Elements)).GetEnumerator(); + public override bool Equals(object obj) => obj is ORMultiDictionary && Equals((ORMultiDictionary) obj); public override int GetHashCode() => _underlying.GetHashCode(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + public override string ToString() + { + var sb = new StringBuilder("ORMutliDictionary("); + foreach (var entry in Entries) + { + sb.Append(entry.Key).Append("-> ["); + foreach (var value in entry.Value) + { + sb.Append(value).Append(", "); + } + sb.Append("], "); + } + sb.Append(')'); + return sb.ToString(); + } } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/ORSet.cs b/src/contrib/cluster/Akka.DistributedData/ORSet.cs index 176dd21d1f8..623530f2188 100644 --- a/src/contrib/cluster/Akka.DistributedData/ORSet.cs +++ b/src/contrib/cluster/Akka.DistributedData/ORSet.cs @@ -6,10 +6,15 @@ //----------------------------------------------------------------------- using System; +using System.Collections; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Numerics; +using System.Text; +using Akka.Actor; using Akka.Cluster; +using Akka.Util; namespace Akka.DistributedData { @@ -18,16 +23,8 @@ public sealed class ORSetKey : Key> { public ORSetKey(string id) : base(id) { } } - - public interface IORSet - { - - } - - public interface IORSet : IORSet - { - - } + + internal interface IORSet { } public static class ORSet { @@ -71,7 +68,7 @@ public static ORSet Create(IEnumerable> ele /// This class is immutable, i.e. "modifying" methods return a new instance. /// [Serializable] - public class ORSet : FastMerge>, IORSet, IReplicatedDataSerialization, IRemovedNodePruning>, IEquatable> + public class ORSet : FastMerge>, IORSet, IReplicatedDataSerialization, IRemovedNodePruning>, IEquatable>, IEnumerable { public static readonly ORSet Empty = new ORSet(); @@ -309,6 +306,8 @@ public bool Equals(ORSet other) return _versionVector == other._versionVector && _elementsMap.SequenceEqual(other._elementsMap); } + public IEnumerator GetEnumerator() => _elementsMap.Keys.GetEnumerator(); + public override bool Equals(object obj) => obj is ORSet && Equals((ORSet)obj); public override int GetHashCode() @@ -318,5 +317,18 @@ public override int GetHashCode() return (_elementsMap.GetHashCode() * 397) ^ (_versionVector.GetHashCode()); } } + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + public override string ToString() + { + var sb = new StringBuilder("ORSet("); + foreach (var element in Elements) + { + sb.Append(element).Append(", "); + } + sb.Append(')'); + return sb.ToString(); + } } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/PNCounter.cs b/src/contrib/cluster/Akka.DistributedData/PNCounter.cs index 5f93d35f57d..5c814e9eafb 100644 --- a/src/contrib/cluster/Akka.DistributedData/PNCounter.cs +++ b/src/contrib/cluster/Akka.DistributedData/PNCounter.cs @@ -7,7 +7,10 @@ using Akka.Cluster; using System; +using System.Collections.Immutable; using System.Numerics; +using Akka.Actor; +using Akka.Util; namespace Akka.DistributedData { @@ -20,7 +23,7 @@ namespace Akka.DistributedData /// PN-Counters allow the counter to be incremented by tracking the /// increments (P) separate from the decrements (N). Both P and N are represented /// as two internal [[GCounter]]s. Merge is handled by merging the internal P and N - /// counters. The value of the counter is the value of the P counter minus + /// counters. The value of the counter is the value of the P _counter minus /// the value of the N counter. /// /// This class is immutable, i.e. "modifying" methods return a new instance. diff --git a/src/contrib/cluster/Akka.DistributedData/PNCounterDictionary.cs b/src/contrib/cluster/Akka.DistributedData/PNCounterDictionary.cs index a625d3737a3..6f00226265d 100644 --- a/src/contrib/cluster/Akka.DistributedData/PNCounterDictionary.cs +++ b/src/contrib/cluster/Akka.DistributedData/PNCounterDictionary.cs @@ -6,10 +6,14 @@ //----------------------------------------------------------------------- using System; +using System.Collections; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; using System.Numerics; +using System.Text; +using Akka.Actor; +using Akka.Util; using UniqueAddress = Akka.Cluster.UniqueAddress; namespace Akka.DistributedData @@ -19,7 +23,11 @@ namespace Akka.DistributedData /// with values. /// This class is immutable, i.e. "modifying" methods return a new instance. /// - public class PNCounterDictionary : IReplicatedData>, IRemovedNodePruning>, IReplicatedDataSerialization, IEquatable> + public class PNCounterDictionary : IReplicatedData>, + IRemovedNodePruning>, + IReplicatedDataSerialization, + IEquatable>, + IEnumerable> { public static readonly PNCounterDictionary Empty = new PNCounterDictionary(ORDictionary.Empty); @@ -30,18 +38,61 @@ public PNCounterDictionary(ORDictionary underlying) _underlying = underlying; } + /// + /// Returns all entries stored within current + /// public IImmutableDictionary Entries => _underlying.Entries .Select(kv => new KeyValuePair(kv.Key, kv.Value.Value)) .ToImmutableDictionary(); + /// + /// Returns a counter value stored within current + /// under provided + /// public BigInteger this[TKey key] => _underlying[key].Value; + /// + /// Determines if current has a counter + /// registered under provided . + /// public bool ContainsKey(TKey key) => _underlying.ContainsKey(key); + /// + /// Determines if current is empty. + /// public bool IsEmpty => _underlying.IsEmpty; + /// + /// Returns number of entries stored within current . + /// public int Count => _underlying.Count; + /// + /// Returns all keys of the current . + /// + public IEnumerable Keys => _underlying.Keys; + + /// + /// Returns all values stored within current . + /// + public IEnumerable Values => _underlying.Values.Select(x => x.Value); + + /// + /// Tries to return a value under provided , if such entry exists. + /// + public bool TryGetValue(TKey key, out BigInteger value) + { + PNCounter counter; + if (_underlying.TryGetValue(key, out counter)) + { + value = counter.Value; + return true; + } + + value = BigInteger.Zero; + return false; + } + /// /// Increment the counter with the delta specified. /// If the delta is negative then it will decrement instead of increment. @@ -87,10 +138,25 @@ public bool Equals(PNCounterDictionary other) return Equals(_underlying, other._underlying); } + public IEnumerator> GetEnumerator() => + _underlying.Select(x => new KeyValuePair(x.Key, x.Value.Value)).GetEnumerator(); + public override bool Equals(object obj) => obj is PNCounterDictionary && Equals((PNCounterDictionary) obj); public override int GetHashCode() => _underlying.GetHashCode(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + public override string ToString() + { + var sb = new StringBuilder("PNCounterDictionary("); + foreach (var entry in Entries) + { + sb.Append(entry.Key).Append("->").Append(entry.Value).Append(", "); + } + sb.Append(')'); + return sb.ToString(); + } } public class PNCounterDictionaryKey : Key> diff --git a/src/contrib/cluster/Akka.DistributedData/PruningState.cs b/src/contrib/cluster/Akka.DistributedData/PruningState.cs index 16f31100df8..c3a4b1bd080 100644 --- a/src/contrib/cluster/Akka.DistributedData/PruningState.cs +++ b/src/contrib/cluster/Akka.DistributedData/PruningState.cs @@ -130,5 +130,7 @@ public override int GetHashCode() return ((Owner != null ? Owner.GetHashCode() : 0)*397) ^ (Phase != null ? Phase.GetHashCode() : 0); } } + + public override string ToString() => $"PrunningState(owner={Owner}, phase={Phase})"; } } diff --git a/src/contrib/cluster/Akka.DistributedData/Replicator.cs b/src/contrib/cluster/Akka.DistributedData/Replicator.cs index 1ba7a29d6f6..a0bf9dfcbce 100644 --- a/src/contrib/cluster/Akka.DistributedData/Replicator.cs +++ b/src/contrib/cluster/Akka.DistributedData/Replicator.cs @@ -122,7 +122,7 @@ protected override bool Receive(object message) => message.Match() .With(x => ReceiveFlushChanges()) .With(_ => ReceiveGossipTick()) .With(c => ReceiveClockTick()) - .With(s => ReceiveStatus(s.Digests, s.Chunk, s.TotChunks)) + .With(s => ReceiveStatus(s.Digests, s.Chunk, s.TotalChunks)) .With(g => ReceiveGossip(g.UpdatedData, g.SendBack)) .With(s => ReceiveSubscribe(s.Key, s.Subscriber)) .With(u => ReceiveUnsubscribe(u.Key, u.Subscriber)) diff --git a/src/core/Akka.FSharp/Properties/AssemblyInfo.fs b/src/core/Akka.FSharp/Properties/AssemblyInfo.fs index 7928701c9d2..b24a16ff814 100644 --- a/src/core/Akka.FSharp/Properties/AssemblyInfo.fs +++ b/src/core/Akka.FSharp/Properties/AssemblyInfo.fs @@ -1,11 +1,4 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2016 Lightbend Inc. -// Copyright (C) 2013-2016 Akka.NET project -// -//----------------------------------------------------------------------- - -namespace System +namespace System open System open System.Reflection open System.Runtime.InteropServices @@ -23,4 +16,3 @@ do () module internal AssemblyVersionInformation = let [] Version = "1.1.2.0" -