Skip to content
This repository has been archived by the owner on Aug 3, 2019. It is now read-only.
Compare
Choose a tag to compare
@mp911de mp911de released this 18 Feb 19:49
· 18 commits to master since this release

SocketAddressSupplier API

The SocketAddressSupplier API allows to control the used
connection points/SocketAddresses for initial connection and reconnection.
The use case for controlling the source and sequence of
connection points are failover and cluster node discovery.

The API exposes a factory for SocketAddressSuppliers which accepts
a RedisURI. By default, multiple addresses are utilized in with RoundRobin.

The predefined methods can be found in the SocketAddressSupplierFactory.Factories enum:

  • ROUND_ROBIN: Cyclic use of multiple addresses specified by the RedisURI
  • HELLO_CLUSTER: Uses ROUND_ROBIN for the initial connect. Once a connection is
    established the mechanism obtains the cluster nodes using the HELLO
    command AT STARTUP. Periodical scheduling/updating is currently not implemented.
    This can be however achieved by implementing an own factory
    that calls the reloadNodes() method periodically/on demand.

New mechanisms can be implemented by implementing the
SocketAddressSupplier/SocketAddressSupplierFactory interfaces.

Submit your mechanism by opening a Pull-Request if you want to contribute to spinach.

DisqueClient client = new DisqueClient();

DisqueConnection<String, String> connection = client.connect(new Utf8StringCodec(), disqueURI,
                SocketAddressSupplierFactory.Factories.ROUND_ROBIN);


DisqueConnection<String, String> connection = client.connect(new Utf8StringCodec(), disqueURI,
                SocketAddressSupplierFactory.Factories.HELLO_CLUSTER);

QueueListener API

Spinach allows a push pattern to obtain jobs from Disque. The QueueListener API
introduces a listener/notification style by utilizing rx-java Observables.
The QueueListener API exposes an Observable<Job> that emits jobs.
The jobs can be processed asynchronously by applying transformations
or doOnNext steps. Each Observable<Job> allocates at the time
of subscription resources. The resources are released when unsubscribing
from the subscription.

QueueListenerFactory<String, String> queueListenerFactory = QueueListenerFactory.create(disqueURI,
            new Utf8StringCodec(), "queue");  // 1

Observable<Job<String, String>> getjobs = queueListenerFactory.getjobs(); // 2

Subscription subscribe = getjobs.doOnNext(new Action1<Job<String, String>>() { // 3
            @Override
            public void call(Job<String, String> job) {
                // process the job
                System.out.println(job.getId());
            }
        }).doOnNext(new Action1<Job<String, String>>() {
            @Override
            public void call(Job<String, String> job) {
                // ack the job (different connection)
                connection.sync().ackjob(job.getId());
            }
        }).subscribe(); // 4
  1. The QueueListenerFactory is set up. You can reuse an existing DisqueClient
    to reuse thread pools.
  2. Create an Observable<Job> by calling the getjobs method. This call
    just creates the observable, but no resources are allocated.
  3. Apply transformations or set up callbacks to process the jobs
  4. Finally subscribe and open the connection/listener process

To be honest, the QueueListener API is just a side-product of the
recommendation to keep track of the producer node id when receiving jobs.
A client can optimize locality to connect directly the node that produces
most of the received jobs. The QueueListenerFactory can initiate locality tracking
for a particular observable and can enable periodically scheduled checks to
switch to the node that produced the most received jobs.

Locality tracking

The node switch can also be triggered directly on a QueueListenerFactory for all
active listeners with enabled locality tracking.

QueueListenerFactory<String, String> queueListenerFactory = QueueListenerFactory.create(disqueURI,
            new Utf8StringCodec(), "queue");  // 1

Observable<Job<String, String>> getjobs = queueListenerFactory
                                            .withLocalityTracking() // 2
                                            .withNodeSwitching(2, TimeUnit.MINUTES) // 3
                                            .getjobs(); // 4

Subscription subscribe = getjobs.doOnNext(new Action1<Job<String, String>>() { // 5
            @Override
            public void call(Job<String, String> job) {
                // process the job
                System.out.println(job.getId());
            }
        }).doOnNext(new Action1<Job<String, String>>() {
            @Override
            public void call(Job<String, String> job) {
                // ack the job (different connection)
                connection.sync().ackjob(job.getId());
            }
        }).subscribe(); // 6

queueListenerFactory.switchNodes(); // 7
  1. The QueueListenerFactory is set up. You can reuse an existing DisqueClient
    to reuse thread pools.
  2. Enable locality tracking for this particular Observable<Job>
  3. Enable node switching. The check whether node switching is necessary and
    the actual reconnect happens every 2 minutes.
  4. Create an Observable<Job> by calling the getjobs method. This call
    just creates the observable, but no resources are allocated.
  5. Apply transformations or set up callbacks to process the jobs
  6. Finally subscribe and open the connection/listener process
  7. Initiate a programmatic node switch check

The QueueListener API does not emit a terminal event.

Please note this API is experimental and may change.

Upgrade to lettuce 3.4

With this upgrade you get all the features provided within lettuce 3.4.

Some of the highlights, also available for spinach are:

  • Reusable ClientResources
  • EventBus and Client Events
  • Command Latency Metrics

Read more: https://github.com/mp911de/lettuce/releases/tag/3.4.Final

Updated dependencies

netty 4.0.28.Final -> 4.0.34.Final

Enhancements

  • Keep track of nodeId's that were obtained by GETJOB/Implement QueueListener API #8
  • Support a pluggable reconnect mechanism #9
  • Upgrade to lettuce 3.4 #10
  • Getjob NOHANG, WITHARGUMENTS #14 #15 (thanks to @macobo)
  • Switch nodes when received LEAVING in QueueListener API #17
  • Adopt new Disque Job ID format #18
  • Implement PAUSE command #19
  • Implement QSTAT command #20

Fixes

  • Do not cache InetSocketAddress in DisqueURI #16
  • Fix addjob documentation about timeout/TTL #21

For complete information on spinach see the website: