-
Notifications
You must be signed in to change notification settings - Fork 140
[Doc] Calling Cassandra async from QBit using Reactor and CallBackBuilder, and Callbacks
Cassandra offers an async API as does QBit. Cassandra uses Google Guava. QBit uses QBit. :)
How do you combine them so you do not have to create a worker pool in QBit to make async calls to Cassandra?
Let's say you have a Cassandra service like so...
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.advantageous.qbit.annotation.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import io.advantageous.qbit.reactive.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
...
public class CassandraService {
private final Logger logger = LoggerFactory.getLogger(CassandraService.class);
private final CassandraCluster cluster ;
private final CassandraConfig config;
private final Session session; //only one per keyspace,
private final AtomicBoolean isConnected = new AtomicBoolean(false);
/**
* Configure the client to connect to cluster
* @param config
*/
public CassandraService (final CassandraConfig config) {
...
}
public void executeAsync(final Callback<ResultSet> callback, final Statement stmt) {
final ResultSetFuture future = this.session.executeAsync(stmt);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
callback.accept(result);
}
@Override
public void onFailure(Throwable t) {
callback.onError(t);
}
});
}
Note that Futures
from Cassandra driver support comes from the Guava library from google. DataStax has a nice tutorial on using Cassandra async API with Guava.
In this example we have a service called EventStorageService
which endeavors to store an event into Cassandra. Most of the plumbing and tables DDL for the Event have been omitted. This is not a Cassandra tutorial by any means.
Note that in the onSuccess
of the FutureCallback
that we call the QBit callback
aka Callback
accept method. A QBit callback is a Java 8 consumer interface Callback<T> extends Consumer<T>
which is probably what FutureCallback
would have been if it were created post Java 8. You can also see that if the FutureCallback.onFailure
gets called and that the code delegates to onError
. Fairly simple.
Now we have another service call this service. As in this example CassandraService
is a thin wrapper over the Cassandra API.
public class EventStorageService {
private final Logger logger = LoggerFactory.getLogger(EventStorageService.class);
private final CassandraService cassandraService;
private final Reactor reactor;
public EventStorageService (final CassandraService cassandraService,
final Reactor reactor) {
this.cassandraService = cassandraService;
logger.info(" Event Storage Service is up ");
if (reactor!=null) {
this.reactor = reactor;
} else {
this.reactor = ReactorBuilder.reactorBuilder().build();
}
}
@RequestMapping(value = "/event", method = RequestMethod.POST)
public void addEventAsync (final Callback<Boolean> statusCallback, final Event event) {
logger.debug("Storing Event async {} " , event);
final EventStorageRecord storageRec = EventConverter.toStorageRec(event);
final Callback<ResultSet> callback = reactor.callbackBuilder()
.setCallback(ResultSet.class, resultSet ->
statusCallback.accept(resultSet!=null))
.setOnTimeout(() -> statusCallback.accept(false))
.setOnError(error -> statusCallback.onError(error))
.build(ResultSet.class);
this.addEventStorageRecordAsync(callback, storageRec);
}
public void addEventStorageRecordAsync (final Callback<ResultSet> callback,
final EventStorageRecord storageRec) {
logger.info("Storing the record with storage-key {} async ", storageRec.getStorageKey());
if(storageRec != null) {
SimpleStatement simpleStatement = ...;
cassandraService.executeAsync(callback, simpleStatement);
}
}
Note that QBit uses a callbackBuilder
so the constituent parts of a callback can be lambda expressions.
A Callback
is a rather simple interface that builds on Java 8 Consumer and adds timeout and error handling.
public interface Callback<T> extends Consumer<T> {
default void onError(Throwable error) {
LoggerFactory.getLogger(Callback.class)
.error(error.getMessage(), error);
}
default void onTimeout() {
}
}
The Reactor
is class to manage timeouts, schedule periodic tasks, and other service call coordination.
We initialize the Reactor
in the constructor of the EventStorageService
as seen in the previous code listing. We use the callbackBuilder
created from the Reactor
as it will register the callbacks
with the reactor
for timeouts and such.
To enable the reactor, we must call it from service queue callback method of idle, limit and empty. One merely needs to call reactor.process
from the callback, and it will periodically check for timeouts and such.
@QueueCallback({
QueueCallbackType.LIMIT,
QueueCallbackType.IDLE,
QueueCallbackType.EMPTY})
public void process() {
reactor.process();
}
The Reactor
uses AsyncFutureCallback
which is both a Future
, Runnable
and a Callback
so therefore a Consumer
. Rather then invent our own async API or functional API we decided to lean on Java 8, and build on the shoulders of giants.
public interface AsyncFutureCallback<T> extends Runnable, Callback<T>, Future<T> {
Exception CANCEL = new Exception("Cancelled RunnableCallback");
boolean checkTimeOut(long now);
void accept(T t);
void onError(Throwable error);
void run();
@Override
boolean cancel(boolean mayInterruptIfRunning);
@Override
boolean isCancelled();
@Override
boolean isDone();
@Override
T get();
@SuppressWarnings("NullableProblems")
@Override
T get(long timeout, TimeUnit unit);
default boolean timedOut(long now) {
return !(startTime() == -1 || timeOutDuration() == -1) && (now - startTime()) > timeOutDuration();
}
default long timeOutDuration() {
return -1;
}
default long startTime() {
return -1;
}
default void finished() {
}
default boolean isTimedOut() {
return false;
}
}
To learn more about QBit's async support and the Reactor see Reactively handling Async Calls with QBit Microservices Reactor.
QBit Website What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Tutorials
- QBit tutorials
- Microservices Intro
- Microservice KPI Monitoring
- Microservice Batteries Included
- RESTful APIs
- QBit and Reakt Promises
- Resourceful REST
- Microservices Reactor
- Working with JSON maps and lists
__
Docs
Getting Started
- First REST Microservice
- REST Microservice Part 2
- ServiceQueue
- ServiceBundle
- ServiceEndpointServer
- REST with URI Params
- Simple Single Page App
Basics
- What is QBit?
- Detailed Overview of QBit
- High level overview
- Low-level HTTP and WebSocket
- Low level WebSocket
- HttpClient
- HTTP Request filter
- HTTP Proxy
- Queues and flushing
- Local Proxies
- ServiceQueue remote and local
- ManagedServiceBuilder, consul, StatsD, Swagger support
- Working with Service Pools
- Callback Builders
- Error Handling
- Health System
- Stats System
- Reactor callback coordination
- Early Service Examples
Concepts
REST
Callbacks and Reactor
Event Bus
Advanced
Integration
- Using QBit in Vert.x
- Reactor-Integrating with Cassandra
- Using QBit with Spring Boot
- SolrJ and service pools
- Swagger support
- MDC Support
- Reactive Streams
- Mesos, Docker, Heroku
- DNS SRV
QBit case studies
QBit 2 Roadmap
-- Related Projects
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Reactive Microservices
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting