Skip to content

Commit

Permalink
Add support for scheduled messages
Browse files Browse the repository at this point in the history
Remove synchronize in Virtual Threads
Open up for external implementations
  • Loading branch information
paxel committed Oct 3, 2024
1 parent bd68403 commit b260309
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 34 deletions.
18 changes: 18 additions & 0 deletions src/main/java/paxel/lintstone/api/AutoClosableLock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package paxel.lintstone.api;

import java.util.concurrent.locks.ReentrantLock;

public class AutoClosableLock implements AutoCloseable {

private final ReentrantLock lock;

public AutoClosableLock(ReentrantLock lock) {
this.lock = lock;
lock.lock();
}

@Override
public void close() {
lock.unlock();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package paxel.lintstone.api;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -58,6 +59,17 @@ public interface LintStoneMessageEventContext {
*/
void tell(String name, Object msg) throws UnregisteredRecipientException;

/**
* Sends the message to the actor with the registered name.
*
* @param name the name of the actor.
* @param msg The message to send.
* @param delay The delay of the message send. The message will be enqueued not before this duration has passed.
* @throws UnregisteredRecipientException if there is no actor with that
* name.
*/
void tell(String name, Object msg, Duration delay) throws UnregisteredRecipientException;

/**
* Sends the message to the actor with the registered name.
* The replies of that actor are processed by the given Reply Handler in the thread context of this actor.
Expand Down Expand Up @@ -110,8 +122,8 @@ public interface LintStoneMessageEventContext {
* This method delegates to
* {@link LintStoneSystem#registerActor(String, LintStoneActorFactory, ActorSettings)}.
*
* @param name The name of the actor.
* @param factory The factory.
* @param name The name of the actor.
* @param factory The factory.
* @return The new or old actor access.
*/
LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, ActorSettings settings);
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/paxel/lintstone/api/ProcessorFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package paxel.lintstone.api;

import paxel.lintstone.impl.SequentialProcessorBuilder;

import java.util.List;
import java.util.concurrent.TimeUnit;

public interface ProcessorFactory {
SequentialProcessorBuilder create();

void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
}
19 changes: 19 additions & 0 deletions src/main/java/paxel/lintstone/api/Scheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package paxel.lintstone.api;

import java.time.Duration;

public interface Scheduler {

/**
* Run the runnable after the duration has passed.
*
* @param runnable The runnable to execute
* @param duration The duration to wait
*/
void runLater(Runnable runnable, Duration duration);

/**
* Stops the scheduler. Currently running runnables are finished, but no other Runnables will be executed.
*/
void shutDown();
}
22 changes: 17 additions & 5 deletions src/main/java/paxel/lintstone/impl/Actor.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package paxel.lintstone.impl;

import paxel.lintstone.api.LintStoneActor;
import paxel.lintstone.api.NoSenderException;
import paxel.lintstone.api.ReplyHandler;
import paxel.lintstone.api.UnregisteredRecipientException;
import paxel.lintstone.api.*;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
Expand All @@ -25,11 +23,13 @@ class Actor {
private final AtomicLong totalMessages = new AtomicLong();
private final AtomicLong totalReplies = new AtomicLong();
private final MessageContextFactory messageContextFactory;
private final Scheduler scheduler;

Actor(String name, LintStoneActor actorInstance, SequentialProcessor sequentialProcessor, ActorSystem system, SelfUpdatingActorAccessor sender) {
Actor(String name, LintStoneActor actorInstance, SequentialProcessor sequentialProcessor, ActorSystem system, SelfUpdatingActorAccessor sender, Scheduler scheduler) {
this.name = name;
this.actorInstance = actorInstance;
this.sequentialProcessor = sequentialProcessor;
this.scheduler = scheduler;
messageContextFactory = new MessageContextFactory(system, new SelfUpdatingActorAccessor(name, this, system, sender));
}

Expand All @@ -48,6 +48,18 @@ void send(Object message, SelfUpdatingActorAccessor sender, ReplyHandler replyHa
totalMessages.incrementAndGet();
}

void send(Object message, SelfUpdatingActorAccessor sender, ReplyHandler replyHandler, Duration delay) throws UnregisteredRecipientException {
scheduler.runLater(() -> {
if (!registered) {

}
Runnable runnable = createRunnable(message, sender, replyHandler);
sequentialProcessor.add(runnable);
totalMessages.incrementAndGet();
}, delay);
}


void send(Object message, SelfUpdatingActorAccessor sender, ReplyHandler replyHandler, int blockThreshold) throws UnregisteredRecipientException, InterruptedException {
if (!registered) {
throw new UnregisteredRecipientException("Actor " + name + " is not registered");
Expand Down
54 changes: 32 additions & 22 deletions src/main/java/paxel/lintstone/impl/ActorSystem.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
package paxel.lintstone.impl;

import paxel.lintstone.api.*;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import paxel.lintstone.api.*;
import java.util.concurrent.locks.ReentrantLock;

public class ActorSystem implements LintStoneSystem {

private final Map<String, Actor> actors = Collections.synchronizedMap(new HashMap<>());
private final GroupingExecutor groupingExecutor;
private final Map<String, Actor> actors = new ConcurrentHashMap<>();
private final ProcessorFactory processorFactory;
private final Scheduler scheduler;
private final ReentrantLock lock = new ReentrantLock();

public ActorSystem() {
groupingExecutor = new GroupingExecutor();
processorFactory = new GroupingExecutor();
scheduler = new SimpleScheduler();
}

public ActorSystem(ProcessorFactory processorFactory, Scheduler scheduler) {
this.processorFactory = processorFactory;
this.scheduler = scheduler;
}

@Override
Expand All @@ -30,26 +38,24 @@ public LintStoneActorAccessor registerActor(String name, LintStoneActorFactory f

@Override
public LintStoneActorAccessor getActor(String name) {
synchronized (actors) {
return new SelfUpdatingActorAccessor(name, actors.get(name), this, null);
}
return new SelfUpdatingActorAccessor(name, actors.get(name), this, null);
}

LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, SelfUpdatingActorAccessor sender, ActorSettings settings, Object initMessage) {
SequentialProcessorBuilder sequentialProcessorBuilder = groupingExecutor.create();
SequentialProcessorBuilder sequentialProcessorBuilder = processorFactory.create();
sequentialProcessorBuilder.setErrorHandler(settings.errorHandler());
return registerActor(name, factory, initMessage, sender, sequentialProcessorBuilder);
}


private LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, Object initMessage, SelfUpdatingActorAccessor sender, SequentialProcessorBuilder sequentialProcessor) {
synchronized (actors) {
try (AutoClosableLock ignored = new AutoClosableLock(lock)) {
Actor existing = actors.get(name);
if (existing != null) {
return new SelfUpdatingActorAccessor(name, existing, this, sender);
}
LintStoneActor actorInstance = factory.create();
Actor newActor = new Actor(name, actorInstance, sequentialProcessor.build(), this, sender);
Actor newActor = new Actor(name, actorInstance, sequentialProcessor.build(), this, sender, scheduler);
// actor receives the initMessage as first message.
Optional.ofNullable(initMessage).ifPresent(msg -> newActor.send(msg, null, null));
actors.put(name, newActor);
Expand All @@ -61,40 +67,44 @@ private LintStoneActorAccessor registerActor(String name, LintStoneActorFactory
@Override
public void shutDown() {
shutdownActors(false);
groupingExecutor.shutdown();
processorFactory.shutdown();
scheduler.shutDown();
}

@Override
public void shutDownAndWait() throws InterruptedException {
shutdownActors(false);
groupingExecutor.shutdown();
processorFactory.shutdown();
scheduler.shutDown();
//wait forever and a day
groupingExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
processorFactory.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
}

@Override
public boolean shutDownAndWait(Duration timeout) throws InterruptedException {
shutdownActors(false);
groupingExecutor.shutdown();
return groupingExecutor.awaitTermination(timeout.getSeconds(), TimeUnit.SECONDS);
processorFactory.shutdown();
scheduler.shutDown();
return processorFactory.awaitTermination(timeout.getSeconds(), TimeUnit.SECONDS);
}

@Override
public void shutDownNow() {
shutdownActors(true);
groupingExecutor.shutdownNow();
processorFactory.shutdownNow();
scheduler.shutDown();
}

private void shutdownActors(boolean now) {
synchronized (actors) {
try (AutoClosableLock ignored = new AutoClosableLock(lock)) {
actors.entrySet().stream().map(Map.Entry::getValue).forEach(a -> a.shutdown(now));
}
}


@Override
public boolean unregisterActor(String name) {
synchronized (actors) {
try (AutoClosableLock ignored = new AutoClosableLock(lock)) {
Actor remove = actors.remove(name);
if (remove != null) {
// this actor will not accept any messages anymore. The Accesses should try to get a new instance or fail.
Expand All @@ -116,7 +126,7 @@ public String toString() {
StringBuilder stringBuilder = new StringBuilder("ActorSystem{");

actors.forEach((a, f) -> stringBuilder.append(f.toString()).append("\n"));
stringBuilder.append(" exec:").append(groupingExecutor.toString());
stringBuilder.append(" exec:").append(processorFactory.toString());
stringBuilder.append("}");

return stringBuilder.toString();
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/paxel/lintstone/impl/GroupingExecutor.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package paxel.lintstone.impl;

import paxel.lintstone.api.ProcessorFactory;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GroupingExecutor {
public class GroupingExecutor implements ProcessorFactory {
private final ExecutorService executorService;


Expand All @@ -14,27 +16,28 @@ public GroupingExecutor() {
this.executorService = Executors.newVirtualThreadPerTaskExecutor();
}

@Override
public SequentialProcessorBuilder create() {
// the Builder will submit the runnable to the service when the Processor is build.
return new SequentialProcessorBuilder(executorService);
}

@Override
public void shutdown() {
executorService.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executorService.shutdownNow();
}

@Override
public boolean isShutdown() {
return executorService.isShutdown();
}

public boolean isTerminated() {
return executorService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorService.awaitTermination(timeout, unit);
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/paxel/lintstone/impl/MessageContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import paxel.lintstone.api.*;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -47,6 +48,15 @@ public void tell(String name, Object msg) throws UnregisteredRecipientException
actor.get().send(msg, self, null);
}

@Override
public void tell(String name, Object msg, Duration delay) throws UnregisteredRecipientException {
Optional<Actor> actor = actorSystem.getOptionalActor(name);
if (actor.isEmpty()) {
throw new UnregisteredRecipientException("Actor with name " + name + " does not exist");
}
actor.get().send(msg, self, null, delay);
}

@Override
public void ask(String name, Object msg, ReplyHandler handler) throws UnregisteredRecipientException {
Optional<Actor> actor = actorSystem.getOptionalActor(name);
Expand Down
Loading

0 comments on commit b260309

Please sign in to comment.