Skip to content

Commit

Permalink
added local crdts operating on current cluster context
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Nov 9, 2016
1 parent db50474 commit c6b8709
Show file tree
Hide file tree
Showing 37 changed files with 1,226 additions and 75 deletions.
1 change: 1 addition & 0 deletions src/contrib/cluster/Akka.DistributedData.Tests/FlagSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class FlagSpec
{
public FlagSpec(ITestOutputHelper output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

using Akka.Cluster;
using System.Numerics;
using Akka.Actor;
using Xunit;
using Xunit.Abstractions;

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class GCounterSpec
{
private readonly UniqueAddress _node1 = new UniqueAddress(new Actor.Address("akka.tcp", "Sys", "localhost", 2551), 1);
Expand Down
1 change: 1 addition & 0 deletions src/contrib/cluster/Akka.DistributedData.Tests/GSetSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class GSetSpec
{
const string user1 = "{\"username\":\"john\",\"password\":\"coltrane\"}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class LWWDictionarySpec
{
private readonly UniqueAddress _node1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class LWWRegisterSpec
{
private readonly UniqueAddress _node1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class LocalConcurrencySpec : Akka.TestKit.Xunit2.TestKit
{
public sealed class Updater : ReceiveActor, IWithUnboundedStash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class ORDictionarySpec
{
private readonly UniqueAddress _node1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class ORMultiDictionarySpec
{
private readonly UniqueAddress _node1;
Expand Down Expand Up @@ -139,7 +140,7 @@ public void A_ORMultiDictionary_should_be_able_to_get_all_bindings_for_an_entry_
m.TryGetValue("a", out a);
Assert.Equal(ImmutableHashSet.Create("A1", "A2"), a);

var m2 = m.SetItem(_node1, "a", a.Remove("A1"));
var m2 = m.SetItems(_node1, "a", a.Remove("A1"));
Assert.Equal(ImmutableDictionary.CreateRange(new[]
{
new KeyValuePair<string, IImmutableSet<string>>("a", ImmutableHashSet.Create("A2")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class ORSetSpec
{
private readonly string _user1 = "{\"username\":\"john\",\"password\":\"coltrane\"}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class PNCounterDictionarySpec
{
readonly UniqueAddress _node1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class PNCounterSpec
{
readonly UniqueAddress _address1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class PruningStateSpec
{
private readonly UniqueAddress _node1 = new UniqueAddress(new Address("akka.tcp", "Sys", "localhost", 2551), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections;
using System.Linq;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster;
Expand All @@ -15,6 +18,7 @@

namespace Akka.DistributedData.Tests.Serialization
{
[Collection("DistributedDataSpec")]
public class ReplicatedDataSerializerSpec : TestKit.Xunit2.TestKit
{
private static readonly Config BaseConfig = ConfigurationFactory.ParseString(@"
Expand Down Expand Up @@ -159,11 +163,11 @@ public void ReplicatedDataSerializer_should_serialize_ORMultiDictionary()
CheckSerialization(ORMultiDictionary<string, string>.Empty.AddItem(_address1, "a", "A"));
CheckSerialization(ORMultiDictionary<string, string>.Empty
.AddItem(_address1, "a", "A1")
.SetItem(_address2, "b", ImmutableHashSet.CreateRange(new[] { "B1", "B2", "B3" }))
.SetItems(_address2, "b", ImmutableHashSet.CreateRange(new[] { "B1", "B2", "B3" }))
.AddItem(_address2, "a", "A2"));

var m1 = ORMultiDictionary<string, string>.Empty.AddItem(_address1, "a", "A1").AddItem(_address2, "a", "A2");
var m2 = ORMultiDictionary<string, string>.Empty.SetItem(_address2, "b", ImmutableHashSet.CreateRange(new[] { "B1", "B2", "B3" }));
var m2 = ORMultiDictionary<string, string>.Empty.SetItems(_address2, "b", ImmutableHashSet.CreateRange(new[] { "B1", "B2", "B3" }));
CheckSameContent(m1.Merge(m2), m2.Merge(m1));
}

Expand All @@ -185,18 +189,24 @@ public void ReplicatedDataSerializer_should_serialize_VersionVector()
CheckSameContent(v1.Merge(v2), v2.Merge(v1));
}

private void CheckSerialization(object expected)
private void CheckSerialization<T>(T expected)
{
var serializer = Sys.Serialization.FindSerializerFor(expected);
var blob = serializer.ToBinary(expected);
var actual = serializer.FromBinary(blob, expected.GetType());

Assert.Equal(expected, actual);

// we cannot use Assert.Equal here since ORMultiDictionary will be resolved as
// IEnumerable<KeyValuePair<string, ImmutableHashSet<string>> and immutable sets
// fails on structural equality
Assert.True(expected.Equals(actual));
}

private void CheckSameContent(object a, object b)
{
Assert.Equal(a, b);
// we cannot use Assert.Equal here since ORMultiDictionary will be resolved as
// IEnumerable<KeyValuePair<string, ImmutableHashSet<string>> and immutable sets
// fails on structural equality
Assert.True(a.Equals(b));
var serializer = Sys.Serialization.FindSerializerFor(a);
var blobA = serializer.ToBinary(a);
var blobB = serializer.ToBinary(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

namespace Akka.DistributedData.Tests.Serialization
{
[Collection("DistributedDataSpec")]
public class ReplicatorMessageSerializerSpec : TestKit.Xunit2.TestKit
{
private static readonly Config BaseConfig = ConfigurationFactory.ParseString(@"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class VersionVectorSpec
{
private readonly UniqueAddress _node1 = new UniqueAddress(new Address("akka.tcp", "Sys", "localhost", 2551), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Akka.DistributedData.Tests
{
[Collection("DistributedDataSpec")]
public class WriteAggregatorSpec : Akka.TestKit.Xunit2.TestKit
{
internal class TestWriteAggregator : WriteAggregator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Local\ClusterReplicatedDataExtensions.cs" />
<Compile Include="DistributedData.cs" />
<Compile Include="Dsl.cs" />
<Compile Include="FastMerge.cs" />
Expand All @@ -63,6 +64,13 @@
<Compile Include="Internal\Internal.cs" />
<Compile Include="IReplicatedDataSerialization.cs" />
<Compile Include="Key.cs" />
<Compile Include="Local\LocalLWWDictionary.cs" />
<Compile Include="Local\LocalLWWRegister.cs" />
<Compile Include="Local\LocalORDictionary.cs" />
<Compile Include="Local\LocalORMultiDictionary.cs" />
<Compile Include="Local\LocalORSet.cs" />
<Compile Include="Local\LocalPNCounter.cs" />
<Compile Include="Local\LocalPNCounterDictionary.cs" />
<Compile Include="LWWDictionary.cs" />
<Compile Include="LWWRegister.cs" />
<Compile Include="ORDictionary.cs" />
Expand All @@ -78,6 +86,7 @@
<Compile Include="Replicator.cs" />
<Compile Include="Replicator.Messages.cs" />
<Compile Include="ReplicatorSettings.cs" />
<Compile Include="Local\LocalGCounter.cs" />
<Compile Include="VersionVector.cs" />
<Compile Include="WriteAggregator.cs" />
</ItemGroup>
Expand Down
2 changes: 2 additions & 0 deletions src/contrib/cluster/Akka.DistributedData/Flag.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public bool Equals(Flag other)
public IReplicatedData Merge(IReplicatedData other) => Merge((Flag) other);
public Flag Merge(Flag other) => other.Enabled ? other : this;
public Flag SwitchOn() => Enabled ? this : new Flag(true);

public static implicit operator bool(Flag flag) => flag.Enabled;
}

[Serializable]
Expand Down
7 changes: 6 additions & 1 deletion src/contrib/cluster/Akka.DistributedData/GCounter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
using System.Collections.Immutable;
using System.Linq;
using System.Numerics;
using Akka.Actor;
using Akka.Util;

namespace Akka.DistributedData
{
Expand Down Expand Up @@ -45,13 +47,14 @@ public sealed class GCounter : FastMerge<GCounter>, IRemovedNodePruning<GCounter
/// <summary>
/// Current total value of the counter.
/// </summary>
public BigInteger Value => State.Aggregate(Zero, (v, acc) => v + acc.Value);
public BigInteger Value { get; }

public GCounter() : this(ImmutableDictionary<UniqueAddress, BigInteger>.Empty) { }

public GCounter(IImmutableDictionary<UniqueAddress, BigInteger> state)
{
State = state;
Value = State.Aggregate(Zero, (v, acc) => v + acc.Value);
}

/// <summary>
Expand Down Expand Up @@ -146,5 +149,7 @@ public int CompareTo(GCounter other)
}

public override string ToString() => $"GCounter({Value})";

public static implicit operator BigInteger(GCounter counter) => counter.Value;
}
}
8 changes: 7 additions & 1 deletion src/contrib/cluster/Akka.DistributedData/GSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
//-----------------------------------------------------------------------

using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Cluster;

namespace Akka.DistributedData
{
Expand Down Expand Up @@ -35,7 +38,7 @@ public static class GSet
/// This class is immutable, i.e. "modifying" methods return a new instance.
/// </summary>
[Serializable]
public sealed class GSet<T> : FastMerge<GSet<T>>, IReplicatedDataSerialization, IGSet, IEquatable<GSet<T>>
public sealed class GSet<T> : FastMerge<GSet<T>>, IReplicatedDataSerialization, IGSet, IEquatable<GSet<T>>, IEnumerable<T>
{
public static readonly GSet<T> Empty = new GSet<T>();

Expand Down Expand Up @@ -77,9 +80,12 @@ public bool Equals(GSet<T> other)
return Elements.SetEquals(other.Elements);
}

public IEnumerator<T> GetEnumerator() => Elements.GetEnumerator();

public override bool Equals(object obj) => obj is GSet<T> && Equals((GSet<T>) obj);

public override int GetHashCode() => Elements.GetHashCode();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

internal interface IGSetKey
Expand Down
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.DistributedData/Key.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ interface IKeyWithGenericType : IKey

/// <summary>
/// Key for the key-value data in <see cref="Replicator"/>. The type of the data value
/// is defined in the key. Keys are compared equal if the `id` strings are equal,
/// is defined in the key. KeySet are compared equal if the `id` strings are equal,
/// i.e. use unique identifiers.
///
/// Specific classes are provided for the built in data types, e.g. <see cref="ORSetKey{T}"/>,
Expand Down
Loading

0 comments on commit c6b8709

Please sign in to comment.