Skip to content

Commit

Permalink
more adaptions
Browse files Browse the repository at this point in the history
  • Loading branch information
paxel committed Sep 25, 2023
1 parent 4e0df7a commit eca84d7
Show file tree
Hide file tree
Showing 9 changed files with 14 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>io.github.paxel</groupId>
<artifactId>lintstone</artifactId>
<artifactId>lintstone-java11</artifactId>
<version>1.0.0-SNAPSHOT</version>

<name>LintStone - Actor System</name>
Expand Down
10 changes: 2 additions & 8 deletions src/main/java/paxel/lintstone/api/ActorSettingsBuilder.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
package paxel.lintstone.api;

import lombok.Getter;
import paxel.bulkexecutor.ErrorHandler;
import paxel.lintstone.impl.ActorSettingsImpl;

@Getter
public class ActorSettingsBuilder {
private int batch = 1;
private ErrorHandler errorHandler = x -> true;

public ErrorHandler getErrorHandler() {
return errorHandler;
}

public ActorSettingsBuilder setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
return this;
}

public int getBatch() {
return batch;
}

public ActorSettingsBuilder setBatch(int batch) {
this.batch = batch;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* implement it for each message type it wants to process and delegate the
* function to the context. The context is given to the function so that the
* context must not be stored as member.
*
* <p>
* Basically this is our way to make typesafe calls without instanceof switches
* or if else trees
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface LintStoneFailedMessage {
/**
* Retrieve the name of the failing actor.
*
* @return
* @return The name of the actor
*/
String getActorName();
}
3 changes: 2 additions & 1 deletion src/main/java/paxel/lintstone/api/LintStoneSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ public interface LintStoneSystem {
* timeout duration has passed.
*
* @param timeout the duration to wait
* @return {@code true} if shutdown happened before the timeout.
* @throws java.lang.InterruptedException in case the Thread is interrupted
* while shutting down.
*/
void shutDownAndWait(Duration timeout) throws InterruptedException;
boolean shutDownAndWait(Duration timeout) throws InterruptedException;

/**
* This immediately kills the executor.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/paxel/lintstone/api/package-info.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package paxel.lintstone.api;

/**
* Contains all interfaces and classes that are required for casual usage of the
* lintstone actor system.
*/
package paxel.lintstone.api;

9 changes: 2 additions & 7 deletions src/main/java/paxel/lintstone/impl/Actor.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ void send(Object message, Optional<SelfUpdatingActorAccess> sender, Optional<Rep
// TODO: catch exception. introduce error handler.
};
if (blockThreshold == null) {
if (!sequentialProcessor.add(runnable)) {
throw new IllegalStateException("The sequential processor rejected the message.");
}
sequentialProcessor.add(runnable);
} else {
if (!sequentialProcessor.addWithBackPressure(runnable, blockThreshold)) {
throw new IllegalStateException("The sequential processor rejected the message.");
Expand Down Expand Up @@ -107,7 +105,7 @@ public void run(ReplyHandler replyHandler, Object reply) {
if (!registered) {
throw new UnregisteredRecipientException("Actor " + name + " is not registered");
}
boolean success = sequentialProcessor.add(() -> {
sequentialProcessor.add(() -> {
// we update the message context with the reply and give it to the reply handler
MessageContext mec = messageContextFactory.create(reply, (msg, self) -> this.handleReply(msg, self, Optional.empty(), Optional.empty()));
try {
Expand All @@ -117,9 +115,6 @@ public void run(ReplyHandler replyHandler, Object reply) {
}
// TODO: catch exception. introduce error handler.
});
if (!success) {
throw new IllegalStateException("The sequential processor rejected the Runnable.");
}
totalReplies.incrementAndGet();
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/paxel/lintstone/impl/ActorSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ public void shutDownAndWait() throws InterruptedException {
}

@Override
public void shutDownAndWait(Duration timeout) throws InterruptedException {
public boolean shutDownAndWait(Duration timeout) throws InterruptedException {
executorService.shutdown();
executorService.awaitTermination(timeout.getSeconds(), TimeUnit.SECONDS);
return executorService.awaitTermination(timeout.getSeconds(), TimeUnit.SECONDS);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/paxel/lintstone/api/InternalAskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private static class Sorter implements LintStoneActor {

@Override
public void newMessageEvent(LintStoneMessageEventContext mec) {
mec.inCase(String.class, (txt, m) -> words.addAll(Arrays.asList(txt.trim().split(" ")).stream().filter(f->!f.trim().isEmpty()).map(String::toLowerCase).collect(Collectors.toList()))).inCase(EndMessage.class, (dmg, askContext) -> {
mec.inCase(String.class, (txt, m) -> words.addAll(Arrays.stream(txt.trim().split(" ")).filter(f->!f.trim().isEmpty()).map(String::toLowerCase).collect(Collectors.toList()))).inCase(EndMessage.class, (dmg, askContext) -> {
ArrayList<String> list = new ArrayList<>(words);
Collections.sort(list);
askContext.reply(String.join(",", list));
Expand Down

0 comments on commit eca84d7

Please sign in to comment.