Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented the RegisterOnTermination feature. #1523

Merged
merged 1 commit into from
Dec 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 66 additions & 4 deletions src/core/Akka.Tests/Actor/ActorSystemSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public ActorSystemSpec()
[Fact]
public void AnActorSystemMustRejectInvalidNames()
{
(new List<string> {
new List<string> {
"hallo_welt",
"-hallowelt",
"hallo*welt",
"hallo@welt",
"hallo#welt",
"hallo$welt",
"hallo%welt",
"hallo/welt"}).ForEach(n =>
"hallo/welt"}.ForEach(n =>
{
XAssert.Throws<ArgumentException>(() => ActorSystem.Create(n));
});
Expand Down Expand Up @@ -68,10 +68,72 @@ public void Given_a_system_that_isnt_going_to_shutdown_When_waiting_for_system_s
actorSystem.AwaitTermination(TimeSpan.FromMilliseconds(10)).ShouldBeFalse();
}

#region Extensions tests
[Fact]
public void Run_termination_callbacks_in_order()
{
var actorSystem = ActorSystem.Create(Guid.NewGuid().ToString());
var result = new List<int>();
var expected = new List<int>();
var count = 10;
var latch = new TestLatch(count);


for (int i = 0; i < count; i++)
{
expected.Add(i);

var value = i;
actorSystem.RegisterOnTermination(() =>
{
Task.Delay(Dilated(TimeSpan.FromMilliseconds(value % 3))).Wait();
result.Add(value);
latch.CountDown();
});
}

actorSystem.Shutdown();
latch.Ready();

expected.Reverse();

Assert.Equal(expected, result);
}

[Fact]
public void AwaitTermination_after_termination_callbacks()
{
var actorSystem = ActorSystem.Create(Guid.NewGuid().ToString());
var callbackWasRun = false;

actorSystem.RegisterOnTermination(() =>
{
Task.Delay(Dilated(TimeSpan.FromMilliseconds(50))).Wait();
callbackWasRun = true;
});

new TaskFactory().StartNew(() =>
{
Task.Delay(Dilated(TimeSpan.FromMilliseconds(200))).Wait();
actorSystem.Shutdown();
});

actorSystem.AwaitTermination(TimeSpan.FromSeconds(5));
Assert.True(callbackWasRun);
}

[Fact]
public void Throw_exception_when_register_callback_after_shutdown()
{
var actorSystem = ActorSystem.Create(Guid.NewGuid().ToString());

actorSystem.Shutdown();
actorSystem.AwaitTermination(TimeSpan.FromSeconds(10));

var ex = Assert.Throws<Exception>(() => actorSystem.RegisterOnTermination(() => { }));
Assert.Equal("ActorSystem already terminated.", ex.Message);
}

#region Extensions tests

[Fact]
public void AnActorSystem_Must_Support_Extensions()
{
Expand Down
14 changes: 12 additions & 2 deletions src/core/Akka/Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static ActorSystem Create(string name, Config config)
/// <returns>ActorSystem.</returns>
public static ActorSystem Create(string name)
{
return CreateAndStartSystem(name,ConfigurationFactory.Load());
return CreateAndStartSystem(name, ConfigurationFactory.Load());
}

private static ActorSystem CreateAndStartSystem(string name, Config withFallback)
Expand Down Expand Up @@ -136,6 +136,16 @@ private static ActorSystem CreateAndStartSystem(string name, Config withFallback
/// </summary>
public abstract bool TryGetExtension<T>(out T extension) where T : class, IExtension;

/// <summary>
/// Register a block of code (callback) to run after ActorSystem.shutdown has been issued and
/// all actors in this actor system have been stopped.
/// Multiple code blocks may be registered by calling this method multiple times.
/// The callbacks will be run sequentially in reverse order of registration, i.e.
/// last registration is run first.
/// </summary>
/// <param name="code">The code to run</param>
/// <exception cref="Exception">Thrown if the System has already shut down or if shutdown has been initiated.</exception>
public abstract void RegisterOnTermination(Action code);

/// <summary>
/// Stop this actor system. This will stop the guardian actor, which in turn
Expand Down Expand Up @@ -237,7 +247,7 @@ private void Dispose(bool disposing)
public abstract object RegisterExtension(IExtensionId extension);

public abstract IActorRef ActorOf(Props props, string name = null);

public abstract ActorSelection ActorSelection(ActorPath actorPath);
public abstract ActorSelection ActorSelection(string actorPath);

Expand Down
65 changes: 63 additions & 2 deletions src/core/Akka/Actor/Internal/ActorSystemImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.Dispatch;
using Akka.Dispatch.SysMsg;
using Akka.Event;
using Akka.Util;


namespace Akka.Actor.Internal
Expand All @@ -37,6 +38,7 @@ public class ActorSystemImpl : ExtendedActorSystem
private Mailboxes _mailboxes;
private IScheduler _scheduler;
private ActorProducerPipelineResolver _actorProducerPipelineResolver;
private TerminationCallbacks _terminationCallbacks;

public ActorSystemImpl(string name)
: this(name, ConfigurationFactory.Load())
Expand All @@ -55,6 +57,7 @@ public ActorSystemImpl(string name, Config config)
ConfigureSettings(config);
ConfigureEventStream();
ConfigureProvider();
ConfigureTerminationCallbacks();
ConfigureScheduler();
ConfigureSerialization();
ConfigureMailboxes();
Expand Down Expand Up @@ -296,6 +299,28 @@ private void ConfigureActorProducerPipeline()
_actorProducerPipelineResolver = new ActorProducerPipelineResolver(() => Log);
}

/// <summary>
/// Configures the termination callbacks.
/// </summary>
private void ConfigureTerminationCallbacks()
{
_terminationCallbacks = new TerminationCallbacks(Provider.TerminationTask);
}

/// <summary>
/// Register a block of code (callback) to run after ActorSystem.shutdown has been issued and
/// all actors in this actor system have been stopped.
/// Multiple code blocks may be registered by calling this method multiple times.
/// The callbacks will be run sequentially in reverse order of registration, i.e.
/// last registration is run first.
/// </summary>
/// <param name="code">The code to run</param>
/// <exception cref="Exception">Thrown if the System has already shut down or if shutdown has been initiated.</exception>
public override void RegisterOnTermination(Action code)
{
_terminationCallbacks.Add(code);
}

/// <summary>
/// Stop this actor system. This will stop the guardian actor, which in turn
/// will recursively stop all its child actors, then the system guardian
Expand All @@ -308,7 +333,7 @@ public override void Shutdown()
_provider.Guardian.Stop();
}

public override Task TerminationTask { get { return _provider.TerminationTask; } }
public override Task TerminationTask { get { return _terminationCallbacks.TerminationTask; } }

public override void AwaitTermination()
{
Expand All @@ -324,7 +349,7 @@ public override bool AwaitTermination(TimeSpan timeout, CancellationToken cancel
{
try
{
return _provider.TerminationTask.Wait((int) timeout.TotalMilliseconds, cancellationToken);
return _terminationCallbacks.TerminationTask.Wait((int) timeout.TotalMilliseconds, cancellationToken);
}
catch(OperationCanceledException)
{
Expand All @@ -345,7 +370,43 @@ public override void Stop(IActorRef actor)
((IInternalActorRef)actor).Stop();
}

}

class TerminationCallbacks
{
private Task _terminationTask;
private AtomicReference<Task> _atomicRef;

public TerminationCallbacks(Task upStreamTerminated)
{
_atomicRef = new AtomicReference<Task>(new Task(() => {}));

upStreamTerminated.ContinueWith(_ =>
{
_terminationTask = Interlocked.Exchange(ref _atomicRef, new AtomicReference<Task>(null)).Value;
_terminationTask.Start();
});
}

public void Add(Action code)
{
var previous = _atomicRef.Value;

if (_atomicRef.Value == null)
throw new Exception("ActorSystem already terminated.");

var t = new Task(code);

if (_atomicRef.CompareAndSet(previous, t))
{
t.ContinueWith(_ => previous.Start());
return;
}

Add(code);
}

public Task TerminationTask { get { return _atomicRef.Value ?? _terminationTask; } }
}
}