Skip to content

Commit

Permalink
Introduces some code improvements
Browse files Browse the repository at this point in the history
ErrorHandler Interface more clear
SequentialProcessor less complex
Adds backpressure usage to the ReadMe
  • Loading branch information
paxel committed Jun 22, 2024
1 parent 89d37fe commit 8f561b7
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 62 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,24 @@ for (String text : data) {
String v = dist.<String>ask(new EndMessage())
.get(1, TimeUnit.MINUTES);
```
## Usage with backpressure

In some situations you create too many events and want to prevent to flood the system.
Therefore the tellWithBackPressure method was introduced.
It blocks the tell until the unprocessed message number is less than the given value.

```java

for (String text : data) {
dist.tellWithBackPressure(text,1_000_000);
}

```

There is no ask with BackPressure (yet).
Ask should be used at the end of batch and not in mass, because it is less effective

Be aware that if you send messages with backpressure in a circle you might cause deadlocks!

# Benchmarks

Expand Down
2 changes: 1 addition & 1 deletion src/main/asciidoc/description.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ interface paxel.lintstone.api.ReplyHandler {
interface paxel.lintstone.api.LintStoneEventHandler {
~ void handle(T,LintStoneMessageEventContext)
}
class paxel.lintstone.api.TypeSafeMonad {
class paxel.lintstone.api.MessageAccess {
- {static} TypeSafeMonad DONE
- Object message
- LintStoneMessageEventContext context
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/paxel/lintstone/api/ErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,15 @@

@FunctionalInterface
public interface ErrorHandler {
boolean handleError(Object a);

/**
* This Errorhandler is called whenever an Exception is caused by an Actor.
* The handler decides if the Actor should continue {@link ErrorHandlerDecision#CONTINUE} or
* abort processing {@link ErrorHandlerDecision#ABORT}.
*
* @param exception the Exception caught by the {@link paxel.lintstone.impl.SequentialProcessor}
* @return The decision of the ErrorHandler
*/
ErrorHandlerDecision handleError(Exception exception);

}
8 changes: 8 additions & 0 deletions src/main/java/paxel/lintstone/api/ErrorHandlerDecision.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package paxel.lintstone.api;

/**
* The decision of the {@link ErrorHandler} after a caught Exception.
*/
public enum ErrorHandlerDecision {
CONTINUE, ABORT;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface LintStoneMessageEventContext {
* @param consumer The consumer of messages of the class.
* @return the context itself
*/
<T> TypeSafeMonad inCase(Class<T> clazz, LintStoneEventHandler<T> consumer);
<T> MessageAccess inCase(Class<T> clazz, LintStoneEventHandler<T> consumer);

/**
* Is executed if no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
/**
* This class provides access to the message or the reply.
*/
public class TypeSafeMonad {
private static final TypeSafeMonad DONE = new TypeSafeMonad(null, null) {
public class MessageAccess {
private static final MessageAccess DONE = new MessageAccess(null, null) {
@Override
public <T> TypeSafeMonad inCase(Class<T> clazz, LintStoneEventHandler<T> lintStoneEventHandler) {
public <T> MessageAccess inCase(Class<T> clazz, LintStoneEventHandler<T> lintStoneEventHandler) {
return this;
}

Expand All @@ -18,7 +18,7 @@ public void otherwise(LintStoneEventHandler<Object> catchAll) {
private final Object message;
private final LintStoneMessageEventContext context;

public TypeSafeMonad(Object message, LintStoneMessageEventContext context) {
public MessageAccess(Object message, LintStoneMessageEventContext context) {
this.message = message;
this.context = context;
}
Expand All @@ -32,7 +32,7 @@ public TypeSafeMonad(Object message, LintStoneMessageEventContext context) {
* @param <T> The type.
* @return A Monad.
*/
public <T> TypeSafeMonad inCase(Class<T> clazz, LintStoneEventHandler<T> lintStoneEventHandler) {
public <T> MessageAccess inCase(Class<T> clazz, LintStoneEventHandler<T> lintStoneEventHandler) {
if (clazz.isAssignableFrom(message.getClass())) {
lintStoneEventHandler.handle(clazz.cast(message), context);
return DONE;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/paxel/lintstone/impl/ActorSettingsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import paxel.lintstone.api.ActorSettings;
import paxel.lintstone.api.ErrorHandler;
import paxel.lintstone.api.ErrorHandlerDecision;

public class ActorSettingsBuilder {
private ErrorHandler errorHandler = x -> true;
private ErrorHandler errorHandler = x -> ErrorHandlerDecision.CONTINUE;

public ActorSettingsBuilder setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/paxel/lintstone/impl/MessageContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ public MessageContext(Object message, ActorSystem actorSystem, SelfUpdatingActor
}

@Override
public <T> TypeSafeMonad inCase(Class<T> clazz, LintStoneEventHandler<T> consumer) {
return new TypeSafeMonad(message, this).inCase(clazz, consumer);
public <T> MessageAccess inCase(Class<T> clazz, LintStoneEventHandler<T> consumer) {
return new MessageAccess(message, this).inCase(clazz, consumer);
}

@Override
public void otherwise(LintStoneEventHandler<Object> catchAll) {
new TypeSafeMonad(message, this).otherwise(catchAll);
new MessageAccess(message, this).otherwise(catchAll);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package paxel.lintstone.impl;

import paxel.lintstone.api.ErrorHandler;
import paxel.lintstone.api.ErrorHandlerDecision;

import java.util.concurrent.ExecutorService;

public class SequentialProcessorBuilder {
private final ExecutorService executorService;
private ErrorHandler errorHandler = err -> true;
private ErrorHandler errorHandler = err -> ErrorHandlerDecision.CONTINUE;

public SequentialProcessorBuilder(ExecutorService executorService) {

Expand Down
114 changes: 71 additions & 43 deletions src/main/java/paxel/lintstone/impl/SequentialProcessorImpl.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package paxel.lintstone.impl;

import paxel.lintstone.api.ErrorHandler;
import sun.misc.Unsafe;
import paxel.lintstone.api.ErrorHandlerDecision;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -110,61 +110,89 @@ public Runnable getRunnable() {

private void run() {
try {
runMessages();
} finally {
status.set(STOPPED);
}
}

private void runMessages() {
for (; ; ) {
// Poll blocks until a message is available. If null is returned we should stop
Runnable runnable = poll();
if (runnable == null) {
break;
}
runNextMessage(runnable);
}
}

private Runnable poll() {
try {
lock.lock();
for (; ; ) {
Runnable runnable;
Runnable runnable = queuedRunnables.poll();
if (!checkRunnable(runnable)) {
// There is no Runnable and there will never be one again.
return null;
}
//It is valid. So if it is not null we use it. Or retry.
if (runnable != null) {
return runnable;
}
}
} finally {
lock.unlock();
}

}

private boolean checkRunnable(Runnable runnable) {
if (status.get() == ABORT)
// we should not be running anymore
return false;
if (runnable == null) {
if (endGracefully.get()) {
// end the Thread. we will never see another runnable
return false;
}
try {
// Set this Thread to inactive until a message is received
empty.await();
return true;
} catch (InterruptedException e) {
// End this Thread
return false;
}
} else {
// we pulled a job from queue, so notify the backpressure threads
backPressure.signalAll();
// process the next message / response
return true;
}
}

private void runNextMessage(Runnable runnable) {
try {
runnable.run();
} catch (Exception e) {
if (errorHandler.handleError(e) != ErrorHandlerDecision.CONTINUE) {
// errorhandler says: give up
status.set(ABORT);
try {
// flush jobs and unlock blocked
lock.lock();
runnable = queuedRunnables.poll();
if (status.get() == ABORT)
// we should not be running anymore
break;
if (runnable == null) {
if (endGracefully.get())
// end the Thread. we will never see another runnable
break;
try {
// Thread is idle until a job is added
empty.await();
} catch (InterruptedException e) {
Unsafe.getUnsafe().throwException(e);
}
} else {
// we pulled a job from queue, so notify the backpressure threads
backPressure.signalAll();
// process the next message / response
}
queuedRunnables.clear();
backPressure.signalAll();
} finally {
lock.unlock();
}
// run outside the lock, in case the process wants to add a message to itself :D
if (runnable != null)
try {
runnable.run();
} catch (Exception e) {
if (!errorHandler.handleError(e)) {
// errorhandler says: give up
status.set(ABORT);
try {
// flush jobs and unlock blocked
lock.lock();
queuedRunnables.clear();
backPressure.signalAll();
} finally {
lock.unlock();
}
}
}

}
} finally {
status.set(STOPPED);
}
}


enum RunStatus {
ACTIVE, STOPPED, ABORT
}

}
23 changes: 17 additions & 6 deletions src/test/java/paxel/lintstone/api/FailingTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import paxel.lintstone.api.actors.StupidActor;

import java.util.ArrayList;
Expand All @@ -24,22 +25,32 @@ public class FailingTests {
public static final String FAILING = "failing";
public static final String NOT_EXISTENT = "no";
final CountDownLatch latch = new CountDownLatch(1);
List<Object> errorMessage = new ArrayList<>();

public FailingTests() {
}

private ErrorHandlerDecision addError(Object o) {
errorMessage.add(o);
return ErrorHandlerDecision.CONTINUE;
}

@BeforeEach
void init() {
errorMessage.clear();
}

@Test
public void testFailedMessageResponse() throws InterruptedException, ExecutionException {
public void testFailedMessageResponse() throws InterruptedException {
LintStoneSystem system = LintStoneSystemFactory.create();
List<Object> errorMessage = new ArrayList<>();
system.registerActor(STOP_ACTOR, () -> a -> {
System.out.println("stop received. countdown latch");
latch.countDown();
}, ActorSettings.create().setErrorHandler(errorMessage::add).build());
}, ActorSettings.create().setErrorHandler(this::addError).build());


// This creates an actor that will create a FAILING actor
LintStoneActorAccessor stupid = system.registerActor(GATEWAY, StupidActor::new, ActorSettings.create().setErrorHandler(errorMessage::add).build());
LintStoneActorAccessor stupid = system.registerActor(GATEWAY, StupidActor::new, ActorSettings.create().setErrorHandler(this::addError).build());

// this will create the error handler
// send a "Hi" to the error handler
Expand Down Expand Up @@ -67,10 +78,9 @@ public void testFailedMessageResponse() throws InterruptedException, ExecutionEx
@Test
public void testGetDataOut() throws InterruptedException, ExecutionException {
LintStoneSystem system = LintStoneSystemFactory.create();
List<Object> errorMessage = new ArrayList<>();


LintStoneActorAccessor echoActor = system.registerActor(ECHO_ACTOR, () -> a -> a.reply("echo"), ActorSettings.create().setErrorHandler(errorMessage::add).build());
LintStoneActorAccessor echoActor = system.registerActor(ECHO_ACTOR, () -> a -> a.reply("echo"), ActorSettings.create().setErrorHandler(this::addError).build());

// this message goes to an actor that wants to reply. but can't, because we are calling from outside the actor system
// so this should be a message in the errorHandler
Expand All @@ -86,4 +96,5 @@ public void testGetDataOut() throws InterruptedException, ExecutionException {
system.shutDownAndWait();
}


}

0 comments on commit 8f561b7

Please sign in to comment.