Skip to content

Commit

Permalink
#758 added RemoteDeploymentWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
maxim.salamatko committed Oct 9, 2015
1 parent a36dadd commit 44c29cc
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/core/Akka.Remote/Akka.Remote.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
<Compile Include="Proto\Containerformats.cs" />
<Compile Include="Proto\Wireformats.cs" />
<Compile Include="RemoteActorRef.cs" />
<Compile Include="RemoteDeploymentWatcher.cs" />
<Compile Include="RemoteSystemDaemon.cs" />
<Compile Include="RemoteDeployer.cs" />
<Compile Include="RemoteSettings.cs" />
Expand Down
9 changes: 9 additions & 0 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public void UnregisterTempActor(ActorPath path)

private volatile IActorRef _remotingTerminator;
private volatile IActorRef _remoteWatcher;
private volatile IActorRef _remoteDeploymentWatcher;

public virtual void Init(ActorSystemImpl system)
{
Expand All @@ -111,6 +112,7 @@ public virtual void Init(ActorSystemImpl system)

Transport.Start();
_remoteWatcher = CreateRemoteWatcher(system);
_remoteDeploymentWatcher = CreateRemoteDeploymentWatcher(system);
}

protected virtual IActorRef CreateRemoteWatcher(ActorSystemImpl system)
Expand All @@ -124,6 +126,12 @@ protected virtual IActorRef CreateRemoteWatcher(ActorSystemImpl system)
RemoteSettings.WatchHeartbeatExpectedResponseAfter)), "remote-watcher");
}

protected virtual IActorRef CreateRemoteDeploymentWatcher(ActorSystemImpl system)
{
return system.SystemActorOf(RemoteSettings.ConfigureDispatcher(Props.Create<RemoteDeploymentWatcher>()),
"remote-deployment-watcher");
}

protected DefaultFailureDetectorRegistry<Address> CreateRemoteWatcherFailureDetector(ActorSystem system)
{
return new DefaultFailureDetectorRegistry<Address>(() =>
Expand Down Expand Up @@ -358,6 +366,7 @@ public void UseActorOnNode(RemoteActorRef actor, Props props, Deploy deploy, IIn
_log.Debug("[{0}] Instantiating Remote Actor [{1}]", RootPath, actor.Path);
IActorRef remoteNode = ResolveActorRef(new RootActorPath(actor.Path.Address) / "remote");
remoteNode.Tell(new DaemonMsgCreate(props, deploy, actor.Path.ToSerializationFormat(), supervisor));
_remoteDeploymentWatcher.Tell(new RemoteDeploymentWatcher.WatchRemote(actor, supervisor));
}

/// <summary>
Expand Down
55 changes: 55 additions & 0 deletions src/core/Akka.Remote/RemoteDeploymentWatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Dispatch.SysMsg;
using Akka.Util.Internal.Collections;

namespace Akka.Remote
{
/// <summary>
/// Responsible for cleaning up child references of remote deployed actors when remote node
/// goes down (crash, network failure), i.e. triggered by Akka.Actor.Terminated.AddressTerminated
/// </summary>
internal class RemoteDeploymentWatcher : ActorBase, IRequiresMessageQueue<IUnboundedMessageQueueSemantics>
{

private readonly IImmutableMap<IActorRef, IInternalActorRef> _supervisors =
ImmutableTreeMap<IActorRef, IInternalActorRef>.Empty;
protected override bool Receive(object message)
{
if (message == null)
{
return false;
}
return message.Match().With<WatchRemote>(w =>
{
_supervisors.Add(w.Actor, w.Supervisor);
Context.Watch(w.Actor);
}).With<Terminated>(t =>
{
IInternalActorRef supervisor;
if (_supervisors.TryGet(t.ActorRef, out supervisor))
{
supervisor.SendSystemMessage(new DeathWatchNotification(t.ActorRef, t.ExistenceConfirmed, t.AddressTerminated), supervisor);
_supervisors.Remove(t.ActorRef);
}
}).WasHandled;
}

internal class WatchRemote
{
public WatchRemote(IActorRef actor, IInternalActorRef supervisor)
{
Actor = actor;
Supervisor = supervisor;
}

public IActorRef Actor { get; private set; }
public IInternalActorRef Supervisor { get; private set; }
}
}
}
6 changes: 6 additions & 0 deletions src/core/Akka.TestKit/TestActorRefBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Dispatch.SysMsg;
using Akka.TestKit.Internal;
using Akka.Util;

Expand Down Expand Up @@ -238,6 +239,11 @@ void IInternalActorRef.Suspend()
{
_internalRef.Suspend();
}

public void SendSystemMessage(ISystemMessage message, IActorRef sender)
{
_internalRef .SendSystemMessage(message, sender);
}
}
}

6 changes: 6 additions & 0 deletions src/core/Akka.TestKit/TestProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Dispatch.SysMsg;
using Akka.Util;

namespace Akka.TestKit
Expand Down Expand Up @@ -139,6 +140,11 @@ void IInternalActorRef.Suspend()
((IInternalActorRef)TestActor).Suspend();
}

public void SendSystemMessage(ISystemMessage message, IActorRef sender)
{
((IInternalActorRef)TestActor).SendSystemMessage(message, sender);
}

public int CompareTo(object obj)
{
return TestActor.CompareTo(obj);
Expand Down
26 changes: 13 additions & 13 deletions src/core/Akka/Actor/ActorRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,6 @@ protected override void TellInternal(object message, IActorRef sender)
}
}
}

protected void SendSystemMessage(ISystemMessage message, IActorRef sender)
{
var d = message as DeathWatchNotification;
if (message is Terminate)
{
Stop();
}
else if (d != null)
{
this.Tell(new Terminated(d.Actor, d.ExistenceConfirmed, d.AddressTerminated));
}
}
}


Expand Down Expand Up @@ -263,6 +250,7 @@ public interface IInternalActorRef : IActorRef, IActorRefScope
void Stop();
void Restart(Exception cause);
void Suspend();
void SendSystemMessage(ISystemMessage message, IActorRef sender);
}

public abstract class InternalActorRefBase : ActorRefBase, IInternalActorRef
Expand All @@ -287,6 +275,18 @@ public abstract class InternalActorRefBase : ActorRefBase, IInternalActorRef

public abstract bool IsTerminated { get; }
public abstract bool IsLocal { get; }
public void SendSystemMessage(ISystemMessage message, IActorRef sender)
{
var d = message as DeathWatchNotification;
if (message is Terminate)
{
Stop();
}
else if (d != null)
{
this.Tell(new Terminated(d.Actor, d.ExistenceConfirmed, d.AddressTerminated));
}
}
}

public abstract class MinimalActorRef : InternalActorRefBase, ILocalRef
Expand Down

0 comments on commit 44c29cc

Please sign in to comment.