From b260309287a47fac33514137080944a773d1d808 Mon Sep 17 00:00:00 2001 From: Patrick Zimmer Date: Thu, 3 Oct 2024 09:17:33 +0200 Subject: [PATCH] Add support for scheduled messages Remove synchronize in Virtual Threads Open up for external implementations --- .../paxel/lintstone/api/AutoClosableLock.java | 18 ++++ .../api/LintStoneMessageEventContext.java | 16 +++- .../paxel/lintstone/api/ProcessorFactory.java | 18 ++++ .../java/paxel/lintstone/api/Scheduler.java | 19 +++++ src/main/java/paxel/lintstone/impl/Actor.java | 22 +++-- .../paxel/lintstone/impl/ActorSystem.java | 54 +++++++----- .../lintstone/impl/GroupingExecutor.java | 13 +-- .../paxel/lintstone/impl/MessageContext.java | 10 +++ .../paxel/lintstone/impl/SimpleScheduler.java | 85 +++++++++++++++++++ .../lintstone/impl/SimpleSchedulerTest.java | 30 +++++++ 10 files changed, 251 insertions(+), 34 deletions(-) create mode 100644 src/main/java/paxel/lintstone/api/AutoClosableLock.java create mode 100644 src/main/java/paxel/lintstone/api/ProcessorFactory.java create mode 100644 src/main/java/paxel/lintstone/api/Scheduler.java create mode 100644 src/main/java/paxel/lintstone/impl/SimpleScheduler.java create mode 100644 src/test/java/paxel/lintstone/impl/SimpleSchedulerTest.java diff --git a/src/main/java/paxel/lintstone/api/AutoClosableLock.java b/src/main/java/paxel/lintstone/api/AutoClosableLock.java new file mode 100644 index 0000000..ff24bb4 --- /dev/null +++ b/src/main/java/paxel/lintstone/api/AutoClosableLock.java @@ -0,0 +1,18 @@ +package paxel.lintstone.api; + +import java.util.concurrent.locks.ReentrantLock; + +public class AutoClosableLock implements AutoCloseable { + + private final ReentrantLock lock; + + public AutoClosableLock(ReentrantLock lock) { + this.lock = lock; + lock.lock(); + } + + @Override + public void close() { + lock.unlock(); + } +} diff --git a/src/main/java/paxel/lintstone/api/LintStoneMessageEventContext.java b/src/main/java/paxel/lintstone/api/LintStoneMessageEventContext.java index bf20fa5..d0c0282 100644 --- a/src/main/java/paxel/lintstone/api/LintStoneMessageEventContext.java +++ b/src/main/java/paxel/lintstone/api/LintStoneMessageEventContext.java @@ -1,5 +1,6 @@ package paxel.lintstone.api; +import java.time.Duration; import java.util.concurrent.CompletableFuture; /** @@ -58,6 +59,17 @@ public interface LintStoneMessageEventContext { */ void tell(String name, Object msg) throws UnregisteredRecipientException; + /** + * Sends the message to the actor with the registered name. + * + * @param name the name of the actor. + * @param msg The message to send. + * @param delay The delay of the message send. The message will be enqueued not before this duration has passed. + * @throws UnregisteredRecipientException if there is no actor with that + * name. + */ + void tell(String name, Object msg, Duration delay) throws UnregisteredRecipientException; + /** * Sends the message to the actor with the registered name. * The replies of that actor are processed by the given Reply Handler in the thread context of this actor. @@ -110,8 +122,8 @@ public interface LintStoneMessageEventContext { * This method delegates to * {@link LintStoneSystem#registerActor(String, LintStoneActorFactory, ActorSettings)}. * - * @param name The name of the actor. - * @param factory The factory. + * @param name The name of the actor. + * @param factory The factory. * @return The new or old actor access. */ LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, ActorSettings settings); diff --git a/src/main/java/paxel/lintstone/api/ProcessorFactory.java b/src/main/java/paxel/lintstone/api/ProcessorFactory.java new file mode 100644 index 0000000..001d4ce --- /dev/null +++ b/src/main/java/paxel/lintstone/api/ProcessorFactory.java @@ -0,0 +1,18 @@ +package paxel.lintstone.api; + +import paxel.lintstone.impl.SequentialProcessorBuilder; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public interface ProcessorFactory { + SequentialProcessorBuilder create(); + + void shutdown(); + + List shutdownNow(); + + boolean isShutdown(); + + boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; +} diff --git a/src/main/java/paxel/lintstone/api/Scheduler.java b/src/main/java/paxel/lintstone/api/Scheduler.java new file mode 100644 index 0000000..5bd633f --- /dev/null +++ b/src/main/java/paxel/lintstone/api/Scheduler.java @@ -0,0 +1,19 @@ +package paxel.lintstone.api; + +import java.time.Duration; + +public interface Scheduler { + + /** + * Run the runnable after the duration has passed. + * + * @param runnable The runnable to execute + * @param duration The duration to wait + */ + void runLater(Runnable runnable, Duration duration); + + /** + * Stops the scheduler. Currently running runnables are finished, but no other Runnables will be executed. + */ + void shutDown(); +} diff --git a/src/main/java/paxel/lintstone/impl/Actor.java b/src/main/java/paxel/lintstone/impl/Actor.java index 05b8521..9d2751d 100644 --- a/src/main/java/paxel/lintstone/impl/Actor.java +++ b/src/main/java/paxel/lintstone/impl/Actor.java @@ -1,10 +1,8 @@ package paxel.lintstone.impl; -import paxel.lintstone.api.LintStoneActor; -import paxel.lintstone.api.NoSenderException; -import paxel.lintstone.api.ReplyHandler; -import paxel.lintstone.api.UnregisteredRecipientException; +import paxel.lintstone.api.*; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; @@ -25,11 +23,13 @@ class Actor { private final AtomicLong totalMessages = new AtomicLong(); private final AtomicLong totalReplies = new AtomicLong(); private final MessageContextFactory messageContextFactory; + private final Scheduler scheduler; - Actor(String name, LintStoneActor actorInstance, SequentialProcessor sequentialProcessor, ActorSystem system, SelfUpdatingActorAccessor sender) { + Actor(String name, LintStoneActor actorInstance, SequentialProcessor sequentialProcessor, ActorSystem system, SelfUpdatingActorAccessor sender, Scheduler scheduler) { this.name = name; this.actorInstance = actorInstance; this.sequentialProcessor = sequentialProcessor; + this.scheduler = scheduler; messageContextFactory = new MessageContextFactory(system, new SelfUpdatingActorAccessor(name, this, system, sender)); } @@ -48,6 +48,18 @@ void send(Object message, SelfUpdatingActorAccessor sender, ReplyHandler replyHa totalMessages.incrementAndGet(); } + void send(Object message, SelfUpdatingActorAccessor sender, ReplyHandler replyHandler, Duration delay) throws UnregisteredRecipientException { + scheduler.runLater(() -> { + if (!registered) { + + } + Runnable runnable = createRunnable(message, sender, replyHandler); + sequentialProcessor.add(runnable); + totalMessages.incrementAndGet(); + }, delay); + } + + void send(Object message, SelfUpdatingActorAccessor sender, ReplyHandler replyHandler, int blockThreshold) throws UnregisteredRecipientException, InterruptedException { if (!registered) { throw new UnregisteredRecipientException("Actor " + name + " is not registered"); diff --git a/src/main/java/paxel/lintstone/impl/ActorSystem.java b/src/main/java/paxel/lintstone/impl/ActorSystem.java index 65cb4ef..e24aae8 100644 --- a/src/main/java/paxel/lintstone/impl/ActorSystem.java +++ b/src/main/java/paxel/lintstone/impl/ActorSystem.java @@ -1,21 +1,29 @@ package paxel.lintstone.impl; +import paxel.lintstone.api.*; + import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; - -import paxel.lintstone.api.*; +import java.util.concurrent.locks.ReentrantLock; public class ActorSystem implements LintStoneSystem { - private final Map actors = Collections.synchronizedMap(new HashMap<>()); - private final GroupingExecutor groupingExecutor; + private final Map actors = new ConcurrentHashMap<>(); + private final ProcessorFactory processorFactory; + private final Scheduler scheduler; + private final ReentrantLock lock = new ReentrantLock(); public ActorSystem() { - groupingExecutor = new GroupingExecutor(); + processorFactory = new GroupingExecutor(); + scheduler = new SimpleScheduler(); + } + + public ActorSystem(ProcessorFactory processorFactory, Scheduler scheduler) { + this.processorFactory = processorFactory; + this.scheduler = scheduler; } @Override @@ -30,26 +38,24 @@ public LintStoneActorAccessor registerActor(String name, LintStoneActorFactory f @Override public LintStoneActorAccessor getActor(String name) { - synchronized (actors) { - return new SelfUpdatingActorAccessor(name, actors.get(name), this, null); - } + return new SelfUpdatingActorAccessor(name, actors.get(name), this, null); } LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, SelfUpdatingActorAccessor sender, ActorSettings settings, Object initMessage) { - SequentialProcessorBuilder sequentialProcessorBuilder = groupingExecutor.create(); + SequentialProcessorBuilder sequentialProcessorBuilder = processorFactory.create(); sequentialProcessorBuilder.setErrorHandler(settings.errorHandler()); return registerActor(name, factory, initMessage, sender, sequentialProcessorBuilder); } private LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, Object initMessage, SelfUpdatingActorAccessor sender, SequentialProcessorBuilder sequentialProcessor) { - synchronized (actors) { + try (AutoClosableLock ignored = new AutoClosableLock(lock)) { Actor existing = actors.get(name); if (existing != null) { return new SelfUpdatingActorAccessor(name, existing, this, sender); } LintStoneActor actorInstance = factory.create(); - Actor newActor = new Actor(name, actorInstance, sequentialProcessor.build(), this, sender); + Actor newActor = new Actor(name, actorInstance, sequentialProcessor.build(), this, sender, scheduler); // actor receives the initMessage as first message. Optional.ofNullable(initMessage).ifPresent(msg -> newActor.send(msg, null, null)); actors.put(name, newActor); @@ -61,32 +67,36 @@ private LintStoneActorAccessor registerActor(String name, LintStoneActorFactory @Override public void shutDown() { shutdownActors(false); - groupingExecutor.shutdown(); + processorFactory.shutdown(); + scheduler.shutDown(); } @Override public void shutDownAndWait() throws InterruptedException { shutdownActors(false); - groupingExecutor.shutdown(); + processorFactory.shutdown(); + scheduler.shutDown(); //wait forever and a day - groupingExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS); + processorFactory.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS); } @Override public boolean shutDownAndWait(Duration timeout) throws InterruptedException { shutdownActors(false); - groupingExecutor.shutdown(); - return groupingExecutor.awaitTermination(timeout.getSeconds(), TimeUnit.SECONDS); + processorFactory.shutdown(); + scheduler.shutDown(); + return processorFactory.awaitTermination(timeout.getSeconds(), TimeUnit.SECONDS); } @Override public void shutDownNow() { shutdownActors(true); - groupingExecutor.shutdownNow(); + processorFactory.shutdownNow(); + scheduler.shutDown(); } private void shutdownActors(boolean now) { - synchronized (actors) { + try (AutoClosableLock ignored = new AutoClosableLock(lock)) { actors.entrySet().stream().map(Map.Entry::getValue).forEach(a -> a.shutdown(now)); } } @@ -94,7 +104,7 @@ private void shutdownActors(boolean now) { @Override public boolean unregisterActor(String name) { - synchronized (actors) { + try (AutoClosableLock ignored = new AutoClosableLock(lock)) { Actor remove = actors.remove(name); if (remove != null) { // this actor will not accept any messages anymore. The Accesses should try to get a new instance or fail. @@ -116,7 +126,7 @@ public String toString() { StringBuilder stringBuilder = new StringBuilder("ActorSystem{"); actors.forEach((a, f) -> stringBuilder.append(f.toString()).append("\n")); - stringBuilder.append(" exec:").append(groupingExecutor.toString()); + stringBuilder.append(" exec:").append(processorFactory.toString()); stringBuilder.append("}"); return stringBuilder.toString(); diff --git a/src/main/java/paxel/lintstone/impl/GroupingExecutor.java b/src/main/java/paxel/lintstone/impl/GroupingExecutor.java index f9286be..b2b65ff 100644 --- a/src/main/java/paxel/lintstone/impl/GroupingExecutor.java +++ b/src/main/java/paxel/lintstone/impl/GroupingExecutor.java @@ -1,11 +1,13 @@ package paxel.lintstone.impl; +import paxel.lintstone.api.ProcessorFactory; + import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -public class GroupingExecutor { +public class GroupingExecutor implements ProcessorFactory { private final ExecutorService executorService; @@ -14,27 +16,28 @@ public GroupingExecutor() { this.executorService = Executors.newVirtualThreadPerTaskExecutor(); } + @Override public SequentialProcessorBuilder create() { // the Builder will submit the runnable to the service when the Processor is build. return new SequentialProcessorBuilder(executorService); } + @Override public void shutdown() { executorService.shutdown(); } + @Override public List shutdownNow() { return executorService.shutdownNow(); } + @Override public boolean isShutdown() { return executorService.isShutdown(); } - public boolean isTerminated() { - return executorService.isTerminated(); - } - + @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return executorService.awaitTermination(timeout, unit); } diff --git a/src/main/java/paxel/lintstone/impl/MessageContext.java b/src/main/java/paxel/lintstone/impl/MessageContext.java index 8fd21a1..85eb4ac 100644 --- a/src/main/java/paxel/lintstone/impl/MessageContext.java +++ b/src/main/java/paxel/lintstone/impl/MessageContext.java @@ -2,6 +2,7 @@ import paxel.lintstone.api.*; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; @@ -47,6 +48,15 @@ public void tell(String name, Object msg) throws UnregisteredRecipientException actor.get().send(msg, self, null); } + @Override + public void tell(String name, Object msg, Duration delay) throws UnregisteredRecipientException { + Optional actor = actorSystem.getOptionalActor(name); + if (actor.isEmpty()) { + throw new UnregisteredRecipientException("Actor with name " + name + " does not exist"); + } + actor.get().send(msg, self, null, delay); + } + @Override public void ask(String name, Object msg, ReplyHandler handler) throws UnregisteredRecipientException { Optional actor = actorSystem.getOptionalActor(name); diff --git a/src/main/java/paxel/lintstone/impl/SimpleScheduler.java b/src/main/java/paxel/lintstone/impl/SimpleScheduler.java new file mode 100644 index 0000000..62ef0f0 --- /dev/null +++ b/src/main/java/paxel/lintstone/impl/SimpleScheduler.java @@ -0,0 +1,85 @@ +package paxel.lintstone.impl; + +import paxel.lintstone.api.Scheduler; + +import java.time.Duration; +import java.time.Instant; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Simple implementation of a Timer without Synchronize keyword usage + */ +public class SimpleScheduler implements Scheduler, Runnable { + + private final ConcurrentSkipListSet jobs = new ConcurrentSkipListSet<>(); + + private final AtomicBoolean stop = new AtomicBoolean(false); + ReentrantLock lock = new ReentrantLock(); + Condition newJob = lock.newCondition(); + + @Override + public void runLater(Runnable runnable, Duration duration) { + lock.lock(); + try { + ScheduledRunnable scheduledRunnable = new ScheduledRunnable(Instant.now().plus(duration), runnable); + jobs.add(scheduledRunnable); + newJob.signalAll(); + } finally { + lock.unlock(); + } + } + + private TimerTask wrapRunnable(Runnable runnable) { + return new TimerTask() { + @Override + public void run() { + runnable.run(); + } + }; + } + + @Override + public void shutDown() { + this.stop.set(true); + } + + @Override + public void run() { + try { + + while (!stop.get()) { + lock.lock(); + try { + if (jobs.isEmpty()) { + newJob.await(); + } + if (!jobs.isEmpty()) { + if (jobs.getFirst().start.isAfter(Instant.now())) { + // next job is in the future so we need to wait until it is ready, or a new one arrives + newJob.await(Math.max(100L, Duration.between(Instant.now(), jobs.getFirst().start()).toMillis() + 10), TimeUnit.MILLISECONDS); + } else { + ScheduledRunnable scheduledRunnable = jobs.pollFirst(); + scheduledRunnable.runnable().run(); + } + } + } finally { + lock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private record ScheduledRunnable(Instant start, Runnable runnable) implements Comparable { + @Override + public int compareTo(ScheduledRunnable o) { + return this.start.compareTo(o.start); + } + } +} diff --git a/src/test/java/paxel/lintstone/impl/SimpleSchedulerTest.java b/src/test/java/paxel/lintstone/impl/SimpleSchedulerTest.java new file mode 100644 index 0000000..8a8ae59 --- /dev/null +++ b/src/test/java/paxel/lintstone/impl/SimpleSchedulerTest.java @@ -0,0 +1,30 @@ +package paxel.lintstone.impl; + +import org.hamcrest.collection.IsIterableContainingInOrder; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; + +import static org.hamcrest.MatcherAssert.assertThat; + +class SimpleSchedulerTest { + + @Test + void testOrderAndWhereIsTheTwo() throws InterruptedException { + SimpleScheduler scheduler = new SimpleScheduler(); + new Thread(scheduler::run).start(); + LinkedBlockingDeque order = new LinkedBlockingDeque<>(); + + scheduler.runLater(() -> order.add("five"), Duration.ofMillis(700)); + scheduler.runLater(() -> order.add("one"), Duration.ofMillis(100)); + scheduler.runLater(() -> order.add("4"), Duration.ofMillis(400)); + scheduler.runLater(() -> order.add("3"), Duration.ofMillis(300)); + CountDownLatch latch = new CountDownLatch(1); + scheduler.runLater(() -> latch.countDown(), Duration.ofSeconds(1)); + latch.await(); + assertThat(order, IsIterableContainingInOrder.contains("one", "3", "4", "five")); + } + +} \ No newline at end of file