Skip to content

Commit

Permalink
Merge branch 'feature/master/introduce_ask_future'
Browse files Browse the repository at this point in the history
  • Loading branch information
paxel committed Sep 4, 2023
2 parents 017031a + e4c4e1e commit 3a8f489
Show file tree
Hide file tree
Showing 20 changed files with 353 additions and 130 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: Java CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Build with Maven
run: mvn --batch-mode --update-snapshots verify
17 changes: 6 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ LintStoneSystem system = LintStoneSystemFactory.create(Executors.newCachedThread
the system creates the actors

```java
LintStoneActorAccess fileCollector = system.registerActor("fileCollector", () -> new FileCollector(cfg), Optional.empty());
LintStoneActorAccess fileCollector = system.registerActor("fileCollector", () -> new FileCollector(cfg), Optional.empty(), ActorSettings.DEFAULT);
...
fileCollector.send(FileCollector.fileMessage(root, readOnly));
```
Expand Down Expand Up @@ -52,7 +52,7 @@ The message is enqueued and eventually processed by the actor instance
} else {
fileData += length;
final LintStoneActorAccess actor = actors.computeIfAbsent(length, k -> {
return m.registerActor("counter-" + length, () -> new FileComparator(length), Optional.empty());
return m.registerActor("counter-" + length, () -> new FileComparator(length), Optional.empty(), ActorSettings.DEFAULT);
});
actor.send(fileMessage(f, readOnly));
}
Expand All @@ -72,18 +72,13 @@ If the asked actor responds to the message, the consumer is called with the resp
This is a good way to get the result from the system in case of a multithreaded process.

```java
CompletableFuture result = new CompletableFuture();

for (String text : data) {
dist.send(text);
}

//finally ask for result
dist.ask(new EndMessage(), replyMec -> {
replyMec.inCase(String.class, (reply, ignored) -> {
result.complete(reply);
});
});

Object v = result.get(1, TimeUnit.MINUTES);
```

String v = dist.<String>ask(new EndMessage())
.get(1, TimeUnit.MINUTES);
```
43 changes: 34 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.github.paxel</groupId>
Expand Down Expand Up @@ -30,12 +31,27 @@
</license>
</licenses>
<packaging>bundle</packaging>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>5.9.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
<groupId>io.github.paxel</groupId>
<artifactId>group-executor</artifactId>
<version>0.11.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
Expand All @@ -57,9 +73,14 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.paxel</groupId>
<artifactId>group-executor</artifactId>
<version>0.10.7</version>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Expand All @@ -69,6 +90,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.0.1</version>
<configuration>
<!-- fails to build in intellij on ubuntu for some reason otherwise -->
<javadocExecutable>${java.home}/bin/javadoc</javadocExecutable>
</configuration>
<executions>
<execution>
<id>attach-javadoc</id>
Expand Down Expand Up @@ -159,7 +184,7 @@
<executable>${java.home}/bin/java</executable>
<arguments>
<argument>-classpath</argument>
<classpath />
<classpath/>
<argument>org.openjdk.jmh.Main</argument>
<argument>.*</argument>
</arguments>
Expand Down
18 changes: 0 additions & 18 deletions src/main/java/paxel/lintstone/api/ActorSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,7 @@ public interface ActorSettings {
*/
ActorSettings DEFAULT = ActorSettings.create().build();

/**
* Defines if the Actor receives messages from multiple sources.
*
* @return true if multiple sources send messages to the actor.
*/
boolean isMulti();

/**
* Defines if the send message to the actor should block until the limited queue has space for another message or if the message is ignored (and returned false.
*
* @return true if the send message should block until the message can be enqueued.
*/
boolean isBlocking();

/**
* The number of messages that should be processed by the actor in one batch.
Expand All @@ -33,12 +21,6 @@ public interface ActorSettings {
*/
int getBatch();

/**
* The limit of the input queue of the actor.
*
* @return the limit.
*/
int getLimit();

/**
* The handler for uncaught exceptions in the actor.
Expand Down
31 changes: 1 addition & 30 deletions src/main/java/paxel/lintstone/api/ActorSettingsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import paxel.lintstone.impl.ActorSettingsImpl;

public class ActorSettingsBuilder {
private boolean blocking;
private int batch = 1;
private int limit;
private boolean multi = true;
private ErrorHandler errorHandler = x -> true;

public ErrorHandler getErrorHandler() {
Expand All @@ -28,34 +25,8 @@ public ActorSettingsBuilder setBatch(int batch) {
return this;
}

public boolean isBlocking() {
return blocking;
}

public ActorSettingsBuilder setBlocking(boolean blocking) {
this.blocking = blocking;
return this;
}

public long getLimit() {
return limit;
}

public ActorSettingsBuilder setLimit(int limit) {
this.limit = limit;
return this;
}

public boolean isMulti() {
return multi;
}

public ActorSettingsBuilder setMulti(boolean multi) {
this.multi = multi;
return this;
}

public ActorSettings build() {
return new ActorSettingsImpl(limit,multi,batch,errorHandler, blocking);
return new ActorSettingsImpl(batch, errorHandler);
}
}
23 changes: 23 additions & 0 deletions src/main/java/paxel/lintstone/api/LintStoneActorAccess.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package paxel.lintstone.api;

import java.util.concurrent.CompletableFuture;

/**
* This interface is used to send messages to an actor. This object should never
* be used multithreaded unless synchronized externally.
Expand All @@ -14,6 +16,16 @@ public interface LintStoneActorAccess {
*/
void send(Object message) throws UnregisteredRecipientException;

/**
* Sends a message to the Actor represented by this Access. But blocks the call until the number of messages queued
* is less than the given threshold. If someone else is sending messages to the actor, this call might block forever.
*
* @param message The message to send-
* @param blockThreshold The number of queued messages that causes the call to block.
* @throws UnregisteredRecipientException in case the actor does not exist.
*/
void sendWithBackPressure(Object message, int blockThreshold) throws UnregisteredRecipientException;

/**
* Retrieve if the actor is currently registered. using this does not ensure
* that send will work, depending on how you register and unregister Actors.
Expand All @@ -38,6 +50,17 @@ public interface LintStoneActorAccess {
*/
void ask(Object message, ReplyHandler replyHandler) throws UnregisteredRecipientException;

/**
* A convenient {@link #ask(Object, ReplyHandler)} that returns and completes a {@link CompletableFuture} once, if the replied type is correct.
* Otherwise finishes exceptional with a {@link ClassCastException}
*
* @param message the Message for the actor
* @param <F> The type of the expected reply
* @return The future result. It will be completed in the context of the asked actor.
* @throws UnregisteredRecipientException in case the actor does not exist.
*/
<F> CompletableFuture<F> ask(Object message) throws UnregisteredRecipientException;

/**
* Retrieve the total amount of queued messages and replies of this actor.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package paxel.lintstone.api;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
* Represents the access to the message and the actor system for one message
Expand Down Expand Up @@ -70,6 +71,20 @@ public interface LintStoneMessageEventContext {
*/
void ask(String name, Object msg, ReplyHandler handler) throws UnregisteredRecipientException;

/**
* Sends the message to the actor with the registered name.
* The first reply will complete the resulting future in the context of this actor.
* If the replied type doesn't match the future it is completed exceptionally.
*
* @param name the name of the actor.
* @param msg The message to send.
* @param <F> the type of the future.
* @return the future result.
* @throws UnregisteredRecipientException if there is no actor with that
* name.
*/
<F> CompletableFuture<F> ask(String name, Object msg) throws UnregisteredRecipientException;

/**
* Retrieve the actor with given name. This method will always return an
* object. Use the provided object to check if the actor exists by calling {@link LintStoneActorAccess#exists()
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/paxel/lintstone/impl/Actor.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ boolean isValid() {
return registered == true;
}

void send(Object message, Optional<SelfUpdatingActorAccess> sender, Optional<ReplyHandler> replyHandler) throws UnregisteredRecipientException {
void send(Object message, Optional<SelfUpdatingActorAccess> sender, Optional<ReplyHandler> replyHandler, Integer blockThreshold) throws UnregisteredRecipientException {
if (!registered) {
throw new UnregisteredRecipientException("Actor " + name + " is not registered");
}
boolean success = sequentialProcessor.add(() -> {


Runnable runnable = () -> {
// create mec and delegate replies to our handleReply method
MessageContext mec = messageContextFactory.create(message, (msg, self) -> {
this.handleReply(msg, self, sender, replyHandler);
Expand All @@ -59,9 +61,15 @@ void send(Object message, Optional<SelfUpdatingActorAccess> sender, Optional<Rep
}
}
// TODO: catch exception. introduce error handler.
});
if (!success) {
throw new IllegalStateException("The sequential processor rejected the message.");
};
if (blockThreshold == null) {
if (!sequentialProcessor.add(runnable)) {
throw new IllegalStateException("The sequential processor rejected the message.");
}
} else {
if (!sequentialProcessor.addWithBackPressure(runnable, blockThreshold)) {
throw new IllegalStateException("The sequential processor rejected the message.");
}
}
totalMessages.incrementAndGet();
}
Expand Down
22 changes: 1 addition & 21 deletions src/main/java/paxel/lintstone/impl/ActorSettingsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,12 @@
import paxel.lintstone.api.ActorSettings;

public class ActorSettingsImpl implements ActorSettings {
private final int limit;
private final boolean multi;
private final boolean blocking;
private final int batch;
private final ErrorHandler errorHandler;

public ActorSettingsImpl(int limit, boolean multi, int batch, ErrorHandler errorHandler, boolean blocking) {
this.limit = limit;
this.multi = multi;
public ActorSettingsImpl(int batch, ErrorHandler errorHandler) {
this.batch = batch;
this.errorHandler = errorHandler;
this.blocking = blocking;
}

@Override
public int getLimit() {
return limit;
}

@Override
public boolean isMulti() {
return multi;
}

@Override
Expand All @@ -38,8 +22,4 @@ public ErrorHandler getErrorHandler() {
return errorHandler;
}

@Override
public boolean isBlocking() {
return blocking;
}
}
8 changes: 2 additions & 6 deletions src/main/java/paxel/lintstone/impl/ActorSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,12 @@ public LintStoneActorAccess registerActor(String name, LintStoneActorFactory fac


LintStoneActorAccess registerActor(String name, LintStoneActorFactory factory, Optional<Object> initMessage, Optional<SelfUpdatingActorAccess> sender) {
return registerActor(name, factory, initMessage, sender, groupingExecutor.create().setMultiSource(true).build());
return registerActor(name, factory, initMessage, sender, groupingExecutor.create().build());
}

LintStoneActorAccess registerActor(String name, LintStoneActorFactory factory, Optional<Object> initMessage, Optional<SelfUpdatingActorAccess> sender, ActorSettings settings) {
SequentialProcessorBuilder sequentialProcessorBuilder = groupingExecutor.create();
sequentialProcessorBuilder.setMultiSource(settings.isMulti());
sequentialProcessorBuilder.setBatchSize(settings.getBatch());
sequentialProcessorBuilder.setLimited(settings.getLimit());
sequentialProcessorBuilder.setLimited(settings.getLimit());
sequentialProcessorBuilder.setBlocking(settings.isBlocking());
sequentialProcessorBuilder.setErrorHandler(settings.getErrorHandler());
return registerActor(name, factory, initMessage, sender, sequentialProcessorBuilder.build());
}
Expand All @@ -60,7 +56,7 @@ private LintStoneActorAccess registerActor(String name, LintStoneActorFactory fa
LintStoneActor actorInstance = factory.create();
Actor newActor = new Actor(name, actorInstance, sequentialProcessor, this, sender);
// actor receives the initMessage as first message.
initMessage.ifPresent(msg -> newActor.send(msg, Optional.empty(), null));
initMessage.ifPresent(msg -> newActor.send(msg, Optional.empty(), null, null));
actors.put(name, newActor);
return new SelfUpdatingActorAccess(name, newActor, this, sender);
}
Expand Down
Loading

0 comments on commit 3a8f489

Please sign in to comment.