
[Document] EventBus using Consul and QBit to wire together an event bus.

Richard Hightower edited this page Nov 28, 2015 · 1 revision

QBit has an event system. You have likely seen it if you have read through the QBit documents.

QBit allows the event bus to be connected to remote QBit instances, forming a cluster.

It does this through the ServiceDiscovery that QBit provides.

By default, the EventBusClusterBuilder uses the ConsulServiceDiscoveryBuilder if you do not provide it with a ServiceDiscovery. You can read more about Consul and Service Discovery.
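
To make the idea of a clustered event bus concrete, here is a toy, in-process model of it. This is only a conceptual sketch and not how QBit implements replication: `Registry`, `Node`, and every method name below are hypothetical, the registry merely stands in for service discovery, and a direct method call stands in for the network hop between nodes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model only: every name here is hypothetical and none of it is QBit API. */
final class ToyClusteredEventBus {

    /** Stands in for service discovery: tracks the nodes registered under a bus name. */
    static final class Registry {
        private final Map<String, List<Node>> nodesByBusName = new HashMap<>();

        void register(String busName, Node node) {
            nodesByBusName.computeIfAbsent(busName, k -> new ArrayList<>()).add(node);
        }

        List<Node> lookup(String busName) {
            return nodesByBusName.getOrDefault(busName, List.of());
        }
    }

    /** One member of the cluster, with its own local listeners. */
    static final class Node {
        private final String busName;
        private final Registry registry;
        private final Map<String, List<Consumer<Object>>> listeners = new HashMap<>();

        Node(String busName, Registry registry) {
            this.busName = busName;
            this.registry = registry;
            registry.register(busName, this);
        }

        void listen(String channel, Consumer<Object> listener) {
            listeners.computeIfAbsent(channel, k -> new ArrayList<>()).add(listener);
        }

        /** Delivers locally, then forwards to every other node found via the registry. */
        void send(String channel, Object event) {
            deliverLocally(channel, event);
            for (Node peer : registry.lookup(busName)) {
                if (peer != this) {
                    peer.deliverLocally(channel, event);
                }
            }
        }

        private void deliverLocally(String channel, Object event) {
            for (Consumer<Object> listener : listeners.getOrDefault(channel, List.of())) {
                listener.accept(event);
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        List<Object> seenOnA = new ArrayList<>();
        List<Object> seenOnB = new ArrayList<>();

        Node a = new Node("event-bus", registry);
        Node b = new Node("event-bus", registry);
        a.listen("myevent", seenOnA::add);
        b.listen("myevent", seenOnB::add);

        // An event sent on node a also reaches the listener on node b.
        a.send("myevent", "hello");
        System.out.println(seenOnA + " " + seenOnB); // prints [hello] [hello]
    }
}
```

The point of the sketch is the shape of the flow: nodes that register under the same bus name find each other through discovery, and an event sent on one node is delivered to listeners on all of them.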

To construct an event bus, you first start up Consul.

Using Consul

consul agent -server -bootstrap-expect 1 -dc dc1 -data-dir /tmp/consulqbit -ui-dir ./support/ui/

Next you use the EventBusClusterBuilder to construct an event bus cluster as follows:

EventBusClusterBuilder

        final EventBusClusterBuilder eventBusClusterBuilder = EventBusClusterBuilder.eventBusClusterBuilder();
        eventBusClusterBuilder.setEventBusName("event-bus");
        eventBusClusterBuilder.setReplicationPortLocal(replicatorPort);
        final EventBusCluster eventBusCluster = eventBusClusterBuilder.build();
        eventBusCluster.start();

Then you inject the event manager from the eventBusCluster into the service builders.

Inject the event manager from EventBusCluster into service builders

        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder
                .managedServiceBuilder().setRootURI("/")
                .setEventManager(eventBusCluster.eventManagerImpl())
                .setPort(webPort);

Finally, you inject client proxies of the event manager into the other services.

Inject client proxies of the event manager into other services

        final EventExampleService eventExampleService = new EventExampleService(
                eventBusCluster.createClientEventManager(),
                "event.",
                ReactorBuilder.reactorBuilder().build(),
                Timer.timer(),
                managedServiceBuilder.getStatServiceBuilder().buildStatsCollector());

        managedServiceBuilder.addEndpointService(eventExampleService);

Here is a complete example:

Complete example

package io.advantageous.qbit.example.event.bus;


import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.annotation.Listen;
import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.annotation.http.DELETE;
import io.advantageous.qbit.annotation.http.GET;
import io.advantageous.qbit.annotation.http.POST;
import io.advantageous.qbit.eventbus.EventBusCluster;
import io.advantageous.qbit.eventbus.EventBusClusterBuilder;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.reactive.ReactorBuilder;
import io.advantageous.qbit.service.BaseService;
import io.advantageous.qbit.service.stats.StatsCollector;
import io.advantageous.qbit.util.Timer;

import java.util.ArrayList;
import java.util.List;

/**
 * curl http://localhost:8080/event/
 * curl -X POST  -H "Content-Type: application/json"  http://localhost:8080/event -d '{"id":"123", "message":"hello"}'
 */
@RequestMapping("/")
public class EventExampleService extends BaseService {

    private final EventManager eventManager;
    private final List<MyEvent> events = new ArrayList<>();

    public EventExampleService(final EventManager eventManager,
                               final String statKeyPrefix,
                               final Reactor reactor,
                               final Timer timer,
                               final StatsCollector statsCollector) {
        super(statKeyPrefix, reactor, timer, statsCollector);
        this.eventManager = eventManager;
        reactor.addServiceToFlush(eventManager);
    }

    @POST("/event")
    public boolean sendEvent(MyEvent event) {
        eventManager.sendArguments("myevent", event);
        return true;
    }


    @DELETE("/event/")
    public boolean clearEvents() {
        events.clear();
        return true;
    }

    @GET("/event/")
    public List<MyEvent> getEvents() {
        return events;
    }

    @Listen("myevent")
    public void listenEvent(final MyEvent event) {
        events.add(event);
    }

    public static void run(final int webPort, final int replicatorPort)  {

        final EventBusClusterBuilder eventBusClusterBuilder = EventBusClusterBuilder.eventBusClusterBuilder();
        eventBusClusterBuilder.setEventBusName("event-bus");
        eventBusClusterBuilder.setReplicationPortLocal(replicatorPort);
        final EventBusCluster eventBusCluster = eventBusClusterBuilder.build();
        eventBusCluster.start();


        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder
                .managedServiceBuilder().setRootURI("/")
                .setEventManager(eventBusCluster.eventManagerImpl())
                .setPort(webPort);

        final EventExampleService eventExampleService = new EventExampleService(
                eventBusCluster.createClientEventManager(),
                "event.",
                ReactorBuilder.reactorBuilder().build(),
                Timer.timer(),
                managedServiceBuilder.getStatServiceBuilder().buildStatsCollector());
        managedServiceBuilder.addEndpointService(eventExampleService);

        managedServiceBuilder.getEndpointServerBuilder().build().startServerAndWait();
    }

    public static void main(final String... args)  {

        run(8080, 7070);

    }
}

....

package io.advantageous.qbit.example.event.bus;

public class MyEvent {

    private String id;
    private String message;


    public String getId() {
        return id;
    }

    public MyEvent setId(String id) {
        this.id = id;
        return this;
    }

    public String getMessage() {
        return message;
    }

    public MyEvent setMessage(String message) {
        this.message = message;
        return this;
    }
}
....
package io.advantageous.qbit.example.event.bus;

public class SecondEventExample {
    public static void main(final String... args)  {

        EventExampleService.run(6060, 5050);

    }

}

....
package io.advantageous.qbit.example.event.bus;

public class ThirdEventExample {

    public static void main(final String... args)  {
        EventExampleService.run(4040, 3030);
    }
}

To run this example, start Consul, then EventExampleService, SecondEventExample, and ThirdEventExample, and then use the following curl commands.

Curl commands to exercise the example

## Send an event
$ curl -X POST  -H "Content-Type: application/json"  http://localhost:8080/event -d '{"id":"123", "message":"hello"}'
true

## See that the event is on each of the nodes.
$ curl http://localhost:8080/event/
[{"id":"123","message":"hello"}]

$ curl http://localhost:6060/event/
[{"id":"123","message":"hello"}]

$ curl http://localhost:4040/event/
[{"id":"123","message":"hello"}]
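
The same check can be scripted from Java. The following is a minimal sketch using the JDK's `java.net.http` client (Java 11 or later) and assuming the three nodes are listening on ports 8080, 6060, and 4040 as above. The class name `EventBusSmokeCheck` and its helper methods are hypothetical and are not part of QBit.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Hypothetical helper that mirrors the curl commands; not part of QBit. */
final class EventBusSmokeCheck {

    /** Builds the POST /event request for the node on the given port. */
    static HttpRequest sendEventRequest(int port, String json) {
        return HttpRequest.newBuilder(URI.create("http://localhost:" + port + "/event"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    /** Builds the GET /event/ request for the node on the given port. */
    static HttpRequest listEventsRequest(int port) {
        return HttpRequest.newBuilder(URI.create("http://localhost:" + port + "/event/"))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        try {
            // Send one event to the first node.
            String json = "{\"id\":\"123\", \"message\":\"hello\"}";
            HttpResponse<String> posted = client.send(
                    sendEventRequest(8080, json), HttpResponse.BodyHandlers.ofString());
            System.out.println("POST 8080 -> " + posted.body());

            // Ask each node which events it has seen.
            for (int port : new int[] {8080, 6060, 4040}) {
                HttpResponse<String> listed = client.send(
                        listEventsRequest(port), HttpResponse.BodyHandlers.ofString());
                System.out.println("GET " + port + " -> " + listed.body());
            }
        } catch (Exception e) {
            System.out.println("Request failed; are Consul and the three nodes running? " + e);
        }
    }
}
```

If a node returns an empty list, retry after a moment, since replication to the other nodes may not be immediate.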
