SocketAddressSupplier API
The SocketAddressSupplier API lets you control which connection points (SocketAddresses) are used for the initial connection and for reconnection.
Controlling the source and sequence of connection points is useful for failover and cluster node discovery.
The API exposes a factory for SocketAddressSuppliers which accepts a RedisURI. By default, multiple addresses are used in round-robin order.
The predefined mechanisms can be found in the SocketAddressSupplierFactory.Factories enum:
- ROUND_ROBIN: Cyclic use of the multiple addresses specified by the RedisURI.
- HELLO_CLUSTER: Uses ROUND_ROBIN for the initial connect. Once a connection is established, the mechanism obtains the cluster nodes using the HELLO command at startup. Periodic scheduling/updating is currently not implemented. It can, however, be achieved by implementing your own factory that calls the reloadNodes() method periodically or on demand.
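To make the cyclic behaviour of ROUND_ROBIN concrete, here is a minimal sketch that uses only the JDK. The class name RoundRobinAddresses and the use of java.util.function.Supplier are assumptions made for illustration; this is not spinach's implementation and does not implement the SocketAddressSupplier interface.

```java
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of round-robin address selection (not spinach code).
final class RoundRobinAddresses implements Supplier<SocketAddress> {

    private final List<SocketAddress> addresses;
    private final AtomicInteger position = new AtomicInteger();

    RoundRobinAddresses(List<? extends SocketAddress> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        // Defensive copy so later changes to the caller's list have no effect.
        this.addresses = new ArrayList<>(addresses);
    }

    @Override
    public SocketAddress get() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(position.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }
}
```

Each call to get() returns the next address and wraps around at the end of the list, so a reconnect attempt after a failure would be directed at a different node than the previous attempt.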
New mechanisms can be added by implementing the SocketAddressSupplier/SocketAddressSupplierFactory interfaces.
Submit your mechanism by opening a pull request if you want to contribute to spinach.
DisqueClient client = new DisqueClient();
// Cycle through the addresses given by the disqueURI
DisqueConnection<String, String> connection = client.connect(new Utf8StringCodec(), disqueURI,
        SocketAddressSupplierFactory.Factories.ROUND_ROBIN);
// Alternatively: discover the cluster nodes using HELLO after the initial connect
DisqueConnection<String, String> clusterConnection = client.connect(new Utf8StringCodec(), disqueURI,
        SocketAddressSupplierFactory.Factories.HELLO_CLUSTER);
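The idea of a custom mechanism that refreshes its node list periodically or on demand can be sketched with plain JDK classes. Everything in the block below is hypothetical: ReloadingAddresses, its nodeSource parameter and reloadEvery are invented for illustration, and only the method name reloadNodes() is borrowed from the text above. A real mechanism would implement spinach's SocketAddressSupplier/SocketAddressSupplierFactory interfaces, whose exact signatures are not shown here.

```java
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: round-robin over a node list that can be reloaded.
final class ReloadingAddresses implements Supplier<SocketAddress>, AutoCloseable {

    private final Supplier<List<SocketAddress>> nodeSource; // assumed node discovery callback
    private final AtomicInteger position = new AtomicInteger();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "node-reload");
                thread.setDaemon(true); // do not keep the JVM alive for reloads
                return thread;
            });
    private volatile List<SocketAddress> nodes;

    ReloadingAddresses(Supplier<List<SocketAddress>> nodeSource) {
        this.nodeSource = nodeSource;
        reloadNodes();
        if (nodes == null) {
            throw new IllegalStateException("node source returned no addresses");
        }
    }

    // On-demand reload; an empty result keeps the previous list.
    void reloadNodes() {
        List<SocketAddress> current = new ArrayList<>(nodeSource.get());
        if (!current.isEmpty()) {
            nodes = Collections.unmodifiableList(current);
        }
    }

    // Periodic reload; failures are swallowed so the schedule keeps running.
    void reloadEvery(long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                reloadNodes();
            } catch (RuntimeException e) {
                // keep using the previously known nodes
            }
        }, period, period, unit);
    }

    @Override
    public SocketAddress get() {
        List<SocketAddress> snapshot = nodes; // read the volatile field once
        return snapshot.get(Math.floorMod(position.getAndIncrement(), snapshot.size()));
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Swapping the whole list behind a volatile field keeps get() free of locks, which seems a reasonable choice when reconnects and reloads can happen on different threads.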
QueueListener API
Spinach allows a push pattern to obtain jobs from Disque. The QueueListener API introduces a listener/notification style by utilizing rx-java Observables.
The QueueListener API exposes an Observable<Job> that emits jobs. The jobs can be processed asynchronously by applying transformations or doOnNext steps.
Each Observable<Job> allocates resources at the time of subscription. The resources are released when unsubscribing from the subscription.
QueueListenerFactory<String, String> queueListenerFactory = QueueListenerFactory.create(disqueURI,
        new Utf8StringCodec(), "queue"); // 1
Observable<Job<String, String>> getjobs = queueListenerFactory.getjobs(); // 2
Subscription subscribe = getjobs.doOnNext(new Action1<Job<String, String>>() { // 3
    @Override
    public void call(Job<String, String> job) {
        // process the job
        System.out.println(job.getId());
    }
}).doOnNext(new Action1<Job<String, String>>() {
    @Override
    public void call(Job<String, String> job) {
        // ack the job (different connection)
        connection.sync().ackjob(job.getId());
    }
}).subscribe(); // 4
1. The QueueListenerFactory is set up. You can reuse an existing DisqueClient to reuse thread pools.
2. Create an Observable<Job> by calling the getjobs method. This call just creates the observable, but no resources are allocated.
3. Apply transformations or set up callbacks to process the jobs.
4. Finally, subscribe and open the connection/listener process.
To be honest, the QueueListener API is just a side-product of the recommendation to keep track of the producer node id when receiving jobs.
A client can optimize locality by connecting directly to the node that produces most of the received jobs. The QueueListenerFactory can initiate locality tracking for a particular observable and can enable periodically scheduled checks to switch to the node that produced the most received jobs.
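The bookkeeping behind locality tracking can be illustrated with a small counter per producer node. This is a sketch under stated assumptions: LocalityTracker, recordJob and preferredNode are hypothetical names, and the caller is assumed to already know the producer node id of each received job. How spinach derives that id and stores the counts is not covered here.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of locality tracking (not spinach code).
final class LocalityTracker {

    private final Map<String, LongAdder> jobsPerNode = new ConcurrentHashMap<>();

    // Call once per received job with the id of the node that produced it.
    void recordJob(String producerNodeId) {
        jobsPerNode.computeIfAbsent(producerNodeId, id -> new LongAdder()).increment();
    }

    // The node that produced the most recorded jobs, or empty if nothing was recorded.
    Optional<String> preferredNode() {
        String best = null;
        long bestCount = 0;
        for (Map.Entry<String, LongAdder> entry : jobsPerNode.entrySet()) {
            long count = entry.getValue().sum();
            if (count > bestCount) {
                best = entry.getKey();
                bestCount = count;
            }
        }
        return Optional.ofNullable(best);
    }

    // Start a fresh measurement window, for example after a node switch.
    void reset() {
        jobsPerNode.clear();
    }
}
```

A listener would call recordJob for every job it receives and consult preferredNode when deciding whether a switch is worthwhile.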
Locality tracking
The node switch can also be triggered directly on a QueueListenerFactory for all
active listeners with enabled locality tracking.
QueueListenerFactory<String, String> queueListenerFactory = QueueListenerFactory.create(disqueURI,
        new Utf8StringCodec(), "queue"); // 1
Observable<Job<String, String>> getjobs = queueListenerFactory
        .withLocalityTracking() // 2
        .withNodeSwitching(2, TimeUnit.MINUTES) // 3
        .getjobs(); // 4
Subscription subscribe = getjobs.doOnNext(new Action1<Job<String, String>>() { // 5
    @Override
    public void call(Job<String, String> job) {
        // process the job
        System.out.println(job.getId());
    }
}).doOnNext(new Action1<Job<String, String>>() {
    @Override
    public void call(Job<String, String> job) {
        // ack the job (different connection)
        connection.sync().ackjob(job.getId());
    }
}).subscribe(); // 6
queueListenerFactory.switchNodes(); // 7
1. The QueueListenerFactory is set up. You can reuse an existing DisqueClient to reuse thread pools.
2. Enable locality tracking for this particular Observable<Job>.
3. Enable node switching. The check whether node switching is necessary and the actual reconnect happen every 2 minutes.
4. Create an Observable<Job> by calling the getjobs method. This call just creates the observable, but no resources are allocated.
5. Apply transformations or set up callbacks to process the jobs.
6. Finally, subscribe and open the connection/listener process.
7. Initiate a programmatic node switch check.
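The difference between the scheduled check and the programmatic one can be sketched as a single check method that is either invoked directly or run by a scheduler. The class below is an assumption-laden illustration: NodeSwitchCheck, its constructor parameters and checkEvery are hypothetical, and it only borrows the method name switchNodes from the example above. It does not describe how QueueListenerFactory performs the reconnect.

```java
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a node switch check (not spinach code).
final class NodeSwitchCheck implements AutoCloseable {

    private final Supplier<Optional<String>> preferredNode; // e.g. the result of locality tracking
    private final Consumer<String> reconnect;               // assumed callback that reconnects to a node id
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "node-switch");
                thread.setDaemon(true);
                return thread;
            });
    private String currentNode;

    NodeSwitchCheck(String currentNode, Supplier<Optional<String>> preferredNode,
            Consumer<String> reconnect) {
        this.currentNode = currentNode;
        this.preferredNode = preferredNode;
        this.reconnect = reconnect;
    }

    // Runs one check; returns true if a reconnect was triggered.
    synchronized boolean switchNodes() {
        Optional<String> preferred = preferredNode.get();
        if (!preferred.isPresent() || preferred.get().equals(currentNode)) {
            return false; // nothing known yet, or already on the best node
        }
        reconnect.accept(preferred.get());
        currentNode = preferred.get();
        return true;
    }

    // Schedules the same check periodically, e.g. checkEvery(2, TimeUnit.MINUTES).
    void checkEvery(long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                switchNodes();
            } catch (RuntimeException e) {
                // a failed reconnect should not cancel future checks
            }
        }, period, period, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Because both paths go through the same synchronized method, a manual check and a scheduled check cannot trigger two reconnects at the same time in this sketch.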
The QueueListener API does not emit a terminal event.
Please note this API is experimental and may change.
Upgrade to lettuce 3.4
With this upgrade you get all the features provided within lettuce 3.4.
Some of the highlights that are also available for spinach:
- Reusable ClientResources
- EventBus and Client Events
- Command Latency Metrics
Read more: https://github.com/mp911de/lettuce/releases/tag/3.4.Final
Updated dependencies
netty 4.0.28.Final -> 4.0.34.Final
Enhancements
- Keep track of nodeId's that were obtained by GETJOB/Implement QueueListener API #8
- Support a pluggable reconnect mechanism #9
- Upgrade to lettuce 3.4 #10
- Getjob NOHANG, WITHARGUMENTS #14 #15 (thanks to @macobo)
- Switch nodes when received LEAVING in QueueListener API #17
- Adopt new Disque Job ID format #18
- Implement PAUSE command #19
- Implement QSTAT command #20
Fixes
For complete information on spinach see the website: