diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml new file mode 100644 index 0000000..688f8ea --- /dev/null +++ b/.github/workflows/build-test.yml @@ -0,0 +1,15 @@ +name: Java CI +on: [push] +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + java-version: '11' + distribution: 'temurin' + cache: maven + - name: Build with Maven + run: mvn --batch-mode --update-snapshots verify \ No newline at end of file diff --git a/README.md b/README.md index e4bc800..415f64c 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ LintStoneSystem system = LintStoneSystemFactory.create(Executors.newCachedThread the system creates the actors ```java -LintStoneActorAccess fileCollector = system.registerActor("fileCollector", () -> new FileCollector(cfg), Optional.empty()); +LintStoneActorAccess fileCollector = system.registerActor("fileCollector", () -> new FileCollector(cfg), Optional.empty(), ActorSettings.DEFAULT); ... fileCollector.send(FileCollector.fileMessage(root, readOnly)); ``` @@ -52,7 +52,7 @@ The message is enqueued and eventually processed by the actor instance } else { fileData += length; final LintStoneActorAccess actor = actors.computeIfAbsent(length, k -> { - return m.registerActor("counter-" + length, () -> new FileComparator(length), Optional.empty()); + return m.registerActor("counter-" + length, () -> new FileComparator(length), Optional.empty(), ActorSettings.DEFAULT); }); actor.send(fileMessage(f, readOnly)); } @@ -72,18 +72,13 @@ If the asked actor responds to the message, the consumer is called with the resp This is a good way to get the result from the system in case of a multithreaded process. ```java -CompletableFuture result = new CompletableFuture(); for (String text : data) { dist.send(text); } //finally ask for result -dist.ask(new EndMessage(), replyMec -> { - replyMec.inCase(String.class, (reply, ignored) -> { - result.complete(reply); - }); -}); - -Object v = result.get(1, TimeUnit.MINUTES); -``` \ No newline at end of file + +String v = dist.ask(new EndMessage()) + .get(1, TimeUnit.MINUTES); +``` diff --git a/pom.xml b/pom.xml index 608edf9..ade7704 100644 --- a/pom.xml +++ b/pom.xml @@ -1,6 +1,7 @@ - + 4.0.0 io.github.paxel @@ -30,12 +31,27 @@ bundle + + + + org.junit + junit-bom + 5.9.2 + pom + import + + + - junit - junit - 4.13.2 - test + io.github.paxel + group-executor + 0.11.1 + + + org.projectlombok + lombok + 1.18.28 org.hamcrest @@ -57,9 +73,14 @@ test - io.github.paxel - group-executor - 0.10.7 + org.junit.jupiter + junit-jupiter + test + + + org.junit.vintage + junit-vintage-engine + test @@ -69,6 +90,10 @@ org.apache.maven.plugins maven-javadoc-plugin 3.0.1 + + + ${java.home}/bin/javadoc + attach-javadoc @@ -159,7 +184,7 @@ ${java.home}/bin/java -classpath - + org.openjdk.jmh.Main .* diff --git a/src/main/java/paxel/lintstone/api/ActorSettings.java b/src/main/java/paxel/lintstone/api/ActorSettings.java index 8e93b1f..c674937 100644 --- a/src/main/java/paxel/lintstone/api/ActorSettings.java +++ b/src/main/java/paxel/lintstone/api/ActorSettings.java @@ -12,19 +12,7 @@ public interface ActorSettings { */ ActorSettings DEFAULT = ActorSettings.create().build(); - /** - * Defines if the Actor receives messages from multiple sources. - * - * @return true if multiple sources send messages to the actor. - */ - boolean isMulti(); - /** - * Defines if the send message to the actor should block until the limited queue has space for another message or if the message is ignored (and returned false. - * - * @return true if the send message should block until the message can be enqueued. - */ - boolean isBlocking(); /** * The number of messages that should be processed by the actor in one batch. @@ -33,12 +21,6 @@ public interface ActorSettings { */ int getBatch(); - /** - * The limit of the input queue of the actor. - * - * @return the limit. - */ - int getLimit(); /** * The handler for uncaught exceptions in the actor. diff --git a/src/main/java/paxel/lintstone/api/ActorSettingsBuilder.java b/src/main/java/paxel/lintstone/api/ActorSettingsBuilder.java index 790f2bd..ec0b3b7 100644 --- a/src/main/java/paxel/lintstone/api/ActorSettingsBuilder.java +++ b/src/main/java/paxel/lintstone/api/ActorSettingsBuilder.java @@ -4,10 +4,7 @@ import paxel.lintstone.impl.ActorSettingsImpl; public class ActorSettingsBuilder { - private boolean blocking; private int batch = 1; - private int limit; - private boolean multi = true; private ErrorHandler errorHandler = x -> true; public ErrorHandler getErrorHandler() { @@ -28,34 +25,8 @@ public ActorSettingsBuilder setBatch(int batch) { return this; } - public boolean isBlocking() { - return blocking; - } - - public ActorSettingsBuilder setBlocking(boolean blocking) { - this.blocking = blocking; - return this; - } - - public long getLimit() { - return limit; - } - - public ActorSettingsBuilder setLimit(int limit) { - this.limit = limit; - return this; - } - - public boolean isMulti() { - return multi; - } - - public ActorSettingsBuilder setMulti(boolean multi) { - this.multi = multi; - return this; - } public ActorSettings build() { - return new ActorSettingsImpl(limit,multi,batch,errorHandler, blocking); + return new ActorSettingsImpl(batch, errorHandler); } } diff --git a/src/main/java/paxel/lintstone/api/LintStoneActorAccess.java b/src/main/java/paxel/lintstone/api/LintStoneActorAccess.java index 5cd38b6..a0eb699 100644 --- a/src/main/java/paxel/lintstone/api/LintStoneActorAccess.java +++ b/src/main/java/paxel/lintstone/api/LintStoneActorAccess.java @@ -1,5 +1,7 @@ package paxel.lintstone.api; +import java.util.concurrent.CompletableFuture; + /** * This interface is used to send messages to an actor. This object should never * be used multithreaded unless synchronized externally. @@ -14,6 +16,16 @@ public interface LintStoneActorAccess { */ void send(Object message) throws UnregisteredRecipientException; + /** + * Sends a message to the Actor represented by this Access. But blocks the call until the number of messages queued + * is less than the given threshold. If someone else is sending messages to the actor, this call might block forever. + * + * @param message The message to send- + * @param blockThreshold The number of queued messages that causes the call to block. + * @throws UnregisteredRecipientException in case the actor does not exist. + */ + void sendWithBackPressure(Object message, int blockThreshold) throws UnregisteredRecipientException; + /** * Retrieve if the actor is currently registered. using this does not ensure * that send will work, depending on how you register and unregister Actors. @@ -38,6 +50,17 @@ public interface LintStoneActorAccess { */ void ask(Object message, ReplyHandler replyHandler) throws UnregisteredRecipientException; + /** + * A convenient {@link #ask(Object, ReplyHandler)} that returns and completes a {@link CompletableFuture} once, if the replied type is correct. + * Otherwise finishes exceptional with a {@link ClassCastException} + * + * @param message the Message for the actor + * @param The type of the expected reply + * @return The future result. It will be completed in the context of the asked actor. + * @throws UnregisteredRecipientException in case the actor does not exist. + */ + CompletableFuture ask(Object message) throws UnregisteredRecipientException; + /** * Retrieve the total amount of queued messages and replies of this actor. * diff --git a/src/main/java/paxel/lintstone/api/LintStoneMessageEventContext.java b/src/main/java/paxel/lintstone/api/LintStoneMessageEventContext.java index de41387..34e6c51 100644 --- a/src/main/java/paxel/lintstone/api/LintStoneMessageEventContext.java +++ b/src/main/java/paxel/lintstone/api/LintStoneMessageEventContext.java @@ -1,6 +1,7 @@ package paxel.lintstone.api; import java.util.Optional; +import java.util.concurrent.CompletableFuture; /** * Represents the access to the message and the actor system for one message @@ -70,6 +71,20 @@ public interface LintStoneMessageEventContext { */ void ask(String name, Object msg, ReplyHandler handler) throws UnregisteredRecipientException; + /** + * Sends the message to the actor with the registered name. + * The first reply will complete the resulting future in the context of this actor. + * If the replied type doesn't match the future it is completed exceptionally. + * + * @param name the name of the actor. + * @param msg The message to send. + * @param the type of the future. + * @return the future result. + * @throws UnregisteredRecipientException if there is no actor with that + * name. + */ + CompletableFuture ask(String name, Object msg) throws UnregisteredRecipientException; + /** * Retrieve the actor with given name. This method will always return an * object. Use the provided object to check if the actor exists by calling {@link LintStoneActorAccess#exists() diff --git a/src/main/java/paxel/lintstone/impl/Actor.java b/src/main/java/paxel/lintstone/impl/Actor.java index ed7dc18..d454236 100644 --- a/src/main/java/paxel/lintstone/impl/Actor.java +++ b/src/main/java/paxel/lintstone/impl/Actor.java @@ -39,11 +39,13 @@ boolean isValid() { return registered == true; } - void send(Object message, Optional sender, Optional replyHandler) throws UnregisteredRecipientException { + void send(Object message, Optional sender, Optional replyHandler, Integer blockThreshold) throws UnregisteredRecipientException { if (!registered) { throw new UnregisteredRecipientException("Actor " + name + " is not registered"); } - boolean success = sequentialProcessor.add(() -> { + + + Runnable runnable = () -> { // create mec and delegate replies to our handleReply method MessageContext mec = messageContextFactory.create(message, (msg, self) -> { this.handleReply(msg, self, sender, replyHandler); @@ -59,9 +61,15 @@ void send(Object message, Optional sender, Optional initMessage, Optional sender) { - return registerActor(name, factory, initMessage, sender, groupingExecutor.create().setMultiSource(true).build()); + return registerActor(name, factory, initMessage, sender, groupingExecutor.create().build()); } LintStoneActorAccess registerActor(String name, LintStoneActorFactory factory, Optional initMessage, Optional sender, ActorSettings settings) { SequentialProcessorBuilder sequentialProcessorBuilder = groupingExecutor.create(); - sequentialProcessorBuilder.setMultiSource(settings.isMulti()); sequentialProcessorBuilder.setBatchSize(settings.getBatch()); - sequentialProcessorBuilder.setLimited(settings.getLimit()); - sequentialProcessorBuilder.setLimited(settings.getLimit()); - sequentialProcessorBuilder.setBlocking(settings.isBlocking()); sequentialProcessorBuilder.setErrorHandler(settings.getErrorHandler()); return registerActor(name, factory, initMessage, sender, sequentialProcessorBuilder.build()); } @@ -60,7 +56,7 @@ private LintStoneActorAccess registerActor(String name, LintStoneActorFactory fa LintStoneActor actorInstance = factory.create(); Actor newActor = new Actor(name, actorInstance, sequentialProcessor, this, sender); // actor receives the initMessage as first message. - initMessage.ifPresent(msg -> newActor.send(msg, Optional.empty(), null)); + initMessage.ifPresent(msg -> newActor.send(msg, Optional.empty(), null, null)); actors.put(name, newActor); return new SelfUpdatingActorAccess(name, newActor, this, sender); } diff --git a/src/main/java/paxel/lintstone/impl/MessageContext.java b/src/main/java/paxel/lintstone/impl/MessageContext.java index 09d74f7..8f9bed9 100644 --- a/src/main/java/paxel/lintstone/impl/MessageContext.java +++ b/src/main/java/paxel/lintstone/impl/MessageContext.java @@ -3,6 +3,7 @@ import paxel.lintstone.api.*; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; /** @@ -43,15 +44,35 @@ public void send(String name, Object msg) throws UnregisteredRecipientException if (!actor.isPresent()) { throw new UnregisteredRecipientException("Actor with name " + name + " does not exist"); } - actor.get().send(msg, Optional.of(self), null); + actor.get().send(msg, Optional.of(self), null, null); } + @Override public void ask(String name, Object msg, ReplyHandler handler) throws UnregisteredRecipientException { Optional actor = actorSystem.getActor(name); if (!actor.isPresent()) { throw new UnregisteredRecipientException("Actor with name " + name + " does not exist"); } - actor.get().send(msg, Optional.of(self), Optional.of(handler)); + actor.get().send(msg, Optional.of(self), Optional.of(handler), null); + } + + @Override + public CompletableFuture ask(String name, Object msg) throws UnregisteredRecipientException { + Optional actor = actorSystem.getActor(name); + if (!actor.isPresent()) { + throw new UnregisteredRecipientException("Actor with name " + name + " does not exist"); + } + CompletableFuture result = new CompletableFuture<>(); + actor.get().send(msg, Optional.of(self), Optional.of(mec -> { + mec.otherwise((m, o) -> { + try { + result.complete((F) o); + } catch (Exception e) { + result.completeExceptionally(e); + } + }); + }), null); + return result; } @@ -62,12 +83,9 @@ public LintStoneActorAccess getActor(String name) { } - - - @Override public LintStoneActorAccess registerActor(String name, LintStoneActorFactory factory, Optional initMessage, ActorSettings settings) { - return actorSystem.registerActor(name, factory, initMessage, Optional.of(self),settings); + return actorSystem.registerActor(name, factory, initMessage, Optional.of(self), settings); } @Override diff --git a/src/main/java/paxel/lintstone/impl/SelfUpdatingActorAccess.java b/src/main/java/paxel/lintstone/impl/SelfUpdatingActorAccess.java index 6204219..c2cd5ce 100644 --- a/src/main/java/paxel/lintstone/impl/SelfUpdatingActorAccess.java +++ b/src/main/java/paxel/lintstone/impl/SelfUpdatingActorAccess.java @@ -5,6 +5,7 @@ import paxel.lintstone.api.UnregisteredRecipientException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; /** * This ActorAccess will try to fetch a new instance of an actor in case the @@ -27,7 +28,12 @@ public class SelfUpdatingActorAccess implements LintStoneActorAccess { @Override public void send(Object message) throws UnregisteredRecipientException { - tell(message, sender, Optional.empty()); + tell(message, sender, Optional.empty(), null); + } + + @Override + public void sendWithBackPressure(Object message, int blockThreshold) throws UnregisteredRecipientException { + tell(message, sender, Optional.empty(),blockThreshold); } /** @@ -51,20 +57,20 @@ void run(ReplyHandler runnable, Object reply) throws UnregisteredRecipientExcept } public void send(Object message, SelfUpdatingActorAccess sender) throws UnregisteredRecipientException { - tell(message, Optional.ofNullable(sender), Optional.empty()); + tell(message, Optional.ofNullable(sender), Optional.empty(), null); } - private void tell(Object message, Optional sender, Optional replyHandler) throws UnregisteredRecipientException { + private void tell(Object message, Optional sender, Optional replyHandler, Integer blockThreshold) throws UnregisteredRecipientException { if (actor == null) { updateActor(); } try { - actor.send(message, sender, replyHandler); + actor.send(message, sender, replyHandler,blockThreshold); } catch (UnregisteredRecipientException ignoredOnce) { actor = null; updateActor(); // second try throws ,the exception to the outside, in case the actore provided was already unregistered. - actor.send(message, sender, replyHandler); + actor.send(message, sender, replyHandler,blockThreshold); } } @@ -85,9 +91,25 @@ public boolean exists() { @Override public void ask(Object message, ReplyHandler replyHandler) throws UnregisteredRecipientException { // replyHandler is required, therefore not Optional.ofNullable - tell(message, sender, Optional.of(replyHandler)); + tell(message, sender, Optional.of(replyHandler),null); } + @Override + public CompletableFuture ask(Object message) throws UnregisteredRecipientException { + CompletableFuture result = new CompletableFuture<>(); + tell(message, sender, Optional.of(mec -> { + mec.otherwise((reply, resultMec) -> { + try { + result.complete((F) reply); + } catch (Exception e) { + result.completeExceptionally(e); + } + }); + }),null); + return result; + } + + @Override public int getQueuedMessagesAndReplies() { return actor.getQueued(); diff --git a/src/test/java/paxel/lintstone/api/ActorSortTest.java b/src/test/java/paxel/lintstone/api/ActorSortTest.java new file mode 100644 index 0000000..7359940 --- /dev/null +++ b/src/test/java/paxel/lintstone/api/ActorSortTest.java @@ -0,0 +1,44 @@ +package paxel.lintstone.api; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.is; + +public class ActorSortTest { + + @Test + void sort() throws ExecutionException, InterruptedException { + + LintStoneSystem system = LintStoneSystemFactory.createLimitedThreadCount(5); + + LintStoneActorAccess root = system.registerActor("root", () -> new SortNodeActor("/"), Optional.empty(), ActorSettings.DEFAULT); + + // send numbers to root actor. it will generate an actor system that resembles a simple binary tree + Random random = new Random(1007); + int number = 10_000; + for (int i = 0; i < number; i++) { + root.sendWithBackPressure(random.nextLong(), 100); + } + + // request the sorted list from the root actor. It will also clean up the actor System + List sorted = root.>ask("get").get(); + + // check that the result is the same (is only true if no duplicates were created) + assertThat(sorted.size(), is(number)); + + // check that it's sorted + for (int i = 0; i < number - 1; i++) { + assertThat(String.format("at %d : %d greater than %d", i, sorted.get(i), sorted.get(i + 1)), sorted.get(i) <= sorted.get(i + 1)); + } + + // stop system + system.shutDown(); + } +} diff --git a/src/test/java/paxel/lintstone/api/ExternalAskTest.java b/src/test/java/paxel/lintstone/api/ExternalAskTest.java index 61c6074..c79e7d9 100644 --- a/src/test/java/paxel/lintstone/api/ExternalAskTest.java +++ b/src/test/java/paxel/lintstone/api/ExternalAskTest.java @@ -9,9 +9,12 @@ import java.util.Formatter; import java.util.Optional; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -23,10 +26,50 @@ public class ExternalAskTest { CountDownLatch latch = new CountDownLatch(1); + @Test + public void testAskExternalFuture() throws InterruptedException, ExecutionException { + LintStoneSystem system = LintStoneSystemFactory.createLimitedThreadCount(5); + LintStoneActorAccess md5 = system.registerActor("md5", () -> new Md5Actor(), + Optional.empty(), ActorSettings.create().build()); + + md5.send("This is my test string"); + for (int i = 0; i < 1000; i++) { + md5.send(ByteBuffer.wrap(new byte[i])); + } + String result = md5.ask(new EndMessage()).get(); + + // this should be repeatable correct. because the messages are processed in correct order and the ask will be the last one + assertThat(result, is("993e7b2144d8c8a5cde9cf36463959e")); + assertThat(md5.getQueuedMessagesAndReplies(), is(0)); + assertThat(md5.getProcessedMessages(), is(1002L)); //1 string 1000 byte 1 ask + assertThat(md5.getProcessedReplies(), is(1L)); // processed the ask reply for external call + System.out.println(system); + system.shutDown(); + } + + @Test + public void testAskExternalFutureTypeFail() throws InterruptedException, ExecutionException { + LintStoneSystem system = LintStoneSystemFactory.createLimitedThreadCount(5); + LintStoneActorAccess md5 = system.registerActor("md5", () -> new Md5Actor(), + Optional.empty(), ActorSettings.create().build()); + + md5.send("This is my test string"); + for (int i = 0; i < 1000; i++) { + md5.send(ByteBuffer.wrap(new byte[i])); + } + + // wrong type cast, but pit it in Object, to avoid ClassCastException + Object result = md5.ask(new EndMessage()).get(); + + // was no integer + assertThat(result, is(instanceOf(String.class))); + } + @Test public void testAskExternal() throws InterruptedException { LintStoneSystem system = LintStoneSystemFactory.createLimitedThreadCount(5); - LintStoneActorAccess md5 = system.registerActor("md5", () -> new Md5Actor(), Optional.empty(), ActorSettings.create().setMulti(true).build()); + LintStoneActorAccess md5 = system.registerActor("md5", () -> new Md5Actor(), + Optional.empty(), ActorSettings.create().build()); AtomicReference result = new AtomicReference<>(); md5.send("This is my test string"); diff --git a/src/test/java/paxel/lintstone/api/FailingTests.java b/src/test/java/paxel/lintstone/api/FailingTests.java index aae4a05..a24d124 100644 --- a/src/test/java/paxel/lintstone/api/FailingTests.java +++ b/src/test/java/paxel/lintstone/api/FailingTests.java @@ -16,6 +16,9 @@ public class FailingTests { private static final Random R = new Random(0xbadbee); + public static final String LULU = "lulu"; + public static final String LALA = "lala"; + public static final String STOP_ACTOR = "floor"; CountDownLatch latch = new CountDownLatch(1); public FailingTests() { @@ -24,12 +27,16 @@ public FailingTests() { @Test public void testSomeMethod() throws InterruptedException { LintStoneSystem system = LintStoneSystemFactory.create(Executors.newWorkStealingPool()); - LintStoneActorAccess stopper = system.registerActor("floor", () -> a -> latch.countDown(), Optional.empty(), ActorSettings.create().setMulti(true).build()); - LintStoneActorAccess lala = system.registerActor("lala", () -> new StupidActor(), Optional.of("Go"), ActorSettings.create().setMulti(true).build()); + system.registerActor(STOP_ACTOR, () -> a -> { + latch.countDown(); + }, Optional.empty(), ActorSettings.create().build()); - LintStoneActorAccess lulu = system.registerActor("lulu", () -> a -> { + // this is actually happening + system.registerActor(LALA, () -> new StupidActor(), Optional.of("Go"), ActorSettings.create().build()); + + LintStoneActorAccess lulu = system.registerActor(LULU, () -> a -> { a.reply("nope"); - }, Optional.empty(), ActorSettings.create().setMulti(true).build()); + }, Optional.empty(), ActorSettings.create().build()); lulu.send("you ok?"); @@ -107,7 +114,7 @@ private void handleString(FailedMessage go, LintStoneMessageEventContext m) { LintStoneActorAccess oldActor = m.registerActor("someOne", () -> a -> { }, Optional.empty(), ActorSettings.DEFAULT); - oldActor.send("lala"); + oldActor.send("ho"); // unregister that one if (m.unregister("someOne")) { // register a new one @@ -117,7 +124,7 @@ private void handleString(FailedMessage go, LintStoneMessageEventContext m) { oldActor.send("works"); - m.getActor("floor").send("sztop"); + m.getActor(STOP_ACTOR).send("sztop"); } } } diff --git a/src/test/java/paxel/lintstone/api/InternalAskTest.java b/src/test/java/paxel/lintstone/api/InternalAskTest.java index 26440f8..ab17099 100644 --- a/src/test/java/paxel/lintstone/api/InternalAskTest.java +++ b/src/test/java/paxel/lintstone/api/InternalAskTest.java @@ -37,10 +37,10 @@ public void testAskExternal() throws InterruptedException, ExecutionException, T LintStoneSystem system = LintStoneSystemFactory.createLimitedThreadCount(5); // the entry actor is limited to 1 message at the time input queue // just to test that the messages are added even so we send faster than the actor can process (creating backpressure) - LintStoneActorAccess dist = system.registerActor("dist", () -> new Distributor(), Optional.empty(), ActorSettings.create().setMulti(true).setLimit(1).setBlocking(true).build()); - system.registerActor("wordCount", () -> new WordCount(), Optional.empty(), ActorSettings.create().setMulti(true).build()); - system.registerActor("charCount", () -> new CharCount(), Optional.empty(), ActorSettings.create().setMulti(true).build()); - system.registerActor("sorter", () -> new Sorter(), Optional.empty(), ActorSettings.create().setMulti(true).build()); + LintStoneActorAccess dist = system.registerActor("dist", () -> new Distributor(), Optional.empty(), ActorSettings.create().build()); + system.registerActor("wordCount", () -> new WordCount(), Optional.empty(), ActorSettings.create().build()); + system.registerActor("charCount", () -> new CharCount(), Optional.empty(), ActorSettings.create().build()); + system.registerActor("sorter", () -> new Sorter(), Optional.empty(), ActorSettings.create().build()); LintStoneSystem s = LintStoneSystemFactory.createLimitedThreadCount(5); LintStoneActorAccess syncedOut = s.registerActor("out", () -> mec -> mec.otherwise((o,m)->System.out.println(o)), Optional.empty(), ActorSettings.create().build()); diff --git a/src/test/java/paxel/lintstone/api/JmhTest.java b/src/test/java/paxel/lintstone/api/JmhTest.java index 5132847..fe1a052 100644 --- a/src/test/java/paxel/lintstone/api/JmhTest.java +++ b/src/test/java/paxel/lintstone/api/JmhTest.java @@ -112,10 +112,12 @@ public void run999ActorOnWorkStealingThreads() throws InterruptedException { private void run(int threads, int actorCount, int messages, LintStoneSystem system) throws InterruptedException, UnregisteredRecipientException { CountDownLatch latch = new CountDownLatch(threads); - system.registerActor("END", () -> new EndActor(latch), Optional.empty(), ActorSettings.create().setMulti(true).build()); + system.registerActor("END", () -> new EndActor(latch), + Optional.empty(), ActorSettings.create().build()); List actors = new ArrayList<>(); for (int i = 0; i < actorCount; i++) { - actors.add(system.registerActor(TEST + i, () -> new MessageActor(), Optional.empty(), ActorSettings.create().setMulti(true).build())); + actors.add(system.registerActor(TEST + i, () -> new MessageActor(), + Optional.empty(), ActorSettings.create().build())); } for (int i = 0; i < messages; i++) { actors.get(i % actorCount).send(i); diff --git a/src/test/java/paxel/lintstone/api/LintStoneSystemTest.java b/src/test/java/paxel/lintstone/api/LintStoneSystemTest.java index 803f27d..e4ca7fb 100644 --- a/src/test/java/paxel/lintstone/api/LintStoneSystemTest.java +++ b/src/test/java/paxel/lintstone/api/LintStoneSystemTest.java @@ -25,7 +25,7 @@ public LintStoneSystemTest() { @Test public void testSomeMethod() throws InterruptedException { LintStoneSystem system = LintStoneSystemFactory.createLimitedThreadCount(5); - LintStoneActorAccess sumActor = system.registerActor("sumActor", () -> new SumActor(this::result), Optional.empty(), ActorSettings.create().setMulti(true).build()); + LintStoneActorAccess sumActor = system.registerActor("sumActor", () -> new SumActor(this::result), Optional.empty(), ActorSettings.create().build()); Map actors = new HashMap<>(); for (int i = 1; i < 100000; i++) { @@ -33,7 +33,7 @@ public void testSomeMethod() throws InterruptedException { actors.computeIfAbsent(m, val -> { // create a new actor on the fly final String name = "addActor" + val; - final LintStoneActorAccess actor = system.registerActor(name, AdderActor::new, Optional.empty(), ActorSettings.create().setMulti(true).build()); + final LintStoneActorAccess actor = system.registerActor(name, AdderActor::new, Optional.empty(), ActorSettings.create().build()); // register the actor at the sum actor sumActor.send(name); // tell the adder his name. diff --git a/src/test/java/paxel/lintstone/api/ReplyTest.java b/src/test/java/paxel/lintstone/api/ReplyTest.java index 7ca4567..b8a1623 100644 --- a/src/test/java/paxel/lintstone/api/ReplyTest.java +++ b/src/test/java/paxel/lintstone/api/ReplyTest.java @@ -23,13 +23,15 @@ public ReplyTest() { @Test public void testSomeMethod() throws InterruptedException { LintStoneSystem system = LintStoneSystemFactory.createLimitedThreadCount(5); - LintStoneActorAccess alex = system.registerActor("Alex", () -> new FightActor(50, 1, 12, 0, 0), Optional.of("Uta"), ActorSettings.create().setMulti(true).build()); - LintStoneActorAccess uta = system.registerActor("Uta", () -> new FightActor(40, 3, 4, 1, 1), Optional.of("Alex"), ActorSettings.create().setMulti(true).build()); + LintStoneActorAccess alex = system.registerActor("Alex", () -> new FightActor(50, 1, 12, 0, 0), + Optional.of("Uta"), ActorSettings.create().build()); + LintStoneActorAccess uta = system.registerActor("Uta", () -> new FightActor(40, 3, 4, 1, 1), + Optional.of("Alex"), ActorSettings.create().build()); LintStoneActorAccess floor = system.registerActor("floor", () -> a -> { // someone died a.inCase(String.class, (n, mec) -> System.out.println(n + " lost")); latch.countDown(); - }, Optional.empty(), ActorSettings.create().setMulti(true).build()); + }, Optional.empty(), ActorSettings.create().build()); uta.send(new StartMessage()); diff --git a/src/test/java/paxel/lintstone/api/SortNodeActor.java b/src/test/java/paxel/lintstone/api/SortNodeActor.java new file mode 100644 index 0000000..623f534 --- /dev/null +++ b/src/test/java/paxel/lintstone/api/SortNodeActor.java @@ -0,0 +1,75 @@ +package paxel.lintstone.api; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +public class SortNodeActor implements LintStoneActor { + private final String name; + private Long value; + private LintStoneActorAccess left; + private LintStoneActorAccess right; + + public SortNodeActor(String name) { + this.name = name; + } + + @Override + public void newMessageEvent(LintStoneMessageEventContext mec) { + mec.inCase(Long.class, this::add) + .inCase(String.class, this::get) + .otherwise(this::err); + } + + private void err(Object o, LintStoneMessageEventContext lintStoneMessageEventContext) { + System.err.printf("Unsupported message %s: <%s>%n", o.getClass(), o); + } + + private void get(String ignore, LintStoneMessageEventContext lintStoneMessageEventContext) { + Optional.ofNullable(left) + // ask left side for values, if exists + .map(a -> a.>ask(ignore)) + .orElse(CompletableFuture.completedFuture(Collections.emptyList())) + .thenCompose(leftValues -> + Optional.ofNullable(right) + // ask right side for values, if exists + .map(b -> b.>ask(ignore)) + .orElse(CompletableFuture.completedFuture(Collections.emptyList())) + .thenApply(rightValues -> { + // create sorted list of values under this actor + List longs = sumList(leftValues, value, rightValues); + lintStoneMessageEventContext.reply(longs); + // kill self + lintStoneMessageEventContext.unregister(); + return longs; + })); + } + + private List sumList(List leftValues, Long value, List rightValues) { + ArrayList longs = new ArrayList<>(leftValues); + longs.add(value); + longs.addAll(rightValues); + return longs; + } + + private void add(Long value, LintStoneMessageEventContext lintStoneMessageEventContext) { + if (this.value == null || this.value == value) + // first value that we receive // same value received + this.value = value; + else if (this.value > value) { + // lesser than us. delegate to left actor + if (this.left == null) + this.left = lintStoneMessageEventContext.registerActor(name + "<", () -> new SortNodeActor(name + "<"), Optional.of(value), ActorSettings.DEFAULT); + else + left.send(value); + } else { + // bigger than us. delegate to right actor + if (this.right == null) + this.right = lintStoneMessageEventContext.registerActor(name + ">", () -> new SortNodeActor(name + ">"), Optional.of(value), ActorSettings.DEFAULT); + else + right.send(value); + } + } +}