[Z Design] Existing design of method dispatch (circa 2016) May 3rd
This is not documentation, merely notes on the design (really, the current code) of the EndpointServer as it stands in April 2016. The notes are very useful for understanding how QBit works.
An EndpointServer is built by the EndpointServerBuilder. The EndpointServerBuilder produces a ServiceEndpointServer.
public ServiceEndpointServer build() {
    final ServiceBundle serviceBundle;
    serviceBundle = getFactory().createServiceBundle(uri,
            getRequestQueueBuilder(),
            getResponseQueueBuilder(),
            getWebResponseQueueBuilder(),
            getFactory(),
            eachServiceInItsOwnThread, this.getBeforeMethodCall(),
            this.getBeforeMethodCallAfterTransform(),
            this.getArgTransformer(), true,
            getSystemManager(),
            getHealthService(),
            getStatsCollector(), getTimer(),
            getStatsFlushRateSeconds(),
            getCheckTimingEveryXCalls(),
            getCallbackManager(),
            getEventManager(),
            getBeforeMethodSent(),
            getBeforeMethodCallOnServiceQueue(), getAfterMethodCallOnServiceQueue());
    final ServiceEndpointServer serviceEndpointServer = new ServiceEndpointServerImpl(getHttpServer(),
            getEncoder(), getParser(), serviceBundle, getJsonMapper(), this.getTimeoutSeconds(),
            this.getNumberOfOutstandingRequests(), getProtocolBatchSize(),
            this.getFlushInterval(), this.getSystemManager(), getEndpointName(),
            getServiceDiscovery(), getPort(), getTtlSeconds(), getHealthService(), getErrorHandler(),
            getFlushResponseInterval(), getParserWorkerCount(), getEncoderWorkerCount());
    if (serviceEndpointServer != null && qBitSystemManager != null) {
        qBitSystemManager.registerServer(serviceEndpointServer);
    }
    if (services != null) {
        serviceEndpointServer.initServices(services);
    }
    if (servicesWithAlias != null) {
        servicesWithAlias.entrySet().forEach(entry -> serviceEndpointServer.addServiceObject(entry.getKey(), entry.getValue()));
    }
    return serviceEndpointServer;
}
The serviceBundle has three queues: a method request queue, a response queue and a web response queue.
The web response queue exists so that we can stream responses to the ServiceEndpointServer, as normal callback handling does not make sense in terms of WebSocket and REST.
Currently there is one implementation of the ServiceBundle interface.
package io.advantageous.qbit.service.impl;
/**
 * Manages a collection of services.
 */
public class ServiceBundleImpl implements ServiceBundle {
We will refer to it simply as the ServiceBundle, since at the moment there is only one implementation.
The ServiceBundle was originally used as the primary way to dispatch REST calls. That dispatch has since been improved and the responsibility has moved to other classes, but there still might be some leftover REST/WebSocket dispatch code in the ServiceBundle.
There are many ways to start up a ServiceBundle.
public void startReturnHandlerProcessor(ReceiveQueueListener<Response<Object>> listener) {
    responseQueue.startListener(listener);
}
public void startWebResponseReturnHandler(ReceiveQueueListener<Response<Object>> listener) {
    webResponseQueue.startListener(listener);
}
public void startReturnHandlerProcessor() {
The third, no-arg startReturnHandlerProcessor has this logic inside of it.
final Request<Object> originatingRequest = response.request().originatingRequest();
if (originatingRequest == null) {
    callbackManager.handleResponse(response);
} else if (originatingRequest instanceof HttpRequest
        || originatingRequest instanceof WebSocketMessage) {
    webResponseSendQueue.send(response);
} else {
    callbackManager.handleResponse(response);
}
Looks like that can be simplified. :)
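One way to see the simplification: the null branch and the final else branch do the same thing, and instanceof is already false for null, so the explicit null check is redundant. The sketch below is only an illustration using hypothetical stand-in types (SketchRequest, SketchHttpRequest, SketchWebSocketMessage, SketchLocalCall, ResponseRouting); it returns a label instead of touching QBit's real queues or callbackManager.

```java
// Hypothetical stand-ins; QBit's real Request, HttpRequest and WebSocketMessage are richer.
interface SketchRequest {}
final class SketchHttpRequest implements SketchRequest {}
final class SketchWebSocketMessage implements SketchRequest {}
final class SketchLocalCall implements SketchRequest {}

final class ResponseRouting {
    enum Target { WEB_RESPONSE_QUEUE, CALLBACK_MANAGER }

    // Same shape as the original: three branches, two of which do the same thing.
    static Target routeOriginal(SketchRequest originatingRequest) {
        if (originatingRequest == null) {
            return Target.CALLBACK_MANAGER;
        } else if (originatingRequest instanceof SketchHttpRequest
                || originatingRequest instanceof SketchWebSocketMessage) {
            return Target.WEB_RESPONSE_QUEUE;
        } else {
            return Target.CALLBACK_MANAGER;
        }
    }

    // Simplified: instanceof evaluates to false for null, so one test is enough.
    static Target routeSimplified(SketchRequest originatingRequest) {
        boolean fromWeb = originatingRequest instanceof SketchHttpRequest
                || originatingRequest instanceof SketchWebSocketMessage;
        return fromWeb ? Target.WEB_RESPONSE_QUEUE : Target.CALLBACK_MANAGER;
    }
}
```

Both methods agree for null, web-originated and local requests, which is the whole argument for collapsing the branches.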
One problem with this dispatch logic is that it puts a web queue in the ServiceBundle, which is an area that can be refactored.
This logic should probably exist only in the ServiceEndpointServer.
/**
 * Sets up the response queue listener so we can sendText responses
 * to HTTP / WebSocket end points.
 */
private void startResponseQueueListener() {
    serviceBundle.startReturnHandlerProcessor();
    serviceBundle.startWebResponseReturnHandler(createResponseQueueListener());
}
...
private void doStart() {
    ...
    serviceBundle.startUpCallQueue();
    startResponseQueueListener();
}
The ServiceEndpointServerImpl also calls serviceBundle.startUpCallQueue().
/**
 * Start the client bundle.
 */
public ServiceBundle startUpCallQueue() {
    methodQueue.startListener(new ReceiveQueueListener<MethodCall<Object>>() {
        /**
         * When we receive a method call, we call doCall.
         * @param item item
         */
        @Override
        public void receive(MethodCall<Object> item) {
            doCall(item);
        }
The method serviceBundle.doCall is one of the best places to debug why things are not working.
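The body of doCall is not shown in these notes, so the following is only a sketch of why a single funnel method makes a good debugging spot. It assumes, hypothetically, that doCall looks up a target by service address and forwards the call; SketchMethodCall, SketchBundle and the trace list are illustrative names, not QBit classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a method call: just an address and a method name.
final class SketchMethodCall {
    final String address;
    final String name;

    SketchMethodCall(String address, String name) {
        this.address = address;
        this.name = name;
    }
}

// Hypothetical stand-in for the bundle's call path: listener -> doCall -> service.
final class SketchBundle {
    private final Map<String, Consumer<SketchMethodCall>> services = new HashMap<>();
    final List<String> trace = new ArrayList<>();

    void addServiceConsumer(String address, Consumer<SketchMethodCall> consumer) {
        services.put(address, consumer);
    }

    // Plays the role of the listener's receive(item): every call funnels into doCall.
    void receive(SketchMethodCall call) {
        doCall(call);
    }

    // Single choke point: a log line or breakpoint here sees every call, routed or not.
    void doCall(SketchMethodCall call) {
        Consumer<SketchMethodCall> target = services.get(call.address);
        if (target == null) {
            trace.add("NO SERVICE for " + call.address + "#" + call.name);
            return;
        }
        trace.add("dispatch " + call.address + "#" + call.name);
        target.accept(call);
    }
}
```

In this sketch a call to an unregistered address shows up in the trace instead of vanishing silently, which is the kind of symptom worth looking for when debugging at this point.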
class ServiceBundleImpl
...
public ServiceBundle addService(Object object) {
...
public ServiceBundle addServiceWithQueueCallBackHandlers(final Object serviceObject,
        final QueueCallBackHandler... queueCallBackHandlers) {
...
public ServiceBundle addServiceObject(final String serviceAddress, final Object serviceObject) {
...
public ServiceBundle addServiceObjectWithQueueCallBackHandlers(final String serviceAddress,
        final Object serviceObject,
        final QueueCallBackHandler... queueCallBackHandlers) {
...
public ServiceBundle addServiceConsumer(final String serviceAddress,
        final Consumer<MethodCall<Object>> methodCallConsumer) {
...
public void addServiceService(final String serviceAddress, final ServiceQueue serviceQueue) {
...
public void addServiceService(final String objectName, final String serviceAddress, final ServiceQueue serviceQueue) {
Let's break this down.
- addService(...)
- addServiceWithQueueCallBackHandlers(...)
- addServiceObject(...)
- addServiceConsumer(...)
- addServiceService(...)
- addRoundRobinService
Now that seems like a lot less.
What do they do?
- addService(...): adds a Java object to the service bundle
- addServiceWithQueueCallBackHandlers(...): adds a POJO to the service bundle, along with its queue callback handlers
- addServiceObject(...): adds a POJO to the service bundle with an alias
- addServiceConsumer(...): adds a MethodCall Consumer to the service bundle (used for RoundRobin and sharded service pools)
- addServiceService(...): adds a ServiceQueue to the service bundle
All of the add POJO methods end up calling addServiceObjectWithQueueCallBackHandlers.
Let's cover that one.
public ServiceBundle addServiceObjectWithQueueCallBackHandlers(final String serviceAddress,
        final Object serviceObject,
        final QueueCallBackHandler... queueCallBackHandlers) {
    logger.info("Adding service {} @ {} with object {}", ServiceBundleImpl.class.getName(),
            serviceAddress, serviceObject);
    if (serviceObject instanceof Consumer) {
        //noinspection unchecked
        addServiceConsumer(serviceAddress, (Consumer<MethodCall<Object>>) serviceObject);
        return this;
    }
    if (serviceObject instanceof ServiceQueue) {
        addServiceService(serviceAddress, (ServiceQueue) serviceObject);
        return this;
    }
If you somehow called addServiceObjectWithQueueCallBackHandlers, directly or indirectly, with a MethodCall Consumer or a ServiceQueue, it will short circuit to the right add method.
If it does not short circuit, then you gave it a POJO: just a Java object, which is your service.
It will then create a ServiceQueue for your POJO so that your POJO handles calls asynchronously.
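The order of those checks can be sketched as a small classifier. This is only an illustration with hypothetical names (SketchServiceQueue, SketchRegistrar, Route); it reports which path would be taken rather than registering anything with a real bundle.

```java
import java.util.function.Consumer;

// Hypothetical stand-in; the real ServiceQueue interface has many more members.
interface SketchServiceQueue {}

final class SketchRegistrar {
    enum Route { CONSUMER, SERVICE_QUEUE, WRAPPED_POJO }

    // Mirrors the order of the instanceof checks in addServiceObjectWithQueueCallBackHandlers.
    static Route classify(Object serviceObject) {
        if (serviceObject instanceof Consumer) {
            return Route.CONSUMER;       // would short circuit to addServiceConsumer(...)
        }
        if (serviceObject instanceof SketchServiceQueue) {
            return Route.SERVICE_QUEUE;  // would short circuit to addServiceService(...)
        }
        return Route.WRAPPED_POJO;       // would build a ServiceQueue around the POJO
    }
}
```

Because the Consumer check comes first, an object that implements both interfaces would be treated as a Consumer in this sketch.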
public ServiceBundle addServiceObjectWithQueueCallBackHandlers(final String serviceAddress,
        final Object serviceObject,
        final QueueCallBackHandler... queueCallBackHandlers) {
    ...
    /** Turn this client object into a client with queues. */
    final ServiceBuilder serviceBuilder = ServiceBuilder.serviceBuilder()
            .setRootAddress(rootAddress)
            .setServiceObject(serviceObject)
            .setServiceAddress(serviceAddress)
            .setTimer(timer)
            .setResponseQueue(responseQueue)
            .setAsyncResponse(asyncCalls)
            .setInvokeDynamic(invokeDynamic)
            .setSystemManager(systemManager)
            .setRequestQueueBuilder(BeanUtils.copy(this.requestQueueBuilder))
            .setRequestQueueBuilder(requestQueueBuilder)
            .setHandleCallbacks(false)
            .setCreateCallbackHandler(false)
            .setEventManager(eventManager)
            .setBeforeMethodCall(this.beforeMethodCallOnServiceQueue)
            .setAfterMethodCall(this.afterMethodCallOnServiceQueue);
    if (queueCallBackHandlers != null) {
        stream(queueCallBackHandlers).forEach(serviceBuilder::addQueueCallbackHandler);
    }
    final String bindStatHealthName = serviceAddress == null
            ? AnnotationUtils.readServiceName(serviceObject)
            : serviceAddress;
    if (healthService != null) {
        serviceBuilder.registerHealthChecks(healthService, bindStatHealthName);
    }
    if (statsCollector != null) {
        /*
           The default is to flush stats every five seconds, and sample
           every 10_000 queue calls.
         */
        serviceBuilder.registerStatsCollections(bindStatHealthName,
                statsCollector, sampleStatFlushRate, checkTimingEveryXCalls);
    }
    final ServiceQueue serviceQueue = serviceBuilder.buildAndStart();
    addServiceService(serviceAddress, serviceQueue);
    return this;
Notice that this calls serviceBuilder.buildAndStart(), which in turn creates the serviceQueue and calls startServiceQueue on it, i.e. build().startServiceQueue().
The serviceQueue has four start methods.
- start
- startServiceQueue (same as start, but fluent)
- startAll (which calls startServiceQueue and startCallBackHandler)
- startCallBackHandler used to run the serviceQueue standalone
A serviceQueue that is getting used in a serviceBundle should never call startCallBackHandler or startAll before it registers.
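The relationship between the four start methods can be sketched with a stand-in class that only records what was started. SketchQueue and its two boolean flags are hypothetical, and the return types are assumptions made so the calls can be chained; the real ServiceQueue starts threads and listeners instead.

```java
// Hypothetical stand-in that only records which parts have been started.
final class SketchQueue {
    boolean queueStarted;
    boolean callbackHandlerStarted;

    // Starts processing of incoming method calls.
    void start() {
        queueStarted = true;
    }

    // Same as start(), but fluent: returns this so calls can be chained.
    SketchQueue startServiceQueue() {
        start();
        return this;
    }

    // Starts callback handling; meant for running the queue standalone.
    SketchQueue startCallBackHandler() {
        callbackHandlerStarted = true;
        return this;
    }

    // Starts both the service queue and the callback handler.
    SketchQueue startAll() {
        startServiceQueue();
        startCallBackHandler();
        return this;
    }
}
```

In this sketch a queue used inside a bundle would be started with startServiceQueue() only, leaving callbackHandlerStarted false, while a standalone queue would use startAll().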