Skip to content

Commit

Permalink
leanup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
paxel committed Oct 1, 2023
1 parent a41f2f9 commit 2aa7fe0
Show file tree
Hide file tree
Showing 21 changed files with 564 additions and 225 deletions.
14 changes: 10 additions & 4 deletions src/main/java/paxel/lintstone/impl/ActorSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,37 @@ private LintStoneActorAccessor registerActor(String name, LintStoneActorFactory

@Override
public void shutDown() {
actors.entrySet().stream().map(Map.Entry::getValue).forEach(a -> a.shutdown(false));
shutdownActors(false);
groupingExecutor.shutdown();
}

@Override
public void shutDownAndWait() throws InterruptedException {
actors.entrySet().stream().map(Map.Entry::getValue).forEach(a -> a.shutdown(false));
shutdownActors(false);
groupingExecutor.shutdown();
//wait forever and a day
groupingExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
}

@Override
public boolean shutDownAndWait(Duration timeout) throws InterruptedException {
actors.entrySet().stream().map(Map.Entry::getValue).forEach(a -> a.shutdown(false));
shutdownActors(false);
groupingExecutor.shutdown();
return groupingExecutor.awaitTermination(timeout.getSeconds(), TimeUnit.SECONDS);
}

@Override
public void shutDownNow() {
actors.entrySet().stream().map(Map.Entry::getValue).forEach(a -> a.shutdown(true));
shutdownActors(true);
groupingExecutor.shutdownNow();
}

private void shutdownActors(boolean now) {
synchronized (actors) {
actors.entrySet().stream().map(Map.Entry::getValue).forEach(a -> a.shutdown(now));
}
}


@Override
public boolean unregisterActor(String name) {
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/paxel/lintstone/api/ActorSortTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package paxel.lintstone.api;

import org.junit.jupiter.api.Test;
import paxel.lintstone.api.actors.SortNodeActor;

import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -37,6 +38,6 @@ void sort() throws ExecutionException, InterruptedException {
}

// stop system
system.shutDown();
system.shutDownNow();
}
}
8 changes: 0 additions & 8 deletions src/test/java/paxel/lintstone/api/EndMessage.java

This file was deleted.

42 changes: 2 additions & 40 deletions src/test/java/paxel/lintstone/api/ExternalAskTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package paxel.lintstone.api;

import org.junit.Test;
import paxel.lintstone.api.actors.Md5Actor;
import paxel.lintstone.api.messages.EndMessage;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Formatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -86,40 +84,4 @@ public void testAskExternal() throws InterruptedException {
system.shutDown();
}

private static class Md5Actor implements LintStoneActor {
private MessageDigest md5;

@Override
public void newMessageEvent(LintStoneMessageEventContext mec) {
mec.inCase(String.class, (name, m) -> this.add(name.getBytes(StandardCharsets.UTF_8))).inCase(ByteBuffer.class, (byteBuffer, m) -> {
if (byteBuffer.hasArray())
add(byteBuffer.array());
}).inCase(EndMessage.class, (dmg, m) -> {
m.reply(getMd5String());
// let's die
m.unregister();
});
}

private Object getMd5String() {
byte[] digest = md5.digest();
Formatter f = new Formatter(new StringBuilder());
for (byte x :
digest) {
f.format("%01x", x);
}
return f.toString();
}

private void add(byte[] bytes) {
if (md5 == null) {
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
// ignorable for this test
}
}
md5.update(bytes);
}
}
}
81 changes: 1 addition & 80 deletions src/test/java/paxel/lintstone/api/FailingTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import org.junit.Assert;
import org.junit.Test;
import paxel.lintstone.impl.FailedMessage;
import paxel.lintstone.api.actors.StupidActor;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -86,83 +86,4 @@ public void testGetDataOut() throws InterruptedException, ExecutionException {
system.shutDownAndWait();
}

private static class StupidActor implements LintStoneActor {

@Override
public void newMessageEvent(LintStoneMessageEventContext mec) {
mec
.inCase(String.class, this::handleString)
.inCase(FailedMessage.class, this::handleFail)
.otherwise((o, m) -> System.out.println("otherwise: " + o));
}

private void handleString(String go, LintStoneMessageEventContext mec) {
LintStoneActorAccessor registered = mec.registerActor(FAILING, () -> m -> {
// this temporay actor will fail with each message that it receives
throw new IllegalArgumentException("Go away");
}, ActorSettings.DEFAULT);

if (registered.exists()) {
// the actor is registered, registering it again will not create a new actor but the previous one
// in that case the "Go away" actor
LintStoneActorAccessor reRegister = mec.registerActor(FAILING, () -> m -> {
// no fail anymore, but this factory will not be called
}, ActorSettings.DEFAULT);

if (reRegister.exists()) {
// so this first message to the actor should fail and be given to the errorhandler
// it also should cause a FailedMessage to be returned to us, that the Message could not be processed
reRegister.send("Hi!");
}
}
// We send a message to ourselves, that we don't support
// the false object will end in the otherwise branch of newMessageEvent
mec.send(mec.getName(), Boolean.FALSE);
try {
mec.send("Unknown Actor", "Will not be delivered");
throw new IllegalStateException("Should have failed");
} catch (UnregisteredRecipientException unregisteredRecipientException) {
// we can't send to unknown actors
}

LintStoneActorAccessor actor = mec.getActor(NOT_EXISTANT);

if (!actor.exists()) {
try {
actor.send("fail me");
throw new IllegalStateException("Should have failed");
} catch (UnregisteredRecipientException unregisteredRecipientException) {
}
// register an actor with that name
mec.registerActor(NOT_EXISTANT, () -> a -> {
}, ActorSettings.DEFAULT);

boolean exists = actor.exists();
// would throw exception if LintStoneActorAccessor is not self updating
actor.send("This actor reference works now: " + exists);
}
}

private void handleFail(FailedMessage go, LintStoneMessageEventContext m) {
// The failed message was sent by the temporary actor, because it could not process it
System.out.println("Failed on " + go.actorName() + " because " + go.cause() + " when processing " + go.message());

final LintStoneActorAccessor me = m.getActor(m.getName());
me.send(true);
// we unregister ourselves
m.unregister();
if (me.exists()) {
throw new IllegalStateException("I was just unregistered");
}
try {
me.send("will not happen");
throw new IllegalStateException("Should have failed");
} catch (UnregisteredRecipientException unregisteredRecipientException) {
}

// end the test
m.getActor(STOP_ACTOR).send("stop");
}
}

}
82 changes: 6 additions & 76 deletions src/test/java/paxel/lintstone/api/InternalAskTest.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package paxel.lintstone.api;

import org.junit.Test;
import paxel.lintstone.api.actors.CharCount;
import paxel.lintstone.api.actors.Distributor;
import paxel.lintstone.api.actors.Sorter;
import paxel.lintstone.api.actors.WordCount;
import paxel.lintstone.api.messages.EndMessage;

import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -35,8 +37,7 @@ public InternalAskTest() {
@Test
public void testAskExternal() throws InterruptedException, ExecutionException, TimeoutException {
LintStoneSystem system = LintStoneSystemFactory.create();
// the entry actor is limited to 1 message at the time input queue
// just to test that the messages are added even so we send faster than the actor can process (creating backpressure)

LintStoneActorAccessor dist = system.registerActor("dist", Distributor::new, ActorSettings.DEFAULT);
system.registerActor("wordCount", WordCount::new, ActorSettings.DEFAULT);
system.registerActor("charCount", CharCount::new, ActorSettings.DEFAULT);
Expand All @@ -63,75 +64,4 @@ public void testAskExternal() throws InterruptedException, ExecutionException, T
}


private static class Distributor implements LintStoneActor {
@Override
public void newMessageEvent(LintStoneMessageEventContext mec) {
mec.inCase(String.class, this::send).inCase(EndMessage.class, (dmg, askContext) -> {
CompletableFuture<Integer> words = new CompletableFuture<>();
CompletableFuture<Integer> chars = new CompletableFuture<>();
CompletableFuture<String> sort = new CompletableFuture<>();
// each of these completes will be called in the thread context of this actor
mec.ask("wordCount", new EndMessage(), c -> c.inCase(Integer.class, (r, replyContext) -> words.complete(r)));
mec.ask("charCount", new EndMessage(), c -> c.inCase(Integer.class, (r, replyContext) -> chars.complete(r)));
mec.ask("sorter", new EndMessage(), c -> c.inCase(String.class, (r, replyContext) -> sort.complete(r)));

// when the last reply comes, the reply of the external ask is fulfilled.
CompletableFuture.allOf(words, chars, sort).thenApply(x -> {
try {
askContext.reply(MessageFormat.format("{0} words, {1} letters, {2}", words.get(), chars.get(), sort.get()));
} catch (Exception e) {
askContext.reply(e.getMessage());
}
return null;
});
});
}


private void send(String txt, LintStoneMessageEventContext mec) {
mec.send("wordCount", txt);
mec.send("charCount", txt);
mec.send("sorter", txt);
}
}

private static class WordCount implements LintStoneActor {
private int count;

@Override
public void newMessageEvent(LintStoneMessageEventContext mec) {
mec.inCase(String.class, (txt, m) -> {
int length = (int) Arrays.stream(txt.trim().split(" ")).filter(f->!f.trim().isEmpty()).count();
this.count += length;
}).inCase(EndMessage.class, (dmg, askContext) -> {
askContext.reply(count);
askContext.unregister();
});
}
}

private static class CharCount implements LintStoneActor {
private int count;

@Override
public void newMessageEvent(LintStoneMessageEventContext mec) {
mec.inCase(String.class, (txt, m) -> this.count += txt.replaceAll(" ", "").length()).inCase(EndMessage.class, (dmg, askContext) -> {
askContext.reply(count);
askContext.unregister();
});
}
}

private static class Sorter implements LintStoneActor {
final Set<String> words = new HashSet<>();

@Override
public void newMessageEvent(LintStoneMessageEventContext mec) {
mec.inCase(String.class, (txt, m) -> words.addAll(Arrays.stream(txt.trim().split(" ")).filter(f->!f.trim().isEmpty()).map(String::toLowerCase).collect(Collectors.toList()))).inCase(EndMessage.class, (dmg, askContext) -> {
ArrayList<String> list = new ArrayList<>(words);
Collections.sort(list);
askContext.reply(String.join(",", list));
});
}
}
}
3 changes: 3 additions & 0 deletions src/test/java/paxel/lintstone/api/LintStoneSystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import static org.hamcrest.MatcherAssert.*;

import org.junit.Test;
import paxel.lintstone.api.actors.AdderActor;
import paxel.lintstone.api.actors.SumActor;
import paxel.lintstone.api.messages.EndMessage;

/**
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
package paxel.lintstone.api;
package paxel.lintstone.api.actors;

import paxel.lintstone.api.LintStoneActor;
import paxel.lintstone.api.LintStoneMessageEventContext;
import paxel.lintstone.api.messages.DieMessage;
import paxel.lintstone.api.messages.EndMessage;

public class AdderActor implements LintStoneActor {

private long sum;
private String name;


@Override
public void newMessageEvent(LintStoneMessageEventContext mec) {
mec.inCase(Integer.class, this::addInteger)
.inCase(EndMessage.class, this::endSum)
.inCase(String.class, this::name)
.inCase(DieMessage.class, this::unregister)
.otherwise((o, m) -> System.err.println("Unknown message " + o));
.otherwise(this::other);
}

private void addInteger(Integer num, LintStoneMessageEventContext mec) {
int last = -1;
if (last >= num) {

// make sure that the order is correct
throw new IllegalStateException("Expected something bigger than " + last + " but got "+num);
}
sum += num;
}

Expand All @@ -37,4 +37,8 @@ private void unregister(DieMessage msg, LintStoneMessageEventContext mec) {
System.out.println("Actor " + name + " unregistered: " + unregister);
}

private void other(Object o, LintStoneMessageEventContext m) {
System.err.println("Unknown message " + o);
}

}
Loading

0 comments on commit 2aa7fe0

Please sign in to comment.