z archived Kontraktor 3

moru0011 edited this page Aug 5, 2017 · 1 revision

Overview

Kontraktor provides an abstraction layer for building distributed or client/server applications with little effort and high flexibility, making system distribution and network infrastructure a deployment-time decision.

The smallest distributable entity is a service with an asynchronous API (an Actor [or eventloop, isolate, Service ..]).
These actors can be distributed as needed using various networking "connectors". Of course they can also run inside a single process, providing a message passing paradigm instead of traditional multithreading.

As Kontraktor frees applications from networking and marshalling details, it provides a powerful abstraction with many use cases. Kontraktor does not provide pre-built solutions for imaginary use cases; instead it simplifies the creation of domain-specific systems and applications.

  • High Productivity, High Performance & Scalability

    Kontraktor implements a high performance asynchronous remote message passing scheme featuring a pluggable transport. Whether you are connecting Actors/Services locally, via Http (Long/Short Polling), WebSockets or TCP, using binary encoded messages or json for interoperability: Kontraktor strips away unnecessary networking and marshalling details and differences. Your application code stays the same.

  • Typed Actors, asynchronous foundation.

    Actors are modelled as a special kind of Java classes, so messages become asynchronous method calls. This keeps existing tooling (refactoring, code completion), reduces boilerplate and keeps code maintainable.

  • Abstraction over 'Frameworking'

    Kontraktor does not provide pre-built solutions, but helps you build solutions faster.
    Use Kontraktor Actors to partition your application into (potentially) distributable asynchronous services, power WebApps, serve thousands of clients, connect high throughput systems, or build your very own SOA, microservice or HA platform.

  • Service oriented Actors

    Kontraktor features a service oriented peer-to-peer (or client-server) model of distribution. This matches today's heterogeneous infrastructure; nodes can connect via TCP inside a LAN, or via websockets or http long poll over a WAN.

  • JavaScript interoperability, Android + Java 7 client support

    The current Java universe is a poor fit for modern, responsive, JavaScript-powered browser apps. Kontraktor enables seamless and scalable communication with JavaScript clients.

What is Kontraktor good for?

  • simpler concurrency model inside "monolithic" applications
  • organic scaling. Start with a monolith and split your app as requirements grow.
  • swiss army knife for distributed applications. Expose existing software as a service within minutes via http long poll, websockets or tcp or all at once.
  • high performance system interconnect (high point to point message volume) as well as client server patterns (one service, thousands of clients).
  • high level, high productivity network programming. Life is too short to deal with marshalling and network transports manually.

Concrete use case examples:

  • split fat java applications into separate JVMs to reduce GC pauses
  • replace slow + inflexible infrastructure components (e.g. message queues) by lightweight but highly customizable actor nodes.
  • Distributed load tests simulating thousands of clients
  • Build single page javascript application back ends using the js4k.js library.
  • cross platform connectivity using (optional) Json encoding
  • generic remote software deployment
  • remote logging/metrics service
  • simulation applications, distributed simulations
  • distribute computing intensive applications e.g. machine learning, real time analytics

Kontraktor Artifacts

cut and paste maven dependencies

| artifact | functionality | platform |
|----------|---------------|----------|
| kontraktor-core | actor implementation + TCP remoting | Java 8 |
| kontraktor-http | websocket + http long poll remoting support. Depends on kontraktor-core | Java 8 |
| js4k.js | javascript websocket/http long poll client (typeless tell/ask style API) | ES5 |
| kontraktor-reactive-streams | Implementation of reactive streams spec 1.0. Supports streaming over network. | Java 8 |
| kontraktor-bare | lean standalone http long poll client API | Java 7, Android |

examples (SPA web apps, decoupling, ..) : https://github.com/RuedigerMoeller/kontraktor/tree/trunk/examples

Reactive Streams 1.0 interop

see https://github.com/RuedigerMoeller/kontraktor/tree/trunk/modules/reactive-streams

Actors

Creating an Actor

1. Define a public class inheriting from Actor

public class MyActor extends Actor<MyActor> {
}

Don't define a constructor (use an async init() method instead).

You can also inherit from existing actors. If Actor<MyActor> gets you into compilation trouble, avoid it like this:

public class MyActor extends Actor {
    @Override protected MyActor self() { return (MyActor) super.self(); }
}

(The generics trickery is only there to give self() the proper return type.)

2. All public methods become async messages. Define them

public class MyActor extends Actor {
    public void init( ... ) {}
    public void fireAndForget() {}
    public IPromise requestResponse(args,..) { return new Promise("empty"); }
    public void withCallback(args,..,Callback callback) { 
        callback.stream("Hello"); 
        callback.stream("Hello 1"); 
        callback.finish(); 
    }
    private String normalHelperMethod() { return "nothing special with me"; }
}

Similar to other actor implementations (e.g. Microsoft Orleans), asynchronous (= public) methods must return either void or IPromise. Non-public methods behave like normal (synchronous) methods.

Overloading is not supported for public actor methods:

e.g.

public IPromise getNameOf(X x) { return new Promise(x.getName()); }
public IPromise getNameOf(Y y) { return new Promise(y.getNick()); }

will result in an error when starting the actor. Replace it with something like

public IPromise getNameOfX(X x) { return new Promise(x.getName()); }
public IPromise getNameOfY(Y y) { return new Promise(y.getNick()); }

3. Run your actor

MyActor act = Actors.AsActor(MyActor.class); // act == act.self() - a proxy
act.init( ... );

4. Keep this secret: never hand out a reference to this, pass self() instead

childActor.init( self() );

5. publish it on the network if needed

new TCPPublisher(act, 4001).publish();

6. Connect to it from another machine/process

ConnectableActor con = new TCPConnectable(MyActor.class,"localhost",4001);
MyActor act = (MyActor) con.connect( disconnectCallback ).await();

7. stop it

act.stop();

Implementation, Terminology + API

Technically, an Actor consists of the actual Actor class implementation and a proxy object (Actor Reference), which automatically transforms all calls to public methods into messages put into the mailbox (or forwarded over a network link).
Only the Actor Ref (Proxy) is handed out to other remote or local components of an application.

DispatcherThread

Each Actor is assigned a thread ("DispatcherThread"), however many actors can share one. The dispatcher thread polls events from the actor's mailbox + callback queue and executes them. So inside one actor, everything is single threaded.
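The effect of "one dispatcher thread polls one mailbox" can be illustrated with plain JDK classes. The sketch below is not Kontraktor code and does not show how Kontraktor is implemented; CounterSketch and its methods are hypothetical names, and a single-thread executor stands in for the DispatcherThread and its mailbox. Several sender threads post messages, yet the unsynchronized counter ends up correct because only one thread ever touches it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only illustration; CounterSketch is a hypothetical name, not Kontraktor API.
final class CounterSketch {
    // The executor's internal queue plays the role of the mailbox.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();
    private int count = 0; // plain field: only ever touched by the dispatcher thread

    // "Message send": enqueue the work instead of running it on the caller's thread.
    void increment() {
        dispatcher.execute(() -> count++);
    }

    // Let the dispatcher drain the mailbox, stop it and return the final state.
    int stopAndGet() {
        dispatcher.shutdown();
        try {
            if (!dispatcher.awaitTermination(10, TimeUnit.SECONDS))
                throw new IllegalStateException("dispatcher did not terminate");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return count;
    }

    // Four sender threads post 1000 messages each to one "actor".
    static int demo() {
        CounterSketch actor = new CounterSketch();
        Thread[] senders = new Thread[4];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) actor.increment();
            });
            senders[i].start();
        }
        try {
            for (Thread t : senders) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return actor.stopAndGet();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```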

mailbox

The size of the mailbox queue is bounded. If the queue is full because of overload, the sender is blocked or, optionally, gets an exception thrown (>3.16: no longer the case, see below). The mailbox queue size can be set when creating an actor.

callback queue

Similar to other eventloop systems, Kontraktor provides 2 queues per Actor: mailbox and callback queue. The callback queue has higher priority.

The callback queue is used to enable thread safe callback delivery. Unlike in other actor implementations, the following code is thread safe ("close over actor state"):

(implementation inside an actor)

HashMap plainMap = new HashMap();      // <= This
otherActor.doSomething().then( result -> {
   plainMap.put( "Yes", "you can" );   // and this run in the same thread !
});

Kontraktor manages the thread switching for the built-in Promise and Callback classes. With the help of inThread(), it's possible to wrap any interface on the fly into a thread-switching callback. This avoids the need to deal with the tricky and error-prone details of concurrent state modification and the rules of memory visibility as defined in the JMM.
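The idea of delivering a callback on the thread that owns the state can be sketched with JDK classes only. This is an analogy, not Kontraktor's mechanism: CallbackThreadSketch is a hypothetical name, two single-thread executors stand in for two actors, and CompletableFuture.thenApplyAsync with an explicit executor plays the role of the callback queue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy; CallbackThreadSketch is a hypothetical name, not Kontraktor API.
final class CallbackThreadSketch {
    // Returns true if the callback ran on the thread that owns plainMap.
    static boolean demo() {
        ExecutorService actorThread = Executors.newSingleThreadExecutor();
        ExecutorService otherActor = Executors.newSingleThreadExecutor();
        try {
            Map<String, String> plainMap = new HashMap<>(); // unsynchronized on purpose
            // Find out which thread is "the actor's thread".
            Thread owner = CompletableFuture
                .supplyAsync(Thread::currentThread, actorThread).join();
            // The "other actor" produces a result on its own thread ...
            CompletableFuture<String> result =
                CompletableFuture.supplyAsync(() -> "you can", otherActor);
            // ... and the callback is queued back onto the owning thread.
            return result.thenApplyAsync(value -> {
                plainMap.put("Yes", value);
                return Thread.currentThread() == owner && plainMap.size() == 1;
            }, actorThread).join();
        } finally {
            actorThread.shutdown();
            otherActor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With plain CompletableFuture the executor has to be passed at every call site; the text above describes Kontraktor doing this switch automatically for its own Promise and Callback classes.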

EcmaScript Terminology

To avoid "alienation" and name clashes with java.util.concurrent, Kontraktor 3's terminology is borrowed from EcmaScript 5, 6 and 7. Many public educational articles/blogs dedicated to EcmaScript's promises also apply to Kontraktor, which reduces the amount of required documentation on my side :).

Namely:

  • A "Future" is called Promise. Kontraktor's Promise class is similar to Java 8's CompletableFuture; the naming is borrowed from EcmaScript 6 Promises (with minor adaptations + extensions)
  • Promise.then, Promise.catchError, Actors.all, Actors.race like EcmaScript 6.
  • Promise.await (non blocking await), Actor.yield. Both are implemented stack-based (see below on implications).

There are various implementation-related reasons why Kontraktor cannot use JDK 8's java.util.concurrent async additions directly; however, a tighter integration/bridging is being considered for future releases.

Promise

Promise in Kontraktor differs a bit in implementation from ES6:

  • as Java is strongly typed, methods returning a promise need to return the IPromise interface (there is more than one implementation of it, required for e.g. remoting). Example: public IPromise getPromise() { return new Promise("Hello");}
  • one does not pass a callback, but the result into a Promise constructor.
    • new Promise() - unresolved (pending) promise. Later, call reject(error) or resolve(result) or just complete(result,error)
    • new Promise("hello") - promise is immediately resolved
    • new Promise(null,"error") - promise is immediately rejected

The caller side then adds a callback using then( result -> .. ). Note that Kontraktor handles the case where the result/error is available before the callback has been added. Both resolving and registering a callback using then (+ derivatives like thenAnd, onError, onTimeout, ..) can be arbitrarily delayed.
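Since the text compares Promise to Java 8's CompletableFuture, the three states and the "callback may be added before or after the result" behaviour can be tried out with the JDK class. This is only an analogy under that assumption; PromiseStatesSketch is a hypothetical name and none of the calls below are Kontraktor API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// JDK-only analogy; PromiseStatesSketch is a hypothetical name, not Kontraktor API.
final class PromiseStatesSketch {
    static List<String> demo() {
        List<String> log = new ArrayList<>();

        // Comparable to new Promise("hello"): the result exists before the callback is added.
        CompletableFuture<String> resolved = CompletableFuture.completedFuture("hello");
        resolved.whenComplete((res, err) -> log.add("resolved:" + res));

        // Comparable to new Promise(): pending; the callback is registered first,
        // the result arrives later.
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.whenComplete((res, err) -> log.add("late:" + res));
        log.add("registered");
        pending.complete("world");

        // Comparable to new Promise(null, "error"): rejected.
        CompletableFuture<String> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException("error"));
        rejected.whenComplete((res, err) -> log.add("rejected:" + err.getMessage()));

        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```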

Though there are many blogs and articles (of varying quality), I'll try to give a brief explanation of the two major async primitives:

A Callback is a lambda function created on the caller side and then passed to another actor. The receiving actor can then send an arbitrary number of objects to the caller until it "closes" the callback by signaling that no further data will be sent. It works like a streaming channel. (Actually a callback is transformed into a message put into the mailbox of the caller, so the callback method is executed in the thread of the caller.)

Implementation in Actor subclass. (can run remote)

public void streamMyName( Callback<Character> channel ) {
    myName.chars().forEach( ch -> channel.stream( (char) ch) );
    channel.finish();
}

client usage:

actor.streamMyName( (ch,err) -> {
    if ( ! Actors.isFinal(err) ) // check for channel finish message
        System.out.print(ch);
});
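The streaming contract (any number of results, then one end-of-stream signal) can be sketched without Kontraktor. In the sketch below everything is an assumption made for illustration: StreamingCallbackSketch, streamName and the FINISHED marker are hypothetical, a BiConsumer stands in for Callback, and the calls run synchronously on one thread instead of being routed through a mailbox.

```java
import java.util.function.BiConsumer;

// JDK-only illustration; all names here are hypothetical, not Kontraktor API.
final class StreamingCallbackSketch {
    // Hypothetical end-of-stream marker, standing in for what Actors.isFinal(err) detects.
    static final Object FINISHED = new Object();

    // Callee side: push each character, then signal that no more data will follow.
    static void streamName(String name, BiConsumer<Character, Object> channel) {
        for (char ch : name.toCharArray()) channel.accept(ch, null);
        channel.accept(null, FINISHED);
    }

    // Caller side: collect characters until the finish signal arrives.
    static String demo() {
        StringBuilder received = new StringBuilder();
        streamName("Kevin", (ch, err) -> {
            if (err != FINISHED) received.append(ch);
            else received.append('.'); // mark the end of the stream
        });
        return received.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```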

A Promise is somewhat similar to a Callback, but is created by the callee. The caller then registers a lambda receiving the result provided by the callee. Note that, unlike with callbacks, only one result or error can be sent to a Promise.

public IPromise<Boolean> findInUrl( URL url, String toFind ) {
    Promise<Boolean> result = new Promise<>();
    someAsyncApi.getURLData(url).then( 
        data -> result.resolve( data.indexOf(toFind) >= 0 ) 
    ).catchError( err -> result.reject(err) );
    return result;
}

usage:

actor.findInUrl( url, "Java" ).then( res -> ... ).catchError( err -> ... ); 

or handle result and errors with one lambda

actor.findInUrl( url, "Java" ).then( (res, err) -> ... ); 

or return another promise and execute the chained one thereafter (ordered execution):

actor.findInUrl( url, "Java" ).thenAnd( (res, err) -> { 
        ...; return promise; 
    })
    .then( resOfPromise -> .. )
    .catchError( catchedErr -> .. ); 

or use a timeout

actor.findInUrl( url, "Java" ).timeoutIn(5000)
    .then( res -> ... )
    .onTimeout( to -> ... )
    .catchError( err -> ... ); 

Promise.await enters the event-poll loop and returns once the promise is resolved. This means other messages potentially modifying your Actor's state are processed in the meantime. Note that, this way, await does not block the Actor's thread.
If await is called from a non-actor thread, the calling thread is blocked until the Promise is resolved.

// non blocking await, errors will be transformed to exceptions
Boolean found = actor.findInUrl( url, "Java" ).await(5000); 

This reduces the amount of nesting when using asynchronous API.

Actor.yield(), Actor.yield(timeMillis). yield() enters the eventloop, processing messages from the mailbox if available. It returns once the queue(s) are empty. This reduces queue sizes and latency if an actor method enqueues a lot of callbacks to self() or does a long iteration.
yield(timeMillis) enters the event loop, polling for other messages for the given number of milliseconds.

A Spore is a snippet of code sent to and processed by the receiver (remotely or inside the thread of another actor). This makes it possible to "send code" to a remote actor, let it work on remote data and stream results back to the caller (=sender).

Actor.current() delivers the currently active actor. As the Actor class implements the Executor interface, this can be used to execute code on the actor's thread. It throws an exception if called from outside actor code. Use Actor.inside() to find out if running inside an Actor thread.

Actor.delayed(timeMs,Runnable) executes given runnable delayed on the actor's thread.

The example below does not produce a stack overflow (the runnable is put on the actor's mailbox like a message). Also, the actor is not blocked. One can schedule thousands of parallel tasks on a single thread this way:

public void parallelLoop() {
    if ( ! isStopped() ) {
        // do stuff each 500 ms
        delayed( 500, () -> parallelLoop() );
    }   
}
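Why such a loop neither overflows the stack nor blocks the thread can be seen in a JDK-only sketch. It is an analogy, not Kontraktor code: DelayedLoopSketch is a hypothetical name and a single-thread ScheduledExecutorService stands in for the actor thread and delayed(). Each round schedules the next one and returns at once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// JDK-only analogy; DelayedLoopSketch is a hypothetical name, not Kontraktor API.
final class DelayedLoopSketch {
    private final ScheduledExecutorService actorThread =
        Executors.newSingleThreadScheduledExecutor();
    private final CountDownLatch finished = new CountDownLatch(1);
    private int ticks = 0; // only touched on actorThread

    // One round: do the work, enqueue the next round, return immediately.
    // Nothing recurses on the stack and the thread is free between rounds.
    private void loop(int remaining) {
        if (remaining == 0) {
            finished.countDown();
            return;
        }
        ticks++;
        actorThread.schedule(() -> loop(remaining - 1), 2, TimeUnit.MILLISECONDS);
    }

    // Runs the given number of rounds and returns how many were executed.
    static int demo(int rounds) {
        DelayedLoopSketch sketch = new DelayedLoopSketch();
        sketch.actorThread.execute(() -> sketch.loop(rounds));
        try {
            if (!sketch.finished.await(10, TimeUnit.SECONDS))
                throw new IllegalStateException("loop did not finish in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            sketch.actorThread.shutdown();
        }
        return sketch.ticks;
    }

    public static void main(String[] args) {
        System.out.println(demo(25));
    }
}
```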

Actor.exec(Callable) enables executing blocking operations (e.g. database queries) in a thread pool. The result of the callable will be delivered thread safe inside the actor's thread to the promise returned by 'exec'. This way the actor thread is not blocked.

Actor.stop() stops execution of current actor. If there are no other actors scheduled on it, the associated thread will terminate.

Actor.tell( "messageName", ... args ), IPromise Actor.ask("messageName", ... args ) untyped messaging interface for actors.

Actor.serialOn( key, promise ) enables guaranteed order of processing by key. Useful to implement non-blocking transactional processing.
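What "guaranteed order of processing by key" means can be sketched with CompletableFuture. This is not Kontraktor's serialOn and its signature is an assumption made for the example: SerialOnSketch is a hypothetical class that keeps one chain of futures per key and starts a task only after the previous task for the same key has completed, while tasks for other keys start right away.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// JDK-only illustration; SerialOnSketch is hypothetical, not Kontraktor's serialOn.
final class SerialOnSketch {
    // Tail of the task chain per key; used from a single thread in this sketch.
    private final Map<String, CompletableFuture<String>> tails = new HashMap<>();

    // Start 'task' only after every earlier task for the same key has completed.
    CompletableFuture<String> serialOn(String key, Supplier<CompletableFuture<String>> task) {
        CompletableFuture<String> previous =
            tails.getOrDefault(key, CompletableFuture.completedFuture(""));
        // Ignore result and error of the predecessor, only its completion matters.
        CompletableFuture<Void> gate = previous.handle((res, err) -> null);
        CompletableFuture<String> next = gate.thenCompose(ignored -> task.get());
        tails.put(key, next);
        return next;
    }

    static List<String> demo() {
        SerialOnSketch sketch = new SerialOnSketch();
        List<String> log = new ArrayList<>();
        CompletableFuture<String> slow = new CompletableFuture<>(); // completed manually below
        sketch.serialOn("a", () -> { log.add("a1 start"); return slow; });
        sketch.serialOn("a", () -> {
            log.add("a2 start");
            return CompletableFuture.completedFuture("a2");
        });
        sketch.serialOn("b", () -> {
            log.add("b1 start");
            return CompletableFuture.completedFuture("b1");
        });
        log.add("a1 finishing");
        slow.complete("a1"); // only now may the second task for key "a" start
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```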

Mixing Actors and 'old school' multithreading

One can use actors (message passing/shared nothing) instead of multithreading/shared state by using actors to replace threads.
However, asynchronous programming can be hard to get used to; in addition, most current Java APIs / frameworks contain lots of blocking calls.

Kontraktor's async primitives (Promise, Callback) automatically detect whether they are running inside an actor DispatcherThread or not.

E.g. Promise.await() invoked inside an actor does "pseudo-block" instead of blocking execution. The current (Dispatcher-)thread enters the actor's eventloop, processing incoming messages while waiting (after each message processed, the promise state is checked).
If Promise.await() is called from within an 'ordinary' java thread (no actor code), await will block the current thread until the promise result is available.

So in order to avoid Promise/Callback style in ordinary multithreaded java code, just use await() to get de-facto synchronous calls.

Deal with blocking API (e.g. JDBC)

As you CANNOT block the actor's DispatcherThread (even if done rarely, it always fails ..), blocking calls need to be 'outsourced' to a thread pool. Use Promises/Callbacks to stream results back 'into' the actor's thread. Actor.exec( Callable ) is a utility to safely execute blocking API calls without blocking the actor's DispatcherThread. Note that Actors.MAX_EXTERNAL_THREADS_POOL_SIZE (default 1000) defines the maximum number of threads available to run blocking calls.

Example: use exec() to fetch the content of a URL using a thread-blocking API

public IPromise<String> get( final String url ) {
    Promise<String> content = new Promise();
    exec( () -> {
        // here goes the blocking code snippet (runs in threadpool thread)
        return new Scanner(new URL(url).openStream(), "UTF-8").useDelimiter("\\A").next();
    }).then( (result, error) -> {
        content.complete(result, error); // runs in actor thread
    });
    return content;
}

Just as a note: this can be shortened by passing the promise directly to then():

public IPromise<String> get( final String url ) {
    Promise<String> content = new Promise();
    exec( () -> {
        // here goes the blocking code snippet (read URL content)
        return new Scanner(new URL(url).openStream(), "UTF-8").useDelimiter("\\A").next();
    }).then( content ); // don't need manual forwarding code here
    return content;
}
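The hand-off pattern behind exec() (blocking work on a pool thread, result delivered on the actor's thread) can be reproduced with JDK classes to see which thread runs what. This is an analogy only: ExecSketch is a hypothetical name, the thread names "pool" and "actor" are made up for the example, and Thread.sleep stands in for a blocking call such as a JDBC query.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy; ExecSketch is a hypothetical name, not Kontraktor API.
final class ExecSketch {
    // Stand-in for a blocking API call.
    private static void blockFor(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns "<thread that blocked>-><thread that received the result>".
    static String demo() {
        ExecutorService actorThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "actor"));
        ExecutorService blockingPool =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "pool"));
        try {
            return CompletableFuture
                .supplyAsync(() -> {
                    blockFor(20); // the blocking part runs on a pool thread
                    return Thread.currentThread().getName();
                }, blockingPool)
                // the continuation is queued onto the "actor" thread
                .thenApplyAsync(
                    worker -> worker + "->" + Thread.currentThread().getName(),
                    actorThread)
                .join();
        } finally {
            actorThread.shutdown();
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```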

handling failure

When connecting to a remote actor, the connect method should be given a callback which is fired on disconnect. There is no catch-all solution for recovering from a disconnect from a remote actor. Because it's undefined which messages have actually reached the remote actor, most of the time stopping the actor, resetting and reconnecting will be the only available option.

Advanced mechanisms such as acknowledged message transfer have to be added at application level (e.g. sequence important messages or let them be acknowledged by the receiver).
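One possible shape of such an application-level scheme is sketched below: sequence numbers, cumulative acknowledgements and a replay of unacknowledged messages after a reconnect. Nothing here is part of Kontraktor; AckSketch, Sender and Receiver are hypothetical classes, and the direct method calls stand in for messages exchanged between actors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only; AckSketch, Sender and Receiver are hypothetical classes.
final class AckSketch {
    // Receiver: applies each sequence number at most once and acknowledges it.
    static final class Receiver {
        final List<String> applied = new ArrayList<>();
        private long lastSeq = 0;

        long receive(long seq, String msg) {
            if (seq == lastSeq + 1) { // the next expected message
                applied.add(msg);
                lastSeq = seq;
            }
            return lastSeq; // cumulative ack: everything up to lastSeq was applied
        }
    }

    // Sender: remembers messages until they are acknowledged.
    static final class Sender {
        private final TreeMap<Long, String> unacked = new TreeMap<>();
        private long nextSeq = 1;

        long enqueue(String msg) {
            unacked.put(nextSeq, msg);
            return nextSeq++;
        }

        void onAck(long ackedUpTo) {
            unacked.headMap(ackedUpTo, true).clear();
        }

        // After a reconnect: replay everything not yet acknowledged, in order.
        void resendAll(Receiver receiver) {
            for (Map.Entry<Long, String> e : new TreeMap<>(unacked).entrySet())
                onAck(receiver.receive(e.getKey(), e.getValue()));
        }

        int pending() {
            return unacked.size();
        }
    }

    static String demo() {
        Sender sender = new Sender();
        Receiver receiver = new Receiver();
        long s1 = sender.enqueue("m1");
        long s2 = sender.enqueue("m2");
        sender.enqueue("m3");                      // never sent before the disconnect
        sender.onAck(receiver.receive(s1, "m1"));  // delivered and acknowledged
        receiver.receive(s2, "m2");                // delivered, but the ack gets lost
        sender.resendAll(receiver);                // reconnect: m2 is ignored, m3 applied
        return receiver.applied + " pending=" + sender.pending();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```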

Published Actors can listen to clients disconnecting by implementing the RemotedActor interface, e.g.:

public class MyHttpAppSession extends Actor<MyHttpAppSession> implements RemotedActor {
[...]
    @Override
    public void hasBeenUnpublished() {
        app.clientClosed(self());
        self().stop();
    }
}

Actor Lifecycle

Creation of an actor immediately starts it. Actor.stop() stops the actor. If there are no other Actors run by the associated DispatcherThread, the DispatcherThread will terminate.

Queue sizes, blocking options

With 3.16 Kontraktor defaults to unbounded queues, so parts of what follows do not apply anymore. The old bounded queue implementation can be enabled by setting the public global flag in the Actors class (not recommended). Because of the blocking nature of bounded queues, system behaviour changes completely once a queue is full, which makes it very unpredictable. In practice/production I found myself always increasing queue sizes heuristically to a point where they would never get full, which led to issues once this did happen due to load peaks. Giving up on bounded queues has some implications: methods involving computation of queue sizes get very expensive ( O(n) ), and system overload will result in OOM instead of blocking/deadlocks, so putting a fuse on the system's master input is strongly recommended.

By default each actor gets its own thread and a default queue size (32_000). It's possible (and necessary) to increase or decrease the queue size when creating an Actor. E.g. a Server facade serving many connections might want to have a queue of 256_000 elements, while an Actor representing a web user session only needs a small queue of, say, 1000 (this is of importance when having thousands of session actors [see spa-knockout example]).

MyActor act = Actors.AsActor(MyActor.class, [QUEUE SIZE]);

If an actor queue is full, actors / threads trying to send messages to it are blocked (after 5 seconds a warning is logged). In the case of a 'facade-role' actor dispatching to thousands of session actors this is unwanted: frequently the client has just disconnected, so messages cannot be forwarded anymore.
Therefore one can use act.setThrowExWhenBlocked(true); so the sender gets an exception thrown instead of being stuck.

The alternative to blocking is unlimited queue growth, leading to performance degradation and self-reinforcing queue growth until memory runs out. In addition, linked lists do not perform too well (pointer chasing, cache misses). Kontraktor does not support that.

Scheduling

With 3.16 Kontraktor defaults to unbounded queues, so parts of what follows might not apply anymore.

One can assign a scheduler to an actor at creation time, for example by creating a scheduler explicitly:

SimpleScheduler sched = new SimpleScheduler(DefaultQSize);
MyActor act = Actors.AsActor(MyActor.class, sched);
MyActor act1 = Actors.AsActor(MyActor.class, sched);

act,act1 will be run on the same thread and inherit the default queue size of the scheduler.

Example for super simple load balancing (random assignment) from a web app server:

public class MyServer extends Actor<MyServer> {
[...]
Scheduler clientThreads[];

public void init() {
    clientThreads = new Scheduler[]{
        new SimpleScheduler(CLIENT_QSIZE),
        new SimpleScheduler(CLIENT_QSIZE),
        new SimpleScheduler(CLIENT_QSIZE),
        new SimpleScheduler(CLIENT_QSIZE),
    };
    Thread.currentThread().setName("MyServer Dispatcher");
}

public IPromise<MyHttpAppSession> login( String user, String pwd ) {

    // create new session and assign it a random scheduler (~thread). 
    // Note that with async nonblocking style
    // one thread will be sufficient most of the time. 
    // For computing intensive apps increase clientThreads to like 2-4.
    // This means you need to delegate any blocking operation outside of an actors thread

    int random = (int) (Math.random() * clientThreads.length);
    MyHttpAppSession sess = AsActor(MyHttpAppSession.class,clientThreads[random]);
    return new Promise<>(sess);
}
[...]
}

backpressure, yielding

With 3.16 Kontraktor defaults to unbounded queues, so stuff below partially might not apply anymore. (a caller is never blocked now)

As an actor is run by a single thread, operations doing a large amount of work can be problematic, because meanwhile your mailbox fills up (it's not polled while your routine occupies the thread). Once your mailbox is full, other actors get stuck/blocked, in turn blocking your operation. Yes, actors can also deadlock.

Kontraktor provides some pragmatic help:

yield() calls the eventloop and processes all messages. isMailboxPressured() returns true if your mailbox is half full (similar isCallbackQueuePressured()).

A typical example is a synthetic benchmark, where you want to send out 10 million messages to a remote actor. As you can put messages faster to the mailbox queue than they can get remoted/sent, one piles up a huge amount of pending messages which consume memory and finally the mailbox will be full and block.

Example how this is solved (from remoting benchmark github repo):

public IPromise benchAskSum() {
    server.print("benchAskSum START"); // remote print ..
    for ( int i = 0; i < NUM_MSG; i++ ) {
        while ( server.isMailboxPressured() )
            yield(); // nonblocking wait. else actor thread gets stuck as messages queue up massively
        server.askSum(i, i + 1);
    }
    server.print("benchAskSum DONE");
    Log.Info(this,"Done");
    return resolve();
}

Note that yield() just jumps into message polling and returns once the queue is empty. No thread is blocked.

If yield() is called from non-actor code, it's a NOP.
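The "check for pressure before sending" pattern can be tried out with a bounded JDK queue. This sketch is not Kontraktor code and differs from it in one important way: instead of yield(), which processes the actor's own messages, the producer here simply parks for a moment. BackpressureSketch, CAPACITY and POISON are hypothetical names; "pressured" is taken to mean "at least half full", as described above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.LockSupport;

// JDK-only illustration; all names are hypothetical, not Kontraktor API.
final class BackpressureSketch {
    static final int CAPACITY = 64;
    static final int POISON = -1; // end-of-stream marker for the consumer

    // Returns { sum received by the consumer, largest queue size the producer saw }.
    static long[] demo(int numMsg) {
        BlockingQueue<Integer> mailbox = new ArrayBlockingQueue<>(CAPACITY);
        long[] sum = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int msg = mailbox.take(); msg != POISON; msg = mailbox.take())
                    sum[0] += msg;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        int maxSeen = 0;
        try {
            for (int i = 0; i < numMsg; i++) {
                // "isMailboxPressured": back off while the queue is at least half full
                while (mailbox.size() >= CAPACITY / 2)
                    LockSupport.parkNanos(10_000);
                mailbox.put(i);
                maxSeen = Math.max(maxSeen, mailbox.size());
            }
            mailbox.put(POISON);
            consumer.join(); // after join, sum[0] is safely visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { sum[0], maxSeen };
    }

    public static void main(String[] args) {
        long[] result = demo(10_000);
        System.out.println("sum=" + result[0] + " maxQueue=" + result[1]);
    }
}
```

Because the single producer only sends while the queue is less than half full, the queue in this sketch never grows beyond CAPACITY / 2, regardless of how slow the consumer is.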

Distributed Actors

As actors use message passing to communicate/share data, it's easy to just route messages over a network link in order to distribute applications. This is not a development-time decision, as actors run fine sharing a single Java VM (useful during test, development, debugging, downscaling).

Transports

The following transport options are available

| Transport | Throughput tell/sec | Throughput ask/sec | supported # Clients | Use case |
|-----------|---------------------|--------------------|---------------------|----------|
| None (Local) | 7-8 million | 4 million | n/a (many) | simpler concurrency model, system scale down |
| TCP Blocking | 2.2 million | 1 million | <500 (2 Threads per client) | System Interconnect |
| TCP NIO | 1.8 million | 0.8 million | >10_000 | Service Provider, System interconnect |
| WebSocket | 2 million | 1 million | >10_000 | System Interconnect, Serve GUI Clients, Service Provider |
| Http-Long Poll | 400_000 | 250_000 | >10_000 | Serve GUI + Mobile Clients, Unstable networks |
| Http-Long Poll JSON | 200_000 | 120_000 | >10_000 | Serve GUI + Mobile Clients, Unstable networks |

Note: Throughput numbers above are for fast-serialization messages sent between two actor instances over the network. Test messages pass/return an integer, Linux/Xeon 2.5 GHz.

Note 1: Scaling numbers are conservative, current limit is caused by memory consumption. 15_000 connections require ~0.8GB. Memory footprint will be reduced in future releases.

Throughput, messages pass/return fat 20-field Pojo:

| Transport | Throughput tell/sec | Throughput ask/sec |
|-----------|:-------------------:|-------------------:|
| None (Local) | 7-8 million | 4 million |
| TCP Blocking | 750_000 | 300_000 |
| TCP NIO | 620_000 | 250_000 |
| WebSocket | 750_000 | 550_000 |
| Http-Long Poll | 230_000 | 140_000 |
| Http-Long Poll JSON | 80_000 | 50_000 |

Encodings

The following encodings are available (they all directly support Java's Serializable and Externalizable interfaces, so little to no extra work for application code):

  • fast serialization binary (java to java only). Best performance.
  • minbin binary json (java to java, java to javascript).
  • Json (java to java, java to javascript).
  • more might be added in the future if needed (e.g. protobuf, sbe)

Clients

The following standalone clients (no dependency to kontraktor-core.jar) exist:

  • JavaScript actor client (js4k.js). Allows to back a Single Page Application with Kontraktor Actors with a few lines of code (see examples). Supports json encoding only. Can transparently choose using WebSockets or Http-LongPoll or plain Http (no server push then).
  • Android / Java 7 compatible kontraktor-bare client library. Pure client, does not support definition and publishing of actors; kontraktor is not included as a dependency.

Kontraktor's distributed Actor model

Kontraktor implements an asynchronous Actor / Eventloop model similar to what is known from e.g. Node.js and many network centric libraries. This model has huge scaling and performance advantages over traditional synchronous thread-blocking approaches, especially when dealing with cascaded service calls (service calls service calls service) and high connection latency (e.g. cross data center, WAN).

Actor: A class having an asynchronous public API, sits behind a queue (=mailbox) and has an associated thread.

Example: A 'Naming Service' published over WebSockets, Http and TCP.

public class HelloActor extends Actor<HelloActor> {
    String myName = "Kevin";
    public void setMyName( String name ) {
        if ( ! "Rüdiger".equals(name) )
            myName = name;
    }
    public IPromise<String> getMyName() {
        return new Promise<>(myName);
    }
    public void streamMyName( Callback<Character> channel ) {
        myName.chars().forEach( ch -> channel.stream( (char) ch) );
        channel.finish();
    }
    public static void main(String a[]) {
        HelloActor myService = AsActor(HelloActor.class);

        // as websocket service, fast serialized
        new WebSocketPublisher()
            .facade(myService)
            .hostName("localhost")
            .urlPath("/hello")
            .port(8080)
            .serType(SerializerType.FSTSer)
            .publish();

        // as http long poll service, json encoding
        new HttpPublisher()
            .facade(myService)
            .hostName("localhost")
            .urlPath("/hellohttp")
            .port(8080)
            .serType(SerializerType.JsonNoRefPretty)
            .publish();

        // as tcp nio service, fast serialized
        new TCPNIOPublisher()
            .facade(myService)
            .port(6789)
            .serType(SerializerType.FSTSer)
            .publish();

        // there is also TCPPublisher - blocking IO, but highest throughput if few clients.
    }
}

The example also shows the 3 supported asynchronous actor method signatures:

  • 'simple void'

    an abstraction for a fire-and-forget message (the method parameters make up the message data)

  • 'returns IPromise'

    an abstraction for (asynchronous) request-response

  • 'void with callback in parameters'

    an abstraction for a stream of results.

For remoting, all arguments must of course be Serializable.

Client side: (connects the HelloActor via websocket)

ConnectableActor connectable = 
    new WebSocketConnectable(HelloActor.class,"http://localhost:8080/hello")
        .serType(SerializerType.FSTSer);

HelloActor remote = (HelloActor) 
    connectable
      .connect(null /*disconnect handler*/)
      .await();
remote.getMyName().then( name -> {
    System.out.println(connectable.getClass().getSimpleName()+" "+name);
});

How to publish as server and connect as client

All implementors of ActorPublisher can be used to "publish" an Actor on the network.

ActorPublisher (org.nustaq.kontraktor.remoting.base)
  WebSocketPublisher (org.nustaq.kontraktor.remoting.websockets)
  TCPNIOPublisher (org.nustaq.kontraktor.remoting.tcp)
    TCPPublisher (org.nustaq.kontraktor.remoting.tcp)
  HttpPublisher (org.nustaq.kontraktor.remoting.http)

All implementors of ConnectableActor can be used to connect to a remote actor ("facade").

ConnectableActor (org.nustaq.kontraktor.remoting.base)
  TCPConnectable (org.nustaq.kontraktor.remoting.tcp)
  LocalConnectable (org.nustaq.kontraktor.remoting.base)
  HttpConnectable (org.nustaq.kontraktor.remoting.http)
  WebSocketConnectable (org.nustaq.kontraktor.remoting.websockets)

For http/websockets there is also the Http4k class, offering file serving and automatic js script aggregation and minification.

Remote References

Consider the following actor implementation making up a Service for GUI clients:

public class MyHttpApp extends Actor<MyHttpApp> {
    [...]
    public IPromise<MyHttpAppSession> login( String user, String pwd ) {
        Promise<MyHttpAppSession> result = new Promise<>();
        if ( "admin".equals(user) ) {
            // deny access for admin's :)
            result.reject("Access denied");
        } else {
            MyHttpAppSession sess = AsActor(MyHttpAppSession.class,clientScheduler);
            sess.setThrowExWhenBlocked(true);
            sess.init( self() );
            result.resolve(sess);
            sessionCount++;
        }
        return result;
    }
    [...]
}

A client first connects to the "facade" actor MyHttpApp, then sends a login(u,p) message (aka calls login() on the remote reference after connecting).
If the authentication is successful, at server side a session actor is created (sharing a thread with other session actors) and a reference to the session actor is sent back.

The serialization layer of kontraktor (fast-serialization) detects the actor reference during encoding and automatically registers an id to later route messages targeted at the actor reference behind the scenes.

client code would look like this:

ConnectableActor connectable =
    new WebSocketConnectable(MyHttpApp.class, "ws://localhost:8080/ws")
        .serType(SerializerType.JsonNoRef);

connectable
    .connect( null ).then( (app,err) -> {
        myApp = (MyHttpApp) app;
        myApp.login("dummy", "dummy").then( (session, error) -> {
            System.out.println("** login complete: "+session+" **");
        });
    });

or, less asynchronous (using await, caveat!):

ConnectableActor connectable =
    new WebSocketConnectable(MyHttpApp.class, "ws://localhost:8080/ws")
        .serType(SerializerType.JsonNoRef);

MyHttpApp myApp = (MyHttpApp) connectable.connect(null).await(); 
MyHttpAppSession session = myApp.login("dummy", "dummy").await();
System.out.println("** login complete: "+session+" **");
// errors are transformed to exceptions by 'await()'

The client now holds two remote references: myApp and session.

using js4k.js typeless actor client code

Actor messages are sent using tell for fire-and-forget and ask for request/response. Any javascript function passed as a parameter to tell() or ask() is translated into a Callback on the server side:

jsk.connect("ws://localhost:8080/ws","WS")
  .then( function( app, error ) {
    server = app;
    server.ask("login", "clientuser", "clientpwd")
      .then( function(mySession,err) {
        console.log("login successful");
      });
  });

(all samples without error handling for simplicity)

Passing RemoteRef's to other Remote Actors

Consider three Actors running in different processes A,B,C. A has a network connection to B, B has a network connection to C. B then passes its C-remote-reference to A. A sends a message to it.

The message A sends to its C-RemoteRef is transparently routed via the B actor (peer-to-peer model).

public static class A extends Actor<A> {
    public void showChain( ConnectableActor b ) {
        B bref = (B) b.connect(null).await();
        C cref = bref.getC().await();
        String pok = cref.hello("POK").await();
        System.out.println("received "+pok);
    }
}

public static class B extends Actor<B> {
    C c;
    public void init(ConnectableActor connectable) {
        connectable.connect( null ).then( c -> this.c = (C) c);
    }
    public IPromise<C> getC() {
        return new Promise<>(c);
    }
}

public static class C extends Actor<C> {
    public IPromise<String> hello(String s) {
        return new Promise<>("Hello:"+s);
    }
}

public static void main(String[] args) throws InterruptedException {

    // though a,b,c run inside single process use remote refs to interconnect
    A a = Actors.AsActor(A.class);
    B b = Actors.AsActor(B.class);
    C c = Actors.AsActor(C.class);

    new TCPPublisher(a, 4001).publish();
    new TCPPublisher(b, 4002).publish();
    new TCPPublisher(c, 4003).publish();

    ConnectableActor cConnect = new TCPConnectable(C.class,"localhost",4003);
    ConnectableActor bConnect = new TCPConnectable(B.class,"localhost",4002);

    b.init(cConnect);
    Thread.sleep(500); // don't program like this, init should return promise ..

    a.showChain(bConnect);
}

Although this can be useful at times, it's not recommended as a default pattern: it increases network traffic, and disconnects/failures are harder to handle. It's best to prefer a peer-to-peer service/actor interface design (which also gives better decoupling).

If A and C live in the same network, it's preferable to let B pass a ConnectableActor of C to A, so A can connect directly to C.

However if network infrastructure is such that A cannot directly connect to C, then ReferenceChaining is a transparent way to make B act as a gateway/proxy node providing access to outside remote actors (like C).

If actors run inside a single process, they can share ActorRef's without chaining effects of course.

Passing ConnectableActors to an Actor's init() should be the preferred way to interconnect actors, as there is also a 'LocalConnectable' implementation if one wants to run everything in a single process. When designing your app like this, it's easy to determine the level of system distribution by app startup configuration.

Limits of automatism

It's very important to understand that both scheduling and remoting only scan and transform the signature of an async method. E.g. if a Callback or Promise is passed as an argument from one Actor to another, kontraktor automatically creates a wrapper such that callback invocations done in a foreign thread result in an event on the callback queue of the sender (thread safe callbacks).
Similarly, if the same call is done on a remote reference, a remote forwarding wrapper is generated such that invocation of a callback (or promise resolve) is transmitted over the network to the receiver.

These transformations are not applied to embedded objects, as this would require kontraktor to iterate the full object graph of each parameter of an async method call (too slow). For remoting a full traversal is performed anyway, so it could be done there, but it's more important to have identical behaviour regardless of whether an actor is remote or not.

Example:

public class OtherActor extends Actor<OtherActor> {
    public IPromise doSomething( Callback cb ) {
        cb.stream("A"); cb.stream("B"); cb.finish();
        return new Promise<>("done"); // resolve the promise so the sender's then() fires
    }
}

public class SendingActor extends Actor<..> {

HashMap plainMap = new HashMap();      // <= This
Callback sampleCallback = (res,err) -> {
   // some code                           <= This
};
otherActor.doSomething( sampleCallback ).then( result -> {
   plainMap.put( "Yes", "you can" );   // and this run in the sender Actor's thread !
});

}

If OtherActor and SendingActor run on different machines, this works transparently as well. Note that thread safe callbacks/promises are a unique feature of kontraktor. Though it might seem a mere detail, in practice this has big implications, as accidental multithreading does not fail immediately. In most cases, related errors and data corruption show up under load/scale-out conditions. Expensive production failures and long-winded debug sessions put a big price tag on this one.
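The thread redirection of callbacks can be approximated in plain Java. The following is only a sketch of the idea, not Kontraktor's implementation: each "actor" is modelled as a single-threaded executor, and `inThreadOf` is a hypothetical helper that wraps a callback so its body always runs on the owner's thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Illustration only: each "actor" is modelled as a single-threaded executor.
class CallbackRedirectDemo {

    // Wrap a callback so that invoking it from any thread enqueues the
    // actual work on the owner's executor (the owner's "mailbox").
    static <T> Consumer<T> inThreadOf(ExecutorService owner, Consumer<T> callback) {
        return value -> owner.execute(() -> callback.accept(value));
    }

    // Returns the names of the threads on which the callback body ran.
    static List<String> run() {
        ExecutorService sender = Executors.newSingleThreadExecutor(r -> new Thread(r, "sender"));
        ExecutorService receiver = Executors.newSingleThreadExecutor(r -> new Thread(r, "receiver"));
        List<String> seenOn = new ArrayList<>();   // plain, unsynchronized sender state
        CountDownLatch done = new CountDownLatch(2);
        try {
            Consumer<String> cb = inThreadOf(sender, item -> {
                seenOn.add(Thread.currentThread().getName());
                done.countDown();
            });
            // The receiver streams two results from its own thread.
            receiver.execute(() -> { cb.accept("A"); cb.accept("B"); });
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sender.shutdown();
            receiver.shutdown();
        }
        return seenOn;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [sender, sender]
    }
}
```

Although the receiver invokes the callback from its own thread, the list is only ever modified on the sender thread, which is why no locking is needed around it.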

So now for the pitfall:

public class Pitfall implements Serializable {
    public Pitfall( Promise p, Callback cb ) { .... }
    Promise aPromise;
    Callback cb;
}

public class OtherActor extends Actor<OtherActor> {
    public void doSomething( Pitfall pitfall ) {
        pitfall.cb.stream("I'll come in a foreign thread");
        pitfall.cb.stream("I'll come in a foreign thread");
        pitfall.cb.finish();
        pitfall.aPromise.resolve("I'll come in a foreign thread");
    }
}

public class SendingActor extends Actor<..> {

HashMap plainMap = new HashMap();
Callback mycb = (res,err) -> plainMap.put( "No", "production killer" );
Promise myprom = new Promise();        // Promise is a class, so it cannot be written as a lambda
myprom.then( (res,err) -> { .. } );
otherActor.doSomething( new Pitfall(myprom, mycb) );

}

Implications for local (not remoted) execution:

As the instrumentation does not perform a deep scan (too expensive) of parameter objects, the callbacks will be called in the thread of the receiver (e.g. OtherActor). So if callback code modifies internal actor state in SendingActor, synchronization is required, as there is now concurrent access to shared data.

Implications for remoted execution:

The sample won't work as expected. The Pitfall class gets serialized. If a callback is invoked, nothing (or an exception) will happen, as the callbacks have not been replaced by "network forwarding" wrappers. The remote side is actually operating on a plain copy of the Pitfall instance.

This limitation does not apply to Actor References. These can be embedded deep into an object graph and still remoting and thread redirection mechanics will work out as expected.

Workarounds?

It's possible to break down parameters containing embedded async primitives in order to provide a clean and simple interface; however, this is advanced stuff and requires some understanding of kontraktor's internal mechanics.

Outline: Mark doSomething( Pitfall ) as @CallerSideMethod so it's executed synchronously on the caller's side. Then decompose the Pitfall instance into plain signatures, e.g.

@CallerSideMethod public void doSomething( Pitfall pitfall ) {
   // executed synchronously at sender (caller) side: decompose to primitives
   // so they appear on the signature of a regular async method
   __doSomething( pitfall.cb, pitfall.aPromise );
}

public void __doSomething( Callback pitfallCB, IPromise pitfallPromise ) {
    // executed at receiver side; callback and promise are now wrapped
    // with proper thread redirection / remote forwarding
}

An example can be found in kontraktor's (remoteable) implementation of the reactive streams spec. This can be tricky, so stick to simple actor method signatures by putting async primitives directly onto an async method's signature. It's not a hard restriction in practice, but more a matter of reduced aesthetics.

Another possibility would be the use of inThread( Callback ). I still have to try this myself; in theory it should work :).

Possible improvements in the future

The main reason for the restriction comes from functional interfaces. If Callback could be made an abstract class, it could be recognized and handled by serialization for the remote case; for the local case, thread switching transformations could be done dynamically. However, lambda syntax cannot be applied to abstract classes, even those with a single abstract method. Maybe some advanced trickery with default methods can solve this for future releases of kontraktor.

Classpath and distributed deployment

As kontraktor uses (fast-)serialization for marshalling/unmarshalling, distributed actor systems are version coupled: each node requires all classes involved in communication (sent between remoted actors) on its classpath.

In order to update a single node only, care has to be taken not to change classes which are part of the protocol.

Popular belief is that this is a serialization-specific problem. However, if one thinks about it:

Systems communicating need an agreement about the structure of data they exchange.

If binary formats such as protocol buffers are used, one needs to share the message definition files; for json it's common to have the agreement "in the code" (probably the worst option long term).

Technical background

Kontraktor's remoting works via reflection, so it's possible to use a kind of duck typing: as long as a method can be found on the receiver side, it will be invoked.
This way it's possible to add methods (=messages) to an Actor's interface without having to update existing remote deployments. However, classes used as parameters in an actor's public remote methods need to stay compatible.
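The duck typing idea can be shown with plain Java reflection. This is a minimal sketch under the assumption that a message boils down to a method name plus arguments; `ReflectiveDispatch`, `ServiceV1` and `ServiceV2` are made-up names, and Kontraktor's actual dispatch is more involved.

```java
import java.lang.reflect.Method;

// Illustration only: a receiver that resolves incoming messages by method name.
class ReflectiveDispatch {

    // An older deployment knowing only hello().
    static class ServiceV1 {
        public String hello(String name) { return "Hello " + name; }
    }

    // A newer deployment that added a message without touching hello().
    static class ServiceV2 extends ServiceV1 {
        public String bye(String name) { return "Bye " + name; }
    }

    // Look up a public method by name and argument count, then invoke it.
    static Object dispatch(Object receiver, String methodName, Object... args) {
        for (Method m : receiver.getClass().getMethods()) {
            if (m.getName().equals(methodName) && m.getParameterCount() == args.length) {
                try {
                    return m.invoke(receiver, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("invocation of " + methodName + " failed", e);
                }
            }
        }
        throw new IllegalArgumentException("no such message: " + methodName);
    }

    public static void main(String[] args) {
        // Existing callers of hello() are unaffected by the newly added bye().
        System.out.println(dispatch(new ServiceV1(), "hello", "kontraktor"));
        System.out.println(dispatch(new ServiceV2(), "hello", "kontraktor"));
        System.out.println(dispatch(new ServiceV2(), "bye", "kontraktor"));
    }
}
```

Sending "bye" to a node still running ServiceV1 fails at dispatch time, which is the price of resolving messages by name instead of by a shared compiled interface.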

Options

So here are the options to decouple kontraktor services protocol/classpath/versioning wise:

  1. Connect via kontraktor-bare or js4k.js. Applicable for client/server patterns: clients talk to the service actor using json.
    For server side system-node interconnect, Json is probably too slow. In addition, one just moves the interface agreement "into the code", as the code expects to receive certain json structures.
  2. Separate remoted classes from other application code. E.g. define a protocol package + .jar artifact containing all pojos and classes that are part of remoted actor interfaces. Only the protocol .jar needs to be shared and kept stable across nodes. See https://github.com/RuedigerMoeller/kontraktor/blob/trunk/examples/non-shared-classpath/common-interface/src/main/java/service/common for an example. This creates quite some boilerplate overhead, so make sure total version and service decoupling is actually a required feature.
  3. Recommended: Accept the constraint and keep your code organized (e.g. use an extra package for remoted classes). It's possible to add new methods to an actor interface without breaking compatibility, and you can change the implementation of existing ones. In case data structures have to be changed, consider creating new classes, e.g. x.y.protocol.v2.MyClass extends x.y.protocol.v1.MyClass.
    This is sufficient to apply fixes and minor enhancements without redeployment of all nodes. Use major releases to do breaking changes.
  4. Simplest: Always redeploy all nodes in case of software changes. This requires a short downtime or can be managed seamlessly with some reverse proxy (for webapps) trickery.

Of course these approaches can be mixed depending on the role of an actor.

Design advice

If you have used or read about other actor systems, notice that Kontraktor has a different approach.

  • Kontraktor favours a coarse grained Actor design. Its primary target is to partition applications into distributable/separatable async components. Think of a kontraktor Actor more like a Node.js instance or something like Dart's Isolates. You'll be astonished what can be done on a single thread with nonblocking programming/app design.

  • Replace threads by actors (not classes).

  • Frequently an app will consist of <10 Actors, often only 1.

  • There are patterns where creation of many actors (sharing a handful of threads) makes sense:

    • 'Sessions' in a server role actor having many clients (see HttpApp examples), each represented by an Actor instance.
    • Simulation alike applications.
    • Note: queue sizes (default is 32k) need to be reduced when running many actors.

Reasoning:

  • Asynchronous programs can be hard to debug and understand, so limit async programming to the most rewarding use cases (remote messaging, bulkheading, simplify complex multithreading).
  • Fine grained Actor design creates significant performance overhead as many method calls are replaced by messages then.
  • Kontraktor aims for a deterministic threading model (messages are not scheduled on a pool or a similar automatic machinery). This is less ambitious in favour of simplicity and easier reasoning. It has advantages, e.g. promise callbacks are executed on the sending actor's thread (low risk of accidental concurrency). One can "close over actor state" safely. Execution is mostly single threaded, resulting in excellent serial performance without drawbacks to scalability/concurrent processing. Scale out is then done by intent, not automatically.

"Experimentation" with real world teams and applications made me choose this model (kontraktor 1.0 was simple, 2.0 had a complex autoscaling scheduler, 3.0 is simple again).
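To make "replace threads by actors" concrete, here is a minimal plain-Java sketch of the single-threaded model. It is not Kontraktor code: `CounterActor` is a hypothetical class whose mailbox is a single-threaded executor, so its state is only ever touched by one thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: one thread per "actor", all state touched only on that thread.
class CounterActor {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private int count;   // no synchronization: only the mailbox thread reads/writes it

    // Fire-and-forget message.
    void increment() {
        mailbox.execute(() -> count++);
    }

    // Request/response message; the result is delivered through a future.
    CompletableFuture<Integer> getCount() {
        return CompletableFuture.supplyAsync(() -> count, mailbox);
    }

    void stop() {
        mailbox.shutdown();
    }

    // Sends increments from several threads and returns the final count.
    static int demo(int senders, int perSender) {
        CounterActor actor = new CounterActor();
        Thread[] threads = new Thread[senders];
        for (int i = 0; i < senders; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perSender; j++) actor.increment();
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
            // Enqueued after all increments, so it observes the final value.
            return actor.getCount().join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            actor.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // prints 4000
    }
}
```

Four threads send messages concurrently, yet the counter needs no lock because every increment is processed sequentially by the mailbox thread.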

Modern WebApps with JavaScript interop & Http4k

Example projects:

Simple (no html import support):

https://github.com/RuedigerMoeller/kontraktor/tree/trunk/examples/http-ws-javascript

SPA's using html import (shim) with polymer, knockout, jquery ..

https://github.com/RuedigerMoeller/kontraktor-polymer-example

https://github.com/RuedigerMoeller/kontraktor-spa-example

js4k.js

js4k implements an untyped actor interface to kontraktor services (actors). actorRef.tell sends a message fire-and-forget, actorRef.ask can be used for messages returning a Promise. Automatic callback routing works the same as with kontraktor remoting.
Both WebSockets and Http LongPolling are supported via a unified API.

With Kontraktor/Http4k/js4k "server push" is the default: it's always possible to send a message to a client session actor. Still, it scales well; serving thousands of clients with a handful of threads is not an issue (non-blocking IO).

Java Objects are translated into json object structures. Vice versa there are helpers to construct json objects which are "understood" by the server. js4k does not support shared references in encoding, so the service actor has to be published with JsonNoRef encoding option.

Example of connecting to a service (actor), obtaining an actor reference to a session actor and doing some calls on it:

var server = null;
var errCB = function( err ) { console.error(err); };
//    jsk.connect("ws://localhost:8080/ws","WS",errCB) // use this for websockets
  jsk.connect("http://localhost:8080/api","HTLP",errCB) // use this for long poll
  .then( function( app, error ) {
    if ( ! app ) {
      console.log("connection failure");
      console.error(error);
      return; // no facade proxy available, nothing more to do
    }
    server = app;

    server.ask("login", "clientuser", "clientpwd")
      .then( function(mySession,err) {
        if ( err )
          console.log(err);
        else {

          mySession.ask("getToDo").then( function( res,err ) {
            var li = "<ul>";
            for ( var i=0; i < res.length; i++ )
              li += "<li>"+res[i]+"</li>";
            li += "</ul>";
            document.getElementById("todo").innerHTML=li;
          });

          mySession.tell( "streamToDo", "p", function( res, err ) {
            if ('FIN' !== err )
              document.getElementById("filtered").innerHTML += "'"+res+"',&nbsp;";
          });

          // subscribe to server time stream (push)
          mySession.tell( "subscribe", function(e,r) {
            var tim = document.getElementById("time");
            tim.innerHTML=e;
          })
        }
    })
});

server side Main Actor (stateless server facade):

public class MyHttpApp extends Actor<MyHttpApp> {
    [...]
    public IPromise<MyHttpAppSession> login( String user, String pwd ) {
        Promise result = new Promise<>();
        if ( "admin".equals(user) ) {
            // deny access for admin's
            result.reject("Access denied");
        } else {
            MyHttpAppSession sess = AsActor(MyHttpAppSession.class,clientSched);
            sess.setThrowExWhenBlocked(true);
            sess.init( self() );
            result.resolve(sess);
        }
        return result;
    }
}

server side Session Actor (stateful, one instance per client):

public class MyHttpAppSession extends Actor<MyHttpAppSession> implements RemotedActor {
    [...]
    public IPromise<ArrayList<String>> getToDo() {
        return resolve(toDo);
    }
    public void streamToDo( String filter, Callback cb ) {
        toDo.forEach( item -> {
            if ( item.indexOf(filter) >= 0 ) {
                cb.stream(item);
            }
        });
        cb.finish();
    }
    public void subscribe( Callback cb ) {
        subscription = cb;
    }
    public void unsubscribe() {
        if ( subscription != null ) {
            subscription.finish();
            subscription = null;
        }
    }
    public void pushEventLoop() {
        if ( ! isStopped() ) {
            if ( subscription != null ) {
                subscription.stream(new Date().toString()+", "+getScheduler().getNumActors()+" Session Actors");
            }
            delayed( 2000, () -> pushEventLoop() );
        }
    }
}

Note how the JavaScript callback functions / promises are seamlessly transformed to server side callbacks / promises.

Builder style setup of server, simple and clean:

// create server actor
MyHttpApp myHttpApp = AsActor(MyHttpApp.class);

Http4K.Build("localhost", 8080)
    .fileRoot("/", root)
    .fileRoot("/jsk", jsroot)
    .httpAPI("/api", myHttpApp)
        .serType(SerializerType.JsonNoRef)
        .setSessionTimeout(30_000)
        .build()
    .websocket("/ws", myHttpApp)
        .serType(SerializerType.JsonNoRef)
        .build()
    .build();

encoding Pojo's such that Java understands them

FST Json serialization serializes objects to json in a deterministic way. While this is convenient on the Java side, it can be somewhat clumsy for JavaScript clients. Therefore js4k simplifies json objects by flattening them and adding a _type property to denote the original Java type.

Naturally, when talking to the server, it's necessary to create json structures which are understood by the Java server. js4k has some utils for that:

  /**
   * create wrapper object to make given list a valid fst-json Java array
   */
  jsk.prototype.buildJArray = function( type, list )

  /**
   *
   * build java list style collection
   * Does not work for java Map's
   *
   * @param type - 'list' for ArrayList, 'set' for HashSet, class name for subclassed
   * @param list - list of properly structured (for java) objects
   */
  jsk.prototype.buildJColl = function( type, list )

  /**
   * builds a java hashmap from array like '[ key, val, key, val ]'
   *
   * @param type - "map" or class name if subclassed map is used
   * @param list - of key, val, key1, val1 
   */
  jsk.prototype.buildJMap = function( type, list )

  /**
   * create wrapper object to make given list a valid fst-json Java Object for sending
   * type - java type
   */
  jsk.prototype.buildJObject = function( type, obj ) 

Note that fully qualified Java class names can be avoided by registering a class mapping at the Coding instance configured in the Http4k builder.

See also the node.js example application (npm install js4k) on how to connect from node.js to a Java actor via websockets.

Misc js4k functions

connect(wsurl, connectionMode, optErrorcallback)

Connects to a Kontraktor actor via WebSocket or Http Long Poll (mode='WS' | 'HTLP' | 'HTTP'). See js4k source documentation for details.

class jsk.Promise

minimalistic js promise implementation. Note that result and error are delivered into the then function.

class jsk.KontrActor

The actor proxy implementation. There is no need to create those. Usually one obtains a proxy for the facade actor from the connect method. Other references to remote actors are then obtained by doing calls onto the facade. js4k automatically detects remote actor references and creates a proxy object on the fly.

methods:

jsk.KontrActor.tell(methodName, arg0, arg1, .. )

Calls a fire-and-forget method on the actor ref. Note: in case signatures contain more complex objects (pojos, java collections), the arguments should have the correct structure (see helpers above) to be decodable at server side. I recommend sticking to simple, plain types as much as possible to avoid trouble.

jsk.KontrActor.ask(methodName, arg0, arg1, .. )

Calls an actor method returning a Promise. The Java Promise is automatically transformed into a jsk.Promise (see samples above).

If a function(res,err) {..} is put onto an argument list, it will be transformed to a Kontraktor Callback instance at server side.

Polymer WebComponents and server side Html Import shim

Dependency resolution of JavaScript libraries can be really painful and tends to end up in mess and redundancy (source version, minified version, aggregates of different .js libs), with many copies of the same libraries.

Kontraktor Http4k addresses that:

  • provides kind of a (server side) classpath for html resources (js + html snippets + css)
  • during development use Html Imports (devmode). Currently test+debug best with Chrome
  • in production mode, Kontraktor Http4k inlines, aggregates and minifies html import statements + scripts automatically resulting in a very low number of http requests and size. This also lets your webapp work in browsers lacking html import support.

Eric Bidelman has some pretty strong arguments for using html imports as dependency mechanism + some in depth explanations

Resource Path

The resource path is similar to Java's classpath; however, Kontraktor's Http4k uses it to locate html import resources. Note that you can expose more than one resource path on different url roots.

Detail documentation pending, check examples for now

https://github.com/RuedigerMoeller/kontraktor-polymer-example

https://github.com/RuedigerMoeller/kontraktor-spa-example

DevMode

In devmode, http4k does not resolve + cache html imports, so changes in files are reflected immediately. Scripts are not aggregated + minified, as it's hard to debug inside a huge file containing all your JavaScript source.

ProdMode

In prodmode, html imports are inlined server side and the result is cached in-memory. JavaScript is minified and all files are compressed. This way it's unnecessary to maintain debug and .min.js versions of files; in addition, compressing one big file is more efficient. Even large code bases using many libs reduce to some 100-300kb and are loaded with a single request.

File serving

A js server without the ability to serve files isn't that useful, so there is a shorthand API (mostly a wrapper around the Undertow API). In general it's possible to directly access and configure Undertow instances if more specific setups are needed; it just does not make sense to build yet another wrapping layer.

Logging

Kontraktor does not bring its own logger. It asynchronously logs to sysout by default (Log actor). Plug in your favourite logging api by setting Log.Lg.setLogWrapper(yourLogWrapper) at app startup.

public static interface LogWrapper {
    public void msg(Thread t, int level, Object source, Throwable ex, String msg);
}
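As an illustration, a wrapper forwarding to java.util.logging could look roughly like the sketch below. The interface is copied from above so the sketch compiles on its own; `JulLogWrapper` and `toJulLevel` are hypothetical names, and the mapping of the int level to a logging level is an assumption that should be checked against Kontraktor's Log class.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Copied from the interface shown above so the sketch compiles on its own.
interface LogWrapper {
    void msg(Thread t, int level, Object source, Throwable ex, String msg);
}

// Forwards log events to java.util.logging.
class JulLogWrapper implements LogWrapper {

    // ASSUMPTION: levels are small ascending integers (0 = debug ... 3 = error).
    // Verify against the constants actually used by Kontraktor's Log class.
    static Level toJulLevel(int level) {
        switch (level) {
            case 0:  return Level.FINE;
            case 1:  return Level.INFO;
            case 2:  return Level.WARNING;
            default: return Level.SEVERE;
        }
    }

    @Override
    public void msg(Thread t, int level, Object source, Throwable ex, String msg) {
        // Use the class of the logging source as logger name when available.
        String loggerName = source == null ? "kontraktor" : source.getClass().getName();
        String threadName = t == null ? "?" : t.getName();
        Logger.getLogger(loggerName).log(toJulLevel(level), "[" + threadName + "] " + msg, ex);
    }
}
```

Such a wrapper would then be installed once at startup via Log.Lg.setLogWrapper(..) as described above.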

What about configuration files ?

There are none. Everything is configured using builder style API. Imo it does not make sense to provide a huge configuration layer allowing to modify everything and the kitchen sink. You as an application developer want to control what to figure out automatically, what can be changed and what is untouchable for operation engineers.

It's a matter of minutes to create KSon config support from a (strongly typed) corresponding class. see: https://github.com/RuedigerMoeller/fast-serialization/wiki/KSon

Security

unremotable Actor methods

Actor methods annotated with "@Local" cannot be called from remote. This is implemented at receiver side in central dispatch, so there is no way of bypassing this from remote.
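The principle of enforcing this at the receiver can be sketched with a marker annotation checked during dispatch. This is not Kontraktor's code: `LocalOnly` is a hypothetical stand-in for @Local, and `dispatchRemote` is a made-up entry point for messages arriving from the network.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical marker annotation standing in for Kontraktor's @Local.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface LocalOnly {}

class GuardedDispatch {

    static class Service {
        public String status() { return "ok"; }

        @LocalOnly
        public String shutdown() { return "stopped"; }
    }

    // Receiver-side entry point for messages that arrived over the network.
    // The check happens here, so a client cannot bypass it.
    static Object dispatchRemote(Object receiver, String methodName) {
        try {
            Method m = receiver.getClass().getMethod(methodName);
            if (m.isAnnotationPresent(LocalOnly.class)) {
                throw new SecurityException(methodName + " cannot be called from remote");
            }
            return m.invoke(receiver);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot dispatch " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        System.out.println(dispatchRemote(service, "status"));
        try {
            dispatchRemote(service, "shutdown");
        } catch (SecurityException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the decision is taken where the message is decoded, a manipulated client can at most send the request; it is rejected before the method runs.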

SSL

(Server Side) SSL-Encryption is supported for Http and WebSocket Remoting. Client Certificate support can be added as the underlying Undertow Webserver currently used actually supports this. Subclassing the Http4k enables arbitrary tweaking of the Undertow configuration and setup used.

In general it's recommended to use a reverse proxy such as HAProxy or NginX in order to shield your webapps. This is also the easiest way to add SSL encryption.