Skip to content

z archived 1.x documentation

RuedigerMoeller edited this page Mar 15, 2015 · 1 revision

#kontraktor documentation

###Introduction

Kontraktor aims to provide a light weight, high performance and non-invasive implementation of the actor pattern (with some modifications, its actually a mixture of actors and CSP with many similarities to Go and node.js). Kontraktor does not take over your application, nor do I advise you to convert all classes to actors.

Kontraktor can be used in mixed threading model applications in a non-dogmatic fashion, allowing for stepwise transition & cleanup of messy or out-of-control multi threaded applications.

For Kontraktor, an actor can be seen similar to an advanced version of a thread. So don't go overboard with the number of actors: a well designed application typically only has 3-10 independent actors. An exception to this are designs (e.g. gaming servers) where one uses actors to simulate a lot of independent interacting entities, in these cases one might schedule many actors on a single thread.

Kontrakor has an explicit model to assign actors to threads, there is no automatic scheduling. The programmer may choose to assign a dedicated thread to an actor or assign multiple actors to a single thread.

Classes+Objects still make up data structures, business entities and processing logic. Actors then model the threading architecture and work flow of your application. By binding data structures to an executing thread we get:

  • mostly or completely single threaded logic
  • no or rare need to synchonize on shared structures
  • maintainable applications, as its typically clear which thread executes a given piece of code and which thread accesses a given piece of data.
  • good scalability and performance are the byproduct of the properties above.

From an academical point of view, Kontraktor's actor model is not an exact implementation of the 'official' actor model. It provides

  • strong guarantees regarding message ordering
  • results of a message are passed to the sender in a callback style for high performance, (with Kontraktor 1.1 Callbacks, Promises and Futures are introduced).
  • dual mailbox system to avoid deadlock issues with simple interaction patterns
  • Aims to structure the threading architecture only (usage of actors course grained, no replacement for classes)

###Kontraktor's actor model

An actor class in kontraktor defines its set of messages by defining public void methods. When the actor gets created, the kontraktor framework generates a proxy wrapping the actor implementation. Calls done on the actor(-proxy) get enqueued on the 'mailbox' (workqueue) of an actor and are executed in the thread the actor is assigned to (DispatcherThread). Kontraktor uses a high performance bounded (currently statically sized) queue implementation from the Jaq-in-A-Box project.

kontraktor aktor

In order to avoid common deadlock patterns and reduce queue size requirements, an actor has a second queue for callbacks coming in from different actors/threads ('callback queue'). Messages in the callback queue have priority.

Guarantees/Properties of a 'kontrActor':

  • callback code always runs in sender's (=caller's) thread.
  • messages are received by the actor in the order they have been sent to the actor
  • messages should be immutable to avoid hidden sharing of state. For kontraktor this means arguments passed to an actor call must be immutable.
  • kontraktor does no hidden copying of message data. Parameters of a message-method are passed by reference same as with ordinary method calls.
  • code of a callback runs 'later' (gets enqueued).
  • there is no concept of 'blocking' or waiting (like java.conc.Future.get()) as blocking inside an actor's implementation is the root of all evil. There are special utilities (Actors.Async()) to support calls to traditional synchronous blocking API.

How actors are scheduled

In contradiction to the image above, 'work queue' and 'callback queue' are not kept per actor instance. Instead a DispatcherThread owns those 2 queues. A DispatcherThread can run an arbitrary number of actors all sharing the thread and the queues. Frequently you might want to use one thread per actor.

kontraktor aktor dispatcher thread

Setting up an Actor

Though' not used in this documentation, I started to use a naming convention in order to distinguish asynchronous from normal synchronous methods, by adding a '$' to each async public method of an actor class. so init() from below would look like $init(). This helps once the codebase grows. Normal non-IO processing of course is done in plain old java, so its easier to identify asynchronous calls inside your logic.

I choose the implementation of an asynchronous logger as an example.

public class AsyncLogger extends Actor<AsyncLogger> {

    Logger logger;

    public void init(String filename) {
        logger = Logger.getGlobal();
        logger.setLevel(Level.INFO);
        try {
            logger.addHandler( new FileHandler(filename) );
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void infoTim(long time, String s) {
        logCount++;
        logger.info(new Date(time) + " " + s);
    }

    public void warnTim(long time, String s) {
        logCount++;
        logger.warning(new Date(time)+" " + s);
    }

    public void severeTim(long time, String s) {
        logCount++;
        logger.severe(new Date(time) + " " + s);
    }
  • all actors have to extend Actor
  • actors cannot have constructors. Instead initialization is done using a init method. Reason is, the init() method gets dispatched as a message to the actor, so run's in the actor's dispatcher thread.
  • overloading is not supported. This means each method has to have a unique name.
  • all public methods become 'messages' of the actor. If they are called from outside, they are executed asynchronously by adding the call as a message to the Actors queue.
  • a message (=public method) has to return either void or Future<>, else an exception will be raised at instantiation time.

Using SpawnActor() one can create an actor. 'Spawn' always implies creating a new DispatcherThread scheduling the newly created actor.

        AsyncLogger logger = Actors.SpawnActor(AsyncLogger.class);
        // message executed in the actors thread
        logger.init("./log/log.txt"); 
        // message executed in the actors thread
        logger.infoTim( System.currentTimeMillis(), "Hello" );
        [..]
        logger.stop();

Note that the init() method already is a message put onto the new actor's queue.

SpawnActor vs AsActor

Actors can be created:

  • from inside another actor
  • from 'outside' the actor system (e.g. in main(String arg[]) thread)

If created from outside the actor system, AsActor() and SpawnActor() behave identical. If created from inside an actor, AsActor() schedules the newly created actor on the DispatchingThread of the current actor (no thread created) whereas SpawnActor always creates a new DispatchingThread for the newly created actor. Note there are also ways to explicitely pass the DispatchingThread an Actor is run off. The DispatchingThread of an actor can be obtained from an actor using getDispatcher().

Example:

public class HttpActorServer extends Actor<HttpActorServer> {

    AsyncLogger logger;
    RequestProcessor processor;
    ServerSocketChannel socket;
    Selector selector;
    SelectionKey serverkey;
    ByteBuffer buffer = ByteBuffer.allocate(1024*1024);

    public void init(int port) {
        // use my thread:
        logger = Actors.AsActor(AsyncLogger.class);
        logger.init();

        // use a separate thread
        processor = Actors.SpawnActor(RequestProcessor.class);

Note that Actors need explicit termination if they should stop. If An Actor spawns sub-actors, it should also terminate those e.g.:

    public void stop() {
        super.stop();
        processor.stop();
        logger.stop();
    }

Actor LifeCycle

Each DispatcherThreads maintains a counter for the number of actors scheduled. To stop an actor operations, call actor.stop(). This will decrement the counter on the associated DispatcherThread. Once the counter is 0, the DispatcherThread will terminate itself.

this and self()

While the standard java 'this' references directly to the actor implementation, the self() call refers to the queue backed proxy object of an actor. In order to avoid casts, use extends Actor<[your actor class here]>.

public class HttpActorServer extends Actor<HttpActorServer> { ..

example for self() vs this

    public void someActorMethod(int x) {
        if ( x == 0 )
            return;
        else if ( x <= 100 ) {
            computeSomething(x); // normal direct method call
        } else {
            self().someActorMethod(100);   // enqueue message to this instance
            self().someActorMethod(x-100); // enqueue message to this instance
        }
    }

usage of self() to have kind of a main loop():

    public void mainLoop() {
        [..stuff, e.g. poll network adaptor..]
        // free CPU in case nothing to do
        // note you'd probably use some kind of exponential backoff 
        // using an idleCounter
        if ( noStuffHappened ) {
            LockSupport.parkNanos(1000*50); 
        }
        if ( ! terminated ) {
            self().mainLoop();
            //calling without self() would brake system as the queue gets not polled anymore
        }
    }

Callerside Methods

When logging asynchronously one actually wants to use the timestamp at caller side, therefore it needs to be provided with each call to our synchronous logger. To ease this, there is the possibility to execute methods in the callers threads using the @CallerSideMethod annotation. Another use case is to expose internal constant actor state (e.g. immutable state) to outer threads without having to do async calls. Note you need to refer from the proxy to the actual actor then@CallerSideMethod public int getId() { return getActor().id; }.

Callerside methods have no access to the state of the actor, they only may call other methods of an actor. This is enforced by the runtime as a callerside method is invoked on the wrapping 'proxy' object of an actor, holding no state.

    @CallerSideMethod public void info(String s) {
        // logCount variable will be 0 always here as this method is executed in the proxy,
        // not the actual actor to these methods
        infoTim(System.currentTimeMillis(), s);
    }

    @CallerSideMethod public void warn(String s) {
        warnTim(System.currentTimeMillis(), s);
    }

    @CallerSideMethod public void severe(String s) {
        severeTim(System.currentTimeMillis(), s);
    }

Invoking ´logger.info("Hello");` will do the following:

  • invoke the info() method on the actor proxy in the current thread.
  • inside the info() method a call to 'infoTim()' is done. This is a regular actor method, so a message is enqueued to the actor's mailbox and processed in the actor's dispatcherthread later on.

CallerSideMethods are rarely used, however they come in handy if you need them (e.g. do sanity checks int the sender's thread).

Callbacks

Kontraktor requires a non-blocking style of operation. However it must be possible to retrieve results or values from an actor. Kontraktor uses callbacks or futures/promises for that.

There are two kinds of callbacks:

  • built in Callback class, acknowledged and handled automatically by kontraktor framework
  • wrap arbitrary callback interfaces using Actor.InThread() or @InThread annotation (slightly more expensive than the hardwired variant).

Using arbitrary callback interfaces

Using the @InThread parameter annotation advises kontraktor to transfer execution of a callback to the senders thread (by enqueuing a message on the sender's callback 'mailbox').

This feature exists to ease integration with non-actor code. Frequently methods of e.g. a listener interfaces are delivered in a foreign thread thereby forcing your class/actor into multithreading. @InThread manages to schedule calls onto an actors DispatcherThread.

If you write an actor based application from scratch, it is favorable to use futures and promises. However frequently one has to deal with mixed threading models if one migrates to actor-style incrementally. Additional we have to be able to deal with non-actor-based libraries.

public static interface SomeCallbackHandler {
    public void myCallback( Object result );
}

public static class Sample extends Actor<Sample> {
    int count = 13;
    public void getResult( @InThread SomeCallbackHandler cb ) {
        cb.myCallback(count);
    }
}

```java
// code in a sending actor, retrieve a value from Sample asynchronously
Sample bs = SpawnActor(Sample.class);
bs.getResult( (result) -> {
        int res = result.intValue(); // runs in my thread
        [...]
    });

Warning: if @InThread is ommited, the callback will be run directly in Sample's thread, so the sending code becomes multithreaded.

Alternatively you may also use Actor.InThread() on caller side to achieve the same without using the annotation.

public static class Sample extends Actor<Sample> {
    int count = 13;
    public void getResult( SomeCallbackHandler cb ) { // no annotation !
        cb.callbackReceived(count);
    }
}

[code in a sending actor]callbackReceived
Sample bs = SpawnActor(Sample.class);
bs.getResult( Actors.InThread( (result) -> {
        int res = result.intValue(); // runs in my thread
        [...]
    }));

Note: if you both annotate and use Actors.InThread(), the result will be enqeued twice to the sender's queue.

Built in Callback class

The Callback class is known to the internal framework of kontraktor. Execution is scheduled by kontraktor without the need for annotation.

public static class Sample extends Actor<Sample> {
    int count = 13;
    public void getResult( Callback<Integer> cb ) {
        cb.receiveResult(count,null);
    }
}

on the caller (sender) side:

Sample bs = SpawnActor(Sample.class);
bs.getResult( new Callback<Integer>() {
        @Override
        public void receiveResult(Integer result, Object error) {
            int res = result.intValue();
            [...]
        } or special handling
    });

nicer with lambdas:

Sample bs = SpawnActor(Sample.class);
bs.getResult( (result, error) -> {
         int res = result.intValue();
         [...]
    });

The callback is executed in the thread of the caller/sender. Behind the scenes a wrapper is created. Once the receiver calls receiveResult() on the callback, a new message is created and put onto the (callback) queue of the sender.

If the sending code is not part of an actor (non-actor world), the callback is done in the thread of the receiver, so you get the usual implicit callback-induced multithreading many java applications suffer from.

Futures and Promises

(for in-depth discussion see also Callbacks,-Futures,-Promises-and-Messages )

In kontraktor, error and result of a callback are delivered in one single method resultReceived. A Future is an interface extending callback. A Promise is the implementation of a Future. A Future can receive only one result once (by calling resultReceived on the implementing Promise). In contradiction to other libraries only one listener(=callback or other future) is allowed per future, however its possible to chain futures using then to get same effect. Futures/Promises are designed to be used from within Actors. They should work outside from normal multi threaded code, however in these cases the single thread guarantee cannot be given (callback/then will be executed in a different thread to caller thread).

in contradiction to java.concurrency.Future and other implementations, kontraktor's future has no thread-blocking wait-method.

Calling non-Actor asynchronous API

Using Actors.InThread() one can keep actors single threaded even when calling non-actor asynchronous APIs.

[code inside actor]
SomeAsyncNonActorService badService ...;
badService.query( Actors.InThread( (result) -> {
        // runs in my thread
        [processing]
    }));

the InThread method ensures the callback coming in some arbitrary thread schedules a message in your actors mailbox, so the actor code stays single threaded.

Invoking blocking operations

public static class TestBlockingAPI extends Actor<TestBlockingAPI> {
  public Future<String> get( final String url ) {
    Promise<String> content = new Promise();

    Async( // execute a blocking call async using Actors.Execute
      () -> new Scanner( new URL(url).openStream(), "UTF-8" ).useDelimiter("\\A").next() 
    ).then(content); // fulfill promise

    return content;
  }
}

Queue size + Lock Backoff

By default the mailbox is limited to the size of DispatcherThread.DEFAULT_QUEUE_SIZE. The default size can be modified using Actors.setDefaultQueueSize(). Additionally one may specify different queue sizes when creating an actor.

Kontraktor uses spinlocks for efficiency reasons. The backdraw of spin locks is Idle CPU consumption. By default a strategy is employed which degrades stepwise from busyspin => yield => park => sleep in order to free CPU consumption. The backdraw of this is increased latency in sudden burst scenarios.

The Backoffstrategy can be set on the DispatcherThread object obtained by actor.getDispatcher();. In order to lower idle CPU consumption decrease the number of busy spins and yield cycles.

Golden rules

  • Never ever block inside an actor method. This includes calls to blocking systems as well as long running loops. Try to give back control as fast as possible, so the next message or callback can be processed. Consider splitting large jobs/iterations into several smaller invocations on self(). Use Actors.Execute in order to isolate blocking code.
  • Don't mess up with callbacks and get a second thread into an actor's code. Stay single threaded inside an actor
  • Don't modify data objects passed to an actor method (not in sender not in receiver). This way you silently introduce shared data. Copy if you cannot avoid it.
  • If you cannot avoid shared global data, use lock free structures for them. Treat any external data accessed from within an actor as 'shared'. Apply concurrency control for those data structures.
  • Avoid cyclic direct actor calls e.g. actor A messages actor B, inside B's message handler message actor A. Use callbacks instead
  • If passing an actor references, pass "self()" not "this"

#Callbacks, Futures, Promises and Messages

Note: I am aware stuff below is hardly understandable. Working towards a more clear and structured documentation with 2.0, which also adds transparent remoting.

If you are already familar with callbacks and futures:

In kontraktor, error and result of a callback are delivered in one single method resultReceived. A Future is an interface extending callback. A Promise is the implementation of a Future. A Future can receive only one result once (by calling resultReceived on the implementing Promise). In contradiction to other libraries only one listener(=callback or other future) is allowed per future, however its possible to chain futures using then to get same effect. Futures/Promises are designed to be used from within Actors. They should work outside from normal multi threaded code, however in these cases the single thread guarantee cannot be given (callback/then will be executed in a different thread to caller thread).

in contradiction to java.concurrency.Future and other implementations, kontraktor's future has no thread-blocking wait-method.

###What is a Future ?

Disclaimer: Keep your app simple. Frequently one or two Actor's will be sufficient. You rarely will need the (somewhat) mind-boggling future stuff outlined below. Create a second (Actor)thread only if you really need a second core.

Callbacks are a 'honest' and easy to understand methodology to pass results from an asynchronous actor-message-call back to the calling actor. However there are use cases, where things get messy if solved with callbacks. Therefore Futures/Promises have been introduced with Kontraktor 1.1.

To avoid mystic explanations ("a handle for a computational result avaiable in the future"), I'll try to explain it from a very technical perspective.

In Kontraktor, Future is an interface and Promise is the implementation of that interface. So they are kind of synonym.

think of the following actor method (or message):

public void computeSomething( Callback resultReceiver ) {
   Double result = [..compute..];
   resultReceiver.receiveResult( result );
}

the caller then does something like:

   actor.computeSomething( (result,error) -> System.out.print("result:"+result) );

But what if I want to execute three tasks sequentially before continuing ? I'd get awkward nested code, as the 2cnd call has to be done inside the callback handler of the first one and so on.

With actor's its easy to write asynchronous calls, but it can be tricky to write sequential, ordered execution sequences.

Another hard-to-achieve task is to send a message to several other actors and continue processing (without blocking or waiting) once all messages have been replied:

   actor1.computeSomething( (result,error) -> System.out.print("result:"+result) );
   actor2.computeSomething( (result,error) -> System.out.print("result:"+result) );
   actor3.computeSomething( (result,error) -> System.out.print("result:"+result) );

   // ? how to continue once all results are there ?

We can create a class implementing Callback and a new variable 'myResult' to save the result received when called back:

   CountDownLatch latch = new CountDownLatch(3);
   MyCallback cb1 = (result,error) -> { 
         myResult = result; 
         latch.countDown(); 
   };
   MyCallback cb2 = (result,error) -> { 
         myResult = result; 
         latch.countDown(); 
   };
   MyCallback cb2 = (result,error) -> { 
         myResult = result; 
         latch.countDown(); 
   };
   actor1.computeSomething( cb1 );
   actor2.computeSomething( cb2 );
   actor3.computeSomething( cb3 );

   // TERRIBLY WRONG. NEVER BLOCK OR WAIT !!!!!!!!
   // TERRIBLY WRONG. NEVER BLOCK OR WAIT !!!!!!!!
   // TERRIBLY WRONG. NEVER BLOCK OR WAIT !!!!!!!!
   latch.await();

Pretty messy and terribly wrong (because: "Never block"). In order to avoid clutter and ease the caller side's code, we can use a Promise. Using a Promise, we can move creation of the Callback from sender side to inside the computeSomething() method. Promise implements Future which is a subclass of Callback.

// note Future is the interface, Promise the implementation
public Future<Double> computeSomething() {
   Double result = [..compute..];
   return new Promise( result );
}

Since Future inherits Callback, the caller can now write:

   Future result = actor.computeSomething();
   result.then((result,error) -> System.out.print("result:"+result) );

or more elegant:

   actor.computeSomething().then(
      (result,error) -> System.out.print("result:"+result) 
   );

We just moved the callback code outside the '(..)' of the method call, thereby reducing code nesting by one level.

We can take this a step further. Consider the computation involves calling another asynchronous method on another actor. We can return an 'unfulfilled' Promise immediately which is fulfilled later on (by calling resultReceived on the Promise).

public Future<Double> computeSomething() {
   Promise promise = new Promise<Double>();
   yetAnotherActor.asyncCall().then( (r,e) -> promise.resultReceived(r,e) );
   // shorter: yetAnotherActor.asyncCall().then( promise );
   return promise;
}

so the caller will enter his "then" action once the two nested asynchronous calls have completed.

But back to the problem of waiting for several calls to complete. With Futures this is much easier to achieve. Consider this actor which emulates having some computational work in its 'sleep' message handler.

    // sample actor to simulate some random work
    public static class SleepActor extends Actor<SleepActor> {

        public Future<Long> sleep() {
            long millis = (long) (Math.random() * 1000);
            try {
                Thread.sleep(millis); // emulate some work
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Promise<>(millis);
        }

    }

Side note: class SleepActor extends Actor<SleepActor> is a trick to enable correct typing of the self() method.

inside a calling actor:

   // somewhere in init
   act = new SleepActor[4];
   for (int i = 0; i < act.length; i++) {
       act[i] = Actors.SpawnActor(SleepActor.class);
   }
   [...]

   yield( act[0].sleep(), act[1].sleep(), act[2].sleep(), act[3].sleep() ).then( 
       (res, err) -> {
           for (int i = 0; i < result.length; i++) {
               Future future = result[i];
               System.out.println("result "+i+" "+future.getResult());
           }
       });

the 'then' closure will be executed once all operations (which run in parallel) are completed.

Side note: executing in order one after another:

act[0].sleep()
  .then( (r,e) -> act[1].sleep() )
  .then( (r,e) -> act[2].sleep() )
...

then vs map

If then is chained like future.then( (r,e) -> .. ).then( (r,e) -> ) ), the result thrown into the handler of each then method is the result of the initial future. However often a pipeline-alike processing is wanted. So with future.map( (r,e) -> return new Promise(r.toString); ).then(..) the then clause will receive the result of the map closure.

Conclusion

Futures are much more convenient once you get used to them, however its easy to create mind-boggling code using futures. My advise is to start with plain Callback's, you'll find out yourself when you need futures. By the time you'll probably use messages returning futures everywhere.

Performance

Pure Callbacks are preferable when you have very time critical code and there is not much work to do inside an actor's message. Because an actor's message implementation cannot know wether the caller needs a future, it always has to create the resulting Promise, even if it is not used later on.

###Messages

As public methods of an actor are always treated as messages, there is no natural way of obtaining a 'message Object'. However there is a solution:

import static de.ruedigermoeller.kontraktor.Actors.*;
[...]
Message msg = msg($$(SleepActor.class).init("saved message"));

Technically $$() creates a proxy class which enqueues incoming method calls into a thread local array, which then is cleared using one of the msg(), currentMsg(), seq(), currentSeq() methods. Therefore its pointless to try to build up MessageSequences across a complete call chain.

Only regular public messages (=methods in kontraktor) can be used this way. Messages or @Callerside or built in Actor methods cannot be obtained. There are only a few sanity checks, so take care.

This gets clearer if you want to obtain a message object for a 'void' method (somewhat ugly, but not 'workaroundable' [cannot pass void as parameter type to a method] ).

[...]
$$(SleepActor.class).voidMethod("saved message");
Message msg = currentMsg();

$$() is a static method of the Actors class, its favorable to import static the Actors class. Messages obtained with $$() have a null-target, so you have to provide a target actor later on like message.send(actor)

While $$() creates Message Objects with null target, there is the $() at the Actor class. With $() message object have the target of the actor instance the $() is called on.

Message Sequence

[...]
MessageSequence sequence = 
  seq( workerA.$().doWork(..),
       workerB.$().doSomething(...),
       workerC.$().doDifferentWork(...),
  );

again if 'void' returning messages are involved you need:

workerA.$().doWork(..);
workerB.$().doSomething(...);
workerC.$().doDifferentWork(...);
MessageSequence sequence = currentSequence();

Messages and MessageSequence's can be sent, yielded, exec'ed (executed one after another) and manipulated. I admit I have not yet discovered the potential of this feature, however in general having another level of abstraction can help to implement elegant concepts.

Example on how to init a hoarde of sub actors, after they signal completion, fullfil a finished Promise:

final SleepActor workers[] = new SleepActor[4];
for (int i = 0; i < workers.length; i++) {
    workers[i] = SpawnActor(SleepActor.class);
}

msg( $$(SleepActor.class).init() ).yield( workers ).then( finished ); // yay !

#Solutions to typical problems of async concurrency

This page collects solutions to typical asynchronous patterns. Its work in progress

TicketMachine

..kontraktor.util.TicketMachine is a utility class solving a common problem:

Example: Your actor needs to process a stream of stock pricing events covering different stocks (e.g. market data stream). For each data item an asynchronous call (sync would be too slow) to a remote service must be done (e.g. a remote key value store or in memory data grid). While price events covering different stocks can be executed out-of-order concurrently, price events on the same stock have to be processed in the order they come in. However the duration of an async query of a remote system can vary, so it might happen you process the event in invalid order.

e.g.

2 Events: [BMW,100,12.14], [FDAX,500,1.14] 
=> make async calls 
=> "BMW" query response is received first, so reordering happens.

This case is not problematic, but now this one should not happen:

2 Events: [BMW,100,12.14], [BMW,500,12.16] 
=> make async calls 
=> "BMW,500,12.16" query response is received first, 
so reordering happens. But we would have need to 
process [BMW,100,12.14] first !!

So you have a stream of events containing an 'item'-bound requirement for ordered execution.

The ticket machine now helps in ordering.

  1. you ask for a ticket for processing a "BMW" event.
  2. you receive a future and register your processing routine as callback on this future.
  3. the ticket machine calls your processing closure giving in a new future you have to call once processing of your item is finished.

The ticket machine schedules processing such that all events to a single item are processed in order, events to unrelated items are processed concurrently.

public void $process( StockEvent stock, long sequence ) {
  machine.getTicket(stock.getName()).then( (Future finSignal, Object error) -> {
    // async query
    grid.$query("some query where x=$stock.quantity").then( 
      (queryresult, error) -> {
        // query result received
        [compute, compute, compute];
        [update something]

        // now tell ticket machine you are done and it 
        // might trigger the next event for
        // this stock if present
        // "done" is just a dummy, only signal needed
        [other async stuff might be here]
           .then( (r,e) -> finSignal.receiveResult("done", null) ); 
    });
  });
}

Note the ticket machine only holds queues in case a second price event for e.g. "BMW" comes in while the previous one is still being processed. So even if there are 100k different stocks, you won't get into memory trouble. This way processing can be single threaded and asynchronous, only in rare cases (e.g. you receive a sequence for a single stock) your actor thread will be idle.

Note this problem could also be solved by having a dedicated actor for each Stock, however this will be problematic once you face streams with a huge amount of different items (=stocks).

A full (unfortunately jdk7-no-lambdas-style) example can be found here