From f2cbbd7efbb5abb558f6e7cf89bd3d36ecaeb430 Mon Sep 17 00:00:00 2001 From: Patrick Zimmer Date: Thu, 7 Dec 2023 19:09:31 +0100 Subject: [PATCH] Adds AI generated java doc --- .../paxel/lintstone/api/ErrorHandler.java | 13 ++- src/main/java/paxel/lintstone/impl/Actor.java | 54 +++++++++ .../lintstone/impl/ActorSettingsBuilder.java | 20 +++- .../lintstone/impl/ActorSettingsImpl.java | 3 + .../paxel/lintstone/impl/ActorSystem.java | 106 +++++++++++++++++- .../paxel/lintstone/impl/FailedMessage.java | 5 + .../lintstone/impl/GroupingExecutor.java | 68 ++++++++++- .../paxel/lintstone/impl/MessageContext.java | 84 +++++++++++++- .../lintstone/impl/MessageContextFactory.java | 18 ++- .../impl/SelfUpdatingActorAccessor.java | 76 ++++++++++++- .../lintstone/impl/SequentialProcessor.java | 31 +++++ .../impl/SequentialProcessorBuilder.java | 18 +++ .../impl/SequentialProcessorImpl.java | 5 +- 13 files changed, 481 insertions(+), 20 deletions(-) diff --git a/src/main/java/paxel/lintstone/api/ErrorHandler.java b/src/main/java/paxel/lintstone/api/ErrorHandler.java index 8e78b4c..9008feb 100644 --- a/src/main/java/paxel/lintstone/api/ErrorHandler.java +++ b/src/main/java/paxel/lintstone/api/ErrorHandler.java @@ -1,6 +1,17 @@ package paxel.lintstone.api; +/** + * ErrorHandler is a functional interface for handling errors. + * The handleError method is called to handle an error. + */ @FunctionalInterface public interface ErrorHandler { - boolean handleError(Object a); + + /** + * Handles an error. + * + * @param failedMessage The object representing the error. + * @return True if the error was handled successfully, false otherwise. + */ + boolean handleError(Object failedMessage); } diff --git a/src/main/java/paxel/lintstone/impl/Actor.java b/src/main/java/paxel/lintstone/impl/Actor.java index ea7d0a8..6eaf968 100644 --- a/src/main/java/paxel/lintstone/impl/Actor.java +++ b/src/main/java/paxel/lintstone/impl/Actor.java @@ -23,6 +23,15 @@ class Actor { private final AtomicLong totalReplies = new AtomicLong(); private final MessageContextFactory messageContextFactory; + /** + * Constructs a new Actor with the given parameters. + * + * @param name The name of the actor. + * @param actorInstance The instance of the LintStoneActor interface that this actor represents. + * @param sequentialProcessor The SequentialProcessor responsible for processing messages sequentially. + * @param system The ActorSystem that manages this actor. + * @param sender The SelfUpdatingActorAccessor of the sender. + */ Actor(String name, LintStoneActor actorInstance, SequentialProcessor sequentialProcessor, ActorSystem system, SelfUpdatingActorAccessor sender) { this.name = name; this.actorInstance = actorInstance; @@ -31,10 +40,24 @@ class Actor { } + /** + * Checks if the actor is valid. + * + * @return true if the actor is valid, false otherwise. + */ boolean isValid() { return registered; } + /** + * Sends a message to an actor for processing. + * + * @param message The message to be sent. + * @param sender The SelfUpdatingActorAccessor of the sender. + * @param replyHandler The handler for the reply. If null, the message is sent to the sender without relation to the previous message. + * @param blockThreshold The threshold for back pressure. If null, the message is added to the sequential processor without back pressure. + * @throws UnregisteredRecipientException If the recipient actor is not registered. + */ void send(Object message, SelfUpdatingActorAccessor sender, ReplyHandler replyHandler, Integer blockThreshold) throws UnregisteredRecipientException { if (!registered) { throw new UnregisteredRecipientException("Actor " + name + " is not registered"); @@ -94,15 +117,31 @@ private void handleReply(Object reply, SelfUpdatingActorAccessor self, SelfUpdat } } + /** + * Unregisters the actor gracefully. This method sets the 'registered' flag to false + * and calls the unregisterGracefully() method of the sequential processor associated with the actor. + */ void unregisterGracefully() { registered = false; sequentialProcessor.unregisterGracefully(); } + /** + * Shuts down the sequential processor. + * + * @param now true if the shutdown should happen immediately, false if it should be graceful + */ void shutdown(boolean now) { sequentialProcessor.shutdown(now); } + /** + * Executes the specified reply handler with the given reply object. + * + * @param replyHandler The handler for the reply. + * @param reply The reply object. + * @throws UnregisteredRecipientException If the actor is not registered. + */ public void run(ReplyHandler replyHandler, Object reply) { if (!registered) { throw new UnregisteredRecipientException("Actor " + name + " is not registered"); @@ -130,14 +169,29 @@ public String toString() { '}'; } + /** + * Returns the total number of messages processed by the actor. + * + * @return The total number of messages processed. + */ public long getTotalMessages() { return totalMessages.get(); } + /** + * Returns the total number of replies processed by the actor. + * + * @return The total number of replies processed. + */ public long getTotalReplies() { return totalReplies.get(); } + /** + * Returns the number of messages currently queued in the SequentialProcessor. + * + * @return The number of queued messages. + */ public int getQueued() { return sequentialProcessor.size(); } diff --git a/src/main/java/paxel/lintstone/impl/ActorSettingsBuilder.java b/src/main/java/paxel/lintstone/impl/ActorSettingsBuilder.java index 96b0f5e..67bad43 100644 --- a/src/main/java/paxel/lintstone/impl/ActorSettingsBuilder.java +++ b/src/main/java/paxel/lintstone/impl/ActorSettingsBuilder.java @@ -3,19 +3,37 @@ import paxel.lintstone.api.ActorSettings; import paxel.lintstone.api.ErrorHandler; +/** + * A builder class for creating instances of {@link ActorSettings}. + */ public class ActorSettingsBuilder { private ErrorHandler errorHandler = x -> true; + /** + * Sets the error handler for the actor settings. + * + * @param errorHandler the error handler to set + * @return the updated instance of ActorSettingsBuilder + */ public ActorSettingsBuilder setErrorHandler(ErrorHandler errorHandler) { this.errorHandler = errorHandler; return this; } - + /** + * Builds an instance of {@link ActorSettings} with the specified configuration. + * + * @return the built instance of ActorSettings + */ public ActorSettings build() { return new ActorSettingsImpl( errorHandler); } + /** + * Retrieves the error handler for the actor settings. + * + * @return the error handler. + */ public ErrorHandler getErrorHandler() { return this.errorHandler; } diff --git a/src/main/java/paxel/lintstone/impl/ActorSettingsImpl.java b/src/main/java/paxel/lintstone/impl/ActorSettingsImpl.java index 3ff842e..ca35393 100644 --- a/src/main/java/paxel/lintstone/impl/ActorSettingsImpl.java +++ b/src/main/java/paxel/lintstone/impl/ActorSettingsImpl.java @@ -3,6 +3,9 @@ import paxel.lintstone.api.ErrorHandler; import paxel.lintstone.api.ActorSettings; +/** + * Implementation of the {@link ActorSettings} interface. Represents the actor settings for the creation of configured actors. + */ public record ActorSettingsImpl(ErrorHandler errorHandler) implements ActorSettings { } diff --git a/src/main/java/paxel/lintstone/impl/ActorSystem.java b/src/main/java/paxel/lintstone/impl/ActorSystem.java index 7453f8d..f5f5126 100644 --- a/src/main/java/paxel/lintstone/impl/ActorSystem.java +++ b/src/main/java/paxel/lintstone/impl/ActorSystem.java @@ -9,25 +9,54 @@ import paxel.lintstone.api.*; +/** + * This class represents an Actor System in the LintStone framework. + */ public class ActorSystem implements LintStoneSystem { private final Map actors = Collections.synchronizedMap(new HashMap<>()); private final GroupingExecutor groupingExecutor; + /** + * The ActorSystem class represents the LintStone Actor system. + */ public ActorSystem() { groupingExecutor = new GroupingExecutor(); } + /** + * Registers an actor in the system with the specified name, factory, settings, and optional init message. + * + * @param name The name of the actor. The name must be unique in the system. + * @param factory The factory to create the actor if not already exists. + * @param settings The actor settings. Use {@link ActorSettings#create()} to create a builder and {@link ActorSettingsBuilder#build()} to build the instance. + * @param initMessage The init message. Can be null, but you should use {@link #registerActor(String, LintStoneActorFactory, ActorSettings)} then instead. + * @return The {@link LintStoneActorAccessor} for the registered actor. + */ @Override public LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, ActorSettings settings, Object initMessage) { return registerActor(name, factory, null, settings, initMessage); } + /** + * Registers an actor in the system with the specified name, factory, and settings. + * + * @param name The name of the actor. The name must be unique in the system. + * @param factory The factory to create the actor if it does not already exist. + * @param settings The actor settings. + * @return The LintStoneActorAccessor for the registered actor. + */ @Override public LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, ActorSettings settings) { return registerActor(name, factory, null, settings, null); } + /** + * Retrieves the LintStoneActorAccessor for the actor with the specified name. + * + * @param name The name of the actor. The name must be unique in the system. + * @return The LintStoneActorAccessor for the actor with the specified name. + */ @Override public LintStoneActorAccessor getActor(String name) { synchronized (actors) { @@ -35,6 +64,16 @@ public LintStoneActorAccessor getActor(String name) { } } + /** + * Registers an actor in the system with the specified name, factory, sender, settings, and optional init message. + * + * @param name The name of the actor. The name must be unique in the system. + * @param factory The factory to create the actor if not already exists. + * @param sender The sender of the actor. + * @param settings The actor settings. Use {@link ActorSettings#create()} to create a builder and {@link ActorSettingsBuilder#build()} to build the instance. + * @param initMessage The init message. Can be null, but you should use {@link #registerActor(String, LintStoneActorFactory, ActorSettings)} then instead. + * @return The {@link LintStoneActorAccessor} for the registered actor. + */ LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, SelfUpdatingActorAccessor sender, ActorSettings settings, Object initMessage) { SequentialProcessorBuilder sequentialProcessorBuilder = groupingExecutor.create(); sequentialProcessorBuilder.setErrorHandler(settings.errorHandler()); @@ -42,6 +81,16 @@ LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, } + /** + * Registers an actor in the system with the specified name, factory, initMessage, sender, and sequentialProcessor. + * + * @param name The name of the actor. The name must be unique in the system. + * @param factory The factory to create the actor if not already exists. + * @param initMessage The initialization message for the actor. Can be null. + * @param sender The sender of the actor. + * @param sequentialProcessor The sequential processor for the actor. + * @return The LintStoneActorAccessor for the registered actor. + */ private LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, Object initMessage, SelfUpdatingActorAccessor sender, SequentialProcessorBuilder sequentialProcessor) { synchronized (actors) { Actor existing = actors.get(name); @@ -58,12 +107,30 @@ private LintStoneActorAccessor registerActor(String name, LintStoneActorFactory } + /** + * Shuts down the Actor system. + * + * This method will stop all Actors in the system after all messages are processed. + * The method returns immediately, but it does not guarantee that all messages are processed on return. + * + * This method is called internally to gracefully shut down the system. + * It first calls the private method shutdownActors with a parameter "false" to shutdown all Actors in the system. + * Then it calls the shutdown method of the groupingExecutor to shut down the executor service used by the system. + * + * @see ActorSystem#shutdownActors(boolean) + * @see LintStoneSystem#shutDown() + */ @Override public void shutDown() { shutdownActors(false); groupingExecutor.shutdown(); } + /** + * Shuts down the ActorSystem and waits for all Actors to complete processing their messages. + * + * @throws InterruptedException If the thread is interrupted while waiting for termination. + */ @Override public void shutDownAndWait() throws InterruptedException { shutdownActors(false); @@ -72,6 +139,13 @@ public void shutDownAndWait() throws InterruptedException { groupingExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS); } + /** + * Shuts down the ActorSystem and waits for all Actors to complete processing their messages. + * + * @param timeout the duration to wait + * @return true if all Actors have terminated within the specified timeout, false otherwise + * @throws InterruptedException if the thread is interrupted while waiting for termination + */ @Override public boolean shutDownAndWait(Duration timeout) throws InterruptedException { shutdownActors(false); @@ -79,19 +153,41 @@ public boolean shutDownAndWait(Duration timeout) throws InterruptedException { return groupingExecutor.awaitTermination(timeout.getSeconds(), TimeUnit.SECONDS); } + /** + * Shuts down the ActorSystem immediately. + * + * This method shuts down all Actors in the system and terminates the executor service used by the system. + * + * @see ActorSystem#shutdownActors(boolean) + * @see ActorSystem#groupingExecutor + */ @Override public void shutDownNow() { shutdownActors(true); groupingExecutor.shutdownNow(); } + /** + * Shuts down the actors in the system. + * + * This method is called internally to gracefully shut down the system. + * It iterates over all the actors in the system and calls their shutdown method. + * The shutdown of each actor can be immediate or graceful depending on the value of the "now" parameter. + * + * @param now true if the shutdown should be immediate, false if it should be graceful + */ private void shutdownActors(boolean now) { synchronized (actors) { - actors.entrySet().stream().map(Map.Entry::getValue).forEach(a -> a.shutdown(now)); + actors.values().stream().forEach(a -> a.shutdown(now)); } } - + /** + * Unregisters an actor from the system. + * + * @param name The name of the actor to be removed. + * @return true if the actor was successfully unregistered, false otherwise. + */ @Override public boolean unregisterActor(String name) { synchronized (actors) { @@ -105,6 +201,12 @@ public boolean unregisterActor(String name) { } } + /** + * Retrieves an optional Actor object by name. + * + * @param name The name of the Actor. + * @return An Optional object representing the Actor with the specified name, or an empty Optional if the Actor does not exist. + */ Optional getOptionalActor(String name) { synchronized (actors) { return Optional.ofNullable(actors.get(name)); diff --git a/src/main/java/paxel/lintstone/impl/FailedMessage.java b/src/main/java/paxel/lintstone/impl/FailedMessage.java index c02b0e0..8a61ed2 100644 --- a/src/main/java/paxel/lintstone/impl/FailedMessage.java +++ b/src/main/java/paxel/lintstone/impl/FailedMessage.java @@ -4,6 +4,11 @@ import java.util.Objects; +/** + * Represents a failed message in the framework. + * This class implements the LintStoneFailedMessage interface. + * It contains the failed message, the cause of the failure, and the name of the actor where the failure occurred. + */ public record FailedMessage(Object message, Throwable cause, String actorName) implements LintStoneFailedMessage { @Override diff --git a/src/main/java/paxel/lintstone/impl/GroupingExecutor.java b/src/main/java/paxel/lintstone/impl/GroupingExecutor.java index d1c70fc..dfc81a1 100644 --- a/src/main/java/paxel/lintstone/impl/GroupingExecutor.java +++ b/src/main/java/paxel/lintstone/impl/GroupingExecutor.java @@ -1,42 +1,106 @@ package paxel.lintstone.impl; -import paxel.lintstone.impl.SequentialProcessorBuilder; - import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +/** + * The GroupingExecutor class represents an executor service for grouping and executing tasks. + * It provides methods for creating a sequential processor and managing the executor service. + * + * Usage example: + * + * GroupingExecutor executor = new GroupingExecutor(); + * SequentialProcessorBuilder builder = executor.create(); + * // Configure the sequential processor builder + * SequentialProcessor processor = builder.build(); + * // Use the sequential processor to add and execute tasks + * + * // Shutdown the executor + * executor.shutdown(); + * + * Note: This implementation uses virtual threads for execution. + */ public class GroupingExecutor { private final ExecutorService executorService; + /** + * The GroupingExecutor class represents an executor service for grouping and executing tasks. + * It provides methods for creating a sequential processor and managing the executor service. + */ public GroupingExecutor() { // this implementation is completely for virtual Threads this.executorService = Executors.newVirtualThreadPerTaskExecutor(); } + /** + * Creates a new SequentialProcessorBuilder instance. + * The builder is used to configure the SequentialProcessor and submit the runnable to the executor service. + * + * @return the SequentialProcessorBuilder instance + */ public SequentialProcessorBuilder create() { // the Builder will submit the runnable to the service when the Processor is build. return new SequentialProcessorBuilder(executorService); } + /** + * Shuts down the executor service used by the GroupingExecutor. + * + * This method initiates an orderly shutdown of the executor service. + * Any tasks that have been submitted to the executor service will be executed before the shutdown is complete. + * The method returns immediately after initiating the shutdown, but it does not wait for the tasks to complete. + * If you need to wait for the tasks to complete, use the awaitTermination method. + * + * @see GroupingExecutor#awaitTermination(long, TimeUnit) + */ public void shutdown() { executorService.shutdown(); } + /** + * Shuts down the executor service immediately. + * + * This method initiates an immediate shutdown of the executor service. + * Any tasks that have been submitted to the executor service may be terminated before completion. + * The method returns immediately after initiating the shutdown, but it does not wait for the tasks to complete. + * + * @return a list of tasks that were scheduled for execution but have not yet started + * @see GroupingExecutor#shutdown() + * @see GroupingExecutor#awaitTermination(long, TimeUnit) + */ public List shutdownNow() { return executorService.shutdownNow(); } + /** + * Returns true if the executor service used by the GroupingExecutor has been shut down, false otherwise. + * + * @return true if the executor service has been shut down, false otherwise + */ public boolean isShutdown() { return executorService.isShutdown(); } + /** + * Returns true if the executor service used by the GroupingExecutor has been terminated, false otherwise. + * + * @return true if the executor service has been terminated, false otherwise + */ public boolean isTerminated() { return executorService.isTerminated(); } + /** + * Waits for the executor service to terminate. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return true if the executor service terminated and false if the timeout elapsed before termination + * @throws InterruptedException if the current thread is interrupted while waiting + */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return executorService.awaitTermination(timeout, unit); } diff --git a/src/main/java/paxel/lintstone/impl/MessageContext.java b/src/main/java/paxel/lintstone/impl/MessageContext.java index b83fe1b..3087c6c 100644 --- a/src/main/java/paxel/lintstone/impl/MessageContext.java +++ b/src/main/java/paxel/lintstone/impl/MessageContext.java @@ -7,7 +7,7 @@ import java.util.function.BiConsumer; /** - * Message context instance dedicated for a process of a message or reply + * Represents the context for a message event. It provides methods for handling messages, replying to senders, sending messages to actors, and registering/unregistering actors. */ public class MessageContext implements LintStoneMessageEventContext { @@ -16,6 +16,9 @@ public class MessageContext implements LintStoneMessageEventContext { private final Object message; private final BiConsumer replyHandler; + /** + * Represents the context of a message being processed by an actor system. + */ public MessageContext(Object message, ActorSystem actorSystem, SelfUpdatingActorAccessor self, BiConsumer replyHandler) { this.message = message; this.actorSystem = actorSystem; @@ -23,21 +26,50 @@ public MessageContext(Object message, ActorSystem actorSystem, SelfUpdatingActor this.replyHandler = replyHandler; } + /** + * Executes the given event handler if the message class is assignable from the specified class. + * The returned TypeSafeMonad instance will be non-functional if the handler is executed, + * otherwise, it returns itself. + * + * @param clazz The class to check assignability. + * @param consumer The event handler for the class. + * @return The updated TypeSafeMonad instance. + * @param The type of the class. + */ @Override public TypeSafeMonad inCase(Class clazz, LintStoneEventHandler consumer) { return new TypeSafeMonad(message, this).inCase(clazz, consumer); } + /** + * Executes the provided event handler if no match is found for the message class. + * + * @param catchAll The handler for unknown types. + */ @Override public void otherwise(LintStoneEventHandler catchAll) { new TypeSafeMonad(message, this).otherwise(catchAll); } + /** + * Sends a reply to the sender of the message. + * + * @param msg the message to reply. + * @throws NoSenderException if there is no sender for the current message. + * @throws UnregisteredRecipientException if there is no actor with that name. + */ @Override public void reply(Object msg) throws NoSenderException, UnregisteredRecipientException { replyHandler.accept(msg, self); } + /** + * Sends a message to the actor with the given name. + * + * @param name the name of the actor. + * @param msg The message to send. + * @throws UnregisteredRecipientException if there is no actor with that name. + */ @Override public void tell(String name, Object msg) throws UnregisteredRecipientException { Optional actor = actorSystem.getOptionalActor(name); @@ -47,6 +79,14 @@ public void tell(String name, Object msg) throws UnregisteredRecipientException actor.get().send(msg, self, null, null); } + /** + * Sends a message to the actor with the given name, expecting a reply. + * + * @param name the name of the actor. + * @param msg The message to send. + * @param handler The reply handler. + * @throws UnregisteredRecipientException if there is no actor with that name. + */ @Override public void ask(String name, Object msg, ReplyHandler handler) throws UnregisteredRecipientException { Optional actor = actorSystem.getOptionalActor(name); @@ -56,6 +96,15 @@ public void ask(String name, Object msg, ReplyHandler handler) throws Unregister actor.get().send(msg, self, handler, null); } + /** + * Sends a message to the actor with the given name and expects a reply. + * + * @param name the name of the actor. + * @param msg The message to send. + * @param The type of the expected reply. + * @return a CompletableFuture that represents the expected reply from the actor. + * @throws UnregisteredRecipientException if there is no actor registered with the given name. + */ @Override public CompletableFuture ask(String name, Object msg) throws UnregisteredRecipientException { Optional actor = actorSystem.getOptionalActor(name); @@ -74,6 +123,12 @@ public CompletableFuture ask(String name, Object msg) throws Unregistered } + /** + * Retrieves a LintStoneActorAccessor for the specified actor name. + * + * @param name The name of the actor. + * @return A LintStoneActorAccessor for the specified actor name. + */ @Override public LintStoneActorAccessor getActor(String name) { // give a empty ref, that is filled on demand. @@ -81,21 +136,48 @@ public LintStoneActorAccessor getActor(String name) { } + /** + * Registers an actor with the given name, factory, init message, and settings. + * + * @param name The name of the actor. + * @param factory The factory used to create the actor. + * @param initMessage The initial message to send to the actor. + * @param settings The settings for the actor. + * @return A LintStoneActorAccessor for the registered actor. + */ @Override public LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, Object initMessage, ActorSettings settings) { return actorSystem.registerActor(name, factory, self, settings, initMessage); } + /** + * Registers an actor with the specified name, factory, and settings. + * + * @param name The name of the actor. + * @param factory The factory used to create the actor. + * @param settings The settings for the actor. + * @return A LintStoneActorAccessor for the registered actor. + */ @Override public LintStoneActorAccessor registerActor(String name, LintStoneActorFactory factory, ActorSettings settings) { return actorSystem.registerActor(name, factory, self, settings, null); } + /** + * Unregisters the actor from the system. + * + * @return true if the actor is successfully unregistered, false otherwise. + */ @Override public boolean unregister() { return actorSystem.unregisterActor(self.getName()); } + /** + * Retrieves the name associated with this MessageContext. + * + * @return The name associated with this MessageContext. + */ @Override public String getName() { return self.getName(); diff --git a/src/main/java/paxel/lintstone/impl/MessageContextFactory.java b/src/main/java/paxel/lintstone/impl/MessageContextFactory.java index 432cff9..f6299d3 100644 --- a/src/main/java/paxel/lintstone/impl/MessageContextFactory.java +++ b/src/main/java/paxel/lintstone/impl/MessageContextFactory.java @@ -3,18 +3,16 @@ import java.util.function.BiConsumer; /** - * Constructs MessageContexts for dedicated calls, that stay valid forever. - * This is needed to use the ask context inside the reply handler. + * The {@code MessageContextFactory} class represents a factory for creating {@link MessageContext} objects. + * It provides a method to construct an immutable {@link MessageContext} using the given parameters. */ public class MessageContextFactory { private final ActorSystem actorSystem; private final SelfUpdatingActorAccessor self; /** - * The factory is created with the actorSystem and the current actor access. - * - * @param actorSystem The system. - * @param self the current actor access. + * The {@code MessageContextFactory} class represents a factory for creating {@link MessageContext} objects. + * It provides a method to construct an immutable {@link MessageContext} using the given parameters. */ public MessageContextFactory(ActorSystem actorSystem, SelfUpdatingActorAccessor self) { this.actorSystem = actorSystem; @@ -22,11 +20,11 @@ public MessageContextFactory(ActorSystem actorSystem, SelfUpdatingActorAccessor } /** - * Constructs an immutable MessageContext. + * Creates a new {@link MessageContext} object with the given message and reply handler. * - * @param message The message of the context. - * @param replyHandler The reply handler for the reply method of the context. - * @return The MessageContext. + * @param message The message to be processed by the actor system. + * @param replyHandler The handler to be invoked when a reply is received. + * @return The newly created MessageContext object. */ public MessageContext create(Object message, BiConsumer replyHandler) { return new MessageContext(message, actorSystem, self, replyHandler); diff --git a/src/main/java/paxel/lintstone/impl/SelfUpdatingActorAccessor.java b/src/main/java/paxel/lintstone/impl/SelfUpdatingActorAccessor.java index b97dc24..cd38f3e 100644 --- a/src/main/java/paxel/lintstone/impl/SelfUpdatingActorAccessor.java +++ b/src/main/java/paxel/lintstone/impl/SelfUpdatingActorAccessor.java @@ -7,8 +7,9 @@ import java.util.concurrent.CompletableFuture; /** - * This ActorAccess will try to fetch a new instance of an actor in case the - * current one becomes invalid. + * The SelfUpdatingActorAccessor class is an implementation of the LintStoneActorAccessor interface. + * It provides the ability to send messages to an actor, handle responses to ask() requests, + * and retrieve information about the actor's status and statistics. */ public class SelfUpdatingActorAccessor implements LintStoneActorAccessor { @@ -25,11 +26,26 @@ public class SelfUpdatingActorAccessor implements LintStoneActorAccessor { this.sender = sender; } + /** + * Sends a message to an actor for processing. + * + * @param message The message to send. + * @throws UnregisteredRecipientException If the recipient actor is not registered. + */ @Override public void tell(Object message) throws UnregisteredRecipientException { tell(message, sender, null, null); } + /** + * Sends a message to the Actor represented by this Access. But blocks the call until the number + * of messages queued is less than the given threshold. If someone else is sending messages to the actor, + * this call might block forever. + * + * @param message The message to send. + * @param blockThreshold The number of queued messages that causes the call to block. + * @throws UnregisteredRecipientException in case the actor does not exist. + */ @Override public void tellWithBackPressure(Object message, int blockThreshold) throws UnregisteredRecipientException { tell(message, sender, null, blockThreshold); @@ -55,10 +71,26 @@ void run(ReplyHandler runnable, Object reply) throws UnregisteredRecipientExcept } } + /** + * Sends a message to an actor for processing. + * + * @param message The message to send. + * @param sender The SelfUpdatingActorAccessor of the sender. + * @throws UnregisteredRecipientException If the recipient actor is not registered. + */ public void send(Object message, SelfUpdatingActorAccessor sender) throws UnregisteredRecipientException { tell(message, sender, null, null); } + /** + * Sends a message to an actor for processing. + * + * @param message The message to be sent. + * @param sender The SelfUpdatingActorAccessor of the sender. + * @param replyHandler The handler for the reply. If null, the message is sent to the sender without relation to the previous message. + * @param blockThreshold The threshold for back pressure. If null, the message is added to the sequential processor without back pressure. + * @throws UnregisteredRecipientException If the recipient actor is not registered. + */ private void tell(Object message, SelfUpdatingActorAccessor sender, ReplyHandler replyHandler, Integer blockThreshold) throws UnregisteredRecipientException { if (actor == null) { updateActor(); @@ -78,6 +110,11 @@ private void updateActor() throws UnregisteredRecipientException { .orElseThrow(() -> new UnregisteredRecipientException("An actor with the name " + name + " is not available")); } + /** + * Checks if the actor represented by this accessor exists and is valid. + * + * @return true if the actor exists and is valid, false otherwise. + */ @Override public boolean exists() { if (actor == null) { @@ -87,12 +124,27 @@ public boolean exists() { return actor != null && actor.isValid(); } + /** + * Sends a message to the actor for processing and provides a handler for the reply. + * + * @param message The message to send. + * @param replyHandler The handler for the reply. + * @throws UnregisteredRecipientException If the recipient actor is not registered. + */ @Override public void ask(Object message, ReplyHandler replyHandler) throws UnregisteredRecipientException { // replyHandler is required, therefore not Optional.ofNullable tell(message, sender, replyHandler, null); } + /** + * Sends a message to an actor for processing and provides a {@link CompletableFuture} for the reply. + * + * @param message the Message for the actor + * @param the type of the expected reply + * @return a {@link CompletableFuture} that represents the result of the ask operation. It will be completed in the context of the asked actor. + * @throws UnregisteredRecipientException if the recipient actor is not registered + */ @Override public CompletableFuture ask(Object message) throws UnregisteredRecipientException { CompletableFuture result = new CompletableFuture<>(); @@ -107,21 +159,41 @@ public CompletableFuture ask(Object message) throws UnregisteredRecipient } + /** + * Returns the number of messages currently queued in the actor. + * + * @return The number of queued messages. + */ @Override public int getQueuedMessagesAndReplies() { return actor.getQueued(); } + /** + * Returns the number of messages processed by the actor. + * + * @return The number of processed messages. + */ @Override public long getProcessedMessages() { return actor.getTotalMessages(); } + /** + * Returns the number of processed replies to ask() requests. + * + * @return The number of processed replies. + */ @Override public long getProcessedReplies() { return actor.getTotalReplies(); } + /** + * Returns the name of the actor. + * + * @return The name of the actor as a String. + */ @Override public String getName() { return name; diff --git a/src/main/java/paxel/lintstone/impl/SequentialProcessor.java b/src/main/java/paxel/lintstone/impl/SequentialProcessor.java index f3adafe..da63b56 100644 --- a/src/main/java/paxel/lintstone/impl/SequentialProcessor.java +++ b/src/main/java/paxel/lintstone/impl/SequentialProcessor.java @@ -1,13 +1,44 @@ package paxel.lintstone.impl; +/** + * A SequentialProcessor interface for managing sequential processing of Runnables. + */ public interface SequentialProcessor { + + /** + * Adds a {@link Runnable} to be processed. + * + * @param runnable The {@link Runnable} to be added. + */ void add(Runnable runnable); + /** + * Adds a {@link Runnable} to be processed with back pressure. + * + * @param runnable The {@link Runnable} to be added. + * @param blockThreshold The threshold for back pressure. If the number of currently queued messages in the SequentialProcessor exceeds this threshold, the message will be rejected + *. + * @return true if the runnable was successfully added, false if the SequentialProcessor rejected the message. + */ boolean addWithBackPressure(Runnable runnable, Integer blockThreshold); + /** + * Returns the number of messages currently queued in the SequentialProcessor. + * + * @return The number of queued messages. + */ int size(); + /** + * Unregisters the actor gracefully. This method sets the 'registered' flag to false + * and calls the unregisterGracefully() method of the sequential processor associated with the actor. + */ void unregisterGracefully(); + /** + * Shuts down the SequentialProcessor. + * + * @param now true if the shutdown should happen immediately, false if it should be graceful + */ void shutdown(boolean now); } diff --git a/src/main/java/paxel/lintstone/impl/SequentialProcessorBuilder.java b/src/main/java/paxel/lintstone/impl/SequentialProcessorBuilder.java index ae23b45..922b938 100644 --- a/src/main/java/paxel/lintstone/impl/SequentialProcessorBuilder.java +++ b/src/main/java/paxel/lintstone/impl/SequentialProcessorBuilder.java @@ -4,19 +4,37 @@ import java.util.concurrent.ExecutorService; +/** + * The SequentialProcessorBuilder class is responsible for constructing a SequentialProcessor object with the specified ExecutorService and ErrorHandler. + */ public class SequentialProcessorBuilder { private final ExecutorService executorService; private ErrorHandler errorHandler = err -> true; + /** + * Constructs a SequentialProcessorBuilder object with the specified ExecutorService. + * + * @param executorService the ExecutorService to be used by the SequentialProcessor + */ public SequentialProcessorBuilder(ExecutorService executorService) { this.executorService = executorService; } + /** + * Sets the error handler for the SequentialProcessor. + * + * @param errorHandler The error handler to be set. + */ public void setErrorHandler(ErrorHandler errorHandler) { this.errorHandler = errorHandler; } + /** + * Builds and returns a SequentialProcessor object using the specified error handler and executor service. + * + * @return The created SequentialProcessor object. + */ public SequentialProcessor build() { SequentialProcessorImpl sequentialProcessor = new SequentialProcessorImpl(errorHandler); executorService.submit(sequentialProcessor.getRunnable()); diff --git a/src/main/java/paxel/lintstone/impl/SequentialProcessorImpl.java b/src/main/java/paxel/lintstone/impl/SequentialProcessorImpl.java index 9526286..5a8cbb4 100644 --- a/src/main/java/paxel/lintstone/impl/SequentialProcessorImpl.java +++ b/src/main/java/paxel/lintstone/impl/SequentialProcessorImpl.java @@ -11,6 +11,9 @@ import static paxel.lintstone.impl.SequentialProcessorImpl.RunStatus.*; +/** + * SequentialProcessorImpl is an implementation of the SequentialProcessor interface for managing sequential processing of Runnables. + */ public class SequentialProcessorImpl implements SequentialProcessor { private final ReentrantLock lock = new ReentrantLock(); @@ -168,7 +171,7 @@ private void run() { enum RunStatus { - ACTIVE, STOPPED, ABORT; + ACTIVE, STOPPED, ABORT } }