
Reactor tutorial | reactively handling async calls with QBit Reactive Microservices

Richard Hightower edited this page Sep 9, 2015 · 17 revisions

This tutorial covers the concepts behind async / reactive programming with the QBit Microservices Lib. Since QBit embraces the microservices architecture ethos, it embraces being async and allows services to reactively handle async calls.

-- -example code- -qbit docs-

Mammatus Tech

QBit reactive programming with the Reactor

The problem with microservices

Distributed systems are complex.

"Microservices imply a distributed system." (Microservices - Not A Free Lunch!)

And this means to scale we need asynchronous calls.

"Where before we might have had a method call acting as a subsystem boundary, we now introduce lots of remote procedure calls, REST APIs or messaging to glue components together across different processes and servers." (Microservices - Not A Free Lunch!)

This introduces all sorts of coordination issues.

"Once we have distributed a system, we have to consider a whole host of concerns that we didn't before. Network latency, fault tolerance, message serialisation, unreliable networks, asynchronicity, versioning, varying loads within our application tiers etc." (Microservices - Not A Free Lunch!)

Coordinating asynchronous calls is difficult.

"However, when things have to happen synchronously or transactionally in an inherently Asynchronous architecture, things get complex with us needing to manage correlation IDs and distributed transactions to tie various actions together." (Microservices - Not A Free Lunch!)

You can't really escape asynchronous calls, because synchronous calls are considered harmful according to the Microservices paper by Martin Fowler et al., which we discuss in some more detail below.

"Any time you have a number of synchronous calls between services you will encounter the multiplicative effect of downtime. Simply, this is when the downtime of your system becomes the product of the downtimes of the individual components. You face a choice, making your calls asynchronous or managing the downtime." (Microservices paper by Martin Fowler et al).
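To make the multiplicative effect concrete, here is a small back-of-the-envelope sketch. It assumes that each service fails independently and that a request needs every service in the chain to be up. The class name and the 99.9% figures are illustrative assumptions, not numbers from the paper.

```java
final class CompositeAvailability {

    /** Availability of a chain of synchronous calls, assuming independent failures. */
    static double chainAvailability(final double... availabilities) {
        double result = 1.0;
        for (final double availability : availabilities) {
            result *= availability; // every service in the chain must be up
        }
        return result;
    }

    public static void main(final String... args) {
        // Three services at 99.9% each (illustrative numbers).
        final double combined = chainAvailability(0.999, 0.999, 0.999);
        System.out.printf("Combined availability: %.4f%%%n", combined * 100);
    }
}
```

Under these assumptions three services at 99.9% each give a chain that is available roughly 99.7% of the time, and every additional synchronous hop lowers the figure further.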

QBit provides support for coordinating asynchronous calls and avoiding cascading failures.

If you see a tutorial that makes a lot of synchronous calls and does not discuss eliminating cascading failures and the need for asynchronous programming, realize that you are not really reading a microservice tutorial; you are most likely reading a tutorial on how to expose your classes as REST. Microservices is more. This is a microservices tutorial.

Reactive Microservices Background

One of the key tenets of a microservices architecture is the ability to be asynchronous. This is important because you want to make the best use of your hardware. There is little point in starting up thousands of threads that are waiting on IO. Instead you can have fewer CPU threads and use an async model.

An asynchronous programming model is not in and of itself a reactive programming model. You need an asynchronous model before you can have a truly reactive model. In order to have a reactive model, you need to be able to coordinate asynchronous calls.

Imagine you have three services. One of the services is client facing. By client facing, we mean that it is the endpoint that the client app talks to, whether that app is on the public web or is an internal app. Let's call this client-facing service Service A.

For example, let’s say Service A performs an operation on behalf of the client, and this operation needs to call Service B. After it calls Service B, it needs to take the result of Service B and call Service C. Service A then takes the combined results of Service B and Service C and returns them to the client. These are, of course, all nonblocking asynchronous calls.

Let’s summarize. The Client calls a method on Service A. Service A calls a method on Service B. When the result from the Service B method invocation comes back, Service A calls a method on Service C, passing the results from Service B as a parameter. Service A then processes the combined results from Service B and Service C and finally passes the response to the Client.
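The call sequence can be sketched in plain Java. This is a minimal illustration that uses the JDK's CompletableFuture rather than QBit's Callback and Reactor, and the method names callServiceB, callServiceC and serviceA are hypothetical stand-ins for real remote calls.

```java
import java.util.concurrent.CompletableFuture;

final class CallSequenceSketch {

    /** Stand-in for a nonblocking call to Service B. */
    static CompletableFuture<String> callServiceB(final String request) {
        return CompletableFuture.supplyAsync(() -> "B(" + request + ")");
    }

    /** Stand-in for a nonblocking call to Service C that needs B's result. */
    static CompletableFuture<String> callServiceC(final String resultFromB) {
        return CompletableFuture.supplyAsync(() -> "C(" + resultFromB + ")");
    }

    /** Service A: call B, then call C with B's result, then combine both. */
    static CompletableFuture<String> serviceA(final String request) {
        return callServiceB(request).thenCompose(resultFromB ->
                callServiceC(resultFromB).thenApply(resultFromC ->
                        resultFromB + " + " + resultFromC));
    }

    public static void main(final String... args) {
        // join() blocks only here, at the edge of the demo.
        System.out.println(serviceA("req").join());
    }
}
```

The lambdas capture resultFromB, which is how the context of the original call is carried forward to the point where the combined response is built.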

The reactive nature comes into play in that we need to coordinate the call to Service C to happen after the call to Service B, and we need to maintain enough of the context from the original call to return results to the original client. At this point we are still mostly talking about an asynchronous system, not really a reactive system per se. There are language constructs in Java to capture the context of the call: either a lambda expression or an anonymous class.

Where a reactive system starts to come into play is what happens if Service B or Service C takes too long to respond, or if the total operation of Service A takes too long. You need a way to detect when asynchronous calls do not come back in the allotted period of time. If you do not have this, the client can continue to hold on to a connection that is not responding, and there are hardware limitations on how many open connections you can have. Now let's say the Client is not a client app but rather another service that is calling Service A. You do not want waiting connections to back up if a downstream service like Service B stops responding. The ability to handle a non-responsive system is what makes a reactive system reactive. The system has to be able to react to things like timeouts or downstream failures.
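Detecting a call that never comes back can be sketched with the JDK alone. This is a rough illustration, not how QBit implements timeouts: the helper withTimeout, the 50 millisecond limit and the error string are all assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutSketch {

    /** Daemon timer thread so the demo JVM can exit. */
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                final Thread thread = new Thread(runnable, "timeout-timer");
                thread.setDaemon(true);
                return thread;
            });

    /** Fails the future with a TimeoutException if it is still pending after the delay. */
    static <T> CompletableFuture<T> withTimeout(final CompletableFuture<T> future,
                                                final long delay, final TimeUnit unit) {
        TIMER.schedule(() -> {
            // No effect if the future has already completed normally.
            future.completeExceptionally(
                    new TimeoutException("no response after " + delay + " " + unit));
        }, delay, unit);
        return future;
    }

    /** Service A's view: always produce a response, even when Service B never answers. */
    static String callUnresponsiveServiceB() {
        final CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        return withTimeout(neverCompletes, 50, TimeUnit.MILLISECONDS)
                .exceptionally(error -> "error: " + error.getMessage())
                .join();
    }

    public static void main(final String... args) {
        System.out.println(callUnresponsiveServiceB());
    }
}
```

The important property is that the caller always receives something, either the result or an error, within a bounded time.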

The call sequence just described is a fairly simple one. A more complicated call sequence might involve many downstream services, and perhaps calls that rely on calls that rely on calls that then decide which other calls to make. It might make sense to have a service-internal cache that can cache the results of the calls and coordinate a filtered response based on N calls. However complex the call sequence, the basic principle that you can't leave the client hanging still applies. At some point one has to determine that the call sequence is not going to be successful, and at that point a response, even if it is an error response, must be returned to the client. The main mission of a reactive system is to not have a cascading failure.

Again, the main mission of a reactive system is to not have a cascading failure.

In the case of the cache, which doesn't have to be a real cache at all, one may need a mechanism to purge the cache and/or keep it warmed up. Perhaps instead of making frequent calls to Service B, Service A can be notified by Service B via an event that an item of interest has changed, and Service A can ask ahead of time for the things it needs from Service B before the client asks for them.

Things like async call coordination, handling async call timeouts, coordinating complex async calls, populating caches based on events, and having periodic jobs to manage real-time stats and cache eviction are needed. A system that provides these things is a reactive system. In QBit the main interface to this reactive system is the Reactor.

QBit Background

QBit is a service-oriented, reactive microservice library. QBit revolves around having a Java-idiomatic service architecture. Your services are Java classes. These services are guaranteed to be called by only one thread at a time. Since there are strong guarantees for thread safety, your services can maintain state. Since your services can maintain state, they can do things like keep an internal cache, where the internal cache might just be a tree map, a hash map or your own data structure. Stateful services can also do things like report statistics, since they can easily have counters.

If you have a CPU-intensive service that needs to maintain state, QBit allows you to shard services in the same JVM. There are built-in shard rules to shard based on method call arguments, and you can create your own shard rules easily.
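The idea behind sharding on a method call argument can be sketched as follows. This is a hypothetical rule written for illustration and is not QBit's shard rule API: it simply maps the hash of the first argument to one of a fixed number of service instances, so that calls for the same key always land on the same instance and its state.

```java
final class ShardRuleSketch {

    /** Picks a shard index from the first method argument (hypothetical rule, not QBit's API). */
    static int shardFor(final Object firstArgument, final int shardCount) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(firstArgument.hashCode(), shardCount);
    }

    public static void main(final String... args) {
        final int shardCount = 4;
        // The same key is always routed to the same shard.
        System.out.println(shardFor("department-7", shardCount)
                == shardFor("department-7", shardCount));
    }
}
```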

Data safety can be accomplished by using tools like Cassandra or Kafka, or by simply having services that replicate to another service peer. You can set the service up so it does not mutate its internal state until a call to Kafka, an update to Cassandra or a call to a replica succeeds. The calls to the replica, async store or transactional message bus will be asynchronous calls.
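A minimal sketch of "do not mutate until the replica confirms" might look like this. The Replica interface and the class name are hypothetical, and the demo replica invokes its callback synchronously for brevity; in a real QBit service the reply would arrive asynchronously on the service's own thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

final class ReplicateThenMutateSketch {

    /** Hypothetical async replica: reports success or failure through a callback. */
    interface Replica {
        void store(String key, String value, Consumer<Boolean> callback);
    }

    private final Map<String, String> state = new HashMap<>();
    private final Replica replica;

    ReplicateThenMutateSketch(final Replica replica) {
        this.replica = replica;
    }

    /** Asks the replica first; local state changes only after it confirms. */
    void put(final String key, final String value, final Consumer<Boolean> clientCallback) {
        replica.store(key, value, stored -> {
            if (stored) {
                state.put(key, value); // mutate only after the replica confirmed
            }
            clientCallback.accept(stored);
        });
    }

    String get(final String key) {
        return state.get(key);
    }

    public static void main(final String... args) {
        final ReplicateThenMutateSketch service =
                new ReplicateThenMutateSketch((key, value, callback) -> callback.accept(true));
        service.put("dept-1", "Engineering", stored -> System.out.println("stored: " + stored));
        System.out.println(service.get("dept-1"));
    }
}
```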

QBit enables the development of in-memory services, IO-bound services, or both running in the same JVM. QBit provides a cluster event bus (using idiomatic Java, i.e., interfaces and classes), a fast batched queuing system based on streams of calls, shared services, round-robin services, and the ability to expose services via REST/JSON or WebSocket (pluggable remoting), not to mention a ServiceDiscovery mechanism so services can find peers for replication.

QBit services can automatically be enrolled in the QBit health system or the QBit stats system. QBit provides a HealthService, ServiceDiscovery, EventService and a StatService. The StatService can be integrated with StatsD to publish passive stats, or you can query the stats engine and react to the stats (counts, timings and levels). The StatService is a reactive stats system that can be clustered. It is reactive in that your services can publish to it, query it and react based on the results. You can implement things like rate limiting and react to an increased rate of something.

The ServiceDiscovery system integrates with the HealthSystem and Consul to roll up each of the internal services that make up your microservice and publish the composite availability of your microservice to a single HTTP endpoint or a dead man's switch in Consul (TTL). In short, without going into a ton of detail, QBit fully embraces microservices, down to publishing the REST interfaces as Swagger meta-data to enable API gateways.

Whether QBit is calling another async service, calling another QBit async service (remote or local), or using a pool of services to call a blocking IO service, one thing is clear: you need async call coordination.

QBit Reactor to reactively manage async microservice calls

First and foremost, the Reactor ensures that async callbacks come in on the same thread as the method calls and event publication that the ServiceQueue already handles, not on a foreign thread, so the callback handlers are thread safe. The Reactor is more or less a utility class to manage async calls and periodic jobs.

The Reactor works in concert with a ServiceQueue to manage async calls and schedule periodic jobs. Recall that events and method calls that come through a ServiceQueue are guaranteed to come in on the same thread. A ServiceQueue-based service is inherently thread safe. This is not a new idea: DCOM supported this with active objects and apartment-model threading, Akka supports the same concept with typed Actors, and the LMAX architecture for trading uses the same principle (although souped up and highly optimized for high-speed trading). As it turns out, CPUs are fairly fast, and you can do a lot of operations per second on a single thread, often quite a bit more than the IO hardware can handle.

Thus, if both events and method calls come in on the same thread, what happens when we call into another service or use a library that has a callback or some sort of async future? The callback or async future will come back on a foreign thread. We need a way to get that callback to come back on the same thread as the ServiceQueue. This is where the Reactor comes into play. The Reactor ensures that callbacks happen on the same thread as the Service running in a ServiceQueue.

If you adopt the QBit model, you embrace the fact that services can be stateful, even if the state is only counters and caches. You are in effect embracing in-memory services. This does not force you to manage state in a Java class, but it allows you to manage state and makes things like counters and stats collection child’s play.

The missing link is managing callbacks so that they also come back on the same thread as the ServiceQueue. The Reactor allows callbacks to be handled like events and method calls.
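The mechanism can be illustrated with a toy class. This MiniReactor is a hypothetical sketch of the idea and is not QBit's Reactor implementation: a callback invoked on a foreign thread only enqueues work, and the owning thread runs that work when it calls process.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

final class MiniReactor {

    /** Callbacks that arrived from foreign threads and await the owner thread. */
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    /** Wraps a handler so that invoking it from any thread only enqueues the work. */
    <T> Consumer<T> wrap(final Consumer<T> handler) {
        return value -> pending.add(() -> handler.accept(value));
    }

    /** Called by the owner thread; runs every queued callback on that thread. */
    void process() {
        Runnable callback;
        while ((callback = pending.poll()) != null) {
            callback.run();
        }
    }

    public static void main(final String... args) {
        final MiniReactor reactor = new MiniReactor();
        final Thread owner = Thread.currentThread();
        final Consumer<String> callback = reactor.wrap(result ->
                System.out.println(result + " handled on owner thread: "
                        + (Thread.currentThread() == owner)));

        // The reply arrives on a foreign thread; it is enqueued, not handled yet.
        CompletableFuture.runAsync(() -> callback.accept("reply")).join();

        // The handler runs here, on the owner thread.
        reactor.process();
    }
}
```

Because the handler only ever runs inside process, it can touch the service's member variables without locks, as long as process is called from the service's own thread.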

Reactor to manage async calls

package io.advantageous.qbit.reactive;

…
public class Reactor {

   
    /** Add an object that is auto flushed.
     *
     * @param serviceObject as service object that will be auto-flushed.
     */
    public void addServiceToFlush(final Object serviceObject) {
       ….
    }

    /** Add a task that gets repeated.
     *
     * @param repeatEvery repeat Every time period
     * @param timeUnit unit for repeatEvery
     * @param task task to perform
     */
    public void addRepeatingTask(final long repeatEvery, final TimeUnit timeUnit, 
                                                     final Runnable task) {

       …
   }

   public CallbackBuilder callbackBuilder() {
        return CallbackBuilder.callbackBuilder(this);
    }

    public CoordinatorBuilder coordinatorBuilder() {
        return CoordinatorBuilder.coordinatorBuilder(this);
    }

…

   public void process() {
       …
   }

}

You do not always need to create a callback via the Reactor. However, if you want to mutate the state of a Service running on a ServiceQueue, you will want to use a Reactor. The Reactor also makes it convenient to have callbacks with timeouts. Those are the two use cases for the Reactor: you want to enforce a timeout, or you want to ensure that the callback executes on the same thread as the method calls and events so that access to the member variables of the service is thread safe. The Reactor is stateful and meant to be owned by a single service actor (ServiceQueue). You must call the Reactor's process method periodically, and it is usually a good idea to do this in a @QueueCallback handler that handles limit (the batch size limit has been met for our method/event queue), idle (our method/event queue is doing nothing), and empty (our method/event queue is empty). Calling the Reactor's process method gives the reactor a chance to drain callback handlers and invoke any repeating jobs (as well as flush any collaborating service calls).
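Why process has to be called periodically becomes clearer with a toy model of timeout tracking. The class below is a hypothetical, single-threaded sketch and not QBit's code: each registered call carries a deadline, and expired calls are only noticed when the owner calls process. The clock is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class TimeoutTrackingSketch {

    /** One outstanding call: its deadline and what to run if the deadline passes. */
    private static final class Pending {
        final long deadlineMillis;
        final Runnable onTimeout;
        boolean done;

        Pending(final long deadlineMillis, final Runnable onTimeout) {
            this.deadlineMillis = deadlineMillis;
            this.onTimeout = onTimeout;
        }
    }

    private final List<Pending> outstanding = new ArrayList<>();

    /** Registers a call; the returned handle is run by the reply handler to mark completion. */
    Runnable register(final long nowMillis, final long timeoutMillis, final Runnable onTimeout) {
        final Pending pending = new Pending(nowMillis + timeoutMillis, onTimeout);
        outstanding.add(pending);
        return () -> pending.done = true;
    }

    /** Drops completed calls and fires onTimeout once for each expired call. */
    void process(final long nowMillis) {
        final List<Pending> expired = new ArrayList<>();
        final Iterator<Pending> iterator = outstanding.iterator();
        while (iterator.hasNext()) {
            final Pending pending = iterator.next();
            if (pending.done) {
                iterator.remove();
            } else if (nowMillis >= pending.deadlineMillis) {
                iterator.remove();
                expired.add(pending);
            }
        }
        // Run handlers after the scan so they may safely register new calls.
        expired.forEach(pending -> pending.onTimeout.run());
    }

    public static void main(final String... args) {
        final TimeoutTrackingSketch tracker = new TimeoutTrackingSketch();
        tracker.register(0, 1000, () -> System.out.println("call timed out"));
        tracker.process(500);   // nothing has expired yet
        tracker.process(1000);  // the deadline has passed, so the handler runs
    }
}
```

If process is never called, nothing in this model ever times out, which is why hooking it to the queue's idle, empty and limit notifications is a sensible default.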

HRService and DepartmentRepo example using Reactor

Let’s create a small example to show how it all ties in.

We have the following components and classes and interfaces:

  • HRService (Human resources service) that is exposed via REST
  • DepartmentRepo which stores departments in long-term storage
  • Department a department object
  • DepartmentRepoAsync which is the async interface to DepartmentRepo
  • Reactor which coordinates calls to DepartmentRepo
  • HRServiceMain which constructs the servers and service queues (wiring)

Let’s look at HRService. HRService (Human Resources Service) is a Service that runs on a ServiceQueue thread.

HRService

/** This is the public REST interface to the Human Resources services.
 *
 */
@RequestMapping("/hr")
public class HRService {

    private final Map<Integer, Department> departmentMap 
                               = new HashMap<>();

    private final Reactor reactor;
    private final DepartmentRepoAsync departmentRepoAsync;

    /**
     * Construct a new HR REST Service.
     * @param reactor reactor
     * @param departmentRepoAsync async interface to DepartmentStore
     */
    public HRService(final Reactor reactor, 
                                   final DepartmentRepoAsync departmentRepoAsync) {
        this.reactor = reactor;
        this.reactor.addServiceToFlush(departmentRepoAsync);
        this.departmentRepoAsync = departmentRepoAsync;
    }

    /**
     * Add a new department
     * @param callback callback
     * @param departmentId department id
     * @param department department
     */
    @RequestMapping(value = "/department/{departmentId}/", 
                                    method = RequestMethod.POST)
    public void addDepartment(final Callback<Boolean> callback, 
                              @PathVariable("departmentId") Integer departmentId,
                              final Department department) {

        final Callback<Boolean> repoCallback = reactor.callbackBuilder()
                .setCallback(Boolean.class, succeeded -> {
                    departmentMap.put(departmentId, department);
                    callback.accept(succeeded);
                }).build();

        //TODO improve this to handle timeout and error handling.
        departmentRepoAsync.addDepartment(repoCallback, department);

    }

    /** Register to be notified when the service queue is idle, empty, 
          or has hit its batch limit.
     */
    @QueueCallback({QueueCallbackType.EMPTY, 
               QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    private void process () {

        /** Call the reactor to process callbacks. */
        reactor.process();
    }

To use the Reactor, you must do the following: 1) register collaborating services with addServiceToFlush, and 2) call the reactor’s process method from a @QueueCallback method of the service that registers for idle, empty and limit notification. The Reactor's process method will handle registered coordinators, repeating jobs, collaborating service queue flushes, callback timeouts, and running callbacks on the same thread as the service queue. Now every time we make a call to our collaborating service, we will use the callback builder from the reactor (reactor.callbackBuilder) so the reactor can manage the callback and detect if it times out. Let's break this down.

First we register the collaborating services with addServiceToFlush.

register collaborating services with addServiceToFlush

public HRService(final Reactor reactor, 
                                final DepartmentRepoAsync departmentRepoAsync) {
       ...
        this.reactor.addServiceToFlush(departmentRepoAsync);

Next we call the reactor’s process method from a @QueueCallback method that registers for idle, empty and limit notification.

call the reactor’s process from a @QueueCallback method

    /** Register to be notified when the service queue is 
      idle, empty, or has hit its batch limit.
     */
    @QueueCallback({QueueCallbackType.EMPTY, 
               QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    private void process () {

        /** Call the reactor to process callbacks. */
        reactor.process();
    }

This literally means if the queue is idle or empty or we reached the batch size limit, then run the reactor process method. This works for most use cases, but you could opt to call reactor.process after some other important event or after X number of calls to a certain method. The reactor process method is where it manages the service flushes, callbacks, periodic jobs, etc.

DepartmentRepo, which stores departments in long-term storage, is for now just a simple class to keep the discussion moving forward.

DepartmentRepo which stores departments

package com.mammatustech.hr;

import java.util.HashMap;
import java.util.Map;

/**
 * Represents a storage repo. Imagine this is talking to MongoDB or
 * Cassandra. Perhaps it is also indexing the department name via
 * SOLR. It does all of this and then returns when it is finished.
 * If this in turn called other services, it would use a Callback instead of
 * returning a boolean.
 */
public class DepartmentRepo {

    private final Map<Long, Department> departmentMap = new HashMap<>();


    /**
     * Add a department.
     * @param department department we are adding.
     * @return true if successfully stored the department
     */
    public boolean addDepartment(final Department department) {

        departmentMap.put(department.getId(), department);
        return true;
    }
}

For now, imagine it writing to a database such as Cassandra or LevelDB.

Since this is such a simple version, DepartmentRepo itself doesn’t even need a Callback, but callers do need one when they call it asynchronously. (Later we will coordinate multiple calls.)

DepartmentRepoAsync is the async interface to DepartmentRepo. It allows async access even though DepartmentRepo does not technically need it yet.

DepartmentRepoAsync which is the async interface to DepartmentRepo

package com.mammatustech.hr;


import io.advantageous.qbit.reactive.Callback;

/**
 * Async interface to DepartmentRepo internal service.
 *
 */
public interface DepartmentRepoAsync {

    /**
     * Add a department to the repo.
     * @param callback callback which returns the success code async.
     * @param department department to add
     */
     void addDepartment(final Callback<Boolean> callback,
                        final Department department);

}

There is nothing special about the Department object.

Department Object

package com.mammatustech.hr;

import java.util.ArrayList;
import java.util.List;

public class Department {

    private final long id;
    private final String name;
    private final List<Employee> employeeList;

    public Department(long id, String name, List<Employee> employeeList) {
        this.id = id;
        this.name = name;
        this.employeeList = employeeList;
    }

    public void addEmployee(Employee employee) {
        employeeList.add(employee);
    }

    public List<Employee> getEmployeeList() {
        return new ArrayList<>(employeeList);
    }

    public long getId() {
        return id;
    }
}

HRServiceMain constructs the servers and service queues and starts them up. It is the bootstrap class.

HRServiceMain wires up DepartmentRepo and HRService

/**
 * Default port for admin is 7777.
 * Default port for main endpoint is 8080.
 *
 * <pre>
 * <code>
 *
 *     Access the service:
 *
 *    $ curl http://localhost:8888/v1/...
 *
 *
 *     To see swagger file for this service:
 *
 *    $ curl http://localhost:7777/__admin/meta/
 *
 *     To see health for this service:
 *
 *    $ curl http://localhost:8888/__health
 *     Returns "ok" if all registered health systems are healthy.
 *
 *     OR if same port endpoint health is disabled then:
 *
 *    $ curl http://localhost:7777/__admin/ok
 *     Returns "true" if all registered health systems are healthy.
 *
 *
 *     A node is a service, service bundle, queue, or server endpoint that is being monitored.
 *
 *     List all service nodes or endpoints
 *
 *    $ curl http://localhost:7777/__admin/all-nodes/
 *
 *
 *      List healthy nodes by name:
 *
 *    $ curl http://localhost:7777/__admin/healthy-nodes/
 *
 *      List complete node information:
 *
 *    $ curl http://localhost:7777/__admin/load-nodes/
 *
 *
 *      Show service stats and metrics
 *
 *    $ curl http://localhost:8888/__stats/instance
 * </code>
 * </pre>
 */
public class HRServiceMain {

    public static void main(final String... args) throws Exception {

        /* Create the ManagedServiceBuilder which 
               manages a clean shutdown, health, stats, etc. */
        final ManagedServiceBuilder managedServiceBuilder =
                ManagedServiceBuilder.managedServiceBuilder()
                        .setRootURI("/v1") //Defaults to services
                        .setPort(8888); //Defaults to 8080 or environment variable PORT


        /* Build the reactor. */
        final Reactor reactor = ReactorBuilder.reactorBuilder()
                                .setDefaultTimeOut(1)
                                .setTimeUnit(TimeUnit.SECONDS)
                                .build();


        /* Build the service queue for DepartmentRepo. */
        final ServiceQueue departmentRepoServiceQueue = 
                            managedServiceBuilder
                                 .createServiceBuilderForServiceObject(
                                         new DepartmentRepo()).build();

        departmentRepoServiceQueue
                 .startServiceQueue()
                 .startCallBackHandler();
        
        /* Build the remote interface for department repo. */
        final DepartmentRepoAsync departmentRepoAsync =
                          departmentRepoServiceQueue
                              .createProxy(DepartmentRepoAsync.class);



        /* Start the service. */
        managedServiceBuilder.addEndpointService(
               new HRService(reactor, departmentRepoAsync)) //Register HRService
                .getEndpointServerBuilder()
                .build().startServer();

        /* Start the admin builder which exposes health 
                     end-points and swagger meta data. */
        managedServiceBuilder.getAdminBuilder().build().startServer();

        System.out.println("HR Server and Admin Server started");

    }
}

You can run this example by going to Reactor Example on github. There is even a REST client generated with Swagger to exercise this example: HRService client generated with Swagger.

Thus far we have only handled making the callback from DepartmentRepo happen on the same thread as the ServiceQueue of HRService. We have not really handled the timeout case.

To handle the timeout case, we need to register an onTimeout handler with the callbackBuilder as follows.

Registering an onTimeout handler to handle timeouts

    @RequestMapping(value = "/department/{departmentId}/", 
             method = RequestMethod.POST)
    public void addDepartment(final Callback<Boolean> callback, 
                              @PathVariable("departmentId") Integer departmentId,
                              final Department department) {


        final Callback<Boolean> repoCallback = reactor.callbackBuilder()
                .setCallback(Boolean.class, succeeded -> {
                    departmentMap.put(departmentId, department);
                    callback.accept(succeeded);
                }).setOnTimeout(() -> { //handle onTimeout
                    //callback.accept(false); // one way

                    // callback.onTimeout();  //another way
                    /* The best way. */
                    callback.onError(
                            new TimeoutException("Timeout can't add department " + departmentId));
                }).setOnError(error -> { //handle error handler
                    callback.onError(error);
                }).build();

        departmentRepoAsync.addDepartment(repoCallback, department);

Notice that now we handle not only the callback, but we handle if there was a timeout. You could just return false by calling callback.accept(false) but since a timeout is an exceptional case, we opted to create an Exception and pass it to the callback.onError(…). The other option is call the default onTimeout handler, but by using onError to report the timeout, we are able to pass some additional context information about the timeout.

In addition to handling the timeout, we handle the error case. If we don’t handle the timeout and the error, then when there is a timeout or an error the REST client will hold on to the connection until the HTTP connection times out. We don’t want the client to hold on to the connection for a long time, as that could lead to a cascading failure if a downstream service fails while upstream services or clients hold on to connections waiting for their HTTP connections to time out. Bottom line: handle timeouts and errors by sending a response to the client (even if the client is only an upstream service). Don’t let the client hang. Prevent cascading failures.

Coordinating multiple calls

The code for this can be found at this branch.

Let's take this a step further. Let's say that instead of calling one service when addDepartment gets called, we call three services: AuthService, DepartmentCassandraRepo and DepartmentSolrIndexer. First we want the HRService to call the AuthService to see if the user identified by userName is authorized to add a department. The doAddDepartment method gets called if auth succeeds. Remember this is merely an example to show what async call coordination looks like. Then doAddDepartment calls the DepartmentCassandraRepo to store the department, and if it is successful it stores the department in the department cache (departmentMap), notifies the clientCallback, and then calls DepartmentSolrIndexer to index the department so that it is searchable.
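Before looking at the QBit classes, the shape of this coordination can be sketched without any library. Everything in this block is hypothetical and written for illustration: Reply stands in for a callback with success and error paths, Auth and Store stand in for the collaborating services, and step builds a callback that shares one error handler across the chain.

```java
import java.util.function.Consumer;

final class CoordinationSketch {

    /** Minimal callback with a success path and an error path (not QBit's Callback). */
    interface Reply<T> {
        void accept(T value);
        void onError(Throwable error);
    }

    /** Hypothetical collaborators; each reports through a Reply. */
    interface Auth { void allowed(Reply<Boolean> reply, String userName); }
    interface Store { void add(Reply<Boolean> reply, String department); }

    /** Builds a Reply from a success handler and a (typically shared) error handler. */
    static <T> Reply<T> step(final Consumer<T> onSuccess, final Consumer<Throwable> onError) {
        return new Reply<T>() {
            public void accept(final T value) { onSuccess.accept(value); }
            public void onError(final Throwable error) { onError.accept(error); }
        };
    }

    /** Auth first, then store, then index; the client hears back on every path. */
    static void addDepartment(final Reply<Boolean> client, final Auth auth, final Store store,
                              final Store indexer, final String userName,
                              final String department) {
        auth.allowed(step(allowed -> {
            if (!allowed) {
                client.onError(new SecurityException("not authorized: " + userName));
                return;
            }
            store.add(step(stored -> {
                client.accept(stored); // answer the client first
                if (stored) {
                    // Fire and forget: indexing does not delay the client.
                    indexer.add(step(indexed -> { }, error -> { }), department);
                }
            }, client::onError), department);
        }, client::onError), userName);
    }

    public static void main(final String... args) {
        final Reply<Boolean> client = step(
                ok -> System.out.println("added: " + ok),
                error -> System.out.println("failed: " + error.getMessage()));
        addDepartment(client,
                (reply, user) -> reply.accept(true),
                (reply, dept) -> reply.accept(true),
                (reply, dept) -> reply.accept(true),
                "rick", "Engineering");
    }
}
```

Passing client::onError into every step is the plain-Java analogue of sharing one set of error and timeout handlers across all the calls in the sequence.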

AuthServiceImpl

package com.mammatustech.hr;

import io.advantageous.qbit.reactive.Callback;

public interface AuthService {

    void allowedToAddDepartment(Callback<Boolean> callback,
                                String username,
                                int departmentId);

}
...
package com.mammatustech.hr;

import io.advantageous.qbit.reactive.Callback;

public class AuthServiceImpl implements AuthService {

    public void allowedToAddDepartment(final Callback<Boolean> callback,
                                       final String username,
                                       final int departmentId) {

...
    }

}

DepartmentCassandraRepo to store departments

package com.mammatustech.hr;

import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.reactive.Callback;

import java.util.HashMap;
import java.util.Map;

/**
 * Represents a storage repo. Imagine this is talking to 
 * Cassandra. 
 */
public class DepartmentCassandraRepo {
...


    /**
     * Add a department.
     * @param callback callback that receives true if the department was stored
     * @param department department we are adding.
     */
    public void addDepartment(final Callback<Boolean> callback, 
                        final Department department) {
         ...
    }
}

DepartmentSolrIndexer to index departments

package com.mammatustech.hr;

import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.reactive.Callback;

import java.util.HashMap;
import java.util.Map;

/**
 * Represents a SOLR indexer. Imagine this is talking to 
 * SOLR. 
 */
public class DepartmentSolrIndexer {
...


    /**
     * Add a department to the index.
     * @param callback callback that receives true if the department was indexed
     * @param department department we are adding.
     */
    public void addDepartment(final Callback<Boolean> callback, 
                        final Department department) {
         ...
    }
}

HRService REST interface

/** This is the public REST interface to the Human Resources services.
 *
 */
@RequestMapping("/hr")
public class HRService {

    private final Map<Integer, Department> departmentMap = 
                                                  new HashMap<>();

    private final Reactor reactor;
    private final DepartmentRepoAsync solrIndexer;
    private final DepartmentRepoAsync cassandraStore;
    private final AuthService authService;

    /**
     * Construct a new HR REST Service.
     * @param reactor reactor
     * @param cassandraStore async interface to DepartmentStore
     * @param solrIndexer async interface to SOLR Service
     */
    public HRService(final Reactor reactor,
                     final DepartmentRepoAsync cassandraStore,
                     final DepartmentRepoAsync solrIndexer,
                     final AuthService authService) {
        this.reactor = reactor;
        this.reactor.addServiceToFlush(cassandraStore);
        this.reactor.addServiceToFlush(solrIndexer);
        this.reactor.addServiceToFlush(authService);
        this.cassandraStore = cassandraStore;
        this.solrIndexer = solrIndexer;
        this.authService = authService;
    }

    /**
     * Add a new department
     * @param clientCallback callback
     * @param departmentId department id
     * @param department department
     */
    @RequestMapping(value = "/department/{departmentId}/", method = RequestMethod.POST)
    public void addDepartment(final Callback<Boolean> clientCallback,
                              @PathVariable("departmentId") Integer departmentId,
                              final Department department,
                              @HeaderParam(value="username", defaultValue = "noAuth")
                                  final String userName) {

        final CallbackBuilder callbackBuilder = reactor.callbackBuilder()
                .setOnTimeout(() -> {
                    clientCallback.onError(
                            new TimeoutException("Timeout can't add department " 
                                          + departmentId));
                }).setOnError(clientCallback::onError);


        authService.allowedToAddDepartment(callbackBuilder.setCallback(Boolean.class, allowed -> {
            if (allowed) {
                doAddDepartment(clientCallback, callbackBuilder, department);
            } else {
                clientCallback.onError(new SecurityException("Go away!"));
            }
        }).build(), userName,  departmentId);


    }

    private void doAddDepartment(final Callback<Boolean> clientCallback,
                                 final CallbackBuilder callbackBuilder,
                                 final Department department) {

        final Callback<Boolean> callbackDeptRepo = callbackBuilder.setCallback(Boolean.class, addedDepartment -> {

            departmentMap.put((int)department.getId(), department);
            clientCallback.accept(addedDepartment);

            solrIndexer.addDepartment(indexedOk -> {
            }, department);
        }).build();

        cassandraStore.addDepartment(callbackDeptRepo, department);

    }

    /** Register to be notified when the service queue is idle, empty, or has hit its batch limit.
     */
    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    private void process () {

        /** Call the reactor to process callbacks. */
        reactor.process();
    }

The key to this is the shared callback builder.

    /**
     * Add a new department
     * @param clientCallback callback
     * @param departmentId department id
     * @param department department
     */
    @RequestMapping(value = "/department/{departmentId}/", method = RequestMethod.POST)
    public void addDepartment(final Callback<Boolean> clientCallback,
                              @PathVariable("departmentId") Integer departmentId,
                              final Department department,
                              @HeaderParam(value="username", defaultValue = "noAuth")
                                  final String userName) {

        final CallbackBuilder callbackBuilder = reactor.callbackBuilder()
                .setOnTimeout(() -> {
                    clientCallback.onError(
                            new TimeoutException("Timeout can't add department " 
                                          + departmentId));
                }).setOnError(clientCallback::onError);

Notice how we break the methods down and functionally decompose them so that things are easier to read. Witness doAddDepartment and how it is called.

Breaking down callback handling

        authService.allowedToAddDepartment(callbackBuilder.setCallback(Boolean.class, allowed -> {
            if (allowed) {
                doAddDepartment(clientCallback, callbackBuilder, department);
            } else {
                clientCallback.onError(new SecurityException("Go away!"));
            }
        }).build(), userName,  departmentId);
...

    private void doAddDepartment(final Callback<Boolean> clientCallback,
                                 final CallbackBuilder callbackBuilder,
                                 final Department department) {

        final Callback<Boolean> callbackDeptRepo = callbackBuilder.setCallback(Boolean.class, addedDepartment -> {

            departmentMap.put((int)department.getId(), department);
            clientCallback.accept(addedDepartment);

            solrIndexer.addDepartment(indexedOk -> {
            }, department);
        }).build();

        cassandraStore.addDepartment(callbackDeptRepo, department);

    }
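
To make the shared-builder idea concrete, here is a minimal sketch in plain Java. SimpleCallback and SimpleCallbackBuilder are hypothetical stand-ins written for this illustration; they are not QBit's Callback or CallbackBuilder, and the method names are assumptions. The point is only that the error handler is configured once and every callback built afterwards reuses it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a shared callback builder; not QBit's CallbackBuilder. */
class SharedBuilderSketch {

    /** Minimal callback shape: a success path (accept) plus an error path. */
    interface SimpleCallback<T> extends Consumer<T> {
        void onError(Throwable error);
    }

    /** Holds the shared error handler; each build() pairs it with a new success handler. */
    static final class SimpleCallbackBuilder {
        private Consumer<Throwable> onError = e -> { };

        SimpleCallbackBuilder setOnError(final Consumer<Throwable> onError) {
            this.onError = onError;
            return this;
        }

        <T> SimpleCallback<T> build(final Consumer<T> onSuccess) {
            final Consumer<Throwable> sharedOnError = this.onError;
            return new SimpleCallback<T>() {
                @Override public void accept(final T value) { onSuccess.accept(value); }
                @Override public void onError(final Throwable error) { sharedOnError.accept(error); }
            };
        }
    }

    /** Builds two callbacks from one builder and records what each one reports. */
    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        final SimpleCallbackBuilder builder = new SimpleCallbackBuilder()
                .setOnError(e -> events.add("error: " + e.getMessage()));

        final SimpleCallback<Boolean> authCallback =
                builder.build(allowed -> events.add("allowed=" + allowed));
        final SimpleCallback<Boolean> storeCallback =
                builder.build(stored -> events.add("stored=" + stored));

        authCallback.accept(true);                                       // first step succeeds
        storeCallback.onError(new IllegalStateException("store down"));  // second step fails
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

Both steps report failures through the same handler, which is why the tutorial passes one callbackBuilder from addDepartment into doAddDepartment instead of building a fresh one.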

Callback builder specifying timeouts

The CallbackBuilder allows you to specify timeouts for calls.

Specifying timeouts per CallbackBuilder

        final CallbackBuilder callbackBuilder = reactor.callbackBuilder()
                .setOnTimeout(() -> {
                    clientCallback.onError(
                            new TimeoutException("Timeout can't add department " + departmentId));
                }).setOnError(clientCallback::onError)
                .setTimeoutDuration(200)
                .setTimeoutTimeUnit(TimeUnit.MILLISECONDS);

Working with repeating tasks

@RequestMapping("/hr")
public class HRService {
    ...
    /**
     * Construct a new HR REST Service.
     * @param reactor reactor
     * @param cassandraStore async interface to DepartmentStore
     * @param solrIndexer async interface to SOLR Service
     */
    public HRService(final Reactor reactor,
                     final DepartmentRepoAsync cassandraStore,
                     final DepartmentRepoAsync solrIndexer,
                     final AuthService authService) {
        ...
        this.reactor.addRepeatingTask(1, TimeUnit.SECONDS, () -> {
            manageCache();
        });
    }
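
The following is a rough sketch of how interval-based tasks can be driven by a process call. It is an assumption about the general technique, not QBit's Reactor implementation; RepeatingTaskSketch and its explicit nowMs parameters are hypothetical and exist only to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of interval-based task firing; not QBit's Reactor. */
class RepeatingTaskSketch {

    private static final class RepeatingTask {
        final long intervalMs;
        final Runnable task;
        long lastRunMs;

        RepeatingTask(final long intervalMs, final Runnable task, final long nowMs) {
            this.intervalMs = intervalMs;
            this.task = task;
            this.lastRunMs = nowMs;
        }
    }

    private final List<RepeatingTask> tasks = new ArrayList<>();

    /** Registers a task; nowMs is passed in so the sketch does not depend on the wall clock. */
    void addRepeatingTask(final long interval, final TimeUnit unit,
                          final Runnable task, final long nowMs) {
        tasks.add(new RepeatingTask(unit.toMillis(interval), task, nowMs));
    }

    /** Called on each queue tick; runs every task whose interval has elapsed. */
    void process(final long nowMs) {
        for (final RepeatingTask t : tasks) {
            if (nowMs - t.lastRunMs >= t.intervalMs) {
                t.lastRunMs = nowMs;
                t.task.run();
            }
        }
    }

    /** Simulates ticks at 0.5 s, 1 s, 1.5 s and 2 s and returns how often the task ran. */
    static int demo() {
        final int[] runs = {0};
        final RepeatingTaskSketch reactor = new RepeatingTaskSketch();
        reactor.addRepeatingTask(1, TimeUnit.SECONDS, () -> runs[0]++, 0L);
        for (long now = 500; now <= 2000; now += 500) {
            reactor.process(now);
        }
        return runs[0];
    }

    public static void main(final String[] args) {
        System.out.println("runs = " + demo());
    }
}
```

In this sketch a task only fires when process is called, so the tick rate of the owning queue bounds how precisely the interval is honored.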

Working with QBit and Async NoSQL (Cassandra et al)

Cassandra and many other NoSQL solutions offer an async API, as does QBit. Cassandra uses Google Guava. QBit uses QBit. :) You can easily coordinate calls to Cassandra and other NoSQL solutions.

How do you combine them so you do not have to create a worker pool in QBit to make async calls to Cassandra?

Let's say you have a Cassandra service as follows.

Example Cassandra service

import com.datastax.driver.core.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.advantageous.qbit.annotation.*;

import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;

import io.advantageous.qbit.reactive.Callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
...

public class CassandraService {


	private final Logger logger = LoggerFactory.getLogger(CassandraService.class);
	private final CassandraCluster cluster ;
	private final CassandraConfig config;
	private final Session session; //only one per keyspace,
	private final AtomicBoolean isConnected = new AtomicBoolean(false);
	/**
	 * Configure the client to connect to cluster
	 * @param config
	 */
	public CassandraService (final CassandraConfig config) {

            ...
	}




	public void executeAsync(final Callback<ResultSet> callback, final Statement stmt) {
		final ResultSetFuture future = this.session.executeAsync(stmt);

		Futures.addCallback(future, new FutureCallback<ResultSet>() {
			@Override
			public void onSuccess(ResultSet result) {
				callback.accept(result);
			}

			@Override
			public void onFailure(Throwable t) {
				callback.onError(t);
			}
		});

	}

This is just an example, not a prescription for how you should write your Cassandra library. It is kept simple to focus on the QBit integration.

Note that the futures returned by the Cassandra driver come from Google's Guava library. DataStax has a nice tutorial on using the Cassandra async API with Guava.

In this example we have a service called EventStorageService which endeavors to store an event into Cassandra. Most of the plumbing and tables DDL for the Event have been omitted. This is not a Cassandra tutorial by any means.

Note that in the onSuccess method of the FutureCallback we call the accept method of the QBit Callback. A QBit callback is a Java 8 Consumer (Callback<T> extends Consumer<T>), which is probably what FutureCallback would have been had it been created after Java 8. Likewise, when FutureCallback.onFailure is called, the code delegates to Callback.onError. Fairly simple.
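
The same bridging idea can be tried without Guava or a Cassandra cluster. The sketch below swaps in the JDK's CompletableFuture for Guava's ListenableFuture, and SimpleCallback is a hypothetical stand-in for the QBit Callback; only the shape of the delegation is meant to carry over.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Sketch of bridging a future to a callback, using CompletableFuture in place of Guava. */
class FutureBridgeSketch {

    /** Hypothetical callback shape mirroring accept/onError. */
    interface SimpleCallback<T> extends Consumer<T> {
        void onError(Throwable error);
    }

    /** Forwards the future's outcome to exactly one of the callback's two methods. */
    static <T> void bridge(final CompletableFuture<T> future, final SimpleCallback<T> callback) {
        future.whenComplete((result, error) -> {
            if (error != null) {
                callback.onError(error);
            } else {
                callback.accept(result);
            }
        });
    }

    /** Bridges one successful and one failed future and reports what the callback saw. */
    static String demo() {
        final StringBuilder seen = new StringBuilder();
        final SimpleCallback<String> callback = new SimpleCallback<String>() {
            @Override public void accept(final String rows) {
                seen.append("ok:").append(rows).append(';');
            }
            @Override public void onError(final Throwable e) {
                seen.append("err:").append(e.getMessage()).append(';');
            }
        };

        final CompletableFuture<String> success = new CompletableFuture<>();
        final CompletableFuture<String> failure = new CompletableFuture<>();
        bridge(success, callback);
        bridge(failure, callback);

        success.complete("3 rows");
        failure.completeExceptionally(new IllegalStateException("no host available"));
        return seen.toString();
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```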

Next we have another service that calls this one. In this example, CassandraService is a thin wrapper over the Cassandra API.

Example service that uses the Cassandra service

public class EventStorageService {
	private final Logger logger = LoggerFactory.getLogger(EventStorageService.class);

	private final CassandraService cassandraService;


	private final Reactor reactor;

	public EventStorageService (final CassandraService cassandraService,
								final Reactor reactor) {
		this.cassandraService = cassandraService;
		logger.info(" Event Storage Service is up ");

		if (reactor!=null) {
			this.reactor = reactor;
		} else {
			this.reactor = ReactorBuilder.reactorBuilder().build();
		}

	}


	@RequestMapping(value = "/event", method = RequestMethod.POST)
	public void addEventAsync (final Callback<Boolean> statusCallback, final Event event) {
		logger.debug("Storing Event  async {} " , event);
		final EventStorageRecord storageRec = EventConverter.toStorageRec(event);

		final Callback<ResultSet> callback = reactor.callbackBuilder()
				.setCallback(ResultSet.class, resultSet -> 
                                             statusCallback.accept(resultSet!=null))
				.setOnTimeout(() -> statusCallback.accept(false))
				.setOnError(error -> statusCallback.onError(error))
				.build(ResultSet.class);

		this.addEventStorageRecordAsync(callback, storageRec);


	}




	public void addEventStorageRecordAsync (final Callback<ResultSet> callback, 
											final EventStorageRecord storageRec) {
		if (storageRec != null) {
			// Log inside the null check so a null record cannot cause a NullPointerException.
			logger.info("Storing the record with storage-key {} async", storageRec.getStorageKey());

			SimpleStatement simpleStatement = ...;
			cassandraService.executeAsync(callback, simpleStatement);

		}


	}

Note that QBit uses a callbackBuilder so the constituent parts of a callback can be lambda expressions.

A Callback is a rather simple interface that builds on Java 8 Consumer and adds timeout and error handling.

Callback

public interface Callback<T> extends Consumer<T> {

    default void onError(Throwable error) {

        LoggerFactory.getLogger(Callback.class)
                .error(error.getMessage(), error);
    }


    default void onTimeout() {

    }

}
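
Because accept is the only abstract method, a callback of this shape can be written as a lambda or method reference and passed wherever a Consumer is expected. The sketch below uses a simplified copy of the interface named SimpleCallback (a hypothetical name) that prints to System.err instead of using SLF4J, so it runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Shows that a Consumer-based callback works with lambdas and with Consumer-taking APIs. */
class CallbackAsConsumerSketch {

    /** Simplified copy of the interface shape; prints errors instead of logging them. */
    interface SimpleCallback<T> extends Consumer<T> {
        default void onError(final Throwable error) {
            System.err.println("callback error: " + error.getMessage());
        }

        default void onTimeout() {
        }
    }

    static List<String> demo() {
        final List<String> received = new ArrayList<>();

        // accept is the only abstract method, so a method reference is enough.
        final SimpleCallback<String> callback = received::add;

        callback.accept("direct");
        // The callback is also a Consumer, so forEach accepts it as-is.
        Arrays.asList("a", "b").forEach(callback);
        return received;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```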

The Reactor is a class that manages timeouts, schedules periodic tasks, and handles other service call coordination. We initialize the Reactor in the constructor of the EventStorageService, as seen in the previous code listing. We use the callbackBuilder created from the Reactor because it registers the callbacks with the reactor for timeouts and such.

To enable the reactor, we must call it from a service queue callback method registered for idle, limit and empty. One merely needs to call reactor.process from that callback, and it will periodically check for timeouts and such.

Calling reactor process to process callbacks and handle timeouts

	@QueueCallback({
			QueueCallbackType.LIMIT, 
			QueueCallbackType.IDLE,
			QueueCallbackType.EMPTY})
	public void process() {
		reactor.process();
	}
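
To see why timeouts depend on process being called, consider this minimal sketch of timeout tracking. It is an assumption about the general approach, not QBit's code: TimeoutReactorSketch, Pending and the explicit nowMs arguments are all hypothetical. Nothing here runs on a timer thread; a call is only declared timed out when a process tick notices it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of timeout tracking driven by process(); not QBit's Reactor. */
class TimeoutReactorSketch {

    /** A pending call: start time, allowed duration, and the success and timeout handlers. */
    static final class Pending<T> implements Consumer<T> {
        private final long startMs;
        private final long timeoutMs;
        private final Consumer<T> onSuccess;
        private final Runnable onTimeout;
        private boolean done;

        Pending(final long startMs, final long timeoutMs,
                final Consumer<T> onSuccess, final Runnable onTimeout) {
            this.startMs = startMs;
            this.timeoutMs = timeoutMs;
            this.onSuccess = onSuccess;
            this.onTimeout = onTimeout;
        }

        @Override
        public void accept(final T value) {
            if (!done) {
                done = true;
                onSuccess.accept(value);
            }
        }

        /** Fires the timeout handler if overdue; returns true once the entry can be dropped. */
        boolean checkTimeout(final long nowMs) {
            if (!done && nowMs - startMs > timeoutMs) {
                done = true;
                onTimeout.run();
            }
            return done;
        }
    }

    private final List<Pending<?>> pending = new ArrayList<>();

    /** Creates a callback and registers it so later process() calls can time it out. */
    <T> Consumer<T> callback(final long nowMs, final long timeoutMs,
                             final Consumer<T> onSuccess, final Runnable onTimeout) {
        final Pending<T> p = new Pending<>(nowMs, timeoutMs, onSuccess, onTimeout);
        pending.add(p);
        return p;
    }

    /** Called from the queue's idle/empty/limit hook. */
    void process(final long nowMs) {
        pending.removeIf(p -> p.checkTimeout(nowMs));
    }

    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        final TimeoutReactorSketch reactor = new TimeoutReactorSketch();
        final Consumer<String> fast = reactor.callback(0, 200,
                v -> events.add("fast:" + v), () -> events.add("fast timed out"));
        reactor.callback(0, 200,
                v -> events.add("slow:" + v), () -> events.add("slow timed out"));

        fast.accept("done");   // the reply arrives at once
        reactor.process(100);  // nothing has exceeded 200 ms yet
        reactor.process(300);  // the unanswered call is now past its timeout
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

The sketch uses no locks because it assumes, as in the tutorial's services, that callbacks and process all run on the single thread that owns the service queue.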

Underneath the covers.

The Reactor uses AsyncFutureCallback, which is a Future, a Runnable and a Callback, and therefore also a Consumer. Rather than invent our own async API or functional API, we decided to lean on Java 8 and build on the shoulders of giants.

The Reactor uses AsyncFutureCallback internally, and CallbackBuilder really builds an AsyncFutureCallback

public interface AsyncFutureCallback<T> extends Runnable, Callback<T>, Future<T> {
    Exception CANCEL = new Exception("Cancelled RunnableCallback");

    boolean checkTimeOut(long now);

    void accept(T t);

    void onError(Throwable error);

    void run();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);

    @Override
    boolean isCancelled();

    @Override
    boolean isDone();

    @Override
    T get();

    @SuppressWarnings("NullableProblems")
    @Override
    T get(long timeout, TimeUnit unit);


    default boolean timedOut(long now) {

        return !(startTime() == -1 || timeOutDuration() == -1) && (now - startTime()) > timeOutDuration();
    }

    default long timeOutDuration() {
        return -1;
    }


    default long startTime() {
        return -1;
    }

    default void finished() {

    }


    default boolean isTimedOut() {
        return false;
    }
}

You can see that it is quite easy to integrate Cassandra and QBit using QBit's async reactive support. In fact, QBit is an easy on-ramp to Java async / reactive programming.

Working Async with non-Async APIs using QBit Workers

In a truly reactive world, one could expect all APIs to be async. However, at times we have to integrate with legacy services and legacy APIs such as JDBC and SOLR.

There are times when you will need worker pools. If you are dealing with IO and the API is not async, then you will want to wrap the API in a service that you can access from a Service pool.
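
The general technique, independent of QBit, is to hand the blocking call to a pool of worker threads and deliver the result through a callback. The sketch below uses a plain JDK ExecutorService rather than QBit's service workers; blockingLookup is a hypothetical stand-in for a blocking client call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Sketch of wrapping a blocking call in a worker pool; uses the JDK, not QBit workers. */
class BlockingWrapperSketch {

    /** Stand-in for a blocking client call such as a JDBC query. */
    static String blockingLookup(final String query) {
        try {
            Thread.sleep(20); // simulated I/O wait
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result for " + query;
    }

    /** Submits the blocking call to a pool and waits (for the demo only) for the callback. */
    static String demo() {
        final ExecutorService workers = Executors.newFixedThreadPool(4);
        final CountDownLatch replied = new CountDownLatch(1);
        final String[] holder = new String[1];
        final Consumer<String> callback = value -> {
            holder[0] = value;
            replied.countDown();
        };
        try {
            // submit returns immediately; a worker thread absorbs the blocking wait.
            workers.submit(() -> callback.accept(blockingLookup("q=*:*")));
            replied.await(5, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
        }
        return holder[0];
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```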

In the following example, we use the SolrJ API, which is blocking at the time of writing, to access SOLR.

Example SOLR service

public class SolrServiceImpl implements SolrService {

	
	/**
	 * Create SolrCalypsoDataStore with config file.
	 *
	 * @param solrConfig solrConfig
	 */
	public SolrServiceImpl(final SolrConfig solrConfig, ...) {

		logger.info("SOLR Calypso Exporter Service init {}", solrConfig);
		healthServiceAsync.register(HEALTH_NAME, 20, TimeUnit.SECONDS);
		this.solrConfig = solrConfig;
		connect();
	}

        ...
	
        /**
	 * Connect to solr.
	 */
	private void connect() {

          ...
	}


	@Override
	public void storeEvent(Event event) {
		store(event);
	}

	@Override
	public void storeTimeSeries(TimeSeries timeSeries) { store(timeSeries);}


	@Override
	public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
		 callback.accept(doGet(queryParams));
	}

	private boolean store(final Object data) {

		logger.info("store():: importing calypso data event into solr {}",
				data);

		if (connectedToSolr) {

			SolrInputDocument doc = SolrServiceHelper.getSolrDocument(data);

			try {
				UpdateResponse ur = client.add(doc);
				if (solrConfig.isForceCommit()) {
					client.commit();
				}
				
			} catch (Exception e) {
                             ...	
		       }

			return true;
		} else {
                            ...
			return false;
		}
	}

	/**
	 * Proxy the request to solr
	 * @param queryParams query params
	 * @return
	 */
	public String doGet(@RequestParam(value = "q", required = true) String queryParams) {
		
		queryParams = queryParams.replaceAll("\\n", "");
		
		logger.debug("Processing query params: {} ", queryParams);
		String solrQueryUrl = this.solrConfig.getSolrQueryUrl() + queryParams;
		
		logger.info("solr request Built {} ", solrQueryUrl);
		
		String result = null;
		try {
			result = IOUtils.toString(new URI(solrQueryUrl));
			
		} catch (IOException | URISyntaxException e) {
			logger.error("Failed to get solr response for queryUrl {} ", solrQueryUrl, e);
		}
		
		return result;
	}
	
	

	@QueueCallback(QueueCallbackType.SHUTDOWN)
	public void stop() {

		logger.info("Solr Client stopped");
		try {
			
			this.client.close();
			this.connectedToSolr = false;
		} catch (IOException e) {
			logger.warn("Exception while closing the solr client ", e);
		}

	}
}

This service is deliberately simple and serves mainly as an example. Because calls to SOLR can block, we want to run it on multiple threads.

To do this we will use a RoundRobinServiceWorkerBuilder, which creates a RoundRobinServiceWorker. For more background on workers in QBit, read sharded service workers and service workers.

A RoundRobinServiceWorker is a start-able service dispatcher (Startable, ServiceMethodDispatcher) which can be registered with a ServiceBundle. A ServiceMethodDispatcher is an object that can dispatch method calls to a service.

Using RoundRobinServiceWorker to create a service worker pool

final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();

final CassandraService cassandraService = new CassandraService(config.cassandra);


/* Create the round robin dispatcher with 16 threads. */
final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
                .roundRobinServiceWorkerBuilder().setWorkerCount(16);

/* Register a callback to create instances. */
roundRobinServiceWorkerBuilder.setServiceObjectSupplier(() 
        -> new SolrServiceImpl(config.solr));

/* Build and start the dispatcher. */
final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
serviceMethodDispatcher.start();

/* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
bundle.start();

/* Create other end points and register them with service endpoint server. */
final SolrServiceEndpoint solrServiceEndpoint = new SolrServiceEndpoint(solrWorkers);
/* The constructor shown earlier takes a Reactor; passing null makes it build its own. */
final EventStorageService eventStorageService = new EventStorageService(cassandraService, null);

//final EventManager eventManager = managedServiceBuilder.getEventManager(); In 0.8.16+
final EventManager eventManager = QBit.factory().systemEventManager();
final IngestionService ingestionService = new IngestionService(eventManager);



managedServiceBuilder.getEndpointServerBuilder().setUri("/").build()
                .initServices( cassandraService,
                			   eventStorageService,
                			   ingestionService,
                               solrServiceEndpoint
                			 )
                .startServer();
        

Notice this code that creates a RoundRobinServiceWorkerBuilder.

Working with RoundRobinServiceWorkerBuilder

        /* Create the round robin dispatcher with 16 threads. */
        final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
                .roundRobinServiceWorkerBuilder().setWorkerCount(16);

Above we are creating the builder and setting the number of workers for the round robin dispatcher. The default is to set the number equal to the number of available CPUs. Next we need to tell the builder how to create the service impl objects as follows:

Registering a callback to create instances of the service

        /* Register a callback to create instances. */
        roundRobinServiceWorkerBuilder.setServiceObjectSupplier(() 
              -> new SolrServiceImpl(config.solr));

NOTE: Use RoundRobinServiceWorkerBuilder when the services are stateless (other than connection state), and use ShardedServiceWorkerBuilder if you must maintain sharded state (caches or some such).
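
The difference between the two dispatch styles can be illustrated with a few lines of plain Java. This is a sketch of the usual meaning of "round robin" and "sharded", not QBit's dispatcher code; DispatchSketch and the choice of hashCode as the shard rule are assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of two ways to pick a worker index; not QBit's dispatcher implementation. */
class DispatchSketch {

    private final int workerCount;
    private final AtomicInteger next = new AtomicInteger();

    DispatchSketch(final int workerCount) {
        this.workerCount = workerCount;
    }

    /** Round robin: each call goes to the next worker in turn, regardless of content. */
    int roundRobinIndex() {
        return Math.floorMod(next.getAndIncrement(), workerCount);
    }

    /** Sharded: the same key always maps to the same worker, so per-key state stays put. */
    int shardIndex(final Object key) {
        return Math.floorMod(key.hashCode(), workerCount);
    }

    public static void main(final String[] args) {
        final DispatchSketch dispatch = new DispatchSketch(3);
        for (int i = 0; i < 4; i++) {
            System.out.println("round robin -> worker " + dispatch.roundRobinIndex());
        }
        System.out.println("dept-7 -> worker " + dispatch.shardIndex("dept-7"));
        System.out.println("dept-7 -> worker " + dispatch.shardIndex("dept-7"));
    }
}
```

Round robin spreads load evenly but gives no guarantee about which worker sees a given key, which is why it suits stateless services; a keyed rule trades some balance for locality of state.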

A ServiceBundle knows how to deal with a collection of addressable ServiceMethodDispatchers. Thus to use the RoundRobinServiceWorker we need to use a service bundle. Therefore, we create a service bundle and register the service worker with it.

Registering the roundRobinServiceWorker with a service bundle

        /* Build and start the dispatcher. */
        final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
        serviceMethodDispatcher.start();

        /* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
        final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
        bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
        final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
        bundle.start();

Service bundles do not auto flush, and our SolrServiceEndpoint instance uses an interface obtained from a service bundle. Therefore, we should use a Reactor. A QBit Reactor is owned by a service that is sitting behind a service queue (ServiceQueue). You can register services to be flushed with a reactor, you can register for repeating jobs with the reactor, and you can coordinate callbacks with the reactor. The reactor has a process method that needs to be called periodically: during idle times, when batch limits (queue is full) are met, and when the queue is empty. We do that by calling the process method as follows:

SolrServiceEndpoint using a reactor object to manage callbacks and flushes

@RequestMapping(value = "/storage/solr", method = RequestMethod.ALL)
public class SolrServiceEndpoint {


    private final SolrService solrService;
    private final Reactor reactor;

    public SolrServiceEndpoint(final SolrService solrService) {
        this.solrService = solrService;
        reactor = ReactorBuilder.reactorBuilder().build();
        reactor.addServiceToFlush(solrService);

    }

    @OnEvent(IngestionService.NEW_EVENT_CHANNEL)
    public void storeEvent(final Event event) {
        solrService.storeEvent(event);
    }

    @OnEvent(IngestionService.NEW_TIMESERIES_CHANNEL)
    public void storeTimeSeries(final TimeSeries timeSeries) {
        solrService.storeTimeSeries(timeSeries);
    }


    /**
     * Proxy the request to solr
     *
     * @param queryParams
     * @return
     */
    @RequestMapping(value = "/get", method = RequestMethod.GET)
    public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
        solrService.get(callback, queryParams);
    }


    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    public void process() {
        reactor.process();
    }
}

Notice that the process method of SolrServiceEndpoint uses the QueueCallback annotation and enums (@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})), and then all it does is call reactor.process. In the constructor, we registered the solrService service proxy with the reactor.

Registering the solrService with the reactor

 public SolrServiceEndpoint(final SolrService solrService) {
        this.solrService = solrService;
        reactor = ReactorBuilder.reactorBuilder().build();
        reactor.addServiceToFlush(solrService);

    }
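
Why flushing matters can be shown with a toy model. In the sketch below, FlushSketch is a hypothetical batching proxy: calls are only queued, and nothing reaches the receiving side until flush is invoked. QBit's real proxies are generated and more involved, so treat this as an illustration of the batching idea under that assumption and not as a description of their internals.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a batching proxy; not how QBit's proxies are implemented. */
class FlushSketch {

    private final List<String> outbound = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    /** A proxy call only queues the message. */
    void storeEvent(final String event) {
        outbound.add(event);
    }

    /** flush() hands the whole batch to the receiving side. */
    void flush() {
        delivered.addAll(outbound);
        outbound.clear();
    }

    int deliveredCount() {
        return delivered.size();
    }

    /** Returns the delivered count before and after the flush. */
    static int[] demo() {
        final FlushSketch proxy = new FlushSketch();
        proxy.storeEvent("e1");
        proxy.storeEvent("e2");
        final int before = proxy.deliveredCount();
        proxy.flush();
        return new int[] {before, proxy.deliveredCount()};
    }

    public static void main(final String[] args) {
        final int[] counts = demo();
        System.out.println("before flush: " + counts[0] + ", after flush: " + counts[1]);
    }
}
```

Registering the proxy with the reactor via addServiceToFlush means the flush happens as part of reactor.process, so the endpoint does not have to remember to flush after every call.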

To learn more about QBit worker pools read QBit Java Microservice Lib Working with Workers Sharded and Pooled and QBit microservice worker pools.
